Bryan Stitt 2023-02-25 09:48:40 -08:00
parent 18f644e8fe
commit c9e5661c5b
3 changed files with 264 additions and 242 deletions

@@ -496,6 +496,7 @@ These are not yet ordered. There might be duplicates. We might not actually need
These are not ordered. I think some rows also accidentally got deleted here. Check git history.
- [ ] fewer Arcs (and more Pin?). we use Arc on a lot of things where i think a plain &self would work fine.
- [ ] automatically tune database and redis connection pool size
- [ ] if db is down, keep logins cached longer. at least only new logins will have trouble then
- [ ] handle user payments

@@ -354,8 +354,6 @@ pub async fn get_migrated_db(
pub struct Web3ProxyAppSpawn {
/// the app. probably clone this to use in other groups of handles
pub app: Arc<Web3ProxyApp>,
// cancellable handles
pub app_handles: FuturesUnordered<AnyhowJoinHandle<()>>,
/// these are important and must be allowed to finish
pub background_handles: FuturesUnordered<AnyhowJoinHandle<()>>,
}
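The doc comment above says the background handles "must be allowed to finish". A minimal sketch of how a caller might drain them, assuming AnyhowJoinHandle<()> is a tokio JoinHandle over anyhow::Result<()> (that alias is not shown in this diff):

use futures::StreamExt;

// hedged sketch, not part of this commit: drain the background handles so the
// important tasks run to completion before the process exits.
async fn wait_for_background(mut spawn: Web3ProxyAppSpawn) -> anyhow::Result<()> {
    while let Some(joined) = spawn.background_handles.next().await {
        // outer Result is the JoinError from tokio, inner one is the task's own error
        joined??;
    }
    Ok(())
}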
@@ -367,27 +365,8 @@ impl Web3ProxyApp {
num_workers: usize,
shutdown_receiver: broadcast::Receiver<()>,
) -> anyhow::Result<Web3ProxyAppSpawn> {
// safety checks on the config
if let Some(redirect) = &top_config.app.redirect_rpc_key_url {
assert!(
redirect.contains("{{rpc_key_id}}"),
"redirect_rpc_key_url user url must contain \"{{rpc_key_id}}\""
);
}
if !top_config.extra.is_empty() {
warn!(
"unknown TopConfig fields!: {:?}",
top_config.extra.keys()
);
}
if !top_config.app.extra.is_empty() {
warn!(
"unknown Web3ProxyAppConfig fields!: {:?}",
top_config.app.extra.keys()
);
}
// we must wait for these to end on their own (and they need to subscribe to shutdown_sender)
let important_background_handles = FuturesUnordered::new();
let mut db_conn = None::<DatabaseConnection>;
let mut db_replica = None::<DatabaseReplica>;
@@ -447,46 +426,7 @@ impl Web3ProxyApp {
warn!("no database. some features will be disabled");
};
let balanced_rpcs = top_config.balanced_rpcs;
// safety check on balanced_rpcs
if balanced_rpcs.len() < top_config.app.min_synced_rpcs {
return Err(anyhow::anyhow!(
"Only {}/{} rpcs! Add more balanced_rpcs or reduce min_synced_rpcs.",
balanced_rpcs.len(),
top_config.app.min_synced_rpcs
));
}
// safety check on sum soft limit
let sum_soft_limit = balanced_rpcs.values().fold(0, |acc, x| acc + x.soft_limit);
if sum_soft_limit < top_config.app.min_sum_soft_limit {
return Err(anyhow::anyhow!(
"Only {}/{} soft limit! Add more balanced_rpcs, increase soft limits, or reduce min_sum_soft_limit.",
sum_soft_limit,
top_config.app.min_sum_soft_limit
));
}
let private_rpcs = top_config.private_rpcs.unwrap_or_default();
// these are safe to cancel
let cancellable_handles = FuturesUnordered::new();
// we must wait for these to end on their own (and they need to subscribe to shutdown_sender)
let important_background_handles = FuturesUnordered::new();
// make a http shared client
// TODO: can we configure the connection pool? should we?
// TODO: timeouts from config. defaults are hopefully good
let http_client = Some(
reqwest::ClientBuilder::new()
.connect_timeout(Duration::from_secs(5))
.timeout(Duration::from_secs(5 * 60))
.user_agent(APP_USER_AGENT)
.build()?,
);
// TODO: do this during apply_config so that we can change redis url while running
// create a connection pool for redis
// a failure to connect does NOT block the application from starting
let vredis_pool = match top_config.app.volatile_redis_url.as_ref() {
@@ -540,6 +480,17 @@ impl Web3ProxyApp {
None
};
// make a http shared client
// TODO: can we configure the connection pool? should we?
// TODO: timeouts from config. defaults are hopefully good
let http_client = Some(
reqwest::ClientBuilder::new()
.connect_timeout(Duration::from_secs(5))
.timeout(Duration::from_secs(5 * 60))
.user_agent(APP_USER_AGENT)
.build()?,
);
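On the "can we configure the connection pool?" TODO above: reqwest's ClientBuilder does expose pool settings, so a variant of the block above could look like this (the pool values are illustrative, not from the commit):

let http_client = Some(
    reqwest::ClientBuilder::new()
        .connect_timeout(Duration::from_secs(5))
        .timeout(Duration::from_secs(5 * 60))
        // illustrative pool tuning; not part of this commit
        .pool_idle_timeout(Duration::from_secs(90))
        .pool_max_idle_per_host(16)
        .user_agent(APP_USER_AGENT)
        .build()?,
);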
// TODO: i don't like doing Block::default here! Change this to "None"?
let (watch_consensus_head_sender, watch_consensus_head_receiver) = watch::channel(None);
// TODO: will one receiver lagging be okay? how big should this be?
@@ -575,110 +526,6 @@ impl Web3ProxyApp {
.time_to_idle(Duration::from_secs(300))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
// connect to the load balanced rpcs
let (balanced_rpcs, balanced_handle) = Web3Rpcs::spawn(
block_map.clone(),
top_config.app.chain_id,
db_conn.clone(),
http_client.clone(),
top_config.app.max_block_age,
top_config.app.max_block_lag,
top_config.app.min_synced_rpcs,
top_config.app.min_sum_soft_limit,
pending_transactions.clone(),
Some(pending_tx_sender.clone()),
vredis_pool.clone(),
balanced_rpcs,
Some(watch_consensus_head_sender),
)
.await
.context("spawning balanced rpcs")?;
// save the handle to catch any errors
cancellable_handles.push(balanced_handle);
// connect to the private rpcs
// only some chains have this, so this is optional
let private_rpcs = if private_rpcs.is_empty() {
// TODO: do None instead of clone?
warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
None
} else {
let (private_rpcs, private_handle) = Web3Rpcs::spawn(
block_map,
top_config.app.chain_id,
db_conn.clone(),
http_client.clone(),
// private rpcs don't get subscriptions, so no need for max_block_age or max_block_lag
None,
None,
0,
0,
pending_transactions.clone(),
// TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits, but they should have
None,
vredis_pool.clone(),
private_rpcs,
// subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs
// they also often have low rate limits
// however, they are well connected to miners/validators. so maybe using them as a safety check would be good
// TODO: but maybe we could include privates in the "backup" tier
None,
)
.await
.context("spawning private_rpcs")?;
if private_rpcs.by_name.is_empty() {
None
} else {
// save the handle to catch any errors
cancellable_handles.push(private_handle);
Some(private_rpcs)
}
};
// create rate limiters
// these are optional. they require redis
let mut frontend_ip_rate_limiter = None;
let mut frontend_registered_user_rate_limiter = None;
let mut login_rate_limiter = None;
if let Some(redis_pool) = vredis_pool.as_ref() {
if let Some(public_requests_per_period) = top_config.app.public_requests_per_period {
// chain id is included in the app name so that rpc rate limits are per-chain
let rpc_rrl = RedisRateLimiter::new(
&format!("web3_proxy:{}", top_config.app.chain_id),
"frontend",
public_requests_per_period,
60.0,
redis_pool.clone(),
);
// these two rate limiters can share the base limiter
// these are deferred rate limiters because we don't want redis network requests on the hot path
// TODO: take cache_size from config
frontend_ip_rate_limiter = Some(DeferredRateLimiter::<IpAddr>::new(
10_000,
"ip",
rpc_rrl.clone(),
None,
));
frontend_registered_user_rate_limiter = Some(DeferredRateLimiter::<u64>::new(
10_000, "key", rpc_rrl, None,
));
}
// login rate limiter
login_rate_limiter = Some(RedisRateLimiter::new(
"web3_proxy",
"login",
top_config.app.login_rate_limit_per_period,
60.0,
redis_pool.clone(),
));
}
// responses can be very different in size, so this is a cache with a max capacity and a weigher
// TODO: don't allow any response to be bigger than X% of the cache
let response_cache = Cache::builder()
@@ -720,6 +567,68 @@ impl Web3ProxyApp {
.time_to_idle(Duration::from_secs(120))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
// set up a Web3Rpcs object to hold all our connections
// TODO: for now, only the server_configs can change
let (balanced_rpcs, balanced_handle) = Web3Rpcs::spawn(
block_map.clone(),
top_config.app.chain_id,
db_conn.clone(),
http_client.clone(),
top_config.app.max_block_age,
top_config.app.max_block_lag,
top_config.app.min_synced_rpcs,
top_config.app.min_sum_soft_limit,
pending_transactions.clone(),
Some(pending_tx_sender.clone()),
vredis_pool.clone(),
Some(watch_consensus_head_sender),
)
.await
.context("spawning balanced rpcs")?;
// connect to the load balanced rpcs
balanced_rpcs.apply_server_configs(top_config.balanced_rpcs);
// connect to the private rpcs
// only some chains have this, so this is optional
let private_rpc_configs = top_config.private_rpcs.unwrap_or_default();
let private_rpcs = if private_rpc_configs.is_empty() {
warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
None
} else {
// TODO: do something with the spawn handle
let (private_rpcs, _) = Web3Rpcs::spawn(
block_map,
top_config.app.chain_id,
db_conn.clone(),
http_client.clone(),
// private rpcs don't get subscriptions, so no need for max_block_age or max_block_lag
None,
None,
0,
0,
pending_transactions.clone(),
// TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits, but they should have
None,
vredis_pool.clone(),
// subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs
// they also often have low rate limits
// however, they are well connected to miners/validators. so maybe using them as a safety check would be good
// TODO: but maybe we could include privates in the "backup" tier
None,
)
.await
.context("spawning private_rpcs")?;
private_rpcs.apply_server_configs(private_rpc_configs);
if private_rpcs.by_name.is_empty() {
None
} else {
Some(private_rpcs)
}
};
let app = Self {
config: top_config.app,
balanced_rpcs,
@@ -743,7 +652,106 @@ impl Web3ProxyApp {
let app = Arc::new(app);
Ok((app, cancellable_handles, important_background_handles).into())
app.apply_config(top_config).await?;
// TODO: use channel for receiving new top_configs
// TODO: return a channel for sending new top_configs
Ok((app, important_background_handles).into())
}
/// update the app's balanced_rpcs and private_rpcs
/// TODO: make more of the app mutable. for now, db and
pub async fn apply_server_configs(
self: Arc<Self>,
top_config: TopConfig,
) -> anyhow::Result<()> {
// safety checks on the config
if let Some(redirect) = &top_config.app.redirect_rpc_key_url {
assert!(
redirect.contains("{{rpc_key_id}}"),
"redirect_rpc_key_url user url must contain \"{{rpc_key_id}}\""
);
}
if !top_config.extra.is_empty() {
warn!(
"unknown TopConfig fields!: {:?}",
top_config.extra.keys()
);
}
if !top_config.app.extra.is_empty() {
warn!(
"unknown Web3ProxyAppConfig fields!: {:?}",
top_config.app.extra.keys()
);
}
let balanced_rpcs = top_config.balanced_rpcs;
// safety check on balanced_rpcs
if balanced_rpcs.len() < top_config.app.min_synced_rpcs {
return Err(anyhow::anyhow!(
"Only {}/{} rpcs! Add more balanced_rpcs or reduce min_synced_rpcs.",
balanced_rpcs.len(),
top_config.app.min_synced_rpcs
));
}
// safety check on sum soft limit
let sum_soft_limit = balanced_rpcs.values().fold(0, |acc, x| acc + x.soft_limit);
if sum_soft_limit < top_config.app.min_sum_soft_limit {
return Err(anyhow::anyhow!(
"Only {}/{} soft limit! Add more balanced_rpcs, increase soft limits, or reduce min_sum_soft_limit.",
sum_soft_limit,
top_config.app.min_sum_soft_limit
));
}
// create rate limiters
// these are optional. they require redis
let mut frontend_ip_rate_limiter = None;
let mut frontend_registered_user_rate_limiter = None;
let mut login_rate_limiter = None;
if let Some(redis_pool) = vredis_pool.as_ref() {
if let Some(public_requests_per_period) = top_config.app.public_requests_per_period {
// chain id is included in the app name so that rpc rate limits are per-chain
let rpc_rrl = RedisRateLimiter::new(
&format!("web3_proxy:{}", top_config.app.chain_id),
"frontend",
public_requests_per_period,
60.0,
redis_pool.clone(),
);
// these two rate limiters can share the base limiter
// these are deferred rate limiters because we don't want redis network requests on the hot path
// TODO: take cache_size from config
frontend_ip_rate_limiter = Some(DeferredRateLimiter::<IpAddr>::new(
10_000,
"ip",
rpc_rrl.clone(),
None,
));
frontend_registered_user_rate_limiter = Some(DeferredRateLimiter::<u64>::new(
10_000, "key", rpc_rrl, None,
));
}
// login rate limiter
login_rate_limiter = Some(RedisRateLimiter::new(
"web3_proxy",
"login",
top_config.app.login_rate_limit_per_period,
60.0,
redis_pool.clone(),
));
}
Ok(())
}
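The rate limiters above are "deferred" so that the request hot path never waits on a redis round trip. A rough sketch of that idea, using hypothetical types rather than the DeferredRateLimiter crate's actual API:

// hedged sketch, not the crate's implementation: answer from a local counter
// immediately and reconcile with redis off the hot path.
struct LocalWindow {
    used: u64,
    allowed: u64,
}

impl LocalWindow {
    fn try_request(&mut self) -> bool {
        if self.used < self.allowed {
            self.used += 1;
            true // decided without any network round trip
        } else {
            false
        }
    }
}
// a background task would periodically push `used` to redis and refresh
// `allowed` from the shared budget, keeping the limit approximately global.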
pub fn head_block_receiver(&self) -> watch::Receiver<Option<Web3ProxyBlock>> {

@@ -31,7 +31,7 @@ use std::sync::atomic::{self, Ordering};
use std::sync::Arc;
use std::{cmp, fmt};
use thread_fast_rng::rand::seq::SliceRandom;
use tokio::sync::{broadcast, watch};
use tokio::sync::{broadcast, watch, RwLock as AsyncRwLock};
use tokio::task;
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
@@ -62,6 +62,91 @@ pub struct Web3Rpcs {
}
impl Web3Rpcs {
pub async fn min_head_rpcs(&self) -> usize {
self.min_head_rpcs
}
pub async fn apply_server_configs(
&self,
server_configs: HashMap<String, Web3RpcConfig>,
) -> anyhow::Result<()> {
// turn configs into connections (in parallel)
// TODO: move this into a helper function. then we can use it when configs change (will need a remove function too)
let mut spawn_handles: FuturesUnordered<_> = server_configs
.into_iter()
.filter_map(|(server_name, server_config)| {
if server_config.disabled {
info!("{} is disabled", server_name);
return None;
}
let db_conn = db_conn.clone();
let http_client = http_client.clone();
let redis_pool = redis_pool.clone();
let http_interval_sender = http_interval_sender.clone();
let block_sender = if watch_consensus_head_sender.is_some() {
Some(block_sender.clone())
} else {
None
};
let pending_tx_id_sender = Some(pending_tx_id_sender.clone());
let block_map = block_map.clone();
debug!("spawning {}", server_name);
let handle = tokio::spawn(async move {
server_config
.spawn(
server_name,
db_conn,
redis_pool,
chain_id,
http_client,
http_interval_sender,
block_map,
block_sender,
pending_tx_id_sender,
true,
)
.await
});
Some(handle)
})
.collect();
// map of connection names to their connection
let mut connections = AsyncRwLock::new(HashMap::new());
let mut handles = vec![];
while let Some(x) = spawn_handles.next().await {
match x {
Ok(Ok((connection, _handle))) => {
// web3 connection worked
connections
.write()
.await
.insert(connection.name.clone(), connection);
// TODO: what should we do with the handle? at least log any errors
}
Ok(Err(err)) => {
// if we got an error here, the app can continue on
// TODO: include context about which connection failed
error!("Unable to create connection. err={:?}", err);
}
Err(err) => {
// something actually bad happened. exit with an error
return Err(err.into());
}
}
}
Ok(())
}
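apply_server_configs spawns every server config and then drains the results in completion order. A minimal, generic illustration of that FuturesUnordered pattern (the task bodies here are placeholders, not the app's spawn logic):

use futures::stream::{FuturesUnordered, StreamExt};

// minimal illustration, not from this commit: spawn everything first, then
// handle each result as it completes, tolerating per-task errors.
async fn spawn_and_drain() -> anyhow::Result<()> {
    let mut handles: FuturesUnordered<_> = (0u32..4)
        .map(|i| tokio::spawn(async move { anyhow::Ok(i * 2) }))
        .collect();

    while let Some(joined) = handles.next().await {
        match joined {
            Ok(Ok(value)) => println!("task finished with {value}"),
            // a failed task is logged and the loop keeps going, like the error!() arm above
            Ok(Err(err)) => eprintln!("task failed: {err:?}"),
            // a panic or cancellation is treated as fatal, like the Err(err) arm above
            Err(join_err) => return Err(join_err.into()),
        }
    }
    Ok(())
}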
/// Spawn durable connections to multiple Web3 providers.
#[allow(clippy::too_many_arguments)]
pub async fn spawn(
@@ -76,13 +161,12 @@ impl Web3Rpcs {
pending_transactions: Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
redis_pool: Option<redis_rate_limiter::RedisPool>,
server_configs: HashMap<String, Web3RpcConfig>,
watch_consensus_head_sender: Option<watch::Sender<Option<Web3ProxyBlock>>>,
) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (block_sender, block_receiver) = flume::unbounded::<BlockAndRpc>();
// TODO: query the rpc to get the actual expected block time, or get from config?
// TODO: query the rpc to get the actual expected block time, or get from config? maybe have this be part of a health check?
let expected_block_time_ms = match chain_id {
// ethereum
1 => 12_000,
@@ -133,77 +217,6 @@ impl Web3Rpcs {
None
};
// turn configs into connections (in parallel)
// TODO: move this into a helper function. then we can use it when configs change (will need a remove function too)
let mut spawn_handles: FuturesUnordered<_> = server_configs
.into_iter()
.filter_map(|(server_name, server_config)| {
if server_config.disabled {
info!("{} is disabled", server_name);
return None;
}
let db_conn = db_conn.clone();
let http_client = http_client.clone();
let redis_pool = redis_pool.clone();
let http_interval_sender = http_interval_sender.clone();
let block_sender = if watch_consensus_head_sender.is_some() {
Some(block_sender.clone())
} else {
None
};
let pending_tx_id_sender = Some(pending_tx_id_sender.clone());
let block_map = block_map.clone();
debug!("spawning {}", server_name);
let handle = tokio::spawn(async move {
server_config
.spawn(
server_name,
db_conn,
redis_pool,
chain_id,
http_client,
http_interval_sender,
block_map,
block_sender,
pending_tx_id_sender,
true,
)
.await
});
Some(handle)
})
.collect();
// map of connection names to their connection
let mut connections = HashMap::new();
let mut handles = vec![];
// TODO: futures unordered?
while let Some(x) = spawn_handles.next().await {
match x {
Ok(Ok((connection, handle))) => {
// web3 connection worked
connections.insert(connection.name.clone(), connection);
handles.push(handle);
}
Ok(Err(err)) => {
// if we got an error here, the app can continue on
// TODO: include context about which connection failed
error!("Unable to create connection. err={:?}", err);
}
Err(err) => {
// something actually bad happened. exit with an error
return Err(err.into());
}
}
}
// TODO: max_capacity and time_to_idle from config
// all block hashes are the same size, so no need for weigher
let block_hashes = Cache::builder()