Files
ethereum-rpc-docker/benchmark-proxy/main.go
2025-05-01 15:44:39 +07:00

697 lines
20 KiB
Go

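// Command benchmark-proxy is a JSON-RPC proxy for HTTP and WebSocket traffic.
// It forwards every request to a primary backend and to any configured
// secondary backends, returns the primary's response to the client, and
// periodically prints latency and compute-unit statistics.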
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/gorilla/websocket"
)
// JSONRPCRequest is a minimal view of a JSON-RPC request body. Only the
// "method" member is decoded; every other member of the request is ignored.
type JSONRPCRequest struct {
	// Method is the JSON-RPC method name, for example "eth_call".
	Method string `json:"method"`
}
// Backend describes one upstream endpoint that requests are forwarded to.
type Backend struct {
	// URL is the endpoint address, Name is the label used in statistics and
	// log lines, and Role is either "primary" or "secondary".
	URL, Name, Role string
}
// ResponseStats records the outcome of forwarding one HTTP request to one
// backend.
type ResponseStats struct {
	// Backend is the name of the backend that served the request.
	Backend string
	// StatusCode is the HTTP status returned by the backend; it stays zero
	// when the request failed.
	StatusCode int
	// Duration is the time spent sending the request and waiting for the
	// backend's response.
	Duration time.Duration
	// Error is non-nil when the request could not be built or sent.
	Error error
	// Method is the JSON-RPC method of the request, or "unknown" when it
	// could not be parsed from the body.
	Method string
}
// WebSocketStats describes a single WebSocket connection attempt to a backend.
type WebSocketStats struct {
	// Backend is the name of the backend that was dialed.
	Backend string
	// Error is the dial error, or nil when the connection was established.
	Error error
	// ConnectTime is how long the dial took.
	ConnectTime time.Duration
	// IsActive is true once the backend connection has been established.
	IsActive bool
	// MessagesSent and MessagesReceived are message counters.
	// NOTE(review): nothing in the code visible in this file updates them.
	MessagesSent, MessagesReceived int
}
// StatsCollector accumulates request and connection statistics and prints a
// summary of them at a fixed interval. All fields are guarded by mu.
type StatsCollector struct {
	mu sync.Mutex

	// requestStats holds the per-backend results of recent HTTP requests.
	requestStats []ResponseStats
	// methodStats maps a JSON-RPC method to the durations of its successful
	// primary-backend requests.
	methodStats map[string][]time.Duration
	// totalRequests counts AddStats calls; errorCount counts failed backend
	// requests and failed WebSocket connections.
	totalRequests, errorCount int

	// wsConnections holds recent WebSocket connection attempts.
	wsConnections []WebSocketStats
	// totalWsConnections counts every recorded WebSocket connection attempt.
	totalWsConnections int

	// startTime is when the collector was created; summaryInterval is the
	// time between printed summaries.
	startTime       time.Time
	summaryInterval time.Duration

	// methodCUPrices maps a method name to its compute-unit (CU) price.
	methodCUPrices map[string]int
	// totalCU is the sum of CU earned across all methods.
	totalCU int
	// methodCU is the CU earned per method.
	methodCU map[string]int
}
// NewStatsCollector returns a collector that prints a summary every
// summaryInterval. The summary loop is started in a background goroutine
// before the collector is returned.
func NewStatsCollector(summaryInterval time.Duration) *StatsCollector {
	collector := new(StatsCollector)
	collector.requestStats = make([]ResponseStats, 0, 1000)
	collector.methodStats = make(map[string][]time.Duration)
	collector.startTime = time.Now()
	collector.summaryInterval = summaryInterval
	collector.methodCUPrices = initCUPrices()
	collector.methodCU = make(map[string]int)

	// The summary loop runs for as long as the process does.
	go collector.periodicSummary()

	return collector
}
// initCUPrices returns a fresh table mapping JSON-RPC method names to their
// compute-unit (CU) price. Keys with a "#" suffix (for example
// "eth_getBlockByHash#full") are priced separately from the plain method name.
func initCUPrices() map[string]int {
	prices := map[string]int{
		// debug namespace
		"debug_traceBlockByHash": 90,
		"debug_traceBlockByNumber": 90,
		"debug_traceCall": 90,
		"debug_traceTransaction": 90,

		// eth namespace
		"eth_accounts": 0,
		"eth_blockNumber": 10,
		"eth_call": 21,
		"eth_chainId": 0,
		"eth_coinbase": 0,
		"eth_createAccessList": 30,
		"eth_estimateGas": 60,
		"eth_feeHistory": 15,
		"eth_gasPrice": 15,
		"eth_getBalance": 11,
		"eth_getBlockByHash": 21,
		"eth_getBlockByHash#full": 60,
		"eth_getBlockByNumber": 24,
		"eth_getBlockByNumber#full": 60,
		"eth_getBlockReceipts": 80,
		"eth_getBlockTransactionCountByHash": 15,
		"eth_getBlockTransactionCountByNumber": 11,
		"eth_getCode": 24,
		"eth_getFilterChanges": 20,
		"eth_getFilterLogs": 60,
		"eth_getLogs": 60,
		"eth_getProof": 11,
		"eth_getStorageAt": 14,
		"eth_getTransactionByBlockHashAndIndex": 19,
		"eth_getTransactionByBlockNumberAndIndex": 13,
		"eth_getTransactionByHash": 11,
		"eth_getTransactionCount": 11,
		"eth_getTransactionReceipt": 30,
		"eth_getUncleByBlockHashAndIndex": 15,
		"eth_getUncleByBlockNumberAndIndex": 15,
		"eth_getUncleCountByBlockHash": 15,
		"eth_getUncleCountByBlockNumber": 15,
		"eth_hashrate": 0,
		"eth_maxPriorityFeePerGas": 16,
		"eth_mining": 0,
		"eth_newBlockFilter": 20,
		"eth_newFilter": 20,
		"eth_newPendingTransactionFilter": 20,
		"eth_protocolVersion": 0,
		"eth_sendRawTransaction": 90,
		"eth_syncing": 0,
		"eth_subscribe": 10,
		"eth_subscription": 25, // For "Notifications from the events you've subscribed to"
		"eth_uninstallFilter": 10,
		"eth_unsubscribe": 10,

		// net namespace
		"net_listening": 0,
		"net_peerCount": 0,
		"net_version": 0,

		// trace namespace
		"trace_block": 90,
		"trace_call": 60,
		"trace_callMany": 90,
		"trace_filter": 75,
		"trace_get": 20,
		"trace_rawTransaction": 75,
		"trace_replayBlockTransactions": 90,
		"trace_replayBlockTransactions#vmTrace": 300,
		"trace_replayTransaction": 90,
		"trace_replayTransaction#vmTrace": 300,
		"trace_transaction": 90,

		// txpool namespace
		"txpool_content": 1000,

		// web3 namespace
		"web3_clientVersion": 0,
		"web3_sha3": 10,

		// bor namespace
		"bor_getAuthor": 10,
		"bor_getCurrentProposer": 10,
		"bor_getCurrentValidators": 10,
		"bor_getRootHash": 10,
		"bor_getSignersAtHash": 10,
	}
	return prices
}
// AddStats records the per-backend results of one proxied HTTP request.
//
// Every entry is kept in the request history and every failed entry bumps the
// error counter. Successful entries from the backend named "primary" also
// feed the per-method latency samples and the compute-unit totals. Each call
// counts as one request, whatever the number of entries.
//
// totalDuration is accepted for the caller's convenience and is not used.
func (sc *StatsCollector) AddStats(stats []ResponseStats, totalDuration time.Duration) {
	sc.mu.Lock()
	defer sc.mu.Unlock()

	sc.totalRequests++
	sc.requestStats = append(sc.requestStats, stats...)

	for i := range stats {
		entry := &stats[i]
		if entry.Error != nil {
			sc.errorCount++
			continue
		}
		if entry.Backend != "primary" {
			continue
		}

		samples, seen := sc.methodStats[entry.Method]
		if !seen {
			samples = make([]time.Duration, 0, 100)
		}
		sc.methodStats[entry.Method] = append(samples, entry.Duration)

		// Methods missing from the price table cost zero CU.
		price := sc.methodCUPrices[entry.Method]
		sc.totalCU += price
		sc.methodCU[entry.Method] += price
	}
}
// AddWebSocketStats records one WebSocket connection attempt. A non-nil
// stats.Error also counts towards the collector's error total.
func (sc *StatsCollector) AddWebSocketStats(stats WebSocketStats) {
	sc.mu.Lock()
	defer sc.mu.Unlock()

	if failed := stats.Error != nil; failed {
		sc.errorCount++
	}
	sc.totalWsConnections++
	sc.wsConnections = append(sc.wsConnections, stats)
}
// periodicSummary prints a summary every sc.summaryInterval. It blocks for as
// long as the ticker runs, so it is meant to be started in its own goroutine.
//
// A non-positive interval disables the summaries: time.NewTicker panics on
// such a value, which would otherwise take the whole process down from a
// background goroutine.
func (sc *StatsCollector) periodicSummary() {
	if sc.summaryInterval <= 0 {
		log.Printf("Periodic summaries disabled: non-positive interval %s", sc.summaryInterval)
		return
	}

	ticker := time.NewTicker(sc.summaryInterval)
	defer ticker.Stop()

	for range ticker.C {
		sc.printSummary()
	}
}
// printSummary writes the current statistics to standard output and then
// trims the stored samples so memory use stays bounded.
//
// The report contains the uptime, request and connection totals, the error
// rate, the compute units earned, latency statistics for the primary backend,
// and one line per JSON-RPC method. The collector's mutex is held for the
// whole call.
func (sc *StatsCollector) printSummary() {
	sc.mu.Lock()
	defer sc.mu.Unlock()

	// sortedCopy returns the samples in ascending order without touching the
	// caller's slice, so stored samples keep their arrival order.
	sortedCopy := func(samples []time.Duration) []time.Duration {
		sorted := make([]time.Duration, len(samples))
		copy(sorted, samples)
		sort.Slice(sorted, func(i, j int) bool {
			return sorted[i] < sorted[j]
		})
		return sorted
	}
	// percentile picks the pct-th percentile from a non-empty ascending slice.
	percentile := func(sorted []time.Duration, pct int) time.Duration {
		idx := len(sorted) * pct / 100
		if idx >= len(sorted) {
			idx = len(sorted) - 1
		}
		return sorted[idx]
	}
	// mean averages a non-empty slice of samples.
	mean := func(samples []time.Duration) time.Duration {
		var sum time.Duration
		for _, d := range samples {
			sum += d
		}
		return sum / time.Duration(len(samples))
	}

	uptime := time.Since(sc.startTime)
	fmt.Printf("\n=== BENCHMARK PROXY SUMMARY ===\n")
	fmt.Printf("Uptime: %s\n", uptime.Round(time.Second))
	fmt.Printf("Total HTTP Requests: %d\n", sc.totalRequests)
	fmt.Printf("Total WebSocket Connections: %d\n", sc.totalWsConnections)

	// Before any traffic the denominator is zero; report 0% instead of NaN.
	errorRate := 0.0
	if total := sc.totalRequests + sc.totalWsConnections; total > 0 {
		errorRate = float64(sc.errorCount) / float64(total) * 100
	}
	fmt.Printf("Error Rate: %.2f%%\n", errorRate)
	fmt.Printf("Total Compute Units Earned: %d CU\n", sc.totalCU)

	// Response time statistics for successful primary-backend requests.
	var primaryDurations []time.Duration
	for _, stat := range sc.requestStats {
		if stat.Backend == "primary" && stat.Error == nil {
			primaryDurations = append(primaryDurations, stat.Duration)
		}
	}
	if len(primaryDurations) > 0 {
		sorted := sortedCopy(primaryDurations)
		fmt.Printf("\nPrimary Backend Response Times:\n")
		fmt.Printf(" Min: %s\n", sorted[0])
		fmt.Printf(" Avg: %s\n", mean(sorted))
		fmt.Printf(" Max: %s\n", sorted[len(sorted)-1])
		fmt.Printf(" p50: %s\n", percentile(sorted, 50))
		fmt.Printf(" p90: %s\n", percentile(sorted, 90))
		fmt.Printf(" p99: %s\n", percentile(sorted, 99))
	}

	// Per-method statistics, in method-name order for stable output.
	if len(sc.methodStats) > 0 {
		fmt.Printf("\nPer-Method Statistics (Primary Backend):\n")
		methods := make([]string, 0, len(sc.methodStats))
		for method := range sc.methodStats {
			methods = append(methods, method)
		}
		sort.Strings(methods)

		for _, method := range methods {
			durations := sc.methodStats[method]
			if len(durations) == 0 {
				continue
			}
			// Sort a copy: sorting the stored slice in place would make the
			// trim below keep the slowest samples, not the newest ones.
			sorted := sortedCopy(durations)

			cuPrice := sc.methodCUPrices[method]
			cuEarned := sc.methodCU[method]
			fmt.Printf(" %-20s Count: %-5d Avg: %-10s Min: %-10s Max: %-10s p50: %-10s p90: %-10s p99: %-10s CU: %d x %d = %d\n",
				method, len(sorted), mean(sorted), sorted[0], sorted[len(sorted)-1],
				percentile(sorted, 50), percentile(sorted, 90), percentile(sorted, 99),
				cuPrice, len(sorted), cuEarned)
		}
	}
	fmt.Printf("================================\n\n")

	// Keep only the most recent 1000 entries of each collection so memory
	// does not grow without bound.
	if len(sc.requestStats) > 1000 {
		sc.requestStats = sc.requestStats[len(sc.requestStats)-1000:]
	}
	for method, durations := range sc.methodStats {
		if len(durations) > 1000 {
			sc.methodStats[method] = durations[len(durations)-1000:]
		}
	}
	if len(sc.wsConnections) > 1000 {
		sc.wsConnections = sc.wsConnections[len(sc.wsConnections)-1000:]
	}
}
// minInt returns the smaller of a and b.
func minInt(a, b int) int {
	if b <= a {
		return b
	}
	return a
}
// main reads the proxy configuration from the environment, builds the backend
// list and serves HTTP and WebSocket traffic until the listener fails.
//
// Environment variables:
//   - LISTEN_ADDR: address to listen on (default ":8080")
//   - PRIMARY_BACKEND: URL of the primary backend (default "http://localhost:8545")
//   - SECONDARY_BACKENDS: comma-separated secondary backend URLs (default none)
//   - SUMMARY_INTERVAL: seconds between printed summaries (default 60)
func main() {
	listenAddr := getEnv("LISTEN_ADDR", ":8080")
	primaryBackend := getEnv("PRIMARY_BACKEND", "http://localhost:8545")
	secondaryBackendsStr := getEnv("SECONDARY_BACKENDS", "")
	summaryIntervalStr := getEnv("SUMMARY_INTERVAL", "60") // Default 60 seconds

	// Zero and negative values parse cleanly but would make the summary
	// ticker panic, so they are rejected together with unparsable input.
	summaryInterval, err := strconv.Atoi(summaryIntervalStr)
	if err != nil || summaryInterval <= 0 {
		log.Printf("Invalid SUMMARY_INTERVAL, using default of 60 seconds")
		summaryInterval = 60
	}

	// Create stats collector for periodic summaries
	statsCollector := NewStatsCollector(time.Duration(summaryInterval) * time.Second)

	// The primary always comes first; secondaries are numbered from 1.
	backends := []Backend{{
		URL:  primaryBackend,
		Name: "primary",
		Role: "primary",
	}}
	for _, entry := range strings.Split(secondaryBackendsStr, ",") {
		backendURL := strings.TrimSpace(entry)
		if backendURL == "" {
			// Skip blanks such as a trailing comma; an empty URL would fail
			// on every request and distort the error rate.
			continue
		}
		backends = append(backends, Backend{
			URL:  backendURL,
			Name: fmt.Sprintf("secondary-%d", len(backends)),
			Role: "secondary",
		})
	}

	log.Printf("Starting benchmark proxy on %s", listenAddr)
	log.Printf("Primary backend: %s", primaryBackend)
	log.Printf("Secondary backends: %s", secondaryBackendsStr)

	// Set up HTTP client with reasonable timeouts
	client := &http.Client{
		Timeout: 30 * time.Second,
		Transport: &http.Transport{
			MaxIdleConns:        100,
			MaxIdleConnsPerHost: 100,
			IdleConnTimeout:     90 * time.Second,
			DisableCompression:  true, // Typically JSON-RPC doesn't benefit from compression
		},
	}

	// Configure websocket upgrader with larger buffer sizes
	// 20MB frame size and 50MB message size
	// NOTE(review): gorilla/websocket documents these as I/O buffer sizes
	// that do not limit message size, so 20MB per direction per connection
	// may be far more than needed — confirm before changing.
	const (
		maxFrameSize   = 20 * 1024 * 1024 // 20MB
		maxMessageSize = 50 * 1024 * 1024 // 50MB
	)
	upgrader := websocket.Upgrader{
		ReadBufferSize:  maxFrameSize,
		WriteBufferSize: maxFrameSize,
		// Allow all origins
		CheckOrigin: func(r *http.Request) bool { return true },
	}

	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		if websocket.IsWebSocketUpgrade(r) {
			handleWebSocketRequest(w, r, backends, client, &upgrader, statsCollector)
			return
		}
		// Handle regular HTTP request
		stats := handleRequest(w, r, backends, client)
		statsCollector.AddStats(stats, 0) // The 0 is a placeholder, we're not using totalDuration in the collector
	})

	log.Fatal(http.ListenAndServe(listenAddr, nil))
}
// handleRequest forwards one HTTP request to every backend in parallel,
// relays the primary backend's response to the client and returns one
// ResponseStats entry per backend.
//
// The request body is read once and replayed to each backend together with
// the client's headers. The client receives the primary's status, headers and
// body, a 502 when the primary fails, or a 504 after 30 seconds without a
// primary result. The reply is flushed to the client before the function
// waits for the remaining backends, so slow secondaries do not delay it.
//
// It returns nil when the request body cannot be read.
func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client) []ResponseStats {
	startTime := time.Now()

	// Registered before the read so the body is closed on every path.
	defer r.Body.Close()
	body, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "Error reading request body", http.StatusBadRequest)
		return nil
	}

	// Try to parse the method from the JSON-RPC request
	method := "unknown"
	var jsonRPCReq JSONRPCRequest
	if err := json.Unmarshal(body, &jsonRPCReq); err == nil && jsonRPCReq.Method != "" {
		method = jsonRPCReq.Method
	}

	// Every backend goroutine sends exactly one stat, so a buffer of
	// len(backends) guarantees the sends never block.
	var wg sync.WaitGroup
	statsChan := make(chan ResponseStats, len(backends))
	primaryRespChan := make(chan *http.Response, 1)
	primaryErrChan := make(chan error, 1)

	for _, backend := range backends {
		wg.Add(1)
		go func(b Backend) {
			defer wg.Done()
			isPrimary := b.Role == "primary"

			backendReq, err := http.NewRequest(r.Method, b.URL, bytes.NewReader(body))
			if err != nil {
				statsChan <- ResponseStats{
					Backend: b.Name,
					Error:   err,
					Method:  method,
				}
				if isPrimary {
					primaryErrChan <- err
				}
				return
			}

			// Copy headers
			for name, values := range r.Header {
				for _, value := range values {
					backendReq.Header.Add(name, value)
				}
			}

			// The measured duration covers sending the request and waiting
			// for client.Do to return; reading the body is not included.
			reqStart := time.Now()
			resp, err := client.Do(backendReq)
			reqDuration := time.Since(reqStart)
			if err != nil {
				statsChan <- ResponseStats{
					Backend:  b.Name,
					Duration: reqDuration,
					Error:    err,
					Method:   method,
				}
				if isPrimary {
					primaryErrChan <- err
				}
				return
			}
			defer resp.Body.Close()

			statsChan <- ResponseStats{
				Backend:    b.Name,
				StatusCode: resp.StatusCode,
				Duration:   reqDuration,
				Method:     method,
			}

			if !isPrimary {
				// Drain the body so the transport can reuse the connection.
				// The content is not needed, so a read error is ignored.
				_, _ = io.Copy(io.Discard, resp.Body)
				return
			}

			// For primary, we need to return this response to the client
			respBody, err := io.ReadAll(resp.Body)
			if err != nil {
				primaryErrChan <- err
				return
			}
			// Hand over a copy whose body is fully buffered, because the
			// original body is closed when this goroutine returns.
			primaryResp := *resp
			primaryResp.Body = io.NopCloser(bytes.NewReader(respBody))
			primaryRespChan <- &primaryResp
		}(backend)
	}

	// Wait for primary response
	select {
	case primaryResp := <-primaryRespChan:
		for name, values := range primaryResp.Header {
			for _, value := range values {
				w.Header().Add(name, value)
			}
		}
		w.WriteHeader(primaryResp.StatusCode)
		if _, err := io.Copy(w, primaryResp.Body); err != nil {
			log.Printf("Error writing response to client: %v", err)
		}
	case err := <-primaryErrChan:
		http.Error(w, "Error from primary backend: "+err.Error(), http.StatusBadGateway)
	case <-time.After(30 * time.Second):
		http.Error(w, "Timeout waiting for primary backend", http.StatusGatewayTimeout)
	}

	// Push the reply out now. Otherwise a small response stays in the
	// server's write buffer until this handler returns, which only happens
	// after the slowest backend has finished.
	if flusher, ok := w.(http.Flusher); ok {
		flusher.Flush()
	}

	// Close the stats channel once every backend goroutine is done
	go func() {
		wg.Wait()
		close(statsChan)
	}()

	// Collect stats
	var stats []ResponseStats
	for stat := range statsChan {
		stats = append(stats, stat)
	}

	// Log response times
	totalDuration := time.Since(startTime)
	logResponseStats(totalDuration, stats)
	return stats
}
// logResponseStats prints one line describing a proxied request to standard
// output, in the form:
//
//	timestamp | total:<duration> | method:<name> | backend:<status>:<duration> | ...
//
// A backend that failed is printed as backend:error:<message>. The method is
// taken from the first entry that carries one and defaults to "unknown".
func logResponseStats(totalDuration time.Duration, stats []ResponseStats) {
	timestamp := time.Now().Format("2006-01-02 15:04:05.000")

	method := "unknown"
	for i := range stats {
		if name := stats[i].Method; name != "" {
			method = name
			break
		}
	}

	fields := make([]string, 0, len(stats)+3)
	fields = append(fields,
		timestamp,
		fmt.Sprintf("total:%s", totalDuration),
		fmt.Sprintf("method:%s", method),
	)

	for _, entry := range stats {
		switch {
		case entry.Error != nil:
			fields = append(fields, fmt.Sprintf("%s:error:%s", entry.Backend, entry.Error))
		default:
			fields = append(fields, fmt.Sprintf("%s:%d:%s", entry.Backend, entry.StatusCode, entry.Duration))
		}
	}

	fmt.Println(strings.Join(fields, " | "))
}
// handleWebSocketRequest upgrades the client to a WebSocket connection and
// proxies it to the backends.
//
// Every backend is dialed in parallel and each dial result is recorded in
// statsCollector. Messages flow in both directions between the client and the
// primary backend. Secondary backends are only connected and read from: they
// receive no client messages and whatever they send is discarded.
//
// The session ends when the client disconnects, when the primary connection
// fails or closes, or when the primary cannot be dialed. Ending the session
// closes every backend connection, so the function returns promptly instead
// of waiting for the secondaries to close on their own.
//
// httpClient is accepted for signature compatibility and is not used.
func handleWebSocketRequest(w http.ResponseWriter, r *http.Request, backends []Backend,
	httpClient *http.Client, upgrader *websocket.Upgrader,
	statsCollector *StatsCollector) {
	// Upgrade the client connection
	clientConn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Printf("Failed to upgrade client connection: %v", err)
		return
	}
	defer clientConn.Close()

	// Set max message size for client connection
	clientConn.SetReadLimit(50 * 1024 * 1024) // 50MB message size limit

	// done is closed exactly once, when the session is over. Every backend
	// connection has a watcher that closes it at that point, which unblocks
	// the read loops below.
	done := make(chan struct{})
	var endOnce sync.Once
	endSession := func() {
		endOnce.Do(func() { close(done) })
	}
	// Guarantees the watchers are released even without a primary backend.
	defer endSession()

	// Connect to all backends
	var wg sync.WaitGroup
	for _, backend := range backends {
		wg.Add(1)
		go func(b Backend) {
			defer wg.Done()

			isPrimary := b.Role == "primary"
			if isPrimary {
				// Nothing is left to proxy once the primary is gone,
				// including when it could not be dialed at all.
				defer endSession()
			}

			// Create backend URL with ws/wss instead of http/https
			backendURL := strings.Replace(b.URL, "http://", "ws://", 1)
			backendURL = strings.Replace(backendURL, "https://", "wss://", 1)

			// Forward only Origin and User-Agent; the dialer supplies the
			// WebSocket handshake headers itself.
			header := http.Header{}
			if origin := r.Header.Get("Origin"); origin != "" {
				header.Set("Origin", origin)
			}
			if userAgent := r.Header.Get("User-Agent"); userAgent != "" {
				header.Set("User-Agent", userAgent)
			}

			startTime := time.Now()
			// NOTE(review): gorilla/websocket documents these as I/O buffer
			// sizes that do not limit message size, so 20MB per direction
			// per connection may be far more than needed — confirm before
			// changing.
			dialer := &websocket.Dialer{
				ReadBufferSize:  20 * 1024 * 1024, // 20MB
				WriteBufferSize: 20 * 1024 * 1024, // 20MB
			}
			backendConn, resp, err := dialer.Dial(backendURL, header)
			connectDuration := time.Since(startTime)

			stats := WebSocketStats{
				Backend:     b.Name,
				ConnectTime: connectDuration,
				IsActive:    false,
			}
			if err != nil {
				status := 0
				if resp != nil {
					status = resp.StatusCode
				}
				log.Printf("Failed to connect to backend %s: %v (status: %d)", b.Name, err, status)
				stats.Error = err
				statsCollector.AddWebSocketStats(stats)
				return
			}
			defer backendConn.Close()

			// Close this connection as soon as the session ends. Close is
			// documented as safe to call concurrently with other methods.
			go func() {
				<-done
				backendConn.Close()
			}()

			// Set max message size for backend connection
			backendConn.SetReadLimit(50 * 1024 * 1024) // 50MB message size limit

			stats.IsActive = true
			statsCollector.AddWebSocketStats(stats)

			if !isPrimary {
				// For secondary backends, just read and discard messages
				for {
					if _, _, err := backendConn.ReadMessage(); err != nil {
						log.Printf("Secondary backend %s connection closed: %v", b.Name, err)
						return
					}
				}
			}

			// Forward messages from client to primary backend
			go func() {
				// A client that went away, or a primary that cannot be
				// written to, ends the session for every backend.
				defer endSession()
				for {
					messageType, message, err := clientConn.ReadMessage()
					if err != nil {
						log.Printf("Error reading from client: %v", err)
						return
					}
					if err := backendConn.WriteMessage(messageType, message); err != nil {
						log.Printf("Error writing to primary backend: %v", err)
						return
					}
				}
			}()

			// Forward messages from primary backend to client
			for {
				messageType, message, err := backendConn.ReadMessage()
				if err != nil {
					log.Printf("Error reading from primary backend: %v", err)
					return
				}
				if err := clientConn.WriteMessage(messageType, message); err != nil {
					log.Printf("Error writing to client: %v", err)
					return
				}
			}
		}(backend)
	}

	// Wait for all connections to terminate
	wg.Wait()
}
// getEnv returns the value of the environment variable key, or fallback when
// the variable is not set. A variable that is set to the empty string is
// returned as "" rather than replaced by fallback.
func getEnv(key, fallback string) string {
	value, ok := os.LookupEnv(key)
	if !ok {
		return fallback
	}
	return value
}