pass name through and use pub more

Bryan Stitt 2022-08-10 05:56:09 +00:00
parent 21055799f3
commit 77a589a96d
8 changed files with 120 additions and 173 deletions

@ -66,6 +66,7 @@
- [x] Add a "weight" key to the servers. Sort on that after block. keep most requests local
- [x] cache db query results for user data. db is a big bottleneck right now
- [x] allow blocking public requests
- [ ] cache more things locally or in redis
- [ ] use siwe messages and signatures for sign up and login
- [ ] basic request method stats

@ -58,6 +58,21 @@ type ActiveRequestsMap = DashMap<CacheKey, watch::Receiver<bool>>;
pub type AnyhowJoinHandle<T> = JoinHandle<anyhow::Result<T>>;
// TODO: think more about TxState
#[derive(Clone)]
pub enum TxState {
Pending(Transaction),
Confirmed(Transaction),
Orphaned(Transaction),
}
#[derive(Clone, Copy, From)]
pub struct UserCacheValue {
pub expires_at: Instant,
pub user_id: i64,
pub user_count_per_period: u32,
}
/// flatten a JoinError into an anyhow error
/// Useful when joining multiple futures.
pub async fn flatten_handle<T>(handle: AnyhowJoinHandle<T>) -> anyhow::Result<T> {
@ -109,42 +124,26 @@ pub async fn get_migrated_db(
Ok(db)
}
// TODO: think more about TxState
#[derive(Clone)]
pub enum TxState {
Pending(Transaction),
Confirmed(Transaction),
Orphaned(Transaction),
}
#[derive(Clone, Copy, From)]
pub struct UserCacheValue {
pub expires_at: Instant,
pub user_id: i64,
pub user_count_per_period: u32,
}
/// The application
// TODO: this debug impl is way too verbose. make something smaller
// TODO: if Web3ProxyApp is always in an Arc, i think we can avoid having at least some of these internal things in arcs
// TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard
pub struct Web3ProxyApp {
/// Send requests to the best server available
balanced_rpcs: Arc<Web3Connections>,
pub balanced_rpcs: Arc<Web3Connections>,
/// Send private requests (like eth_sendRawTransaction) to all these servers
private_rpcs: Arc<Web3Connections>,
pub private_rpcs: Arc<Web3Connections>,
/// Track active requests so that we don't send the same query to multiple backends
active_requests: ActiveRequestsMap,
pub active_requests: ActiveRequestsMap,
response_cache: ResponseLrcCache,
// don't drop this or the sender will stop working
// TODO: broadcast channel instead?
head_block_receiver: watch::Receiver<Arc<Block<TxHash>>>,
pending_tx_sender: broadcast::Sender<TxState>,
pending_transactions: Arc<DashMap<TxHash, TxState>>,
user_cache: RwLock<FifoCountMap<Uuid, UserCacheValue>>,
redis_pool: Option<RedisPool>,
rate_limiter: Option<RedisCell>,
db_conn: Option<sea_orm::DatabaseConnection>,
pub pending_transactions: Arc<DashMap<TxHash, TxState>>,
pub user_cache: RwLock<FifoCountMap<Uuid, UserCacheValue>>,
pub redis_pool: Option<RedisPool>,
pub rate_limiter: Option<RedisCell>,
pub db_conn: Option<sea_orm::DatabaseConnection>,
}
impl fmt::Debug for Web3ProxyApp {
@ -155,26 +154,6 @@ impl fmt::Debug for Web3ProxyApp {
}
impl Web3ProxyApp {
pub fn db_conn(&self) -> Option<&sea_orm::DatabaseConnection> {
self.db_conn.as_ref()
}
pub fn pending_transactions(&self) -> &DashMap<TxHash, TxState> {
&self.pending_transactions
}
pub fn rate_limiter(&self) -> Option<&RedisCell> {
self.rate_limiter.as_ref()
}
pub fn redis_pool(&self) -> Option<&RedisPool> {
self.redis_pool.as_ref()
}
pub fn user_cache(&self) -> &RwLock<FifoCountMap<Uuid, UserCacheValue>> {
&self.user_cache
}
// TODO: should we just take the rpc config as the only arg instead?
pub async fn spawn(
app_config: AppConfig,
@ -195,12 +174,12 @@ impl Web3ProxyApp {
None
};
let balanced_rpcs = app_config.balanced_rpcs.into_values().collect();
let balanced_rpcs = app_config.balanced_rpcs;
let private_rpcs = if let Some(private_rpcs) = app_config.private_rpcs {
private_rpcs.into_values().collect()
private_rpcs
} else {
vec![]
Default::default()
};
// TODO: try_join_all instead?
@ -523,18 +502,6 @@ impl Web3ProxyApp {
Ok((subscription_abort_handle, response))
}
pub fn balanced_rpcs(&self) -> &Web3Connections {
&self.balanced_rpcs
}
pub fn private_rpcs(&self) -> &Web3Connections {
&self.private_rpcs
}
pub fn active_requests(&self) -> &ActiveRequestsMap {
&self.active_requests
}
/// send the request or batch of requests to the appropriate RPCs
#[instrument(skip_all)]
pub async fn proxy_web3_rpc(

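The `pending_tx_sender` field above is a tokio broadcast channel carrying `TxState`. A minimal standalone sketch of how a subscriber might drain it, with `String` stand-ins for the `Transaction` payloads so the example compiles on its own:

```rust
use tokio::sync::broadcast;

// stand-in for the crate's TxState, which wraps ethers Transactions
#[derive(Clone, Debug)]
enum TxState {
    Pending(String),
    Confirmed(String),
    Orphaned(String),
}

#[tokio::main]
async fn main() {
    // the receiver is created before any send, so it sees every message
    let (pending_tx_sender, mut rx) = broadcast::channel::<TxState>(16);

    pending_tx_sender.send(TxState::Pending("0xabc".into())).unwrap();
    pending_tx_sender.send(TxState::Confirmed("0xabc".into())).unwrap();
    pending_tx_sender.send(TxState::Orphaned("0xdef".into())).unwrap();
    drop(pending_tx_sender); // close the channel so the loop below terminates

    while let Ok(state) = rx.recv().await {
        match state {
            TxState::Pending(hash) => println!("pending   {hash}"),
            TxState::Confirmed(hash) => println!("confirmed {hash}"),
            TxState::Orphaned(hash) => println!("orphaned  {hash}"),
        }
    }
}
```
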
@ -68,8 +68,10 @@ pub struct Web3ConnectionConfig {
impl Web3ConnectionConfig {
/// Create a Web3Connection from config
// #[instrument(name = "try_build_Web3ConnectionConfig", skip_all)]
#[allow(clippy::too_many_arguments)]
pub async fn spawn(
self,
name: String,
redis_client_pool: Option<redis_cell_client::RedisPool>,
chain_id: u64,
http_client: Option<reqwest::Client>,
@ -89,6 +91,7 @@ impl Web3ConnectionConfig {
};
Web3Connection::spawn(
name,
chain_id,
self.url,
http_client,

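The new `name` argument exists because server configs are now keyed by name instead of stored in a `Vec` (see the `HashMap<String, Web3ConnectionConfig>` change further down). A standalone sketch of that pattern with serde; the field names and values here are illustrative, not the crate's real config struct:

```rust
use std::collections::HashMap;

use serde::Deserialize;

// illustrative stand-in for Web3ConnectionConfig
#[derive(Debug, Deserialize)]
struct ServerConfig {
    url: String,
    soft_limit: u32,
}

fn main() {
    // a name-keyed table deserializes straight into a map; the key becomes the
    // name that gets passed through to spawn()
    let raw = r#"{ "ankr": { "url": "https://rpc.ankr.com/eth", "soft_limit": 1000 } }"#;
    let servers: HashMap<String, ServerConfig> = serde_json::from_str(raw).unwrap();

    for (name, config) in servers {
        println!("{name}: {} (soft_limit {})", config.url, config.soft_limit);
    }
}
```
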
@ -73,8 +73,9 @@ impl fmt::Debug for Web3Provider {
/// An active connection to a Web3Rpc
pub struct Web3Connection {
name: String,
/// TODO: can we get this from the provider? do we even need it?
url: String,
pub url: String,
/// keep track of currently open requests. We sort on this
active_requests: AtomicU32,
/// provider is in a RwLock so that we can replace it if re-connecting
@ -83,66 +84,19 @@ pub struct Web3Connection {
/// rate limits are stored in a central redis so that multiple proxies can share their rate limits
hard_limit: Option<redis_cell_client::RedisCell>,
/// used for load balancing to the least loaded server
soft_limit: u32,
pub soft_limit: u32,
block_data_limit: AtomicU64,
weight: u32,
pub weight: u32,
head_block: RwLock<(H256, U64)>,
}
impl Serialize for Web3Connection {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
// 3 is the number of fields in the struct.
let mut state = serializer.serialize_struct("Web3Connection", 4)?;
// TODO: sanitize any credentials in the url
state.serialize_field("url", &self.url)?;
let block_data_limit = self.block_data_limit.load(atomic::Ordering::Relaxed);
state.serialize_field("block_data_limit", &block_data_limit)?;
state.serialize_field("soft_limit", &self.soft_limit)?;
state.serialize_field(
"active_requests",
&self.active_requests.load(atomic::Ordering::Relaxed),
)?;
state.end()
}
}
impl fmt::Debug for Web3Connection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut f = f.debug_struct("Web3Connection");
f.field("url", &self.url);
let block_data_limit = self.block_data_limit.load(atomic::Ordering::Relaxed);
if block_data_limit == u64::MAX {
f.field("data", &"archive");
} else {
f.field("data", &block_data_limit);
}
f.finish_non_exhaustive()
}
}
impl fmt::Display for Web3Connection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: filter basic auth and api keys
write!(f, "{}", &self.url)
}
}
impl Web3Connection {
/// Connect to a web3 rpc
// #[instrument(name = "spawn_Web3Connection", skip(hard_limit, http_client))]
// TODO: have this take a builder (which will have channels attached)
#[allow(clippy::too_many_arguments)]
pub async fn spawn(
name: String,
chain_id: u64,
url_str: String,
// optional because this is only used for http providers. websocket providers don't use it
@ -172,6 +126,7 @@ impl Web3Connection {
let provider = Web3Provider::from_str(&url_str, http_client).await?;
let new_connection = Self {
name,
url: url_str.clone(),
active_requests: 0.into(),
provider: AsyncRwLock::new(Some(Arc::new(provider))),
@ -280,10 +235,6 @@ impl Web3Connection {
Ok((new_connection, handle))
}
pub fn url(&self) -> &str {
&self.url
}
/// TODO: this might be too simple. different nodes can prune differently
pub fn block_data_limit(&self) -> U64 {
self.block_data_limit.load(atomic::Ordering::Acquire).into()
@ -338,11 +289,6 @@ impl Web3Connection {
self.active_requests.load(atomic::Ordering::Acquire)
}
#[inline]
pub fn soft_limit(&self) -> u32 {
self.soft_limit
}
#[inline]
pub async fn has_provider(&self) -> bool {
self.provider.read().await.is_some()
@ -668,10 +614,6 @@ impl Web3Connection {
Ok(HandleResult::ActiveRequest(handle))
}
pub fn weight(&self) -> u32 {
self.weight
}
}
impl Hash for Web3Connection {
@ -768,3 +710,52 @@ impl PartialEq for Web3Connection {
self.url == other.url
}
}
impl Serialize for Web3Connection {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
// 4 is the number of fields in the struct.
let mut state = serializer.serialize_struct("Web3Connection", 4)?;
// the url is excluded because it likely includes private information. just show the name
state.serialize_field("name", &self.name)?;
let block_data_limit = self.block_data_limit.load(atomic::Ordering::Relaxed);
state.serialize_field("block_data_limit", &block_data_limit)?;
state.serialize_field("soft_limit", &self.soft_limit)?;
state.serialize_field(
"active_requests",
&self.active_requests.load(atomic::Ordering::Relaxed),
)?;
state.end()
}
}
impl fmt::Debug for Web3Connection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut f = f.debug_struct("Web3Connection");
f.field("url", &self.url);
let block_data_limit = self.block_data_limit.load(atomic::Ordering::Relaxed);
if block_data_limit == u64::MAX {
f.field("data", &"archive");
} else {
f.field("data", &block_data_limit);
}
f.finish_non_exhaustive()
}
}
impl fmt::Display for Web3Connection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: filter basic auth and api keys
write!(f, "{}", &self.url)
}
}

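The hand-written `Serialize` impl above hides the url (which may embed credentials) and exposes the name plus a few runtime counters instead. A self-contained mini example of the same `serialize_struct` technique on a toy struct:

```rust
use serde::ser::{Serialize, SerializeStruct, Serializer};

struct Conn {
    name: String,
    // deliberately never serialized, like the url on Web3Connection
    #[allow(dead_code)]
    url: String,
    soft_limit: u32,
}

impl Serialize for Conn {
    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        // 2 is the number of fields that will be serialized
        let mut state = serializer.serialize_struct("Conn", 2)?;
        state.serialize_field("name", &self.name)?;
        state.serialize_field("soft_limit", &self.soft_limit)?;
        state.end()
    }
}

fn main() {
    let c = Conn {
        name: "ankr".into(),
        url: "https://user:secret@example.com".into(),
        soft_limit: 1_000,
    };

    // prints {"name":"ankr","soft_limit":1000}; the url never appears
    println!("{}", serde_json::to_string(&c).unwrap());
}
```
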
@ -31,10 +31,11 @@ use crate::config::Web3ConnectionConfig;
use crate::connection::{ActiveRequestHandle, HandleResult, Web3Connection};
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
// Serialize so we can print it on our debug endpoint
/// A collection of Web3Connections that are on the same block
/// Serialize is so we can print it on our debug endpoint
#[derive(Clone, Default, Serialize)]
struct SyncedConnections {
head_block_num: u64,
head_block_num: U64,
head_block_hash: H256,
// TODO: this should be able to serialize, but it isn't
#[serde(skip_serializing)]
@ -53,16 +54,6 @@ impl fmt::Debug for SyncedConnections {
}
}
impl SyncedConnections {
pub fn head_block_hash(&self) -> &H256 {
&self.head_block_hash
}
pub fn head_block_num(&self) -> U64 {
self.head_block_num.into()
}
}
#[derive(Default)]
pub struct BlockChain {
/// only includes blocks on the main chain.
@ -144,7 +135,7 @@ impl Web3Connections {
// #[instrument(name = "spawn_Web3Connections", skip_all)]
pub async fn spawn(
chain_id: u64,
server_configs: Vec<Web3ConnectionConfig>,
server_configs: HashMap<String, Web3ConnectionConfig>,
http_client: Option<reqwest::Client>,
redis_client_pool: Option<redis_cell_client::RedisPool>,
head_block_sender: Option<watch::Sender<Arc<Block<TxHash>>>>,
@ -193,7 +184,7 @@ impl Web3Connections {
// turn configs into connections (in parallel)
let spawn_handles: Vec<_> = server_configs
.into_iter()
.map(|server_config| {
.map(|(server_name, server_config)| {
let http_client = http_client.clone();
let redis_client_pool = redis_client_pool.clone();
let http_interval_sender = http_interval_sender.clone();
@ -203,6 +194,7 @@ impl Web3Connections {
tokio::spawn(async move {
server_config
.spawn(
server_name,
redis_client_pool,
chain_id,
http_client,
@ -223,7 +215,7 @@ impl Web3Connections {
// TODO: how should we handle errors here? one rpc being down shouldn't cause the program to exit
match x {
Ok(Ok((connection, handle))) => {
connections.insert(connection.url().to_string(), connection);
connections.insert(connection.url.clone(), connection);
handles.push(handle);
}
Ok(Err(err)) => {
@ -506,18 +498,18 @@ impl Web3Connections {
pub fn head_block(&self) -> (U64, H256) {
let synced_connections = self.synced_connections.load();
let num = synced_connections.head_block_num();
let hash = *synced_connections.head_block_hash();
(num, hash)
(
synced_connections.head_block_num,
synced_connections.head_block_hash,
)
}
pub fn head_block_hash(&self) -> H256 {
*self.synced_connections.load().head_block_hash()
self.synced_connections.load().head_block_hash
}
pub fn head_block_num(&self) -> U64 {
self.synced_connections.load().head_block_num()
self.synced_connections.load().head_block_num
}
pub fn synced(&self) -> bool {
@ -606,7 +598,7 @@ impl Web3Connections {
let mut connection_heads = IndexMap::<String, Arc<Block<TxHash>>>::new();
while let Ok((new_block, rpc)) = block_receiver.recv_async().await {
if let Some(current_block) = connection_heads.get(rpc.url()) {
if let Some(current_block) = connection_heads.get(&rpc.url) {
if current_block.hash == new_block.hash {
// duplicate block
continue;
@ -618,7 +610,7 @@ impl Web3Connections {
} else {
warn!(%rpc, ?new_block, "Block without hash!");
connection_heads.remove(rpc.url());
connection_heads.remove(&rpc.url);
continue;
};
@ -631,7 +623,7 @@ impl Web3Connections {
// maybe when a node is syncing or reconnecting?
warn!(%rpc, ?new_block, "Block without number!");
connection_heads.remove(rpc.url());
connection_heads.remove(&rpc.url);
continue;
};
@ -646,10 +638,10 @@ impl Web3Connections {
if new_block_num == U64::zero() {
warn!(%rpc, %new_block_num, "still syncing");
connection_heads.remove(rpc.url());
connection_heads.remove(&rpc.url);
} else {
// TODO: no clone? we end up with different blocks for every rpc
connection_heads.insert(rpc.url().to_string(), new_block.clone());
connection_heads.insert(rpc.url.clone(), new_block.clone());
self.chain.add_block(new_block.clone(), false);
}
@ -675,7 +667,7 @@ impl Web3Connections {
for (rpc_url, head_block) in connection_heads.iter() {
if let Some(rpc) = self.conns.get(rpc_url) {
// we need the total soft limit in order to know when it's safe to update the backends
total_soft_limit += rpc.soft_limit();
total_soft_limit += rpc.soft_limit;
let head_hash = head_block.hash.unwrap();
@ -763,7 +755,7 @@ impl Web3Connections {
.iter()
.map(|rpc_url| {
if let Some(rpc) = self.conns.get(*rpc_url) {
rpc.soft_limit()
rpc.soft_limit
} else {
0
}
@ -824,7 +816,7 @@ impl Web3Connections {
.collect();
let pending_synced_connections = SyncedConnections {
head_block_num: best_head_num.as_u64(),
head_block_num: best_head_num,
head_block_hash: best_head_hash,
conns,
};
@ -948,9 +940,9 @@ impl Web3Connections {
.iter()
.map(|rpc| {
// TODO: get active requests and the soft limit out of redis?
let weight = rpc.weight();
let weight = rpc.weight;
let active_requests = rpc.active_requests();
let soft_limit = rpc.soft_limit();
let soft_limit = rpc.soft_limit;
let utilization = active_requests as f32 / soft_limit as f32;

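The last hunk reads `weight` and `soft_limit` straight off the now-public fields and computes `utilization = active_requests / soft_limit`. A standalone sketch of one plausible ordering, lowest weight first and then lowest utilization; only the field names come from the diff, the exact sort key used by the crate is an assumption:

```rust
use std::cmp::Ordering;

struct Candidate {
    name: &'static str,
    weight: u32,
    active_requests: u32,
    soft_limit: u32,
}

fn main() {
    let mut rpcs = vec![
        Candidate { name: "local", weight: 0, active_requests: 90, soft_limit: 100 },
        Candidate { name: "remote", weight: 1, active_requests: 5, soft_limit: 1_000 },
        Candidate { name: "local2", weight: 0, active_requests: 10, soft_limit: 100 },
    ];

    rpcs.sort_by(|a, b| {
        let a_util = a.active_requests as f32 / a.soft_limit as f32;
        let b_util = b.active_requests as f32 / b.soft_limit as f32;

        // prefer lower weight, break ties with lower utilization
        a.weight
            .cmp(&b.weight)
            .then(a_util.partial_cmp(&b_util).unwrap_or(Ordering::Equal))
    });

    let order: Vec<_> = rpcs.iter().map(|r| r.name).collect();
    println!("{:?}", order); // ["local2", "local", "remote"]
}
```
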
@ -6,7 +6,7 @@ use crate::app::Web3ProxyApp;
/// Health check page for load balancers to use
pub async fn health(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
if app.balanced_rpcs().synced() {
if app.balanced_rpcs.synced() {
(StatusCode::OK, "OK")
} else {
(StatusCode::SERVICE_UNAVAILABLE, ":(")
@ -17,17 +17,12 @@ pub async fn health(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoRe
/// TODO: replace this with proper stats and monitoring
pub async fn status(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
// TODO: what else should we include? uptime? prometheus?
let balanced_rpcs = app.balanced_rpcs();
let private_rpcs = app.private_rpcs();
let num_active_requests = app.active_requests().len();
let num_pending_transactions = app.pending_transactions().len();
let body = json!({
"balanced_rpcs": balanced_rpcs,
"private_rpcs": private_rpcs,
"num_active_requests": num_active_requests,
"num_pending_transactions": num_pending_transactions,
"balanced_rpcs": app.balanced_rpcs,
"private_rpcs": app.private_rpcs,
"num_active_requests": app.active_requests.len(),
"num_pending_transactions": app.pending_transactions.len(),
});
(StatusCode::INTERNAL_SERVER_ERROR, Json(body))
(StatusCode::OK, Json(body))
}

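Both handlers above pull the shared `Web3ProxyApp` out of an axum `Extension`. A self-contained sketch of that wiring with a dummy state type (axum 0.5-era API; the project's real routes and port may differ):

```rust
use std::net::SocketAddr;
use std::sync::Arc;

use axum::{http::StatusCode, response::IntoResponse, routing::get, Extension, Router};

// dummy stand-in for Web3ProxyApp
struct AppState {
    synced: bool,
}

async fn health(Extension(app): Extension<Arc<AppState>>) -> impl IntoResponse {
    if app.synced {
        (StatusCode::OK, "OK")
    } else {
        (StatusCode::SERVICE_UNAVAILABLE, ":(")
    }
}

#[tokio::main]
async fn main() {
    let app = Arc::new(AppState { synced: true });

    let router = Router::new()
        .route("/health", get(health))
        .layer(Extension(app));

    let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
    axum::Server::bind(&addr)
        .serve(router.into_make_service())
        .await
        .unwrap();
}
```
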
@ -25,7 +25,7 @@ impl Web3ProxyApp {
let rate_limiter_key = format!("ip-{}", ip);
// TODO: dry this up with rate_limit_by_key
if let Some(rate_limiter) = self.rate_limiter() {
if let Some(rate_limiter) = &self.rate_limiter {
match rate_limiter
.throttle_key(&rate_limiter_key, None, None, None)
.await
@ -52,10 +52,8 @@ impl Web3ProxyApp {
}
pub async fn rate_limit_by_key(&self, user_key: Uuid) -> anyhow::Result<RateLimitResult> {
let user_cache = self.user_cache();
// check the local cache
let user_data = if let Some(cached_user) = user_cache.read().get(&user_key) {
let user_data = if let Some(cached_user) = self.user_cache.read().get(&user_key) {
// TODO: also include the time this value was last checked! otherwise we cache forever!
if cached_user.expires_at < Instant::now() {
// old record found
@ -71,7 +69,7 @@ impl Web3ProxyApp {
// if cache was empty, check the database
let user_data = if user_data.is_none() {
if let Some(db) = self.db_conn() {
if let Some(db) = &self.db_conn {
/// helper enum for querying just a few columns instead of the entire table
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryAs {
@ -105,7 +103,7 @@ impl Web3ProxyApp {
};
// save for the next run
user_cache.write().insert(user_key, user_data);
self.user_cache.write().insert(user_key, user_data);
user_data
} else {
@ -118,7 +116,7 @@ impl Web3ProxyApp {
};
// user key is valid. now check rate limits
if let Some(rate_limiter) = self.rate_limiter() {
if let Some(rate_limiter) = &self.rate_limiter {
// TODO: how does max burst actually work? what should it be?
let user_max_burst = user_data.user_count_per_period / 3;
let user_period = 60;

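`rate_limit_by_key` above checks the in-process `user_cache` first, treats an expired entry as a miss, falls back to the database, and writes the fresh value back. A standalone sketch of that cache-then-lookup flow using `std` types in place of the crate's `FifoCountMap` and `RwLock`:

```rust
use std::collections::HashMap;
use std::sync::RwLock;
use std::time::{Duration, Instant};

#[derive(Clone, Copy)]
struct UserCacheValue {
    expires_at: Instant,
    user_count_per_period: u32,
}

fn lookup(cache: &RwLock<HashMap<u64, UserCacheValue>>, user_key: u64) -> UserCacheValue {
    // check the local cache first; an expired record counts as a miss
    if let Some(cached) = cache.read().unwrap().get(&user_key) {
        if cached.expires_at > Instant::now() {
            return *cached;
        }
    }

    // cache miss: pretend this is the database query
    let fresh = UserCacheValue {
        expires_at: Instant::now() + Duration::from_secs(60),
        user_count_per_period: 10_000,
    };

    // save for the next run
    cache.write().unwrap().insert(user_key, fresh);
    fresh
}

fn main() {
    let cache = RwLock::new(HashMap::new());
    let first = lookup(&cache, 42);
    let second = lookup(&cache, 42); // served from the cache until expires_at
    assert_eq!(first.user_count_per_period, second.user_count_per_period);
}
```
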
@ -53,7 +53,7 @@ pub async fn create_user(
..Default::default()
};
let db = app.db_conn().unwrap();
let db = app.db_conn.as_ref().unwrap();
// TODO: proper error message
let user = user.insert(db).await.unwrap();