From 88cb2cd1d1437be74848e23b3cdabf195eb440c0 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Tue, 19 Jul 2022 06:41:04 +0000
Subject: [PATCH] document more and cache in block_map

---
 Cargo.lock                    |  5 +--
 README.md                     | 31 ++++++++++++++----
 TODO.md                       |  3 +-
 web3-proxy/Cargo.toml         |  4 +--
 web3-proxy/src/connection.rs  | 11 ++++---
 web3-proxy/src/connections.rs | 61 ++++++++++++++++++++++++++---------
 6 files changed, 85 insertions(+), 30 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 60ff16b2..f75b64fa 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2730,9 +2730,9 @@ checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
 
 [[package]]
 name = "petgraph"
-version = "0.6.1"
+version = "0.6.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "51b305cc4569dd4e8765bab46261f67ef5d4d11a4b6e745100ee5dad8948b46c"
+checksum = "e6d5014253a1331579ce62aa67443b4a658c5e7dd03d4bc6d302b94474888143"
 dependencies = [
  "fixedbitset",
  "indexmap",
@@ -4338,6 +4338,7 @@ dependencies = [
  "notify",
  "num",
  "parking_lot 0.12.1",
+ "petgraph",
  "proctitle",
  "redis-cell-client",
  "regex",
diff --git a/README.md b/README.md
index 4858d322..f860c64c 100644
--- a/README.md
+++ b/README.md
@@ -28,19 +28,35 @@ Options:
   --help            display usage information
 ```
 
-Start the server with the defaults (listen on `http://localhost:8544` and use `./config/example.toml` which proxies to a local websocket on 8546 and ankr's public ETH node):
+Start the server with the defaults (listen on `http://localhost:8544` and use `./config/example.toml`, which proxies to a bunch of public nodes):
 
 ```
-cargo run --release -p web3-proxy
+cargo run --release -p web3-proxy -- --config ./config/example.toml
 ```
 
+## Common commands
+
 Check that the proxy is working:
 
 ```
 curl -X POST -H "Content-Type: application/json" --data '{"jsonrpc":"2.0","method":"web3_clientVersion","id":1}' 127.0.0.1:8544
 ```
 
-You can copy `config/example.toml` to `config/production-$CHAINNAME.toml` and then run `docker-compose up --build -d` start a proxies for many chains.
+Check that the websocket is working:
+
+```
+$ websocat ws://127.0.0.1:8544
+
+{"id": 1, "method": "eth_subscribe", "params": ["newHeads"]}
+
+{"id": 2, "method": "eth_subscribe", "params": ["newPendingTransactions"]}
+
+{"id": 3, "method": "eth_subscribe", "params": ["newPendingFullTransactions"]}
+
+{"id": 4, "method": "eth_subscribe", "params": ["newPendingRawTransactions"]}
+```
+
+You can copy `config/example.toml` to `config/production-$CHAINNAME.toml` and then run `docker-compose up --build -d` to start proxies for many chains.
 
 ## Flame Graphs
 
@@ -67,15 +83,18 @@ Test the proxy:
 
     wrk -s ./data/wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8544
    wrk -s ./data/wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8544
 
-Test geth:
+Test geth (assuming it is on 8545):
 
     wrk -s ./data/wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8545
     wrk -s ./data/wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8545
 
-Test erigon:
+Test erigon (assuming it is on 8945):
 
     wrk -s ./data/wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8945
     wrk -s ./data/wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8945
 
 - Note: Testing with `getLatestBlockByNumber.lua` is not great because the latest block changes and so one run is likely to be very different than another.
+
+Run ethspam for a more realistic load test:
+
+    docker run --rm --name spam shazow/ethspam --rpc http://$LOCAL_IP:8544 | versus --concurrency=100 --stop-after=10000 http://$LOCAL_IP:8544; docker stop spam
diff --git a/TODO.md b/TODO.md
index 630b8b53..dd32defb 100644
--- a/TODO.md
+++ b/TODO.md
@@ -139,4 +139,5 @@ in another repo: event subscriber
 - [ ] i saw "WebSocket connection closed unexpectedly" but no auto reconnect. need better logs on these
 - [ ] if archive servers are added to the rotation while they are still syncing, they might get requests too soon. keep archive servers out of the configs until they are done syncing. full nodes should be fine to add to the configs even while syncing, though its a wasted connection
 - [x] when under load, i'm seeing "http interval lagging!". sometimes it happens when not loaded.
-  - we were skipping our delay interval when block hash wasn't changed. so if a block was ever slow, the http provider would get the same hash twice and then would try eth_getBlockByNumber a ton of times
\ No newline at end of file
+  - we were skipping our delay interval when block hash wasn't changed. so if a block was ever slow, the http provider would get the same hash twice and then would try eth_getBlockByNumber a ton of times
+- [ ] load tests: docker run --rm --name spam shazow/ethspam --rpc http://$LOCAL_IP:8544 | versus --concurrency=100 --stop-after=10000 http://$LOCAL_IP:8544; docker stop spam
\ No newline at end of file
diff --git a/web3-proxy/Cargo.toml b/web3-proxy/Cargo.toml
index 8d87d989..5ccaef42 100644
--- a/web3-proxy/Cargo.toml
+++ b/web3-proxy/Cargo.toml
@@ -27,8 +27,8 @@ linkedhashmap = { path = "../linkedhashmap", features = ["inline-more"] }
 notify = "4.0.17"
 num = "0.4.0"
 redis-cell-client = { path = "../redis-cell-client" }
-# TODO: parking_lot has an "arc_lock" feature that we might want to use
-parking_lot = "0.12.1"
+parking_lot = { version = "0.12.1", features = ["arc_lock"] }
+petgraph = "0.6.2"
 proctitle = "0.1.1"
 # TODO: regex has several "perf" features that we might want to use
 regex = "1.6.0"
diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs
index d5a07c26..34719b31 100644
--- a/web3-proxy/src/connection.rs
+++ b/web3-proxy/src/connection.rs
@@ -86,11 +86,14 @@ impl Serialize for Web3Connection {
         S: Serializer,
     {
-        // 3 is the number of fields in the struct.
-        let mut state = serializer.serialize_struct("Web3Connection", 3)?;
+        // 4 is the number of fields in the struct.
+        let mut state = serializer.serialize_struct("Web3Connection", 4)?;
 
         // TODO: sanitize any credentials in the url
         state.serialize_field("url", &self.url)?;
 
+        let block_data_limit = self.block_data_limit.load(atomic::Ordering::Relaxed);
+        state.serialize_field("block_data_limit", &block_data_limit)?;
+
         state.serialize_field("soft_limit", &self.soft_limit)?;
 
         state.serialize_field(
@@ -120,6 +123,7 @@ impl fmt::Debug for Web3Connection {
 
 impl fmt::Display for Web3Connection {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        // TODO: filter basic auth and api keys
         write!(f, "{}", &self.url)
     }
 }
@@ -188,12 +192,11 @@ impl Web3Connection {
                         chain_id,
                         found_chain_id
                     )
-                    .context(format!("failed spawning {}", new_connection)));
+                    .context(format!("failed @ {}", new_connection)));
                 }
             }
             Err(e) => {
-                let e =
-                    anyhow::Error::from(e).context(format!("failed spawning {}", new_connection));
+                let e = anyhow::Error::from(e).context(format!("failed @ {}", new_connection));
                 return Err(e);
             }
         }
diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs
index c2e3ddaa..0fc28531 100644
--- a/web3-proxy/src/connections.rs
+++ b/web3-proxy/src/connections.rs
@@ -9,6 +9,8 @@ use futures::future::{join_all, try_join_all};
 use futures::stream::FuturesUnordered;
 use futures::StreamExt;
 use hashbrown::HashMap;
+// use parking_lot::RwLock;
+// use petgraph::graphmap::DiGraphMap;
 use serde::ser::{SerializeStruct, Serializer};
 use serde::Serialize;
 use serde_json::json;
@@ -36,15 +38,14 @@ struct SyncedConnections {
     // TODO: this should be able to serialize, but it isn't
     #[serde(skip_serializing)]
     inner: BTreeSet<Arc<Web3Connection>>,
-    // TODO: use petgraph for keeping track of the chain so we can do better fork handling
 }
 
 impl fmt::Debug for SyncedConnections {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         // TODO: the default formatter takes forever to write. this is too quiet though
         f.debug_struct("SyncedConnections")
-            .field("head_block_num", &self.head_block_num)
-            .field("head_block_hash", &self.head_block_hash)
+            .field("head_num", &self.head_block_num)
+            .field("head_hash", &self.head_block_hash)
             .finish_non_exhaustive()
     }
 }
@@ -65,6 +66,10 @@ pub struct Web3Connections {
     inner: Vec<Arc<Web3Connection>>,
     synced_connections: ArcSwap<SyncedConnections>,
     pending_transactions: Arc<DashMap<TxHash, TxState>>,
+    // TODO: i think chain is what we want, but i'm not sure how we'll use it yet
+    // TODO: this graph is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
+    // chain: Arc<RwLock<DiGraphMap<H256, Block<TxHash>>>>,
+    block_map: DashMap<u64, BTreeSet<H256>>,
 }
 
 impl Serialize for Web3Connections {
@@ -197,6 +202,7 @@ impl Web3Connections {
             inner: connections,
             synced_connections: ArcSwap::new(Arc::new(synced_connections)),
             pending_transactions,
+            block_map: Default::default(),
         });
 
         let handle = {
@@ -377,7 +383,13 @@ impl Web3Connections {
 
     pub async fn get_block_hash(&self, num: U64) -> anyhow::Result<H256> {
         // TODO: this definitely needs caching
-        warn!("this needs to be much more efficient");
+        // first, try to get the hash from petgraph. but how do we get all blocks with a given num and then pick the one on the correct chain?
+        if let Some(hash) = self.block_map.get(&num.as_u64()) {
+            // for now, just return the first seen block. we actually want the winning block!
+            return Ok(*hash.iter().next().unwrap());
+
+            unimplemented!("use petgraph to find the heaviest chain");
+        }
 
         // TODO: helper for this
         let request =
@@ -392,7 +404,11 @@ impl Web3Connections {
 
         let block = response.result.unwrap().to_string();
         let block: Block<TxHash> = serde_json::from_str(&block)?;
-        Ok(block.hash.unwrap())
+        let hash = block.hash.unwrap();
+
+        self.block_map.entry(num.as_u64()).or_default().insert(hash);
+
+        Ok(hash)
     }
 
     pub fn get_head_block_hash(&self) -> H256 {
@@ -478,6 +494,7 @@ impl Web3Connections {
         let mut connection_states: HashMap<Arc<Web3Connection>, _> =
             HashMap::with_capacity(total_rpcs);
 
+        // keep a pending one so that we can delay publishing a new head block until multiple servers are synced
         let mut pending_synced_connections = SyncedConnections::default();
 
         while let Ok((new_block, rpc)) = block_receiver.recv_async().await {
@@ -524,12 +541,18 @@
                     // TODO: if the parent hash isn't our previous best block, ignore it
                     pending_synced_connections.head_block_hash = new_block_hash;
 
+                    // TODO: wait to send this until we publish
                     head_block_sender
                         .send(new_block.clone())
                         .context("head_block_sender")?;
 
                     // TODO: mark all transactions as confirmed
                     // TODO: mark any orphaned transactions as unconfirmed
+
+                    self.block_map
+                        .entry(new_block_num)
+                        .or_default()
+                        .insert(new_block_hash);
                 }
                 cmp::Ordering::Equal => {
                     if new_block_hash == pending_synced_connections.head_block_hash {
                     } else {
                         // same height, but different chain
+                        self.block_map
+                            .entry(new_block_num)
+                            .or_default()
+                            .insert(new_block_hash);
+
                         // check connection_states to see which head block is more popular!
                         let mut rpc_ids_by_block: BTreeMap<H256, Vec<Arc<Web3Connection>>> =
                             BTreeMap::new();
@@ -596,24 +624,26 @@
                             // we didn't remove anything. nothing more to do
                             continue;
                         }
+
+                        // TODO: insert the hash if it isn't known?
+
                         // we removed. don't continue so that we update self.synced_connections
                     }
                 }
 
                 // the synced connections have changed
-                let synced_connections = Arc::new(pending_synced_connections.clone());
-                if synced_connections.inner.len() == total_rpcs {
+                if pending_synced_connections.inner.len() == total_rpcs {
                     // TODO: more metrics
                     trace!("all head: {}", new_block_hash);
+                } else {
+                    trace!(
+                        "rpcs at {}: {:?}",
+                        pending_synced_connections.head_block_hash,
+                        pending_synced_connections.inner
+                    );
                 }
 
-                trace!(
-                    "rpcs at {}: {:?}",
-                    synced_connections.head_block_hash,
-                    synced_connections.inner
-                );
-
                 // TODO: what if the hashes don't match?
                 if pending_synced_connections.head_block_hash == new_block_hash {
                     // mark all transactions in the block as confirmed
@@ -630,9 +660,10 @@
                     // TODO: mark any orphaned transactions as unconfirmed
                 }
 
-                // TODO: only publish if there are x (default 2) nodes synced to this block?
+                // TODO: only publish if there are x (default 50%) nodes synced to this block?
                 // TODO: do this before or after processing all the transactions in this block?
-                self.synced_connections.swap(synced_connections);
+                self.synced_connections
+                    .swap(Arc::new(pending_synced_connections.clone()));
             }
 
             // TODO: if there was an error, we should return it
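
The heart of this patch is the `block_map` cache: every hash the proxy sees for a block number gets recorded, so `get_block_hash` can answer repeat lookups without another upstream `eth_getBlockByNumber` call. Here is a minimal standalone sketch of that pattern, assuming the `dashmap` crate and the `H256` type from `ethers`; the `BlockMap` wrapper and its method names are illustrative, not the proxy's actual API:

```rust
use std::collections::BTreeSet;

use dashmap::DashMap;
use ethers::types::H256;

/// Every hash seen at a given block height. More than one entry at a
/// height means the connected RPCs disagree about that block (a fork).
#[derive(Default)]
struct BlockMap {
    map: DashMap<u64, BTreeSet<H256>>,
}

impl BlockMap {
    /// Record a hash seen at `num`. Returns true if it was not already known.
    fn insert(&self, num: u64, hash: H256) -> bool {
        self.map.entry(num).or_default().insert(hash)
    }

    /// Cheap cache hit. Like the patch, this returns the first hash seen;
    /// picking the hash on the winning chain is still a TODO upstream.
    fn get(&self, num: u64) -> Option<H256> {
        self.map.get(&num).and_then(|set| set.iter().next().copied())
    }
}

fn main() {
    let cache = BlockMap::default();
    let hash = H256::repeat_byte(0xab);

    assert!(cache.insert(15_000_000, hash));
    assert!(!cache.insert(15_000_000, hash)); // inserting a duplicate is a no-op
    assert_eq!(cache.get(15_000_000), Some(hash));
}
```

Keeping a set per height rather than a single hash is what makes later fork handling possible: two entries at one height means the RPCs disagree, and something (the planned petgraph chain) has to pick a winner.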
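The `unimplemented!("use petgraph to find the heaviest chain")` marker and the commented-out `chain: Arc<RwLock<DiGraphMap<...>>>` field sketch where that fork choice is headed. One hedged way it could look, assuming blocks are linked child to parent in a `DiGraphMap` and using known chain depth as a stand-in for the real heaviest-chain criterion (total difficulty); `known_depth` and `pick_head` are hypothetical helpers, not code from this repo:

```rust
use ethers::types::H256;
use petgraph::graphmap::DiGraphMap;

/// Walk parent links from `head`, counting how much ancestry we know.
/// Edges point child -> parent, mirroring each block's parent_hash.
fn known_depth(chain: &DiGraphMap<H256, ()>, head: H256) -> usize {
    let mut depth = 0;
    let mut cursor = head;
    // each block has at most one outgoing (parent) edge, so this is a walk, not a search
    while let Some(parent) = chain.neighbors(cursor).next() {
        depth += 1;
        cursor = parent;
    }
    depth
}

/// Of the candidate hashes at one height, prefer the one with the deepest
/// known ancestry. A stand-in for a real heaviest-chain rule.
fn pick_head(chain: &DiGraphMap<H256, ()>, candidates: &[H256]) -> Option<H256> {
    candidates.iter().copied().max_by_key(|hash| known_depth(chain, *hash))
}

fn main() {
    let mut chain = DiGraphMap::new();
    let (a, b, c, d) = (
        H256::repeat_byte(1),
        H256::repeat_byte(2),
        H256::repeat_byte(3),
        H256::repeat_byte(4),
    );

    // a <- b <- c is the longer fork; a <- d is the shorter one
    chain.add_edge(b, a, ());
    chain.add_edge(c, b, ());
    chain.add_edge(d, a, ());

    assert_eq!(pick_head(&chain, &[c, d]), Some(c));
}
```

This also shows why the TODO about pruning matters: the graph only ever gains nodes, so old heights would need to be dropped (or archived to redis, as the comment suggests) once they are final.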
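On the `serialize_struct("Web3Connection", 4)` change: serde's length argument is a hint that should stay in sync with the number of `serialize_field` calls, which is why the stale "3 is the number of fields" comment was worth fixing alongside the code. A minimal sketch of the same pattern, with an invented `Conn` struct standing in for `Web3Connection`:

```rust
use serde::ser::{Serialize, SerializeStruct, Serializer};

struct Conn {
    url: String,
    soft_limit: u32,
}

impl Serialize for Conn {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // the 2 here is a length hint: keep it in sync with the
        // serialize_field calls below, like the patch does upstream
        let mut state = serializer.serialize_struct("Conn", 2)?;
        state.serialize_field("url", &self.url)?;
        state.serialize_field("soft_limit", &self.soft_limit)?;
        state.end()
    }
}

fn main() {
    let conn = Conn {
        url: "http://127.0.0.1:8545".into(),
        soft_limit: 200,
    };
    println!("{}", serde_json::to_string(&conn).unwrap());
}
```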