From 8548753a32d006933c1a7f26fd23f553d8d8aafc Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 6 May 2022 20:44:12 +0000 Subject: [PATCH] set overall max inside the lock --- TODO.md | 34 +++------------ config/example.toml | 3 ++ web3-proxy/src/connection.rs | 47 +++++++------------- web3-proxy/src/connections.rs | 81 ++++++++++++++++++++++++++--------- web3-proxy/src/main.rs | 31 +++++++++++--- 5 files changed, 108 insertions(+), 88 deletions(-) diff --git a/TODO.md b/TODO.md index fcec1baf..8271cf84 100644 --- a/TODO.md +++ b/TODO.md @@ -1,32 +1,8 @@ # Todo +- [ ] the ethermine rpc is usually fastest. but its in the private tier. since we only allow synced rpcs, we are going to not have an rpc a lot of the time + - [ ] if not backends. return a 502 instead of delaying? - [ ] tarpit ratelimiting at the start, but reject if incoming requests is super high -- [ ] thundering herd problem if we only allow a lag of 1 block. soft rate limits should help - -# notes -its almost working. when i curl it, it doesn't work exactly right though - -## first time: - - ``` - thread 'tokio-runtime-worker' panicked at 'not implemented', src/provider_tiers.rs:142:13 - note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace - ``` - -I think this is not seeing any as in sync. not sure why else it would not have any not_until set. -I believe this is because we don't know the first block. we should force an update or something at the start - -## second time: -"false" - -it loses all the "jsonrpc" parts and just has the simple result. need to return a proper jsonrpc response - -# TODO: add the backend server to the header - -# random thoughts: - -the web3proxyapp object gets cloned for every call. why do we need any arcs inside that? shouldn't they be able to connect to the app's? - -on friday i had it over 100k rps. but now, even when i roll back to that commit, i can't get it that high. what changed? - -i think we need a top level head block. otherwise if tier0 stalls, we will keep using it \ No newline at end of file +- [ ] thundering herd problem if we only allow a lag of 0 blocks. i don't see any solution besides allowing a one or two block lag +- [ ] add the backend server to the header? +- [ ] the web3proxyapp object gets cloned for every call. why do we need any arcs inside that? shouldn't they be able to connect to the app's? diff --git a/config/example.toml b/config/example.toml index f2cba3ad..aeef796d 100644 --- a/config/example.toml +++ b/config/example.toml @@ -6,6 +6,9 @@ url = "ws://127.0.0.1:8546" soft_limit = 200_000 + [balanced_rpc_tiers.0.ankr] + url = "https://rpc.ankr.com/eth" + soft_limit = 3_000 [private_rpcs] diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index fe30d4d0..3c232bc8 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -130,6 +130,11 @@ impl Web3Connection { self.head_block_number.load(atomic::Ordering::Acquire) } + #[inline] + pub fn soft_limit(&self) -> u32 { + self.soft_limit + } + #[inline] pub fn url(&self) -> &str { &self.url @@ -140,7 +145,6 @@ impl Web3Connection { pub async fn new_heads( self: Arc, connections: Option>, - best_head_block_number: Arc, ) -> anyhow::Result<()> { info!("Watching new_heads on {}", self); @@ -170,18 +174,10 @@ impl Web3Connection { .swap(block_number, atomic::Ordering::AcqRel); if old_block_number != block_number { - info!("new block on {}: {}", self, block_number); - - // we don't care about this result. 
- let _ = best_head_block_number.compare_exchange( - old_block_number, - block_number, - atomic::Ordering::AcqRel, - atomic::Ordering::Acquire, - ); - if let Some(connections) = &connections { connections.update_synced_rpcs(&self)?; + } else { + info!("new block on {}: {}", self, block_number); } } } @@ -206,37 +202,24 @@ impl Web3Connection { drop(active_request_handle); - info!("current block on {}: {}", self, block_number); - - let old_block_number = self - .head_block_number - .swap(block_number, atomic::Ordering::Release); - - // we don't care about this result - let _ = best_head_block_number.compare_exchange( - old_block_number, - block_number, - atomic::Ordering::AcqRel, - atomic::Ordering::Acquire, - ); + // TODO: swap and check the result? + self.head_block_number + .store(block_number, atomic::Ordering::Release); if let Some(connections) = &connections { connections.update_synced_rpcs(&self)?; + } else { + info!("new head block from {}: {}", self, block_number); } - while let Some(block) = stream.next().await { - let block_number = block.number.unwrap().as_u64(); + while let Some(new_block) = stream.next().await { + let new_block_number = new_block.number.unwrap().as_u64(); // TODO: only store if this isn't already stored? // TODO: also send something to the provider_tier so it can sort? // TODO: do we need this old block number check? its helpful on http, but here it shouldn't dupe except maybe on the first run self.head_block_number - .store(block_number, atomic::Ordering::Release); - - // TODO: what ordering? - best_head_block_number.fetch_max(block_number, atomic::Ordering::AcqRel); - - info!("new block on {}: {}", self, block_number); + .fetch_max(new_block_number, atomic::Ordering::AcqRel); if let Some(connections) = &connections { connections.update_synced_rpcs(&self)?; diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index 3204a953..66f9ae49 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -11,7 +11,7 @@ use std::cmp; use std::fmt; use std::sync::atomic::{self, AtomicU64}; use std::sync::Arc; -use tracing::warn; +use tracing::{debug, info, trace, warn}; use crate::config::Web3ConnectionConfig; use crate::connection::{ActiveRequestHandle, Web3Connection}; @@ -90,15 +90,11 @@ impl Web3Connections { // TODO: channel instead. then we can have one future with write access to a left-right? let connection = Arc::clone(connection); let connections = connections.clone(); - let best_head_block_number = best_head_block_number.clone(); tokio::spawn(async move { let url = connection.url().to_string(); // TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date - if let Err(e) = connection - .new_heads(Some(connections), best_head_block_number) - .await - { + if let Err(e) = connection.new_heads(Some(connections)).await { warn!("new_heads error on {}: {:?}", url, e); } }); @@ -202,10 +198,30 @@ impl Web3Connections { new_block.cmp(¤t_best_block_number), ) { (cmp::Ordering::Greater, cmp::Ordering::Greater) => { - // this newest block is the new overall best block + // the rpc's newest block is the new overall best block synced_connections.inner.clear(); synced_connections.head_block_number = new_block; + + // TODO: what ordering? 
+ match self.best_head_block_number.compare_exchange( + overall_best_head_block, + new_block, + atomic::Ordering::AcqRel, + atomic::Ordering::Acquire, + ) { + Ok(current_best_block_number) => { + info!("new head block from {}: {}", rpc, current_best_block_number); + } + Err(current_best_block_number) => { + // actually, there was a race and this ended up not being the latest block. return now without adding this rpc to the synced list + debug!( + "behind {} on {:?}: {}", + current_best_block_number, rpc, new_block + ); + return Ok(()); + } + } } (cmp::Ordering::Equal, cmp::Ordering::Less) => { // no need to do anything @@ -242,7 +258,9 @@ impl Web3Connections { synced_connections.head_block_number = new_block; } (cmp::Ordering::Greater, cmp::Ordering::Equal) => { - panic!("Greater+Equal should be impossible") + // TODO: what should we do? i think we got here because we aren't using atomics properly + // the overall block hasn't yet updated, but our internal block has + // TODO: maybe we should } } @@ -252,8 +270,11 @@ impl Web3Connections { .position(|x| x.url() == rpc.url()) .unwrap(); + // TODO: hopefully nothing ends up in here twice. Greater+Equal might do that to us synced_connections.inner.push(rpc_index); + trace!("Now synced {:?}: {:?}", self, synced_connections.inner); + Ok(()) } @@ -266,25 +287,39 @@ impl Web3Connections { // TODO: this clone is probably not the best way to do this let mut synced_rpc_indexes = self.synced_connections.read().inner.clone(); - let cache: HashMap = synced_rpc_indexes + // // TODO: how should we include the soft limit? floats are slower than integer math + // let a = a as f32 / self.soft_limit as f32; + // let b = b as f32 / other.soft_limit as f32; + + let sort_cache: HashMap = synced_rpc_indexes .iter() .map(|synced_index| { - ( - *synced_index, - self.inner.get(*synced_index).unwrap().active_requests(), - ) + let key = *synced_index; + + let connection = self.inner.get(*synced_index).unwrap(); + + let active_requests = connection.active_requests(); + let soft_limit = connection.soft_limit(); + + let utilization = active_requests as f32 / soft_limit as f32; + + (key, (utilization, soft_limit)) }) .collect(); // TODO: i think we might need to load active connections and then synced_rpc_indexes.sort_unstable_by(|a, b| { - let a = cache.get(a).unwrap(); - let b = cache.get(b).unwrap(); + let (a_utilization, a_soft_limit) = sort_cache.get(a).unwrap(); + let (b_utilization, b_soft_limit) = sort_cache.get(b).unwrap(); - // TODO: don't just sort by active requests. sort by active requests as a percentage of soft limit - // TODO: if those are equal, sort on soft limit - - a.cmp(b) + // TODO: i'm comparing floats. crap + match a_utilization + .partial_cmp(b_utilization) + .unwrap_or(cmp::Ordering::Equal) + { + cmp::Ordering::Equal => a_soft_limit.cmp(b_soft_limit), + x => x, + } }); for selected_rpc in synced_rpc_indexes.into_iter() { @@ -295,10 +330,16 @@ impl Web3Connections { Err(not_until) => { earliest_possible(&mut earliest_not_until, not_until); } - Ok(handle) => return Ok(handle), + Ok(handle) => { + trace!("next server on {:?}: {:?}", self, selected_rpc); + return Ok(handle); + } } } + // TODO: this is too verbose + // warn!("no servers on {:?}! 
{:?}", self, earliest_not_until); + // this might be None Err(earliest_not_until) } diff --git a/web3-proxy/src/main.rs b/web3-proxy/src/main.rs index 6f9ba347..31e9f3a4 100644 --- a/web3-proxy/src/main.rs +++ b/web3-proxy/src/main.rs @@ -175,17 +175,32 @@ impl Web3ProxyApp { } else { // this is not a private transaction (or no private relays are configured) // try to send to each tier, stopping at the first success + // if no tiers are synced, fallback to privates loop { // there are multiple tiers. save the earliest not_until (if any). if we don't return, we will sleep until then and then try again let mut earliest_not_until = None; - for balanced_rpcs in self.balanced_rpc_tiers.iter() { - let current_block = balanced_rpcs.head_block_number(); // TODO: we don't store current block for everything anymore. we store it on the connections + // TODO: how can we better build this iterator? + let rpc_iter = if let Some(private_rpcs) = self.private_rpcs.as_ref() { + self.balanced_rpc_tiers.iter().chain(vec![private_rpcs]) + } else { + self.balanced_rpc_tiers.iter().chain(vec![]) + }; + + for balanced_rpcs in rpc_iter { + let best_head_block_number = + self.best_head_block_number.load(atomic::Ordering::Acquire); // TODO: we don't store current block for everything anymore. we store it on the connections + + let best_rpc_block_number = balanced_rpcs.head_block_number(); + + if best_rpc_block_number < best_head_block_number { + continue; + } // TODO: building this cache key is slow and its large, but i don't see a better way right now // TODO: inspect the params and see if a block is specified. if so, use that block number instead of current_block let cache_key = ( - current_block, + best_head_block_number, json_body.method.clone(), json_body.params.to_string(), ); @@ -212,7 +227,6 @@ impl Web3ProxyApp { // info!("forwarding request from {}", upstream_server); let response = JsonRpcForwardedResponse { - // TODO: re-use their jsonrpc? jsonrpc: "2.0".to_string(), id: json_body.id, // TODO: since we only use the result here, should that be all we return from try_send_request? @@ -245,7 +259,7 @@ impl Web3ProxyApp { return Ok(warp::reply::json(&response)); } Err(None) => { - // TODO: this is too verbose. if there are other servers in other tiers, use those! + // TODO: this is too verbose. if there are other servers in other tiers, we use those! // warn!("No servers in sync!"); } Err(Some(not_until)) => { @@ -267,7 +281,8 @@ impl Web3ProxyApp { } } - // we haven't returned an Ok, sleep and try again + // we haven't returned an Ok + // if we did return a rate limit error, sleep and try again if let Some(earliest_not_until) = earliest_not_until { let deadline = earliest_not_until.wait_time_from(self.clock.now()); @@ -275,7 +290,9 @@ impl Web3ProxyApp { } else { // TODO: how long should we wait? // TODO: max wait time? - sleep(Duration::from_millis(500)).await; + warn!("No servers in sync!"); + // TODO: return json error? return a 502? + return Err(anyhow::anyhow!("no servers in sync")); }; } }