From c54970da0ad77201fdf1503ab6ab465a79db5dc2 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 12 May 2023 23:00:03 -0700 Subject: [PATCH] change ordering and move fetch_add and fetch_sub --- README.md | 2 +- deferred-rate-limiter/src/lib.rs | 2 +- latency/src/peak_ewma/rtt_estimate.rs | 4 +- latency/src/util/atomic_f32_pair.rs | 6 +-- web3_proxy/src/app/mod.rs | 4 +- web3_proxy/src/frontend/authorization.rs | 4 +- web3_proxy/src/rpcs/many.rs | 10 ++--- web3_proxy/src/rpcs/one.rs | 24 ++++++++--- web3_proxy/src/rpcs/request.rs | 54 ++++++++++++++---------- 9 files changed, 65 insertions(+), 45 deletions(-) diff --git a/README.md b/README.md index af5dabf1..4461169e 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ Web3_proxy is a fast caching and load balancing proxy for web3 (Ethereum or simi Signed transactions (eth_sendRawTransaction) are sent in parallel to the configured private RPCs (eden, ethermine, flashbots, etc.). -All other requests are sent to an RPC server on the latest block (alchemy, moralis, rivet, your own node, or one of many other providers). If multiple servers are in sync, they are prioritized by `active_requests/soft_limit`. Note that this means that the fastest server is most likely to serve requests and slow servers are unlikely to ever get any requests. +All other requests are sent to an RPC server on the latest block (llamanodes, alchemy, moralis, rivet, your own node, or one of many other providers). If multiple servers are in sync, they are prioritized by `active_requests` and request latency. Note that this means that the fastest server is most likely to serve requests and slow servers are unlikely to ever get any requests. Each server has different limits to configure. The `soft_limit` is the number of parallel active requests where a server starts to slow down. The `hard_limit` is where a server starts giving rate limits or other errors. diff --git a/deferred-rate-limiter/src/lib.rs b/deferred-rate-limiter/src/lib.rs index 8f055ce3..f02c69f8 100644 --- a/deferred-rate-limiter/src/lib.rs +++ b/deferred-rate-limiter/src/lib.rs @@ -139,7 +139,7 @@ where Ok(deferred_rate_limit_result) } else { // we have a cached amount here - let cached_key_count = local_key_count.fetch_add(count, Ordering::Acquire); + let cached_key_count = local_key_count.fetch_add(count, Ordering::AcqRel); // assuming no other parallel futures incremented this key, this is the count that redis has let expected_key_count = cached_key_count + count; diff --git a/latency/src/peak_ewma/rtt_estimate.rs b/latency/src/peak_ewma/rtt_estimate.rs index be56fe9c..c169ea4a 100644 --- a/latency/src/peak_ewma/rtt_estimate.rs +++ b/latency/src/peak_ewma/rtt_estimate.rs @@ -97,7 +97,7 @@ impl AtomicRttEstimate { /// This method omits the ordering argument since loads may use /// slightly stale data to avoid adding additional latency. pub fn load(&self) -> RttEstimate { - RttEstimate::from_pair(self.pair.load(Ordering::Relaxed), self.start_time) + RttEstimate::from_pair(self.pair.load(Ordering::Acquire), self.start_time) } /// Fetches the value, and applies a function to it that returns an @@ -114,7 +114,7 @@ impl AtomicRttEstimate { let mut update_at = Instant::now(); let mut rtt = Duration::ZERO; self.pair - .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |pair| { + .fetch_update(Ordering::Release, Ordering::Acquire, |pair| { rtt = f(RttEstimate::from_pair(pair, self.start_time)); // Save the new update_at inside the function in case it // is run multiple times diff --git a/latency/src/util/atomic_f32_pair.rs b/latency/src/util/atomic_f32_pair.rs index fa74fa0b..18b356c3 100644 --- a/latency/src/util/atomic_f32_pair.rs +++ b/latency/src/util/atomic_f32_pair.rs @@ -69,7 +69,7 @@ mod tests { fn test_atomic_f32_pair_load() { let pair = [f32::consts::PI, f32::consts::E]; let atomic = AtomicF32Pair::new(pair); - assert_eq!(pair, atomic.load(Ordering::Relaxed)); + assert_eq!(pair, atomic.load(Ordering::Acquire)); } #[test] @@ -77,13 +77,13 @@ mod tests { let pair = [f32::consts::PI, f32::consts::E]; let atomic = AtomicF32Pair::new(pair); atomic - .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |[f1, f2]| { + .fetch_update(Ordering::Release, Ordering::Acquire, |[f1, f2]| { Some([f1 + 1.0, f2 + 1.0]) }) .unwrap(); assert_eq!( [pair[0] + 1.0, pair[1] + 1.0], - atomic.load(Ordering::Relaxed) + atomic.load(Ordering::Acquire) ); } } diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index a7cfc851..ce1ac9ba 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -1780,7 +1780,7 @@ impl Web3ProxyApp { if block_depth < self.config.archive_depth { request_metadata .archive_request - .store(true, atomic::Ordering::Relaxed); + .store(true, atomic::Ordering::Release); } let request_block = self @@ -1810,7 +1810,7 @@ impl Web3ProxyApp { if block_depth < self.config.archive_depth { request_metadata .archive_request - .store(true, atomic::Ordering::Relaxed); + .store(true, atomic::Ordering::Release); } let from_block = self diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 15982954..6ba772f0 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -205,7 +205,7 @@ impl KafkaDebugLogger { let payload = rmp_serde::to_vec(&request).expect("requests should always serialize with rmp"); - self.num_requests.fetch_add(1, atomic::Ordering::SeqCst); + self.num_requests.fetch_add(1, atomic::Ordering::AcqRel); self.background_log(payload) } @@ -217,7 +217,7 @@ impl KafkaDebugLogger { let payload = rmp_serde::to_vec(&response).expect("requests should always serialize with rmp"); - self.num_responses.fetch_add(1, atomic::Ordering::SeqCst); + self.num_responses.fetch_add(1, atomic::Ordering::AcqRel); self.background_log(payload) } diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 6addeb6f..82924eb0 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -620,7 +620,7 @@ impl Web3Rpcs { } if let Some(request_metadata) = request_metadata { - request_metadata.no_servers.fetch_add(1, Ordering::Release); + request_metadata.no_servers.fetch_add(1, Ordering::AcqRel); } match earliest_retry_at { @@ -929,7 +929,7 @@ impl Web3Rpcs { // TODO: have a separate column for rate limited? if let Some(request_metadata) = request_metadata { - request_metadata.no_servers.fetch_add(1, Ordering::Release); + request_metadata.no_servers.fetch_add(1, Ordering::AcqRel); } tokio::select! { @@ -943,7 +943,7 @@ impl Web3Rpcs { } OpenRequestResult::NotReady => { if let Some(request_metadata) = request_metadata { - request_metadata.no_servers.fetch_add(1, Ordering::Release); + request_metadata.no_servers.fetch_add(1, Ordering::AcqRel); } let waiting_for = min_block_needed.max(max_block_needed); @@ -1086,7 +1086,7 @@ impl Web3Rpcs { if let Some(request_metadata) = &request_metadata { // TODO: if this times out, i think we drop this - request_metadata.no_servers.fetch_add(1, Ordering::Release); + request_metadata.no_servers.fetch_add(1, Ordering::AcqRel); } watch_consensus_rpcs.changed().await?; @@ -1101,7 +1101,7 @@ impl Web3Rpcs { warn!("All rate limits exceeded. Sleeping"); if let Some(request_metadata) = &request_metadata { - request_metadata.no_servers.fetch_add(1, Ordering::Release); + request_metadata.no_servers.fetch_add(1, Ordering::AcqRel); } tokio::select! { diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 416055a8..ee792049 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -239,7 +239,7 @@ impl Web3Rpc { let peak_latency = self.peak_latency.as_ref().unwrap().latency().as_secs_f64(); // TODO: what ordering? - let active_requests = self.active_requests.load(atomic::Ordering::Relaxed) as f64 + 1.0; + let active_requests = self.active_requests.load(atomic::Ordering::Acquire) as f64 + 1.0; OrderedFloat(peak_latency * active_requests) } @@ -734,7 +734,7 @@ impl Web3Rpc { // health check as a way of keeping this rpc's request_ewma accurate // TODO: do something different if this is a backup server? - new_total_requests = rpc.total_requests.load(atomic::Ordering::Relaxed); + new_total_requests = rpc.total_requests.load(atomic::Ordering::Acquire); // TODO: how many requests should we require in order to skip a health check? if new_total_requests - old_total_requests < 10 { @@ -1363,7 +1363,7 @@ impl Serialize for Web3Rpc { S: Serializer, { // 3 is the number of fields in the struct. - let mut state = serializer.serialize_struct("Web3Rpc", 9)?; + let mut state = serializer.serialize_struct("Web3Rpc", 12)?; // the url is excluded because it likely includes private information. just show the name that we use in keys state.serialize_field("name", &self.name)?; @@ -1372,7 +1372,7 @@ impl Serialize for Web3Rpc { state.serialize_field("backup", &self.backup)?; - match self.block_data_limit.load(atomic::Ordering::Relaxed) { + match self.block_data_limit.load(atomic::Ordering::Acquire) { u64::MAX => { state.serialize_field("block_data_limit", &None::<()>)?; } @@ -1395,9 +1395,21 @@ impl Serialize for Web3Rpc { state.serialize_field("head_latency", &self.head_latency.read().value())?; + state.serialize_field( + "peak_latency", + &self.peak_latency.as_ref().unwrap().latency(), + )?; + + state.serialize_field("peak_ewma", self.peak_ewma().as_ref())?; + + state.serialize_field( + "active_requests", + &self.active_requests.load(atomic::Ordering::Acquire), + )?; + state.serialize_field( "total_requests", - &self.total_requests.load(atomic::Ordering::Relaxed), + &self.total_requests.load(atomic::Ordering::Acquire), )?; state.end() @@ -1410,7 +1422,7 @@ impl fmt::Debug for Web3Rpc { f.field("name", &self.name); - let block_data_limit = self.block_data_limit.load(atomic::Ordering::Relaxed); + let block_data_limit = self.block_data_limit.load(atomic::Ordering::Acquire); if block_data_limit == u64::MAX { f.field("blocks", &"all"); } else { diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index ed0474cc..445f5d7a 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -7,7 +7,7 @@ use entities::revert_log; use entities::sea_orm_active_enums::Method; use ethers::providers::ProviderError; use ethers::types::{Address, Bytes}; -use log::{debug, error, trace, warn, Level}; +use log::{debug, error, info, trace, warn, Level}; use migration::sea_orm::{self, ActiveEnum, ActiveModelTrait}; use serde_json::json; use std::fmt; @@ -121,14 +121,22 @@ impl Authorization { } } +impl Drop for OpenRequestHandle { + fn drop(&mut self) { + let x = self + .rpc + .active_requests + .fetch_sub(1, atomic::Ordering::AcqRel); + } +} + impl OpenRequestHandle { pub async fn new(authorization: Arc, rpc: Arc) -> Self { // TODO: take request_id as an argument? // TODO: attach a unique id to this? customer requests have one, but not internal queries // TODO: what ordering?! - // TODO: should we be using metered, or not? i think not because we want stats for each handle - // TODO: these should maybe be sent to an influxdb instance? - rpc.active_requests.fetch_add(1, atomic::Ordering::Relaxed); + rpc.active_requests + .fetch_add(1, std::sync::atomic::Ordering::AcqRel); Self { authorization, rpc } } @@ -188,11 +196,9 @@ impl OpenRequestHandle { self.rpc .total_requests - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + .fetch_add(1, std::sync::atomic::Ordering::AcqRel); - self.rpc - .active_requests - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + // we used to fetch_add the active_request count here, but sometimes a request is made without going through this function (like with subscriptions) let start = Instant::now(); @@ -212,16 +218,16 @@ impl OpenRequestHandle { }; // note. we intentionally do not record this latency now. we do NOT want to measure errors - // let latency = latency.elapsed(); + let latency = start.elapsed(); - self.rpc - .active_requests - .fetch_sub(1, std::sync::atomic::Ordering::Relaxed); + // we used to fetch_sub the active_request count here, but sometimes the handle is dropped without request being called! - // TODO: i think ethers already has trace logging (and does it much more fancy) trace!( "response from {} for {} {:?}: {:?}", - self.rpc, method, params, response, + self.rpc, + method, + params, + response, ); if let Err(err) = &response { @@ -352,19 +358,21 @@ impl OpenRequestHandle { // TODO: do not unwrap! (doesn't matter much since we check method as a string above) let method: Method = Method::try_from_value(&method.to_string()).unwrap(); - // TODO: DO NOT UNWRAP! But also figure out the best way to keep returning ProviderErrors here - let params: EthCallParams = serde_json::from_value(json!(params)) - .context("parsing params to EthCallParams") - .unwrap(); + match serde_json::from_value::(json!(params)) { + Ok(params) => { + // spawn saving to the database so we don't slow down the request + let f = self.authorization.clone().save_revert(method, params.0 .0); - // spawn saving to the database so we don't slow down the request - let f = self.authorization.clone().save_revert(method, params.0 .0); - - tokio::spawn(f); + tokio::spawn(f); + } + Err(err) => { + warn!("failed parsing eth_call params. unable to save revert"); + } + } } } } else if let Some(peak_latency) = &self.rpc.peak_latency { - peak_latency.report(start.elapsed()); + peak_latency.report(latency); } else { unreachable!("peak_latency not initialized"); }