From 061671bcc32cebdb48df9dc9dd12bceb181a46cf Mon Sep 17 00:00:00 2001 From: Para Dox Date: Sun, 1 Jun 2025 22:11:07 +0700 Subject: [PATCH] no more streaming --- split-proxy/proxy.js | 175 ++++++++++++++++++++++++------------------- 1 file changed, 97 insertions(+), 78 deletions(-) diff --git a/split-proxy/proxy.js b/split-proxy/proxy.js index 0c1abfa7..d492aec0 100644 --- a/split-proxy/proxy.js +++ b/split-proxy/proxy.js @@ -192,54 +192,46 @@ class RPCProxy { } }); - // The 'close' event can fire for various reasons, not just client disconnect - // Only consider it a client disconnect if the response hasn't been completed + // Don't use the 'close' event to determine client disconnect + // It fires too early and unreliably req.on('close', () => { - // Only log as client disconnect if response wasn't completed - if (!clientClosed && !responseCompleted && !res.headersSent) { - clientClosed = true; - clientCloseReason = 'connection_closed'; - const elapsedMs = Date.now() - startTime; - - // Only log as error if it happens very quickly (< 100ms) - if (elapsedMs < 100) { - logger.debug({ - requestId, - reason: clientCloseReason, - method: requestBody.method, - elapsedMs, - responseCompleted, - headersSent: res.headersSent, - }, 'Request closed early (may be normal behavior)'); - } else { - logger.warn({ - requestId, - reason: clientCloseReason, - headers: req.headers, - userAgent: req.headers['user-agent'], - contentLength: req.headers['content-length'], - method: requestBody.method, - elapsedMs, - responseCompleted, - }, 'Client connection closed'); - } - } + const elapsedMs = Date.now() - startTime; + logger.debug({ + requestId, + reason: 'request_close_event', + method: requestBody.method, + elapsedMs, + responseCompleted, + headersSent: res.headersSent, + finished: res.finished, + }, 'Request close event (informational only)'); }); req.on('error', (error) => { - if (!clientClosed) { + // Only mark as closed for specific network errors + if (error.code === 'ECONNRESET' || error.code === 'EPIPE') { clientClosed = true; - clientCloseReason = `connection_error: ${error.code || error.message}`; - logger.error({ - requestId, - error: error.message, - code: error.code, - reason: clientCloseReason, - headers: req.headers, - method: requestBody.method, - elapsedMs: Date.now() - startTime, - }, 'Client connection error'); + clientCloseReason = `connection_error: ${error.code}`; } + logger.error({ + requestId, + error: error.message, + code: error.code, + reason: clientCloseReason, + headers: req.headers, + method: requestBody.method, + elapsedMs: Date.now() - startTime, + }, 'Client connection error'); + }); + + // Track when response is actually finished + res.on('finish', () => { + responseCompleted = true; + logger.debug({ + requestId, + method: requestBody.method, + elapsedMs: Date.now() - startTime, + }, 'Response finished successfully'); }); // Also track response close events @@ -408,38 +400,47 @@ class RPCProxy { statusCode: response.status, }, 'Stream response started'); - // Set response headers only if client hasn't closed - if (!isClientClosed() && !res.headersSent) { - res.status(response.status); - - // Respect client's connection preference - const clientConnection = res.req.headers.connection; - if (clientConnection && clientConnection.toLowerCase() === 'close') { - res.setHeader('Connection', 'close'); - } else { - res.setHeader('Connection', 'keep-alive'); - res.setHeader('Keep-Alive', `timeout=${Math.floor(config.requestTimeout / 1000)}`); - } - - Object.entries(response.headers).forEach(([key, value]) => { - // Don't override Connection header we just set - if (key.toLowerCase() !== 'connection') { - res.setHeader(key, value); + // Set response headers if not already sent + if (!res.headersSent) { + try { + res.status(response.status); + + // Respect client's connection preference + const clientConnection = res.req.headers.connection; + if (clientConnection && clientConnection.toLowerCase() === 'close') { + res.setHeader('Connection', 'close'); + } else { + res.setHeader('Connection', 'keep-alive'); + res.setHeader('Keep-Alive', `timeout=${Math.floor(config.requestTimeout / 1000)}`); } - }); - - // Explicitly flush headers to ensure client receives them immediately - res.flushHeaders(); - - logger.debug({ - requestId, - endpoint: 'stream', - headersSent: true, - statusCode: response.status, - contentType: response.headers['content-type'], - contentLength: response.headers['content-length'], - transferEncoding: response.headers['transfer-encoding'], - }, 'Response headers sent'); + + Object.entries(response.headers).forEach(([key, value]) => { + // Don't override Connection header we just set + if (key.toLowerCase() !== 'connection') { + res.setHeader(key, value); + } + }); + + // Explicitly flush headers to ensure client receives them immediately + res.flushHeaders(); + + logger.debug({ + requestId, + endpoint: 'stream', + headersSent: true, + statusCode: response.status, + contentType: response.headers['content-type'], + contentLength: response.headers['content-length'], + transferEncoding: response.headers['transfer-encoding'], + clientClosed: isClientClosed(), + }, 'Response headers sent'); + } catch (headerError) { + logger.error({ + requestId, + error: headerError.message, + clientClosed: isClientClosed(), + }, 'Error setting response headers'); + } } // Handle upstream errors @@ -451,7 +452,8 @@ class RPCProxy { code: error.code, }, 'Upstream stream error'); - if (!isClientClosed() && !res.writableEnded) { + // Only destroy if response hasn't been sent yet + if (!res.headersSent && !res.writableEnded) { res.destroy(); } }); @@ -466,15 +468,23 @@ class RPCProxy { // Always capture raw chunks for comparison chunks.push(chunk); - // Only write to client if still connected and not buffering - if (!shouldBuffer && !isClientClosed() && !res.writableEnded) { + // Only write to client if not buffering + // Remove the clientClosed check - let the write fail gracefully if truly disconnected + if (!shouldBuffer && !res.writableEnded) { try { - res.write(chunk); + const written = res.write(chunk); + if (!written) { + logger.debug({ + requestId, + chunkSize: chunk.length, + }, 'Backpressure detected on response write'); + } } catch (writeError) { logger.error({ requestId, error: writeError.message, code: writeError.code, + clientClosed: isClientClosed(), }, 'Error writing to client'); } } @@ -513,13 +523,22 @@ class RPCProxy { error: error.message, }, 'Error sending buffered response'); } - } else if (!isClientClosed() && !res.writableEnded) { + } else if (!res.writableEnded) { + // Always try to end the response, even if we think client is closed + // The write will fail gracefully if client is truly disconnected try { res.end(); + logger.debug({ + requestId, + endpoint: 'stream', + responseSize: rawData.length, + clientClosed: isClientClosed(), + }, 'Ended streaming response'); } catch (endError) { logger.error({ requestId, error: endError.message, + clientClosed: isClientClosed(), }, 'Error ending response'); } }