From 53757621effe94fd533ca2a86e6bc8c9054a37e4 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sun, 12 Feb 2023 10:22:20 -0800 Subject: [PATCH] variable rename --- TODO.md | 3 +++ web3_proxy/src/rpcs/many.rs | 2 +- web3_proxy/src/rpcs/one.rs | 14 +++++++------- web3_proxy/src/rpcs/request.rs | 32 ++++++++++++++++---------------- 4 files changed, 27 insertions(+), 24 deletions(-) diff --git a/TODO.md b/TODO.md index 10974a82..5f3e18f3 100644 --- a/TODO.md +++ b/TODO.md @@ -332,6 +332,9 @@ These are not yet ordered. There might be duplicates. We might not actually need - [x] add archive depth to app config - [x] use from_block and to_block so that eth_getLogs is routed correctly - [x] improve eth_sendRawTransaction server selection +- [x] don't cache methods that are usually very large +- [x] use http provider when available +- [ ] don't use new_head_provider anywhere except new head subscription - [-] proxy mode for benchmarking all backends - [-] proxy mode for sending to multiple backends - [-] let users choose a % of reverts to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 087ab95b..83eb3922 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -541,7 +541,7 @@ impl Web3Rpcs { }) .collect(); - trace!("todo: better sort here"); + warn!("todo: better sort here"); let sorted_rpcs = { if usable_rpcs.len() == 1 { diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index c1db0ad5..dfa08a4f 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -64,7 +64,7 @@ pub struct Web3Rpc { /// it is an async lock because we hold it open across awaits /// this provider is only used for new heads subscriptions /// TODO: put the provider inside an arc? - pub(super) new_head_client: AsyncRwLock>>, + pub(super) provider: AsyncRwLock>>, /// keep track of hard limits pub(super) hard_limit_until: Option>, /// rate limits are stored in a central redis so that multiple proxies can share their rate limits @@ -397,7 +397,7 @@ impl Web3Rpc { chain_id: u64, db_conn: Option<&DatabaseConnection>, ) -> anyhow::Result<()> { - if let Ok(mut unlocked_provider) = self.new_head_client.try_write() { + if let Ok(mut unlocked_provider) = self.provider.try_write() { #[cfg(test)] if let Some(Web3Provider::Mock) = unlocked_provider.as_deref() { return Ok(()); @@ -494,7 +494,7 @@ impl Web3Rpc { info!("successfully connected to {}", self); } else { - if self.new_head_client.read().await.is_none() { + if self.provider.read().await.is_none() { return Err(anyhow!("failed waiting for client")); } }; @@ -625,7 +625,7 @@ impl Web3Rpc { loop { // TODO: what if we just happened to have this check line up with another restart? // TODO: think more about this - if let Some(client) = &*conn.new_head_client.read().await { + if let Some(client) = &*conn.provider.read().await { // trace!("health check unlocked with error on {}", conn); // returning error will trigger a reconnect // TODO: do a query of some kind @@ -700,7 +700,7 @@ impl Web3Rpc { ) -> anyhow::Result<()> { trace!("watching new heads on {}", self); - let unlocked_provider = self.new_head_client.read().await; + let unlocked_provider = self.provider.read().await; match unlocked_provider.as_deref() { Some(Web3Provider::Http(_client)) => { @@ -871,7 +871,7 @@ impl Web3Rpc { ) -> anyhow::Result<()> { // TODO: give this a separate client. don't use new_head_client for everything. especially a firehose this big // TODO: timeout - let provider = self.new_head_client.read().await; + let provider = self.provider.read().await; trace!("watching pending transactions on {}", self); // TODO: does this keep the lock open for too long? @@ -983,7 +983,7 @@ impl Web3Rpc { ) -> anyhow::Result { // TODO: think more about this read block // TODO: this should *not* be new_head_client. this should be a separate object - if unlocked_provider.is_some() || self.new_head_client.read().await.is_some() { + if unlocked_provider.is_some() || self.provider.read().await.is_some() { // we already have an unlocked provider. no need to lock } else { return Ok(OpenRequestResult::NotReady(self.backup)); diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index e9d4baf0..2c66307e 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -30,7 +30,7 @@ pub enum OpenRequestResult { #[derive(Debug)] pub struct OpenRequestHandle { authorization: Arc, - conn: Arc, + rpc: Arc, } /// Depending on the context, RPC errors can require different handling. @@ -124,17 +124,17 @@ impl OpenRequestHandle { pub async fn new(authorization: Arc, conn: Arc) -> Self { Self { authorization, - conn, + rpc: conn, } } pub fn connection_name(&self) -> String { - self.conn.name.clone() + self.rpc.name.clone() } #[inline] pub fn clone_connection(&self) -> Arc { - self.conn.clone() + self.rpc.clone() } /// Send a web3 request @@ -154,7 +154,7 @@ impl OpenRequestHandle { // TODO: use tracing spans // TODO: including params in this log is way too verbose // trace!(rpc=%self.conn, %method, "request"); - trace!("requesting from {}", self.conn); + trace!("requesting from {}", self.rpc); let mut provider: Option> = None; let mut logged = false; @@ -167,7 +167,7 @@ impl OpenRequestHandle { break; } - let unlocked_provider = self.conn.new_head_client.read().await; + let unlocked_provider = self.rpc.provider.read().await; if let Some(unlocked_provider) = unlocked_provider.clone() { provider = Some(unlocked_provider); @@ -175,7 +175,7 @@ impl OpenRequestHandle { } if !logged { - debug!("no provider for open handle on {}", self.conn); + debug!("no provider for open handle on {}", self.rpc); logged = true; } @@ -286,10 +286,10 @@ impl OpenRequestHandle { if let Some(msg) = msg { if msg.starts_with("execution reverted") { - trace!("revert from {}", self.conn); + trace!("revert from {}", self.rpc); ResponseTypes::Revert } else if msg.contains("limit") || msg.contains("request") { - trace!("rate limit from {}", self.conn); + trace!("rate limit from {}", self.rpc); ResponseTypes::RateLimit } else { ResponseTypes::Ok @@ -302,10 +302,10 @@ impl OpenRequestHandle { }; if matches!(response_type, ResponseTypes::RateLimit) { - if let Some(hard_limit_until) = self.conn.hard_limit_until.as_ref() { + if let Some(hard_limit_until) = self.rpc.hard_limit_until.as_ref() { let retry_at = Instant::now() + Duration::from_secs(1); - trace!("retry {} at: {:?}", self.conn, retry_at); + trace!("retry {} at: {:?}", self.rpc, retry_at); hard_limit_until.send_replace(retry_at); } @@ -318,14 +318,14 @@ impl OpenRequestHandle { if matches!(response_type, ResponseTypes::Revert) { debug!( "bad response from {}! method={} params={:?} err={:?}", - self.conn, method, params, err + self.rpc, method, params, err ); } } RequestRevertHandler::TraceLevel => { trace!( "bad response from {}! method={} params={:?} err={:?}", - self.conn, + self.rpc, method, params, err @@ -335,20 +335,20 @@ impl OpenRequestHandle { // TODO: include params if not running in release mode error!( "bad response from {}! method={} err={:?}", - self.conn, method, err + self.rpc, method, err ); } RequestRevertHandler::WarnLevel => { // TODO: include params if not running in release mode warn!( "bad response from {}! method={} err={:?}", - self.conn, method, err + self.rpc, method, err ); } RequestRevertHandler::Save => { trace!( "bad response from {}! method={} params={:?} err={:?}", - self.conn, + self.rpc, method, params, err