opus fixes this

This commit is contained in:
Para Dox
2025-05-29 21:35:22 +07:00
parent 6d46471536
commit 382db5bee1

View File

@@ -171,13 +171,14 @@ type StatsCollector struct {
appStartTime time.Time // Application start time (never reset)
intervalStartTime time.Time // Current interval start time (reset each interval)
summaryInterval time.Duration
methodCUPrices map[string]int // Map of method names to CU prices
totalCU int // Total CU earned
methodCU map[string]int // Track CU earned per method
historicalCU []CUDataPoint // Historical CU data for different time windows
hasSecondaryBackends bool // Track if secondary backends are configured
skippedSecondaryRequests int // Track how many secondary requests were skipped
secondaryProbe *SecondaryProbe // Reference to secondary probe
methodCUPrices map[string]int // Map of method names to CU prices
totalCU int // Total CU earned
methodCU map[string]int // Track CU earned per method
historicalCU []CUDataPoint // Historical CU data for different time windows
hasSecondaryBackends bool // Track if secondary backends are configured
skippedSecondaryRequests int // Track how many secondary requests were skipped
secondaryProbe *SecondaryProbe // Reference to secondary probe
chainHeadMonitor *ChainHeadMonitor // Reference to chain head monitor
}
// SecondaryProbe maintains latency information for secondary backends through active probing
@@ -197,6 +198,28 @@ type SecondaryProbe struct {
lastSuccessTime time.Time // Last time probes succeeded
}
// ChainHeadMonitor monitors chain heads of all backends via WebSocket subscriptions.
//
// A goroutine per backend (monitorBackend) records head updates and errors in
// chainHeads, and enabledBackends is recomputed from that data after every
// update. mu guards chainHeads, primaryChainID and enabledBackends.
type ChainHeadMonitor struct {
	mu                 sync.RWMutex
	backends           []Backend             // backends handed to NewChainHeadMonitor; one monitoring goroutine is started per entry
	chainHeads         map[string]*ChainHead // backend name -> chain head info
	primaryChainID     string                // Chain ID of primary backend (written when the backend with Role "primary" reports it)
	enabledBackends    map[string]bool       // Track which backends are enabled; this is what IsBackendHealthy reports
	wsDialer           *websocket.Dialer     // dialer used for every monitoring connection
	stopChan           chan struct{}         // startMonitoring and monitorBackend return once a receive on this channel succeeds
	enableDetailedLogs bool                  // when true, status changes and every received head are logged
}
// ChainHead tracks the current head of a backend.
//
// Entries are created either from a newHeads notification (IsHealthy true) or
// from a recorded failure (IsHealthy false, Error set). When a failure is
// recorded for an existing entry, the previous BlockNumber, BlockHash and
// ChainID values are left in place.
type ChainHead struct {
	BlockNumber uint64    // Current block number (parsed from the hex "number" field of the notification)
	BlockHash   string    // Current block hash
	ChainID     string    // Chain ID as returned by eth_chainId
	LastUpdate  time.Time // Last time we received an update (also refreshed when an error is recorded)
	IsHealthy   bool      // Whether this backend is healthy; cleared on errors and when no update arrived for 90 seconds
	Error       string    // Last error if any
}
func NewStatsCollector(summaryInterval time.Duration, hasSecondaryBackends bool) *StatsCollector {
now := time.Now()
sc := &StatsCollector{
@@ -231,6 +254,13 @@ func (sc *StatsCollector) SetSecondaryProbe(probe *SecondaryProbe) {
sc.secondaryProbe = probe
}
// SetChainHeadMonitor attaches a chain head monitor to the stats collector
// after the collector has been created. The reference is stored under the
// collector's mutex; printSummary only reports chain head status when the
// stored monitor is non-nil.
func (sc *StatsCollector) SetChainHeadMonitor(monitor *ChainHeadMonitor) {
	sc.mu.Lock()
	sc.chainHeadMonitor = monitor
	sc.mu.Unlock()
}
// NewSecondaryProbe creates a new secondary probe instance
func NewSecondaryProbe(backends []Backend, client *http.Client, probeInterval time.Duration,
minDelayBuffer time.Duration, probeMethods []string, enableDetailedLogs bool) *SecondaryProbe {
@@ -773,6 +803,54 @@ func (sc *StatsCollector) printSummary() {
sc.secondaryProbe.mu.RUnlock()
}
// Display chain head monitor information if available
if sc.chainHeadMonitor != nil {
fmt.Printf("\n--- Chain Head Monitor Status ---\n")
chainStatus := sc.chainHeadMonitor.GetStatus()
// Get primary block height for comparison
var primaryBlockHeight uint64
if primaryHead, exists := chainStatus["primary"]; exists && primaryHead.IsHealthy {
primaryBlockHeight = primaryHead.BlockNumber
}
// Sort backend names for consistent output
var backendNames []string
for name := range chainStatus {
backendNames = append(backendNames, name)
}
sort.Strings(backendNames)
for _, name := range backendNames {
head := chainStatus[name]
status := "healthy"
details := fmt.Sprintf("block %d, chain %s", head.BlockNumber, head.ChainID)
// Add block difference info for secondary backends
if name != "primary" && primaryBlockHeight > 0 && head.IsHealthy {
diff := int64(head.BlockNumber) - int64(primaryBlockHeight)
if diff > 0 {
details += fmt.Sprintf(" (+%d ahead)", diff)
} else if diff < 0 {
details += fmt.Sprintf(" (%d behind)", diff)
} else {
details += " (in sync)"
}
}
if !head.IsHealthy {
status = "unhealthy"
details = head.Error
} else if sc.chainHeadMonitor.IsBackendHealthy(name) {
status = "enabled"
} else {
status = "disabled"
}
fmt.Printf(" %s: %s (%s)\n", name, status, details)
}
}
if sc.hasSecondaryBackends && sc.skippedSecondaryRequests > 0 {
fmt.Printf("Skipped Secondary Requests: %d (%.1f%% of requests)\n",
sc.skippedSecondaryRequests,
@@ -1708,6 +1786,16 @@ func main() {
}
}
// Initialize chain head monitor
var chainHeadMonitor *ChainHeadMonitor
if len(backends) > 1 { // Only create if we have more than just primary
chainHeadMonitor = NewChainHeadMonitor(backends, enableDetailedLogs == "true")
log.Printf("Chain head monitoring: enabled")
// Set the monitor in stats collector for display
statsCollector.SetChainHeadMonitor(chainHeadMonitor)
}
// Configure websocket upgrader with larger buffer sizes
// 20MB frame size and 50MB message size
const (
@@ -1728,14 +1816,14 @@ func main() {
handleWebSocketRequest(w, r, backends, client, &upgrader, statsCollector)
} else {
// Handle regular HTTP request
handleRequest(w, r, backends, client, enableDetailedLogs == "true", statsCollector, secondaryProbe)
handleRequest(w, r, backends, client, enableDetailedLogs == "true", statsCollector, secondaryProbe, chainHeadMonitor)
}
})
log.Fatal(http.ListenAndServe(listenAddr, nil))
}
func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client, enableDetailedLogs bool, statsCollector *StatsCollector, secondaryProbe *SecondaryProbe) {
func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client, enableDetailedLogs bool, statsCollector *StatsCollector, secondaryProbe *SecondaryProbe, chainHeadMonitor *ChainHeadMonitor) {
startTime := time.Now()
// Create a context that will cancel after 35 seconds (5s buffer over backend timeout)
@@ -1829,6 +1917,14 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
continue
}
// Skip unhealthy secondary backends
if backend.Role == "secondary" && chainHeadMonitor != nil && !chainHeadMonitor.IsBackendHealthy(backend.Name) {
if enableDetailedLogs {
log.Printf("Skipping unhealthy secondary backend %s for %s", backend.Name, displayMethod)
}
continue
}
wg.Add(1)
if backend.Role == "primary" {
primaryWg.Add(1)
@@ -1945,6 +2041,22 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
Method: displayMethod,
}
// CRITICAL FIX: Only allow secondary backends to win if they have successful responses
if b.Role == "secondary" && resp.StatusCode >= 400 {
// Secondary returned an error - DO NOT let it win the race
if enableDetailedLogs {
log.Printf("Secondary backend %s returned error status %d for %s - ignoring",
b.Name, resp.StatusCode, displayMethod)
}
// Still need to drain and close the body
go func() {
defer resp.Body.Close()
io.Copy(io.Discard, resp.Body)
}()
return
}
// Try to be the first to respond
if responseHandled.CompareAndSwap(false, true) {
responseChan <- struct {
@@ -2367,3 +2479,403 @@ func getEnv(key, fallback string) string {
}
return fallback
}
// NewChainHeadMonitor builds a chain head monitor for the given backends and
// immediately launches its background monitoring goroutine.
//
// The monitor starts with empty chain head and enabled-backend maps, so
// IsBackendHealthy reports false for every backend other than "primary" until
// status updates have populated them.
func NewChainHeadMonitor(backends []Backend, enableDetailedLogs bool) *ChainHeadMonitor {
	dialer := &websocket.Dialer{
		ReadBufferSize:  1 << 20, // 1MB
		WriteBufferSize: 1 << 20, // 1MB
	}

	m := &ChainHeadMonitor{
		backends:           backends,
		chainHeads:         map[string]*ChainHead{},
		enabledBackends:    map[string]bool{},
		wsDialer:           dialer,
		stopChan:           make(chan struct{}),
		enableDetailedLogs: enableDetailedLogs,
	}

	// Monitoring runs in the background until stopChan fires.
	go m.startMonitoring()

	return m
}
// IsBackendHealthy reports whether requests may be routed to the named
// backend. The backend named "primary" is always reported as healthy; any
// other backend is healthy only if the monitor currently has it marked as
// enabled. Backends the monitor has never seen are reported as unhealthy.
func (m *ChainHeadMonitor) IsBackendHealthy(backendName string) bool {
	m.mu.RLock()
	defer m.mu.RUnlock()

	// Primary is never excluded from request routing.
	if backendName == "primary" {
		return true
	}

	// A missing key yields false, which is exactly the answer for an
	// unknown backend.
	return m.enabledBackends[backendName]
}
// startMonitoring launches one monitoring goroutine per backend and then runs
// the periodic health check every 10 seconds until stopChan fires.
func (m *ChainHeadMonitor) startMonitoring() {
	// Give the backends a moment to come up before dialing them.
	time.Sleep(2 * time.Second)

	for i := range m.backends {
		go m.monitorBackend(m.backends[i])
	}

	healthTicker := time.NewTicker(10 * time.Second)
	defer healthTicker.Stop()

	for {
		select {
		case <-m.stopChan:
			return
		case <-healthTicker.C:
			m.checkBackendHealth()
		}
	}
}
// monitorBackend keeps a WebSocket newHeads subscription to a single backend
// alive until stopChan fires.
//
// Each iteration dials the backend, queries its chain ID, subscribes to
// newHeads and then blocks in readNewHeads until the connection fails. Every
// failure is recorded through updateBackendStatus and followed by a delay
// before the next attempt: dial failures back off exponentially from 1s up to
// 30s, while failures after a successful dial retry after 1s because the
// backoff is reset as soon as a connection is established.
func (m *ChainHeadMonitor) monitorBackend(backend Backend) {
	// Upper bound on how long we wait for the reply to eth_subscribe.
	const subscribeReplyTimeout = 5 * time.Second

	// The backend URL never changes, so derive the WebSocket URL once.
	wsURL := strings.Replace(backend.URL, "http://", "ws://", 1)
	wsURL = strings.Replace(wsURL, "https://", "wss://", 1)

	backoffDelay := time.Second

	for {
		select {
		case <-m.stopChan:
			return
		default:
		}

		// Connect to WebSocket
		conn, _, err := m.wsDialer.Dial(wsURL, nil)
		if err != nil {
			m.updateBackendStatus(backend.Name, nil, fmt.Sprintf("WebSocket connection failed: %v", err))
			time.Sleep(backoffDelay)
			backoffDelay = min(backoffDelay*2, 30*time.Second)
			continue
		}

		if m.enableDetailedLogs {
			log.Printf("Connected to %s WebSocket for chain head monitoring", backend.Name)
		}

		// Reset backoff on successful connection
		backoffDelay = time.Second

		// Get chain ID first
		chainID, err := m.getChainID(conn, backend.Name)
		if err != nil {
			conn.Close()
			m.updateBackendStatus(backend.Name, nil, fmt.Sprintf("Failed to get chain ID: %v", err))
			time.Sleep(backoffDelay)
			continue
		}

		// Store chain ID (especially important for primary)
		if backend.Role == "primary" {
			m.mu.Lock()
			m.primaryChainID = chainID
			m.mu.Unlock()
		}

		// Subscribe to new heads
		subscribeMsg := json.RawMessage(`{"jsonrpc":"2.0","method":"eth_subscribe","params":["newHeads"],"id":1}`)
		err = conn.WriteMessage(websocket.TextMessage, subscribeMsg)
		if err != nil {
			conn.Close()
			m.updateBackendStatus(backend.Name, nil, fmt.Sprintf("Failed to subscribe: %v", err))
			time.Sleep(backoffDelay)
			continue
		}

		// Read subscription response. getChainID clears the read deadline
		// before returning, so bound this read explicitly; otherwise a
		// backend that accepts the subscribe request but never answers would
		// block this goroutine forever and it would never reconnect.
		conn.SetReadDeadline(time.Now().Add(subscribeReplyTimeout))
		_, msg, err := conn.ReadMessage()
		if err != nil {
			conn.Close()
			m.updateBackendStatus(backend.Name, nil, fmt.Sprintf("Failed to read subscription response: %v", err))
			time.Sleep(backoffDelay)
			continue
		}

		var subResponse struct {
			Result string `json:"result"`
			Error  *struct {
				Message string `json:"message"`
			} `json:"error"`
		}

		if err := json.Unmarshal(msg, &subResponse); err != nil || subResponse.Error != nil {
			conn.Close()
			// Prefer the backend's own error message when it sent one.
			errMsg := "Subscription failed"
			if subResponse.Error != nil {
				errMsg = subResponse.Error.Message
			}
			m.updateBackendStatus(backend.Name, nil, errMsg)
			time.Sleep(backoffDelay)
			continue
		}

		// Read new head notifications; this only returns once the connection
		// has failed (readNewHeads installs its own read deadline).
		m.readNewHeads(conn, backend.Name, chainID)

		conn.Close()
		time.Sleep(backoffDelay)
	}
}
// getChainID asks the backend on the other end of conn for its chain ID.
//
// It sends an eth_chainId JSON-RPC request and waits up to five seconds for a
// single reply; the read deadline is cleared again before returning. The
// result string is returned exactly as the backend sent it. backendName is
// currently unused.
//
// An error is returned when the write or the read fails, when the reply is
// not valid JSON, or when the reply carries a JSON-RPC error object.
func (m *ChainHeadMonitor) getChainID(conn *websocket.Conn, backendName string) (string, error) {
	request := []byte(`{"jsonrpc":"2.0","method":"eth_chainId","params":[],"id":"chainId"}`)
	writeErr := conn.WriteMessage(websocket.TextMessage, request)
	if writeErr != nil {
		return "", writeErr
	}

	// Wait a bounded time for the reply, then restore "no deadline".
	conn.SetReadDeadline(time.Now().Add(5 * time.Second))
	defer conn.SetReadDeadline(time.Time{})

	_, payload, readErr := conn.ReadMessage()
	if readErr != nil {
		return "", readErr
	}

	type rpcError struct {
		Message string `json:"message"`
	}
	var reply struct {
		Result string    `json:"result"`
		Error  *rpcError `json:"error"`
	}

	decodeErr := json.Unmarshal(payload, &reply)
	if decodeErr != nil {
		return "", decodeErr
	}

	if rpcErr := reply.Error; rpcErr != nil {
		return "", fmt.Errorf("RPC error: %s", rpcErr.Message)
	}

	return reply.Result, nil
}
// readNewHeads consumes newHeads notifications from conn until a read fails.
//
// Every message that carries a block number is turned into a healthy
// ChainHead (tagged with chainID) and recorded through updateBackendStatus.
// Messages that are not valid JSON, carry no block number, or carry an
// unparsable block number are skipped. A read error (including hitting the
// 60-second read deadline, which is renewed after each successful read) is
// recorded as the backend's error and ends the loop; the caller owns conn and
// is responsible for closing it.
func (m *ChainHeadMonitor) readNewHeads(conn *websocket.Conn, backendName string, chainID string) {
	const (
		readTimeout   = 60 * time.Second // max silence tolerated between messages
		hashLogPrefix = 8                // number of hash characters shown in logs
	)

	// Set long read deadline for subscriptions
	conn.SetReadDeadline(time.Now().Add(readTimeout))

	for {
		_, msg, err := conn.ReadMessage()
		if err != nil {
			m.updateBackendStatus(backendName, nil, fmt.Sprintf("WebSocket read error: %v", err))
			return
		}

		// Reset read deadline after successful read
		conn.SetReadDeadline(time.Now().Add(readTimeout))

		// Parse the subscription notification
		var notification struct {
			Params struct {
				Result struct {
					Number string `json:"number"` // Hex encoded block number
					Hash   string `json:"hash"`   // Block hash
				} `json:"result"`
			} `json:"params"`
		}

		if err := json.Unmarshal(msg, &notification); err != nil {
			continue // Skip malformed messages
		}

		result := notification.Params.Result
		if result.Number == "" {
			continue // Not a head notification
		}

		// Convert hex block number to uint64
		blockNumber, err := strconv.ParseUint(strings.TrimPrefix(result.Number, "0x"), 16, 64)
		if err != nil {
			continue
		}

		head := &ChainHead{
			BlockNumber: blockNumber,
			BlockHash:   result.Hash,
			ChainID:     chainID,
			LastUpdate:  time.Now(),
			IsHealthy:   true,
		}

		m.updateBackendStatus(backendName, head, "")

		if m.enableDetailedLogs {
			// The hash comes straight from the backend and may be missing or
			// shorter than the prefix we display; slicing it unconditionally
			// would panic and take down the monitoring goroutine.
			hashPrefix := result.Hash
			if len(hashPrefix) > hashLogPrefix {
				hashPrefix = hashPrefix[:hashLogPrefix]
			}
			log.Printf("Backend %s at block %d (hash: %s...)",
				backendName, blockNumber, hashPrefix)
		}
	}
}
// updateBackendStatus records the latest observation for a backend and then
// recomputes which backends are enabled.
//
// A non-nil head replaces the stored entry for backendName. Otherwise, a
// non-empty errorMsg marks the backend's entry as unhealthy (creating the
// entry if necessary), stores the message and refreshes LastUpdate. When head
// is nil and errorMsg is empty, only the enabled status is recomputed.
func (m *ChainHeadMonitor) updateBackendStatus(backendName string, head *ChainHead, errorMsg string) {
	m.mu.Lock()
	defer m.mu.Unlock()

	switch {
	case head != nil:
		m.chainHeads[backendName] = head

	case errorMsg != "":
		entry, known := m.chainHeads[backendName]
		if !known {
			// First thing we learn about this backend is a failure.
			entry = &ChainHead{}
			m.chainHeads[backendName] = entry
		}
		entry.IsHealthy = false
		entry.Error = errorMsg
		entry.LastUpdate = time.Now()
	}

	// Caller-side lock is already held, as updateEnabledStatus expects.
	m.updateEnabledStatus()
}
// updateEnabledStatus recomputes enabledBackends from the current chain head
// data. It does not take the mutex itself; the callers in this file invoke it
// while holding m.mu.
//
// Backends with Role "primary" are always enabled. For every other backend:
//   - while the "primary" head is missing or unhealthy (e.g., during
//     restarts), the backend is enabled exactly when its own head is healthy;
//   - otherwise the backend is enabled only if it is healthy, reports the same
//     chain ID as the primary, and is at the primary's block height or ahead.
func (m *ChainHeadMonitor) updateEnabledStatus() {
	verbose := m.enableDetailedLogs

	primaryHead, havePrimary := m.chainHeads["primary"]
	primaryUsable := havePrimary && primaryHead.IsHealthy

	if !primaryUsable && verbose {
		log.Printf("Primary backend not healthy/available - enabling all healthy secondary backends")
	}

	for _, backend := range m.backends {
		name := backend.Name

		// Always mark primary as enabled for routing.
		if backend.Role == "primary" {
			m.enabledBackends[name] = true
			continue
		}

		head, known := m.chainHeads[name]
		healthy := known && head.IsHealthy

		// Without a usable primary there is nothing to compare against, so
		// fall back to the backend's own health.
		if !primaryUsable {
			m.enabledBackends[name] = healthy
			if healthy && verbose {
				log.Printf("Backend %s enabled (primary unavailable)", name)
			}
			continue
		}

		switch {
		case !healthy:
			m.enabledBackends[name] = false
			if verbose {
				log.Printf("Backend %s disabled: not healthy", name)
			}

		case head.ChainID != primaryHead.ChainID:
			m.enabledBackends[name] = false
			if verbose {
				log.Printf("Backend %s disabled: wrong chain ID (got %s, want %s)",
					name, head.ChainID, primaryHead.ChainID)
			}

		case head.BlockNumber < primaryHead.BlockNumber:
			// STRICT RULE: a secondary that lags the primary is never used.
			m.enabledBackends[name] = false
			if verbose {
				log.Printf("Backend %s disabled: behind primary (at block %d, primary at %d)",
					name, head.BlockNumber, primaryHead.BlockNumber)
			}

		default:
			// Same height as the primary, or ahead of it.
			m.enabledBackends[name] = true
			if !verbose {
				continue
			}
			if head.BlockNumber > primaryHead.BlockNumber {
				log.Printf("Backend %s enabled: ahead of primary (at block %d, primary at %d)",
					name, head.BlockNumber, primaryHead.BlockNumber)
			} else {
				log.Printf("Backend %s enabled: at same block as primary (%d)",
					name, head.BlockNumber)
			}
		}
	}
}
// checkBackendHealth is the periodic health pass: it marks every chain head
// that has not been updated for more than 90 seconds as unhealthy, recomputes
// the enabled status of all backends and, when detailed logs are on, logs the
// resulting state of each backend.
func (m *ChainHeadMonitor) checkBackendHealth() {
	m.mu.Lock()
	defer m.mu.Unlock()

	const staleAfter = 90 * time.Second

	now := time.Now()
	for _, head := range m.chainHeads {
		if now.Sub(head.LastUpdate) <= staleAfter {
			continue
		}
		// No news for too long: treat the backend as down.
		head.IsHealthy = false
		head.Error = "No recent updates"
	}

	m.updateEnabledStatus()

	if !m.enableDetailedLogs {
		return
	}

	log.Printf("Chain head monitor status:")
	for name, enabled := range m.enabledBackends {
		state := "disabled"
		if enabled {
			state = "enabled"
		}

		head, known := m.chainHeads[name]
		switch {
		case !known:
			log.Printf(" %s: %s, no data", name, state)
		case head.IsHealthy:
			log.Printf(" %s: %s, block %d, chain %s", name, state, head.BlockNumber, head.ChainID)
		default:
			log.Printf(" %s: %s, error: %s", name, state, head.Error)
		}
	}
}
// GetStatus returns a snapshot of the chain head information of all backends,
// keyed by backend name. The ChainHead values are copies taken under the read
// lock, so the caller may keep and inspect them without holding the monitor's
// mutex.
func (m *ChainHeadMonitor) GetStatus() map[string]ChainHead {
	m.mu.RLock()
	defer m.mu.RUnlock()

	snapshot := make(map[string]ChainHead, len(m.chainHeads))
	for backendName, headPtr := range m.chainHeads {
		snapshot[backendName] = *headPtr
	}

	return snapshot
}