From 6067369ee3eaa4ec0da6a6287d6371bbb5b9fc15 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sun, 26 Feb 2023 23:52:37 -0800 Subject: [PATCH] add inotify and rpc disconnect --- Cargo.lock | 76 +------------ Dockerfile | 2 +- TODO.md | 3 + web3_proxy/Cargo.toml | 2 +- web3_proxy/src/app/mod.rs | 57 +++------- web3_proxy/src/bin/web3_proxy_cli/proxyd.rs | 56 ++++++++-- web3_proxy/src/block_number.rs | 2 +- web3_proxy/src/rpcs/many.rs | 3 +- web3_proxy/src/rpcs/one.rs | 113 ++++++++++++++------ 9 files changed, 157 insertions(+), 157 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4d133e01..609ac70b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1853,18 +1853,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "filetime" -version = "0.2.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e884668cd0c7480504233e951174ddc3b382f7c2666e3b7310b5c4e7b0c37f9" -dependencies = [ - "cfg-if", - "libc", - "redox_syscall", - "windows-sys 0.42.0", -] - [[package]] name = "fixed-hash" version = "0.8.0" @@ -1941,15 +1929,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "fsevent-sys" -version = "4.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" -dependencies = [ - "libc", -] - [[package]] name = "funty" version = "2.0.0" @@ -2558,13 +2537,15 @@ dependencies = [ [[package]] name = "inotify" -version = "0.9.6" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" +checksum = "abf888f9575c290197b2c948dc9e9ff10bd1a39ad1ea8585f734585fa6b9d3f9" dependencies = [ "bitflags", + "futures-core", "inotify-sys", "libc", + "tokio", ] [[package]] @@ -2696,26 +2677,6 @@ dependencies = [ "cpufeatures", ] -[[package]] -name = "kqueue" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c8fc60ba15bf51257aa9807a48a61013db043fcf3a78cb0d916e8e396dcad98" -dependencies = [ - "kqueue-sys", - "libc", -] - -[[package]] -name = "kqueue-sys" -version = "1.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8367585489f01bc55dd27404dcf56b95e6da061a256a666ab23be9ba96a2e587" -dependencies = [ - "bitflags", - "libc", -] - [[package]] name = "lalrpop" version = "0.19.8" @@ -2954,33 +2915,6 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9e591e719385e6ebaeb5ce5d3887f7d5676fceca6411d1925ccc95745f3d6f7" -[[package]] -name = "notify" -version = "5.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58ea850aa68a06e48fdb069c0ec44d0d64c8dbffa49bf3b6f7f0a901fdea1ba9" -dependencies = [ - "bitflags", - "crossbeam-channel", - "filetime", - "fsevent-sys", - "inotify", - "kqueue", - "libc", - "mio", - "walkdir", - "windows-sys 0.42.0", -] - -[[package]] -name = "notify-debouncer-mini" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e23e9fa24f094b143c1eb61f90ac6457de87be6987bc70746e0179f7dbc9007b" -dependencies = [ - "notify", -] - [[package]] name = "num" version = "0.4.0" @@ -5830,12 +5764,12 @@ dependencies = [ "hashbrown 0.13.2", "hdrhistogram", "http", + "inotify", "ipnet", "itertools", "log", "migration", "moka", - "notify-debouncer-mini", "num", "num-traits", "once_cell", diff --git a/Dockerfile b/Dockerfile index 1642327d..232bec23 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,7 +3,7 
@@ # We only pay the installation cost once, # it will be cached from the second build onwards # -FROM rust:1-bullseye AS builder +FROM rust:1.67.1-bullseye AS builder WORKDIR /app ENV CARGO_TERM_COLOR always diff --git a/TODO.md b/TODO.md index f843384c..7351f631 100644 --- a/TODO.md +++ b/TODO.md @@ -680,3 +680,6 @@ in another repo: event subscriber - [ ] i think i use FuturesUnordered when a try_join_all might be better - [ ] since we are read-heavy on our configs, maybe we should use a cache - "using a thread local storage and explicit types" https://docs.rs/arc-swap/latest/arc_swap/cache/struct.Cache.html +- [ ] tests for config reloading +- [ ] use pin instead of arc for a bunch of things? + - https://fasterthanli.me/articles/pin-and-suffering diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 2c6c6801..70d1dc15 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -45,11 +45,11 @@ handlebars = "4.3.6" hashbrown = { version = "0.13.2", features = ["serde"] } hdrhistogram = "7.5.2" http = "0.2.9" +inotify = "0.10" ipnet = "2.7.1" itertools = "0.10.5" log = "0.4.17" moka = { version = "0.10.0", default-features = false, features = ["future"] } -notify-debouncer-mini = { version = "0.2.0", default-features = false } num = "0.4.0" num-traits = "0.2.15" once_cell = { version = "1.17.1" } diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index d4c6ff10..f69362f7 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -26,7 +26,7 @@ use ethers::core::utils::keccak256; use ethers::prelude::{Address, Bytes, Transaction, TxHash, H256, U64}; use ethers::types::U256; use ethers::utils::rlp::{Decodable, Rlp}; -use futures::future::{join_all, pending}; +use futures::future::join_all; use futures::stream::{FuturesUnordered, StreamExt}; use hashbrown::{HashMap, HashSet}; use ipnet::IpNet; @@ -37,20 +37,18 @@ use migration::sea_orm::{ use migration::sea_query::table::ColumnDef; use migration::{Alias, DbErr, Migrator, MigratorTrait, Table}; use moka::future::Cache; -use notify_debouncer_mini::{new_debouncer, notify, DebounceEventResult}; use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter}; use serde::Serialize; use serde_json::json; use serde_json::value::to_raw_value; +use std::fmt; use std::hash::{Hash, Hasher}; use std::net::IpAddr; use std::num::NonZeroU64; -use std::path::PathBuf; use std::str::FromStr; use std::sync::{atomic, Arc}; use std::time::Duration; -use std::{fmt, fs}; use tokio::sync::{broadcast, watch, Semaphore}; use tokio::task::JoinHandle; use tokio::time::{sleep, timeout}; @@ -361,13 +359,14 @@ pub struct Web3ProxyAppSpawn { pub app_handles: FuturesUnordered>, /// these are important and must be allowed to finish pub background_handles: FuturesUnordered>, + /// config changes are sent here + pub new_top_config_sender: watch::Sender, } impl Web3ProxyApp { /// The main entrypoint. pub async fn spawn( top_config: TopConfig, - top_config_path: Option, num_workers: usize, shutdown_receiver: broadcast::Receiver<()>, ) -> anyhow::Result { @@ -703,48 +702,20 @@ impl Web3ProxyApp { // watch for config changes // TODO: initial config reload should be from this channel. 
not from the call to spawn - if let Some(top_config_path) = top_config_path { - let (top_config_sender, mut top_config_receiver) = watch::channel(top_config); - // TODO: i think the debouncer is exiting - let mut debouncer = new_debouncer( - Duration::from_secs(2), - None, - move |res: DebounceEventResult| match res { - Ok(events) => events.iter().for_each(|e| { - debug!("Event {:?} for {:?}", e.kind, e.path); - - // TODO: use tokio::fs here? - let new_top_config: String = fs::read_to_string(&e.path).unwrap(); - - let new_top_config: TopConfig = toml::from_str(&new_top_config).unwrap(); - - top_config_sender.send_replace(new_top_config); - }), - Err(errors) => errors - .iter() - .for_each(|e| error!("config watcher error {:#?}", e)), - }, - ) - .context("failed starting debouncer config watcher")?; - - // Add a path to be watched. All files and directories at that path and below will be monitored for changes. - info!("watching config @ {}", top_config_path.display()); - debouncer - .watcher() - .watch(top_config_path.as_path(), notify::RecursiveMode::Recursive) - .context("failed starting config watcher")?; + let (new_top_config_sender, mut new_top_config_receiver) = watch::channel(top_config); + { let app = app.clone(); let config_handle = tokio::spawn(async move { loop { - let new_top_config = top_config_receiver.borrow_and_update().to_owned(); + let new_top_config = new_top_config_receiver.borrow_and_update().to_owned(); app.apply_top_config(new_top_config) .await .context("failed applying new top_config")?; - top_config_receiver + new_top_config_receiver .changed() .await .context("failed awaiting top_config change")?; @@ -754,13 +725,15 @@ impl Web3ProxyApp { }); app_handles.push(config_handle); - } else { - // no path to config, so we don't know what to watch - // this isn't an error. 
the config might just be in memory - app.apply_top_config(top_config).await?; } - Ok((app, app_handles, important_background_handles).into()) + Ok(( + app, + app_handles, + important_background_handles, + new_top_config_sender, + ) + .into()) } pub async fn apply_top_config(&self, new_top_config: TopConfig) -> anyhow::Result<()> { diff --git a/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs index e8453cfa..03db2694 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs @@ -1,9 +1,11 @@ #![forbid(unsafe_code)] use std::path::PathBuf; +use std::{fs, thread}; use argh::FromArgs; use futures::StreamExt; +use inotify::{EventMask, Inotify, WatchMask}; use log::{error, info, warn}; use num::Zero; use tokio::sync::broadcast; @@ -65,13 +67,53 @@ async fn run( let mut shutdown_receiver = shutdown_sender.subscribe(); // start the main app - let mut spawned_app = Web3ProxyApp::spawn( - top_config, - top_config_path, - num_workers, - shutdown_sender.subscribe(), - ) - .await?; + let mut spawned_app = + Web3ProxyApp::spawn(top_config, num_workers, shutdown_sender.subscribe()).await?; + + // start thread for watching config + if let Some(top_config_path) = top_config_path { + let mut inotify = Inotify::init().expect("Failed to initialize inotify"); + + inotify + .add_watch(top_config_path.clone(), WatchMask::MODIFY) + .expect("Failed to add inotify watch on config"); + + let mut buffer = [0u8; 4096]; + + let config_sender = spawned_app.new_top_config_sender; + + // TODO: exit the app if this handle exits + // TODO: debounce + thread::spawn(move || loop { + let events = inotify + .read_events_blocking(&mut buffer) + .expect("Failed to read inotify events"); + + for event in events { + if event.mask.contains(EventMask::MODIFY) { + info!("config changed"); + match fs::read_to_string(&top_config_path) { + Ok(top_config) => match toml::from_str(&top_config) { + Ok(top_config) => { + config_sender.send(top_config).unwrap(); + } + Err(err) => { + // TODO: panic? + error!("Unable to parse config! {:#?}", err); + } + }, + Err(err) => { + // TODO: panic? + error!("Unable to read config! {:#?}", err); + } + }; + } else { + // TODO: is "MODIFY" enough, or do we want CLOSE_WRITE? 
+ unimplemented!(); + } + } + }); + } // start the prometheus metrics port let prometheus_handle = tokio::spawn(metrics_frontend::serve( diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index 69d289a1..bfb39299 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -4,7 +4,7 @@ use ethers::{ prelude::{BlockNumber, U64}, types::H256, }; -use log::{debug, trace, warn}; +use log::{trace, warn}; use serde_json::json; use std::sync::Arc; diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 0de9b666..b5e6c8fb 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -9,6 +9,7 @@ use crate::frontend::authorization::{Authorization, RequestMetadata}; use crate::frontend::rpc_proxy_ws::ProxyMode; use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; use crate::rpcs::transactions::TxStatus; +use anyhow::Context; use counter::Counter; use derive_more::From; use ethers::prelude::{ProviderError, TxHash, H256, U64}; @@ -282,7 +283,7 @@ impl Web3Rpcs { .insert(connection.name.clone(), connection); if let Some(old_rpc) = old_rpc { - todo!("do something to make the old one shutdown"); + old_rpc.disconnect().await.context("disconnect old rpc")?; } // TODO: what should we do with the new handle? make sure error logs aren't dropped diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index b4cf4af7..17faee6c 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -22,7 +22,7 @@ use serde_json::json; use std::cmp::min; use std::fmt; use std::hash::{Hash, Hasher}; -use std::sync::atomic::{self, AtomicU64, AtomicUsize}; +use std::sync::atomic::{self, AtomicBool, AtomicU64, AtomicUsize}; use std::{cmp::Ordering, sync::Arc}; use thread_fast_rng::rand::Rng; use thread_fast_rng::thread_fast_rng; @@ -133,6 +133,8 @@ pub struct Web3Rpc { /// TODO: maybe move this to graphana pub(super) total_requests: AtomicUsize, pub(super) active_requests: AtomicUsize, + pub(super) reconnect: AtomicBool, + pub(super) disconnect_watch: Option>, } impl Web3Rpc { @@ -217,6 +219,9 @@ impl Web3Rpc { } } + let (disconnect_sender, disconnect_receiver) = watch::channel(false); + let reconnect = reconnect.into(); + let new_connection = Self { name, db_conn: db_conn.clone(), @@ -230,7 +235,9 @@ impl Web3Rpc { automatic_block_limit, backup, block_data_limit, + reconnect, tier: config.tier, + disconnect_watch: Some(disconnect_sender), ..Default::default() }; @@ -249,8 +256,8 @@ impl Web3Rpc { block_map, block_sender, chain_id, + disconnect_receiver, http_interval_sender, - reconnect, tx_id_sender, ) .await @@ -565,6 +572,22 @@ impl Web3Rpc { Ok(()) } + pub async fn disconnect(&self) -> anyhow::Result<()> { + self.reconnect.store(false, atomic::Ordering::Release); + + let mut provider = self.provider.write().await; + + info!("disconnecting {}", self); + + *provider = None; + + if let Err(err) = self.disconnect_watch.as_ref().unwrap().send(true) { + warn!("failed sending disconnect watch: {:?}", err); + }; + + Ok(()) + } + async fn send_head_block_result( self: &Arc, new_head_block: Result, ProviderError>, @@ -588,7 +611,8 @@ impl Web3Rpc { None } Ok(Some(new_head_block)) => { - let new_head_block = Web3ProxyBlock::try_new(new_head_block).unwrap(); + let new_head_block = Web3ProxyBlock::try_new(new_head_block) + .expect("blocks from newHeads subscriptions should also convert"); let new_hash = *new_head_block.hash(); @@ -649,8 +673,8 @@ impl Web3Rpc { block_map: BlocksByHashCache, block_sender: Option>, 
chain_id: u64, + disconnect_receiver: watch::Receiver, http_interval_sender: Option>>, - reconnect: bool, tx_id_sender: Option)>>, ) -> anyhow::Result<()> { let revert_handler = if self.backup { @@ -665,15 +689,14 @@ impl Web3Rpc { let mut futures = vec![]; { - // health check // TODO: move this into a proper function let authorization = authorization.clone(); let block_sender = block_sender.clone(); - let conn = self.clone(); + let rpc = self.clone(); let (ready_tx, ready_rx) = oneshot::channel(); let f = async move { // initial sleep to allow for the initial connection - conn.retrying_connect( + rpc.retrying_connect( block_sender.as_ref(), chain_id, authorization.db_conn.as_ref(), @@ -695,19 +718,22 @@ impl Web3Rpc { loop { sleep(Duration::from_secs(health_sleep_seconds)).await; + // health check + // TODO: lower this log level once disconnect works + debug!("health check on {}", rpc); + // TODO: what if we just happened to have this check line up with another restart? // TODO: think more about this - if let Some(client) = conn.provider.read().await.clone() { + if let Some(client) = rpc.provider.read().await.clone() { // health check as a way of keeping this rpc's request_ewma accurate // TODO: do something different if this is a backup server? - new_total_requests = - conn.total_requests.load(atomic::Ordering::Relaxed); + new_total_requests = rpc.total_requests.load(atomic::Ordering::Relaxed); if new_total_requests - old_total_requests < 10 { // TODO: if this fails too many times, reset the connection // TODO: move this into a function and the chaining should be easier - let head_block = conn.head_block.read().clone(); + let head_block = rpc.head_block.read().clone(); if let Some((block_number, txid)) = head_block.and_then(|x| { let block = x.block; @@ -717,7 +743,7 @@ impl Web3Rpc { Some((block_number, txid)) }) { - let to = conn + let to = rpc .wait_for_query::<_, Option>( "eth_getTransactionByHash", &(txid,), @@ -741,21 +767,21 @@ impl Web3Rpc { let code = match to { Err(err) => { - if conn.backup { + if rpc.backup { debug!( "{} failed health check query! {:#?}", - conn, err + rpc, err ); } else { warn!( "{} failed health check query! {:#?}", - conn, err + rpc, err ); } continue; } Ok(to) => { - conn.wait_for_query::<_, Option>( + rpc.wait_for_query::<_, Option>( "eth_getCode", &(to, block_number), revert_handler, @@ -767,13 +793,10 @@ impl Web3Rpc { }; if let Err(err) = code { - if conn.backup { - debug!( - "{} failed health check query! {:#?}", - conn, err - ); + if rpc.backup { + debug!("{} failed health check query! {:#?}", rpc, err); } else { - warn!("{} failed health check query! {:#?}", conn, err); + warn!("{} failed health check query! {:#?}", rpc, err); } continue; } @@ -816,7 +839,7 @@ impl Web3Rpc { break; } Err(err) => { - if reconnect { + if self.reconnect.load(atomic::Ordering::Acquire) { warn!("{} connection ended. err={:?}", self, err); self.clone() @@ -827,6 +850,9 @@ impl Web3Rpc { true, ) .await?; + } else if *disconnect_receiver.borrow() { + info!("{} is disconnecting", self); + break; } else { error!("{} subscription exited. err={:?}", self, err); return Err(err); @@ -840,6 +866,10 @@ impl Web3Rpc { Ok(()) } + fn should_disconnect(&self) -> bool { + *self.disconnect_watch.as_ref().unwrap().borrow() + } + /// Subscribe to new blocks. async fn subscribe_new_heads( self: Arc, @@ -861,7 +891,7 @@ impl Web3Rpc { let mut last_hash = H256::zero(); - loop { + while !self.should_disconnect() { // TODO: what should the max_wait be? 
match self .wait_for_request_handle(&authorization, None, unlocked_provider.clone()) @@ -982,6 +1012,11 @@ impl Web3Rpc { .await?; while let Some(new_block) = stream.next().await { + // TODO: select on disconnect_watch instead of waiting for a block to arrive + if self.should_disconnect() { + break; + } + // TODO: check the new block's hash to be sure we don't send dupes let new_hash = new_block .hash @@ -1002,19 +1037,20 @@ impl Web3Rpc { .await?; } - // clear the head block. this might not be needed, but it won't hurt - self.send_head_block_result(Ok(None), &block_sender, block_map) - .await?; - // TODO: is this always an error? // TODO: we probably don't want a warn and to return error - warn!("new_heads subscription to {} ended", self); - Err(anyhow::anyhow!("new_heads subscription ended")) + debug!("new_heads subscription to {} ended", self); } None => todo!("what should happen now? wait for a connection?"), #[cfg(test)] Some(Web3Provider::Mock) => unimplemented!(), } + + // clear the head block. this might not be needed, but it won't hurt + self.send_head_block_result(Ok(None), &block_sender, block_map) + .await?; + + Ok(()) } /// Turn on the firehose of pending transactions @@ -1057,15 +1093,26 @@ impl Web3Rpc { .context("tx_id_sender")?; // TODO: periodically check for listeners. if no one is subscribed, unsubscribe and wait for a subscription + + // TODO: select on this instead of checking every loop + if self.should_disconnect() { + break; + } } // TODO: is this always an error? // TODO: we probably don't want a warn and to return error - warn!("pending_transactions subscription ended on {}", self); - return Err(anyhow::anyhow!("pending_transactions subscription ended")); + debug!("pending_transactions subscription ended on {}", self); } #[cfg(test)] - Some(Web3Provider::Mock) => futures::future::pending::<()>().await, + Some(Web3Provider::Mock) => { + let mut disconnect_watch = self.disconnect_watch.as_ref().unwrap().subscribe(); + + if !*disconnect_watch.borrow_and_update() { + // wait for disconnect_watch to change + disconnect_watch.changed().await?; + } + } } Ok(())
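
Note on the config-reload wiring: the core of this change is the hand-off from a blocking config-watcher thread to the async side of the app. The watcher re-parses the TOML on every inotify MODIFY event and pushes the result through the new_top_config_sender watch channel, and the task spawned in Web3ProxyApp::spawn applies whatever value is current, then awaits the next change. Below is a minimal, self-contained sketch of that channel pattern, with a hypothetical AppConfig struct and a plain polling thread standing in for inotify so the example stays portable; it illustrates the wiring, not the proxy's actual code.

    use std::{fs, path::PathBuf, thread, time::Duration};
    use tokio::sync::watch;

    /// hypothetical stand-in for web3_proxy's TopConfig
    #[derive(Clone, Debug)]
    struct AppConfig {
        raw: String,
    }

    #[tokio::main]
    async fn main() {
        // hypothetical path; the proxy gets this from its CLI arguments
        let path = PathBuf::from("example.toml");

        let initial = AppConfig {
            raw: fs::read_to_string(&path).unwrap_or_default(),
        };

        // same shape as new_top_config_sender / new_top_config_receiver in the patch
        let (config_sender, mut config_receiver) = watch::channel(initial);

        // watcher thread: the real code blocks on inotify MODIFY events;
        // plain polling keeps this sketch portable
        thread::spawn(move || loop {
            thread::sleep(Duration::from_secs(2));

            match fs::read_to_string(&path) {
                Ok(raw) => {
                    // only wake the receiver when the contents actually changed
                    config_sender.send_if_modified(|current| {
                        if current.raw == raw {
                            false
                        } else {
                            current.raw = raw;
                            true
                        }
                    });
                }
                Err(err) => eprintln!("unable to read config: {:?}", err),
            }
        });

        // applier task: read the current value first, then wait for changes,
        // mirroring the loop spawned in Web3ProxyApp::spawn
        let applier = tokio::spawn(async move {
            loop {
                let new_config = config_receiver.borrow_and_update().clone();
                println!("applying config ({} bytes)", new_config.raw.len());

                if config_receiver.changed().await.is_err() {
                    // the sender was dropped; nothing left to watch
                    break;
                }
            }
        });

        // runs until the process is killed, like the proxy's config task
        applier.await.unwrap();
    }

send_if_modified is used here only to skip wake-ups when the file contents are unchanged; the patch sends unconditionally and leaves debouncing as a TODO.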
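
Note on the disconnect signalling: the other half of the patch gives Web3Rpc a disconnect_watch watch channel plus a reconnect AtomicBool, so that Web3Rpcs can retire a replaced rpc (old_rpc.disconnect().await) instead of hitting the old todo!(). The subscription loops currently check should_disconnect() between items and leave "select on disconnect_watch instead of waiting for a block to arrive" as a TODO. The sketch below, again with hypothetical names and none of the proxy's surrounding types, shows both the flag check and one way that select could look.

    use std::sync::Arc;
    use tokio::sync::watch;
    use tokio::time::{sleep, Duration};

    /// hypothetical stand-in for Web3Rpc's disconnect plumbing
    struct Rpc {
        name: String,
        disconnect_watch: watch::Sender<bool>,
    }

    impl Rpc {
        fn new(name: &str) -> Arc<Self> {
            let (disconnect_watch, _) = watch::channel(false);
            Arc::new(Self {
                name: name.to_string(),
                disconnect_watch,
            })
        }

        /// mirrors Web3Rpc::should_disconnect
        fn should_disconnect(&self) -> bool {
            *self.disconnect_watch.borrow()
        }

        /// mirrors Web3Rpc::disconnect: flip the flag and let loops notice it
        fn disconnect(&self) {
            println!("disconnecting {}", self.name);
            self.disconnect_watch.send_replace(true);
        }

        /// stand-in for a subscription loop: do work, but also select on the
        /// watch channel so shutdown doesn't wait for the next item to arrive
        async fn subscribe(self: Arc<Self>) {
            let mut disconnect_rx = self.disconnect_watch.subscribe();

            while !self.should_disconnect() {
                tokio::select! {
                    _ = sleep(Duration::from_millis(100)) => {
                        println!("{}: handled a block", self.name);
                    }
                    _ = disconnect_rx.changed() => {
                        println!("{}: disconnect requested", self.name);
                        break;
                    }
                }
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let rpc = Rpc::new("example");

        let handle = tokio::spawn(rpc.clone().subscribe());

        sleep(Duration::from_millis(350)).await;
        rpc.disconnect();

        handle.await.unwrap();
    }

Using send_replace here means the signal lands even when no receiver is currently subscribed; the patch uses send(true) and only warns on failure, which has the same effect for shutdown purposes.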