diff --git a/Cargo.lock b/Cargo.lock index 60afcecd..f5a9b32f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -409,9 +409,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "axum" -version = "0.5.16" +version = "0.5.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9e3356844c4d6a6d6467b8da2cffb4a2820be256f50a3a386c9d152bab31043" +checksum = "acee9fd5073ab6b045a275b3e709c163dd36c90685219cb21804a147b58dba43" dependencies = [ "async-trait", "axum-core", @@ -454,9 +454,9 @@ dependencies = [ [[package]] name = "axum-core" -version = "0.2.8" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9f0c0a60006f2a293d82d571f635042a72edf927539b7685bd62d361963839b" +checksum = "37e5939e02c56fecd5c017c37df4238c0a839fa76b7f97acdd7efb804fd181cc" dependencies = [ "async-trait", "bytes", diff --git a/TODO.md b/TODO.md index 2ab8eba2..b1457eeb 100644 --- a/TODO.md +++ b/TODO.md @@ -185,6 +185,11 @@ These are roughly in order of completition - [x] if unknown config items, error - unknown configs are almost always a mistake. usually from me changing config parsing on my side and old fields not being updated to the new way - [x] also need to change how we disable rpcs since i was using an unknown field +- [x] [paginate responses](https://www.sea-ql.org/SeaORM/docs/basic-crud/select/#paginate-result) +- [x] graceful shutdown. stop taking new requests and don't stop until all outstanding queries are handled + - https://github.com/tokio-rs/mini-redis/blob/master/src/shutdown.rs + - we need this because we need to be sure all the queries are saved in the db. maybe put stuff in Drop + - need an flume::watch on unflushed stats that we can subscribe to. wait for it to flip to true - [-] ability to domain lock or ip lock said key - the code to check the database and use these entries already exists, but users don't have a way to set them - [-] new endpoints for users (not totally sure about the exact paths, but these features are all needed): @@ -201,21 +206,20 @@ These are roughly in order of completition - [ ] generate a new key from a web endpoint - [ ] modifying key settings such as private relay, revert logging, ip/origin/etc checks - [ ] GET logged reverts on an endpoint that **requires authentication**. -- [x] [paginate responses](https://www.sea-ql.org/SeaORM/docs/basic-crud/select/#paginate-result) - [ ] per-user stats should probably be locked behind authentication. the code is written but disabled for easy development - if we do this, we should also have an admin-only endpoint for seeing these for support requests - [ ] endpoint for creating/modifying api keys and their advanced security features -- [ ] graceful shutdown. stop taking new requests and don't stop until all outstanding queries are handled - - https://github.com/tokio-rs/mini-redis/blob/master/src/shutdown.rs - - we need this because we need to be sure all the queries are saved in the db. maybe put stuff in Drop - - need an flume::watch on unflushed stats that we can subscribe to. wait for it to flip to true - [ ] include if archive query or not in the stats - this is already partially done, but we need to double check it works. preferrably with tests +- [-] add configurable size limits to all the Caches + - [ ] instead of configuring each cache with MB sizes, have one value for total memory footprint and then percentages for each cache +- [-] let users choose a % to log (or maybe x/second). 
someone like curve logging all reverts will be a BIG database very quickly + - this must be opt-in or spawned since it will slow things down and will make their calls less private + - [ ] we currently default to 0.0 and don't expose a way to edit it. we have a database row, but we don't use it +- [ ] don't use unix timestamps for response_millis since leap seconds will confuse it - [ ] WARN http_request:request: web3_proxy::block_number: could not get block from params err=unexpected params length id=01GF4HTRKM4JV6NX52XSF9AYMW method=POST authorized_request=User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 }) - ERROR http_request:request:try_send_all_upstream_servers: web3_proxy::rpcs::request: bad response! err=JsonRpcClientError(JsonRpcError(JsonRpcError { code: -32000, message: "INTERNAL_ERROR: existing tx with same hash", data: None })) method=eth_sendRawTransaction rpc=local_erigon_alpha_archive id=01GF4HV03Y4ZNKQV8DW5NDQ5CG method=POST authorized_request=User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 }) self=Web3Connections { conns: {"local_erigon_alpha_archive_ws": Web3Connection { name: "local_erigon_alpha_archive_ws", blocks: "all", .. }, "local_geth_ws": Web3Connection { name: "local_geth_ws", blocks: 64, .. }, "local_erigon_alpha_archive": Web3Connection { name: "local_erigon_alpha_archive", blocks: "all", .. }}, .. } authorized_request=Some(User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 })) request=JsonRpcRequest { id: RawValue(39), method: "eth_sendRawTransaction", .. } request_metadata=Some(RequestMetadata { datetime: 2022-10-11T22:14:57.406829095Z, period_seconds: 60, request_bytes: 633, backend_requests: 0, no_servers: 0, error_response: false, response_bytes: 0, response_millis: 0 }) block_needed=None - why is it failing to get the block from params when its set to None? That should be the simple case -- [-] add configurable size limits to all the Caches - - [ ] instead of configuring each cache with MB sizes, have one value for total memory footprint and then percentages for each cache - [ ] if user-specific caches have evictions that aren't from timeouts, log a warning - [ ] make the "not synced" error more verbose - I think there is a bug in our synced_rpcs filtering. likely in has_block_data @@ -225,9 +229,6 @@ These are roughly in order of completition - [ ] emit global stat on no servers synced - [ ] emit global stat on error (maybe just use sentry, but graphs are handy) - if we wait until the error handler to emit the stat, i don't think we have access to the authorized_request -- [-] let users choose a % to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly - - this must be opt-in or spawned since it will slow things down and will make their calls less private - - [ ] we currently default to 0.0 and don't expose a way to edit it. 
we have a database row, but we don't use it - [ ] endpoint (and cli script) to rotate api key - [ ] if no bearer token found in redis (likely because it expired), send 401 unauthorized - [ ] user create script should allow multiple keys per user @@ -235,9 +236,8 @@ These are roughly in order of completition - [ ] WARN http_request: web3_proxy::frontend::errors: anyhow err=UserKey was not a ULID or UUID id=01GER4VBTS0FDHEBR96D1JRDZF method=POST - if invalid user id given, we give a 500. should be a different error code instead - [ ] BUG: i think if all backend servers stop, the server doesn't properly reconnect. It appears to stop listening on 8854, but not shut down. -- [ ] BUG? WARN web3_proxy::rpcs::blockchain: Missing connection_head_block in block_hashes. Fetching now connection_head_hash=0x4b7a…14b5 conn_name=local_erigon_alpha_archive rpc=local_erigon_alpha_archive +- [ ] BUG? WARN web3_proxy::rpcs::blockchain: Missing connection_head_block in block_hashes. Fetching now connection_head_hash=0x4b7a…14b5 conn_name=local_erigon_alpha_archive rpc=local_erigon_alpha_archive - i see this a lot more than expected. why is it happening so much? better logs needed -- [ ] don't use unix timestamps for response_millis since leap seconds will confuse itt - [ ] from what i thought, /status should show hashes > numbers! - but block numbers count is maxed out (10k) - and block hashes count is tiny (83) diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index f42692c8..bdeb6c03 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -22,7 +22,7 @@ redis-rate-limiter = { path = "../redis-rate-limiter" } anyhow = { version = "1.0.65", features = ["backtrace"] } arc-swap = "1.5.1" argh = "0.1.9" -axum = { version = "0.5.16", features = ["headers", "serde_json", "tokio-tungstenite", "ws"] } +axum = { version = "0.5.17", features = ["headers", "serde_json", "tokio-tungstenite", "ws"] } axum-client-ip = "0.2.0" axum-macros = "0.2.3" # TODO: import chrono from sea-orm so we always have the same version diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index b542ca9c..a28872a9 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -23,7 +23,6 @@ use futures::future::Abortable; use futures::future::{join_all, AbortHandle}; use futures::stream::FuturesUnordered; use futures::stream::StreamExt; -use futures::Future; use hashbrown::HashMap; use ipnet::IpNet; use metered::{metered, ErrorCount, HitCount, ResponseTime, Throughput}; @@ -36,7 +35,6 @@ use serde::Serialize; use serde_json::json; use std::fmt; use std::net::IpAddr; -use std::pin::Pin; use std::str::FromStr; use std::sync::atomic::{self, AtomicUsize}; use std::sync::Arc; @@ -173,9 +171,13 @@ impl Web3ProxyApp { pub async fn spawn( top_config: TopConfig, num_workers: usize, + shutdown_receiver: broadcast::Receiver<()>, ) -> anyhow::Result<( Arc, - Pin>>>, + // this handle is the main loops that we can cancel. select on this + FuturesUnordered>, + // this handle is the state saving background loops that we must let finish. join_all on this + FuturesUnordered>, )> { // safety checks on the config if let Some(redirect) = &top_config.app.redirect_user_url { @@ -213,8 +215,10 @@ impl Web3ProxyApp { let balanced_rpcs = top_config.balanced_rpcs; let private_rpcs = top_config.private_rpcs.unwrap_or_default(); - // TODO: try_join_all instead? 
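Aside on the `spawn` change above: the return value is now two buckets of `JoinHandle`s, one that is safe to cancel (select on it) and one that must be allowed to flush state before the process exits (join it after sending shutdown). A minimal, self-contained sketch of that shape, using a hypothetical `important_worker` and simplified types rather than the project's real ones (assumes `tokio` with its macros/runtime features, plus `futures` and `anyhow`):

```rust
use std::time::Duration;

use futures::stream::{FuturesUnordered, StreamExt};
use tokio::sync::broadcast;
use tokio::task::JoinHandle;

// a background task that must be allowed to finish: it stops taking new work
// when the shutdown broadcast fires and flushes state before returning
async fn important_worker(mut shutdown: broadcast::Receiver<()>) -> anyhow::Result<()> {
    loop {
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_secs(1)) => { /* periodic work */ }
            _ = shutdown.recv() => break,
        }
    }
    // flush anything unsaved (pending stats, etc.) here
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (shutdown_sender, _) = broadcast::channel::<()>(1);

    // safe to cancel: the caller just selects on these
    let cancellable: FuturesUnordered<JoinHandle<anyhow::Result<()>>> = FuturesUnordered::new();
    // must run to completion after the shutdown signal
    let important: FuturesUnordered<JoinHandle<anyhow::Result<()>>> = FuturesUnordered::new();

    important.push(tokio::spawn(important_worker(shutdown_sender.subscribe())));
    cancellable.push(tokio::spawn(async { anyhow::Ok(()) }));

    // ... select! over `cancellable` (and ctrl-c) here; once anything exits:
    shutdown_sender.send(())?;

    // wait for the important background tasks to drain before exiting
    let mut important = important;
    while let Some(joined) = important.next().await {
        joined??; // outer ? is the JoinError, inner ? is the task's own error
    }

    Ok(())
}
```

A `broadcast::channel(1)` works well as the shutdown signal here because every background loop can hold its own subscribed receiver and `select!` on it independently of the others.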
- let handles = FuturesUnordered::new(); + // these are safe to cancel + let cancellable_handles = FuturesUnordered::new(); + // we must wait for these to end on their own (and they need to subscribe to shutdown_sender) + let important_background_handles = FuturesUnordered::new(); // make a http shared client // TODO: can we configure the connection pool? should we? @@ -267,15 +271,15 @@ impl Web3ProxyApp { // we do this in a channel so we don't slow down our response to the users let stat_sender = if let Some(db_conn) = db_conn.clone() { // TODO: sender and receiver here are a little confusing. because the thing that reads the receiver is what actually submits the stats - let (stat_sender, stat_handle, save_handle) = { + let (stat_sender, save_handle, stat_handle) = { // TODO: period from config instead of always being 60 seconds let emitter = StatEmitter::new(top_config.app.chain_id, db_conn, 60); - emitter.spawn().await? + emitter.spawn(shutdown_receiver).await? }; - handles.push(stat_handle); - handles.push(save_handle); + cancellable_handles.push(stat_handle); + important_background_handles.push(save_handle); Some(stat_sender) } else { @@ -332,7 +336,7 @@ impl Web3ProxyApp { .context("spawning balanced rpcs")?; // save the handle to catch any errors - handles.push(balanced_handle); + cancellable_handles.push(balanced_handle); // connect to the private rpcs // only some chains have this, so this is optional @@ -363,7 +367,7 @@ impl Web3ProxyApp { None } else { // save the handle to catch any errors - handles.push(private_handle); + cancellable_handles.push(private_handle); Some(private_rpcs) } @@ -477,9 +481,7 @@ impl Web3ProxyApp { let app = Arc::new(app); - let handle = Box::pin(flatten_handles(handles)); - - Ok((app, handle)) + Ok((app, cancellable_handles, important_background_handles)) } pub fn prometheus_metrics(&self) -> String { diff --git a/web3_proxy/src/app_stats.rs b/web3_proxy/src/app_stats.rs index d7d6bdcb..e4570a2d 100644 --- a/web3_proxy/src/app_stats.rs +++ b/web3_proxy/src/app_stats.rs @@ -10,7 +10,7 @@ use sea_orm::{ActiveModelTrait, DatabaseConnection}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::{sync::atomic::AtomicU32, time::Duration}; -use tokio::sync::Mutex as AsyncMutex; +use tokio::sync::{broadcast, Mutex as AsyncMutex}; use tokio::task::JoinHandle; use tracing::{error, info, trace}; @@ -173,6 +173,7 @@ impl StatEmitter { pub async fn spawn( self: Arc, + shutdown_receiver: broadcast::Receiver<()>, ) -> anyhow::Result<( flume::Sender, JoinHandle>, @@ -181,8 +182,11 @@ impl StatEmitter { let (aggregate_tx, aggregate_rx) = flume::unbounded::(); // TODO: join and flatten these handles - let aggregate_handle = tokio::spawn(self.clone().aggregate_stats_loop(aggregate_rx)); - let save_handle = { tokio::spawn(self.save_stats_loop()) }; + let aggregate_handle = tokio::spawn( + self.clone() + .aggregate_stats_loop(aggregate_rx, shutdown_receiver), + ); + let save_handle = tokio::spawn(self.save_stats_loop()); Ok((aggregate_tx, aggregate_handle, save_handle)) } @@ -191,28 +195,42 @@ impl StatEmitter { async fn aggregate_stats_loop( self: Arc, aggregate_rx: flume::Receiver, + mut shutdown_receiver: broadcast::Receiver<()>, ) -> anyhow::Result<()> { // TODO: select on shutdown handle so we can be sure to save every aggregate! - while let Ok(x) = aggregate_rx.recv_async().await { - trace!(?x, "aggregating stat"); + tokio::select! 
{ + x = aggregate_rx.recv_async() => { + match x { + Ok(x) => { + trace!(?x, "aggregating stat"); - // TODO: increment global stats (in redis? in local cache for prometheus?) + // TODO: increment global stats (in redis? in local cache for prometheus?) - // TODO: batch stats? spawn this? - // TODO: where can we wait on this handle? - let clone = self.clone(); - tokio::spawn(async move { clone.aggregate_stat(x).await }); - - // no need to save manually. they save on expire + // TODO: batch stats? + // TODO: where can we wait on this handle? + let clone = self.clone(); + tokio::spawn(async move { clone.aggregate_stat(x).await }); + }, + Err(err) => { + error!(?err, "aggregate_rx"); + } + } + } + x = shutdown_receiver.recv() => { + match x { + Ok(_) => info!("aggregate stats loop shutting down"), + Err(err) => error!(?err, "shutdown receiver"), + } + } } - // shutting down. force a save + // shutting down. force a save of any pending stats // we do not use invalidate_all because that is done on a background thread for (key, _) in self.aggregated_proxy_responses.into_iter() { self.aggregated_proxy_responses.invalidate(&key).await; } - info!("stat aggregator exited"); + info!("aggregate stats loop finished"); Ok(()) } diff --git a/web3_proxy/src/bin/web3_proxy.rs b/web3_proxy/src/bin/web3_proxy.rs index 2c2dca45..5951cdbb 100644 --- a/web3_proxy/src/bin/web3_proxy.rs +++ b/web3_proxy/src/bin/web3_proxy.rs @@ -8,20 +8,22 @@ //#![warn(missing_docs)] #![forbid(unsafe_code)] +use futures::StreamExt; use parking_lot::deadlock; use std::fs; use std::sync::atomic::{self, AtomicUsize}; use std::thread; use tokio::runtime; +use tokio::sync::broadcast; use tokio::time::Duration; use tracing::{debug, info}; use tracing_subscriber::EnvFilter; -use web3_proxy::app::{flatten_handle, Web3ProxyApp}; +use web3_proxy::app::{flatten_handle, flatten_handles, Web3ProxyApp}; use web3_proxy::config::{CliConfig, TopConfig}; use web3_proxy::{frontend, metrics_frontend}; fn run( - shutdown_receiver: flume::Receiver<()>, + shutdown_sender: broadcast::Sender<()>, cli_config: CliConfig, top_config: TopConfig, ) -> anyhow::Result<()> { @@ -71,7 +73,8 @@ fn run( let app_frontend_port = cli_config.port; let app_prometheus_port = cli_config.prometheus_port; - let (app, app_handle) = Web3ProxyApp::spawn(top_config, num_workers).await?; + let (app, app_handles, mut important_background_handles) = + Web3ProxyApp::spawn(top_config, num_workers, shutdown_sender.subscribe()).await?; let frontend_handle = tokio::spawn(frontend::serve(app_frontend_port, app.clone())); @@ -80,7 +83,7 @@ fn run( // if everything is working, these should both run forever // TODO: join these instead and use shutdown handler properly. probably use tokio's ctrl+c helper tokio::select! { - x = app_handle => { + x = flatten_handles(app_handles) => { match x { Ok(_) => info!("app_handle exited"), Err(e) => { @@ -104,17 +107,27 @@ fn run( } } } - _ = shutdown_receiver.recv_async() => { - // TODO: think more about this. we need some way for tests to tell the app to stop - info!("received shutdown signal"); - - // TODO: wait for outstanding requests to complete. graceful shutdown will make our users happier - - return Ok(()) + x = tokio::signal::ctrl_c() => { + match x { + Ok(_) => info!("quiting from ctrl-c"), + Err(e) => { + return Err(e.into()); + } + } } }; - // TODO: wait on all the handles to stop + // one of the handles stopped. 
send a value so the others know to shut down + shutdown_sender.send(())?; + + // wait on all the important background tasks (like saving stats to the database) to complete + while let Some(x) = important_background_handles.next().await { + match x { + Err(e) => return Err(e.into()), + Ok(Err(e)) => return Err(e), + Ok(Ok(_)) => continue, + } + } info!("finished"); @@ -154,9 +167,9 @@ fn main() -> anyhow::Result<()> { // tokio has code for catching ctrl+c so we use that // this shutdown sender is currently only used in tests, but we might make a /shutdown endpoint or something - let (_shutdown_sender, shutdown_receiver) = flume::bounded(1); + let (shutdown_sender, _shutdown_receiver) = broadcast::channel(1); - run(shutdown_receiver, cli_config, top_config) + run(shutdown_sender, cli_config, top_config) } #[cfg(test)] @@ -237,11 +250,15 @@ mod tests { private_rpcs: None, }; - let (shutdown_sender, shutdown_receiver) = flume::bounded(1); + let (shutdown_sender, _shutdown_receiver) = broadcast::channel(1); // spawn another thread for running the app // TODO: allow launching into the local tokio runtime instead of creating a new one? - let handle = thread::spawn(move || run(shutdown_receiver, cli_config, app_config)); + let handle = { + let shutdown_sender = shutdown_sender.clone(); + + thread::spawn(move || run(shutdown_sender, cli_config, app_config)) + }; // TODO: do something to the node. query latest block, mine another block, query again let proxy_provider = Provider::::try_from(anvil.endpoint()).unwrap(); diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 06da07c9..961cf9c9 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -94,7 +94,7 @@ pub async fn serve(port: u16, proxy_app: Arc) -> anyhow::Result<() // create a unique id for each request .layer(RequestIdLayer) // application state - .layer(Extension(proxy_app)) + .layer(Extension(proxy_app.clone())) // 404 for any unknown routes .fallback(errors::handler_404.into_service()); @@ -118,18 +118,6 @@ pub async fn serve(port: u16, proxy_app: Arc) -> anyhow::Result<() axum::Server::bind(&addr) // TODO: option to use with_connect_info. we want it in dev, but not when running behind a proxy, but not .serve(service) - .with_graceful_shutdown(signal_shutdown()) .await .map_err(Into::into) } - -/// Tokio signal handler that will wait for a user to press CTRL+C. -/// Used in our hyper `Server` method `with_graceful_shutdown`. -async fn signal_shutdown() { - // TODO: take a shutdown_receiver and select on ctrl_c and it - info!("ctrl-c to quit"); - tokio::signal::ctrl_c() - .await - .expect("expect tokio signal ctrl-c"); - info!("signal shutdown"); -} diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index de84d826..cba42dd8 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -423,8 +423,8 @@ pub async fn user_balance_get( /// TODO: this will change as we add better support for secondary users. #[debug_handler] pub async fn user_balance_post( - TypedHeader(Authorization(bearer_token)): TypedHeader>, Extension(app): Extension>, + TypedHeader(Authorization(bearer_token)): TypedHeader>, ) -> FrontendResult { todo!("user_balance_post"); } @@ -434,8 +434,8 @@ pub async fn user_balance_post( /// TODO: one key per request? maybe /user/keys/:api_key? 
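One note on the stat-aggregation change earlier in this diff: the new `tokio::select!` reacts to either an incoming stat or the shutdown broadcast, but without a surrounding `loop` it handles a single message before falling through to the flush. A common shape for this kind of shutdown-aware worker keeps the `select!` inside a `loop` and only drains once the signal arrives. A hedged sketch with a placeholder `u64` stat type (not the real aggregated-stat struct), assuming the same `flume`, `tokio`, `tracing`, and `anyhow` dependencies:

```rust
use tokio::sync::broadcast;
use tracing::{error, info, trace};

// shutdown-aware aggregation loop: keep draining the flume channel until the
// broadcast fires, then flush whatever is still queued. `u64` stands in for
// the real aggregated-stat type.
async fn aggregate_loop(
    rx: flume::Receiver<u64>,
    mut shutdown: broadcast::Receiver<()>,
) -> anyhow::Result<()> {
    loop {
        tokio::select! {
            stat = rx.recv_async() => match stat {
                Ok(stat) => trace!(stat, "aggregating"),
                Err(err) => {
                    // every sender dropped; nothing more will arrive
                    error!(?err, "stat channel closed");
                    break;
                }
            },
            _ = shutdown.recv() => {
                info!("aggregate loop shutting down");
                break;
            }
        }
    }

    // drain anything that was queued before the shutdown signal arrived
    while let Ok(stat) = rx.try_recv() {
        trace!(stat, "flushing stat on shutdown");
    }

    Ok(())
}
```

After the loop exits, `try_recv` empties anything queued before the signal, which matches the commit's stated goal of making sure every stat reaches the database before the process stops.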
#[debug_handler] pub async fn user_keys_get( - TypedHeader(Authorization(bearer_token)): TypedHeader>, Extension(app): Extension>, + TypedHeader(Authorization(bearer_token)): TypedHeader>, ) -> FrontendResult { todo!("user_keys_get"); } @@ -446,8 +446,8 @@ pub async fn user_keys_get( /// TODO: one key per request? maybe /user/keys/:api_key? #[debug_handler] pub async fn user_keys_post( - TypedHeader(Authorization(bearer_token)): TypedHeader>, Extension(app): Extension>, + TypedHeader(Authorization(bearer_token)): TypedHeader>, ) -> FrontendResult { todo!("user_keys_post"); } @@ -459,8 +459,8 @@ pub async fn user_keys_post( /// TODO: this will change as we add better support for secondary users. #[debug_handler] pub async fn user_profile_get( - TypedHeader(Authorization(bearer_token)): TypedHeader>, Extension(app): Extension>, + TypedHeader(Authorization(bearer_token)): TypedHeader>, ) -> FrontendResult { todo!("user_profile_get"); } @@ -468,8 +468,8 @@ pub async fn user_profile_get( /// `GET /user/revert_logs` -- Use a bearer token to get the user's revert logs. #[debug_handler] pub async fn user_revert_logs_get( - TypedHeader(Authorization(bearer_token)): TypedHeader>, Extension(app): Extension>, + TypedHeader(Authorization(bearer_token)): TypedHeader>, ) -> FrontendResult { todo!("user_revert_logs_get"); } @@ -485,10 +485,8 @@ pub async fn user_revert_logs_get( /// TODO: this will change as we add better support for secondary users. #[debug_handler] pub async fn user_stats_detailed_get( - // TODO: turn this back on when done debugging. maybe add a path field for this - // TypedHeader(Authorization(bearer)): TypedHeader>, - bearer: Option>>, Extension(app): Extension>, + bearer: Option>>, Query(params): Query>, ) -> FrontendResult { let x = get_detailed_stats(&app, bearer, params).await?;
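The `users.rs` hunks above only reorder handler arguments so the `Extension` app state comes before the bearer-token header. In axum 0.5 the ordering is only load-bearing for extractors that consume the request body (those must come last); `Extension` and `TypedHeader` both leave the body alone, so this reads as a consistency cleanup rather than a fix. A minimal sketch of the resulting signature shape, with illustrative names rather than the project's handlers:

```rust
use std::sync::Arc;

use axum::extract::Extension;
use axum::headers::authorization::{Authorization, Bearer};
use axum::TypedHeader;
use axum_macros::debug_handler;

// stand-in for the real Web3ProxyApp state
struct AppState;

#[debug_handler]
async fn example_get(
    // app state first, request-specific extractors after it
    Extension(_app): Extension<Arc<AppState>>,
    TypedHeader(Authorization(_bearer)): TypedHeader<Authorization<Bearer>>,
) -> &'static str {
    "ok"
}
```

`#[debug_handler]` keeps the compiler errors readable if one of these signatures ever stops satisfying axum's `Handler` bounds.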