web3-proxy/web3_proxy/src/app/mod.rs

// TODO: this file is way too big now. move things into other modules
mod ws;
use crate::block_number::{block_needed, BlockNeeded};
use crate::config::{AppConfig, TopConfig};
use crate::frontend::authorization::{
Authorization, RequestMetadata, RequestOrMethod, ResponseOrBytes, RpcSecretKey,
};
use crate::frontend::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::jsonrpc::{
JsonRpcErrorData, JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest,
JsonRpcRequestEnum,
};
use crate::response_cache::{
JsonRpcQueryCache, JsonRpcQueryCacheKey, JsonRpcQueryWeigher, JsonRpcResponseData,
};
use crate::rpcs::blockchain::Web3ProxyBlock;
use crate::rpcs::consensus::ConsensusWeb3Rpcs;
use crate::rpcs::many::Web3Rpcs;
use crate::rpcs::one::Web3Rpc;
use crate::rpcs::transactions::TxStatus;
use crate::stats::{AppStat, StatBuffer};
use crate::user_token::UserBearerToken;
use anyhow::Context;
use axum::headers::{Origin, Referer, UserAgent};
use axum::http::StatusCode;
use chrono::Utc;
use deferred_rate_limiter::DeferredRateLimiter;
use derive_more::From;
use entities::sea_orm_active_enums::TrackingLevel;
use entities::user;
use ethers::core::utils::keccak256;
use ethers::prelude::{Address, Bytes, Transaction, TxHash, H256, U64};
use ethers::types::U256;
use ethers::utils::rlp::{Decodable, Rlp};
use futures::future::join_all;
use futures::stream::{FuturesUnordered, StreamExt};
use hashbrown::hash_map::DefaultHashBuilder;
use hashbrown::{HashMap, HashSet};
use ipnet::IpNet;
use log::{debug, error, info, trace, warn, Level};
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{
self, ConnectionTrait, Database, DatabaseConnection, EntityTrait, PaginatorTrait,
};
use migration::sea_query::table::ColumnDef;
use migration::{Alias, DbErr, Migrator, MigratorTrait, Table};
use quick_cache_ttl::{Cache, CacheWithTTL};
use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter};
use serde::Serialize;
use serde_json::json;
use std::borrow::Cow;
use std::fmt;
use std::net::IpAddr;
use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::{atomic, Arc};
use std::time::Duration;
use tokio::sync::{broadcast, watch, Semaphore};
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout};
// TODO: make this customizable?
// TODO: include GIT_REF in here. i had trouble getting https://docs.rs/vergen/latest/vergen/ to work with a workspace. also .git is in .dockerignore
pub static APP_USER_AGENT: &str = concat!(
"llamanodes_",
env!("CARGO_PKG_NAME"),
"/v",
env!("CARGO_PKG_VERSION")
);
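// For example, with CARGO_PKG_NAME = "web3_proxy" and a hypothetical CARGO_PKG_VERSION
// of "0.17.0", APP_USER_AGENT resolves to "llamanodes_web3_proxy/v0.17.0".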
// aggregate across 1 week
pub const BILLING_PERIOD_SECONDS: i64 = 60 * 60 * 24 * 7;
pub type AnyhowJoinHandle<T> = JoinHandle<anyhow::Result<T>>;
/// TODO: move this
#[derive(Clone, Debug, Default, From)]
pub struct AuthorizationChecks {
/// database id of the primary user. 0 if anon
/// TODO: do we need this? it's on the authorization so probably not
/// TODO: `Option<NonZeroU64>`? they are actual zeroes some places in the db now
pub user_id: u64,
/// the key used (if any)
pub rpc_secret_key: Option<RpcSecretKey>,
/// database id of the rpc key
/// if this is None, then this request is being rate limited by ip
pub rpc_secret_key_id: Option<NonZeroU64>,
/// if None, allow unlimited queries. inherited from the user_tier
pub max_requests_per_period: Option<u64>,
/// if None, allow unlimited concurrent requests. inherited from the user_tier
pub max_concurrent_requests: Option<u32>,
/// if None, allow any Origin
pub allowed_origins: Option<Vec<Origin>>,
/// if None, allow any Referer
pub allowed_referers: Option<Vec<Referer>>,
/// if None, allow any UserAgent
pub allowed_user_agents: Option<Vec<UserAgent>>,
/// if None, allow any IP Address
pub allowed_ips: Option<Vec<IpNet>>,
/// how detailed any rpc account entries should be
pub tracking_level: TrackingLevel,
/// Chance to save reverting eth_call, eth_estimateGas, and eth_sendRawTransaction to the database.
/// depending on the caller, errors might be expected. this keeps us from bloating our database
/// TODO: f32 would be fine
pub log_revert_chance: f64,
/// if true, transactions are broadcast only to private mempools.
/// IMPORTANT! Once confirmed by a miner, they will be public on the blockchain!
pub private_txs: bool,
pub proxy_mode: ProxyMode,
pub balance: Option<Decimal>,
}
/// Simple wrapper so that we can keep track of read-only connections.
/// Nothing in the compiler prevents writes through this connection!
/// TODO: move this
#[derive(Clone)]
pub struct DatabaseReplica(pub DatabaseConnection);
// TODO: I feel like we could do something smart with Deref or AsRef or Borrow, but that wasn't working for me
impl DatabaseReplica {
pub fn conn(&self) -> &DatabaseConnection {
&self.0
}
}
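// A `Deref` impl would let a `DatabaseReplica` be used anywhere a `&DatabaseConnection`
// is expected. Illustrative sketch only (not part of the current API):
//
//     impl std::ops::Deref for DatabaseReplica {
//         type Target = DatabaseConnection;
//
//         fn deref(&self) -> &Self::Target {
//             &self.0
//         }
//     }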
/// Cache data from the database about rpc keys
pub type RpcSecretKeyCache = Arc<CacheWithTTL<RpcSecretKey, AuthorizationChecks>>;
/// The application
// TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard
pub struct Web3ProxyApp {
/// Send requests to the best server available
pub balanced_rpcs: Arc<Web3Rpcs>,
/// Send 4337 Abstraction Bundler requests to one of these servers
pub bundler_4337_rpcs: Option<Arc<Web3Rpcs>>,
pub http_client: Option<reqwest::Client>,
/// application config
/// TODO: this will need a large refactor to handle reloads while running. maybe use a watch::Receiver?
pub config: AppConfig,
/// Send private requests (like eth_sendRawTransaction) to all these servers
/// TODO: include another type so that we can use private miner relays that do not use JSONRPC requests
pub private_rpcs: Option<Arc<Web3Rpcs>>,
/// track JSONRPC responses
pub jsonrpc_query_cache: JsonRpcQueryCache,
/// rpc clients that subscribe to newHeads use this channel
/// don't drop this or the sender will stop working
/// TODO: broadcast channel instead?
pub watch_consensus_head_receiver: watch::Receiver<Option<Web3ProxyBlock>>,
/// rpc clients that subscribe to pendingTransactions use this channel
/// This is the Sender so that new channels can subscribe to it
pending_tx_sender: broadcast::Sender<TxStatus>,
/// Optional database for users and accounting
pub db_conn: Option<sea_orm::DatabaseConnection>,
/// Optional read-only database for users and accounting
pub db_replica: Option<DatabaseReplica>,
pub hostname: Option<String>,
/// store pending transactions that we've seen so that we don't send duplicates to subscribers
/// TODO: think about this more. might be worth storing if we sent the transaction or not and using this for automatic retries
pub pending_transactions: Arc<CacheWithTTL<TxHash, TxStatus>>,
/// rate limit anonymous users
pub frontend_ip_rate_limiter: Option<DeferredRateLimiter<IpAddr>>,
/// rate limit authenticated users
pub frontend_registered_user_rate_limiter: Option<DeferredRateLimiter<u64>>,
/// Optional time series database for making pretty graphs that load quickly
pub influxdb_client: Option<influxdb2::Client>,
/// rate limit the login endpoint
/// we do this because each pending login is a row in the database
pub login_rate_limiter: Option<RedisRateLimiter>,
/// volatile cache used for rate limits
/// TODO: i think i might just delete this entirely. instead use local-only concurrency limits.
pub vredis_pool: Option<RedisPool>,
/// cache authenticated users so that we don't have to query the database on the hot path
// TODO: should the key be our RpcSecretKey class instead of Ulid?
pub rpc_secret_key_cache: RpcSecretKeyCache,
/// concurrent/parallel RPC request limits for authenticated users
pub rpc_key_semaphores: Cache<NonZeroU64, Arc<Semaphore>>,
/// concurrent/parallel request limits for anonymous users
pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>>,
/// concurrent/parallel application request limits for authenticated users
pub bearer_token_semaphores: Cache<UserBearerToken, Arc<Semaphore>>,
pub kafka_producer: Option<rdkafka::producer::FutureProducer>,
/// channel for sending stats in a background task
pub stat_sender: Option<flume::Sender<AppStat>>,
}
/// flatten a JoinError into an anyhow error
/// Useful when joining multiple futures.
pub async fn flatten_handle<T>(handle: AnyhowJoinHandle<T>) -> anyhow::Result<T> {
match handle.await {
Ok(Ok(result)) => Ok(result),
Ok(Err(err)) => Err(err),
Err(err) => Err(err.into()),
}
}
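// Example usage (illustrative sketch; assumes a tokio runtime and a spawned future that
// returns anyhow::Result<T>):
//
//     let handle: AnyhowJoinHandle<u64> = tokio::spawn(async { Ok(42) });
//     let value: u64 = flatten_handle(handle).await?;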
/// return the first error, or Ok if everything worked
pub async fn flatten_handles<T>(
mut handles: FuturesUnordered<AnyhowJoinHandle<T>>,
) -> anyhow::Result<()> {
while let Some(x) = handles.next().await {
match x {
Err(e) => return Err(e.into()),
Ok(Err(e)) => return Err(e),
Ok(Ok(_)) => continue,
}
}
Ok(())
}
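// The app collects its long-running task handles into a FuturesUnordered and waits on
// them with this helper. A minimal sketch of that pattern:
//
//     let handles: FuturesUnordered<AnyhowJoinHandle<()>> = FuturesUnordered::new();
//     handles.push(tokio::spawn(async { Ok(()) }));
//     flatten_handles(handles).await?;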
pub async fn get_db(
db_url: String,
min_connections: u32,
max_connections: u32,
) -> Result<DatabaseConnection, DbErr> {
// TODO: scrub credentials and then include the db_url in logs
info!("Connecting to db");
let mut db_opt = sea_orm::ConnectOptions::new(db_url);
// TODO: load all these options from the config file. i think mysql default max is 100
// TODO: sqlx logging only in debug. way too verbose for production
db_opt
.connect_timeout(Duration::from_secs(30))
.min_connections(min_connections)
.max_connections(max_connections)
.sqlx_logging(false);
// .sqlx_logging_level(log::LevelFilter::Info);
Database::connect(db_opt).await
}
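// Example call (sketch; the URL and pool sizes are illustrative, not defaults taken from
// this codebase):
//
//     let db = get_db("mysql://user:pass@localhost/web3_proxy".to_string(), 2, 16).await?;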
pub async fn drop_migration_lock(db_conn: &DatabaseConnection) -> Result<(), DbErr> {
let db_backend = db_conn.get_database_backend();
let drop_lock_statement = db_backend.build(Table::drop().table(Alias::new("migration_lock")));
db_conn.execute(drop_lock_statement).await?;
debug!("migration lock unlocked");
Ok(())
}
/// Be super careful with override_existing_lock! It is very important that only one process is running the migrations at a time!
pub async fn migrate_db(
db_conn: &DatabaseConnection,
override_existing_lock: bool,
) -> Result<(), DbErr> {
let db_backend = db_conn.get_database_backend();
// TODO: put the timestamp and hostname into this as columns?
let create_lock_statement = db_backend.build(
Table::create()
.table(Alias::new("migration_lock"))
.col(ColumnDef::new(Alias::new("locked")).boolean().default(true)),
);
loop {
if Migrator::get_pending_migrations(db_conn).await?.is_empty() {
info!("no migrations to apply");
return Ok(());
}
// there are migrations to apply
// acquire a lock
if let Err(err) = db_conn.execute(create_lock_statement.clone()).await {
if override_existing_lock {
warn!("OVERRIDING EXISTING LOCK in 10 seconds! ctrl+c now if other migrations are actually running!");
sleep(Duration::from_secs(10)).await
} else {
debug!("Unable to acquire lock. if you are positive no migration is running, run \"web3_proxy_cli drop_migration_lock\". err={:?}", err);
// TODO: exponential backoff with jitter?
sleep(Duration::from_secs(1)).await;
continue;
}
}
debug!("migration lock acquired");
break;
}
let migration_result = Migrator::up(db_conn, None).await;
// drop the distributed lock
drop_migration_lock(db_conn).await?;
// return if migrations erred
migration_result
}
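// Typical usage (sketch): `get_migrated_db` below calls this with
// `override_existing_lock = false`. Passing `true` is only for manual recovery, e.g. from
// a hypothetical operator task:
//
//     let db_conn = get_db(db_url, 1, 1).await?;
//     // DANGER: only do this if you are certain no other instance is migrating
//     migrate_db(&db_conn, true).await?;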
/// Connect to the database and run migrations
pub async fn get_migrated_db(
db_url: String,
min_connections: u32,
max_connections: u32,
) -> Result<DatabaseConnection, DbErr> {
// TODO: this seems to fail silently
let db_conn = get_db(db_url, min_connections, max_connections).await?;
migrate_db(&db_conn, false).await?;
Ok(db_conn)
}
/// starting an app creates many tasks
#[derive(From)]
pub struct Web3ProxyAppSpawn {
/// the app. probably clone this to use in other groups of handles
pub app: Arc<Web3ProxyApp>,
/// handles for the balanced and private rpcs
pub app_handles: FuturesUnordered<AnyhowJoinHandle<()>>,
/// these are important and must be allowed to finish
pub background_handles: FuturesUnordered<AnyhowJoinHandle<()>>,
/// config changes are sent here
pub new_top_config_sender: watch::Sender<TopConfig>,
/// watch this to know when to start the app
pub consensus_connections_watcher: watch::Receiver<Option<Arc<ConsensusWeb3Rpcs>>>,
}
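// Sketch of how a caller might consume this struct (the local bindings are illustrative;
// it assumes a TopConfig, worker count, and shutdown broadcast::Sender are in scope):
//
//     let spawned = Web3ProxyApp::spawn(top_config, num_workers, shutdown_sender).await?;
//     // wait for the backend rpcs to reach consensus before serving traffic
//     let mut watcher = spawned.consensus_connections_watcher.clone();
//     while watcher.borrow_and_update().is_none() {
//         watcher.changed().await?;
//     }
//     flatten_handles(spawned.app_handles).await?;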
impl Web3ProxyApp {
/// The main entrypoint.
pub async fn spawn(
top_config: TopConfig,
num_workers: usize,
shutdown_sender: broadcast::Sender<()>,
) -> anyhow::Result<Web3ProxyAppSpawn> {
let stat_buffer_shutdown_receiver = shutdown_sender.subscribe();
let mut background_shutdown_receiver = shutdown_sender.subscribe();
// safety checks on the config
// while i would prefer this to be in an "apply_top_config" function, that is a larger refactor
// TODO: maybe don't spawn with a config at all. have all config updates come through an apply_top_config call
if let Some(redirect) = &top_config.app.redirect_rpc_key_url {
assert!(
redirect.contains("{{rpc_key_id}}"),
"redirect_rpc_key_url user url must contain \"{{rpc_key_id}}\""
);
}
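// For example (illustrative value, not a shipped default), a config might set
//     redirect_rpc_key_url = "https://example.com/keys/{{rpc_key_id}}"
// and the "{{rpc_key_id}}" placeholder is presumably filled with the key's database id
// wherever the redirect is rendered.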
if !top_config.extra.is_empty() {
warn!(
"unknown TopConfig fields!: {:?}",
top_config.extra.keys()
);
}
if !top_config.app.extra.is_empty() {
warn!(
"unknown Web3ProxyAppConfig fields!: {:?}",
top_config.app.extra.keys()
);
}
// these futures are key parts of the app. if they stop running, the app has encountered an irrecoverable error
let app_handles = FuturesUnordered::new();
// we must wait for these to end on their own (and they need to subscribe to shutdown_sender)
let important_background_handles = FuturesUnordered::new();
// connect to the database and make sure the latest migrations have run
let mut db_conn = None::<DatabaseConnection>;
let mut db_replica = None::<DatabaseReplica>;
if let Some(db_url) = top_config.app.db_url.clone() {
let db_min_connections = top_config
.app
.db_min_connections
.unwrap_or(num_workers as u32);
// TODO: what default multiple?
let db_max_connections = top_config
.app
.db_max_connections
.unwrap_or(db_min_connections * 2);
db_conn = Some(
get_migrated_db(db_url.clone(), db_min_connections, db_max_connections).await?,
);
db_replica = if let Some(db_replica_url) = top_config.app.db_replica_url.clone() {
if db_replica_url == db_url {
// url is the same. do not make a new connection or we might go past our max connections
db_conn.clone().map(DatabaseReplica)
} else {
let db_replica_min_connections = top_config
.app
.db_replica_min_connections
.unwrap_or(db_min_connections);
let db_replica_max_connections = top_config
.app
.db_replica_max_connections
.unwrap_or(db_max_connections);
let db_replica = get_db(
db_replica_url,
db_replica_min_connections,
db_replica_max_connections,
)
.await?;
Some(DatabaseReplica(db_replica))
}
} else {
// just clone so that we don't need a bunch of checks all over our code
db_conn.clone().map(DatabaseReplica)
};
} else {
anyhow::ensure!(
top_config.app.db_replica_url.is_none(),
"if there is a db_replica_url, there must be a db_url"
);
warn!("no database. some features will be disabled");
};
// connect to kafka for logging requests from the /debug/ urls
let mut kafka_producer: Option<rdkafka::producer::FutureProducer> = None;
if let Some(kafka_brokers) = top_config.app.kafka_urls.clone() {
info!("Connecting to kafka");
let security_protocol = &top_config.app.kafka_protocol;
match rdkafka::ClientConfig::new()
.set("bootstrap.servers", kafka_brokers)
.set("message.timeout.ms", "5000")
.set("security.protocol", security_protocol)
.create()
{
Ok(k) => {
// TODO: create our topic
kafka_producer = Some(k)
}
Err(err) => error!("Failed connecting to kafka. This will not retry. {:?}", err),
}
}
// TODO: do this during apply_config so that we can change redis url while running
// create a connection pool for redis
// a failure to connect does NOT block the application from starting
let vredis_pool = match top_config.app.volatile_redis_url.as_ref() {
Some(redis_url) => {
// TODO: scrub credentials and then include the redis_url in logs
info!("Connecting to vredis");
// TODO: what is a good default?
let redis_max_connections = top_config
.app
.volatile_redis_max_connections
.unwrap_or(num_workers * 2);
// TODO: what are reasonable timeouts?
let redis_pool = RedisConfig::from_url(redis_url)
.builder()?
.max_size(redis_max_connections)
.runtime(DeadpoolRuntime::Tokio1)
.build()?;
// test the redis pool
if let Err(err) = redis_pool.get().await {
error!(
"failed to connect to vredis. some features will be disabled. err={:?}",
err
);
};
Some(redis_pool)
}
None => {
warn!("no redis connection. some features will be disabled");
None
}
};
let influxdb_client = match top_config.app.influxdb_host.as_ref() {
Some(influxdb_host) => {
let influxdb_org = top_config
.app
.influxdb_org
.clone()
.expect("influxdb_org needed when influxdb_host is set");
let influxdb_token = top_config
.app
.influxdb_token
.clone()
.expect("influxdb_token needed when influxdb_host is set");
let influxdb_client =
influxdb2::Client::new(influxdb_host, influxdb_org, influxdb_token);
// TODO: test the client now. having a stat for "started" can be useful on graphs to mark deploys
Some(influxdb_client)
}
None => None,
};
// all the users are the same size, so no need for a weigher
// if there is no database of users, there will be no keys and so this will be empty
// TODO: max_capacity from config
// TODO: ttl from config
let rpc_secret_key_cache =
CacheWithTTL::arc_with_capacity(10_000, Duration::from_secs(600)).await;
// create a channel for receiving stats
// we do this in a channel so we don't slow down our response to the users
// stats can be saved in mysql, influxdb, both, or none
let mut stat_sender = None;
if let Some(influxdb_bucket) = top_config.app.influxdb_bucket.clone() {
if let Some(spawned_stat_buffer) = StatBuffer::try_spawn(
BILLING_PERIOD_SECONDS,
influxdb_bucket,
top_config.app.chain_id,
db_conn.clone(),
60,
influxdb_client.clone(),
Some(rpc_secret_key_cache.clone()),
stat_buffer_shutdown_receiver,
1,
)? {
// since the database entries are used for accounting, we want to be sure everything is saved before exiting
important_background_handles.push(spawned_stat_buffer.background_handle);
stat_sender = Some(spawned_stat_buffer.stat_sender);
}
}
if stat_sender.is_none() {
info!("stats will not be collected");
}
// make a http shared client
// TODO: can we configure the connection pool? should we?
// TODO: timeouts from config. defaults are hopefully good
let http_client = Some(
reqwest::ClientBuilder::new()
.connect_timeout(Duration::from_secs(5))
.timeout(Duration::from_secs(5 * 60))
.user_agent(APP_USER_AGENT)
.build()?,
);
// create rate limiters
// these are optional. they require redis
let mut frontend_ip_rate_limiter = None;
let mut frontend_registered_user_rate_limiter = None;
let mut login_rate_limiter = None;
if let Some(ref redis_pool) = vredis_pool {
if let Some(public_requests_per_period) = top_config.app.public_requests_per_period {
// chain id is included in the app name so that rpc rate limits are per-chain
let rpc_rrl = RedisRateLimiter::new(
&format!("web3_proxy:{}", top_config.app.chain_id),
"frontend",
public_requests_per_period,
60.0,
redis_pool.clone(),
);
// these two rate limiters can share the base limiter
// these are deferred rate limiters because we don't want redis network requests on the hot path
// TODO: take cache_size from config
frontend_ip_rate_limiter = Some(
DeferredRateLimiter::<IpAddr>::new(20_000, "ip", rpc_rrl.clone(), None).await,
);
frontend_registered_user_rate_limiter =
Some(DeferredRateLimiter::<u64>::new(10_000, "key", rpc_rrl, None).await);
}
// login rate limiter
login_rate_limiter = Some(RedisRateLimiter::new(
"web3_proxy",
"login",
top_config.app.login_rate_limit_per_period,
60.0,
redis_pool.clone(),
));
}
let (watch_consensus_head_sender, watch_consensus_head_receiver) = watch::channel(None);
// TODO: will one receiver lagging be okay? how big should this be?
2022-07-26 07:53:38 +03:00
let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(256);
// TODO: use this? it could listen for confirmed transactions and then clear pending_transactions, but the head_block_sender is doing that
2022-10-03 21:08:01 +03:00
// TODO: don't drop the pending_tx_receiver. instead, read it to mark transactions as "seen". once seen, we won't re-send them?
// TODO: once a transaction is "Confirmed" we remove it from the map. this should prevent major memory leaks.
// TODO: we should still have some sort of expiration or maximum size limit for the map
2022-07-26 07:53:38 +03:00
drop(pending_tx_receiver);
2022-09-17 05:30:06 +03:00
// TODO: capacity from configs
// all these are the same size, so no need for a weigher
// TODO: this used to have a time_to_idle
2023-05-18 23:34:22 +03:00
// TODO: different chains might handle this differently
// TODO: what should we set? 5 minutes is arbitrary. the nodes themselves hold onto transactions for much longer
// TODO: this used to be time_to_update, but
let pending_transactions =
CacheWithTTL::arc_with_capacity(10_000, Duration::from_secs(300)).await;
2022-06-16 20:51:49 +03:00
2023-02-25 20:48:40 +03:00
// responses can be very different in sizes, so this is a cache with a max capacity and a weigher
// TODO: don't allow any response to be bigger than X% of the cache
// TODO: we should emit stats to calculate a more accurate expected cache size
2023-05-15 20:48:59 +03:00
// TODO: do we actually want a TTL on this?
let response_cache = JsonRpcQueryCache::new(
(top_config.app.response_cache_max_bytes / 2048) as usize,
top_config.app.response_cache_max_bytes,
JsonRpcQueryWeigher,
2023-05-15 20:48:59 +03:00
DefaultHashBuilder::default(),
Duration::from_secs(3600),
)
.await;
2023-02-25 20:48:40 +03:00
2023-05-18 23:34:22 +03:00
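// Rough sizing arithmetic (illustrative, not a configured default): the estimated item
// count assumes an average cached response of about 2 KiB. For example, a
// response_cache_max_bytes of 256 MiB (268_435_456 bytes) gives an estimated capacity of
// 268_435_456 / 2048 = 131_072 entries, while the weigher keeps the total cached bytes
// under the configured maximum.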
// TODO: how should we handle hitting this max?
let max_users = 20_000;
// create semaphores for concurrent connection limits
// TODO: how can we implement time til idle?
// TODO: what should tti be for semaphores?
let bearer_token_semaphores = Cache::new(max_users);
let ip_semaphores = Cache::new(max_users);
let registered_user_semaphores = Cache::new(max_users);
let (balanced_rpcs, balanced_handle, consensus_connections_watcher) = Web3Rpcs::spawn(
top_config.app.chain_id,
db_conn.clone(),
http_client.clone(),
top_config.app.max_block_age,
top_config.app.max_block_lag,
top_config.app.min_synced_rpcs,
top_config.app.min_sum_soft_limit,
"balanced rpcs".to_string(),
pending_transactions.clone(),
Some(pending_tx_sender.clone()),
Some(watch_consensus_head_sender),
)
.await
.context("spawning balanced rpcs")?;
app_handles.push(balanced_handle);
// prepare a Web3Rpcs to hold all our private connections
// only some chains have this, so this is optional
let private_rpcs = if top_config.private_rpcs.is_none() {
warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
None
} else {
// TODO: do something with the spawn handle
// TODO: Merge
// let (private_rpcs, private_rpcs_handle) = Web3Rpcs::spawn(
let (private_rpcs, private_handle, _) = Web3Rpcs::spawn(
top_config.app.chain_id,
db_conn.clone(),
http_client.clone(),
// private rpcs don't get subscriptions, so no need for max_block_age or max_block_lag
None,
None,
0,
0,
"protected rpcs".to_string(),
pending_transactions.clone(),
// TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits, but they should have
None,
// subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs
// they also often have low rate limits
// however, they are well connected to miners/validators. so maybe using them as a safety check would be good
// TODO: but maybe we could include privates in the "backup" tier
None,
)
.await
.context("spawning private_rpcs")?;
app_handles.push(private_handle);
Some(private_rpcs)
};
// prepare a Web3Rpcs to hold all our 4337 Abstraction Bundler connections
// only some chains have this, so this is optional
let bundler_4337_rpcs = if top_config.bundler_4337_rpcs.is_none() {
warn!("No bundler_4337_rpcs configured");
None
} else {
// TODO: do something with the spawn handle
let (bundler_4337_rpcs, bundler_4337_rpcs_handle, _) = Web3Rpcs::spawn(
top_config.app.chain_id,
db_conn.clone(),
http_client.clone(),
// bundler_4337_rpcs don't get subscriptions, so no need for max_block_age or max_block_lag
None,
None,
0,
0,
"eip4337 rpcs".to_string(),
pending_transactions.clone(),
None,
None,
)
.await
.context("spawning bundler_4337_rpcs")?;
app_handles.push(bundler_4337_rpcs_handle);
Some(bundler_4337_rpcs)
};
let hostname = hostname::get()
.ok()
.and_then(|x| x.to_str().map(|x| x.to_string()));
let app = Self {
config: top_config.app.clone(),
balanced_rpcs,
bundler_4337_rpcs,
http_client,
kafka_producer,
private_rpcs,
jsonrpc_query_cache: response_cache,
watch_consensus_head_receiver,
pending_tx_sender,
pending_transactions,
frontend_ip_rate_limiter,
frontend_registered_user_rate_limiter,
login_rate_limiter,
db_conn,
db_replica,
influxdb_client,
hostname,
vredis_pool,
rpc_secret_key_cache,
bearer_token_semaphores,
ip_semaphores,
rpc_key_semaphores: registered_user_semaphores,
stat_sender,
};
let app = Arc::new(app);
// watch for config changes
// TODO: initial config reload should be from this channel. not from the call to spawn
let (new_top_config_sender, mut new_top_config_receiver) = watch::channel(top_config);
{
let app = app.clone();
let config_handle = tokio::spawn(async move {
loop {
let new_top_config = new_top_config_receiver.borrow_and_update().to_owned();
app.apply_top_config(new_top_config)
.await
.context("failed applying new top_config")?;
new_top_config_receiver
.changed()
.await
.context("failed awaiting top_config change")?;
info!("config changed");
}
});
app_handles.push(config_handle);
}
if important_background_handles.is_empty() {
info!("no important background handles");
let f = tokio::spawn(async move {
let _ = background_shutdown_receiver.recv().await;
Ok(())
});
important_background_handles.push(f);
}
Ok((
app,
app_handles,
important_background_handles,
new_top_config_sender,
consensus_connections_watcher,
)
.into())
}
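// Note on `spawn`'s return value above: the tuple converts into a `Web3ProxyAppSpawn`
// through that struct's `#[derive(From)]` (derive_more), which generates a `From` impl
// taking a tuple of its fields in declaration order.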
pub async fn apply_top_config(&self, new_top_config: TopConfig) -> anyhow::Result<()> {
// TODO: also update self.config from new_top_config.app
// connect to the backends
self.balanced_rpcs
.apply_server_configs(self, new_top_config.balanced_rpcs)
.await
.context("updating balanced rpcs")?;
if let Some(private_rpc_configs) = new_top_config.private_rpcs {
if let Some(ref private_rpcs) = self.private_rpcs {
private_rpcs
.apply_server_configs(self, private_rpc_configs)
.await
.context("updating private_rpcs")?;
} else {
// TODO: maybe we should have private_rpcs just be empty instead of being None
todo!("handle toggling private_rpcs")
}
}
if let Some(bundler_4337_rpc_configs) = new_top_config.bundler_4337_rpcs {
if let Some(ref bundler_4337_rpcs) = self.bundler_4337_rpcs {
bundler_4337_rpcs
.apply_server_configs(self, bundler_4337_rpc_configs)
.await
.context("updating bundler_4337_rpcs")?;
} else {
// TODO: maybe we should have bundler_4337_rpcs just be empty instead of being None
todo!("handle toggling bundler_4337_rpcs")
}
}
Ok(())
}
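// Hot-reload sketch: sending a new TopConfig through `new_top_config_sender` (see
// `Web3ProxyAppSpawn`) wakes the config watcher task spawned above, which then calls
// `apply_top_config`:
//
//     let _ = spawned.new_top_config_sender.send(updated_top_config);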
pub fn head_block_receiver(&self) -> watch::Receiver<Option<Web3ProxyBlock>> {
self.watch_consensus_head_receiver.clone()
}
pub async fn prometheus_metrics(&self) -> String {
let globals = HashMap::new();
// TODO: what globals? should this be the hostname or what?
// globals.insert("service", "web3_proxy");
// TODO: this needs a refactor to get HELP and TYPE into the serialized text
#[derive(Default, Serialize)]
struct UserCount(i64);
let user_count: UserCount = if let Some(db) = self.db_conn() {
match user::Entity::find().count(&db).await {
Ok(user_count) => UserCount(user_count as i64),
Err(err) => {
warn!("unable to count users: {:?}", err);
UserCount(-1)
}
}
} else {
UserCount(-1)
};
2022-12-29 09:21:09 +03:00
#[derive(Default, Serialize)]
struct RecentCounts {
2023-01-03 03:34:32 +03:00
one_week: i64,
one_day: i64,
one_hour: i64,
one_minute: i64,
}
impl RecentCounts {
fn for_err() -> Self {
Self {
one_week: -1,
one_day: -1,
one_hour: -1,
one_minute: -1,
}
}
}
let (recent_ip_counts, recent_user_id_counts, recent_tx_counts): (
RecentCounts,
RecentCounts,
RecentCounts,
) = match self.redis_conn().await {
Ok(Some(mut redis_conn)) => {
// TODO: delete any hash entries where
const ONE_MINUTE: i64 = 60;
const ONE_HOUR: i64 = ONE_MINUTE * 60;
const ONE_DAY: i64 = ONE_HOUR * 24;
const ONE_WEEK: i64 = ONE_DAY * 7;
let one_week_ago = Utc::now().timestamp() - ONE_WEEK;
let one_day_ago = Utc::now().timestamp() - ONE_DAY;
let one_hour_ago = Utc::now().timestamp() - ONE_HOUR;
let one_minute_ago = Utc::now().timestamp() - ONE_MINUTE;
let recent_users_by_id = format!("recent_users:id:{}", self.config.chain_id);
let recent_users_by_ip = format!("recent_users:ip:{}", self.config.chain_id);
let recent_transactions =
format!("eth_sendRawTransaction:{}", self.config.chain_id);
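// these per-chain sorted sets are populated elsewhere with unix timestamps as scores
// (see the zadd in eth_sendRawTransaction below), so one pipeline can prune anything
// older than a week and then zcount [cutoff, +inf] for each window in a single round trip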
match redis::pipe()
.atomic()
// delete any entries older than 1 week
.zrembyscore(&recent_users_by_id, i64::MIN, one_week_ago)
.ignore()
.zrembyscore(&recent_users_by_ip, i64::MIN, one_week_ago)
.ignore()
.zrembyscore(&recent_transactions, i64::MIN, one_week_ago)
.ignore()
// get counts for last week
.zcount(&recent_users_by_id, one_week_ago, i64::MAX)
.zcount(&recent_users_by_ip, one_week_ago, i64::MAX)
.zcount(&recent_transactions, one_week_ago, i64::MAX)
// get counts for last day
.zcount(&recent_users_by_id, one_day_ago, i64::MAX)
.zcount(&recent_users_by_ip, one_day_ago, i64::MAX)
.zcount(&recent_transactions, one_day_ago, i64::MAX)
// get counts for last hour
.zcount(&recent_users_by_id, one_hour_ago, i64::MAX)
.zcount(&recent_users_by_ip, one_hour_ago, i64::MAX)
.zcount(&recent_transactions, one_hour_ago, i64::MAX)
// get counts for last minute
.zcount(&recent_users_by_id, one_minute_ago, i64::MAX)
.zcount(&recent_users_by_ip, one_minute_ago, i64::MAX)
.zcount(&recent_transactions, one_minute_ago, i64::MAX)
.query_async(&mut redis_conn)
.await
{
Ok((
user_id_in_week,
ip_in_week,
txs_in_week,
user_id_in_day,
ip_in_day,
txs_in_day,
user_id_in_hour,
ip_in_hour,
txs_in_hour,
user_id_in_minute,
ip_in_minute,
txs_in_minute,
)) => {
let recent_user_id_counts = RecentCounts {
one_week: user_id_in_week,
one_day: user_id_in_day,
one_hour: user_id_in_hour,
one_minute: user_id_in_minute,
};
let recent_ip_counts = RecentCounts {
one_week: ip_in_week,
one_day: ip_in_day,
one_hour: ip_in_hour,
one_minute: ip_in_minute,
};
let recent_tx_counts = RecentCounts {
one_week: txs_in_week,
one_day: txs_in_day,
one_hour: txs_in_hour,
one_minute: txs_in_minute,
};
(recent_ip_counts, recent_user_id_counts, recent_tx_counts)
}
Err(err) => {
warn!("unable to count recent users: {}", err);
(
RecentCounts::for_err(),
RecentCounts::for_err(),
RecentCounts::for_err(),
)
}
}
}
Ok(None) => (
RecentCounts::default(),
RecentCounts::default(),
RecentCounts::default(),
),
Err(err) => {
warn!("unable to connect to redis while counting users: {:?}", err);
(
RecentCounts::for_err(),
RecentCounts::for_err(),
RecentCounts::for_err(),
)
}
};
// app.pending_transactions.sync();
// app.rpc_secret_key_cache.sync();
// "pending_transactions_count": app.pending_transactions.entry_count(),
// "pending_transactions_size": app.pending_transactions.weighted_size(),
// "user_cache_count": app.rpc_secret_key_cache.entry_count(),
// "user_cache_size": app.rpc_secret_key_cache.weighted_size(),
#[derive(Serialize)]
struct CombinedMetrics {
recent_ip_counts: RecentCounts,
recent_user_id_counts: RecentCounts,
recent_tx_counts: RecentCounts,
user_count: UserCount,
}
let metrics = CombinedMetrics {
recent_ip_counts,
recent_user_id_counts,
recent_tx_counts,
user_count,
};
// TODO: i don't like this library. it doesn't include HELP or TYPE lines and so our prometheus server fails to parse it
serde_prometheus::to_string(&metrics, Some("web3_proxy"), globals)
.expect("prometheus metrics should always serialize")
}
/// send the request or batch of requests to the appropriate RPCs
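// A hedged usage sketch (caller code; `app` and `authorization` are assumed to already
// exist, and `JsonRpcRequestEnum` is assumed to implement Deserialize, which the HTTP
// frontend relies on when parsing request bodies):
//
//     let request: JsonRpcRequestEnum = serde_json::from_str(
//         r#"{"jsonrpc":"2.0","id":1,"method":"eth_chainId","params":[]}"#,
//     )?;
//     let (status, response, rpcs) = app.proxy_web3_rpc(authorization, request).await?;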
pub async fn proxy_web3_rpc(
self: &Arc<Self>,
authorization: Arc<Authorization>,
request: JsonRpcRequestEnum,
) -> Web3ProxyResult<(StatusCode, JsonRpcForwardedResponseEnum, Vec<Arc<Web3Rpc>>)> {
// trace!(?request, "proxy_web3_rpc");
// even though we have timeouts on the requests to our backend providers,
// we need a timeout for the incoming request so that retries don't run forever
// TODO: take this as an optional argument. check for a different max from the user_tier?
// TODO: how much time was spent on this request already?
let max_time = Duration::from_secs(240);
// TODO: use streams and buffers so we don't overwhelm our server
let response = match request {
JsonRpcRequestEnum::Single(mut request) => {
let (status_code, response, rpcs) = timeout(
max_time,
self.proxy_cached_request(&authorization, &mut request, None),
)
.await?;
(
status_code,
JsonRpcForwardedResponseEnum::Single(response),
rpcs,
)
}
JsonRpcRequestEnum::Batch(requests) => {
let (responses, rpcs) = timeout(
max_time,
self.proxy_web3_rpc_requests(&authorization, requests),
)
.await??;
// TODO: real status code
(
StatusCode::OK,
JsonRpcForwardedResponseEnum::Batch(responses),
rpcs,
)
}
};
Ok(response)
}
/// cut up the request and send to potentially different servers
/// TODO: make sure this isn't a problem
async fn proxy_web3_rpc_requests(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
mut requests: Vec<JsonRpcRequest>,
) -> Web3ProxyResult<(Vec<JsonRpcForwardedResponse>, Vec<Arc<Web3Rpc>>)> {
// TODO: we should probably change ethers-rs to support this directly. they pushed this off to v2 though
let num_requests = requests.len();
// get the head block now so that any requests that need it all use the same block
// TODO: this still has an edge condition if there is a reorg in the middle of the request!!!
let head_block_num = self
.balanced_rpcs
.head_block_num()
.ok_or(Web3ProxyError::NoServersSynced)?;
let responses = join_all(
requests
.iter_mut()
.map(|request| {
self.proxy_cached_request(authorization, request, Some(head_block_num))
})
.collect::<Vec<_>>(),
)
.await;
let mut collected: Vec<JsonRpcForwardedResponse> = Vec::with_capacity(num_requests);
let mut collected_rpc_names: HashSet<String> = HashSet::new();
let mut collected_rpcs: Vec<Arc<Web3Rpc>> = vec![];
for response in responses {
// TODO: any way to attach the tried rpcs to the error? it is likely helpful
let (_status_code, response, rpcs) = response;
collected.push(response);
collected_rpcs.extend(rpcs.into_iter().filter(|x| {
if collected_rpc_names.contains(&x.name) {
false
} else {
collected_rpc_names.insert(x.name.clone());
true
}
}));
// TODO: what should we do with the status code? check the jsonrpc spec
}
Ok((collected, collected_rpcs))
}
/// TODO: i don't think we want or need this. just use app.db_conn, or maybe app.db_conn.clone() or app.db_conn.as_ref()
pub fn db_conn(&self) -> Option<DatabaseConnection> {
self.db_conn.clone()
}
pub fn db_replica(&self) -> Option<DatabaseReplica> {
self.db_replica.clone()
}
pub async fn redis_conn(&self) -> anyhow::Result<Option<redis_rate_limiter::RedisConnection>> {
match self.vredis_pool.as_ref() {
// TODO: don't do an error. return None
None => Ok(None),
Some(redis_pool) => {
let redis_conn = redis_pool.get().await?;
Ok(Some(redis_conn))
}
}
}
/// try to send transactions to the best available rpcs with protected/private mempools
/// if no protected rpcs are configured, then some public rpcs are used instead
async fn try_send_protected(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
request: &JsonRpcRequest,
request_metadata: Arc<RequestMetadata>,
) -> Web3ProxyResult<JsonRpcResponseData> {
if let Some(protected_rpcs) = self.private_rpcs.as_ref() {
if !protected_rpcs.is_empty() {
let protected_response = protected_rpcs
.try_send_all_synced_connections(
authorization,
request,
Some(request_metadata),
None,
None,
Level::Trace,
None,
true,
)
.await?;
return Ok(protected_response);
}
}
let num_public_rpcs = match authorization.checks.proxy_mode {
// TODO: how many balanced rpcs should we send to? configurable? percentage of total?
ProxyMode::Best | ProxyMode::Debug => Some(4),
ProxyMode::Fastest(0) => None,
// TODO: how many balanced rpcs should we send to? configurable? percentage of total?
// TODO: what if we do 2 per tier? we want to blast the third party rpcs
// TODO: maybe having the third party rpcs in their own Web3Rpcs would be good for this
ProxyMode::Fastest(x) => Some(x * 4),
ProxyMode::Versus => None,
};
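// worked example of the fan-out above: ProxyMode::Best broadcasts to up to 4 public
// servers, ProxyMode::Fastest(2) to up to 2 * 4 = 8, and the None arms leave the count
// unset so try_send_all_synced_connections decides how many servers to use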
// no private rpcs to send to. send to a few public rpcs
// try_send_all_upstream_servers puts the request id into the response. no need to do that ourselves here.
self.balanced_rpcs
.try_send_all_synced_connections(
authorization,
request,
Some(request_metadata),
None,
None,
Level::Trace,
num_public_rpcs,
true,
)
.await
}
// TODO: more robust stats and kafka logic! if we use the try operator, they aren't saved!
// TODO: move this to another module
async fn proxy_cached_request(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
request: &mut JsonRpcRequest,
head_block_num: Option<U64>,
) -> (StatusCode, JsonRpcForwardedResponse, Vec<Arc<Web3Rpc>>) {
// TODO: move this code to another module so that it's easy to turn this trace logging on in dev
trace!("Received request: {:?}", request);
// save the id so we can attach it to the response
// TODO: we don't always need to clone this. if we come from the cache, we can just take from the request
// TODO: store on the request_metadata?
let response_id = request.id.clone();
let request_metadata = RequestMetadata::new(
self,
authorization.clone(),
RequestOrMethod::Request(request),
head_block_num.as_ref(),
)
.await;
let (status_code, response_data): (_, JsonRpcResponseData) = match self
._proxy_cached_request(authorization, request, head_block_num, &request_metadata)
.await
{
Ok(x) => (StatusCode::OK, x),
Err(err) => err.into_response_parts(),
};
let response = JsonRpcForwardedResponse::from_response_data(response_data, response_id);
// TODO: this serializes twice :/
request_metadata.add_response(ResponseOrBytes::Response(&response));
// TODO: with parallel request sending, I think there could be a race on this
let rpcs = request_metadata.backend_rpcs_used();
(status_code, response, rpcs)
}
/// main logic for proxy_cached_request but in a dedicated function so the try operator is easy to use
async fn _proxy_cached_request(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
request: &mut JsonRpcRequest,
head_block_num: Option<U64>,
request_metadata: &Arc<RequestMetadata>,
) -> Web3ProxyResult<JsonRpcResponseData> {
// TODO: don't clone?
let request_method = request.method.clone();
// TODO: serve net_version without querying the backend
let response_data: JsonRpcResponseData = match request_method.as_ref() {
// lots of commands are blocked
method @ ("db_getHex"
| "db_getString"
| "db_putHex"
| "db_putString"
| "debug_accountRange"
| "debug_backtraceAt"
| "debug_blockProfile"
| "debug_bundler_clearState"
| "debug_bundler_dumpMempool"
| "debug_bundler_sendBundleNow"
| "debug_chaindbCompact"
| "debug_chaindbProperty"
| "debug_cpuProfile"
| "debug_freeOSMemory"
| "debug_freezeClient"
| "debug_gcStats"
| "debug_goTrace"
| "debug_memStats"
| "debug_mutexProfile"
| "debug_setBlockProfileRate"
| "debug_setGCPercent"
| "debug_setHead"
| "debug_setMutexProfileFraction"
| "debug_standardTraceBadBlockToFile"
| "debug_standardTraceBlockToFile"
| "debug_startCPUProfile"
| "debug_startGoTrace"
| "debug_stopCPUProfile"
| "debug_stopGoTrace"
| "debug_writeBlockProfile"
| "debug_writeMemProfile"
| "debug_writeMutexProfile"
| "erigon_cacheCheck"
| "eth_compileLLL"
| "eth_compileSerpent"
| "eth_compileSolidity"
| "eth_getCompilers"
| "eth_sendTransaction"
| "eth_sign"
| "eth_signTransaction"
| "eth_submitHashrate"
| "eth_submitWork"
| "les_addBalance"
| "les_setClientParams"
| "les_setDefaultParams"
| "miner_setEtherbase"
| "miner_setExtra"
| "miner_setGasLimit"
| "miner_setGasPrice"
| "miner_start"
| "miner_stop"
| "personal_ecRecover"
| "personal_importRawKey"
| "personal_listAccounts"
| "personal_lockAccount"
| "personal_newAccount"
| "personal_sendTransaction"
| "personal_sign"
| "personal_unlockAccount"
| "shh_addToGroup"
| "shh_getFilterChanges"
| "shh_getMessages"
| "shh_hasIdentity"
| "shh_newFilter"
| "shh_newGroup"
| "shh_newIdentity"
| "shh_post"
| "shh_uninstallFilter"
| "shh_version") => {
// i don't think we will ever support these methods. maybe do Forbidden?
// TODO: what error code?
JsonRpcErrorData::from(format!(
"the method {} does not exist/is not available",
method
)).into()
}
// TODO: implement these commands
method @ ("eth_getFilterChanges"
| "eth_getFilterLogs"
| "eth_newBlockFilter"
| "eth_newFilter"
| "eth_newPendingTransactionFilter"
| "eth_pollSubscriptions"
| "eth_uninstallFilter") => {
// TODO: unsupported command stat. use the count to prioritize new features
// TODO: what error code?
JsonRpcErrorData::from(format!(
"the method {} is not yet implemented. contact us if you need this",
method
))
.into()
}
_method @ ("eth_sendUserOperation"
| "eth_estimateUserOperationGas"
| "eth_getUserOperationByHash"
| "eth_getUserOperationReceipt"
| "eth_supportedEntryPoints") => match self.bundler_4337_rpcs.as_ref() {
Some(bundler_4337_rpcs) => {
bundler_4337_rpcs
.try_proxy_connection(
authorization,
request,
Some(request_metadata),
None,
None,
)
.await?
}
None => {
// TODO: stats even when we error!
// TODO: dedicated error for no 4337 bundlers
return Err(Web3ProxyError::NoServersSynced);
}
},
"eth_accounts" => JsonRpcResponseData::from(serde_json::Value::Array(vec![])),
"eth_blockNumber" => {
match head_block_num.or(self.balanced_rpcs.head_block_num()) {
Some(head_block_num) => JsonRpcResponseData::from(json!(head_block_num)),
None => {
// TODO: what does geth do if this happens?
// TODO: standard not synced error
return Err(Web3ProxyError::NoServersSynced);
2022-09-01 08:58:55 +03:00
}
}
}
"eth_chainId" => JsonRpcResponseData::from(json!(U64::from(self.config.chain_id))),
// TODO: eth_callBundle (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_callbundle)
// TODO: eth_cancelPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_cancelprivatetransaction, but maybe just reject)
// TODO: eth_sendPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_sendprivatetransaction)
"eth_coinbase" => {
// no need for serving coinbase
JsonRpcResponseData::from(json!(Address::zero()))
}
"eth_estimateGas" => {
let response_data = self
.balanced_rpcs
.try_proxy_connection(
authorization,
request,
Some(request_metadata),
None,
None,
)
.await?;
if let JsonRpcResponseData::Result { value, .. } = response_data {
let mut gas_estimate: U256 = serde_json::from_str(value.get())
.or(Err(Web3ProxyError::GasEstimateNotU256))?;
let gas_increase = if let Some(gas_increase_percent) =
self.config.gas_increase_percent
{
let gas_increase = gas_estimate * gas_increase_percent / U256::from(100);
let min_gas_increase = self.config.gas_increase_min.unwrap_or_default();
gas_increase.max(min_gas_increase)
} else {
self.config.gas_increase_min.unwrap_or_default()
};
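// worked example (hypothetical config values, not defaults): with
// gas_increase_percent = Some(10.into()) and gas_increase_min = Some(20_000.into()),
// an upstream estimate of 100_000 becomes 100_000 + max(10_000, 20_000) = 120_000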
gas_estimate += gas_increase;
JsonRpcResponseData::from(json!(gas_estimate))
} else {
response_data
}
}
"eth_getTransactionReceipt" | "eth_getTransactionByHash" => {
// try to get the transaction without specifying a min_block_height
let mut response_data = self
.balanced_rpcs
.try_proxy_connection(
authorization,
request,
Some(request_metadata),
None,
None,
)
.await?;
// if we got "null", it is probably because the tx is old. retry on nodes with old block data
if let JsonRpcResponseData::Result { value, .. } = &response_data {
if value.get() == "null" {
request_metadata
.archive_request
.store(true, atomic::Ordering::Release);
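// retrying with a minimum block of 1 steers the request to servers that keep full
// chain history, since a pruned node may have answered "null" simply because the
// transaction is older than the blocks it retains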
response_data = self
.balanced_rpcs
.try_proxy_connection(
authorization,
request,
Some(request_metadata),
Some(&U64::one()),
None,
)
.await?;
}
}
response_data
}
// TODO: eth_gasPrice that does awesome magic to predict the future
"eth_hashrate" => JsonRpcResponseData::from(json!(U64::zero())),
"eth_mining" => JsonRpcResponseData::from(serde_json::Value::Bool(false)),
// TODO: eth_sendBundle (flashbots/eden command)
// broadcast transactions to all private rpcs at once
"eth_sendRawTransaction" => {
// TODO: decode the transaction
// TODO: error if the chain_id is incorrect
// TODO: check the cache to see if we have sent this transaction recently
// TODO: if so, use a cached response.
// TODO: if not,
// TODO: - cache successes for up to 1 minute
// TODO: - cache failures for 1 block (we see transactions skipped because out of funds. but that can change block to block)
let mut response_data = self
.try_send_protected(
authorization,
request,
request_metadata.clone(),
)
.await?;
// sometimes we get an error that the transaction is already known by our nodes,
// that's not really an error. Return the hash like a successful response would.
// TODO: move this to a helper function
if let JsonRpcResponseData::Error { value, .. } = &response_data {
if value.code == -32000
&& (value.message == "ALREADY_EXISTS: already known"
|| value.message == "INTERNAL_ERROR: existing tx with same hash")
{
let params = request
.params
.as_mut()
.web3_context("there must be params if we got this far")?;
let params = params
.as_array()
.ok_or_else(|| {
Web3ProxyError::BadRequest(
"Unable to get array from params".to_string(),
)
})?
.get(0)
.ok_or_else(|| {
Web3ProxyError::BadRequest(
"Unable to get item 0 from params".to_string(),
)
})?
.as_str()
.ok_or_else(|| {
Web3ProxyError::BadRequest(
"Unable to get string from params item 0".to_string(),
)
})?;
let params = Bytes::from_str(params)
.expect("there must be Bytes if we got this far");
let rlp = Rlp::new(params.as_ref());
if let Ok(tx) = Transaction::decode(&rlp) {
// TODO: decode earlier and confirm that tx.chain_id (if set) matches self.config.chain_id
let tx_hash = json!(tx.hash());
trace!("tx_hash: {:#?}", tx_hash);
response_data = JsonRpcResponseData::from(tx_hash);
}
}
}
// emit transaction count stats
// TODO: use this cache to avoid sending duplicate transactions?
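// the block below only records *that* a transaction went through, not what it was:
// the returned hash is salted and keccak256-hashed before being zadd'ed (score = unix
// timestamp) into the same per-chain sorted set that prometheus_metrics zcounts above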
if let Some(ref salt) = self.config.public_recent_ips_salt {
if let JsonRpcResponseData::Result { value, .. } = &response_data {
let now = Utc::now().timestamp();
let app = self.clone();
let salted_tx_hash = format!("{}:{}", salt, value.get());
let f = async move {
match app.redis_conn().await {
Ok(Some(mut redis_conn)) => {
let hashed_tx_hash =
Bytes::from(keccak256(salted_tx_hash.as_bytes()));
let recent_tx_hash_key =
format!("eth_sendRawTransaction:{}", app.config.chain_id);
redis_conn
.zadd(recent_tx_hash_key, hashed_tx_hash.to_string(), now)
.await?;
}
Ok(None) => {}
Err(err) => {
warn!(
"unable to save stats for eth_sendRawTransaction: {:?}",
err
)
}
}
Ok::<_, anyhow::Error>(())
};
tokio::spawn(f);
}
}
response_data
}
"eth_syncing" => {
// no stats on this. it's cheap
// TODO: return a real response if all backends are syncing or if no servers in sync
// TODO: const
JsonRpcResponseData::from(serde_json::Value::Bool(false))
}
"eth_subscribe" => JsonRpcErrorData {
message: Cow::Borrowed(
"notifications not supported. eth_subscribe is only available over a websocket",
),
code: -32601,
data: None,
}
.into(),
"eth_unsubscribe" => JsonRpcErrorData {
message: Cow::Borrowed("notifications not supported. eth_unsubscribe is only available over a websocket"),
code: -32601,
data: None,
}.into(),
"net_listening" => {
// TODO: only true if there are some backends on balanced_rpcs?
// TODO: const
JsonRpcResponseData::from(serde_json::Value::Bool(true))
}
"net_peerCount" =>
JsonRpcResponseData::from(json!(U64::from(self.balanced_rpcs.num_synced_rpcs()))),
"web3_clientVersion" =>
JsonRpcResponseData::from(serde_json::Value::String(APP_USER_AGENT.to_string())),
"web3_sha3" => {
// returns Keccak-256 (not the standardized SHA3-256) of the given data.
match &request.params {
Some(serde_json::Value::Array(params)) => {
// TODO: make a struct and use serde conversion to clean this up
if params.len() != 1
|| !params.get(0).map(|x| x.is_string()).unwrap_or(false)
{
// TODO: what error code?
// TODO: use Web3ProxyError::BadRequest
JsonRpcErrorData {
message: Cow::Borrowed("Invalid request"),
code: -32600,
data: None
}.into()
} else {
// TODO: BadRequest instead of web3_context
let param = Bytes::from_str(
params[0]
.as_str()
.ok_or(Web3ProxyError::ParseBytesError(None))
.web3_context("parsing params 0 into str then bytes")?,
)
.map_err(|x| {
trace!("bad request: {:?}", x);
Web3ProxyError::BadRequest(
"param 0 could not be read as H256".to_string(),
)
})?;
let hash = H256::from(keccak256(param));
JsonRpcResponseData::from(json!(hash))
}
}
_ => {
// TODO: this needs the correct error code in the response
// TODO: Web3ProxyError::BadRequest instead?
JsonRpcErrorData {
message: Cow::Borrowed("invalid request"),
code: StatusCode::BAD_REQUEST.as_u16().into(),
data: None,
}.into()
}
}
}
"test" => JsonRpcErrorData {
message: Cow::Borrowed("The method test does not exist/is not available."),
code: -32601,
data: None,
}.into(),
// anything else gets sent to backend rpcs and cached
method => {
if method.starts_with("admin_") {
// TODO: emit a stat? will probably just be noise
return Err(Web3ProxyError::AccessDenied);
}
// TODO: if no servers synced, wait for them to be synced? probably better to error and let haproxy retry another server
let head_block_num = head_block_num
.or(self.balanced_rpcs.head_block_num())
.ok_or(Web3ProxyError::NoServersSynced)?;
// TODO: don't clone. this happens way too much. maybe &mut?
// let mut request = request.clone();
// we do this check before checking caches because it might modify the request params
// TODO: add a stat for archive vs full since they should probably cost different
// TODO: this cache key can be rather large. is that okay?
let cache_key: Option<JsonRpcQueryCacheKey> = match block_needed(
authorization,
method,
request.params.as_mut(),
head_block_num,
&self.balanced_rpcs,
)
.await?
{
BlockNeeded::CacheSuccessForever => Some(JsonRpcQueryCacheKey {
from_block: None,
to_block: None,
method: method.to_string(),
params: request.params.clone(),
cache_errors: false,
}),
BlockNeeded::CacheNever => None,
BlockNeeded::Cache {
block_num,
cache_errors,
} => {
let (request_block_hash, block_depth) = self
.balanced_rpcs
.block_hash(authorization, &block_num)
.await?;
if block_depth < self.config.archive_depth {
request_metadata
.archive_request
.store(true, atomic::Ordering::Release);
}
let request_block = self
.balanced_rpcs
.block(authorization, &request_block_hash, None)
.await?
.block;
Some(JsonRpcQueryCacheKey {
from_block: Some(request_block),
to_block: None,
method: method.to_string(),
// TODO: hash here?
params: request.params.clone(),
cache_errors,
})
}
BlockNeeded::CacheRange {
from_block_num,
to_block_num,
cache_errors,
} => {
let (from_block_hash, block_depth) = self
.balanced_rpcs
.block_hash(authorization, &from_block_num)
.await?;
if block_depth < self.config.archive_depth {
request_metadata
.archive_request
.store(true, atomic::Ordering::Release);
}
let from_block = self
.balanced_rpcs
.block(authorization, &from_block_hash, None)
.await?
.block;
let (to_block_hash, _) = self
.balanced_rpcs
.block_hash(authorization, &to_block_num)
.await?;
let to_block = self
.balanced_rpcs
.block(authorization, &to_block_hash, None)
.await?
.block;
Some(JsonRpcQueryCacheKey {
from_block: Some(from_block),
to_block: Some(to_block),
method: method.to_string(),
params: request.params.clone(),
cache_errors,
})
}
};
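// note that the cache keys above pin resolved blocks (looked up by hash) rather than
// raw block numbers, so a reorg yields a different key instead of serving a response
// that was cached for the replaced block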
let authorization = authorization.clone();
if let Some(cache_key) = cache_key {
let from_block_num = cache_key.from_block.as_ref().map(|x| x.number.unwrap());
let to_block_num = cache_key.to_block.as_ref().map(|x| x.number.unwrap());
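// cache lookup: Ok(..) is a hit and returns the cached response data directly, while
// Err(..) hands back a guard for the missing key that is filled with the freshly
// fetched backend response below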
match self
.jsonrpc_query_cache
.get_value_or_guard_async(cache_key).await
{
Ok(x) => x,
Err(x) => {
let response_data = self.balanced_rpcs
.try_proxy_connection(
&authorization,
request,
Some(request_metadata),
from_block_num.as_ref(),
to_block_num.as_ref(),
)
.await?;
// TODO: convert the Box<RawValue> to an Arc<RawValue>
x.insert(response_data.clone());
response_data
}
}
} else {
self.balanced_rpcs
.try_proxy_connection(
&authorization,
request,
Some(request_metadata),
None,
None,
)
.await?
}
}
};
Ok(response_data)
}
}
impl fmt::Debug for Web3ProxyApp {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
f.debug_struct("Web3ProxyApp").finish_non_exhaustive()
}
}