new heads subscription works

This commit is contained in:
Bryan Stitt 2022-06-05 22:39:44 +00:00
parent 3abfeaab0b
commit 2243cacf70
5 changed files with 15 additions and 12 deletions

View File

@ -158,13 +158,12 @@ impl Web3ProxyApp {
let f = { let f = {
let subscription_id = subscription_id.clone(); let subscription_id = subscription_id.clone();
let id = payload.id;
let params = payload.params.as_deref().unwrap().get(); let params = payload.params.as_deref().unwrap().get();
if params == r#"["newHeads"]"# { if params == r#"["newHeads"]"# {
let head_block_receiver = self.head_block_receiver.clone(); let head_block_receiver = self.head_block_receiver.clone();
info!("received new heads subscription"); trace!(?subscription_id, "new heads subscription");
async move { async move {
let mut head_block_receiver = Abortable::new( let mut head_block_receiver = Abortable::new(
WatchStream::new(head_block_receiver), WatchStream::new(head_block_receiver),
@ -172,9 +171,8 @@ impl Web3ProxyApp {
); );
while let Some(new_head) = head_block_receiver.next().await { while let Some(new_head) = head_block_receiver.next().await {
// TODO: make a struct for this? using JsonRpcForwardedResponse // TODO: make a struct for this? using JsonRpcForwardedResponse won't work because it needs an id
let msg = json!({ let msg = json!({
"id": id,
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method":"eth_subscription", "method":"eth_subscription",
"params": { "params": {
@ -185,8 +183,13 @@ impl Web3ProxyApp {
let msg = Message::Text(serde_json::to_string(&msg).unwrap()); let msg = Message::Text(serde_json::to_string(&msg).unwrap());
subscription_tx.send_async(msg).await.unwrap(); if subscription_tx.send_async(msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
} }
trace!(?subscription_id, "closed new heads subscription");
} }
} else { } else {
return Err(anyhow::anyhow!("unimplemented")); return Err(anyhow::anyhow!("unimplemented"));

View File

@ -13,7 +13,7 @@ pub struct CliConfig {
#[argh(option, default = "8544")] #[argh(option, default = "8544")]
pub port: u16, pub port: u16,
/// number of worker threads /// number of worker threads. Defaults to the number of logical processors
#[argh(option, default = "0")] #[argh(option, default = "0")]
pub workers: usize, pub workers: usize,

View File

@ -12,12 +12,12 @@ pub async fn handler_404() -> impl IntoResponse {
/// handle errors by converting them into something that implements `IntoResponse` /// handle errors by converting them into something that implements `IntoResponse`
/// TODO: use this. i can't get https://docs.rs/axum/latest/axum/error_handling/index.html to work /// TODO: use this. i can't get https://docs.rs/axum/latest/axum/error_handling/index.html to work
/// TODO: i think we want a custom result type instead. put the anyhow result inside. then `impl IntoResponse for CustomResult`
pub async fn handle_anyhow_error( pub async fn handle_anyhow_error(
err: anyhow::Error, err: anyhow::Error,
code: Option<StatusCode>, code: Option<StatusCode>,
) -> impl IntoResponse { ) -> impl IntoResponse {
// TODO: what id can we use? how do we make sure the incoming id gets attached to this? let id = RawValue::from_string("null".to_string()).unwrap();
let id = RawValue::from_string("0".to_string()).unwrap();
let err = JsonRpcForwardedResponse::from_anyhow_error(err, id); let err = JsonRpcForwardedResponse::from_anyhow_error(err, id);
@ -27,5 +27,3 @@ pub async fn handle_anyhow_error(
(code, Json(err)) (code, Json(err))
} }
// i think we want a custom result type. it has an anyhow result inside. if it impl IntoResponse I think we'll get this for free

View File

@ -83,7 +83,7 @@ async fn handle_socket_payload(
} }
Err(err) => { Err(err) => {
// TODO: what should this id be? // TODO: what should this id be?
let id = RawValue::from_string("0".to_string()).unwrap(); let id = RawValue::from_string("null".to_string()).unwrap();
(id, Err(err.into())) (id, Err(err.into()))
} }
}; };

View File

@ -193,7 +193,9 @@ impl JsonRpcForwardedResponse {
pub fn from_string(partial_response: String, id: Box<RawValue>) -> Self { pub fn from_string(partial_response: String, id: Box<RawValue>) -> Self {
trace!("partial_response: {}", partial_response); trace!("partial_response: {}", partial_response);
let partial_response = RawValue::from_string(partial_response).unwrap(); // TODO: anyhow result on this
// TODO: proper escaping. this feels wrong. probably refactor to not need this at all
let partial_response = RawValue::from_string(format!(r#""{}""#, partial_response)).unwrap();
Self::from_response(partial_response, id) Self::from_response(partial_response, id)
} }