From 3854312674cad58bc947dd33633630ef2235d661 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Thu, 22 Sep 2022 21:51:52 +0000
Subject: [PATCH] we need to clone :'(

---
 web3_proxy/src/app.rs                     | 18 +++----
 web3_proxy/src/frontend/rpc_proxy_http.rs | 16 +++---
 web3_proxy/src/frontend/rpc_proxy_ws.rs   | 42 +++++++---------
 web3_proxy/src/rpcs/blockchain.rs         | 12 ++---
 web3_proxy/src/rpcs/connection.rs         | 10 ++--
 web3_proxy/src/rpcs/connections.rs        | 27 +++++------
 web3_proxy/src/rpcs/request.rs            | 59 +++++++++++++++-------
 web3_proxy/src/rpcs/transactions.rs       |  2 +-
 8 files changed, 98 insertions(+), 88 deletions(-)

diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs
index b067ea66..f07d2738 100644
--- a/web3_proxy/src/app.rs
+++ b/web3_proxy/src/app.rs
@@ -410,7 +410,7 @@ impl Web3ProxyApp {
     #[measure([ErrorCount, HitCount, ResponseTime, Throughput])]
     pub async fn eth_subscribe<'a>(
         self: &'a Arc<Self>,
-        authorized_request: Arc<AuthorizedRequest>,
+        authorization: Arc<AuthorizedRequest>,
         payload: JsonRpcRequest,
         subscription_count: &'a AtomicUsize,
         // TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now
@@ -604,7 +604,7 @@ impl Web3ProxyApp {
     /// send the request or batch of requests to the approriate RPCs
     pub async fn proxy_web3_rpc(
         self: &Arc<Self>,
-        authorized_request: &Arc<AuthorizedRequest>,
+        authorization: &Arc<AuthorizedRequest>,
         request: JsonRpcRequestEnum,
     ) -> anyhow::Result<JsonRpcForwardedResponseEnum> {
         // TODO: this should probably be trace level
@@ -619,14 +619,14 @@ impl Web3ProxyApp {
             JsonRpcRequestEnum::Single(request) => JsonRpcForwardedResponseEnum::Single(
                 timeout(
                     max_time,
-                    self.proxy_web3_rpc_request(authorized_request, request),
+                    self.proxy_web3_rpc_request(authorization, request),
                 )
                 .await??,
             ),
             JsonRpcRequestEnum::Batch(requests) => JsonRpcForwardedResponseEnum::Batch(
                 timeout(
                     max_time,
-                    self.proxy_web3_rpc_requests(authorized_request, requests),
+                    self.proxy_web3_rpc_requests(authorization, requests),
                 )
                 .await??,
             ),
@@ -642,7 +642,7 @@ impl Web3ProxyApp {
     /// TODO: make sure this isn't a problem
     async fn proxy_web3_rpc_requests(
         self: &Arc<Self>,
-        authorized_request: &Arc<AuthorizedRequest>,
+        authorization: &Arc<AuthorizedRequest>,
         requests: Vec<JsonRpcRequest>,
     ) -> anyhow::Result<Vec<JsonRpcForwardedResponse>> {
         // TODO: we should probably change ethers-rs to support this directly
@@ -650,7 +650,7 @@ impl Web3ProxyApp {
         let responses = join_all(
             requests
                 .into_iter()
-                .map(|request| self.proxy_web3_rpc_request(authorized_request, request))
+                .map(|request| self.proxy_web3_rpc_request(authorization, request))
                 .collect::<Vec<_>>(),
         )
         .await;
@@ -678,7 +678,7 @@ impl Web3ProxyApp {
     #[measure([ErrorCount, HitCount, ResponseTime, Throughput])]
     async fn proxy_web3_rpc_request(
         self: &Arc<Self>,
-        authorized_request: &Arc<AuthorizedRequest>,
+        authorization: &Arc<AuthorizedRequest>,
         mut request: JsonRpcRequest,
     ) -> anyhow::Result<JsonRpcForwardedResponse> {
         trace!("Received request: {:?}", request);
@@ -812,7 +812,7 @@ impl Web3ProxyApp {
                 let rpcs = self.private_rpcs.as_ref().unwrap_or(&self.balanced_rpcs);

                 return rpcs
-                    .try_send_all_upstream_servers(Some(authorized_request), request, None)
+                    .try_send_all_upstream_servers(Some(authorization), request, None)
                     .await;
             }
             "eth_syncing" => {
@@ -913,7 +913,7 @@ impl Web3ProxyApp {
                 let mut response = self
                     .balanced_rpcs
                     .try_send_best_upstream_server(
-                        Some(authorized_request),
+                        Some(authorization),
                         request,
                         Some(&request_block_id.num),
                     )
diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs
index 70370b6a..82709f20 100644
--- a/web3_proxy/src/frontend/rpc_proxy_http.rs
+++ b/web3_proxy/src/frontend/rpc_proxy_http.rs
@@ -19,16 +19,16 @@ pub async fn public_proxy_web3_rpc(
 ) -> FrontendResult {
     let request_span = error_span!("request", %ip, ?referer, ?user_agent);

-    let authorized_request = ip_is_authorized(&app, ip)
+    let authorization = ip_is_authorized(&app, ip)
         .instrument(request_span.clone())
         .await?;

-    let request_span = error_span!("request", ?authorized_request);
+    let request_span = error_span!("request", ?authorization);

-    let authorized_request = Arc::new(authorized_request);
+    let authorization = Arc::new(authorization);

     let f = tokio::spawn(async move {
-        app.proxy_web3_rpc(&authorized_request, payload)
+        app.proxy_web3_rpc(&authorization, payload)
             .instrument(request_span)
             .await
     });
@@ -49,7 +49,7 @@ pub async fn user_proxy_web3_rpc(
     let request_span = error_span!("request", %ip, ?referer, ?user_agent);

     // TODO: this should probably return the user_key_id instead? or maybe both?
-    let authorized_request = key_is_authorized(
+    let authorization = key_is_authorized(
         &app,
         user_key,
         ip,
@@ -59,12 +59,12 @@ pub async fn user_proxy_web3_rpc(
     .instrument(request_span.clone())
     .await?;

-    let request_span = error_span!("request", ?authorized_request);
+    let request_span = error_span!("request", ?authorization);

-    let authorized_request = Arc::new(authorized_request);
+    let authorization = Arc::new(authorization);

     let f = tokio::spawn(async move {
-        app.proxy_web3_rpc(&authorized_request, payload)
+        app.proxy_web3_rpc(&authorization, payload)
             .instrument(request_span)
             .await
     });
diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs
index 4a65beae..7061a36d 100644
--- a/web3_proxy/src/frontend/rpc_proxy_ws.rs
+++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs
@@ -33,16 +33,16 @@ pub async fn public_websocket_handler(
     ClientIp(ip): ClientIp,
     ws_upgrade: Option<WebSocketUpgrade>,
 ) -> FrontendResult {
-    let authorized_request = ip_is_authorized(&app, ip).await?;
+    let authorization = ip_is_authorized(&app, ip).await?;

-    let request_span = error_span!("request", ?authorized_request);
+    let request_span = error_span!("request", ?authorization);

-    let authorized_request = Arc::new(authorized_request);
+    let authorization = Arc::new(authorization);

     match ws_upgrade {
         Some(ws) => Ok(ws
             .on_upgrade(|socket| {
-                proxy_web3_socket(app, authorized_request, socket).instrument(request_span)
+                proxy_web3_socket(app, authorization, socket).instrument(request_span)
             })
             .into_response()),
         None => {
@@ -61,7 +61,7 @@ pub async fn user_websocket_handler(
     user_agent: Option<TypedHeader<UserAgent>>,
     ws_upgrade: Option<WebSocketUpgrade>,
 ) -> FrontendResult {
-    let authorized_request = key_is_authorized(
+    let authorization = key_is_authorized(
         &app,
         user_key,
         ip,
@@ -71,13 +71,13 @@ pub async fn user_websocket_handler(
     .await?;

     // TODO: type that wraps Address and have it censor? would protect us from accidently logging addresses or other user info
-    let request_span = error_span!("request", ?authorized_request);
+    let request_span = error_span!("request", ?authorization);

-    let authorized_request = Arc::new(authorized_request);
+    let authorization = Arc::new(authorization);

     match ws_upgrade {
         Some(ws_upgrade) => Ok(ws_upgrade.on_upgrade(move |socket| {
-            proxy_web3_socket(app, authorized_request, socket).instrument(request_span)
+            proxy_web3_socket(app, authorization, socket).instrument(request_span)
         })),
         None => {
             // TODO: store this on the app and use register_template?
@@ -88,7 +88,7 @@ pub async fn user_websocket_handler(
             let user_url = reg
                 .render_template(
                     &app.config.redirect_user_url,
-                    &json!({ "authorized_request": authorized_request }),
+                    &json!({ "authorization": authorization }),
                 )
                 .unwrap();

@@ -100,7 +100,7 @@ pub async fn user_websocket_handler(

 async fn proxy_web3_socket(
     app: Arc<Web3ProxyApp>,
-    authorized_request: Arc<AuthorizedRequest>,
+    authorization: Arc<AuthorizedRequest>,
     socket: WebSocket,
 ) {
     // split the websocket so we can read and write concurrently
@@ -110,18 +110,13 @@ async fn proxy_web3_socket(
     let (response_sender, response_receiver) = flume::unbounded::<Message>();

     tokio::spawn(write_web3_socket(response_receiver, ws_tx));
-    tokio::spawn(read_web3_socket(
-        app,
-        authorized_request,
-        ws_rx,
-        response_sender,
-    ));
+    tokio::spawn(read_web3_socket(app, authorization, ws_rx, response_sender));
 }

 /// websockets support a few more methods than http clients
 async fn handle_socket_payload(
     app: Arc<Web3ProxyApp>,
-    authorized_request: Arc<AuthorizedRequest>,
+    authorization: Arc<AuthorizedRequest>,
     payload: &str,
     response_sender: &flume::Sender<Message>,
     subscription_count: &AtomicUsize,
@@ -140,7 +135,7 @@ async fn handle_socket_payload(
                 let response = app
                     .eth_subscribe(
-                        authorized_request.clone(),
+                        authorization.clone(),
                         payload,
                         subscription_count,
                         response_sender.clone(),
@@ -177,10 +172,7 @@ async fn handle_socket_payload(

                 Ok(response.into())
             }
-            _ => {
-                app.proxy_web3_rpc(&authorized_request, payload.into())
-                    .await
-            }
+            _ => app.proxy_web3_rpc(&authorization, payload.into()).await,
         };

     (id, response)
@@ -206,7 +198,7 @@ async fn handle_socket_payload(

 async fn read_web3_socket(
     app: Arc<Web3ProxyApp>,
-    authorized_request: Arc<AuthorizedRequest>,
+    authorization: Arc<AuthorizedRequest>,
     mut ws_rx: SplitStream<WebSocket>,
     response_sender: flume::Sender<Message>,
 ) {
@@ -219,7 +211,7 @@ async fn read_web3_socket(
             Message::Text(payload) => {
                 handle_socket_payload(
                     app.clone(),
-                    authorized_request.clone(),
+                    authorization.clone(),
                     &payload,
                     &response_sender,
                     &subscription_count,
@@ -242,7 +234,7 @@ async fn read_web3_socket(

                 handle_socket_payload(
                     app.clone(),
-                    authorized_request.clone(),
+                    authorization.clone(),
                     payload,
                     &response_sender,
                     &subscription_count,
diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs
index 6ed4b562..abdc4e25 100644
--- a/web3_proxy/src/rpcs/blockchain.rs
+++ b/web3_proxy/src/rpcs/blockchain.rs
@@ -91,7 +91,7 @@ impl Web3Connections {
     /// Will query a specific node or the best available.
     pub async fn block(
         &self,
-        authorized_request: Option<&Arc<AuthorizedRequest>>,
+        authorization: Option<&Arc<AuthorizedRequest>>,
         hash: &H256,
         rpc: Option<&Arc<Web3Connection>>,
     ) -> anyhow::Result<ArcBlock> {
@@ -102,13 +102,13 @@ impl Web3Connections {
         }

         // block not in cache. we need to ask an rpc for it
-        let get_block_params = (hash, false);
+        let get_block_params = (*hash, false);

         // TODO: if error, retry?
         let block: Block<TxHash> = match rpc {
             Some(rpc) => {
-                rpc.wait_for_request_handle(authorized_request, Duration::from_secs(30))
+                rpc.wait_for_request_handle(authorization, Duration::from_secs(30))
                     .await?
-                    .request("eth_getBlockByHash", &get_block_params, Level::ERROR.into())
+                    .request("eth_getBlockByHash", get_block_params, Level::ERROR.into())
                     .await?
             }
             None => {
@@ -118,7 +118,7 @@ impl Web3Connections {
                 let request: JsonRpcRequest = serde_json::from_value(request)?;

                 let response = self
-                    .try_send_best_upstream_server(authorized_request, request, None)
+                    .try_send_best_upstream_server(authorization, request, None)
                     .await?;

                 let block = response.result.unwrap();
@@ -167,7 +167,7 @@ impl Web3Connections {
         // deref to not keep the lock open
         if let Some(block_hash) = self.block_numbers.get(num) {
             // TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set
-            // TODO: pass authorized_request through here?
+            // TODO: pass authorization through here?
             return self.block(None, &block_hash, None).await;
         }

diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs
index 6804af0e..7d2f3a67 100644
--- a/web3_proxy/src/rpcs/connection.rs
+++ b/web3_proxy/src/rpcs/connection.rs
@@ -205,7 +205,7 @@ impl Web3Connection {
                 .await?
                 .request(
                     "eth_getCode",
-                    &(
+                    (
                         "0xdead00000000000000000000000000000000beef",
                         maybe_archive_block,
                     ),
@@ -733,13 +733,13 @@ impl Web3Connection {
     #[instrument]
     pub async fn wait_for_request_handle(
         self: &Arc<Self>,
-        authorized_request: Option<&Arc<AuthorizedRequest>>,
+        authorization: Option<&Arc<AuthorizedRequest>>,
         max_wait: Duration,
     ) -> anyhow::Result<OpenRequestHandle> {
         let max_wait = Instant::now() + max_wait;

         loop {
-            let x = self.try_request_handle(authorized_request).await;
+            let x = self.try_request_handle(authorization).await;

             trace!(?x, "try_request_handle");

@@ -769,7 +769,7 @@ impl Web3Connection {
     #[instrument]
     pub async fn try_request_handle(
         self: &Arc<Self>,
-        authorized_request: Option<&Arc<AuthorizedRequest>>,
+        authorization: Option<&Arc<AuthorizedRequest>>,
     ) -> anyhow::Result<OpenRequestResult> {
         // check that we are connected
         if !self.has_provider().await {
@@ -800,7 +800,7 @@ impl Web3Connection {
             }
         };

-        let handle = OpenRequestHandle::new(self.clone(), authorized_request.cloned());
+        let handle = OpenRequestHandle::new(self.clone(), authorization.cloned());

         Ok(OpenRequestResult::Handle(handle))
     }
diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs
index 880f0f27..04b36a6d 100644
--- a/web3_proxy/src/rpcs/connections.rs
+++ b/web3_proxy/src/rpcs/connections.rs
@@ -305,7 +305,7 @@ impl Web3Connections {
     /// Send the same request to all the handles. Returning the most common success or most common error.
     pub async fn try_send_parallel_requests(
         &self,
-        authorized_request: Option<&Arc<AuthorizedRequest>>,
+        authorization: Option<&Arc<AuthorizedRequest>>,
         active_request_handles: Vec<OpenRequestHandle>,
         method: &str,
         // TODO: remove this box once i figure out how to do the options
@@ -317,7 +317,7 @@ impl Web3Connections {
             .into_iter()
             .map(|active_request_handle| async move {
                 let result: Result<Box<RawValue>, _> = active_request_handle
-                    .request(method, &params, tracing::Level::ERROR.into())
+                    .request(method, params.cloned(), tracing::Level::ERROR.into())
                     .await;
                 result
             })
@@ -364,7 +364,7 @@ impl Web3Connections {
     /// get the best available rpc server
     pub async fn next_upstream_server(
         &self,
-        authorized_request: Option<&Arc<AuthorizedRequest>>,
+        authorization: Option<&Arc<AuthorizedRequest>>,
         skip: &[Arc<Web3Connection>],
         min_block_needed: Option<&U64>,
     ) -> anyhow::Result<OpenRequestResult> {
@@ -423,7 +423,7 @@ impl Web3Connections {
         // now that the rpcs are sorted, try to get an active request handle for one of them
         for rpc in synced_rpcs.into_iter() {
             // increment our connection counter
-            match rpc.try_request_handle(authorized_request).await {
+            match rpc.try_request_handle(authorization).await {
                 Ok(OpenRequestResult::Handle(handle)) => {
                     trace!("next server on {:?}: {:?}", self, rpc);
                     return Ok(OpenRequestResult::Handle(handle));
                 }
@@ -454,7 +454,7 @@ impl Web3Connections {
     // TODO: better type on this that can return an anyhow::Result
     pub async fn upstream_servers(
         &self,
-        authorized_request: Option<&Arc<AuthorizedRequest>>,
+        authorization: Option<&Arc<AuthorizedRequest>>,
         block_needed: Option<&U64>,
     ) -> Result<Vec<OpenRequestHandle>, Option<Instant>> {
         let mut earliest_retry_at = None;
@@ -469,7 +469,7 @@ impl Web3Connections {
             }

             // check rate limits and increment our connection counter
-            match connection.try_request_handle(authorized_request).await {
+            match connection.try_request_handle(authorization).await {
                 Ok(OpenRequestResult::RetryAt(retry_at)) => {
                     // this rpc is not available. skip it
                     earliest_retry_at = earliest_retry_at.min(Some(retry_at));
                 }
@@ -495,7 +495,7 @@ impl Web3Connections {
     /// be sure there is a timeout on this or it might loop forever
     pub async fn try_send_best_upstream_server(
         &self,
-        authorized_request: Option<&Arc<AuthorizedRequest>>,
+        authorization: Option<&Arc<AuthorizedRequest>>,
         request: JsonRpcRequest,
         min_block_needed: Option<&U64>,
     ) -> anyhow::Result<JsonRpcForwardedResponse> {
@@ -507,7 +507,7 @@ impl Web3Connections {
                 break;
             }
             match self
-                .next_upstream_server(authorized_request, &skip_rpcs, min_block_needed)
+                .next_upstream_server(authorization, &skip_rpcs, min_block_needed)
                 .await?
             {
                 OpenRequestResult::Handle(active_request_handle) => {
@@ -518,7 +518,7 @@ impl Web3Connections {
                     let response_result = active_request_handle
                         .request(
                             &request.method,
-                            &request.params,
+                            request.params.clone(),
                             RequestErrorHandler::SaveReverts(100.0),
                         )
                         .await;
@@ -597,22 +597,19 @@ impl Web3Connections {
     #[instrument]
     pub async fn try_send_all_upstream_servers(
         &self,
-        authorized_request: Option<&Arc<AuthorizedRequest>>,
+        authorization: Option<&Arc<AuthorizedRequest>>,
         request: JsonRpcRequest,
         block_needed: Option<&U64>,
     ) -> anyhow::Result<JsonRpcForwardedResponse> {
         loop {
-            match self
-                .upstream_servers(authorized_request.clone(), block_needed)
-                .await
-            {
+            match self.upstream_servers(authorization, block_needed).await {
                 Ok(active_request_handles) => {
                     // TODO: benchmark this compared to waiting on unbounded futures
                     // TODO: do something with this handle?
                     // TODO: this is not working right. simplify
                     let quorum_response = self
                         .try_send_parallel_requests(
-                            authorized_request,
+                            authorization,
                             active_request_handles,
                             request.method.as_ref(),
                             request.params.as_ref(),
diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs
index e342906f..ca95bf9e 100644
--- a/web3_proxy/src/rpcs/request.rs
+++ b/web3_proxy/src/rpcs/request.rs
@@ -26,7 +26,7 @@ pub enum OpenRequestResult {
 /// Make RPC requests through this handle and drop it when you are done.
 #[derive(Debug)]
 pub struct OpenRequestHandle {
-    authorized_request: Arc<AuthorizedRequest>,
+    authorization: Arc<AuthorizedRequest>,
     conn: Arc<Web3Connection>,
     // TODO: this is the same metrics on the conn. use a reference?
     metrics: Arc<OpenRequestHandleMetrics>,
@@ -51,12 +51,18 @@ impl From<Level> for RequestErrorHandler {
     }
 }

+impl AuthorizedRequest {
+    async fn save_revert<T>(self: Arc<Self>, method: String, params: T) -> anyhow::Result<()>
+    where
+        T: fmt::Debug + serde::Serialize + Send + Sync,
+    {
+        todo!("save the revert to the database");
+    }
+}
+
 #[metered(registry = OpenRequestHandleMetrics, visibility = pub)]
 impl OpenRequestHandle {
-    pub fn new(
-        conn: Arc<Web3Connection>,
-        authorized_request: Option<Arc<AuthorizedRequest>>,
-    ) -> Self {
+    pub fn new(conn: Arc<Web3Connection>, authorization: Option<Arc<AuthorizedRequest>>) -> Self {
         // TODO: take request_id as an argument?
         // TODO: attach a unique id to this? customer requests have one, but not internal queries
         // TODO: what ordering?!
@@ -71,11 +77,10 @@ impl OpenRequestHandle {
         let metrics = conn.open_request_handle_metrics.clone();
         let used = false.into();

-        let authorized_request =
-            authorized_request.unwrap_or_else(|| Arc::new(AuthorizedRequest::Internal));
+        let authorization = authorization.unwrap_or_else(|| Arc::new(AuthorizedRequest::Internal));

         Self {
-            authorized_request,
+            authorization,
             conn,
             metrics,
             used,
@@ -94,11 +99,12 @@ impl OpenRequestHandle {
     pub async fn request<T, R>(
         &self,
         method: &str,
-        params: &T,
+        params: T,
         error_handler: RequestErrorHandler,
     ) -> Result<R, ProviderError>
     where
-        T: fmt::Debug + serde::Serialize + Send + Sync,
+        // TODO: not sure about this type. would be better to not need clones, but measure and spawns combine to need it
+        T: Clone + fmt::Debug + serde::Serialize + Send + Sync + 'static,
         R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug,
     {
         // ensure this function only runs once
@@ -106,32 +112,33 @@ impl OpenRequestHandle {
             unimplemented!("a request handle should only be used once");
         }

-        // TODO: use tracing spans properly
+        // TODO: use tracing spans
         // TODO: requests from customers have request ids, but we should add
         // TODO: including params in this is way too verbose
+        // the authorization field is already on a parent span
         trace!(rpc=%self.conn, %method, "request");

         let mut provider = None;

         while provider.is_none() {
-            match self.conn.provider.read().await.as_ref() {
+            match self.conn.provider.read().await.clone() {
                 None => {
                     warn!(rpc=%self.conn, "no provider!");

                     // TODO: how should this work? a reconnect should be in progress. but maybe force one now?
-                    // TODO: maybe use a watch handle?
-                    // TODO: sleep how long? subscribe to something instead?
-                    // TODO: this is going to be very verbose!
+                    // TODO: sleep how long? subscribe to something instead? maybe use a watch handle?
+                    // TODO: this is going to be way too verbose!
                     sleep(Duration::from_millis(100)).await
                 }
-                Some(found_provider) => provider = Some(found_provider.clone()),
+                Some(found_provider) => provider = Some(found_provider),
             }
         }

         let provider = &*provider.expect("provider was checked already");

+        // TODO: really sucks that we have to clone here
         let response = match provider {
-            Web3Provider::Http(provider) => provider.request(method, params).await,
-            Web3Provider::Ws(provider) => provider.request(method, params).await,
+            Web3Provider::Http(provider) => provider.request(method, params.clone()).await,
+            Web3Provider::Ws(provider) => provider.request(method, params.clone()).await,
         };

         if let Err(err) = &response {
@@ -157,6 +164,14 @@ impl OpenRequestHandle {
                 {
                     if err.message.starts_with("execution reverted") {
                         debug!(%method, ?params, "TODO: save the request");
+
+                        let f = self
+                            .authorization
+                            .clone()
+                            .save_revert(method.to_string(), params);
+
+                        tokio::spawn(async move { f.await });
+
                         // TODO: don't do this on the hot path. spawn it
                     } else {
                         debug!(?err, %method, rpc=%self.conn, "bad response!");
@@ -169,7 +184,13 @@ impl OpenRequestHandle {
                 {
                     if err.message.starts_with("execution reverted") {
                         debug!(%method, ?params, "TODO: save the request");
-                        // TODO: don't do this on the hot path. spawn it
+
+                        let f = self
+                            .authorization
+                            .clone()
+                            .save_revert(method.to_string(), params);
+
+                        tokio::spawn(async move { f.await });
                     } else {
                         debug!(?err, %method, rpc=%self.conn, "bad response!");
                     }
diff --git a/web3_proxy/src/rpcs/transactions.rs b/web3_proxy/src/rpcs/transactions.rs
index a458b1b3..1668c1f5 100644
--- a/web3_proxy/src/rpcs/transactions.rs
+++ b/web3_proxy/src/rpcs/transactions.rs
@@ -30,7 +30,7 @@ impl Web3Connections {
         handle
             .request(
                 "eth_getTransactionByHash",
-                &(pending_tx_id,),
+                (pending_tx_id,),
                 Level::ERROR.into(),
             )
             .await?
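
Context for the ":'(" in the subject line (not part of the patch itself): switching `params` from `&T` to an owned `T: Clone + 'static` is what lets the revert-logging future be handed to `tokio::spawn`, since a spawned task cannot hold a borrow of the request handler's locals. A minimal standalone sketch of that constraint, using a placeholder `Authorization` type and a println instead of the patch's `todo!("save the revert to the database")`:

use std::sync::Arc;

// Placeholder for the real AuthorizedRequest; only the ownership pattern matters here.
#[derive(Debug)]
struct Authorization;

impl Authorization {
    // A spawned task must own everything it touches, so `params` is taken by value
    // and bounded by `Send + 'static`, mirroring the `Clone + ... + 'static` bound
    // the patch puts on OpenRequestHandle::request.
    async fn save_revert<T>(self: Arc<Self>, method: String, params: T)
    where
        T: std::fmt::Debug + Send + 'static,
    {
        println!("would save revert: {} {:?}", method, params);
    }
}

#[tokio::main]
async fn main() {
    let authorization = Arc::new(Authorization);
    let params = ("0xdead00000000000000000000000000000000beef", "latest");

    // Clone the owned params for the background task; the request path keeps its copy.
    let f = authorization
        .clone()
        .save_revert("eth_call".to_string(), params.clone());

    // Off the hot path: the future is 'static because nothing inside it is borrowed.
    tokio::spawn(f).await.unwrap();

    println!("request path still has params: {:?}", params);
}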