From eb50b33aa38585177cc8ba6ffea47d904a39d3c9 Mon Sep 17 00:00:00 2001 From: Para Dox Date: Mon, 2 Jun 2025 01:37:09 +0700 Subject: [PATCH] persistent connections --- split-proxy/proxy.js | 199 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 162 insertions(+), 37 deletions(-) diff --git a/split-proxy/proxy.js b/split-proxy/proxy.js index e098901e..599e8254 100644 --- a/split-proxy/proxy.js +++ b/split-proxy/proxy.js @@ -4,6 +4,10 @@ const logger = require('./logger'); const crypto = require('crypto'); const http = require('http'); const https = require('https'); +const dns = require('dns'); +const { promisify } = require('util'); + +const dnsLookup = promisify(dns.lookup); // Create HTTP agents with DNS caching disabled and connection pooling const httpAgent = new http.Agent({ @@ -51,60 +55,181 @@ class RPCProxy { this.compareEndpoint = this.primaryEndpoint; } + // Create persistent axios clients + this.clients = new Map(); + this.createPersistentClient(this.primaryEndpoint); + this.createPersistentClient(this.secondaryEndpoint); + + // Track DNS resolution for each endpoint + this.dnsCache = new Map(); + + // Initialize socket creation time tagging + this.tagSocketCreationTime(); + // Start DNS refresh timer this.startDnsRefreshTimer(); } - // Create a fresh axios instance for each request to ensure DNS resolution - createClient(baseURL) { + // Create a persistent axios client for an endpoint + createPersistentClient(baseURL) { const isHttps = baseURL.startsWith('https://'); - return axios.create({ + const client = axios.create({ baseURL, timeout: config.requestTimeout, maxContentLength: Infinity, maxBodyLength: Infinity, httpAgent: isHttps ? undefined : httpAgent, httpsAgent: isHttps ? httpsAgent : undefined, - // Disable axios's built-in DNS caching - transformRequest: [ - (data, headers) => { - // Add timestamp to force fresh connections periodically - headers['X-Request-Time'] = Date.now().toString(); - return data; - }, - ...axios.defaults.transformRequest - ] }); + + this.clients.set(baseURL, client); + logger.info({ baseURL }, 'Created persistent axios client'); + return client; + } + + // Get or create a client for an endpoint + getClient(endpoint) { + let client = this.clients.get(endpoint); + if (!client) { + client = this.createPersistentClient(endpoint); + } + return client; + } + + // Extract hostname from URL + getHostnameFromUrl(url) { + try { + const urlObj = new URL(url); + return urlObj.hostname; + } catch (e) { + logger.error({ url, error: e.message }, 'Failed to parse URL'); + return null; + } + } + + // Check if DNS has changed for an endpoint + async checkDnsChange(endpoint) { + const hostname = this.getHostnameFromUrl(endpoint); + if (!hostname) return false; + + try { + const { address } = await dnsLookup(hostname); + const cachedAddress = this.dnsCache.get(hostname); + + if (!cachedAddress) { + // First time checking this hostname + this.dnsCache.set(hostname, address); + logger.info({ hostname, address }, 'Initial DNS resolution cached'); + return false; + } + + if (cachedAddress !== address) { + // DNS has changed + logger.info({ + hostname, + oldAddress: cachedAddress, + newAddress: address + }, 'DNS change detected'); + this.dnsCache.set(hostname, address); + return true; + } + + return false; + } catch (error) { + logger.error({ hostname, error: error.message }, 'DNS lookup failed'); + return false; + } + } + + // Recreate client for an endpoint if DNS changed + async refreshClientIfDnsChanged(endpoint) { + const dnsChanged = await this.checkDnsChange(endpoint); + if (dnsChanged) { + const hostname = this.getHostnameFromUrl(endpoint); + const isHttps = endpoint.startsWith('https://'); + const agent = isHttps ? httpsAgent : httpAgent; + + // Only destroy sockets for this specific hostname + if (agent.sockets) { + Object.keys(agent.sockets).forEach(name => { + if (name.includes(hostname)) { + agent.sockets[name].forEach(socket => socket.destroy()); + delete agent.sockets[name]; + } + }); + } + + if (agent.freeSockets) { + Object.keys(agent.freeSockets).forEach(name => { + if (name.includes(hostname)) { + agent.freeSockets[name].forEach(socket => socket.destroy()); + delete agent.freeSockets[name]; + } + }); + } + + // Recreate the client for this endpoint + this.createPersistentClient(endpoint); + logger.info({ endpoint }, 'Recreated client due to DNS change'); + } } startDnsRefreshTimer() { - // Only clear IDLE sockets, not active ones - setInterval(() => { - logger.debug('Refreshing DNS cache - clearing idle sockets only'); + setInterval(async () => { + logger.debug('Checking for DNS changes'); - // Only destroy free (idle) sockets, not active ones - if (httpAgent.freeSockets) { - Object.keys(httpAgent.freeSockets).forEach(name => { - if (httpAgent.freeSockets[name]) { - httpAgent.freeSockets[name].forEach(socket => { - socket.destroy(); - }); - } - }); - } + // Check DNS for all known endpoints + const endpoints = [this.primaryEndpoint, this.secondaryEndpoint]; + const uniqueEndpoints = [...new Set(endpoints)]; - if (httpsAgent.freeSockets) { - Object.keys(httpsAgent.freeSockets).forEach(name => { - if (httpsAgent.freeSockets[name]) { - httpsAgent.freeSockets[name].forEach(socket => { - socket.destroy(); - }); - } - }); + for (const endpoint of uniqueEndpoints) { + await this.refreshClientIfDnsChanged(endpoint); } + + // Clean up very old idle sockets (older than 5 minutes) + const maxIdleTime = 5 * 60 * 1000; + const now = Date.now(); + + [httpAgent, httpsAgent].forEach(agent => { + if (agent.freeSockets) { + Object.keys(agent.freeSockets).forEach(name => { + if (agent.freeSockets[name]) { + agent.freeSockets[name] = agent.freeSockets[name].filter(socket => { + const socketAge = now - (socket._createdTime || now); + if (socketAge > maxIdleTime) { + socket.destroy(); + logger.debug({ name, socketAge }, 'Destroyed old idle socket'); + return false; + } + return true; + }); + + if (agent.freeSockets[name].length === 0) { + delete agent.freeSockets[name]; + } + } + }); + } + }); + }, config.dnsRefreshInterval); } + // Tag sockets with creation time for cleanup + tagSocketCreationTime() { + [httpAgent, httpsAgent].forEach(agent => { + const originalCreateConnection = agent.createConnection.bind(agent); + agent.createConnection = function(options, callback) { + return originalCreateConnection(options, (err, socket) => { + if (!err && socket) { + socket._createdTime = Date.now(); + } + callback(err, socket); + }); + }; + }); + } + generateRequestId() { return crypto.randomBytes(16).toString('hex'); } @@ -357,8 +482,8 @@ class RPCProxy { const streamMethodStartTime = process.hrtime.bigint(); try { - // Create fresh client for this request - const client = this.createClient(this.streamEndpoint); + // Get persistent client for this endpoint + const client = this.getClient(this.streamEndpoint); // Get the original Accept-Encoding from the client request const acceptEncoding = res.req.headers['accept-encoding'] || 'identity'; @@ -701,8 +826,8 @@ class RPCProxy { try { const compareStart = Date.now(); const compareStartHR = process.hrtime.bigint(); // Add high-resolution timing - // Create fresh client for this request - const client = this.createClient(this.compareEndpoint); + // Get persistent client for this endpoint + const client = this.getClient(this.compareEndpoint); const response = await client.post('/', requestBody, { headers: { 'Content-Type': 'application/json',