add minimal config for quick testing

This commit is contained in:
Bryan Stitt 2023-02-06 15:20:36 -08:00
parent 0edd11349f
commit 0b1929a820
3 changed files with 66 additions and 34 deletions

32
config/minimal.toml Normal file
View File

@ -0,0 +1,32 @@
[app]
chain_id = 1
# no database
# no influxdb
# no redis
# no sentry
# no public limits means anon gets full access
# no thundering herd protection
min_sum_soft_limit = 1
min_synced_rpcs = 1
# 1GB of cache
response_cache_max_bytes = 1_000_000_000
[balanced_rpcs]
[balanced_rpcs.llama_public_wss]
# TODO: what should we do if all rpcs are disabled? warn and wait for a config change?
disabled = false
display_name = "LlamaNodes WSS"
url = "wss://eth.llamarpc.com/"
soft_limit = 1_000
tier = 0
[balanced_rpcs.llama_public_https]
disabled = false
display_name = "LlamaNodes HTTPS"
url = "https://eth.llamarpc.com/"
soft_limit = 1_000
tier = 0

View File

@ -288,29 +288,34 @@ impl Web3Rpcs {
// TODO: this will grow unbounded. prune old heads on this at the same time we prune the graph? // TODO: this will grow unbounded. prune old heads on this at the same time we prune the graph?
let mut connection_heads = ConsensusFinder::default(); let mut connection_heads = ConsensusFinder::default();
while let Ok((new_block, rpc)) = block_receiver.recv_async().await { loop {
let new_block = new_block.map(Into::into); match block_receiver.recv_async().await {
Ok((new_block, rpc)) => {
let new_block = new_block.map(Into::into);
let rpc_name = rpc.name.clone(); let rpc_name = rpc.name.clone();
if let Err(err) = self if let Err(err) = self
.process_block_from_rpc( .process_block_from_rpc(
authorization, authorization,
&mut connection_heads, &mut connection_heads,
new_block, new_block,
rpc, rpc,
&head_block_sender, &head_block_sender,
&pending_tx_sender, &pending_tx_sender,
) )
.await .await
{ {
warn!("unable to process block from rpc {}: {:?}", rpc_name, err); warn!("unable to process block from rpc {}: {:?}", rpc_name, err);
}
}
Err(err) => {
warn!("block_receiver exited! {:#?}", err);
return Err(err.into());
}
} }
} }
// TODO: if there was an error, should we return it instead of an Ok?
warn!("block_receiver exited!");
Ok(()) Ok(())
} }
@ -674,6 +679,7 @@ impl ConnectionsGroup {
} else { } else {
// i don't think this is an error. i think its just if a reconnect is currently happening // i don't think this is an error. i think its just if a reconnect is currently happening
warn!("connection missing: {}", rpc_name); warn!("connection missing: {}", rpc_name);
debug!("web3_rpcs.conns: {:#?}", web3_rpcs.conns);
} }
} }

View File

@ -12,7 +12,7 @@ use crate::rpcs::transactions::TxStatus;
use counter::Counter; use counter::Counter;
use derive_more::From; use derive_more::From;
use ethers::prelude::{ProviderError, TxHash, H256, U64}; use ethers::prelude::{ProviderError, TxHash, H256, U64};
use futures::future::try_join_all; use futures::future::{join_all, try_join_all};
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures::StreamExt; use futures::StreamExt;
use hashbrown::{HashMap, HashSet}; use hashbrown::{HashMap, HashSet};
@ -89,9 +89,7 @@ impl Web3Rpcs {
}; };
let http_interval_sender = if http_client.is_some() { let http_interval_sender = if http_client.is_some() {
let (sender, receiver) = broadcast::channel(1); let (sender, _) = broadcast::channel(1);
drop(receiver);
// TODO: what interval? follow a websocket also? maybe by watching synced connections with a timeout. will need debounce // TODO: what interval? follow a websocket also? maybe by watching synced connections with a timeout. will need debounce
let mut interval = interval(Duration::from_millis(expected_block_time_ms / 2)); let mut interval = interval(Duration::from_millis(expected_block_time_ms / 2));
@ -109,14 +107,10 @@ impl Web3Rpcs {
// trace!("http interval ready"); // trace!("http interval ready");
if let Err(_) = sender.send(()) { if let Err(_) = sender.send(()) {
// errors are okay. they mean that all receivers have been dropped. that is normal if there are only websocket backend rpcs // errors are okay. they mean that all receivers have been dropped, or the rpcs just haven't started yet
// TODO: if error, await a `std::future::pending::<()>`? whats the right thing here? trace!("no http receivers");
info!("all http receivers have been dropped");
break;
}; };
} }
info!("http interval sender exited");
} }
}; };
@ -130,11 +124,11 @@ impl Web3Rpcs {
// turn configs into connections (in parallel) // turn configs into connections (in parallel)
// TODO: move this into a helper function. then we can use it when configs change (will need a remove function too) // TODO: move this into a helper function. then we can use it when configs change (will need a remove function too)
// TODO: futures unordered? let mut spawn_handles: Vec<_> = server_configs
let mut spawn_handles: FuturesUnordered<_> = server_configs
.into_iter() .into_iter()
.filter_map(|(server_name, server_config)| { .filter_map(|(server_name, server_config)| {
if server_config.disabled { if server_config.disabled {
info!("{} is disabled", server_name);
return None; return None;
} }
@ -152,6 +146,8 @@ impl Web3Rpcs {
let pending_tx_id_sender = Some(pending_tx_id_sender.clone()); let pending_tx_id_sender = Some(pending_tx_id_sender.clone());
let block_map = block_map.clone(); let block_map = block_map.clone();
debug!("spawning {}", server_name);
let handle = tokio::spawn(async move { let handle = tokio::spawn(async move {
server_config server_config
.spawn( .spawn(
@ -176,8 +172,8 @@ impl Web3Rpcs {
let mut connections = HashMap::new(); let mut connections = HashMap::new();
let mut handles = vec![]; let mut handles = vec![];
for x in spawn_handles.next().await { // TODO: futures unordered?
// TODO: how should we handle errors here? one rpc being down shouldn't cause the program to exit for x in join_all(spawn_handles).await {
match x { match x {
Ok(Ok((connection, handle))) => { Ok(Ok((connection, handle))) => {
// web3 connection worked // web3 connection worked
@ -230,7 +226,6 @@ impl Web3Rpcs {
let connections = connections.clone(); let connections = connections.clone();
tokio::spawn(async move { tokio::spawn(async move {
// TODO: try_join_all with the other handles here
connections connections
.subscribe( .subscribe(
authorization, authorization,
@ -328,7 +323,6 @@ impl Web3Rpcs {
} }
info!("subscriptions over: {:?}", self); info!("subscriptions over: {:?}", self);
Ok(()) Ok(())
} }