diff --git a/TODO.md b/TODO.md index 5b51c357..1a35095c 100644 --- a/TODO.md +++ b/TODO.md @@ -295,6 +295,9 @@ These are not yet ordered. There might be duplicates. We might not actually need - [x] all_backend_connections skips syncing servers - [x] change weight back to tier - [x] fix multiple origin and referer checks +- [x] ip detection needs work so that everything doesnt show up as 172.x.x.x + - i think this was done, but am not positive. +- [x] if private txs are disabled, only send trasactions to some of our servers. we were DOSing ourselves with transactions and slowing down sync - [-] let users choose a % of reverts to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly - this must be opt-in and spawned in the background since it will slow things down and will make their calls less private - [ ] automatic pruning of old revert logs once too many are collected @@ -302,11 +305,8 @@ These are not yet ordered. There might be duplicates. We might not actually need - [-] add configurable size limits to all the Caches - instead of configuring each cache with MB sizes, have one value for total memory footprint and then percentages for each cache - https://github.com/moka-rs/moka/issues/201 -- [-] ip detection needs work so that everything doesnt show up as 172.x.x.x - - i think this was done, but am not positive. +- [ ] have private transactions be enabled by a url setting rather than a setting on the key - [ ] cli for adding rpc keys to an existing user -- [ ] automatically tune database and redis connection pool size -- [ ] if db is down, keep logins cached longer. at least only new logins will have trouble then - [ ] rate limiting/throttling on query_user_stats - [ ] minimum allowed query_start on query_user_stats - [ ] setting request limits to None is broken. it does maxu64 and then internal deferred rate limiter counts try to *99/100 @@ -452,6 +452,8 @@ These are not yet ordered. There might be duplicates. We might not actually need These are not ordered. I think some rows also accidently got deleted here. Check git history. +- [ ] automatically tune database and redis connection pool size +- [ ] if db is down, keep logins cached longer. at least only new logins will have trouble then - [ ] handle user payments - [ ] separate daemon (or users themselves) call POST /users/process_transaction - checks a transaction to see if it modifies a user's balance. records results in a sql database diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 92597cb0..42f78632 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -156,6 +156,8 @@ pub struct AuthorizationChecks { /// Chance to save reverting eth_call, eth_estimateGas, and eth_sendRawTransaction to the database. /// TODO: f32 would be fine pub log_revert_chance: f64, + /// if true, transactions are broadcast to private mempools. They will still be public on the blockchain! + pub private_txs: bool, } /// Simple wrapper so that we can keep track of read only connections. @@ -1170,8 +1172,18 @@ impl Web3ProxyApp { // TODO: eth_sendBundle (flashbots command) // broadcast transactions to all private rpcs at once "eth_sendRawTransaction" => { - // emit stats - let private_rpcs = self.private_rpcs.as_ref().unwrap_or(&self.balanced_rpcs); + let (private_rpcs, num) = if let Some(private_rpcs) = self.private_rpcs.as_ref() { + if authorization.checks.private_txs { + (private_rpcs, None) + } else { + // TODO: how many balanced rpcs should we send to? configurable? percentage of total? + // TODO: what if we do 2 per tier? we want to blast the third party rpcs + // TODO: maybe having the third party rpcs would be good for this + (&self.balanced_rpcs, Some(2)) + } + } else { + (&self.balanced_rpcs, Some(2)) + }; // try_send_all_upstream_servers puts the request id into the response. no need to do that ourselves here. let mut response = private_rpcs @@ -1181,6 +1193,7 @@ impl Web3ProxyApp { Some(request_metadata.clone()), None, Level::Trace, + num, ) .await?; @@ -1224,6 +1237,7 @@ impl Web3ProxyApp { let rpcs = request_metadata.backend_requests.lock().clone(); + // emit stats if let Some(salt) = self.config.public_recent_ips_salt.as_ref() { if let Some(tx_hash) = response.result.clone() { let now = Utc::now().timestamp(); diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 7d651955..f98cf7d0 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -750,6 +750,7 @@ impl Web3ProxyApp { log_revert_chance: rpc_key_model.log_revert_chance, max_concurrent_requests: user_tier_model.max_concurrent_requests, max_requests_per_period: user_tier_model.max_requests_per_period, + private_txs: rpc_key_model.private_txs, }) } None => Ok(AuthorizationChecks::default()), diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index 7d48fb3e..6b283ae7 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -292,10 +292,10 @@ impl Web3Connection { self.block_data_limit.load(atomic::Ordering::Acquire).into() } - pub fn syncing(&self) -> bool { + pub fn syncing(&self, allowed_lag: u64) -> bool { match self.head_block.read().clone() { None => true, - Some(x) => x.syncing(60), + Some(x) => x.syncing(allowed_lag), } } @@ -303,6 +303,7 @@ impl Web3Connection { let head_block_num = match self.head_block.read().clone() { None => return false, Some(x) => { + // TODO: this 60 second limit is causing our polygons to fall behind. change this to number of blocks? if x.syncing(60) { // skip syncing nodes. even though they might be able to serve a query, // latency will be poor and it will get in the way of them syncing further @@ -542,7 +543,7 @@ impl Web3Connection { let _ = head_block.insert(new_head_block.clone().into()); } - if self.block_data_limit() == U64::zero() && !self.syncing() { + if self.block_data_limit() == U64::zero() && !self.syncing(1) { let authorization = Arc::new(Authorization::internal(self.db_conn.clone())?); if let Err(err) = self.check_block_data_limit(&authorization).await { warn!( diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 2de4fc3d..fef01b73 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -597,23 +597,34 @@ impl Web3Connections { } /// get all rpc servers that are not rate limited - /// returns servers even if they aren't in sync. This is useful for broadcasting signed transactions + /// returns servers even if they aren't fully in sync. This is useful for broadcasting signed transactions // TODO: better type on this that can return an anyhow::Result - pub async fn all_backend_connections( + pub async fn all_synced_connections( &self, authorization: &Arc, block_needed: Option<&U64>, + max_count: Option, ) -> Result, Option> { let mut earliest_retry_at = None; // TODO: with capacity? let mut selected_rpcs = vec![]; + let mut max_count = if max_count.is_none() { + self.conns.len() + } else { + self.conns.len().min(max_count.unwrap()) + }; + for connection in self.conns.values() { + if max_count == 0 { + break; + } + if let Some(block_needed) = block_needed { if !connection.has_block_data(block_needed) { continue; } - } else if connection.syncing() { + } else if connection.syncing(30) { continue; } @@ -626,7 +637,10 @@ impl Web3Connections { // this rpc is not available. skip it earliest_retry_at = earliest_retry_at.min(Some(retry_at)); } - Ok(OpenRequestResult::Handle(handle)) => selected_rpcs.push(handle), + Ok(OpenRequestResult::Handle(handle)) => { + max_count -= 1; + selected_rpcs.push(handle) + } Ok(OpenRequestResult::NotReady) => { warn!("no request handle for {}", connection) } @@ -800,10 +814,11 @@ impl Web3Connections { request_metadata: Option>, block_needed: Option<&U64>, error_level: Level, + max_count: Option, ) -> anyhow::Result { loop { match self - .all_backend_connections(authorization, block_needed) + .all_synced_connections(authorization, block_needed, max_count) .await { Ok(active_request_handles) => { @@ -1052,7 +1067,7 @@ mod tests { // all_backend_connections gives everything regardless of sync status assert_eq!( conns - .all_backend_connections(&authorization, None) + .all_synced_connections(&authorization, None, None) .await .unwrap() .len(),