add inotify and rpc disconnect

This commit is contained in:
Bryan Stitt 2023-02-26 23:52:37 -08:00
parent 475e521918
commit 6067369ee3
9 changed files with 157 additions and 157 deletions

76
Cargo.lock generated
View File

@ -1853,18 +1853,6 @@ dependencies = [
"subtle", "subtle",
] ]
[[package]]
name = "filetime"
version = "0.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e884668cd0c7480504233e951174ddc3b382f7c2666e3b7310b5c4e7b0c37f9"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"windows-sys 0.42.0",
]
[[package]] [[package]]
name = "fixed-hash" name = "fixed-hash"
version = "0.8.0" version = "0.8.0"
@ -1941,15 +1929,6 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "fsevent-sys"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "funty" name = "funty"
version = "2.0.0" version = "2.0.0"
@ -2558,13 +2537,15 @@ dependencies = [
[[package]] [[package]]
name = "inotify" name = "inotify"
version = "0.9.6" version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" checksum = "abf888f9575c290197b2c948dc9e9ff10bd1a39ad1ea8585f734585fa6b9d3f9"
dependencies = [ dependencies = [
"bitflags", "bitflags",
"futures-core",
"inotify-sys", "inotify-sys",
"libc", "libc",
"tokio",
] ]
[[package]] [[package]]
@ -2696,26 +2677,6 @@ dependencies = [
"cpufeatures", "cpufeatures",
] ]
[[package]]
name = "kqueue"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c8fc60ba15bf51257aa9807a48a61013db043fcf3a78cb0d916e8e396dcad98"
dependencies = [
"kqueue-sys",
"libc",
]
[[package]]
name = "kqueue-sys"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8367585489f01bc55dd27404dcf56b95e6da061a256a666ab23be9ba96a2e587"
dependencies = [
"bitflags",
"libc",
]
[[package]] [[package]]
name = "lalrpop" name = "lalrpop"
version = "0.19.8" version = "0.19.8"
@ -2954,33 +2915,6 @@ version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9e591e719385e6ebaeb5ce5d3887f7d5676fceca6411d1925ccc95745f3d6f7" checksum = "e9e591e719385e6ebaeb5ce5d3887f7d5676fceca6411d1925ccc95745f3d6f7"
[[package]]
name = "notify"
version = "5.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "58ea850aa68a06e48fdb069c0ec44d0d64c8dbffa49bf3b6f7f0a901fdea1ba9"
dependencies = [
"bitflags",
"crossbeam-channel",
"filetime",
"fsevent-sys",
"inotify",
"kqueue",
"libc",
"mio",
"walkdir",
"windows-sys 0.42.0",
]
[[package]]
name = "notify-debouncer-mini"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e23e9fa24f094b143c1eb61f90ac6457de87be6987bc70746e0179f7dbc9007b"
dependencies = [
"notify",
]
[[package]] [[package]]
name = "num" name = "num"
version = "0.4.0" version = "0.4.0"
@ -5830,12 +5764,12 @@ dependencies = [
"hashbrown 0.13.2", "hashbrown 0.13.2",
"hdrhistogram", "hdrhistogram",
"http", "http",
"inotify",
"ipnet", "ipnet",
"itertools", "itertools",
"log", "log",
"migration", "migration",
"moka", "moka",
"notify-debouncer-mini",
"num", "num",
"num-traits", "num-traits",
"once_cell", "once_cell",

View File

@ -3,7 +3,7 @@
# We only pay the installation cost once, # We only pay the installation cost once,
# it will be cached from the second build onwards # it will be cached from the second build onwards
# #
FROM rust:1-bullseye AS builder FROM rust:1.67.1-bullseye AS builder
WORKDIR /app WORKDIR /app
ENV CARGO_TERM_COLOR always ENV CARGO_TERM_COLOR always

View File

@ -680,3 +680,6 @@ in another repo: event subscriber
- [ ] i think i use FuturesUnordered when a try_join_all might be better - [ ] i think i use FuturesUnordered when a try_join_all might be better
- [ ] since we are read-heavy on our configs, maybe we should use a cache - [ ] since we are read-heavy on our configs, maybe we should use a cache
- "using a thread local storage and explicit types" https://docs.rs/arc-swap/latest/arc_swap/cache/struct.Cache.html - "using a thread local storage and explicit types" https://docs.rs/arc-swap/latest/arc_swap/cache/struct.Cache.html
- [ ] tests for config reloading
- [ ] use pin instead of arc for a bunch of things?
- https://fasterthanli.me/articles/pin-and-suffering

View File

@ -45,11 +45,11 @@ handlebars = "4.3.6"
hashbrown = { version = "0.13.2", features = ["serde"] } hashbrown = { version = "0.13.2", features = ["serde"] }
hdrhistogram = "7.5.2" hdrhistogram = "7.5.2"
http = "0.2.9" http = "0.2.9"
inotify = "0.10"
ipnet = "2.7.1" ipnet = "2.7.1"
itertools = "0.10.5" itertools = "0.10.5"
log = "0.4.17" log = "0.4.17"
moka = { version = "0.10.0", default-features = false, features = ["future"] } moka = { version = "0.10.0", default-features = false, features = ["future"] }
notify-debouncer-mini = { version = "0.2.0", default-features = false }
num = "0.4.0" num = "0.4.0"
num-traits = "0.2.15" num-traits = "0.2.15"
once_cell = { version = "1.17.1" } once_cell = { version = "1.17.1" }

View File

@ -26,7 +26,7 @@ use ethers::core::utils::keccak256;
use ethers::prelude::{Address, Bytes, Transaction, TxHash, H256, U64}; use ethers::prelude::{Address, Bytes, Transaction, TxHash, H256, U64};
use ethers::types::U256; use ethers::types::U256;
use ethers::utils::rlp::{Decodable, Rlp}; use ethers::utils::rlp::{Decodable, Rlp};
use futures::future::{join_all, pending}; use futures::future::join_all;
use futures::stream::{FuturesUnordered, StreamExt}; use futures::stream::{FuturesUnordered, StreamExt};
use hashbrown::{HashMap, HashSet}; use hashbrown::{HashMap, HashSet};
use ipnet::IpNet; use ipnet::IpNet;
@ -37,20 +37,18 @@ use migration::sea_orm::{
use migration::sea_query::table::ColumnDef; use migration::sea_query::table::ColumnDef;
use migration::{Alias, DbErr, Migrator, MigratorTrait, Table}; use migration::{Alias, DbErr, Migrator, MigratorTrait, Table};
use moka::future::Cache; use moka::future::Cache;
use notify_debouncer_mini::{new_debouncer, notify, DebounceEventResult};
use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter}; use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter};
use serde::Serialize; use serde::Serialize;
use serde_json::json; use serde_json::json;
use serde_json::value::to_raw_value; use serde_json::value::to_raw_value;
use std::fmt;
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use std::net::IpAddr; use std::net::IpAddr;
use std::num::NonZeroU64; use std::num::NonZeroU64;
use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use std::sync::{atomic, Arc}; use std::sync::{atomic, Arc};
use std::time::Duration; use std::time::Duration;
use std::{fmt, fs};
use tokio::sync::{broadcast, watch, Semaphore}; use tokio::sync::{broadcast, watch, Semaphore};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout}; use tokio::time::{sleep, timeout};
@ -361,13 +359,14 @@ pub struct Web3ProxyAppSpawn {
pub app_handles: FuturesUnordered<AnyhowJoinHandle<()>>, pub app_handles: FuturesUnordered<AnyhowJoinHandle<()>>,
/// these are important and must be allowed to finish /// these are important and must be allowed to finish
pub background_handles: FuturesUnordered<AnyhowJoinHandle<()>>, pub background_handles: FuturesUnordered<AnyhowJoinHandle<()>>,
/// config changes are sent here
pub new_top_config_sender: watch::Sender<TopConfig>,
} }
impl Web3ProxyApp { impl Web3ProxyApp {
/// The main entrypoint. /// The main entrypoint.
pub async fn spawn( pub async fn spawn(
top_config: TopConfig, top_config: TopConfig,
top_config_path: Option<PathBuf>,
num_workers: usize, num_workers: usize,
shutdown_receiver: broadcast::Receiver<()>, shutdown_receiver: broadcast::Receiver<()>,
) -> anyhow::Result<Web3ProxyAppSpawn> { ) -> anyhow::Result<Web3ProxyAppSpawn> {
@ -703,48 +702,20 @@ impl Web3ProxyApp {
// watch for config changes // watch for config changes
// TODO: initial config reload should be from this channel. not from the call to spawn // TODO: initial config reload should be from this channel. not from the call to spawn
if let Some(top_config_path) = top_config_path {
let (top_config_sender, mut top_config_receiver) = watch::channel(top_config);
// TODO: i think the debouncer is exiting let (new_top_config_sender, mut new_top_config_receiver) = watch::channel(top_config);
let mut debouncer = new_debouncer(
Duration::from_secs(2),
None,
move |res: DebounceEventResult| match res {
Ok(events) => events.iter().for_each(|e| {
debug!("Event {:?} for {:?}", e.kind, e.path);
// TODO: use tokio::fs here?
let new_top_config: String = fs::read_to_string(&e.path).unwrap();
let new_top_config: TopConfig = toml::from_str(&new_top_config).unwrap();
top_config_sender.send_replace(new_top_config);
}),
Err(errors) => errors
.iter()
.for_each(|e| error!("config watcher error {:#?}", e)),
},
)
.context("failed starting debouncer config watcher")?;
// Add a path to be watched. All files and directories at that path and below will be monitored for changes.
info!("watching config @ {}", top_config_path.display());
debouncer
.watcher()
.watch(top_config_path.as_path(), notify::RecursiveMode::Recursive)
.context("failed starting config watcher")?;
{
let app = app.clone(); let app = app.clone();
let config_handle = tokio::spawn(async move { let config_handle = tokio::spawn(async move {
loop { loop {
let new_top_config = top_config_receiver.borrow_and_update().to_owned(); let new_top_config = new_top_config_receiver.borrow_and_update().to_owned();
app.apply_top_config(new_top_config) app.apply_top_config(new_top_config)
.await .await
.context("failed applying new top_config")?; .context("failed applying new top_config")?;
top_config_receiver new_top_config_receiver
.changed() .changed()
.await .await
.context("failed awaiting top_config change")?; .context("failed awaiting top_config change")?;
@ -754,13 +725,15 @@ impl Web3ProxyApp {
}); });
app_handles.push(config_handle); app_handles.push(config_handle);
} else {
// no path to config, so we don't know what to watch
// this isn't an error. the config might just be in memory
app.apply_top_config(top_config).await?;
} }
Ok((app, app_handles, important_background_handles).into()) Ok((
app,
app_handles,
important_background_handles,
new_top_config_sender,
)
.into())
} }
pub async fn apply_top_config(&self, new_top_config: TopConfig) -> anyhow::Result<()> { pub async fn apply_top_config(&self, new_top_config: TopConfig) -> anyhow::Result<()> {

View File

@ -1,9 +1,11 @@
#![forbid(unsafe_code)] #![forbid(unsafe_code)]
use std::path::PathBuf; use std::path::PathBuf;
use std::{fs, thread};
use argh::FromArgs; use argh::FromArgs;
use futures::StreamExt; use futures::StreamExt;
use inotify::{EventMask, Inotify, WatchMask};
use log::{error, info, warn}; use log::{error, info, warn};
use num::Zero; use num::Zero;
use tokio::sync::broadcast; use tokio::sync::broadcast;
@ -65,13 +67,53 @@ async fn run(
let mut shutdown_receiver = shutdown_sender.subscribe(); let mut shutdown_receiver = shutdown_sender.subscribe();
// start the main app // start the main app
let mut spawned_app = Web3ProxyApp::spawn( let mut spawned_app =
top_config, Web3ProxyApp::spawn(top_config, num_workers, shutdown_sender.subscribe()).await?;
top_config_path,
num_workers, // start thread for watching config
shutdown_sender.subscribe(), if let Some(top_config_path) = top_config_path {
) let mut inotify = Inotify::init().expect("Failed to initialize inotify");
.await?;
inotify
.add_watch(top_config_path.clone(), WatchMask::MODIFY)
.expect("Failed to add inotify watch on config");
let mut buffer = [0u8; 4096];
let config_sender = spawned_app.new_top_config_sender;
// TODO: exit the app if this handle exits
// TODO: debounce
thread::spawn(move || loop {
let events = inotify
.read_events_blocking(&mut buffer)
.expect("Failed to read inotify events");
for event in events {
if event.mask.contains(EventMask::MODIFY) {
info!("config changed");
match fs::read_to_string(&top_config_path) {
Ok(top_config) => match toml::from_str(&top_config) {
Ok(top_config) => {
config_sender.send(top_config).unwrap();
}
Err(err) => {
// TODO: panic?
error!("Unable to parse config! {:#?}", err);
}
},
Err(err) => {
// TODO: panic?
error!("Unable to read config! {:#?}", err);
}
};
} else {
// TODO: is "MODIFY" enough, or do we want CLOSE_WRITE?
unimplemented!();
}
}
});
}
// start the prometheus metrics port // start the prometheus metrics port
let prometheus_handle = tokio::spawn(metrics_frontend::serve( let prometheus_handle = tokio::spawn(metrics_frontend::serve(

View File

@ -4,7 +4,7 @@ use ethers::{
prelude::{BlockNumber, U64}, prelude::{BlockNumber, U64},
types::H256, types::H256,
}; };
use log::{debug, trace, warn}; use log::{trace, warn};
use serde_json::json; use serde_json::json;
use std::sync::Arc; use std::sync::Arc;

View File

@ -9,6 +9,7 @@ use crate::frontend::authorization::{Authorization, RequestMetadata};
use crate::frontend::rpc_proxy_ws::ProxyMode; use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
use crate::rpcs::transactions::TxStatus; use crate::rpcs::transactions::TxStatus;
use anyhow::Context;
use counter::Counter; use counter::Counter;
use derive_more::From; use derive_more::From;
use ethers::prelude::{ProviderError, TxHash, H256, U64}; use ethers::prelude::{ProviderError, TxHash, H256, U64};
@ -282,7 +283,7 @@ impl Web3Rpcs {
.insert(connection.name.clone(), connection); .insert(connection.name.clone(), connection);
if let Some(old_rpc) = old_rpc { if let Some(old_rpc) = old_rpc {
todo!("do something to make the old one shutdown"); old_rpc.disconnect().await.context("disconnect old rpc")?;
} }
// TODO: what should we do with the new handle? make sure error logs aren't dropped // TODO: what should we do with the new handle? make sure error logs aren't dropped

View File

@ -22,7 +22,7 @@ use serde_json::json;
use std::cmp::min; use std::cmp::min;
use std::fmt; use std::fmt;
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use std::sync::atomic::{self, AtomicU64, AtomicUsize}; use std::sync::atomic::{self, AtomicBool, AtomicU64, AtomicUsize};
use std::{cmp::Ordering, sync::Arc}; use std::{cmp::Ordering, sync::Arc};
use thread_fast_rng::rand::Rng; use thread_fast_rng::rand::Rng;
use thread_fast_rng::thread_fast_rng; use thread_fast_rng::thread_fast_rng;
@ -133,6 +133,8 @@ pub struct Web3Rpc {
/// TODO: maybe move this to graphana /// TODO: maybe move this to graphana
pub(super) total_requests: AtomicUsize, pub(super) total_requests: AtomicUsize,
pub(super) active_requests: AtomicUsize, pub(super) active_requests: AtomicUsize,
pub(super) reconnect: AtomicBool,
pub(super) disconnect_watch: Option<watch::Sender<bool>>,
} }
impl Web3Rpc { impl Web3Rpc {
@ -217,6 +219,9 @@ impl Web3Rpc {
} }
} }
let (disconnect_sender, disconnect_receiver) = watch::channel(false);
let reconnect = reconnect.into();
let new_connection = Self { let new_connection = Self {
name, name,
db_conn: db_conn.clone(), db_conn: db_conn.clone(),
@ -230,7 +235,9 @@ impl Web3Rpc {
automatic_block_limit, automatic_block_limit,
backup, backup,
block_data_limit, block_data_limit,
reconnect,
tier: config.tier, tier: config.tier,
disconnect_watch: Some(disconnect_sender),
..Default::default() ..Default::default()
}; };
@ -249,8 +256,8 @@ impl Web3Rpc {
block_map, block_map,
block_sender, block_sender,
chain_id, chain_id,
disconnect_receiver,
http_interval_sender, http_interval_sender,
reconnect,
tx_id_sender, tx_id_sender,
) )
.await .await
@ -565,6 +572,22 @@ impl Web3Rpc {
Ok(()) Ok(())
} }
pub async fn disconnect(&self) -> anyhow::Result<()> {
self.reconnect.store(false, atomic::Ordering::Release);
let mut provider = self.provider.write().await;
info!("disconnecting {}", self);
*provider = None;
if let Err(err) = self.disconnect_watch.as_ref().unwrap().send(true) {
warn!("failed sending disconnect watch: {:?}", err);
};
Ok(())
}
async fn send_head_block_result( async fn send_head_block_result(
self: &Arc<Self>, self: &Arc<Self>,
new_head_block: Result<Option<ArcBlock>, ProviderError>, new_head_block: Result<Option<ArcBlock>, ProviderError>,
@ -588,7 +611,8 @@ impl Web3Rpc {
None None
} }
Ok(Some(new_head_block)) => { Ok(Some(new_head_block)) => {
let new_head_block = Web3ProxyBlock::try_new(new_head_block).unwrap(); let new_head_block = Web3ProxyBlock::try_new(new_head_block)
.expect("blocks from newHeads subscriptions should also convert");
let new_hash = *new_head_block.hash(); let new_hash = *new_head_block.hash();
@ -649,8 +673,8 @@ impl Web3Rpc {
block_map: BlocksByHashCache, block_map: BlocksByHashCache,
block_sender: Option<flume::Sender<BlockAndRpc>>, block_sender: Option<flume::Sender<BlockAndRpc>>,
chain_id: u64, chain_id: u64,
disconnect_receiver: watch::Receiver<bool>,
http_interval_sender: Option<Arc<broadcast::Sender<()>>>, http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
reconnect: bool,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>, tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let revert_handler = if self.backup { let revert_handler = if self.backup {
@ -665,15 +689,14 @@ impl Web3Rpc {
let mut futures = vec![]; let mut futures = vec![];
{ {
// health check
// TODO: move this into a proper function // TODO: move this into a proper function
let authorization = authorization.clone(); let authorization = authorization.clone();
let block_sender = block_sender.clone(); let block_sender = block_sender.clone();
let conn = self.clone(); let rpc = self.clone();
let (ready_tx, ready_rx) = oneshot::channel(); let (ready_tx, ready_rx) = oneshot::channel();
let f = async move { let f = async move {
// initial sleep to allow for the initial connection // initial sleep to allow for the initial connection
conn.retrying_connect( rpc.retrying_connect(
block_sender.as_ref(), block_sender.as_ref(),
chain_id, chain_id,
authorization.db_conn.as_ref(), authorization.db_conn.as_ref(),
@ -695,19 +718,22 @@ impl Web3Rpc {
loop { loop {
sleep(Duration::from_secs(health_sleep_seconds)).await; sleep(Duration::from_secs(health_sleep_seconds)).await;
// health check
// TODO: lower this log level once disconnect works
debug!("health check on {}", rpc);
// TODO: what if we just happened to have this check line up with another restart? // TODO: what if we just happened to have this check line up with another restart?
// TODO: think more about this // TODO: think more about this
if let Some(client) = conn.provider.read().await.clone() { if let Some(client) = rpc.provider.read().await.clone() {
// health check as a way of keeping this rpc's request_ewma accurate // health check as a way of keeping this rpc's request_ewma accurate
// TODO: do something different if this is a backup server? // TODO: do something different if this is a backup server?
new_total_requests = new_total_requests = rpc.total_requests.load(atomic::Ordering::Relaxed);
conn.total_requests.load(atomic::Ordering::Relaxed);
if new_total_requests - old_total_requests < 10 { if new_total_requests - old_total_requests < 10 {
// TODO: if this fails too many times, reset the connection // TODO: if this fails too many times, reset the connection
// TODO: move this into a function and the chaining should be easier // TODO: move this into a function and the chaining should be easier
let head_block = conn.head_block.read().clone(); let head_block = rpc.head_block.read().clone();
if let Some((block_number, txid)) = head_block.and_then(|x| { if let Some((block_number, txid)) = head_block.and_then(|x| {
let block = x.block; let block = x.block;
@ -717,7 +743,7 @@ impl Web3Rpc {
Some((block_number, txid)) Some((block_number, txid))
}) { }) {
let to = conn let to = rpc
.wait_for_query::<_, Option<Transaction>>( .wait_for_query::<_, Option<Transaction>>(
"eth_getTransactionByHash", "eth_getTransactionByHash",
&(txid,), &(txid,),
@ -741,21 +767,21 @@ impl Web3Rpc {
let code = match to { let code = match to {
Err(err) => { Err(err) => {
if conn.backup { if rpc.backup {
debug!( debug!(
"{} failed health check query! {:#?}", "{} failed health check query! {:#?}",
conn, err rpc, err
); );
} else { } else {
warn!( warn!(
"{} failed health check query! {:#?}", "{} failed health check query! {:#?}",
conn, err rpc, err
); );
} }
continue; continue;
} }
Ok(to) => { Ok(to) => {
conn.wait_for_query::<_, Option<Bytes>>( rpc.wait_for_query::<_, Option<Bytes>>(
"eth_getCode", "eth_getCode",
&(to, block_number), &(to, block_number),
revert_handler, revert_handler,
@ -767,13 +793,10 @@ impl Web3Rpc {
}; };
if let Err(err) = code { if let Err(err) = code {
if conn.backup { if rpc.backup {
debug!( debug!("{} failed health check query! {:#?}", rpc, err);
"{} failed health check query! {:#?}",
conn, err
);
} else { } else {
warn!("{} failed health check query! {:#?}", conn, err); warn!("{} failed health check query! {:#?}", rpc, err);
} }
continue; continue;
} }
@ -816,7 +839,7 @@ impl Web3Rpc {
break; break;
} }
Err(err) => { Err(err) => {
if reconnect { if self.reconnect.load(atomic::Ordering::Acquire) {
warn!("{} connection ended. err={:?}", self, err); warn!("{} connection ended. err={:?}", self, err);
self.clone() self.clone()
@ -827,6 +850,9 @@ impl Web3Rpc {
true, true,
) )
.await?; .await?;
} else if *disconnect_receiver.borrow() {
info!("{} is disconnecting", self);
break;
} else { } else {
error!("{} subscription exited. err={:?}", self, err); error!("{} subscription exited. err={:?}", self, err);
return Err(err); return Err(err);
@ -840,6 +866,10 @@ impl Web3Rpc {
Ok(()) Ok(())
} }
fn should_disconnect(&self) -> bool {
*self.disconnect_watch.as_ref().unwrap().borrow()
}
/// Subscribe to new blocks. /// Subscribe to new blocks.
async fn subscribe_new_heads( async fn subscribe_new_heads(
self: Arc<Self>, self: Arc<Self>,
@ -861,7 +891,7 @@ impl Web3Rpc {
let mut last_hash = H256::zero(); let mut last_hash = H256::zero();
loop { while !self.should_disconnect() {
// TODO: what should the max_wait be? // TODO: what should the max_wait be?
match self match self
.wait_for_request_handle(&authorization, None, unlocked_provider.clone()) .wait_for_request_handle(&authorization, None, unlocked_provider.clone())
@ -982,6 +1012,11 @@ impl Web3Rpc {
.await?; .await?;
while let Some(new_block) = stream.next().await { while let Some(new_block) = stream.next().await {
// TODO: select on disconnect_watch instead of waiting for a block to arrive
if self.should_disconnect() {
break;
}
// TODO: check the new block's hash to be sure we don't send dupes // TODO: check the new block's hash to be sure we don't send dupes
let new_hash = new_block let new_hash = new_block
.hash .hash
@ -1002,19 +1037,20 @@ impl Web3Rpc {
.await?; .await?;
} }
// clear the head block. this might not be needed, but it won't hurt
self.send_head_block_result(Ok(None), &block_sender, block_map)
.await?;
// TODO: is this always an error? // TODO: is this always an error?
// TODO: we probably don't want a warn and to return error // TODO: we probably don't want a warn and to return error
warn!("new_heads subscription to {} ended", self); debug!("new_heads subscription to {} ended", self);
Err(anyhow::anyhow!("new_heads subscription ended"))
} }
None => todo!("what should happen now? wait for a connection?"), None => todo!("what should happen now? wait for a connection?"),
#[cfg(test)] #[cfg(test)]
Some(Web3Provider::Mock) => unimplemented!(), Some(Web3Provider::Mock) => unimplemented!(),
} }
// clear the head block. this might not be needed, but it won't hurt
self.send_head_block_result(Ok(None), &block_sender, block_map)
.await?;
Ok(())
} }
/// Turn on the firehose of pending transactions /// Turn on the firehose of pending transactions
@ -1057,15 +1093,26 @@ impl Web3Rpc {
.context("tx_id_sender")?; .context("tx_id_sender")?;
// TODO: periodically check for listeners. if no one is subscribed, unsubscribe and wait for a subscription // TODO: periodically check for listeners. if no one is subscribed, unsubscribe and wait for a subscription
// TODO: select on this instead of checking every loop
if self.should_disconnect() {
break;
}
} }
// TODO: is this always an error? // TODO: is this always an error?
// TODO: we probably don't want a warn and to return error // TODO: we probably don't want a warn and to return error
warn!("pending_transactions subscription ended on {}", self); debug!("pending_transactions subscription ended on {}", self);
return Err(anyhow::anyhow!("pending_transactions subscription ended"));
} }
#[cfg(test)] #[cfg(test)]
Some(Web3Provider::Mock) => futures::future::pending::<()>().await, Some(Web3Provider::Mock) => {
let mut disconnect_watch = self.disconnect_watch.as_ref().unwrap().subscribe();
if !*disconnect_watch.borrow_and_update() {
// wait for disconnect_watch to change
disconnect_watch.changed().await?;
}
}
} }
Ok(()) Ok(())