refactor
This commit is contained in:
parent
4d3b851b2c
commit
1af30a46d3
85
src/block_watcher.rs
Normal file
85
src/block_watcher.rs
Normal file
@ -0,0 +1,85 @@
|
|||||||
|
use ethers::prelude::{Block, TxHash};
|
||||||
|
use governor::clock::{Clock, QuantaClock, QuantaInstant};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
use tracing::{info, warn};
|
||||||
|
|
||||||
|
// TODO: what type for the Item? String url works, but i don't love it
|
||||||
|
// TODO: think about boxing this
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum BlockWatcherItem {
|
||||||
|
NewHead((String, Block<TxHash>)),
|
||||||
|
SubscribeHttp(String),
|
||||||
|
Interval,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type BlockWatcherSender = mpsc::UnboundedSender<BlockWatcherItem>;
|
||||||
|
pub type BlockWatcherReceiver = mpsc::UnboundedReceiver<BlockWatcherItem>;
|
||||||
|
|
||||||
|
pub struct BlockWatcher {
|
||||||
|
clock: QuantaClock,
|
||||||
|
receiver: BlockWatcherReceiver,
|
||||||
|
last_update: QuantaInstant,
|
||||||
|
/// TODO: i don't think we want a hashmap. we want a left-right or some other concurrent map
|
||||||
|
blocks: HashMap<String, Block<TxHash>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BlockWatcher {
|
||||||
|
pub fn new(clock: QuantaClock) -> (BlockWatcher, BlockWatcherSender) {
|
||||||
|
// TODO: this also needs to return a reader for blocks
|
||||||
|
let (sender, receiver) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
|
let last_update = clock.now();
|
||||||
|
|
||||||
|
let watcher = Self {
|
||||||
|
clock,
|
||||||
|
last_update,
|
||||||
|
receiver,
|
||||||
|
blocks: Default::default(),
|
||||||
|
};
|
||||||
|
|
||||||
|
(watcher, sender)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn run(&mut self) -> anyhow::Result<()> {
|
||||||
|
// TODO:
|
||||||
|
|
||||||
|
while let Some(x) = self.receiver.recv().await {
|
||||||
|
match x {
|
||||||
|
BlockWatcherItem::Interval => {
|
||||||
|
// TODO: we got an interval. if we haven't updated the blocks recently,
|
||||||
|
}
|
||||||
|
BlockWatcherItem::NewHead((rpc, block)) => {
|
||||||
|
info!(
|
||||||
|
"{:?} = {} Ts: {:?}, block number: {}",
|
||||||
|
block.hash.unwrap(),
|
||||||
|
rpc,
|
||||||
|
block.timestamp,
|
||||||
|
block.number.unwrap(),
|
||||||
|
);
|
||||||
|
self.blocks.insert(rpc, block);
|
||||||
|
|
||||||
|
self.last_update = self.clock.now();
|
||||||
|
}
|
||||||
|
BlockWatcherItem::SubscribeHttp(rpc) => {
|
||||||
|
warn!("subscribing to {} is not yet supported", rpc);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
pub async fn save_block(
|
||||||
|
&self,
|
||||||
|
url: String,
|
||||||
|
block_watcher_sender: BlockWatcherSender,
|
||||||
|
block: Block<TxHash>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
// TODO: include the block age (compared to local time) in this, too
|
||||||
|
// TODO: use tracing properly
|
||||||
|
|
||||||
|
}
|
||||||
|
*/
|
295
src/main.rs
295
src/main.rs
@ -1,24 +1,26 @@
|
|||||||
// TODO: don't use RwLock<HashMap>. i think we need a concurrent hashmap or we will hit all sorts of deadlocks
|
// TODO: don't use RwLock<HashMap>. i think we need a concurrent hashmap or we will hit all sorts of deadlocks
|
||||||
|
mod block_watcher;
|
||||||
|
mod provider;
|
||||||
|
|
||||||
use derive_more::From;
|
|
||||||
use ethers::prelude::{Block, TxHash};
|
|
||||||
use ethers::providers::Middleware;
|
|
||||||
use futures::future;
|
use futures::future;
|
||||||
use futures::StreamExt;
|
|
||||||
use governor::clock::{Clock, QuantaClock, QuantaInstant};
|
use governor::clock::{Clock, QuantaClock, QuantaInstant};
|
||||||
use governor::middleware::NoOpMiddleware;
|
use governor::middleware::NoOpMiddleware;
|
||||||
use governor::state::{InMemoryState, NotKeyed};
|
use governor::state::{InMemoryState, NotKeyed};
|
||||||
use governor::{NotUntil, RateLimiter};
|
use governor::NotUntil;
|
||||||
use std::cmp::Ordering;
|
use governor::RateLimiter;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::num::NonZeroU32;
|
use std::num::NonZeroU32;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::sync::{mpsc, RwLock};
|
use tokio::sync::{mpsc, RwLock};
|
||||||
use tokio::time::sleep;
|
use tokio::time::sleep;
|
||||||
use tracing::info;
|
use tracing::log::warn;
|
||||||
use warp::Filter;
|
use warp::Filter;
|
||||||
|
|
||||||
|
// use crate::types::{BlockMap, ConnectionsMap, RpcRateLimiterMap};
|
||||||
|
use crate::block_watcher::{BlockWatcher, BlockWatcherSender};
|
||||||
|
use crate::provider::Web3Connection;
|
||||||
|
|
||||||
static APP_USER_AGENT: &str = concat!(
|
static APP_USER_AGENT: &str = concat!(
|
||||||
"satoshiandkin/",
|
"satoshiandkin/",
|
||||||
env!("CARGO_PKG_NAME"),
|
env!("CARGO_PKG_NAME"),
|
||||||
@ -29,173 +31,24 @@ static APP_USER_AGENT: &str = concat!(
|
|||||||
type RpcRateLimiter =
|
type RpcRateLimiter =
|
||||||
RateLimiter<NotKeyed, InMemoryState, QuantaClock, NoOpMiddleware<QuantaInstant>>;
|
RateLimiter<NotKeyed, InMemoryState, QuantaClock, NoOpMiddleware<QuantaInstant>>;
|
||||||
|
|
||||||
type BlockMap = RwLock<HashMap<String, Block<TxHash>>>;
|
type RpcRateLimiterMap = RwLock<HashMap<String, RpcRateLimiter>>;
|
||||||
type RateLimiterMap = RwLock<HashMap<String, RpcRateLimiter>>;
|
type ConnectionsMap = RwLock<HashMap<String, Web3Connection>>;
|
||||||
// TODO: include the ethers client on this map
|
|
||||||
type ConnectionsMap = RwLock<HashMap<String, EthersConnection>>;
|
|
||||||
|
|
||||||
// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit https://github.com/gakonst/ethers-rs/issues/592
|
|
||||||
#[derive(From)]
|
|
||||||
enum EthersProvider {
|
|
||||||
Http(ethers::providers::Provider<ethers::providers::Http>),
|
|
||||||
Ws(ethers::providers::Provider<ethers::providers::Ws>),
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Forward functions to the inner ethers::providers::Provider
|
|
||||||
impl EthersProvider {
|
|
||||||
/// Send a web3 request
|
|
||||||
pub async fn request(
|
|
||||||
&self,
|
|
||||||
method: &str,
|
|
||||||
params: serde_json::Value,
|
|
||||||
) -> Result<serde_json::Value, ethers::prelude::ProviderError> {
|
|
||||||
match self {
|
|
||||||
Self::Http(provider) => provider.request(method, params).await,
|
|
||||||
Self::Ws(provider) => provider.request(method, params).await,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Subscribe to new block heads
|
|
||||||
pub async fn new_heads(&self, url: String, blocks: Arc<BlockMap>) -> anyhow::Result<()> {
|
|
||||||
// TODO: automatically reconnect
|
|
||||||
match &self {
|
|
||||||
EthersProvider::Http(provider) => {
|
|
||||||
let mut stream = provider.watch_blocks().await?;
|
|
||||||
while let Some(block_number) = stream.next().await {
|
|
||||||
let block = provider.get_block(block_number).await?.unwrap();
|
|
||||||
|
|
||||||
println!(
|
|
||||||
"{:?} = {} Ts: {:?}, block number: {}",
|
|
||||||
block.hash.unwrap(),
|
|
||||||
url,
|
|
||||||
block.timestamp,
|
|
||||||
block.number.unwrap(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let mut blocks = blocks.write().await;
|
|
||||||
|
|
||||||
blocks.insert(url.clone(), block);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
EthersProvider::Ws(provider) => {
|
|
||||||
let mut stream = provider.subscribe_blocks().await?;
|
|
||||||
while let Some(block) = stream.next().await {
|
|
||||||
// TODO: save the block into a dashmap on
|
|
||||||
println!(
|
|
||||||
"{:?} = {} Ts: {:?}, block number: {}",
|
|
||||||
block.hash.unwrap(),
|
|
||||||
url,
|
|
||||||
block.timestamp,
|
|
||||||
block.number.unwrap(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let mut blocks = blocks.write().await;
|
|
||||||
|
|
||||||
blocks.insert(url.clone(), block);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// An active connection to a Web3Rpc
|
|
||||||
struct EthersConnection {
|
|
||||||
/// keep track of currently open requests. We sort on this
|
|
||||||
active_requests: u32,
|
|
||||||
provider: Arc<EthersProvider>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl EthersConnection {
|
|
||||||
/// Connect to a web3 rpc and subscribe to new heads
|
|
||||||
async fn try_new(
|
|
||||||
url_str: String,
|
|
||||||
http_client: Option<reqwest::Client>,
|
|
||||||
blocks: Arc<BlockMap>,
|
|
||||||
) -> anyhow::Result<EthersConnection> {
|
|
||||||
// TODO: create an ethers-rs rpc client and subscribe/watch new heads in a spawned task
|
|
||||||
let provider = if url_str.starts_with("http") {
|
|
||||||
let url: url::Url = url_str.parse()?;
|
|
||||||
|
|
||||||
let http_client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?;
|
|
||||||
|
|
||||||
let provider = ethers::providers::Http::new_with_client(url, http_client);
|
|
||||||
|
|
||||||
// TODO: dry this up
|
|
||||||
ethers::providers::Provider::new(provider)
|
|
||||||
.interval(Duration::from_secs(1))
|
|
||||||
.into()
|
|
||||||
} else if url_str.starts_with("ws") {
|
|
||||||
let provider = ethers::providers::Ws::connect(url_str.clone()).await?;
|
|
||||||
|
|
||||||
// TODO: make sure this survives disconnects
|
|
||||||
|
|
||||||
// TODO: dry this up
|
|
||||||
ethers::providers::Provider::new(provider)
|
|
||||||
.interval(Duration::from_secs(1))
|
|
||||||
.into()
|
|
||||||
} else {
|
|
||||||
return Err(anyhow::anyhow!("only http and ws servers are supported"));
|
|
||||||
};
|
|
||||||
|
|
||||||
let provider = Arc::new(provider);
|
|
||||||
|
|
||||||
// subscribe to new heads in a spawned future
|
|
||||||
// TODO: if http, maybe we should check them all on the same interval. and if there is at least one websocket, use that message to start check?
|
|
||||||
let provider_clone: Arc<EthersProvider> = Arc::clone(&provider);
|
|
||||||
tokio::spawn(async move { provider_clone.new_heads(url_str, blocks).await });
|
|
||||||
|
|
||||||
Ok(EthersConnection {
|
|
||||||
active_requests: 0,
|
|
||||||
provider,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn inc_active_requests(&mut self) {
|
|
||||||
self.active_requests += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
fn dec_active_requests(&mut self) {
|
|
||||||
self.active_requests -= 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Eq for EthersConnection {}
|
|
||||||
|
|
||||||
impl Ord for EthersConnection {
|
|
||||||
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
|
|
||||||
self.active_requests.cmp(&other.active_requests)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl PartialOrd for EthersConnection {
|
|
||||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
|
||||||
Some(self.cmp(other))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// note that this is just comparing the active requests. two providers with different rpc urls are equal!
|
|
||||||
impl PartialEq for EthersConnection {
|
|
||||||
fn eq(&self, other: &Self) -> bool {
|
|
||||||
self.active_requests == other.active_requests
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Load balance to the rpc
|
/// Load balance to the rpc
|
||||||
|
/// TODO: i'm not sure about having 3 locks here. can we share them?
|
||||||
struct RpcTier {
|
struct RpcTier {
|
||||||
/// RPC urls sorted by
|
/// RPC urls sorted by active requests
|
||||||
/// TODO: what type?
|
/// TODO: what type for the rpc?
|
||||||
rpcs: RwLock<Vec<String>>,
|
rpcs: RwLock<Vec<String>>,
|
||||||
connections: Arc<ConnectionsMap>,
|
connections: Arc<ConnectionsMap>,
|
||||||
ratelimits: RateLimiterMap,
|
ratelimits: RpcRateLimiterMap,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RpcTier {
|
impl RpcTier {
|
||||||
async fn try_new(
|
async fn try_new(
|
||||||
servers: Vec<(&str, u32)>,
|
servers: Vec<(&str, u32)>,
|
||||||
http_client: Option<reqwest::Client>,
|
http_client: Option<reqwest::Client>,
|
||||||
blocks: Arc<BlockMap>,
|
block_watcher_sender: BlockWatcherSender,
|
||||||
clock: &QuantaClock,
|
clock: &QuantaClock,
|
||||||
) -> anyhow::Result<RpcTier> {
|
) -> anyhow::Result<RpcTier> {
|
||||||
let mut rpcs: Vec<String> = vec![];
|
let mut rpcs: Vec<String> = vec![];
|
||||||
@ -205,8 +58,11 @@ impl RpcTier {
|
|||||||
for (s, limit) in servers.into_iter() {
|
for (s, limit) in servers.into_iter() {
|
||||||
rpcs.push(s.to_string());
|
rpcs.push(s.to_string());
|
||||||
|
|
||||||
let connection =
|
let connection = Web3Connection::try_new(
|
||||||
EthersConnection::try_new(s.to_string(), http_client.clone(), blocks.clone())
|
s.to_string(),
|
||||||
|
http_client.clone(),
|
||||||
|
block_watcher_sender.clone(),
|
||||||
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
connections.insert(s.to_string(), connection);
|
connections.insert(s.to_string(), connection);
|
||||||
@ -220,65 +76,6 @@ impl RpcTier {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
let new_heads_handles = rpcs
|
|
||||||
.clone()
|
|
||||||
.into_iter()
|
|
||||||
.map(|rpc| {
|
|
||||||
// start the subscription inside an abort handler. this way, dropping this BalancedRpcs will close these connections
|
|
||||||
let (abort_handle, abort_registration) = AbortHandle::new_pair();
|
|
||||||
|
|
||||||
tokio::spawn(Abortable::new(
|
|
||||||
async move {
|
|
||||||
// replace "http" at the start with "ws"
|
|
||||||
// TODO: this is fragile. some nodes use different ports, too. use proper config
|
|
||||||
// TODO: maybe we should use this websocket for more than just the new heads subscription. we could send all our requests over it (but would need to modify ids)
|
|
||||||
let re = Regex::new("^http").expect("bad regex");
|
|
||||||
let ws_rpc = re.replace(&rpc, "ws");
|
|
||||||
|
|
||||||
// TODO: if websocket not supported, use polling?
|
|
||||||
let ws_rpc = url::Url::parse(&ws_rpc).expect("invalid websocket url");
|
|
||||||
|
|
||||||
// loop so that if it disconnects, we reconnect
|
|
||||||
loop {
|
|
||||||
match connect_async(&ws_rpc).await {
|
|
||||||
Ok((ws_stream, _)) => {
|
|
||||||
let (mut write, mut read) = ws_stream.split();
|
|
||||||
|
|
||||||
// TODO: send eth_subscribe New Heads
|
|
||||||
if (write.send(tungstenite::Message::Text("{\"id\": 1, \"method\": \"eth_subscribe\", \"params\": [\"newHeads\"]}".to_string())).await).is_ok() {
|
|
||||||
if let Some(Ok(_first)) = read.next().await {
|
|
||||||
// TODO: what should we do with the first message?
|
|
||||||
|
|
||||||
while let Some(Ok(message)) = read.next().await {
|
|
||||||
if let Err(e) = handle_new_head_message(message).await {
|
|
||||||
eprintln!("error handling new head message @ {}: {}", ws_rpc, e);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// no more messages or we got an error
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
// TODO: proper logging
|
|
||||||
eprintln!("error connecting to websocket @ {}: {}", ws_rpc, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: log that we are going to reconnectto ws_rpc in 1 second
|
|
||||||
// TODO: how long should we wait? exponential backoff?
|
|
||||||
sleep(Duration::from_secs(1)).await;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
abort_registration,
|
|
||||||
));
|
|
||||||
|
|
||||||
abort_handle
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
*/
|
|
||||||
|
|
||||||
Ok(RpcTier {
|
Ok(RpcTier {
|
||||||
rpcs: RwLock::new(rpcs),
|
rpcs: RwLock::new(rpcs),
|
||||||
connections: Arc::new(RwLock::new(connections)),
|
connections: Arc::new(RwLock::new(connections)),
|
||||||
@ -408,8 +205,8 @@ impl RpcTier {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Application state
|
/// The application
|
||||||
struct Web3ProxyState {
|
struct Web3ProxyApp {
|
||||||
/// clock used for rate limiting
|
/// clock used for rate limiting
|
||||||
/// TODO: use tokio's clock (will require a different ratelimiting crate)
|
/// TODO: use tokio's clock (will require a different ratelimiting crate)
|
||||||
clock: QuantaClock,
|
clock: QuantaClock,
|
||||||
@ -422,28 +219,32 @@ struct Web3ProxyState {
|
|||||||
private_rpcs_ratelimiter_lock: RwLock<()>,
|
private_rpcs_ratelimiter_lock: RwLock<()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Web3ProxyState {
|
impl Web3ProxyApp {
|
||||||
async fn try_new(
|
async fn try_new(
|
||||||
balanced_rpc_tiers: Vec<Vec<(&str, u32)>>,
|
balanced_rpc_tiers: Vec<Vec<(&str, u32)>>,
|
||||||
private_rpcs: Vec<(&str, u32)>,
|
private_rpcs: Vec<(&str, u32)>,
|
||||||
) -> anyhow::Result<Web3ProxyState> {
|
) -> anyhow::Result<Web3ProxyApp> {
|
||||||
let clock = QuantaClock::default();
|
let clock = QuantaClock::default();
|
||||||
|
|
||||||
let blocks = Arc::new(BlockMap::default());
|
let (mut block_watcher, block_watcher_sender) = BlockWatcher::new(clock.clone());
|
||||||
|
|
||||||
|
// make a http shared client
|
||||||
|
// TODO: how should we configure the connection pool?
|
||||||
// TODO: 5 minutes is probably long enough. unlimited is a bad idea if something
|
// TODO: 5 minutes is probably long enough. unlimited is a bad idea if something
|
||||||
let http_client = reqwest::ClientBuilder::new()
|
let http_client = reqwest::ClientBuilder::new()
|
||||||
.timeout(Duration::from_secs(300))
|
.timeout(Duration::from_secs(300))
|
||||||
.user_agent(APP_USER_AGENT)
|
.user_agent(APP_USER_AGENT)
|
||||||
.build()?;
|
.build()?;
|
||||||
|
|
||||||
// TODO: i'm sure we s
|
// start the block_watcher
|
||||||
|
tokio::spawn(async move { block_watcher.run().await });
|
||||||
|
|
||||||
let balanced_rpc_tiers = Arc::new(
|
let balanced_rpc_tiers = Arc::new(
|
||||||
future::join_all(balanced_rpc_tiers.into_iter().map(|balanced_rpc_tier| {
|
future::join_all(balanced_rpc_tiers.into_iter().map(|balanced_rpc_tier| {
|
||||||
RpcTier::try_new(
|
RpcTier::try_new(
|
||||||
balanced_rpc_tier,
|
balanced_rpc_tier,
|
||||||
Some(http_client.clone()),
|
Some(http_client.clone()),
|
||||||
blocks.clone(),
|
block_watcher_sender.clone(),
|
||||||
&clock,
|
&clock,
|
||||||
)
|
)
|
||||||
}))
|
}))
|
||||||
@ -456,12 +257,18 @@ impl Web3ProxyState {
|
|||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
Some(Arc::new(
|
Some(Arc::new(
|
||||||
RpcTier::try_new(private_rpcs, Some(http_client), blocks.clone(), &clock).await?,
|
RpcTier::try_new(
|
||||||
|
private_rpcs,
|
||||||
|
Some(http_client),
|
||||||
|
block_watcher_sender,
|
||||||
|
&clock,
|
||||||
|
)
|
||||||
|
.await?,
|
||||||
))
|
))
|
||||||
};
|
};
|
||||||
|
|
||||||
// TODO: warn if no private relays
|
// TODO: warn if no private relays
|
||||||
Ok(Web3ProxyState {
|
Ok(Web3ProxyApp {
|
||||||
clock,
|
clock,
|
||||||
balanced_rpc_tiers,
|
balanced_rpc_tiers,
|
||||||
private_rpcs,
|
private_rpcs,
|
||||||
@ -473,7 +280,7 @@ impl Web3ProxyState {
|
|||||||
/// send the request to the approriate RPCs
|
/// send the request to the approriate RPCs
|
||||||
/// TODO: dry this up
|
/// TODO: dry this up
|
||||||
async fn proxy_web3_rpc(
|
async fn proxy_web3_rpc(
|
||||||
self: Arc<Web3ProxyState>,
|
self: Arc<Web3ProxyApp>,
|
||||||
json_body: serde_json::Value,
|
json_body: serde_json::Value,
|
||||||
) -> anyhow::Result<impl warp::Reply> {
|
) -> anyhow::Result<impl warp::Reply> {
|
||||||
let eth_send_raw_transaction =
|
let eth_send_raw_transaction =
|
||||||
@ -635,7 +442,7 @@ impl Web3ProxyState {
|
|||||||
|
|
||||||
async move {
|
async move {
|
||||||
// get the client for this rpc server
|
// get the client for this rpc server
|
||||||
let provider = connections.read().await.get(&rpc).unwrap().provider.clone();
|
let provider = connections.read().await.get(&rpc).unwrap().clone_provider();
|
||||||
|
|
||||||
let response = provider.request(&method, params).await;
|
let response = provider.request(&method, params).await;
|
||||||
|
|
||||||
@ -648,6 +455,7 @@ impl Web3ProxyState {
|
|||||||
|
|
||||||
let mut response = response?;
|
let mut response = response?;
|
||||||
|
|
||||||
|
// replace the id with what we originally received
|
||||||
if let Some(response_id) = response.get_mut("id") {
|
if let Some(response_id) = response.get_mut("id") {
|
||||||
*response_id = incoming_id;
|
*response_id = incoming_id;
|
||||||
}
|
}
|
||||||
@ -669,24 +477,25 @@ impl Web3ProxyState {
|
|||||||
Ok(_) => {}
|
Ok(_) => {}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
// TODO: better errors
|
// TODO: better errors
|
||||||
eprintln!("Got a tokio::JoinError: {}", e);
|
warn!("Got an error sending request: {}", e);
|
||||||
errs.push(anyhow::anyhow!("Got a tokio::JoinError"));
|
errs.push(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// get the first error (if any)
|
||||||
let e: anyhow::Result<serde_json::Value> = if !errs.is_empty() {
|
let e: anyhow::Result<serde_json::Value> = if !errs.is_empty() {
|
||||||
Err(errs.pop().unwrap())
|
Err(errs.pop().unwrap())
|
||||||
} else {
|
} else {
|
||||||
Err(anyhow::anyhow!("no successful responses"))
|
Err(anyhow::anyhow!("no successful responses"))
|
||||||
};
|
};
|
||||||
|
|
||||||
// TODO: think about this more. we want to send it
|
// send the error to the channel
|
||||||
if tx.send(e).is_ok() {
|
if tx.send(e).is_ok() {
|
||||||
// if we were able to send an error, then we never sent a success
|
// if we were able to send an error, then we never sent a success
|
||||||
return Err(anyhow::anyhow!("no successful responses"));
|
return Err(anyhow::anyhow!("no successful responses"));
|
||||||
} else {
|
} else {
|
||||||
// sending the error failed. the other side must be closed (which means we sent a success)
|
// if sending the error failed. the other side must be closed (which means we sent a success earlier)
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -697,15 +506,13 @@ async fn main() {
|
|||||||
// install global collector configured based on RUST_LOG env var.
|
// install global collector configured based on RUST_LOG env var.
|
||||||
tracing_subscriber::fmt::init();
|
tracing_subscriber::fmt::init();
|
||||||
|
|
||||||
info!("starting");
|
|
||||||
|
|
||||||
// TODO: load the config from yaml instead of hard coding
|
// TODO: load the config from yaml instead of hard coding
|
||||||
// TODO: support multiple chains in one process. then we could just point "chain.stytt.com" at this and caddy wouldn't need anything else
|
// TODO: support multiple chains in one process. then we could just point "chain.stytt.com" at this and caddy wouldn't need anything else
|
||||||
// TODO: i kind of want to make use of caddy's load balancing and health checking and such though
|
// TODO: i kind of want to make use of caddy's load balancing and health checking and such though
|
||||||
let listen_port = 8445;
|
let listen_port = 8445;
|
||||||
|
|
||||||
// TODO: be smart about about using archive nodes?
|
// TODO: be smart about about using archive nodes?
|
||||||
let state = Web3ProxyState::try_new(
|
let state = Web3ProxyApp::try_new(
|
||||||
vec![
|
vec![
|
||||||
// local nodes
|
// local nodes
|
||||||
vec![("ws://10.11.12.16:8545", 0), ("ws://10.11.12.16:8946", 0)],
|
vec![("ws://10.11.12.16:8545", 0), ("ws://10.11.12.16:8946", 0)],
|
||||||
@ -726,7 +533,7 @@ async fn main() {
|
|||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let state: Arc<Web3ProxyState> = Arc::new(state);
|
let state: Arc<Web3ProxyApp> = Arc::new(state);
|
||||||
|
|
||||||
let proxy_rpc_filter = warp::any()
|
let proxy_rpc_filter = warp::any()
|
||||||
.and(warp::post())
|
.and(warp::post())
|
||||||
@ -734,8 +541,6 @@ async fn main() {
|
|||||||
.then(move |json_body| state.clone().proxy_web3_rpc(json_body))
|
.then(move |json_body| state.clone().proxy_web3_rpc(json_body))
|
||||||
.map(handle_anyhow_errors);
|
.map(handle_anyhow_errors);
|
||||||
|
|
||||||
println!("Listening on 0.0.0.0:{}", listen_port);
|
|
||||||
|
|
||||||
warp::serve(proxy_rpc_filter)
|
warp::serve(proxy_rpc_filter)
|
||||||
.run(([0, 0, 0, 0], listen_port))
|
.run(([0, 0, 0, 0], listen_port))
|
||||||
.await;
|
.await;
|
||||||
|
163
src/provider.rs
Normal file
163
src/provider.rs
Normal file
@ -0,0 +1,163 @@
|
|||||||
|
use derive_more::From;
|
||||||
|
use ethers::prelude::Middleware;
|
||||||
|
use futures::StreamExt;
|
||||||
|
use std::time::Duration;
|
||||||
|
use std::{cmp::Ordering, sync::Arc};
|
||||||
|
use tracing::{info, warn};
|
||||||
|
|
||||||
|
use crate::block_watcher::{BlockWatcherItem, BlockWatcherSender};
|
||||||
|
|
||||||
|
// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit https://github.com/gakonst/ethers-rs/issues/592
|
||||||
|
#[derive(From)]
|
||||||
|
pub enum Web3Provider {
|
||||||
|
Http(ethers::providers::Provider<ethers::providers::Http>),
|
||||||
|
Ws(ethers::providers::Provider<ethers::providers::Ws>),
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Forward functions to the inner ethers::providers::Provider
|
||||||
|
impl Web3Provider {
|
||||||
|
/// Send a web3 request
|
||||||
|
pub async fn request(
|
||||||
|
&self,
|
||||||
|
method: &str,
|
||||||
|
params: serde_json::Value,
|
||||||
|
) -> Result<serde_json::Value, ethers::prelude::ProviderError> {
|
||||||
|
match self {
|
||||||
|
Self::Http(provider) => provider.request(method, params).await,
|
||||||
|
Self::Ws(provider) => provider.request(method, params).await,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Subscribe to new blocks
|
||||||
|
pub async fn new_heads(
|
||||||
|
&self,
|
||||||
|
url: String,
|
||||||
|
block_watcher_sender: BlockWatcherSender,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
info!("Watching new_heads from {}", url);
|
||||||
|
|
||||||
|
// TODO: automatically reconnect
|
||||||
|
match &self {
|
||||||
|
Web3Provider::Http(_provider) => {
|
||||||
|
/*
|
||||||
|
// TODO: not all providers have this. we need to write our interval checking
|
||||||
|
let mut stream = provider.watch_blocks().await?;
|
||||||
|
while let Some(block_number) = stream.next().await {
|
||||||
|
let block = provider.get_block(block_number).await?.expect("no block");
|
||||||
|
block_watcher_sender
|
||||||
|
.send(Some((url.clone(), block)))
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
block_watcher_sender
|
||||||
|
.send(BlockWatcherItem::SubscribeHttp(url.clone()))
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
Web3Provider::Ws(provider) => {
|
||||||
|
let mut stream = provider.subscribe_blocks().await?;
|
||||||
|
while let Some(block) = stream.next().await {
|
||||||
|
block_watcher_sender
|
||||||
|
.send(BlockWatcherItem::NewHead((url.clone(), block)))
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("Done watching new_heads from {}", url);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An active connection to a Web3Rpc
|
||||||
|
pub struct Web3Connection {
|
||||||
|
/// keep track of currently open requests. We sort on this
|
||||||
|
active_requests: u32,
|
||||||
|
provider: Arc<Web3Provider>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Web3Connection {
|
||||||
|
pub fn clone_provider(&self) -> Arc<Web3Provider> {
|
||||||
|
self.provider.clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Connect to a web3 rpc and subscribe to new heads
|
||||||
|
pub async fn try_new(
|
||||||
|
url_str: String,
|
||||||
|
http_client: Option<reqwest::Client>,
|
||||||
|
block_watcher_sender: BlockWatcherSender,
|
||||||
|
) -> anyhow::Result<Web3Connection> {
|
||||||
|
// TODO: create an ethers-rs rpc client and subscribe/watch new heads in a spawned task
|
||||||
|
let provider = if url_str.starts_with("http") {
|
||||||
|
let url: url::Url = url_str.parse()?;
|
||||||
|
|
||||||
|
let http_client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?;
|
||||||
|
|
||||||
|
let provider = ethers::providers::Http::new_with_client(url, http_client);
|
||||||
|
|
||||||
|
// TODO: dry this up
|
||||||
|
ethers::providers::Provider::new(provider)
|
||||||
|
.interval(Duration::from_secs(1))
|
||||||
|
.into()
|
||||||
|
} else if url_str.starts_with("ws") {
|
||||||
|
let provider = ethers::providers::Ws::connect(url_str.clone()).await?;
|
||||||
|
|
||||||
|
// TODO: make sure this survives disconnects
|
||||||
|
|
||||||
|
// TODO: dry this up
|
||||||
|
ethers::providers::Provider::new(provider)
|
||||||
|
.interval(Duration::from_secs(1))
|
||||||
|
.into()
|
||||||
|
} else {
|
||||||
|
return Err(anyhow::anyhow!("only http and ws servers are supported"));
|
||||||
|
};
|
||||||
|
|
||||||
|
let provider = Arc::new(provider);
|
||||||
|
|
||||||
|
// subscribe to new heads in a spawned future
|
||||||
|
// TODO: if http, maybe we should check them all on the same interval. and if there is at least one websocket, use that message to start check?
|
||||||
|
let provider_clone: Arc<Web3Provider> = Arc::clone(&provider);
|
||||||
|
tokio::spawn(async move {
|
||||||
|
while let Err(e) = provider_clone
|
||||||
|
.new_heads(url_str.clone(), block_watcher_sender.clone())
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
warn!("new_heads error for {}: {:?}", url_str, e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(Web3Connection {
|
||||||
|
active_requests: 0,
|
||||||
|
provider,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn inc_active_requests(&mut self) {
|
||||||
|
self.active_requests += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn dec_active_requests(&mut self) {
|
||||||
|
self.active_requests -= 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Eq for Web3Connection {}
|
||||||
|
|
||||||
|
impl Ord for Web3Connection {
|
||||||
|
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
|
||||||
|
self.active_requests.cmp(&other.active_requests)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PartialOrd for Web3Connection {
|
||||||
|
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||||
|
Some(self.cmp(other))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// note that this is just comparing the active requests. two providers with different rpc urls are equal!
|
||||||
|
impl PartialEq for Web3Connection {
|
||||||
|
fn eq(&self, other: &Self) -> bool {
|
||||||
|
self.active_requests == other.active_requests
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user