From 77a589a96d0a82075a43867c0144ee7ed7571f8d Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Wed, 10 Aug 2022 05:56:09 +0000
Subject: [PATCH] pass name through and use pub more

---
 TODO.md                               |   1 +
 web3_proxy/src/app.rs                 |  85 ++++++------------
 web3_proxy/src/config.rs              |   3 +
 web3_proxy/src/connection.rs          | 119 ++++++++++++--------------
 web3_proxy/src/connections.rs         |  54 +++++-------
 web3_proxy/src/frontend/http.rs       |  17 ++--
 web3_proxy/src/frontend/rate_limit.rs |  12 ++-
 web3_proxy/src/frontend/users.rs      |   2 +-
 8 files changed, 120 insertions(+), 173 deletions(-)

diff --git a/TODO.md b/TODO.md
index fcf72be2..18069801 100644
--- a/TODO.md
+++ b/TODO.md
@@ -66,6 +66,7 @@
 - [x] Add a "weight" key to the servers. Sort on that after block. keep most requests local
 - [x] cache db query results for user data. db is a big bottleneck right now
 - [x] allow blocking public requests
+- [ ] cache more things locally or in redis
 - [ ] use siwe messages and signatures for sign up and login
 - [ ] basic request method stats

diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs
index a00ee74d..dc731101 100644
--- a/web3_proxy/src/app.rs
+++ b/web3_proxy/src/app.rs
@@ -58,6 +58,21 @@ type ActiveRequestsMap = DashMap<CacheKey, watch::Receiver<bool>>;
 
 pub type AnyhowJoinHandle<T> = JoinHandle<anyhow::Result<T>>;
 
+// TODO: think more about TxState
+#[derive(Clone)]
+pub enum TxState {
+    Pending(Transaction),
+    Confirmed(Transaction),
+    Orphaned(Transaction),
+}
+
+#[derive(Clone, Copy, From)]
+pub struct UserCacheValue {
+    pub expires_at: Instant,
+    pub user_id: i64,
+    pub user_count_per_period: u32,
+}
+
 /// flatten a JoinError into an anyhow error
 /// Useful when joining multiple futures.
 pub async fn flatten_handle<T>(handle: AnyhowJoinHandle<T>) -> anyhow::Result<T> {
@@ -109,42 +124,26 @@ pub async fn get_migrated_db(
     Ok(db)
 }
 
-// TODO: think more about TxState
-#[derive(Clone)]
-pub enum TxState {
-    Pending(Transaction),
-    Confirmed(Transaction),
-    Orphaned(Transaction),
-}
-
-#[derive(Clone, Copy, From)]
-pub struct UserCacheValue {
-    pub expires_at: Instant,
-    pub user_id: i64,
-    pub user_count_per_period: u32,
-}
-
 /// The application
 // TODO: this debug impl is way too verbose. make something smaller
-// TODO: if Web3ProxyApp is always in an Arc, i think we can avoid having at least some of these internal things in arcs
 // TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard
 pub struct Web3ProxyApp {
     /// Send requests to the best server available
-    balanced_rpcs: Arc<Web3Connections>,
+    pub balanced_rpcs: Arc<Web3Connections>,
     /// Send private requests (like eth_sendRawTransaction) to all these servers
-    private_rpcs: Arc<Web3Connections>,
+    pub private_rpcs: Arc<Web3Connections>,
     /// Track active requests so that we don't send the same query to multiple backends
-    active_requests: ActiveRequestsMap,
+    pub active_requests: ActiveRequestsMap,
     response_cache: ResponseLrcCache,
     // don't drop this or the sender will stop working
    // TODO: broadcast channel instead?
     head_block_receiver: watch::Receiver<Arc<Block<TxHash>>>,
     pending_tx_sender: broadcast::Sender<TxState>,
-    pending_transactions: Arc<DashMap<TxHash, TxState>>,
-    user_cache: RwLock<FxHashMap<Uuid, UserCacheValue>>,
-    redis_pool: Option<RedisPool>,
-    rate_limiter: Option<RedisCell>,
-    db_conn: Option<sea_orm::DatabaseConnection>,
+    pub pending_transactions: Arc<DashMap<TxHash, TxState>>,
+    pub user_cache: RwLock<FxHashMap<Uuid, UserCacheValue>>,
+    pub redis_pool: Option<RedisPool>,
+    pub rate_limiter: Option<RedisCell>,
+    pub db_conn: Option<sea_orm::DatabaseConnection>,
 }
 
 impl fmt::Debug for Web3ProxyApp {
@@ -155,26 +154,6 @@ impl Web3ProxyApp {
-    pub fn db_conn(&self) -> Option<&sea_orm::DatabaseConnection> {
-        self.db_conn.as_ref()
-    }
-
-    pub fn pending_transactions(&self) -> &DashMap<TxHash, TxState> {
-        &self.pending_transactions
-    }
-
-    pub fn rate_limiter(&self) -> Option<&RedisCell> {
-        self.rate_limiter.as_ref()
-    }
-
-    pub fn redis_pool(&self) -> Option<&RedisPool> {
-        self.redis_pool.as_ref()
-    }
-
-    pub fn user_cache(&self) -> &RwLock<FxHashMap<Uuid, UserCacheValue>> {
-        &self.user_cache
-    }
-
     // TODO: should we just take the rpc config as the only arg instead?
     pub async fn spawn(
         app_config: AppConfig,
@@ -195,12 +174,12 @@ impl Web3ProxyApp {
             None
         };
 
-        let balanced_rpcs = app_config.balanced_rpcs.into_values().collect();
+        let balanced_rpcs = app_config.balanced_rpcs;
 
         let private_rpcs = if let Some(private_rpcs) = app_config.private_rpcs {
-            private_rpcs.into_values().collect()
+            private_rpcs
         } else {
-            vec![]
+            Default::default()
         };
 
         // TODO: try_join_all instead?
@@ -523,18 +502,6 @@ impl Web3ProxyApp {
         Ok((subscription_abort_handle, response))
     }
 
-    pub fn balanced_rpcs(&self) -> &Web3Connections {
-        &self.balanced_rpcs
-    }
-
-    pub fn private_rpcs(&self) -> &Web3Connections {
-        &self.private_rpcs
-    }
-
-    pub fn active_requests(&self) -> &ActiveRequestsMap {
-        &self.active_requests
-    }
-
     /// send the request or batch of requests to the appropriate RPCs
     #[instrument(skip_all)]
     pub async fn proxy_web3_rpc(
diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs
index 96962d7e..70aa0706 100644
--- a/web3_proxy/src/config.rs
+++ b/web3_proxy/src/config.rs
@@ -68,8 +68,10 @@ pub struct Web3ConnectionConfig {
 impl Web3ConnectionConfig {
     /// Create a Web3Connection from config
     // #[instrument(name = "try_build_Web3ConnectionConfig", skip_all)]
+    #[allow(clippy::too_many_arguments)]
     pub async fn spawn(
         self,
+        name: String,
         redis_client_pool: Option<RedisPool>,
         chain_id: u64,
         http_client: Option<reqwest::Client>,
@@ -89,6 +91,7 @@ impl Web3ConnectionConfig {
         };
 
         Web3Connection::spawn(
+            name,
             chain_id,
             self.url,
             http_client,
diff --git a/web3_proxy/src/connection.rs b/web3_proxy/src/connection.rs
index 00bce794..77a6bf6f 100644
--- a/web3_proxy/src/connection.rs
+++ b/web3_proxy/src/connection.rs
@@ -73,8 +73,9 @@ impl fmt::Debug for Web3Provider {
 
 /// An active connection to a Web3Rpc
 pub struct Web3Connection {
+    name: String,
     /// TODO: can we get this from the provider? do we even need it?
-    url: String,
+    pub url: String,
     /// keep track of currently open requests. We sort on this
     active_requests: AtomicU32,
     /// provider is in a RwLock so that we can replace it if re-connecting
@@ -83,66 +84,19 @@ pub struct Web3Connection {
     /// rate limits are stored in a central redis so that multiple proxies can share their rate limits
     hard_limit: Option<RedisCell>,
     /// used for load balancing to the least loaded server
-    soft_limit: u32,
+    pub soft_limit: u32,
     block_data_limit: AtomicU64,
-    weight: u32,
+    pub weight: u32,
     head_block: RwLock<(H256, U64)>,
 }
 
-impl Serialize for Web3Connection {
-    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
-    where
-        S: Serializer,
-    {
-        // 3 is the number of fields in the struct.
-        let mut state = serializer.serialize_struct("Web3Connection", 4)?;
-
-        // TODO: sanitize any credentials in the url
-        state.serialize_field("url", &self.url)?;
-
-        let block_data_limit = self.block_data_limit.load(atomic::Ordering::Relaxed);
-        state.serialize_field("block_data_limit", &block_data_limit)?;
-
-        state.serialize_field("soft_limit", &self.soft_limit)?;
-
-        state.serialize_field(
-            "active_requests",
-            &self.active_requests.load(atomic::Ordering::Relaxed),
-        )?;
-
-        state.end()
-    }
-}
-impl fmt::Debug for Web3Connection {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        let mut f = f.debug_struct("Web3Connection");
-
-        f.field("url", &self.url);
-
-        let block_data_limit = self.block_data_limit.load(atomic::Ordering::Relaxed);
-        if block_data_limit == u64::MAX {
-            f.field("data", &"archive");
-        } else {
-            f.field("data", &block_data_limit);
-        }
-
-        f.finish_non_exhaustive()
-    }
-}
-
-impl fmt::Display for Web3Connection {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        // TODO: filter basic auth and api keys
-        write!(f, "{}", &self.url)
-    }
-}
 
 impl Web3Connection {
     /// Connect to a web3 rpc
     // #[instrument(name = "spawn_Web3Connection", skip(hard_limit, http_client))]
     // TODO: have this take a builder (which will have channels attached)
     #[allow(clippy::too_many_arguments)]
     pub async fn spawn(
+        name: String,
         chain_id: u64,
         url_str: String,
         // optional because this is only used for http providers. websocket providers don't use it
@@ -172,6 +126,7 @@ impl Web3Connection {
         let provider = Web3Provider::from_str(&url_str, http_client).await?;
 
         let new_connection = Self {
+            name,
             url: url_str.clone(),
             active_requests: 0.into(),
             provider: AsyncRwLock::new(Some(Arc::new(provider))),
@@ -280,10 +235,6 @@ impl Web3Connection {
         Ok((new_connection, handle))
     }
 
-    pub fn url(&self) -> &str {
-        &self.url
-    }
-
     /// TODO: this might be too simple. different nodes can prune differently
     pub fn block_data_limit(&self) -> U64 {
         self.block_data_limit.load(atomic::Ordering::Acquire).into()
@@ -338,11 +289,6 @@ impl Web3Connection {
         self.active_requests.load(atomic::Ordering::Acquire)
     }
 
-    #[inline]
-    pub fn soft_limit(&self) -> u32 {
-        self.soft_limit
-    }
-
     #[inline]
     pub async fn has_provider(&self) -> bool {
         self.provider.read().await.is_some()
@@ -668,10 +614,6 @@ impl Web3Connection {
 
         Ok(HandleResult::ActiveRequest(handle))
     }
-
-    pub fn weight(&self) -> u32 {
-        self.weight
-    }
 }
 
 impl Hash for Web3Connection {
@@ -768,3 +710,52 @@ impl PartialEq for Web3Connection {
         self.url == other.url
     }
 }
+
+impl Serialize for Web3Connection {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        // 4 is the number of fields in the struct.
+        let mut state = serializer.serialize_struct("Web3Connection", 4)?;
+
+        // the url is excluded because it likely includes private information. just show the name
+        state.serialize_field("name", &self.name)?;
+
+        let block_data_limit = self.block_data_limit.load(atomic::Ordering::Relaxed);
+        state.serialize_field("block_data_limit", &block_data_limit)?;
+
+        state.serialize_field("soft_limit", &self.soft_limit)?;
+
+        state.serialize_field(
+            "active_requests",
+            &self.active_requests.load(atomic::Ordering::Relaxed),
+        )?;
+
+        state.end()
+    }
+}
+
+impl fmt::Debug for Web3Connection {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let mut f = f.debug_struct("Web3Connection");
+
+        f.field("url", &self.url);
+
+        let block_data_limit = self.block_data_limit.load(atomic::Ordering::Relaxed);
+        if block_data_limit == u64::MAX {
+            f.field("data", &"archive");
+        } else {
+            f.field("data", &block_data_limit);
+        }
+
+        f.finish_non_exhaustive()
+    }
+}
+
+impl fmt::Display for Web3Connection {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        // TODO: filter basic auth and api keys
+        write!(f, "{}", &self.url)
+    }
+}
diff --git a/web3_proxy/src/connections.rs b/web3_proxy/src/connections.rs
index f4904995..9a3df63b 100644
--- a/web3_proxy/src/connections.rs
+++ b/web3_proxy/src/connections.rs
@@ -31,10 +31,11 @@ use crate::config::Web3ConnectionConfig;
 use crate::connection::{ActiveRequestHandle, HandleResult, Web3Connection};
 use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
 
-// Serialize so we can print it on our debug endpoint
+/// A collection of Web3Connections that are on the same block
+/// Serialize is so we can print it on our debug endpoint
 #[derive(Clone, Default, Serialize)]
 struct SyncedConnections {
-    head_block_num: u64,
+    head_block_num: U64,
     head_block_hash: H256,
     // TODO: this should be able to serialize, but it isn't
     #[serde(skip_serializing)]
@@ -53,16 +54,6 @@ impl fmt::Debug for SyncedConnections {
     }
 }
 
-impl SyncedConnections {
-    pub fn head_block_hash(&self) -> &H256 {
-        &self.head_block_hash
-    }
-
-    pub fn head_block_num(&self) -> U64 {
-        self.head_block_num.into()
-    }
-}
-
 #[derive(Default)]
 pub struct BlockChain {
     /// only includes blocks on the main chain.
@@ -144,7 +135,7 @@ impl Web3Connections {
     // #[instrument(name = "spawn_Web3Connections", skip_all)]
     pub async fn spawn(
         chain_id: u64,
-        server_configs: Vec<Web3ConnectionConfig>,
+        server_configs: HashMap<String, Web3ConnectionConfig>,
         http_client: Option<reqwest::Client>,
         redis_client_pool: Option<RedisPool>,
         head_block_sender: Option<watch::Sender<Arc<Block<TxHash>>>>,
@@ -193,7 +184,7 @@ impl Web3Connections {
         // turn configs into connections (in parallel)
         let spawn_handles: Vec<_> = server_configs
             .into_iter()
-            .map(|server_config| {
+            .map(|(server_name, server_config)| {
                 let http_client = http_client.clone();
                 let redis_client_pool = redis_client_pool.clone();
                 let http_interval_sender = http_interval_sender.clone();
@@ -203,6 +194,7 @@
                 tokio::spawn(async move {
                     server_config
                         .spawn(
+                            server_name,
                            redis_client_pool,
                             chain_id,
                             http_client,
@@ -223,7 +215,7 @@
             // TODO: how should we handle errors here? one rpc being down shouldn't cause the program to exit
             match x {
                 Ok(Ok((connection, handle))) => {
-                    connections.insert(connection.url().to_string(), connection);
+                    connections.insert(connection.url.clone(), connection);
                     handles.push(handle);
                 }
                 Ok(Err(err)) => {
@@ -506,18 +498,18 @@ impl Web3Connections {
     pub fn head_block(&self) -> (U64, H256) {
         let synced_connections = self.synced_connections.load();
 
-        let num = synced_connections.head_block_num();
-        let hash = *synced_connections.head_block_hash();
-
-        (num, hash)
+        (
+            synced_connections.head_block_num,
+            synced_connections.head_block_hash,
+        )
     }
 
     pub fn head_block_hash(&self) -> H256 {
-        *self.synced_connections.load().head_block_hash()
+        self.synced_connections.load().head_block_hash
     }
 
     pub fn head_block_num(&self) -> U64 {
-        self.synced_connections.load().head_block_num()
+        self.synced_connections.load().head_block_num
     }
 
     pub fn synced(&self) -> bool {
@@ -606,7 +598,7 @@ impl Web3Connections {
         let mut connection_heads = IndexMap::<String, Arc<Block<TxHash>>>::new();
 
         while let Ok((new_block, rpc)) = block_receiver.recv_async().await {
-            if let Some(current_block) = connection_heads.get(rpc.url()) {
+            if let Some(current_block) = connection_heads.get(&rpc.url) {
                 if current_block.hash == new_block.hash {
                     // duplicate block
                     continue;
@@ -618,7 +610,7 @@
             } else {
                 warn!(%rpc, ?new_block, "Block without hash!");
 
-                connection_heads.remove(rpc.url());
+                connection_heads.remove(&rpc.url);
 
                 continue;
             };
@@ -631,7 +623,7 @@
                 // maybe when a node is syncing or reconnecting?
                 warn!(%rpc, ?new_block, "Block without number!");
 
-                connection_heads.remove(rpc.url());
+                connection_heads.remove(&rpc.url);
 
                 continue;
             };
@@ -646,10 +638,10 @@
             if new_block_num == U64::zero() {
                 warn!(%rpc, %new_block_num, "still syncing");
 
-                connection_heads.remove(rpc.url());
+                connection_heads.remove(&rpc.url);
             } else {
                 // TODO: no clone? we end up with different blocks for every rpc
-                connection_heads.insert(rpc.url().to_string(), new_block.clone());
+                connection_heads.insert(rpc.url.clone(), new_block.clone());
 
                 self.chain.add_block(new_block.clone(), false);
             }
@@ -675,7 +667,7 @@
             for (rpc_url, head_block) in connection_heads.iter() {
                 if let Some(rpc) = self.conns.get(rpc_url) {
                     // we need the total soft limit in order to know when its safe to update the backends
-                    total_soft_limit += rpc.soft_limit();
+                    total_soft_limit += rpc.soft_limit;
 
                     let head_hash = head_block.hash.unwrap();
@@ -763,7 +755,7 @@
                 .iter()
                 .map(|rpc_url| {
                     if let Some(rpc) = self.conns.get(*rpc_url) {
-                        rpc.soft_limit()
+                        rpc.soft_limit
                     } else {
                         0
                     }
@@ -824,7 +816,7 @@
                 .collect();
 
             let pending_synced_connections = SyncedConnections {
-                head_block_num: best_head_num.as_u64(),
+                head_block_num: best_head_num,
                 head_block_hash: best_head_hash,
                 conns,
             };
@@ -948,9 +940,9 @@
             .iter()
             .map(|rpc| {
                // TODO: get active requests and the soft limit out of redis?
-                let weight = rpc.weight();
+                let weight = rpc.weight;
                 let active_requests = rpc.active_requests();
-                let soft_limit = rpc.soft_limit();
+                let soft_limit = rpc.soft_limit;
 
                 let utilization = active_requests as f32 / soft_limit as f32;
diff --git a/web3_proxy/src/frontend/http.rs b/web3_proxy/src/frontend/http.rs
index de5fc935..815941a3 100644
--- a/web3_proxy/src/frontend/http.rs
+++ b/web3_proxy/src/frontend/http.rs
@@ -6,7 +6,7 @@ use crate::app::Web3ProxyApp;
 
 /// Health check page for load balancers to use
 pub async fn health(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
-    if app.balanced_rpcs().synced() {
+    if app.balanced_rpcs.synced() {
         (StatusCode::OK, "OK")
     } else {
         (StatusCode::SERVICE_UNAVAILABLE, ":(")
@@ -17,17 +17,12 @@
 /// TODO: replace this with proper stats and monitoring
 pub async fn status(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
     // TODO: what else should we include? uptime? prometheus?
-    let balanced_rpcs = app.balanced_rpcs();
-    let private_rpcs = app.private_rpcs();
-    let num_active_requests = app.active_requests().len();
-    let num_pending_transactions = app.pending_transactions().len();
-
     let body = json!({
-        "balanced_rpcs": balanced_rpcs,
-        "private_rpcs": private_rpcs,
-        "num_active_requests": num_active_requests,
-        "num_pending_transactions": num_pending_transactions,
+        "balanced_rpcs": app.balanced_rpcs,
+        "private_rpcs": app.private_rpcs,
+        "num_active_requests": app.active_requests.len(),
+        "num_pending_transactions": app.pending_transactions.len(),
     });
 
-    (StatusCode::INTERNAL_SERVER_ERROR, Json(body))
+    (StatusCode::OK, Json(body))
 }
diff --git a/web3_proxy/src/frontend/rate_limit.rs b/web3_proxy/src/frontend/rate_limit.rs
index 246b1827..c55d096b 100644
--- a/web3_proxy/src/frontend/rate_limit.rs
+++ b/web3_proxy/src/frontend/rate_limit.rs
@@ -25,7 +25,7 @@ impl Web3ProxyApp {
         let rate_limiter_key = format!("ip-{}", ip);
 
         // TODO: dry this up with rate_limit_by_key
-        if let Some(rate_limiter) = self.rate_limiter() {
+        if let Some(rate_limiter) = &self.rate_limiter {
             match rate_limiter
                 .throttle_key(&rate_limiter_key, None, None, None)
                 .await
@@ -52,10 +52,8 @@
     }
 
     pub async fn rate_limit_by_key(&self, user_key: Uuid) -> anyhow::Result {
-        let user_cache = self.user_cache();
-
         // check the local cache
-        let user_data = if let Some(cached_user) = user_cache.read().get(&user_key) {
+        let user_data = if let Some(cached_user) = self.user_cache.read().get(&user_key) {
             // TODO: also include the time this value was last checked! otherwise we cache forever!
             if cached_user.expires_at < Instant::now() {
                 // old record found
@@ -71,7 +69,7 @@
 
         // if cache was empty, check the database
         let user_data = if user_data.is_none() {
-            if let Some(db) = self.db_conn() {
+            if let Some(db) = &self.db_conn {
                 /// helper enum for querying just a few columns instead of the entire table
                 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
                 enum QueryAs {
@@ -105,7 +103,7 @@
             };
 
             // save for the next run
-            user_cache.write().insert(user_key, user_data);
+            self.user_cache.write().insert(user_key, user_data);
 
             user_data
         } else {
@@ -118,7 +116,7 @@
         };
 
         // user key is valid. now check rate limits
-        if let Some(rate_limiter) = self.rate_limiter() {
+        if let Some(rate_limiter) = &self.rate_limiter {
            // TODO: how does max burst actually work? what should it be?
             let user_max_burst = user_data.user_count_per_period / 3;
             let user_period = 60;
diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs
index 4ab2027a..9db3c1e9 100644
--- a/web3_proxy/src/frontend/users.rs
+++ b/web3_proxy/src/frontend/users.rs
@@ -53,7 +53,7 @@ pub async fn create_user(
         ..Default::default()
     };
 
-    let db = app.db_conn().unwrap();
+    let db = app.db_conn.as_ref().unwrap();
 
     // TODO: proper error message
     let user = user.insert(db).await.unwrap();
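
Reviewer note: after this patch, the /status debug endpoint identifies each backend by its configured name and never serializes the url. A minimal standalone sketch of that serde pattern (hypothetical Conn struct and illustrative values, not the real Web3Connection, which serializes four fields; assumes the serde and serde_json crates):

    use serde::ser::{SerializeStruct, Serializer};
    use serde::Serialize;

    // stand-in for Web3Connection: the name is public output, while the
    // url is kept out of the serialized form because it may embed credentials
    struct Conn {
        name: String,
        url: String,
        soft_limit: u32,
    }

    impl Serialize for Conn {
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where
            S: Serializer,
        {
            // 2 is the number of fields that actually get serialized
            let mut state = serializer.serialize_struct("Conn", 2)?;
            // the url is excluded; only the name identifies the server
            state.serialize_field("name", &self.name)?;
            state.serialize_field("soft_limit", &self.soft_limit)?;
            state.end()
        }
    }

    fn main() {
        let conn = Conn {
            name: "example".to_string(),
            url: "wss://user:secret@rpc.example/ws".to_string(),
            soft_limit: 200,
        };

        // prints {"name":"example","soft_limit":200} -- the url never leaks
        println!("{}", serde_json::to_string(&conn).unwrap());
    }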