diff --git a/src/block_watcher.rs b/src/block_watcher.rs
new file mode 100644
index 00000000..f50fc2c8
--- /dev/null
+++ b/src/block_watcher.rs
@@ -0,0 +1,85 @@
+use ethers::prelude::{Block, TxHash};
+use governor::clock::{Clock, QuantaClock, QuantaInstant};
+use std::collections::HashMap;
+use tokio::sync::mpsc;
+use tracing::{info, warn};
+
+// TODO: what type for the Item? String url works, but i don't love it
+// TODO: think about boxing this
+#[derive(Debug)]
+pub enum BlockWatcherItem {
+    NewHead((String, Block<TxHash>)),
+    SubscribeHttp(String),
+    Interval,
+}
+
+pub type BlockWatcherSender = mpsc::UnboundedSender<BlockWatcherItem>;
+pub type BlockWatcherReceiver = mpsc::UnboundedReceiver<BlockWatcherItem>;
+
+pub struct BlockWatcher {
+    clock: QuantaClock,
+    receiver: BlockWatcherReceiver,
+    last_update: QuantaInstant,
+    /// TODO: i don't think we want a hashmap. we want a left-right or some other concurrent map
+    blocks: HashMap<String, Block<TxHash>>,
+}
+
+impl BlockWatcher {
+    pub fn new(clock: QuantaClock) -> (BlockWatcher, BlockWatcherSender) {
+        // TODO: this also needs to return a reader for blocks
+        let (sender, receiver) = mpsc::unbounded_channel();
+
+        let last_update = clock.now();
+
+        let watcher = Self {
+            clock,
+            last_update,
+            receiver,
+            blocks: Default::default(),
+        };
+
+        (watcher, sender)
+    }
+
+    pub async fn run(&mut self) -> anyhow::Result<()> {
+        // TODO:
+
+        while let Some(x) = self.receiver.recv().await {
+            match x {
+                BlockWatcherItem::Interval => {
+                    // TODO: we got an interval. if we haven't updated the blocks recently,
+                }
+                BlockWatcherItem::NewHead((rpc, block)) => {
+                    info!(
+                        "{:?} = {} Ts: {:?}, block number: {}",
+                        block.hash.unwrap(),
+                        rpc,
+                        block.timestamp,
+                        block.number.unwrap(),
+                    );
+                    self.blocks.insert(rpc, block);
+
+                    self.last_update = self.clock.now();
+                }
+                BlockWatcherItem::SubscribeHttp(rpc) => {
+                    warn!("subscribing to {} is not yet supported", rpc);
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
+/*
+pub async fn save_block(
+    &self,
+    url: String,
+    block_watcher_sender: BlockWatcherSender,
+    block: Block<TxHash>,
+) -> anyhow::Result<()> {
+    // TODO: include the block age (compared to local time) in this, too
+    // TODO: use tracing properly
+
+}
+*/
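For context, the new watcher is driven entirely through its unbounded channel: `BlockWatcher::new` returns the watcher plus a cloneable `BlockWatcherSender`, and `run` drains the receive side. A minimal wiring sketch (the url and the `Default::default()` block are placeholders for illustration, not values from this patch):

```rust
use governor::clock::QuantaClock;

use crate::block_watcher::{BlockWatcher, BlockWatcherItem};

async fn wire_up_block_watcher() {
    let clock = QuantaClock::default();

    // the watcher owns the receiver; every subscriber gets a sender clone
    let (mut watcher, sender) = BlockWatcher::new(clock);

    // run the event loop in its own task, like Web3ProxyApp::try_new does below
    tokio::spawn(async move { watcher.run().await });

    // any provider task can now report a new head for its url
    sender
        .send(BlockWatcherItem::NewHead((
            "ws://127.0.0.1:8545".to_string(), // placeholder url
            Default::default(),                // placeholder Block<TxHash>
        )))
        .unwrap();
}
```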
diff --git a/src/main.rs b/src/main.rs
index f2f0fd2f..6c16947c 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,24 +1,26 @@
 // TODO: don't use RwLock. i think we need a concurrent hashmap or we will hit all sorts of deadlocks
 
+mod block_watcher;
+mod provider;
+
-use derive_more::From;
-use ethers::prelude::{Block, TxHash};
-use ethers::providers::Middleware;
 use futures::future;
-use futures::StreamExt;
 use governor::clock::{Clock, QuantaClock, QuantaInstant};
 use governor::middleware::NoOpMiddleware;
 use governor::state::{InMemoryState, NotKeyed};
-use governor::{NotUntil, RateLimiter};
-use std::cmp::Ordering;
+use governor::NotUntil;
+use governor::RateLimiter;
 use std::collections::HashMap;
 use std::num::NonZeroU32;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::{mpsc, RwLock};
 use tokio::time::sleep;
-use tracing::info;
+use tracing::log::warn;
 use warp::Filter;
 
+// use crate::types::{BlockMap, ConnectionsMap, RpcRateLimiterMap};
+use crate::block_watcher::{BlockWatcher, BlockWatcherSender};
+use crate::provider::Web3Connection;
 
 static APP_USER_AGENT: &str = concat!(
     "satoshiandkin/",
     env!("CARGO_PKG_NAME"),
@@ -29,173 +31,24 @@ static APP_USER_AGENT: &str = concat!(
 
 type RpcRateLimiter = RateLimiter<NotKeyed, InMemoryState, QuantaClock, NoOpMiddleware<QuantaInstant>>;
 
-type BlockMap = RwLock<HashMap<String, Block<TxHash>>>;
-type RateLimiterMap = RwLock<HashMap<String, RpcRateLimiter>>;
-// TODO: include the ethers client on this map
-type ConnectionsMap = RwLock<HashMap<String, EthersConnection>>;
-
-// TODO: instead of an enum, I tried to use Box<dyn>, but hit https://github.com/gakonst/ethers-rs/issues/592
-#[derive(From)]
-enum EthersProvider {
-    Http(ethers::providers::Provider<ethers::providers::Http>),
-    Ws(ethers::providers::Provider<ethers::providers::Ws>),
-}
-
-/// Forward functions to the inner ethers::providers::Provider
-impl EthersProvider {
-    /// Send a web3 request
-    pub async fn request(
-        &self,
-        method: &str,
-        params: serde_json::Value,
-    ) -> Result<serde_json::Value, ethers::prelude::ProviderError> {
-        match self {
-            Self::Http(provider) => provider.request(method, params).await,
-            Self::Ws(provider) => provider.request(method, params).await,
-        }
-    }
-
-    /// Subscribe to new block heads
-    pub async fn new_heads(&self, url: String, blocks: Arc<BlockMap>) -> anyhow::Result<()> {
-        // TODO: automatically reconnect
-        match &self {
-            EthersProvider::Http(provider) => {
-                let mut stream = provider.watch_blocks().await?;
-                while let Some(block_number) = stream.next().await {
-                    let block = provider.get_block(block_number).await?.unwrap();
-
-                    println!(
-                        "{:?} = {} Ts: {:?}, block number: {}",
-                        block.hash.unwrap(),
-                        url,
-                        block.timestamp,
-                        block.number.unwrap(),
-                    );
-
-                    let mut blocks = blocks.write().await;
-
-                    blocks.insert(url.clone(), block);
-                }
-            }
-            EthersProvider::Ws(provider) => {
-                let mut stream = provider.subscribe_blocks().await?;
-                while let Some(block) = stream.next().await {
-                    // TODO: save the block into a dashmap on
-                    println!(
-                        "{:?} = {} Ts: {:?}, block number: {}",
-                        block.hash.unwrap(),
-                        url,
-                        block.timestamp,
-                        block.number.unwrap(),
-                    );
-
-                    let mut blocks = blocks.write().await;
-
-                    blocks.insert(url.clone(), block);
-                }
-            }
-        }
-
-        Ok(())
-    }
-}
-
-/// An active connection to a Web3Rpc
-struct EthersConnection {
-    /// keep track of currently open requests. We sort on this
-    active_requests: u32,
-    provider: Arc<EthersProvider>,
-}
-
-impl EthersConnection {
-    /// Connect to a web3 rpc and subscribe to new heads
-    async fn try_new(
-        url_str: String,
-        http_client: Option<reqwest::Client>,
-        blocks: Arc<BlockMap>,
-    ) -> anyhow::Result<EthersConnection> {
-        // TODO: create an ethers-rs rpc client and subscribe/watch new heads in a spawned task
-        let provider = if url_str.starts_with("http") {
-            let url: url::Url = url_str.parse()?;
-
-            let http_client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?;
-
-            let provider = ethers::providers::Http::new_with_client(url, http_client);
-
-            // TODO: dry this up
-            ethers::providers::Provider::new(provider)
-                .interval(Duration::from_secs(1))
-                .into()
-        } else if url_str.starts_with("ws") {
-            let provider = ethers::providers::Ws::connect(url_str.clone()).await?;
-
-            // TODO: make sure this survives disconnects
-
-            // TODO: dry this up
-            ethers::providers::Provider::new(provider)
-                .interval(Duration::from_secs(1))
-                .into()
-        } else {
-            return Err(anyhow::anyhow!("only http and ws servers are supported"));
-        };
-
-        let provider = Arc::new(provider);
-
-        // subscribe to new heads in a spawned future
-        // TODO: if http, maybe we should check them all on the same interval. and if there is at least one websocket, use that message to start check?
-        let provider_clone: Arc<EthersProvider> = Arc::clone(&provider);
-        tokio::spawn(async move { provider_clone.new_heads(url_str, blocks).await });
-
-        Ok(EthersConnection {
-            active_requests: 0,
-            provider,
-        })
-    }
-
-    fn inc_active_requests(&mut self) {
-        self.active_requests += 1;
-    }
-
-    fn dec_active_requests(&mut self) {
-        self.active_requests -= 1;
-    }
-}
-
-impl Eq for EthersConnection {}
-
-impl Ord for EthersConnection {
-    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
-        self.active_requests.cmp(&other.active_requests)
-    }
-}
-
-impl PartialOrd for EthersConnection {
-    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
-        Some(self.cmp(other))
-    }
-}
-
-/// note that this is just comparing the active requests. two providers with different rpc urls are equal!
-impl PartialEq for EthersConnection {
-    fn eq(&self, other: &Self) -> bool {
-        self.active_requests == other.active_requests
-    }
-}
+type RpcRateLimiterMap = RwLock<HashMap<String, RpcRateLimiter>>;
+type ConnectionsMap = RwLock<HashMap<String, Web3Connection>>;
 
 /// Load balance to the rpc
+/// TODO: i'm not sure about having 3 locks here. can we share them?
 struct RpcTier {
-    /// RPC urls sorted by
-    /// TODO: what type?
+    /// RPC urls sorted by active requests
+    /// TODO: what type for the rpc?
     rpcs: RwLock<Vec<String>>,
     connections: Arc<ConnectionsMap>,
-    ratelimits: RateLimiterMap,
+    ratelimits: RpcRateLimiterMap,
 }
 
 impl RpcTier {
     async fn try_new(
         servers: Vec<(&str, u32)>,
         http_client: Option<reqwest::Client>,
-        blocks: Arc<BlockMap>,
+        block_watcher_sender: BlockWatcherSender,
         clock: &QuantaClock,
     ) -> anyhow::Result<RpcTier> {
        let mut rpcs: Vec<String> = vec![];
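`RpcRateLimiterMap` keeps one governor limiter per rpc url. For reference, a direct (`NotKeyed`) limiter with exactly the `RpcRateLimiter` type above comes from `RateLimiter::direct_with_clock`; the quota here is an arbitrary example, not a value from this patch:

```rust
use std::num::NonZeroU32;

use governor::clock::{Clock, QuantaClock};
use governor::{Quota, RateLimiter};

fn example_limiter(clock: &QuantaClock) {
    // NotKeyed + InMemoryState + QuantaClock + NoOpMiddleware, i.e. RpcRateLimiter
    let limiter = RateLimiter::direct_with_clock(
        Quota::per_second(NonZeroU32::new(100).unwrap()), // example quota
        clock,
    );

    match limiter.check() {
        Ok(()) => { /* under the limit: forward the request */ }
        Err(not_until) => {
            // NotUntil reports how long until a retry can succeed
            let _wait = not_until.wait_time_from(clock.now());
        }
    }
}
```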
@@ -205,9 +58,12 @@ impl RpcTier {
         for (s, limit) in servers.into_iter() {
             rpcs.push(s.to_string());
 
-            let connection =
-                EthersConnection::try_new(s.to_string(), http_client.clone(), blocks.clone())
-                    .await?;
+            let connection = Web3Connection::try_new(
+                s.to_string(),
+                http_client.clone(),
+                block_watcher_sender.clone(),
+            )
+            .await?;
 
             connections.insert(s.to_string(), connection);
 
@@ -220,65 +76,6 @@ impl RpcTier {
             }
         }
 
-        /*
-        let new_heads_handles = rpcs
-            .clone()
-            .into_iter()
-            .map(|rpc| {
-                // start the subscription inside an abort handler. this way, dropping this BalancedRpcs will close these connections
-                let (abort_handle, abort_registration) = AbortHandle::new_pair();
-
-                tokio::spawn(Abortable::new(
-                    async move {
-                        // replace "http" at the start with "ws"
-                        // TODO: this is fragile. some nodes use different ports, too. use proper config
-                        // TODO: maybe we should use this websocket for more than just the new heads subscription. we could send all our requests over it (but would need to modify ids)
-                        let re = Regex::new("^http").expect("bad regex");
-                        let ws_rpc = re.replace(&rpc, "ws");
-
-                        // TODO: if websocket not supported, use polling?
-                        let ws_rpc = url::Url::parse(&ws_rpc).expect("invalid websocket url");
-
-                        // loop so that if it disconnects, we reconnect
-                        loop {
-                            match connect_async(&ws_rpc).await {
-                                Ok((ws_stream, _)) => {
-                                    let (mut write, mut read) = ws_stream.split();
-
-                                    // TODO: send eth_subscribe New Heads
-                                    if (write.send(tungstenite::Message::Text("{\"id\": 1, \"method\": \"eth_subscribe\", \"params\": [\"newHeads\"]}".to_string())).await).is_ok() {
-                                        if let Some(Ok(_first)) = read.next().await {
-                                            // TODO: what should we do with the first message?
-
-                                            while let Some(Ok(message)) = read.next().await {
-                                                if let Err(e) = handle_new_head_message(message).await {
-                                                    eprintln!("error handling new head message @ {}: {}", ws_rpc, e);
-                                                    break;
-                                                }
-                                            }
-                                        }
-                                        // no more messages or we got an error
-                                    }
-                                }
-                                Err(e) => {
-                                    // TODO: proper logging
-                                    eprintln!("error connecting to websocket @ {}: {}", ws_rpc, e);
-                                }
-                            }
-
-                            // TODO: log that we are going to reconnect to ws_rpc in 1 second
-                            // TODO: how long should we wait? exponential backoff?
-                            sleep(Duration::from_secs(1)).await;
-                        }
-                    },
-                    abort_registration,
-                ));
-
-                abort_handle
-            })
-            .collect();
-        */
-
         Ok(RpcTier {
             rpcs: RwLock::new(rpcs),
             connections: Arc::new(RwLock::new(connections)),
@@ -408,8 +205,8 @@ impl RpcTier {
     }
 }
 
-/// Application state
-struct Web3ProxyState {
+/// The application
+struct Web3ProxyApp {
     /// clock used for rate limiting
     /// TODO: use tokio's clock (will require a different ratelimiting crate)
     clock: QuantaClock,
@@ -422,28 +219,32 @@
     private_rpcs_ratelimiter_lock: RwLock<()>,
 }
 
-impl Web3ProxyState {
+impl Web3ProxyApp {
     async fn try_new(
         balanced_rpc_tiers: Vec<Vec<(&str, u32)>>,
         private_rpcs: Vec<(&str, u32)>,
-    ) -> anyhow::Result<Web3ProxyState> {
+    ) -> anyhow::Result<Web3ProxyApp> {
         let clock = QuantaClock::default();
 
-        let blocks = Arc::new(BlockMap::default());
+        let (mut block_watcher, block_watcher_sender) = BlockWatcher::new(clock.clone());
 
+        // make a shared http client
+        // TODO: how should we configure the connection pool?
        // TODO: 5 minutes is probably long enough. unlimited is a bad idea if something
         let http_client = reqwest::ClientBuilder::new()
             .timeout(Duration::from_secs(300))
             .user_agent(APP_USER_AGENT)
             .build()?;
 
-        // TODO: i'm sure we s
+        // start the block_watcher
+        tokio::spawn(async move { block_watcher.run().await });
+
         let balanced_rpc_tiers = Arc::new(
             future::join_all(balanced_rpc_tiers.into_iter().map(|balanced_rpc_tier| {
                 RpcTier::try_new(
                     balanced_rpc_tier,
                     Some(http_client.clone()),
-                    blocks.clone(),
+                    block_watcher_sender.clone(),
                     &clock,
                 )
             }))
@@ -456,12 +257,18 @@
             None
         } else {
             Some(Arc::new(
-                RpcTier::try_new(private_rpcs, Some(http_client), blocks.clone(), &clock).await?,
+                RpcTier::try_new(
+                    private_rpcs,
+                    Some(http_client),
+                    block_watcher_sender,
+                    &clock,
+                )
+                .await?,
             ))
         };
 
         // TODO: warn if no private relays
-        Ok(Web3ProxyState {
+        Ok(Web3ProxyApp {
             clock,
             balanced_rpc_tiers,
             private_rpcs,
@@ -473,7 +280,7 @@ impl Web3ProxyState {
     /// send the request to the appropriate RPCs
     /// TODO: dry this up
     async fn proxy_web3_rpc(
-        self: Arc<Web3ProxyState>,
+        self: Arc<Web3ProxyApp>,
         json_body: serde_json::Value,
     ) -> anyhow::Result<impl warp::Reply> {
         let eth_send_raw_transaction =
@@ -635,7 +442,7 @@ impl Web3ProxyState {
             async move {
                 // get the client for this rpc server
-                let provider = connections.read().await.get(&rpc).unwrap().provider.clone();
+                let provider = connections.read().await.get(&rpc).unwrap().clone_provider();
 
                 let response = provider.request(&method, params).await;
 
@@ -648,6 +455,7 @@
 
         let mut response = response?;
 
+        // replace the id with what we originally received
         if let Some(response_id) = response.get_mut("id") {
             *response_id = incoming_id;
         }
@@ -669,24 +477,25 @@
                 Ok(_) => {}
                 Err(e) => {
                     // TODO: better errors
-                    eprintln!("Got a tokio::JoinError: {}", e);
-                    errs.push(anyhow::anyhow!("Got a tokio::JoinError"));
+                    warn!("Got an error sending request: {}", e);
+                    errs.push(e);
                 }
             }
         }
 
+        // take the most recent error (if any)
         let e: anyhow::Result<serde_json::Value> = if !errs.is_empty() {
             Err(errs.pop().unwrap())
         } else {
             Err(anyhow::anyhow!("no successful responses"))
         };
 
-        // TODO: think about this more. we want to send it
+        // send the error to the channel
         if tx.send(e).is_ok() {
             // if we were able to send an error, then we never sent a success
             return Err(anyhow::anyhow!("no successful responses"));
         } else {
-            // sending the error failed. the other side must be closed (which means we sent a success)
+            // if sending the error failed, the other side must be closed (which means we sent a success earlier)
             Ok(())
         }
     }
@@ -697,15 +506,13 @@ async fn main() {
     // install global collector configured based on RUST_LOG env var.
     tracing_subscriber::fmt::init();
 
-    info!("starting");
-
     // TODO: load the config from yaml instead of hard coding
     // TODO: support multiple chains in one process. then we could just point "chain.stytt.com" at this and caddy wouldn't need anything else
     // TODO: i kind of want to make use of caddy's load balancing and health checking and such though
     let listen_port = 8445;
 
     // TODO: be smart about using archive nodes?
-    let state = Web3ProxyState::try_new(
+    let state = Web3ProxyApp::try_new(
         vec![
             // local nodes
             vec![("ws://10.11.12.16:8545", 0), ("ws://10.11.12.16:8946", 0)],
@@ -726,7 +533,7 @@ async fn main() {
     .await
     .unwrap();
 
-    let state: Arc<Web3ProxyState> = Arc::new(state);
+    let state: Arc<Web3ProxyApp> = Arc::new(state);
 
     let proxy_rpc_filter = warp::any()
         .and(warp::post())
         .and(warp::body::json())
         .then(move |json_body| state.clone().proxy_web3_rpc(json_body))
         .map(handle_anyhow_errors);
 
-    println!("Listening on 0.0.0.0:{}", listen_port);
-
     warp::serve(proxy_rpc_filter)
         .run(([0, 0, 0, 0], listen_port))
         .await;
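For context on the error path above: every backend task tries to send its result over `tx`, the first success closes the channel for everyone else, and the collector only sends an error once all tasks have failed. A stripped-down sketch of that first-response-wins shape (the names and the fake rpc call are illustrative, not code from this patch):

```rust
use tokio::sync::mpsc;

async fn race_first_success() -> anyhow::Result<serde_json::Value> {
    let (tx, mut rx) = mpsc::unbounded_channel::<serde_json::Value>();

    for i in 0..3u32 {
        let tx = tx.clone();
        tokio::spawn(async move {
            // pretend this is a request to one backend rpc
            if let Ok(response) = fake_rpc_call(i).await {
                // after the first success the receiver is dropped, so
                // later sends fail; that is expected and ignored here
                let _ = tx.send(response);
            }
        });
    }
    // drop our copy so rx sees a closed channel if every task failed
    drop(tx);

    // first success wins; None means all the backends errored
    rx.recv()
        .await
        .ok_or_else(|| anyhow::anyhow!("no successful responses"))
}

async fn fake_rpc_call(i: u32) -> anyhow::Result<serde_json::Value> {
    Ok(serde_json::json!({ "id": i }))
}
```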
diff --git a/src/provider.rs b/src/provider.rs
new file mode 100644
index 00000000..e00d0904
--- /dev/null
+++ b/src/provider.rs
@@ -0,0 +1,163 @@
+use derive_more::From;
+use ethers::prelude::Middleware;
+use futures::StreamExt;
+use std::time::Duration;
+use std::{cmp::Ordering, sync::Arc};
+use tracing::{info, warn};
+
+use crate::block_watcher::{BlockWatcherItem, BlockWatcherSender};
+
+// TODO: instead of an enum, I tried to use Box<dyn>, but hit https://github.com/gakonst/ethers-rs/issues/592
+#[derive(From)]
+pub enum Web3Provider {
+    Http(ethers::providers::Provider<ethers::providers::Http>),
+    Ws(ethers::providers::Provider<ethers::providers::Ws>),
+}
+
+/// Forward functions to the inner ethers::providers::Provider
+impl Web3Provider {
+    /// Send a web3 request
+    pub async fn request(
+        &self,
+        method: &str,
+        params: serde_json::Value,
+    ) -> Result<serde_json::Value, ethers::prelude::ProviderError> {
+        match self {
+            Self::Http(provider) => provider.request(method, params).await,
+            Self::Ws(provider) => provider.request(method, params).await,
+        }
+    }
+
+    /// Subscribe to new blocks
+    pub async fn new_heads(
+        &self,
+        url: String,
+        block_watcher_sender: BlockWatcherSender,
+    ) -> anyhow::Result<()> {
+        info!("Watching new_heads from {}", url);
+
+        // TODO: automatically reconnect
+        match &self {
+            Web3Provider::Http(_provider) => {
+                /*
+                // TODO: not all providers have this. we need to write our interval checking
+                let mut stream = provider.watch_blocks().await?;
+                while let Some(block_number) = stream.next().await {
+                    let block = provider.get_block(block_number).await?.expect("no block");
+                    block_watcher_sender
+                        .send(Some((url.clone(), block)))
+                        .unwrap();
+                }
+                */
+                block_watcher_sender
+                    .send(BlockWatcherItem::SubscribeHttp(url.clone()))
+                    .unwrap();
+            }
+            Web3Provider::Ws(provider) => {
+                let mut stream = provider.subscribe_blocks().await?;
+                while let Some(block) = stream.next().await {
+                    block_watcher_sender
+                        .send(BlockWatcherItem::NewHead((url.clone(), block)))
+                        .unwrap();
+                }
+            }
+        }
+
+        info!("Done watching new_heads from {}", url);
+
+        Ok(())
+    }
+}
+
+/// An active connection to a Web3Rpc
+pub struct Web3Connection {
+    /// keep track of currently open requests. We sort on this
+    active_requests: u32,
+    provider: Arc<Web3Provider>,
+}
+
+impl Web3Connection {
+    pub fn clone_provider(&self) -> Arc<Web3Provider> {
+        self.provider.clone()
+    }
+
+    /// Connect to a web3 rpc and subscribe to new heads
+    pub async fn try_new(
+        url_str: String,
+        http_client: Option<reqwest::Client>,
+        block_watcher_sender: BlockWatcherSender,
+    ) -> anyhow::Result<Web3Connection> {
+        // TODO: create an ethers-rs rpc client and subscribe/watch new heads in a spawned task
+        let provider = if url_str.starts_with("http") {
+            let url: url::Url = url_str.parse()?;
+
+            let http_client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?;
+
+            let provider = ethers::providers::Http::new_with_client(url, http_client);
+
+            // TODO: dry this up
+            ethers::providers::Provider::new(provider)
+                .interval(Duration::from_secs(1))
+                .into()
+        } else if url_str.starts_with("ws") {
+            let provider = ethers::providers::Ws::connect(url_str.clone()).await?;
+
+            // TODO: make sure this survives disconnects
+
+            // TODO: dry this up
+            ethers::providers::Provider::new(provider)
+                .interval(Duration::from_secs(1))
+                .into()
+        } else {
+            return Err(anyhow::anyhow!("only http and ws servers are supported"));
+        };
+
+        let provider = Arc::new(provider);
+
+        // subscribe to new heads in a spawned future
+        // TODO: if http, maybe we should check them all on the same interval. and if there is at least one websocket, use that message to start check?
+        let provider_clone: Arc<Web3Provider> = Arc::clone(&provider);
+        tokio::spawn(async move {
+            while let Err(e) = provider_clone
+                .new_heads(url_str.clone(), block_watcher_sender.clone())
+                .await
+            {
+                warn!("new_heads error for {}: {:?}", url_str, e);
+            }
+        });
+
+        Ok(Web3Connection {
+            active_requests: 0,
+            provider,
+        })
+    }
+
+    pub fn inc_active_requests(&mut self) {
+        self.active_requests += 1;
+    }
+
+    pub fn dec_active_requests(&mut self) {
+        self.active_requests -= 1;
+    }
+}
+
+impl Eq for Web3Connection {}
+
+impl Ord for Web3Connection {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.active_requests.cmp(&other.active_requests)
+    }
+}
+
+impl PartialOrd for Web3Connection {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+/// note that this is just comparing the active requests. two providers with different rpc urls are equal!
+impl PartialEq for Web3Connection {
+    fn eq(&self, other: &Self) -> bool {
+        self.active_requests == other.active_requests
+    }
+}
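Since `Ord` deliberately looks at nothing but `active_requests`, picking the least busy backend is a plain `min`. A hypothetical helper, not part of this patch (the real `RpcTier` keeps a sorted url list behind its own lock instead), showing the intended use of these impls:

```rust
use std::collections::HashMap;

use crate::provider::Web3Connection;

// hypothetical helper: reserve a slot on the least-loaded connection
fn checkout(connections: &mut HashMap<String, Web3Connection>) -> Option<&mut Web3Connection> {
    // Ord compares active_requests, so `min` is the connection
    // with the fewest in-flight requests
    let conn = connections.values_mut().min()?;

    conn.inc_active_requests();

    Some(conn)
}
```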