From f6161e575daf80bf5fe486dc81d075b8b01b3db3 Mon Sep 17 00:00:00 2001 From: Para Dox Date: Wed, 28 May 2025 22:30:28 +0700 Subject: [PATCH] more features --- benchmark-proxy/main.go | 243 +++++++++++++++++++++++++++++++++------- 1 file changed, 202 insertions(+), 41 deletions(-) diff --git a/benchmark-proxy/main.go b/benchmark-proxy/main.go index 82a06f45..7a10ba4f 100644 --- a/benchmark-proxy/main.go +++ b/benchmark-proxy/main.go @@ -59,43 +59,47 @@ type CUDataPoint struct { // StatsCollector maintains statistics for periodic summaries type StatsCollector struct { - mu sync.Mutex - requestStats []ResponseStats - methodStats map[string][]time.Duration // Track durations by method - backendMethodStats map[string]map[string][]time.Duration // Track durations by backend and method - backendWins map[string]int // Track how many times each backend responded first - methodBackendWins map[string]map[string]int // Track wins per method per backend - firstResponseDurations []time.Duration // Track durations of first successful responses - methodFirstResponseDurations map[string][]time.Duration // Track first response durations by method - totalRequests int - errorCount int - wsConnections []WebSocketStats // Track websocket connections - totalWsConnections int - appStartTime time.Time // Application start time (never reset) - intervalStartTime time.Time // Current interval start time (reset each interval) - summaryInterval time.Duration - methodCUPrices map[string]int // Map of method names to CU prices - totalCU int // Total CU earned - methodCU map[string]int // Track CU earned per method - historicalCU []CUDataPoint // Historical CU data for different time windows + mu sync.Mutex + requestStats []ResponseStats + methodStats map[string][]time.Duration // Track durations by method + backendMethodStats map[string]map[string][]time.Duration // Track durations by backend and method + backendWins map[string]int // Track how many times each backend responded first + methodBackendWins map[string]map[string]int // Track wins per method per backend + firstResponseDurations []time.Duration // Track durations of first successful responses (from winning backend's perspective) + actualFirstResponseDurations []time.Duration // Track actual user-experienced durations + methodFirstResponseDurations map[string][]time.Duration // Track first response durations by method (winning backend's perspective) + methodActualFirstResponseDurations map[string][]time.Duration // Track actual user-experienced durations by method + totalRequests int + errorCount int + wsConnections []WebSocketStats // Track websocket connections + totalWsConnections int + appStartTime time.Time // Application start time (never reset) + intervalStartTime time.Time // Current interval start time (reset each interval) + summaryInterval time.Duration + methodCUPrices map[string]int // Map of method names to CU prices + totalCU int // Total CU earned + methodCU map[string]int // Track CU earned per method + historicalCU []CUDataPoint // Historical CU data for different time windows } func NewStatsCollector(summaryInterval time.Duration) *StatsCollector { now := time.Now() sc := &StatsCollector{ - requestStats: make([]ResponseStats, 0, 1000), - methodStats: make(map[string][]time.Duration), - backendMethodStats: make(map[string]map[string][]time.Duration), - backendWins: make(map[string]int), - methodBackendWins: make(map[string]map[string]int), - firstResponseDurations: make([]time.Duration, 0, 1000), - methodFirstResponseDurations: make(map[string][]time.Duration), - appStartTime: now, - intervalStartTime: now, - summaryInterval: summaryInterval, - methodCUPrices: initCUPrices(), // Initialize CU prices - methodCU: make(map[string]int), - historicalCU: make([]CUDataPoint, 0, 2000), // Store up to ~24 hours of 1-minute intervals + requestStats: make([]ResponseStats, 0, 1000), + methodStats: make(map[string][]time.Duration), + backendMethodStats: make(map[string]map[string][]time.Duration), + backendWins: make(map[string]int), + methodBackendWins: make(map[string]map[string]int), + firstResponseDurations: make([]time.Duration, 0, 1000), + actualFirstResponseDurations: make([]time.Duration, 0, 1000), + methodFirstResponseDurations: make(map[string][]time.Duration), + methodActualFirstResponseDurations: make(map[string][]time.Duration), + appStartTime: now, + intervalStartTime: now, + summaryInterval: summaryInterval, + methodCUPrices: initCUPrices(), // Initialize CU prices + methodCU: make(map[string]int), + historicalCU: make([]CUDataPoint, 0, 2000), // Store up to ~24 hours of 1-minute intervals } // Start the periodic summary goroutine @@ -185,16 +189,24 @@ func (sc *StatsCollector) AddStats(stats []ResponseStats, totalDuration time.Dur sc.mu.Lock() defer sc.mu.Unlock() - // Find the fastest successful response + // Find the fastest successful response and actual first response var fastestBackend string var fastestDuration time.Duration = time.Hour // Initialize with a very large duration + var actualFirstDuration time.Duration var method string + var hasActualFirst bool for _, stat := range stats { - if stat.Error == nil && stat.Duration < fastestDuration { + if stat.Backend == "actual-first-response" { + actualFirstDuration = stat.Duration + hasActualFirst = true + method = stat.Method + } else if stat.Error == nil && stat.Duration < fastestDuration { fastestDuration = stat.Duration fastestBackend = stat.Backend - method = stat.Method + if method == "" { + method = stat.Method + } } } @@ -208,7 +220,7 @@ func (sc *StatsCollector) AddStats(stats []ResponseStats, totalDuration time.Dur } sc.methodBackendWins[method][fastestBackend]++ - // Track first response duration + // Track first response duration (from winning backend's perspective) sc.firstResponseDurations = append(sc.firstResponseDurations, fastestDuration) // Track first response duration by method @@ -216,10 +228,24 @@ func (sc *StatsCollector) AddStats(stats []ResponseStats, totalDuration time.Dur sc.methodFirstResponseDurations[method] = make([]time.Duration, 0, 100) } sc.methodFirstResponseDurations[method] = append(sc.methodFirstResponseDurations[method], fastestDuration) + + // Track actual first response duration if available + if hasActualFirst { + sc.actualFirstResponseDurations = append(sc.actualFirstResponseDurations, actualFirstDuration) + + if _, exists := sc.methodActualFirstResponseDurations[method]; !exists { + sc.methodActualFirstResponseDurations[method] = make([]time.Duration, 0, 100) + } + sc.methodActualFirstResponseDurations[method] = append(sc.methodActualFirstResponseDurations[method], actualFirstDuration) + } } - // Add stats to the collection + // Add stats to the collection (skip actual-first-response as it's synthetic) for _, stat := range stats { + if stat.Backend == "actual-first-response" { + continue // Don't add synthetic entries to regular stats + } + sc.requestStats = append(sc.requestStats, stat) if stat.Error != nil { sc.errorCount++ @@ -457,7 +483,39 @@ func (sc *StatsCollector) printSummary() { "Backend", "Count", "Min", "Avg", "Max", "p50", "p90", "p99") fmt.Printf("%s\n", strings.Repeat("-", 100)) - // First, show the "First Response" merged statistics + // First, show the actual user latency if available + if len(sc.actualFirstResponseDurations) > 0 { + actualDurations := make([]time.Duration, len(sc.actualFirstResponseDurations)) + copy(actualDurations, sc.actualFirstResponseDurations) + + sort.Slice(actualDurations, func(i, j int) bool { + return actualDurations[i] < actualDurations[j] + }) + + var sum time.Duration + for _, d := range actualDurations { + sum += d + } + + avg := sum / time.Duration(len(actualDurations)) + min := actualDurations[0] + max := actualDurations[len(actualDurations)-1] + + p50idx := len(actualDurations) * 50 / 100 + p90idx := len(actualDurations) * 90 / 100 + p99idx := minInt(len(actualDurations)-1, len(actualDurations)*99/100) + + p50 := actualDurations[p50idx] + p90 := actualDurations[p90idx] + p99 := actualDurations[p99idx] + + fmt.Printf("%-20s %10d %10s %10s %10s %10s %10s %10s\n", + "User Latency", len(actualDurations), + formatDuration(min), formatDuration(avg), formatDuration(max), + formatDuration(p50), formatDuration(p90), formatDuration(p99)) + } + + // Then show the winner's time (what backend actually took) if len(sc.firstResponseDurations) > 0 { firstRespDurations := make([]time.Duration, len(sc.firstResponseDurations)) copy(firstRespDurations, sc.firstResponseDurations) @@ -484,7 +542,7 @@ func (sc *StatsCollector) printSummary() { p99 := firstRespDurations[p99idx] fmt.Printf("%-20s %10d %10s %10s %10s %10s %10s %10s\n", - "First Response", len(firstRespDurations), + "Winner's Time", len(firstRespDurations), formatDuration(min), formatDuration(avg), formatDuration(max), formatDuration(p50), formatDuration(p90), formatDuration(p99)) fmt.Printf("%s\n", strings.Repeat("-", 100)) @@ -645,6 +703,11 @@ func (sc *StatsCollector) printSummary() { fmt.Printf("\n Method: %s (Total requests: %d)\n", method, methodList[i].count) + // Check if this is a stateful method + if isStatefulMethod(method) { + fmt.Printf(" Note: Stateful method - only sent to primary backend\n") + } + // Show wins for this method if available if methodWins, exists := sc.methodBackendWins[method]; exists { fmt.Printf(" First Response Wins: ") @@ -716,7 +779,46 @@ func (sc *StatsCollector) printSummary() { formatDuration(p50), formatDuration(p90), formatDuration(p99)) } - // Show First Response statistics for this method + // Show User Latency for this method if available + if methodActualDurations, exists := sc.methodActualFirstResponseDurations[method]; exists && len(methodActualDurations) > 0 { + // Make a copy and sort + durations := make([]time.Duration, len(methodActualDurations)) + copy(durations, methodActualDurations) + + sort.Slice(durations, func(i, j int) bool { + return durations[i] < durations[j] + }) + + var sum time.Duration + for _, d := range durations { + sum += d + } + + avg := sum / time.Duration(len(durations)) + min := durations[0] + max := durations[len(durations)-1] + + p50 := min + p90 := min + p99 := min + + if len(durations) >= 2 { + p50idx := len(durations) * 50 / 100 + p90idx := len(durations) * 90 / 100 + p99idx := minInt(len(durations)-1, len(durations)*99/100) + + p50 = durations[p50idx] + p90 = durations[p90idx] + p99 = durations[p99idx] + } + + fmt.Printf(" %-20s %10d %10s %10s %10s %10s %10s %10s\n", + "User Latency", len(durations), + formatDuration(min), formatDuration(avg), formatDuration(max), + formatDuration(p50), formatDuration(p90), formatDuration(p99)) + } + + // Show Winner's Time statistics for this method if methodFirstDurations, exists := sc.methodFirstResponseDurations[method]; exists && len(methodFirstDurations) > 0 { // Make a copy and sort durations := make([]time.Duration, len(methodFirstDurations)) @@ -750,7 +852,7 @@ func (sc *StatsCollector) printSummary() { } fmt.Printf(" %-20s %10d %10s %10s %10s %10s %10s %10s\n", - "First Response", len(durations), + "Winner's Time", len(durations), formatDuration(min), formatDuration(avg), formatDuration(max), formatDuration(p50), formatDuration(p90), formatDuration(p99)) fmt.Printf(" %s\n", strings.Repeat("-", 98)) @@ -809,9 +911,17 @@ func (sc *StatsCollector) printSummary() { } else { sc.firstResponseDurations = sc.firstResponseDurations[:0] } + if len(sc.actualFirstResponseDurations) > 1000 { + sc.actualFirstResponseDurations = sc.actualFirstResponseDurations[len(sc.actualFirstResponseDurations)-1000:] + } else { + sc.actualFirstResponseDurations = sc.actualFirstResponseDurations[:0] + } for method := range sc.methodFirstResponseDurations { sc.methodFirstResponseDurations[method] = sc.methodFirstResponseDurations[method][:0] } + for method := range sc.methodActualFirstResponseDurations { + sc.methodActualFirstResponseDurations[method] = sc.methodActualFirstResponseDurations[method][:0] + } // Reset CU counters for the next interval sc.totalCU = 0 @@ -934,6 +1044,29 @@ func (sc *StatsCollector) GetPrimaryP50() time.Duration { return primaryDurations[p50idx] } +// isStatefulMethod returns true if the method requires session state and must always go to primary +func isStatefulMethod(method string) bool { + statefulMethods := map[string]bool{ + // Filter methods - these create server-side state + "eth_newFilter": true, + "eth_newBlockFilter": true, + "eth_newPendingTransactionFilter": true, + "eth_getFilterChanges": true, + "eth_getFilterLogs": true, + "eth_uninstallFilter": true, + + // Subscription methods (WebSocket) - maintain persistent connections + "eth_subscribe": true, + "eth_unsubscribe": true, + "eth_subscription": true, // Notification method + + // Some debug/trace methods might maintain state depending on implementation + // But these are typically stateless, so not included here + } + + return statefulMethods[method] +} + func main() { // Get configuration from environment variables listenAddr := getEnv("LISTEN_ADDR", ":8080") @@ -1034,6 +1167,9 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c // Get current p50 delay for primary backend p50Delay := statsCollector.GetPrimaryP50() + // Check if this is a stateful method that must go to primary only + isStateful := isStatefulMethod(method) + // Process backends with adaptive delay strategy var wg sync.WaitGroup statsChan := make(chan ResponseStats, len(backends)) @@ -1052,6 +1188,11 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c var responseHandled atomic.Bool for _, backend := range backends { + // Skip secondary backends for stateful methods + if isStateful && backend.Role != "primary" { + continue + } + wg.Add(1) go func(b Backend) { defer wg.Done() @@ -1204,6 +1345,26 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c if enableDetailedLogs { logResponseStats(totalDuration, stats) } + + // Add the actual user-experienced duration for the winning response + if response.err == nil && response.backend != "" { + // Find the stat for the winning backend and update it with the actual user-experienced duration + for i := range stats { + if stats[i].Backend == response.backend && stats[i].Error == nil { + // Create a special stat entry for the actual first response time + actualFirstResponseStat := ResponseStats{ + Backend: "actual-first-response", + StatusCode: stats[i].StatusCode, + Duration: time.Since(startTime), // This is what the user actually experienced + Error: nil, + Method: stats[i].Method, + } + stats = append(stats, actualFirstResponseStat) + break + } + } + } + return stats }