From 779a2e76f96ba1be2f74d0b306784817ed99b847 Mon Sep 17 00:00:00 2001 From: Para Dox Date: Thu, 29 May 2025 10:53:36 +0700 Subject: [PATCH] a set of random fixes into the blue --- benchmark-proxy/main.go | 219 ++++++++++++++++++++++++++++++++++------ 1 file changed, 190 insertions(+), 29 deletions(-) diff --git a/benchmark-proxy/main.go b/benchmark-proxy/main.go index 9c40cb4e..08a2d1b6 100644 --- a/benchmark-proxy/main.go +++ b/benchmark-proxy/main.go @@ -26,6 +26,102 @@ type JSONRPCRequest struct { Method string `json:"method"` } +// BatchInfo contains information about a batch request +type BatchInfo struct { + IsBatch bool + Methods []string + RequestCount int + HasStateful bool +} + +// parseBatchInfo analyzes the request body to extract method information +func parseBatchInfo(body []byte) (*BatchInfo, error) { + // Check for empty body + if len(body) == 0 { + return nil, fmt.Errorf("empty request body") + } + + // Try parsing as array first (batch request) + var batchReqs []JSONRPCRequest + if err := json.Unmarshal(body, &batchReqs); err == nil { + // It's a batch request + info := &BatchInfo{ + IsBatch: true, + RequestCount: len(batchReqs), + Methods: make([]string, 0, len(batchReqs)), + } + + // Extract methods and check for stateful ones + methodSet := make(map[string]bool) // Track unique methods + for _, req := range batchReqs { + if req.Method != "" { + info.Methods = append(info.Methods, req.Method) + methodSet[req.Method] = true + if isStatefulMethod(req.Method) { + info.HasStateful = true + } + } + } + + return info, nil + } + + // Try parsing as single request + var singleReq JSONRPCRequest + if err := json.Unmarshal(body, &singleReq); err == nil { + return &BatchInfo{ + IsBatch: false, + Methods: []string{singleReq.Method}, + RequestCount: 1, + HasStateful: isStatefulMethod(singleReq.Method), + }, nil + } + + // Neither batch nor single request + return nil, fmt.Errorf("invalid JSON-RPC request format") +} + +// calculateBatchDelay determines the appropriate delay for a batch request +func calculateBatchDelay(methods []string, probe *SecondaryProbe, stats *StatsCollector) time.Duration { + var maxDelay time.Duration + + for _, method := range methods { + var delay time.Duration + if probe != nil { + delay = probe.getDelayForMethod(method) + } else { + delay = stats.GetPrimaryP75ForMethod(method) + } + + if delay > maxDelay { + maxDelay = delay + } + } + + // If no methods or all unknown, use a default + if maxDelay == 0 { + if probe != nil { + return probe.minResponseTime + probe.minDelayBuffer + } + return 15 * time.Millisecond // Default fallback + } + + return maxDelay +} + +// formatMethodList creates a readable string from method list for logging +func formatMethodList(methods []string) string { + if len(methods) == 0 { + return "[]" + } + if len(methods) <= 3 { + return fmt.Sprintf("%v", methods) + } + // Show first 3 methods + count of remaining + return fmt.Sprintf("[%s, %s, %s, ... +%d more]", + methods[0], methods[1], methods[2], len(methods)-3) +} + type Backend struct { URL string Name string @@ -488,15 +584,20 @@ func (sc *StatsCollector) AddStats(stats []ResponseStats, totalDuration time.Dur // Keep tracking primary backend in the old way for backward compatibility if stat.Backend == "primary" { - if _, exists := sc.methodStats[stat.Method]; !exists { - sc.methodStats[stat.Method] = make([]time.Duration, 0, 100) - } - sc.methodStats[stat.Method] = append(sc.methodStats[stat.Method], stat.Duration) + // Handle batch requests specially for CU calculation + if strings.HasPrefix(stat.Method, "batch[") && len(stat.Method) > 6 { + // Don't track batch as a method, it will be handled separately + } else { + if _, exists := sc.methodStats[stat.Method]; !exists { + sc.methodStats[stat.Method] = make([]time.Duration, 0, 100) + } + sc.methodStats[stat.Method] = append(sc.methodStats[stat.Method], stat.Duration) - // Add CU for this method - cuValue := sc.methodCUPrices[stat.Method] - sc.totalCU += cuValue - sc.methodCU[stat.Method] += cuValue + // Add CU for this method + cuValue := sc.methodCUPrices[stat.Method] + sc.totalCU += cuValue + sc.methodCU[stat.Method] += cuValue + } } } } @@ -504,6 +605,32 @@ func (sc *StatsCollector) AddStats(stats []ResponseStats, totalDuration time.Dur sc.totalRequests++ } +// AddBatchStats adds statistics for a batch request +func (sc *StatsCollector) AddBatchStats(methods []string, duration time.Duration, backend string) { + sc.mu.Lock() + defer sc.mu.Unlock() + + // Calculate total CU for the batch + batchCU := 0 + for _, method := range methods { + if method != "" { + cuValue := sc.methodCUPrices[method] + batchCU += cuValue + + // Track individual method CU + sc.methodCU[method] += cuValue + + // Track method durations (use batch duration for each method) + if _, exists := sc.methodStats[method]; !exists { + sc.methodStats[method] = make([]time.Duration, 0, 100) + } + sc.methodStats[method] = append(sc.methodStats[method], duration) + } + } + + sc.totalCU += batchCU +} + func (sc *StatsCollector) AddWebSocketStats(stats WebSocketStats) { sc.mu.Lock() defer sc.mu.Unlock() @@ -1603,31 +1730,54 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c } defer r.Body.Close() - // Try to parse the method from the JSON-RPC request - method := "unknown" - var jsonRPCReq JSONRPCRequest - if err := json.Unmarshal(body, &jsonRPCReq); err == nil && jsonRPCReq.Method != "" { - method = jsonRPCReq.Method + // Parse request to extract method information (handles both single and batch) + batchInfo, err := parseBatchInfo(body) + if err != nil { + http.Error(w, "Invalid JSON-RPC request", http.StatusBadRequest) + return + } + + // For logging and stats, use the first method or "batch" for batch requests + var displayMethod string + var isStateful bool + + if batchInfo.IsBatch { + displayMethod = fmt.Sprintf("batch[%d]", batchInfo.RequestCount) + isStateful = batchInfo.HasStateful + } else { + displayMethod = batchInfo.Methods[0] + if displayMethod == "" { + displayMethod = "unknown" + } + isStateful = batchInfo.HasStateful } // Get delay threshold for secondary backends var secondaryDelay time.Duration - if secondaryProbe != nil { - // Use probe-based delay - secondaryDelay = secondaryProbe.getDelayForMethod(method) + if batchInfo.IsBatch { + // For batch requests, use the maximum delay of all methods + secondaryDelay = calculateBatchDelay(batchInfo.Methods, secondaryProbe, statsCollector) } else { - // Fall back to p75 approach - secondaryDelay = statsCollector.GetPrimaryP75ForMethod(method) + // For single requests, use method-specific delay + method := batchInfo.Methods[0] + if secondaryProbe != nil { + secondaryDelay = secondaryProbe.getDelayForMethod(method) + } else { + secondaryDelay = statsCollector.GetPrimaryP75ForMethod(method) + } } if enableDetailedLogs { - log.Printf("Method: %s, Secondary delay: %s (probe-based: %v)", - method, secondaryDelay, secondaryProbe != nil) + if batchInfo.IsBatch { + log.Printf("Batch request: %d requests, methods: %s, max delay: %s (probe-based: %v)", + batchInfo.RequestCount, formatMethodList(batchInfo.Methods), + secondaryDelay, secondaryProbe != nil) + } else { + log.Printf("Method: %s, Secondary delay: %s (probe-based: %v)", + displayMethod, secondaryDelay, secondaryProbe != nil) + } } - // Check if this is a stateful method that must go to primary only - isStateful := isStatefulMethod(method) - // Process backends with adaptive delay strategy var wg sync.WaitGroup var primaryWg sync.WaitGroup // Separate wait group for primary backend @@ -1684,7 +1834,7 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c statsChan <- ResponseStats{ Backend: b.Name, Error: fmt.Errorf("skipped - primary responded quickly"), - Method: method, + Method: displayMethod, Duration: time.Since(goroutineStartTime), } return @@ -1692,7 +1842,7 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c // Primary failed immediately, start secondary now delayTimer.Stop() if enableDetailedLogs { - log.Printf("Primary failed fast for %s, starting secondary immediately", method) + log.Printf("Primary failed fast for %s, starting secondary immediately", displayMethod) } } } @@ -1703,7 +1853,7 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c statsChan <- ResponseStats{ Backend: b.Name, Error: err, - Method: method, + Method: displayMethod, Duration: time.Since(goroutineStartTime), // Include any wait time } return @@ -1734,7 +1884,7 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c Backend: b.Name, Duration: reqDuration, // Keep backend-specific duration Error: err, - Method: method, + Method: displayMethod, } return } @@ -1763,7 +1913,7 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c Backend: b.Name, StatusCode: resp.StatusCode, Duration: reqDuration, // This is the backend-specific duration - Method: method, + Method: displayMethod, } // Try to be the first to respond @@ -1892,7 +2042,7 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c } log.Printf("Context cancelled while streaming response from backend '%s' (method: %s) after streaming for %s (total request time: %s) - reason: %s", - response.backend, method, streamingDuration, totalRequestDuration, reason) + response.backend, displayMethod, streamingDuration, totalRequestDuration, reason) } } } else { @@ -1950,6 +2100,17 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c // Send stats to collector statsCollector.AddStats(stats, 0) + + // If this was a successful batch request from primary, add batch stats for CU calculation + if batchInfo.IsBatch && response.err == nil && response.backend == "primary" { + // Find the primary backend stat with successful response + for _, stat := range stats { + if stat.Backend == "primary" && stat.Error == nil { + statsCollector.AddBatchStats(batchInfo.Methods, stat.Duration, "primary") + break + } + } + } }() // Return immediately after sending response to client