rigid block tag routing

This commit is contained in:
Para Dox
2025-05-30 08:55:42 +07:00
parent 99bc61a018
commit c02c7468a8

View File

@@ -32,6 +32,8 @@ type BatchInfo struct {
Methods []string Methods []string
RequestCount int RequestCount int
HasStateful bool HasStateful bool
BlockTags []string // Added to track block tags in batch
RequiresPrimary bool // Added to indicate if batch requires primary due to block tags
} }
// parseBatchInfo analyzes the request body to extract method information // parseBatchInfo analyzes the request body to extract method information
@@ -49,6 +51,7 @@ func parseBatchInfo(body []byte) (*BatchInfo, error) {
IsBatch: true, IsBatch: true,
RequestCount: len(batchReqs), RequestCount: len(batchReqs),
Methods: make([]string, 0, len(batchReqs)), Methods: make([]string, 0, len(batchReqs)),
BlockTags: make([]string, 0),
} }
// Extract methods and check for stateful ones // Extract methods and check for stateful ones
@@ -57,24 +60,47 @@ func parseBatchInfo(body []byte) (*BatchInfo, error) {
if req.Method != "" { if req.Method != "" {
info.Methods = append(info.Methods, req.Method) info.Methods = append(info.Methods, req.Method)
methodSet[req.Method] = true methodSet[req.Method] = true
if isStatefulMethod(req.Method) { if isStatefulMethod(req.Method) || requiresPrimaryOnlyMethod(req.Method) {
info.HasStateful = true info.HasStateful = true
} }
} }
} }
// Extract block tags from the batch
blockTags, err := parseBlockTagsFromBatch(body)
if err == nil {
info.BlockTags = blockTags
// Check if any block tag requires primary
for _, tag := range blockTags {
if requiresPrimaryBackend(tag) {
info.RequiresPrimary = true
break
}
}
}
return info, nil return info, nil
} }
// Try parsing as single request // Try parsing as single request
var singleReq JSONRPCRequest var singleReq JSONRPCRequest
if err := json.Unmarshal(body, &singleReq); err == nil { if err := json.Unmarshal(body, &singleReq); err == nil {
return &BatchInfo{ info := &BatchInfo{
IsBatch: false, IsBatch: false,
Methods: []string{singleReq.Method}, Methods: []string{singleReq.Method},
RequestCount: 1, RequestCount: 1,
HasStateful: isStatefulMethod(singleReq.Method), HasStateful: isStatefulMethod(singleReq.Method) || requiresPrimaryOnlyMethod(singleReq.Method),
}, nil BlockTags: make([]string, 0),
}
// Extract block tag from single request
reqInfo, err := parseRequestInfo(body)
if err == nil && reqInfo.BlockTag != "" {
info.BlockTags = []string{reqInfo.BlockTag}
info.RequiresPrimary = requiresPrimaryBackend(reqInfo.BlockTag)
}
return info, nil
} }
// Neither batch nor single request // Neither batch nor single request
@@ -220,6 +246,158 @@ type ChainHead struct {
Error string // Last error if any Error string // Last error if any
} }
// RequestInfo contains parsed information about a JSON-RPC request
type RequestInfo struct {
Method string
BlockTag string
HasParams bool
}
// Full JSON-RPC request structure for parsing parameters
type JSONRPCFullRequest struct {
Method string `json:"method"`
Params json.RawMessage `json:"params"`
ID interface{} `json:"id"`
}
// parseRequestInfo extracts detailed information from a JSON-RPC request
func parseRequestInfo(body []byte) (*RequestInfo, error) {
var req JSONRPCFullRequest
if err := json.Unmarshal(body, &req); err != nil {
return nil, err
}
info := &RequestInfo{
Method: req.Method,
HasParams: len(req.Params) > 0,
}
// Methods that commonly use block tags
methodsWithBlockTags := map[string]int{
"eth_getBalance": -1, // last param
"eth_getCode": -1, // last param
"eth_getTransactionCount": -1, // last param
"eth_getStorageAt": -1, // last param
"eth_call": -1, // last param
"eth_estimateGas": -1, // last param
"eth_getProof": -1, // last param
"eth_getBlockByNumber": 0, // first param
"eth_getBlockTransactionCountByNumber": 0, // first param
"eth_getTransactionByBlockNumberAndIndex": 0, // first param
"eth_getUncleByBlockNumberAndIndex": 0, // first param
"eth_getUncleCountByBlockNumber": 0, // first param
// Note: eth_getLogs uses a filter object with fromBlock/toBlock fields,
// which would need special handling and is not included here
}
paramPos, hasBlockTag := methodsWithBlockTags[req.Method]
if !hasBlockTag || !info.HasParams {
return info, nil
}
// Parse params as array
var params []json.RawMessage
if err := json.Unmarshal(req.Params, &params); err != nil {
// Not an array, might be object params
return info, nil
}
if len(params) == 0 {
return info, nil
}
// Determine which parameter to check
var blockTagParam json.RawMessage
if paramPos == -1 {
// Last parameter
blockTagParam = params[len(params)-1]
} else if paramPos < len(params) {
// Specific position
blockTagParam = params[paramPos]
} else {
return info, nil
}
// Try to parse as string (block tag)
var blockTag string
if err := json.Unmarshal(blockTagParam, &blockTag); err == nil {
info.BlockTag = blockTag
}
return info, nil
}
// parseBlockTagsFromBatch extracts block tags from all requests in a batch
func parseBlockTagsFromBatch(body []byte) ([]string, error) {
var batchReqs []JSONRPCFullRequest
if err := json.Unmarshal(body, &batchReqs); err != nil {
return nil, err
}
blockTags := make([]string, 0)
for _, req := range batchReqs {
reqBytes, err := json.Marshal(req)
if err != nil {
continue
}
info, err := parseRequestInfo(reqBytes)
if err != nil {
continue
}
if info.BlockTag != "" {
blockTags = append(blockTags, info.BlockTag)
}
}
return blockTags, nil
}
// requiresPrimaryBackend checks if a request must be routed to primary based on block tag
func requiresPrimaryBackend(blockTag string) bool {
// These block tags must always go to primary
primaryOnlyTags := map[string]bool{
"finalized": true,
"pending": true,
"safe": true,
}
return primaryOnlyTags[blockTag]
}
// canUseSecondaryForLatest checks if secondary backend can be used for "latest" block tag
func canUseSecondaryForLatest(blockTag string, backendName string, chainHeadMonitor *ChainHeadMonitor) bool {
// Only check for "latest" tag
if blockTag != "latest" {
// For non-latest tags (like specific block numbers), follow existing rules
return true
}
if chainHeadMonitor == nil {
// No monitor, can't verify - be conservative
return false
}
// Get chain head status
chainStatus := chainHeadMonitor.GetStatus()
primaryHead, primaryExists := chainStatus["primary"]
if !primaryExists || !primaryHead.IsHealthy {
// Primary not healthy, allow secondary
return true
}
secondaryHead, secondaryExists := chainStatus[backendName]
if !secondaryExists || !secondaryHead.IsHealthy {
// Secondary not healthy
return false
}
// For "latest", secondary must be at EXACTLY the same block height
return secondaryHead.BlockNumber == primaryHead.BlockNumber
}
func NewStatsCollector(summaryInterval time.Duration, hasSecondaryBackends bool) *StatsCollector { func NewStatsCollector(summaryInterval time.Duration, hasSecondaryBackends bool) *StatsCollector {
now := time.Now() now := time.Now()
sc := &StatsCollector{ sc := &StatsCollector{
@@ -1673,6 +1851,62 @@ func isStatefulMethod(method string) bool {
return statefulMethods[method] return statefulMethods[method]
} }
// requiresPrimaryOnlyMethod returns true if the method should always go to primary
func requiresPrimaryOnlyMethod(method string) bool {
primaryOnlyMethods := map[string]bool{
// Transaction sending methods - must go to primary
"eth_sendRawTransaction": true,
"eth_sendTransaction": true,
// Mempool/txpool methods - these show pending transactions
"txpool_content": true,
"txpool_inspect": true,
"txpool_status": true,
"txpool_contentFrom": true,
// Mining related methods
"eth_mining": true,
"eth_hashrate": true,
"eth_getWork": true,
"eth_submitWork": true,
"eth_submitHashrate": true,
}
return primaryOnlyMethods[method]
}
// methodMightReturnNull returns true if the method might legitimately return null
// and we should wait for primary's response instead of returning secondary's null
func methodMightReturnNull(method string) bool {
nullableMethods := map[string]bool{
"eth_getTransactionReceipt": true,
"eth_getTransactionByHash": true,
"eth_getTransactionByBlockHashAndIndex": true,
"eth_getTransactionByBlockNumberAndIndex": true,
"eth_getBlockByHash": true,
"eth_getBlockByNumber": true,
"eth_getUncleByBlockHashAndIndex": true,
"eth_getUncleByBlockNumberAndIndex": true,
}
return nullableMethods[method]
}
// isNullResponse checks if a JSON-RPC response has a null result
func isNullResponse(respBody []byte) bool {
// Simple structure to check the result field
var response struct {
Result json.RawMessage `json:"result"`
}
if err := json.Unmarshal(respBody, &response); err != nil {
return false
}
// Check if result is null
return string(response.Result) == "null"
}
// flushingResponseWriter wraps http.ResponseWriter to flush after every write // flushingResponseWriter wraps http.ResponseWriter to flush after every write
type flushingResponseWriter struct { type flushingResponseWriter struct {
http.ResponseWriter http.ResponseWriter
@@ -1857,16 +2091,19 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
// For logging and stats, use the first method or "batch" for batch requests // For logging and stats, use the first method or "batch" for batch requests
var displayMethod string var displayMethod string
var isStateful bool var isStateful bool
var requiresPrimaryDueToBlockTag bool
if batchInfo.IsBatch { if batchInfo.IsBatch {
displayMethod = fmt.Sprintf("batch[%d]", batchInfo.RequestCount) displayMethod = fmt.Sprintf("batch[%d]", batchInfo.RequestCount)
isStateful = batchInfo.HasStateful isStateful = batchInfo.HasStateful
requiresPrimaryDueToBlockTag = batchInfo.RequiresPrimary
} else { } else {
displayMethod = batchInfo.Methods[0] displayMethod = batchInfo.Methods[0]
if displayMethod == "" { if displayMethod == "" {
displayMethod = "unknown" displayMethod = "unknown"
} }
isStateful = batchInfo.HasStateful isStateful = batchInfo.HasStateful
requiresPrimaryDueToBlockTag = batchInfo.RequiresPrimary
} }
// Get delay threshold for secondary backends // Get delay threshold for secondary backends
@@ -1886,12 +2123,16 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
if enableDetailedLogs { if enableDetailedLogs {
if batchInfo.IsBatch { if batchInfo.IsBatch {
log.Printf("Batch request: %d requests, methods: %s, max delay: %s (probe-based: %v)", log.Printf("Batch request: %d requests, methods: %s, max delay: %s (probe-based: %v), block tags: %v",
batchInfo.RequestCount, formatMethodList(batchInfo.Methods), batchInfo.RequestCount, formatMethodList(batchInfo.Methods),
secondaryDelay, secondaryProbe != nil) secondaryDelay, secondaryProbe != nil, batchInfo.BlockTags)
} else { } else {
log.Printf("Method: %s, Secondary delay: %s (probe-based: %v)", var blockTagInfo string
displayMethod, secondaryDelay, secondaryProbe != nil) if len(batchInfo.BlockTags) > 0 {
blockTagInfo = fmt.Sprintf(", block tag: %s", batchInfo.BlockTags[0])
}
log.Printf("Method: %s, Secondary delay: %s (probe-based: %v)%s",
displayMethod, secondaryDelay, secondaryProbe != nil, blockTagInfo)
} }
} }
@@ -1914,9 +2155,39 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
for _, backend := range backends { for _, backend := range backends {
// Skip secondary backends for stateful methods // Skip secondary backends for stateful methods
if isStateful && backend.Role != "primary" { if isStateful && backend.Role != "primary" {
if enableDetailedLogs {
log.Printf("Skipping secondary backend %s for stateful method %s", backend.Name, displayMethod)
}
continue continue
} }
// Skip secondary backends if request requires primary due to block tag
if requiresPrimaryDueToBlockTag && backend.Role != "primary" {
if enableDetailedLogs {
log.Printf("Skipping secondary backend %s due to block tag requiring primary", backend.Name)
}
continue
}
// Check if secondary backend can handle "latest" block tag requests
if backend.Role == "secondary" && len(batchInfo.BlockTags) > 0 {
// Check all block tags in the request
canUseSecondary := true
for _, blockTag := range batchInfo.BlockTags {
if !canUseSecondaryForLatest(blockTag, backend.Name, chainHeadMonitor) {
canUseSecondary = false
if enableDetailedLogs {
log.Printf("Skipping secondary backend %s for block tag '%s' - not at same height as primary",
backend.Name, blockTag)
}
break
}
}
if !canUseSecondary {
continue
}
}
// Skip unhealthy secondary backends // Skip unhealthy secondary backends
if backend.Role == "secondary" && chainHeadMonitor != nil && !chainHeadMonitor.IsBackendHealthy(backend.Name) { if backend.Role == "secondary" && chainHeadMonitor != nil && !chainHeadMonitor.IsBackendHealthy(backend.Name) {
if enableDetailedLogs { if enableDetailedLogs {
@@ -2057,6 +2328,27 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
return return
} }
// CRITICAL FIX 2: Check for null responses from secondary backends for certain methods
if b.Role == "secondary" && resp.StatusCode == 200 && methodMightReturnNull(displayMethod) {
// Need to read the body to check if it's null
bodyBytes, err := io.ReadAll(resp.Body)
resp.Body.Close() // Close the original body
if err == nil && isNullResponse(bodyBytes) {
// Secondary returned null - don't let it win the race
if enableDetailedLogs {
log.Printf("Secondary backend %s returned null for %s - waiting for primary",
b.Name, displayMethod)
}
return
}
// Not null or couldn't read - recreate the body for potential use
if err == nil {
resp.Body = io.NopCloser(bytes.NewReader(bodyBytes))
}
}
// Try to be the first to respond // Try to be the first to respond
if responseHandled.CompareAndSwap(false, true) { if responseHandled.CompareAndSwap(false, true) {
responseChan <- struct { responseChan <- struct {