retry providers
This commit is contained in:
parent
24b76a33bc
commit
9f7f657926
|
@ -71,7 +71,7 @@ pub struct Web3Connection {
|
||||||
/// keep track of currently open requests. We sort on this
|
/// keep track of currently open requests. We sort on this
|
||||||
active_requests: AtomicU32,
|
active_requests: AtomicU32,
|
||||||
/// provider is in a RwLock so that we can replace it if re-connecting
|
/// provider is in a RwLock so that we can replace it if re-connecting
|
||||||
provider: RwLock<Arc<Web3Provider>>,
|
provider: RwLock<Option<Arc<Web3Provider>>>,
|
||||||
/// rate limits are stored in a central redis so that multiple proxies can share their rate limits
|
/// rate limits are stored in a central redis so that multiple proxies can share their rate limits
|
||||||
hard_limit: Option<redis_cell_client::RedisCellClient>,
|
hard_limit: Option<redis_cell_client::RedisCellClient>,
|
||||||
/// used for load balancing to the least loaded server
|
/// used for load balancing to the least loaded server
|
||||||
|
@ -127,12 +127,14 @@ impl Web3Connection {
|
||||||
// since this lock is held open over an await, we use tokio's locking
|
// since this lock is held open over an await, we use tokio's locking
|
||||||
let mut provider = self.provider.write().await;
|
let mut provider = self.provider.write().await;
|
||||||
|
|
||||||
|
*provider = None;
|
||||||
|
|
||||||
// tell the block subscriber that we are at 0
|
// tell the block subscriber that we are at 0
|
||||||
block_sender.send_async((Block::default(), rpc_id)).await?;
|
block_sender.send_async((Block::default(), rpc_id)).await?;
|
||||||
|
|
||||||
let new_provider = Web3Provider::from_str(&self.url, http_client).await?;
|
let new_provider = Web3Provider::from_str(&self.url, http_client).await?;
|
||||||
|
|
||||||
*provider = Arc::new(new_provider);
|
*provider = Some(Arc::new(new_provider));
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -165,7 +167,7 @@ impl Web3Connection {
|
||||||
let connection = Web3Connection {
|
let connection = Web3Connection {
|
||||||
url: url_str.clone(),
|
url: url_str.clone(),
|
||||||
active_requests: 0.into(),
|
active_requests: 0.into(),
|
||||||
provider: RwLock::new(Arc::new(provider)),
|
provider: RwLock::new(Some(Arc::new(provider))),
|
||||||
hard_limit,
|
hard_limit,
|
||||||
soft_limit,
|
soft_limit,
|
||||||
};
|
};
|
||||||
|
@ -214,6 +216,11 @@ impl Web3Connection {
|
||||||
self.soft_limit
|
self.soft_limit
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub async fn has_provider(&self) -> bool {
|
||||||
|
self.provider.read().await.is_some()
|
||||||
|
}
|
||||||
|
|
||||||
#[instrument(skip_all)]
|
#[instrument(skip_all)]
|
||||||
async fn send_block(
|
async fn send_block(
|
||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
|
@ -246,89 +253,89 @@ impl Web3Connection {
|
||||||
loop {
|
loop {
|
||||||
info!("Watching new_heads on {}", self);
|
info!("Watching new_heads on {}", self);
|
||||||
|
|
||||||
// TODO: is a RwLock of Arc the right thing here?
|
// TODO: is a RwLock of an Option<Arc> the right thing here?
|
||||||
let provider = self.provider.read().await.clone();
|
if let Some(provider) = self.provider.read().await.clone() {
|
||||||
|
match &*provider {
|
||||||
|
Web3Provider::Http(provider) => {
|
||||||
|
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
|
||||||
|
// TODO: what should this interval be? probably some fraction of block time. set automatically?
|
||||||
|
// TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
|
||||||
|
// TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though
|
||||||
|
let mut interval = interval(Duration::from_secs(2));
|
||||||
|
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
|
||||||
|
|
||||||
match &*provider {
|
let mut last_hash = Default::default();
|
||||||
Web3Provider::Http(provider) => {
|
|
||||||
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
|
|
||||||
// TODO: what should this interval be? probably some fraction of block time. set automatically?
|
|
||||||
// TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
|
|
||||||
// TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though
|
|
||||||
let mut interval = interval(Duration::from_secs(2));
|
|
||||||
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
|
|
||||||
|
|
||||||
let mut last_hash = Default::default();
|
loop {
|
||||||
|
// wait for the interval
|
||||||
|
// TODO: if error or rate limit, increase interval?
|
||||||
|
interval.tick().await;
|
||||||
|
|
||||||
loop {
|
match self.try_request_handle().await {
|
||||||
// wait for the interval
|
Ok(active_request_handle) => {
|
||||||
// TODO: if error or rate limit, increase interval?
|
// TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest"
|
||||||
interval.tick().await;
|
let block: Result<Block<TxHash>, _> = provider
|
||||||
|
.request("eth_getBlockByNumber", ("latest", false))
|
||||||
|
.await;
|
||||||
|
|
||||||
match self.try_request_handle().await {
|
drop(active_request_handle);
|
||||||
Ok(active_request_handle) => {
|
|
||||||
// TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest"
|
|
||||||
let block: Result<Block<TxHash>, _> = provider
|
|
||||||
.request("eth_getBlockByNumber", ("latest", false))
|
|
||||||
.await;
|
|
||||||
|
|
||||||
drop(active_request_handle);
|
// don't send repeat blocks
|
||||||
|
if let Ok(block) = &block {
|
||||||
|
let new_hash = block.hash.unwrap();
|
||||||
|
|
||||||
// don't send repeat blocks
|
if new_hash == last_hash {
|
||||||
if let Ok(block) = &block {
|
continue;
|
||||||
let new_hash = block.hash.unwrap();
|
}
|
||||||
|
|
||||||
if new_hash == last_hash {
|
last_hash = new_hash;
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
last_hash = new_hash;
|
self.send_block(block, &block_sender, rpc_id).await?;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Failed getting latest block from {}: {:?}", self, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
self.send_block(block, &block_sender, rpc_id).await?;
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!("Failed getting latest block from {}: {:?}", self, e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
Web3Provider::Ws(provider) => {
|
||||||
Web3Provider::Ws(provider) => {
|
// rate limits
|
||||||
// rate limits
|
let active_request_handle = self.wait_for_request_handle().await;
|
||||||
let active_request_handle = self.wait_for_request_handle().await;
|
|
||||||
|
|
||||||
// TODO: automatically reconnect?
|
// TODO: automatically reconnect?
|
||||||
// TODO: it would be faster to get the block number, but subscriptions don't provide that
|
// TODO: it would be faster to get the block number, but subscriptions don't provide that
|
||||||
// TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out?
|
// TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out?
|
||||||
let mut stream = provider.subscribe_blocks().await?;
|
let mut stream = provider.subscribe_blocks().await?;
|
||||||
|
|
||||||
drop(active_request_handle);
|
drop(active_request_handle);
|
||||||
let active_request_handle = self.wait_for_request_handle().await;
|
let active_request_handle = self.wait_for_request_handle().await;
|
||||||
|
|
||||||
// query the block once since the subscription doesn't send the current block
|
// query the block once since the subscription doesn't send the current block
|
||||||
// there is a very small race condition here where the stream could send us a new block right now
|
// there is a very small race condition here where the stream could send us a new block right now
|
||||||
// all it does is print "new block" for the same block as current block
|
// all it does is print "new block" for the same block as current block
|
||||||
// TODO: rate limit!
|
// TODO: rate limit!
|
||||||
let block: Result<Block<TxHash>, _> = active_request_handle
|
let block: Result<Block<TxHash>, _> = active_request_handle
|
||||||
.request("eth_getBlockByNumber", ("latest", false))
|
.request("eth_getBlockByNumber", ("latest", false))
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
self.send_block(block, &block_sender, rpc_id).await?;
|
self.send_block(block, &block_sender, rpc_id).await?;
|
||||||
|
|
||||||
// TODO: should the stream have a timeout on it here?
|
// TODO: should the stream have a timeout on it here?
|
||||||
// TODO: although reconnects will make this less of an issue
|
// TODO: although reconnects will make this less of an issue
|
||||||
loop {
|
loop {
|
||||||
match stream.next().await {
|
match stream.next().await {
|
||||||
Some(new_block) => {
|
Some(new_block) => {
|
||||||
self.send_block(Ok(new_block), &block_sender, rpc_id)
|
self.send_block(Ok(new_block), &block_sender, rpc_id)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// TODO: really not sure about this
|
// TODO: really not sure about this
|
||||||
task::yield_now().await;
|
task::yield_now().await;
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
warn!("subscription ended");
|
warn!("subscription ended");
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -336,12 +343,11 @@ impl Web3Connection {
|
||||||
}
|
}
|
||||||
|
|
||||||
if reconnect {
|
if reconnect {
|
||||||
drop(provider);
|
|
||||||
|
|
||||||
// TODO: exponential backoff
|
// TODO: exponential backoff
|
||||||
warn!("new heads subscription exited. reconnecting in 10 seconds...");
|
warn!("new heads subscription exited. Attempting to reconnect in 1 second...");
|
||||||
sleep(Duration::from_secs(10)).await;
|
sleep(Duration::from_secs(1)).await;
|
||||||
|
|
||||||
|
// TODO: loop on reconnecting! do not return with a "?" here
|
||||||
self.reconnect(&block_sender, rpc_id).await?;
|
self.reconnect(&block_sender, rpc_id).await?;
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
|
@ -370,6 +376,12 @@ impl Web3Connection {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn try_request_handle(self: &Arc<Self>) -> Result<ActiveRequestHandle, Duration> {
|
pub async fn try_request_handle(self: &Arc<Self>) -> Result<ActiveRequestHandle, Duration> {
|
||||||
|
// check that we are connected
|
||||||
|
if !self.has_provider().await {
|
||||||
|
// TODO: how long? use the same amount as the exponential backoff on retry
|
||||||
|
return Err(Duration::from_secs(1));
|
||||||
|
}
|
||||||
|
|
||||||
// check rate limits
|
// check rate limits
|
||||||
if let Some(ratelimiter) = self.hard_limit.as_ref() {
|
if let Some(ratelimiter) = self.hard_limit.as_ref() {
|
||||||
match ratelimiter.throttle().await {
|
match ratelimiter.throttle().await {
|
||||||
|
@ -424,9 +436,17 @@ impl ActiveRequestHandle {
|
||||||
// TODO: including params in this is way too verbose
|
// TODO: including params in this is way too verbose
|
||||||
trace!("Sending {} to {}", method, self.0);
|
trace!("Sending {} to {}", method, self.0);
|
||||||
|
|
||||||
let provider = self.0.provider.read().await.clone();
|
let mut provider = None;
|
||||||
|
|
||||||
let response = match &*provider {
|
while provider.is_none() {
|
||||||
|
// TODO: if no provider, don't unwrap. wait until there is one.
|
||||||
|
match self.0.provider.read().await.as_ref() {
|
||||||
|
None => {}
|
||||||
|
Some(found_provider) => provider = Some(found_provider.clone()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let response = match &*provider.unwrap() {
|
||||||
Web3Provider::Http(provider) => provider.request(method, params).await,
|
Web3Provider::Http(provider) => provider.request(method, params).await,
|
||||||
Web3Provider::Ws(provider) => provider.request(method, params).await,
|
Web3Provider::Ws(provider) => provider.request(method, params).await,
|
||||||
};
|
};
|
||||||
|
|
Loading…
Reference in New Issue