// MethodRouting contains configuration for method routing decisions.
type MethodRouting struct {
	SecondaryWhitelist map[string]bool // Methods allowed on secondary backends
	PreferSecondary    map[string]bool // Methods that should prefer secondary backends
}

// methodShouldWaitOnSecondaryError returns true if we should wait for primary
// when secondary returns an error response for the given method.
//
// The listed methods are ones where a secondary error might be transient and
// the primary could still succeed. A switch is used instead of a map so no
// allocation happens on each call.
func methodShouldWaitOnSecondaryError(method string) bool {
	switch method {
	case "eth_call", // State execution - secondary might be behind or have issues
		"eth_estimateGas",      // Similar to eth_call
		"trace_call",           // Tracing calls
		"debug_traceCall",      // Debug tracing
		"eth_createAccessList": // Access list creation
		return true
	default:
		return false
	}
}

// maxEmptyPeekReads bounds how many consecutive (0, nil) results the peek
// buffer fill tolerates before giving up with io.ErrNoProgress.
const maxEmptyPeekReads = 100

// peekWhitespaceStripper removes spaces, newlines, carriage returns and tabs
// in a single pass so JSON patterns can be matched regardless of formatting.
var peekWhitespaceStripper = strings.NewReplacer(" ", "", "\n", "", "\r", "", "\t", "")

// peekingReader wraps a reader to peek at the first N bytes and detect null or
// error responses. The peeked bytes are buffered and replayed, so a consumer
// that reads everything through the peekingReader sees the complete stream.
type peekingReader struct {
	r         io.Reader
	buf       []byte // Peeked bytes; capacity is peekSize
	bufPos    int    // Offset of the next buffered byte to hand out
	isNull    bool
	hasError  bool
	checkDone bool
	peekSize  int
	readErr   error // Error hit while peeking; surfaced after buf is drained
}

// newPeekingReader returns a peekingReader that inspects up to peekSize bytes
// of r. A negative peekSize is treated as 0 (no peeking, pure pass-through).
func newPeekingReader(r io.Reader, peekSize int) *peekingReader {
	if peekSize < 0 {
		peekSize = 0
	}
	return &peekingReader{
		r:        r,
		buf:      make([]byte, 0, peekSize),
		peekSize: peekSize,
	}
}

// Read implements io.Reader.
//
// The first call fills the peek buffer (blocking until peekSize bytes have
// arrived or the underlying reader returns an error, including io.EOF) and
// then runs pattern detection, so IsNull and HasError are reliable as soon as
// the first Read returns. Buffered bytes are served first; an error seen
// while peeking is returned only after they are drained.
//
// Bytes returned by Read are consumed as with any io.Reader: a Read issued
// only to trigger detection still removes those bytes from the stream.
func (pr *peekingReader) Read(p []byte) (n int, err error) {
	if !pr.checkDone {
		pr.fill()
	}

	// Serve data from buffer first
	if pr.bufPos < len(pr.buf) {
		n = copy(p, pr.buf[pr.bufPos:])
		pr.bufPos += n
		return n, nil
	}

	// Buffer drained: report the error (or EOF) hit while peeking instead of
	// touching the underlying reader again.
	if pr.readErr != nil {
		return 0, pr.readErr
	}

	// Then serve from underlying reader
	return pr.r.Read(p)
}

// fill reads from the underlying reader until the peek buffer holds peekSize
// bytes or an error occurs, then marks the check done and runs detection.
func (pr *peekingReader) fill() {
	// Guarantee capacity even if the struct was not built by newPeekingReader.
	if cap(pr.buf) < pr.peekSize {
		grown := make([]byte, len(pr.buf), pr.peekSize)
		copy(grown, pr.buf)
		pr.buf = grown
	}

	emptyReads := 0
	for len(pr.buf) < pr.peekSize && pr.readErr == nil {
		// Read straight into the spare capacity; no temporary buffer needed.
		readN, readErr := pr.r.Read(pr.buf[len(pr.buf):pr.peekSize])
		pr.buf = pr.buf[:len(pr.buf)+readN]

		switch {
		case readErr != nil:
			pr.readErr = readErr
		case readN > 0:
			emptyReads = 0
		default:
			// (0, nil) is legal but must not spin forever.
			emptyReads++
			if emptyReads >= maxEmptyPeekReads {
				pr.readErr = io.ErrNoProgress
			}
		}
	}

	pr.checkDone = true
	pr.detectPatterns()
}

