in-flight request checks

Bryan Stitt 2022-05-16 19:15:40 +00:00
parent fdeeea479e
commit 6ab2b3a533
5 changed files with 142 additions and 76 deletions

Cargo.lock (generated)

@@ -3965,6 +3965,7 @@ version = "0.1.0"
 dependencies = [
  "anyhow",
  "argh",
+ "dashmap",
  "derive_more",
  "ethers",
  "flume",

@ -5,6 +5,7 @@ members = [
] ]
# TODO: enable these once rapid development is done # TODO: enable these once rapid development is done
[profile.release] # TODO: we can't do panic = abort because the websockets disconnect by panicing sometimes
#[profile.release]
#panic = abort
#lto = true #lto = true
panic = "abort"

@@ -9,6 +9,7 @@ edition = "2021"
 anyhow = "1.0.57"
 argh = "0.1.7"
 # axum = "*" # TODO: use this instead of warp?
+dashmap = "5.3.3"
 derive_more = "0.99.17"
 ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] }
 flume = "0.10.12"

@@ -5,6 +5,7 @@ use crate::jsonrpc::JsonRpcForwardedResponse;
 use crate::jsonrpc::JsonRpcForwardedResponseEnum;
 use crate::jsonrpc::JsonRpcRequest;
 use crate::jsonrpc::JsonRpcRequestEnum;
+use dashmap::DashMap;
 use ethers::prelude::{HttpClientError, ProviderError, WsClientError, H256};
 use futures::future::join_all;
 use governor::clock::{Clock, QuantaClock};
@@ -13,6 +14,7 @@ use parking_lot::RwLock;
 use std::fmt;
 use std::sync::Arc;
 use std::time::Duration;
+use tokio::sync::watch;
 use tokio::time::sleep;
 use tracing::{trace, warn};
@@ -27,8 +29,9 @@ static APP_USER_AGENT: &str = concat!(
 const RESPONSE_CACHE_CAP: usize = 1024;

 /// TODO: these types are probably very bad keys and values. i couldn't get caching of warp::reply::Json to work
-type ResponseLruCache =
-    RwLock<LinkedHashMap<(H256, String, Option<String>), JsonRpcForwardedResponse>>;
+type CacheKey = (H256, String, Option<String>);
+
+type ResponseLruCache = RwLock<LinkedHashMap<CacheKey, JsonRpcForwardedResponse>>;

 /// The application
 // TODO: this debug impl is way too verbose. make something smaller
@@ -41,6 +44,7 @@ pub struct Web3ProxyApp {
     balanced_rpcs: Arc<Web3Connections>,
     /// Send private requests (like eth_sendRawTransaction) to all these servers
     private_rpcs: Arc<Web3Connections>,
+    active_requests: DashMap<CacheKey, watch::Receiver<bool>>,
     response_cache: ResponseLruCache,
 }
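The new `active_requests` field maps the same `CacheKey` used by the response cache to a `watch::Receiver` for the request currently being served. `DashMap::entry` only locks the shard for that key, so exactly one caller sees the `Vacant` slot and becomes the request that actually talks to a backend. A rough standalone illustration of that occupied/vacant decision (the key and task ids here are made up):

```rust
use dashmap::DashMap;

fn main() {
    // hypothetical key type; the real map uses CacheKey = (H256, String, Option<String>)
    let active_requests: DashMap<&'static str, u32> = DashMap::new();

    // two identical "requests" race for the same slot; only the first insert wins
    for task_id in [1u32, 2] {
        match active_requests.entry("eth_gasPrice") {
            dashmap::mapref::entry::Entry::Vacant(slot) => {
                slot.insert(task_id);
                println!("task {task_id} owns the slot and will query a backend");
            }
            dashmap::mapref::entry::Entry::Occupied(owner) => {
                println!("task {task_id} would wait on task {}", owner.get());
            }
        }
    }
}
```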
@@ -91,6 +95,7 @@ impl Web3ProxyApp {
             clock,
             balanced_rpcs,
             private_rpcs,
+            active_requests: Default::default(),
             response_cache: Default::default(),
         })
     }
@@ -154,64 +159,62 @@ impl Web3ProxyApp {
         if request.method == "eth_sendRawTransaction" {
             // there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs
-            loop {
             // TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
             match self.private_rpcs.get_upstream_servers() {
                 Ok(active_request_handles) => {
                     let (tx, rx) = flume::unbounded();

                     let connections = self.private_rpcs.clone();
                     let method = request.method.clone();
                     let params = request.params.clone();

                     // TODO: benchmark this compared to waiting on unbounded futures
                     // TODO: do something with this handle?
                     tokio::spawn(async move {
                         connections
-                            .try_send_parallel_requests(
-                                active_request_handles,
-                                method,
-                                params,
-                                tx,
-                            )
+                            .try_send_parallel_requests(active_request_handles, method, params, tx)
                             .await
                     });

                     // wait for the first response
+                    // TODO: we don't want the first response. we want the quorum response
                     let backend_response = rx.recv_async().await?;

                     if let Ok(backend_response) = backend_response {
                         // TODO: i think we
                         let response = JsonRpcForwardedResponse {
                             jsonrpc: "2.0".to_string(),
                             id: request.id,
                             result: Some(backend_response),
                             error: None,
                         };
                         return Ok(response);
                     }
                 }
                 Err(None) => {
                     // TODO: return a 502?
                     return Err(anyhow::anyhow!("no private rpcs!"));
                 }
                 Err(Some(not_until)) => {
                     // TODO: move this to a helper function
                     // sleep (TODO: with a lock?) until our rate limits should be available
                     // TODO: if a server catches up sync while we are waiting, we could stop waiting
                     let deadline = not_until.wait_time_from(self.clock.now());
+                    let deadline = deadline.min(Duration::from_millis(200));

                     sleep(deadline).await;
+
+                    warn!("All rate limits exceeded. Sleeping");
                 }
             };
-            }
         } else {
             // this is not a private transaction (or no private relays are configured)
-            // try to send to each tier, stopping at the first success
-            // if no tiers are synced, fallback to privates
+            // TODO: how much should we retry?
+            for _ in 0..10 {
                 // TODO: think more about this loop.
-            loop {
-                // todo: bring back this caching
+                // todo: move getting a cache_key or the result into a helper function. then we could have multiple caches
                 let best_block_hash = self
                     .balanced_rpcs
                     .get_synced_rpcs()
@@ -227,12 +230,38 @@ impl Web3ProxyApp {
                     request.params.clone().map(|x| x.to_string()),
                 );

+                // first check to see if this is cached
                 if let Some(cached) = self.response_cache.read().get(&cache_key) {
-                    // TODO: this still serializes every time
-                    // TODO: return a reference in the other places so that this works without a clone?
+                    let _ = self.active_requests.remove(&cache_key);
+
                     return Ok(cached.to_owned());
                 }

+                // check if this request is already in flight
+                let (in_flight_tx, in_flight_rx) = watch::channel(true);
+                let mut other_in_flight_rx = None;
+                match self.active_requests.entry(cache_key.clone()) {
+                    dashmap::mapref::entry::Entry::Occupied(entry) => {
+                        other_in_flight_rx = Some(entry.get().clone());
+                    }
+                    dashmap::mapref::entry::Entry::Vacant(entry) => {
+                        entry.insert(in_flight_rx);
+                    }
+                }
+
+                if let Some(mut other_in_flight_rx) = other_in_flight_rx {
+                    // wait for the other request to finish. it can finish successfully or with an error
+                    let _ = other_in_flight_rx.changed().await;
+
+                    // now that we've waited, lets check the cache again
+                    if let Some(cached) = self.response_cache.read().get(&cache_key) {
+                        let _ = self.active_requests.remove(&cache_key);
+                        let _ = in_flight_tx.send(false);
+
+                        return Ok(cached.to_owned());
+                    }
+                }
+
                 match self.balanced_rpcs.next_upstream_server().await {
                     Ok(active_request_handle) => {
                         let response = active_request_handle
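Taken together, the added block implements request coalescing: check the cache, claim or discover an in-flight slot, and if another task owns the slot, wait on its watch channel and re-check the cache before doing any work. The sketch below shows the same flow in isolation; `Coalescer`, `fetch_from_backend`, and the string key/value types are stand-ins invented for the example, not part of this codebase:

```rust
use dashmap::DashMap;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::watch;

type Key = String;
type Value = String;

#[derive(Default)]
struct Coalescer {
    cache: RwLock<HashMap<Key, Value>>,
    in_flight: DashMap<Key, watch::Receiver<bool>>,
}

impl Coalescer {
    async fn get(&self, key: Key) -> Value {
        // 1. cache hit: nothing to coalesce
        if let Some(hit) = self.cache.read().get(&key) {
            return hit.clone();
        }

        // 2. claim the in-flight slot, or grab the existing receiver
        let (tx, rx) = watch::channel(true);
        let other_rx = match self.in_flight.entry(key.clone()) {
            dashmap::mapref::entry::Entry::Occupied(entry) => Some(entry.get().clone()),
            dashmap::mapref::entry::Entry::Vacant(entry) => {
                entry.insert(rx);
                None
            }
        };
        let is_leader = other_rx.is_none();

        // 3. follower path: wait for the leader, then re-check the cache
        if let Some(mut other_rx) = other_rx {
            let _ = other_rx.changed().await;
            if let Some(hit) = self.cache.read().get(&key) {
                return hit.clone();
            }
            // the leader failed; fall through and fetch it ourselves
        }

        // 4. do the work, publish the result, then release the slot
        let value = fetch_from_backend(&key).await;
        self.cache.write().insert(key.clone(), value.clone());
        if is_leader {
            self.in_flight.remove(&key);
            let _ = tx.send(false);
        }
        value
    }
}

// hypothetical stand-in for the real upstream RPC call
async fn fetch_from_backend(key: &str) -> Value {
    format!("response for {key}")
}

#[tokio::main]
async fn main() {
    let app = Arc::new(Coalescer::default());
    let (a, b) = tokio::join!(app.get("eth_gasPrice".into()), app.get("eth_gasPrice".into()));
    println!("{a} / {b}");
}
```

The important ordering in step 4 is that the cache is written before the in-flight entry is removed and the watch fires, so a woken follower always finds the value on its re-check.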
@@ -256,7 +285,7 @@ impl Web3ProxyApp {
                                 let mut response_cache = self.response_cache.write();

                                 // TODO: cache the warp::reply to save us serializing every time
-                                response_cache.insert(cache_key, response.clone());
+                                response_cache.insert(cache_key.clone(), response.clone());
                                 if response_cache.len() >= RESPONSE_CACHE_CAP {
                                     // TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? these caches only live for one block
                                     response_cache.pop_front();
@@ -264,9 +293,16 @@ impl Web3ProxyApp {
                                 drop(response_cache);

+                                // TODO: needing to remove manually here makes me think we should do this differently
+                                let _ = self.active_requests.remove(&cache_key);
+                                let _ = in_flight_tx.send(false);
+
                                 response
                             }
                             Err(e) => {
+                                // send now since we aren't going to cache an error response
+                                let _ = in_flight_tx.send(false);
+
                                 // TODO: move this to a helper function?
                                 let code;
                                 let message: String;
@@ -333,11 +369,20 @@ impl Web3ProxyApp {
                            trace!("Sending reply: {:?}", response);
                        }

+                        // TODO: needing to remove manually here makes me think we should do this differently
+                        let _ = self.active_requests.remove(&cache_key);
+                        let _ = in_flight_tx.send(false);
+
                        return Ok(response);
                    }
                    Err(None) => {
                        // TODO: this is too verbose. if there are other servers in other tiers, we use those!
                        warn!("No servers in sync!");

+                        // TODO: needing to remove manually here makes me think we should do this differently
+                        let _ = self.active_requests.remove(&cache_key);
+                        let _ = in_flight_tx.send(false);
+
                        return Err(anyhow::anyhow!("no servers in sync"));
                    }
                    Err(Some(not_until)) => {
@@ -346,10 +391,20 @@ impl Web3ProxyApp {
                        // TODO: if a server catches up sync while we are waiting, we could stop waiting
                        let deadline = not_until.wait_time_from(self.clock.now());
+                        let deadline = deadline.min(Duration::from_millis(200));

                        sleep(deadline).await;
+
+                        warn!("All rate limits exceeded. Sleeping");
                    }
                }
+
+                // TODO: needing to remove manually here makes me think we should do this differently
+                let _ = self.active_requests.remove(&cache_key);
+                let _ = in_flight_tx.send(false);
            }
        }
+
+        Err(anyhow::anyhow!("internal error"))
    }
}
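Note how every exit path above has to remember both `active_requests.remove(..)` and `in_flight_tx.send(false)`, which is exactly what the repeated "needing to remove manually here" TODO is about. One conventional way to make that cleanup automatic (not something this commit does) is a drop guard that owns the watch sender and erases the map entry when it falls out of scope, so early returns and `?` cannot leak an in-flight entry. A hedged sketch, with a simplified `CacheKey`:

```rust
use dashmap::DashMap;
use std::sync::Arc;
use tokio::sync::watch;

// simplified stand-in for the real CacheKey = (H256, String, Option<String>)
type CacheKey = String;

/// Removes the in-flight entry and wakes any waiters when dropped.
struct InFlightGuard {
    key: CacheKey,
    map: Arc<DashMap<CacheKey, watch::Receiver<bool>>>,
    tx: watch::Sender<bool>,
}

impl Drop for InFlightGuard {
    fn drop(&mut self) {
        self.map.remove(&self.key);
        // waiters also wake when the sender is dropped, but sending an
        // explicit value makes the intent obvious
        let _ = self.tx.send(false);
    }
}

/// Returns a guard if we are the first request for this key, None if another is in flight.
fn claim(map: &Arc<DashMap<CacheKey, watch::Receiver<bool>>>, key: CacheKey) -> Option<InFlightGuard> {
    let (tx, rx) = watch::channel(true);
    match map.entry(key.clone()) {
        dashmap::mapref::entry::Entry::Occupied(_) => None,
        dashmap::mapref::entry::Entry::Vacant(slot) => {
            slot.insert(rx);
            Some(InFlightGuard { key, map: map.clone(), tx })
        }
    }
}

fn main() {
    let map = Arc::new(DashMap::new());

    let first = claim(&map, "eth_gasPrice".to_string());
    assert!(first.is_some());
    assert!(claim(&map, "eth_gasPrice".to_string()).is_none()); // second request loses the race

    drop(first); // entry removed and waiters woken, even on an early return or `?`
    assert!(claim(&map, "eth_gasPrice".to_string()).is_some());
}
```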

@@ -199,6 +199,7 @@ impl Web3Connection {
                // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
                // TODO: what should this interval be? probably some fraction of block time. set automatically?
                // TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
+                // TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though
                let mut interval = interval(Duration::from_secs(2));
                interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
@@ -209,27 +210,32 @@ impl Web3Connection {
                    // TODO: if error or rate limit, increase interval?
                    interval.tick().await;

-                    let active_request_handle = self.wait_for_request_handle().await;
+                    match self.try_request_handle() {
+                        Ok(active_request_handle) => {
                            // TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest"
                            let block: Result<Block<TxHash>, _> = provider
                                .request("eth_getBlockByNumber", ("latest", false))
                                .await;

                            drop(active_request_handle);

                            // don't send repeat blocks
                            if let Ok(block) = &block {
                                let new_hash = block.hash.unwrap();

                                if new_hash == last_hash {
                                    continue;
                                }

                                last_hash = new_hash;
                            }

                            self.send_block(block, &block_sender).await;
+                        }
+                        Err(e) => {
+                            warn!("Failed getting latest block from {}: {:?}", self, e);
+                        }
+                    }
                }
            }
            Web3Provider::Ws(provider) => {
@@ -248,12 +254,10 @@ impl Web3Connection {
                // there is a very small race condition here where the stream could send us a new block right now
                // all it does is print "new block" for the same block as current block
                // TODO: rate limit!
-                let block: Result<Block<TxHash>, _> = provider
+                let block: Result<Block<TxHash>, _> = active_request_handle
                    .request("eth_getBlockByNumber", ("latest", false))
                    .await;

-                drop(active_request_handle);
-
                self.send_block(block, &block_sender).await;

                while let Some(new_block) = stream.next().await {
@@ -269,7 +273,8 @@ impl Web3Connection {
    pub async fn wait_for_request_handle(self: &Arc<Self>) -> ActiveRequestHandle {
        // TODO: maximum wait time
-        loop {
+        for _ in 0..10 {
            match self.try_request_handle() {
                Ok(pending_request_handle) => return pending_request_handle,
                Err(not_until) => {
@@ -279,6 +284,9 @@ impl Web3Connection {
                }
            }
        }
+
+        // TODO: what should we do?
+        panic!("no request handle after 10 tries");
    }

    pub fn try_request_handle(
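On the `wait_for_request_handle` change: bounding the retries fixes the unbounded `loop`, but the new `panic!` will take down whatever task owns the connection, and the "// TODO: what should we do?" suggests that is not the intended end state. One alternative (purely illustrative, not what the commit does) is to return an error the caller can handle; the types below are stand-ins for the real `ActiveRequestHandle` and rate-limit plumbing:

```rust
use std::time::Duration;
use tokio::time::sleep;

// stand-ins for the real connection types; only the retry shape matters here
struct RequestHandle;

fn try_request_handle() -> Result<RequestHandle, Duration> {
    // pretend the rate limiter always asks us to wait 50ms
    Err(Duration::from_millis(50))
}

async fn wait_for_request_handle(max_tries: usize) -> anyhow::Result<RequestHandle> {
    for _ in 0..max_tries {
        match try_request_handle() {
            Ok(handle) => return Ok(handle),
            Err(retry_after) => sleep(retry_after).await,
        }
    }

    // bubble the failure up instead of panicking inside the connection task
    Err(anyhow::anyhow!("no request handle after {max_tries} tries"))
}

#[tokio::main]
async fn main() {
    match wait_for_request_handle(3).await {
        Ok(_handle) => println!("got a handle"),
        Err(e) => println!("giving up: {e}"),
    }
}
```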