shorter function names

This commit is contained in:
Bryan Stitt 2022-08-03 22:23:10 +00:00
parent 3f063efce8
commit 63ae98e9a4
5 changed files with 54 additions and 52 deletions

View File

@ -198,3 +198,5 @@ in another repo: event subscriber
2022-07-22T23:52:19.350720Z WARN block_receiver: web3_proxy::connections: chain is forked! 2 possible heads. 1/2/4 rpcs have 0x70e8…48e0 rpc=Web3Connection { url: "ws://127.0.0.1:8549", data: "archive", .. } new_block_num=15195517 2022-07-22T23:52:19.350720Z WARN block_receiver: web3_proxy::connections: chain is forked! 2 possible heads. 1/2/4 rpcs have 0x70e8…48e0 rpc=Web3Connection { url: "ws://127.0.0.1:8549", data: "archive", .. } new_block_num=15195517
2022-07-22T23:52:26.041140Z WARN block_receiver: web3_proxy::connections: chain is forked! 2 possible heads. 2/4/4 rpcs have 0x70e8…48e0 rpc=Web3Connection { url: "http://127.0.0.1:8549", data: "archive", .. } new_block_num=15195517 2022-07-22T23:52:26.041140Z WARN block_receiver: web3_proxy::connections: chain is forked! 2 possible heads. 2/4/4 rpcs have 0x70e8…48e0 rpc=Web3Connection { url: "http://127.0.0.1:8549", data: "archive", .. } new_block_num=15195517
- [ ] threshold should check actual available request limits (if any) instead of just the soft limit - [ ] threshold should check actual available request limits (if any) instead of just the soft limit
- [ ] foreign key on_update and on_delete
- [ ] database creation timestamps

View File

