From 97cf8fd6fa352d1c4026eefd3bf0ca224a10ea9b Mon Sep 17 00:00:00 2001 From: Para Dox Date: Thu, 29 May 2025 10:18:30 +0700 Subject: [PATCH] a set of random fixes into the blue --- benchmark-proxy/main.go | 114 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 103 insertions(+), 11 deletions(-) diff --git a/benchmark-proxy/main.go b/benchmark-proxy/main.go index 8620073d..f0ba9c03 100644 --- a/benchmark-proxy/main.go +++ b/benchmark-proxy/main.go @@ -1343,6 +1343,7 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c var responseHandled atomic.Bool var firstBackendStartTime atomic.Pointer[time.Time] primaryResponseChan := make(chan struct{}, 1) // Signal when primary gets a response + primaryFailedFast := make(chan struct{}, 1) // Signal when primary fails immediately for _, backend := range backends { // Skip secondary backends for stateful methods @@ -1388,6 +1389,12 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c Duration: time.Since(goroutineStartTime), } return + case <-primaryFailedFast: + // Primary failed immediately, start secondary now + delayTimer.Stop() + if enableDetailedLogs { + log.Printf("Primary failed fast for %s, starting secondary immediately", method) + } } } @@ -1416,6 +1423,14 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c reqDuration := time.Since(reqStart) if err != nil { + // If primary failed with connection error, signal secondary to start + if b.Role == "primary" { + select { + case primaryFailedFast <- struct{}{}: + default: + } + } + statsChan <- ResponseStats{ Backend: b.Name, Duration: reqDuration, // Keep backend-specific duration @@ -1427,6 +1442,15 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c // Don't close resp.Body here - it will be closed by the winner or drained by losers + // Check if primary returned an error status + if b.Role == "primary" && resp.StatusCode >= 400 { + // Any 4xx or 5xx error should trigger immediate secondary + select { + case primaryFailedFast <- struct{}{}: + default: + } + } + // Signal primary response immediately for secondary backends to check if b.Role == "primary" && resp.StatusCode < 400 { select { @@ -1677,6 +1701,8 @@ func handleWebSocketRequest(w http.ResponseWriter, r *http.Request, backends []B // Connect to all backends var wg sync.WaitGroup + primaryConnected := make(chan bool, 1) // Track if primary connected successfully + for _, backend := range backends { wg.Add(1) go func(b Backend) { @@ -1720,6 +1746,14 @@ func handleWebSocketRequest(w http.ResponseWriter, r *http.Request, backends []B log.Printf("Failed to connect to backend %s: %v (status: %d)", b.Name, err, status) stats.Error = err statsCollector.AddWebSocketStats(stats) + + // If primary failed to connect, signal that + if b.Role == "primary" { + select { + case primaryConnected <- false: + default: + } + } return } defer backendConn.Close() @@ -1730,39 +1764,81 @@ func handleWebSocketRequest(w http.ResponseWriter, r *http.Request, backends []B stats.IsActive = true statsCollector.AddWebSocketStats(stats) + // If this is the primary backend, signal successful connection + if b.Role == "primary" { + select { + case primaryConnected <- true: + default: + } + } + // If this is the primary backend, set up bidirectional proxying if b.Role == "primary" { + // Channel to signal when primary connection fails + primaryFailed := make(chan struct{}, 2) // Buffered for 2 signals + // Forward messages from client to primary backend go func() { for { messageType, message, err := clientConn.ReadMessage() if err != nil { log.Printf("Error reading from client: %v", err) + select { + case primaryFailed <- struct{}{}: + default: + } return } err = backendConn.WriteMessage(messageType, message) if err != nil { log.Printf("Error writing to primary backend: %v", err) + select { + case primaryFailed <- struct{}{}: + default: + } return } } }() // Forward messages from primary backend to client - for { - messageType, message, err := backendConn.ReadMessage() - if err != nil { - log.Printf("Error reading from primary backend: %v", err) - return - } + go func() { + for { + messageType, message, err := backendConn.ReadMessage() + if err != nil { + log.Printf("Error reading from primary backend: %v", err) + select { + case primaryFailed <- struct{}{}: + default: + } + return + } - err = clientConn.WriteMessage(messageType, message) - if err != nil { - log.Printf("Error writing to client: %v", err) - return + err = clientConn.WriteMessage(messageType, message) + if err != nil { + log.Printf("Error writing to client: %v", err) + select { + case primaryFailed <- struct{}{}: + default: + } + return + } } - } + }() + + // Wait for primary connection failure + <-primaryFailed + + // Primary backend failed, close client connection with proper close message + log.Printf("Primary backend WebSocket failed, closing client connection") + closeMsg := websocket.FormatCloseMessage(websocket.CloseGoingAway, + "Primary backend unavailable") + clientConn.WriteControl(websocket.CloseMessage, closeMsg, + time.Now().Add(time.Second)) + + // Return to trigger cleanup + return } else { // For secondary backends, just read and discard messages for { @@ -1778,6 +1854,22 @@ func handleWebSocketRequest(w http.ResponseWriter, r *http.Request, backends []B // Wait for all connections to terminate wg.Wait() + + // Check if primary connected successfully + select { + case connected := <-primaryConnected: + if !connected { + // Primary failed to connect, close client connection + log.Printf("Primary backend WebSocket failed to connect, closing client connection") + closeMsg := websocket.FormatCloseMessage(websocket.CloseServiceRestart, + "Primary backend unavailable") + clientConn.WriteControl(websocket.CloseMessage, closeMsg, + time.Now().Add(time.Second)) + } + default: + // No primary backend in the configuration (shouldn't happen) + log.Printf("Warning: No primary backend configured for WebSocket") + } } func getEnv(key, fallback string) string {