subscribe to new heads

This commit is contained in:
Bryan Stitt 2022-04-26 16:45:47 +00:00
parent 0f2dc169bd
commit 131a265756

@ -56,6 +56,49 @@ impl EthersProvider {
Self::Ws(provider) => provider.request(method, params).await,
}
}
pub async fn new_heads(&self, url: String, blocks: Arc<BlockMap>) -> anyhow::Result<()> {
// TODO: automatically reconnect
match &self {
EthersProvider::Http(provider) => {
let mut stream = provider.watch_blocks().await?;
while let Some(block_number) = stream.next().await {
let block = provider.get_block(block_number).await?.unwrap();
println!(
"{:?} = {} Ts: {:?}, block number: {}",
block.hash.unwrap(),
url,
block.timestamp,
block.number.unwrap(),
);
let mut blocks = blocks.write().await;
blocks.insert(url.clone(), block);
}
}
EthersProvider::Ws(provider) => {
let mut stream = provider.subscribe_blocks().await?;
while let Some(block) = stream.next().await {
// TODO: save the block into a dashmap on
println!(
"{:?} = {} Ts: {:?}, block number: {}",
block.hash.unwrap(),
url,
block.timestamp,
block.number.unwrap(),
);
let mut blocks = blocks.write().await;
blocks.insert(url.clone(), block);
}
}
}
Ok(())
}
}
struct EthersConnection {
@ -66,13 +109,13 @@ struct EthersConnection {
impl EthersConnection {
async fn try_new(
url_str: &str,
url_str: String,
http_client: Option<reqwest::Client>,
blocks: Arc<BlockMap>,
) -> anyhow::Result<EthersConnection> {
// TODO: create an ethers-rs rpc client and subscribe/watch new heads in a spawned task
let provider = if url_str.starts_with("http") {
let url: url::Url = url_str.try_into()?;
let url: url::Url = url_str.parse()?;
let http_client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?;
@ -83,7 +126,7 @@ impl EthersConnection {
.interval(Duration::from_secs(1))
.into()
} else if url_str.starts_with("ws") {
let provider = ethers::providers::Ws::connect(url_str.to_string()).await?;
let provider = ethers::providers::Ws::connect(url_str.clone()).await?;
// TODO: make sure this survives disconnects
@ -95,44 +138,16 @@ impl EthersConnection {
return Err(anyhow::anyhow!("only http and ws servers are supported"));
};
match &provider {
EthersProvider::Http(provider) => {
let mut stream = provider.watch_blocks().await?.take(3);
while let Some(block_number) = stream.next().await {
let block = provider.get_block(block_number).await?.unwrap();
let provider = Arc::new(provider);
println!(
"{:?} = Ts: {:?}, block number: {}",
block.hash.unwrap(),
block.timestamp,
block.number.unwrap(),
);
let mut blocks = blocks.write().await;
blocks.insert(url_str.to_string(), block);
}
}
EthersProvider::Ws(provider) => {
let mut stream = provider.subscribe_blocks().await?;
while let Some(block) = stream.next().await {
// TODO: save the block into a dashmap on
println!(
"{:?} = Ts: {:?}, block number: {}",
block.hash.unwrap(),
block.timestamp,
block.number.unwrap(),
);
}
}
}
// TODO: subscribe to new_heads
// TODO: if http, maybe we should check them all on the same interval. and if there is at least one websocket, use them for the interval
// subscribe to new heads in a spawned future
// TODO: if http, maybe we should check them all on the same interval. and if there is at least one websocket, use that message to start check?
let provider_clone: Arc<EthersProvider> = Arc::clone(&provider);
tokio::spawn(async move { provider_clone.new_heads(url_str, blocks).await });
Ok(EthersConnection {
active_requests: 0,
provider: Arc::new(provider),
provider,
})
}
@ -195,7 +210,8 @@ impl RpcTier {
rpcs.push(s.to_string());
let connection =
EthersConnection::try_new(s, http_client.clone(), blocks.clone()).await?;
EthersConnection::try_new(s.to_string(), http_client.clone(), blocks.clone())
.await?;
connections.insert(s.to_string(), connection);