From 3d13e30a5a98a82ead6869b0e423d7138c5e593b Mon Sep 17 00:00:00 2001 From: Para Dox Date: Sun, 1 Jun 2025 22:20:37 +0700 Subject: [PATCH] streaming is back --- split-proxy/proxy.js | 61 ++++---------------------------------------- 1 file changed, 5 insertions(+), 56 deletions(-) diff --git a/split-proxy/proxy.js b/split-proxy/proxy.js index 0eade3b0..9c7f65b5 100644 --- a/split-proxy/proxy.js +++ b/split-proxy/proxy.js @@ -421,16 +421,6 @@ class RPCProxy { } }); - // For Java clients, ensure we handle content properly - const userAgent = res.req.headers['user-agent'] || ''; - if (userAgent.includes('Java') || userAgent.includes('okhttp') || userAgent.includes('Apache-HttpClient')) { - // Java clients often expect explicit content handling - if (!response.headers['content-length'] && !response.headers['transfer-encoding']) { - // If upstream didn't specify, we'll use chunked encoding - res.setHeader('Transfer-Encoding', 'chunked'); - } - } - // Explicitly flush headers to ensure client receives them immediately res.flushHeaders(); @@ -471,13 +461,6 @@ class RPCProxy { // Capture and stream the response const chunks = []; - // Check if we should buffer the response (for clients that don't handle streaming well) - const userAgent = res.req.headers['user-agent'] || ''; - const shouldBuffer = userAgent.includes('ReactorNetty') || - userAgent.includes('Java') || - userAgent.includes('okhttp') || - userAgent.includes('Apache-HttpClient'); - // For streaming clients, we need to handle backpressure properly let writeQueue = Promise.resolve(); @@ -485,9 +468,8 @@ class RPCProxy { // Always capture raw chunks for comparison chunks.push(chunk); - // Only write to client if not buffering - // Remove the clientClosed check - let the write fail gracefully if truly disconnected - if (!shouldBuffer && !res.writableEnded) { + // Stream data to client + if (!res.writableEnded) { // Chain writes to handle backpressure properly writeQueue = writeQueue.then(() => new Promise((resolve) => { try { @@ -536,7 +518,7 @@ class RPCProxy { responseData = rawData.toString('utf8'); // Wait for all writes to complete before ending - if (!shouldBuffer) { + if (!res.writableEnded) { try { await writeQueue; logger.debug({ @@ -551,41 +533,8 @@ class RPCProxy { } } - // Send buffered response for clients that don't handle streaming well - // Check for Java clients (including dshackle) and buffer their responses - const userAgent = res.req.headers['user-agent'] || ''; - const shouldBufferForClient = shouldBuffer || - userAgent.includes('Java') || - userAgent.includes('okhttp') || - userAgent.includes('Apache-HttpClient'); - - if (shouldBufferForClient && !res.writableEnded) { - try { - // For buffered responses, ensure we send proper headers - res.removeHeader('transfer-encoding'); - res.setHeader('content-length', rawData.length); - - // Write all data and end in one operation - res.end(rawData, () => { - logger.debug({ - requestId, - endpoint: 'stream', - buffered: true, - responseSize: rawData.length, - userAgent, - clientClosed: isClientClosed(), - }, 'Sent buffered response'); - }); - } catch (error) { - logger.error({ - requestId, - error: error.message, - userAgent, - }, 'Error sending buffered response'); - } - } else if (!res.writableEnded) { - // For streaming responses, ensure all data is flushed before ending - // Use res.end() with a callback to ensure it completes + // End the response + if (!res.writableEnded) { try { // If there's still data in the write buffer, wait for it to drain if (res.writableHighWaterMark && res.writableLength > 0) {