From bc1224a0e12d5db7f205245c9d09f1b4b9365178 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 22 Jul 2022 05:11:26 +0000 Subject: [PATCH] store blocks in Arcs --- Cargo.lock | 69 +++++++++--------- linkedhashmap/Cargo.toml | 2 +- web3-proxy/Cargo.toml | 9 +-- web3-proxy/src/app.rs | 14 ++-- web3-proxy/src/config.rs | 4 +- web3-proxy/src/connection.rs | 16 ++--- web3-proxy/src/connections.rs | 128 ++++++++++++++++++++++++---------- 7 files changed, 157 insertions(+), 85 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f75b64fa..74c7adfc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -551,11 +551,8 @@ version = "0.4.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73" dependencies = [ - "libc", "num-integer", "num-traits", - "time", - "winapi 0.3.9", ] [[package]] @@ -942,7 +939,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3495912c9c1ccf2e18976439f4443f3fee0fd61f424ff99fde6a66b15ecb448f" dependencies = [ "cfg-if 1.0.0", - "hashbrown 0.12.3", + "hashbrown", "lock_api", "parking_lot_core 0.9.3", ] @@ -1526,9 +1523,9 @@ checksum = "279fb028e20b3c4c320317955b77c5e0c9701f05a1d309905d6fc702cdc5053e" [[package]] name = "flume" -version = "0.10.13" +version = "0.10.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ceeb589a3157cac0ab8cc585feb749bd2cea5cb55a6ee802ad72d9fd38303da" +checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" dependencies = [ "futures-core", "futures-sink", @@ -1821,12 +1818,6 @@ version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7" -[[package]] -name = "hashbrown" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" - [[package]] name = "hashbrown" version = "0.12.3" @@ -1851,7 +1842,7 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d452c155cb93fecdfb02a73dd57b5d8e442c2063bd7aac72f1bc5e4263a43086" dependencies = [ - "hashbrown 0.12.3", + "hashbrown", ] [[package]] @@ -2046,12 +2037,12 @@ checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683" [[package]] name = "indexmap" -version = "1.7.0" +version = "1.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5" +checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e" dependencies = [ "autocfg", - "hashbrown 0.11.2", + "hashbrown", ] [[package]] @@ -2255,7 +2246,7 @@ name = "linkedhashmap" version = "0.2.0" dependencies = [ "criterion", - "hashbrown 0.12.3", + "hashbrown", "hashlink", "linked-hash-map", "slab", @@ -2547,6 +2538,15 @@ dependencies = [ "libc", ] +[[package]] +name = "num_threads" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44" +dependencies = [ + "libc", +] + [[package]] name = "number_prefix" version = "0.4.0" @@ -3345,9 +3345,9 @@ checksum = "930c0acf610d3fdb5e2ab6213019aaa04e227ebe9547b0649ba599b16d788bd7" [[package]] name = "serde" -version = "1.0.139" +version = "1.0.140" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0171ebb889e45aa68b44aee0859b3eede84c6f5f5c228e6f140c0b2a0a46cad6" +checksum = "fc855a42c7967b7c369eb5860f7164ef1f6f81c20c7cc1141f2a604e18723b03" dependencies = [ "serde_derive", ] @@ -3374,9 +3374,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.139" +version = "1.0.140" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc1d3230c1de7932af58ad8ffbe1d784bd55efd5a9d84ac24f69c72d83543dfb" +checksum = "6f2122636b9fe3b81f1cb25099fcf2d3f542cdb1d45940d56c713158884a05da" dependencies = [ "proc-macro2", "quote", @@ -3511,11 +3511,10 @@ checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" [[package]] name = "siwe" -version = "0.3.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86f2d8ae2d4ae58df46e173aa496562ea857ac6a4f0d435ed30fcd19da0aaa79" +checksum = "8bf4de418b0989028f138b74db880525e1dbab7fdbeeb4ba8ad96f62bc9ed9a3" dependencies = [ - "chrono", "hex", "http", "iri-string", @@ -3523,13 +3522,17 @@ dependencies = [ "rand", "sha3 0.9.1", "thiserror", + "time", ] [[package]] name = "slab" -version = "0.4.6" +version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb703cfe953bccee95685111adeedb76fabe4e97549a58d16f03ea7b9367bb32" +checksum = "4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef" +dependencies = [ + "autocfg", +] [[package]] name = "smallvec" @@ -3791,12 +3794,13 @@ dependencies = [ [[package]] name = "time" -version = "0.1.43" +version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" +checksum = "72c91f41dcb2f096c05f0873d667dceec1087ce5bcf984ec8ffb19acddbb3217" dependencies = [ + "itoa 1.0.2", "libc", - "winapi 0.3.9", + "num_threads", ] [[package]] @@ -4046,9 +4050,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.14" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a713421342a5a666b7577783721d3117f1b69a393df803ee17bb73b1e122a59" +checksum = "60db860322da191b40952ad9affe65ea23e7dd6a5c442c2c42865810c6ab8e6b" dependencies = [ "ansi_term", "matchers", @@ -4333,7 +4337,8 @@ dependencies = [ "fdlimit", "flume", "futures", - "hashbrown 0.12.3", + "hashbrown", + "indexmap", "linkedhashmap", "notify", "num", diff --git a/linkedhashmap/Cargo.toml b/linkedhashmap/Cargo.toml index b520c978..304c8fd5 100644 --- a/linkedhashmap/Cargo.toml +++ b/linkedhashmap/Cargo.toml @@ -8,7 +8,7 @@ edition = "2018" inline-more = [ "hashbrown" ] [dependencies] -slab = "0.4.6" +slab = "0.4.7" hashbrown = { version = "0.12.3", optional = true } [dev-dependencies] diff --git a/web3-proxy/Cargo.toml b/web3-proxy/Cargo.toml index 5ccaef42..c03193e9 100644 --- a/web3-proxy/Cargo.toml +++ b/web3-proxy/Cargo.toml @@ -20,9 +20,10 @@ dashmap = "5.3.4" derive_more = "0.99.17" ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] } fdlimit = "0.2.1" -flume = "0.10.13" +flume = "0.10.14" futures = { version = "0.3.21", features = ["thread-pool"] } hashbrown = "0.12.3" +indexmap = "1.9.1" linkedhashmap = { path = "../linkedhashmap", features = ["inline-more"] } notify = "4.0.17" num = "0.4.0" @@ -34,14 +35,14 @@ proctitle = "0.1.1" regex = "1.6.0" reqwest = { version = "0.11.11", default-features = false, features = ["json", "tokio-rustls"] } rustc-hash = "1.1.0" -siwe = "0.3.0" -serde = { version = "1.0.139", features = [] } +siwe = "0.4.0" +serde = { version = "1.0.140", features = [] } serde_json = { version = "1.0.82", default-features = false, features = ["alloc", "raw_value"] } tokio = { version = "1.20.0", features = ["full", "tracing"] } toml = "0.5.9" tracing = "0.1.35" # TODO: tracing-subscriber has serde and serde_json features that we might want to use -tracing-subscriber = { version = "0.3.14", features = ["env-filter", "parking_lot"] } +tracing-subscriber = { version = "0.3.15", features = ["env-filter", "parking_lot"] } url = "2.2.2" tower = "0.4.13" tokio-stream = { version = "0.1.9", features = ["sync"] } diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index ba224c77..6f2479d8 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -140,13 +140,15 @@ fn get_min_block_needed( "eth_estimateGas" => 1, "eth_getBalance" => 1, "eth_getBlockByHash" => { + // TODO: double check that any node can serve this return None; } "eth_getBlockByNumber" => { + // TODO: double check that any node can serve this return None; } "eth_getBlockTransactionCountByHash" => { - // TODO: turn block hash into number and check. will need a linkedhashmap of recent hashes + // TODO: double check that any node can serve this return None; } "eth_getBlockTransactionCountByNumber" => 0, @@ -254,7 +256,7 @@ pub struct Web3ProxyApp { response_cache: ResponseLrcCache, // don't drop this or the sender will stop working // TODO: broadcast channel instead? - head_block_receiver: watch::Receiver>, + head_block_receiver: watch::Receiver>>, pending_tx_sender: broadcast::Sender, pending_transactions: Arc>, public_rate_limiter: Option, @@ -332,10 +334,11 @@ impl Web3ProxyApp { } }; - let (head_block_sender, head_block_receiver) = watch::channel(Block::default()); + let (head_block_sender, head_block_receiver) = watch::channel(Arc::new(Block::default())); // TODO: will one receiver lagging be okay? how big should this be? let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(16); + // TODO: this will grow unbounded!! add some expiration to this. and probably move to redis let pending_transactions = Arc::new(DashMap::new()); // TODO: don't drop the pending_tx_receiver. instead, read it to mark transactions as "seen". once seen, we won't re-send them @@ -457,7 +460,7 @@ impl Web3ProxyApp { "method":"eth_subscription", "params": { "subscription": subscription_id, - "result": new_head, + "result": new_head.as_ref(), }, }); @@ -846,6 +849,9 @@ impl Web3ProxyApp { // TODO: eth_estimateGas using anvil? // TODO: eth_gasPrice that does awesome magic to predict the future // TODO: eth_getBlockByHash from caches + "eth_getBlockByHash" => { + unimplemented!("wip") + } // TODO: eth_getBlockByNumber from caches // TODO: eth_getBlockTransactionCountByHash from caches // TODO: eth_getBlockTransactionCountByNumber from caches diff --git a/web3-proxy/src/config.rs b/web3-proxy/src/config.rs index 8d57c061..20dd947d 100644 --- a/web3-proxy/src/config.rs +++ b/web3-proxy/src/config.rs @@ -8,6 +8,8 @@ use tokio::sync::broadcast; use crate::app::AnyhowJoinHandle; use crate::connection::Web3Connection; +pub type BlockAndRpc = (Arc>, Arc); + #[derive(Debug, FromArgs)] /// Web3-proxy is a fast caching and load balancing proxy for web3 (Ethereum or similar) JsonRPC servers. pub struct CliConfig { @@ -58,7 +60,7 @@ impl Web3ConnectionConfig { chain_id: u64, http_client: Option, http_interval_sender: Option>>, - block_sender: Option, Arc)>>, + block_sender: Option>, tx_id_sender: Option)>>, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let hard_rate_limit = self.hard_limit.map(|x| (x, redis_client_pool.unwrap())); diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index 34719b31..d26b8d5d 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -17,6 +17,7 @@ use tokio::time::{interval, sleep, Duration, MissedTickBehavior}; use tracing::{error, info, instrument, trace, warn}; use crate::app::{flatten_handle, AnyhowJoinHandle}; +use crate::config::BlockAndRpc; /// TODO: instead of an enum, I tried to use Box, but hit https://github.com/gakonst/ethers-rs/issues/592 #[derive(From)] @@ -142,7 +143,7 @@ impl Web3Connection { hard_limit: Option<(u32, redis_cell_client::RedisClientPool)>, // TODO: think more about this type soft_limit: u32, - block_sender: Option, Arc)>>, + block_sender: Option>, tx_id_sender: Option)>>, reconnect: bool, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { @@ -286,7 +287,7 @@ impl Web3Connection { #[instrument(skip_all)] pub async fn reconnect( self: &Arc, - block_sender: Option, Arc)>>, + block_sender: Option>, ) -> anyhow::Result<()> { // websocket doesn't need the http client let http_client = None; @@ -302,7 +303,7 @@ impl Web3Connection { // tell the block subscriber that we are at 0 if let Some(block_sender) = block_sender { block_sender - .send_async((Block::default(), self.clone())) + .send_async((Arc::new(Block::default()), self.clone())) .await .context("block_sender at 0")?; } @@ -334,7 +335,7 @@ impl Web3Connection { async fn send_block_result( self: &Arc, block: Result, ProviderError>, - block_sender: &flume::Sender<(Block, Arc)>, + block_sender: &flume::Sender, ) -> anyhow::Result<()> { match block { Ok(block) => { @@ -348,7 +349,7 @@ impl Web3Connection { } block_sender - .send_async((block, self.clone())) + .send_async((Arc::new(block), self.clone())) .await .context("block_sender")?; } @@ -363,7 +364,7 @@ impl Web3Connection { async fn subscribe( self: Arc, http_interval_sender: Option>>, - block_sender: Option, Arc)>>, + block_sender: Option>, tx_id_sender: Option)>>, reconnect: bool, ) -> anyhow::Result<()> { @@ -428,7 +429,7 @@ impl Web3Connection { async fn subscribe_new_heads( self: Arc, http_interval_receiver: Option>, - block_sender: flume::Sender<(Block, Arc)>, + block_sender: flume::Sender, ) -> anyhow::Result<()> { info!("watching {}", self); @@ -446,7 +447,6 @@ impl Web3Connection { loop { match self.try_request_handle().await { Ok(active_request_handle) => { - // TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest" let block: Result, _> = active_request_handle .request("eth_getBlockByNumber", ("latest", false)) .await; diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index 0fc40b34..39b519c2 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -9,6 +9,7 @@ use futures::future::{join_all, try_join_all}; use futures::stream::FuturesUnordered; use futures::StreamExt; use hashbrown::HashMap; +use indexmap::{IndexMap, IndexSet}; // use parking_lot::RwLock; // use petgraph::graphmap::DiGraphMap; use serde::ser::{SerializeStruct, Serializer}; @@ -16,7 +17,6 @@ use serde::Serialize; use serde_json::json; use serde_json::value::RawValue; use std::cmp; -use std::collections::{BTreeMap, BTreeSet}; use std::fmt; use std::sync::Arc; use std::time::Duration; @@ -36,8 +36,9 @@ struct SyncedConnections { head_block_num: u64, head_block_hash: H256, // TODO: this should be able to serialize, but it isn't + // TODO: use linkedhashmap? #[serde(skip_serializing)] - inner: BTreeSet>, + inner: IndexSet>, } impl fmt::Debug for SyncedConnections { @@ -60,6 +61,48 @@ impl SyncedConnections { } } +#[derive(Default)] +pub struct BlockChain { + /// only includes blocks on the main chain. + chain_map: DashMap>>, + /// all blocks, including orphans + block_map: DashMap>>, + // TODO: petgraph? +} + +impl BlockChain { + pub fn add_block(&self, block: Arc>, cannonical: bool) { + let hash = block.hash.unwrap(); + + if cannonical { + let num = block.number.unwrap(); + + let entry = self.chain_map.entry(num); + + let mut is_new = false; + + entry.or_insert_with(|| { + is_new = true; + block.clone() + }); + + if !is_new { + return; + } + } + + self.block_map.entry(hash).or_insert(block); + } + + pub fn get_block(&self, num: &U64) -> Option>> { + self.chain_map.get(num).map(|x| x.clone()) + } + + pub fn get_block_from_hash(&self, hash: &H256) -> Option>> { + self.block_map.get(hash).map(|x| x.clone()) + } +} + /// A collection of web3 connections. Sends requests either the current best server or all servers. #[derive(From)] pub struct Web3Connections { @@ -69,7 +112,7 @@ pub struct Web3Connections { // TODO: i think chain is what we want, but i'm not sure how we'll use it yet // TODO: this graph is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? // chain: Arc>>>, - block_map: DashMap>, + chain: BlockChain, } impl Serialize for Web3Connections { @@ -103,12 +146,13 @@ impl Web3Connections { server_configs: Vec, http_client: Option, redis_client_pool: Option, - head_block_sender: Option>>, + head_block_sender: Option>>>, pending_tx_sender: Option>, pending_transactions: Arc>, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); - let (block_sender, block_receiver) = flume::unbounded(); + let (block_sender, block_receiver) = + flume::unbounded::<(Arc>, Arc)>(); let http_interval_sender = if http_client.is_some() { let (sender, receiver) = broadcast::channel(1); @@ -202,7 +246,7 @@ impl Web3Connections { inner: connections, synced_connections: ArcSwap::new(Arc::new(synced_connections)), pending_transactions, - block_map: Default::default(), + chain: Default::default(), }); let handle = { @@ -314,8 +358,8 @@ impl Web3Connections { async fn subscribe( self: Arc, pending_tx_id_receiver: flume::Receiver<(TxHash, Arc)>, - block_receiver: flume::Receiver<(Block, Arc)>, - head_block_sender: Option>>, + block_receiver: flume::Receiver<(Arc>, Arc)>, + head_block_sender: Option>>>, pending_tx_sender: Option>, ) -> anyhow::Result<()> { let mut futures = vec![]; @@ -377,14 +421,11 @@ impl Web3Connections { Ok(()) } - pub async fn get_block_hash(&self, num: U64) -> anyhow::Result { - // first, try to get the hash from our cache - // TODO: move this cache to redis? - if let Some(hash) = self.block_map.get(&num.as_u64()) { + pub async fn get_block(&self, num: U64) -> anyhow::Result>> { + if let Some(block) = self.chain.get_block(&num) { // for now, just return the first seen block. we actually want the winning block! - return Ok(*hash.iter().next().unwrap()); - - unimplemented!("use petgraph to find the heaviest chain"); + // TODO: don't clone + return Ok(block); } let head_block_num = self.get_head_block_num(); @@ -407,14 +448,24 @@ impl Web3Connections { .try_send_best_upstream_server(request, Some(num)) .await?; - let block = response.result.unwrap().to_string(); + let block = response.result.unwrap(); - let block: Block = serde_json::from_str(&block)?; + let block: Block = serde_json::from_str(block.get())?; + + let block = Arc::new(block); + + self.chain.add_block(block.clone(), true); + + Ok(block) + } + + pub async fn get_block_hash(&self, num: U64) -> anyhow::Result { + // first, try to get the hash from our cache + // TODO: move this cache to redis? + let block = self.get_block(num).await?; let hash = block.hash.unwrap(); - self.block_map.entry(num.as_u64()).or_default().insert(hash); - Ok(hash) } @@ -507,8 +558,8 @@ impl Web3Connections { // we don't instrument here because we put a span inside the while loop async fn update_synced_rpcs( &self, - block_receiver: flume::Receiver<(Block, Arc)>, - head_block_sender: watch::Sender>, + block_receiver: flume::Receiver<(Arc>, Arc)>, + head_block_sender: watch::Sender>>, // TODO: use pending_tx_sender pending_tx_sender: Option>, ) -> anyhow::Result<()> { @@ -546,6 +597,8 @@ impl Web3Connections { warn!("still syncing"); } + let mut new_head_block = false; + connection_states.insert(rpc.clone(), (new_block_num, new_block_hash)); // TODO: do something to update the synced blocks @@ -572,10 +625,10 @@ impl Web3Connections { // TODO: mark all transactions as confirmed // TODO: mark any orphaned transactions as unconfirmed - self.block_map - .entry(new_block_num) - .or_default() - .insert(new_block_hash); + // TODO: do not mark cannonical until a threshold of RPCs have this block! + new_head_block = true; + + self.chain.add_block(new_block.clone(), new_head_block); } cmp::Ordering::Equal => { if new_block_hash == pending_synced_connections.head_block_hash { @@ -587,14 +640,10 @@ impl Web3Connections { } else { // same height, but different chain - self.block_map - .entry(new_block_num) - .or_default() - .insert(new_block_hash); - // check connection_states to see which head block is more popular! - let mut rpc_ids_by_block: BTreeMap>> = - BTreeMap::new(); + // TODO: i don't think btreemap is what we want. i think we want indexmap or linkedhashmap + let mut rpc_ids_by_block = + IndexMap::>>::new(); let mut counted_rpcs = 0; @@ -630,11 +679,11 @@ impl Web3Connections { most_common_head_hash ); + self.chain + .add_block(new_block.clone(), new_block_hash == most_common_head_hash); + // TODO: do this more efficiently? if pending_synced_connections.head_block_hash != most_common_head_hash { - head_block_sender - .send(new_block.clone()) - .context("head_block_sender")?; pending_synced_connections.head_block_hash = most_common_head_hash; } @@ -687,6 +736,15 @@ impl Web3Connections { // TODO: do this before or after processing all the transactions in this block? self.synced_connections .swap(Arc::new(pending_synced_connections.clone())); + + if new_head_block { + // TODO: this will need a refactor to only send once a minmum threshold has this block + // TODO: move this onto self.chain + // TODO: pending_synced_connections isn't published yet. which means fast queries using this block will fail + head_block_sender + .send(new_block.clone()) + .context("head_block_sender")?; + } } // TODO: if there was an error, we should return it