diff --git a/Cargo.lock b/Cargo.lock index 912ff19d..ea4a41c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2162,6 +2162,20 @@ version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7" +[[package]] +name = "handlebars" +version = "4.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "360d9740069b2f6cbb63ce2dbaa71a20d3185350cbb990d7bebeb9318415eb17" +dependencies = [ + "log", + "pest", + "pest_derive", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -3151,6 +3165,50 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "pest" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69486e2b8c2d2aeb9762db7b4e00b0331156393555cff467f4163ff06821eef8" +dependencies = [ + "thiserror", + "ucd-trie", +] + +[[package]] +name = "pest_derive" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b13570633aff33c6d22ce47dd566b10a3b9122c2fe9d8e7501895905be532b91" +dependencies = [ + "pest", + "pest_generator", +] + +[[package]] +name = "pest_generator" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3c567e5702efdc79fb18859ea74c3eb36e14c43da7b8c1f098a4ed6514ec7a0" +dependencies = [ + "pest", + "pest_meta", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "pest_meta" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eb32be5ee3bbdafa8c7a18b0a8a8d962b66cfa2ceee4037f49267a50ee821fe" +dependencies = [ + "once_cell", + "pest", + "sha-1", +] + [[package]] name = "petgraph" version = "0.6.2" @@ -4947,6 +5005,12 @@ version = "1.15.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987" +[[package]] +name = "ucd-trie" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89570599c4fe5585de2b388aab47e99f7fa4e9238a1399f707a02e356058141c" + [[package]] name = "uint" version = "0.9.3" @@ -5220,6 +5284,7 @@ dependencies = [ "flume", "fstrings", "futures", + "handlebars", "hashbrown", "indexmap", "migration", diff --git a/README.md b/README.md index 6becefc5..9d1b4622 100644 --- a/README.md +++ b/README.md @@ -107,6 +107,6 @@ Test erigon (assuming it is on 8945): Note: Testing with `getLatestBlockByNumber.lua` is not great because the latest block changes and so one run is likely to be very different than another. -Run ethspam for a more realistic load test: +Run [ethspam](https://github.com/shazow/ethspam) and [versus](https://github.com/INFURA/versus) for a more realistic load test: ethspam --rpc http://127.0.0.1:8544/u/someuserkey | versus --concurrency=100 --stop-after=10000 http://127.0.0.1:8544/u/someuserkey diff --git a/TODO.md b/TODO.md index 18325eca..3de6819f 100644 --- a/TODO.md +++ b/TODO.md @@ -70,9 +70,9 @@ - [x] im seeing ethspam occasionally try to query a future block. something must be setting the head block too early - [x] we were sorting best block the wrong direction. i flipped a.cmp(b) to b.cmp(a) so that the largest would be first, but then i used 'max_by' which looks at the end of the list - [x] HTTP GET to the websocket endpoints should redirect instead of giving an ugly error -- [ ] load the redirected page from config -- [ ] use config to decide if the app should use `into_make_service_with_connect_info` or `into_make_service`. with_connect_info is needed for getting the ip from docker +- [x] load the redirected page from config - [ ] basic request method stats +- [ ] prettier output for create_user command. 
need the key in hex - [ ] use siwe messages and signatures for sign up and login - [ ] fantom_1 | 2022-08-10T22:19:43.522465Z WARN web3_proxy::jsonrpc: forwarding error err=missing field `jsonrpc` at line 1 column 60 - [ ] i think the server isn't following the spec. we need a context attached to this error so we know which one @@ -247,3 +247,4 @@ in another repo: event subscriber eth_1 | 2022-08-10T23:26:10.195014Z WARN web3_proxy::connections: chain is forked! 262 possible heads. 1/2/5/5 rpcs have 0x0538…bfff eth_1 | 2022-08-10T23:26:10.195658Z WARN web3_proxy::connections: chain is forked! 262 possible heads. 2/3/5/5 rpcs have 0x0538…bfff - [ ] disable redis persistence in dev +- [ ] fix ip detection when running in dev \ No newline at end of file diff --git a/config/example.bac b/config/example.bac index bf94daaa..7dd72107 100644 --- a/config/example.bac +++ b/config/example.bac @@ -1,4 +1,4 @@ -[shared] +[app] chain_id = 1 public_rate_limit_per_minute = 60_000 diff --git a/config/example.toml b/config/example.toml index a1fa4e20..3c949259 100644 --- a/config/example.toml +++ b/config/example.toml @@ -2,6 +2,8 @@ chain_id = 1 db_url = "mysql://root:dev_web3_proxy@dev-db:3306/dev_web3_proxy" redis_url = "redis://dev-redis:6379/" +redirect_public_url = "https://llamanodes.com/free-rpc-stats" +redirect_user_url = "https://llamanodes.com/user-rpc-stats/{{user_id}}" public_rate_limit_per_minute = 0 # 1GB of cache response_cache_max_bytes = 10000000000 diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 66d7c1c5..d2330e68 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -42,6 +42,7 @@ rand = "0.8.5" # TODO: regex has several "perf" features that we might want to use regex = "1.6.0" reqwest = { version = "0.11.11", default-features = false, features = ["json", "tokio-rustls"] } +handlebars = "4.3.3" rustc-hash = "1.1.0" siwe = "0.4.1" sea-orm = { version = "0.9.1", features = ["macros"] } diff --git a/web3_proxy/src/app.rs 
b/web3_proxy/src/app.rs index 4f05db8d..785171cf 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -34,7 +34,7 @@ use uuid::Uuid; use crate::bb8_helpers; use crate::block_helpers::block_needed; -use crate::config::AppConfig; +use crate::config::{AppConfig, TopConfig}; use crate::connections::Web3Connections; use crate::jsonrpc::JsonRpcForwardedResponse; use crate::jsonrpc::JsonRpcForwardedResponseEnum; @@ -139,6 +139,7 @@ pub struct Web3ProxyApp { // TODO: broadcast channel instead? head_block_receiver: watch::Receiver>>, pending_tx_sender: broadcast::Sender, + pub config: AppConfig, pub pending_transactions: Arc>, pub user_cache: RwLock>, pub redis_pool: Option, @@ -156,17 +157,23 @@ impl fmt::Debug for Web3ProxyApp { impl Web3ProxyApp { // TODO: should we just take the rpc config as the only arg instead? pub async fn spawn( - app_config: AppConfig, + top_config: TopConfig, num_workers: usize, ) -> anyhow::Result<( Arc, Pin>>>, )> { + // safety checks on the config + anyhow::ensure!( + top_config.app.redirect_user_url.contains("{{user_id}}"), + "redirect user url must contain \"{{{{user_id}}}}\"" + ); + // first, we connect to mysql and make sure the latest migrations have run - let db_conn = if let Some(db_url) = app_config.shared.db_url { + let db_conn = if let Some(db_url) = &top_config.app.db_url { let max_connections = num_workers.try_into()?; - let db = get_migrated_db(db_url, max_connections).await?; + let db = get_migrated_db(db_url.clone(), max_connections).await?; Some(db) } else { @@ -174,9 +181,9 @@ impl Web3ProxyApp { None }; - let balanced_rpcs = app_config.balanced_rpcs; + let balanced_rpcs = top_config.balanced_rpcs; - let private_rpcs = if let Some(private_rpcs) = app_config.private_rpcs { + let private_rpcs = if let Some(private_rpcs) = top_config.private_rpcs { private_rpcs } else { Default::default() @@ -196,11 +203,11 @@ impl Web3ProxyApp { .build()?, ); - let redis_pool = match app_config.shared.redis_url { + let redis_pool = match 
top_config.app.redis_url.as_ref() { Some(redis_url) => { info!("Connecting to redis on {}", redis_url); - let manager = RedisConnectionManager::new(redis_url)?; + let manager = RedisConnectionManager::new(redis_url.as_ref())?; let min_size = num_workers as u32; let max_size = min_size * 4; @@ -236,9 +243,8 @@ impl Web3ProxyApp { // TODO: once a transaction is "Confirmed" we remove it from the map. this should prevent major memory leaks. // TODO: we should still have some sort of expiration or maximum size limit for the map - // TODO: attach context to this error let (balanced_rpcs, balanced_handle) = Web3Connections::spawn( - app_config.shared.chain_id, + top_config.app.chain_id, balanced_rpcs, http_client.clone(), redis_pool.clone(), @@ -257,7 +263,7 @@ impl Web3ProxyApp { } else { // TODO: attach context to this error let (private_rpcs, private_handle) = Web3Connections::spawn( - app_config.shared.chain_id, + top_config.app.chain_id, private_rpcs, http_client.clone(), redis_pool.clone(), @@ -276,7 +282,8 @@ impl Web3ProxyApp { }; // TODO: how much should we allow? - let public_max_burst = app_config.shared.public_rate_limit_per_minute / 3; + // TODO: im seeing errors in redis. just use the redis FAQ on rate limiting. 
its really simple + let public_max_burst = top_config.app.public_rate_limit_per_minute / 3; let frontend_rate_limiter = redis_pool.as_ref().map(|redis_pool| { RedisCell::new( @@ -284,20 +291,21 @@ impl Web3ProxyApp { "web3_proxy", "frontend", public_max_burst, - app_config.shared.public_rate_limit_per_minute, + top_config.app.public_rate_limit_per_minute, 60, ) }); + // keep the borrow checker happy + let response_cache_max_bytes = top_config.app.response_cache_max_bytes; + let app = Self { + config: top_config.app, balanced_rpcs, private_rpcs, active_requests: Default::default(), // TODO: make the share configurable - response_cache: RwLock::new(FifoSizedMap::new( - app_config.shared.response_cache_max_bytes, - 100, - )), + response_cache: RwLock::new(FifoSizedMap::new(response_cache_max_bytes, 100)), head_block_receiver, pending_tx_sender, pending_transactions, diff --git a/web3_proxy/src/bin/web3_proxy.rs b/web3_proxy/src/bin/web3_proxy.rs index 1b4304b7..eac61bff 100644 --- a/web3_proxy/src/bin/web3_proxy.rs +++ b/web3_proxy/src/bin/web3_proxy.rs @@ -17,15 +17,15 @@ use tokio::time::Duration; use tracing::{debug, info}; use tracing_subscriber::EnvFilter; use web3_proxy::app::{flatten_handle, Web3ProxyApp}; -use web3_proxy::config::{AppConfig, CliConfig}; +use web3_proxy::config::{CliConfig, TopConfig}; use web3_proxy::frontend; fn run( shutdown_receiver: flume::Receiver<()>, cli_config: CliConfig, - app_config: AppConfig, + top_config: TopConfig, ) -> anyhow::Result<()> { - debug!(?cli_config, ?app_config); + debug!(?cli_config, ?top_config); // spawn a thread for deadlock detection thread::spawn(move || loop { @@ -48,7 +48,7 @@ fn run( // set up tokio's async runtime let mut rt_builder = runtime::Builder::new_multi_thread(); - let chain_id = app_config.shared.chain_id; + let chain_id = top_config.app.chain_id; rt_builder.enable_all().thread_name_fn(move || { static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); // TODO: what ordering? 
i think we want seqcst so that these all happen in order, but that might be stricter than we really need @@ -70,7 +70,7 @@ fn run( debug!(?num_workers); rt.block_on(async { - let (app, app_handle) = Web3ProxyApp::spawn(app_config, num_workers).await?; + let (app, app_handle) = Web3ProxyApp::spawn(top_config, num_workers).await?; let frontend_handle = tokio::spawn(frontend::serve(cli_config.port, app)); @@ -128,15 +128,17 @@ fn main() -> anyhow::Result<()> { // advanced configuration is on disk info!("Loading rpc config @ {}", cli_config.config); - let app_config: String = fs::read_to_string(cli_config.config.clone())?; - let app_config: AppConfig = toml::from_str(&app_config)?; + let top_config: String = fs::read_to_string(cli_config.config.clone())?; + let top_config: TopConfig = toml::from_str(&top_config)?; // TODO: this doesn't seem to do anything - proctitle::set_title(format!("web3_proxy-{}", app_config.shared.chain_id)); + proctitle::set_title(format!("web3_proxy-{}", top_config.app.chain_id)); + // tokio has code for catching ctrl+c so we use that + // this shutdown sender is currently only used in tests, but we might make a /shutdown endpoint or something let (_shutdown_sender, shutdown_receiver) = flume::bounded(1); - run(shutdown_receiver, cli_config, app_config) + run(shutdown_receiver, cli_config, top_config) } #[cfg(test)] @@ -148,7 +150,7 @@ mod tests { use hashbrown::HashMap; use std::env; - use web3_proxy::config::{RpcSharedConfig, Web3ConnectionConfig}; + use web3_proxy::config::{AppConfig, Web3ConnectionConfig}; use super::*; @@ -190,13 +192,15 @@ mod tests { }; // make a test AppConfig - let app_config = AppConfig { - shared: RpcSharedConfig { + let app_config = TopConfig { + app: AppConfig { chain_id: 31337, db_url: None, redis_url: None, public_rate_limit_per_minute: 0, response_cache_max_bytes: 10_usize.pow(7), + redirect_public_url: "example.com/".to_string(), + redirect_user_url: "example.com/users/{{user_id}}".to_string(), }, 
balanced_rpcs: HashMap::from([ ( diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 70aa0706..b48ce308 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -28,15 +28,15 @@ pub struct CliConfig { } #[derive(Debug, Deserialize)] -pub struct AppConfig { - pub shared: RpcSharedConfig, +pub struct TopConfig { + pub app: AppConfig, pub balanced_rpcs: HashMap, pub private_rpcs: Option>, } /// shared configuration between Web3Connections #[derive(Debug, Deserialize)] -pub struct RpcSharedConfig { +pub struct AppConfig { // TODO: better type for chain_id? max of `u64::MAX / 2 - 36` https://github.com/ethereum/EIPs/issues/2294 pub chain_id: u64, pub db_url: Option, @@ -45,6 +45,10 @@ pub struct RpcSharedConfig { pub public_rate_limit_per_minute: u32, #[serde(default = "default_response_cache_max_bytes")] pub response_cache_max_bytes: usize, + /// the stats page url for an anonymous user. + pub redirect_public_url: String, + /// the stats page url for a logged in user. it must contain "{{user_id}}" + pub redirect_user_url: String, } fn default_public_rate_limit_per_minute() -> u32 { diff --git a/web3_proxy/src/connections.rs b/web3_proxy/src/connections.rs index b5bb3fb4..6111427d 100644 --- a/web3_proxy/src/connections.rs +++ b/web3_proxy/src/connections.rs @@ -182,6 +182,7 @@ impl Web3Connections { }; // turn configs into connections (in parallel) + // TODO: move this into a helper function. then we can use it when configs change (will need a remove function too) let spawn_handles: Vec<_> = server_configs .into_iter() .map(|(server_name, server_config)| { diff --git a/web3_proxy/src/frontend/rate_limit.rs b/web3_proxy/src/frontend/rate_limit.rs index 559deb25..8ffab2e1 100644 --- a/web3_proxy/src/frontend/rate_limit.rs +++ b/web3_proxy/src/frontend/rate_limit.rs @@ -106,6 +106,7 @@ impl Web3ProxyApp { UserId, RequestsPerMinute, } + // TODO: join the user table to this to return the User? 
we don't always need it let user_data = match user_keys::Entity::find() .select_only() .column_as(user_keys::Column::UserId, QueryAs::UserId) diff --git a/web3_proxy/src/frontend/ws_proxy.rs b/web3_proxy/src/frontend/ws_proxy.rs index 95294cf9..fb175fad 100644 --- a/web3_proxy/src/frontend/ws_proxy.rs +++ b/web3_proxy/src/frontend/ws_proxy.rs @@ -5,12 +5,12 @@ use axum::{ Extension, }; use axum_client_ip::ClientIp; -use fstrings::{format_args_f, format_f}; use futures::SinkExt; use futures::{ future::AbortHandle, stream::{SplitSink, SplitStream, StreamExt}, }; +use handlebars::Handlebars; use hashbrown::HashMap; use serde_json::{json, value::RawValue}; use std::sync::Arc; @@ -30,7 +30,7 @@ pub async fn public_websocket_handler( ClientIp(ip): ClientIp, ws_upgrade: Option, ) -> Response { - let ip = match app.rate_limit_by_ip(ip).await { + let _ip = match app.rate_limit_by_ip(ip).await { Ok(x) => match x.try_into_response().await { Ok(RateLimitResult::AllowedIp(x)) => x, Err(err_response) => return err_response, @@ -44,12 +44,8 @@ pub async fn public_websocket_handler( .on_upgrade(|socket| proxy_web3_socket(app, socket)) .into_response(), None => { - // this is not a websocket. give a friendly page. maybe redirect to the llama nodes home - // TODO: redirect to a configurable url - Redirect::to(&format_f!( - "https://llamanodes.com/free-rpc-stats#userip={ip}" - )) - .into_response() + // this is not a websocket. redirect to a friendly page + Redirect::to(&app.config.redirect_public_url).into_response() } } } @@ -72,13 +68,20 @@ pub async fn user_websocket_handler( match ws_upgrade { Some(ws_upgrade) => ws_upgrade.on_upgrade(|socket| proxy_web3_socket(app, socket)), None => { - // this is not a websocket. give a friendly page with stats for this user - // TODO: redirect to a configurable url + // TODO: store this on the app and use register_template? 
+ let reg = Handlebars::new(); + + // TODO: show the user's address, not their id (remember to update the checks for {{user_id}} in app.rs) // TODO: query to get the user's address. expose that instead of user_id - Redirect::to(&format_f!( - "https://llamanodes.com/user-rpc-stats#user_id={user_id}" - )) - .into_response() + let user_url = reg + .render_template( + &app.config.redirect_user_url, + &json!({ "user_id": user_id }), + ) + .unwrap_or_else(|_| app.config.redirect_public_url.clone()); + + // this is not a websocket. redirect to a page for this user + Redirect::to(&user_url).into_response() } } }