more features
This commit is contained in:
@@ -250,7 +250,7 @@ func (sc *StatsCollector) AddStats(stats []ResponseStats, totalDuration time.Dur
|
|||||||
sc.requestStats = append(sc.requestStats, stat)
|
sc.requestStats = append(sc.requestStats, stat)
|
||||||
if stat.Error != nil {
|
if stat.Error != nil {
|
||||||
// Don't count skipped secondary backends as errors
|
// Don't count skipped secondary backends as errors
|
||||||
if stat.Error.Error() != "skipped - primary responded within p50" {
|
if stat.Error.Error() != "skipped - primary responded within p75" {
|
||||||
sc.errorCount++
|
sc.errorCount++
|
||||||
} else {
|
} else {
|
||||||
// Track that we skipped a secondary request
|
// Track that we skipped a secondary request
|
||||||
@@ -1077,6 +1077,55 @@ func (sc *StatsCollector) GetPrimaryP50() time.Duration {
|
|||||||
return primaryDurations[p50idx]
|
return primaryDurations[p50idx]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetPrimaryP75ForMethod calculates the current p75 latency for a specific method on the primary backend
|
||||||
|
func (sc *StatsCollector) GetPrimaryP75ForMethod(method string) time.Duration {
|
||||||
|
sc.mu.Lock()
|
||||||
|
defer sc.mu.Unlock()
|
||||||
|
|
||||||
|
// Get method-specific durations for primary backend
|
||||||
|
if durations, exists := sc.methodStats[method]; exists && len(durations) >= 5 {
|
||||||
|
// Make a copy to avoid modifying the original
|
||||||
|
durationsCopy := make([]time.Duration, len(durations))
|
||||||
|
copy(durationsCopy, durations)
|
||||||
|
|
||||||
|
// Sort and find p75
|
||||||
|
sort.Slice(durationsCopy, func(i, j int) bool {
|
||||||
|
return durationsCopy[i] < durationsCopy[j]
|
||||||
|
})
|
||||||
|
|
||||||
|
p75idx := len(durationsCopy) * 75 / 100
|
||||||
|
if p75idx >= len(durationsCopy) {
|
||||||
|
p75idx = len(durationsCopy) - 1
|
||||||
|
}
|
||||||
|
return durationsCopy[p75idx]
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we don't have enough method-specific data, calculate global p75 here
|
||||||
|
// (instead of calling GetPrimaryP50 which would cause nested mutex lock)
|
||||||
|
var primaryDurations []time.Duration
|
||||||
|
for _, stat := range sc.requestStats {
|
||||||
|
if stat.Backend == "primary" && stat.Error == nil {
|
||||||
|
primaryDurations = append(primaryDurations, stat.Duration)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we don't have enough data, return a sensible default
|
||||||
|
if len(primaryDurations) < 10 {
|
||||||
|
return 15 * time.Millisecond // Default to 15ms for p75
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sort and find p75
|
||||||
|
sort.Slice(primaryDurations, func(i, j int) bool {
|
||||||
|
return primaryDurations[i] < primaryDurations[j]
|
||||||
|
})
|
||||||
|
|
||||||
|
p75idx := len(primaryDurations) * 75 / 100
|
||||||
|
if p75idx >= len(primaryDurations) {
|
||||||
|
p75idx = len(primaryDurations) - 1
|
||||||
|
}
|
||||||
|
return primaryDurations[p75idx]
|
||||||
|
}
|
||||||
|
|
||||||
// GetPrimaryP50ForMethod calculates the current p50 latency for a specific method on the primary backend
|
// GetPrimaryP50ForMethod calculates the current p50 latency for a specific method on the primary backend
|
||||||
func (sc *StatsCollector) GetPrimaryP50ForMethod(method string) time.Duration {
|
func (sc *StatsCollector) GetPrimaryP50ForMethod(method string) time.Duration {
|
||||||
sc.mu.Lock()
|
sc.mu.Lock()
|
||||||
@@ -1240,11 +1289,11 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
|
|||||||
method = jsonRPCReq.Method
|
method = jsonRPCReq.Method
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get current p50 delay for this specific method on primary backend
|
// Get current p75 delay for this specific method on primary backend
|
||||||
p50Delay := statsCollector.GetPrimaryP50ForMethod(method)
|
p75Delay := statsCollector.GetPrimaryP75ForMethod(method)
|
||||||
|
|
||||||
if enableDetailedLogs {
|
if enableDetailedLogs {
|
||||||
log.Printf("Method: %s, P50 delay: %s", method, p50Delay)
|
log.Printf("Method: %s, P75 delay: %s", method, p75Delay)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if this is a stateful method that must go to primary only
|
// Check if this is a stateful method that must go to primary only
|
||||||
@@ -1292,9 +1341,9 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
|
|||||||
firstBackendStartTime.Store(&t)
|
firstBackendStartTime.Store(&t)
|
||||||
}
|
}
|
||||||
|
|
||||||
// If this is a secondary backend, wait for p50 delay
|
// If this is a secondary backend, wait for p75 delay
|
||||||
if b.Role != "primary" {
|
if b.Role != "primary" {
|
||||||
delayTimer := time.NewTimer(p50Delay)
|
delayTimer := time.NewTimer(p75Delay)
|
||||||
select {
|
select {
|
||||||
case <-delayTimer.C:
|
case <-delayTimer.C:
|
||||||
// Timer expired, primary is slow, proceed with secondary request
|
// Timer expired, primary is slow, proceed with secondary request
|
||||||
@@ -1305,7 +1354,7 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
|
|||||||
// Still record that we skipped this backend
|
// Still record that we skipped this backend
|
||||||
statsChan <- ResponseStats{
|
statsChan <- ResponseStats{
|
||||||
Backend: b.Name,
|
Backend: b.Name,
|
||||||
Error: fmt.Errorf("skipped - primary responded within p50"),
|
Error: fmt.Errorf("skipped - primary responded within p75"),
|
||||||
Method: method,
|
Method: method,
|
||||||
Duration: time.Since(goroutineStartTime),
|
Duration: time.Since(goroutineStartTime),
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user