From 52bea50745cd0528c6c08bf1e82f43caca5f0228 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 8 Jul 2022 18:27:06 +0000 Subject: [PATCH] better error handling --- TODO.md | 2 +- web3-proxy/src/connection.rs | 26 ++++++++------------------ web3-proxy/src/main.rs | 27 ++++++++++++++++++--------- 3 files changed, 27 insertions(+), 28 deletions(-) diff --git a/TODO.md b/TODO.md index 85004152..44f96366 100644 --- a/TODO.md +++ b/TODO.md @@ -39,7 +39,7 @@ - [ ] automatically route to archive server when necessary - [ ] handle log subscriptions - [ ] basic request method stats -- [ ] http servers should check block at the very start +- [x] http servers should check block at the very start - [ ] Got warning: "WARN subscribe_new_heads:send_block: web3_proxy::connection: unable to get block from https://rpc.ethermine.org: Deserialization Error: expected value at line 1 column 1. Response: error code: 1015". this is cloudflare rate limiting on fetching a block, but this is a private rpc. why is there a block subscription? ## V1 diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index 86b020de..3799b854 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -440,7 +440,6 @@ impl Web3Connection { // TODO: what should this interval be? probably automatically set to some fraction of block time // TODO: maybe it would be better to have one interval for all of the http providers, but this works for now // TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though - // TODO: let mut interval = interval(Duration::from_secs(60)); interval.set_missed_tick_behavior(MissedTickBehavior::Delay); @@ -472,26 +471,17 @@ impl Web3Connection { // TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out? let mut stream = provider.subscribe_pending_txs().await?; + // TODO: maybe the subscribe_pending_txs function should be on the active_request_handle drop(active_request_handle); - // TODO: query existing pending txs? - - // TODO: should the stream have a timeout on it here? - // TODO: i don't think loop match is what we want. i think while let would be better - loop { - match stream.next().await { - Some(pending_tx_id) => { - tx_id_sender - .send_async((pending_tx_id, self.clone())) - .await - .context("tx_id_sender")?; - } - None => { - warn!("subscription ended"); - break; - } - } + while let Some(pending_tx_id) = stream.next().await { + tx_id_sender + .send_async((pending_tx_id, self.clone())) + .await + .context("tx_id_sender")?; } + + warn!("subscription ended"); } } } diff --git a/web3-proxy/src/main.rs b/web3-proxy/src/main.rs index 56cc64b3..2c704ec0 100644 --- a/web3-proxy/src/main.rs +++ b/web3-proxy/src/main.rs @@ -32,10 +32,13 @@ fn main() -> anyhow::Result<()> { .compact() .init(); + // this probably won't matter for us in docker, but better safe than sorry fdlimit::raise_fd_limit(); + // initial configuration from flags let cli_config: CliConfig = argh::from_env(); + // advanced configuration info!("Loading rpc config @ {}", cli_config.config); let rpc_config: String = fs::read_to_string(cli_config.config)?; let rpc_config: RpcConfig = toml::from_str(&rpc_config)?; @@ -45,10 +48,9 @@ fn main() -> anyhow::Result<()> { // TODO: this doesn't seem to do anything proctitle::set_title(format!("web3-proxy-{}", rpc_config.shared.chain_id)); - let chain_id = rpc_config.shared.chain_id; - let mut rt_builder = runtime::Builder::new_multi_thread(); + let chain_id = rpc_config.shared.chain_id; rt_builder.enable_all().thread_name_fn(move || { static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); // TODO: what ordering? i think we want seqcst so that these all happen in order, but that might be stricter than we really need @@ -61,8 +63,6 @@ fn main() -> anyhow::Result<()> { rt_builder.worker_threads(cli_config.workers); } - let rt = rt_builder.build()?; - // spawn a thread for deadlock detection thread::spawn(move || loop { thread::sleep(Duration::from_secs(10)); @@ -81,7 +81,8 @@ fn main() -> anyhow::Result<()> { } }); - // spawn the root task + // start tokio's async runtime + let rt = rt_builder.build()?; rt.block_on(async { let (app, app_handle) = rpc_config.spawn().await?; @@ -90,12 +91,20 @@ fn main() -> anyhow::Result<()> { // if everything is working, these should both run forever tokio::select! { x = app_handle => { - // TODO: error log if error - info!(?x, "app_handle exited"); + match x { + Ok(_) => info!("app_handle exited"), + Err(e) => { + return Err(e); + } + } } x = flatten_handle(frontend_handle) => { - // TODO: error log if error - info!(?x, "frontend exited"); + match x { + Ok(_) => info!("frontend exited"), + Err(e) => { + return Err(e); + } + } } };