diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs
index 2027680d..9b94d846 100644
--- a/web3_proxy/src/app/mod.rs
+++ b/web3_proxy/src/app/mod.rs
@@ -1363,6 +1363,7 @@ impl Web3ProxyApp {
             "eth_getTransactionReceipt" | "eth_getTransactionByHash" => {
                 // try to get the transaction without specifying a min_block_height
                 // TODO: timeout
+                // TODO: change this to send serially until we get a success
                 let result = self
                     .balanced_rpcs
@@ -1699,20 +1700,26 @@ impl Web3ProxyApp {
                 // this is spawned so that if the client disconnects, the app keeps polling the future with a lock inside the moka cache
                 // TODO: is this expect actually safe!? could there be a background process that still has the arc?
-                let mut x = match tokio::spawn(f).await? {
+                let x = match tokio::spawn(f).await? {
                     Ok(response_data) => Ok(jsonrpc::ParsedResponse::from_response_data(response_data, Default::default()).into()),
                     Err(err) => {
                         self.jsonrpc_response_failed_cache_keys.insert(cache_key, ()).await;
 
                         if let Web3ProxyError::StreamResponse(x) = err.as_ref() {
-                            let x = x.lock().take().expect("stream processing should only happen once");
+                            if let Some(x) = x.lock().take() {
+                                Ok(jsonrpc::SingleResponse::Stream(x))
+                            } else {
+                                let err: Web3ProxyError = anyhow::anyhow!("stream was already taken. please report this in Discord").into();
 
-                            Ok(jsonrpc::SingleResponse::Stream(x))
+                                Err(Arc::new(err))
+                            }
                         } else {
                             Err(err)
                         }
                     },
-                }?;
+                };
+
+                let mut x = x?;
 
                 // clear the id. theres no point including it in our cached response
                 x.set_id(Default::default());
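
For context, the key change in the second hunk is turning a panic (an expect() on a once-only stream slot) into a recoverable error. The following is a minimal, self-contained sketch of that pattern, not the web3_proxy implementation itself: it assumes a parking_lot-style Mutex (whose lock() returns the guard directly) and the anyhow crate, and the StreamHandle type and take_stream function are hypothetical names used only for illustration.

// Minimal sketch (not web3_proxy code) of the pattern adopted in the second hunk:
// a stream stored in a Mutex<Option<_>> may only be taken once, and a second
// take now surfaces an error instead of panicking via expect().
// Assumptions: parking_lot::Mutex and the anyhow crate; StreamHandle and
// take_stream are hypothetical names.
use parking_lot::Mutex;

struct StreamHandle;

fn take_stream(slot: &Mutex<Option<StreamHandle>>) -> anyhow::Result<StreamHandle> {
    // before: slot.lock().take().expect("stream processing should only happen once")
    // after: fall back to an error so a double take cannot crash the task
    if let Some(stream) = slot.lock().take() {
        Ok(stream)
    } else {
        Err(anyhow::anyhow!(
            "stream was already taken. please report this in Discord"
        ))
    }
}

fn main() {
    let slot = Mutex::new(Some(StreamHandle));
    assert!(take_stream(&slot).is_ok()); // first take succeeds
    assert!(take_stream(&slot).is_err()); // second take errors instead of panicking
}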