dont fear locks and better serializing
This commit is contained in:
parent
d3859b463e
commit
cf4055e2b1
7
Cargo.lock
generated
7
Cargo.lock
generated
@ -59,12 +59,6 @@ version = "1.0.57"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc"
|
||||
|
||||
[[package]]
|
||||
name = "arc-swap"
|
||||
version = "1.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c5d78ce20460b82d3fa150275ed9d55e21064fc7951177baacf86a145c4a4b1f"
|
||||
|
||||
[[package]]
|
||||
name = "argh"
|
||||
version = "0.1.7"
|
||||
@ -3851,7 +3845,6 @@ name = "web3-proxy"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"arc-swap",
|
||||
"argh",
|
||||
"derive_more",
|
||||
"ethers",
|
||||
|
@ -7,7 +7,6 @@ edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.57"
|
||||
arc-swap = "1.5.0"
|
||||
argh = "0.1.7"
|
||||
# axum = "*" # TODO: use this instead of warp?
|
||||
derive_more = "0.99.17"
|
||||
|
@ -133,13 +133,14 @@ impl Web3Connection {
|
||||
pub async fn new_heads(
|
||||
self: Arc<Self>,
|
||||
connections: Option<Arc<Web3Connections>>,
|
||||
best_head_block_number: Arc<AtomicU64>,
|
||||
) -> anyhow::Result<()> {
|
||||
info!("Watching new_heads on {}", self);
|
||||
|
||||
match &self.provider {
|
||||
Web3Provider::Http(provider) => {
|
||||
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
|
||||
// TODO: what should this interval be? probably some fraction of block time
|
||||
// TODO: what should this interval be? probably some fraction of block time. set automatically?
|
||||
// TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
|
||||
let mut interval = interval(Duration::from_secs(2));
|
||||
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
|
||||
@ -149,7 +150,6 @@ impl Web3Connection {
|
||||
// TODO: if error or rate limit, increase interval?
|
||||
interval.tick().await;
|
||||
|
||||
// rate limits
|
||||
let active_request_handle = self.wait_for_request_handle().await;
|
||||
|
||||
let block_number = provider.get_block_number().await.map(|x| x.as_u64())?;
|
||||
@ -165,6 +165,14 @@ impl Web3Connection {
|
||||
if old_block_number != block_number {
|
||||
info!("new block on {}: {}", self, block_number);
|
||||
|
||||
// we don't care about this result.
|
||||
let _ = best_head_block_number.compare_exchange(
|
||||
old_block_number,
|
||||
block_number,
|
||||
atomic::Ordering::AcqRel,
|
||||
atomic::Ordering::Acquire,
|
||||
);
|
||||
|
||||
if let Some(connections) = &connections {
|
||||
connections.update_synced_rpcs(&self, block_number)?;
|
||||
}
|
||||
@ -193,8 +201,17 @@ impl Web3Connection {
|
||||
|
||||
info!("current block on {}: {}", self, block_number);
|
||||
|
||||
self.head_block_number
|
||||
.store(block_number, atomic::Ordering::Release);
|
||||
let old_block_number = self
|
||||
.head_block_number
|
||||
.swap(block_number, atomic::Ordering::Release);
|
||||
|
||||
// we don't care about this result
|
||||
let _ = best_head_block_number.compare_exchange(
|
||||
old_block_number,
|
||||
block_number,
|
||||
atomic::Ordering::AcqRel,
|
||||
atomic::Ordering::Acquire,
|
||||
);
|
||||
|
||||
if let Some(connections) = &connections {
|
||||
connections.update_synced_rpcs(&self, block_number)?;
|
||||
@ -209,6 +226,9 @@ impl Web3Connection {
|
||||
self.head_block_number
|
||||
.store(block_number, atomic::Ordering::Release);
|
||||
|
||||
// TODO: what ordering?
|
||||
best_head_block_number.fetch_max(block_number, atomic::Ordering::AcqRel);
|
||||
|
||||
info!("new block on {}: {}", self, block_number);
|
||||
|
||||
if let Some(connections) = &connections {
|
||||
@ -284,7 +304,7 @@ impl ActiveRequestHandle {
|
||||
self,
|
||||
method: &str,
|
||||
params: &serde_json::value::RawValue,
|
||||
) -> Result<JsonRpcForwardedResponse, ethers::prelude::ProviderError> {
|
||||
) -> Result<Box<RawValue>, ethers::prelude::ProviderError> {
|
||||
match &self.0.provider {
|
||||
Web3Provider::Http(provider) => provider.request(method, params).await,
|
||||
Web3Provider::Ws(provider) => provider.request(method, params).await,
|
||||
@ -347,11 +367,15 @@ impl fmt::Debug for JsonRpcRequest {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: check for errors too!
|
||||
#[derive(Clone, Deserialize, Serialize)]
|
||||
#[derive(Clone, Serialize)]
|
||||
pub struct JsonRpcForwardedResponse {
|
||||
pub jsonrpc: String,
|
||||
pub id: Box<RawValue>,
|
||||
pub result: Box<RawValue>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub result: Option<Box<RawValue>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub error: Option<String>,
|
||||
// TODO: optional error
|
||||
}
|
||||
|
||||
impl fmt::Debug for JsonRpcForwardedResponse {
|
||||
|
@ -1,19 +1,20 @@
|
||||
///! Communicate with a group of web3 providers
|
||||
use arc_swap::ArcSwap;
|
||||
use derive_more::From;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::StreamExt;
|
||||
use governor::clock::{QuantaClock, QuantaInstant};
|
||||
use governor::NotUntil;
|
||||
use hashbrown::HashMap;
|
||||
use parking_lot::RwLock;
|
||||
use serde_json::value::RawValue;
|
||||
use std::cmp;
|
||||
use std::fmt;
|
||||
use std::sync::atomic::{self, AtomicU64};
|
||||
use std::sync::Arc;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::config::Web3ConnectionConfig;
|
||||
use crate::connection::{ActiveRequestHandle, JsonRpcForwardedResponse, Web3Connection};
|
||||
use crate::connection::{ActiveRequestHandle, Web3Connection};
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
struct SyncedConnections {
|
||||
@ -44,8 +45,9 @@ impl SyncedConnections {
|
||||
pub struct Web3Connections {
|
||||
inner: Vec<Arc<Web3Connection>>,
|
||||
/// TODO: what is the best type for this? Heavy reads with writes every few seconds. When writes happen, there is a burst of them
|
||||
/// TODO: arcswap was a lot faster, but i think we need a lock for proper logic
|
||||
synced_connections: ArcSwap<SyncedConnections>,
|
||||
/// TODO: we probably need a better lock on this
|
||||
synced_connections: RwLock<SyncedConnections>,
|
||||
best_head_block_number: Arc<AtomicU64>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for Web3Connections {
|
||||
@ -59,7 +61,7 @@ impl fmt::Debug for Web3Connections {
|
||||
|
||||
impl Web3Connections {
|
||||
pub async fn try_new(
|
||||
// TODO: servers should be a Web3ConnectionBuilder struct
|
||||
best_head_block_number: Arc<AtomicU64>,
|
||||
servers: Vec<Web3ConnectionConfig>,
|
||||
http_client: Option<reqwest::Client>,
|
||||
clock: &QuantaClock,
|
||||
@ -78,20 +80,25 @@ impl Web3Connections {
|
||||
// TODO: exit if no connections?
|
||||
|
||||
let connections = Arc::new(Self {
|
||||
best_head_block_number: best_head_block_number.clone(),
|
||||
inner: connections,
|
||||
synced_connections: ArcSwap::new(Arc::new(SyncedConnections::new(num_connections))),
|
||||
synced_connections: RwLock::new(SyncedConnections::new(num_connections)),
|
||||
});
|
||||
|
||||
for connection in connections.inner.iter() {
|
||||
// subscribe to new heads in a spawned future
|
||||
// TODO: channel instead. then we can have one future with write access to a left-right
|
||||
// TODO: channel instead. then we can have one future with write access to a left-right?
|
||||
let connection = Arc::clone(connection);
|
||||
let connections = connections.clone();
|
||||
let best_head_block_number = best_head_block_number.clone();
|
||||
tokio::spawn(async move {
|
||||
let url = connection.url().to_string();
|
||||
|
||||
// TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date
|
||||
if let Err(e) = connection.new_heads(Some(connections)).await {
|
||||
if let Err(e) = connection
|
||||
.new_heads(Some(connections), best_head_block_number)
|
||||
.await
|
||||
{
|
||||
warn!("new_heads error on {}: {:?}", url, e);
|
||||
}
|
||||
});
|
||||
@ -101,15 +108,15 @@ impl Web3Connections {
|
||||
}
|
||||
|
||||
pub fn head_block_number(&self) -> u64 {
|
||||
self.synced_connections.load().head_block_number
|
||||
self.best_head_block_number.load(atomic::Ordering::Acquire)
|
||||
}
|
||||
|
||||
pub async fn try_send_request<'a>(
|
||||
pub async fn try_send_request(
|
||||
&self,
|
||||
connection_handle: ActiveRequestHandle,
|
||||
method: &str,
|
||||
params: &RawValue,
|
||||
) -> anyhow::Result<JsonRpcForwardedResponse> {
|
||||
) -> anyhow::Result<Box<RawValue>> {
|
||||
// connection.in_active_requests was called when this rpc was selected
|
||||
|
||||
let response = connection_handle.request(method, params).await;
|
||||
@ -124,7 +131,7 @@ impl Web3Connections {
|
||||
connections: Vec<ActiveRequestHandle>,
|
||||
method: String,
|
||||
params: Box<RawValue>,
|
||||
response_sender: flume::Sender<anyhow::Result<JsonRpcForwardedResponse>>,
|
||||
response_sender: flume::Sender<anyhow::Result<Box<RawValue>>>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut unordered_futures = FuturesUnordered::new();
|
||||
|
||||
@ -185,35 +192,38 @@ impl Web3Connections {
|
||||
rpc: &Arc<Web3Connection>,
|
||||
new_block: u64,
|
||||
) -> anyhow::Result<()> {
|
||||
// TODO: try a left_right instead of an ArcSwap.
|
||||
let synced_connections = self.synced_connections.load();
|
||||
let mut synced_connections = self.synced_connections.write();
|
||||
|
||||
// should we load new_block here?
|
||||
let current_block_number = synced_connections.head_block_number;
|
||||
|
||||
let mut new_synced_connections: SyncedConnections =
|
||||
match synced_connections.head_block_number.cmp(&new_block) {
|
||||
cmp::Ordering::Equal => {
|
||||
// this rpc is synced, but it isn't the first to this block
|
||||
(**synced_connections).to_owned()
|
||||
}
|
||||
cmp::Ordering::Less => {
|
||||
// this is a new head block. clear the current synced connections
|
||||
// TODO: this is too verbose with a bunch of tiers. include the tier
|
||||
// info!("new head block from {:?}: {}", rpc, new_block);
|
||||
let best_head_block = self.head_block_number();
|
||||
|
||||
let mut new_synced_connections = SyncedConnections::new(self.inner.len());
|
||||
match current_block_number.cmp(&best_head_block) {
|
||||
cmp::Ordering::Equal => {
|
||||
// this rpc tier is synced, and it isn't the first to this block
|
||||
}
|
||||
cmp::Ordering::Less => {}
|
||||
cmp::Ordering::Greater => {}
|
||||
}
|
||||
|
||||
// synced_connections.inner.clear();
|
||||
match current_block_number.cmp(&new_block) {
|
||||
cmp::Ordering::Equal => {
|
||||
// this rpc is synced, and it isn't the first to this block
|
||||
}
|
||||
cmp::Ordering::Less => {
|
||||
// this is a new head block. clear the current synced connections
|
||||
// TODO: this is too verbose with a bunch of tiers. include the tier
|
||||
// info!("new head block from {:?}: {}", rpc, new_block);
|
||||
|
||||
new_synced_connections.head_block_number = new_block;
|
||||
synced_connections.inner.clear();
|
||||
|
||||
new_synced_connections
|
||||
}
|
||||
cmp::Ordering::Greater => {
|
||||
// not the latest block. return now
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
synced_connections.head_block_number = new_block;
|
||||
}
|
||||
cmp::Ordering::Greater => {
|
||||
// not the latest block. return now
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
let rpc_index = self
|
||||
.inner
|
||||
@ -221,10 +231,7 @@ impl Web3Connections {
|
||||
.position(|x| x.url() == rpc.url())
|
||||
.unwrap();
|
||||
|
||||
new_synced_connections.inner.push(rpc_index);
|
||||
|
||||
self.synced_connections
|
||||
.swap(Arc::new(new_synced_connections));
|
||||
synced_connections.inner.push(rpc_index);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -236,7 +243,7 @@ impl Web3Connections {
|
||||
let mut earliest_not_until = None;
|
||||
|
||||
// TODO: this clone is probably not the best way to do this
|
||||
let mut synced_rpc_indexes = self.synced_connections.load().inner.clone();
|
||||
let mut synced_rpc_indexes = self.synced_connections.read().inner.clone();
|
||||
|
||||
let cache: HashMap<usize, u32> = synced_rpc_indexes
|
||||
.iter()
|
||||
|
@ -3,12 +3,13 @@ mod connection;
|
||||
mod connections;
|
||||
|
||||
use config::Web3ConnectionConfig;
|
||||
use connection::JsonRpcForwardedResponse;
|
||||
use futures::future;
|
||||
use governor::clock::{Clock, QuantaClock};
|
||||
use linkedhashmap::LinkedHashMap;
|
||||
use serde_json::json;
|
||||
use std::fmt;
|
||||
use std::fs;
|
||||
use std::sync::atomic::{self, AtomicU64};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::RwLock;
|
||||
@ -28,11 +29,17 @@ static APP_USER_AGENT: &str = concat!(
|
||||
env!("CARGO_PKG_VERSION"),
|
||||
);
|
||||
|
||||
const RESPONSE_CACHE_CAP: usize = 128;
|
||||
// TODO: put this in config? what size should we do?
|
||||
const RESPONSE_CACHE_CAP: usize = 1024;
|
||||
|
||||
/// TODO: these types are probably very bad keys and values. i couldn't get caching of warp::reply::Json to work
|
||||
type ResponseLruCache = RwLock<LinkedHashMap<(u64, String, String), JsonRpcForwardedResponse>>;
|
||||
|
||||
/// The application
|
||||
// TODO: this debug impl is way too verbose. make something smaller
|
||||
// TODO: if Web3ProxyApp is always in an Arc, i think we can avoid having at least some of these internal things in arcs
|
||||
pub struct Web3ProxyApp {
|
||||
best_head_block_number: Arc<AtomicU64>,
|
||||
/// clock used for rate limiting
|
||||
/// TODO: use tokio's clock (will require a different ratelimiting crate)
|
||||
clock: QuantaClock,
|
||||
@ -40,14 +47,18 @@ pub struct Web3ProxyApp {
|
||||
balanced_rpc_tiers: Vec<Arc<Web3Connections>>,
|
||||
/// Send private requests (like eth_sendRawTransaction) to all these servers
|
||||
private_rpcs: Option<Arc<Web3Connections>>,
|
||||
/// TODO: these types are probably very bad keys and values. i couldn't get caching of warp::reply::Json to work
|
||||
response_cache: RwLock<LinkedHashMap<(u64, String, String), serde_json::Value>>,
|
||||
response_cache: ResponseLruCache,
|
||||
}
|
||||
|
||||
impl fmt::Debug for Web3ProxyApp {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
// TODO: the default formatter takes forever to write. this is too quiet though
|
||||
f.debug_struct("Web3ProxyApp").finish_non_exhaustive()
|
||||
f.debug_struct("Web3ProxyApp")
|
||||
.field(
|
||||
"best_head_block_number",
|
||||
&self.best_head_block_number.load(atomic::Ordering::Relaxed),
|
||||
)
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
@ -58,6 +69,8 @@ impl Web3ProxyApp {
|
||||
) -> anyhow::Result<Web3ProxyApp> {
|
||||
let clock = QuantaClock::default();
|
||||
|
||||
let best_head_block_number = Arc::new(AtomicU64::new(0));
|
||||
|
||||
// make a http shared client
|
||||
// TODO: how should we configure the connection pool?
|
||||
// TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
|
||||
@ -70,7 +83,12 @@ impl Web3ProxyApp {
|
||||
// TODO: attach context to this error
|
||||
let balanced_rpc_tiers =
|
||||
future::join_all(balanced_rpc_tiers.into_iter().map(|balanced_rpc_tier| {
|
||||
Web3Connections::try_new(balanced_rpc_tier, Some(http_client.clone()), &clock)
|
||||
Web3Connections::try_new(
|
||||
best_head_block_number.clone(),
|
||||
balanced_rpc_tier,
|
||||
Some(http_client.clone()),
|
||||
&clock,
|
||||
)
|
||||
}))
|
||||
.await
|
||||
.into_iter()
|
||||
@ -82,10 +100,19 @@ impl Web3ProxyApp {
|
||||
// TODO: instead of None, set it to a list of all the rpcs from balanced_rpc_tiers. that way we broadcast very loudly
|
||||
None
|
||||
} else {
|
||||
Some(Web3Connections::try_new(private_rpcs, Some(http_client), &clock).await?)
|
||||
Some(
|
||||
Web3Connections::try_new(
|
||||
best_head_block_number.clone(),
|
||||
private_rpcs,
|
||||
Some(http_client),
|
||||
&clock,
|
||||
)
|
||||
.await?,
|
||||
)
|
||||
};
|
||||
|
||||
Ok(Web3ProxyApp {
|
||||
best_head_block_number,
|
||||
clock,
|
||||
balanced_rpc_tiers,
|
||||
private_rpcs,
|
||||
@ -113,6 +140,7 @@ impl Web3ProxyApp {
|
||||
let method = json_body.method.clone();
|
||||
let params = json_body.params.clone();
|
||||
|
||||
// TODO: benchmark this compared to waiting on unbounded futures
|
||||
tokio::spawn(async move {
|
||||
connections
|
||||
.try_send_requests(upstream_servers, method, params, tx)
|
||||
@ -120,14 +148,16 @@ impl Web3ProxyApp {
|
||||
});
|
||||
|
||||
// wait for the first response
|
||||
let response = rx.recv_async().await?;
|
||||
let backend_response = rx.recv_async().await?;
|
||||
|
||||
if let Ok(partial_response) = response {
|
||||
let response = json!({
|
||||
"jsonrpc": "2.0",
|
||||
"id": json_body.id,
|
||||
"result": partial_response
|
||||
});
|
||||
if let Ok(backend_response) = backend_response {
|
||||
// TODO: i think we
|
||||
let response = JsonRpcForwardedResponse {
|
||||
jsonrpc: "2.0".to_string(),
|
||||
id: json_body.id,
|
||||
result: Some(backend_response),
|
||||
error: None,
|
||||
};
|
||||
return Ok(warp::reply::json(&response));
|
||||
}
|
||||
}
|
||||
@ -181,17 +211,19 @@ impl Web3ProxyApp {
|
||||
// TODO: trace here was really slow with millions of requests.
|
||||
// info!("forwarding request from {}", upstream_server);
|
||||
|
||||
let response = json!({
|
||||
let response = JsonRpcForwardedResponse {
|
||||
// TODO: re-use their jsonrpc?
|
||||
"jsonrpc": "2.0",
|
||||
"id": json_body.id,
|
||||
jsonrpc: "2.0".to_string(),
|
||||
id: json_body.id,
|
||||
// TODO: since we only use the result here, should that be all we return from try_send_request?
|
||||
"result": partial_response.result,
|
||||
});
|
||||
result: Some(partial_response),
|
||||
error: None,
|
||||
};
|
||||
|
||||
// TODO: small race condidition here. parallel requests with the same query will both be saved to the cache
|
||||
let mut response_cache = self.response_cache.write().await;
|
||||
|
||||
// TODO: cache the warp::reply to save us serializing every time
|
||||
response_cache.insert(cache_key, response.clone());
|
||||
if response_cache.len() >= RESPONSE_CACHE_CAP {
|
||||
response_cache.pop_front();
|
||||
@ -201,11 +233,12 @@ impl Web3ProxyApp {
|
||||
}
|
||||
Err(e) => {
|
||||
// TODO: what is the proper format for an error?
|
||||
json!({
|
||||
"jsonrpc": "2.0",
|
||||
"id": json_body.id,
|
||||
"error": format!("{}", e)
|
||||
})
|
||||
JsonRpcForwardedResponse {
|
||||
jsonrpc: "2.0".to_string(),
|
||||
id: json_body.id,
|
||||
result: None,
|
||||
error: Some(format!("{}", e)),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user