feature not bug
@@ -610,8 +610,8 @@ class RPCProxy {
           code: error.code,
         }, 'Upstream stream error');

-        // Only destroy if response hasn't been sent yet
-        if (!res.headersSent && !res.writableEnded) {
+        // Only destroy if response hasn't been sent yet and isn't already destroyed
+        if (!res.headersSent && !res.writableEnded && !res.destroyed) {
           res.destroy();
         }
       });
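The predicate guarding this destroy call recurs throughout the patch: only touch `res` while it is still writable, and only destroy it if nothing has been sent yet. As a standalone sketch (the helper names are illustrative, not from this codebase), assuming `res` is a Node.js `http.ServerResponse`:

// Sketch only - the patch inlines these checks rather than using helpers.
function isResponseWritable(res) {
  // Safe to write: end() has not been called and the stream is not torn down.
  return !res.writableEnded && !res.destroyed;
}

function destroyIfUnsent(res) {
  // Abort the response only if no headers or body bytes have gone out yet.
  if (!res.headersSent && isResponseWritable(res)) {
    res.destroy();
  }
}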
@@ -629,10 +629,22 @@ class RPCProxy {
         // Always capture raw chunks for comparison
         chunks.push(chunk);

-        // Stream data to client
-        if (!res.writableEnded) {
+        // Stream data to client - check both writableEnded and destroyed state
+        if (!res.writableEnded && !res.destroyed) {
           // Chain writes to handle backpressure properly
           writeQueue = writeQueue.then(() => new Promise((resolve) => {
+            // Double-check stream state before writing
+            if (res.destroyed || res.writableEnded) {
+              logger.debug({
+                requestId,
+                destroyed: res.destroyed,
+                writableEnded: res.writableEnded,
+                chunkSize: chunk.length,
+              }, 'Stream destroyed/ended before write, skipping chunk');
+              resolve();
+              return;
+            }
+
             try {
               const canContinue = res.write(chunk, (err) => {
                 // Log per-chunk overhead
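A minimal standalone sketch of the write-queue pattern this hunk extends, assuming `res` is a writable Node.js response; `makeChunkWriter` is an illustrative name, not part of the project. Each chunk is chained onto a promise so writes stay ordered, and the stream state is re-checked inside the queued step because the client may disconnect while earlier chunks are still in flight:

// Sketch only: serialize chunk writes through a promise chain.
function makeChunkWriter(res) {
  let writeQueue = Promise.resolve();

  return function enqueue(chunk) {
    writeQueue = writeQueue.then(() => new Promise((resolve) => {
      // Re-check just before writing; the response may have been
      // destroyed while earlier queued chunks were being flushed.
      if (res.destroyed || res.writableEnded) {
        resolve();
        return;
      }
      const canContinue = res.write(chunk, () => resolve());
      if (!canContinue && !res.destroyed) {
        // Backpressure: also resume the chain once the buffer drains.
        res.once('drain', resolve);
      }
    }));
    return writeQueue;
  };
}

Awaiting the promise returned for the last chunk before calling res.end() guarantees every queued write has been handed to the socket first.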
@@ -645,22 +657,49 @@ class RPCProxy {
                   }, 'High chunk processing overhead');
                 }
                 if (err) {
-                  logger.error({
-                    requestId,
-                    error: err.message,
-                    chunkSize: chunk.length,
-                  }, 'Error in write callback');
+                  // Check if it's the specific "destroyed" error
+                  if (err.message === 'Cannot call write after a stream was destroyed') {
+                    logger.debug({
+                      requestId,
+                      error: err.message,
+                      chunkSize: chunk.length,
+                      destroyed: res.destroyed,
+                      writableEnded: res.writableEnded,
+                    }, 'Stream was destroyed during write (expected race condition)');
+                  } else {
+                    logger.error({
+                      requestId,
+                      error: err.message,
+                      chunkSize: chunk.length,
+                    }, 'Error in write callback');
+                  }
                 }
                 resolve();
               });

-              if (!canContinue) {
+              if (!canContinue && !res.destroyed) {
                 // Wait for drain event if write buffer is full
                 logger.debug({
                   requestId,
                   chunkSize: chunk.length,
                 }, 'Backpressure detected, waiting for drain');
-                res.once('drain', resolve);
+
+                // Set up drain listener with error handling
+                const drainHandler = () => resolve();
+                const errorHandler = (err) => {
+                  res.removeListener('drain', drainHandler);
+                  logger.debug({
+                    requestId,
+                    error: err.message,
+                  }, 'Stream error while waiting for drain');
+                  resolve();
+                };
+
+                res.once('drain', drainHandler);
+                res.once('error', errorHandler);
+
+                // Clean up error handler if drain happens first
+                res.once('drain', () => res.removeListener('error', errorHandler));
               } else {
                 resolve();
               }
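Read in isolation, the drain/error pairing added above says: wait for 'drain', but stop waiting (rather than hang) if the stream errors first. A standalone sketch of that step, with `waitForDrainOrError` as an illustrative name:

// Sketch only: resolve on 'drain', and also resolve - instead of hanging
// or rejecting - if the writable errors while we wait.
function waitForDrainOrError(stream) {
  return new Promise((resolve) => {
    function onDrain() { cleanup(); resolve(); }
    function onError() { cleanup(); resolve(); }
    function cleanup() {
      stream.removeListener('drain', onDrain);
      stream.removeListener('error', onError);
    }
    stream.once('drain', onDrain);
    stream.once('error', onError);
  });
}

Node's built-in events.once(stream, 'drain') rejects its promise if the emitter emits 'error' first, so a hand-rolled listener pair like the one in this patch is one way to treat that case as "give up waiting and continue".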
@@ -674,6 +713,13 @@ class RPCProxy {
               resolve(); // Continue even on error
             }
           }));
-        }
+        } else {
+          logger.debug({
+            requestId,
+            destroyed: res.destroyed,
+            writableEnded: res.writableEnded,
+            chunkSize: chunk.length,
+          }, 'Skipping chunk write - stream not writable');
+        }
       });

@@ -708,21 +754,30 @@ class RPCProxy {
         }

         // End the response
-        if (!res.writableEnded) {
+        if (!res.writableEnded && !res.destroyed) {
           try {
             // If there's still data in the write buffer, wait for it to drain
             if (res.writableHighWaterMark && res.writableLength > 0) {
               res.once('drain', () => {
-                res.end(() => {
-                  const transferCompleteHR = Number(process.hrtime.bigint() - hrStartTime) / 1000000;
-                  logger.debug({
-                    requestId,
-                    endpoint: 'stream',
-                    responseSize: rawData.length,
-                    clientClosed: isClientClosed(),
-                    transferCompleteHrMs: transferCompleteHR,
-                  }, 'Ended streaming response after drain');
-                });
+                // Check again before ending
+                if (!res.writableEnded && !res.destroyed) {
+                  res.end(() => {
+                    const transferCompleteHR = Number(process.hrtime.bigint() - hrStartTime) / 1000000;
+                    logger.debug({
+                      requestId,
+                      endpoint: 'stream',
+                      responseSize: rawData.length,
+                      clientClosed: isClientClosed(),
+                      transferCompleteHrMs: transferCompleteHR,
+                    }, 'Ended streaming response after drain');
+                  });
+                } else {
+                  logger.debug({
+                    requestId,
+                    destroyed: res.destroyed,
+                    writableEnded: res.writableEnded,
+                  }, 'Response already ended/destroyed after drain');
+                }
               });
             } else {
               res.end(() => {
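Taken on its own, the end-of-response logic amounts to: end only once any backpressure has cleared, and re-check the stream state before actually calling end(). The sketch below makes that explicit; note it swaps the patch's `writableHighWaterMark && writableLength > 0` test for `writableNeedDrain` (Node 14.17+), since 'drain' is only emitted after a write() call has returned false. `finishResponse` is an illustrative name:

// Sketch only: finish the response safely once backpressure has cleared.
function finishResponse(res, onEnded) {
  if (res.writableEnded || res.destroyed) {
    return; // already finished or torn down, nothing to end
  }
  const endIfStillOpen = () => {
    // The state can change while waiting for 'drain', so check again.
    if (!res.writableEnded && !res.destroyed) {
      res.end(onEnded);
    }
  };
  if (res.writableNeedDrain) {
    res.once('drain', endIfStillOpen);
  } else {
    endIfStillOpen();
  }
}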
@@ -741,8 +796,17 @@ class RPCProxy {
               requestId,
               error: endError.message,
               clientClosed: isClientClosed(),
+              destroyed: res.destroyed,
+              writableEnded: res.writableEnded,
             }, 'Error ending response');
           }
-        }
+        } else {
+          logger.debug({
+            requestId,
+            destroyed: res.destroyed,
+            writableEnded: res.writableEnded,
+            responseSize: rawData.length,
+          }, 'Response already ended/destroyed, skipping end call');
+        }

         // Log if client closed very early