diff --git a/web3-proxy/src/config.rs b/web3-proxy/src/config.rs index 1b2d9bd5..9ad28941 100644 --- a/web3-proxy/src/config.rs +++ b/web3-proxy/src/config.rs @@ -13,15 +13,15 @@ use crate::Web3ProxyApp; pub struct CliConfig { /// what port the proxy should listen on #[argh(option, default = "8544")] - pub listen_port: u16, + pub port: u16, /// number of worker threads #[argh(option, default = "0")] - pub worker_threads: usize, + pub workers: usize, /// path to a toml of rpc servers #[argh(option, default = "\"./config/example.toml\".to_string()")] - pub rpc_config_path: String, + pub config: String, } #[derive(Deserialize)] diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index 7a947e3d..79fc6e60 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -195,11 +195,6 @@ impl Web3Connection { self.soft_limit } - #[inline] - pub fn url(&self) -> &str { - &self.url - } - #[instrument(skip_all)] async fn send_block( self: &Arc, diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index 8725552b..b2ce99a5 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -7,6 +7,7 @@ use futures::stream::FuturesUnordered; use futures::StreamExt; use governor::clock::{QuantaClock, QuantaInstant}; use governor::NotUntil; +use hashbrown::HashMap; use serde_json::value::RawValue; use std::cmp; use std::fmt; @@ -219,11 +220,10 @@ impl Web3Connections { ) -> anyhow::Result<()> { let max_connections = self.inner.len(); - let mut connection_states: Vec<(u64, H256)> = Vec::with_capacity(max_connections); - let mut head_block_hash = H256::zero(); - let mut head_block_num = 0u64; + let mut connection_states: HashMap = + HashMap::with_capacity(max_connections); - let mut synced_connections = SyncedConnections::new(max_connections); + let mut pending_synced_connections = SyncedConnections::new(max_connections); while let Ok((new_block_num, new_block_hash, rpc_id)) = block_receiver.recv_async().await { if new_block_num == 0 { @@ -238,37 +238,38 @@ impl Web3Connections { connection_states.insert(rpc_id, (new_block_num, new_block_hash)); // TODO: do something to update the synced blocks - match new_block_num.cmp(&head_block_num) { + match new_block_num.cmp(&pending_synced_connections.head_block_num) { cmp::Ordering::Greater => { // the rpc's newest block is the new overall best block info!("new head from #{}", rpc_id); - synced_connections.inner.clear(); - synced_connections.inner.push(rpc_id); + pending_synced_connections.inner.clear(); + pending_synced_connections.inner.push(rpc_id); - head_block_num = new_block_num; - head_block_hash = new_block_hash; + pending_synced_connections.head_block_num = new_block_num; + pending_synced_connections.head_block_hash = new_block_hash; } cmp::Ordering::Equal => { - if new_block_hash != head_block_hash { + if new_block_hash != pending_synced_connections.head_block_hash { // same height, but different chain // TODO: anything else we should do? set some "nextSafeBlockHeight" to delay sending transactions? // TODO: sometimes a node changes its block. if that happens, a new block is probably right behind this one warn!( - "chain is forked at #{}! {} has {}. {} rpcs have {}", + "chain is forked at #{}! #{} has {}. {} rpcs have {}", new_block_num, rpc_id, new_block_hash, - synced_connections.inner.len(), - head_block_hash + pending_synced_connections.inner.len(), + pending_synced_connections.head_block_hash ); - // TODO: don't continue. check to see which head block is more popular! + // TODO: don't continue. check connection_states to see which head block is more popular! continue; } // do not clear synced_connections. // we just want to add this rpc to the end - synced_connections.inner.push(rpc_id); + // TODO: HashSet here? i think we get dupes if we don't + pending_synced_connections.inner.push(rpc_id); } cmp::Ordering::Less => { // this isn't the best block in the tier. don't do anything @@ -277,11 +278,11 @@ impl Web3Connections { } // the synced connections have changed - let new_data = Arc::new(synced_connections.clone()); + let synced_connections = Arc::new(pending_synced_connections.clone()); // TODO: only do this if there are 2 nodes synced to this block? // do the arcswap - self.synced_connections.swap(new_data); + self.synced_connections.swap(synced_connections); } // TODO: if there was an error, we should return it diff --git a/web3-proxy/src/main.rs b/web3-proxy/src/main.rs index 929a566c..536df925 100644 --- a/web3-proxy/src/main.rs +++ b/web3-proxy/src/main.rs @@ -34,8 +34,8 @@ fn main() -> anyhow::Result<()> { let cli_config: CliConfig = argh::from_env(); - info!("Loading rpc config @ {}", cli_config.rpc_config_path); - let rpc_config: String = fs::read_to_string(cli_config.rpc_config_path)?; + info!("Loading rpc config @ {}", cli_config.config); + let rpc_config: String = fs::read_to_string(cli_config.config)?; let rpc_config: RpcConfig = toml::from_str(&rpc_config)?; // TODO: this doesn't seem to do anything @@ -54,8 +54,8 @@ fn main() -> anyhow::Result<()> { format!("web3-{}-{}", chain_id, worker_id) }); - if cli_config.worker_threads > 0 { - rt_builder.worker_threads(cli_config.worker_threads); + if cli_config.workers > 0 { + rt_builder.worker_threads(cli_config.workers); } let rt = rt_builder.build()?; @@ -80,7 +80,7 @@ fn main() -> anyhow::Result<()> { // spawn the root task rt.block_on(async { - let listen_port = cli_config.listen_port; + let listen_port = cli_config.port; let app = rpc_config.try_build().await?;