maybe fix next_upstream_server
This commit is contained in:
parent
058dfa6d8e
commit
fc8beb52b0
@ -6,7 +6,7 @@ use crate::app::{flatten_handle, AnyhowJoinHandle};
|
||||
use crate::config::BlockAndRpc;
|
||||
use crate::frontend::authorization::AuthorizedRequest;
|
||||
use anyhow::Context;
|
||||
use ethers::prelude::{Block, Bytes, Middleware, ProviderError, TxHash, H256, U64};
|
||||
use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64};
|
||||
use futures::future::try_join_all;
|
||||
use futures::StreamExt;
|
||||
use parking_lot::RwLock;
|
||||
|
@ -10,6 +10,7 @@ use crate::config::{BlockAndRpc, TxHashAndRpc, Web3ConnectionConfig};
|
||||
use crate::frontend::authorization::{AuthorizedRequest, RequestMetadata};
|
||||
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
|
||||
use crate::rpcs::transactions::TxStatus;
|
||||
use anyhow::Context;
|
||||
use arc_swap::ArcSwap;
|
||||
use counter::Counter;
|
||||
use derive_more::From;
|
||||
@ -379,30 +380,26 @@ impl Web3Connections {
|
||||
) -> anyhow::Result<OpenRequestResult> {
|
||||
let mut earliest_retry_at = None;
|
||||
|
||||
let min_block_needed = if let Some(min_block_needed) = min_block_needed {
|
||||
*min_block_needed
|
||||
} else {
|
||||
self.head_block_num().context("not servers are synced")?
|
||||
};
|
||||
|
||||
// filter the synced rpcs
|
||||
// TODO: we are going to be checking "has_block_data" a lot now
|
||||
let mut synced_rpcs: Vec<Arc<Web3Connection>> =
|
||||
if let Some(min_block_needed) = min_block_needed {
|
||||
self.conns
|
||||
.values()
|
||||
.filter(|x| !skip.contains(x))
|
||||
.filter(|x| x.has_block_data(min_block_needed))
|
||||
.cloned()
|
||||
.collect()
|
||||
} else {
|
||||
self.synced_connections
|
||||
.load()
|
||||
.conns
|
||||
.iter()
|
||||
.filter(|x| !skip.contains(x))
|
||||
.cloned()
|
||||
.collect()
|
||||
};
|
||||
let mut synced_rpcs: Vec<Arc<Web3Connection>> = self
|
||||
.conns
|
||||
.values()
|
||||
.filter(|x| !skip.contains(x))
|
||||
.filter(|x| x.has_block_data(&min_block_needed))
|
||||
.cloned()
|
||||
.collect();
|
||||
|
||||
if synced_rpcs.is_empty() {
|
||||
// TODO: what should happen here? automatic retry?
|
||||
// TODO: more detailed error
|
||||
return Err(anyhow::anyhow!("not synced"));
|
||||
return Err(anyhow::anyhow!("no servers are synced"));
|
||||
}
|
||||
|
||||
// we sort on a bunch of values. cache them here so that we don't do this math multiple times.
|
||||
|
@ -171,7 +171,7 @@ pub fn filter_query_window_seconds(
|
||||
response: &mut HashMap<&str, serde_json::Value>,
|
||||
q: Select<rpc_accounting::Entity>,
|
||||
) -> Result<Select<rpc_accounting::Entity>, FrontendErrorResponse> {
|
||||
let query_window_seconds = get_query_window_seconds_from_params(¶ms)?;
|
||||
let query_window_seconds = get_query_window_seconds_from_params(params)?;
|
||||
|
||||
if query_window_seconds == 0 {
|
||||
// TODO: order by more than this?
|
||||
@ -343,7 +343,7 @@ pub async fn query_user_stats<'a>(
|
||||
// TODO: trace log query here? i think sea orm has a useful log level for this
|
||||
|
||||
// set up pagination
|
||||
let page = get_page_from_params(¶ms)?;
|
||||
let page = get_page_from_params(params)?;
|
||||
response.insert("page", serde_json::to_value(page).expect("can't fail"));
|
||||
|
||||
// TODO: page size from param with a max from the config
|
||||
|
Loading…
Reference in New Issue
Block a user