variable rename

This commit is contained in:
Bryan Stitt 2023-02-12 10:22:20 -08:00 committed by yenicelik
parent 54b4aa8522
commit 53757621ef
4 changed files with 27 additions and 24 deletions

@ -332,6 +332,9 @@ These are not yet ordered. There might be duplicates. We might not actually need
- [x] add archive depth to app config
- [x] use from_block and to_block so that eth_getLogs is routed correctly
- [x] improve eth_sendRawTransaction server selection
- [x] don't cache methods that are usually very large
- [x] use http provider when available
- [ ] don't use new_head_provider anywhere except new head subscription
- [-] proxy mode for benchmarking all backends
- [-] proxy mode for sending to multiple backends
- [-] let users choose a % of reverts to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly

@ -541,7 +541,7 @@ impl Web3Rpcs {
})
.collect();
trace!("todo: better sort here");
warn!("todo: better sort here");
let sorted_rpcs = {
if usable_rpcs.len() == 1 {

@ -64,7 +64,7 @@ pub struct Web3Rpc {
/// it is an async lock because we hold it open across awaits
/// this provider is only used for new heads subscriptions
/// TODO: put the provider inside an arc?
pub(super) new_head_client: AsyncRwLock<Option<Arc<Web3Provider>>>,
pub(super) provider: AsyncRwLock<Option<Arc<Web3Provider>>>,
/// keep track of hard limits
pub(super) hard_limit_until: Option<watch::Sender<Instant>>,
/// rate limits are stored in a central redis so that multiple proxies can share their rate limits
@ -397,7 +397,7 @@ impl Web3Rpc {
chain_id: u64,
db_conn: Option<&DatabaseConnection>,
) -> anyhow::Result<()> {
if let Ok(mut unlocked_provider) = self.new_head_client.try_write() {
if let Ok(mut unlocked_provider) = self.provider.try_write() {
#[cfg(test)]
if let Some(Web3Provider::Mock) = unlocked_provider.as_deref() {
return Ok(());
@ -494,7 +494,7 @@ impl Web3Rpc {
info!("successfully connected to {}", self);
} else {
if self.new_head_client.read().await.is_none() {
if self.provider.read().await.is_none() {
return Err(anyhow!("failed waiting for client"));
}
};
@ -625,7 +625,7 @@ impl Web3Rpc {
loop {
// TODO: what if we just happened to have this check line up with another restart?
// TODO: think more about this
if let Some(client) = &*conn.new_head_client.read().await {
if let Some(client) = &*conn.provider.read().await {
// trace!("health check unlocked with error on {}", conn);
// returning error will trigger a reconnect
// TODO: do a query of some kind
@ -700,7 +700,7 @@ impl Web3Rpc {
) -> anyhow::Result<()> {
trace!("watching new heads on {}", self);
let unlocked_provider = self.new_head_client.read().await;
let unlocked_provider = self.provider.read().await;
match unlocked_provider.as_deref() {
Some(Web3Provider::Http(_client)) => {
@ -871,7 +871,7 @@ impl Web3Rpc {
) -> anyhow::Result<()> {
// TODO: give this a separate client. don't use new_head_client for everything. especially a firehose this big
// TODO: timeout
let provider = self.new_head_client.read().await;
let provider = self.provider.read().await;
trace!("watching pending transactions on {}", self);
// TODO: does this keep the lock open for too long?
@ -983,7 +983,7 @@ impl Web3Rpc {
) -> anyhow::Result<OpenRequestResult> {
// TODO: think more about this read block
// TODO: this should *not* be new_head_client. this should be a separate object
if unlocked_provider.is_some() || self.new_head_client.read().await.is_some() {
if unlocked_provider.is_some() || self.provider.read().await.is_some() {
// we already have an unlocked provider. no need to lock
} else {
return Ok(OpenRequestResult::NotReady(self.backup));

@ -30,7 +30,7 @@ pub enum OpenRequestResult {
#[derive(Debug)]
pub struct OpenRequestHandle {
authorization: Arc<Authorization>,
conn: Arc<Web3Rpc>,
rpc: Arc<Web3Rpc>,
}
/// Depending on the context, RPC errors can require different handling.
@ -124,17 +124,17 @@ impl OpenRequestHandle {
pub async fn new(authorization: Arc<Authorization>, conn: Arc<Web3Rpc>) -> Self {
Self {
authorization,
conn,
rpc: conn,
}
}
pub fn connection_name(&self) -> String {
self.conn.name.clone()
self.rpc.name.clone()
}
#[inline]
pub fn clone_connection(&self) -> Arc<Web3Rpc> {
self.conn.clone()
self.rpc.clone()
}
/// Send a web3 request
@ -154,7 +154,7 @@ impl OpenRequestHandle {
// TODO: use tracing spans
// TODO: including params in this log is way too verbose
// trace!(rpc=%self.conn, %method, "request");
trace!("requesting from {}", self.conn);
trace!("requesting from {}", self.rpc);
let mut provider: Option<Arc<Web3Provider>> = None;
let mut logged = false;
@ -167,7 +167,7 @@ impl OpenRequestHandle {
break;
}
let unlocked_provider = self.conn.new_head_client.read().await;
let unlocked_provider = self.rpc.provider.read().await;
if let Some(unlocked_provider) = unlocked_provider.clone() {
provider = Some(unlocked_provider);
@ -175,7 +175,7 @@ impl OpenRequestHandle {
}
if !logged {
debug!("no provider for open handle on {}", self.conn);
debug!("no provider for open handle on {}", self.rpc);
logged = true;
}
@ -286,10 +286,10 @@ impl OpenRequestHandle {
if let Some(msg) = msg {
if msg.starts_with("execution reverted") {
trace!("revert from {}", self.conn);
trace!("revert from {}", self.rpc);
ResponseTypes::Revert
} else if msg.contains("limit") || msg.contains("request") {
trace!("rate limit from {}", self.conn);
trace!("rate limit from {}", self.rpc);
ResponseTypes::RateLimit
} else {
ResponseTypes::Ok
@ -302,10 +302,10 @@ impl OpenRequestHandle {
};
if matches!(response_type, ResponseTypes::RateLimit) {
if let Some(hard_limit_until) = self.conn.hard_limit_until.as_ref() {
if let Some(hard_limit_until) = self.rpc.hard_limit_until.as_ref() {
let retry_at = Instant::now() + Duration::from_secs(1);
trace!("retry {} at: {:?}", self.conn, retry_at);
trace!("retry {} at: {:?}", self.rpc, retry_at);
hard_limit_until.send_replace(retry_at);
}
@ -318,14 +318,14 @@ impl OpenRequestHandle {
if matches!(response_type, ResponseTypes::Revert) {
debug!(
"bad response from {}! method={} params={:?} err={:?}",
self.conn, method, params, err
self.rpc, method, params, err
);
}
}
RequestRevertHandler::TraceLevel => {
trace!(
"bad response from {}! method={} params={:?} err={:?}",
self.conn,
self.rpc,
method,
params,
err
@ -335,20 +335,20 @@ impl OpenRequestHandle {
// TODO: include params if not running in release mode
error!(
"bad response from {}! method={} err={:?}",
self.conn, method, err
self.rpc, method, err
);
}
RequestRevertHandler::WarnLevel => {
// TODO: include params if not running in release mode
warn!(
"bad response from {}! method={} err={:?}",
self.conn, method, err
self.rpc, method, err
);
}
RequestRevertHandler::Save => {
trace!(
"bad response from {}! method={} params={:?} err={:?}",
self.conn,
self.rpc,
method,
params,
err