less function args. more owned data

This commit is contained in:
Bryan Stitt 2023-11-01 22:31:00 -07:00
parent 723199076c
commit be68a9babb
2 changed files with 67 additions and 99 deletions

@ -40,8 +40,8 @@ pub struct Web3Rpcs {
/// TODO: hopefully this not being an async lock will be okay. if you need it across awaits, clone the arc
pub(crate) by_name: RwLock<HashMap<String, Arc<Web3Rpc>>>,
/// all providers with the same consensus head block. won't update if there is no `self.watch_head_block`
/// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
/// TODO: why is watch_head_block in an Option, but this one isn't?
/// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
/// Geth's subscriptions have the same potential for skipping blocks.
pub(crate) watch_ranked_rpcs: watch::Sender<Option<Arc<RankedRpcs>>>,
/// this head receiver makes it easy to wait until there is a new block

@ -46,7 +46,13 @@ pub struct Web3Rpc {
pub block_interval: Duration,
pub display_name: Option<String>,
pub db_conn: Option<DatabaseConnection>,
pub subscribe_txs: bool,
/// Track in-flight requests
pub(super) active_requests: AtomicUsize,
/// mapping of block numbers and hashes
pub(super) block_map: Option<BlocksByHashCache>,
/// created_at is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) created_at: Option<Instant>,
/// most all requests prefer use the http_provider
pub(super) http_client: Option<reqwest::Client>,
pub(super) http_url: Option<Url>,
@ -66,6 +72,8 @@ pub struct Web3Rpc {
pub(super) automatic_block_limit: bool,
/// only use this rpc if everything else is lagging too far. this allows us to ignore fast but very low limit rpcs
pub backup: bool,
/// if subscribed to new heads, blocks are sent through this channel to update a parent Web3Rpcs
pub(super) block_and_rpc_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>,
/// TODO: have an enum for this so that "no limit" prints pretty?
pub(super) block_data_limit: AtomicU64,
/// head_block is only inside an Option so that the "Default" derive works. it will always be set.
@ -73,10 +81,12 @@ pub struct Web3Rpc {
/// Track head block latency.
/// TODO: This is in a sync lock, but writes are infrequent and quick. Is this actually okay? Set from a spawned task and read an atomic instead?
pub(super) head_delay: RwLock<EwmaLatency>,
/// false if a health check has failed
pub(super) healthy: AtomicBool,
/// Track peak request latency
/// peak_latency is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) peak_latency: Option<PeakEwmaLatency>,
/// Automatically set priority
/// Automatically set priority based on request latency and active requests
pub(super) tier: AtomicU32,
/// Track total internal requests served
pub(super) internal_requests: AtomicUsize,
@ -87,15 +97,11 @@ pub struct Web3Rpc {
/// Track time used by external requests served
/// request_ms_histogram is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) median_latency: Option<RollingQuantileLatency>,
/// Track in-flight requests
pub(super) active_requests: AtomicUsize,
/// disconnect_watch is only inside an Option so that the "Default" derive works. it will always be set.
/// todo!(qthis gets cloned a TON. probably too much. something seems wrong)
pub(super) disconnect_watch: Option<watch::Sender<bool>>,
/// created_at is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) created_at: Option<Instant>,
/// false if a health check has failed
pub(super) healthy: AtomicBool,
/// if subscribed to pending transactions, transactions are sent through this channel to update a parent Web3App
pub(super) pending_txid_firehose: Option<Arc<DedupedBroadcaster<TxHash>>>,
}
impl Web3Rpc {
@ -200,11 +206,20 @@ impl Web3Rpc {
// TODO: start optimistically?
let healthy = false.into();
let pending_txid_firehose = if config.subscribe_txs {
// TODO: error if subscribe_txs but not pending_txid_firehose
pending_txid_firehose
} else {
None
};
let new_rpc = Self {
automatic_block_limit,
backup,
block_data_limit,
block_interval,
block_map: Some(block_map),
chain_id,
created_at: Some(created_at),
display_name: config.display_name,
hard_limit,
@ -217,7 +232,8 @@ impl Web3Rpc {
peak_latency: Some(peak_latency),
median_latency: Some(median_request_latency),
soft_limit: config.soft_limit,
subscribe_txs: config.subscribe_txs,
pending_txid_firehose,
block_and_rpc_sender,
ws_url,
disconnect_watch: Some(disconnect_watch),
healthy,
@ -230,16 +246,7 @@ impl Web3Rpc {
// subscribing starts the connection (with retries)
let handle = {
let new_connection = new_connection.clone();
tokio::spawn(async move {
new_connection
.subscribe_with_reconnect(
block_map,
block_and_rpc_sender,
pending_txid_firehose,
chain_id,
)
.await
})
tokio::spawn(async move { new_connection.subscribe_with_reconnect().await })
};
Ok((new_connection, handle))
@ -492,10 +499,11 @@ impl Web3Rpc {
/// query the web3 provider to confirm it is on the expected chain with the expected data available
/// TODO: this currently checks only the http if both http and ws are set. it should check both and make sure they match
async fn check_provider(self: &Arc<Self>, chain_id: u64) -> Web3ProxyResult<()> {
async fn check_provider(self: &Arc<Self>) -> Web3ProxyResult<()> {
// TODO: different handlers for backup vs primary
let error_handler = Some(Level::TRACE.into());
// TODO: make this configurable. voltaire bundler uses web3_bundlerVersion
match self
.internal_request::<_, String>(
"web3_clientVersion".into(),
@ -536,10 +544,10 @@ impl Web3Rpc {
trace!("found_chain_id: {:#?}", found_chain_id);
if chain_id != found_chain_id.as_u64() {
if self.chain_id != found_chain_id.as_u64() {
return Err(anyhow::anyhow!(
"incorrect chain id! Config has {}, but RPC has {}",
chain_id,
self.chain_id,
found_chain_id
)
.context(format!("failed @ {}", self))
@ -559,8 +567,6 @@ impl Web3Rpc {
pub(crate) async fn send_head_block_result(
self: &Arc<Self>,
new_head_block: Web3ProxyResult<Option<ArcBlock>>,
block_and_rpc_sender: &mpsc::UnboundedSender<BlockAndRpc>,
block_map: &BlocksByHashCache,
) -> Web3ProxyResult<()> {
let head_block_sender = self.head_block_sender.as_ref().unwrap();
@ -590,13 +596,17 @@ impl Web3Rpc {
let new_hash = *new_head_block.hash();
// if we already have this block saved, set new_head_block to that arc. otherwise store this copy
let new_head_block = block_map
.get_with_by_ref(&new_hash, async move { new_head_block })
let new_head_block = self
.block_map
.as_ref()
.unwrap()
.get_with(new_hash, async move { new_head_block })
.await;
// we are synced! yey!
head_block_sender.send_replace(Some(new_head_block.clone()));
// TODO: checking this every time seems excessive
if self.block_data_limit() == U64::zero() {
if let Err(err) = self.check_block_data_limit().await {
warn!(
@ -623,9 +633,11 @@ impl Web3Rpc {
};
// tell web3rpcs about this rpc having this block
block_and_rpc_sender
.send((new_head_block, self.clone()))
.context("block_and_rpc_sender failed sending")?;
if let Some(block_and_rpc_sender) = &self.block_and_rpc_sender {
block_and_rpc_sender
.send((new_head_block, self.clone()))
.context("block_and_rpc_sender failed sending")?;
}
Ok(())
}
@ -691,24 +703,9 @@ impl Web3Rpc {
}
/// TODO: this needs to be a subscribe_with_reconnect that does a retry with jitter and exponential backoff
async fn subscribe_with_reconnect(
self: Arc<Self>,
block_map: BlocksByHashCache,
block_and_rpc_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>,
pending_txid_firehose: Option<Arc<DedupedBroadcaster<TxHash>>>,
chain_id: u64,
) -> Web3ProxyResult<()> {
async fn subscribe_with_reconnect(self: Arc<Self>) -> Web3ProxyResult<()> {
loop {
if let Err(err) = self
.clone()
.subscribe(
block_map.clone(),
block_and_rpc_sender.clone(),
pending_txid_firehose.clone(),
chain_id,
)
.await
{
if let Err(err) = self.clone().subscribe().await {
if self.should_disconnect() {
break;
}
@ -733,13 +730,7 @@ impl Web3Rpc {
/// subscribe to blocks and transactions
/// This should only exit when the program is exiting.
/// TODO: should more of these args be on self? chain_id for sure
async fn subscribe(
self: Arc<Self>,
block_map: BlocksByHashCache,
block_and_rpc_sender: Option<mpsc::UnboundedSender<BlockAndRpc>>,
pending_txid_firehose: Option<Arc<DedupedBroadcaster<TxHash>>>,
chain_id: u64,
) -> Web3ProxyResult<()> {
async fn subscribe(self: Arc<Self>) -> Web3ProxyResult<()> {
let error_handler = if self.backup {
Some(RequestErrorHandler::DebugLevel)
} else {
@ -768,7 +759,7 @@ impl Web3Rpc {
trace!("starting subscriptions on {}", self);
if let Err(err) = self
.check_provider(chain_id)
.check_provider()
.await
.web3_context("failed check_provider")
{
@ -780,7 +771,7 @@ impl Web3Rpc {
let mut abort_handles = vec![];
// health check that runs if there haven't been any recent requests
let health_handle = if block_and_rpc_sender.is_some() {
let health_handle = if self.block_and_rpc_sender.is_some() {
// TODO: move this into a proper function
let rpc = self.clone();
@ -857,7 +848,7 @@ impl Web3Rpc {
}
// TODO: if this fails too many times, reset the connection
if let Err(err) = rpc.check_provider(chain_id).await {
if let Err(err) = rpc.check_provider().await {
rpc.healthy.store(false, atomic::Ordering::Relaxed);
// TODO: if rate limit error, set "retry_at"
@ -883,15 +874,10 @@ impl Web3Rpc {
futures.push(health_handle);
// subscribe to new heads
if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() {
if self.block_and_rpc_sender.is_some() {
let clone = self.clone();
let block_map = block_map.clone();
let f = async move {
clone
.subscribe_new_heads(block_and_rpc_sender, block_map)
.await
};
let f = async move { clone.subscribe_new_heads().await };
let h = tokio::spawn(f);
let a = h.abort_handle();
@ -901,23 +887,17 @@ impl Web3Rpc {
}
// subscribe to new transactions
if self.subscribe_txs && self.ws_provider.load().is_some() {
if let Some(pending_txid_firehose) = pending_txid_firehose.clone() {
let clone = self.clone();
if self.pending_txid_firehose.is_some() && self.ws_provider.load().is_some() {
let clone = self.clone();
let f = async move {
clone
.subscribe_new_transactions(pending_txid_firehose)
.await
};
let f = async move { clone.subscribe_new_transactions().await };
// TODO: this is waking itself alot
let h = tokio::spawn(f);
let a = h.abort_handle();
// TODO: this is waking itself alot
let h = tokio::spawn(f);
let a = h.abort_handle();
futures.push(h);
abort_handles.push(a);
}
futures.push(h);
abort_handles.push(a);
}
// exit if any of the futures exit
@ -929,10 +909,7 @@ impl Web3Rpc {
debug!(?first_exit, "subscriptions on {} exited", self);
// clear the head block
if let Some(block_and_rpc_sender) = block_and_rpc_sender {
self.send_head_block_result(Ok(None), &block_and_rpc_sender, &block_map)
.await?
};
self.send_head_block_result(Ok(None)).await?;
// stop the other futures
for a in abort_handles {
@ -945,12 +922,11 @@ impl Web3Rpc {
Ok(())
}
async fn subscribe_new_transactions(
self: &Arc<Self>,
pending_txid_firehose: Arc<DedupedBroadcaster<TxHash>>,
) -> Web3ProxyResult<()> {
async fn subscribe_new_transactions(self: &Arc<Self>) -> Web3ProxyResult<()> {
trace!("subscribing to new transactions on {}", self);
let pending_txid_firehose = self.pending_txid_firehose.as_ref().unwrap();
if let Some(ws_provider) = self.ws_provider.load().as_ref() {
// todo: move subscribe_blocks onto the request handle instead of having a seperate wait_for_throttle
self.wait_for_throttle(Instant::now() + Duration::from_secs(5))
@ -973,11 +949,7 @@ impl Web3Rpc {
}
/// Subscribe to new block headers.
async fn subscribe_new_heads(
self: &Arc<Self>,
block_sender: mpsc::UnboundedSender<BlockAndRpc>,
block_map: BlocksByHashCache,
) -> Web3ProxyResult<()> {
async fn subscribe_new_heads(self: &Arc<Self>) -> Web3ProxyResult<()> {
trace!("subscribing to new heads on {}", self);
let error_handler = if self.backup {
@ -1004,14 +976,12 @@ impl Web3Rpc {
)
.await;
self.send_head_block_result(latest_block, &block_sender, &block_map)
.await?;
self.send_head_block_result(latest_block).await?;
while let Some(block) = blocks.next().await {
let block = Ok(Some(Arc::new(block)));
self.send_head_block_result(block, &block_sender, &block_map)
.await?;
self.send_head_block_result(block).await?;
}
} else if self.http_client.is_some() {
// there is a "watch_blocks" function, but a lot of public nodes (including llamanodes) do not support the necessary rpc endpoints
@ -1029,8 +999,7 @@ impl Web3Rpc {
)
.await;
self.send_head_block_result(block_result, &block_sender, &block_map)
.await?;
self.send_head_block_result(block_result).await?;
// TODO: should this select be at the start or end of the loop?
i.tick().await;
@ -1040,8 +1009,7 @@ impl Web3Rpc {
}
// clear the head block. this might not be needed, but it won't hurt
self.send_head_block_result(Ok(None), &block_sender, &block_map)
.await?;
self.send_head_block_result(Ok(None)).await?;
if self.should_disconnect() {
trace!(%self, "new heads subscription exited");