add is_archive_needed and a bunch of rpc commands

This commit is contained in:
Bryan Stitt 2022-07-09 02:23:26 +00:00
parent 04f492b5e2
commit 58fa7af105
5 changed files with 216 additions and 24 deletions

View File

@ -60,7 +60,8 @@
- [ ] improve caching
- [ ] if the eth_call (or similar) params include a block, we can cache for longer
- [ ] if the call is something simple like "symbol" or "decimals", cache that too
- [ ] when we receive a block, we should store it for later eth_getBlockByNumber, eth_blockNumber, and similar calls
- [ ] when we receive a block, we should store it for later eth_getBlockByNumber and similar calls
- [x] eth_blockNumber without a backend request
- [ ] if a rpc fails to connect at start, retry later instead of skipping it forever
- [ ] inspect any jsonrpc errors. if its something like "header not found" or "block with id $x not found" retry on another node (and add a negative score to that server)
- this error seems to happen when we use load balanced backend rpcs like pokt and ankr
@ -96,3 +97,6 @@
- [ ] use https://github.com/ledgerwatch/interfaces to talk to erigon directly instead of through erigon's rpcdaemon (possible example code which uses ledgerwatch/interfaces: https://github.com/akula-bft/akula/tree/master)
- [ ] subscribe to pending transactions and build an intelligent gas estimator
- [ ] include private rpcs with regular queries? i don't want to overwhelm them, but they could be good for excess load
- [ ] flashbots specific methods
- [ ] flashbots protect fast mode or not? probably fast matches most user's needs, but no reverts is nice.
- [ ] https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#authentication maybe have per-user keys. or pass their header on if its set

View File

