more features
This commit is contained in:
@@ -4,9 +4,7 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
@@ -1173,6 +1171,7 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
|
|||||||
|
|
||||||
// Process backends with adaptive delay strategy
|
// Process backends with adaptive delay strategy
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
var primaryWg sync.WaitGroup // Separate wait group for primary backend
|
||||||
statsChan := make(chan ResponseStats, len(backends))
|
statsChan := make(chan ResponseStats, len(backends))
|
||||||
responseChan := make(chan struct {
|
responseChan := make(chan struct {
|
||||||
backend string
|
backend string
|
||||||
@@ -1181,10 +1180,6 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
|
|||||||
body []byte
|
body []byte
|
||||||
}, len(backends))
|
}, len(backends))
|
||||||
|
|
||||||
// Create a context that we can cancel once we get the first response
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
// Track if we've already sent a response
|
// Track if we've already sent a response
|
||||||
var responseHandled atomic.Bool
|
var responseHandled atomic.Bool
|
||||||
|
|
||||||
@@ -1195,30 +1190,33 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
|
|||||||
}
|
}
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
if backend.Role == "primary" {
|
||||||
|
primaryWg.Add(1)
|
||||||
|
}
|
||||||
|
|
||||||
go func(b Backend) {
|
go func(b Backend) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
if b.Role == "primary" {
|
||||||
|
defer primaryWg.Done()
|
||||||
|
}
|
||||||
|
|
||||||
// Track when this goroutine actually starts processing
|
// Track when this goroutine actually starts processing
|
||||||
goroutineStartTime := time.Now()
|
goroutineStartTime := time.Now()
|
||||||
|
|
||||||
// If this is a secondary backend, wait for p50 delay
|
// If this is a secondary backend, wait for p50 delay
|
||||||
if b.Role != "primary" {
|
if b.Role != "primary" {
|
||||||
|
delayTimer := time.NewTimer(p50Delay)
|
||||||
select {
|
select {
|
||||||
case <-time.After(p50Delay):
|
case <-delayTimer.C:
|
||||||
// Continue after delay
|
// Continue after delay
|
||||||
case <-ctx.Done():
|
case <-responseChan:
|
||||||
// Primary already responded, skip secondary
|
// Someone already responded, but continue anyway to collect stats
|
||||||
return
|
delayTimer.Stop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if response was already handled
|
// Create a new request (no longer using context for cancellation)
|
||||||
if responseHandled.Load() {
|
backendReq, err := http.NewRequest(r.Method, b.URL, bytes.NewReader(body))
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a new request
|
|
||||||
backendReq, err := http.NewRequestWithContext(ctx, r.Method, b.URL, bytes.NewReader(body))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
statsChan <- ResponseStats{
|
statsChan <- ResponseStats{
|
||||||
Backend: b.Name,
|
Backend: b.Name,
|
||||||
@@ -1242,15 +1240,12 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
|
|||||||
reqDuration := time.Since(reqStart)
|
reqDuration := time.Since(reqStart)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Only record stats if this isn't a context cancellation
|
|
||||||
if !errors.Is(err, context.Canceled) {
|
|
||||||
statsChan <- ResponseStats{
|
statsChan <- ResponseStats{
|
||||||
Backend: b.Name,
|
Backend: b.Name,
|
||||||
Duration: reqDuration, // Keep backend-specific duration
|
Duration: reqDuration, // Keep backend-specific duration
|
||||||
Error: err,
|
Error: err,
|
||||||
Method: method,
|
Method: method,
|
||||||
}
|
}
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
@@ -1282,9 +1277,6 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
|
|||||||
err error
|
err error
|
||||||
body []byte
|
body []byte
|
||||||
}{b.Name, resp, nil, respBody}
|
}{b.Name, resp, nil, respBody}
|
||||||
|
|
||||||
// Cancel other requests
|
|
||||||
cancel()
|
|
||||||
}
|
}
|
||||||
}(backend)
|
}(backend)
|
||||||
}
|
}
|
||||||
@@ -1307,9 +1299,10 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
|
|||||||
response = <-responseChan
|
response = <-responseChan
|
||||||
} else {
|
} else {
|
||||||
http.Error(w, "Timeout waiting for any backend", http.StatusGatewayTimeout)
|
http.Error(w, "Timeout waiting for any backend", http.StatusGatewayTimeout)
|
||||||
cancel()
|
// Always wait for primary backend to complete before collecting stats
|
||||||
go func() {
|
go func() {
|
||||||
wg.Wait()
|
primaryWg.Wait() // Wait for primary first
|
||||||
|
wg.Wait() // Then wait for all
|
||||||
close(statsChan)
|
close(statsChan)
|
||||||
}()
|
}()
|
||||||
// Collect stats
|
// Collect stats
|
||||||
@@ -1333,9 +1326,11 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
|
|||||||
w.Write(response.body)
|
w.Write(response.body)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for all goroutines to complete and collect stats
|
// Always wait for primary backend to complete before collecting stats
|
||||||
|
// This ensures primary backend stats are always included
|
||||||
go func() {
|
go func() {
|
||||||
wg.Wait()
|
primaryWg.Wait() // Wait for primary backend to complete first
|
||||||
|
wg.Wait() // Then wait for all other backends
|
||||||
close(statsChan)
|
close(statsChan)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user