diff --git a/TODO.md b/TODO.md index 9e3dece8..20a64880 100644 --- a/TODO.md +++ b/TODO.md @@ -191,12 +191,16 @@ These are roughly in order of completition - [ ] user create script should allow a description field - [ ] user create script should allow multiple keys per user - [ ] somehow the proxy thought latest was hours behind. need internal health check that forces reconnect if this happens +- [ ] display logged reverts on an endpoint that requires authentication +- [ ] failsafe. if no blocks or transactions in some time, warn and reset the connection ## V1 These are not yet ordered. - [ ] implement filters and other unimplemented rpc methods + - multiple teams need log filters and subscriptions. + - would be nice if our subscriptions had better gaurentees than geth/erigon do, but maybe simpler to just setup a broadcast channel and proxy all the respones to a backend instead - [ ] instead of Option<...> in our frontend function signatures, use result and then the try operator so that we get our errors wrapped in json - [ ] revert logs should have a maximum age and a maximum count to keep the database from being huge - [ ] user login should also return a jwt (jsonwebtoken rust crate should make it easy) @@ -220,7 +224,6 @@ These are not yet ordered. - [ ] nice output when cargo doc is run - [ ] cache more things locally or in redis - [ ] stats when forks are resolved (and what chain they were on?) -- [ ] failsafe. if no blocks or transactions in some time, warn and reset the connection - [ ] emit stats for user's successes, retries, failures, with the types of requests, chain, rpc - [ ] cli for creating and editing api keys - [ ] Only subscribe to transactions when someone is listening and if the server has opted in to it @@ -235,6 +238,8 @@ These are not yet ordered. - if total difficulty is not on the block and we aren't on ETH, fetch the full block instead of just the header - if total difficulty is set and non-zero, use it for consensus instead of just the number - [ ] if we subscribe to a server that is syncing, it gives us null block_data_limit. when it catches up, we don't ever send queries to it. we need to recheck block_data_limit +- [ ] add a "failover" tier that is only used if balanced_rpcs has "no servers synced" + - use this tier (and private tier) to check timestamp on latest block. if we are behind that by more than a few seconds, something is wrong new endpoints for users (not totally sure about the exact paths, but these features are all needed): - [x] GET /u/:api_key diff --git a/web3_proxy/examples/subscribe_blocks.rs b/web3_proxy/examples/subscribe_blocks.rs index 9511accd..a8c4375f 100644 --- a/web3_proxy/examples/subscribe_blocks.rs +++ b/web3_proxy/examples/subscribe_blocks.rs @@ -13,9 +13,9 @@ async fn main() -> anyhow::Result<()> { fdlimit::raise_fd_limit(); // erigon - // let url = "ws://10.11.12.16:8548"; + let url = "ws://10.11.12.16:8548"; // geth - let url = "ws://10.11.12.16:8546"; + // let url = "ws://10.11.12.16:8546"; println!("Subscribing to blocks from {}", url); diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index e1c198a6..c1f0e08c 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -615,7 +615,9 @@ impl Web3ProxyApp { }, }); - let msg = Message::Text(serde_json::to_string(&msg).unwrap()); + let msg = Message::Text( + serde_json::to_string(&msg).expect("this message was just built"), + ); if response_sender.send_async(msg).await.is_err() { // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? @@ -919,7 +921,7 @@ impl Web3ProxyApp { head_block_id.num, &self.balanced_rpcs, ) - .await + .await? { // TODO: maybe this should be on the app and not on balanced_rpcs let request_block_hash = @@ -966,7 +968,8 @@ impl Web3ProxyApp { Ok::<_, anyhow::Error>(response) }) .await - .map_err(|err| Arc::try_unwrap(err).expect("this should be the only reference")) + // TODO: what is the best way to handle an Arc here? + .map_err(|err| anyhow::anyhow!(err)) .context("caching response")?; // since this data came likely out of a cache, the id is not going to match diff --git a/web3_proxy/src/bin/web3_proxy_cli/create_user.rs b/web3_proxy/src/bin/web3_proxy_cli/create_user.rs index 82355a18..3c9f85c0 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/create_user.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/create_user.rs @@ -51,7 +51,7 @@ impl CreateUserSubCommand { let bytes = address.as_bytes(); // convert the slice to a Vec - bytes.try_into().unwrap() + bytes.try_into().expect("Bytes can always be a Vec") }; // TODO: get existing or create a new one diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index fea48a9f..455047fd 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -8,25 +8,25 @@ use tracing::warn; use crate::rpcs::connections::Web3Connections; -pub fn block_num_to_u64(block_num: BlockNumber, latest_block: U64) -> (bool, U64) { +pub fn block_num_to_u64(block_num: BlockNumber, latest_block: U64) -> U64 { match block_num { BlockNumber::Earliest => { // modified is false because we want the backend to see "pending" - (false, U64::zero()) + U64::zero() } BlockNumber::Latest => { // change "latest" to a number // modified is true because we want the backend to see the height and not "latest" - (true, latest_block) + latest_block } BlockNumber::Number(x) => { // we already have a number - (false, x) + x } BlockNumber::Pending => { // TODO: think more about how to handle Pending // modified is false because we want the backend to see "pending" - (false, latest_block) + latest_block } } } @@ -57,7 +57,7 @@ pub async fn clean_block_number( } Some(x) => { // convert the json value to a BlockNumber - let (modified, block_num) = if let Some(obj) = x.as_object_mut() { + let block_num = if let Some(obj) = x.as_object_mut() { // it might be a Map like `{"blockHash": String("0xa5626dc20d3a0a209b1de85521717a3e859698de8ce98bca1b16822b7501f74b")}` if let Some(block_hash) = obj.remove("blockHash") { let block_hash: H256 = @@ -65,12 +65,9 @@ pub async fn clean_block_number( let block = rpcs.block(None, &block_hash, None).await?; - let block_num = block + block .number - .expect("blocks here should always have numbers"); - - // always set modfied to true because we used "take" above - (true, block_num) + .expect("blocks here should always have numbers") } else { return Err(anyhow::anyhow!("blockHash missing")); } @@ -79,16 +76,11 @@ pub async fn clean_block_number( // TODO: "BlockNumber" needs a better name let block_number = serde_json::from_value::(x.take())?; - let (_, block_num) = block_num_to_u64(block_number, latest_block); - - // always set modfied to true because we used "take" above - (true, block_num) + block_num_to_u64(block_number, latest_block) }; // if we changed "latest" to a number, update the params to match - if modified { - *x = serde_json::to_value(block_num)?; - } + *x = serde_json::to_value(block_num)?; Ok(block_num) } @@ -102,9 +94,13 @@ pub async fn block_needed( params: Option<&mut serde_json::Value>, head_block_num: U64, rpcs: &Web3Connections, -) -> Option { +) -> anyhow::Result> { // if no params, no block is needed - let params = params?; + let params = if let Some(params) = params { + params + } else { + return Ok(None); + }; // get the index for the BlockNumber or return None to say no block is needed. // The BlockNumber is usually the last element. @@ -115,95 +111,95 @@ pub async fn block_needed( "eth_getBalance" => 1, "eth_getBlockByHash" => { // TODO: double check that any node can serve this - return None; + return Ok(None); } "eth_getBlockByNumber" => { // TODO: double check that any node can serve this - return None; + return Ok(None); } "eth_getBlockTransactionCountByHash" => { // TODO: double check that any node can serve this - return None; + return Ok(None); } "eth_getBlockTransactionCountByNumber" => 0, "eth_getCode" => 1, "eth_getLogs" => { - let obj = params[0].as_object_mut().unwrap(); + // TODO: jsonrpc has a specific code for this + let obj = params[0] + .as_object_mut() + .ok_or_else(|| anyhow::anyhow!("invalid format"))?; if let Some(x) = obj.get_mut("fromBlock") { - let block_num: BlockNumber = serde_json::from_value(x.clone()).ok()?; + let block_num: BlockNumber = serde_json::from_value(x.take())?; - let (modified, block_num) = block_num_to_u64(block_num, head_block_num); + let block_num = block_num_to_u64(block_num, head_block_num); - if modified { - *x = serde_json::to_value(block_num).unwrap(); - } + *x = serde_json::to_value(block_num).expect("U64 can always be a serde_json::Value"); // TODO: maybe don't return. instead check toBlock too? // TODO: if there is a very wide fromBlock and toBlock, we need to check that our rpcs have both! - return Some(block_num); + return Ok(Some(block_num)); } if let Some(x) = obj.get_mut("toBlock") { - let block_num: BlockNumber = serde_json::from_value(x.clone()).ok()?; + let block_num: BlockNumber = serde_json::from_value(x.take())?; - let (modified, block_num) = block_num_to_u64(block_num, head_block_num); + let block_num = block_num_to_u64(block_num, head_block_num); - if modified { - *x = serde_json::to_value(block_num).unwrap(); - } + *x = serde_json::to_value(block_num) + .expect("block_num should always turn into a value"); - return Some(block_num); + return Ok(Some(block_num)); } if obj.contains_key("blockHash") { 1 } else { - return None; + return Ok(None); } } "eth_getStorageAt" => 2, "eth_getTransactionByHash" => { // TODO: not sure how best to look these up // try full nodes first. retry will use archive - return None; + return Ok(None); } "eth_getTransactionByBlockHashAndIndex" => { // TODO: check a Cache of recent hashes // try full nodes first. retry will use archive - return None; + return Ok(None); } "eth_getTransactionByBlockNumberAndIndex" => 0, "eth_getTransactionCount" => 1, "eth_getTransactionReceipt" => { // TODO: not sure how best to look these up // try full nodes first. retry will use archive - return None; + return Ok(None); } "eth_getUncleByBlockHashAndIndex" => { // TODO: check a Cache of recent hashes // try full nodes first. retry will use archive - return None; + return Ok(None); } "eth_getUncleByBlockNumberAndIndex" => 0, "eth_getUncleCountByBlockHash" => { // TODO: check a Cache of recent hashes // try full nodes first. retry will use archive - return None; + return Ok(None); } "eth_getUncleCountByBlockNumber" => 0, _ => { // some other command that doesn't take block numbers as an argument - return None; + return Ok(None); } }; match clean_block_number(params, block_param_id, head_block_num, rpcs).await { - Ok(block) => Some(block), + Ok(block) => Ok(Some(block)), Err(err) => { // TODO: seems unlikely that we will get here warn!(?err, "could not get block from params"); - None + Ok(None) } } } diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 1be2bc74..1e3665b4 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -326,12 +326,14 @@ impl Web3ProxyApp { if let Some(max_concurrent_requests) = user_data.max_concurrent_requests { let semaphore = self .user_key_semaphores - .get_with(user_data.user_key_id, async move { - let s = Semaphore::const_new(max_concurrent_requests.try_into().unwrap()); + .try_get_with(user_data.user_key_id, async move { + let s = Semaphore::const_new(max_concurrent_requests.try_into()?); trace!("new semaphore for user_key_id {}", user_data.user_key_id); - Arc::new(s) + Ok::<_, anyhow::Error>(Arc::new(s)) }) - .await; + .await + // TODO: is this the best way to handle an arc + .map_err(|err| anyhow::anyhow!(err))?; let semaphore_permit = semaphore.acquire_owned().await?; @@ -492,8 +494,8 @@ impl Web3ProxyApp { }) .await; - // TODO: i'm not actually sure about this expect - user_data.map_err(|err| Arc::try_unwrap(err).expect("this should be the only reference")) + // TODO: what's the best way to handle this arc? try_unwrap will not work + user_data.map_err(|err| anyhow::anyhow!(err)) } pub async fn rate_limit_by_key(&self, user_key: UserKey) -> anyhow::Result { diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index a8d84a66..25408bfa 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -46,7 +46,7 @@ pub async fn proxy_web3_rpc( .await }); - let response = f.await.unwrap()?; + let response = f.await.expect("joinhandle should always work")?; Ok(Json(&response).into_response()) } @@ -86,7 +86,7 @@ pub async fn proxy_web3_rpc_with_key( .await }); - let response = f.await.unwrap()?; + let response = f.await.expect("JoinHandle should always work")?; Ok(Json(&response).into_response()) } diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index db2fc343..3380dd16 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -117,7 +117,7 @@ pub async fn websocket_handler_with_key( &app.config.redirect_user_url, &json!({ "authorized_request": authorized_request }), ) - .unwrap(); + .expect("templating should always work"); // this is not a websocket. redirect to a page for this user Ok(Redirect::to(&user_url).into_response()) @@ -178,8 +178,15 @@ async fn handle_socket_payload( match response { Ok((handle, response)) => { // TODO: better key - subscriptions - .insert(response.result.as_ref().unwrap().to_string(), handle); + subscriptions.insert( + response + .result + .as_ref() + // TODO: what if there is an error? + .expect("response should always have a result, not an error") + .to_string(), + handle, + ); Ok(response.into()) } @@ -188,7 +195,7 @@ async fn handle_socket_payload( } "eth_unsubscribe" => { // TODO: how should handle rate limits and stats on this? - + // TODO: handle invalid params let subscription_id = payload.params.unwrap().to_string(); let partial_response = match subscriptions.remove(&subscription_id) { @@ -213,7 +220,7 @@ async fn handle_socket_payload( (id, response) } Err(err) => { - let id = RawValue::from_string("null".to_string()).unwrap(); + let id = RawValue::from_string("null".to_string()).expect("null can always be a value"); (id, Err(err.into())) } }; @@ -221,11 +228,12 @@ async fn handle_socket_payload( let response_str = match response { Ok(x) => serde_json::to_string(&x), Err(err) => { - // we have an anyhow error. turn it into + // we have an anyhow error. turn it into a response let response = JsonRpcForwardedResponse::from_anyhow_error(err, None, Some(id)); serde_json::to_string(&response) } } + // TODO: what error should this be? .unwrap(); Message::Text(response_str) diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 171bd2fe..ef6ed6ac 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -52,7 +52,7 @@ impl Web3Connections { let mut blockchain = self.blockchain_graphmap.write().await; - // TODO: think more about heaviest_chain + // TODO: think more about heaviest_chain. would be better to do the check inside this function if heaviest_chain { // this is the only place that writes to block_numbers // its inside a write lock on blockchain_graphmap, so i think there is no race @@ -61,16 +61,12 @@ impl Web3Connections { } if blockchain.contains_node(*block_hash) { - // this hash is already included - trace!(%block_hash, %block_num, "skipping saving existing block"); - // return now since this work was already done. + trace!(%block_hash, %block_num, "block already saved"); return Ok(()); } - // TODO: prettier log? or probably move the log somewhere else trace!(%block_hash, %block_num, "saving new block"); - // TODO: theres a small race between contains_key and insert self.block_hashes .insert(*block_hash, block.to_owned()) .await; @@ -78,11 +74,10 @@ impl Web3Connections { blockchain.add_node(*block_hash); // what should edge weight be? and should the nodes be the blocks instead? - // TODO: maybe the weight should be the block? // we store parent_hash -> hash because the block already stores the parent_hash blockchain.add_edge(block.parent_hash, *block_hash, 0); - // TODO: prune block_numbers and block_map to only keep a configurable (256 on ETH?) number of blocks? + // TODO: prune blockchain to only keep a configurable (256 on ETH?) number of blocks? Ok(()) }