diff --git a/src/main.rs b/src/main.rs index 74e2aa0b..8dac33ee 100644 --- a/src/main.rs +++ b/src/main.rs @@ -56,6 +56,49 @@ impl EthersProvider { Self::Ws(provider) => provider.request(method, params).await, } } + + pub async fn new_heads(&self, url: String, blocks: Arc) -> anyhow::Result<()> { + // TODO: automatically reconnect + match &self { + EthersProvider::Http(provider) => { + let mut stream = provider.watch_blocks().await?; + while let Some(block_number) = stream.next().await { + let block = provider.get_block(block_number).await?.unwrap(); + + println!( + "{:?} = {} Ts: {:?}, block number: {}", + block.hash.unwrap(), + url, + block.timestamp, + block.number.unwrap(), + ); + + let mut blocks = blocks.write().await; + + blocks.insert(url.clone(), block); + } + } + EthersProvider::Ws(provider) => { + let mut stream = provider.subscribe_blocks().await?; + while let Some(block) = stream.next().await { + // TODO: save the block into a dashmap on + println!( + "{:?} = {} Ts: {:?}, block number: {}", + block.hash.unwrap(), + url, + block.timestamp, + block.number.unwrap(), + ); + + let mut blocks = blocks.write().await; + + blocks.insert(url.clone(), block); + } + } + } + + Ok(()) + } } struct EthersConnection { @@ -66,13 +109,13 @@ struct EthersConnection { impl EthersConnection { async fn try_new( - url_str: &str, + url_str: String, http_client: Option, blocks: Arc, ) -> anyhow::Result { // TODO: create an ethers-rs rpc client and subscribe/watch new heads in a spawned task let provider = if url_str.starts_with("http") { - let url: url::Url = url_str.try_into()?; + let url: url::Url = url_str.parse()?; let http_client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?; @@ -83,7 +126,7 @@ impl EthersConnection { .interval(Duration::from_secs(1)) .into() } else if url_str.starts_with("ws") { - let provider = ethers::providers::Ws::connect(url_str.to_string()).await?; + let provider = ethers::providers::Ws::connect(url_str.clone()).await?; // TODO: make sure this survives disconnects @@ -95,44 +138,16 @@ impl EthersConnection { return Err(anyhow::anyhow!("only http and ws servers are supported")); }; - match &provider { - EthersProvider::Http(provider) => { - let mut stream = provider.watch_blocks().await?.take(3); - while let Some(block_number) = stream.next().await { - let block = provider.get_block(block_number).await?.unwrap(); + let provider = Arc::new(provider); - println!( - "{:?} = Ts: {:?}, block number: {}", - block.hash.unwrap(), - block.timestamp, - block.number.unwrap(), - ); - - let mut blocks = blocks.write().await; - - blocks.insert(url_str.to_string(), block); - } - } - EthersProvider::Ws(provider) => { - let mut stream = provider.subscribe_blocks().await?; - while let Some(block) = stream.next().await { - // TODO: save the block into a dashmap on - println!( - "{:?} = Ts: {:?}, block number: {}", - block.hash.unwrap(), - block.timestamp, - block.number.unwrap(), - ); - } - } - } - - // TODO: subscribe to new_heads - // TODO: if http, maybe we should check them all on the same interval. and if there is at least one websocket, use them for the interval + // subscribe to new heads in a spawned future + // TODO: if http, maybe we should check them all on the same interval. and if there is at least one websocket, use that message to start check? + let provider_clone: Arc = Arc::clone(&provider); + tokio::spawn(async move { provider_clone.new_heads(url_str, blocks).await }); Ok(EthersConnection { active_requests: 0, - provider: Arc::new(provider), + provider, }) } @@ -195,7 +210,8 @@ impl RpcTier { rpcs.push(s.to_string()); let connection = - EthersConnection::try_new(s, http_client.clone(), blocks.clone()).await?; + EthersConnection::try_new(s.to_string(), http_client.clone(), blocks.clone()) + .await?; connections.insert(s.to_string(), connection);