diff --git a/src/block_watcher.rs b/src/block_watcher.rs index cd1b2300..2da85614 100644 --- a/src/block_watcher.rs +++ b/src/block_watcher.rs @@ -17,15 +17,12 @@ pub type BlockWatcherReceiver = mpsc::UnboundedReceiver; pub struct BlockWatcher { sender: BlockWatcherSender, receiver: Mutex, - // TODO: i don't think we want a RwLock. we want an ArcSwap or something - // TODO: should we just store the block number? block_numbers: DashMap, head_block_number: AtomicU64, } impl BlockWatcher { pub fn new() -> Self { - // TODO: this also needs to return a reader for blocks let (sender, receiver) = mpsc::unbounded_channel(); Self { @@ -56,7 +53,6 @@ impl BlockWatcher { } cmp::Ordering::Less => { // allow being some behind - // TODO: why do we need a clone here? let lag = head_block_number - *rpc_block_number; Ok(lag <= allowed_lag) } @@ -88,7 +84,7 @@ impl BlockWatcher { .as_secs() as i64; // save the block for this rpc - // TODO:store the actual chain as a graph and then have self.blocks point to that? + // TODO: store the actual chain as a graph and then have self.blocks point to that? self.block_numbers.insert(rpc.clone(), new_block_number); let head_number = self.head_block_number.load(atomic::Ordering::SeqCst); @@ -112,7 +108,6 @@ impl BlockWatcher { "+".to_string() } cmp::Ordering::Less => { - // TODO: include how many blocks behind? let lag = new_block_number as i64 - head_number as i64; lag.to_string() } diff --git a/src/main.rs b/src/main.rs index f47dd52e..8e9eb81c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -46,7 +46,7 @@ impl Web3ProxyApp { // make a http shared client // TODO: how should we configure the connection pool? - // TODO: 5 minutes is probably long enough. unlimited is a bad idea if something + // TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server let http_client = reqwest::ClientBuilder::new() .timeout(Duration::from_secs(300)) .user_agent(APP_USER_AGENT) @@ -74,6 +74,7 @@ impl Web3ProxyApp { ); let private_rpcs = if private_rpcs.is_empty() { + warn!("No private relays configured. Any transactions will be broadcast to the public mempool!"); None } else { Some(Arc::new( @@ -87,7 +88,6 @@ impl Web3ProxyApp { )) }; - // TODO: warn if no private relays Ok(Web3ProxyApp { block_watcher, clock, @@ -160,7 +160,6 @@ impl Web3ProxyApp { // this is not a private transaction (or no private relays are configured) // try to send to each tier, stopping at the first success loop { - // TODO: i'm not positive that this locking is correct let read_lock = self.balanced_rpc_ratelimiter_lock.read().await; // there are multiple tiers. save the earliest not_until (if any). if we don't return, we will sleep until then and then try again @@ -204,7 +203,6 @@ impl Web3ProxyApp { if earliest_not_until.is_none() { earliest_not_until = Some(not_until); } else { - // TODO: do we need to unwrap this far? can we just compare the not_untils let earliest_possible = earliest_not_until.as_ref().unwrap().earliest_possible(); let new_earliest_possible = not_until.earliest_possible(); @@ -277,6 +275,8 @@ impl Web3ProxyApp { let mut response = response?; + // TODO: if "no block with that header" or some other jsonrpc errors, skip this response + // replace the id with what we originally received if let Some(response_id) = response.get_mut("id") { *response_id = incoming_id; @@ -284,7 +284,6 @@ impl Web3ProxyApp { // send the first good response to a one shot channel. that way we respond quickly // drop the result because errors are expected after the first send - // TODO: if "no block with that header" or some other jsonrpc errors, skip this response let _ = tx.send(Ok(response)); Ok::<(), anyhow::Error>(()) @@ -329,11 +328,10 @@ async fn main() { tracing_subscriber::fmt::init(); // TODO: load the config from yaml instead of hard coding - // TODO: support multiple chains in one process. then we could just point "chain.stytt.com" at this and caddy wouldn't need anything else - // TODO: i kind of want to make use of caddy's load balancing and health checking and such though + // TODO: support multiple chains in one process? then we could just point "chain.stytt.com" at this and caddy wouldn't need anything else + // TODO: be smart about about using archive nodes? have a set that doesn't use archive nodes since queries to them are more valuable let listen_port = 8445; - // TODO: be smart about about using archive nodes? let state = Web3ProxyApp::try_new( vec![ // local nodes @@ -378,7 +376,6 @@ async fn main() { pub fn handle_anyhow_errors(res: anyhow::Result) -> Box { match res { Ok(r) => Box::new(r.into_response()), - // TODO: json error? Err(e) => Box::new(warp::reply::with_status( format!("{}", e), reqwest::StatusCode::INTERNAL_SERVER_ERROR, diff --git a/src/provider.rs b/src/provider.rs index 999fa418..16ab0afa 100644 --- a/src/provider.rs +++ b/src/provider.rs @@ -38,12 +38,11 @@ impl Web3Provider { ) -> anyhow::Result<()> { info!("Watching new_heads from {}", url); - // TODO: automatically reconnect match &self { Web3Provider::Http(provider) => { // TODO: there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints - // TODO: how often? - // TODO: maybe it would be better to have one interval for all of these, but this works for now + // TODO: what should this interval be? + // TODO: maybe it would be better to have one interval for all of the http providers, but this works for now let mut interval = interval(Duration::from_secs(2)); loop { @@ -58,6 +57,7 @@ impl Web3Provider { } } Web3Provider::Ws(provider) => { + // TODO: automatically reconnect? let mut stream = provider.subscribe_blocks().await?; while let Some(block) = stream.next().await { block_watcher_sender.send((url.clone(), block)).unwrap(); @@ -89,7 +89,6 @@ impl Web3Connection { http_client: Option, block_watcher_sender: BlockWatcherSender, ) -> anyhow::Result { - // TODO: create an ethers-rs rpc client and subscribe/watch new heads in a spawned task let provider = if url_str.starts_with("http") { let url: url::Url = url_str.parse()?; @@ -97,16 +96,16 @@ impl Web3Connection { let provider = ethers::providers::Http::new_with_client(url, http_client); - // TODO: dry this up + // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592) ethers::providers::Provider::new(provider) .interval(Duration::from_secs(1)) .into() } else if url_str.starts_with("ws") { let provider = ethers::providers::Ws::connect(url_str.clone()).await?; - // TODO: make sure this survives disconnects + // TODO: make sure this automatically reconnects - // TODO: dry this up + // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592) ethers::providers::Provider::new(provider) .interval(Duration::from_secs(1)) .into() @@ -117,7 +116,6 @@ impl Web3Connection { let provider = Arc::new(provider); // subscribe to new heads in a spawned future - // TODO: if http, maybe we should check them all on the same interval. and if there is at least one websocket, use that message to start check? let provider_clone: Arc = Arc::clone(&provider); tokio::spawn(async move { while let Err(e) = provider_clone diff --git a/src/provider_tiers.rs b/src/provider_tiers.rs index 7a1b9cfe..3b45ac2c 100644 --- a/src/provider_tiers.rs +++ b/src/provider_tiers.rs @@ -20,7 +20,6 @@ type Web3RateLimiterMap = DashMap; pub type Web3ConnectionMap = DashMap; /// Load balance to the rpc -/// TODO: i'm not sure about having 3 locks here. can we share them? pub struct Web3ProviderTier { /// RPC urls sorted by active requests /// TODO: what type for the rpc? @@ -90,13 +89,14 @@ impl Web3ProviderTier { let mut earliest_not_until = None; for selected_rpc in balanced_rpcs.iter() { - // TODO: check current block number. if too far behind, make our own NotUntil here + // check current block number if !block_watcher .is_synced(selected_rpc.clone(), 3) .await .expect("checking is_synced failed") { // skip this rpc because it is not synced + // TODO: make a NotUntil here? continue; } @@ -204,7 +204,6 @@ impl Web3ProviderTier { if let Some(not_until) = earliest_not_until { Err(not_until) } else { - // TODO: is this right? Ok(vec![]) } }