From 91109e7a9437ccd70208a183e2e55d188feafb7f Mon Sep 17 00:00:00 2001 From: Para Dox Date: Fri, 30 May 2025 10:26:20 +0700 Subject: [PATCH] base sync fix --- benchmark-proxy/main.go | 66 +++++++++++++++++++++++++---------------- 1 file changed, 41 insertions(+), 25 deletions(-) diff --git a/benchmark-proxy/main.go b/benchmark-proxy/main.go index 29d2e2cc..50216858 100644 --- a/benchmark-proxy/main.go +++ b/benchmark-proxy/main.go @@ -107,14 +107,14 @@ func parseBatchInfo(body []byte) (*BatchInfo, error) { return nil, fmt.Errorf("invalid JSON-RPC request format") } -// calculateBatchDelay determines the appropriate delay for a batch request -func calculateBatchDelay(methods []string, probe *SecondaryProbe, stats *StatsCollector) time.Duration { +// calculateBatchDelay determines the appropriate delay for a batch request for a specific backend +func calculateBatchDelay(methods []string, backendName string, probe *SecondaryProbe, stats *StatsCollector) time.Duration { var maxDelay time.Duration for _, method := range methods { var delay time.Duration if probe != nil { - delay = probe.getDelayForMethod(method) + delay = probe.getDelayForBackendAndMethod(backendName, method) } else { delay = stats.GetPrimaryP75ForMethod(method) } @@ -1138,6 +1138,24 @@ func (sc *StatsCollector) printSummary() { } } + if len(sc.secondaryProbe.backendTimings) > 0 { + fmt.Printf("Backend-Specific Minimum Latencies:\n") + // Sort backend names for consistent output + var backendNames []string + for backend := range sc.secondaryProbe.backendTimings { + backendNames = append(backendNames, backend) + } + sort.Strings(backendNames) + for _, backend := range backendNames { + timing := sc.secondaryProbe.backendTimings[backend] + fmt.Printf(" %s: %s (+ %s buffer = %s)\n", + backend, + formatDuration(timing), + formatDuration(sc.secondaryProbe.minDelayBuffer), + formatDuration(timing+sc.secondaryProbe.minDelayBuffer)) + } + } + if sc.secondaryProbe.failureCount > 0 { fmt.Printf("Probe Failures: %d consecutive\n", sc.secondaryProbe.failureCount) } @@ -2283,36 +2301,18 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c requiresPrimaryDueToBlockTag = batchInfo.RequiresPrimary } - // Get delay threshold for secondary backends - var secondaryDelay time.Duration - if batchInfo.IsBatch { - // For batch requests, use the maximum delay of all methods - secondaryDelay = calculateBatchDelay(batchInfo.Methods, secondaryProbe, statsCollector) - } else { - // For single requests, use method-specific delay - method := batchInfo.Methods[0] - if secondaryProbe != nil { - secondaryDelay = secondaryProbe.getDelayForMethod(method) - } else { - secondaryDelay = statsCollector.GetPrimaryP75ForMethod(method) - } - } - if enableDetailedLogs { if batchInfo.IsBatch { - log.Printf("Batch request: %d requests, methods: %s, max delay: %s (probe-based: %v), block tags: %v", - batchInfo.RequestCount, formatMethodList(batchInfo.Methods), - secondaryDelay, secondaryProbe != nil, batchInfo.BlockTags) + log.Printf("Batch request: %d requests, methods: %s, block tags: %v", + batchInfo.RequestCount, formatMethodList(batchInfo.Methods), batchInfo.BlockTags) } else { var blockTagInfo string if len(batchInfo.BlockTags) > 0 { blockTagInfo = fmt.Sprintf(", block tag: %s", batchInfo.BlockTags[0]) } - log.Printf("Method: %s, Secondary delay: %s (probe-based: %v)%s", - displayMethod, secondaryDelay, secondaryProbe != nil, blockTagInfo) + log.Printf("Method: %s%s", displayMethod, blockTagInfo) } } - // Process backends with adaptive delay strategy var wg sync.WaitGroup var primaryWg sync.WaitGroup // Separate wait group for primary backend @@ -2395,7 +2395,23 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c // If this is a secondary backend, wait for p75 delay if b.Role != "primary" { - delayTimer := time.NewTimer(secondaryDelay) + // Get backend-specific delay + var backendSpecificDelay time.Duration + if batchInfo.IsBatch { + // For batch requests, use the maximum delay of all methods for this backend + backendSpecificDelay = calculateBatchDelay(batchInfo.Methods, b.Name, secondaryProbe, statsCollector) + } else if secondaryProbe != nil { + backendSpecificDelay = secondaryProbe.getDelayForBackendAndMethod(b.Name, displayMethod) + } else { + // Fallback to method-based delay if no probe + backendSpecificDelay = statsCollector.GetPrimaryP75ForMethod(displayMethod) + } + + if enableDetailedLogs { + log.Printf("Secondary backend %s waiting %s for method %s", b.Name, backendSpecificDelay, displayMethod) + } + + delayTimer := time.NewTimer(backendSpecificDelay) select { case <-delayTimer.C: // Timer expired, primary is slow, proceed with secondary request