diff --git a/benchmark-proxy/main.go b/benchmark-proxy/main.go
index 4d70a17b..ea7b5cdd 100644
--- a/benchmark-proxy/main.go
+++ b/benchmark-proxy/main.go
@@ -198,140 +198,205 @@ type SecondaryProbe struct {
 	lastSuccessTime time.Time // Last time probes succeeded
 }
 
-// BlockHeightTracker monitors block heights from different backends
+// BlockHeightTracker monitors block heights from different backends using WebSocket subscriptions
 type BlockHeightTracker struct {
 	mu                 sync.RWMutex
 	backends           []Backend
-	client             *http.Client
-	blockHeights       map[string]uint64    // Backend name -> latest block number
-	lastUpdateTime     map[string]time.Time // Backend name -> last successful update
-	checkInterval      time.Duration
+	blockHeights       map[string]uint64          // Backend name -> latest block number
+	lastUpdateTime     map[string]time.Time       // Backend name -> last successful update
+	connections        map[string]*websocket.Conn // Active WebSocket connections
 	enableDetailedLogs bool
 	maxBlockBehind     uint64 // Maximum blocks a secondary can be behind primary
+	stopChan           chan struct{}
 }
 
-// NewBlockHeightTracker creates a new block height tracker
+// NewBlockHeightTracker creates a new WebSocket-based block height tracker
 func NewBlockHeightTracker(backends []Backend, client *http.Client, checkInterval time.Duration, maxBlockBehind uint64, enableDetailedLogs bool) *BlockHeightTracker {
 	bht := &BlockHeightTracker{
 		backends:           backends,
-		client:             client,
 		blockHeights:       make(map[string]uint64),
 		lastUpdateTime:     make(map[string]time.Time),
-		checkInterval:      checkInterval,
+		connections:        make(map[string]*websocket.Conn),
 		enableDetailedLogs: enableDetailedLogs,
 		maxBlockBehind:     maxBlockBehind,
+		stopChan:           make(chan struct{}),
 	}
 
-	// Start periodic block height checking
-	go bht.startPeriodicCheck()
-
-	// Run initial check
-	go bht.checkBlockHeights()
+	// Start WebSocket connections for all backends
+	go bht.startWebSocketConnections()
 
 	return bht
 }
 
-// checkBlockHeights queries all backends for their current block number
-func (bht *BlockHeightTracker) checkBlockHeights() {
+// startWebSocketConnections establishes WebSocket connections to all backends
+func (bht *BlockHeightTracker) startWebSocketConnections() {
 	for _, backend := range bht.backends {
-		go func(b Backend) {
-			reqBody := []byte(`{"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":"blockcheck"}`)
-
-			req, err := http.NewRequest("POST", b.URL, bytes.NewReader(reqBody))
-			if err != nil {
-				if bht.enableDetailedLogs {
-					log.Printf("Block height check: failed to create request for %s: %v", b.Name, err)
-				}
-				return
-			}
-
-			req.Header.Set("Content-Type", "application/json")
-
-			start := time.Now()
-			resp, err := bht.client.Do(req)
-			if err != nil {
-				if bht.enableDetailedLogs {
-					log.Printf("Block height check: request failed for %s: %v", b.Name, err)
-				}
-				return
-			}
-			defer resp.Body.Close()
-
-			if resp.StatusCode != 200 {
-				if bht.enableDetailedLogs {
-					log.Printf("Block height check: HTTP error %d for %s", resp.StatusCode, b.Name)
-				}
-				return
-			}
-
-			body, err := io.ReadAll(resp.Body)
-			if err != nil {
-				if bht.enableDetailedLogs {
-					log.Printf("Block height check: failed to read response for %s: %v", b.Name, err)
-				}
-				return
-			}
-
-			// Parse JSON-RPC response
-			var jsonResp struct {
-				Result string `json:"result"`
-				Error  *struct {
-					Code    int    `json:"code"`
-					Message string `json:"message"`
-				} `json:"error"`
-			}
-
-			if err := json.Unmarshal(body, &jsonResp); err != nil {
-				if bht.enableDetailedLogs {
-					log.Printf("Block height check: failed to parse JSON for %s: %v",
-						b.Name, err)
-				}
-				return
-			}
-
-			if jsonResp.Error != nil {
-				if bht.enableDetailedLogs {
-					log.Printf("Block height check: JSON-RPC error for %s: %s", b.Name, jsonResp.Error.Message)
-				}
-				return
-			}
-
-			// Parse hex block number
-			blockHex := strings.TrimPrefix(jsonResp.Result, "0x")
-			blockNum, err := strconv.ParseUint(blockHex, 16, 64)
-			if err != nil {
-				if bht.enableDetailedLogs {
-					log.Printf("Block height check: failed to parse block number for %s: %s", b.Name, jsonResp.Result)
-				}
-				return
-			}
-
-			// Update the block height
-			bht.mu.Lock()
-			oldHeight := bht.blockHeights[b.Name]
-			bht.blockHeights[b.Name] = blockNum
-			bht.lastUpdateTime[b.Name] = time.Now()
-			bht.mu.Unlock()
-
-			if bht.enableDetailedLogs {
-				duration := time.Since(start)
-				if oldHeight != blockNum {
-					log.Printf("Block height check: %s updated from %d to %d (took %s)",
-						b.Name, oldHeight, blockNum, duration)
-				}
-			}
-		}(backend)
+		go bht.connectToBackend(backend)
 	}
 }
 
-// startPeriodicCheck runs block height checks at regular intervals
-func (bht *BlockHeightTracker) startPeriodicCheck() {
-	ticker := time.NewTicker(bht.checkInterval)
-	defer ticker.Stop()
+// connectToBackend establishes and maintains a WebSocket connection to a single backend
+func (bht *BlockHeightTracker) connectToBackend(backend Backend) {
+	for {
+		select {
+		case <-bht.stopChan:
+			return
+		default:
+		}
 
-	for range ticker.C {
-		bht.checkBlockHeights()
+		// Create WebSocket URL from HTTP URL
+		wsURL := strings.Replace(backend.URL, "http://", "ws://", 1)
+		wsURL = strings.Replace(wsURL, "https://", "wss://", 1)
+
+		if bht.enableDetailedLogs {
+			log.Printf("Block height tracker: connecting to %s at %s", backend.Name, wsURL)
+		}
+
+		// Connect to WebSocket
+		conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
+		if err != nil {
+			if bht.enableDetailedLogs {
+				log.Printf("Block height tracker: failed to connect to %s: %v", backend.Name, err)
+			}
+			time.Sleep(10 * time.Second) // Wait before retry
+			continue
+		}
+
+		// Store connection
+		bht.mu.Lock()
+		bht.connections[backend.Name] = conn
+		bht.mu.Unlock()
+
+		// Subscribe to new block headers
+		subscribeMsg := map[string]interface{}{
+			"jsonrpc": "2.0",
+			"method":  "eth_subscribe",
+			"params":  []string{"newHeads"},
+			"id":      1,
+		}
+
+		if err := conn.WriteJSON(subscribeMsg); err != nil {
+			if bht.enableDetailedLogs {
+				log.Printf("Block height tracker: failed to subscribe to %s: %v", backend.Name, err)
+			}
+			conn.Close()
+			bht.mu.Lock()
+			delete(bht.connections, backend.Name)
+			bht.mu.Unlock()
+			time.Sleep(5 * time.Second)
+			continue
+		}
+
+		if bht.enableDetailedLogs {
+			log.Printf("Block height tracker: subscribed to newHeads for %s", backend.Name)
+		}
+
+		// Handle incoming messages
+		bht.handleWebSocketMessages(conn, backend)
+
+		// Connection closed, clean up
+		bht.mu.Lock()
+		delete(bht.connections, backend.Name)
+		bht.mu.Unlock()
+
+		if bht.enableDetailedLogs {
+			log.Printf("Block height tracker: connection to %s closed, will retry", backend.Name)
+		}
+
+		time.Sleep(5 * time.Second) // Wait before reconnecting
+	}
+}
+
+// handleWebSocketMessages processes incoming WebSocket messages from a backend
+func (bht *BlockHeightTracker) handleWebSocketMessages(conn *websocket.Conn, backend Backend) {
+	defer conn.Close()
+
+	for {
+		select {
+		case <-bht.stopChan:
+			return
+		default:
+		}
+
+		var message map[string]interface{}
+		if err := conn.ReadJSON(&message); err != nil {
+			if bht.enableDetailedLogs {
+				log.Printf("Block height tracker: error reading from %s: %v", backend.Name, err)
+			}
+			return
+		}
+
+		// Check if this is a subscription notification
+		if method, exists := message["method"]; exists && method == "eth_subscription" {
+			bht.handleSubscriptionNotification(message, backend)
+		} else if result, exists := message["result"]; exists {
+			// This might be the subscription confirmation
+			if bht.enableDetailedLogs {
+				log.Printf("Block height tracker: subscription confirmed for %s: %v", backend.Name, result)
+			}
+		} else if errorObj, exists := message["error"]; exists {
+			if bht.enableDetailedLogs {
+				log.Printf("Block height tracker: error from %s: %v", backend.Name, errorObj)
+			}
+		}
+	}
+}
+
+// handleSubscriptionNotification processes a newHeads subscription notification
+func (bht *BlockHeightTracker) handleSubscriptionNotification(message map[string]interface{}, backend Backend) {
+	params, exists := message["params"]
+	if !exists {
+		return
+	}
+
+	paramsMap, ok := params.(map[string]interface{})
+	if !ok {
+		return
+	}
+
+	result, exists := paramsMap["result"]
+	if !exists {
+		return
+	}
+
+	resultMap, ok := result.(map[string]interface{})
+	if !ok {
+		return
+	}
+
+	// Extract block number from the header
+	blockNumberHex, exists := resultMap["number"]
+	if !exists {
+		return
+	}
+
+	blockNumberStr, ok := blockNumberHex.(string)
+	if !ok {
+		return
+	}
+
+	// Parse hex block number
+	blockHex := strings.TrimPrefix(blockNumberStr, "0x")
+	blockNum, err := strconv.ParseUint(blockHex, 16, 64)
+	if err != nil {
+		if bht.enableDetailedLogs {
+			log.Printf("Block height tracker: failed to parse block number for %s: %s", backend.Name, blockNumberStr)
+		}
+		return
+	}
+
+	// Update the block height
+	bht.mu.Lock()
+	oldHeight := bht.blockHeights[backend.Name]
+	bht.blockHeights[backend.Name] = blockNum
+	bht.lastUpdateTime[backend.Name] = time.Now()
+	bht.mu.Unlock()
+
+	if bht.enableDetailedLogs && oldHeight != blockNum {
+		log.Printf("Block height tracker: %s updated from %d to %d via WebSocket",
+			backend.Name, oldHeight, blockNum)
 	}
 }
@@ -386,6 +451,22 @@ func (bht *BlockHeightTracker) getBlockHeightStatus() map[string]uint64 {
 	return result
 }
 
+// Stop gracefully shuts down the block height tracker
+func (bht *BlockHeightTracker) Stop() {
+	close(bht.stopChan)
+
+	// Close all WebSocket connections
+	bht.mu.Lock()
+	defer bht.mu.Unlock()
+
+	for name, conn := range bht.connections {
+		if bht.enableDetailedLogs {
+			log.Printf("Block height tracker: closing connection to %s", name)
+		}
+		conn.Close()
+	}
+}
+
 func NewStatsCollector(summaryInterval time.Duration, hasSecondaryBackends bool) *StatsCollector {
 	now := time.Now()
 	sc := &StatsCollector{
@@ -1845,6 +1926,113 @@ func isStatefulMethod(method string) bool {
 	return statefulMethods[method]
 }
 
+// isExpensiveMethod returns true if the method is computationally expensive and should be offloaded to secondary
+func isExpensiveMethod(method string) bool {
+	expensiveMethods := map[string]bool{
+		// Debug methods - typically very expensive
+		"debug_traceBlockByHash":            true,
+		"debug_traceBlockByNumber":          true,
+		"debug_traceCall":                   true,
+		"debug_traceTransaction":            true,
+		"debug_storageRangeAt":              true,
+		"debug_getModifiedAccountsByHash":   true,
+		"debug_getModifiedAccountsByNumber": true,
+
+		// Trace methods - expensive computation
+		"trace_block":                   true,
+		"trace_call":                    true,
+		"trace_callMany":                true,
+		"trace_filter":                  true,
+		"trace_get":                     true,
+		"trace_rawTransaction":          true,
+		"trace_replayBlockTransactions": true,
+		"trace_replayTransaction":       true,
+		"trace_transaction":             true,
+
+		// VM trace variants - extremely expensive
+		"trace_replayBlockTransactions#vmTrace": true,
+		"trace_replayTransaction#vmTrace":       true,
+	}
+
+	return expensiveMethods[method]
+}
+
+// isExpensiveMethodByStats determines if a method is expensive based on actual performance statistics
+func (sc *StatsCollector) isExpensiveMethodByStats(method string) bool {
+	sc.mu.Lock()
+	defer sc.mu.Unlock()
+
+	// Need sufficient data to make a determination
+	const minSamplesForAnalysis = 10
+	const expensiveThresholdMultiplier = 5.0
+
+	// Get all method durations for primary backend to calculate average
+	var allDurations []time.Duration
+	totalMethods := 0
+
+	// Collect all durations from primary backend across all methods
+	if methodStats, exists := sc.backendMethodStats["primary"]; exists {
+		for _, durations := range methodStats {
+			if len(durations) >= minSamplesForAnalysis {
+				allDurations = append(allDurations, durations...)
+				totalMethods++
+			}
+		}
+	}
+
+	// Need sufficient methods and samples for meaningful analysis
+	if len(allDurations) < minSamplesForAnalysis*3 || totalMethods < 3 {
+		return false // Not enough data, be conservative
+	}
+
+	// Calculate overall average latency across all methods on primary
+	var totalDuration time.Duration
+	for _, duration := range allDurations {
+		totalDuration += duration
+	}
+	averageLatency := totalDuration / time.Duration(len(allDurations))
+
+	// Get this method's durations from primary backend
+	var methodDurations []time.Duration
+	if methodStats, exists := sc.backendMethodStats["primary"]; exists {
+		if durations, methodExists := methodStats[method]; methodExists {
+			methodDurations = durations
+		}
+	}
+
+	// Need sufficient samples for this specific method
+	if len(methodDurations) < minSamplesForAnalysis {
+		return false // Not enough data for this method
+	}
+
+	// Find minimum duration for this method (best case performance)
+	minDuration := methodDurations[0]
+	for _, duration := range methodDurations {
+		if duration < minDuration {
+			minDuration = duration
+		}
+	}
+
+	// Method is expensive if its minimum time is >= 5x the average
+	expensiveThreshold := time.Duration(float64(averageLatency) * expensiveThresholdMultiplier)
+
+	return minDuration >= expensiveThreshold
+}
+
+// hasAvailableSecondaryAtChainHead checks if there are synchronized secondary backends available
+func hasAvailableSecondaryAtChainHead(backends []Backend, blockHeightTracker *BlockHeightTracker) bool {
+	if blockHeightTracker == nil {
+		return false
+	}
+
+	for _, backend := range backends {
+		if backend.Role == "secondary" && !blockHeightTracker.isSecondaryBehind(backend.Name) {
+			return true
+		}
+	}
+	return false
+}
+
 // flushingResponseWriter wraps http.ResponseWriter to flush after every write
 type flushingResponseWriter struct {
 	http.ResponseWriter
@@ -1875,9 +2063,12 @@ func main() {
 	// Block height tracking configuration
 	enableBlockHeightTracking := getEnv("ENABLE_BLOCK_HEIGHT_TRACKING", "true") == "true"
-	blockHeightCheckIntervalStr := getEnv("BLOCK_HEIGHT_CHECK_INTERVAL", "5") // Default 5 seconds
+	blockHeightCheckIntervalStr := getEnv("BLOCK_HEIGHT_CHECK_INTERVAL", "5") // Default 5 seconds (unused for WebSocket)
 	maxBlockBehindStr := getEnv("MAX_BLOCKS_BEHIND", "1") // Default 1 block behind
 
+	// Expensive method routing configuration
+	enableExpensiveMethodRouting := getEnv("ENABLE_EXPENSIVE_METHOD_ROUTING", "true") == "true"
+
 	summaryInterval, err := strconv.Atoi(summaryIntervalStr)
 	if err != nil {
 		log.Printf("Invalid SUMMARY_INTERVAL, using default of 60 seconds")
@@ -1936,6 +2127,9 @@ func main() {
 	if enableSecondaryProbing && secondaryBackendsStr != "" {
 		log.Printf("Secondary probing: enabled (interval: %ds, buffer: %dms)", probeInterval, minDelayBuffer)
 	}
+	if enableExpensiveMethodRouting && secondaryBackendsStr != "" {
+		log.Printf("Expensive method routing: enabled (trace/debug calls prefer secondary backends)")
+	}
 
 	// Set up HTTP client with reasonable timeouts
 	client := &http.Client{
@@ -1989,8 +2183,8 @@ func main() {
 	if blockHeightTracker == nil {
 		log.Printf("Block height tracker initialization failed")
 	} else {
-		log.Printf("Block height tracking: enabled (interval: %ds, max blocks behind: %d)",
-			blockHeightCheckInterval, maxBlocksBehind)
+		log.Printf("Block height tracking: enabled (WebSocket-based, max blocks behind: %d)",
+			maxBlocksBehind)
 		// Set the tracker in stats collector for display
 		statsCollector.SetBlockHeightTracker(blockHeightTracker)
 	}
@@ -2016,14 +2210,14 @@ func main() {
 			handleWebSocketRequest(w, r, backends, client, &upgrader, statsCollector)
 		} else {
 			// Handle regular HTTP request
-			handleRequest(w, r, backends, client, enableDetailedLogs == "true", statsCollector, secondaryProbe, blockHeightTracker)
+			handleRequest(w, r, backends, client, enableDetailedLogs == "true", statsCollector, secondaryProbe, blockHeightTracker, enableExpensiveMethodRouting)
 		}
 	})
 
 	log.Fatal(http.ListenAndServe(listenAddr, nil))
 }
 
-func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client, enableDetailedLogs bool, statsCollector *StatsCollector, secondaryProbe *SecondaryProbe, blockHeightTracker *BlockHeightTracker) {
+func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client, enableDetailedLogs bool, statsCollector *StatsCollector, secondaryProbe *SecondaryProbe, blockHeightTracker *BlockHeightTracker, enableExpensiveMethodRouting bool) {
 	startTime := time.Now()
 
 	// Create a context that will cancel after 35 seconds (5s buffer over backend timeout)
@@ -2111,6 +2305,37 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
 	primaryResponseChan := make(chan struct{}, 1) // Signal when primary gets a response
 	primaryFailedFast := make(chan struct{}, 1)   // Signal when primary fails immediately
 
+	// Check if this is an expensive method that should be offloaded to secondary
+	var isExpensive bool
+	var hasExpensiveMethod bool
+	if batchInfo.IsBatch {
+		// Check if any method in the batch is expensive based on stats
+		for _, method := range batchInfo.Methods {
+			if statsCollector.isExpensiveMethodByStats(method) {
+				hasExpensiveMethod = true
+				if enableDetailedLogs {
+					log.Printf("Stats-based expensive method detected in batch: %s", method)
+				}
+				break
+			}
+		}
+	} else {
+		hasExpensiveMethod = statsCollector.isExpensiveMethodByStats(batchInfo.Methods[0])
+		if hasExpensiveMethod && enableDetailedLogs {
+			log.Printf("Stats-based expensive method detected: %s", batchInfo.Methods[0])
+		}
+	}
+	isExpensive = hasExpensiveMethod
+
+	// For expensive methods, prefer secondary backends if available and synchronized
+	var preferSecondary bool
+	if enableExpensiveMethodRouting && isExpensive && !isStateful && hasAvailableSecondaryAtChainHead(backends, blockHeightTracker) {
+		preferSecondary = true
+		if enableDetailedLogs {
+			log.Printf("Expensive method detected (%s), preferring secondary backends", displayMethod)
+		}
+	}
+
 	for _, backend := range backends {
 		// Skip secondary backends for stateful methods
 		if isStateful && backend.Role != "primary" {
@@ -2135,6 +2360,14 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
 			}
 		}
 
+		// For expensive methods with synchronized secondaries, keep the primary in the pool but delay it (below) so secondaries get priority
+		if preferSecondary && backend.Role == "primary" {
+			// Still add primary to the pool but with a delay to let secondaries try first
+			if enableDetailedLogs {
+				log.Printf("Delaying primary backend for expensive method %s to allow secondary backends priority", displayMethod)
+			}
+		}
+
 		wg.Add(1)
 		if backend.Role == "primary" {
 			primaryWg.Add(1)
@@ -2155,30 +2388,53 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
 				firstBackendStartTime.Store(&t)
 			}
 
-			// If this is a secondary backend, wait for p75 delay
+			// Delay logic based on backend role and method type
 			if b.Role != "primary" {
-				delayTimer := time.NewTimer(secondaryDelay)
+				// Secondary backend: apply normal delay logic unless it's an expensive method
+				if !preferSecondary {
+					// Normal delay for secondary backends
+					delayTimer := time.NewTimer(secondaryDelay)
+					select {
+					case <-delayTimer.C:
+						// Timer expired, primary is slow, proceed with secondary request
+					case <-primaryResponseChan:
+						// Primary already got a response, skip secondary
+						delayTimer.Stop()
+
+						// Still record that we skipped this backend
+						statsChan <- ResponseStats{
+							Backend:  b.Name,
+							Error:    fmt.Errorf("skipped - primary responded quickly"),
+							Method:   displayMethod,
+							Duration: time.Since(goroutineStartTime),
+						}
+						return
+					case <-primaryFailedFast:
+						// Primary failed immediately, start secondary now
+						delayTimer.Stop()
+						if enableDetailedLogs {
+							log.Printf("Primary failed fast for %s, starting secondary immediately", displayMethod)
+						}
+					}
+				} else {
+					// Expensive method: secondary backends start immediately
+					if enableDetailedLogs {
+						log.Printf("Starting secondary backend %s immediately for expensive method %s", b.Name, displayMethod)
+					}
+				}
+			} else if preferSecondary {
+				// Primary backend: delay for expensive methods to give secondaries priority
+				expensiveMethodDelay := 50 * time.Millisecond // Give secondaries a head start
+				delayTimer := time.NewTimer(expensiveMethodDelay)
 				select {
 				case <-delayTimer.C:
-					// Timer expired, primary is slow, proceed with secondary request
-				case <-primaryResponseChan:
-					// Primary already got a response, skip secondary
-					delayTimer.Stop()
-
-					// Still record that we skipped this backend
-					statsChan <- ResponseStats{
-						Backend:  b.Name,
-						Error:    fmt.Errorf("skipped - primary responded quickly"),
-						Method:   displayMethod,
-						Duration: time.Since(goroutineStartTime),
-					}
-					return
-				case <-primaryFailedFast:
-					// Primary failed immediately, start secondary now
-					delayTimer.Stop()
+					// Timer expired, proceed with primary request
 					if enableDetailedLogs {
-						log.Printf("Primary failed fast for %s, starting secondary immediately", displayMethod)
+						log.Printf("Starting primary backend after delay for expensive method %s", displayMethod)
 					}
+				case <-primaryFailedFast:
+					// If something went wrong, start immediately
+					delayTimer.Stop()
 				}
 			}
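
Reviewer note (not part of the patch): a minimal standalone sketch of the latency heuristic that isExpensiveMethodByStats applies, namely that a method is treated as expensive when its fastest observed duration is at least 5x the overall average of primary-backend samples. The helper name and sample data below are hypothetical and only illustrate the decision rule.

package main

import (
	"fmt"
	"time"
)

// isExpensiveByHeuristic mirrors the patch's rule on plain inputs:
// a method counts as "expensive" when its minimum observed duration is
// at least multiplier times the average of all observed durations.
func isExpensiveByHeuristic(all, method []time.Duration, multiplier float64) bool {
	if len(all) == 0 || len(method) == 0 {
		return false
	}
	var total time.Duration
	for _, d := range all {
		total += d
	}
	avg := total / time.Duration(len(all))

	min := method[0]
	for _, d := range method {
		if d < min {
			min = d
		}
	}
	return min >= time.Duration(float64(avg)*multiplier)
}

func main() {
	// Hypothetical samples: typical calls take ~40ms on the primary,
	// while a trace-style call never completes faster than 900ms.
	all := []time.Duration{40 * time.Millisecond, 35 * time.Millisecond, 50 * time.Millisecond, 45 * time.Millisecond}
	trace := []time.Duration{900 * time.Millisecond, 1200 * time.Millisecond}

	fmt.Println(isExpensiveByHeuristic(all, trace, 5.0)) // true: 900ms >= 5 * ~42ms
}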