instrument
This commit is contained in:
parent
6ab2b3a533
commit
9213e1a796
2
TODO.md
2
TODO.md
@ -9,9 +9,9 @@
|
||||
- [ ] endpoint for health checks. if no synced servers, give a 502 error
|
||||
- [ ] move from warp to axum?
|
||||
- [ ] some production configs are occasionally stuck waiting at 100% cpu
|
||||
- looks like it's getting stuck on `futex(0x7fc15067b478, FUTEX_WAIT_PRIVATE, 1, NULL`
|
||||
- they stop processing new blocks. i'm guessing 2 blocks arrive at the same time, but i thought our locks would handle that
|
||||
- even after removing a bunch of the locks, the deadlock still happens. i can't reliably reproduce. i just let it run for a while and it happens.
|
||||
- running gdb shows the thread at tokio tungstenite thread is spinning near 100% cpu and none of the rest of the program is proceeding
|
||||
- [ ] proper logging with useful instrumentation
|
||||
- [ ] handle websocket disconnect and reconnect
|
||||
- [ ] warning if no blocks for too long. maybe reconnect automatically?
|
||||
|
@ -16,7 +16,7 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::watch;
|
||||
use tokio::time::sleep;
|
||||
use tracing::{trace, warn};
|
||||
use tracing::{debug, instrument, trace, warn};
|
||||
|
||||
static APP_USER_AGENT: &str = concat!(
|
||||
"satoshiandkin/",
|
||||
@ -56,6 +56,7 @@ impl fmt::Debug for Web3ProxyApp {
|
||||
}
|
||||
|
||||
impl Web3ProxyApp {
|
||||
#[instrument(skip_all)]
|
||||
pub async fn try_new(
|
||||
chain_id: usize,
|
||||
balanced_rpcs: Vec<Web3ConnectionConfig>,
|
||||
@ -102,11 +103,12 @@ impl Web3ProxyApp {
|
||||
|
||||
/// send the request to the appropriate RPCs
|
||||
/// TODO: dry this up
|
||||
#[instrument(skip_all)]
|
||||
pub async fn proxy_web3_rpc(
|
||||
self: Arc<Web3ProxyApp>,
|
||||
request: JsonRpcRequestEnum,
|
||||
) -> anyhow::Result<impl warp::Reply> {
|
||||
trace!("Received request: {:?}", request);
|
||||
debug!("Received request: {:?}", request);
|
||||
|
||||
let response = match request {
|
||||
JsonRpcRequestEnum::Single(request) => {
|
||||
@ -117,9 +119,12 @@ impl Web3ProxyApp {
|
||||
}
|
||||
};
|
||||
|
||||
debug!("Forwarding response: {:?}", response);
|
||||
|
||||
Ok(warp::reply::json(&response))
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn proxy_web3_rpc_requests(
|
||||
self: Arc<Web3ProxyApp>,
|
||||
requests: Vec<JsonRpcRequest>,
|
||||
@ -149,6 +154,7 @@ impl Web3ProxyApp {
|
||||
Ok(collected)
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn proxy_web3_rpc_request(
|
||||
self: Arc<Web3ProxyApp>,
|
||||
request: JsonRpcRequest,
|
||||
@ -234,6 +240,9 @@ impl Web3ProxyApp {
|
||||
if let Some(cached) = self.response_cache.read().get(&cache_key) {
|
||||
let _ = self.active_requests.remove(&cache_key);
|
||||
|
||||
// TODO: emit a stat
|
||||
trace!("cache hit!");
|
||||
|
||||
return Ok(cached.to_owned());
|
||||
}
|
||||
|
||||
|
@ -3,6 +3,7 @@ use governor::clock::QuantaClock;
|
||||
use serde::Deserialize;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tracing::instrument;
|
||||
|
||||
use crate::connection::Web3Connection;
|
||||
use crate::Web3ProxyApp;
|
||||
@ -42,6 +43,7 @@ pub struct Web3ConnectionConfig {
|
||||
|
||||
impl RpcConfig {
|
||||
/// Create a Web3ProxyApp from config
|
||||
#[instrument(skip_all)]
|
||||
pub async fn try_build(self) -> anyhow::Result<Web3ProxyApp> {
|
||||
let balanced_rpcs = self.balanced_rpcs.into_values().collect();
|
||||
|
||||
@ -57,6 +59,7 @@ impl RpcConfig {
|
||||
|
||||
impl Web3ConnectionConfig {
|
||||
/// Create a Web3Connection from config
|
||||
#[instrument(skip_all)]
|
||||
pub async fn try_build(
|
||||
self,
|
||||
clock: &QuantaClock,
|
||||
|
@ -7,13 +7,14 @@ use governor::middleware::NoOpMiddleware;
|
||||
use governor::state::{InMemoryState, NotKeyed};
|
||||
use governor::NotUntil;
|
||||
use governor::RateLimiter;
|
||||
use parking_lot::RwLock;
|
||||
use std::fmt;
|
||||
use std::num::NonZeroU32;
|
||||
use std::sync::atomic::{self, AtomicU32};
|
||||
use std::time::Duration;
|
||||
use std::{cmp::Ordering, sync::Arc};
|
||||
use tokio::time::{interval, sleep, MissedTickBehavior};
|
||||
use tracing::{info, trace, warn};
|
||||
use tokio::time::{interval, sleep, timeout_at, Instant, MissedTickBehavior};
|
||||
use tracing::{info, instrument, trace, warn};
|
||||
|
||||
type Web3RateLimiter =
|
||||
RateLimiter<NotKeyed, InMemoryState, QuantaClock, NoOpMiddleware<QuantaInstant>>;
|
||||
@ -38,6 +39,7 @@ pub struct Web3Connection {
|
||||
url: String,
|
||||
/// keep track of currently open requests. We sort on this
|
||||
active_requests: AtomicU32,
|
||||
// TODO: put this in a RwLock so that we can replace it if re-connecting
|
||||
provider: Web3Provider,
|
||||
ratelimiter: Option<Web3RateLimiter>,
|
||||
/// used for load balancing to the least loaded server
|
||||
@ -61,7 +63,11 @@ impl fmt::Display for Web3Connection {
|
||||
}
|
||||
|
||||
impl Web3Connection {
|
||||
#[instrument(skip_all)]
|
||||
async fn reconnect(&self) {}
|
||||
|
||||
/// Connect to a web3 rpc and subscribe to new heads
|
||||
#[instrument(skip_all)]
|
||||
pub async fn try_new(
|
||||
chain_id: usize,
|
||||
url_str: String,
|
||||
@ -164,6 +170,7 @@ impl Web3Connection {
|
||||
&self.url
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn send_block(
|
||||
self: &Arc<Self>,
|
||||
block: Result<Block<TxHash>, ProviderError>,
|
||||
@ -187,7 +194,7 @@ impl Web3Connection {
|
||||
}
|
||||
|
||||
/// Subscribe to new blocks
|
||||
// #[instrument]
|
||||
#[instrument(skip_all)]
|
||||
pub async fn subscribe_new_heads(
|
||||
self: Arc<Self>,
|
||||
block_sender: flume::Sender<(u64, H256, Arc<Self>)>,
|
||||
@ -260,9 +267,15 @@ impl Web3Connection {
|
||||
|
||||
self.send_block(block, &block_sender).await;
|
||||
|
||||
while let Some(new_block) = stream.next().await {
|
||||
// TODO: what should this timeout be? needs to be larger than worst case block time
|
||||
// TODO: although reconnects will make this less of an issue
|
||||
while let Ok(Some(new_block)) =
|
||||
timeout_at(Instant::now() + Duration::from_secs(300), stream.next()).await
|
||||
{
|
||||
self.send_block(Ok(new_block), &block_sender).await;
|
||||
}
|
||||
|
||||
// TODO: re-connect!
|
||||
}
|
||||
}
|
||||
|
||||
@ -271,6 +284,7 @@ impl Web3Connection {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub async fn wait_for_request_handle(self: &Arc<Self>) -> ActiveRequestHandle {
|
||||
// TODO: maximum wait time
|
||||
|
||||
@ -331,6 +345,7 @@ impl ActiveRequestHandle {
|
||||
/// Send a web3 request
|
||||
/// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
|
||||
/// By taking self here, we ensure that this is dropped after the request is complete
|
||||
#[instrument(skip_all)]
|
||||
pub async fn request<T, R>(
|
||||
&self,
|
||||
method: &str,
|
||||
|
@ -11,7 +11,7 @@ use serde_json::value::RawValue;
|
||||
use std::cmp;
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
use tracing::{info, trace, warn};
|
||||
use tracing::{info, instrument, trace, warn};
|
||||
|
||||
use crate::config::Web3ConnectionConfig;
|
||||
use crate::connection::{ActiveRequestHandle, Web3Connection};
|
||||
@ -140,6 +140,7 @@ impl fmt::Debug for Web3Connections {
|
||||
}
|
||||
|
||||
impl Web3Connections {
|
||||
#[instrument(skip_all)]
|
||||
pub async fn try_new(
|
||||
chain_id: usize,
|
||||
servers: Vec<Web3ConnectionConfig>,
|
||||
@ -217,6 +218,7 @@ impl Web3Connections {
|
||||
}
|
||||
|
||||
/// Send the same request to all the handles. Returning the fastest successful result.
|
||||
#[instrument(skip_all)]
|
||||
pub async fn try_send_parallel_requests(
|
||||
self: Arc<Self>,
|
||||
active_request_handles: Vec<ActiveRequestHandle>,
|
||||
@ -280,6 +282,7 @@ impl Web3Connections {
|
||||
}
|
||||
|
||||
/// TODO: possible dead lock here. investigate more. probably refactor
|
||||
#[instrument(skip_all)]
|
||||
async fn update_synced_rpcs(
|
||||
&self,
|
||||
block_receiver: flume::Receiver<(u64, H256, Arc<Web3Connection>)>,
|
||||
@ -305,6 +308,7 @@ impl Web3Connections {
|
||||
}
|
||||
|
||||
/// get the best available rpc server
|
||||
#[instrument(skip_all)]
|
||||
pub async fn next_upstream_server(
|
||||
&self,
|
||||
) -> Result<ActiveRequestHandle, Option<NotUntil<QuantaInstant>>> {
|
||||
|
@ -161,7 +161,7 @@ impl fmt::Debug for JsonRpcForwardedResponse {
|
||||
}
|
||||
|
||||
/// JSONRPC Responses can include one or many response objects.
|
||||
#[derive(Clone, Serialize)]
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
#[serde(untagged)]
|
||||
pub enum JsonRpcForwardedResponseEnum {
|
||||
Single(JsonRpcForwardedResponse),
|
||||
|
@ -13,7 +13,7 @@ use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use tokio::runtime;
|
||||
use tracing::info;
|
||||
use tracing::{info, warn};
|
||||
use warp::Filter;
|
||||
use warp::Reply;
|
||||
|
||||
@ -96,6 +96,8 @@ fn handle_anyhow_errors<T: warp::Reply>(
|
||||
match res {
|
||||
Ok(r) => r.into_response(),
|
||||
Err(e) => {
|
||||
warn!("Responding with an error: {:?}", e);
|
||||
|
||||
let e = JsonRpcForwardedResponse {
|
||||
jsonrpc: "2.0".to_string(),
|
||||
// TODO: what id can we use? how do we make sure the incoming id gets attached to this?
|
||||
|
Loading…
Reference in New Issue
Block a user