handle batched requests
This commit is contained in:
parent
969f88d4bb
commit
3914a41fa2
4
Cargo.lock
generated
4
Cargo.lock
generated
@ -3333,9 +3333,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
|
||||
|
||||
[[package]]
|
||||
name = "tokio"
|
||||
version = "1.18.1"
|
||||
version = "1.18.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dce653fb475565de9f6fb0614b28bca8df2c430c0cf84bcd9c843f15de5414cc"
|
||||
checksum = "4903bf0427cf68dddd5aa6a93220756f8be0c34fcfa9f5e6191e103e15a31395"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"libc",
|
||||
|
@ -6,24 +6,3 @@
|
||||
url = "ws://127.0.0.1:8546"
|
||||
soft_limit = 200_000
|
||||
|
||||
[balanced_rpc_tiers.0.ankr]
|
||||
url = "https://rpc.ankr.com/eth"
|
||||
soft_limit = 3_000
|
||||
|
||||
[private_rpcs]
|
||||
|
||||
[private_rpcs.eden]
|
||||
url = "https://api.edennetwork.io/v1/"
|
||||
soft_limit = 1_805
|
||||
|
||||
[private_rpcs.eden_beta]
|
||||
url = "https://api.edennetwork.io/v1/beta"
|
||||
soft_limit = 5_861
|
||||
|
||||
[private_rpcs.ethermine]
|
||||
url = "https://rpc.ethermine.org"
|
||||
soft_limit = 5_861
|
||||
|
||||
[private_rpcs.flashbots]
|
||||
url = "https://rpc.flashbots.net"
|
||||
soft_limit = 7074
|
||||
|
29
config/example.toml.bac
Normal file
29
config/example.toml.bac
Normal file
@ -0,0 +1,29 @@
|
||||
[balanced_rpc_tiers]
|
||||
|
||||
[balanced_rpc_tiers.0]
|
||||
|
||||
[balanced_rpc_tiers.0.geth]
|
||||
url = "ws://127.0.0.1:8546"
|
||||
soft_limit = 200_000
|
||||
|
||||
[balanced_rpc_tiers.0.ankr]
|
||||
url = "https://rpc.ankr.com/eth"
|
||||
soft_limit = 3_000
|
||||
|
||||
[private_rpcs]
|
||||
|
||||
[private_rpcs.eden]
|
||||
url = "https://api.edennetwork.io/v1/"
|
||||
soft_limit = 1_805
|
||||
|
||||
[private_rpcs.eden_beta]
|
||||
url = "https://api.edennetwork.io/v1/beta"
|
||||
soft_limit = 5_861
|
||||
|
||||
[private_rpcs.ethermine]
|
||||
url = "https://rpc.ethermine.org"
|
||||
soft_limit = 5_861
|
||||
|
||||
[private_rpcs.flashbots]
|
||||
url = "https://rpc.flashbots.net"
|
||||
soft_limit = 7074
|
@ -22,7 +22,7 @@ reqwest = { version = "0.11.10", default-features = false, features = ["json", "
|
||||
rustc-hash = "1.1.0"
|
||||
serde = { version = "1.0.137", features = [] }
|
||||
serde_json = { version = "1.0.81", default-features = false, features = ["alloc"] }
|
||||
tokio = { version = "1.18.1", features = ["full"] }
|
||||
tokio = { version = "1.18.2", features = ["full"] }
|
||||
toml = "0.5.9"
|
||||
tracing = "0.1.34"
|
||||
tracing-subscriber = "0.3.11"
|
||||
|
415
web3-proxy/src/app.rs
Normal file
415
web3-proxy/src/app.rs
Normal file
@ -0,0 +1,415 @@
|
||||
use crate::config::Web3ConnectionConfig;
|
||||
use crate::connections::Web3Connections;
|
||||
use crate::jsonrpc::JsonRpcErrorData;
|
||||
use crate::jsonrpc::JsonRpcForwardedResponse;
|
||||
use crate::jsonrpc::JsonRpcForwardedResponseEnum;
|
||||
use crate::jsonrpc::JsonRpcRequest;
|
||||
use crate::jsonrpc::JsonRpcRequestEnum;
|
||||
use ethers::prelude::ProviderError;
|
||||
use ethers::prelude::{HttpClientError, WsClientError};
|
||||
use futures::future;
|
||||
use futures::future::join_all;
|
||||
use governor::clock::{Clock, QuantaClock};
|
||||
use linkedhashmap::LinkedHashMap;
|
||||
use std::fmt;
|
||||
use std::sync::atomic::{self, AtomicU64};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::time::sleep;
|
||||
use tracing::{trace, warn};
|
||||
|
||||
static APP_USER_AGENT: &str = concat!(
|
||||
"satoshiandkin/",
|
||||
env!("CARGO_PKG_NAME"),
|
||||
"/",
|
||||
env!("CARGO_PKG_VERSION"),
|
||||
);
|
||||
|
||||
// TODO: put this in config? what size should we do?
|
||||
const RESPONSE_CACHE_CAP: usize = 1024;
|
||||
|
||||
/// TODO: these types are probably very bad keys and values. i couldn't get caching of warp::reply::Json to work
|
||||
type ResponseLruCache = RwLock<LinkedHashMap<(u64, String, String), JsonRpcForwardedResponse>>;
|
||||
|
||||
/// The application
|
||||
// TODO: this debug impl is way too verbose. make something smaller
|
||||
// TODO: if Web3ProxyApp is always in an Arc, i think we can avoid having at least some of these internal things in arcs
|
||||
pub struct Web3ProxyApp {
|
||||
best_head_block_number: Arc<AtomicU64>,
|
||||
/// clock used for rate limiting
|
||||
/// TODO: use tokio's clock (will require a different ratelimiting crate)
|
||||
clock: QuantaClock,
|
||||
/// Send requests to the best server available
|
||||
balanced_rpc_tiers: Vec<Arc<Web3Connections>>,
|
||||
/// Send private requests (like eth_sendRawTransaction) to all these servers
|
||||
private_rpcs: Option<Arc<Web3Connections>>,
|
||||
response_cache: ResponseLruCache,
|
||||
}
|
||||
|
||||
impl fmt::Debug for Web3ProxyApp {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
// TODO: the default formatter takes forever to write. this is too quiet though
|
||||
f.debug_struct("Web3ProxyApp")
|
||||
.field(
|
||||
"best_head_block_number",
|
||||
&self.best_head_block_number.load(atomic::Ordering::Relaxed),
|
||||
)
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
impl Web3ProxyApp {
|
||||
pub async fn try_new(
|
||||
balanced_rpc_tiers: Vec<Vec<Web3ConnectionConfig>>,
|
||||
private_rpcs: Vec<Web3ConnectionConfig>,
|
||||
) -> anyhow::Result<Web3ProxyApp> {
|
||||
let clock = QuantaClock::default();
|
||||
|
||||
let best_head_block_number = Arc::new(AtomicU64::new(0));
|
||||
|
||||
// make a http shared client
|
||||
// TODO: how should we configure the connection pool?
|
||||
// TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
|
||||
let http_client = reqwest::ClientBuilder::new()
|
||||
.connect_timeout(Duration::from_secs(5))
|
||||
.timeout(Duration::from_secs(300))
|
||||
.user_agent(APP_USER_AGENT)
|
||||
.build()?;
|
||||
|
||||
// TODO: attach context to this error
|
||||
let balanced_rpc_tiers =
|
||||
future::join_all(balanced_rpc_tiers.into_iter().map(|balanced_rpc_tier| {
|
||||
Web3Connections::try_new(
|
||||
best_head_block_number.clone(),
|
||||
balanced_rpc_tier,
|
||||
Some(http_client.clone()),
|
||||
&clock,
|
||||
)
|
||||
}))
|
||||
.await
|
||||
.into_iter()
|
||||
.collect::<anyhow::Result<Vec<Arc<Web3Connections>>>>()?;
|
||||
|
||||
// TODO: attach context to this error
|
||||
let private_rpcs = if private_rpcs.is_empty() {
|
||||
warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
|
||||
// TODO: instead of None, set it to a list of all the rpcs from balanced_rpc_tiers. that way we broadcast very loudly
|
||||
None
|
||||
} else {
|
||||
Some(
|
||||
Web3Connections::try_new(
|
||||
best_head_block_number.clone(),
|
||||
private_rpcs,
|
||||
Some(http_client),
|
||||
&clock,
|
||||
)
|
||||
.await?,
|
||||
)
|
||||
};
|
||||
|
||||
Ok(Web3ProxyApp {
|
||||
best_head_block_number,
|
||||
clock,
|
||||
balanced_rpc_tiers,
|
||||
private_rpcs,
|
||||
response_cache: Default::default(),
|
||||
})
|
||||
}
|
||||
|
||||
/// send the request to the approriate RPCs
|
||||
/// TODO: dry this up
|
||||
pub async fn proxy_web3_rpc(
|
||||
self: Arc<Web3ProxyApp>,
|
||||
request: JsonRpcRequestEnum,
|
||||
) -> anyhow::Result<impl warp::Reply> {
|
||||
trace!("Received request: {:?}", request);
|
||||
|
||||
let response = match request {
|
||||
JsonRpcRequestEnum::Single(request) => {
|
||||
JsonRpcForwardedResponseEnum::Single(self.proxy_web3_rpc_request(request).await?)
|
||||
}
|
||||
JsonRpcRequestEnum::Batch(requests) => {
|
||||
JsonRpcForwardedResponseEnum::Batch(self.proxy_web3_rpc_requests(requests).await?)
|
||||
}
|
||||
};
|
||||
|
||||
Ok(warp::reply::json(&response))
|
||||
}
|
||||
|
||||
async fn proxy_web3_rpc_requests(
|
||||
self: Arc<Web3ProxyApp>,
|
||||
requests: Vec<JsonRpcRequest>,
|
||||
) -> anyhow::Result<Vec<JsonRpcForwardedResponse>> {
|
||||
// TODO: we should probably change ethers-rs to support this directly
|
||||
// we cut up the request and send to potentually different servers. this could be a problem.
|
||||
// if the client needs consistent blocks, they should specify instead of assume batches work on the same
|
||||
// TODO: is spawning here actually slower?
|
||||
let num_requests = requests.len();
|
||||
let responses = join_all(
|
||||
requests
|
||||
.into_iter()
|
||||
.map(|request| {
|
||||
let clone = self.clone();
|
||||
tokio::spawn(async move { clone.proxy_web3_rpc_request(request).await })
|
||||
})
|
||||
.collect::<Vec<_>>(),
|
||||
)
|
||||
.await;
|
||||
|
||||
// TODO: i'm sure this could be done better with iterators
|
||||
let mut collected: Vec<JsonRpcForwardedResponse> = Vec::with_capacity(num_requests);
|
||||
for response in responses {
|
||||
collected.push(response??);
|
||||
}
|
||||
|
||||
Ok(collected)
|
||||
}
|
||||
|
||||
async fn proxy_web3_rpc_request(
|
||||
self: Arc<Web3ProxyApp>,
|
||||
request: JsonRpcRequest,
|
||||
) -> anyhow::Result<JsonRpcForwardedResponse> {
|
||||
trace!("Received request: {:?}", request);
|
||||
|
||||
// TODO: apparently json_body can be a vec of multiple requests. should we split them up? we need to respond with a Vec too
|
||||
|
||||
if self.private_rpcs.is_some() && request.method == "eth_sendRawTransaction" {
|
||||
let private_rpcs = self.private_rpcs.as_ref().unwrap();
|
||||
|
||||
// there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs
|
||||
loop {
|
||||
// TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
|
||||
match private_rpcs.get_upstream_servers() {
|
||||
Ok(active_request_handles) => {
|
||||
let (tx, rx) = flume::unbounded();
|
||||
|
||||
let connections = private_rpcs.clone();
|
||||
let method = request.method.clone();
|
||||
let params = request.params.clone();
|
||||
|
||||
// TODO: benchmark this compared to waiting on unbounded futures
|
||||
// TODO: do something with this handle?
|
||||
tokio::spawn(async move {
|
||||
connections
|
||||
.try_send_parallel_requests(
|
||||
active_request_handles,
|
||||
method,
|
||||
params,
|
||||
tx,
|
||||
)
|
||||
.await
|
||||
});
|
||||
|
||||
// wait for the first response
|
||||
let backend_response = rx.recv_async().await?;
|
||||
|
||||
if let Ok(backend_response) = backend_response {
|
||||
// TODO: i think we
|
||||
let response = JsonRpcForwardedResponse {
|
||||
jsonrpc: "2.0".to_string(),
|
||||
id: request.id,
|
||||
result: Some(backend_response),
|
||||
error: None,
|
||||
};
|
||||
return Ok(response);
|
||||
}
|
||||
}
|
||||
Err(not_until) => {
|
||||
// TODO: move this to a helper function
|
||||
// sleep (TODO: with a lock?) until our rate limits should be available
|
||||
// TODO: if a server catches up sync while we are waiting, we could stop waiting
|
||||
if let Some(not_until) = not_until {
|
||||
let deadline = not_until.wait_time_from(self.clock.now());
|
||||
|
||||
sleep(deadline).await;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
} else {
|
||||
// this is not a private transaction (or no private relays are configured)
|
||||
// try to send to each tier, stopping at the first success
|
||||
// if no tiers are synced, fallback to privates
|
||||
loop {
|
||||
// there are multiple tiers. save the earliest not_until (if any). if we don't return, we will sleep until then and then try again
|
||||
let mut earliest_not_until = None;
|
||||
|
||||
// TODO: how can we better build this iterator?
|
||||
let rpc_iter = if let Some(private_rpcs) = self.private_rpcs.as_ref() {
|
||||
self.balanced_rpc_tiers.iter().chain(vec![private_rpcs])
|
||||
} else {
|
||||
self.balanced_rpc_tiers.iter().chain(vec![])
|
||||
};
|
||||
|
||||
for balanced_rpcs in rpc_iter {
|
||||
let best_head_block_number =
|
||||
self.best_head_block_number.load(atomic::Ordering::Acquire); // TODO: we don't store current block for everything anymore. we store it on the connections
|
||||
|
||||
let best_rpc_block_number = balanced_rpcs.head_block_number();
|
||||
|
||||
if best_rpc_block_number < best_head_block_number {
|
||||
continue;
|
||||
}
|
||||
|
||||
// TODO: building this cache key is slow and its large, but i don't see a better way right now
|
||||
// TODO: inspect the params and see if a block is specified. if so, use that block number instead of current_block
|
||||
let cache_key = (
|
||||
best_head_block_number,
|
||||
request.method.clone(),
|
||||
request.params.to_string(),
|
||||
);
|
||||
|
||||
if let Some(cached) = self.response_cache.read().await.get(&cache_key) {
|
||||
// TODO: this still serializes every time
|
||||
// TODO: return a reference in the other places so that this works without a clone?
|
||||
return Ok(cached.to_owned());
|
||||
}
|
||||
|
||||
// TODO: what allowed lag?
|
||||
match balanced_rpcs.next_upstream_server().await {
|
||||
Ok(active_request_handle) => {
|
||||
let response = balanced_rpcs
|
||||
.try_send_request(
|
||||
active_request_handle,
|
||||
&request.method,
|
||||
&request.params,
|
||||
)
|
||||
.await;
|
||||
|
||||
let response = match response {
|
||||
Ok(partial_response) => {
|
||||
// TODO: trace here was really slow with millions of requests.
|
||||
// info!("forwarding request from {}", upstream_server);
|
||||
|
||||
let response = JsonRpcForwardedResponse {
|
||||
jsonrpc: "2.0".to_string(),
|
||||
id: request.id,
|
||||
// TODO: since we only use the result here, should that be all we return from try_send_request?
|
||||
result: Some(partial_response),
|
||||
error: None,
|
||||
};
|
||||
|
||||
// TODO: small race condidition here. parallel requests with the same query will both be saved to the cache
|
||||
let mut response_cache = self.response_cache.write().await;
|
||||
|
||||
// TODO: cache the warp::reply to save us serializing every time
|
||||
response_cache.insert(cache_key, response.clone());
|
||||
if response_cache.len() >= RESPONSE_CACHE_CAP {
|
||||
response_cache.pop_front();
|
||||
}
|
||||
|
||||
response
|
||||
}
|
||||
Err(e) => {
|
||||
let code;
|
||||
let message: String;
|
||||
let data;
|
||||
|
||||
match e {
|
||||
ProviderError::JsonRpcClientError(e) => {
|
||||
// TODO: we should check what type the provider is rather than trying to downcast both types of errors
|
||||
if let Some(e) = e.downcast_ref::<HttpClientError>() {
|
||||
match &*e {
|
||||
HttpClientError::JsonRpcError(e) => {
|
||||
code = e.code;
|
||||
message = e.message.clone();
|
||||
data = e.data.clone();
|
||||
}
|
||||
e => {
|
||||
// TODO: improve this
|
||||
code = -32603;
|
||||
message = format!("{}", e);
|
||||
data = None;
|
||||
}
|
||||
}
|
||||
} else if let Some(e) =
|
||||
e.downcast_ref::<WsClientError>()
|
||||
{
|
||||
match &*e {
|
||||
WsClientError::JsonRpcError(e) => {
|
||||
code = e.code;
|
||||
message = e.message.clone();
|
||||
data = e.data.clone();
|
||||
}
|
||||
e => {
|
||||
// TODO: improve this
|
||||
code = -32603;
|
||||
message = format!("{}", e);
|
||||
data = None;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
unimplemented!();
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
code = -32603;
|
||||
message = format!("{}", e);
|
||||
data = None;
|
||||
}
|
||||
}
|
||||
|
||||
JsonRpcForwardedResponse {
|
||||
jsonrpc: "2.0".to_string(),
|
||||
id: request.id,
|
||||
result: None,
|
||||
error: Some(JsonRpcErrorData {
|
||||
code,
|
||||
message,
|
||||
data,
|
||||
}),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if response.error.is_some() {
|
||||
trace!("Sending error reply: {:?}", response);
|
||||
} else {
|
||||
trace!("Sending reply: {:?}", response);
|
||||
}
|
||||
|
||||
return Ok(response);
|
||||
}
|
||||
Err(None) => {
|
||||
// TODO: this is too verbose. if there are other servers in other tiers, we use those!
|
||||
// warn!("No servers in sync!");
|
||||
}
|
||||
Err(Some(not_until)) => {
|
||||
// save the smallest not_until. if nothing succeeds, return an Err with not_until in it
|
||||
// TODO: helper function for this
|
||||
if earliest_not_until.is_none() {
|
||||
earliest_not_until.replace(not_until);
|
||||
} else {
|
||||
let earliest_possible =
|
||||
earliest_not_until.as_ref().unwrap().earliest_possible();
|
||||
|
||||
let new_earliest_possible = not_until.earliest_possible();
|
||||
|
||||
if earliest_possible > new_earliest_possible {
|
||||
earliest_not_until = Some(not_until);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// we haven't returned an Ok
|
||||
// if we did return a rate limit error, sleep and try again
|
||||
if let Some(earliest_not_until) = earliest_not_until {
|
||||
let deadline = earliest_not_until.wait_time_from(self.clock.now());
|
||||
|
||||
// TODO: max wait
|
||||
|
||||
sleep(deadline).await;
|
||||
} else {
|
||||
// TODO: how long should we wait?
|
||||
// TODO: max wait time?
|
||||
warn!("No servers in sync!");
|
||||
// TODO: return json error? return a 502?
|
||||
return Err(anyhow::anyhow!("no servers in sync"));
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
///! Communicate with a web3 provider
|
||||
///! Rate-limited communication with a web3 provider
|
||||
use derive_more::From;
|
||||
use ethers::prelude::Middleware;
|
||||
use futures::StreamExt;
|
||||
@ -7,7 +7,6 @@ use governor::middleware::NoOpMiddleware;
|
||||
use governor::state::{InMemoryState, NotKeyed};
|
||||
use governor::NotUntil;
|
||||
use governor::RateLimiter;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::value::RawValue;
|
||||
use std::fmt;
|
||||
use std::num::NonZeroU32;
|
||||
@ -274,7 +273,7 @@ impl Web3Connection {
|
||||
}
|
||||
}
|
||||
|
||||
/// Drop this once a connection to completes
|
||||
/// Drop this once a connection completes
|
||||
pub struct ActiveRequestHandle(Arc<Web3Connection>);
|
||||
|
||||
impl ActiveRequestHandle {
|
||||
@ -351,50 +350,3 @@ impl PartialEq for Web3Connection {
|
||||
== other.active_requests.load(atomic::Ordering::Acquire)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Deserialize)]
|
||||
pub struct JsonRpcRequest {
|
||||
pub id: Box<RawValue>,
|
||||
pub method: String,
|
||||
pub params: Box<RawValue>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for JsonRpcRequest {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
// TODO: the default formatter takes forever to write. this is too quiet though
|
||||
f.debug_struct("JsonRpcRequest")
|
||||
.field("id", &self.id)
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Clone)]
|
||||
pub struct JsonRpcErrorData {
|
||||
/// The error code
|
||||
pub code: i64,
|
||||
/// The error message
|
||||
pub message: String,
|
||||
/// Additional data
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub data: Option<serde_json::Value>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize)]
|
||||
pub struct JsonRpcForwardedResponse {
|
||||
pub jsonrpc: String,
|
||||
pub id: Box<RawValue>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub result: Option<Box<RawValue>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub error: Option<JsonRpcErrorData>,
|
||||
// TODO: optional error
|
||||
}
|
||||
|
||||
impl fmt::Debug for JsonRpcForwardedResponse {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
// TODO: the default formatter takes forever to write. this is too quiet though
|
||||
f.debug_struct("JsonRpcForwardedResponse")
|
||||
.field("id", &self.id)
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
///! Communicate with a group of web3 providers
|
||||
///! Load balanced communication with a group of web3 providers
|
||||
use derive_more::From;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::StreamExt;
|
||||
@ -109,27 +109,28 @@ impl Web3Connections {
|
||||
|
||||
pub async fn try_send_request(
|
||||
&self,
|
||||
connection_handle: ActiveRequestHandle,
|
||||
active_request_handle: ActiveRequestHandle,
|
||||
method: &str,
|
||||
params: &RawValue,
|
||||
) -> Result<Box<RawValue>, ethers::prelude::ProviderError> {
|
||||
let response = connection_handle.request(method, params).await;
|
||||
let response = active_request_handle.request(method, params).await;
|
||||
|
||||
// TODO: if "no block with that header" or some other jsonrpc errors, skip this response?
|
||||
|
||||
response
|
||||
}
|
||||
|
||||
pub async fn try_send_requests(
|
||||
/// Send the same request to all the handles. Returning the fastest successful result.
|
||||
pub async fn try_send_parallel_requests(
|
||||
self: Arc<Self>,
|
||||
connections: Vec<ActiveRequestHandle>,
|
||||
active_request_handles: Vec<ActiveRequestHandle>,
|
||||
method: String,
|
||||
params: Box<RawValue>,
|
||||
response_sender: flume::Sender<anyhow::Result<Box<RawValue>>>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut unordered_futures = FuturesUnordered::new();
|
||||
|
||||
for connection in connections {
|
||||
for connection in active_request_handles {
|
||||
// clone things so we can pass them to a future
|
||||
let connections = self.clone();
|
||||
let method = method.clone();
|
||||
@ -150,7 +151,7 @@ impl Web3Connections {
|
||||
unordered_futures.push(handle);
|
||||
}
|
||||
|
||||
// TODO: use iterators instead of pushing into a vec
|
||||
// TODO: use iterators instead of pushing into a vec?
|
||||
let mut errs = vec![];
|
||||
if let Some(x) = unordered_futures.next().await {
|
||||
match x.unwrap() {
|
||||
|
211
web3-proxy/src/jsonrpc.rs
Normal file
211
web3-proxy/src/jsonrpc.rs
Normal file
@ -0,0 +1,211 @@
|
||||
use serde::de::{self, Deserialize, Deserializer, MapAccess, SeqAccess, Visitor};
|
||||
use serde::Serialize;
|
||||
use serde_json::value::RawValue;
|
||||
use std::fmt;
|
||||
|
||||
#[derive(Clone, serde::Deserialize)]
|
||||
pub struct JsonRpcRequest {
|
||||
// TODO: skip jsonrpc entireley?
|
||||
// pub jsonrpc: Box<RawValue>,
|
||||
pub id: Box<RawValue>,
|
||||
pub method: String,
|
||||
pub params: Box<RawValue>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for JsonRpcRequest {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
// TODO: the default formatter takes forever to write. this is too quiet though
|
||||
f.debug_struct("JsonRpcRequest")
|
||||
.field("id", &self.id)
|
||||
.field("method", &self.method)
|
||||
.field("params", &self.params)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
/// Requests can come in multiple formats
|
||||
#[derive(Debug)]
|
||||
pub enum JsonRpcRequestEnum {
|
||||
Batch(Vec<JsonRpcRequest>),
|
||||
Single(JsonRpcRequest),
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for JsonRpcRequestEnum {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
#[derive(serde::Deserialize)]
|
||||
#[serde(field_identifier, rename_all = "lowercase")]
|
||||
enum Field {
|
||||
JsonRpc,
|
||||
Id,
|
||||
Method,
|
||||
Params,
|
||||
// TODO: jsonrpc here, too?
|
||||
}
|
||||
|
||||
struct JsonRpcBatchVisitor;
|
||||
|
||||
impl<'de> Visitor<'de> for JsonRpcBatchVisitor {
|
||||
type Value = JsonRpcRequestEnum;
|
||||
|
||||
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
|
||||
formatter.write_str("JsonRpcRequestEnum")
|
||||
}
|
||||
|
||||
fn visit_seq<V>(self, mut seq: V) -> Result<JsonRpcRequestEnum, V::Error>
|
||||
where
|
||||
V: SeqAccess<'de>,
|
||||
{
|
||||
// TODO: what size should we use as the default?
|
||||
let mut batch: Vec<JsonRpcRequest> =
|
||||
Vec::with_capacity(seq.size_hint().unwrap_or(10));
|
||||
|
||||
// this was easier than expected
|
||||
while let Ok(Some(s)) = seq.next_element::<JsonRpcRequest>() {
|
||||
batch.push(s);
|
||||
}
|
||||
|
||||
Ok(JsonRpcRequestEnum::Batch(batch))
|
||||
}
|
||||
|
||||
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
|
||||
where
|
||||
A: MapAccess<'de>,
|
||||
{
|
||||
// TODO: i feel like this should be easier
|
||||
let mut id = None;
|
||||
let mut method = None;
|
||||
let mut params = None;
|
||||
|
||||
while let Some(key) = map.next_key()? {
|
||||
match key {
|
||||
Field::JsonRpc => {
|
||||
// throw away the value
|
||||
let foo: String = map.next_value()?;
|
||||
}
|
||||
Field::Id => {
|
||||
if id.is_some() {
|
||||
return Err(de::Error::duplicate_field("id"));
|
||||
}
|
||||
id = Some(map.next_value()?);
|
||||
}
|
||||
Field::Method => {
|
||||
if method.is_some() {
|
||||
return Err(de::Error::duplicate_field("method"));
|
||||
}
|
||||
method = Some(map.next_value()?);
|
||||
}
|
||||
Field::Params => {
|
||||
if params.is_some() {
|
||||
return Err(de::Error::duplicate_field("params"));
|
||||
}
|
||||
params = Some(map.next_value()?);
|
||||
}
|
||||
}
|
||||
}
|
||||
let id = id.ok_or_else(|| de::Error::missing_field("id"))?;
|
||||
let method = method.ok_or_else(|| de::Error::missing_field("method"))?;
|
||||
let params = params.ok_or_else(|| de::Error::missing_field("params"))?;
|
||||
|
||||
let single = JsonRpcRequest { id, method, params };
|
||||
|
||||
Ok(JsonRpcRequestEnum::Single(single))
|
||||
}
|
||||
}
|
||||
|
||||
let batch_visitor = JsonRpcBatchVisitor {};
|
||||
|
||||
deserializer.deserialize_any(batch_visitor)
|
||||
}
|
||||
}
|
||||
|
||||
/// All jsonrpc errors use this structure
|
||||
#[derive(Serialize, Clone)]
|
||||
pub struct JsonRpcErrorData {
|
||||
/// The error code
|
||||
pub code: i64,
|
||||
/// The error message
|
||||
pub message: String,
|
||||
/// Additional data
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub data: Option<serde_json::Value>,
|
||||
}
|
||||
|
||||
/// A complete response
|
||||
#[derive(Clone, Serialize)]
|
||||
pub struct JsonRpcForwardedResponse {
|
||||
pub jsonrpc: String,
|
||||
pub id: Box<RawValue>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub result: Option<Box<RawValue>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub error: Option<JsonRpcErrorData>,
|
||||
}
|
||||
|
||||
/// TODO: the default formatter takes forever to write. this is too quiet though
|
||||
impl fmt::Debug for JsonRpcForwardedResponse {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("JsonRpcForwardedResponse")
|
||||
.field("id", &self.id)
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
/// JSONRPC Responses can include one or many response objects.
|
||||
#[derive(Clone, Serialize)]
|
||||
#[serde(untagged)]
|
||||
pub enum JsonRpcForwardedResponseEnum {
|
||||
Single(JsonRpcForwardedResponse),
|
||||
Batch(Vec<JsonRpcForwardedResponse>),
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn this_deserialize_single() {
|
||||
let input = r#"{"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":1}"#;
|
||||
|
||||
// test deserializing it directly to a single request object
|
||||
let output: JsonRpcRequest = serde_json::from_str(input).unwrap();
|
||||
|
||||
assert_eq!(output.id.to_string(), "1");
|
||||
assert_eq!(output.method, "eth_blockNumber");
|
||||
assert_eq!(output.params.to_string(), "[]");
|
||||
|
||||
// test deserializing it into an enum
|
||||
let output: JsonRpcRequestEnum = serde_json::from_str(input).unwrap();
|
||||
|
||||
assert!(matches!(output, JsonRpcRequestEnum::Single(_)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn this_deserialize_batch() {
|
||||
let input = r#"[{"jsonrpc":"2.0","method":"eth_getCode","params":["0x5ba1e12693dc8f9c48aad8770482f4739beed696","0xe0e6a4"],"id":27},{"jsonrpc":"2.0","method":"eth_getTransactionCount","params":["0x5ba1e12693dc8f9c48aad8770482f4739beed696","0xe0e6a4"],"id":28},{"jsonrpc":"2.0","method":"eth_getBalance","params":["0x5ba1e12693dc8f9c48aad8770482f4739beed696","0xe0e6a4"],"id":29}]"#;
|
||||
|
||||
// test deserializing it directly to a batch of request objects
|
||||
let output: Vec<JsonRpcRequest> = serde_json::from_str(input).unwrap();
|
||||
|
||||
assert_eq!(output.len(), 3);
|
||||
|
||||
assert_eq!(output[0].id.to_string(), "27");
|
||||
assert_eq!(output[0].method, "eth_getCode");
|
||||
assert_eq!(
|
||||
output[0].params.to_string(),
|
||||
r#"["0x5ba1e12693dc8f9c48aad8770482f4739beed696","0xe0e6a4"]"#
|
||||
);
|
||||
|
||||
assert_eq!(output[1].id.to_string(), "28");
|
||||
assert_eq!(output[2].id.to_string(), "29");
|
||||
|
||||
// test deserializing it into an enum
|
||||
let output: JsonRpcRequestEnum = serde_json::from_str(input).unwrap();
|
||||
|
||||
assert!(matches!(output, JsonRpcRequestEnum::Batch(_)));
|
||||
|
||||
assert_eq!(0, 1);
|
||||
}
|
||||
}
|
@ -1,365 +1,27 @@
|
||||
mod app;
|
||||
mod config;
|
||||
mod connection;
|
||||
mod connections;
|
||||
mod jsonrpc;
|
||||
|
||||
use config::Web3ConnectionConfig;
|
||||
use connection::JsonRpcErrorData;
|
||||
use connection::JsonRpcForwardedResponse;
|
||||
use ethers::prelude::ProviderError;
|
||||
use ethers::prelude::{HttpClientError, WsClientError};
|
||||
// use ethers::providers::transports::common::JsonRpcError;
|
||||
|
||||
use futures::future;
|
||||
use governor::clock::{Clock, QuantaClock};
|
||||
use linkedhashmap::LinkedHashMap;
|
||||
use std::fmt;
|
||||
use std::fs;
|
||||
use std::sync::atomic::{self, AtomicU64};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::time::sleep;
|
||||
use tracing::{info, warn};
|
||||
use tracing::{info, trace, warn};
|
||||
use warp::Filter;
|
||||
use warp::Reply;
|
||||
|
||||
use crate::app::Web3ProxyApp;
|
||||
use crate::config::{CliConfig, RpcConfig};
|
||||
use crate::connection::JsonRpcRequest;
|
||||
use crate::connections::Web3Connections;
|
||||
|
||||
static APP_USER_AGENT: &str = concat!(
|
||||
"satoshiandkin/",
|
||||
env!("CARGO_PKG_NAME"),
|
||||
"/",
|
||||
env!("CARGO_PKG_VERSION"),
|
||||
);
|
||||
|
||||
// TODO: put this in config? what size should we do?
|
||||
const RESPONSE_CACHE_CAP: usize = 1024;
|
||||
|
||||
/// TODO: these types are probably very bad keys and values. i couldn't get caching of warp::reply::Json to work
|
||||
type ResponseLruCache = RwLock<LinkedHashMap<(u64, String, String), JsonRpcForwardedResponse>>;
|
||||
|
||||
/// The application
|
||||
// TODO: this debug impl is way too verbose. make something smaller
|
||||
// TODO: if Web3ProxyApp is always in an Arc, i think we can avoid having at least some of these internal things in arcs
|
||||
pub struct Web3ProxyApp {
|
||||
best_head_block_number: Arc<AtomicU64>,
|
||||
/// clock used for rate limiting
|
||||
/// TODO: use tokio's clock (will require a different ratelimiting crate)
|
||||
clock: QuantaClock,
|
||||
/// Send requests to the best server available
|
||||
balanced_rpc_tiers: Vec<Arc<Web3Connections>>,
|
||||
/// Send private requests (like eth_sendRawTransaction) to all these servers
|
||||
private_rpcs: Option<Arc<Web3Connections>>,
|
||||
response_cache: ResponseLruCache,
|
||||
}
|
||||
|
||||
impl fmt::Debug for Web3ProxyApp {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
// TODO: the default formatter takes forever to write. this is too quiet though
|
||||
f.debug_struct("Web3ProxyApp")
|
||||
.field(
|
||||
"best_head_block_number",
|
||||
&self.best_head_block_number.load(atomic::Ordering::Relaxed),
|
||||
)
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
impl Web3ProxyApp {
|
||||
async fn try_new(
|
||||
balanced_rpc_tiers: Vec<Vec<Web3ConnectionConfig>>,
|
||||
private_rpcs: Vec<Web3ConnectionConfig>,
|
||||
) -> anyhow::Result<Web3ProxyApp> {
|
||||
let clock = QuantaClock::default();
|
||||
|
||||
let best_head_block_number = Arc::new(AtomicU64::new(0));
|
||||
|
||||
// make a http shared client
|
||||
// TODO: how should we configure the connection pool?
|
||||
// TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
|
||||
let http_client = reqwest::ClientBuilder::new()
|
||||
.connect_timeout(Duration::from_secs(5))
|
||||
.timeout(Duration::from_secs(300))
|
||||
.user_agent(APP_USER_AGENT)
|
||||
.build()?;
|
||||
|
||||
// TODO: attach context to this error
|
||||
let balanced_rpc_tiers =
|
||||
future::join_all(balanced_rpc_tiers.into_iter().map(|balanced_rpc_tier| {
|
||||
Web3Connections::try_new(
|
||||
best_head_block_number.clone(),
|
||||
balanced_rpc_tier,
|
||||
Some(http_client.clone()),
|
||||
&clock,
|
||||
)
|
||||
}))
|
||||
.await
|
||||
.into_iter()
|
||||
.collect::<anyhow::Result<Vec<Arc<Web3Connections>>>>()?;
|
||||
|
||||
// TODO: attach context to this error
|
||||
let private_rpcs = if private_rpcs.is_empty() {
|
||||
warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
|
||||
// TODO: instead of None, set it to a list of all the rpcs from balanced_rpc_tiers. that way we broadcast very loudly
|
||||
None
|
||||
} else {
|
||||
Some(
|
||||
Web3Connections::try_new(
|
||||
best_head_block_number.clone(),
|
||||
private_rpcs,
|
||||
Some(http_client),
|
||||
&clock,
|
||||
)
|
||||
.await?,
|
||||
)
|
||||
};
|
||||
|
||||
Ok(Web3ProxyApp {
|
||||
best_head_block_number,
|
||||
clock,
|
||||
balanced_rpc_tiers,
|
||||
private_rpcs,
|
||||
response_cache: Default::default(),
|
||||
})
|
||||
}
|
||||
|
||||
/// send the request to the approriate RPCs
|
||||
/// TODO: dry this up
|
||||
async fn proxy_web3_rpc(
|
||||
self: Arc<Web3ProxyApp>,
|
||||
json_body: JsonRpcRequest,
|
||||
) -> anyhow::Result<impl warp::Reply> {
|
||||
if self.private_rpcs.is_some() && json_body.method == "eth_sendRawTransaction" {
|
||||
let private_rpcs = self.private_rpcs.as_ref().unwrap();
|
||||
|
||||
// there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs
|
||||
loop {
|
||||
// TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
|
||||
match private_rpcs.get_upstream_servers() {
|
||||
Ok(upstream_servers) => {
|
||||
let (tx, rx) = flume::unbounded();
|
||||
|
||||
let connections = private_rpcs.clone();
|
||||
let method = json_body.method.clone();
|
||||
let params = json_body.params.clone();
|
||||
|
||||
// TODO: benchmark this compared to waiting on unbounded futures
|
||||
tokio::spawn(async move {
|
||||
connections
|
||||
.try_send_requests(upstream_servers, method, params, tx)
|
||||
.await
|
||||
});
|
||||
|
||||
// wait for the first response
|
||||
let backend_response = rx.recv_async().await?;
|
||||
|
||||
if let Ok(backend_response) = backend_response {
|
||||
// TODO: i think we
|
||||
let response = JsonRpcForwardedResponse {
|
||||
jsonrpc: "2.0".to_string(),
|
||||
id: json_body.id,
|
||||
result: Some(backend_response),
|
||||
error: None,
|
||||
};
|
||||
return Ok(warp::reply::json(&response));
|
||||
}
|
||||
}
|
||||
Err(not_until) => {
|
||||
// TODO: move this to a helper function
|
||||
// sleep (with a lock) until our rate limits should be available
|
||||
if let Some(not_until) = not_until {
|
||||
let deadline = not_until.wait_time_from(self.clock.now());
|
||||
|
||||
sleep(deadline).await;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
} else {
|
||||
// this is not a private transaction (or no private relays are configured)
|
||||
// try to send to each tier, stopping at the first success
|
||||
// if no tiers are synced, fallback to privates
|
||||
loop {
|
||||
// there are multiple tiers. save the earliest not_until (if any). if we don't return, we will sleep until then and then try again
|
||||
let mut earliest_not_until = None;
|
||||
|
||||
// TODO: how can we better build this iterator?
|
||||
let rpc_iter = if let Some(private_rpcs) = self.private_rpcs.as_ref() {
|
||||
self.balanced_rpc_tiers.iter().chain(vec![private_rpcs])
|
||||
} else {
|
||||
self.balanced_rpc_tiers.iter().chain(vec![])
|
||||
};
|
||||
|
||||
for balanced_rpcs in rpc_iter {
|
||||
let best_head_block_number =
|
||||
self.best_head_block_number.load(atomic::Ordering::Acquire); // TODO: we don't store current block for everything anymore. we store it on the connections
|
||||
|
||||
let best_rpc_block_number = balanced_rpcs.head_block_number();
|
||||
|
||||
if best_rpc_block_number < best_head_block_number {
|
||||
continue;
|
||||
}
|
||||
|
||||
// TODO: building this cache key is slow and its large, but i don't see a better way right now
|
||||
// TODO: inspect the params and see if a block is specified. if so, use that block number instead of current_block
|
||||
let cache_key = (
|
||||
best_head_block_number,
|
||||
json_body.method.clone(),
|
||||
json_body.params.to_string(),
|
||||
);
|
||||
|
||||
if let Some(cached) = self.response_cache.read().await.get(&cache_key) {
|
||||
// TODO: this still serializes every time
|
||||
return Ok(warp::reply::json(cached));
|
||||
}
|
||||
|
||||
// TODO: what allowed lag?
|
||||
match balanced_rpcs.next_upstream_server().await {
|
||||
Ok(upstream_server) => {
|
||||
let response = balanced_rpcs
|
||||
.try_send_request(
|
||||
upstream_server,
|
||||
&json_body.method,
|
||||
&json_body.params,
|
||||
)
|
||||
.await;
|
||||
|
||||
let response = match response {
|
||||
Ok(partial_response) => {
|
||||
// TODO: trace here was really slow with millions of requests.
|
||||
// info!("forwarding request from {}", upstream_server);
|
||||
|
||||
let response = JsonRpcForwardedResponse {
|
||||
jsonrpc: "2.0".to_string(),
|
||||
id: json_body.id,
|
||||
// TODO: since we only use the result here, should that be all we return from try_send_request?
|
||||
result: Some(partial_response),
|
||||
error: None,
|
||||
};
|
||||
|
||||
// TODO: small race condidition here. parallel requests with the same query will both be saved to the cache
|
||||
let mut response_cache = self.response_cache.write().await;
|
||||
|
||||
// TODO: cache the warp::reply to save us serializing every time
|
||||
response_cache.insert(cache_key, response.clone());
|
||||
if response_cache.len() >= RESPONSE_CACHE_CAP {
|
||||
response_cache.pop_front();
|
||||
}
|
||||
|
||||
response
|
||||
}
|
||||
Err(e) => {
|
||||
let code;
|
||||
let message: String;
|
||||
let data;
|
||||
|
||||
match e {
|
||||
ProviderError::JsonRpcClientError(e) => {
|
||||
// TODO: we should check what type the provider is rather than trying to downcast both types of errors
|
||||
if let Some(e) = e.downcast_ref::<HttpClientError>() {
|
||||
match &*e {
|
||||
HttpClientError::JsonRpcError(e) => {
|
||||
code = e.code;
|
||||
message = e.message.clone();
|
||||
data = e.data.clone();
|
||||
}
|
||||
e => {
|
||||
// TODO: improve this
|
||||
code = -32603;
|
||||
message = format!("{}", e);
|
||||
data = None;
|
||||
}
|
||||
}
|
||||
} else if let Some(e) =
|
||||
e.downcast_ref::<WsClientError>()
|
||||
{
|
||||
match &*e {
|
||||
WsClientError::JsonRpcError(e) => {
|
||||
code = e.code;
|
||||
message = e.message.clone();
|
||||
data = e.data.clone();
|
||||
}
|
||||
e => {
|
||||
// TODO: improve this
|
||||
code = -32603;
|
||||
message = format!("{}", e);
|
||||
data = None;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
unimplemented!();
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
code = -32603;
|
||||
message = format!("{}", e);
|
||||
data = None;
|
||||
}
|
||||
}
|
||||
|
||||
JsonRpcForwardedResponse {
|
||||
jsonrpc: "2.0".to_string(),
|
||||
id: json_body.id,
|
||||
result: None,
|
||||
error: Some(JsonRpcErrorData {
|
||||
code,
|
||||
message,
|
||||
data,
|
||||
}),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
return Ok(warp::reply::json(&response));
|
||||
}
|
||||
Err(None) => {
|
||||
// TODO: this is too verbose. if there are other servers in other tiers, we use those!
|
||||
// warn!("No servers in sync!");
|
||||
}
|
||||
Err(Some(not_until)) => {
|
||||
// save the smallest not_until. if nothing succeeds, return an Err with not_until in it
|
||||
// TODO: helper function for this
|
||||
if earliest_not_until.is_none() {
|
||||
earliest_not_until.replace(not_until);
|
||||
} else {
|
||||
let earliest_possible =
|
||||
earliest_not_until.as_ref().unwrap().earliest_possible();
|
||||
|
||||
let new_earliest_possible = not_until.earliest_possible();
|
||||
|
||||
if earliest_possible > new_earliest_possible {
|
||||
earliest_not_until = Some(not_until);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// we haven't returned an Ok
|
||||
// if we did return a rate limit error, sleep and try again
|
||||
if let Some(earliest_not_until) = earliest_not_until {
|
||||
let deadline = earliest_not_until.wait_time_from(self.clock.now());
|
||||
|
||||
sleep(deadline).await;
|
||||
} else {
|
||||
// TODO: how long should we wait?
|
||||
// TODO: max wait time?
|
||||
warn!("No servers in sync!");
|
||||
// TODO: return json error? return a 502?
|
||||
return Err(anyhow::anyhow!("no servers in sync"));
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
// install global collector configured based on RUST_LOG env var.
|
||||
tracing_subscriber::fmt::init();
|
||||
|
||||
info!("test info");
|
||||
warn!("test warn");
|
||||
trace!("test trace");
|
||||
|
||||
let cli_config: CliConfig = argh::from_env();
|
||||
|
||||
info!("Loading rpc config @ {}", cli_config.rpc_config_path);
|
||||
@ -391,14 +53,14 @@ async fn main() -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// convert result into an http response. use this at the end of your warp filter
|
||||
/// TODO: using boxes can't be the best way. think about this more
|
||||
/// convert result into a jsonrpc error. use this at the end of your warp filter
|
||||
fn handle_anyhow_errors<T: warp::Reply>(
|
||||
res: anyhow::Result<T>,
|
||||
) -> warp::http::Response<warp::hyper::Body> {
|
||||
match res {
|
||||
Ok(r) => r.into_response(),
|
||||
Err(e) => warp::reply::with_status(
|
||||
// TODO: json error
|
||||
format!("{}", e),
|
||||
warp::http::StatusCode::INTERNAL_SERVER_ERROR,
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user