put rpcs in a rwlock again

This commit is contained in:
Bryan Stitt 2023-07-10 18:09:58 -07:00
parent 670b64de31
commit 31f772591b
4 changed files with 42 additions and 57 deletions

View File

@ -72,6 +72,7 @@ RUN --mount=type=cache,target=/root/.cargo/git \
--mount=type=cache,target=/app/target \
set -eux; \
\
[ -e "$(pwd)/payment-contracts/src/contracts/mod.rs" ] || touch "$(pwd)/payment-contracts/build.rs"; \
cargo --locked --verbose fetch
# build tests (done its in own FROM so that it can run in parallel)
@ -86,6 +87,7 @@ RUN --mount=type=cache,target=/root/.cargo/git \
--mount=type=cache,target=/app/target \
set -eux; \
\
[ -e "$(pwd)/payment-contracts/src/contracts/mod.rs" ] || touch "$(pwd)/payment-contracts/build.rs"; \
RUST_LOG=web3_proxy=trace,info \
cargo \
--frozen \
@ -106,6 +108,7 @@ RUN --mount=type=cache,target=/root/.cargo/git \
--mount=type=cache,target=/app/target \
set -eux; \
\
[ -e "$(pwd)/payment-contracts/src/contracts/mod.rs" ] || touch "$(pwd)/payment-contracts/build.rs"; \
cargo install \
--features "$WEB3_PROXY_FEATURES" \
--frozen \

View File

@ -654,6 +654,7 @@ impl Web3ProxyApp {
pub async fn apply_top_config(&self, new_top_config: TopConfig) -> Web3ProxyResult<()> {
// TODO: also update self.config from new_top_config.app
info!("applying new config");
// connect to the backends
self.balanced_rpcs
@ -685,6 +686,8 @@ impl Web3ProxyApp {
}
}
info!("config applied successfully");
Ok(())
}

View File

