diff --git a/Cargo.lock b/Cargo.lock
index 04d6d8d6..72dbf0d6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -76,9 +76,9 @@ dependencies = [
 
 [[package]]
 name = "anyhow"
-version = "1.0.59"
+version = "1.0.60"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c91f1f46651137be86f3a2b9a8359f9ab421d04d941c62b5982e1ca21113adf9"
+checksum = "c794e162a5eff65c72ef524dfe393eb923c354e350bb78b9c7383df13f3bc142"
 dependencies = [
  "backtrace",
 ]
diff --git a/TODO.md b/TODO.md
index f4096ad1..773f28d8 100644
--- a/TODO.md
+++ b/TODO.md
@@ -59,9 +59,10 @@
 - [x] instead of tracking `pending_synced_connections`, have a mapping of where all connections are individually. then each change, re-check for consensus.
 - [x] synced connections swap threshold set to 1 so that it always serves something
 - [x] cli tool for creating new users
+- [x] incoming rate limiting by api key
+- [ ] after a refactor, public rate limit isnt working anymore. i set to 0 but could still request
+- [ ] give users different rate limits looked up from the database
 - [ ] basic request method stats
-- [ ] incoming rate limiting by api key
-  - [ ] give users different rate limits looked up from the database
 - [ ] allow blocking public requests
 
 ## V1
@@ -108,6 +109,7 @@
 - [ ] Public bsc server got “0” for block data limit (ninicoin)
 - [ ] If we need an archive server and no servers in sync, exit immediately with an error instead of waiting 60 seconds
 - [ ] 60 second timeout is too short. Maybe do that for free tier and larger timeout for paid. Problem is that some queries can take over 1000 seconds
+- [ ] refactor from_anyhow_error to have consistent error codes and http codes
 
 new endpoints for users:
 - think about where to put this. a separate app might be better
diff --git a/docker-compose.yml b/docker-compose.yml
index 927e6c13..fcb35791 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -22,7 +22,7 @@ services:
   dev-adminer:
     image: adminer
     ports:
-      - 8306:8080
+      - 18306:8080
     environment:
       ADMINER_DEFAULT_SERVER: dev-db
 
diff --git a/entities/src/user_keys.rs b/entities/src/user_keys.rs
index a1aa9465..0931bde2 100644
--- a/entities/src/user_keys.rs
+++ b/entities/src/user_keys.rs
@@ -15,6 +15,7 @@ pub struct Model {
     pub description: Option<String>,
     pub private_txs: bool,
     pub active: bool,
+    pub requests_per_second: u32,
 }
 
 #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
diff --git a/migration/src/m20220101_000001_create_table.rs b/migration/src/m20220101_000001_create_table.rs
index ff62f8c2..04e15ca2 100644
--- a/migration/src/m20220101_000001_create_table.rs
+++ b/migration/src/m20220101_000001_create_table.rs
@@ -124,6 +124,12 @@ impl MigrationTrait for Migration {
                         .default(true)
                         .not_null(),
                 )
+                .col(
+                    ColumnDef::new(UserKeys::RequestsPerMinute)
+                        .unsigned()
+                        .default(true)
+                        .not_null(),
+                )
                 .index(sea_query::Index::create().col(UserKeys::Active))
                 .foreign_key(
                     sea_query::ForeignKey::create()
@@ -199,8 +205,7 @@ enum BlockList {
 -- TODO: what size for api_key
 -- TODO: track active with a timestamp?
 -- TODO: creation time?
--- TODO: requests_per_second INT,
--- TODO: requests_per_day INT,
+-- TODO: requests_per_day? requests_per_second?,
 -- TODO: more security features. likely similar to infura
 */
 #[derive(Iden)]
@@ -212,4 +217,5 @@ enum UserKeys {
     Description,
     PrivateTxs,
     Active,
+    RequestsPerMinute,
 }
diff --git a/redis-cell-client/Cargo.toml b/redis-cell-client/Cargo.toml
index dce1ce21..ebfc3296 100644
--- a/redis-cell-client/Cargo.toml
+++ b/redis-cell-client/Cargo.toml
@@ -5,5 +5,5 @@ authors = ["Bryan Stitt "]
 edition = "2018"
 
 [dependencies]
-anyhow = "1.0.59"
+anyhow = "1.0.60"
 bb8-redis = "0.11.0"
diff --git a/redis-cell-client/src/lib.rs b/redis-cell-client/src/lib.rs
index 71fb09e8..fd90a59c 100644
--- a/redis-cell-client/src/lib.rs
+++ b/redis-cell-client/src/lib.rs
@@ -5,7 +5,8 @@ use bb8_redis::redis::cmd;
 pub use bb8_redis::redis::RedisError;
 pub use bb8_redis::{bb8, RedisConnectionManager};
 
-use std::time::Duration;
+use std::ops::Add;
+use std::time::{Duration, Instant};
 
 pub type RedisClientPool = bb8::Pool<RedisConnectionManager>;
 
@@ -17,12 +18,17 @@ pub struct RedisCellClient {
     default_period: u32,
 }
 
+pub enum ThrottleResult {
+    Allowed,
+    RetryAt(Instant),
+}
+
 impl RedisCellClient {
     // todo: seems like this could be derived
     // TODO: take something generic for conn
     // TODO: use r2d2 for connection pooling?
     pub fn new(
-        pool: bb8::Pool<RedisConnectionManager>,
+        pool: RedisClientPool,
         app: &str,
         key: &str,
         default_max_burst: u32,
@@ -48,13 +54,13 @@ impl RedisCellClient {
         count_per_period: Option<u32>,
         period: Option<u32>,
         quantity: u32,
-    ) -> Result<(), Option<Duration>> {
-        let mut conn = self.pool.get().await.unwrap();
+    ) -> anyhow::Result<ThrottleResult> {
+        let mut conn = self.pool.get().await?;
 
         let count_per_period = count_per_period.unwrap_or(self.default_count_per_period);
 
         if count_per_period == 0 {
-            return Err(None);
+            return Ok(ThrottleResult::Allowed);
        }
 
         let max_burst = max_burst.unwrap_or(self.default_max_burst);
@@ -78,24 +84,28 @@ impl RedisCellClient {
         let x: Vec<isize> = cmd("CL.THROTTLE")
             .arg(&(key, max_burst, count_per_period, period, quantity))
             .query_async(&mut *conn)
-            .await
-            .unwrap();
+            .await?;
 
-        assert_eq!(x.len(), 5);
+        // TODO: trace log the result?
+        if x.len() != 5 {
+            return Err(anyhow::anyhow!("unexpected redis result"));
+        }
 
-        // TODO: trace log the result
-        let retry_after = *x.get(3).unwrap();
+        let retry_after = *x.get(3).expect("index exists above");
 
         if retry_after == -1 {
-            Ok(())
+            Ok(ThrottleResult::Allowed)
         } else {
-            Err(Some(Duration::from_secs(retry_after as u64)))
+            // don't return a duration, return an instant
+            let retry_at = Instant::now().add(Duration::from_secs(retry_after as u64));
+
+            Ok(ThrottleResult::RetryAt(retry_at))
         }
     }
 
     #[inline]
-    pub async fn throttle(&self) -> Result<(), Option<Duration>> {
+    pub async fn throttle(&self) -> anyhow::Result<ThrottleResult> {
         self._throttle(&self.key, None, None, None, 1).await
     }
 
@@ -106,7 +116,7 @@ impl RedisCellClient {
         max_burst: Option<u32>,
         count_per_period: Option<u32>,
         period: Option<u32>,
-    ) -> Result<(), Option<Duration>> {
+    ) -> anyhow::Result<ThrottleResult> {
         let key = format!("{}:{}", self.key, key);
 
         self._throttle(key.as_ref(), max_burst, count_per_period, period, 1)
@@ -114,7 +124,7 @@ impl RedisCellClient {
     }
 
     #[inline]
-    pub async fn throttle_quantity(&self, quantity: u32) -> Result<(), Option<Duration>> {
+    pub async fn throttle_quantity(&self, quantity: u32) -> anyhow::Result<ThrottleResult> {
         self._throttle(&self.key, None, None, None, quantity).await
     }
 }
diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml
index 4e182a2e..ef5c51af 100644
--- a/web3_proxy/Cargo.toml
+++ b/web3_proxy/Cargo.toml
@@ -16,7 +16,7 @@ redis-cell-client = { path = "../redis-cell-client" }
 entities = { path = "../entities" }
 migration = { path = "../migration" }
 
-anyhow = { version = "1.0.59", features = ["backtrace"] }
+anyhow = { version = "1.0.60", features = ["backtrace"] }
 arc-swap = "1.5.1"
 argh = "0.1.8"
 axum = { version = "0.5.13", features = ["serde_json", "tokio-tungstenite", "ws"] }
diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs
index ed18f7d0..8a3c4407 100644
--- a/web3_proxy/src/app.rs
+++ b/web3_proxy/src/app.rs
@@ -671,7 +671,7 @@ impl Web3ProxyApp {
         &self,
         request: JsonRpcRequestEnum,
     ) -> anyhow::Result<JsonRpcForwardedResponseEnum> {
-        debug!(?request, "proxy_web3_rpc");
+        trace!(?request, "proxy_web3_rpc");
 
         // even though we have timeouts on the requests to our backend providers,
         // we need a timeout for the incoming request so that retries don't run forever
@@ -689,7 +689,7 @@ impl Web3ProxyApp {
             ),
         };
 
-        debug!(?response, "Forwarding response");
+        trace!(?response, "Forwarding");
 
         Ok(response)
     }
diff --git a/web3_proxy/src/bin/web3_proxy.rs b/web3_proxy/src/bin/web3_proxy.rs
index f1c424e0..a867cd7e 100644
--- a/web3_proxy/src/bin/web3_proxy.rs
+++ b/web3_proxy/src/bin/web3_proxy.rs
@@ -12,8 +12,8 @@ use parking_lot::deadlock;
 use std::fs;
 use std::sync::atomic::{self, AtomicUsize};
 use std::thread;
-use std::time::Duration;
 use tokio::runtime;
+use tokio::time::Duration;
 use tracing::{debug, info};
 use tracing_subscriber::EnvFilter;
 use web3_proxy::app::{flatten_handle, Web3ProxyApp};
diff --git a/web3_proxy/src/connection.rs b/web3_proxy/src/connection.rs
index b23204d2..25a0a388 100644
--- a/web3_proxy/src/connection.rs
+++ b/web3_proxy/src/connection.rs
@@ -4,7 +4,7 @@ use derive_more::From;
 use ethers::prelude::{Block, Bytes, Middleware, ProviderError, TxHash, H256, U64};
 use futures::future::try_join_all;
 use futures::StreamExt;
-use redis_cell_client::RedisCellClient;
+use redis_cell_client::{RedisCellClient, ThrottleResult};
 use serde::ser::{SerializeStruct, Serializer};
 use serde::Serialize;
 use std::fmt;
@@ -13,12 +13,18 @@ use std::sync::atomic::{self, AtomicU32, AtomicU64};
 use std::{cmp::Ordering, sync::Arc};
 use tokio::sync::broadcast;
 use tokio::sync::RwLock;
-use tokio::time::{interval, sleep, Duration, MissedTickBehavior};
+use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
 use tracing::{error, info, info_span, instrument, trace, warn, Instrument};
 
 use crate::app::{flatten_handle, AnyhowJoinHandle};
 use crate::config::BlockAndRpc;
 
+pub enum HandleResult {
+    ActiveRequest(ActiveRequestHandle),
+    RetryAt(Instant),
+    None,
+}
+
 /// TODO: instead of an enum, I tried to use Box, but hit
 #[derive(From)]
 pub enum Web3Provider {
@@ -339,7 +345,7 @@ impl Web3Connection {
 
     #[instrument(skip_all)]
     async fn send_block_result(
-        self: &Arc<Self>,
+        self: Arc<Self>,
         block: Result<Block<TxHash>, ProviderError>,
         block_sender: &flume::Sender<BlockAndRpc>,
     ) -> anyhow::Result<()> {
@@ -355,12 +361,18 @@ impl Web3Connection {
                 }
 
                 block_sender
-                    .send_async((Arc::new(block), self.clone()))
+                    .send_async((Arc::new(block), self))
                     .await
                     .context("block_sender")?;
             }
             Err(e) => {
                 warn!("unable to get block from {}: {}", self, e);
+
+                // send an empty block to take this server out of rotation
+                block_sender
+                    .send_async((Arc::new(Block::default()), self))
+                    .await
+                    .context("block_sender")?;
             }
         }
 
@@ -452,25 +464,37 @@ impl Web3Connection {
 
            loop {
                match self.try_request_handle().await {
-                    Ok(active_request_handle) => {
+                    Ok(HandleResult::ActiveRequest(active_request_handle)) => {
                        let block: Result<Block<TxHash>, _> = active_request_handle
                            .request("eth_getBlockByNumber", ("latest", false))
                            .await;
 
                        if let Ok(block) = block {
                            // don't send repeat blocks
-                            let new_hash = block.hash.unwrap();
+                            let new_hash =
+                                block.hash.expect("blocks here should always have hashes");
 
                            if new_hash != last_hash {
                                last_hash = new_hash;
 
-                                self.send_block_result(Ok(block), &block_sender).await?;
+                                self.clone()
+                                    .send_block_result(Ok(block), &block_sender)
+                                    .await?;
                            }
                        } else {
-                            // we got an empty block back. thats not good
-                            self.send_block_result(block, &block_sender).await?;
+                            // we did not get a block back. something is up with the server. take it out of rotation
+                            self.clone().send_block_result(block, &block_sender).await?;
                        }
                    }
+                    Ok(HandleResult::RetryAt(retry_at)) => {
+                        warn!(?retry_at, "Rate limited on latest block from {}", self);
+                        sleep_until(retry_at).await;
+                        continue;
+                    }
+                    Ok(HandleResult::None) => {
+                        // TODO: what should we do?
+                        warn!("No handle for latest block from {}", self);
+                    }
                    Err(err) => {
                        warn!(?err, "Rate limited on latest block from {}", self);
                    }
@@ -507,10 +531,12 @@ impl Web3Connection {
                    .request("eth_getBlockByNumber", ("latest", false))
                    .await;
 
-                self.send_block_result(block, &block_sender).await?;
+                self.clone().send_block_result(block, &block_sender).await?;
 
                while let Some(new_block) = stream.next().await {
-                    self.send_block_result(Ok(new_block), &block_sender).await?;
+                    self.clone()
+                        .send_block_result(Ok(new_block), &block_sender)
+                        .await?;
                }
 
                warn!(?self, "subscription ended");
@@ -588,44 +614,54 @@ impl Web3Connection {
 
        loop {
            match self.try_request_handle().await {
-                Ok(pending_request_handle) => return Ok(pending_request_handle),
-                Err(Some(retry_after)) => {
-                    sleep(retry_after).await;
+                Ok(HandleResult::ActiveRequest(handle)) => return Ok(handle),
+                Ok(HandleResult::RetryAt(retry_at)) => {
+                    // TODO: emit a stat?
+                    sleep_until(retry_at).await;
                }
-                Err(None) => return Err(anyhow::anyhow!("rate limit will never succeed")),
+                Ok(HandleResult::None) => {
+                    // TODO: when can this happen? emit a stat?
+                    // TODO: instead of erroring, subscribe to the head block on this
+                    // TODO: sleep how long? maybe just error?
+                    sleep(Duration::from_secs(1)).await;
+                }
+                // Err(None) => return Err(anyhow::anyhow!("rate limit will never succeed")),
+                Err(err) => return Err(err),
            }
        }
    }
 
-    pub async fn try_request_handle(
-        self: &Arc<Self>,
-    ) -> Result<ActiveRequestHandle, Option<Duration>> {
+    pub async fn try_request_handle(self: &Arc<Self>) -> anyhow::Result<HandleResult> {
        // check that we are connected
        if !self.has_provider().await {
-            // TODO: how long? use the same amount as the exponential backoff on retry
-            return Err(Some(Duration::from_secs(1)));
+            // TODO: emit a stat?
+            return Ok(HandleResult::None);
        }
 
        // check rate limits
        if let Some(ratelimiter) = self.hard_limit.as_ref() {
            match ratelimiter.throttle().await {
-                Ok(_) => {
+                Ok(ThrottleResult::Allowed) => {
                    // rate limit succeeded
-                    return Ok(ActiveRequestHandle::new(self.clone()));
                }
-                Err(retry_after) => {
+                Ok(ThrottleResult::RetryAt(retry_at)) => {
                    // rate limit failed
                    // save the smallest retry_after. if nothing succeeds, return an Err with retry_after in it
                    // TODO: use tracing better
                    // TODO: i'm seeing "Exhausted rate limit on moralis: 0ns". How is it getting 0?
-                    warn!("Exhausted rate limit on {:?}: {:?}", self, retry_after);
+                    warn!(?retry_at, ?self, "Exhausted rate limit");
 
-                    return Err(retry_after);
+                    return Ok(HandleResult::RetryAt(retry_at.into()));
+                }
+                Err(err) => {
+                    return Err(err);
                }
            }
        };
 
-        Ok(ActiveRequestHandle::new(self.clone()))
+        let handle = ActiveRequestHandle::new(self.clone());
+
+        Ok(HandleResult::ActiveRequest(handle))
    }
 }
 
diff --git a/web3_proxy/src/connections.rs b/web3_proxy/src/connections.rs
index 3d5c4448..77536935 100644
--- a/web3_proxy/src/connections.rs
+++ b/web3_proxy/src/connections.rs
@@ -19,15 +19,15 @@ use serde_json::value::RawValue;
 use std::cmp;
 use std::fmt;
 use std::sync::Arc;
-use std::time::Duration;
 use tokio::sync::{broadcast, watch};
 use tokio::task;
-use tokio::time::{interval, sleep, MissedTickBehavior};
+use tokio::time::{interval, sleep, sleep_until, MissedTickBehavior};
+use tokio::time::{Duration, Instant};
 use tracing::{debug, error, info, instrument, trace, warn};
 
 use crate::app::{flatten_handle, AnyhowJoinHandle, TxState};
 use crate::config::Web3ConnectionConfig;
-use crate::connection::{ActiveRequestHandle, Web3Connection};
+use crate::connection::{ActiveRequestHandle, HandleResult, Web3Connection};
 use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
 
 // Serialize so we can print it on our debug endpoint
@@ -278,11 +278,15 @@ impl Web3Connections {
         // TODO: there is a race here on geth. sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!)
         // TODO: maximum wait time
         let pending_transaction: Transaction = match rpc.try_request_handle().await {
-            Ok(request_handle) => {
-                request_handle
+            Ok(HandleResult::ActiveRequest(handle)) => {
+                handle
                     .request("eth_getTransactionByHash", (pending_tx_id,))
                     .await?
             }
+            Ok(_) => {
+                // TODO: actually retry?
+                return Ok(None);
+            }
             Err(err) => {
                 trace!(
                     ?pending_tx_id,
@@ -897,8 +901,8 @@ impl Web3Connections {
         &self,
         skip: &[Arc<Web3Connection>],
         min_block_needed: Option<&U64>,
-    ) -> Result<ActiveRequestHandle, Option<Duration>> {
-        let mut earliest_retry_after = None;
+    ) -> anyhow::Result<HandleResult> {
+        let mut earliest_retry_at = None;
 
         // filter the synced rpcs
         // TODO: we are going to be checking "has_block_data" a lot now. i think we pretty much always have min_block_needed now that we override "latest"
@@ -923,7 +927,8 @@ impl Web3Connections {
         };
 
         if synced_rpcs.is_empty() {
-            return Err(None);
+            // TODO: what should happen here? might be nicer to retry in a second
+            return Err(anyhow::anyhow!("not synced"));
         }
 
         let sort_cache: HashMap<_, _> = synced_rpcs
@@ -953,29 +958,39 @@ impl Web3Connections {
         for rpc in synced_rpcs.into_iter() {
             // increment our connection counter
             match rpc.try_request_handle().await {
-                Err(retry_after) => {
-                    earliest_retry_after = earliest_retry_after.min(retry_after);
-                }
-                Ok(handle) => {
+                Ok(HandleResult::ActiveRequest(handle)) => {
                     trace!("next server on {:?}: {:?}", self, rpc);
-                    return Ok(handle);
+                    return Ok(HandleResult::ActiveRequest(handle));
+                }
+                Ok(HandleResult::RetryAt(retry_at)) => {
+                    earliest_retry_at = earliest_retry_at.min(Some(retry_at));
+                }
+                Ok(HandleResult::None) => {
+                    // TODO: log a warning?
+                }
+                Err(err) => {
+                    // TODO: log a warning?
+                    warn!(?err, "No request handle for {}", rpc)
                 }
             }
         }
 
-        warn!("no servers on {:?}! {:?}", self, earliest_retry_after);
+        warn!("no servers on {:?}! {:?}", self, earliest_retry_at);
 
-        // this might be None
-        Err(earliest_retry_after)
+        match earliest_retry_at {
+            None => todo!(),
+            Some(earliest_retry_at) => Ok(HandleResult::RetryAt(earliest_retry_at)),
+        }
     }
 
     /// get all rpc servers that are not rate limited
     /// returns servers even if they aren't in sync. This is useful for broadcasting signed transactions
+    // TODO: better type on this that can return an anyhow::Result
     pub async fn upstream_servers(
         &self,
         min_block_needed: Option<&U64>,
-    ) -> Result<Vec<ActiveRequestHandle>, Option<Duration>> {
-        let mut earliest_retry_after = None;
+    ) -> Result<Vec<ActiveRequestHandle>, Option<Instant>> {
+        let mut earliest_retry_at = None;
 
         // TODO: with capacity?
         let mut selected_rpcs = vec![];
@@ -988,11 +1003,17 @@ impl Web3Connections {
 
             // check rate limits and increment our connection counter
             match connection.try_request_handle().await {
-                Err(retry_after) => {
+                Ok(HandleResult::RetryAt(retry_at)) => {
                     // this rpc is not available. skip it
-                    earliest_retry_after = earliest_retry_after.min(retry_after);
+                    earliest_retry_at = earliest_retry_at.min(Some(retry_at));
+                }
+                Ok(HandleResult::ActiveRequest(handle)) => selected_rpcs.push(handle),
+                Ok(HandleResult::None) => {
+                    warn!("no request handle for {}", connection)
+                }
+                Err(err) => {
+                    warn!(?err, "error getting request handle for {}", connection)
                 }
-                Ok(handle) => selected_rpcs.push(handle),
             }
         }
 
@@ -1001,7 +1022,7 @@ impl Web3Connections {
         }
 
         // return the earliest retry_after (if no rpcs are synced, this will be None)
-        Err(earliest_retry_after)
+        Err(earliest_retry_at)
     }
 
     /// be sure there is a timeout on this or it might loop forever
@@ -1021,7 +1042,7 @@ impl Web3Connections {
                 .next_upstream_server(&skip_rpcs, min_block_needed)
                 .await
             {
-                Ok(active_request_handle) => {
+                Ok(HandleResult::ActiveRequest(active_request_handle)) => {
                     // save the rpc in case we get an error and want to retry on another server
                     skip_rpcs.push(active_request_handle.clone_connection());
 
@@ -1074,25 +1095,26 @@ impl Web3Connections {
                         }
                     }
                 }
-                Err(None) => {
-                    // TODO: is there some way to check if no servers will ever be in sync?
-                    warn!(?self, "No servers in sync!");
+                Ok(HandleResult::RetryAt(retry_at)) => {
+                    // TODO: move this to a helper function
+                    // sleep (TODO: with a lock?) until our rate limits should be available
+                    // TODO: if a server catches up sync while we are waiting, we could stop waiting
+                    warn!(?retry_at, "All rate limits exceeded. Sleeping");
+
+                    sleep_until(retry_at).await;
+
+                    continue;
+                }
+                Ok(HandleResult::None) => {
+                    warn!(?self, "No server handles!");
 
                     // TODO: subscribe to something on synced connections. maybe it should just be a watch channel
                     sleep(Duration::from_millis(200)).await;
 
                     continue;
-                    // return Err(anyhow::anyhow!("no servers in sync"));
                 }
-                Err(Some(retry_after)) => {
-                    // TODO: move this to a helper function
-                    // sleep (TODO: with a lock?) until our rate limits should be available
-                    // TODO: if a server catches up sync while we are waiting, we could stop waiting
-                    warn!(?retry_after, "All rate limits exceeded. Sleeping");
-
-                    sleep(retry_after).await;
-
-                    continue;
+                Err(err) => {
+                    return Err(err);
                 }
             }
         }
@@ -1141,13 +1163,13 @@ impl Web3Connections {
 
                     continue;
                 }
-                Err(Some(retry_after)) => {
+                Err(Some(retry_at)) => {
                     // TODO: move this to a helper function
                     // sleep (TODO: with a lock?) until our rate limits should be available
                     // TODO: if a server catches up sync while we are waiting, we could stop waiting
                     warn!("All rate limits exceeded. Sleeping");
 
-                    sleep(retry_after).await;
+                    sleep_until(retry_at).await;
 
                     continue;
                 }
diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs
index 3cc9f9b8..dac49bfc 100644
--- a/web3_proxy/src/frontend/mod.rs
+++ b/web3_proxy/src/frontend/mod.rs
@@ -162,9 +162,9 @@ pub async fn run(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()>
     info!("listening on port {}", port);
     // TODO: into_make_service is enough if we always run behind a proxy. make into_make_service_with_connect_info optional?
     axum::Server::bind(&addr)
-        // TODO: option to use with_connect_info
-        // .serve(app.into_make_service_with_connect_info::<SocketAddr>())
-        .serve(app.into_make_service())
+        // TODO: option to use with_connect_info. we want it in dev, but not when running behind a proxy, but not
+        .serve(app.into_make_service_with_connect_info::<SocketAddr>())
+        // .serve(app.into_make_service())
         .await
         .map_err(Into::into)
 }
diff --git a/web3_proxy/src/jsonrpc.rs b/web3_proxy/src/jsonrpc.rs
index 904c147b..d1cdd32b 100644
--- a/web3_proxy/src/jsonrpc.rs
+++ b/web3_proxy/src/jsonrpc.rs
@@ -165,20 +165,19 @@ impl fmt::Debug for JsonRpcForwardedResponse {
 
 impl JsonRpcForwardedResponse {
     pub fn from_anyhow_error(err: anyhow::Error, id: Box<RawValue>) -> Self {
-        let err = format!("{:?}", err);
-
         // TODO: this is too verbose. plenty of errors are valid, like users giving an invalid address. no need to log that
-        warn!("forwarding error. {:?}", err);
+        warn!(?err, "forwarding error");
 
         JsonRpcForwardedResponse {
             jsonrpc: "2.0".to_string(),
             id,
             result: None,
             error: Some(JsonRpcErrorData {
-                // TODO: set this jsonrpc error code to match the http status code
+                // TODO: set this jsonrpc error code to match the http status code? or maybe the other way around?
                 code: -32099,
                 // TODO: some errors should be included here. others should not. i think anyhow might not be the right choice
-                message: "internal server error".to_string(),
+                // message: "internal server error".to_string(),
+                message: format!("{:?}", err),
                 data: None,
             }),
         }