From b70e0f4afb7bbb051af0d43b6ae65cbfde143b30 Mon Sep 17 00:00:00 2001 From: Para Dox Date: Thu, 29 May 2025 01:55:12 +0700 Subject: [PATCH] a set of random fixes into the blue --- benchmark-proxy/main.go | 80 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 74 insertions(+), 6 deletions(-) diff --git a/benchmark-proxy/main.go b/benchmark-proxy/main.go index dbfa1bd0..cdf9c539 100644 --- a/benchmark-proxy/main.go +++ b/benchmark-proxy/main.go @@ -4,6 +4,7 @@ package main import ( "bytes" + "context" "encoding/json" "fmt" "io" @@ -1195,6 +1196,20 @@ func isStatefulMethod(method string) bool { return statefulMethods[method] } +// flushingResponseWriter wraps http.ResponseWriter to flush after every write +type flushingResponseWriter struct { + http.ResponseWriter + flusher http.Flusher +} + +func (f *flushingResponseWriter) Write(p []byte) (n int, err error) { + n, err = f.ResponseWriter.Write(p) + if err == nil && n > 0 { + f.flusher.Flush() + } + return +} + func main() { // Get configuration from environment variables listenAddr := getEnv("LISTEN_ADDR", ":8080") @@ -1276,6 +1291,10 @@ func main() { func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client, enableDetailedLogs bool, statsCollector *StatsCollector) { startTime := time.Now() + // Create a context that will cancel after 35 seconds (5s buffer over backend timeout) + ctx, cancel := context.WithTimeout(r.Context(), 35*time.Second) + defer cancel() + // Limit request body size to 10MB to prevent memory exhaustion const maxBodySize = 10 * 1024 * 1024 // 10MB r.Body = http.MaxBytesReader(w, r.Body, maxBodySize) @@ -1433,8 +1452,26 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c }{b.Name, resp, nil} } else { // Not the winning response, need to drain and close the body - io.Copy(io.Discard, resp.Body) - resp.Body.Close() + // Use a goroutine with timeout to prevent hanging + go func() { + defer resp.Body.Close() + // Create a deadline for draining + done := make(chan struct{}) + go func() { + io.Copy(io.Discard, resp.Body) + close(done) + }() + + select { + case <-done: + // Drained successfully + case <-time.After(5 * time.Second): + // Timeout draining, just close + if enableDetailedLogs { + log.Printf("Timeout draining response from backend %s", b.Name) + } + } + }() } }(backend) } @@ -1484,12 +1521,43 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c w.Header().Add(name, value) } } + + // Ensure we flush data as it arrives for better streaming + flusher, canFlush := w.(http.Flusher) + w.WriteHeader(response.resp.StatusCode) - // Stream the response body to the client - _, streamErr := io.Copy(w, response.resp.Body) - if streamErr != nil && enableDetailedLogs { - log.Printf("Error streaming response body: %v", streamErr) + // Stream the response body to the client with proper error handling + done := make(chan error, 1) + go func() { + // Use a custom writer that flushes periodically + var err error + if canFlush { + // Flush every 32KB for better streaming performance + flushingWriter := &flushingResponseWriter{ + ResponseWriter: w, + flusher: flusher, + } + _, err = io.Copy(flushingWriter, response.resp.Body) + } else { + _, err = io.Copy(w, response.resp.Body) + } + done <- err + }() + + select { + case streamErr := <-done: + if streamErr != nil { + if enableDetailedLogs { + log.Printf("Error streaming response body: %v", streamErr) + } + // Connection is likely broken, can't send error to client + } + case <-ctx.Done(): + // Context timeout - client connection might be gone + if enableDetailedLogs { + log.Printf("Context cancelled while streaming response: %v", ctx.Err()) + } } } else { // No valid response received from any backend