better results and errors

This commit is contained in:
Bryan Stitt 2022-08-07 06:48:57 +00:00
parent 33dda4dad9
commit 439e27101d
14 changed files with 175 additions and 99 deletions

4
Cargo.lock generated

@ -76,9 +76,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.59"
version = "1.0.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c91f1f46651137be86f3a2b9a8359f9ab421d04d941c62b5982e1ca21113adf9"
checksum = "c794e162a5eff65c72ef524dfe393eb923c354e350bb78b9c7383df13f3bc142"
dependencies = [
"backtrace",
]

@ -59,9 +59,10 @@
- [x] instead of tracking `pending_synced_connections`, have a mapping of where all connections are individually. then each change, re-check for consensus.
- [x] synced connections swap threshold set to 1 so that it always serves something
- [x] cli tool for creating new users
- [x] incoming rate limiting by api key
- [ ] after a refactor, public rate limit isnt working anymore. i set to 0 but could still request
- [ ] give users different rate limits looked up from the database
- [ ] basic request method stats
- [ ] incoming rate limiting by api key
- [ ] give users different rate limits looked up from the database
- [ ] allow blocking public requests
## V1
@ -108,6 +109,7 @@
- [ ] Public bsc server got “0” for block data limit (ninicoin)
- [ ] If we need an archive server and no servers in sync, exit immediately with an error instead of waiting 60 seconds
- [ ] 60 second timeout is too short. Maybe do that for free tier and larger timeout for paid. Problem is that some queries can take over 1000 seconds
- [ ] refactor from_anyhow_error to have consistent error codes and http codes
new endpoints for users:
- think about where to put this. a separate app might be better

@ -22,7 +22,7 @@ services:
dev-adminer:
image: adminer
ports:
- 8306:8080
- 18306:8080
environment:
ADMINER_DEFAULT_SERVER: dev-db

