This ensures that users always get responses from backends that are synchronized with the latest blockchain state, preventing issues with outdated or inconsistent data from lagging secondary backends.

This commit is contained in:
Para Dox
2025-05-29 19:04:46 +07:00
parent e4f6ca8ee7
commit ef273ee331

View File

@@ -171,13 +171,14 @@ type StatsCollector struct {
appStartTime time.Time // Application start time (never reset) appStartTime time.Time // Application start time (never reset)
intervalStartTime time.Time // Current interval start time (reset each interval) intervalStartTime time.Time // Current interval start time (reset each interval)
summaryInterval time.Duration summaryInterval time.Duration
methodCUPrices map[string]int // Map of method names to CU prices methodCUPrices map[string]int // Map of method names to CU prices
totalCU int // Total CU earned totalCU int // Total CU earned
methodCU map[string]int // Track CU earned per method methodCU map[string]int // Track CU earned per method
historicalCU []CUDataPoint // Historical CU data for different time windows historicalCU []CUDataPoint // Historical CU data for different time windows
hasSecondaryBackends bool // Track if secondary backends are configured hasSecondaryBackends bool // Track if secondary backends are configured
skippedSecondaryRequests int // Track how many secondary requests were skipped skippedSecondaryRequests int // Track how many secondary requests were skipped
secondaryProbe *SecondaryProbe // Reference to secondary probe secondaryProbe *SecondaryProbe // Reference to secondary probe
blockHeightTracker *BlockHeightTracker // Reference to block height tracker
} }
// SecondaryProbe maintains latency information for secondary backends through active probing // SecondaryProbe maintains latency information for secondary backends through active probing
@@ -197,6 +198,194 @@ type SecondaryProbe struct {
lastSuccessTime time.Time // Last time probes succeeded lastSuccessTime time.Time // Last time probes succeeded
} }
// BlockHeightTracker monitors block heights from different backends
type BlockHeightTracker struct {
mu sync.RWMutex
backends []Backend
client *http.Client
blockHeights map[string]uint64 // Backend name -> latest block number
lastUpdateTime map[string]time.Time // Backend name -> last successful update
checkInterval time.Duration
enableDetailedLogs bool
maxBlockBehind uint64 // Maximum blocks a secondary can be behind primary
}
// NewBlockHeightTracker creates a new block height tracker
func NewBlockHeightTracker(backends []Backend, client *http.Client, checkInterval time.Duration,
maxBlockBehind uint64, enableDetailedLogs bool) *BlockHeightTracker {
bht := &BlockHeightTracker{
backends: backends,
client: client,
blockHeights: make(map[string]uint64),
lastUpdateTime: make(map[string]time.Time),
checkInterval: checkInterval,
enableDetailedLogs: enableDetailedLogs,
maxBlockBehind: maxBlockBehind,
}
// Start periodic block height checking
go bht.startPeriodicCheck()
// Run initial check
go bht.checkBlockHeights()
return bht
}
// checkBlockHeights queries all backends for their current block number
func (bht *BlockHeightTracker) checkBlockHeights() {
for _, backend := range bht.backends {
go func(b Backend) {
reqBody := []byte(`{"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":"blockcheck"}`)
req, err := http.NewRequest("POST", b.URL, bytes.NewReader(reqBody))
if err != nil {
if bht.enableDetailedLogs {
log.Printf("Block height check: failed to create request for %s: %v", b.Name, err)
}
return
}
req.Header.Set("Content-Type", "application/json")
start := time.Now()
resp, err := bht.client.Do(req)
if err != nil {
if bht.enableDetailedLogs {
log.Printf("Block height check: request failed for %s: %v", b.Name, err)
}
return
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
if bht.enableDetailedLogs {
log.Printf("Block height check: HTTP error %d for %s", resp.StatusCode, b.Name)
}
return
}
body, err := io.ReadAll(resp.Body)
if err != nil {
if bht.enableDetailedLogs {
log.Printf("Block height check: failed to read response for %s: %v", b.Name, err)
}
return
}
// Parse JSON-RPC response
var jsonResp struct {
Result string `json:"result"`
Error *struct {
Code int `json:"code"`
Message string `json:"message"`
} `json:"error"`
}
if err := json.Unmarshal(body, &jsonResp); err != nil {
if bht.enableDetailedLogs {
log.Printf("Block height check: failed to parse JSON for %s: %v", b.Name, err)
}
return
}
if jsonResp.Error != nil {
if bht.enableDetailedLogs {
log.Printf("Block height check: JSON-RPC error for %s: %s", b.Name, jsonResp.Error.Message)
}
return
}
// Parse hex block number
blockHex := strings.TrimPrefix(jsonResp.Result, "0x")
blockNum, err := strconv.ParseUint(blockHex, 16, 64)
if err != nil {
if bht.enableDetailedLogs {
log.Printf("Block height check: failed to parse block number for %s: %s", b.Name, jsonResp.Result)
}
return
}
// Update the block height
bht.mu.Lock()
oldHeight := bht.blockHeights[b.Name]
bht.blockHeights[b.Name] = blockNum
bht.lastUpdateTime[b.Name] = time.Now()
bht.mu.Unlock()
if bht.enableDetailedLogs {
duration := time.Since(start)
if oldHeight != blockNum {
log.Printf("Block height check: %s updated from %d to %d (took %s)",
b.Name, oldHeight, blockNum, duration)
}
}
}(backend)
}
}
// startPeriodicCheck runs block height checks at regular intervals
func (bht *BlockHeightTracker) startPeriodicCheck() {
ticker := time.NewTicker(bht.checkInterval)
defer ticker.Stop()
for range ticker.C {
bht.checkBlockHeights()
}
}
// isSecondaryBehind checks if a secondary backend is behind the primary by more than maxBlockBehind
func (bht *BlockHeightTracker) isSecondaryBehind(secondaryName string) bool {
bht.mu.RLock()
defer bht.mu.RUnlock()
// Find primary backend block height
var primaryHeight uint64
var primaryFound bool
for name, height := range bht.blockHeights {
// Check if this is the primary backend (assuming it's named "primary")
if name == "primary" {
primaryHeight = height
primaryFound = true
break
}
}
if !primaryFound {
// If we can't find primary height, be conservative and allow secondary
return false
}
// Get secondary height
secondaryHeight, exists := bht.blockHeights[secondaryName]
if !exists {
// If we don't have secondary height data, be conservative and block it
return true
}
// Check if secondary is behind by more than the threshold
if primaryHeight > secondaryHeight {
blocksBehind := primaryHeight - secondaryHeight
return blocksBehind > bht.maxBlockBehind
}
// Secondary is not behind
return false
}
// getBlockHeightStatus returns a summary of current block heights for logging
func (bht *BlockHeightTracker) getBlockHeightStatus() map[string]uint64 {
bht.mu.RLock()
defer bht.mu.RUnlock()
result := make(map[string]uint64)
for name, height := range bht.blockHeights {
result[name] = height
}
return result
}
func NewStatsCollector(summaryInterval time.Duration, hasSecondaryBackends bool) *StatsCollector { func NewStatsCollector(summaryInterval time.Duration, hasSecondaryBackends bool) *StatsCollector {
now := time.Now() now := time.Now()
sc := &StatsCollector{ sc := &StatsCollector{
@@ -231,6 +420,13 @@ func (sc *StatsCollector) SetSecondaryProbe(probe *SecondaryProbe) {
sc.secondaryProbe = probe sc.secondaryProbe = probe
} }
// SetBlockHeightTracker sets the block height tracker reference after stats collector is created
func (sc *StatsCollector) SetBlockHeightTracker(tracker *BlockHeightTracker) {
sc.mu.Lock()
defer sc.mu.Unlock()
sc.blockHeightTracker = tracker
}
// NewSecondaryProbe creates a new secondary probe instance // NewSecondaryProbe creates a new secondary probe instance
func NewSecondaryProbe(backends []Backend, client *http.Client, probeInterval time.Duration, func NewSecondaryProbe(backends []Backend, client *http.Client, probeInterval time.Duration,
minDelayBuffer time.Duration, probeMethods []string, enableDetailedLogs bool) *SecondaryProbe { minDelayBuffer time.Duration, probeMethods []string, enableDetailedLogs bool) *SecondaryProbe {
@@ -597,7 +793,8 @@ func (sc *StatsCollector) AddStats(stats []ResponseStats, totalDuration time.Dur
sc.requestStats = append(sc.requestStats, stat) sc.requestStats = append(sc.requestStats, stat)
if stat.Error != nil { if stat.Error != nil {
// Don't count skipped secondary backends as errors // Don't count skipped secondary backends as errors
if !strings.Contains(stat.Error.Error(), "skipped - primary responded") { if !strings.Contains(stat.Error.Error(), "skipped - primary responded") &&
!strings.Contains(stat.Error.Error(), "skipped - behind in block height") {
sc.errorCount++ sc.errorCount++
} else { } else {
// Track that we skipped a secondary request // Track that we skipped a secondary request
@@ -785,6 +982,47 @@ func (sc *StatsCollector) printSummary() {
sc.secondaryProbe.mu.RUnlock() sc.secondaryProbe.mu.RUnlock()
} }
// Display block height information if available
if sc.blockHeightTracker != nil {
blockStatus := sc.blockHeightTracker.getBlockHeightStatus()
if len(blockStatus) > 0 {
fmt.Printf("\n--- Block Height Status ---\n")
// Find primary height for comparison
var primaryHeight uint64
var primaryFound bool
if height, exists := blockStatus["primary"]; exists {
primaryHeight = height
primaryFound = true
}
// Sort backend names for consistent output
var backendNames []string
for name := range blockStatus {
backendNames = append(backendNames, name)
}
sort.Strings(backendNames)
for _, name := range backendNames {
height := blockStatus[name]
if name == "primary" {
fmt.Printf(" %s: %d (primary)\n", name, height)
} else if primaryFound {
behind := int64(primaryHeight) - int64(height)
if behind > 0 {
fmt.Printf(" %s: %d (%d blocks behind)\n", name, height, behind)
} else if behind == 0 {
fmt.Printf(" %s: %d (synchronized)\n", name, height)
} else {
fmt.Printf(" %s: %d (%d blocks ahead)\n", name, height, -behind)
}
} else {
fmt.Printf(" %s: %d\n", name, height)
}
}
}
}
if sc.hasSecondaryBackends && sc.skippedSecondaryRequests > 0 { if sc.hasSecondaryBackends && sc.skippedSecondaryRequests > 0 {
fmt.Printf("Skipped Secondary Requests: %d (%.1f%% of requests)\n", fmt.Printf("Skipped Secondary Requests: %d (%.1f%% of requests)\n",
sc.skippedSecondaryRequests, sc.skippedSecondaryRequests,
@@ -1635,6 +1873,11 @@ func main() {
minDelayBufferStr := getEnv("MIN_DELAY_BUFFER", "2") // Default 2ms buffer minDelayBufferStr := getEnv("MIN_DELAY_BUFFER", "2") // Default 2ms buffer
probeMethodsStr := getEnv("PROBE_METHODS", "eth_blockNumber,net_version,eth_chainId") probeMethodsStr := getEnv("PROBE_METHODS", "eth_blockNumber,net_version,eth_chainId")
// Block height tracking configuration
enableBlockHeightTracking := getEnv("ENABLE_BLOCK_HEIGHT_TRACKING", "true") == "true"
blockHeightCheckIntervalStr := getEnv("BLOCK_HEIGHT_CHECK_INTERVAL", "5") // Default 5 seconds
maxBlockBehindStr := getEnv("MAX_BLOCKS_BEHIND", "1") // Default 1 block behind
summaryInterval, err := strconv.Atoi(summaryIntervalStr) summaryInterval, err := strconv.Atoi(summaryIntervalStr)
if err != nil { if err != nil {
log.Printf("Invalid SUMMARY_INTERVAL, using default of 60 seconds") log.Printf("Invalid SUMMARY_INTERVAL, using default of 60 seconds")
@@ -1653,6 +1896,18 @@ func main() {
minDelayBuffer = 2 minDelayBuffer = 2
} }
blockHeightCheckInterval, err := strconv.Atoi(blockHeightCheckIntervalStr)
if err != nil {
log.Printf("Invalid BLOCK_HEIGHT_CHECK_INTERVAL, using default of 5 seconds")
blockHeightCheckInterval = 5
}
maxBlocksBehind, err := strconv.ParseUint(maxBlockBehindStr, 10, 64)
if err != nil {
log.Printf("Invalid MAX_BLOCKS_BEHIND, using default of 1 block")
maxBlocksBehind = 1
}
// Create stats collector for periodic summaries // Create stats collector for periodic summaries
statsCollector := NewStatsCollector(time.Duration(summaryInterval)*time.Second, secondaryBackendsStr != "") statsCollector := NewStatsCollector(time.Duration(summaryInterval)*time.Second, secondaryBackendsStr != "")
@@ -1720,6 +1975,27 @@ func main() {
} }
} }
// Initialize block height tracker if enabled and we have secondary backends
var blockHeightTracker *BlockHeightTracker
if enableBlockHeightTracking && secondaryBackendsStr != "" {
blockHeightTracker = NewBlockHeightTracker(
backends,
client,
time.Duration(blockHeightCheckInterval)*time.Second,
maxBlocksBehind,
enableDetailedLogs == "true",
)
if blockHeightTracker == nil {
log.Printf("Block height tracker initialization failed")
} else {
log.Printf("Block height tracking: enabled (interval: %ds, max blocks behind: %d)",
blockHeightCheckInterval, maxBlocksBehind)
// Set the tracker in stats collector for display
statsCollector.SetBlockHeightTracker(blockHeightTracker)
}
}
// Configure websocket upgrader with larger buffer sizes // Configure websocket upgrader with larger buffer sizes
// 20MB frame size and 50MB message size // 20MB frame size and 50MB message size
const ( const (
@@ -1740,14 +2016,14 @@ func main() {
handleWebSocketRequest(w, r, backends, client, &upgrader, statsCollector) handleWebSocketRequest(w, r, backends, client, &upgrader, statsCollector)
} else { } else {
// Handle regular HTTP request // Handle regular HTTP request
handleRequest(w, r, backends, client, enableDetailedLogs == "true", statsCollector, secondaryProbe) handleRequest(w, r, backends, client, enableDetailedLogs == "true", statsCollector, secondaryProbe, blockHeightTracker)
} }
}) })
log.Fatal(http.ListenAndServe(listenAddr, nil)) log.Fatal(http.ListenAndServe(listenAddr, nil))
} }
func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client, enableDetailedLogs bool, statsCollector *StatsCollector, secondaryProbe *SecondaryProbe) { func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client, enableDetailedLogs bool, statsCollector *StatsCollector, secondaryProbe *SecondaryProbe, blockHeightTracker *BlockHeightTracker) {
startTime := time.Now() startTime := time.Now()
// Create a context that will cancel after 35 seconds (5s buffer over backend timeout) // Create a context that will cancel after 35 seconds (5s buffer over backend timeout)
@@ -1841,6 +2117,24 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
continue continue
} }
// Skip secondary backends if they're behind in block height
if backend.Role != "primary" && blockHeightTracker != nil {
if blockHeightTracker.isSecondaryBehind(backend.Name) {
if enableDetailedLogs {
blockStatus := blockHeightTracker.getBlockHeightStatus()
log.Printf("Skipping secondary backend %s - behind in block height: %v", backend.Name, blockStatus)
}
// Record that we skipped this secondary backend due to block height
statsChan <- ResponseStats{
Backend: backend.Name,
Error: fmt.Errorf("skipped - behind in block height"),
Method: displayMethod,
Duration: 0, // No actual request made
}
continue
}
}
wg.Add(1) wg.Add(1)
if backend.Role == "primary" { if backend.Role == "primary" {
primaryWg.Add(1) primaryWg.Add(1)