include to_block in more places

Bryan Stitt 2023-02-10 20:45:57 -08:00
parent c959110986
commit f1bc00082a
5 changed files with 87 additions and 26 deletions

@@ -330,6 +330,8 @@ These are not yet ordered. There might be duplicates. We might not actually need
- [x] block all admin_ rpc commands
- [x] remove the "metered" crate now that we save aggregate queries?
- [x] add archive depth to app config
- [x] use from_block and to_block so that eth_getLogs is routed correctly
- [x] improve eth_sendRawTransaction server selection
- [-] proxy mode for benchmarking all backends
- [-] proxy mode for sending to multiple backends
- [-] let users choose a % of reverts to log (or maybe x/second). someone like curve logging all reverts will make for a BIG database very quickly
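
For context on the from_block/to_block item above: eth_getLogs carries its range inside the filter object, so the proxy has to read both ends before it can pick a server that actually holds the whole range. A minimal sketch of that extraction with a hypothetical helper (this is not the repo's block_needed code):

```rust
use serde_json::Value;

/// Pull the block range out of an eth_getLogs filter object.
/// `None` means the field was absent and the caller should fall back to the head block.
fn get_logs_block_range(params: &Value) -> (Option<String>, Option<String>) {
    // eth_getLogs takes a single filter object as its first positional parameter
    let filter = match params.get(0) {
        Some(f) => f,
        None => return (None, None),
    };

    let read = |key: &str| filter.get(key).and_then(Value::as_str).map(str::to_owned);

    (read("fromBlock"), read("toBlock"))
}

fn main() {
    let params: Value =
        serde_json::from_str(r#"[{"fromBlock": "0x10", "toBlock": "0x20"}]"#).unwrap();

    let (from_block, to_block) = get_logs_block_range(&params);

    // a server is only a valid target if it has data for *both* ends of the range
    println!("route to a server holding {from_block:?} through {to_block:?}");
}
```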
@@ -343,6 +345,8 @@ These are not yet ordered. There might be duplicates. We might not actually need
- erigon only streams the JSON over HTTP. that code isn't enabled for websockets. so this should save memory on the erigon servers
- i think this also means we don't need to worry about changing the id that the user gives us.
- have the healthcheck get the block over http. if it errors, or doesn't match what the websocket says, something is wrong (likely a deadlock in the websocket code)
- [ ] maybe we shouldn't route eth_getLogs to syncing nodes. serving queries slows down sync significantly
- change the send_best function to only include servers that are at least close to fully synced
- [ ] have private transactions be enabled by a url setting rather than a setting on the key
- [ ] cli for adding rpc keys to an existing user
- [ ] rate limiting/throttling on query_user_stats
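
The eth_getLogs-and-syncing-nodes item above is still open; a rough sketch of what a "close to fully synced" filter could look like, using made-up types and an arbitrary lag threshold:

```rust
use std::sync::Arc;

/// Simplified stand-in for a backend server; the real Web3Rpc carries much more state.
struct Rpc {
    name: &'static str,
    head_block_num: Option<u64>,
}

/// Keep only servers whose head is within `max_lag` blocks of the consensus head.
/// A server that is still syncing (or has no head at all) is dropped so heavy
/// queries like eth_getLogs don't slow its sync down even further.
fn nearly_synced(rpcs: &[Arc<Rpc>], consensus_head: u64, max_lag: u64) -> Vec<Arc<Rpc>> {
    rpcs.iter()
        .filter(|rpc| match rpc.head_block_num {
            Some(head) => head + max_lag >= consensus_head,
            None => false,
        })
        .cloned()
        .collect()
}

fn main() {
    let rpcs = vec![
        Arc::new(Rpc { name: "synced", head_block_num: Some(1_000_000) }),
        Arc::new(Rpc { name: "lagging", head_block_num: Some(999_998) }),
        Arc::new(Rpc { name: "syncing", head_block_num: Some(500_000) }),
    ];

    // with a lag budget of 4 blocks, only the first two servers are candidates
    for rpc in nearly_synced(&rpcs, 1_000_000, 4) {
        println!("candidate: {}", rpc.name);
    }
}
```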

@@ -1158,6 +1158,7 @@ impl Web3ProxyApp {
request,
Some(&request_metadata),
None,
None,
)
.await?;
@@ -1231,6 +1232,7 @@ impl Web3ProxyApp {
&request,
Some(request_metadata.clone()),
head_block_num.as_ref(),
None,
Level::Trace,
num,
true,
@@ -1527,6 +1529,7 @@ impl Web3ProxyApp {
if let Some(cache_key) = cache_key {
let from_block_num = cache_key.from_block.as_ref().map(|x| x.number());
let to_block_num = cache_key.to_block.as_ref().map(|x| x.number());
self.response_cache
.try_get_with(cache_key, async move {
@@ -1539,6 +1542,7 @@ impl Web3ProxyApp {
request,
Some(&request_metadata),
from_block_num.as_ref(),
to_block_num.as_ref(),
)
.await?;
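
The cached path above funnels everything through try_get_with, so only the first caller for a given cache key hits the backends and concurrent callers await the same future. A standalone example of that pattern, assuming moka's future::Cache (the real response_cache key and response types are more involved):

```rust
use moka::future::Cache;

#[tokio::main]
async fn main() {
    // cache_key -> response body; the real cache key also folds in the method,
    // params, and the from/to block numbers resolved above
    let cache: Cache<String, String> = Cache::builder().max_capacity(10_000).build();

    let response = cache
        .try_get_with("eth_getLogs:0x10:0x20".to_string(), async {
            // this block only runs for the first caller with this key;
            // concurrent callers for the same key await the same future
            Ok::<_, anyhow::Error>("fake backend response".to_string())
        })
        .await;

    match response {
        Ok(body) => println!("cached or fresh: {body}"),
        Err(err) => eprintln!("backend error (not cached): {err}"),
    }
}
```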
@@ -1547,7 +1551,7 @@ impl Web3ProxyApp {
// TODO: only cache the inner response
// TODO: how are we going to stream this?
// TODO: check response size. if its very large, return it in a custom Error type that bypasses caching
// TODO: check response size. if its very large, return it in a custom Error type that bypasses caching? or will moka do that for us?
Ok::<_, anyhow::Error>(response)
})
.await
@@ -1567,6 +1571,7 @@ impl Web3ProxyApp {
request,
Some(&request_metadata),
None,
None,
)
.await?
}

@@ -215,8 +215,8 @@ pub async fn block_needed(
};
return Ok(BlockNeeded::CacheRange {
from_block_num: from_block_num,
to_block_num: to_block_num,
from_block_num,
to_block_num,
cache_errors: true,
});
}

@@ -167,7 +167,13 @@ impl Web3Rpcs {
// TODO: request_metadata? maybe we should put it in the authorization?
// TODO: think more about this wait_for_sync
let response = self
.try_send_best_consensus_head_connection(authorization, request, None, None)
.try_send_best_consensus_head_connection(
authorization,
request,
None,
None,
None,
)
.await?;
let block = response.result.context("failed fetching block")?;
@@ -258,7 +264,7 @@ impl Web3Rpcs {
// TODO: request_metadata or authorization?
// we don't actually set min_block_needed here because all nodes have all blocks
let response = self
.try_send_best_consensus_head_connection(authorization, request, None, None)
.try_send_best_consensus_head_connection(authorization, request, None, None, None)
.await?;
if let Some(err) = response.error {

@@ -411,7 +411,9 @@ impl Web3Rpcs {
authorization: &Arc<Authorization>,
request_metadata: Option<&Arc<RequestMetadata>>,
skip: &[Arc<Web3Rpc>],
// TODO: if we are checking for the consensus head, i don't think we need min_block_needed/max_block_needed
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
) -> anyhow::Result<OpenRequestResult> {
if let Ok(without_backups) = self
._best_consensus_head_connection(
@@ -420,6 +422,7 @@ impl Web3Rpcs {
request_metadata,
skip,
min_block_needed,
max_block_needed,
)
.await
{
@@ -435,6 +438,7 @@ impl Web3Rpcs {
request_metadata,
skip,
min_block_needed,
max_block_needed,
)
.await
}
@@ -447,6 +451,7 @@ impl Web3Rpcs {
request_metadata: Option<&Arc<RequestMetadata>>,
skip: &[Arc<Web3Rpc>],
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
) -> anyhow::Result<OpenRequestResult> {
let usable_rpcs_by_head_num_and_weight: BTreeMap<(Option<U64>, u64), Vec<Arc<Web3Rpc>>> = {
let synced_connections = self.watch_consensus_connections_sender.borrow().clone();
@@ -471,6 +476,13 @@ impl Web3Rpcs {
.filter(|x| if allow_backups { true } else { !x.backup })
.filter(|x| !skip.contains(x))
.filter(|x| x.has_block_data(min_block_needed))
.filter(|x| {
if let Some(max_block_needed) = max_block_needed {
x.has_block_data(max_block_needed)
} else {
true
}
})
.cloned()
{
let x_head_block = x.head_block.read().clone();
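
The new filter above drops any server that can't serve the top of the requested range; when max_block_needed is None the check is a no-op. A self-contained sketch of the same two-sided range check with a simplified Rpc type (map_or is just a more compact spelling of the if let in the hunk):

```rust
use std::sync::Arc;

/// Simplified stand-in for Web3Rpc: a contiguous range of blocks this server can serve.
struct Rpc {
    name: &'static str,
    oldest_block: u64,
    head_block: u64,
}

impl Rpc {
    fn has_block_data(&self, num: &u64) -> bool {
        (self.oldest_block..=self.head_block).contains(num)
    }
}

/// Keep only servers that hold *both* ends of the requested range.
/// `map_or(true, ..)` makes a missing bound a no-op, same as the
/// `if let Some(..)` filter in the hunk above.
fn usable(
    rpcs: &[Arc<Rpc>],
    min_block_needed: Option<&u64>,
    max_block_needed: Option<&u64>,
) -> Vec<Arc<Rpc>> {
    rpcs.iter()
        .filter(|x| min_block_needed.map_or(true, |n| x.has_block_data(n)))
        .filter(|x| max_block_needed.map_or(true, |n| x.has_block_data(n)))
        .cloned()
        .collect()
}

fn main() {
    let rpcs = vec![
        Arc::new(Rpc { name: "archive", oldest_block: 0, head_block: 100 }),
        Arc::new(Rpc { name: "pruned", oldest_block: 90, head_block: 100 }),
    ];

    // an eth_getLogs spanning blocks 10..=100 can only go to the archive node
    for rpc in usable(&rpcs, Some(&10), Some(&100)) {
        println!("usable: {}", rpc.name);
    }
}
```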
@@ -637,28 +649,42 @@ impl Web3Rpcs {
pub async fn all_connections(
&self,
authorization: &Arc<Authorization>,
block_needed: Option<&U64>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
max_count: Option<usize>,
always_include_backups: bool,
) -> Result<Vec<OpenRequestHandle>, Option<Instant>> {
if !always_include_backups {
if let Ok(without_backups) = self
._all_connections(false, authorization, block_needed, max_count)
._all_connections(
false,
authorization,
min_block_needed,
max_block_needed,
max_count,
)
.await
{
return Ok(without_backups);
}
}
self._all_connections(true, authorization, block_needed, max_count)
.await
self._all_connections(
true,
authorization,
min_block_needed,
max_block_needed,
max_count,
)
.await
}
async fn _all_connections(
&self,
allow_backups: bool,
authorization: &Arc<Authorization>,
block_needed: Option<&U64>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
max_count: Option<usize>,
) -> Result<Vec<OpenRequestHandle>, Option<Instant>> {
let mut earliest_retry_at = None;
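
all_connections keeps the same two-pass shape used elsewhere in this file: try the selection with backups excluded, and only rerun it with backups allowed if the first pass finds nothing. A generic sketch of that shape with made-up types:

```rust
/// Simplified stand-in for a backend server.
#[derive(Clone)]
struct Rpc {
    name: &'static str,
    backup: bool,
}

/// One selection pass; `None` when nothing matches.
fn select(rpcs: &[Rpc], allow_backups: bool) -> Option<Vec<Rpc>> {
    let picked: Vec<Rpc> = rpcs
        .iter()
        .filter(|x| allow_backups || !x.backup)
        .cloned()
        .collect();

    if picked.is_empty() {
        None
    } else {
        Some(picked)
    }
}

/// Prefer primaries; only reach for backups when the first pass comes up empty.
fn select_preferring_primaries(rpcs: &[Rpc]) -> Option<Vec<Rpc>> {
    if let Some(without_backups) = select(rpcs, false) {
        return Some(without_backups);
    }

    select(rpcs, true)
}

fn main() {
    let only_backups = vec![Rpc { name: "backup-1", backup: true }];

    // no primaries are available, so the second pass returns the backup
    let picked = select_preferring_primaries(&only_backups).unwrap();
    println!("fell back to: {}", picked[0].name);
}
```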
@@ -680,11 +706,12 @@ impl Web3Rpcs {
.clone();
// synced connections are all on the same block. sort them by tier with higher soft limits first
synced_conns.sort_by_cached_key(sort_rpcs_by_sync_status);
synced_conns.sort_by_cached_key(rpc_sync_status_sort_key);
// if there aren't enough synced connections, include more connections
// TODO: only do this sorting if the synced_conns isn't enough
let mut all_conns: Vec<_> = self.conns.values().cloned().collect();
all_conns.sort_by_cached_key(sort_rpcs_by_sync_status);
all_conns.sort_by_cached_key(rpc_sync_status_sort_key);
for connection in itertools::chain(synced_conns, all_conns) {
if max_count == 0 {
@@ -701,7 +728,13 @@ impl Web3Rpcs {
continue;
}
if let Some(block_needed) = block_needed {
if let Some(block_needed) = min_block_needed {
if !connection.has_block_data(block_needed) {
continue;
}
}
if let Some(block_needed) = max_block_needed {
if !connection.has_block_data(block_needed) {
continue;
}
@@ -709,7 +742,7 @@ impl Web3Rpcs {
// check rate limits and increment our connection counter
match connection
.try_request_handle(authorization, block_needed.is_none())
.try_request_handle(authorization, min_block_needed.is_none())
.await
{
Ok(OpenRequestResult::RetryAt(retry_at)) => {
@@ -748,6 +781,7 @@ impl Web3Rpcs {
request: JsonRpcRequest,
request_metadata: Option<&Arc<RequestMetadata>>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
) -> anyhow::Result<JsonRpcForwardedResponse> {
let mut skip_rpcs = vec![];
let mut method_not_available_response = None;
@@ -768,6 +802,7 @@ impl Web3Rpcs {
request_metadata,
&skip_rpcs,
min_block_needed,
max_block_needed,
)
.await?
{
@@ -1007,7 +1042,8 @@ impl Web3Rpcs {
authorization: &Arc<Authorization>,
request: &JsonRpcRequest,
request_metadata: Option<Arc<RequestMetadata>>,
block_needed: Option<&U64>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
error_level: Level,
max_count: Option<usize>,
always_include_backups: bool,
@@ -1016,7 +1052,8 @@ impl Web3Rpcs {
match self
.all_connections(
authorization,
block_needed,
min_block_needed,
max_block_needed,
max_count,
always_include_backups,
)
@@ -1099,6 +1136,7 @@ impl Web3Rpcs {
request: JsonRpcRequest,
request_metadata: Option<&Arc<RequestMetadata>>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
) -> anyhow::Result<JsonRpcForwardedResponse> {
match proxy_mode {
ProxyMode::Best => {
@@ -1107,6 +1145,7 @@ impl Web3Rpcs {
request,
request_metadata,
min_block_needed,
max_block_needed,
)
.await
}
@@ -1154,7 +1193,7 @@ impl Serialize for Web3Rpcs {
/// sort by block number (descending) and tier (ascending)
/// TODO: should this be moved into a `impl Web3Rpc`?
/// TODO: take AsRef or something like that? We don't need an Arc here
fn sort_rpcs_by_sync_status(x: &Arc<Web3Rpc>) -> (u64, u64, u32) {
fn rpc_sync_status_sort_key(x: &Arc<Web3Rpc>) -> (u64, u64, u32) {
let reversed_head_block = u64::MAX
- x.head_block
.read()
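
The hunk above is truncated, but the renamed rpc_sync_status_sort_key leans on a common trick for mixing sort directions in a single tuple key: subtracting the head block from u64::MAX turns an ascending sort into "highest head block first". A simplified sketch (the real key is a three-element tuple and reads the head block from shared state):

```rust
/// Simplified stand-in: only the fields that matter for ordering here.
struct Rpc {
    name: &'static str,
    head_block: u64,
    tier: u64,
}

/// Rust's sort is ascending, so the head block is reversed by subtracting it from
/// u64::MAX: a *higher* head block becomes a *smaller* key and sorts first, and
/// ties fall through to the tier (ascending).
fn sync_status_sort_key(x: &Rpc) -> (u64, u64) {
    let reversed_head_block = u64::MAX - x.head_block;

    (reversed_head_block, x.tier)
}

fn main() {
    let mut rpcs = vec![
        Rpc { name: "lagging", head_block: 99, tier: 0 },
        Rpc { name: "tier-1", head_block: 100, tier: 1 },
        Rpc { name: "tier-0", head_block: 100, tier: 0 },
    ];

    rpcs.sort_by_cached_key(sync_status_sort_key);

    let order: Vec<_> = rpcs.iter().map(|x| x.name).collect();
    println!("{order:?}"); // ["tier-0", "tier-1", "lagging"]
}
```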
@@ -1251,7 +1290,7 @@ mod tests {
.map(Arc::new)
.collect();
rpcs.sort_by_cached_key(sort_rpcs_by_sync_status);
rpcs.sort_by_cached_key(rpc_sync_status_sort_key);
let names_in_sort_order: Vec<_> = rpcs.iter().map(|x| x.name.as_str()).collect();
@@ -1395,7 +1434,7 @@ mod tests {
// all_backend_connections gives all non-backup servers regardless of sync status
assert_eq!(
conns
.all_connections(&authorization, None, None, false)
.all_connections(&authorization, None, None, None, false)
.await
.unwrap()
.len(),
@@ -1404,7 +1443,7 @@ mod tests {
// best_synced_backend_connection requires servers to be synced with the head block
let x = conns
.best_consensus_head_connection(&authorization, None, &[], None)
.best_consensus_head_connection(&authorization, None, &[], None, None)
.await
.unwrap();
@@ -1459,21 +1498,21 @@ mod tests {
assert!(matches!(
conns
.best_consensus_head_connection(&authorization, None, &[], None)
.best_consensus_head_connection(&authorization, None, &[], None, None)
.await,
Ok(OpenRequestResult::Handle(_))
));
assert!(matches!(
conns
.best_consensus_head_connection(&authorization, None, &[], Some(&0.into()))
.best_consensus_head_connection(&authorization, None, &[], Some(&0.into()), None)
.await,
Ok(OpenRequestResult::Handle(_))
));
assert!(matches!(
conns
.best_consensus_head_connection(&authorization, None, &[], Some(&1.into()))
.best_consensus_head_connection(&authorization, None, &[], Some(&1.into()), None)
.await,
Ok(OpenRequestResult::Handle(_))
));
@@ -1481,7 +1520,7 @@ mod tests {
// future block should not get a handle
assert!(matches!(
conns
.best_consensus_head_connection(&authorization, None, &[], Some(&2.into()))
.best_consensus_head_connection(&authorization, None, &[], Some(&2.into()), None)
.await,
Ok(OpenRequestResult::NotReady(true))
));
@@ -1605,8 +1644,15 @@ mod tests {
assert_eq!(conns.num_synced_rpcs(), 2);
// best_synced_backend_connection requires servers to be synced with the head block
// TODO: test with and without passing the head_block.number?
let best_head_server = conns
.best_consensus_head_connection(&authorization, None, &[], Some(&head_block.number()))
.best_consensus_head_connection(
&authorization,
None,
&[],
Some(&head_block.number()),
None,
)
.await;
assert!(matches!(
@@ -1615,7 +1661,7 @@ mod tests {
));
let best_archive_server = conns
.best_consensus_head_connection(&authorization, None, &[], Some(&1.into()))
.best_consensus_head_connection(&authorization, None, &[], Some(&1.into()), None)
.await;
match best_archive_server {