@ -1,7 +1,7 @@
use axum::extract::ws::Message;
use dashmap::mapref::entry::Entry as DashMapEntry;
use dashmap::DashMap;
use ethers::prelude::Transaction;
use ethers::prelude::{Address, Transaction};
use ethers::prelude::{Block, TxHash, H256};
use futures::future::Abortable;
use futures::future::{join_all, AbortHandle};
@ -32,6 +32,7 @@ use crate::jsonrpc::JsonRpcForwardedResponseEnum;
use crate::jsonrpc::JsonRpcRequest;
use crate::jsonrpc::JsonRpcRequestEnum;
// TODO: make this customizable?
static APP_USER_AGENT: &str = concat!(
"satoshiandkin/",
env!("CARGO_PKG_NAME"),
@ -42,8 +43,8 @@ static APP_USER_AGENT: &str = concat!(
// TODO: put this in config? what size should we do? probably should structure this to be a size in MB
const RESPONSE_CACHE_CAP: usize = 1024;
/// TODO: these types are probably very bad keys and values. i couldn't get caching of warp::reply::Json to work
type CacheKey = (Option<H256>, String, Option<String>);
// block hash, method, params
type CacheKey = (H256, String, Option<String>);
type ResponseLrcCache = RwLock<LinkedHashMap<CacheKey, JsonRpcForwardedResponse>>;
@ -73,6 +74,38 @@ pub async fn flatten_handles<T>(
Ok(())
}
fn is_archive_needed(method: &str, params: Option<&mut serde_json::Value>) -> bool {
let methods_that_may_need_archive = [
"eth_call",
"eth_getBalance",
"eth_getCode",
"eth_getLogs",
"eth_getStorageAt",
"eth_getTransactionCount",
"eth_getTransactionByBlockHashAndIndex",
"eth_getTransactionByBlockNumberAndIndex",
"eth_getTransactionReceipt",
"eth_getUncleByBlockHashAndIndex",
"eth_getUncleByBlockNumberAndIndex",
];
if !methods_that_may_need_archive.contains(&method) {
// this method is not known to require an archive node
return false;
}
// TODO: find the given block number in params
// TODO: if its "latest" (or not given), modify params to have the latest block. return false
// TODO: if its "pending", do something special? return false
// TODO: we probably need a list of recent hashes/heights. if specified block is recent, return false
// this command needs an archive server
true
}
// TODO: think more about TxState. d
#[derive(Clone)]
pub enum TxState {
@ -530,7 +563,7 @@ impl Web3ProxyApp {
// TODO: https://github.com/ethereum/web3.py/blob/master/web3/middleware/cache.py
// TODO: Some requests should skip caching on the head_block_hash
let head_block_hash = Some(self.balanced_rpcs.get_head_block_hash());
let head_block_hash = self.balanced_rpcs.get_head_block_hash();
// TODO: better key? benchmark this
let key = (
@ -559,7 +592,7 @@ impl Web3ProxyApp {
// #[instrument(skip_all)]
async fn proxy_web3_rpc_request(
&self,
request: JsonRpcRequest,
mut request: JsonRpcRequest,
) -> anyhow::Result<JsonRpcForwardedResponse> {
trace!("Received request: {:?}", request);
@ -571,12 +604,17 @@ impl Web3ProxyApp {
let span = info_span!("rpc_request");
// let _enter = span.enter(); // DO NOT ENTER! we can't use enter across awaits! (clippy lint soon)
match &request.method[..] {
// lots of commands are blocked
"admin_addPeer"
| "admin_datadir"
| "admin_startRPC"
| "admin_startWS"
| "admin_stopRPC"
| "admin_stopWS"
| "db_getHex"
| "db_getString"
| "db_putHex"
| "db_putString"
| "debug_chaindbCompact"
| "debug_freezeClient"
| "debug_goTrace"
@ -594,6 +632,15 @@ impl Web3ProxyApp {
| "debug_writeBlockProfile"
| "debug_writeMemProfile"
| "debug_writeMutexProfile"
| "eth_compileLLL"
| "eth_compileSerpent"
| "eth_compileSolidity"
| "eth_getCompilers"
| "eth_sendTransaction"
| "eth_sign"
| "eth_signTransaction"
| "eth_submitHashrate"
| "eth_submitWork"
| "les_addBalance"
| "les_setClientParams"
| "les_setDefaultParams"
@ -610,20 +657,131 @@ impl Web3ProxyApp {
| "personal_unlockAccount"
| "personal_sendTransaction"
| "personal_sign"
| "personal_ecRecover" => {
| "personal_ecRecover"
| "shh_addToGroup"
| "shh_getFilterChanges"
| "shh_getMessages"
| "shh_hasIdentity"
| "shh_newFilter"
| "shh_newGroup"
| "shh_newIdentity"
| "shh_post"
| "shh_uninstallFilter"
| "shh_version" => {
// TODO: proper error code
Err(anyhow::anyhow!("unimplemented"))
Err(anyhow::anyhow!("unsupported"))
}
// TODO: implement these commands
"eth_getFilterChanges"
| "eth_getFilterLogs"
| "eth_newBlockFilter"
| "eth_newFilter"
| "eth_newPendingTransactionFilter"
| "eth_uninstallFilter" => Err(anyhow::anyhow!("not yet implemented")),
// some commands can use local data or caches
"eth_accounts" => {
let partial_response = serde_json::Value::Array(vec![]);
let response = JsonRpcForwardedResponse::from_value(partial_response, request.id);
Ok(response)
}
"eth_blockNumber" => {
let head_block_number = self.balanced_rpcs.get_head_block_num();
if head_block_number == 0 {
return Err(anyhow::anyhow!("no servers synced"));
}
// TODO: this seems pretty common. make a helper?
let partial_response = format!("{:x}", head_block_number);
let response = JsonRpcForwardedResponse::from_string(partial_response, request.id);
Ok(response)
}
// TODO: eth_callBundle (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_callbundle)
// TODO: eth_cancelPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_cancelprivatetransaction, but maybe just reject)
// TODO: eth_sendPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_sendprivatetransaction)
"eth_coinbase" => {
// no need for serving coinbase. we could return a per-user payment address here, but then we might leak that to dapps
let partial_response = json!(Address::zero());
let response = JsonRpcForwardedResponse::from_value(partial_response, request.id);
Ok(response)
}
// TODO: eth_estimateGas using anvil?
// TODO: eth_gasPrice that does awesome magic to predict the future
// TODO: eth_getBlockByHash from caches
// TODO: eth_getBlockByNumber from caches
// TODO: eth_getBlockTransactionCountByHash from caches
// TODO: eth_getBlockTransactionCountByNumber from caches
// TODO: eth_getUncleCountByBlockHash from caches
// TODO: eth_getUncleCountByBlockNumber from caches
"eth_hashrate" => {
let partial_response = json!("0x0");
let response = JsonRpcForwardedResponse::from_value(partial_response, request.id);
Ok(response)
}
"eth_mining" => {
let partial_response = json!(false);
let response = JsonRpcForwardedResponse::from_value(partial_response, request.id);
Ok(response)
}
// TODO: eth_sendBundle (flashbots command)
// broadcast transactions to all private rpcs at once
"eth_sendRawTransaction" => {
// there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs
// TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
self.private_rpcs
.try_send_all_upstream_servers(request)
.try_send_all_upstream_servers(request, false)
.instrument(span)
.await
}
"eth_syncing" => {
// TODO: return a real response if all backends are syncing or if no servers in sync
let partial_response = serde_json::Value::Bool(false);
let response = JsonRpcForwardedResponse::from_value(partial_response, request.id);
Ok(response)
}
"net_listening" => {
// TODO: only if there are some backends on balanced_rpcs?
let partial_response = serde_json::Value::Bool(true);
let response = JsonRpcForwardedResponse::from_value(partial_response, request.id);
Ok(response)
}
"net_peerCount" => {
let partial_response = serde_json::Value::String(format!(
"{:x}",
self.balanced_rpcs.num_synced_rpcs()
));
let response = JsonRpcForwardedResponse::from_value(partial_response, request.id);
Ok(response)
}
"web3_clientVersion" => {
// TODO: return a real response if all backends are syncing or if no servers in sync
let partial_response = serde_json::Value::String(APP_USER_AGENT.to_string());
let response = JsonRpcForwardedResponse::from_value(partial_response, request.id);
Ok(response)
}
// TODO: web3_sha3?
method => {
// this is not a private transaction (or no private relays are configured)
// everything else is relayed to a backend
// this is not a private transaction
// we do this check before checking caches because it might modify the request params
let archive_needed = is_archive_needed(method, request.params.as_mut());
let (cache_key, response_cache) = match self.get_cached_response(&request) {
(cache_key, Ok(response)) => {
@ -678,29 +836,30 @@ impl Web3ProxyApp {
"eth_getTransactionByHash" | "eth_getTransactionReceipt" => {
// TODO: try_send_all serially with retries instead of parallel
self.private_rpcs
.try_send_all_upstream_servers(request)
.try_send_all_upstream_servers(request, archive_needed)
.await?
}
_ => {
// TODO: retries?
self.balanced_rpcs
.try_send_best_upstream_server(request)
.try_send_best_upstream_server(request, archive_needed)
.await?
}
};
// TODO: small race condidition here. parallel requests with the same query will both be saved to the cache
let mut response_cache = response_cache.write();
{
let mut response_cache = response_cache.write();
// TODO: cache the warp::reply to save us serializing every time
response_cache.insert(cache_key.clone(), response.clone());
if response_cache.len() >= RESPONSE_CACHE_CAP {
// TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? these caches only live for one block
response_cache.pop_front();
// TODO: cache the warp::reply to save us serializing every time?
response_cache.insert(cache_key.clone(), response.clone());
// TODO: instead of checking length, check size in bytes
if response_cache.len() >= RESPONSE_CACHE_CAP {
// TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? these caches only live for one block
response_cache.pop_front();
}
}
drop(response_cache);
let _ = self.incoming_requests.remove(&cache_key);
let _ = incoming_tx.send(false);

View File

@ -49,6 +49,10 @@ impl SyncedConnections {
pub fn get_head_block_hash(&self) -> &H256 {
&self.head_block_hash
}
pub fn get_head_block_num(&self) -> u64 {
self.head_block_num
}
}
/// A collection of web3 connections. Sends requests either the current best server or all servers.
@ -344,6 +348,10 @@ impl Web3Connections {
Ok(())
}
pub fn get_head_block_num(&self) -> u64 {
self.synced_connections.load().get_head_block_num()
}
pub fn get_head_block_hash(&self) -> H256 {
*self.synced_connections.load().get_head_block_hash()
}
@ -352,6 +360,10 @@ impl Web3Connections {
!self.synced_connections.load().inner.is_empty()
}
pub fn num_synced_rpcs(&self) -> usize {
self.synced_connections.load().inner.len()
}
/// Send the same request to all the handles. Returning the most common success or most common error.
#[instrument(skip_all)]
pub async fn try_send_parallel_requests(
@ -683,6 +695,7 @@ impl Web3Connections {
pub async fn try_send_best_upstream_server(
&self,
request: JsonRpcRequest,
archive_needed: bool,
) -> anyhow::Result<JsonRpcForwardedResponse> {
let mut skip_rpcs = vec![];
@ -768,6 +781,7 @@ impl Web3Connections {
pub async fn try_send_all_upstream_servers(
&self,
request: JsonRpcRequest,
archive_needed: bool,
) -> anyhow::Result<JsonRpcForwardedResponse> {
loop {
match self.get_upstream_servers().await {

View File

@ -44,6 +44,7 @@ async fn handle_socket_payload(
subscription_count: &AtomicUsize,
subscriptions: &mut HashMap<String, AbortHandle>,
) -> Message {
// TODO: do any clients send batches over websockets?
let (id, response) = match serde_json::from_str::<JsonRpcRequest>(payload) {
Ok(payload) => {
let id = payload.id.clone();
@ -90,7 +91,6 @@ async fn handle_socket_payload(
(id, response)
}
Err(err) => {
// TODO: what should this id be?
let id = RawValue::from_string("null".to_string()).unwrap();
(id, Err(err.into()))
}

View File

@ -209,6 +209,21 @@ impl JsonRpcForwardedResponse {
}
}
pub fn from_value(partial_response: serde_json::Value, id: Box<RawValue>) -> Self {
let partial_response =
serde_json::to_string(&partial_response).expect("this should always work");
let partial_response =
RawValue::from_string(partial_response).expect("this should always work");
JsonRpcForwardedResponse {
jsonrpc: "2.0".to_string(),
id,
result: Some(partial_response),
error: None,
}
}
pub fn from_ethers_error(e: ProviderError, id: Box<RawValue>) -> anyhow::Result<Self> {
// TODO: move turning ClientError into json to a helper function?
let code;