This commit is contained in:
Bryan Stitt 2022-05-18 20:28:00 +00:00
parent 7a3a3271bb
commit bc706d1c98
4 changed files with 26 additions and 30 deletions

View File

@ -13,15 +13,15 @@ use crate::Web3ProxyApp;
pub struct CliConfig { pub struct CliConfig {
/// what port the proxy should listen on /// what port the proxy should listen on
#[argh(option, default = "8544")] #[argh(option, default = "8544")]
pub listen_port: u16, pub port: u16,
/// number of worker threads /// number of worker threads
#[argh(option, default = "0")] #[argh(option, default = "0")]
pub worker_threads: usize, pub workers: usize,
/// path to a toml of rpc servers /// path to a toml of rpc servers
#[argh(option, default = "\"./config/example.toml\".to_string()")] #[argh(option, default = "\"./config/example.toml\".to_string()")]
pub rpc_config_path: String, pub config: String,
} }
#[derive(Deserialize)] #[derive(Deserialize)]

View File

@ -195,11 +195,6 @@ impl Web3Connection {
self.soft_limit self.soft_limit
} }
#[inline]
pub fn url(&self) -> &str {
&self.url
}
#[instrument(skip_all)] #[instrument(skip_all)]
async fn send_block( async fn send_block(
self: &Arc<Self>, self: &Arc<Self>,

View File

@ -7,6 +7,7 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt; use futures::StreamExt;
use governor::clock::{QuantaClock, QuantaInstant}; use governor::clock::{QuantaClock, QuantaInstant};
use governor::NotUntil; use governor::NotUntil;
use hashbrown::HashMap;
use serde_json::value::RawValue; use serde_json::value::RawValue;
use std::cmp; use std::cmp;
use std::fmt; use std::fmt;
@ -219,11 +220,10 @@ impl Web3Connections {
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let max_connections = self.inner.len(); let max_connections = self.inner.len();
let mut connection_states: Vec<(u64, H256)> = Vec::with_capacity(max_connections); let mut connection_states: HashMap<usize, (u64, H256)> =
let mut head_block_hash = H256::zero(); HashMap::with_capacity(max_connections);
let mut head_block_num = 0u64;
let mut synced_connections = SyncedConnections::new(max_connections); let mut pending_synced_connections = SyncedConnections::new(max_connections);
while let Ok((new_block_num, new_block_hash, rpc_id)) = block_receiver.recv_async().await { while let Ok((new_block_num, new_block_hash, rpc_id)) = block_receiver.recv_async().await {
if new_block_num == 0 { if new_block_num == 0 {
@ -238,37 +238,38 @@ impl Web3Connections {
connection_states.insert(rpc_id, (new_block_num, new_block_hash)); connection_states.insert(rpc_id, (new_block_num, new_block_hash));
// TODO: do something to update the synced blocks // TODO: do something to update the synced blocks
match new_block_num.cmp(&head_block_num) { match new_block_num.cmp(&pending_synced_connections.head_block_num) {
cmp::Ordering::Greater => { cmp::Ordering::Greater => {
// the rpc's newest block is the new overall best block // the rpc's newest block is the new overall best block
info!("new head from #{}", rpc_id); info!("new head from #{}", rpc_id);
synced_connections.inner.clear(); pending_synced_connections.inner.clear();
synced_connections.inner.push(rpc_id); pending_synced_connections.inner.push(rpc_id);
head_block_num = new_block_num; pending_synced_connections.head_block_num = new_block_num;
head_block_hash = new_block_hash; pending_synced_connections.head_block_hash = new_block_hash;
} }
cmp::Ordering::Equal => { cmp::Ordering::Equal => {
if new_block_hash != head_block_hash { if new_block_hash != pending_synced_connections.head_block_hash {
// same height, but different chain // same height, but different chain
// TODO: anything else we should do? set some "nextSafeBlockHeight" to delay sending transactions? // TODO: anything else we should do? set some "nextSafeBlockHeight" to delay sending transactions?
// TODO: sometimes a node changes its block. if that happens, a new block is probably right behind this one // TODO: sometimes a node changes its block. if that happens, a new block is probably right behind this one
warn!( warn!(
"chain is forked at #{}! {} has {}. {} rpcs have {}", "chain is forked at #{}! #{} has {}. {} rpcs have {}",
new_block_num, new_block_num,
rpc_id, rpc_id,
new_block_hash, new_block_hash,
synced_connections.inner.len(), pending_synced_connections.inner.len(),
head_block_hash pending_synced_connections.head_block_hash
); );
// TODO: don't continue. check to see which head block is more popular! // TODO: don't continue. check connection_states to see which head block is more popular!
continue; continue;
} }
// do not clear synced_connections. // do not clear synced_connections.
// we just want to add this rpc to the end // we just want to add this rpc to the end
synced_connections.inner.push(rpc_id); // TODO: HashSet here? i think we get dupes if we don't
pending_synced_connections.inner.push(rpc_id);
} }
cmp::Ordering::Less => { cmp::Ordering::Less => {
// this isn't the best block in the tier. don't do anything // this isn't the best block in the tier. don't do anything
@ -277,11 +278,11 @@ impl Web3Connections {
} }
// the synced connections have changed // the synced connections have changed
let new_data = Arc::new(synced_connections.clone()); let synced_connections = Arc::new(pending_synced_connections.clone());
// TODO: only do this if there are 2 nodes synced to this block? // TODO: only do this if there are 2 nodes synced to this block?
// do the arcswap // do the arcswap
self.synced_connections.swap(new_data); self.synced_connections.swap(synced_connections);
} }
// TODO: if there was an error, we should return it // TODO: if there was an error, we should return it

View File

@ -34,8 +34,8 @@ fn main() -> anyhow::Result<()> {
let cli_config: CliConfig = argh::from_env(); let cli_config: CliConfig = argh::from_env();
info!("Loading rpc config @ {}", cli_config.rpc_config_path); info!("Loading rpc config @ {}", cli_config.config);
let rpc_config: String = fs::read_to_string(cli_config.rpc_config_path)?; let rpc_config: String = fs::read_to_string(cli_config.config)?;
let rpc_config: RpcConfig = toml::from_str(&rpc_config)?; let rpc_config: RpcConfig = toml::from_str(&rpc_config)?;
// TODO: this doesn't seem to do anything // TODO: this doesn't seem to do anything
@ -54,8 +54,8 @@ fn main() -> anyhow::Result<()> {
format!("web3-{}-{}", chain_id, worker_id) format!("web3-{}-{}", chain_id, worker_id)
}); });
if cli_config.worker_threads > 0 { if cli_config.workers > 0 {
rt_builder.worker_threads(cli_config.worker_threads); rt_builder.worker_threads(cli_config.workers);
} }
let rt = rt_builder.build()?; let rt = rt_builder.build()?;
@ -80,7 +80,7 @@ fn main() -> anyhow::Result<()> {
// spawn the root task // spawn the root task
rt.block_on(async { rt.block_on(async {
let listen_port = cli_config.listen_port; let listen_port = cli_config.port;
let app = rpc_config.try_build().await?; let app = rpc_config.try_build().await?;