refactor

commit d8c8e6591d
parent 07fbd3c71d
@@ -14,7 +14,7 @@ use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::watch;
 use tokio::task;
-use tokio::time::{sleep, timeout};
+use tokio::time::timeout;
 use tracing::{debug, info, instrument, trace, warn};
 
 static APP_USER_AGENT: &str = concat!(
@@ -239,158 +239,89 @@ impl Web3ProxyApp {
 
         // TODO: how much should we retry? probably with a timeout and not with a count like this
         // TODO: think more about this loop.
-        for _i in 0..10usize {
-            // // TODO: add more to this span
-            // let span = info_span!("i", ?i);
-            // let _enter = span.enter(); // DO NOT ENTER! we can't use enter across awaits! (clippy lint soon)
-            if request.method == "eth_sendRawTransaction" {
-                // there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs
-                // TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
-                return self
-                    .private_rpcs
-                    .try_send_all_upstream_servers(request)
-                    .await;
-            } else {
-                // this is not a private transaction (or no private relays are configured)
-
-                let (cache_key, response_cache) = match self.get_cached_response(&request) {
-                    (cache_key, Ok(response)) => {
-                        let _ = self.incoming_requests.remove(&cache_key);
-
-                        return Ok(response);
-                    }
-                    (cache_key, Err(response_cache)) => (cache_key, response_cache),
-                };
-
-                // check if this request is already in flight
-                // TODO: move this logic into an IncomingRequestHandler (ActiveRequestHandler has an rpc, but this won't)
-                let (incoming_tx, incoming_rx) = watch::channel(true);
-                let mut other_incoming_rx = None;
-                match self.incoming_requests.entry(cache_key.clone()) {
-                    dashmap::mapref::entry::Entry::Occupied(entry) => {
-                        other_incoming_rx = Some(entry.get().clone());
-                    }
-                    dashmap::mapref::entry::Entry::Vacant(entry) => {
-                        entry.insert(incoming_rx);
-                    }
-                }
-
-                if let Some(mut other_incoming_rx) = other_incoming_rx {
-                    // wait for the other request to finish. it might have finished successfully or with an error
-                    trace!("{:?} waiting on in-flight request", request);
-
-                    let _ = other_incoming_rx.changed().await;
-
-                    // now that we've waited, lets check the cache again
-                    if let Some(cached) = response_cache.read().get(&cache_key) {
-                        let _ = self.incoming_requests.remove(&cache_key);
-                        let _ = incoming_tx.send(false);
-
-                        // TODO: emit a stat
-                        trace!(
-                            "{:?} cache hit after waiting for in-flight request!",
-                            request
-                        );
-
-                        return Ok(cached.to_owned());
-                    } else {
-                        // TODO: emit a stat
-                        trace!(
-                            "{:?} cache miss after waiting for in-flight request!",
-                            request
-                        );
-                    }
-                }
-
-                // TODO: move this whole match to a function on self.balanced_rpcs. incoming requests checks makes it awkward
-                match self.balanced_rpcs.next_upstream_server().await {
-                    Ok(active_request_handle) => {
-                        let response = active_request_handle
-                            .request(&request.method, &request.params)
-                            .await;
-
-                        let response = match response {
-                            Ok(partial_response) => {
-                                // TODO: trace here was really slow with millions of requests.
-                                // trace!("forwarding request from {}", upstream_server);
-
-                                let response = JsonRpcForwardedResponse {
-                                    jsonrpc: "2.0".to_string(),
-                                    id: request.id,
-                                    // TODO: since we only use the result here, should that be all we return from try_send_request?
-                                    result: Some(partial_response),
-                                    error: None,
-                                };
-
-                                // TODO: small race condidition here. parallel requests with the same query will both be saved to the cache
-                                let mut response_cache = response_cache.write();
-
-                                // TODO: cache the warp::reply to save us serializing every time
-                                response_cache.insert(cache_key.clone(), response.clone());
-                                if response_cache.len() >= RESPONSE_CACHE_CAP {
-                                    // TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? these caches only live for one block
-                                    response_cache.pop_front();
-                                }
-
-                                drop(response_cache);
-
-                                // TODO: needing to remove manually here makes me think we should do this differently
-                                let _ = self.incoming_requests.remove(&cache_key);
-                                let _ = incoming_tx.send(false);
-
-                                response
-                            }
-                            Err(e) => {
-                                // send now since we aren't going to cache an error response
-                                let _ = incoming_tx.send(false);
-
-                                JsonRpcForwardedResponse::from_ethers_error(e, request.id)
-                            }
-                        };
-
-                        // TODO: needing to remove manually here makes me think we should do this differently
-                        let _ = self.incoming_requests.remove(&cache_key);
-
-                        if response.error.is_some() {
-                            trace!("Sending error reply: {:?}", response);
-
-                            // errors already sent false to the in_flight_tx
-                        } else {
-                            trace!("Sending reply: {:?}", response);
-
-                            let _ = incoming_tx.send(false);
-                        }
-
-                        return Ok(response);
-                    }
-                    Err(None) => {
-                        // TODO: this is too verbose. if there are other servers in other tiers, we use those!
-                        warn!("No servers in sync!");
-
-                        // TODO: needing to remove manually here makes me think we should do this differently
-                        let _ = self.incoming_requests.remove(&cache_key);
-                        let _ = incoming_tx.send(false);
-
-                        return Err(anyhow::anyhow!("no servers in sync"));
-                    }
-                    Err(Some(retry_after)) => {
-                        // TODO: move this to a helper function
-                        // sleep (TODO: with a lock?) until our rate limits should be available
-                        // TODO: if a server catches up sync while we are waiting, we could stop waiting
-                        warn!("All rate limits exceeded. Sleeping");
-
-                        sleep(retry_after).await;
-
-                        // TODO: needing to remove manually here makes me think we should do this differently
-                        let _ = self.incoming_requests.remove(&cache_key);
-                        let _ = incoming_tx.send(false);
-
-                        continue;
-                    }
-                }
-            }
-        }
-
-        Err(anyhow::anyhow!("internal error"))
+        // // TODO: add more to this span
+        // let span = info_span!("i", ?i);
+        // let _enter = span.enter(); // DO NOT ENTER! we can't use enter across awaits! (clippy lint soon)
+        if request.method == "eth_sendRawTransaction" {
+            // there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs
+            // TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
+            return self
+                .private_rpcs
+                .try_send_all_upstream_servers(request)
+                .await;
+        } else {
+            // this is not a private transaction (or no private relays are configured)
+
+            let (cache_key, response_cache) = match self.get_cached_response(&request) {
+                (cache_key, Ok(response)) => {
+                    let _ = self.incoming_requests.remove(&cache_key);
+
+                    return Ok(response);
+                }
+                (cache_key, Err(response_cache)) => (cache_key, response_cache),
+            };
+
+            // check if this request is already in flight
+            // TODO: move this logic into an IncomingRequestHandler (ActiveRequestHandler has an rpc, but this won't)
+            let (incoming_tx, incoming_rx) = watch::channel(true);
+            let mut other_incoming_rx = None;
+            match self.incoming_requests.entry(cache_key.clone()) {
+                dashmap::mapref::entry::Entry::Occupied(entry) => {
+                    other_incoming_rx = Some(entry.get().clone());
+                }
+                dashmap::mapref::entry::Entry::Vacant(entry) => {
+                    entry.insert(incoming_rx);
+                }
+            }
+
+            if let Some(mut other_incoming_rx) = other_incoming_rx {
+                // wait for the other request to finish. it might have finished successfully or with an error
+                trace!("{:?} waiting on in-flight request", request);
+
+                let _ = other_incoming_rx.changed().await;
+
+                // now that we've waited, lets check the cache again
+                if let Some(cached) = response_cache.read().get(&cache_key) {
+                    let _ = self.incoming_requests.remove(&cache_key);
+                    let _ = incoming_tx.send(false);
+
+                    // TODO: emit a stat
+                    trace!(
+                        "{:?} cache hit after waiting for in-flight request!",
+                        request
+                    );
+
+                    return Ok(cached.to_owned());
+                } else {
+                    // TODO: emit a stat
+                    trace!(
+                        "{:?} cache miss after waiting for in-flight request!",
+                        request
+                    );
+                }
+            }
+
+            let response = self
+                .balanced_rpcs
+                .try_send_best_upstream_server(request)
+                .await?;
+
+            // TODO: small race condidition here. parallel requests with the same query will both be saved to the cache
+            let mut response_cache = response_cache.write();
+
+            // TODO: cache the warp::reply to save us serializing every time
+            response_cache.insert(cache_key.clone(), response.clone());
+            if response_cache.len() >= RESPONSE_CACHE_CAP {
+                // TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? these caches only live for one block
+                response_cache.pop_front();
+            }
+
+            drop(response_cache);
+
+            let _ = self.incoming_requests.remove(&cache_key);
+            let _ = incoming_tx.send(false);
+
+            Ok(response)
+        }
     }
 }
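The `incoming_requests` / `watch::channel` dance kept in this hunk is a request-coalescing (thundering-herd) guard: the first request for a cache key registers a watch receiver in a DashMap, and duplicate requests find the occupied entry, await `changed()`, and then re-check the shared cache instead of hitting an upstream server again. Below is a minimal self-contained sketch of that pattern, assuming the `tokio` and `dashmap` crates; `coalesced_fetch`, `in_flight`, and the `String` result type are illustrative names, not identifiers from this codebase.

```rust
use std::sync::Arc;

use dashmap::DashMap;
use tokio::sync::watch;

/// First caller for a key does the work; concurrent duplicates wait on a
/// watch channel, then re-check the shared cache instead of refetching.
async fn coalesced_fetch(
    in_flight: Arc<DashMap<String, watch::Receiver<bool>>>,
    key: String,
) -> String {
    // the bool payload is never read; the change notification is the signal
    let (tx, rx) = watch::channel(true);

    let mut other_rx = None;
    match in_flight.entry(key.clone()) {
        dashmap::mapref::entry::Entry::Occupied(entry) => {
            // another task is already fetching this key
            other_rx = Some(entry.get().clone());
        }
        dashmap::mapref::entry::Entry::Vacant(entry) => {
            // we go first; leave a receiver behind for anyone who follows
            entry.insert(rx);
        }
    } // the entry guard drops here, BEFORE any .await

    if let Some(mut other_rx) = other_rx {
        // the first task finished, successfully or not; re-check the cache
        let _ = other_rx.changed().await;
        return format!("cache re-check for {key}");
    }

    let result = format!("fresh result for {key}"); // stand-in for the upstream call

    // unregister first, then broadcast so every waiter proceeds
    in_flight.remove(&key);
    let _ = tx.send(false);

    result
}

#[tokio::main]
async fn main() {
    let in_flight = Arc::new(DashMap::new());
    let (a, b) = tokio::join!(
        coalesced_fetch(in_flight.clone(), "eth_blockNumber".into()),
        coalesced_fetch(in_flight.clone(), "eth_blockNumber".into()),
    );
    println!("{a} / {b}");
}
```

One design note: the DashMap entry guard is dropped before any `.await`, which matters because holding a shard lock across an await point can deadlock the whole map; the original code has the same shape for the same reason.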
@@ -434,6 +434,49 @@ impl Web3Connections {
         Err(earliest_retry_after)
     }
 
+    /// be sure there is a timeout on this or it might loop forever
+    pub async fn try_send_best_upstream_server(
+        &self,
+        request: JsonRpcRequest,
+    ) -> anyhow::Result<JsonRpcForwardedResponse> {
+        loop {
+            match self.next_upstream_server().await {
+                Ok(active_request_handle) => {
+                    let response_result = active_request_handle
+                        .request(&request.method, &request.params)
+                        .await;
+
+                    let response =
+                        JsonRpcForwardedResponse::from_response_result(response_result, request.id);
+
+                    if response.error.is_some() {
+                        trace!(?response, "Sending error reply",);
+                        // errors already sent false to the in_flight_tx
+                    } else {
+                        trace!(?response, "Sending reply");
+                    }
+
+                    return Ok(response);
+                }
+                Err(None) => {
+                    warn!(?self, "No servers in sync!");
+
+                    return Err(anyhow::anyhow!("no servers in sync"));
+                }
+                Err(Some(retry_after)) => {
+                    // TODO: move this to a helper function
+                    // sleep (TODO: with a lock?) until our rate limits should be available
+                    // TODO: if a server catches up sync while we are waiting, we could stop waiting
+                    warn!(?retry_after, "All rate limits exceeded. Sleeping");
+
+                    sleep(retry_after).await;
+
+                    continue;
+                }
+            }
+        }
+    }
+
     pub async fn try_send_all_upstream_servers(
         &self,
         request: JsonRpcRequest,
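The extracted method leans on `next_upstream_server()` encoding three outcomes in `Result<_, Option<Duration>>`: `Ok(handle)` to proceed, `Err(None)` when no server is in sync (retrying cannot help), and `Err(Some(retry_after))` when every server is rate-limited (sleep, then retry). The new doc comment warns that the loop itself never gives up on rate limits, so the caller must bound it. A compilable sketch of that contract, assuming `tokio` and `anyhow`; `next_server` and `send_best` are hypothetical stand-ins, not this project's API:

```rust
use std::time::Duration;

use tokio::time::{sleep, timeout};

/// Stand-in for next_upstream_server(): Ok(handle) on success, Err(None) when
/// no server is in sync (not worth retrying), Err(Some(d)) when rate-limited.
async fn next_server(attempt: u32) -> Result<&'static str, Option<Duration>> {
    if attempt < 3 {
        Err(Some(Duration::from_millis(50))) // rate-limited: retry after a nap
    } else {
        Ok("active_request_handle")
    }
}

async fn send_best() -> anyhow::Result<&'static str> {
    let mut attempt = 0;
    loop {
        match next_server(attempt).await {
            Ok(handle) => return Ok(handle),
            // nothing can answer correctly; retrying will not fix this
            Err(None) => return Err(anyhow::anyhow!("no servers in sync")),
            // rate limits are temporary: sleep until they should be available
            Err(Some(retry_after)) => sleep(retry_after).await,
        }
        attempt += 1;
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // the retry loop never gives up on rate limits, so bound it from outside,
    // as the doc comment ("be sure there is a timeout on this") demands
    let handle = timeout(Duration::from_secs(5), send_best()).await??;
    println!("got {handle}");
    Ok(())
}
```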
@@ -463,6 +506,8 @@ impl Web3Connections {
                     return Ok(response);
                 }
                 Err(None) => {
+                    warn!(?self, "No servers in sync!");
+
                     // TODO: return a 502?
                     // TODO: i don't think this will ever happen
                     return Err(anyhow::anyhow!("no available rpcs!"));
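The new `warn!(?self, ...)` and `warn!(?retry_after, ...)` calls use tracing's field-capture shorthand: `?value` records the value as a structured field via its `Debug` impl instead of splicing it into the message string. A tiny sketch, assuming the `tracing` crate plus `tracing-subscriber` only to make the output visible:

```rust
use std::time::Duration;

use tracing::warn;

fn main() {
    // assumed dev-dependency, only here so the event prints somewhere
    tracing_subscriber::fmt::init();

    let retry_after = Duration::from_millis(250);
    // records retry_after=250ms as an indexable field, roughly equivalent to
    // warn!("All rate limits exceeded. Sleeping: {:?}", retry_after)
    warn!(?retry_after, "All rate limits exceeded. Sleeping");
}
```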
@@ -162,7 +162,27 @@ impl fmt::Debug for JsonRpcForwardedResponse {
 }
 
 impl JsonRpcForwardedResponse {
-    pub fn from_ethers_error(e: ProviderError, id: Box<serde_json::value::RawValue>) -> Self {
+    pub fn from_response_result(
+        result: Result<Box<RawValue>, ProviderError>,
+        id: Box<RawValue>,
+    ) -> Self {
+        match result {
+            Ok(response) => Self::from_response(response, id),
+            Err(e) => Self::from_ethers_error(e, id),
+        }
+    }
+
+    pub fn from_response(partial_response: Box<RawValue>, id: Box<RawValue>) -> Self {
+        JsonRpcForwardedResponse {
+            jsonrpc: "2.0".to_string(),
+            id,
+            // TODO: since we only use the result here, should that be all we return from try_send_request?
+            result: Some(partial_response),
+            error: None,
+        }
+    }
+
+    pub fn from_ethers_error(e: ProviderError, id: Box<RawValue>) -> Self {
         // TODO: move turning ClientError into json to a helper function?
         let code;
         let message: String;
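This hunk splits construction of the JSON-RPC envelope so that success and error each get one constructor, and `from_response_result` dispatches on the `Result` so call sites (like the new `try_send_best_upstream_server`) stay to a single line. Per JSON-RPC 2.0, exactly one of `result`/`error` is populated. A toy mirror of that shape with simplified types (`String` standing in for `Box<RawValue>` and `ProviderError`; `ForwardedResponse` is not the project's type):

```rust
/// Toy mirror of the JsonRpcForwardedResponse shape.
#[derive(Debug, Clone)]
struct ForwardedResponse {
    jsonrpc: String,
    id: u64,
    result: Option<String>,
    error: Option<String>,
}

impl ForwardedResponse {
    /// One dispatching constructor keeps call sites to a single line.
    fn from_response_result(result: Result<String, String>, id: u64) -> Self {
        match result {
            Ok(r) => Self::from_response(r, id),
            Err(e) => Self::from_error(e, id),
        }
    }

    fn from_response(result: String, id: u64) -> Self {
        // success: result is set, error stays None
        Self { jsonrpc: "2.0".into(), id, result: Some(result), error: None }
    }

    fn from_error(error: String, id: u64) -> Self {
        // failure: error is set, result stays None (JSON-RPC 2.0 allows only one)
        Self { jsonrpc: "2.0".into(), id, result: None, error: Some(error) }
    }
}

fn main() {
    let ok = ForwardedResponse::from_response_result(Ok("0xdeadbeef".into()), 1);
    let err = ForwardedResponse::from_response_result(Err("rate limited".into()), 2);
    println!("{ok:?}\n{err:?}");
}
```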