web3-proxy/web3_proxy/src/app/mod.rs

1361 lines
52 KiB
Rust
Raw Normal View History

2022-08-10 05:37:34 +03:00
// TODO: this file is way too big now. move things into other modules
mod ws;
2022-08-10 05:37:34 +03:00
use crate::app_stats::{ProxyResponseStat, StatEmitter, Web3ProxyStat};
2022-12-17 07:05:01 +03:00
use crate::block_number::{block_needed, BlockNeeded};
2022-08-24 03:59:05 +03:00
use crate::config::{AppConfig, TopConfig};
use crate::frontend::authorization::{Authorization, RequestMetadata};
use crate::frontend::errors::FrontendErrorResponse;
use crate::jsonrpc::{
JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest, JsonRpcRequestEnum,
};
2022-12-03 08:31:03 +03:00
use crate::rpcs::blockchain::{ArcBlock, SavedBlock};
use crate::rpcs::connection::Web3Connection;
2022-08-30 23:01:42 +03:00
use crate::rpcs::connections::Web3Connections;
2022-09-09 06:53:16 +03:00
use crate::rpcs::request::OpenRequestHandleMetrics;
2022-08-24 03:59:05 +03:00
use crate::rpcs::transactions::TxStatus;
2022-12-14 05:13:23 +03:00
use crate::user_token::UserBearerToken;
2022-07-26 07:53:38 +03:00
use anyhow::Context;
2022-10-27 00:39:26 +03:00
use axum::headers::{Origin, Referer, UserAgent};
2022-12-28 09:11:18 +03:00
use chrono::Utc;
2022-09-15 20:57:24 +03:00
use deferred_rate_limiter::DeferredRateLimiter;
2022-08-10 05:37:34 +03:00
use derive_more::From;
2022-12-12 07:39:54 +03:00
use entities::sea_orm_active_enums::LogLevel;
2022-07-22 22:30:39 +03:00
use ethers::core::utils::keccak256;
use ethers::prelude::{Address, Block, Bytes, Transaction, TxHash, H256, U64};
2022-12-24 04:32:58 +03:00
use ethers::utils::rlp::{Decodable, Rlp};
use futures::future::join_all;
use futures::stream::{FuturesUnordered, StreamExt};
use hashbrown::{HashMap, HashSet};
use ipnet::IpNet;
use log::{debug, error, info, warn, Level};
2022-09-14 10:08:48 +03:00
use metered::{metered, ErrorCount, HitCount, ResponseTime, Throughput};
2022-11-14 21:24:52 +03:00
use migration::sea_orm::{self, ConnectionTrait, Database, DatabaseConnection};
use migration::sea_query::table::ColumnDef;
use migration::{Alias, DbErr, Migrator, MigratorTrait, Table};
2022-09-05 08:53:58 +03:00
use moka::future::Cache;
use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter};
2022-09-09 06:53:16 +03:00
use serde::Serialize;
2022-05-30 21:23:55 +03:00
use serde_json::json;
2022-12-24 04:32:58 +03:00
use serde_json::value::to_raw_value;
2022-05-12 02:50:52 +03:00
use std::fmt;
2022-12-17 07:05:01 +03:00
use std::hash::{Hash, Hasher};
2022-09-15 20:57:24 +03:00
use std::net::IpAddr;
use std::num::NonZeroU64;
2022-07-14 02:25:01 +03:00
use std::str::FromStr;
use std::sync::{atomic, Arc};
2022-05-12 02:50:52 +03:00
use std::time::Duration;
2022-09-27 05:01:45 +03:00
use tokio::sync::{broadcast, watch, Semaphore};
2022-06-14 08:43:28 +03:00
use tokio::task::JoinHandle;
2022-11-14 21:24:52 +03:00
use tokio::time::{sleep, timeout};
2022-09-24 08:53:45 +03:00
use ulid::Ulid;
2022-05-12 02:50:52 +03:00
// TODO: make this customizable?
pub static APP_USER_AGENT: &str = concat!(
2022-05-12 02:50:52 +03:00
"satoshiandkin/",
env!("CARGO_PKG_NAME"),
"/",
env!("CARGO_PKG_VERSION"),
);
2022-11-20 01:05:51 +03:00
/// Length of one request period. The value is 60.
/// NOTE(review): presumably seconds; confirm against the code that reads this.
/// TODO: allow customizing the request period?
pub static REQUEST_PERIOD: u64 = 60;
2022-12-17 07:05:01 +03:00
/// Key for the response cache.
///
/// Two keys are equal only when all four fields match.
/// Field order matters: `derive(From)` builds the tuple conversion from it.
#[derive(From)]
struct ResponseCacheKey {
    /// if none, this is cached until evicted
    block: Option<SavedBlock>,
    /// method of the request being cached
    method: String,
    /// params of the request being cached
    // TODO: better type for this
    params: Option<serde_json::Value>,
    /// part of the key, so entries with a different `cache_errors` never collide
    cache_errors: bool,
}
impl ResponseCacheKey {
    /// Approximate size of this key.
    ///
    /// This is the byte length of `method` plus, when `params` is set,
    /// the byte length of the params serialized to a string.
    fn weight(&self) -> usize {
        let params_len = self
            .params
            .as_ref()
            .map_or(0, |params| params.to_string().len());

        self.method.len() + params_len
    }
}
impl PartialEq for ResponseCacheKey {
    /// Field-by-field equality.
    ///
    /// The cheap `cache_errors` flag is compared first and the params last,
    /// so mismatches usually short-circuit early.
    fn eq(&self, other: &Self) -> bool {
        self.cache_errors == other.cache_errors
            && self.block == other.block
            && self.method == other.method
            && self.params == other.params
    }
}
impl Eq for ResponseCacheKey {}

impl Hash for ResponseCacheKey {
    /// Feed every field that takes part in equality into the hasher.
    ///
    /// The block contributes through its own `hash()` value.
    /// The params contribute through their string serialization.
    fn hash<H: Hasher>(&self, state: &mut H) {
        let block_hash = self.block.as_ref().map(|block| block.hash());
        let params_string = self.params.as_ref().map(|params| params.to_string());

        block_hash.hash(state);
        self.method.hash(state);
        params_string.hash(state);
        self.cache_errors.hash(state);
    }
}
/// Cache from a [`ResponseCacheKey`] to the forwarded JSON-RPC response, using hashbrown's default hasher.
type ResponseCache =
    Cache<ResponseCacheKey, JsonRpcForwardedResponse, hashbrown::hash_map::DefaultHashBuilder>;
