From d03c00f9a9dc500aca64b451cee8aff30b738ea0 Mon Sep 17 00:00:00 2001
From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com>
Date: Thu, 29 May 2025 13:57:43 +0000
Subject: [PATCH] Jules was unable to complete the task in time. Please review
 the work done so far and provide feedback for Jules to continue.

---
 benchmark_proxy_rust/Cargo.toml               |  22 +
 .../src/block_height_tracker.rs               | 343 ++++++++++++++++
 benchmark_proxy_rust/src/config.rs            | 169 ++++++++
 benchmark_proxy_rust/src/main.rs              |  12 +
 benchmark_proxy_rust/src/request_handler.rs   | 272 +++++++++++++
 benchmark_proxy_rust/src/rpc_utils.rs         |  92 +++++
 benchmark_proxy_rust/src/secondary_probe.rs   | 383 ++++++++++++++++++
 benchmark_proxy_rust/src/stats_collector.rs   | 290 +++++++++++++
 benchmark_proxy_rust/src/structures.rs        | 107 +++++
 benchmark_proxy_rust/src/websocket_handler.rs | 228 +++++++++++
 10 files changed, 1918 insertions(+)
 create mode 100644 benchmark_proxy_rust/Cargo.toml
 create mode 100644 benchmark_proxy_rust/src/block_height_tracker.rs
 create mode 100644 benchmark_proxy_rust/src/config.rs
 create mode 100644 benchmark_proxy_rust/src/main.rs
 create mode 100644 benchmark_proxy_rust/src/request_handler.rs
 create mode 100644 benchmark_proxy_rust/src/rpc_utils.rs
 create mode 100644 benchmark_proxy_rust/src/secondary_probe.rs
 create mode 100644 benchmark_proxy_rust/src/stats_collector.rs
 create mode 100644 benchmark_proxy_rust/src/structures.rs
 create mode 100644 benchmark_proxy_rust/src/websocket_handler.rs

diff --git a/benchmark_proxy_rust/Cargo.toml b/benchmark_proxy_rust/Cargo.toml
new file mode 100644
index 00000000..da8f095b
--- /dev/null
+++ b/benchmark_proxy_rust/Cargo.toml
@@ -0,0 +1,22 @@
+[package]
+name = "benchmark_proxy_rust"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+tokio = { version = "1", features = ["full"] }
+hyper = { version = "0.14", features = ["full"] }
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+tokio-tungstenite = { version = "0.21", features = ["native-tls"] }
+log = "0.4"
+env_logger = "0.10"
+dashmap = "5.5"
+reqwest = { version = "0.11", features = ["json", "rustls-tls", "stream"], default-features = false }
+thiserror = "1.0"
+futures-util = "0.3"
+http = "0.2"
+url = "2.5"
+lazy_static = "1.4.0"
+# The dependencies below are used by the sources in this patch but were missing
+# from the manifest; the version numbers are assumptions chosen to match the
+# hyper 0.14 ecosystem and may need adjusting. reqwest's "stream" feature was
+# also enabled above because request_handler.rs calls bytes_stream().
+http-body = "0.4"
+chrono = "0.4"
+humantime-serde = "1"
+hyper-tungstenite = "0.11"
diff --git a/benchmark_proxy_rust/src/block_height_tracker.rs b/benchmark_proxy_rust/src/block_height_tracker.rs
new file mode 100644
index 00000000..07b21586
--- /dev/null
+++ b/benchmark_proxy_rust/src/block_height_tracker.rs
@@ -0,0 +1,343 @@
+use crate::{config::AppConfig, structures::Backend};
+use dashmap::DashMap;
+use futures_util::{SinkExt, StreamExt};
+use log::{debug, error, info, warn};
+use serde_json::json;
+use std::{
+    collections::HashMap,
+    sync::{Arc, Mutex},
+    time::{Duration, SystemTime},
+};
+use tokio::{sync::watch, task::JoinHandle, time::sleep};
+use tokio_tungstenite::{
+    connect_async,
+    tungstenite::{protocol::Message as TungsteniteMessage, Error as TungsteniteError},
+};
+
+const RECONNECT_DELAY: Duration = Duration::from_secs(10);
+
+#[derive(serde::Deserialize, Debug)]
+struct SubscriptionMessage {
+    #[allow(dead_code)] // May not be used if only checking method
+    jsonrpc: Option<String>,
+    method: Option<String>,
+    params: Option<SubscriptionParams>,
+    result: Option<serde_json::Value>, // For subscription ID confirmation
+    id: Option<serde_json::Value>,     // For request echo
+}
+
+#[derive(serde::Deserialize, Debug)]
+struct SubscriptionParams {
+    subscription: String,
+    result: HeaderData,
+}
+
+#[derive(serde::Deserialize, Debug)]
+struct HeaderData {
+    number: String, // Hex string like "0x123"
+    // Add other fields like "hash" if ever needed for more advanced logic
+}
+
+pub struct BlockHeightTracker {
+    config: Arc<AppConfig>,
+    backends: Vec<Backend>,
+    block_heights: Arc<DashMap<String, u64>>,
+    last_update_times: Arc<DashMap<String, SystemTime>>,
+    shutdown_tx: watch::Sender<bool>,
+    tasks: Arc<Mutex<Vec<JoinHandle<()>>>>,
+    enable_detailed_logs: bool,
+}
+
+impl BlockHeightTracker {
+    pub fn new(
+        config: Arc<AppConfig>,
+        all_backends: &[Backend],
+    ) -> Option<Arc<Self>> {
+        if !config.enable_block_height_tracking {
+            info!("BlockHeightTracker disabled by configuration.");
+            return None;
+        }
+
+        info!("Initializing BlockHeightTracker for {} backends.", all_backends.len());
+        let (shutdown_tx, _shutdown_rx) = watch::channel(false); // _shutdown_rx cloned by tasks
+
+        Some(Arc::new(Self {
+            config: config.clone(),
+            backends: all_backends.to_vec(), // Clones the slice into a Vec
+            block_heights: Arc::new(DashMap::new()),
+            last_update_times: Arc::new(DashMap::new()),
+            shutdown_tx,
+            tasks: Arc::new(Mutex::new(Vec::new())),
+            enable_detailed_logs: config.enable_detailed_logs,
+        }))
+    }
+
+    pub fn start_monitoring(self: Arc<Self>) {
+        if self.backends.is_empty() {
+            info!("BHT: No backends configured for monitoring.");
+            return;
+        }
+        info!("BHT: Starting block height monitoring for {} backends.", self.backends.len());
+        let mut tasks_guard = self.tasks.lock().unwrap();
+        for backend in self.backends.clone() {
+            // Only monitor if backend has a URL, primarily for non-primary roles or specific needs.
+            // For this implementation, we assume all backends in the list are candidates.
+            let task_self = self.clone();
+            let task_backend = backend.clone(); // Clone backend for the task
+            let task_shutdown_rx = self.shutdown_tx.subscribe();
+
+            let task = tokio::spawn(async move {
+                task_self
+                    .monitor_backend_connection(task_backend, task_shutdown_rx)
+                    .await;
+            });
+            tasks_guard.push(task);
+        }
+    }
+
+    async fn monitor_backend_connection(
+        self: Arc<Self>,
+        backend: Backend,
+        mut shutdown_rx: watch::Receiver<bool>,
+    ) {
+        info!("BHT: Starting monitoring for backend: {}", backend.name);
+        loop { // Outer reconnect loop
+            tokio::select! {
+                biased;
+                _ = shutdown_rx.changed() => {
+                    if *shutdown_rx.borrow() {
+                        info!("BHT: Shutdown signal received for {}, terminating monitoring.", backend.name);
+                        break; // Break outer reconnect loop
+                    }
+                }
+                _ = tokio::time::sleep(Duration::from_millis(10)) => {
+                    // Give the shutdown signal a chance before attempting connection
+                }
+            }
+            if *shutdown_rx.borrow() { break; }
+
+            let mut ws_url = backend.url.clone();
+            let scheme = if backend.url.scheme() == "https" { "wss" } else { "ws" };
+            if let Err(_e) = ws_url.set_scheme(scheme) {
+                error!("BHT: Failed to set scheme to {} for backend {}: {}", scheme, backend.name, backend.url);
+                sleep(RECONNECT_DELAY).await;
+                continue;
+            }
+
+            if self.enable_detailed_logs {
+                debug!("BHT: Attempting to connect to {} for backend {}", ws_url, backend.name);
+            }
+
+            match connect_async(ws_url.as_str()).await {
+                Ok((ws_stream, _response)) => {
+                    if self.enable_detailed_logs {
+                        info!("BHT: Successfully connected to WebSocket for backend: {}", backend.name);
+                    }
+                    let (mut write, mut read) = ws_stream.split();
+
+                    let subscribe_payload = json!({
+                        "jsonrpc": "2.0",
+                        "method": "eth_subscribe",
+                        "params": ["newHeads"],
+                        "id": 1 // Static ID for this subscription
+                    });
+
+                    if let Err(e) = write.send(TungsteniteMessage::Text(subscribe_payload.to_string())).await {
+                        error!("BHT: Failed to send eth_subscribe to {}: {}. Retrying connection.", backend.name, e);
+                        // Connection will be retried by the outer loop after delay
+                        sleep(RECONNECT_DELAY).await;
+                        continue;
+                    }
+                    if self.enable_detailed_logs {
+                        debug!("BHT: Sent eth_subscribe payload to {}", backend.name);
+                    }
+
+                    // Inner message reading loop
+                    loop {
+                        tokio::select! {
+                            biased;
+                            _ = shutdown_rx.changed() => {
+                                if *shutdown_rx.borrow() {
+                                    info!("BHT: Shutdown signal for {}, closing WebSocket and stopping.", backend.name);
+                                    // Attempt to close the WebSocket gracefully
+                                    let _ = write.send(TungsteniteMessage::Close(None)).await;
+                                    break; // Break inner message reading loop
+                                }
+                            }
+                            maybe_message = read.next() => {
+                                match maybe_message {
+                                    Some(Ok(message)) => {
+                                        match message {
+                                            TungsteniteMessage::Text(text_msg) => {
+                                                if self.enable_detailed_logs {
+                                                    debug!("BHT: Received text from {}: {}", backend.name, text_msg);
+                                                }
+                                                match serde_json::from_str::<SubscriptionMessage>(&text_msg) {
+                                                    Ok(parsed_msg) => {
+                                                        if parsed_msg.method.as_deref() == Some("eth_subscription") {
+                                                            if let Some(params) = parsed_msg.params {
+                                                                let block_num_str = params.result.number;
+                                                                match u64::from_str_radix(block_num_str.trim_start_matches("0x"), 16) {
+                                                                    Ok(block_num) => {
+                                                                        self.block_heights.insert(backend.name.clone(), block_num);
+                                                                        self.last_update_times.insert(backend.name.clone(), SystemTime::now());
+                                                                        if self.enable_detailed_logs {
+                                                                            debug!("BHT: Updated block height for {}: {} (raw: {})", backend.name, block_num, block_num_str);
+                                                                        }
+                                                                    }
+                                                                    Err(e) => error!("BHT: Failed to parse block number hex '{}' for {}: {}", block_num_str, backend.name, e),
+                                                                }
+                                                            }
+                                                        } else if parsed_msg.id == Some(json!(1)) && parsed_msg.result.is_some() {
+                                                            if self.enable_detailed_logs {
+                                                                info!("BHT: Received subscription confirmation from {}: {:?}", backend.name, parsed_msg.result);
+                                                            }
+                                                        } else if self.enable_detailed_logs {
+                                                            debug!("BHT: Received other JSON message from {}: {:?}", backend.name, parsed_msg);
+                                                        }
+                                                    }
+                                                    Err(e) => {
+                                                        if self.enable_detailed_logs {
+                                                            warn!("BHT: Failed to parse JSON from {}: {}. Message: {}", backend.name, e, text_msg);
+                                                        }
+                                                    }
+                                                }
+                                            }
+                                            TungsteniteMessage::Binary(bin_msg) => {
+                                                if self.enable_detailed_logs {
+                                                    debug!("BHT: Received binary message from {} ({} bytes), ignoring.", backend.name, bin_msg.len());
+                                                }
+                                            }
+                                            TungsteniteMessage::Ping(_ping_data) => {
+                                                if self.enable_detailed_logs { debug!("BHT: Received Ping from {}.", backend.name); }
+                                                // tokio-tungstenite queues a Pong reply automatically while the
+                                                // stream is being read and flushed, so no manual Pong is sent here.
+                                            }
+                                            TungsteniteMessage::Pong(_) => { /* Usually no action needed */ }
+                                            TungsteniteMessage::Close(_) => {
+                                                if self.enable_detailed_logs { info!("BHT: WebSocket closed by server for {}.", backend.name); }
+                                                break; // Break inner loop
+                                            }
+                                            TungsteniteMessage::Frame(_) => { /* Raw frame, usually not handled directly */ }
+                                        }
+                                    }
+                                    Some(Err(e)) => {
+                                        match e {
+                                            TungsteniteError::ConnectionClosed | TungsteniteError::AlreadyClosed => {
+                                                if self.enable_detailed_logs { info!("BHT: WebSocket connection closed for {}.", backend.name); }
+                                            }
+                                            _ => {
+                                                error!("BHT: Error reading from WebSocket for {}: {:?}. Attempting reconnect.", backend.name, e);
+                                            }
+                                        }
+                                        break; // Break inner loop, will trigger reconnect
+                                    }
+                                    None => {
+                                        if self.enable_detailed_logs { info!("BHT: WebSocket stream ended for {}. Attempting reconnect.", backend.name); }
+                                        break; // Break inner loop, will trigger reconnect
+                                    }
+                                }
+                            }
+                        } // End of inner select
+                        if *shutdown_rx.borrow() { break; } // Ensure inner loop breaks if shutdown occurred
+                    } // End of inner message reading loop
+                }
+                Err(e) => {
+                    warn!("BHT: Failed to connect to WebSocket for backend {}: {:?}. Retrying after delay.", backend.name, e);
+                }
+            }
+            // If we are here, the connection was dropped or failed. Wait before retrying.
+            if !*shutdown_rx.borrow() { // Don't sleep if shutting down
+                sleep(RECONNECT_DELAY).await;
+            }
+        } // End of outer reconnect loop
+        info!("BHT: Stopped monitoring backend {}.", backend.name);
+    }
+
+    pub fn is_secondary_behind(&self, secondary_name: &str) -> bool {
+        if !self.config.enable_block_height_tracking { return false; } // If tracking is off, assume not behind
+
+        let primary_info = self.backends.iter().find(|b| b.role == "primary");
+        let primary_name = match primary_info {
+            Some(b) => b.name.clone(),
+            None => {
+                if self.enable_detailed_logs {
+                    warn!("BHT: No primary backend configured for is_secondary_behind check.");
+                }
+                return false;
+            }
+        };
+
+        let primary_height_opt = self.block_heights.get(&primary_name).map(|h_ref| *h_ref.value());
+
+        let primary_height = match primary_height_opt {
+            Some(h) => h,
+            None => {
+                if self.enable_detailed_logs {
+                    debug!("BHT: Primary '{}' height unknown for is_secondary_behind check with {}.", primary_name, secondary_name);
+                }
+                return false; // Primary height unknown, can't reliably determine if secondary is behind
+            }
+        };
+
+        let secondary_height_opt = self.block_heights.get(secondary_name).map(|h_ref| *h_ref.value());
+
+        match secondary_height_opt {
+            Some(secondary_height_val) => {
+                if primary_height > secondary_height_val {
+                    let diff = primary_height - secondary_height_val;
+                    let is_behind = diff > self.config.max_blocks_behind;
+                    if self.enable_detailed_logs && is_behind {
+                        debug!("BHT: Secondary '{}' (height {}) is behind primary '{}' (height {}). Diff: {}, Max allowed: {}",
+                            secondary_name, secondary_height_val, primary_name, primary_height, diff, self.config.max_blocks_behind);
+                    }
+                    return is_behind;
+                }
+                false // Secondary is not behind or is ahead
+            }
+            None => {
+                if self.enable_detailed_logs {
+                    debug!("BHT: Secondary '{}' height unknown, considering it behind primary '{}' (height {}).", secondary_name, primary_name, primary_height);
+                }
+                true // Secondary height unknown, assume it's behind if primary height is known
+            }
+        }
+    }
+
+    pub fn get_block_height_status(&self) -> HashMap<String, u64> {
+        self.block_heights
+            .iter()
+            .map(|entry| (entry.key().clone(), *entry.value()))
+            .collect()
+    }
+
+    pub async fn stop(&self) {
+        info!("BHT: Sending shutdown signal to all monitoring tasks...");
+        if self.shutdown_tx.send(true).is_err() {
+            error!("BHT: Failed to send shutdown signal. Tasks might not terminate gracefully.");
+        }
+
+        // Drain under the lock, then release it before awaiting, so the
+        // mutex guard is never held across an await point.
+        let tasks: Vec<JoinHandle<()>> = {
+            let mut tasks_guard = self.tasks.lock().unwrap();
+            info!("BHT: Awaiting termination of {} monitoring tasks...", tasks_guard.len());
+            tasks_guard.drain(..).collect()
+        };
+        for task in tasks {
+            if let Err(e) = task.await {
+                error!("BHT: Error awaiting task termination: {:?}", e);
+            }
+        }
+        info!("BHT: All monitoring tasks terminated.");
+    }
+}
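+
+// A minimal sketch of how the subscription parsing above behaves. The sample
+// JSON mirrors the shape of a typical `newHeads` notification but is an
+// assumed example, not a captured fixture.
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn parses_new_heads_notification() {
+        let raw = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"subscription":"0xabc","result":{"number":"0x1b4"}}}"#;
+        let msg: SubscriptionMessage = serde_json::from_str(raw).expect("valid notification");
+        assert_eq!(msg.method.as_deref(), Some("eth_subscription"));
+        let number = msg.params.expect("params present").result.number;
+        // Same hex decoding as monitor_backend_connection: strip "0x", parse base 16
+        let height = u64::from_str_radix(number.trim_start_matches("0x"), 16).unwrap();
+        assert_eq!(height, 436);
+    }
+}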
Using default: {:?}", + key, val_str, e, default_value + ); + default_value + } + }, + Err(_) => default_value, + } +} + +// Helper function for boolean environment variables +fn get_env_var_bool(key: &str, default_value: bool) -> bool { + match env::var(key) { + Ok(val_str) => val_str.to_lowercase() == "true", + Err(_) => default_value, + } +} + +// Helper function for Vec from comma-separated string +fn get_env_var_vec_string(key: &str, default_value: Vec) -> Vec { + match env::var(key) { + Ok(val_str) => { + if val_str.is_empty() { + default_value + } else { + val_str.split(',').map(|s| s.trim().to_string()).collect() + } + } + Err(_) => default_value, + } +} + +// Helper function for Vec from comma-separated string +fn get_env_var_vec_url(key: &str, default_value: Vec) -> Result, ConfigError> { + match env::var(key) { + Ok(val_str) => { + if val_str.is_empty() { + return Ok(default_value); + } + val_str + .split(',') + .map(|s| s.trim()) + .filter(|s| !s.is_empty()) + .map(|url_str| { + Url::parse(url_str).map_err(|e| ConfigError::UrlParseError { + var_name: key.to_string(), + url_str: url_str.to_string(), + source: e, + }) + }) + .collect() + } + Err(_) => Ok(default_value), + } +} + + +pub fn load_from_env() -> Result { + info!("Loading configuration from environment variables..."); + + let primary_backend_url_str = env::var("PRIMARY_BACKEND_URL").map_err(|_| { + ConfigError::MissingVariable { + var_name: "PRIMARY_BACKEND_URL".to_string(), + } + })?; + let primary_backend_url = + Url::parse(&primary_backend_url_str).map_err(|e| ConfigError::UrlParseError { + var_name: "PRIMARY_BACKEND_URL".to_string(), + url_str: primary_backend_url_str, + source: e, + })?; + + let secondary_backend_urls = get_env_var_vec_url("SECONDARY_BACKEND_URLS", Vec::new())?; + + let config = AppConfig { + listen_addr: get_env_var("LISTEN_ADDR", "127.0.0.1:8080".to_string()), + primary_backend_url, + secondary_backend_urls, + summary_interval_secs: get_env_var("SUMMARY_INTERVAL_SECS", 60), + enable_detailed_logs: get_env_var_bool("ENABLE_DETAILED_LOGS", false), + enable_secondary_probing: get_env_var_bool("ENABLE_SECONDARY_PROBING", true), + probe_interval_secs: get_env_var("PROBE_INTERVAL_SECS", 10), + min_delay_buffer_ms: get_env_var("MIN_DELAY_BUFFER_MS", 500), + probe_methods: get_env_var_vec_string( + "PROBE_METHODS", + vec!["eth_blockNumber".to_string(), "net_version".to_string()], + ), + enable_block_height_tracking: get_env_var_bool("ENABLE_BLOCK_HEIGHT_TRACKING", true), + max_blocks_behind: get_env_var("MAX_BLOCKS_BEHIND", 5), + enable_expensive_method_routing: get_env_var_bool("ENABLE_EXPENSIVE_METHOD_ROUTING", false), + max_body_size_bytes: get_env_var("MAX_BODY_SIZE_BYTES", 10 * 1024 * 1024), // 10MB + http_client_timeout_secs: get_env_var("HTTP_CLIENT_TIMEOUT_SECS", 30), + request_context_timeout_secs: get_env_var("REQUEST_CONTEXT_TIMEOUT_SECS", 35), + }; + + info!("Configuration loaded successfully: {:?}", config); + Ok(config) +} + +impl AppConfig { + pub fn http_client_timeout(&self) -> Duration { + Duration::from_secs(self.http_client_timeout_secs) + } + + pub fn request_context_timeout(&self) -> Duration { + Duration::from_secs(self.request_context_timeout_secs) + } + + pub fn summary_interval(&self) -> Duration { + Duration::from_secs(self.summary_interval_secs) + } + + pub fn probe_interval(&self) -> Duration { + Duration::from_secs(self.probe_interval_secs) + } +} diff --git a/benchmark_proxy_rust/src/main.rs b/benchmark_proxy_rust/src/main.rs new file mode 100644 index 00000000..716f28af 
diff --git a/benchmark_proxy_rust/src/main.rs b/benchmark_proxy_rust/src/main.rs
new file mode 100644
index 00000000..716f28af
--- /dev/null
+++ b/benchmark_proxy_rust/src/main.rs
@@ -0,0 +1,12 @@
+pub mod structures;
+pub mod config;
+pub mod stats_collector;
+pub mod secondary_probe;
+pub mod block_height_tracker;
+pub mod rpc_utils;
+pub mod request_handler;
+pub mod websocket_handler;
+
+fn main() {
+    println!("Hello, world!");
+}
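+
+// NOTE (sketch): main() is still a placeholder. Under the assumption that the
+// modules above keep their current constructors, the intended wiring would be
+// roughly:
+//   1. env_logger::init() and config::load_from_env()
+//   2. build a shared reqwest::Client using config.http_client_timeout()
+//   3. construct StatsCollector, SecondaryProbe, and BlockHeightTracker,
+//      then start_periodic_probing() / start_monitoring()
+//   4. serve request_handler::handle_http_request (and the WebSocket upgrade
+//      path) via hyper on config.listen_addr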
diff --git a/benchmark_proxy_rust/src/request_handler.rs b/benchmark_proxy_rust/src/request_handler.rs
new file mode 100644
index 00000000..a95851af
--- /dev/null
+++ b/benchmark_proxy_rust/src/request_handler.rs
@@ -0,0 +1,272 @@
+use hyper::{Body, Request, Response, StatusCode};
+use http_body::Limited;
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::sync::mpsc;
+use log;
+
+use crate::config::AppConfig;
+use crate::stats_collector::StatsCollector;
+use crate::secondary_probe::SecondaryProbe;
+use crate::block_height_tracker::BlockHeightTracker;
+use crate::structures::{Backend, BatchInfo};
+use crate::rpc_utils;
+
+#[derive(Debug)]
+pub enum BackendResult {
+    Success {
+        backend_name: String,
+        response: reqwest::Response, // Send the whole reqwest::Response
+        duration: std::time::Duration,
+    },
+    Error {
+        backend_name: String,
+        error: reqwest::Error, // Send the reqwest::Error
+        duration: std::time::Duration,
+    },
+}
+
+fn calculate_secondary_delay(
+    batch_info: &BatchInfo,
+    probe: &Option<Arc<SecondaryProbe>>,
+    stats: &Arc<StatsCollector>,
+    _config: &Arc<AppConfig>, // _config might be used later for more complex logic
+) -> Duration {
+    let mut max_delay = Duration::from_millis(0);
+    let default_delay = Duration::from_millis(25); // Default from Go
+
+    if batch_info.methods.is_empty() {
+        return default_delay;
+    }
+
+    for method_name in &batch_info.methods {
+        let current_method_delay = if let Some(p) = probe {
+            p.get_delay_for_method(method_name)
+        } else {
+            // This will use the stubbed method from StatsCollector which currently returns 25ms
+            stats.get_primary_p75_for_method(method_name)
+        };
+        if current_method_delay > max_delay {
+            max_delay = current_method_delay;
+        }
+    }
+
+    if max_delay == Duration::from_millis(0) { // if all methods were unknown or had 0 delay
+        if let Some(p) = probe {
+            // Go code uses: probe.minResponseTime + probe.minDelayBuffer;
+            // get_delay_for_method("") approximates this since an unknown method
+            // falls back to min_response_time + buffer.
+            return p.get_delay_for_method("");
+        }
+        return default_delay;
+    }
+    max_delay
+}
+
+pub async fn handle_http_request(
+    req: Request<Body>,
+    config: Arc<AppConfig>,
+    stats_collector: Arc<StatsCollector>,
+    http_client: Arc<reqwest::Client>,
+    secondary_probe: Option<Arc<SecondaryProbe>>,
+    block_height_tracker: Option<Arc<BlockHeightTracker>>,
+    all_backends: Arc<Vec<Backend>>,
+) -> Result<Response<Body>, Box<dyn std::error::Error + Send + Sync>> {
+    let _overall_start_time = std::time::Instant::now(); // To be used later with request_context_timeout
+
+    // 1. Read and limit request body. http_body::Limited enforces the cap and
+    //    surfaces an http_body::LengthLimitError when it is exceeded.
+    let limited_body = Limited::new(req.into_body(), config.max_body_size_bytes);
+    let body_bytes = match hyper::body::to_bytes(limited_body).await {
+        Ok(bytes) => bytes,
+        Err(e) => {
+            log::error!("Failed to read request body or limit exceeded: {}", e);
+            let mut err_resp = Response::new(Body::from(format!("Request body error: {}", e)));
+            *err_resp.status_mut() = if e.is::<http_body::LengthLimitError>() {
+                StatusCode::PAYLOAD_TOO_LARGE
+            } else {
+                StatusCode::BAD_REQUEST
+            };
+            return Ok(err_resp);
+        }
+    };
+
+    // 2. Parse Batch Info
+    let batch_info = match rpc_utils::parse_batch_info(&body_bytes) {
+        Ok(info) => info,
+        Err(e) => {
+            log::error!("Invalid JSON-RPC request: {}", e);
+            let mut err_resp = Response::new(Body::from(format!("Invalid JSON-RPC: {}", e)));
+            *err_resp.status_mut() = StatusCode::BAD_REQUEST;
+            return Ok(err_resp);
+        }
+    };
+
+    let display_method = if batch_info.is_batch {
+        format!("batch[{}]", batch_info.request_count)
+    } else {
+        batch_info.methods.get(0).cloned().unwrap_or_else(|| "unknown".to_string())
+    };
+    log::info!("Received request: Method: {}, IsBatch: {}, NumMethods: {}", display_method, batch_info.is_batch, batch_info.methods.len());
+
+    // 3. Calculate Secondary Delay
+    let secondary_delay = calculate_secondary_delay(&batch_info, &secondary_probe, &stats_collector, &config);
+    if config.enable_detailed_logs {
+        log::debug!("Method: {}, Calculated secondary delay: {:?}", display_method, secondary_delay);
+    }
+
+    // 4. Backend Filtering & Expensive Method Routing
+    let mut target_backends: Vec<Backend> = (*all_backends).clone();
+
+    if batch_info.has_stateful {
+        log::debug!("Stateful method detected in request '{}', targeting primary only.", display_method);
+        target_backends.retain(|b| b.role == "primary");
+    } else {
+        // Filter by block height
+        if let Some(bht) = &block_height_tracker {
+            if config.enable_block_height_tracking { // Check if feature is enabled
+                target_backends.retain(|b| {
+                    if b.role != "primary" && bht.is_secondary_behind(&b.name) {
+                        if config.enable_detailed_logs { log::info!("Skipping secondary {}: behind in block height for request {}", b.name, display_method); }
+                        // TODO: Add stat for skipped due to block height
+                        false
+                    } else { true }
+                });
+            }
+        }
+        // Filter by probe availability
+        if let Some(sp) = &secondary_probe {
+            if config.enable_secondary_probing { // Check if feature is enabled
+                target_backends.retain(|b| {
+                    if b.role != "primary" && !sp.is_backend_available(&b.name) {
+                        if config.enable_detailed_logs { log::info!("Skipping secondary {}: not available via probe for request {}", b.name, display_method); }
+                        // TODO: Add stat for skipped due to probe unavailable
+                        false
+                    } else { true }
+                });
+            }
+        }
+    }
+
+    let is_req_expensive = batch_info.methods.iter().any(|m| rpc_utils::is_expensive_method(m)) ||
+        batch_info.methods.iter().any(|m| stats_collector.is_expensive_method_by_stats(m)); // Stubbed
+
+    if config.enable_expensive_method_routing && is_req_expensive && !batch_info.has_stateful {
+        log::debug!("Expensive method detected in request {}. Attempting to route to a secondary.", display_method);
+        // TODO: Complex expensive method routing logic.
+        // For now, this placeholder doesn't change target_backends.
+        // A real implementation would try to find the best secondary or stick to primary if none are suitable.
+    }
+
+    // 5. Concurrent Request Dispatch
+    let (response_tx, mut response_rx) = mpsc::channel::<BackendResult>(target_backends.len().max(1));
+    let mut dispatched_count = 0;
+
+    for backend in target_backends { // target_backends is now filtered
+        dispatched_count += 1;
+        let task_body_bytes = body_bytes.clone();
+        let task_http_client = http_client.clone();
+        let task_response_tx = response_tx.clone();
+        let task_backend_name = backend.name.clone();
+        let task_backend_url = backend.url.clone();
+        let task_backend_role = backend.role.clone();
+        let task_secondary_delay = secondary_delay;
+        let task_config_detailed_logs = config.enable_detailed_logs;
+        let task_http_timeout = config.http_client_timeout(); // Get Duration from config
+        // display_method is still needed after this loop, so each task owns a clone
+        let task_display_method = display_method.clone();
+
+        tokio::spawn(async move {
+            let backend_req_start_time = std::time::Instant::now();
+
+            if task_backend_role != "primary" {
+                if task_config_detailed_logs {
+                    log::debug!("Secondary backend {} for request {} delaying for {:?}", task_backend_name, task_display_method, task_secondary_delay);
+                }
+                tokio::time::sleep(task_secondary_delay).await;
+            }
+
+            let result = task_http_client
+                .post(task_backend_url)
+                .header("Content-Type", "application/json")
+                // TODO: Copy relevant headers from original request 'req.headers()'
+                .body(task_body_bytes)
+                .timeout(task_http_timeout)
+                .send()
+                .await;
+
+            let duration = backend_req_start_time.elapsed();
+
+            match result {
+                Ok(resp) => {
+                    if task_config_detailed_logs {
+                        log::debug!("Backend {} for request {} responded with status {}", task_backend_name, task_display_method, resp.status());
+                    }
+                    if task_response_tx.send(BackendResult::Success {
+                        backend_name: task_backend_name,
+                        response: resp,
+                        duration,
+                    }).await.is_err() {
+                        log::error!("Failed to send success to channel for request {}: receiver dropped", task_display_method);
+                    }
+                }
+                Err(err) => {
+                    if task_config_detailed_logs {
+                        log::error!("Backend {} for request {} request failed: {}", task_backend_name, task_display_method, err);
+                    }
+                    if task_response_tx.send(BackendResult::Error {
+                        backend_name: task_backend_name,
+                        error: err,
+                        duration,
+                    }).await.is_err() {
+                        log::error!("Failed to send error to channel for request {}: receiver dropped", task_display_method);
+                    }
+                }
+            }
+        });
+    }
+    drop(response_tx);
+
+    if dispatched_count == 0 {
+        log::warn!("No backends available to dispatch request for method {}", display_method);
+        // TODO: Add stat for no backend available
+        let mut err_resp = Response::new(Body::from("No available backends for this request type."));
+        *err_resp.status_mut() = StatusCode::SERVICE_UNAVAILABLE;
+        return Ok(err_resp);
+    }
+
+    // Placeholder: return the first received response
+    if let Some(first_result) = response_rx.recv().await {
+        if config.enable_detailed_logs {
+            log::info!("First backend response for request {}: {:?}", display_method, first_result);
+        }
+
+        match first_result {
+            BackendResult::Success { backend_name: _, response: reqwest_resp, duration: _ } => {
+                let mut hyper_resp_builder = Response::builder().status(reqwest_resp.status());
+                for (name, value) in reqwest_resp.headers().iter() {
+                    hyper_resp_builder = hyper_resp_builder.header(name.clone(), value.clone());
+                }
+                let hyper_resp = hyper_resp_builder
+                    .body(Body::wrap_stream(reqwest_resp.bytes_stream()))
+                    .unwrap_or_else(|e| {
+                        log::error!("Error building response from backend for request {}: {}", display_method, e);
+                        let mut err_resp = Response::new(Body::from("Error processing backend response"));
+                        *err_resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
+                        err_resp
+                    });
+                return Ok(hyper_resp);
+            }
+            BackendResult::Error { backend_name, error, duration: _ } => {
+                log::error!("First response for request {} was an error from {}: {}", display_method, backend_name, error);
+                let mut err_resp = Response::new(Body::from(format!("Error from backend {}: {}", backend_name, error)));
+                *err_resp.status_mut() = StatusCode::BAD_GATEWAY;
+                return Ok(err_resp);
+            }
+        }
+    } else {
+        log::error!("No responses received from any dispatched backend for method {}", display_method);
+        // TODO: Add stat for no response received
+        let mut err_resp = Response::new(Body::from("No response from any backend."));
+        *err_resp.status_mut() = StatusCode::GATEWAY_TIMEOUT;
+        return Ok(err_resp);
+    }
+    // Note: Overall request context timeout and full response aggregation logic are still TODOs.
+}
diff --git a/benchmark_proxy_rust/src/rpc_utils.rs b/benchmark_proxy_rust/src/rpc_utils.rs
new file mode 100644
index 00000000..864df912
--- /dev/null
+++ b/benchmark_proxy_rust/src/rpc_utils.rs
@@ -0,0 +1,92 @@
+use crate::structures::{BatchInfo, JsonRpcRequest};
+use std::collections::HashSet;
+use log;
+
+fn get_stateful_methods() -> HashSet<&'static str> {
+    [
+        "eth_newFilter", "eth_newBlockFilter", "eth_newPendingTransactionFilter",
+        "eth_getFilterChanges", "eth_getFilterLogs", "eth_uninstallFilter",
+        "eth_subscribe", "eth_unsubscribe", "eth_subscription", // "eth_subscription" is a notification, not a method clients call,
+        // but if it appears in a batch for some reason, it's state-related.
+    ]
+    .iter()
+    .cloned()
+    .collect()
+}
+
+fn get_expensive_methods() -> HashSet<&'static str> {
+    [
+        // Ethereum Debug API (typically Geth-specific)
+        "debug_traceBlockByHash", "debug_traceBlockByNumber", "debug_traceCall", "debug_traceTransaction",
+        "debug_storageRangeAt", "debug_getModifiedAccountsByHash", "debug_getModifiedAccountsByNumber",
+        // Erigon/OpenEthereum Trace Module (more standard)
+        "trace_block", "trace_call", "trace_callMany", "trace_filter", "trace_get", "trace_rawTransaction",
+        "trace_replayBlockTransactions", "trace_replayTransaction", "trace_transaction",
+        // Specific combinations that might be considered extra expensive
+        "trace_replayBlockTransactions#vmTrace", // Example, depends on actual usage if # is method part
+        "trace_replayTransaction#vmTrace",
+    ]
+    .iter()
+    .cloned()
+    .collect()
+}
+
+lazy_static::lazy_static! {
+    static ref STATEFUL_METHODS: HashSet<&'static str> = get_stateful_methods();
+    static ref EXPENSIVE_METHODS: HashSet<&'static str> = get_expensive_methods();
+}
+
+pub fn is_stateful_method(method: &str) -> bool {
+    STATEFUL_METHODS.contains(method)
+}
+
+pub fn is_expensive_method(method: &str) -> bool {
+    EXPENSIVE_METHODS.contains(method)
+}
+
+pub fn parse_batch_info(body_bytes: &[u8]) -> Result<BatchInfo, String> {
+    if body_bytes.is_empty() {
+        return Err("Empty request body".to_string());
+    }
+
+    // Try parsing as a batch (array) first
+    match serde_json::from_slice::<Vec<JsonRpcRequest>>(body_bytes) {
+        Ok(batch_reqs) => {
+            if batch_reqs.is_empty() {
+                return Err("Empty batch request".to_string());
+            }
+            let mut methods = Vec::new();
+            let mut has_stateful = false;
+            for req in &batch_reqs {
+                methods.push(req.method.clone());
+                if is_stateful_method(&req.method) {
+                    has_stateful = true;
+                }
+            }
+            Ok(BatchInfo {
+                is_batch: true,
+                methods,
+                request_count: batch_reqs.len(),
+                has_stateful,
+            })
+        }
+        Err(_e_batch) => {
+            // If not a batch, try parsing as a single request
+            match serde_json::from_slice::<JsonRpcRequest>(body_bytes) {
+                Ok(single_req) => Ok(BatchInfo {
+                    is_batch: false,
+                    methods: vec![single_req.method.clone()],
+                    request_count: 1,
+                    has_stateful: is_stateful_method(&single_req.method),
+                }),
+                Err(_e_single) => {
+                    // Log the actual errors if needed for debugging, but return a generic one
+                    log::debug!("Failed to parse as batch: {}", _e_batch);
+                    log::debug!("Failed to parse as single: {}", _e_single);
+                    Err("Invalid JSON-RPC request format. Not a valid single request or batch.".to_string())
+                }
+            }
+        }
+    }
+}
diff --git a/benchmark_proxy_rust/src/secondary_probe.rs b/benchmark_proxy_rust/src/secondary_probe.rs
new file mode 100644
index 00000000..e767b723
--- /dev/null
+++ b/benchmark_proxy_rust/src/secondary_probe.rs
@@ -0,0 +1,383 @@
+use crate::{
+    config::AppConfig,
+    structures::{Backend, JsonRpcRequest},
+};
+use chrono::Utc;
+use dashmap::DashMap;
+use log::{debug, error, info, warn};
+use reqwest::Client;
+use serde_json::json;
+use std::{
+    cmp::min,
+    sync::{
+        atomic::{AtomicU32, Ordering},
+        Arc, Mutex, RwLock,
+    },
+    time::{Duration, SystemTime},
+};
+use tokio::sync::watch;
+
+const PROBE_REQUEST_COUNT: usize = 10;
+const DEFAULT_MIN_RESPONSE_TIME_MS: u64 = 15;
+const PROBE_CYCLE_DELAY_MS: u64 = 10;
+
+pub struct SecondaryProbe {
+    config: Arc<AppConfig>,
+    backends: Vec<Backend>, // Only secondary backends
+    client: Client,
+    min_response_time: Arc<RwLock<Duration>>,
+    method_timings: Arc<DashMap<String, Duration>>, // method_name -> min_duration
+    backend_timings: Arc<DashMap<String, Duration>>, // backend_name -> min_duration
+
+    // Health state per backend
+    backend_available: Arc<DashMap<String, bool>>,
+    backend_error_count: Arc<DashMap<String, AtomicU32>>,
+    backend_consecutive_success_count: Arc<DashMap<String, AtomicU32>>, // For recovery
+    backend_last_success: Arc<DashMap<String, Mutex<SystemTime>>>,
+
+    last_probe_time: Arc<Mutex<SystemTime>>,
+    failure_count: Arc<AtomicU32>, // Consecutive overall probe cycle failures
+    last_success_time: Arc<Mutex<SystemTime>>, // Last time any probe in an overall cycle succeeded
+
+    shutdown_tx: watch::Sender<bool>,
+    shutdown_rx: watch::Receiver<bool>,
+    enable_detailed_logs: bool,
+}
+
+impl SecondaryProbe {
+    pub fn new(
+        config: Arc<AppConfig>,
+        all_backends: &[Backend],
+        client: Client,
+    ) -> Option<Arc<Self>> {
+        let secondary_backends: Vec<Backend> = all_backends
+            .iter()
+            .filter(|b| b.role.to_lowercase() == "secondary")
+            .cloned()
+            .collect();
+
+        if secondary_backends.is_empty() {
+            info!("No secondary backends configured. SecondaryProbe will not be initialized.");
+            return None;
+        }
+
+        info!(
+            "Initializing SecondaryProbe for {} secondary backends.",
+            secondary_backends.len()
+        );
+
+        let backend_available = Arc::new(DashMap::new());
+        let backend_error_count = Arc::new(DashMap::new());
+        let backend_consecutive_success_count = Arc::new(DashMap::new());
+        let backend_last_success = Arc::new(DashMap::new());
+
+        for backend in &secondary_backends {
+            backend_available.insert(backend.name.clone(), true);
+            backend_error_count.insert(backend.name.clone(), AtomicU32::new(0));
+            backend_consecutive_success_count.insert(backend.name.clone(), AtomicU32::new(0));
+            backend_last_success.insert(backend.name.clone(), Mutex::new(SystemTime::now()));
+            info!("  - Backend '{}' ({}) initialized as available.", backend.name, backend.url);
+        }
+
+        let (shutdown_tx, shutdown_rx) = watch::channel(false);
+
+        Some(Arc::new(Self {
+            config: config.clone(),
+            backends: secondary_backends,
+            client,
+            min_response_time: Arc::new(RwLock::new(Duration::from_millis(
+                DEFAULT_MIN_RESPONSE_TIME_MS, // Or load from config if needed
+            ))),
+            method_timings: Arc::new(DashMap::new()),
+            backend_timings: Arc::new(DashMap::new()),
+            backend_available,
+            backend_error_count,
+            backend_consecutive_success_count,
+            backend_last_success,
+            last_probe_time: Arc::new(Mutex::new(SystemTime::now())),
+            failure_count: Arc::new(AtomicU32::new(0)),
+            last_success_time: Arc::new(Mutex::new(SystemTime::now())),
+            shutdown_tx,
+            shutdown_rx, // Receiver is cloneable
+            enable_detailed_logs: config.enable_detailed_logs,
+        }))
+    }
+
+    pub fn start_periodic_probing(self: Arc<Self>) {
+        if self.backends.is_empty() {
+            info!("No secondary backends to probe. Periodic probing will not start.");
+            return;
+        }
+
+        info!(
+            "Starting periodic probing for {} secondary backends. Probe interval: {}s. Probe methods: {:?}. Max errors: {}, Recovery threshold: {}.",
+            self.backends.len(),
+            self.config.probe_interval_secs,
+            self.config.probe_methods,
+            self.config.max_error_threshold,
+            self.config.recovery_threshold
+        );
+
+        // Run initial probe
+        let initial_probe_self = self.clone();
+        tokio::spawn(async move {
+            if initial_probe_self.enable_detailed_logs {
+                debug!("Running initial probe...");
+            }
+            initial_probe_self.run_probe().await;
+            if initial_probe_self.enable_detailed_logs {
+                debug!("Initial probe finished.");
+            }
+        });
+
+        // Start periodic probing task
+        let mut interval = tokio::time::interval(self.config.probe_interval());
+        let mut shutdown_rx_clone = self.shutdown_rx.clone();
+
+        tokio::spawn(async move {
+            loop {
+                tokio::select! {
+                    _ = interval.tick() => {
+                        if self.enable_detailed_logs {
+                            debug!("Running periodic probe cycle...");
+                        }
+                        self.run_probe().await;
+                        if self.enable_detailed_logs {
+                            debug!("Periodic probe cycle finished.");
+                        }
+                    }
+                    res = shutdown_rx_clone.changed() => {
+                        if res.is_err() || *shutdown_rx_clone.borrow() {
+                            info!("SecondaryProbe: Shutdown signal received or channel closed, stopping periodic probing.");
+                            break;
+                        }
+                    }
+                }
+            }
+            info!("SecondaryProbe: Periodic probing task has stopped.");
+        });
+    }
+
+    async fn run_probe(&self) {
+        let mut successful_probes_in_overall_cycle = 0;
+        let temp_method_timings: DashMap<String, Duration> = DashMap::new(); // method_name -> min_duration for this cycle
+        let temp_backend_timings: DashMap<String, Duration> = DashMap::new(); // backend_name -> min_duration for this cycle
+        let mut temp_overall_min_response_time = Duration::MAX;
+
+        for backend in &self.backends {
+            let mut backend_cycle_successful_probes = 0;
+            let mut backend_cycle_min_duration = Duration::MAX;
+
+            for method_name in &self.config.probe_methods {
+                let mut method_min_duration_for_backend_this_cycle = Duration::MAX;
+
+                for i in 0..PROBE_REQUEST_COUNT {
+                    let probe_id = format!(
+                        "probe-{}-{}-{}-{}",
+                        backend.name,
+                        method_name,
+                        Utc::now().timestamp_nanos_opt().unwrap_or_else(|| SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_nanos() as i64),
+                        i
+                    );
+                    let request_body = JsonRpcRequest {
+                        method: method_name.clone(),
+                        params: Some(json!([])),
+                        id: Some(json!(probe_id)),
+                        jsonrpc: Some("2.0".to_string()),
+                    };
+
+                    let start_time = SystemTime::now();
+                    match self.client.post(backend.url.clone()).json(&request_body).timeout(self.config.http_client_timeout()).send().await {
+                        Ok(response) => {
+                            let duration = start_time.elapsed().unwrap_or_default();
+                            if response.status().is_success() {
+                                // TODO: Optionally parse JSON RPC response for error field
+                                backend_cycle_successful_probes += 1;
+                                successful_probes_in_overall_cycle += 1;
+
+                                method_min_duration_for_backend_this_cycle = min(method_min_duration_for_backend_this_cycle, duration);
+                                backend_cycle_min_duration = min(backend_cycle_min_duration, duration);
+                                temp_overall_min_response_time = min(temp_overall_min_response_time, duration);
+
+                                if self.enable_detailed_logs {
+                                    debug!("Probe success: {} method {} ID {} took {:?}.", backend.name, method_name, probe_id, duration);
+                                }
+                            } else if self.enable_detailed_logs {
+                                warn!("Probe failed (HTTP status {}): {} method {} ID {}. Body: {:?}", response.status(), backend.name, method_name, probe_id, response.text().await.unwrap_or_default());
+                            }
+                        }
+                        Err(e) => {
+                            if self.enable_detailed_logs {
+                                warn!("Probe error (request failed): {} method {} ID {}: {:?}", backend.name, method_name, probe_id, e);
+                            }
+                        }
+                    }
+                    tokio::time::sleep(Duration::from_millis(PROBE_CYCLE_DELAY_MS)).await;
+                } // End of PROBE_REQUEST_COUNT loop
+
+                if method_min_duration_for_backend_this_cycle != Duration::MAX {
+                    temp_method_timings
+                        .entry(method_name.clone())
+                        .and_modify(|current_min| *current_min = min(*current_min, method_min_duration_for_backend_this_cycle))
+                        .or_insert(method_min_duration_for_backend_this_cycle);
+                }
+            } // End of probe_methods loop
+
+            if backend_cycle_min_duration != Duration::MAX {
+                temp_backend_timings.insert(backend.name.clone(), backend_cycle_min_duration);
+            }
+            self.update_backend_health(&backend.name, backend_cycle_successful_probes > 0);
+            if self.enable_detailed_logs {
+                debug!(
+                    "Probe sub-cycle for backend {}: {} successful probes. 
Min duration for this backend this cycle: {:?}. Current health: available={}", + backend.name, + backend_cycle_successful_probes, + if backend_cycle_min_duration == Duration::MAX { None } else { Some(backend_cycle_min_duration) }, + self.is_backend_available(&backend.name) + ); + } + } // End of backends loop + + // Update overall timings if any probe in the cycle was successful + if successful_probes_in_overall_cycle > 0 { + if temp_overall_min_response_time != Duration::MAX { + let mut min_resp_time_guard = self.min_response_time.write().unwrap(); + *min_resp_time_guard = min(*min_resp_time_guard, temp_overall_min_response_time); + if self.enable_detailed_logs { + debug!("Global min_response_time updated to: {:?}", *min_resp_time_guard); + } + } + + for entry in temp_method_timings.iter() { + self.method_timings + .entry(entry.key().clone()) + .and_modify(|current_min| *current_min = min(*current_min, *entry.value())) + .or_insert(*entry.value()); + if self.enable_detailed_logs { + debug!("Global method_timing for {} updated/set to: {:?}", entry.key(), *entry.value()); + } + } + + for entry in temp_backend_timings.iter() { + self.backend_timings + .entry(entry.key().clone()) + .and_modify(|current_min| *current_min = min(*current_min, *entry.value())) + .or_insert(*entry.value()); + if self.enable_detailed_logs { + debug!("Global backend_timing for {} updated/set to: {:?}", entry.key(), *entry.value()); + } + } + + self.failure_count.store(0, Ordering::Relaxed); + *self.last_success_time.lock().unwrap() = SystemTime::now(); + if self.enable_detailed_logs { + info!("Overall probe cycle completed with {} successes. Overall failure count reset.", successful_probes_in_overall_cycle); + } + + } else { + let prev_failures = self.failure_count.fetch_add(1, Ordering::Relaxed); + warn!( + "Overall probe cycle completed with NO successful probes. Overall failure count incremented to {}.", + prev_failures + 1 + ); + } + + *self.last_probe_time.lock().unwrap() = SystemTime::now(); + } + + fn update_backend_health(&self, backend_name: &str, is_cycle_success: bool) { + let current_availability = self.is_backend_available(backend_name); + let error_count_entry = self.backend_error_count.entry(backend_name.to_string()).or_insert_with(|| AtomicU32::new(0)); + let consecutive_success_entry = self.backend_consecutive_success_count.entry(backend_name.to_string()).or_insert_with(|| AtomicU32::new(0)); + + if is_cycle_success { + error_count_entry.store(0, Ordering::Relaxed); + consecutive_success_entry.fetch_add(1, Ordering::Relaxed); + if let Some(mut last_success_guard) = self.backend_last_success.get_mut(backend_name) { + *last_success_guard.lock().unwrap() = SystemTime::now(); + } + + if !current_availability { + let successes = consecutive_success_entry.load(Ordering::Relaxed); + if successes >= self.config.recovery_threshold { + self.backend_available.insert(backend_name.to_string(), true); + info!("Backend {} recovered and is now AVAILABLE ({} consecutive successes met threshold {}).", backend_name, successes, self.config.recovery_threshold); + consecutive_success_entry.store(0, Ordering::Relaxed); // Reset after recovery + } else { + if self.enable_detailed_logs { + debug!("Backend {} had a successful probe cycle. Consecutive successes: {}. 
Needs {} for recovery.", backend_name, successes, self.config.recovery_threshold); + } + } + } else { + if self.enable_detailed_logs { + debug!("Backend {} remains available, successful probe cycle.", backend_name); + } + } + } else { // Probe cycle failed for this backend + consecutive_success_entry.store(0, Ordering::Relaxed); // Reset consecutive successes on any failure + let current_errors = error_count_entry.fetch_add(1, Ordering::Relaxed) + 1; // +1 because fetch_add returns previous value + + if current_availability && current_errors >= self.config.max_error_threshold { + self.backend_available.insert(backend_name.to_string(), false); + warn!( + "Backend {} has become UNAVAILABLE due to {} errors (threshold {}).", + backend_name, current_errors, self.config.max_error_threshold + ); + } else { + if self.enable_detailed_logs { + if current_availability { + debug!("Backend {} is still available but error count increased to {}. Max errors before unavailable: {}", backend_name, current_errors, self.config.max_error_threshold); + } else { + debug!("Backend {} remains UNAVAILABLE, error count now {}.", backend_name, current_errors); + } + } + } + } + } + + pub fn get_delay_for_method(&self, method_name: &str) -> Duration { + let base_delay = self + .method_timings + .get(method_name) + .map(|timing_ref| *timing_ref.value()) + .unwrap_or_else(|| *self.min_response_time.read().unwrap()); // Read lock + + let buffer = Duration::from_millis(self.config.min_delay_buffer_ms); + let calculated_delay = base_delay.saturating_add(buffer); + + let overall_failures = self.failure_count.load(Ordering::Relaxed); + // Consider last_success_time to see if failures are recent and persistent + let time_since_last_overall_success = SystemTime::now() + .duration_since(*self.last_success_time.lock().unwrap()) // Lock for last_success_time + .unwrap_or_default(); + + // Fallback logic: if many consecutive failures AND last success was long ago + if overall_failures >= 3 && time_since_last_overall_success > self.config.probe_interval().saturating_mul(3) { + warn!( + "Probes failing ({} consecutive, last overall success {:?} ago). Using conservative fixed delay for method {}.", + overall_failures, time_since_last_overall_success, method_name + ); + return Duration::from_millis(self.config.min_delay_buffer_ms.saturating_mul(3)); + } + + if self.enable_detailed_logs { + debug!("Delay for method '{}': base {:?}, buffer {:?}, final {:?}", method_name, base_delay, buffer, calculated_delay); + } + calculated_delay + } + + pub fn is_backend_available(&self, backend_name: &str) -> bool { + self.backend_available + .get(backend_name) + .map_or(false, |entry| *entry.value()) + } + + pub fn stop(&self) { + info!("SecondaryProbe: Sending shutdown signal..."); + if self.shutdown_tx.send(true).is_err() { + error!("Failed to send shutdown signal to SecondaryProbe task. 
It might have already stopped or had no receiver.");
+        }
+    }
+}
diff --git a/benchmark_proxy_rust/src/stats_collector.rs b/benchmark_proxy_rust/src/stats_collector.rs
new file mode 100644
index 00000000..0fe0b897
--- /dev/null
+++ b/benchmark_proxy_rust/src/stats_collector.rs
@@ -0,0 +1,290 @@
+use crate::structures::{ResponseStats, WebSocketStats, CuDataPoint, Backend};
+use crate::block_height_tracker::BlockHeightTracker;
+use crate::secondary_probe::SecondaryProbe;
+use std::time::{Duration, SystemTime};
+use std::sync::{Arc, Mutex, atomic::{AtomicU64, Ordering}};
+use dashmap::DashMap;
+use log::{debug, error, info, warn};
+
+pub struct StatsCollector {
+    pub request_stats: Arc<Mutex<Vec<ResponseStats>>>,
+    pub method_stats: Arc<DashMap<String, Mutex<Vec<Duration>>>>, // method_name -> list of durations for primary
+    pub backend_method_stats: Arc<DashMap<String, DashMap<String, Mutex<Vec<Duration>>>>>, // backend_name -> method_name -> list of durations
+    pub backend_wins: Arc<DashMap<String, AtomicU64>>, // backend_name -> count
+    pub method_backend_wins: Arc<DashMap<String, DashMap<String, AtomicU64>>>, // method_name -> backend_name -> count
+    pub first_response_durations: Arc<Mutex<Vec<Duration>>>,
+    pub actual_first_response_durations: Arc<Mutex<Vec<Duration>>>,
+    pub method_first_response_durations: Arc<DashMap<String, Mutex<Vec<Duration>>>>,
+    pub method_actual_first_response_durations: Arc<DashMap<String, Mutex<Vec<Duration>>>>,
+    pub total_requests: Arc<AtomicU64>,
+    pub error_count: Arc<AtomicU64>,
+    pub skipped_secondary_requests: Arc<AtomicU64>,
+    pub ws_stats: Arc<Mutex<Vec<WebSocketStats>>>,
+    pub total_ws_connections: Arc<AtomicU64>,
+    pub app_start_time: SystemTime,
+    pub interval_start_time: Arc<Mutex<SystemTime>>,
+    pub summary_interval: Duration,
+    pub method_cu_prices: Arc<DashMap<String, u64>>,
+    pub total_cu: Arc<AtomicU64>,
+    pub method_cu: Arc<DashMap<String, AtomicU64>>, // method_name -> total CU for this method in interval
+    pub historical_cu: Arc<Mutex<Vec<CuDataPoint>>>,
+    pub has_secondary_backends: bool,
+    // Placeholders for probe and tracker - actual types will be defined later
+    // pub secondary_probe: Option<Arc<SecondaryProbe>>,
+    // pub block_height_tracker: Option<Arc<BlockHeightTracker>>,
+}
+
+impl StatsCollector {
+    pub fn new(summary_interval: Duration, has_secondary_backends: bool) -> Self {
+        let method_cu_prices = Arc::new(DashMap::new());
+        Self::init_cu_prices(&method_cu_prices);
+
+        StatsCollector {
+            request_stats: Arc::new(Mutex::new(Vec::new())),
+            method_stats: Arc::new(DashMap::new()),
+            backend_method_stats: Arc::new(DashMap::new()),
+            backend_wins: Arc::new(DashMap::new()),
+            method_backend_wins: Arc::new(DashMap::new()),
+            first_response_durations: Arc::new(Mutex::new(Vec::new())),
+            actual_first_response_durations: Arc::new(Mutex::new(Vec::new())),
+            method_first_response_durations: Arc::new(DashMap::new()),
+            method_actual_first_response_durations: Arc::new(DashMap::new()),
+            total_requests: Arc::new(AtomicU64::new(0)),
+            error_count: Arc::new(AtomicU64::new(0)),
+            skipped_secondary_requests: Arc::new(AtomicU64::new(0)),
+            ws_stats: Arc::new(Mutex::new(Vec::new())),
+            total_ws_connections: Arc::new(AtomicU64::new(0)),
+            app_start_time: SystemTime::now(),
+            interval_start_time: Arc::new(Mutex::new(SystemTime::now())),
+            summary_interval,
+            method_cu_prices,
+            total_cu: Arc::new(AtomicU64::new(0)),
+            method_cu: Arc::new(DashMap::new()),
+            historical_cu: Arc::new(Mutex::new(Vec::new())),
+            has_secondary_backends,
+        }
+    }
+
+    fn init_cu_prices(prices_map: &DashMap<String, u64>) {
+        // Base CU
+        prices_map.insert("eth_call".to_string(), 100);
+        prices_map.insert("eth_estimateGas".to_string(), 150);
+        prices_map.insert("eth_getLogs".to_string(), 200);
+        prices_map.insert("eth_sendRawTransaction".to_string(), 250);
+        prices_map.insert("trace_call".to_string(), 300);
+        prices_map.insert("trace_replayBlockTransactions".to_string(), 500);
+        // Default for unknown methods
+        prices_map.insert("default".to_string(), 50);
+    }
+
+    pub fn add_stats(&self, stats_vec: Vec<ResponseStats>) {
+        if stats_vec.is_empty() {
+            warn!("add_stats called with empty stats_vec");
+            return;
+        }
+
+        self.total_requests.fetch_add(1, Ordering::Relaxed);
+
+        let mut primary_stats: Option<&ResponseStats> = None;
+        let mut winning_backend_name: Option<String> = None;
+        let mut actual_first_response_duration: Option<Duration> = None;
+        let mut first_response_duration_from_primary_or_fastest_secondary: Option<Duration> = None;
+
+        // Find the 'actual-first-response' if present and the primary response
+        for stat in &stats_vec {
+            if stat.backend_name == "actual-first-response" {
+                actual_first_response_duration = Some(stat.duration);
+            } else if stat.backend_name.contains("-primary") { // Assuming primary name contains "-primary"
+                primary_stats = Some(stat);
+            }
+        }
+
+        let method_name = primary_stats.map_or_else(
+            || stats_vec.first().map_or_else(|| "unknown".to_string(), |s| s.method.clone()),
+            |ps| ps.method.clone()
+        );
+
+        // Determine winning backend and first_response_duration_from_primary_or_fastest_secondary
+        if self.has_secondary_backends {
+            let mut fastest_duration = Duration::MAX;
+            for stat in stats_vec.iter().filter(|s| s.backend_name != "actual-first-response" && s.error.is_none()) {
+                if stat.duration < fastest_duration {
+                    fastest_duration = stat.duration;
+                    winning_backend_name = Some(stat.backend_name.clone());
+                }
+            }
+            if fastest_duration != Duration::MAX {
+                first_response_duration_from_primary_or_fastest_secondary = Some(fastest_duration);
+            }
+        } else {
+            // If no secondary backends, the primary is the winner if it had no error
+            if let Some(ps) = primary_stats {
+                if ps.error.is_none() {
+                    winning_backend_name = Some(ps.backend_name.clone());
+                    first_response_duration_from_primary_or_fastest_secondary = Some(ps.duration);
+                }
+            }
+        }
+
+        // If no winner determined yet (e.g. all errored, or no secondary and primary errored),
+        // and if primary_stats exists, consider it as the "winner" for error tracking purposes.
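+        // Note: with this fallback, an all-error request is still attributed to
+        // the primary backend, so backend_wins also counts these "wins by default".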
+ if winning_backend_name.is_none() && primary_stats.is_some() { + winning_backend_name = Some(primary_stats.unwrap().backend_name.clone()); + } + + + // Update backend_wins and method_backend_wins + if let Some(ref winner_name) = winning_backend_name { + self.backend_wins.entry(winner_name.clone()).or_insert_with(|| AtomicU64::new(0)).fetch_add(1, Ordering::Relaxed); + self.method_backend_wins.entry(method_name.clone()).or_default().entry(winner_name.clone()).or_insert_with(|| AtomicU64::new(0)).fetch_add(1, Ordering::Relaxed); + } + + // Update first_response_durations and actual_first_response_durations + if let Some(duration) = first_response_duration_from_primary_or_fastest_secondary { + self.first_response_durations.lock().unwrap().push(duration); + self.method_first_response_durations.entry(method_name.clone()).or_insert_with(|| Mutex::new(Vec::new())).lock().unwrap().push(duration); + } + + if let Some(duration) = actual_first_response_duration { + self.actual_first_response_durations.lock().unwrap().push(duration); + self.method_actual_first_response_durations.entry(method_name.clone()).or_insert_with(|| Mutex::new(Vec::new())).lock().unwrap().push(duration); + } + + + let mut request_stats_guard = self.request_stats.lock().unwrap(); + for stat in stats_vec { + if stat.backend_name == "actual-first-response" { // Already handled + continue; + } + + request_stats_guard.push(stat.clone()); + + if stat.error.is_some() { + if stat.error.as_deref() == Some("skipped by primary due to min_delay_buffer") { + self.skipped_secondary_requests.fetch_add(1, Ordering::Relaxed); + } else { + self.error_count.fetch_add(1, Ordering::Relaxed); + } + } + + // Update backend_method_stats for all backends + self.backend_method_stats + .entry(stat.backend_name.clone()) + .or_default() + .entry(stat.method.clone()) + .or_insert_with(|| Mutex::new(Vec::new())) + .lock() + .unwrap() + .push(stat.duration); + + + // If the winning backend is primary and it's not a batch (batch handled separately), update method_stats and CUs + // Assuming primary_stats contains the correct method name for CU calculation + if let Some(ref winner_name_val) = winning_backend_name { + if &stat.backend_name == winner_name_val && stat.backend_name.contains("-primary") && stat.error.is_none() { + // Update method_stats (for primary) + self.method_stats + .entry(stat.method.clone()) + .or_insert_with(|| Mutex::new(Vec::new())) + .lock() + .unwrap() + .push(stat.duration); + + // Update CU + let cu_price = self.method_cu_prices.get(&stat.method).map_or_else( + || self.method_cu_prices.get("default").map_or(0, |p| *p.value()), + |p| *p.value() + ); + if cu_price > 0 { + self.total_cu.fetch_add(cu_price, Ordering::Relaxed); + self.method_cu.entry(stat.method.clone()).or_insert_with(|| AtomicU64::new(0)).fetch_add(cu_price, Ordering::Relaxed); + } + } + } + } + } + + pub fn add_batch_stats(&self, methods: &[String], duration: Duration, backend_name: &str) { + if !backend_name.contains("-primary") { // Only primary processes batches directly for now + warn!("add_batch_stats called for non-primary backend: {}", backend_name); + return; + } + + let mut batch_cu: u64 = 0; + for method_name in methods { + let cu_price = self.method_cu_prices.get(method_name).map_or_else( + || self.method_cu_prices.get("default").map_or(0, |p| *p.value()), + |p| *p.value() + ); + batch_cu += cu_price; + + if cu_price > 0 { + self.method_cu.entry(method_name.clone()).or_insert_with(|| AtomicU64::new(0)).fetch_add(cu_price, Ordering::Relaxed); + } + + // 
Update method_stats for each method in the batch on the primary
+            self.method_stats
+                .entry(method_name.clone())
+                .or_insert_with(|| Mutex::new(Vec::new()))
+                .lock()
+                .unwrap()
+                .push(duration); // Using the same duration for all methods in the batch as an approximation
+
+            // Update backend_method_stats
+            self.backend_method_stats
+                .entry(backend_name.to_string())
+                .or_default()
+                .entry(method_name.clone())
+                .or_insert_with(|| Mutex::new(Vec::new()))
+                .lock()
+                .unwrap()
+                .push(duration);
+        }
+
+        if batch_cu > 0 {
+            self.total_cu.fetch_add(batch_cu, Ordering::Relaxed);
+        }
+        // Note: total_requests is incremented by add_stats which should be called for the overall batch request
+    }
+
+
+    pub fn add_websocket_stats(&self, ws_stat: WebSocketStats) {
+        if ws_stat.error.is_some() {
+            self.error_count.fetch_add(1, Ordering::Relaxed);
+        }
+        self.ws_stats.lock().unwrap().push(ws_stat);
+        self.total_ws_connections.fetch_add(1, Ordering::Relaxed);
+    }
+
+    // STUBBED METHODS - to be implemented later
+    pub fn get_primary_p75_for_method(&self, _method: &str) -> Duration {
+        // Placeholder: return a default fixed duration
+        log::debug!("StatsCollector::get_primary_p75_for_method called (stub)");
+        Duration::from_millis(25) // Default from Go's calculateBatchDelay fallback
+    }
+
+    pub fn get_primary_p50_for_method(&self, _method: &str) -> Duration {
+        // Placeholder: return a default fixed duration
+        log::debug!("StatsCollector::get_primary_p50_for_method called (stub)");
+        Duration::from_millis(15)
+    }
+
+    pub fn is_expensive_method_by_stats(&self, _method: &str) -> bool {
+        // Placeholder: always return false
+        log::debug!("StatsCollector::is_expensive_method_by_stats called (stub)");
+        false
+    }
+
+    pub fn select_best_secondary_for_expensive_method(
+        &self,
+        _method: &str,
+        _backends: &[Backend],
+        _block_height_tracker: &Option<Arc<BlockHeightTracker>>,
+        _secondary_probe: &Option<Arc<SecondaryProbe>>,
+    ) -> Option<Backend> {
+        // Placeholder: always return None
+        log::debug!("StatsCollector::select_best_secondary_for_expensive_method called (stub)");
+        None
+    }
+}
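+
+// Minimal sketch of the CU bookkeeping above; the expected numbers simply
+// restate the init_cu_prices table, and "node-primary" is an assumed backend
+// name that satisfies the "-primary" naming convention.
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn batch_cu_uses_default_price_for_unknown_methods() {
+        let sc = StatsCollector::new(Duration::from_secs(60), false);
+        let methods = vec!["eth_call".to_string(), "some_unknown_method".to_string()];
+        sc.add_batch_stats(&methods, Duration::from_millis(10), "node-primary");
+        // eth_call = 100, unknown method falls back to "default" = 50
+        assert_eq!(sc.total_cu.load(Ordering::Relaxed), 150);
+    }
+}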
diff --git a/benchmark_proxy_rust/src/structures.rs b/benchmark_proxy_rust/src/structures.rs
new file mode 100644
index 00000000..dc4f0572
--- /dev/null
+++ b/benchmark_proxy_rust/src/structures.rs
@@ -0,0 +1,107 @@
+use serde::{Serialize, Deserialize};
+use url::Url;
+use http::StatusCode;
+use std::time::{Duration, SystemTime};
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct JsonRpcRequest {
+    pub method: String,
+    // A JSON-RPC id may be a number, a string, or null, so Value is used here.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub id: Option<serde_json::Value>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub jsonrpc: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub params: Option<serde_json::Value>,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct BatchInfo {
+    pub is_batch: bool,
+    pub methods: Vec<String>,
+    pub request_count: usize,
+    pub has_stateful: bool,
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct Backend {
+    #[serde(with = "url_serde")]
+    pub url: Url,
+    pub name: String,
+    pub role: String, // Consider an enum BackendRole { Primary, Secondary } later
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct ResponseStats {
+    pub backend_name: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(with = "http_serde_status_code_option", default)]
+    pub status_code: Option<StatusCode>,
+    // Human-readable duration; requires the humantime-serde crate.
+    #[serde(with = "humantime_serde")]
+    pub duration: Duration,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub error: Option<String>,
+    pub method: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct WebSocketStats {
+    pub backend_name: String,
+    pub error: Option<String>, // Default Option serde is fine
+    pub connect_time: Duration, // Default Duration serde (secs/nanos struct)
+    pub is_active: bool,
+    pub client_to_backend_messages: u64,
+    pub backend_to_client_messages: u64,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct CuDataPoint {
+    pub timestamp: SystemTime,
+    pub cu: u64,
+}
+
+// Helper module for serializing/deserializing Option<StatusCode>
+mod http_serde_status_code_option {
+    use http::StatusCode;
+    use serde::{Deserialize, Deserializer, Serializer};
+
+    pub fn serialize<S>(status_code: &Option<StatusCode>, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        match status_code {
+            Some(sc) => serializer.serialize_some(&sc.as_u16()),
+            None => serializer.serialize_none(),
+        }
+    }
+
+    pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<StatusCode>, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        Option::<u16>::deserialize(deserializer)?
+            .map(|code| StatusCode::from_u16(code).map_err(serde::de::Error::custom))
+            .transpose()
+    }
+}
+
+// Helper module for serializing/deserializing url::Url
+mod url_serde {
+    use url::Url;
+    use serde::{Deserialize, Deserializer, Serializer};
+
+    pub fn serialize<S>(url: &Url, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        serializer.serialize_str(url.as_str())
+    }
+
+    pub fn deserialize<'de, D>(deserializer: D) -> Result<Url, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        String::deserialize(deserializer)?
+            .parse()
+            .map_err(serde::de::Error::custom)
+    }
+}
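+
+// A small round-trip sketch for the serde helper modules above; the test name
+// and sample values are illustrative, and it assumes the serde_json dependency
+// already used elsewhere in this crate.
+#[cfg(test)]
+mod serde_helper_tests {
+    use super::*;
+
+    #[test]
+    fn response_stats_round_trips_status_code_and_duration() {
+        let stats = ResponseStats {
+            backend_name: "example-primary".to_string(),
+            status_code: Some(StatusCode::OK),
+            duration: Duration::from_millis(42),
+            error: None,
+            method: "eth_blockNumber".to_string(),
+        };
+        let json = serde_json::to_string(&stats).expect("serialize");
+        let back: ResponseStats = serde_json::from_str(&json).expect("deserialize");
+        assert_eq!(back.status_code, Some(StatusCode::OK));
+        assert_eq!(back.duration, Duration::from_millis(42));
+        assert_eq!(back.method, stats.method);
+    }
+}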
diff --git a/benchmark_proxy_rust/src/websocket_handler.rs b/benchmark_proxy_rust/src/websocket_handler.rs
new file mode 100644
index 00000000..518889bc
--- /dev/null
+++ b/benchmark_proxy_rust/src/websocket_handler.rs
@@ -0,0 +1,228 @@
+use std::sync::Arc;
+use std::time::Instant;
+use hyper::{upgrade::Upgraded, Body, Request, Response, StatusCode};
+// NOTE: hyper-tungstenite must be pinned to a release that shares its
+// tungstenite version with tokio-tungstenite, or the Message types flowing
+// through the proxy loops below will not unify.
+use hyper_tungstenite::WebSocketStream;
+use log;
+use futures_util::{stream::StreamExt, sink::SinkExt};
+
+use crate::config::AppConfig;
+use crate::stats_collector::StatsCollector;
+use crate::structures::{Backend, WebSocketStats}; // Ensure WebSocketStats has new fields
+
+pub async fn handle_websocket_request(
+    mut req: Request<Body>,
+    app_config: Arc<AppConfig>,
+    stats_collector: Arc<StatsCollector>,
+    all_backends: Arc<Vec<Backend>>,
+) -> Result<Response<Body>, Box<dyn std::error::Error + Send + Sync>> {
+    let upgrade_start_time = Instant::now();
+
+    // Check for upgrade request
+    if !hyper_tungstenite::is_upgrade_request(&req) {
+        log::warn!("Not a WebSocket upgrade request");
+        let mut resp = Response::new(Body::from("Not a WebSocket upgrade request"));
+        *resp.status_mut() = StatusCode::BAD_REQUEST;
+        return Ok(resp);
+    }
+
+    // Attempt to upgrade the connection
+    let (response, websocket) = match hyper_tungstenite::upgrade(&mut req, None) {
+        Ok((resp, ws)) => (resp, ws),
+        Err(e) => {
+            log::error!("WebSocket upgrade failed: {}", e);
+            let mut resp = Response::new(Body::from(format!("WebSocket upgrade failed: {}", e)));
+            *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; // Or BAD_REQUEST
+            return Ok(resp);
+        }
+    };
+
+    // Spawn a task to handle the WebSocket connection after sending 101
+    tokio::spawn(async move {
+        match websocket.await {
+            Ok(client_ws_stream) => {
+                if app_config.enable_detailed_logs {
+                    log::info!("Client WebSocket connection established.");
+                }
+                // Successfully upgraded the client connection; now proxy it to the primary backend.
+                proxy_websocket_to_primary(client_ws_stream, app_config, stats_collector, all_backends).await;
+            }
+            Err(e) => {
+                log::error!("Error awaiting client WebSocket upgrade: {}", e);
+                // No client WS connection exists to record stats against, other than the failed upgrade attempt.
+                let stats = WebSocketStats {
+                    backend_name: "client_upgrade_failed".to_string(),
+                    error: Some(format!("Client WS upgrade await error: {}", e)),
+                    connect_time: upgrade_start_time.elapsed(),
+                    is_active: false,
+                    client_to_backend_messages: 0,
+                    backend_to_client_messages: 0,
+                };
+                stats_collector.add_websocket_stats(stats);
+            }
+        }
+    });
+
+    // Return the 101 Switching Protocols response to the client
+    Ok(response)
+}
+
+async fn proxy_websocket_to_primary(
+    // The resolved client stream; with hyper-0.14-era hyper-tungstenite this is
+    // a WebSocketStream<Upgraded>. Mutable so it can be close()d on early exits.
+    mut client_ws_stream: WebSocketStream<Upgraded>,
+    app_config: Arc<AppConfig>,
+    stats_collector: Arc<StatsCollector>,
+    all_backends: Arc<Vec<Backend>>,
+) {
+    let connect_to_primary_start_time = Instant::now();
+    let mut backend_name_for_stats = "primary_unknown".to_string();
+
+    // 1. Find Primary Backend
+    let primary_backend = match all_backends.iter().find(|b| b.role == "primary") {
+        Some(pb) => {
+            backend_name_for_stats = pb.name.clone();
+            pb
+        }
+        None => {
+            log::error!("No primary backend configured for WebSocket proxy.");
+            // Close the client connection gracefully if possible, record stats, and bail out.
+            let _ = client_ws_stream.close(None).await;
+            let stats = WebSocketStats {
+                backend_name: backend_name_for_stats,
+                error: Some("No primary backend configured".to_string()),
+                connect_time: connect_to_primary_start_time.elapsed(),
+                is_active: false,
+                client_to_backend_messages: 0,
+                backend_to_client_messages: 0,
+            };
+            stats_collector.add_websocket_stats(stats);
+            return;
+        }
+    };
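+
+    // Illustrative scheme mapping performed below (hosts are hypothetical):
+    //   https://rpc.example.com/ws -> wss://rpc.example.com/ws
+    //   http://localhost:8545      -> ws://localhost:8545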
+    // 2. Connect to the primary backend's WebSocket endpoint.
+    let mut ws_url = primary_backend.url.clone();
+    let scheme = if ws_url.scheme() == "https" { "wss" } else { "ws" };
+    if ws_url.set_scheme(scheme).is_err() {
+        log::error!("Failed to set WebSocket scheme for backend URL: {}", primary_backend.url);
+        let _ = client_ws_stream.close(None).await;
+        let stats = WebSocketStats {
+            backend_name: backend_name_for_stats,
+            error: Some(format!("Invalid backend URL scheme for {}", primary_backend.url)),
+            connect_time: connect_to_primary_start_time.elapsed(),
+            is_active: false,
+            client_to_backend_messages: 0,
+            backend_to_client_messages: 0,
+        };
+        stats_collector.add_websocket_stats(stats);
+        return;
+    }
+
+    let backend_connect_attempt_time = Instant::now();
+    // connect_async accepts anything implementing IntoClientRequest; &str is
+    // the most portable choice across tungstenite versions.
+    let backend_ws_result = tokio_tungstenite::connect_async(ws_url.as_str()).await;
+    let connect_duration = backend_connect_attempt_time.elapsed(); // Backend connection time only
+
+    let backend_ws_stream_conn = match backend_ws_result {
+        Ok((stream, _response)) => {
+            if app_config.enable_detailed_logs {
+                log::info!("Successfully connected to primary backend WebSocket: {}", primary_backend.name);
+            }
+            stream
+        }
+        Err(e) => {
+            log::error!("Failed to connect to primary backend {} WebSocket: {}", primary_backend.name, e);
+            let _ = client_ws_stream.close(None).await; // Close client connection
+            let stats = WebSocketStats {
+                backend_name: backend_name_for_stats,
+                error: Some(format!("Primary backend connect error: {}", e)),
+                connect_time: connect_duration,
+                is_active: false,
+                client_to_backend_messages: 0,
+                backend_to_client_messages: 0,
+            };
+            stats_collector.add_websocket_stats(stats);
+            return;
+        }
+    };
+
+    // 3. Proxying Logic
+    let (mut client_ws_tx, mut client_ws_rx) = client_ws_stream.split();
+    let (mut backend_ws_tx, mut backend_ws_rx) = backend_ws_stream_conn.split();
+
+    // Each direction owns its own message counter and first-error slot, so the
+    // two async blocks never need overlapping mutable borrows.
+    let client_to_backend_task = async {
+        let mut count: u64 = 0;
+        let mut first_error: Option<String> = None;
+        while let Some(msg_result) = client_ws_rx.next().await {
+            match msg_result {
+                Ok(msg) => {
+                    if app_config.enable_detailed_logs { log::trace!("C->B: {:?}", msg); }
+                    if backend_ws_tx.send(msg).await.is_err() {
+                        if app_config.enable_detailed_logs { log::debug!("Error sending to backend, C->B loop breaking."); }
+                        break;
+                    }
+                    count += 1;
+                }
+                Err(e) => {
+                    log::warn!("Error reading from client WebSocket: {}", e);
+                    first_error.get_or_insert_with(|| format!("Client read error: {}", e));
+                    break;
+                }
+            }
+        }
+        // Try to close the backend sink gracefully once the client read loop ends.
+        if app_config.enable_detailed_logs { log::debug!("C->B proxy loop finished. Closing backend_ws_tx."); }
+        let _ = backend_ws_tx.close().await;
+        (count, first_error)
+    };
+
+    let backend_to_client_task = async {
+        let mut count: u64 = 0;
+        let mut first_error: Option<String> = None;
+        while let Some(msg_result) = backend_ws_rx.next().await {
+            match msg_result {
+                Ok(msg) => {
+                    if app_config.enable_detailed_logs { log::trace!("B->C: {:?}", msg); }
+                    if client_ws_tx.send(msg).await.is_err() {
+                        if app_config.enable_detailed_logs { log::debug!("Error sending to client, B->C loop breaking."); }
+                        break;
+                    }
+                    count += 1;
+                }
+                Err(e) => {
+                    log::warn!("Error reading from backend WebSocket: {}", e);
+                    first_error.get_or_insert_with(|| format!("Backend read error: {}", e));
+                    break;
+                }
+            }
+        }
+        // Try to close the client sink gracefully once the backend read loop ends.
+        if app_config.enable_detailed_logs { log::debug!("B->C proxy loop finished. Closing client_ws_tx."); }
+        let _ = client_ws_tx.close().await;
+        (count, first_error)
+    };
+
+    // Run both proxy directions concurrently and merge their results.
+    let ((client_to_backend_msg_count, c2b_err), (backend_to_client_msg_count, b2c_err)) =
+        tokio::join!(client_to_backend_task, backend_to_client_task);
+    let ws_stats_error = c2b_err.or(b2c_err);
+
+    if app_config.enable_detailed_logs {
+        log::info!("WebSocket proxying ended for {}. Client->Backend: {}, Backend->Client: {}. Error: {:?}",
+            backend_name_for_stats, client_to_backend_msg_count, backend_to_client_msg_count, ws_stats_error);
+    }
+
+    // connect_time carries the full session duration in this final record.
+    let final_session_duration = connect_to_primary_start_time.elapsed();
+
+    let final_stats = WebSocketStats {
+        backend_name: backend_name_for_stats,
+        error: ws_stats_error,
+        connect_time: final_session_duration,
+        is_active: false, // Session is now over
+        client_to_backend_messages: client_to_backend_msg_count,
+        backend_to_client_messages: backend_to_client_msg_count,
+    };
+    stats_collector.add_websocket_stats(final_stats);
+}
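+
+// A minimal wiring sketch, assuming a hyper 0.14 `service_fn` server; the
+// `route` helper and the plain-HTTP fallback body are hypothetical and only
+// show where handle_websocket_request would be called from.
+#[allow(dead_code)]
+async fn route(
+    req: Request<Body>,
+    app_config: Arc<AppConfig>,
+    stats_collector: Arc<StatsCollector>,
+    all_backends: Arc<Vec<Backend>>,
+) -> Result<Response<Body>, Box<dyn std::error::Error + Send + Sync>> {
+    if hyper_tungstenite::is_upgrade_request(&req) {
+        // WebSocket path: hand the request off for upgrade and proxying.
+        handle_websocket_request(req, app_config, stats_collector, all_backends).await
+    } else {
+        // Plain HTTP requests would go to the JSON-RPC request handler instead.
+        Ok(Response::new(Body::from("non-WebSocket request")))
+    }
+}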