diff --git a/benchmark-proxy/main.go b/benchmark-proxy/main.go
index ea7b5cdd..bd0bf104 100644
--- a/benchmark-proxy/main.go
+++ b/benchmark-proxy/main.go
@@ -189,6 +189,9 @@ type SecondaryProbe struct {
 	minResponseTime time.Duration            // Overall minimum response time
 	methodTimings   map[string]time.Duration // Per-method minimum response times
 	backendTimings  map[string]time.Duration // Per-backend minimum response times
+	backendAvailable   map[string]bool      // Track if backends are available (responding without errors)
+	backendErrorCount  map[string]int       // Track consecutive error count per backend
+	backendLastSuccess map[string]time.Time // Track last successful probe time per backend
 	lastProbeTime   time.Time
 	probeInterval   time.Duration
 	minDelayBuffer  time.Duration // Buffer to add to minimum times
@@ -196,6 +199,8 @@ type SecondaryProbe struct {
 	enableDetailedLogs bool
 	failureCount       int       // Track consecutive probe failures
 	lastSuccessTime    time.Time // Last time probes succeeded
+	maxErrorThreshold  int       // Maximum consecutive errors before marking backend unavailable
+	recoveryThreshold  int       // Consecutive successes needed to mark backend available again (not yet enforced; see updateBackendHealth)
 }
 
 // BlockHeightTracker monitors block heights from different backends using WebSocket subscriptions
@@ -530,11 +535,23 @@ func NewSecondaryProbe(backends []Backend, client *http.Client, probeInterval ti
 		minResponseTime:    15 * time.Millisecond, // Start with reasonable default
 		methodTimings:      make(map[string]time.Duration),
 		backendTimings:     make(map[string]time.Duration),
+		backendAvailable:   make(map[string]bool),
+		backendErrorCount:  make(map[string]int),
+		backendLastSuccess: make(map[string]time.Time),
 		probeInterval:      probeInterval,
 		minDelayBuffer:     minDelayBuffer,
 		probeMethods:       probeMethods,
 		enableDetailedLogs: enableDetailedLogs,
 		lastSuccessTime:    time.Now(),
+		maxErrorThreshold:  5, // Mark backend unavailable after 5 consecutive errors
+		recoveryThreshold:  3, // Require 3 consecutive successes to mark backend available again
+	}
+
+	// Initialize all backends as available initially
+	for _, backend := range secondaryBackends {
+		sp.backendAvailable[backend.Name] = true
+		sp.backendErrorCount[backend.Name] = 0
+		sp.backendLastSuccess[backend.Name] = time.Now()
 	}
 
 	// Run initial probe immediately
@@ -592,7 +609,8 @@ func (sp *SecondaryProbe) runProbe() {
 	successfulProbes := 0
 
 	for _, backend := range sp.backends {
-		backendMin := time.Hour // Start with large value
+		backendMin := time.Hour  // Start with large value
+		backendSuccessCount := 0 // Track successes for this backend across all methods
 
 		for _, method := range sp.probeMethods {
 			methodMin := time.Hour // Track minimum for this method on this backend
@@ -619,11 +637,11 @@ func (sp *SecondaryProbe) runProbe() {
 				duration := time.Since(start)
 
 				if err == nil && resp != nil {
-					resp.Body.Close()
-
-					if resp.StatusCode == 200 {
+					// Check for both network success and HTTP success
+					if resp.StatusCode >= 200 && resp.StatusCode < 300 {
 						methodSuccesses++
 						successfulProbes++
+						backendSuccessCount++
 
 						// Track minimum for this method on this backend
 						if duration < methodMin {
@@ -634,6 +652,19 @@ func (sp *SecondaryProbe) runProbe() {
 						log.Printf("Probe %d/10: backend=%s method=%s duration=%s status=%d (min so far: %s)",
 							probe+1, backend.Name, method, duration, resp.StatusCode, methodMin)
 					}
+					} else {
+						// HTTP error status (4xx, 5xx)
+						if sp.enableDetailedLogs {
+							log.Printf("Probe %d/10: backend=%s method=%s HTTP error status=%d",
+								probe+1, backend.Name, method, resp.StatusCode)
+						}
+					}
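+					// Close the body once per response, covering both the
+					// success and HTTP-error paths, so the connection can be reused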
+					resp.Body.Close()
+				} else {
+					// Network error
+					if sp.enableDetailedLogs {
+						log.Printf("Probe %d/10: backend=%s method=%s network error: %v",
+							probe+1, backend.Name, method, err)
 					}
 				}
 			}
@@ -666,6 +697,25 @@ func (sp *SecondaryProbe) runProbe() {
 		if backendMin < time.Hour {
 			newBackendTimings[backend.Name] = backendMin
 		}
+
+		// Update backend health based on overall success rate for this probe cycle
+		// Consider a backend healthy if it had at least some successful responses
+		sp.mu.Lock()
+		sp.updateBackendHealth(backend.Name, backendSuccessCount > 0)
+		sp.mu.Unlock()
+
+		if sp.enableDetailedLogs {
+			sp.mu.RLock()
+			availability := "AVAILABLE"
+			if !sp.backendAvailable[backend.Name] {
+				availability = "UNAVAILABLE"
+			}
+			errorCount := sp.backendErrorCount[backend.Name]
+			sp.mu.RUnlock()
+
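+			// Each cycle sends 10 probes per method, hence len(probeMethods)*10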
+			log.Printf("Backend %s probe cycle complete: %d/%d total successes, status: %s (error count: %d)",
+				backend.Name, backendSuccessCount, len(sp.probeMethods)*10, availability, errorCount)
+		}
 	}
 
 	// Update timings if we got successful probes
@@ -700,13 +750,16 @@ func (sp *SecondaryProbe) runProbe() {
 		sp.lastProbeTime = time.Now()
 
 		if sp.enableDetailedLogs {
-			log.Printf("Probe complete: min=%s methods=%v backends=%v",
-				sp.minResponseTime, sp.methodTimings, sp.backendTimings)
+			availableBackends := sp.getAvailableBackends()
+			log.Printf("Probe complete: min=%s methods=%v backends=%v available_backends=%v",
+				sp.minResponseTime, sp.methodTimings, sp.backendTimings, availableBackends)
 		}
 	} else {
 		sp.failureCount++
 		if sp.enableDetailedLogs {
-			log.Printf("Probe failed: consecutive failures=%d", sp.failureCount)
+			availableBackends := sp.getAvailableBackends()
+			log.Printf("Probe failed: consecutive failures=%d available_backends=%v",
+				sp.failureCount, availableBackends)
 		}
 	}
 }
@@ -1038,6 +1091,29 @@ func (sc *StatsCollector) printSummary() {
 	fmt.Printf("Probe Buffer: %s\n", formatDuration(sc.secondaryProbe.minDelayBuffer))
 	fmt.Printf("Effective Delay Threshold: %s\n",
 		formatDuration(sc.secondaryProbe.minResponseTime+sc.secondaryProbe.minDelayBuffer))
+
+	// Display backend availability status
+	fmt.Printf("Backend Health Status:\n")
+	backendNames := make([]string, 0, len(sc.secondaryProbe.backendAvailable))
+	for name := range sc.secondaryProbe.backendAvailable {
+		backendNames = append(backendNames, name)
+	}
+	sort.Strings(backendNames)
+
+	for _, name := range backendNames {
+		available := sc.secondaryProbe.backendAvailable[name]
+		errorCount := sc.secondaryProbe.backendErrorCount[name]
+		lastSuccess := sc.secondaryProbe.backendLastSuccess[name]
+
+		status := "AVAILABLE"
+		if !available {
+			status = "UNAVAILABLE"
+		}
+
+		timeSinceSuccess := time.Since(lastSuccess)
+		fmt.Printf("  %s: %s (errors: %d, last success: %s ago)\n",
+			name, status, errorCount, timeSinceSuccess.Round(time.Second))
+	}
+
 	if len(sc.secondaryProbe.methodTimings) > 0 {
 		fmt.Printf("Method-Specific Thresholds:\n")
 		// Sort methods for consistent output
@@ -2019,20 +2095,127 @@ func (sc *StatsCollector) isExpensiveMethodByStats(method string) bool {
 	return minDuration >= expensiveThreshold
 }
 
-// hasAvailableSecondaryAtChainHead checks if there are synchronized secondary backends available
-func hasAvailableSecondaryAtChainHead(backends []Backend, blockHeightTracker *BlockHeightTracker) bool {
+// hasAvailableSecondaryAtChainHead checks if there are synchronized and available secondary backends
+func hasAvailableSecondaryAtChainHead(backends []Backend, blockHeightTracker *BlockHeightTracker, secondaryProbe *SecondaryProbe) bool {
 	if blockHeightTracker == nil {
 		return false
 	}
 
 	for _, backend := range backends {
 		if backend.Role == "secondary" && !blockHeightTracker.isSecondaryBehind(backend.Name) {
+			// Also check if the backend is available (not marked unavailable due to errors)
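+			// A nil secondaryProbe skips this check and preserves the previous behavior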
+			if secondaryProbe != nil && !secondaryProbe.isBackendAvailable(backend.Name) {
+				continue // Skip unavailable backends
+			}
 			return true
 		}
 	}
 
 	return false
 }
 
+// selectBestSecondaryForExpensiveMethod intelligently selects which secondary backend to use for expensive queries
+// It prefers backends whose probe latency doesn't exceed half the minimum response time for the expensive method
+func (sc *StatsCollector) selectBestSecondaryForExpensiveMethod(method string, backends []Backend,
+	blockHeightTracker *BlockHeightTracker, secondaryProbe *SecondaryProbe) *Backend {
+
+	if blockHeightTracker == nil || secondaryProbe == nil {
+		return nil
+	}
+
+	// Get available synchronized secondary backends
+	var availableSecondaries []Backend
+	for _, backend := range backends {
+		if backend.Role == "secondary" &&
+			!blockHeightTracker.isSecondaryBehind(backend.Name) &&
+			secondaryProbe.isBackendAvailable(backend.Name) {
+			availableSecondaries = append(availableSecondaries, backend)
+		}
+	}
+
+	if len(availableSecondaries) == 0 {
+		return nil
+	}
+
+	// If only one secondary available, use it
+	if len(availableSecondaries) == 1 {
+		return &availableSecondaries[0]
+	}
+
+	// Get minimum response time for this expensive method on primary
+	sc.mu.Lock()
+	var methodMinTime time.Duration
+	if methodStats, exists := sc.backendMethodStats["primary"]; exists {
+		if durations, methodExists := methodStats[method]; methodExists && len(durations) > 0 {
+			methodMinTime = durations[0]
+			for _, duration := range durations {
+				if duration < methodMinTime {
+					methodMinTime = duration
+				}
+			}
+		}
+	}
+	sc.mu.Unlock()
+
+	// If we don't have stats for this method, use any available secondary
+	if methodMinTime == 0 {
+		return &availableSecondaries[0]
+	}
+
+	// Calculate the threshold: probe latency should not exceed half the expensive method's min time
+	probeLatencyThreshold := methodMinTime / 2
+
+	// Evaluate secondary backends based on their probe latencies
+	type secondaryCandidate struct {
+		backend      Backend
+		probeLatency time.Duration
+		isAcceptable bool
+	}
+
+	var candidates []secondaryCandidate
+
+	secondaryProbe.mu.RLock()
+	for _, backend := range availableSecondaries {
+		probeLatency := secondaryProbe.minResponseTime // Default to overall minimum
+
+		// Use backend-specific timing if available
+		if backendTiming, exists := secondaryProbe.backendTimings[backend.Name]; exists {
+			probeLatency = backendTiming
+		}
+
+		candidate := secondaryCandidate{
+			backend:      backend,
+			probeLatency: probeLatency,
+			isAcceptable: probeLatency <= probeLatencyThreshold,
+		}
+		candidates = append(candidates, candidate)
+	}
+	secondaryProbe.mu.RUnlock()
+
+	// Sort candidates by probe latency (ascending)
+	sort.Slice(candidates, func(i, j int) bool {
+		return candidates[i].probeLatency < candidates[j].probeLatency
+	})
+
+	// Strategy: Use the slowest backend among those that meet the criteria
+	// This preserves faster backends for quicker queries
+	var selectedCandidate *secondaryCandidate
+
+	// First, try to find acceptable candidates (probe latency <= threshold)
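+	// candidates is sorted ascending, so scanning from the end visits the
+	// slowest acceptable backend first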
+	for i := len(candidates) - 1; i >= 0; i-- {
+		if candidates[i].isAcceptable {
+			selectedCandidate = &candidates[i]
+			break // Take the slowest acceptable one
+		}
+	}
+
+	// If no acceptable candidates, fall back to the slowest available
+	if selectedCandidate == nil {
+		selectedCandidate = &candidates[len(candidates)-1]
+	}
+
+	return &selectedCandidate.backend
+}
+
 // flushingResponseWriter wraps http.ResponseWriter to flush after every write
 type flushingResponseWriter struct {
 	http.ResponseWriter
@@ -2327,12 +2510,25 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
 	}
 	isExpensive = hasExpensiveMethod
 
-	// For expensive methods, prefer secondary backends if available and synchronized
+	// For expensive methods, intelligently select the best secondary backend
 	var preferSecondary bool
-	if enableExpensiveMethodRouting && isExpensive && !isStateful && hasAvailableSecondaryAtChainHead(backends, blockHeightTracker) {
-		preferSecondary = true
-		if enableDetailedLogs {
-			log.Printf("Expensive method detected (%s), preferring secondary backends", displayMethod)
+	var selectedSecondary *Backend
+	if enableExpensiveMethodRouting && isExpensive && !isStateful {
+		// Determine the method name for selection (use first method for batches)
+		methodForSelection := displayMethod
+		if batchInfo.IsBatch && len(batchInfo.Methods) > 0 {
+			methodForSelection = batchInfo.Methods[0]
+		}
+
+		selectedSecondary = statsCollector.selectBestSecondaryForExpensiveMethod(
+			methodForSelection, backends, blockHeightTracker, secondaryProbe)
+
+		if selectedSecondary != nil {
+			preferSecondary = true
+			if enableDetailedLogs {
+				log.Printf("Expensive method detected (%s), selected secondary backend: %s",
+					displayMethod, selectedSecondary.Name)
+			}
 		}
 	}
@@ -2360,6 +2556,41 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
 			}
 		}
 
+		// Skip secondary backends if they're marked as unavailable due to errors
+		if backend.Role != "primary" && secondaryProbe != nil {
+			if !secondaryProbe.isBackendAvailable(backend.Name) {
+				if enableDetailedLogs {
+					log.Printf("Skipping secondary backend %s - marked as unavailable due to errors", backend.Name)
+				}
+				// Record that we skipped this secondary backend due to availability
+				statsChan <- ResponseStats{
+					Backend:  backend.Name,
+					Error:    fmt.Errorf("skipped - marked as unavailable"),
+					Method:   displayMethod,
+					Duration: 0, // No actual request made
+				}
+				continue
+			}
+		}
+
+		// For expensive methods, only use the selected secondary backend
+		if preferSecondary && backend.Role != "primary" {
+			if selectedSecondary == nil || backend.Name != selectedSecondary.Name {
+				// Skip this secondary - it's not the selected one for expensive method
+				if enableDetailedLogs {
+					log.Printf("Skipping secondary backend %s - not selected for expensive method %s",
+						backend.Name, displayMethod)
+				}
+				statsChan <- ResponseStats{
+					Backend:  backend.Name,
+					Error:    fmt.Errorf("skipped - not selected for expensive method"),
+					Method:   displayMethod,
+					Duration: 0,
+				}
+				continue
+			}
+		}
+
 		// For expensive methods, skip primary backend if we prefer secondary and have good ones
 		if preferSecondary && backend.Role == "primary" {
 			// Still add primary to the pool but with a delay to let secondaries try first
@@ -2946,3 +3177,59 @@ func getEnv(key, fallback string) string {
 	}
 	return fallback
 }
+
+// isBackendAvailable checks if a backend is currently marked as available
+func (sp *SecondaryProbe) isBackendAvailable(backendName string) bool {
+	sp.mu.RLock()
+	defer sp.mu.RUnlock()
+
+	// Backends missing from the map are treated as unavailable (fail closed)
+	available, exists := sp.backendAvailable[backendName]
+	return exists && available
+}
+
+// getAvailableBackends returns a list of currently available backend names
+func (sp *SecondaryProbe) getAvailableBackends() []string {
+	sp.mu.RLock()
+	defer sp.mu.RUnlock()
+
+	var available []string
+	for name, isAvailable := range sp.backendAvailable {
+		if isAvailable {
+			available = append(available, name)
+		}
+	}
+	return available
+}
+
+// updateBackendHealth updates the health status of a backend based on probe results.
+// It must be called with sp.mu already held for writing.
+func (sp *SecondaryProbe) updateBackendHealth(backendName string, isSuccess bool) {
+	if isSuccess {
+		// Reset error count and update last success time
+		sp.backendErrorCount[backendName] = 0
+		sp.backendLastSuccess[backendName] = time.Now()
+
+		// If backend was marked unavailable, check if we should mark it available again
+		if !sp.backendAvailable[backendName] {
+			// For now, mark available immediately on first success
+			// Could be enhanced to require sp.recoveryThreshold consecutive successes
+			sp.backendAvailable[backendName] = true
+			if sp.enableDetailedLogs {
+				log.Printf("Backend %s marked as AVAILABLE (recovered from errors)", backendName)
+			}
+		}
+	} else {
+		// Increment error count
+		sp.backendErrorCount[backendName]++
+
+		// Check if we should mark backend as unavailable
+		if sp.backendAvailable[backendName] && sp.backendErrorCount[backendName] >= sp.maxErrorThreshold {
+			sp.backendAvailable[backendName] = false
+			if sp.enableDetailedLogs {
+				log.Printf("Backend %s marked as UNAVAILABLE (consecutive errors: %d)",
+					backendName, sp.backendErrorCount[backendName])
+			}
+		}
+	}
+}