diff --git a/benchmark-proxy/main.go b/benchmark-proxy/main.go index f0ba9c03..9c40cb4e 100644 --- a/benchmark-proxy/main.go +++ b/benchmark-proxy/main.go @@ -75,12 +75,30 @@ type StatsCollector struct { appStartTime time.Time // Application start time (never reset) intervalStartTime time.Time // Current interval start time (reset each interval) summaryInterval time.Duration - methodCUPrices map[string]int // Map of method names to CU prices - totalCU int // Total CU earned - methodCU map[string]int // Track CU earned per method - historicalCU []CUDataPoint // Historical CU data for different time windows - hasSecondaryBackends bool // Track if secondary backends are configured - skippedSecondaryRequests int // Track how many secondary requests were skipped + methodCUPrices map[string]int // Map of method names to CU prices + totalCU int // Total CU earned + methodCU map[string]int // Track CU earned per method + historicalCU []CUDataPoint // Historical CU data for different time windows + hasSecondaryBackends bool // Track if secondary backends are configured + skippedSecondaryRequests int // Track how many secondary requests were skipped + secondaryProbe *SecondaryProbe // Reference to secondary probe +} + +// SecondaryProbe maintains latency information for secondary backends through active probing +type SecondaryProbe struct { + mu sync.RWMutex + backends []Backend + client *http.Client + minResponseTime time.Duration // Overall minimum response time + methodTimings map[string]time.Duration // Per-method minimum response times + backendTimings map[string]time.Duration // Per-backend minimum response times + lastProbeTime time.Time + probeInterval time.Duration + minDelayBuffer time.Duration // Buffer to add to minimum times + probeMethods []string + enableDetailedLogs bool + failureCount int // Track consecutive probe failures + lastSuccessTime time.Time // Last time probes succeeded } func NewStatsCollector(summaryInterval time.Duration, hasSecondaryBackends bool) *StatsCollector { @@ -110,6 +128,199 @@ func NewStatsCollector(summaryInterval time.Duration, hasSecondaryBackends bool) return sc } +// SetSecondaryProbe sets the secondary probe reference after stats collector is created +func (sc *StatsCollector) SetSecondaryProbe(probe *SecondaryProbe) { + sc.mu.Lock() + defer sc.mu.Unlock() + sc.secondaryProbe = probe +} + +// NewSecondaryProbe creates a new secondary probe instance +func NewSecondaryProbe(backends []Backend, client *http.Client, probeInterval time.Duration, + minDelayBuffer time.Duration, probeMethods []string, enableDetailedLogs bool) *SecondaryProbe { + + // Filter only secondary backends + var secondaryBackends []Backend + for _, b := range backends { + if b.Role == "secondary" { + secondaryBackends = append(secondaryBackends, b) + } + } + + if len(secondaryBackends) == 0 { + return nil + } + + sp := &SecondaryProbe{ + backends: secondaryBackends, + client: client, + minResponseTime: 15 * time.Millisecond, // Start with reasonable default + methodTimings: make(map[string]time.Duration), + backendTimings: make(map[string]time.Duration), + probeInterval: probeInterval, + minDelayBuffer: minDelayBuffer, + probeMethods: probeMethods, + enableDetailedLogs: enableDetailedLogs, + lastSuccessTime: time.Now(), + } + + // Run initial probe immediately + go func() { + sp.runProbe() + // Then start periodic probing + sp.startPeriodicProbing() + }() + + return sp +} + +// getDelayForMethod returns the appropriate delay for a given method +func (sp *SecondaryProbe) getDelayForMethod(method string) time.Duration { + sp.mu.RLock() + defer sp.mu.RUnlock() + + // If probes have been failing, use a conservative fallback + if sp.failureCount > 3 && time.Since(sp.lastSuccessTime) > 5*time.Minute { + return 20 * time.Millisecond // Conservative fallback + } + + // Use method-specific timing if available + if timing, exists := sp.methodTimings[method]; exists { + return timing + sp.minDelayBuffer + } + + // Fall back to general minimum + return sp.minResponseTime + sp.minDelayBuffer +} + +// getDelayForBackendAndMethod returns the appropriate delay for a specific backend and method +func (sp *SecondaryProbe) getDelayForBackendAndMethod(backend, method string) time.Duration { + sp.mu.RLock() + defer sp.mu.RUnlock() + + // Start with backend-specific timing + delay := sp.minResponseTime + if backendTiming, exists := sp.backendTimings[backend]; exists { + delay = backendTiming + } + + // Use method-specific timing if it's longer + if methodTiming, exists := sp.methodTimings[method]; exists && methodTiming > delay { + delay = methodTiming + } + + return delay + sp.minDelayBuffer +} + +// runProbe performs a single probe cycle to all secondary backends +func (sp *SecondaryProbe) runProbe() { + newMethodTimings := make(map[string]time.Duration) + newBackendTimings := make(map[string]time.Duration) + successfulProbes := 0 + + for _, backend := range sp.backends { + backendMin := time.Hour // Start with large value + + for _, method := range sp.probeMethods { + reqBody := []byte(fmt.Sprintf( + `{"jsonrpc":"2.0","method":"%s","params":[],"id":"probe-%d"}`, + method, time.Now().UnixNano(), + )) + + req, err := http.NewRequest("POST", backend.URL, bytes.NewReader(reqBody)) + if err != nil { + continue + } + + req.Header.Set("Content-Type", "application/json") + + start := time.Now() + resp, err := sp.client.Do(req) + duration := time.Since(start) + + if err == nil && resp != nil { + resp.Body.Close() + + if resp.StatusCode == 200 { + successfulProbes++ + + // Update method timing (use minimum across all backends) + if currentMin, exists := newMethodTimings[method]; !exists || duration < currentMin { + newMethodTimings[method] = duration + } + + // Track backend minimum + if duration < backendMin { + backendMin = duration + } + + if sp.enableDetailedLogs { + log.Printf("Probe: backend=%s method=%s duration=%s status=%d", + backend.Name, method, duration, resp.StatusCode) + } + } + } + } + + // Store backend minimum if we got any successful probes + if backendMin < time.Hour { + newBackendTimings[backend.Name] = backendMin + } + } + + // Update timings if we got successful probes + sp.mu.Lock() + defer sp.mu.Unlock() + + if successfulProbes > 0 { + sp.failureCount = 0 + sp.lastSuccessTime = time.Now() + + // Update method timings + for method, timing := range newMethodTimings { + sp.methodTimings[method] = timing + } + + // Update backend timings + for backend, timing := range newBackendTimings { + sp.backendTimings[backend] = timing + } + + // Update overall minimum + overallMin := time.Hour + for _, timing := range newBackendTimings { + if timing < overallMin { + overallMin = timing + } + } + if overallMin < time.Hour { + sp.minResponseTime = overallMin + } + + sp.lastProbeTime = time.Now() + + if sp.enableDetailedLogs { + log.Printf("Probe complete: min=%s methods=%v backends=%v", + sp.minResponseTime, sp.methodTimings, sp.backendTimings) + } + } else { + sp.failureCount++ + if sp.enableDetailedLogs { + log.Printf("Probe failed: consecutive failures=%d", sp.failureCount) + } + } +} + +// startPeriodicProbing runs probes at regular intervals +func (sp *SecondaryProbe) startPeriodicProbing() { + ticker := time.NewTicker(sp.probeInterval) + defer ticker.Stop() + + for range ticker.C { + sp.runProbe() + } +} + // initCUPrices initializes the map of method names to their CU prices func initCUPrices() map[string]int { return map[string]int{ @@ -251,7 +462,7 @@ func (sc *StatsCollector) AddStats(stats []ResponseStats, totalDuration time.Dur sc.requestStats = append(sc.requestStats, stat) if stat.Error != nil { // Don't count skipped secondary backends as errors - if stat.Error.Error() != "skipped - primary responded within p75" { + if !strings.Contains(stat.Error.Error(), "skipped - primary responded") { sc.errorCount++ } else { // Track that we skipped a secondary request @@ -374,6 +585,40 @@ func (sc *StatsCollector) printSummary() { fmt.Printf("Total HTTP Requests: %d\n", sc.totalRequests) fmt.Printf("Total WebSocket Connections: %d\n", sc.totalWsConnections) fmt.Printf("Error Rate: %.2f%%\n", float64(sc.errorCount)/float64(sc.totalRequests+sc.totalWsConnections)*100) + + // Display secondary probe information if available + if sc.secondaryProbe != nil { + sc.secondaryProbe.mu.RLock() + fmt.Printf("\n--- Secondary Probe Status ---\n") + fmt.Printf("Minimum Secondary Latency: %s\n", formatDuration(sc.secondaryProbe.minResponseTime)) + fmt.Printf("Probe Buffer: %s\n", formatDuration(sc.secondaryProbe.minDelayBuffer)) + fmt.Printf("Effective Delay Threshold: %s\n", formatDuration(sc.secondaryProbe.minResponseTime+sc.secondaryProbe.minDelayBuffer)) + + if len(sc.secondaryProbe.methodTimings) > 0 { + fmt.Printf("Method-Specific Thresholds:\n") + // Sort methods for consistent output + var methods []string + for method := range sc.secondaryProbe.methodTimings { + methods = append(methods, method) + } + sort.Strings(methods) + for _, method := range methods { + timing := sc.secondaryProbe.methodTimings[method] + fmt.Printf(" %s: %s (+ %s buffer = %s)\n", + method, + formatDuration(timing), + formatDuration(sc.secondaryProbe.minDelayBuffer), + formatDuration(timing+sc.secondaryProbe.minDelayBuffer)) + } + } + + if sc.secondaryProbe.failureCount > 0 { + fmt.Printf("Probe Failures: %d consecutive\n", sc.secondaryProbe.failureCount) + } + + sc.secondaryProbe.mu.RUnlock() + } + if sc.hasSecondaryBackends && sc.skippedSecondaryRequests > 0 { fmt.Printf("Skipped Secondary Requests: %d (%.1f%% of requests)\n", sc.skippedSecondaryRequests, @@ -1218,12 +1463,30 @@ func main() { summaryIntervalStr := getEnv("SUMMARY_INTERVAL", "60") // Default 60 seconds enableDetailedLogs := getEnv("ENABLE_DETAILED_LOGS", "false") // Default to disabled + // Secondary probe configuration + enableSecondaryProbing := getEnv("ENABLE_SECONDARY_PROBING", "true") == "true" + probeIntervalStr := getEnv("PROBE_INTERVAL", "10") // Default 10 seconds + minDelayBufferStr := getEnv("MIN_DELAY_BUFFER", "2") // Default 2ms buffer + probeMethodsStr := getEnv("PROBE_METHODS", "eth_blockNumber,net_version,eth_chainId") + summaryInterval, err := strconv.Atoi(summaryIntervalStr) if err != nil { log.Printf("Invalid SUMMARY_INTERVAL, using default of 60 seconds") summaryInterval = 60 } + probeInterval, err := strconv.Atoi(probeIntervalStr) + if err != nil { + log.Printf("Invalid PROBE_INTERVAL, using default of 10 seconds") + probeInterval = 10 + } + + minDelayBuffer, err := strconv.Atoi(minDelayBufferStr) + if err != nil { + log.Printf("Invalid MIN_DELAY_BUFFER, using default of 2ms") + minDelayBuffer = 2 + } + // Create stats collector for periodic summaries statsCollector := NewStatsCollector(time.Duration(summaryInterval)*time.Second, secondaryBackendsStr != "") @@ -1249,6 +1512,9 @@ func main() { log.Printf("Starting benchmark proxy on %s", listenAddr) log.Printf("Primary backend: %s", primaryBackend) log.Printf("Secondary backends: %s", secondaryBackendsStr) + if enableSecondaryProbing && secondaryBackendsStr != "" { + log.Printf("Secondary probing: enabled (interval: %ds, buffer: %dms)", probeInterval, minDelayBuffer) + } // Set up HTTP client with reasonable timeouts client := &http.Client{ @@ -1261,6 +1527,31 @@ func main() { }, } + // Initialize secondary probe if enabled and we have secondary backends + var secondaryProbe *SecondaryProbe + if enableSecondaryProbing && secondaryBackendsStr != "" { + probeMethods := strings.Split(probeMethodsStr, ",") + for i := range probeMethods { + probeMethods[i] = strings.TrimSpace(probeMethods[i]) + } + + secondaryProbe = NewSecondaryProbe( + backends, + client, + time.Duration(probeInterval)*time.Second, + time.Duration(minDelayBuffer)*time.Millisecond, + probeMethods, + enableDetailedLogs == "true", + ) + + if secondaryProbe == nil { + log.Printf("Secondary probe initialization failed - no secondary backends found") + } else { + // Set the probe in stats collector for display + statsCollector.SetSecondaryProbe(secondaryProbe) + } + } + // Configure websocket upgrader with larger buffer sizes // 20MB frame size and 50MB message size const ( @@ -1281,14 +1572,14 @@ func main() { handleWebSocketRequest(w, r, backends, client, &upgrader, statsCollector) } else { // Handle regular HTTP request - handleRequest(w, r, backends, client, enableDetailedLogs == "true", statsCollector) + handleRequest(w, r, backends, client, enableDetailedLogs == "true", statsCollector, secondaryProbe) } }) log.Fatal(http.ListenAndServe(listenAddr, nil)) } -func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client, enableDetailedLogs bool, statsCollector *StatsCollector) { +func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client, enableDetailedLogs bool, statsCollector *StatsCollector, secondaryProbe *SecondaryProbe) { startTime := time.Now() // Create a context that will cancel after 35 seconds (5s buffer over backend timeout) @@ -1319,11 +1610,19 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c method = jsonRPCReq.Method } - // Get current p75 delay for this specific method on primary backend - p75Delay := statsCollector.GetPrimaryP75ForMethod(method) + // Get delay threshold for secondary backends + var secondaryDelay time.Duration + if secondaryProbe != nil { + // Use probe-based delay + secondaryDelay = secondaryProbe.getDelayForMethod(method) + } else { + // Fall back to p75 approach + secondaryDelay = statsCollector.GetPrimaryP75ForMethod(method) + } if enableDetailedLogs { - log.Printf("Method: %s, P75 delay: %s", method, p75Delay) + log.Printf("Method: %s, Secondary delay: %s (probe-based: %v)", + method, secondaryDelay, secondaryProbe != nil) } // Check if this is a stateful method that must go to primary only @@ -1373,7 +1672,7 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c // If this is a secondary backend, wait for p75 delay if b.Role != "primary" { - delayTimer := time.NewTimer(p75Delay) + delayTimer := time.NewTimer(secondaryDelay) select { case <-delayTimer.C: // Timer expired, primary is slow, proceed with secondary request @@ -1384,7 +1683,7 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c // Still record that we skipped this backend statsChan <- ResponseStats{ Backend: b.Name, - Error: fmt.Errorf("skipped - primary responded within p75"), + Error: fmt.Errorf("skipped - primary responded quickly"), Method: method, Duration: time.Since(goroutineStartTime), }