diff --git a/TODO.md b/TODO.md index 5417d951..b405b251 100644 --- a/TODO.md +++ b/TODO.md @@ -410,9 +410,9 @@ These are not yet ordered. There might be duplicates. We might not actually need - [-] add configurable size limits to all the Caches - instead of configuring each cache with MB sizes, have one value for total memory footprint and then percentages for each cache - https://github.com/moka-rs/moka/issues/201 -- [ ] all anyhow::Results need to be replaced with FrontendErrorResponse. - - [ ] rename FrontendErrorResponse to Web3ProxyError - - [ ] almost all the anyhows should be Web3ProxyError::BadRequest +- [x] all anyhow::Results need to be replaced with FrontendErrorResponse. + - [x] rename FrontendErrorResponse to Web3ProxyError + - [x] almost all the anyhows should be Web3ProxyError::BadRequest - as is, these errors are seen as 500 errors and so haproxy keeps retrying them - change premium concurrency limit to be against ip+rpckey - then sites like curve.fi don't have to worry about their user count @@ -422,18 +422,18 @@ These are not yet ordered. There might be duplicates. We might not actually need - all nodes have all blocks - most nodes have all receipts - only archives have old state -- [ ] don't use new_head_provider anywhere except new head subscription -- [ ] enable mev protected transactions with either a /protect/ url (instead of /private/) or the database (when on /rpc/) -- [-] have private transactions be enabled by a url setting rather than a setting on the key +- [x] don't use new_head_provider anywhere except new head subscription +- [x] add support for http basic auth +- [-] enable mev protected transactions with either a /protect/ url (instead of /private/) or the database (when on /rpc/) +- [ ] a **lot** got done that wasn't included in this todo list. go through commits and update this - [ ] eth_sendRawTransaction should only forward if the chain_id matches what we are running - [ ] cli for adding rpc keys to an existing user - [ ] rename "private" to "mev protected" to avoid confusion about private transactions being public once they are mined - [ ] allow restricting an rpc key to specific chains -- [ ] writes to request_latency should be handled by a background task so they don't slow down the request - - maybe we can use https://docs.rs/hdrhistogram/latest/hdrhistogram/sync/struct.SyncHistogram.html +- [-] writes to request_latency should be handled by a background task so they don't slow down the request - [ ] keep re-broadcasting transactions until they are confirmed - [ ] if mev protection is disabled, we should send to *both* balanced_rpcs *and* private_rps -- [ ] if mev protection is enabled, we should sent to *only* private_rpcs +- [x] if mev protection is enabled, we should sent to *only* private_rpcs - [ ] rate limiting/throttling on query_user_stats - [ ] web3rpc configs should have a max_concurrent_requests - will probably want a tool for calculating a safe value for this. too low and we could kill our performance diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 09a4af4a..5ec2bfac 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -20,6 +20,7 @@ use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter}; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::json; +use std::borrow::Cow; use std::cmp::min; use std::fmt; use std::hash::{Hash, Hasher}; @@ -29,6 +30,7 @@ use thread_fast_rng::rand::Rng; use thread_fast_rng::thread_fast_rng; use tokio::sync::{broadcast, oneshot, watch, RwLock as AsyncRwLock}; use tokio::time::{sleep, sleep_until, timeout, Duration, Instant}; +use url::Url; pub struct Latency { /// exponentially weighted moving average of how many milliseconds behind the fastest node we are @@ -99,8 +101,8 @@ pub struct Web3Rpc { pub name: String, pub display_name: Option, pub db_conn: Option, - pub(super) ws_url: Option, - pub(super) http_url: Option, + pub(super) ws_url: Option, + pub(super) http_url: Option, /// Some connections use an http_client. we keep a clone for reconnecting pub(super) http_client: Option, /// provider is in a RwLock so that we can replace it if re-connecting @@ -227,13 +229,25 @@ impl Web3Rpc { let (disconnect_sender, disconnect_receiver) = watch::channel(false); let reconnect = reconnect.into(); + let http_url = if let Some(http_url) = config.http_url { + Some(http_url.parse()?) + } else { + None + }; + + let ws_url = if let Some(ws_url) = config.ws_url { + Some(ws_url.parse()?) + } else { + None + }; + let new_connection = Self { name, db_conn: db_conn.clone(), display_name: config.display_name, http_client, - ws_url: config.ws_url, - http_url: config.http_url, + ws_url, + http_url, hard_limit, hard_limit_until, soft_limit: config.soft_limit, @@ -526,7 +540,7 @@ impl Web3Rpc { } } - let p = Web3Provider::from_str(ws_url.as_str(), None) + let p = Web3Provider::new(Cow::Borrowed(ws_url), None) .await .context(format!("failed connecting to {}", ws_url))?; @@ -536,7 +550,7 @@ impl Web3Rpc { } else { // http client if let Some(url) = &self.http_url { - let p = Web3Provider::from_str(url, self.http_client.clone()) + let p = Web3Provider::new(Cow::Borrowed(url), self.http_client.clone()) .await .context(format!("failed connecting to {}", url))?; @@ -1481,7 +1495,7 @@ mod tests { let x = Web3Rpc { name: "name".to_string(), - ws_url: Some("ws://example.com".to_string()), + ws_url: Some("ws://example.com".parse::().unwrap()), soft_limit: 1_000, automatic_block_limit: false, backup: false, diff --git a/web3_proxy/src/rpcs/provider.rs b/web3_proxy/src/rpcs/provider.rs index d5ab318e..a70bc88f 100644 --- a/web3_proxy/src/rpcs/provider.rs +++ b/web3_proxy/src/rpcs/provider.rs @@ -1,6 +1,8 @@ -use anyhow::Context; +use anyhow::anyhow; use derive_more::From; -use std::time::Duration; +use ethers::providers::{Authorization, ConnectionDetails}; +use std::{borrow::Cow, time::Duration}; +use url::Url; // TODO: our own structs for these that handle streaming large responses type EthersHttpProvider = ethers::providers::Provider; @@ -34,24 +36,53 @@ impl Web3Provider { } } - pub async fn from_str( - url_str: &str, + /// Note, if the http url has an authority the http_client param is ignored and a dedicated http_client will be used + /// TODO: take a reqwest::Client or a reqwest::ClientBuilder. that way we can do things like set compression even when auth is set + pub async fn new( + mut url: Cow<'_, Url>, http_client: Option, ) -> anyhow::Result { - let provider = if url_str.starts_with("http") { - let url: url::Url = url_str.parse()?; + let auth = if let Some(pass) = url.password().map(|x| x.to_string()) { + // to_string is needed because we are going to remove these items from the url + let user = url.username().to_string(); - let http_client = http_client.context("no http_client")?; + // clear username and password from the url + let mut_url = url.to_mut(); - let provider = ethers::providers::Http::new_with_client(url, http_client); + mut_url + .set_username("") + .map_err(|_| anyhow!("unable to clear username on websocket"))?; + mut_url + .set_password(None) + .map_err(|_| anyhow!("unable to clear password on websocket"))?; + + // keep them + Some(Authorization::basic(user, pass)) + } else { + None + }; + + let provider = if url.scheme().starts_with("http") { + let provider = if let Some(auth) = auth { + ethers::providers::Http::new_with_auth(url.into_owned(), auth)? + } else if let Some(http_client) = http_client { + ethers::providers::Http::new_with_client(url.into_owned(), http_client) + } else { + ethers::providers::Http::new(url.into_owned()) + }; - // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592) // TODO: i don't think this interval matters for our uses, but we should probably set it to like `block time / 2` ethers::providers::Provider::new(provider) .interval(Duration::from_secs(12)) .into() - } else if url_str.starts_with("ws") { - let provider = ethers::providers::Ws::connect(url_str).await?; + } else if url.scheme().starts_with("ws") { + let provider = if auth.is_some() { + let connection_details = ConnectionDetails::new(url.as_str(), auth); + + ethers::providers::Ws::connect(connection_details).await? + } else { + ethers::providers::Ws::connect(url.as_str()).await? + }; // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592) // TODO: i don't think this interval matters