@ -1,5 +1,4 @@
use crate::errors::{Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::users::referral;
use entities::{
admin_increase_balance_receipt, increase_on_chain_balance_receipt, referee, referrer,
rpc_accounting_v2, rpc_key, stripe_increase_balance_receipt,
@ -147,7 +146,7 @@ impl Balance {
.into_tuple()
.one(db_conn)
.await
.web3_context("fetching rpc_accounting_v2")?
.web3_context("fetching total_spent_paid_credits and total_spent")?
.unwrap_or_default();
let one_time_referee_bonus = referee::Entity::find()
@ -160,7 +159,7 @@ impl Balance {
.into_tuple()
.one(db_conn)
.await
.web3_context("fetching referee")?
.web3_context("fetching one time referee bonus")?
.unwrap_or_default();
let referal_bonus = referee::Entity::find()
@ -177,7 +176,7 @@ impl Balance {
.into_tuple()
.one(db_conn)
.await
.web3_context("fetching referee and referral_codes")?
.web3_context("fetching referal bonus")?
.unwrap_or_default();
let balance = Self {

View File

@ -20,7 +20,8 @@ use futures::StreamExt;
use hashbrown::HashMap;
use itertools::Itertools;
use migration::sea_orm::DatabaseConnection;
use moka::future::{Cache, CacheBuilder, ConcurrentCacheExt};
use moka::future::{Cache, CacheBuilder};
use parking_lot::RwLock;
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use serde_json::json;
@ -44,7 +45,7 @@ pub struct Web3Rpcs {
pub(crate) block_sender: flume::Sender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>,
/// any requests will be forwarded to one (or more) of these connections
/// TODO: hopefully this not being an async lock will be okay. if you need it across awaits, clone the arc
pub(crate) by_name: Cache<String, Arc<Web3Rpc>>,
pub(crate) by_name: RwLock<HashMap<String, Arc<Web3Rpc>>>,
/// all providers with the same consensus head block. won't update if there is no `self.watch_consensus_head_sender`
/// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
/// TODO: why is watch_consensus_head_sender in an Option, but this one isn't?
@ -113,7 +114,7 @@ impl Web3Rpcs {
watch::channel(Default::default());
// by_name starts empty. self.apply_server_configs will add to it
let by_name = Cache::builder().build();
let by_name = RwLock::new(HashMap::new());
let max_head_block_lag = max_head_block_lag.unwrap_or(5.into());
@ -225,18 +226,20 @@ impl Web3Rpcs {
while let Some(x) = spawn_handles.next().await {
match x {
Ok(Ok((rpc, _handle))) => {
Ok(Ok((new_rpc, _handle))) => {
// web3 connection worked
let old_rpc = self.by_name.read().get(&new_rpc.name).map(Arc::clone);
// clean up the old rpc if it exists
if let Some(old_rpc) = self.by_name.get(&rpc.name) {
if let Some(old_rpc) = old_rpc {
trace!("old_rpc: {}", old_rpc);
// if the old rpc was synced, wait for the new one to sync
if old_rpc.head_block.as_ref().unwrap().borrow().is_some() {
let mut new_head_receiver =
rpc.head_block.as_ref().unwrap().subscribe();
trace!("waiting for new {} connection to sync", rpc);
new_rpc.head_block.as_ref().unwrap().subscribe();
trace!("waiting for new {} connection to sync", new_rpc);
// TODO: maximum wait time
while new_head_receiver.borrow_and_update().is_none() {
@ -248,8 +251,7 @@ impl Web3Rpcs {
// new rpc is synced (or old one was not synced). update the local map
// make sure that any new requests use the new connection
self.by_name.insert(rpc.name.clone(), rpc).await;
self.by_name.sync();
self.by_name.write().insert(new_rpc.name.clone(), new_rpc);
// tell the old rpc to disconnect
if let Some(ref disconnect_sender) = old_rpc.disconnect_watch {
@ -257,8 +259,7 @@ impl Web3Rpcs {
disconnect_sender.send_replace(true);
}
} else {
self.by_name.insert(rpc.name.clone(), rpc).await;
self.by_name.sync();
self.by_name.write().insert(new_rpc.name.clone(), new_rpc);
}
}
Ok(Err(err)) => {
@ -287,17 +288,15 @@ impl Web3Rpcs {
}
pub fn get(&self, conn_name: &str) -> Option<Arc<Web3Rpc>> {
self.by_name.get(conn_name)
self.by_name.read().get(conn_name).cloned()
}
pub fn len(&self) -> usize {
// // TODO: this seems not great. investigate better ways to do this
// self.by_name.sync();
self.by_name.entry_count() as usize
self.by_name.read().len()
}
pub fn is_empty(&self) -> bool {
self.len() > 0
self.by_name.read().is_empty()
}
/// TODO: rename to be consistent between "head" and "synced"
@ -652,7 +651,7 @@ impl Web3Rpcs {
let mut earliest_retry_at = None;
// TODO: filter the rpcs with Ranked.will_work_now
let mut all_rpcs: Vec<_> = self.by_name.iter().map(|x| x.1).collect();
let mut all_rpcs: Vec<_> = self.by_name.read().values().cloned().collect();
let mut max_count = if let Some(max_count) = max_count {
max_count
@ -1291,8 +1290,6 @@ impl Display for Web3Rpcs {
impl fmt::Debug for Web3Rpcs {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
self.by_name.sync();
let consensus_rpcs = self.watch_ranked_rpcs.borrow().is_some();
f.debug_struct("Web3Rpcs")
@ -1310,10 +1307,8 @@ impl Serialize for Web3Rpcs {
let mut state = serializer.serialize_struct("Web3Rpcs", 6)?;
{
// TODO: i didn't think we'd need this, but it shouldn't hurt
self.by_name.sync();
let rpcs: Vec<Arc<Web3Rpc>> = self.by_name.iter().map(|x| x.1).collect();
let by_name = self.by_name.read();
let rpcs: Vec<&Arc<Web3Rpc>> = by_name.values().collect();
// TODO: coordinate with frontend team to rename "conns" to "rpcs"
state.serialize_field("conns", &rpcs)?;
}
@ -1528,19 +1523,14 @@ mod tests {
let chain_id = 1;
let by_name = Cache::builder().build();
by_name
.insert(head_rpc.name.clone(), head_rpc.clone())
.await;
by_name
.insert(lagged_rpc.name.clone(), lagged_rpc.clone())
.await;
by_name.sync();
let mut by_name = HashMap::new();
by_name.insert(head_rpc.name.clone(), head_rpc.clone());
by_name.insert(lagged_rpc.name.clone(), lagged_rpc.clone());
// TODO: make a Web3Rpcs::new
let rpcs = Web3Rpcs {
block_sender: block_sender.clone(),
by_name,
by_name: RwLock::new(by_name),
chain_id,
name: "test".to_string(),
watch_head_block: Some(watch_consensus_head_sender),
@ -1803,18 +1793,13 @@ mod tests {
let chain_id = 1;
let by_name = Cache::builder().build();
by_name
.insert(pruned_rpc.name.clone(), pruned_rpc.clone())
.await;
by_name
.insert(archive_rpc.name.clone(), archive_rpc.clone())
.await;
by_name.sync();
let mut by_name = HashMap::new();
by_name.insert(pruned_rpc.name.clone(), pruned_rpc.clone());
by_name.insert(archive_rpc.name.clone(), archive_rpc.clone());
let rpcs = Web3Rpcs {
block_sender,
by_name,
by_name: RwLock::new(by_name),
chain_id,
name: "test".to_string(),
watch_head_block: Some(watch_consensus_head_sender),
@ -1992,22 +1977,17 @@ mod tests {
let chain_id = 1;
let by_name = Cache::builder().build();
by_name
.insert(mock_geth.name.clone(), mock_geth.clone())
.await;
by_name
.insert(
mock_erigon_archive.name.clone(),
mock_erigon_archive.clone(),
)
.await;
by_name.sync();
let mut by_name = HashMap::new();
by_name.insert(mock_geth.name.clone(), mock_geth.clone());
by_name.insert(
mock_erigon_archive.name.clone(),
mock_erigon_archive.clone(),
);
// TODO: make a Web3Rpcs::new
let rpcs = Web3Rpcs {
block_sender,
by_name,
by_name: RwLock::new(by_name),
chain_id,
name: "test".to_string(),
watch_head_block: Some(watch_consensus_head_sender),