use ethers' reconnects instead of our own, which need a lock (wip)

Bryan Stitt 2023-05-22 15:32:15 -07:00
parent c443f76623
commit 91eeee23e2
8 changed files with 191 additions and 453 deletions
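For context, this is the ethers-rs API the commit leans on: the websocket client can retry its own connection, so the proxy no longer needs a lock-guarded provider slot that gets swapped out on reconnect. A minimal sketch of that API (the endpoint URL is hypothetical):

    use ethers::providers::{Provider, Ws};

    // ethers retries the connection internally up to `reconnects` times;
    // connect_ws below passes usize::MAX, i.e. keep retrying.
    let ws = Ws::connect_with_reconnects("wss://rpc.example.com", usize::MAX).await?;
    let provider = Provider::new(ws);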

@@ -287,7 +287,6 @@ impl Web3RpcConfig {
blocks_by_hash_cache: BlocksByHashCache,
block_sender: Option<flume::Sender<BlockAndRpc>>,
tx_id_sender: Option<flume::Sender<TxHashAndRpc>>,
reconnect: bool,
) -> anyhow::Result<(Arc<Web3Rpc>, AnyhowJoinHandle<()>)> {
if !self.extra.is_empty() {
warn!("unknown Web3RpcConfig fields!: {:?}", self.extra.keys());
@@ -304,7 +303,6 @@ impl Web3RpcConfig {
blocks_by_hash_cache,
block_sender,
tx_id_sender,
reconnect,
)
.await
}

@@ -166,7 +166,6 @@ pub async fn user_balance_post(
"eth_getTransactionReceipt",
&vec![format!("0x{}", hex::encode(tx_hash))],
Level::Trace.into(),
None,
)
.await
// TODO: What kind of error would be here
@@ -217,7 +216,7 @@ pub async fn user_balance_post(
]);
debug!("Params are: {:?}", &params);
let accepted_token: String = handle
.request("eth_call", &params, Level::Trace.into(), None)
.request("eth_call", &params, Level::Trace.into())
.await
// TODO: What kind of error would be here
.map_err(|err| Web3ProxyError::Anyhow(err.into()))?;
@@ -267,7 +266,7 @@ pub async fn user_balance_post(
]);
debug!("ERC20 Decimal request params are: {:?}", &params);
let decimals: String = handle
.request("eth_call", &params, Level::Trace.into(), None)
.request("eth_call", &params, Level::Trace.into())
.await
.map_err(|err| Web3ProxyError::Anyhow(err.into()))?;
debug!("Decimals response is: {:?}", decimals);

@@ -219,13 +219,12 @@ impl Web3Rpcs {
// TODO: if error, retry?
let block: Web3ProxyBlock = match rpc {
Some(rpc) => rpc
.wait_for_request_handle(authorization, Some(Duration::from_secs(30)), None)
.wait_for_request_handle(authorization, Some(Duration::from_secs(30)))
.await?
.request::<_, Option<ArcBlock>>(
"eth_getBlockByHash",
&json!(get_block_params),
Level::Error.into(),
None,
)
.await?
.and_then(|x| {

@@ -276,7 +276,6 @@ impl Web3Rpcs {
blocks_by_hash_cache,
block_sender,
pending_tx_id_sender,
true,
));
Some(handle)
@@ -303,9 +302,9 @@ impl Web3Rpcs {
while new_head_receiver.borrow_and_update().is_none() {
new_head_receiver.changed().await?;
}
}
old_rpc.disconnect().await.context("disconnect old rpc")?;
// TODO: tell ethers to disconnect? is there a function for that?
}
}
// TODO: what should we do with the new handle? make sure error logs aren't dropped
@@ -435,7 +434,7 @@ impl Web3Rpcs {
.into_iter()
.map(|active_request_handle| async move {
let result: Result<Box<RawValue>, _> = active_request_handle
.request(method, &json!(&params), error_level.into(), None)
.request(method, &json!(&params), error_level.into())
.await;
result
})
@@ -508,7 +507,7 @@ impl Web3Rpcs {
skip.push(Arc::clone(faster_rpc));
// just because it has lower latency doesn't mean we are sure to get a connection. there might be rate limits
match faster_rpc.try_request_handle(authorization, None).await {
match faster_rpc.try_request_handle(authorization).await {
Ok(OpenRequestResult::Handle(handle)) => {
trace!("opened handle: {}", faster_rpc);
return OpenRequestResult::Handle(handle);
@@ -831,7 +830,7 @@ impl Web3Rpcs {
}
// check rate limits and increment our connection counter
match rpc.try_request_handle(authorization, None).await {
match rpc.try_request_handle(authorization).await {
Ok(OpenRequestResult::RetryAt(retry_at)) => {
// this rpc is not available. skip it
trace!("{} is rate limited. skipping", rpc);
@@ -908,7 +907,6 @@ impl Web3Rpcs {
&request.method,
&json!(request.params),
RequestErrorHandler::Save,
None,
)
.await;
@@ -1307,8 +1305,8 @@ mod tests {
use std::time::{SystemTime, UNIX_EPOCH};
use super::*;
use crate::rpcs::blockchain::Web3ProxyBlock;
use crate::rpcs::consensus::ConsensusFinder;
use crate::rpcs::{blockchain::Web3ProxyBlock, provider::Web3Provider};
use arc_swap::ArcSwap;
use ethers::types::H256;
use ethers::types::{Block, U256};
@@ -1451,7 +1449,6 @@ mod tests {
block_data_limit: block_data_limit.into(),
tier: 0,
head_block: Some(tx_synced),
provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
peak_latency: Some(new_peak_latency()),
..Default::default()
};
@@ -1466,7 +1463,6 @@ mod tests {
block_data_limit: block_data_limit.into(),
tier: 0,
head_block: Some(tx_lagged),
provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
peak_latency: Some(new_peak_latency()),
..Default::default()
};
@@ -1707,7 +1703,6 @@ mod tests {
block_data_limit: 64.into(),
tier: 1,
head_block: Some(tx_pruned),
provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
..Default::default()
};
@@ -1721,7 +1716,6 @@ mod tests {
block_data_limit: u64::MAX.into(),
tier: 2,
head_block: Some(tx_archive),
provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
..Default::default()
};
@@ -1876,7 +1870,6 @@ mod tests {
block_data_limit: 64.into(),
tier: 0,
head_block: Some(tx_mock_geth),
provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
peak_latency: Some(new_peak_latency()),
..Default::default()
};
@@ -1889,7 +1882,6 @@ mod tests {
block_data_limit: u64::MAX.into(),
tier: 1,
head_block: Some(tx_mock_erigon_archive),
provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
peak_latency: Some(new_peak_latency()),
..Default::default()
};

@@ -1,6 +1,6 @@
//! Rate-limited communication with a web3 provider.
use super::blockchain::{ArcBlock, BlocksByHashCache, Web3ProxyBlock};
use super::provider::Web3Provider;
use super::provider::{connect_http, connect_ws, EthersHttpProvider, EthersWsProvider};
use super::request::{OpenRequestHandle, OpenRequestResult};
use crate::app::{flatten_handle, AnyhowJoinHandle};
use crate::config::{BlockAndRpc, Web3RpcConfig};
@@ -40,17 +40,10 @@ pub struct Web3Rpc {
pub name: String,
pub display_name: Option<String>,
pub db_conn: Option<DatabaseConnection>,
pub(super) ws_url: Option<Url>,
pub(super) http_url: Option<Url>,
/// Some connections use an http_client. we keep a clone for reconnecting
pub(super) http_client: Option<reqwest::Client>,
/// provider is in a RwLock so that we can replace it if re-connecting
/// it is an async lock because we hold it open across awaits
/// this provider is only used for new heads subscriptions
/// TODO: benchmark ArcSwapOption and a watch::Sender
/// TODO: only the websocket provider needs to be behind an asyncrwlock!
/// TODO: the http provider is just an http_client
pub(super) provider: AsyncRwLock<Option<Arc<Web3Provider>>>,
/// almost all requests prefer to use the http_provider
pub(super) http_provider: Option<EthersHttpProvider>,
/// the websocket provider is only used for subscriptions
pub(super) ws_provider: Option<EthersWsProvider>,
/// keep track of hard limits
/// this is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) hard_limit_until: Option<watch::Sender<Instant>>,
@@ -79,7 +72,6 @@ pub struct Web3Rpc {
/// TODO: maybe move this to graphana
pub(super) total_requests: AtomicUsize,
pub(super) active_requests: AtomicUsize,
pub(super) reconnect: AtomicBool,
/// this is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) disconnect_watch: Option<watch::Sender<bool>>,
pub(super) created_at: Option<Instant>,
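The two Option fields replace the old AsyncRwLock<Option<Arc<Web3Provider>>>, so request dispatch no longer takes a lock. A rough sketch of the routing this enables, mirroring the request code later in this diff (error handling elided):

    // prefer http for plain requests; ws is kept for subscriptions
    let response: Result<R, _> = if let Some(p) = self.http_provider.as_ref() {
        p.request(method, params).await
    } else if let Some(p) = self.ws_provider.as_ref() {
        p.request(method, params).await
    } else {
        unimplemented!("no provider. cannot send request")
    };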
@@ -102,7 +94,6 @@ impl Web3Rpc {
block_map: BlocksByHashCache,
block_sender: Option<flume::Sender<BlockAndRpc>>,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
reconnect: bool,
) -> anyhow::Result<(Arc<Web3Rpc>, AnyhowJoinHandle<()>)> {
let created_at = Instant::now();
@@ -161,7 +152,6 @@ impl Web3Rpc {
}
let (disconnect_sender, disconnect_receiver) = watch::channel(false);
let reconnect = reconnect.into();
let (head_block, _) = watch::channel(None);
@@ -169,6 +159,7 @@
// TODO Should these defaults be in config
let peak_latency = PeakEwmaLatency::spawn(
// Decay over 15s
// TODO! This is wrong! needs to be as_nanos!
Duration::from_secs(15).as_millis() as f64,
// Peak requests so far around 5k, we will use an order of magnitude
// more to be safe. Should only use about 50mb RAM
@@ -177,41 +168,46 @@
Duration::from_secs(1),
);
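The TODO above flags a unit bug: if PeakEwmaLatency expects its decay in nanoseconds, as_millis() is off by a factor of one million (15_000 vs 15_000_000_000_000). Assuming that reading, the fix would be:

    // 15s decay, expressed in nanoseconds as the EWMA presumably expects
    Duration::from_secs(15).as_nanos() as f64,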
let http_url = if let Some(http_url) = config.http_url {
Some(http_url.parse()?)
let http_provider = if let Some(http_url) = config.http_url {
let http_url = http_url.parse::<Url>()?;
Some(connect_http(Cow::Owned(http_url), http_client)?)
// TODO: check the provider is on the right chain
} else {
None
};
let ws_url = if let Some(ws_url) = config.ws_url {
Some(ws_url.parse()?)
let ws_provider = if let Some(ws_url) = config.ws_url {
let ws_url = ws_url.parse::<Url>()?;
Some(connect_ws(Cow::Owned(ws_url), usize::MAX).await?)
// TODO: check the provider is on the right chain
} else {
None
};
let new_connection = Self {
name,
db_conn: db_conn.clone(),
display_name: config.display_name,
http_client,
ws_url,
http_url,
hard_limit,
hard_limit_until: Some(hard_limit_until),
soft_limit: config.soft_limit,
let new_rpc = Self {
automatic_block_limit,
backup,
block_data_limit,
reconnect,
tier: config.tier,
disconnect_watch: Some(disconnect_sender),
created_at: Some(created_at),
db_conn: db_conn.clone(),
disconnect_watch: Some(disconnect_sender),
display_name: config.display_name,
hard_limit,
hard_limit_until: Some(hard_limit_until),
head_block: Some(head_block),
http_provider,
name,
peak_latency: Some(peak_latency),
soft_limit: config.soft_limit,
tier: config.tier,
..Default::default()
};
let new_connection = Arc::new(new_connection);
let new_connection = Arc::new(new_rpc);
// subscribe to new blocks and new transactions
// subscribing starts the connection (with retries)
@@ -256,7 +252,6 @@ impl Web3Rpc {
async fn check_block_data_limit(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
unlocked_provider: Option<Arc<Web3Provider>>,
) -> anyhow::Result<Option<u64>> {
if !self.automatic_block_limit {
// TODO: is this a good thing to return?
@@ -270,16 +265,13 @@ impl Web3Rpc {
// TODO: binary search between 90k and max?
// TODO: start at 0 or 1?
for block_data_limit in [0, 32, 64, 128, 256, 512, 1024, 90_000, u64::MAX] {
let handle = self
.wait_for_request_handle(authorization, None, unlocked_provider.clone())
.await?;
let handle = self.wait_for_request_handle(authorization, None).await?;
let head_block_num_future = handle.request::<Option<()>, U256>(
"eth_blockNumber",
&None,
// errors here are expected, so keep the level low
Level::Debug.into(),
unlocked_provider.clone(),
);
let head_block_num = timeout(Duration::from_secs(5), head_block_num_future)
@@ -297,9 +289,7 @@ impl Web3Rpc {
// TODO: wait for the handle BEFORE we check the current block number. it might be delayed too!
// TODO: what should the request be?
let handle = self
.wait_for_request_handle(authorization, None, unlocked_provider.clone())
.await?;
let handle = self.wait_for_request_handle(authorization, None).await?;
let archive_result: Result<Bytes, _> = handle
.request(
@@ -310,7 +300,6 @@ impl Web3Rpc {
)),
// errors here are expected, so keep the level low
Level::Trace.into(),
unlocked_provider.clone(),
)
.await;
@@ -388,204 +377,50 @@ impl Web3Rpc {
true
}
/// reconnect to the provider. errors are retried forever with exponential backoff with jitter.
/// We use the "Decorrelated" jitter from <https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/>
/// TODO: maybe it would be better to use "Full Jitter". The "Full Jitter" approach does less work, but takes slightly more time.
pub async fn retrying_connect(
/// query the web3 provider to confirm it is on the expected chain with the expected data available
async fn check_provider(
self: &Arc<Self>,
block_sender: Option<&flume::Sender<BlockAndRpc>>,
chain_id: u64,
db_conn: Option<&DatabaseConnection>,
delay_start: bool,
) -> anyhow::Result<()> {
// there are several crates that have retry helpers, but they all seem more complex than necessary
// TODO: move this backoff logic into a helper function so we can use it when doing database locking
let base_ms = 500;
let cap_ms = 30_000;
let range_multiplier = 3;
let authorization = Arc::new(Authorization::internal(db_conn.cloned())?);
// sleep once before the initial retry attempt
// TODO: now that we use this method for our initial connection, do we still want this sleep?
let mut sleep_ms = if delay_start {
let first_sleep_ms = min(
cap_ms,
thread_fast_rng().gen_range(base_ms..(base_ms * range_multiplier)),
);
let reconnect_in = Duration::from_millis(first_sleep_ms);
// check the server's chain_id here
// TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
// TODO: what should the timeout be? should there be a request timeout?
// trace!("waiting on chain id for {}", self);
let found_chain_id: Result<U64, _> = self
.wait_for_request_handle(&authorization, None)
.await
.context(format!("waiting for request handle on {}", self))?
.request("eth_chainId", &json!(Vec::<()>::new()), Level::Trace.into())
.await;
trace!("found_chain_id: {:#?}", found_chain_id);
info!("Reconnect to {} in {}ms", self, reconnect_in.as_millis());
sleep(reconnect_in).await;
first_sleep_ms
} else {
base_ms
};
// retry until we succeed
while let Err(err) = self.connect(block_sender, chain_id, db_conn).await {
// thread_rng is crytographically secure. we don't need that here. use thread_fast_rng instead
// TODO: min of 1 second? sleep longer if rate limited?
sleep_ms = min(
cap_ms,
thread_fast_rng().gen_range(base_ms..(sleep_ms * range_multiplier)),
);
let retry_in = Duration::from_millis(sleep_ms);
let error_level = if self.backup {
log::Level::Debug
} else {
log::Level::Info
};
log::log!(
error_level,
"Failed (re)connect to {}! Retry in {}ms. err={:?}",
self,
retry_in.as_millis(),
err,
);
sleep(retry_in).await;
match found_chain_id {
Ok(found_chain_id) => {
// TODO: there has to be a cleaner way to do this
if chain_id != found_chain_id.as_u64() {
return Err(anyhow::anyhow!(
"incorrect chain id! Config has {}, but RPC has {}",
chain_id,
found_chain_id
)
.context(format!("failed @ {}", self)));
}
}
Err(e) => {
return Err(anyhow::Error::from(e)
.context(format!("unable to parse eth_chainId from {}", self)));
}
}
Ok(())
}
self.check_block_data_limit(&authorization)
.await
.context(format!("unable to check_block_data_limit of {}", self))?;
/// connect to the web3 provider
async fn connect(
self: &Arc<Self>,
block_sender: Option<&flume::Sender<BlockAndRpc>>,
chain_id: u64,
db_conn: Option<&DatabaseConnection>,
) -> anyhow::Result<()> {
if let Ok(mut unlocked_provider) = self.provider.try_write() {
#[cfg(test)]
if let Some(Web3Provider::Mock) = unlocked_provider.as_deref() {
return Ok(());
}
*unlocked_provider = if let Some(ws_url) = self.ws_url.as_ref() {
// set up ws client
match &*unlocked_provider {
None => {
info!("connecting to {}", self);
}
Some(_) => {
debug!("reconnecting to {}", self);
// tell the block subscriber that this rpc doesn't have any blocks
if let Some(block_sender) = block_sender {
block_sender
.send_async((None, self.clone()))
.await
.context("block_sender during connect")?;
}
// reset sync status
self.head_block
.as_ref()
.expect("head_block should always be set")
.send_replace(None);
// disconnect the current provider
// TODO: wait until the block_sender's receiver finishes updating this item?
*unlocked_provider = None;
}
}
let p = Web3Provider::new(Cow::Borrowed(ws_url), None)
.await
.context(format!("failed connecting to {}", ws_url))?;
assert!(p.ws().is_some());
Some(Arc::new(p))
} else {
// http client
if let Some(url) = &self.http_url {
let p = Web3Provider::new(Cow::Borrowed(url), self.http_client.clone())
.await
.context(format!("failed connecting to {}", url))?;
assert!(p.http().is_some());
Some(Arc::new(p))
} else {
None
}
};
let authorization = Arc::new(Authorization::internal(db_conn.cloned())?);
// check the server's chain_id here
// TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
// TODO: what should the timeout be? should there be a request timeout?
// trace!("waiting on chain id for {}", self);
let found_chain_id: Result<U64, _> = self
.wait_for_request_handle(&authorization, None, unlocked_provider.clone())
.await
.context(format!("waiting for request handle on {}", self))?
.request(
"eth_chainId",
&json!(Vec::<()>::new()),
Level::Trace.into(),
unlocked_provider.clone(),
)
.await;
trace!("found_chain_id: {:#?}", found_chain_id);
match found_chain_id {
Ok(found_chain_id) => {
// TODO: there has to be a cleaner way to do this
if chain_id != found_chain_id.as_u64() {
return Err(anyhow::anyhow!(
"incorrect chain id! Config has {}, but RPC has {}",
chain_id,
found_chain_id
)
.context(format!("failed @ {}", self)));
}
}
Err(e) => {
return Err(anyhow::Error::from(e)
.context(format!("unable to parse eth_chainId from {}", self)));
}
}
self.check_block_data_limit(&authorization, unlocked_provider.clone())
.await
.context(format!("unable to check_block_data_limit of {}", self))?;
drop(unlocked_provider);
info!("successfully connected to {}", self);
} else if self.provider.read().await.is_none() {
return Err(anyhow!("failed waiting for client {}", self));
};
Ok(())
}
pub async fn disconnect(&self) -> anyhow::Result<()> {
let age = self.created_at.unwrap().elapsed().as_secs();
info!("disconnecting {} ({}s old)", self, age);
self.reconnect.store(false, atomic::Ordering::Release);
if let Err(err) = self.disconnect_watch.as_ref().unwrap().send(true) {
warn!("failed sending disconnect watch: {:?}", err);
};
trace!("disconnecting (locking) {} ({}s old)", self, age);
let mut provider = self.provider.write().await;
trace!("disconnecting (clearing provider) {} ({}s old)", self, age);
*provider = None;
info!("successfully connected to {}", self);
Ok(())
}
@@ -638,7 +473,7 @@ impl Web3Rpc {
if self.block_data_limit() == U64::zero() {
let authorization = Arc::new(Authorization::internal(self.db_conn.clone())?);
if let Err(err) = self.check_block_data_limit(&authorization, None).await {
if let Err(err) = self.check_block_data_limit(&authorization).await {
warn!(
"failed checking block limit after {} finished syncing. {:?}",
self, err
@@ -670,7 +505,7 @@ impl Web3Rpc {
*self.disconnect_watch.as_ref().unwrap().borrow()
}
/// subscribe to blocks and transactions with automatic reconnects
/// subscribe to blocks and transactions
/// This should only exit when the program is exiting.
/// TODO: should more of these args be on self?
#[allow(clippy::too_many_arguments)]
@@ -690,15 +525,12 @@ impl Web3Rpc {
RequestErrorHandler::ErrorLevel
};
let mut delay_start = false;
todo!();
// this does loop, but only when reconnect is enabled
#[allow(clippy::never_loop)]
loop {
trace!("subscription loop started on {}", self);
let mut futures = vec![];
/*
let mut futures = vec![];
while false {
let http_interval_receiver = http_interval_sender.as_ref().map(|x| x.subscribe());
{
@@ -741,7 +573,7 @@ impl Web3Rpc {
// TODO: what if we just happened to have this check line up with another restart?
// TODO: think more about this
if let Some(client) = rpc.provider.read().await.clone() {
if let Some(client) = rpc.ws_provider.read().await.clone() {
// health check as a way of keeping this rpc's request_ewma accurate
// TODO: do something different if this is a backup server?
@@ -785,6 +617,7 @@ impl Web3Rpc {
let code = match to {
Err(err) => {
// TODO: an "error" here just means that the hash wasn't available. i dont think its truly an "error"
if rpc.backup {
debug!(
"{} failed health check query! {:#?}",
@@ -893,6 +726,8 @@ impl Web3Rpc {
}
}
*/
info!("all subscriptions on {} completed", self);
Ok(())
@@ -908,8 +743,15 @@ impl Web3Rpc {
) -> anyhow::Result<()> {
trace!("watching new heads on {}", self);
let provider = self.wait_for_provider().await;
if let Some(ws_provider) = self.ws_provider.as_ref() {
todo!("subscribe")
} else if let Some(http_provider) = self.http_provider.as_ref() {
todo!("poll")
} else {
unimplemented!("no ws or http provider!")
}
/*
match provider.as_ref() {
Web3Provider::Http(_client) => {
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
@@ -922,17 +764,13 @@ impl Web3Rpc {
while !self.should_disconnect() {
// TODO: what should the max_wait be?
// we do not pass unlocked_provider because we want to get a new one each call. otherwise we might re-use an old one
match self
.wait_for_request_handle(&authorization, None, None)
.await
{
match self.wait_for_request_handle(&authorization, None).await {
Ok(active_request_handle) => {
let block: Result<Option<ArcBlock>, _> = active_request_handle
.request(
"eth_getBlockByNumber",
&json!(("latest", false)),
Level::Warn.into(),
None,
)
.await;
@@ -1017,9 +855,8 @@ impl Web3Rpc {
}
Web3Provider::Both(_, client) | Web3Provider::Ws(client) => {
// todo: move subscribe_blocks onto the request handle?
let active_request_handle = self
.wait_for_request_handle(&authorization, None, Some(provider.clone()))
.await;
let active_request_handle =
self.wait_for_request_handle(&authorization, None).await;
let mut stream = client.subscribe_blocks().await?;
drop(active_request_handle);
@@ -1029,13 +866,12 @@ impl Web3Rpc {
// TODO: how does this get wrapped in an arc? does ethers handle that?
// TODO: do this part over http?
let block: Result<Option<ArcBlock>, _> = self
.wait_for_request_handle(&authorization, None, Some(provider.clone()))
.wait_for_request_handle(&authorization, None)
.await?
.request(
"eth_getBlockByNumber",
&json!(("latest", false)),
Level::Warn.into(),
Some(provider.clone()),
)
.await;
@@ -1082,6 +918,7 @@ impl Web3Rpc {
#[cfg(test)]
Web3Provider::Mock => unimplemented!(),
}
*/
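A hedged sketch of what the two todo!() arms above might become, based on the commented-out code: ethers gives a real subscription over websockets and polling over http (names and wiring assumed):

    use ethers::providers::Middleware;
    use ethers::types::BlockNumber;
    use futures_util::StreamExt;

    if let Some(ws) = self.ws_provider.as_ref() {
        // websockets: a real newHeads subscription
        let mut stream = ws.subscribe_blocks().await?;
        while let Some(new_head) = stream.next().await {
            // send the new head to block_sender ...
        }
    } else if let Some(http) = self.http_provider.as_ref() {
        // http: poll eth_getBlockByNumber("latest") on an interval
        let block = http.get_block(BlockNumber::Latest).await?;
        // compare against the previous head, sleep, repeat ...
    }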
// clear the head block. this might not be needed, but it won't hurt
self.send_head_block_result(Ok(None), &block_sender, block_map)
@@ -1100,10 +937,10 @@ impl Web3Rpc {
authorization: Arc<Authorization>,
tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
) -> anyhow::Result<()> {
// TODO: give this a separate client. don't use new_head_client for everything. especially a firehose this big
// TODO: timeout
let provider = self.wait_for_provider().await;
// TODO: make this subscription optional
self.wait_for_disconnect().await?;
/*
trace!("watching pending transactions on {}", self);
// TODO: does this keep the lock open for too long?
match provider.as_ref() {
@ -1144,6 +981,7 @@ impl Web3Rpc {
self.wait_for_disconnect().await?;
}
}
*/
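Similarly, a sketch of the ws arm of this pending-transaction firehose when it returns, assuming ethers' subscribe_pending_txs and the surrounding names:

    use ethers::providers::Middleware;
    use futures_util::StreamExt;

    if let Some(ws) = self.ws_provider.as_ref() {
        let mut stream = ws.subscribe_pending_txs().await?;
        while let Some(tx_hash) = stream.next().await {
            tx_id_sender.send_async((tx_hash, self.clone())).await?;
        }
    }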
if self.should_disconnect() {
Ok(())
@@ -1155,21 +993,15 @@ impl Web3Rpc {
}
/// be careful with this; it might wait forever!
/// `allow_not_ready` is only for use by health checks while starting the provider
/// TODO: don't use anyhow. use specific error type
pub async fn wait_for_request_handle<'a>(
self: &'a Arc<Self>,
authorization: &'a Arc<Authorization>,
max_wait: Option<Duration>,
unlocked_provider: Option<Arc<Web3Provider>>,
) -> Web3ProxyResult<OpenRequestHandle> {
let max_wait = max_wait.map(|x| Instant::now() + x);
loop {
match self
.try_request_handle(authorization, unlocked_provider.clone())
.await
{
match self.try_request_handle(authorization).await {
Ok(OpenRequestResult::Handle(handle)) => return Ok(handle),
Ok(OpenRequestResult::RetryAt(retry_at)) => {
// TODO: emit a stat?
@@ -1214,18 +1046,8 @@ impl Web3Rpc {
pub async fn try_request_handle(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
// TODO: borrow on this instead of needing to clone the Arc?
unlocked_provider: Option<Arc<Web3Provider>>,
) -> Web3ProxyResult<OpenRequestResult> {
// TODO: think more about this read block
// TODO: this should *not* be new_head_client. this should be a separate object
if unlocked_provider.is_some() || self.provider.read().await.is_some() {
// we already have an unlocked provider. no need to lock
} else {
warn!("no provider on {}", self);
// TODO: wait for provider? that will probably slow us down more than we want
return Ok(OpenRequestResult::NotReady);
}
// TODO: if websocket is reconnecting, return an error?
// check cached rate limits
if let Some(hard_limit_until) = self.hard_limit_until.as_ref() {
@@ -1291,59 +1113,34 @@ impl Web3Rpc {
}
}
async fn wait_for_provider(&self) -> Arc<Web3Provider> {
let mut provider = self.provider.read().await.clone();
let mut logged = false;
while provider.is_none() {
// trace!("waiting on unlocked_provider: locking...");
sleep(Duration::from_millis(100)).await;
if !logged {
debug!("waiting for provider on {}", self);
logged = true;
}
provider = self.provider.read().await.clone();
}
provider.unwrap()
}
pub async fn wait_for_query<P, R>(
self: &Arc<Self>,
method: &str,
params: &P,
revert_handler: RequestErrorHandler,
authorization: Arc<Authorization>,
unlocked_provider: Option<Arc<Web3Provider>>,
) -> anyhow::Result<R>
where
// TODO: not sure about this type. would be better to not need clones, but measure and spawns combine to need it
P: Clone + fmt::Debug + serde::Serialize + Send + Sync + 'static,
R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug + Send,
{
self.wait_for_request_handle(&authorization, None, None)
self.wait_for_request_handle(&authorization, None)
.await?
.request::<P, R>(method, params, revert_handler, unlocked_provider)
.request::<P, R>(method, params, revert_handler)
.await
.context("ProviderError from the backend")
}
}
impl fmt::Debug for Web3Provider {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default Debug takes forever to write. this is too quiet though. we at least need the url
f.debug_struct("Web3Provider").finish_non_exhaustive()
}
}
impl Hash for Web3Rpc {
fn hash<H: Hasher>(&self, state: &mut H) {
self.name.hash(state);
self.display_name.hash(state);
self.http_url.hash(state);
self.ws_url.hash(state);
self.http_provider.as_ref().map(|x| x.url()).hash(state);
// TODO: figure out how to get the url for the provider
// TODO: url does NOT include the authorization data. i think created_at should protect us if auth changes without anything else
// self.ws_provider.map(|x| x.url()).hash(state);
self.automatic_block_limit.hash(state);
self.backup.hash(state);
// TODO: don't include soft_limit if we change them to be dynamic
@@ -1481,7 +1278,6 @@ mod tests {
let x = Web3Rpc {
name: "name".to_string(),
ws_url: Some("ws://example.com".parse::<Url>().unwrap()),
soft_limit: 1_000,
automatic_block_limit: false,
backup: false,

@@ -5,92 +5,81 @@ use std::{borrow::Cow, time::Duration};
use url::Url;
// TODO: our own structs for these that handle streaming large responses
type EthersHttpProvider = ethers::providers::Provider<ethers::providers::Http>;
type EthersWsProvider = ethers::providers::Provider<ethers::providers::Ws>;
pub type EthersHttpProvider = ethers::providers::Provider<ethers::providers::Http>;
pub type EthersWsProvider = ethers::providers::Provider<ethers::providers::Ws>;
/// Use HTTP and WS providers.
// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit <https://github.com/gakonst/ethers-rs/issues/592>
// TODO: custom types that let us stream JSON responses
#[derive(From)]
pub enum Web3Provider {
Both(EthersHttpProvider, EthersWsProvider),
Http(EthersHttpProvider),
// TODO: deadpool? custom tokio-tungstenite
Ws(EthersWsProvider),
#[cfg(test)]
Mock,
}
pub fn extract_auth(url: &mut Cow<'_, Url>) -> Option<Authorization> {
if let Some(pass) = url.password().map(|x| x.to_string()) {
// to_string is needed because we are going to remove these items from the url
let user = url.username().to_string();
impl Web3Provider {
pub fn http(&self) -> Option<&EthersHttpProvider> {
match self {
Self::Http(x) => Some(x),
_ => None,
}
}
// clear username and password from the url
let mut_url = url.to_mut();
pub fn ws(&self) -> Option<&EthersWsProvider> {
match self {
Self::Both(_, x) | Self::Ws(x) => Some(x),
_ => None,
}
}
mut_url
.set_username("")
.expect("unable to clear username on websocket");
mut_url
.set_password(None)
.expect("unable to clear password on websocket");
/// Note, if the http url has an authority the http_client param is ignored and a dedicated http_client will be used
/// TODO: take a reqwest::Client or a reqwest::ClientBuilder. that way we can do things like set compression even when auth is set
pub async fn new(
mut url: Cow<'_, Url>,
http_client: Option<reqwest::Client>,
) -> anyhow::Result<Self> {
let auth = if let Some(pass) = url.password().map(|x| x.to_string()) {
// to_string is needed because we are going to remove these items from the url
let user = url.username().to_string();
// clear username and password from the url
let mut_url = url.to_mut();
mut_url
.set_username("")
.map_err(|_| anyhow!("unable to clear username on websocket"))?;
mut_url
.set_password(None)
.map_err(|_| anyhow!("unable to clear password on websocket"))?;
// keep them
Some(Authorization::basic(user, pass))
} else {
None
};
let provider = if url.scheme().starts_with("http") {
let provider = if let Some(auth) = auth {
ethers::providers::Http::new_with_auth(url.into_owned(), auth)?
} else if let Some(http_client) = http_client {
ethers::providers::Http::new_with_client(url.into_owned(), http_client)
} else {
ethers::providers::Http::new(url.into_owned())
};
// TODO: i don't think this interval matters for our uses, but we should probably set it to like `block time / 2`
ethers::providers::Provider::new(provider)
.interval(Duration::from_secs(12))
.into()
} else if url.scheme().starts_with("ws") {
let provider = if auth.is_some() {
let connection_details = ConnectionDetails::new(url.as_str(), auth);
ethers::providers::Ws::connect(connection_details).await?
} else {
ethers::providers::Ws::connect(url.as_str()).await?
};
// TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592)
// TODO: i don't think this interval matters
ethers::providers::Provider::new(provider).into()
} else {
return Err(anyhow::anyhow!("only http and ws servers are supported"));
};
Ok(provider)
// keep them
Some(Authorization::basic(user, pass))
} else {
None
}
}
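A quick illustration of extract_auth (hypothetical URL): it strips basic-auth credentials out of the url and hands them back separately, so they can be passed to the provider constructors below:

    let mut url = Cow::Owned(Url::parse("wss://user:pass@rpc.example.com")?);
    let auth = extract_auth(&mut url);
    assert!(auth.is_some());
    assert_eq!(url.as_str(), "wss://rpc.example.com/");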
/// Note, if the http url has an authority the http_client param is ignored and a dedicated http_client will be used
/// TODO: take a reqwest::Client or a reqwest::ClientBuilder. that way we can do things like set compression even when auth is set
pub fn connect_http(
mut url: Cow<'_, Url>,
http_client: Option<reqwest::Client>,
) -> anyhow::Result<EthersHttpProvider> {
let auth = extract_auth(&mut url);
let provider = if url.scheme().starts_with("http") {
let provider = if let Some(auth) = auth {
ethers::providers::Http::new_with_auth(url.into_owned(), auth)?
} else if let Some(http_client) = http_client {
ethers::providers::Http::new_with_client(url.into_owned(), http_client)
} else {
ethers::providers::Http::new(url.into_owned())
};
// TODO: i don't think this interval matters for our uses, but we should probably set it to like `block time / 2`
ethers::providers::Provider::new(provider)
.interval(Duration::from_secs(12))
.into()
} else {
return Err(anyhow::anyhow!("only http servers are supported"));
};
Ok(provider)
}
pub async fn connect_ws(
mut url: Cow<'_, Url>,
reconnects: usize,
) -> anyhow::Result<EthersWsProvider> {
let auth = extract_auth(&mut url);
let provider = if url.scheme().starts_with("ws") {
let provider = if auth.is_some() {
let connection_details = ConnectionDetails::new(url.as_str(), auth);
// if they error, we do our own reconnection with backoff
ethers::providers::Ws::connect_with_reconnects(connection_details, reconnects).await?
} else {
ethers::providers::Ws::connect_with_reconnects(url.as_str(), reconnects).await?
};
// TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592)
// TODO: i don't think this interval matters
ethers::providers::Provider::new(provider).into()
} else {
return Err(anyhow::anyhow!("ws servers are supported"));
};
Ok(provider)
}
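Putting the two helpers together, usage mirrors what Web3Rpc::new now does (URLs hypothetical):

    let http_url: Url = "https://user:pass@rpc.example.com".parse()?;
    let http = connect_http(Cow::Owned(http_url), None)?;

    let ws_url: Url = "wss://rpc.example.com".parse()?;
    // usize::MAX effectively means "let ethers reconnect forever"
    let ws = connect_ws(Cow::Owned(ws_url), usize::MAX).await?;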

@@ -1,5 +1,4 @@
use super::one::Web3Rpc;
use super::provider::Web3Provider;
use crate::frontend::authorization::Authorization;
use anyhow::Context;
use chrono::Utc;
@@ -158,7 +157,6 @@ impl OpenRequestHandle {
method: &str,
params: &P,
mut error_handler: RequestErrorHandler,
unlocked_provider: Option<Arc<Web3Provider>>,
) -> Result<R, ProviderError>
where
// TODO: not sure about this type. would be better to not need clones, but measure and spawns combine to need it
@@ -170,29 +168,6 @@ impl OpenRequestHandle {
// trace!(rpc=%self.rpc, %method, "request");
trace!("requesting from {}", self.rpc);
let mut provider = if unlocked_provider.is_some() {
unlocked_provider
} else {
self.rpc.provider.read().await.clone()
};
let mut logged = false;
// TODO: instead of a lock, i guess it should be a watch?
while provider.is_none() {
// trace!("waiting on provider: locking...");
// TODO: i dont like this. subscribing to a channel could be better
sleep(Duration::from_millis(100)).await;
if !logged {
debug!("no provider for open handle on {}", self.rpc);
logged = true;
}
provider = self.rpc.provider.read().await.clone();
}
let provider = provider.expect("provider was checked already");
self.rpc
.total_requests
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
@@ -202,21 +177,16 @@ impl OpenRequestHandle {
let start = Instant::now();
// TODO: replace ethers-rs providers with our own that supports streaming the responses
let response = match provider.as_ref() {
#[cfg(test)]
Web3Provider::Mock => {
return Err(ProviderError::CustomError(
"mock provider can't respond".to_string(),
))
}
Web3Provider::Ws(p) => p.request(method, params).await,
Web3Provider::Http(p) | Web3Provider::Both(p, _) => {
// TODO: i keep hearing that http is faster. but ws has always been better for me. investigate more with actual benchmarks
p.request(method, params).await
}
// TODO: replace ethers-rs providers with our own that handles "id" being null
let response: Result<R, _> = if let Some(ref p) = self.rpc.http_provider {
p.request(method, params).await
} else if let Some(ref p) = self.rpc.ws_provider {
p.request(method, params).await
} else {
unimplemented!("no provider. cannot send request")
};
// note. we intentionally do not record this latency now. we do NOT want to measure errors
// we do NOT want to measure errors, so we intentionally do not record this latency now.
let latency = start.elapsed();
// we used to fetch_sub the active_request count here, but sometimes the handle is dropped without request being called!
@@ -277,11 +247,7 @@ impl OpenRequestHandle {
// TODO: move this info a function on ResponseErrorType
let response_type = if let ProviderError::JsonRpcClientError(err) = err {
// Http and Ws errors are very similar, but different types
let msg = match &*provider {
#[cfg(test)]
Web3Provider::Mock => unimplemented!(),
_ => err.as_error_response().map(|x| x.message.clone()),
};
let msg = err.as_error_response().map(|x| x.message.clone());
trace!("error message: {:?}", msg);

@@ -29,14 +29,13 @@ impl Web3Rpcs {
// TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself
// TODO: if one rpc fails, try another?
// TODO: try_request_handle, or wait_for_request_handle? I think we want wait here
let tx: Transaction = match rpc.try_request_handle(authorization, None).await {
let tx: Transaction = match rpc.try_request_handle(authorization).await {
Ok(OpenRequestResult::Handle(handle)) => {
handle
.request(
"eth_getTransactionByHash",
&(pending_tx_id,),
Level::Error.into(),
None,
)
.await?
}