add min_sum_soft_limit to config
This commit is contained in:
parent
8703532ed7
commit
5e239c05c8
|
@ -1,7 +1,7 @@
|
||||||
[shared]
|
[shared]
|
||||||
chain_id = 1
|
chain_id = 1
|
||||||
db_url = "mysql://root:dev_web3_proxy@dev-db:3306/dev_web3_proxy"
|
db_url = "mysql://root:dev_web3_proxy@dev-db:3306/dev_web3_proxy"
|
||||||
min_synced_soft_limit = 2000
|
min_sum_soft_limit = 2000
|
||||||
min_synced_rpcs = 2
|
min_synced_rpcs = 2
|
||||||
redis_url = "redis://dev-redis:6379/"
|
redis_url = "redis://dev-redis:6379/"
|
||||||
redirect_public_url = "https://llamanodes.com/free-rpc-stats"
|
redirect_public_url = "https://llamanodes.com/free-rpc-stats"
|
||||||
|
|
|
@ -255,6 +255,7 @@ impl Web3ProxyApp {
|
||||||
redis_pool.clone(),
|
redis_pool.clone(),
|
||||||
block_map.clone(),
|
block_map.clone(),
|
||||||
Some(head_block_sender),
|
Some(head_block_sender),
|
||||||
|
top_config.app.min_sum_soft_limit,
|
||||||
top_config.app.min_synced_rpcs,
|
top_config.app.min_synced_rpcs,
|
||||||
Some(pending_tx_sender.clone()),
|
Some(pending_tx_sender.clone()),
|
||||||
pending_transactions.clone(),
|
pending_transactions.clone(),
|
||||||
|
@ -279,6 +280,7 @@ impl Web3ProxyApp {
|
||||||
None,
|
None,
|
||||||
// minimum doesn't really matter on private rpcs
|
// minimum doesn't really matter on private rpcs
|
||||||
1,
|
1,
|
||||||
|
1,
|
||||||
// TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits
|
// TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits
|
||||||
None,
|
None,
|
||||||
pending_transactions.clone(),
|
pending_transactions.clone(),
|
||||||
|
|
|
@ -217,6 +217,7 @@ mod tests {
|
||||||
db_url: None,
|
db_url: None,
|
||||||
invite_code: None,
|
invite_code: None,
|
||||||
redis_url: None,
|
redis_url: None,
|
||||||
|
min_sum_soft_limit: 1,
|
||||||
min_synced_rpcs: 1,
|
min_synced_rpcs: 1,
|
||||||
public_rate_limit_per_minute: 0,
|
public_rate_limit_per_minute: 0,
|
||||||
response_cache_max_bytes: 10_usize.pow(7),
|
response_cache_max_bytes: 10_usize.pow(7),
|
||||||
|
|
|
@ -45,8 +45,10 @@ pub struct AppConfig {
|
||||||
pub chain_id: u64,
|
pub chain_id: u64,
|
||||||
pub db_url: Option<String>,
|
pub db_url: Option<String>,
|
||||||
pub invite_code: Option<String>,
|
pub invite_code: Option<String>,
|
||||||
|
#[serde(default = "default_min_sum_soft_limit")]
|
||||||
|
pub min_sum_soft_limit: u32,
|
||||||
#[serde(default = "default_min_synced_rpcs")]
|
#[serde(default = "default_min_synced_rpcs")]
|
||||||
pub min_synced_rpcs: usize,
|
pub min_synced_rpcs: u32,
|
||||||
pub redis_url: Option<String>,
|
pub redis_url: Option<String>,
|
||||||
#[serde(default = "default_public_rate_limit_per_minute")]
|
#[serde(default = "default_public_rate_limit_per_minute")]
|
||||||
pub public_rate_limit_per_minute: u64,
|
pub public_rate_limit_per_minute: u64,
|
||||||
|
@ -58,10 +60,15 @@ pub struct AppConfig {
|
||||||
pub redirect_user_url: String,
|
pub redirect_user_url: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn default_min_synced_rpcs() -> usize {
|
fn default_min_sum_soft_limit() -> u32 {
|
||||||
1
|
1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn default_min_synced_rpcs() -> u32 {
|
||||||
|
1
|
||||||
|
}
|
||||||
|
|
||||||
|
/// 0 blocks public requests by default.
|
||||||
fn default_public_rate_limit_per_minute() -> u64 {
|
fn default_public_rate_limit_per_minute() -> u64 {
|
||||||
0
|
0
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,7 +7,7 @@ use crate::{
|
||||||
};
|
};
|
||||||
use dashmap::mapref::one::Ref;
|
use dashmap::mapref::one::Ref;
|
||||||
use derive_more::From;
|
use derive_more::From;
|
||||||
use ethers::prelude::{Block, TxHash, H256, U256, U64};
|
use ethers::prelude::{Block, TxHash, H256, U64};
|
||||||
use hashbrown::{HashMap, HashSet};
|
use hashbrown::{HashMap, HashSet};
|
||||||
use petgraph::algo::all_simple_paths;
|
use petgraph::algo::all_simple_paths;
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
|
@ -15,46 +15,13 @@ use std::sync::Arc;
|
||||||
use tokio::sync::{broadcast, watch};
|
use tokio::sync::{broadcast, watch};
|
||||||
use tracing::{debug, info, trace, warn};
|
use tracing::{debug, info, trace, warn};
|
||||||
|
|
||||||
|
/// A block's hash and number.
|
||||||
#[derive(Default, From)]
|
#[derive(Default, From)]
|
||||||
pub struct BlockId {
|
pub struct BlockId {
|
||||||
pub(super) hash: H256,
|
pub(super) hash: H256,
|
||||||
pub(super) num: U64,
|
pub(super) num: U64,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// TODO: do we need this? probably big refactor still to do
|
|
||||||
pub(super) struct BlockMetadata<'a> {
|
|
||||||
pub(super) block: &'a Arc<Block<TxHash>>,
|
|
||||||
pub(super) rpc_names: Vec<&'a str>,
|
|
||||||
pub(super) sum_soft_limit: u32,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> BlockMetadata<'a> {
|
|
||||||
// TODO: there are sortable traits, but this seems simpler
|
|
||||||
/// sort the blocks in descending height
|
|
||||||
pub fn sortable_values(&self) -> (&U64, &u32, &U256, &H256) {
|
|
||||||
// trace!(?self.block, ?self.conns);
|
|
||||||
|
|
||||||
// first we care about the block number
|
|
||||||
let block_num = self.block.number.as_ref().unwrap();
|
|
||||||
|
|
||||||
// if block_num ties, the block with the highest total difficulty *should* be the winner
|
|
||||||
// TODO: sometimes i see a block with no total difficulty. websocket subscription doesn't get everything
|
|
||||||
// let total_difficulty = self.block.total_difficulty.as_ref().expect("wat");
|
|
||||||
|
|
||||||
// all the nodes should already be doing this fork priority logic themselves
|
|
||||||
// so, it should be safe to just look at whatever our node majority thinks and go with that
|
|
||||||
let sum_soft_limit = &self.sum_soft_limit;
|
|
||||||
|
|
||||||
let difficulty = &self.block.difficulty;
|
|
||||||
|
|
||||||
// if we are still tied (unlikely). this will definitely break the tie
|
|
||||||
// TODO: what does geth do?
|
|
||||||
let block_hash = self.block.hash.as_ref().unwrap();
|
|
||||||
|
|
||||||
(block_num, sum_soft_limit, difficulty, block_hash)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Web3Connections {
|
impl Web3Connections {
|
||||||
/// add a block to our map and it's hash to our graphmap of the blockchain
|
/// add a block to our map and it's hash to our graphmap of the blockchain
|
||||||
pub fn save_block(&self, block: &Arc<Block<TxHash>>) -> anyhow::Result<()> {
|
pub fn save_block(&self, block: &Arc<Block<TxHash>>) -> anyhow::Result<()> {
|
||||||
|
@ -231,26 +198,26 @@ impl Web3Connections {
|
||||||
async fn process_block_from_rpc(
|
async fn process_block_from_rpc(
|
||||||
&self,
|
&self,
|
||||||
connection_heads: &mut HashMap<String, H256>,
|
connection_heads: &mut HashMap<String, H256>,
|
||||||
new_block: Arc<Block<TxHash>>,
|
rpc_head_block: Arc<Block<TxHash>>,
|
||||||
rpc: Arc<Web3Connection>,
|
rpc: Arc<Web3Connection>,
|
||||||
head_block_sender: &watch::Sender<Arc<Block<TxHash>>>,
|
head_block_sender: &watch::Sender<Arc<Block<TxHash>>>,
|
||||||
pending_tx_sender: &Option<broadcast::Sender<TxStatus>>,
|
pending_tx_sender: &Option<broadcast::Sender<TxStatus>>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
// add the block to connection_heads
|
// add the block to connection_heads
|
||||||
match (new_block.hash, new_block.number) {
|
match (rpc_head_block.hash, rpc_head_block.number) {
|
||||||
(Some(hash), Some(num)) => {
|
(Some(rpc_head_hash), Some(rpc_head_num)) => {
|
||||||
if num == U64::zero() {
|
if rpc_head_num == U64::zero() {
|
||||||
debug!(%rpc, "still syncing");
|
debug!(%rpc, "still syncing");
|
||||||
|
|
||||||
connection_heads.remove(&rpc.name);
|
connection_heads.remove(&rpc.name);
|
||||||
} else {
|
} else {
|
||||||
connection_heads.insert(rpc.name.clone(), hash);
|
connection_heads.insert(rpc.name.to_owned(), rpc_head_hash);
|
||||||
|
|
||||||
self.save_block(&new_block)?;
|
self.save_block(&rpc_head_block)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
warn!(%rpc, ?new_block, "Block without number or hash!");
|
warn!(%rpc, ?rpc_head_block, "Block without number or hash!");
|
||||||
|
|
||||||
connection_heads.remove(&rpc.name);
|
connection_heads.remove(&rpc.name);
|
||||||
|
|
||||||
|
@ -282,11 +249,6 @@ impl Web3Connections {
|
||||||
// clone to release the read lock
|
// clone to release the read lock
|
||||||
let highest_work_block = highest_work_block.map(|x| x.clone());
|
let highest_work_block = highest_work_block.map(|x| x.clone());
|
||||||
|
|
||||||
// TODO: default min_soft_limit? without, we start serving traffic at the start too quickly
|
|
||||||
// let min_sum_soft_limit = total_soft_limit / 2;
|
|
||||||
// TODO: this should be configurable
|
|
||||||
let min_sum_soft_limit = 1;
|
|
||||||
|
|
||||||
let mut highest_work_block = match highest_work_block {
|
let mut highest_work_block = match highest_work_block {
|
||||||
None => todo!("no servers are in sync"),
|
None => todo!("no servers are in sync"),
|
||||||
Some(highest_work_block) => highest_work_block,
|
Some(highest_work_block) => highest_work_block,
|
||||||
|
@ -297,7 +259,7 @@ impl Web3Connections {
|
||||||
// track rpcs so we can build a new SyncedConnections
|
// track rpcs so we can build a new SyncedConnections
|
||||||
let mut consensus_rpcs: Vec<&Arc<Web3Connection>> = vec![];
|
let mut consensus_rpcs: Vec<&Arc<Web3Connection>> = vec![];
|
||||||
// a running total of the soft limits covered by the rpcs
|
// a running total of the soft limits covered by the rpcs
|
||||||
let mut consensus_soft_limit = 0;
|
let mut consensus_sum_soft_limit: u32 = 0;
|
||||||
|
|
||||||
// check the highest work block and its parents for a set of rpcs that can serve our request load
|
// check the highest work block and its parents for a set of rpcs that can serve our request load
|
||||||
// TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind
|
// TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind
|
||||||
|
@ -316,7 +278,7 @@ impl Web3Connections {
|
||||||
if let Some(rpc) = self.conns.get(rpc_name) {
|
if let Some(rpc) = self.conns.get(rpc_name) {
|
||||||
consensus_names.insert(rpc_name);
|
consensus_names.insert(rpc_name);
|
||||||
consensus_rpcs.push(rpc);
|
consensus_rpcs.push(rpc);
|
||||||
consensus_soft_limit += rpc.soft_limit;
|
consensus_sum_soft_limit += rpc.soft_limit;
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -338,18 +300,20 @@ impl Web3Connections {
|
||||||
if is_connected {
|
if is_connected {
|
||||||
if let Some(rpc) = self.conns.get(rpc_name) {
|
if let Some(rpc) = self.conns.get(rpc_name) {
|
||||||
consensus_rpcs.push(rpc);
|
consensus_rpcs.push(rpc);
|
||||||
consensus_soft_limit += rpc.soft_limit;
|
consensus_sum_soft_limit += rpc.soft_limit;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if consensus_soft_limit >= min_sum_soft_limit {
|
// TODO: min_sum_soft_limit as a percentage of total_soft_limit?
|
||||||
|
// let min_sum_soft_limit = total_soft_limit / self.min_sum_soft_limit;
|
||||||
|
if consensus_sum_soft_limit >= self.min_sum_soft_limit {
|
||||||
// success! this block has enough nodes on it
|
// success! this block has enough nodes on it
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
// else, we need to try the parent block
|
// else, we need to try the parent block
|
||||||
|
|
||||||
trace!(%consensus_soft_limit, ?highest_work_hash, "avoiding thundering herd");
|
trace!(%consensus_sum_soft_limit, ?highest_work_hash, "avoiding thundering herd");
|
||||||
|
|
||||||
// // TODO: this automatically queries for parents, but need to rearrange lifetimes to make an await work here
|
// // TODO: this automatically queries for parents, but need to rearrange lifetimes to make an await work here
|
||||||
// highest_work_block = self
|
// highest_work_block = self
|
||||||
|
@ -362,9 +326,9 @@ impl Web3Connections {
|
||||||
None => {
|
None => {
|
||||||
warn!(
|
warn!(
|
||||||
"ran out of parents to check. soft limit only {}/{}: {}%",
|
"ran out of parents to check. soft limit only {}/{}: {}%",
|
||||||
consensus_soft_limit,
|
consensus_sum_soft_limit,
|
||||||
min_sum_soft_limit,
|
self.min_sum_soft_limit,
|
||||||
consensus_soft_limit * 100 / min_sum_soft_limit
|
consensus_sum_soft_limit * 100 / self.min_sum_soft_limit
|
||||||
);
|
);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -376,8 +340,8 @@ impl Web3Connections {
|
||||||
// unlock self.blockchain_graphmap
|
// unlock self.blockchain_graphmap
|
||||||
drop(blockchain_guard);
|
drop(blockchain_guard);
|
||||||
|
|
||||||
let soft_limit_met = consensus_soft_limit >= min_sum_soft_limit;
|
let soft_limit_met = consensus_sum_soft_limit >= self.min_sum_soft_limit;
|
||||||
let num_synced_rpcs = consensus_rpcs.len();
|
let num_synced_rpcs = consensus_rpcs.len() as u32;
|
||||||
|
|
||||||
let new_synced_connections = if soft_limit_met {
|
let new_synced_connections = if soft_limit_met {
|
||||||
// we have a consensus large enough to serve traffic
|
// we have a consensus large enough to serve traffic
|
||||||
|
@ -411,27 +375,31 @@ impl Web3Connections {
|
||||||
SyncedConnections::default()
|
SyncedConnections::default()
|
||||||
};
|
};
|
||||||
|
|
||||||
let new_head_hash = new_synced_connections.head_block_hash;
|
let consensus_block_hash = new_synced_connections.head_block_hash;
|
||||||
let new_head_num = new_synced_connections.head_block_num;
|
let consensus_block_num = new_synced_connections.head_block_num;
|
||||||
let new_synced_connections = Arc::new(new_synced_connections);
|
let new_synced_connections = Arc::new(new_synced_connections);
|
||||||
let num_connection_heads = connection_heads.len();
|
let num_connection_heads = connection_heads.len();
|
||||||
|
let total_rpcs = self.conns.len();
|
||||||
|
|
||||||
let old_synced_connections = self.synced_connections.swap(new_synced_connections);
|
let old_synced_connections = self.synced_connections.swap(new_synced_connections);
|
||||||
|
|
||||||
let old_head_hash = old_synced_connections.head_block_hash;
|
let old_head_hash = old_synced_connections.head_block_hash;
|
||||||
let total_rpcs = self.conns.len();
|
|
||||||
|
|
||||||
if new_head_hash == old_head_hash {
|
if Some(consensus_block_hash) != rpc_head_block.hash {
|
||||||
trace!(hash=%new_head_hash, num=%new_head_num, limit=%consensus_soft_limit, "cur consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs);
|
info!("non consensus block")
|
||||||
|
}
|
||||||
|
|
||||||
|
if consensus_block_hash == old_head_hash {
|
||||||
|
debug!(hash=%consensus_block_hash, num=%consensus_block_num, limit=%consensus_sum_soft_limit, "cur consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs);
|
||||||
} else if soft_limit_met {
|
} else if soft_limit_met {
|
||||||
// TODO: if new's parent is not old, warn?
|
// TODO: if new's parent is not old, warn?
|
||||||
|
|
||||||
debug!(hash=%new_head_hash, num=%new_head_num, limit=%consensus_soft_limit, "new consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs);
|
debug!(hash=%consensus_block_hash, num=%consensus_block_num, limit=%consensus_sum_soft_limit, "new consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs);
|
||||||
|
|
||||||
// the head hash changed. forward to any subscribers
|
// the head hash changed. forward to any subscribers
|
||||||
head_block_sender.send(highest_work_block)?;
|
head_block_sender.send(highest_work_block)?;
|
||||||
} else {
|
} else {
|
||||||
warn!(?soft_limit_met, %new_head_hash, %old_head_hash, "no consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs)
|
warn!(?soft_limit_met, %consensus_block_hash, %old_head_hash, "no consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs)
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
@ -45,7 +45,8 @@ pub struct Web3Connections {
|
||||||
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
|
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
|
||||||
/// TODO: what should we use for edges?
|
/// TODO: what should we use for edges?
|
||||||
pub(super) blockchain_graphmap: RwLock<DiGraphMap<H256, u32>>,
|
pub(super) blockchain_graphmap: RwLock<DiGraphMap<H256, u32>>,
|
||||||
pub(super) min_synced_rpcs: usize,
|
pub(super) min_synced_rpcs: u32,
|
||||||
|
pub(super) min_sum_soft_limit: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Web3Connections {
|
impl Web3Connections {
|
||||||
|
@ -58,7 +59,8 @@ impl Web3Connections {
|
||||||
redis_client_pool: Option<redis_rate_limit::RedisPool>,
|
redis_client_pool: Option<redis_rate_limit::RedisPool>,
|
||||||
block_map: BlockMap,
|
block_map: BlockMap,
|
||||||
head_block_sender: Option<watch::Sender<Arc<Block<TxHash>>>>,
|
head_block_sender: Option<watch::Sender<Arc<Block<TxHash>>>>,
|
||||||
min_synced_rpcs: usize,
|
min_sum_soft_limit: u32,
|
||||||
|
min_synced_rpcs: u32,
|
||||||
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
|
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
|
||||||
pending_transactions: Arc<DashMap<TxHash, TxStatus>>,
|
pending_transactions: Arc<DashMap<TxHash, TxStatus>>,
|
||||||
) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
|
) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
|
||||||
|
@ -171,6 +173,7 @@ impl Web3Connections {
|
||||||
pending_transactions,
|
pending_transactions,
|
||||||
block_map: Default::default(),
|
block_map: Default::default(),
|
||||||
blockchain_graphmap: Default::default(),
|
blockchain_graphmap: Default::default(),
|
||||||
|
min_sum_soft_limit,
|
||||||
min_synced_rpcs,
|
min_synced_rpcs,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue