make it work

This commit is contained in:
Bryan Stitt 2022-08-06 05:26:43 +00:00
parent 2d6c1dfb16
commit 3d67fcf74c
8 changed files with 67 additions and 39 deletions

@ -206,3 +206,5 @@ in another repo: event subscriber
- [ ] better error handling. we warn too often for validation errors and use the same error code for most every request
- [ ] use &str more instead of String. lifetime annotations get really annoying though
- [ ] tarpit instead of reject requests (unless theres a lot)
- [ ] tune database connection pool size. i think a single web3-proxy currently maxes out our server
- [ ] subscribing to transactions should be configurable per server. listening to paid servers can get expensive

@ -2,7 +2,7 @@
chain_id = 1
db_url = "mysql://root:dev_web3_proxy@dev-mysql:3306/dev_web3_proxy"
redis_url = "redis://dev-redis:6379/"
public_rate_limit_per_minute = 60_000
public_rate_limit_per_minute = 0
# 1GB of cache
response_cache_max_bytes = 10000000000

@ -48,11 +48,16 @@ impl RedisCellClient {
count_per_period: Option<u32>,
period: Option<u32>,
quantity: u32,
) -> Result<(), Duration> {
) -> Result<(), Option<Duration>> {
let mut conn = self.pool.get().await.unwrap();
let max_burst = max_burst.unwrap_or(self.default_max_burst);
let count_per_period = count_per_period.unwrap_or(self.default_count_per_period);
if count_per_period == 0 {
return Err(None);
}
let max_burst = max_burst.unwrap_or(self.default_max_burst);
let period = period.unwrap_or(self.default_period);
/*
@ -85,24 +90,31 @@ impl RedisCellClient {
if retry_after == -1 {
Ok(())
} else {
Err(Duration::from_secs(retry_after as u64))
Err(Some(Duration::from_secs(retry_after as u64)))
}
}
#[inline]
pub async fn throttle(&self) -> Result<(), Duration> {
pub async fn throttle(&self) -> Result<(), Option<Duration>> {
self._throttle(&self.key, None, None, None, 1).await
}
#[inline]
pub async fn throttle_key(&self, key: &str) -> Result<(), Duration> {
pub async fn throttle_key(
&self,
key: &str,
max_burst: Option<u32>,
count_per_period: Option<u32>,
period: Option<u32>,
) -> Result<(), Option<Duration>> {
let key = format!("{}:{}", self.key, key);
self._throttle(key.as_ref(), None, None, None, 1).await
self._throttle(key.as_ref(), max_burst, count_per_period, period, 1)
.await
}
#[inline]
pub async fn throttle_quantity(&self, quantity: u32) -> Result<(), Duration> {
pub async fn throttle_quantity(&self, quantity: u32) -> Result<(), Option<Duration>> {
self._throttle(&self.key, None, None, None, quantity).await
}
}

@ -435,20 +435,16 @@ impl Web3ProxyApp {
// TODO: how much should we allow?
let public_max_burst = app_config.shared.public_rate_limit_per_minute / 3;
let public_rate_limiter = if app_config.shared.public_rate_limit_per_minute == 0 {
None
} else {
redis_client_pool.as_ref().map(|redis_client_pool| {
RedisCellClient::new(
redis_client_pool.clone(),
"web3-proxy",
"ip",
public_max_burst,
app_config.shared.public_rate_limit_per_minute,
60,
)
})
};
let frontend_rate_limiter = redis_client_pool.as_ref().map(|redis_client_pool| {
RedisCellClient::new(
redis_client_pool.clone(),
"web3-proxy",
"frontend",
public_max_burst,
app_config.shared.public_rate_limit_per_minute,
60,
)
});
let app = Self {
balanced_rpcs,
@ -459,7 +455,7 @@ impl Web3ProxyApp {
head_block_receiver,
pending_tx_sender,
pending_transactions,
rate_limiter: public_rate_limiter,
rate_limiter: frontend_rate_limiter,
db_conn,
};

@ -179,7 +179,7 @@ impl Web3Connection {
// TODO: this will wait forever. do we want that?
let found_chain_id: Result<U64, _> = new_connection
.wait_for_request_handle()
.await
.await?
.request("eth_chainId", Option::None::<()>)
.await;
@ -243,7 +243,7 @@ impl Web3Connection {
let archive_result: Result<Bytes, _> = new_connection
.wait_for_request_handle()
.await
.await?
.request(
"eth_getCode",
(
@ -503,7 +503,7 @@ impl Web3Connection {
// all it does is print "new block" for the same block as current block
let block: Result<Block<TxHash>, _> = self
.wait_for_request_handle()
.await
.await?
.request("eth_getBlockByNumber", ("latest", false))
.await;
@ -583,24 +583,27 @@ impl Web3Connection {
/// be careful with this; it will wait forever!
#[instrument(skip_all)]
pub async fn wait_for_request_handle(self: &Arc<Self>) -> ActiveRequestHandle {
pub async fn wait_for_request_handle(self: &Arc<Self>) -> anyhow::Result<ActiveRequestHandle> {
// TODO: maximum wait time? i think timeouts in other parts of the code are probably best
loop {
match self.try_request_handle().await {
Ok(pending_request_handle) => return pending_request_handle,
Err(retry_after) => {
Ok(pending_request_handle) => return Ok(pending_request_handle),
Err(Some(retry_after)) => {
sleep(retry_after).await;
}
Err(None) => return Err(anyhow::anyhow!("rate limit will never succeed")),
}
}
}
pub async fn try_request_handle(self: &Arc<Self>) -> Result<ActiveRequestHandle, Duration> {
pub async fn try_request_handle(
self: &Arc<Self>,
) -> Result<ActiveRequestHandle, Option<Duration>> {
// check that we are connected
if !self.has_provider().await {
// TODO: how long? use the same amount as the exponential backoff on retry
return Err(Duration::from_secs(1));
return Err(Some(Duration::from_secs(1)));
}
// check rate limits

@ -954,7 +954,7 @@ impl Web3Connections {
// increment our connection counter
match rpc.try_request_handle().await {
Err(retry_after) => {
earliest_retry_after = earliest_retry_after.min(Some(retry_after));
earliest_retry_after = earliest_retry_after.min(retry_after);
}
Ok(handle) => {
trace!("next server on {:?}: {:?}", self, rpc);
@ -990,7 +990,7 @@ impl Web3Connections {
match connection.try_request_handle().await {
Err(retry_after) => {
// this rpc is not available. skip it
earliest_retry_after = earliest_retry_after.min(Some(retry_after));
earliest_retry_after = earliest_retry_after.min(retry_after);
}
Ok(handle) => selected_rpcs.push(handle),
}

@ -13,7 +13,7 @@ pub async fn handler_404() -> impl IntoResponse {
/// TODO: use this. i can't get <https://docs.rs/axum/latest/axum/error_handling/index.html> to work
/// TODO: i think we want a custom result type instead. put the anyhow result inside. then `impl IntoResponse for CustomResult`
pub async fn handle_anyhow_error(
code: Option<StatusCode>,
http_code: Option<StatusCode>,
id: Option<Box<RawValue>>,
err: anyhow::Error,
) -> impl IntoResponse {
@ -25,7 +25,7 @@ pub async fn handle_anyhow_error(
// TODO: logs here are too verbose. emit a stat
// warn!("Responding with error: {:?}", err);
let code = code.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
let code = http_code.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
(code, Json(err))
}

@ -30,14 +30,18 @@ pub async fn rate_limit_by_ip(app: &Web3ProxyApp, ip: &IpAddr) -> Result<(), imp
// TODO: dry this up with rate_limit_by_key
if let Some(rate_limiter) = app.rate_limiter() {
if rate_limiter.throttle_key(&rate_limiter_key).await.is_err() {
if rate_limiter
.throttle_key(&rate_limiter_key, None, None, None)
.await
.is_err()
{
// TODO: set headers so they know when they can retry
// warn!(?ip, "public rate limit exceeded"); // this is too verbose, but a stat might be good
// TODO: use their id if possible
return Err(handle_anyhow_error(
Some(StatusCode::TOO_MANY_REQUESTS),
None,
anyhow::anyhow!("too many requests"),
anyhow::anyhow!("too many requests from this ip"),
)
.await
.into_response());
@ -76,8 +80,19 @@ pub async fn rate_limit_by_key(
Ok::<Option<i64>, _>(Some(_)) => {
// user key is valid
if let Some(rate_limiter) = app.rate_limiter() {
// TODO: check the db for this? maybe add to the find above with a join?
let user_count_per_period = 100_000;
// TODO: how does max burst actually work? what should it be?
let user_max_burst = user_count_per_period;
let user_period = 1;
if rate_limiter
.throttle_key(&user_key.to_string())
.throttle_key(
&user_key.to_string(),
Some(user_max_burst),
Some(user_count_per_period),
Some(user_period),
)
.await
.is_err()
{
@ -87,7 +102,7 @@ pub async fn rate_limit_by_key(
return Err(handle_anyhow_error(
Some(StatusCode::TOO_MANY_REQUESTS),
None,
anyhow::anyhow!("too many requests"),
anyhow::anyhow!("too many requests from this key"),
)
.await
.into_response());