better error handling
This commit is contained in:
parent
96fe04b6c8
commit
52bea50745
2
TODO.md
2
TODO.md
@ -39,7 +39,7 @@
|
|||||||
- [ ] automatically route to archive server when necessary
|
- [ ] automatically route to archive server when necessary
|
||||||
- [ ] handle log subscriptions
|
- [ ] handle log subscriptions
|
||||||
- [ ] basic request method stats
|
- [ ] basic request method stats
|
||||||
- [ ] http servers should check block at the very start
|
- [x] http servers should check block at the very start
|
||||||
- [ ] Got warning: "WARN subscribe_new_heads:send_block: web3_proxy::connection: unable to get block from https://rpc.ethermine.org: Deserialization Error: expected value at line 1 column 1. Response: error code: 1015". this is cloudflare rate limiting on fetching a block, but this is a private rpc. why is there a block subscription?
|
- [ ] Got warning: "WARN subscribe_new_heads:send_block: web3_proxy::connection: unable to get block from https://rpc.ethermine.org: Deserialization Error: expected value at line 1 column 1. Response: error code: 1015". this is cloudflare rate limiting on fetching a block, but this is a private rpc. why is there a block subscription?
|
||||||
|
|
||||||
## V1
|
## V1
|
||||||
|
@ -440,7 +440,6 @@ impl Web3Connection {
|
|||||||
// TODO: what should this interval be? probably automatically set to some fraction of block time
|
// TODO: what should this interval be? probably automatically set to some fraction of block time
|
||||||
// TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
|
// TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
|
||||||
// TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though
|
// TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though
|
||||||
// TODO:
|
|
||||||
let mut interval = interval(Duration::from_secs(60));
|
let mut interval = interval(Duration::from_secs(60));
|
||||||
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
|
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
|
||||||
|
|
||||||
@ -472,26 +471,17 @@ impl Web3Connection {
|
|||||||
// TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out?
|
// TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out?
|
||||||
let mut stream = provider.subscribe_pending_txs().await?;
|
let mut stream = provider.subscribe_pending_txs().await?;
|
||||||
|
|
||||||
|
// TODO: maybe the subscribe_pending_txs function should be on the active_request_handle
|
||||||
drop(active_request_handle);
|
drop(active_request_handle);
|
||||||
|
|
||||||
// TODO: query existing pending txs?
|
while let Some(pending_tx_id) = stream.next().await {
|
||||||
|
|
||||||
// TODO: should the stream have a timeout on it here?
|
|
||||||
// TODO: i don't think loop match is what we want. i think while let would be better
|
|
||||||
loop {
|
|
||||||
match stream.next().await {
|
|
||||||
Some(pending_tx_id) => {
|
|
||||||
tx_id_sender
|
tx_id_sender
|
||||||
.send_async((pending_tx_id, self.clone()))
|
.send_async((pending_tx_id, self.clone()))
|
||||||
.await
|
.await
|
||||||
.context("tx_id_sender")?;
|
.context("tx_id_sender")?;
|
||||||
}
|
}
|
||||||
None => {
|
|
||||||
warn!("subscription ended");
|
warn!("subscription ended");
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -32,10 +32,13 @@ fn main() -> anyhow::Result<()> {
|
|||||||
.compact()
|
.compact()
|
||||||
.init();
|
.init();
|
||||||
|
|
||||||
|
// this probably won't matter for us in docker, but better safe than sorry
|
||||||
fdlimit::raise_fd_limit();
|
fdlimit::raise_fd_limit();
|
||||||
|
|
||||||
|
// initial configuration from flags
|
||||||
let cli_config: CliConfig = argh::from_env();
|
let cli_config: CliConfig = argh::from_env();
|
||||||
|
|
||||||
|
// advanced configuration
|
||||||
info!("Loading rpc config @ {}", cli_config.config);
|
info!("Loading rpc config @ {}", cli_config.config);
|
||||||
let rpc_config: String = fs::read_to_string(cli_config.config)?;
|
let rpc_config: String = fs::read_to_string(cli_config.config)?;
|
||||||
let rpc_config: RpcConfig = toml::from_str(&rpc_config)?;
|
let rpc_config: RpcConfig = toml::from_str(&rpc_config)?;
|
||||||
@ -45,10 +48,9 @@ fn main() -> anyhow::Result<()> {
|
|||||||
// TODO: this doesn't seem to do anything
|
// TODO: this doesn't seem to do anything
|
||||||
proctitle::set_title(format!("web3-proxy-{}", rpc_config.shared.chain_id));
|
proctitle::set_title(format!("web3-proxy-{}", rpc_config.shared.chain_id));
|
||||||
|
|
||||||
let chain_id = rpc_config.shared.chain_id;
|
|
||||||
|
|
||||||
let mut rt_builder = runtime::Builder::new_multi_thread();
|
let mut rt_builder = runtime::Builder::new_multi_thread();
|
||||||
|
|
||||||
|
let chain_id = rpc_config.shared.chain_id;
|
||||||
rt_builder.enable_all().thread_name_fn(move || {
|
rt_builder.enable_all().thread_name_fn(move || {
|
||||||
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
|
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
|
||||||
// TODO: what ordering? i think we want seqcst so that these all happen in order, but that might be stricter than we really need
|
// TODO: what ordering? i think we want seqcst so that these all happen in order, but that might be stricter than we really need
|
||||||
@ -61,8 +63,6 @@ fn main() -> anyhow::Result<()> {
|
|||||||
rt_builder.worker_threads(cli_config.workers);
|
rt_builder.worker_threads(cli_config.workers);
|
||||||
}
|
}
|
||||||
|
|
||||||
let rt = rt_builder.build()?;
|
|
||||||
|
|
||||||
// spawn a thread for deadlock detection
|
// spawn a thread for deadlock detection
|
||||||
thread::spawn(move || loop {
|
thread::spawn(move || loop {
|
||||||
thread::sleep(Duration::from_secs(10));
|
thread::sleep(Duration::from_secs(10));
|
||||||
@ -81,7 +81,8 @@ fn main() -> anyhow::Result<()> {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// spawn the root task
|
// start tokio's async runtime
|
||||||
|
let rt = rt_builder.build()?;
|
||||||
rt.block_on(async {
|
rt.block_on(async {
|
||||||
let (app, app_handle) = rpc_config.spawn().await?;
|
let (app, app_handle) = rpc_config.spawn().await?;
|
||||||
|
|
||||||
@ -90,12 +91,20 @@ fn main() -> anyhow::Result<()> {
|
|||||||
// if everything is working, these should both run forever
|
// if everything is working, these should both run forever
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
x = app_handle => {
|
x = app_handle => {
|
||||||
// TODO: error log if error
|
match x {
|
||||||
info!(?x, "app_handle exited");
|
Ok(_) => info!("app_handle exited"),
|
||||||
|
Err(e) => {
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
x = flatten_handle(frontend_handle) => {
|
x = flatten_handle(frontend_handle) => {
|
||||||
// TODO: error log if error
|
match x {
|
||||||
info!(?x, "frontend exited");
|
Ok(_) => info!("frontend exited"),
|
||||||
|
Err(e) => {
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user