fix bug with not using synced_connections correctly

Bryan Stitt 2022-11-22 22:45:22 +00:00
parent 87b0ecc916
commit 5b1621ead4
6 changed files with 270 additions and 44 deletions

@ -243,10 +243,11 @@ These are roughly in order of completion
- [x] cache the status page for a second
- [x] request accounting for websockets
- [x] database merge scripts
- [ ] add block timestamp to the /status page
- [x] test that sets up a Web3Connection and asks "has_block" for old and new blocks
- [x] test that sets up Web3Connections with 2 nodes. one behind by several blocks. and see what the "next" server shows as
- [ ] add block timestamp to the /status page
- [ ] be sure to save the timestamp in a way that our request routing logic can make use of it
- [ ] period_datetime should always be :00. right now it depends on start time
- [ ] two servers running will confuse rpc_accounting!
- [ ] two servers running will confuse rpc_accounting!
- one option: we need the insert to be an upsert, but how do we merge histograms?
- [ ] change invite codes to set the user_tier
- [ ] period_datetime should always be :00. right now it depends on start time

@ -246,7 +246,7 @@ impl Web3Connections {
/// `connection_heads` is a mapping of rpc_names to head block hashes.
/// self.blockchain_map is a mapping of hashes to the complete ArcBlock.
/// TODO: return something?
async fn process_block_from_rpc(
pub(crate) async fn process_block_from_rpc(
&self,
authorization: &Arc<Authorization>,
connection_heads: &mut HashMap<String, H256>,
@ -261,24 +261,15 @@ impl Web3Connections {
let rpc_head_num = rpc_head_block.number.unwrap();
let rpc_head_hash = rpc_head_block.hash.unwrap();
if rpc_head_num.is_zero() {
// TODO: i don't think we can get to this anymore now that we use Options
debug!("{} still syncing", rpc);
// we don't know if its on the heaviest chain yet
self.save_block(&rpc_head_block, false).await?;
connection_heads.remove(&rpc.name);
connection_heads.insert(rpc.name.to_owned(), rpc_head_hash);
None
} else {
// we don't know if its on the heaviest chain yet
self.save_block(&rpc_head_block, false).await?;
connection_heads.insert(rpc.name.to_owned(), rpc_head_hash);
Some(BlockId {
hash: rpc_head_hash,
num: rpc_head_num,
})
}
Some(BlockId {
hash: rpc_head_hash,
num: rpc_head_num,
})
}
None => {
// TODO: warn is too verbose. this is expected if a node disconnects and has to reconnect
@ -384,7 +375,7 @@ impl Web3Connections {
}
if highest_rpcs_sum_soft_limit < self.min_sum_soft_limit
|| highest_rpcs.len() < self.min_synced_rpcs
|| highest_rpcs.len() < self.min_head_rpcs
{
// not enough rpcs yet. check the parent
if let Some(parent_block) = self.block_hashes.get(&maybe_head_block.parent_hash)
@ -401,7 +392,7 @@ impl Web3Connections {
highest_rpcs_sum_soft_limit,
self.min_sum_soft_limit,
highest_rpcs.len(),
self.min_synced_rpcs,
self.min_head_rpcs,
highest_rpcs_sum_soft_limit * 100 / self.min_sum_soft_limit
);
break;
@ -448,8 +439,6 @@ impl Web3Connections {
.number
.expect("head blocks always have numbers");
debug_assert_ne!(consensus_head_num, U64::zero());
let num_consensus_rpcs = conns.len();
let consensus_head_block_id = BlockId {

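The hunks above make `process_block_from_rpc` callable from the new test (`pub(crate)`) and swap `min_synced_rpcs` for the renamed `min_head_rpcs` in the consensus check. As a rough illustration of what that check is guarding (invented helper and variable names, not the upstream implementation): a head hash should only be promoted once enough rpcs, with enough combined soft limit, agree on it.

```rust
use std::collections::HashMap;

use ethers::types::H256;

/// Illustrative sketch only: tally how many rpcs report each head hash and
/// return one that clears both thresholds. The real `process_block_from_rpc`
/// prefers the highest such block and walks parent blocks when the newest
/// head does not have enough support yet.
fn find_consensus_head(
    connection_heads: &HashMap<String, H256>,
    soft_limits: &HashMap<String, u32>,
    min_head_rpcs: usize,
    min_sum_soft_limit: u32,
) -> Option<H256> {
    let mut tallies: HashMap<H256, (usize, u32)> = HashMap::new();

    for (rpc_name, head_hash) in connection_heads {
        let soft_limit = soft_limits.get(rpc_name).copied().unwrap_or(0);
        let tally = tallies.entry(*head_hash).or_insert((0, 0));
        tally.0 += 1;
        tally.1 += soft_limit;
    }

    tallies
        .into_iter()
        .find(|(_, (num_rpcs, sum_soft_limit))| {
            *num_rpcs >= min_head_rpcs && *sum_soft_limit >= min_sum_soft_limit
        })
        .map(|(head_hash, _)| head_hash)
}
```
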
@ -32,9 +32,9 @@ pub struct Web3Connection {
pub name: String,
pub display_name: Option<String>,
/// TODO: can we get this from the provider? do we even need it?
url: String,
pub(super) url: String,
/// Some connections use an http_client. we keep a clone for reconnecting
http_client: Option<reqwest::Client>,
pub(super) http_client: Option<reqwest::Client>,
/// keep track of currently open requests. We sort on this
pub(super) active_requests: AtomicU32,
/// keep track of total requests
@ -46,11 +46,11 @@ pub struct Web3Connection {
pub(super) provider: AsyncRwLock<Option<Arc<Web3Provider>>>,
/// rate limits are stored in a central redis so that multiple proxies can share their rate limits
/// We do not use the deferred rate limiter because going over limits would cause errors
hard_limit: Option<RedisRateLimiter>,
pub(super) hard_limit: Option<RedisRateLimiter>,
/// used for load balancing to the least loaded server
pub(super) soft_limit: u32,
/// TODO: have an enum for this so that "no limit" prints pretty?
block_data_limit: AtomicU64,
pub(super) block_data_limit: AtomicU64,
/// Lower weight are higher priority when sending requests. 0 to 99.
pub(super) weight: f64,
/// TODO: should this be an AsyncRwLock?
@ -350,6 +350,7 @@ impl Web3Connection {
return Ok(());
}
Web3Provider::Ws(_) => {}
Web3Provider::Mock => return Ok(()),
}
info!("Reconnecting to {}", self);
@ -571,6 +572,7 @@ impl Web3Connection {
// TODO: is a RwLock of an Option<Arc> the right thing here?
if let Some(provider) = self.provider.read().await.clone() {
match &*provider {
Web3Provider::Mock => unimplemented!(),
Web3Provider::Http(_provider) => {
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
// TODO: try watch_blocks and fall back to this?
@ -745,6 +747,7 @@ impl Web3Connection {
// TODO: is a RwLock of an Option<Arc> the right thing here?
if let Some(provider) = self.provider.read().await.clone() {
match &*provider {
Web3Provider::Mock => unimplemented!(),
Web3Provider::Http(provider) => {
// there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints
// TODO: what should this interval be? probably automatically set to some fraction of block time

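The `Web3Connection` hunks widen several fields to `pub(super)` and add `Web3Provider::Mock` arms so the new test can build connections without dialing a real node. That test leans on `has_block_data`, which is not part of this diff; below is a simplified sketch of what the check has to do, given the `head_block_id` and `block_data_limit` fields (the real method may handle pruning limits and archive nodes differently).

```rust
use std::sync::atomic::Ordering;

use ethers::types::U64;

impl Web3Connection {
    /// Simplified sketch: can this rpc serve data at `needed_block_num`?
    pub fn has_block_data(&self, needed_block_num: &U64) -> bool {
        // no head block yet means the node is still connecting or syncing
        let head_block_num = match self.head_block_id.read().as_ref() {
            None => return false,
            Some(head_block_id) => head_block_id.num,
        };

        // a node can never serve blocks newer than its own head
        if *needed_block_num > head_block_num {
            return false;
        }

        // pruned nodes only keep `block_data_limit` blocks behind their head
        let block_data_limit: U64 = self.block_data_limit.load(Ordering::Acquire).into();
        let oldest_available = head_block_num.saturating_sub(block_data_limit);

        *needed_block_num >= oldest_available
    }
}
```
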
@ -52,7 +52,7 @@ pub struct Web3Connections {
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
/// TODO: what should we use for edges?
pub(super) blockchain_graphmap: AsyncRwLock<DiGraphMap<H256, u32>>,
pub(super) min_synced_rpcs: usize,
pub(super) min_head_rpcs: usize,
pub(super) min_sum_soft_limit: u32,
}
@ -68,7 +68,7 @@ impl Web3Connections {
block_map: BlockHashesCache,
head_block_sender: Option<watch::Sender<ArcBlock>>,
min_sum_soft_limit: u32,
min_synced_rpcs: usize,
min_head_rpcs: usize,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
pending_transactions: Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
@ -180,11 +180,11 @@ impl Web3Connections {
}
// TODO: now this errors for private rpcs when we disable all!
if connections.len() < min_synced_rpcs {
if connections.len() < min_head_rpcs {
return Err(anyhow::anyhow!(
"Only {}/{} connections! Add more connections or reduce min_synced_rpcs.",
"Only {}/{} connections! Add more connections or reduce min_head_rpcs.",
connections.len(),
min_synced_rpcs
min_head_rpcs
));
}
@ -212,7 +212,7 @@ impl Web3Connections {
block_numbers,
blockchain_graphmap: Default::default(),
min_sum_soft_limit,
min_synced_rpcs,
min_head_rpcs,
});
let authorization = Arc::new(Authorization::local(db_conn.clone())?);
@ -382,7 +382,7 @@ impl Web3Connections {
}
/// get the best available rpc server
pub async fn next_upstream_server(
pub async fn best_synced_backend_connection(
&self,
authorization: &Arc<Authorization>,
request_metadata: Option<&Arc<RequestMetadata>>,
@ -394,20 +394,23 @@ impl Web3Connections {
let min_block_needed = if let Some(min_block_needed) = min_block_needed {
*min_block_needed
} else {
self.head_block_num().context("not servers are synced")?
// TODO: error or OpenRequestResult::NotSynced? and then we turn that into a 502?
self.head_block_num().context("no servers are synced")?
};
// filter the synced rpcs
// TODO: we are going to be checking "has_block_data" a lot now
let synced_rpcs: Vec<Arc<Web3Connection>> = self
let head_rpcs: Vec<Arc<Web3Connection>> = self
.synced_connections
.load()
.conns
.values()
.iter()
.filter(|x| !skip.contains(x))
.filter(|x| x.has_block_data(&min_block_needed))
.cloned()
.collect();
if synced_rpcs.is_empty() {
if head_rpcs.is_empty() {
// TODO: what should happen here? automatic retry?
// TODO: more detailed error
return Err(anyhow::anyhow!("no servers are synced"));
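
Because the rendered diff interleaves old and new lines, the fix named in the commit title is easy to miss here: the old `next_upstream_server` filtered over every configured connection, while the renamed `best_synced_backend_connection` filters over the `synced_connections` snapshot. The new filtering step, written out contiguously (reconstructed from the hunk above; the exact iterator chain may differ):

```rust
// Reconstructed from the hunk above, not a verbatim copy.
let head_rpcs: Vec<Arc<Web3Connection>> = self
    .synced_connections
    .load()
    .conns
    .iter()
    .filter(|x| !skip.contains(x))
    .filter(|x| x.has_block_data(&min_block_needed))
    .cloned()
    .collect();

if head_rpcs.is_empty() {
    // TODO: what should happen here? automatic retry?
    // TODO: more detailed error
    return Err(anyhow::anyhow!("no servers are synced"));
}
```
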
@ -416,7 +419,7 @@ impl Web3Connections {
let mut minimum = 0.0;
// we sort on a bunch of values. cache them here so that we don't do this math multiple times.
let weight_map: HashMap<_, f64> = synced_rpcs
let weight_map: HashMap<_, f64> = head_rpcs
.iter()
.map(|rpc| {
// TODO: put this on the rpc object instead?
@ -457,8 +460,8 @@ impl Web3Connections {
let sorted_rpcs = {
let mut rng = thread_fast_rng::thread_fast_rng();
synced_rpcs
.choose_multiple_weighted(&mut rng, synced_rpcs.len(), |rpc| {
head_rpcs
.choose_multiple_weighted(&mut rng, head_rpcs.len(), |rpc| {
*weight_map
.get(rpc)
.expect("rpc should always be in the weight map")
@ -512,7 +515,7 @@ impl Web3Connections {
/// get all rpc servers that are not rate limited
/// returns servers even if they aren't in sync. This is useful for broadcasting signed transactions
// TODO: better type on this that can return an anyhow::Result
pub async fn upstream_servers(
pub async fn all_backend_connections(
&self,
authorization: &Arc<Authorization>,
block_needed: Option<&U64>,
@ -572,7 +575,7 @@ impl Web3Connections {
break;
}
match self
.next_upstream_server(
.best_synced_backend_connection(
authorization,
request_metadata,
&skip_rpcs,
@ -705,7 +708,10 @@ impl Web3Connections {
block_needed: Option<&U64>,
) -> anyhow::Result<JsonRpcForwardedResponse> {
loop {
match self.upstream_servers(authorization, block_needed).await {
match self
.all_backend_connections(authorization, block_needed)
.await
{
Ok(active_request_handles) => {
// TODO: benchmark this compared to waiting on unbounded futures
// TODO: do something with this handle?
@ -800,3 +806,225 @@ impl Serialize for Web3Connections {
state.end()
}
}
mod tests {
use super::*;
use crate::rpcs::{blockchain::BlockId, provider::Web3Provider};
use ethers::types::Block;
use log::LevelFilter;
use parking_lot::RwLock;
#[tokio::test]
async fn test_server_selection() {
// TODO: do this better. can test_env_logger and tokio test be stacked?
let _ = env_logger::builder()
.filter_level(LevelFilter::Error)
.filter_module("web3_proxy", LevelFilter::Trace)
.is_test(true)
.try_init();
let lagged_block = Block {
hash: Some(H256::random()),
number: Some(0.into()),
..Default::default()
};
let head_block = Block {
hash: Some(H256::random()),
number: Some(1.into()),
parent_hash: lagged_block.hash.unwrap(),
..Default::default()
};
// TODO: write a impl From for Block -> BlockId?
let lagged_block_id = BlockId {
hash: lagged_block.hash.unwrap(),
num: lagged_block.number.unwrap(),
};
let head_block_id = BlockId {
hash: head_block.hash.unwrap(),
num: head_block.number.unwrap(),
};
let lagged_block = Arc::new(lagged_block);
let head_block = Arc::new(head_block);
let block_data_limit = u64::MAX;
let head_rpc = Web3Connection {
name: "synced".to_string(),
display_name: None,
url: "ws://example.com/synced".to_string(),
http_client: None,
active_requests: 0.into(),
total_requests: 0.into(),
provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
hard_limit: None,
soft_limit: 1_000,
block_data_limit: block_data_limit.into(),
weight: 100.0,
head_block_id: RwLock::new(Some(head_block_id)),
open_request_handle_metrics: Arc::new(Default::default()),
};
let lagged_rpc = Web3Connection {
name: "lagged".to_string(),
display_name: None,
url: "ws://example.com/lagged".to_string(),
http_client: None,
active_requests: 0.into(),
total_requests: 0.into(),
provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
hard_limit: None,
soft_limit: 1_000,
block_data_limit: block_data_limit.into(),
weight: 100.0,
head_block_id: RwLock::new(Some(lagged_block_id)),
open_request_handle_metrics: Arc::new(Default::default()),
};
assert!(head_rpc.has_block_data(&lagged_block.number.unwrap()));
assert!(head_rpc.has_block_data(&head_block.number.unwrap()));
assert!(lagged_rpc.has_block_data(&lagged_block.number.unwrap()));
assert!(!lagged_rpc.has_block_data(&head_block.number.unwrap()));
let head_rpc = Arc::new(head_rpc);
let lagged_rpc = Arc::new(lagged_rpc);
let conns = HashMap::from([
(head_rpc.name.clone(), head_rpc.clone()),
(lagged_rpc.name.clone(), lagged_rpc.clone()),
]);
let conns = Web3Connections {
conns,
synced_connections: Default::default(),
pending_transactions: Cache::builder()
.max_capacity(10_000)
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
block_hashes: Cache::builder()
.max_capacity(10_000)
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
block_numbers: Cache::builder()
.max_capacity(10_000)
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
blockchain_graphmap: Default::default(),
min_head_rpcs: 1,
min_sum_soft_limit: 1,
};
let authorization = Arc::new(Authorization::local(None).unwrap());
let (head_block_sender, _head_block_receiver) =
watch::channel::<ArcBlock>(Default::default());
let mut connection_heads = HashMap::new();
// process None so that
conns
.process_block_from_rpc(
&authorization,
&mut connection_heads,
None,
lagged_rpc.clone(),
&head_block_sender,
&None,
)
.await
.unwrap();
conns
.process_block_from_rpc(
&authorization,
&mut connection_heads,
None,
head_rpc.clone(),
&head_block_sender,
&None,
)
.await
.unwrap();
// no head block because the rpcs haven't communicated through their channels
assert!(conns.head_block_hash().is_none());
// all_backend_connections gives everything regardless of sync status
assert_eq!(
conns
.all_backend_connections(&authorization, None)
.await
.unwrap()
.len(),
2
);
// best_synced_backend_connection requires servers to be synced with the head block
// TODO: should this be an error, or a OpenRequestResult::NotSynced?
assert!(conns
.best_synced_backend_connection(&authorization, None, &[], lagged_block.number.as_ref())
.await
.is_err());
// add lagged blocks to the conns. both servers should be allowed
conns.save_block(&lagged_block, true).await.unwrap();
conns
.process_block_from_rpc(
&authorization,
&mut connection_heads,
Some(lagged_block.clone()),
lagged_rpc,
&head_block_sender,
&None,
)
.await
.unwrap();
conns
.process_block_from_rpc(
&authorization,
&mut connection_heads,
Some(lagged_block.clone()),
head_rpc.clone(),
&head_block_sender,
&None,
)
.await
.unwrap();
assert_eq!(conns.num_synced_rpcs(), 2);
// add head block to the conns. lagged_rpc should not be available
conns.save_block(&head_block, true).await.unwrap();
conns
.process_block_from_rpc(
&authorization,
&mut connection_heads,
Some(head_block.clone()),
head_rpc,
&head_block_sender,
&None,
)
.await
.unwrap();
assert_eq!(conns.num_synced_rpcs(), 1);
// TODO: is_ok is too simple. make sure its head_rpc
assert!(conns
.best_synced_backend_connection(&authorization, None, &[], None)
.await
.is_ok());
assert!(conns
.best_synced_backend_connection(&authorization, None, &[], Some(&0.into()))
.await
.is_ok());
assert!(conns
.best_synced_backend_connection(&authorization, None, &[], Some(&1.into()))
.await
.is_ok());
assert!(conns
.best_synced_backend_connection(&authorization, None, &[], Some(&2.into()))
.await
.is_err());
}
}
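
One small follow-up the new test hints at is its `// TODO: write a impl From for Block -> BlockId?` comment. A hypothetical version, not part of this commit, could lean on the same "head blocks always have numbers" assumption the consensus code already makes:

```rust
use ethers::types::{Block, TxHash};

// Hypothetical, not in this commit: convert a full block into the lightweight
// BlockId used for head tracking. Panics on pending blocks, which have no
// hash or number yet.
impl From<&Block<TxHash>> for BlockId {
    fn from(block: &Block<TxHash>) -> Self {
        Self {
            hash: block.hash.expect("saved blocks always have hashes"),
            num: block.number.expect("saved blocks always have numbers"),
        }
    }
}
```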

@ -8,12 +8,15 @@ use std::time::Duration;
pub enum Web3Provider {
Http(ethers::providers::Provider<ethers::providers::Http>),
Ws(ethers::providers::Provider<ethers::providers::Ws>),
// TODO: only include this for tests.
Mock,
}
impl Web3Provider {
pub fn ready(&self) -> bool {
// TODO: i'm not sure if this is enough
match self {
Self::Mock => true,
Self::Http(_) => true,
Self::Ws(provider) => provider.as_ref().ready(),
}

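`Web3Provider` gains the `Mock` variant that the test plugs into its fake connections. Because `Mock` reports `ready()` immediately, anything that polls readiness keeps working in tests; a tiny usage sketch (the `wait_until_ready` helper is invented for illustration):

```rust
use std::time::Duration;

// Invented helper, not from the commit: poll a provider until it is usable.
// With Web3Provider::Mock this returns on the first check.
async fn wait_until_ready(provider: &Web3Provider) {
    while !provider.ready() {
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}
```
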
@ -200,6 +200,7 @@ impl OpenRequestHandle {
// TODO: really sucks that we have to clone here
let response = match provider {
Web3Provider::Mock => unimplemented!(),
Web3Provider::Http(provider) => provider.request(method, params).await,
Web3Provider::Ws(provider) => provider.request(method, params).await,
};
@ -242,6 +243,7 @@ impl OpenRequestHandle {
let is_revert = if let ProviderError::JsonRpcClientError(err) = err {
// Http and Ws errors are very similar, but different types
let msg = match provider {
Web3Provider::Mock => unimplemented!(),
Web3Provider::Http(_) => {
if let Some(HttpClientError::JsonRpcError(err)) =
err.downcast_ref::<HttpClientError>()