diff --git a/TODO.md b/TODO.md index a5a7f4fb..18585ce5 100644 --- a/TODO.md +++ b/TODO.md @@ -198,3 +198,5 @@ in another repo: event subscriber 2022-07-22T23:52:19.350720Z WARN block_receiver: web3_proxy::connections: chain is forked! 2 possible heads. 1/2/4 rpcs have 0x70e8…48e0 rpc=Web3Connection { url: "ws://127.0.0.1:8549", data: "archive", .. } new_block_num=15195517 2022-07-22T23:52:26.041140Z WARN block_receiver: web3_proxy::connections: chain is forked! 2 possible heads. 2/4/4 rpcs have 0x70e8…48e0 rpc=Web3Connection { url: "http://127.0.0.1:8549", data: "archive", .. } new_block_num=15195517 - [ ] threshold should check actual available request limits (if any) instead of just the soft limit +- [ ] foreign key on_update and on_delete +- [ ] database creation timestamps \ No newline at end of file diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 7b09cff3..bf5a46ce 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -92,7 +92,7 @@ fn block_num_to_u64(block_num: BlockNumber, latest_block: U64) -> (bool, U64) { } } -fn get_or_set_block_number( +fn clean_block_number( params: &mut serde_json::Value, block_param_id: usize, latest_block: U64, @@ -128,7 +128,7 @@ fn get_or_set_block_number( } // TODO: change this to return also return the hash needed -fn get_min_block_needed( +fn block_needed( method: &str, params: Option<&mut serde_json::Value>, head_block: U64, @@ -225,7 +225,7 @@ fn get_min_block_needed( } }; - match get_or_set_block_number(params, block_param_id, head_block) { + match clean_block_number(params, block_param_id, head_block) { Ok(block) => Some(block), Err(err) => { // TODO: seems unlikely that we will get here @@ -253,7 +253,7 @@ pub struct Web3ProxyApp { balanced_rpcs: Arc, /// Send private requests (like eth_sendRawTransaction) to all these servers private_rpcs: Arc, - incoming_requests: ActiveRequestsMap, + active_requests: ActiveRequestsMap, /// bytes available to response_cache (it will be slightly larger than this) response_cache_max_bytes: AtomicUsize, response_cache: ResponseLrcCache, @@ -274,11 +274,11 @@ impl fmt::Debug for Web3ProxyApp { } impl Web3ProxyApp { - pub fn get_pending_transactions(&self) -> &DashMap { + pub fn pending_transactions(&self) -> &DashMap { &self.pending_transactions } - pub fn get_public_rate_limiter(&self) -> Option<&RedisCellClient> { + pub fn public_rate_limiter(&self) -> Option<&RedisCellClient> { self.public_rate_limiter.as_ref() } @@ -435,7 +435,7 @@ impl Web3ProxyApp { let app = Self { balanced_rpcs, private_rpcs, - incoming_requests: Default::default(), + active_requests: Default::default(), response_cache_max_bytes: AtomicUsize::new(app_config.shared.response_cache_max_bytes), response_cache: Default::default(), head_block_receiver, @@ -647,16 +647,16 @@ impl Web3ProxyApp { Ok((subscription_abort_handle, response)) } - pub fn get_balanced_rpcs(&self) -> &Web3Connections { + pub fn balanced_rpcs(&self) -> &Web3Connections { &self.balanced_rpcs } - pub fn get_private_rpcs(&self) -> &Web3Connections { + pub fn private_rpcs(&self) -> &Web3Connections { &self.private_rpcs } - pub fn get_active_requests(&self) -> &ActiveRequestsMap { - &self.incoming_requests + pub fn active_requests(&self) -> &ActiveRequestsMap { + &self.active_requests } /// send the request or batch of requests to the approriate RPCs @@ -715,7 +715,7 @@ impl Web3ProxyApp { Ok(collected) } - async fn get_cached_response( + async fn cached_response( &self, // TODO: accept a block hash here also? min_block_needed: Option<&U64>, @@ -729,10 +729,10 @@ impl Web3ProxyApp { let request_block_hash = if let Some(min_block_needed) = min_block_needed { // TODO: maybe this should be on the app and not on balanced_rpcs - self.balanced_rpcs.get_block_hash(min_block_needed).await? + self.balanced_rpcs.block_hash(min_block_needed).await? } else { // TODO: maybe this should be on the app and not on balanced_rpcs - self.balanced_rpcs.get_head_block_hash() + self.balanced_rpcs.head_block_hash() }; // TODO: better key? benchmark this @@ -852,7 +852,7 @@ impl Web3ProxyApp { // some commands can use local data or caches "eth_accounts" => serde_json::Value::Array(vec![]), "eth_blockNumber" => { - let head_block_number = self.balanced_rpcs.get_head_block_num(); + let head_block_number = self.balanced_rpcs.head_block_num(); // TODO: technically, block 0 is okay. i guess we should be using an option if head_block_number.as_u64() == 0 { @@ -931,23 +931,23 @@ impl Web3ProxyApp { // TODO: web3_sha3? // anything else gets sent to backend rpcs and cached method => { - let head_block_number = self.balanced_rpcs.get_head_block_num(); + let head_block_number = self.balanced_rpcs.head_block_num(); // we do this check before checking caches because it might modify the request params // TODO: add a stat for archive vs full since they should probably cost different let min_block_needed = - get_min_block_needed(method, request.params.as_mut(), head_block_number); + block_needed(method, request.params.as_mut(), head_block_number); let min_block_needed = min_block_needed.as_ref(); trace!(?min_block_needed, ?method); let (cache_key, cache_result) = - self.get_cached_response(min_block_needed, &request).await?; + self.cached_response(min_block_needed, &request).await?; let response_cache = match cache_result { Ok(response) => { - let _ = self.incoming_requests.remove(&cache_key); + let _ = self.active_requests.remove(&cache_key); // TODO: if the response is cached, should it count less against the account's costs? @@ -960,7 +960,7 @@ impl Web3ProxyApp { // TODO: move this logic into an IncomingRequestHandler (ActiveRequestHandler has an rpc, but this won't) let (incoming_tx, incoming_rx) = watch::channel(true); let mut other_incoming_rx = None; - match self.incoming_requests.entry(cache_key.clone()) { + match self.active_requests.entry(cache_key.clone()) { DashMapEntry::Occupied(entry) => { other_incoming_rx = Some(entry.get().clone()); } @@ -977,7 +977,7 @@ impl Web3ProxyApp { // now that we've waited, lets check the cache again if let Some(cached) = response_cache.read().get(&cache_key) { - let _ = self.incoming_requests.remove(&cache_key); + let _ = self.active_requests.remove(&cache_key); let _ = incoming_tx.send(false); // TODO: emit a stat @@ -1039,7 +1039,7 @@ impl Web3ProxyApp { } } - let _ = self.incoming_requests.remove(&cache_key); + let _ = self.active_requests.remove(&cache_key); let _ = incoming_tx.send(false); return Ok(response); diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index 1fd19ebb..9f42e23f 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -54,11 +54,11 @@ impl fmt::Debug for SyncedConnections { } impl SyncedConnections { - pub fn get_head_block_hash(&self) -> &H256 { + pub fn head_block_hash(&self) -> &H256 { &self.head_block_hash } - pub fn get_head_block_num(&self) -> U64 { + pub fn head_block_num(&self) -> U64 { self.head_block_num.into() } } @@ -96,11 +96,11 @@ impl BlockChain { self.block_map.entry(hash).or_insert(block); } - pub fn get_cannonical_block(&self, num: &U64) -> Option>> { + pub fn cannonical_block(&self, num: &U64) -> Option>> { self.chain_map.get(num).map(|x| x.clone()) } - pub fn get_block(&self, hash: &H256) -> Option>> { + pub fn block(&self, hash: &H256) -> Option>> { self.block_map.get(hash).map(|x| x.clone()) } } @@ -423,9 +423,9 @@ impl Web3Connections { Ok(()) } - pub async fn get_block(&self, hash: &H256) -> anyhow::Result>> { + pub async fn block(&self, hash: &H256) -> anyhow::Result>> { // first, try to get the hash from our cache - if let Some(block) = self.chain.get_block(hash) { + if let Some(block) = self.chain.block(hash) { return Ok(block); } @@ -451,15 +451,15 @@ impl Web3Connections { } /// Get the heaviest chain's block from cache or backend rpc - pub async fn get_cannonical_block(&self, num: &U64) -> anyhow::Result>> { + pub async fn cannonical_block(&self, num: &U64) -> anyhow::Result>> { // first, try to get the hash from our cache - if let Some(block) = self.chain.get_cannonical_block(num) { + if let Some(block) = self.chain.cannonical_block(num) { return Ok(block); } // block not in cache. we need to ask an rpc for it // but before we do any queries, be sure the requested block num exists - let head_block_num = self.get_head_block_num(); + let head_block_num = self.head_block_num(); if num > &head_block_num { return Err(anyhow::anyhow!( "Head block is #{}, but #{} was requested", @@ -491,38 +491,38 @@ impl Web3Connections { } /// Convenience method to get the cannonical block at a given block height. - pub async fn get_block_hash(&self, num: &U64) -> anyhow::Result { - let block = self.get_cannonical_block(num).await?; + pub async fn block_hash(&self, num: &U64) -> anyhow::Result { + let block = self.cannonical_block(num).await?; let hash = block.hash.unwrap(); Ok(hash) } - pub fn get_head_block(&self) -> (U64, H256) { + pub fn head_block(&self) -> (U64, H256) { let synced_connections = self.synced_connections.load(); - let num = synced_connections.get_head_block_num(); - let hash = *synced_connections.get_head_block_hash(); + let num = synced_connections.head_block_num(); + let hash = *synced_connections.head_block_hash(); (num, hash) } - pub fn get_head_block_hash(&self) -> H256 { - *self.synced_connections.load().get_head_block_hash() + pub fn head_block_hash(&self) -> H256 { + *self.synced_connections.load().head_block_hash() } - pub fn get_head_block_num(&self) -> U64 { - self.synced_connections.load().get_head_block_num() + pub fn head_block_num(&self) -> U64 { + self.synced_connections.load().head_block_num() } - pub fn has_synced_rpcs(&self) -> bool { + pub fn synced(&self) -> bool { // TODO: require a minimum number of synced rpcs // TODO: move this whole function to SyncedConnections if self.synced_connections.load().conns.is_empty() { return false; } - self.get_head_block_num() > U64::zero() + self.head_block_num() > U64::zero() } pub fn num_synced_rpcs(&self) -> usize { @@ -697,7 +697,7 @@ impl Web3Connections { rpc_urls_by_num.push(rpc_url); - if let Some(parent) = self.chain.get_block(&block.parent_hash) { + if let Some(parent) = self.chain.block(&block.parent_hash) { // save the parent block blocks_by_hash.insert(block.parent_hash, parent.clone()); @@ -812,7 +812,7 @@ impl Web3Connections { conns, }; - let current_head_block = self.get_head_block_hash(); + let current_head_block = self.head_block_hash(); let new_head_block = pending_synced_connections.head_block_hash != current_head_block; @@ -971,7 +971,7 @@ impl Web3Connections { /// get all rpc servers that are not rate limited /// returns servers even if they aren't in sync. This is useful for broadcasting signed transactions - pub async fn get_upstream_servers( + pub async fn upstream_servers( &self, min_block_needed: Option<&U64>, ) -> Result, Option> { @@ -1107,7 +1107,7 @@ impl Web3Connections { min_block_needed: Option<&U64>, ) -> anyhow::Result { loop { - match self.get_upstream_servers(min_block_needed).await { + match self.upstream_servers(min_block_needed).await { Ok(active_request_handles) => { // TODO: benchmark this compared to waiting on unbounded futures // TODO: do something with this handle? diff --git a/web3-proxy/src/frontend/http.rs b/web3-proxy/src/frontend/http.rs index eed6437d..db6c3bc6 100644 --- a/web3-proxy/src/frontend/http.rs +++ b/web3-proxy/src/frontend/http.rs @@ -6,7 +6,7 @@ use crate::app::Web3ProxyApp; /// Health check page for load balancers to use pub async fn health(Extension(app): Extension>) -> impl IntoResponse { - if app.get_balanced_rpcs().has_synced_rpcs() { + if app.balanced_rpcs().synced() { (StatusCode::OK, "OK") } else { (StatusCode::SERVICE_UNAVAILABLE, ":(") @@ -16,10 +16,10 @@ pub async fn health(Extension(app): Extension>) -> impl IntoRe /// Very basic status page pub async fn status(Extension(app): Extension>) -> impl IntoResponse { // TODO: what else should we include? uptime? prometheus? - let balanced_rpcs = app.get_balanced_rpcs(); - let private_rpcs = app.get_private_rpcs(); - let num_active_requests = app.get_active_requests().len(); - let num_pending_transactions = app.get_pending_transactions().len(); + let balanced_rpcs = app.balanced_rpcs(); + let private_rpcs = app.private_rpcs(); + let num_active_requests = app.active_requests().len(); + let num_pending_transactions = app.pending_transactions().len(); let body = json!({ "balanced_rpcs": balanced_rpcs, diff --git a/web3-proxy/src/frontend/http_proxy.rs b/web3-proxy/src/frontend/http_proxy.rs index d859270f..bd69e8ba 100644 --- a/web3-proxy/src/frontend/http_proxy.rs +++ b/web3-proxy/src/frontend/http_proxy.rs @@ -10,7 +10,7 @@ pub async fn proxy_web3_rpc( Extension(app): Extension>, ClientIp(ip): ClientIp, ) -> impl IntoResponse { - if let Some(rate_limiter) = app.get_public_rate_limiter() { + if let Some(rate_limiter) = app.public_rate_limiter() { let rate_limiter_key = format!("{}", ip); if rate_limiter.throttle_key(&rate_limiter_key).await.is_err() {