unwrap less

This commit is contained in:
Bryan Stitt 2022-09-30 04:18:18 +00:00
parent 9cc34f7aa8
commit a7b5f25467
9 changed files with 83 additions and 74 deletions

@ -191,12 +191,16 @@ These are roughly in order of completition
- [ ] user create script should allow a description field
- [ ] user create script should allow multiple keys per user
- [ ] somehow the proxy thought latest was hours behind. need internal health check that forces reconnect if this happens
- [ ] display logged reverts on an endpoint that requires authentication
- [ ] failsafe. if no blocks or transactions in some time, warn and reset the connection
## V1
These are not yet ordered.
- [ ] implement filters and other unimplemented rpc methods
- multiple teams need log filters and subscriptions.
- would be nice if our subscriptions had better gaurentees than geth/erigon do, but maybe simpler to just setup a broadcast channel and proxy all the respones to a backend instead
- [ ] instead of Option<...> in our frontend function signatures, use result and then the try operator so that we get our errors wrapped in json
- [ ] revert logs should have a maximum age and a maximum count to keep the database from being huge
- [ ] user login should also return a jwt (jsonwebtoken rust crate should make it easy)
@ -220,7 +224,6 @@ These are not yet ordered.
- [ ] nice output when cargo doc is run
- [ ] cache more things locally or in redis
- [ ] stats when forks are resolved (and what chain they were on?)
- [ ] failsafe. if no blocks or transactions in some time, warn and reset the connection
- [ ] emit stats for user's successes, retries, failures, with the types of requests, chain, rpc
- [ ] cli for creating and editing api keys
- [ ] Only subscribe to transactions when someone is listening and if the server has opted in to it
@ -235,6 +238,8 @@ These are not yet ordered.
- if total difficulty is not on the block and we aren't on ETH, fetch the full block instead of just the header
- if total difficulty is set and non-zero, use it for consensus instead of just the number
- [ ] if we subscribe to a server that is syncing, it gives us null block_data_limit. when it catches up, we don't ever send queries to it. we need to recheck block_data_limit
- [ ] add a "failover" tier that is only used if balanced_rpcs has "no servers synced"
- use this tier (and private tier) to check timestamp on latest block. if we are behind that by more than a few seconds, something is wrong
new endpoints for users (not totally sure about the exact paths, but these features are all needed):
- [x] GET /u/:api_key