// detectPatterns looks for patterns indicating a null result or an error in
// the peeked part of a JSON-RPC response and records them in isNull/hasError.
// Matching is textual on the whitespace-stripped buffer, not a JSON parse.
func (pr *peekingReader) detectPatterns() {
	bufStr := string(pr.buf)

	// Remove whitespace for easier pattern matching
	compactStr := peekWhitespaceStripper.Replace(bufStr)

	// Check for null result
	pr.isNull = strings.Contains(compactStr, `"result":null`)

	// Check for JSON-RPC errors
	pr.hasError = strings.Contains(compactStr, `"error":`) && !strings.Contains(compactStr, `"error":null`)

	// Log what we found for debugging
	if pr.isNull || pr.hasError {
		preview := bufStr
		if len(preview) > 80 {
			preview = preview[:80]
		}
		log.Printf("Detected short response pattern - null: %v, error: %v, buffer preview: %s",
			pr.isNull, pr.hasError, strings.ReplaceAll(preview, "\n", " "))
	}
}

// IsNull reports whether the peeked data contained a `"result":null` pattern.
// It is only meaningful after the first Read has returned.
func (pr *peekingReader) IsNull() bool {
	return pr.isNull
}

// HasError reports whether the peeked data contained a non-null `"error":`
// member. It is only meaningful after the first Read has returned.
func (pr *peekingReader) HasError() bool {
	return pr.hasError
}
minDelayBufferStr := getEnv("MIN_DELAY_BUFFER", "2") // Default 2ms buffer probeMethodsStr := getEnv("PROBE_METHODS", "eth_blockNumber,net_version,eth_chainId") + // Method routing configuration + secondaryWhitelistStr := getEnv("SECONDARY_WHITELIST", "") // Methods allowed on secondary + preferSecondaryStr := getEnv("PREFER_SECONDARY", "") // Methods that should prefer secondary + summaryInterval, err := strconv.Atoi(summaryIntervalStr) if err != nil { log.Printf("Invalid SUMMARY_INTERVAL, using default of 60 seconds") @@ -2242,6 +2350,32 @@ func main() { minDelayBuffer = 2 } + // Parse method routing configuration + methodRouting := &MethodRouting{ + SecondaryWhitelist: make(map[string]bool), + PreferSecondary: make(map[string]bool), + } + + // Parse whitelist + if secondaryWhitelistStr != "" { + whitelist := strings.Split(secondaryWhitelistStr, ",") + for _, method := range whitelist { + methodRouting.SecondaryWhitelist[strings.TrimSpace(method)] = true + } + log.Printf("Secondary whitelist: %v", whitelist) + } + + // Parse prefer secondary list + if preferSecondaryStr != "" { + preferList := strings.Split(preferSecondaryStr, ",") + for _, method := range preferList { + methodRouting.PreferSecondary[strings.TrimSpace(method)] = true + // Also add to whitelist automatically + methodRouting.SecondaryWhitelist[strings.TrimSpace(method)] = true + } + log.Printf("Prefer secondary for methods: %v", preferList) + } + // Create stats collector for periodic summaries statsCollector := NewStatsCollector(time.Duration(summaryInterval)*time.Second, secondaryBackendsStr != "") @@ -2339,14 +2473,14 @@ func main() { handleWebSocketRequest(w, r, backends, client, &upgrader, statsCollector) } else { // Handle regular HTTP request - handleRequest(w, r, backends, client, enableDetailedLogs == "true", statsCollector, secondaryProbe, chainHeadMonitor) + handleRequest(w, r, backends, client, enableDetailedLogs == "true", statsCollector, secondaryProbe, chainHeadMonitor, 
methodRouting) } }) log.Fatal(http.ListenAndServe(listenAddr, nil)) } -func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client, enableDetailedLogs bool, statsCollector *StatsCollector, secondaryProbe *SecondaryProbe, chainHeadMonitor *ChainHeadMonitor) { +func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client, enableDetailedLogs bool, statsCollector *StatsCollector, secondaryProbe *SecondaryProbe, chainHeadMonitor *ChainHeadMonitor, methodRouting *MethodRouting) { startTime := time.Now() // Create a context that will cancel after 35 seconds (5s buffer over backend timeout) @@ -2407,6 +2541,29 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c log.Printf("Method: %s%s", displayMethod, blockTagInfo) } } + + // Check method routing configuration + if !batchInfo.IsBatch && methodRouting != nil { + // For single methods, check routing rules + method := displayMethod + + // Check if this method prefers secondary backends + if methodRouting.PreferSecondary[method] { + if enableDetailedLogs { + log.Printf("Method %s configured to prefer secondary backends", method) + } + } + + // Check if whitelist is configured and method is not in it + if len(methodRouting.SecondaryWhitelist) > 0 && !methodRouting.SecondaryWhitelist[method] { + // Method not in whitelist - force primary only + if enableDetailedLogs { + log.Printf("Method %s not in secondary whitelist - using primary only", method) + } + // This will be enforced in the backend loop + } + } + // Process backends with adaptive delay strategy var wg sync.WaitGroup var primaryWg sync.WaitGroup // Separate wait group for primary backend @@ -2424,6 +2581,30 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c primaryFailedFast := make(chan struct{}, 1) // Signal when primary fails immediately for _, backend := range backends { + // Method routing checks for secondary backends + 
if backend.Role == "secondary" && methodRouting != nil && !batchInfo.IsBatch { + method := displayMethod + + // Check if whitelist is configured and method is not in it + if len(methodRouting.SecondaryWhitelist) > 0 && !methodRouting.SecondaryWhitelist[method] { + if enableDetailedLogs { + log.Printf("Skipping secondary backend %s for method %s (not in whitelist)", backend.Name, method) + } + continue + } + } + + // Skip primary backend if method prefers secondary (and we have secondary backends available) + if backend.Role == "primary" && methodRouting != nil && !batchInfo.IsBatch { + method := displayMethod + if methodRouting.PreferSecondary[method] && len(backends) > 1 { + if enableDetailedLogs { + log.Printf("Skipping primary backend for method %s (configured to prefer secondary)", method) + } + continue + } + } + // Skip secondary backends for stateful methods if isStateful && backend.Role != "primary" { if enableDetailedLogs { @@ -2489,43 +2670,56 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c // If this is a secondary backend, wait for p75 delay if b.Role != "primary" { - // Get backend-specific delay - var backendSpecificDelay time.Duration - if batchInfo.IsBatch { - // For batch requests, use the maximum delay of all methods for this backend - backendSpecificDelay = calculateBatchDelay(batchInfo.Methods, b.Name, secondaryProbe, statsCollector) - } else if secondaryProbe != nil { - backendSpecificDelay = secondaryProbe.getDelayForBackendAndMethod(b.Name, displayMethod) - } else { - // Fallback to method-based delay if no probe - backendSpecificDelay = statsCollector.GetPrimaryP75ForMethod(displayMethod) - } - - if enableDetailedLogs { - log.Printf("Secondary backend %s waiting %s for method %s", b.Name, backendSpecificDelay, displayMethod) - } - - delayTimer := time.NewTimer(backendSpecificDelay) - select { - case <-delayTimer.C: - // Timer expired, primary is slow, proceed with secondary request - case 
<-primaryResponseChan: - // Primary already got a response, skip secondary - delayTimer.Stop() - - // Still record that we skipped this backend - statsChan <- ResponseStats{ - Backend: b.Name, - Error: fmt.Errorf("skipped - primary responded quickly"), - Method: displayMethod, - Duration: time.Since(goroutineStartTime), + // Skip delay if this method prefers secondary backends + skipDelay := false + if methodRouting != nil && !batchInfo.IsBatch { + if methodRouting.PreferSecondary[displayMethod] { + skipDelay = true + if enableDetailedLogs { + log.Printf("Secondary backend %s starting immediately for method %s (configured to prefer secondary)", b.Name, displayMethod) + } } - return - case <-primaryFailedFast: - // Primary failed immediately, start secondary now - delayTimer.Stop() + } + + if !skipDelay { + // Get backend-specific delay + var backendSpecificDelay time.Duration + if batchInfo.IsBatch { + // For batch requests, use the maximum delay of all methods for this backend + backendSpecificDelay = calculateBatchDelay(batchInfo.Methods, b.Name, secondaryProbe, statsCollector) + } else if secondaryProbe != nil { + backendSpecificDelay = secondaryProbe.getDelayForBackendAndMethod(b.Name, displayMethod) + } else { + // Fallback to method-based delay if no probe + backendSpecificDelay = statsCollector.GetPrimaryP75ForMethod(displayMethod) + } + if enableDetailedLogs { - log.Printf("Primary failed fast for %s, starting secondary immediately", displayMethod) + log.Printf("Secondary backend %s waiting %s for method %s", b.Name, backendSpecificDelay, displayMethod) + } + + delayTimer := time.NewTimer(backendSpecificDelay) + select { + case <-delayTimer.C: + // Timer expired, primary is slow, proceed with secondary request + case <-primaryResponseChan: + // Primary already got a response, skip secondary + delayTimer.Stop() + + // Still record that we skipped this backend + statsChan <- ResponseStats{ + Backend: b.Name, + Error: fmt.Errorf("skipped - primary responded 
quickly"), + Method: displayMethod, + Duration: time.Since(goroutineStartTime), + } + return + case <-primaryFailedFast: + // Primary failed immediately, start secondary now + delayTimer.Stop() + if enableDetailedLogs { + log.Printf("Primary failed fast for %s, starting secondary immediately", displayMethod) + } } } } @@ -2617,22 +2811,101 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c // CRITICAL FIX 2: Check for null responses from secondary backends for certain methods if b.Role == "secondary" && resp.StatusCode == 200 && methodMightReturnNull(displayMethod) { - // Need to read the body to check if it's null - bodyBytes, err := io.ReadAll(resp.Body) - resp.Body.Close() // Close the original body + // Check Content-Length first - null responses are typically very short + contentLength := resp.Header.Get("Content-Length") + if contentLength != "" { + length, err := strconv.Atoi(contentLength) + if err == nil && length < 100 { // Null responses are typically < 100 bytes + // This is suspiciously short, likely a null response + // Create a peeking reader to confirm + peeker := newPeekingReader(resp.Body, 100) + resp.Body = io.NopCloser(peeker) - if err == nil && isNullResponse(bodyBytes) { - // Secondary returned null - don't let it win the race - if enableDetailedLogs { - log.Printf("Secondary backend %s returned null for %s - waiting for primary", - b.Name, displayMethod) + // Read a bit to trigger null detection + smallBuf := make([]byte, 1) + peeker.Read(smallBuf) + + if peeker.IsNull() { + // Confirmed null response - don't let secondary win + if enableDetailedLogs { + log.Printf("Secondary backend %s returned null for %s (Content-Length: %s) - waiting for primary", + b.Name, displayMethod, contentLength) + } + // Close the response body + resp.Body.Close() + return + } + // Not null, continue with normal flow } - return - } + } else { + // No Content-Length header, use peeking reader anyway for safety + peeker := 
newPeekingReader(resp.Body, 200) // Peek at first 200 bytes + resp.Body = io.NopCloser(peeker) - // Not null or couldn't read - recreate the body for potential use - if err == nil { - resp.Body = io.NopCloser(bytes.NewReader(bodyBytes)) + // Read a bit to trigger null detection + smallBuf := make([]byte, 1) + peeker.Read(smallBuf) + + if peeker.IsNull() { + // Confirmed null response - don't let secondary win + if enableDetailedLogs { + log.Printf("Secondary backend %s returned null for %s - waiting for primary", + b.Name, displayMethod) + } + // Close the response body + resp.Body.Close() + return + } + } + } + + // CRITICAL FIX 3: Check for error responses from secondary backends for certain methods + if b.Role == "secondary" && resp.StatusCode == 200 && methodShouldWaitOnSecondaryError(displayMethod) { + // Check Content-Length first - error responses are typically short + contentLength := resp.Header.Get("Content-Length") + if contentLength != "" { + length, err := strconv.Atoi(contentLength) + if err == nil && length < 500 { // Error responses are typically < 500 bytes + // This might be an error response + // Create a peeking reader to confirm + peeker := newPeekingReader(resp.Body, 500) + resp.Body = io.NopCloser(peeker) + + // Read a bit to trigger error detection + smallBuf := make([]byte, 1) + peeker.Read(smallBuf) + + if peeker.HasError() { + // Confirmed error response - don't let secondary win + if enableDetailedLogs { + log.Printf("Secondary backend %s returned JSON-RPC error for %s (Content-Length: %s) - waiting for primary", + b.Name, displayMethod, contentLength) + } + // Close the response body + resp.Body.Close() + return + } + // Not an error, continue with normal flow + } + } else { + // No Content-Length header, use peeking reader anyway for safety + peeker := newPeekingReader(resp.Body, 500) // Peek at first 500 bytes + resp.Body = io.NopCloser(peeker) + + // Read a bit to trigger error detection + smallBuf := make([]byte, 1) + 
peeker.Read(smallBuf) + + if peeker.HasError() { + // Confirmed error response - don't let secondary win + if enableDetailedLogs { + log.Printf("Secondary backend %s returned JSON-RPC error for %s - waiting for primary", + b.Name, displayMethod) + } + // Close the response body + resp.Body.Close() + return + } } }