From 7705991b4c94d665da77142d9aa8962e0425e80a Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 25 Apr 2022 19:14:10 +0000 Subject: [PATCH] watch new heads --- Cargo.lock | 125 ++++++++++++++++++++++++++++++++++++++++++++++++- Cargo.toml | 23 +++++---- TODO.md | 3 ++ src/main.rs | 131 ++++++++++++++++++++++++++++++++++++++++++++-------- 4 files changed, 252 insertions(+), 30 deletions(-) create mode 100644 TODO.md diff --git a/Cargo.lock b/Cargo.lock index 0d569783..110661aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,24 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "aho-corasick" +version = "0.7.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f" +dependencies = [ + "memchr", +] + +[[package]] +name = "ansi_term" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" +dependencies = [ + "winapi", +] + [[package]] name = "anyhow" version = "1.0.57" @@ -931,6 +949,23 @@ dependencies = [ "bitflags", ] +[[package]] +name = "regex" +version = "1.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.6.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" + [[package]] name = "remove_dir_all" version = "0.5.3" @@ -1086,6 +1121,15 @@ dependencies = [ "digest 0.10.3", ] +[[package]] +name = "sharded-slab" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" +dependencies = [ + "lazy_static", +] + [[package]] name = "signal-hook-registry" version = "1.4.0" @@ -1162,6 +1206,15 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5516c27b78311c50bf42c071425c560ac799b11c30b31f87e3081965fe5e0180" +dependencies = [ + "once_cell", +] + [[package]] name = "tinyvec" version = "1.5.1" @@ -1239,7 +1292,19 @@ dependencies = [ "log", "pin-project", "tokio", - "tungstenite", + "tungstenite 0.14.0", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06cda1232a49558c46f8a504d5b93101d42c0bf7f911f12a105ba48168f821ae" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.17.2", ] [[package]] @@ -1307,6 +1372,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f54c8ca710e81886d498c2fd3331b56c93aa248d49de2222ad2742247c60072f" dependencies = [ "lazy_static", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" +dependencies = [ + "lazy_static", + "log", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bc28f93baff38037f64e6f43d34cfa1605f27a49c34e8a04c5e78b0babf2596" +dependencies = [ + "ansi_term", + "sharded-slab", + "smallvec", + "thread_local", + "tracing-core", + "tracing-log", ] [[package]] @@ -1334,6 +1425,25 @@ dependencies = [ "utf-8", ] +[[package]] +name = "tungstenite" +version = "0.17.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96a2dea40e7570482f28eb57afbe42d97551905da6a9400acc5c328d24004f5" +dependencies = [ + "base64", + "byteorder", + "bytes", + "http", + "httparse", + "log", + "rand", + "sha-1 0.10.0", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "twoway" version = "0.1.8" @@ -1403,6 +1513,12 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "vcpkg" version = "0.2.15" @@ -1449,7 +1565,7 @@ dependencies = [ "serde_urlencoded", "tokio", "tokio-stream", - "tokio-tungstenite", + "tokio-tungstenite 0.15.0", "tokio-util 0.6.9", "tower-service", "tracing", @@ -1553,9 +1669,14 @@ dependencies = [ "dashmap", "futures", "governor", + "regex", "reqwest", "serde_json", "tokio", + "tokio-tungstenite 0.17.1", + "tracing", + "tracing-subscriber", + "url", "warp", ] diff --git a/Cargo.toml b/Cargo.toml index e0738822..737fda48 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,13 +6,18 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -argh = "0.1" -anyhow = "1.0" +argh = "0.1.7" +anyhow = "1.0.57" atomic-counter = "1.0.1" -dashmap = "5.2" -futures = { version = "0.3.0", features = ["thread-pool"]} -governor = { version = "0.4.2", features = ["dashmap", "std"]} -tokio = { version = "1", features = ["full"] } -reqwest = { version = "0.11", features = ["json"] } -serde_json = { version = "1.0", default-features = false, features = ["alloc"] } -warp = "0.3" +dashmap = "5.2.0" +futures = { version = "0.3.21", features = ["thread-pool"] } +governor = { version = "0.4.2", features = ["dashmap", "std"] } +tokio = { version = "1.17.0", features = ["full"] } +regex = "1.5.5" +reqwest = { version = "0.11.10", features = ["json"] } +serde_json = { version = "1.0.79", default-features = false, features = ["alloc"] } +tokio-tungstenite = "0.17.1" +tracing = "0.1" +tracing-subscriber = "0.3" +url = "2.2.2" +warp = "0.3.2" diff --git a/TODO.md b/TODO.md new file mode 100644 index 00000000..b33d2942 --- /dev/null +++ b/TODO.md @@ -0,0 +1,3 @@ +# Todo + +- [ ] tarpit ratelimiting at the start, but reject if incoming requests is super high diff --git a/src/main.rs b/src/main.rs index 20ae069a..aabce56b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,13 +1,19 @@ use dashmap::DashMap; use futures::future; +use futures::future::{AbortHandle, Abortable}; +use futures::SinkExt; +use futures::StreamExt; use governor::clock::{Clock, QuantaClock, QuantaInstant}; use governor::middleware::NoOpMiddleware; use governor::state::{InMemoryState, NotKeyed}; use governor::{NotUntil, RateLimiter}; +use regex::Regex; use std::num::NonZeroU32; use std::sync::Arc; +use std::time::Duration; use tokio::sync::RwLock; use tokio::time::sleep; +use tokio_tungstenite::{connect_async, tungstenite}; use warp::Filter; type RateLimiterMap = DashMap; @@ -21,6 +27,25 @@ struct BalancedRpcs { rpcs: RwLock>, connections: ConnectionsMap, ratelimits: RateLimiterMap, + new_heads_handles: Vec, +} + +impl Drop for BalancedRpcs { + fn drop(&mut self) { + for handle in self.new_heads_handles.iter() { + handle.abort(); + } + } +} + +async fn handle_new_head_message(message: tungstenite::Message) -> anyhow::Result<()> { + // TODO: move this to a "handle_new_head_message" function so that we can use the ? helper + let data: serde_json::Value = serde_json::from_str(message.to_text().unwrap()).unwrap(); + + // TODO: parse the message as json and get out the block data. then update a map for this rpc + println!("now what? {:?}", data); + + unimplemented!(); } impl BalancedRpcs { @@ -42,13 +67,73 @@ impl BalancedRpcs { } } + // TODO: subscribe to new_heads + let new_heads_handles = rpcs + .clone() + .into_iter() + .map(|rpc| { + // start the subscription inside an abort handler. this way, dropping this BalancedRpcs will close these connections + let (abort_handle, abort_registration) = AbortHandle::new_pair(); + + tokio::spawn(Abortable::new( + async move { + // replace "http" at the start with "ws" + // TODO: this is fragile. some nodes use different ports, too. use proper config + // TODO: maybe we should use this websocket for more than just the new heads subscription. we could send all our requests over it (but would need to modify ids) + let re = Regex::new("^http").expect("bad regex"); + let ws_rpc = re.replace(&rpc, "ws"); + + // TODO: if websocket not supported, use polling? + let ws_rpc = url::Url::parse(&ws_rpc).expect("invalid websocket url"); + + // loop so that if it disconnects, we reconnect + loop { + match connect_async(&ws_rpc).await { + Ok((ws_stream, _)) => { + let (mut write, mut read) = ws_stream.split(); + + // TODO: send eth_subscribe New Heads + if (write.send(tungstenite::Message::Text("{\"id\": 1, \"method\": \"eth_subscribe\", \"params\": [\"newHeads\"]}".to_string())).await).is_ok() { + if let Some(Ok(_first)) = read.next().await { + // TODO: what should we do with the first message? + + while let Some(Ok(message)) = read.next().await { + if let Err(e) = handle_new_head_message(message).await { + eprintln!("error handling new head message @ {}: {}", ws_rpc, e); + break; + } + } + } + // no more messages or we got an error + } + } + Err(e) => { + // TODO: proper logging + eprintln!("error connecting to websocket @ {}: {}", ws_rpc, e); + } + } + + // TODO: log that we are going to reconnectto ws_rpc in 1 second + // TODO: how long should we wait? exponential backoff? + sleep(Duration::from_secs(1)).await; + } + }, + abort_registration, + )); + + abort_handle + }) + .collect(); + BalancedRpcs { rpcs: RwLock::new(rpcs), connections, ratelimits, + new_heads_handles, } } + /// get the best available rpc server async fn get_upstream_server(&self) -> Result> { let mut balanced_rpcs = self.rpcs.write().await; @@ -95,7 +180,7 @@ impl BalancedRpcs { // return the smallest not_until if let Some(not_until) = earliest_not_until { - return Err(not_until); + Err(not_until) } else { unimplemented!(); } @@ -131,6 +216,7 @@ impl LoudRpcs { LoudRpcs { rpcs, ratelimits } } + /// get all available rpc servers async fn get_upstream_servers(&self) -> Result, NotUntil> { let mut earliest_not_until = None; @@ -160,24 +246,24 @@ impl LoudRpcs { } }; - // return the selected RPC + // this is rpc should work selected_rpcs.push(selected_rpc.clone()); } - if selected_rpcs.len() > 0 { + if !selected_rpcs.is_empty() { return Ok(selected_rpcs); } // return the earliest not_until if let Some(not_until) = earliest_not_until { - return Err(not_until); + Err(not_until) } else { panic!("i don't think this should happen") } } fn as_bool(&self) -> bool { - self.rpcs.len() > 0 + !self.rpcs.is_empty() } } @@ -199,15 +285,19 @@ impl Web3ProxyState { ) -> Web3ProxyState { let clock = QuantaClock::default(); + let balanced_rpc_tiers = balanced_rpc_tiers + .into_iter() + .map(|servers| BalancedRpcs::new(servers, &clock)) + .collect(); + + let private_rpcs = LoudRpcs::new(private_rpcs, &clock); + // TODO: warn if no private relays Web3ProxyState { - clock: clock.clone(), + clock, client: reqwest::Client::new(), - balanced_rpc_tiers: balanced_rpc_tiers - .into_iter() - .map(|servers| BalancedRpcs::new(servers, &clock)) - .collect(), - private_rpcs: LoudRpcs::new(private_rpcs, &clock), + balanced_rpc_tiers, + private_rpcs, balanced_rpc_ratelimiter_lock: Default::default(), private_rpcs_ratelimiter_lock: Default::default(), } @@ -237,6 +327,7 @@ impl Web3ProxyState { } } Err(not_until) => { + // TODO: move this to a helper function // sleep (with a lock) until our rate limits should be available drop(read_lock); @@ -291,6 +382,7 @@ impl Web3ProxyState { } // we haven't returned an Ok, sleep and try again + // TODO: move this to a helper function drop(read_lock); let write_lock = self.balanced_rpc_ratelimiter_lock.write().await; @@ -315,18 +407,19 @@ impl Web3ProxyState { let client = self.client.clone(); let json_body = json_body.clone(); tokio::spawn(async move { - // TODO: there has to be a better way to do this map and map_err + // TODO: there has to be a better way to attach the url to the result client .post(&url) .json(&json_body) .send() .await - // add the url to the error so that we can decrement + // add the url to the error so that we can reduce connection counters .map_err(|e| (url.clone(), e))? .text() .await - // add the url to the result and the error so that we can decrement + // add the url to the result so that we can reduce connection counters .map(|t| (url.clone(), t)) + // add the url to the error so that we can reduce connection counters .map_err(|e| (url, e)) }) })) @@ -345,7 +438,7 @@ impl Web3ProxyState { *connections.get_mut(&url).unwrap() -= 1; } - // TODO: if "no block with that header", skip this response (maybe retry) + // TODO: if "no block with that header" or some other jsonrpc errors, skip this response oks.push(b); } Ok(Err((url, e))) => { @@ -367,10 +460,10 @@ impl Web3ProxyState { } // TODO: which response should we use? - if oks.len() > 0 { - return Ok(oks.pop().unwrap()); - } else if errs.len() > 0 { - return Err(errs.pop().unwrap()); + if !oks.is_empty() { + Ok(oks.pop().unwrap()) + } else if !errs.is_empty() { + Err(errs.pop().unwrap()) } else { return Err(anyhow::anyhow!("no successful responses")); }