borrow_and_update instead of borrow

i think this won't matter and the update is probably a waste, but it feels wrong to leave it in a changed state
This commit is contained in:
Bryan Stitt 2023-09-18 23:27:44 -07:00
parent 94cbe6153a
commit fc287d24ea
3 changed files with 8 additions and 14 deletions

@ -10,12 +10,11 @@ pub mod rpc_proxy_ws;
pub mod status; pub mod status;
pub mod users; pub mod users;
use crate::app::Web3ProxyApp;
use crate::errors::Web3ProxyResult; use crate::errors::Web3ProxyResult;
use crate::{app::Web3ProxyApp, errors::Web3ProxyError};
use axum::{ use axum::{
error_handling::HandleErrorLayer,
routing::{get, post}, routing::{get, post},
BoxError, Extension, Router, Extension, Router,
}; };
use http::{header::AUTHORIZATION, Request, StatusCode}; use http::{header::AUTHORIZATION, Request, StatusCode};
use hyper::Body; use hyper::Body;
@ -26,8 +25,6 @@ use std::{iter::once, time::Duration};
use std::{net::SocketAddr, sync::atomic::Ordering}; use std::{net::SocketAddr, sync::atomic::Ordering};
use strum::{EnumCount, EnumIter}; use strum::{EnumCount, EnumIter};
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tower::timeout::TimeoutLayer;
use tower::ServiceBuilder;
use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer; use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer;
use tower_http::{cors::CorsLayer, normalize_path::NormalizePathLayer, trace::TraceLayer}; use tower_http::{cors::CorsLayer, normalize_path::NormalizePathLayer, trace::TraceLayer};
use tracing::{error_span, info}; use tracing::{error_span, info};

@ -993,7 +993,6 @@ impl Web3Rpcs {
} }
} }
yield_now().await;
} }
OpenRequestResult::NotReady => { OpenRequestResult::NotReady => {
if let Some(request_metadata) = request_metadata { if let Some(request_metadata) = request_metadata {
@ -1032,7 +1031,7 @@ impl Web3Rpcs {
let needed = min_block_needed.max(max_block_needed); let needed = min_block_needed.max(max_block_needed);
let head_block_num = watch_consensus_rpcs let head_block_num = watch_consensus_rpcs
.borrow() .borrow_and_update()
.as_ref() .as_ref()
.map(|x| *x.head_block.number()); .map(|x| *x.head_block.number());

@ -814,7 +814,7 @@ impl Web3Rpc {
// rpcs opt-into subscribing to transactions. its a lot of bandwidth // rpcs opt-into subscribing to transactions. its a lot of bandwidth
if !self.subscribe_txs { if !self.subscribe_txs {
loop { loop {
if *subscribe_stop_rx.borrow() { if *subscribe_stop_rx.borrow_and_update() {
trace!("stopping ws block subscription on {}", self); trace!("stopping ws block subscription on {}", self);
return Ok(()); return Ok(());
} }
@ -837,7 +837,7 @@ impl Web3Rpc {
drop(active_request_handle); drop(active_request_handle);
while let Some(x) = pending_txs_sub.next().await { while let Some(x) = pending_txs_sub.next().await {
if *subscribe_stop_rx.borrow() { if *subscribe_stop_rx.borrow_and_update() {
// TODO: this is checking way too often. have this on a timer instead // TODO: this is checking way too often. have this on a timer instead
trace!("stopping ws block subscription on {}", self); trace!("stopping ws block subscription on {}", self);
break; break;
@ -854,7 +854,7 @@ impl Web3Rpc {
} else { } else {
// TODO: what should we do here? // TODO: what should we do here?
loop { loop {
if *subscribe_stop_rx.borrow() { if *subscribe_stop_rx.borrow_and_update() {
trace!("stopping ws block subscription on {}", self); trace!("stopping ws block subscription on {}", self);
return Ok(()); return Ok(());
} }
@ -870,7 +870,7 @@ impl Web3Rpc {
self: &Arc<Self>, self: &Arc<Self>,
block_sender: mpsc::UnboundedSender<BlockAndRpc>, block_sender: mpsc::UnboundedSender<BlockAndRpc>,
block_map: BlocksByHashCache, block_map: BlocksByHashCache,
subscribe_stop_rx: watch::Receiver<bool>, mut subscribe_stop_rx: watch::Receiver<bool>,
) -> Web3ProxyResult<()> { ) -> Web3ProxyResult<()> {
trace!("subscribing to new heads on {}", self); trace!("subscribing to new heads on {}", self);
@ -926,7 +926,7 @@ impl Web3Rpc {
i.set_missed_tick_behavior(MissedTickBehavior::Delay); i.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop { loop {
if *subscribe_stop_rx.borrow() { if *subscribe_stop_rx.borrow_and_update() {
trace!(%self, "stopping http block subscription"); trace!(%self, "stopping http block subscription");
break; break;
} }
@ -992,8 +992,6 @@ impl Web3Rpc {
} }
sleep_until(retry_at).await; sleep_until(retry_at).await;
yield_now().await;
} }
Ok(OpenRequestResult::NotReady) => { Ok(OpenRequestResult::NotReady) => {
// TODO: when can this happen? log? emit a stat? // TODO: when can this happen? log? emit a stat?