broadcast txs to less servers

This commit is contained in:
Bryan Stitt 2023-01-11 14:51:01 -08:00
parent 30fd0476e5
commit e4f1716f06
5 changed files with 48 additions and 15 deletions

10
TODO.md
View File

@ -295,6 +295,9 @@ These are not yet ordered. There might be duplicates. We might not actually need
- [x] all_backend_connections skips syncing servers - [x] all_backend_connections skips syncing servers
- [x] change weight back to tier - [x] change weight back to tier
- [x] fix multiple origin and referer checks - [x] fix multiple origin and referer checks
- [x] ip detection needs work so that everything doesnt show up as 172.x.x.x
- i think this was done, but am not positive.
- [x] if private txs are disabled, only send trasactions to some of our servers. we were DOSing ourselves with transactions and slowing down sync
- [-] let users choose a % of reverts to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly - [-] let users choose a % of reverts to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly
- this must be opt-in and spawned in the background since it will slow things down and will make their calls less private - this must be opt-in and spawned in the background since it will slow things down and will make their calls less private
- [ ] automatic pruning of old revert logs once too many are collected - [ ] automatic pruning of old revert logs once too many are collected
@ -302,11 +305,8 @@ These are not yet ordered. There might be duplicates. We might not actually need
- [-] add configurable size limits to all the Caches - [-] add configurable size limits to all the Caches
- instead of configuring each cache with MB sizes, have one value for total memory footprint and then percentages for each cache - instead of configuring each cache with MB sizes, have one value for total memory footprint and then percentages for each cache
- https://github.com/moka-rs/moka/issues/201 - https://github.com/moka-rs/moka/issues/201
- [-] ip detection needs work so that everything doesnt show up as 172.x.x.x - [ ] have private transactions be enabled by a url setting rather than a setting on the key
- i think this was done, but am not positive.
- [ ] cli for adding rpc keys to an existing user - [ ] cli for adding rpc keys to an existing user
- [ ] automatically tune database and redis connection pool size
- [ ] if db is down, keep logins cached longer. at least only new logins will have trouble then
- [ ] rate limiting/throttling on query_user_stats - [ ] rate limiting/throttling on query_user_stats
- [ ] minimum allowed query_start on query_user_stats - [ ] minimum allowed query_start on query_user_stats
- [ ] setting request limits to None is broken. it does maxu64 and then internal deferred rate limiter counts try to *99/100 - [ ] setting request limits to None is broken. it does maxu64 and then internal deferred rate limiter counts try to *99/100
@ -452,6 +452,8 @@ These are not yet ordered. There might be duplicates. We might not actually need
These are not ordered. I think some rows also accidently got deleted here. Check git history. These are not ordered. I think some rows also accidently got deleted here. Check git history.
- [ ] automatically tune database and redis connection pool size
- [ ] if db is down, keep logins cached longer. at least only new logins will have trouble then
- [ ] handle user payments - [ ] handle user payments
- [ ] separate daemon (or users themselves) call POST /users/process_transaction - [ ] separate daemon (or users themselves) call POST /users/process_transaction
- checks a transaction to see if it modifies a user's balance. records results in a sql database - checks a transaction to see if it modifies a user's balance. records results in a sql database

View File

