more features

This commit is contained in:
Para Dox
2025-05-28 22:30:28 +07:00
parent 1c14c8a861
commit f6161e575d

View File

@@ -59,43 +59,47 @@ type CUDataPoint struct {
// StatsCollector maintains statistics for periodic summaries // StatsCollector maintains statistics for periodic summaries
type StatsCollector struct { type StatsCollector struct {
mu sync.Mutex mu sync.Mutex
requestStats []ResponseStats requestStats []ResponseStats
methodStats map[string][]time.Duration // Track durations by method methodStats map[string][]time.Duration // Track durations by method
backendMethodStats map[string]map[string][]time.Duration // Track durations by backend and method backendMethodStats map[string]map[string][]time.Duration // Track durations by backend and method
backendWins map[string]int // Track how many times each backend responded first backendWins map[string]int // Track how many times each backend responded first
methodBackendWins map[string]map[string]int // Track wins per method per backend methodBackendWins map[string]map[string]int // Track wins per method per backend
firstResponseDurations []time.Duration // Track durations of first successful responses firstResponseDurations []time.Duration // Track durations of first successful responses (from winning backend's perspective)
methodFirstResponseDurations map[string][]time.Duration // Track first response durations by method actualFirstResponseDurations []time.Duration // Track actual user-experienced durations
totalRequests int methodFirstResponseDurations map[string][]time.Duration // Track first response durations by method (winning backend's perspective)
errorCount int methodActualFirstResponseDurations map[string][]time.Duration // Track actual user-experienced durations by method
wsConnections []WebSocketStats // Track websocket connections totalRequests int
totalWsConnections int errorCount int
appStartTime time.Time // Application start time (never reset) wsConnections []WebSocketStats // Track websocket connections
intervalStartTime time.Time // Current interval start time (reset each interval) totalWsConnections int
summaryInterval time.Duration appStartTime time.Time // Application start time (never reset)
methodCUPrices map[string]int // Map of method names to CU prices intervalStartTime time.Time // Current interval start time (reset each interval)
totalCU int // Total CU earned summaryInterval time.Duration
methodCU map[string]int // Track CU earned per method methodCUPrices map[string]int // Map of method names to CU prices
historicalCU []CUDataPoint // Historical CU data for different time windows totalCU int // Total CU earned
methodCU map[string]int // Track CU earned per method
historicalCU []CUDataPoint // Historical CU data for different time windows
} }
func NewStatsCollector(summaryInterval time.Duration) *StatsCollector { func NewStatsCollector(summaryInterval time.Duration) *StatsCollector {
now := time.Now() now := time.Now()
sc := &StatsCollector{ sc := &StatsCollector{
requestStats: make([]ResponseStats, 0, 1000), requestStats: make([]ResponseStats, 0, 1000),
methodStats: make(map[string][]time.Duration), methodStats: make(map[string][]time.Duration),
backendMethodStats: make(map[string]map[string][]time.Duration), backendMethodStats: make(map[string]map[string][]time.Duration),
backendWins: make(map[string]int), backendWins: make(map[string]int),
methodBackendWins: make(map[string]map[string]int), methodBackendWins: make(map[string]map[string]int),
firstResponseDurations: make([]time.Duration, 0, 1000), firstResponseDurations: make([]time.Duration, 0, 1000),
methodFirstResponseDurations: make(map[string][]time.Duration), actualFirstResponseDurations: make([]time.Duration, 0, 1000),
appStartTime: now, methodFirstResponseDurations: make(map[string][]time.Duration),
intervalStartTime: now, methodActualFirstResponseDurations: make(map[string][]time.Duration),
summaryInterval: summaryInterval, appStartTime: now,
methodCUPrices: initCUPrices(), // Initialize CU prices intervalStartTime: now,
methodCU: make(map[string]int), summaryInterval: summaryInterval,
historicalCU: make([]CUDataPoint, 0, 2000), // Store up to ~24 hours of 1-minute intervals methodCUPrices: initCUPrices(), // Initialize CU prices
methodCU: make(map[string]int),
historicalCU: make([]CUDataPoint, 0, 2000), // Store up to ~24 hours of 1-minute intervals
} }
// Start the periodic summary goroutine // Start the periodic summary goroutine
@@ -185,16 +189,24 @@ func (sc *StatsCollector) AddStats(stats []ResponseStats, totalDuration time.Dur
sc.mu.Lock() sc.mu.Lock()
defer sc.mu.Unlock() defer sc.mu.Unlock()
// Find the fastest successful response // Find the fastest successful response and actual first response
var fastestBackend string var fastestBackend string
var fastestDuration time.Duration = time.Hour // Initialize with a very large duration var fastestDuration time.Duration = time.Hour // Initialize with a very large duration
var actualFirstDuration time.Duration
var method string var method string
var hasActualFirst bool
for _, stat := range stats { for _, stat := range stats {
if stat.Error == nil && stat.Duration < fastestDuration { if stat.Backend == "actual-first-response" {
actualFirstDuration = stat.Duration
hasActualFirst = true
method = stat.Method
} else if stat.Error == nil && stat.Duration < fastestDuration {
fastestDuration = stat.Duration fastestDuration = stat.Duration
fastestBackend = stat.Backend fastestBackend = stat.Backend
method = stat.Method if method == "" {
method = stat.Method
}
} }
} }
@@ -208,7 +220,7 @@ func (sc *StatsCollector) AddStats(stats []ResponseStats, totalDuration time.Dur
} }
sc.methodBackendWins[method][fastestBackend]++ sc.methodBackendWins[method][fastestBackend]++
// Track first response duration // Track first response duration (from winning backend's perspective)
sc.firstResponseDurations = append(sc.firstResponseDurations, fastestDuration) sc.firstResponseDurations = append(sc.firstResponseDurations, fastestDuration)
// Track first response duration by method // Track first response duration by method
@@ -216,10 +228,24 @@ func (sc *StatsCollector) AddStats(stats []ResponseStats, totalDuration time.Dur
sc.methodFirstResponseDurations[method] = make([]time.Duration, 0, 100) sc.methodFirstResponseDurations[method] = make([]time.Duration, 0, 100)
} }
sc.methodFirstResponseDurations[method] = append(sc.methodFirstResponseDurations[method], fastestDuration) sc.methodFirstResponseDurations[method] = append(sc.methodFirstResponseDurations[method], fastestDuration)
// Track actual first response duration if available
if hasActualFirst {
sc.actualFirstResponseDurations = append(sc.actualFirstResponseDurations, actualFirstDuration)
if _, exists := sc.methodActualFirstResponseDurations[method]; !exists {
sc.methodActualFirstResponseDurations[method] = make([]time.Duration, 0, 100)
}
sc.methodActualFirstResponseDurations[method] = append(sc.methodActualFirstResponseDurations[method], actualFirstDuration)
}
} }
// Add stats to the collection // Add stats to the collection (skip actual-first-response as it's synthetic)
for _, stat := range stats { for _, stat := range stats {
if stat.Backend == "actual-first-response" {
continue // Don't add synthetic entries to regular stats
}
sc.requestStats = append(sc.requestStats, stat) sc.requestStats = append(sc.requestStats, stat)
if stat.Error != nil { if stat.Error != nil {
sc.errorCount++ sc.errorCount++
@@ -457,7 +483,39 @@ func (sc *StatsCollector) printSummary() {
"Backend", "Count", "Min", "Avg", "Max", "p50", "p90", "p99") "Backend", "Count", "Min", "Avg", "Max", "p50", "p90", "p99")
fmt.Printf("%s\n", strings.Repeat("-", 100)) fmt.Printf("%s\n", strings.Repeat("-", 100))
// First, show the "First Response" merged statistics // First, show the actual user latency if available
if len(sc.actualFirstResponseDurations) > 0 {
actualDurations := make([]time.Duration, len(sc.actualFirstResponseDurations))
copy(actualDurations, sc.actualFirstResponseDurations)
sort.Slice(actualDurations, func(i, j int) bool {
return actualDurations[i] < actualDurations[j]
})
var sum time.Duration
for _, d := range actualDurations {
sum += d
}
avg := sum / time.Duration(len(actualDurations))
min := actualDurations[0]
max := actualDurations[len(actualDurations)-1]
p50idx := len(actualDurations) * 50 / 100
p90idx := len(actualDurations) * 90 / 100
p99idx := minInt(len(actualDurations)-1, len(actualDurations)*99/100)
p50 := actualDurations[p50idx]
p90 := actualDurations[p90idx]
p99 := actualDurations[p99idx]
fmt.Printf("%-20s %10d %10s %10s %10s %10s %10s %10s\n",
"User Latency", len(actualDurations),
formatDuration(min), formatDuration(avg), formatDuration(max),
formatDuration(p50), formatDuration(p90), formatDuration(p99))
}
// Then show the winner's time (what backend actually took)
if len(sc.firstResponseDurations) > 0 { if len(sc.firstResponseDurations) > 0 {
firstRespDurations := make([]time.Duration, len(sc.firstResponseDurations)) firstRespDurations := make([]time.Duration, len(sc.firstResponseDurations))
copy(firstRespDurations, sc.firstResponseDurations) copy(firstRespDurations, sc.firstResponseDurations)
@@ -484,7 +542,7 @@ func (sc *StatsCollector) printSummary() {
p99 := firstRespDurations[p99idx] p99 := firstRespDurations[p99idx]
fmt.Printf("%-20s %10d %10s %10s %10s %10s %10s %10s\n", fmt.Printf("%-20s %10d %10s %10s %10s %10s %10s %10s\n",
"First Response", len(firstRespDurations), "Winner's Time", len(firstRespDurations),
formatDuration(min), formatDuration(avg), formatDuration(max), formatDuration(min), formatDuration(avg), formatDuration(max),
formatDuration(p50), formatDuration(p90), formatDuration(p99)) formatDuration(p50), formatDuration(p90), formatDuration(p99))
fmt.Printf("%s\n", strings.Repeat("-", 100)) fmt.Printf("%s\n", strings.Repeat("-", 100))
@@ -645,6 +703,11 @@ func (sc *StatsCollector) printSummary() {
fmt.Printf("\n Method: %s (Total requests: %d)\n", method, methodList[i].count) fmt.Printf("\n Method: %s (Total requests: %d)\n", method, methodList[i].count)
// Check if this is a stateful method
if isStatefulMethod(method) {
fmt.Printf(" Note: Stateful method - only sent to primary backend\n")
}
// Show wins for this method if available // Show wins for this method if available
if methodWins, exists := sc.methodBackendWins[method]; exists { if methodWins, exists := sc.methodBackendWins[method]; exists {
fmt.Printf(" First Response Wins: ") fmt.Printf(" First Response Wins: ")
@@ -716,7 +779,46 @@ func (sc *StatsCollector) printSummary() {
formatDuration(p50), formatDuration(p90), formatDuration(p99)) formatDuration(p50), formatDuration(p90), formatDuration(p99))
} }
// Show First Response statistics for this method // Show User Latency for this method if available
if methodActualDurations, exists := sc.methodActualFirstResponseDurations[method]; exists && len(methodActualDurations) > 0 {
// Make a copy and sort
durations := make([]time.Duration, len(methodActualDurations))
copy(durations, methodActualDurations)
sort.Slice(durations, func(i, j int) bool {
return durations[i] < durations[j]
})
var sum time.Duration
for _, d := range durations {
sum += d
}
avg := sum / time.Duration(len(durations))
min := durations[0]
max := durations[len(durations)-1]
p50 := min
p90 := min
p99 := min
if len(durations) >= 2 {
p50idx := len(durations) * 50 / 100
p90idx := len(durations) * 90 / 100
p99idx := minInt(len(durations)-1, len(durations)*99/100)
p50 = durations[p50idx]
p90 = durations[p90idx]
p99 = durations[p99idx]
}
fmt.Printf(" %-20s %10d %10s %10s %10s %10s %10s %10s\n",
"User Latency", len(durations),
formatDuration(min), formatDuration(avg), formatDuration(max),
formatDuration(p50), formatDuration(p90), formatDuration(p99))
}
// Show Winner's Time statistics for this method
if methodFirstDurations, exists := sc.methodFirstResponseDurations[method]; exists && len(methodFirstDurations) > 0 { if methodFirstDurations, exists := sc.methodFirstResponseDurations[method]; exists && len(methodFirstDurations) > 0 {
// Make a copy and sort // Make a copy and sort
durations := make([]time.Duration, len(methodFirstDurations)) durations := make([]time.Duration, len(methodFirstDurations))
@@ -750,7 +852,7 @@ func (sc *StatsCollector) printSummary() {
} }
fmt.Printf(" %-20s %10d %10s %10s %10s %10s %10s %10s\n", fmt.Printf(" %-20s %10d %10s %10s %10s %10s %10s %10s\n",
"First Response", len(durations), "Winner's Time", len(durations),
formatDuration(min), formatDuration(avg), formatDuration(max), formatDuration(min), formatDuration(avg), formatDuration(max),
formatDuration(p50), formatDuration(p90), formatDuration(p99)) formatDuration(p50), formatDuration(p90), formatDuration(p99))
fmt.Printf(" %s\n", strings.Repeat("-", 98)) fmt.Printf(" %s\n", strings.Repeat("-", 98))
@@ -809,9 +911,17 @@ func (sc *StatsCollector) printSummary() {
} else { } else {
sc.firstResponseDurations = sc.firstResponseDurations[:0] sc.firstResponseDurations = sc.firstResponseDurations[:0]
} }
if len(sc.actualFirstResponseDurations) > 1000 {
sc.actualFirstResponseDurations = sc.actualFirstResponseDurations[len(sc.actualFirstResponseDurations)-1000:]
} else {
sc.actualFirstResponseDurations = sc.actualFirstResponseDurations[:0]
}
for method := range sc.methodFirstResponseDurations { for method := range sc.methodFirstResponseDurations {
sc.methodFirstResponseDurations[method] = sc.methodFirstResponseDurations[method][:0] sc.methodFirstResponseDurations[method] = sc.methodFirstResponseDurations[method][:0]
} }
for method := range sc.methodActualFirstResponseDurations {
sc.methodActualFirstResponseDurations[method] = sc.methodActualFirstResponseDurations[method][:0]
}
// Reset CU counters for the next interval // Reset CU counters for the next interval
sc.totalCU = 0 sc.totalCU = 0
@@ -934,6 +1044,29 @@ func (sc *StatsCollector) GetPrimaryP50() time.Duration {
return primaryDurations[p50idx] return primaryDurations[p50idx]
} }
// isStatefulMethod returns true if the method requires session state and must always go to primary
func isStatefulMethod(method string) bool {
statefulMethods := map[string]bool{
// Filter methods - these create server-side state
"eth_newFilter": true,
"eth_newBlockFilter": true,
"eth_newPendingTransactionFilter": true,
"eth_getFilterChanges": true,
"eth_getFilterLogs": true,
"eth_uninstallFilter": true,
// Subscription methods (WebSocket) - maintain persistent connections
"eth_subscribe": true,
"eth_unsubscribe": true,
"eth_subscription": true, // Notification method
// Some debug/trace methods might maintain state depending on implementation
// But these are typically stateless, so not included here
}
return statefulMethods[method]
}
func main() { func main() {
// Get configuration from environment variables // Get configuration from environment variables
listenAddr := getEnv("LISTEN_ADDR", ":8080") listenAddr := getEnv("LISTEN_ADDR", ":8080")
@@ -1034,6 +1167,9 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
// Get current p50 delay for primary backend // Get current p50 delay for primary backend
p50Delay := statsCollector.GetPrimaryP50() p50Delay := statsCollector.GetPrimaryP50()
// Check if this is a stateful method that must go to primary only
isStateful := isStatefulMethod(method)
// Process backends with adaptive delay strategy // Process backends with adaptive delay strategy
var wg sync.WaitGroup var wg sync.WaitGroup
statsChan := make(chan ResponseStats, len(backends)) statsChan := make(chan ResponseStats, len(backends))
@@ -1052,6 +1188,11 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
var responseHandled atomic.Bool var responseHandled atomic.Bool
for _, backend := range backends { for _, backend := range backends {
// Skip secondary backends for stateful methods
if isStateful && backend.Role != "primary" {
continue
}
wg.Add(1) wg.Add(1)
go func(b Backend) { go func(b Backend) {
defer wg.Done() defer wg.Done()
@@ -1204,6 +1345,26 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
if enableDetailedLogs { if enableDetailedLogs {
logResponseStats(totalDuration, stats) logResponseStats(totalDuration, stats)
} }
// Add the actual user-experienced duration for the winning response
if response.err == nil && response.backend != "" {
// Find the stat for the winning backend and update it with the actual user-experienced duration
for i := range stats {
if stats[i].Backend == response.backend && stats[i].Error == nil {
// Create a special stat entry for the actual first response time
actualFirstResponseStat := ResponseStats{
Backend: "actual-first-response",
StatusCode: stats[i].StatusCode,
Duration: time.Since(startTime), // This is what the user actually experienced
Error: nil,
Method: stats[i].Method,
}
stats = append(stats, actualFirstResponseStat)
break
}
}
}
return stats return stats
} }