diff --git a/TODO.md b/TODO.md index 62a45489..9422d021 100644 --- a/TODO.md +++ b/TODO.md @@ -63,6 +63,7 @@ - [x] sort forked blocks by total difficulty like geth does - [x] refactor result type on active handlers to use a cleaner success/error so we can use the try operator - [x] give users different rate limits looked up from the database +- [x] Add a "weight" key to the servers. Sort on that after block. keep most requests local - [ ] allow blocking public requests - [ ] use siwe messages and signatures for sign up and login - [ ] basic request method stats @@ -90,9 +91,7 @@ - [ ] if a rpc fails to connect at start, retry later instead of skipping it forever - [ ] synced connections swap threshold should come from config - if there are bad forks, we need to think about this more. keep backfilling until there is a common block, or just error? if the common block is old, i think we should error rather than serve data. that's kind of "downtime" but really its on the chain and not us. think about this more -- [ ] have a "backup" tier that is only used when the primary tier has no servers or is many blocks behind - - we don't want the backup tier taking over with the head block if they happen to be fast at that (but overall low/expensive rps). only if the primary tier has fallen behind or gone entirely offline should we go to third parties - - [ ] until this is done, an alternative is for infra to have a "failover" script that changes the configs to include a bunch of third party servers manually. +- [ ] improve rpc weights. i think theres still a potential thundering herd - [ ] stats when forks are resolved (and what chain they were on?) - [ ] failsafe. if no blocks or transactions in some time, warn and reset the connection - [ ] right now the block_map is unbounded. 
move this to redis and do some calculations to be sure about RAM usage diff --git a/config/example.toml b/config/example.toml index 000d88cc..3e170c87 100644 --- a/config/example.toml +++ b/config/example.toml @@ -11,38 +11,47 @@ response_cache_max_bytes = 10000000000 [balanced_rpcs.ankr] url = "https://rpc.ankr.com/eth" soft_limit = 1_000 + weight = 0 [balanced_rpcs.cloudflare] url = "https://cloudflare-eth.com" soft_limit = 1_000 + weight = 0 #[balanced_rpcs.linkpool-light] #url = "https://main-light.eth.linkpool.io" #soft_limit = 1_000 + #weight = 1 [balanced_rpcs.blastapi] url = "https://eth-mainnet.public.blastapi.io" soft_limit = 1_000 + weight = 0 #[balanced_rpcs.mycryptoapi] #url = "https://api.mycryptoapi.com/eth" #soft_limit = 1_000 + #weight = 0 [balanced_rpcs.runonflux] url = "https://ethereumnodelight.app.runonflux.io" soft_limit = 1_000 + weight = 1 [balanced_rpcs.pokt-v1] url = "https://eth-mainnet.gateway.pokt.network/v1/5f3453978e354ab992c4da79" soft_limit = 1_000 + weight = 1 [balanced_rpcs.pokt] url = "https://eth-rpc.gateway.pokt.network" soft_limit = 1_000 + weight = 1 [balanced_rpcs.linkpool] url = "https://main-rpc.linkpool.io" soft_limit = 1_000 + weight = 2 [private_rpcs] diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index 1a91bd60..7153795b 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -262,7 +262,7 @@ pub async fn get_migrated_db( Ok(db) } -// TODO: think more about TxState. 
d +// TODO: think more about TxState #[derive(Clone)] pub enum TxState { Pending(Transaction), @@ -279,6 +279,8 @@ pub struct Web3ProxyApp { balanced_rpcs: Arc, /// Send private requests (like eth_sendRawTransaction) to all these servers private_rpcs: Arc, + /// Track active requests so that we don't overload servers (not yet used) + /// active_requests: ActiveRequestsMap, /// bytes available to response_cache (it will be slightly larger than this) response_cache_max_bytes: AtomicUsize, diff --git a/web3_proxy/src/bin/web3_proxy.rs b/web3_proxy/src/bin/web3_proxy.rs index f22d177b..de0ab1f4 100644 --- a/web3_proxy/src/bin/web3_proxy.rs +++ b/web3_proxy/src/bin/web3_proxy.rs @@ -201,11 +201,11 @@ mod tests { balanced_rpcs: HashMap::from([ ( "anvil".to_string(), - Web3ConnectionConfig::new(anvil.endpoint(), 100, None), + Web3ConnectionConfig::new(anvil.endpoint(), 100, None, 1), ), ( "anvil_ws".to_string(), - Web3ConnectionConfig::new(anvil.ws_endpoint(), 100, None), + Web3ConnectionConfig::new(anvil.ws_endpoint(), 100, None, 0), ), ]), private_rpcs: None, diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 3411f0d9..f92b0f2a 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -62,6 +62,7 @@ pub struct Web3ConnectionConfig { url: String, soft_limit: u32, hard_limit: Option, + weight: u32, } impl Web3ConnectionConfig { @@ -97,6 +98,7 @@ impl Web3ConnectionConfig { block_sender, tx_id_sender, true, + self.weight, ) .await } diff --git a/web3_proxy/src/connection.rs b/web3_proxy/src/connection.rs index 25a0a388..be227309 100644 --- a/web3_proxy/src/connection.rs +++ b/web3_proxy/src/connection.rs @@ -83,6 +83,7 @@ pub struct Web3Connection { /// used for load balancing to the least loaded server soft_limit: u32, block_data_limit: AtomicU64, + weight: u32, head_block: parking_lot::RwLock<(H256, U64)>, } @@ -151,6 +152,7 @@ impl Web3Connection { block_sender: Option>, tx_id_sender: Option)>>, reconnect: bool, + weight: u32, ) -> anyhow::Result<(Arc, 
AnyhowJoinHandle<()>)> { let hard_limit = hard_limit.map(|(hard_rate_limit, redis_conection)| { // TODO: allow configurable period and max_burst @@ -175,6 +177,7 @@ impl Web3Connection { soft_limit, block_data_limit: Default::default(), head_block: parking_lot::RwLock::new((H256::zero(), 0isize.into())), + weight, }; let new_connection = Arc::new(new_connection); @@ -663,6 +666,10 @@ impl Web3Connection { Ok(HandleResult::ActiveRequest(handle)) } + + pub fn weight(&self) -> u32 { + self.weight + } } impl Hash for Web3Connection { diff --git a/web3_proxy/src/connections.rs b/web3_proxy/src/connections.rs index 21a87702..22b938bd 100644 --- a/web3_proxy/src/connections.rs +++ b/web3_proxy/src/connections.rs @@ -731,7 +731,7 @@ impl Web3Connections { impl<'a> State<'a> { // TODO: there are sortable traits, but this seems simpler /// sort the blocks in descending height - fn sortable_values(&self) -> Reverse<(&U64, &u32, &U256, &H256)> { + fn sortable_values(&self) -> (&U64, &u32, &U256, &H256) { trace!(?self.block, ?self.conns); // first we care about the block number @@ -751,7 +751,7 @@ impl Web3Connections { // TODO: what does geth do? let block_hash = self.block.hash.as_ref().unwrap(); - Reverse((block_num, sum_soft_limit, difficulty, block_hash)) + (block_num, sum_soft_limit, difficulty, block_hash) } } @@ -784,7 +784,8 @@ impl Web3Connections { }) } }) - .max_by(|a, b| a.sortable_values().cmp(&b.sortable_values())) + // sort b to a for descending order. sort a to b for ascending order + .max_by(|a, b| b.sortable_values().cmp(&a.sortable_values())) { let best_head_num = x.block.number.unwrap(); let best_head_hash = x.block.hash.unwrap(); @@ -948,14 +949,14 @@ impl Web3Connections { .iter() .map(|rpc| { // TODO: get active requests and the soft limit out of redis? 
+ let weight = rpc.weight(); let active_requests = rpc.active_requests(); let soft_limit = rpc.soft_limit(); - let block_data_limit = rpc.block_data_limit(); let utilization = active_requests as f32 / soft_limit as f32; // TODO: double check this sorts how we want - (rpc.clone(), (block_data_limit, utilization, soft_limit)) + (rpc.clone(), (weight, utilization, Reverse(soft_limit))) }) .collect();