more features
This commit is contained in:
@@ -4,7 +4,9 @@ package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
@@ -14,6 +16,7 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
@@ -904,6 +907,33 @@ func formatCUWithExtrapolation(cu int, isExtrapolated bool) string {
|
||||
return fmt.Sprintf("%d CU", cu)
|
||||
}
|
||||
|
||||
// GetPrimaryP50 calculates the current p50 latency for the primary backend
|
||||
func (sc *StatsCollector) GetPrimaryP50() time.Duration {
|
||||
sc.mu.Lock()
|
||||
defer sc.mu.Unlock()
|
||||
|
||||
// Collect primary backend durations
|
||||
var primaryDurations []time.Duration
|
||||
for _, stat := range sc.requestStats {
|
||||
if stat.Backend == "primary" && stat.Error == nil {
|
||||
primaryDurations = append(primaryDurations, stat.Duration)
|
||||
}
|
||||
}
|
||||
|
||||
// If we don't have enough data, return a sensible default
|
||||
if len(primaryDurations) < 10 {
|
||||
return 10 * time.Millisecond // Default to 10ms
|
||||
}
|
||||
|
||||
// Sort and find p50
|
||||
sort.Slice(primaryDurations, func(i, j int) bool {
|
||||
return primaryDurations[i] < primaryDurations[j]
|
||||
})
|
||||
|
||||
p50idx := len(primaryDurations) * 50 / 100
|
||||
return primaryDurations[p50idx]
|
||||
}
|
||||
|
||||
func main() {
|
||||
// Get configuration from environment variables
|
||||
listenAddr := getEnv("LISTEN_ADDR", ":8080")
|
||||
@@ -975,7 +1005,7 @@ func main() {
|
||||
handleWebSocketRequest(w, r, backends, client, &upgrader, statsCollector)
|
||||
} else {
|
||||
// Handle regular HTTP request
|
||||
stats := handleRequest(w, r, backends, client, enableDetailedLogs == "true")
|
||||
stats := handleRequest(w, r, backends, client, enableDetailedLogs == "true", statsCollector)
|
||||
statsCollector.AddStats(stats, 0) // The 0 is a placeholder, we're not using totalDuration in the collector
|
||||
}
|
||||
})
|
||||
@@ -983,7 +1013,7 @@ func main() {
|
||||
log.Fatal(http.ListenAndServe(listenAddr, nil))
|
||||
}
|
||||
|
||||
func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client, enableDetailedLogs bool) []ResponseStats {
|
||||
func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, client *http.Client, enableDetailedLogs bool, statsCollector *StatsCollector) []ResponseStats {
|
||||
startTime := time.Now()
|
||||
|
||||
// Read the entire request body
|
||||
@@ -1001,28 +1031,55 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
|
||||
method = jsonRPCReq.Method
|
||||
}
|
||||
|
||||
// Process backends in parallel
|
||||
// Get current p50 delay for primary backend
|
||||
p50Delay := statsCollector.GetPrimaryP50()
|
||||
|
||||
// Process backends with adaptive delay strategy
|
||||
var wg sync.WaitGroup
|
||||
statsChan := make(chan ResponseStats, len(backends))
|
||||
primaryRespChan := make(chan *http.Response, 1)
|
||||
primaryErrChan := make(chan error, 1)
|
||||
responseChan := make(chan struct {
|
||||
backend string
|
||||
resp *http.Response
|
||||
err error
|
||||
body []byte
|
||||
}, len(backends))
|
||||
|
||||
// Create a context that we can cancel once we get the first response
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Track if we've already sent a response
|
||||
var responseHandled atomic.Bool
|
||||
|
||||
for _, backend := range backends {
|
||||
wg.Add(1)
|
||||
go func(b Backend) {
|
||||
defer wg.Done()
|
||||
|
||||
// If this is a secondary backend, wait for p50 delay
|
||||
if b.Role != "primary" {
|
||||
select {
|
||||
case <-time.After(p50Delay):
|
||||
// Continue after delay
|
||||
case <-ctx.Done():
|
||||
// Primary already responded, skip secondary
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Check if response was already handled
|
||||
if responseHandled.Load() {
|
||||
return
|
||||
}
|
||||
|
||||
// Create a new request
|
||||
backendReq, err := http.NewRequest(r.Method, b.URL, bytes.NewReader(body))
|
||||
backendReq, err := http.NewRequestWithContext(ctx, r.Method, b.URL, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
statsChan <- ResponseStats{
|
||||
Backend: b.Name,
|
||||
Error: err,
|
||||
Method: method,
|
||||
}
|
||||
if b.Role == "primary" {
|
||||
primaryErrChan <- err
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1038,6 +1095,22 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
|
||||
resp, err := client.Do(backendReq)
|
||||
reqDuration := time.Since(reqStart)
|
||||
|
||||
if err != nil {
|
||||
// Only record stats if this isn't a context cancellation
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
statsChan <- ResponseStats{
|
||||
Backend: b.Name,
|
||||
Duration: reqDuration,
|
||||
Error: err,
|
||||
Method: method,
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// Read response body
|
||||
respBody, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
statsChan <- ResponseStats{
|
||||
Backend: b.Name,
|
||||
@@ -1045,12 +1118,8 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
|
||||
Error: err,
|
||||
Method: method,
|
||||
}
|
||||
if b.Role == "primary" {
|
||||
primaryErrChan <- err
|
||||
}
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
statsChan <- ResponseStats{
|
||||
Backend: b.Name,
|
||||
@@ -1059,40 +1128,66 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
|
||||
Method: method,
|
||||
}
|
||||
|
||||
if b.Role == "primary" {
|
||||
// For primary, we need to return this response to the client
|
||||
respBody, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
primaryErrChan <- err
|
||||
return
|
||||
}
|
||||
// Try to be the first to respond
|
||||
if responseHandled.CompareAndSwap(false, true) {
|
||||
responseChan <- struct {
|
||||
backend string
|
||||
resp *http.Response
|
||||
err error
|
||||
body []byte
|
||||
}{b.Name, resp, nil, respBody}
|
||||
|
||||
// Create a new response to send back to client
|
||||
primaryResp := *resp
|
||||
primaryResp.Body = io.NopCloser(bytes.NewReader(respBody))
|
||||
primaryRespChan <- &primaryResp
|
||||
// Cancel other requests
|
||||
cancel()
|
||||
}
|
||||
}(backend)
|
||||
}
|
||||
|
||||
// Wait for primary response
|
||||
// Wait for the first successful response
|
||||
var response struct {
|
||||
backend string
|
||||
resp *http.Response
|
||||
err error
|
||||
body []byte
|
||||
}
|
||||
|
||||
select {
|
||||
case primaryResp := <-primaryRespChan:
|
||||
// Copy the response to the client
|
||||
for name, values := range primaryResp.Header {
|
||||
case response = <-responseChan:
|
||||
// Got a response
|
||||
case <-time.After(30 * time.Second):
|
||||
// Timeout
|
||||
if !responseHandled.CompareAndSwap(false, true) {
|
||||
// Someone else handled it
|
||||
response = <-responseChan
|
||||
} else {
|
||||
http.Error(w, "Timeout waiting for any backend", http.StatusGatewayTimeout)
|
||||
cancel()
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(statsChan)
|
||||
}()
|
||||
// Collect stats
|
||||
var stats []ResponseStats
|
||||
for stat := range statsChan {
|
||||
stats = append(stats, stat)
|
||||
}
|
||||
return stats
|
||||
}
|
||||
}
|
||||
|
||||
// Send the response to the client
|
||||
if response.err == nil && response.resp != nil {
|
||||
// Copy response headers
|
||||
for name, values := range response.resp.Header {
|
||||
for _, value := range values {
|
||||
w.Header().Add(name, value)
|
||||
}
|
||||
}
|
||||
w.WriteHeader(primaryResp.StatusCode)
|
||||
io.Copy(w, primaryResp.Body)
|
||||
case err := <-primaryErrChan:
|
||||
http.Error(w, "Error from primary backend: "+err.Error(), http.StatusBadGateway)
|
||||
case <-time.After(30 * time.Second):
|
||||
http.Error(w, "Timeout waiting for primary backend", http.StatusGatewayTimeout)
|
||||
w.WriteHeader(response.resp.StatusCode)
|
||||
w.Write(response.body)
|
||||
}
|
||||
|
||||
// Wait for all goroutines to complete
|
||||
// Wait for all goroutines to complete and collect stats
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(statsChan)
|
||||
|
||||
Reference in New Issue
Block a user