diff --git a/benchmark-proxy/main.go b/benchmark-proxy/main.go
index 8dda5a58..47b06df5 100644
--- a/benchmark-proxy/main.go
+++ b/benchmark-proxy/main.go
@@ -202,10 +202,10 @@ type StatsCollector struct {
 	methodCU             map[string]int // Track CU earned per method
 	historicalCU         []CUDataPoint  // Historical CU data for different time windows
 	hasSecondaryBackends bool           // Track if secondary backends are configured
-	skippedSecondaryRequests int        // Track how many REQUESTS had secondaries skipped
-	totalSecondarySkips      int        // Track total individual secondary backend skips
+	skippedSecondaryRequests int        // Track how many secondary requests were skipped
 	secondaryProbe   *SecondaryProbe   // Reference to secondary probe
 	chainHeadMonitor *ChainHeadMonitor // Reference to chain head monitor
+	ethCallStats     *EthCallStats     // Track eth_call specific statistics
 }
 
 // SecondaryProbe maintains latency information for secondary backends through active probing
@@ -262,6 +262,43 @@ type RequestInfo struct {
 	HasParams bool
 }
 
+// EthCallFeatures tracks specific features used in eth_call requests
+type EthCallFeatures struct {
+	HasStateOverrides bool   // Whether the call includes state overrides
+	BlockTagType      string // Type of block tag: latest, pending, safe, finalized, number, hash, earliest
+	HasAccessList     bool   // Whether the call includes an access list
+	GasLimit          uint64 // Gas limit if specified
+	DataSize          int    // Size of the call data
+	IsContractCall    bool   // Whether 'to' address is specified
+	HasValue          bool   // Whether the call includes value transfer
+}
+
+// EthCallStats tracks statistics for eth_call requests by feature category
+type EthCallStats struct {
+	TotalCount            int
+	SecondaryWins         int
+	PrimaryOnlyCount      int // Requests that only went to primary
+	ErrorCount            int
+	ByBlockTagType        map[string]*EthCallCategoryStats
+	WithStateOverrides    *EthCallCategoryStats
+	WithoutStateOverrides *EthCallCategoryStats
+	WithAccessList        *EthCallCategoryStats
+	WithoutAccessList     *EthCallCategoryStats
+	ByDataSizeRange       map[string]*EthCallCategoryStats // Ranges: small (<1KB), medium (1-10KB), large (>10KB)
+}
+
+// EthCallCategoryStats tracks stats for a specific category of eth_call
+type EthCallCategoryStats struct {
+	Count            int
+	SecondaryWins    int
+	PrimaryOnlyCount int
+	ErrorCount       int
+	AverageDuration  time.Duration
+	P50Duration      time.Duration
+	P90Duration      time.Duration
+	Durations        []time.Duration // For calculating percentiles
+}
+
 // Full JSON-RPC request structure for parsing parameters
 type JSONRPCFullRequest struct {
 	Method string `json:"method"`
@@ -551,6 +588,128 @@ func parseTraceFilter(params json.RawMessage) ([]string, error) {
 	return blockTags, nil
 }
 
+// parseEthCallFeatures extracts feature information from an eth_call request
+func parseEthCallFeatures(params json.RawMessage, blockTag string) (*EthCallFeatures, error) {
+	features := &EthCallFeatures{
+		BlockTagType: classifyBlockTag(blockTag),
+	}
+
+	// eth_call params: [call_object, block_tag, state_overrides]
+	var paramArray []json.RawMessage
+	if err := json.Unmarshal(params, &paramArray); err != nil {
+		return nil, err
+	}
+
+	if len(paramArray) == 0 {
+		return nil, fmt.Errorf("eth_call requires at least one parameter")
+	}
+
+	// Parse the call object (first parameter)
+	var callObject struct {
+		From       json.RawMessage `json:"from"`
+		To         json.RawMessage `json:"to"`
+		Gas        json.RawMessage `json:"gas"`
+		GasPrice   json.RawMessage `json:"gasPrice"`
+		Value      json.RawMessage `json:"value"`
+		Data       json.RawMessage `json:"data"`
+		AccessList json.RawMessage `json:"accessList"`
+	}
+
+	if err := json.Unmarshal(paramArray[0], &callObject); err != nil {
+		return nil, err
+	}
+
+	// Check if contract call (has 'to' address)
+	if len(callObject.To) > 0 {
+		var to string
+		if err := json.Unmarshal(callObject.To, &to); err == nil && to != "" && to != "0x0" {
+			features.IsContractCall = true
+		}
+	}
+
+	// Check if has value
+	if len(callObject.Value) > 0 {
+		var value string
+		if err := json.Unmarshal(callObject.Value, &value); err == nil && value != "" && value != "0x0" {
+			features.HasValue = true
+		}
+	}
+
+	// Check data size
+	if len(callObject.Data) > 0 {
+		var data string
+		if err := json.Unmarshal(callObject.Data, &data); err == nil {
+			// Remove 0x prefix and calculate byte size
+			if strings.HasPrefix(data, "0x") {
+				features.DataSize = (len(data) - 2) / 2 // Each byte is 2 hex chars
+			}
+		}
+	}
+
+	// Check gas limit
+	if len(callObject.Gas) > 0 {
+		var gasHex string
+		if err := json.Unmarshal(callObject.Gas, &gasHex); err == nil && gasHex != "" {
+			if strings.HasPrefix(gasHex, "0x") {
+				gas, err := strconv.ParseUint(gasHex[2:], 16, 64)
+				if err == nil {
+					features.GasLimit = gas
+				}
+			}
+		}
+	}
+
+	// Check for access list
+	if len(callObject.AccessList) > 0 && string(callObject.AccessList) != "null" {
+		features.HasAccessList = true
+	}
+
+	// Check for state overrides (third parameter)
+	if len(paramArray) >= 3 && len(paramArray[2]) > 0 && string(paramArray[2]) != "null" && string(paramArray[2]) != "{}" {
+		features.HasStateOverrides = true
+	}
+
+	return features, nil
+}
+
+// classifyBlockTag categorizes a block tag into types
+func classifyBlockTag(blockTag string) string {
+	if blockTag == "" {
+		return "latest" // Default
+	}
+
+	// Check for special tags
+	switch blockTag {
+	case "latest", "pending", "safe", "finalized", "earliest":
+		return blockTag
+	}
+
+	// Check if it's a block hash (0x followed by 64 hex chars)
+	if len(blockTag) == 66 && strings.HasPrefix(blockTag, "0x") {
+		return "hash"
+	}
+
+	// Check if it's a block number (hex)
+	if strings.HasPrefix(blockTag, "0x") {
+		_, err := strconv.ParseUint(strings.TrimPrefix(blockTag, "0x"), 16, 64)
+		if err == nil {
+			return "number"
+		}
+	}
+
+	return "unknown"
+}
+
+// getDataSizeRange categorizes data size into ranges
+func getDataSizeRange(dataSize int) string {
+	if dataSize < 1024 { // Less than 1KB
+		return "small"
+	} else if dataSize <= 10240 { // 1KB to 10KB
+		return "medium"
+	}
+	return "large" // More than 10KB
+}
+
 // requiresPrimaryBackend checks if a request must be routed to primary based on block tag
 func requiresPrimaryBackend(blockTag string) bool {
 	// These block tags must always go to primary
@@ -651,6 +810,30 @@ func canUseSecondaryForBlockTag(blockTag string, backendName string, chainHeadMo
 	return false
 }
 
+// initEthCallStats initializes the eth_call statistics structure
+func initEthCallStats() *EthCallStats {
+	return &EthCallStats{
+		ByBlockTagType: map[string]*EthCallCategoryStats{
+			"latest":    {Durations: make([]time.Duration, 0, 100)},
+			"pending":   {Durations: make([]time.Duration, 0, 100)},
+			"safe":      {Durations: make([]time.Duration, 0, 100)},
+			"finalized": {Durations: make([]time.Duration, 0, 100)},
+			"earliest":  {Durations: make([]time.Duration, 0, 100)},
+			"number":    {Durations: make([]time.Duration, 0, 100)},
+			"hash":      {Durations: make([]time.Duration, 0, 100)},
+		},
+		WithStateOverrides:    &EthCallCategoryStats{Durations: make([]time.Duration, 0, 100)},
+		WithoutStateOverrides: &EthCallCategoryStats{Durations: make([]time.Duration, 0, 100)},
+		WithAccessList:        &EthCallCategoryStats{Durations: make([]time.Duration, 0, 100)},
+		WithoutAccessList:     &EthCallCategoryStats{Durations: make([]time.Duration, 0, 100)},
+		ByDataSizeRange: map[string]*EthCallCategoryStats{
+			"small":  {Durations: make([]time.Duration, 0, 100)},
+			"medium": {Durations: make([]time.Duration, 0, 100)},
+			"large":  {Durations: make([]time.Duration, 0, 100)},
+		},
+	}
+}
+
 func NewStatsCollector(summaryInterval time.Duration, hasSecondaryBackends bool) *StatsCollector {
 	now := time.Now()
 	sc := &StatsCollector{
@@ -670,6 +853,7 @@ func NewStatsCollector(summaryInterval time.Duration, hasSecondaryBackends bool)
 		methodCU:             make(map[string]int),
 		historicalCU:         make([]CUDataPoint, 0, 2000), // Store up to ~24 hours of 1-minute intervals
 		hasSecondaryBackends: hasSecondaryBackends,
+		ethCallStats:         initEthCallStats(), // Initialize eth_call stats
 	}
 
 	// Start the periodic summary goroutine
@@ -1039,7 +1223,6 @@ func (sc *StatsCollector) AddStats(stats []ResponseStats, totalDuration time.Dur
 	}
 	// Add stats to the collection (skip actual-first-response as it's synthetic)
-	requestHadSkips := false // Track if this request had any secondary skips
 	for _, stat := range stats {
 		if stat.Backend == "actual-first-response" {
 			continue // Don't add synthetic entries to regular stats
 		}
@@ -1051,9 +1234,8 @@ func (sc *StatsCollector) AddStats(stats []ResponseStats, totalDuration time.Dur
 		if !strings.Contains(stat.Error.Error(), "skipped - primary responded") {
 			sc.errorCount++
 		} else {
-			// Track individual secondary skip
-			sc.totalSecondarySkips++
-			requestHadSkips = true
+			// Track that we skipped a secondary request
+			sc.skippedSecondaryRequests++
 		}
 	}
@@ -1093,11 +1275,6 @@ func (sc *StatsCollector) AddStats(stats []ResponseStats, totalDuration time.Dur
 		}
 	}
 
-	// Count this request as having skipped secondaries if any were skipped
-	if requestHadSkips {
-		sc.skippedSecondaryRequests++
-	}
-
 	sc.totalRequests++
 }
@@ -1139,6 +1316,68 @@ func (sc *StatsCollector) AddWebSocketStats(stats WebSocketStats) {
 	}
 }
 
+// AddEthCallStats adds statistics for an eth_call request
+func (sc *StatsCollector) AddEthCallStats(features *EthCallFeatures, winningBackend string,
+	duration time.Duration, hasError bool, sentToSecondary bool) {
+
+	sc.mu.Lock()
+	defer sc.mu.Unlock()
+
+	secondaryWon := winningBackend != "" && winningBackend != "primary"
+
+	// Update the aggregate counters
+	sc.ethCallStats.TotalCount++
+	if hasError {
+		sc.ethCallStats.ErrorCount++
+	}
+	if secondaryWon {
+		sc.ethCallStats.SecondaryWins++
+	}
+	if !sentToSecondary {
+		sc.ethCallStats.PrimaryOnlyCount++
+	}
+
+	// updateCategory applies the same bookkeeping to a single category bucket;
+	// durations are only recorded for successful requests
+	updateCategory := func(catStats *EthCallCategoryStats) {
+		if catStats == nil {
+			return // Unknown category (e.g. an unclassified block tag)
+		}
+		catStats.Count++
+		if hasError {
+			catStats.ErrorCount++
+		} else {
+			catStats.Durations = append(catStats.Durations, duration)
+		}
+		if secondaryWon {
+			catStats.SecondaryWins++
+		}
+		if !sentToSecondary {
+			catStats.PrimaryOnlyCount++
+		}
+	}
+
+	// Update block tag type stats
+	updateCategory(sc.ethCallStats.ByBlockTagType[features.BlockTagType])
+
+	// Update state override stats
+	if features.HasStateOverrides {
+		updateCategory(sc.ethCallStats.WithStateOverrides)
+	} else {
+		updateCategory(sc.ethCallStats.WithoutStateOverrides)
+	}
+
+	// Update access list stats
+	if features.HasAccessList {
+		updateCategory(sc.ethCallStats.WithAccessList)
+	} else {
+		updateCategory(sc.ethCallStats.WithoutAccessList)
+	}
+
+	// Update data size range stats
+	updateCategory(sc.ethCallStats.ByDataSizeRange[getDataSizeRange(features.DataSize)])
+}
+
 func (sc *StatsCollector) periodicSummary() {
 	ticker := time.NewTicker(sc.summaryInterval)
 	defer ticker.Stop()
@@ -1315,14 +1554,9 @@ func (sc *StatsCollector) printSummary() {
 	}
 
 	if sc.hasSecondaryBackends && sc.skippedSecondaryRequests > 0 {
-		fmt.Printf("Requests with Skipped Secondaries: %d (%.1f%% of requests)\n",
+		fmt.Printf("Skipped Secondary Requests: %d (%.1f%% of requests)\n",
 			sc.skippedSecondaryRequests,
 			float64(sc.skippedSecondaryRequests)/float64(sc.totalRequests)*100)
-		if sc.totalSecondarySkips > sc.skippedSecondaryRequests {
-			fmt.Printf("Total Secondary Skips: %d (%.1f skips per request with skips)\n",
-				sc.totalSecondarySkips,
-				float64(sc.totalSecondarySkips)/float64(sc.skippedSecondaryRequests))
-		}
 	}
 
 	fmt.Printf("Total Compute Units Earned (current interval): %d CU\n", sc.totalCU)
@@ -1626,2187 +1860,3 @@ func (sc *StatsCollector) printSummary() {
 	p50idx := len(durations) * 50 / 100
 	p90idx := len(durations) * 90 / 100
 	p99idx := minInt(len(durations)-1, len(durations)*99/100)
-
-	p50 = durations[p50idx]
-	p90 = durations[p90idx]
-	p99 = durations[p99idx]
-	}
-
-	// Add CU information to the output
-	cuPrice := sc.methodCUPrices[method]
-	cuEarned := sc.methodCU[method]
-
-	fmt.Printf("  %-50s Count: %-5d Avg: %-10s Min: %-10s Max: %-10s p50: %-10s p90: %-10s p99: %-10s CU: %d (%d)\n",
-		displayLabel, len(durations),
-		formatDuration(avg),
formatDuration(minDuration), formatDuration(max), - formatDuration(p50), formatDuration(p90), formatDuration(p99), - cuEarned, cuPrice) - } - } - - // Print per-method statistics for ALL backends - if len(sc.backendMethodStats) > 0 { - fmt.Printf("\nPer-Method Backend Comparison (Top 3 Methods):\n") - - // Collect all unique methods across all backends with their total counts - methodCounts := make(map[string]int) - for _, methods := range sc.backendMethodStats { - for method, durations := range methods { - methodCounts[method] += len(durations) - } - } - - // Sort methods by total count (descending) - type methodCount struct { - method string - count int - } - var methodList []methodCount - for method, count := range methodCounts { - methodList = append(methodList, methodCount{method, count}) - } - sort.Slice(methodList, func(i, j int) bool { - return methodList[i].count > methodList[j].count - }) - - // Only show top 3 methods - maxMethods := 3 - if len(methodList) < maxMethods { - maxMethods = len(methodList) - } - - // For each of the top methods, show stats from all backends - for i := 0; i < maxMethods; i++ { - method := methodList[i].method - - fmt.Printf("\n Method: %s (Total requests: %d)\n", method, methodList[i].count) - - // Check if this is a stateful method - if isStatefulMethod(method) { - fmt.Printf(" Note: Stateful method - only sent to primary backend\n") - } - - // Show wins for this method if available - if methodWins, exists := sc.methodBackendWins[method]; exists { - fmt.Printf(" First Response Wins: ") - totalMethodWins := 0 - for _, wins := range methodWins { - totalMethodWins += wins - } - - // Sort backends by wins for this method - var methodWinList []backendWin - for backend, wins := range methodWins { - methodWinList = append(methodWinList, backendWin{backend, wins}) - } - sort.Slice(methodWinList, func(i, j int) bool { - return methodWinList[i].wins > methodWinList[j].wins - }) - - // Print wins inline - for idx, bw := range methodWinList { - if idx > 0 { - fmt.Printf(", ") - } - winPercentage := float64(bw.wins) / float64(totalMethodWins) * 100 - fmt.Printf("%s: %d (%.1f%%)", bw.backend, bw.wins, winPercentage) - } - fmt.Printf("\n") - } - - fmt.Printf(" %-20s %10s %10s %10s %10s %10s %10s %10s\n", - "Backend", "Count", "Min", "Avg", "Max", "p50", "p90", "p99") - fmt.Printf(" %s\n", strings.Repeat("-", 98)) - - for _, backend := range backendNames { - durations, exists := sc.backendMethodStats[backend][method] - if !exists || len(durations) == 0 { - continue - } - - sort.Slice(durations, func(i, j int) bool { - return durations[i] < durations[j] - }) - - var sum time.Duration - for _, d := range durations { - sum += d - } - - avg := sum / time.Duration(len(durations)) - min := durations[0] - max := durations[len(durations)-1] - - p50 := min - p90 := min - p99 := min - - if len(durations) >= 2 { - p50idx := len(durations) * 50 / 100 - p90idx := len(durations) * 90 / 100 - p99idx := minInt(len(durations)-1, len(durations)*99/100) - - p50 = durations[p50idx] - p90 = durations[p90idx] - p99 = durations[p99idx] - } - - fmt.Printf(" %-20s %10d %10s %10s %10s %10s %10s %10s\n", - backend, len(durations), - formatDuration(min), formatDuration(avg), formatDuration(max), - formatDuration(p50), formatDuration(p90), formatDuration(p99)) - } - - // Show User Latency for this method if available - if methodActualDurations, exists := sc.methodActualFirstResponseDurations[method]; exists && len(methodActualDurations) > 0 { - // Make a copy and sort - durations := 
make([]time.Duration, len(methodActualDurations)) - copy(durations, methodActualDurations) - - sort.Slice(durations, func(i, j int) bool { - return durations[i] < durations[j] - }) - - var sum time.Duration - for _, d := range durations { - sum += d - } - - avg := sum / time.Duration(len(durations)) - min := durations[0] - max := durations[len(durations)-1] - - p50 := min - p90 := min - p99 := min - - if len(durations) >= 2 { - p50idx := len(durations) * 50 / 100 - p90idx := len(durations) * 90 / 100 - p99idx := minInt(len(durations)-1, len(durations)*99/100) - - p50 = durations[p50idx] - p90 = durations[p90idx] - p99 = durations[p99idx] - } - - fmt.Printf(" %-20s %10d %10s %10s %10s %10s %10s %10s\n", - "User Latency", len(durations), - formatDuration(min), formatDuration(avg), formatDuration(max), - formatDuration(p50), formatDuration(p90), formatDuration(p99)) - } - - // Show Backend Time statistics for this method - if methodFirstDurations, exists := sc.methodFirstResponseDurations[method]; exists && len(methodFirstDurations) > 0 { - // Make a copy and sort - durations := make([]time.Duration, len(methodFirstDurations)) - copy(durations, methodFirstDurations) - - sort.Slice(durations, func(i, j int) bool { - return durations[i] < durations[j] - }) - - var sum time.Duration - for _, d := range durations { - sum += d - } - - avg := sum / time.Duration(len(durations)) - min := durations[0] - max := durations[len(durations)-1] - - p50 := min - p90 := min - p99 := min - - if len(durations) >= 2 { - p50idx := len(durations) * 50 / 100 - p90idx := len(durations) * 90 / 100 - p99idx := minInt(len(durations)-1, len(durations)*99/100) - - p50 = durations[p50idx] - p90 = durations[p90idx] - p99 = durations[p99idx] - } - - fmt.Printf(" %-20s %10d %10s %10s %10s %10s %10s %10s\n", - "Backend Time", len(durations), - formatDuration(min), formatDuration(avg), formatDuration(max), - formatDuration(p50), formatDuration(p90), formatDuration(p99)) - fmt.Printf(" %s\n", strings.Repeat("-", 98)) - } - } - } - - fmt.Printf("================================\n\n") - - // Store current interval's CU data in historical data before resetting - if sc.totalCU > 0 { - sc.historicalCU = append(sc.historicalCU, CUDataPoint{ - Timestamp: time.Now(), // Store the end time of the interval - CU: sc.totalCU, - }) - } - - // Clean up old historical data (keep only last 24 hours + some buffer) - cutoff := time.Now().Add(-25 * time.Hour) - newStart := 0 - for i, point := range sc.historicalCU { - if point.Timestamp.After(cutoff) { - newStart = i - break - } - } - if newStart > 0 { - sc.historicalCU = sc.historicalCU[newStart:] - } - - // Reset statistics for the next interval - // Keep only the last 1000 requests to prevent unlimited memory growth - if len(sc.requestStats) > 1000 { - sc.requestStats = sc.requestStats[len(sc.requestStats)-1000:] - } - - // Reset method-specific statistics - for method := range sc.methodStats { - sc.methodStats[method] = sc.methodStats[method][:0] - } - - // Reset backend method-specific statistics - for backend := range sc.backendMethodStats { - for method := range sc.backendMethodStats[backend] { - sc.backendMethodStats[backend][method] = sc.backendMethodStats[backend][method][:0] - } - } - - // Reset backend wins statistics - sc.backendWins = make(map[string]int) - sc.methodBackendWins = make(map[string]map[string]int) - - // Reset first response statistics - if len(sc.firstResponseDurations) > 1000 { - sc.firstResponseDurations = 
sc.firstResponseDurations[len(sc.firstResponseDurations)-1000:] - } else { - sc.firstResponseDurations = sc.firstResponseDurations[:0] - } - if len(sc.actualFirstResponseDurations) > 1000 { - sc.actualFirstResponseDurations = sc.actualFirstResponseDurations[len(sc.actualFirstResponseDurations)-1000:] - } else { - sc.actualFirstResponseDurations = sc.actualFirstResponseDurations[:0] - } - for method := range sc.methodFirstResponseDurations { - sc.methodFirstResponseDurations[method] = sc.methodFirstResponseDurations[method][:0] - } - for method := range sc.methodActualFirstResponseDurations { - sc.methodActualFirstResponseDurations[method] = sc.methodActualFirstResponseDurations[method][:0] - } - - // Reset CU counters for the next interval - sc.totalCU = 0 - sc.methodCU = make(map[string]int) - - // Reset error count for the next interval - sc.errorCount = 0 - sc.skippedSecondaryRequests = 0 - sc.totalSecondarySkips = 0 - sc.totalRequests = 0 - sc.totalWsConnections = 0 - - // Reset WebSocket connections to prevent memory leak - sc.wsConnections = sc.wsConnections[:0] - - // Reset the interval start time for the next interval - sc.intervalStartTime = time.Now() -} - -// Helper function to avoid potential index out of bounds -func minInt(a, b int) int { - if a < b { - return a - } - return b -} - -// calculateCUForTimeWindow calculates total CU for a given time window -func (sc *StatsCollector) calculateCUForTimeWindow(window time.Duration) (int, bool) { - now := time.Now() - cutoff := now.Add(-window) - - totalCU := 0 - var oldestDataStartTime time.Time - hasData := false - - // Add current interval's CU if it started within the window - if sc.intervalStartTime.After(cutoff) { - totalCU += sc.totalCU - oldestDataStartTime = sc.intervalStartTime - hasData = true - } - - // Add historical CU data within the window - // Historical timestamps represent the END of intervals - for i := len(sc.historicalCU) - 1; i >= 0; i-- { - point := sc.historicalCU[i] - // Calculate the start time of this historical interval - intervalStart := point.Timestamp.Add(-sc.summaryInterval) - - // Skip if the interval ended before our cutoff - if point.Timestamp.Before(cutoff) { - break - } - - // Include this interval's CU - totalCU += point.CU - - // Track the oldest data start time - if !hasData || intervalStart.Before(oldestDataStartTime) { - oldestDataStartTime = intervalStart - } - hasData = true - } - - // Calculate actual data span - var actualDataDuration time.Duration - if hasData { - actualDataDuration = now.Sub(oldestDataStartTime) - } - - // We need extrapolation if we don't have enough historical data to cover the window - needsExtrapolation := hasData && actualDataDuration < window - - return totalCU, needsExtrapolation -} - -// extrapolateCU extrapolates CU data when there's insufficient historical data -func (sc *StatsCollector) extrapolateCU(actualCU int, actualDuration, targetDuration time.Duration) int { - if actualDuration <= 0 { - return 0 - } - - // Calculate CU per second rate - cuPerSecond := float64(actualCU) / actualDuration.Seconds() - - // Extrapolate to target duration - extrapolatedCU := cuPerSecond * targetDuration.Seconds() - - return int(extrapolatedCU) -} - -// formatCUWithExtrapolation formats CU value with extrapolation indicator -func formatCUWithExtrapolation(cu int, isExtrapolated bool) string { - if isExtrapolated { - return fmt.Sprintf("%d CU (extrapolated)", cu) - } - return fmt.Sprintf("%d CU", cu) -} - -// GetPrimaryP50 calculates the current p50 latency for the primary 
backend -func (sc *StatsCollector) GetPrimaryP50() time.Duration { - sc.mu.Lock() - defer sc.mu.Unlock() - - // Collect primary backend durations - var primaryDurations []time.Duration - for _, stat := range sc.requestStats { - if stat.Backend == "primary" && stat.Error == nil { - primaryDurations = append(primaryDurations, stat.Duration) - } - } - - // If we don't have enough data, return a sensible default - if len(primaryDurations) < 10 { - return 10 * time.Millisecond // Default to 10ms - } - - // Sort and find p50 - sort.Slice(primaryDurations, func(i, j int) bool { - return primaryDurations[i] < primaryDurations[j] - }) - - p50idx := len(primaryDurations) * 50 / 100 - return primaryDurations[p50idx] -} - -// GetPrimaryP75ForMethod calculates the current p75 latency for a specific method on the primary backend -func (sc *StatsCollector) GetPrimaryP75ForMethod(method string) time.Duration { - sc.mu.Lock() - defer sc.mu.Unlock() - - // Get method-specific durations for primary backend - if durations, exists := sc.methodStats[method]; exists && len(durations) >= 5 { - // Make a copy to avoid modifying the original - durationsCopy := make([]time.Duration, len(durations)) - copy(durationsCopy, durations) - - // Sort and find p75 - sort.Slice(durationsCopy, func(i, j int) bool { - return durationsCopy[i] < durationsCopy[j] - }) - - p75idx := len(durationsCopy) * 75 / 100 - if p75idx >= len(durationsCopy) { - p75idx = len(durationsCopy) - 1 - } - return durationsCopy[p75idx] - } - - // If we don't have enough method-specific data, calculate global p75 here - // (instead of calling GetPrimaryP50 which would cause nested mutex lock) - var primaryDurations []time.Duration - for _, stat := range sc.requestStats { - if stat.Backend == "primary" && stat.Error == nil { - primaryDurations = append(primaryDurations, stat.Duration) - } - } - - // If we don't have enough data, return a sensible default - if len(primaryDurations) < 10 { - return 15 * time.Millisecond // Default to 15ms for p75 - } - - // Sort and find p75 - sort.Slice(primaryDurations, func(i, j int) bool { - return primaryDurations[i] < primaryDurations[j] - }) - - p75idx := len(primaryDurations) * 75 / 100 - if p75idx >= len(primaryDurations) { - p75idx = len(primaryDurations) - 1 - } - return primaryDurations[p75idx] -} - -// GetPrimaryP50ForMethod calculates the current p50 latency for a specific method on the primary backend -func (sc *StatsCollector) GetPrimaryP50ForMethod(method string) time.Duration { - sc.mu.Lock() - defer sc.mu.Unlock() - - // Get method-specific durations for primary backend - if durations, exists := sc.methodStats[method]; exists && len(durations) >= 5 { - // Make a copy to avoid modifying the original - durationsCopy := make([]time.Duration, len(durations)) - copy(durationsCopy, durations) - - // Sort and find p50 - sort.Slice(durationsCopy, func(i, j int) bool { - return durationsCopy[i] < durationsCopy[j] - }) - - p50idx := len(durationsCopy) * 50 / 100 - return durationsCopy[p50idx] - } - - // If we don't have enough method-specific data, calculate global p50 here - // (instead of calling GetPrimaryP50 which would cause nested mutex lock) - var primaryDurations []time.Duration - for _, stat := range sc.requestStats { - if stat.Backend == "primary" && stat.Error == nil { - primaryDurations = append(primaryDurations, stat.Duration) - } - } - - // If we don't have enough data, return a sensible default - if len(primaryDurations) < 10 { - return 10 * time.Millisecond // Default to 10ms - } - - // Sort and 
find p50 - sort.Slice(primaryDurations, func(i, j int) bool { - return primaryDurations[i] < primaryDurations[j] - }) - - p50idx := len(primaryDurations) * 50 / 100 - return primaryDurations[p50idx] -} - -// isStatefulMethod returns true if the method requires session state and must always go to primary -func isStatefulMethod(method string) bool { - statefulMethods := map[string]bool{ - // Filter methods - these create server-side state - "eth_newFilter": true, - "eth_newBlockFilter": true, - "eth_newPendingTransactionFilter": true, - "eth_getFilterChanges": true, - "eth_getFilterLogs": true, - "eth_uninstallFilter": true, - - // Subscription methods (WebSocket) - maintain persistent connections - "eth_subscribe": true, - "eth_unsubscribe": true, - "eth_subscription": true, // Notification method - - // Some debug/trace methods might maintain state depending on implementation - // But these are typically stateless, so not included here - } - - return statefulMethods[method] -} - -// requiresPrimaryOnlyMethod returns true if the method should always go to primary -func requiresPrimaryOnlyMethod(method string) bool { - primaryOnlyMethods := map[string]bool{ - // Transaction sending methods - must go to primary - "eth_sendRawTransaction": true, - "eth_sendTransaction": true, - - // Mempool/txpool methods - these show pending transactions - "txpool_content": true, - "txpool_inspect": true, - "txpool_status": true, - "txpool_contentFrom": true, - - // Mining related methods - "eth_mining": true, - "eth_hashrate": true, - "eth_getWork": true, - "eth_submitWork": true, - "eth_submitHashrate": true, - - // Complex methods with special block handling - "eth_callMany": true, // Has complex block handling with multiple calls at different blocks - "eth_simulateV1": true, // Simulation should use primary for consistency - - // Trace methods that depend on transaction data - "trace_transaction": true, // Traces already mined transaction by hash - "trace_replayTransaction": true, // Replays transaction execution by hash - "trace_rawTransaction": true, // Simulates raw transaction data - - // Debug methods that should use primary - "debug_traceTransaction": true, // Debug version of trace by tx hash - "debug_storageRangeAt": true, // Accesses internal storage state - - // Transaction query methods - must go to primary - // These can return null if tx is not found, and secondary might not have recent txs - "eth_getTransactionByHash": true, - "eth_getTransactionReceipt": true, - } - - return primaryOnlyMethods[method] -} - -// methodMightReturnNull returns true if the method might legitimately return null -// and we should wait for primary's response instead of returning secondary's null -func methodMightReturnNull(method string) bool { - nullableMethods := map[string]bool{ - "eth_getTransactionReceipt": true, - "eth_getTransactionByHash": true, - "eth_getTransactionByBlockHashAndIndex": true, - "eth_getTransactionByBlockNumberAndIndex": true, - "eth_getBlockByHash": true, - "eth_getBlockByNumber": true, - "eth_getUncleByBlockHashAndIndex": true, - "eth_getUncleByBlockNumberAndIndex": true, - } - - return nullableMethods[method] -} - -// isNullResponse checks if a JSON-RPC response has a null result -func isNullResponse(respBody []byte) bool { - // Simple structure to check the result field - var response struct { - Result json.RawMessage `json:"result"` - } - - if err := json.Unmarshal(respBody, &response); err != nil { - return false - } - - // Check if result is null - return string(response.Result) == 
"null" -} - -// methodShouldWaitOnSecondaryError returns true if we should wait for primary -// when secondary returns an error response -func methodShouldWaitOnSecondaryError(method string) bool { - // Methods where secondary errors might be transient and primary could succeed - waitOnErrorMethods := map[string]bool{ - "eth_call": true, // State execution - secondary might be behind or have issues - "eth_estimateGas": true, // Similar to eth_call - "trace_call": true, // Tracing calls - "debug_traceCall": true, // Debug tracing - "eth_createAccessList": true, // Access list creation - } - - return waitOnErrorMethods[method] -} - -// flushingResponseWriter wraps http.ResponseWriter to flush after every write -type flushingResponseWriter struct { - http.ResponseWriter - flusher http.Flusher -} - -func (f *flushingResponseWriter) Write(p []byte) (n int, err error) { - n, err = f.ResponseWriter.Write(p) - if err == nil && n > 0 { - f.flusher.Flush() - } - return -} - -// peekingReader wraps a reader to peek at the first N bytes and detect null or error responses -type peekingReader struct { - r io.Reader - buf []byte - bufPos int - isNull bool - hasError bool - checkDone bool - peekSize int -} - -func newPeekingReader(r io.Reader, peekSize int) *peekingReader { - return &peekingReader{ - r: r, - buf: make([]byte, 0, peekSize), - peekSize: peekSize, - } -} - -func (pr *peekingReader) Read(p []byte) (n int, err error) { - // If we haven't finished checking yet - if !pr.checkDone && len(pr.buf) < pr.peekSize { - // Read more data into our buffer - tempBuf := make([]byte, pr.peekSize-len(pr.buf)) - readN, readErr := pr.r.Read(tempBuf) - if readN > 0 { - pr.buf = append(pr.buf, tempBuf[:readN]...) - } - - // Check if we have enough data or hit EOF - if len(pr.buf) >= pr.peekSize || readErr == io.EOF { - pr.checkDone = true - // Check for null response pattern - pr.detectPatterns() - } - - if readErr != nil && readErr != io.EOF { - return 0, readErr - } - } - - // Serve data from buffer first - if pr.bufPos < len(pr.buf) { - n = copy(p, pr.buf[pr.bufPos:]) - pr.bufPos += n - return n, nil - } - - // Then serve from underlying reader - return pr.r.Read(p) -} - -func (pr *peekingReader) detectPatterns() { - // Look for patterns indicating null result or errors in JSON-RPC response - bufStr := string(pr.buf) - - // Remove whitespace for easier pattern matching - compactStr := strings.ReplaceAll(bufStr, " ", "") - compactStr = strings.ReplaceAll(compactStr, "\n", "") - compactStr = strings.ReplaceAll(compactStr, "\r", "") - compactStr = strings.ReplaceAll(compactStr, "\t", "") - - // Check for null result - pr.isNull = strings.Contains(compactStr, `"result":null`) - - // Check for JSON-RPC errors - pr.hasError = strings.Contains(compactStr, `"error":`) && !strings.Contains(compactStr, `"error":null`) - - // Log what we found for debugging - if pr.isNull || pr.hasError { - log.Printf("Detected short response pattern - null: %v, error: %v, buffer preview: %s", - pr.isNull, pr.hasError, strings.ReplaceAll(bufStr[:minInt(len(bufStr), 80)], "\n", " ")) - } -} - -func (pr *peekingReader) IsNull() bool { - return pr.isNull -} - -func (pr *peekingReader) HasError() bool { - return pr.hasError -} - -func main() { - // Get configuration from environment variables - listenAddr := getEnv("LISTEN_ADDR", ":8080") - primaryBackend := getEnv("PRIMARY_BACKEND", "http://localhost:8545") - secondaryBackendsStr := getEnv("SECONDARY_BACKENDS", "") - summaryIntervalStr := getEnv("SUMMARY_INTERVAL", "60") // Default 60 
seconds - enableDetailedLogs := getEnv("ENABLE_DETAILED_LOGS", "false") // Default to disabled - - // Secondary probe configuration - enableSecondaryProbing := getEnv("ENABLE_SECONDARY_PROBING", "true") == "true" - probeIntervalStr := getEnv("PROBE_INTERVAL", "10") // Default 10 seconds - minDelayBufferStr := getEnv("MIN_DELAY_BUFFER", "2") // Default 2ms buffer - probeMethodsStr := getEnv("PROBE_METHODS", "eth_blockNumber,net_version,eth_chainId") - - // Method routing configuration - secondaryWhitelistStr := getEnv("SECONDARY_WHITELIST", "") // Methods allowed on secondary - preferSecondaryStr := getEnv("PREFER_SECONDARY", "") // Methods that should prefer secondary - - summaryInterval, err := strconv.Atoi(summaryIntervalStr) - if err != nil { - log.Printf("Invalid SUMMARY_INTERVAL, using default of 60 seconds") - summaryInterval = 60 - } - - probeInterval, err := strconv.Atoi(probeIntervalStr) - if err != nil { - log.Printf("Invalid PROBE_INTERVAL, using default of 10 seconds") - probeInterval = 10 - } - - minDelayBuffer, err := strconv.Atoi(minDelayBufferStr) - if err != nil { - log.Printf("Invalid MIN_DELAY_BUFFER, using default of 2ms") - minDelayBuffer = 2 - } - - // Parse method routing configuration - methodRouting := &MethodRouting{ - SecondaryWhitelist: make(map[string]bool), - PreferSecondary: make(map[string]bool), - } - - // Parse whitelist - if secondaryWhitelistStr != "" { - whitelist := strings.Split(secondaryWhitelistStr, ",") - for _, method := range whitelist { - methodRouting.SecondaryWhitelist[strings.TrimSpace(method)] = true - } - log.Printf("Secondary whitelist: %v", whitelist) - } - - // Parse prefer secondary list - if preferSecondaryStr != "" { - preferList := strings.Split(preferSecondaryStr, ",") - for _, method := range preferList { - methodRouting.PreferSecondary[strings.TrimSpace(method)] = true - // Also add to whitelist automatically - methodRouting.SecondaryWhitelist[strings.TrimSpace(method)] = true - } - log.Printf("Prefer secondary for methods: %v", preferList) - } - - // Create stats collector for periodic summaries - statsCollector := NewStatsCollector(time.Duration(summaryInterval)*time.Second, secondaryBackendsStr != "") - - // Configure backends - var backends []Backend - backends = append(backends, Backend{ - URL: primaryBackend, - Name: "primary", - Role: "primary", - }) - - if secondaryBackendsStr != "" { - secondaryList := strings.Split(secondaryBackendsStr, ",") - for i, url := range secondaryList { - backends = append(backends, Backend{ - URL: strings.TrimSpace(url), - Name: fmt.Sprintf("secondary-%d", i+1), - Role: "secondary", - }) - } - } - - log.Printf("Starting benchmark proxy on %s", listenAddr) - log.Printf("Primary backend: %s", primaryBackend) - log.Printf("Secondary backends: %s", secondaryBackendsStr) - if enableSecondaryProbing && secondaryBackendsStr != "" { - log.Printf("Secondary probing: enabled (interval: %ds, buffer: %dms)", probeInterval, minDelayBuffer) - } - - // Set up HTTP client with reasonable timeouts - client := &http.Client{ - Timeout: 30 * time.Second, - Transport: &http.Transport{ - MaxIdleConns: 200, // Increased for better connection pooling - MaxIdleConnsPerHost: 50, // Increased per-host limit for probing - IdleConnTimeout: 120 * time.Second, // Longer idle timeout for connection reuse - DisableCompression: true, // Typically JSON-RPC doesn't benefit from compression - DisableKeepAlives: false, // Ensure keep-alives are enabled - MaxConnsPerHost: 50, // Limit concurrent connections per host - }, - } - - // 
Initialize secondary probe if enabled and we have secondary backends - var secondaryProbe *SecondaryProbe - if enableSecondaryProbing && secondaryBackendsStr != "" { - probeMethods := strings.Split(probeMethodsStr, ",") - for i := range probeMethods { - probeMethods[i] = strings.TrimSpace(probeMethods[i]) - } - - secondaryProbe = NewSecondaryProbe( - backends, - client, - time.Duration(probeInterval)*time.Second, - time.Duration(minDelayBuffer)*time.Millisecond, - probeMethods, - enableDetailedLogs == "true", - ) - - if secondaryProbe == nil { - log.Printf("Secondary probe initialization failed - no secondary backends found") - } else { - // Set the probe in stats collector for display - statsCollector.SetSecondaryProbe(secondaryProbe) - } - } - - // Initialize chain head monitor - var chainHeadMonitor *ChainHeadMonitor - if len(backends) > 1 { // Only create if we have more than just primary - chainHeadMonitor = NewChainHeadMonitor(backends, enableDetailedLogs == "true") - log.Printf("Chain head monitoring: enabled") - - // Set the monitor in stats collector for display - statsCollector.SetChainHeadMonitor(chainHeadMonitor) - } - - // Configure websocket upgrader with larger buffer sizes - // 20MB frame size and 50MB message size - const ( - maxFrameSize = 20 * 1024 * 1024 // 20MB - maxMessageSize = 50 * 1024 * 1024 // 50MB - ) - - upgrader := websocket.Upgrader{ - ReadBufferSize: maxFrameSize, - WriteBufferSize: maxFrameSize, - // Allow all origins - CheckOrigin: func(r *http.Request) bool { return true }, - } - - http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - // Check if this is a WebSocket upgrade request - if websocket.IsWebSocketUpgrade(r) { - handleWebSocketRequest(w, r, backends, client, &upgrader, statsCollector) - } else { - // Handle regular HTTP request - handleRequest(w, r, backends, client, enableDetailedLogs == "true", statsCollector, secondaryProbe, chainHeadMonitor, methodRouting) - } - }) - - log.Fatal(http.ListenAndServe(listenAddr, nil)) -} - -func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client, enableDetailedLogs bool, statsCollector *StatsCollector, secondaryProbe *SecondaryProbe, chainHeadMonitor *ChainHeadMonitor, methodRouting *MethodRouting) { - startTime := time.Now() - - // Create a context that will cancel after 35 seconds (5s buffer over backend timeout) - ctx, cancel := context.WithTimeout(r.Context(), 35*time.Second) - defer cancel() - - // Limit request body size to 10MB to prevent memory exhaustion - const maxBodySize = 10 * 1024 * 1024 // 10MB - r.Body = http.MaxBytesReader(w, r.Body, maxBodySize) - - // Read the entire request body - body, err := io.ReadAll(r.Body) - if err != nil { - // Check if the error is due to body size limit - if strings.Contains(err.Error(), "request body too large") { - http.Error(w, "Request body too large (max 10MB)", http.StatusRequestEntityTooLarge) - } else { - http.Error(w, "Error reading request body", http.StatusBadRequest) - } - return - } - defer r.Body.Close() - - // Parse request to extract method information (handles both single and batch) - batchInfo, err := parseBatchInfo(body) - if err != nil { - http.Error(w, "Invalid JSON-RPC request", http.StatusBadRequest) - return - } - - // For logging and stats, use the first method or "batch" for batch requests - var displayMethod string - var isStateful bool - var requiresPrimaryDueToBlockTag bool - - if batchInfo.IsBatch { - displayMethod = fmt.Sprintf("batch[%d]", batchInfo.RequestCount) - isStateful 
= batchInfo.HasStateful - requiresPrimaryDueToBlockTag = batchInfo.RequiresPrimary - } else { - displayMethod = batchInfo.Methods[0] - if displayMethod == "" { - displayMethod = "unknown" - } - isStateful = batchInfo.HasStateful - requiresPrimaryDueToBlockTag = batchInfo.RequiresPrimary - } - - if enableDetailedLogs { - if batchInfo.IsBatch { - log.Printf("Batch request: %d requests, methods: %s, block tags: %v", - batchInfo.RequestCount, formatMethodList(batchInfo.Methods), batchInfo.BlockTags) - } else { - var blockTagInfo string - if len(batchInfo.BlockTags) > 0 { - blockTagInfo = fmt.Sprintf(", block tag: %s", batchInfo.BlockTags[0]) - } - log.Printf("Method: %s%s", displayMethod, blockTagInfo) - } - } - - // Check method routing configuration - if !batchInfo.IsBatch && methodRouting != nil { - // For single methods, check routing rules - method := displayMethod - - // Check if this method prefers secondary backends - if methodRouting.PreferSecondary[method] { - if enableDetailedLogs { - log.Printf("Method %s configured to prefer secondary backends", method) - } - } - - // Check if whitelist is configured and method is not in it - if len(methodRouting.SecondaryWhitelist) > 0 && !methodRouting.SecondaryWhitelist[method] { - // Method not in whitelist - force primary only - if enableDetailedLogs { - log.Printf("Method %s not in secondary whitelist - using primary only", method) - } - // This will be enforced in the backend loop - } - } - - // Process backends with adaptive delay strategy - var wg sync.WaitGroup - var primaryWg sync.WaitGroup // Separate wait group for primary backend - statsChan := make(chan ResponseStats, len(backends)) - responseChan := make(chan struct { - backend string - resp *http.Response - err error - }, len(backends)) - - // Track if we've already sent a response - var responseHandled atomic.Bool - var firstBackendStartTime atomic.Pointer[time.Time] - - // Count secondary backends for proper channel sizing - secondaryCount := 0 - for _, backend := range backends { - if backend.Role == "secondary" { - secondaryCount++ - } - } - - // Buffer size = number of secondary backends so all can receive signal - primaryResponseChan := make(chan struct{}, secondaryCount) // Signal when primary gets a response - primaryFailedFast := make(chan struct{}, 1) // Signal when primary fails immediately - - for _, backend := range backends { - // Method routing checks for secondary backends - if backend.Role == "secondary" && methodRouting != nil && !batchInfo.IsBatch { - method := displayMethod - - // Check if whitelist is configured and method is not in it - if len(methodRouting.SecondaryWhitelist) > 0 && !methodRouting.SecondaryWhitelist[method] { - if enableDetailedLogs { - log.Printf("Skipping secondary backend %s for method %s (not in whitelist)", backend.Name, method) - } - continue - } - } - - // Skip primary backend if method prefers secondary (and we have secondary backends available) - if backend.Role == "primary" && methodRouting != nil && !batchInfo.IsBatch { - method := displayMethod - if methodRouting.PreferSecondary[method] && len(backends) > 1 { - if enableDetailedLogs { - log.Printf("Skipping primary backend for method %s (configured to prefer secondary)", method) - } - continue - } - } - - // Skip secondary backends for stateful methods - if isStateful && backend.Role != "primary" { - if enableDetailedLogs { - log.Printf("Skipping secondary backend %s for stateful method %s", backend.Name, displayMethod) - } - continue - } - - // Skip secondary backends if request 
requires primary due to block tag - if requiresPrimaryDueToBlockTag && backend.Role != "primary" { - if enableDetailedLogs { - log.Printf("Skipping secondary backend %s due to block tag requiring primary", backend.Name) - } - continue - } - - // Check if secondary backend can handle "latest" block tag requests - if backend.Role == "secondary" && len(batchInfo.BlockTags) > 0 { - // Check all block tags in the request - canUseSecondary := true - for _, blockTag := range batchInfo.BlockTags { - if !canUseSecondaryForBlockTag(blockTag, backend.Name, chainHeadMonitor) { - canUseSecondary = false - if enableDetailedLogs { - log.Printf("Skipping secondary backend %s for block tag '%s' - not at required height", - backend.Name, blockTag) - } - break - } - } - if !canUseSecondary { - continue - } - } - - // Skip unhealthy secondary backends - if backend.Role == "secondary" && chainHeadMonitor != nil && !chainHeadMonitor.IsBackendHealthy(backend.Name) { - if enableDetailedLogs { - log.Printf("Skipping unhealthy secondary backend %s for %s", backend.Name, displayMethod) - } - continue - } - - wg.Add(1) - if backend.Role == "primary" { - primaryWg.Add(1) - } - - go func(b Backend) { - defer wg.Done() - if b.Role == "primary" { - defer primaryWg.Done() - } - - // Track when this goroutine actually starts processing - goroutineStartTime := time.Now() - - // Record the first backend start time (should be primary) - if b.Role == "primary" { - t := goroutineStartTime - firstBackendStartTime.Store(&t) - } - - // If this is a secondary backend, wait for p75 delay - if b.Role != "primary" { - // Skip delay if this method prefers secondary backends - skipDelay := false - if methodRouting != nil && !batchInfo.IsBatch { - if methodRouting.PreferSecondary[displayMethod] { - skipDelay = true - if enableDetailedLogs { - log.Printf("Secondary backend %s starting immediately for method %s (configured to prefer secondary)", b.Name, displayMethod) - } - } - } - - if !skipDelay { - // Get backend-specific delay - var backendSpecificDelay time.Duration - if batchInfo.IsBatch { - // For batch requests, use the maximum delay of all methods for this backend - backendSpecificDelay = calculateBatchDelay(batchInfo.Methods, b.Name, secondaryProbe, statsCollector) - } else if secondaryProbe != nil { - backendSpecificDelay = secondaryProbe.getDelayForBackendAndMethod(b.Name, displayMethod) - } else { - // Fallback to method-based delay if no probe - backendSpecificDelay = statsCollector.GetPrimaryP75ForMethod(displayMethod) - } - - if enableDetailedLogs { - log.Printf("Secondary backend %s waiting %s for method %s", b.Name, backendSpecificDelay, displayMethod) - } - - delayTimer := time.NewTimer(backendSpecificDelay) - select { - case <-delayTimer.C: - // Timer expired, primary is slow, proceed with secondary request - case <-primaryResponseChan: - // Primary already got a response, skip secondary - delayTimer.Stop() - - // Still record that we skipped this backend - statsChan <- ResponseStats{ - Backend: b.Name, - Error: fmt.Errorf("skipped - primary responded quickly"), - Method: displayMethod, - Duration: time.Since(goroutineStartTime), - } - return - case <-primaryFailedFast: - // Primary failed immediately, start secondary now - delayTimer.Stop() - if enableDetailedLogs { - log.Printf("Primary failed fast for %s, starting secondary immediately", displayMethod) - } - } - } - } - - // Create a new request (no longer using context for cancellation) - backendReq, err := http.NewRequest(r.Method, b.URL, bytes.NewReader(body)) - 
if err != nil { - statsChan <- ResponseStats{ - Backend: b.Name, - Error: err, - Method: displayMethod, - Duration: time.Since(goroutineStartTime), // Include any wait time - } - return - } - - // Copy headers - for name, values := range r.Header { - for _, value := range values { - backendReq.Header.Add(name, value) - } - } - - // Send the request - reqStart := time.Now() - resp, err := client.Do(backendReq) - reqDuration := time.Since(reqStart) - - if err != nil { - // If primary failed with connection error, signal secondary to start - if b.Role == "primary" { - select { - case primaryFailedFast <- struct{}{}: - default: - } - if enableDetailedLogs { - log.Printf("Primary backend failed with connection error for %s: %v", displayMethod, err) - } - } - - statsChan <- ResponseStats{ - Backend: b.Name, - Duration: reqDuration, // Keep backend-specific duration - Error: err, - Method: displayMethod, - } - return - } - - // Don't close resp.Body here - it will be closed by the winner or drained by losers - - // Check if primary returned an error status - if b.Role == "primary" && resp.StatusCode >= 400 { - // Any 4xx or 5xx error should trigger immediate secondary - select { - case primaryFailedFast <- struct{}{}: - default: - } - if enableDetailedLogs { - log.Printf("Primary backend returned HTTP error %d for %s", resp.StatusCode, displayMethod) - } - } - - // Signal primary response immediately for secondary backends to check - if b.Role == "primary" && resp.StatusCode < 400 { - // Send signal for each secondary backend so they all get notified - for i := 0; i < secondaryCount; i++ { - select { - case primaryResponseChan <- struct{}{}: - default: - // Channel is full, some secondaries already received signal - } - } - } - - statsChan <- ResponseStats{ - Backend: b.Name, - StatusCode: resp.StatusCode, - Duration: reqDuration, // This is the backend-specific duration - Method: displayMethod, - } - - // Primary responses should ALWAYS be sent to the user (except HTTP errors >= 400) - // JSON-RPC errors from primary are valid responses that must be propagated - // The following checks only apply to secondary backends - - // CRITICAL FIX: Only allow secondary backends to win if they have successful responses - if b.Role == "secondary" && resp.StatusCode >= 400 { - // Secondary returned an error - DO NOT let it win the race - if enableDetailedLogs { - log.Printf("Secondary backend %s returned error status %d for %s - ignoring", - b.Name, resp.StatusCode, displayMethod) - } - - // Still need to drain and close the body - go func() { - defer resp.Body.Close() - io.Copy(io.Discard, resp.Body) - }() - return - } - - // CRITICAL FIX 2: Check for null responses from secondary backends for certain methods - if b.Role == "secondary" && resp.StatusCode == 200 && methodMightReturnNull(displayMethod) { - // Check Content-Length first - null responses are typically very short - contentLength := resp.Header.Get("Content-Length") - if contentLength != "" { - length, err := strconv.Atoi(contentLength) - if err == nil && length < 100 { // Null responses are typically < 100 bytes - // This is suspiciously short, likely a null response - // Create a peeking reader to confirm - peeker := newPeekingReader(resp.Body, 100) - resp.Body = io.NopCloser(peeker) - - // Read a bit to trigger null detection - smallBuf := make([]byte, 1) - peeker.Read(smallBuf) - - if peeker.IsNull() { - // Confirmed null response - don't let secondary win - if enableDetailedLogs { - log.Printf("Secondary backend %s returned null for %s 
(Content-Length: %s) - waiting for primary", - b.Name, displayMethod, contentLength) - } - // Close the response body - resp.Body.Close() - return - } - // Not null, continue with normal flow - } - } else { - // No Content-Length header, use peeking reader anyway for safety - peeker := newPeekingReader(resp.Body, 200) // Peek at first 200 bytes - resp.Body = io.NopCloser(peeker) - - // Read a bit to trigger null detection - smallBuf := make([]byte, 1) - peeker.Read(smallBuf) - - if peeker.IsNull() { - // Confirmed null response - don't let secondary win - if enableDetailedLogs { - log.Printf("Secondary backend %s returned null for %s - waiting for primary", - b.Name, displayMethod) - } - // Close the response body - resp.Body.Close() - return - } - } - } - - // CRITICAL FIX 3: Check for error responses from secondary backends for certain methods - if b.Role == "secondary" && resp.StatusCode == 200 && methodShouldWaitOnSecondaryError(displayMethod) { - // Check Content-Length first - error responses are typically short - contentLength := resp.Header.Get("Content-Length") - if contentLength != "" { - length, err := strconv.Atoi(contentLength) - if err == nil && length < 500 { // Error responses are typically < 500 bytes - // This might be an error response - // Create a peeking reader to confirm - peeker := newPeekingReader(resp.Body, 500) - resp.Body = io.NopCloser(peeker) - - // Read a bit to trigger error detection - smallBuf := make([]byte, 1) - peeker.Read(smallBuf) - - if peeker.HasError() { - // Confirmed error response - don't let secondary win - if enableDetailedLogs { - log.Printf("Secondary backend %s returned JSON-RPC error for %s (Content-Length: %s) - waiting for primary", - b.Name, displayMethod, contentLength) - } - // Close the response body - resp.Body.Close() - return - } - // Not an error, continue with normal flow - } - } else { - // No Content-Length header, use peeking reader anyway for safety - peeker := newPeekingReader(resp.Body, 500) // Peek at first 500 bytes - resp.Body = io.NopCloser(peeker) - - // Read a bit to trigger error detection - smallBuf := make([]byte, 1) - peeker.Read(smallBuf) - - if peeker.HasError() { - // Confirmed error response - don't let secondary win - if enableDetailedLogs { - log.Printf("Secondary backend %s returned JSON-RPC error for %s - waiting for primary", - b.Name, displayMethod) - } - // Close the response body - resp.Body.Close() - return - } - } - } - - // Try to be the first to respond - if responseHandled.CompareAndSwap(false, true) { - if enableDetailedLogs { - log.Printf("Backend %s won the race for method %s with status %d", b.Name, displayMethod, resp.StatusCode) - } - responseChan <- struct { - backend string - resp *http.Response - err error - }{b.Name, resp, nil} - } else { - // Not the winning response, need to drain and close the body - // Use a goroutine with timeout to prevent hanging - go func() { - defer resp.Body.Close() - // Create a deadline for draining - done := make(chan struct{}) - go func() { - io.Copy(io.Discard, resp.Body) - close(done) - }() - - select { - case <-done: - // Drained successfully - case <-time.After(5 * time.Second): - // Timeout draining, just close - if enableDetailedLogs { - log.Printf("Timeout draining response from backend %s", b.Name) - } - } - }() - } - }(backend) - } - - // Wait for the first successful response - var response struct { - backend string - resp *http.Response - err error - } - var responseReceivedTime time.Time - - select { - case response = <-responseChan: - // Got a 
response - responseReceivedTime = time.Now() - case <-time.After(30 * time.Second): - // Timeout - if !responseHandled.CompareAndSwap(false, true) { - // Someone else handled it - response = <-responseChan - responseReceivedTime = time.Now() - } else { - http.Error(w, "Timeout waiting for any backend", http.StatusGatewayTimeout) - // Always wait for primary backend to complete before collecting stats - go func() { - primaryWg.Wait() // Wait for primary first - wg.Wait() // Then wait for all - close(statsChan) - }() - // Collect stats - var stats []ResponseStats - for stat := range statsChan { - stats = append(stats, stat) - } - return - } - } - - // Send the response to the client - if response.err == nil && response.resp != nil { - defer response.resp.Body.Close() - - // Copy response headers - for name, values := range response.resp.Header { - for _, value := range values { - w.Header().Add(name, value) - } - } - - // Ensure we flush data as it arrives for better streaming - flusher, canFlush := w.(http.Flusher) - - w.WriteHeader(response.resp.StatusCode) - - // Track when streaming started - streamingStartTime := time.Now() - - // Stream the response body to the client with proper error handling - done := make(chan error, 1) - go func() { - // Use a custom writer that flushes periodically - var err error - if canFlush { - // Flush every 32KB for better streaming performance - flushingWriter := &flushingResponseWriter{ - ResponseWriter: w, - flusher: flusher, - } - _, err = io.Copy(flushingWriter, response.resp.Body) - } else { - _, err = io.Copy(w, response.resp.Body) - } - done <- err - }() - - select { - case streamErr := <-done: - if streamErr != nil { - if enableDetailedLogs { - log.Printf("Error streaming response body: %v", streamErr) - } - // Connection is likely broken, can't send error to client - } - case <-ctx.Done(): - // Context timeout - client connection might be gone - if enableDetailedLogs { - streamingDuration := time.Since(streamingStartTime) - totalRequestDuration := time.Since(startTime) - - // Determine if it was a timeout or cancellation - reason := "timeout" - if ctx.Err() == context.Canceled { - reason = "client disconnection" - } - - log.Printf("Context cancelled while streaming response from backend '%s' (method: %s) after streaming for %s (total request time: %s) - reason: %s", - response.backend, displayMethod, streamingDuration, totalRequestDuration, reason) - } - } - } else { - // No valid response received from any backend - http.Error(w, "All backends failed", http.StatusBadGateway) - } - - // Collect stats asynchronously to avoid blocking the response - go func() { - // Always wait for primary backend to complete before collecting stats - // This ensures primary backend stats are always included - primaryWg.Wait() // Wait for primary backend to complete first - wg.Wait() // Then wait for all other backends - close(statsChan) - - // Collect stats - var stats []ResponseStats - for stat := range statsChan { - stats = append(stats, stat) - } - - // Log response times if enabled - totalDuration := time.Since(startTime) - if enableDetailedLogs { - logResponseStats(totalDuration, stats) - } - - // Add the actual user-experienced duration for the winning response - if response.err == nil && response.backend != "" { - // Find the stat for the winning backend and update it with the actual user-experienced duration - for i := range stats { - if stats[i].Backend == response.backend && stats[i].Error == nil { - // Calculate user latency from when the first backend 
-// Collect stats asynchronously to avoid blocking the response
-go func() {
-  // Always wait for primary backend to complete before collecting stats
-  // This ensures primary backend stats are always included
-  primaryWg.Wait() // Wait for primary backend to complete first
-  wg.Wait()        // Then wait for all other backends
-  close(statsChan)
-
-  // Collect stats
-  var stats []ResponseStats
-  for stat := range statsChan {
-    stats = append(stats, stat)
-  }
-
-  // Log response times if enabled
-  totalDuration := time.Since(startTime)
-  if enableDetailedLogs {
-    logResponseStats(totalDuration, stats)
-  }
-
-  // Add the actual user-experienced duration for the winning response
-  if response.err == nil && response.backend != "" {
-    // Find the stat for the winning backend and update it with the actual user-experienced duration
-    for i := range stats {
-      if stats[i].Backend == response.backend && stats[i].Error == nil {
-        // Calculate user latency from when the first backend started processing
-        var userLatency time.Duration
-        if firstStart := firstBackendStartTime.Load(); firstStart != nil && !responseReceivedTime.IsZero() {
-          userLatency = responseReceivedTime.Sub(*firstStart)
-        } else {
-          // Fallback to original calculation if somehow we don't have the times
-          userLatency = time.Since(startTime)
-        }
-
-        // Create a special stat entry for the actual first response time
-        actualFirstResponseStat := ResponseStats{
-          Backend:    "actual-first-response",
-          StatusCode: stats[i].StatusCode,
-          Duration:   userLatency,
-          Error:      nil,
-          Method:     stats[i].Method,
-        }
-        stats = append(stats, actualFirstResponseStat)
-        break
-      }
-    }
-  }
-
-  // Send stats to collector
-  statsCollector.AddStats(stats, 0)
-
-  // If this was a successful batch request from primary, add batch stats for CU calculation
-  if batchInfo.IsBatch && response.err == nil && response.backend == "primary" {
-    // Find the primary backend stat with successful response
-    for _, stat := range stats {
-      if stat.Backend == "primary" && stat.Error == nil {
-        statsCollector.AddBatchStats(batchInfo.Methods, stat.Duration, "primary")
-        break
-      }
-    }
-  }
-}()
-
-// Return immediately after sending response to client
-return
-}
-
-func logResponseStats(totalDuration time.Duration, stats []ResponseStats) {
-  // Format: timestamp | total_time | method | backend1:time1 | backend2:time2 | ...
-  var parts []string
-  parts = append(parts, time.Now().Format("2006-01-02 15:04:05.000"))
-  parts = append(parts, fmt.Sprintf("total:%s", totalDuration))
-
-  // Add method if available (use the first stat with a method)
-  method := "unknown"
-  for _, stat := range stats {
-    if stat.Method != "" {
-      method = stat.Method
-      break
-    }
-  }
-  parts = append(parts, fmt.Sprintf("method:%s", method))
-
-  for _, stat := range stats {
-    if stat.Error != nil {
-      parts = append(parts, fmt.Sprintf("%s:error:%s", stat.Backend, stat.Error))
-    } else {
-      parts = append(parts, fmt.Sprintf("%s:%d:%s", stat.Backend, stat.StatusCode, stat.Duration))
-    }
-  }
-
-  fmt.Println(strings.Join(parts, " | "))
-}
-
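For a request raced across two backends, logResponseStats emits one pipe-delimited line per request; with entirely hypothetical backend names and timings it would look like:

2025-01-15 09:30:12.481 | total:184.23ms | method:eth_call | primary:200:183.1ms | archive-2:200:96.4ms

Note the "actual-first-response" synthetic stat is appended after this line is logged, so it appears in the collector's data but not in the log output.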
-// handleWebSocketRequest manages WebSocket proxying
-func handleWebSocketRequest(w http.ResponseWriter, r *http.Request, backends []Backend,
-  httpClient *http.Client, upgrader *websocket.Upgrader,
-  statsCollector *StatsCollector) {
-  // Upgrade the client connection
-  clientConn, err := upgrader.Upgrade(w, r, nil)
-  if err != nil {
-    log.Printf("Failed to upgrade client connection: %v", err)
-    return
-  }
-  defer clientConn.Close()
-
-  // Set max message size for client connection
-  clientConn.SetReadLimit(50 * 1024 * 1024) // 50MB message size limit
-
-  // Connect to all backends
-  var wg sync.WaitGroup
-  primaryConnected := make(chan bool, 1) // Track if primary connected successfully
-
-  for _, backend := range backends {
-    wg.Add(1)
-    go func(b Backend) {
-      defer wg.Done()
-
-      // Create backend URL with ws/wss instead of http/https
-      backendURL := strings.Replace(b.URL, "http://", "ws://", 1)
-      backendURL = strings.Replace(backendURL, "https://", "wss://", 1)
-
-      // Create a clean header map for the dialer
-      header := http.Header{}
-
-      // Only copy non-WebSocket headers
-      if origin := r.Header.Get("Origin"); origin != "" {
-        header.Set("Origin", origin)
-      }
-      if userAgent := r.Header.Get("User-Agent"); userAgent != "" {
-        header.Set("User-Agent", userAgent)
-      }
-
-      startTime := time.Now()
-      // Connect to backend WebSocket with larger buffer sizes
-      dialer := &websocket.Dialer{
-        ReadBufferSize:  20 * 1024 * 1024, // 20MB
-        WriteBufferSize: 20 * 1024 * 1024, // 20MB
-      }
-      backendConn, resp, err := dialer.Dial(backendURL, header)
-      connectDuration := time.Since(startTime)
-
-      stats := WebSocketStats{
-        Backend:     b.Name,
-        ConnectTime: connectDuration,
-        IsActive:    false,
-      }
-
-      if err != nil {
-        status := 0
-        if resp != nil {
-          status = resp.StatusCode
-        }
-        log.Printf("Failed to connect to backend %s: %v (status: %d)", b.Name, err, status)
-        stats.Error = err
-        statsCollector.AddWebSocketStats(stats)
-
-        // If primary failed to connect, signal that
-        if b.Role == "primary" {
-          select {
-          case primaryConnected <- false:
-          default:
-          }
-        }
-        return
-      }
-      defer backendConn.Close()
-
-      // Set max message size for backend connection
-      backendConn.SetReadLimit(50 * 1024 * 1024) // 50MB message size limit
-
-      stats.IsActive = true
-      statsCollector.AddWebSocketStats(stats)
-
-      // If this is the primary backend, signal successful connection
-      if b.Role == "primary" {
-        select {
-        case primaryConnected <- true:
-        default:
-        }
-      }
-
-      // If this is the primary backend, set up bidirectional proxying
-      if b.Role == "primary" {
-        // Channel to signal when primary connection fails
-        primaryFailed := make(chan struct{}, 2) // Buffered for 2 signals
-
-        // Forward messages from client to primary backend
-        go func() {
-          for {
-            messageType, message, err := clientConn.ReadMessage()
-            if err != nil {
-              log.Printf("Error reading from client: %v", err)
-              select {
-              case primaryFailed <- struct{}{}:
-              default:
-              }
-              return
-            }
-
-            err = backendConn.WriteMessage(messageType, message)
-            if err != nil {
-              log.Printf("Error writing to primary backend: %v", err)
-              select {
-              case primaryFailed <- struct{}{}:
-              default:
-              }
-              return
-            }
-          }
-        }()
-
-        // Forward messages from primary backend to client
-        go func() {
-          for {
-            messageType, message, err := backendConn.ReadMessage()
-            if err != nil {
-              log.Printf("Error reading from primary backend: %v", err)
-              select {
-              case primaryFailed <- struct{}{}:
-              default:
-              }
-              return
-            }
-
-            err = clientConn.WriteMessage(messageType, message)
-            if err != nil {
-              log.Printf("Error writing to client: %v", err)
-              select {
-              case primaryFailed <- struct{}{}:
-              default:
-              }
-              return
-            }
-          }
-        }()
-
-        // Wait for primary connection failure
-        <-primaryFailed
-
-        // Primary backend failed, close client connection with proper close message
-        log.Printf("Primary backend WebSocket failed, closing client connection")
-        closeMsg := websocket.FormatCloseMessage(websocket.CloseGoingAway,
-          "Primary backend unavailable")
-        clientConn.WriteControl(websocket.CloseMessage, closeMsg,
-          time.Now().Add(time.Second))
-
-        // Return to trigger cleanup
-        return
-      } else {
-        // For secondary backends, just read and discard messages
-        for {
-          _, _, err := backendConn.ReadMessage()
-          if err != nil {
-            log.Printf("Secondary backend %s connection closed: %v", b.Name, err)
-            return
-          }
-        }
-      }
-    }(backend)
-  }
-
-  // Wait for all connections to terminate
-  wg.Wait()
-
-  // Check if primary connected successfully
-  select {
-  case connected := <-primaryConnected:
-    if !connected {
-      // Primary failed to connect, close client connection
-      log.Printf("Primary backend WebSocket failed to connect, closing client connection")
-      closeMsg := websocket.FormatCloseMessage(websocket.CloseServiceRestart,
-        "Primary backend unavailable")
-      clientConn.WriteControl(websocket.CloseMessage, closeMsg,
-        time.Now().Add(time.Second))
-    }
-  default:
-    // No primary backend in the configuration (shouldn't happen)
-    log.Printf("Warning: No primary backend configured for WebSocket")
-  }
-}
-
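The ws/wss rewrite above relies on strings.Replace and assumes each backend URL starts with exactly one well-formed http/https prefix. A stricter equivalent using net/url, shown only as an alternative sketch (this is not what the file does):

package wsproxy

import (
  "fmt"
  "net/url"
)

// toWebSocketURL converts an http(s) backend URL to its ws(s) form,
// rejecting anything with an unexpected scheme instead of passing it
// through unchanged.
func toWebSocketURL(raw string) (string, error) {
  u, err := url.Parse(raw)
  if err != nil {
    return "", err
  }
  switch u.Scheme {
  case "http":
    u.Scheme = "ws"
  case "https":
    u.Scheme = "wss"
  default:
    return "", fmt.Errorf("unsupported scheme %q", u.Scheme)
  }
  return u.String(), nil
}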
WebSocket") - } -} - -func getEnv(key, fallback string) string { - if value, exists := os.LookupEnv(key); exists { - return value - } - return fallback -} - -// NewChainHeadMonitor creates a new chain head monitor -func NewChainHeadMonitor(backends []Backend, enableDetailedLogs bool) *ChainHeadMonitor { - monitor := &ChainHeadMonitor{ - backends: backends, - chainHeads: make(map[string]*ChainHead), - enabledBackends: make(map[string]bool), - blockHashCache: make(map[string]uint64), - blockHashOrder: make([]string, 0, 128), - wsDialer: &websocket.Dialer{ - ReadBufferSize: 1024 * 1024, // 1MB - WriteBufferSize: 1024 * 1024, // 1MB - }, - stopChan: make(chan struct{}), - enableDetailedLogs: enableDetailedLogs, - } - - // Start monitoring - go monitor.startMonitoring() - - return monitor -} - -// IsBackendHealthy checks if a backend is healthy and at the correct chain head -func (m *ChainHeadMonitor) IsBackendHealthy(backendName string) bool { - m.mu.RLock() - defer m.mu.RUnlock() - - // Primary is always considered healthy for request routing - if backendName == "primary" { - return true - } - - enabled, exists := m.enabledBackends[backendName] - if !exists { - return false // Unknown backend - } - - return enabled -} - -// startMonitoring starts WebSocket monitoring for all backends -func (m *ChainHeadMonitor) startMonitoring() { - // Initial delay to let backends start - time.Sleep(2 * time.Second) - - for _, backend := range m.backends { - go m.monitorBackend(backend) - } - - // Periodic health check - ticker := time.NewTicker(10 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - m.checkBackendHealth() - case <-m.stopChan: - return - } - } -} - -// monitorBackend monitors a single backend via WebSocket -func (m *ChainHeadMonitor) monitorBackend(backend Backend) { - backoffDelay := time.Second - - for { - select { - case <-m.stopChan: - return - default: - } - - // Create WebSocket URL - wsURL := strings.Replace(backend.URL, "http://", "ws://", 1) - wsURL = strings.Replace(wsURL, "https://", "wss://", 1) - - // Connect to WebSocket - conn, _, err := m.wsDialer.Dial(wsURL, nil) - if err != nil { - m.updateBackendStatus(backend.Name, nil, fmt.Sprintf("WebSocket connection failed: %v", err)) - time.Sleep(backoffDelay) - backoffDelay = min(backoffDelay*2, 30*time.Second) - continue - } - - if m.enableDetailedLogs { - log.Printf("Connected to %s WebSocket for chain head monitoring", backend.Name) - } - - // Reset backoff on successful connection - backoffDelay = time.Second - - // Get chain ID first - chainID, err := m.getChainID(conn, backend.Name) - if err != nil { - conn.Close() - m.updateBackendStatus(backend.Name, nil, fmt.Sprintf("Failed to get chain ID: %v", err)) - time.Sleep(backoffDelay) - continue - } - - // Store chain ID (especially important for primary) - if backend.Role == "primary" { - m.mu.Lock() - m.primaryChainID = chainID - m.mu.Unlock() - } - - // Subscribe to new heads - subscribeMsg := json.RawMessage(`{"jsonrpc":"2.0","method":"eth_subscribe","params":["newHeads"],"id":1}`) - err = conn.WriteMessage(websocket.TextMessage, subscribeMsg) - if err != nil { - conn.Close() - m.updateBackendStatus(backend.Name, nil, fmt.Sprintf("Failed to subscribe: %v", err)) - time.Sleep(backoffDelay) - continue - } - - // Read subscription response - _, msg, err := conn.ReadMessage() - if err != nil { - conn.Close() - m.updateBackendStatus(backend.Name, nil, fmt.Sprintf("Failed to read subscription response: %v", err)) - time.Sleep(backoffDelay) - continue - } - 
-// monitorBackend monitors a single backend via WebSocket
-func (m *ChainHeadMonitor) monitorBackend(backend Backend) {
-  backoffDelay := time.Second
-
-  for {
-    select {
-    case <-m.stopChan:
-      return
-    default:
-    }
-
-    // Create WebSocket URL
-    wsURL := strings.Replace(backend.URL, "http://", "ws://", 1)
-    wsURL = strings.Replace(wsURL, "https://", "wss://", 1)
-
-    // Connect to WebSocket
-    conn, _, err := m.wsDialer.Dial(wsURL, nil)
-    if err != nil {
-      m.updateBackendStatus(backend.Name, nil, fmt.Sprintf("WebSocket connection failed: %v", err))
-      time.Sleep(backoffDelay)
-      backoffDelay = min(backoffDelay*2, 30*time.Second)
-      continue
-    }
-
-    if m.enableDetailedLogs {
-      log.Printf("Connected to %s WebSocket for chain head monitoring", backend.Name)
-    }
-
-    // Reset backoff on successful connection
-    backoffDelay = time.Second
-
-    // Get chain ID first
-    chainID, err := m.getChainID(conn, backend.Name)
-    if err != nil {
-      conn.Close()
-      m.updateBackendStatus(backend.Name, nil, fmt.Sprintf("Failed to get chain ID: %v", err))
-      time.Sleep(backoffDelay)
-      continue
-    }
-
-    // Store chain ID (especially important for primary)
-    if backend.Role == "primary" {
-      m.mu.Lock()
-      m.primaryChainID = chainID
-      m.mu.Unlock()
-    }
-
-    // Subscribe to new heads
-    subscribeMsg := json.RawMessage(`{"jsonrpc":"2.0","method":"eth_subscribe","params":["newHeads"],"id":1}`)
-    err = conn.WriteMessage(websocket.TextMessage, subscribeMsg)
-    if err != nil {
-      conn.Close()
-      m.updateBackendStatus(backend.Name, nil, fmt.Sprintf("Failed to subscribe: %v", err))
-      time.Sleep(backoffDelay)
-      continue
-    }
-
-    // Read subscription response
-    _, msg, err := conn.ReadMessage()
-    if err != nil {
-      conn.Close()
-      m.updateBackendStatus(backend.Name, nil, fmt.Sprintf("Failed to read subscription response: %v", err))
-      time.Sleep(backoffDelay)
-      continue
-    }
-
-    var subResponse struct {
-      Result string `json:"result"`
-      Error  *struct {
-        Message string `json:"message"`
-      } `json:"error"`
-    }
-
-    if err := json.Unmarshal(msg, &subResponse); err != nil || subResponse.Error != nil {
-      conn.Close()
-      errMsg := "Subscription failed"
-      if subResponse.Error != nil {
-        errMsg = subResponse.Error.Message
-      }
-      m.updateBackendStatus(backend.Name, nil, errMsg)
-      time.Sleep(backoffDelay)
-      continue
-    }
-
-    // Read new head notifications
-    m.readNewHeads(conn, backend.Name, chainID)
-
-    conn.Close()
-    time.Sleep(backoffDelay)
-  }
-}
-
-// getChainID gets the chain ID from a backend
-func (m *ChainHeadMonitor) getChainID(conn *websocket.Conn, backendName string) (string, error) {
-  // Send eth_chainId request
-  chainIDMsg := json.RawMessage(`{"jsonrpc":"2.0","method":"eth_chainId","params":[],"id":"chainId"}`)
-  if err := conn.WriteMessage(websocket.TextMessage, chainIDMsg); err != nil {
-    return "", err
-  }
-
-  // Set read deadline
-  conn.SetReadDeadline(time.Now().Add(5 * time.Second))
-  defer conn.SetReadDeadline(time.Time{})
-
-  // Read response
-  _, msg, err := conn.ReadMessage()
-  if err != nil {
-    return "", err
-  }
-
-  var response struct {
-    Result string `json:"result"`
-    Error  *struct {
-      Message string `json:"message"`
-    } `json:"error"`
-  }
-
-  if err := json.Unmarshal(msg, &response); err != nil {
-    return "", err
-  }
-
-  if response.Error != nil {
-    return "", fmt.Errorf("RPC error: %s", response.Error.Message)
-  }
-
-  return response.Result, nil
-}
-
-// readNewHeads reads new head notifications from WebSocket
-func (m *ChainHeadMonitor) readNewHeads(conn *websocket.Conn, backendName string, chainID string) {
-  // Set long read deadline for subscriptions
-  conn.SetReadDeadline(time.Now().Add(60 * time.Second))
-
-  for {
-    _, msg, err := conn.ReadMessage()
-    if err != nil {
-      m.updateBackendStatus(backendName, nil, fmt.Sprintf("WebSocket read error: %v", err))
-      return
-    }
-
-    // Reset read deadline after successful read
-    conn.SetReadDeadline(time.Now().Add(60 * time.Second))
-
-    // Parse the subscription notification
-    var notification struct {
-      Params struct {
-        Result struct {
-          Number string `json:"number"` // Hex encoded block number
-          Hash   string `json:"hash"`   // Block hash
-        } `json:"result"`
-      } `json:"params"`
-    }
-
-    if err := json.Unmarshal(msg, &notification); err != nil {
-      continue // Skip malformed messages
-    }
-
-    // Convert hex block number to uint64
-    if notification.Params.Result.Number != "" {
-      blockNumber, err := strconv.ParseUint(strings.TrimPrefix(notification.Params.Result.Number, "0x"), 16, 64)
-      if err != nil {
-        continue
-      }
-
-      head := &ChainHead{
-        BlockNumber: blockNumber,
-        BlockHash:   notification.Params.Result.Hash,
-        ChainID:     chainID,
-        LastUpdate:  time.Now(),
-        IsHealthy:   true,
-      }
-
-      m.updateBackendStatus(backendName, head, "")
-
-      // Cache block hash if this is from primary backend
-      if backendName == "primary" && notification.Params.Result.Hash != "" {
-        m.cacheBlockHash(notification.Params.Result.Hash, blockNumber)
-      }
-
-      if m.enableDetailedLogs {
-        log.Printf("Backend %s at block %d (hash: %s...)",
-          backendName, blockNumber, head.BlockHash[:8])
-      }
-    }
-  }
-}
-
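readNewHeads converts the hex-encoded head number with strconv.ParseUint in base 16. A worked example of that conversion, runnable standalone:

package main

import (
  "fmt"
  "strconv"
  "strings"
)

func main() {
  // 0x12a05f2 = 1*16^6 + 2*16^5 + 10*16^4 + 0*16^3 + 5*16^2 + 15*16 + 2
  //           = 16777216 + 2097152 + 655360 + 0 + 1280 + 240 + 2
  n, err := strconv.ParseUint(strings.TrimPrefix("0x12a05f2", "0x"), 16, 64)
  fmt.Println(n, err) // 19531250 <nil>
}

One small hazard in the removed logging code: head.BlockHash[:8] would panic on a hash shorter than eight characters, so the detailed-log path implicitly trusts the backend to return well-formed hashes.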
-// updateBackendStatus updates the status of a backend
-func (m *ChainHeadMonitor) updateBackendStatus(backendName string, head *ChainHead, errorMsg string) {
-  m.mu.Lock()
-  defer m.mu.Unlock()
-
-  if head != nil {
-    m.chainHeads[backendName] = head
-  } else if errorMsg != "" {
-    // Update or create entry with error
-    if existing, exists := m.chainHeads[backendName]; exists {
-      existing.IsHealthy = false
-      existing.Error = errorMsg
-      existing.LastUpdate = time.Now()
-    } else {
-      m.chainHeads[backendName] = &ChainHead{
-        IsHealthy:  false,
-        Error:      errorMsg,
-        LastUpdate: time.Now(),
-      }
-    }
-  }
-
-  // Update enabled status
-  m.updateEnabledStatus()
-}
-
-// updateEnabledStatus updates which backends are enabled based on chain head
-func (m *ChainHeadMonitor) updateEnabledStatus() {
-  // Get primary chain head
-  primaryHead, primaryExists := m.chainHeads["primary"]
-  if !primaryExists || !primaryHead.IsHealthy {
-    // If primary is not healthy/available (e.g., during restarts), enable all healthy secondary backends
-    if m.enableDetailedLogs {
-      log.Printf("Primary backend not healthy/available - enabling all healthy secondary backends")
-    }
-
-    for _, backend := range m.backends {
-      if backend.Role == "primary" {
-        m.enabledBackends[backend.Name] = true // Always mark primary as enabled for routing
-        continue
-      }
-
-      head, exists := m.chainHeads[backend.Name]
-      if exists && head.IsHealthy {
-        m.enabledBackends[backend.Name] = true
-        if m.enableDetailedLogs {
-          log.Printf("Backend %s enabled (primary unavailable)", backend.Name)
-        }
-      } else {
-        m.enabledBackends[backend.Name] = false
-      }
-    }
-    return
-  }
-
-  // Check each backend
-  for _, backend := range m.backends {
-    if backend.Role == "primary" {
-      m.enabledBackends[backend.Name] = true
-      continue
-    }
-
-    head, exists := m.chainHeads[backend.Name]
-    if !exists || !head.IsHealthy {
-      m.enabledBackends[backend.Name] = false
-      if m.enableDetailedLogs {
-        log.Printf("Backend %s disabled: not healthy", backend.Name)
-      }
-      continue
-    }
-
-    // Check if on same chain
-    if head.ChainID != primaryHead.ChainID {
-      m.enabledBackends[backend.Name] = false
-      if m.enableDetailedLogs {
-        log.Printf("Backend %s disabled: wrong chain ID (got %s, want %s)",
-          backend.Name, head.ChainID, primaryHead.ChainID)
-      }
-      continue
-    }
-
-    // STRICT RULE: Only allow if secondary matches primary height or is ahead
-    if head.BlockNumber < primaryHead.BlockNumber {
-      m.enabledBackends[backend.Name] = false
-      if m.enableDetailedLogs {
-        log.Printf("Backend %s disabled: behind primary (at block %d, primary at %d)",
-          backend.Name, head.BlockNumber, primaryHead.BlockNumber)
-      }
-      continue
-    }
-
-    // Secondary is at same height or ahead - enable it
-    m.enabledBackends[backend.Name] = true
-    if m.enableDetailedLogs {
-      if head.BlockNumber > primaryHead.BlockNumber {
-        log.Printf("Backend %s enabled: ahead of primary (at block %d, primary at %d)",
-          backend.Name, head.BlockNumber, primaryHead.BlockNumber)
-      } else {
-        log.Printf("Backend %s enabled: at same block as primary (%d)",
-          backend.Name, head.BlockNumber)
-      }
-    }
-  }
-}
-
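When the primary head is present and healthy, updateEnabledStatus applies three gates to each secondary, and the decision reduces to a pure predicate. A sketch that mirrors the rule above (field and type names are local to this sketch):

package headmon

// headView holds the subset of ChainHead fields the monitor compares.
type headView struct {
  blockNumber uint64
  chainID     string
  isHealthy   bool
}

// secondaryEnabled mirrors updateEnabledStatus's per-backend rule:
// the secondary must be healthy, on the same chain as primary, and
// at primary's height or ahead of it.
func secondaryEnabled(secondary, primary headView) bool {
  return secondary.isHealthy &&
    secondary.chainID == primary.chainID &&
    secondary.blockNumber >= primary.blockNumber
}

The one exception, handled in the early-return branch above, is an unhealthy or missing primary: then any healthy secondary is enabled regardless of chain ID or height, which keeps the proxy serving through primary restarts.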
-// checkBackendHealth performs periodic health checks
-func (m *ChainHeadMonitor) checkBackendHealth() {
-  m.mu.Lock()
-  defer m.mu.Unlock()
-
-  now := time.Now()
-
-  // Check for stale data (no update in 90 seconds)
-  for _, head := range m.chainHeads {
-    if now.Sub(head.LastUpdate) > 90*time.Second {
-      head.IsHealthy = false
-      head.Error = "No recent updates"
-    }
-  }
-
-  // Update enabled status
-  m.updateEnabledStatus()
-
-  // Log current status if detailed logs enabled
-  if m.enableDetailedLogs {
-    log.Printf("Chain head monitor status:")
-    for name, enabled := range m.enabledBackends {
-      status := "disabled"
-      if enabled {
-        status = "enabled"
-      }
-
-      if head, exists := m.chainHeads[name]; exists {
-        if head.IsHealthy {
-          log.Printf("  %s: %s, block %d, chain %s", name, status, head.BlockNumber, head.ChainID)
-        } else {
-          log.Printf("  %s: %s, error: %s", name, status, head.Error)
-        }
-      } else {
-        log.Printf("  %s: %s, no data", name, status)
-      }
-    }
-  }
-}
-
-// GetStatus returns the current status of all backends
-func (m *ChainHeadMonitor) GetStatus() map[string]ChainHead {
-  m.mu.RLock()
-  defer m.mu.RUnlock()
-
-  status := make(map[string]ChainHead)
-  for name, head := range m.chainHeads {
-    status[name] = *head
-  }
-  return status
-}
-
-// cacheBlockHash adds a block hash to the cache, maintaining a maximum of 128 entries
-func (m *ChainHeadMonitor) cacheBlockHash(blockHash string, blockNumber uint64) {
-  m.mu.Lock()
-  defer m.mu.Unlock()
-
-  // Check if hash already exists
-  if _, exists := m.blockHashCache[blockHash]; exists {
-    return
-  }
-
-  // Add to cache
-  m.blockHashCache[blockHash] = blockNumber
-  m.blockHashOrder = append(m.blockHashOrder, blockHash)
-
-  // Maintain maximum cache size of 128 blocks
-  if len(m.blockHashOrder) > 128 {
-    // Remove oldest entry
-    oldestHash := m.blockHashOrder[0]
-    delete(m.blockHashCache, oldestHash)
-    m.blockHashOrder = m.blockHashOrder[1:]
-  }
-}
-
-// GetBlockNumberForHash returns the block number for a given hash if it's in the cache
-func (m *ChainHeadMonitor) GetBlockNumberForHash(blockHash string) (uint64, bool) {
-  m.mu.RLock()
-  defer m.mu.RUnlock()
-
-  blockNumber, exists := m.blockHashCache[blockHash]
-  return blockNumber, exists
-}
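cacheBlockHash behaves as a 128-entry FIFO keyed by block hash: duplicates are ignored, and once full, the oldest insertion is evicted. A standalone illustration of that eviction behavior, with a tiny capacity so the eviction is visible (capacity and hash values are illustrative only):

package main

import "fmt"

const cacheCap = 3 // the monitor uses 128; 3 keeps the demo short

func main() {
  cache := map[string]uint64{}
  order := make([]string, 0, cacheCap)

  put := func(hash string, num uint64) {
    if _, ok := cache[hash]; ok {
      return // same dedup rule as cacheBlockHash
    }
    cache[hash] = num
    order = append(order, hash)
    if len(order) > cacheCap {
      delete(cache, order[0]) // evict the oldest insertion
      order = order[1:]
    }
  }

  for i, h := range []string{"0xa", "0xb", "0xc", "0xd"} {
    put(h, uint64(100+i))
  }
  _, stillThere := cache["0xa"]
  fmt.Println(stillThere, len(cache)) // false 3: "0xa" was evicted first
}

Since new-head notifications arrive roughly once per block, 128 entries give GetBlockNumberForHash a lookback window of about 128 recent blocks, which bounds memory while still answering hash-tagged requests for recent history.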