no timeouts here, we already have a timeout on requests

Bryan Stitt 2022-09-20 03:26:12 +00:00
parent e35a4119e7
commit dfd6aed6e6
6 changed files with 45 additions and 39 deletions
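The request-level timeout that the commit message leans on is not part of this diff; a minimal sketch of that pattern, where `make_request` and the 60-second budget are illustrative assumptions rather than the project's actual code:

// hedged sketch: wrap the backend call in tokio::time::timeout so the
// connection pool does not need its own wait timeout
use std::time::Duration;
use tokio::time::timeout;

async fn send_with_timeout() -> anyhow::Result<String> {
    // both the 60s budget and make_request() are placeholders
    let response = timeout(Duration::from_secs(60), make_request()).await??;
    Ok(response)
}

async fn make_request() -> anyhow::Result<String> {
    Ok("0x1".to_string())
}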

@@ -147,12 +147,11 @@ These are roughly in order of completion
- include the old head number and block in the log
- [x] exponential backoff when reconnecting a connection
- [x] once the merge happens, we don't want to use total difficulty and instead just care about the number
- [-] rewrite rate limiting to have a tiered cache. do not put redis in the hot path
- [x] rewrite rate limiting to have a tiered cache. do not put redis in the hot path
- instead, we should check a local cache for the current rate limit (+1) and spawn an update to the local cache from redis in the background (see the sketch after this list).
- [x] when there are a LOT of concurrent requests, we see errors. i thought that was a problem with redis cell, but it happens with my simpler rate limit. now i think the problem is actually with bb8
- https://docs.rs/redis/latest/redis/aio/struct.ConnectionManager.html or https://crates.io/crates/deadpool-redis?
- WARN http_request: redis_rate_limit::errors: redis error err=Response was of incompatible type: "Response type not string compatible." (response was int(500237)) id=01GC6514JWN5PS1NCWJCGJTC94 method=POST
- [ ] bring back redis-cell?
- [ ] web3_proxy_error_count{path = "backend_rpc/request"} is inflated by a bunch of reverts. do not log reverts as warn.
- erigon gives `method=eth_call reqid=986147 t=1.151551ms err="execution reverted"`
- [ ] opt-in debug mode that inspects responses for reverts and saves the request to the database for the user
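A rough sketch of the tiered rate limit check described in the item above: the local cache answers immediately, and redis is only consulted from a background task so it stays out of the hot path (the cache layout and the `rrl` limiter named below are assumptions, not the project's exact types):

use moka::future::Cache;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

// hypothetical per-key counter; redis stays the authoritative count
async fn throttle(
    local_cache: &Cache<String, Arc<AtomicU64>>,
    key: String,
    max_per_period: u64,
) -> bool {
    let counter = local_cache
        .get_with(key.clone(), async { Arc::new(AtomicU64::new(0)) })
        .await;

    // answer from the local cache first (+1); never block the request on redis
    let used = counter.fetch_add(1, Ordering::AcqRel) + 1;

    // refresh the local count from redis in the background; redis errors get
    // logged there instead of failing the user's request
    tokio::spawn(async move {
        // placeholder for the redis round trip, e.g. rrl.throttle_label(&key, ...),
        // whose returned count would overwrite this local counter
        let _ = (key, counter);
    });

    used <= max_per_period
}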

@@ -52,7 +52,7 @@ where
/// TODO: max_per_period being None means two things. some places it means unlimited, but here it means to use the default. make an enum
pub async fn throttle(
&self,
key: &K,
key: K,
max_per_period: Option<u64>,
count: u64,
) -> anyhow::Result<DeferredRateLimitResult> {
@@ -76,7 +76,7 @@ where
// set arc_deferred_rate_limit_result and return the count
self.local_cache
.get_with(*key, async move {
.get_with(key, async move {
// we do not use the try operator here because we want to be okay with redis errors
let redis_count = match rrl
.throttle_label(&redis_key, Some(max_per_period), count)
@@ -167,9 +167,9 @@ where
Err(err) => {
// don't let redis errors block our users!
error!(
// ?key, // TODO: this errors
?key,
?err,
"unable to query rate limits. local cache available"
"unable to query rate limits, but local cache is available"
);
// TODO: we need to start a timer that resets this count every minute
DeferredRateLimitResult::Allowed
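The "start a timer that resets this count every minute" TODO could be a plain interval task; a hedged sketch, with the counter wiring assumed rather than taken from the code:

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;

// clear a local rate limit counter on a fixed one-minute period
fn spawn_minute_reset(counter: Arc<AtomicU64>) {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(60));
        loop {
            interval.tick().await;
            counter.store(0, Ordering::Release);
        }
    });
}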

@@ -39,7 +39,7 @@ use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, watch};
use tokio::task::JoinHandle;
use tokio::time::{timeout};
use tokio::time::timeout;
use tokio_stream::wrappers::{BroadcastStream, WatchStream};
use tracing::{error, info, info_span, instrument, trace, warn, Instrument};
use uuid::Uuid;
@@ -166,7 +166,7 @@ impl Web3ProxyApp {
let open_request_handle_metrics: Arc<OpenRequestHandleMetrics> = Default::default();
// first, we connect to mysql and make sure the latest migrations have run
let db_conn = if let Some(db_url) = &top_config.app.db_url {
let db_conn = if let Some(db_url) = top_config.app.db_url.clone() {
let db_min_connections = top_config
.app
.db_min_connections
@@ -178,8 +178,7 @@ impl Web3ProxyApp {
.db_max_connections
.unwrap_or(db_min_connections * 2);
let db =
get_migrated_db(db_url.clone(), db_min_connections, redis_max_connections).await?;
let db = get_migrated_db(db_url, db_min_connections, redis_max_connections).await?;
Some(db)
} else {
@@ -220,17 +219,15 @@ impl Web3ProxyApp {
.redis_max_connections
.unwrap_or(num_workers * 2);
// TODO: what are reasonable timeouts
// TODO: what are reasonable timeouts?
// TODO: set a wait timeout? maybe somehow just emit a warning if this is long
let redis_pool = RedisConfig::from_url(redis_url)
.builder()?
.create_timeout(Some(Duration::from_secs(5)))
.max_size(redis_max_connections)
.recycle_timeout(Some(Duration::from_secs(5)))
.runtime(DeadpoolRuntime::Tokio1)
.build()?;
// test the pool
// test the redis pool
if let Err(err) = redis_pool.get().await {
error!(
?err,
@@ -484,7 +481,8 @@ impl Web3ProxyApp {
},
});
let msg = Message::Text(serde_json::to_string(&msg).unwrap());
let msg =
Message::Text(serde_json::to_string(&msg).expect("we made this `msg`"));
if response_sender.send_async(msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
@@ -526,7 +524,9 @@ impl Web3ProxyApp {
},
});
let msg = Message::Text(serde_json::to_string(&msg).unwrap());
let msg = Message::Text(
serde_json::to_string(&msg).expect("we made this message"),
);
if response_sender.send_async(msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
@@ -667,6 +667,7 @@ impl Web3ProxyApp {
trace!("Received request: {:?}", request);
// save the id so we can attach it to the response
// TODO: instead of cloning, take the id out
let request_id = request.id.clone();
// TODO: if eth_chainId or net_version, serve those without querying the backend
@@ -834,7 +835,11 @@ impl Web3ProxyApp {
return Err(anyhow::anyhow!("invalid request"));
}
let param = Bytes::from_str(params[0].as_str().unwrap())?;
let param = Bytes::from_str(
params[0]
.as_str()
.context("parsing params 0 into str then bytes")?,
)?;
let hash = H256::from(keccak256(param));
@@ -881,6 +886,8 @@ impl Web3ProxyApp {
request.params.clone().map(|x| x.to_string()),
);
// TODO: remove their request id instead of cloning it
// TODO: move this caching outside this match and cache some of the other responses?
// TODO: cache the warp::reply to save us serializing every time?
let mut response = self
@@ -888,15 +895,24 @@ impl Web3ProxyApp {
.try_get_with(cache_key, async move {
// TODO: retry some failures automatically!
// TODO: try private_rpcs if all the balanced_rpcs fail!
self.balanced_rpcs
// TODO: put the hash here instead?
let mut response = self
.balanced_rpcs
.try_send_best_upstream_server(request, Some(&request_block_id.num))
.await
.await?;
// discard their id by replacing it with an empty
response.id = Default::default();
Ok::<_, anyhow::Error>(response)
})
.await
.unwrap();
.map_err(|err| Arc::try_unwrap(err).expect("this should be the only reference"))
.context("caching response")?;
// since this data came out of a cache, the id is likely wrong.
// since this data likely came out of a cache, the id is not going to match
// replace the id with our request's id.
// TODO: cache without the id
response.id = request_id;
return Ok(response);
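A condensed sketch of the caching pattern above: `try_get_with` deduplicates concurrent lookups and hands init errors back wrapped in an `Arc`, the entry is cached with a blank id, and the caller's id is restored afterwards. Types are simplified placeholders, and the error is flattened to a string here instead of the `Arc::try_unwrap` used in the diff:

use anyhow::Context;
use moka::future::Cache;
use std::sync::Arc;

#[derive(Clone)]
struct JsonRpcResponse {
    id: String,
    result: String,
}

async fn cached_call(
    cache: &Cache<String, JsonRpcResponse>,
    cache_key: String,
    request_id: String,
) -> anyhow::Result<JsonRpcResponse> {
    let mut response = cache
        .try_get_with(cache_key, async move {
            // placeholder for try_send_best_upstream_server(...)
            let mut r = JsonRpcResponse { id: "upstream-id".into(), result: "0x1".into() };
            // discard the upstream id so the cached entry is request-agnostic
            r.id = Default::default();
            Ok::<_, anyhow::Error>(r)
        })
        .await
        // moka wraps the init error in an Arc; flatten it back into anyhow::Error
        .map_err(|err: Arc<anyhow::Error>| anyhow::anyhow!(err.to_string()))
        .context("caching response")?;

    // the cached id will not match this request, so put the caller's id back
    response.id = request_id;
    Ok(response)
}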

@@ -36,19 +36,19 @@ impl IntoResponse for FrontendErrorResponse {
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcForwardedResponse::from_string(
// TODO: is it safe to expose all our anyhow strings?
// TODO: is it safe to expose all of our anyhow strings?
err.to_string(),
Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
None,
),
)
}
// TODO: make this better
Self::Box(err) => {
warn!(?err, "boxed");
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcForwardedResponse::from_str(
// TODO: make this better. maybe include the error type?
"boxed error!",
Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
None,
@@ -66,17 +66,6 @@ impl IntoResponse for FrontendErrorResponse {
),
)
}
// Self::RedisRun(err) => {
// warn!(?err, "redis run");
// (
// StatusCode::INTERNAL_SERVER_ERROR,
// JsonRpcForwardedResponse::from_str(
// "redis run error!",
// Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
// None,
// ),
// )
// }
Self::Response(r) => {
debug_assert_ne!(r.status(), StatusCode::OK);
return r;
@@ -95,6 +84,7 @@ impl IntoResponse for FrontendErrorResponse {
Self::RateLimitedIp(ip, retry_at) => {
// TODO: emit a stat
// TODO: include retry_at in the error
// TODO: if retry_at is None, give an unauthorized status code?
(
StatusCode::TOO_MANY_REQUESTS,
JsonRpcForwardedResponse::from_string(
@@ -127,6 +117,7 @@ impl IntoResponse for FrontendErrorResponse {
),
Self::NotFound => {
// TODO: emit a stat?
// TODO: instead of an error, show a normal html page for 404
(
StatusCode::NOT_FOUND,
JsonRpcForwardedResponse::from_str(

@@ -56,7 +56,7 @@ impl Web3ProxyApp {
// TODO: have a local cache because if we hit redis too hard we get errors
// TODO: query redis in the background so that users don't have to wait on this network request
if let Some(rate_limiter) = &self.frontend_ip_rate_limiter {
match rate_limiter.throttle(&ip, None, 1).await {
match rate_limiter.throttle(ip, None, 1).await {
Ok(DeferredRateLimitResult::Allowed) => Ok(RateLimitResult::AllowedIp(ip)),
Ok(DeferredRateLimitResult::RetryAt(retry_at)) => {
// TODO: set headers so they know when they can retry
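If the "set headers so they know when they can retry" TODO above were implemented, the Retry-After value could be derived from `retry_at`; a small, purely illustrative helper:

use std::time::Instant;

// hypothetical helper: whole seconds until the limiter will allow another attempt
fn retry_after_secs(retry_at: Instant) -> u64 {
    retry_at.saturating_duration_since(Instant::now()).as_secs().max(1)
}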
@@ -117,6 +117,7 @@ impl Web3ProxyApp {
} else {
Some(requests_per_minute)
};
Ok(UserCacheValue::from((user_id, user_count_per_period)))
}
None => Ok(UserCacheValue::from((0, Some(0)))),
@@ -144,7 +145,7 @@ impl Web3ProxyApp {
// user key is valid. now check rate limits
if let Some(rate_limiter) = &self.frontend_key_rate_limiter {
match rate_limiter
.throttle(&user_key, Some(user_count_per_period), 1)
.throttle(user_key, Some(user_count_per_period), 1)
.await
{
Ok(DeferredRateLimitResult::Allowed) => {

@@ -245,13 +245,12 @@ pub async fn post_login(
// save the bearer token in redis with a long (7 or 30 day?) expiry. or in database?
let mut redis_conn = app.redis_conn().await?;
// TODO: move this into a struct so this is less fragile
let bearer_key = format!("bearer:{}", bearer_token);
redis_conn.set(bearer_key, u.id.to_string()).await?;
// save the user data in redis with a short expiry
// TODO: we already have uk, so this could be more efficient. it works for now
app.user_data(uk.api_key).await?;
// TODO: save user_data. we already have uk, so this could be more efficient. it works for now
Ok(response)
}
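The long-expiry comment above maps naturally onto a SETEX-style write; a hedged sketch of what that could look like, where the 7-day figure is only the comment's own question and the connection type stands in for whatever pooled connection the app really uses:

use redis::AsyncCommands;

async fn save_bearer_token(
    redis_conn: &mut redis::aio::Connection,
    bearer_token: &str,
    user_id: u64,
) -> redis::RedisResult<()> {
    // same key layout as the format!("bearer:{}", bearer_token) above
    let bearer_key = format!("bearer:{}", bearer_token);
    // SET with a 7 day expiry instead of a bare SET
    redis_conn.set_ex(bearer_key, user_id.to_string(), 7 * 24 * 60 * 60).await
}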