Bryan Stitt 2022-05-13 20:50:11 +00:00
parent 9e167b8289
commit 48b1b08e3d
6 changed files with 178 additions and 244 deletions

View File

@@ -4,6 +4,7 @@ members = [
     "web3-proxy",
 ]
 
-[profile.release]
-lto = true
-panic = "abort"
+# TODO: enable these once rapid development is done
+#[profile.release]
+#lto = true
+#panic = "abort"

View File

@ -1,20 +1,18 @@
[shared] [shared]
chain_id = 1 chain_id = 1
[balanced_rpc_tiers] [balanced_rpcs]
[balanced_rpc_tiers.0] [balanced_rpcs.erigon_archive]
[balanced_rpc_tiers.0.erigon_archive]
url = "ws://127.0.0.1:8549" url = "ws://127.0.0.1:8549"
# TODO: double check soft_limit on erigon # TODO: double check soft_limit on erigon
soft_limit = 100_000 soft_limit = 100_000
[balanced_rpc_tiers.0.geth] [balanced_rpcs.geth]
url = "ws://127.0.0.1:8546" url = "ws://127.0.0.1:8546"
soft_limit = 200_000 soft_limit = 200_000
[balanced_rpc_tiers.0.ankr] [balanced_rpcs.ankr]
url = "https://rpc.ankr.com/eth" url = "https://rpc.ankr.com/eth"
soft_limit = 3_000 soft_limit = 3_000
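Note: the flattened layout drops the numbered-tier nesting, so each RPC is now a direct child of [balanced_rpcs]. A minimal sketch of how the new shape deserializes, with stand-in structs mirroring the RpcConfig change further down; reading the file with the `toml` crate is an assumption:

    use serde::Deserialize;
    use std::collections::HashMap;

    // stand-ins mirroring the RpcConfig shapes in the config.rs hunk below
    #[derive(Deserialize)]
    struct RpcConfig {
        shared: Shared,
        balanced_rpcs: HashMap<String, Rpc>,
    }

    #[derive(Deserialize)]
    struct Shared {
        chain_id: usize,
    }

    #[derive(Deserialize)]
    struct Rpc {
        url: String,
        soft_limit: u32,
    }

    fn main() -> anyhow::Result<()> {
        let config: RpcConfig = toml::from_str(
            r#"
            [shared]
            chain_id = 1

            [balanced_rpcs.geth]
            url = "ws://127.0.0.1:8546"
            soft_limit = 200_000
            "#,
        )?;
        assert_eq!(config.shared.chain_id, 1);
        assert_eq!(config.balanced_rpcs.len(), 1);
        Ok(())
    }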

View File

@@ -7,15 +7,14 @@ use crate::jsonrpc::JsonRpcRequest;
 use crate::jsonrpc::JsonRpcRequestEnum;
 use ethers::prelude::ProviderError;
 use ethers::prelude::{HttpClientError, WsClientError};
-use futures::future;
 use futures::future::join_all;
 use governor::clock::{Clock, QuantaClock};
 use linkedhashmap::LinkedHashMap;
+use parking_lot::RwLock;
 use std::fmt;
-use std::sync::atomic::{self, AtomicU64};
+use std::sync::atomic::AtomicU64;
 use std::sync::Arc;
 use std::time::Duration;
-use tokio::sync::RwLock;
 use tokio::time::sleep;
 use tracing::{trace, warn};
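Note: the cache lock moves from tokio::sync::RwLock to parking_lot::RwLock, so cache reads and writes below no longer need .await. A minimal sketch of the difference, with a stand-in cache type:

    use parking_lot::RwLock;

    fn main() {
        let cache: RwLock<Vec<u64>> = RwLock::new(Vec::new());
        {
            // synchronous: no .await, unlike tokio::sync::RwLock::read().await
            let guard = cache.read();
            let _ = guard.len();
        } // guard dropped here, before any await point could be reached
    }

The tradeoff is that a contended parking_lot guard blocks the OS thread and must not be held across an await point; the new code keeps its critical sections short and drops the write guard explicitly.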
@@ -37,33 +36,27 @@ type ResponseLruCache =
 // TODO: this debug impl is way too verbose. make something smaller
 // TODO: if Web3ProxyApp is always in an Arc, i think we can avoid having at least some of these internal things in arcs
 pub struct Web3ProxyApp {
-    best_head_block_number: Arc<AtomicU64>,
     /// clock used for rate limiting
-    /// TODO: use tokio's clock (will require a different ratelimiting crate)
+    /// TODO: use tokio's clock? (will require a different ratelimiting crate)
     clock: QuantaClock,
     /// Send requests to the best server available
-    balanced_rpc_tiers: Vec<Arc<Web3Connections>>,
+    balanced_rpcs: Arc<Web3Connections>,
     /// Send private requests (like eth_sendRawTransaction) to all these servers
-    private_rpcs: Option<Arc<Web3Connections>>,
+    private_rpcs: Arc<Web3Connections>,
     response_cache: ResponseLruCache,
 }
 
 impl fmt::Debug for Web3ProxyApp {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         // TODO: the default formatter takes forever to write. this is too quiet though
-        f.debug_struct("Web3ProxyApp")
-            .field(
-                "best_head_block_number",
-                &self.best_head_block_number.load(atomic::Ordering::Relaxed),
-            )
-            .finish_non_exhaustive()
+        f.debug_struct("Web3ProxyApp").finish_non_exhaustive()
     }
 }
 
 impl Web3ProxyApp {
     pub async fn try_new(
         chain_id: usize,
-        balanced_rpc_tiers: Vec<Vec<Web3ConnectionConfig>>,
+        balanced_rpcs: Vec<Web3ConnectionConfig>,
         private_rpcs: Vec<Web3ConnectionConfig>,
     ) -> anyhow::Result<Web3ProxyApp> {
         let clock = QuantaClock::default();
@@ -80,44 +73,35 @@ impl Web3ProxyApp {
             .build()?;
 
         // TODO: attach context to this error
-        let balanced_rpc_tiers =
-            future::join_all(balanced_rpc_tiers.into_iter().map(|balanced_rpc_tier| {
-                Web3Connections::try_new(
-                    chain_id,
-                    best_head_block_number.clone(),
-                    balanced_rpc_tier,
-                    Some(http_client.clone()),
-                    &clock,
-                    true,
-                )
-            }))
-            .await
-            .into_iter()
-            .collect::<anyhow::Result<Vec<Arc<Web3Connections>>>>()?;
+        let balanced_rpcs = Web3Connections::try_new(
+            chain_id,
+            best_head_block_number.clone(),
+            balanced_rpcs,
+            Some(http_client.clone()),
+            &clock,
+            true,
+        )
+        .await?;
 
         // TODO: attach context to this error
         let private_rpcs = if private_rpcs.is_empty() {
             warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
-            // TODO: instead of None, set it to a list of all the rpcs from balanced_rpc_tiers. that way we broadcast very loudly
-            None
+            balanced_rpcs.clone()
         } else {
-            Some(
-                Web3Connections::try_new(
-                    chain_id,
-                    best_head_block_number.clone(),
-                    private_rpcs,
-                    Some(http_client),
-                    &clock,
-                    false,
-                )
-                .await?,
-            )
+            Web3Connections::try_new(
+                chain_id,
+                best_head_block_number.clone(),
+                private_rpcs,
+                Some(http_client),
+                &clock,
+                false,
+            )
+            .await?
         };
 
         Ok(Web3ProxyApp {
-            best_head_block_number,
             clock,
-            balanced_rpc_tiers,
+            balanced_rpcs,
             private_rpcs,
             response_cache: Default::default(),
         })
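Note: with private_rpcs no longer an Option, the no-relays case falls back to balanced_rpcs.clone(). Web3Connections sits behind an Arc, so that clone is a reference-count bump rather than a copy of the connection pool; both fields name the same servers. A small illustration with a stand-in pool:

    use std::sync::Arc;

    fn main() {
        let balanced = Arc::new(vec!["erigon_archive", "geth", "ankr"]);
        let private = balanced.clone(); // Arc clone: refcount bump, no deep copy
        assert!(Arc::ptr_eq(&balanced, &private)); // one shared pool, two names
    }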
@@ -180,17 +164,15 @@ impl Web3ProxyApp {
         // TODO: apparently json_body can be a vec of multiple requests. should we split them up? we need to respond with a Vec too
-        if self.private_rpcs.is_some() && request.method == "eth_sendRawTransaction" {
-            let private_rpcs = self.private_rpcs.as_ref().unwrap();
-
+        if request.method == "eth_sendRawTransaction" {
             // there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs
             loop {
                 // TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
-                match private_rpcs.get_upstream_servers() {
+                match self.private_rpcs.get_upstream_servers() {
                     Ok(active_request_handles) => {
                         let (tx, rx) = flume::unbounded();
 
-                        let connections = private_rpcs.clone();
+                        let connections = self.private_rpcs.clone();
                         let method = request.method.clone();
                         let params = request.params.clone();
@@ -221,18 +203,17 @@ impl Web3ProxyApp {
                             return Ok(response);
                         }
                     }
-                    Err(not_until) => {
+                    Err(None) => {
+                        // TODO: return a 502?
+                        return Err(anyhow::anyhow!("no private rpcs!"));
+                    }
+                    Err(Some(not_until)) => {
                         // TODO: move this to a helper function
                         // sleep (TODO: with a lock?) until our rate limits should be available
                         // TODO: if a server catches up sync while we are waiting, we could stop waiting
-                        if let Some(not_until) = not_until {
-                            let deadline = not_until.wait_time_from(self.clock.now());
-
-                            sleep(deadline).await;
-                        } else {
-                            // TODO: what should we do here?
-                            return Err(anyhow::anyhow!("no private rpcs!"));
-                        }
+                        let deadline = not_until.wait_time_from(self.clock.now());
+
+                        sleep(deadline).await;
                     }
                 };
             }
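Note: splitting the single Err(not_until) arm into Err(None) and Err(Some(not_until)) removes the inner if let and makes the "no private rpcs" case an explicit early return. The same shape in miniature, with self-contained stand-in types:

    fn classify(result: Result<u32, Option<&str>>) -> String {
        match result {
            Ok(value) => format!("ok: {value}"),
            Err(None) => "failed, nothing to wait for".to_string(),
            Err(Some(hint)) => format!("failed, retry after {hint}"),
        }
    }

    fn main() {
        assert_eq!(classify(Err(None)), "failed, nothing to wait for");
    }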
@@ -241,181 +222,138 @@ impl Web3ProxyApp {
         // try to send to each tier, stopping at the first success
         // if no tiers are synced, fallback to privates
         loop {
-            // there are multiple tiers. save the earliest not_until (if any). if we don't return, we will sleep until then and then try again
-            let mut earliest_not_until = None;
-
-            // TODO: how can we better build this iterator?
-            let rpc_iter = if let Some(private_rpcs) = self.private_rpcs.as_ref() {
-                self.balanced_rpc_tiers.iter().chain(vec![private_rpcs])
-            } else {
-                self.balanced_rpc_tiers.iter().chain(vec![])
-            };
-
-            for balanced_rpcs in rpc_iter {
-                let best_head_block_number =
-                    self.best_head_block_number.load(atomic::Ordering::Acquire);
-
-                let best_rpc_block_number = balanced_rpcs.head_block_number();
-
-                if best_rpc_block_number < best_head_block_number {
-                    continue;
-                }
-
-                // TODO: building this cache key is slow and its large, but i don't see a better way right now
-                // TODO: inspect the params and see if a block is specified. if so, use that block number instead of current_block
-                let cache_key = (
-                    best_head_block_number,
-                    request.method.clone(),
-                    request.params.clone().map(|x| x.to_string()),
-                );
-
-                if let Some(cached) = self.response_cache.read().await.get(&cache_key) {
-                    // TODO: this still serializes every time
-                    // TODO: return a reference in the other places so that this works without a clone?
-                    return Ok(cached.to_owned());
-                }
-
-                // TODO: what allowed lag?
-                match balanced_rpcs.next_upstream_server().await {
-                    Ok(active_request_handle) => {
-                        let response = active_request_handle
-                            .request(&request.method, &request.params)
-                            .await;
-
-                        let response = match response {
-                            Ok(partial_response) => {
-                                // TODO: trace here was really slow with millions of requests.
-                                // info!("forwarding request from {}", upstream_server);
-
-                                let response = JsonRpcForwardedResponse {
-                                    jsonrpc: "2.0".to_string(),
-                                    id: request.id,
-                                    // TODO: since we only use the result here, should that be all we return from try_send_request?
-                                    result: Some(partial_response),
-                                    error: None,
-                                };
-
-                                // TODO: small race condidition here. parallel requests with the same query will both be saved to the cache
-                                let mut response_cache = self.response_cache.write().await;
-
-                                // TODO: cache the warp::reply to save us serializing every time
-                                response_cache.insert(cache_key, response.clone());
-                                if response_cache.len() >= RESPONSE_CACHE_CAP {
-                                    // TODO: this isn't really an LRU. what is this called? should we make it an lru? these caches only live for one block
-                                    response_cache.pop_front();
-                                }
-
-                                response
-                            }
-                            Err(e) => {
-                                // TODO: move this to a helper function?
-                                let code;
-                                let message: String;
-                                let data;
-
-                                match e {
-                                    ProviderError::JsonRpcClientError(e) => {
-                                        // TODO: we should check what type the provider is rather than trying to downcast both types of errors
-                                        if let Some(e) = e.downcast_ref::<HttpClientError>() {
-                                            match &*e {
-                                                HttpClientError::JsonRpcError(e) => {
-                                                    code = e.code;
-                                                    message = e.message.clone();
-                                                    data = e.data.clone();
-                                                }
-                                                e => {
-                                                    // TODO: improve this
-                                                    code = -32603;
-                                                    message = format!("{}", e);
-                                                    data = None;
-                                                }
-                                            }
-                                        } else if let Some(e) =
-                                            e.downcast_ref::<WsClientError>()
-                                        {
-                                            match &*e {
-                                                WsClientError::JsonRpcError(e) => {
-                                                    code = e.code;
-                                                    message = e.message.clone();
-                                                    data = e.data.clone();
-                                                }
-                                                e => {
-                                                    // TODO: improve this
-                                                    code = -32603;
-                                                    message = format!("{}", e);
-                                                    data = None;
-                                                }
-                                            }
-                                        } else {
-                                            unimplemented!();
-                                        }
-                                    }
-                                    _ => {
-                                        code = -32603;
-                                        message = format!("{}", e);
-                                        data = None;
-                                    }
-                                }
-
-                                JsonRpcForwardedResponse {
-                                    jsonrpc: "2.0".to_string(),
-                                    id: request.id,
-                                    result: None,
-                                    error: Some(JsonRpcErrorData {
-                                        code,
-                                        message,
-                                        data,
-                                    }),
-                                }
-                            }
-                        };
-
-                        if response.error.is_some() {
-                            trace!("Sending error reply: {:?}", response);
-                        } else {
-                            trace!("Sending reply: {:?}", response);
-                        }
-
-                        return Ok(response);
-                    }
-                    Err(None) => {
-                        // TODO: this is too verbose. if there are other servers in other tiers, we use those!
-                        // warn!("No servers in sync!");
-                    }
-                    Err(Some(not_until)) => {
-                        // save the smallest not_until. if nothing succeeds, return an Err with not_until in it
-                        // TODO: helper function for this
-                        if earliest_not_until.is_none() {
-                            earliest_not_until.replace(not_until);
-                        } else {
-                            let earliest_possible =
-                                earliest_not_until.as_ref().unwrap().earliest_possible();
-
-                            let new_earliest_possible = not_until.earliest_possible();
-
-                            if earliest_possible > new_earliest_possible {
-                                earliest_not_until = Some(not_until);
-                            }
-                        }
-                    }
-                }
-            }
-
-            // we haven't returned an Ok
-            // if we did return a rate limit error, sleep and try again
-            if let Some(earliest_not_until) = earliest_not_until {
-                let deadline = earliest_not_until.wait_time_from(self.clock.now());
-
-                // TODO: max wait
-
-                sleep(deadline).await;
-            } else {
-                // TODO: how long should we wait?
-                // TODO: max wait time?
-                warn!("No servers in sync!");
-                // TODO: return a 502?
-                return Err(anyhow::anyhow!("no servers in sync"));
-            };
+            let best_block_number = self.balanced_rpcs.head_block_number();
+
+            // TODO: building this cache key is slow and its large, but i don't see a better way right now
+            // TODO: inspect the params and see if a block is specified. if so, use that block number instead of current_block
+            let cache_key = (
+                best_block_number,
+                request.method.clone(),
+                request.params.clone().map(|x| x.to_string()),
+            );
+
+            if let Some(cached) = self.response_cache.read().get(&cache_key) {
+                // TODO: this still serializes every time
+                // TODO: return a reference in the other places so that this works without a clone?
+                return Ok(cached.to_owned());
+            }
+
+            match self.balanced_rpcs.next_upstream_server().await {
+                Ok(active_request_handle) => {
+                    let response = active_request_handle
+                        .request(&request.method, &request.params)
+                        .await;
+
+                    let response = match response {
+                        Ok(partial_response) => {
+                            // TODO: trace here was really slow with millions of requests.
+                            // info!("forwarding request from {}", upstream_server);
+
+                            let response = JsonRpcForwardedResponse {
+                                jsonrpc: "2.0".to_string(),
+                                id: request.id,
+                                // TODO: since we only use the result here, should that be all we return from try_send_request?
+                                result: Some(partial_response),
+                                error: None,
+                            };
+
+                            // TODO: small race condidition here. parallel requests with the same query will both be saved to the cache
+                            let mut response_cache = self.response_cache.write();
+
+                            // TODO: cache the warp::reply to save us serializing every time
+                            response_cache.insert(cache_key, response.clone());
+                            if response_cache.len() >= RESPONSE_CACHE_CAP {
+                                // TODO: this isn't really an LRU. what is this called? should we make it an lru? these caches only live for one block
+                                response_cache.pop_front();
+                            }
+
+                            drop(response_cache);
+
+                            response
+                        }
+                        Err(e) => {
+                            // TODO: move this to a helper function?
+                            let code;
+                            let message: String;
+                            let data;
+
+                            match e {
+                                ProviderError::JsonRpcClientError(e) => {
+                                    // TODO: we should check what type the provider is rather than trying to downcast both types of errors
+                                    if let Some(e) = e.downcast_ref::<HttpClientError>() {
+                                        match &*e {
+                                            HttpClientError::JsonRpcError(e) => {
+                                                code = e.code;
+                                                message = e.message.clone();
+                                                data = e.data.clone();
+                                            }
+                                            e => {
+                                                // TODO: improve this
+                                                code = -32603;
+                                                message = format!("{}", e);
+                                                data = None;
+                                            }
+                                        }
+                                    } else if let Some(e) = e.downcast_ref::<WsClientError>() {
+                                        match &*e {
+                                            WsClientError::JsonRpcError(e) => {
+                                                code = e.code;
+                                                message = e.message.clone();
+                                                data = e.data.clone();
+                                            }
+                                            e => {
+                                                // TODO: improve this
+                                                code = -32603;
+                                                message = format!("{}", e);
+                                                data = None;
+                                            }
+                                        }
+                                    } else {
+                                        unimplemented!();
+                                    }
+                                }
+                                _ => {
+                                    code = -32603;
+                                    message = format!("{}", e);
+                                    data = None;
+                                }
+                            }
+
+                            JsonRpcForwardedResponse {
+                                jsonrpc: "2.0".to_string(),
+                                id: request.id,
+                                result: None,
+                                error: Some(JsonRpcErrorData {
+                                    code,
+                                    message,
+                                    data,
+                                }),
+                            }
+                        }
+                    };
+
+                    if response.error.is_some() {
+                        trace!("Sending error reply: {:?}", response);
+                    } else {
+                        trace!("Sending reply: {:?}", response);
+                    }
+
+                    return Ok(response);
+                }
+                Err(None) => {
+                    // TODO: this is too verbose. if there are other servers in other tiers, we use those!
+                    warn!("No servers in sync!");
+                    return Err(anyhow::anyhow!("no servers in sync"));
+                }
+                Err(Some(not_until)) => {
+                    // TODO: move this to a helper function
+                    // sleep (TODO: with a lock?) until our rate limits should be available
+                    // TODO: if a server catches up sync while we are waiting, we could stop waiting
+                    let deadline = not_until.wait_time_from(self.clock.now());
+
+                    sleep(deadline).await;
+                }
+            }
         }
     }
 }
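Note: the response cache keyed on (head_block_number, method, params) keeps the same semantics through the refactor: a capped, insertion-ordered map rather than a true LRU, since reads never refresh an entry's position. A minimal sketch of the eviction behavior; insert, get, len, and pop_front are the calls used above, while the LinkedHashMap::new constructor is an assumption:

    use linkedhashmap::LinkedHashMap;

    const RESPONSE_CACHE_CAP: usize = 2; // stand-in for the real cap

    fn main() {
        let mut cache: LinkedHashMap<&str, u64> = LinkedHashMap::new();
        for (key, value) in [("a", 1), ("b", 2), ("c", 3)] {
            cache.insert(key, value);
            if cache.len() >= RESPONSE_CACHE_CAP {
                // evicts the oldest insertion, even if it was read recently
                cache.pop_front();
            }
        }
        assert!(cache.get(&"c").is_some()); // only the newest survives a tiny cap
    }

Because the head block number is part of the key, stale entries also age out naturally as the chain advances.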

View File

@@ -1,7 +1,7 @@
 use argh::FromArgs;
 use governor::clock::QuantaClock;
 use serde::Deserialize;
-use std::collections::{BTreeMap, HashMap};
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use crate::connection::Web3Connection;
@@ -22,8 +22,7 @@ pub struct CliConfig {
 #[derive(Deserialize)]
 pub struct RpcConfig {
     pub shared: RpcSharedConfig,
-    // BTreeMap so that iterating keeps the same order. we want tier 0 before tier 1!
-    pub balanced_rpc_tiers: BTreeMap<String, HashMap<String, Web3ConnectionConfig>>,
+    pub balanced_rpcs: HashMap<String, Web3ConnectionConfig>,
     pub private_rpcs: Option<HashMap<String, Web3ConnectionConfig>>,
 }
@@ -44,11 +43,7 @@ pub struct Web3ConnectionConfig {
 impl RpcConfig {
     /// Create a Web3ProxyApp from config
     pub async fn try_build(self) -> anyhow::Result<Web3ProxyApp> {
-        let balanced_rpc_tiers = self
-            .balanced_rpc_tiers
-            .into_values()
-            .map(|x| x.into_values().collect())
-            .collect();
+        let balanced_rpcs = self.balanced_rpcs.into_values().collect();
 
         let private_rpcs = if let Some(private_rpcs) = self.private_rpcs {
             private_rpcs.into_values().collect()
@@ -56,7 +51,7 @@ impl RpcConfig {
             vec![]
         };
 
-        Web3ProxyApp::try_new(self.shared.chain_id, balanced_rpc_tiers, private_rpcs).await
+        Web3ProxyApp::try_new(self.shared.chain_id, balanced_rpcs, private_rpcs).await
     }
 }
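Note: dropping the tier layer turns a nested collect into a single into_values().collect(). The BTreeMap existed only so iteration visited tier 0 before tier 1; with one flat pool, ordering no longer matters and a HashMap suffices. In miniature, with u32 standing in for Web3ConnectionConfig:

    use std::collections::{BTreeMap, HashMap};

    fn main() {
        // before: ordered tiers of named rpcs -> Vec<Vec<Config>>
        let tiers: BTreeMap<String, HashMap<String, u32>> = BTreeMap::new();
        let _tiered: Vec<Vec<u32>> = tiers
            .into_values()
            .map(|tier| tier.into_values().collect())
            .collect();

        // after: one flat pool of named rpcs -> Vec<Config>
        let rpcs: HashMap<String, u32> = HashMap::new();
        let _flat: Vec<u32> = rpcs.into_values().collect();
    }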

View File

@@ -45,6 +45,7 @@ pub struct Web3Connection {
     /// used for load balancing to the least loaded server
     soft_limit: u32,
     head_block_number: AtomicU64,
+    /// the same clock that is used by the rate limiter
     clock: QuantaClock,
 }
@@ -268,6 +269,7 @@ impl Web3Connection {
     }
 
     pub async fn wait_for_request_handle(self: &Arc<Self>) -> ActiveRequestHandle {
+        // TODO: maximum wait time
         loop {
             match self.try_request_handle() {
                 Ok(pending_request_handle) => return pending_request_handle,
@@ -288,7 +290,7 @@ impl Web3Connection {
         match ratelimiter.check() {
             Ok(_) => {
                 // rate limit succeeded
-                return Ok(ActiveRequestHandle(self.clone()));
+                return Ok(ActiveRequestHandle::new(self.clone()));
             }
             Err(not_until) => {
                 // rate limit failed
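Note: the ratelimiter.check() pattern above comes from the governor crate: Ok means a cell was available, Err carries a NotUntil describing when to retry. A runnable sketch against a direct (in-process) limiter, assuming the governor 0.4-era API this repo uses:

    use governor::clock::{Clock, QuantaClock};
    use governor::{Quota, RateLimiter};
    use std::num::NonZeroU32;

    #[tokio::main]
    async fn main() {
        let clock = QuantaClock::default();
        let limiter =
            RateLimiter::direct_with_clock(Quota::per_second(NonZeroU32::new(10).unwrap()), &clock);

        loop {
            match limiter.check() {
                Ok(_) => break, // a cell was free: hand out the request handle
                Err(not_until) => {
                    // sleep exactly as long as the limiter says, as the callers above do
                    let deadline = not_until.wait_time_from(clock.now());
                    tokio::time::sleep(deadline).await;
                }
            }
        }
    }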

View File

@@ -345,7 +345,7 @@ impl Web3Connections {
     }
 
     /// get all rpc servers that are not rate limited
-    /// even fetches if they aren't in sync. This is useful for broadcasting signed transactions
+    /// returns servers even if they aren't in sync. This is useful for broadcasting signed transactions
     pub fn get_upstream_servers(
         &self,
     ) -> Result<Vec<ActiveRequestHandle>, Option<NotUntil<QuantaInstant>>> {
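Note on the return type: Ok carries one handle per ready server, Err(None) means no server is usable at all, and Err(Some(not_until)) means every server is rate limited until at least that instant. A caller that broadcasts through every handle concurrently might look like this sketch, with stand-in types; join_all is the same combinator app.rs imports:

    use futures::future::join_all;

    // stand-in for pushing one payload through one ActiveRequestHandle
    async fn send_via(server: u32, payload: &str) -> String {
        format!("sent {payload} via server {server}")
    }

    #[tokio::main]
    async fn main() {
        // fan out to every acquired handle at once, as the eth_sendRawTransaction path does
        let handles = vec![1, 2, 3];
        let results = join_all(handles.into_iter().map(|h| send_via(h, "0xsigned_tx"))).await;
        assert_eq!(results.len(), 3);
    }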