Jules was unable to complete the task in time. Please review the work done so far and provide feedback for Jules to continue.

google-labs-jules[bot]
2025-05-29 13:57:43 +00:00
parent f9ea6d118c
commit d03c00f9a9
10 changed files with 1918 additions and 0 deletions

Cargo.toml
View File

@@ -0,0 +1,22 @@
[package]
name = "benchmark_proxy_rust"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { version = "1", features = ["full"] }
hyper = { version = "0.14", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio-tungstenite = { version = "0.21", features = ["native-tls"] }
log = "0.4"
env_logger = "0.10"
dashmap = "5.5"
reqwest = { version = "0.11", features = ["json", "rustls-tls", "stream"], default-features = false } # "stream" is needed for Response::bytes_stream in request_handler.rs
thiserror = "1.0"
futures-util = "0.3"
http = "0.2"
url = "2.5"
lazy_static = "1.4.0"
# The crates below are used by source files in this commit but were missing from
# the manifest; versions are assumptions chosen to be compatible with hyper 0.14.
bytes = "1"
chrono = "0.4" # timestamp_nanos_opt requires chrono >= 0.4.31
http-body = "0.4" # provides Limited and LengthLimitError for body size capping
humantime-serde = "1"
hyper-tungstenite = "0.11" # assumed; must agree with the tokio-tungstenite version in use

src/block_height_tracker.rs
View File

@@ -0,0 +1,343 @@
use crate::{config::AppConfig, structures::Backend};
use dashmap::DashMap;
use futures_util::{stream::SplitSink, SinkExt, StreamExt};
use log::{debug, error, info, warn};
use serde_json::json;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
time::{Duration, SystemTime},
};
use tokio::{
net::TcpStream,
sync::watch,
task::JoinHandle,
time::sleep,
};
use tokio_tungstenite::{
connect_async,
tungstenite::{protocol::Message as TungsteniteMessage, Error as TungsteniteError},
MaybeTlsStream, WebSocketStream,
};
use url::Url;
const RECONNECT_DELAY: Duration = Duration::from_secs(10);
#[derive(serde::Deserialize, Debug)]
struct SubscriptionMessage {
#[allow(dead_code)] // May not be used if only checking method
jsonrpc: Option<String>,
method: Option<String>,
params: Option<SubscriptionParams>,
result: Option<serde_json::Value>, // For subscription ID confirmation
id: Option<serde_json::Value>, // For request echo
}
#[derive(serde::Deserialize, Debug)]
struct SubscriptionParams {
subscription: String,
result: HeaderData,
}
#[derive(serde::Deserialize, Debug)]
struct HeaderData {
number: String, // Hex string like "0x123"
// Add other fields like "hash" if ever needed for more advanced logic
}
pub struct BlockHeightTracker {
config: Arc<AppConfig>,
backends: Vec<Backend>,
block_heights: Arc<DashMap<String, u64>>,
last_update_times: Arc<DashMap<String, SystemTime>>,
shutdown_tx: watch::Sender<bool>,
tasks: Arc<Mutex<Vec<JoinHandle<()>>>>,
enable_detailed_logs: bool,
}
impl BlockHeightTracker {
pub fn new(
config: Arc<AppConfig>,
all_backends: &[Backend],
) -> Option<Arc<Self>> {
if !config.enable_block_height_tracking {
info!("BlockHeightTracker disabled by configuration.");
return None;
}
info!("Initializing BlockHeightTracker for {} backends.", all_backends.len());
let (shutdown_tx, _shutdown_rx) = watch::channel(false); // _shutdown_rx cloned by tasks
Some(Arc::new(Self {
config: config.clone(),
backends: all_backends.to_vec(), // Clones the slice into a Vec
block_heights: Arc::new(DashMap::new()),
last_update_times: Arc::new(DashMap::new()),
shutdown_tx,
tasks: Arc::new(Mutex::new(Vec::new())),
enable_detailed_logs: config.enable_detailed_logs,
}))
}
pub fn start_monitoring(self: Arc<Self>) {
if self.backends.is_empty() {
info!("BHT: No backends configured for monitoring.");
return;
}
info!("BHT: Starting block height monitoring for {} backends.", self.backends.len());
let mut tasks_guard = self.tasks.lock().unwrap();
for backend in self.backends.clone() {
// Only monitor if backend has a URL, primarily for non-primary roles or specific needs
// For this implementation, we assume all backends in the list are candidates.
let task_self = self.clone();
let task_backend = backend.clone(); // Clone backend for the task
let task_shutdown_rx = self.shutdown_tx.subscribe();
let task = tokio::spawn(async move {
task_self
.monitor_backend_connection(task_backend, task_shutdown_rx)
.await;
});
tasks_guard.push(task);
}
}
async fn monitor_backend_connection(
self: Arc<Self>,
backend: Backend,
mut shutdown_rx: watch::Receiver<bool>,
) {
info!("BHT: Starting monitoring for backend: {}", backend.name);
loop { // Outer reconnect loop
tokio::select! {
biased;
_ = shutdown_rx.changed() => {
if *shutdown_rx.borrow() {
info!("BHT: Shutdown signal received for {}, terminating monitoring.", backend.name);
break; // Break outer reconnect loop
}
}
_ = tokio::time::sleep(Duration::from_millis(10)) => { // Give a chance for shutdown signal before attempting connection
// Proceed to connection attempt
}
}
if *shutdown_rx.borrow() { break; }
let mut ws_url = backend.url.clone();
let scheme = if backend.url.scheme() == "https" { "wss" } else { "ws" };
if let Err(_e) = ws_url.set_scheme(scheme) {
error!("BHT: Failed to set scheme to {} for backend {}: {}", scheme, backend.name, backend.url);
sleep(RECONNECT_DELAY).await;
continue;
}
if self.enable_detailed_logs {
debug!("BHT: Attempting to connect to {} for backend {}", ws_url, backend.name);
}
match connect_async(ws_url.as_str()).await { // pass &str: connect_async takes an IntoClientRequest implementor
Ok((ws_stream, _response)) => {
if self.enable_detailed_logs {
info!("BHT: Successfully connected to WebSocket for backend: {}", backend.name);
}
let (mut write, mut read) = ws_stream.split();
let subscribe_payload = json!({
"jsonrpc": "2.0",
"method": "eth_subscribe",
"params": ["newHeads"],
"id": 1 // Static ID for this subscription
});
if let Err(e) = write.send(TungsteniteMessage::Text(subscribe_payload.to_string())).await {
error!("BHT: Failed to send eth_subscribe to {}: {}. Retrying connection.", backend.name, e);
// Connection will be retried by the outer loop after delay
sleep(RECONNECT_DELAY).await;
continue;
}
if self.enable_detailed_logs {
debug!("BHT: Sent eth_subscribe payload to {}", backend.name);
}
// Inner message reading loop
loop {
tokio::select! {
biased;
_ = shutdown_rx.changed() => {
if *shutdown_rx.borrow() {
info!("BHT: Shutdown signal for {}, closing WebSocket and stopping.", backend.name);
// Attempt to close the WebSocket gracefully
let _ = write.send(TungsteniteMessage::Close(None)).await;
break; // Break inner message_read_loop
}
}
maybe_message = read.next() => {
match maybe_message {
Some(Ok(message)) => {
match message {
TungsteniteMessage::Text(text_msg) => {
if self.enable_detailed_logs {
debug!("BHT: Received text from {}: {}", backend.name, text_msg);
}
match serde_json::from_str::<SubscriptionMessage>(&text_msg) {
Ok(parsed_msg) => {
if parsed_msg.method.as_deref() == Some("eth_subscription") {
if let Some(params) = parsed_msg.params {
let block_num_str = params.result.number;
match u64::from_str_radix(block_num_str.trim_start_matches("0x"), 16) {
Ok(block_num) => {
self.block_heights.insert(backend.name.clone(), block_num);
self.last_update_times.insert(backend.name.clone(), SystemTime::now());
if self.enable_detailed_logs {
debug!("BHT: Updated block height for {}: {} (raw: {})", backend.name, block_num, block_num_str);
}
}
Err(e) => error!("BHT: Failed to parse block number hex '{}' for {}: {}", block_num_str, backend.name, e),
}
}
} else if parsed_msg.id == Some(json!(1)) && parsed_msg.result.is_some() {
if self.enable_detailed_logs {
info!("BHT: Received subscription confirmation from {}: {:?}", backend.name, parsed_msg.result);
}
} else {
if self.enable_detailed_logs {
debug!("BHT: Received other JSON message from {}: {:?}", backend.name, parsed_msg);
}
}
}
Err(e) => {
if self.enable_detailed_logs {
warn!("BHT: Failed to parse JSON from {}: {}. Message: {}", backend.name, e, text_msg);
}
}
}
}
TungsteniteMessage::Binary(bin_msg) => {
if self.enable_detailed_logs {
debug!("BHT: Received binary message from {} ({} bytes), ignoring.", backend.name, bin_msg.len());
}
}
TungsteniteMessage::Ping(_ping_data) => {
if self.enable_detailed_logs { debug!("BHT: Received Ping from {}.", backend.name); }
// tokio-tungstenite replies to Pings automatically while the stream is
// being polled, so no manual Pong is required here. If explicit control
// were ever needed:
// if let Err(e) = write.send(TungsteniteMessage::Pong(_ping_data)).await {
//     error!("BHT: Failed to send Pong to {}: {}", backend.name, e);
//     break; // Break inner loop, connection might be unstable
// }
}
TungsteniteMessage::Pong(_) => { /* Usually no action needed */ }
TungsteniteMessage::Close(_) => {
if self.enable_detailed_logs { info!("BHT: WebSocket closed by server for {}.", backend.name); }
break; // Break inner loop
}
TungsteniteMessage::Frame(_) => { /* Raw frame, usually not handled directly */ }
}
}
Some(Err(e)) => {
match e {
TungsteniteError::ConnectionClosed | TungsteniteError::AlreadyClosed => {
if self.enable_detailed_logs { info!("BHT: WebSocket connection closed for {}.", backend.name); }
}
_ => {
error!("BHT: Error reading from WebSocket for {}: {:?}. Attempting reconnect.", backend.name, e);
}
}
break; // Break inner loop, will trigger reconnect
}
None => {
if self.enable_detailed_logs { info!("BHT: WebSocket stream ended for {}. Attempting reconnect.", backend.name); }
break; // Break inner loop, will trigger reconnect
}
}
}
} // End of inner select
if *shutdown_rx.borrow() { break; } // Ensure inner loop breaks if shutdown occurred
} // End of inner message reading loop
}
Err(e) => {
warn!("BHT: Failed to connect to WebSocket for backend {}: {:?}. Retrying after delay.", backend.name, e);
}
}
// If we are here, it means the connection was dropped or failed. Wait before retrying.
if !*shutdown_rx.borrow() { // Don't sleep if shutting down
sleep(RECONNECT_DELAY).await;
}
} // End of outer reconnect loop
info!("BHT: Stopped monitoring backend {}.", backend.name);
}
pub fn is_secondary_behind(&self, secondary_name: &str) -> bool {
if !self.config.enable_block_height_tracking { return false; } // If tracking is off, assume not behind
let primary_info = self.backends.iter().find(|b| b.role == "primary");
let primary_name = match primary_info {
Some(b) => b.name.clone(),
None => {
if self.enable_detailed_logs {
warn!("BHT: No primary backend configured for is_secondary_behind check.");
}
return false;
}
};
let primary_height_opt = self.block_heights.get(&primary_name).map(|h_ref| *h_ref.value());
let primary_height = match primary_height_opt {
Some(h) => h,
None => {
if self.enable_detailed_logs {
debug!("BHT: Primary '{}' height unknown for is_secondary_behind check with {}.", primary_name, secondary_name);
}
return false; // Primary height unknown, can't reliably determine if secondary is behind
}
};
let secondary_height_opt = self.block_heights.get(secondary_name).map(|h_ref| *h_ref.value());
match secondary_height_opt {
Some(secondary_height_val) => {
if primary_height > secondary_height_val {
let diff = primary_height - secondary_height_val;
let is_behind = diff > self.config.max_blocks_behind;
if self.enable_detailed_logs && is_behind {
debug!("BHT: Secondary '{}' (height {}) is behind primary '{}' (height {}). Diff: {}, Max allowed: {}",
secondary_name, secondary_height_val, primary_name, primary_height, diff, self.config.max_blocks_behind);
}
return is_behind;
}
false // Secondary is not behind or is ahead
}
None => {
if self.enable_detailed_logs {
debug!("BHT: Secondary '{}' height unknown, considering it behind primary '{}' (height {}).", secondary_name, primary_name, primary_height);
}
true // Secondary height unknown, assume it's behind if primary height is known
}
}
}
pub fn get_block_height_status(&self) -> HashMap<String, u64> {
self.block_heights
.iter()
.map(|entry| (entry.key().clone(), *entry.value()))
.collect()
}
pub async fn stop(&self) {
info!("BHT: Sending shutdown signal to all monitoring tasks...");
if self.shutdown_tx.send(true).is_err() {
error!("BHT: Failed to send shutdown signal. Tasks might not terminate gracefully.");
}
let mut tasks_guard = self.tasks.lock().unwrap();
info!("BHT: Awaiting termination of {} monitoring tasks...", tasks_guard.len());
for task in tasks_guard.drain(..) {
if let Err(e) = task.await {
error!("BHT: Error awaiting task termination: {:?}", e);
}
}
info!("BHT: All monitoring tasks terminated.");
}
}
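// Hypothetical wiring sketch (not part of this commit): how the tracker might
// be driven from an async context, assuming a populated backend list. The
// function name is illustrative only.
#[allow(dead_code)]
async fn run_tracker_example(config: Arc<AppConfig>, backends: Vec<Backend>) {
    if let Some(tracker) = BlockHeightTracker::new(config, &backends) {
        tracker.clone().start_monitoring();
        // ... serve traffic, consulting tracker.is_secondary_behind("secondary-1") per request ...
        tracker.stop().await;
    }
}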

src/config.rs
View File

@@ -0,0 +1,169 @@
use std::env;
use std::str::FromStr;
use std::time::Duration;
use url::Url;
use thiserror::Error;
use log::{warn, info};
#[derive(Debug, Error)]
pub enum ConfigError {
#[error("Failed to parse environment variable '{var_name}': {source}")]
ParseError {
var_name: String,
source: Box<dyn std::error::Error + Send + Sync>,
},
#[error("Missing required environment variable: {var_name}")]
MissingVariable { var_name: String },
#[error("Invalid URL format for '{var_name}': {url_str} - {source}")]
UrlParseError {
var_name: String,
url_str: String,
source: url::ParseError,
},
}
#[derive(Debug, Clone)]
pub struct AppConfig {
pub listen_addr: String,
pub primary_backend_url: Url,
pub secondary_backend_urls: Vec<Url>,
pub summary_interval_secs: u64,
pub enable_detailed_logs: bool,
pub enable_secondary_probing: bool,
pub probe_interval_secs: u64,
pub min_delay_buffer_ms: u64,
pub probe_methods: Vec<String>,
pub enable_block_height_tracking: bool,
pub max_blocks_behind: u64,
// Probe health thresholds, referenced by secondary_probe.rs but missing from
// the original struct (the defaults below are assumptions).
pub max_error_threshold: u32,
pub recovery_threshold: u32,
pub enable_expensive_method_routing: bool,
pub max_body_size_bytes: usize,
pub http_client_timeout_secs: u64,
pub request_context_timeout_secs: u64,
}
// Helper function to get and parse environment variables
fn get_env_var<T: FromStr + std::fmt::Debug>(key: &str, default_value: T) -> T // Debug bound needed for the {:?} in the warning below
where
<T as FromStr>::Err: std::fmt::Display,
{
match env::var(key) {
Ok(val_str) => match val_str.parse::<T>() {
Ok(val) => val,
Err(e) => {
warn!(
"Failed to parse environment variable '{}' with value '{}': {}. Using default: {:?}",
key, val_str, e, default_value
);
default_value
}
},
Err(_) => default_value,
}
}
// Helper function for boolean environment variables
fn get_env_var_bool(key: &str, default_value: bool) -> bool {
match env::var(key) {
Ok(val_str) => val_str.to_lowercase() == "true",
Err(_) => default_value,
}
}
// Helper function for Vec<String> from comma-separated string
fn get_env_var_vec_string(key: &str, default_value: Vec<String>) -> Vec<String> {
match env::var(key) {
Ok(val_str) => {
if val_str.is_empty() {
default_value
} else {
val_str.split(',').map(|s| s.trim().to_string()).collect()
}
}
Err(_) => default_value,
}
}
// Helper function for Vec<Url> from comma-separated string
fn get_env_var_vec_url(key: &str, default_value: Vec<Url>) -> Result<Vec<Url>, ConfigError> {
match env::var(key) {
Ok(val_str) => {
if val_str.is_empty() {
return Ok(default_value);
}
val_str
.split(',')
.map(|s| s.trim())
.filter(|s| !s.is_empty())
.map(|url_str| {
Url::parse(url_str).map_err(|e| ConfigError::UrlParseError {
var_name: key.to_string(),
url_str: url_str.to_string(),
source: e,
})
})
.collect()
}
Err(_) => Ok(default_value),
}
}
pub fn load_from_env() -> Result<AppConfig, ConfigError> {
info!("Loading configuration from environment variables...");
let primary_backend_url_str = env::var("PRIMARY_BACKEND_URL").map_err(|_| {
ConfigError::MissingVariable {
var_name: "PRIMARY_BACKEND_URL".to_string(),
}
})?;
let primary_backend_url =
Url::parse(&primary_backend_url_str).map_err(|e| ConfigError::UrlParseError {
var_name: "PRIMARY_BACKEND_URL".to_string(),
url_str: primary_backend_url_str,
source: e,
})?;
let secondary_backend_urls = get_env_var_vec_url("SECONDARY_BACKEND_URLS", Vec::new())?;
let config = AppConfig {
listen_addr: get_env_var("LISTEN_ADDR", "127.0.0.1:8080".to_string()),
primary_backend_url,
secondary_backend_urls,
summary_interval_secs: get_env_var("SUMMARY_INTERVAL_SECS", 60),
enable_detailed_logs: get_env_var_bool("ENABLE_DETAILED_LOGS", false),
enable_secondary_probing: get_env_var_bool("ENABLE_SECONDARY_PROBING", true),
probe_interval_secs: get_env_var("PROBE_INTERVAL_SECS", 10),
min_delay_buffer_ms: get_env_var("MIN_DELAY_BUFFER_MS", 500),
probe_methods: get_env_var_vec_string(
"PROBE_METHODS",
vec!["eth_blockNumber".to_string(), "net_version".to_string()],
),
enable_block_height_tracking: get_env_var_bool("ENABLE_BLOCK_HEIGHT_TRACKING", true),
max_blocks_behind: get_env_var("MAX_BLOCKS_BEHIND", 5),
max_error_threshold: get_env_var("MAX_ERROR_THRESHOLD", 3), // assumed default
recovery_threshold: get_env_var("RECOVERY_THRESHOLD", 2), // assumed default
enable_expensive_method_routing: get_env_var_bool("ENABLE_EXPENSIVE_METHOD_ROUTING", false),
max_body_size_bytes: get_env_var("MAX_BODY_SIZE_BYTES", 10 * 1024 * 1024), // 10MB
http_client_timeout_secs: get_env_var("HTTP_CLIENT_TIMEOUT_SECS", 30),
request_context_timeout_secs: get_env_var("REQUEST_CONTEXT_TIMEOUT_SECS", 35),
};
info!("Configuration loaded successfully: {:?}", config);
Ok(config)
}
impl AppConfig {
pub fn http_client_timeout(&self) -> Duration {
Duration::from_secs(self.http_client_timeout_secs)
}
pub fn request_context_timeout(&self) -> Duration {
Duration::from_secs(self.request_context_timeout_secs)
}
pub fn summary_interval(&self) -> Duration {
Duration::from_secs(self.summary_interval_secs)
}
pub fn probe_interval(&self) -> Duration {
Duration::from_secs(self.probe_interval_secs)
}
}
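// Illustrative loader call (assumed values, not part of this commit). Only
// PRIMARY_BACKEND_URL is required; everything else falls back to the defaults above.
#[allow(dead_code)]
fn example_load() -> Result<AppConfig, ConfigError> {
    std::env::set_var("PRIMARY_BACKEND_URL", "https://rpc.example.com");
    std::env::set_var("SECONDARY_BACKEND_URLS", "https://a.example.com,https://b.example.com");
    std::env::set_var("MAX_BLOCKS_BEHIND", "10");
    load_from_env()
}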

src/main.rs
View File

@@ -0,0 +1,12 @@
pub mod structures;
pub mod config;
pub mod stats_collector;
pub mod secondary_probe;
pub mod block_height_tracker;
pub mod rpc_utils;
pub mod request_handler;
pub mod websocket_handler;
// Placeholder entry point for this work-in-progress commit; the actual server
// wiring (config load, hyper service, probes, stats loop) is still TODO.
fn main() {
println!("Hello, world!");
}
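// A sketch of the shape this entry point will likely take (an assumption based
// on the modules above; none of this is wired up in the commit):
//
// #[tokio::main]
// async fn main() {
//     env_logger::init();
//     let config = std::sync::Arc::new(config::load_from_env().expect("invalid configuration"));
//     // Build the backend list from config.primary_backend_url and
//     // config.secondary_backend_urls, construct StatsCollector,
//     // SecondaryProbe and BlockHeightTracker, then serve
//     // request_handler::handle_http_request with hyper.
// }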

src/request_handler.rs
View File

@@ -0,0 +1,272 @@
use bytes::Bytes;
use hyper::{Body, Request, Response, StatusCode};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use log;
use crate::config::AppConfig;
use crate::stats_collector::StatsCollector;
use crate::secondary_probe::SecondaryProbe;
use crate::block_height_tracker::BlockHeightTracker;
use crate::structures::{Backend, BatchInfo};
use crate::rpc_utils;
#[derive(Debug)]
pub enum BackendResult {
Success {
backend_name: String,
response: reqwest::Response, // Send the whole reqwest::Response
duration: std::time::Duration,
},
Error {
backend_name: String,
error: reqwest::Error, // Send the reqwest::Error
duration: std::time::Duration,
},
}
fn calculate_secondary_delay(
batch_info: &crate::structures::BatchInfo,
probe: &Option<Arc<crate::secondary_probe::SecondaryProbe>>,
stats: &Arc<crate::stats_collector::StatsCollector>,
_config: &Arc<crate::config::AppConfig>, // _config might be used later for more complex logic
) -> std::time::Duration {
let mut max_delay = std::time::Duration::from_millis(0);
let default_delay = std::time::Duration::from_millis(25); // Default from Go
if batch_info.methods.is_empty() {
return default_delay;
}
for method_name in &batch_info.methods {
let current_method_delay = if let Some(p) = probe {
p.get_delay_for_method(method_name)
} else {
// This will use the stubbed method from StatsCollector which currently returns 25ms
stats.get_primary_p75_for_method(method_name)
};
if current_method_delay > max_delay {
max_delay = current_method_delay;
}
}
if max_delay == std::time::Duration::from_millis(0) { // if all methods were unknown or had 0 delay
if let Some(p) = probe {
// Go code uses: probe.minResponseTime + probe.minDelayBuffer
// probe.get_delay_for_method("") would approximate this if it falls back to min_response_time + buffer
return p.get_delay_for_method(""); // Assuming empty method falls back to base delay
}
return default_delay;
}
max_delay
}
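// Worked example for calculate_secondary_delay (assumed numbers, not from the
// commit): for a batch of ["eth_call", "eth_getLogs"] whose probe minimums are
// 40ms and 120ms with MIN_DELAY_BUFFER_MS = 500, get_delay_for_method yields
// 540ms and 620ms, so secondaries wait 620ms. With no probe and no recorded
// timings, the 25ms default applies.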
pub async fn handle_http_request(
req: Request<Body>,
config: Arc<AppConfig>,
stats_collector: Arc<StatsCollector>,
http_client: Arc<reqwest::Client>,
secondary_probe: Option<Arc<SecondaryProbe>>,
block_height_tracker: Option<Arc<BlockHeightTracker>>,
all_backends: Arc<Vec<Backend>>,
) -> Result<Response<Body>, Box<dyn std::error::Error + Send + Sync>> {
let _overall_start_time = std::time::Instant::now(); // To be used later with request_context_timeout
// 1. Read and limit request body (http_body::Limited enforces max_body_size_bytes)
let limited_body = http_body::Limited::new(req.into_body(), config.max_body_size_bytes);
let body_bytes = match hyper::body::to_bytes(limited_body).await {
Ok(bytes) => bytes,
Err(e) => {
log::error!("Failed to read request body or limit exceeded: {}", e);
let mut err_resp = Response::new(Body::from(format!("Request body error: {}", e)));
*err_resp.status_mut() = if e.downcast_ref::<http_body::LengthLimitError>().is_some() {
StatusCode::PAYLOAD_TOO_LARGE
} else {
StatusCode::BAD_REQUEST
};
return Ok(err_resp);
}
};
// 2. Parse Batch Info
let batch_info = match rpc_utils::parse_batch_info(&body_bytes) {
Ok(info) => info,
Err(e) => {
log::error!("Invalid JSON-RPC request: {}", e);
let mut err_resp = Response::new(Body::from(format!("Invalid JSON-RPC: {}", e)));
*err_resp.status_mut() = StatusCode::BAD_REQUEST;
return Ok(err_resp);
}
};
let display_method = if batch_info.is_batch {
format!("batch[{}]", batch_info.request_count)
} else {
batch_info.methods.get(0).cloned().unwrap_or_else(|| "unknown".to_string())
};
log::info!("Received request: Method: {}, IsBatch: {}, NumMethods: {}", display_method, batch_info.is_batch, batch_info.methods.len());
// 3. Calculate Secondary Delay
let secondary_delay = calculate_secondary_delay(&batch_info, &secondary_probe, &stats_collector, &config);
if config.enable_detailed_logs {
log::debug!("Method: {}, Calculated secondary delay: {:?}", display_method, secondary_delay);
}
// 4. Backend Filtering & Expensive Method Routing
let mut target_backends: Vec<Backend> = (*all_backends).clone();
if batch_info.has_stateful {
log::debug!("Stateful method detected in request '{}', targeting primary only.", display_method);
target_backends.retain(|b| b.role == "primary");
} else {
// Filter by block height
if let Some(bht) = &block_height_tracker {
if config.enable_block_height_tracking { // Check if feature is enabled
target_backends.retain(|b| {
if b.role != "primary" && bht.is_secondary_behind(&b.name) {
if config.enable_detailed_logs { log::info!("Skipping secondary {}: behind in block height for request {}", b.name, display_method); }
// TODO: Add stat for skipped due to block height
false
} else { true }
});
}
}
// Filter by probe availability
if let Some(sp) = &secondary_probe {
if config.enable_secondary_probing { // Check if feature is enabled
target_backends.retain(|b| {
if b.role != "primary" && !sp.is_backend_available(&b.name) {
if config.enable_detailed_logs { log::info!("Skipping secondary {}: not available via probe for request {}", b.name, display_method); }
// TODO: Add stat for skipped due to probe unavailable
false
} else { true }
});
}
}
}
let is_req_expensive = batch_info.methods.iter().any(|m| rpc_utils::is_expensive_method(m)) ||
batch_info.methods.iter().any(|m| stats_collector.is_expensive_method_by_stats(m)); // Stubbed
if config.enable_expensive_method_routing && is_req_expensive && !batch_info.has_stateful {
log::debug!("Expensive method detected in request {}. Attempting to route to a secondary.", display_method);
// TODO: Complex expensive method routing logic.
// For now, this placeholder doesn't change target_backends.
// A real implementation would try to find the best secondary or stick to primary if none are suitable.
}
// 5. Concurrent Request Dispatch
let (response_tx, mut response_rx) = mpsc::channel::<BackendResult>(target_backends.len().max(1));
let mut dispatched_count = 0;
for backend in target_backends { // target_backends is now filtered
dispatched_count += 1;
let task_body_bytes = body_bytes.clone();
let task_http_client = http_client.clone();
let task_response_tx = response_tx.clone();
// task_backend_name, task_backend_url, task_backend_role are cloned from 'backend'
let task_backend_name = backend.name.clone();
let task_backend_url = backend.url.clone();
let task_backend_role = backend.role.clone();
let task_secondary_delay = secondary_delay;
let task_config_detailed_logs = config.enable_detailed_logs;
let task_display_method = display_method.clone(); // per-task copy: the String must not be moved out of the loop
let task_http_timeout = config.http_client_timeout(); // Get Duration from config
tokio::spawn(async move {
let backend_req_start_time = std::time::Instant::now();
if task_backend_role != "primary" {
if task_config_detailed_logs {
log::debug!("Secondary backend {} for request {} delaying for {:?}", task_backend_name, display_method, task_secondary_delay);
}
tokio::time::sleep(task_secondary_delay).await;
}
let result = task_http_client
.post(task_backend_url)
.header("Content-Type", "application/json")
// TODO: Copy relevant headers from original request 'req.headers()'
.body(task_body_bytes)
.timeout(task_http_timeout)
.send()
.await;
let duration = backend_req_start_time.elapsed();
match result {
Ok(resp) => {
if task_config_detailed_logs {
log::debug!("Backend {} for request {} responded with status {}", task_backend_name, display_method, resp.status());
}
if task_response_tx.send(BackendResult::Success {
backend_name: task_backend_name,
response: resp,
duration,
}).await.is_err() {
log::error!("Failed to send success to channel for request {}: receiver dropped", display_method);
}
}
Err(err) => {
if task_config_detailed_logs {
log::error!("Backend {} for request {} request failed: {}", task_backend_name, display_method, err);
}
if task_response_tx.send(BackendResult::Error {
backend_name: task_backend_name,
error: err,
duration,
}).await.is_err() {
log::error!("Failed to send error to channel for request {}: receiver dropped", display_method);
}
}
}
});
}
drop(response_tx);
if dispatched_count == 0 {
log::warn!("No backends available to dispatch request for method {}", display_method);
// TODO: Add stat for no backend available
let mut err_resp = Response::new(Body::from("No available backends for this request type."));
*err_resp.status_mut() = StatusCode::SERVICE_UNAVAILABLE;
return Ok(err_resp);
}
// Placeholder: return the first received response
if let Some(first_result) = response_rx.recv().await {
if config.enable_detailed_logs {
log::info!("First backend response for request {}: {:?}", display_method, first_result);
}
match first_result {
BackendResult::Success { backend_name: _, response: reqwest_resp, duration: _ } => {
let mut hyper_resp_builder = Response::builder().status(reqwest_resp.status());
for (name, value) in reqwest_resp.headers().iter() {
hyper_resp_builder = hyper_resp_builder.header(name.clone(), value.clone());
}
let hyper_resp = hyper_resp_builder
.body(Body::wrap_stream(reqwest_resp.bytes_stream()))
.unwrap_or_else(|e| {
log::error!("Error building response from backend for request {}: {}", display_method, e);
let mut err_resp = Response::new(Body::from("Error processing backend response"));
*err_resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
err_resp
});
return Ok(hyper_resp);
}
BackendResult::Error { backend_name, error, duration: _ } => {
log::error!("First response for request {} was an error from {}: {}", display_method, backend_name, error);
let mut err_resp = Response::new(Body::from(format!("Error from backend {}: {}", backend_name, error)));
*err_resp.status_mut() = StatusCode::BAD_GATEWAY;
return Ok(err_resp);
}
}
} else {
log::error!("No responses received from any dispatched backend for method {}", display_method);
// TODO: Add stat for no response received
let mut err_resp = Response::new(Body::from("No response from any backend."));
*err_resp.status_mut() = StatusCode::GATEWAY_TIMEOUT;
return Ok(err_resp);
}
// Note: Overall request context timeout and full response aggregation logic are still TODOs.
}

src/rpc_utils.rs
View File

@@ -0,0 +1,92 @@
use crate::structures::{BatchInfo, JsonRpcRequest};
use std::collections::HashSet;
use log;
use serde_json; // Added for parsing
fn get_stateful_methods() -> HashSet<&'static str> {
[
"eth_newFilter", "eth_newBlockFilter", "eth_newPendingTransactionFilter",
"eth_getFilterChanges", "eth_getFilterLogs", "eth_uninstallFilter",
"eth_subscribe", "eth_unsubscribe", "eth_subscription", // "eth_subscription" is a notification, not a method client calls.
// But if it appears in a batch for some reason, it's state-related.
]
.iter()
.cloned()
.collect()
}
fn get_expensive_methods() -> HashSet<&'static str> {
[
// Ethereum Debug API (typically Geth-specific)
"debug_traceBlockByHash", "debug_traceBlockByNumber", "debug_traceCall", "debug_traceTransaction",
"debug_storageRangeAt", "debug_getModifiedAccountsByHash", "debug_getModifiedAccountsByNumber",
// Erigon/OpenEthereum Trace Module (more standard)
"trace_block", "trace_call", "trace_callMany", "trace_filter", "trace_get", "trace_rawTransaction",
"trace_replayBlockTransactions", "trace_replayTransaction", "trace_transaction",
// Specific combinations that might be considered extra expensive
"trace_replayBlockTransactions#vmTrace", // Example, depends on actual usage if # is method part
"trace_replayTransaction#vmTrace",
]
.iter()
.cloned()
.collect()
}
lazy_static::lazy_static! {
static ref STATEFUL_METHODS: HashSet<&'static str> = get_stateful_methods();
static ref EXPENSIVE_METHODS: HashSet<&'static str> = get_expensive_methods();
}
pub fn is_stateful_method(method: &str) -> bool {
STATEFUL_METHODS.contains(method)
}
pub fn is_expensive_method(method: &str) -> bool {
EXPENSIVE_METHODS.contains(method)
}
pub fn parse_batch_info(body_bytes: &[u8]) -> Result<BatchInfo, String> {
if body_bytes.is_empty() {
return Err("Empty request body".to_string());
}
// Try parsing as a batch (array) first
match serde_json::from_slice::<Vec<JsonRpcRequest>>(body_bytes) {
Ok(batch_reqs) => {
if batch_reqs.is_empty() {
return Err("Empty batch request".to_string());
}
let mut methods = Vec::new();
let mut has_stateful = false;
for req in &batch_reqs {
methods.push(req.method.clone());
if is_stateful_method(&req.method) {
has_stateful = true;
}
}
Ok(BatchInfo {
is_batch: true,
methods,
request_count: batch_reqs.len(),
has_stateful,
})
}
Err(_e_batch) => {
// If not a batch, try parsing as a single request
match serde_json::from_slice::<JsonRpcRequest>(body_bytes) {
Ok(single_req) => Ok(BatchInfo {
is_batch: false,
methods: vec![single_req.method.clone()],
request_count: 1,
has_stateful: is_stateful_method(&single_req.method),
}),
Err(_e_single) => {
// Log the actual errors if needed for debugging, but return a generic one
log::debug!("Failed to parse as batch: {}", _e_batch);
log::debug!("Failed to parse as single: {}", _e_single);
Err("Invalid JSON-RPC request format. Not a valid single request or batch.".to_string())
}
}
}
}
}
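// Illustrative tests (assumed, not part of this commit) pinning down the two
// parse paths of parse_batch_info.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parses_single_and_batch() {
        let single = br#"{"jsonrpc":"2.0","method":"eth_blockNumber","id":1}"#;
        let info = parse_batch_info(single).unwrap();
        assert!(!info.is_batch);
        assert_eq!(info.methods, vec!["eth_blockNumber".to_string()]);

        let batch = br#"[{"jsonrpc":"2.0","method":"eth_call","id":1},{"jsonrpc":"2.0","method":"eth_subscribe","id":2}]"#;
        let info = parse_batch_info(batch).unwrap();
        assert!(info.is_batch);
        assert_eq!(info.request_count, 2);
        assert!(info.has_stateful); // eth_subscribe is in the stateful set
    }
}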

src/secondary_probe.rs
View File

@@ -0,0 +1,383 @@
use crate::{
config::AppConfig,
structures::{Backend, JsonRpcRequest},
};
use chrono::Utc;
use dashmap::DashMap;
use log::{debug, error, info, warn};
use reqwest::Client;
use serde_json::json;
use std::{
cmp::min,
sync::{
atomic::{AtomicU32, Ordering},
Arc, Mutex, RwLock,
},
time::{Duration, SystemTime},
};
use tokio::sync::watch;
const PROBE_REQUEST_COUNT: usize = 10;
const DEFAULT_MIN_RESPONSE_TIME_MS: u64 = 15;
const PROBE_CYCLE_DELAY_MS: u64 = 10;
pub struct SecondaryProbe {
config: Arc<AppConfig>,
backends: Vec<Backend>, // Only secondary backends
client: Client,
min_response_time: Arc<RwLock<Duration>>,
method_timings: Arc<DashMap<String, Duration>>, // method_name -> min_duration
backend_timings: Arc<DashMap<String, Duration>>, // backend_name -> min_duration
// Health state per backend
backend_available: Arc<DashMap<String, bool>>,
backend_error_count: Arc<DashMap<String, AtomicU32>>,
backend_consecutive_success_count: Arc<DashMap<String, AtomicU32>>, // For recovery
backend_last_success: Arc<DashMap<String, Mutex<SystemTime>>>,
last_probe_time: Arc<Mutex<SystemTime>>,
failure_count: Arc<AtomicU32>, // Consecutive overall probe cycle failures
last_success_time: Arc<Mutex<SystemTime>>, // Last time any probe in an overall cycle succeeded
shutdown_tx: watch::Sender<bool>,
shutdown_rx: watch::Receiver<bool>,
enable_detailed_logs: bool,
}
impl SecondaryProbe {
pub fn new(
config: Arc<AppConfig>,
all_backends: &[Backend],
client: Client,
) -> Option<Arc<Self>> {
let secondary_backends: Vec<Backend> = all_backends
.iter()
.filter(|b| b.role.to_lowercase() == "secondary")
.cloned()
.collect();
if secondary_backends.is_empty() {
info!("No secondary backends configured. SecondaryProbe will not be initialized.");
return None;
}
info!(
"Initializing SecondaryProbe for {} secondary backends.",
secondary_backends.len()
);
let backend_available = Arc::new(DashMap::new());
let backend_error_count = Arc::new(DashMap::new());
let backend_consecutive_success_count = Arc::new(DashMap::new());
let backend_last_success = Arc::new(DashMap::new());
for backend in &secondary_backends {
backend_available.insert(backend.name.clone(), true);
backend_error_count.insert(backend.name.clone(), AtomicU32::new(0));
backend_consecutive_success_count.insert(backend.name.clone(), AtomicU32::new(0));
backend_last_success.insert(backend.name.clone(), Mutex::new(SystemTime::now()));
info!(" - Backend '{}' ({}) initialized as available.", backend.name, backend.url);
}
let (shutdown_tx, shutdown_rx) = watch::channel(false);
Some(Arc::new(Self {
config: config.clone(),
backends: secondary_backends,
client,
min_response_time: Arc::new(RwLock::new(Duration::from_millis(
DEFAULT_MIN_RESPONSE_TIME_MS, // Or load from config if needed
))),
method_timings: Arc::new(DashMap::new()),
backend_timings: Arc::new(DashMap::new()),
backend_available,
backend_error_count,
backend_consecutive_success_count,
backend_last_success,
last_probe_time: Arc::new(Mutex::new(SystemTime::now())),
failure_count: Arc::new(AtomicU32::new(0)),
last_success_time: Arc::new(Mutex::new(SystemTime::now())),
shutdown_tx,
shutdown_rx, // Receiver is cloneable
enable_detailed_logs: config.enable_detailed_logs,
}))
}
pub fn start_periodic_probing(self: Arc<Self>) {
if self.backends.is_empty() {
info!("No secondary backends to probe. Periodic probing will not start.");
return;
}
info!(
"Starting periodic probing for {} secondary backends. Probe interval: {}s. Probe methods: {:?}. Max errors: {}, Recovery threshold: {}.",
self.backends.len(),
self.config.probe_interval_secs,
self.config.probe_methods,
self.config.max_error_threshold,
self.config.recovery_threshold
);
// Run initial probe
let initial_probe_self = self.clone();
tokio::spawn(async move {
if initial_probe_self.enable_detailed_logs {
debug!("Running initial probe...");
}
initial_probe_self.run_probe().await;
if initial_probe_self.enable_detailed_logs {
debug!("Initial probe finished.");
}
});
// Start periodic probing task
let mut interval = tokio::time::interval(self.config.probe_interval());
let mut shutdown_rx_clone = self.shutdown_rx.clone();
tokio::spawn(async move {
loop {
tokio::select! {
_ = interval.tick() => {
if self.enable_detailed_logs {
debug!("Running periodic probe cycle...");
}
self.run_probe().await;
if self.enable_detailed_logs {
debug!("Periodic probe cycle finished.");
}
}
res = shutdown_rx_clone.changed() => {
if res.is_err() || *shutdown_rx_clone.borrow() {
info!("SecondaryProbe: Shutdown signal received or channel closed, stopping periodic probing.");
break;
}
}
}
}
info!("SecondaryProbe: Periodic probing task has stopped.");
});
}
async fn run_probe(&self) {
let mut successful_probes_in_overall_cycle = 0;
let temp_method_timings: DashMap<String, Duration> = DashMap::new(); // method_name -> min_duration for this cycle (DashMap needs no `mut`)
let temp_backend_timings: DashMap<String, Duration> = DashMap::new(); // backend_name -> min_duration for this cycle
let mut temp_overall_min_response_time = Duration::MAX;
for backend in &self.backends {
let mut backend_cycle_successful_probes = 0;
let mut backend_cycle_min_duration = Duration::MAX;
for method_name in &self.config.probe_methods {
let mut method_min_duration_for_backend_this_cycle = Duration::MAX;
for i in 0..PROBE_REQUEST_COUNT {
let probe_id = format!(
"probe-{}-{}-{}-{}",
backend.name,
method_name,
Utc::now().timestamp_nanos_opt().unwrap_or_else(|| SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_nanos() as i64),
i
);
let request_body = JsonRpcRequest {
method: method_name.clone(),
params: Some(json!([])),
id: Some(json!(probe_id)),
jsonrpc: Some("2.0".to_string()),
};
let start_time = SystemTime::now();
match self.client.post(backend.url.clone()).json(&request_body).timeout(self.config.http_client_timeout()).send().await {
Ok(response) => {
let duration = start_time.elapsed().unwrap_or_default();
if response.status().is_success() {
// TODO: Optionally parse JSON RPC response for error field
backend_cycle_successful_probes += 1;
successful_probes_in_overall_cycle += 1;
method_min_duration_for_backend_this_cycle = min(method_min_duration_for_backend_this_cycle, duration);
backend_cycle_min_duration = min(backend_cycle_min_duration, duration);
temp_overall_min_response_time = min(temp_overall_min_response_time, duration);
if self.enable_detailed_logs {
debug!("Probe success: {} method {} ID {} took {:?}.", backend.name, method_name, probe_id, duration);
}
} else {
if self.enable_detailed_logs {
warn!("Probe failed (HTTP status {}): {} method {} ID {}. Body: {:?}", response.status(), backend.name, method_name, probe_id, response.text().await.unwrap_or_default());
}
}
}
Err(e) => {
if self.enable_detailed_logs {
warn!("Probe error (request failed): {} method {} ID {}: {:?}", backend.name, method_name, probe_id, e);
}
}
}
tokio::time::sleep(Duration::from_millis(PROBE_CYCLE_DELAY_MS)).await;
} // End of PROBE_REQUEST_COUNT loop
if method_min_duration_for_backend_this_cycle != Duration::MAX {
temp_method_timings
.entry(method_name.clone())
.and_modify(|current_min| *current_min = min(*current_min, method_min_duration_for_backend_this_cycle))
.or_insert(method_min_duration_for_backend_this_cycle);
}
} // End of probe_methods loop
if backend_cycle_min_duration != Duration::MAX {
temp_backend_timings.insert(backend.name.clone(), backend_cycle_min_duration);
}
self.update_backend_health(&backend.name, backend_cycle_successful_probes > 0);
if self.enable_detailed_logs {
debug!(
"Probe sub-cycle for backend {}: {} successful probes. Min duration for this backend this cycle: {:?}. Current health: available={}",
backend.name,
backend_cycle_successful_probes,
if backend_cycle_min_duration == Duration::MAX { None } else { Some(backend_cycle_min_duration) },
self.is_backend_available(&backend.name)
);
}
} // End of backends loop
// Update overall timings if any probe in the cycle was successful
if successful_probes_in_overall_cycle > 0 {
if temp_overall_min_response_time != Duration::MAX {
let mut min_resp_time_guard = self.min_response_time.write().unwrap();
*min_resp_time_guard = min(*min_resp_time_guard, temp_overall_min_response_time);
if self.enable_detailed_logs {
debug!("Global min_response_time updated to: {:?}", *min_resp_time_guard);
}
}
for entry in temp_method_timings.iter() {
self.method_timings
.entry(entry.key().clone())
.and_modify(|current_min| *current_min = min(*current_min, *entry.value()))
.or_insert(*entry.value());
if self.enable_detailed_logs {
debug!("Global method_timing for {} updated/set to: {:?}", entry.key(), *entry.value());
}
}
for entry in temp_backend_timings.iter() {
self.backend_timings
.entry(entry.key().clone())
.and_modify(|current_min| *current_min = min(*current_min, *entry.value()))
.or_insert(*entry.value());
if self.enable_detailed_logs {
debug!("Global backend_timing for {} updated/set to: {:?}", entry.key(), *entry.value());
}
}
self.failure_count.store(0, Ordering::Relaxed);
*self.last_success_time.lock().unwrap() = SystemTime::now();
if self.enable_detailed_logs {
info!("Overall probe cycle completed with {} successes. Overall failure count reset.", successful_probes_in_overall_cycle);
}
} else {
let prev_failures = self.failure_count.fetch_add(1, Ordering::Relaxed);
warn!(
"Overall probe cycle completed with NO successful probes. Overall failure count incremented to {}.",
prev_failures + 1
);
}
*self.last_probe_time.lock().unwrap() = SystemTime::now();
}
fn update_backend_health(&self, backend_name: &str, is_cycle_success: bool) {
let current_availability = self.is_backend_available(backend_name);
let error_count_entry = self.backend_error_count.entry(backend_name.to_string()).or_insert_with(|| AtomicU32::new(0));
let consecutive_success_entry = self.backend_consecutive_success_count.entry(backend_name.to_string()).or_insert_with(|| AtomicU32::new(0));
if is_cycle_success {
error_count_entry.store(0, Ordering::Relaxed);
consecutive_success_entry.fetch_add(1, Ordering::Relaxed);
if let Some(last_success_guard) = self.backend_last_success.get_mut(backend_name) {
*last_success_guard.lock().unwrap() = SystemTime::now();
}
if !current_availability {
let successes = consecutive_success_entry.load(Ordering::Relaxed);
if successes >= self.config.recovery_threshold {
self.backend_available.insert(backend_name.to_string(), true);
info!("Backend {} recovered and is now AVAILABLE ({} consecutive successes met threshold {}).", backend_name, successes, self.config.recovery_threshold);
consecutive_success_entry.store(0, Ordering::Relaxed); // Reset after recovery
} else {
if self.enable_detailed_logs {
debug!("Backend {} had a successful probe cycle. Consecutive successes: {}. Needs {} for recovery.", backend_name, successes, self.config.recovery_threshold);
}
}
} else {
if self.enable_detailed_logs {
debug!("Backend {} remains available, successful probe cycle.", backend_name);
}
}
} else { // Probe cycle failed for this backend
consecutive_success_entry.store(0, Ordering::Relaxed); // Reset consecutive successes on any failure
let current_errors = error_count_entry.fetch_add(1, Ordering::Relaxed) + 1; // +1 because fetch_add returns previous value
if current_availability && current_errors >= self.config.max_error_threshold {
self.backend_available.insert(backend_name.to_string(), false);
warn!(
"Backend {} has become UNAVAILABLE due to {} errors (threshold {}).",
backend_name, current_errors, self.config.max_error_threshold
);
} else {
if self.enable_detailed_logs {
if current_availability {
debug!("Backend {} is still available but error count increased to {}. Max errors before unavailable: {}", backend_name, current_errors, self.config.max_error_threshold);
} else {
debug!("Backend {} remains UNAVAILABLE, error count now {}.", backend_name, current_errors);
}
}
}
}
}
pub fn get_delay_for_method(&self, method_name: &str) -> Duration {
let base_delay = self
.method_timings
.get(method_name)
.map(|timing_ref| *timing_ref.value())
.unwrap_or_else(|| *self.min_response_time.read().unwrap()); // Read lock
let buffer = Duration::from_millis(self.config.min_delay_buffer_ms);
let calculated_delay = base_delay.saturating_add(buffer);
let overall_failures = self.failure_count.load(Ordering::Relaxed);
// Consider last_success_time to see if failures are recent and persistent
let time_since_last_overall_success = SystemTime::now()
.duration_since(*self.last_success_time.lock().unwrap()) // Lock for last_success_time
.unwrap_or_default();
// Fallback logic: if many consecutive failures AND last success was long ago
if overall_failures >= 3 && time_since_last_overall_success > self.config.probe_interval().saturating_mul(3) {
warn!(
"Probes failing ({} consecutive, last overall success {:?} ago). Using conservative fixed delay for method {}.",
overall_failures, time_since_last_overall_success, method_name
);
return Duration::from_millis(self.config.min_delay_buffer_ms.saturating_mul(3));
}
if self.enable_detailed_logs {
debug!("Delay for method '{}': base {:?}, buffer {:?}, final {:?}", method_name, base_delay, buffer, calculated_delay);
}
calculated_delay
}
pub fn is_backend_available(&self, backend_name: &str) -> bool {
self.backend_available
.get(backend_name)
.map_or(false, |entry| *entry.value())
}
pub fn stop(&self) {
info!("SecondaryProbe: Sending shutdown signal...");
if self.shutdown_tx.send(true).is_err() {
error!("Failed to send shutdown signal to SecondaryProbe task. It might have already stopped or had no receiver.");
}
}
}
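// Worked example for get_delay_for_method (assumed numbers, not from the
// commit): a fastest observed eth_blockNumber probe of 12ms plus
// min_delay_buffer_ms = 500 yields a 512ms delay. After 3 consecutive overall
// probe-cycle failures with no success for more than 3 probe intervals, the
// conservative fallback of 3 * min_delay_buffer_ms = 1500ms is returned instead.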

src/stats_collector.rs
View File

@@ -0,0 +1,290 @@
use crate::structures::{ResponseStats, WebSocketStats, CuDataPoint, Backend};
use crate::block_height_tracker::BlockHeightTracker;
use crate::secondary_probe::SecondaryProbe;
use std::time::{Duration, SystemTime};
use std::sync::{Arc, Mutex, atomic::{AtomicU64, Ordering}};
use dashmap::DashMap;
use log::warn; // debug/error/info are invoked via the log:: path below
pub struct StatsCollector {
pub request_stats: Arc<Mutex<Vec<ResponseStats>>>,
pub method_stats: Arc<DashMap<String, Mutex<Vec<Duration>>>>, // method_name -> list of durations for primary
pub backend_method_stats: Arc<DashMap<String, DashMap<String, Mutex<Vec<Duration>>>>>, // backend_name -> method_name -> list of durations
pub backend_wins: Arc<DashMap<String, AtomicU64>>, // backend_name -> count
pub method_backend_wins: Arc<DashMap<String, DashMap<String, AtomicU64>>>, // method_name -> backend_name -> count
pub first_response_durations: Arc<Mutex<Vec<Duration>>>,
pub actual_first_response_durations: Arc<Mutex<Vec<Duration>>>,
pub method_first_response_durations: Arc<DashMap<String, Mutex<Vec<Duration>>>>,
pub method_actual_first_response_durations: Arc<DashMap<String, Mutex<Vec<Duration>>>>,
pub total_requests: Arc<AtomicU64>,
pub error_count: Arc<AtomicU64>,
pub skipped_secondary_requests: Arc<AtomicU64>,
pub ws_stats: Arc<Mutex<Vec<WebSocketStats>>>,
pub total_ws_connections: Arc<AtomicU64>,
pub app_start_time: SystemTime,
pub interval_start_time: Arc<Mutex<SystemTime>>,
pub summary_interval: Duration,
pub method_cu_prices: Arc<DashMap<String, u64>>,
pub total_cu: Arc<AtomicU64>,
pub method_cu: Arc<DashMap<String, AtomicU64>>, // method_name -> total CU for this method in interval
pub historical_cu: Arc<Mutex<Vec<CuDataPoint>>>,
pub has_secondary_backends: bool,
// Placeholders for probe and tracker - actual types will be defined later
// pub secondary_probe: Option<Arc<SecondaryProbe>>,
// pub block_height_tracker: Option<Arc<BlockHeightTracker>>,
}
impl StatsCollector {
pub fn new(summary_interval: Duration, has_secondary_backends: bool) -> Self {
let method_cu_prices = Arc::new(DashMap::new());
Self::init_cu_prices(&method_cu_prices);
StatsCollector {
request_stats: Arc::new(Mutex::new(Vec::new())),
method_stats: Arc::new(DashMap::new()),
backend_method_stats: Arc::new(DashMap::new()),
backend_wins: Arc::new(DashMap::new()),
method_backend_wins: Arc::new(DashMap::new()),
first_response_durations: Arc::new(Mutex::new(Vec::new())),
actual_first_response_durations: Arc::new(Mutex::new(Vec::new())),
method_first_response_durations: Arc::new(DashMap::new()),
method_actual_first_response_durations: Arc::new(DashMap::new()),
total_requests: Arc::new(AtomicU64::new(0)),
error_count: Arc::new(AtomicU64::new(0)),
skipped_secondary_requests: Arc::new(AtomicU64::new(0)),
ws_stats: Arc::new(Mutex::new(Vec::new())),
total_ws_connections: Arc::new(AtomicU64::new(0)),
app_start_time: SystemTime::now(),
interval_start_time: Arc::new(Mutex::new(SystemTime::now())),
summary_interval,
method_cu_prices,
total_cu: Arc::new(AtomicU64::new(0)),
method_cu: Arc::new(DashMap::new()),
historical_cu: Arc::new(Mutex::new(Vec::new())),
has_secondary_backends,
}
}
fn init_cu_prices(prices_map: &DashMap<String, u64>) {
// Base CU
prices_map.insert("eth_call".to_string(), 100);
prices_map.insert("eth_estimateGas".to_string(), 150);
prices_map.insert("eth_getLogs".to_string(), 200);
prices_map.insert("eth_sendRawTransaction".to_string(), 250);
prices_map.insert("trace_call".to_string(), 300);
prices_map.insert("trace_replayBlockTransactions".to_string(), 500);
// Default for unknown methods
prices_map.insert("default".to_string(), 50);
}
pub fn add_stats(&self, stats_vec: Vec<ResponseStats>) {
if stats_vec.is_empty() {
warn!("add_stats called with empty stats_vec");
return;
}
self.total_requests.fetch_add(1, Ordering::Relaxed);
let mut primary_stats: Option<&ResponseStats> = None;
let mut winning_backend_name: Option<String> = None;
let mut actual_first_response_duration: Option<Duration> = None;
let mut first_response_duration_from_primary_or_fastest_secondary: Option<Duration> = None;
// Find the 'actual-first-response' if present and the primary response
for stat in &stats_vec {
if stat.backend_name == "actual-first-response" {
actual_first_response_duration = Some(stat.duration);
} else if stat.backend_name.contains("-primary") { // Assuming primary name contains "-primary"
primary_stats = Some(stat);
}
}
let method_name = primary_stats.map_or_else(
|| stats_vec.first().map_or_else(|| "unknown".to_string(), |s| s.method.clone()),
|ps| ps.method.clone()
);
// Determine winning backend and first_response_duration_from_primary_or_fastest_secondary
if self.has_secondary_backends {
let mut fastest_duration = Duration::MAX;
for stat in stats_vec.iter().filter(|s| s.backend_name != "actual-first-response" && s.error.is_none()) {
if stat.duration < fastest_duration {
fastest_duration = stat.duration;
winning_backend_name = Some(stat.backend_name.clone());
}
}
if fastest_duration != Duration::MAX {
first_response_duration_from_primary_or_fastest_secondary = Some(fastest_duration);
}
} else {
// If no secondary backends, primary is the winner if no error
if let Some(ps) = primary_stats {
if ps.error.is_none() {
winning_backend_name = Some(ps.backend_name.clone());
first_response_duration_from_primary_or_fastest_secondary = Some(ps.duration);
}
}
}
// If no winner determined yet (e.g. all errored, or no secondary and primary errored),
// and if primary_stats exists, consider it as the "winner" for error tracking purposes.
if winning_backend_name.is_none() && primary_stats.is_some() {
winning_backend_name = Some(primary_stats.unwrap().backend_name.clone());
}
// Update backend_wins and method_backend_wins
if let Some(ref winner_name) = winning_backend_name {
self.backend_wins.entry(winner_name.clone()).or_insert_with(|| AtomicU64::new(0)).fetch_add(1, Ordering::Relaxed);
self.method_backend_wins.entry(method_name.clone()).or_default().entry(winner_name.clone()).or_insert_with(|| AtomicU64::new(0)).fetch_add(1, Ordering::Relaxed);
}
// Update first_response_durations and actual_first_response_durations
if let Some(duration) = first_response_duration_from_primary_or_fastest_secondary {
self.first_response_durations.lock().unwrap().push(duration);
self.method_first_response_durations.entry(method_name.clone()).or_insert_with(|| Mutex::new(Vec::new())).lock().unwrap().push(duration);
}
if let Some(duration) = actual_first_response_duration {
self.actual_first_response_durations.lock().unwrap().push(duration);
self.method_actual_first_response_durations.entry(method_name.clone()).or_insert_with(|| Mutex::new(Vec::new())).lock().unwrap().push(duration);
}
let mut request_stats_guard = self.request_stats.lock().unwrap();
for stat in stats_vec {
if stat.backend_name == "actual-first-response" { // Already handled
continue;
}
request_stats_guard.push(stat.clone());
if stat.error.is_some() {
if stat.error.as_deref() == Some("skipped by primary due to min_delay_buffer") {
self.skipped_secondary_requests.fetch_add(1, Ordering::Relaxed);
} else {
self.error_count.fetch_add(1, Ordering::Relaxed);
}
}
// Update backend_method_stats for all backends
self.backend_method_stats
.entry(stat.backend_name.clone())
.or_default()
.entry(stat.method.clone())
.or_insert_with(|| Mutex::new(Vec::new()))
.lock()
.unwrap()
.push(stat.duration);
// If the winning backend is primary and it's not a batch (batch handled separately), update method_stats and CUs
// Assuming primary_stats contains the correct method name for CU calculation
if let Some(ref winner_name_val) = winning_backend_name {
if &stat.backend_name == winner_name_val && stat.backend_name.contains("-primary") && stat.error.is_none() {
// Update method_stats (for primary)
self.method_stats
.entry(stat.method.clone())
.or_insert_with(|| Mutex::new(Vec::new()))
.lock()
.unwrap()
.push(stat.duration);
// Update CU
let cu_price = self.method_cu_prices.get(&stat.method).map_or_else(
|| self.method_cu_prices.get("default").map_or(0, |p| *p.value()),
|p| *p.value()
);
if cu_price > 0 {
self.total_cu.fetch_add(cu_price, Ordering::Relaxed);
self.method_cu.entry(stat.method.clone()).or_insert_with(|| AtomicU64::new(0)).fetch_add(cu_price, Ordering::Relaxed);
}
}
}
}
}
pub fn add_batch_stats(&self, methods: &[String], duration: Duration, backend_name: &str) {
if !backend_name.contains("-primary") { // Only primary processes batches directly for now
warn!("add_batch_stats called for non-primary backend: {}", backend_name);
return;
}
let mut batch_cu: u64 = 0;
for method_name in methods {
let cu_price = self.method_cu_prices.get(method_name).map_or_else(
|| self.method_cu_prices.get("default").map_or(0, |p| *p.value()),
|p| *p.value()
);
batch_cu += cu_price;
if cu_price > 0 {
self.method_cu.entry(method_name.clone()).or_insert_with(|| AtomicU64::new(0)).fetch_add(cu_price, Ordering::Relaxed);
}
// Update method_stats for each method in the batch on the primary
self.method_stats
.entry(method_name.clone())
.or_insert_with(|| Mutex::new(Vec::new()))
.lock()
.unwrap()
.push(duration); // Using the same duration for all methods in the batch as an approximation
// Update backend_method_stats
self.backend_method_stats
.entry(backend_name.to_string())
.or_default()
.entry(method_name.clone())
.or_insert_with(|| Mutex::new(Vec::new()))
.lock()
.unwrap()
.push(duration);
}
if batch_cu > 0 {
self.total_cu.fetch_add(batch_cu, Ordering::Relaxed);
}
// Note: total_requests is incremented by add_stats which should be called for the overall batch request
}
pub fn add_websocket_stats(&self, ws_stat: WebSocketStats) {
if ws_stat.error.is_some() {
self.error_count.fetch_add(1, Ordering::Relaxed);
}
self.ws_stats.lock().unwrap().push(ws_stat);
self.total_ws_connections.fetch_add(1, Ordering::Relaxed);
}
// STUBBED METHODS - to be implemented later
pub fn get_primary_p75_for_method(&self, _method: &str) -> std::time::Duration {
// Placeholder: return a default fixed duration
log::debug!("StatsCollector::get_primary_p75_for_method called (stub)");
std::time::Duration::from_millis(25) // Default from Go's calculateBatchDelay fallback
}
pub fn get_primary_p50_for_method(&self, _method: &str) -> std::time::Duration {
// Placeholder: return a default fixed duration
log::debug!("StatsCollector::get_primary_p50_for_method called (stub)");
std::time::Duration::from_millis(15)
}
pub fn is_expensive_method_by_stats(&self, _method: &str) -> bool {
// Placeholder: always return false
log::debug!("StatsCollector::is_expensive_method_by_stats called (stub)");
false
}
pub fn select_best_secondary_for_expensive_method(
&self,
_method: &str,
_backends: &[Backend],
_block_height_tracker: &Option<Arc<BlockHeightTracker>>,
_secondary_probe: &Option<Arc<SecondaryProbe>>,
) -> Option<Backend> {
// Placeholder: always return None
log::debug!("StatsCollector::select_best_secondary_for_expensive_method called (stub)");
None
}
}
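// Sketch of a nearest-rank percentile helper that the p75/p50 stubs above
// could later use (an assumption, not part of this commit).
#[allow(dead_code)]
fn percentile(durations: &[Duration], pct: usize) -> Option<Duration> {
    if durations.is_empty() {
        return None;
    }
    let mut sorted = durations.to_vec();
    sorted.sort_unstable();
    // Nearest-rank index, clamped to the last element for pct = 100.
    let idx = (sorted.len() * pct / 100).min(sorted.len() - 1);
    Some(sorted[idx])
}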

src/structures.rs
View File

@@ -0,0 +1,107 @@
use serde::{Serialize, Deserialize};
use url::Url;
use http::StatusCode;
use std::time::{Duration, SystemTime};
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct JsonRpcRequest {
pub method: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub id: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub jsonrpc: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub params: Option<serde_json::Value>,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct BatchInfo {
pub is_batch: bool,
pub methods: Vec<String>,
pub request_count: usize,
pub has_stateful: bool,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Backend {
#[serde(with = "url_serde")]
pub url: Url,
pub name: String,
pub role: String, // Consider an enum BackendRole { Primary, Secondary } later
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ResponseStats {
pub backend_name: String,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "http_serde_status_code_option", default)]
pub status_code: Option<StatusCode>,
#[serde(with = "humantime_serde")]
pub duration: Duration,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
pub method: String,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct WebSocketStats {
pub backend_name: String,
pub error: Option<String>, // Default Option<String> serde is fine
pub connect_time: std::time::Duration, // Default Duration serde (secs/nanos struct)
pub is_active: bool,
pub client_to_backend_messages: u64,
pub backend_to_client_messages: u64,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct CuDataPoint {
pub timestamp: SystemTime,
pub cu: u64,
}
// Helper module for serializing/deserializing Option<http::StatusCode>
mod http_serde_status_code_option {
use http::StatusCode;
use serde::{self, Deserialize, Deserializer, Serializer}; // Deserialize is needed for Option::<u16>::deserialize below
pub fn serialize<S>(status_code: &Option<StatusCode>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match status_code {
Some(sc) => serializer.serialize_some(&sc.as_u16()),
None => serializer.serialize_none(),
}
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<StatusCode>, D::Error>
where
D: Deserializer<'de>,
{
Option::<u16>::deserialize(deserializer)?
.map(|code| StatusCode::from_u16(code).map_err(serde::de::Error::custom))
.transpose()
}
}
// Helper module for serializing/deserializing url::Url
mod url_serde {
use url::Url;
    use serde::{self, Deserialize, Deserializer, Serializer};
pub fn serialize<S>(url: &Url, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(url.as_str())
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Url, D::Error>
where
D: Deserializer<'de>,
{
String::deserialize(deserializer)?
.parse()
.map_err(serde::de::Error::custom)
}
}
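
// A minimal round-trip sketch for the custom serializer modules above
// (serde_json is already a dependency). Purely illustrative; the test module
// and the example URL are not part of the original code.
#[cfg(test)]
mod serde_roundtrip_tests {
    use super::*;

    #[test]
    fn backend_url_roundtrips_as_plain_string() {
        let backend = Backend {
            url: "https://example.com/rpc".parse().unwrap(),
            name: "primary-1".to_string(),
            role: "primary".to_string(),
        };
        let json = serde_json::to_string(&backend).unwrap();
        assert!(json.contains("\"https://example.com/rpc\"")); // url_serde emits a plain string
        let back: Backend = serde_json::from_str(&json).unwrap();
        assert_eq!(back.url.as_str(), "https://example.com/rpc");
    }
}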

View File

@@ -0,0 +1,228 @@
use std::sync::Arc;
use std::time::Instant;
use hyper::{upgrade::Upgraded, Body, Request, Response, StatusCode};
use log;
use futures_util::{stream::StreamExt, sink::SinkExt};
use tokio_tungstenite::WebSocketStream;
// NOTE: hyper_tungstenite is referenced below by full path; the hyper-tungstenite
// crate (a hyper-0.14-compatible release whose tungstenite version matches
// tokio-tungstenite 0.21) still needs to be added to Cargo.toml.
use crate::config::AppConfig;
use crate::stats_collector::StatsCollector;
use crate::structures::{Backend, WebSocketStats}; // Ensure WebSocketStats has new fields
pub async fn handle_websocket_request(
mut req: Request<Body>,
app_config: Arc<AppConfig>,
stats_collector: Arc<StatsCollector>,
all_backends: Arc<Vec<Backend>>,
) -> Result<Response<Body>, Box<dyn std::error::Error + Send + Sync + 'static>> {
let upgrade_start_time = Instant::now();
// Check for upgrade request
if !hyper_tungstenite::is_upgrade_request(&req) {
log::warn!("Not a WebSocket upgrade request");
let mut resp = Response::new(Body::from("Not a WebSocket upgrade request"));
*resp.status_mut() = StatusCode::BAD_REQUEST;
return Ok(resp);
}
// Attempt to upgrade the connection
let (response, websocket) = match hyper_tungstenite::upgrade(&mut req, None) {
Ok((resp, ws)) => (resp, ws),
Err(e) => {
log::error!("WebSocket upgrade failed: {}", e);
let mut resp = Response::new(Body::from(format!("WebSocket upgrade failed: {}", e)));
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; // Or BAD_REQUEST
return Ok(resp);
}
};
// Spawn a task to handle the WebSocket connection after sending 101
tokio::spawn(async move {
match websocket.await {
Ok(ws_stream) => {
let client_ws_stream = ws_stream;
if app_config.enable_detailed_logs {
log::info!("Client WebSocket connection established.");
}
// Successfully upgraded client connection, now connect to primary backend
proxy_websocket_to_primary(client_ws_stream, app_config, stats_collector, all_backends).await;
}
Err(e) => {
log::error!("Error awaiting client WebSocket upgrade: {}", e);
// No actual client WS connection to record stats against other than the failed upgrade attempt
let stats = WebSocketStats {
backend_name: "client_upgrade_failed".to_string(),
error: Some(format!("Client WS upgrade await error: {}", e)),
connect_time: upgrade_start_time.elapsed(),
is_active: false,
client_to_backend_messages: 0,
backend_to_client_messages: 0,
};
stats_collector.add_websocket_stats(stats);
}
}
});
// Return the 101 Switching Protocols response to the client
Ok(response)
}
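
// A minimal wiring sketch (hypothetical, not called anywhere yet): how this
// handler might be selected inside a hyper 0.14 service. The `route` name and
// the non-WebSocket branch are placeholders, not part of this repo.
#[allow(dead_code)]
async fn route(
    req: Request<Body>,
    app_config: Arc<AppConfig>,
    stats_collector: Arc<StatsCollector>,
    all_backends: Arc<Vec<Backend>>,
) -> Result<Response<Body>, Box<dyn std::error::Error + Send + Sync + 'static>> {
    if hyper_tungstenite::is_upgrade_request(&req) {
        handle_websocket_request(req, app_config, stats_collector, all_backends).await
    } else {
        // Plain HTTP requests would go to the JSON-RPC proxy path instead.
        Ok(Response::new(Body::from("non-WebSocket request")))
    }
}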
async fn proxy_websocket_to_primary(
    // The HyperWebsocket future has already been awaited by the caller, so this
    // receives the resolved stream; `mut` is needed for close().
    mut client_ws_stream: WebSocketStream<Upgraded>,
    app_config: Arc<AppConfig>,
    stats_collector: Arc<StatsCollector>,
    all_backends: Arc<Vec<Backend>>,
) {
let connect_to_primary_start_time = Instant::now();
let mut ws_stats_error: Option<String> = None;
let mut backend_name_for_stats = "primary_unknown".to_string();
// 1. Find Primary Backend
let primary_backend = match all_backends.iter().find(|b| b.role == "primary") {
Some(pb) => {
backend_name_for_stats = pb.name.clone();
pb
}
None => {
log::error!("No primary backend configured for WebSocket proxy.");
ws_stats_error = Some("No primary backend configured".to_string());
// Close client connection gracefully if possible
            let _ = client_ws_stream.close(None).await; // WebSocketStream exposes an async close()
// Record stats and return
let stats = WebSocketStats {
backend_name: backend_name_for_stats,
error: ws_stats_error,
connect_time: connect_to_primary_start_time.elapsed(),
is_active: false,
                client_to_backend_messages: 0, // nothing was proxied
                backend_to_client_messages: 0,
};
stats_collector.add_websocket_stats(stats);
return;
}
};
// 2. Connect to Primary Backend's WebSocket
let mut ws_url = primary_backend.url.clone();
    let scheme = match ws_url.scheme() {
        "https" | "wss" => "wss", // preserve TLS; never downgrade an already-secure URL
        _ => "ws",
    };
    if ws_url.set_scheme(scheme).is_err() {
log::error!("Failed to set WebSocket scheme for backend URL: {}", primary_backend.url);
ws_stats_error = Some(format!("Invalid backend URL scheme for {}", primary_backend.url));
let _ = client_ws_stream.close(None).await;
let stats = WebSocketStats {
backend_name: backend_name_for_stats,
error: ws_stats_error,
connect_time: connect_to_primary_start_time.elapsed(),
is_active: false,
            client_to_backend_messages: 0, // connection failed before any traffic
            backend_to_client_messages: 0,
};
stats_collector.add_websocket_stats(stats);
return;
}
let backend_connect_attempt_time = Instant::now();
    // Pass &str rather than Url: &str implements IntoClientRequest across tungstenite versions.
    let backend_ws_result = tokio_tungstenite::connect_async(ws_url.as_str()).await;
let connect_duration = backend_connect_attempt_time.elapsed(); // This is backend connection time
let backend_ws_stream_conn = match backend_ws_result {
Ok((stream, _response)) => {
if app_config.enable_detailed_logs {
log::info!("Successfully connected to primary backend WebSocket: {}", primary_backend.name);
}
stream
}
Err(e) => {
log::error!("Failed to connect to primary backend {} WebSocket: {}", primary_backend.name, e);
ws_stats_error = Some(format!("Primary backend connect error: {}", e));
let _ = client_ws_stream.close(None).await; // Close client connection
let stats = WebSocketStats {
backend_name: backend_name_for_stats,
error: ws_stats_error,
connect_time: connect_duration,
is_active: false,
                client_to_backend_messages: 0, // connection failed before any traffic
                backend_to_client_messages: 0,
};
stats_collector.add_websocket_stats(stats);
return;
}
};
// 3. Proxying Logic
let (mut client_ws_tx, mut client_ws_rx) = client_ws_stream.split();
let (mut backend_ws_tx, mut backend_ws_rx) = backend_ws_stream_conn.split();
    let client_to_backend_task = async {
        let mut sent: u64 = 0;
        let mut err: Option<String> = None;
        while let Some(msg_result) = client_ws_rx.next().await {
            match msg_result {
                Ok(msg) => {
                    if app_config.enable_detailed_logs { log::trace!("C->B: {:?}", msg); }
                    if backend_ws_tx.send(msg).await.is_err() {
                        if app_config.enable_detailed_logs { log::debug!("Error sending to backend, C->B loop breaking."); }
                        break;
                    }
                    sent += 1;
                }
                Err(e) => {
                    log::warn!("Error reading from client WebSocket: {}", e);
                    err = Some(format!("Client read error: {}", e));
                    break;
                }
            }
        }
        // Try to close the backend sink gracefully once the client read loop ends
        if app_config.enable_detailed_logs { log::debug!("C->B proxy loop finished. Closing backend_ws_tx."); }
        let _ = backend_ws_tx.close().await;
        // Return this direction's message count and first error so the two tasks
        // do not need to share mutable state across `tokio::join!`.
        (sent, err)
    };
    let backend_to_client_task = async {
        let mut sent: u64 = 0;
        let mut err: Option<String> = None;
        while let Some(msg_result) = backend_ws_rx.next().await {
            match msg_result {
                Ok(msg) => {
                    if app_config.enable_detailed_logs { log::trace!("B->C: {:?}", msg); }
                    if client_ws_tx.send(msg).await.is_err() {
                        if app_config.enable_detailed_logs { log::debug!("Error sending to client, B->C loop breaking."); }
                        break;
                    }
                    sent += 1;
                }
                Err(e) => {
                    log::warn!("Error reading from backend WebSocket: {}", e);
                    err = Some(format!("Backend read error: {}", e));
                    break;
                }
            }
        }
        // Try to close the client sink gracefully once the backend read loop ends
        if app_config.enable_detailed_logs { log::debug!("B->C proxy loop finished. Closing client_ws_tx."); }
        let _ = client_ws_tx.close().await;
        (sent, err)
    };
    // Run both proxy directions concurrently; each yields (message_count, first_error).
    let ((client_to_backend_msg_count, c2b_err), (backend_to_client_msg_count, b2c_err)) =
        tokio::join!(client_to_backend_task, backend_to_client_task);
    let ws_stats_error = ws_stats_error.or(c2b_err).or(b2c_err);
if app_config.enable_detailed_logs {
log::info!("WebSocket proxying ended for {}. Client->Backend: {}, Backend->Client: {}. Error: {:?}",
backend_name_for_stats, client_to_backend_msg_count, backend_to_client_msg_count, ws_stats_error);
}
let final_session_duration = connect_to_primary_start_time.elapsed();
let final_stats = WebSocketStats {
backend_name: backend_name_for_stats,
error: ws_stats_error,
connect_time: final_session_duration,
is_active: false, // Session is now over
        client_to_backend_messages: client_to_backend_msg_count,
        backend_to_client_messages: backend_to_client_msg_count,
};
stats_collector.add_websocket_stats(final_stats);
}
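
// A small unit-test sketch for the scheme mapping used when dialing the backend;
// the helper is extracted here purely for illustration and mirrors the match above.
#[cfg(test)]
mod scheme_tests {
    fn ws_scheme_for(scheme: &str) -> &'static str {
        match scheme {
            "https" | "wss" => "wss",
            _ => "ws",
        }
    }

    #[test]
    fn maps_http_family_to_ws_family() {
        assert_eq!(ws_scheme_for("https"), "wss");
        assert_eq!(ws_scheme_for("http"), "ws");
        assert_eq!(ws_scheme_for("wss"), "wss"); // must not downgrade TLS
    }
}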