diff --git a/examples/subscribe_blocks.rs b/examples/subscribe_blocks.rs index 78a1106d..b4602212 100644 --- a/examples/subscribe_blocks.rs +++ b/examples/subscribe_blocks.rs @@ -1,18 +1,27 @@ +/// subscribe to a websocket rpc use ethers::prelude::*; use std::time::Duration; #[tokio::main] async fn main() -> anyhow::Result<()> { - let ws = Ws::connect("ws://10.11.12.16:8545").await?; - // let ws = Ws::connect("ws://10.11.12.16:8946").await?; - let provider = Provider::new(ws).interval(Duration::from_secs(1)); - let mut stream = provider.subscribe_blocks().await?.take(5); + // erigon + let url = "ws://10.11.12.16:8545"; + // geth + // let url = "ws://10.11.12.16:8946"; + + println!("Subscribing to blocks from {}", url); + + let provider = Ws::connect(url).await?; + + let provider = Provider::new(provider).interval(Duration::from_secs(1)); + + let mut stream = provider.subscribe_blocks().await?.take(3); while let Some(block) = stream.next().await { println!( - "Ts: {:?}, block number: {} -> {:?}", + "{:?} = Ts: {:?}, block number: {}", + block.hash.unwrap(), block.timestamp, block.number.unwrap(), - block.hash.unwrap() ); } diff --git a/examples/watch_blocks.rs b/examples/watch_blocks.rs new file mode 100644 index 00000000..7fa3ffe2 --- /dev/null +++ b/examples/watch_blocks.rs @@ -0,0 +1,30 @@ +/// poll an http rpc +use ethers::prelude::*; +use std::{str::FromStr, time::Duration}; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + // erigon does not support most filters + // let url = "http://10.11.12.16:8545"; + // geth + let url = "http://10.11.12.16:8945"; + + println!("Watching blocks from {:?}", url); + + let provider = Http::from_str(url)?; + + let provider = Provider::new(provider).interval(Duration::from_secs(1)); + + let mut stream = provider.watch_blocks().await?.take(3); + while let Some(block_number) = stream.next().await { + let block = provider.get_block(block_number).await?.unwrap(); + println!( + "{:?} = Ts: {:?}, block number: {}", + block.hash.unwrap(), + block.timestamp, + block.number.unwrap(), + ); + } + + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index 2be77710..3949b260 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,6 +17,7 @@ use tokio_tungstenite::{connect_async, tungstenite}; use warp::Filter; type RateLimiterMap = DashMap; +// TODO: include the ethers client on this map type ConnectionsMap = DashMap; type RpcRateLimiter = @@ -27,25 +28,6 @@ struct BalancedRpcs { rpcs: RwLock>, connections: ConnectionsMap, ratelimits: RateLimiterMap, - new_heads_handles: Vec, -} - -impl Drop for BalancedRpcs { - fn drop(&mut self) { - for handle in self.new_heads_handles.iter() { - handle.abort(); - } - } -} - -async fn handle_new_head_message(message: tungstenite::Message) -> anyhow::Result<()> { - // TODO: move this to a "handle_new_head_message" function so that we can use the ? helper - let data: serde_json::Value = serde_json::from_str(message.to_text().unwrap()).unwrap(); - - // TODO: parse the message as json and get out the block data. then update a map for this rpc - println!("now what? {:?}", data); - - Ok(()) } impl BalancedRpcs { @@ -56,6 +38,9 @@ impl BalancedRpcs { for (s, limit) in servers.into_iter() { rpcs.push(s.to_string()); + + // TODO: subscribe to new_heads. if websocket, this is easy. otherwise we + connections.insert(s.to_string(), 0); if limit > 0 { @@ -67,7 +52,6 @@ impl BalancedRpcs { } } - // TODO: subscribe to new_heads let new_heads_handles = rpcs .clone() .into_iter() @@ -481,7 +465,7 @@ async fn main() { let state = Web3ProxyState::new( vec![ // local nodes - vec![("https://10.11.12.16:8545", 0)], + vec![("ws://10.11.12.16:8545", 0), ("ws://10.11.12.16:8946", 0)], // paid nodes // TODO: add paid nodes (with rate limits) // free nodes