This creates a truly intelligent proxy that learns from actual usage patterns and automatically optimizes routing based on real performance data rather than hardcoded assumptions.

This commit is contained in:
Para Dox
2025-05-29 19:26:39 +07:00
parent ef273ee331
commit 318c4d26f5

View File

@@ -198,140 +198,205 @@ type SecondaryProbe struct {
lastSuccessTime time.Time // Last time probes succeeded lastSuccessTime time.Time // Last time probes succeeded
} }
// BlockHeightTracker monitors block heights from different backends // BlockHeightTracker monitors block heights from different backends using WebSocket subscriptions
type BlockHeightTracker struct { type BlockHeightTracker struct {
mu sync.RWMutex mu sync.RWMutex
backends []Backend backends []Backend
client *http.Client
blockHeights map[string]uint64 // Backend name -> latest block number blockHeights map[string]uint64 // Backend name -> latest block number
lastUpdateTime map[string]time.Time // Backend name -> last successful update lastUpdateTime map[string]time.Time // Backend name -> last successful update
checkInterval time.Duration connections map[string]*websocket.Conn // Active WebSocket connections
enableDetailedLogs bool enableDetailedLogs bool
maxBlockBehind uint64 // Maximum blocks a secondary can be behind primary maxBlockBehind uint64 // Maximum blocks a secondary can be behind primary
stopChan chan struct{}
} }
// NewBlockHeightTracker creates a new block height tracker // NewBlockHeightTracker creates a new WebSocket-based block height tracker
func NewBlockHeightTracker(backends []Backend, client *http.Client, checkInterval time.Duration, func NewBlockHeightTracker(backends []Backend, client *http.Client, checkInterval time.Duration,
maxBlockBehind uint64, enableDetailedLogs bool) *BlockHeightTracker { maxBlockBehind uint64, enableDetailedLogs bool) *BlockHeightTracker {
bht := &BlockHeightTracker{ bht := &BlockHeightTracker{
backends: backends, backends: backends,
client: client,
blockHeights: make(map[string]uint64), blockHeights: make(map[string]uint64),
lastUpdateTime: make(map[string]time.Time), lastUpdateTime: make(map[string]time.Time),
checkInterval: checkInterval, connections: make(map[string]*websocket.Conn),
enableDetailedLogs: enableDetailedLogs, enableDetailedLogs: enableDetailedLogs,
maxBlockBehind: maxBlockBehind, maxBlockBehind: maxBlockBehind,
stopChan: make(chan struct{}),
} }
// Start periodic block height checking // Start WebSocket connections for all backends
go bht.startPeriodicCheck() go bht.startWebSocketConnections()
// Run initial check
go bht.checkBlockHeights()
return bht return bht
} }
// checkBlockHeights queries all backends for their current block number // startWebSocketConnections establishes WebSocket connections to all backends
func (bht *BlockHeightTracker) checkBlockHeights() { func (bht *BlockHeightTracker) startWebSocketConnections() {
for _, backend := range bht.backends { for _, backend := range bht.backends {
go func(b Backend) { go bht.connectToBackend(backend)
reqBody := []byte(`{"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":"blockcheck"}`) }
}
req, err := http.NewRequest("POST", b.URL, bytes.NewReader(reqBody)) // connectToBackend establishes and maintains a WebSocket connection to a single backend
func (bht *BlockHeightTracker) connectToBackend(backend Backend) {
for {
select {
case <-bht.stopChan:
return
default:
}
// Create WebSocket URL from HTTP URL
wsURL := strings.Replace(backend.URL, "http://", "ws://", 1)
wsURL = strings.Replace(wsURL, "https://", "wss://", 1)
if bht.enableDetailedLogs {
log.Printf("Block height tracker: connecting to %s at %s", backend.Name, wsURL)
}
// Connect to WebSocket
conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
if err != nil { if err != nil {
if bht.enableDetailedLogs { if bht.enableDetailedLogs {
log.Printf("Block height check: failed to create request for %s: %v", b.Name, err) log.Printf("Block height tracker: failed to connect to %s: %v", backend.Name, err)
}
time.Sleep(10 * time.Second) // Wait before retry
continue
}
// Store connection
bht.mu.Lock()
bht.connections[backend.Name] = conn
bht.mu.Unlock()
// Subscribe to new block headers
subscribeMsg := map[string]interface{}{
"jsonrpc": "2.0",
"method": "eth_subscribe",
"params": []string{"newHeads"},
"id": 1,
}
if err := conn.WriteJSON(subscribeMsg); err != nil {
if bht.enableDetailedLogs {
log.Printf("Block height tracker: failed to subscribe to %s: %v", backend.Name, err)
}
conn.Close()
bht.mu.Lock()
delete(bht.connections, backend.Name)
bht.mu.Unlock()
time.Sleep(5 * time.Second)
continue
}
if bht.enableDetailedLogs {
log.Printf("Block height tracker: subscribed to newHeads for %s", backend.Name)
}
// Handle incoming messages
bht.handleWebSocketMessages(conn, backend)
// Connection closed, clean up
bht.mu.Lock()
delete(bht.connections, backend.Name)
bht.mu.Unlock()
if bht.enableDetailedLogs {
log.Printf("Block height tracker: connection to %s closed, will retry", backend.Name)
}
time.Sleep(5 * time.Second) // Wait before reconnecting
}
}
// handleWebSocketMessages processes incoming WebSocket messages from a backend
func (bht *BlockHeightTracker) handleWebSocketMessages(conn *websocket.Conn, backend Backend) {
defer conn.Close()
for {
select {
case <-bht.stopChan:
return
default:
}
var message map[string]interface{}
if err := conn.ReadJSON(&message); err != nil {
if bht.enableDetailedLogs {
log.Printf("Block height tracker: error reading from %s: %v", backend.Name, err)
} }
return return
} }
req.Header.Set("Content-Type", "application/json") // Check if this is a subscription notification
if method, exists := message["method"]; exists && method == "eth_subscription" {
start := time.Now() bht.handleSubscriptionNotification(message, backend)
resp, err := bht.client.Do(req) } else if result, exists := message["result"]; exists {
if err != nil { // This might be the subscription confirmation
if bht.enableDetailedLogs { if bht.enableDetailedLogs {
log.Printf("Block height check: request failed for %s: %v", b.Name, err) log.Printf("Block height tracker: subscription confirmed for %s: %v", backend.Name, result)
} }
return } else if errorObj, exists := message["error"]; exists {
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
if bht.enableDetailedLogs { if bht.enableDetailedLogs {
log.Printf("Block height check: HTTP error %d for %s", resp.StatusCode, b.Name) log.Printf("Block height tracker: error from %s: %v", backend.Name, errorObj)
} }
}
}
}
// handleSubscriptionNotification processes a newHeads subscription notification
func (bht *BlockHeightTracker) handleSubscriptionNotification(message map[string]interface{}, backend Backend) {
params, exists := message["params"]
if !exists {
return return
} }
body, err := io.ReadAll(resp.Body) paramsMap, ok := params.(map[string]interface{})
if err != nil { if !ok {
if bht.enableDetailedLogs {
log.Printf("Block height check: failed to read response for %s: %v", b.Name, err)
}
return return
} }
// Parse JSON-RPC response result, exists := paramsMap["result"]
var jsonResp struct { if !exists {
Result string `json:"result"`
Error *struct {
Code int `json:"code"`
Message string `json:"message"`
} `json:"error"`
}
if err := json.Unmarshal(body, &jsonResp); err != nil {
if bht.enableDetailedLogs {
log.Printf("Block height check: failed to parse JSON for %s: %v", b.Name, err)
}
return return
} }
if jsonResp.Error != nil { resultMap, ok := result.(map[string]interface{})
if bht.enableDetailedLogs { if !ok {
log.Printf("Block height check: JSON-RPC error for %s: %s", b.Name, jsonResp.Error.Message) return
} }
// Extract block number from the header
blockNumberHex, exists := resultMap["number"]
if !exists {
return
}
blockNumberStr, ok := blockNumberHex.(string)
if !ok {
return return
} }
// Parse hex block number // Parse hex block number
blockHex := strings.TrimPrefix(jsonResp.Result, "0x") blockHex := strings.TrimPrefix(blockNumberStr, "0x")
blockNum, err := strconv.ParseUint(blockHex, 16, 64) blockNum, err := strconv.ParseUint(blockHex, 16, 64)
if err != nil { if err != nil {
if bht.enableDetailedLogs { if bht.enableDetailedLogs {
log.Printf("Block height check: failed to parse block number for %s: %s", b.Name, jsonResp.Result) log.Printf("Block height tracker: failed to parse block number for %s: %s", backend.Name, blockNumberStr)
} }
return return
} }
// Update the block height // Update the block height
bht.mu.Lock() bht.mu.Lock()
oldHeight := bht.blockHeights[b.Name] oldHeight := bht.blockHeights[backend.Name]
bht.blockHeights[b.Name] = blockNum bht.blockHeights[backend.Name] = blockNum
bht.lastUpdateTime[b.Name] = time.Now() bht.lastUpdateTime[backend.Name] = time.Now()
bht.mu.Unlock() bht.mu.Unlock()
if bht.enableDetailedLogs { if bht.enableDetailedLogs && oldHeight != blockNum {
duration := time.Since(start) log.Printf("Block height tracker: %s updated from %d to %d via WebSocket",
if oldHeight != blockNum { backend.Name, oldHeight, blockNum)
log.Printf("Block height check: %s updated from %d to %d (took %s)",
b.Name, oldHeight, blockNum, duration)
}
}
}(backend)
}
}
// startPeriodicCheck runs block height checks at regular intervals
func (bht *BlockHeightTracker) startPeriodicCheck() {
ticker := time.NewTicker(bht.checkInterval)
defer ticker.Stop()
for range ticker.C {
bht.checkBlockHeights()
} }
} }
@@ -386,6 +451,22 @@ func (bht *BlockHeightTracker) getBlockHeightStatus() map[string]uint64 {
return result return result
} }
// Stop gracefully shuts down the block height tracker.
//
// It broadcasts shutdown by closing stopChan (observed by the per-backend
// connection goroutines), then closes every live WebSocket connection under
// the tracker lock so that any blocked reads return promptly.
func (bht *BlockHeightTracker) Stop() {
	// Signal all goroutines watching stopChan to exit.
	close(bht.stopChan)

	bht.mu.Lock()
	defer bht.mu.Unlock()
	// Close each active connection so pending reads unblock.
	for backendName, wsConn := range bht.connections {
		if bht.enableDetailedLogs {
			log.Printf("Block height tracker: closing connection to %s", backendName)
		}
		wsConn.Close()
	}
}
func NewStatsCollector(summaryInterval time.Duration, hasSecondaryBackends bool) *StatsCollector { func NewStatsCollector(summaryInterval time.Duration, hasSecondaryBackends bool) *StatsCollector {
now := time.Now() now := time.Now()
sc := &StatsCollector{ sc := &StatsCollector{
@@ -1845,6 +1926,113 @@ func isStatefulMethod(method string) bool {
return statefulMethods[method] return statefulMethods[method]
} }
// expensiveMethods is the static set of JSON-RPC methods known to be
// computationally expensive. Hoisted to package level so the set is built
// once at init instead of being reallocated on every call.
var expensiveMethods = map[string]bool{
	// Debug methods - typically very expensive
	"debug_traceBlockByHash":            true,
	"debug_traceBlockByNumber":          true,
	"debug_traceCall":                   true,
	"debug_traceTransaction":            true,
	"debug_storageRangeAt":              true,
	"debug_getModifiedAccountsByHash":   true,
	"debug_getModifiedAccountsByNumber": true,
	// Trace methods - expensive computation
	"trace_block":                  true,
	"trace_call":                   true,
	"trace_callMany":               true,
	"trace_filter":                 true,
	"trace_get":                    true,
	"trace_rawTransaction":         true,
	"trace_replayBlockTransactions": true,
	"trace_replayTransaction":      true,
	"trace_transaction":            true,
	// VM trace variants - extremely expensive
	"trace_replayBlockTransactions#vmTrace": true,
	"trace_replayTransaction#vmTrace":       true,
}

// isExpensiveMethod returns true if the method is computationally expensive
// and should be offloaded to a secondary backend. Unknown methods (including
// the empty string) are reported as not expensive.
func isExpensiveMethod(method string) bool {
	return expensiveMethods[method]
}
// isExpensiveMethodByStats determines whether a method is expensive based on
// actual observed performance statistics rather than a hardcoded list.
//
// A method is classified as expensive when its best-case (minimum) latency on
// the primary backend is at least 5x the average latency across all
// sufficiently-sampled primary methods. Whenever there is not enough data —
// for the baseline or for the specific method — the method is conservatively
// reported as not expensive.
func (sc *StatsCollector) isExpensiveMethodByStats(method string) bool {
	sc.mu.Lock()
	defer sc.mu.Unlock()

	const (
		minSamplesForAnalysis        = 10
		expensiveThresholdMultiplier = 5.0
	)

	primaryStats, havePrimary := sc.backendMethodStats["primary"]
	if !havePrimary {
		// No primary data at all — cannot build a baseline.
		return false
	}

	// Aggregate durations across every primary method that has enough samples
	// to contribute to the baseline average.
	var aggregate []time.Duration
	sampledMethods := 0
	for _, samples := range primaryStats {
		if len(samples) >= minSamplesForAnalysis {
			aggregate = append(aggregate, samples...)
			sampledMethods++
		}
	}

	// Require a broad enough baseline (several methods, plenty of samples)
	// before classifying anything as expensive.
	if sampledMethods < 3 || len(aggregate) < minSamplesForAnalysis*3 {
		return false
	}

	// Overall average latency across all baselined primary methods.
	var sum time.Duration
	for _, d := range aggregate {
		sum += d
	}
	mean := sum / time.Duration(len(aggregate))

	// This specific method also needs enough samples of its own.
	samples := primaryStats[method]
	if len(samples) < minSamplesForAnalysis {
		return false
	}

	// Best-case (minimum) latency for this method.
	best := samples[0]
	for _, d := range samples[1:] {
		if d < best {
			best = d
		}
	}

	// Expensive when even the best case is >= 5x the overall average.
	threshold := time.Duration(float64(mean) * expensiveThresholdMultiplier)
	return best >= threshold
}
// hasAvailableSecondaryAtChainHead reports whether at least one secondary
// backend is currently synchronized (not lagging behind the primary) and can
// therefore service requests at the chain head. It returns false when block
// height tracking is disabled (nil tracker).
func hasAvailableSecondaryAtChainHead(backends []Backend, blockHeightTracker *BlockHeightTracker) bool {
	if blockHeightTracker == nil {
		return false
	}
	for i := range backends {
		if backends[i].Role != "secondary" {
			continue
		}
		if !blockHeightTracker.isSecondaryBehind(backends[i].Name) {
			return true
		}
	}
	return false
}
// flushingResponseWriter wraps http.ResponseWriter to flush after every write // flushingResponseWriter wraps http.ResponseWriter to flush after every write
type flushingResponseWriter struct { type flushingResponseWriter struct {
http.ResponseWriter http.ResponseWriter
@@ -1875,9 +2063,12 @@ func main() {
// Block height tracking configuration // Block height tracking configuration
enableBlockHeightTracking := getEnv("ENABLE_BLOCK_HEIGHT_TRACKING", "true") == "true" enableBlockHeightTracking := getEnv("ENABLE_BLOCK_HEIGHT_TRACKING", "true") == "true"
blockHeightCheckIntervalStr := getEnv("BLOCK_HEIGHT_CHECK_INTERVAL", "5") // Default 5 seconds blockHeightCheckIntervalStr := getEnv("BLOCK_HEIGHT_CHECK_INTERVAL", "5") // Default 5 seconds (unused for WebSocket)
maxBlockBehindStr := getEnv("MAX_BLOCKS_BEHIND", "1") // Default 1 block behind maxBlockBehindStr := getEnv("MAX_BLOCKS_BEHIND", "1") // Default 1 block behind
// Expensive method routing configuration
enableExpensiveMethodRouting := getEnv("ENABLE_EXPENSIVE_METHOD_ROUTING", "true") == "true"
summaryInterval, err := strconv.Atoi(summaryIntervalStr) summaryInterval, err := strconv.Atoi(summaryIntervalStr)
if err != nil { if err != nil {
log.Printf("Invalid SUMMARY_INTERVAL, using default of 60 seconds") log.Printf("Invalid SUMMARY_INTERVAL, using default of 60 seconds")
@@ -1936,6 +2127,9 @@ func main() {
if enableSecondaryProbing && secondaryBackendsStr != "" { if enableSecondaryProbing && secondaryBackendsStr != "" {
log.Printf("Secondary probing: enabled (interval: %ds, buffer: %dms)", probeInterval, minDelayBuffer) log.Printf("Secondary probing: enabled (interval: %ds, buffer: %dms)", probeInterval, minDelayBuffer)
} }
if enableExpensiveMethodRouting && secondaryBackendsStr != "" {
log.Printf("Expensive method routing: enabled (trace/debug calls prefer secondary backends)")
}
// Set up HTTP client with reasonable timeouts // Set up HTTP client with reasonable timeouts
client := &http.Client{ client := &http.Client{
@@ -1989,8 +2183,8 @@ func main() {
if blockHeightTracker == nil { if blockHeightTracker == nil {
log.Printf("Block height tracker initialization failed") log.Printf("Block height tracker initialization failed")
} else { } else {
log.Printf("Block height tracking: enabled (interval: %ds, max blocks behind: %d)", log.Printf("Block height tracking: enabled (WebSocket-based, max blocks behind: %d)",
blockHeightCheckInterval, maxBlocksBehind) maxBlocksBehind)
// Set the tracker in stats collector for display // Set the tracker in stats collector for display
statsCollector.SetBlockHeightTracker(blockHeightTracker) statsCollector.SetBlockHeightTracker(blockHeightTracker)
} }
@@ -2016,14 +2210,14 @@ func main() {
handleWebSocketRequest(w, r, backends, client, &upgrader, statsCollector) handleWebSocketRequest(w, r, backends, client, &upgrader, statsCollector)
} else { } else {
// Handle regular HTTP request // Handle regular HTTP request
handleRequest(w, r, backends, client, enableDetailedLogs == "true", statsCollector, secondaryProbe, blockHeightTracker) handleRequest(w, r, backends, client, enableDetailedLogs == "true", statsCollector, secondaryProbe, blockHeightTracker, enableExpensiveMethodRouting)
} }
}) })
log.Fatal(http.ListenAndServe(listenAddr, nil)) log.Fatal(http.ListenAndServe(listenAddr, nil))
} }
func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client, enableDetailedLogs bool, statsCollector *StatsCollector, secondaryProbe *SecondaryProbe, blockHeightTracker *BlockHeightTracker) { func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client, enableDetailedLogs bool, statsCollector *StatsCollector, secondaryProbe *SecondaryProbe, blockHeightTracker *BlockHeightTracker, enableExpensiveMethodRouting bool) {
startTime := time.Now() startTime := time.Now()
// Create a context that will cancel after 35 seconds (5s buffer over backend timeout) // Create a context that will cancel after 35 seconds (5s buffer over backend timeout)
@@ -2111,6 +2305,37 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
primaryResponseChan := make(chan struct{}, 1) // Signal when primary gets a response primaryResponseChan := make(chan struct{}, 1) // Signal when primary gets a response
primaryFailedFast := make(chan struct{}, 1) // Signal when primary fails immediately primaryFailedFast := make(chan struct{}, 1) // Signal when primary fails immediately
// Check if this is an expensive method that should be offloaded to secondary
var isExpensive bool
var hasExpensiveMethod bool
if batchInfo.IsBatch {
// Check if any method in the batch is expensive based on stats
for _, method := range batchInfo.Methods {
if statsCollector.isExpensiveMethodByStats(method) {
hasExpensiveMethod = true
if enableDetailedLogs {
log.Printf("Stats-based expensive method detected in batch: %s", method)
}
break
}
}
} else {
hasExpensiveMethod = statsCollector.isExpensiveMethodByStats(batchInfo.Methods[0])
if hasExpensiveMethod && enableDetailedLogs {
log.Printf("Stats-based expensive method detected: %s", batchInfo.Methods[0])
}
}
isExpensive = hasExpensiveMethod
// For expensive methods, prefer secondary backends if available and synchronized
var preferSecondary bool
if enableExpensiveMethodRouting && isExpensive && !isStateful && hasAvailableSecondaryAtChainHead(backends, blockHeightTracker) {
preferSecondary = true
if enableDetailedLogs {
log.Printf("Expensive method detected (%s), preferring secondary backends", displayMethod)
}
}
for _, backend := range backends { for _, backend := range backends {
// Skip secondary backends for stateful methods // Skip secondary backends for stateful methods
if isStateful && backend.Role != "primary" { if isStateful && backend.Role != "primary" {
@@ -2135,6 +2360,14 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
} }
} }
// For expensive methods, skip primary backend if we prefer secondary and have good ones
if preferSecondary && backend.Role == "primary" {
// Still add primary to the pool but with a delay to let secondaries try first
if enableDetailedLogs {
log.Printf("Delaying primary backend for expensive method %s to allow secondary backends priority", displayMethod)
}
}
wg.Add(1) wg.Add(1)
if backend.Role == "primary" { if backend.Role == "primary" {
primaryWg.Add(1) primaryWg.Add(1)
@@ -2155,8 +2388,11 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
firstBackendStartTime.Store(&t) firstBackendStartTime.Store(&t)
} }
// If this is a secondary backend, wait for p75 delay // Delay logic based on backend role and method type
if b.Role != "primary" { if b.Role != "primary" {
// Secondary backend: apply normal delay logic unless it's an expensive method
if !preferSecondary {
// Normal delay for secondary backends
delayTimer := time.NewTimer(secondaryDelay) delayTimer := time.NewTimer(secondaryDelay)
select { select {
case <-delayTimer.C: case <-delayTimer.C:
@@ -2180,6 +2416,26 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
log.Printf("Primary failed fast for %s, starting secondary immediately", displayMethod) log.Printf("Primary failed fast for %s, starting secondary immediately", displayMethod)
} }
} }
} else {
// Expensive method: secondary backends start immediately
if enableDetailedLogs {
log.Printf("Starting secondary backend %s immediately for expensive method %s", b.Name, displayMethod)
}
}
} else if preferSecondary {
// Primary backend: delay for expensive methods to give secondaries priority
expensiveMethodDelay := 50 * time.Millisecond // Give secondaries a head start
delayTimer := time.NewTimer(expensiveMethodDelay)
select {
case <-delayTimer.C:
// Timer expired, proceed with primary request
if enableDetailedLogs {
log.Printf("Starting primary backend after delay for expensive method %s", displayMethod)
}
case <-primaryFailedFast:
// If something went wrong, start immediately
delayTimer.Stop()
}
} }
// Create a new request (no longer using context for cancellation) // Create a new request (no longer using context for cancellation)