@ -13,9 +13,9 @@ async fn main() -> anyhow::Result<()> {
fdlimit::raise_fd_limit();
// erigon
// let url = "ws://10.11.12.16:8548";
let url = "ws://10.11.12.16:8548";
// geth
let url = "ws://10.11.12.16:8546";
// let url = "ws://10.11.12.16:8546";
println!("Subscribing to blocks from {}", url);

@ -615,7 +615,9 @@ impl Web3ProxyApp {
},
});
let msg = Message::Text(serde_json::to_string(&msg).unwrap());
let msg = Message::Text(
serde_json::to_string(&msg).expect("this message was just built"),
);
if response_sender.send_async(msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
@ -919,7 +921,7 @@ impl Web3ProxyApp {
head_block_id.num,
&self.balanced_rpcs,
)
.await
.await?
{
// TODO: maybe this should be on the app and not on balanced_rpcs
let request_block_hash =
@ -966,7 +968,8 @@ impl Web3ProxyApp {
Ok::<_, anyhow::Error>(response)
})
.await
.map_err(|err| Arc::try_unwrap(err).expect("this should be the only reference"))
// TODO: what is the best way to handle an Arc here?
.map_err(|err| anyhow::anyhow!(err))
.context("caching response")?;
// since this data came likely out of a cache, the id is not going to match

@ -51,7 +51,7 @@ impl CreateUserSubCommand {
let bytes = address.as_bytes();
// convert the slice to a Vec
bytes.try_into().unwrap()
bytes.try_into().expect("Bytes can always be a Vec<u8>")
};
// TODO: get existing or create a new one

@ -8,25 +8,25 @@ use tracing::warn;
use crate::rpcs::connections::Web3Connections;
pub fn block_num_to_u64(block_num: BlockNumber, latest_block: U64) -> (bool, U64) {
pub fn block_num_to_u64(block_num: BlockNumber, latest_block: U64) -> U64 {
match block_num {
BlockNumber::Earliest => {
// modified is false because we want the backend to see "pending"
(false, U64::zero())
U64::zero()
}
BlockNumber::Latest => {
// change "latest" to a number
// modified is true because we want the backend to see the height and not "latest"
(true, latest_block)
latest_block
}
BlockNumber::Number(x) => {
// we already have a number
(false, x)
x
}
BlockNumber::Pending => {
// TODO: think more about how to handle Pending
// modified is false because we want the backend to see "pending"
(false, latest_block)
latest_block
}
}
}
@ -57,7 +57,7 @@ pub async fn clean_block_number(
}
Some(x) => {
// convert the json value to a BlockNumber
let (modified, block_num) = if let Some(obj) = x.as_object_mut() {
let block_num = if let Some(obj) = x.as_object_mut() {
// it might be a Map like `{"blockHash": String("0xa5626dc20d3a0a209b1de85521717a3e859698de8ce98bca1b16822b7501f74b")}`
if let Some(block_hash) = obj.remove("blockHash") {
let block_hash: H256 =
@ -65,12 +65,9 @@ pub async fn clean_block_number(
let block = rpcs.block(None, &block_hash, None).await?;
let block_num = block
block
.number
.expect("blocks here should always have numbers");
// always set modfied to true because we used "take" above
(true, block_num)
.expect("blocks here should always have numbers")
} else {
return Err(anyhow::anyhow!("blockHash missing"));
}
@ -79,16 +76,11 @@ pub async fn clean_block_number(
// TODO: "BlockNumber" needs a better name
let block_number = serde_json::from_value::<BlockNumber>(x.take())?;
let (_, block_num) = block_num_to_u64(block_number, latest_block);
// always set modfied to true because we used "take" above
(true, block_num)
block_num_to_u64(block_number, latest_block)
};
// if we changed "latest" to a number, update the params to match
if modified {
*x = serde_json::to_value(block_num)?;
}
*x = serde_json::to_value(block_num)?;
Ok(block_num)
}
@ -102,9 +94,13 @@ pub async fn block_needed(
params: Option<&mut serde_json::Value>,
head_block_num: U64,
rpcs: &Web3Connections,
) -> Option<U64> {
) -> anyhow::Result<Option<U64>> {
// if no params, no block is needed
let params = params?;
let params = if let Some(params) = params {
params
} else {
return Ok(None);
};
// get the index for the BlockNumber or return None to say no block is needed.
// The BlockNumber is usually the last element.
@ -115,95 +111,95 @@ pub async fn block_needed(
"eth_getBalance" => 1,
"eth_getBlockByHash" => {
// TODO: double check that any node can serve this
return None;
return Ok(None);
}
"eth_getBlockByNumber" => {
// TODO: double check that any node can serve this
return None;
return Ok(None);
}
"eth_getBlockTransactionCountByHash" => {
// TODO: double check that any node can serve this
return None;
return Ok(None);
}
"eth_getBlockTransactionCountByNumber" => 0,
"eth_getCode" => 1,
"eth_getLogs" => {
let obj = params[0].as_object_mut().unwrap();
// TODO: jsonrpc has a specific code for this
let obj = params[0]
.as_object_mut()
.ok_or_else(|| anyhow::anyhow!("invalid format"))?;
if let Some(x) = obj.get_mut("fromBlock") {
let block_num: BlockNumber = serde_json::from_value(x.clone()).ok()?;
let block_num: BlockNumber = serde_json::from_value(x.take())?;
let (modified, block_num) = block_num_to_u64(block_num, head_block_num);
let block_num = block_num_to_u64(block_num, head_block_num);
if modified {
*x = serde_json::to_value(block_num).unwrap();
}
*x = serde_json::to_value(block_num).expect("U64 can always be a serde_json::Value");
// TODO: maybe don't return. instead check toBlock too?
// TODO: if there is a very wide fromBlock and toBlock, we need to check that our rpcs have both!
return Some(block_num);
return Ok(Some(block_num));
}
if let Some(x) = obj.get_mut("toBlock") {
let block_num: BlockNumber = serde_json::from_value(x.clone()).ok()?;
let block_num: BlockNumber = serde_json::from_value(x.take())?;
let (modified, block_num) = block_num_to_u64(block_num, head_block_num);
let block_num = block_num_to_u64(block_num, head_block_num);
if modified {
*x = serde_json::to_value(block_num).unwrap();
}
*x = serde_json::to_value(block_num)
.expect("block_num should always turn into a value");
return Some(block_num);
return Ok(Some(block_num));
}
if obj.contains_key("blockHash") {
1
} else {
return None;
return Ok(None);
}
}
"eth_getStorageAt" => 2,
"eth_getTransactionByHash" => {
// TODO: not sure how best to look these up
// try full nodes first. retry will use archive
return None;
return Ok(None);
}
"eth_getTransactionByBlockHashAndIndex" => {
// TODO: check a Cache of recent hashes
// try full nodes first. retry will use archive
return None;
return Ok(None);
}
"eth_getTransactionByBlockNumberAndIndex" => 0,
"eth_getTransactionCount" => 1,
"eth_getTransactionReceipt" => {
// TODO: not sure how best to look these up
// try full nodes first. retry will use archive
return None;
return Ok(None);
}
"eth_getUncleByBlockHashAndIndex" => {
// TODO: check a Cache of recent hashes
// try full nodes first. retry will use archive
return None;
return Ok(None);
}
"eth_getUncleByBlockNumberAndIndex" => 0,
"eth_getUncleCountByBlockHash" => {
// TODO: check a Cache of recent hashes
// try full nodes first. retry will use archive
return None;
return Ok(None);
}
"eth_getUncleCountByBlockNumber" => 0,
_ => {
// some other command that doesn't take block numbers as an argument
return None;
return Ok(None);
}
};
match clean_block_number(params, block_param_id, head_block_num, rpcs).await {
Ok(block) => Some(block),
Ok(block) => Ok(Some(block)),
Err(err) => {
// TODO: seems unlikely that we will get here
warn!(?err, "could not get block from params");
None
Ok(None)
}
}
}

@ -326,12 +326,14 @@ impl Web3ProxyApp {
if let Some(max_concurrent_requests) = user_data.max_concurrent_requests {
let semaphore = self
.user_key_semaphores
.get_with(user_data.user_key_id, async move {
let s = Semaphore::const_new(max_concurrent_requests.try_into().unwrap());
.try_get_with(user_data.user_key_id, async move {
let s = Semaphore::const_new(max_concurrent_requests.try_into()?);
trace!("new semaphore for user_key_id {}", user_data.user_key_id);
Arc::new(s)
Ok::<_, anyhow::Error>(Arc::new(s))
})
.await;
.await
// TODO: is this the best way to handle an arc
.map_err(|err| anyhow::anyhow!(err))?;
let semaphore_permit = semaphore.acquire_owned().await?;
@ -492,8 +494,8 @@ impl Web3ProxyApp {
})
.await;
// TODO: i'm not actually sure about this expect
user_data.map_err(|err| Arc::try_unwrap(err).expect("this should be the only reference"))
// TODO: what's the best way to handle this arc? try_unwrap will not work
user_data.map_err(|err| anyhow::anyhow!(err))
}
pub async fn rate_limit_by_key(&self, user_key: UserKey) -> anyhow::Result<RateLimitResult> {

@ -46,7 +46,7 @@ pub async fn proxy_web3_rpc(
.await
});
let response = f.await.unwrap()?;
let response = f.await.expect("joinhandle should always work")?;
Ok(Json(&response).into_response())
}
@ -86,7 +86,7 @@ pub async fn proxy_web3_rpc_with_key(
.await
});
let response = f.await.unwrap()?;
let response = f.await.expect("JoinHandle should always work")?;
Ok(Json(&response).into_response())
}

@ -117,7 +117,7 @@ pub async fn websocket_handler_with_key(
&app.config.redirect_user_url,
&json!({ "authorized_request": authorized_request }),
)
.unwrap();
.expect("templating should always work");
// this is not a websocket. redirect to a page for this user
Ok(Redirect::to(&user_url).into_response())
@ -178,8 +178,15 @@ async fn handle_socket_payload(
match response {
Ok((handle, response)) => {
// TODO: better key
subscriptions
.insert(response.result.as_ref().unwrap().to_string(), handle);
subscriptions.insert(
response
.result
.as_ref()
// TODO: what if there is an error?
.expect("response should always have a result, not an error")
.to_string(),
handle,
);
Ok(response.into())
}
@ -188,7 +195,7 @@ async fn handle_socket_payload(
}
"eth_unsubscribe" => {
// TODO: how should handle rate limits and stats on this?
// TODO: handle invalid params
let subscription_id = payload.params.unwrap().to_string();
let partial_response = match subscriptions.remove(&subscription_id) {
@ -213,7 +220,7 @@ async fn handle_socket_payload(
(id, response)
}
Err(err) => {
let id = RawValue::from_string("null".to_string()).unwrap();
let id = RawValue::from_string("null".to_string()).expect("null can always be a value");
(id, Err(err.into()))
}
};
@ -221,11 +228,12 @@ async fn handle_socket_payload(
let response_str = match response {
Ok(x) => serde_json::to_string(&x),
Err(err) => {
// we have an anyhow error. turn it into
// we have an anyhow error. turn it into a response
let response = JsonRpcForwardedResponse::from_anyhow_error(err, None, Some(id));
serde_json::to_string(&response)
}
}
// TODO: what error should this be?
.unwrap();
Message::Text(response_str)

@ -52,7 +52,7 @@ impl Web3Connections {
let mut blockchain = self.blockchain_graphmap.write().await;
// TODO: think more about heaviest_chain
// TODO: think more about heaviest_chain. would be better to do the check inside this function
if heaviest_chain {
// this is the only place that writes to block_numbers
// its inside a write lock on blockchain_graphmap, so i think there is no race
@ -61,16 +61,12 @@ impl Web3Connections {
}
if blockchain.contains_node(*block_hash) {
// this hash is already included
trace!(%block_hash, %block_num, "skipping saving existing block");
// return now since this work was already done.
trace!(%block_hash, %block_num, "block already saved");
return Ok(());
}
// TODO: prettier log? or probably move the log somewhere else
trace!(%block_hash, %block_num, "saving new block");
// TODO: theres a small race between contains_key and insert
self.block_hashes
.insert(*block_hash, block.to_owned())
.await;
@ -78,11 +74,10 @@ impl Web3Connections {
blockchain.add_node(*block_hash);
// what should edge weight be? and should the nodes be the blocks instead?
// TODO: maybe the weight should be the block?
// we store parent_hash -> hash because the block already stores the parent_hash
blockchain.add_edge(block.parent_hash, *block_hash, 0);
// TODO: prune block_numbers and block_map to only keep a configurable (256 on ETH?) number of blocks?
// TODO: prune blockchain to only keep a configurable (256 on ETH?) number of blocks?
Ok(())
}