base sync fix

This commit is contained in:
Para Dox
2025-05-30 10:26:20 +07:00
parent 7b129c628b
commit 91109e7a94

View File

@@ -107,14 +107,14 @@ func parseBatchInfo(body []byte) (*BatchInfo, error) {
return nil, fmt.Errorf("invalid JSON-RPC request format") return nil, fmt.Errorf("invalid JSON-RPC request format")
} }
// calculateBatchDelay determines the appropriate delay for a batch request // calculateBatchDelay determines the appropriate delay for a batch request for a specific backend
func calculateBatchDelay(methods []string, probe *SecondaryProbe, stats *StatsCollector) time.Duration { func calculateBatchDelay(methods []string, backendName string, probe *SecondaryProbe, stats *StatsCollector) time.Duration {
var maxDelay time.Duration var maxDelay time.Duration
for _, method := range methods { for _, method := range methods {
var delay time.Duration var delay time.Duration
if probe != nil { if probe != nil {
delay = probe.getDelayForMethod(method) delay = probe.getDelayForBackendAndMethod(backendName, method)
} else { } else {
delay = stats.GetPrimaryP75ForMethod(method) delay = stats.GetPrimaryP75ForMethod(method)
} }
@@ -1138,6 +1138,24 @@ func (sc *StatsCollector) printSummary() {
} }
} }
if len(sc.secondaryProbe.backendTimings) > 0 {
fmt.Printf("Backend-Specific Minimum Latencies:\n")
// Sort backend names for consistent output
var backendNames []string
for backend := range sc.secondaryProbe.backendTimings {
backendNames = append(backendNames, backend)
}
sort.Strings(backendNames)
for _, backend := range backendNames {
timing := sc.secondaryProbe.backendTimings[backend]
fmt.Printf(" %s: %s (+ %s buffer = %s)\n",
backend,
formatDuration(timing),
formatDuration(sc.secondaryProbe.minDelayBuffer),
formatDuration(timing+sc.secondaryProbe.minDelayBuffer))
}
}
if sc.secondaryProbe.failureCount > 0 { if sc.secondaryProbe.failureCount > 0 {
fmt.Printf("Probe Failures: %d consecutive\n", sc.secondaryProbe.failureCount) fmt.Printf("Probe Failures: %d consecutive\n", sc.secondaryProbe.failureCount)
} }
@@ -2283,36 +2301,18 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
requiresPrimaryDueToBlockTag = batchInfo.RequiresPrimary requiresPrimaryDueToBlockTag = batchInfo.RequiresPrimary
} }
// Get delay threshold for secondary backends
var secondaryDelay time.Duration
if batchInfo.IsBatch {
// For batch requests, use the maximum delay of all methods
secondaryDelay = calculateBatchDelay(batchInfo.Methods, secondaryProbe, statsCollector)
} else {
// For single requests, use method-specific delay
method := batchInfo.Methods[0]
if secondaryProbe != nil {
secondaryDelay = secondaryProbe.getDelayForMethod(method)
} else {
secondaryDelay = statsCollector.GetPrimaryP75ForMethod(method)
}
}
if enableDetailedLogs { if enableDetailedLogs {
if batchInfo.IsBatch { if batchInfo.IsBatch {
log.Printf("Batch request: %d requests, methods: %s, max delay: %s (probe-based: %v), block tags: %v", log.Printf("Batch request: %d requests, methods: %s, block tags: %v",
batchInfo.RequestCount, formatMethodList(batchInfo.Methods), batchInfo.RequestCount, formatMethodList(batchInfo.Methods), batchInfo.BlockTags)
secondaryDelay, secondaryProbe != nil, batchInfo.BlockTags)
} else { } else {
var blockTagInfo string var blockTagInfo string
if len(batchInfo.BlockTags) > 0 { if len(batchInfo.BlockTags) > 0 {
blockTagInfo = fmt.Sprintf(", block tag: %s", batchInfo.BlockTags[0]) blockTagInfo = fmt.Sprintf(", block tag: %s", batchInfo.BlockTags[0])
} }
log.Printf("Method: %s, Secondary delay: %s (probe-based: %v)%s", log.Printf("Method: %s%s", displayMethod, blockTagInfo)
displayMethod, secondaryDelay, secondaryProbe != nil, blockTagInfo)
} }
} }
// Process backends with adaptive delay strategy // Process backends with adaptive delay strategy
var wg sync.WaitGroup var wg sync.WaitGroup
var primaryWg sync.WaitGroup // Separate wait group for primary backend var primaryWg sync.WaitGroup // Separate wait group for primary backend
@@ -2395,7 +2395,23 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
// If this is a secondary backend, wait for p75 delay // If this is a secondary backend, wait for p75 delay
if b.Role != "primary" { if b.Role != "primary" {
delayTimer := time.NewTimer(secondaryDelay) // Get backend-specific delay
var backendSpecificDelay time.Duration
if batchInfo.IsBatch {
// For batch requests, use the maximum delay of all methods for this backend
backendSpecificDelay = calculateBatchDelay(batchInfo.Methods, b.Name, secondaryProbe, statsCollector)
} else if secondaryProbe != nil {
backendSpecificDelay = secondaryProbe.getDelayForBackendAndMethod(b.Name, displayMethod)
} else {
// Fallback to method-based delay if no probe
backendSpecificDelay = statsCollector.GetPrimaryP75ForMethod(displayMethod)
}
if enableDetailedLogs {
log.Printf("Secondary backend %s waiting %s for method %s", b.Name, backendSpecificDelay, displayMethod)
}
delayTimer := time.NewTimer(backendSpecificDelay)
select { select {
case <-delayTimer.C: case <-delayTimer.C:
// Timer expired, primary is slow, proceed with secondary request // Timer expired, primary is slow, proceed with secondary request