use tokio locks will maybe fix

This commit is contained in:
Bryan Stitt 2022-05-13 07:01:14 +00:00
parent 93ed030d08
commit 77a99bd5ae
3 changed files with 11 additions and 11 deletions

@ -209,7 +209,7 @@ impl Web3Connection {
if old_block_number != block_number {
if let Some(connections) = &connections {
connections.update_synced_rpcs(&self)?;
connections.update_synced_rpcs(&self).await?;
} else {
info!("new block on {}: {}", self, block_number);
}
@ -241,7 +241,7 @@ impl Web3Connection {
.store(block_number, atomic::Ordering::Release);
if let Some(connections) = &connections {
connections.update_synced_rpcs(&self)?;
connections.update_synced_rpcs(&self).await?;
} else {
info!("new head block {} from {}", block_number, self);
}
@ -256,7 +256,7 @@ impl Web3Connection {
.fetch_max(new_block_number, atomic::Ordering::AcqRel);
if let Some(connections) = &connections {
connections.update_synced_rpcs(&self)?;
connections.update_synced_rpcs(&self).await?;
}
}
}

@ -5,12 +5,12 @@ use futures::StreamExt;
use governor::clock::{QuantaClock, QuantaInstant};
use governor::NotUntil;
use hashbrown::HashMap;
use parking_lot::RwLock;
use serde_json::value::RawValue;
use std::cmp;
use std::fmt;
use std::sync::atomic::{self, AtomicU64};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, info, trace, warn};
use crate::config::Web3ConnectionConfig;
@ -174,13 +174,13 @@ impl Web3Connections {
}
}
pub fn update_synced_rpcs(&self, rpc: &Arc<Web3Connection>) -> anyhow::Result<()> {
let mut synced_connections = self.synced_connections.write();
pub async fn update_synced_rpcs(&self, rpc: &Arc<Web3Connection>) -> anyhow::Result<()> {
let mut synced_connections = self.synced_connections.write().await;
let new_block = rpc.head_block_number();
if new_block == 0 {
warn!("{:?} is still syncing", rpc);
warn!("{} is still syncing", rpc);
return Ok(());
}
@ -281,7 +281,7 @@ impl Web3Connections {
let mut earliest_not_until = None;
// TODO: this clone is probably not the best way to do this
let mut synced_rpc_indexes = self.synced_connections.read().inner.clone();
let mut synced_rpc_indexes = self.synced_connections.read().await.inner.clone();
// // TODO: how should we include the soft limit? floats are slower than integer math
// let a = a as f32 / self.soft_limit as f32;

@ -27,7 +27,7 @@ fn main() -> anyhow::Result<()> {
let rpc_config: String = fs::read_to_string(cli_config.rpc_config_path)?;
let rpc_config: RpcConfig = toml::from_str(&rpc_config)?;
// TODO: setting title inside of tokio doesnt seem to work. lets do it outside of tokio
// TODO: this doesn't seem to do anything
proctitle::set_title(format!("web3-proxy-{}", rpc_config.shared.chain_id));
let chain_id = rpc_config.shared.chain_id;
@ -36,7 +36,7 @@ fn main() -> anyhow::Result<()> {
.enable_all()
.thread_name_fn(move || {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
// TODO: what ordering?
// TODO: what ordering? i think we want seqcst so that these all happen in order, but that might be stricter than we really need
let worker_id = ATOMIC_ID.fetch_add(1, atomic::Ordering::SeqCst);
// TODO: i think these max at 15 characters
format!("web3-{}-{}", chain_id, worker_id)
@ -77,7 +77,7 @@ fn handle_anyhow_errors<T: warp::Reply>(
Err(e) => {
let e = JsonRpcForwardedResponse {
jsonrpc: "2.0".to_string(),
// TODO: what id can we use? how do we make sure it gets attached to this?
// TODO: what id can we use? how do we make sure the incoming id gets attached to this?
id: RawValue::from_string("0".to_string()).unwrap(),
result: None,
error: Some(JsonRpcErrorData {