getting closer

This commit is contained in:
Bryan Stitt 2022-06-14 05:43:28 +00:00
parent 2e559f3063
commit 1bef6756eb
5 changed files with 203 additions and 109 deletions

@ -6,10 +6,11 @@ use crate::jsonrpc::JsonRpcRequest;
use crate::jsonrpc::JsonRpcRequestEnum;
use axum::extract::ws::Message;
use dashmap::DashMap;
use ethers::prelude::TransactionReceipt;
use ethers::prelude::Transaction;
use ethers::prelude::{Block, TxHash, H256};
use futures::future::Abortable;
use futures::future::{join_all, AbortHandle};
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use linkedhashmap::LinkedHashMap;
use parking_lot::RwLock;
@ -19,6 +20,7 @@ use std::sync::atomic::{self, AtomicUsize};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tokio::time::timeout;
use tokio_stream::wrappers::WatchStream;
use tracing::{debug, info, info_span, instrument, trace, warn, Instrument};
@ -40,6 +42,16 @@ type ResponseLrcCache = RwLock<LinkedHashMap<CacheKey, JsonRpcForwardedResponse>
type ActiveRequestsMap = DashMap<CacheKey, watch::Receiver<bool>>;
pub type AnyhowJoinHandle<T> = JoinHandle<anyhow::Result<T>>;
pub async fn flatten_handle<T>(handle: AnyhowJoinHandle<T>) -> anyhow::Result<T> {
match handle.await {
Ok(Ok(result)) => Ok(result),
Ok(Err(err)) => Err(err),
Err(err) => Err(err.into()),
}
}
/// The application
// TODO: this debug impl is way too verbose. make something smaller
// TODO: if Web3ProxyApp is always in an Arc, i think we can avoid having at least some of these internal things in arcs
@ -53,7 +65,7 @@ pub struct Web3ProxyApp {
// don't drop this or the sender will stop working
head_block_receiver: watch::Receiver<Block<TxHash>>,
// TODO: i think we want a TxState enum for Confirmed(TxHash, BlockHash) or Pending(TxHash) or Orphan(TxHash, BlockHash)
pending_tx_receipt_receiver: flume::Receiver<TransactionReceipt>,
pending_tx_receiver: flume::Receiver<Transaction>,
next_subscription_id: AtomicUsize,
}
@ -70,7 +82,9 @@ impl Web3ProxyApp {
redis_address: Option<String>,
balanced_rpcs: Vec<Web3ConnectionConfig>,
private_rpcs: Vec<Web3ConnectionConfig>,
) -> anyhow::Result<Arc<Web3ProxyApp>> {
) -> anyhow::Result<(Arc<Web3ProxyApp>, AnyhowJoinHandle<()>)> {
let mut handles = FuturesUnordered::new();
// make a http shared client
// TODO: how should we configure the connection pool?
// TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
@ -100,35 +114,41 @@ impl Web3ProxyApp {
// TODO: subscribe to pending transactions on the private rpcs, too?
let (head_block_sender, head_block_receiver) = watch::channel(Block::default());
let (pending_tx_receipt_sender, pending_tx_receipt_receiver) = flume::unbounded();
let (pending_tx_sender, pending_tx_receiver) = flume::unbounded();
// TODO: attach context to this error
let balanced_rpcs = Web3Connections::spawn(
let (balanced_rpcs, balanced_handle) = Web3Connections::spawn(
chain_id,
balanced_rpcs,
http_client.as_ref(),
rate_limiter.as_ref(),
Some(head_block_sender),
Some(pending_tx_receipt_sender),
Some(pending_tx_sender),
)
.await?;
handles.push(balanced_handle);
let private_rpcs = if private_rpcs.is_empty() {
warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
balanced_rpcs.clone()
} else {
// TODO: attach context to this error
Web3Connections::spawn(
let (private_rpcs, private_handle) = Web3Connections::spawn(
chain_id,
private_rpcs,
http_client.as_ref(),
rate_limiter.as_ref(),
// subscribing to new heads here won't work well
None,
// TODO: subscribe to pending transactions on the private rpcs, too?
// TODO: subscribe to pending transactions on the private rpcs?
None,
)
.await?
.await?;
handles.push(private_handle);
private_rpcs
};
let app = Web3ProxyApp {
@ -137,13 +157,26 @@ impl Web3ProxyApp {
incoming_requests: Default::default(),
response_cache: Default::default(),
head_block_receiver,
pending_tx_receipt_receiver,
pending_tx_receiver,
next_subscription_id: 1.into(),
};
let app = Arc::new(app);
Ok(app)
// create a handle that returns on the first error
let handle = tokio::spawn(async move {
while let Some(x) = handles.next().await {
match x {
Err(e) => return Err(e.into()),
Ok(Err(e)) => return Err(e),
Ok(Ok(())) => {}
}
}
Ok(())
});
Ok((app, handle))
}
pub async fn eth_subscribe(

@ -1,11 +1,12 @@
use crate::app::AnyhowJoinHandle;
use crate::connection::Web3Connection;
use crate::Web3ProxyApp;
use argh::FromArgs;
use ethers::prelude::{Block, TxHash};
use serde::Deserialize;
use std::collections::HashMap;
use std::sync::Arc;
use crate::connection::Web3Connection;
use crate::Web3ProxyApp;
#[derive(Debug, FromArgs)]
/// Web3-proxy is a fast caching and load balancing proxy for web3 (Ethereum or similar) JsonRPC servers.
pub struct CliConfig {
@ -47,7 +48,7 @@ pub struct Web3ConnectionConfig {
impl RpcConfig {
/// Create a Web3ProxyApp from config
// #[instrument(name = "try_build_RpcConfig", skip_all)]
pub async fn try_build(self) -> anyhow::Result<Arc<Web3ProxyApp>> {
pub async fn spawn(self) -> anyhow::Result<(Arc<Web3ProxyApp>, AnyhowJoinHandle<()>)> {
let balanced_rpcs = self.balanced_rpcs.into_values().collect();
let private_rpcs = if let Some(private_rpcs) = self.private_rpcs {
@ -69,13 +70,16 @@ impl RpcConfig {
impl Web3ConnectionConfig {
/// Create a Web3Connection from config
// #[instrument(name = "try_build_Web3ConnectionConfig", skip_all)]
pub async fn try_build(
pub async fn spawn(
self,
redis_conn: Option<&redis_cell_client::MultiplexedConnection>,
rate_limiter: Option<&redis_cell_client::MultiplexedConnection>,
chain_id: usize,
http_client: Option<&reqwest::Client>,
) -> anyhow::Result<Arc<Web3Connection>> {
let hard_rate_limit = self.hard_limit.map(|x| (x, redis_conn.unwrap()));
block_sender: Option<flume::Sender<(Block<TxHash>, Arc<Web3Connection>)>>,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Web3Connection>)>>,
reconnect: bool,
) -> anyhow::Result<(Arc<Web3Connection>, AnyhowJoinHandle<()>)> {
let hard_rate_limit = self.hard_limit.map(|x| (x, rate_limiter.unwrap()));
Web3Connection::spawn(
chain_id,
@ -83,6 +87,9 @@ impl Web3ConnectionConfig {
http_client,
hard_rate_limit,
self.soft_limit,
block_sender,
tx_id_sender,
reconnect,
)
.await
}

@ -9,10 +9,13 @@ use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{self, AtomicU32};
use std::{cmp::Ordering, sync::Arc};
use tokio::sync::oneshot;
use tokio::sync::RwLock;
use tokio::task;
use tokio::time::{interval, sleep, Duration, MissedTickBehavior};
use tracing::{info, instrument, trace, warn};
use tracing::{error, info, instrument, trace, warn};
use crate::app::AnyhowJoinHandle;
/// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit https://github.com/gakonst/ethers-rs/issues/592
#[derive(From)]
@ -117,7 +120,8 @@ impl fmt::Display for Web3Connection {
impl Web3Connection {
/// Connect to a web3 rpc
#[instrument(name = "spawn_Web3Connection", skip(hard_limit, http_client))]
// #[instrument(name = "spawn_Web3Connection", skip(hard_limit, http_client))]
#[allow(clippy::too_many_arguments)]
pub async fn spawn(
chain_id: usize,
url_str: String,
@ -126,7 +130,10 @@ impl Web3Connection {
hard_limit: Option<(u32, &redis_cell_client::MultiplexedConnection)>,
// TODO: think more about this type
soft_limit: u32,
) -> anyhow::Result<Arc<Web3Connection>> {
block_sender: Option<flume::Sender<(Block<TxHash>, Arc<Self>)>>,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
reconnect: bool,
) -> anyhow::Result<(Arc<Web3Connection>, AnyhowJoinHandle<()>)> {
let hard_limit = hard_limit.map(|(hard_rate_limit, redis_conection)| {
// TODO: allow different max_burst and count_per_period and period
let period = 1;
@ -172,7 +179,7 @@ impl Web3Connection {
found_chain_id
));
} else {
info!("Successful connection");
info!(?connection, "success");
}
}
Err(e) => {
@ -181,7 +188,16 @@ impl Web3Connection {
}
}
Ok(connection)
let handle = {
let connection = connection.clone();
tokio::spawn(async move {
connection
.subscribe(block_sender, tx_id_sender, reconnect)
.await
})
};
Ok((connection, handle))
}
#[instrument(skip_all)]
@ -243,50 +259,59 @@ impl Web3Connection {
Ok(())
}
pub async fn subscribe(
async fn subscribe(
self: Arc<Self>,
block_sender: flume::Sender<(Block<TxHash>, Arc<Self>)>,
tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
block_sender: Option<flume::Sender<(Block<TxHash>, Arc<Self>)>>,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
reconnect: bool,
) -> anyhow::Result<()> {
loop {
// TODO: make these abortable so that if one fails the other can be cancelled?
match (block_sender, tx_id_sender) {
(None, None) => {
// TODO: is there a better way to make a channel that is never ready?
let (tx, rx) = oneshot::channel::<()>();
rx.await?;
drop(tx);
}
(Some(block_sender), Some(tx_id_sender)) => {
// TODO: make these abortable so that if one fails the other can be cancelled?
loop {
let new_heads = {
let clone = self.clone();
let block_sender = block_sender.clone();
let new_heads = {
let clone = self.clone();
let block_sender = block_sender.clone();
clone.subscribe_new_heads(block_sender)
};
clone.subscribe_new_heads(block_sender)
};
let pending_txs = {
let clone = self.clone();
let tx_id_sender = tx_id_sender.clone();
let pending_txs = {
let clone = self.clone();
let tx_id_sender = tx_id_sender.clone();
clone.subscribe_pending_transactions(tx_id_sender)
};
clone.subscribe_pending_transactions(tx_id_sender)
};
match tokio::try_join!(new_heads, pending_txs) {
Ok(_) => break,
Err(err) => {
if reconnect {
// TODO: exponential backoff
// TODO: share code with new heads subscription
warn!(
"subscription exited. Attempting to reconnect in 1 second. {:?}", err
);
sleep(Duration::from_secs(1)).await;
tokio::select! {
_ = new_heads => {
info!(?self, "new heads subscription completed");
}
_ = pending_txs => {
info!(?self, "pending transactions subscription completed");
// TODO: loop on reconnecting! do not return with a "?" here
// TODO: this isn't going to work. it will get in a loop with newHeads
self.reconnect(&block_sender).await?;
} else {
error!("subscription exited. {:?}", err);
break;
}
}
};
}
}
if reconnect {
// TODO: exponential backoff
// TODO: share code with new heads subscription
warn!("pending transactions subscription exited. Attempting to reconnect in 1 second...");
sleep(Duration::from_secs(1)).await;
// TODO: loop on reconnecting! do not return with a "?" here
// TODO: this isn't going to work. it will get in a loop with newHeads
self.reconnect(&block_sender).await?;
} else {
break;
}
_ => panic!(),
}
Ok(())
@ -299,7 +324,7 @@ impl Web3Connection {
self: Arc<Self>,
block_sender: flume::Sender<(Block<TxHash>, Arc<Self>)>,
) -> anyhow::Result<()> {
info!("Watching new_heads on {}", self);
info!("watching {}", self);
// TODO: is a RwLock of an Option<Arc> the right thing here?
if let Some(provider) = self.provider.read().await.clone() {
@ -390,7 +415,7 @@ impl Web3Connection {
self: Arc<Self>,
tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
) -> anyhow::Result<()> {
info!("watching pending transactions on {}", self);
info!("watching {}", self);
// TODO: is a RwLock of an Option<Arc> the right thing here?
if let Some(provider) = self.provider.read().await.clone() {
@ -400,7 +425,7 @@ impl Web3Connection {
// TODO: what should this interval be? probably automatically set to some fraction of block time
// TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
// TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though
let mut interval = interval(Duration::from_secs(2));
let mut interval = interval(Duration::from_secs(60));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
// TODO: create a filter
@ -410,6 +435,8 @@ impl Web3Connection {
// TODO: if error or rate limit, increase interval?
interval.tick().await;
// TODO: actually do something here
/*
match self.try_request_handle().await {
Ok(active_request_handle) => {
// TODO: check the filter
@ -419,6 +446,7 @@ impl Web3Connection {
warn!("Failed getting latest block from {}: {:?}", self, e);
}
}
*/
}
}
Web3Provider::Ws(provider) => {
@ -469,7 +497,7 @@ impl Web3Connection {
}
}
// TODO: what should we do?
// TODO: what should we do? panic isn't ever what we want
panic!("no request handle after 10 tries");
}

@ -2,7 +2,7 @@
use arc_swap::ArcSwap;
use counter::Counter;
use derive_more::From;
use ethers::prelude::{Block, ProviderError, TransactionReceipt, TxHash, H256};
use ethers::prelude::{Block, ProviderError, Transaction, TxHash, H256};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use hashbrown::HashMap;
@ -17,9 +17,9 @@ use std::time::Duration;
use tokio::sync::watch;
use tokio::task;
use tokio::time::sleep;
use tracing::Instrument;
use tracing::{debug, info, info_span, instrument, trace, warn};
use crate::app::AnyhowJoinHandle;
use crate::config::Web3ConnectionConfig;
use crate::connection::{ActiveRequestHandle, Web3Connection};
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
@ -82,22 +82,38 @@ impl Web3Connections {
// #[instrument(name = "spawn_Web3Connections", skip_all)]
pub async fn spawn(
chain_id: usize,
servers: Vec<Web3ConnectionConfig>,
server_configs: Vec<Web3ConnectionConfig>,
http_client: Option<&reqwest::Client>,
rate_limiter: Option<&redis_cell_client::MultiplexedConnection>,
head_block_sender: Option<watch::Sender<Block<TxHash>>>,
pending_tx_receipt_sender: Option<flume::Sender<TransactionReceipt>>,
) -> anyhow::Result<Arc<Self>> {
let num_connections = servers.len();
pending_tx_sender: Option<flume::Sender<Transaction>>,
) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
let num_connections = server_configs.len();
let handles = FuturesUnordered::new();
// TODO: only create these if head_block_sender and pending_tx_sender are set
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (block_sender, block_receiver) = flume::unbounded();
// turn configs into connections
let mut connections = Vec::with_capacity(num_connections);
for server_config in servers.into_iter() {
for server_config in server_configs.into_iter() {
match server_config
.try_build(rate_limiter, chain_id, http_client)
.spawn(
rate_limiter,
chain_id,
http_client,
Some(block_sender.clone()),
Some(pending_tx_id_sender.clone()),
true,
)
.await
{
Ok(connection) => connections.push(connection),
Ok((connection, connection_handle)) => {
handles.push(connection_handle);
connections.push(connection)
}
Err(e) => warn!("Unable to connect to a server! {:?}", e),
}
}
@ -119,57 +135,50 @@ impl Web3Connections {
tokio::spawn(async move {
connections
.subscribe(head_block_sender, pending_tx_receipt_sender)
.subscribe(
pending_tx_id_sender,
pending_tx_id_receiver,
block_receiver,
head_block_sender,
pending_tx_sender,
)
.await
})
};
Ok(connections)
Ok((connections, handle))
}
/// subscribe to all the backend rpcs
async fn subscribe(
self: Arc<Self>,
pending_tx_id_sender: flume::Sender<(TxHash, Arc<Web3Connection>)>,
pending_tx_id_receiver: flume::Receiver<(TxHash, Arc<Web3Connection>)>,
block_receiver: flume::Receiver<(Block<TxHash>, Arc<Web3Connection>)>,
head_block_sender: Option<watch::Sender<Block<TxHash>>>,
pending_tx_receipt_sender: Option<flume::Sender<TransactionReceipt>>,
pending_tx_sender: Option<flume::Sender<Transaction>>,
) -> anyhow::Result<()> {
let mut futures = FuturesUnordered::new();
// subscribe to pending transactions
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (block_sender, block_receiver) = flume::unbounded();
// one future subscribes to pendingTransactions on all the rpcs. it sends them through the funnel
// TODO: do this only when someone is subscribed. otherwise this will be way too many queries
for (rpc_id, connection) in self.inner.iter().cloned().enumerate() {
let pending_tx_id_sender = pending_tx_id_sender.clone();
let block_sender = block_sender.clone();
let handle = tokio::spawn(async move {
// loop to automatically reconnect
// TODO: make this cancellable?
// TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date
// TODO: proper span
connection
.subscribe(block_sender, pending_tx_id_sender, true)
.instrument(tracing::info_span!("rpc", ?rpc_id))
.await
});
futures.push(handle);
}
// the next future subscribes to the transaction funnel
// setup the transaction funnel
// it skips any duplicates (unless they are being orphaned)
// fetches new transactions from the notifying rpc
// forwards new transacitons to pending_tx_receipt_sender
{
if let Some(pending_tx_sender) = pending_tx_sender {
// TODO: do something with the handle so we can catch any errors
let handle = task::spawn(async move {
while let Ok((pending_transaction_id, rpc)) =
pending_tx_id_receiver.recv_async().await
{
unimplemented!("de-dedup the pending txid")
let request_handle = rpc.wait_for_request_handle().await;
let pending_transaction: Transaction = request_handle
.request("eth_getTransactionByHash", (pending_transaction_id,))
.await?;
// unimplemented!("de-dedup the pending txid");
pending_tx_sender.send_async(pending_transaction).await?;
}
Ok(())
@ -178,8 +187,7 @@ impl Web3Connections {
futures.push(handle);
}
// the next future subscribes to the block funnel
// setup the block funnel
if let Some(head_block_sender) = head_block_sender {
let connections = Arc::clone(&self);
let handle = task::Builder::default()
@ -193,10 +201,17 @@ impl Web3Connections {
futures.push(handle);
}
if futures.is_empty() {
// no transaction or block subscriptions.
unimplemented!("every second, check that the provider is still connected");
}
if let Some(Err(e)) = futures.next().await {
return Err(e.into());
}
info!("subscriptions over: {:?}", self);
Ok(())
}

@ -7,18 +7,17 @@ mod connections;
mod frontend;
mod jsonrpc;
use crate::app::{flatten_handle, Web3ProxyApp};
use crate::config::{CliConfig, RpcConfig};
use parking_lot::deadlock;
use std::fs;
use std::sync::atomic::{self, AtomicUsize};
use std::thread;
use std::time::Duration;
use tokio::runtime;
use tracing::{info, trace};
use tracing::{error, info, trace};
use tracing_subscriber::EnvFilter;
use crate::app::Web3ProxyApp;
use crate::config::{CliConfig, RpcConfig};
fn main() -> anyhow::Result<()> {
// if RUST_LOG isn't set, configure a default
// TODO: is there a better way to do this?
@ -83,8 +82,20 @@ fn main() -> anyhow::Result<()> {
// spawn the root task
rt.block_on(async {
let app = rpc_config.try_build().await?;
let (app, app_handle) = rpc_config.spawn().await?;
frontend::run(cli_config.port, app).await
let frontend_handle = tokio::spawn(frontend::run(cli_config.port, app));
match tokio::try_join!(flatten_handle(app_handle), flatten_handle(frontend_handle)) {
Ok(_) => {
// do something with the values
info!("app completed")
}
Err(err) => {
error!(?err, "app failed");
}
}
Ok(())
})
}