retrying reconnect
parent 73a2fcaf72
commit 7eaf6f3540

TODO.md: 18
@@ -138,6 +138,11 @@ These are roughly in order of completion
- [x] right now the block_map is unbounded. move this to redis and do some calculations to be sure about RAM usage
- [x] synced connections swap threshold should come from config
- [x] right now we send too many getTransaction queries to the private rpc tier and we are being rate limited by some of them. change to be serial and weight by hard/soft limit.
- [x] ip blocking gives a 500 and not the proper error code
- [x] need a reconnect that doesn't unwrap
- [x] need a retrying_reconnect that is used everywhere reconnect is. have exponential backoff here
- [x] it looks like our reconnect logic is not always firing. we need to make reconnect more robust!
  - i am pretty sure that this is actually servers that fail to connect on initial setup (maybe the rpcs that are on the wrong chain are just timing out and they aren't set to reconnect?)
- [ ] rewrite rate limiting to have a tiered cache. do not put redis in the hot path
  - instead, we should check a local cache for the current rate limit (+1) and spawn an update to the local cache from redis in the background.
  - when there are a LOT of concurrent requests, we see errors. i thought that was a problem with redis cell, but it happens with my simpler rate limit. now i think the problem is actually with bb8
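A minimal sketch (not part of this commit) of the tiered idea in the item above: serve the check from a local in-memory counter and reconcile with redis in a spawned background task. `LocalRateLimit` and `fetch_remote_count` are hypothetical names, and window expiry/reset is left out.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use parking_lot::RwLock;

#[derive(Default)]
pub struct LocalRateLimit {
    // per-key counters kept in process memory; redis never sits in the hot path
    counters: RwLock<HashMap<String, Arc<AtomicU64>>>,
}

impl LocalRateLimit {
    /// Hot path: check and increment the local counter only.
    pub fn check(self: &Arc<Self>, key: &str, max_per_period: u64) -> bool {
        let counter = {
            let mut counters = self.counters.write();
            counters.entry(key.to_string()).or_default().clone()
        };

        // +1 locally; the authoritative count comes back from redis asynchronously
        let allowed = counter.fetch_add(1, Ordering::Relaxed) < max_per_period;

        // spawn the redis reconciliation off the hot path
        let this = self.clone();
        let key = key.to_string();
        tokio::spawn(async move {
            // fetch_remote_count is a placeholder for the real redis call
            if let Ok(remote) = fetch_remote_count(&key).await {
                if let Some(counter) = this.counters.read().get(&key) {
                    counter.store(remote, Ordering::Relaxed);
                }
            }
        });

        allowed
    }
}

// placeholder: the real thing would increment-and-read a redis key with a TTL
async fn fetch_remote_count(_key: &str) -> anyhow::Result<u64> {
    Ok(0)
}
```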
@@ -153,9 +158,12 @@ These are roughly in order of completion
- [ ] add configurable size limits to all the Caches
- [ ] Ulid instead of Uuid for user keys
  - <https://discord.com/channels/873880840487206962/900758376164757555/1012942974608474142>
  - since users are actively using our service, we will need to support both
- [ ] Ulid instead of Uuid for database ids
  - might have to use Uuid in sea-orm and then convert to Ulid on display
- [ ] Api keys need option to lock to IP, cors header, referer, etc
- [ ] requests per second per api key
- [ ] distribution of methods per api key (eth_call, eth_getLogs, etc.)

## V1

@@ -344,14 +352,8 @@ in another repo: event subscriber
- [ ] i think checking the parents of the heaviest chain works most of the time, but not always
  - maybe iterate connection heads by total weight? i still think we need to include parent hashes
- [ ] i see "No block found" sometimes for a single server's block. Not sure why since reads should happen after writes
- [ ] figure out why total requests is climbing so fast
  - is someone using my node that i don't expect?
  - is staking that inefficient?
  - maybe arbitrum syncing?
  - internal requests gone haywire?
  - need graphs!
- [ ] it looks like our reconnect logic is not always firing. we need to make reconnect more robust!
  - i am pretty sure that this is actually servers that fail to connect on initial setup (maybe the rpcs that are on the wrong chain are just timing out and they aren't set to reconnect?)
- [ ] what's going on here? why is it rolling back? maybe total_difficulty was a LOT higher?
  - 2022-09-05T19:21:39.763630Z WARN web3_proxy::rpcs::blockchain: chain rolled back 1/6/7 head=15479604 (0xf809…6a2c) rpc=infura_free
  - i wish i had more logs. it's possible that 15479605 came immediately after
- [ ] ip blocking logs a warn. we don't need that. a stat at most
- [ ] keep it working without redis and a database
@@ -124,6 +124,9 @@ pub async fn get_migrated_db(
min_connections: u32,
max_connections: u32,
) -> anyhow::Result<DatabaseConnection> {
// TODO: scrub credentials and then include the db_url in logs
info!("Connecting to db");

let mut db_opt = sea_orm::ConnectOptions::new(db_url);

// TODO: load all these options from the config file. i think mysql default max is 100
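The "scrub credentials" TODOs here and in the redis block below could share a small helper along these lines. This is a hedged sketch: it assumes the `url` crate, which this commit does not necessarily depend on, and `scrub_credentials` is a hypothetical name.

```rust
use url::Url;

/// redact the password so connection strings are safe to include in logs
fn scrub_credentials(raw: &str) -> String {
    match Url::parse(raw) {
        Ok(mut parsed) => {
            if parsed.password().is_some() {
                // set_password only fails for schemes that cannot carry credentials
                let _ = parsed.set_password(Some("****"));
            }
            parsed.to_string()
        }
        // if it doesn't even parse, don't risk logging any part of it
        Err(_) => "<unparseable url>".to_string(),
    }
}
```

With something like this, the plain `info!("Connecting to db")` could become `info!("Connecting to db on {}", scrub_credentials(db_url))`.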
@@ -206,7 +209,8 @@ impl Web3ProxyApp {

let redis_pool = match top_config.app.redis_url.as_ref() {
Some(redis_url) => {
info!("Connecting to redis on {}", redis_url);
// TODO: scrub credentials and then include the redis_url in logs
info!("Connecting to redis");

let manager = RedisConnectionManager::new(redis_url.as_ref())?;
@@ -9,9 +9,11 @@ use ethers::prelude::{Block, Bytes, Middleware, ProviderError, TxHash, H256, U64
use futures::future::try_join_all;
use futures::StreamExt;
use parking_lot::RwLock;
use rand::Rng;
use redis_rate_limit::{RedisPool, RedisRateLimit, ThrottleResult};
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use std::cmp::min;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{self, AtomicU32, AtomicU64};
@@ -82,14 +84,12 @@ impl Web3Connection {
)
});

let provider = Web3Provider::from_str(&url_str, http_client).await?;

let new_connection = Self {
name,
url: url_str.clone(),
active_requests: 0.into(),
total_requests: 0.into(),
provider: AsyncRwLock::new(Some(Arc::new(provider))),
provider: AsyncRwLock::new(None),
hard_limit,
soft_limit,
block_data_limit: Default::default(),
@@ -100,6 +100,11 @@ impl Web3Connection {

let new_connection = Arc::new(new_connection);

// connect to the server (with retries)
new_connection
.retrying_reconnect(block_sender.as_ref())
.await?;

// check the server's chain_id here
// TODO: move this outside the `new` function and into a `start` function or something. that way we can do retries from there
// TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
@@ -242,11 +247,46 @@ impl Web3Connection {
needed_block_num >= &oldest_block_num && needed_block_num <= &newest_block_num
}

/// reconnect to the provider. errors are retried forever with exponential backoff with jitter.
/// We use the "Decorrelated" jitter from https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
/// TODO: maybe it would be better to use "Full Jitter". The "Full Jitter" approach uses less work, but slightly more time.
pub async fn retrying_reconnect(
self: &Arc<Self>,
block_sender: Option<&flume::Sender<BlockAndRpc>>,
) -> anyhow::Result<()> {
// there are several crates that have retry helpers, but they all seem more complex than necessary
let base_ms = 500;
let cap_ms = 30_000;
let range_multiplier = 3;

// sleep once before the initial retry attempt
let mut sleep_ms = min(
cap_ms,
rand::thread_rng().gen_range(base_ms..(base_ms * range_multiplier)),
);
sleep(Duration::from_millis(sleep_ms)).await;

// retry until we succeed
while let Err(err) = self.reconnect(block_sender).await {
sleep_ms = min(
cap_ms,
rand::thread_rng().gen_range(base_ms..(sleep_ms * range_multiplier)),
);

let retry_in = Duration::from_millis(sleep_ms);
warn!(rpc=%self, ?retry_in, ?err, "Failed to reconnect!");

sleep(retry_in).await;
}

Ok(())
}
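The doc comment above leaves a TODO about switching to "Full Jitter" from the same AWS post. For comparison, a minimal sketch (illustrative only, not part of this commit): instead of feeding the previous sleep back in, Full Jitter derives each sleep from the attempt count and randomizes over the whole `0..=ceiling` range.

```rust
use std::cmp::min;

use rand::Rng;

/// Full Jitter: sleep a uniform amount between 0 and the capped exponential backoff.
fn full_jitter_ms(attempt: u32, base_ms: u64, cap_ms: u64) -> u64 {
    // base * 2^attempt, clamped so large attempt counts cannot overflow the shift
    let exp_backoff = base_ms.saturating_mul(1u64 << attempt.min(20));
    let ceiling = min(cap_ms, exp_backoff);
    rand::thread_rng().gen_range(0..=ceiling)
}
```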

/// reconnect a websocket provider
#[instrument(skip_all)]
pub async fn reconnect(
self: &Arc<Self>,
block_sender: Option<flume::Sender<BlockAndRpc>>,
block_sender: Option<&flume::Sender<BlockAndRpc>>,
) -> anyhow::Result<()> {
// TODO: no-op if this called on a http provider
// websocket doesn't need the http client
@@ -256,42 +296,32 @@ impl Web3Connection {

// since this lock is held open over an await, we use tokio's locking
// TODO: timeout on this lock. if its slow, something is wrong
let mut provider = self.provider.write().await;

// our provider doesn't work anymore
*provider = None;

// reset sync status
{
let mut provider = self.provider.write().await;

// our provider doesn't work anymore
*provider = None;

// reset sync status
{
let mut head_block_id = self.head_block_id.write();
*head_block_id = None;
}

// tell the block subscriber that we don't have any blocks
if let Some(block_sender) = &block_sender {
block_sender
.send_async((None, self.clone()))
.await
.context("block_sender during reconnect clear")?;
}

// TODO: if this fails, keep retrying! otherwise it crashes and doesn't try again!
let new_provider = Web3Provider::from_str(&self.url, http_client)
.await
.unwrap();

*provider = Some(Arc::new(new_provider));
let mut head_block_id = self.head_block_id.write();
*head_block_id = None;
}

// tell the block subscriber that we don't have any blocks
if let Some(block_sender) = block_sender {
if let Some(block_sender) = &block_sender {
block_sender
.send_async((None, self.clone()))
.await
.context("block_sender during reconnect")?;
.context("block_sender during reconnect clear")?;
}

// TODO: if this fails, keep retrying! otherwise it crashes and doesn't try again!
let new_provider = Web3Provider::from_str(&self.url, http_client).await?;

*provider = Some(Arc::new(new_provider));

info!(rpc=%self, "successfully reconnected");

Ok(())
}
@@ -399,6 +429,7 @@ impl Web3Connection {
Ok(())
}

/// subscribe to blocks and transactions with automatic reconnects
#[instrument(skip_all)]
async fn subscribe(
self: Arc<Self>,
@@ -459,21 +490,19 @@ impl Web3Connection {
}

match try_join_all(futures).await {
Ok(_) => break,
Ok(_) => {
// futures all exited without error. break instead of restarting subscriptions
break;
}
Err(err) => {
if reconnect {
// TODO: exponential backoff
let retry_in = Duration::from_millis(50);
warn!(
rpc=%self,
?err,
?retry_in,
"subscription exited",
);
sleep(retry_in).await;

// TODO: loop on reconnecting! do not return with a "?" here
self.reconnect(block_sender.clone()).await?;
self.retrying_reconnect(block_sender.as_ref()).await?;
} else {
error!(rpc=%self, ?err, "subscription exited");
return Err(err);