store blocks in Arcs

This commit is contained in:
Bryan Stitt 2022-07-22 05:11:26 +00:00
parent c10be3c78d
commit bc1224a0e1
7 changed files with 157 additions and 85 deletions

69
Cargo.lock generated
View File

@ -551,11 +551,8 @@ version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73" checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73"
dependencies = [ dependencies = [
"libc",
"num-integer", "num-integer",
"num-traits", "num-traits",
"time",
"winapi 0.3.9",
] ]
[[package]] [[package]]
@ -942,7 +939,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3495912c9c1ccf2e18976439f4443f3fee0fd61f424ff99fde6a66b15ecb448f" checksum = "3495912c9c1ccf2e18976439f4443f3fee0fd61f424ff99fde6a66b15ecb448f"
dependencies = [ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
"hashbrown 0.12.3", "hashbrown",
"lock_api", "lock_api",
"parking_lot_core 0.9.3", "parking_lot_core 0.9.3",
] ]
@ -1526,9 +1523,9 @@ checksum = "279fb028e20b3c4c320317955b77c5e0c9701f05a1d309905d6fc702cdc5053e"
[[package]] [[package]]
name = "flume" name = "flume"
version = "0.10.13" version = "0.10.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ceeb589a3157cac0ab8cc585feb749bd2cea5cb55a6ee802ad72d9fd38303da" checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-sink", "futures-sink",
@ -1821,12 +1818,6 @@ version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7" checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7"
[[package]]
name = "hashbrown"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
[[package]] [[package]]
name = "hashbrown" name = "hashbrown"
version = "0.12.3" version = "0.12.3"
@ -1851,7 +1842,7 @@ version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d452c155cb93fecdfb02a73dd57b5d8e442c2063bd7aac72f1bc5e4263a43086" checksum = "d452c155cb93fecdfb02a73dd57b5d8e442c2063bd7aac72f1bc5e4263a43086"
dependencies = [ dependencies = [
"hashbrown 0.12.3", "hashbrown",
] ]
[[package]] [[package]]
@ -2046,12 +2037,12 @@ checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683"
[[package]] [[package]]
name = "indexmap" name = "indexmap"
version = "1.7.0" version = "1.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5" checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e"
dependencies = [ dependencies = [
"autocfg", "autocfg",
"hashbrown 0.11.2", "hashbrown",
] ]
[[package]] [[package]]
@ -2255,7 +2246,7 @@ name = "linkedhashmap"
version = "0.2.0" version = "0.2.0"
dependencies = [ dependencies = [
"criterion", "criterion",
"hashbrown 0.12.3", "hashbrown",
"hashlink", "hashlink",
"linked-hash-map", "linked-hash-map",
"slab", "slab",
@ -2547,6 +2538,15 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "num_threads"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "number_prefix" name = "number_prefix"
version = "0.4.0" version = "0.4.0"
@ -3345,9 +3345,9 @@ checksum = "930c0acf610d3fdb5e2ab6213019aaa04e227ebe9547b0649ba599b16d788bd7"
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.139" version = "1.0.140"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0171ebb889e45aa68b44aee0859b3eede84c6f5f5c228e6f140c0b2a0a46cad6" checksum = "fc855a42c7967b7c369eb5860f7164ef1f6f81c20c7cc1141f2a604e18723b03"
dependencies = [ dependencies = [
"serde_derive", "serde_derive",
] ]
@ -3374,9 +3374,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_derive" name = "serde_derive"
version = "1.0.139" version = "1.0.140"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc1d3230c1de7932af58ad8ffbe1d784bd55efd5a9d84ac24f69c72d83543dfb" checksum = "6f2122636b9fe3b81f1cb25099fcf2d3f542cdb1d45940d56c713158884a05da"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -3511,11 +3511,10 @@ checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
[[package]] [[package]]
name = "siwe" name = "siwe"
version = "0.3.0" version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86f2d8ae2d4ae58df46e173aa496562ea857ac6a4f0d435ed30fcd19da0aaa79" checksum = "8bf4de418b0989028f138b74db880525e1dbab7fdbeeb4ba8ad96f62bc9ed9a3"
dependencies = [ dependencies = [
"chrono",
"hex", "hex",
"http", "http",
"iri-string", "iri-string",
@ -3523,13 +3522,17 @@ dependencies = [
"rand", "rand",
"sha3 0.9.1", "sha3 0.9.1",
"thiserror", "thiserror",
"time",
] ]
[[package]] [[package]]
name = "slab" name = "slab"
version = "0.4.6" version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb703cfe953bccee95685111adeedb76fabe4e97549a58d16f03ea7b9367bb32" checksum = "4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef"
dependencies = [
"autocfg",
]
[[package]] [[package]]
name = "smallvec" name = "smallvec"
@ -3791,12 +3794,13 @@ dependencies = [
[[package]] [[package]]
name = "time" name = "time"
version = "0.1.43" version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" checksum = "72c91f41dcb2f096c05f0873d667dceec1087ce5bcf984ec8ffb19acddbb3217"
dependencies = [ dependencies = [
"itoa 1.0.2",
"libc", "libc",
"winapi 0.3.9", "num_threads",
] ]
[[package]] [[package]]
@ -4046,9 +4050,9 @@ dependencies = [
[[package]] [[package]]
name = "tracing-subscriber" name = "tracing-subscriber"
version = "0.3.14" version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a713421342a5a666b7577783721d3117f1b69a393df803ee17bb73b1e122a59" checksum = "60db860322da191b40952ad9affe65ea23e7dd6a5c442c2c42865810c6ab8e6b"
dependencies = [ dependencies = [
"ansi_term", "ansi_term",
"matchers", "matchers",
@ -4333,7 +4337,8 @@ dependencies = [
"fdlimit", "fdlimit",
"flume", "flume",
"futures", "futures",
"hashbrown 0.12.3", "hashbrown",
"indexmap",
"linkedhashmap", "linkedhashmap",
"notify", "notify",
"num", "num",

View File

@ -8,7 +8,7 @@ edition = "2018"
inline-more = [ "hashbrown" ] inline-more = [ "hashbrown" ]
[dependencies] [dependencies]
slab = "0.4.6" slab = "0.4.7"
hashbrown = { version = "0.12.3", optional = true } hashbrown = { version = "0.12.3", optional = true }
[dev-dependencies] [dev-dependencies]

View File

@ -20,9 +20,10 @@ dashmap = "5.3.4"
derive_more = "0.99.17" derive_more = "0.99.17"
ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] } ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] }
fdlimit = "0.2.1" fdlimit = "0.2.1"
flume = "0.10.13" flume = "0.10.14"
futures = { version = "0.3.21", features = ["thread-pool"] } futures = { version = "0.3.21", features = ["thread-pool"] }
hashbrown = "0.12.3" hashbrown = "0.12.3"
indexmap = "1.9.1"
linkedhashmap = { path = "../linkedhashmap", features = ["inline-more"] } linkedhashmap = { path = "../linkedhashmap", features = ["inline-more"] }
notify = "4.0.17" notify = "4.0.17"
num = "0.4.0" num = "0.4.0"
@ -34,14 +35,14 @@ proctitle = "0.1.1"
regex = "1.6.0" regex = "1.6.0"
reqwest = { version = "0.11.11", default-features = false, features = ["json", "tokio-rustls"] } reqwest = { version = "0.11.11", default-features = false, features = ["json", "tokio-rustls"] }
rustc-hash = "1.1.0" rustc-hash = "1.1.0"
siwe = "0.3.0" siwe = "0.4.0"
serde = { version = "1.0.139", features = [] } serde = { version = "1.0.140", features = [] }
serde_json = { version = "1.0.82", default-features = false, features = ["alloc", "raw_value"] } serde_json = { version = "1.0.82", default-features = false, features = ["alloc", "raw_value"] }
tokio = { version = "1.20.0", features = ["full", "tracing"] } tokio = { version = "1.20.0", features = ["full", "tracing"] }
toml = "0.5.9" toml = "0.5.9"
tracing = "0.1.35" tracing = "0.1.35"
# TODO: tracing-subscriber has serde and serde_json features that we might want to use # TODO: tracing-subscriber has serde and serde_json features that we might want to use
tracing-subscriber = { version = "0.3.14", features = ["env-filter", "parking_lot"] } tracing-subscriber = { version = "0.3.15", features = ["env-filter", "parking_lot"] }
url = "2.2.2" url = "2.2.2"
tower = "0.4.13" tower = "0.4.13"
tokio-stream = { version = "0.1.9", features = ["sync"] } tokio-stream = { version = "0.1.9", features = ["sync"] }

View File

@ -140,13 +140,15 @@ fn get_min_block_needed(
"eth_estimateGas" => 1, "eth_estimateGas" => 1,
"eth_getBalance" => 1, "eth_getBalance" => 1,
"eth_getBlockByHash" => { "eth_getBlockByHash" => {
// TODO: double check that any node can serve this
return None; return None;
} }
"eth_getBlockByNumber" => { "eth_getBlockByNumber" => {
// TODO: double check that any node can serve this
return None; return None;
} }
"eth_getBlockTransactionCountByHash" => { "eth_getBlockTransactionCountByHash" => {
// TODO: turn block hash into number and check. will need a linkedhashmap of recent hashes // TODO: double check that any node can serve this
return None; return None;
} }
"eth_getBlockTransactionCountByNumber" => 0, "eth_getBlockTransactionCountByNumber" => 0,
@ -254,7 +256,7 @@ pub struct Web3ProxyApp {
response_cache: ResponseLrcCache, response_cache: ResponseLrcCache,
// don't drop this or the sender will stop working // don't drop this or the sender will stop working
// TODO: broadcast channel instead? // TODO: broadcast channel instead?
head_block_receiver: watch::Receiver<Block<TxHash>>, head_block_receiver: watch::Receiver<Arc<Block<TxHash>>>,
pending_tx_sender: broadcast::Sender<TxState>, pending_tx_sender: broadcast::Sender<TxState>,
pending_transactions: Arc<DashMap<TxHash, TxState>>, pending_transactions: Arc<DashMap<TxHash, TxState>>,
public_rate_limiter: Option<RedisCellClient>, public_rate_limiter: Option<RedisCellClient>,
@ -332,10 +334,11 @@ impl Web3ProxyApp {
} }
}; };
let (head_block_sender, head_block_receiver) = watch::channel(Block::default()); let (head_block_sender, head_block_receiver) = watch::channel(Arc::new(Block::default()));
// TODO: will one receiver lagging be okay? how big should this be? // TODO: will one receiver lagging be okay? how big should this be?
let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(16); let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(16);
// TODO: this will grow unbounded!! add some expiration to this. and probably move to redis
let pending_transactions = Arc::new(DashMap::new()); let pending_transactions = Arc::new(DashMap::new());
// TODO: don't drop the pending_tx_receiver. instead, read it to mark transactions as "seen". once seen, we won't re-send them // TODO: don't drop the pending_tx_receiver. instead, read it to mark transactions as "seen". once seen, we won't re-send them
@ -457,7 +460,7 @@ impl Web3ProxyApp {
"method":"eth_subscription", "method":"eth_subscription",
"params": { "params": {
"subscription": subscription_id, "subscription": subscription_id,
"result": new_head, "result": new_head.as_ref(),
}, },
}); });
@ -846,6 +849,9 @@ impl Web3ProxyApp {
// TODO: eth_estimateGas using anvil? // TODO: eth_estimateGas using anvil?
// TODO: eth_gasPrice that does awesome magic to predict the future // TODO: eth_gasPrice that does awesome magic to predict the future
// TODO: eth_getBlockByHash from caches // TODO: eth_getBlockByHash from caches
"eth_getBlockByHash" => {
unimplemented!("wip")
}
// TODO: eth_getBlockByNumber from caches // TODO: eth_getBlockByNumber from caches
// TODO: eth_getBlockTransactionCountByHash from caches // TODO: eth_getBlockTransactionCountByHash from caches
// TODO: eth_getBlockTransactionCountByNumber from caches // TODO: eth_getBlockTransactionCountByNumber from caches

View File

@ -8,6 +8,8 @@ use tokio::sync::broadcast;
use crate::app::AnyhowJoinHandle; use crate::app::AnyhowJoinHandle;
use crate::connection::Web3Connection; use crate::connection::Web3Connection;
pub type BlockAndRpc = (Arc<Block<TxHash>>, Arc<Web3Connection>);
#[derive(Debug, FromArgs)] #[derive(Debug, FromArgs)]
/// Web3-proxy is a fast caching and load balancing proxy for web3 (Ethereum or similar) JsonRPC servers. /// Web3-proxy is a fast caching and load balancing proxy for web3 (Ethereum or similar) JsonRPC servers.
pub struct CliConfig { pub struct CliConfig {
@ -58,7 +60,7 @@ impl Web3ConnectionConfig {
chain_id: u64, chain_id: u64,
http_client: Option<reqwest::Client>, http_client: Option<reqwest::Client>,
http_interval_sender: Option<Arc<broadcast::Sender<()>>>, http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
block_sender: Option<flume::Sender<(Block<TxHash>, Arc<Web3Connection>)>>, block_sender: Option<flume::Sender<BlockAndRpc>>,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Web3Connection>)>>, tx_id_sender: Option<flume::Sender<(TxHash, Arc<Web3Connection>)>>,
) -> anyhow::Result<(Arc<Web3Connection>, AnyhowJoinHandle<()>)> { ) -> anyhow::Result<(Arc<Web3Connection>, AnyhowJoinHandle<()>)> {
let hard_rate_limit = self.hard_limit.map(|x| (x, redis_client_pool.unwrap())); let hard_rate_limit = self.hard_limit.map(|x| (x, redis_client_pool.unwrap()));

View File

@ -17,6 +17,7 @@ use tokio::time::{interval, sleep, Duration, MissedTickBehavior};
use tracing::{error, info, instrument, trace, warn}; use tracing::{error, info, instrument, trace, warn};
use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::app::{flatten_handle, AnyhowJoinHandle};
use crate::config::BlockAndRpc;
/// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit https://github.com/gakonst/ethers-rs/issues/592 /// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit https://github.com/gakonst/ethers-rs/issues/592
#[derive(From)] #[derive(From)]
@ -142,7 +143,7 @@ impl Web3Connection {
hard_limit: Option<(u32, redis_cell_client::RedisClientPool)>, hard_limit: Option<(u32, redis_cell_client::RedisClientPool)>,
// TODO: think more about this type // TODO: think more about this type
soft_limit: u32, soft_limit: u32,
block_sender: Option<flume::Sender<(Block<TxHash>, Arc<Self>)>>, block_sender: Option<flume::Sender<BlockAndRpc>>,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>, tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
reconnect: bool, reconnect: bool,
) -> anyhow::Result<(Arc<Web3Connection>, AnyhowJoinHandle<()>)> { ) -> anyhow::Result<(Arc<Web3Connection>, AnyhowJoinHandle<()>)> {
@ -286,7 +287,7 @@ impl Web3Connection {
#[instrument(skip_all)] #[instrument(skip_all)]
pub async fn reconnect( pub async fn reconnect(
self: &Arc<Self>, self: &Arc<Self>,
block_sender: Option<flume::Sender<(Block<TxHash>, Arc<Self>)>>, block_sender: Option<flume::Sender<BlockAndRpc>>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// websocket doesn't need the http client // websocket doesn't need the http client
let http_client = None; let http_client = None;
@ -302,7 +303,7 @@ impl Web3Connection {
// tell the block subscriber that we are at 0 // tell the block subscriber that we are at 0
if let Some(block_sender) = block_sender { if let Some(block_sender) = block_sender {
block_sender block_sender
.send_async((Block::default(), self.clone())) .send_async((Arc::new(Block::default()), self.clone()))
.await .await
.context("block_sender at 0")?; .context("block_sender at 0")?;
} }
@ -334,7 +335,7 @@ impl Web3Connection {
async fn send_block_result( async fn send_block_result(
self: &Arc<Self>, self: &Arc<Self>,
block: Result<Block<TxHash>, ProviderError>, block: Result<Block<TxHash>, ProviderError>,
block_sender: &flume::Sender<(Block<TxHash>, Arc<Self>)>, block_sender: &flume::Sender<BlockAndRpc>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
match block { match block {
Ok(block) => { Ok(block) => {
@ -348,7 +349,7 @@ impl Web3Connection {
} }
block_sender block_sender
.send_async((block, self.clone())) .send_async((Arc::new(block), self.clone()))
.await .await
.context("block_sender")?; .context("block_sender")?;
} }
@ -363,7 +364,7 @@ impl Web3Connection {
async fn subscribe( async fn subscribe(
self: Arc<Self>, self: Arc<Self>,
http_interval_sender: Option<Arc<broadcast::Sender<()>>>, http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
block_sender: Option<flume::Sender<(Block<TxHash>, Arc<Self>)>>, block_sender: Option<flume::Sender<BlockAndRpc>>,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>, tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
reconnect: bool, reconnect: bool,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
@ -428,7 +429,7 @@ impl Web3Connection {
async fn subscribe_new_heads( async fn subscribe_new_heads(
self: Arc<Self>, self: Arc<Self>,
http_interval_receiver: Option<broadcast::Receiver<()>>, http_interval_receiver: Option<broadcast::Receiver<()>>,
block_sender: flume::Sender<(Block<TxHash>, Arc<Self>)>, block_sender: flume::Sender<BlockAndRpc>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
info!("watching {}", self); info!("watching {}", self);
@ -446,7 +447,6 @@ impl Web3Connection {
loop { loop {
match self.try_request_handle().await { match self.try_request_handle().await {
Ok(active_request_handle) => { Ok(active_request_handle) => {
// TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest"
let block: Result<Block<TxHash>, _> = active_request_handle let block: Result<Block<TxHash>, _> = active_request_handle
.request("eth_getBlockByNumber", ("latest", false)) .request("eth_getBlockByNumber", ("latest", false))
.await; .await;

View File

@ -9,6 +9,7 @@ use futures::future::{join_all, try_join_all};
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures::StreamExt; use futures::StreamExt;
use hashbrown::HashMap; use hashbrown::HashMap;
use indexmap::{IndexMap, IndexSet};
// use parking_lot::RwLock; // use parking_lot::RwLock;
// use petgraph::graphmap::DiGraphMap; // use petgraph::graphmap::DiGraphMap;
use serde::ser::{SerializeStruct, Serializer}; use serde::ser::{SerializeStruct, Serializer};
@ -16,7 +17,6 @@ use serde::Serialize;
use serde_json::json; use serde_json::json;
use serde_json::value::RawValue; use serde_json::value::RawValue;
use std::cmp; use std::cmp;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt; use std::fmt;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -36,8 +36,9 @@ struct SyncedConnections {
head_block_num: u64, head_block_num: u64,
head_block_hash: H256, head_block_hash: H256,
// TODO: this should be able to serialize, but it isn't // TODO: this should be able to serialize, but it isn't
// TODO: use linkedhashmap?
#[serde(skip_serializing)] #[serde(skip_serializing)]
inner: BTreeSet<Arc<Web3Connection>>, inner: IndexSet<Arc<Web3Connection>>,
} }
impl fmt::Debug for SyncedConnections { impl fmt::Debug for SyncedConnections {
@ -60,6 +61,48 @@ impl SyncedConnections {
} }
} }
#[derive(Default)]
pub struct BlockChain {
/// only includes blocks on the main chain.
chain_map: DashMap<U64, Arc<Block<TxHash>>>,
/// all blocks, including orphans
block_map: DashMap<H256, Arc<Block<TxHash>>>,
// TODO: petgraph?
}
impl BlockChain {
pub fn add_block(&self, block: Arc<Block<TxHash>>, cannonical: bool) {
let hash = block.hash.unwrap();
if cannonical {
let num = block.number.unwrap();
let entry = self.chain_map.entry(num);
let mut is_new = false;
entry.or_insert_with(|| {
is_new = true;
block.clone()
});
if !is_new {
return;
}
}
self.block_map.entry(hash).or_insert(block);
}
pub fn get_block(&self, num: &U64) -> Option<Arc<Block<TxHash>>> {
self.chain_map.get(num).map(|x| x.clone())
}
pub fn get_block_from_hash(&self, hash: &H256) -> Option<Arc<Block<TxHash>>> {
self.block_map.get(hash).map(|x| x.clone())
}
}
/// A collection of web3 connections. Sends requests either the current best server or all servers. /// A collection of web3 connections. Sends requests either the current best server or all servers.
#[derive(From)] #[derive(From)]
pub struct Web3Connections { pub struct Web3Connections {
@ -69,7 +112,7 @@ pub struct Web3Connections {
// TODO: i think chain is what we want, but i'm not sure how we'll use it yet // TODO: i think chain is what we want, but i'm not sure how we'll use it yet
// TODO: this graph is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? // TODO: this graph is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
// chain: Arc<RwLock<DiGraphMap<H256, Block<TxHash>>>>, // chain: Arc<RwLock<DiGraphMap<H256, Block<TxHash>>>>,
block_map: DashMap<u64, BTreeSet<H256>>, chain: BlockChain,
} }
impl Serialize for Web3Connections { impl Serialize for Web3Connections {
@ -103,12 +146,13 @@ impl Web3Connections {
server_configs: Vec<Web3ConnectionConfig>, server_configs: Vec<Web3ConnectionConfig>,
http_client: Option<reqwest::Client>, http_client: Option<reqwest::Client>,
redis_client_pool: Option<redis_cell_client::RedisClientPool>, redis_client_pool: Option<redis_cell_client::RedisClientPool>,
head_block_sender: Option<watch::Sender<Block<TxHash>>>, head_block_sender: Option<watch::Sender<Arc<Block<TxHash>>>>,
pending_tx_sender: Option<broadcast::Sender<TxState>>, pending_tx_sender: Option<broadcast::Sender<TxState>>,
pending_transactions: Arc<DashMap<TxHash, TxState>>, pending_transactions: Arc<DashMap<TxHash, TxState>>,
) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> { ) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (block_sender, block_receiver) = flume::unbounded(); let (block_sender, block_receiver) =
flume::unbounded::<(Arc<Block<H256>>, Arc<Web3Connection>)>();
let http_interval_sender = if http_client.is_some() { let http_interval_sender = if http_client.is_some() {
let (sender, receiver) = broadcast::channel(1); let (sender, receiver) = broadcast::channel(1);
@ -202,7 +246,7 @@ impl Web3Connections {
inner: connections, inner: connections,
synced_connections: ArcSwap::new(Arc::new(synced_connections)), synced_connections: ArcSwap::new(Arc::new(synced_connections)),
pending_transactions, pending_transactions,
block_map: Default::default(), chain: Default::default(),
}); });
let handle = { let handle = {
@ -314,8 +358,8 @@ impl Web3Connections {
async fn subscribe( async fn subscribe(
self: Arc<Self>, self: Arc<Self>,
pending_tx_id_receiver: flume::Receiver<(TxHash, Arc<Web3Connection>)>, pending_tx_id_receiver: flume::Receiver<(TxHash, Arc<Web3Connection>)>,
block_receiver: flume::Receiver<(Block<TxHash>, Arc<Web3Connection>)>, block_receiver: flume::Receiver<(Arc<Block<TxHash>>, Arc<Web3Connection>)>,
head_block_sender: Option<watch::Sender<Block<TxHash>>>, head_block_sender: Option<watch::Sender<Arc<Block<TxHash>>>>,
pending_tx_sender: Option<broadcast::Sender<TxState>>, pending_tx_sender: Option<broadcast::Sender<TxState>>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut futures = vec![]; let mut futures = vec![];
@ -377,14 +421,11 @@ impl Web3Connections {
Ok(()) Ok(())
} }
pub async fn get_block_hash(&self, num: U64) -> anyhow::Result<H256> { pub async fn get_block(&self, num: U64) -> anyhow::Result<Arc<Block<TxHash>>> {
// first, try to get the hash from our cache if let Some(block) = self.chain.get_block(&num) {
// TODO: move this cache to redis?
if let Some(hash) = self.block_map.get(&num.as_u64()) {
// for now, just return the first seen block. we actually want the winning block! // for now, just return the first seen block. we actually want the winning block!
return Ok(*hash.iter().next().unwrap()); // TODO: don't clone
return Ok(block);
unimplemented!("use petgraph to find the heaviest chain");
} }
let head_block_num = self.get_head_block_num(); let head_block_num = self.get_head_block_num();
@ -407,14 +448,24 @@ impl Web3Connections {
.try_send_best_upstream_server(request, Some(num)) .try_send_best_upstream_server(request, Some(num))
.await?; .await?;
let block = response.result.unwrap().to_string(); let block = response.result.unwrap();
let block: Block<TxHash> = serde_json::from_str(&block)?; let block: Block<TxHash> = serde_json::from_str(block.get())?;
let block = Arc::new(block);
self.chain.add_block(block.clone(), true);
Ok(block)
}
pub async fn get_block_hash(&self, num: U64) -> anyhow::Result<H256> {
// first, try to get the hash from our cache
// TODO: move this cache to redis?
let block = self.get_block(num).await?;
let hash = block.hash.unwrap(); let hash = block.hash.unwrap();
self.block_map.entry(num.as_u64()).or_default().insert(hash);
Ok(hash) Ok(hash)
} }
@ -507,8 +558,8 @@ impl Web3Connections {
// we don't instrument here because we put a span inside the while loop // we don't instrument here because we put a span inside the while loop
async fn update_synced_rpcs( async fn update_synced_rpcs(
&self, &self,
block_receiver: flume::Receiver<(Block<TxHash>, Arc<Web3Connection>)>, block_receiver: flume::Receiver<(Arc<Block<TxHash>>, Arc<Web3Connection>)>,
head_block_sender: watch::Sender<Block<TxHash>>, head_block_sender: watch::Sender<Arc<Block<TxHash>>>,
// TODO: use pending_tx_sender // TODO: use pending_tx_sender
pending_tx_sender: Option<broadcast::Sender<TxState>>, pending_tx_sender: Option<broadcast::Sender<TxState>>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
@ -546,6 +597,8 @@ impl Web3Connections {
warn!("still syncing"); warn!("still syncing");
} }
let mut new_head_block = false;
connection_states.insert(rpc.clone(), (new_block_num, new_block_hash)); connection_states.insert(rpc.clone(), (new_block_num, new_block_hash));
// TODO: do something to update the synced blocks // TODO: do something to update the synced blocks
@ -572,10 +625,10 @@ impl Web3Connections {
// TODO: mark all transactions as confirmed // TODO: mark all transactions as confirmed
// TODO: mark any orphaned transactions as unconfirmed // TODO: mark any orphaned transactions as unconfirmed
self.block_map // TODO: do not mark cannonical until a threshold of RPCs have this block!
.entry(new_block_num) new_head_block = true;
.or_default()
.insert(new_block_hash); self.chain.add_block(new_block.clone(), new_head_block);
} }
cmp::Ordering::Equal => { cmp::Ordering::Equal => {
if new_block_hash == pending_synced_connections.head_block_hash { if new_block_hash == pending_synced_connections.head_block_hash {
@ -587,14 +640,10 @@ impl Web3Connections {
} else { } else {
// same height, but different chain // same height, but different chain
self.block_map
.entry(new_block_num)
.or_default()
.insert(new_block_hash);
// check connection_states to see which head block is more popular! // check connection_states to see which head block is more popular!
let mut rpc_ids_by_block: BTreeMap<H256, Vec<Arc<Web3Connection>>> = // TODO: i don't think btreemap is what we want. i think we want indexmap or linkedhashmap
BTreeMap::new(); let mut rpc_ids_by_block =
IndexMap::<H256, Vec<Arc<Web3Connection>>>::new();
let mut counted_rpcs = 0; let mut counted_rpcs = 0;
@ -630,11 +679,11 @@ impl Web3Connections {
most_common_head_hash most_common_head_hash
); );
self.chain
.add_block(new_block.clone(), new_block_hash == most_common_head_hash);
// TODO: do this more efficiently? // TODO: do this more efficiently?
if pending_synced_connections.head_block_hash != most_common_head_hash { if pending_synced_connections.head_block_hash != most_common_head_hash {
head_block_sender
.send(new_block.clone())
.context("head_block_sender")?;
pending_synced_connections.head_block_hash = most_common_head_hash; pending_synced_connections.head_block_hash = most_common_head_hash;
} }
@ -687,6 +736,15 @@ impl Web3Connections {
// TODO: do this before or after processing all the transactions in this block? // TODO: do this before or after processing all the transactions in this block?
self.synced_connections self.synced_connections
.swap(Arc::new(pending_synced_connections.clone())); .swap(Arc::new(pending_synced_connections.clone()));
if new_head_block {
// TODO: this will need a refactor to only send once a minmum threshold has this block
// TODO: move this onto self.chain
// TODO: pending_synced_connections isn't published yet. which means fast queries using this block will fail
head_block_sender
.send(new_block.clone())
.context("head_block_sender")?;
}
} }
// TODO: if there was an error, we should return it // TODO: if there was an error, we should return it