add chain_id to proc title

This commit is contained in:
Bryan Stitt 2022-05-12 18:49:57 +00:00
parent 030e458658
commit 397d4d03e5
10 changed files with 125 additions and 41 deletions

12
Cargo.lock generated

@ -2496,6 +2496,17 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "proctitle"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "924cd8a0de90723d63fed19c5035ea129913a0bc998b37686a67f1eaf6a2aab5"
dependencies = [
"lazy_static",
"libc",
"winapi",
]
[[package]]
name = "quanta"
version = "0.9.3"
@ -3854,6 +3865,7 @@ dependencies = [
"hashbrown 0.12.1",
"linkedhashmap",
"parking_lot 0.12.0",
"proctitle",
"regex",
"reqwest",
"rustc-hash",

@ -72,6 +72,7 @@ Note: Testing with `getLatestBlockByNumber.lua` is not great because the latest
## Todo
- [ ] some production configs are using 100% cpu
- [ ] after connecting to a server, check that it gives the expected chainId
- [ ] if the fastest server has hit rate limits, we won't be able to serve any traffic until another server is synced.
- [ ] proper logging with useful instrumentation
@ -79,6 +80,9 @@ Note: Testing with `getLatestBlockByNumber.lua` is not great because the latest
- [ ] if a request gets a socket timeout, try on another server
- maybe always try at least two servers in parallel? and then return the first? or only if the first one doesn't respond very quickly?
- [ ] incoming rate limiting (by ip or by api key or what?)
- [ ] improve caching
- [ ] if the params include a block, we can cache for longer
- [ ] if the call is something simple like "symbol" or "decimals", cache that too
- [ ] measure latency to nodes?
- [ ] one proxy for mulitple chains?
- [ ] zero downtime deploys

@ -1,3 +1,6 @@
[shared]
chain_id = 1
[balanced_rpc_tiers]
[balanced_rpc_tiers.0]

@ -1 +1 @@
https://github.com/quininer/linkedhashmap
https://github.com/quininer/linkedhashmap

@ -17,6 +17,7 @@ governor = { version = "0.4.2", features = ["dashmap", "std"] }
hashbrown = "0.12.1"
linkedhashmap = { path = "../linkedhashmap" }
parking_lot = "0.12.0"
proctitle = "0.1.1"
regex = "1.5.5"
reqwest = { version = "0.11.10", default-features = false, features = ["json", "rustls"] }
rustc-hash = "1.1.0"

@ -30,7 +30,8 @@ static APP_USER_AGENT: &str = concat!(
const RESPONSE_CACHE_CAP: usize = 1024;
/// TODO: these types are probably very bad keys and values. i couldn't get caching of warp::reply::Json to work
type ResponseLruCache = RwLock<LinkedHashMap<(u64, String, String), JsonRpcForwardedResponse>>;
type ResponseLruCache =
RwLock<LinkedHashMap<(u64, String, Option<String>), JsonRpcForwardedResponse>>;
/// The application
// TODO: this debug impl is way too verbose. make something smaller
@ -61,6 +62,7 @@ impl fmt::Debug for Web3ProxyApp {
impl Web3ProxyApp {
pub async fn try_new(
chain_id: usize,
balanced_rpc_tiers: Vec<Vec<Web3ConnectionConfig>>,
private_rpcs: Vec<Web3ConnectionConfig>,
) -> anyhow::Result<Web3ProxyApp> {
@ -81,6 +83,7 @@ impl Web3ProxyApp {
let balanced_rpc_tiers =
future::join_all(balanced_rpc_tiers.into_iter().map(|balanced_rpc_tier| {
Web3Connections::try_new(
chain_id,
best_head_block_number.clone(),
balanced_rpc_tier,
Some(http_client.clone()),
@ -100,6 +103,7 @@ impl Web3ProxyApp {
} else {
Some(
Web3Connections::try_new(
chain_id,
best_head_block_number.clone(),
private_rpcs,
Some(http_client),
@ -225,6 +229,9 @@ impl Web3ProxyApp {
let deadline = not_until.wait_time_from(self.clock.now());
sleep(deadline).await;
} else {
// TODO: what should we do here?
return Err(anyhow::anyhow!("no private rpcs!"));
}
}
};
@ -246,7 +253,7 @@ impl Web3ProxyApp {
for balanced_rpcs in rpc_iter {
let best_head_block_number =
self.best_head_block_number.load(atomic::Ordering::Acquire); // TODO: we don't store current block for everything anymore. we store it on the connections
self.best_head_block_number.load(atomic::Ordering::Acquire);
let best_rpc_block_number = balanced_rpcs.head_block_number();
@ -259,11 +266,7 @@ impl Web3ProxyApp {
let cache_key = (
best_head_block_number,
request.method.clone(),
request
.params
.clone()
.map(|x| x.to_string())
.unwrap_or_else(|| "[]".to_string()),
request.params.clone().map(|x| x.to_string()),
);
if let Some(cached) = self.response_cache.read().await.get(&cache_key) {
@ -302,6 +305,7 @@ impl Web3ProxyApp {
// TODO: cache the warp::reply to save us serializing every time
response_cache.insert(cache_key, response.clone());
if response_cache.len() >= RESPONSE_CACHE_CAP {
// TODO: this isn't really an LRU. what is this called? should we make it an lru? these caches only live for one block
response_cache.pop_front();
}
@ -412,7 +416,7 @@ impl Web3ProxyApp {
// TODO: how long should we wait?
// TODO: max wait time?
warn!("No servers in sync!");
// TODO: return json error? return a 502?
// TODO: return a 502?
return Err(anyhow::anyhow!("no servers in sync"));
};
}

@ -21,11 +21,19 @@ pub struct CliConfig {
#[derive(Deserialize)]
pub struct RpcConfig {
// BTreeMap so that iterating keeps the same order
pub shared: RpcSharedConfig,
// BTreeMap so that iterating keeps the same order. we want tier 0 before tier 1!
pub balanced_rpc_tiers: BTreeMap<String, HashMap<String, Web3ConnectionConfig>>,
pub private_rpcs: Option<HashMap<String, Web3ConnectionConfig>>,
}
/// shared configuration between Web3Connections
#[derive(Deserialize)]
pub struct RpcSharedConfig {
/// TODO: what type for chain_id? TODO: this isn't at the right level. this is inside a "Config"
pub chain_id: usize,
}
#[derive(Deserialize)]
pub struct Web3ConnectionConfig {
url: String,
@ -48,7 +56,7 @@ impl RpcConfig {
vec![]
};
Web3ProxyApp::try_new(balanced_rpc_tiers, private_rpcs).await
Web3ProxyApp::try_new(self.shared.chain_id, balanced_rpc_tiers, private_rpcs).await
}
}
@ -57,9 +65,11 @@ impl Web3ConnectionConfig {
pub async fn try_build(
self,
clock: &QuantaClock,
chain_id: usize,
http_client: Option<reqwest::Client>,
) -> anyhow::Result<Arc<Web3Connection>> {
Web3Connection::try_new(
chain_id,
self.url,
http_client,
self.hard_limit,
@ -67,6 +77,5 @@ impl Web3ConnectionConfig {
self.soft_limit,
)
.await
.map(Arc::new)
}
}

@ -7,7 +7,6 @@ use governor::middleware::NoOpMiddleware;
use governor::state::{InMemoryState, NotKeyed};
use governor::NotUntil;
use governor::RateLimiter;
use serde_json::value::RawValue;
use std::fmt;
use std::num::NonZeroU32;
use std::sync::atomic::{self, AtomicU32, AtomicU64};
@ -66,13 +65,14 @@ impl fmt::Display for Web3Connection {
impl Web3Connection {
/// Connect to a web3 rpc and subscribe to new heads
pub async fn try_new(
chain_id: usize,
url_str: String,
http_client: Option<reqwest::Client>,
hard_rate_limit: Option<u32>,
clock: &QuantaClock,
// TODO: think more about this type
soft_limit: u32,
) -> anyhow::Result<Web3Connection> {
) -> anyhow::Result<Arc<Web3Connection>> {
let hard_rate_limiter = if let Some(hard_rate_limit) = hard_rate_limit {
let quota = governor::Quota::per_second(NonZeroU32::new(hard_rate_limit).unwrap());
@ -108,7 +108,7 @@ impl Web3Connection {
return Err(anyhow::anyhow!("only http and ws servers are supported"));
};
Ok(Web3Connection {
let connection = Web3Connection {
clock: clock.clone(),
url: url_str.clone(),
active_requests: Default::default(),
@ -116,7 +116,35 @@ impl Web3Connection {
ratelimiter: hard_rate_limiter,
soft_limit,
head_block_number: 0.into(),
})
};
let connection = Arc::new(connection);
// TODO: check the chain_id here
let active_request_handle = connection.wait_for_request_handle().await;
// TODO: passing empty_params like this feels awkward.
let empty_params: Option<()> = None;
let found_chain_id: String = active_request_handle
.request("eth_chainId", empty_params)
.await
.unwrap();
let found_chain_id =
usize::from_str_radix(found_chain_id.trim_start_matches("0x"), 16).unwrap();
if chain_id != found_chain_id {
return Err(anyhow::anyhow!(
"incorrect chain id! Expected {}. Found {}",
chain_id,
found_chain_id
));
}
// TODO: use anyhow
assert_eq!(chain_id, found_chain_id);
Ok(connection)
}
#[inline]
@ -233,7 +261,6 @@ impl Web3Connection {
}
pub async fn wait_for_request_handle(self: &Arc<Self>) -> ActiveRequestHandle {
// rate limits
loop {
match self.try_request_handle() {
Ok(pending_request_handle) => return pending_request_handle,
@ -244,8 +271,6 @@ impl Web3Connection {
}
}
}
// TODO: return a thing that when we drop it decrements?
}
pub fn try_request_handle(
@ -290,11 +315,15 @@ impl ActiveRequestHandle {
/// Send a web3 request
/// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
/// By taking self here, we ensure that this is dropped after the request is complete
pub async fn request(
self,
pub async fn request<T, R>(
&self,
method: &str,
params: &Option<Box<serde_json::value::RawValue>>,
) -> Result<Box<RawValue>, ethers::prelude::ProviderError> {
params: T,
) -> Result<R, ethers::prelude::ProviderError>
where
T: fmt::Debug + serde::Serialize + Send + Sync,
R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug,
{
// TODO: this should probably be trace level and use a span
// TODO: it would be nice to have the request id on this
trace!("Sending {}({:?}) to {}", method, params, self.0);

@ -61,6 +61,7 @@ impl fmt::Debug for Web3Connections {
impl Web3Connections {
pub async fn try_new(
chain_id: usize,
best_head_block_number: Arc<AtomicU64>,
servers: Vec<Web3ConnectionConfig>,
http_client: Option<reqwest::Client>,
@ -72,7 +73,10 @@ impl Web3Connections {
let num_connections = servers.len();
for server_config in servers.into_iter() {
match server_config.try_build(clock, http_client.clone()).await {
match server_config
.try_build(clock, chain_id, http_client.clone())
.await
{
Ok(connection) => connections.push(connection),
Err(e) => warn!("Unable to connect to a server! {}", e),
}

@ -5,7 +5,9 @@ mod connections;
mod jsonrpc;
use std::fs;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::Arc;
use tokio::runtime;
use tracing::info;
use warp::Filter;
use warp::Reply;
@ -13,8 +15,7 @@ use warp::Reply;
use crate::app::Web3ProxyApp;
use crate::config::{CliConfig, RpcConfig};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
fn main() -> anyhow::Result<()> {
// install global collector configured based on RUST_LOG env var.
tracing_subscriber::fmt::init();
@ -24,27 +25,44 @@ async fn main() -> anyhow::Result<()> {
let rpc_config: String = fs::read_to_string(cli_config.rpc_config_path)?;
let rpc_config: RpcConfig = toml::from_str(&rpc_config)?;
// TODO: load the config from yaml instead of hard coding
// TODO: support multiple chains in one process? then we could just point "chain.stytt.com" at this and caddy wouldn't need anything else
// TODO: be smart about about using archive nodes? have a set that doesn't use archive nodes since queries to them are more valuable
let listen_port = cli_config.listen_port;
// TODO: setting title inside of tokio doesnt seem to work. lets do it outside of tokio
proctitle::set_title(format!("web3-proxy-{}", rpc_config.shared.chain_id));
let app = rpc_config.try_build().await?;
let chain_id = rpc_config.shared.chain_id;
let app: Arc<Web3ProxyApp> = Arc::new(app);
let rt = runtime::Builder::new_multi_thread()
.worker_threads(4)
.enable_all()
.thread_name_fn(move || {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
// TODO: what ordering?
let worker_id = ATOMIC_ID.fetch_add(1, atomic::Ordering::SeqCst);
// TODO: i think these max at 15 characters
format!("web3-{}-{}", chain_id, worker_id)
})
.build()?;
let proxy_rpc_filter = warp::any()
.and(warp::post())
.and(warp::body::json())
.then(move |json_body| app.clone().proxy_web3_rpc(json_body));
// spawn the root task
rt.block_on(async {
let listen_port = cli_config.listen_port;
// TODO: filter for displaying connections and their block heights
let app = rpc_config.try_build().await.unwrap();
// TODO: warp trace is super verbose. how do we make this more readable?
// let routes = proxy_rpc_filter.with(warp::trace::request());
let routes = proxy_rpc_filter.map(handle_anyhow_errors);
let app: Arc<Web3ProxyApp> = Arc::new(app);
warp::serve(routes).run(([0, 0, 0, 0], listen_port)).await;
let proxy_rpc_filter = warp::any()
.and(warp::post())
.and(warp::body::json())
.then(move |json_body| app.clone().proxy_web3_rpc(json_body));
// TODO: filter for displaying connections and their block heights
// TODO: warp trace is super verbose. how do we make this more readable?
// let routes = proxy_rpc_filter.with(warp::trace::request());
let routes = proxy_rpc_filter.map(handle_anyhow_errors);
warp::serve(routes).run(([0, 0, 0, 0], listen_port)).await;
});
Ok(())
}