diff --git a/benchmark-proxy/main.go b/benchmark-proxy/main.go index 2009c29d..82a06f45 100644 --- a/benchmark-proxy/main.go +++ b/benchmark-proxy/main.go @@ -4,7 +4,9 @@ package main import ( "bytes" + "context" "encoding/json" + "errors" "fmt" "io" "log" @@ -14,6 +16,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/gorilla/websocket" @@ -904,6 +907,33 @@ func formatCUWithExtrapolation(cu int, isExtrapolated bool) string { return fmt.Sprintf("%d CU", cu) } +// GetPrimaryP50 calculates the current p50 latency for the primary backend +func (sc *StatsCollector) GetPrimaryP50() time.Duration { + sc.mu.Lock() + defer sc.mu.Unlock() + + // Collect primary backend durations + var primaryDurations []time.Duration + for _, stat := range sc.requestStats { + if stat.Backend == "primary" && stat.Error == nil { + primaryDurations = append(primaryDurations, stat.Duration) + } + } + + // If we don't have enough data, return a sensible default + if len(primaryDurations) < 10 { + return 10 * time.Millisecond // Default to 10ms + } + + // Sort and find p50 + sort.Slice(primaryDurations, func(i, j int) bool { + return primaryDurations[i] < primaryDurations[j] + }) + + p50idx := len(primaryDurations) * 50 / 100 + return primaryDurations[p50idx] +} + func main() { // Get configuration from environment variables listenAddr := getEnv("LISTEN_ADDR", ":8080") @@ -975,7 +1005,7 @@ func main() { handleWebSocketRequest(w, r, backends, client, &upgrader, statsCollector) } else { // Handle regular HTTP request - stats := handleRequest(w, r, backends, client, enableDetailedLogs == "true") + stats := handleRequest(w, r, backends, client, enableDetailedLogs == "true", statsCollector) statsCollector.AddStats(stats, 0) // The 0 is a placeholder, we're not using totalDuration in the collector } }) @@ -983,7 +1013,7 @@ func main() { log.Fatal(http.ListenAndServe(listenAddr, nil)) } -func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client, enableDetailedLogs bool) []ResponseStats { +func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client, enableDetailedLogs bool, statsCollector *StatsCollector) []ResponseStats { startTime := time.Now() // Read the entire request body @@ -1001,28 +1031,55 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c method = jsonRPCReq.Method } - // Process backends in parallel + // Get current p50 delay for primary backend + p50Delay := statsCollector.GetPrimaryP50() + + // Process backends with adaptive delay strategy var wg sync.WaitGroup statsChan := make(chan ResponseStats, len(backends)) - primaryRespChan := make(chan *http.Response, 1) - primaryErrChan := make(chan error, 1) + responseChan := make(chan struct { + backend string + resp *http.Response + err error + body []byte + }, len(backends)) + + // Create a context that we can cancel once we get the first response + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Track if we've already sent a response + var responseHandled atomic.Bool for _, backend := range backends { wg.Add(1) go func(b Backend) { defer wg.Done() + // If this is a secondary backend, wait for p50 delay + if b.Role != "primary" { + select { + case <-time.After(p50Delay): + // Continue after delay + case <-ctx.Done(): + // Primary already responded, skip secondary + return + } + } + + // Check if response was already handled + if responseHandled.Load() { + return + } + // Create a new request - backendReq, err := http.NewRequest(r.Method, b.URL, bytes.NewReader(body)) + backendReq, err := http.NewRequestWithContext(ctx, r.Method, b.URL, bytes.NewReader(body)) if err != nil { statsChan <- ResponseStats{ Backend: b.Name, Error: err, Method: method, } - if b.Role == "primary" { - primaryErrChan <- err - } return } @@ -1038,6 +1095,22 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c resp, err := client.Do(backendReq) reqDuration := time.Since(reqStart) + if err != nil { + // Only record stats if this isn't a context cancellation + if !errors.Is(err, context.Canceled) { + statsChan <- ResponseStats{ + Backend: b.Name, + Duration: reqDuration, + Error: err, + Method: method, + } + } + return + } + defer resp.Body.Close() + + // Read response body + respBody, err := io.ReadAll(resp.Body) if err != nil { statsChan <- ResponseStats{ Backend: b.Name, @@ -1045,12 +1118,8 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c Error: err, Method: method, } - if b.Role == "primary" { - primaryErrChan <- err - } return } - defer resp.Body.Close() statsChan <- ResponseStats{ Backend: b.Name, @@ -1059,40 +1128,66 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c Method: method, } - if b.Role == "primary" { - // For primary, we need to return this response to the client - respBody, err := io.ReadAll(resp.Body) - if err != nil { - primaryErrChan <- err - return - } + // Try to be the first to respond + if responseHandled.CompareAndSwap(false, true) { + responseChan <- struct { + backend string + resp *http.Response + err error + body []byte + }{b.Name, resp, nil, respBody} - // Create a new response to send back to client - primaryResp := *resp - primaryResp.Body = io.NopCloser(bytes.NewReader(respBody)) - primaryRespChan <- &primaryResp + // Cancel other requests + cancel() } }(backend) } - // Wait for primary response + // Wait for the first successful response + var response struct { + backend string + resp *http.Response + err error + body []byte + } + select { - case primaryResp := <-primaryRespChan: - // Copy the response to the client - for name, values := range primaryResp.Header { + case response = <-responseChan: + // Got a response + case <-time.After(30 * time.Second): + // Timeout + if !responseHandled.CompareAndSwap(false, true) { + // Someone else handled it + response = <-responseChan + } else { + http.Error(w, "Timeout waiting for any backend", http.StatusGatewayTimeout) + cancel() + go func() { + wg.Wait() + close(statsChan) + }() + // Collect stats + var stats []ResponseStats + for stat := range statsChan { + stats = append(stats, stat) + } + return stats + } + } + + // Send the response to the client + if response.err == nil && response.resp != nil { + // Copy response headers + for name, values := range response.resp.Header { for _, value := range values { w.Header().Add(name, value) } } - w.WriteHeader(primaryResp.StatusCode) - io.Copy(w, primaryResp.Body) - case err := <-primaryErrChan: - http.Error(w, "Error from primary backend: "+err.Error(), http.StatusBadGateway) - case <-time.After(30 * time.Second): - http.Error(w, "Timeout waiting for primary backend", http.StatusGatewayTimeout) + w.WriteHeader(response.resp.StatusCode) + w.Write(response.body) } - // Wait for all goroutines to complete + // Wait for all goroutines to complete and collect stats go func() { wg.Wait() close(statsChan)