no more streaming

This commit is contained in:
Para Dox
2025-06-01 22:11:07 +07:00
parent 40d5ad9750
commit 061671bcc3

View File

@@ -192,54 +192,46 @@ class RPCProxy {
} }
}); });
// The 'close' event can fire for various reasons, not just client disconnect // Don't use the 'close' event to determine client disconnect
// Only consider it a client disconnect if the response hasn't been completed // It fires too early and unreliably
req.on('close', () => { req.on('close', () => {
// Only log as client disconnect if response wasn't completed const elapsedMs = Date.now() - startTime;
if (!clientClosed && !responseCompleted && !res.headersSent) { logger.debug({
clientClosed = true; requestId,
clientCloseReason = 'connection_closed'; reason: 'request_close_event',
const elapsedMs = Date.now() - startTime; method: requestBody.method,
elapsedMs,
// Only log as error if it happens very quickly (< 100ms) responseCompleted,
if (elapsedMs < 100) { headersSent: res.headersSent,
logger.debug({ finished: res.finished,
requestId, }, 'Request close event (informational only)');
reason: clientCloseReason,
method: requestBody.method,
elapsedMs,
responseCompleted,
headersSent: res.headersSent,
}, 'Request closed early (may be normal behavior)');
} else {
logger.warn({
requestId,
reason: clientCloseReason,
headers: req.headers,
userAgent: req.headers['user-agent'],
contentLength: req.headers['content-length'],
method: requestBody.method,
elapsedMs,
responseCompleted,
}, 'Client connection closed');
}
}
}); });
req.on('error', (error) => { req.on('error', (error) => {
if (!clientClosed) { // Only mark as closed for specific network errors
if (error.code === 'ECONNRESET' || error.code === 'EPIPE') {
clientClosed = true; clientClosed = true;
clientCloseReason = `connection_error: ${error.code || error.message}`; clientCloseReason = `connection_error: ${error.code}`;
logger.error({
requestId,
error: error.message,
code: error.code,
reason: clientCloseReason,
headers: req.headers,
method: requestBody.method,
elapsedMs: Date.now() - startTime,
}, 'Client connection error');
} }
logger.error({
requestId,
error: error.message,
code: error.code,
reason: clientCloseReason,
headers: req.headers,
method: requestBody.method,
elapsedMs: Date.now() - startTime,
}, 'Client connection error');
});
// Track when response is actually finished
res.on('finish', () => {
responseCompleted = true;
logger.debug({
requestId,
method: requestBody.method,
elapsedMs: Date.now() - startTime,
}, 'Response finished successfully');
}); });
// Also track response close events // Also track response close events
@@ -408,38 +400,47 @@ class RPCProxy {
statusCode: response.status, statusCode: response.status,
}, 'Stream response started'); }, 'Stream response started');
// Set response headers only if client hasn't closed // Set response headers if not already sent
if (!isClientClosed() && !res.headersSent) { if (!res.headersSent) {
res.status(response.status); try {
res.status(response.status);
// Respect client's connection preference
const clientConnection = res.req.headers.connection; // Respect client's connection preference
if (clientConnection && clientConnection.toLowerCase() === 'close') { const clientConnection = res.req.headers.connection;
res.setHeader('Connection', 'close'); if (clientConnection && clientConnection.toLowerCase() === 'close') {
} else { res.setHeader('Connection', 'close');
res.setHeader('Connection', 'keep-alive'); } else {
res.setHeader('Keep-Alive', `timeout=${Math.floor(config.requestTimeout / 1000)}`); res.setHeader('Connection', 'keep-alive');
} res.setHeader('Keep-Alive', `timeout=${Math.floor(config.requestTimeout / 1000)}`);
Object.entries(response.headers).forEach(([key, value]) => {
// Don't override Connection header we just set
if (key.toLowerCase() !== 'connection') {
res.setHeader(key, value);
} }
});
Object.entries(response.headers).forEach(([key, value]) => {
// Explicitly flush headers to ensure client receives them immediately // Don't override Connection header we just set
res.flushHeaders(); if (key.toLowerCase() !== 'connection') {
res.setHeader(key, value);
logger.debug({ }
requestId, });
endpoint: 'stream',
headersSent: true, // Explicitly flush headers to ensure client receives them immediately
statusCode: response.status, res.flushHeaders();
contentType: response.headers['content-type'],
contentLength: response.headers['content-length'], logger.debug({
transferEncoding: response.headers['transfer-encoding'], requestId,
}, 'Response headers sent'); endpoint: 'stream',
headersSent: true,
statusCode: response.status,
contentType: response.headers['content-type'],
contentLength: response.headers['content-length'],
transferEncoding: response.headers['transfer-encoding'],
clientClosed: isClientClosed(),
}, 'Response headers sent');
} catch (headerError) {
logger.error({
requestId,
error: headerError.message,
clientClosed: isClientClosed(),
}, 'Error setting response headers');
}
} }
// Handle upstream errors // Handle upstream errors
@@ -451,7 +452,8 @@ class RPCProxy {
code: error.code, code: error.code,
}, 'Upstream stream error'); }, 'Upstream stream error');
if (!isClientClosed() && !res.writableEnded) { // Only destroy if response hasn't been sent yet
if (!res.headersSent && !res.writableEnded) {
res.destroy(); res.destroy();
} }
}); });
@@ -466,15 +468,23 @@ class RPCProxy {
// Always capture raw chunks for comparison // Always capture raw chunks for comparison
chunks.push(chunk); chunks.push(chunk);
// Only write to client if still connected and not buffering // Only write to client if not buffering
if (!shouldBuffer && !isClientClosed() && !res.writableEnded) { // Remove the clientClosed check - let the write fail gracefully if truly disconnected
if (!shouldBuffer && !res.writableEnded) {
try { try {
res.write(chunk); const written = res.write(chunk);
if (!written) {
logger.debug({
requestId,
chunkSize: chunk.length,
}, 'Backpressure detected on response write');
}
} catch (writeError) { } catch (writeError) {
logger.error({ logger.error({
requestId, requestId,
error: writeError.message, error: writeError.message,
code: writeError.code, code: writeError.code,
clientClosed: isClientClosed(),
}, 'Error writing to client'); }, 'Error writing to client');
} }
} }
@@ -513,13 +523,22 @@ class RPCProxy {
error: error.message, error: error.message,
}, 'Error sending buffered response'); }, 'Error sending buffered response');
} }
} else if (!isClientClosed() && !res.writableEnded) { } else if (!res.writableEnded) {
// Always try to end the response, even if we think client is closed
// The write will fail gracefully if client is truly disconnected
try { try {
res.end(); res.end();
logger.debug({
requestId,
endpoint: 'stream',
responseSize: rawData.length,
clientClosed: isClientClosed(),
}, 'Ended streaming response');
} catch (endError) { } catch (endError) {
logger.error({ logger.error({
requestId, requestId,
error: endError.message, error: endError.message,
clientClosed: isClientClosed(),
}, 'Error ending response'); }, 'Error ending response');
} }
} }