This commit is contained in:
Bryan Stitt 2022-07-16 00:08:22 +00:00
parent 8b5578b261
commit 7e4675eefb
2 changed files with 5 additions and 1 deletions

@ -48,12 +48,14 @@ const RESPONSE_CACHE_CAP: usize = 1024;
// block hash, method, params // block hash, method, params
type CacheKey = (H256, String, Option<String>); type CacheKey = (H256, String, Option<String>);
// TODO: make something more advanced that keeps track of cache size in bytes
type ResponseLrcCache = RwLock<LinkedHashMap<CacheKey, JsonRpcForwardedResponse>>; type ResponseLrcCache = RwLock<LinkedHashMap<CacheKey, JsonRpcForwardedResponse>>;
type ActiveRequestsMap = DashMap<CacheKey, watch::Receiver<bool>>; type ActiveRequestsMap = DashMap<CacheKey, watch::Receiver<bool>>;
pub type AnyhowJoinHandle<T> = JoinHandle<anyhow::Result<T>>; pub type AnyhowJoinHandle<T> = JoinHandle<anyhow::Result<T>>;
/// flatten a JoinError into an anyhow error
pub async fn flatten_handle<T>(handle: AnyhowJoinHandle<T>) -> anyhow::Result<T> { pub async fn flatten_handle<T>(handle: AnyhowJoinHandle<T>) -> anyhow::Result<T> {
match handle.await { match handle.await {
Ok(Ok(result)) => Ok(result), Ok(Ok(result)) => Ok(result),
@ -62,6 +64,7 @@ pub async fn flatten_handle<T>(handle: AnyhowJoinHandle<T>) -> anyhow::Result<T>
} }
} }
/// return the first error or okay if everything worked
pub async fn flatten_handles<T>( pub async fn flatten_handles<T>(
mut handles: FuturesUnordered<AnyhowJoinHandle<T>>, mut handles: FuturesUnordered<AnyhowJoinHandle<T>>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
@ -69,7 +72,7 @@ pub async fn flatten_handles<T>(
match x { match x {
Err(e) => return Err(e.into()), Err(e) => return Err(e.into()),
Ok(Err(e)) => return Err(e), Ok(Err(e)) => return Err(e),
Ok(Ok(_)) => {} Ok(Ok(_)) => continue,
} }
} }

@ -840,6 +840,7 @@ impl Web3Connections {
// TODO: return a 502? if it does? // TODO: return a 502? if it does?
// return Err(anyhow::anyhow!("no available rpcs!")); // return Err(anyhow::anyhow!("no available rpcs!"));
// TODO: sleep how long? // TODO: sleep how long?
// TODO: subscribe to something in SyncedConnections instead
sleep(Duration::from_millis(200)).await; sleep(Duration::from_millis(200)).await;
continue; continue;