@ -92,7 +92,7 @@ fn block_num_to_u64(block_num: BlockNumber, latest_block: U64) -> (bool, U64) {
} }
} }
fn get_or_set_block_number( fn clean_block_number(
params: &mut serde_json::Value, params: &mut serde_json::Value,
block_param_id: usize, block_param_id: usize,
latest_block: U64, latest_block: U64,
@ -128,7 +128,7 @@ fn get_or_set_block_number(
} }
// TODO: change this to return also return the hash needed // TODO: change this to return also return the hash needed
fn get_min_block_needed( fn block_needed(
method: &str, method: &str,
params: Option<&mut serde_json::Value>, params: Option<&mut serde_json::Value>,
head_block: U64, head_block: U64,
@ -225,7 +225,7 @@ fn get_min_block_needed(
} }
}; };
match get_or_set_block_number(params, block_param_id, head_block) { match clean_block_number(params, block_param_id, head_block) {
Ok(block) => Some(block), Ok(block) => Some(block),
Err(err) => { Err(err) => {
// TODO: seems unlikely that we will get here // TODO: seems unlikely that we will get here
@ -253,7 +253,7 @@ pub struct Web3ProxyApp {
balanced_rpcs: Arc<Web3Connections>, balanced_rpcs: Arc<Web3Connections>,
/// Send private requests (like eth_sendRawTransaction) to all these servers /// Send private requests (like eth_sendRawTransaction) to all these servers
private_rpcs: Arc<Web3Connections>, private_rpcs: Arc<Web3Connections>,
incoming_requests: ActiveRequestsMap, active_requests: ActiveRequestsMap,
/// bytes available to response_cache (it will be slightly larger than this) /// bytes available to response_cache (it will be slightly larger than this)
response_cache_max_bytes: AtomicUsize, response_cache_max_bytes: AtomicUsize,
response_cache: ResponseLrcCache, response_cache: ResponseLrcCache,
@ -274,11 +274,11 @@ impl fmt::Debug for Web3ProxyApp {
} }
impl Web3ProxyApp { impl Web3ProxyApp {
pub fn get_pending_transactions(&self) -> &DashMap<TxHash, TxState> { pub fn pending_transactions(&self) -> &DashMap<TxHash, TxState> {
&self.pending_transactions &self.pending_transactions
} }
pub fn get_public_rate_limiter(&self) -> Option<&RedisCellClient> { pub fn public_rate_limiter(&self) -> Option<&RedisCellClient> {
self.public_rate_limiter.as_ref() self.public_rate_limiter.as_ref()
} }
@ -435,7 +435,7 @@ impl Web3ProxyApp {
let app = Self { let app = Self {
balanced_rpcs, balanced_rpcs,
private_rpcs, private_rpcs,
incoming_requests: Default::default(), active_requests: Default::default(),
response_cache_max_bytes: AtomicUsize::new(app_config.shared.response_cache_max_bytes), response_cache_max_bytes: AtomicUsize::new(app_config.shared.response_cache_max_bytes),
response_cache: Default::default(), response_cache: Default::default(),
head_block_receiver, head_block_receiver,
@ -647,16 +647,16 @@ impl Web3ProxyApp {
Ok((subscription_abort_handle, response)) Ok((subscription_abort_handle, response))
} }
pub fn get_balanced_rpcs(&self) -> &Web3Connections { pub fn balanced_rpcs(&self) -> &Web3Connections {
&self.balanced_rpcs &self.balanced_rpcs
} }
pub fn get_private_rpcs(&self) -> &Web3Connections { pub fn private_rpcs(&self) -> &Web3Connections {
&self.private_rpcs &self.private_rpcs
} }
pub fn get_active_requests(&self) -> &ActiveRequestsMap { pub fn active_requests(&self) -> &ActiveRequestsMap {
&self.incoming_requests &self.active_requests
} }
/// send the request or batch of requests to the approriate RPCs /// send the request or batch of requests to the approriate RPCs
@ -715,7 +715,7 @@ impl Web3ProxyApp {
Ok(collected) Ok(collected)
} }
async fn get_cached_response( async fn cached_response(
&self, &self,
// TODO: accept a block hash here also? // TODO: accept a block hash here also?
min_block_needed: Option<&U64>, min_block_needed: Option<&U64>,
@ -729,10 +729,10 @@ impl Web3ProxyApp {
let request_block_hash = if let Some(min_block_needed) = min_block_needed { let request_block_hash = if let Some(min_block_needed) = min_block_needed {
// TODO: maybe this should be on the app and not on balanced_rpcs // TODO: maybe this should be on the app and not on balanced_rpcs
self.balanced_rpcs.get_block_hash(min_block_needed).await? self.balanced_rpcs.block_hash(min_block_needed).await?
} else { } else {
// TODO: maybe this should be on the app and not on balanced_rpcs // TODO: maybe this should be on the app and not on balanced_rpcs
self.balanced_rpcs.get_head_block_hash() self.balanced_rpcs.head_block_hash()
}; };
// TODO: better key? benchmark this // TODO: better key? benchmark this
@ -852,7 +852,7 @@ impl Web3ProxyApp {
// some commands can use local data or caches // some commands can use local data or caches
"eth_accounts" => serde_json::Value::Array(vec![]), "eth_accounts" => serde_json::Value::Array(vec![]),
"eth_blockNumber" => { "eth_blockNumber" => {
let head_block_number = self.balanced_rpcs.get_head_block_num(); let head_block_number = self.balanced_rpcs.head_block_num();
// TODO: technically, block 0 is okay. i guess we should be using an option // TODO: technically, block 0 is okay. i guess we should be using an option
if head_block_number.as_u64() == 0 { if head_block_number.as_u64() == 0 {
@ -931,23 +931,23 @@ impl Web3ProxyApp {
// TODO: web3_sha3? // TODO: web3_sha3?
// anything else gets sent to backend rpcs and cached // anything else gets sent to backend rpcs and cached
method => { method => {
let head_block_number = self.balanced_rpcs.get_head_block_num(); let head_block_number = self.balanced_rpcs.head_block_num();
// we do this check before checking caches because it might modify the request params // we do this check before checking caches because it might modify the request params
// TODO: add a stat for archive vs full since they should probably cost different // TODO: add a stat for archive vs full since they should probably cost different
let min_block_needed = let min_block_needed =
get_min_block_needed(method, request.params.as_mut(), head_block_number); block_needed(method, request.params.as_mut(), head_block_number);
let min_block_needed = min_block_needed.as_ref(); let min_block_needed = min_block_needed.as_ref();
trace!(?min_block_needed, ?method); trace!(?min_block_needed, ?method);
let (cache_key, cache_result) = let (cache_key, cache_result) =
self.get_cached_response(min_block_needed, &request).await?; self.cached_response(min_block_needed, &request).await?;
let response_cache = match cache_result { let response_cache = match cache_result {
Ok(response) => { Ok(response) => {
let _ = self.incoming_requests.remove(&cache_key); let _ = self.active_requests.remove(&cache_key);
// TODO: if the response is cached, should it count less against the account's costs? // TODO: if the response is cached, should it count less against the account's costs?
@ -960,7 +960,7 @@ impl Web3ProxyApp {
// TODO: move this logic into an IncomingRequestHandler (ActiveRequestHandler has an rpc, but this won't) // TODO: move this logic into an IncomingRequestHandler (ActiveRequestHandler has an rpc, but this won't)
let (incoming_tx, incoming_rx) = watch::channel(true); let (incoming_tx, incoming_rx) = watch::channel(true);
let mut other_incoming_rx = None; let mut other_incoming_rx = None;
match self.incoming_requests.entry(cache_key.clone()) { match self.active_requests.entry(cache_key.clone()) {
DashMapEntry::Occupied(entry) => { DashMapEntry::Occupied(entry) => {
other_incoming_rx = Some(entry.get().clone()); other_incoming_rx = Some(entry.get().clone());
} }
@ -977,7 +977,7 @@ impl Web3ProxyApp {
// now that we've waited, lets check the cache again // now that we've waited, lets check the cache again
if let Some(cached) = response_cache.read().get(&cache_key) { if let Some(cached) = response_cache.read().get(&cache_key) {
let _ = self.incoming_requests.remove(&cache_key); let _ = self.active_requests.remove(&cache_key);
let _ = incoming_tx.send(false); let _ = incoming_tx.send(false);
// TODO: emit a stat // TODO: emit a stat
@ -1039,7 +1039,7 @@ impl Web3ProxyApp {
} }
} }
let _ = self.incoming_requests.remove(&cache_key); let _ = self.active_requests.remove(&cache_key);
let _ = incoming_tx.send(false); let _ = incoming_tx.send(false);
return Ok(response); return Ok(response);

View File

@ -54,11 +54,11 @@ impl fmt::Debug for SyncedConnections {
} }
impl SyncedConnections { impl SyncedConnections {
pub fn get_head_block_hash(&self) -> &H256 { pub fn head_block_hash(&self) -> &H256 {
&self.head_block_hash &self.head_block_hash
} }
pub fn get_head_block_num(&self) -> U64 { pub fn head_block_num(&self) -> U64 {
self.head_block_num.into() self.head_block_num.into()
} }
} }
@ -96,11 +96,11 @@ impl BlockChain {
self.block_map.entry(hash).or_insert(block); self.block_map.entry(hash).or_insert(block);
} }
pub fn get_cannonical_block(&self, num: &U64) -> Option<Arc<Block<TxHash>>> { pub fn cannonical_block(&self, num: &U64) -> Option<Arc<Block<TxHash>>> {
self.chain_map.get(num).map(|x| x.clone()) self.chain_map.get(num).map(|x| x.clone())
} }
pub fn get_block(&self, hash: &H256) -> Option<Arc<Block<TxHash>>> { pub fn block(&self, hash: &H256) -> Option<Arc<Block<TxHash>>> {
self.block_map.get(hash).map(|x| x.clone()) self.block_map.get(hash).map(|x| x.clone())
} }
} }
@ -423,9 +423,9 @@ impl Web3Connections {
Ok(()) Ok(())
} }
pub async fn get_block(&self, hash: &H256) -> anyhow::Result<Arc<Block<TxHash>>> { pub async fn block(&self, hash: &H256) -> anyhow::Result<Arc<Block<TxHash>>> {
// first, try to get the hash from our cache // first, try to get the hash from our cache
if let Some(block) = self.chain.get_block(hash) { if let Some(block) = self.chain.block(hash) {
return Ok(block); return Ok(block);
} }
@ -451,15 +451,15 @@ impl Web3Connections {
} }
/// Get the heaviest chain's block from cache or backend rpc /// Get the heaviest chain's block from cache or backend rpc
pub async fn get_cannonical_block(&self, num: &U64) -> anyhow::Result<Arc<Block<TxHash>>> { pub async fn cannonical_block(&self, num: &U64) -> anyhow::Result<Arc<Block<TxHash>>> {
// first, try to get the hash from our cache // first, try to get the hash from our cache
if let Some(block) = self.chain.get_cannonical_block(num) { if let Some(block) = self.chain.cannonical_block(num) {
return Ok(block); return Ok(block);
} }
// block not in cache. we need to ask an rpc for it // block not in cache. we need to ask an rpc for it
// but before we do any queries, be sure the requested block num exists // but before we do any queries, be sure the requested block num exists
let head_block_num = self.get_head_block_num(); let head_block_num = self.head_block_num();
if num > &head_block_num { if num > &head_block_num {
return Err(anyhow::anyhow!( return Err(anyhow::anyhow!(
"Head block is #{}, but #{} was requested", "Head block is #{}, but #{} was requested",
@ -491,38 +491,38 @@ impl Web3Connections {
} }
/// Convenience method to get the cannonical block at a given block height. /// Convenience method to get the cannonical block at a given block height.
pub async fn get_block_hash(&self, num: &U64) -> anyhow::Result<H256> { pub async fn block_hash(&self, num: &U64) -> anyhow::Result<H256> {
let block = self.get_cannonical_block(num).await?; let block = self.cannonical_block(num).await?;
let hash = block.hash.unwrap(); let hash = block.hash.unwrap();
Ok(hash) Ok(hash)
} }
pub fn get_head_block(&self) -> (U64, H256) { pub fn head_block(&self) -> (U64, H256) {
let synced_connections = self.synced_connections.load(); let synced_connections = self.synced_connections.load();
let num = synced_connections.get_head_block_num(); let num = synced_connections.head_block_num();
let hash = *synced_connections.get_head_block_hash(); let hash = *synced_connections.head_block_hash();
(num, hash) (num, hash)
} }
pub fn get_head_block_hash(&self) -> H256 { pub fn head_block_hash(&self) -> H256 {
*self.synced_connections.load().get_head_block_hash() *self.synced_connections.load().head_block_hash()
} }
pub fn get_head_block_num(&self) -> U64 { pub fn head_block_num(&self) -> U64 {
self.synced_connections.load().get_head_block_num() self.synced_connections.load().head_block_num()
} }
pub fn has_synced_rpcs(&self) -> bool { pub fn synced(&self) -> bool {
// TODO: require a minimum number of synced rpcs // TODO: require a minimum number of synced rpcs
// TODO: move this whole function to SyncedConnections // TODO: move this whole function to SyncedConnections
if self.synced_connections.load().conns.is_empty() { if self.synced_connections.load().conns.is_empty() {
return false; return false;
} }
self.get_head_block_num() > U64::zero() self.head_block_num() > U64::zero()
} }
pub fn num_synced_rpcs(&self) -> usize { pub fn num_synced_rpcs(&self) -> usize {
@ -697,7 +697,7 @@ impl Web3Connections {
rpc_urls_by_num.push(rpc_url); rpc_urls_by_num.push(rpc_url);
if let Some(parent) = self.chain.get_block(&block.parent_hash) { if let Some(parent) = self.chain.block(&block.parent_hash) {
// save the parent block // save the parent block
blocks_by_hash.insert(block.parent_hash, parent.clone()); blocks_by_hash.insert(block.parent_hash, parent.clone());
@ -812,7 +812,7 @@ impl Web3Connections {
conns, conns,
}; };
let current_head_block = self.get_head_block_hash(); let current_head_block = self.head_block_hash();
let new_head_block = let new_head_block =
pending_synced_connections.head_block_hash != current_head_block; pending_synced_connections.head_block_hash != current_head_block;
@ -971,7 +971,7 @@ impl Web3Connections {
/// get all rpc servers that are not rate limited /// get all rpc servers that are not rate limited
/// returns servers even if they aren't in sync. This is useful for broadcasting signed transactions /// returns servers even if they aren't in sync. This is useful for broadcasting signed transactions
pub async fn get_upstream_servers( pub async fn upstream_servers(
&self, &self,
min_block_needed: Option<&U64>, min_block_needed: Option<&U64>,
) -> Result<Vec<ActiveRequestHandle>, Option<Duration>> { ) -> Result<Vec<ActiveRequestHandle>, Option<Duration>> {
@ -1107,7 +1107,7 @@ impl Web3Connections {
min_block_needed: Option<&U64>, min_block_needed: Option<&U64>,
) -> anyhow::Result<JsonRpcForwardedResponse> { ) -> anyhow::Result<JsonRpcForwardedResponse> {
loop { loop {
match self.get_upstream_servers(min_block_needed).await { match self.upstream_servers(min_block_needed).await {
Ok(active_request_handles) => { Ok(active_request_handles) => {
// TODO: benchmark this compared to waiting on unbounded futures // TODO: benchmark this compared to waiting on unbounded futures
// TODO: do something with this handle? // TODO: do something with this handle?

View File

@ -6,7 +6,7 @@ use crate::app::Web3ProxyApp;
/// Health check page for load balancers to use /// Health check page for load balancers to use
pub async fn health(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse { pub async fn health(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
if app.get_balanced_rpcs().has_synced_rpcs() { if app.balanced_rpcs().synced() {
(StatusCode::OK, "OK") (StatusCode::OK, "OK")
} else { } else {
(StatusCode::SERVICE_UNAVAILABLE, ":(") (StatusCode::SERVICE_UNAVAILABLE, ":(")
@ -16,10 +16,10 @@ pub async fn health(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoRe
/// Very basic status page /// Very basic status page
pub async fn status(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse { pub async fn status(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
// TODO: what else should we include? uptime? prometheus? // TODO: what else should we include? uptime? prometheus?
let balanced_rpcs = app.get_balanced_rpcs(); let balanced_rpcs = app.balanced_rpcs();
let private_rpcs = app.get_private_rpcs(); let private_rpcs = app.private_rpcs();
let num_active_requests = app.get_active_requests().len(); let num_active_requests = app.active_requests().len();
let num_pending_transactions = app.get_pending_transactions().len(); let num_pending_transactions = app.pending_transactions().len();
let body = json!({ let body = json!({
"balanced_rpcs": balanced_rpcs, "balanced_rpcs": balanced_rpcs,

View File

@ -10,7 +10,7 @@ pub async fn proxy_web3_rpc(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp, ClientIp(ip): ClientIp,
) -> impl IntoResponse { ) -> impl IntoResponse {
if let Some(rate_limiter) = app.get_public_rate_limiter() { if let Some(rate_limiter) = app.public_rate_limiter() {
let rate_limiter_key = format!("{}", ip); let rate_limiter_key = format!("{}", ip);
if rate_limiter.throttle_key(&rate_limiter_key).await.is_err() { if rate_limiter.throttle_key(&rate_limiter_key).await.is_err() {