small refactor
This commit is contained in:
parent
d56b862fa2
commit
d961aa647d
|
@ -1425,6 +1425,19 @@ dependencies = [
|
||||||
"byteorder",
|
"byteorder",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "generator"
|
||||||
|
version = "0.6.25"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "061d3be1afec479d56fa3bd182bf966c7999ec175fcfdb87ac14d417241366c6"
|
||||||
|
dependencies = [
|
||||||
|
"cc",
|
||||||
|
"libc",
|
||||||
|
"log",
|
||||||
|
"rustversion",
|
||||||
|
"winapi",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "generic-array"
|
name = "generic-array"
|
||||||
version = "0.12.4"
|
version = "0.12.4"
|
||||||
|
@ -1886,6 +1899,16 @@ version = "1.4.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
|
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "left-right"
|
||||||
|
version = "0.11.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "2033c37d2bc68719774fa095ccdd46a14b846e9c52ec54107eda92fbf966b203"
|
||||||
|
dependencies = [
|
||||||
|
"loom",
|
||||||
|
"slab",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "libc"
|
name = "libc"
|
||||||
version = "0.2.125"
|
version = "0.2.125"
|
||||||
|
@ -1928,6 +1951,17 @@ dependencies = [
|
||||||
"cfg-if",
|
"cfg-if",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "loom"
|
||||||
|
version = "0.4.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "27a6650b2f722ae8c0e2ebc46d07f80c9923464fc206d962332f1eff83143530"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
"generator",
|
||||||
|
"scoped-tls",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "mach"
|
name = "mach"
|
||||||
version = "0.3.2"
|
version = "0.3.2"
|
||||||
|
@ -3863,6 +3897,7 @@ dependencies = [
|
||||||
"futures",
|
"futures",
|
||||||
"governor",
|
"governor",
|
||||||
"hashbrown 0.12.1",
|
"hashbrown 0.12.1",
|
||||||
|
"left-right",
|
||||||
"linkedhashmap",
|
"linkedhashmap",
|
||||||
"parking_lot 0.12.0",
|
"parking_lot 0.12.0",
|
||||||
"proctitle",
|
"proctitle",
|
||||||
|
|
|
@ -15,6 +15,7 @@ flume = "0.10.12"
|
||||||
futures = { version = "0.3.21", features = ["thread-pool"] }
|
futures = { version = "0.3.21", features = ["thread-pool"] }
|
||||||
governor = { version = "0.4.2", features = ["dashmap", "std"] }
|
governor = { version = "0.4.2", features = ["dashmap", "std"] }
|
||||||
hashbrown = "0.12.1"
|
hashbrown = "0.12.1"
|
||||||
|
left-right = "0.11.4"
|
||||||
linkedhashmap = { path = "../linkedhashmap", features = ["inline-more"] }
|
linkedhashmap = { path = "../linkedhashmap", features = ["inline-more"] }
|
||||||
parking_lot = "0.12.0"
|
parking_lot = "0.12.0"
|
||||||
proctitle = "0.1.1"
|
proctitle = "0.1.1"
|
||||||
|
|
|
@ -5,14 +5,12 @@ use crate::jsonrpc::JsonRpcForwardedResponse;
|
||||||
use crate::jsonrpc::JsonRpcForwardedResponseEnum;
|
use crate::jsonrpc::JsonRpcForwardedResponseEnum;
|
||||||
use crate::jsonrpc::JsonRpcRequest;
|
use crate::jsonrpc::JsonRpcRequest;
|
||||||
use crate::jsonrpc::JsonRpcRequestEnum;
|
use crate::jsonrpc::JsonRpcRequestEnum;
|
||||||
use ethers::prelude::ProviderError;
|
use ethers::prelude::{HttpClientError, ProviderError, WsClientError};
|
||||||
use ethers::prelude::{HttpClientError, WsClientError};
|
|
||||||
use futures::future::join_all;
|
use futures::future::join_all;
|
||||||
use governor::clock::{Clock, QuantaClock};
|
use governor::clock::{Clock, QuantaClock};
|
||||||
use linkedhashmap::LinkedHashMap;
|
// use linkedhashmap::LinkedHashMap;
|
||||||
use parking_lot::RwLock;
|
// use parking_lot::RwLock;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::sync::atomic::AtomicU64;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::time::sleep;
|
use tokio::time::sleep;
|
||||||
|
@ -25,12 +23,12 @@ static APP_USER_AGENT: &str = concat!(
|
||||||
env!("CARGO_PKG_VERSION"),
|
env!("CARGO_PKG_VERSION"),
|
||||||
);
|
);
|
||||||
|
|
||||||
// TODO: put this in config? what size should we do?
|
// // TODO: put this in config? what size should we do?
|
||||||
const RESPONSE_CACHE_CAP: usize = 1024;
|
// const RESPONSE_CACHE_CAP: usize = 1024;
|
||||||
|
|
||||||
/// TODO: these types are probably very bad keys and values. i couldn't get caching of warp::reply::Json to work
|
// /// TODO: these types are probably very bad keys and values. i couldn't get caching of warp::reply::Json to work
|
||||||
type ResponseLruCache =
|
// type ResponseLruCache =
|
||||||
RwLock<LinkedHashMap<(u64, String, Option<String>), JsonRpcForwardedResponse>>;
|
// RwLock<LinkedHashMap<(H256, String, Option<String>), JsonRpcForwardedResponse>>;
|
||||||
|
|
||||||
/// The application
|
/// The application
|
||||||
// TODO: this debug impl is way too verbose. make something smaller
|
// TODO: this debug impl is way too verbose. make something smaller
|
||||||
|
@ -43,7 +41,7 @@ pub struct Web3ProxyApp {
|
||||||
balanced_rpcs: Arc<Web3Connections>,
|
balanced_rpcs: Arc<Web3Connections>,
|
||||||
/// Send private requests (like eth_sendRawTransaction) to all these servers
|
/// Send private requests (like eth_sendRawTransaction) to all these servers
|
||||||
private_rpcs: Arc<Web3Connections>,
|
private_rpcs: Arc<Web3Connections>,
|
||||||
response_cache: ResponseLruCache,
|
// response_cache: ResponseLruCache,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Debug for Web3ProxyApp {
|
impl fmt::Debug for Web3ProxyApp {
|
||||||
|
@ -61,8 +59,6 @@ impl Web3ProxyApp {
|
||||||
) -> anyhow::Result<Web3ProxyApp> {
|
) -> anyhow::Result<Web3ProxyApp> {
|
||||||
let clock = QuantaClock::default();
|
let clock = QuantaClock::default();
|
||||||
|
|
||||||
let best_head_block_number = Arc::new(AtomicU64::new(0));
|
|
||||||
|
|
||||||
// make a http shared client
|
// make a http shared client
|
||||||
// TODO: how should we configure the connection pool?
|
// TODO: how should we configure the connection pool?
|
||||||
// TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
|
// TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
|
||||||
|
@ -75,7 +71,6 @@ impl Web3ProxyApp {
|
||||||
// TODO: attach context to this error
|
// TODO: attach context to this error
|
||||||
let balanced_rpcs = Web3Connections::try_new(
|
let balanced_rpcs = Web3Connections::try_new(
|
||||||
chain_id,
|
chain_id,
|
||||||
best_head_block_number.clone(),
|
|
||||||
balanced_rpcs,
|
balanced_rpcs,
|
||||||
Some(http_client.clone()),
|
Some(http_client.clone()),
|
||||||
&clock,
|
&clock,
|
||||||
|
@ -88,22 +83,15 @@ impl Web3ProxyApp {
|
||||||
warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
|
warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
|
||||||
balanced_rpcs.clone()
|
balanced_rpcs.clone()
|
||||||
} else {
|
} else {
|
||||||
Web3Connections::try_new(
|
Web3Connections::try_new(chain_id, private_rpcs, Some(http_client), &clock, false)
|
||||||
chain_id,
|
.await?
|
||||||
best_head_block_number.clone(),
|
|
||||||
private_rpcs,
|
|
||||||
Some(http_client),
|
|
||||||
&clock,
|
|
||||||
false,
|
|
||||||
)
|
|
||||||
.await?
|
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(Web3ProxyApp {
|
Ok(Web3ProxyApp {
|
||||||
clock,
|
clock,
|
||||||
balanced_rpcs,
|
balanced_rpcs,
|
||||||
private_rpcs,
|
private_rpcs,
|
||||||
response_cache: Default::default(),
|
// response_cache: Default::default(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -221,22 +209,24 @@ impl Web3ProxyApp {
|
||||||
// this is not a private transaction (or no private relays are configured)
|
// this is not a private transaction (or no private relays are configured)
|
||||||
// try to send to each tier, stopping at the first success
|
// try to send to each tier, stopping at the first success
|
||||||
// if no tiers are synced, fallback to privates
|
// if no tiers are synced, fallback to privates
|
||||||
|
// TODO: think more about this loop.
|
||||||
loop {
|
loop {
|
||||||
let best_block_number = self.balanced_rpcs.head_block_number();
|
// TODO: bring back this caching
|
||||||
|
// let best_block_hash = self.balanced_rpcs.head_block_hash();
|
||||||
|
|
||||||
// TODO: building this cache key is slow and its large, but i don't see a better way right now
|
// // TODO: building this cache key is slow and its large, but i don't see a better way right now
|
||||||
// TODO: inspect the params and see if a block is specified. if so, use that block number instead of current_block
|
// // TODO: inspect the params and see if a block is specified. if so, use that block number instead of current_block
|
||||||
let cache_key = (
|
// let cache_key = (
|
||||||
best_block_number,
|
// best_block_hash,
|
||||||
request.method.clone(),
|
// request.method.clone(),
|
||||||
request.params.clone().map(|x| x.to_string()),
|
// request.params.clone().map(|x| x.to_string()),
|
||||||
);
|
// );
|
||||||
|
|
||||||
if let Some(cached) = self.response_cache.read().get(&cache_key) {
|
// if let Some(cached) = self.response_cache.read().get(&cache_key) {
|
||||||
// TODO: this still serializes every time
|
// // TODO: this still serializes every time
|
||||||
// TODO: return a reference in the other places so that this works without a clone?
|
// // TODO: return a reference in the other places so that this works without a clone?
|
||||||
return Ok(cached.to_owned());
|
// return Ok(cached.to_owned());
|
||||||
}
|
// }
|
||||||
|
|
||||||
match self.balanced_rpcs.next_upstream_server().await {
|
match self.balanced_rpcs.next_upstream_server().await {
|
||||||
Ok(active_request_handle) => {
|
Ok(active_request_handle) => {
|
||||||
|
@ -249,27 +239,27 @@ impl Web3ProxyApp {
|
||||||
// TODO: trace here was really slow with millions of requests.
|
// TODO: trace here was really slow with millions of requests.
|
||||||
// info!("forwarding request from {}", upstream_server);
|
// info!("forwarding request from {}", upstream_server);
|
||||||
|
|
||||||
let response = JsonRpcForwardedResponse {
|
JsonRpcForwardedResponse {
|
||||||
jsonrpc: "2.0".to_string(),
|
jsonrpc: "2.0".to_string(),
|
||||||
id: request.id,
|
id: request.id,
|
||||||
// TODO: since we only use the result here, should that be all we return from try_send_request?
|
// TODO: since we only use the result here, should that be all we return from try_send_request?
|
||||||
result: Some(partial_response),
|
result: Some(partial_response),
|
||||||
error: None,
|
error: None,
|
||||||
};
|
|
||||||
|
|
||||||
// TODO: small race condidition here. parallel requests with the same query will both be saved to the cache
|
|
||||||
let mut response_cache = self.response_cache.write();
|
|
||||||
|
|
||||||
// TODO: cache the warp::reply to save us serializing every time
|
|
||||||
response_cache.insert(cache_key, response.clone());
|
|
||||||
if response_cache.len() >= RESPONSE_CACHE_CAP {
|
|
||||||
// TODO: this isn't really an LRU. what is this called? should we make it an lru? these caches only live for one block
|
|
||||||
response_cache.pop_front();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
drop(response_cache);
|
// // TODO: small race condidition here. parallel requests with the same query will both be saved to the cache
|
||||||
|
// let mut response_cache = self.response_cache.write();
|
||||||
|
|
||||||
response
|
// // TODO: cache the warp::reply to save us serializing every time
|
||||||
|
// response_cache.insert(cache_key, response.clone());
|
||||||
|
// if response_cache.len() >= RESPONSE_CACHE_CAP {
|
||||||
|
// // TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? these caches only live for one block
|
||||||
|
// response_cache.pop_front();
|
||||||
|
// }
|
||||||
|
|
||||||
|
// drop(response_cache);
|
||||||
|
|
||||||
|
// response
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
// TODO: move this to a helper function?
|
// TODO: move this to a helper function?
|
||||||
|
|
|
@ -201,6 +201,8 @@ impl Web3Connection {
|
||||||
let mut interval = interval(Duration::from_secs(2));
|
let mut interval = interval(Duration::from_secs(2));
|
||||||
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
|
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
|
||||||
|
|
||||||
|
let mut last_hash = Default::default();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
// wait for the interval
|
// wait for the interval
|
||||||
// TODO: if error or rate limit, increase interval?
|
// TODO: if error or rate limit, increase interval?
|
||||||
|
@ -215,6 +217,17 @@ impl Web3Connection {
|
||||||
|
|
||||||
drop(active_request_handle);
|
drop(active_request_handle);
|
||||||
|
|
||||||
|
// don't send repeat blocks
|
||||||
|
if let Ok(block) = &block {
|
||||||
|
let new_hash = block.hash.unwrap();
|
||||||
|
|
||||||
|
if new_hash == last_hash {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
last_hash = new_hash;
|
||||||
|
}
|
||||||
|
|
||||||
self.send_block(block, &block_sender);
|
self.send_block(block, &block_sender);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,12 +6,12 @@ use futures::StreamExt;
|
||||||
use governor::clock::{QuantaClock, QuantaInstant};
|
use governor::clock::{QuantaClock, QuantaInstant};
|
||||||
use governor::NotUntil;
|
use governor::NotUntil;
|
||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
|
use left_right::{Absorb, ReadHandle, WriteHandle};
|
||||||
|
use parking_lot::RwLock;
|
||||||
use serde_json::value::RawValue;
|
use serde_json::value::RawValue;
|
||||||
use std::cmp;
|
use std::cmp;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::sync::atomic::{self, AtomicU64};
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::RwLock;
|
|
||||||
use tracing::{info, trace, warn};
|
use tracing::{info, trace, warn};
|
||||||
|
|
||||||
use crate::config::Web3ConnectionConfig;
|
use crate::config::Web3ConnectionConfig;
|
||||||
|
@ -41,6 +41,76 @@ impl SyncedConnections {
|
||||||
inner,
|
inner,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn update(&mut self, new_block_num: u64, new_block_hash: H256, rpc: Arc<Web3Connection>) {
|
||||||
|
// TODO: double check this logic
|
||||||
|
match new_block_num.cmp(&self.head_block_number) {
|
||||||
|
cmp::Ordering::Greater => {
|
||||||
|
// the rpc's newest block is the new overall best block
|
||||||
|
info!("new head block {} from {}", new_block_num, rpc);
|
||||||
|
|
||||||
|
self.inner.clear();
|
||||||
|
self.inner.push(rpc);
|
||||||
|
|
||||||
|
self.head_block_number = new_block_num;
|
||||||
|
self.head_block_hash = new_block_hash;
|
||||||
|
}
|
||||||
|
cmp::Ordering::Equal => {
|
||||||
|
if new_block_hash != self.head_block_hash {
|
||||||
|
// same height, but different chain
|
||||||
|
// TODO: anything else we should do? set some "nextSafeBlockHeight" to delay sending transactions?
|
||||||
|
warn!(
|
||||||
|
"chain is forked at #{}! {} has {:?}. First was {:?}",
|
||||||
|
new_block_num, rpc, new_block_hash, self.head_block_hash
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// do not clear synced_connections.
|
||||||
|
// we just want to add this rpc to the end
|
||||||
|
self.inner.push(rpc);
|
||||||
|
}
|
||||||
|
cmp::Ordering::Less => {
|
||||||
|
// this isn't the best block in the tier. don't do anything
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: better log
|
||||||
|
trace!("Now synced: {:?}", self.inner);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct SyncedConnectionsUpdate {
|
||||||
|
new_block_number: u64,
|
||||||
|
new_block_hash: H256,
|
||||||
|
rpc: Arc<Web3Connection>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Absorb<SyncedConnectionsUpdate> for SyncedConnections {
|
||||||
|
fn absorb_first(&mut self, operation: &mut SyncedConnectionsUpdate, _: &Self) {
|
||||||
|
self.update(
|
||||||
|
operation.new_block_number,
|
||||||
|
operation.new_block_hash,
|
||||||
|
operation.rpc.clone(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn absorb_second(&mut self, operation: SyncedConnectionsUpdate, _: &Self) {
|
||||||
|
self.update(
|
||||||
|
operation.new_block_number,
|
||||||
|
operation.new_block_hash,
|
||||||
|
operation.rpc,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// See the documentation of `Absorb::drop_first`.
|
||||||
|
fn drop_first(self: Box<Self>) {}
|
||||||
|
|
||||||
|
fn sync_with(&mut self, first: &Self) {
|
||||||
|
// TODO: not sure about this
|
||||||
|
*self = first.clone()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A collection of web3 connections. Sends requests either the current best server or all servers.
|
/// A collection of web3 connections. Sends requests either the current best server or all servers.
|
||||||
|
@ -48,9 +118,8 @@ impl SyncedConnections {
|
||||||
pub struct Web3Connections {
|
pub struct Web3Connections {
|
||||||
inner: Vec<Arc<Web3Connection>>,
|
inner: Vec<Arc<Web3Connection>>,
|
||||||
/// TODO: what is the best type for this? Heavy reads with writes every few seconds. When writes happen, there is a burst of them
|
/// TODO: what is the best type for this? Heavy reads with writes every few seconds. When writes happen, there is a burst of them
|
||||||
/// TODO: we probably need a better lock on this
|
/// TODO: we probably need a better lock on this. left_right with the writer in a mutex
|
||||||
synced_connections: RwLock<SyncedConnections>,
|
synced_connections: RwLock<SyncedConnections>,
|
||||||
best_head_block_number: Arc<AtomicU64>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Debug for Web3Connections {
|
impl fmt::Debug for Web3Connections {
|
||||||
|
@ -65,7 +134,6 @@ impl fmt::Debug for Web3Connections {
|
||||||
impl Web3Connections {
|
impl Web3Connections {
|
||||||
pub async fn try_new(
|
pub async fn try_new(
|
||||||
chain_id: usize,
|
chain_id: usize,
|
||||||
best_head_block_number: Arc<AtomicU64>,
|
|
||||||
servers: Vec<Web3ConnectionConfig>,
|
servers: Vec<Web3ConnectionConfig>,
|
||||||
http_client: Option<reqwest::Client>,
|
http_client: Option<reqwest::Client>,
|
||||||
clock: &QuantaClock,
|
clock: &QuantaClock,
|
||||||
|
@ -88,12 +156,18 @@ impl Web3Connections {
|
||||||
// TODO: exit if no connections?
|
// TODO: exit if no connections?
|
||||||
|
|
||||||
let connections = Arc::new(Self {
|
let connections = Arc::new(Self {
|
||||||
best_head_block_number: best_head_block_number.clone(),
|
|
||||||
inner: connections,
|
inner: connections,
|
||||||
synced_connections: RwLock::new(SyncedConnections::new(num_connections)),
|
synced_connections: RwLock::new(SyncedConnections::new(num_connections)),
|
||||||
});
|
});
|
||||||
|
|
||||||
if subscribe_heads {
|
if subscribe_heads {
|
||||||
|
if connections.inner.len() < 2 {
|
||||||
|
// TODO: less than 3? what should we do here?
|
||||||
|
return Err(anyhow::anyhow!(
|
||||||
|
"need at least 2 connections when subscribing to heads!"
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
let (block_sender, block_receiver) = flume::unbounded();
|
let (block_sender, block_receiver) = flume::unbounded();
|
||||||
|
|
||||||
{
|
{
|
||||||
|
@ -120,9 +194,9 @@ impl Web3Connections {
|
||||||
Ok(connections)
|
Ok(connections)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn head_block_number(&self) -> u64 {
|
// pub fn synced_connections(&self) -> &RwLock<SyncedConnections> {
|
||||||
self.best_head_block_number.load(atomic::Ordering::Acquire)
|
// &self.synced_connections
|
||||||
}
|
// }
|
||||||
|
|
||||||
/// Send the same request to all the handles. Returning the fastest successful result.
|
/// Send the same request to all the handles. Returning the fastest successful result.
|
||||||
pub async fn try_send_parallel_requests(
|
pub async fn try_send_parallel_requests(
|
||||||
|
@ -197,42 +271,9 @@ impl Web3Connections {
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: experiment with different locks and such here
|
// TODO: experiment with different locks and such here
|
||||||
let mut synced_connections = self.synced_connections.write().await;
|
let mut synced_connections = self.synced_connections.write();
|
||||||
|
|
||||||
// TODO: double check this logic
|
synced_connections.update(new_block_num, new_block_hash, rpc);
|
||||||
match new_block_num.cmp(&synced_connections.head_block_number) {
|
|
||||||
cmp::Ordering::Greater => {
|
|
||||||
// the rpc's newest block is the new overall best block
|
|
||||||
info!("new head block {} from {}", new_block_num, rpc);
|
|
||||||
|
|
||||||
synced_connections.inner.clear();
|
|
||||||
synced_connections.inner.push(rpc);
|
|
||||||
|
|
||||||
synced_connections.head_block_number = new_block_num;
|
|
||||||
synced_connections.head_block_hash = new_block_hash;
|
|
||||||
}
|
|
||||||
cmp::Ordering::Equal => {
|
|
||||||
if new_block_hash != synced_connections.head_block_hash {
|
|
||||||
// same height, but different chain
|
|
||||||
warn!(
|
|
||||||
"chain is forked! {} has {}. First #{} was {}",
|
|
||||||
rpc, new_block_hash, new_block_num, synced_connections.head_block_hash
|
|
||||||
);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// do not clear synced_connections.
|
|
||||||
// we just want to add this rpc to the end
|
|
||||||
synced_connections.inner.push(rpc);
|
|
||||||
}
|
|
||||||
cmp::Ordering::Less => {
|
|
||||||
// this isn't the best block in the tier. don't do anything
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: better log
|
|
||||||
trace!("Now synced: {:?}", synced_connections.inner);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -244,15 +285,15 @@ impl Web3Connections {
|
||||||
) -> Result<ActiveRequestHandle, Option<NotUntil<QuantaInstant>>> {
|
) -> Result<ActiveRequestHandle, Option<NotUntil<QuantaInstant>>> {
|
||||||
let mut earliest_not_until = None;
|
let mut earliest_not_until = None;
|
||||||
|
|
||||||
// TODO: this clone is probably not the best way to do this
|
// TODO: this clone is definitely not the best way to do this
|
||||||
let mut synced_rpc_indexes = self.synced_connections.read().await.inner.clone();
|
let mut synced_rpc_arcs = self.synced_connections.read().inner.clone();
|
||||||
|
|
||||||
// // TODO: how should we include the soft limit? floats are slower than integer math
|
// // TODO: how should we include the soft limit? floats are slower than integer math
|
||||||
// let a = a as f32 / self.soft_limit as f32;
|
// let a = a as f32 / self.soft_limit as f32;
|
||||||
// let b = b as f32 / other.soft_limit as f32;
|
// let b = b as f32 / other.soft_limit as f32;
|
||||||
|
|
||||||
// TODO: better key!
|
// TODO: better key!
|
||||||
let sort_cache: HashMap<String, (f32, u32)> = synced_rpc_indexes
|
let sort_cache: HashMap<String, (f32, u32)> = synced_rpc_arcs
|
||||||
.iter()
|
.iter()
|
||||||
.map(|connection| {
|
.map(|connection| {
|
||||||
// TODO: better key!
|
// TODO: better key!
|
||||||
|
@ -267,7 +308,7 @@ impl Web3Connections {
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
// TODO: i think we might need to load active connections and then
|
// TODO: i think we might need to load active connections and then
|
||||||
synced_rpc_indexes.sort_unstable_by(|a, b| {
|
synced_rpc_arcs.sort_unstable_by(|a, b| {
|
||||||
// TODO: better keys
|
// TODO: better keys
|
||||||
let a_key = format!("{}", a);
|
let a_key = format!("{}", a);
|
||||||
let b_key = format!("{}", b);
|
let b_key = format!("{}", b);
|
||||||
|
@ -285,7 +326,7 @@ impl Web3Connections {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
for selected_rpc in synced_rpc_indexes.into_iter() {
|
for selected_rpc in synced_rpc_arcs.into_iter() {
|
||||||
// increment our connection counter
|
// increment our connection counter
|
||||||
match selected_rpc.try_request_handle() {
|
match selected_rpc.try_request_handle() {
|
||||||
Err(not_until) => {
|
Err(not_until) => {
|
||||||
|
|
Loading…
Reference in New Issue