better error handling

This commit is contained in:
Para Dox
2025-06-01 20:45:46 +07:00
parent 7f97cfe09a
commit c37c53f4f9

View File

@@ -12,6 +12,8 @@ const httpAgent = new http.Agent({
maxSockets: 100, maxSockets: 100,
maxFreeSockets: 10, maxFreeSockets: 10,
timeout: config.requestTimeout, timeout: config.requestTimeout,
// Set socket timeout to prevent hanging connections
scheduling: 'fifo', // First-in-first-out scheduling
// Force fresh DNS lookups // Force fresh DNS lookups
lookup: (hostname, options, callback) => { lookup: (hostname, options, callback) => {
// This forces Node.js to use fresh DNS resolution // This forces Node.js to use fresh DNS resolution
@@ -25,6 +27,8 @@ const httpsAgent = new https.Agent({
maxSockets: 100, maxSockets: 100,
maxFreeSockets: 10, maxFreeSockets: 10,
timeout: config.requestTimeout, timeout: config.requestTimeout,
// Set socket timeout to prevent hanging connections
scheduling: 'fifo', // First-in-first-out scheduling
// Force fresh DNS lookups // Force fresh DNS lookups
lookup: (hostname, options, callback) => { lookup: (hostname, options, callback) => {
// This forces Node.js to use fresh DNS resolution // This forces Node.js to use fresh DNS resolution
@@ -74,24 +78,28 @@ class RPCProxy {
} }
startDnsRefreshTimer() { startDnsRefreshTimer() {
// Periodically clear the DNS cache by recreating the agents // Only clear IDLE sockets, not active ones
setInterval(() => { setInterval(() => {
logger.debug('Refreshing DNS cache'); logger.debug('Refreshing DNS cache - clearing idle sockets only');
// Clear any cached DNS entries in the HTTP agents // Only destroy free (idle) sockets, not active ones
if (httpAgent.sockets) { if (httpAgent.freeSockets) {
Object.keys(httpAgent.sockets).forEach(name => { Object.keys(httpAgent.freeSockets).forEach(name => {
httpAgent.sockets[name].forEach(socket => { if (httpAgent.freeSockets[name]) {
socket.destroy(); httpAgent.freeSockets[name].forEach(socket => {
}); socket.destroy();
});
}
}); });
} }
if (httpsAgent.sockets) { if (httpsAgent.freeSockets) {
Object.keys(httpsAgent.sockets).forEach(name => { Object.keys(httpsAgent.freeSockets).forEach(name => {
httpsAgent.sockets[name].forEach(socket => { if (httpsAgent.freeSockets[name]) {
socket.destroy(); httpsAgent.freeSockets[name].forEach(socket => {
}); socket.destroy();
});
}
}); });
} }
}, config.dnsRefreshInterval); }, config.dnsRefreshInterval);
@@ -136,9 +144,21 @@ class RPCProxy {
endpoint: 'incoming', endpoint: 'incoming',
}, 'Received JSON-RPC request'); }, 'Received JSON-RPC request');
// Handle client disconnect
let clientClosed = false;
req.on('close', () => {
clientClosed = true;
logger.warn({ requestId }, 'Client connection closed');
});
req.on('error', (error) => {
clientClosed = true;
logger.error({ requestId, error: error.message }, 'Client connection error');
});
try { try {
// Start both requests in parallel // Start both requests in parallel
const streamPromise = this.streamResponse(requestId, requestBody, res, startTime); const streamPromise = this.streamResponse(requestId, requestBody, res, startTime, () => clientClosed);
const comparePromise = this.compareResponse(requestId, requestBody, startTime); const comparePromise = this.compareResponse(requestId, requestBody, startTime);
// Wait for the stream to complete and get response info // Wait for the stream to complete and get response info
@@ -164,7 +184,7 @@ class RPCProxy {
stack: error.stack, stack: error.stack,
}, 'Error handling request'); }, 'Error handling request');
if (!res.headersSent) { if (!res.headersSent && !clientClosed) {
res.status(500).json({ res.status(500).json({
jsonrpc: '2.0', jsonrpc: '2.0',
error: { error: {
@@ -177,9 +197,10 @@ class RPCProxy {
} }
} }
async streamResponse(requestId, requestBody, res, startTime) { async streamResponse(requestId, requestBody, res, startTime, isClientClosed) {
let responseData = ''; let responseData = '';
let statusCode = 0; let statusCode = 0;
let upstreamResponse = null;
try { try {
// Create fresh client for this request // Create fresh client for this request
@@ -190,8 +211,10 @@ class RPCProxy {
'Content-Type': 'application/json', 'Content-Type': 'application/json',
'Accept-Encoding': 'identity', // Request uncompressed responses 'Accept-Encoding': 'identity', // Request uncompressed responses
}, },
validateStatus: (status) => true, // Don't throw on any status
}); });
upstreamResponse = response;
statusCode = response.status; statusCode = response.status;
const streamLatency = Date.now() - startTime; const streamLatency = Date.now() - startTime;
@@ -202,23 +225,60 @@ class RPCProxy {
statusCode: response.status, statusCode: response.status,
}, 'Stream response started'); }, 'Stream response started');
// Set response headers // Set response headers only if client hasn't closed
res.status(response.status); if (!isClientClosed() && !res.headersSent) {
Object.entries(response.headers).forEach(([key, value]) => { res.status(response.status);
if (key.toLowerCase() !== 'content-encoding') { Object.entries(response.headers).forEach(([key, value]) => {
res.setHeader(key, value); // Remove content-encoding since we're requesting uncompressed
if (key.toLowerCase() !== 'content-encoding') {
res.setHeader(key, value);
}
});
}
// Handle upstream errors
response.data.on('error', (error) => {
logger.error({
requestId,
endpoint: 'stream',
error: error.message,
code: error.code,
}, 'Upstream stream error');
if (!isClientClosed() && !res.writableEnded) {
res.destroy();
} }
}); });
// Capture and stream the response // Capture and stream the response
response.data.on('data', (chunk) => { response.data.on('data', (chunk) => {
responseData += chunk.toString(); if (!isClientClosed() && !res.writableEnded) {
res.write(chunk); responseData += chunk.toString();
try {
res.write(chunk);
} catch (writeError) {
logger.error({
requestId,
error: writeError.message,
code: writeError.code,
}, 'Error writing to client');
}
}
}); });
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
response.data.on('end', () => { response.data.on('end', () => {
res.end(); if (!isClientClosed() && !res.writableEnded) {
try {
res.end();
} catch (endError) {
logger.error({
requestId,
error: endError.message,
}, 'Error ending response');
}
}
const totalTime = Date.now() - startTime; const totalTime = Date.now() - startTime;
logger.info({ logger.info({
@@ -226,6 +286,7 @@ class RPCProxy {
endpoint: 'stream', endpoint: 'stream',
totalTimeMs: totalTime, totalTimeMs: totalTime,
responseSize: responseData.length, responseSize: responseData.length,
clientClosed: isClientClosed(),
}, 'Stream response completed'); }, 'Stream response completed');
resolve({ resolve({
@@ -241,6 +302,7 @@ class RPCProxy {
requestId, requestId,
endpoint: 'stream', endpoint: 'stream',
error: error.message, error: error.message,
code: error.code,
}, 'Stream error'); }, 'Stream error');
reject(error); reject(error);
}); });
@@ -254,9 +316,15 @@ class RPCProxy {
endpoint: 'stream', endpoint: 'stream',
latencyMs: streamLatency, latencyMs: streamLatency,
error: error.message, error: error.message,
code: error.code,
statusCode: error.response?.status, statusCode: error.response?.status,
}, 'Stream request failed'); }, 'Stream request failed');
// Clean up upstream response if it exists
if (upstreamResponse && upstreamResponse.data) {
upstreamResponse.data.destroy();
}
throw error; throw error;
} }
} }
@@ -271,6 +339,7 @@ class RPCProxy {
'Content-Type': 'application/json', 'Content-Type': 'application/json',
'Accept-Encoding': 'identity', // Request uncompressed responses 'Accept-Encoding': 'identity', // Request uncompressed responses
}, },
validateStatus: (status) => true, // Don't throw on any status
}); });
const compareLatency = Date.now() - compareStart; const compareLatency = Date.now() - compareStart;
@@ -299,6 +368,7 @@ class RPCProxy {
requestId, requestId,
endpoint: 'compare', endpoint: 'compare',
error: error.message, error: error.message,
code: error.code,
statusCode: error.response?.status, statusCode: error.response?.status,
}, 'Compare request failed'); }, 'Compare request failed');