@ -15,6 +15,7 @@ pub struct Model {
pub description: Option<String>,
pub private_txs: bool,
pub active: bool,
pub requests_per_second: u32,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

@ -124,6 +124,12 @@ impl MigrationTrait for Migration {
.default(true)
.not_null(),
)
.col(
ColumnDef::new(UserKeys::RequestsPerMinute)
.unsigned()
.default(true)
.not_null(),
)
.index(sea_query::Index::create().col(UserKeys::Active))
.foreign_key(
sea_query::ForeignKey::create()
@ -199,8 +205,7 @@ enum BlockList {
-- TODO: what size for api_key
-- TODO: track active with a timestamp?
-- TODO: creation time?
-- TODO: requests_per_second INT,
-- TODO: requests_per_day INT,
-- TODO: requests_per_day? requests_per_second?,
-- TODO: more security features. likely similar to infura
*/
#[derive(Iden)]
@ -212,4 +217,5 @@ enum UserKeys {
Description,
PrivateTxs,
Active,
RequestsPerMinute,
}

@ -5,5 +5,5 @@ authors = ["Bryan Stitt <bryan@stitthappens.com>"]
edition = "2018"
[dependencies]
anyhow = "1.0.59"
anyhow = "1.0.60"
bb8-redis = "0.11.0"

@ -5,7 +5,8 @@ use bb8_redis::redis::cmd;
pub use bb8_redis::redis::RedisError;
pub use bb8_redis::{bb8, RedisConnectionManager};
use std::time::Duration;
use std::ops::Add;
use std::time::{Duration, Instant};
pub type RedisClientPool = bb8::Pool<RedisConnectionManager>;
@ -17,12 +18,17 @@ pub struct RedisCellClient {
default_period: u32,
}
pub enum ThrottleResult {
Allowed,
RetryAt(Instant),
}
impl RedisCellClient {
// todo: seems like this could be derived
// TODO: take something generic for conn
// TODO: use r2d2 for connection pooling?
pub fn new(
pool: bb8::Pool<RedisConnectionManager>,
pool: RedisClientPool,
app: &str,
key: &str,
default_max_burst: u32,
@ -48,13 +54,13 @@ impl RedisCellClient {
count_per_period: Option<u32>,
period: Option<u32>,
quantity: u32,
) -> Result<(), Option<Duration>> {
let mut conn = self.pool.get().await.unwrap();
) -> anyhow::Result<ThrottleResult> {
let mut conn = self.pool.get().await?;
let count_per_period = count_per_period.unwrap_or(self.default_count_per_period);
if count_per_period == 0 {
return Err(None);
return Ok(ThrottleResult::Allowed);
}
let max_burst = max_burst.unwrap_or(self.default_max_burst);
@ -78,24 +84,28 @@ impl RedisCellClient {
let x: Vec<isize> = cmd("CL.THROTTLE")
.arg(&(key, max_burst, count_per_period, period, quantity))
.query_async(&mut *conn)
.await
.unwrap();
.await?;
assert_eq!(x.len(), 5);
// TODO: trace log the result?
// TODO: trace log the result
if x.len() != 5 {
return Err(anyhow::anyhow!("unexpected redis result"));
}
let retry_after = *x.get(3).unwrap();
let retry_after = *x.get(3).expect("index exists above");
if retry_after == -1 {
Ok(())
Ok(ThrottleResult::Allowed)
} else {
Err(Some(Duration::from_secs(retry_after as u64)))
// don't return a duration, return an instant
let retry_at = Instant::now().add(Duration::from_secs(retry_after as u64));
Ok(ThrottleResult::RetryAt(retry_at))
}
}
#[inline]
pub async fn throttle(&self) -> Result<(), Option<Duration>> {
pub async fn throttle(&self) -> anyhow::Result<ThrottleResult> {
self._throttle(&self.key, None, None, None, 1).await
}
@ -106,7 +116,7 @@ impl RedisCellClient {
max_burst: Option<u32>,
count_per_period: Option<u32>,
period: Option<u32>,
) -> Result<(), Option<Duration>> {
) -> anyhow::Result<ThrottleResult> {
let key = format!("{}:{}", self.key, key);
self._throttle(key.as_ref(), max_burst, count_per_period, period, 1)
@ -114,7 +124,7 @@ impl RedisCellClient {
}
#[inline]
pub async fn throttle_quantity(&self, quantity: u32) -> Result<(), Option<Duration>> {
pub async fn throttle_quantity(&self, quantity: u32) -> anyhow::Result<ThrottleResult> {
self._throttle(&self.key, None, None, None, quantity).await
}
}

@ -16,7 +16,7 @@ redis-cell-client = { path = "../redis-cell-client" }
entities = { path = "../entities" }
migration = { path = "../migration" }
anyhow = { version = "1.0.59", features = ["backtrace"] }
anyhow = { version = "1.0.60", features = ["backtrace"] }
arc-swap = "1.5.1"
argh = "0.1.8"
axum = { version = "0.5.13", features = ["serde_json", "tokio-tungstenite", "ws"] }

@ -671,7 +671,7 @@ impl Web3ProxyApp {
&self,
request: JsonRpcRequestEnum,
) -> anyhow::Result<JsonRpcForwardedResponseEnum> {
debug!(?request, "proxy_web3_rpc");
trace!(?request, "proxy_web3_rpc");
// even though we have timeouts on the requests to our backend providers,
// we need a timeout for the incoming request so that retries don't run forever
@ -689,7 +689,7 @@ impl Web3ProxyApp {
),
};
debug!(?response, "Forwarding response");
trace!(?response, "Forwarding");
Ok(response)
}

@ -12,8 +12,8 @@ use parking_lot::deadlock;
use std::fs;
use std::sync::atomic::{self, AtomicUsize};
use std::thread;
use std::time::Duration;
use tokio::runtime;
use tokio::time::Duration;
use tracing::{debug, info};
use tracing_subscriber::EnvFilter;
use web3_proxy::app::{flatten_handle, Web3ProxyApp};

@ -4,7 +4,7 @@ use derive_more::From;
use ethers::prelude::{Block, Bytes, Middleware, ProviderError, TxHash, H256, U64};
use futures::future::try_join_all;
use futures::StreamExt;
use redis_cell_client::RedisCellClient;
use redis_cell_client::{RedisCellClient, ThrottleResult};
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use std::fmt;
@ -13,12 +13,18 @@ use std::sync::atomic::{self, AtomicU32, AtomicU64};
use std::{cmp::Ordering, sync::Arc};
use tokio::sync::broadcast;
use tokio::sync::RwLock;
use tokio::time::{interval, sleep, Duration, MissedTickBehavior};
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
use tracing::{error, info, info_span, instrument, trace, warn, Instrument};
use crate::app::{flatten_handle, AnyhowJoinHandle};
use crate::config::BlockAndRpc;
pub enum HandleResult {
ActiveRequest(ActiveRequestHandle),
RetryAt(Instant),
None,
}
/// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit <https://github.com/gakonst/ethers-rs/issues/592>
#[derive(From)]
pub enum Web3Provider {
@ -339,7 +345,7 @@ impl Web3Connection {
#[instrument(skip_all)]
async fn send_block_result(
self: &Arc<Self>,
self: Arc<Self>,
block: Result<Block<TxHash>, ProviderError>,
block_sender: &flume::Sender<BlockAndRpc>,
) -> anyhow::Result<()> {
@ -355,12 +361,18 @@ impl Web3Connection {
}
block_sender
.send_async((Arc::new(block), self.clone()))
.send_async((Arc::new(block), self))
.await
.context("block_sender")?;
}
Err(e) => {
warn!("unable to get block from {}: {}", self, e);
// send an empty block to take this server out of rotation
block_sender
.send_async((Arc::new(Block::default()), self))
.await
.context("block_sender")?;
}
}
@ -452,25 +464,37 @@ impl Web3Connection {
loop {
match self.try_request_handle().await {
Ok(active_request_handle) => {
Ok(HandleResult::ActiveRequest(active_request_handle)) => {
let block: Result<Block<TxHash>, _> = active_request_handle
.request("eth_getBlockByNumber", ("latest", false))
.await;
if let Ok(block) = block {
// don't send repeat blocks
let new_hash = block.hash.unwrap();
let new_hash =
block.hash.expect("blocks here should always have hashes");
if new_hash != last_hash {
last_hash = new_hash;
self.send_block_result(Ok(block), &block_sender).await?;
self.clone()
.send_block_result(Ok(block), &block_sender)
.await?;
}
} else {
// we got an empty block back. thats not good
self.send_block_result(block, &block_sender).await?;
// we did not get a block back. something is up with the server. take it out of rotation
self.clone().send_block_result(block, &block_sender).await?;
}
}
Ok(HandleResult::RetryAt(retry_at)) => {
warn!(?retry_at, "Rate limited on latest block from {}", self);
sleep_until(retry_at).await;
continue;
}
Ok(HandleResult::None) => {
// TODO: what should we do?
warn!("No handle for latest block from {}", self);
}
Err(err) => {
warn!(?err, "Rate limited on latest block from {}", self);
}
@ -507,10 +531,12 @@ impl Web3Connection {
.request("eth_getBlockByNumber", ("latest", false))
.await;
self.send_block_result(block, &block_sender).await?;
self.clone().send_block_result(block, &block_sender).await?;
while let Some(new_block) = stream.next().await {
self.send_block_result(Ok(new_block), &block_sender).await?;
self.clone()
.send_block_result(Ok(new_block), &block_sender)
.await?;
}
warn!(?self, "subscription ended");
@ -588,44 +614,54 @@ impl Web3Connection {
loop {
match self.try_request_handle().await {
Ok(pending_request_handle) => return Ok(pending_request_handle),
Err(Some(retry_after)) => {
sleep(retry_after).await;
Ok(HandleResult::ActiveRequest(handle)) => return Ok(handle),
Ok(HandleResult::RetryAt(retry_at)) => {
// TODO: emit a stat?
sleep_until(retry_at).await;
}
Err(None) => return Err(anyhow::anyhow!("rate limit will never succeed")),
Ok(HandleResult::None) => {
// TODO: when can this happen? emit a stat?
// TODO: instead of erroring, subscribe to the head block on this
// TODO: sleep how long? maybe just error?
sleep(Duration::from_secs(1)).await;
}
// Err(None) => return Err(anyhow::anyhow!("rate limit will never succeed")),
Err(err) => return Err(err),
}
}
}
pub async fn try_request_handle(
self: &Arc<Self>,
) -> Result<ActiveRequestHandle, Option<Duration>> {
pub async fn try_request_handle(self: &Arc<Self>) -> anyhow::Result<HandleResult> {
// check that we are connected
if !self.has_provider().await {
// TODO: how long? use the same amount as the exponential backoff on retry
return Err(Some(Duration::from_secs(1)));
// TODO: emit a stat?
return Ok(HandleResult::None);
}
// check rate limits
if let Some(ratelimiter) = self.hard_limit.as_ref() {
match ratelimiter.throttle().await {
Ok(_) => {
Ok(ThrottleResult::Allowed) => {
// rate limit succeeded
return Ok(ActiveRequestHandle::new(self.clone()));
}
Err(retry_after) => {
Ok(ThrottleResult::RetryAt(retry_at)) => {
// rate limit failed
// save the smallest retry_after. if nothing succeeds, return an Err with retry_after in it
// TODO: use tracing better
// TODO: i'm seeing "Exhausted rate limit on moralis: 0ns". How is it getting 0?
warn!("Exhausted rate limit on {:?}: {:?}", self, retry_after);
warn!(?retry_at, ?self, "Exhausted rate limit");
return Err(retry_after);
return Ok(HandleResult::RetryAt(retry_at.into()));
}
Err(err) => {
return Err(err);
}
}
};
Ok(ActiveRequestHandle::new(self.clone()))
let handle = ActiveRequestHandle::new(self.clone());
Ok(HandleResult::ActiveRequest(handle))
}
}

@ -19,15 +19,15 @@ use serde_json::value::RawValue;
use std::cmp;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, watch};
use tokio::task;
use tokio::time::{interval, sleep, MissedTickBehavior};
use tokio::time::{interval, sleep, sleep_until, MissedTickBehavior};
use tokio::time::{Duration, Instant};
use tracing::{debug, error, info, instrument, trace, warn};
use crate::app::{flatten_handle, AnyhowJoinHandle, TxState};
use crate::config::Web3ConnectionConfig;
use crate::connection::{ActiveRequestHandle, Web3Connection};
use crate::connection::{ActiveRequestHandle, HandleResult, Web3Connection};
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
// Serialize so we can print it on our debug endpoint
@ -278,11 +278,15 @@ impl Web3Connections {
// TODO: there is a race here on geth. sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!)
// TODO: maximum wait time
let pending_transaction: Transaction = match rpc.try_request_handle().await {
Ok(request_handle) => {
request_handle
Ok(HandleResult::ActiveRequest(handle)) => {
handle
.request("eth_getTransactionByHash", (pending_tx_id,))
.await?
}
Ok(_) => {
// TODO: actually retry?
return Ok(None);
}
Err(err) => {
trace!(
?pending_tx_id,
@ -897,8 +901,8 @@ impl Web3Connections {
&self,
skip: &[Arc<Web3Connection>],
min_block_needed: Option<&U64>,
) -> Result<ActiveRequestHandle, Option<Duration>> {
let mut earliest_retry_after = None;
) -> anyhow::Result<HandleResult> {
let mut earliest_retry_at = None;
// filter the synced rpcs
// TODO: we are going to be checking "has_block_data" a lot now. i think we pretty much always have min_block_needed now that we override "latest"
@ -923,7 +927,8 @@ impl Web3Connections {
};
if synced_rpcs.is_empty() {
return Err(None);
// TODO: what should happen here? might be nicer to retry in a second
return Err(anyhow::anyhow!("not synced"));
}
let sort_cache: HashMap<_, _> = synced_rpcs
@ -953,29 +958,39 @@ impl Web3Connections {
for rpc in synced_rpcs.into_iter() {
// increment our connection counter
match rpc.try_request_handle().await {
Err(retry_after) => {
earliest_retry_after = earliest_retry_after.min(retry_after);
}
Ok(handle) => {
Ok(HandleResult::ActiveRequest(handle)) => {
trace!("next server on {:?}: {:?}", self, rpc);
return Ok(handle);
return Ok(HandleResult::ActiveRequest(handle));
}
Ok(HandleResult::RetryAt(retry_at)) => {
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
}
Ok(HandleResult::None) => {
// TODO: log a warning?
}
Err(err) => {
// TODO: log a warning?
warn!(?err, "No request handle for {}", rpc)
}
}
}
warn!("no servers on {:?}! {:?}", self, earliest_retry_after);
warn!("no servers on {:?}! {:?}", self, earliest_retry_at);
// this might be None
Err(earliest_retry_after)
match earliest_retry_at {
None => todo!(),
Some(earliest_retry_at) => Ok(HandleResult::RetryAt(earliest_retry_at)),
}
}
/// get all rpc servers that are not rate limited
/// returns servers even if they aren't in sync. This is useful for broadcasting signed transactions
// TODO: better type on this that can return an anyhow::Result
pub async fn upstream_servers(
&self,
min_block_needed: Option<&U64>,
) -> Result<Vec<ActiveRequestHandle>, Option<Duration>> {
let mut earliest_retry_after = None;
) -> Result<Vec<ActiveRequestHandle>, Option<Instant>> {
let mut earliest_retry_at = None;
// TODO: with capacity?
let mut selected_rpcs = vec![];
@ -988,11 +1003,17 @@ impl Web3Connections {
// check rate limits and increment our connection counter
match connection.try_request_handle().await {
Err(retry_after) => {
Ok(HandleResult::RetryAt(retry_at)) => {
// this rpc is not available. skip it
earliest_retry_after = earliest_retry_after.min(retry_after);
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
}
Ok(HandleResult::ActiveRequest(handle)) => selected_rpcs.push(handle),
Ok(HandleResult::None) => {
warn!("no request handle for {}", connection)
}
Err(err) => {
warn!(?err, "error getting request handle for {}", connection)
}
Ok(handle) => selected_rpcs.push(handle),
}
}
@ -1001,7 +1022,7 @@ impl Web3Connections {
}
// return the earliest retry_after (if no rpcs are synced, this will be None)
Err(earliest_retry_after)
Err(earliest_retry_at)
}
/// be sure there is a timeout on this or it might loop forever
@ -1021,7 +1042,7 @@ impl Web3Connections {
.next_upstream_server(&skip_rpcs, min_block_needed)
.await
{
Ok(active_request_handle) => {
Ok(HandleResult::ActiveRequest(active_request_handle)) => {
// save the rpc in case we get an error and want to retry on another server
skip_rpcs.push(active_request_handle.clone_connection());
@ -1074,25 +1095,26 @@ impl Web3Connections {
}
}
}
Err(None) => {
// TODO: is there some way to check if no servers will ever be in sync?
warn!(?self, "No servers in sync!");
Ok(HandleResult::RetryAt(retry_at)) => {
// TODO: move this to a helper function
// sleep (TODO: with a lock?) until our rate limits should be available
// TODO: if a server catches up sync while we are waiting, we could stop waiting
warn!(?retry_at, "All rate limits exceeded. Sleeping");
sleep_until(retry_at).await;
continue;
}
Ok(HandleResult::None) => {
warn!(?self, "No server handles!");
// TODO: subscribe to something on synced connections. maybe it should just be a watch channel
sleep(Duration::from_millis(200)).await;
continue;
// return Err(anyhow::anyhow!("no servers in sync"));
}
Err(Some(retry_after)) => {
// TODO: move this to a helper function
// sleep (TODO: with a lock?) until our rate limits should be available
// TODO: if a server catches up sync while we are waiting, we could stop waiting
warn!(?retry_after, "All rate limits exceeded. Sleeping");
sleep(retry_after).await;
continue;
Err(err) => {
return Err(err);
}
}
}
@ -1141,13 +1163,13 @@ impl Web3Connections {
continue;
}
Err(Some(retry_after)) => {
Err(Some(retry_at)) => {
// TODO: move this to a helper function
// sleep (TODO: with a lock?) until our rate limits should be available
// TODO: if a server catches up sync while we are waiting, we could stop waiting
warn!("All rate limits exceeded. Sleeping");
sleep(retry_after).await;
sleep_until(retry_at).await;
continue;
}

@ -162,9 +162,9 @@ pub async fn run(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()>
info!("listening on port {}", port);
// TODO: into_make_service is enough if we always run behind a proxy. make into_make_service_with_connect_info optional?
axum::Server::bind(&addr)
// TODO: option to use with_connect_info
// .serve(app.into_make_service_with_connect_info::<SocketAddr>())
.serve(app.into_make_service())
// TODO: option to use with_connect_info. we want it in dev, but not when running behind a proxy, but not
.serve(app.into_make_service_with_connect_info::<SocketAddr>())
// .serve(app.into_make_service())
.await
.map_err(Into::into)
}

@ -165,20 +165,19 @@ impl fmt::Debug for JsonRpcForwardedResponse {
impl JsonRpcForwardedResponse {
pub fn from_anyhow_error(err: anyhow::Error, id: Box<RawValue>) -> Self {
let err = format!("{:?}", err);
// TODO: this is too verbose. plenty of errors are valid, like users giving an invalid address. no need to log that
warn!("forwarding error. {:?}", err);
warn!(?err, "forwarding error");
JsonRpcForwardedResponse {
jsonrpc: "2.0".to_string(),
id,
result: None,
error: Some(JsonRpcErrorData {
// TODO: set this jsonrpc error code to match the http status code
// TODO: set this jsonrpc error code to match the http status code? or maybe the other way around?
code: -32099,
// TODO: some errors should be included here. others should not. i think anyhow might not be the right choice
message: "internal server error".to_string(),
// message: "internal server error".to_string(),
message: format!("{:?}", err),
data: None,
}),
}