another pass at server selection

Bryan Stitt 2022-12-07 22:54:38 -08:00
parent 78a2119c07
commit 86f6b16761
5 changed files with 151 additions and 136 deletions

@@ -254,6 +254,9 @@ These are roughly in order of completition
   - need to do all the connections in parallel with spawns
 - [x] add block timestamp to the /status page
   - [x] be sure to save the timestamp in a way that our request routing logic can make use of it
+- [x] node selection still needs improvements. we still send to syncing nodes if they are close
+  - try consensus heads first! only if that is empty should we try others. and we should try them sorted by block height and then randomly chosen from there
+- [ ] having the whole block in status is very verbose. trim it down
 - [ ] `cost estimate` script
   - sum bytes and number of requests. prompt hosting costs. divide
 - [ ] `stat delay` script
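
The two new checklist items above describe the selection strategy this commit implements: group candidate servers by their head block number and only fall back to a lower tier when the best-synced tier yields nothing. A minimal, self-contained sketch of that grouping, with a hypothetical `Rpc` struct and `tiered_candidates` helper standing in for the real `Web3Connection` types:

```rust
use std::collections::BTreeMap;

// Hypothetical, simplified stand-in for a backend RPC server.
struct Rpc {
    name: &'static str,
    head_block: Option<u64>,
}

/// Group candidate servers by their head block number, then walk the groups
/// from the highest head downward. Within a group any server is acceptable,
/// so the caller can pick one at random (or by weight).
fn tiered_candidates(rpcs: &[Rpc]) -> Vec<Vec<&Rpc>> {
    let mut by_head: BTreeMap<u64, Vec<&Rpc>> = BTreeMap::new();

    for rpc in rpcs {
        // servers that have not reported a head block yet are skipped entirely
        if let Some(head) = rpc.head_block {
            by_head.entry(head).or_insert_with(Vec::new).push(rpc);
        }
    }

    // BTreeMap iterates in ascending key order; reverse to try the
    // highest (most synced) tier first.
    by_head.into_values().rev().collect()
}

fn main() {
    let rpcs = [
        Rpc { name: "a", head_block: Some(100) },
        Rpc { name: "b", head_block: Some(100) },
        Rpc { name: "c", head_block: Some(97) }, // lagging node: only a fallback
        Rpc { name: "d", head_block: None },     // still connecting: never used
    ];

    for (i, tier) in tiered_candidates(&rpcs).iter().enumerate() {
        let names: Vec<_> = tier.iter().map(|r| r.name).collect();
        println!("tier {i}: {names:?}");
    }
}
```

Keying a `BTreeMap` by block number means the tiers come back already sorted, so `into_values().rev()` walks them from best-synced to most-lagged, which is how the rewritten selection loop in the diff below consumes them.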

@@ -189,6 +189,7 @@ pub async fn get_migrated_db(
     min_connections: u32,
     max_connections: u32,
 ) -> anyhow::Result<DatabaseConnection> {
+    // TODO: this seems to fail silently
     let db_conn = get_db(db_url, min_connections, max_connections).await?;
     let db_backend = db_conn.get_database_backend();
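
The new TODO notes that a failed migration seems to fail silently. One common way to make such failures visible is to attach context to each step with `anyhow::Context` (which this commit imports elsewhere). A hypothetical sketch, with `DbConn` and `connect` as stand-ins for the real sea-orm types and helpers:

```rust
use anyhow::Context;

// `DbConn` and `connect` are hypothetical stand-ins; the real function returns
// a sea_orm `DatabaseConnection`. The point is the pattern: wrapping each
// fallible step with `.context(...)` makes the failing step show up in the error.
struct DbConn;

async fn connect(_url: &str) -> anyhow::Result<DbConn> {
    Ok(DbConn)
}

async fn get_migrated_db_sketch(db_url: &str) -> anyhow::Result<DbConn> {
    let db_conn = connect(db_url)
        .await
        .context("connecting to the database")?;

    // run the migrations here, wrapped the same way, e.g.
    // Migrator::up(&db_conn, None).await.context("running migrations")?;

    Ok(db_conn)
}
```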

@@ -310,7 +310,7 @@ impl Web3Connection {
         let oldest_block_num = head_block_num.saturating_sub(block_data_limit);
-        needed_block_num >= &oldest_block_num
+        *needed_block_num >= oldest_block_num
     }
     /// reconnect to the provider. errors are retried forever with exponential backoff with jitter.
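
For context, the changed line above is the tail of the block-availability check: a node that keeps `block_data_limit` blocks behind its head can serve anything from `head - limit` up to its head. A hedged sketch of that check (the upper-bound clause here is an assumption; this hunk only shows the lower bound):

```rust
/// Sketch of the availability check. The real method lives on `Web3Connection`
/// and works with U64 block numbers; plain u64 keeps this standalone.
fn has_block_data(head_block_num: u64, block_data_limit: u64, needed_block_num: u64) -> bool {
    // saturating_sub keeps archive nodes (with a huge limit) from underflowing past genesis
    let oldest_block_num = head_block_num.saturating_sub(block_data_limit);

    // assumed upper bound: a node cannot serve blocks it has not seen yet
    needed_block_num >= oldest_block_num && needed_block_num <= head_block_num
}

fn main() {
    // a node at head 17_000_000 keeping 64 blocks of history:
    assert!(has_block_data(17_000_000, 64, 16_999_940)); // within the window
    assert!(!has_block_data(17_000_000, 64, 16_000_000)); // pruned away
    assert!(!has_block_data(17_000_000, 64, 17_000_100)); // not produced yet
}
```

The fix itself is small: the old line compared `&U64` references, while the new one dereferences `needed_block_num` first; the result is the same comparison written more directly.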
@@ -1017,6 +1017,19 @@ impl Web3Connection {
         // TODO? ready_provider: Option<&Arc<Web3Provider>>,
         allow_not_ready: bool,
     ) -> anyhow::Result<OpenRequestResult> {
+        // TODO: think more about this read block
+        if !allow_not_ready
+            && self
+                .provider_state
+                .read()
+                .await
+                .provider(allow_not_ready)
+                .await
+                .is_none()
+        {
+            return Ok(OpenRequestResult::NotReady);
+        }
         // check rate limits
         if let Some(ratelimiter) = self.hard_limit.as_ref() {
             // TODO: how should we know if we should set expire or not?
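
The block added above makes `try_request_handle` bail out early with `OpenRequestResult::NotReady` when the provider is not connected yet (unless the caller passes `allow_not_ready`), before any rate-limit accounting happens. For illustration, a hypothetical caller-side sketch of the three variants that appear in this diff; the enum here is a stand-in, not the real definition:

```rust
use std::time::Instant;

// Stand-ins for the real types; only the three variant names come from the diff.
struct OpenRequestHandle;

enum OpenRequestResult {
    Handle(OpenRequestHandle),
    RetryAt(Instant),
    NotReady,
}

fn dispatch(result: OpenRequestResult) {
    match result {
        OpenRequestResult::Handle(_handle) => {
            // the connection is usable right now: send the request through the handle
        }
        OpenRequestResult::RetryAt(retry_at) => {
            // a rate limit was hit: remember the earliest retry time or try another server
            let _wait = retry_at.saturating_duration_since(Instant::now());
        }
        OpenRequestResult::NotReady => {
            // still connecting or syncing (the new guard above): skip this server
        }
    }
}
```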

@@ -10,6 +10,7 @@ use crate::config::{BlockAndRpc, TxHashAndRpc, Web3ConnectionConfig};
 use crate::frontend::authorization::{Authorization, RequestMetadata};
 use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
 use crate::rpcs::transactions::TxStatus;
+use anyhow::Context;
 use arc_swap::ArcSwap;
 use counter::Counter;
 use derive_more::From;
@@ -25,6 +26,7 @@ use serde::ser::{SerializeStruct, Serializer};
 use serde::Serialize;
 use serde_json::json;
 use serde_json::value::RawValue;
+use std::collections::BTreeMap;
 use std::fmt;
 use std::sync::atomic::Ordering;
 use std::sync::Arc;
@@ -371,61 +373,57 @@ impl Web3Connections {
         skip: &[Arc<Web3Connection>],
         min_block_needed: Option<&U64>,
     ) -> anyhow::Result<OpenRequestResult> {
-        let mut earliest_retry_at = None;
-        let usable_rpcs: Vec<Arc<Web3Connection>> = if let Some(min_block_needed) = min_block_needed
-        {
+        let usable_rpcs_by_head_num: BTreeMap<U64, Vec<Arc<Web3Connection>>> =
+            if let Some(min_block_needed) = min_block_needed {
                 // need a potentially old block. check all the rpcs
                 // TODO: we are going to be checking "has_block_data" a lot now
-                self.conns
+                let mut m = BTreeMap::new();
+                for x in self
+                    .conns
                     .values()
                     .filter(|x| !skip.contains(x))
                     .filter(|x| x.has_block_data(min_block_needed))
                     .cloned()
-                    .collect()
+                {
+                    let x_head_block = x.head_block.read().clone();
+                    match x_head_block {
+                        None => continue,
+                        Some(x_head) => {
+                            m.entry(x_head.number()).or_insert_with(Vec::new).push(x);
+                        }
+                    }
+                }
+                m
             } else {
                 // need latest. filter the synced rpcs
                 // TODO: double check has_block_data?
-                self.synced_connections
-                    .load()
+                let synced_connections = self.synced_connections.load();
+                let head_num = match synced_connections.head_block.as_ref() {
+                    None => return Ok(OpenRequestResult::NotReady),
+                    Some(x) => x.number(),
+                };
+                let c: Vec<_> = synced_connections
                     .conns
                     .iter()
                     .filter(|x| !skip.contains(x))
                     .cloned()
-                    .collect()
+                    .collect();
+                BTreeMap::from([(head_num, c)])
             };
-        match usable_rpcs.len() {
-            0 => {
-                warn!(
-                    "no rpcs @ {:?}: {:?} (skipped {:?})",
-                    min_block_needed,
-                    self.synced_connections.load(),
-                    skip.iter().map(|x| &x.name).collect::<Vec<_>>()
-                );
-                // TODO: what should happen here? automatic retry?
-                // TODO: more detailed error
-                return Ok(OpenRequestResult::NotReady);
-            }
-            1 => {
-                let rpc = usable_rpcs.get(0).expect("len is 1");
-                // TODO: try or wait for a request handle?
-                let handle = rpc
-                    .wait_for_request_handle(authorization, Duration::from_secs(60), false)
-                    .await?;
-                return Ok(OpenRequestResult::Handle(handle));
-            }
-            _ => {
-                // anything else and we need to pick with a weighted random chooser
-            }
-        }
+        let mut earliest_retry_at = None;
+        for usable_rpcs in usable_rpcs_by_head_num.into_values().rev() {
             let mut minimum = f64::MAX;
-            // we sort on a bunch of values. cache them here so that we don't do this math multiple times.
-            let available_request_map: HashMap<_, f64> = usable_rpcs
+            // we sort on a combination of values. cache them here so that we don't do this math multiple times.
+            let mut available_request_map: HashMap<_, f64> = usable_rpcs
                 .iter()
                 .map(|rpc| {
                     // TODO: are active requests what we want? do we want a counter for requests in the last second + any actives longer than that?
@@ -442,15 +440,16 @@ impl Web3Connections {
                     // under heavy load, it is possible for even our best server to be negative
                     minimum = available_requests.min(minimum);
-                    (rpc.clone(), available_requests)
+                    // TODO: clone needed?
+                    (rpc, available_requests)
                 })
                 .collect();
             trace!("minimum available requests: {}", minimum);
             // weights can't have negative numbers. shift up if any are negative
-            let available_request_map: HashMap<_, f64> = if minimum < 0.0 {
-                available_request_map
+            if minimum < 0.0 {
+                available_request_map = available_request_map
                     .into_iter()
                     .map(|(rpc, weight)| {
                         // TODO: is simple addition the right way to shift everyone?
@@ -461,12 +460,11 @@ impl Web3Connections {
                         (rpc, x)
                     })
                     .collect()
-            } else {
-                available_request_map
-            };
+            }
             let sorted_rpcs = {
                 if usable_rpcs.len() == 1 {
+                    // TODO: return now instead?
                     vec![usable_rpcs.get(0).expect("there should be 1")]
                 } else {
                     let mut rng = thread_fast_rng::thread_fast_rng();
@@ -484,11 +482,11 @@ impl Web3Connections {
             };
             // now that the rpcs are sorted, try to get an active request handle for one of them
-            for rpc in sorted_rpcs.iter() {
+            for best_rpc in sorted_rpcs.into_iter() {
                 // increment our connection counter
-                match rpc.try_request_handle(authorization, false).await {
+                match best_rpc.try_request_handle(authorization, false).await {
                     Ok(OpenRequestResult::Handle(handle)) => {
-                        // // trace!("next server on {:?}: {:?}", self, rpc);
+                        // // trace!("next server on {:?}: {:?}", self, best_rpc);
                         return Ok(OpenRequestResult::Handle(handle));
                     }
                     Ok(OpenRequestResult::RetryAt(retry_at)) => {
@@ -499,35 +497,35 @@ impl Web3Connections {
                     }
                     Err(err) => {
                         // TODO: log a warning?
-                        warn!("No request handle for {}. err={:?}", rpc, err)
+                        warn!("No request handle for {}. err={:?}", best_rpc, err)
                     }
                 }
             }
+        }
+        if let Some(request_metadata) = request_metadata {
+            request_metadata.no_servers.fetch_add(1, Ordering::Release);
+        }
         match earliest_retry_at {
             None => {
                 // none of the servers gave us a time to retry at
-                if let Some(request_metadata) = request_metadata {
-                    request_metadata.no_servers.fetch_add(1, Ordering::Release);
-                }
+                // TODO: bring this back?
                 // we could return an error here, but maybe waiting a second will fix the problem
                 // TODO: configurable max wait? the whole max request time, or just some portion?
-                let handle = sorted_rpcs
-                    .get(0)
-                    .expect("at least 1 is available")
-                    .wait_for_request_handle(authorization, Duration::from_secs(3), false)
-                    .await?;
-                Ok(OpenRequestResult::Handle(handle))
+                // let handle = sorted_rpcs
+                //     .get(0)
+                //     .expect("at least 1 is available")
+                //     .wait_for_request_handle(authorization, Duration::from_secs(3), false)
+                //     .await?;
+                // Ok(OpenRequestResult::Handle(handle))
+                Ok(OpenRequestResult::NotReady)
             }
             Some(earliest_retry_at) => {
                 warn!("no servers on {:?}! {:?}", self, earliest_retry_at);
-                if let Some(request_metadata) = request_metadata {
-                    request_metadata.no_servers.fetch_add(1, Ordering::Release);
-                }
                 Ok(OpenRequestResult::RetryAt(earliest_retry_at))
             }
         }
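
The loop rewritten above weights each candidate by its available request count, shifts every weight up when the minimum is negative (weighted choosers reject negative weights), and only then picks a server. A rough, self-contained sketch of that step, with servers as plain strings; the shift amount and the use of the rand crate's `choose_weighted` are assumptions for illustration, since the actual shift and the `thread_fast_rng`-based sort are elided from this diff:

```rust
use rand::seq::SliceRandom;
use std::collections::HashMap;

/// Hypothetical version of the weighting step: keys are server names instead of
/// Arc<Web3Connection>, and the final pick uses rand's `choose_weighted`.
fn pick_server(available_request_map: HashMap<&'static str, f64>) -> Option<&'static str> {
    // under heavy load even the best server can report a negative number of
    // available requests, so track the minimum...
    let minimum = available_request_map
        .values()
        .fold(f64::MAX, |min, x| x.min(min));

    // ...and shift every weight up so none are negative. The exact shift in the
    // commit is not shown; adding `-minimum + 1.0` is one simple choice.
    let weighted: Vec<(&'static str, f64)> = available_request_map
        .into_iter()
        .map(|(rpc, weight)| {
            let weight = if minimum < 0.0 { weight - minimum + 1.0 } else { weight };
            (rpc, weight)
        })
        .collect();

    let mut rng = rand::thread_rng();

    weighted
        .choose_weighted(&mut rng, |(_, weight)| *weight)
        .ok()
        .map(|(rpc, _)| *rpc)
}
```

After this weighted pick, the rewritten function no longer blocks on `wait_for_request_handle` when every attempt fails: it bumps the `no_servers` counter once, then returns `NotReady`, or `RetryAt(earliest_retry_at)` if any server reported when it will accept requests again.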

@@ -8,7 +8,7 @@ use entities::revert_log
 use entities::sea_orm_active_enums::Method;
 use ethers::providers::{HttpClientError, ProviderError, WsClientError};
 use ethers::types::{Address, Bytes};
-use log::{debug, error, info, trace, warn, Level};
+use log::{debug, error, trace, warn, Level};
 use metered::metered;
 use metered::HitCount;
 use metered::ResponseTime;