use Value instead of RawValue so we can inspect and modify the request

This commit is contained in:
Bryan Stitt 2022-07-08 19:01:11 +00:00
parent fabbf7a3e8
commit 6fe58bafb4
4 changed files with 16 additions and 13 deletions

@ -256,8 +256,9 @@ impl Web3ProxyApp {
// save the id so we can use it in the response
let id = payload.id.clone();
match payload.params.as_deref().unwrap().get() {
r#"["newHeads"]"# => {
// TODO: calling json! on every request is not fast.
match payload.params {
Some(x) if x == json!(["newHeads"]) => {
let head_block_receiver = self.head_block_receiver.clone();
let subscription_id = subscription_id.clone();
@ -291,7 +292,7 @@ impl Web3ProxyApp {
trace!(?subscription_id, "closed new heads subscription");
});
}
r#"["newPendingTransactions"]"# => {
Some(x) if x == json!(["newPendingTransactions"]) => {
let pending_tx_receiver = self.pending_tx_sender.subscribe();
let mut pending_tx_receiver = Abortable::new(
@ -331,7 +332,7 @@ impl Web3ProxyApp {
trace!(?subscription_id, "closed new heads subscription");
});
}
r#"["newPendingFullTransactions"]"# => {
Some(x) if x == json!(["newPendingFullTransactions"]) => {
// TODO: too much copy/pasta with newPendingTransactions
let pending_tx_receiver = self.pending_tx_sender.subscribe();
@ -375,7 +376,7 @@ impl Web3ProxyApp {
trace!(?subscription_id, "closed new heads subscription");
});
}
r#"["newPendingRawTransactions"]"# => {
Some(x) if x == json!(["newPendingRawTransactions"]) => {
// TODO: too much copy/pasta with newPendingTransactions
let pending_tx_receiver = self.pending_tx_sender.subscribe();
@ -403,7 +404,7 @@ impl Web3ProxyApp {
"method": "eth_subscription",
"params": {
"subscription": subscription_id,
// upstream just sends the txid, but we want to send the whole transaction
// upstream just sends the txid, but we want to send the raw transaction
"result": new_tx.rlp(),
},
});

@ -383,7 +383,10 @@ impl Web3Connection {
// wait for the interval
// TODO: if error or rate limit, increase interval?
http_interval_receiver.recv().await.unwrap();
while let Err(err) = http_interval_receiver.recv().await {
// querying the block was delayed. this can happen if tokio was busy.
warn!(?err, ?self, "http interval lagging!")
}
}
}
Web3Provider::Ws(provider) => {

@ -359,7 +359,7 @@ impl Web3Connections {
active_request_handles: Vec<ActiveRequestHandle>,
method: &str,
// TODO: remove this box once i figure out how to do the options
params: Option<&RawValue>,
params: Option<&serde_json::Value>,
) -> Result<Box<RawValue>, ProviderError> {
// TODO: if only 1 active_request_handles, do self.try_send_request
@ -779,7 +779,7 @@ impl Web3Connections {
.try_send_parallel_requests(
active_request_handles,
request.method.as_ref(),
request.params.as_deref(),
request.params.as_ref(),
)
.await?;

@ -12,8 +12,7 @@ pub struct JsonRpcRequest {
// pub jsonrpc: Box<RawValue>,
pub id: Box<RawValue>,
pub method: String,
// TODO: should we have the default of [] here instead?
pub params: Option<Box<RawValue>>,
pub params: Option<serde_json::Value>,
}
impl fmt::Debug for JsonRpcRequest {
@ -114,8 +113,8 @@ impl<'de> Deserialize<'de> for JsonRpcRequestEnum {
let id = id.ok_or_else(|| de::Error::missing_field("id"))?;
let method = method.ok_or_else(|| de::Error::missing_field("method"))?;
let params: Option<Box<RawValue>> = match params {
None => Some(RawValue::from_string("[]".to_string()).unwrap()),
let params: Option<serde_json::Value> = match params {
None => Some(serde_json::Value::Array(vec![])),
Some(x) => Some(x),
};