a set of random fixes into the blue

This commit is contained in:
Para Dox
2025-05-29 10:53:36 +07:00
parent c5710c33b1
commit 779a2e76f9

View File

@@ -26,6 +26,102 @@ type JSONRPCRequest struct {
Method string `json:"method"` Method string `json:"method"`
} }
// BatchInfo contains information about a batch request
type BatchInfo struct {
IsBatch bool
Methods []string
RequestCount int
HasStateful bool
}
// parseBatchInfo analyzes the request body to extract method information
func parseBatchInfo(body []byte) (*BatchInfo, error) {
// Check for empty body
if len(body) == 0 {
return nil, fmt.Errorf("empty request body")
}
// Try parsing as array first (batch request)
var batchReqs []JSONRPCRequest
if err := json.Unmarshal(body, &batchReqs); err == nil {
// It's a batch request
info := &BatchInfo{
IsBatch: true,
RequestCount: len(batchReqs),
Methods: make([]string, 0, len(batchReqs)),
}
// Extract methods and check for stateful ones
methodSet := make(map[string]bool) // Track unique methods
for _, req := range batchReqs {
if req.Method != "" {
info.Methods = append(info.Methods, req.Method)
methodSet[req.Method] = true
if isStatefulMethod(req.Method) {
info.HasStateful = true
}
}
}
return info, nil
}
// Try parsing as single request
var singleReq JSONRPCRequest
if err := json.Unmarshal(body, &singleReq); err == nil {
return &BatchInfo{
IsBatch: false,
Methods: []string{singleReq.Method},
RequestCount: 1,
HasStateful: isStatefulMethod(singleReq.Method),
}, nil
}
// Neither batch nor single request
return nil, fmt.Errorf("invalid JSON-RPC request format")
}
// calculateBatchDelay determines the appropriate delay for a batch request
func calculateBatchDelay(methods []string, probe *SecondaryProbe, stats *StatsCollector) time.Duration {
var maxDelay time.Duration
for _, method := range methods {
var delay time.Duration
if probe != nil {
delay = probe.getDelayForMethod(method)
} else {
delay = stats.GetPrimaryP75ForMethod(method)
}
if delay > maxDelay {
maxDelay = delay
}
}
// If no methods or all unknown, use a default
if maxDelay == 0 {
if probe != nil {
return probe.minResponseTime + probe.minDelayBuffer
}
return 15 * time.Millisecond // Default fallback
}
return maxDelay
}
// formatMethodList creates a readable string from method list for logging
func formatMethodList(methods []string) string {
if len(methods) == 0 {
return "[]"
}
if len(methods) <= 3 {
return fmt.Sprintf("%v", methods)
}
// Show first 3 methods + count of remaining
return fmt.Sprintf("[%s, %s, %s, ... +%d more]",
methods[0], methods[1], methods[2], len(methods)-3)
}
type Backend struct { type Backend struct {
URL string URL string
Name string Name string
@@ -488,15 +584,20 @@ func (sc *StatsCollector) AddStats(stats []ResponseStats, totalDuration time.Dur
// Keep tracking primary backend in the old way for backward compatibility // Keep tracking primary backend in the old way for backward compatibility
if stat.Backend == "primary" { if stat.Backend == "primary" {
if _, exists := sc.methodStats[stat.Method]; !exists { // Handle batch requests specially for CU calculation
sc.methodStats[stat.Method] = make([]time.Duration, 0, 100) if strings.HasPrefix(stat.Method, "batch[") && len(stat.Method) > 6 {
} // Don't track batch as a method, it will be handled separately
sc.methodStats[stat.Method] = append(sc.methodStats[stat.Method], stat.Duration) } else {
if _, exists := sc.methodStats[stat.Method]; !exists {
sc.methodStats[stat.Method] = make([]time.Duration, 0, 100)
}
sc.methodStats[stat.Method] = append(sc.methodStats[stat.Method], stat.Duration)
// Add CU for this method // Add CU for this method
cuValue := sc.methodCUPrices[stat.Method] cuValue := sc.methodCUPrices[stat.Method]
sc.totalCU += cuValue sc.totalCU += cuValue
sc.methodCU[stat.Method] += cuValue sc.methodCU[stat.Method] += cuValue
}
} }
} }
} }
@@ -504,6 +605,32 @@ func (sc *StatsCollector) AddStats(stats []ResponseStats, totalDuration time.Dur
sc.totalRequests++ sc.totalRequests++
} }
// AddBatchStats adds statistics for a batch request
func (sc *StatsCollector) AddBatchStats(methods []string, duration time.Duration, backend string) {
sc.mu.Lock()
defer sc.mu.Unlock()
// Calculate total CU for the batch
batchCU := 0
for _, method := range methods {
if method != "" {
cuValue := sc.methodCUPrices[method]
batchCU += cuValue
// Track individual method CU
sc.methodCU[method] += cuValue
// Track method durations (use batch duration for each method)
if _, exists := sc.methodStats[method]; !exists {
sc.methodStats[method] = make([]time.Duration, 0, 100)
}
sc.methodStats[method] = append(sc.methodStats[method], duration)
}
}
sc.totalCU += batchCU
}
func (sc *StatsCollector) AddWebSocketStats(stats WebSocketStats) { func (sc *StatsCollector) AddWebSocketStats(stats WebSocketStats) {
sc.mu.Lock() sc.mu.Lock()
defer sc.mu.Unlock() defer sc.mu.Unlock()
@@ -1603,31 +1730,54 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
} }
defer r.Body.Close() defer r.Body.Close()
// Try to parse the method from the JSON-RPC request // Parse request to extract method information (handles both single and batch)
method := "unknown" batchInfo, err := parseBatchInfo(body)
var jsonRPCReq JSONRPCRequest if err != nil {
if err := json.Unmarshal(body, &jsonRPCReq); err == nil && jsonRPCReq.Method != "" { http.Error(w, "Invalid JSON-RPC request", http.StatusBadRequest)
method = jsonRPCReq.Method return
}
// For logging and stats, use the first method or "batch" for batch requests
var displayMethod string
var isStateful bool
if batchInfo.IsBatch {
displayMethod = fmt.Sprintf("batch[%d]", batchInfo.RequestCount)
isStateful = batchInfo.HasStateful
} else {
displayMethod = batchInfo.Methods[0]
if displayMethod == "" {
displayMethod = "unknown"
}
isStateful = batchInfo.HasStateful
} }
// Get delay threshold for secondary backends // Get delay threshold for secondary backends
var secondaryDelay time.Duration var secondaryDelay time.Duration
if secondaryProbe != nil { if batchInfo.IsBatch {
// Use probe-based delay // For batch requests, use the maximum delay of all methods
secondaryDelay = secondaryProbe.getDelayForMethod(method) secondaryDelay = calculateBatchDelay(batchInfo.Methods, secondaryProbe, statsCollector)
} else { } else {
// Fall back to p75 approach // For single requests, use method-specific delay
secondaryDelay = statsCollector.GetPrimaryP75ForMethod(method) method := batchInfo.Methods[0]
if secondaryProbe != nil {
secondaryDelay = secondaryProbe.getDelayForMethod(method)
} else {
secondaryDelay = statsCollector.GetPrimaryP75ForMethod(method)
}
} }
if enableDetailedLogs { if enableDetailedLogs {
log.Printf("Method: %s, Secondary delay: %s (probe-based: %v)", if batchInfo.IsBatch {
method, secondaryDelay, secondaryProbe != nil) log.Printf("Batch request: %d requests, methods: %s, max delay: %s (probe-based: %v)",
batchInfo.RequestCount, formatMethodList(batchInfo.Methods),
secondaryDelay, secondaryProbe != nil)
} else {
log.Printf("Method: %s, Secondary delay: %s (probe-based: %v)",
displayMethod, secondaryDelay, secondaryProbe != nil)
}
} }
// Check if this is a stateful method that must go to primary only
isStateful := isStatefulMethod(method)
// Process backends with adaptive delay strategy // Process backends with adaptive delay strategy
var wg sync.WaitGroup var wg sync.WaitGroup
var primaryWg sync.WaitGroup // Separate wait group for primary backend var primaryWg sync.WaitGroup // Separate wait group for primary backend
@@ -1684,7 +1834,7 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
statsChan <- ResponseStats{ statsChan <- ResponseStats{
Backend: b.Name, Backend: b.Name,
Error: fmt.Errorf("skipped - primary responded quickly"), Error: fmt.Errorf("skipped - primary responded quickly"),
Method: method, Method: displayMethod,
Duration: time.Since(goroutineStartTime), Duration: time.Since(goroutineStartTime),
} }
return return
@@ -1692,7 +1842,7 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
// Primary failed immediately, start secondary now // Primary failed immediately, start secondary now
delayTimer.Stop() delayTimer.Stop()
if enableDetailedLogs { if enableDetailedLogs {
log.Printf("Primary failed fast for %s, starting secondary immediately", method) log.Printf("Primary failed fast for %s, starting secondary immediately", displayMethod)
} }
} }
} }
@@ -1703,7 +1853,7 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
statsChan <- ResponseStats{ statsChan <- ResponseStats{
Backend: b.Name, Backend: b.Name,
Error: err, Error: err,
Method: method, Method: displayMethod,
Duration: time.Since(goroutineStartTime), // Include any wait time Duration: time.Since(goroutineStartTime), // Include any wait time
} }
return return
@@ -1734,7 +1884,7 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
Backend: b.Name, Backend: b.Name,
Duration: reqDuration, // Keep backend-specific duration Duration: reqDuration, // Keep backend-specific duration
Error: err, Error: err,
Method: method, Method: displayMethod,
} }
return return
} }
@@ -1763,7 +1913,7 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
Backend: b.Name, Backend: b.Name,
StatusCode: resp.StatusCode, StatusCode: resp.StatusCode,
Duration: reqDuration, // This is the backend-specific duration Duration: reqDuration, // This is the backend-specific duration
Method: method, Method: displayMethod,
} }
// Try to be the first to respond // Try to be the first to respond
@@ -1892,7 +2042,7 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
} }
log.Printf("Context cancelled while streaming response from backend '%s' (method: %s) after streaming for %s (total request time: %s) - reason: %s", log.Printf("Context cancelled while streaming response from backend '%s' (method: %s) after streaming for %s (total request time: %s) - reason: %s",
response.backend, method, streamingDuration, totalRequestDuration, reason) response.backend, displayMethod, streamingDuration, totalRequestDuration, reason)
} }
} }
} else { } else {
@@ -1950,6 +2100,17 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
// Send stats to collector // Send stats to collector
statsCollector.AddStats(stats, 0) statsCollector.AddStats(stats, 0)
// If this was a successful batch request from primary, add batch stats for CU calculation
if batchInfo.IsBatch && response.err == nil && response.backend == "primary" {
// Find the primary backend stat with successful response
for _, stat := range stats {
if stat.Backend == "primary" && stat.Error == nil {
statsCollector.AddBatchStats(batchInfo.Methods, stat.Duration, "primary")
break
}
}
}
}() }()
// Return immediately after sending response to client // Return immediately after sending response to client