diff --git a/benchmark-proxy/main.go b/benchmark-proxy/main.go index 1a590a68..1811eb9b 100644 --- a/benchmark-proxy/main.go +++ b/benchmark-proxy/main.go @@ -4,9 +4,7 @@ package main import ( "bytes" - "context" "encoding/json" - "errors" "fmt" "io" "log" @@ -1173,6 +1171,7 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c // Process backends with adaptive delay strategy var wg sync.WaitGroup + var primaryWg sync.WaitGroup // Separate wait group for primary backend statsChan := make(chan ResponseStats, len(backends)) responseChan := make(chan struct { backend string @@ -1181,10 +1180,6 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c body []byte }, len(backends)) - // Create a context that we can cancel once we get the first response - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - // Track if we've already sent a response var responseHandled atomic.Bool @@ -1195,30 +1190,33 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c } wg.Add(1) + if backend.Role == "primary" { + primaryWg.Add(1) + } + go func(b Backend) { defer wg.Done() + if b.Role == "primary" { + defer primaryWg.Done() + } // Track when this goroutine actually starts processing goroutineStartTime := time.Now() // If this is a secondary backend, wait for p50 delay if b.Role != "primary" { + delayTimer := time.NewTimer(p50Delay) select { - case <-time.After(p50Delay): + case <-delayTimer.C: // Continue after delay - case <-ctx.Done(): - // Primary already responded, skip secondary - return + case <-responseChan: + // Someone already responded, but continue anyway to collect stats + delayTimer.Stop() } } - // Check if response was already handled - if responseHandled.Load() { - return - } - - // Create a new request - backendReq, err := http.NewRequestWithContext(ctx, r.Method, b.URL, bytes.NewReader(body)) + // Create a new request (no longer using context for cancellation) + backendReq, err := http.NewRequest(r.Method, b.URL, bytes.NewReader(body)) if err != nil { statsChan <- ResponseStats{ Backend: b.Name, @@ -1242,14 +1240,11 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c reqDuration := time.Since(reqStart) if err != nil { - // Only record stats if this isn't a context cancellation - if !errors.Is(err, context.Canceled) { - statsChan <- ResponseStats{ - Backend: b.Name, - Duration: reqDuration, // Keep backend-specific duration - Error: err, - Method: method, - } + statsChan <- ResponseStats{ + Backend: b.Name, + Duration: reqDuration, // Keep backend-specific duration + Error: err, + Method: method, } return } @@ -1282,9 +1277,6 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c err error body []byte }{b.Name, resp, nil, respBody} - - // Cancel other requests - cancel() } }(backend) } @@ -1307,9 +1299,10 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c response = <-responseChan } else { http.Error(w, "Timeout waiting for any backend", http.StatusGatewayTimeout) - cancel() + // Always wait for primary backend to complete before collecting stats go func() { - wg.Wait() + primaryWg.Wait() // Wait for primary first + wg.Wait() // Then wait for all close(statsChan) }() // Collect stats @@ -1333,9 +1326,11 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c w.Write(response.body) } - // Wait for all goroutines to complete and collect stats + // Always wait for primary backend to complete before collecting stats + // This ensures primary backend stats are always included go func() { - wg.Wait() + primaryWg.Wait() // Wait for primary backend to complete first + wg.Wait() // Then wait for all other backends close(statsChan) }()