a set of random fixes into the blue
This commit is contained in:
@@ -4,6 +4,7 @@ package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -1195,6 +1196,20 @@ func isStatefulMethod(method string) bool {
|
||||
return statefulMethods[method]
|
||||
}
|
||||
|
||||
// flushingResponseWriter wraps http.ResponseWriter to flush after every write
|
||||
type flushingResponseWriter struct {
|
||||
http.ResponseWriter
|
||||
flusher http.Flusher
|
||||
}
|
||||
|
||||
func (f *flushingResponseWriter) Write(p []byte) (n int, err error) {
|
||||
n, err = f.ResponseWriter.Write(p)
|
||||
if err == nil && n > 0 {
|
||||
f.flusher.Flush()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func main() {
|
||||
// Get configuration from environment variables
|
||||
listenAddr := getEnv("LISTEN_ADDR", ":8080")
|
||||
@@ -1276,6 +1291,10 @@ func main() {
|
||||
func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client, enableDetailedLogs bool, statsCollector *StatsCollector) {
|
||||
startTime := time.Now()
|
||||
|
||||
// Create a context that will cancel after 35 seconds (5s buffer over backend timeout)
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 35*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// Limit request body size to 10MB to prevent memory exhaustion
|
||||
const maxBodySize = 10 * 1024 * 1024 // 10MB
|
||||
r.Body = http.MaxBytesReader(w, r.Body, maxBodySize)
|
||||
@@ -1433,8 +1452,26 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
|
||||
}{b.Name, resp, nil}
|
||||
} else {
|
||||
// Not the winning response, need to drain and close the body
|
||||
// Use a goroutine with timeout to prevent hanging
|
||||
go func() {
|
||||
defer resp.Body.Close()
|
||||
// Create a deadline for draining
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
io.Copy(io.Discard, resp.Body)
|
||||
resp.Body.Close()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
// Drained successfully
|
||||
case <-time.After(5 * time.Second):
|
||||
// Timeout draining, just close
|
||||
if enableDetailedLogs {
|
||||
log.Printf("Timeout draining response from backend %s", b.Name)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
}(backend)
|
||||
}
|
||||
@@ -1484,13 +1521,44 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
|
||||
w.Header().Add(name, value)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure we flush data as it arrives for better streaming
|
||||
flusher, canFlush := w.(http.Flusher)
|
||||
|
||||
w.WriteHeader(response.resp.StatusCode)
|
||||
|
||||
// Stream the response body to the client
|
||||
_, streamErr := io.Copy(w, response.resp.Body)
|
||||
if streamErr != nil && enableDetailedLogs {
|
||||
// Stream the response body to the client with proper error handling
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
// Use a custom writer that flushes periodically
|
||||
var err error
|
||||
if canFlush {
|
||||
// Flush every 32KB for better streaming performance
|
||||
flushingWriter := &flushingResponseWriter{
|
||||
ResponseWriter: w,
|
||||
flusher: flusher,
|
||||
}
|
||||
_, err = io.Copy(flushingWriter, response.resp.Body)
|
||||
} else {
|
||||
_, err = io.Copy(w, response.resp.Body)
|
||||
}
|
||||
done <- err
|
||||
}()
|
||||
|
||||
select {
|
||||
case streamErr := <-done:
|
||||
if streamErr != nil {
|
||||
if enableDetailedLogs {
|
||||
log.Printf("Error streaming response body: %v", streamErr)
|
||||
}
|
||||
// Connection is likely broken, can't send error to client
|
||||
}
|
||||
case <-ctx.Done():
|
||||
// Context timeout - client connection might be gone
|
||||
if enableDetailedLogs {
|
||||
log.Printf("Context cancelled while streaming response: %v", ctx.Err())
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// No valid response received from any backend
|
||||
http.Error(w, "All backends failed", http.StatusBadGateway)
|
||||
|
||||
Reference in New Issue
Block a user