diff --git a/split-proxy/.dockerignore b/split-proxy/.dockerignore new file mode 100644 index 00000000..f2faa3ac --- /dev/null +++ b/split-proxy/.dockerignore @@ -0,0 +1,12 @@ +node_modules/ +npm-debug.log +.env +.env.local +.DS_Store +*.log +.git/ +.gitignore +README.md +docker-compose.yml +.dockerignore +Dockerfile \ No newline at end of file diff --git a/split-proxy/.gitignore b/split-proxy/.gitignore new file mode 100644 index 00000000..adbeced1 --- /dev/null +++ b/split-proxy/.gitignore @@ -0,0 +1,47 @@ +# Dependencies +node_modules/ + +# Environment files +.env +.env.local +.env.*.local + +# Logs +logs +*.log +npm-debug.log* +yarn-debug.log* +yarn-error.log* +lerna-debug.log* +.pnpm-debug.log* + +# Runtime data +pids +*.pid +*.seed +*.pid.lock + +# Directory for instrumented libs generated by jscoverage/JSCover +lib-cov + +# Coverage directory used by tools like istanbul +coverage +*.lcov + +# nyc test coverage +.nyc_output + +# OS files +.DS_Store +Thumbs.db + +# IDEs +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# Build outputs +dist/ +build/ \ No newline at end of file diff --git a/split-proxy/Dockerfile b/split-proxy/Dockerfile new file mode 100644 index 00000000..62b00459 --- /dev/null +++ b/split-proxy/Dockerfile @@ -0,0 +1,26 @@ +FROM node:20-alpine + +WORKDIR /app + +# Copy package files +COPY package*.json ./ + +# Install production dependencies +RUN npm install --production + +# Copy application files +COPY . . + +# Set production environment +ENV NODE_ENV=production + +# Expose port +EXPOSE 8545 + +# Run as non-root user +RUN addgroup -g 1001 -S nodejs +RUN adduser -S nodejs -u 1001 +USER nodejs + +# Start the application +CMD ["node", "index.js"] \ No newline at end of file diff --git a/split-proxy/README.md b/split-proxy/README.md new file mode 100644 index 00000000..8bd41605 --- /dev/null +++ b/split-proxy/README.md @@ -0,0 +1,245 @@ +# ETH JSON-RPC Proxy (Dual Upstream Comparator) + +A high-performance Ethereum JSON-RPC proxy that forwards requests to two upstream endpoints, streams responses from the primary, and compares responses for latency and content differences. + +## Features + +- 🚀 **Streaming Response**: Streams response from primary endpoint with minimal latency +- 🔍 **Dual Comparison**: Compares responses from two endpoints in parallel +- 📊 **Latency Monitoring**: Tracks and logs latency differences between endpoints +- 🔧 **Configurable Roles**: Switch primary/secondary roles via configuration +- 🔄 **Dynamic Role Switching**: Switch primary/secondary roles at runtime without restart +- 📝 **Structured Logging**: JSON-formatted logs for easy parsing and monitoring +- 🐳 **Docker Ready**: Includes Dockerfile for easy deployment + +## Quick Start + +### Local Development + +1. Install dependencies: +```bash +npm install +``` + +2. Copy the example environment file: +```bash +cp .env.example .env +``` + +3. Configure your RPC endpoints in `.env`: +```env +PRIMARY_RPC=http://localhost:8545 +SECONDARY_RPC=http://localhost:8546 +``` + +4. Start the proxy: +```bash +npm start +``` + +For development with auto-reload: +```bash +npm run dev +``` + +### Docker + +Build and run with Docker: + +```bash +# Build the image +docker build -t eth-rpc-proxy . + +# Run the container +docker run -p 8545:8545 \ + -e PRIMARY_RPC=http://your-primary-rpc:8545 \ + -e SECONDARY_RPC=http://your-secondary-rpc:8545 \ + eth-rpc-proxy +``` + +## Configuration + +All configuration is done via environment variables: + +| Variable | Default | Description | +|----------|---------|-------------| +| `PRIMARY_RPC` | `http://localhost:8545` | Primary RPC endpoint URL | +| `SECONDARY_RPC` | `http://localhost:8546` | Secondary RPC endpoint URL | +| `PRIMARY_ROLE` | `primary` | Which endpoint to stream (`primary` or `secondary`) | +| `LATENCY_THRESHOLD_MS` | `1000` | Log if secondary is slower by this many ms | +| `SIZE_DIFF_THRESHOLD` | `100` | Log full response if size differs by this many bytes | +| `LOG_MISMATCHES` | `true` | Whether to log response mismatches | +| `PORT` | `8545` | Port to listen on | +| `REQUEST_TIMEOUT` | `30000` | Request timeout in milliseconds | +| `LOG_LEVEL` | `info` | Log level (debug, info, warn, error) | +| `DNS_REFRESH_INTERVAL` | `1000` | DNS cache refresh interval in milliseconds | + +## Usage Examples + +### Basic JSON-RPC Request + +```bash +curl -X POST http://localhost:8545 \ + -H "Content-Type: application/json" \ + -d '{ + "jsonrpc": "2.0", + "method": "eth_blockNumber", + "params": [], + "id": 1 + }' +``` + +### Health Check + +```bash +curl http://localhost:8545/health +``` + +Response: +```json +{ + "status": "ok", + "primaryEndpoint": "http://localhost:8545", + "secondaryEndpoint": "http://localhost:8546", + "primaryRole": "primary" +} +``` + +## How It Works + +1. **Request Reception**: Proxy receives JSON-RPC POST request +2. **Parallel Forwarding**: Request is sent to both primary and secondary endpoints simultaneously +3. **Stream Response**: Response from the designated primary is streamed back to client immediately +4. **Background Comparison**: Secondary response is collected and compared in the background +5. **Logging**: Differences in latency, size, or content are logged for monitoring + +## Dynamic Role Switching + +You can switch between primary and secondary endpoints at runtime without restarting the proxy by sending a `SIGUSR1` signal: + +### Local Process +```bash +# Find the process ID +ps aux | grep "node index.js" + +# Send SIGUSR1 signal +kill -USR1 +``` + +### Docker Container +```bash +# Send SIGUSR1 to the container +docker kill -s USR1 eth-rpc-proxy + +# Or if using docker-compose +docker-compose kill -s USR1 eth-rpc-proxy +``` + +When the signal is received, the proxy will: +- Swap the streaming and comparison endpoints +- Log the role switch with the new configuration +- Continue processing requests with the new roles immediately + +The health endpoint will reflect the current active endpoints: +```json +{ + "status": "ok", + "primaryEndpoint": "http://localhost:8545", + "secondaryEndpoint": "http://localhost:8546", + "primaryRole": "secondary", + "currentStreamEndpoint": "http://localhost:8546", + "currentCompareEndpoint": "http://localhost:8545" +} +``` + +## DNS Resolution for Docker Environments + +The proxy automatically handles DNS resolution to ensure it always connects to the correct container IPs, even when containers restart and receive new IP addresses. This is especially important in Docker environments. + +### How it works: + +1. **Fresh DNS Lookups**: The proxy creates new HTTP clients for each request, ensuring fresh DNS resolution +2. **Periodic Cache Clearing**: DNS cache is cleared at the configured interval (default: 1 second) +3. **Connection Pooling**: Despite fresh DNS lookups, connections are pooled for performance +4. **Automatic Recovery**: When a container restarts with a new IP, the proxy automatically discovers it + +### Configuration: + +Set the `DNS_REFRESH_INTERVAL` environment variable to control how often the DNS cache is refreshed: + +```bash +# Refresh DNS every second (default) +DNS_REFRESH_INTERVAL=1000 + +# Refresh DNS every 5 seconds +DNS_REFRESH_INTERVAL=5000 +``` + +This ensures the proxy remains resilient in dynamic container environments where services may be redeployed or scaled. + +## Monitoring + +The proxy logs structured JSON output that includes: + +- Request IDs for tracing +- Method names +- Latency measurements +- Response sizes +- Mismatch details + +Example log output: +```json +{ + "level": "info", + "time": "2024-01-01T12:00:00.000Z", + "requestId": "a1b2c3d4e5f6", + "method": "eth_getBlockByNumber", + "endpoint": "stream", + "latencyMs": 45, + "msg": "Stream response started" +} +``` + +## Performance Considerations + +- The proxy streams responses to minimize latency impact +- Comparison happens asynchronously without blocking the response +- Large responses are handled efficiently with streaming +- Connection pooling is used for upstream requests + +## Docker Compose Example + +```yaml +version: '3.8' + +services: + eth-proxy: + build: . + ports: + - "8545:8545" + environment: + PRIMARY_RPC: http://geth:8545 + SECONDARY_RPC: http://besu:8545 + LATENCY_THRESHOLD_MS: 500 + LOG_LEVEL: info + restart: unless-stopped +``` + +## Troubleshooting + +### Common Issues + +1. **Connection Refused**: Ensure upstream endpoints are accessible +2. **Timeout Errors**: Increase `REQUEST_TIMEOUT` for slow endpoints +3. **Memory Issues**: The proxy buffers secondary responses - ensure adequate memory for large responses + +### Debug Mode + +Enable debug logging for more detailed information: +```bash +LOG_LEVEL=debug npm start +``` + +## License + +MIT \ No newline at end of file diff --git a/split-proxy/config.js b/split-proxy/config.js new file mode 100644 index 00000000..aa1b6145 --- /dev/null +++ b/split-proxy/config.js @@ -0,0 +1,49 @@ +const dotenv = require('dotenv'); +const path = require('path'); + +// Load environment variables from .env file +dotenv.config({ path: path.join(__dirname, '.env') }); + +const config = { + // RPC endpoints + primaryRpc: process.env.PRIMARY_RPC || 'http://localhost:8545', + secondaryRpc: process.env.SECONDARY_RPC || 'http://localhost:8546', + + // Role configuration + primaryRole: process.env.PRIMARY_ROLE || 'primary', // 'primary' or 'secondary' + + // Thresholds + latencyThresholdMs: parseInt(process.env.LATENCY_THRESHOLD_MS || '1000', 10), + sizeDiffThreshold: parseInt(process.env.SIZE_DIFF_THRESHOLD || '100', 10), + + // Logging + logMismatches: process.env.LOG_MISMATCHES !== 'false', // default true + + // Server + port: parseInt(process.env.PORT || '8545', 10), + + // Request timeout + requestTimeout: parseInt(process.env.REQUEST_TIMEOUT || '30000', 10), + + // DNS refresh interval in milliseconds + dnsRefreshInterval: parseInt(process.env.DNS_REFRESH_INTERVAL || '1000', 10), +}; + +// Validate configuration +function validateConfig() { + if (!config.primaryRpc || !config.secondaryRpc) { + throw new Error('PRIMARY_RPC and SECONDARY_RPC must be configured'); + } + + if (!['primary', 'secondary'].includes(config.primaryRole)) { + throw new Error('PRIMARY_ROLE must be either "primary" or "secondary"'); + } + + if (config.primaryRpc === config.secondaryRpc) { + console.warn('WARNING: PRIMARY_RPC and SECONDARY_RPC are the same'); + } +} + +validateConfig(); + +module.exports = config; \ No newline at end of file diff --git a/split-proxy/docker-compose.yml b/split-proxy/docker-compose.yml new file mode 100644 index 00000000..92c13abe --- /dev/null +++ b/split-proxy/docker-compose.yml @@ -0,0 +1,31 @@ +version: '3.8' + +services: + eth-rpc-proxy: + build: . + container_name: eth-rpc-proxy + ports: + - "8545:8545" + environment: + # Replace these with your actual RPC endpoints + PRIMARY_RPC: ${PRIMARY_RPC:-http://primary-node:8545} + SECONDARY_RPC: ${SECONDARY_RPC:-http://secondary-node:8545} + PRIMARY_ROLE: ${PRIMARY_ROLE:-primary} + LATENCY_THRESHOLD_MS: ${LATENCY_THRESHOLD_MS:-1000} + SIZE_DIFF_THRESHOLD: ${SIZE_DIFF_THRESHOLD:-100} + LOG_MISMATCHES: ${LOG_MISMATCHES:-true} + LOG_LEVEL: ${LOG_LEVEL:-info} + NODE_ENV: production + restart: unless-stopped + networks: + - eth-network + healthcheck: + test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://localhost:8545/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 10s + +networks: + eth-network: + driver: bridge \ No newline at end of file diff --git a/split-proxy/example.env b/split-proxy/example.env new file mode 100644 index 00000000..d8daea37 --- /dev/null +++ b/split-proxy/example.env @@ -0,0 +1,31 @@ +# Primary and Secondary RPC endpoints +PRIMARY_RPC=http://primary-endpoint:8545 +SECONDARY_RPC=http://secondary-endpoint:8545 + +# Which endpoint to use as primary (stream response from this one) +# Options: 'primary' or 'secondary' +PRIMARY_ROLE=primary + +# Latency threshold in milliseconds +# Log warning if secondary is slower than primary by this amount +LATENCY_THRESHOLD_MS=1000 + +# Size difference threshold in bytes +# Log full response if size differs by more than this +SIZE_DIFF_THRESHOLD=100 + +# Whether to log mismatches +LOG_MISMATCHES=true + +# Server port +PORT=8545 + +# Request timeout in milliseconds +REQUEST_TIMEOUT=30000 + +# Log level (debug, info, warn, error) +LOG_LEVEL=info + +# DNS refresh interval in milliseconds +# Important for Docker environments where container IPs can change +DNS_REFRESH_INTERVAL=1000 \ No newline at end of file diff --git a/split-proxy/index.js b/split-proxy/index.js new file mode 100644 index 00000000..ea63e64f --- /dev/null +++ b/split-proxy/index.js @@ -0,0 +1,101 @@ +const express = require('express'); +const config = require('./config'); +const logger = require('./logger'); +const RPCProxy = require('./proxy'); + +const app = express(); +const proxy = new RPCProxy(); + +// Middleware to parse JSON bodies +app.use(express.json({ + limit: '50mb', + type: 'application/json' +})); + +// Health check endpoint +app.get('/health', (req, res) => { + res.json({ + status: 'ok', + primaryEndpoint: config.primaryRpc, + secondaryEndpoint: config.secondaryRpc, + primaryRole: config.primaryRole, + currentStreamEndpoint: proxy.streamEndpoint, + currentCompareEndpoint: proxy.compareEndpoint, + }); +}); + +// Main JSON-RPC endpoint +app.post('/', async (req, res) => { + await proxy.handleRequest(req, res); +}); + +// Error handling middleware +app.use((err, req, res, next) => { + logger.error({ + error: err.message, + stack: err.stack, + }, 'Unhandled error'); + + if (!res.headersSent) { + res.status(500).json({ + jsonrpc: '2.0', + error: { + code: -32603, + message: 'Internal error', + }, + id: req.body?.id, + }); + } +}); + +// Start server +const server = app.listen(config.port, () => { + logger.info({ + port: config.port, + primaryRpc: config.primaryRpc, + secondaryRpc: config.secondaryRpc, + primaryRole: config.primaryRole, + latencyThreshold: config.latencyThresholdMs, + sizeDiffThreshold: config.sizeDiffThreshold, + }, 'ETH JSON-RPC proxy started'); +}); + +// Graceful shutdown +process.on('SIGTERM', () => { + logger.info('SIGTERM received, shutting down gracefully'); + server.close(() => { + logger.info('Server closed'); + process.exit(0); + }); +}); + +process.on('SIGINT', () => { + logger.info('SIGINT received, shutting down gracefully'); + server.close(() => { + logger.info('Server closed'); + process.exit(0); + }); +}); + +// Dynamic role switching with SIGUSR1 +process.on('SIGUSR1', () => { + logger.info('SIGUSR1 received, switching primary/secondary roles'); + try { + const newConfig = proxy.switchRoles(); + logger.info({ + ...newConfig, + signal: 'SIGUSR1', + }, 'Successfully switched roles'); + } catch (error) { + logger.error({ + error: error.message, + signal: 'SIGUSR1', + }, 'Failed to switch roles'); + } +}); + +// Log instructions for role switching on startup +logger.info('To switch primary/secondary roles at runtime, send SIGUSR1 signal:'); +logger.info(' kill -USR1 OR docker kill -s USR1 '); + +module.exports = app; diff --git a/split-proxy/logger.js b/split-proxy/logger.js new file mode 100644 index 00000000..09a9d4c6 --- /dev/null +++ b/split-proxy/logger.js @@ -0,0 +1,15 @@ +const pino = require('pino'); + +const logger = pino({ + level: process.env.LOG_LEVEL || 'info', + transport: process.env.NODE_ENV === 'production' ? undefined : { + target: 'pino-pretty', + options: { + colorize: true, + translateTime: 'SYS:standard', + ignore: 'pid,hostname' + } + } +}); + +module.exports = logger; \ No newline at end of file diff --git a/split-proxy/package.json b/split-proxy/package.json new file mode 100644 index 00000000..580af814 --- /dev/null +++ b/split-proxy/package.json @@ -0,0 +1,23 @@ +{ + "name": "eth-rpc-proxy", + "version": "1.0.0", + "description": "ETH JSON-RPC Proxy with dual upstream comparison", + "main": "index.js", + "scripts": { + "start": "node index.js", + "dev": "node --watch index.js" + }, + "keywords": ["ethereum", "json-rpc", "proxy"], + "author": "", + "license": "MIT", + "dependencies": { + "axios": "^1.6.0", + "dotenv": "^16.3.1", + "express": "^4.18.2", + "pino": "^8.16.0", + "pino-pretty": "^10.2.3" + }, + "engines": { + "node": ">=20.0.0" + } +} \ No newline at end of file diff --git a/split-proxy/proxy.js b/split-proxy/proxy.js new file mode 100644 index 00000000..0767ab61 --- /dev/null +++ b/split-proxy/proxy.js @@ -0,0 +1,380 @@ +const axios = require('axios'); +const config = require('./config'); +const logger = require('./logger'); +const crypto = require('crypto'); +const http = require('http'); +const https = require('https'); + +// Create HTTP agents with DNS caching disabled and connection pooling +const httpAgent = new http.Agent({ + keepAlive: true, + keepAliveMsecs: 1000, + maxSockets: 100, + maxFreeSockets: 10, + timeout: config.requestTimeout, + // Force fresh DNS lookups + lookup: (hostname, options, callback) => { + // This forces Node.js to use fresh DNS resolution + require('dns').lookup(hostname, options, callback); + } +}); + +const httpsAgent = new https.Agent({ + keepAlive: true, + keepAliveMsecs: 1000, + maxSockets: 100, + maxFreeSockets: 10, + timeout: config.requestTimeout, + // Force fresh DNS lookups + lookup: (hostname, options, callback) => { + // This forces Node.js to use fresh DNS resolution + require('dns').lookup(hostname, options, callback); + } +}); + +class RPCProxy { + constructor() { + // Store endpoint URLs instead of clients + this.primaryEndpoint = config.primaryRpc; + this.secondaryEndpoint = config.secondaryRpc; + + // Determine which endpoint to stream and which to compare + if (config.primaryRole === 'primary') { + this.streamEndpoint = this.primaryEndpoint; + this.compareEndpoint = this.secondaryEndpoint; + } else { + this.streamEndpoint = this.secondaryEndpoint; + this.compareEndpoint = this.primaryEndpoint; + } + + // Start DNS refresh timer + this.startDnsRefreshTimer(); + } + + // Create a fresh axios instance for each request to ensure DNS resolution + createClient(baseURL) { + const isHttps = baseURL.startsWith('https://'); + return axios.create({ + baseURL, + timeout: config.requestTimeout, + maxContentLength: Infinity, + maxBodyLength: Infinity, + httpAgent: isHttps ? undefined : httpAgent, + httpsAgent: isHttps ? httpsAgent : undefined, + // Disable axios's built-in DNS caching + transformRequest: [ + (data, headers) => { + // Add timestamp to force fresh connections periodically + headers['X-Request-Time'] = Date.now().toString(); + return data; + }, + ...axios.defaults.transformRequest + ] + }); + } + + startDnsRefreshTimer() { + // Periodically clear the DNS cache by recreating the agents + setInterval(() => { + logger.debug('Refreshing DNS cache'); + + // Clear any cached DNS entries in the HTTP agents + if (httpAgent.sockets) { + Object.keys(httpAgent.sockets).forEach(name => { + httpAgent.sockets[name].forEach(socket => { + socket.destroy(); + }); + }); + } + + if (httpsAgent.sockets) { + Object.keys(httpsAgent.sockets).forEach(name => { + httpsAgent.sockets[name].forEach(socket => { + socket.destroy(); + }); + }); + } + }, config.dnsRefreshInterval); + } + + generateRequestId() { + return crypto.randomBytes(16).toString('hex'); + } + + switchRoles() { + // Swap the endpoints + const tempEndpoint = this.streamEndpoint; + + this.streamEndpoint = this.compareEndpoint; + this.compareEndpoint = tempEndpoint; + + // Update the primaryRole in config + config.primaryRole = config.primaryRole === 'primary' ? 'secondary' : 'primary'; + + logger.info({ + newStreamEndpoint: this.streamEndpoint, + newCompareEndpoint: this.compareEndpoint, + newPrimaryRole: config.primaryRole, + }, 'Switched primary/secondary roles'); + + return { + streamEndpoint: this.streamEndpoint, + compareEndpoint: this.compareEndpoint, + primaryRole: config.primaryRole, + }; + } + + async handleRequest(req, res) { + const requestId = this.generateRequestId(); + const startTime = Date.now(); + const requestBody = req.body; + + logger.info({ + requestId, + method: requestBody.method, + params: requestBody.params, + endpoint: 'incoming', + }, 'Received JSON-RPC request'); + + try { + // Start both requests in parallel + const streamPromise = this.streamResponse(requestId, requestBody, res, startTime); + const comparePromise = this.compareResponse(requestId, requestBody, startTime); + + // Wait for the stream to complete and get response info + const streamInfo = await streamPromise; + + // Get comparison response + comparePromise.then(compareInfo => { + if (compareInfo && streamInfo) { + this.compareResponses(requestId, streamInfo, compareInfo, requestBody); + } + }).catch(err => { + logger.error({ + requestId, + error: err.message, + endpoint: 'compare', + }, 'Error in comparison request'); + }); + + } catch (error) { + logger.error({ + requestId, + error: error.message, + stack: error.stack, + }, 'Error handling request'); + + if (!res.headersSent) { + res.status(500).json({ + jsonrpc: '2.0', + error: { + code: -32603, + message: 'Internal error', + }, + id: requestBody.id, + }); + } + } + } + + async streamResponse(requestId, requestBody, res, startTime) { + let responseData = ''; + let statusCode = 0; + + try { + // Create fresh client for this request + const client = this.createClient(this.streamEndpoint); + const response = await client.post('/', requestBody, { + responseType: 'stream', + headers: { + 'Content-Type': 'application/json', + }, + }); + + statusCode = response.status; + const streamLatency = Date.now() - startTime; + + logger.info({ + requestId, + endpoint: 'stream', + latencyMs: streamLatency, + statusCode: response.status, + }, 'Stream response started'); + + // Set response headers + res.status(response.status); + Object.entries(response.headers).forEach(([key, value]) => { + if (key.toLowerCase() !== 'content-encoding') { + res.setHeader(key, value); + } + }); + + // Capture and stream the response + response.data.on('data', (chunk) => { + responseData += chunk.toString(); + res.write(chunk); + }); + + return new Promise((resolve, reject) => { + response.data.on('end', () => { + res.end(); + const totalTime = Date.now() - startTime; + + logger.info({ + requestId, + endpoint: 'stream', + totalTimeMs: totalTime, + responseSize: responseData.length, + }, 'Stream response completed'); + + resolve({ + statusCode, + data: responseData, + size: responseData.length, + latency: totalTime, + }); + }); + + response.data.on('error', (error) => { + logger.error({ + requestId, + endpoint: 'stream', + error: error.message, + }, 'Stream error'); + reject(error); + }); + }); + + } catch (error) { + const streamLatency = Date.now() - startTime; + + logger.error({ + requestId, + endpoint: 'stream', + latencyMs: streamLatency, + error: error.message, + statusCode: error.response?.status, + }, 'Stream request failed'); + + throw error; + } + } + + async compareResponse(requestId, requestBody, startTime) { + try { + const compareStart = Date.now(); + // Create fresh client for this request + const client = this.createClient(this.compareEndpoint); + const response = await client.post('/', requestBody, { + headers: { + 'Content-Type': 'application/json', + }, + }); + + const compareLatency = Date.now() - compareStart; + const compareData = typeof response.data === 'string' + ? response.data + : JSON.stringify(response.data); + const compareSize = compareData.length; + + logger.info({ + requestId, + endpoint: 'compare', + latencyMs: compareLatency, + statusCode: response.status, + responseSize: compareSize, + }, 'Compare response received'); + + return { + statusCode: response.status, + data: compareData, + size: compareSize, + latency: compareLatency, + }; + + } catch (error) { + logger.error({ + requestId, + endpoint: 'compare', + error: error.message, + statusCode: error.response?.status, + }, 'Compare request failed'); + + return { + error: error.message, + statusCode: error.response?.status || 0, + }; + } + } + + compareResponses(requestId, streamResponse, compareResponse, requestBody) { + if (!config.logMismatches) return; + + const mismatches = []; + + // Check status code mismatch + if (streamResponse.statusCode !== compareResponse.statusCode) { + mismatches.push({ + type: 'status_code', + stream: streamResponse.statusCode, + compare: compareResponse.statusCode, + }); + } + + // Check size difference + const sizeDiff = Math.abs(streamResponse.size - compareResponse.size); + if (sizeDiff > config.sizeDiffThreshold) { + mismatches.push({ + type: 'size', + streamSize: streamResponse.size, + compareSize: compareResponse.size, + difference: sizeDiff, + }); + } + + // Check latency difference + const latencyDiff = compareResponse.latency - streamResponse.latency; + if (latencyDiff > config.latencyThresholdMs) { + mismatches.push({ + type: 'latency', + streamLatency: streamResponse.latency, + compareLatency: compareResponse.latency, + difference: latencyDiff, + }); + } + + // Log mismatches if any found + if (mismatches.length > 0) { + const logEntry = { + requestId, + method: requestBody.method, + mismatches, + streamEndpoint: this.streamEndpoint, + compareEndpoint: this.compareEndpoint, + }; + + // Include full compare response if size differs significantly + if (sizeDiff > config.sizeDiffThreshold) { + try { + logEntry.compareResponseData = JSON.parse(compareResponse.data); + logEntry.streamResponseData = JSON.parse(streamResponse.data); + } catch (e) { + // If not valid JSON, include raw data + logEntry.compareResponseData = compareResponse.data; + logEntry.streamResponseData = streamResponse.data; + } + } + + logger.warn(logEntry, 'Response mismatch detected'); + } else { + logger.debug({ + requestId, + method: requestBody.method, + streamLatency: streamResponse.latency, + compareLatency: compareResponse.latency, + }, 'Responses match'); + } + } +} + +module.exports = RPCProxy; \ No newline at end of file