diff --git a/split-proxy/proxy.js b/split-proxy/proxy.js index 86dc4eab..22af214f 100644 --- a/split-proxy/proxy.js +++ b/split-proxy/proxy.js @@ -610,8 +610,8 @@ class RPCProxy { code: error.code, }, 'Upstream stream error'); - // Only destroy if response hasn't been sent yet - if (!res.headersSent && !res.writableEnded) { + // Only destroy if response hasn't been sent yet and isn't already destroyed + if (!res.headersSent && !res.writableEnded && !res.destroyed) { res.destroy(); } }); @@ -629,10 +629,22 @@ class RPCProxy { // Always capture raw chunks for comparison chunks.push(chunk); - // Stream data to client - if (!res.writableEnded) { + // Stream data to client - check both writableEnded and destroyed state + if (!res.writableEnded && !res.destroyed) { // Chain writes to handle backpressure properly writeQueue = writeQueue.then(() => new Promise((resolve) => { + // Double-check stream state before writing + if (res.destroyed || res.writableEnded) { + logger.debug({ + requestId, + destroyed: res.destroyed, + writableEnded: res.writableEnded, + chunkSize: chunk.length, + }, 'Stream destroyed/ended before write, skipping chunk'); + resolve(); + return; + } + try { const canContinue = res.write(chunk, (err) => { // Log per-chunk overhead @@ -645,22 +657,49 @@ class RPCProxy { }, 'High chunk processing overhead'); } if (err) { - logger.error({ - requestId, - error: err.message, - chunkSize: chunk.length, - }, 'Error in write callback'); + // Check if it's the specific "destroyed" error + if (err.message === 'Cannot call write after a stream was destroyed') { + logger.debug({ + requestId, + error: err.message, + chunkSize: chunk.length, + destroyed: res.destroyed, + writableEnded: res.writableEnded, + }, 'Stream was destroyed during write (expected race condition)'); + } else { + logger.error({ + requestId, + error: err.message, + chunkSize: chunk.length, + }, 'Error in write callback'); + } } resolve(); }); - if (!canContinue) { + if (!canContinue && !res.destroyed) { // Wait for drain event if write buffer is full logger.debug({ requestId, chunkSize: chunk.length, }, 'Backpressure detected, waiting for drain'); - res.once('drain', resolve); + + // Set up drain listener with error handling + const drainHandler = () => resolve(); + const errorHandler = (err) => { + res.removeListener('drain', drainHandler); + logger.debug({ + requestId, + error: err.message, + }, 'Stream error while waiting for drain'); + resolve(); + }; + + res.once('drain', drainHandler); + res.once('error', errorHandler); + + // Clean up error handler if drain happens first + res.once('drain', () => res.removeListener('error', errorHandler)); } else { resolve(); } @@ -674,6 +713,13 @@ class RPCProxy { resolve(); // Continue even on error } })); + } else { + logger.debug({ + requestId, + destroyed: res.destroyed, + writableEnded: res.writableEnded, + chunkSize: chunk.length, + }, 'Skipping chunk write - stream not writable'); } }); @@ -708,21 +754,30 @@ class RPCProxy { } // End the response - if (!res.writableEnded) { + if (!res.writableEnded && !res.destroyed) { try { // If there's still data in the write buffer, wait for it to drain if (res.writableHighWaterMark && res.writableLength > 0) { res.once('drain', () => { - res.end(() => { - const transferCompleteHR = Number(process.hrtime.bigint() - hrStartTime) / 1000000; + // Check again before ending + if (!res.writableEnded && !res.destroyed) { + res.end(() => { + const transferCompleteHR = Number(process.hrtime.bigint() - hrStartTime) / 1000000; + logger.debug({ + requestId, + endpoint: 'stream', + responseSize: rawData.length, + clientClosed: isClientClosed(), + transferCompleteHrMs: transferCompleteHR, + }, 'Ended streaming response after drain'); + }); + } else { logger.debug({ requestId, - endpoint: 'stream', - responseSize: rawData.length, - clientClosed: isClientClosed(), - transferCompleteHrMs: transferCompleteHR, - }, 'Ended streaming response after drain'); - }); + destroyed: res.destroyed, + writableEnded: res.writableEnded, + }, 'Response already ended/destroyed after drain'); + } }); } else { res.end(() => { @@ -741,8 +796,17 @@ class RPCProxy { requestId, error: endError.message, clientClosed: isClientClosed(), + destroyed: res.destroyed, + writableEnded: res.writableEnded, }, 'Error ending response'); } + } else { + logger.debug({ + requestId, + destroyed: res.destroyed, + writableEnded: res.writableEnded, + responseSize: rawData.length, + }, 'Response already ended/destroyed, skipping end call'); } // Log if client closed very early