streaming is back
This commit is contained in:
@@ -421,16 +421,6 @@ class RPCProxy {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// For Java clients, ensure we handle content properly
|
|
||||||
const userAgent = res.req.headers['user-agent'] || '';
|
|
||||||
if (userAgent.includes('Java') || userAgent.includes('okhttp') || userAgent.includes('Apache-HttpClient')) {
|
|
||||||
// Java clients often expect explicit content handling
|
|
||||||
if (!response.headers['content-length'] && !response.headers['transfer-encoding']) {
|
|
||||||
// If upstream didn't specify, we'll use chunked encoding
|
|
||||||
res.setHeader('Transfer-Encoding', 'chunked');
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Explicitly flush headers to ensure client receives them immediately
|
// Explicitly flush headers to ensure client receives them immediately
|
||||||
res.flushHeaders();
|
res.flushHeaders();
|
||||||
|
|
||||||
@@ -471,13 +461,6 @@ class RPCProxy {
|
|||||||
// Capture and stream the response
|
// Capture and stream the response
|
||||||
const chunks = [];
|
const chunks = [];
|
||||||
|
|
||||||
// Check if we should buffer the response (for clients that don't handle streaming well)
|
|
||||||
const userAgent = res.req.headers['user-agent'] || '';
|
|
||||||
const shouldBuffer = userAgent.includes('ReactorNetty') ||
|
|
||||||
userAgent.includes('Java') ||
|
|
||||||
userAgent.includes('okhttp') ||
|
|
||||||
userAgent.includes('Apache-HttpClient');
|
|
||||||
|
|
||||||
// For streaming clients, we need to handle backpressure properly
|
// For streaming clients, we need to handle backpressure properly
|
||||||
let writeQueue = Promise.resolve();
|
let writeQueue = Promise.resolve();
|
||||||
|
|
||||||
@@ -485,9 +468,8 @@ class RPCProxy {
|
|||||||
// Always capture raw chunks for comparison
|
// Always capture raw chunks for comparison
|
||||||
chunks.push(chunk);
|
chunks.push(chunk);
|
||||||
|
|
||||||
// Only write to client if not buffering
|
// Stream data to client
|
||||||
// Remove the clientClosed check - let the write fail gracefully if truly disconnected
|
if (!res.writableEnded) {
|
||||||
if (!shouldBuffer && !res.writableEnded) {
|
|
||||||
// Chain writes to handle backpressure properly
|
// Chain writes to handle backpressure properly
|
||||||
writeQueue = writeQueue.then(() => new Promise((resolve) => {
|
writeQueue = writeQueue.then(() => new Promise((resolve) => {
|
||||||
try {
|
try {
|
||||||
@@ -536,7 +518,7 @@ class RPCProxy {
|
|||||||
responseData = rawData.toString('utf8');
|
responseData = rawData.toString('utf8');
|
||||||
|
|
||||||
// Wait for all writes to complete before ending
|
// Wait for all writes to complete before ending
|
||||||
if (!shouldBuffer) {
|
if (!res.writableEnded) {
|
||||||
try {
|
try {
|
||||||
await writeQueue;
|
await writeQueue;
|
||||||
logger.debug({
|
logger.debug({
|
||||||
@@ -551,41 +533,8 @@ class RPCProxy {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send buffered response for clients that don't handle streaming well
|
// End the response
|
||||||
// Check for Java clients (including dshackle) and buffer their responses
|
if (!res.writableEnded) {
|
||||||
const userAgent = res.req.headers['user-agent'] || '';
|
|
||||||
const shouldBufferForClient = shouldBuffer ||
|
|
||||||
userAgent.includes('Java') ||
|
|
||||||
userAgent.includes('okhttp') ||
|
|
||||||
userAgent.includes('Apache-HttpClient');
|
|
||||||
|
|
||||||
if (shouldBufferForClient && !res.writableEnded) {
|
|
||||||
try {
|
|
||||||
// For buffered responses, ensure we send proper headers
|
|
||||||
res.removeHeader('transfer-encoding');
|
|
||||||
res.setHeader('content-length', rawData.length);
|
|
||||||
|
|
||||||
// Write all data and end in one operation
|
|
||||||
res.end(rawData, () => {
|
|
||||||
logger.debug({
|
|
||||||
requestId,
|
|
||||||
endpoint: 'stream',
|
|
||||||
buffered: true,
|
|
||||||
responseSize: rawData.length,
|
|
||||||
userAgent,
|
|
||||||
clientClosed: isClientClosed(),
|
|
||||||
}, 'Sent buffered response');
|
|
||||||
});
|
|
||||||
} catch (error) {
|
|
||||||
logger.error({
|
|
||||||
requestId,
|
|
||||||
error: error.message,
|
|
||||||
userAgent,
|
|
||||||
}, 'Error sending buffered response');
|
|
||||||
}
|
|
||||||
} else if (!res.writableEnded) {
|
|
||||||
// For streaming responses, ensure all data is flushed before ending
|
|
||||||
// Use res.end() with a callback to ensure it completes
|
|
||||||
try {
|
try {
|
||||||
// If there's still data in the write buffer, wait for it to drain
|
// If there's still data in the write buffer, wait for it to drain
|
||||||
if (res.writableHighWaterMark && res.writableLength > 0) {
|
if (res.writableHighWaterMark && res.writableLength > 0) {
|
||||||
|
|||||||
Reference in New Issue
Block a user