// web3-proxy/web3-proxy/src/app.rs

use axum::extract::ws::Message;
use dashmap::mapref::entry::Entry as DashMapEntry;
use dashmap::DashMap;
use ethers::prelude::Transaction;
use ethers::prelude::{Block, TxHash, H256};
use futures::future::Abortable;
use futures::future::{join_all, AbortHandle};
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use linkedhashmap::LinkedHashMap;
use parking_lot::RwLock;
use serde_json::json;
use std::fmt;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, watch};
use tokio::task::JoinHandle;
use tokio::time::timeout;
use tokio_stream::wrappers::WatchStream;
use tracing::{debug, info, info_span, instrument, trace, warn, Instrument};

use crate::config::Web3ConnectionConfig;
use crate::connections::Web3Connections;
use crate::jsonrpc::JsonRpcForwardedResponse;
use crate::jsonrpc::JsonRpcForwardedResponseEnum;
use crate::jsonrpc::JsonRpcRequest;
use crate::jsonrpc::JsonRpcRequestEnum;

static APP_USER_AGENT: &str = concat!(
    "satoshiandkin/",
    env!("CARGO_PKG_NAME"),
    "/",
    env!("CARGO_PKG_VERSION"),
);

// TODO: put this in config? what size should we do?
const RESPONSE_CACHE_CAP: usize = 1024;

/// TODO: these types are probably very bad keys and values. i couldn't get caching of warp::reply::Json to work
type CacheKey = (Option<H256>, String, Option<String>);

type ResponseLrcCache = RwLock<LinkedHashMap<CacheKey, JsonRpcForwardedResponse>>;

type ActiveRequestsMap = DashMap<CacheKey, watch::Receiver<bool>>;

pub type AnyhowJoinHandle<T> = JoinHandle<anyhow::Result<T>>;

pub async fn flatten_handle<T>(handle: AnyhowJoinHandle<T>) -> anyhow::Result<T> {
    match handle.await {
        Ok(Ok(result)) => Ok(result),
        Ok(Err(err)) => Err(err),
        Err(err) => Err(err.into()),
    }
}
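
// Usage sketch (hypothetical caller, not part of this file): `flatten_handle` collapses the
// nested `Result<anyhow::Result<T>, JoinError>` that awaiting a `JoinHandle` yields into a
// single `anyhow::Result<T>`, so spawned tasks compose cleanly with `tokio::try_join!`:
//
//     let (app, app_handle) = Web3ProxyApp::spawn(chain_id, None, balanced, private).await?;
//     tokio::try_join!(flatten_handle(app_handle), flatten_handle(frontend_handle))?;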

#[derive(Clone)]
pub enum TxState {
    Known(TxHash),
    Pending(Transaction),
    Confirmed(Transaction),
    Orphaned(Transaction),
}
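
// How the rest of this file uses these variants (see `eth_subscribe` below): pending-transaction
// subscriptions skip `Known` and `Confirmed`, and forward the transaction carried by `Pending` or
// `Orphaned` to subscribers.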

/// The application
// TODO: this debug impl is way too verbose. make something smaller
// TODO: if Web3ProxyApp is always in an Arc, i think we can avoid having at least some of these internal things in arcs
pub struct Web3ProxyApp {
    /// Send requests to the best server available
    balanced_rpcs: Arc<Web3Connections>,
    /// Send private requests (like eth_sendRawTransaction) to all these servers
    private_rpcs: Arc<Web3Connections>,
    incoming_requests: ActiveRequestsMap,
    response_cache: ResponseLrcCache,
    // don't drop this or the sender will stop working
    // TODO: broadcast channel instead?
    head_block_receiver: watch::Receiver<Block<TxHash>>,
    pending_tx_sender: broadcast::Sender<TxState>,
    next_subscription_id: AtomicUsize,
}

impl fmt::Debug for Web3ProxyApp {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // TODO: the default formatter takes forever to write. this is too quiet though
        f.debug_struct("Web3ProxyApp").finish_non_exhaustive()
    }
}

impl Web3ProxyApp {
    pub async fn spawn(
        chain_id: usize,
        redis_address: Option<String>,
        balanced_rpcs: Vec<Web3ConnectionConfig>,
        private_rpcs: Vec<Web3ConnectionConfig>,
    ) -> anyhow::Result<(Arc<Web3ProxyApp>, AnyhowJoinHandle<()>)> {
        // TODO: try_join_all instead
        let mut handles = FuturesUnordered::new();

        // make a http shared client
        // TODO: how should we configure the connection pool?
        // TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
        let http_client = Some(
            reqwest::ClientBuilder::new()
                .connect_timeout(Duration::from_secs(5))
                .timeout(Duration::from_secs(60))
                .user_agent(APP_USER_AGENT)
                .build()?,
        );

        let rate_limiter = match redis_address {
            Some(redis_address) => {
                info!("Connecting to redis on {}", redis_address);

                let redis_client = redis_cell_client::Client::open(redis_address)?;

                // TODO: r2d2 connection pool?
                let redis_conn = redis_client.get_multiplexed_tokio_connection().await?;

                Some(redis_conn)
            }
            None => {
                info!("No redis address");
                None
            }
        };

        // TODO: subscribe to pending transactions on the private rpcs, too?
        let (head_block_sender, head_block_receiver) = watch::channel(Block::default());

        // TODO: will one receiver lagging be okay?
        let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(16);
        drop(pending_tx_receiver);

        // TODO: attach context to this error
        let (balanced_rpcs, balanced_handle) = Web3Connections::spawn(
            chain_id,
            balanced_rpcs,
            http_client.as_ref(),
            rate_limiter.as_ref(),
            Some(head_block_sender),
            Some(pending_tx_sender.clone()),
        )
        .await?;

        handles.push(balanced_handle);

        let private_rpcs = if private_rpcs.is_empty() {
            warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
            balanced_rpcs.clone()
        } else {
            // TODO: attach context to this error
            let (private_rpcs, private_handle) = Web3Connections::spawn(
                chain_id,
                private_rpcs,
                http_client.as_ref(),
                rate_limiter.as_ref(),
                // subscribing to new heads here won't work well
                None,
                // TODO: subscribe to pending transactions on the private rpcs?
                None,
            )
            .await?;

            handles.push(private_handle);

            private_rpcs
        };

        let app = Web3ProxyApp {
            balanced_rpcs,
            private_rpcs,
            incoming_requests: Default::default(),
            response_cache: Default::default(),
            head_block_receiver,
            pending_tx_sender,
            next_subscription_id: 1.into(),
        };

        let app = Arc::new(app);

        // create a handle that returns on the first error
        let handle = tokio::spawn(async move {
            while let Some(x) = handles.next().await {
                match x {
                    Err(e) => return Err(e.into()),
                    Ok(Err(e)) => return Err(e),
                    Ok(Ok(())) => {}
                }
            }

            Ok(())
        });

        Ok((app, handle))
    }
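
    // The subscription handlers below match on the raw `params` string of an incoming
    // `eth_subscribe` request. An illustrative request (standard JSON-RPC 2.0 shape):
    //     {"id": 1, "jsonrpc": "2.0", "method": "eth_subscribe", "params": ["newHeads"]}
    // hits the `r#"["newHeads"]"#` arm and streams new-block notifications over the
    // websocket sender until the subscription is aborted or the client goes away.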

    pub async fn eth_subscribe(
        self: Arc<Self>,
        payload: JsonRpcRequest,
        // TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but it's easier for now
        subscription_tx: flume::Sender<Message>,
    ) -> anyhow::Result<(AbortHandle, JsonRpcForwardedResponse)> {
        let (subscription_abort_handle, subscription_registration) = AbortHandle::new_pair();

        // TODO: this only needs to be unique per connection. we don't need it globally unique
        let subscription_id = self
            .next_subscription_id
            .fetch_add(1, atomic::Ordering::SeqCst);
        let subscription_id = format!("{:#x}", subscription_id);

        // save the id so we can use it in the response
        let id = payload.id.clone();

        let subscription_join_handle = {
            let subscription_id = subscription_id.clone();

            match payload.params.as_deref().unwrap().get() {
                r#"["newHeads"]"# => {
                    let head_block_receiver = self.head_block_receiver.clone();

                    trace!(?subscription_id, "new heads subscription");
                    tokio::spawn(async move {
                        let mut head_block_receiver = Abortable::new(
                            WatchStream::new(head_block_receiver),
                            subscription_registration,
                        );

                        while let Some(new_head) = head_block_receiver.next().await {
                            // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
                            let msg = json!({
                                "jsonrpc": "2.0",
                                "method": "eth_subscription",
                                "params": {
                                    "subscription": subscription_id,
                                    "result": new_head,
                                },
                            });

                            let msg = Message::Text(serde_json::to_string(&msg).unwrap());

                            if subscription_tx.send_async(msg).await.is_err() {
                                // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
                                break;
                            };
                        }

                        trace!(?subscription_id, "closed new heads subscription");
                    })
                }
                r#"["newPendingTransactions"]"# => {
                    let mut pending_tx_receiver = self.pending_tx_sender.subscribe();

                    trace!(?subscription_id, "pending transactions subscription");
                    tokio::spawn(async move {
                        while let Ok(new_tx_state) = pending_tx_receiver.recv().await {
                            let new_tx = match new_tx_state {
                                TxState::Known(..) => continue,
                                TxState::Confirmed(..) => continue,
                                TxState::Orphaned(tx) => tx,
                                TxState::Pending(tx) => tx,
                            };

                            // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
                            let msg = json!({
                                "jsonrpc": "2.0",
                                "method": "eth_subscription",
                                "params": {
                                    "subscription": subscription_id,
                                    "result": new_tx.hash,
                                },
                            });

                            let msg = Message::Text(serde_json::to_string(&msg).unwrap());

                            if subscription_tx.send_async(msg).await.is_err() {
                                // TODO: cancel this subscription earlier? select on pending_tx_receiver.recv() and an abort handle?
                                break;
                            };
                        }

                        trace!(?subscription_id, "closed pending transactions subscription");
                    })
                }
                r#"["newPendingFullTransactions"]"# => {
                    // TODO: too much copy/pasta with newPendingTransactions
                    let mut pending_tx_receiver = self.pending_tx_sender.subscribe();

                    trace!(?subscription_id, "pending full transactions subscription");
                    tokio::spawn(async move {
                        while let Ok(new_tx_state) = pending_tx_receiver.recv().await {
                            let new_tx = match new_tx_state {
                                TxState::Known(..) => continue,
                                TxState::Confirmed(..) => continue,
                                TxState::Orphaned(tx) => tx,
                                TxState::Pending(tx) => tx,
                            };

                            // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
                            let msg = json!({
                                "jsonrpc": "2.0",
                                "method": "eth_subscription",
                                "params": {
                                    "subscription": subscription_id,
                                    // upstream just sends the txid, but we want to send the whole transaction
                                    "result": new_tx,
                                },
                            });

                            let msg = Message::Text(serde_json::to_string(&msg).unwrap());

                            if subscription_tx.send_async(msg).await.is_err() {
                                // TODO: cancel this subscription earlier? select on pending_tx_receiver.recv() and an abort handle?
                                break;
                            };
                        }

                        trace!(?subscription_id, "closed pending full transactions subscription");
                    })
                }
                _ => return Err(anyhow::anyhow!("unimplemented")),
            }
        };

        // TODO: do something with subscription_join_handle?

        let response = JsonRpcForwardedResponse::from_string(subscription_id, id);

        // TODO: make a `SubscriptionHandle(AbortHandle, JoinHandle)` struct?
        Ok((subscription_abort_handle, response))
    }
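
    // Hypothetical caller sketch (not part of this file): the websocket handler is expected to
    // keep the returned AbortHandle and abort it on `eth_unsubscribe` or disconnect. Note that
    // only the newHeads stream is wrapped in `Abortable`; the pending-transaction tasks exit on
    // their own once `subscription_tx` has no receivers left.
    //
    //     let (tx, rx) = flume::unbounded();
    //     let (abort_handle, response) = app.clone().eth_subscribe(payload, tx).await?;
    //     // ... forward `response` and every Message arriving on `rx` to the websocket ...
    //     abort_handle.abort();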

    pub fn get_balanced_rpcs(&self) -> &Web3Connections {
        &self.balanced_rpcs
    }

    pub fn get_private_rpcs(&self) -> &Web3Connections {
        &self.private_rpcs
    }

    pub fn get_active_requests(&self) -> &ActiveRequestsMap {
        &self.incoming_requests
    }

    /// send the request to the appropriate RPCs
    /// TODO: dry this up
    #[instrument(skip_all)]
    pub async fn proxy_web3_rpc(
        &self,
        request: JsonRpcRequestEnum,
    ) -> anyhow::Result<JsonRpcForwardedResponseEnum> {
        // TODO: i don't always see this in the logs. why?
        debug!("Received request: {:?}", request);

        // even though we have timeouts on the requests to our backend providers,
        // we need a timeout for the incoming request so that delays from a slow or stuck
        // backend don't leave the client connection hanging forever
        let max_time = Duration::from_secs(60);

        let response = match request {
            JsonRpcRequestEnum::Single(request) => JsonRpcForwardedResponseEnum::Single(
                timeout(max_time, self.proxy_web3_rpc_request(request)).await??,
            ),
            JsonRpcRequestEnum::Batch(requests) => JsonRpcForwardedResponseEnum::Batch(
                timeout(max_time, self.proxy_web3_rpc_requests(requests)).await??,
            ),
        };

        // TODO: i don't always see this in the logs. why?
        debug!("Forwarding response: {:?}", response);

        Ok(response)
    }
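
    // JSON-RPC 2.0 allows either a single request object or a batch (a JSON array of request
    // objects); `JsonRpcRequestEnum` mirrors that split, so both of these illustrative payloads
    // are accepted by `proxy_web3_rpc`:
    //     {"id": 1, "jsonrpc": "2.0", "method": "eth_blockNumber", "params": []}
    //     [{"id": 1, "jsonrpc": "2.0", "method": "eth_blockNumber", "params": []},
    //      {"id": 2, "jsonrpc": "2.0", "method": "eth_gasPrice", "params": []}]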

    // #[instrument(skip_all)]
    async fn proxy_web3_rpc_requests(
        &self,
        requests: Vec<JsonRpcRequest>,
    ) -> anyhow::Result<Vec<JsonRpcForwardedResponse>> {
        // TODO: we should probably change ethers-rs to support this directly
        // we cut up the batch and send the pieces to potentially different servers. this could be a problem.
        // if the client needs consistent blocks, they should specify instead of assuming batches work on the same block
        // TODO: is spawning here actually slower?
        let num_requests = requests.len();
        let responses = join_all(
            requests
                .into_iter()
                .map(|request| self.proxy_web3_rpc_request(request))
                .collect::<Vec<_>>(),
        )
        .await;

        // TODO: i'm sure this could be done better with iterators
        let mut collected: Vec<JsonRpcForwardedResponse> = Vec::with_capacity(num_requests);
        for response in responses {
            collected.push(response?);
        }

        Ok(collected)
    }

    fn get_cached_response(
        &self,
        request: &JsonRpcRequest,
    ) -> (
        CacheKey,
        Result<JsonRpcForwardedResponse, &ResponseLrcCache>,
    ) {
        // TODO: inspect the request to pick the right cache
        // TODO: https://github.com/ethereum/web3.py/blob/master/web3/middleware/cache.py
        // TODO: Some requests should skip caching on the head_block_hash
        let head_block_hash = Some(self.balanced_rpcs.get_head_block_hash());

        // TODO: better key? benchmark this
        let key = (
            head_block_hash,
            request.method.clone(),
            request.params.clone().map(|x| x.to_string()),
        );

        if let Some(response) = self.response_cache.read().get(&key) {
            // TODO: emit a stat
            trace!("{:?} cache hit!", request);

            // TODO: can we make references work? maybe put them in an Arc?
            return (key, Ok(response.to_owned()));
        } else {
            // TODO: emit a stat
            trace!("{:?} cache miss!", request);
        }

        // TODO: multiple caches. if head_block_hash is None, have a persistent cache (disk backed?)
        let cache = &self.response_cache;

        (key, Err(cache))
    }
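
    // Because the current head block hash is part of the cache key, entries are effectively
    // invalidated whenever a new head block arrives (a later comment notes these caches only
    // live for one block). An illustrative key for an `eth_getBalance` call looks like:
    //     (Some(<head block hash>), "eth_getBalance".to_string(), Some(r#"["0xabc…","latest"]"#.to_string()))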

    // #[instrument(skip_all)]
    async fn proxy_web3_rpc_request(
        &self,
        request: JsonRpcRequest,
    ) -> anyhow::Result<JsonRpcForwardedResponse> {
        trace!("Received request: {:?}", request);

        // TODO: if eth_chainId or net_version, serve those without querying the backend
        // TODO: how much should we retry? probably with a timeout and not with a count like this
        // TODO: think more about this loop.
        // TODO: add more to this span such as
        let span = info_span!("rpc_request");
        // let _enter = span.enter(); // DO NOT ENTER! we can't use enter across awaits! (clippy lint soon)

        match &request.method[..] {
            "admin_addPeer"
            | "admin_datadir"
            | "admin_startRPC"
            | "admin_startWS"
            | "admin_stopRPC"
            | "admin_stopWS"
            | "debug_chaindbCompact"
            | "debug_freezeClient"
            | "debug_goTrace"
            | "debug_mutexProfile"
            | "debug_setBlockProfileRate"
            | "debug_setGCPercent"
            | "debug_setHead"
            | "debug_setMutexProfileFraction"
            | "debug_standardTraceBlockToFile"
            | "debug_standardTraceBadBlockToFile"
            | "debug_startCPUProfile"
            | "debug_startGoTrace"
            | "debug_stopCPUProfile"
            | "debug_stopGoTrace"
            | "debug_writeBlockProfile"
            | "debug_writeMemProfile"
            | "debug_writeMutexProfile"
            | "les_addBalance"
            | "les_setClientParams"
            | "les_setDefaultParams"
            | "miner_setExtra"
            | "miner_setGasPrice"
            | "miner_start"
            | "miner_stop"
            | "miner_setEtherbase"
            | "miner_setGasLimit"
            | "personal_importRawKey"
            | "personal_listAccounts"
            | "personal_lockAccount"
            | "personal_newAccount"
            | "personal_unlockAccount"
            | "personal_sendTransaction"
            | "personal_sign"
            | "personal_ecRecover" => Err(anyhow::anyhow!("unimplemented")),
            "eth_sendRawTransaction" => {
                // eth_sendRawTransaction goes to all of the private rpcs (which fall back to the balanced rpcs when no private relays are configured)
                // TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
                self.private_rpcs
                    .try_send_all_upstream_servers(request)
                    .instrument(span)
                    .await
            }
            _ => {
                // this is not a private transaction (or no private relays are configured)
                let (cache_key, response_cache) = match self.get_cached_response(&request) {
                    (cache_key, Ok(response)) => {
                        let _ = self.incoming_requests.remove(&cache_key);
                        return Ok(response);
                    }
                    (cache_key, Err(response_cache)) => (cache_key, response_cache),
                };

                // check if this request is already in flight
                // TODO: move this logic into an IncomingRequestHandler (ActiveRequestHandler has an rpc, but this won't)
                let (incoming_tx, incoming_rx) = watch::channel(true);
                let mut other_incoming_rx = None;
                match self.incoming_requests.entry(cache_key.clone()) {
                    DashMapEntry::Occupied(entry) => {
                        other_incoming_rx = Some(entry.get().clone());
                    }
                    DashMapEntry::Vacant(entry) => {
                        entry.insert(incoming_rx);
                    }
                }

                if let Some(mut other_incoming_rx) = other_incoming_rx {
                    // wait for the other request to finish. it might have finished successfully or with an error
                    trace!("{:?} waiting on in-flight request", request);

                    let _ = other_incoming_rx.changed().await;

                    // now that we've waited, let's check the cache again
                    if let Some(cached) = response_cache.read().get(&cache_key) {
                        let _ = self.incoming_requests.remove(&cache_key);
                        let _ = incoming_tx.send(false);

                        // TODO: emit a stat
                        trace!(
                            "{:?} cache hit after waiting for in-flight request!",
                            request
                        );

                        return Ok(cached.to_owned());
                    } else {
                        // TODO: emit a stat
                        trace!(
                            "{:?} cache miss after waiting for in-flight request!",
                            request
                        );
                    }
                }

                let response = self
                    .balanced_rpcs
                    .try_send_best_upstream_server(request)
                    .await?;

                // TODO: small race condition here. parallel requests with the same query will both be saved to the cache
                let mut response_cache = response_cache.write();

                // TODO: cache the warp::reply to save us serializing every time
                response_cache.insert(cache_key.clone(), response.clone());
                if response_cache.len() >= RESPONSE_CACHE_CAP {
                    // TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? these caches only live for one block
                    response_cache.pop_front();
                }

                drop(response_cache);

                let _ = self.incoming_requests.remove(&cache_key);
                let _ = incoming_tx.send(false);

                Ok(response)
            }
        }
    }
}