diff --git a/benchmark-proxy/main.go b/benchmark-proxy/main.go index 50216858..0e02c9e9 100644 --- a/benchmark-proxy/main.go +++ b/benchmark-proxy/main.go @@ -231,6 +231,8 @@ type ChainHeadMonitor struct { chainHeads map[string]*ChainHead // backend name -> chain head info primaryChainID string // Chain ID of primary backend enabledBackends map[string]bool // Track which backends are enabled + blockHashCache map[string]uint64 // block hash -> block number cache (last 128 blocks from primary) + blockHashOrder []string // ordered list of block hashes (oldest first) wsDialer *websocket.Dialer stopChan chan struct{} enableDetailedLogs bool @@ -339,6 +341,31 @@ func parseRequestInfo(body []byte) (*RequestInfo, error) { // which needs special handling } + // Methods that use block hashes as parameters + methodsWithBlockHashes := map[string]int{ + "eth_getBlockByHash": 0, // first param + "eth_getBlockTransactionCountByHash": 0, // first param + "eth_getTransactionByBlockHashAndIndex": 0, // first param + "eth_getUncleByBlockHashAndIndex": 0, // first param + "eth_getUncleCountByBlockHash": 0, // first param + "debug_traceBlockByHash": 0, // first param + } + + // Check for block hash methods first + paramPos, hasBlockHash := methodsWithBlockHashes[req.Method] + if hasBlockHash && info.HasParams { + // Parse params as array + var params []json.RawMessage + if err := json.Unmarshal(req.Params, ¶ms); err == nil && len(params) > paramPos { + // Try to parse as string (block hash) + var blockHash string + if err := json.Unmarshal(params[paramPos], &blockHash); err == nil { + info.BlockTag = blockHash + return info, nil + } + } + } + paramPos, hasBlockTag := methodsWithBlockTags[req.Method] if !hasBlockTag || !info.HasParams { return info, nil @@ -561,6 +588,62 @@ func canUseSecondaryForLatest(blockTag string, backendName string, chainHeadMoni return secondaryHead.BlockNumber == primaryHead.BlockNumber } +// canUseSecondaryForBlockTag checks if secondary backend can be used for a given block tag +func canUseSecondaryForBlockTag(blockTag string, backendName string, chainHeadMonitor *ChainHeadMonitor) bool { + if chainHeadMonitor == nil { + // No monitor, can't verify - be conservative + return false + } + + // Get chain head status + chainStatus := chainHeadMonitor.GetStatus() + + primaryHead, primaryExists := chainStatus["primary"] + if !primaryExists || !primaryHead.IsHealthy { + // Primary not healthy, allow secondary + return true + } + + secondaryHead, secondaryExists := chainStatus[backendName] + if !secondaryExists || !secondaryHead.IsHealthy { + // Secondary not healthy + return false + } + + // Handle "latest" tag - secondary must be at EXACTLY the same block height + if blockTag == "latest" { + return secondaryHead.BlockNumber == primaryHead.BlockNumber + } + + // Handle "earliest" tag - always allowed + if blockTag == "earliest" { + return true + } + + // Check if it's a block hash (0x followed by 64 hex chars) + if len(blockTag) == 66 && strings.HasPrefix(blockTag, "0x") { + // Try to look up the block number from our cache + if blockNumber, exists := chainHeadMonitor.GetBlockNumberForHash(blockTag); exists { + // We know this block number, check if secondary has it + return secondaryHead.BlockNumber >= blockNumber + } + // Unknown block hash - be conservative and route to primary + return false + } + + // Check if it's a numeric block tag (hex number) + if strings.HasPrefix(blockTag, "0x") { + blockNumber, err := strconv.ParseUint(strings.TrimPrefix(blockTag, "0x"), 16, 64) + if err == nil { + // Valid block number - check if secondary has reached it + return secondaryHead.BlockNumber >= blockNumber + } + } + + // Unknown block tag format - be conservative + return false +} + func NewStatsCollector(summaryInterval time.Duration, hasSecondaryBackends bool) *StatsCollector { now := time.Now() sc := &StatsCollector{ @@ -1209,6 +1292,12 @@ func (sc *StatsCollector) printSummary() { fmt.Printf(" %s: %s (%s)\n", name, status, details) } + + // Show block hash cache stats + sc.chainHeadMonitor.mu.RLock() + cacheSize := len(sc.chainHeadMonitor.blockHashCache) + sc.chainHeadMonitor.mu.RUnlock() + fmt.Printf(" Block hash cache: %d entries (max 128)\n", cacheSize) } if sc.hasSecondaryBackends && sc.skippedSecondaryRequests > 0 { @@ -2351,10 +2440,10 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c // Check all block tags in the request canUseSecondary := true for _, blockTag := range batchInfo.BlockTags { - if !canUseSecondaryForLatest(blockTag, backend.Name, chainHeadMonitor) { + if !canUseSecondaryForBlockTag(blockTag, backend.Name, chainHeadMonitor) { canUseSecondary = false if enableDetailedLogs { - log.Printf("Skipping secondary backend %s for block tag '%s' - not at same height as primary", + log.Printf("Skipping secondary backend %s for block tag '%s' - not at required height", backend.Name, blockTag) } break @@ -2971,6 +3060,8 @@ func NewChainHeadMonitor(backends []Backend, enableDetailedLogs bool) *ChainHead backends: backends, chainHeads: make(map[string]*ChainHead), enabledBackends: make(map[string]bool), + blockHashCache: make(map[string]uint64), + blockHashOrder: make([]string, 0, 128), wsDialer: &websocket.Dialer{ ReadBufferSize: 1024 * 1024, // 1MB WriteBufferSize: 1024 * 1024, // 1MB @@ -3200,6 +3291,11 @@ func (m *ChainHeadMonitor) readNewHeads(conn *websocket.Conn, backendName string m.updateBackendStatus(backendName, head, "") + // Cache block hash if this is from primary backend + if backendName == "primary" && notification.Params.Result.Hash != "" { + m.cacheBlockHash(notification.Params.Result.Hash, blockNumber) + } + if m.enableDetailedLogs { log.Printf("Backend %s at block %d (hash: %s...)", backendName, blockNumber, head.BlockHash[:8]) @@ -3364,3 +3460,35 @@ func (m *ChainHeadMonitor) GetStatus() map[string]ChainHead { } return status } + +// cacheBlockHash adds a block hash to the cache, maintaining a maximum of 128 entries +func (m *ChainHeadMonitor) cacheBlockHash(blockHash string, blockNumber uint64) { + m.mu.Lock() + defer m.mu.Unlock() + + // Check if hash already exists + if _, exists := m.blockHashCache[blockHash]; exists { + return + } + + // Add to cache + m.blockHashCache[blockHash] = blockNumber + m.blockHashOrder = append(m.blockHashOrder, blockHash) + + // Maintain maximum cache size of 128 blocks + if len(m.blockHashOrder) > 128 { + // Remove oldest entry + oldestHash := m.blockHashOrder[0] + delete(m.blockHashCache, oldestHash) + m.blockHashOrder = m.blockHashOrder[1:] + } +} + +// GetBlockNumberForHash returns the block number for a given hash if it's in the cache +func (m *ChainHeadMonitor) GetBlockNumberForHash(blockHash string) (uint64, bool) { + m.mu.RLock() + defer m.mu.RUnlock() + + blockNumber, exists := m.blockHashCache[blockHash] + return blockNumber, exists +}