diff --git a/benchmark-proxy/main.go b/benchmark-proxy/main.go
index 24bcfc15..954dd235 100644
--- a/benchmark-proxy/main.go
+++ b/benchmark-proxy/main.go
@@ -250,7 +250,7 @@ func (sc *StatsCollector) AddStats(stats []ResponseStats, totalDuration time.Dur
 		sc.requestStats = append(sc.requestStats, stat)
 		if stat.Error != nil {
 			// Don't count skipped secondary backends as errors
-			if stat.Error.Error() != "skipped - primary responded within p50" {
+			if stat.Error != errSkippedSecondary {
 				sc.errorCount++
 			} else {
 				// Track that we skipped a secondary request
@@ -1077,6 +1077,51 @@ func (sc *StatsCollector) GetPrimaryP50() time.Duration {
 	return primaryDurations[p50idx]
 }
 
+// errSkippedSecondary is recorded for a secondary backend whose request was
+// never sent because the primary answered within the p75 delay. It is compared
+// by identity, so the message can change without breaking the stats.
+var errSkippedSecondary = fmt.Errorf("skipped - primary responded within p75")
+
+// GetPrimaryP75ForMethod returns the p75 latency for method. It prefers the
+// samples recorded for that method and falls back to all successful primary
+// requests, then to a fixed default while too few samples exist.
+func (sc *StatsCollector) GetPrimaryP75ForMethod(method string) time.Duration {
+	const (
+		minMethodSamples = 5
+		minGlobalSamples = 10
+		defaultP75       = 15 * time.Millisecond
+	)
+
+	// Only copy the samples while holding the lock; sorting happens afterwards
+	// so request goroutines are not serialized behind an O(n log n) sort.
+	sc.mu.Lock()
+	var samples []time.Duration
+	if durations := sc.methodStats[method]; len(durations) >= minMethodSamples {
+		samples = append(samples, durations...)
+	} else {
+		for _, stat := range sc.requestStats {
+			if stat.Backend == "primary" && stat.Error == nil {
+				samples = append(samples, stat.Duration)
+			}
+		}
+		if len(samples) < minGlobalSamples {
+			samples = nil
+		}
+	}
+	sc.mu.Unlock()
+
+	if len(samples) == 0 {
+		return defaultP75
+	}
+
+	sort.Slice(samples, func(i, j int) bool {
+		return samples[i] < samples[j]
+	})
+
+	// len*75/100 is always below len for a non-empty slice, so no clamp is needed.
+	return samples[len(samples)*75/100]
+}
+
 // GetPrimaryP50ForMethod calculates the current p50 latency for a specific method on the primary backend
 func (sc *StatsCollector) GetPrimaryP50ForMethod(method string) time.Duration {
 	sc.mu.Lock()
@@ -1240,11 +1285,11 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
 		method = jsonRPCReq.Method
 	}
 
-	// Get current p50 delay for this specific method on primary backend
-	p50Delay := statsCollector.GetPrimaryP50ForMethod(method)
+	// Get current p75 delay for this specific method on primary backend
+	p75Delay := statsCollector.GetPrimaryP75ForMethod(method)
 
 	if enableDetailedLogs {
-		log.Printf("Method: %s, P50 delay: %s", method, p50Delay)
+		log.Printf("Method: %s, P75 delay: %s", method, p75Delay)
 	}
 
 	// Check if this is a stateful method that must go to primary only
@@ -1292,9 +1337,9 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
 				firstBackendStartTime.Store(&t)
 			}
 
-			// If this is a secondary backend, wait for p50 delay
+			// If this is a secondary backend, wait for p75 delay
 			if b.Role != "primary" {
-				delayTimer := time.NewTimer(p50Delay)
+				delayTimer := time.NewTimer(p75Delay)
 				select {
 				case <-delayTimer.C:
 					// Timer expired, primary is slow, proceed with secondary request
@@ -1305,7 +1350,7 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
 					// Still record that we skipped this backend
 					statsChan <- ResponseStats{
 						Backend:  b.Name,
-						Error:    fmt.Errorf("skipped - primary responded within p50"),
+						Error:    errSkippedSecondary,
 						Method:   method,
 						Duration: time.Since(goroutineStartTime),
 					}