a set of random fixes into the blue
This commit is contained in:
@@ -1343,6 +1343,7 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
|
|||||||
var responseHandled atomic.Bool
|
var responseHandled atomic.Bool
|
||||||
var firstBackendStartTime atomic.Pointer[time.Time]
|
var firstBackendStartTime atomic.Pointer[time.Time]
|
||||||
primaryResponseChan := make(chan struct{}, 1) // Signal when primary gets a response
|
primaryResponseChan := make(chan struct{}, 1) // Signal when primary gets a response
|
||||||
|
primaryFailedFast := make(chan struct{}, 1) // Signal when primary fails immediately
|
||||||
|
|
||||||
for _, backend := range backends {
|
for _, backend := range backends {
|
||||||
// Skip secondary backends for stateful methods
|
// Skip secondary backends for stateful methods
|
||||||
@@ -1388,6 +1389,12 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
|
|||||||
Duration: time.Since(goroutineStartTime),
|
Duration: time.Since(goroutineStartTime),
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
|
case <-primaryFailedFast:
|
||||||
|
// Primary failed immediately, start secondary now
|
||||||
|
delayTimer.Stop()
|
||||||
|
if enableDetailedLogs {
|
||||||
|
log.Printf("Primary failed fast for %s, starting secondary immediately", method)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1416,6 +1423,14 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
|
|||||||
reqDuration := time.Since(reqStart)
|
reqDuration := time.Since(reqStart)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// If primary failed with connection error, signal secondary to start
|
||||||
|
if b.Role == "primary" {
|
||||||
|
select {
|
||||||
|
case primaryFailedFast <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
statsChan <- ResponseStats{
|
statsChan <- ResponseStats{
|
||||||
Backend: b.Name,
|
Backend: b.Name,
|
||||||
Duration: reqDuration, // Keep backend-specific duration
|
Duration: reqDuration, // Keep backend-specific duration
|
||||||
@@ -1427,6 +1442,15 @@ func handleRequest(w http.ResponseWriter, r *http.Request, backends []Backend, c
|
|||||||
|
|
||||||
// Don't close resp.Body here - it will be closed by the winner or drained by losers
|
// Don't close resp.Body here - it will be closed by the winner or drained by losers
|
||||||
|
|
||||||
|
// Check if primary returned an error status
|
||||||
|
if b.Role == "primary" && resp.StatusCode >= 400 {
|
||||||
|
// Any 4xx or 5xx error should trigger immediate secondary
|
||||||
|
select {
|
||||||
|
case primaryFailedFast <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Signal primary response immediately for secondary backends to check
|
// Signal primary response immediately for secondary backends to check
|
||||||
if b.Role == "primary" && resp.StatusCode < 400 {
|
if b.Role == "primary" && resp.StatusCode < 400 {
|
||||||
select {
|
select {
|
||||||
@@ -1677,6 +1701,8 @@ func handleWebSocketRequest(w http.ResponseWriter, r *http.Request, backends []B
|
|||||||
|
|
||||||
// Connect to all backends
|
// Connect to all backends
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
primaryConnected := make(chan bool, 1) // Track if primary connected successfully
|
||||||
|
|
||||||
for _, backend := range backends {
|
for _, backend := range backends {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(b Backend) {
|
go func(b Backend) {
|
||||||
@@ -1720,6 +1746,14 @@ func handleWebSocketRequest(w http.ResponseWriter, r *http.Request, backends []B
|
|||||||
log.Printf("Failed to connect to backend %s: %v (status: %d)", b.Name, err, status)
|
log.Printf("Failed to connect to backend %s: %v (status: %d)", b.Name, err, status)
|
||||||
stats.Error = err
|
stats.Error = err
|
||||||
statsCollector.AddWebSocketStats(stats)
|
statsCollector.AddWebSocketStats(stats)
|
||||||
|
|
||||||
|
// If primary failed to connect, signal that
|
||||||
|
if b.Role == "primary" {
|
||||||
|
select {
|
||||||
|
case primaryConnected <- false:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer backendConn.Close()
|
defer backendConn.Close()
|
||||||
@@ -1730,39 +1764,81 @@ func handleWebSocketRequest(w http.ResponseWriter, r *http.Request, backends []B
|
|||||||
stats.IsActive = true
|
stats.IsActive = true
|
||||||
statsCollector.AddWebSocketStats(stats)
|
statsCollector.AddWebSocketStats(stats)
|
||||||
|
|
||||||
|
// If this is the primary backend, signal successful connection
|
||||||
|
if b.Role == "primary" {
|
||||||
|
select {
|
||||||
|
case primaryConnected <- true:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// If this is the primary backend, set up bidirectional proxying
|
// If this is the primary backend, set up bidirectional proxying
|
||||||
if b.Role == "primary" {
|
if b.Role == "primary" {
|
||||||
|
// Channel to signal when primary connection fails
|
||||||
|
primaryFailed := make(chan struct{}, 2) // Buffered for 2 signals
|
||||||
|
|
||||||
// Forward messages from client to primary backend
|
// Forward messages from client to primary backend
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
messageType, message, err := clientConn.ReadMessage()
|
messageType, message, err := clientConn.ReadMessage()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Error reading from client: %v", err)
|
log.Printf("Error reading from client: %v", err)
|
||||||
|
select {
|
||||||
|
case primaryFailed <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err = backendConn.WriteMessage(messageType, message)
|
err = backendConn.WriteMessage(messageType, message)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Error writing to primary backend: %v", err)
|
log.Printf("Error writing to primary backend: %v", err)
|
||||||
|
select {
|
||||||
|
case primaryFailed <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Forward messages from primary backend to client
|
// Forward messages from primary backend to client
|
||||||
|
go func() {
|
||||||
for {
|
for {
|
||||||
messageType, message, err := backendConn.ReadMessage()
|
messageType, message, err := backendConn.ReadMessage()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Error reading from primary backend: %v", err)
|
log.Printf("Error reading from primary backend: %v", err)
|
||||||
|
select {
|
||||||
|
case primaryFailed <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err = clientConn.WriteMessage(messageType, message)
|
err = clientConn.WriteMessage(messageType, message)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Error writing to client: %v", err)
|
log.Printf("Error writing to client: %v", err)
|
||||||
|
select {
|
||||||
|
case primaryFailed <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait for primary connection failure
|
||||||
|
<-primaryFailed
|
||||||
|
|
||||||
|
// Primary backend failed, close client connection with proper close message
|
||||||
|
log.Printf("Primary backend WebSocket failed, closing client connection")
|
||||||
|
closeMsg := websocket.FormatCloseMessage(websocket.CloseGoingAway,
|
||||||
|
"Primary backend unavailable")
|
||||||
|
clientConn.WriteControl(websocket.CloseMessage, closeMsg,
|
||||||
|
time.Now().Add(time.Second))
|
||||||
|
|
||||||
|
// Return to trigger cleanup
|
||||||
|
return
|
||||||
} else {
|
} else {
|
||||||
// For secondary backends, just read and discard messages
|
// For secondary backends, just read and discard messages
|
||||||
for {
|
for {
|
||||||
@@ -1778,6 +1854,22 @@ func handleWebSocketRequest(w http.ResponseWriter, r *http.Request, backends []B
|
|||||||
|
|
||||||
// Wait for all connections to terminate
|
// Wait for all connections to terminate
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
|
// Check if primary connected successfully
|
||||||
|
select {
|
||||||
|
case connected := <-primaryConnected:
|
||||||
|
if !connected {
|
||||||
|
// Primary failed to connect, close client connection
|
||||||
|
log.Printf("Primary backend WebSocket failed to connect, closing client connection")
|
||||||
|
closeMsg := websocket.FormatCloseMessage(websocket.CloseServiceRestart,
|
||||||
|
"Primary backend unavailable")
|
||||||
|
clientConn.WriteControl(websocket.CloseMessage, closeMsg,
|
||||||
|
time.Now().Add(time.Second))
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
// No primary backend in the configuration (shouldn't happen)
|
||||||
|
log.Printf("Warning: No primary backend configured for WebSocket")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func getEnv(key, fallback string) string {
|
func getEnv(key, fallback string) string {
|
||||||
|
|||||||
Reference in New Issue
Block a user