From 63abd1251bf2a8f9bbee57ca1d936650d37af5e6 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 10 Aug 2022 21:52:28 +0000 Subject: [PATCH] dont subscribe to blocks on the private tier --- TODO.md | 5 ++- web3_proxy/src/connection.rs | 76 ++++++++++++++++++----------------- web3_proxy/src/connections.rs | 10 ++++- 3 files changed, 51 insertions(+), 40 deletions(-) diff --git a/TODO.md b/TODO.md index 5af72881..b182b279 100644 --- a/TODO.md +++ b/TODO.md @@ -66,11 +66,13 @@ - [x] Add a "weight" key to the servers. Sort on that after block. keep most requests local - [x] cache db query results for user data. db is a big bottleneck right now - [x] allow blocking public requests +- [x] Got warning: "WARN subscribe_new_heads:send_block: web3_proxy::connection: unable to get block from https://rpc.ethermine.org: Deserialization Error: expected value at line 1 column 1. Response: error code: 1015". this is cloudflare rate limiting on fetching a block, but this is a private rpc. why is there a block subscription? +- [x] im seeing ethspam occasionally try to query a future block. something must be setting the head block too early + - [x] we were sorting best block the wrong direction. i flipped a.cmp(b) to b.cmp(a) so that the largest would be first, but then i used 'max_by' which looks at the end of the list - [ ] cache more things locally or in redis - [ ] use siwe messages and signatures for sign up and login - [ ] basic request method stats - [ ] active requests on /status is always 0 even when i'm running requests through -- [ ] im seeing ethspam occasionally try to query a future block. something must be setting the head block too early ## V1 @@ -103,7 +105,6 @@ - [ ] emit stats for successes, retries, failures, with the types of requests, account, chain, rpc - [ ] right now we send too many getTransaction queries to the private rpc tier and i are being rate limited by some of them. change to be serial and weight by hard/soft limit. - [ ] improved logging with useful instrumentation -- [ ] Got warning: "WARN subscribe_new_heads:send_block: web3_proxy::connection: unable to get block from https://rpc.ethermine.org: Deserialization Error: expected value at line 1 column 1. Response: error code: 1015". this is cloudflare rate limiting on fetching a block, but this is a private rpc. why is there a block subscription? - [ ] cli for creating and editing api keys - [ ] Api keys need option to lock to IP, cors header, etc - [ ] Only subscribe to transactions when someone is listening and if the server has opted in to it diff --git a/web3_proxy/src/connection.rs b/web3_proxy/src/connection.rs index 2a25b0fb..6f59d1ac 100644 --- a/web3_proxy/src/connection.rs +++ b/web3_proxy/src/connection.rs @@ -167,6 +167,8 @@ impl Web3Connection { } } + let will_subscribe_to_blocks = block_sender.is_some(); + // subscribe to new blocks and new transactions // TODO: make transaction subscription optional (just pass None for tx_id_sender) let handle = { @@ -178,55 +180,57 @@ impl Web3Connection { }) }; - // TODO: make sure the server isn't still syncing - - // TODO: don't sleep. wait for new heads subscription instead - // TODO: i think instead of atomics, we could maybe use a watch channel - sleep(Duration::from_millis(200)).await; - // we could take "archive" as a parameter, but we would want a safety check on it regardless // check common archive thresholds // TODO: would be great if rpcs exposed this // TODO: move this to a helper function so we can recheck on errors or as the chain grows // TODO: move this to a helper function that checks - for block_data_limit in [u64::MAX, 90_000, 128, 64, 32] { - let mut head_block_num = new_connection.head_block.read().1; + if will_subscribe_to_blocks { + // TODO: make sure the server isn't still syncing - // TODO: wait until head block is set outside the loop? if we disconnect while starting we could actually get 0 though - while head_block_num == U64::zero() { - warn!(?new_connection, "no head block"); + // TODO: don't sleep. wait for new heads subscription instead + // TODO: i think instead of atomics, we could maybe use a watch channel + sleep(Duration::from_millis(250)).await; - // TODO: subscribe to a channel instead of polling? subscribe to http_interval_sender? - sleep(Duration::from_secs(1)).await; + for block_data_limit in [u64::MAX, 90_000, 128, 64, 32] { + let mut head_block_num = new_connection.head_block.read().1; - head_block_num = new_connection.head_block.read().1; - } + // TODO: wait until head block is set outside the loop? if we disconnect while starting we could actually get 0 though + while head_block_num == U64::zero() { + warn!(?new_connection, "no head block"); - // TODO: subtract 1 from block_data_limit for safety? - let maybe_archive_block = head_block_num - .saturating_sub((block_data_limit).into()) - .max(U64::one()); + // TODO: subscribe to a channel instead of polling? subscribe to http_interval_sender? + sleep(Duration::from_secs(1)).await; - let archive_result: Result = new_connection - .wait_for_request_handle() - .await? - .request( - "eth_getCode", - ( - "0xdead00000000000000000000000000000000beef", - maybe_archive_block, - ), - ) - .await; + head_block_num = new_connection.head_block.read().1; + } - trace!(?archive_result, "{}", new_connection); + // TODO: subtract 1 from block_data_limit for safety? + let maybe_archive_block = head_block_num + .saturating_sub((block_data_limit).into()) + .max(U64::one()); - if archive_result.is_ok() { - new_connection - .block_data_limit - .store(block_data_limit, atomic::Ordering::Release); + let archive_result: Result = new_connection + .wait_for_request_handle() + .await? + .request( + "eth_getCode", + ( + "0xdead00000000000000000000000000000000beef", + maybe_archive_block, + ), + ) + .await; - break; + trace!(?archive_result, "{}", new_connection); + + if archive_result.is_ok() { + new_connection + .block_data_limit + .store(block_data_limit, atomic::Ordering::Release); + + break; + } } } diff --git a/web3_proxy/src/connections.rs b/web3_proxy/src/connections.rs index 78c36903..b5bb3fb4 100644 --- a/web3_proxy/src/connections.rs +++ b/web3_proxy/src/connections.rs @@ -188,7 +188,13 @@ impl Web3Connections { let http_client = http_client.clone(); let redis_client_pool = redis_client_pool.clone(); let http_interval_sender = http_interval_sender.clone(); - let block_sender = Some(block_sender.clone()); + + let block_sender = if head_block_sender.is_some() { + Some(block_sender.clone()) + } else { + None + }; + let pending_tx_id_sender = Some(pending_tx_id_sender.clone()); tokio::spawn(async move { @@ -877,7 +883,7 @@ impl Web3Connections { // TODO: do this before or after processing all the transactions in this block? // TODO: only swap if there is a change? - debug!(?pending_synced_connections, "swapping"); + trace!(?pending_synced_connections, "swapping"); self.synced_connections .swap(Arc::new(pending_synced_connections));