shutdown signal

Bryan Stitt 2022-07-22 23:26:04 +00:00
parent 225ee55fd5
commit 0b184ae9c9
7 changed files with 140 additions and 38 deletions

Cargo.lock generated
View File

@@ -1825,6 +1825,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
dependencies = [
"ahash",
"serde",
]
[[package]]

View File

@@ -51,13 +51,11 @@
- i think now that we retry header not found and similar, caching errors should be fine
- [x] RESPONSE_CACHE_CAP from config
- [x] web3_sha3 rpc command
- [ ] test that launches anvil and connects the proxy to it
- [ ] if the fastest server has hit rate limits, we won't be able to serve any traffic until another server is synced.
- thundering herd problem if we only allow a lag of 0 blocks
- we can improve this by only `publish`ing the sorted list once a threshold of total available soft and hard limits is passed. how can we do this without hammering redis? at least it's only once per block per server
- [ ] basic request method stats
- [ ] fully test retrying when "header not found"
- i saw "header not found" on a simple eth_getCode query to a public load balanced bsc archive node on block 1
- [ ] test that launches anvil and connects the proxy to it
- [ ] nice output when cargo doc is run
## V1
@@ -178,3 +176,5 @@ in another repo: event subscriber
- [ ] Archive check on BSC gave “archive” when it isn't. and FTM gave 90k for all servers even though they should be archive
- [ ] cache eth_getLogs in a database?
- [ ] stats for "read amplification". how many backend requests do we send compared to frontend requests we received?
- [ ] fully test retrying when "header not found"
- i saw "header not found" on a simple eth_getCode query to a public load balanced bsc archive node on block 1

View File

@@ -22,7 +22,7 @@ ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "
fdlimit = "0.2.1"
flume = "0.10.14"
futures = { version = "0.3.21", features = ["thread-pool"] }
hashbrown = "0.12.3"
hashbrown = { version = "0.12.3", features = ["serde"] }
indexmap = "1.9.1"
linkedhashmap = { path = "../linkedhashmap", features = ["inline-more"] }
notify = "4.0.17"
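The new `serde` feature on hashbrown matters because the config structs below switch from `std::collections::HashMap` to `hashbrown::HashMap`, and `toml::from_str` needs a `Deserialize` impl for that map type. A minimal sketch of the dependency (struct and field names here are illustrative, not from this repo):

```rust
use hashbrown::HashMap;
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct ExampleConfig {
    // without `hashbrown = { ..., features = ["serde"] }` this field has no Deserialize impl
    balanced_rpcs: HashMap<String, String>,
}

fn main() -> anyhow::Result<()> {
    let raw = r#"
        [balanced_rpcs]
        anvil = "http://127.0.0.1:8545"
    "#;
    let parsed: ExampleConfig = toml::from_str(raw)?;
    println!("{:?}", parsed);
    Ok(())
}
```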

View File

@@ -1,7 +1,8 @@
use argh::FromArgs;
use derive_more::Constructor;
use ethers::prelude::{Block, TxHash};
use hashbrown::HashMap;
use serde::Deserialize;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::broadcast;
@@ -55,7 +56,7 @@ fn default_response_cache_max_bytes() -> usize {
10_usize.pow(8)
}
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Constructor)]
pub struct Web3ConnectionConfig {
url: String,
soft_limit: u32,
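`derive_more::Constructor` generates a `new()` taking one argument per field in declaration order, which is what the new test uses to build configs without making the fields public. A hedged sketch of the pattern; the real `Web3ConnectionConfig` has more fields than this truncated hunk shows, and `hard_limit` below is only a guess based on the three-argument `new()` call in the test:

```rust
use derive_more::Constructor;

#[derive(Debug, Constructor)]
struct ConnectionConfig {
    url: String,
    soft_limit: u32,
    hard_limit: Option<u64>, // hypothetical third field, mirroring new(endpoint, 100, None) in the test
}

fn main() {
    // Constructor expands to roughly: pub fn new(url, soft_limit, hard_limit) -> Self
    let conf = ConnectionConfig::new("http://127.0.0.1:8545".to_string(), 100, None);
    println!("{:?}", conf);
}
```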

View File

@@ -19,7 +19,7 @@ use tracing::{error, info, info_span, instrument, trace, warn, Instrument};
use crate::app::{flatten_handle, AnyhowJoinHandle};
use crate::config::BlockAndRpc;
/// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit https://github.com/gakonst/ethers-rs/issues/592
/// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit <https://github.com/gakonst/ethers-rs/issues/592>
#[derive(From)]
pub enum Web3Provider {
Http(ethers::providers::Provider<ethers::providers::Http>),
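The doc comment change is purely about rustdoc: a bare URL triggers the `rustdoc::bare_urls` lint and is rendered as plain text, while angle brackets make it an automatic link. For example:

```rust
/// warns under rustdoc's `bare_urls` lint and renders as plain text:
/// https://github.com/gakonst/ethers-rs/issues/592
pub struct Before;

/// lint-clean and rendered as a clickable link:
/// <https://github.com/gakonst/ethers-rs/issues/592>
pub struct After;
```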

View File

@@ -10,7 +10,7 @@ pub async fn handler_404() -> impl IntoResponse {
}
/// handle errors by converting them into something that implements `IntoResponse`
/// TODO: use this. i can't get https://docs.rs/axum/latest/axum/error_handling/index.html to work
/// TODO: use this. i can't get <https://docs.rs/axum/latest/axum/error_handling/index.html> to work
/// TODO: i think we want a custom result type instead. put the anyhow result inside. then `impl IntoResponse for CustomResult`
pub async fn handle_anyhow_error(
code: Option<StatusCode>,
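The TODO above suggests a custom result type with the anyhow error inside. A hedged sketch of how that idea commonly looks in axum (`AppError` and `AppResult` are illustrative names, not from this repo):

```rust
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};

/// newtype holding the anyhow error, as the TODO suggests
pub struct AppError(anyhow::Error);

/// alias so handlers can write `-> AppResult<String>` (or any IntoResponse type) and use `?`
pub type AppResult<T> = Result<T, AppError>;

impl IntoResponse for AppError {
    fn into_response(self) -> Response {
        // a real handler would probably log this and hide details from clients
        (StatusCode::INTERNAL_SERVER_ERROR, format!("error: {}", self.0)).into_response()
    }
}

// lets `?` convert anything that can become an anyhow::Error into AppError
impl<E: Into<anyhow::Error>> From<E> for AppError {
    fn from(err: E) -> Self {
        Self(err.into())
    }
}
```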

View File

@@ -21,34 +21,12 @@ use tracing_subscriber::EnvFilter;
use crate::app::{flatten_handle, Web3ProxyApp};
use crate::config::{AppConfig, CliConfig};
fn main() -> anyhow::Result<()> {
// if RUST_LOG isn't set, configure a default
// TODO: is there a better way to do this?
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info,web3_proxy=debug");
}
// install global collector configured based on RUST_LOG env var.
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.compact()
.init();
// this probably won't matter for us in docker, but better safe than sorry
fdlimit::raise_fd_limit();
// initial configuration from flags
let cli_config: CliConfig = argh::from_env();
// advanced configuration
info!("Loading rpc config @ {}", cli_config.config);
let rpc_config: String = fs::read_to_string(cli_config.config)?;
let rpc_config: AppConfig = toml::from_str(&rpc_config)?;
debug!(?rpc_config);
// TODO: this doesn't seem to do anything
proctitle::set_title(format!("web3-proxy-{}", rpc_config.shared.chain_id));
fn run(
shutdown_receiver: flume::Receiver<()>,
cli_config: CliConfig,
app_config: AppConfig,
) -> anyhow::Result<()> {
debug!(?cli_config, ?app_config);
// spawn a thread for deadlock detection
thread::spawn(move || loop {
@@ -71,7 +49,7 @@ fn main() -> anyhow::Result<()> {
// set up tokio's async runtime
let mut rt_builder = runtime::Builder::new_multi_thread();
let chain_id = rpc_config.shared.chain_id;
let chain_id = app_config.shared.chain_id;
rt_builder.enable_all().thread_name_fn(move || {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
// TODO: what ordering? i think we want seqcst so that these all happen in order, but that might be stricter than we really need
@@ -93,11 +71,12 @@ fn main() -> anyhow::Result<()> {
debug!(?num_workers);
rt.block_on(async {
let (app, app_handle) = Web3ProxyApp::spawn(rpc_config, num_workers).await?;
let (app, app_handle) = Web3ProxyApp::spawn(app_config, num_workers).await?;
let frontend_handle = tokio::spawn(frontend::run(cli_config.port, app));
// if everything is working, these should both run forever
// TODO: select on the shutdown marker, here?
tokio::select! {
x = app_handle => {
match x {
@@ -115,8 +94,129 @@ fn main() -> anyhow::Result<()> {
}
}
}
_ = shutdown_receiver.recv_async() => {
info!("shutdown signal");
// TODO: wait for outstanding requests to complete. graceful shutdown will make our users happier
return Ok(())
}
};
Ok(())
})
}
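A standalone sketch of the shutdown pattern introduced here: a bounded flume channel carries the signal and `recv_async()` is raced against the main work in `tokio::select!`. The sleeps stand in for `app_handle`/`frontend_handle`; none of this code is from the commit itself:

```rust
use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    let (shutdown_sender, shutdown_receiver) = flume::bounded::<()>(1);

    // something external (the test, a signal handler, ...) asks us to stop
    tokio::spawn(async move {
        sleep(Duration::from_secs(1)).await;
        shutdown_sender.send_async(()).await.unwrap();
    });

    tokio::select! {
        // stand-in for app_handle: in run() this branch is the proxy itself
        _ = sleep(Duration::from_secs(3600)) => {
            println!("app finished on its own");
        }
        _ = shutdown_receiver.recv_async() => {
            println!("shutdown signal");
            // TODO (as in the commit): drain outstanding requests before returning
        }
    }
}
```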
fn main() -> anyhow::Result<()> {
// if RUST_LOG isn't set, configure a default
// TODO: is there a better way to do this?
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info,web3_proxy=debug");
}
// install global collector configured based on RUST_LOG env var.
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.compact()
.init();
// this probably won't matter for us in docker, but better safe than sorry
fdlimit::raise_fd_limit();
// initial configuration from flags
let cli_config: CliConfig = argh::from_env();
// advanced configuration is on disk
info!("Loading rpc config @ {}", cli_config.config);
let app_config: String = fs::read_to_string(cli_config.config.clone())?;
let app_config: AppConfig = toml::from_str(&app_config)?;
// TODO: this doesn't seem to do anything
proctitle::set_title(format!("web3-proxy-{}", app_config.shared.chain_id));
let (_shutdown_sender, shutdown_receiver) = flume::bounded(1);
run(shutdown_receiver, cli_config, app_config)
}
#[cfg(test)]
mod tests {
use ethers::{
prelude::{Http, Provider, U256},
utils::Anvil,
};
use hashbrown::HashMap;
use std::env;
use crate::config::{RpcSharedConfig, Web3ConnectionConfig};
use super::*;
#[tokio::test]
async fn it_works() {
// TODO: move basic setup into a test fixture
let path = env::var("PATH").unwrap();
println!("path: {}", path);
// TODO: option for super verbose logging
std::env::set_var("RUST_LOG", "info,web3_proxy=debug");
// install global collector configured based on RUST_LOG env var.
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.compact()
.init();
let anvil = Anvil::new().spawn();
println!("Anvil running at `{}`", anvil.endpoint());
let provider = Provider::<Http>::try_from(anvil.endpoint()).unwrap();
// mine a block because my code doesn't like being on block 0
// TODO: make block 0 okay?
let head_block_num: U256 = provider.request("evm_mine", None::<()>).await.unwrap();
// make a test CliConfig
let cli_config = CliConfig {
port: 0,
workers: 2,
config: "./does/not/exist/test.toml".to_string(),
};
// make a test AppConfig
let app_config = AppConfig {
shared: RpcSharedConfig {
chain_id: 31337,
redis_url: None,
public_rate_limit_per_minute: 0,
response_cache_max_bytes: 10_usize.pow(7),
},
balanced_rpcs: HashMap::from([
(
"anvil".to_string(),
Web3ConnectionConfig::new(anvil.endpoint(), 100, None),
),
(
"anvil_ws".to_string(),
Web3ConnectionConfig::new(anvil.ws_endpoint(), 100, None),
),
]),
private_rpcs: None,
};
let (shutdown_sender, shutdown_receiver) = flume::bounded(1);
// spawn another thread for running the app
// TODO:
let handle = thread::spawn(move || run(shutdown_receiver, cli_config, app_config));
// TODO: do something to the node. query latest block, mine another block, query again
shutdown_sender.send(()).unwrap();
println!("waiting for shutdown...");
handle.join().unwrap().unwrap();
}
}
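Note that only the test ever sends on the channel; the new `main()` keeps an unused `_shutdown_sender`. A hedged sketch of one way to drive it from a real signal, using the `ctrlc` crate (which is not a dependency in this diff), since the channel is created before the tokio runtime exists:

```rust
fn main() -> anyhow::Result<()> {
    let (shutdown_sender, shutdown_receiver) = flume::bounded::<()>(1);

    // on Ctrl-C, send one shutdown message; ignore the error if run() already returned
    ctrlc::set_handler(move || {
        let _ = shutdown_sender.send(());
    })?;

    // the real main() loads cli_config and app_config here, then calls:
    // run(shutdown_receiver, cli_config, app_config)
    let _ = shutdown_receiver;
    Ok(())
}
```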