track block hashes
This commit is contained in:
@@ -231,6 +231,8 @@ type ChainHeadMonitor struct {
|
|||||||
chainHeads map[string]*ChainHead // backend name -> chain head info
|
chainHeads map[string]*ChainHead // backend name -> chain head info
|
||||||
primaryChainID string // Chain ID of primary backend
|
primaryChainID string // Chain ID of primary backend
|
||||||
enabledBackends map[string]bool // Track which backends are enabled
|
enabledBackends map[string]bool // Track which backends are enabled
|
||||||
|
blockHashCache map[string]uint64 // block hash -> block number cache (last 128 blocks from primary)
|
||||||
|
blockHashOrder []string // ordered list of block hashes (oldest first)
|
||||||
wsDialer *websocket.Dialer
|
wsDialer *websocket.Dialer
|
||||||
stopChan chan struct{}
|
stopChan chan struct{}
|
||||||
enableDetailedLogs bool
|
enableDetailedLogs bool
|
||||||
@@ -339,6 +341,31 @@ func parseRequestInfo(body []byte) (*RequestInfo, error) {
|
|||||||
// which needs special handling
|
// which needs special handling
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Methods that use block hashes as parameters
|
||||||
|
methodsWithBlockHashes := map[string]int{
|
||||||
|
"eth_getBlockByHash": 0, // first param
|
||||||
|
"eth_getBlockTransactionCountByHash": 0, // first param
|
||||||
|
"eth_getTransactionByBlockHashAndIndex": 0, // first param
|
||||||
|
"eth_getUncleByBlockHashAndIndex": 0, // first param
|
||||||
|
"eth_getUncleCountByBlockHash": 0, // first param
|
||||||
|
"debug_traceBlockByHash": 0, // first param
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for block hash methods first
|
||||||
|
paramPos, hasBlockHash := methodsWithBlockHashes[req.Method]
|
||||||
|
if hasBlockHash && info.HasParams {
|
||||||
|
// Parse params as array
|
||||||
|
var params []json.RawMessage
|
||||||
|
if err := json.Unmarshal(req.Params, ¶ms); err == nil && len(params) > paramPos {
|
||||||
|
// Try to parse as string (block hash)
|
||||||
|
var blockHash string
|
||||||
|
if err := json.Unmarshal(params[paramPos], &blockHash); err == nil {
|
||||||
|
info.BlockTag = blockHash
|
||||||
|
return info, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
paramPos, hasBlockTag := methodsWithBlockTags[req.Method]
|
paramPos, hasBlockTag := methodsWithBlockTags[req.Method]
|
||||||
if !hasBlockTag || !info.HasParams {
|
if !hasBlockTag || !info.HasParams {
|
||||||
return info, nil
|
return info, nil
|
||||||
@@ -561,6 +588,62 @@ func canUseSecondaryForLatest(blockTag string, backendName string, chainHeadMoni
|
|||||||
return secondaryHead.BlockNumber == primaryHead.BlockNumber
|
return secondaryHead.BlockNumber == primaryHead.BlockNumber
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// canUseSecondaryForBlockTag checks if secondary backend can be used for a given block tag
|
||||||
|
func canUseSecondaryForBlockTag(blockTag string, backendName string, chainHeadMonitor *ChainHeadMonitor) bool {
|
||||||
|
if chainHeadMonitor == nil {
|
||||||
|
// No monitor, can't verify - be conservative
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get chain head status
|
||||||
|
chainStatus := chainHeadMonitor.GetStatus()
|
||||||
|
|
||||||
|
primaryHead, primaryExists := chainStatus["primary"]
|
||||||
|
if !primaryExists || !primaryHead.IsHealthy {
|
||||||
|
// Primary not healthy, allow secondary
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
secondaryHead, secondaryExists := chainStatus[backendName]
|
||||||
|
if !secondaryExists || !secondaryHead.IsHealthy {
|
||||||
|
// Secondary not healthy
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle "latest" tag - secondary must be at EXACTLY the same block height
|
||||||
|
if blockTag == "latest" {
|
||||||
|
return secondaryHead.BlockNumber == primaryHead.BlockNumber
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle "earliest" tag - always allowed
|
||||||
|
if blockTag == "earliest" {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if it's a block hash (0x followed by 64 hex chars)
|
||||||
|
if len(blockTag) == 66 && strings.HasPrefix(blockTag, "0x") {
|
||||||
|
// Try to look up the block number from our cache
|
||||||
|
if blockNumber, exists := chainHeadMonitor.GetBlockNumberForHash(blockTag); exists {
|
||||||
|
// We know this block number, check if secondary has it
|
||||||
|
return secondaryHead.BlockNumber >= blockNumber
|
||||||
|
}
|
||||||
|
// Unknown block hash - be conservative and route to primary
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if it's a numeric block tag (hex number)
|
||||||
|
if strings.HasPrefix(blockTag, "0x") {
|
||||||
|
blockNumber, err := strconv.ParseUint(strings.TrimPrefix(blockTag, "0x"), 16, 64)
|
||||||
|
if err == nil {
|
||||||
|
// Valid block number - check if secondary has reached it
|
||||||
|
return secondaryHead.BlockNumber >= blockNumber
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unknown block tag format - be conservative
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func NewStatsCollector(summaryInterval time.Duration, hasSecondaryBackends bool) *StatsCollector {
|
func NewStatsCollector(summaryInterval time.Duration, hasSecondaryBackends bool) *StatsCollector {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
sc := &StatsCollector{
|
sc := &StatsCollector{
|
||||||
@@ -1209,6 +1292,12 @@ func (sc *StatsCollector) printSummary() {
|
|||||||
|
|
||||||
fmt.Printf(" %s: %s (%s)\n", name, status, details)
|
fmt.Printf(" %s: %s (%s)\n", name, status, details)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Show block hash cache stats
|
||||||
|
sc.chainHeadMonitor.mu.RLock()
|
||||||
|
cacheSize := len(sc.chainHeadMonitor.blockHashCache)
|
||||||
|
sc.chainHeadMonitor.mu.RUnlock()
|
||||||
|
fmt.Printf(" Block hash cache: %d entries (max 128)\n", cacheSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
if sc.hasSecondaryBackends && sc.skippedSecondaryRequests > 0 {
|
if sc.hasSecondaryBackends && sc.skippedSecondaryRequests > 0 {
|
||||||
@@ -2351,10 +2440,10 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
|
|||||||
// Check all block tags in the request
|
// Check all block tags in the request
|
||||||
canUseSecondary := true
|
canUseSecondary := true
|
||||||
for _, blockTag := range batchInfo.BlockTags {
|
for _, blockTag := range batchInfo.BlockTags {
|
||||||
if !canUseSecondaryForLatest(blockTag, backend.Name, chainHeadMonitor) {
|
if !canUseSecondaryForBlockTag(blockTag, backend.Name, chainHeadMonitor) {
|
||||||
canUseSecondary = false
|
canUseSecondary = false
|
||||||
if enableDetailedLogs {
|
if enableDetailedLogs {
|
||||||
log.Printf("Skipping secondary backend %s for block tag '%s' - not at same height as primary",
|
log.Printf("Skipping secondary backend %s for block tag '%s' - not at required height",
|
||||||
backend.Name, blockTag)
|
backend.Name, blockTag)
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
@@ -2971,6 +3060,8 @@ func NewChainHeadMonitor(backends []Backend, enableDetailedLogs bool) *ChainHead
|
|||||||
backends: backends,
|
backends: backends,
|
||||||
chainHeads: make(map[string]*ChainHead),
|
chainHeads: make(map[string]*ChainHead),
|
||||||
enabledBackends: make(map[string]bool),
|
enabledBackends: make(map[string]bool),
|
||||||
|
blockHashCache: make(map[string]uint64),
|
||||||
|
blockHashOrder: make([]string, 0, 128),
|
||||||
wsDialer: &websocket.Dialer{
|
wsDialer: &websocket.Dialer{
|
||||||
ReadBufferSize: 1024 * 1024, // 1MB
|
ReadBufferSize: 1024 * 1024, // 1MB
|
||||||
WriteBufferSize: 1024 * 1024, // 1MB
|
WriteBufferSize: 1024 * 1024, // 1MB
|
||||||
@@ -3200,6 +3291,11 @@ func (m *ChainHeadMonitor) readNewHeads(conn *websocket.Conn, backendName string
|
|||||||
|
|
||||||
m.updateBackendStatus(backendName, head, "")
|
m.updateBackendStatus(backendName, head, "")
|
||||||
|
|
||||||
|
// Cache block hash if this is from primary backend
|
||||||
|
if backendName == "primary" && notification.Params.Result.Hash != "" {
|
||||||
|
m.cacheBlockHash(notification.Params.Result.Hash, blockNumber)
|
||||||
|
}
|
||||||
|
|
||||||
if m.enableDetailedLogs {
|
if m.enableDetailedLogs {
|
||||||
log.Printf("Backend %s at block %d (hash: %s...)",
|
log.Printf("Backend %s at block %d (hash: %s...)",
|
||||||
backendName, blockNumber, head.BlockHash[:8])
|
backendName, blockNumber, head.BlockHash[:8])
|
||||||
@@ -3364,3 +3460,35 @@ func (m *ChainHeadMonitor) GetStatus() map[string]ChainHead {
|
|||||||
}
|
}
|
||||||
return status
|
return status
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cacheBlockHash adds a block hash to the cache, maintaining a maximum of 128 entries
|
||||||
|
func (m *ChainHeadMonitor) cacheBlockHash(blockHash string, blockNumber uint64) {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
|
||||||
|
// Check if hash already exists
|
||||||
|
if _, exists := m.blockHashCache[blockHash]; exists {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add to cache
|
||||||
|
m.blockHashCache[blockHash] = blockNumber
|
||||||
|
m.blockHashOrder = append(m.blockHashOrder, blockHash)
|
||||||
|
|
||||||
|
// Maintain maximum cache size of 128 blocks
|
||||||
|
if len(m.blockHashOrder) > 128 {
|
||||||
|
// Remove oldest entry
|
||||||
|
oldestHash := m.blockHashOrder[0]
|
||||||
|
delete(m.blockHashCache, oldestHash)
|
||||||
|
m.blockHashOrder = m.blockHashOrder[1:]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetBlockNumberForHash returns the block number for a given hash if it's in the cache
|
||||||
|
func (m *ChainHeadMonitor) GetBlockNumberForHash(blockHash string) (uint64, bool) {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
|
||||||
|
blockNumber, exists := m.blockHashCache[blockHash]
|
||||||
|
return blockNumber, exists
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user