web3-proxy/src/main.rs

526 lines
20 KiB
Rust
Raw Normal View History

2022-04-26 23:33:37 +03:00
mod block_watcher;
mod provider;
2022-04-27 02:53:58 +03:00
mod provider_tiers;
2022-04-26 09:54:24 +03:00
2022-04-25 00:54:29 +03:00
use futures::future;
2022-04-29 04:57:16 +03:00
use futures::stream::FuturesUnordered;
use futures::StreamExt;
2022-04-27 02:53:58 +03:00
use governor::clock::{Clock, QuantaClock};
2022-04-29 19:03:11 +03:00
use serde::{Deserialize, Serialize};
use serde_json::json;
2022-04-29 19:03:11 +03:00
use serde_json::value::RawValue;
2022-04-28 03:08:30 +03:00
use std::collections::HashMap;
use std::fmt;
2022-03-05 06:46:57 +03:00
use std::sync::Arc;
2022-04-25 22:14:10 +03:00
use std::time::Duration;
2022-04-28 03:08:30 +03:00
use tokio::sync::{mpsc, watch, RwLock};
2022-04-25 04:12:07 +03:00
use tokio::time::sleep;
use tracing::warn;
2022-03-05 06:46:57 +03:00
use warp::Filter;
2022-04-26 23:33:37 +03:00
// use crate::types::{BlockMap, ConnectionsMap, RpcRateLimiterMap};
2022-04-27 02:53:58 +03:00
use crate::block_watcher::BlockWatcher;
use crate::provider_tiers::{Web3ConnectionMap, Web3ProviderTier};
2022-04-26 23:33:37 +03:00
2022-04-26 09:54:24 +03:00
static APP_USER_AGENT: &str = concat!(
"satoshiandkin/",
env!("CARGO_PKG_NAME"),
"/",
env!("CARGO_PKG_VERSION"),
);
2022-04-29 19:03:11 +03:00
#[derive(Clone, Deserialize)]
struct JsonRpcRequest {
jsonrpc: Box<RawValue>,
id: Box<RawValue>,
method: String,
params: Box<RawValue>,
}
#[derive(Clone, Serialize)]
struct JsonRpcForwardedResponse {
jsonrpc: Box<RawValue>,
id: Box<RawValue>,
result: Box<RawValue>,
}
2022-04-26 23:33:37 +03:00
/// The application
2022-04-28 22:30:22 +03:00
// TODO: this debug impl is way too verbose. make something smaller
2022-04-26 23:33:37 +03:00
struct Web3ProxyApp {
2022-04-26 20:03:38 +03:00
/// clock used for rate limiting
/// TODO: use tokio's clock (will require a different ratelimiting crate)
2022-04-25 04:12:07 +03:00
clock: QuantaClock,
2022-04-26 20:03:38 +03:00
/// Send requests to the best server available
2022-04-27 02:53:58 +03:00
balanced_rpc_tiers: Arc<Vec<Web3ProviderTier>>,
2022-04-26 20:03:38 +03:00
/// Send private requests (like eth_sendRawTransaction) to all these servers
2022-04-27 02:53:58 +03:00
private_rpcs: Option<Arc<Web3ProviderTier>>,
2022-04-26 09:54:24 +03:00
/// write lock on these when all rate limits are hit
2022-04-28 03:08:30 +03:00
/// this lock will be held open over an await, so use async locking
2022-04-25 04:26:23 +03:00
balanced_rpc_ratelimiter_lock: RwLock<()>,
2022-04-28 03:08:30 +03:00
/// this lock will be held open over an await, so use async locking
2022-04-25 04:26:23 +03:00
private_rpcs_ratelimiter_lock: RwLock<()>,
2022-04-24 10:26:00 +03:00
}
2022-03-05 06:46:57 +03:00
impl fmt::Debug for Web3ProxyApp {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
write!(f, "Web3ProxyApp(...)")
}
}
2022-04-26 23:33:37 +03:00
impl Web3ProxyApp {
2022-04-26 09:54:24 +03:00
async fn try_new(
2022-04-28 03:08:30 +03:00
allowed_lag: u64,
2022-04-25 01:36:51 +03:00
balanced_rpc_tiers: Vec<Vec<(&str, u32)>>,
private_rpcs: Vec<(&str, u32)>,
2022-04-26 23:33:37 +03:00
) -> anyhow::Result<Web3ProxyApp> {
2022-04-25 04:12:07 +03:00
let clock = QuantaClock::default();
2022-04-29 01:40:29 +03:00
let mut rpcs = vec![];
for balanced_rpc_tier in balanced_rpc_tiers.iter() {
for rpc_data in balanced_rpc_tier {
let rpc = rpc_data.0.to_string();
rpcs.push(rpc);
}
}
for rpc_data in private_rpcs.iter() {
let rpc = rpc_data.0.to_string();
rpcs.push(rpc);
}
let block_watcher = Arc::new(BlockWatcher::new(rpcs));
2022-04-26 23:33:37 +03:00
// make a http shared client
// TODO: how should we configure the connection pool?
2022-04-27 09:14:35 +03:00
// TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
2022-04-26 09:54:24 +03:00
let http_client = reqwest::ClientBuilder::new()
2022-04-29 04:57:16 +03:00
.connect_timeout(Duration::from_secs(5))
2022-04-26 09:54:24 +03:00
.timeout(Duration::from_secs(300))
.user_agent(APP_USER_AGENT)
.build()?;
let balanced_rpc_tiers = Arc::new(
future::join_all(balanced_rpc_tiers.into_iter().map(|balanced_rpc_tier| {
2022-04-27 02:53:58 +03:00
Web3ProviderTier::try_new(
2022-04-26 09:54:24 +03:00
balanced_rpc_tier,
Some(http_client.clone()),
2022-04-27 04:25:01 +03:00
block_watcher.clone(),
2022-04-26 09:54:24 +03:00
&clock,
)
}))
.await
2022-04-25 22:14:10 +03:00
.into_iter()
2022-04-27 02:53:58 +03:00
.collect::<anyhow::Result<Vec<Web3ProviderTier>>>()?,
2022-04-26 09:54:24 +03:00
);
2022-04-25 22:14:10 +03:00
2022-04-26 09:54:24 +03:00
let private_rpcs = if private_rpcs.is_empty() {
2022-04-27 09:14:35 +03:00
warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
2022-04-29 04:57:16 +03:00
// TODO: instead of None, set it to a list of all the rpcs from balanced_rpc_tiers. that way we broadcast very loudly
2022-04-26 09:54:24 +03:00
None
} else {
Some(Arc::new(
2022-04-27 02:53:58 +03:00
Web3ProviderTier::try_new(
2022-04-26 23:33:37 +03:00
private_rpcs,
Some(http_client),
2022-04-27 04:25:01 +03:00
block_watcher.clone(),
2022-04-26 23:33:37 +03:00
&clock,
)
.await?,
2022-04-26 09:54:24 +03:00
))
};
2022-04-25 22:14:10 +03:00
2022-04-29 01:40:29 +03:00
let (new_block_sender, mut new_block_receiver) = watch::channel::<String>("".to_string());
{
// TODO: spawn this later?
// spawn a future for the block_watcher
let block_watcher = block_watcher.clone();
tokio::spawn(async move { block_watcher.run(new_block_sender).await });
}
2022-04-28 03:08:30 +03:00
{
// spawn a future for sorting our synced rpcs
// TODO: spawn this later?
let balanced_rpc_tiers = balanced_rpc_tiers.clone();
let private_rpcs = private_rpcs.clone();
let block_watcher = block_watcher.clone();
tokio::spawn(async move {
let mut tier_map = HashMap::new();
let mut private_map = HashMap::new();
for balanced_rpc_tier in balanced_rpc_tiers.iter() {
for rpc in balanced_rpc_tier.clone_rpcs() {
tier_map.insert(rpc, balanced_rpc_tier);
}
}
if let Some(private_rpcs) = private_rpcs {
for rpc in private_rpcs.clone_rpcs() {
private_map.insert(rpc, private_rpcs.clone());
}
}
while new_block_receiver.changed().await.is_ok() {
let updated_rpc = new_block_receiver.borrow().clone();
if let Some(tier) = tier_map.get(&updated_rpc) {
tier.update_synced_rpcs(block_watcher.clone(), allowed_lag)
.unwrap();
} else if let Some(tier) = private_map.get(&updated_rpc) {
tier.update_synced_rpcs(block_watcher.clone(), allowed_lag)
.unwrap();
} else {
panic!("howd this happen");
}
}
});
}
2022-04-26 23:33:37 +03:00
Ok(Web3ProxyApp {
2022-04-25 22:14:10 +03:00
clock,
balanced_rpc_tiers,
private_rpcs,
2022-04-25 04:26:23 +03:00
balanced_rpc_ratelimiter_lock: Default::default(),
private_rpcs_ratelimiter_lock: Default::default(),
2022-04-26 09:54:24 +03:00
})
2022-04-24 10:26:00 +03:00
}
2022-03-05 06:46:57 +03:00
2022-04-24 10:26:00 +03:00
/// send the request to the approriate RPCs
2022-04-26 09:54:24 +03:00
/// TODO: dry this up
2022-04-24 10:26:00 +03:00
async fn proxy_web3_rpc(
2022-04-26 23:33:37 +03:00
self: Arc<Web3ProxyApp>,
2022-04-29 19:03:11 +03:00
json_body: JsonRpcRequest,
2022-04-24 10:26:00 +03:00
) -> anyhow::Result<impl warp::Reply> {
2022-04-29 19:03:11 +03:00
if self.private_rpcs.is_some() && json_body.method == "eth_sendRawTransaction" {
2022-04-26 09:54:24 +03:00
let private_rpcs = self.private_rpcs.clone().unwrap();
2022-04-25 04:14:34 +03:00
// there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs
2022-04-25 04:12:07 +03:00
loop {
2022-04-25 04:26:23 +03:00
let read_lock = self.private_rpcs_ratelimiter_lock.read().await;
let json_body_clone = json_body.clone();
2022-04-28 03:08:30 +03:00
match private_rpcs.get_upstream_servers().await {
2022-04-25 04:12:07 +03:00
Ok(upstream_servers) => {
2022-04-26 10:10:13 +03:00
let (tx, mut rx) =
mpsc::unbounded_channel::<anyhow::Result<serde_json::Value>>();
2022-04-26 09:54:24 +03:00
let clone = self.clone();
2022-04-27 02:53:58 +03:00
let connections = private_rpcs.clone_connections();
// check incoming_id before sending any requests
2022-04-29 19:03:11 +03:00
let incoming_id = &*json_body.id;
2022-04-26 09:54:24 +03:00
tokio::spawn(async move {
clone
.try_send_requests(
upstream_servers,
connections,
json_body_clone,
tx,
)
2022-04-26 09:54:24 +03:00
.await
});
let response = rx
.recv()
2022-04-25 04:12:07 +03:00
.await
2022-04-26 10:10:13 +03:00
.ok_or_else(|| anyhow::anyhow!("no successful response"))?;
2022-04-26 09:54:24 +03:00
if let Ok(partial_response) = response {
let response = json!({
"jsonrpc": "2.0",
"id": incoming_id,
"result": partial_response
});
2022-04-26 10:10:13 +03:00
return Ok(warp::reply::json(&response));
}
2022-04-25 04:12:07 +03:00
}
Err(not_until) => {
2022-04-25 22:14:10 +03:00
// TODO: move this to a helper function
2022-04-25 04:30:55 +03:00
// sleep (with a lock) until our rate limits should be available
2022-04-25 04:26:23 +03:00
drop(read_lock);
2022-04-28 22:30:22 +03:00
if let Some(not_until) = not_until {
let write_lock = self.balanced_rpc_ratelimiter_lock.write().await;
2022-04-25 04:26:23 +03:00
2022-04-28 22:30:22 +03:00
let deadline = not_until.wait_time_from(self.clock.now());
2022-04-25 04:26:23 +03:00
2022-04-28 22:30:22 +03:00
sleep(deadline).await;
drop(write_lock);
}
2022-04-25 04:12:07 +03:00
}
};
2022-04-24 10:26:00 +03:00
}
} else {
// this is not a private transaction (or no private relays are configured)
2022-04-26 10:10:13 +03:00
// try to send to each tier, stopping at the first success
2022-04-25 04:12:07 +03:00
loop {
2022-04-25 04:26:23 +03:00
let read_lock = self.balanced_rpc_ratelimiter_lock.read().await;
// there are multiple tiers. save the earliest not_until (if any). if we don't return, we will sleep until then and then try again
2022-04-25 04:12:07 +03:00
let mut earliest_not_until = None;
// check incoming_id before sending any requests
2022-04-29 19:03:11 +03:00
let incoming_id = &*json_body.id;
2022-04-25 04:12:07 +03:00
for balanced_rpcs in self.balanced_rpc_tiers.iter() {
2022-04-28 03:08:30 +03:00
// TODO: what allowed lag?
match balanced_rpcs.next_upstream_server().await {
2022-04-25 04:12:07 +03:00
Ok(upstream_server) => {
// TODO: better type for this. right now its request (the full jsonrpc object), response (just the inner result)
2022-04-26 10:10:13 +03:00
let (tx, mut rx) =
mpsc::unbounded_channel::<anyhow::Result<serde_json::Value>>();
2022-04-26 09:54:24 +03:00
{
// clone things so we can move them into the future and still use them here
let clone = self.clone();
let connections = balanced_rpcs.clone_connections();
let json_body = json_body.clone();
let upstream_server = upstream_server.clone();
tokio::spawn(async move {
clone
.try_send_requests(
vec![upstream_server],
connections,
json_body,
tx,
)
.await
});
}
2022-04-26 09:54:24 +03:00
let response = rx
.recv()
2022-04-25 04:12:07 +03:00
.await
2022-04-26 10:10:13 +03:00
.ok_or_else(|| anyhow::anyhow!("no successful response"))?;
2022-04-26 09:54:24 +03:00
2022-04-28 22:30:22 +03:00
let response = match response {
Ok(partial_response) => {
// TODO: trace
// info!("forwarding request from {}", upstream_server);
2022-04-28 22:30:22 +03:00
json!({
"jsonrpc": "2.0",
"id": incoming_id,
"result": partial_response
})
}
Err(e) => {
// TODO: what is the proper format for an error?
// TODO: use e
json!({
"jsonrpc": "2.0",
"id": incoming_id,
"error": format!("{}", e)
})
}
};
return Ok(warp::reply::json(&response));
2022-04-25 04:12:07 +03:00
}
2022-04-28 22:30:22 +03:00
Err(None) => {
warn!("No servers in sync!");
}
Err(Some(not_until)) => {
2022-04-25 04:12:07 +03:00
// save the smallest not_until. if nothing succeeds, return an Err with not_until in it
if earliest_not_until.is_none() {
2022-04-28 22:30:22 +03:00
earliest_not_until.replace(not_until);
2022-04-25 04:12:07 +03:00
} else {
let earliest_possible =
earliest_not_until.as_ref().unwrap().earliest_possible();
2022-04-28 22:30:22 +03:00
2022-04-25 04:12:07 +03:00
let new_earliest_possible = not_until.earliest_possible();
if earliest_possible > new_earliest_possible {
earliest_not_until = Some(not_until);
}
}
}
2022-04-24 10:26:00 +03:00
}
}
2022-04-25 04:12:07 +03:00
2022-04-25 04:30:55 +03:00
// we haven't returned an Ok, sleep and try again
2022-04-25 22:14:10 +03:00
// TODO: move this to a helper function
2022-04-25 04:26:23 +03:00
drop(read_lock);
2022-04-25 04:12:07 +03:00
// unwrap should be safe since we would have returned if it wasn't set
2022-04-28 22:30:22 +03:00
if let Some(earliest_not_until) = earliest_not_until {
let write_lock = self.balanced_rpc_ratelimiter_lock.write().await;
let deadline = earliest_not_until.wait_time_from(self.clock.now());
2022-04-25 04:30:55 +03:00
2022-04-28 22:30:22 +03:00
sleep(deadline).await;
2022-04-25 04:26:23 +03:00
2022-04-28 22:30:22 +03:00
drop(write_lock);
} else {
// TODO: how long should we wait?
// TODO: max wait time?
sleep(Duration::from_millis(500)).await;
};
2022-04-24 10:26:00 +03:00
}
}
}
2022-03-05 08:01:45 +03:00
2022-04-24 10:26:00 +03:00
async fn try_send_requests(
&self,
2022-04-26 09:54:24 +03:00
rpc_servers: Vec<String>,
2022-04-27 02:53:58 +03:00
connections: Arc<Web3ConnectionMap>,
2022-04-29 19:03:11 +03:00
json_request_body: JsonRpcRequest,
// TODO: better type for this
2022-04-26 10:10:13 +03:00
tx: mpsc::UnboundedSender<anyhow::Result<serde_json::Value>>,
2022-04-26 09:54:24 +03:00
) -> anyhow::Result<()> {
// {"jsonrpc":"2.0","method":"eth_syncing","params":[],"id":1}
2022-04-29 19:03:11 +03:00
let method = json_request_body.method.clone();
let params = json_request_body.params;
2022-04-26 09:54:24 +03:00
2022-04-29 05:22:54 +03:00
if rpc_servers.len() == 1 {
let rpc = rpc_servers.first().unwrap();
2022-04-29 04:57:16 +03:00
2022-04-29 05:22:54 +03:00
let provider = connections.get(rpc).unwrap().clone_provider();
2022-04-29 04:57:16 +03:00
2022-04-29 05:22:54 +03:00
let response = provider.request(&method, params).await;
2022-04-24 21:56:46 +03:00
2022-04-29 05:22:54 +03:00
connections.get(rpc).unwrap().dec_active_requests();
2022-04-24 21:56:46 +03:00
2022-04-29 05:22:54 +03:00
tx.send(response.map_err(Into::into))?;
2022-04-26 20:03:38 +03:00
2022-04-29 05:22:54 +03:00
Ok(())
} else {
// TODO: lets just use a usize index or something
let method = Arc::new(method);
2022-04-24 22:55:13 +03:00
2022-04-29 05:22:54 +03:00
let mut unordered_futures = FuturesUnordered::new();
2022-04-24 22:55:13 +03:00
2022-04-29 05:22:54 +03:00
for rpc in rpc_servers {
let connections = connections.clone();
let method = method.clone();
let params = params.clone();
let tx = tx.clone();
2022-04-27 09:14:35 +03:00
2022-04-29 05:22:54 +03:00
let handle = tokio::spawn(async move {
// get the client for this rpc server
let provider = connections.get(&rpc).unwrap().clone_provider();
2022-04-26 09:54:24 +03:00
2022-04-29 05:22:54 +03:00
let response = provider.request(&method, params).await;
2022-04-29 04:57:16 +03:00
2022-04-29 05:22:54 +03:00
connections.get(&rpc).unwrap().dec_active_requests();
2022-04-26 09:54:24 +03:00
2022-04-29 05:22:54 +03:00
let response = response?;
// TODO: if "no block with that header" or some other jsonrpc errors, skip this response
// send the first good response to a one shot channel. that way we respond quickly
// drop the result because errors are expected after the first send
let _ = tx.send(Ok(response));
Ok::<(), anyhow::Error>(())
});
unordered_futures.push(handle);
2022-04-24 10:26:00 +03:00
}
2022-04-29 05:22:54 +03:00
// TODO: use iterators instead of pushing into a vec
let mut errs = vec![];
if let Some(x) = unordered_futures.next().await {
match x.unwrap() {
Ok(_) => {}
Err(e) => {
// TODO: better errors
warn!("Got an error sending request: {}", e);
errs.push(e);
}
}
}
2022-04-26 10:10:13 +03:00
2022-04-29 05:22:54 +03:00
// get the first error (if any)
let e: anyhow::Result<serde_json::Value> = if !errs.is_empty() {
Err(errs.pop().unwrap())
} else {
Err(anyhow::anyhow!("no successful responses"))
};
// send the error to the channel
if tx.send(e).is_ok() {
// if we were able to send an error, then we never sent a success
return Err(anyhow::anyhow!("no successful responses"));
} else {
// if sending the error failed. the other side must be closed (which means we sent a success earlier)
Ok(())
}
2022-04-24 10:26:00 +03:00
}
}
2022-03-05 06:46:57 +03:00
}
2022-04-24 10:26:00 +03:00
#[tokio::main]
async fn main() {
2022-04-26 10:16:16 +03:00
// install global collector configured based on RUST_LOG env var.
tracing_subscriber::fmt::init();
2022-04-24 10:26:00 +03:00
// TODO: load the config from yaml instead of hard coding
2022-04-27 09:14:35 +03:00
// TODO: support multiple chains in one process? then we could just point "chain.stytt.com" at this and caddy wouldn't need anything else
// TODO: be smart about about using archive nodes? have a set that doesn't use archive nodes since queries to them are more valuable
2022-04-24 10:26:00 +03:00
let listen_port = 8445;
2022-04-29 04:57:16 +03:00
// TODO: what should this be? 0 will cause a thundering herd
let allowed_lag = 0;
2022-04-25 04:12:07 +03:00
2022-04-26 23:33:37 +03:00
let state = Web3ProxyApp::try_new(
2022-04-29 04:57:16 +03:00
allowed_lag,
2022-04-24 10:26:00 +03:00
vec![
// local nodes
2022-04-25 23:26:54 +03:00
vec![("ws://10.11.12.16:8545", 0), ("ws://10.11.12.16:8946", 0)],
2022-04-24 10:26:00 +03:00
// paid nodes
2022-04-24 21:56:46 +03:00
// TODO: add paid nodes (with rate limits)
2022-04-28 22:30:22 +03:00
// vec![
// // chainstack.com archive
// (
// "wss://ws-nd-373-761-850.p2pify.com/106d73af4cebc487df5ba92f1ad8dee7",
// 0,
// ),
// ],
2022-04-24 10:26:00 +03:00
// free nodes
2022-04-28 22:30:22 +03:00
// vec![
// // ("https://main-rpc.linkpool.io", 0), // linkpool is slow and often offline
// ("https://rpc.ankr.com/eth", 0),
// ],
2022-04-24 21:56:46 +03:00
],
vec![
2022-04-28 22:30:22 +03:00
// ("https://api.edennetwork.io/v1/beta", 0),
// ("https://api.edennetwork.io/v1/", 0),
2022-04-24 10:26:00 +03:00
],
2022-04-26 09:54:24 +03:00
)
.await
.unwrap();
2022-04-24 10:26:00 +03:00
2022-04-26 23:33:37 +03:00
let state: Arc<Web3ProxyApp> = Arc::new(state);
2022-04-24 10:26:00 +03:00
let proxy_rpc_filter = warp::any()
.and(warp::post())
.and(warp::body::json())
2022-04-29 04:57:16 +03:00
.then(move |json_body| state.clone().proxy_web3_rpc(json_body));
2022-04-24 10:26:00 +03:00
2022-04-29 04:57:16 +03:00
// TODO: filter for displaying connections and their block heights
2022-04-28 22:30:22 +03:00
// TODO: warp trace is super verbose. how do we make this more readable?
// let routes = proxy_rpc_filter.with(warp::trace::request());
2022-04-29 04:57:16 +03:00
let routes = proxy_rpc_filter.map(handle_anyhow_errors);
2022-04-28 22:30:22 +03:00
warp::serve(routes).run(([0, 0, 0, 0], listen_port)).await;
2022-03-05 06:46:57 +03:00
}
2022-03-25 00:08:40 +03:00
/// convert result into an http response. use this at the end of your warp filter
2022-03-05 06:46:57 +03:00
pub fn handle_anyhow_errors<T: warp::Reply>(res: anyhow::Result<T>) -> Box<dyn warp::Reply> {
match res {
Ok(r) => Box::new(r.into_response()),
Err(e) => Box::new(warp::reply::with_status(
format!("{}", e),
reqwest::StatusCode::INTERNAL_SERVER_ERROR,
)),
}
}