we need to clone :'(

This commit is contained in:
Bryan Stitt 2022-09-22 21:51:52 +00:00
parent 2ed2408ed3
commit 3854312674
8 changed files with 98 additions and 88 deletions

View File

@ -410,7 +410,7 @@ impl Web3ProxyApp {
#[measure([ErrorCount, HitCount, ResponseTime, Throughput])] #[measure([ErrorCount, HitCount, ResponseTime, Throughput])]
pub async fn eth_subscribe<'a>( pub async fn eth_subscribe<'a>(
self: &'a Arc<Self>, self: &'a Arc<Self>,
authorized_request: Arc<AuthorizedRequest>, authorization: Arc<AuthorizedRequest>,
payload: JsonRpcRequest, payload: JsonRpcRequest,
subscription_count: &'a AtomicUsize, subscription_count: &'a AtomicUsize,
// TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now // TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now
@ -604,7 +604,7 @@ impl Web3ProxyApp {
/// send the request or batch of requests to the approriate RPCs /// send the request or batch of requests to the approriate RPCs
pub async fn proxy_web3_rpc( pub async fn proxy_web3_rpc(
self: &Arc<Self>, self: &Arc<Self>,
authorized_request: &Arc<AuthorizedRequest>, authorization: &Arc<AuthorizedRequest>,
request: JsonRpcRequestEnum, request: JsonRpcRequestEnum,
) -> anyhow::Result<JsonRpcForwardedResponseEnum> { ) -> anyhow::Result<JsonRpcForwardedResponseEnum> {
// TODO: this should probably be trace level // TODO: this should probably be trace level
@ -619,14 +619,14 @@ impl Web3ProxyApp {
JsonRpcRequestEnum::Single(request) => JsonRpcForwardedResponseEnum::Single( JsonRpcRequestEnum::Single(request) => JsonRpcForwardedResponseEnum::Single(
timeout( timeout(
max_time, max_time,
self.proxy_web3_rpc_request(authorized_request, request), self.proxy_web3_rpc_request(authorization, request),
) )
.await??, .await??,
), ),
JsonRpcRequestEnum::Batch(requests) => JsonRpcForwardedResponseEnum::Batch( JsonRpcRequestEnum::Batch(requests) => JsonRpcForwardedResponseEnum::Batch(
timeout( timeout(
max_time, max_time,
self.proxy_web3_rpc_requests(authorized_request, requests), self.proxy_web3_rpc_requests(authorization, requests),
) )
.await??, .await??,
), ),
@ -642,7 +642,7 @@ impl Web3ProxyApp {
/// TODO: make sure this isn't a problem /// TODO: make sure this isn't a problem
async fn proxy_web3_rpc_requests( async fn proxy_web3_rpc_requests(
self: &Arc<Self>, self: &Arc<Self>,
authorized_request: &Arc<AuthorizedRequest>, authorization: &Arc<AuthorizedRequest>,
requests: Vec<JsonRpcRequest>, requests: Vec<JsonRpcRequest>,
) -> anyhow::Result<Vec<JsonRpcForwardedResponse>> { ) -> anyhow::Result<Vec<JsonRpcForwardedResponse>> {
// TODO: we should probably change ethers-rs to support this directly // TODO: we should probably change ethers-rs to support this directly
@ -650,7 +650,7 @@ impl Web3ProxyApp {
let responses = join_all( let responses = join_all(
requests requests
.into_iter() .into_iter()
.map(|request| self.proxy_web3_rpc_request(authorized_request, request)) .map(|request| self.proxy_web3_rpc_request(authorization, request))
.collect::<Vec<_>>(), .collect::<Vec<_>>(),
) )
.await; .await;
@ -678,7 +678,7 @@ impl Web3ProxyApp {
#[measure([ErrorCount, HitCount, ResponseTime, Throughput])] #[measure([ErrorCount, HitCount, ResponseTime, Throughput])]
async fn proxy_web3_rpc_request( async fn proxy_web3_rpc_request(
self: &Arc<Self>, self: &Arc<Self>,
authorized_request: &Arc<AuthorizedRequest>, authorization: &Arc<AuthorizedRequest>,
mut request: JsonRpcRequest, mut request: JsonRpcRequest,
) -> anyhow::Result<JsonRpcForwardedResponse> { ) -> anyhow::Result<JsonRpcForwardedResponse> {
trace!("Received request: {:?}", request); trace!("Received request: {:?}", request);
@ -812,7 +812,7 @@ impl Web3ProxyApp {
let rpcs = self.private_rpcs.as_ref().unwrap_or(&self.balanced_rpcs); let rpcs = self.private_rpcs.as_ref().unwrap_or(&self.balanced_rpcs);
return rpcs return rpcs
.try_send_all_upstream_servers(Some(authorized_request), request, None) .try_send_all_upstream_servers(Some(authorization), request, None)
.await; .await;
} }
"eth_syncing" => { "eth_syncing" => {
@ -913,7 +913,7 @@ impl Web3ProxyApp {
let mut response = self let mut response = self
.balanced_rpcs .balanced_rpcs
.try_send_best_upstream_server( .try_send_best_upstream_server(
Some(authorized_request), Some(authorization),
request, request,
Some(&request_block_id.num), Some(&request_block_id.num),
) )

View File

@ -19,16 +19,16 @@ pub async fn public_proxy_web3_rpc(
) -> FrontendResult { ) -> FrontendResult {
let request_span = error_span!("request", %ip, ?referer, ?user_agent); let request_span = error_span!("request", %ip, ?referer, ?user_agent);
let authorized_request = ip_is_authorized(&app, ip) let authorization = ip_is_authorized(&app, ip)
.instrument(request_span.clone()) .instrument(request_span.clone())
.await?; .await?;
let request_span = error_span!("request", ?authorized_request); let request_span = error_span!("request", ?authorization);
let authorized_request = Arc::new(authorized_request); let authorization = Arc::new(authorization);
let f = tokio::spawn(async move { let f = tokio::spawn(async move {
app.proxy_web3_rpc(&authorized_request, payload) app.proxy_web3_rpc(&authorization, payload)
.instrument(request_span) .instrument(request_span)
.await .await
}); });
@ -49,7 +49,7 @@ pub async fn user_proxy_web3_rpc(
let request_span = error_span!("request", %ip, ?referer, ?user_agent); let request_span = error_span!("request", %ip, ?referer, ?user_agent);
// TODO: this should probably return the user_key_id instead? or maybe both? // TODO: this should probably return the user_key_id instead? or maybe both?
let authorized_request = key_is_authorized( let authorization = key_is_authorized(
&app, &app,
user_key, user_key,
ip, ip,
@ -59,12 +59,12 @@ pub async fn user_proxy_web3_rpc(
.instrument(request_span.clone()) .instrument(request_span.clone())
.await?; .await?;
let request_span = error_span!("request", ?authorized_request); let request_span = error_span!("request", ?authorization);
let authorized_request = Arc::new(authorized_request); let authorization = Arc::new(authorization);
let f = tokio::spawn(async move { let f = tokio::spawn(async move {
app.proxy_web3_rpc(&authorized_request, payload) app.proxy_web3_rpc(&authorization, payload)
.instrument(request_span) .instrument(request_span)
.await .await
}); });

View File

@ -33,16 +33,16 @@ pub async fn public_websocket_handler(
ClientIp(ip): ClientIp, ClientIp(ip): ClientIp,
ws_upgrade: Option<WebSocketUpgrade>, ws_upgrade: Option<WebSocketUpgrade>,
) -> FrontendResult { ) -> FrontendResult {
let authorized_request = ip_is_authorized(&app, ip).await?; let authorization = ip_is_authorized(&app, ip).await?;
let request_span = error_span!("request", ?authorized_request); let request_span = error_span!("request", ?authorization);
let authorized_request = Arc::new(authorized_request); let authorization = Arc::new(authorization);
match ws_upgrade { match ws_upgrade {
Some(ws) => Ok(ws Some(ws) => Ok(ws
.on_upgrade(|socket| { .on_upgrade(|socket| {
proxy_web3_socket(app, authorized_request, socket).instrument(request_span) proxy_web3_socket(app, authorization, socket).instrument(request_span)
}) })
.into_response()), .into_response()),
None => { None => {
@ -61,7 +61,7 @@ pub async fn user_websocket_handler(
user_agent: Option<TypedHeader<UserAgent>>, user_agent: Option<TypedHeader<UserAgent>>,
ws_upgrade: Option<WebSocketUpgrade>, ws_upgrade: Option<WebSocketUpgrade>,
) -> FrontendResult { ) -> FrontendResult {
let authorized_request = key_is_authorized( let authorization = key_is_authorized(
&app, &app,
user_key, user_key,
ip, ip,
@ -71,13 +71,13 @@ pub async fn user_websocket_handler(
.await?; .await?;
// TODO: type that wraps Address and have it censor? would protect us from accidently logging addresses or other user info // TODO: type that wraps Address and have it censor? would protect us from accidently logging addresses or other user info
let request_span = error_span!("request", ?authorized_request); let request_span = error_span!("request", ?authorization);
let authorized_request = Arc::new(authorized_request); let authorization = Arc::new(authorization);
match ws_upgrade { match ws_upgrade {
Some(ws_upgrade) => Ok(ws_upgrade.on_upgrade(move |socket| { Some(ws_upgrade) => Ok(ws_upgrade.on_upgrade(move |socket| {
proxy_web3_socket(app, authorized_request, socket).instrument(request_span) proxy_web3_socket(app, authorization, socket).instrument(request_span)
})), })),
None => { None => {
// TODO: store this on the app and use register_template? // TODO: store this on the app and use register_template?
@ -88,7 +88,7 @@ pub async fn user_websocket_handler(
let user_url = reg let user_url = reg
.render_template( .render_template(
&app.config.redirect_user_url, &app.config.redirect_user_url,
&json!({ "authorized_request": authorized_request }), &json!({ "authorization": authorization }),
) )
.unwrap(); .unwrap();
@ -100,7 +100,7 @@ pub async fn user_websocket_handler(
async fn proxy_web3_socket( async fn proxy_web3_socket(
app: Arc<Web3ProxyApp>, app: Arc<Web3ProxyApp>,
authorized_request: Arc<AuthorizedRequest>, authorization: Arc<AuthorizedRequest>,
socket: WebSocket, socket: WebSocket,
) { ) {
// split the websocket so we can read and write concurrently // split the websocket so we can read and write concurrently
@ -110,18 +110,13 @@ async fn proxy_web3_socket(
let (response_sender, response_receiver) = flume::unbounded::<Message>(); let (response_sender, response_receiver) = flume::unbounded::<Message>();
tokio::spawn(write_web3_socket(response_receiver, ws_tx)); tokio::spawn(write_web3_socket(response_receiver, ws_tx));
tokio::spawn(read_web3_socket( tokio::spawn(read_web3_socket(app, authorization, ws_rx, response_sender));
app,
authorized_request,
ws_rx,
response_sender,
));
} }
/// websockets support a few more methods than http clients /// websockets support a few more methods than http clients
async fn handle_socket_payload( async fn handle_socket_payload(
app: Arc<Web3ProxyApp>, app: Arc<Web3ProxyApp>,
authorized_request: Arc<AuthorizedRequest>, authorization: Arc<AuthorizedRequest>,
payload: &str, payload: &str,
response_sender: &flume::Sender<Message>, response_sender: &flume::Sender<Message>,
subscription_count: &AtomicUsize, subscription_count: &AtomicUsize,
@ -140,7 +135,7 @@ async fn handle_socket_payload(
let response = app let response = app
.eth_subscribe( .eth_subscribe(
authorized_request.clone(), authorization.clone(),
payload, payload,
subscription_count, subscription_count,
response_sender.clone(), response_sender.clone(),
@ -177,10 +172,7 @@ async fn handle_socket_payload(
Ok(response.into()) Ok(response.into())
} }
_ => { _ => app.proxy_web3_rpc(&authorization, payload.into()).await,
app.proxy_web3_rpc(&authorized_request, payload.into())
.await
}
}; };
(id, response) (id, response)
@ -206,7 +198,7 @@ async fn handle_socket_payload(
async fn read_web3_socket( async fn read_web3_socket(
app: Arc<Web3ProxyApp>, app: Arc<Web3ProxyApp>,
authorized_request: Arc<AuthorizedRequest>, authorization: Arc<AuthorizedRequest>,
mut ws_rx: SplitStream<WebSocket>, mut ws_rx: SplitStream<WebSocket>,
response_sender: flume::Sender<Message>, response_sender: flume::Sender<Message>,
) { ) {
@ -219,7 +211,7 @@ async fn read_web3_socket(
Message::Text(payload) => { Message::Text(payload) => {
handle_socket_payload( handle_socket_payload(
app.clone(), app.clone(),
authorized_request.clone(), authorization.clone(),
&payload, &payload,
&response_sender, &response_sender,
&subscription_count, &subscription_count,
@ -242,7 +234,7 @@ async fn read_web3_socket(
handle_socket_payload( handle_socket_payload(
app.clone(), app.clone(),
authorized_request.clone(), authorization.clone(),
payload, payload,
&response_sender, &response_sender,
&subscription_count, &subscription_count,

View File

@ -91,7 +91,7 @@ impl Web3Connections {
/// Will query a specific node or the best available. /// Will query a specific node or the best available.
pub async fn block( pub async fn block(
&self, &self,
authorized_request: Option<&Arc<AuthorizedRequest>>, authorization: Option<&Arc<AuthorizedRequest>>,
hash: &H256, hash: &H256,
rpc: Option<&Arc<Web3Connection>>, rpc: Option<&Arc<Web3Connection>>,
) -> anyhow::Result<ArcBlock> { ) -> anyhow::Result<ArcBlock> {
@ -102,13 +102,13 @@ impl Web3Connections {
} }
// block not in cache. we need to ask an rpc for it // block not in cache. we need to ask an rpc for it
let get_block_params = (hash, false); let get_block_params = (*hash, false);
// TODO: if error, retry? // TODO: if error, retry?
let block: Block<TxHash> = match rpc { let block: Block<TxHash> = match rpc {
Some(rpc) => { Some(rpc) => {
rpc.wait_for_request_handle(authorized_request, Duration::from_secs(30)) rpc.wait_for_request_handle(authorization, Duration::from_secs(30))
.await? .await?
.request("eth_getBlockByHash", &get_block_params, Level::ERROR.into()) .request("eth_getBlockByHash", get_block_params, Level::ERROR.into())
.await? .await?
} }
None => { None => {
@ -118,7 +118,7 @@ impl Web3Connections {
let request: JsonRpcRequest = serde_json::from_value(request)?; let request: JsonRpcRequest = serde_json::from_value(request)?;
let response = self let response = self
.try_send_best_upstream_server(authorized_request, request, None) .try_send_best_upstream_server(authorization, request, None)
.await?; .await?;
let block = response.result.unwrap(); let block = response.result.unwrap();
@ -167,7 +167,7 @@ impl Web3Connections {
// deref to not keep the lock open // deref to not keep the lock open
if let Some(block_hash) = self.block_numbers.get(num) { if let Some(block_hash) = self.block_numbers.get(num) {
// TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set // TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set
// TODO: pass authorized_request through here? // TODO: pass authorization through here?
return self.block(None, &block_hash, None).await; return self.block(None, &block_hash, None).await;
} }

View File

@ -205,7 +205,7 @@ impl Web3Connection {
.await? .await?
.request( .request(
"eth_getCode", "eth_getCode",
&( (
"0xdead00000000000000000000000000000000beef", "0xdead00000000000000000000000000000000beef",
maybe_archive_block, maybe_archive_block,
), ),
@ -733,13 +733,13 @@ impl Web3Connection {
#[instrument] #[instrument]
pub async fn wait_for_request_handle( pub async fn wait_for_request_handle(
self: &Arc<Self>, self: &Arc<Self>,
authorized_request: Option<&Arc<AuthorizedRequest>>, authorization: Option<&Arc<AuthorizedRequest>>,
max_wait: Duration, max_wait: Duration,
) -> anyhow::Result<OpenRequestHandle> { ) -> anyhow::Result<OpenRequestHandle> {
let max_wait = Instant::now() + max_wait; let max_wait = Instant::now() + max_wait;
loop { loop {
let x = self.try_request_handle(authorized_request).await; let x = self.try_request_handle(authorization).await;
trace!(?x, "try_request_handle"); trace!(?x, "try_request_handle");
@ -769,7 +769,7 @@ impl Web3Connection {
#[instrument] #[instrument]
pub async fn try_request_handle( pub async fn try_request_handle(
self: &Arc<Self>, self: &Arc<Self>,
authorized_request: Option<&Arc<AuthorizedRequest>>, authorization: Option<&Arc<AuthorizedRequest>>,
) -> anyhow::Result<OpenRequestResult> { ) -> anyhow::Result<OpenRequestResult> {
// check that we are connected // check that we are connected
if !self.has_provider().await { if !self.has_provider().await {
@ -800,7 +800,7 @@ impl Web3Connection {
} }
}; };
let handle = OpenRequestHandle::new(self.clone(), authorized_request.cloned()); let handle = OpenRequestHandle::new(self.clone(), authorization.cloned());
Ok(OpenRequestResult::Handle(handle)) Ok(OpenRequestResult::Handle(handle))
} }

View File

@ -305,7 +305,7 @@ impl Web3Connections {
/// Send the same request to all the handles. Returning the most common success or most common error. /// Send the same request to all the handles. Returning the most common success or most common error.
pub async fn try_send_parallel_requests( pub async fn try_send_parallel_requests(
&self, &self,
authorized_request: Option<&Arc<AuthorizedRequest>>, authorization: Option<&Arc<AuthorizedRequest>>,
active_request_handles: Vec<OpenRequestHandle>, active_request_handles: Vec<OpenRequestHandle>,
method: &str, method: &str,
// TODO: remove this box once i figure out how to do the options // TODO: remove this box once i figure out how to do the options
@ -317,7 +317,7 @@ impl Web3Connections {
.into_iter() .into_iter()
.map(|active_request_handle| async move { .map(|active_request_handle| async move {
let result: Result<Box<RawValue>, _> = active_request_handle let result: Result<Box<RawValue>, _> = active_request_handle
.request(method, &params, tracing::Level::ERROR.into()) .request(method, params.cloned(), tracing::Level::ERROR.into())
.await; .await;
result result
}) })
@ -364,7 +364,7 @@ impl Web3Connections {
/// get the best available rpc server /// get the best available rpc server
pub async fn next_upstream_server( pub async fn next_upstream_server(
&self, &self,
authorized_request: Option<&Arc<AuthorizedRequest>>, authorization: Option<&Arc<AuthorizedRequest>>,
skip: &[Arc<Web3Connection>], skip: &[Arc<Web3Connection>],
min_block_needed: Option<&U64>, min_block_needed: Option<&U64>,
) -> anyhow::Result<OpenRequestResult> { ) -> anyhow::Result<OpenRequestResult> {
@ -423,7 +423,7 @@ impl Web3Connections {
// now that the rpcs are sorted, try to get an active request handle for one of them // now that the rpcs are sorted, try to get an active request handle for one of them
for rpc in synced_rpcs.into_iter() { for rpc in synced_rpcs.into_iter() {
// increment our connection counter // increment our connection counter
match rpc.try_request_handle(authorized_request).await { match rpc.try_request_handle(authorization).await {
Ok(OpenRequestResult::Handle(handle)) => { Ok(OpenRequestResult::Handle(handle)) => {
trace!("next server on {:?}: {:?}", self, rpc); trace!("next server on {:?}: {:?}", self, rpc);
return Ok(OpenRequestResult::Handle(handle)); return Ok(OpenRequestResult::Handle(handle));
@ -454,7 +454,7 @@ impl Web3Connections {
// TODO: better type on this that can return an anyhow::Result // TODO: better type on this that can return an anyhow::Result
pub async fn upstream_servers( pub async fn upstream_servers(
&self, &self,
authorized_request: Option<&Arc<AuthorizedRequest>>, authorization: Option<&Arc<AuthorizedRequest>>,
block_needed: Option<&U64>, block_needed: Option<&U64>,
) -> Result<Vec<OpenRequestHandle>, Option<Instant>> { ) -> Result<Vec<OpenRequestHandle>, Option<Instant>> {
let mut earliest_retry_at = None; let mut earliest_retry_at = None;
@ -469,7 +469,7 @@ impl Web3Connections {
} }
// check rate limits and increment our connection counter // check rate limits and increment our connection counter
match connection.try_request_handle(authorized_request).await { match connection.try_request_handle(authorization).await {
Ok(OpenRequestResult::RetryAt(retry_at)) => { Ok(OpenRequestResult::RetryAt(retry_at)) => {
// this rpc is not available. skip it // this rpc is not available. skip it
earliest_retry_at = earliest_retry_at.min(Some(retry_at)); earliest_retry_at = earliest_retry_at.min(Some(retry_at));
@ -495,7 +495,7 @@ impl Web3Connections {
/// be sure there is a timeout on this or it might loop forever /// be sure there is a timeout on this or it might loop forever
pub async fn try_send_best_upstream_server( pub async fn try_send_best_upstream_server(
&self, &self,
authorized_request: Option<&Arc<AuthorizedRequest>>, authorization: Option<&Arc<AuthorizedRequest>>,
request: JsonRpcRequest, request: JsonRpcRequest,
min_block_needed: Option<&U64>, min_block_needed: Option<&U64>,
) -> anyhow::Result<JsonRpcForwardedResponse> { ) -> anyhow::Result<JsonRpcForwardedResponse> {
@ -507,7 +507,7 @@ impl Web3Connections {
break; break;
} }
match self match self
.next_upstream_server(authorized_request, &skip_rpcs, min_block_needed) .next_upstream_server(authorization, &skip_rpcs, min_block_needed)
.await? .await?
{ {
OpenRequestResult::Handle(active_request_handle) => { OpenRequestResult::Handle(active_request_handle) => {
@ -518,7 +518,7 @@ impl Web3Connections {
let response_result = active_request_handle let response_result = active_request_handle
.request( .request(
&request.method, &request.method,
&request.params, request.params.clone(),
RequestErrorHandler::SaveReverts(100.0), RequestErrorHandler::SaveReverts(100.0),
) )
.await; .await;
@ -597,22 +597,19 @@ impl Web3Connections {
#[instrument] #[instrument]
pub async fn try_send_all_upstream_servers( pub async fn try_send_all_upstream_servers(
&self, &self,
authorized_request: Option<&Arc<AuthorizedRequest>>, authorization: Option<&Arc<AuthorizedRequest>>,
request: JsonRpcRequest, request: JsonRpcRequest,
block_needed: Option<&U64>, block_needed: Option<&U64>,
) -> anyhow::Result<JsonRpcForwardedResponse> { ) -> anyhow::Result<JsonRpcForwardedResponse> {
loop { loop {
match self match self.upstream_servers(authorization, block_needed).await {
.upstream_servers(authorized_request.clone(), block_needed)
.await
{
Ok(active_request_handles) => { Ok(active_request_handles) => {
// TODO: benchmark this compared to waiting on unbounded futures // TODO: benchmark this compared to waiting on unbounded futures
// TODO: do something with this handle? // TODO: do something with this handle?
// TODO: this is not working right. simplify // TODO: this is not working right. simplify
let quorum_response = self let quorum_response = self
.try_send_parallel_requests( .try_send_parallel_requests(
authorized_request, authorization,
active_request_handles, active_request_handles,
request.method.as_ref(), request.method.as_ref(),
request.params.as_ref(), request.params.as_ref(),

View File

@ -26,7 +26,7 @@ pub enum OpenRequestResult {
/// Make RPC requests through this handle and drop it when you are done. /// Make RPC requests through this handle and drop it when you are done.
#[derive(Debug)] #[derive(Debug)]
pub struct OpenRequestHandle { pub struct OpenRequestHandle {
authorized_request: Arc<AuthorizedRequest>, authorization: Arc<AuthorizedRequest>,
conn: Arc<Web3Connection>, conn: Arc<Web3Connection>,
// TODO: this is the same metrics on the conn. use a reference? // TODO: this is the same metrics on the conn. use a reference?
metrics: Arc<OpenRequestHandleMetrics>, metrics: Arc<OpenRequestHandleMetrics>,
@ -51,12 +51,18 @@ impl From<Level> for RequestErrorHandler {
} }
} }
impl AuthorizedRequest {
async fn save_revert<T>(self: Arc<Self>, method: String, params: T) -> anyhow::Result<()>
where
T: fmt::Debug + serde::Serialize + Send + Sync,
{
todo!("save the revert to the database");
}
}
#[metered(registry = OpenRequestHandleMetrics, visibility = pub)] #[metered(registry = OpenRequestHandleMetrics, visibility = pub)]
impl OpenRequestHandle { impl OpenRequestHandle {
pub fn new( pub fn new(conn: Arc<Web3Connection>, authorization: Option<Arc<AuthorizedRequest>>) -> Self {
conn: Arc<Web3Connection>,
authorized_request: Option<Arc<AuthorizedRequest>>,
) -> Self {
// TODO: take request_id as an argument? // TODO: take request_id as an argument?
// TODO: attach a unique id to this? customer requests have one, but not internal queries // TODO: attach a unique id to this? customer requests have one, but not internal queries
// TODO: what ordering?! // TODO: what ordering?!
@ -71,11 +77,10 @@ impl OpenRequestHandle {
let metrics = conn.open_request_handle_metrics.clone(); let metrics = conn.open_request_handle_metrics.clone();
let used = false.into(); let used = false.into();
let authorized_request = let authorization = authorization.unwrap_or_else(|| Arc::new(AuthorizedRequest::Internal));
authorized_request.unwrap_or_else(|| Arc::new(AuthorizedRequest::Internal));
Self { Self {
authorized_request, authorization,
conn, conn,
metrics, metrics,
used, used,
@ -94,11 +99,12 @@ impl OpenRequestHandle {
pub async fn request<T, R>( pub async fn request<T, R>(
&self, &self,
method: &str, method: &str,
params: &T, params: T,
error_handler: RequestErrorHandler, error_handler: RequestErrorHandler,
) -> Result<R, ProviderError> ) -> Result<R, ProviderError>
where where
T: fmt::Debug + serde::Serialize + Send + Sync, // TODO: not sure about this type. would be better to not need clones, but measure and spawns combine to need it
T: Clone + fmt::Debug + serde::Serialize + Send + Sync + 'static,
R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug, R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug,
{ {
// ensure this function only runs once // ensure this function only runs once
@ -106,32 +112,33 @@ impl OpenRequestHandle {
unimplemented!("a request handle should only be used once"); unimplemented!("a request handle should only be used once");
} }
// TODO: use tracing spans properly // TODO: use tracing spans
// TODO: requests from customers have request ids, but we should add // TODO: requests from customers have request ids, but we should add
// TODO: including params in this is way too verbose // TODO: including params in this is way too verbose
// the authorization field is already on a parent span
trace!(rpc=%self.conn, %method, "request"); trace!(rpc=%self.conn, %method, "request");
let mut provider = None; let mut provider = None;
while provider.is_none() { while provider.is_none() {
match self.conn.provider.read().await.as_ref() { match self.conn.provider.read().await.clone() {
None => { None => {
warn!(rpc=%self.conn, "no provider!"); warn!(rpc=%self.conn, "no provider!");
// TODO: how should this work? a reconnect should be in progress. but maybe force one now? // TODO: how should this work? a reconnect should be in progress. but maybe force one now?
// TODO: maybe use a watch handle? // TODO: sleep how long? subscribe to something instead? maybe use a watch handle?
// TODO: sleep how long? subscribe to something instead? // TODO: this is going to be way too verbose!
// TODO: this is going to be very verbose!
sleep(Duration::from_millis(100)).await sleep(Duration::from_millis(100)).await
} }
Some(found_provider) => provider = Some(found_provider.clone()), Some(found_provider) => provider = Some(found_provider),
} }
} }
let provider = &*provider.expect("provider was checked already"); let provider = &*provider.expect("provider was checked already");
// TODO: really sucks that we have to clone here
let response = match provider { let response = match provider {
Web3Provider::Http(provider) => provider.request(method, params).await, Web3Provider::Http(provider) => provider.request(method, params.clone()).await,
Web3Provider::Ws(provider) => provider.request(method, params).await, Web3Provider::Ws(provider) => provider.request(method, params.clone()).await,
}; };
if let Err(err) = &response { if let Err(err) = &response {
@ -157,6 +164,14 @@ impl OpenRequestHandle {
{ {
if err.message.starts_with("execution reverted") { if err.message.starts_with("execution reverted") {
debug!(%method, ?params, "TODO: save the request"); debug!(%method, ?params, "TODO: save the request");
let f = self
.authorization
.clone()
.save_revert(method.to_string(), params);
tokio::spawn(async move { f.await });
// TODO: don't do this on the hot path. spawn it // TODO: don't do this on the hot path. spawn it
} else { } else {
debug!(?err, %method, rpc=%self.conn, "bad response!"); debug!(?err, %method, rpc=%self.conn, "bad response!");
@ -169,7 +184,13 @@ impl OpenRequestHandle {
{ {
if err.message.starts_with("execution reverted") { if err.message.starts_with("execution reverted") {
debug!(%method, ?params, "TODO: save the request"); debug!(%method, ?params, "TODO: save the request");
// TODO: don't do this on the hot path. spawn it
let f = self
.authorization
.clone()
.save_revert(method.to_string(), params);
tokio::spawn(async move { f.await });
} else { } else {
debug!(?err, %method, rpc=%self.conn, "bad response!"); debug!(?err, %method, rpc=%self.conn, "bad response!");
} }

View File

@ -30,7 +30,7 @@ impl Web3Connections {
handle handle
.request( .request(
"eth_getTransactionByHash", "eth_getTransactionByHash",
&(pending_tx_id,), (pending_tx_id,),
Level::ERROR.into(), Level::ERROR.into(),
) )
.await? .await?