no more streaming

This commit is contained in:
Para Dox
2025-06-01 21:27:51 +07:00
parent 46ede89ece
commit d8659d1040

View File

@@ -364,6 +364,19 @@ class RPCProxy {
res.setHeader(key, value); res.setHeader(key, value);
} }
}); });
// Explicitly flush headers to ensure client receives them immediately
res.flushHeaders();
logger.debug({
requestId,
endpoint: 'stream',
headersSent: true,
statusCode: response.status,
contentType: response.headers['content-type'],
contentLength: response.headers['content-length'],
transferEncoding: response.headers['transfer-encoding'],
}, 'Response headers sent');
} }
// Handle upstream errors // Handle upstream errors
@@ -382,12 +395,16 @@ class RPCProxy {
// Capture and stream the response // Capture and stream the response
const chunks = []; const chunks = [];
// Check if we should buffer the response (for clients that don't handle streaming well)
const shouldBuffer = req.headers['user-agent'] && req.headers['user-agent'].includes('ReactorNetty');
response.data.on('data', (chunk) => { response.data.on('data', (chunk) => {
// Always capture raw chunks for comparison // Always capture raw chunks for comparison
chunks.push(chunk); chunks.push(chunk);
// Only write to client if still connected // Only write to client if still connected and not buffering
if (!isClientClosed() && !res.writableEnded) { if (!shouldBuffer && !isClientClosed() && !res.writableEnded) {
try { try {
res.write(chunk); res.write(chunk);
} catch (writeError) { } catch (writeError) {
@@ -404,7 +421,36 @@ class RPCProxy {
response.data.on('end', () => { response.data.on('end', () => {
isResponseCompleted(); // Mark response as completed isResponseCompleted(); // Mark response as completed
if (!isClientClosed() && !res.writableEnded) { const totalTime = Date.now() - startTime;
// Combine chunks and convert to string for logging
const rawData = Buffer.concat(chunks);
responseData = rawData.toString('utf8');
// Send buffered response for clients that don't handle streaming
if (shouldBuffer && !isClientClosed() && !res.writableEnded) {
try {
// Remove transfer-encoding header for buffered responses
res.removeHeader('transfer-encoding');
// Set content-length for buffered response
res.setHeader('content-length', rawData.length);
// Send all data at once
res.end(rawData);
logger.debug({
requestId,
endpoint: 'stream',
buffered: true,
responseSize: rawData.length,
clientClosed: isClientClosed(),
}, 'Sent buffered response to ReactorNetty client');
} catch (error) {
logger.error({
requestId,
error: error.message,
}, 'Error sending buffered response');
}
} else if (!isClientClosed() && !res.writableEnded) {
try { try {
res.end(); res.end();
} catch (endError) { } catch (endError) {
@@ -415,12 +461,6 @@ class RPCProxy {
} }
} }
const totalTime = Date.now() - startTime;
// Combine chunks and convert to string for logging
const rawData = Buffer.concat(chunks);
responseData = rawData.toString('utf8');
// Log if client closed very early // Log if client closed very early
if (isClientClosed() && totalTime < 10) { if (isClientClosed() && totalTime < 10) {
// This appears to be normal JSON-RPC client behavior // This appears to be normal JSON-RPC client behavior