diff --git a/split-proxy/index.js b/split-proxy/index.js index 879f12d8..c5cd7b35 100644 --- a/split-proxy/index.js +++ b/split-proxy/index.js @@ -102,6 +102,26 @@ server.timeout = config.requestTimeout + 5000; // Add 5 seconds buffer server.keepAliveTimeout = config.requestTimeout + 5000; server.headersTimeout = config.requestTimeout + 6000; // Should be > keepAliveTimeout +// Additional configuration to prevent premature connection closure +server.requestTimeout = config.requestTimeout + 10000; // Give extra time for response to complete +server.on('connection', (socket) => { + // Disable Nagle's algorithm for better real-time performance + socket.setNoDelay(true); + + // Increase socket timeout + socket.setTimeout(config.requestTimeout + 10000); + + // Handle socket errors gracefully + socket.on('error', (err) => { + if (err.code !== 'ECONNRESET') { + logger.error({ + error: err.message, + code: err.code, + }, 'Socket error'); + } + }); +}); + logger.info({ serverTimeout: server.timeout, keepAliveTimeout: server.keepAliveTimeout, diff --git a/split-proxy/proxy.js b/split-proxy/proxy.js index d492aec0..0eade3b0 100644 --- a/split-proxy/proxy.js +++ b/split-proxy/proxy.js @@ -416,11 +416,21 @@ class RPCProxy { Object.entries(response.headers).forEach(([key, value]) => { // Don't override Connection header we just set - if (key.toLowerCase() !== 'connection') { + if (key.toLowerCase() !== 'connection' && key.toLowerCase() !== 'keep-alive') { res.setHeader(key, value); } }); + // For Java clients, ensure we handle content properly + const userAgent = res.req.headers['user-agent'] || ''; + if (userAgent.includes('Java') || userAgent.includes('okhttp') || userAgent.includes('Apache-HttpClient')) { + // Java clients often expect explicit content handling + if (!response.headers['content-length'] && !response.headers['transfer-encoding']) { + // If upstream didn't specify, we'll use chunked encoding + res.setHeader('Transfer-Encoding', 'chunked'); + } + } + // Explicitly flush headers to ensure client receives them immediately res.flushHeaders(); @@ -462,7 +472,14 @@ class RPCProxy { const chunks = []; // Check if we should buffer the response (for clients that don't handle streaming well) - const shouldBuffer = res.req.headers['user-agent'] && res.req.headers['user-agent'].includes('ReactorNetty'); + const userAgent = res.req.headers['user-agent'] || ''; + const shouldBuffer = userAgent.includes('ReactorNetty') || + userAgent.includes('Java') || + userAgent.includes('okhttp') || + userAgent.includes('Apache-HttpClient'); + + // For streaming clients, we need to handle backpressure properly + let writeQueue = Promise.resolve(); response.data.on('data', (chunk) => { // Always capture raw chunks for comparison @@ -471,27 +488,45 @@ class RPCProxy { // Only write to client if not buffering // Remove the clientClosed check - let the write fail gracefully if truly disconnected if (!shouldBuffer && !res.writableEnded) { - try { - const written = res.write(chunk); - if (!written) { - logger.debug({ + // Chain writes to handle backpressure properly + writeQueue = writeQueue.then(() => new Promise((resolve) => { + try { + const canContinue = res.write(chunk, (err) => { + if (err) { + logger.error({ + requestId, + error: err.message, + chunkSize: chunk.length, + }, 'Error in write callback'); + } + resolve(); + }); + + if (!canContinue) { + // Wait for drain event if write buffer is full + logger.debug({ + requestId, + chunkSize: chunk.length, + }, 'Backpressure detected, waiting for drain'); + res.once('drain', resolve); + } else { + resolve(); + } + } catch (writeError) { + logger.error({ requestId, - chunkSize: chunk.length, - }, 'Backpressure detected on response write'); + error: writeError.message, + code: writeError.code, + clientClosed: isClientClosed(), + }, 'Error writing to client'); + resolve(); // Continue even on error } - } catch (writeError) { - logger.error({ - requestId, - error: writeError.message, - code: writeError.code, - clientClosed: isClientClosed(), - }, 'Error writing to client'); - } + })); } }); return new Promise((resolve, reject) => { - response.data.on('end', () => { + response.data.on('end', async () => { isResponseCompleted(); // Mark response as completed const totalTime = Date.now() - startTime; @@ -500,40 +535,80 @@ class RPCProxy { const rawData = Buffer.concat(chunks); responseData = rawData.toString('utf8'); - // Send buffered response for clients that don't handle streaming - if (shouldBuffer && !isClientClosed() && !res.writableEnded) { + // Wait for all writes to complete before ending + if (!shouldBuffer) { try { - // Remove transfer-encoding header for buffered responses - res.removeHeader('transfer-encoding'); - // Set content-length for buffered response - res.setHeader('content-length', rawData.length); - // Send all data at once - res.end(rawData); - + await writeQueue; logger.debug({ requestId, endpoint: 'stream', - buffered: true, - responseSize: rawData.length, - clientClosed: isClientClosed(), - }, 'Sent buffered response to ReactorNetty client'); + }, 'All chunks written successfully'); + } catch (err) { + logger.error({ + requestId, + error: err.message, + }, 'Error waiting for writes to complete'); + } + } + + // Send buffered response for clients that don't handle streaming well + // Check for Java clients (including dshackle) and buffer their responses + const userAgent = res.req.headers['user-agent'] || ''; + const shouldBufferForClient = shouldBuffer || + userAgent.includes('Java') || + userAgent.includes('okhttp') || + userAgent.includes('Apache-HttpClient'); + + if (shouldBufferForClient && !res.writableEnded) { + try { + // For buffered responses, ensure we send proper headers + res.removeHeader('transfer-encoding'); + res.setHeader('content-length', rawData.length); + + // Write all data and end in one operation + res.end(rawData, () => { + logger.debug({ + requestId, + endpoint: 'stream', + buffered: true, + responseSize: rawData.length, + userAgent, + clientClosed: isClientClosed(), + }, 'Sent buffered response'); + }); } catch (error) { logger.error({ requestId, error: error.message, + userAgent, }, 'Error sending buffered response'); } } else if (!res.writableEnded) { - // Always try to end the response, even if we think client is closed - // The write will fail gracefully if client is truly disconnected + // For streaming responses, ensure all data is flushed before ending + // Use res.end() with a callback to ensure it completes try { - res.end(); - logger.debug({ - requestId, - endpoint: 'stream', - responseSize: rawData.length, - clientClosed: isClientClosed(), - }, 'Ended streaming response'); + // If there's still data in the write buffer, wait for it to drain + if (res.writableHighWaterMark && res.writableLength > 0) { + res.once('drain', () => { + res.end(() => { + logger.debug({ + requestId, + endpoint: 'stream', + responseSize: rawData.length, + clientClosed: isClientClosed(), + }, 'Ended streaming response after drain'); + }); + }); + } else { + res.end(() => { + logger.debug({ + requestId, + endpoint: 'stream', + responseSize: rawData.length, + clientClosed: isClientClosed(), + }, 'Ended streaming response'); + }); + } } catch (endError) { logger.error({ requestId,