From c37c53f4f9807a28bc483d88b9933f295619b5a1 Mon Sep 17 00:00:00 2001 From: Para Dox Date: Sun, 1 Jun 2025 20:45:46 +0700 Subject: [PATCH] better error handling --- split-proxy/proxy.js | 118 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 94 insertions(+), 24 deletions(-) diff --git a/split-proxy/proxy.js b/split-proxy/proxy.js index 9315db2b..2227abbb 100644 --- a/split-proxy/proxy.js +++ b/split-proxy/proxy.js @@ -12,6 +12,8 @@ const httpAgent = new http.Agent({ maxSockets: 100, maxFreeSockets: 10, timeout: config.requestTimeout, + // Set socket timeout to prevent hanging connections + scheduling: 'fifo', // First-in-first-out scheduling // Force fresh DNS lookups lookup: (hostname, options, callback) => { // This forces Node.js to use fresh DNS resolution @@ -25,6 +27,8 @@ const httpsAgent = new https.Agent({ maxSockets: 100, maxFreeSockets: 10, timeout: config.requestTimeout, + // Set socket timeout to prevent hanging connections + scheduling: 'fifo', // First-in-first-out scheduling // Force fresh DNS lookups lookup: (hostname, options, callback) => { // This forces Node.js to use fresh DNS resolution @@ -74,24 +78,28 @@ class RPCProxy { } startDnsRefreshTimer() { - // Periodically clear the DNS cache by recreating the agents + // Only clear IDLE sockets, not active ones setInterval(() => { - logger.debug('Refreshing DNS cache'); + logger.debug('Refreshing DNS cache - clearing idle sockets only'); - // Clear any cached DNS entries in the HTTP agents - if (httpAgent.sockets) { - Object.keys(httpAgent.sockets).forEach(name => { - httpAgent.sockets[name].forEach(socket => { - socket.destroy(); - }); + // Only destroy free (idle) sockets, not active ones + if (httpAgent.freeSockets) { + Object.keys(httpAgent.freeSockets).forEach(name => { + if (httpAgent.freeSockets[name]) { + httpAgent.freeSockets[name].forEach(socket => { + socket.destroy(); + }); + } }); } - if (httpsAgent.sockets) { - Object.keys(httpsAgent.sockets).forEach(name => { - httpsAgent.sockets[name].forEach(socket => { - socket.destroy(); - }); + if (httpsAgent.freeSockets) { + Object.keys(httpsAgent.freeSockets).forEach(name => { + if (httpsAgent.freeSockets[name]) { + httpsAgent.freeSockets[name].forEach(socket => { + socket.destroy(); + }); + } }); } }, config.dnsRefreshInterval); @@ -136,9 +144,21 @@ class RPCProxy { endpoint: 'incoming', }, 'Received JSON-RPC request'); + // Handle client disconnect + let clientClosed = false; + req.on('close', () => { + clientClosed = true; + logger.warn({ requestId }, 'Client connection closed'); + }); + + req.on('error', (error) => { + clientClosed = true; + logger.error({ requestId, error: error.message }, 'Client connection error'); + }); + try { // Start both requests in parallel - const streamPromise = this.streamResponse(requestId, requestBody, res, startTime); + const streamPromise = this.streamResponse(requestId, requestBody, res, startTime, () => clientClosed); const comparePromise = this.compareResponse(requestId, requestBody, startTime); // Wait for the stream to complete and get response info @@ -164,7 +184,7 @@ class RPCProxy { stack: error.stack, }, 'Error handling request'); - if (!res.headersSent) { + if (!res.headersSent && !clientClosed) { res.status(500).json({ jsonrpc: '2.0', error: { @@ -177,9 +197,10 @@ class RPCProxy { } } - async streamResponse(requestId, requestBody, res, startTime) { + async streamResponse(requestId, requestBody, res, startTime, isClientClosed) { let responseData = ''; let statusCode = 0; + let upstreamResponse = null; try { // Create fresh client for this request @@ -190,8 +211,10 @@ class RPCProxy { 'Content-Type': 'application/json', 'Accept-Encoding': 'identity', // Request uncompressed responses }, + validateStatus: (status) => true, // Don't throw on any status }); + upstreamResponse = response; statusCode = response.status; const streamLatency = Date.now() - startTime; @@ -202,23 +225,60 @@ class RPCProxy { statusCode: response.status, }, 'Stream response started'); - // Set response headers - res.status(response.status); - Object.entries(response.headers).forEach(([key, value]) => { - if (key.toLowerCase() !== 'content-encoding') { - res.setHeader(key, value); + // Set response headers only if client hasn't closed + if (!isClientClosed() && !res.headersSent) { + res.status(response.status); + Object.entries(response.headers).forEach(([key, value]) => { + // Remove content-encoding since we're requesting uncompressed + if (key.toLowerCase() !== 'content-encoding') { + res.setHeader(key, value); + } + }); + } + + // Handle upstream errors + response.data.on('error', (error) => { + logger.error({ + requestId, + endpoint: 'stream', + error: error.message, + code: error.code, + }, 'Upstream stream error'); + + if (!isClientClosed() && !res.writableEnded) { + res.destroy(); } }); // Capture and stream the response response.data.on('data', (chunk) => { - responseData += chunk.toString(); - res.write(chunk); + if (!isClientClosed() && !res.writableEnded) { + responseData += chunk.toString(); + try { + res.write(chunk); + } catch (writeError) { + logger.error({ + requestId, + error: writeError.message, + code: writeError.code, + }, 'Error writing to client'); + } + } }); return new Promise((resolve, reject) => { response.data.on('end', () => { - res.end(); + if (!isClientClosed() && !res.writableEnded) { + try { + res.end(); + } catch (endError) { + logger.error({ + requestId, + error: endError.message, + }, 'Error ending response'); + } + } + const totalTime = Date.now() - startTime; logger.info({ @@ -226,6 +286,7 @@ class RPCProxy { endpoint: 'stream', totalTimeMs: totalTime, responseSize: responseData.length, + clientClosed: isClientClosed(), }, 'Stream response completed'); resolve({ @@ -241,6 +302,7 @@ class RPCProxy { requestId, endpoint: 'stream', error: error.message, + code: error.code, }, 'Stream error'); reject(error); }); @@ -254,9 +316,15 @@ class RPCProxy { endpoint: 'stream', latencyMs: streamLatency, error: error.message, + code: error.code, statusCode: error.response?.status, }, 'Stream request failed'); + // Clean up upstream response if it exists + if (upstreamResponse && upstreamResponse.data) { + upstreamResponse.data.destroy(); + } + throw error; } } @@ -271,6 +339,7 @@ class RPCProxy { 'Content-Type': 'application/json', 'Accept-Encoding': 'identity', // Request uncompressed responses }, + validateStatus: (status) => true, // Don't throw on any status }); const compareLatency = Date.now() - compareStart; @@ -299,6 +368,7 @@ class RPCProxy { requestId, endpoint: 'compare', error: error.message, + code: error.code, statusCode: error.response?.status, }, 'Compare request failed');