block watching vs subscribing

This commit is contained in:
Bryan Stitt 2022-04-25 20:26:54 +00:00
parent b16fbf090e
commit f8ff0370d2
3 changed files with 50 additions and 27 deletions

@ -1,18 +1,27 @@
/// subscribe to a websocket rpc
use ethers::prelude::*;
use std::time::Duration;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let ws = Ws::connect("ws://10.11.12.16:8545").await?;
// let ws = Ws::connect("ws://10.11.12.16:8946").await?;
let provider = Provider::new(ws).interval(Duration::from_secs(1));
let mut stream = provider.subscribe_blocks().await?.take(5);
// erigon
let url = "ws://10.11.12.16:8545";
// geth
// let url = "ws://10.11.12.16:8946";
println!("Subscribing to blocks from {}", url);
let provider = Ws::connect(url).await?;
let provider = Provider::new(provider).interval(Duration::from_secs(1));
let mut stream = provider.subscribe_blocks().await?.take(3);
while let Some(block) = stream.next().await {
println!(
"Ts: {:?}, block number: {} -> {:?}",
"{:?} = Ts: {:?}, block number: {}",
block.hash.unwrap(),
block.timestamp,
block.number.unwrap(),
block.hash.unwrap()
);
}

30
examples/watch_blocks.rs Normal file

@ -0,0 +1,30 @@
/// poll an http rpc
use ethers::prelude::*;
use std::{str::FromStr, time::Duration};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// erigon does not support most filters
// let url = "http://10.11.12.16:8545";
// geth
let url = "http://10.11.12.16:8945";
println!("Watching blocks from {:?}", url);
let provider = Http::from_str(url)?;
let provider = Provider::new(provider).interval(Duration::from_secs(1));
let mut stream = provider.watch_blocks().await?.take(3);
while let Some(block_number) = stream.next().await {
let block = provider.get_block(block_number).await?.unwrap();
println!(
"{:?} = Ts: {:?}, block number: {}",
block.hash.unwrap(),
block.timestamp,
block.number.unwrap(),
);
}
Ok(())
}

@ -17,6 +17,7 @@ use tokio_tungstenite::{connect_async, tungstenite};
use warp::Filter;
type RateLimiterMap = DashMap<String, RpcRateLimiter>;
// TODO: include the ethers client on this map
type ConnectionsMap = DashMap<String, u32>;
type RpcRateLimiter =
@ -27,25 +28,6 @@ struct BalancedRpcs {
rpcs: RwLock<Vec<String>>,
connections: ConnectionsMap,
ratelimits: RateLimiterMap,
new_heads_handles: Vec<AbortHandle>,
}
impl Drop for BalancedRpcs {
fn drop(&mut self) {
for handle in self.new_heads_handles.iter() {
handle.abort();
}
}
}
async fn handle_new_head_message(message: tungstenite::Message) -> anyhow::Result<()> {
// TODO: move this to a "handle_new_head_message" function so that we can use the ? helper
let data: serde_json::Value = serde_json::from_str(message.to_text().unwrap()).unwrap();
// TODO: parse the message as json and get out the block data. then update a map for this rpc
println!("now what? {:?}", data);
Ok(())
}
impl BalancedRpcs {
@ -56,6 +38,9 @@ impl BalancedRpcs {
for (s, limit) in servers.into_iter() {
rpcs.push(s.to_string());
// TODO: subscribe to new_heads. if websocket, this is easy. otherwise we
connections.insert(s.to_string(), 0);
if limit > 0 {
@ -67,7 +52,6 @@ impl BalancedRpcs {
}
}
// TODO: subscribe to new_heads
let new_heads_handles = rpcs
.clone()
.into_iter()
@ -481,7 +465,7 @@ async fn main() {
let state = Web3ProxyState::new(
vec![
// local nodes
vec![("https://10.11.12.16:8545", 0)],
vec![("ws://10.11.12.16:8545", 0), ("ws://10.11.12.16:8946", 0)],
// paid nodes
// TODO: add paid nodes (with rate limits)
// free nodes