@ -156,6 +156,8 @@ pub struct AuthorizationChecks {
/// Chance to save reverting eth_call, eth_estimateGas, and eth_sendRawTransaction to the database. /// Chance to save reverting eth_call, eth_estimateGas, and eth_sendRawTransaction to the database.
/// TODO: f32 would be fine /// TODO: f32 would be fine
pub log_revert_chance: f64, pub log_revert_chance: f64,
/// if true, transactions are broadcast to private mempools. They will still be public on the blockchain!
pub private_txs: bool,
} }
/// Simple wrapper so that we can keep track of read only connections. /// Simple wrapper so that we can keep track of read only connections.
@ -1170,8 +1172,18 @@ impl Web3ProxyApp {
// TODO: eth_sendBundle (flashbots command) // TODO: eth_sendBundle (flashbots command)
// broadcast transactions to all private rpcs at once // broadcast transactions to all private rpcs at once
"eth_sendRawTransaction" => { "eth_sendRawTransaction" => {
// emit stats let (private_rpcs, num) = if let Some(private_rpcs) = self.private_rpcs.as_ref() {
let private_rpcs = self.private_rpcs.as_ref().unwrap_or(&self.balanced_rpcs); if authorization.checks.private_txs {
(private_rpcs, None)
} else {
// TODO: how many balanced rpcs should we send to? configurable? percentage of total?
// TODO: what if we do 2 per tier? we want to blast the third party rpcs
// TODO: maybe having the third party rpcs would be good for this
(&self.balanced_rpcs, Some(2))
}
} else {
(&self.balanced_rpcs, Some(2))
};
// try_send_all_upstream_servers puts the request id into the response. no need to do that ourselves here. // try_send_all_upstream_servers puts the request id into the response. no need to do that ourselves here.
let mut response = private_rpcs let mut response = private_rpcs
@ -1181,6 +1193,7 @@ impl Web3ProxyApp {
Some(request_metadata.clone()), Some(request_metadata.clone()),
None, None,
Level::Trace, Level::Trace,
num,
) )
.await?; .await?;
@ -1224,6 +1237,7 @@ impl Web3ProxyApp {
let rpcs = request_metadata.backend_requests.lock().clone(); let rpcs = request_metadata.backend_requests.lock().clone();
// emit stats
if let Some(salt) = self.config.public_recent_ips_salt.as_ref() { if let Some(salt) = self.config.public_recent_ips_salt.as_ref() {
if let Some(tx_hash) = response.result.clone() { if let Some(tx_hash) = response.result.clone() {
let now = Utc::now().timestamp(); let now = Utc::now().timestamp();

View File

@ -750,6 +750,7 @@ impl Web3ProxyApp {
log_revert_chance: rpc_key_model.log_revert_chance, log_revert_chance: rpc_key_model.log_revert_chance,
max_concurrent_requests: user_tier_model.max_concurrent_requests, max_concurrent_requests: user_tier_model.max_concurrent_requests,
max_requests_per_period: user_tier_model.max_requests_per_period, max_requests_per_period: user_tier_model.max_requests_per_period,
private_txs: rpc_key_model.private_txs,
}) })
} }
None => Ok(AuthorizationChecks::default()), None => Ok(AuthorizationChecks::default()),

View File

@ -292,10 +292,10 @@ impl Web3Connection {
self.block_data_limit.load(atomic::Ordering::Acquire).into() self.block_data_limit.load(atomic::Ordering::Acquire).into()
} }
pub fn syncing(&self) -> bool { pub fn syncing(&self, allowed_lag: u64) -> bool {
match self.head_block.read().clone() { match self.head_block.read().clone() {
None => true, None => true,
Some(x) => x.syncing(60), Some(x) => x.syncing(allowed_lag),
} }
} }
@ -303,6 +303,7 @@ impl Web3Connection {
let head_block_num = match self.head_block.read().clone() { let head_block_num = match self.head_block.read().clone() {
None => return false, None => return false,
Some(x) => { Some(x) => {
// TODO: this 60 second limit is causing our polygons to fall behind. change this to number of blocks?
if x.syncing(60) { if x.syncing(60) {
// skip syncing nodes. even though they might be able to serve a query, // skip syncing nodes. even though they might be able to serve a query,
// latency will be poor and it will get in the way of them syncing further // latency will be poor and it will get in the way of them syncing further
@ -542,7 +543,7 @@ impl Web3Connection {
let _ = head_block.insert(new_head_block.clone().into()); let _ = head_block.insert(new_head_block.clone().into());
} }
if self.block_data_limit() == U64::zero() && !self.syncing() { if self.block_data_limit() == U64::zero() && !self.syncing(1) {
let authorization = Arc::new(Authorization::internal(self.db_conn.clone())?); let authorization = Arc::new(Authorization::internal(self.db_conn.clone())?);
if let Err(err) = self.check_block_data_limit(&authorization).await { if let Err(err) = self.check_block_data_limit(&authorization).await {
warn!( warn!(

View File

@ -597,23 +597,34 @@ impl Web3Connections {
} }
/// get all rpc servers that are not rate limited /// get all rpc servers that are not rate limited
/// returns servers even if they aren't in sync. This is useful for broadcasting signed transactions /// returns servers even if they aren't fully in sync. This is useful for broadcasting signed transactions
// TODO: better type on this that can return an anyhow::Result // TODO: better type on this that can return an anyhow::Result
pub async fn all_backend_connections( pub async fn all_synced_connections(
&self, &self,
authorization: &Arc<Authorization>, authorization: &Arc<Authorization>,
block_needed: Option<&U64>, block_needed: Option<&U64>,
max_count: Option<usize>,
) -> Result<Vec<OpenRequestHandle>, Option<Instant>> { ) -> Result<Vec<OpenRequestHandle>, Option<Instant>> {
let mut earliest_retry_at = None; let mut earliest_retry_at = None;
// TODO: with capacity? // TODO: with capacity?
let mut selected_rpcs = vec![]; let mut selected_rpcs = vec![];
let mut max_count = if max_count.is_none() {
self.conns.len()
} else {
self.conns.len().min(max_count.unwrap())
};
for connection in self.conns.values() { for connection in self.conns.values() {
if max_count == 0 {
break;
}
if let Some(block_needed) = block_needed { if let Some(block_needed) = block_needed {
if !connection.has_block_data(block_needed) { if !connection.has_block_data(block_needed) {
continue; continue;
} }
} else if connection.syncing() { } else if connection.syncing(30) {
continue; continue;
} }
@ -626,7 +637,10 @@ impl Web3Connections {
// this rpc is not available. skip it // this rpc is not available. skip it
earliest_retry_at = earliest_retry_at.min(Some(retry_at)); earliest_retry_at = earliest_retry_at.min(Some(retry_at));
} }
Ok(OpenRequestResult::Handle(handle)) => selected_rpcs.push(handle), Ok(OpenRequestResult::Handle(handle)) => {
max_count -= 1;
selected_rpcs.push(handle)
}
Ok(OpenRequestResult::NotReady) => { Ok(OpenRequestResult::NotReady) => {
warn!("no request handle for {}", connection) warn!("no request handle for {}", connection)
} }
@ -800,10 +814,11 @@ impl Web3Connections {
request_metadata: Option<Arc<RequestMetadata>>, request_metadata: Option<Arc<RequestMetadata>>,
block_needed: Option<&U64>, block_needed: Option<&U64>,
error_level: Level, error_level: Level,
max_count: Option<usize>,
) -> anyhow::Result<JsonRpcForwardedResponse> { ) -> anyhow::Result<JsonRpcForwardedResponse> {
loop { loop {
match self match self
.all_backend_connections(authorization, block_needed) .all_synced_connections(authorization, block_needed, max_count)
.await .await
{ {
Ok(active_request_handles) => { Ok(active_request_handles) => {
@ -1052,7 +1067,7 @@ mod tests {
// all_backend_connections gives everything regardless of sync status // all_backend_connections gives everything regardless of sync status
assert_eq!( assert_eq!(
conns conns
.all_backend_connections(&authorization, None) .all_synced_connections(&authorization, None, None)
.await .await
.unwrap() .unwrap()
.len(), .len(),