a set of assorted fixes, out of the blue

Para Dox
2025-05-29 10:35:07 +07:00
parent 97cf8fd6fa
commit c5710c33b1


@@ -75,12 +75,30 @@ type StatsCollector struct {
appStartTime time.Time // Application start time (never reset)
intervalStartTime time.Time // Current interval start time (reset each interval)
summaryInterval time.Duration
methodCUPrices map[string]int // Map of method names to CU prices
totalCU int // Total CU earned
methodCU map[string]int // Track CU earned per method
historicalCU []CUDataPoint // Historical CU data for different time windows
hasSecondaryBackends bool // Track if secondary backends are configured
skippedSecondaryRequests int // Track how many secondary requests were skipped
secondaryProbe *SecondaryProbe // Reference to secondary probe
}
// SecondaryProbe maintains latency information for secondary backends through active probing
type SecondaryProbe struct {
mu sync.RWMutex
backends []Backend
client *http.Client
minResponseTime time.Duration // Overall minimum response time
methodTimings map[string]time.Duration // Per-method minimum response times
backendTimings map[string]time.Duration // Per-backend minimum response times
lastProbeTime time.Time
probeInterval time.Duration
minDelayBuffer time.Duration // Buffer to add to minimum times
probeMethods []string
enableDetailedLogs bool
failureCount int // Track consecutive probe failures
lastSuccessTime time.Time // Last time probes succeeded
}
func NewStatsCollector(summaryInterval time.Duration, hasSecondaryBackends bool) *StatsCollector {
@@ -110,6 +128,199 @@ func NewStatsCollector(summaryInterval time.Duration, hasSecondaryBackends bool)
return sc
}
// SetSecondaryProbe sets the secondary probe reference after stats collector is created
func (sc *StatsCollector) SetSecondaryProbe(probe *SecondaryProbe) {
sc.mu.Lock()
defer sc.mu.Unlock()
sc.secondaryProbe = probe
}
// NewSecondaryProbe creates a new secondary probe instance
func NewSecondaryProbe(backends []Backend, client *http.Client, probeInterval time.Duration,
minDelayBuffer time.Duration, probeMethods []string, enableDetailedLogs bool) *SecondaryProbe {
// Filter only secondary backends
var secondaryBackends []Backend
for _, b := range backends {
if b.Role == "secondary" {
secondaryBackends = append(secondaryBackends, b)
}
}
if len(secondaryBackends) == 0 {
return nil
}
sp := &SecondaryProbe{
backends: secondaryBackends,
client: client,
minResponseTime: 15 * time.Millisecond, // Start with reasonable default
methodTimings: make(map[string]time.Duration),
backendTimings: make(map[string]time.Duration),
probeInterval: probeInterval,
minDelayBuffer: minDelayBuffer,
probeMethods: probeMethods,
enableDetailedLogs: enableDetailedLogs,
lastSuccessTime: time.Now(),
}
// Run initial probe immediately
go func() {
sp.runProbe()
// Then start periodic probing
sp.startPeriodicProbing()
}()
return sp
}
// getDelayForMethod returns the appropriate delay for a given method
func (sp *SecondaryProbe) getDelayForMethod(method string) time.Duration {
sp.mu.RLock()
defer sp.mu.RUnlock()
// If probes have been failing, use a conservative fallback
if sp.failureCount > 3 && time.Since(sp.lastSuccessTime) > 5*time.Minute {
return 20 * time.Millisecond // Conservative fallback
}
// Use method-specific timing if available
if timing, exists := sp.methodTimings[method]; exists {
return timing + sp.minDelayBuffer
}
// Fall back to general minimum
return sp.minResponseTime + sp.minDelayBuffer
}
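// Example (hypothetical values): with a probed 10ms minimum for eth_call and a
// 2ms minDelayBuffer this returns 12ms; an unprobed method falls back to
// minResponseTime + minDelayBuffer; and after more than three consecutive
// probe failures with no success for over five minutes, the fixed 20ms
// fallback is returned instead.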
// getDelayForBackendAndMethod returns the appropriate delay for a specific backend and method
func (sp *SecondaryProbe) getDelayForBackendAndMethod(backend, method string) time.Duration {
sp.mu.RLock()
defer sp.mu.RUnlock()
// Start with backend-specific timing
delay := sp.minResponseTime
if backendTiming, exists := sp.backendTimings[backend]; exists {
delay = backendTiming
}
// Use method-specific timing if it's longer
if methodTiming, exists := sp.methodTimings[method]; exists && methodTiming > delay {
delay = methodTiming
}
return delay + sp.minDelayBuffer
}
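// Worked example (hypothetical numbers): if probing recorded an 8ms minimum
// for this backend and a 12ms minimum for this method, with a 2ms
// minDelayBuffer, the result is max(8ms, 12ms) + 2ms = 14ms, i.e. the
// secondary request is held back for 14ms before being sent.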
// runProbe performs a single probe cycle to all secondary backends
func (sp *SecondaryProbe) runProbe() {
newMethodTimings := make(map[string]time.Duration)
newBackendTimings := make(map[string]time.Duration)
successfulProbes := 0
for _, backend := range sp.backends {
backendMin := time.Hour // Start with large value
for _, method := range sp.probeMethods {
reqBody := []byte(fmt.Sprintf(
`{"jsonrpc":"2.0","method":"%s","params":[],"id":"probe-%d"}`,
method, time.Now().UnixNano(),
))
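// Example payload for an eth_blockNumber probe (the id value is illustrative):
// {"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":"probe-1748489707000000000"}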
req, err := http.NewRequest("POST", backend.URL, bytes.NewReader(reqBody))
if err != nil {
continue
}
req.Header.Set("Content-Type", "application/json")
start := time.Now()
resp, err := sp.client.Do(req)
duration := time.Since(start)
if err == nil && resp != nil {
resp.Body.Close()
if resp.StatusCode == 200 {
successfulProbes++
// Update method timing (use minimum across all backends)
if currentMin, exists := newMethodTimings[method]; !exists || duration < currentMin {
newMethodTimings[method] = duration
}
// Track backend minimum
if duration < backendMin {
backendMin = duration
}
if sp.enableDetailedLogs {
log.Printf("Probe: backend=%s method=%s duration=%s status=%d",
backend.Name, method, duration, resp.StatusCode)
}
}
}
}
// Store backend minimum if we got any successful probes
if backendMin < time.Hour {
newBackendTimings[backend.Name] = backendMin
}
}
// Update timings if we got successful probes
sp.mu.Lock()
defer sp.mu.Unlock()
if successfulProbes > 0 {
sp.failureCount = 0
sp.lastSuccessTime = time.Now()
// Update method timings
for method, timing := range newMethodTimings {
sp.methodTimings[method] = timing
}
// Update backend timings
for backend, timing := range newBackendTimings {
sp.backendTimings[backend] = timing
}
// Update overall minimum
overallMin := time.Hour
for _, timing := range newBackendTimings {
if timing < overallMin {
overallMin = timing
}
}
if overallMin < time.Hour {
sp.minResponseTime = overallMin
}
sp.lastProbeTime = time.Now()
if sp.enableDetailedLogs {
log.Printf("Probe complete: min=%s methods=%v backends=%v",
sp.minResponseTime, sp.methodTimings, sp.backendTimings)
}
} else {
sp.failureCount++
if sp.enableDetailedLogs {
log.Printf("Probe failed: consecutive failures=%d", sp.failureCount)
}
}
}
// startPeriodicProbing runs probes at regular intervals
func (sp *SecondaryProbe) startPeriodicProbing() {
ticker := time.NewTicker(sp.probeInterval)
defer ticker.Stop()
for range ticker.C {
sp.runProbe()
}
}
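// Note: time.NewTicker fires only after its first full interval has elapsed,
// which is why NewSecondaryProbe runs one probe immediately before starting
// this periodic loop.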
// initCUPrices initializes the map of method names to their CU prices
func initCUPrices() map[string]int {
return map[string]int{
@@ -251,7 +462,7 @@ func (sc *StatsCollector) AddStats(stats []ResponseStats, totalDuration time.Dur
sc.requestStats = append(sc.requestStats, stat)
if stat.Error != nil {
// Don't count skipped secondary backends as errors
if stat.Error.Error() != "skipped - primary responded within p75" {
if !strings.Contains(stat.Error.Error(), "skipped - primary responded") {
sc.errorCount++
} else {
// Track that we skipped a secondary request
@@ -374,6 +585,40 @@ func (sc *StatsCollector) printSummary() {
fmt.Printf("Total HTTP Requests: %d\n", sc.totalRequests)
fmt.Printf("Total WebSocket Connections: %d\n", sc.totalWsConnections)
fmt.Printf("Error Rate: %.2f%%\n", float64(sc.errorCount)/float64(sc.totalRequests+sc.totalWsConnections)*100)
// Display secondary probe information if available
if sc.secondaryProbe != nil {
sc.secondaryProbe.mu.RLock()
fmt.Printf("\n--- Secondary Probe Status ---\n")
fmt.Printf("Minimum Secondary Latency: %s\n", formatDuration(sc.secondaryProbe.minResponseTime))
fmt.Printf("Probe Buffer: %s\n", formatDuration(sc.secondaryProbe.minDelayBuffer))
fmt.Printf("Effective Delay Threshold: %s\n", formatDuration(sc.secondaryProbe.minResponseTime+sc.secondaryProbe.minDelayBuffer))
if len(sc.secondaryProbe.methodTimings) > 0 {
fmt.Printf("Method-Specific Thresholds:\n")
// Sort methods for consistent output
var methods []string
for method := range sc.secondaryProbe.methodTimings {
methods = append(methods, method)
}
sort.Strings(methods)
for _, method := range methods {
timing := sc.secondaryProbe.methodTimings[method]
fmt.Printf(" %s: %s (+ %s buffer = %s)\n",
method,
formatDuration(timing),
formatDuration(sc.secondaryProbe.minDelayBuffer),
formatDuration(timing+sc.secondaryProbe.minDelayBuffer))
}
}
if sc.secondaryProbe.failureCount > 0 {
fmt.Printf("Probe Failures: %d consecutive\n", sc.secondaryProbe.failureCount)
}
sc.secondaryProbe.mu.RUnlock()
}
if sc.hasSecondaryBackends && sc.skippedSecondaryRequests > 0 {
fmt.Printf("Skipped Secondary Requests: %d (%.1f%% of requests)\n",
sc.skippedSecondaryRequests,
@@ -1218,12 +1463,30 @@ func main() {
summaryIntervalStr := getEnv("SUMMARY_INTERVAL", "60") // Default 60 seconds
enableDetailedLogs := getEnv("ENABLE_DETAILED_LOGS", "false") // Default to disabled
// Secondary probe configuration
enableSecondaryProbing := getEnv("ENABLE_SECONDARY_PROBING", "true") == "true"
probeIntervalStr := getEnv("PROBE_INTERVAL", "10") // Default 10 seconds
minDelayBufferStr := getEnv("MIN_DELAY_BUFFER", "2") // Default 2ms buffer
probeMethodsStr := getEnv("PROBE_METHODS", "eth_blockNumber,net_version,eth_chainId")
summaryInterval, err := strconv.Atoi(summaryIntervalStr)
if err != nil {
log.Printf("Invalid SUMMARY_INTERVAL, using default of 60 seconds")
summaryInterval = 60
}
probeInterval, err := strconv.Atoi(probeIntervalStr)
if err != nil {
log.Printf("Invalid PROBE_INTERVAL, using default of 10 seconds")
probeInterval = 10
}
minDelayBuffer, err := strconv.Atoi(minDelayBufferStr)
if err != nil {
log.Printf("Invalid MIN_DELAY_BUFFER, using default of 2ms")
minDelayBuffer = 2
}
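// Example (hypothetical values): starting the proxy with
//   ENABLE_SECONDARY_PROBING=true PROBE_INTERVAL=5 MIN_DELAY_BUFFER=3 PROBE_METHODS=eth_blockNumber,eth_chainId
// probes the secondary backends every 5 seconds and delays each secondary
// request by the observed minimum latency plus a 3ms buffer.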
// Create stats collector for periodic summaries
statsCollector := NewStatsCollector(time.Duration(summaryInterval)*time.Second, secondaryBackendsStr != "")
@@ -1249,6 +1512,9 @@ func main() {
log.Printf("Starting benchmark proxy on %s", listenAddr)
log.Printf("Primary backend: %s", primaryBackend)
log.Printf("Secondary backends: %s", secondaryBackendsStr)
if enableSecondaryProbing && secondaryBackendsStr != "" {
log.Printf("Secondary probing: enabled (interval: %ds, buffer: %dms)", probeInterval, minDelayBuffer)
}
// Set up HTTP client with reasonable timeouts
client := &http.Client{
@@ -1261,6 +1527,31 @@ func main() {
},
}
// Initialize secondary probe if enabled and we have secondary backends
var secondaryProbe *SecondaryProbe
if enableSecondaryProbing && secondaryBackendsStr != "" {
probeMethods := strings.Split(probeMethodsStr, ",")
for i := range probeMethods {
probeMethods[i] = strings.TrimSpace(probeMethods[i])
}
secondaryProbe = NewSecondaryProbe(
backends,
client,
time.Duration(probeInterval)*time.Second,
time.Duration(minDelayBuffer)*time.Millisecond,
probeMethods,
enableDetailedLogs == "true",
)
if secondaryProbe == nil {
log.Printf("Secondary probe initialization failed - no secondary backends found")
} else {
// Set the probe in stats collector for display
statsCollector.SetSecondaryProbe(secondaryProbe)
}
}
// Configure websocket upgrader with larger buffer sizes
// 20MB frame size and 50MB message size
const (
@@ -1281,14 +1572,14 @@ func main() {
handleWebSocketRequest(w, r, backends, client, &upgrader, statsCollector)
} else {
// Handle regular HTTP request
handleRequest(w, r, backends, client, enableDetailedLogs == "true", statsCollector)
handleRequest(w, r, backends, client, enableDetailedLogs == "true", statsCollector, secondaryProbe)
}
})
log.Fatal(http.ListenAndServe(listenAddr, nil))
}
func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client, enableDetailedLogs bool, statsCollector *StatsCollector) {
func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client, enableDetailedLogs bool, statsCollector *StatsCollector, secondaryProbe *SecondaryProbe) {
startTime := time.Now()
// Create a context that will cancel after 35 seconds (5s buffer over backend timeout)
@@ -1319,11 +1610,19 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
method = jsonRPCReq.Method
}
// Get current p75 delay for this specific method on primary backend
p75Delay := statsCollector.GetPrimaryP75ForMethod(method)
// Get delay threshold for secondary backends
var secondaryDelay time.Duration
if secondaryProbe != nil {
// Use probe-based delay
secondaryDelay = secondaryProbe.getDelayForMethod(method)
} else {
// Fall back to p75 approach
secondaryDelay = statsCollector.GetPrimaryP75ForMethod(method)
}
if enableDetailedLogs {
log.Printf("Method: %s, P75 delay: %s", method, p75Delay)
log.Printf("Method: %s, Secondary delay: %s (probe-based: %v)",
method, secondaryDelay, secondaryProbe != nil)
}
// Check if this is a stateful method that must go to primary only
@@ -1373,7 +1672,7 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
// If this is a secondary backend, wait for the secondary delay threshold
if b.Role != "primary" {
delayTimer := time.NewTimer(p75Delay)
delayTimer := time.NewTimer(secondaryDelay)
select {
case <-delayTimer.C:
// Timer expired, primary is slow, proceed with secondary request
@@ -1384,7 +1683,7 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
// Still record that we skipped this backend
statsChan <- ResponseStats{
Backend: b.Name,
Error: fmt.Errorf("skipped - primary responded within p75"),
Error: fmt.Errorf("skipped - primary responded quickly"),
Method: method,
Duration: time.Since(goroutineStartTime),
}