From c0fc999e02e1cb5d0291367920bccf42ca0e82ce Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 3 Jan 2023 22:37:51 -0800 Subject: [PATCH] change weight to tier --- TODO.md | 2 +- web3_proxy/src/bin/web3_proxy.rs | 4 +-- web3_proxy/src/config.rs | 10 +++---- web3_proxy/src/rpcs/connection.rs | 24 ++++++++--------- web3_proxy/src/rpcs/connections.rs | 43 ++++++++++++++++-------------- 5 files changed, 42 insertions(+), 41 deletions(-) diff --git a/TODO.md b/TODO.md index b2957393..b27e5964 100644 --- a/TODO.md +++ b/TODO.md @@ -293,6 +293,7 @@ These are not yet ordered. There might be duplicates. We might not actually need - [x] serde collect unknown fields in config instead of crash - [x] upgrade user tier by address - [x] all_backend_connections skips syncing servers +- [x] change weight back to tier - [-] fix multiple origin and referer checks - [-] let users choose a % to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly - this must be opt-in or spawned since it will slow things down and will make their calls less private @@ -541,7 +542,6 @@ in another repo: event subscriber - if someone subscribes to all pending transactions, how should that count against rate limits - when those rate limits are hit, what should happen? - missing pending transactions might be okay, but not missing confirmed blocks -- [ ] double check weight sorting code - [ ] sea-orm brings in async-std, but we are using tokio. benchmark switching - [ ] this query always times out, but erigon can serve it quickly: `curl -X POST -H "Content-Type: application/json" --data '{"jsonrpc":"2.0","method":"debug_traceBlockByNumber","params":["latest"],"id":1}' 127.0.0.1:8544' 127.0.0.1:8544` {"jsonrpc":"2.0","id":null,"error":{"code":-32099,"message":"deadline has elapsed"}} diff --git a/web3_proxy/src/bin/web3_proxy.rs b/web3_proxy/src/bin/web3_proxy.rs index 8f4c6ee2..c61b5476 100644 --- a/web3_proxy/src/bin/web3_proxy.rs +++ b/web3_proxy/src/bin/web3_proxy.rs @@ -315,7 +315,7 @@ mod tests { block_data_limit: None, soft_limit: 100, hard_limit: None, - weight: 1, + tier: 0, subscribe_txs: Some(false), extra: Default::default(), }, @@ -329,7 +329,7 @@ mod tests { block_data_limit: None, soft_limit: 100, hard_limit: None, - weight: 1, + tier: 0, subscribe_txs: Some(false), extra: Default::default(), }, diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index bab4124e..41fa9793 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -198,9 +198,9 @@ pub struct Web3ConnectionConfig { pub soft_limit: u32, /// the requests per second at which the server throws errors (rate limit or otherwise) pub hard_limit: Option, - /// All else equal, a server with a lower weight receives more requests. Ranges 0-100 - #[serde(default = "default_weight")] - pub weight: u32, + /// All else equal, a server with a lower tier receives all requests + #[serde(default = "default_tier")] + pub tier: u64, /// Subscribe to the firehose of pending transactions /// Don't do this with free rpcs #[serde(default)] @@ -210,7 +210,7 @@ pub struct Web3ConnectionConfig { pub extra: HashMap, } -fn default_weight() -> u32 { +fn default_tier() -> u64 { 0 } @@ -270,7 +270,7 @@ impl Web3ConnectionConfig { block_sender, tx_id_sender, true, - self.weight, + self.tier, open_request_handle_metrics, ) .await diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index 6a299133..e30a10b6 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -85,8 +85,8 @@ pub struct Web3Connection { pub(super) automatic_block_limit: bool, /// TODO: have an enum for this so that "no limit" prints pretty? pub(super) block_data_limit: AtomicU64, - /// Lower weight are higher priority when sending requests. 0 to 99. - pub(super) weight: f64, + /// Lower tiers are higher priority when sending requests + pub(super) tier: u64, /// TODO: should this be an AsyncRwLock? pub(super) head_block: RwLock>, pub(super) open_request_handle_metrics: Arc, @@ -94,7 +94,7 @@ pub struct Web3Connection { impl Web3Connection { /// Connect to a web3 rpc - // TODO: have this take a builder (which will have channels attached) + // TODO: have this take a builder (which will have channels attached). or maybe just take the config and give the config public fields #[allow(clippy::too_many_arguments)] pub async fn spawn( name: String, @@ -114,7 +114,7 @@ impl Web3Connection { block_sender: Option>, tx_id_sender: Option)>>, reconnect: bool, - weight: u32, + tier: u64, open_request_handle_metrics: Arc, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let hard_limit = hard_limit.map(|(hard_rate_limit, redis_pool)| { @@ -128,9 +128,6 @@ impl Web3Connection { ) }); - // turn weight 0 into 100% and weight 100 into 0% - let weight = (100 - weight) as f64 / 100.0; - // TODO: should we do this even if block_sender is None? then we would know limits on private relays let block_data_limit: AtomicU64 = block_data_limit.unwrap_or_default().into(); let automatic_block_limit = @@ -151,7 +148,7 @@ impl Web3Connection { automatic_block_limit, block_data_limit, head_block: RwLock::new(Default::default()), - weight, + tier, open_request_handle_metrics, }; @@ -1116,7 +1113,7 @@ impl Serialize for Web3Connection { S: Serializer, { // 3 is the number of fields in the struct. - let mut state = serializer.serialize_struct("Web3Connection", 8)?; + let mut state = serializer.serialize_struct("Web3Connection", 9)?; // the url is excluded because it likely includes private information. just show the name that we use in keys state.serialize_field("name", &self.name)?; @@ -1132,7 +1129,8 @@ impl Serialize for Web3Connection { } } - state.serialize_field("weight", &self.weight)?; + state.serialize_field("tier", &self.tier)?; + state.serialize_field("weight", &1.0)?; state.serialize_field("soft_limit", &self.soft_limit)?; @@ -1222,7 +1220,7 @@ mod tests { soft_limit: 1_000, automatic_block_limit: false, block_data_limit: block_data_limit.into(), - weight: 100.0, + tier: 0, head_block: RwLock::new(Some(head_block.clone())), open_request_handle_metrics: Arc::new(metrics), }; @@ -1269,7 +1267,7 @@ mod tests { soft_limit: 1_000, automatic_block_limit: false, block_data_limit: block_data_limit.into(), - weight: 100.0, + tier: 0, head_block: RwLock::new(Some(head_block.clone())), open_request_handle_metrics: Arc::new(metrics), }; @@ -1320,7 +1318,7 @@ mod tests { soft_limit: 1_000, automatic_block_limit: false, block_data_limit: block_data_limit.into(), - weight: 100.0, + tier: 0, head_block: RwLock::new(Some(head_block.clone())), open_request_handle_metrics: Arc::new(metrics), }; diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 1bc25844..f2497515 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -411,10 +411,9 @@ impl Web3Connections { skip: &[Arc], min_block_needed: Option<&U64>, ) -> anyhow::Result { - let usable_rpcs_by_head_num: BTreeMap>> = + let usable_rpcs_by_head_num_and_weight: BTreeMap<(U64, u64), Vec>> = if let Some(min_block_needed) = min_block_needed { // need a potentially old block. check all the rpcs - // TODO: we are going to be checking "has_block_data" a lot now let mut m = BTreeMap::new(); for x in self @@ -429,7 +428,9 @@ impl Web3Connections { match x_head_block { None => continue, Some(x_head) => { - m.entry(x_head.number()).or_insert_with(Vec::new).push(x); + let key = (x_head.number(), u64::MAX - x.tier); + + m.entry(key).or_insert_with(Vec::new).push(x); } } } @@ -437,7 +438,6 @@ impl Web3Connections { m } else { // need latest. filter the synced rpcs - // TODO: double check has_block_data? let synced_connections = self.synced_connections.load(); let head_block = match synced_connections.head_block.as_ref() { @@ -445,26 +445,32 @@ impl Web3Connections { Some(x) => x, }; - // TODO: different allowed_lag depending on the chain + // TODO: self.allowed_lag instead of taking as an arg if head_block.syncing(allowed_lag) { return Ok(OpenRequestResult::NotReady); } let head_num = head_block.number(); - let c: Vec<_> = synced_connections + let mut m = BTreeMap::new(); + + for x in synced_connections .conns .iter() .filter(|x| !skip.contains(x)) - .cloned() - .collect(); + { + let key = (head_num, u64::MAX - x.tier); - BTreeMap::from([(head_num, c)]) + m.entry(key).or_insert_with(Vec::new).push(x.clone()); + } + + m }; let mut earliest_retry_at = None; - for usable_rpcs in usable_rpcs_by_head_num.into_values().rev() { + for usable_rpcs in usable_rpcs_by_head_num_and_weight.into_values().rev() { + // under heavy load, it is possible for even our best server to be negative let mut minimum = f64::MAX; // we sort on a combination of values. cache them here so that we don't do this math multiple times. @@ -475,24 +481,22 @@ impl Web3Connections { // TODO: get active requests out of redis (that's definitely too slow) // TODO: do something with hard limit instead? (but that is hitting redis too much) let active_requests = rpc.active_requests() as f64; - let soft_limit = rpc.soft_limit as f64 * rpc.weight; + let soft_limit = rpc.soft_limit as f64; - // TODO: maybe store weight as the percentile let available_requests = soft_limit - active_requests; trace!("available requests on {}: {}", rpc, available_requests); - // under heavy load, it is possible for even our best server to be negative minimum = available_requests.min(minimum); - // TODO: clone needed? (rpc, available_requests) }) .collect(); trace!("minimum available requests: {}", minimum); - // weights can't have negative numbers. shift up if any are negative + // choose_multiple_weighted can't have negative numbers. shift up if any are negative + // TODO: is this a correct way to shift? if minimum < 0.0 { available_request_map = available_request_map .into_iter() @@ -943,7 +947,7 @@ mod tests { soft_limit: 1_000, automatic_block_limit: true, block_data_limit: block_data_limit.into(), - weight: 100.0, + tier: 0, head_block: RwLock::new(Some(head_block.clone())), open_request_handle_metrics: Arc::new(Default::default()), }; @@ -962,7 +966,7 @@ mod tests { soft_limit: 1_000, automatic_block_limit: false, block_data_limit: block_data_limit.into(), - weight: 100.0, + tier: 0, head_block: RwLock::new(Some(lagged_block.clone())), open_request_handle_metrics: Arc::new(Default::default()), }; @@ -1165,7 +1169,7 @@ mod tests { soft_limit: 3_000, automatic_block_limit: false, block_data_limit: 64.into(), - weight: 1.0, + tier: 1, head_block: RwLock::new(Some(head_block.clone())), open_request_handle_metrics: Arc::new(Default::default()), }; @@ -1184,8 +1188,7 @@ mod tests { soft_limit: 1_000, automatic_block_limit: false, block_data_limit: u64::MAX.into(), - // TODO: does weight = 0 work? - weight: 0.01, + tier: 2, head_block: RwLock::new(Some(head_block.clone())), open_request_handle_metrics: Arc::new(Default::default()), };