From 382db5bee11371028a0b27486a4033f34a8f8011 Mon Sep 17 00:00:00 2001 From: Para Dox Date: Thu, 29 May 2025 21:35:22 +0700 Subject: [PATCH] opus fixes this --- benchmark-proxy/main.go | 530 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 521 insertions(+), 9 deletions(-) diff --git a/benchmark-proxy/main.go b/benchmark-proxy/main.go index 146535ae..5a40cfe6 100644 --- a/benchmark-proxy/main.go +++ b/benchmark-proxy/main.go @@ -171,13 +171,14 @@ type StatsCollector struct { appStartTime time.Time // Application start time (never reset) intervalStartTime time.Time // Current interval start time (reset each interval) summaryInterval time.Duration - methodCUPrices map[string]int // Map of method names to CU prices - totalCU int // Total CU earned - methodCU map[string]int // Track CU earned per method - historicalCU []CUDataPoint // Historical CU data for different time windows - hasSecondaryBackends bool // Track if secondary backends are configured - skippedSecondaryRequests int // Track how many secondary requests were skipped - secondaryProbe *SecondaryProbe // Reference to secondary probe + methodCUPrices map[string]int // Map of method names to CU prices + totalCU int // Total CU earned + methodCU map[string]int // Track CU earned per method + historicalCU []CUDataPoint // Historical CU data for different time windows + hasSecondaryBackends bool // Track if secondary backends are configured + skippedSecondaryRequests int // Track how many secondary requests were skipped + secondaryProbe *SecondaryProbe // Reference to secondary probe + chainHeadMonitor *ChainHeadMonitor // Reference to chain head monitor } // SecondaryProbe maintains latency information for secondary backends through active probing @@ -197,6 +198,28 @@ type SecondaryProbe struct { lastSuccessTime time.Time // Last time probes succeeded } +// ChainHeadMonitor monitors chain heads of all backends via WebSocket subscriptions +type ChainHeadMonitor struct { + mu sync.RWMutex + backends []Backend + chainHeads map[string]*ChainHead // backend name -> chain head info + primaryChainID string // Chain ID of primary backend + enabledBackends map[string]bool // Track which backends are enabled + wsDialer *websocket.Dialer + stopChan chan struct{} + enableDetailedLogs bool +} + +// ChainHead tracks the current head of a backend +type ChainHead struct { + BlockNumber uint64 // Current block number + BlockHash string // Current block hash + ChainID string // Chain ID + LastUpdate time.Time // Last time we received an update + IsHealthy bool // Whether this backend is healthy + Error string // Last error if any +} + func NewStatsCollector(summaryInterval time.Duration, hasSecondaryBackends bool) *StatsCollector { now := time.Now() sc := &StatsCollector{ @@ -231,6 +254,13 @@ func (sc *StatsCollector) SetSecondaryProbe(probe *SecondaryProbe) { sc.secondaryProbe = probe } +// SetChainHeadMonitor sets the chain head monitor reference after stats collector is created +func (sc *StatsCollector) SetChainHeadMonitor(monitor *ChainHeadMonitor) { + sc.mu.Lock() + defer sc.mu.Unlock() + sc.chainHeadMonitor = monitor +} + // NewSecondaryProbe creates a new secondary probe instance func NewSecondaryProbe(backends []Backend, client *http.Client, probeInterval time.Duration, minDelayBuffer time.Duration, probeMethods []string, enableDetailedLogs bool) *SecondaryProbe { @@ -773,6 +803,54 @@ func (sc *StatsCollector) printSummary() { sc.secondaryProbe.mu.RUnlock() } + // Display chain head monitor information if available + if sc.chainHeadMonitor != nil { + fmt.Printf("\n--- Chain Head Monitor Status ---\n") + chainStatus := sc.chainHeadMonitor.GetStatus() + + // Get primary block height for comparison + var primaryBlockHeight uint64 + if primaryHead, exists := chainStatus["primary"]; exists && primaryHead.IsHealthy { + primaryBlockHeight = primaryHead.BlockNumber + } + + // Sort backend names for consistent output + var backendNames []string + for name := range chainStatus { + backendNames = append(backendNames, name) + } + sort.Strings(backendNames) + + for _, name := range backendNames { + head := chainStatus[name] + status := "healthy" + details := fmt.Sprintf("block %d, chain %s", head.BlockNumber, head.ChainID) + + // Add block difference info for secondary backends + if name != "primary" && primaryBlockHeight > 0 && head.IsHealthy { + diff := int64(head.BlockNumber) - int64(primaryBlockHeight) + if diff > 0 { + details += fmt.Sprintf(" (+%d ahead)", diff) + } else if diff < 0 { + details += fmt.Sprintf(" (%d behind)", diff) + } else { + details += " (in sync)" + } + } + + if !head.IsHealthy { + status = "unhealthy" + details = head.Error + } else if sc.chainHeadMonitor.IsBackendHealthy(name) { + status = "enabled" + } else { + status = "disabled" + } + + fmt.Printf(" %s: %s (%s)\n", name, status, details) + } + } + if sc.hasSecondaryBackends && sc.skippedSecondaryRequests > 0 { fmt.Printf("Skipped Secondary Requests: %d (%.1f%% of requests)\n", sc.skippedSecondaryRequests, @@ -1708,6 +1786,16 @@ func main() { } } + // Initialize chain head monitor + var chainHeadMonitor *ChainHeadMonitor + if len(backends) > 1 { // Only create if we have more than just primary + chainHeadMonitor = NewChainHeadMonitor(backends, enableDetailedLogs == "true") + log.Printf("Chain head monitoring: enabled") + + // Set the monitor in stats collector for display + statsCollector.SetChainHeadMonitor(chainHeadMonitor) + } + // Configure websocket upgrader with larger buffer sizes // 20MB frame size and 50MB message size const ( @@ -1728,14 +1816,14 @@ func main() { handleWebSocketRequest(w, r, backends, client, &upgrader, statsCollector) } else { // Handle regular HTTP request - handleRequest(w, r, backends, client, enableDetailedLogs == "true", statsCollector, secondaryProbe) + handleRequest(w, r, backends, client, enableDetailedLogs == "true", statsCollector, secondaryProbe, chainHeadMonitor) } }) log.Fatal(http.ListenAndServe(listenAddr, nil)) } -func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client, enableDetailedLogs bool, statsCollector *StatsCollector, secondaryProbe *SecondaryProbe) { +func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client, enableDetailedLogs bool, statsCollector *StatsCollector, secondaryProbe *SecondaryProbe, chainHeadMonitor *ChainHeadMonitor) { startTime := time.Now() // Create a context that will cancel after 35 seconds (5s buffer over backend timeout) @@ -1829,6 +1917,14 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c continue } + // Skip unhealthy secondary backends + if backend.Role == "secondary" && chainHeadMonitor != nil && !chainHeadMonitor.IsBackendHealthy(backend.Name) { + if enableDetailedLogs { + log.Printf("Skipping unhealthy secondary backend %s for %s", backend.Name, displayMethod) + } + continue + } + wg.Add(1) if backend.Role == "primary" { primaryWg.Add(1) @@ -1945,6 +2041,22 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c Method: displayMethod, } + // CRITICAL FIX: Only allow secondary backends to win if they have successful responses + if b.Role == "secondary" && resp.StatusCode >= 400 { + // Secondary returned an error - DO NOT let it win the race + if enableDetailedLogs { + log.Printf("Secondary backend %s returned error status %d for %s - ignoring", + b.Name, resp.StatusCode, displayMethod) + } + + // Still need to drain and close the body + go func() { + defer resp.Body.Close() + io.Copy(io.Discard, resp.Body) + }() + return + } + // Try to be the first to respond if responseHandled.CompareAndSwap(false, true) { responseChan <- struct { @@ -2367,3 +2479,403 @@ func getEnv(key, fallback string) string { } return fallback } + +// NewChainHeadMonitor creates a new chain head monitor +func NewChainHeadMonitor(backends []Backend, enableDetailedLogs bool) *ChainHeadMonitor { + monitor := &ChainHeadMonitor{ + backends: backends, + chainHeads: make(map[string]*ChainHead), + enabledBackends: make(map[string]bool), + wsDialer: &websocket.Dialer{ + ReadBufferSize: 1024 * 1024, // 1MB + WriteBufferSize: 1024 * 1024, // 1MB + }, + stopChan: make(chan struct{}), + enableDetailedLogs: enableDetailedLogs, + } + + // Start monitoring + go monitor.startMonitoring() + + return monitor +} + +// IsBackendHealthy checks if a backend is healthy and at the correct chain head +func (m *ChainHeadMonitor) IsBackendHealthy(backendName string) bool { + m.mu.RLock() + defer m.mu.RUnlock() + + // Primary is always considered healthy for request routing + if backendName == "primary" { + return true + } + + enabled, exists := m.enabledBackends[backendName] + if !exists { + return false // Unknown backend + } + + return enabled +} + +// startMonitoring starts WebSocket monitoring for all backends +func (m *ChainHeadMonitor) startMonitoring() { + // Initial delay to let backends start + time.Sleep(2 * time.Second) + + for _, backend := range m.backends { + go m.monitorBackend(backend) + } + + // Periodic health check + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + m.checkBackendHealth() + case <-m.stopChan: + return + } + } +} + +// monitorBackend monitors a single backend via WebSocket +func (m *ChainHeadMonitor) monitorBackend(backend Backend) { + backoffDelay := time.Second + + for { + select { + case <-m.stopChan: + return + default: + } + + // Create WebSocket URL + wsURL := strings.Replace(backend.URL, "http://", "ws://", 1) + wsURL = strings.Replace(wsURL, "https://", "wss://", 1) + + // Connect to WebSocket + conn, _, err := m.wsDialer.Dial(wsURL, nil) + if err != nil { + m.updateBackendStatus(backend.Name, nil, fmt.Sprintf("WebSocket connection failed: %v", err)) + time.Sleep(backoffDelay) + backoffDelay = min(backoffDelay*2, 30*time.Second) + continue + } + + if m.enableDetailedLogs { + log.Printf("Connected to %s WebSocket for chain head monitoring", backend.Name) + } + + // Reset backoff on successful connection + backoffDelay = time.Second + + // Get chain ID first + chainID, err := m.getChainID(conn, backend.Name) + if err != nil { + conn.Close() + m.updateBackendStatus(backend.Name, nil, fmt.Sprintf("Failed to get chain ID: %v", err)) + time.Sleep(backoffDelay) + continue + } + + // Store chain ID (especially important for primary) + if backend.Role == "primary" { + m.mu.Lock() + m.primaryChainID = chainID + m.mu.Unlock() + } + + // Subscribe to new heads + subscribeMsg := json.RawMessage(`{"jsonrpc":"2.0","method":"eth_subscribe","params":["newHeads"],"id":1}`) + err = conn.WriteMessage(websocket.TextMessage, subscribeMsg) + if err != nil { + conn.Close() + m.updateBackendStatus(backend.Name, nil, fmt.Sprintf("Failed to subscribe: %v", err)) + time.Sleep(backoffDelay) + continue + } + + // Read subscription response + _, msg, err := conn.ReadMessage() + if err != nil { + conn.Close() + m.updateBackendStatus(backend.Name, nil, fmt.Sprintf("Failed to read subscription response: %v", err)) + time.Sleep(backoffDelay) + continue + } + + var subResponse struct { + Result string `json:"result"` + Error *struct { + Message string `json:"message"` + } `json:"error"` + } + + if err := json.Unmarshal(msg, &subResponse); err != nil || subResponse.Error != nil { + conn.Close() + errMsg := "Subscription failed" + if subResponse.Error != nil { + errMsg = subResponse.Error.Message + } + m.updateBackendStatus(backend.Name, nil, errMsg) + time.Sleep(backoffDelay) + continue + } + + // Read new head notifications + m.readNewHeads(conn, backend.Name, chainID) + + conn.Close() + time.Sleep(backoffDelay) + } +} + +// getChainID gets the chain ID from a backend +func (m *ChainHeadMonitor) getChainID(conn *websocket.Conn, backendName string) (string, error) { + // Send eth_chainId request + chainIDMsg := json.RawMessage(`{"jsonrpc":"2.0","method":"eth_chainId","params":[],"id":"chainId"}`) + if err := conn.WriteMessage(websocket.TextMessage, chainIDMsg); err != nil { + return "", err + } + + // Set read deadline + conn.SetReadDeadline(time.Now().Add(5 * time.Second)) + defer conn.SetReadDeadline(time.Time{}) + + // Read response + _, msg, err := conn.ReadMessage() + if err != nil { + return "", err + } + + var response struct { + Result string `json:"result"` + Error *struct { + Message string `json:"message"` + } `json:"error"` + } + + if err := json.Unmarshal(msg, &response); err != nil { + return "", err + } + + if response.Error != nil { + return "", fmt.Errorf("RPC error: %s", response.Error.Message) + } + + return response.Result, nil +} + +// readNewHeads reads new head notifications from WebSocket +func (m *ChainHeadMonitor) readNewHeads(conn *websocket.Conn, backendName string, chainID string) { + // Set long read deadline for subscriptions + conn.SetReadDeadline(time.Now().Add(60 * time.Second)) + + for { + _, msg, err := conn.ReadMessage() + if err != nil { + m.updateBackendStatus(backendName, nil, fmt.Sprintf("WebSocket read error: %v", err)) + return + } + + // Reset read deadline after successful read + conn.SetReadDeadline(time.Now().Add(60 * time.Second)) + + // Parse the subscription notification + var notification struct { + Params struct { + Result struct { + Number string `json:"number"` // Hex encoded block number + Hash string `json:"hash"` // Block hash + } `json:"result"` + } `json:"params"` + } + + if err := json.Unmarshal(msg, ¬ification); err != nil { + continue // Skip malformed messages + } + + // Convert hex block number to uint64 + if notification.Params.Result.Number != "" { + blockNumber, err := strconv.ParseUint(strings.TrimPrefix(notification.Params.Result.Number, "0x"), 16, 64) + if err != nil { + continue + } + + head := &ChainHead{ + BlockNumber: blockNumber, + BlockHash: notification.Params.Result.Hash, + ChainID: chainID, + LastUpdate: time.Now(), + IsHealthy: true, + } + + m.updateBackendStatus(backendName, head, "") + + if m.enableDetailedLogs { + log.Printf("Backend %s at block %d (hash: %s...)", + backendName, blockNumber, head.BlockHash[:8]) + } + } + } +} + +// updateBackendStatus updates the status of a backend +func (m *ChainHeadMonitor) updateBackendStatus(backendName string, head *ChainHead, errorMsg string) { + m.mu.Lock() + defer m.mu.Unlock() + + if head != nil { + m.chainHeads[backendName] = head + } else if errorMsg != "" { + // Update or create entry with error + if existing, exists := m.chainHeads[backendName]; exists { + existing.IsHealthy = false + existing.Error = errorMsg + existing.LastUpdate = time.Now() + } else { + m.chainHeads[backendName] = &ChainHead{ + IsHealthy: false, + Error: errorMsg, + LastUpdate: time.Now(), + } + } + } + + // Update enabled status + m.updateEnabledStatus() +} + +// updateEnabledStatus updates which backends are enabled based on chain head +func (m *ChainHeadMonitor) updateEnabledStatus() { + // Get primary chain head + primaryHead, primaryExists := m.chainHeads["primary"] + if !primaryExists || !primaryHead.IsHealthy { + // If primary is not healthy/available (e.g., during restarts), enable all healthy secondary backends + if m.enableDetailedLogs { + log.Printf("Primary backend not healthy/available - enabling all healthy secondary backends") + } + + for _, backend := range m.backends { + if backend.Role == "primary" { + m.enabledBackends[backend.Name] = true // Always mark primary as enabled for routing + continue + } + + head, exists := m.chainHeads[backend.Name] + if exists && head.IsHealthy { + m.enabledBackends[backend.Name] = true + if m.enableDetailedLogs { + log.Printf("Backend %s enabled (primary unavailable)", backend.Name) + } + } else { + m.enabledBackends[backend.Name] = false + } + } + return + } + + // Check each backend + for _, backend := range m.backends { + if backend.Role == "primary" { + m.enabledBackends[backend.Name] = true + continue + } + + head, exists := m.chainHeads[backend.Name] + if !exists || !head.IsHealthy { + m.enabledBackends[backend.Name] = false + if m.enableDetailedLogs { + log.Printf("Backend %s disabled: not healthy", backend.Name) + } + continue + } + + // Check if on same chain + if head.ChainID != primaryHead.ChainID { + m.enabledBackends[backend.Name] = false + if m.enableDetailedLogs { + log.Printf("Backend %s disabled: wrong chain ID (got %s, want %s)", + backend.Name, head.ChainID, primaryHead.ChainID) + } + continue + } + + // STRICT RULE: Only allow if secondary matches primary height or is ahead + if head.BlockNumber < primaryHead.BlockNumber { + m.enabledBackends[backend.Name] = false + if m.enableDetailedLogs { + log.Printf("Backend %s disabled: behind primary (at block %d, primary at %d)", + backend.Name, head.BlockNumber, primaryHead.BlockNumber) + } + continue + } + + // Secondary is at same height or ahead - enable it + m.enabledBackends[backend.Name] = true + if m.enableDetailedLogs { + if head.BlockNumber > primaryHead.BlockNumber { + log.Printf("Backend %s enabled: ahead of primary (at block %d, primary at %d)", + backend.Name, head.BlockNumber, primaryHead.BlockNumber) + } else { + log.Printf("Backend %s enabled: at same block as primary (%d)", + backend.Name, head.BlockNumber) + } + } + } +} + +// checkBackendHealth performs periodic health checks +func (m *ChainHeadMonitor) checkBackendHealth() { + m.mu.Lock() + defer m.mu.Unlock() + + now := time.Now() + + // Check for stale data (no update in 90 seconds) + for _, head := range m.chainHeads { + if now.Sub(head.LastUpdate) > 90*time.Second { + head.IsHealthy = false + head.Error = "No recent updates" + } + } + + // Update enabled status + m.updateEnabledStatus() + + // Log current status if detailed logs enabled + if m.enableDetailedLogs { + log.Printf("Chain head monitor status:") + for name, enabled := range m.enabledBackends { + status := "disabled" + if enabled { + status = "enabled" + } + + if head, exists := m.chainHeads[name]; exists { + if head.IsHealthy { + log.Printf(" %s: %s, block %d, chain %s", name, status, head.BlockNumber, head.ChainID) + } else { + log.Printf(" %s: %s, error: %s", name, status, head.Error) + } + } else { + log.Printf(" %s: %s, no data", name, status) + } + } + } +} + +// GetStatus returns the current status of all backends +func (m *ChainHeadMonitor) GetStatus() map[string]ChainHead { + m.mu.RLock() + defer m.mu.RUnlock() + + status := make(map[string]ChainHead) + for name, head := range m.chainHeads { + status[name] = *head + } + return status +}