dont subscribe to blocks on the private tier

This commit is contained in:
Bryan Stitt 2022-08-10 21:52:28 +00:00
parent 6d92f33dc4
commit 63abd1251b
3 changed files with 51 additions and 40 deletions

View File

@ -66,11 +66,13 @@
- [x] Add a "weight" key to the servers. Sort on that after block. keep most requests local
- [x] cache db query results for user data. db is a big bottleneck right now
- [x] allow blocking public requests
- [x] Got warning: "WARN subscribe_new_heads:send_block: web3_proxy::connection: unable to get block from https://rpc.ethermine.org: Deserialization Error: expected value at line 1 column 1. Response: error code: 1015". this is cloudflare rate limiting on fetching a block, but this is a private rpc. why is there a block subscription?
- [x] im seeing ethspam occasionally try to query a future block. something must be setting the head block too early
- [x] we were sorting best block the wrong direction. i flipped a.cmp(b) to b.cmp(a) so that the largest would be first, but then i used 'max_by' which looks at the end of the list
- [ ] cache more things locally or in redis
- [ ] use siwe messages and signatures for sign up and login
- [ ] basic request method stats
- [ ] active requests on /status is always 0 even when i'm running requests through
- [ ] im seeing ethspam occasionally try to query a future block. something must be setting the head block too early
## V1
@ -103,7 +105,6 @@
- [ ] emit stats for successes, retries, failures, with the types of requests, account, chain, rpc
- [ ] right now we send too many getTransaction queries to the private rpc tier and i are being rate limited by some of them. change to be serial and weight by hard/soft limit.
- [ ] improved logging with useful instrumentation
- [ ] Got warning: "WARN subscribe_new_heads:send_block: web3_proxy::connection: unable to get block from https://rpc.ethermine.org: Deserialization Error: expected value at line 1 column 1. Response: error code: 1015". this is cloudflare rate limiting on fetching a block, but this is a private rpc. why is there a block subscription?
- [ ] cli for creating and editing api keys
- [ ] Api keys need option to lock to IP, cors header, etc
- [ ] Only subscribe to transactions when someone is listening and if the server has opted in to it

View File

@ -167,6 +167,8 @@ impl Web3Connection {
}
}
let will_subscribe_to_blocks = block_sender.is_some();
// subscribe to new blocks and new transactions
// TODO: make transaction subscription optional (just pass None for tx_id_sender)
let handle = {
@ -178,55 +180,57 @@ impl Web3Connection {
})
};
// TODO: make sure the server isn't still syncing
// TODO: don't sleep. wait for new heads subscription instead
// TODO: i think instead of atomics, we could maybe use a watch channel
sleep(Duration::from_millis(200)).await;
// we could take "archive" as a parameter, but we would want a safety check on it regardless
// check common archive thresholds
// TODO: would be great if rpcs exposed this
// TODO: move this to a helper function so we can recheck on errors or as the chain grows
// TODO: move this to a helper function that checks
for block_data_limit in [u64::MAX, 90_000, 128, 64, 32] {
let mut head_block_num = new_connection.head_block.read().1;
if will_subscribe_to_blocks {
// TODO: make sure the server isn't still syncing
// TODO: wait until head block is set outside the loop? if we disconnect while starting we could actually get 0 though
while head_block_num == U64::zero() {
warn!(?new_connection, "no head block");
// TODO: don't sleep. wait for new heads subscription instead
// TODO: i think instead of atomics, we could maybe use a watch channel
sleep(Duration::from_millis(250)).await;
// TODO: subscribe to a channel instead of polling? subscribe to http_interval_sender?
sleep(Duration::from_secs(1)).await;
for block_data_limit in [u64::MAX, 90_000, 128, 64, 32] {
let mut head_block_num = new_connection.head_block.read().1;
head_block_num = new_connection.head_block.read().1;
}
// TODO: wait until head block is set outside the loop? if we disconnect while starting we could actually get 0 though
while head_block_num == U64::zero() {
warn!(?new_connection, "no head block");
// TODO: subtract 1 from block_data_limit for safety?
let maybe_archive_block = head_block_num
.saturating_sub((block_data_limit).into())
.max(U64::one());
// TODO: subscribe to a channel instead of polling? subscribe to http_interval_sender?
sleep(Duration::from_secs(1)).await;
let archive_result: Result<Bytes, _> = new_connection
.wait_for_request_handle()
.await?
.request(
"eth_getCode",
(
"0xdead00000000000000000000000000000000beef",
maybe_archive_block,
),
)
.await;
head_block_num = new_connection.head_block.read().1;
}
trace!(?archive_result, "{}", new_connection);
// TODO: subtract 1 from block_data_limit for safety?
let maybe_archive_block = head_block_num
.saturating_sub((block_data_limit).into())
.max(U64::one());
if archive_result.is_ok() {
new_connection
.block_data_limit
.store(block_data_limit, atomic::Ordering::Release);
let archive_result: Result<Bytes, _> = new_connection
.wait_for_request_handle()
.await?
.request(
"eth_getCode",
(
"0xdead00000000000000000000000000000000beef",
maybe_archive_block,
),
)
.await;
break;
trace!(?archive_result, "{}", new_connection);
if archive_result.is_ok() {
new_connection
.block_data_limit
.store(block_data_limit, atomic::Ordering::Release);
break;
}
}
}

View File

@ -188,7 +188,13 @@ impl Web3Connections {
let http_client = http_client.clone();
let redis_client_pool = redis_client_pool.clone();
let http_interval_sender = http_interval_sender.clone();
let block_sender = Some(block_sender.clone());
let block_sender = if head_block_sender.is_some() {
Some(block_sender.clone())
} else {
None
};
let pending_tx_id_sender = Some(pending_tx_id_sender.clone());
tokio::spawn(async move {
@ -877,7 +883,7 @@ impl Web3Connections {
// TODO: do this before or after processing all the transactions in this block?
// TODO: only swap if there is a change?
debug!(?pending_synced_connections, "swapping");
trace!(?pending_synced_connections, "swapping");
self.synced_connections
.swap(Arc::new(pending_synced_connections));