whitelist methods

This commit is contained in:
Para Dox
2025-06-01 15:42:45 +07:00
parent 6858e94734
commit 6eb5518a2d

View File

@@ -248,6 +248,12 @@ type ChainHead struct {
Error string // Last error if any
}
// MethodRouting contains configuration for method routing decisions
type MethodRouting struct {
SecondaryWhitelist map[string]bool // Methods allowed on secondary backends
PreferSecondary map[string]bool // Methods that should prefer secondary backends
}
// RequestInfo contains parsed information about a JSON-RPC request
type RequestInfo struct {
Method string
@@ -2196,6 +2202,21 @@ func isNullResponse(respBody []byte) bool {
return string(response.Result) == "null"
}
// methodShouldWaitOnSecondaryError returns true if we should wait for primary
// when secondary returns an error response
func methodShouldWaitOnSecondaryError(method string) bool {
// Methods where secondary errors might be transient and primary could succeed
waitOnErrorMethods := map[string]bool{
"eth_call": true, // State execution - secondary might be behind or have issues
"eth_estimateGas": true, // Similar to eth_call
"trace_call": true, // Tracing calls
"debug_traceCall": true, // Debug tracing
"eth_createAccessList": true, // Access list creation
}
return waitOnErrorMethods[method]
}
// flushingResponseWriter wraps http.ResponseWriter to flush after every write
type flushingResponseWriter struct {
http.ResponseWriter
@@ -2210,6 +2231,89 @@ func (f *flushingResponseWriter) Write(p []byte) (n int, err error) {
return
}
// peekingReader wraps a reader to peek at the first N bytes and detect null or error responses
type peekingReader struct {
r io.Reader
buf []byte
bufPos int
isNull bool
hasError bool
checkDone bool
peekSize int
}
func newPeekingReader(r io.Reader, peekSize int) *peekingReader {
return &peekingReader{
r: r,
buf: make([]byte, 0, peekSize),
peekSize: peekSize,
}
}
func (pr *peekingReader) Read(p []byte) (n int, err error) {
// If we haven't finished checking yet
if !pr.checkDone && len(pr.buf) < pr.peekSize {
// Read more data into our buffer
tempBuf := make([]byte, pr.peekSize-len(pr.buf))
readN, readErr := pr.r.Read(tempBuf)
if readN > 0 {
pr.buf = append(pr.buf, tempBuf[:readN]...)
}
// Check if we have enough data or hit EOF
if len(pr.buf) >= pr.peekSize || readErr == io.EOF {
pr.checkDone = true
// Check for null response pattern
pr.detectPatterns()
}
if readErr != nil && readErr != io.EOF {
return 0, readErr
}
}
// Serve data from buffer first
if pr.bufPos < len(pr.buf) {
n = copy(p, pr.buf[pr.bufPos:])
pr.bufPos += n
return n, nil
}
// Then serve from underlying reader
return pr.r.Read(p)
}
func (pr *peekingReader) detectPatterns() {
// Look for patterns indicating null result or errors in JSON-RPC response
bufStr := string(pr.buf)
// Remove whitespace for easier pattern matching
compactStr := strings.ReplaceAll(bufStr, " ", "")
compactStr = strings.ReplaceAll(compactStr, "\n", "")
compactStr = strings.ReplaceAll(compactStr, "\r", "")
compactStr = strings.ReplaceAll(compactStr, "\t", "")
// Check for null result
pr.isNull = strings.Contains(compactStr, `"result":null`)
// Check for JSON-RPC errors
pr.hasError = strings.Contains(compactStr, `"error":`) && !strings.Contains(compactStr, `"error":null`)
// Log what we found for debugging
if pr.isNull || pr.hasError {
log.Printf("Detected short response pattern - null: %v, error: %v, buffer preview: %s",
pr.isNull, pr.hasError, strings.ReplaceAll(bufStr[:minInt(len(bufStr), 80)], "\n", " "))
}
}
func (pr *peekingReader) IsNull() bool {
return pr.isNull
}
func (pr *peekingReader) HasError() bool {
return pr.hasError
}
func main() {
// Get configuration from environment variables
listenAddr := getEnv("LISTEN_ADDR", ":8080")
@@ -2224,6 +2328,10 @@ func main() {
minDelayBufferStr := getEnv("MIN_DELAY_BUFFER", "2") // Default 2ms buffer
probeMethodsStr := getEnv("PROBE_METHODS", "eth_blockNumber,net_version,eth_chainId")
// Method routing configuration
secondaryWhitelistStr := getEnv("SECONDARY_WHITELIST", "") // Methods allowed on secondary
preferSecondaryStr := getEnv("PREFER_SECONDARY", "") // Methods that should prefer secondary
summaryInterval, err := strconv.Atoi(summaryIntervalStr)
if err != nil {
log.Printf("Invalid SUMMARY_INTERVAL, using default of 60 seconds")
@@ -2242,6 +2350,32 @@ func main() {
minDelayBuffer = 2
}
// Parse method routing configuration
methodRouting := &MethodRouting{
SecondaryWhitelist: make(map[string]bool),
PreferSecondary: make(map[string]bool),
}
// Parse whitelist
if secondaryWhitelistStr != "" {
whitelist := strings.Split(secondaryWhitelistStr, ",")
for _, method := range whitelist {
methodRouting.SecondaryWhitelist[strings.TrimSpace(method)] = true
}
log.Printf("Secondary whitelist: %v", whitelist)
}
// Parse prefer secondary list
if preferSecondaryStr != "" {
preferList := strings.Split(preferSecondaryStr, ",")
for _, method := range preferList {
methodRouting.PreferSecondary[strings.TrimSpace(method)] = true
// Also add to whitelist automatically
methodRouting.SecondaryWhitelist[strings.TrimSpace(method)] = true
}
log.Printf("Prefer secondary for methods: %v", preferList)
}
// Create stats collector for periodic summaries
statsCollector := NewStatsCollector(time.Duration(summaryInterval)*time.Second, secondaryBackendsStr != "")
@@ -2339,14 +2473,14 @@ func main() {
handleWebSocketRequest(w, r, backends, client, &upgrader, statsCollector)
} else {
// Handle regular HTTP request
handleRequest(w, r, backends, client, enableDetailedLogs == "true", statsCollector, secondaryProbe, chainHeadMonitor)
handleRequest(w, r, backends, client, enableDetailedLogs == "true", statsCollector, secondaryProbe, chainHeadMonitor, methodRouting)
}
})
log.Fatal(http.ListenAndServe(listenAddr, nil))
}
func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client, enableDetailedLogs bool, statsCollector *StatsCollector, secondaryProbe *SecondaryProbe, chainHeadMonitor *ChainHeadMonitor) {
func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client, enableDetailedLogs bool, statsCollector *StatsCollector, secondaryProbe *SecondaryProbe, chainHeadMonitor *ChainHeadMonitor, methodRouting *MethodRouting) {
startTime := time.Now()
// Create a context that will cancel after 35 seconds (5s buffer over backend timeout)
@@ -2407,6 +2541,29 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
log.Printf("Method: %s%s", displayMethod, blockTagInfo)
}
}
// Check method routing configuration
if !batchInfo.IsBatch && methodRouting != nil {
// For single methods, check routing rules
method := displayMethod
// Check if this method prefers secondary backends
if methodRouting.PreferSecondary[method] {
if enableDetailedLogs {
log.Printf("Method %s configured to prefer secondary backends", method)
}
}
// Check if whitelist is configured and method is not in it
if len(methodRouting.SecondaryWhitelist) > 0 && !methodRouting.SecondaryWhitelist[method] {
// Method not in whitelist - force primary only
if enableDetailedLogs {
log.Printf("Method %s not in secondary whitelist - using primary only", method)
}
// This will be enforced in the backend loop
}
}
// Process backends with adaptive delay strategy
var wg sync.WaitGroup
var primaryWg sync.WaitGroup // Separate wait group for primary backend
@@ -2424,6 +2581,30 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
primaryFailedFast := make(chan struct{}, 1) // Signal when primary fails immediately
for _, backend := range backends {
// Method routing checks for secondary backends
if backend.Role == "secondary" && methodRouting != nil && !batchInfo.IsBatch {
method := displayMethod
// Check if whitelist is configured and method is not in it
if len(methodRouting.SecondaryWhitelist) > 0 && !methodRouting.SecondaryWhitelist[method] {
if enableDetailedLogs {
log.Printf("Skipping secondary backend %s for method %s (not in whitelist)", backend.Name, method)
}
continue
}
}
// Skip primary backend if method prefers secondary (and we have secondary backends available)
if backend.Role == "primary" && methodRouting != nil && !batchInfo.IsBatch {
method := displayMethod
if methodRouting.PreferSecondary[method] && len(backends) > 1 {
if enableDetailedLogs {
log.Printf("Skipping primary backend for method %s (configured to prefer secondary)", method)
}
continue
}
}
// Skip secondary backends for stateful methods
if isStateful && backend.Role != "primary" {
if enableDetailedLogs {
@@ -2489,43 +2670,56 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
// If this is a secondary backend, wait for p75 delay
if b.Role != "primary" {
// Get backend-specific delay
var backendSpecificDelay time.Duration
if batchInfo.IsBatch {
// For batch requests, use the maximum delay of all methods for this backend
backendSpecificDelay = calculateBatchDelay(batchInfo.Methods, b.Name, secondaryProbe, statsCollector)
} else if secondaryProbe != nil {
backendSpecificDelay = secondaryProbe.getDelayForBackendAndMethod(b.Name, displayMethod)
} else {
// Fallback to method-based delay if no probe
backendSpecificDelay = statsCollector.GetPrimaryP75ForMethod(displayMethod)
}
if enableDetailedLogs {
log.Printf("Secondary backend %s waiting %s for method %s", b.Name, backendSpecificDelay, displayMethod)
}
delayTimer := time.NewTimer(backendSpecificDelay)
select {
case <-delayTimer.C:
// Timer expired, primary is slow, proceed with secondary request
case <-primaryResponseChan:
// Primary already got a response, skip secondary
delayTimer.Stop()
// Still record that we skipped this backend
statsChan <- ResponseStats{
Backend: b.Name,
Error: fmt.Errorf("skipped - primary responded quickly"),
Method: displayMethod,
Duration: time.Since(goroutineStartTime),
// Skip delay if this method prefers secondary backends
skipDelay := false
if methodRouting != nil && !batchInfo.IsBatch {
if methodRouting.PreferSecondary[displayMethod] {
skipDelay = true
if enableDetailedLogs {
log.Printf("Secondary backend %s starting immediately for method %s (configured to prefer secondary)", b.Name, displayMethod)
}
}
return
case <-primaryFailedFast:
// Primary failed immediately, start secondary now
delayTimer.Stop()
}
if !skipDelay {
// Get backend-specific delay
var backendSpecificDelay time.Duration
if batchInfo.IsBatch {
// For batch requests, use the maximum delay of all methods for this backend
backendSpecificDelay = calculateBatchDelay(batchInfo.Methods, b.Name, secondaryProbe, statsCollector)
} else if secondaryProbe != nil {
backendSpecificDelay = secondaryProbe.getDelayForBackendAndMethod(b.Name, displayMethod)
} else {
// Fallback to method-based delay if no probe
backendSpecificDelay = statsCollector.GetPrimaryP75ForMethod(displayMethod)
}
if enableDetailedLogs {
log.Printf("Primary failed fast for %s, starting secondary immediately", displayMethod)
log.Printf("Secondary backend %s waiting %s for method %s", b.Name, backendSpecificDelay, displayMethod)
}
delayTimer := time.NewTimer(backendSpecificDelay)
select {
case <-delayTimer.C:
// Timer expired, primary is slow, proceed with secondary request
case <-primaryResponseChan:
// Primary already got a response, skip secondary
delayTimer.Stop()
// Still record that we skipped this backend
statsChan <- ResponseStats{
Backend: b.Name,
Error: fmt.Errorf("skipped - primary responded quickly"),
Method: displayMethod,
Duration: time.Since(goroutineStartTime),
}
return
case <-primaryFailedFast:
// Primary failed immediately, start secondary now
delayTimer.Stop()
if enableDetailedLogs {
log.Printf("Primary failed fast for %s, starting secondary immediately", displayMethod)
}
}
}
}
@@ -2617,22 +2811,101 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
// CRITICAL FIX 2: Check for null responses from secondary backends for certain methods
if b.Role == "secondary" && resp.StatusCode == 200 && methodMightReturnNull(displayMethod) {
// Need to read the body to check if it's null
bodyBytes, err := io.ReadAll(resp.Body)
resp.Body.Close() // Close the original body
// Check Content-Length first - null responses are typically very short
contentLength := resp.Header.Get("Content-Length")
if contentLength != "" {
length, err := strconv.Atoi(contentLength)
if err == nil && length < 100 { // Null responses are typically < 100 bytes
// This is suspiciously short, likely a null response
// Create a peeking reader to confirm
peeker := newPeekingReader(resp.Body, 100)
resp.Body = io.NopCloser(peeker)
if err == nil && isNullResponse(bodyBytes) {
// Secondary returned null - don't let it win the race
if enableDetailedLogs {
log.Printf("Secondary backend %s returned null for %s - waiting for primary",
b.Name, displayMethod)
// Read a bit to trigger null detection
smallBuf := make([]byte, 1)
peeker.Read(smallBuf)
if peeker.IsNull() {
// Confirmed null response - don't let secondary win
if enableDetailedLogs {
log.Printf("Secondary backend %s returned null for %s (Content-Length: %s) - waiting for primary",
b.Name, displayMethod, contentLength)
}
// Close the response body
resp.Body.Close()
return
}
// Not null, continue with normal flow
}
return
}
} else {
// No Content-Length header, use peeking reader anyway for safety
peeker := newPeekingReader(resp.Body, 200) // Peek at first 200 bytes
resp.Body = io.NopCloser(peeker)
// Not null or couldn't read - recreate the body for potential use
if err == nil {
resp.Body = io.NopCloser(bytes.NewReader(bodyBytes))
// Read a bit to trigger null detection
smallBuf := make([]byte, 1)
peeker.Read(smallBuf)
if peeker.IsNull() {
// Confirmed null response - don't let secondary win
if enableDetailedLogs {
log.Printf("Secondary backend %s returned null for %s - waiting for primary",
b.Name, displayMethod)
}
// Close the response body
resp.Body.Close()
return
}
}
}
// CRITICAL FIX 3: Check for error responses from secondary backends for certain methods
if b.Role == "secondary" && resp.StatusCode == 200 && methodShouldWaitOnSecondaryError(displayMethod) {
// Check Content-Length first - error responses are typically short
contentLength := resp.Header.Get("Content-Length")
if contentLength != "" {
length, err := strconv.Atoi(contentLength)
if err == nil && length < 500 { // Error responses are typically < 500 bytes
// This might be an error response
// Create a peeking reader to confirm
peeker := newPeekingReader(resp.Body, 500)
resp.Body = io.NopCloser(peeker)
// Read a bit to trigger error detection
smallBuf := make([]byte, 1)
peeker.Read(smallBuf)
if peeker.HasError() {
// Confirmed error response - don't let secondary win
if enableDetailedLogs {
log.Printf("Secondary backend %s returned JSON-RPC error for %s (Content-Length: %s) - waiting for primary",
b.Name, displayMethod, contentLength)
}
// Close the response body
resp.Body.Close()
return
}
// Not an error, continue with normal flow
}
} else {
// No Content-Length header, use peeking reader anyway for safety
peeker := newPeekingReader(resp.Body, 500) // Peek at first 500 bytes
resp.Body = io.NopCloser(peeker)
// Read a bit to trigger error detection
smallBuf := make([]byte, 1)
peeker.Read(smallBuf)
if peeker.HasError() {
// Confirmed error response - don't let secondary win
if enableDetailedLogs {
log.Printf("Secondary backend %s returned JSON-RPC error for %s - waiting for primary",
b.Name, displayMethod)
}
// Close the response body
resp.Body.Close()
return
}
}
}