From c02c7468a8acdae2ea0c05a916526efc03f06957 Mon Sep 17 00:00:00 2001 From: Para Dox Date: Fri, 30 May 2025 08:55:42 +0700 Subject: [PATCH] rigid block tag routing --- benchmark-proxy/main.go | 316 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 304 insertions(+), 12 deletions(-) diff --git a/benchmark-proxy/main.go b/benchmark-proxy/main.go index 33c1d875..febd9617 100644 --- a/benchmark-proxy/main.go +++ b/benchmark-proxy/main.go @@ -28,10 +28,12 @@ type JSONRPCRequest struct { // BatchInfo contains information about a batch request type BatchInfo struct { - IsBatch bool - Methods []string - RequestCount int - HasStateful bool + IsBatch bool + Methods []string + RequestCount int + HasStateful bool + BlockTags []string // Added to track block tags in batch + RequiresPrimary bool // Added to indicate if batch requires primary due to block tags } // parseBatchInfo analyzes the request body to extract method information @@ -49,6 +51,7 @@ func parseBatchInfo(body []byte) (*BatchInfo, error) { IsBatch: true, RequestCount: len(batchReqs), Methods: make([]string, 0, len(batchReqs)), + BlockTags: make([]string, 0), } // Extract methods and check for stateful ones @@ -57,24 +60,47 @@ func parseBatchInfo(body []byte) (*BatchInfo, error) { if req.Method != "" { info.Methods = append(info.Methods, req.Method) methodSet[req.Method] = true - if isStatefulMethod(req.Method) { + if isStatefulMethod(req.Method) || requiresPrimaryOnlyMethod(req.Method) { info.HasStateful = true } } } + // Extract block tags from the batch + blockTags, err := parseBlockTagsFromBatch(body) + if err == nil { + info.BlockTags = blockTags + // Check if any block tag requires primary + for _, tag := range blockTags { + if requiresPrimaryBackend(tag) { + info.RequiresPrimary = true + break + } + } + } + return info, nil } // Try parsing as single request var singleReq JSONRPCRequest if err := json.Unmarshal(body, &singleReq); err == nil { - return &BatchInfo{ + info := &BatchInfo{ IsBatch: false, Methods: []string{singleReq.Method}, RequestCount: 1, - HasStateful: isStatefulMethod(singleReq.Method), - }, nil + HasStateful: isStatefulMethod(singleReq.Method) || requiresPrimaryOnlyMethod(singleReq.Method), + BlockTags: make([]string, 0), + } + + // Extract block tag from single request + reqInfo, err := parseRequestInfo(body) + if err == nil && reqInfo.BlockTag != "" { + info.BlockTags = []string{reqInfo.BlockTag} + info.RequiresPrimary = requiresPrimaryBackend(reqInfo.BlockTag) + } + + return info, nil } // Neither batch nor single request @@ -220,6 +246,158 @@ type ChainHead struct { Error string // Last error if any } +// RequestInfo contains parsed information about a JSON-RPC request +type RequestInfo struct { + Method string + BlockTag string + HasParams bool +} + +// Full JSON-RPC request structure for parsing parameters +type JSONRPCFullRequest struct { + Method string `json:"method"` + Params json.RawMessage `json:"params"` + ID interface{} `json:"id"` +} + +// parseRequestInfo extracts detailed information from a JSON-RPC request +func parseRequestInfo(body []byte) (*RequestInfo, error) { + var req JSONRPCFullRequest + if err := json.Unmarshal(body, &req); err != nil { + return nil, err + } + + info := &RequestInfo{ + Method: req.Method, + HasParams: len(req.Params) > 0, + } + + // Methods that commonly use block tags + methodsWithBlockTags := map[string]int{ + "eth_getBalance": -1, // last param + "eth_getCode": -1, // last param + "eth_getTransactionCount": -1, // last param + "eth_getStorageAt": -1, // last param + "eth_call": -1, // last param + "eth_estimateGas": -1, // last param + "eth_getProof": -1, // last param + "eth_getBlockByNumber": 0, // first param + "eth_getBlockTransactionCountByNumber": 0, // first param + "eth_getTransactionByBlockNumberAndIndex": 0, // first param + "eth_getUncleByBlockNumberAndIndex": 0, // first param + "eth_getUncleCountByBlockNumber": 0, // first param + // Note: eth_getLogs uses a filter object with fromBlock/toBlock fields, + // which would need special handling and is not included here + } + + paramPos, hasBlockTag := methodsWithBlockTags[req.Method] + if !hasBlockTag || !info.HasParams { + return info, nil + } + + // Parse params as array + var params []json.RawMessage + if err := json.Unmarshal(req.Params, ¶ms); err != nil { + // Not an array, might be object params + return info, nil + } + + if len(params) == 0 { + return info, nil + } + + // Determine which parameter to check + var blockTagParam json.RawMessage + if paramPos == -1 { + // Last parameter + blockTagParam = params[len(params)-1] + } else if paramPos < len(params) { + // Specific position + blockTagParam = params[paramPos] + } else { + return info, nil + } + + // Try to parse as string (block tag) + var blockTag string + if err := json.Unmarshal(blockTagParam, &blockTag); err == nil { + info.BlockTag = blockTag + } + + return info, nil +} + +// parseBlockTagsFromBatch extracts block tags from all requests in a batch +func parseBlockTagsFromBatch(body []byte) ([]string, error) { + var batchReqs []JSONRPCFullRequest + if err := json.Unmarshal(body, &batchReqs); err != nil { + return nil, err + } + + blockTags := make([]string, 0) + for _, req := range batchReqs { + reqBytes, err := json.Marshal(req) + if err != nil { + continue + } + + info, err := parseRequestInfo(reqBytes) + if err != nil { + continue + } + + if info.BlockTag != "" { + blockTags = append(blockTags, info.BlockTag) + } + } + + return blockTags, nil +} + +// requiresPrimaryBackend checks if a request must be routed to primary based on block tag +func requiresPrimaryBackend(blockTag string) bool { + // These block tags must always go to primary + primaryOnlyTags := map[string]bool{ + "finalized": true, + "pending": true, + "safe": true, + } + + return primaryOnlyTags[blockTag] +} + +// canUseSecondaryForLatest checks if secondary backend can be used for "latest" block tag +func canUseSecondaryForLatest(blockTag string, backendName string, chainHeadMonitor *ChainHeadMonitor) bool { + // Only check for "latest" tag + if blockTag != "latest" { + // For non-latest tags (like specific block numbers), follow existing rules + return true + } + + if chainHeadMonitor == nil { + // No monitor, can't verify - be conservative + return false + } + + // Get chain head status + chainStatus := chainHeadMonitor.GetStatus() + + primaryHead, primaryExists := chainStatus["primary"] + if !primaryExists || !primaryHead.IsHealthy { + // Primary not healthy, allow secondary + return true + } + + secondaryHead, secondaryExists := chainStatus[backendName] + if !secondaryExists || !secondaryHead.IsHealthy { + // Secondary not healthy + return false + } + + // For "latest", secondary must be at EXACTLY the same block height + return secondaryHead.BlockNumber == primaryHead.BlockNumber +} + func NewStatsCollector(summaryInterval time.Duration, hasSecondaryBackends bool) *StatsCollector { now := time.Now() sc := &StatsCollector{ @@ -1673,6 +1851,62 @@ func isStatefulMethod(method string) bool { return statefulMethods[method] } +// requiresPrimaryOnlyMethod returns true if the method should always go to primary +func requiresPrimaryOnlyMethod(method string) bool { + primaryOnlyMethods := map[string]bool{ + // Transaction sending methods - must go to primary + "eth_sendRawTransaction": true, + "eth_sendTransaction": true, + + // Mempool/txpool methods - these show pending transactions + "txpool_content": true, + "txpool_inspect": true, + "txpool_status": true, + "txpool_contentFrom": true, + + // Mining related methods + "eth_mining": true, + "eth_hashrate": true, + "eth_getWork": true, + "eth_submitWork": true, + "eth_submitHashrate": true, + } + + return primaryOnlyMethods[method] +} + +// methodMightReturnNull returns true if the method might legitimately return null +// and we should wait for primary's response instead of returning secondary's null +func methodMightReturnNull(method string) bool { + nullableMethods := map[string]bool{ + "eth_getTransactionReceipt": true, + "eth_getTransactionByHash": true, + "eth_getTransactionByBlockHashAndIndex": true, + "eth_getTransactionByBlockNumberAndIndex": true, + "eth_getBlockByHash": true, + "eth_getBlockByNumber": true, + "eth_getUncleByBlockHashAndIndex": true, + "eth_getUncleByBlockNumberAndIndex": true, + } + + return nullableMethods[method] +} + +// isNullResponse checks if a JSON-RPC response has a null result +func isNullResponse(respBody []byte) bool { + // Simple structure to check the result field + var response struct { + Result json.RawMessage `json:"result"` + } + + if err := json.Unmarshal(respBody, &response); err != nil { + return false + } + + // Check if result is null + return string(response.Result) == "null" +} + // flushingResponseWriter wraps http.ResponseWriter to flush after every write type flushingResponseWriter struct { http.ResponseWriter @@ -1857,16 +2091,19 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c // For logging and stats, use the first method or "batch" for batch requests var displayMethod string var isStateful bool + var requiresPrimaryDueToBlockTag bool if batchInfo.IsBatch { displayMethod = fmt.Sprintf("batch[%d]", batchInfo.RequestCount) isStateful = batchInfo.HasStateful + requiresPrimaryDueToBlockTag = batchInfo.RequiresPrimary } else { displayMethod = batchInfo.Methods[0] if displayMethod == "" { displayMethod = "unknown" } isStateful = batchInfo.HasStateful + requiresPrimaryDueToBlockTag = batchInfo.RequiresPrimary } // Get delay threshold for secondary backends @@ -1886,12 +2123,16 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c if enableDetailedLogs { if batchInfo.IsBatch { - log.Printf("Batch request: %d requests, methods: %s, max delay: %s (probe-based: %v)", + log.Printf("Batch request: %d requests, methods: %s, max delay: %s (probe-based: %v), block tags: %v", batchInfo.RequestCount, formatMethodList(batchInfo.Methods), - secondaryDelay, secondaryProbe != nil) + secondaryDelay, secondaryProbe != nil, batchInfo.BlockTags) } else { - log.Printf("Method: %s, Secondary delay: %s (probe-based: %v)", - displayMethod, secondaryDelay, secondaryProbe != nil) + var blockTagInfo string + if len(batchInfo.BlockTags) > 0 { + blockTagInfo = fmt.Sprintf(", block tag: %s", batchInfo.BlockTags[0]) + } + log.Printf("Method: %s, Secondary delay: %s (probe-based: %v)%s", + displayMethod, secondaryDelay, secondaryProbe != nil, blockTagInfo) } } @@ -1914,9 +2155,39 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c for _, backend := range backends { // Skip secondary backends for stateful methods if isStateful && backend.Role != "primary" { + if enableDetailedLogs { + log.Printf("Skipping secondary backend %s for stateful method %s", backend.Name, displayMethod) + } continue } + // Skip secondary backends if request requires primary due to block tag + if requiresPrimaryDueToBlockTag && backend.Role != "primary" { + if enableDetailedLogs { + log.Printf("Skipping secondary backend %s due to block tag requiring primary", backend.Name) + } + continue + } + + // Check if secondary backend can handle "latest" block tag requests + if backend.Role == "secondary" && len(batchInfo.BlockTags) > 0 { + // Check all block tags in the request + canUseSecondary := true + for _, blockTag := range batchInfo.BlockTags { + if !canUseSecondaryForLatest(blockTag, backend.Name, chainHeadMonitor) { + canUseSecondary = false + if enableDetailedLogs { + log.Printf("Skipping secondary backend %s for block tag '%s' - not at same height as primary", + backend.Name, blockTag) + } + break + } + } + if !canUseSecondary { + continue + } + } + // Skip unhealthy secondary backends if backend.Role == "secondary" && chainHeadMonitor != nil && !chainHeadMonitor.IsBackendHealthy(backend.Name) { if enableDetailedLogs { @@ -2057,6 +2328,27 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c return } + // CRITICAL FIX 2: Check for null responses from secondary backends for certain methods + if b.Role == "secondary" && resp.StatusCode == 200 && methodMightReturnNull(displayMethod) { + // Need to read the body to check if it's null + bodyBytes, err := io.ReadAll(resp.Body) + resp.Body.Close() // Close the original body + + if err == nil && isNullResponse(bodyBytes) { + // Secondary returned null - don't let it win the race + if enableDetailedLogs { + log.Printf("Secondary backend %s returned null for %s - waiting for primary", + b.Name, displayMethod) + } + return + } + + // Not null or couldn't read - recreate the body for potential use + if err == nil { + resp.Body = io.NopCloser(bytes.NewReader(bodyBytes)) + } + } + // Try to be the first to respond if responseHandled.CompareAndSwap(false, true) { responseChan <- struct {