// hi package main import ( "bytes" "context" "encoding/json" "fmt" "io" "log" "net/http" "os" "sort" "strconv" "strings" "sync" "sync/atomic" "time" "github.com/gorilla/websocket" ) // Simple structure to extract just the method from JSON-RPC requests type JSONRPCRequest struct { Method string `json:"method"` } // BatchInfo contains information about a batch request type BatchInfo struct { IsBatch bool Methods []string RequestCount int HasStateful bool BlockTags []string // Added to track block tags in batch RequiresPrimary bool // Added to indicate if batch requires primary due to block tags } // parseBatchInfo analyzes the request body to extract method information func parseBatchInfo(body []byte) (*BatchInfo, error) { // Check for empty body if len(body) == 0 { return nil, fmt.Errorf("empty request body") } // Try parsing as array first (batch request) var batchReqs []JSONRPCRequest if err := json.Unmarshal(body, &batchReqs); err == nil { // It's a batch request info := &BatchInfo{ IsBatch: true, RequestCount: len(batchReqs), Methods: make([]string, 0, len(batchReqs)), BlockTags: make([]string, 0), } // Extract methods and check for stateful ones methodSet := make(map[string]bool) // Track unique methods for _, req := range batchReqs { if req.Method != "" { info.Methods = append(info.Methods, req.Method) methodSet[req.Method] = true if isStatefulMethod(req.Method) || requiresPrimaryOnlyMethod(req.Method) { info.HasStateful = true } } } // Extract block tags from the batch blockTags, err := parseBlockTagsFromBatch(body) if err == nil { info.BlockTags = blockTags // Check if any block tag requires primary for _, tag := range blockTags { if requiresPrimaryBackend(tag) { info.RequiresPrimary = true break } } } return info, nil } // Try parsing as single request var singleReq JSONRPCRequest if err := json.Unmarshal(body, &singleReq); err == nil { info := &BatchInfo{ IsBatch: false, Methods: []string{singleReq.Method}, RequestCount: 1, HasStateful: isStatefulMethod(singleReq.Method) || requiresPrimaryOnlyMethod(singleReq.Method), BlockTags: make([]string, 0), } // Extract block tag from single request reqInfo, err := parseRequestInfo(body) if err == nil && reqInfo.BlockTag != "" { info.BlockTags = []string{reqInfo.BlockTag} info.RequiresPrimary = requiresPrimaryBackend(reqInfo.BlockTag) } return info, nil } // Neither batch nor single request return nil, fmt.Errorf("invalid JSON-RPC request format") } // calculateBatchDelay determines the appropriate delay for a batch request for a specific backend func calculateBatchDelay(methods []string, backendName string, probe *SecondaryProbe, stats *StatsCollector) time.Duration { var maxDelay time.Duration for _, method := range methods { var delay time.Duration if probe != nil { delay = probe.getDelayForBackendAndMethod(backendName, method) } else { delay = stats.GetPrimaryP75ForMethod(method) } if delay > maxDelay { maxDelay = delay } } // If no methods or all unknown, use a default if maxDelay == 0 { if probe != nil { return probe.minResponseTime + probe.minDelayBuffer } return 15 * time.Millisecond // Default fallback } return maxDelay } // formatMethodList creates a readable string from method list for logging func formatMethodList(methods []string) string { if len(methods) == 0 { return "[]" } if len(methods) <= 3 { return fmt.Sprintf("%v", methods) } // Show first 3 methods + count of remaining return fmt.Sprintf("[%s, %s, %s, ... +%d more]", methods[0], methods[1], methods[2], len(methods)-3) } type Backend struct { URL string Name string Role string } type ResponseStats struct { Backend string StatusCode int Duration time.Duration Error error Method string // Added method field } // WebSocketStats tracks information about websocket connections type WebSocketStats struct { Backend string Error error ConnectTime time.Duration IsActive bool MessagesSent int MessagesReceived int } // CUDataPoint represents a historical CU data point with timestamp type CUDataPoint struct { Timestamp time.Time // End time of the interval CU int } // StatsCollector maintains statistics for periodic summaries type StatsCollector struct { mu sync.Mutex requestStats []ResponseStats methodStats map[string][]time.Duration // Track durations by method backendMethodStats map[string]map[string][]time.Duration // Track durations by backend and method backendWins map[string]int // Track how many times each backend responded first methodBackendWins map[string]map[string]int // Track wins per method per backend firstResponseDurations []time.Duration // Track durations of first successful responses (from winning backend's perspective) actualFirstResponseDurations []time.Duration // Track actual user-experienced durations methodFirstResponseDurations map[string][]time.Duration // Track first response durations by method (winning backend's perspective) methodActualFirstResponseDurations map[string][]time.Duration // Track actual user-experienced durations by method totalRequests int errorCount int wsConnections []WebSocketStats // Track websocket connections totalWsConnections int appStartTime time.Time // Application start time (never reset) intervalStartTime time.Time // Current interval start time (reset each interval) summaryInterval time.Duration methodCUPrices map[string]int // Map of method names to CU prices totalCU int // Total CU earned methodCU map[string]int // Track CU earned per method historicalCU []CUDataPoint // Historical CU data for different time windows hasSecondaryBackends bool // Track if secondary backends are configured skippedSecondaryRequests int // Track how many secondary requests were skipped secondaryProbe *SecondaryProbe // Reference to secondary probe chainHeadMonitor *ChainHeadMonitor // Reference to chain head monitor ethCallStats *EthCallStats // Track eth_call specific statistics } // SecondaryProbe maintains latency information for secondary backends through active probing type SecondaryProbe struct { mu sync.RWMutex backends []Backend client *http.Client minResponseTime time.Duration // Overall minimum response time methodTimings map[string]time.Duration // Per-method minimum response times backendTimings map[string]time.Duration // Per-backend minimum response times lastProbeTime time.Time probeInterval time.Duration minDelayBuffer time.Duration // Buffer to add to minimum times probeMethods []string enableDetailedLogs bool failureCount int // Track consecutive probe failures lastSuccessTime time.Time // Last time probes succeeded } // ChainHeadMonitor monitors chain heads of all backends via WebSocket subscriptions type ChainHeadMonitor struct { mu sync.RWMutex backends []Backend chainHeads map[string]*ChainHead // backend name -> chain head info primaryChainID string // Chain ID of primary backend enabledBackends map[string]bool // Track which backends are enabled blockHashCache map[string]uint64 // block hash -> block number cache (last 128 blocks from primary) blockHashOrder []string // ordered list of block hashes (oldest first) wsDialer *websocket.Dialer stopChan chan struct{} enableDetailedLogs bool } // ChainHead tracks the current head of a backend type ChainHead struct { BlockNumber uint64 // Current block number BlockHash string // Current block hash ChainID string // Chain ID LastUpdate time.Time // Last time we received an update IsHealthy bool // Whether this backend is healthy Error string // Last error if any } // MethodRouting contains configuration for method routing decisions type MethodRouting struct { SecondaryWhitelist map[string]bool // Methods allowed on secondary backends PreferSecondary map[string]bool // Methods that should prefer secondary backends } // RequestInfo contains parsed information about a JSON-RPC request type RequestInfo struct { Method string BlockTag string HasParams bool } // EthCallFeatures tracks specific features used in eth_call requests type EthCallFeatures struct { HasStateOverrides bool // Whether the call includes state overrides BlockTagType string // Type of block tag: latest, pending, safe, finalized, number, hash, earliest HasAccessList bool // Whether the call includes an access list GasLimit uint64 // Gas limit if specified DataSize int // Size of the call data IsContractCall bool // Whether 'to' address is specified HasValue bool // Whether the call includes value transfer } // EthCallStats tracks statistics for eth_call requests by feature category type EthCallStats struct { TotalCount int SecondaryWins int PrimaryOnlyCount int // Requests that only went to primary ErrorCount int ByBlockTagType map[string]*EthCallCategoryStats WithStateOverrides *EthCallCategoryStats WithoutStateOverrides *EthCallCategoryStats WithAccessList *EthCallCategoryStats WithoutAccessList *EthCallCategoryStats ByDataSizeRange map[string]*EthCallCategoryStats // Ranges: small(<1KB), medium(1-10KB), large(>10KB) } // EthCallCategoryStats tracks stats for a specific category of eth_call type EthCallCategoryStats struct { Count int SecondaryWins int PrimaryOnlyCount int ErrorCount int AverageDuration time.Duration P50Duration time.Duration P90Duration time.Duration Durations []time.Duration // For calculating percentiles } // Full JSON-RPC request structure for parsing parameters type JSONRPCFullRequest struct { Method string `json:"method"` Params json.RawMessage `json:"params"` ID interface{} `json:"id"` } // parseRequestInfo extracts detailed information from a JSON-RPC request func parseRequestInfo(body []byte) (*RequestInfo, error) { var req JSONRPCFullRequest if err := json.Unmarshal(body, &req); err != nil { return nil, err } info := &RequestInfo{ Method: req.Method, HasParams: len(req.Params) > 0, } // Special handling for eth_getLogs if req.Method == "eth_getLogs" && info.HasParams { blockTags, err := parseEthLogsFilter(req.Params) if err == nil && len(blockTags) > 0 { // For eth_getLogs, we'll return the first block tag that requires primary routing // or "latest" if any of them is "latest" for _, tag := range blockTags { if requiresPrimaryBackend(tag) || tag == "latest" { info.BlockTag = tag break } } // If no special tags found but we have tags, use the first one if info.BlockTag == "" && len(blockTags) > 0 { info.BlockTag = blockTags[0] } } return info, nil } // Special handling for trace_filter if req.Method == "trace_filter" && info.HasParams { blockTags, err := parseTraceFilter(req.Params) if err == nil && len(blockTags) > 0 { // For trace_filter, we'll return the first block tag that requires primary routing // or "latest" if any of them is "latest" for _, tag := range blockTags { if requiresPrimaryBackend(tag) || tag == "latest" { info.BlockTag = tag break } } // If no special tags found but we have tags, use the first one if info.BlockTag == "" && len(blockTags) > 0 { info.BlockTag = blockTags[0] } } return info, nil } // Methods that commonly use block tags methodsWithBlockTags := map[string]int{ "eth_getBalance": -1, // last param "eth_getCode": -1, // last param "eth_getTransactionCount": -1, // last param "eth_getStorageAt": -1, // last param "eth_call": -1, // last param "eth_estimateGas": -1, // last param "eth_getProof": -1, // last param "eth_getBlockByNumber": 0, // first param "eth_getBlockTransactionCountByNumber": 0, // first param "eth_getTransactionByBlockNumberAndIndex": 0, // first param "eth_getUncleByBlockNumberAndIndex": 0, // first param "eth_getUncleCountByBlockNumber": 0, // first param // Trace methods that use block tags "trace_block": 0, // first param (block number/tag) "trace_replayBlockTransactions": 0, // first param (block number/tag) "trace_call": -1, // last param (block tag) // Debug methods that use block tags "debug_traceBlockByNumber": 0, // first param (block number/tag) "debug_traceCall": 1, // SPECIAL: second param (call object, block tag, trace config) // Note: eth_getLogs uses a filter object with fromBlock/toBlock fields, // which is handled specially above // Note: trace_filter uses a filter object similar to eth_getLogs, // which needs special handling } // Methods that use block hashes as parameters methodsWithBlockHashes := map[string]int{ "eth_getBlockByHash": 0, // first param "eth_getBlockTransactionCountByHash": 0, // first param "eth_getTransactionByBlockHashAndIndex": 0, // first param "eth_getUncleByBlockHashAndIndex": 0, // first param "eth_getUncleCountByBlockHash": 0, // first param "debug_traceBlockByHash": 0, // first param } // Check for block hash methods first paramPos, hasBlockHash := methodsWithBlockHashes[req.Method] if hasBlockHash && info.HasParams { // Parse params as array var params []json.RawMessage if err := json.Unmarshal(req.Params, ¶ms); err == nil && len(params) > paramPos { // Try to parse as string (block hash) var blockHash string if err := json.Unmarshal(params[paramPos], &blockHash); err == nil { info.BlockTag = blockHash return info, nil } } } paramPos, hasBlockTag := methodsWithBlockTags[req.Method] if !hasBlockTag || !info.HasParams { return info, nil } // Parse params as array var params []json.RawMessage if err := json.Unmarshal(req.Params, ¶ms); err != nil { // Not an array, might be object params return info, nil } if len(params) == 0 { return info, nil } // Determine which parameter to check var blockTagParam json.RawMessage if paramPos == -1 { // Last parameter blockTagParam = params[len(params)-1] } else if paramPos < len(params) { // Specific position blockTagParam = params[paramPos] } else { return info, nil } // Special handling for debug_traceCall where position 1 might be omitted // If we're checking position 1 but only have 2 params, the middle param might be omitted if req.Method == "debug_traceCall" && paramPos == 1 && len(params) == 2 { // With only 2 params, it's likely (call_object, trace_config) without block tag // The block tag would default to "latest" on the backend info.BlockTag = "latest" return info, nil } // Try to parse as string (block tag) var blockTag string if err := json.Unmarshal(blockTagParam, &blockTag); err == nil { info.BlockTag = blockTag } return info, nil } // parseBlockTagsFromBatch extracts block tags from all requests in a batch func parseBlockTagsFromBatch(body []byte) ([]string, error) { var batchReqs []JSONRPCFullRequest if err := json.Unmarshal(body, &batchReqs); err != nil { return nil, err } blockTags := make([]string, 0) for _, req := range batchReqs { // Special handling for eth_getLogs if req.Method == "eth_getLogs" && len(req.Params) > 0 { logsTags, err := parseEthLogsFilter(req.Params) if err == nil { blockTags = append(blockTags, logsTags...) } continue } // Special handling for trace_filter if req.Method == "trace_filter" && len(req.Params) > 0 { traceTags, err := parseTraceFilter(req.Params) if err == nil { blockTags = append(blockTags, traceTags...) } continue } // Regular handling for other methods reqBytes, err := json.Marshal(req) if err != nil { continue } info, err := parseRequestInfo(reqBytes) if err != nil { continue } if info.BlockTag != "" { blockTags = append(blockTags, info.BlockTag) } } return blockTags, nil } // parseEthLogsFilter extracts block tags from eth_getLogs filter parameter func parseEthLogsFilter(params json.RawMessage) ([]string, error) { // eth_getLogs takes a single filter object parameter var paramArray []json.RawMessage if err := json.Unmarshal(params, ¶mArray); err != nil { return nil, err } if len(paramArray) == 0 { return nil, nil } // Parse the filter object var filter struct { FromBlock json.RawMessage `json:"fromBlock"` ToBlock json.RawMessage `json:"toBlock"` } if err := json.Unmarshal(paramArray[0], &filter); err != nil { return nil, err } blockTags := make([]string, 0, 2) // Extract fromBlock if present if len(filter.FromBlock) > 0 { var fromBlock string if err := json.Unmarshal(filter.FromBlock, &fromBlock); err == nil && fromBlock != "" { blockTags = append(blockTags, fromBlock) } } // Extract toBlock if present if len(filter.ToBlock) > 0 { var toBlock string if err := json.Unmarshal(filter.ToBlock, &toBlock); err == nil && toBlock != "" { blockTags = append(blockTags, toBlock) } } return blockTags, nil } // parseTraceFilter extracts block tags from trace_filter filter parameter func parseTraceFilter(params json.RawMessage) ([]string, error) { // trace_filter takes a single filter object parameter var paramArray []json.RawMessage if err := json.Unmarshal(params, ¶mArray); err != nil { return nil, err } if len(paramArray) == 0 { return nil, nil } // Parse the filter object var filter struct { FromBlock json.RawMessage `json:"fromBlock"` ToBlock json.RawMessage `json:"toBlock"` } if err := json.Unmarshal(paramArray[0], &filter); err != nil { return nil, err } blockTags := make([]string, 0, 2) // Extract fromBlock if present if len(filter.FromBlock) > 0 { var fromBlock string if err := json.Unmarshal(filter.FromBlock, &fromBlock); err == nil && fromBlock != "" { blockTags = append(blockTags, fromBlock) } } // Extract toBlock if present if len(filter.ToBlock) > 0 { var toBlock string if err := json.Unmarshal(filter.ToBlock, &toBlock); err == nil && toBlock != "" { blockTags = append(blockTags, toBlock) } } return blockTags, nil } // parseEthCallFeatures extracts feature information from an eth_call request func parseEthCallFeatures(params json.RawMessage, blockTag string) (*EthCallFeatures, error) { features := &EthCallFeatures{ BlockTagType: classifyBlockTag(blockTag), } // eth_call params: [call_object, block_tag, state_overrides] var paramArray []json.RawMessage if err := json.Unmarshal(params, ¶mArray); err != nil { return nil, err } if len(paramArray) == 0 { return nil, fmt.Errorf("eth_call requires at least one parameter") } // Parse the call object (first parameter) var callObject struct { From json.RawMessage `json:"from"` To json.RawMessage `json:"to"` Gas json.RawMessage `json:"gas"` GasPrice json.RawMessage `json:"gasPrice"` Value json.RawMessage `json:"value"` Data json.RawMessage `json:"data"` AccessList json.RawMessage `json:"accessList"` } if err := json.Unmarshal(paramArray[0], &callObject); err != nil { return nil, err } // Check if contract call (has 'to' address) if len(callObject.To) > 0 { var to string if err := json.Unmarshal(callObject.To, &to); err == nil && to != "" && to != "0x0" { features.IsContractCall = true } } // Check if has value if len(callObject.Value) > 0 { var value string if err := json.Unmarshal(callObject.Value, &value); err == nil && value != "" && value != "0x0" { features.HasValue = true } } // Check data size if len(callObject.Data) > 0 { var data string if err := json.Unmarshal(callObject.Data, &data); err == nil { // Remove 0x prefix and calculate byte size if strings.HasPrefix(data, "0x") { features.DataSize = (len(data) - 2) / 2 // Each byte is 2 hex chars } } } // Check gas limit if len(callObject.Gas) > 0 { var gasHex string if err := json.Unmarshal(callObject.Gas, &gasHex); err == nil && gasHex != "" { if strings.HasPrefix(gasHex, "0x") { gas, err := strconv.ParseUint(gasHex[2:], 16, 64) if err == nil { features.GasLimit = gas } } } } // Check for access list if len(callObject.AccessList) > 0 && string(callObject.AccessList) != "null" { features.HasAccessList = true } // Check for state overrides (third parameter) if len(paramArray) >= 3 && len(paramArray[2]) > 0 && string(paramArray[2]) != "null" && string(paramArray[2]) != "{}" { features.HasStateOverrides = true } return features, nil } // classifyBlockTag categorizes a block tag into types func classifyBlockTag(blockTag string) string { if blockTag == "" { return "latest" // Default } // Check for special tags switch blockTag { case "latest", "pending", "safe", "finalized", "earliest": return blockTag } // Check if it's a block hash (0x followed by 64 hex chars) if len(blockTag) == 66 && strings.HasPrefix(blockTag, "0x") { return "hash" } // Check if it's a block number (hex) if strings.HasPrefix(blockTag, "0x") { _, err := strconv.ParseUint(strings.TrimPrefix(blockTag, "0x"), 16, 64) if err == nil { return "number" } } return "unknown" } // getDataSizeRange categorizes data size into ranges func getDataSizeRange(dataSize int) string { if dataSize < 1024 { // Less than 1KB return "small" } else if dataSize <= 10240 { // 1KB to 10KB return "medium" } return "large" // More than 10KB } // requiresPrimaryBackend checks if a request must be routed to primary based on block tag func requiresPrimaryBackend(blockTag string) bool { // These block tags must always go to primary primaryOnlyTags := map[string]bool{ "finalized": true, "pending": true, "safe": true, } return primaryOnlyTags[blockTag] } // canUseSecondaryForLatest checks if secondary backend can be used for "latest" block tag func canUseSecondaryForLatest(blockTag string, backendName string, chainHeadMonitor *ChainHeadMonitor) bool { // Only check for "latest" tag if blockTag != "latest" { // For non-latest tags (like specific block numbers), follow existing rules return true } if chainHeadMonitor == nil { // No monitor, can't verify - be conservative return false } // Get chain head status chainStatus := chainHeadMonitor.GetStatus() primaryHead, primaryExists := chainStatus["primary"] if !primaryExists || !primaryHead.IsHealthy { // Primary not healthy, allow secondary return true } secondaryHead, secondaryExists := chainStatus[backendName] if !secondaryExists || !secondaryHead.IsHealthy { // Secondary not healthy return false } // For "latest", secondary must be at EXACTLY the same block height return secondaryHead.BlockNumber == primaryHead.BlockNumber } // canUseSecondaryForBlockTag checks if secondary backend can be used for a given block tag func canUseSecondaryForBlockTag(blockTag string, backendName string, chainHeadMonitor *ChainHeadMonitor) bool { if chainHeadMonitor == nil { // No monitor, can't verify - be conservative return false } // Get chain head status chainStatus := chainHeadMonitor.GetStatus() primaryHead, primaryExists := chainStatus["primary"] if !primaryExists || !primaryHead.IsHealthy { // Primary not healthy, allow secondary return true } secondaryHead, secondaryExists := chainStatus[backendName] if !secondaryExists || !secondaryHead.IsHealthy { // Secondary not healthy return false } // Handle "latest" tag - secondary must be at EXACTLY the same block height if blockTag == "latest" { return secondaryHead.BlockNumber == primaryHead.BlockNumber } // Handle "earliest" tag - always allowed if blockTag == "earliest" { return true } // Check if it's a block hash (0x followed by 64 hex chars) if len(blockTag) == 66 && strings.HasPrefix(blockTag, "0x") { // Try to look up the block number from our cache if blockNumber, exists := chainHeadMonitor.GetBlockNumberForHash(blockTag); exists { // We know this block number, check if secondary has it return secondaryHead.BlockNumber >= blockNumber } // Unknown block hash - be conservative and route to primary return false } // Check if it's a numeric block tag (hex number) if strings.HasPrefix(blockTag, "0x") { blockNumber, err := strconv.ParseUint(strings.TrimPrefix(blockTag, "0x"), 16, 64) if err == nil { // Valid block number - check if secondary has reached it return secondaryHead.BlockNumber >= blockNumber } } // Unknown block tag format - be conservative return false } // initEthCallStats initializes the eth_call statistics structure func initEthCallStats() *EthCallStats { return &EthCallStats{ ByBlockTagType: map[string]*EthCallCategoryStats{ "latest": &EthCallCategoryStats{Durations: make([]time.Duration, 0, 100)}, "pending": &EthCallCategoryStats{Durations: make([]time.Duration, 0, 100)}, "safe": &EthCallCategoryStats{Durations: make([]time.Duration, 0, 100)}, "finalized": &EthCallCategoryStats{Durations: make([]time.Duration, 0, 100)}, "earliest": &EthCallCategoryStats{Durations: make([]time.Duration, 0, 100)}, "number": &EthCallCategoryStats{Durations: make([]time.Duration, 0, 100)}, "hash": &EthCallCategoryStats{Durations: make([]time.Duration, 0, 100)}, }, WithStateOverrides: &EthCallCategoryStats{Durations: make([]time.Duration, 0, 100)}, WithoutStateOverrides: &EthCallCategoryStats{Durations: make([]time.Duration, 0, 100)}, WithAccessList: &EthCallCategoryStats{Durations: make([]time.Duration, 0, 100)}, WithoutAccessList: &EthCallCategoryStats{Durations: make([]time.Duration, 0, 100)}, ByDataSizeRange: map[string]*EthCallCategoryStats{ "small": &EthCallCategoryStats{Durations: make([]time.Duration, 0, 100)}, "medium": &EthCallCategoryStats{Durations: make([]time.Duration, 0, 100)}, "large": &EthCallCategoryStats{Durations: make([]time.Duration, 0, 100)}, }, } } func NewStatsCollector(summaryInterval time.Duration, hasSecondaryBackends bool) *StatsCollector { now := time.Now() sc := &StatsCollector{ requestStats: make([]ResponseStats, 0, 1000), methodStats: make(map[string][]time.Duration), backendMethodStats: make(map[string]map[string][]time.Duration), backendWins: make(map[string]int), methodBackendWins: make(map[string]map[string]int), firstResponseDurations: make([]time.Duration, 0, 1000), actualFirstResponseDurations: make([]time.Duration, 0, 1000), methodFirstResponseDurations: make(map[string][]time.Duration), methodActualFirstResponseDurations: make(map[string][]time.Duration), appStartTime: now, intervalStartTime: now, summaryInterval: summaryInterval, methodCUPrices: initCUPrices(), // Initialize CU prices methodCU: make(map[string]int), historicalCU: make([]CUDataPoint, 0, 2000), // Store up to ~24 hours of 1-minute intervals hasSecondaryBackends: hasSecondaryBackends, ethCallStats: initEthCallStats(), // Initialize eth_call stats } // Start the periodic summary goroutine go sc.periodicSummary() return sc } // SetSecondaryProbe sets the secondary probe reference after stats collector is created func (sc *StatsCollector) SetSecondaryProbe(probe *SecondaryProbe) { sc.mu.Lock() defer sc.mu.Unlock() sc.secondaryProbe = probe } // SetChainHeadMonitor sets the chain head monitor reference after stats collector is created func (sc *StatsCollector) SetChainHeadMonitor(monitor *ChainHeadMonitor) { sc.mu.Lock() defer sc.mu.Unlock() sc.chainHeadMonitor = monitor } // NewSecondaryProbe creates a new secondary probe instance func NewSecondaryProbe(backends []Backend, client *http.Client, probeInterval time.Duration, minDelayBuffer time.Duration, probeMethods []string, enableDetailedLogs bool) *SecondaryProbe { // Filter only secondary backends var secondaryBackends []Backend for _, b := range backends { if b.Role == "secondary" { secondaryBackends = append(secondaryBackends, b) } } if len(secondaryBackends) == 0 { return nil } sp := &SecondaryProbe{ backends: secondaryBackends, client: client, minResponseTime: 15 * time.Millisecond, // Start with reasonable default methodTimings: make(map[string]time.Duration), backendTimings: make(map[string]time.Duration), probeInterval: probeInterval, minDelayBuffer: minDelayBuffer, probeMethods: probeMethods, enableDetailedLogs: enableDetailedLogs, lastSuccessTime: time.Now(), } // Run initial probe immediately go func() { sp.runProbe() // Then start periodic probing sp.startPeriodicProbing() }() return sp } // getDelayForMethod returns the appropriate delay for a given method func (sp *SecondaryProbe) getDelayForMethod(method string) time.Duration { sp.mu.RLock() defer sp.mu.RUnlock() // If probes have been failing, use a conservative fallback if sp.failureCount > 3 && time.Since(sp.lastSuccessTime) > 5*time.Minute { return 20 * time.Millisecond // Conservative fallback } // Use method-specific timing if available if timing, exists := sp.methodTimings[method]; exists { return timing + sp.minDelayBuffer } // Fall back to general minimum return sp.minResponseTime + sp.minDelayBuffer } // getDelayForBackendAndMethod returns the appropriate delay for a specific backend and method func (sp *SecondaryProbe) getDelayForBackendAndMethod(backend, method string) time.Duration { sp.mu.RLock() defer sp.mu.RUnlock() // Start with backend-specific timing delay := sp.minResponseTime if backendTiming, exists := sp.backendTimings[backend]; exists { delay = backendTiming } // Use method-specific timing if it's longer if methodTiming, exists := sp.methodTimings[method]; exists && methodTiming > delay { delay = methodTiming } return delay + sp.minDelayBuffer } // runProbe performs a single probe cycle to all secondary backends func (sp *SecondaryProbe) runProbe() { newMethodTimings := make(map[string]time.Duration) newBackendTimings := make(map[string]time.Duration) successfulProbes := 0 for _, backend := range sp.backends { backendMin := time.Hour // Start with large value for _, method := range sp.probeMethods { methodMin := time.Hour // Track minimum for this method on this backend methodSuccesses := 0 // Perform 10 probes for this method and take the minimum for probe := 0; probe < 10; probe++ { reqBody := []byte(fmt.Sprintf( `{"jsonrpc":"2.0","method":"%s","params":[],"id":"probe-%d-%d"}`, method, time.Now().UnixNano(), probe, )) req, err := http.NewRequest("POST", backend.URL, bytes.NewReader(reqBody)) if err != nil { continue } req.Header.Set("Content-Type", "application/json") // Ensure connection reuse by setting Connection: keep-alive req.Header.Set("Connection", "keep-alive") start := time.Now() resp, err := sp.client.Do(req) duration := time.Since(start) if err == nil && resp != nil { resp.Body.Close() if resp.StatusCode == 200 { methodSuccesses++ successfulProbes++ // Track minimum for this method on this backend if duration < methodMin { methodMin = duration } if sp.enableDetailedLogs { log.Printf("Probe %d/10: backend=%s method=%s duration=%s status=%d (min so far: %s)", probe+1, backend.Name, method, duration, resp.StatusCode, methodMin) } } } // Small delay between probes to avoid overwhelming the backend if probe < 9 { // Don't delay after the last probe time.Sleep(10 * time.Millisecond) } } // Only use this method's timing if we had successful probes if methodSuccesses > 0 && methodMin < time.Hour { // Update method timing (use minimum across all backends) if currentMin, exists := newMethodTimings[method]; !exists || methodMin < currentMin { newMethodTimings[method] = methodMin } // Track backend minimum if methodMin < backendMin { backendMin = methodMin } if sp.enableDetailedLogs { log.Printf("Method %s on backend %s: %d/10 successful probes, min duration: %s", method, backend.Name, methodSuccesses, methodMin) } } } // Store backend minimum if we got any successful probes if backendMin < time.Hour { newBackendTimings[backend.Name] = backendMin } } // Update timings if we got successful probes sp.mu.Lock() defer sp.mu.Unlock() if successfulProbes > 0 { sp.failureCount = 0 sp.lastSuccessTime = time.Now() // Update method timings for method, timing := range newMethodTimings { sp.methodTimings[method] = timing } // Update backend timings for backend, timing := range newBackendTimings { sp.backendTimings[backend] = timing } // Update overall minimum overallMin := time.Hour for _, timing := range newBackendTimings { if timing < overallMin { overallMin = timing } } if overallMin < time.Hour { sp.minResponseTime = overallMin } sp.lastProbeTime = time.Now() if sp.enableDetailedLogs { log.Printf("Probe complete: min=%s methods=%v backends=%v", sp.minResponseTime, sp.methodTimings, sp.backendTimings) } } else { sp.failureCount++ if sp.enableDetailedLogs { log.Printf("Probe failed: consecutive failures=%d", sp.failureCount) } } } // startPeriodicProbing runs probes at regular intervals func (sp *SecondaryProbe) startPeriodicProbing() { ticker := time.NewTicker(sp.probeInterval) defer ticker.Stop() for range ticker.C { sp.runProbe() } } // initCUPrices initializes the map of method names to their CU prices func initCUPrices() map[string]int { return map[string]int{ "debug_traceBlockByHash": 90, "debug_traceBlockByNumber": 90, "debug_traceCall": 90, "debug_traceTransaction": 90, "debug_storageRangeAt": 50, // Storage access method "eth_accounts": 0, "eth_blockNumber": 10, "eth_call": 21, "eth_chainId": 0, "eth_coinbase": 0, "eth_createAccessList": 30, "eth_estimateGas": 60, "eth_feeHistory": 15, "eth_gasPrice": 15, "eth_getBalance": 11, "eth_getBlockByHash": 21, "eth_getBlockByHash#full": 60, "eth_getBlockByNumber": 24, "eth_getBlockByNumber#full": 60, "eth_getBlockReceipts": 80, "eth_getBlockTransactionCountByHash": 15, "eth_getBlockTransactionCountByNumber": 11, "eth_getCode": 24, "eth_getFilterChanges": 20, "eth_getFilterLogs": 60, "eth_getLogs": 60, "eth_getProof": 11, "eth_getStorageAt": 14, "eth_getTransactionByBlockHashAndIndex": 19, "eth_getTransactionByBlockNumberAndIndex": 13, "eth_getTransactionByHash": 11, "eth_getTransactionCount": 11, "eth_getTransactionReceipt": 30, "eth_getUncleByBlockHashAndIndex": 15, "eth_getUncleByBlockNumberAndIndex": 15, "eth_getUncleCountByBlockHash": 15, "eth_getUncleCountByBlockNumber": 15, "eth_hashrate": 0, "eth_maxPriorityFeePerGas": 16, "eth_mining": 0, "eth_newBlockFilter": 20, "eth_newFilter": 20, "eth_newPendingTransactionFilter": 20, "eth_protocolVersion": 0, "eth_sendRawTransaction": 90, "eth_syncing": 0, "eth_subscribe": 10, "eth_subscription": 25, // For "Notifications from the events you've subscribed to" "eth_uninstallFilter": 10, "eth_unsubscribe": 10, "net_listening": 0, "net_peerCount": 0, "net_version": 0, "trace_block": 90, "trace_call": 60, "trace_callMany": 90, "trace_filter": 75, "trace_get": 20, "trace_rawTransaction": 75, "trace_replayBlockTransactions": 90, "trace_replayBlockTransactions#vmTrace": 300, "trace_replayTransaction": 90, "trace_replayTransaction#vmTrace": 300, "trace_transaction": 90, "txpool_content": 1000, "web3_clientVersion": 0, "web3_sha3": 10, "bor_getAuthor": 10, "bor_getCurrentProposer": 10, "bor_getCurrentValidators": 10, "bor_getRootHash": 10, "bor_getSignersAtHash": 10, } } func (sc *StatsCollector) AddStats(stats []ResponseStats, totalDuration time.Duration) { sc.mu.Lock() defer sc.mu.Unlock() // Find the fastest successful response and actual first response var fastestBackend string var fastestDuration time.Duration = time.Hour // Initialize with a very large duration var actualFirstDuration time.Duration var method string var hasActualFirst bool for _, stat := range stats { if stat.Backend == "actual-first-response" { actualFirstDuration = stat.Duration hasActualFirst = true method = stat.Method } else if stat.Error == nil && stat.Duration < fastestDuration { fastestDuration = stat.Duration fastestBackend = stat.Backend if method == "" { method = stat.Method } } } // Track the win if we found a successful response if fastestBackend != "" { sc.backendWins[fastestBackend]++ // Track wins per method if _, exists := sc.methodBackendWins[method]; !exists { sc.methodBackendWins[method] = make(map[string]int) } sc.methodBackendWins[method][fastestBackend]++ // Track first response duration (from winning backend's perspective) sc.firstResponseDurations = append(sc.firstResponseDurations, fastestDuration) // Track first response duration by method if _, exists := sc.methodFirstResponseDurations[method]; !exists { sc.methodFirstResponseDurations[method] = make([]time.Duration, 0, 100) } sc.methodFirstResponseDurations[method] = append(sc.methodFirstResponseDurations[method], fastestDuration) // Track actual first response duration if available if hasActualFirst { sc.actualFirstResponseDurations = append(sc.actualFirstResponseDurations, actualFirstDuration) if _, exists := sc.methodActualFirstResponseDurations[method]; !exists { sc.methodActualFirstResponseDurations[method] = make([]time.Duration, 0, 100) } sc.methodActualFirstResponseDurations[method] = append(sc.methodActualFirstResponseDurations[method], actualFirstDuration) } } // Add stats to the collection (skip actual-first-response as it's synthetic) for _, stat := range stats { if stat.Backend == "actual-first-response" { continue // Don't add synthetic entries to regular stats } sc.requestStats = append(sc.requestStats, stat) if stat.Error != nil { // Don't count skipped secondary backends as errors if !strings.Contains(stat.Error.Error(), "skipped - primary responded") { sc.errorCount++ } else { // Track that we skipped a secondary request sc.skippedSecondaryRequests++ } } // Track method-specific stats for all backends if stat.Error == nil { // Initialize backend map if not exists if _, exists := sc.backendMethodStats[stat.Backend]; !exists { sc.backendMethodStats[stat.Backend] = make(map[string][]time.Duration) } // Initialize method array if not exists if _, exists := sc.backendMethodStats[stat.Backend][stat.Method]; !exists { sc.backendMethodStats[stat.Backend][stat.Method] = make([]time.Duration, 0, 100) } // Add the duration sc.backendMethodStats[stat.Backend][stat.Method] = append( sc.backendMethodStats[stat.Backend][stat.Method], stat.Duration) // Keep tracking primary backend in the old way for backward compatibility if stat.Backend == "primary" { // Handle batch requests specially for CU calculation if strings.HasPrefix(stat.Method, "batch[") && len(stat.Method) > 6 { // Don't track batch as a method, it will be handled separately } else { if _, exists := sc.methodStats[stat.Method]; !exists { sc.methodStats[stat.Method] = make([]time.Duration, 0, 100) } sc.methodStats[stat.Method] = append(sc.methodStats[stat.Method], stat.Duration) // Add CU for this method cuValue := sc.methodCUPrices[stat.Method] sc.totalCU += cuValue sc.methodCU[stat.Method] += cuValue } } } } sc.totalRequests++ } // AddBatchStats adds statistics for a batch request func (sc *StatsCollector) AddBatchStats(methods []string, duration time.Duration, backend string) { sc.mu.Lock() defer sc.mu.Unlock() // Calculate total CU for the batch batchCU := 0 for _, method := range methods { if method != "" { cuValue := sc.methodCUPrices[method] batchCU += cuValue // Track individual method CU sc.methodCU[method] += cuValue // Track method durations (use batch duration for each method) if _, exists := sc.methodStats[method]; !exists { sc.methodStats[method] = make([]time.Duration, 0, 100) } sc.methodStats[method] = append(sc.methodStats[method], duration) } } sc.totalCU += batchCU } func (sc *StatsCollector) AddWebSocketStats(stats WebSocketStats) { sc.mu.Lock() defer sc.mu.Unlock() sc.wsConnections = append(sc.wsConnections, stats) sc.totalWsConnections++ if stats.Error != nil { sc.errorCount++ } } // AddEthCallStats adds statistics for an eth_call request func (sc *StatsCollector) AddEthCallStats(features *EthCallFeatures, winningBackend string, duration time.Duration, hasError bool, sentToSecondary bool) { sc.mu.Lock() defer sc.mu.Unlock() // Update total count sc.ethCallStats.TotalCount++ // Update error count if hasError { sc.ethCallStats.ErrorCount++ } // Update wins if winningBackend != "" && winningBackend != "primary" { sc.ethCallStats.SecondaryWins++ } // Update primary-only count if !sentToSecondary { sc.ethCallStats.PrimaryOnlyCount++ } // Update block tag type stats if catStats, exists := sc.ethCallStats.ByBlockTagType[features.BlockTagType]; exists { catStats.Count++ if hasError { catStats.ErrorCount++ } if winningBackend != "" && winningBackend != "primary" { catStats.SecondaryWins++ } if !sentToSecondary { catStats.PrimaryOnlyCount++ } if !hasError { catStats.Durations = append(catStats.Durations, duration) } } // Update state override stats if features.HasStateOverrides { sc.ethCallStats.WithStateOverrides.Count++ if hasError { sc.ethCallStats.WithStateOverrides.ErrorCount++ } if winningBackend != "" && winningBackend != "primary" { sc.ethCallStats.WithStateOverrides.SecondaryWins++ } if !sentToSecondary { sc.ethCallStats.WithStateOverrides.PrimaryOnlyCount++ } if !hasError { sc.ethCallStats.WithStateOverrides.Durations = append(sc.ethCallStats.WithStateOverrides.Durations, duration) } } else { sc.ethCallStats.WithoutStateOverrides.Count++ if hasError { sc.ethCallStats.WithoutStateOverrides.ErrorCount++ } if winningBackend != "" && winningBackend != "primary" { sc.ethCallStats.WithoutStateOverrides.SecondaryWins++ } if !sentToSecondary { sc.ethCallStats.WithoutStateOverrides.PrimaryOnlyCount++ } if !hasError { sc.ethCallStats.WithoutStateOverrides.Durations = append(sc.ethCallStats.WithoutStateOverrides.Durations, duration) } } // Update access list stats if features.HasAccessList { sc.ethCallStats.WithAccessList.Count++ if hasError { sc.ethCallStats.WithAccessList.ErrorCount++ } if winningBackend != "" && winningBackend != "primary" { sc.ethCallStats.WithAccessList.SecondaryWins++ } if !sentToSecondary { sc.ethCallStats.WithAccessList.PrimaryOnlyCount++ } if !hasError { sc.ethCallStats.WithAccessList.Durations = append(sc.ethCallStats.WithAccessList.Durations, duration) } } else { sc.ethCallStats.WithoutAccessList.Count++ if hasError { sc.ethCallStats.WithoutAccessList.ErrorCount++ } if winningBackend != "" && winningBackend != "primary" { sc.ethCallStats.WithoutAccessList.SecondaryWins++ } if !sentToSecondary { sc.ethCallStats.WithoutAccessList.PrimaryOnlyCount++ } if !hasError { sc.ethCallStats.WithoutAccessList.Durations = append(sc.ethCallStats.WithoutAccessList.Durations, duration) } } // Update data size range stats dataSizeRange := getDataSizeRange(features.DataSize) if catStats, exists := sc.ethCallStats.ByDataSizeRange[dataSizeRange]; exists { catStats.Count++ if hasError { catStats.ErrorCount++ } if winningBackend != "" && winningBackend != "primary" { catStats.SecondaryWins++ } if !sentToSecondary { catStats.PrimaryOnlyCount++ } if !hasError { catStats.Durations = append(catStats.Durations, duration) } } } func (sc *StatsCollector) periodicSummary() { ticker := time.NewTicker(sc.summaryInterval) defer ticker.Stop() for range ticker.C { sc.printSummary() } } // formatDuration formats a duration with at most 6 significant digits total func formatDuration(d time.Duration) string { // Convert to string with standard formatting str := d.String() // Find the decimal point if it exists decimalIdx := strings.Index(str, ".") if decimalIdx == -1 { // No decimal point, return as is (already ≤ 6 digits or no need to truncate) return str } // Find the unit suffix (ms, µs, etc.) unitIdx := -1 for i := decimalIdx; i < len(str); i++ { if !(str[i] >= '0' && str[i] <= '9') && str[i] != '.' { unitIdx = i break } } if unitIdx == -1 { unitIdx = len(str) // No unit suffix found } // Count digits before decimal (not including sign) digitsBeforeDecimal := 0 for i := 0; i < decimalIdx; i++ { if str[i] >= '0' && str[i] <= '9' { digitsBeforeDecimal++ } } // Calculate how many decimal places we can keep (allowing for 6 total digits) maxDecimalPlaces := 6 - digitsBeforeDecimal if maxDecimalPlaces <= 0 { // No room for decimal places return str[:decimalIdx] + str[unitIdx:] } // Calculate end position for truncation endPos := decimalIdx + 1 + maxDecimalPlaces if endPos > unitIdx { endPos = unitIdx } // Return truncated string return str[:endPos] + str[unitIdx:] } func (sc *StatsCollector) printSummary() { sc.mu.Lock() defer sc.mu.Unlock() uptime := time.Since(sc.appStartTime) fmt.Printf("\n=== BENCHMARK PROXY SUMMARY ===\n") fmt.Printf("Uptime: %s\n", uptime.Round(time.Second)) fmt.Printf("Total HTTP Requests: %d\n", sc.totalRequests) fmt.Printf("Total WebSocket Connections: %d\n", sc.totalWsConnections) fmt.Printf("Error Rate: %.2f%%\n", float64(sc.errorCount)/float64(sc.totalRequests+sc.totalWsConnections)*100) // Display secondary probe information if available if sc.secondaryProbe != nil { sc.secondaryProbe.mu.RLock() fmt.Printf("\n--- Secondary Probe Status ---\n") fmt.Printf("Minimum Secondary Latency: %s\n", formatDuration(sc.secondaryProbe.minResponseTime)) fmt.Printf("Probe Buffer: %s\n", formatDuration(sc.secondaryProbe.minDelayBuffer)) fmt.Printf("Effective Delay Threshold: %s\n", formatDuration(sc.secondaryProbe.minResponseTime+sc.secondaryProbe.minDelayBuffer)) if len(sc.secondaryProbe.methodTimings) > 0 { fmt.Printf("Method-Specific Thresholds:\n") // Sort methods for consistent output var methods []string for method := range sc.secondaryProbe.methodTimings { methods = append(methods, method) } sort.Strings(methods) for _, method := range methods { timing := sc.secondaryProbe.methodTimings[method] fmt.Printf(" %s: %s (+ %s buffer = %s)\n", method, formatDuration(timing), formatDuration(sc.secondaryProbe.minDelayBuffer), formatDuration(timing+sc.secondaryProbe.minDelayBuffer)) } } if len(sc.secondaryProbe.backendTimings) > 0 { fmt.Printf("Backend-Specific Minimum Latencies:\n") // Sort backend names for consistent output var backendNames []string for backend := range sc.secondaryProbe.backendTimings { backendNames = append(backendNames, backend) } sort.Strings(backendNames) for _, backend := range backendNames { timing := sc.secondaryProbe.backendTimings[backend] fmt.Printf(" %s: %s (+ %s buffer = %s)\n", backend, formatDuration(timing), formatDuration(sc.secondaryProbe.minDelayBuffer), formatDuration(timing+sc.secondaryProbe.minDelayBuffer)) } } if sc.secondaryProbe.failureCount > 0 { fmt.Printf("Probe Failures: %d consecutive\n", sc.secondaryProbe.failureCount) } sc.secondaryProbe.mu.RUnlock() } // Display chain head monitor information if available if sc.chainHeadMonitor != nil { fmt.Printf("\n--- Chain Head Monitor Status ---\n") chainStatus := sc.chainHeadMonitor.GetStatus() // Get primary block height for comparison var primaryBlockHeight uint64 if primaryHead, exists := chainStatus["primary"]; exists && primaryHead.IsHealthy { primaryBlockHeight = primaryHead.BlockNumber } // Sort backend names for consistent output var backendNames []string for name := range chainStatus { backendNames = append(backendNames, name) } sort.Strings(backendNames) for _, name := range backendNames { head := chainStatus[name] status := "healthy" details := fmt.Sprintf("block %d, chain %s", head.BlockNumber, head.ChainID) // Add block difference info for secondary backends if name != "primary" && primaryBlockHeight > 0 && head.IsHealthy { diff := int64(head.BlockNumber) - int64(primaryBlockHeight) if diff > 0 { details += fmt.Sprintf(" (+%d ahead)", diff) } else if diff < 0 { details += fmt.Sprintf(" (%d behind)", diff) } else { details += " (in sync)" } } if !head.IsHealthy { status = "unhealthy" details = head.Error } else if sc.chainHeadMonitor.IsBackendHealthy(name) { status = "enabled" } else { status = "disabled" } fmt.Printf(" %s: %s (%s)\n", name, status, details) } // Show block hash cache stats sc.chainHeadMonitor.mu.RLock() cacheSize := len(sc.chainHeadMonitor.blockHashCache) sc.chainHeadMonitor.mu.RUnlock() fmt.Printf(" Block hash cache: %d entries (max 128)\n", cacheSize) } if sc.hasSecondaryBackends && sc.skippedSecondaryRequests > 0 { fmt.Printf("Skipped Secondary Requests: %d (%.1f%% of requests)\n", sc.skippedSecondaryRequests, float64(sc.skippedSecondaryRequests)/float64(sc.totalRequests)*100) } fmt.Printf("Total Compute Units Earned (current interval): %d CU\n", sc.totalCU) // Calculate and display CU for different time windows timeWindows := []struct { duration time.Duration label string }{ {10 * time.Minute, "Last 10 minutes"}, {1 * time.Hour, "Last hour"}, {3 * time.Hour, "Last 3 hours"}, {24 * time.Hour, "Last 24 hours"}, } fmt.Printf("\nHistorical Compute Units:\n") for _, window := range timeWindows { actualCU, needsExtrapolation := sc.calculateCUForTimeWindow(window.duration) if needsExtrapolation { // Calculate actual data duration for extrapolation now := time.Now() cutoff := now.Add(-window.duration) var oldestDataStartTime time.Time hasData := false // Check current interval if sc.intervalStartTime.After(cutoff) { oldestDataStartTime = sc.intervalStartTime hasData = true } // Check historical data for i := len(sc.historicalCU) - 1; i >= 0; i-- { point := sc.historicalCU[i] intervalStart := point.Timestamp.Add(-sc.summaryInterval) if point.Timestamp.Before(cutoff) { break } if !hasData || intervalStart.Before(oldestDataStartTime) { oldestDataStartTime = intervalStart } hasData = true } var actualDuration time.Duration if hasData { actualDuration = now.Sub(oldestDataStartTime) } extrapolatedCU := sc.extrapolateCU(actualCU, actualDuration, window.duration) fmt.Printf(" %s: %s\n", window.label, formatCUWithExtrapolation(extrapolatedCU, true)) } else { fmt.Printf(" %s: %s\n", window.label, formatCUWithExtrapolation(actualCU, false)) } } // Calculate response time statistics for primary backend var primaryDurations []time.Duration for _, stat := range sc.requestStats { if stat.Backend == "primary" && stat.Error == nil { primaryDurations = append(primaryDurations, stat.Duration) } } if len(primaryDurations) > 0 { sort.Slice(primaryDurations, func(i, j int) bool { return primaryDurations[i] < primaryDurations[j] }) var sum time.Duration for _, d := range primaryDurations { sum += d } avg := sum / time.Duration(len(primaryDurations)) min := primaryDurations[0] max := primaryDurations[len(primaryDurations)-1] p50idx := len(primaryDurations) * 50 / 100 p90idx := len(primaryDurations) * 90 / 100 p99idx := minInt(len(primaryDurations)-1, len(primaryDurations)*99/100) p50 := primaryDurations[p50idx] p90 := primaryDurations[p90idx] p99 := primaryDurations[p99idx] fmt.Printf("\nPrimary Backend Response Times:\n") fmt.Printf(" Min: %s\n", formatDuration(min)) fmt.Printf(" Avg: %s\n", formatDuration(avg)) fmt.Printf(" Max: %s\n", formatDuration(max)) fmt.Printf(" p50: %s\n", formatDuration(p50)) fmt.Printf(" p90: %s\n", formatDuration(p90)) fmt.Printf(" p99: %s\n", formatDuration(p99)) } // Calculate response time statistics for ALL backends backendDurations := make(map[string][]time.Duration) for _, stat := range sc.requestStats { if stat.Error == nil { backendDurations[stat.Backend] = append(backendDurations[stat.Backend], stat.Duration) } } // Sort backend names for consistent output var backendNames []string for backend := range backendDurations { backendNames = append(backendNames, backend) } sort.Strings(backendNames) // Print per-backend statistics fmt.Printf("\nPer-Backend Response Time Comparison:\n") fmt.Printf("Note: 'User Latency' = actual time users wait; 'Backend Time' = winning backend's response time\n") fmt.Printf("%-20s %10s %10s %10s %10s %10s %10s %10s\n", "Backend", "Count", "Min", "Avg", "Max", "p50", "p90", "p99") fmt.Printf("%s\n", strings.Repeat("-", 100)) // First, show the actual user latency if available if len(sc.actualFirstResponseDurations) > 0 { actualDurations := make([]time.Duration, len(sc.actualFirstResponseDurations)) copy(actualDurations, sc.actualFirstResponseDurations) sort.Slice(actualDurations, func(i, j int) bool { return actualDurations[i] < actualDurations[j] }) var sum time.Duration for _, d := range actualDurations { sum += d } avg := sum / time.Duration(len(actualDurations)) min := actualDurations[0] max := actualDurations[len(actualDurations)-1] p50idx := len(actualDurations) * 50 / 100 p90idx := len(actualDurations) * 90 / 100 p99idx := minInt(len(actualDurations)-1, len(actualDurations)*99/100) p50 := actualDurations[p50idx] p90 := actualDurations[p90idx] p99 := actualDurations[p99idx] fmt.Printf("%-20s %10d %10s %10s %10s %10s %10s %10s\n", "User Latency", len(actualDurations), formatDuration(min), formatDuration(avg), formatDuration(max), formatDuration(p50), formatDuration(p90), formatDuration(p99)) } // Then show the backend time (what backend actually took) if len(sc.firstResponseDurations) > 0 { firstRespDurations := make([]time.Duration, len(sc.firstResponseDurations)) copy(firstRespDurations, sc.firstResponseDurations) sort.Slice(firstRespDurations, func(i, j int) bool { return firstRespDurations[i] < firstRespDurations[j] }) var sum time.Duration for _, d := range firstRespDurations { sum += d } avg := sum / time.Duration(len(firstRespDurations)) min := firstRespDurations[0] max := firstRespDurations[len(firstRespDurations)-1] p50idx := len(firstRespDurations) * 50 / 100 p90idx := len(firstRespDurations) * 90 / 100 p99idx := minInt(len(firstRespDurations)-1, len(firstRespDurations)*99/100) p50 := firstRespDurations[p50idx] p90 := firstRespDurations[p90idx] p99 := firstRespDurations[p99idx] fmt.Printf("%-20s %10d %10s %10s %10s %10s %10s %10s\n", "Backend Time", len(firstRespDurations), formatDuration(min), formatDuration(avg), formatDuration(max), formatDuration(p50), formatDuration(p90), formatDuration(p99)) fmt.Printf("%s\n", strings.Repeat("-", 100)) } for _, backend := range backendNames { durations := backendDurations[backend] if len(durations) == 0 { continue } sort.Slice(durations, func(i, j int) bool { return durations[i] < durations[j] }) var sum time.Duration for _, d := range durations { sum += d } avg := sum / time.Duration(len(durations)) min := durations[0] max := durations[len(durations)-1] p50idx := len(durations) * 50 / 100 p90idx := len(durations) * 90 / 100 p99idx := minInt(len(durations)-1, len(durations)*99/100) p50 := durations[p50idx] p90 := durations[p90idx] p99 := durations[p99idx] fmt.Printf("%-20s %10d %10s %10s %10s %10s %10s %10s\n", backend, len(durations), formatDuration(min), formatDuration(avg), formatDuration(max), formatDuration(p50), formatDuration(p90), formatDuration(p99)) } // Print backend wins statistics fmt.Printf("\nBackend First Response Wins:\n") fmt.Printf("%-20s %10s %10s\n", "Backend", "Wins", "Win %") fmt.Printf("%s\n", strings.Repeat("-", 42)) totalWins := 0 for _, wins := range sc.backendWins { totalWins += wins } // Sort backends by wins for consistent output type backendWin struct { backend string wins int } var winList []backendWin for backend, wins := range sc.backendWins { winList = append(winList, backendWin{backend, wins}) } sort.Slice(winList, func(i, j int) bool { return winList[i].wins > winList[j].wins }) for _, bw := range winList { winPercentage := float64(bw.wins) / float64(totalWins) * 100 fmt.Printf("%-20s %10d %9.1f%%\n", bw.backend, bw.wins, winPercentage) } // Print per-method statistics if len(sc.methodStats) > 0 { fmt.Printf("\nPer-Method Statistics (Primary Backend):\n") // Sort methods by name for consistent output methods := make([]string, 0, len(sc.methodStats)) for method := range sc.methodStats { methods = append(methods, method) } sort.Strings(methods) for _, method := range methods { var durations []time.Duration var displayLabel string // If secondary backends are configured and we have actual user latency data, use that if sc.hasSecondaryBackends { if actualDurations, exists := sc.methodActualFirstResponseDurations[method]; exists && len(actualDurations) > 0 { durations = make([]time.Duration, len(actualDurations)) copy(durations, actualDurations) displayLabel = method + " (User Latency)" } else { // Fall back to primary backend times if no actual latency data durations = sc.methodStats[method] displayLabel = method + " (Primary Backend)" } } else { // No secondary backends, use primary backend times durations = sc.methodStats[method] displayLabel = method } if len(durations) == 0 { continue } sort.Slice(durations, func(i, j int) bool { return durations[i] < durations[j] }) var sum time.Duration for _, d := range durations { sum += d } avg := sum / time.Duration(len(durations)) minDuration := durations[0] max := durations[len(durations)-1] // Only calculate percentiles if we have enough samples p50 := minDuration p90 := minDuration p99 := minDuration if len(durations) >= 2 { p50idx := len(durations) * 50 / 100 p90idx := len(durations) * 90 / 100 p99idx := minInt(len(durations)-1, len(durations)*99/100)