The system now tracks per-backend health: consecutive probe errors are counted for each secondary backend, a backend is marked unavailable once the error threshold is reached, and it is returned to rotation automatically when its probes succeed again. Only responsive backends receive traffic, which makes handling of secondary backend failures considerably more robust.
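The core of the change is a per-backend availability flag driven by consecutive probe errors: a secondary is pulled out of rotation after too many failed probe cycles and put back on the next successful one. Below is a condensed, standalone sketch of that logic with illustrative type and field names; the commit itself implements it in SecondaryProbe.updateBackendHealth with maxErrorThreshold set to 5 (a recoveryThreshold field is also added but, as the code comments note, recovery currently happens on the first success).

package main

import (
	"fmt"
	"time"
)

// backendHealth is an illustrative stand-in for the per-backend state the
// commit adds to SecondaryProbe (backendAvailable, backendErrorCount,
// backendLastSuccess).
type backendHealth struct {
	available         bool
	consecutiveErrors int
	lastSuccess       time.Time
	maxErrorThreshold int // e.g. 5 consecutive failed probe cycles
}

// record updates availability after one probe cycle.
func (h *backendHealth) record(success bool) {
	if success {
		h.consecutiveErrors = 0
		h.lastSuccess = time.Now()
		h.available = true // recover on the first successful cycle
		return
	}
	h.consecutiveErrors++
	if h.consecutiveErrors >= h.maxErrorThreshold {
		h.available = false
	}
}

func main() {
	h := backendHealth{available: true, maxErrorThreshold: 5}
	for i := 0; i < 5; i++ {
		h.record(false)
	}
	fmt.Println(h.available) // false: marked unavailable after 5 consecutive errors
	h.record(true)
	fmt.Println(h.available) // true: back in rotation after one successful probe cycle
}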

Author: Para Dox
Date:   2025-05-29 20:06:27 +07:00
Parent: 318c4d26f5
Commit: f9ea6d118c


@@ -189,6 +189,9 @@ type SecondaryProbe struct {
minResponseTime time.Duration // Overall minimum response time
methodTimings map[string]time.Duration // Per-method minimum response times
backendTimings map[string]time.Duration // Per-backend minimum response times
backendAvailable map[string]bool // Track if backends are available (responding without errors)
backendErrorCount map[string]int // Track consecutive error count per backend
backendLastSuccess map[string]time.Time // Track last successful probe time per backend
lastProbeTime time.Time
probeInterval time.Duration
minDelayBuffer time.Duration // Buffer to add to minimum times
@@ -196,6 +199,8 @@ type SecondaryProbe struct {
enableDetailedLogs bool
failureCount int // Track consecutive probe failures
lastSuccessTime time.Time // Last time probes succeeded
maxErrorThreshold int // Maximum consecutive errors before marking backend unavailable
recoveryThreshold int // Number of consecutive successes needed to mark backend available again
}
// BlockHeightTracker monitors block heights from different backends using WebSocket subscriptions
@@ -530,11 +535,23 @@ func NewSecondaryProbe(backends []Backend, client *http.Client, probeInterval ti
minResponseTime: 15 * time.Millisecond, // Start with reasonable default
methodTimings: make(map[string]time.Duration),
backendTimings: make(map[string]time.Duration),
backendAvailable: make(map[string]bool),
backendErrorCount: make(map[string]int),
backendLastSuccess: make(map[string]time.Time),
probeInterval: probeInterval,
minDelayBuffer: minDelayBuffer,
probeMethods: probeMethods,
enableDetailedLogs: enableDetailedLogs,
lastSuccessTime: time.Now(),
maxErrorThreshold: 5, // Mark backend unavailable after 5 consecutive errors
recoveryThreshold: 3, // Require 3 consecutive successes to mark backend available again
}
// Initialize all backends as available initially
for _, backend := range secondaryBackends {
sp.backendAvailable[backend.Name] = true
sp.backendErrorCount[backend.Name] = 0
sp.backendLastSuccess[backend.Name] = time.Now()
}
// Run initial probe immediately
@@ -592,7 +609,8 @@ func (sp *SecondaryProbe) runProbe() {
successfulProbes := 0
for _, backend := range sp.backends {
backendMin := time.Hour // Start with large value
backendSuccessCount := 0 // Track successes for this backend across all methods
for _, method := range sp.probeMethods {
methodMin := time.Hour // Track minimum for this method on this backend
@@ -619,11 +637,11 @@ func (sp *SecondaryProbe) runProbe() {
duration := time.Since(start)
if err == nil && resp != nil {
resp.Body.Close()
if resp.StatusCode == 200 {
// Check for both network success and HTTP success
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
methodSuccesses++
successfulProbes++
backendSuccessCount++
// Track minimum for this method on this backend
if duration < methodMin {
@@ -634,6 +652,19 @@ func (sp *SecondaryProbe) runProbe() {
log.Printf("Probe %d/10: backend=%s method=%s duration=%s status=%d (min so far: %s)",
probe+1, backend.Name, method, duration, resp.StatusCode, methodMin)
}
} else {
// HTTP error status (4xx, 5xx)
if sp.enableDetailedLogs {
log.Printf("Probe %d/10: backend=%s method=%s HTTP error status=%d",
probe+1, backend.Name, method, resp.StatusCode)
}
}
resp.Body.Close()
} else {
// Network error
if sp.enableDetailedLogs {
log.Printf("Probe %d/10: backend=%s method=%s network error: %v",
probe+1, backend.Name, method, err)
}
}
@@ -666,6 +697,25 @@ func (sp *SecondaryProbe) runProbe() {
if backendMin < time.Hour {
newBackendTimings[backend.Name] = backendMin
}
// Update backend health based on overall success rate for this probe cycle
// Consider a backend healthy if it had at least some successful responses
sp.mu.Lock()
sp.updateBackendHealth(backend.Name, backendSuccessCount > 0)
sp.mu.Unlock()
if sp.enableDetailedLogs {
sp.mu.RLock()
availability := "AVAILABLE"
if !sp.backendAvailable[backend.Name] {
availability = "UNAVAILABLE"
}
errorCount := sp.backendErrorCount[backend.Name]
sp.mu.RUnlock()
log.Printf("Backend %s probe cycle complete: %d/%d total successes, status: %s (error count: %d)",
backend.Name, backendSuccessCount, len(sp.probeMethods)*10, availability, errorCount)
}
}
// Update timings if we got successful probes
@@ -700,13 +750,16 @@ func (sp *SecondaryProbe) runProbe() {
sp.lastProbeTime = time.Now()
if sp.enableDetailedLogs {
log.Printf("Probe complete: min=%s methods=%v backends=%v",
sp.minResponseTime, sp.methodTimings, sp.backendTimings)
availableBackends := sp.getAvailableBackends()
log.Printf("Probe complete: min=%s methods=%v backends=%v available_backends=%v",
sp.minResponseTime, sp.methodTimings, sp.backendTimings, availableBackends)
}
} else {
sp.failureCount++
if sp.enableDetailedLogs {
log.Printf("Probe failed: consecutive failures=%d", sp.failureCount)
availableBackends := sp.getAvailableBackends()
log.Printf("Probe failed: consecutive failures=%d available_backends=%v",
sp.failureCount, availableBackends)
}
}
}
@@ -1038,6 +1091,29 @@ func (sc *StatsCollector) printSummary() {
fmt.Printf("Probe Buffer: %s\n", formatDuration(sc.secondaryProbe.minDelayBuffer))
fmt.Printf("Effective Delay Threshold: %s\n", formatDuration(sc.secondaryProbe.minResponseTime+sc.secondaryProbe.minDelayBuffer))
// Display backend availability status
fmt.Printf("Backend Health Status:\n")
backendNames := make([]string, 0, len(sc.secondaryProbe.backendAvailable))
for name := range sc.secondaryProbe.backendAvailable {
backendNames = append(backendNames, name)
}
sort.Strings(backendNames)
for _, name := range backendNames {
available := sc.secondaryProbe.backendAvailable[name]
errorCount := sc.secondaryProbe.backendErrorCount[name]
lastSuccess := sc.secondaryProbe.backendLastSuccess[name]
status := "AVAILABLE"
if !available {
status = "UNAVAILABLE"
}
timeSinceSuccess := time.Since(lastSuccess)
fmt.Printf(" %s: %s (errors: %d, last success: %s ago)\n",
name, status, errorCount, timeSinceSuccess.Round(time.Second))
}
if len(sc.secondaryProbe.methodTimings) > 0 {
fmt.Printf("Method-Specific Thresholds:\n")
// Sort methods for consistent output
@@ -2019,20 +2095,127 @@ func (sc *StatsCollector) isExpensiveMethodByStats(method string) bool {
return minDuration >= expensiveThreshold
}
// hasAvailableSecondaryAtChainHead checks if there are synchronized secondary backends available
func hasAvailableSecondaryAtChainHead(backends []Backend, blockHeightTracker *BlockHeightTracker) bool {
// hasAvailableSecondaryAtChainHead checks if there are synchronized and available secondary backends
func hasAvailableSecondaryAtChainHead(backends []Backend, blockHeightTracker *BlockHeightTracker, secondaryProbe *SecondaryProbe) bool {
if blockHeightTracker == nil {
return false
}
for _, backend := range backends {
if backend.Role == "secondary" && !blockHeightTracker.isSecondaryBehind(backend.Name) {
// Also check if the backend is available (not marked unavailable due to errors)
if secondaryProbe != nil && !secondaryProbe.isBackendAvailable(backend.Name) {
continue // Skip unavailable backends
}
return true
}
}
return false
}
// selectBestSecondaryForExpensiveMethod intelligently selects which secondary backend to use for expensive queries
// It prefers backends whose probe latency doesn't exceed half the minimum response time for the expensive method
func (sc *StatsCollector) selectBestSecondaryForExpensiveMethod(method string, backends []Backend,
blockHeightTracker *BlockHeightTracker, secondaryProbe *SecondaryProbe) *Backend {
if blockHeightTracker == nil || secondaryProbe == nil {
return nil
}
// Get available synchronized secondary backends
var availableSecondaries []Backend
for _, backend := range backends {
if backend.Role == "secondary" &&
!blockHeightTracker.isSecondaryBehind(backend.Name) &&
secondaryProbe.isBackendAvailable(backend.Name) {
availableSecondaries = append(availableSecondaries, backend)
}
}
if len(availableSecondaries) == 0 {
return nil
}
// If only one secondary available, use it
if len(availableSecondaries) == 1 {
return &availableSecondaries[0]
}
// Get minimum response time for this expensive method on primary
sc.mu.Lock()
var methodMinTime time.Duration
if methodStats, exists := sc.backendMethodStats["primary"]; exists {
if durations, methodExists := methodStats[method]; methodExists && len(durations) > 0 {
methodMinTime = durations[0]
for _, duration := range durations {
if duration < methodMinTime {
methodMinTime = duration
}
}
}
}
sc.mu.Unlock()
// If we don't have stats for this method, use any available secondary
if methodMinTime == 0 {
return &availableSecondaries[0]
}
// Calculate the threshold: probe latency should not exceed half the expensive method's min time
probeLatencyThreshold := methodMinTime / 2
// Evaluate secondary backends based on their probe latencies
type secondaryCandidate struct {
backend Backend
probeLatency time.Duration
isAcceptable bool
}
var candidates []secondaryCandidate
secondaryProbe.mu.RLock()
for _, backend := range availableSecondaries {
probeLatency := secondaryProbe.minResponseTime // Default to overall minimum
// Use backend-specific timing if available
if backendTiming, exists := secondaryProbe.backendTimings[backend.Name]; exists {
probeLatency = backendTiming
}
candidate := secondaryCandidate{
backend: backend,
probeLatency: probeLatency,
isAcceptable: probeLatency <= probeLatencyThreshold,
}
candidates = append(candidates, candidate)
}
secondaryProbe.mu.RUnlock()
// Sort candidates by probe latency (ascending)
sort.Slice(candidates, func(i, j int) bool {
return candidates[i].probeLatency < candidates[j].probeLatency
})
// Strategy: Use the slowest backend among those that meet the criteria
// This preserves faster backends for quicker queries
var selectedCandidate *secondaryCandidate
// First, try to find acceptable candidates (probe latency <= threshold)
for i := len(candidates) - 1; i >= 0; i-- {
if candidates[i].isAcceptable {
selectedCandidate = &candidates[i]
break // Take the slowest acceptable one
}
}
// If no acceptable candidates, fall back to the slowest available
if selectedCandidate == nil {
selectedCandidate = &candidates[len(candidates)-1]
}
return &selectedCandidate.backend
}
// flushingResponseWriter wraps http.ResponseWriter to flush after every write
type flushingResponseWriter struct {
http.ResponseWriter
@@ -2327,12 +2510,25 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
}
isExpensive = hasExpensiveMethod
// For expensive methods, prefer secondary backends if available and synchronized
// For expensive methods, intelligently select the best secondary backend
var preferSecondary bool
if enableExpensiveMethodRouting && isExpensive && !isStateful && hasAvailableSecondaryAtChainHead(backends, blockHeightTracker) {
preferSecondary = true
if enableDetailedLogs {
log.Printf("Expensive method detected (%s), preferring secondary backends", displayMethod)
var selectedSecondary *Backend
if enableExpensiveMethodRouting && isExpensive && !isStateful {
// Determine the method name for selection (use first method for batches)
methodForSelection := displayMethod
if batchInfo.IsBatch && len(batchInfo.Methods) > 0 {
methodForSelection = batchInfo.Methods[0]
}
selectedSecondary = statsCollector.selectBestSecondaryForExpensiveMethod(
methodForSelection, backends, blockHeightTracker, secondaryProbe)
if selectedSecondary != nil {
preferSecondary = true
if enableDetailedLogs {
log.Printf("Expensive method detected (%s), selected secondary backend: %s",
displayMethod, selectedSecondary.Name)
}
}
}
@@ -2360,6 +2556,41 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
}
}
// Skip secondary backends if they're marked as unavailable due to errors
if backend.Role != "primary" && secondaryProbe != nil {
if !secondaryProbe.isBackendAvailable(backend.Name) {
if enableDetailedLogs {
log.Printf("Skipping secondary backend %s - marked as unavailable due to errors", backend.Name)
}
// Record that we skipped this secondary backend due to availability
statsChan <- ResponseStats{
Backend: backend.Name,
Error: fmt.Errorf("skipped - marked as unavailable"),
Method: displayMethod,
Duration: 0, // No actual request made
}
continue
}
}
// For expensive methods, only use the selected secondary backend
if preferSecondary && backend.Role != "primary" {
if selectedSecondary == nil || backend.Name != selectedSecondary.Name {
// Skip this secondary - it's not the selected one for expensive method
if enableDetailedLogs {
log.Printf("Skipping secondary backend %s - not selected for expensive method %s",
backend.Name, displayMethod)
}
statsChan <- ResponseStats{
Backend: backend.Name,
Error: fmt.Errorf("skipped - not selected for expensive method"),
Method: displayMethod,
Duration: 0,
}
continue
}
}
// For expensive methods, skip primary backend if we prefer secondary and have good ones
if preferSecondary && backend.Role == "primary" {
// Still add primary to the pool but with a delay to let secondaries try first
@@ -2946,3 +3177,59 @@ func getEnv(key, fallback string) string {
}
return fallback
}
// isBackendAvailable checks if a backend is currently marked as available
func (sp *SecondaryProbe) isBackendAvailable(backendName string) bool {
sp.mu.RLock()
defer sp.mu.RUnlock()
available, exists := sp.backendAvailable[backendName]
return exists && available
}
// getAvailableBackends returns a list of currently available backend names
func (sp *SecondaryProbe) getAvailableBackends() []string {
sp.mu.RLock()
defer sp.mu.RUnlock()
var available []string
for name, isAvailable := range sp.backendAvailable {
if isAvailable {
available = append(available, name)
}
}
return available
}
// updateBackendHealth updates the health status of a backend based on probe results
func (sp *SecondaryProbe) updateBackendHealth(backendName string, isSuccess bool) {
// This method should be called with the mutex already held (write lock)
if isSuccess {
// Reset error count and update last success time
sp.backendErrorCount[backendName] = 0
sp.backendLastSuccess[backendName] = time.Now()
// If backend was marked unavailable, check if we should mark it available again
if !sp.backendAvailable[backendName] {
// For now, mark available immediately on first success
// Could be enhanced to require multiple consecutive successes
sp.backendAvailable[backendName] = true
if sp.enableDetailedLogs {
log.Printf("Backend %s marked as AVAILABLE (recovered from errors)", backendName)
}
}
} else {
// Increment error count
sp.backendErrorCount[backendName]++
// Check if we should mark backend as unavailable
if sp.backendAvailable[backendName] && sp.backendErrorCount[backendName] >= sp.maxErrorThreshold {
sp.backendAvailable[backendName] = false
if sp.enableDetailedLogs {
log.Printf("Backend %s marked as UNAVAILABLE (consecutive errors: %d)",
backendName, sp.backendErrorCount[backendName])
}
}
}
}