// hi package main import ( "bytes" "context" "encoding/json" "fmt" "io" "log" "net/http" "os" "sort" "strconv" "strings" "sync" "sync/atomic" "time" "github.com/gorilla/websocket" ) // Simple structure to extract just the method from JSON-RPC requests type JSONRPCRequest struct { Method string `json:"method"` } // BatchInfo contains information about a batch request type BatchInfo struct { IsBatch bool Methods []string RequestCount int HasStateful bool BlockTags []string // Added to track block tags in batch RequiresPrimary bool // Added to indicate if batch requires primary due to block tags } // parseBatchInfo analyzes the request body to extract method information func parseBatchInfo(body []byte) (*BatchInfo, error) { // Check for empty body if len(body) == 0 { return nil, fmt.Errorf("empty request body") } // Try parsing as array first (batch request) var batchReqs []JSONRPCRequest if err := json.Unmarshal(body, &batchReqs); err == nil { // It's a batch request info := &BatchInfo{ IsBatch: true, RequestCount: len(batchReqs), Methods: make([]string, 0, len(batchReqs)), BlockTags: make([]string, 0), } // Extract methods and check for stateful ones methodSet := make(map[string]bool) // Track unique methods for _, req := range batchReqs { if req.Method != "" { info.Methods = append(info.Methods, req.Method) methodSet[req.Method] = true if isStatefulMethod(req.Method) || requiresPrimaryOnlyMethod(req.Method) { info.HasStateful = true } } } // Extract block tags from the batch blockTags, err := parseBlockTagsFromBatch(body) if err == nil { info.BlockTags = blockTags // Check if any block tag requires primary for _, tag := range blockTags { if requiresPrimaryBackend(tag) { info.RequiresPrimary = true break } } } return info, nil } // Try parsing as single request var singleReq JSONRPCRequest if err := json.Unmarshal(body, &singleReq); err == nil { info := &BatchInfo{ IsBatch: false, Methods: []string{singleReq.Method}, RequestCount: 1, HasStateful: isStatefulMethod(singleReq.Method) || requiresPrimaryOnlyMethod(singleReq.Method), BlockTags: make([]string, 0), } // Extract block tag from single request reqInfo, err := parseRequestInfo(body) if err == nil && reqInfo.BlockTag != "" { info.BlockTags = []string{reqInfo.BlockTag} info.RequiresPrimary = requiresPrimaryBackend(reqInfo.BlockTag) } return info, nil } // Neither batch nor single request return nil, fmt.Errorf("invalid JSON-RPC request format") } // calculateBatchDelay determines the appropriate delay for a batch request for a specific backend func calculateBatchDelay(methods []string, backendName string, probe *SecondaryProbe, stats *StatsCollector) time.Duration { var maxDelay time.Duration for _, method := range methods { var delay time.Duration if probe != nil { delay = probe.getDelayForBackendAndMethod(backendName, method) } else { delay = stats.GetPrimaryP75ForMethod(method) } if delay > maxDelay { maxDelay = delay } } // If no methods or all unknown, use a default if maxDelay == 0 { if probe != nil { return probe.minResponseTime + probe.minDelayBuffer } return 15 * time.Millisecond // Default fallback } return maxDelay } // formatMethodList creates a readable string from method list for logging func formatMethodList(methods []string) string { if len(methods) == 0 { return "[]" } if len(methods) <= 3 { return fmt.Sprintf("%v", methods) } // Show first 3 methods + count of remaining return fmt.Sprintf("[%s, %s, %s, ... +%d more]", methods[0], methods[1], methods[2], len(methods)-3) } type Backend struct { URL string Name string Role string } type ResponseStats struct { Backend string StatusCode int Duration time.Duration Error error Method string // Added method field } // WebSocketStats tracks information about websocket connections type WebSocketStats struct { Backend string Error error ConnectTime time.Duration IsActive bool MessagesSent int MessagesReceived int } // CUDataPoint represents a historical CU data point with timestamp type CUDataPoint struct { Timestamp time.Time // End time of the interval CU int } // StatsCollector maintains statistics for periodic summaries type StatsCollector struct { mu sync.Mutex requestStats []ResponseStats methodStats map[string][]time.Duration // Track durations by method backendMethodStats map[string]map[string][]time.Duration // Track durations by backend and method backendWins map[string]int // Track how many times each backend responded first methodBackendWins map[string]map[string]int // Track wins per method per backend firstResponseDurations []time.Duration // Track durations of first successful responses (from winning backend's perspective) actualFirstResponseDurations []time.Duration // Track actual user-experienced durations methodFirstResponseDurations map[string][]time.Duration // Track first response durations by method (winning backend's perspective) methodActualFirstResponseDurations map[string][]time.Duration // Track actual user-experienced durations by method totalRequests int errorCount int wsConnections []WebSocketStats // Track websocket connections totalWsConnections int appStartTime time.Time // Application start time (never reset) intervalStartTime time.Time // Current interval start time (reset each interval) summaryInterval time.Duration methodCUPrices map[string]int // Map of method names to CU prices totalCU int // Total CU earned methodCU map[string]int // Track CU earned per method historicalCU []CUDataPoint // Historical CU data for different time windows hasSecondaryBackends bool // Track if secondary backends are configured skippedSecondaryRequests int // Track how many secondary requests were skipped secondaryProbe *SecondaryProbe // Reference to secondary probe chainHeadMonitor *ChainHeadMonitor // Reference to chain head monitor } // SecondaryProbe maintains latency information for secondary backends through active probing type SecondaryProbe struct { mu sync.RWMutex backends []Backend client *http.Client minResponseTime time.Duration // Overall minimum response time methodTimings map[string]time.Duration // Per-method minimum response times backendTimings map[string]time.Duration // Per-backend minimum response times lastProbeTime time.Time probeInterval time.Duration minDelayBuffer time.Duration // Buffer to add to minimum times probeMethods []string enableDetailedLogs bool failureCount int // Track consecutive probe failures lastSuccessTime time.Time // Last time probes succeeded } // ChainHeadMonitor monitors chain heads of all backends via WebSocket subscriptions type ChainHeadMonitor struct { mu sync.RWMutex backends []Backend chainHeads map[string]*ChainHead // backend name -> chain head info primaryChainID string // Chain ID of primary backend enabledBackends map[string]bool // Track which backends are enabled blockHashCache map[string]uint64 // block hash -> block number cache (last 128 blocks from primary) blockHashOrder []string // ordered list of block hashes (oldest first) wsDialer *websocket.Dialer stopChan chan struct{} enableDetailedLogs bool } // ChainHead tracks the current head of a backend type ChainHead struct { BlockNumber uint64 // Current block number BlockHash string // Current block hash ChainID string // Chain ID LastUpdate time.Time // Last time we received an update IsHealthy bool // Whether this backend is healthy Error string // Last error if any } // MethodRouting contains configuration for method routing decisions type MethodRouting struct { SecondaryWhitelist map[string]bool // Methods allowed on secondary backends PreferSecondary map[string]bool // Methods that should prefer secondary backends } // RequestInfo contains parsed information about a JSON-RPC request type RequestInfo struct { Method string BlockTag string HasParams bool } // Full JSON-RPC request structure for parsing parameters type JSONRPCFullRequest struct { Method string `json:"method"` Params json.RawMessage `json:"params"` ID interface{} `json:"id"` } // parseRequestInfo extracts detailed information from a JSON-RPC request func parseRequestInfo(body []byte) (*RequestInfo, error) { var req JSONRPCFullRequest if err := json.Unmarshal(body, &req); err != nil { return nil, err } info := &RequestInfo{ Method: req.Method, HasParams: len(req.Params) > 0, } // Special handling for eth_getLogs if req.Method == "eth_getLogs" && info.HasParams { blockTags, err := parseEthLogsFilter(req.Params) if err == nil && len(blockTags) > 0 { // For eth_getLogs, we'll return the first block tag that requires primary routing // or "latest" if any of them is "latest" for _, tag := range blockTags { if requiresPrimaryBackend(tag) || tag == "latest" { info.BlockTag = tag break } } // If no special tags found but we have tags, use the first one if info.BlockTag == "" && len(blockTags) > 0 { info.BlockTag = blockTags[0] } } return info, nil } // Special handling for trace_filter if req.Method == "trace_filter" && info.HasParams { blockTags, err := parseTraceFilter(req.Params) if err == nil && len(blockTags) > 0 { // For trace_filter, we'll return the first block tag that requires primary routing // or "latest" if any of them is "latest" for _, tag := range blockTags { if requiresPrimaryBackend(tag) || tag == "latest" { info.BlockTag = tag break } } // If no special tags found but we have tags, use the first one if info.BlockTag == "" && len(blockTags) > 0 { info.BlockTag = blockTags[0] } } return info, nil } // Methods that commonly use block tags methodsWithBlockTags := map[string]int{ "eth_getBalance": -1, // last param "eth_getCode": -1, // last param "eth_getTransactionCount": -1, // last param "eth_getStorageAt": -1, // last param "eth_call": -1, // last param "eth_estimateGas": -1, // last param "eth_getProof": -1, // last param "eth_getBlockByNumber": 0, // first param "eth_getBlockTransactionCountByNumber": 0, // first param "eth_getTransactionByBlockNumberAndIndex": 0, // first param "eth_getUncleByBlockNumberAndIndex": 0, // first param "eth_getUncleCountByBlockNumber": 0, // first param // Trace methods that use block tags "trace_block": 0, // first param (block number/tag) "trace_replayBlockTransactions": 0, // first param (block number/tag) "trace_call": -1, // last param (block tag) // Debug methods that use block tags "debug_traceBlockByNumber": 0, // first param (block number/tag) "debug_traceCall": 1, // SPECIAL: second param (call object, block tag, trace config) // Note: eth_getLogs uses a filter object with fromBlock/toBlock fields, // which is handled specially above // Note: trace_filter uses a filter object similar to eth_getLogs, // which needs special handling } // Methods that use block hashes as parameters methodsWithBlockHashes := map[string]int{ "eth_getBlockByHash": 0, // first param "eth_getBlockTransactionCountByHash": 0, // first param "eth_getTransactionByBlockHashAndIndex": 0, // first param "eth_getUncleByBlockHashAndIndex": 0, // first param "eth_getUncleCountByBlockHash": 0, // first param "debug_traceBlockByHash": 0, // first param } // Check for block hash methods first paramPos, hasBlockHash := methodsWithBlockHashes[req.Method] if hasBlockHash && info.HasParams { // Parse params as array var params []json.RawMessage if err := json.Unmarshal(req.Params, ¶ms); err == nil && len(params) > paramPos { // Try to parse as string (block hash) var blockHash string if err := json.Unmarshal(params[paramPos], &blockHash); err == nil { info.BlockTag = blockHash return info, nil } } } paramPos, hasBlockTag := methodsWithBlockTags[req.Method] if !hasBlockTag || !info.HasParams { return info, nil } // Parse params as array var params []json.RawMessage if err := json.Unmarshal(req.Params, ¶ms); err != nil { // Not an array, might be object params return info, nil } if len(params) == 0 { return info, nil } // Determine which parameter to check var blockTagParam json.RawMessage if paramPos == -1 { // Last parameter blockTagParam = params[len(params)-1] } else if paramPos < len(params) { // Specific position blockTagParam = params[paramPos] } else { return info, nil } // Special handling for debug_traceCall where position 1 might be omitted // If we're checking position 1 but only have 2 params, the middle param might be omitted if req.Method == "debug_traceCall" && paramPos == 1 && len(params) == 2 { // With only 2 params, it's likely (call_object, trace_config) without block tag // The block tag would default to "latest" on the backend info.BlockTag = "latest" return info, nil } // Try to parse as string (block tag) var blockTag string if err := json.Unmarshal(blockTagParam, &blockTag); err == nil { info.BlockTag = blockTag } return info, nil } // parseBlockTagsFromBatch extracts block tags from all requests in a batch func parseBlockTagsFromBatch(body []byte) ([]string, error) { var batchReqs []JSONRPCFullRequest if err := json.Unmarshal(body, &batchReqs); err != nil { return nil, err } blockTags := make([]string, 0) for _, req := range batchReqs { // Special handling for eth_getLogs if req.Method == "eth_getLogs" && len(req.Params) > 0 { logsTags, err := parseEthLogsFilter(req.Params) if err == nil { blockTags = append(blockTags, logsTags...) } continue } // Special handling for trace_filter if req.Method == "trace_filter" && len(req.Params) > 0 { traceTags, err := parseTraceFilter(req.Params) if err == nil { blockTags = append(blockTags, traceTags...) } continue } // Regular handling for other methods reqBytes, err := json.Marshal(req) if err != nil { continue } info, err := parseRequestInfo(reqBytes) if err != nil { continue } if info.BlockTag != "" { blockTags = append(blockTags, info.BlockTag) } } return blockTags, nil } // parseEthLogsFilter extracts block tags from eth_getLogs filter parameter func parseEthLogsFilter(params json.RawMessage) ([]string, error) { // eth_getLogs takes a single filter object parameter var paramArray []json.RawMessage if err := json.Unmarshal(params, ¶mArray); err != nil { return nil, err } if len(paramArray) == 0 { return nil, nil } // Parse the filter object var filter struct { FromBlock json.RawMessage `json:"fromBlock"` ToBlock json.RawMessage `json:"toBlock"` } if err := json.Unmarshal(paramArray[0], &filter); err != nil { return nil, err } blockTags := make([]string, 0, 2) // Extract fromBlock if present if len(filter.FromBlock) > 0 { var fromBlock string if err := json.Unmarshal(filter.FromBlock, &fromBlock); err == nil && fromBlock != "" { blockTags = append(blockTags, fromBlock) } } // Extract toBlock if present if len(filter.ToBlock) > 0 { var toBlock string if err := json.Unmarshal(filter.ToBlock, &toBlock); err == nil && toBlock != "" { blockTags = append(blockTags, toBlock) } } return blockTags, nil } // parseTraceFilter extracts block tags from trace_filter filter parameter func parseTraceFilter(params json.RawMessage) ([]string, error) { // trace_filter takes a single filter object parameter var paramArray []json.RawMessage if err := json.Unmarshal(params, ¶mArray); err != nil { return nil, err } if len(paramArray) == 0 { return nil, nil } // Parse the filter object var filter struct { FromBlock json.RawMessage `json:"fromBlock"` ToBlock json.RawMessage `json:"toBlock"` } if err := json.Unmarshal(paramArray[0], &filter); err != nil { return nil, err } blockTags := make([]string, 0, 2) // Extract fromBlock if present if len(filter.FromBlock) > 0 { var fromBlock string if err := json.Unmarshal(filter.FromBlock, &fromBlock); err == nil && fromBlock != "" { blockTags = append(blockTags, fromBlock) } } // Extract toBlock if present if len(filter.ToBlock) > 0 { var toBlock string if err := json.Unmarshal(filter.ToBlock, &toBlock); err == nil && toBlock != "" { blockTags = append(blockTags, toBlock) } } return blockTags, nil } // requiresPrimaryBackend checks if a request must be routed to primary based on block tag func requiresPrimaryBackend(blockTag string) bool { // These block tags must always go to primary primaryOnlyTags := map[string]bool{ "finalized": true, "pending": true, "safe": true, } return primaryOnlyTags[blockTag] } // canUseSecondaryForLatest checks if secondary backend can be used for "latest" block tag func canUseSecondaryForLatest(blockTag string, backendName string, chainHeadMonitor *ChainHeadMonitor) bool { // Only check for "latest" tag if blockTag != "latest" { // For non-latest tags (like specific block numbers), follow existing rules return true } if chainHeadMonitor == nil { // No monitor, can't verify - be conservative return false } // Get chain head status chainStatus := chainHeadMonitor.GetStatus() primaryHead, primaryExists := chainStatus["primary"] if !primaryExists || !primaryHead.IsHealthy { // Primary not healthy, allow secondary return true } secondaryHead, secondaryExists := chainStatus[backendName] if !secondaryExists || !secondaryHead.IsHealthy { // Secondary not healthy return false } // For "latest", secondary must be at EXACTLY the same block height return secondaryHead.BlockNumber == primaryHead.BlockNumber } // canUseSecondaryForBlockTag checks if secondary backend can be used for a given block tag func canUseSecondaryForBlockTag(blockTag string, backendName string, chainHeadMonitor *ChainHeadMonitor) bool { if chainHeadMonitor == nil { // No monitor, can't verify - be conservative return false } // Get chain head status chainStatus := chainHeadMonitor.GetStatus() primaryHead, primaryExists := chainStatus["primary"] if !primaryExists || !primaryHead.IsHealthy { // Primary not healthy, allow secondary return true } secondaryHead, secondaryExists := chainStatus[backendName] if !secondaryExists || !secondaryHead.IsHealthy { // Secondary not healthy return false } // Handle "latest" tag - secondary must be at EXACTLY the same block height if blockTag == "latest" { return secondaryHead.BlockNumber == primaryHead.BlockNumber } // Handle "earliest" tag - always allowed if blockTag == "earliest" { return true } // Check if it's a block hash (0x followed by 64 hex chars) if len(blockTag) == 66 && strings.HasPrefix(blockTag, "0x") { // Try to look up the block number from our cache if blockNumber, exists := chainHeadMonitor.GetBlockNumberForHash(blockTag); exists { // We know this block number, check if secondary has it return secondaryHead.BlockNumber >= blockNumber } // Unknown block hash - be conservative and route to primary return false } // Check if it's a numeric block tag (hex number) if strings.HasPrefix(blockTag, "0x") { blockNumber, err := strconv.ParseUint(strings.TrimPrefix(blockTag, "0x"), 16, 64) if err == nil { // Valid block number - check if secondary has reached it return secondaryHead.BlockNumber >= blockNumber } } // Unknown block tag format - be conservative return false } func NewStatsCollector(summaryInterval time.Duration, hasSecondaryBackends bool) *StatsCollector { now := time.Now() sc := &StatsCollector{ requestStats: make([]ResponseStats, 0, 1000), methodStats: make(map[string][]time.Duration), backendMethodStats: make(map[string]map[string][]time.Duration), backendWins: make(map[string]int), methodBackendWins: make(map[string]map[string]int), firstResponseDurations: make([]time.Duration, 0, 1000), actualFirstResponseDurations: make([]time.Duration, 0, 1000), methodFirstResponseDurations: make(map[string][]time.Duration), methodActualFirstResponseDurations: make(map[string][]time.Duration), appStartTime: now, intervalStartTime: now, summaryInterval: summaryInterval, methodCUPrices: initCUPrices(), // Initialize CU prices methodCU: make(map[string]int), historicalCU: make([]CUDataPoint, 0, 2000), // Store up to ~24 hours of 1-minute intervals hasSecondaryBackends: hasSecondaryBackends, } // Start the periodic summary goroutine go sc.periodicSummary() return sc } // SetSecondaryProbe sets the secondary probe reference after stats collector is created func (sc *StatsCollector) SetSecondaryProbe(probe *SecondaryProbe) { sc.mu.Lock() defer sc.mu.Unlock() sc.secondaryProbe = probe } // SetChainHeadMonitor sets the chain head monitor reference after stats collector is created func (sc *StatsCollector) SetChainHeadMonitor(monitor *ChainHeadMonitor) { sc.mu.Lock() defer sc.mu.Unlock() sc.chainHeadMonitor = monitor } // NewSecondaryProbe creates a new secondary probe instance func NewSecondaryProbe(backends []Backend, client *http.Client, probeInterval time.Duration, minDelayBuffer time.Duration, probeMethods []string, enableDetailedLogs bool) *SecondaryProbe { // Filter only secondary backends var secondaryBackends []Backend for _, b := range backends { if b.Role == "secondary" { secondaryBackends = append(secondaryBackends, b) } } if len(secondaryBackends) == 0 { return nil } sp := &SecondaryProbe{ backends: secondaryBackends, client: client, minResponseTime: 15 * time.Millisecond, // Start with reasonable default methodTimings: make(map[string]time.Duration), backendTimings: make(map[string]time.Duration), probeInterval: probeInterval, minDelayBuffer: minDelayBuffer, probeMethods: probeMethods, enableDetailedLogs: enableDetailedLogs, lastSuccessTime: time.Now(), } // Run initial probe immediately go func() { sp.runProbe() // Then start periodic probing sp.startPeriodicProbing() }() return sp } // getDelayForMethod returns the appropriate delay for a given method func (sp *SecondaryProbe) getDelayForMethod(method string) time.Duration { sp.mu.RLock() defer sp.mu.RUnlock() // If probes have been failing, use a conservative fallback if sp.failureCount > 3 && time.Since(sp.lastSuccessTime) > 5*time.Minute { return 20 * time.Millisecond // Conservative fallback } // Use method-specific timing if available if timing, exists := sp.methodTimings[method]; exists { return timing + sp.minDelayBuffer } // Fall back to general minimum return sp.minResponseTime + sp.minDelayBuffer } // getDelayForBackendAndMethod returns the appropriate delay for a specific backend and method func (sp *SecondaryProbe) getDelayForBackendAndMethod(backend, method string) time.Duration { sp.mu.RLock() defer sp.mu.RUnlock() // Start with backend-specific timing delay := sp.minResponseTime if backendTiming, exists := sp.backendTimings[backend]; exists { delay = backendTiming } // Use method-specific timing if it's longer if methodTiming, exists := sp.methodTimings[method]; exists && methodTiming > delay { delay = methodTiming } return delay + sp.minDelayBuffer } // runProbe performs a single probe cycle to all secondary backends func (sp *SecondaryProbe) runProbe() { newMethodTimings := make(map[string]time.Duration) newBackendTimings := make(map[string]time.Duration) successfulProbes := 0 for _, backend := range sp.backends { backendMin := time.Hour // Start with large value for _, method := range sp.probeMethods { methodMin := time.Hour // Track minimum for this method on this backend methodSuccesses := 0 // Perform 10 probes for this method and take the minimum for probe := 0; probe < 10; probe++ { reqBody := []byte(fmt.Sprintf( `{"jsonrpc":"2.0","method":"%s","params":[],"id":"probe-%d-%d"}`, method, time.Now().UnixNano(), probe, )) req, err := http.NewRequest("POST", backend.URL, bytes.NewReader(reqBody)) if err != nil { continue } req.Header.Set("Content-Type", "application/json") // Ensure connection reuse by setting Connection: keep-alive req.Header.Set("Connection", "keep-alive") start := time.Now() resp, err := sp.client.Do(req) duration := time.Since(start) if err == nil && resp != nil { resp.Body.Close() if resp.StatusCode == 200 { methodSuccesses++ successfulProbes++ // Track minimum for this method on this backend if duration < methodMin { methodMin = duration } if sp.enableDetailedLogs { log.Printf("Probe %d/10: backend=%s method=%s duration=%s status=%d (min so far: %s)", probe+1, backend.Name, method, duration, resp.StatusCode, methodMin) } } } // Small delay between probes to avoid overwhelming the backend if probe < 9 { // Don't delay after the last probe time.Sleep(10 * time.Millisecond) } } // Only use this method's timing if we had successful probes if methodSuccesses > 0 && methodMin < time.Hour { // Update method timing (use minimum across all backends) if currentMin, exists := newMethodTimings[method]; !exists || methodMin < currentMin { newMethodTimings[method] = methodMin } // Track backend minimum if methodMin < backendMin { backendMin = methodMin } if sp.enableDetailedLogs { log.Printf("Method %s on backend %s: %d/10 successful probes, min duration: %s", method, backend.Name, methodSuccesses, methodMin) } } } // Store backend minimum if we got any successful probes if backendMin < time.Hour { newBackendTimings[backend.Name] = backendMin } } // Update timings if we got successful probes sp.mu.Lock() defer sp.mu.Unlock() if successfulProbes > 0 { sp.failureCount = 0 sp.lastSuccessTime = time.Now() // Update method timings for method, timing := range newMethodTimings { sp.methodTimings[method] = timing } // Update backend timings for backend, timing := range newBackendTimings { sp.backendTimings[backend] = timing } // Update overall minimum overallMin := time.Hour for _, timing := range newBackendTimings { if timing < overallMin { overallMin = timing } } if overallMin < time.Hour { sp.minResponseTime = overallMin } sp.lastProbeTime = time.Now() if sp.enableDetailedLogs { log.Printf("Probe complete: min=%s methods=%v backends=%v", sp.minResponseTime, sp.methodTimings, sp.backendTimings) } } else { sp.failureCount++ if sp.enableDetailedLogs { log.Printf("Probe failed: consecutive failures=%d", sp.failureCount) } } } // startPeriodicProbing runs probes at regular intervals func (sp *SecondaryProbe) startPeriodicProbing() { ticker := time.NewTicker(sp.probeInterval) defer ticker.Stop() for range ticker.C { sp.runProbe() } } // initCUPrices initializes the map of method names to their CU prices func initCUPrices() map[string]int { return map[string]int{ "debug_traceBlockByHash": 90, "debug_traceBlockByNumber": 90, "debug_traceCall": 90, "debug_traceTransaction": 90, "debug_storageRangeAt": 50, // Storage access method "eth_accounts": 0, "eth_blockNumber": 10, "eth_call": 21, "eth_chainId": 0, "eth_coinbase": 0, "eth_createAccessList": 30, "eth_estimateGas": 60, "eth_feeHistory": 15, "eth_gasPrice": 15, "eth_getBalance": 11, "eth_getBlockByHash": 21, "eth_getBlockByHash#full": 60, "eth_getBlockByNumber": 24, "eth_getBlockByNumber#full": 60, "eth_getBlockReceipts": 80, "eth_getBlockTransactionCountByHash": 15, "eth_getBlockTransactionCountByNumber": 11, "eth_getCode": 24, "eth_getFilterChanges": 20, "eth_getFilterLogs": 60, "eth_getLogs": 60, "eth_getProof": 11, "eth_getStorageAt": 14, "eth_getTransactionByBlockHashAndIndex": 19, "eth_getTransactionByBlockNumberAndIndex": 13, "eth_getTransactionByHash": 11, "eth_getTransactionCount": 11, "eth_getTransactionReceipt": 30, "eth_getUncleByBlockHashAndIndex": 15, "eth_getUncleByBlockNumberAndIndex": 15, "eth_getUncleCountByBlockHash": 15, "eth_getUncleCountByBlockNumber": 15, "eth_hashrate": 0, "eth_maxPriorityFeePerGas": 16, "eth_mining": 0, "eth_newBlockFilter": 20, "eth_newFilter": 20, "eth_newPendingTransactionFilter": 20, "eth_protocolVersion": 0, "eth_sendRawTransaction": 90, "eth_syncing": 0, "eth_subscribe": 10, "eth_subscription": 25, // For "Notifications from the events you've subscribed to" "eth_uninstallFilter": 10, "eth_unsubscribe": 10, "net_listening": 0, "net_peerCount": 0, "net_version": 0, "trace_block": 90, "trace_call": 60, "trace_callMany": 90, "trace_filter": 75, "trace_get": 20, "trace_rawTransaction": 75, "trace_replayBlockTransactions": 90, "trace_replayBlockTransactions#vmTrace": 300, "trace_replayTransaction": 90, "trace_replayTransaction#vmTrace": 300, "trace_transaction": 90, "txpool_content": 1000, "web3_clientVersion": 0, "web3_sha3": 10, "bor_getAuthor": 10, "bor_getCurrentProposer": 10, "bor_getCurrentValidators": 10, "bor_getRootHash": 10, "bor_getSignersAtHash": 10, } } func (sc *StatsCollector) AddStats(stats []ResponseStats, totalDuration time.Duration) { sc.mu.Lock() defer sc.mu.Unlock() // Find the fastest successful response and actual first response var fastestBackend string var fastestDuration time.Duration = time.Hour // Initialize with a very large duration var actualFirstDuration time.Duration var method string var hasActualFirst bool for _, stat := range stats { if stat.Backend == "actual-first-response" { actualFirstDuration = stat.Duration hasActualFirst = true method = stat.Method } else if stat.Error == nil && stat.Duration < fastestDuration { fastestDuration = stat.Duration fastestBackend = stat.Backend if method == "" { method = stat.Method } } } // Track the win if we found a successful response if fastestBackend != "" { sc.backendWins[fastestBackend]++ // Track wins per method if _, exists := sc.methodBackendWins[method]; !exists { sc.methodBackendWins[method] = make(map[string]int) } sc.methodBackendWins[method][fastestBackend]++ // Track first response duration (from winning backend's perspective) sc.firstResponseDurations = append(sc.firstResponseDurations, fastestDuration) // Track first response duration by method if _, exists := sc.methodFirstResponseDurations[method]; !exists { sc.methodFirstResponseDurations[method] = make([]time.Duration, 0, 100) } sc.methodFirstResponseDurations[method] = append(sc.methodFirstResponseDurations[method], fastestDuration) // Track actual first response duration if available if hasActualFirst { sc.actualFirstResponseDurations = append(sc.actualFirstResponseDurations, actualFirstDuration) if _, exists := sc.methodActualFirstResponseDurations[method]; !exists { sc.methodActualFirstResponseDurations[method] = make([]time.Duration, 0, 100) } sc.methodActualFirstResponseDurations[method] = append(sc.methodActualFirstResponseDurations[method], actualFirstDuration) } } // Add stats to the collection (skip actual-first-response as it's synthetic) for _, stat := range stats { if stat.Backend == "actual-first-response" { continue // Don't add synthetic entries to regular stats } sc.requestStats = append(sc.requestStats, stat) if stat.Error != nil { // Don't count skipped secondary backends as errors if !strings.Contains(stat.Error.Error(), "skipped - primary responded") { sc.errorCount++ } else { // Track that we skipped a secondary request sc.skippedSecondaryRequests++ } } // Track method-specific stats for all backends if stat.Error == nil { // Initialize backend map if not exists if _, exists := sc.backendMethodStats[stat.Backend]; !exists { sc.backendMethodStats[stat.Backend] = make(map[string][]time.Duration) } // Initialize method array if not exists if _, exists := sc.backendMethodStats[stat.Backend][stat.Method]; !exists { sc.backendMethodStats[stat.Backend][stat.Method] = make([]time.Duration, 0, 100) } // Add the duration sc.backendMethodStats[stat.Backend][stat.Method] = append( sc.backendMethodStats[stat.Backend][stat.Method], stat.Duration) // Keep tracking primary backend in the old way for backward compatibility if stat.Backend == "primary" { // Handle batch requests specially for CU calculation if strings.HasPrefix(stat.Method, "batch[") && len(stat.Method) > 6 { // Don't track batch as a method, it will be handled separately } else { if _, exists := sc.methodStats[stat.Method]; !exists { sc.methodStats[stat.Method] = make([]time.Duration, 0, 100) } sc.methodStats[stat.Method] = append(sc.methodStats[stat.Method], stat.Duration) // Add CU for this method cuValue := sc.methodCUPrices[stat.Method] sc.totalCU += cuValue sc.methodCU[stat.Method] += cuValue } } } } sc.totalRequests++ } // AddBatchStats adds statistics for a batch request func (sc *StatsCollector) AddBatchStats(methods []string, duration time.Duration, backend string) { sc.mu.Lock() defer sc.mu.Unlock() // Calculate total CU for the batch batchCU := 0 for _, method := range methods { if method != "" { cuValue := sc.methodCUPrices[method] batchCU += cuValue // Track individual method CU sc.methodCU[method] += cuValue // Track method durations (use batch duration for each method) if _, exists := sc.methodStats[method]; !exists { sc.methodStats[method] = make([]time.Duration, 0, 100) } sc.methodStats[method] = append(sc.methodStats[method], duration) } } sc.totalCU += batchCU } func (sc *StatsCollector) AddWebSocketStats(stats WebSocketStats) { sc.mu.Lock() defer sc.mu.Unlock() sc.wsConnections = append(sc.wsConnections, stats) sc.totalWsConnections++ if stats.Error != nil { sc.errorCount++ } } func (sc *StatsCollector) periodicSummary() { ticker := time.NewTicker(sc.summaryInterval) defer ticker.Stop() for range ticker.C { sc.printSummary() } } // formatDuration formats a duration with at most 6 significant digits total func formatDuration(d time.Duration) string { // Convert to string with standard formatting str := d.String() // Find the decimal point if it exists decimalIdx := strings.Index(str, ".") if decimalIdx == -1 { // No decimal point, return as is (already ≤ 6 digits or no need to truncate) return str } // Find the unit suffix (ms, µs, etc.) unitIdx := -1 for i := decimalIdx; i < len(str); i++ { if !(str[i] >= '0' && str[i] <= '9') && str[i] != '.' { unitIdx = i break } } if unitIdx == -1 { unitIdx = len(str) // No unit suffix found } // Count digits before decimal (not including sign) digitsBeforeDecimal := 0 for i := 0; i < decimalIdx; i++ { if str[i] >= '0' && str[i] <= '9' { digitsBeforeDecimal++ } } // Calculate how many decimal places we can keep (allowing for 6 total digits) maxDecimalPlaces := 6 - digitsBeforeDecimal if maxDecimalPlaces <= 0 { // No room for decimal places return str[:decimalIdx] + str[unitIdx:] } // Calculate end position for truncation endPos := decimalIdx + 1 + maxDecimalPlaces if endPos > unitIdx { endPos = unitIdx } // Return truncated string return str[:endPos] + str[unitIdx:] } func (sc *StatsCollector) printSummary() { sc.mu.Lock() defer sc.mu.Unlock() uptime := time.Since(sc.appStartTime) fmt.Printf("\n=== BENCHMARK PROXY SUMMARY ===\n") fmt.Printf("Uptime: %s\n", uptime.Round(time.Second)) fmt.Printf("Total HTTP Requests: %d\n", sc.totalRequests) fmt.Printf("Total WebSocket Connections: %d\n", sc.totalWsConnections) fmt.Printf("Error Rate: %.2f%%\n", float64(sc.errorCount)/float64(sc.totalRequests+sc.totalWsConnections)*100) // Display secondary probe information if available if sc.secondaryProbe != nil { sc.secondaryProbe.mu.RLock() fmt.Printf("\n--- Secondary Probe Status ---\n") fmt.Printf("Minimum Secondary Latency: %s\n", formatDuration(sc.secondaryProbe.minResponseTime)) fmt.Printf("Probe Buffer: %s\n", formatDuration(sc.secondaryProbe.minDelayBuffer)) fmt.Printf("Effective Delay Threshold: %s\n", formatDuration(sc.secondaryProbe.minResponseTime+sc.secondaryProbe.minDelayBuffer)) if len(sc.secondaryProbe.methodTimings) > 0 { fmt.Printf("Method-Specific Thresholds:\n") // Sort methods for consistent output var methods []string for method := range sc.secondaryProbe.methodTimings { methods = append(methods, method) } sort.Strings(methods) for _, method := range methods { timing := sc.secondaryProbe.methodTimings[method] fmt.Printf(" %s: %s (+ %s buffer = %s)\n", method, formatDuration(timing), formatDuration(sc.secondaryProbe.minDelayBuffer), formatDuration(timing+sc.secondaryProbe.minDelayBuffer)) } } if len(sc.secondaryProbe.backendTimings) > 0 { fmt.Printf("Backend-Specific Minimum Latencies:\n") // Sort backend names for consistent output var backendNames []string for backend := range sc.secondaryProbe.backendTimings { backendNames = append(backendNames, backend) } sort.Strings(backendNames) for _, backend := range backendNames { timing := sc.secondaryProbe.backendTimings[backend] fmt.Printf(" %s: %s (+ %s buffer = %s)\n", backend, formatDuration(timing), formatDuration(sc.secondaryProbe.minDelayBuffer), formatDuration(timing+sc.secondaryProbe.minDelayBuffer)) } } if sc.secondaryProbe.failureCount > 0 { fmt.Printf("Probe Failures: %d consecutive\n", sc.secondaryProbe.failureCount) } sc.secondaryProbe.mu.RUnlock() } // Display chain head monitor information if available if sc.chainHeadMonitor != nil { fmt.Printf("\n--- Chain Head Monitor Status ---\n") chainStatus := sc.chainHeadMonitor.GetStatus() // Get primary block height for comparison var primaryBlockHeight uint64 if primaryHead, exists := chainStatus["primary"]; exists && primaryHead.IsHealthy { primaryBlockHeight = primaryHead.BlockNumber } // Sort backend names for consistent output var backendNames []string for name := range chainStatus { backendNames = append(backendNames, name) } sort.Strings(backendNames) for _, name := range backendNames { head := chainStatus[name] status := "healthy" details := fmt.Sprintf("block %d, chain %s", head.BlockNumber, head.ChainID) // Add block difference info for secondary backends if name != "primary" && primaryBlockHeight > 0 && head.IsHealthy { diff := int64(head.BlockNumber) - int64(primaryBlockHeight) if diff > 0 { details += fmt.Sprintf(" (+%d ahead)", diff) } else if diff < 0 { details += fmt.Sprintf(" (%d behind)", diff) } else { details += " (in sync)" } } if !head.IsHealthy { status = "unhealthy" details = head.Error } else if sc.chainHeadMonitor.IsBackendHealthy(name) { status = "enabled" } else { status = "disabled" } fmt.Printf(" %s: %s (%s)\n", name, status, details) } // Show block hash cache stats sc.chainHeadMonitor.mu.RLock() cacheSize := len(sc.chainHeadMonitor.blockHashCache) sc.chainHeadMonitor.mu.RUnlock() fmt.Printf(" Block hash cache: %d entries (max 128)\n", cacheSize) } if sc.hasSecondaryBackends && sc.skippedSecondaryRequests > 0 { fmt.Printf("Skipped Secondary Requests: %d (%.1f%% of requests)\n", sc.skippedSecondaryRequests, float64(sc.skippedSecondaryRequests)/float64(sc.totalRequests)*100) } fmt.Printf("Total Compute Units Earned (current interval): %d CU\n", sc.totalCU) // Calculate and display CU for different time windows timeWindows := []struct { duration time.Duration label string }{ {10 * time.Minute, "Last 10 minutes"}, {1 * time.Hour, "Last hour"}, {3 * time.Hour, "Last 3 hours"}, {24 * time.Hour, "Last 24 hours"}, } fmt.Printf("\nHistorical Compute Units:\n") for _, window := range timeWindows { actualCU, needsExtrapolation := sc.calculateCUForTimeWindow(window.duration) if needsExtrapolation { // Calculate actual data duration for extrapolation now := time.Now() cutoff := now.Add(-window.duration) var oldestDataStartTime time.Time hasData := false // Check current interval if sc.intervalStartTime.After(cutoff) { oldestDataStartTime = sc.intervalStartTime hasData = true } // Check historical data for i := len(sc.historicalCU) - 1; i >= 0; i-- { point := sc.historicalCU[i] intervalStart := point.Timestamp.Add(-sc.summaryInterval) if point.Timestamp.Before(cutoff) { break } if !hasData || intervalStart.Before(oldestDataStartTime) { oldestDataStartTime = intervalStart } hasData = true } var actualDuration time.Duration if hasData { actualDuration = now.Sub(oldestDataStartTime) } extrapolatedCU := sc.extrapolateCU(actualCU, actualDuration, window.duration) fmt.Printf(" %s: %s\n", window.label, formatCUWithExtrapolation(extrapolatedCU, true)) } else { fmt.Printf(" %s: %s\n", window.label, formatCUWithExtrapolation(actualCU, false)) } } // Calculate response time statistics for primary backend var primaryDurations []time.Duration for _, stat := range sc.requestStats { if stat.Backend == "primary" && stat.Error == nil { primaryDurations = append(primaryDurations, stat.Duration) } } if len(primaryDurations) > 0 { sort.Slice(primaryDurations, func(i, j int) bool { return primaryDurations[i] < primaryDurations[j] }) var sum time.Duration for _, d := range primaryDurations { sum += d } avg := sum / time.Duration(len(primaryDurations)) min := primaryDurations[0] max := primaryDurations[len(primaryDurations)-1] p50idx := len(primaryDurations) * 50 / 100 p90idx := len(primaryDurations) * 90 / 100 p99idx := minInt(len(primaryDurations)-1, len(primaryDurations)*99/100) p50 := primaryDurations[p50idx] p90 := primaryDurations[p90idx] p99 := primaryDurations[p99idx] fmt.Printf("\nPrimary Backend Response Times:\n") fmt.Printf(" Min: %s\n", formatDuration(min)) fmt.Printf(" Avg: %s\n", formatDuration(avg)) fmt.Printf(" Max: %s\n", formatDuration(max)) fmt.Printf(" p50: %s\n", formatDuration(p50)) fmt.Printf(" p90: %s\n", formatDuration(p90)) fmt.Printf(" p99: %s\n", formatDuration(p99)) } // Calculate response time statistics for ALL backends backendDurations := make(map[string][]time.Duration) for _, stat := range sc.requestStats { if stat.Error == nil { backendDurations[stat.Backend] = append(backendDurations[stat.Backend], stat.Duration) } } // Sort backend names for consistent output var backendNames []string for backend := range backendDurations { backendNames = append(backendNames, backend) } sort.Strings(backendNames) // Print per-backend statistics fmt.Printf("\nPer-Backend Response Time Comparison:\n") fmt.Printf("Note: 'User Latency' = actual time users wait; 'Backend Time' = winning backend's response time\n") fmt.Printf("%-20s %10s %10s %10s %10s %10s %10s %10s\n", "Backend", "Count", "Min", "Avg", "Max", "p50", "p90", "p99") fmt.Printf("%s\n", strings.Repeat("-", 100)) // First, show the actual user latency if available if len(sc.actualFirstResponseDurations) > 0 { actualDurations := make([]time.Duration, len(sc.actualFirstResponseDurations)) copy(actualDurations, sc.actualFirstResponseDurations) sort.Slice(actualDurations, func(i, j int) bool { return actualDurations[i] < actualDurations[j] }) var sum time.Duration for _, d := range actualDurations { sum += d } avg := sum / time.Duration(len(actualDurations)) min := actualDurations[0] max := actualDurations[len(actualDurations)-1] p50idx := len(actualDurations) * 50 / 100 p90idx := len(actualDurations) * 90 / 100 p99idx := minInt(len(actualDurations)-1, len(actualDurations)*99/100) p50 := actualDurations[p50idx] p90 := actualDurations[p90idx] p99 := actualDurations[p99idx] fmt.Printf("%-20s %10d %10s %10s %10s %10s %10s %10s\n", "User Latency", len(actualDurations), formatDuration(min), formatDuration(avg), formatDuration(max), formatDuration(p50), formatDuration(p90), formatDuration(p99)) } // Then show the backend time (what backend actually took) if len(sc.firstResponseDurations) > 0 { firstRespDurations := make([]time.Duration, len(sc.firstResponseDurations)) copy(firstRespDurations, sc.firstResponseDurations) sort.Slice(firstRespDurations, func(i, j int) bool { return firstRespDurations[i] < firstRespDurations[j] }) var sum time.Duration for _, d := range firstRespDurations { sum += d } avg := sum / time.Duration(len(firstRespDurations)) min := firstRespDurations[0] max := firstRespDurations[len(firstRespDurations)-1] p50idx := len(firstRespDurations) * 50 / 100 p90idx := len(firstRespDurations) * 90 / 100 p99idx := minInt(len(firstRespDurations)-1, len(firstRespDurations)*99/100) p50 := firstRespDurations[p50idx] p90 := firstRespDurations[p90idx] p99 := firstRespDurations[p99idx] fmt.Printf("%-20s %10d %10s %10s %10s %10s %10s %10s\n", "Backend Time", len(firstRespDurations), formatDuration(min), formatDuration(avg), formatDuration(max), formatDuration(p50), formatDuration(p90), formatDuration(p99)) fmt.Printf("%s\n", strings.Repeat("-", 100)) } for _, backend := range backendNames { durations := backendDurations[backend] if len(durations) == 0 { continue } sort.Slice(durations, func(i, j int) bool { return durations[i] < durations[j] }) var sum time.Duration for _, d := range durations { sum += d } avg := sum / time.Duration(len(durations)) min := durations[0] max := durations[len(durations)-1] p50idx := len(durations) * 50 / 100 p90idx := len(durations) * 90 / 100 p99idx := minInt(len(durations)-1, len(durations)*99/100) p50 := durations[p50idx] p90 := durations[p90idx] p99 := durations[p99idx] fmt.Printf("%-20s %10d %10s %10s %10s %10s %10s %10s\n", backend, len(durations), formatDuration(min), formatDuration(avg), formatDuration(max), formatDuration(p50), formatDuration(p90), formatDuration(p99)) } // Print backend wins statistics fmt.Printf("\nBackend First Response Wins:\n") fmt.Printf("%-20s %10s %10s\n", "Backend", "Wins", "Win %") fmt.Printf("%s\n", strings.Repeat("-", 42)) totalWins := 0 for _, wins := range sc.backendWins { totalWins += wins } // Sort backends by wins for consistent output type backendWin struct { backend string wins int } var winList []backendWin for backend, wins := range sc.backendWins { winList = append(winList, backendWin{backend, wins}) } sort.Slice(winList, func(i, j int) bool { return winList[i].wins > winList[j].wins }) for _, bw := range winList { winPercentage := float64(bw.wins) / float64(totalWins) * 100 fmt.Printf("%-20s %10d %9.1f%%\n", bw.backend, bw.wins, winPercentage) } // Print per-method statistics if len(sc.methodStats) > 0 { fmt.Printf("\nPer-Method Statistics (Primary Backend):\n") // Sort methods by name for consistent output methods := make([]string, 0, len(sc.methodStats)) for method := range sc.methodStats { methods = append(methods, method) } sort.Strings(methods) for _, method := range methods { var durations []time.Duration var displayLabel string // If secondary backends are configured and we have actual user latency data, use that if sc.hasSecondaryBackends { if actualDurations, exists := sc.methodActualFirstResponseDurations[method]; exists && len(actualDurations) > 0 { durations = make([]time.Duration, len(actualDurations)) copy(durations, actualDurations) displayLabel = method + " (User Latency)" } else { // Fall back to primary backend times if no actual latency data durations = sc.methodStats[method] displayLabel = method + " (Primary Backend)" } } else { // No secondary backends, use primary backend times durations = sc.methodStats[method] displayLabel = method } if len(durations) == 0 { continue } sort.Slice(durations, func(i, j int) bool { return durations[i] < durations[j] }) var sum time.Duration for _, d := range durations { sum += d } avg := sum / time.Duration(len(durations)) minDuration := durations[0] max := durations[len(durations)-1] // Only calculate percentiles if we have enough samples p50 := minDuration p90 := minDuration p99 := minDuration if len(durations) >= 2 { p50idx := len(durations) * 50 / 100 p90idx := len(durations) * 90 / 100 p99idx := minInt(len(durations)-1, len(durations)*99/100) p50 = durations[p50idx] p90 = durations[p90idx] p99 = durations[p99idx] } // Add CU information to the output cuPrice := sc.methodCUPrices[method] cuEarned := sc.methodCU[method] fmt.Printf(" %-50s Count: %-5d Avg: %-10s Min: %-10s Max: %-10s p50: %-10s p90: %-10s p99: %-10s CU: %d (%d)\n", displayLabel, len(durations), formatDuration(avg), formatDuration(minDuration), formatDuration(max), formatDuration(p50), formatDuration(p90), formatDuration(p99), cuEarned, cuPrice) } } // Print per-method statistics for ALL backends if len(sc.backendMethodStats) > 0 { fmt.Printf("\nPer-Method Backend Comparison (Top 3 Methods):\n") // Collect all unique methods across all backends with their total counts methodCounts := make(map[string]int) for _, methods := range sc.backendMethodStats { for method, durations := range methods { methodCounts[method] += len(durations) } } // Sort methods by total count (descending) type methodCount struct { method string count int } var methodList []methodCount for method, count := range methodCounts { methodList = append(methodList, methodCount{method, count}) } sort.Slice(methodList, func(i, j int) bool { return methodList[i].count > methodList[j].count }) // Only show top 3 methods maxMethods := 3 if len(methodList) < maxMethods { maxMethods = len(methodList) } // For each of the top methods, show stats from all backends for i := 0; i < maxMethods; i++ { method := methodList[i].method fmt.Printf("\n Method: %s (Total requests: %d)\n", method, methodList[i].count) // Check if this is a stateful method if isStatefulMethod(method) { fmt.Printf(" Note: Stateful method - only sent to primary backend\n") } // Show wins for this method if available if methodWins, exists := sc.methodBackendWins[method]; exists { fmt.Printf(" First Response Wins: ") totalMethodWins := 0 for _, wins := range methodWins { totalMethodWins += wins } // Sort backends by wins for this method var methodWinList []backendWin for backend, wins := range methodWins { methodWinList = append(methodWinList, backendWin{backend, wins}) } sort.Slice(methodWinList, func(i, j int) bool { return methodWinList[i].wins > methodWinList[j].wins }) // Print wins inline for idx, bw := range methodWinList { if idx > 0 { fmt.Printf(", ") } winPercentage := float64(bw.wins) / float64(totalMethodWins) * 100 fmt.Printf("%s: %d (%.1f%%)", bw.backend, bw.wins, winPercentage) } fmt.Printf("\n") } fmt.Printf(" %-20s %10s %10s %10s %10s %10s %10s %10s\n", "Backend", "Count", "Min", "Avg", "Max", "p50", "p90", "p99") fmt.Printf(" %s\n", strings.Repeat("-", 98)) for _, backend := range backendNames { durations, exists := sc.backendMethodStats[backend][method] if !exists || len(durations) == 0 { continue } sort.Slice(durations, func(i, j int) bool { return durations[i] < durations[j] }) var sum time.Duration for _, d := range durations { sum += d } avg := sum / time.Duration(len(durations)) min := durations[0] max := durations[len(durations)-1] p50 := min p90 := min p99 := min if len(durations) >= 2 { p50idx := len(durations) * 50 / 100 p90idx := len(durations) * 90 / 100 p99idx := minInt(len(durations)-1, len(durations)*99/100) p50 = durations[p50idx] p90 = durations[p90idx] p99 = durations[p99idx] } fmt.Printf(" %-20s %10d %10s %10s %10s %10s %10s %10s\n", backend, len(durations), formatDuration(min), formatDuration(avg), formatDuration(max), formatDuration(p50), formatDuration(p90), formatDuration(p99)) } // Show User Latency for this method if available if methodActualDurations, exists := sc.methodActualFirstResponseDurations[method]; exists && len(methodActualDurations) > 0 { // Make a copy and sort durations := make([]time.Duration, len(methodActualDurations)) copy(durations, methodActualDurations) sort.Slice(durations, func(i, j int) bool { return durations[i] < durations[j] }) var sum time.Duration for _, d := range durations { sum += d } avg := sum / time.Duration(len(durations)) min := durations[0] max := durations[len(durations)-1] p50 := min p90 := min p99 := min if len(durations) >= 2 { p50idx := len(durations) * 50 / 100 p90idx := len(durations) * 90 / 100 p99idx := minInt(len(durations)-1, len(durations)*99/100) p50 = durations[p50idx] p90 = durations[p90idx] p99 = durations[p99idx] } fmt.Printf(" %-20s %10d %10s %10s %10s %10s %10s %10s\n", "User Latency", len(durations), formatDuration(min), formatDuration(avg), formatDuration(max), formatDuration(p50), formatDuration(p90), formatDuration(p99)) } // Show Backend Time statistics for this method if methodFirstDurations, exists := sc.methodFirstResponseDurations[method]; exists && len(methodFirstDurations) > 0 { // Make a copy and sort durations := make([]time.Duration, len(methodFirstDurations)) copy(durations, methodFirstDurations) sort.Slice(durations, func(i, j int) bool { return durations[i] < durations[j] }) var sum time.Duration for _, d := range durations { sum += d } avg := sum / time.Duration(len(durations)) min := durations[0] max := durations[len(durations)-1] p50 := min p90 := min p99 := min if len(durations) >= 2 { p50idx := len(durations) * 50 / 100 p90idx := len(durations) * 90 / 100 p99idx := minInt(len(durations)-1, len(durations)*99/100) p50 = durations[p50idx] p90 = durations[p90idx] p99 = durations[p99idx] } fmt.Printf(" %-20s %10d %10s %10s %10s %10s %10s %10s\n", "Backend Time", len(durations), formatDuration(min), formatDuration(avg), formatDuration(max), formatDuration(p50), formatDuration(p90), formatDuration(p99)) fmt.Printf(" %s\n", strings.Repeat("-", 98)) } } } fmt.Printf("================================\n\n") // Store current interval's CU data in historical data before resetting if sc.totalCU > 0 { sc.historicalCU = append(sc.historicalCU, CUDataPoint{ Timestamp: time.Now(), // Store the end time of the interval CU: sc.totalCU, }) } // Clean up old historical data (keep only last 24 hours + some buffer) cutoff := time.Now().Add(-25 * time.Hour) newStart := 0 for i, point := range sc.historicalCU { if point.Timestamp.After(cutoff) { newStart = i break } } if newStart > 0 { sc.historicalCU = sc.historicalCU[newStart:] } // Reset statistics for the next interval // Keep only the last 1000 requests to prevent unlimited memory growth if len(sc.requestStats) > 1000 { sc.requestStats = sc.requestStats[len(sc.requestStats)-1000:] } // Reset method-specific statistics for method := range sc.methodStats { sc.methodStats[method] = sc.methodStats[method][:0] } // Reset backend method-specific statistics for backend := range sc.backendMethodStats { for method := range sc.backendMethodStats[backend] { sc.backendMethodStats[backend][method] = sc.backendMethodStats[backend][method][:0] } } // Reset backend wins statistics sc.backendWins = make(map[string]int) sc.methodBackendWins = make(map[string]map[string]int) // Reset first response statistics if len(sc.firstResponseDurations) > 1000 { sc.firstResponseDurations = sc.firstResponseDurations[len(sc.firstResponseDurations)-1000:] } else { sc.firstResponseDurations = sc.firstResponseDurations[:0] } if len(sc.actualFirstResponseDurations) > 1000 { sc.actualFirstResponseDurations = sc.actualFirstResponseDurations[len(sc.actualFirstResponseDurations)-1000:] } else { sc.actualFirstResponseDurations = sc.actualFirstResponseDurations[:0] } for method := range sc.methodFirstResponseDurations { sc.methodFirstResponseDurations[method] = sc.methodFirstResponseDurations[method][:0] } for method := range sc.methodActualFirstResponseDurations { sc.methodActualFirstResponseDurations[method] = sc.methodActualFirstResponseDurations[method][:0] } // Reset CU counters for the next interval sc.totalCU = 0 sc.methodCU = make(map[string]int) // Reset error count for the next interval sc.errorCount = 0 sc.skippedSecondaryRequests = 0 sc.totalRequests = 0 sc.totalWsConnections = 0 // Reset WebSocket connections to prevent memory leak sc.wsConnections = sc.wsConnections[:0] // Reset the interval start time for the next interval sc.intervalStartTime = time.Now() } // Helper function to avoid potential index out of bounds func minInt(a, b int) int { if a < b { return a } return b } // calculateCUForTimeWindow calculates total CU for a given time window func (sc *StatsCollector) calculateCUForTimeWindow(window time.Duration) (int, bool) { now := time.Now() cutoff := now.Add(-window) totalCU := 0 var oldestDataStartTime time.Time hasData := false // Add current interval's CU if it started within the window if sc.intervalStartTime.After(cutoff) { totalCU += sc.totalCU oldestDataStartTime = sc.intervalStartTime hasData = true } // Add historical CU data within the window // Historical timestamps represent the END of intervals for i := len(sc.historicalCU) - 1; i >= 0; i-- { point := sc.historicalCU[i] // Calculate the start time of this historical interval intervalStart := point.Timestamp.Add(-sc.summaryInterval) // Skip if the interval ended before our cutoff if point.Timestamp.Before(cutoff) { break } // Include this interval's CU totalCU += point.CU // Track the oldest data start time if !hasData || intervalStart.Before(oldestDataStartTime) { oldestDataStartTime = intervalStart } hasData = true } // Calculate actual data span var actualDataDuration time.Duration if hasData { actualDataDuration = now.Sub(oldestDataStartTime) } // We need extrapolation if we don't have enough historical data to cover the window needsExtrapolation := hasData && actualDataDuration < window return totalCU, needsExtrapolation } // extrapolateCU extrapolates CU data when there's insufficient historical data func (sc *StatsCollector) extrapolateCU(actualCU int, actualDuration, targetDuration time.Duration) int { if actualDuration <= 0 { return 0 } // Calculate CU per second rate cuPerSecond := float64(actualCU) / actualDuration.Seconds() // Extrapolate to target duration extrapolatedCU := cuPerSecond * targetDuration.Seconds() return int(extrapolatedCU) } // formatCUWithExtrapolation formats CU value with extrapolation indicator func formatCUWithExtrapolation(cu int, isExtrapolated bool) string { if isExtrapolated { return fmt.Sprintf("%d CU (extrapolated)", cu) } return fmt.Sprintf("%d CU", cu) } // GetPrimaryP50 calculates the current p50 latency for the primary backend func (sc *StatsCollector) GetPrimaryP50() time.Duration { sc.mu.Lock() defer sc.mu.Unlock() // Collect primary backend durations var primaryDurations []time.Duration for _, stat := range sc.requestStats { if stat.Backend == "primary" && stat.Error == nil { primaryDurations = append(primaryDurations, stat.Duration) } } // If we don't have enough data, return a sensible default if len(primaryDurations) < 10 { return 10 * time.Millisecond // Default to 10ms } // Sort and find p50 sort.Slice(primaryDurations, func(i, j int) bool { return primaryDurations[i] < primaryDurations[j] }) p50idx := len(primaryDurations) * 50 / 100 return primaryDurations[p50idx] } // GetPrimaryP75ForMethod calculates the current p75 latency for a specific method on the primary backend func (sc *StatsCollector) GetPrimaryP75ForMethod(method string) time.Duration { sc.mu.Lock() defer sc.mu.Unlock() // Get method-specific durations for primary backend if durations, exists := sc.methodStats[method]; exists && len(durations) >= 5 { // Make a copy to avoid modifying the original durationsCopy := make([]time.Duration, len(durations)) copy(durationsCopy, durations) // Sort and find p75 sort.Slice(durationsCopy, func(i, j int) bool { return durationsCopy[i] < durationsCopy[j] }) p75idx := len(durationsCopy) * 75 / 100 if p75idx >= len(durationsCopy) { p75idx = len(durationsCopy) - 1 } return durationsCopy[p75idx] } // If we don't have enough method-specific data, calculate global p75 here // (instead of calling GetPrimaryP50 which would cause nested mutex lock) var primaryDurations []time.Duration for _, stat := range sc.requestStats { if stat.Backend == "primary" && stat.Error == nil { primaryDurations = append(primaryDurations, stat.Duration) } } // If we don't have enough data, return a sensible default if len(primaryDurations) < 10 { return 15 * time.Millisecond // Default to 15ms for p75 } // Sort and find p75 sort.Slice(primaryDurations, func(i, j int) bool { return primaryDurations[i] < primaryDurations[j] }) p75idx := len(primaryDurations) * 75 / 100 if p75idx >= len(primaryDurations) { p75idx = len(primaryDurations) - 1 } return primaryDurations[p75idx] } // GetPrimaryP50ForMethod calculates the current p50 latency for a specific method on the primary backend func (sc *StatsCollector) GetPrimaryP50ForMethod(method string) time.Duration { sc.mu.Lock() defer sc.mu.Unlock() // Get method-specific durations for primary backend if durations, exists := sc.methodStats[method]; exists && len(durations) >= 5 { // Make a copy to avoid modifying the original durationsCopy := make([]time.Duration, len(durations)) copy(durationsCopy, durations) // Sort and find p50 sort.Slice(durationsCopy, func(i, j int) bool { return durationsCopy[i] < durationsCopy[j] }) p50idx := len(durationsCopy) * 50 / 100 return durationsCopy[p50idx] } // If we don't have enough method-specific data, calculate global p50 here // (instead of calling GetPrimaryP50 which would cause nested mutex lock) var primaryDurations []time.Duration for _, stat := range sc.requestStats { if stat.Backend == "primary" && stat.Error == nil { primaryDurations = append(primaryDurations, stat.Duration) } } // If we don't have enough data, return a sensible default if len(primaryDurations) < 10 { return 10 * time.Millisecond // Default to 10ms } // Sort and find p50 sort.Slice(primaryDurations, func(i, j int) bool { return primaryDurations[i] < primaryDurations[j] }) p50idx := len(primaryDurations) * 50 / 100 return primaryDurations[p50idx] } // isStatefulMethod returns true if the method requires session state and must always go to primary func isStatefulMethod(method string) bool { statefulMethods := map[string]bool{ // Filter methods - these create server-side state "eth_newFilter": true, "eth_newBlockFilter": true, "eth_newPendingTransactionFilter": true, "eth_getFilterChanges": true, "eth_getFilterLogs": true, "eth_uninstallFilter": true, // Subscription methods (WebSocket) - maintain persistent connections "eth_subscribe": true, "eth_unsubscribe": true, "eth_subscription": true, // Notification method // Some debug/trace methods might maintain state depending on implementation // But these are typically stateless, so not included here } return statefulMethods[method] } // requiresPrimaryOnlyMethod returns true if the method should always go to primary func requiresPrimaryOnlyMethod(method string) bool { primaryOnlyMethods := map[string]bool{ // Transaction sending methods - must go to primary "eth_sendRawTransaction": true, "eth_sendTransaction": true, // Mempool/txpool methods - these show pending transactions "txpool_content": true, "txpool_inspect": true, "txpool_status": true, "txpool_contentFrom": true, // Mining related methods "eth_mining": true, "eth_hashrate": true, "eth_getWork": true, "eth_submitWork": true, "eth_submitHashrate": true, // Complex methods with special block handling "eth_callMany": true, // Has complex block handling with multiple calls at different blocks "eth_simulateV1": true, // Simulation should use primary for consistency // Trace methods that depend on transaction data "trace_transaction": true, // Traces already mined transaction by hash "trace_replayTransaction": true, // Replays transaction execution by hash "trace_rawTransaction": true, // Simulates raw transaction data // Debug methods that should use primary "debug_traceTransaction": true, // Debug version of trace by tx hash "debug_storageRangeAt": true, // Accesses internal storage state // Transaction query methods - must go to primary // These can return null if tx is not found, and secondary might not have recent txs "eth_getTransactionByHash": true, "eth_getTransactionReceipt": true, } return primaryOnlyMethods[method] } // methodMightReturnNull returns true if the method might legitimately return null // and we should wait for primary's response instead of returning secondary's null func methodMightReturnNull(method string) bool { nullableMethods := map[string]bool{ "eth_getTransactionReceipt": true, "eth_getTransactionByHash": true, "eth_getTransactionByBlockHashAndIndex": true, "eth_getTransactionByBlockNumberAndIndex": true, "eth_getBlockByHash": true, "eth_getBlockByNumber": true, "eth_getUncleByBlockHashAndIndex": true, "eth_getUncleByBlockNumberAndIndex": true, } return nullableMethods[method] } // isNullResponse checks if a JSON-RPC response has a null result func isNullResponse(respBody []byte) bool { // Simple structure to check the result field var response struct { Result json.RawMessage `json:"result"` } if err := json.Unmarshal(respBody, &response); err != nil { return false } // Check if result is null return string(response.Result) == "null" } // methodShouldWaitOnSecondaryError returns true if we should wait for primary // when secondary returns an error response func methodShouldWaitOnSecondaryError(method string) bool { // Methods where secondary errors might be transient and primary could succeed waitOnErrorMethods := map[string]bool{ "eth_call": true, // State execution - secondary might be behind or have issues "eth_estimateGas": true, // Similar to eth_call "trace_call": true, // Tracing calls "debug_traceCall": true, // Debug tracing "eth_createAccessList": true, // Access list creation } return waitOnErrorMethods[method] } // flushingResponseWriter wraps http.ResponseWriter to flush after every write type flushingResponseWriter struct { http.ResponseWriter flusher http.Flusher } func (f *flushingResponseWriter) Write(p []byte) (n int, err error) { n, err = f.ResponseWriter.Write(p) if err == nil && n > 0 { f.flusher.Flush() } return } // peekingReader wraps a reader to peek at the first N bytes and detect null or error responses type peekingReader struct { r io.Reader buf []byte bufPos int isNull bool hasError bool checkDone bool peekSize int } func newPeekingReader(r io.Reader, peekSize int) *peekingReader { return &peekingReader{ r: r, buf: make([]byte, 0, peekSize), peekSize: peekSize, } } func (pr *peekingReader) Read(p []byte) (n int, err error) { // If we haven't finished checking yet if !pr.checkDone && len(pr.buf) < pr.peekSize { // Read more data into our buffer tempBuf := make([]byte, pr.peekSize-len(pr.buf)) readN, readErr := pr.r.Read(tempBuf) if readN > 0 { pr.buf = append(pr.buf, tempBuf[:readN]...) } // Check if we have enough data or hit EOF if len(pr.buf) >= pr.peekSize || readErr == io.EOF { pr.checkDone = true // Check for null response pattern pr.detectPatterns() } if readErr != nil && readErr != io.EOF { return 0, readErr } } // Serve data from buffer first if pr.bufPos < len(pr.buf) { n = copy(p, pr.buf[pr.bufPos:]) pr.bufPos += n return n, nil } // Then serve from underlying reader return pr.r.Read(p) } func (pr *peekingReader) detectPatterns() { // Look for patterns indicating null result or errors in JSON-RPC response bufStr := string(pr.buf) // Remove whitespace for easier pattern matching compactStr := strings.ReplaceAll(bufStr, " ", "") compactStr = strings.ReplaceAll(compactStr, "\n", "") compactStr = strings.ReplaceAll(compactStr, "\r", "") compactStr = strings.ReplaceAll(compactStr, "\t", "") // Check for null result pr.isNull = strings.Contains(compactStr, `"result":null`) // Check for JSON-RPC errors pr.hasError = strings.Contains(compactStr, `"error":`) && !strings.Contains(compactStr, `"error":null`) // Log what we found for debugging if pr.isNull || pr.hasError { log.Printf("Detected short response pattern - null: %v, error: %v, buffer preview: %s", pr.isNull, pr.hasError, strings.ReplaceAll(bufStr[:minInt(len(bufStr), 80)], "\n", " ")) } } func (pr *peekingReader) IsNull() bool { return pr.isNull } func (pr *peekingReader) HasError() bool { return pr.hasError } func main() { // Get configuration from environment variables listenAddr := getEnv("LISTEN_ADDR", ":8080") primaryBackend := getEnv("PRIMARY_BACKEND", "http://localhost:8545") secondaryBackendsStr := getEnv("SECONDARY_BACKENDS", "") summaryIntervalStr := getEnv("SUMMARY_INTERVAL", "60") // Default 60 seconds enableDetailedLogs := getEnv("ENABLE_DETAILED_LOGS", "false") // Default to disabled // Secondary probe configuration enableSecondaryProbing := getEnv("ENABLE_SECONDARY_PROBING", "true") == "true" probeIntervalStr := getEnv("PROBE_INTERVAL", "10") // Default 10 seconds minDelayBufferStr := getEnv("MIN_DELAY_BUFFER", "2") // Default 2ms buffer probeMethodsStr := getEnv("PROBE_METHODS", "eth_blockNumber,net_version,eth_chainId") // Method routing configuration secondaryWhitelistStr := getEnv("SECONDARY_WHITELIST", "") // Methods allowed on secondary preferSecondaryStr := getEnv("PREFER_SECONDARY", "") // Methods that should prefer secondary summaryInterval, err := strconv.Atoi(summaryIntervalStr) if err != nil { log.Printf("Invalid SUMMARY_INTERVAL, using default of 60 seconds") summaryInterval = 60 } probeInterval, err := strconv.Atoi(probeIntervalStr) if err != nil { log.Printf("Invalid PROBE_INTERVAL, using default of 10 seconds") probeInterval = 10 } minDelayBuffer, err := strconv.Atoi(minDelayBufferStr) if err != nil { log.Printf("Invalid MIN_DELAY_BUFFER, using default of 2ms") minDelayBuffer = 2 } // Parse method routing configuration methodRouting := &MethodRouting{ SecondaryWhitelist: make(map[string]bool), PreferSecondary: make(map[string]bool), } // Parse whitelist if secondaryWhitelistStr != "" { whitelist := strings.Split(secondaryWhitelistStr, ",") for _, method := range whitelist { methodRouting.SecondaryWhitelist[strings.TrimSpace(method)] = true } log.Printf("Secondary whitelist: %v", whitelist) } // Parse prefer secondary list if preferSecondaryStr != "" { preferList := strings.Split(preferSecondaryStr, ",") for _, method := range preferList { methodRouting.PreferSecondary[strings.TrimSpace(method)] = true // Also add to whitelist automatically methodRouting.SecondaryWhitelist[strings.TrimSpace(method)] = true } log.Printf("Prefer secondary for methods: %v", preferList) } // Create stats collector for periodic summaries statsCollector := NewStatsCollector(time.Duration(summaryInterval)*time.Second, secondaryBackendsStr != "") // Configure backends var backends []Backend backends = append(backends, Backend{ URL: primaryBackend, Name: "primary", Role: "primary", }) if secondaryBackendsStr != "" { secondaryList := strings.Split(secondaryBackendsStr, ",") for i, url := range secondaryList { backends = append(backends, Backend{ URL: strings.TrimSpace(url), Name: fmt.Sprintf("secondary-%d", i+1), Role: "secondary", }) } } log.Printf("Starting benchmark proxy on %s", listenAddr) log.Printf("Primary backend: %s", primaryBackend) log.Printf("Secondary backends: %s", secondaryBackendsStr) if enableSecondaryProbing && secondaryBackendsStr != "" { log.Printf("Secondary probing: enabled (interval: %ds, buffer: %dms)", probeInterval, minDelayBuffer) } // Set up HTTP client with reasonable timeouts client := &http.Client{ Timeout: 30 * time.Second, Transport: &http.Transport{ MaxIdleConns: 200, // Increased for better connection pooling MaxIdleConnsPerHost: 50, // Increased per-host limit for probing IdleConnTimeout: 120 * time.Second, // Longer idle timeout for connection reuse DisableCompression: true, // Typically JSON-RPC doesn't benefit from compression DisableKeepAlives: false, // Ensure keep-alives are enabled MaxConnsPerHost: 50, // Limit concurrent connections per host }, } // Initialize secondary probe if enabled and we have secondary backends var secondaryProbe *SecondaryProbe if enableSecondaryProbing && secondaryBackendsStr != "" { probeMethods := strings.Split(probeMethodsStr, ",") for i := range probeMethods { probeMethods[i] = strings.TrimSpace(probeMethods[i]) } secondaryProbe = NewSecondaryProbe( backends, client, time.Duration(probeInterval)*time.Second, time.Duration(minDelayBuffer)*time.Millisecond, probeMethods, enableDetailedLogs == "true", ) if secondaryProbe == nil { log.Printf("Secondary probe initialization failed - no secondary backends found") } else { // Set the probe in stats collector for display statsCollector.SetSecondaryProbe(secondaryProbe) } } // Initialize chain head monitor var chainHeadMonitor *ChainHeadMonitor if len(backends) > 1 { // Only create if we have more than just primary chainHeadMonitor = NewChainHeadMonitor(backends, enableDetailedLogs == "true") log.Printf("Chain head monitoring: enabled") // Set the monitor in stats collector for display statsCollector.SetChainHeadMonitor(chainHeadMonitor) } // Configure websocket upgrader with larger buffer sizes // 20MB frame size and 50MB message size const ( maxFrameSize = 20 * 1024 * 1024 // 20MB maxMessageSize = 50 * 1024 * 1024 // 50MB ) upgrader := websocket.Upgrader{ ReadBufferSize: maxFrameSize, WriteBufferSize: maxFrameSize, // Allow all origins CheckOrigin: func(r *http.Request) bool { return true }, } http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { // Check if this is a WebSocket upgrade request if websocket.IsWebSocketUpgrade(r) { handleWebSocketRequest(w, r, backends, client, &upgrader, statsCollector) } else { // Handle regular HTTP request handleRequest(w, r, backends, client, enableDetailedLogs == "true", statsCollector, secondaryProbe, chainHeadMonitor, methodRouting) } }) log.Fatal(http.ListenAndServe(listenAddr, nil)) } func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client, enableDetailedLogs bool, statsCollector *StatsCollector, secondaryProbe *SecondaryProbe, chainHeadMonitor *ChainHeadMonitor, methodRouting *MethodRouting) { startTime := time.Now() // Create a context that will cancel after 35 seconds (5s buffer over backend timeout) ctx, cancel := context.WithTimeout(r.Context(), 35*time.Second) defer cancel() // Limit request body size to 10MB to prevent memory exhaustion const maxBodySize = 10 * 1024 * 1024 // 10MB r.Body = http.MaxBytesReader(w, r.Body, maxBodySize) // Read the entire request body body, err := io.ReadAll(r.Body) if err != nil { // Check if the error is due to body size limit if strings.Contains(err.Error(), "request body too large") { http.Error(w, "Request body too large (max 10MB)", http.StatusRequestEntityTooLarge) } else { http.Error(w, "Error reading request body", http.StatusBadRequest) } return } defer r.Body.Close() // Parse request to extract method information (handles both single and batch) batchInfo, err := parseBatchInfo(body) if err != nil { http.Error(w, "Invalid JSON-RPC request", http.StatusBadRequest) return } // For logging and stats, use the first method or "batch" for batch requests var displayMethod string var isStateful bool var requiresPrimaryDueToBlockTag bool if batchInfo.IsBatch { displayMethod = fmt.Sprintf("batch[%d]", batchInfo.RequestCount) isStateful = batchInfo.HasStateful requiresPrimaryDueToBlockTag = batchInfo.RequiresPrimary } else { displayMethod = batchInfo.Methods[0] if displayMethod == "" { displayMethod = "unknown" } isStateful = batchInfo.HasStateful requiresPrimaryDueToBlockTag = batchInfo.RequiresPrimary } if enableDetailedLogs { if batchInfo.IsBatch { log.Printf("Batch request: %d requests, methods: %s, block tags: %v", batchInfo.RequestCount, formatMethodList(batchInfo.Methods), batchInfo.BlockTags) } else { var blockTagInfo string if len(batchInfo.BlockTags) > 0 { blockTagInfo = fmt.Sprintf(", block tag: %s", batchInfo.BlockTags[0]) } log.Printf("Method: %s%s", displayMethod, blockTagInfo) } } // Check method routing configuration if !batchInfo.IsBatch && methodRouting != nil { // For single methods, check routing rules method := displayMethod // Check if this method prefers secondary backends if methodRouting.PreferSecondary[method] { if enableDetailedLogs { log.Printf("Method %s configured to prefer secondary backends", method) } } // Check if whitelist is configured and method is not in it if len(methodRouting.SecondaryWhitelist) > 0 && !methodRouting.SecondaryWhitelist[method] { // Method not in whitelist - force primary only if enableDetailedLogs { log.Printf("Method %s not in secondary whitelist - using primary only", method) } // This will be enforced in the backend loop } } // Process backends with adaptive delay strategy var wg sync.WaitGroup var primaryWg sync.WaitGroup // Separate wait group for primary backend statsChan := make(chan ResponseStats, len(backends)) responseChan := make(chan struct { backend string resp *http.Response err error }, len(backends)) // Track if we've already sent a response var responseHandled atomic.Bool var firstBackendStartTime atomic.Pointer[time.Time] // Count secondary backends for proper channel sizing secondaryCount := 0 for _, backend := range backends { if backend.Role == "secondary" { secondaryCount++ } } // Buffer size = number of secondary backends so all can receive signal primaryResponseChan := make(chan struct{}, secondaryCount) // Signal when primary gets a response primaryFailedFast := make(chan struct{}, 1) // Signal when primary fails immediately for _, backend := range backends { // Method routing checks for secondary backends if backend.Role == "secondary" && methodRouting != nil && !batchInfo.IsBatch { method := displayMethod // Check if whitelist is configured and method is not in it if len(methodRouting.SecondaryWhitelist) > 0 && !methodRouting.SecondaryWhitelist[method] { if enableDetailedLogs { log.Printf("Skipping secondary backend %s for method %s (not in whitelist)", backend.Name, method) } continue } } // Skip primary backend if method prefers secondary (and we have secondary backends available) if backend.Role == "primary" && methodRouting != nil && !batchInfo.IsBatch { method := displayMethod if methodRouting.PreferSecondary[method] && len(backends) > 1 { if enableDetailedLogs { log.Printf("Skipping primary backend for method %s (configured to prefer secondary)", method) } continue } } // Skip secondary backends for stateful methods if isStateful && backend.Role != "primary" { if enableDetailedLogs { log.Printf("Skipping secondary backend %s for stateful method %s", backend.Name, displayMethod) } continue } // Skip secondary backends if request requires primary due to block tag if requiresPrimaryDueToBlockTag && backend.Role != "primary" { if enableDetailedLogs { log.Printf("Skipping secondary backend %s due to block tag requiring primary", backend.Name) } continue } // Check if secondary backend can handle "latest" block tag requests if backend.Role == "secondary" && len(batchInfo.BlockTags) > 0 { // Check all block tags in the request canUseSecondary := true for _, blockTag := range batchInfo.BlockTags { if !canUseSecondaryForBlockTag(blockTag, backend.Name, chainHeadMonitor) { canUseSecondary = false if enableDetailedLogs { log.Printf("Skipping secondary backend %s for block tag '%s' - not at required height", backend.Name, blockTag) } break } } if !canUseSecondary { continue } } // Skip unhealthy secondary backends if backend.Role == "secondary" && chainHeadMonitor != nil && !chainHeadMonitor.IsBackendHealthy(backend.Name) { if enableDetailedLogs { log.Printf("Skipping unhealthy secondary backend %s for %s", backend.Name, displayMethod) } continue } wg.Add(1) if backend.Role == "primary" { primaryWg.Add(1) } go func(b Backend) { defer wg.Done() if b.Role == "primary" { defer primaryWg.Done() } // Track when this goroutine actually starts processing goroutineStartTime := time.Now() // Record the first backend start time (should be primary) if b.Role == "primary" { t := goroutineStartTime firstBackendStartTime.Store(&t) } // If this is a secondary backend, wait for p75 delay if b.Role != "primary" { // Skip delay if this method prefers secondary backends skipDelay := false if methodRouting != nil && !batchInfo.IsBatch { if methodRouting.PreferSecondary[displayMethod] { skipDelay = true if enableDetailedLogs { log.Printf("Secondary backend %s starting immediately for method %s (configured to prefer secondary)", b.Name, displayMethod) } } } if !skipDelay { // Get backend-specific delay var backendSpecificDelay time.Duration if batchInfo.IsBatch { // For batch requests, use the maximum delay of all methods for this backend backendSpecificDelay = calculateBatchDelay(batchInfo.Methods, b.Name, secondaryProbe, statsCollector) } else if secondaryProbe != nil { backendSpecificDelay = secondaryProbe.getDelayForBackendAndMethod(b.Name, displayMethod) } else { // Fallback to method-based delay if no probe backendSpecificDelay = statsCollector.GetPrimaryP75ForMethod(displayMethod) } if enableDetailedLogs { log.Printf("Secondary backend %s waiting %s for method %s", b.Name, backendSpecificDelay, displayMethod) } delayTimer := time.NewTimer(backendSpecificDelay) select { case <-delayTimer.C: // Timer expired, primary is slow, proceed with secondary request case <-primaryResponseChan: // Primary already got a response, skip secondary delayTimer.Stop() // Still record that we skipped this backend statsChan <- ResponseStats{ Backend: b.Name, Error: fmt.Errorf("skipped - primary responded quickly"), Method: displayMethod, Duration: time.Since(goroutineStartTime), } return case <-primaryFailedFast: // Primary failed immediately, start secondary now delayTimer.Stop() if enableDetailedLogs { log.Printf("Primary failed fast for %s, starting secondary immediately", displayMethod) } } } } // Create a new request (no longer using context for cancellation) backendReq, err := http.NewRequest(r.Method, b.URL, bytes.NewReader(body)) if err != nil { statsChan <- ResponseStats{ Backend: b.Name, Error: err, Method: displayMethod, Duration: time.Since(goroutineStartTime), // Include any wait time } return } // Copy headers for name, values := range r.Header { for _, value := range values { backendReq.Header.Add(name, value) } } // Send the request reqStart := time.Now() resp, err := client.Do(backendReq) reqDuration := time.Since(reqStart) if err != nil { // If primary failed with connection error, signal secondary to start if b.Role == "primary" { select { case primaryFailedFast <- struct{}{}: default: } } statsChan <- ResponseStats{ Backend: b.Name, Duration: reqDuration, // Keep backend-specific duration Error: err, Method: displayMethod, } return } // Don't close resp.Body here - it will be closed by the winner or drained by losers // Check if primary returned an error status if b.Role == "primary" && resp.StatusCode >= 400 { // Any 4xx or 5xx error should trigger immediate secondary select { case primaryFailedFast <- struct{}{}: default: } } // Signal primary response immediately for secondary backends to check if b.Role == "primary" && resp.StatusCode < 400 { // Send signal for each secondary backend so they all get notified for i := 0; i < secondaryCount; i++ { select { case primaryResponseChan <- struct{}{}: default: // Channel is full, some secondaries already received signal } } } statsChan <- ResponseStats{ Backend: b.Name, StatusCode: resp.StatusCode, Duration: reqDuration, // This is the backend-specific duration Method: displayMethod, } // CRITICAL FIX: Only allow secondary backends to win if they have successful responses if b.Role == "secondary" && resp.StatusCode >= 400 { // Secondary returned an error - DO NOT let it win the race if enableDetailedLogs { log.Printf("Secondary backend %s returned error status %d for %s - ignoring", b.Name, resp.StatusCode, displayMethod) } // Still need to drain and close the body go func() { defer resp.Body.Close() io.Copy(io.Discard, resp.Body) }() return } // CRITICAL FIX 2: Check for null responses from secondary backends for certain methods if b.Role == "secondary" && resp.StatusCode == 200 && methodMightReturnNull(displayMethod) { // Check Content-Length first - null responses are typically very short contentLength := resp.Header.Get("Content-Length") if contentLength != "" { length, err := strconv.Atoi(contentLength) if err == nil && length < 100 { // Null responses are typically < 100 bytes // This is suspiciously short, likely a null response // Create a peeking reader to confirm peeker := newPeekingReader(resp.Body, 100) resp.Body = io.NopCloser(peeker) // Read a bit to trigger null detection smallBuf := make([]byte, 1) peeker.Read(smallBuf) if peeker.IsNull() { // Confirmed null response - don't let secondary win if enableDetailedLogs { log.Printf("Secondary backend %s returned null for %s (Content-Length: %s) - waiting for primary", b.Name, displayMethod, contentLength) } // Close the response body resp.Body.Close() return } // Not null, continue with normal flow } } else { // No Content-Length header, use peeking reader anyway for safety peeker := newPeekingReader(resp.Body, 200) // Peek at first 200 bytes resp.Body = io.NopCloser(peeker) // Read a bit to trigger null detection smallBuf := make([]byte, 1) peeker.Read(smallBuf) if peeker.IsNull() { // Confirmed null response - don't let secondary win if enableDetailedLogs { log.Printf("Secondary backend %s returned null for %s - waiting for primary", b.Name, displayMethod) } // Close the response body resp.Body.Close() return } } } // CRITICAL FIX 3: Check for error responses from secondary backends for certain methods if b.Role == "secondary" && resp.StatusCode == 200 && methodShouldWaitOnSecondaryError(displayMethod) { // Check Content-Length first - error responses are typically short contentLength := resp.Header.Get("Content-Length") if contentLength != "" { length, err := strconv.Atoi(contentLength) if err == nil && length < 500 { // Error responses are typically < 500 bytes // This might be an error response // Create a peeking reader to confirm peeker := newPeekingReader(resp.Body, 500) resp.Body = io.NopCloser(peeker) // Read a bit to trigger error detection smallBuf := make([]byte, 1) peeker.Read(smallBuf) if peeker.HasError() { // Confirmed error response - don't let secondary win if enableDetailedLogs { log.Printf("Secondary backend %s returned JSON-RPC error for %s (Content-Length: %s) - waiting for primary", b.Name, displayMethod, contentLength) } // Close the response body resp.Body.Close() return } // Not an error, continue with normal flow } } else { // No Content-Length header, use peeking reader anyway for safety peeker := newPeekingReader(resp.Body, 500) // Peek at first 500 bytes resp.Body = io.NopCloser(peeker) // Read a bit to trigger error detection smallBuf := make([]byte, 1) peeker.Read(smallBuf) if peeker.HasError() { // Confirmed error response - don't let secondary win if enableDetailedLogs { log.Printf("Secondary backend %s returned JSON-RPC error for %s - waiting for primary", b.Name, displayMethod) } // Close the response body resp.Body.Close() return } } } // Try to be the first to respond if responseHandled.CompareAndSwap(false, true) { responseChan <- struct { backend string resp *http.Response err error }{b.Name, resp, nil} } else { // Not the winning response, need to drain and close the body // Use a goroutine with timeout to prevent hanging go func() { defer resp.Body.Close() // Create a deadline for draining done := make(chan struct{}) go func() { io.Copy(io.Discard, resp.Body) close(done) }() select { case <-done: // Drained successfully case <-time.After(5 * time.Second): // Timeout draining, just close if enableDetailedLogs { log.Printf("Timeout draining response from backend %s", b.Name) } } }() } }(backend) } // Wait for the first successful response var response struct { backend string resp *http.Response err error } var responseReceivedTime time.Time select { case response = <-responseChan: // Got a response responseReceivedTime = time.Now() case <-time.After(30 * time.Second): // Timeout if !responseHandled.CompareAndSwap(false, true) { // Someone else handled it response = <-responseChan responseReceivedTime = time.Now() } else { http.Error(w, "Timeout waiting for any backend", http.StatusGatewayTimeout) // Always wait for primary backend to complete before collecting stats go func() { primaryWg.Wait() // Wait for primary first wg.Wait() // Then wait for all close(statsChan) }() // Collect stats var stats []ResponseStats for stat := range statsChan { stats = append(stats, stat) } return } } // Send the response to the client if response.err == nil && response.resp != nil { defer response.resp.Body.Close() // Copy response headers for name, values := range response.resp.Header { for _, value := range values { w.Header().Add(name, value) } } // Ensure we flush data as it arrives for better streaming flusher, canFlush := w.(http.Flusher) w.WriteHeader(response.resp.StatusCode) // Track when streaming started streamingStartTime := time.Now() // Stream the response body to the client with proper error handling done := make(chan error, 1) go func() { // Use a custom writer that flushes periodically var err error if canFlush { // Flush every 32KB for better streaming performance flushingWriter := &flushingResponseWriter{ ResponseWriter: w, flusher: flusher, } _, err = io.Copy(flushingWriter, response.resp.Body) } else { _, err = io.Copy(w, response.resp.Body) } done <- err }() select { case streamErr := <-done: if streamErr != nil { if enableDetailedLogs { log.Printf("Error streaming response body: %v", streamErr) } // Connection is likely broken, can't send error to client } case <-ctx.Done(): // Context timeout - client connection might be gone if enableDetailedLogs { streamingDuration := time.Since(streamingStartTime) totalRequestDuration := time.Since(startTime) // Determine if it was a timeout or cancellation reason := "timeout" if ctx.Err() == context.Canceled { reason = "client disconnection" } log.Printf("Context cancelled while streaming response from backend '%s' (method: %s) after streaming for %s (total request time: %s) - reason: %s", response.backend, displayMethod, streamingDuration, totalRequestDuration, reason) } } } else { // No valid response received from any backend http.Error(w, "All backends failed", http.StatusBadGateway) } // Collect stats asynchronously to avoid blocking the response go func() { // Always wait for primary backend to complete before collecting stats // This ensures primary backend stats are always included primaryWg.Wait() // Wait for primary backend to complete first wg.Wait() // Then wait for all other backends close(statsChan) // Collect stats var stats []ResponseStats for stat := range statsChan { stats = append(stats, stat) } // Log response times if enabled totalDuration := time.Since(startTime) if enableDetailedLogs { logResponseStats(totalDuration, stats) } // Add the actual user-experienced duration for the winning response if response.err == nil && response.backend != "" { // Find the stat for the winning backend and update it with the actual user-experienced duration for i := range stats { if stats[i].Backend == response.backend && stats[i].Error == nil { // Calculate user latency from when the first backend started processing var userLatency time.Duration if firstStart := firstBackendStartTime.Load(); firstStart != nil && !responseReceivedTime.IsZero() { userLatency = responseReceivedTime.Sub(*firstStart) } else { // Fallback to original calculation if somehow we don't have the times userLatency = time.Since(startTime) } // Create a special stat entry for the actual first response time actualFirstResponseStat := ResponseStats{ Backend: "actual-first-response", StatusCode: stats[i].StatusCode, Duration: userLatency, Error: nil, Method: stats[i].Method, } stats = append(stats, actualFirstResponseStat) break } } } // Send stats to collector statsCollector.AddStats(stats, 0) // If this was a successful batch request from primary, add batch stats for CU calculation if batchInfo.IsBatch && response.err == nil && response.backend == "primary" { // Find the primary backend stat with successful response for _, stat := range stats { if stat.Backend == "primary" && stat.Error == nil { statsCollector.AddBatchStats(batchInfo.Methods, stat.Duration, "primary") break } } } }() // Return immediately after sending response to client return } func logResponseStats(totalDuration time.Duration, stats []ResponseStats) { // Format: timestamp | total_time | method | backend1:time1 | backend2:time2 | ... var parts []string parts = append(parts, time.Now().Format("2006-01-02 15:04:05.000")) parts = append(parts, fmt.Sprintf("total:%s", totalDuration)) // Add method if available (use the first stat with a method) method := "unknown" for _, stat := range stats { if stat.Method != "" { method = stat.Method break } } parts = append(parts, fmt.Sprintf("method:%s", method)) for _, stat := range stats { if stat.Error != nil { parts = append(parts, fmt.Sprintf("%s:error:%s", stat.Backend, stat.Error)) } else { parts = append(parts, fmt.Sprintf("%s:%d:%s", stat.Backend, stat.StatusCode, stat.Duration)) } } fmt.Println(strings.Join(parts, " | ")) } // handleWebSocketRequest manages WebSocket proxying func handleWebSocketRequest(w http.ResponseWriter, r *http.Request, backends []Backend, httpClient *http.Client, upgrader *websocket.Upgrader, statsCollector *StatsCollector) { // Upgrade the client connection clientConn, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Printf("Failed to upgrade client connection: %v", err) return } defer clientConn.Close() // Set max message size for client connection clientConn.SetReadLimit(50 * 1024 * 1024) // 50MB message size limit // Connect to all backends var wg sync.WaitGroup primaryConnected := make(chan bool, 1) // Track if primary connected successfully for _, backend := range backends { wg.Add(1) go func(b Backend) { defer wg.Done() // Create backend URL with ws/wss instead of http/https backendURL := strings.Replace(b.URL, "http://", "ws://", 1) backendURL = strings.Replace(backendURL, "https://", "wss://", 1) // Create a clean header map for the dialer header := http.Header{} // Only copy non-WebSocket headers if origin := r.Header.Get("Origin"); origin != "" { header.Set("Origin", origin) } if userAgent := r.Header.Get("User-Agent"); userAgent != "" { header.Set("User-Agent", userAgent) } startTime := time.Now() // Connect to backend WebSocket with larger buffer sizes dialer := &websocket.Dialer{ ReadBufferSize: 20 * 1024 * 1024, // 20MB WriteBufferSize: 20 * 1024 * 1024, // 20MB } backendConn, resp, err := dialer.Dial(backendURL, header) connectDuration := time.Since(startTime) stats := WebSocketStats{ Backend: b.Name, ConnectTime: connectDuration, IsActive: false, } if err != nil { status := 0 if resp != nil { status = resp.StatusCode } log.Printf("Failed to connect to backend %s: %v (status: %d)", b.Name, err, status) stats.Error = err statsCollector.AddWebSocketStats(stats) // If primary failed to connect, signal that if b.Role == "primary" { select { case primaryConnected <- false: default: } } return } defer backendConn.Close() // Set max message size for backend connection backendConn.SetReadLimit(50 * 1024 * 1024) // 50MB message size limit stats.IsActive = true statsCollector.AddWebSocketStats(stats) // If this is the primary backend, signal successful connection if b.Role == "primary" { select { case primaryConnected <- true: default: } } // If this is the primary backend, set up bidirectional proxying if b.Role == "primary" { // Channel to signal when primary connection fails primaryFailed := make(chan struct{}, 2) // Buffered for 2 signals // Forward messages from client to primary backend go func() { for { messageType, message, err := clientConn.ReadMessage() if err != nil { log.Printf("Error reading from client: %v", err) select { case primaryFailed <- struct{}{}: default: } return } err = backendConn.WriteMessage(messageType, message) if err != nil { log.Printf("Error writing to primary backend: %v", err) select { case primaryFailed <- struct{}{}: default: } return } } }() // Forward messages from primary backend to client go func() { for { messageType, message, err := backendConn.ReadMessage() if err != nil { log.Printf("Error reading from primary backend: %v", err) select { case primaryFailed <- struct{}{}: default: } return } err = clientConn.WriteMessage(messageType, message) if err != nil { log.Printf("Error writing to client: %v", err) select { case primaryFailed <- struct{}{}: default: } return } } }() // Wait for primary connection failure <-primaryFailed // Primary backend failed, close client connection with proper close message log.Printf("Primary backend WebSocket failed, closing client connection") closeMsg := websocket.FormatCloseMessage(websocket.CloseGoingAway, "Primary backend unavailable") clientConn.WriteControl(websocket.CloseMessage, closeMsg, time.Now().Add(time.Second)) // Return to trigger cleanup return } else { // For secondary backends, just read and discard messages for { _, _, err := backendConn.ReadMessage() if err != nil { log.Printf("Secondary backend %s connection closed: %v", b.Name, err) return } } } }(backend) } // Wait for all connections to terminate wg.Wait() // Check if primary connected successfully select { case connected := <-primaryConnected: if !connected { // Primary failed to connect, close client connection log.Printf("Primary backend WebSocket failed to connect, closing client connection") closeMsg := websocket.FormatCloseMessage(websocket.CloseServiceRestart, "Primary backend unavailable") clientConn.WriteControl(websocket.CloseMessage, closeMsg, time.Now().Add(time.Second)) } default: // No primary backend in the configuration (shouldn't happen) log.Printf("Warning: No primary backend configured for WebSocket") } } func getEnv(key, fallback string) string { if value, exists := os.LookupEnv(key); exists { return value } return fallback } // NewChainHeadMonitor creates a new chain head monitor func NewChainHeadMonitor(backends []Backend, enableDetailedLogs bool) *ChainHeadMonitor { monitor := &ChainHeadMonitor{ backends: backends, chainHeads: make(map[string]*ChainHead), enabledBackends: make(map[string]bool), blockHashCache: make(map[string]uint64), blockHashOrder: make([]string, 0, 128), wsDialer: &websocket.Dialer{ ReadBufferSize: 1024 * 1024, // 1MB WriteBufferSize: 1024 * 1024, // 1MB }, stopChan: make(chan struct{}), enableDetailedLogs: enableDetailedLogs, } // Start monitoring go monitor.startMonitoring() return monitor } // IsBackendHealthy checks if a backend is healthy and at the correct chain head func (m *ChainHeadMonitor) IsBackendHealthy(backendName string) bool { m.mu.RLock() defer m.mu.RUnlock() // Primary is always considered healthy for request routing if backendName == "primary" { return true } enabled, exists := m.enabledBackends[backendName] if !exists { return false // Unknown backend } return enabled } // startMonitoring starts WebSocket monitoring for all backends func (m *ChainHeadMonitor) startMonitoring() { // Initial delay to let backends start time.Sleep(2 * time.Second) for _, backend := range m.backends { go m.monitorBackend(backend) } // Periodic health check ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: m.checkBackendHealth() case <-m.stopChan: return } } } // monitorBackend monitors a single backend via WebSocket func (m *ChainHeadMonitor) monitorBackend(backend Backend) { backoffDelay := time.Second for { select { case <-m.stopChan: return default: } // Create WebSocket URL wsURL := strings.Replace(backend.URL, "http://", "ws://", 1) wsURL = strings.Replace(wsURL, "https://", "wss://", 1) // Connect to WebSocket conn, _, err := m.wsDialer.Dial(wsURL, nil) if err != nil { m.updateBackendStatus(backend.Name, nil, fmt.Sprintf("WebSocket connection failed: %v", err)) time.Sleep(backoffDelay) backoffDelay = min(backoffDelay*2, 30*time.Second) continue } if m.enableDetailedLogs { log.Printf("Connected to %s WebSocket for chain head monitoring", backend.Name) } // Reset backoff on successful connection backoffDelay = time.Second // Get chain ID first chainID, err := m.getChainID(conn, backend.Name) if err != nil { conn.Close() m.updateBackendStatus(backend.Name, nil, fmt.Sprintf("Failed to get chain ID: %v", err)) time.Sleep(backoffDelay) continue } // Store chain ID (especially important for primary) if backend.Role == "primary" { m.mu.Lock() m.primaryChainID = chainID m.mu.Unlock() } // Subscribe to new heads subscribeMsg := json.RawMessage(`{"jsonrpc":"2.0","method":"eth_subscribe","params":["newHeads"],"id":1}`) err = conn.WriteMessage(websocket.TextMessage, subscribeMsg) if err != nil { conn.Close() m.updateBackendStatus(backend.Name, nil, fmt.Sprintf("Failed to subscribe: %v", err)) time.Sleep(backoffDelay) continue } // Read subscription response _, msg, err := conn.ReadMessage() if err != nil { conn.Close() m.updateBackendStatus(backend.Name, nil, fmt.Sprintf("Failed to read subscription response: %v", err)) time.Sleep(backoffDelay) continue } var subResponse struct { Result string `json:"result"` Error *struct { Message string `json:"message"` } `json:"error"` } if err := json.Unmarshal(msg, &subResponse); err != nil || subResponse.Error != nil { conn.Close() errMsg := "Subscription failed" if subResponse.Error != nil { errMsg = subResponse.Error.Message } m.updateBackendStatus(backend.Name, nil, errMsg) time.Sleep(backoffDelay) continue } // Read new head notifications m.readNewHeads(conn, backend.Name, chainID) conn.Close() time.Sleep(backoffDelay) } } // getChainID gets the chain ID from a backend func (m *ChainHeadMonitor) getChainID(conn *websocket.Conn, backendName string) (string, error) { // Send eth_chainId request chainIDMsg := json.RawMessage(`{"jsonrpc":"2.0","method":"eth_chainId","params":[],"id":"chainId"}`) if err := conn.WriteMessage(websocket.TextMessage, chainIDMsg); err != nil { return "", err } // Set read deadline conn.SetReadDeadline(time.Now().Add(5 * time.Second)) defer conn.SetReadDeadline(time.Time{}) // Read response _, msg, err := conn.ReadMessage() if err != nil { return "", err } var response struct { Result string `json:"result"` Error *struct { Message string `json:"message"` } `json:"error"` } if err := json.Unmarshal(msg, &response); err != nil { return "", err } if response.Error != nil { return "", fmt.Errorf("RPC error: %s", response.Error.Message) } return response.Result, nil } // readNewHeads reads new head notifications from WebSocket func (m *ChainHeadMonitor) readNewHeads(conn *websocket.Conn, backendName string, chainID string) { // Set long read deadline for subscriptions conn.SetReadDeadline(time.Now().Add(60 * time.Second)) for { _, msg, err := conn.ReadMessage() if err != nil { m.updateBackendStatus(backendName, nil, fmt.Sprintf("WebSocket read error: %v", err)) return } // Reset read deadline after successful read conn.SetReadDeadline(time.Now().Add(60 * time.Second)) // Parse the subscription notification var notification struct { Params struct { Result struct { Number string `json:"number"` // Hex encoded block number Hash string `json:"hash"` // Block hash } `json:"result"` } `json:"params"` } if err := json.Unmarshal(msg, ¬ification); err != nil { continue // Skip malformed messages } // Convert hex block number to uint64 if notification.Params.Result.Number != "" { blockNumber, err := strconv.ParseUint(strings.TrimPrefix(notification.Params.Result.Number, "0x"), 16, 64) if err != nil { continue } head := &ChainHead{ BlockNumber: blockNumber, BlockHash: notification.Params.Result.Hash, ChainID: chainID, LastUpdate: time.Now(), IsHealthy: true, } m.updateBackendStatus(backendName, head, "") // Cache block hash if this is from primary backend if backendName == "primary" && notification.Params.Result.Hash != "" { m.cacheBlockHash(notification.Params.Result.Hash, blockNumber) } if m.enableDetailedLogs { log.Printf("Backend %s at block %d (hash: %s...)", backendName, blockNumber, head.BlockHash[:8]) } } } } // updateBackendStatus updates the status of a backend func (m *ChainHeadMonitor) updateBackendStatus(backendName string, head *ChainHead, errorMsg string) { m.mu.Lock() defer m.mu.Unlock() if head != nil { m.chainHeads[backendName] = head } else if errorMsg != "" { // Update or create entry with error if existing, exists := m.chainHeads[backendName]; exists { existing.IsHealthy = false existing.Error = errorMsg existing.LastUpdate = time.Now() } else { m.chainHeads[backendName] = &ChainHead{ IsHealthy: false, Error: errorMsg, LastUpdate: time.Now(), } } } // Update enabled status m.updateEnabledStatus() } // updateEnabledStatus updates which backends are enabled based on chain head func (m *ChainHeadMonitor) updateEnabledStatus() { // Get primary chain head primaryHead, primaryExists := m.chainHeads["primary"] if !primaryExists || !primaryHead.IsHealthy { // If primary is not healthy/available (e.g., during restarts), enable all healthy secondary backends if m.enableDetailedLogs { log.Printf("Primary backend not healthy/available - enabling all healthy secondary backends") } for _, backend := range m.backends { if backend.Role == "primary" { m.enabledBackends[backend.Name] = true // Always mark primary as enabled for routing continue } head, exists := m.chainHeads[backend.Name] if exists && head.IsHealthy { m.enabledBackends[backend.Name] = true if m.enableDetailedLogs { log.Printf("Backend %s enabled (primary unavailable)", backend.Name) } } else { m.enabledBackends[backend.Name] = false } } return } // Check each backend for _, backend := range m.backends { if backend.Role == "primary" { m.enabledBackends[backend.Name] = true continue } head, exists := m.chainHeads[backend.Name] if !exists || !head.IsHealthy { m.enabledBackends[backend.Name] = false if m.enableDetailedLogs { log.Printf("Backend %s disabled: not healthy", backend.Name) } continue } // Check if on same chain if head.ChainID != primaryHead.ChainID { m.enabledBackends[backend.Name] = false if m.enableDetailedLogs { log.Printf("Backend %s disabled: wrong chain ID (got %s, want %s)", backend.Name, head.ChainID, primaryHead.ChainID) } continue } // STRICT RULE: Only allow if secondary matches primary height or is ahead if head.BlockNumber < primaryHead.BlockNumber { m.enabledBackends[backend.Name] = false if m.enableDetailedLogs { log.Printf("Backend %s disabled: behind primary (at block %d, primary at %d)", backend.Name, head.BlockNumber, primaryHead.BlockNumber) } continue } // Secondary is at same height or ahead - enable it m.enabledBackends[backend.Name] = true if m.enableDetailedLogs { if head.BlockNumber > primaryHead.BlockNumber { log.Printf("Backend %s enabled: ahead of primary (at block %d, primary at %d)", backend.Name, head.BlockNumber, primaryHead.BlockNumber) } else { log.Printf("Backend %s enabled: at same block as primary (%d)", backend.Name, head.BlockNumber) } } } } // checkBackendHealth performs periodic health checks func (m *ChainHeadMonitor) checkBackendHealth() { m.mu.Lock() defer m.mu.Unlock() now := time.Now() // Check for stale data (no update in 90 seconds) for _, head := range m.chainHeads { if now.Sub(head.LastUpdate) > 90*time.Second { head.IsHealthy = false head.Error = "No recent updates" } } // Update enabled status m.updateEnabledStatus() // Log current status if detailed logs enabled if m.enableDetailedLogs { log.Printf("Chain head monitor status:") for name, enabled := range m.enabledBackends { status := "disabled" if enabled { status = "enabled" } if head, exists := m.chainHeads[name]; exists { if head.IsHealthy { log.Printf(" %s: %s, block %d, chain %s", name, status, head.BlockNumber, head.ChainID) } else { log.Printf(" %s: %s, error: %s", name, status, head.Error) } } else { log.Printf(" %s: %s, no data", name, status) } } } } // GetStatus returns the current status of all backends func (m *ChainHeadMonitor) GetStatus() map[string]ChainHead { m.mu.RLock() defer m.mu.RUnlock() status := make(map[string]ChainHead) for name, head := range m.chainHeads { status[name] = *head } return status } // cacheBlockHash adds a block hash to the cache, maintaining a maximum of 128 entries func (m *ChainHeadMonitor) cacheBlockHash(blockHash string, blockNumber uint64) { m.mu.Lock() defer m.mu.Unlock() // Check if hash already exists if _, exists := m.blockHashCache[blockHash]; exists { return } // Add to cache m.blockHashCache[blockHash] = blockNumber m.blockHashOrder = append(m.blockHashOrder, blockHash) // Maintain maximum cache size of 128 blocks if len(m.blockHashOrder) > 128 { // Remove oldest entry oldestHash := m.blockHashOrder[0] delete(m.blockHashCache, oldestHash) m.blockHashOrder = m.blockHashOrder[1:] } } // GetBlockNumberForHash returns the block number for a given hash if it's in the cache func (m *ChainHeadMonitor) GetBlockNumberForHash(blockHash string) (uint64, bool) { m.mu.RLock() defer m.mu.RUnlock() blockNumber, exists := m.blockHashCache[blockHash] return blockNumber, exists }