add locks around sleeping

This commit is contained in:
Bryan Stitt 2022-04-25 01:26:23 +00:00
parent 25b61c0467
commit 978deb3582

View File

@ -187,6 +187,9 @@ struct Web3ProxyState {
// TODO: LoudRpcs and BalancedRpcs should probably share a trait or something // TODO: LoudRpcs and BalancedRpcs should probably share a trait or something
balanced_rpc_tiers: Vec<BalancedRpcs>, balanced_rpc_tiers: Vec<BalancedRpcs>,
private_rpcs: LoudRpcs, private_rpcs: LoudRpcs,
/// lock this when all rate limiters are hit
balanced_rpc_ratelimiter_lock: RwLock<()>,
private_rpcs_ratelimiter_lock: RwLock<()>,
} }
impl Web3ProxyState { impl Web3ProxyState {
@ -205,6 +208,8 @@ impl Web3ProxyState {
.map(|servers| BalancedRpcs::new(servers, &clock)) .map(|servers| BalancedRpcs::new(servers, &clock))
.collect(), .collect(),
private_rpcs: LoudRpcs::new(private_rpcs, &clock), private_rpcs: LoudRpcs::new(private_rpcs, &clock),
balanced_rpc_ratelimiter_lock: Default::default(),
private_rpcs_ratelimiter_lock: Default::default(),
} }
} }
@ -220,6 +225,8 @@ impl Web3ProxyState {
{ {
// there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs // there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs
loop { loop {
let read_lock = self.private_rpcs_ratelimiter_lock.read().await;
match self.private_rpcs.get_upstream_servers().await { match self.private_rpcs.get_upstream_servers().await {
Ok(upstream_servers) => { Ok(upstream_servers) => {
if let Ok(result) = self if let Ok(result) = self
@ -230,16 +237,23 @@ impl Web3ProxyState {
} }
} }
Err(not_until) => { Err(not_until) => {
// TODO: some sort of lock here? drop(read_lock);
// TODO: there should probably be a lock on this so that other queries wait
let write_lock = self.balanced_rpc_ratelimiter_lock.write().await;
let deadline = not_until.wait_time_from(self.clock.now()); let deadline = not_until.wait_time_from(self.clock.now());
sleep(deadline).await; sleep(deadline).await;
drop(write_lock);
} }
}; };
} }
} else { } else {
// this is not a private transaction (or no private relays are configured) // this is not a private transaction (or no private relays are configured)
loop { loop {
let read_lock = self.balanced_rpc_ratelimiter_lock.read().await;
// there are multiple tiers. save the earliest not_until (if any). if we don't return, we will sleep until then and then try again
let mut earliest_not_until = None; let mut earliest_not_until = None;
for balanced_rpcs in self.balanced_rpc_tiers.iter() { for balanced_rpcs in self.balanced_rpc_tiers.iter() {
@ -275,11 +289,17 @@ impl Web3ProxyState {
} }
} }
drop(read_lock);
let write_lock = self.balanced_rpc_ratelimiter_lock.write().await;
// TODO: some sort of lock here? // TODO: some sort of lock here?
// we haven't returned an Ok, sleep and try again // we haven't returned an Ok, sleep and try again
// unwrap should be safe since we would have returned if it wasn't set // unwrap should be safe since we would have returned if it wasn't set
let deadline = earliest_not_until.unwrap().wait_time_from(self.clock.now()); let deadline = earliest_not_until.unwrap().wait_time_from(self.clock.now());
sleep(deadline).await; sleep(deadline).await;
drop(write_lock);
} }
} }
} }