no more streaming

Para Dox
2025-06-01 22:17:28 +07:00
parent 061671bcc3
commit 338eaa3da6
2 changed files with 134 additions and 39 deletions


@@ -102,6 +102,26 @@ server.timeout = config.requestTimeout + 5000; // Add 5 seconds buffer
 server.keepAliveTimeout = config.requestTimeout + 5000;
 server.headersTimeout = config.requestTimeout + 6000; // Should be > keepAliveTimeout
+// Additional configuration to prevent premature connection closure
+server.requestTimeout = config.requestTimeout + 10000; // Give extra time for response to complete
+server.on('connection', (socket) => {
+  // Disable Nagle's algorithm for better real-time performance
+  socket.setNoDelay(true);
+  // Increase socket timeout
+  socket.setTimeout(config.requestTimeout + 10000);
+  // Handle socket errors gracefully
+  socket.on('error', (err) => {
+    if (err.code !== 'ECONNRESET') {
+      logger.error({
+        error: err.message,
+        code: err.code,
+      }, 'Socket error');
+    }
+  });
+});
+logger.info({
+  serverTimeout: server.timeout,
+  keepAliveTimeout: server.keepAliveTimeout,

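For reference, the timeout ordering above as a minimal standalone sketch, assuming a plain Node http server and an illustrative 30-second value in place of config.requestTimeout; the key constraint is that keepAliveTimeout stays below headersTimeout so a kept-alive socket is not torn down while the next request's headers are still arriving.

const http = require('http');

// Illustrative stand-in for config.requestTimeout; not the proxy's real config.
const REQUEST_TIMEOUT = 30000;

const server = http.createServer((req, res) => {
  res.end('ok');
});

server.timeout = REQUEST_TIMEOUT + 5000;          // per-socket inactivity timeout
server.keepAliveTimeout = REQUEST_TIMEOUT + 5000; // idle time allowed between keep-alive requests
server.headersTimeout = REQUEST_TIMEOUT + 6000;   // must exceed keepAliveTimeout
server.requestTimeout = REQUEST_TIMEOUT + 10000;  // ceiling for receiving the entire request

server.listen(8080);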

@@ -416,11 +416,21 @@ class RPCProxy {
 Object.entries(response.headers).forEach(([key, value]) => {
   // Don't override Connection header we just set
-  if (key.toLowerCase() !== 'connection') {
+  if (key.toLowerCase() !== 'connection' && key.toLowerCase() !== 'keep-alive') {
     res.setHeader(key, value);
   }
 });
+// For Java clients, ensure we handle content properly
+const userAgent = res.req.headers['user-agent'] || '';
+if (userAgent.includes('Java') || userAgent.includes('okhttp') || userAgent.includes('Apache-HttpClient')) {
+  // Java clients often expect explicit content handling
+  if (!response.headers['content-length'] && !response.headers['transfer-encoding']) {
+    // If upstream didn't specify, we'll use chunked encoding
+    res.setHeader('Transfer-Encoding', 'chunked');
+  }
+}
 // Explicitly flush headers to ensure client receives them immediately
 res.flushHeaders();
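A hedged variation on the header copy above: instead of special-casing only connection and keep-alive, a proxy can skip the whole hop-by-hop set from RFC 7230. Sketch only; forwardHeaders and the upstreamHeaders shape are assumptions, not this proxy's actual API, and Node falls back to chunked framing on its own when no Content-Length is set on a streamed response.

// Copy end-to-end headers from an upstream response onto a Node ServerResponse,
// dropping hop-by-hop headers that must not be forwarded by a proxy.
const HOP_BY_HOP = new Set([
  'connection', 'keep-alive', 'proxy-authenticate', 'proxy-authorization',
  'te', 'trailer', 'transfer-encoding', 'upgrade',
]);

function forwardHeaders(upstreamHeaders, res) {
  for (const [key, value] of Object.entries(upstreamHeaders)) {
    if (!HOP_BY_HOP.has(key.toLowerCase())) {
      res.setHeader(key, value);
    }
  }
  // Send the status line and headers immediately so the client can start
  // reading before the first body chunk is written.
  res.flushHeaders();
}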
@@ -462,7 +472,14 @@ class RPCProxy {
 const chunks = [];
 // Check if we should buffer the response (for clients that don't handle streaming well)
-const shouldBuffer = res.req.headers['user-agent'] && res.req.headers['user-agent'].includes('ReactorNetty');
+const userAgent = res.req.headers['user-agent'] || '';
+const shouldBuffer = userAgent.includes('ReactorNetty') ||
+  userAgent.includes('Java') ||
+  userAgent.includes('okhttp') ||
+  userAgent.includes('Apache-HttpClient');
+// For streaming clients, we need to handle backpressure properly
+let writeQueue = Promise.resolve();
 response.data.on('data', (chunk) => {
   // Always capture raw chunks for comparison
@@ -471,27 +488,45 @@ class RPCProxy {
   // Only write to client if not buffering
   // Remove the clientClosed check - let the write fail gracefully if truly disconnected
   if (!shouldBuffer && !res.writableEnded) {
-    try {
-      const written = res.write(chunk);
-      if (!written) {
-        logger.debug({
+    // Chain writes to handle backpressure properly
+    writeQueue = writeQueue.then(() => new Promise((resolve) => {
+      try {
+        const canContinue = res.write(chunk, (err) => {
+          if (err) {
+            logger.error({
+              requestId,
+              error: err.message,
+              chunkSize: chunk.length,
+            }, 'Error in write callback');
+          }
+          resolve();
+        });
+        if (!canContinue) {
+          // Wait for drain event if write buffer is full
+          logger.debug({
+            requestId,
+            chunkSize: chunk.length,
+          }, 'Backpressure detected, waiting for drain');
+          res.once('drain', resolve);
+        } else {
+          resolve();
+        }
+      } catch (writeError) {
+        logger.error({
+          requestId,
-          chunkSize: chunk.length,
-        }, 'Backpressure detected on response write');
+          error: writeError.message,
+          code: writeError.code,
+          clientClosed: isClientClosed(),
+        }, 'Error writing to client');
+        resolve(); // Continue even on error
+      }
-    } catch (writeError) {
-      logger.error({
-        requestId,
-        error: writeError.message,
-        code: writeError.code,
-        clientClosed: isClientClosed(),
-      }, 'Error writing to client');
-    }
+    }));
   }
 });
 return new Promise((resolve, reject) => {
-  response.data.on('end', () => {
+  response.data.on('end', async () => {
     isResponseCompleted(); // Mark response as completed
     const totalTime = Date.now() - startTime;
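The writeQueue introduced above chains each chunk write onto a promise so a slow client applies backpressure to the proxy instead of letting the response buffer grow without bound. A self-contained sketch of the same idea, assuming only a generic Node Writable; makeWriteQueue is a hypothetical helper, not part of this codebase.

// Serialize writes to a Writable and pause whenever its internal buffer is full.
function makeWriteQueue(writable) {
  let queue = Promise.resolve();
  return function enqueue(chunk) {
    queue = queue.then(() => new Promise((resolve) => {
      // write() returns false when the buffer is over the high-water mark;
      // wait for 'drain' before letting the next queued write proceed.
      const ok = writable.write(chunk);
      if (ok) {
        resolve();
      } else {
        writable.once('drain', resolve);
      }
    }));
    return queue;
  };
}

Awaiting the queue before calling res.end(), as the end handler below does with await writeQueue, ensures every chunk has been handed to the socket first.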
@@ -500,40 +535,80 @@ class RPCProxy {
     const rawData = Buffer.concat(chunks);
     responseData = rawData.toString('utf8');
-    // Send buffered response for clients that don't handle streaming
-    if (shouldBuffer && !isClientClosed() && !res.writableEnded) {
+    // Wait for all writes to complete before ending
+    if (!shouldBuffer) {
       try {
-        // Remove transfer-encoding header for buffered responses
-        res.removeHeader('transfer-encoding');
-        // Set content-length for buffered response
-        res.setHeader('content-length', rawData.length);
-        // Send all data at once
-        res.end(rawData);
+        await writeQueue;
         logger.debug({
           requestId,
-          endpoint: 'stream',
-          buffered: true,
-          responseSize: rawData.length,
-          clientClosed: isClientClosed(),
-        }, 'Sent buffered response to ReactorNetty client');
+        }, 'All chunks written successfully');
+      } catch (err) {
+        logger.error({
+          requestId,
+          error: err.message,
+        }, 'Error waiting for writes to complete');
+      }
+    }
+    // Send buffered response for clients that don't handle streaming well
+    // Check for Java clients (including dshackle) and buffer their responses
+    const userAgent = res.req.headers['user-agent'] || '';
+    const shouldBufferForClient = shouldBuffer ||
+      userAgent.includes('Java') ||
+      userAgent.includes('okhttp') ||
+      userAgent.includes('Apache-HttpClient');
+    if (shouldBufferForClient && !res.writableEnded) {
+      try {
+        // For buffered responses, ensure we send proper headers
+        res.removeHeader('transfer-encoding');
+        res.setHeader('content-length', rawData.length);
+        // Write all data and end in one operation
+        res.end(rawData, () => {
+          logger.debug({
+            requestId,
+            endpoint: 'stream',
+            buffered: true,
+            responseSize: rawData.length,
+            userAgent,
+            clientClosed: isClientClosed(),
+          }, 'Sent buffered response');
+        });
+      } catch (error) {
+        logger.error({
+          requestId,
+          error: error.message,
+          userAgent,
+        }, 'Error sending buffered response');
+      }
     } else if (!res.writableEnded) {
-      // Always try to end the response, even if we think client is closed
-      // The write will fail gracefully if client is truly disconnected
+      // For streaming responses, ensure all data is flushed before ending
+      // Use res.end() with a callback to ensure it completes
       try {
-        res.end();
-        logger.debug({
-          requestId,
-          endpoint: 'stream',
-          responseSize: rawData.length,
-          clientClosed: isClientClosed(),
-        }, 'Ended streaming response');
+        // If there's still data in the write buffer, wait for it to drain
+        if (res.writableHighWaterMark && res.writableLength > 0) {
+          res.once('drain', () => {
+            res.end(() => {
+              logger.debug({
+                requestId,
+                endpoint: 'stream',
+                responseSize: rawData.length,
+                clientClosed: isClientClosed(),
+              }, 'Ended streaming response after drain');
+            });
+          });
+        } else {
+          res.end(() => {
+            logger.debug({
+              requestId,
+              endpoint: 'stream',
+              responseSize: rawData.length,
+              clientClosed: isClientClosed(),
+            }, 'Ended streaming response');
+          });
+        }
       } catch (endError) {
         logger.error({
           requestId,