pass authorized_request through a bunch of places

This commit is contained in:
Bryan Stitt 2022-09-22 20:27:14 +00:00
parent 6905e9fd46
commit 43846a7051
10 changed files with 114 additions and 39 deletions

View File

@ -2,6 +2,7 @@
use crate::block_number::block_needed; use crate::block_number::block_needed;
use crate::config::{AppConfig, TopConfig}; use crate::config::{AppConfig, TopConfig};
use crate::frontend::authorization::AuthorizedRequest;
use crate::jsonrpc::JsonRpcForwardedResponse; use crate::jsonrpc::JsonRpcForwardedResponse;
use crate::jsonrpc::JsonRpcForwardedResponseEnum; use crate::jsonrpc::JsonRpcForwardedResponseEnum;
use crate::jsonrpc::JsonRpcRequest; use crate::jsonrpc::JsonRpcRequest;
@ -12,6 +13,7 @@ use crate::rpcs::request::OpenRequestHandleMetrics;
use crate::rpcs::transactions::TxStatus; use crate::rpcs::transactions::TxStatus;
use anyhow::Context; use anyhow::Context;
use axum::extract::ws::Message; use axum::extract::ws::Message;
use axum::headers::{Referer, UserAgent};
use deferred_rate_limiter::DeferredRateLimiter; use deferred_rate_limiter::DeferredRateLimiter;
use derive_more::From; use derive_more::From;
use ethers::core::utils::keccak256; use ethers::core::utils::keccak256;
@ -60,12 +62,18 @@ type ResponseCache =
pub type AnyhowJoinHandle<T> = JoinHandle<anyhow::Result<T>>; pub type AnyhowJoinHandle<T> = JoinHandle<anyhow::Result<T>>;
#[derive(Clone, Copy, Debug, From, Serialize)] #[derive(Clone, Debug, From)]
/// TODO: rename this? /// TODO: rename this?
pub struct UserData { pub struct UserKeyData {
pub user_key_id: u64, pub user_key_id: u64,
/// if None, allow unlimited queries /// if None, allow unlimited queries
pub user_count_per_period: Option<u64>, pub user_count_per_period: Option<u64>,
/// if None, allow any Referer
pub allowed_referer: Option<Referer>,
/// if None, allow any UserAgent
pub allowed_user_agent: Option<UserAgent>,
/// if None, allow any IpAddr
pub allowed_ip: Option<IpAddr>,
} }
/// The application /// The application
@ -91,7 +99,7 @@ pub struct Web3ProxyApp {
pub frontend_ip_rate_limiter: Option<DeferredRateLimiter<IpAddr>>, pub frontend_ip_rate_limiter: Option<DeferredRateLimiter<IpAddr>>,
pub frontend_key_rate_limiter: Option<DeferredRateLimiter<Uuid>>, pub frontend_key_rate_limiter: Option<DeferredRateLimiter<Uuid>>,
pub redis_pool: Option<RedisPool>, pub redis_pool: Option<RedisPool>,
pub user_cache: Cache<Uuid, UserData, hashbrown::hash_map::DefaultHashBuilder>, pub user_cache: Cache<Uuid, UserKeyData, hashbrown::hash_map::DefaultHashBuilder>,
} }
/// flatten a JoinError into an anyhow error /// flatten a JoinError into an anyhow error
@ -402,6 +410,7 @@ impl Web3ProxyApp {
#[measure([ErrorCount, HitCount, ResponseTime, Throughput])] #[measure([ErrorCount, HitCount, ResponseTime, Throughput])]
pub async fn eth_subscribe<'a>( pub async fn eth_subscribe<'a>(
self: &'a Arc<Self>, self: &'a Arc<Self>,
authorized_request: Arc<AuthorizedRequest>,
payload: JsonRpcRequest, payload: JsonRpcRequest,
subscription_count: &'a AtomicUsize, subscription_count: &'a AtomicUsize,
// TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now // TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now
@ -595,6 +604,7 @@ impl Web3ProxyApp {
/// send the request or batch of requests to the approriate RPCs /// send the request or batch of requests to the approriate RPCs
pub async fn proxy_web3_rpc( pub async fn proxy_web3_rpc(
self: &Arc<Self>, self: &Arc<Self>,
authorized_request: &Arc<AuthorizedRequest>,
request: JsonRpcRequestEnum, request: JsonRpcRequestEnum,
) -> anyhow::Result<JsonRpcForwardedResponseEnum> { ) -> anyhow::Result<JsonRpcForwardedResponseEnum> {
// TODO: this should probably be trace level // TODO: this should probably be trace level
@ -607,10 +617,18 @@ impl Web3ProxyApp {
let response = match request { let response = match request {
JsonRpcRequestEnum::Single(request) => JsonRpcForwardedResponseEnum::Single( JsonRpcRequestEnum::Single(request) => JsonRpcForwardedResponseEnum::Single(
timeout(max_time, self.proxy_web3_rpc_request(request)).await??, timeout(
max_time,
self.proxy_web3_rpc_request(authorized_request, request),
)
.await??,
), ),
JsonRpcRequestEnum::Batch(requests) => JsonRpcForwardedResponseEnum::Batch( JsonRpcRequestEnum::Batch(requests) => JsonRpcForwardedResponseEnum::Batch(
timeout(max_time, self.proxy_web3_rpc_requests(requests)).await??, timeout(
max_time,
self.proxy_web3_rpc_requests(authorized_request, requests),
)
.await??,
), ),
}; };
@ -624,6 +642,7 @@ impl Web3ProxyApp {
/// TODO: make sure this isn't a problem /// TODO: make sure this isn't a problem
async fn proxy_web3_rpc_requests( async fn proxy_web3_rpc_requests(
self: &Arc<Self>, self: &Arc<Self>,
authorized_request: &Arc<AuthorizedRequest>,
requests: Vec<JsonRpcRequest>, requests: Vec<JsonRpcRequest>,
) -> anyhow::Result<Vec<JsonRpcForwardedResponse>> { ) -> anyhow::Result<Vec<JsonRpcForwardedResponse>> {
// TODO: we should probably change ethers-rs to support this directly // TODO: we should probably change ethers-rs to support this directly
@ -631,7 +650,7 @@ impl Web3ProxyApp {
let responses = join_all( let responses = join_all(
requests requests
.into_iter() .into_iter()
.map(|request| self.proxy_web3_rpc_request(request)) .map(|request| self.proxy_web3_rpc_request(authorized_request, request))
.collect::<Vec<_>>(), .collect::<Vec<_>>(),
) )
.await; .await;
@ -659,6 +678,7 @@ impl Web3ProxyApp {
#[measure([ErrorCount, HitCount, ResponseTime, Throughput])] #[measure([ErrorCount, HitCount, ResponseTime, Throughput])]
async fn proxy_web3_rpc_request( async fn proxy_web3_rpc_request(
self: &Arc<Self>, self: &Arc<Self>,
authorized_request: &Arc<AuthorizedRequest>,
mut request: JsonRpcRequest, mut request: JsonRpcRequest,
) -> anyhow::Result<JsonRpcForwardedResponse> { ) -> anyhow::Result<JsonRpcForwardedResponse> {
trace!("Received request: {:?}", request); trace!("Received request: {:?}", request);
@ -791,7 +811,9 @@ impl Web3ProxyApp {
// emit stats // emit stats
let rpcs = self.private_rpcs.as_ref().unwrap_or(&self.balanced_rpcs); let rpcs = self.private_rpcs.as_ref().unwrap_or(&self.balanced_rpcs);
return rpcs.try_send_all_upstream_servers(request, None).await; return rpcs
.try_send_all_upstream_servers(Some(authorized_request), request, None)
.await;
} }
"eth_syncing" => { "eth_syncing" => {
// no stats on this. its cheap // no stats on this. its cheap
@ -890,7 +912,11 @@ impl Web3ProxyApp {
// TODO: put the hash here instead? // TODO: put the hash here instead?
let mut response = self let mut response = self
.balanced_rpcs .balanced_rpcs
.try_send_best_upstream_server(request, Some(&request_block_id.num)) .try_send_best_upstream_server(
Some(authorized_request),
request,
Some(&request_block_id.num),
)
.await?; .await?;
// discard their id by replacing it with an empty // discard their id by replacing it with an empty

View File

@ -63,7 +63,7 @@ pub async fn clean_block_number(
let block_hash: H256 = let block_hash: H256 =
serde_json::from_value(block_hash).context("decoding blockHash")?; serde_json::from_value(block_hash).context("decoding blockHash")?;
let block = rpcs.block(&block_hash, None).await?; let block = rpcs.block(None, &block_hash, None).await?;
let block_num = block let block_num = block
.number .number

View File

@ -1,4 +1,4 @@
use crate::{app::UserData, jsonrpc::JsonRpcForwardedResponse}; use crate::{app::UserKeyData, jsonrpc::JsonRpcForwardedResponse};
use axum::{ use axum::{
http::StatusCode, http::StatusCode,
response::{IntoResponse, Response}, response::{IntoResponse, Response},
@ -21,7 +21,7 @@ pub enum FrontendErrorResponse {
Redis(RedisError), Redis(RedisError),
Response(Response), Response(Response),
Database(DbErr), Database(DbErr),
RateLimitedUser(UserData, Option<Instant>), RateLimitedUser(UserKeyData, Option<Instant>),
RateLimitedIp(IpAddr, Option<Instant>), RateLimitedIp(IpAddr, Option<Instant>),
UnknownKey, UnknownKey,
NotFound, NotFound,

View File

@ -1,4 +1,4 @@
mod authorization; pub mod authorization;
mod errors; mod errors;
mod http; mod http;
mod rpc_proxy_http; mod rpc_proxy_http;

View File

@ -19,11 +19,19 @@ pub async fn public_proxy_web3_rpc(
) -> FrontendResult { ) -> FrontendResult {
let request_span = error_span!("request", %ip, ?referer, ?user_agent); let request_span = error_span!("request", %ip, ?referer, ?user_agent);
let ip = ip_is_authorized(&app, ip) let authorized_request = ip_is_authorized(&app, ip)
.instrument(request_span.clone()) .instrument(request_span.clone())
.await?; .await?;
let f = tokio::spawn(async move { app.proxy_web3_rpc(payload).instrument(request_span).await }); let request_span = error_span!("request", ?authorized_request);
let authorized_request = Arc::new(authorized_request);
let f = tokio::spawn(async move {
app.proxy_web3_rpc(&authorized_request, payload)
.instrument(request_span)
.await
});
let response = f.await.unwrap()?; let response = f.await.unwrap()?;
@ -53,7 +61,13 @@ pub async fn user_proxy_web3_rpc(
let request_span = error_span!("request", ?authorized_request); let request_span = error_span!("request", ?authorized_request);
let f = tokio::spawn(async move { app.proxy_web3_rpc(payload).instrument(request_span).await }); let authorized_request = Arc::new(authorized_request);
let f = tokio::spawn(async move {
app.proxy_web3_rpc(&authorized_request, payload)
.instrument(request_span)
.await
});
let response = f.await.unwrap()?; let response = f.await.unwrap()?;

View File

@ -37,6 +37,8 @@ pub async fn public_websocket_handler(
let request_span = error_span!("request", ?authorized_request); let request_span = error_span!("request", ?authorized_request);
let authorized_request = Arc::new(authorized_request);
match ws_upgrade { match ws_upgrade {
Some(ws) => Ok(ws Some(ws) => Ok(ws
.on_upgrade(|socket| { .on_upgrade(|socket| {
@ -68,10 +70,11 @@ pub async fn user_websocket_handler(
) )
.await?; .await?;
// log the id, not the address. we don't want to expose the user's address // TODO: type that wraps Address and have it censor? would protect us from accidently logging addresses or other user info
// TODO: type that wraps Address and have it censor? would protect us from accidently logging addresses
let request_span = error_span!("request", ?authorized_request); let request_span = error_span!("request", ?authorized_request);
let authorized_request = Arc::new(authorized_request);
match ws_upgrade { match ws_upgrade {
Some(ws_upgrade) => Ok(ws_upgrade.on_upgrade(move |socket| { Some(ws_upgrade) => Ok(ws_upgrade.on_upgrade(move |socket| {
proxy_web3_socket(app, authorized_request, socket).instrument(request_span) proxy_web3_socket(app, authorized_request, socket).instrument(request_span)
@ -97,7 +100,7 @@ pub async fn user_websocket_handler(
async fn proxy_web3_socket( async fn proxy_web3_socket(
app: Arc<Web3ProxyApp>, app: Arc<Web3ProxyApp>,
authorized_request: AuthorizedRequest, authorized_request: Arc<AuthorizedRequest>,
socket: WebSocket, socket: WebSocket,
) { ) {
// split the websocket so we can read and write concurrently // split the websocket so we can read and write concurrently
@ -118,6 +121,7 @@ async fn proxy_web3_socket(
/// websockets support a few more methods than http clients /// websockets support a few more methods than http clients
async fn handle_socket_payload( async fn handle_socket_payload(
app: Arc<Web3ProxyApp>, app: Arc<Web3ProxyApp>,
authorized_request: Arc<AuthorizedRequest>,
payload: &str, payload: &str,
response_sender: &flume::Sender<Message>, response_sender: &flume::Sender<Message>,
subscription_count: &AtomicUsize, subscription_count: &AtomicUsize,
@ -135,7 +139,12 @@ async fn handle_socket_payload(
let span = error_span!("eth_subscribe"); let span = error_span!("eth_subscribe");
let response = app let response = app
.eth_subscribe(payload, subscription_count, response_sender.clone()) .eth_subscribe(
authorized_request.clone(),
payload,
subscription_count,
response_sender.clone(),
)
.instrument(span) .instrument(span)
.await; .await;
@ -168,7 +177,10 @@ async fn handle_socket_payload(
Ok(response.into()) Ok(response.into())
} }
_ => app.proxy_web3_rpc(payload.into()).await, _ => {
app.proxy_web3_rpc(&authorized_request, payload.into())
.await
}
}; };
(id, response) (id, response)
@ -194,7 +206,7 @@ async fn handle_socket_payload(
async fn read_web3_socket( async fn read_web3_socket(
app: Arc<Web3ProxyApp>, app: Arc<Web3ProxyApp>,
authorized_request: AuthorizedRequest, authorized_request: Arc<AuthorizedRequest>,
mut ws_rx: SplitStream<WebSocket>, mut ws_rx: SplitStream<WebSocket>,
response_sender: flume::Sender<Message>, response_sender: flume::Sender<Message>,
) { ) {
@ -207,6 +219,7 @@ async fn read_web3_socket(
Message::Text(payload) => { Message::Text(payload) => {
handle_socket_payload( handle_socket_payload(
app.clone(), app.clone(),
authorized_request.clone(),
&payload, &payload,
&response_sender, &response_sender,
&subscription_count, &subscription_count,
@ -229,6 +242,7 @@ async fn read_web3_socket(
handle_socket_payload( handle_socket_payload(
app.clone(), app.clone(),
authorized_request.clone(),
payload, payload,
&response_sender, &response_sender,
&subscription_count, &subscription_count,

View File

@ -2,6 +2,7 @@
use super::connection::Web3Connection; use super::connection::Web3Connection;
use super::connections::Web3Connections; use super::connections::Web3Connections;
use super::transactions::TxStatus; use super::transactions::TxStatus;
use crate::frontend::authorization::AuthorizedRequest;
use crate::{ use crate::{
config::BlockAndRpc, jsonrpc::JsonRpcRequest, rpcs::synced_connections::SyncedConnections, config::BlockAndRpc, jsonrpc::JsonRpcRequest, rpcs::synced_connections::SyncedConnections,
}; };
@ -90,6 +91,7 @@ impl Web3Connections {
/// Will query a specific node or the best available. /// Will query a specific node or the best available.
pub async fn block( pub async fn block(
&self, &self,
authorized_request: Option<&Arc<AuthorizedRequest>>,
hash: &H256, hash: &H256,
rpc: Option<&Arc<Web3Connection>>, rpc: Option<&Arc<Web3Connection>>,
) -> anyhow::Result<ArcBlock> { ) -> anyhow::Result<ArcBlock> {
@ -104,7 +106,7 @@ impl Web3Connections {
// TODO: if error, retry? // TODO: if error, retry?
let block: Block<TxHash> = match rpc { let block: Block<TxHash> = match rpc {
Some(rpc) => { Some(rpc) => {
rpc.wait_for_request_handle(Duration::from_secs(30)) rpc.wait_for_request_handle(authorized_request, Duration::from_secs(30))
.await? .await?
.request("eth_getBlockByHash", &get_block_params, Level::ERROR.into()) .request("eth_getBlockByHash", &get_block_params, Level::ERROR.into())
.await? .await?
@ -115,7 +117,9 @@ impl Web3Connections {
let request = json!({ "id": "1", "method": "eth_getBlockByHash", "params": get_block_params }); let request = json!({ "id": "1", "method": "eth_getBlockByHash", "params": get_block_params });
let request: JsonRpcRequest = serde_json::from_value(request)?; let request: JsonRpcRequest = serde_json::from_value(request)?;
let response = self.try_send_best_upstream_server(request, None).await?; let response = self
.try_send_best_upstream_server(authorized_request, request, None)
.await?;
let block = response.result.unwrap(); let block = response.result.unwrap();
@ -163,7 +167,8 @@ impl Web3Connections {
// deref to not keep the lock open // deref to not keep the lock open
if let Some(block_hash) = self.block_numbers.get(num) { if let Some(block_hash) = self.block_numbers.get(num) {
// TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set // TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set
return self.block(&block_hash, None).await; // TODO: pass authorized_request through here?
return self.block(None, &block_hash, None).await;
} }
// block number not in cache. we need to ask an rpc for it // block number not in cache. we need to ask an rpc for it
@ -173,7 +178,7 @@ impl Web3Connections {
// TODO: if error, retry? // TODO: if error, retry?
let response = self let response = self
.try_send_best_upstream_server(request, Some(num)) .try_send_best_upstream_server(None, request, Some(num))
.await?; .await?;
let raw_block = response.result.context("no block result")?; let raw_block = response.result.context("no block result")?;
@ -290,7 +295,7 @@ impl Web3Connections {
// this option should always be populated // this option should always be populated
let conn_rpc = self.conns.get(conn_name); let conn_rpc = self.conns.get(conn_name);
match self.block(connection_head_hash, conn_rpc).await { match self.block(None, connection_head_hash, conn_rpc).await {
Ok(block) => block, Ok(block) => block,
Err(err) => { Err(err) => {
warn!(%connection_head_hash, %conn_name, %rpc, ?err, "Failed fetching connection_head_block for block_hashes"); warn!(%connection_head_hash, %conn_name, %rpc, ?err, "Failed fetching connection_head_block for block_hashes");

View File

@ -4,6 +4,7 @@ use super::provider::Web3Provider;
use super::request::{OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult}; use super::request::{OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult};
use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::app::{flatten_handle, AnyhowJoinHandle};
use crate::config::BlockAndRpc; use crate::config::BlockAndRpc;
use crate::frontend::authorization::AuthorizedRequest;
use anyhow::Context; use anyhow::Context;
use ethers::prelude::{Block, Bytes, Middleware, ProviderError, TxHash, H256, U64}; use ethers::prelude::{Block, Bytes, Middleware, ProviderError, TxHash, H256, U64};
use futures::future::try_join_all; use futures::future::try_join_all;
@ -114,7 +115,7 @@ impl Web3Connection {
// TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error // TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
// TODO: what should the timeout be? // TODO: what should the timeout be?
let found_chain_id: Result<U64, _> = new_connection let found_chain_id: Result<U64, _> = new_connection
.wait_for_request_handle(Duration::from_secs(30)) .wait_for_request_handle(None, Duration::from_secs(30))
.await? .await?
.request("eth_chainId", &Option::None::<()>, Level::ERROR.into()) .request("eth_chainId", &Option::None::<()>, Level::ERROR.into())
.await; .await;
@ -200,7 +201,7 @@ impl Web3Connection {
// TODO: wait for the handle BEFORE we check the current block number. it might be delayed too! // TODO: wait for the handle BEFORE we check the current block number. it might be delayed too!
// TODO: what should the request be? // TODO: what should the request be?
let archive_result: Result<Bytes, _> = self let archive_result: Result<Bytes, _> = self
.wait_for_request_handle(Duration::from_secs(30)) .wait_for_request_handle(None, Duration::from_secs(30))
.await? .await?
.request( .request(
"eth_getCode", "eth_getCode",
@ -530,7 +531,7 @@ impl Web3Connection {
loop { loop {
// TODO: what should the max_wait be? // TODO: what should the max_wait be?
match self.wait_for_request_handle(Duration::from_secs(30)).await { match self.wait_for_request_handle(None, Duration::from_secs(30)).await {
Ok(active_request_handle) => { Ok(active_request_handle) => {
let block: Result<Block<TxHash>, _> = active_request_handle let block: Result<Block<TxHash>, _> = active_request_handle
.request( .request(
@ -598,7 +599,7 @@ impl Web3Connection {
Web3Provider::Ws(provider) => { Web3Provider::Ws(provider) => {
// todo: move subscribe_blocks onto the request handle? // todo: move subscribe_blocks onto the request handle?
let active_request_handle = let active_request_handle =
self.wait_for_request_handle(Duration::from_secs(30)).await; self.wait_for_request_handle(None, Duration::from_secs(30)).await;
let mut stream = provider.subscribe_blocks().await?; let mut stream = provider.subscribe_blocks().await?;
drop(active_request_handle); drop(active_request_handle);
@ -606,7 +607,7 @@ impl Web3Connection {
// there is a very small race condition here where the stream could send us a new block right now // there is a very small race condition here where the stream could send us a new block right now
// all it does is print "new block" for the same block as current block // all it does is print "new block" for the same block as current block
let block: Result<Option<ArcBlock>, _> = self let block: Result<Option<ArcBlock>, _> = self
.wait_for_request_handle(Duration::from_secs(30)) .wait_for_request_handle(None, Duration::from_secs(30))
.await? .await?
.request( .request(
"eth_getBlockByNumber", "eth_getBlockByNumber",
@ -697,7 +698,7 @@ impl Web3Connection {
Web3Provider::Ws(provider) => { Web3Provider::Ws(provider) => {
// TODO: maybe the subscribe_pending_txs function should be on the active_request_handle // TODO: maybe the subscribe_pending_txs function should be on the active_request_handle
let active_request_handle = let active_request_handle =
self.wait_for_request_handle(Duration::from_secs(30)).await; self.wait_for_request_handle(None, Duration::from_secs(30)).await;
let mut stream = provider.subscribe_pending_txs().await?; let mut stream = provider.subscribe_pending_txs().await?;
@ -727,12 +728,13 @@ impl Web3Connection {
#[instrument] #[instrument]
pub async fn wait_for_request_handle( pub async fn wait_for_request_handle(
self: &Arc<Self>, self: &Arc<Self>,
authorized_request: Option<&Arc<AuthorizedRequest>>,
max_wait: Duration, max_wait: Duration,
) -> anyhow::Result<OpenRequestHandle> { ) -> anyhow::Result<OpenRequestHandle> {
let max_wait = Instant::now() + max_wait; let max_wait = Instant::now() + max_wait;
loop { loop {
let x = self.try_request_handle().await; let x = self.try_request_handle(authorized_request.clone()).await;
trace!(?x, "try_request_handle"); trace!(?x, "try_request_handle");
@ -760,7 +762,10 @@ impl Web3Connection {
} }
#[instrument] #[instrument]
pub async fn try_request_handle(self: &Arc<Self>) -> anyhow::Result<OpenRequestResult> { pub async fn try_request_handle(
self: &Arc<Self>,
authorized_user: Option<&Arc<AuthorizedRequest>>,
) -> anyhow::Result<OpenRequestResult> {
// check that we are connected // check that we are connected
if !self.has_provider().await { if !self.has_provider().await {
// TODO: emit a stat? // TODO: emit a stat?

View File

@ -7,6 +7,7 @@ use super::request::{
use super::synced_connections::SyncedConnections; use super::synced_connections::SyncedConnections;
use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::app::{flatten_handle, AnyhowJoinHandle};
use crate::config::{BlockAndRpc, TxHashAndRpc, Web3ConnectionConfig}; use crate::config::{BlockAndRpc, TxHashAndRpc, Web3ConnectionConfig};
use crate::frontend::authorization::AuthorizedRequest;
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
use crate::rpcs::transactions::TxStatus; use crate::rpcs::transactions::TxStatus;
use arc_swap::ArcSwap; use arc_swap::ArcSwap;
@ -304,6 +305,7 @@ impl Web3Connections {
/// Send the same request to all the handles. Returning the most common success or most common error. /// Send the same request to all the handles. Returning the most common success or most common error.
pub async fn try_send_parallel_requests( pub async fn try_send_parallel_requests(
&self, &self,
authorized_request: Option<&Arc<AuthorizedRequest>>,
active_request_handles: Vec<OpenRequestHandle>, active_request_handles: Vec<OpenRequestHandle>,
method: &str, method: &str,
// TODO: remove this box once i figure out how to do the options // TODO: remove this box once i figure out how to do the options
@ -362,6 +364,7 @@ impl Web3Connections {
/// get the best available rpc server /// get the best available rpc server
pub async fn next_upstream_server( pub async fn next_upstream_server(
&self, &self,
authorized_request: Option<&Arc<AuthorizedRequest>>,
skip: &[Arc<Web3Connection>], skip: &[Arc<Web3Connection>],
min_block_needed: Option<&U64>, min_block_needed: Option<&U64>,
) -> anyhow::Result<OpenRequestResult> { ) -> anyhow::Result<OpenRequestResult> {
@ -420,7 +423,7 @@ impl Web3Connections {
// now that the rpcs are sorted, try to get an active request handle for one of them // now that the rpcs are sorted, try to get an active request handle for one of them
for rpc in synced_rpcs.into_iter() { for rpc in synced_rpcs.into_iter() {
// increment our connection counter // increment our connection counter
match rpc.try_request_handle().await { match rpc.try_request_handle(authorized_request.clone()).await {
Ok(OpenRequestResult::Handle(handle)) => { Ok(OpenRequestResult::Handle(handle)) => {
trace!("next server on {:?}: {:?}", self, rpc); trace!("next server on {:?}: {:?}", self, rpc);
return Ok(OpenRequestResult::Handle(handle)); return Ok(OpenRequestResult::Handle(handle));
@ -451,6 +454,7 @@ impl Web3Connections {
// TODO: better type on this that can return an anyhow::Result // TODO: better type on this that can return an anyhow::Result
pub async fn upstream_servers( pub async fn upstream_servers(
&self, &self,
authorized_request: Option<&Arc<AuthorizedRequest>>,
block_needed: Option<&U64>, block_needed: Option<&U64>,
) -> Result<Vec<OpenRequestHandle>, Option<Instant>> { ) -> Result<Vec<OpenRequestHandle>, Option<Instant>> {
let mut earliest_retry_at = None; let mut earliest_retry_at = None;
@ -465,7 +469,7 @@ impl Web3Connections {
} }
// check rate limits and increment our connection counter // check rate limits and increment our connection counter
match connection.try_request_handle().await { match connection.try_request_handle(authorized_request).await {
Ok(OpenRequestResult::RetryAt(retry_at)) => { Ok(OpenRequestResult::RetryAt(retry_at)) => {
// this rpc is not available. skip it // this rpc is not available. skip it
earliest_retry_at = earliest_retry_at.min(Some(retry_at)); earliest_retry_at = earliest_retry_at.min(Some(retry_at));
@ -491,6 +495,7 @@ impl Web3Connections {
/// be sure there is a timeout on this or it might loop forever /// be sure there is a timeout on this or it might loop forever
pub async fn try_send_best_upstream_server( pub async fn try_send_best_upstream_server(
&self, &self,
authorized_request: Option<&Arc<AuthorizedRequest>>,
request: JsonRpcRequest, request: JsonRpcRequest,
min_block_needed: Option<&U64>, min_block_needed: Option<&U64>,
) -> anyhow::Result<JsonRpcForwardedResponse> { ) -> anyhow::Result<JsonRpcForwardedResponse> {
@ -502,7 +507,7 @@ impl Web3Connections {
break; break;
} }
match self match self
.next_upstream_server(&skip_rpcs, min_block_needed) .next_upstream_server(authorized_request, &skip_rpcs, min_block_needed)
.await? .await?
{ {
OpenRequestResult::Handle(active_request_handle) => { OpenRequestResult::Handle(active_request_handle) => {
@ -592,17 +597,22 @@ impl Web3Connections {
#[instrument] #[instrument]
pub async fn try_send_all_upstream_servers( pub async fn try_send_all_upstream_servers(
&self, &self,
authorized_request: Option<&Arc<AuthorizedRequest>>,
request: JsonRpcRequest, request: JsonRpcRequest,
block_needed: Option<&U64>, block_needed: Option<&U64>,
) -> anyhow::Result<JsonRpcForwardedResponse> { ) -> anyhow::Result<JsonRpcForwardedResponse> {
loop { loop {
match self.upstream_servers(block_needed).await { match self
.upstream_servers(authorized_request.clone(), block_needed)
.await
{
Ok(active_request_handles) => { Ok(active_request_handles) => {
// TODO: benchmark this compared to waiting on unbounded futures // TODO: benchmark this compared to waiting on unbounded futures
// TODO: do something with this handle? // TODO: do something with this handle?
// TODO: this is not working right. simplify // TODO: this is not working right. simplify
let quorum_response = self let quorum_response = self
.try_send_parallel_requests( .try_send_parallel_requests(
authorized_request,
active_request_handles, active_request_handles,
request.method.as_ref(), request.method.as_ref(),
request.params.as_ref(), request.params.as_ref(),

View File

@ -22,9 +22,10 @@ impl Web3Connections {
pending_tx_id: TxHash, pending_tx_id: TxHash,
) -> Result<Option<TxStatus>, ProviderError> { ) -> Result<Option<TxStatus>, ProviderError> {
// TODO: there is a race here on geth. sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!) // TODO: there is a race here on geth. sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!)
// TODO: might not be a race. might be a nonce thats higher than the current account nonce. geth discards chains
// TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself // TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself
// TODO: if one rpc fails, try another? // TODO: if one rpc fails, try another?
let tx: Transaction = match rpc.try_request_handle().await { let tx: Transaction = match rpc.try_request_handle(None).await {
Ok(OpenRequestResult::Handle(handle)) => { Ok(OpenRequestResult::Handle(handle)) => {
handle handle
.request( .request(