i think it works

Bryan Stitt 2022-10-11 21:31:34 +00:00
parent 552f3dbffc
commit 76c8f1ef96
9 changed files with 138 additions and 61 deletions

Cargo.lock (generated)

@@ -5493,7 +5493,7 @@ dependencies = [
[[package]]
name = "web3_proxy"
version = "0.2.0"
version = "0.4.0"
dependencies = [
"anyhow",
"arc-swap",

@@ -17,6 +17,7 @@ pub struct Model {
pub frontend_requests: u32,
pub backend_requests: u32,
pub backend_retries: u32,
+pub no_servers: u32,
pub cache_misses: u32,
pub cache_hits: u32,
pub sum_request_bytes: u64,
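
The new counter has to exist on the sea-orm entity before the stat emitter can write it. A minimal sketch of how such a field sits in a sea-orm entity, trimmed to a few columns; the rpc_accounting table name and the id column are assumptions here, since the surrounding Model is only partially shown:

use sea_orm::entity::prelude::*;

// trimmed sketch; the real Model has many more columns
#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
#[sea_orm(table_name = "rpc_accounting")]
pub struct Model {
    #[sea_orm(primary_key)]
    pub id: u64,
    pub frontend_requests: u32,
    // the column added by this commit
    pub no_servers: u32,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}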

@@ -54,6 +54,11 @@ impl MigrationTrait for Migration {
.big_unsigned()
.not_null(),
)
+.col(
+ColumnDef::new(RpcAccounting::NoServers)
+.big_unsigned()
+.not_null(),
+)
.col(
ColumnDef::new(RpcAccounting::CacheMisses)
.big_unsigned()
@@ -207,6 +212,7 @@ enum RpcAccounting {
FrontendRequests,
BackendRequests,
BackendRetries,
+NoServers,
CacheMisses,
CacheHits,
SumRequestBytes,
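
Note that the column is added to an existing create_table migration in place, so a database created from the old version of this migration would need to be rebuilt. The additive alternative is a standalone migration; a minimal sketch using sea_orm_migration's alter_table, assuming the RpcAccounting Iden enum above is in scope (the default(0) is an assumption so NOT NULL holds for existing rows):

use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .alter_table(
                Table::alter()
                    .table(RpcAccounting::Table)
                    .add_column(
                        ColumnDef::new(RpcAccounting::NoServers)
                            .big_unsigned()
                            .not_null()
                            .default(0),
                    )
                    .to_owned(),
            )
            .await
    }

    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .alter_table(
                Table::alter()
                    .table(RpcAccounting::Table)
                    .drop_column(RpcAccounting::NoServers)
                    .to_owned(),
            )
            .await
    }
}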

@@ -1,6 +1,6 @@
[package]
name = "web3_proxy"
version = "0.2.0"
version = "0.4.0"
edition = "2021"
default-run = "web3_proxy"

@@ -963,6 +963,7 @@ impl Web3ProxyApp {
}
_ => {
// TODO: this needs the correct error code in the response
+// TODO: emit stat?
return Err(anyhow::anyhow!("invalid request"));
}
}
@@ -1030,6 +1031,7 @@ impl Web3ProxyApp {
// discard their id by replacing it with an empty
response.id = Default::default();
+// TODO: only cache the inner response (or error)
Ok::<_, anyhow::Error>(response)
})
.await
@@ -1043,9 +1045,9 @@
// since this data came likely out of a cache, the id is not going to match
// replace the id with our request's id.
// TODO: cache without the id
response.id = request_id;
// DRY this up by just returning the partial result (or error) here
if let (Some(stat_sender), Ok(AuthorizedRequest::User(Some(_), authorized_key))) = (
self.stat_sender.as_ref(),
Arc::try_unwrap(authorized_request),
@@ -1066,6 +1068,16 @@
let response = JsonRpcForwardedResponse::from_value(partial_response, request_id);
+if let (Some(stat_sender), Ok(AuthorizedRequest::User(Some(_), authorized_key))) = (
+self.stat_sender.as_ref(),
+Arc::try_unwrap(authorized_request),
+) {
+let response_stat =
+ProxyResponseStat::new(request.method, authorized_key, request_metadata, &response);
+stat_sender.send_async(response_stat.into()).await?;
+}
Ok(response)
}
}
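
Both response paths now emit a ProxyResponseStat through stat_sender.send_async. A minimal sketch of that channel pattern, assuming a flume unbounded channel (flume's Sender has send_async) and a simplified stand-in stat type:

use anyhow::Context;

#[derive(Debug)]
enum Web3ProxyStat {
    // simplified stand-in for ProxyResponseStat
    ProxyResponse { method: String, response_bytes: u64 },
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (stat_sender, stat_receiver) = flume::unbounded::<Web3ProxyStat>();

    // request path: hand the stat off without blocking request handling
    stat_sender
        .send_async(Web3ProxyStat::ProxyResponse {
            method: "eth_call".to_string(),
            response_bytes: 512,
        })
        .await
        .context("stat channel closed")?;

    // emitter task: drain and aggregate
    while let Ok(stat) = stat_receiver.try_recv() {
        println!("{:?}", stat);
    }
    Ok(())
}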

@@ -58,6 +58,7 @@ pub struct RequestMetadata {
pub request_bytes: u64,
/// if this is 0, there was a cache_hit
pub backend_requests: AtomicU32,
+pub no_servers: AtomicU32,
pub error_response: AtomicBool,
pub response_bytes: AtomicU64,
pub response_millis: AtomicU64,
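
RequestMetadata is shared behind an Arc for the life of a request, so every field that changes after creation is an atomic: writers bump it lock-free on the hot path, and the stat code reads it once at the end. A self-contained sketch of the pattern, trimmed to two fields:

use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;

#[derive(Default)]
struct RequestMetadata {
    no_servers: AtomicU32,
    error_response: AtomicBool,
}

fn main() {
    let metadata = Arc::new(RequestMetadata::default());

    // hot path: record events without taking a lock
    metadata.no_servers.fetch_add(1, Ordering::Release);
    metadata.error_response.store(true, Ordering::Release);

    // stat path: read the accumulated values once at the end
    assert_eq!(metadata.no_servers.load(Ordering::Acquire), 1);
    assert!(metadata.error_response.load(Ordering::Acquire));
}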

@@ -366,6 +366,7 @@ impl Web3Connections {
pub async fn next_upstream_server(
&self,
authorized_request: Option<&Arc<AuthorizedRequest>>,
+request_metadata: Option<&Arc<RequestMetadata>>,
skip: &[Arc<Web3Connection>],
min_block_needed: Option<&U64>,
) -> anyhow::Result<OpenRequestResult> {
@@ -444,11 +445,20 @@
match earliest_retry_at {
None => {
+if let Some(request_metadata) = request_metadata {
+request_metadata.no_servers.fetch_add(1, Ordering::Release);
+}
// TODO: error works, but maybe we should just wait a second?
Err(anyhow::anyhow!("no servers synced"))
}
Some(earliest_retry_at) => {
warn!("no servers on {:?}! {:?}", self, earliest_retry_at);
+if let Some(request_metadata) = request_metadata {
+request_metadata.no_servers.fetch_add(1, Ordering::Release);
+}
Ok(OpenRequestResult::RetryAt(earliest_retry_at))
}
}
@@ -513,7 +523,12 @@ impl Web3Connections {
break;
}
match self
-.next_upstream_server(authorized_request, &skip_rpcs, min_block_needed)
+.next_upstream_server(
+authorized_request,
+request_metadata,
+&skip_rpcs,
+min_block_needed,
+)
.await?
{
OpenRequestResult::Handle(active_request_handle) => {
@@ -593,6 +608,11 @@
// TODO: if a server catches up sync while we are waiting, we could stop waiting
warn!(?retry_at, "All rate limits exceeded. Sleeping");
// TODO: have a separate column for rate limited?
+if let Some(request_metadata) = request_metadata {
+request_metadata.no_servers.fetch_add(1, Ordering::Release);
+}
sleep_until(retry_at).await;
continue;
@@ -600,6 +620,10 @@
OpenRequestResult::RetryNever => {
warn!(?self, "No server handles!");
+if let Some(request_metadata) = request_metadata {
+request_metadata.no_servers.fetch_add(1, Ordering::Release);
+}
// TODO: subscribe to something on synced connections. maybe it should just be a watch channel
sleep(Duration::from_millis(200)).await;
@@ -608,6 +632,13 @@
}
}
// TODO: do we need this here, or do we do it somewhere else?
+if let Some(request_metadata) = request_metadata {
+request_metadata
+.error_response
+.store(true, Ordering::Release);
+}
Err(anyhow::anyhow!("all {} tries exhausted", skip_rpcs.len()))
}
@@ -629,6 +660,13 @@
// TODO: benchmark this compared to waiting on unbounded futures
// TODO: do something with this handle?
// TODO: this is not working right. simplify
+if let Some(request_metadata) = request_metadata {
+request_metadata
+.backend_requests
+.fetch_add(active_request_handles.len() as u32, Ordering::Release);
+}
let quorum_response = self
.try_send_parallel_requests(
active_request_handles,
@@ -649,6 +687,10 @@
Err(None) => {
warn!(?self, "No servers in sync! Retrying");
+if let Some(request_metadata) = &request_metadata {
+request_metadata.no_servers.fetch_add(1, Ordering::Release);
+}
// TODO: i don't think this will ever happen
// TODO: return a 502? if it does?
// return Err(anyhow::anyhow!("no available rpcs!"));
@@ -664,6 +706,10 @@
// TODO: if a server catches up sync while we are waiting, we could stop waiting
warn!("All rate limits exceeded. Sleeping");
+if let Some(request_metadata) = &request_metadata {
+request_metadata.no_servers.fetch_add(1, Ordering::Release);
+}
sleep_until(retry_at).await;
continue;
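
Every retry and failure path above repeats the same guard: the metadata is threaded down as Option<&Arc<RequestMetadata>> so user-facing requests tick no_servers while internal callers simply pass None. A minimal sketch of that shape, with server selection elided:

use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

#[derive(Default)]
struct RequestMetadata {
    no_servers: AtomicU32,
}

fn next_upstream_server(request_metadata: Option<&Arc<RequestMetadata>>) {
    // ... server selection elided; on a "no servers synced" path:
    if let Some(request_metadata) = request_metadata {
        request_metadata.no_servers.fetch_add(1, Ordering::Release);
    }
}

fn main() {
    let metadata = Arc::new(RequestMetadata::default());
    next_upstream_server(Some(&metadata)); // user-facing request, stats recorded
    next_upstream_server(None); // internal caller, no stats
    assert_eq!(metadata.no_servers.load(Ordering::Acquire), 1);
}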

@@ -247,11 +247,45 @@ impl OpenRequestHandle {
error_handler
};
-// TODO: check for "execution reverted" here
+// check for "execution reverted" here
+let is_revert = if let ProviderError::JsonRpcClientError(err) = err {
+// Http and Ws errors are very similar, but different types
+let msg = match provider {
+Web3Provider::Http(_) => {
+if let Some(HttpClientError::JsonRpcError(err)) =
+err.downcast_ref::<HttpClientError>()
+{
+Some(&err.message)
+} else {
+None
+}
+}
+Web3Provider::Ws(_) => {
+if let Some(WsClientError::JsonRpcError(err)) =
+err.downcast_ref::<WsClientError>()
+{
+Some(&err.message)
+} else {
+None
+}
+}
+};
+if let Some(msg) = msg {
+msg.starts_with("execution reverted")
+} else {
+false
+}
+} else {
+false
+};
match error_handler {
RequestErrorHandler::DebugLevel => {
-debug!(?err, %method, rpc=%self.conn, "bad response!");
+// TODO: think about this revert check more. sometimes we might want reverts logged so this needs a flag
+if !is_revert {
+debug!(?err, %method, rpc=%self.conn, "bad response!");
+}
}
RequestErrorHandler::ErrorLevel => {
error!(?err, %method, rpc=%self.conn, "bad response!");
@@ -260,55 +294,21 @@ impl OpenRequestHandle {
warn!(?err, %method, rpc=%self.conn, "bad response!");
}
RequestErrorHandler::SaveReverts => {
// TODO: logging every one is going to flood the database
// TODO: have a percent chance to do this. or maybe a "logged reverts per second"
-if let ProviderError::JsonRpcClientError(err) = err {
-// Http and Ws errors are very similar, but different types
-let msg = match provider {
-Web3Provider::Http(_) => {
-if let Some(HttpClientError::JsonRpcError(err)) =
-err.downcast_ref::<HttpClientError>()
-{
-Some(&err.message)
-} else {
-None
-}
-}
-Web3Provider::Ws(_) => {
-if let Some(WsClientError::JsonRpcError(err)) =
-err.downcast_ref::<WsClientError>()
-{
-Some(&err.message)
-} else {
-None
-}
-}
-};
+// TODO: do not unwrap! (doesn't matter much since we check method as a string above)
+let method: Method = Method::try_from_value(&method.to_string()).unwrap();
-if let Some(msg) = msg {
-if msg.starts_with("execution reverted") {
-// TODO: do not unwrap! (doesn't matter much since we check method as a string above)
-let method: Method =
-Method::try_from_value(&method.to_string()).unwrap();
-// TODO: DO NOT UNWRAP! But also figure out the best way to keep returning ProviderErrors here
-let params: EthCallParams = serde_json::from_value(json!(params))
-.context("parsing params to EthCallParams")
-.unwrap();
+// TODO: DO NOT UNWRAP! But also figure out the best way to keep returning ProviderErrors here
+let params: EthCallParams = serde_json::from_value(json!(params))
+.context("parsing params to EthCallParams")
+.unwrap();
-// spawn saving to the database so we don't slow down the request
-let f = self
-.authorized_request
-.clone()
-.save_revert(method, params.0 .0);
+// spawn saving to the database so we don't slow down the request
+let f = self
+.authorized_request
+.clone()
+.save_revert(method, params.0 .0);
-tokio::spawn(async move { f.await });
-} else {
-// TODO: log any of the errors?
-debug!(?err, %method, rpc=%self.conn, "bad response!");
-}
-}
-}
+tokio::spawn(f);
}
}
} else {
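
The revert check is hoisted out of the SaveReverts arm and computed before the match, so other arms (DebugLevel above) can branch on is_revert too. A simplified sketch of the hoisted check with a stand-in error type; the real code first downcasts ethers-rs HttpClientError/WsClientError out of ProviderError::JsonRpcClientError, as shown in the hunk above:

// stand-in for the JSON-RPC error carried inside the provider error
struct JsonRpcError {
    message: String,
}

fn is_revert(msg: Option<&JsonRpcError>) -> bool {
    match msg {
        Some(err) => err.message.starts_with("execution reverted"),
        None => false,
    }
}

fn main() {
    let err = JsonRpcError {
        message: "execution reverted: Dai/insufficient-balance".to_string(),
    };
    assert!(is_revert(Some(&err)));
    assert!(!is_revert(None));
}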

@@ -64,6 +64,7 @@ pub struct ProxyResponseAggregate {
frontend_requests: AtomicU32,
backend_requests: AtomicU32,
backend_retries: AtomicU32,
+no_servers: AtomicU32,
cache_misses: AtomicU32,
cache_hits: AtomicU32,
sum_request_bytes: AtomicU64,
@@ -122,12 +123,11 @@ impl ProxyResponseStat {
let period_timestamp =
(metadata.datetime.timestamp() as u64) / period_seconds * period_seconds;
let request_bytes = metadata.request_bytes;
-let response_millis = metadata
-.datetime
-.signed_duration_since(Utc::now())
-.num_seconds() as u64;
let error_response = metadata.error_response.load(Ordering::Acquire);
+let response_millis =
+(Utc::now().timestamp_millis() - metadata.datetime.timestamp_millis()) as u64;
Self {
user_key_id: authorized_key.user_key_id,
method,
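
This hunk also fixes the latency math: the removed form is start-minus-now, which is negative and in whole seconds, so the u64 cast wraps it to a huge value; the new form is now-minus-start in milliseconds. A tiny chrono sketch of both:

use chrono::Utc;

fn main() {
    let start = Utc::now();
    std::thread::sleep(std::time::Duration::from_millis(1500));

    // removed form: start - now is negative; the u64 cast wraps it
    let seconds = start.signed_duration_since(Utc::now()).num_seconds();
    assert!(seconds < 0);
    // e.g. -1 as u64 becomes 18_446_744_073_709_551_615
    println!("wrapped: {}", seconds as u64);

    // new form: elapsed wall time in milliseconds
    let response_millis =
        (Utc::now().timestamp_millis() - start.timestamp_millis()) as u64;
    assert!(response_millis >= 1500);
}
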
@@ -226,6 +226,7 @@ impl StatEmitter {
let frontend_requests = v.frontend_requests.load(Ordering::Acquire);
let backend_requests = v.backend_requests.load(Ordering::Acquire);
let backend_retries = v.backend_retries.load(Ordering::Acquire);
+let no_servers = v.no_servers.load(Ordering::Acquire);
let cache_misses = v.cache_misses.load(Ordering::Acquire);
let cache_hits = v.cache_hits.load(Ordering::Acquire);
let sum_request_bytes = v.sum_request_bytes.load(Ordering::Acquire);
@@ -274,6 +275,7 @@ impl StatEmitter {
frontend_requests: sea_orm::Set(frontend_requests),
backend_requests: sea_orm::Set(backend_requests),
backend_retries: sea_orm::Set(backend_retries),
+no_servers: sea_orm::Set(no_servers),
cache_misses: sea_orm::Set(cache_misses),
cache_hits: sea_orm::Set(cache_hits),
@@ -347,6 +349,7 @@ impl StatEmitter {
frontend_requests: 0.into(),
backend_requests: 0.into(),
backend_retries: 0.into(),
+no_servers: 0.into(),
cache_misses: 0.into(),
cache_hits: 0.into(),
sum_request_bytes: 0.into(),
@@ -364,10 +367,18 @@ impl StatEmitter {
.frontend_requests
.fetch_add(1, Ordering::Acquire);
// a stat might have multiple backend requests
-user_aggregate
-.backend_requests
-.fetch_add(stat.backend_requests, Ordering::Acquire);
+if stat.backend_requests == 0 {
+// no backend request. cache hit!
+user_aggregate.cache_hits.fetch_add(1, Ordering::Acquire);
+} else {
+// backend requests! cache miss!
+user_aggregate.cache_misses.fetch_add(1, Ordering::Acquire);
+// a stat might have multiple backend requests
+user_aggregate
+.backend_requests
+.fetch_add(stat.backend_requests, Ordering::Acquire);
+}
user_aggregate
.sum_request_bytes
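
The emitter now derives cache hits from the backend counter: a response that needed zero backend requests must have been served from cache. A minimal sketch of that aggregation rule:

use std::sync::atomic::{AtomicU32, Ordering};

#[derive(Default)]
struct UserAggregate {
    backend_requests: AtomicU32,
    cache_hits: AtomicU32,
    cache_misses: AtomicU32,
}

fn aggregate(agg: &UserAggregate, stat_backend_requests: u32) {
    if stat_backend_requests == 0 {
        // no backend request. cache hit!
        agg.cache_hits.fetch_add(1, Ordering::Acquire);
    } else {
        // cache miss; one stat may cover several backend requests
        agg.cache_misses.fetch_add(1, Ordering::Acquire);
        agg.backend_requests
            .fetch_add(stat_backend_requests, Ordering::Acquire);
    }
}

fn main() {
    let agg = UserAggregate::default();
    aggregate(&agg, 0); // served from cache
    aggregate(&agg, 3); // needed three upstream tries
    assert_eq!(agg.cache_hits.load(Ordering::Acquire), 1);
    assert_eq!(agg.cache_misses.load(Ordering::Acquire), 1);
    assert_eq!(agg.backend_requests.load(Ordering::Acquire), 3);
}
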
@@ -386,8 +397,8 @@ impl StatEmitter {
// TODO: use `record_correct`?
histograms.request_bytes.record(stat.request_bytes)?;
-histograms.response_bytes.record(stat.response_bytes)?;
histograms.response_millis.record(stat.response_millis)?;
+histograms.response_bytes.record(stat.response_bytes)?;
}
}
}
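
The histograms recorded here behave like hdrhistogram recorders: record is fallible when a value falls outside the configured bounds, which is why every call above carries `?`. A minimal sketch, assuming the hdrhistogram crate:

use hdrhistogram::Histogram;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // track 1 ms .. 1 hour with 3 significant figures
    let mut response_millis: Histogram<u64> = Histogram::new_with_bounds(1, 3_600_000, 3)?;

    // record() errors on out-of-bounds values, hence the `?` in the diff
    response_millis.record(250)?;
    response_millis.record(1_200)?;

    println!("p99 latency: {} ms", response_millis.value_at_quantile(0.99));
    Ok(())
}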