From a4400bbd406587fb247d6e4ace703b693c69b9fc Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Mon, 6 Nov 2023 10:06:32 -0800
Subject: [PATCH] stricter ordering. refs #210

---
 deduped_broadcast/src/lib.rs              |  27 +-
 latency/src/rolling_quantile.rs           |   1 +
 web3_proxy/src/app/mod.rs                 |  12 +-
 web3_proxy/src/frontend/authorization.rs  |   1 +
 web3_proxy/src/frontend/mod.rs            |   6 +-
 web3_proxy/src/jsonrpc/request_builder.rs |  14 +-
 web3_proxy/src/kafka.rs                   |   6 +-
 web3_proxy/src/prometheus.rs              |   4 +-
 web3_proxy/src/rpcs/consensus.rs          |  10 +-
 web3_proxy/src/rpcs/one.rs                |  38 +-
 web3_proxy/src/rpcs/request.rs            |  10 +-
 web3_proxy/src/rpcs/request_handle.rs     | 559 ----------------------
 web3_proxy/src/rpcs/ws.rs                 |   0
 web3_proxy/src/stats/mod.rs               |  12 +-
 web3_proxy_cli/src/test_utils/app.rs      |   4 +-
 15 files changed, 77 insertions(+), 627 deletions(-)
 delete mode 100644 web3_proxy/src/rpcs/request_handle.rs
 delete mode 100644 web3_proxy/src/rpcs/ws.rs

diff --git a/deduped_broadcast/src/lib.rs b/deduped_broadcast/src/lib.rs
index 9829a6b8..21a8d55b 100644
--- a/deduped_broadcast/src/lib.rs
+++ b/deduped_broadcast/src/lib.rs
@@ -49,14 +49,17 @@ where
     /// filter duplicates and send the rest to any subscribers
     /// TODO: change this to be `send` and put a moka cache here instead of lru. then the de-dupe load will be spread across senders
     pub async fn send(&self, item: T) {
-        self.total_unfiltered.fetch_add(1, Ordering::Relaxed);
+        // this is just a debug counter, so Relaxed would probably be fine, but use AcqRel to be safe
+        self.total_unfiltered.fetch_add(1, Ordering::AcqRel);
 
         self.cache
             .get_with(item.clone(), async {
-                self.total_filtered.fetch_add(1, Ordering::Relaxed);
+                // this is just a debug counter, so Relaxed would probably be fine, but use AcqRel to be safe
+                self.total_filtered.fetch_add(1, Ordering::AcqRel);
 
                 if let Ok(x) = self.broadcast_filtered_tx.send(item) {
-                    self.total_broadcasts.fetch_add(x, Ordering::Relaxed);
+                    // this is just a debug counter, so Relaxed would probably be fine, but use AcqRel to be safe
+                    self.total_broadcasts.fetch_add(x, Ordering::AcqRel);
                 }
             })
             .await;
@@ -75,15 +78,15 @@ where
         f.debug_struct("DedupedBroadcaster")
             .field(
                 "total_unfiltered",
-                &self.total_unfiltered.load(Ordering::Relaxed),
+                &self.total_unfiltered.load(Ordering::Acquire),
             )
             .field(
                 "total_filtered",
-                &self.total_filtered.load(Ordering::Relaxed),
+                &self.total_filtered.load(Ordering::Acquire),
             )
             .field(
                 "total_broadcasts",
-                &self.total_broadcasts.load(Ordering::Relaxed),
+                &self.total_broadcasts.load(Ordering::Acquire),
             )
             .field(
                 "subscriptions",
@@ -105,15 +108,15 @@ where
 
         state.serialize_field(
             "total_unfiltered",
-            &self.total_unfiltered.load(Ordering::Relaxed),
+            &self.total_unfiltered.load(Ordering::Acquire),
         )?;
         state.serialize_field(
             "total_filtered",
-            &self.total_filtered.load(Ordering::Relaxed),
+            &self.total_filtered.load(Ordering::Acquire),
         )?;
         state.serialize_field(
             "total_broadcasts",
-            &self.total_broadcasts.load(Ordering::Relaxed),
+            &self.total_broadcasts.load(Ordering::Acquire),
         )?;
         state.serialize_field(
             "subscriptions",
@@ -153,8 +156,8 @@ mod tests {
 
         yield_now().await;
 
-        assert_eq!(broadcaster.total_unfiltered.load(Ordering::Relaxed), 7);
-        assert_eq!(broadcaster.total_filtered.load(Ordering::Relaxed), 3);
-        assert_eq!(broadcaster.total_broadcasts.load(Ordering::Relaxed), 6);
+        assert_eq!(broadcaster.total_unfiltered.load(Ordering::Acquire), 7);
+        assert_eq!(broadcaster.total_filtered.load(Ordering::Acquire), 3);
+        assert_eq!(broadcaster.total_broadcasts.load(Ordering::Acquire), 6);
     }
 }
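Aside on the hunks above: even `Relaxed` already makes each `fetch_add` atomic, so no increments are lost; the stricter orderings this patch adopts only constrain how other reads and writes are ordered around the counter. A minimal, self-contained sketch (not part of the patch, names invented) to illustrate:

```rust
// Why `Relaxed` is usually enough for a counter that is only ever read for
// debugging/metrics: `Relaxed` guarantees the increment itself is atomic
// (no lost updates); the stricter orderings only add synchronization
// *between* this atomic and other memory.
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let counter = Arc::new(AtomicU64::new(0));

    let handles: Vec<_> = (0..4)
        .map(|_| {
            let counter = counter.clone();
            thread::spawn(move || {
                for _ in 0..1_000 {
                    // atomic even with Relaxed: the total is always exactly 4000
                    counter.fetch_add(1, Ordering::Relaxed);
                }
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }

    // AcqRel/Acquire (as the patch uses) additionally order the surrounding
    // reads and writes, which a pure debug counter does not need.
    assert_eq!(counter.load(Ordering::Acquire), 4_000);
}
```

For counters that nothing else synchronizes on, `Relaxed` is the conventional choice; the patch trades a little throughput for orderings that are easier to audit.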
diff --git a/latency/src/rolling_quantile.rs b/latency/src/rolling_quantile.rs
index ae1f793d..1c94275e 100644
--- a/latency/src/rolling_quantile.rs
+++ b/latency/src/rolling_quantile.rs
@@ -52,6 +52,7 @@ impl RollingQuantileLatencyTask {
     }
 
     /// Update the estimate object atomically.
+    /// This is relaxed because we care more about speed than accuracy.
     fn update(&mut self, rtt: f32) {
         self.quantile.update(rtt);
 
diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs
index c92899c5..29aed76f 100644
--- a/web3_proxy/src/app/mod.rs
+++ b/web3_proxy/src/app/mod.rs
@@ -765,7 +765,7 @@ impl App {
         // TODO: what interval? i don't think we use it
         // i tried and failed to `impl JsonRpcClient for Web3ProxyApi`
         // i tried and failed to set up ipc. http is already running, so lets just use that
-        let frontend_port = self.frontend_port.load(Ordering::Relaxed);
+        let frontend_port = self.frontend_port.load(Ordering::SeqCst);
 
         if frontend_port == 0 {
             panic!("frontend is not running. cannot create provider yet");
@@ -1305,22 +1305,22 @@ impl App {
 
         let (code, response) = match last_response {
             Ok(response_data) => {
-                web3_request.error_response.store(false, Ordering::Relaxed);
+                web3_request.error_response.store(false, Ordering::Release);
 
                 // TODO: is it true that all jsonrpc errors are user errors?
                 web3_request
                     .user_error_response
-                    .store(response_data.is_jsonrpc_err(), Ordering::Relaxed);
+                    .store(response_data.is_jsonrpc_err(), Ordering::Release);
 
                 (StatusCode::OK, response_data)
             }
             Err(err) => {
                 // max tries exceeded. return the error
-                web3_request.error_response.store(true, Ordering::Relaxed);
+                web3_request.error_response.store(true, Ordering::Release);
 
                 web3_request
                     .user_error_response
-                    .store(false, Ordering::Relaxed);
+                    .store(false, Ordering::Release);
 
                 err.as_json_response_parts(web3_request.id())
             }
@@ -1523,7 +1523,7 @@ impl App {
         // TODO: only charge for archive if it gave a result
         web3_request
             .archive_request
-            .store(true, atomic::Ordering::Relaxed);
+            .store(true, atomic::Ordering::Release);
 
         // TODO: we don't actually want try_send_all. we want the first non-null, non-error response
         self
diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs
index c22af877..310dafa9 100644
--- a/web3_proxy/src/frontend/authorization.rs
+++ b/web3_proxy/src/frontend/authorization.rs
@@ -54,6 +54,7 @@ pub enum RateLimitResult {
 
 #[derive(Copy, Clone, Debug, Hash, Eq, PartialEq)]
 pub enum AuthorizationType {
+    /// TODO: sometimes localhost should be internal and other times it should be Frontend. make a better separation
     Internal,
     Frontend,
 }
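The `error_response`/`user_error_response` stores above move from `Relaxed` to `Release`. The canonical pairing is a `Release` store observed by an `Acquire` load; a standalone sketch (not the app's types) of the guarantee that pairing buys:

```rust
// Release/Acquire message passing: a Release store publishes everything
// written before it, and an Acquire load that observes the flag is
// guaranteed to see those earlier writes too.
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let payload = Arc::new(AtomicU64::new(0));
    let ready = Arc::new(AtomicBool::new(false));

    let writer = {
        let (payload, ready) = (payload.clone(), ready.clone());
        thread::spawn(move || {
            payload.store(42, Ordering::Relaxed); // write the data first...
            ready.store(true, Ordering::Release); // ...then publish it
        })
    };

    let reader = {
        let (payload, ready) = (payload.clone(), ready.clone());
        thread::spawn(move || {
            // once the Acquire load sees `ready == true`, the matching
            // Release store guarantees the payload write is visible
            while !ready.load(Ordering::Acquire) {
                std::hint::spin_loop();
            }
            assert_eq!(payload.load(Ordering::Relaxed), 42);
        })
    };

    writer.join().unwrap();
    reader.join().unwrap();
}
```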
diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs
index c1f8b54c..7b159f34 100644
--- a/web3_proxy/src/frontend/mod.rs
+++ b/web3_proxy/src/frontend/mod.rs
@@ -320,13 +320,13 @@ pub async fn serve(
         axum::Server::from_tcp(listener)?
     } else {
         // TODO: allow only listening on localhost? top_config.app.host.parse()?
-        let addr = SocketAddr::from(([0, 0, 0, 0], app.frontend_port.load(Ordering::Relaxed)));
+        let addr = SocketAddr::from(([0, 0, 0, 0], app.frontend_port.load(Ordering::SeqCst)));
 
         axum::Server::try_bind(&addr)?
     };
 
     #[cfg(not(feature = "listenfd"))]
     let server_builder = {
-        let addr = SocketAddr::from(([0, 0, 0, 0], app.frontend_port.load(Ordering::Relaxed)));
+        let addr = SocketAddr::from(([0, 0, 0, 0], app.frontend_port.load(Ordering::SeqCst)));
 
         axum::Server::try_bind(&addr)?
     };
@@ -358,7 +358,7 @@ pub async fn serve(
     let port = server.local_addr().port();
     info!("listening on port {}", port);
 
-    app.frontend_port.store(port, Ordering::Relaxed);
+    app.frontend_port.store(port, Ordering::SeqCst);
 
     let server = server
         // TODO: option to use with_connect_info. we want it in dev, but not when running behind a proxy, but not
diff --git a/web3_proxy/src/jsonrpc/request_builder.rs b/web3_proxy/src/jsonrpc/request_builder.rs
index c351b9e1..af45509f 100644
--- a/web3_proxy/src/jsonrpc/request_builder.rs
+++ b/web3_proxy/src/jsonrpc/request_builder.rs
@@ -180,7 +180,7 @@ impl RequestBuilder {
 
         if let Ok(x) = &x {
             if self.archive_request {
-                x.archive_request.store(true, atomic::Ordering::Relaxed);
+                x.archive_request.store(true, atomic::Ordering::Release);
             }
         }
 
@@ -279,7 +279,7 @@ impl Serialize for ValidatedRequest {
 
         state.serialize_field(
             "archive_request",
-            &self.archive_request.load(atomic::Ordering::Relaxed),
+            &self.archive_request.load(atomic::Ordering::Acquire),
         )?;
 
         state.serialize_field("chain_id", &self.chain_id)?;
@@ -302,7 +302,7 @@ impl Serialize for ValidatedRequest {
 
         state.serialize_field(
             "response_bytes",
-            &self.response_bytes.load(atomic::Ordering::Relaxed),
+            &self.response_bytes.load(atomic::Ordering::Acquire),
         )?;
 
         state.end()
@@ -510,7 +510,7 @@ impl ValidatedRequest {
 
     #[inline]
     pub fn min_block_needed(&self) -> Option<U64> {
-        if self.archive_request.load(atomic::Ordering::Relaxed) {
+        if self.archive_request.load(atomic::Ordering::Acquire) {
             Some(U64::zero())
         } else {
             self.cache_mode.from_block().map(|x| x.num())
@@ -564,16 +564,16 @@ impl ValidatedRequest {
         let num_bytes = response.num_bytes();
 
         self.response_bytes
-            .fetch_add(num_bytes, atomic::Ordering::Relaxed);
+            .fetch_add(num_bytes, atomic::Ordering::AcqRel);
 
         self.response_millis.fetch_add(
             self.start_instant.elapsed().as_millis() as u64,
-            atomic::Ordering::Relaxed,
+            atomic::Ordering::AcqRel,
         );
 
         // TODO: record first or last timestamp? really, we need multiple
         self.response_timestamp
-            .store(Utc::now().timestamp(), atomic::Ordering::Relaxed);
+            .store(Utc::now().timestamp(), atomic::Ordering::Release);
 
         // TODO: set user_error_response and error_response here instead of outside this function
 
diff --git a/web3_proxy/src/kafka.rs b/web3_proxy/src/kafka.rs
index 3dcda7ec..d4c34ed1 100644
--- a/web3_proxy/src/kafka.rs
+++ b/web3_proxy/src/kafka.rs
@@ -136,7 +136,8 @@ impl KafkaDebugLogger {
         let payload =
             serde_json::to_vec(&request).expect("requests should always serialize with rmp");
 
-        self.num_requests.fetch_add(1, atomic::Ordering::Relaxed);
+        // this is just a debug counter, so Relaxed would probably be fine, but use AcqRel to be safe
+        self.num_requests.fetch_add(1, atomic::Ordering::AcqRel);
 
         self.background_log(payload)
     }
@@ -148,7 +149,8 @@ impl KafkaDebugLogger {
         let payload =
             serde_json::to_vec(&response).expect("requests should always serialize with rmp");
 
-        self.num_responses.fetch_add(1, atomic::Ordering::Relaxed);
+        // this is just a debug counter, so Relaxed would probably be fine, but use AcqRel to be safe
+        self.num_responses.fetch_add(1, atomic::Ordering::AcqRel);
 
         self.background_log(payload)
     }
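The `frontend_port` atomic above is written once by the server task after binding and polled by readers until it goes nonzero. A simplified sketch of that handshake, using threads instead of tokio tasks (the port value is a stand-in for `server.local_addr().port()`); `Release`/`Acquire` would arguably suffice for a single flag like this, and `SeqCst` is the conservative choice the patch makes:

```rust
// Publish-a-port handshake: the server stores the real port it bound to,
// and other tasks spin until they observe a nonzero value.
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

fn main() {
    let frontend_port = Arc::new(AtomicU16::new(0));

    let server = {
        let frontend_port = frontend_port.clone();
        thread::spawn(move || {
            let bound_port = 8545u16; // stand-in for the real bound port
            frontend_port.store(bound_port, Ordering::SeqCst);
        })
    };

    // poll until the server has published its port, as the test utils do
    let mut port = frontend_port.load(Ordering::SeqCst);
    while port == 0 {
        thread::sleep(Duration::from_millis(10));
        port = frontend_port.load(Ordering::SeqCst);
    }
    assert_ne!(port, 0);

    server.join().unwrap();
}
```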
diff --git a/web3_proxy/src/prometheus.rs b/web3_proxy/src/prometheus.rs
index c3dc7517..9ea6abf1 100644
--- a/web3_proxy/src/prometheus.rs
+++ b/web3_proxy/src/prometheus.rs
@@ -22,7 +22,7 @@ pub async fn serve(
         .layer(Extension(app.clone()));
 
     // note: the port here might be 0
-    let port = app.prometheus_port.load(Ordering::Relaxed);
+    let port = app.prometheus_port.load(Ordering::SeqCst);
 
     // TODO: config for the host?
     let addr = SocketAddr::from(([0, 0, 0, 0], port));
@@ -34,7 +34,7 @@
     let port = server.local_addr().port();
     info!("prometheus listening on port {}", port);
 
-    app.prometheus_port.store(port, Ordering::Relaxed);
+    app.prometheus_port.store(port, Ordering::SeqCst);
 
     server
         .with_graceful_shutdown(async move {
diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs
index fa3cac98..1a1423b4 100644
--- a/web3_proxy/src/rpcs/consensus.rs
+++ b/web3_proxy/src/rpcs/consensus.rs
@@ -672,7 +672,7 @@ impl ConsensusFinder {
             0 => {}
             1 => {
                 for rpc in self.rpc_heads.keys() {
-                    rpc.tier.store(1, atomic::Ordering::Relaxed)
+                    rpc.tier.store(1, atomic::Ordering::Release)
                 }
             }
             _ => {
@@ -752,7 +752,7 @@ impl ConsensusFinder {
 
                     trace!("{} - p50_sec: {}, tier {}", rpc, median_latency_sec, tier);
 
-                    rpc.tier.store(tier, atomic::Ordering::Relaxed);
+                    rpc.tier.store(tier, atomic::Ordering::Release);
                 }
             }
         }
@@ -810,7 +810,7 @@ impl ConsensusFinder {
             HashMap::with_capacity(num_known);
 
         for (rpc, rpc_head) in self.rpc_heads.iter() {
-            if !rpc.healthy.load(atomic::Ordering::Relaxed) {
+            if !rpc.healthy.load(atomic::Ordering::Acquire) {
                 // TODO: should unhealthy servers get a vote? they were included in minmax_block. i think that is enough
                 continue;
             }
@@ -878,14 +878,14 @@ impl ConsensusFinder {
     pub fn best_tier(&self) -> Option<u32> {
         self.rpc_heads
             .iter()
-            .map(|(x, _)| x.tier.load(atomic::Ordering::Relaxed))
+            .map(|(x, _)| x.tier.load(atomic::Ordering::Acquire))
            .min()
     }
 
     pub fn worst_tier(&self) -> Option<u32> {
         self.rpc_heads
             .iter()
-            .map(|(x, _)| x.tier.load(atomic::Ordering::Relaxed))
+            .map(|(x, _)| x.tier.load(atomic::Ordering::Acquire))
             .max()
     }
 }
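For the tier bookkeeping in `ConsensusFinder`, tiers are published with `Release` and aggregated with `Acquire` loads. A compact sketch with a simplified `Rpc` type (field types assumed; the real struct has many more fields):

```rust
// Tiers are stored after ranking and read back with min/max to find the
// best and worst tier across all known rpcs.
use std::sync::atomic::{AtomicU32, Ordering};

struct Rpc {
    tier: AtomicU32,
}

fn best_tier(rpcs: &[Rpc]) -> Option<u32> {
    rpcs.iter().map(|r| r.tier.load(Ordering::Acquire)).min()
}

fn worst_tier(rpcs: &[Rpc]) -> Option<u32> {
    rpcs.iter().map(|r| r.tier.load(Ordering::Acquire)).max()
}

fn main() {
    let rpcs = [
        Rpc { tier: AtomicU32::new(0) },
        Rpc { tier: AtomicU32::new(0) },
    ];

    // rank the rpcs (e.g. by median latency) and publish their tiers
    rpcs[0].tier.store(1, Ordering::Release);
    rpcs[1].tier.store(3, Ordering::Release);

    assert_eq!(best_tier(&rpcs), Some(1));
    assert_eq!(worst_tier(&rpcs), Some(3));
}
```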
diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs
index faae4a04..161b070f 100644
--- a/web3_proxy/src/rpcs/one.rs
+++ b/web3_proxy/src/rpcs/one.rs
@@ -155,7 +155,7 @@ impl Web3Rpc {
         let backup = config.backup;
 
         let block_data_limit: AtomicU64 = config.block_data_limit.into();
-        let automatic_block_limit = (block_data_limit.load(atomic::Ordering::Relaxed) == 0)
+        let automatic_block_limit = (block_data_limit.load(atomic::Ordering::Acquire) == 0)
             && block_and_rpc_sender.is_some();
 
         // have a sender for tracking hard limit anywhere. we use this in case we
@@ -287,7 +287,7 @@ impl Web3Rpc {
             head_block = head_block.min(max_block);
         }
 
-        let tier = self.tier.load(atomic::Ordering::Relaxed);
+        let tier = self.tier.load(atomic::Ordering::Acquire);
 
         let backup = self.backup;
 
@@ -354,7 +354,7 @@ impl Web3Rpc {
         let request_scaling = 0.01;
         // TODO: what ordering?
         let active_requests =
-            self.active_requests.load(atomic::Ordering::Relaxed) as f32 * request_scaling + 1.0;
+            self.active_requests.load(atomic::Ordering::Acquire) as f32 * request_scaling + 1.0;
 
         peak_latency.mul_f32(active_requests)
     }
@@ -439,7 +439,7 @@ impl Web3Rpc {
             }
 
             self.block_data_limit
-                .store(limit, atomic::Ordering::Relaxed);
+                .store(limit, atomic::Ordering::Release);
         }
 
         if limit == Some(u64::MAX) {
@@ -763,7 +763,7 @@ impl Web3Rpc {
             .await
             .web3_context("failed check_provider")
         {
-            self.healthy.store(false, atomic::Ordering::Relaxed);
+            self.healthy.store(false, atomic::Ordering::Release);
 
             return Err(err);
         }
@@ -791,14 +791,14 @@ impl Web3Rpc {
                         break;
                     }
 
-                    new_total_requests = rpc.internal_requests.load(atomic::Ordering::Relaxed)
-                        + rpc.external_requests.load(atomic::Ordering::Relaxed);
+                    new_total_requests = rpc.internal_requests.load(atomic::Ordering::Acquire)
+                        + rpc.external_requests.load(atomic::Ordering::Acquire);
 
                     let detailed_healthcheck = new_total_requests - old_total_requests < 5;
 
                     // TODO: if this fails too many times, reset the connection
                     if let Err(err) = rpc.check_health(detailed_healthcheck, error_handler).await {
-                        rpc.healthy.store(false, atomic::Ordering::Relaxed);
+                        rpc.healthy.store(false, atomic::Ordering::Release);
 
                         // TODO: different level depending on the error handler
                         // TODO: if rate limit error, set "retry_at"
@@ -808,7 +808,7 @@ impl Web3Rpc {
                             error!(?err, "health check on {} failed", rpc);
                         }
                     } else {
-                        rpc.healthy.store(true, atomic::Ordering::Relaxed);
+                        rpc.healthy.store(true, atomic::Ordering::Release);
                     }
 
                     // TODO: should we count the requests done inside this health check
@@ -833,7 +833,7 @@ impl Web3Rpc {
                 true
             };
 
-            self.healthy.store(initial_check, atomic::Ordering::Relaxed);
+            self.healthy.store(initial_check, atomic::Ordering::Release);
 
             tokio::spawn(f)
         } else {
@@ -849,7 +849,7 @@ impl Web3Rpc {
 
                     // TODO: if this fails too many times, reset the connection
                     if let Err(err) = rpc.check_provider().await {
-                        rpc.healthy.store(false, atomic::Ordering::Relaxed);
+                        rpc.healthy.store(false, atomic::Ordering::Release);
 
                         // TODO: if rate limit error, set "retry_at"
                         if rpc.backup {
@@ -858,7 +858,7 @@ impl Web3Rpc {
                             error!(?err, "provider check on {} failed", rpc);
                         }
                     } else {
-                        rpc.healthy.store(true, atomic::Ordering::Relaxed);
+                        rpc.healthy.store(true, atomic::Ordering::Release);
                     }
 
                     sleep(Duration::from_secs(health_sleep_seconds)).await;
@@ -904,7 +904,7 @@ impl Web3Rpc {
         let (first_exit, _, _) = select_all(futures).await;
 
         // mark unhealthy
-        self.healthy.store(false, atomic::Ordering::Relaxed);
+        self.healthy.store(false, atomic::Ordering::Release);
 
         debug!(?first_exit, "subscriptions on {} exited", self);
 
@@ -1163,7 +1163,7 @@ impl Web3Rpc {
         // TODO: if websocket is reconnecting, return an error?
         if !allow_unhealthy {
-            if !(self.healthy.load(atomic::Ordering::Relaxed)) {
+            if !(self.healthy.load(atomic::Ordering::Acquire)) {
                 return Ok(OpenRequestResult::Failed);
             }
 
@@ -1384,17 +1384,17 @@ impl Serialize for Web3Rpc {
 
         state.serialize_field(
             "external_requests",
-            &self.external_requests.load(atomic::Ordering::Relaxed),
+            &self.external_requests.load(atomic::Ordering::Acquire),
         )?;
 
         state.serialize_field(
             "internal_requests",
-            &self.internal_requests.load(atomic::Ordering::Relaxed),
+            &self.internal_requests.load(atomic::Ordering::Acquire),
         )?;
 
         state.serialize_field(
             "active_requests",
-            &self.active_requests.load(atomic::Ordering::Relaxed),
+            &self.active_requests.load(atomic::Ordering::Acquire),
         )?;
 
         {
@@ -1423,7 +1423,7 @@ impl Serialize for Web3Rpc {
             state.serialize_field("weighted_latency_ms", &weighted_latency_ms)?;
         }
         {
-            let healthy = self.healthy.load(atomic::Ordering::Relaxed);
+            let healthy = self.healthy.load(atomic::Ordering::Acquire);
             state.serialize_field("healthy", &healthy)?;
         }
 
@@ -1446,7 +1446,7 @@ impl fmt::Debug for Web3Rpc {
 
         f.field("backup", &self.backup);
 
-        f.field("tier", &self.tier.load(atomic::Ordering::Relaxed));
+        f.field("tier", &self.tier.load(atomic::Ordering::Acquire));
 
         f.field("weighted_ms", &self.weighted_peak_latency().as_millis());
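The `healthy` flag in the hunks above follows a publish/gate pattern: background tasks store it with `Release`, and the request path checks it with an `Acquire` load unless `allow_unhealthy` is set. A reduced sketch of that pattern (the health check itself is stubbed; types are simplified from the patch):

```rust
// A background health-check publishes `healthy`; the request path gates on it.
use std::sync::atomic::{AtomicBool, Ordering};

struct Rpc {
    healthy: AtomicBool,
}

impl Rpc {
    fn run_health_check(&self, check: impl Fn() -> Result<(), &'static str>) {
        match check() {
            Ok(()) => self.healthy.store(true, Ordering::Release),
            Err(_err) => self.healthy.store(false, Ordering::Release),
        }
    }

    fn try_request(&self, allow_unhealthy: bool) -> Result<(), &'static str> {
        if !allow_unhealthy && !self.healthy.load(Ordering::Acquire) {
            return Err("rpc is unhealthy");
        }
        Ok(())
    }
}

fn main() {
    let rpc = Rpc { healthy: AtomicBool::new(true) };

    rpc.run_health_check(|| Err("connection refused"));
    assert!(rpc.try_request(false).is_err());
    assert!(rpc.try_request(true).is_ok()); // health checks bypass the gate
}
```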
diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs
index 50652673..b47d82b0 100644
--- a/web3_proxy/src/rpcs/request.rs
+++ b/web3_proxy/src/rpcs/request.rs
@@ -145,7 +145,7 @@ impl Drop for OpenRequestHandle {
     fn drop(&mut self) {
         self.rpc
             .active_requests
-            .fetch_sub(1, atomic::Ordering::Relaxed);
+            .fetch_sub(1, atomic::Ordering::AcqRel);
     }
 }
 
@@ -159,7 +159,7 @@ impl OpenRequestHandle {
         // TODO: attach a unique id to this? customer requests have one, but not internal queries
         // TODO: what ordering?!
         rpc.active_requests
-            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+            .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
 
         let error_handler = error_handler.unwrap_or_default();
 
@@ -306,14 +306,16 @@ impl OpenRequestHandle {
 
         match &authorization.authorization_type {
             AuthorizationType::Frontend => {
+                // this is just a debug counter, so Relaxed would probably be fine, but use AcqRel to be safe
                 self.rpc
                     .external_requests
-                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+                    .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
             }
             AuthorizationType::Internal => {
+                // this is just a debug counter, so Relaxed would probably be fine, but use AcqRel to be safe
                 self.rpc
                     .internal_requests
-                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+                    .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
             }
         }
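`active_requests` above is maintained RAII-style: incremented when an `OpenRequestHandle` is created and decremented in `Drop`, so the count stays balanced on every exit path, including early returns and panics. A simplified sketch (types reduced to the counting logic):

```rust
// In-flight request counting via a guard object: the counter can never leak
// because Drop always runs when the handle goes out of scope.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

struct Rpc {
    active_requests: AtomicUsize,
}

struct OpenRequestHandle {
    rpc: Arc<Rpc>,
}

impl OpenRequestHandle {
    fn new(rpc: Arc<Rpc>) -> Self {
        rpc.active_requests.fetch_add(1, Ordering::AcqRel);
        Self { rpc }
    }
}

impl Drop for OpenRequestHandle {
    fn drop(&mut self) {
        self.rpc.active_requests.fetch_sub(1, Ordering::AcqRel);
    }
}

fn main() {
    let rpc = Arc::new(Rpc { active_requests: AtomicUsize::new(0) });

    {
        let _handle = OpenRequestHandle::new(rpc.clone());
        assert_eq!(rpc.active_requests.load(Ordering::Acquire), 1);
    } // handle dropped here

    assert_eq!(rpc.active_requests.load(Ordering::Acquire), 0);
}
```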
diff --git a/web3_proxy/src/rpcs/request_handle.rs b/web3_proxy/src/rpcs/request_handle.rs
deleted file mode 100644
index f0bcc86c..00000000
--- a/web3_proxy/src/rpcs/request_handle.rs
+++ /dev/null
@@ -1,559 +0,0 @@
-use super::one::Web3Rpc;
-use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
-use crate::frontend::authorization::{Authorization, AuthorizationType};
-use crate::globals::{global_db_conn, DB_CONN};
-use crate::jsonrpc::{
-    self, JsonRpcErrorData, JsonRpcResultData, ResponsePayload, ValidatedRequest,
-};
-use anyhow::Context;
-use chrono::Utc;
-use derive_more::From;
-use entities::revert_log;
-use entities::sea_orm_active_enums::Method;
-use ethers::providers::ProviderError;
-use ethers::types::{Address, Bytes};
-use futures::Future;
-use http::StatusCode;
-use migration::sea_orm::{self, ActiveEnum, ActiveModelTrait};
-use nanorand::Rng;
-use serde_json::json;
-use std::pin::Pin;
-use std::sync::atomic;
-use std::sync::Arc;
-use tokio::time::{Duration, Instant};
-use tracing::{debug, error, info, trace, warn, Level};
-
-#[derive(From)]
-pub enum OpenRequestResult {
-    Handle(OpenRequestHandle),
-    /// Unable to start a request. Retry at the given time.
-    RetryAt(Instant),
-    /// The rpc are not synced, but they should be soon.
-    /// You should wait for the given block number.
-    /// TODO: should this return an OpenRequestHandle? that might recurse
-    Lagged(Pin<Box<dyn Future<Output = Web3ProxyResult<Arc<Web3Rpc>>> + Send>>),
-    /// Unable to start a request because no servers are synced or the necessary data has been pruned
-    Failed,
-}
-
-/// Make RPC requests through this handle and drop it when you are done.
-/// Opening this handle checks rate limits. Developers, try to keep opening a handle and using it as close together as possible
-pub struct OpenRequestHandle {
-    web3_request: Arc<ValidatedRequest>,
-    error_handler: RequestErrorHandler,
-    rpc: Arc<Web3Rpc>,
-}
-
-/// Depending on the context, RPC errors require different handling.
-#[derive(Copy, Clone, Debug, Default)]
-pub enum RequestErrorHandler {
-    /// Log at the trace level. Use when errors are expected.
-    #[default]
-    TraceLevel,
-    /// Log at the debug level. Use when errors are expected.
-    DebugLevel,
-    /// Log at the info level. Use when errors are expected.
-    InfoLevel,
-    /// Log at the error level. Use when errors are bad.
-    ErrorLevel,
-    /// Log at the warn level. Use when errors do not cause problems.
-    WarnLevel,
-    /// Potentially save the revert. Users can tune how often this happens
-    Save,
-}
-
-// TODO: second param could be skipped since we don't need it here
-#[derive(serde::Deserialize, serde::Serialize)]
-struct EthCallParams((EthCallFirstParams, Option<serde_json::Value>));
-
-#[derive(serde::Deserialize, serde::Serialize)]
-struct EthCallFirstParams {
-    to: Option<Address>,
-    data: Option<Bytes>,
-}
-
-impl std::fmt::Debug for OpenRequestHandle {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("OpenRequestHandle")
-            .field("method", &self.web3_request.inner.method())
-            .field("rpc", &self.rpc.name)
-            .finish_non_exhaustive()
-    }
-}
-
-impl From<Level> for RequestErrorHandler {
-    fn from(level: Level) -> Self {
-        match level {
-            Level::DEBUG => RequestErrorHandler::DebugLevel,
-            Level::ERROR => RequestErrorHandler::ErrorLevel,
-            Level::INFO => RequestErrorHandler::InfoLevel,
-            Level::TRACE => RequestErrorHandler::TraceLevel,
-            Level::WARN => RequestErrorHandler::WarnLevel,
-        }
-    }
-}
-
-impl Authorization {
-    /// Save a RPC call that return "execution reverted" to the database.
-    async fn save_revert(
-        self: Arc<Self>,
-        method: Method,
-        params: EthCallFirstParams,
-    ) -> Web3ProxyResult<()> {
-        let rpc_key_id = match self.checks.rpc_secret_key_id {
-            Some(rpc_key_id) => rpc_key_id.into(),
-            None => {
-                // trace!(?self, "cannot save revert without rpc_key_id");
-                return Ok(());
-            }
-        };
-
-        let db_conn = global_db_conn()?;
-
-        // TODO: should the database set the timestamp?
-        // we intentionally use "now" and not the time the request started
-        // why? because we aggregate stats and setting one in the past could cause confusion
-        let timestamp = Utc::now();
-
-        let to = params.to.unwrap_or_else(Address::zero).as_bytes().to_vec();
-
-        let call_data = params.data.map(|x| x.to_string());
-
-        let rl = revert_log::ActiveModel {
-            rpc_key_id: sea_orm::Set(rpc_key_id),
-            method: sea_orm::Set(method),
-            to: sea_orm::Set(to),
-            call_data: sea_orm::Set(call_data),
-            timestamp: sea_orm::Set(timestamp),
-            ..Default::default()
-        };
-
-        let rl = rl
-            .save(&db_conn)
-            .await
-            .web3_context("Failed saving new revert log")?;
-
-        // TODO: what log level and format?
-        trace!(revert_log=?rl);
-
-        // TODO: return something useful
-        Ok(())
-    }
-}
-
-impl Drop for OpenRequestHandle {
-    fn drop(&mut self) {
-        self.rpc
-            .active_requests
-            .fetch_sub(1, atomic::Ordering::Relaxed);
-    }
-}
-
-impl OpenRequestHandle {
-    pub async fn new(
-        web3_request: Arc<ValidatedRequest>,
-        rpc: Arc<Web3Rpc>,
-        error_handler: Option<RequestErrorHandler>,
-    ) -> Self {
-        // TODO: take request_id as an argument?
-        // TODO: attach a unique id to this? customer requests have one, but not internal queries
-        // TODO: what ordering?!
-        rpc.active_requests
-            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
-
-        let error_handler = error_handler.unwrap_or_default();
-
-        Self {
-            web3_request,
-            error_handler,
-            rpc,
-        }
-    }
-
-    pub fn connection_name(&self) -> String {
-        self.rpc.name.clone()
-    }
-
-    #[inline]
-    pub fn clone_connection(&self) -> Arc<Web3Rpc> {
-        self.rpc.clone()
-    }
-
-    pub fn rate_limit_for(&self, duration: Duration) {
-        if self.rpc.backup {
-            debug!(?duration, "rate limited on {}!", self.rpc);
-        } else {
-            warn!(?duration, "rate limited on {}!", self.rpc);
-        }
-
-        // TODO: use send_if_modified to be sure we only send if our value is greater
-        self.rpc
-            .hard_limit_until
-            .as_ref()
-            .unwrap()
-            .send_replace(Instant::now() + duration);
-    }
-
-    /// Just get the response from the provider without any extra handling.
-    /// This lets us use the try operator which makes it much easier to read
-    async fn _request<R: JsonRpcResultData>(
-        &self,
-    ) -> Web3ProxyResult<jsonrpc::SingleResponse<R>> {
-        // TODO: replace ethers-rs providers with our own that supports streaming the responses
-        // TODO: replace ethers-rs providers with our own that handles "id" being null
-        if let (Some(url), Some(ref client)) = (self.rpc.http_url.clone(), &self.rpc.http_client) {
-            // prefer the http provider
-            let request = self
-                .web3_request
-                .inner
-                .jsonrpc_request()
-                .context("there should always be a request here")?;
-
-            let response = client.post(url).json(request).send().await?;
-
-            if response.status() == StatusCode::TOO_MANY_REQUESTS {
-                // TODO: how much should we actually rate limit?
-                self.rate_limit_for(Duration::from_secs(1));
-            }
-
-            let response = response.error_for_status()?;
-
-            jsonrpc::SingleResponse::read_if_short(response, 1024, &self.web3_request).await
-        } else if let Some(p) = self.rpc.ws_provider.load().as_ref() {
-            // use the websocket provider if no http provider is available
-            let method = self.web3_request.inner.method();
-            let params = self.web3_request.inner.params();
-
-            // some ethers::ProviderError need to be converted to JsonRpcErrorData. the rest to Web3ProxyError
-            let response = match p.request::<_, R>(method, params).await {
-                Ok(x) => jsonrpc::ParsedResponse::from_result(x, self.web3_request.id()),
-                Err(provider_error) => match JsonRpcErrorData::try_from(&provider_error) {
-                    Ok(x) => jsonrpc::ParsedResponse::from_error(x, self.web3_request.id()),
-                    Err(ProviderError::HTTPError(error)) => {
-                        if let Some(status_code) = error.status() {
-                            if status_code == StatusCode::TOO_MANY_REQUESTS {
-                                // TODO: how much should we actually rate limit?
-                                self.rate_limit_for(Duration::from_secs(1));
-                            }
-                        }
-                        return Err(provider_error.into());
-                    }
-                    Err(err) => {
-                        warn!(?err, "error from {}", self.rpc);
-
-                        return Err(provider_error.into());
-                    }
-                },
-            };
-
-            Ok(response.into())
-        } else {
-            // this must be a test
-            Err(anyhow::anyhow!("no provider configured!").into())
-        }
-    }
-
-    pub fn error_handler(&self) -> RequestErrorHandler {
-        if let RequestErrorHandler::Save = self.error_handler {
-            let method = self.web3_request.inner.method();
-
-            // TODO: should all these be Trace or Debug or a mix?
-            // TODO: this list should come from config. other methods might be desired
-            if !["eth_call", "eth_estimateGas"].contains(&method) {
-                // trace!(%method, "skipping save on revert");
-                RequestErrorHandler::TraceLevel
-            } else if DB_CONN.read().is_ok() {
-                let log_revert_chance = self.web3_request.authorization.checks.log_revert_chance;
-
-                if log_revert_chance == 0 {
-                    // trace!(%method, "no chance. skipping save on revert");
-                    RequestErrorHandler::TraceLevel
-                } else if log_revert_chance == u16::MAX {
-                    // trace!(%method, "gaurenteed chance. SAVING on revert");
-                    self.error_handler
-                } else if nanorand::tls_rng().generate_range(0u16..u16::MAX) < log_revert_chance {
-                    // trace!(%method, "missed chance. skipping save on revert");
-                    RequestErrorHandler::TraceLevel
-                } else {
-                    // trace!("Saving on revert");
-                    // TODO: is always logging at debug level fine?
-                    self.error_handler
-                }
-            } else {
skipping save on revert"); - RequestErrorHandler::TraceLevel - } - } else { - self.error_handler - } - } - - /// Send a web3 request - /// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented - /// depending on how things are locked, you might need to pass the provider in - /// we take self to ensure this function only runs once - /// This does some inspection of the response to check for non-standard errors and rate limiting to try to give a Web3ProxyError instead of an Ok - pub async fn request( - self, - ) -> Web3ProxyResult> { - // TODO: use tracing spans - // TODO: including params in this log is way too verbose - // trace!(rpc=%self.rpc, %method, "request"); - trace!("requesting from {}", self.rpc); - - let authorization = &self.web3_request.authorization; - - match &authorization.authorization_type { - AuthorizationType::Frontend => { - self.rpc - .external_requests - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - } - AuthorizationType::Internal => { - self.rpc - .internal_requests - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - } - } - - // we used to fetch_add the active_request count here, but sometimes a request is made without going through this function (like with subscriptions) - - // we generally don't want to use the try operator. we might need to log errors - let start = Instant::now(); - - let mut response = self._request().await; - - // measure successes and errors - // originally i thought we wouldn't want errors, but I think it's a more accurate number including all requests - let latency = start.elapsed(); - - // we used to fetch_sub the active_request count here, but sometimes the handle is dropped without request being called! - - trace!( - "response from {} for {}: {:?}", - self.rpc, - self.web3_request, - response, - ); - - // TODO: move this to a helper function? - // true if we got a jsonrpc result. a jsonrpc error or other error is false. - // TODO: counters for errors vs jsonrpc vs success? - let response_is_success = match &response { - Ok(jsonrpc::SingleResponse::Parsed(x)) => { - matches!(&x.payload, ResponsePayload::Success { .. }) - } - Ok(jsonrpc::SingleResponse::Stream(..)) => true, - Err(_) => false, - }; - - if response_is_success { - // only track latency for successful requests - tokio::spawn(async move { - self.rpc.peak_latency.as_ref().unwrap().report(latency); - self.rpc.median_latency.as_ref().unwrap().record(latency); - - // TODO: app-wide median and peak latency? - }); - } else { - // only save reverts for some types of calls - // TODO: do something special for eth_sendRawTransaction too - // we do **NOT** use self.error_handler here because it might have been modified - let error_handler = self.error_handler(); - - enum ResponseType { - Error, - Revert, - RateLimited, - } - - let response_type: ResponseType = match &response { - Ok(jsonrpc::SingleResponse::Parsed(x)) => match &x.payload { - ResponsePayload::Success { .. } => unreachable!(), - ResponsePayload::Error { error } => { - trace!(?error, "jsonrpc error data"); - - if error.message.starts_with("execution reverted") { - ResponseType::Revert - } else if error.code == StatusCode::TOO_MANY_REQUESTS.as_u16() as i64 { - ResponseType::RateLimited - } else { - // TODO! THIS HAS TOO MANY FALSE POSITIVES! Theres another spot in the code that checks for things. 
-                            // if error.message.contains("limit") || error.message.contains("request") {
-                            //     self.rate_limit_for(Duration::from_secs(1));
-                            // }
-
-                            match error.code {
-                                -32000 => {
-                                    if error.message.contains("MDBX_PANIC:") {
-                                        response = Err(Web3ProxyError::MdbxPanic(
-                                            self.connection_name(),
-                                            error.message.clone(),
-                                        ));
-                                    } else {
-                                        // TODO: regex?
-                                        let archive_prefixes = [
-                                            "header not found",
-                                            "header for hash not found",
-                                            "missing trie node",
-                                        ];
-                                        for prefix in archive_prefixes {
-                                            if error.message.starts_with(prefix) {
-                                                // TODO: what error?
-                                                response = Err(Web3ProxyError::ArchiveRequired {
-                                                    min: self.web3_request.min_block_needed(),
-                                                    max: self.web3_request.max_block_needed(),
-                                                });
-                                                break;
-                                            }
-                                        }
-                                    }
-
-                                    ResponseType::Error
-                                }
-                                -32001 => {
-                                    if error.message == "Exceeded the quota usage" {
-                                        ResponseType::RateLimited
-                                    } else {
-                                        ResponseType::Error
-                                    }
-                                }
-                                -32005 => {
-                                    if error.message == "rate limit exceeded" {
-                                        ResponseType::RateLimited
-                                    } else {
-                                        ResponseType::Error
-                                    }
-                                }
-                                -32601 => {
-                                    let error_msg = error.message.as_ref();
-
-                                    // sometimes a provider does not support all rpc methods
-                                    // we check other connections rather than returning the error
-                                    // but sometimes the method is something that is actually unsupported,
-                                    // so we save the response here to return it later
-
-                                    // some providers look like this
-                                    if (error_msg.starts_with("the method")
-                                        && error_msg.ends_with("is not available"))
-                                        || error_msg == "Method not found"
-                                    {
-                                        let method = self.web3_request.inner.method().to_string();
-
-                                        response =
-                                            Err(Web3ProxyError::MethodNotFound(method.into()))
-                                    }
-
-                                    ResponseType::Error
-                                }
-                                _ => ResponseType::Error,
-                            }
-                        }
-                    }
-                },
-                Ok(jsonrpc::SingleResponse::Stream(..)) => unreachable!(),
-                Err(_) => ResponseType::Error,
-            };
-
-            if matches!(response_type, ResponseType::RateLimited) {
-                // TODO: how long?
-                self.rate_limit_for(Duration::from_secs(1));
-            }
-
-            match error_handler {
-                RequestErrorHandler::DebugLevel => {
-                    // TODO: think about this revert check more. sometimes we might want reverts logged so this needs a flag
-                    if matches!(response_type, ResponseType::Revert) {
-                        trace!(
-                            rpc=%self.rpc,
-                            %self.web3_request,
-                            ?response,
-                            "revert",
-                        );
-                    } else {
-                        debug!(
-                            rpc=%self.rpc,
-                            %self.web3_request,
-                            ?response,
-                            "bad response",
-                        );
-                    }
-                }
-                RequestErrorHandler::InfoLevel => {
-                    info!(
-                        rpc=%self.rpc,
-                        %self.web3_request,
-                        ?response,
-                        "bad response",
-                    );
-                }
-                RequestErrorHandler::TraceLevel => {
-                    trace!(
-                        rpc=%self.rpc,
-                        %self.web3_request,
-                        ?response,
-                        "bad response",
-                    );
-                }
-                RequestErrorHandler::ErrorLevel => {
-                    // TODO: only include params if not running in release mode
-                    error!(
-                        rpc=%self.rpc,
-                        %self.web3_request,
-                        ?response,
-                        "bad response",
-                    );
-                }
-                RequestErrorHandler::WarnLevel => {
-                    // TODO: only include params if not running in release mode
-                    warn!(
-                        rpc=%self.rpc,
-                        %self.web3_request,
-                        ?response,
-                        "bad response",
-                    );
-                }
-                RequestErrorHandler::Save => {
-                    trace!(
-                        rpc=%self.rpc,
-                        %self.web3_request,
-                        ?response,
-                        "bad response",
-                    );
-
-                    // TODO: do not unwrap! (doesn't matter much since we check method as a string above)
-                    // TODO: open this up for even more methods
-                    let method: Method =
-                        Method::try_from_value(&self.web3_request.inner.method().to_string())
-                            .unwrap();
-
-                    // TODO: i don't think this prsing is correct
-                    match serde_json::from_value::<EthCallParams>(json!(self
-                        .web3_request
-                        .inner
-                        .params()))
-                    {
-                        Ok(params) => {
-                            // spawn saving to the database so we don't slow down the request
-                            // TODO: log if this errors
-                            // TODO: aren't the method and params already saved? this should just need the response
-                            let f = authorization.clone().save_revert(method, params.0 .0);
-
-                            tokio::spawn(f);
-                        }
-                        Err(err) => {
-                            warn!(
-                                %self.web3_request,
-                                ?response,
-                                ?err,
-                                "failed parsing eth_call params. unable to save revert",
-                            );
-                        }
-                    }
-                }
-            }
-        }
-
-        response
-    }
-}
diff --git a/web3_proxy/src/rpcs/ws.rs b/web3_proxy/src/rpcs/ws.rs
deleted file mode 100644
index e69de29b..00000000
diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs
index 06961806..66c8c201 100644
--- a/web3_proxy/src/stats/mod.rs
+++ b/web3_proxy/src/stats/mod.rs
@@ -551,20 +551,20 @@ impl RpcQueryStats {
         // TODO: do this without a clone
         let authorization = metadata.authorization.clone();
 
-        let archive_request = metadata.archive_request.load(Ordering::Relaxed);
+        let archive_request = metadata.archive_request.load(Ordering::Acquire);
 
         // TODO: do this without cloning. we can take their vec
         let backend_rpcs_used = metadata.backend_rpcs_used();
 
         let request_bytes = metadata.inner.num_bytes() as u64;
-        let response_bytes = metadata.response_bytes.load(Ordering::Relaxed);
+        let response_bytes = metadata.response_bytes.load(Ordering::Acquire);
 
-        let mut error_response = metadata.error_response.load(Ordering::Relaxed);
-        let mut response_millis = metadata.response_millis.load(Ordering::Relaxed);
+        let mut error_response = metadata.error_response.load(Ordering::Acquire);
+        let mut response_millis = metadata.response_millis.load(Ordering::Acquire);
 
-        let user_error_response = metadata.user_error_response.load(Ordering::Relaxed);
+        let user_error_response = metadata.user_error_response.load(Ordering::Acquire);
 
-        let response_timestamp = match metadata.response_timestamp.load(Ordering::Relaxed) {
+        let response_timestamp = match metadata.response_timestamp.load(Ordering::Acquire) {
             0 => {
                 // no response timestamp!
                 if !error_response {
diff --git a/web3_proxy_cli/src/test_utils/app.rs b/web3_proxy_cli/src/test_utils/app.rs
index 2ac912e4..9c88de1c 100644
--- a/web3_proxy_cli/src/test_utils/app.rs
+++ b/web3_proxy_cli/src/test_utils/app.rs
@@ -154,7 +154,7 @@ impl TestApp {
             })
         };
 
-        let mut frontend_port = frontend_port_arc.load(Ordering::Relaxed);
+        let mut frontend_port = frontend_port_arc.load(Ordering::SeqCst);
         let start = Instant::now();
         while frontend_port == 0 {
             // we have to give it some time because it might have to do migrations
             if start.elapsed() > Duration::from_secs(10) {
                 panic!("took too long to start!");
             }
 
             sleep(Duration::from_millis(10)).await;
-            frontend_port = frontend_port_arc.load(Ordering::Relaxed);
+            frontend_port = frontend_port_arc.load(Ordering::SeqCst);
         }
 
         let proxy_endpoint = format!("http://127.0.0.1:{}", frontend_port);
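For reference, a sketch of the ordering ladder this patch moves along, weakest to strongest; each call is real `std::sync::atomic` API, but the sequence itself is only illustrative:

```rust
// All orderings make the single operation atomic; they differ only in how
// they order *other* memory around it.
use std::sync::atomic::{AtomicU64, Ordering};

fn main() {
    let x = AtomicU64::new(0);

    // Relaxed: atomic, no ordering of surrounding memory (plain counters)
    x.fetch_add(1, Ordering::Relaxed);

    // Release (stores) / Acquire (loads): a Release store "publishes" prior
    // writes to any thread whose Acquire load observes it (flags, ports)
    x.store(2, Ordering::Release);
    let _ = x.load(Ordering::Acquire);

    // AcqRel: read-modify-write operations that both acquire and release
    x.fetch_add(1, Ordering::AcqRel);

    // SeqCst: additionally imposes a single total order over all SeqCst ops
    x.store(4, Ordering::SeqCst);
    assert_eq!(x.load(Ordering::SeqCst), 4);
}
```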