From 4daf984b4b6e6a4b5fb393b4c72e16bd35b4cdf3 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sun, 12 Feb 2023 01:22:53 -0800 Subject: [PATCH] use http and ws providers in a single config --- Cargo.lock | 2 +- config/example.toml | 28 +- config/minimal.toml | 17 +- web3_proxy/Cargo.toml | 2 +- web3_proxy/src/app/mod.rs | 37 +- web3_proxy/src/bin/web3_proxy_cli/daemon.rs | 28 +- web3_proxy/src/block_number.rs | 6 + web3_proxy/src/config.rs | 46 +- web3_proxy/src/rpcs/blockchain.rs | 3 +- web3_proxy/src/rpcs/grpc_erigon.rs | 0 web3_proxy/src/rpcs/http.rs | 0 web3_proxy/src/rpcs/many.rs | 108 +-- web3_proxy/src/rpcs/one.rs | 769 +++++++++----------- web3_proxy/src/rpcs/provider.rs | 33 +- web3_proxy/src/rpcs/request.rs | 120 ++- web3_proxy/src/rpcs/transactions.rs | 4 +- web3_proxy/src/rpcs/ws.rs | 0 17 files changed, 564 insertions(+), 639 deletions(-) create mode 100644 web3_proxy/src/rpcs/grpc_erigon.rs create mode 100644 web3_proxy/src/rpcs/http.rs create mode 100644 web3_proxy/src/rpcs/ws.rs diff --git a/Cargo.lock b/Cargo.lock index feec357a..0196af87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5761,7 +5761,7 @@ dependencies = [ [[package]] name = "web3_proxy" -version = "0.13.0" +version = "0.13.1" dependencies = [ "anyhow", "argh", diff --git a/config/example.toml b/config/example.toml index e2c9d8b7..8227635f 100644 --- a/config/example.toml +++ b/config/example.toml @@ -52,50 +52,50 @@ response_cache_max_bytes = 10_000_000_000 [balanced_rpcs.ankr] display_name = "Ankr" - url = "https://rpc.ankr.com/eth" + http_url = "https://rpc.ankr.com/eth" soft_limit = 1_000 tier = 0 [balanced_rpcs.cloudflare] display_name = "Cloudflare" - url = "https://cloudflare-eth.com" + http_url = "https://cloudflare-eth.com" soft_limit = 1_000 tier = 1 [balanced_rpcs.blastapi] display_name = "Blast" - url = "https://eth-mainnet.public.blastapi.io" + http_url = "https://eth-mainnet.public.blastapi.io" soft_limit = 1_000 tier = 1 [balanced_rpcs.mycryptoapi] display_name = "MyCrypto" disabled = true - url = "https://api.mycryptoapi.com/eth" + http_url = "https://api.mycryptoapi.com/eth" soft_limit = 1_000 tier = 2 [balanced_rpcs.pokt-v1] display_name = "Pokt #1" - url = "https://eth-mainnet.gateway.pokt.network/v1/5f3453978e354ab992c4da79" + http_url = "https://eth-mainnet.gateway.pokt.network/v1/5f3453978e354ab992c4da79" soft_limit = 500 tier = 2 [balanced_rpcs.pokt] display_name = "Pokt #2" - url = "https://eth-rpc.gateway.pokt.network" + http_url = "https://eth-rpc.gateway.pokt.network" soft_limit = 500 tier = 3 [balanced_rpcs.linkpool] display_name = "Linkpool" - url = "https://main-rpc.linkpool.io" + http_url = "https://main-rpc.linkpool.io" soft_limit = 500 tier = 4 [balanced_rpcs.runonflux] display_name = "Run on Flux (light)" - url = "https://ethereumnodelight.app.runonflux.io" + http_url = "https://ethereumnodelight.app.runonflux.io" soft_limit = 1_000 tier = 5 @@ -103,7 +103,7 @@ response_cache_max_bytes = 10_000_000_000 [balanced_rpcs.linkpool-light] display_name = "Linkpool (light)" disabled = true - url = "https://main-light.eth.linkpool.io" + http_url = "https://main-light.eth.linkpool.io" soft_limit = 100 tier = 5 @@ -114,34 +114,34 @@ response_cache_max_bytes = 10_000_000_000 [private_rpcs.eden] disabled = true display_name = "Eden network" - url = "https://api.edennetwork.io/v1/" + http_url = "https://api.edennetwork.io/v1/" soft_limit = 1_805 tier = 0 [private_rpcs.eden_beta] disabled = true display_name = "Eden network beta" - url = "https://api.edennetwork.io/v1/beta" + http_url = "https://api.edennetwork.io/v1/beta" soft_limit = 5_861 tier = 0 [private_rpcs.ethermine] disabled = true display_name = "Ethermine" - url = "https://rpc.ethermine.org" + http_url = "https://rpc.ethermine.org" soft_limit = 5_861 tier = 0 [private_rpcs.flashbots] disabled = true display_name = "Flashbots Fast" - url = "https://rpc.flashbots.net/fast" + http_url = "https://rpc.flashbots.net/fast" soft_limit = 7_074 tier = 0 [private_rpcs.securerpc] disabled = true display_name = "SecureRPC" - url = "https://gibson.securerpc.com/v1" + http_url = "https://gibson.securerpc.com/v1" soft_limit = 4_560 tier = 0 diff --git a/config/minimal.toml b/config/minimal.toml index 2225c9d1..770e3484 100644 --- a/config/minimal.toml +++ b/config/minimal.toml @@ -16,17 +16,26 @@ response_cache_max_bytes = 1_000_000_000 [balanced_rpcs] - [balanced_rpcs.llama_public_wss] + [balanced_rpcs.llama_public_both] # TODO: what should we do if all rpcs are disabled? warn and wait for a config change? disabled = false - display_name = "LlamaNodes WSS" - url = "wss://eth.llamarpc.com/" + display_name = "LlamaNodes Both" + ws_url = "wss://eth.llamarpc.com/" + http_url = "https://eth.llamarpc.com/" soft_limit = 1_000 tier = 0 [balanced_rpcs.llama_public_https] disabled = false display_name = "LlamaNodes HTTPS" - url = "https://eth.llamarpc.com/" + http_url = "https://eth.llamarpc.com/" + soft_limit = 1_000 + tier = 0 + + [balanced_rpcs.llama_public_wss] + # TODO: what should we do if all rpcs are disabled? warn and wait for a config change? + disabled = false + display_name = "LlamaNodes WSS" + ws_url = "wss://eth.llamarpc.com/" soft_limit = 1_000 tier = 0 diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index d60d162d..9ebe64e5 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "web3_proxy" -version = "0.13.0" +version = "0.13.1" edition = "2021" default-run = "web3_proxy_cli" diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 4a35eb71..91e9c95d 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -482,7 +482,7 @@ impl Web3ProxyApp { let http_client = Some( reqwest::ClientBuilder::new() .connect_timeout(Duration::from_secs(5)) - .timeout(Duration::from_secs(60)) + .timeout(Duration::from_secs(5 * 60)) .user_agent(APP_USER_AGENT) .build()?, ); @@ -573,17 +573,17 @@ impl Web3ProxyApp { // connect to the load balanced rpcs let (balanced_rpcs, balanced_handle) = Web3Rpcs::spawn( + block_map.clone(), top_config.app.chain_id, db_conn.clone(), - balanced_rpcs, http_client.clone(), - vredis_pool.clone(), - block_map.clone(), - Some(watch_consensus_head_sender), - top_config.app.min_sum_soft_limit, top_config.app.min_synced_rpcs, - Some(pending_tx_sender.clone()), + top_config.app.min_sum_soft_limit, pending_transactions.clone(), + Some(pending_tx_sender.clone()), + vredis_pool.clone(), + balanced_rpcs, + Some(watch_consensus_head_sender), ) .await .context("spawning balanced rpcs")?; @@ -599,21 +599,22 @@ impl Web3ProxyApp { None } else { let (private_rpcs, private_handle) = Web3Rpcs::spawn( + block_map, top_config.app.chain_id, db_conn.clone(), - private_rpcs, http_client.clone(), + 0, + 0, + pending_transactions.clone(), + // TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits, but they should have + None, vredis_pool.clone(), - block_map, + private_rpcs, // subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs // they also often have low rate limits // however, they are well connected to miners/validators. so maybe using them as a safety check would be good + // TODO: but maybe we could include privates in the "backup" tier None, - 0, - 0, - // TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits - None, - pending_transactions.clone(), ) .await .context("spawning private_rpcs")?; @@ -1035,9 +1036,17 @@ impl Web3ProxyApp { | "db_getString" | "db_putHex" | "db_putString" + | "debug_accountRange" + | "debug_backtraceAt" + | "debug_blockProfile" | "debug_chaindbCompact" + | "debug_chaindbProperty" + | "debug_cpuProfile" + | "debug_freeOSMemory" | "debug_freezeClient" + | "debug_gcStats" | "debug_goTrace" + | "debug_memStats" | "debug_mutexProfile" | "debug_setBlockProfileRate" | "debug_setGCPercent" diff --git a/web3_proxy/src/bin/web3_proxy_cli/daemon.rs b/web3_proxy/src/bin/web3_proxy_cli/daemon.rs index 62d742e5..cf2f4cf8 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/daemon.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/daemon.rs @@ -205,31 +205,27 @@ mod tests { ( "anvil".to_string(), Web3RpcConfig { - disabled: false, - display_name: None, - url: anvil.endpoint(), - backup: Some(false), - block_data_limit: None, + http_url: Some(anvil.endpoint()), soft_limit: 100, - hard_limit: None, tier: 0, - subscribe_txs: Some(false), - extra: Default::default(), + ..Default::default() }, ), ( "anvil_ws".to_string(), Web3RpcConfig { - disabled: false, - display_name: None, - url: anvil.ws_endpoint(), - backup: Some(false), - block_data_limit: None, + ws_url: Some(anvil.ws_endpoint()), soft_limit: 100, - hard_limit: None, tier: 0, - subscribe_txs: Some(false), - extra: Default::default(), + ..Default::default() + }, + ), + ( + "anvil_both".to_string(), + Web3RpcConfig { + http_url: Some(anvil.endpoint()), + ws_url: Some(anvil.ws_endpoint()), + ..Default::default() }, ), ]), diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index ef256b84..da708286 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -132,6 +132,12 @@ pub async fn block_needed( head_block_num: U64, rpcs: &Web3Rpcs, ) -> anyhow::Result { + // some requests have potentially very large responses + // TODO: only skip caching if the response actually is large + if method.starts_with("trace_") || method == "debug_traceTransaction" { + return Ok(BlockNeeded::CacheNever); + } + let params = if let Some(params) = params { // grab the params so we can inspect and potentially modify them params diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 942632e7..2bec1bd0 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -197,15 +197,19 @@ fn default_response_cache_max_bytes() -> u64 { } /// Configuration for a backend web3 RPC server -#[derive(Clone, Debug, Deserialize)] +#[derive(Clone, Debug, Default, Deserialize)] pub struct Web3RpcConfig { /// simple way to disable a connection without deleting the row #[serde(default)] pub disabled: bool, /// a name used in /status and other user facing messages pub display_name: Option, - /// websocket (or http if no websocket) - pub url: String, + /// (deprecated) rpc url + pub url: Option, + /// while not absolutely required, a ws:// or wss:// connection will be able to subscribe to head blocks + pub ws_url: Option, + /// while not absolutely required, a http:// or https:// connection will allow erigon to stream JSON + pub http_url: Option, /// block data limit. If None, will be queried pub block_data_limit: Option, /// the requests per second at which the server starts slowing down @@ -213,14 +217,15 @@ pub struct Web3RpcConfig { /// the requests per second at which the server throws errors (rate limit or otherwise) pub hard_limit: Option, /// only use this rpc if everything else is lagging too far. this allows us to ignore fast but very low limit rpcs - pub backup: Option, + #[serde(default)] + pub backup: bool, /// All else equal, a server with a lower tier receives all requests #[serde(default = "default_tier")] pub tier: u64, /// Subscribe to the firehose of pending transactions /// Don't do this with free rpcs #[serde(default)] - pub subscribe_txs: Option, + pub subscribe_txs: bool, /// unknown config options get put here #[serde(flatten, default = "HashMap::default")] pub extra: HashMap, @@ -245,47 +250,24 @@ impl Web3RpcConfig { block_map: BlockHashesCache, block_sender: Option>, tx_id_sender: Option>, + reconnect: bool, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { if !self.extra.is_empty() { warn!("unknown Web3RpcConfig fields!: {:?}", self.extra.keys()); } - let hard_limit = match (self.hard_limit, redis_pool) { - (None, None) => None, - (Some(hard_limit), Some(redis_client_pool)) => Some((hard_limit, redis_client_pool)), - (None, Some(_)) => None, - (Some(_hard_limit), None) => { - return Err(anyhow::anyhow!( - "no redis client pool! needed for hard limit" - )) - } - }; - - let tx_id_sender = if self.subscribe_txs.unwrap_or(false) { - tx_id_sender - } else { - None - }; - - let backup = self.backup.unwrap_or(false); - Web3Rpc::spawn( + self, name, - self.display_name, chain_id, db_conn, - self.url, http_client, http_interval_sender, - hard_limit, - self.soft_limit, - backup, - self.block_data_limit, + redis_pool, block_map, block_sender, tx_id_sender, - true, - self.tier, + reconnect, ) .await } diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 679516a0..b70663f1 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -149,12 +149,13 @@ impl Web3Rpcs { // TODO: if error, retry? let block: ArcBlock = match rpc { Some(rpc) => rpc - .wait_for_request_handle(authorization, Some(Duration::from_secs(30)), false) + .wait_for_request_handle(authorization, Some(Duration::from_secs(30)), None) .await? .request::<_, Option<_>>( "eth_getBlockByHash", &json!(get_block_params), Level::Error.into(), + None, ) .await? .context("no block!")?, diff --git a/web3_proxy/src/rpcs/grpc_erigon.rs b/web3_proxy/src/rpcs/grpc_erigon.rs new file mode 100644 index 00000000..e69de29b diff --git a/web3_proxy/src/rpcs/http.rs b/web3_proxy/src/rpcs/http.rs new file mode 100644 index 00000000..e69de29b diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 32cfc8a0..086ce81b 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -56,17 +56,17 @@ impl Web3Rpcs { /// Spawn durable connections to multiple Web3 providers. #[allow(clippy::too_many_arguments)] pub async fn spawn( + block_map: BlockHashesCache, chain_id: u64, db_conn: Option, - server_configs: HashMap, http_client: Option, - redis_pool: Option, - block_map: BlockHashesCache, - watch_consensus_head_sender: Option>, - min_sum_soft_limit: u32, min_head_rpcs: usize, - pending_tx_sender: Option>, + min_sum_soft_limit: u32, pending_transactions: Cache, + pending_tx_sender: Option>, + redis_pool: Option, + server_configs: HashMap, + watch_consensus_head_sender: Option>, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); let (block_sender, block_receiver) = flume::unbounded::(); @@ -160,6 +160,7 @@ impl Web3Rpcs { block_map, block_sender, pending_tx_id_sender, + true, ) .await }); @@ -343,7 +344,7 @@ impl Web3Rpcs { .into_iter() .map(|active_request_handle| async move { let result: Result, _> = active_request_handle - .request(method, &json!(¶ms), error_level.into()) + .request(method, &json!(¶ms), error_level.into(), None) .await; result }) @@ -473,12 +474,20 @@ impl Web3Rpcs { for x in self .conns .values() - .filter(|x| if allow_backups { true } else { !x.backup }) - .filter(|x| !skip.contains(x)) - .filter(|x| x.has_block_data(min_block_needed)) .filter(|x| { - if let Some(max_block_needed) = max_block_needed { - x.has_block_data(max_block_needed) + if !allow_backups && x.backup { + false + } else if skip.contains(x) { + false + } else if !x.has_block_data(min_block_needed) { + false + } else if max_block_needed + .and_then(|max_block_needed| { + Some(!x.has_block_data(max_block_needed)) + }) + .unwrap_or(false) + { + false } else { true } @@ -521,58 +530,22 @@ impl Web3Rpcs { let mut earliest_retry_at = None; for usable_rpcs in usable_rpcs_by_head_num_and_weight.into_values().rev() { - // under heavy load, it is possible for even our best server to be negative - let mut minimum = f64::MAX; - let mut maximum = f64::MIN; - // we sort on a combination of values. cache them here so that we don't do this math multiple times. - let mut available_request_map: HashMap<_, f64> = usable_rpcs + // TODO: is this necessary if we use sort_by_cached_key? + let available_request_map: HashMap<_, f64> = usable_rpcs .iter() .map(|rpc| { - // TODO: are active requests what we want? do we want a counter for requests in the last second + any actives longer than that? - // TODO: get active requests out of redis (that's definitely too slow) - // TODO: do something with hard limit instead? (but that is hitting redis too much) - let active_requests = rpc.active_requests() as f64; - let soft_limit = rpc.soft_limit as f64; - - let available_requests = soft_limit - active_requests; - - // trace!("available requests on {}: {}", rpc, available_requests); - - minimum = minimum.min(available_requests); - maximum = maximum.max(available_requests); - - (rpc, available_requests) + // TODO: weighted sort by remaining hard limit? + // TODO: weighted sort by soft_limit - ewma_active_requests? that assumes soft limits are any good + (rpc, 1.0) }) .collect(); - // trace!("minimum available requests: {}", minimum); - // trace!("maximum available requests: {}", maximum); - - if maximum < 0.0 { - // TODO: if maximum < 0 and there are other tiers on the same block, we should include them now - warn!("soft limits overloaded: {} to {}", minimum, maximum) - } - - // choose_multiple_weighted can't have negative numbers. shift up if any are negative - // TODO: is this a correct way to shift? - if minimum < 0.0 { - available_request_map = available_request_map - .into_iter() - .map(|(rpc, available_requests)| { - // TODO: is simple addition the right way to shift everyone? - // TODO: probably want something non-linear - // minimum is negative, so we subtract to make available requests bigger - let x = available_requests - minimum; - - (rpc, x) - }) - .collect() - } + debug!("todo: better sort here"); let sorted_rpcs = { if usable_rpcs.len() == 1 { - // TODO: return now instead? we shouldn't need another alloc + // TODO: try the next tier vec![usable_rpcs.get(0).expect("there should be 1")] } else { let mut rng = thread_fast_rng::thread_fast_rng(); @@ -589,12 +562,10 @@ impl Web3Rpcs { }; // now that the rpcs are sorted, try to get an active request handle for one of them + // TODO: pick two randomly and choose the one with the lower rpc.latency.ewma for best_rpc in sorted_rpcs.into_iter() { // increment our connection counter - match best_rpc - .try_request_handle(authorization, min_block_needed.is_none()) - .await - { + match best_rpc.try_request_handle(authorization, None).await { Ok(OpenRequestResult::Handle(handle)) => { // trace!("opened handle: {}", best_rpc); return Ok(OpenRequestResult::Handle(handle)); @@ -741,10 +712,7 @@ impl Web3Rpcs { } // check rate limits and increment our connection counter - match connection - .try_request_handle(authorization, min_block_needed.is_none()) - .await - { + match connection.try_request_handle(authorization, None).await { Ok(OpenRequestResult::RetryAt(retry_at)) => { // this rpc is not available. skip it earliest_retry_at = earliest_retry_at.min(Some(retry_at)); @@ -827,6 +795,7 @@ impl Web3Rpcs { &request.method, &json!(request.params), RequestRevertHandler::Save, + None, ) .await; @@ -1214,7 +1183,6 @@ mod tests { use super::*; use crate::rpcs::{ blockchain::{ConsensusFinder, SavedBlock}, - one::ProviderState, provider::Web3Provider, }; use ethers::types::{Block, U256}; @@ -1338,9 +1306,6 @@ mod tests { let head_rpc = Web3Rpc { name: "synced".to_string(), - provider_state: AsyncRwLock::new(ProviderState::Connected(Arc::new( - Web3Provider::Mock, - ))), soft_limit: 1_000, automatic_block_limit: false, backup: false, @@ -1352,9 +1317,6 @@ mod tests { let lagged_rpc = Web3Rpc { name: "lagged".to_string(), - provider_state: AsyncRwLock::new(ProviderState::Connected(Arc::new( - Web3Provider::Mock, - ))), soft_limit: 1_000, automatic_block_limit: false, backup: false, @@ -1553,9 +1515,6 @@ mod tests { let pruned_rpc = Web3Rpc { name: "pruned".to_string(), - provider_state: AsyncRwLock::new(ProviderState::Connected(Arc::new( - Web3Provider::Mock, - ))), soft_limit: 3_000, automatic_block_limit: false, backup: false, @@ -1567,9 +1526,6 @@ mod tests { let archive_rpc = Web3Rpc { name: "archive".to_string(), - provider_state: AsyncRwLock::new(ProviderState::Connected(Arc::new( - Web3Provider::Mock, - ))), soft_limit: 1_000, automatic_block_limit: false, backup: false, diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 9a01cd80..c1db0ad5 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -3,9 +3,9 @@ use super::blockchain::{ArcBlock, BlockHashesCache, SavedBlock}; use super::provider::Web3Provider; use super::request::{OpenRequestHandle, OpenRequestResult}; use crate::app::{flatten_handle, AnyhowJoinHandle}; -use crate::config::BlockAndRpc; +use crate::config::{BlockAndRpc, Web3RpcConfig}; use crate::frontend::authorization::Authorization; -use anyhow::Context; +use anyhow::{anyhow, Context}; use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64}; use ethers::types::U256; use futures::future::try_join_all; @@ -21,50 +21,13 @@ use serde_json::json; use std::cmp::min; use std::fmt; use std::hash::{Hash, Hasher}; -use std::sync::atomic::{self, AtomicU32, AtomicU64}; +use std::sync::atomic::{self, AtomicU64}; use std::{cmp::Ordering, sync::Arc}; use thread_fast_rng::rand::Rng; use thread_fast_rng::thread_fast_rng; use tokio::sync::{broadcast, oneshot, watch, RwLock as AsyncRwLock}; use tokio::time::{sleep, sleep_until, timeout, Duration, Instant}; -// TODO: maybe provider state should have the block data limit in it. but it is inside an async lock and we can't Serialize then -#[derive(Clone, Debug)] -pub enum ProviderState { - None, - Connecting(Arc), - Connected(Arc), -} - -impl Default for ProviderState { - fn default() -> Self { - Self::None - } -} - -impl ProviderState { - pub async fn provider(&self, allow_not_ready: bool) -> Option<&Arc> { - match self { - ProviderState::None => None, - ProviderState::Connecting(x) => { - if allow_not_ready { - Some(x) - } else { - // TODO: do a ready check here? - None - } - } - ProviderState::Connected(x) => { - if x.ready() { - Some(x) - } else { - None - } - } - } - } -} - pub struct Web3RpcLatencies { /// Traack how far behind the fastest node we are pub new_head: Histogram, @@ -93,19 +56,15 @@ pub struct Web3Rpc { pub name: String, pub display_name: Option, pub db_conn: Option, - /// TODO: can we get this from the provider? do we even need it? - pub(super) url: String, + pub(super) ws_url: Option, + pub(super) http_url: Option, /// Some connections use an http_client. we keep a clone for reconnecting pub(super) http_client: Option, - /// keep track of currently open requests. We sort on this - pub(super) active_requests: AtomicU32, - /// keep track of total requests from the frontend - pub(super) frontend_requests: AtomicU64, - /// keep track of total requests from web3-proxy itself - pub(super) internal_requests: AtomicU64, /// provider is in a RwLock so that we can replace it if re-connecting /// it is an async lock because we hold it open across awaits - pub(super) provider_state: AsyncRwLock, + /// this provider is only used for new heads subscriptions + /// TODO: put the provider inside an arc? + pub(super) new_head_client: AsyncRwLock>>, /// keep track of hard limits pub(super) hard_limit_until: Option>, /// rate limits are stored in a central redis so that multiple proxies can share their rate limits @@ -121,7 +80,7 @@ pub struct Web3Rpc { pub(super) block_data_limit: AtomicU64, /// Lower tiers are higher priority when sending requests pub(super) tier: u64, - /// TODO: change this to a watch channel so that http providers can subscribe and take action on change + /// TODO: change this to a watch channel so that http providers can subscribe and take action on change. pub(super) head_block: RwLock>, /// Track how fast this RPC is pub(super) latency: Web3RpcLatencies, @@ -132,39 +91,54 @@ impl Web3Rpc { // TODO: have this take a builder (which will have channels attached). or maybe just take the config and give the config public fields #[allow(clippy::too_many_arguments)] pub async fn spawn( + mut config: Web3RpcConfig, name: String, - display_name: Option, chain_id: u64, db_conn: Option, - url_str: String, // optional because this is only used for http providers. websocket providers don't use it http_client: Option, + // TODO: rename to http_new_head_interval_sender? http_interval_sender: Option>>, - // TODO: have a builder struct for this. - hard_limit: Option<(u64, RedisPool)>, - // TODO: think more about this type - soft_limit: u32, - backup: bool, - block_data_limit: Option, + redis_pool: Option, + // TODO: think more about soft limit. watching ewma of requests is probably better. but what should the random sort be on? maybe group on tier is enough + // soft_limit: u32, block_map: BlockHashesCache, block_sender: Option>, tx_id_sender: Option)>>, reconnect: bool, - tier: u64, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { - let hard_limit = hard_limit.map(|(hard_rate_limit, redis_pool)| { - // TODO: is cache size 1 okay? i think we need - RedisRateLimiter::new( - "web3_proxy", - &format!("{}:{}", chain_id, name), - hard_rate_limit, - 60.0, - redis_pool, - ) - }); + let hard_limit = match (config.hard_limit, redis_pool) { + (None, None) => None, + (Some(hard_limit), Some(redis_pool)) => { + // TODO: in process rate limiter instead? or is deffered good enough? + let rrl = RedisRateLimiter::new( + "web3_proxy", + &format!("{}:{}", chain_id, name), + hard_limit, + 60.0, + redis_pool, + ); - // TODO: should we do this even if block_sender is None? then we would know limits on private relays - let block_data_limit: AtomicU64 = block_data_limit.unwrap_or_default().into(); + Some(rrl) + } + (None, Some(_)) => None, + (Some(_hard_limit), None) => { + return Err(anyhow::anyhow!( + "no redis client pool! needed for hard limit" + )) + } + }; + + let tx_id_sender = if config.subscribe_txs { + // TODO: warn if tx_id_sender is None? + tx_id_sender + } else { + None + }; + + let backup = config.backup; + + let block_data_limit: AtomicU64 = config.block_data_limit.unwrap_or_default().into(); let automatic_block_limit = (block_data_limit.load(atomic::Ordering::Acquire) == 0) && block_sender.is_some(); @@ -178,19 +152,36 @@ impl Web3Rpc { None }; + if config.ws_url.is_none() && config.http_url.is_none() { + if let Some(url) = config.url { + if url.starts_with("ws") { + config.ws_url = Some(url); + } else if url.starts_with("http") { + config.http_url = Some(url); + } else { + return Err(anyhow!("only ws or http urls are supported")); + } + } else { + return Err(anyhow!( + "either ws_url or http_url are required. it is best to set both" + )); + } + } + let new_connection = Self { name, db_conn: db_conn.clone(), - display_name, + display_name: config.display_name, http_client, - url: url_str, + ws_url: config.ws_url, + http_url: config.http_url, hard_limit, hard_limit_until, - soft_limit, + soft_limit: config.soft_limit, automatic_block_limit, backup, block_data_limit, - tier, + tier: config.tier, ..Default::default() }; @@ -224,6 +215,7 @@ impl Web3Rpc { async fn check_block_data_limit( self: &Arc, authorization: &Arc, + unlocked_provider: Option>, ) -> anyhow::Result> { if !self.automatic_block_limit { // TODO: is this a good thing to return? @@ -238,7 +230,7 @@ impl Web3Rpc { // TODO: start at 0 or 1? for block_data_limit in [0, 32, 64, 128, 256, 512, 1024, 90_000, u64::MAX] { let handle = self - .wait_for_request_handle(authorization, None, true) + .wait_for_request_handle(authorization, None, unlocked_provider.clone()) .await?; let head_block_num_future = handle.request::, U256>( @@ -246,6 +238,7 @@ impl Web3Rpc { &None, // error here are expected, so keep the level low Level::Debug.into(), + unlocked_provider.clone(), ); let head_block_num = timeout(Duration::from_secs(5), head_block_num_future) @@ -264,7 +257,7 @@ impl Web3Rpc { // TODO: wait for the handle BEFORE we check the current block number. it might be delayed too! // TODO: what should the request be? let handle = self - .wait_for_request_handle(authorization, None, true) + .wait_for_request_handle(authorization, None, unlocked_provider.clone()) .await?; let archive_result: Result = handle @@ -276,6 +269,7 @@ impl Web3Rpc { )), // error here are expected, so keep the level low Level::Trace.into(), + unlocked_provider.clone(), ) .await; @@ -403,119 +397,111 @@ impl Web3Rpc { chain_id: u64, db_conn: Option<&DatabaseConnection>, ) -> anyhow::Result<()> { - // trace!("provider_state {} locking...", self); - let mut provider_state = self - .provider_state - .try_write() - .context("locking provider for write")?; - // trace!("provider_state {} locked: {:?}", self, provider_state); - - match &*provider_state { - ProviderState::None => { - info!("connecting to {}", self); + if let Ok(mut unlocked_provider) = self.new_head_client.try_write() { + #[cfg(test)] + if let Some(Web3Provider::Mock) = unlocked_provider.as_deref() { + return Ok(()); } - ProviderState::Connecting(provider) | ProviderState::Connected(provider) => { - // disconnect the current provider - if let Web3Provider::Mock = provider.as_ref() { - return Ok(()); + + *unlocked_provider = if let Some(ws_url) = self.ws_url.as_ref() { + // set up ws client + match &*unlocked_provider { + None => { + info!("connecting to {}", self); + } + Some(_) => { + debug!("reconnecting to {}", self); + + // tell the block subscriber that this rpc doesn't have any blocks + if let Some(block_sender) = block_sender { + block_sender + .send_async((None, self.clone())) + .await + .context("block_sender during connect")?; + } + + // reset sync status + let mut head_block = self.head_block.write(); + *head_block = None; + + // disconnect the current provider + // TODO: what until the block_sender's receiver finishes updating this item? + *unlocked_provider = None; + } } - debug!("reconnecting to {}", self); + let p = Web3Provider::from_str(ws_url.as_str(), None) + .await + .context(format!("failed connecting to {}", ws_url))?; - // disconnect the current provider - *provider_state = ProviderState::None; + assert!(p.ws().is_some()); - // reset sync status - // trace!("locking head block on {}", self); - { - let mut head_block = self.head_block.write(); - *head_block = None; - } - // trace!("done with head block on {}", self); - - // tell the block subscriber that we don't have any blocks - if let Some(block_sender) = block_sender { - block_sender - .send_async((None, self.clone())) + Some(Arc::new(p)) + } else { + // http client + if let Some(url) = &self.http_url { + let p = Web3Provider::from_str(url, self.http_client.clone()) .await - .context("block_sender during connect")?; + .context(format!("failed connecting to {}", url))?; + + assert!(p.http().is_some()); + + Some(Arc::new(p)) + } else { + None + } + }; + + let authorization = Arc::new(Authorization::internal(db_conn.cloned())?); + + // check the server's chain_id here + // TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error + // TODO: what should the timeout be? should there be a request timeout? + // trace!("waiting on chain id for {}", self); + let found_chain_id: Result = self + .wait_for_request_handle(&authorization, None, unlocked_provider.clone()) + .await? + .request( + "eth_chainId", + &json!(Option::None::<()>), + Level::Trace.into(), + unlocked_provider.clone(), + ) + .await; + // trace!("found_chain_id: {:?}", found_chain_id); + + match found_chain_id { + Ok(found_chain_id) => { + // TODO: there has to be a cleaner way to do this + if chain_id != found_chain_id.as_u64() { + return Err(anyhow::anyhow!( + "incorrect chain id! Config has {}, but RPC has {}", + chain_id, + found_chain_id + ) + .context(format!("failed @ {}", self))); + } + } + Err(e) => { + return Err(anyhow::Error::from(e)); } } - } - // trace!("Creating new Web3Provider on {}", self); - // TODO: if this fails, keep retrying! otherwise it crashes and doesn't try again! - let new_provider = Web3Provider::from_str(&self.url, self.http_client.clone()).await?; + self.check_block_data_limit(&authorization, unlocked_provider.clone()) + .await?; - // trace!("saving provider state as NotReady on {}", self); - *provider_state = ProviderState::Connecting(Arc::new(new_provider)); + drop(unlocked_provider); - // drop the lock so that we can get a request handle - // trace!("provider_state {} unlocked", self); - drop(provider_state); - - let authorization = Arc::new(Authorization::internal(db_conn.cloned())?); - - // check the server's chain_id here - // TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error - // TODO: what should the timeout be? should there be a request timeout? - // trace!("waiting on chain id for {}", self); - let found_chain_id: Result = self - .wait_for_request_handle(&authorization, None, true) - .await? - .request( - "eth_chainId", - &json!(Option::None::<()>), - Level::Trace.into(), - ) - .await; - // trace!("found_chain_id: {:?}", found_chain_id); - - match found_chain_id { - Ok(found_chain_id) => { - // TODO: there has to be a cleaner way to do this - if chain_id != found_chain_id.as_u64() { - return Err(anyhow::anyhow!( - "incorrect chain id! Config has {}, but RPC has {}", - chain_id, - found_chain_id - ) - .context(format!("failed @ {}", self))); - } + info!("successfully connected to {}", self); + } else { + if self.new_head_client.read().await.is_none() { + return Err(anyhow!("failed waiting for client")); } - Err(e) => { - return Err(anyhow::Error::from(e)); - } - } - - self.check_block_data_limit(&authorization).await?; - - { - // trace!("locking for ready..."); - let mut provider_state = self.provider_state.write().await; - // trace!("locked for ready..."); - - // TODO: do this without a clone - let ready_provider = provider_state - .provider(true) - .await - .context("provider missing")? - .clone(); - - *provider_state = ProviderState::Connected(ready_provider); - // trace!("unlocked for ready..."); - } - - info!("successfully connected to {}", self); + }; Ok(()) } - #[inline] - pub fn active_requests(&self) -> u32 { - self.active_requests.load(atomic::Ordering::Acquire) - } - async fn send_head_block_result( self: &Arc, new_head_block: Result, ProviderError>, @@ -558,7 +544,7 @@ impl Web3Rpc { if self.block_data_limit() == U64::zero() { let authorization = Arc::new(Authorization::internal(self.db_conn.clone())?); - if let Err(err) = self.check_block_data_limit(&authorization).await { + if let Err(err) = self.check_block_data_limit(&authorization, None).await { warn!( "failed checking block limit after {} finished syncing. {:?}", self, err @@ -629,29 +615,21 @@ impl Web3Rpc { // provider is ready ready_tx.send(()).unwrap(); - // wait before doing the initial health check // TODO: how often? - // TODO: subscribe to self.head_block + // TODO: reset this timeout every time a new block is seen let health_sleep_seconds = 10; + + // wait before doing the initial health check sleep(Duration::from_secs(health_sleep_seconds)).await; loop { // TODO: what if we just happened to have this check line up with another restart? // TODO: think more about this - // trace!("health check on {}. locking...", conn); - if conn - .provider_state - .read() - .await - .provider(false) - .await - .is_none() - { + if let Some(client) = &*conn.new_head_client.read().await { // trace!("health check unlocked with error on {}", conn); // returning error will trigger a reconnect - return Err(anyhow::anyhow!("{} is not ready", conn)); + // TODO: do a query of some kind } - // trace!("health check on {}. unlocked", conn); sleep(Duration::from_secs(health_sleep_seconds)).await; } @@ -712,7 +690,7 @@ impl Web3Rpc { Ok(()) } - /// Subscribe to new blocks. If `reconnect` is true, this runs forever. + /// Subscribe to new blocks. async fn subscribe_new_heads( self: Arc, authorization: Arc, @@ -722,233 +700,218 @@ impl Web3Rpc { ) -> anyhow::Result<()> { trace!("watching new heads on {}", self); - // trace!("locking on new heads"); - let provider_state = self - .provider_state - .try_read() - .context("subscribe_new_heads")? - .clone(); - // trace!("unlocked on new heads"); + let unlocked_provider = self.new_head_client.read().await; - // TODO: need a timeout - if let ProviderState::Connected(provider) = provider_state { - match provider.as_ref() { - Web3Provider::Mock => unimplemented!(), - Web3Provider::Http(_provider) => { - // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints - // TODO: try watch_blocks and fall back to this? + match unlocked_provider.as_deref() { + Some(Web3Provider::Http(_client)) => { + // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints + // TODO: try watch_blocks and fall back to this? - let mut http_interval_receiver = http_interval_receiver.unwrap(); + let mut http_interval_receiver = http_interval_receiver.unwrap(); - let mut last_hash = H256::zero(); + let mut last_hash = H256::zero(); - loop { - // TODO: what should the max_wait be? - match self - .wait_for_request_handle(&authorization, None, false) - .await - { - Ok(active_request_handle) => { - let block: Result, _> = active_request_handle - .request( - "eth_getBlockByNumber", - &json!(("latest", false)), - Level::Warn.into(), + loop { + // TODO: what should the max_wait be? + match self + .wait_for_request_handle(&authorization, None, unlocked_provider.clone()) + .await + { + Ok(active_request_handle) => { + let block: Result, _> = active_request_handle + .request( + "eth_getBlockByNumber", + &json!(("latest", false)), + Level::Warn.into(), + None, + ) + .await; + + match block { + Ok(None) => { + warn!("no head block on {}", self); + + self.send_head_block_result( + Ok(None), + &block_sender, + block_map.clone(), ) - .await; + .await?; + } + Ok(Some(block)) => { + // don't send repeat blocks + let new_hash = + block.hash.expect("blocks here should always have hashes"); - match block { - Ok(None) => { - warn!("no head block on {}", self); + if new_hash != last_hash { + // new hash! + last_hash = new_hash; self.send_head_block_result( - Ok(None), - &block_sender, - block_map.clone(), - ) - .await?; - } - Ok(Some(block)) => { - // don't send repeat blocks - let new_hash = block - .hash - .expect("blocks here should always have hashes"); - - if new_hash != last_hash { - // new hash! - last_hash = new_hash; - - self.send_head_block_result( - Ok(Some(block)), - &block_sender, - block_map.clone(), - ) - .await?; - } - } - Err(err) => { - // we did not get a block back. something is up with the server. take it out of rotation - self.send_head_block_result( - Err(err), + Ok(Some(block)), &block_sender, block_map.clone(), ) .await?; } } + Err(err) => { + // we did not get a block back. something is up with the server. take it out of rotation + self.send_head_block_result( + Err(err), + &block_sender, + block_map.clone(), + ) + .await?; + } } - Err(err) => { - warn!("Internal error on latest block from {}. {:?}", self, err); + } + Err(err) => { + warn!("Internal error on latest block from {}. {:?}", self, err); - self.send_head_block_result( - Ok(None), - &block_sender, - block_map.clone(), - ) + self.send_head_block_result(Ok(None), &block_sender, block_map.clone()) .await?; - // TODO: what should we do? sleep? extra time? - } + // TODO: what should we do? sleep? extra time? } + } - // wait for the next interval - // TODO: if error or rate limit, increase interval? - while let Err(err) = http_interval_receiver.recv().await { - match err { - broadcast::error::RecvError::Closed => { - // channel is closed! that's not good. bubble the error up - return Err(err.into()); - } - broadcast::error::RecvError::Lagged(lagged) => { - // querying the block was delayed - // this can happen if tokio is very busy or waiting for requests limits took too long - warn!("http interval on {} lagging by {}!", self, lagged); - } + // wait for the next interval + // TODO: if error or rate limit, increase interval? + while let Err(err) = http_interval_receiver.recv().await { + match err { + broadcast::error::RecvError::Closed => { + // channel is closed! that's not good. bubble the error up + return Err(err.into()); + } + broadcast::error::RecvError::Lagged(lagged) => { + // querying the block was delayed + // this can happen if tokio is very busy or waiting for requests limits took too long + warn!("http interval on {} lagging by {}!", self, lagged); } } } } - Web3Provider::Ws(provider) => { - // todo: move subscribe_blocks onto the request handle? - let active_request_handle = self - .wait_for_request_handle(&authorization, None, false) - .await; - let mut stream = provider.subscribe_blocks().await?; - drop(active_request_handle); - - // query the block once since the subscription doesn't send the current block - // there is a very small race condition here where the stream could send us a new block right now - // all it does is print "new block" for the same block as current block - // TODO: how does this get wrapped in an arc? does ethers handle that? - let block: Result, _> = self - .wait_for_request_handle(&authorization, None, false) - .await? - .request( - "eth_getBlockByNumber", - &json!(("latest", false)), - Level::Warn.into(), - ) - .await; - - let mut last_hash = match &block { - Ok(Some(new_block)) => new_block - .hash - .expect("blocks should always have a hash here"), - _ => H256::zero(), - }; - - self.send_head_block_result(block, &block_sender, block_map.clone()) - .await?; - - while let Some(new_block) = stream.next().await { - // TODO: check the new block's hash to be sure we don't send dupes - let new_hash = new_block - .hash - .expect("blocks should always have a hash here"); - - if new_hash == last_hash { - // some rpcs like to give us duplicates. don't waste our time on them - continue; - } else { - last_hash = new_hash; - } - - self.send_head_block_result( - Ok(Some(Arc::new(new_block))), - &block_sender, - block_map.clone(), - ) - .await?; - } - - // clear the head block. this might not be needed, but it won't hurt - self.send_head_block_result(Ok(None), &block_sender, block_map) - .await?; - - // TODO: is this always an error? - // TODO: we probably don't want a warn and to return error - warn!("new_heads subscription to {} ended", self); - Err(anyhow::anyhow!("new_heads subscription ended")) - } } - } else { - Err(anyhow::anyhow!( - "Provider not ready! Unable to subscribe to heads" - )) + Some(Web3Provider::Both(_, client)) | Some(Web3Provider::Ws(client)) => { + // todo: move subscribe_blocks onto the request handle? + let active_request_handle = self + .wait_for_request_handle(&authorization, None, unlocked_provider.clone()) + .await; + let mut stream = client.subscribe_blocks().await?; + drop(active_request_handle); + + // query the block once since the subscription doesn't send the current block + // there is a very small race condition here where the stream could send us a new block right now + // but all that does is print "new block" for the same block as current block + // TODO: how does this get wrapped in an arc? does ethers handle that? + // TODO: do this part over http? + let block: Result, _> = self + .wait_for_request_handle(&authorization, None, unlocked_provider.clone()) + .await? + .request( + "eth_getBlockByNumber", + &json!(("latest", false)), + Level::Warn.into(), + unlocked_provider.clone(), + ) + .await; + + let mut last_hash = match &block { + Ok(Some(new_block)) => new_block + .hash + .expect("blocks should always have a hash here"), + _ => H256::zero(), + }; + + self.send_head_block_result(block, &block_sender, block_map.clone()) + .await?; + + while let Some(new_block) = stream.next().await { + // TODO: check the new block's hash to be sure we don't send dupes + let new_hash = new_block + .hash + .expect("blocks should always have a hash here"); + + if new_hash == last_hash { + // some rpcs like to give us duplicates. don't waste our time on them + continue; + } else { + last_hash = new_hash; + } + + self.send_head_block_result( + Ok(Some(Arc::new(new_block))), + &block_sender, + block_map.clone(), + ) + .await?; + } + + // clear the head block. this might not be needed, but it won't hurt + self.send_head_block_result(Ok(None), &block_sender, block_map) + .await?; + + // TODO: is this always an error? + // TODO: we probably don't want a warn and to return error + warn!("new_heads subscription to {} ended", self); + Err(anyhow::anyhow!("new_heads subscription ended")) + } + None => todo!("what should happen now? wait for a connection?"), + #[cfg(test)] + Some(Web3Provider::Mock) => unimplemented!(), } } + /// Turn on the firehose of pending transactions async fn subscribe_pending_transactions( self: Arc, authorization: Arc, tx_id_sender: flume::Sender<(TxHash, Arc)>, ) -> anyhow::Result<()> { - if let ProviderState::Connected(provider) = self - .provider_state - .try_read() - .context("subscribe_pending_transactions")? - .clone() - { - trace!("watching pending transactions on {}", self); - // TODO: does this keep the lock open for too long? - match provider.as_ref() { - Web3Provider::Mock => unimplemented!(), - Web3Provider::Http(provider) => { - // there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints - // TODO: maybe subscribe to self.head_block? - // TODO: this keeps a read lock guard open on provider_state forever. is that okay for an http client? - futures::future::pending::<()>().await; - } - Web3Provider::Ws(provider) => { - // TODO: maybe the subscribe_pending_txs function should be on the active_request_handle - let active_request_handle = self - .wait_for_request_handle(&authorization, None, false) - .await?; + // TODO: give this a separate client. don't use new_head_client for everything. especially a firehose this big + // TODO: timeout + let provider = self.new_head_client.read().await; - let mut stream = provider.subscribe_pending_txs().await?; - - drop(active_request_handle); - - while let Some(pending_tx_id) = stream.next().await { - tx_id_sender - .send_async((pending_tx_id, self.clone())) - .await - .context("tx_id_sender")?; - - // TODO: periodically check for listeners. if no one is subscribed, unsubscribe and wait for a subscription - } - - // TODO: is this always an error? - // TODO: we probably don't want a warn and to return error - warn!("pending_transactions subscription ended on {}", self); - return Err(anyhow::anyhow!("pending_transactions subscription ended")); - } + trace!("watching pending transactions on {}", self); + // TODO: does this keep the lock open for too long? + match provider.as_deref() { + None => { + // TODO: wait for a provider + return Err(anyhow!("no provider")); } - } else { - warn!( - "Provider not ready! Unable to watch pending transactions on {}", - self - ); + Some(Web3Provider::Http(provider)) => { + // there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints + // TODO: maybe subscribe to self.head_block? + // TODO: this keeps a read lock guard open on provider_state forever. is that okay for an http client? + futures::future::pending::<()>().await; + } + Some(Web3Provider::Both(_, client)) | Some(Web3Provider::Ws(client)) => { + // TODO: maybe the subscribe_pending_txs function should be on the active_request_handle + let active_request_handle = self + .wait_for_request_handle(&authorization, None, provider.clone()) + .await?; + + let mut stream = client.subscribe_pending_txs().await?; + + drop(active_request_handle); + + while let Some(pending_tx_id) = stream.next().await { + tx_id_sender + .send_async((pending_tx_id, self.clone())) + .await + .context("tx_id_sender")?; + + // TODO: periodically check for listeners. if no one is subscribed, unsubscribe and wait for a subscription + } + + // TODO: is this always an error? + // TODO: we probably don't want a warn and to return error + warn!("pending_transactions subscription ended on {}", self); + return Err(anyhow::anyhow!("pending_transactions subscription ended")); + } + #[cfg(test)] + Some(Web3Provider::Mock) => futures::future::pending::<()>().await, } Ok(()) @@ -957,17 +920,17 @@ impl Web3Rpc { /// be careful with this; it might wait forever! /// `allow_not_ready` is only for use by health checks while starting the provider /// TODO: don't use anyhow. use specific error type - pub async fn wait_for_request_handle( - self: &Arc, - authorization: &Arc, + pub async fn wait_for_request_handle<'a>( + self: &'a Arc, + authorization: &'a Arc, max_wait: Option, - allow_not_ready: bool, + unlocked_provider: Option>, ) -> anyhow::Result { let max_wait = max_wait.map(|x| Instant::now() + x); loop { match self - .try_request_handle(authorization, allow_not_ready) + .try_request_handle(authorization, unlocked_provider.clone()) .await { Ok(OpenRequestResult::Handle(handle)) => return Ok(handle), @@ -1015,20 +978,14 @@ impl Web3Rpc { pub async fn try_request_handle( self: &Arc, authorization: &Arc, - // TODO? ready_provider: Option<&Arc>, - allow_not_ready: bool, + // TODO: borrow on this instead of needing to clone the Arc? + unlocked_provider: Option>, ) -> anyhow::Result { // TODO: think more about this read block - if !allow_not_ready - && self - .provider_state - .read() - .await - .provider(allow_not_ready) - .await - .is_none() - { - trace!("{} is not ready", self); + // TODO: this should *not* be new_head_client. this should be a separate object + if unlocked_provider.is_some() || self.new_head_client.read().await.is_some() { + // we already have an unlocked provider. no need to lock + } else { return Ok(OpenRequestResult::NotReady(self.backup)); } @@ -1144,15 +1101,11 @@ impl Serialize for Web3Rpc { state.serialize_field("soft_limit", &self.soft_limit)?; - state.serialize_field( - "active_requests", - &self.active_requests.load(atomic::Ordering::Relaxed), - )?; - - state.serialize_field( - "total_requests", - &self.frontend_requests.load(atomic::Ordering::Relaxed), - )?; + // TODO: keep this for the "popularity_contest" command? or maybe better to just use graphana? + // state.serialize_field( + // "frontend_requests", + // &self.frontend_requests.load(atomic::Ordering::Relaxed), + // )?; { // TODO: maybe this is too much data. serialize less? @@ -1216,7 +1169,7 @@ mod tests { let x = Web3Rpc { name: "name".to_string(), - url: "ws://example.com".to_string(), + ws_url: Some("ws://example.com".to_string()), soft_limit: 1_000, automatic_block_limit: false, backup: false, diff --git a/web3_proxy/src/rpcs/provider.rs b/web3_proxy/src/rpcs/provider.rs index add17a43..a65c7cea 100644 --- a/web3_proxy/src/rpcs/provider.rs +++ b/web3_proxy/src/rpcs/provider.rs @@ -2,22 +2,45 @@ use anyhow::Context; use derive_more::From; use std::time::Duration; +// TODO: our own structs for these that handle streaming large responses +type EthersHttpProvider = ethers::providers::Provider; +type EthersWsProvider = ethers::providers::Provider; + /// Use HTTP and WS providers. // TODO: instead of an enum, I tried to use Box, but hit +// TODO: custom types that let us stream JSON responses #[derive(From)] pub enum Web3Provider { - Http(ethers::providers::Provider), - Ws(ethers::providers::Provider), - // TODO: only include this for tests. + Both(EthersHttpProvider, EthersWsProvider), + Http(EthersHttpProvider), + // TODO: deadpool? custom tokio-tungstenite + Ws(EthersWsProvider), + #[cfg(test)] Mock, } impl Web3Provider { pub fn ready(&self) -> bool { match self { - Self::Mock => true, + Self::Both(_, ws) => ws.as_ref().ready(), Self::Http(_) => true, - Self::Ws(provider) => provider.as_ref().ready(), + Self::Ws(ws) => ws.as_ref().ready(), + #[cfg(test)] + Self::Mock => true, + } + } + + pub fn http(&self) -> Option<&EthersHttpProvider> { + match self { + Self::Http(x) => Some(x), + _ => None, + } + } + + pub fn ws(&self) -> Option<&EthersWsProvider> { + match self { + Self::Both(_, x) | Self::Ws(x) => Some(x), + _ => None, } } diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index da204992..e9d4baf0 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -1,6 +1,6 @@ use super::one::Web3Rpc; use super::provider::Web3Provider; -use crate::frontend::authorization::{Authorization, AuthorizationType}; +use crate::frontend::authorization::Authorization; use anyhow::Context; use chrono::Utc; use entities::revert_log; @@ -11,7 +11,6 @@ use log::{debug, error, trace, warn, Level}; use migration::sea_orm::{self, ActiveEnum, ActiveModelTrait}; use serde_json::json; use std::fmt; -use std::sync::atomic; use std::sync::Arc; use thread_fast_rng::rand::Rng; use tokio::time::{sleep, Duration, Instant}; @@ -27,11 +26,11 @@ pub enum OpenRequestResult { } /// Make RPC requests through this handle and drop it when you are done. +/// Opening this handle checks rate limits. Developers, try to keep opening a handle and using it as close together as possible #[derive(Debug)] pub struct OpenRequestHandle { authorization: Arc, conn: Arc, - provider: Arc, } /// Depending on the context, RPC errors can require different handling. @@ -123,60 +122,9 @@ impl Authorization { impl OpenRequestHandle { pub async fn new(authorization: Arc, conn: Arc) -> Self { - // TODO: take request_id as an argument? - // TODO: attach a unique id to this? customer requests have one, but not internal queries - // TODO: what ordering?! - conn.active_requests.fetch_add(1, atomic::Ordering::Relaxed); - - let mut provider = None; - let mut logged = false; - while provider.is_none() { - // trace!("waiting on provider: locking..."); - - let ready_provider = conn - .provider_state - .read() - .await - // TODO: hard code true, or take a bool in the `new` function? - .provider(true) - .await - .cloned(); - // trace!("waiting on provider: unlocked!"); - - match ready_provider { - None => { - if !logged { - logged = true; - warn!("no provider for {}!", conn); - } - - // TODO: how should this work? a reconnect should be in progress. but maybe force one now? - // TODO: sleep how long? subscribe to something instead? maybe use a watch handle? - // TODO: this is going to be way too verbose! - sleep(Duration::from_millis(100)).await - } - Some(x) => provider = Some(x), - } - } - let provider = provider.expect("provider was checked already"); - - // TODO: handle overflows? - // TODO: what ordering? - match authorization.as_ref().authorization_type { - AuthorizationType::Frontend => { - conn.frontend_requests - .fetch_add(1, atomic::Ordering::Relaxed); - } - AuthorizationType::Internal => { - conn.internal_requests - .fetch_add(1, atomic::Ordering::Relaxed); - } - } - Self { authorization, conn, - provider, } } @@ -196,6 +144,7 @@ impl OpenRequestHandle { method: &str, params: &P, revert_handler: RequestRevertHandler, + unlocked_provider: Option>, ) -> Result where // TODO: not sure about this type. would be better to not need clones, but measure and spawns combine to need it @@ -205,12 +154,45 @@ impl OpenRequestHandle { // TODO: use tracing spans // TODO: including params in this log is way too verbose // trace!(rpc=%self.conn, %method, "request"); + trace!("requesting from {}", self.conn); + + let mut provider: Option> = None; + let mut logged = false; + while provider.is_none() { + // trace!("waiting on provider: locking..."); + + // TODO: this should *not* be new_head_client. that is dedicated to only new heads + if let Some(unlocked_provider) = unlocked_provider { + provider = Some(unlocked_provider); + break; + } + + let unlocked_provider = self.conn.new_head_client.read().await; + + if let Some(unlocked_provider) = unlocked_provider.clone() { + provider = Some(unlocked_provider); + break; + } + + if !logged { + debug!("no provider for open handle on {}", self.conn); + logged = true; + } + + sleep(Duration::from_millis(100)).await; + } + + let provider = provider.expect("provider was checked already"); // TODO: replace ethers-rs providers with our own that supports streaming the responses - let response = match &*self.provider { + let response = match provider.as_ref() { + #[cfg(test)] Web3Provider::Mock => unimplemented!(), - Web3Provider::Http(provider) => provider.request(method, params).await, - Web3Provider::Ws(provider) => provider.request(method, params).await, + Web3Provider::Ws(p) => p.request(method, params).await, + Web3Provider::Http(p) | Web3Provider::Both(p, _) => { + // TODO: i keep hearing that http is faster. but ws has always been better for me. investigate more with actual benchmarks + p.request(method, params).await + } }; // // TODO: i think ethers already has trace logging (and does it much more fancy) @@ -266,8 +248,22 @@ impl OpenRequestHandle { // check for "execution reverted" here let response_type = if let ProviderError::JsonRpcClientError(err) = err { // Http and Ws errors are very similar, but different types - let msg = match &*self.provider { + let msg = match &*provider { + #[cfg(test)] Web3Provider::Mock => unimplemented!(), + Web3Provider::Both(_, _) => { + if let Some(HttpClientError::JsonRpcError(err)) = + err.downcast_ref::() + { + Some(&err.message) + } else if let Some(WsClientError::JsonRpcError(err)) = + err.downcast_ref::() + { + Some(&err.message) + } else { + None + } + } Web3Provider::Http(_) => { if let Some(HttpClientError::JsonRpcError(err)) = err.downcast_ref::() @@ -377,11 +373,3 @@ impl OpenRequestHandle { response } } - -impl Drop for OpenRequestHandle { - fn drop(&mut self) { - self.conn - .active_requests - .fetch_sub(1, atomic::Ordering::AcqRel); - } -} diff --git a/web3_proxy/src/rpcs/transactions.rs b/web3_proxy/src/rpcs/transactions.rs index dc5710d1..466a92be 100644 --- a/web3_proxy/src/rpcs/transactions.rs +++ b/web3_proxy/src/rpcs/transactions.rs @@ -28,13 +28,15 @@ impl Web3Rpcs { // TODO: might not be a race. might be a nonce thats higher than the current account nonce. geth discards chains // TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself // TODO: if one rpc fails, try another? - let tx: Transaction = match rpc.try_request_handle(authorization, false).await { + // TODO: try_request_handle, or wait_for_request_handle? I think we want wait here + let tx: Transaction = match rpc.try_request_handle(authorization, None).await { Ok(OpenRequestResult::Handle(handle)) => { handle .request( "eth_getTransactionByHash", &(pending_tx_id,), Level::Error.into(), + None, ) .await? } diff --git a/web3_proxy/src/rpcs/ws.rs b/web3_proxy/src/rpcs/ws.rs new file mode 100644 index 00000000..e69de29b