859 lines
29 KiB
JavaScript
859 lines
29 KiB
JavaScript
const axios = require('axios');
|
|
const config = require('./config');
|
|
const logger = require('./logger');
|
|
const crypto = require('crypto');
|
|
const http = require('http');
|
|
const https = require('https');
|
|
|
|
// Create HTTP agents with DNS caching disabled and connection pooling
|
|
const httpAgent = new http.Agent({
|
|
keepAlive: true,
|
|
keepAliveMsecs: 1000,
|
|
maxSockets: 100,
|
|
maxFreeSockets: 10,
|
|
timeout: config.requestTimeout,
|
|
// Set socket timeout to prevent hanging connections
|
|
scheduling: 'fifo', // First-in-first-out scheduling
|
|
// Force fresh DNS lookups
|
|
lookup: (hostname, options, callback) => {
|
|
// This forces Node.js to use fresh DNS resolution
|
|
require('dns').lookup(hostname, options, callback);
|
|
}
|
|
});
|
|
|
|
const httpsAgent = new https.Agent({
|
|
keepAlive: true,
|
|
keepAliveMsecs: 1000,
|
|
maxSockets: 100,
|
|
maxFreeSockets: 10,
|
|
timeout: config.requestTimeout,
|
|
// Set socket timeout to prevent hanging connections
|
|
scheduling: 'fifo', // First-in-first-out scheduling
|
|
// Force fresh DNS lookups
|
|
lookup: (hostname, options, callback) => {
|
|
// This forces Node.js to use fresh DNS resolution
|
|
require('dns').lookup(hostname, options, callback);
|
|
}
|
|
});
|
|
|
|
class RPCProxy {
|
|
constructor() {
|
|
// Store endpoint URLs instead of clients
|
|
this.primaryEndpoint = config.primaryRpc;
|
|
this.secondaryEndpoint = config.secondaryRpc;
|
|
|
|
// Determine which endpoint to stream and which to compare
|
|
if (config.primaryRole === 'primary') {
|
|
this.streamEndpoint = this.primaryEndpoint;
|
|
this.compareEndpoint = this.secondaryEndpoint;
|
|
} else {
|
|
this.streamEndpoint = this.secondaryEndpoint;
|
|
this.compareEndpoint = this.primaryEndpoint;
|
|
}
|
|
|
|
// Start DNS refresh timer
|
|
this.startDnsRefreshTimer();
|
|
}
|
|
|
|
// Create a fresh axios instance for each request to ensure DNS resolution
|
|
createClient(baseURL) {
|
|
const isHttps = baseURL.startsWith('https://');
|
|
return axios.create({
|
|
baseURL,
|
|
timeout: config.requestTimeout,
|
|
maxContentLength: Infinity,
|
|
maxBodyLength: Infinity,
|
|
httpAgent: isHttps ? undefined : httpAgent,
|
|
httpsAgent: isHttps ? httpsAgent : undefined,
|
|
// Disable axios's built-in DNS caching
|
|
transformRequest: [
|
|
(data, headers) => {
|
|
// Add timestamp to force fresh connections periodically
|
|
headers['X-Request-Time'] = Date.now().toString();
|
|
return data;
|
|
},
|
|
...axios.defaults.transformRequest
|
|
]
|
|
});
|
|
}
|
|
|
|
startDnsRefreshTimer() {
|
|
// Only clear IDLE sockets, not active ones
|
|
setInterval(() => {
|
|
logger.debug('Refreshing DNS cache - clearing idle sockets only');
|
|
|
|
// Only destroy free (idle) sockets, not active ones
|
|
if (httpAgent.freeSockets) {
|
|
Object.keys(httpAgent.freeSockets).forEach(name => {
|
|
if (httpAgent.freeSockets[name]) {
|
|
httpAgent.freeSockets[name].forEach(socket => {
|
|
socket.destroy();
|
|
});
|
|
}
|
|
});
|
|
}
|
|
|
|
if (httpsAgent.freeSockets) {
|
|
Object.keys(httpsAgent.freeSockets).forEach(name => {
|
|
if (httpsAgent.freeSockets[name]) {
|
|
httpsAgent.freeSockets[name].forEach(socket => {
|
|
socket.destroy();
|
|
});
|
|
}
|
|
});
|
|
}
|
|
}, config.dnsRefreshInterval);
|
|
}
|
|
|
|
generateRequestId() {
|
|
return crypto.randomBytes(16).toString('hex');
|
|
}
|
|
|
|
switchRoles() {
|
|
// Swap the endpoints
|
|
const tempEndpoint = this.streamEndpoint;
|
|
|
|
this.streamEndpoint = this.compareEndpoint;
|
|
this.compareEndpoint = tempEndpoint;
|
|
|
|
// Update the primaryRole in config
|
|
config.primaryRole = config.primaryRole === 'primary' ? 'secondary' : 'primary';
|
|
|
|
logger.info({
|
|
newStreamEndpoint: this.streamEndpoint,
|
|
newCompareEndpoint: this.compareEndpoint,
|
|
newPrimaryRole: config.primaryRole,
|
|
}, 'Switched primary/secondary roles');
|
|
|
|
return {
|
|
streamEndpoint: this.streamEndpoint,
|
|
compareEndpoint: this.compareEndpoint,
|
|
primaryRole: config.primaryRole,
|
|
};
|
|
}
|
|
|
|
async handleRequest(req, res) {
|
|
const requestId = this.generateRequestId();
|
|
const startTime = Date.now();
|
|
const hrStartTime = process.hrtime.bigint(); // Add high-resolution start time
|
|
const requestBody = req.body;
|
|
|
|
// Validate request body
|
|
if (!requestBody || !requestBody.method) {
|
|
logger.error({
|
|
requestId,
|
|
body: requestBody,
|
|
headers: req.headers,
|
|
}, 'Invalid or missing request body');
|
|
|
|
res.status(400).json({
|
|
jsonrpc: '2.0',
|
|
error: {
|
|
code: -32600,
|
|
message: 'Invalid Request',
|
|
},
|
|
id: requestBody?.id || null,
|
|
});
|
|
return;
|
|
}
|
|
|
|
logger.info({
|
|
requestId,
|
|
method: requestBody.method,
|
|
params: requestBody.params,
|
|
endpoint: 'incoming',
|
|
httpVersion: req.httpVersion,
|
|
connection: req.headers.connection,
|
|
userAgent: req.headers['user-agent'],
|
|
acceptEncoding: req.headers['accept-encoding'],
|
|
}, 'Received JSON-RPC request');
|
|
|
|
// Handle client disconnect
|
|
let clientClosed = false;
|
|
let clientCloseReason = null;
|
|
let responseCompleted = false;
|
|
|
|
// Use 'aborted' event which is more reliable for detecting client disconnects
|
|
req.on('aborted', () => {
|
|
if (!clientClosed) {
|
|
clientClosed = true;
|
|
clientCloseReason = 'request_aborted';
|
|
const elapsedMs = Date.now() - startTime;
|
|
|
|
logger.warn({
|
|
requestId,
|
|
reason: clientCloseReason,
|
|
headers: req.headers,
|
|
userAgent: req.headers['user-agent'],
|
|
contentLength: req.headers['content-length'],
|
|
method: requestBody.method,
|
|
elapsedMs,
|
|
responseCompleted,
|
|
}, 'Client aborted request');
|
|
}
|
|
});
|
|
|
|
// Don't use the 'close' event to determine client disconnect
|
|
// It fires too early and unreliably
|
|
req.on('close', () => {
|
|
const elapsedMs = Date.now() - startTime;
|
|
logger.debug({
|
|
requestId,
|
|
reason: 'request_close_event',
|
|
method: requestBody.method,
|
|
elapsedMs,
|
|
responseCompleted,
|
|
headersSent: res.headersSent,
|
|
finished: res.finished,
|
|
}, 'Request close event (informational only)');
|
|
});
|
|
|
|
req.on('error', (error) => {
|
|
// Only mark as closed for specific network errors
|
|
if (error.code === 'ECONNRESET' || error.code === 'EPIPE') {
|
|
clientClosed = true;
|
|
clientCloseReason = `connection_error: ${error.code}`;
|
|
}
|
|
logger.error({
|
|
requestId,
|
|
error: error.message,
|
|
code: error.code,
|
|
reason: clientCloseReason,
|
|
headers: req.headers,
|
|
method: requestBody.method,
|
|
elapsedMs: Date.now() - startTime,
|
|
}, 'Client connection error');
|
|
});
|
|
|
|
// Track when response is actually finished
|
|
res.on('finish', () => {
|
|
responseCompleted = true;
|
|
const finishTimeHR = Number(process.hrtime.bigint() - hrStartTime) / 1000000;
|
|
logger.debug({
|
|
requestId,
|
|
method: requestBody.method,
|
|
elapsedMs: Date.now() - startTime,
|
|
finishTimeHrMs: finishTimeHR,
|
|
}, 'Response finished successfully');
|
|
});
|
|
|
|
// Also track response close events
|
|
res.on('close', () => {
|
|
if (!responseCompleted) {
|
|
const closeTimeHR = Number(process.hrtime.bigint() - hrStartTime) / 1000000;
|
|
logger.warn({
|
|
requestId,
|
|
reason: 'response_closed',
|
|
finished: res.finished,
|
|
headersSent: res.headersSent,
|
|
method: requestBody.method,
|
|
elapsedMs: Date.now() - startTime,
|
|
closeTimeHrMs: closeTimeHR,
|
|
clientClosed,
|
|
}, 'Response connection closed before completion');
|
|
}
|
|
});
|
|
|
|
try {
|
|
// Start both requests in parallel
|
|
const streamPromise = this.streamResponse(
|
|
requestId,
|
|
requestBody,
|
|
res,
|
|
startTime,
|
|
hrStartTime, // Pass high-resolution start time
|
|
() => clientClosed,
|
|
() => responseCompleted = true,
|
|
() => clientCloseReason
|
|
);
|
|
|
|
// Check if method should be excluded from comparison
|
|
const excludedMethods = ['eth_sendRawTransaction', 'eth_sendTransaction'];
|
|
const shouldCompare = !excludedMethods.includes(requestBody.method);
|
|
|
|
let comparePromise = null;
|
|
if (shouldCompare) {
|
|
comparePromise = this.compareResponse(requestId, requestBody, startTime);
|
|
} else {
|
|
logger.info({
|
|
requestId,
|
|
method: requestBody.method,
|
|
reason: 'excluded_write_transaction',
|
|
}, 'Skipping comparison for write transaction method');
|
|
}
|
|
|
|
// Wait for the stream to complete and get response info
|
|
const streamInfo = await streamPromise;
|
|
|
|
// Get comparison response if applicable
|
|
if (comparePromise) {
|
|
comparePromise.then(compareInfo => {
|
|
if (compareInfo && streamInfo) {
|
|
this.compareResponses(requestId, streamInfo, compareInfo, requestBody);
|
|
}
|
|
}).catch(err => {
|
|
logger.error({
|
|
requestId,
|
|
error: err.message,
|
|
endpoint: 'compare',
|
|
}, 'Error in comparison request');
|
|
});
|
|
}
|
|
|
|
} catch (error) {
|
|
logger.error({
|
|
requestId,
|
|
error: error.message,
|
|
stack: error.stack,
|
|
streamEndpoint: this.streamEndpoint,
|
|
compareEndpoint: this.compareEndpoint,
|
|
}, 'Error handling request');
|
|
|
|
// Always try to send an error response if possible
|
|
if (!res.headersSent && !res.writableEnded) {
|
|
try {
|
|
// Send a proper JSON-RPC error response
|
|
res.status(502).json({
|
|
jsonrpc: '2.0',
|
|
error: {
|
|
code: -32603,
|
|
message: 'Internal error: Unable to connect to upstream RPC endpoints',
|
|
data: {
|
|
error: error.message,
|
|
streamEndpoint: this.streamEndpoint,
|
|
compareEndpoint: this.compareEndpoint,
|
|
}
|
|
},
|
|
id: requestBody.id || null,
|
|
});
|
|
|
|
logger.info({
|
|
requestId,
|
|
sentErrorResponse: true,
|
|
}, 'Sent error response to client');
|
|
} catch (sendError) {
|
|
logger.error({
|
|
requestId,
|
|
error: sendError.message,
|
|
}, 'Failed to send error response to client');
|
|
}
|
|
} else {
|
|
logger.warn({
|
|
requestId,
|
|
headersSent: res.headersSent,
|
|
writableEnded: res.writableEnded,
|
|
clientClosed,
|
|
}, 'Cannot send error response - headers already sent or connection closed');
|
|
}
|
|
}
|
|
}
|
|
|
|
async streamResponse(requestId, requestBody, res, startTime, hrStartTime, isClientClosed, isResponseCompleted, getClientCloseReason) {
|
|
let responseData = '';
|
|
let statusCode = 0;
|
|
let upstreamResponse = null;
|
|
|
|
// Add high-resolution timing for this method
|
|
const streamMethodStartTime = process.hrtime.bigint();
|
|
|
|
try {
|
|
// Create fresh client for this request
|
|
const client = this.createClient(this.streamEndpoint);
|
|
|
|
// Get the original Accept-Encoding from the client request
|
|
const acceptEncoding = res.req.headers['accept-encoding'] || 'identity';
|
|
|
|
// Check if client already closed before making upstream request
|
|
if (isClientClosed()) {
|
|
logger.warn({
|
|
requestId,
|
|
endpoint: 'stream',
|
|
method: requestBody.method,
|
|
clientCloseReason: getClientCloseReason(),
|
|
elapsedBeforeRequest: Date.now() - startTime,
|
|
}, 'Client closed before upstream request could be made');
|
|
// Don't return early - still try to make the request in case our detection was wrong
|
|
// The actual response sending will handle the client closed state
|
|
}
|
|
|
|
logger.debug({
|
|
requestId,
|
|
endpoint: 'stream',
|
|
method: requestBody.method,
|
|
streamEndpoint: this.streamEndpoint,
|
|
}, 'Making upstream request');
|
|
|
|
// Measure time to upstream request
|
|
const upstreamStartTime = process.hrtime.bigint();
|
|
|
|
let response;
|
|
try {
|
|
response = await client.post('/', requestBody, {
|
|
responseType: 'stream',
|
|
headers: {
|
|
'Content-Type': 'application/json',
|
|
'Accept-Encoding': acceptEncoding, // Forward client's encoding preference
|
|
},
|
|
validateStatus: (status) => true, // Don't throw on any status
|
|
});
|
|
} catch (upstreamError) {
|
|
// Log the specific error details
|
|
logger.error({
|
|
requestId,
|
|
endpoint: 'stream',
|
|
error: upstreamError.message,
|
|
code: upstreamError.code,
|
|
streamEndpoint: this.streamEndpoint,
|
|
errno: upstreamError.errno,
|
|
syscall: upstreamError.syscall,
|
|
address: upstreamError.address,
|
|
port: upstreamError.port,
|
|
}, 'Failed to connect to upstream endpoint');
|
|
|
|
// Re-throw with more context
|
|
const enhancedError = new Error(`Failed to connect to upstream RPC endpoint at ${this.streamEndpoint}: ${upstreamError.message}`);
|
|
enhancedError.code = upstreamError.code;
|
|
enhancedError.originalError = upstreamError;
|
|
throw enhancedError;
|
|
}
|
|
|
|
upstreamResponse = response;
|
|
statusCode = response.status;
|
|
const streamLatency = Date.now() - startTime;
|
|
|
|
// Calculate pre-streaming overhead in nanoseconds
|
|
const preStreamOverheadNs = Number(process.hrtime.bigint() - streamMethodStartTime);
|
|
|
|
logger.info({
|
|
requestId,
|
|
endpoint: 'stream',
|
|
latencyMs: streamLatency,
|
|
statusCode: response.status,
|
|
preStreamOverheadNs,
|
|
upstreamConnectNs: Number(process.hrtime.bigint() - upstreamStartTime),
|
|
}, 'Stream response started');
|
|
|
|
// Set response headers if not already sent
|
|
if (!res.headersSent) {
|
|
try {
|
|
res.status(response.status);
|
|
|
|
// Respect client's connection preference
|
|
const clientConnection = res.req.headers.connection;
|
|
if (clientConnection && clientConnection.toLowerCase() === 'close') {
|
|
res.setHeader('Connection', 'close');
|
|
} else {
|
|
res.setHeader('Connection', 'keep-alive');
|
|
res.setHeader('Keep-Alive', `timeout=${Math.floor(config.requestTimeout / 1000)}`);
|
|
}
|
|
|
|
Object.entries(response.headers).forEach(([key, value]) => {
|
|
// Don't override Connection header we just set
|
|
if (key.toLowerCase() !== 'connection' && key.toLowerCase() !== 'keep-alive') {
|
|
res.setHeader(key, value);
|
|
}
|
|
});
|
|
|
|
// Explicitly flush headers to ensure client receives them immediately
|
|
res.flushHeaders();
|
|
|
|
logger.debug({
|
|
requestId,
|
|
endpoint: 'stream',
|
|
headersSent: true,
|
|
statusCode: response.status,
|
|
contentType: response.headers['content-type'],
|
|
contentLength: response.headers['content-length'],
|
|
transferEncoding: response.headers['transfer-encoding'],
|
|
clientClosed: isClientClosed(),
|
|
}, 'Response headers sent');
|
|
} catch (headerError) {
|
|
logger.error({
|
|
requestId,
|
|
error: headerError.message,
|
|
clientClosed: isClientClosed(),
|
|
}, 'Error setting response headers');
|
|
}
|
|
}
|
|
|
|
// Handle upstream errors
|
|
response.data.on('error', (error) => {
|
|
logger.error({
|
|
requestId,
|
|
endpoint: 'stream',
|
|
error: error.message,
|
|
code: error.code,
|
|
}, 'Upstream stream error');
|
|
|
|
// Only destroy if response hasn't been sent yet
|
|
if (!res.headersSent && !res.writableEnded) {
|
|
res.destroy();
|
|
}
|
|
});
|
|
|
|
// Capture and stream the response
|
|
const chunks = [];
|
|
|
|
// For streaming clients, we need to handle backpressure properly
|
|
let writeQueue = Promise.resolve();
|
|
|
|
response.data.on('data', (chunk) => {
|
|
// Measure per-chunk overhead
|
|
const chunkStartTime = process.hrtime.bigint();
|
|
|
|
// Always capture raw chunks for comparison
|
|
chunks.push(chunk);
|
|
|
|
// Stream data to client
|
|
if (!res.writableEnded) {
|
|
// Chain writes to handle backpressure properly
|
|
writeQueue = writeQueue.then(() => new Promise((resolve) => {
|
|
try {
|
|
const canContinue = res.write(chunk, (err) => {
|
|
// Log per-chunk overhead
|
|
const chunkOverheadNs = Number(process.hrtime.bigint() - chunkStartTime);
|
|
if (chunkOverheadNs > 100000) { // Log if over 100 microseconds
|
|
logger.debug({
|
|
requestId,
|
|
chunkOverheadNs,
|
|
chunkSize: chunk.length,
|
|
}, 'High chunk processing overhead');
|
|
}
|
|
if (err) {
|
|
logger.error({
|
|
requestId,
|
|
error: err.message,
|
|
chunkSize: chunk.length,
|
|
}, 'Error in write callback');
|
|
}
|
|
resolve();
|
|
});
|
|
|
|
if (!canContinue) {
|
|
// Wait for drain event if write buffer is full
|
|
logger.debug({
|
|
requestId,
|
|
chunkSize: chunk.length,
|
|
}, 'Backpressure detected, waiting for drain');
|
|
res.once('drain', resolve);
|
|
} else {
|
|
resolve();
|
|
}
|
|
} catch (writeError) {
|
|
logger.error({
|
|
requestId,
|
|
error: writeError.message,
|
|
code: writeError.code,
|
|
clientClosed: isClientClosed(),
|
|
}, 'Error writing to client');
|
|
resolve(); // Continue even on error
|
|
}
|
|
}));
|
|
}
|
|
});
|
|
|
|
return new Promise((resolve, reject) => {
|
|
response.data.on('end', async () => {
|
|
isResponseCompleted(); // Mark response as completed
|
|
|
|
const totalTime = Date.now() - startTime;
|
|
const totalTimeHR = Number(process.hrtime.bigint() - hrStartTime) / 1000000; // Convert nanoseconds to milliseconds with decimal precision
|
|
|
|
// Combine chunks and convert to string for logging
|
|
const rawData = Buffer.concat(chunks);
|
|
responseData = rawData.toString('utf8');
|
|
|
|
// Wait for all writes to complete before ending
|
|
if (!res.writableEnded) {
|
|
try {
|
|
await writeQueue;
|
|
logger.debug({
|
|
requestId,
|
|
endpoint: 'stream',
|
|
}, 'All chunks written successfully');
|
|
} catch (err) {
|
|
logger.error({
|
|
requestId,
|
|
error: err.message,
|
|
}, 'Error waiting for writes to complete');
|
|
}
|
|
}
|
|
|
|
// End the response
|
|
if (!res.writableEnded) {
|
|
try {
|
|
// If there's still data in the write buffer, wait for it to drain
|
|
if (res.writableHighWaterMark && res.writableLength > 0) {
|
|
res.once('drain', () => {
|
|
res.end(() => {
|
|
const transferCompleteHR = Number(process.hrtime.bigint() - hrStartTime) / 1000000;
|
|
logger.debug({
|
|
requestId,
|
|
endpoint: 'stream',
|
|
responseSize: rawData.length,
|
|
clientClosed: isClientClosed(),
|
|
transferCompleteHrMs: transferCompleteHR,
|
|
}, 'Ended streaming response after drain');
|
|
});
|
|
});
|
|
} else {
|
|
res.end(() => {
|
|
const transferCompleteHR = Number(process.hrtime.bigint() - hrStartTime) / 1000000;
|
|
logger.debug({
|
|
requestId,
|
|
endpoint: 'stream',
|
|
responseSize: rawData.length,
|
|
clientClosed: isClientClosed(),
|
|
transferCompleteHrMs: transferCompleteHR,
|
|
}, 'Ended streaming response');
|
|
});
|
|
}
|
|
} catch (endError) {
|
|
logger.error({
|
|
requestId,
|
|
error: endError.message,
|
|
clientClosed: isClientClosed(),
|
|
}, 'Error ending response');
|
|
}
|
|
}
|
|
|
|
// Log if client closed very early
|
|
if (isClientClosed() && totalTime < 10) {
|
|
// This appears to be normal JSON-RPC client behavior
|
|
logger.info({
|
|
requestId,
|
|
endpoint: 'stream',
|
|
totalTimeMs: totalTime,
|
|
totalTimeHrMs: totalTimeHR, // High-resolution time in milliseconds
|
|
responseSize: rawData.length,
|
|
contentEncoding: response.headers['content-encoding'],
|
|
responseHeaders: response.headers,
|
|
method: requestBody.method,
|
|
httpVersion: res.req.httpVersion,
|
|
keepAlive: res.req.headers.connection,
|
|
clientClosedAt: getClientCloseReason(),
|
|
responseComplete: true, // We're in the 'end' event, so response is complete
|
|
chunksReceived: chunks.length,
|
|
// For small responses, log the actual data to see what's happening
|
|
responseData: rawData.length < 200 ? responseData : '[truncated]',
|
|
}, 'Client closed connection quickly (normal for JSON-RPC)');
|
|
}
|
|
|
|
// Add transfer timing to final log
|
|
const endTime = process.hrtime.bigint();
|
|
const streamingDurationHR = Number(endTime - streamMethodStartTime) / 1000000;
|
|
|
|
logger.info({
|
|
requestId,
|
|
endpoint: 'stream',
|
|
totalTimeMs: totalTime,
|
|
totalTimeHrMs: totalTimeHR, // High-resolution time in milliseconds
|
|
streamingDurationHrMs: streamingDurationHR, // Time spent in streamResponse method
|
|
responseSize: rawData.length,
|
|
contentEncoding: response.headers['content-encoding'],
|
|
clientClosed: isClientClosed(),
|
|
}, 'Stream response completed');
|
|
|
|
resolve({
|
|
statusCode,
|
|
data: responseData,
|
|
size: rawData.length,
|
|
latency: totalTime,
|
|
latencyHR: totalTimeHR, // Add high-resolution latency
|
|
contentEncoding: response.headers['content-encoding'],
|
|
});
|
|
});
|
|
|
|
response.data.on('error', (error) => {
|
|
logger.error({
|
|
requestId,
|
|
endpoint: 'stream',
|
|
error: error.message,
|
|
code: error.code,
|
|
}, 'Stream error');
|
|
reject(error);
|
|
});
|
|
});
|
|
|
|
} catch (error) {
|
|
const streamLatency = Date.now() - startTime;
|
|
|
|
logger.error({
|
|
requestId,
|
|
endpoint: 'stream',
|
|
latencyMs: streamLatency,
|
|
error: error.message,
|
|
code: error.code,
|
|
statusCode: error.response?.status,
|
|
streamEndpoint: this.streamEndpoint,
|
|
}, 'Stream request failed');
|
|
|
|
// Clean up upstream response if it exists
|
|
if (upstreamResponse && upstreamResponse.data) {
|
|
upstreamResponse.data.destroy();
|
|
}
|
|
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
async compareResponse(requestId, requestBody, startTime) {
|
|
try {
|
|
const compareStart = Date.now();
|
|
const compareStartHR = process.hrtime.bigint(); // Add high-resolution timing
|
|
// Create fresh client for this request
|
|
const client = this.createClient(this.compareEndpoint);
|
|
const response = await client.post('/', requestBody, {
|
|
headers: {
|
|
'Content-Type': 'application/json',
|
|
'Accept-Encoding': 'gzip, deflate', // Accept compressed responses for comparison
|
|
},
|
|
validateStatus: (status) => true, // Don't throw on any status
|
|
});
|
|
|
|
const compareLatency = Date.now() - compareStart;
|
|
const compareLatencyHR = Number(process.hrtime.bigint() - compareStartHR) / 1000000; // High-resolution in milliseconds
|
|
const compareData = typeof response.data === 'string'
|
|
? response.data
|
|
: JSON.stringify(response.data);
|
|
const compareSize = compareData.length;
|
|
|
|
logger.info({
|
|
requestId,
|
|
endpoint: 'compare',
|
|
latencyMs: compareLatency,
|
|
latencyHrMs: compareLatencyHR, // High-resolution timing
|
|
statusCode: response.status,
|
|
responseSize: compareSize,
|
|
}, 'Compare response received');
|
|
|
|
return {
|
|
statusCode: response.status,
|
|
data: compareData,
|
|
size: compareSize,
|
|
latency: compareLatency,
|
|
latencyHR: compareLatencyHR, // Include high-resolution timing
|
|
};
|
|
|
|
} catch (error) {
|
|
logger.error({
|
|
requestId,
|
|
endpoint: 'compare',
|
|
error: error.message,
|
|
code: error.code,
|
|
statusCode: error.response?.status,
|
|
}, 'Compare request failed');
|
|
|
|
return {
|
|
error: error.message,
|
|
statusCode: error.response?.status || 0,
|
|
};
|
|
}
|
|
}
|
|
|
|
compareResponses(requestId, streamResponse, compareResponse, requestBody) {
|
|
if (!config.logMismatches) return;
|
|
|
|
const mismatches = [];
|
|
|
|
// Check status code mismatch
|
|
if (streamResponse.statusCode !== compareResponse.statusCode) {
|
|
mismatches.push({
|
|
type: 'status_code',
|
|
stream: streamResponse.statusCode,
|
|
compare: compareResponse.statusCode,
|
|
});
|
|
}
|
|
|
|
// Check size difference
|
|
const sizeDiff = Math.abs(streamResponse.size - compareResponse.size);
|
|
if (sizeDiff > config.sizeDiffThreshold) {
|
|
mismatches.push({
|
|
type: 'size',
|
|
streamSize: streamResponse.size,
|
|
compareSize: compareResponse.size,
|
|
difference: sizeDiff,
|
|
});
|
|
}
|
|
|
|
// Check latency difference
|
|
const latencyDiff = compareResponse.latency - streamResponse.latency;
|
|
if (!config.ignoreLatencyMismatches && latencyDiff > config.latencyThresholdMs) {
|
|
mismatches.push({
|
|
type: 'latency',
|
|
streamLatency: streamResponse.latency,
|
|
compareLatency: compareResponse.latency,
|
|
difference: latencyDiff,
|
|
});
|
|
}
|
|
|
|
// Log mismatches if any found
|
|
if (mismatches.length > 0) {
|
|
const logEntry = {
|
|
requestId,
|
|
method: requestBody.method,
|
|
request: requestBody, // Include the original request for context
|
|
mismatches,
|
|
streamEndpoint: this.streamEndpoint,
|
|
compareEndpoint: this.compareEndpoint,
|
|
};
|
|
|
|
// Include full responses if there's any mismatch (not just size)
|
|
// This helps debug status code mismatches, timeouts, etc.
|
|
const shouldLogResponses = sizeDiff > config.sizeDiffThreshold ||
|
|
streamResponse.statusCode !== compareResponse.statusCode ||
|
|
(config.logAllMismatchedResponses === true);
|
|
|
|
if (shouldLogResponses) {
|
|
try {
|
|
// Try to parse as JSON for better readability
|
|
logEntry.streamResponse = {
|
|
statusCode: streamResponse.statusCode,
|
|
size: streamResponse.size,
|
|
data: streamResponse.data ? JSON.parse(streamResponse.data) : null
|
|
};
|
|
logEntry.compareResponse = {
|
|
statusCode: compareResponse.statusCode,
|
|
size: compareResponse.size,
|
|
data: compareResponse.data ? JSON.parse(compareResponse.data) : null
|
|
};
|
|
} catch (e) {
|
|
// If not valid JSON, include raw data
|
|
logEntry.streamResponse = {
|
|
statusCode: streamResponse.statusCode,
|
|
size: streamResponse.size,
|
|
data: streamResponse.data
|
|
};
|
|
logEntry.compareResponse = {
|
|
statusCode: compareResponse.statusCode,
|
|
size: compareResponse.size,
|
|
data: compareResponse.data
|
|
};
|
|
}
|
|
|
|
// Log a summary for easier reading
|
|
logger.warn({
|
|
requestId,
|
|
method: requestBody.method,
|
|
mismatchTypes: mismatches.map(m => m.type),
|
|
streamEndpoint: this.streamEndpoint,
|
|
compareEndpoint: this.compareEndpoint,
|
|
}, 'Response mismatch detected - full details below');
|
|
}
|
|
|
|
// Log the full entry with all details
|
|
logger.warn(logEntry, 'Response mismatch details');
|
|
} else {
|
|
logger.debug({
|
|
requestId,
|
|
method: requestBody.method,
|
|
streamLatency: streamResponse.latency,
|
|
compareLatency: compareResponse.latency,
|
|
streamSize: streamResponse.size,
|
|
compareSize: compareResponse.size,
|
|
}, 'Responses match');
|
|
}
|
|
}
|
|
}
|
|
|
|
module.exports = RPCProxy;
|