2022-05-21 01:16:15 +03:00
2022-06-14 08:43:28 +03:00
/// Join handle for a spawned task whose output is an `anyhow::Result<T>`.
pub type AnyhowJoinHandle<T> = JoinHandle<anyhow::Result<T>>;
#[derive(Clone, Debug, Default, From)]
pub struct AuthorizationChecks {
/// database id of the primary user.
/// TODO: do we need this? its on the authorization so probably not
2022-10-21 23:59:05 +03:00
pub user_id: u64,
2022-10-27 03:12:42 +03:00
/// database id of the rpc key
/// if this is None, then this request is being rate limited by ip
pub rpc_key_id: Option<NonZeroU64>,
2022-11-01 21:54:39 +03:00
/// if None, allow unlimited queries. inherited from the user_tier
2022-09-28 06:35:55 +03:00
pub max_requests_per_period: Option<u64>,
2022-11-01 21:54:39 +03:00
// if None, allow unlimited concurrent requests. inherited from the user_tier
2022-11-01 22:12:57 +03:00
pub max_concurrent_requests: Option<u32>,
/// if None, allow any Origin
2022-10-27 00:39:26 +03:00
pub allowed_origins: Option<Vec<Origin>>,
/// if None, allow any Referer
pub allowed_referers: Option<Vec<Referer>>,
/// if None, allow any UserAgent
pub allowed_user_agents: Option<Vec<UserAgent>>,
/// if None, allow any IP Address
pub allowed_ips: Option<Vec<IpNet>>,
2022-12-12 07:39:54 +03:00
pub log_level: LogLevel,
/// Chance to save reverting eth_call, eth_estimateGas, and eth_sendRawTransaction to the database.
2022-11-01 21:54:39 +03:00
/// TODO: f32 would be fine
pub log_revert_chance: f64,
2022-08-10 08:56:09 +03:00
}
/// Newtype around a [`DatabaseConnection`] that marks it as a read-only replica.
///
/// This is only a label for readers of the code. The compiler does not stop writes through it!
#[derive(Clone)]
pub struct DatabaseReplica(pub DatabaseConnection);
// TODO: I feel like we could do something smart with DeRef or AsRef or Borrow, but that wasn't working for me
impl DatabaseReplica {
pub fn conn(&self) -> &DatabaseConnection {
&self.0
}
}
2022-08-24 03:59:05 +03:00
/// The application
// TODO: this debug impl is way too verbose. make something smaller
// TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard
pub struct Web3ProxyApp {
/// Send requests to the best server available
pub balanced_rpcs: Arc<Web3Connections>,
/// Send private requests (like eth_sendRawTransaction) to all these servers
pub private_rpcs: Option<Arc<Web3Connections>>,
2022-09-05 04:52:59 +03:00
response_cache: ResponseCache,
2022-08-24 03:59:05 +03:00
// don't drop this or the sender will stop working
// TODO: broadcast channel instead?
2022-08-30 23:01:42 +03:00
head_block_receiver: watch::Receiver<ArcBlock>,
2022-08-24 03:59:05 +03:00
pending_tx_sender: broadcast::Sender<TxStatus>,
pub config: AppConfig,
pub db_conn: Option<sea_orm::DatabaseConnection>,
pub db_replica: Option<DatabaseReplica>,
2022-09-09 00:01:36 +03:00
/// prometheus metrics
2022-09-09 06:53:16 +03:00
app_metrics: Arc<Web3ProxyAppMetrics>,
open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
2022-09-05 08:53:58 +03:00
/// store pending transactions that we've seen so that we don't send duplicates to subscribers
pub pending_transactions: Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
2022-09-15 20:57:24 +03:00
pub frontend_ip_rate_limiter: Option<DeferredRateLimiter<IpAddr>>,
pub frontend_registered_user_rate_limiter: Option<DeferredRateLimiter<u64>>,
2022-09-24 06:59:21 +03:00
pub login_rate_limiter: Option<RedisRateLimiter>,
pub vredis_pool: Option<RedisPool>,
2022-11-01 21:54:39 +03:00
// TODO: this key should be our RpcSecretKey class, not Ulid
pub rpc_secret_key_cache:
Cache<Ulid, AuthorizationChecks, hashbrown::hash_map::DefaultHashBuilder>,
pub registered_user_semaphores:
Cache<NonZeroU64, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
2022-09-27 05:01:45 +03:00
pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
pub bearer_token_semaphores:
2022-12-14 05:13:23 +03:00
Cache<UserBearerToken, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
pub stat_sender: Option<flume::Sender<Web3ProxyStat>>,
2022-08-24 03:59:05 +03:00
}
2022-07-16 03:08:22 +03:00
/// flatten a JoinError into an anyhow error
2022-08-10 05:37:34 +03:00
/// Useful when joining multiple futures.
2022-06-14 08:43:28 +03:00
pub async fn flatten_handle<T>(handle: AnyhowJoinHandle<T>) -> anyhow::Result<T> {
match handle.await {
Ok(Ok(result)) => Ok(result),
Ok(Err(err)) => Err(err),
Err(err) => Err(err.into()),
}
}
2022-07-16 03:08:22 +03:00
/// return the first error or okay if everything worked
2022-11-12 11:24:32 +03:00
pub async fn flatten_handles<T>(
mut handles: FuturesUnordered<AnyhowJoinHandle<T>>,
2022-06-16 20:51:49 +03:00
) -> anyhow::Result<()> {
while let Some(x) = handles.next().await {
match x {
Err(e) => return Err(e.into()),
Ok(Err(e)) => return Err(e),
2022-07-16 03:08:22 +03:00
Ok(Ok(_)) => continue,
2022-06-16 20:51:49 +03:00
}
}
Ok(())
}
pub async fn get_db(
db_url: String,
2022-09-02 23:16:20 +03:00
min_connections: u32,
2022-08-10 07:30:54 +03:00
max_connections: u32,
) -> Result<DatabaseConnection, DbErr> {
2022-09-14 04:43:09 +03:00
// TODO: scrub credentials and then include the db_url in logs
info!("Connecting to db");
let mut db_opt = sea_orm::ConnectOptions::new(db_url);
2022-08-10 05:37:34 +03:00
// TODO: load all these options from the config file. i think mysql default max is 100
// TODO: sqlx logging only in debug. way too verbose for production
db_opt
2022-09-20 01:41:53 +03:00
.connect_timeout(Duration::from_secs(30))
2022-09-02 23:16:20 +03:00
.min_connections(min_connections)
2022-08-10 07:30:54 +03:00
.max_connections(max_connections)
.sqlx_logging(false);
// .sqlx_logging_level(log::LevelFilter::Info);
Database::connect(db_opt).await
}
pub async fn drop_migration_lock(db_conn: &DatabaseConnection) -> Result<(), DbErr> {
2022-11-14 21:24:52 +03:00
let db_backend = db_conn.get_database_backend();
let drop_lock_statment = db_backend.build(Table::drop().table(Alias::new("migration_lock")));
db_conn.execute(drop_lock_statment).await?;
2022-12-12 07:39:54 +03:00
debug!("migration lock unlocked");
Ok(())
}
/// Connect to the database and run migrations
pub async fn get_migrated_db(
db_url: String,
min_connections: u32,
max_connections: u32,
) -> anyhow::Result<DatabaseConnection> {
2022-12-08 09:54:38 +03:00
// TODO: this seems to fail silently
let db_conn = get_db(db_url, min_connections, max_connections).await?;
let db_backend = db_conn.get_database_backend();
2022-11-14 21:24:52 +03:00
// TODO: put the timestamp into this?
let create_lock_statment = db_backend.build(
Table::create()
.table(Alias::new("migration_lock"))
2022-11-14 21:24:52 +03:00
.col(ColumnDef::new(Alias::new("locked")).boolean().default(true)),
);
loop {
if Migrator::get_pending_migrations(&db_conn).await?.is_empty() {
info!("no migrations to apply");
return Ok(db_conn);
}
// there are migrations to apply
// acquire a lock
if let Err(err) = db_conn.execute(create_lock_statment.clone()).await {
debug!("Unable to acquire lock. err={:?}", err);
// TODO: exponential backoff with jitter
sleep(Duration::from_secs(1)).await;
continue;
}
debug!("migration lock acquired");
break;
}
let migration_result = Migrator::up(&db_conn, None).await;
// drop the distributed lock
drop_migration_lock(&db_conn).await?;
2022-11-14 21:24:52 +03:00
// return if migrations erred
migration_result?;
2022-10-20 09:17:20 +03:00
Ok(db_conn)
}
2022-10-31 23:05:58 +03:00
/// Everything returned by `Web3ProxyApp::spawn`.
///
/// Field order matters: `derive(From)` builds the tuple conversion from it.
#[derive(From)]
pub struct Web3ProxyAppSpawn {
    /// the app. probably clone this to use in other groups of handles
    pub app: Arc<Web3ProxyApp>,
    /// cancellable handles
    pub app_handles: FuturesUnordered<AnyhowJoinHandle<()>>,
    /// these are important and must be allowed to finish
    pub background_handles: FuturesUnordered<AnyhowJoinHandle<()>>,
}
2022-09-09 06:53:16 +03:00
#[metered(registry = Web3ProxyAppMetrics, registry_expr = self.app_metrics, visibility = pub)]
2022-05-12 02:50:52 +03:00
impl Web3ProxyApp {
2022-10-03 21:08:01 +03:00
/// The main entrypoint.
pub async fn spawn(
2022-08-12 22:07:14 +03:00
top_config: TopConfig,
2022-09-14 09:18:13 +03:00
num_workers: usize,
2022-10-21 01:51:56 +03:00
shutdown_receiver: broadcast::Receiver<()>,
2022-10-31 23:05:58 +03:00
) -> anyhow::Result<Web3ProxyAppSpawn> {
2022-08-12 22:07:14 +03:00
// safety checks on the config
if let Some(redirect) = &top_config.app.redirect_rpc_key_url {
2022-10-18 00:47:58 +03:00
assert!(
2022-11-30 00:29:34 +03:00
redirect.contains("{{rpc_key_id}}"),
2022-11-30 01:27:02 +03:00
"redirect_rpc_key_url user url must contain \"{{rpc_key_id}}\""
2022-10-18 00:47:58 +03:00
);
}
2022-08-12 22:07:14 +03:00
2022-12-28 19:36:22 +03:00
if !top_config.extra.is_empty() {
warn!(
"unknown TopConfig fields!: {:?}",
top_config.app.extra.keys()
);
2022-12-28 19:36:22 +03:00
}
if !top_config.app.extra.is_empty() {
warn!(
"unknown Web3ProxyAppConfig fields!: {:?}",
top_config.app.extra.keys()
);
2022-12-28 19:36:22 +03:00
}
2022-10-03 21:08:01 +03:00
// setup metrics
2022-09-09 06:53:16 +03:00
let app_metrics = Default::default();
let open_request_handle_metrics: Arc<OpenRequestHandleMetrics> = Default::default();
let mut db_conn = None::<DatabaseConnection>;
let mut db_replica = None::<DatabaseReplica>;
2022-10-03 21:08:01 +03:00
// connect to mysql and make sure the latest migrations have run
if let Some(db_url) = top_config.app.db_url.clone() {
2022-09-14 09:18:13 +03:00
let db_min_connections = top_config
.app
.db_min_connections
.unwrap_or(num_workers as u32);
2022-07-26 07:53:38 +03:00
2022-09-02 23:16:20 +03:00
// TODO: what default multiple?
2022-10-03 21:08:01 +03:00
let db_max_connections = top_config
2022-09-02 23:16:20 +03:00
.app
.db_max_connections
2022-09-06 23:50:37 +03:00
.unwrap_or(db_min_connections * 2);
2022-09-02 23:16:20 +03:00
db_conn = Some(
get_migrated_db(db_url.clone(), db_min_connections, db_max_connections).await?,
);
db_replica = if let Some(db_replica_url) = top_config.app.db_replica_url.clone() {
if db_replica_url == db_url {
// url is the same. do not make a new connection or we might go past our max connections
db_conn.clone().map(DatabaseReplica)
} else {
let db_replica_min_connections = top_config
.app
.db_replica_min_connections
.unwrap_or(db_min_connections);
let db_replica_max_connections = top_config
.app
.db_replica_max_connections
.unwrap_or(db_max_connections);
let db_replica = get_db(
db_replica_url,
db_replica_min_connections,
db_replica_max_connections,
)
.await?;
Some(DatabaseReplica(db_replica))
}
} else {
// just clone so that we don't need a bunch of checks all over our code
db_conn.clone().map(DatabaseReplica)
};
2022-07-26 07:53:38 +03:00
} else {
if top_config.app.db_replica_url.is_some() {
return Err(anyhow::anyhow!(
"if there is a db_replica_url, there must be a db_url"
));
}
2022-11-20 01:05:51 +03:00
warn!("no database. some features will be disabled");
2022-07-26 07:53:38 +03:00
};
2022-08-12 22:07:14 +03:00
let balanced_rpcs = top_config.balanced_rpcs;
2022-12-05 04:25:23 +03:00
// safety check on balanced_rpcs
if balanced_rpcs.len() < top_config.app.min_synced_rpcs {
return Err(anyhow::anyhow!(
"Only {}/{} rpcs! Add more balanced_rpcs or reduce min_synced_rpcs.",
balanced_rpcs.len(),
top_config.app.min_synced_rpcs
));
}
// safety check on sum soft limit
let sum_soft_limit = balanced_rpcs.values().fold(0, |acc, x| acc + x.soft_limit);
if sum_soft_limit < top_config.app.min_sum_soft_limit {
return Err(anyhow::anyhow!(
"Only {}/{} soft limit! Add more balanced_rpcs, increase soft limits, or reduce min_sum_soft_limit.",
sum_soft_limit,
top_config.app.min_sum_soft_limit
));
}
2022-10-03 21:08:01 +03:00
let private_rpcs = top_config.private_rpcs.unwrap_or_default();
2022-07-09 02:02:32 +03:00
2022-10-21 01:51:56 +03:00
// these are safe to cancel
let cancellable_handles = FuturesUnordered::new();
// we must wait for these to end on their own (and they need to subscribe to shutdown_sender)
let important_background_handles = FuturesUnordered::new();
2022-06-14 08:43:28 +03:00
2022-05-12 02:50:52 +03:00
// make a http shared client
2022-07-09 02:02:32 +03:00
// TODO: can we configure the connection pool? should we?
2022-10-03 21:08:01 +03:00
// TODO: timeouts from config. defaults are hopefully good
2022-05-22 02:34:05 +03:00
let http_client = Some(
reqwest::ClientBuilder::new()
.connect_timeout(Duration::from_secs(5))
.timeout(Duration::from_secs(60))
.user_agent(APP_USER_AGENT)
.build()?,
);
2022-10-03 21:08:01 +03:00
// create a connection pool for redis
// a failure to connect does NOT block the application from starting
let vredis_pool = match top_config.app.volatile_redis_url.as_ref() {
2022-07-26 07:53:38 +03:00
Some(redis_url) => {
2022-09-14 04:43:09 +03:00
// TODO: scrub credentials and then include the redis_url in logs
info!("Connecting to vredis");
2022-05-22 21:39:06 +03:00
2022-09-14 09:18:13 +03:00
// TODO: what is a good default?
2022-09-02 23:16:20 +03:00
let redis_max_connections = top_config
.app
.volatile_redis_max_connections
2022-09-14 09:18:13 +03:00
.unwrap_or(num_workers * 2);
// TODO: what are reasonable timeouts?
2022-09-14 09:18:13 +03:00
let redis_pool = RedisConfig::from_url(redis_url)
.builder()?
.max_size(redis_max_connections)
2022-09-15 20:57:24 +03:00
.runtime(DeadpoolRuntime::Tokio1)
2022-09-14 09:18:13 +03:00
.build()?;
// test the redis pool
2022-09-17 04:19:11 +03:00
if let Err(err) = redis_pool.get().await {
error!(
2022-11-12 11:24:32 +03:00
"failed to connect to vredis. some features will be disabled. err={:?}",
err
);
2022-09-17 04:19:11 +03:00
};
2022-09-14 09:18:13 +03:00
Some(redis_pool)
2022-05-22 21:39:06 +03:00
}
None => {
warn!("no redis connection. some features will be disabled");
2022-05-22 21:39:06 +03:00
None
2022-05-22 02:34:05 +03:00
}
};
2022-05-12 02:50:52 +03:00
// setup a channel for receiving stats (generally with a high cardinality, such as per-user)
2022-10-03 21:08:01 +03:00
// we do this in a channel so we don't slow down our response to the users
2022-10-10 07:15:07 +03:00
let stat_sender = if let Some(db_conn) = db_conn.clone() {
2022-11-03 02:14:16 +03:00
let emitter_spawn =
StatEmitter::spawn(top_config.app.chain_id, db_conn, 60, shutdown_receiver)?;
2022-10-10 07:15:07 +03:00
2022-11-03 02:14:16 +03:00
important_background_handles.push(emitter_spawn.background_handle);
2022-10-03 21:08:01 +03:00
2022-11-03 02:14:16 +03:00
Some(emitter_spawn.stat_sender)
2022-10-03 21:08:01 +03:00
} else {
2022-11-03 02:14:16 +03:00
warn!("cannot store stats without a database connection");
2022-10-03 21:08:01 +03:00
2022-11-04 22:52:46 +03:00
// TODO: subscribe to the shutdown_receiver here since the stat emitter isn't running?
2022-10-03 21:08:01 +03:00
None
};
// TODO: i don't like doing Block::default here! Change this to "None"?
2022-07-22 08:11:26 +03:00
let (head_block_sender, head_block_receiver) = watch::channel(Arc::new(Block::default()));
2022-07-09 03:00:31 +03:00
// TODO: will one receiver lagging be okay? how big should this be?
2022-07-26 07:53:38 +03:00
let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(256);
// TODO: use this? it could listen for confirmed transactions and then clear pending_transactions, but the head_block_sender is doing that
2022-10-03 21:08:01 +03:00
// TODO: don't drop the pending_tx_receiver. instead, read it to mark transactions as "seen". once seen, we won't re-send them?
// TODO: once a transaction is "Confirmed" we remove it from the map. this should prevent major memory leaks.
// TODO: we should still have some sort of expiration or maximum size limit for the map
2022-07-26 07:53:38 +03:00
drop(pending_tx_receiver);
2022-09-17 05:30:06 +03:00
// TODO: capacity from configs
// all these are the same size, so no need for a weigher
// TODO: ttl on this? or is max_capacity fine?
let pending_transactions = Cache::builder()
.max_capacity(10_000)
2022-11-11 21:40:52 +03:00
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
2022-06-16 20:51:49 +03:00
// keep 1GB of blocks in the cache
2022-09-17 05:30:06 +03:00
// TODO: limits from config
// these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes
2022-11-06 23:52:11 +03:00
// TODO: how can we do the weigher better?
let block_map = Cache::builder()
.max_capacity(1024 * 1024 * 1024)
2022-11-06 23:52:11 +03:00
.weigher(|_k, v: &ArcBlock| {
// TODO: is this good enough?
2022-11-06 23:52:11 +03:00
1 + v.transactions.len().try_into().unwrap_or(u32::MAX)
})
2022-11-11 21:40:52 +03:00
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
2022-08-26 20:26:17 +03:00
2022-10-03 21:08:01 +03:00
// connect to the load balanced rpcs
2022-06-14 08:43:28 +03:00
let (balanced_rpcs, balanced_handle) = Web3Connections::spawn(
2022-08-12 22:07:14 +03:00
top_config.app.chain_id,
db_conn.clone(),
2022-05-22 02:34:05 +03:00
balanced_rpcs,
http_client.clone(),
vredis_pool.clone(),
2022-08-26 20:26:17 +03:00
block_map.clone(),
Some(head_block_sender),
2022-08-27 06:11:58 +03:00
top_config.app.min_sum_soft_limit,
2022-08-27 03:33:45 +03:00
top_config.app.min_synced_rpcs,
2022-06-16 05:53:37 +03:00
Some(pending_tx_sender.clone()),
2022-06-16 20:51:49 +03:00
pending_transactions.clone(),
2022-09-09 06:53:16 +03:00
open_request_handle_metrics.clone(),
2022-05-22 02:34:05 +03:00
)
2022-07-26 07:53:38 +03:00
.await
2022-10-03 21:08:01 +03:00
.context("spawning balanced rpcs")?;
2022-05-18 19:35:06 +03:00
2022-10-03 21:08:01 +03:00
// save the handle to catch any errors
2022-10-21 01:51:56 +03:00
cancellable_handles.push(balanced_handle);
2022-06-14 08:43:28 +03:00
2022-10-03 21:08:01 +03:00
// connect to the private rpcs
// only some chains have this, so this is optional
2022-05-12 02:50:52 +03:00
let private_rpcs = if private_rpcs.is_empty() {
// TODO: do None instead of clone?
2022-05-12 02:50:52 +03:00
warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
None
2022-05-12 02:50:52 +03:00
} else {
2022-06-14 08:43:28 +03:00
let (private_rpcs, private_handle) = Web3Connections::spawn(
2022-08-12 22:07:14 +03:00
top_config.app.chain_id,
db_conn.clone(),
2022-05-22 02:34:05 +03:00
private_rpcs,
http_client.clone(),
vredis_pool.clone(),
2022-08-26 20:26:17 +03:00
block_map,
2022-08-11 00:29:50 +03:00
// subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs
None,
2022-10-19 02:27:33 +03:00
0,
0,
2022-08-11 00:29:50 +03:00
// TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits
None,
2022-06-16 20:51:49 +03:00
pending_transactions.clone(),
2022-09-09 06:53:16 +03:00
open_request_handle_metrics.clone(),
2022-05-22 02:34:05 +03:00
)
2022-07-26 07:53:38 +03:00
.await
2022-10-03 21:08:01 +03:00
.context("spawning private_rpcs")?;
2022-06-14 08:43:28 +03:00
2022-10-20 11:14:38 +03:00
if private_rpcs.conns.is_empty() {
None
} else {
// save the handle to catch any errors
2022-10-21 01:51:56 +03:00
cancellable_handles.push(private_handle);
2022-06-14 08:43:28 +03:00
Some(private_rpcs)
}
2022-05-12 02:50:52 +03:00
};
2022-10-03 21:08:01 +03:00
// create rate limiters
// these are optional. they require redis
2022-09-15 20:57:24 +03:00
let mut frontend_ip_rate_limiter = None;
let mut frontend_registered_user_rate_limiter = None;
2022-09-24 06:59:21 +03:00
let mut login_rate_limiter = None;
if let Some(redis_pool) = vredis_pool.as_ref() {
2022-09-24 06:59:21 +03:00
let rpc_rrl = RedisRateLimiter::new(
2022-08-06 08:46:33 +03:00
"web3_proxy",
2022-08-06 08:26:43 +03:00
"frontend",
2022-10-19 02:27:33 +03:00
// TODO: think about this unwrapping
top_config
.app
2022-11-01 22:12:57 +03:00
.public_requests_per_period
2022-10-19 02:27:33 +03:00
.unwrap_or(u64::MAX),
2022-08-30 23:01:42 +03:00
60.0,
2022-09-15 20:57:24 +03:00
redis_pool.clone(),
);
2022-09-24 06:59:21 +03:00
// these two rate limiters can share the base limiter
// these are deferred rate limiters because we don't want redis network requests on the hot path
2022-09-15 20:57:24 +03:00
// TODO: take cache_size from config
frontend_ip_rate_limiter = Some(DeferredRateLimiter::<IpAddr>::new(
10_000,
"ip",
2022-09-24 06:59:21 +03:00
rpc_rrl.clone(),
None,
));
frontend_registered_user_rate_limiter = Some(DeferredRateLimiter::<u64>::new(
2022-09-24 06:59:21 +03:00
10_000, "key", rpc_rrl, None,
));
login_rate_limiter = Some(RedisRateLimiter::new(
"web3_proxy",
"login",
2022-11-01 22:12:57 +03:00
top_config.app.login_rate_limit_per_period,
2022-09-24 06:59:21 +03:00
60.0,
redis_pool.clone(),
2022-09-15 20:57:24 +03:00
));
}
2022-07-07 06:22:09 +03:00
// keep 1GB of blocks in the cache
2022-09-17 05:30:06 +03:00
// responses can be very different in sizes, so this definitely needs a weigher
// TODO: max_capacity from config
// TODO: don't allow any response to be bigger than X% of the cache
let response_cache = Cache::builder()
.max_capacity(1024 * 1024 * 1024)
2022-12-17 07:05:01 +03:00
.weigher(|k: &ResponseCacheKey, v| {
// TODO: is this good?
if let Ok(v) = serde_json::to_string(v) {
2022-12-17 07:05:01 +03:00
let weight = k.weight() + v.len();
// the or in unwrap_or is probably never called
2022-12-17 07:05:01 +03:00
weight.try_into().unwrap_or(u32::MAX)
} else {
// this seems impossible
u32::MAX
}
})
2022-11-11 21:40:52 +03:00
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
2022-09-15 20:57:24 +03:00
2022-09-17 05:30:06 +03:00
// all the users are the same size, so no need for a weigher
2022-10-03 21:08:01 +03:00
// if there is no database of users, there will be no keys and so this will be empty
2022-09-17 05:30:06 +03:00
// TODO: max_capacity from config
2022-09-22 02:50:55 +03:00
// TODO: ttl from config
2022-11-01 21:54:39 +03:00
let rpc_secret_key_cache = Cache::builder()
.max_capacity(10_000)
2022-11-01 21:54:39 +03:00
.time_to_live(Duration::from_secs(600))
2022-11-11 21:40:52 +03:00
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
2022-09-05 08:53:58 +03:00
2022-10-03 21:08:01 +03:00
// create semaphores for concurrent connection limits
2022-09-27 05:01:45 +03:00
// TODO: what should tti be for semaphores?
let bearer_token_semaphores = Cache::builder()
2022-09-27 05:01:45 +03:00
.time_to_idle(Duration::from_secs(120))
2022-11-11 21:40:52 +03:00
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
2022-09-27 05:01:45 +03:00
let ip_semaphores = Cache::builder()
.time_to_idle(Duration::from_secs(120))
2022-11-11 21:40:52 +03:00
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
let registered_user_semaphores = Cache::builder()
.time_to_idle(Duration::from_secs(120))
2022-11-11 21:40:52 +03:00
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
2022-09-27 05:01:45 +03:00
2022-07-07 06:22:09 +03:00
let app = Self {
2022-08-12 22:07:14 +03:00
config: top_config.app,
2022-05-13 23:50:11 +03:00
balanced_rpcs,
2022-05-12 02:50:52 +03:00
private_rpcs,
2022-09-05 08:53:58 +03:00
response_cache,
2022-05-30 04:28:22 +03:00
head_block_receiver,
2022-06-16 05:53:37 +03:00
pending_tx_sender,
2022-06-16 20:51:49 +03:00
pending_transactions,
2022-09-15 20:57:24 +03:00
frontend_ip_rate_limiter,
frontend_registered_user_rate_limiter,
2022-09-24 06:59:21 +03:00
login_rate_limiter,
2022-07-26 07:53:38 +03:00
db_conn,
db_replica,
vredis_pool,
2022-09-09 06:53:16 +03:00
app_metrics,
open_request_handle_metrics,
2022-11-01 21:54:39 +03:00
rpc_secret_key_cache,
bearer_token_semaphores,
2022-09-27 05:01:45 +03:00
ip_semaphores,
registered_user_semaphores,
stat_sender,
};
let app = Arc::new(app);
2022-10-31 23:05:58 +03:00
Ok((app, cancellable_handles, important_background_handles).into())
2022-05-12 02:50:52 +03:00
}
2022-12-28 09:11:18 +03:00
pub async fn prometheus_metrics(&self) -> String {
2022-09-09 00:01:36 +03:00
let globals = HashMap::new();
// TODO: what globals? should this be the hostname or what?
// globals.insert("service", "web3_proxy");
2022-12-28 09:11:18 +03:00
#[derive(Serialize)]
struct RecentCounts {
one_day: i64,
one_hour: i64,
one_minute: i64,
}
2022-12-28 09:11:18 +03:00
impl RecentCounts {
fn for_err() -> Self {
Self {
one_day: -1,
one_hour: -1,
one_minute: -1,
}
2022-12-28 09:11:18 +03:00
}
}
2022-12-28 09:11:18 +03:00
let (recent_ip_counts, recent_user_counts): (RecentCounts, RecentCounts) =
match self.redis_conn().await {
Ok(mut redis_conn) => {
// TODO: delete any hash entries where
const ONE_MINUTE: i64 = 60;
const ONE_HOUR: i64 = ONE_MINUTE * 60;
const ONE_DAY: i64 = ONE_HOUR * 24;
let one_day_ago = Utc::now().timestamp() - ONE_DAY;
let one_hour_ago = Utc::now().timestamp() - ONE_HOUR;
let one_minute_ago = Utc::now().timestamp() - ONE_MINUTE;
let recent_users_by_user =
format!("recent_users:registered:{}", self.config.chain_id);
let recent_users_by_ip = format!("recent_users:ip:{}", self.config.chain_id);
match redis::pipe()
.atomic()
// delete any entries older than 24 hours
.zrembyscore(&recent_users_by_user, i64::MIN, one_day_ago)
.ignore()
.zrembyscore(&recent_users_by_ip, i64::MIN, one_day_ago)
.ignore()
// get count for last day
.zcount(&recent_users_by_user, one_day_ago, i64::MAX)
.zcount(&recent_users_by_ip, one_day_ago, i64::MAX)
// get count for last hour
.zcount(&recent_users_by_user, one_hour_ago, i64::MAX)
.zcount(&recent_users_by_ip, one_hour_ago, i64::MAX)
// get count for last minute
.zcount(&recent_users_by_user, one_minute_ago, i64::MAX)
.zcount(&recent_users_by_ip, one_minute_ago, i64::MAX)
.query_async(&mut redis_conn)
.await
{
Ok((
day_by_user,
day_by_ip,
hour_by_user,
hour_by_ip,
minute_by_user,
minute_by_ip,
)) => {
let recent_ip_counts = RecentCounts {
one_day: day_by_ip,
one_hour: hour_by_ip,
one_minute: minute_by_ip,
};
let recent_user_counts = RecentCounts {
one_day: day_by_user,
one_hour: hour_by_user,
one_minute: minute_by_user,
};
(recent_ip_counts, recent_user_counts)
}
Err(err) => {
warn!("unable to count recent users: {}", err);
(RecentCounts::for_err(), RecentCounts::for_err())
}
}
}
2022-12-28 09:11:18 +03:00
Err(err) => {
warn!("unable to connect to redis while counting users: {:?}", err);
(RecentCounts::for_err(), RecentCounts::for_err())
2022-12-28 09:11:18 +03:00
}
};
2022-12-28 09:11:18 +03:00
2022-09-09 06:53:16 +03:00
#[derive(Serialize)]
struct CombinedMetrics<'a> {
app: &'a Web3ProxyAppMetrics,
backend_rpc: &'a OpenRequestHandleMetrics,
recent_ip_counts: RecentCounts,
recent_user_counts: RecentCounts,
2022-09-09 06:53:16 +03:00
}
let metrics = CombinedMetrics {
app: &self.app_metrics,
backend_rpc: &self.open_request_handle_metrics,
recent_ip_counts,
recent_user_counts,
2022-09-09 06:53:16 +03:00
};
2022-09-10 05:59:07 +03:00
serde_prometheus::to_string(&metrics, Some("web3_proxy"), globals)
.expect("prometheus metrics should always serialize")
}
2022-07-22 22:30:39 +03:00
/// send the request or batch of requests to the approriate RPCs
2022-05-12 02:50:52 +03:00
pub async fn proxy_web3_rpc(
2022-09-09 00:01:36 +03:00
self: &Arc<Self>,
authorization: Arc<Authorization>,
2022-05-12 02:50:52 +03:00
request: JsonRpcRequestEnum,
) -> Result<(JsonRpcForwardedResponseEnum, Vec<Arc<Web3Connection>>), FrontendErrorResponse>
{
2022-09-05 04:52:59 +03:00
// TODO: this should probably be trace level
2022-11-12 11:24:32 +03:00
// // trace!(?request, "proxy_web3_rpc");
2022-05-12 02:50:52 +03:00
// even though we have timeouts on the requests to our backend providers,
2022-07-22 22:30:39 +03:00
// we need a timeout for the incoming request so that retries don't run forever
// TODO: take this as an optional argument. per user max? expiration time instead of duration?
let max_time = Duration::from_secs(120);
2022-05-12 02:50:52 +03:00
let response = match request {
JsonRpcRequestEnum::Single(request) => {
let (response, rpcs) = timeout(
max_time,
self.proxy_web3_rpc_request(&authorization, request),
)
.await??;
(JsonRpcForwardedResponseEnum::Single(response), rpcs)
}
JsonRpcRequestEnum::Batch(requests) => {
let (responses, rpcs) = timeout(
max_time,
self.proxy_web3_rpc_requests(&authorization, requests),
)
.await??;
2022-05-12 02:50:52 +03:00
(JsonRpcForwardedResponseEnum::Batch(responses), rpcs)
}
};
2022-05-17 03:56:56 +03:00
2022-05-20 08:27:18 +03:00
Ok(response)
2022-05-12 02:50:52 +03:00
}
2022-09-09 00:01:36 +03:00
/// cut up the request and send to potentually different servers
/// TODO: make sure this isn't a problem
2022-05-12 02:50:52 +03:00
async fn proxy_web3_rpc_requests(
2022-09-09 00:01:36 +03:00
self: &Arc<Self>,
authorization: &Arc<Authorization>,
2022-05-12 02:50:52 +03:00
requests: Vec<JsonRpcRequest>,
) -> anyhow::Result<(Vec<JsonRpcForwardedResponse>, Vec<Arc<Web3Connection>>)> {
// TODO: we should probably change ethers-rs to support this directly. they pushed this off to v2 though
2022-05-12 02:50:52 +03:00
let num_requests = requests.len();
2022-10-10 07:15:07 +03:00
// TODO: spawn so the requests go in parallel? need to think about rate limiting more if we do that
// TODO: improve flattening
2022-05-12 02:50:52 +03:00
let responses = join_all(
requests
.into_iter()
.map(|request| self.proxy_web3_rpc_request(authorization, request))
2022-05-12 02:50:52 +03:00
.collect::<Vec<_>>(),
)
.await;
// TODO: i'm sure this could be done better with iterators
// TODO: stream the response?
2022-05-12 02:50:52 +03:00
let mut collected: Vec<JsonRpcForwardedResponse> = Vec::with_capacity(num_requests);
let mut collected_rpcs: HashSet<Arc<Web3Connection>> = HashSet::new();
2022-05-12 02:50:52 +03:00
for response in responses {
// TODO: any way to attach the tried rpcs to the error? it is likely helpful
let (response, rpcs) = response?;
collected.push(response);
collected_rpcs.extend(rpcs.into_iter());
2022-05-12 02:50:52 +03:00
}
let collected_rpcs: Vec<_> = collected_rpcs.into_iter().collect();
Ok((collected, collected_rpcs))
2022-05-12 02:50:52 +03:00
}
2022-10-19 03:56:57 +03:00
/// Returns a clone of the app's optional database connection.
///
/// TODO: i don't think we want or need this. just use app.db_conn, or maybe app.db_conn.clone() or app.db_conn.as_ref()
pub fn db_conn(&self) -> Option<DatabaseConnection> {
    Option::clone(&self.db_conn)
}
/// Returns a clone of the app's optional database replica handle.
pub fn db_replica(&self) -> Option<DatabaseReplica> {
    Option::clone(&self.db_replica)
}
2022-09-15 20:57:24 +03:00
/// Gets a redis connection from `vredis_pool`.
///
/// # Errors
///
/// Fails with "no redis server configured" when the app has no pool, and
/// passes along any error from the pool's `get()`.
pub async fn redis_conn(&self) -> anyhow::Result<redis_rate_limiter::RedisConnection> {
    let pool = self
        .vredis_pool
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("no redis server configured"))?;

    Ok(pool.get().await?)
}
2022-09-14 10:08:48 +03:00
#[measure([ErrorCount, HitCount, ResponseTime, Throughput])]
2022-05-12 02:50:52 +03:00
async fn proxy_web3_rpc_request(
2022-09-09 00:01:36 +03:00
self: &Arc<Self>,
authorization: &Arc<Authorization>,
mut request: JsonRpcRequest,
) -> anyhow::Result<(JsonRpcForwardedResponse, Vec<Arc<Web3Connection>>)> {
2022-11-12 11:24:32 +03:00
// trace!("Received request: {:?}", request);
2022-05-12 02:50:52 +03:00
2022-11-20 01:05:51 +03:00
let request_metadata = Arc::new(RequestMetadata::new(REQUEST_PERIOD, request.num_bytes())?);
2022-10-10 07:15:07 +03:00
// save the id so we can attach it to the response
// TODO: instead of cloning, take the id out?
2022-09-07 06:54:16 +03:00
let request_id = request.id.clone();
let request_method = request.method.clone();
2022-05-31 04:55:04 +03:00
// TODO: if eth_chainId or net_version, serve those without querying the backend
2022-09-22 02:50:55 +03:00
// TODO: don't clone?
let partial_response: serde_json::Value = match request_method.as_ref() {
// lots of commands are blocked
2022-09-09 00:01:36 +03:00
method @ ("admin_addPeer"
2022-06-14 09:54:19 +03:00
| "admin_datadir"
| "admin_startRPC"
| "admin_startWS"
| "admin_stopRPC"
| "admin_stopWS"
| "db_getHex"
| "db_getString"
| "db_putHex"
| "db_putString"
2022-06-14 09:54:19 +03:00
| "debug_chaindbCompact"
| "debug_freezeClient"
| "debug_goTrace"
| "debug_mutexProfile"
| "debug_setBlockProfileRate"
| "debug_setGCPercent"
| "debug_setHead"
| "debug_setMutexProfileFraction"
| "debug_standardTraceBlockToFile"
| "debug_standardTraceBadBlockToFile"
| "debug_startCPUProfile"
| "debug_startGoTrace"
| "debug_stopCPUProfile"
| "debug_stopGoTrace"
| "debug_writeBlockProfile"
| "debug_writeMemProfile"
| "debug_writeMutexProfile"
| "eth_compileLLL"
| "eth_compileSerpent"
| "eth_compileSolidity"
| "eth_getCompilers"
| "eth_sendTransaction"
| "eth_sign"
| "eth_signTransaction"
| "eth_submitHashrate"
| "eth_submitWork"
2022-11-21 20:49:41 +03:00
| "erigon_cacheCheck"
2022-06-14 09:54:19 +03:00
| "les_addBalance"
| "les_setClientParams"
| "les_setDefaultParams"
| "miner_setExtra"
| "miner_setGasPrice"
| "miner_start"
| "miner_stop"
| "miner_setEtherbase"
| "miner_setGasLimit"
| "personal_importRawKey"
| "personal_listAccounts"
| "personal_lockAccount"
| "personal_newAccount"
| "personal_unlockAccount"
| "personal_sendTransaction"
| "personal_sign"
| "personal_ecRecover"
| "shh_addToGroup"
| "shh_getFilterChanges"
| "shh_getMessages"
| "shh_hasIdentity"
| "shh_newFilter"
| "shh_newGroup"
| "shh_newIdentity"
| "shh_post"
| "shh_uninstallFilter"
2022-09-09 00:01:36 +03:00
| "shh_version") => {
// TODO: client error stat
2022-06-30 03:52:04 +03:00
// TODO: proper error code
2022-09-09 00:01:36 +03:00
return Err(anyhow::anyhow!("method unsupported: {}", method));
}
// TODO: implement these commands
2022-09-09 00:01:36 +03:00
method @ ("eth_getFilterChanges"
| "eth_getFilterLogs"
| "eth_newBlockFilter"
| "eth_newFilter"
| "eth_newPendingTransactionFilter"
2022-09-09 00:01:36 +03:00
| "eth_uninstallFilter") => {
// TODO: unsupported command stat
2022-09-09 00:01:36 +03:00
return Err(anyhow::anyhow!("not yet implemented: {}", method));
}
// some commands can use local data or caches
"eth_accounts" => {
// no stats on this. its cheap
serde_json::Value::Array(vec![])
}
"eth_blockNumber" => {
2022-09-01 08:58:55 +03:00
match self.balanced_rpcs.head_block_num() {
Some(head_block_num) => {
json!(head_block_num)
}
None => {
// TODO: what does geth do if this happens?
2022-09-09 00:01:36 +03:00
return Err(anyhow::anyhow!(
"no servers synced. unknown eth_blockNumber"
));
2022-09-01 08:58:55 +03:00
}
}
}
2022-12-15 05:49:40 +03:00
"eth_chainId" => {
json!(U64::from(self.config.chain_id))
2022-12-15 05:49:40 +03:00
}
// TODO: eth_callBundle (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_callbundle)
// TODO: eth_cancelPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_cancelprivatetransaction, but maybe just reject)
// TODO: eth_sendPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_sendprivatetransaction)
"eth_coinbase" => {
2022-07-22 22:30:39 +03:00
// no need for serving coinbase
// we could return a per-user payment address here, but then we might leak that to dapps
// no stats on this. its cheap
2022-07-22 22:30:39 +03:00
json!(Address::zero())
}
/*
// erigon was giving bad estimates. but now it doesn't need it
"eth_estimateGas" => {
// TODO: eth_estimateGas using anvil?
// TODO: modify the block requested?
let mut response = self
.balanced_rpcs
.try_send_best_upstream_server(
authorization,
request,
Some(&request_metadata),
None,
)
.await?;
let parsed_gas_estimate = if let Some(gas_estimate) = response.result.take() {
let parsed_gas_estimate: U256 = serde_json::from_str(gas_estimate.get())
.context("gas estimate result is not an U256")?;
parsed_gas_estimate
} else {
// i think this is always an error response
let rpcs = request_metadata.backend_requests.lock().clone();
return Ok((response, rpcs));
};
// increase by 1.01%
let parsed_gas_estimate =
parsed_gas_estimate * U256::from(101_010) / U256::from(100_000);
json!(parsed_gas_estimate)
}
*/
// TODO: eth_gasPrice that does awesome magic to predict the future
"eth_hashrate" => {
// no stats on this. its cheap
2022-07-22 22:30:39 +03:00
json!(U64::zero())
2022-06-30 03:52:04 +03:00
}
"eth_mining" => {
// no stats on this. its cheap
2022-07-22 22:30:39 +03:00
json!(false)
}
// TODO: eth_sendBundle (flashbots command)
// broadcast transactions to all private rpcs at once
2022-08-09 19:54:05 +03:00
"eth_sendRawTransaction" => {
// emit stats
let private_rpcs = self.private_rpcs.as_ref().unwrap_or(&self.balanced_rpcs);
2022-12-24 04:32:58 +03:00
// try_send_all_upstream_servers puts the request id into the response. no need to do that ourselves here.
let mut response = private_rpcs
.try_send_all_upstream_servers(
authorization,
2022-12-24 04:32:58 +03:00
&request,
Some(request_metadata.clone()),
None,
Level::Trace,
)
.await?;
2022-12-24 04:32:58 +03:00
// sometimes we get an error that the transaction is already known by our nodes,
// that's not really an error. Just return the hash like a successful response would.
if let Some(response_error) = response.error.as_ref() {
if response_error.code == -32000
&& (response_error.message == "ALREADY_EXISTS: already known"
|| response_error.message
== "INTERNAL_ERROR: existing tx with same hash")
{
let params = request
.params
.context("there must be params if we got this far")?;
let params = params
.as_array()
.context("there must be an array if we got this far")?
.get(0)
.context("there must be an item if we got this far")?
.as_str()
.context("there must be a string if we got this far")?;
let params = Bytes::from_str(params)
.expect("there must be Bytes if we got this far");
let rlp = Rlp::new(params.as_ref());
if let Ok(tx) = Transaction::decode(&rlp) {
let tx_hash = json!(tx.hash());
debug!("tx_hash: {:#?}", tx_hash);
let tx_hash = to_raw_value(&tx_hash).unwrap();
response.error = None;
response.result = Some(tx_hash);
}
}
}
let rpcs = request_metadata.backend_requests.lock().clone();
// TODO! STATS!
return Ok((response, rpcs));
2022-08-09 19:54:05 +03:00
}
"eth_syncing" => {
// no stats on this. its cheap
// TODO: return a real response if all backends are syncing or if no servers in sync
2022-07-22 22:30:39 +03:00
json!(false)
}
"eth_subscribe" => {
return Err(anyhow::anyhow!(
"notifications not supported. eth_subscribe is only available over a websocket"
));
}
"eth_unsubscribe" => {
return Err(anyhow::anyhow!(
"notifications not supported. eth_unsubscribe is only available over a websocket"
));
}
"net_listening" => {
// no stats on this. its cheap
// TODO: only if there are some backends on balanced_rpcs?
2022-07-22 22:30:39 +03:00
json!(true)
}
"net_peerCount" => {
// emit stats
self.balanced_rpcs.num_synced_rpcs().into()
}
"web3_clientVersion" => {
// no stats on this. its cheap
serde_json::Value::String(APP_USER_AGENT.to_string())
}
2022-07-22 22:30:39 +03:00
"web3_sha3" => {
// emit stats
2022-07-22 22:30:39 +03:00
// returns Keccak-256 (not the standardized SHA3-256) of the given data.
match &request.params {
Some(serde_json::Value::Array(params)) => {
2022-08-10 05:37:34 +03:00
// TODO: make a struct and use serde conversion to clean this up
2022-07-22 22:30:39 +03:00
if params.len() != 1 || !params[0].is_string() {
2022-09-10 03:58:33 +03:00
// TODO: this needs the correct error code in the response
2022-07-22 22:30:39 +03:00
return Err(anyhow::anyhow!("invalid request"));
}
let param = Bytes::from_str(
params[0]
.as_str()
.context("parsing params 0 into str then bytes")?,
)?;
2022-07-22 22:30:39 +03:00
let hash = H256::from(keccak256(param));
2022-07-22 22:30:39 +03:00
json!(hash)
}
2022-09-10 03:58:33 +03:00
_ => {
// TODO: this needs the correct error code in the response
2022-10-12 00:31:34 +03:00
// TODO: emit stat?
2022-09-10 03:58:33 +03:00
return Err(anyhow::anyhow!("invalid request"));
}
2022-07-22 22:30:39 +03:00
}
}
2022-07-22 22:30:39 +03:00
// anything else gets sent to backend rpcs and cached
2022-06-30 03:52:04 +03:00
method => {
// emit stats
2022-11-03 02:14:16 +03:00
// TODO: if no servers synced, wait for them to be synced?
2022-12-03 08:31:03 +03:00
let head_block = self
2022-09-01 08:58:55 +03:00
.balanced_rpcs
2022-12-06 01:38:54 +03:00
.head_block()
2022-09-01 08:58:55 +03:00
.context("no servers synced")?;
2022-07-16 07:13:02 +03:00
// we do this check before checking caches because it might modify the request params
2022-07-16 08:48:02 +03:00
// TODO: add a stat for archive vs full since they should probably cost different
2022-12-17 07:05:01 +03:00
// TODO: this cache key can be rather large. is that okay?
let cache_key: Option<ResponseCacheKey> = match block_needed(
authorization,
2022-09-22 02:50:55 +03:00
method,
request.params.as_mut(),
2022-12-03 08:31:03 +03:00
head_block.number(),
2022-09-22 02:50:55 +03:00
&self.balanced_rpcs,
)
2022-09-30 07:18:18 +03:00
.await?
2022-09-05 09:13:36 +03:00
{
2022-12-17 07:05:01 +03:00
BlockNeeded::CacheSuccessForever => Some(ResponseCacheKey {
block: None,
method: method.to_string(),
params: request.params.clone(),
cache_errors: false,
}),
BlockNeeded::CacheNever => None,
BlockNeeded::Cache {
block_num,
cache_errors,
} => {
let (request_block_hash, archive_needed) = self
.balanced_rpcs
.block_hash(authorization, &block_num)
.await?;
if archive_needed {
request_metadata
.archive_request
.store(true, atomic::Ordering::Relaxed);
}
2022-12-03 08:31:03 +03:00
2022-12-17 07:05:01 +03:00
let request_block = self
.balanced_rpcs
.block(authorization, &request_block_hash, None)
.await?;
Some(ResponseCacheKey {
block: Some(SavedBlock::new(request_block)),
method: method.to_string(),
// TODO: hash here?
params: request.params.clone(),
cache_errors,
})
}
};
let mut response = {
let request_metadata = request_metadata.clone();
let authorization = authorization.clone();
2022-10-10 07:15:07 +03:00
2022-12-17 07:05:01 +03:00
if let Some(cache_key) = cache_key {
let request_block_number = cache_key.block.as_ref().map(|x| x.number());
self.response_cache
.try_get_with(cache_key, async move {
// TODO: retry some failures automatically!
// TODO: try private_rpcs if all the balanced_rpcs fail!
// TODO: put the hash here instead?
let mut response = self
.balanced_rpcs
.try_send_best_upstream_server(
&authorization,
request,
Some(&request_metadata),
request_block_number.as_ref(),
)
.await?;
// discard their id by replacing it with an empty
response.id = Default::default();
2022-12-24 04:32:58 +03:00
// TODO: only cache the inner response
2022-12-17 07:05:01 +03:00
Ok::<_, anyhow::Error>(response)
})
.await
// TODO: what is the best way to handle an Arc here?
.map_err(|err| {
// TODO: emit a stat for an error
anyhow::anyhow!(err)
})
2022-12-24 04:32:58 +03:00
.context("error while forwarding and caching response")?
2022-12-17 07:05:01 +03:00
} else {
2022-12-24 04:32:58 +03:00
self.balanced_rpcs
2022-12-17 07:05:01 +03:00
.try_send_best_upstream_server(
&authorization,
request,
Some(&request_metadata),
None,
)
2022-12-24 04:32:58 +03:00
.await
.context("error while forwarding response")?
2022-12-17 07:05:01 +03:00
}
};
// since this data came likely out of a cache, the id is not going to match
// replace the id with our request's id.
response.id = request_id;
// TODO: DRY!
let rpcs = request_metadata.backend_requests.lock().clone();
if let Some(stat_sender) = self.stat_sender.as_ref() {
let response_stat = ProxyResponseStat::new(
method.to_string(),
authorization.clone(),
2022-10-10 07:15:07 +03:00
request_metadata,
2022-11-20 01:05:51 +03:00
response.num_bytes(),
);
stat_sender
.send_async(response_stat.into())
.await
.context("stat_sender sending response_stat")?;
}
2022-05-29 04:23:58 +03:00
return Ok((response, rpcs));
}
2022-07-22 22:30:39 +03:00
};
2022-09-07 06:54:16 +03:00
let response = JsonRpcForwardedResponse::from_value(partial_response, request_id);
2022-07-22 22:30:39 +03:00
// TODO: DRY
let rpcs = request_metadata.backend_requests.lock().clone();
if let Some(stat_sender) = self.stat_sender.as_ref() {
let response_stat = ProxyResponseStat::new(
request_method,
authorization.clone(),
request_metadata,
2022-11-20 01:05:51 +03:00
response.num_bytes(),
);
2022-10-12 00:31:34 +03:00
stat_sender
.send_async(response_stat.into())
.await
.context("stat_sender sending response stat")?;
2022-10-12 00:31:34 +03:00
}
Ok((response, rpcs))
2022-05-12 02:50:52 +03:00
}
}
2022-08-24 03:59:05 +03:00
/// Deliberately terse `Debug` output: only the type name, with the fields elided.
impl fmt::Debug for Web3ProxyApp {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // TODO: the default formatter takes forever to write. this is too quiet though
        let mut summary = f.debug_struct("Web3ProxyApp");

        summary.finish_non_exhaustive()
    }
}