toml config

This commit is contained in:
Bryan Stitt 2022-05-03 04:02:52 +00:00
parent afd25649fb
commit 651494a278
5 changed files with 31 additions and 84 deletions

1
Cargo.lock generated

@ -3770,6 +3770,7 @@ dependencies = [
"serde",
"serde_json",
"tokio",
"toml",
"tracing",
"tracing-subscriber",
"url",

@ -21,6 +21,7 @@ reqwest = { version = "0.11.10", features = ["json", "rustls"] }
rustc-hash = "1.1.0"
serde = { version = "1.0.137", features = [] }
serde_json = { version = "1.0.80", default-features = false, features = ["alloc"] }
toml = "*"
tracing = "0.1.34"
tracing-subscriber = "0.3.11"
url = "2.2.2"

@ -1,13 +1,8 @@
use ethers::prelude::{Block, TxHash};
use governor::clock::QuantaClock;
use serde::Deserialize;
use std::collections::{BTreeMap, HashMap};
use std::num::NonZeroU32;
use std::sync::Arc;
use tokio::sync::mpsc;
use crate::block_watcher::BlockWatcherSender;
// use crate::block_watcher::BlockWatcher;
use crate::connection::Web3Connection;
use crate::Web3ProxyApp;
@ -47,30 +42,16 @@ impl Web3ConnectionConfig {
pub async fn try_build(
self,
clock: &QuantaClock,
block_watcher_sender: BlockWatcherSender,
http_client: Option<reqwest::Client>,
) -> anyhow::Result<Arc<Web3Connection>> {
let hard_rate_limiter = if let Some(hard_limit) = self.hard_limit {
let quota = governor::Quota::per_second(NonZeroU32::new(hard_limit).unwrap());
let rate_limiter = governor::RateLimiter::direct_with_clock(quota, clock);
Some(rate_limiter)
} else {
None
};
Web3Connection::try_new(
self.url,
http_client,
block_watcher_sender,
hard_rate_limiter,
self.hard_limit,
Some(clock),
self.soft_limit,
)
.await
}
pub fn url(&self) -> &str {
&self.url
.map(Arc::new)
}
}

@ -13,6 +13,7 @@ use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::warn;
use crate::config::Web3ConnectionConfig;
use crate::connection::{JsonRpcForwardedResponse, Web3Connection};
#[derive(Clone, Default)]
@ -60,7 +61,7 @@ impl fmt::Debug for Web3Connections {
impl Web3Connections {
pub async fn try_new(
// TODO: servers should be a Web3ConnectionBuilder struct
servers: Vec<(&str, u32, Option<u32>)>,
servers: Vec<Web3ConnectionConfig>,
http_client: Option<reqwest::Client>,
clock: &QuantaClock,
) -> anyhow::Result<Arc<Self>> {
@ -68,17 +69,8 @@ impl Web3Connections {
let num_connections = servers.len();
for (s, soft_rate_limit, hard_rate_limit) in servers.into_iter() {
let connection = Web3Connection::try_new(
s.to_string(),
http_client.clone(),
hard_rate_limit,
Some(clock),
soft_rate_limit,
)
.await?;
let connection = Arc::new(connection);
for server_config in servers.into_iter() {
let connection = server_config.try_build(clock, http_client.clone()).await?;
connections.push(connection);
}
@ -90,6 +82,7 @@ impl Web3Connections {
for connection in connections.inner.iter() {
// subscribe to new heads in a spawned future
// TODO: channel instead. then we can have one future with write access to a left-right
let connection = Arc::clone(connection);
let connections = connections.clone();
tokio::spawn(async move {

@ -1,10 +1,13 @@
mod config;
mod connection;
mod connections;
use config::Web3ConnectionConfig;
use futures::future;
use governor::clock::{Clock, QuantaClock};
use serde_json::json;
use std::fmt;
use std::fs;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
@ -13,6 +16,7 @@ use tracing::warn;
use warp::Filter;
use warp::Reply;
use crate::config::RootConfig;
use crate::connection::JsonRpcRequest;
use crate::connections::Web3Connections;
@ -25,7 +29,7 @@ static APP_USER_AGENT: &str = concat!(
/// The application
// TODO: this debug impl is way too verbose. make something smaller
struct Web3ProxyApp {
pub struct Web3ProxyApp {
/// clock used for rate limiting
/// TODO: use tokio's clock (will require a different ratelimiting crate)
clock: QuantaClock,
@ -44,25 +48,11 @@ impl fmt::Debug for Web3ProxyApp {
impl Web3ProxyApp {
async fn try_new(
balanced_rpc_tiers: Vec<Vec<(&str, u32, Option<u32>)>>,
private_rpcs: Vec<(&str, u32, Option<u32>)>,
balanced_rpc_tiers: Vec<Vec<Web3ConnectionConfig>>,
private_rpcs: Vec<Web3ConnectionConfig>,
) -> anyhow::Result<Web3ProxyApp> {
let clock = QuantaClock::default();
let mut rpcs = vec![];
for balanced_rpc_tier in balanced_rpc_tiers.iter() {
for rpc_data in balanced_rpc_tier {
let rpc = rpc_data.0.to_string();
rpcs.push(rpc);
}
}
for rpc_data in private_rpcs.iter() {
let rpc = rpc_data.0.to_string();
rpcs.push(rpc);
}
// make a http shared client
// TODO: how should we configure the connection pool?
// TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
@ -229,51 +219,30 @@ impl Web3ProxyApp {
}
#[tokio::main]
async fn main() {
async fn main() -> anyhow::Result<()> {
// install global collector configured based on RUST_LOG env var.
tracing_subscriber::fmt::init();
// TODO: use flags for the config path
let config = "./data/config/example.toml";
let config: String = fs::read_to_string(config)?;
let config: RootConfig = toml::from_str(&config)?;
// TODO: load the config from yaml instead of hard coding
// TODO: support multiple chains in one process? then we could just point "chain.stytt.com" at this and caddy wouldn't need anything else
// TODO: be smart about about using archive nodes? have a set that doesn't use archive nodes since queries to them are more valuable
let listen_port = 8445;
// TODO: what should this be? 0 will cause a thundering herd
let listen_port = config.config.listen_port;
let state = Web3ProxyApp::try_new(
vec![
// local nodes
vec![
("ws://127.0.0.1:8545", 68_800, None),
("ws://127.0.0.1:8946", 152_138, None),
],
// paid nodes
// TODO: add paid nodes (with rate limits)
// vec![
// // chainstack.com archive
// // moralis free (25/sec rate limit)
// ],
// free nodes
vec![
("https://main-rpc.linkpool.io", 4_779, None), // linkpool is slow and often offline
("https://rpc.ankr.com/eth", 23_967, None),
],
],
vec![
("https://api.edennetwork.io/v1/", 1_805, None),
("https://api.edennetwork.io/v1/beta", 300, None),
("https://rpc.ethermine.org/", 5_861, None),
("https://rpc.flashbots.net", 7074, None),
],
)
.await
.unwrap();
let app = config.try_build().await?;
let state: Arc<Web3ProxyApp> = Arc::new(state);
let app: Arc<Web3ProxyApp> = Arc::new(app);
let proxy_rpc_filter = warp::any()
.and(warp::post())
.and(warp::body::json())
.then(move |json_body| state.clone().proxy_web3_rpc(json_body));
.then(move |json_body| app.clone().proxy_web3_rpc(json_body));
// TODO: filter for displaying connections and their block heights
@ -282,6 +251,8 @@ async fn main() {
let routes = proxy_rpc_filter.map(handle_anyhow_errors);
warp::serve(routes).run(([0, 0, 0, 0], listen_port)).await;
Ok(())
}
/// convert result into an http response. use this at the end of your warp filter