web3-proxy/web3-proxy/src/app.rs

use crate::config::Web3ConnectionConfig;
use crate::connections::Web3Connections;
use crate::jsonrpc::JsonRpcErrorData;
use crate::jsonrpc::JsonRpcForwardedResponse;
use crate::jsonrpc::JsonRpcForwardedResponseEnum;
use crate::jsonrpc::JsonRpcRequest;
use crate::jsonrpc::JsonRpcRequestEnum;
use dashmap::DashMap;
use ethers::prelude::{HttpClientError, ProviderError, WsClientError, H256};
use futures::future::join_all;
use governor::clock::{Clock, QuantaClock};
use linkedhashmap::LinkedHashMap;
use parking_lot::RwLock;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tokio::task;
use tokio::time::sleep;
use tracing::{debug, instrument, trace, warn};

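// user agent for the shared reqwest client below. with this crate it expands to something
// like "satoshiandkin/web3-proxy/<version>", with the version taken from CARGO_PKG_VERSION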
static APP_USER_AGENT: &str = concat!(
    "satoshiandkin/",
    env!("CARGO_PKG_NAME"),
    "/",
    env!("CARGO_PKG_VERSION"),
);

// TODO: put this in config? what size should we do?
const RESPONSE_CACHE_CAP: usize = 1024;

/// TODO: these types are probably very bad keys and values. i couldn't get caching of warp::reply::Json to work
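// the key is (head block hash, rpc method, serialized params), so entries naturally stop
// matching once a new head block arrives. see proxy_web3_rpc_request for how it is built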
type CacheKey = (H256, String, Option<String>);
type ResponseLruCache = RwLock<LinkedHashMap<CacheKey, JsonRpcForwardedResponse>>;

/// The application
// TODO: this debug impl is way too verbose. make something smaller
// TODO: if Web3ProxyApp is always in an Arc, i think we can avoid having at least some of these internal things in arcs
pub struct Web3ProxyApp {
    /// clock used for rate limiting
    /// TODO: use tokio's clock? (will require a different ratelimiting crate)
    clock: QuantaClock,
    /// Send requests to the best server available
    balanced_rpcs: Arc<Web3Connections>,
    /// Send private requests (like eth_sendRawTransaction) to all these servers
    private_rpcs: Arc<Web3Connections>,
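    /// requests currently being fulfilled, by cache key. identical requests can subscribe to
    /// the watch channel and wait for the first one to finish instead of querying the backends again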
    active_requests: DashMap<CacheKey, watch::Receiver<bool>>,
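    /// responses cached for the current head block (see CacheKey)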
    response_cache: ResponseLruCache,
}

impl fmt::Debug for Web3ProxyApp {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // TODO: the default formatter takes forever to write. this is too quiet though
        f.debug_struct("Web3ProxyApp").finish_non_exhaustive()
    }
}

impl Web3ProxyApp {
    // #[instrument(name = "try_new_Web3ProxyApp", skip_all)]
    pub async fn try_new(
        chain_id: usize,
        balanced_rpcs: Vec<Web3ConnectionConfig>,
        private_rpcs: Vec<Web3ConnectionConfig>,
    ) -> anyhow::Result<Web3ProxyApp> {
        let clock = QuantaClock::default();

        // make a http shared client
        // TODO: how should we configure the connection pool?
        // TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
        let http_client = reqwest::ClientBuilder::new()
            .connect_timeout(Duration::from_secs(5))
            .timeout(Duration::from_secs(60))
            .user_agent(APP_USER_AGENT)
            .build()?;

        // TODO: attach context to this error
        let balanced_rpcs =
            Web3Connections::try_new(chain_id, balanced_rpcs, Some(http_client.clone()), &clock)
                .await?;

        // TODO: do this separately instead of during try_new
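        // spawn a background task that watches the balanced rpcs for new head blocks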
        {
            let balanced_rpcs = balanced_rpcs.clone();
            task::spawn(async move {
                balanced_rpcs.subscribe_heads().await;
            });
        }

        // TODO: attach context to this error
        let private_rpcs = if private_rpcs.is_empty() {
            warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");

            balanced_rpcs.clone()
        } else {
            Web3Connections::try_new(chain_id, private_rpcs, Some(http_client), &clock).await?
        };

        Ok(Web3ProxyApp {
            clock,
            balanced_rpcs,
            private_rpcs,
            active_requests: Default::default(),
            response_cache: Default::default(),
        })
    }

    /// send the request to the appropriate RPCs
    /// TODO: dry this up
    #[instrument(skip_all)]
    pub async fn proxy_web3_rpc(
        self: Arc<Web3ProxyApp>,
        request: JsonRpcRequestEnum,
    ) -> anyhow::Result<impl warp::Reply> {
        // TODO: i don't always see this in the logs. why?
        debug!("Received request: {:?}", request);

        let response = match request {
            JsonRpcRequestEnum::Single(request) => {
                JsonRpcForwardedResponseEnum::Single(self.proxy_web3_rpc_request(request).await?)
            }
            JsonRpcRequestEnum::Batch(requests) => {
                JsonRpcForwardedResponseEnum::Batch(self.proxy_web3_rpc_requests(requests).await?)
            }
        };

        // TODO: i don't always see this in the logs. why?
        debug!("Forwarding response: {:?}", response);

        Ok(warp::reply::json(&response))
    }

    // #[instrument(skip_all)]
    async fn proxy_web3_rpc_requests(
        self: Arc<Web3ProxyApp>,
        requests: Vec<JsonRpcRequest>,
    ) -> anyhow::Result<Vec<JsonRpcForwardedResponse>> {
        // TODO: we should probably change ethers-rs to support this directly
        // we cut up the request and send to potentially different servers. this could be a problem.
        // if the client needs consistent blocks, they should specify instead of assume batches work on the same block
        // TODO: is spawning here actually slower?
        let num_requests = requests.len();
        let responses = join_all(
            requests
                .into_iter()
                .map(|request| self.clone().proxy_web3_rpc_request(request))
                .collect::<Vec<_>>(),
        )
        .await;

        // TODO: i'm sure this could be done better with iterators
        let mut collected: Vec<JsonRpcForwardedResponse> = Vec::with_capacity(num_requests);
        for response in responses {
            collected.push(response?);
        }

        Ok(collected)
    }

    // #[instrument(skip_all)]
    async fn proxy_web3_rpc_request(
        self: Arc<Web3ProxyApp>,
        request: JsonRpcRequest,
    ) -> anyhow::Result<JsonRpcForwardedResponse> {
        trace!("Received request: {:?}", request);

        if request.method == "eth_sendRawTransaction" {
            // there are private rpcs configured and the request is eth_sendRawTransaction. send to all private rpcs
            // TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
            match self.private_rpcs.get_upstream_servers() {
                Ok(active_request_handles) => {
                    let (tx, rx) = flume::unbounded();

                    let connections = self.private_rpcs.clone();
                    let method = request.method.clone();
                    let params = request.params.clone();

                    // TODO: benchmark this compared to waiting on unbounded futures
                    // TODO: do something with this handle?
                    task::Builder::default()
                        .name("try_send_parallel_requests")
                        .spawn(async move {
                            connections
                                .try_send_parallel_requests(
                                    active_request_handles,
                                    method,
                                    params,
                                    tx,
                                )
                                .await
                        });

                    // wait for the first response
                    // TODO: we don't want the first response. we want the quorum response
                    let backend_response = rx.recv_async().await?;

                    if let Ok(backend_response) = backend_response {
                        // TODO: i think we
                        let response = JsonRpcForwardedResponse {
                            jsonrpc: "2.0".to_string(),
                            id: request.id,
                            result: Some(backend_response),
                            error: None,
                        };
                        return Ok(response);
                    }
                }
                Err(None) => {
                    // TODO: return a 502?
                    return Err(anyhow::anyhow!("no private rpcs!"));
                }
                Err(Some(not_until)) => {
                    // TODO: move this to a helper function
                    // sleep (TODO: with a lock?) until our rate limits should be available
                    // TODO: if a server catches up sync while we are waiting, we could stop waiting
                    let deadline = not_until.wait_time_from(self.clock.now());
                    let deadline = deadline.min(Duration::from_millis(200));

                    sleep(deadline).await;

                    warn!("All rate limits exceeded. Sleeping");
                }
            };
        } else {
            // this is not a private transaction (or no private relays are configured)
            // TODO: how much should we retry?
            for _i in 0..10usize {
                // TODO: think more about this loop.
                // // TODO: add more to this span. and do it properly
                // let span = info_span!("i", ?i);
                // let _enter = span.enter();

                // todo: move getting a cache_key or the result into a helper function. then we could have multiple caches
                // TODO: i think we are maybe getting stuck on this lock. maybe a new block arrives, it tries to write and gets hung up on something. then this can't proceed
                trace!("{:?} waiting for head_block_hash", request);

                let head_block_hash = self.balanced_rpcs.get_head_block_hash();

                trace!("{:?} head_block_hash {}", request, head_block_hash);

                // TODO: building this cache key is slow and its large, but i don't see a better way right now
                // TODO: inspect the params and see if a block is specified. if so, use that block number instead of current_block
                let cache_key = (
                    head_block_hash,
                    request.method.clone(),
                    request.params.clone().map(|x| x.to_string()),
                );

                // first check to see if this is cached
                if let Some(cached) = self.response_cache.read().get(&cache_key) {
                    let _ = self.active_requests.remove(&cache_key);

                    // TODO: emit a stat
                    trace!("{:?} cache hit!", request);

                    return Ok(cached.to_owned());
                } else {
                    trace!("{:?} cache miss!", request);
                }

                // check if this request is already in flight
                let (in_flight_tx, in_flight_rx) = watch::channel(true);
                let mut other_in_flight_rx = None;
                match self.active_requests.entry(cache_key.clone()) {
                    dashmap::mapref::entry::Entry::Occupied(entry) => {
                        other_in_flight_rx = Some(entry.get().clone());
                    }
                    dashmap::mapref::entry::Entry::Vacant(entry) => {
                        entry.insert(in_flight_rx);
                    }
                }

                if let Some(mut other_in_flight_rx) = other_in_flight_rx {
                    // wait for the other request to finish. it can finish successfully or with an error
                    trace!("{:?} waiting on in-flight request", request);

                    let _ = other_in_flight_rx.changed().await;

                    // now that we've waited, lets check the cache again
                    if let Some(cached) = self.response_cache.read().get(&cache_key) {
                        let _ = self.active_requests.remove(&cache_key);
                        let _ = in_flight_tx.send(false);

                        // TODO: emit a stat
                        trace!(
                            "{:?} cache hit after waiting for in-flight request!",
                            request
                        );

                        return Ok(cached.to_owned());
                    } else {
                        // TODO: emit a stat
                        trace!(
                            "{:?} cache miss after waiting for in-flight request!",
                            request
                        );
                    }
                }
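                // not cached and not already in flight, so forward to the best synced upstream server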
                match self.balanced_rpcs.next_upstream_server().await {
                    Ok(active_request_handle) => {
                        let response = active_request_handle
                            .request(&request.method, &request.params)
                            .await;

                        let response = match response {
                            Ok(partial_response) => {
                                // TODO: trace here was really slow with millions of requests.
                                // trace!("forwarding request from {}", upstream_server);

                                let response = JsonRpcForwardedResponse {
                                    jsonrpc: "2.0".to_string(),
                                    id: request.id,
                                    // TODO: since we only use the result here, should that be all we return from try_send_request?
                                    result: Some(partial_response),
                                    error: None,
                                };

                                // TODO: small race condition here. parallel requests with the same query will both be saved to the cache
                                let mut response_cache = self.response_cache.write();

                                // TODO: cache the warp::reply to save us serializing every time
                                response_cache.insert(cache_key.clone(), response.clone());
                                if response_cache.len() >= RESPONSE_CACHE_CAP {
                                    // TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? these caches only live for one block
                                    response_cache.pop_front();
                                }

                                drop(response_cache);

                                // TODO: needing to remove manually here makes me think we should do this differently
                                let _ = self.active_requests.remove(&cache_key);
                                let _ = in_flight_tx.send(false);

                                response
                            }
                            Err(e) => {
                                // send now since we aren't going to cache an error response
                                let _ = in_flight_tx.send(false);

                                // TODO: move this to a helper function?
                                let code;
                                let message: String;
                                let data;
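                                // map backend errors onto a JSON-RPC error object. -32603 is the
                                // generic "internal error" code from the JSON-RPC 2.0 spec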
                                match e {
                                    ProviderError::JsonRpcClientError(e) => {
                                        // TODO: we should check what type the provider is rather than trying to downcast both types of errors
                                        if let Some(e) = e.downcast_ref::<HttpClientError>() {
                                            match &*e {
                                                HttpClientError::JsonRpcError(e) => {
                                                    code = e.code;
                                                    message = e.message.clone();
                                                    data = e.data.clone();
                                                }
                                                e => {
                                                    // TODO: improve this
                                                    code = -32603;
                                                    message = format!("{}", e);
                                                    data = None;
                                                }
                                            }
                                        } else if let Some(e) = e.downcast_ref::<WsClientError>() {
                                            match &*e {
                                                WsClientError::JsonRpcError(e) => {
                                                    code = e.code;
                                                    message = e.message.clone();
                                                    data = e.data.clone();
                                                }
                                                e => {
                                                    // TODO: improve this
                                                    code = -32603;
                                                    message = format!("{}", e);
                                                    data = None;
                                                }
                                            }
                                        } else {
                                            unimplemented!();
                                        }
                                    }
                                    _ => {
                                        code = -32603;
                                        message = format!("{}", e);
                                        data = None;
                                    }
                                }

                                JsonRpcForwardedResponse {
                                    jsonrpc: "2.0".to_string(),
                                    id: request.id,
                                    result: None,
                                    error: Some(JsonRpcErrorData {
                                        code,
                                        message,
                                        data,
                                    }),
                                }
                            }
                        };

                        // TODO: needing to remove manually here makes me think we should do this differently
                        let _ = self.active_requests.remove(&cache_key);

                        if response.error.is_some() {
                            trace!("Sending error reply: {:?}", response);

                            // errors already sent false to the in_flight_tx
                        } else {
                            trace!("Sending reply: {:?}", response);

                            let _ = in_flight_tx.send(false);
                        }

                        return Ok(response);
                    }
                    Err(None) => {
                        // TODO: this is too verbose. if there are other servers in other tiers, we use those!
                        warn!("No servers in sync!");

                        // TODO: needing to remove manually here makes me think we should do this differently
                        let _ = self.active_requests.remove(&cache_key);
                        let _ = in_flight_tx.send(false);

                        return Err(anyhow::anyhow!("no servers in sync"));
                    }
                    Err(Some(not_until)) => {
                        // TODO: move this to a helper function
                        // sleep (TODO: with a lock?) until our rate limits should be available
                        // TODO: if a server catches up sync while we are waiting, we could stop waiting
                        let deadline = not_until.wait_time_from(self.clock.now());
                        let deadline = deadline.min(Duration::from_millis(200));

                        sleep(deadline).await;

                        warn!("All rate limits exceeded. Sleeping");

                        // TODO: needing to remove manually here makes me think we should do this differently
                        let _ = self.active_requests.remove(&cache_key);
                        let _ = in_flight_tx.send(false);

                        continue;
                    }
                }
            }
        }

        Err(anyhow::anyhow!("internal error"))
    }
}