From 8703532ed7f17ac467a3d309c3d295b024b06f76 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sat, 27 Aug 2022 02:13:36 +0000 Subject: [PATCH] better logs and minor cleanup --- config/example.toml | 19 ++++--- web3_proxy/src/frontend/http_proxy.rs | 3 +- web3_proxy/src/frontend/users.rs | 8 +-- web3_proxy/src/frontend/ws_proxy.rs | 4 +- web3_proxy/src/rpcs/blockchain.rs | 65 +++++++++++++++-------- web3_proxy/src/rpcs/connection.rs | 76 +++++++++++++++------------ 6 files changed, 105 insertions(+), 70 deletions(-) diff --git a/config/example.toml b/config/example.toml index 3c949259..eacd59ee 100644 --- a/config/example.toml +++ b/config/example.toml @@ -1,6 +1,8 @@ [shared] chain_id = 1 db_url = "mysql://root:dev_web3_proxy@dev-db:3306/dev_web3_proxy" +min_synced_soft_limit = 2000 +min_synced_rpcs = 2 redis_url = "redis://dev-redis:6379/" redirect_public_url = "https://llamanodes.com/free-rpc-stats" redirect_user_url = "https://llamanodes.com/user-rpc-stats/{{user_id}}" @@ -20,11 +22,6 @@ response_cache_max_bytes = 10000000000 soft_limit = 1_000 weight = 0 - #[balanced_rpcs.linkpool-light] - #url = "https://main-light.eth.linkpool.io" - #soft_limit = 1_000 - #weight = 1 - [balanced_rpcs.blastapi] url = "https://eth-mainnet.public.blastapi.io" soft_limit = 1_000 @@ -42,19 +39,25 @@ response_cache_max_bytes = 10000000000 [balanced_rpcs.pokt-v1] url = "https://eth-mainnet.gateway.pokt.network/v1/5f3453978e354ab992c4da79" - soft_limit = 1_000 + soft_limit = 500 weight = 1 [balanced_rpcs.pokt] url = "https://eth-rpc.gateway.pokt.network" - soft_limit = 1_000 + soft_limit = 500 weight = 1 [balanced_rpcs.linkpool] url = "https://main-rpc.linkpool.io" - soft_limit = 1_000 + soft_limit = 500 weight = 2 + # load balanced light nodes are not very reliable + #[balanced_rpcs.linkpool-light] + #url = "https://main-light.eth.linkpool.io" + #soft_limit = 100 + #weight = 3 + [private_rpcs] [private_rpcs.eden] diff --git a/web3_proxy/src/frontend/http_proxy.rs b/web3_proxy/src/frontend/http_proxy.rs index 86930351..8e5af78d 100644 --- a/web3_proxy/src/frontend/http_proxy.rs +++ b/web3_proxy/src/frontend/http_proxy.rs @@ -5,7 +5,6 @@ use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum}; use axum::extract::Path; use axum::{http::StatusCode, response::IntoResponse, Extension, Json}; use axum_client_ip::ClientIp; -use std::net::IpAddr; use std::sync::Arc; use tracing::{error_span, Instrument}; use uuid::Uuid; @@ -15,7 +14,7 @@ pub async fn public_proxy_web3_rpc( Extension(app): Extension>, ClientIp(ip): ClientIp, ) -> FrontendResult { - let _ip: IpAddr = rate_limit_by_ip(&app, ip).await?; + let _ip = rate_limit_by_ip(&app, ip).await?; let protocol = Protocol::HTTP; let user_id = 0; diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index d0b6e263..bc3c4205 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -26,8 +26,8 @@ use redis_rate_limit::redis::AsyncCommands; use sea_orm::ActiveModelTrait; use serde::Deserialize; use siwe::Message; +use std::ops::Add; use std::sync::Arc; -use std::{net::IpAddr, ops::Add}; use time::{Duration, OffsetDateTime}; use ulid::Ulid; @@ -40,7 +40,7 @@ pub async fn get_login( // TODO: allow ENS names here? Path(mut params): Path>, ) -> FrontendResult { - let _ip: IpAddr = rate_limit_by_ip(&app, ip).await?; + let _ip = rate_limit_by_ip(&app, ip).await?; // at first i thought about checking that user_address is in our db // but theres no need to separate the registration and login flows @@ -130,7 +130,7 @@ pub async fn post_login( Json(payload): Json, Query(query): Query, ) -> FrontendResult { - let _ip: IpAddr = rate_limit_by_ip(&app, ip).await?; + let _ip = rate_limit_by_ip(&app, ip).await?; let mut new_user = true; // TODO: check the database @@ -222,7 +222,7 @@ pub async fn post_user( Extension(app): Extension>, Json(payload): Json, ) -> FrontendResult { - let _ip: IpAddr = rate_limit_by_ip(&app, ip).await?; + let _ip = rate_limit_by_ip(&app, ip).await?; ProtectedAction::PostUser .verify(app.as_ref(), auth_token, &payload.primary_address) diff --git a/web3_proxy/src/frontend/ws_proxy.rs b/web3_proxy/src/frontend/ws_proxy.rs index 2dedbbd4..7ec29d05 100644 --- a/web3_proxy/src/frontend/ws_proxy.rs +++ b/web3_proxy/src/frontend/ws_proxy.rs @@ -16,7 +16,7 @@ use futures::{ use handlebars::Handlebars; use hashbrown::HashMap; use serde_json::{json, value::RawValue}; -use std::{net::IpAddr, sync::Arc}; +use std::sync::Arc; use std::{str::from_utf8_mut, sync::atomic::AtomicUsize}; use tracing::{error, error_span, info, trace, Instrument}; use uuid::Uuid; @@ -33,7 +33,7 @@ pub async fn public_websocket_handler( ClientIp(ip): ClientIp, ws_upgrade: Option, ) -> FrontendResult { - let _ip: IpAddr = rate_limit_by_ip(&app, ip).await?; + let _ip = rate_limit_by_ip(&app, ip).await?; let user_id = 0; let protocol = Protocol::Websocket; diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 7688375f..ae73d537 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -56,8 +56,8 @@ impl<'a> BlockMetadata<'a> { } impl Web3Connections { - /// adds a block to our map of the blockchain - pub fn add_block_to_chain(&self, block: &Arc>) -> anyhow::Result<()> { + /// add a block to our map and it's hash to our graphmap of the blockchain + pub fn save_block(&self, block: &Arc>) -> anyhow::Result<()> { let hash = block.hash.ok_or_else(|| anyhow::anyhow!("no block hash"))?; if self.block_map.contains_key(&hash) { @@ -95,10 +95,12 @@ impl Web3Connections { Ok(()) } + /// Get a block from caches with fallback. + /// Will query a specific node or the best available. pub async fn block( &self, hash: &H256, - rpc: Option<&Web3Connection>, + rpc: Option<&Arc>, ) -> anyhow::Result>> { // first, try to get the hash from our cache if let Some(block) = self.block_map.get(hash) { @@ -110,24 +112,33 @@ impl Web3Connections { // TODO: helper for method+params => JsonRpcRequest // TODO: get block with the transactions? // TODO: does this id matter? - let request = json!({ "id": "1", "method": "eth_getBlockByHash", "params": (hash, false) }); - let request: JsonRpcRequest = serde_json::from_value(request)?; + + let request_params = (hash, false); // TODO: if error, retry? - let response = match rpc { + let block: Block = match rpc { Some(rpc) => { - todo!("send request to this rpc") + rpc.wait_for_request_handle() + .await? + .request("eth_getBlockByHash", request_params) + .await? + } + None => { + let request = + json!({ "id": "1", "method": "eth_getBlockByHash", "params": request_params }); + let request: JsonRpcRequest = serde_json::from_value(request)?; + + let response = self.try_send_best_upstream_server(request, None).await?; + + let block = response.result.unwrap(); + + serde_json::from_str(block.get())? } - None => self.try_send_best_upstream_server(request, None).await?, }; - let block = response.result.unwrap(); - - let block: Block = serde_json::from_str(block.get())?; - let block = Arc::new(block); - self.add_block_to_chain(&block)?; + self.save_block(&block)?; Ok(block) } @@ -235,7 +246,7 @@ impl Web3Connections { } else { connection_heads.insert(rpc.name.clone(), hash); - self.add_block_to_chain(&new_block)?; + self.save_block(&new_block)?; } } _ => { @@ -366,12 +377,12 @@ impl Web3Connections { drop(blockchain_guard); let soft_limit_met = consensus_soft_limit >= min_sum_soft_limit; + let num_synced_rpcs = consensus_rpcs.len(); let new_synced_connections = if soft_limit_met { // we have a consensus large enough to serve traffic let head_block_hash = highest_work_block.hash.unwrap(); let head_block_num = highest_work_block.number.unwrap(); - let num_synced_rpcs = consensus_rpcs.len(); if num_synced_rpcs < self.min_synced_rpcs { trace!(hash=%head_block_hash, num=?head_block_num, "not enough rpcs are synced to advance"); @@ -382,9 +393,6 @@ impl Web3Connections { // otherwise, if there is a syncing node that is fast, our first head block might not be good // TODO: have a configurable "minimum rpcs" number that we can set - // TODO: this logs too much. only log when the hash is first updated? - debug!(hash=%head_block_hash, num=%head_block_num, rpcs=%num_synced_rpcs, limit=%consensus_soft_limit, "consensus head"); - // TODO: sort by weight and soft limit? do we need an IndexSet, or is a Vec fine? let conns = consensus_rpcs.into_iter().cloned().collect(); @@ -403,12 +411,27 @@ impl Web3Connections { SyncedConnections::default() }; - let old_synced_connections = Arc::new(new_synced_connections); + let new_head_hash = new_synced_connections.head_block_hash; + let new_head_num = new_synced_connections.head_block_num; + let new_synced_connections = Arc::new(new_synced_connections); + let num_connection_heads = connection_heads.len(); + + let old_synced_connections = self.synced_connections.swap(new_synced_connections); + + let old_head_hash = old_synced_connections.head_block_hash; + let total_rpcs = self.conns.len(); + + if new_head_hash == old_head_hash { + trace!(hash=%new_head_hash, num=%new_head_num, limit=%consensus_soft_limit, "cur consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs); + } else if soft_limit_met { + // TODO: if new's parent is not old, warn? + + debug!(hash=%new_head_hash, num=%new_head_num, limit=%consensus_soft_limit, "new consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs); - if soft_limit_met && Some(old_synced_connections.head_block_hash) != highest_work_block.hash - { // the head hash changed. forward to any subscribers head_block_sender.send(highest_work_block)?; + } else { + warn!(?soft_limit_met, %new_head_hash, %old_head_hash, "no consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs) } Ok(()) diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index 28d0adf6..8dfac2c6 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -154,51 +154,61 @@ impl Web3Connection { // TODO: i think instead of atomics, we could maybe use a watch channel sleep(Duration::from_millis(250)).await; - for block_data_limit in [u64::MAX, 90_000, 128, 64, 32] { - let mut head_block_num = new_connection.head_block.read().num; + new_connection.check_block_data_limit().await?; + } - // TODO: wait until head block is set outside the loop? if we disconnect while starting we could actually get 0 though - while head_block_num == U64::zero() { - warn!(?new_connection, "no head block"); + Ok((new_connection, handle)) + } - // TODO: subscribe to a channel instead of polling? subscribe to http_interval_sender? - sleep(Duration::from_secs(1)).await; + #[instrument] + async fn check_block_data_limit(self: &Arc) -> anyhow::Result> { + let mut limit = None; - head_block_num = new_connection.head_block.read().num; - } + for block_data_limit in [u64::MAX, 90_000, 128, 64, 32] { + let mut head_block_num = self.head_block.read().num; - // TODO: subtract 1 from block_data_limit for safety? - let maybe_archive_block = head_block_num - .saturating_sub((block_data_limit).into()) - .max(U64::one()); + // TODO: wait until head block is set outside the loop? if we disconnect while starting we could actually get 0 though + while head_block_num == U64::zero() { + warn!("no head block"); - let archive_result: Result = new_connection - .wait_for_request_handle() - .await? - .request( - "eth_getCode", - ( - "0xdead00000000000000000000000000000000beef", - maybe_archive_block, - ), - ) - .await; + // TODO: subscribe to a channel instead of polling? subscribe to http_interval_sender? + sleep(Duration::from_secs(1)).await; - trace!(?archive_result, "{}", new_connection); + head_block_num = self.head_block.read().num; + } - if archive_result.is_ok() { - new_connection - .block_data_limit - .store(block_data_limit, atomic::Ordering::Release); + // TODO: subtract 1 from block_data_limit for safety? + let maybe_archive_block = head_block_num + .saturating_sub((block_data_limit).into()) + .max(U64::one()); - break; - } + let archive_result: Result = self + .wait_for_request_handle() + .await? + .request( + "eth_getCode", + ( + "0xdead00000000000000000000000000000000beef", + maybe_archive_block, + ), + ) + .await; + + trace!(?archive_result, %self); + + if archive_result.is_ok() { + limit = Some(block_data_limit); + + break; } } - info!(?new_connection, "success"); + if let Some(limit) = limit { + self.block_data_limit + .store(limit, atomic::Ordering::Release); + } - Ok((new_connection, handle)) + Ok(limit) } /// TODO: this might be too simple. different nodes can prune differently