diff --git a/benchmark-proxy/main.go b/benchmark-proxy/main.go index da86cfb7..4d70a17b 100644 --- a/benchmark-proxy/main.go +++ b/benchmark-proxy/main.go @@ -171,13 +171,14 @@ type StatsCollector struct { appStartTime time.Time // Application start time (never reset) intervalStartTime time.Time // Current interval start time (reset each interval) summaryInterval time.Duration - methodCUPrices map[string]int // Map of method names to CU prices - totalCU int // Total CU earned - methodCU map[string]int // Track CU earned per method - historicalCU []CUDataPoint // Historical CU data for different time windows - hasSecondaryBackends bool // Track if secondary backends are configured - skippedSecondaryRequests int // Track how many secondary requests were skipped - secondaryProbe *SecondaryProbe // Reference to secondary probe + methodCUPrices map[string]int // Map of method names to CU prices + totalCU int // Total CU earned + methodCU map[string]int // Track CU earned per method + historicalCU []CUDataPoint // Historical CU data for different time windows + hasSecondaryBackends bool // Track if secondary backends are configured + skippedSecondaryRequests int // Track how many secondary requests were skipped + secondaryProbe *SecondaryProbe // Reference to secondary probe + blockHeightTracker *BlockHeightTracker // Reference to block height tracker } // SecondaryProbe maintains latency information for secondary backends through active probing @@ -197,6 +198,194 @@ type SecondaryProbe struct { lastSuccessTime time.Time // Last time probes succeeded } +// BlockHeightTracker monitors block heights from different backends +type BlockHeightTracker struct { + mu sync.RWMutex + backends []Backend + client *http.Client + blockHeights map[string]uint64 // Backend name -> latest block number + lastUpdateTime map[string]time.Time // Backend name -> last successful update + checkInterval time.Duration + enableDetailedLogs bool + maxBlockBehind uint64 // Maximum blocks a secondary can be behind primary +} + +// NewBlockHeightTracker creates a new block height tracker +func NewBlockHeightTracker(backends []Backend, client *http.Client, checkInterval time.Duration, + maxBlockBehind uint64, enableDetailedLogs bool) *BlockHeightTracker { + + bht := &BlockHeightTracker{ + backends: backends, + client: client, + blockHeights: make(map[string]uint64), + lastUpdateTime: make(map[string]time.Time), + checkInterval: checkInterval, + enableDetailedLogs: enableDetailedLogs, + maxBlockBehind: maxBlockBehind, + } + + // Start periodic block height checking + go bht.startPeriodicCheck() + + // Run initial check + go bht.checkBlockHeights() + + return bht +} + +// checkBlockHeights queries all backends for their current block number +func (bht *BlockHeightTracker) checkBlockHeights() { + for _, backend := range bht.backends { + go func(b Backend) { + reqBody := []byte(`{"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":"blockcheck"}`) + + req, err := http.NewRequest("POST", b.URL, bytes.NewReader(reqBody)) + if err != nil { + if bht.enableDetailedLogs { + log.Printf("Block height check: failed to create request for %s: %v", b.Name, err) + } + return + } + + req.Header.Set("Content-Type", "application/json") + + start := time.Now() + resp, err := bht.client.Do(req) + if err != nil { + if bht.enableDetailedLogs { + log.Printf("Block height check: request failed for %s: %v", b.Name, err) + } + return + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + if bht.enableDetailedLogs { + log.Printf("Block height check: HTTP error %d for %s", resp.StatusCode, b.Name) + } + return + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + if bht.enableDetailedLogs { + log.Printf("Block height check: failed to read response for %s: %v", b.Name, err) + } + return + } + + // Parse JSON-RPC response + var jsonResp struct { + Result string `json:"result"` + Error *struct { + Code int `json:"code"` + Message string `json:"message"` + } `json:"error"` + } + + if err := json.Unmarshal(body, &jsonResp); err != nil { + if bht.enableDetailedLogs { + log.Printf("Block height check: failed to parse JSON for %s: %v", b.Name, err) + } + return + } + + if jsonResp.Error != nil { + if bht.enableDetailedLogs { + log.Printf("Block height check: JSON-RPC error for %s: %s", b.Name, jsonResp.Error.Message) + } + return + } + + // Parse hex block number + blockHex := strings.TrimPrefix(jsonResp.Result, "0x") + blockNum, err := strconv.ParseUint(blockHex, 16, 64) + if err != nil { + if bht.enableDetailedLogs { + log.Printf("Block height check: failed to parse block number for %s: %s", b.Name, jsonResp.Result) + } + return + } + + // Update the block height + bht.mu.Lock() + oldHeight := bht.blockHeights[b.Name] + bht.blockHeights[b.Name] = blockNum + bht.lastUpdateTime[b.Name] = time.Now() + bht.mu.Unlock() + + if bht.enableDetailedLogs { + duration := time.Since(start) + if oldHeight != blockNum { + log.Printf("Block height check: %s updated from %d to %d (took %s)", + b.Name, oldHeight, blockNum, duration) + } + } + }(backend) + } +} + +// startPeriodicCheck runs block height checks at regular intervals +func (bht *BlockHeightTracker) startPeriodicCheck() { + ticker := time.NewTicker(bht.checkInterval) + defer ticker.Stop() + + for range ticker.C { + bht.checkBlockHeights() + } +} + +// isSecondaryBehind checks if a secondary backend is behind the primary by more than maxBlockBehind +func (bht *BlockHeightTracker) isSecondaryBehind(secondaryName string) bool { + bht.mu.RLock() + defer bht.mu.RUnlock() + + // Find primary backend block height + var primaryHeight uint64 + var primaryFound bool + for name, height := range bht.blockHeights { + // Check if this is the primary backend (assuming it's named "primary") + if name == "primary" { + primaryHeight = height + primaryFound = true + break + } + } + + if !primaryFound { + // If we can't find primary height, be conservative and allow secondary + return false + } + + // Get secondary height + secondaryHeight, exists := bht.blockHeights[secondaryName] + if !exists { + // If we don't have secondary height data, be conservative and block it + return true + } + + // Check if secondary is behind by more than the threshold + if primaryHeight > secondaryHeight { + blocksBehind := primaryHeight - secondaryHeight + return blocksBehind > bht.maxBlockBehind + } + + // Secondary is not behind + return false +} + +// getBlockHeightStatus returns a summary of current block heights for logging +func (bht *BlockHeightTracker) getBlockHeightStatus() map[string]uint64 { + bht.mu.RLock() + defer bht.mu.RUnlock() + + result := make(map[string]uint64) + for name, height := range bht.blockHeights { + result[name] = height + } + return result +} + func NewStatsCollector(summaryInterval time.Duration, hasSecondaryBackends bool) *StatsCollector { now := time.Now() sc := &StatsCollector{ @@ -231,6 +420,13 @@ func (sc *StatsCollector) SetSecondaryProbe(probe *SecondaryProbe) { sc.secondaryProbe = probe } +// SetBlockHeightTracker sets the block height tracker reference after stats collector is created +func (sc *StatsCollector) SetBlockHeightTracker(tracker *BlockHeightTracker) { + sc.mu.Lock() + defer sc.mu.Unlock() + sc.blockHeightTracker = tracker +} + // NewSecondaryProbe creates a new secondary probe instance func NewSecondaryProbe(backends []Backend, client *http.Client, probeInterval time.Duration, minDelayBuffer time.Duration, probeMethods []string, enableDetailedLogs bool) *SecondaryProbe { @@ -597,7 +793,8 @@ func (sc *StatsCollector) AddStats(stats []ResponseStats, totalDuration time.Dur sc.requestStats = append(sc.requestStats, stat) if stat.Error != nil { // Don't count skipped secondary backends as errors - if !strings.Contains(stat.Error.Error(), "skipped - primary responded") { + if !strings.Contains(stat.Error.Error(), "skipped - primary responded") && + !strings.Contains(stat.Error.Error(), "skipped - behind in block height") { sc.errorCount++ } else { // Track that we skipped a secondary request @@ -785,6 +982,47 @@ func (sc *StatsCollector) printSummary() { sc.secondaryProbe.mu.RUnlock() } + // Display block height information if available + if sc.blockHeightTracker != nil { + blockStatus := sc.blockHeightTracker.getBlockHeightStatus() + if len(blockStatus) > 0 { + fmt.Printf("\n--- Block Height Status ---\n") + + // Find primary height for comparison + var primaryHeight uint64 + var primaryFound bool + if height, exists := blockStatus["primary"]; exists { + primaryHeight = height + primaryFound = true + } + + // Sort backend names for consistent output + var backendNames []string + for name := range blockStatus { + backendNames = append(backendNames, name) + } + sort.Strings(backendNames) + + for _, name := range backendNames { + height := blockStatus[name] + if name == "primary" { + fmt.Printf(" %s: %d (primary)\n", name, height) + } else if primaryFound { + behind := int64(primaryHeight) - int64(height) + if behind > 0 { + fmt.Printf(" %s: %d (%d blocks behind)\n", name, height, behind) + } else if behind == 0 { + fmt.Printf(" %s: %d (synchronized)\n", name, height) + } else { + fmt.Printf(" %s: %d (%d blocks ahead)\n", name, height, -behind) + } + } else { + fmt.Printf(" %s: %d\n", name, height) + } + } + } + } + if sc.hasSecondaryBackends && sc.skippedSecondaryRequests > 0 { fmt.Printf("Skipped Secondary Requests: %d (%.1f%% of requests)\n", sc.skippedSecondaryRequests, @@ -1635,6 +1873,11 @@ func main() { minDelayBufferStr := getEnv("MIN_DELAY_BUFFER", "2") // Default 2ms buffer probeMethodsStr := getEnv("PROBE_METHODS", "eth_blockNumber,net_version,eth_chainId") + // Block height tracking configuration + enableBlockHeightTracking := getEnv("ENABLE_BLOCK_HEIGHT_TRACKING", "true") == "true" + blockHeightCheckIntervalStr := getEnv("BLOCK_HEIGHT_CHECK_INTERVAL", "5") // Default 5 seconds + maxBlockBehindStr := getEnv("MAX_BLOCKS_BEHIND", "1") // Default 1 block behind + summaryInterval, err := strconv.Atoi(summaryIntervalStr) if err != nil { log.Printf("Invalid SUMMARY_INTERVAL, using default of 60 seconds") @@ -1653,6 +1896,18 @@ func main() { minDelayBuffer = 2 } + blockHeightCheckInterval, err := strconv.Atoi(blockHeightCheckIntervalStr) + if err != nil { + log.Printf("Invalid BLOCK_HEIGHT_CHECK_INTERVAL, using default of 5 seconds") + blockHeightCheckInterval = 5 + } + + maxBlocksBehind, err := strconv.ParseUint(maxBlockBehindStr, 10, 64) + if err != nil { + log.Printf("Invalid MAX_BLOCKS_BEHIND, using default of 1 block") + maxBlocksBehind = 1 + } + // Create stats collector for periodic summaries statsCollector := NewStatsCollector(time.Duration(summaryInterval)*time.Second, secondaryBackendsStr != "") @@ -1720,6 +1975,27 @@ func main() { } } + // Initialize block height tracker if enabled and we have secondary backends + var blockHeightTracker *BlockHeightTracker + if enableBlockHeightTracking && secondaryBackendsStr != "" { + blockHeightTracker = NewBlockHeightTracker( + backends, + client, + time.Duration(blockHeightCheckInterval)*time.Second, + maxBlocksBehind, + enableDetailedLogs == "true", + ) + + if blockHeightTracker == nil { + log.Printf("Block height tracker initialization failed") + } else { + log.Printf("Block height tracking: enabled (interval: %ds, max blocks behind: %d)", + blockHeightCheckInterval, maxBlocksBehind) + // Set the tracker in stats collector for display + statsCollector.SetBlockHeightTracker(blockHeightTracker) + } + } + // Configure websocket upgrader with larger buffer sizes // 20MB frame size and 50MB message size const ( @@ -1740,14 +2016,14 @@ func main() { handleWebSocketRequest(w, r, backends, client, &upgrader, statsCollector) } else { // Handle regular HTTP request - handleRequest(w, r, backends, client, enableDetailedLogs == "true", statsCollector, secondaryProbe) + handleRequest(w, r, backends, client, enableDetailedLogs == "true", statsCollector, secondaryProbe, blockHeightTracker) } }) log.Fatal(http.ListenAndServe(listenAddr, nil)) } -func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client, enableDetailedLogs bool, statsCollector *StatsCollector, secondaryProbe *SecondaryProbe) { +func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client, enableDetailedLogs bool, statsCollector *StatsCollector, secondaryProbe *SecondaryProbe, blockHeightTracker *BlockHeightTracker) { startTime := time.Now() // Create a context that will cancel after 35 seconds (5s buffer over backend timeout) @@ -1841,6 +2117,24 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c continue } + // Skip secondary backends if they're behind in block height + if backend.Role != "primary" && blockHeightTracker != nil { + if blockHeightTracker.isSecondaryBehind(backend.Name) { + if enableDetailedLogs { + blockStatus := blockHeightTracker.getBlockHeightStatus() + log.Printf("Skipping secondary backend %s - behind in block height: %v", backend.Name, blockStatus) + } + // Record that we skipped this secondary backend due to block height + statsChan <- ResponseStats{ + Backend: backend.Name, + Error: fmt.Errorf("skipped - behind in block height"), + Method: displayMethod, + Duration: 0, // No actual request made + } + continue + } + } + wg.Add(1) if backend.Role == "primary" { primaryWg.Add(1)