watch new heads

This commit is contained in:
Bryan Stitt 2022-04-25 19:14:10 +00:00
parent ce4d92e40e
commit 7705991b4c
4 changed files with 252 additions and 30 deletions

125
Cargo.lock generated

@ -2,6 +2,24 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "aho-corasick"
version = "0.7.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f"
dependencies = [
"memchr",
]
[[package]]
name = "ansi_term"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2"
dependencies = [
"winapi",
]
[[package]]
name = "anyhow"
version = "1.0.57"
@ -931,6 +949,23 @@ dependencies = [
"bitflags",
]
[[package]]
name = "regex"
version = "1.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.6.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
[[package]]
name = "remove_dir_all"
version = "0.5.3"
@ -1086,6 +1121,15 @@ dependencies = [
"digest 0.10.3",
]
[[package]]
name = "sharded-slab"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31"
dependencies = [
"lazy_static",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.0"
@ -1162,6 +1206,15 @@ dependencies = [
"syn",
]
[[package]]
name = "thread_local"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5516c27b78311c50bf42c071425c560ac799b11c30b31f87e3081965fe5e0180"
dependencies = [
"once_cell",
]
[[package]]
name = "tinyvec"
version = "1.5.1"
@ -1239,7 +1292,19 @@ dependencies = [
"log",
"pin-project",
"tokio",
"tungstenite",
"tungstenite 0.14.0",
]
[[package]]
name = "tokio-tungstenite"
version = "0.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06cda1232a49558c46f8a504d5b93101d42c0bf7f911f12a105ba48168f821ae"
dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite 0.17.2",
]
[[package]]
@ -1307,6 +1372,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f54c8ca710e81886d498c2fd3331b56c93aa248d49de2222ad2742247c60072f"
dependencies = [
"lazy_static",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922"
dependencies = [
"lazy_static",
"log",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bc28f93baff38037f64e6f43d34cfa1605f27a49c34e8a04c5e78b0babf2596"
dependencies = [
"ansi_term",
"sharded-slab",
"smallvec",
"thread_local",
"tracing-core",
"tracing-log",
]
[[package]]
@ -1334,6 +1425,25 @@ dependencies = [
"utf-8",
]
[[package]]
name = "tungstenite"
version = "0.17.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d96a2dea40e7570482f28eb57afbe42d97551905da6a9400acc5c328d24004f5"
dependencies = [
"base64",
"byteorder",
"bytes",
"http",
"httparse",
"log",
"rand",
"sha-1 0.10.0",
"thiserror",
"url",
"utf-8",
]
[[package]]
name = "twoway"
version = "0.1.8"
@ -1403,6 +1513,12 @@ version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "valuable"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "vcpkg"
version = "0.2.15"
@ -1449,7 +1565,7 @@ dependencies = [
"serde_urlencoded",
"tokio",
"tokio-stream",
"tokio-tungstenite",
"tokio-tungstenite 0.15.0",
"tokio-util 0.6.9",
"tower-service",
"tracing",
@ -1553,9 +1669,14 @@ dependencies = [
"dashmap",
"futures",
"governor",
"regex",
"reqwest",
"serde_json",
"tokio",
"tokio-tungstenite 0.17.1",
"tracing",
"tracing-subscriber",
"url",
"warp",
]

@ -6,13 +6,18 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
argh = "0.1"
anyhow = "1.0"
argh = "0.1.7"
anyhow = "1.0.57"
atomic-counter = "1.0.1"
dashmap = "5.2"
futures = { version = "0.3.0", features = ["thread-pool"]}
governor = { version = "0.4.2", features = ["dashmap", "std"]}
tokio = { version = "1", features = ["full"] }
reqwest = { version = "0.11", features = ["json"] }
serde_json = { version = "1.0", default-features = false, features = ["alloc"] }
warp = "0.3"
dashmap = "5.2.0"
futures = { version = "0.3.21", features = ["thread-pool"] }
governor = { version = "0.4.2", features = ["dashmap", "std"] }
tokio = { version = "1.17.0", features = ["full"] }
regex = "1.5.5"
reqwest = { version = "0.11.10", features = ["json"] }
serde_json = { version = "1.0.79", default-features = false, features = ["alloc"] }
tokio-tungstenite = "0.17.1"
tracing = "0.1"
tracing-subscriber = "0.3"
url = "2.2.2"
warp = "0.3.2"

3
TODO.md Normal file

@ -0,0 +1,3 @@
# Todo
- [ ] tarpit ratelimiting at the start, but reject if incoming requests is super high

@ -1,13 +1,19 @@
use dashmap::DashMap;
use futures::future;
use futures::future::{AbortHandle, Abortable};
use futures::SinkExt;
use futures::StreamExt;
use governor::clock::{Clock, QuantaClock, QuantaInstant};
use governor::middleware::NoOpMiddleware;
use governor::state::{InMemoryState, NotKeyed};
use governor::{NotUntil, RateLimiter};
use regex::Regex;
use std::num::NonZeroU32;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::time::sleep;
use tokio_tungstenite::{connect_async, tungstenite};
use warp::Filter;
type RateLimiterMap = DashMap<String, RpcRateLimiter>;
@ -21,6 +27,25 @@ struct BalancedRpcs {
rpcs: RwLock<Vec<String>>,
connections: ConnectionsMap,
ratelimits: RateLimiterMap,
new_heads_handles: Vec<AbortHandle>,
}
impl Drop for BalancedRpcs {
fn drop(&mut self) {
for handle in self.new_heads_handles.iter() {
handle.abort();
}
}
}
async fn handle_new_head_message(message: tungstenite::Message) -> anyhow::Result<()> {
// TODO: move this to a "handle_new_head_message" function so that we can use the ? helper
let data: serde_json::Value = serde_json::from_str(message.to_text().unwrap()).unwrap();
// TODO: parse the message as json and get out the block data. then update a map for this rpc
println!("now what? {:?}", data);
unimplemented!();
}
impl BalancedRpcs {
@ -42,13 +67,73 @@ impl BalancedRpcs {
}
}
// TODO: subscribe to new_heads
let new_heads_handles = rpcs
.clone()
.into_iter()
.map(|rpc| {
// start the subscription inside an abort handler. this way, dropping this BalancedRpcs will close these connections
let (abort_handle, abort_registration) = AbortHandle::new_pair();
tokio::spawn(Abortable::new(
async move {
// replace "http" at the start with "ws"
// TODO: this is fragile. some nodes use different ports, too. use proper config
// TODO: maybe we should use this websocket for more than just the new heads subscription. we could send all our requests over it (but would need to modify ids)
let re = Regex::new("^http").expect("bad regex");
let ws_rpc = re.replace(&rpc, "ws");
// TODO: if websocket not supported, use polling?
let ws_rpc = url::Url::parse(&ws_rpc).expect("invalid websocket url");
// loop so that if it disconnects, we reconnect
loop {
match connect_async(&ws_rpc).await {
Ok((ws_stream, _)) => {
let (mut write, mut read) = ws_stream.split();
// TODO: send eth_subscribe New Heads
if (write.send(tungstenite::Message::Text("{\"id\": 1, \"method\": \"eth_subscribe\", \"params\": [\"newHeads\"]}".to_string())).await).is_ok() {
if let Some(Ok(_first)) = read.next().await {
// TODO: what should we do with the first message?
while let Some(Ok(message)) = read.next().await {
if let Err(e) = handle_new_head_message(message).await {
eprintln!("error handling new head message @ {}: {}", ws_rpc, e);
break;
}
}
}
// no more messages or we got an error
}
}
Err(e) => {
// TODO: proper logging
eprintln!("error connecting to websocket @ {}: {}", ws_rpc, e);
}
}
// TODO: log that we are going to reconnectto ws_rpc in 1 second
// TODO: how long should we wait? exponential backoff?
sleep(Duration::from_secs(1)).await;
}
},
abort_registration,
));
abort_handle
})
.collect();
BalancedRpcs {
rpcs: RwLock::new(rpcs),
connections,
ratelimits,
new_heads_handles,
}
}
/// get the best available rpc server
async fn get_upstream_server(&self) -> Result<String, NotUntil<QuantaInstant>> {
let mut balanced_rpcs = self.rpcs.write().await;
@ -95,7 +180,7 @@ impl BalancedRpcs {
// return the smallest not_until
if let Some(not_until) = earliest_not_until {
return Err(not_until);
Err(not_until)
} else {
unimplemented!();
}
@ -131,6 +216,7 @@ impl LoudRpcs {
LoudRpcs { rpcs, ratelimits }
}
/// get all available rpc servers
async fn get_upstream_servers(&self) -> Result<Vec<String>, NotUntil<QuantaInstant>> {
let mut earliest_not_until = None;
@ -160,24 +246,24 @@ impl LoudRpcs {
}
};
// return the selected RPC
// this is rpc should work
selected_rpcs.push(selected_rpc.clone());
}
if selected_rpcs.len() > 0 {
if !selected_rpcs.is_empty() {
return Ok(selected_rpcs);
}
// return the earliest not_until
if let Some(not_until) = earliest_not_until {
return Err(not_until);
Err(not_until)
} else {
panic!("i don't think this should happen")
}
}
fn as_bool(&self) -> bool {
self.rpcs.len() > 0
!self.rpcs.is_empty()
}
}
@ -199,15 +285,19 @@ impl Web3ProxyState {
) -> Web3ProxyState {
let clock = QuantaClock::default();
let balanced_rpc_tiers = balanced_rpc_tiers
.into_iter()
.map(|servers| BalancedRpcs::new(servers, &clock))
.collect();
let private_rpcs = LoudRpcs::new(private_rpcs, &clock);
// TODO: warn if no private relays
Web3ProxyState {
clock: clock.clone(),
clock,
client: reqwest::Client::new(),
balanced_rpc_tiers: balanced_rpc_tiers
.into_iter()
.map(|servers| BalancedRpcs::new(servers, &clock))
.collect(),
private_rpcs: LoudRpcs::new(private_rpcs, &clock),
balanced_rpc_tiers,
private_rpcs,
balanced_rpc_ratelimiter_lock: Default::default(),
private_rpcs_ratelimiter_lock: Default::default(),
}
@ -237,6 +327,7 @@ impl Web3ProxyState {
}
}
Err(not_until) => {
// TODO: move this to a helper function
// sleep (with a lock) until our rate limits should be available
drop(read_lock);
@ -291,6 +382,7 @@ impl Web3ProxyState {
}
// we haven't returned an Ok, sleep and try again
// TODO: move this to a helper function
drop(read_lock);
let write_lock = self.balanced_rpc_ratelimiter_lock.write().await;
@ -315,18 +407,19 @@ impl Web3ProxyState {
let client = self.client.clone();
let json_body = json_body.clone();
tokio::spawn(async move {
// TODO: there has to be a better way to do this map and map_err
// TODO: there has to be a better way to attach the url to the result
client
.post(&url)
.json(&json_body)
.send()
.await
// add the url to the error so that we can decrement
// add the url to the error so that we can reduce connection counters
.map_err(|e| (url.clone(), e))?
.text()
.await
// add the url to the result and the error so that we can decrement
// add the url to the result so that we can reduce connection counters
.map(|t| (url.clone(), t))
// add the url to the error so that we can reduce connection counters
.map_err(|e| (url, e))
})
}))
@ -345,7 +438,7 @@ impl Web3ProxyState {
*connections.get_mut(&url).unwrap() -= 1;
}
// TODO: if "no block with that header", skip this response (maybe retry)
// TODO: if "no block with that header" or some other jsonrpc errors, skip this response
oks.push(b);
}
Ok(Err((url, e))) => {
@ -367,10 +460,10 @@ impl Web3ProxyState {
}
// TODO: which response should we use?
if oks.len() > 0 {
return Ok(oks.pop().unwrap());
} else if errs.len() > 0 {
return Err(errs.pop().unwrap());
if !oks.is_empty() {
Ok(oks.pop().unwrap())
} else if !errs.is_empty() {
Err(errs.pop().unwrap())
} else {
return Err(anyhow::anyhow!("no successful responses"));
}