Request refactor (#99)

* wip

* AsRef finally works like i wanted

* actually return the block

* start adding async trait

* remove stale import

* include id in the error response when possible

* remove stale comments
This commit is contained in:
Bryan Stitt 2023-05-30 16:32:34 -07:00 committed by GitHub
parent 4246342806
commit cebe8ed1ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 1413 additions and 1317 deletions

3
Cargo.lock generated

@ -6455,6 +6455,7 @@ dependencies = [
"anyhow",
"arc-swap",
"argh",
"async-trait",
"axum",
"axum-client-ip",
"axum-macros",
@ -6519,6 +6520,8 @@ dependencies = [
"toml 0.7.4",
"tower",
"tower-http",
"tracing",
"tracing-subscriber 0.3.17",
"ulid",
"url",
"uuid 1.3.3",

@ -1,13 +1,31 @@
FROM rust:1.69.0-bullseye AS builder
FROM debian:bullseye-slim as builder
WORKDIR /app
ENV CARGO_TERM_COLOR always
# install rustup dependencies
RUN apt-get update && \
apt-get install --yes \
build-essential \
curl \
git \
&& \
rm -rf /var/lib/apt/lists/*
# install rustup
ENV PATH="/root/.cargo/bin:${PATH}"
RUN --mount=type=cache,target=/usr/local/cargo/registry \
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain none
# install the correct version of rust
# we need nightly for a few features
COPY rust-toolchain.toml .
RUN /root/.cargo/bin/rustup update
# a next-generation test runner for Rust projects.
# We only pay the installation cost once,
# it will be cached from the second build onwards
# TODO: more mount type cache?
# TODO: do this in a seperate FROM and COPY it in
RUN --mount=type=cache,target=/usr/local/cargo/registry \
cargo install cargo-nextest
@ -26,6 +44,7 @@ RUN apt-get update && \
libssl-dev \
libzstd-dev \
make \
pkg-config \
&& \
rm -rf /var/lib/apt/lists/*
@ -40,7 +59,8 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \
cargo nextest run --features "$WEB3_PROXY_FEATURES" --no-default-features
# build the application
# using a "release" profile (which install does) is **very** important
# using a "release" profile (which install does by default) is **very** important
# we use the "faster_release" profile which builds with `codegen-units = 1`
RUN --mount=type=cache,target=/usr/local/cargo/registry \
--mount=type=cache,target=/app/target \
cargo install \
@ -71,3 +91,6 @@ CMD [ "--config", "/web3-proxy.toml", "proxyd" ]
ENV RUST_LOG "warn,ethers_providers::rpc=off,web3_proxy=debug,web3_proxy_cli=debug"
COPY --from=builder /usr/local/bin/* /usr/local/bin/
# make sure the app works
RUN web3_proxy_cli --version

@ -29,12 +29,10 @@ struct KQCacheWithTTLTask<Key, Qey, Val, We, B> {
}
pub struct PlaceholderGuardWithTTL<'a, Key, Qey, Val, We, B> {
name: &'a str,
cache: &'a KQCacheWithTTL<Key, Qey, Val, We, B>,
inner: PlaceholderGuard<'a, Key, Qey, Val, We, B>,
key: Key,
qey: Qey,
ttl: Duration,
tx: &'a flume::Sender<(Instant, Key, Qey)>,
}
impl<
@ -141,12 +139,10 @@ impl<
match self.cache.get_value_or_guard_async(&key, &qey).await {
Ok(x) => Ok(x),
Err(inner) => Err(PlaceholderGuardWithTTL {
name: self.name,
cache: self,
inner,
key,
qey,
ttl: self.ttl,
tx: &self.tx,
}),
}
}
@ -251,20 +247,24 @@ impl<
> PlaceholderGuardWithTTL<'a, Key, Qey, Val, We, B>
{
pub fn insert(self, val: Val) {
let expire_at = Instant::now() + self.ttl;
let expire_at = Instant::now() + self.cache.ttl;
self.inner.insert(val);
let weight = self.cache.weighter.weight(&self.key, &self.qey, &val);
if log_enabled!(log::Level::Trace) {
trace!(
"{}, {:?}, {:?} expiring in {}s",
self.name,
self.key,
self.qey,
expire_at.duration_since(Instant::now()).as_secs_f32()
);
if weight <= self.cache.max_item_weight {
self.inner.insert(val);
if log_enabled!(log::Level::Trace) {
trace!(
"{}, {:?}, {:?} expiring in {}s",
self.cache.name,
self.key,
self.qey,
expire_at.duration_since(Instant::now()).as_secs_f32()
);
}
self.cache.tx.send((expire_at, self.key, self.qey)).unwrap();
}
self.tx.send((expire_at, self.key, self.qey)).unwrap();
}
}

@ -1,2 +1,2 @@
[toolchain]
channel = "1.69.0"
channel = "nightly-2023-05-25"

@ -89,9 +89,12 @@ tokio-uring = { version = "0.4.0", optional = true }
toml = "0.7.4"
tower = "0.4.13"
tower-http = { version = "0.4.0", features = ["cors", "sensitive-headers"] }
tracing = "0.1.37"
tracing-subscriber = "0.3"
ulid = { version = "1.0.0", features = ["uuid", "serde"] }
url = "2.3.1"
uuid = "1.3.3"
async-trait = "0.1.68"
[dev-dependencies]
tokio = { version = "1.28.2", features = ["full", "test-util"] }

@ -0,0 +1,16 @@
use tracing::info;
fn main() {
// install global collector configured based on RUST_LOG env var.
tracing_subscriber::fmt::init();
let number_of_yaks = 3;
// this creates a new event, outside of any spans.
info!(number_of_yaks, "preparing to shave yaks");
let number_shaved = 3;
info!(
all_yaks_shaved = number_shaved == number_of_yaks,
"yak shaving completed."
);
}

@ -1,4 +1,3 @@
// TODO: this file is way too big now. move things into other modules
mod ws;
use crate::block_number::{block_needed, BlockNeeded};
@ -9,11 +8,12 @@ use crate::frontend::authorization::{
use crate::frontend::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::jsonrpc::{
JsonRpcErrorData, JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest,
JsonRpcRequestEnum,
JsonRpcErrorData, JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcId,
JsonRpcParams, JsonRpcRequest, JsonRpcRequestEnum, JsonRpcResultData,
};
use crate::relational_db::{get_db, get_migrated_db, DatabaseConnection, DatabaseReplica};
use crate::response_cache::{
JsonRpcResponseCache, JsonRpcResponseCacheKey, JsonRpcResponseData, JsonRpcResponseWeigher,
JsonRpcQueryCacheKey, JsonRpcResponseCache, JsonRpcResponseEnum, JsonRpcResponseWeigher,
};
use crate::rpcs::blockchain::Web3ProxyBlock;
use crate::rpcs::consensus::ConsensusWeb3Rpcs;
@ -38,18 +38,15 @@ use futures::future::join_all;
use futures::stream::{FuturesUnordered, StreamExt};
use hashbrown::{HashMap, HashSet};
use ipnet::IpNet;
use log::{debug, error, info, trace, warn, Level};
use log::{error, info, trace, warn, Level};
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{
self, ConnectionTrait, Database, DatabaseConnection, EntityTrait, PaginatorTrait,
};
use migration::sea_query::table::ColumnDef;
use migration::{Alias, DbErr, Migrator, MigratorTrait, Table};
use migration::sea_orm::{EntityTrait, PaginatorTrait};
use quick_cache_ttl::{Cache, CacheWithTTL};
use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter};
use serde::Serialize;
use serde_json::json;
use serde_json::value::RawValue;
use std::borrow::Cow;
use std::fmt;
use std::net::IpAddr;
@ -59,7 +56,7 @@ use std::sync::{atomic, Arc};
use std::time::Duration;
use tokio::sync::{broadcast, watch, Semaphore};
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout};
use tokio::time::timeout;
// TODO: make this customizable?
// TODO: include GIT_REF in here. i had trouble getting https://docs.rs/vergen/latest/vergen/ to work with a workspace. also .git is in .dockerignore
@ -112,19 +109,6 @@ pub struct AuthorizationChecks {
pub balance: Option<Decimal>,
}
/// Simple wrapper so that we can keep track of read only connections.
/// This does no blocking of writing in the compiler!
/// TODO: move this
#[derive(Clone)]
pub struct DatabaseReplica(pub DatabaseConnection);
// TODO: I feel like we could do something smart with DeRef or AsRef or Borrow, but that wasn't working for me
impl DatabaseReplica {
pub fn conn(&self) -> &DatabaseConnection {
&self.0
}
}
/// Cache data from the database about rpc keys
pub type RpcSecretKeyCache = Arc<CacheWithTTL<RpcSecretKey, AuthorizationChecks>>;
@ -152,7 +136,7 @@ pub struct Web3ProxyApp {
/// This is the Sender so that new channels can subscribe to it
pending_tx_sender: broadcast::Sender<TxStatus>,
/// Optional database for users and accounting
pub db_conn: Option<sea_orm::DatabaseConnection>,
pub db_conn: Option<DatabaseConnection>,
/// Optional read-only database for users and accounting
pub db_replica: Option<DatabaseReplica>,
pub hostname: Option<String>,
@ -210,104 +194,6 @@ pub async fn flatten_handles<T>(
Ok(())
}
pub async fn get_db(
db_url: String,
min_connections: u32,
max_connections: u32,
) -> Result<DatabaseConnection, DbErr> {
// TODO: scrub credentials and then include the db_url in logs
info!("Connecting to db");
let mut db_opt = sea_orm::ConnectOptions::new(db_url);
// TODO: load all these options from the config file. i think mysql default max is 100
// TODO: sqlx logging only in debug. way too verbose for production
db_opt
.connect_timeout(Duration::from_secs(30))
.min_connections(min_connections)
.max_connections(max_connections)
.sqlx_logging(false);
// .sqlx_logging_level(log::LevelFilter::Info);
Database::connect(db_opt).await
}
pub async fn drop_migration_lock(db_conn: &DatabaseConnection) -> Result<(), DbErr> {
let db_backend = db_conn.get_database_backend();
let drop_lock_statment = db_backend.build(Table::drop().table(Alias::new("migration_lock")));
db_conn.execute(drop_lock_statment).await?;
debug!("migration lock unlocked");
Ok(())
}
/// Be super careful with override_existing_lock! It is very important that only one process is running the migrations at a time!
pub async fn migrate_db(
db_conn: &DatabaseConnection,
override_existing_lock: bool,
) -> Result<(), DbErr> {
let db_backend = db_conn.get_database_backend();
// TODO: put the timestamp and hostname into this as columns?
let create_lock_statment = db_backend.build(
Table::create()
.table(Alias::new("migration_lock"))
.col(ColumnDef::new(Alias::new("locked")).boolean().default(true)),
);
loop {
if Migrator::get_pending_migrations(db_conn).await?.is_empty() {
info!("no migrations to apply");
return Ok(());
}
// there are migrations to apply
// acquire a lock
if let Err(err) = db_conn.execute(create_lock_statment.clone()).await {
if override_existing_lock {
warn!("OVERRIDING EXISTING LOCK in 10 seconds! ctrl+c now if other migrations are actually running!");
sleep(Duration::from_secs(10)).await
} else {
debug!("Unable to acquire lock. if you are positive no migration is running, run \"web3_proxy_cli drop_migration_lock\". err={:?}", err);
// TODO: exponential backoff with jitter?
sleep(Duration::from_secs(1)).await;
continue;
}
}
debug!("migration lock acquired");
break;
}
let migration_result = Migrator::up(db_conn, None).await;
// drop the distributed lock
drop_migration_lock(db_conn).await?;
// return if migrations erred
migration_result
}
/// Connect to the database and run migrations
pub async fn get_migrated_db(
db_url: String,
min_connections: u32,
max_connections: u32,
) -> Result<DatabaseConnection, DbErr> {
// TODO: this seems to fail silently
let db_conn = get_db(db_url, min_connections, max_connections).await?;
migrate_db(&db_conn, false).await?;
Ok(db_conn)
}
/// starting an app creates many tasks
#[derive(From)]
pub struct Web3ProxyAppSpawn {
@ -387,7 +273,7 @@ impl Web3ProxyApp {
db_replica = if let Some(db_replica_url) = top_config.app.db_replica_url.clone() {
if db_replica_url == db_url {
// url is the same. do not make a new connection or we might go past our max connections
db_conn.clone().map(DatabaseReplica)
db_conn.clone().map(Into::into)
} else {
let db_replica_min_connections = top_config
.app
@ -406,11 +292,11 @@ impl Web3ProxyApp {
)
.await?;
Some(DatabaseReplica(db_replica))
Some(db_replica.into())
}
} else {
// just clone so that we don't need a bunch of checks all over our code
db_conn.clone().map(DatabaseReplica)
db_conn.clone().map(Into::into)
};
} else {
anyhow::ensure!(
@ -1012,6 +898,43 @@ impl Web3ProxyApp {
.expect("prometheus metrics should always serialize")
}
/// make an internal request with stats and caching
pub async fn internal_request<P: JsonRpcParams, R: JsonRpcResultData>(
self: &Arc<Self>,
method: &str,
params: P,
) -> Web3ProxyResult<R> {
let db_conn = self.db_conn();
let authorization = Arc::new(Authorization::internal(db_conn)?);
self.authorized_request(method, params, authorization).await
}
/// this is way more round-a-bout than we want, but it means stats are emitted and caches are used
/// request_with_caching
pub async fn authorized_request<P: JsonRpcParams, R: JsonRpcResultData>(
self: &Arc<Self>,
method: &str,
params: P,
authorization: Arc<Authorization>,
) -> Web3ProxyResult<R> {
let request = JsonRpcRequest::new(JsonRpcId::Number(1), method.to_string(), json!(params))?;
let (_, response, _) = self.proxy_request(request, authorization, None).await;
if let Some(result) = response.result {
let result = serde_json::from_str(result.get())?;
Ok(result)
} else if let Some(error_data) = response.error {
// TODO: this might lose the http error code
Err(Web3ProxyError::JsonRpcErrorData(error_data))
} else {
unimplemented!();
}
}
/// send the request or batch of requests to the approriate RPCs
pub async fn proxy_web3_rpc(
self: &Arc<Self>,
@ -1020,11 +943,10 @@ impl Web3ProxyApp {
) -> Web3ProxyResult<(StatusCode, JsonRpcForwardedResponseEnum, Vec<Arc<Web3Rpc>>)> {
// trace!(?request, "proxy_web3_rpc");
// TODO: use streams and buffers so we don't overwhelm our server
let response = match request {
JsonRpcRequestEnum::Single(mut request) => {
JsonRpcRequestEnum::Single(request) => {
let (status_code, response, rpcs) = self
.proxy_cached_request(&authorization, &mut request, None)
.proxy_request(request, authorization.clone(), None)
.await;
(
@ -1038,7 +960,7 @@ impl Web3ProxyApp {
.proxy_web3_rpc_requests(&authorization, requests)
.await?;
// TODO: real status code
// TODO: real status code. i don't think we are following the spec here
(
StatusCode::OK,
JsonRpcForwardedResponseEnum::Batch(responses),
@ -1055,7 +977,7 @@ impl Web3ProxyApp {
async fn proxy_web3_rpc_requests(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
mut requests: Vec<JsonRpcRequest>,
requests: Vec<JsonRpcRequest>,
) -> Web3ProxyResult<(Vec<JsonRpcForwardedResponse>, Vec<Arc<Web3Rpc>>)> {
// TODO: we should probably change ethers-rs to support this directly. they pushed this off to v2 though
let num_requests = requests.len();
@ -1067,11 +989,12 @@ impl Web3ProxyApp {
.head_block_num()
.ok_or(Web3ProxyError::NoServersSynced)?;
// TODO: use streams and buffers so we don't overwhelm our server
let responses = join_all(
requests
.iter_mut()
.into_iter()
.map(|request| {
self.proxy_cached_request(authorization, request, Some(head_block_num))
self.proxy_request(request, authorization.clone(), Some(head_block_num))
})
.collect::<Vec<_>>(),
)
@ -1123,32 +1046,32 @@ impl Web3ProxyApp {
/// try to send transactions to the best available rpcs with protected/private mempools
/// if no protected rpcs are configured, then some public rpcs are used instead
async fn try_send_protected(
async fn try_send_protected<P: JsonRpcParams>(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
request: &JsonRpcRequest,
request_metadata: Arc<RequestMetadata>,
) -> Web3ProxyResult<JsonRpcResponseData> {
method: &str,
params: &P,
request_metadata: &Arc<RequestMetadata>,
) -> Web3ProxyResult<Box<RawValue>> {
if let Some(protected_rpcs) = self.private_rpcs.as_ref() {
if !protected_rpcs.is_empty() {
let protected_response = protected_rpcs
.try_send_all_synced_connections(
authorization,
request,
method,
params,
Some(request_metadata),
None,
None,
Level::Trace,
Some(Level::Trace.into()),
None,
true,
)
.await?;
.await;
return Ok(protected_response);
return protected_response;
}
}
let num_public_rpcs = match authorization.checks.proxy_mode {
let num_public_rpcs = match request_metadata.proxy_mode() {
// TODO: how many balanced rpcs should we send to? configurable? percentage of total?
ProxyMode::Best | ProxyMode::Debug => Some(4),
ProxyMode::Fastest(0) => None,
@ -1163,47 +1086,46 @@ impl Web3ProxyApp {
// try_send_all_upstream_servers puts the request id into the response. no need to do that ourselves here.
self.balanced_rpcs
.try_send_all_synced_connections(
authorization,
request,
method,
params,
Some(request_metadata),
None,
None,
Level::Trace,
Some(Level::Trace.into()),
num_public_rpcs,
true,
)
.await
}
// TODO: more robust stats and kafka logic! if we use the try operator, they aren't saved!
// TODO: move this to another module
async fn proxy_cached_request(
///
// TODO: is this a good return type? i think the status code should be one level higher
async fn proxy_request(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
request: &mut JsonRpcRequest,
request: JsonRpcRequest,
authorization: Arc<Authorization>,
head_block_num: Option<U64>,
) -> (StatusCode, JsonRpcForwardedResponse, Vec<Arc<Web3Rpc>>) {
// TODO: move this code to another module so that its easy to turn this trace logging on in dev
trace!("Received request: {:?}", request);
// save the id so we can attach it to the response
// TODO: we don't always need to clone this. if we come from the cache, we can just take from the request
// TODO: store on the request_metadata?
let response_id = request.id.clone();
let request_metadata = RequestMetadata::new(
self,
authorization.clone(),
RequestOrMethod::Request(request),
authorization,
RequestOrMethod::Request(&request),
head_block_num.as_ref(),
)
.await;
let (status_code, response_data): (_, JsonRpcResponseData) = match self
._proxy_cached_request(authorization, request, head_block_num, &request_metadata)
let response_id = request.id;
let (code, response_data) = match self
._proxy_request_with_caching(
&request.method,
request.params,
head_block_num,
&request_metadata,
)
.await
{
Ok(x) => (StatusCode::OK, x),
Ok(response_data) => (StatusCode::OK, response_data),
Err(err) => err.into_response_parts(),
};
@ -1212,25 +1134,28 @@ impl Web3ProxyApp {
// TODO: this serializes twice :/
request_metadata.add_response(ResponseOrBytes::Response(&response));
// TODO: with parallel request sending, I think there could be a race on this
let rpcs = request_metadata.backend_rpcs_used();
(status_code, response, rpcs)
(code, response, rpcs)
}
/// main logic for proxy_cached_request but in a dedicated function so the try operator is easy to use
async fn _proxy_cached_request(
/// TODO: how can we make this generic?
async fn _proxy_request_with_caching(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
request: &mut JsonRpcRequest,
method: &str,
mut params: serde_json::Value,
head_block_num: Option<U64>,
request_metadata: &Arc<RequestMetadata>,
) -> Web3ProxyResult<JsonRpcResponseData> {
// TODO: don't clone?
let request_method = request.method.clone();
) -> Web3ProxyResult<JsonRpcResponseEnum<Box<RawValue>>> {
// TODO: don't clone into a new string?
let request_method = method.to_string();
let authorization = request_metadata.authorization.clone().unwrap_or_default();
// TODO: serve net_version without querying the backend
let response_data: JsonRpcResponseData = match request_method.as_ref() {
// TODO: don't force RawValue
let response_data: JsonRpcResponseEnum<Box<RawValue>> = match request_method.as_ref() {
// lots of commands are blocked
method @ ("db_getHex"
| "db_getString"
@ -1324,22 +1249,24 @@ impl Web3ProxyApp {
))
.into()
}
_method @ ("eth_sendUserOperation"
method @ ("eth_sendUserOperation"
| "eth_estimateUserOperationGas"
| "eth_getUserOperationByHash"
| "eth_getUserOperationReceipt"
| "eth_supportedEntryPoints") => match self.bundler_4337_rpcs.as_ref() {
Some(bundler_4337_rpcs) => {
// TODO: timeout
bundler_4337_rpcs
.try_proxy_connection(
authorization,
request,
// TODO: timeout
let x = bundler_4337_rpcs
.try_proxy_connection::<_, Box<RawValue>>(
method,
&params,
Some(request_metadata),
None,
None,
)
.await?
.await?;
x.into()
}
None => {
// TODO: stats even when we error!
@ -1347,10 +1274,10 @@ impl Web3ProxyApp {
return Err(Web3ProxyError::NoServersSynced);
}
},
"eth_accounts" => JsonRpcResponseData::from(serde_json::Value::Array(vec![])),
"eth_accounts" => JsonRpcResponseEnum::from(serde_json::Value::Array(vec![])),
"eth_blockNumber" => {
match head_block_num.or(self.balanced_rpcs.head_block_num()) {
Some(head_block_num) => JsonRpcResponseData::from(json!(head_block_num)),
Some(head_block_num) => JsonRpcResponseEnum::from(json!(head_block_num)),
None => {
// TODO: what does geth do if this happens?
// TODO: standard not synced error
@ -1358,89 +1285,88 @@ impl Web3ProxyApp {
}
}
}
"eth_chainId" => JsonRpcResponseData::from(json!(U64::from(self.config.chain_id))),
"eth_chainId" => JsonRpcResponseEnum::from(json!(U64::from(self.config.chain_id))),
// TODO: eth_callBundle (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_callbundle)
// TODO: eth_cancelPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_cancelprivatetransaction, but maybe just reject)
// TODO: eth_sendPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_sendprivatetransaction)
"eth_coinbase" => {
// no need for serving coinbase
JsonRpcResponseData::from(json!(Address::zero()))
JsonRpcResponseEnum::from(json!(Address::zero()))
}
"eth_estimateGas" => {
// TODO: timeout
let response_data = self
let mut gas_estimate = self
.balanced_rpcs
.try_proxy_connection(
authorization,
request,
.try_proxy_connection::<_, U256>(
method,
&params,
Some(request_metadata),
None,
None,
)
.await?;
if let JsonRpcResponseData::Result { value, .. } = response_data {
let mut gas_estimate: U256 = serde_json::from_str(value.get())
.or(Err(Web3ProxyError::GasEstimateNotU256))?;
let gas_increase = if let Some(gas_increase_percent) =
self.config.gas_increase_percent
{
let gas_increase = gas_estimate * gas_increase_percent / U256::from(100);
let gas_increase = if let Some(gas_increase_percent) =
self.config.gas_increase_percent
{
let gas_increase = gas_estimate * gas_increase_percent / U256::from(100);
let min_gas_increase = self.config.gas_increase_min.unwrap_or_default();
let min_gas_increase = self.config.gas_increase_min.unwrap_or_default();
gas_increase.max(min_gas_increase)
} else {
self.config.gas_increase_min.unwrap_or_default()
};
gas_estimate += gas_increase;
JsonRpcResponseData::from(json!(gas_estimate))
gas_increase.max(min_gas_increase)
} else {
response_data
}
self.config.gas_increase_min.unwrap_or_default()
};
gas_estimate += gas_increase;
// TODO: from_serializable?
JsonRpcResponseEnum::from(json!(gas_estimate))
}
"eth_getTransactionReceipt" | "eth_getTransactionByHash" => {
// try to get the transaction without specifying a min_block_height
// TODO: timeout
let mut response_data = self
.balanced_rpcs
.try_proxy_connection(
authorization,
request,
.try_proxy_connection::<_, Box<RawValue>>(
method,
&params,
Some(request_metadata),
None,
None,
)
.await?;
.await;
// if we got "null", it is probably because the tx is old. retry on nodes with old block data
if let JsonRpcResponseData::Result { value, .. } = &response_data {
if value.get() == "null" {
request_metadata
.archive_request
.store(true, atomic::Ordering::Release);
let try_archive = if let Ok(value) = &response_data {
value.get() == "null"
} else {
true
};
response_data = self
.balanced_rpcs
.try_proxy_connection(
authorization,
request,
Some(request_metadata),
Some(&U64::one()),
None,
)
.await?;
}
if try_archive {
request_metadata
.archive_request
.store(true, atomic::Ordering::Release);
response_data = self
.balanced_rpcs
.try_proxy_connection::<_, Box<RawValue>>(
method,
&params,
Some(request_metadata),
Some(&U64::one()),
None,
)
.await;
}
response_data
response_data.try_into()?
}
// TODO: eth_gasPrice that does awesome magic to predict the future
"eth_hashrate" => JsonRpcResponseData::from(json!(U64::zero())),
"eth_mining" => JsonRpcResponseData::from(serde_json::Value::Bool(false)),
"eth_hashrate" => JsonRpcResponseEnum::from(json!(U64::zero())),
"eth_mining" => JsonRpcResponseEnum::from(serde_json::Value::Bool(false)),
// TODO: eth_sendBundle (flashbots/eden command)
// broadcast transactions to all private rpcs at once
"eth_sendRawTransaction" => {
@ -1448,28 +1374,27 @@ impl Web3ProxyApp {
// TODO: error if the chain_id is incorrect
// TODO: timeout
let mut response_data = self
.try_send_protected(
authorization,
request,
request_metadata.clone(),
)
.await?;
let response = timeout(
Duration::from_secs(30),
self
.try_send_protected(
method,
&params,
request_metadata,
)
)
.await?;
let mut response = response.try_into()?;
// sometimes we get an error that the transaction is already known by our nodes,
// that's not really an error. Return the hash like a successful response would.
// TODO: move this to a helper function
if let JsonRpcResponseData::Error { value, .. } = &response_data {
if value.code == -32000
&& (value.message == "ALREADY_EXISTS: already known"
|| value.message == "INTERNAL_ERROR: existing tx with same hash")
if let JsonRpcResponseEnum::RpcError{ error_data, ..} = &response {
if error_data.code == -32000
&& (error_data.message == "ALREADY_EXISTS: already known"
|| error_data.message == "INTERNAL_ERROR: existing tx with same hash")
{
let params = request
.params
.as_mut()
.web3_context("there must be params if we got this far")?;
let params = params
.as_array()
.ok_or_else(|| {
@ -1501,7 +1426,7 @@ impl Web3ProxyApp {
trace!("tx_hash: {:#?}", tx_hash);
response_data = JsonRpcResponseData::from(tx_hash);
response = JsonRpcResponseEnum::from(tx_hash);
}
}
}
@ -1509,7 +1434,7 @@ impl Web3ProxyApp {
// emit transaction count stats
// TODO: use this cache to avoid sending duplicate transactions?
if let Some(ref salt) = self.config.public_recent_ips_salt {
if let JsonRpcResponseData::Result { value, .. } = &response_data {
if let JsonRpcResponseEnum::Result { value, .. } = &response {
let now = Utc::now().timestamp();
let app = self.clone();
@ -1544,13 +1469,13 @@ impl Web3ProxyApp {
}
}
response_data
todo!();
}
"eth_syncing" => {
// no stats on this. its cheap
// TODO: return a real response if all backends are syncing or if no servers in sync
// TODO: const
JsonRpcResponseData::from(serde_json::Value::Bool(false))
JsonRpcResponseEnum::from(serde_json::Value::Bool(false))
}
"eth_subscribe" => JsonRpcErrorData {
message: Cow::Borrowed(
@ -1568,19 +1493,19 @@ impl Web3ProxyApp {
"net_listening" => {
// TODO: only true if there are some backends on balanced_rpcs?
// TODO: const
JsonRpcResponseData::from(serde_json::Value::Bool(true))
JsonRpcResponseEnum::from(serde_json::Value::Bool(true))
}
"net_peerCount" =>
JsonRpcResponseData::from(json!(U64::from(self.balanced_rpcs.num_synced_rpcs())))
JsonRpcResponseEnum::from(json!(U64::from(self.balanced_rpcs.num_synced_rpcs())))
,
"web3_clientVersion" =>
JsonRpcResponseData::from(serde_json::Value::String(APP_USER_AGENT.to_string()))
JsonRpcResponseEnum::from(serde_json::Value::String(APP_USER_AGENT.to_string()))
,
"web3_sha3" => {
// returns Keccak-256 (not the standardized SHA3-256) of the given data.
// TODO: timeout
match &request.params {
Some(serde_json::Value::Array(params)) => {
match &params {
serde_json::Value::Array(params) => {
// TODO: make a struct and use serde conversion to clean this up
if params.len() != 1
|| !params.get(0).map(|x| x.is_string()).unwrap_or(false)
@ -1609,7 +1534,7 @@ impl Web3ProxyApp {
let hash = H256::from(keccak256(param));
JsonRpcResponseData::from(json!(hash))
JsonRpcResponseEnum::from(json!(hash))
}
}
_ => {
@ -1640,28 +1565,25 @@ impl Web3ProxyApp {
.or(self.balanced_rpcs.head_block_num())
.ok_or(Web3ProxyError::NoServersSynced)?;
// TODO: don't clone. this happens way too much. maybe &mut?
// let mut request = request.clone();
// we do this check before checking caches because it might modify the request params
// TODO: add a stat for archive vs full since they should probably cost different
// TODO: this cache key can be rather large. is that okay?
let cache_key: Option<JsonRpcResponseCacheKey> = match block_needed(
authorization,
let cache_key: Option<JsonRpcQueryCacheKey> = match block_needed(
&authorization,
method,
request.params.as_mut(),
&mut params,
head_block_num,
&self.balanced_rpcs,
)
.await?
{
BlockNeeded::CacheSuccessForever => Some(JsonRpcResponseCacheKey {
from_block: None,
to_block: None,
method: method.to_string(),
params: request.params.clone(),
cache_errors: false,
}),
BlockNeeded::CacheSuccessForever => Some(JsonRpcQueryCacheKey::new(
None,
None,
method,
&params,
false,
)),
BlockNeeded::CacheNever => None,
BlockNeeded::Cache {
block_num,
@ -1669,7 +1591,7 @@ impl Web3ProxyApp {
} => {
let (request_block_hash, block_depth) = self
.balanced_rpcs
.block_hash(authorization, &block_num)
.block_hash(&authorization, &block_num)
.await?;
if block_depth < self.config.archive_depth {
@ -1680,18 +1602,17 @@ impl Web3ProxyApp {
let request_block = self
.balanced_rpcs
.block(authorization, &request_block_hash, None)
.block(&authorization, &request_block_hash, None)
.await?
.block;
Some(JsonRpcResponseCacheKey {
from_block: Some(request_block),
to_block: None,
method: method.to_string(),
// TODO: hash here?
params: request.params.clone(),
Some(JsonRpcQueryCacheKey::new(
Some(request_block),
None,
method,
&params,
cache_errors,
})
))
}
BlockNeeded::CacheRange {
from_block_num,
@ -1700,7 +1621,7 @@ impl Web3ProxyApp {
} => {
let (from_block_hash, block_depth) = self
.balanced_rpcs
.block_hash(authorization, &from_block_num)
.block_hash(&authorization, &from_block_num)
.await?;
if block_depth < self.config.archive_depth {
@ -1711,39 +1632,38 @@ impl Web3ProxyApp {
let from_block = self
.balanced_rpcs
.block(authorization, &from_block_hash, None)
.block(&authorization, &from_block_hash, None)
.await?
.block;
let (to_block_hash, _) = self
.balanced_rpcs
.block_hash(authorization, &to_block_num)
.block_hash(&authorization, &to_block_num)
.await?;
let to_block = self
.balanced_rpcs
.block(authorization, &to_block_hash, None)
.block(&authorization, &to_block_hash, None)
.await?
.block;
Some(JsonRpcResponseCacheKey {
from_block: Some(from_block),
to_block: Some(to_block),
method: method.to_string(),
params: request.params.clone(),
Some(JsonRpcQueryCacheKey::new(
Some(from_block),
Some(to_block),
method,
&params,
cache_errors,
})
))
}
};
let authorization = authorization.clone();
// TODO: different timeouts for different user tiers
// TODO: different timeouts for different user tiers. get the duration out of the request_metadata
let duration = Duration::from_secs(240);
if let Some(cache_key) = cache_key {
let from_block_num = cache_key.from_block.as_ref().map(|x| x.number.unwrap());
let to_block_num = cache_key.to_block.as_ref().map(|x| x.number.unwrap());
let from_block_num = cache_key.from_block_num();
let to_block_num = cache_key.to_block_num();
let cache_errors = cache_key.cache_errors();
match self
.jsonrpc_response_cache
@ -1754,35 +1674,41 @@ impl Web3ProxyApp {
let response_data = timeout(
duration,
self.balanced_rpcs
.try_proxy_connection(
&authorization,
request,
.try_proxy_connection::<_, Box<RawValue>>(
method,
&params,
Some(request_metadata),
from_block_num.as_ref(),
to_block_num.as_ref(),
)
)
.await??;
.await?;
// TODO: convert the Box<RawValue> to an Arc<RawValue>
x.insert(response_data.clone());
let response_data: JsonRpcResponseEnum<Box<RawValue>> = response_data.try_into()?;
if matches!(response_data, JsonRpcResponseEnum::Result { .. }) || cache_errors {
// TODO: convert the Box<RawValue> to an Arc<RawValue>?
x.insert(response_data.clone());
}
response_data
}
}
} else {
timeout(
let x = timeout(
duration,
self.balanced_rpcs
.try_proxy_connection(
&authorization,
request,
.try_proxy_connection::<_, Box<RawValue>>(
method,
&params,
Some(request_metadata),
None,
None,
)
)
.await??
.await??;
x.into()
}
}
};

@ -5,7 +5,7 @@ use crate::frontend::authorization::{Authorization, RequestMetadata, RequestOrMe
use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult};
use crate::jsonrpc::JsonRpcForwardedResponse;
use crate::jsonrpc::JsonRpcRequest;
use crate::response_cache::JsonRpcResponseData;
use crate::response_cache::JsonRpcResponseEnum;
use crate::rpcs::transactions::TxStatus;
use axum::extract::ws::Message;
use ethers::types::U64;
@ -47,265 +47,261 @@ impl Web3ProxyApp {
// TODO: calling json! on every request is probably not fast. but we can only match against
// TODO: i think we need a stricter EthSubscribeRequest type that JsonRpcRequest can turn into
match jsonrpc_request.params.as_ref() {
Some(x) if x == &json!(["newHeads"]) => {
let head_block_receiver = self.watch_consensus_head_receiver.clone();
let app = self.clone();
if jsonrpc_request.params == json!(["newHeads"]) {
let head_block_receiver = self.watch_consensus_head_receiver.clone();
let app = self.clone();
trace!("newHeads subscription {:?}", subscription_id);
tokio::spawn(async move {
let mut head_block_receiver = Abortable::new(
WatchStream::new(head_block_receiver),
subscription_registration,
);
while let Some(new_head) = head_block_receiver.next().await {
let new_head = if let Some(new_head) = new_head {
new_head
} else {
continue;
};
let subscription_request_metadata = RequestMetadata::new(
&app,
authorization.clone(),
RequestOrMethod::Method("eth_subscribe(newHeads)", 0),
Some(new_head.number()),
)
.await;
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let response_json = json!({
"jsonrpc": "2.0",
"method":"eth_subscription",
"params": {
"subscription": subscription_id,
// TODO: option to include full transaction objects instead of just the hashes?
"result": new_head.block,
},
});
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
// we could use JsonRpcForwardedResponseEnum::num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len();
// TODO: do clients support binary messages?
// TODO: can we check a content type header?
let response_msg = Message::Text(response_str);
if response_sender.send_async(response_msg).await.is_err() {
// TODO: increment error_response? i don't think so. i think this will happen once every time a client disconnects.
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
subscription_request_metadata.add_response(response_bytes);
}
trace!("closed newHeads subscription {:?}", subscription_id);
});
}
Some(x) if x == &json!(["newPendingTransactions"]) => {
let pending_tx_receiver = self.pending_tx_sender.subscribe();
let app = self.clone();
let mut pending_tx_receiver = Abortable::new(
BroadcastStream::new(pending_tx_receiver),
trace!("newHeads subscription {:?}", subscription_id);
tokio::spawn(async move {
let mut head_block_receiver = Abortable::new(
WatchStream::new(head_block_receiver),
subscription_registration,
);
trace!(
"pending newPendingTransactions subscription id: {:?}",
subscription_id
);
while let Some(new_head) = head_block_receiver.next().await {
let new_head = if let Some(new_head) = new_head {
new_head
} else {
continue;
};
// TODO: do something with this handle?
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let subscription_request_metadata = RequestMetadata::new(
&app,
authorization.clone(),
RequestOrMethod::Method("eth_subscribe(newPendingTransactions)", 0),
None,
)
.await;
let subscription_request_metadata = RequestMetadata::new(
&app,
authorization.clone(),
RequestOrMethod::Method("eth_subscribe(newHeads)", 0),
Some(new_head.number()),
)
.await;
let new_tx = match new_tx_state {
TxStatus::Pending(tx) => tx,
TxStatus::Confirmed(..) => continue,
TxStatus::Orphaned(tx) => tx,
};
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let response_json = json!({
"jsonrpc": "2.0",
"method":"eth_subscription",
"params": {
"subscription": subscription_id,
// TODO: option to include full transaction objects instead of just the hashes?
"result": new_head.block,
},
});
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let response_json = json!({
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {
"subscription": subscription_id,
"result": new_tx.hash,
},
});
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
// we could use JsonRpcForwardedResponseEnum::num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len();
// TODO: test that this len is the same as JsonRpcForwardedResponseEnum.num_bytes()
let response_bytes = response_str.len();
// TODO: do clients support binary messages?
// TODO: can we check a content type header?
let response_msg = Message::Text(response_str);
subscription_request_metadata.add_response(response_bytes);
if response_sender.send_async(response_msg).await.is_err() {
// TODO: increment error_response? i don't think so. i think this will happen once every time a client disconnects.
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
subscription_request_metadata.add_response(response_bytes);
}
if response_sender.send_async(response_msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
}
trace!("closed newHeads subscription {:?}", subscription_id);
});
} else if jsonrpc_request.params == json!(["newPendingTransactions"]) {
let pending_tx_receiver = self.pending_tx_sender.subscribe();
let app = self.clone();
trace!(
"closed newPendingTransactions subscription: {:?}",
subscription_id
);
});
}
Some(x) if x == &json!(["newPendingFullTransactions"]) => {
// TODO: too much copy/pasta with newPendingTransactions
let pending_tx_receiver = self.pending_tx_sender.subscribe();
let app = self.clone();
let mut pending_tx_receiver = Abortable::new(
BroadcastStream::new(pending_tx_receiver),
subscription_registration,
);
let mut pending_tx_receiver = Abortable::new(
BroadcastStream::new(pending_tx_receiver),
subscription_registration,
);
trace!(
"pending newPendingTransactions subscription id: {:?}",
subscription_id
);
// TODO: do something with this handle?
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let subscription_request_metadata = RequestMetadata::new(
&app,
authorization.clone(),
RequestOrMethod::Method("eth_subscribe(newPendingTransactions)", 0),
None,
)
.await;
let new_tx = match new_tx_state {
TxStatus::Pending(tx) => tx,
TxStatus::Confirmed(..) => continue,
TxStatus::Orphaned(tx) => tx,
};
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let response_json = json!({
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {
"subscription": subscription_id,
"result": new_tx.hash,
},
});
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
// TODO: test that this len is the same as JsonRpcForwardedResponseEnum.num_bytes()
let response_bytes = response_str.len();
subscription_request_metadata.add_response(response_bytes);
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
if response_sender.send_async(response_msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
}
trace!(
"pending newPendingFullTransactions subscription: {:?}",
"closed newPendingTransactions subscription: {:?}",
subscription_id
);
});
} else if jsonrpc_request.params == json!(["newPendingFullTransactions"]) {
// TODO: too much copy/pasta with newPendingTransactions
let pending_tx_receiver = self.pending_tx_sender.subscribe();
let app = self.clone();
// TODO: do something with this handle?
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let subscription_request_metadata = RequestMetadata::new(
&app,
authorization.clone(),
RequestOrMethod::Method("eth_subscribe(newPendingFullTransactions)", 0),
None,
)
.await;
let mut pending_tx_receiver = Abortable::new(
BroadcastStream::new(pending_tx_receiver),
subscription_registration,
);
let new_tx = match new_tx_state {
TxStatus::Pending(tx) => tx,
TxStatus::Confirmed(..) => continue,
TxStatus::Orphaned(tx) => tx,
};
trace!(
"pending newPendingFullTransactions subscription: {:?}",
subscription_id
);
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let response_json = json!({
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {
"subscription": subscription_id,
// upstream just sends the txid, but we want to send the whole transaction
"result": new_tx,
},
});
// TODO: do something with this handle?
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let subscription_request_metadata = RequestMetadata::new(
&app,
authorization.clone(),
RequestOrMethod::Method("eth_subscribe(newPendingFullTransactions)", 0),
None,
)
.await;
subscription_request_metadata.add_response(&response_json);
let new_tx = match new_tx_state {
TxStatus::Pending(tx) => tx,
TxStatus::Confirmed(..) => continue,
TxStatus::Orphaned(tx) => tx,
};
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let response_json = json!({
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {
"subscription": subscription_id,
// upstream just sends the txid, but we want to send the whole transaction
"result": new_tx,
},
});
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
subscription_request_metadata.add_response(&response_json);
if response_sender.send_async(response_msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
}
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
trace!(
"closed newPendingFullTransactions subscription: {:?}",
subscription_id
);
});
}
Some(x) if x == &json!(["newPendingRawTransactions"]) => {
// TODO: too much copy/pasta with newPendingTransactions
let pending_tx_receiver = self.pending_tx_sender.subscribe();
let app = self.clone();
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
let mut pending_tx_receiver = Abortable::new(
BroadcastStream::new(pending_tx_receiver),
subscription_registration,
);
if response_sender.send_async(response_msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
}
trace!(
"pending transactions subscription id: {:?}",
"closed newPendingFullTransactions subscription: {:?}",
subscription_id
);
});
} else if jsonrpc_request.params == json!(["newPendingRawTransactions"]) {
// TODO: too much copy/pasta with newPendingTransactions
let pending_tx_receiver = self.pending_tx_sender.subscribe();
let app = self.clone();
// TODO: do something with this handle?
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let subscription_request_metadata = RequestMetadata::new(
&app,
authorization.clone(),
"eth_subscribe(newPendingRawTransactions)",
None,
)
.await;
let mut pending_tx_receiver = Abortable::new(
BroadcastStream::new(pending_tx_receiver),
subscription_registration,
);
let new_tx = match new_tx_state {
TxStatus::Pending(tx) => tx,
TxStatus::Confirmed(..) => continue,
TxStatus::Orphaned(tx) => tx,
};
trace!(
"pending transactions subscription id: {:?}",
subscription_id
);
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let response_json = json!({
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {
"subscription": subscription_id,
// upstream just sends the txid, but we want to send the raw transaction
"result": new_tx.rlp(),
},
});
// TODO: do something with this handle?
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let subscription_request_metadata = RequestMetadata::new(
&app,
authorization.clone(),
"eth_subscribe(newPendingRawTransactions)",
None,
)
.await;
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
let new_tx = match new_tx_state {
TxStatus::Pending(tx) => tx,
TxStatus::Confirmed(..) => continue,
TxStatus::Orphaned(tx) => tx,
};
// we could use response.num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len();
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let response_json = json!({
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {
"subscription": subscription_id,
// upstream just sends the txid, but we want to send the raw transaction
"result": new_tx.rlp(),
},
});
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
let response_str = serde_json::to_string(&response_json)
.expect("this should always be valid json");
if response_sender.send_async(response_msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
// we could use response.num_bytes() here, but since we already have the string, this is easier
let response_bytes = response_str.len();
subscription_request_metadata.add_response(response_bytes);
}
// TODO: do clients support binary messages?
let response_msg = Message::Text(response_str);
trace!(
"closed newPendingRawTransactions subscription: {:?}",
subscription_id
);
});
}
_ => return Err(Web3ProxyError::NotImplemented),
if response_sender.send_async(response_msg).await.is_err() {
// TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle?
break;
};
subscription_request_metadata.add_response(response_bytes);
}
trace!(
"closed newPendingRawTransactions subscription: {:?}",
subscription_id
);
});
} else {
return Err(Web3ProxyError::NotImplemented);
}
// TODO: do something with subscription_join_handle?
let response_data = JsonRpcResponseData::from(json!(subscription_id));
let response_data = JsonRpcResponseEnum::from(json!(subscription_id));
let response = JsonRpcForwardedResponse::from_response_data(response_data, id);

@ -1,6 +1,6 @@
use argh::FromArgs;
use migration::sea_orm::DatabaseConnection;
use web3_proxy::app::{drop_migration_lock, migrate_db};
use web3_proxy::relational_db::{drop_migration_lock, migrate_db};
#[derive(FromArgs, PartialEq, Debug, Eq)]
/// In case of emergency, break glass.

@ -34,8 +34,9 @@ use std::{
use tokio::runtime;
use web3_proxy::pagerduty::panic_handler;
use web3_proxy::{
app::{get_db, get_migrated_db, APP_USER_AGENT},
app::APP_USER_AGENT,
config::TopConfig,
relational_db::{get_db, get_migrated_db},
};
#[cfg(feature = "mimalloc")]

@ -10,7 +10,7 @@ use rdkafka::{
};
use std::num::NonZeroU64;
use uuid::Uuid;
use web3_proxy::{app::get_db, config::TopConfig, frontend::authorization::RpcSecretKey};
use web3_proxy::{config::TopConfig, frontend::authorization::RpcSecretKey, relational_db::get_db};
/// Second subcommand.
#[derive(FromArgs, PartialEq, Debug, Eq)]

@ -124,7 +124,7 @@ pub enum BlockNeeded {
pub async fn block_needed(
authorization: &Arc<Authorization>,
method: &str,
params: Option<&mut serde_json::Value>,
params: &mut serde_json::Value,
head_block_num: U64,
rpcs: &Web3Rpcs,
) -> Web3ProxyResult<BlockNeeded> {
@ -134,18 +134,13 @@ pub async fn block_needed(
return Ok(BlockNeeded::CacheNever);
}
let params = if let Some(params) = params {
// grab the params so we can inspect and potentially modify them
params
} else {
// if no params, no block is needed
// TODO: check all the methods with no params, some might not be cacheable
// caching with the head block /should/ always be okay
if matches!(params, serde_json::Value::Null) {
// no params given
return Ok(BlockNeeded::Cache {
block_num: head_block_num,
cache_errors: true,
});
};
}
// get the index for the BlockNumber
// The BlockNumber is usually the last element.

@ -126,13 +126,6 @@ pub async fn admin_increase_balance(
.await?
.context("User does not have a balance row")?;
// Finally make the user premium if balance is above 10$
let premium_user_tier = user_tier::Entity::find()
.filter(user_tier::Column::Title.eq("Premium"))
.one(&db_conn)
.await?
.context("Premium tier was not found!")?;
let balance_entry = balance_entry.into_active_model();
balance::Entity::insert(balance_entry)
.on_conflict(
@ -281,7 +274,7 @@ pub async fn admin_login_get(
// TODO: Only get the id, not the whole user object ...
let user = user::Entity::find()
.filter(user::Column::Address.eq(user_address))
.one(db_replica.conn())
.one(db_replica.as_ref())
.await?
.ok_or(Web3ProxyError::BadRequest(
"Could not find user in db".to_string(),
@ -292,7 +285,7 @@ pub async fn admin_login_get(
info!("Encoded admin address is: {:?}", admin_address);
let admin = user::Entity::find()
.filter(user::Column::Address.eq(admin_address))
.one(db_replica.conn())
.one(db_replica.as_ref())
.await?
.ok_or(Web3ProxyError::BadRequest(
"Could not find admin in db".to_string(),
@ -409,7 +402,7 @@ pub async fn admin_login_post(
// TODO: Here we will need to re-find the parameter where the admin wants to log-in as the user ...
let user_pending_login = pending_login::Entity::find()
.filter(pending_login::Column::Nonce.eq(login_nonce_uuid))
.one(db_replica.conn())
.one(db_replica.as_ref())
.await
.web3_context("database error while finding pending_login")?
.web3_context("login nonce not found")?;
@ -461,13 +454,13 @@ pub async fn admin_login_post(
// TODO: Right now this loads the whole admin. I assume we might want to load the user though (?) figure this out as we go along...
let admin = user::Entity::find()
.filter(user::Column::Address.eq(our_msg.address.as_ref()))
.one(db_replica.conn())
.one(db_replica.as_ref())
.await?
.web3_context("getting admin address")?;
let imitating_user = user::Entity::find()
.filter(user::Column::Id.eq(imitating_user_id))
.one(db_replica.conn())
.one(db_replica.as_ref())
.await?
.web3_context("admin address was not found!")?;
@ -491,7 +484,7 @@ pub async fn admin_login_post(
// the user is already registered
let admin_rpc_key = rpc_key::Entity::find()
.filter(rpc_key::Column::UserId.eq(admin.id))
.all(db_replica.conn())
.all(db_replica.as_ref())
.await
.web3_context("failed loading user's key")?;

@ -72,7 +72,6 @@ pub enum AuthorizationType {
#[derive(Clone, Debug)]
pub struct Authorization {
pub checks: AuthorizationChecks,
// TODO: instead of the conn, have a channel?
pub db_conn: Option<DatabaseConnection>,
pub ip: IpAddr,
pub origin: Option<Origin>,
@ -283,6 +282,12 @@ pub struct RequestMetadata {
pub stat_sender: Option<flume::Sender<AppStat>>,
}
impl Default for Authorization {
fn default() -> Self {
Authorization::internal(None).unwrap()
}
}
impl Default for RequestMetadata {
fn default() -> Self {
Self {
@ -305,6 +310,15 @@ impl Default for RequestMetadata {
}
}
impl RequestMetadata {
pub fn proxy_mode(&self) -> ProxyMode {
self.authorization
.as_ref()
.map(|x| x.checks.proxy_mode)
.unwrap_or_default()
}
}
#[derive(From)]
pub enum RequestOrMethod<'a> {
Request(&'a JsonRpcRequest),
@ -961,7 +975,7 @@ impl Web3ProxyApp {
let user = user::Entity::find()
.left_join(login::Entity)
.filter(login::Column::BearerToken.eq(user_bearer_uuid))
.one(db_replica.conn())
.one(db_replica.as_ref())
.await
.web3_context("fetching user from db by bearer token")?
.web3_context("unknown bearer token")?;
@ -1104,27 +1118,27 @@ impl Web3ProxyApp {
match rpc_key::Entity::find()
.filter(rpc_key::Column::SecretKey.eq(<Uuid>::from(rpc_secret_key)))
.filter(rpc_key::Column::Active.eq(true))
.one(db_replica.conn())
.one(db_replica.as_ref())
.await?
{
Some(rpc_key_model) => {
// TODO: move these splits into helper functions
// TODO: can we have sea orm handle this for us?
let user_model = user::Entity::find_by_id(rpc_key_model.user_id)
.one(db_replica.conn())
.one(db_replica.as_ref())
.await?
.context("no related user")?;
let balance = balance::Entity::find()
.filter(balance::Column::UserId.eq(user_model.id))
.one(db_replica.conn())
.one(db_replica.as_ref())
.await?
.map(|x| x.available_balance)
.unwrap_or_default();
let user_tier_model =
user_tier::Entity::find_by_id(user_model.user_tier_id)
.one(db_replica.conn())
.one(db_replica.as_ref())
.await?
.context("no related user tier")?;

@ -2,7 +2,7 @@
use super::authorization::Authorization;
use crate::jsonrpc::{JsonRpcErrorData, JsonRpcForwardedResponse};
use crate::response_cache::JsonRpcResponseData;
use crate::response_cache::JsonRpcResponseEnum;
use std::error::Error;
use std::{borrow::Cow, net::IpAddr};
@ -20,6 +20,8 @@ use log::{debug, error, info, trace, warn};
use migration::sea_orm::DbErr;
use redis_rate_limiter::redis::RedisError;
use reqwest::header::ToStrError;
use serde::Serialize;
use serde_json::value::RawValue;
use tokio::{sync::AcquireError, task::JoinError, time::Instant};
pub type Web3ProxyResult<T> = Result<T, Web3ProxyError>;
@ -78,6 +80,9 @@ pub enum Web3ProxyError {
JoinError(JoinError),
#[display(fmt = "{:?}", _0)]
#[error(ignore)]
JsonRpcErrorData(JsonRpcErrorData),
#[display(fmt = "{:?}", _0)]
#[error(ignore)]
MsgPackEncode(rmp_serde::encode::Error),
NoBlockNumberOrHash,
NoBlocksKnown,
@ -143,7 +148,7 @@ pub enum Web3ProxyError {
}
impl Web3ProxyError {
pub fn into_response_parts(self) -> (StatusCode, JsonRpcResponseData) {
pub fn into_response_parts<R: Serialize>(self) -> (StatusCode, JsonRpcResponseEnum<R>) {
// TODO: include a unique request id in the data
let (code, err): (StatusCode, JsonRpcErrorData) = match self {
Self::AccessDenied => {
@ -478,6 +483,7 @@ impl Web3ProxyError {
},
)
}
Self::JsonRpcErrorData(jsonrpc_error_data) => (StatusCode::OK, jsonrpc_error_data),
Self::MsgPackEncode(err) => {
warn!("MsgPackEncode Error: {}", err);
(
@ -931,7 +937,15 @@ impl Web3ProxyError {
},
};
(code, JsonRpcResponseData::from(err))
(code, JsonRpcResponseEnum::from(err))
}
pub fn into_response_with_id(self, id: Box<RawValue>) -> Response {
let (status_code, response_data) = self.into_response_parts();
let response = JsonRpcForwardedResponse::from_response_data(response_data, id);
(status_code, Json(response)).into_response()
}
}
@ -948,20 +962,13 @@ impl From<tokio::time::error::Elapsed> for Web3ProxyError {
}
impl IntoResponse for Web3ProxyError {
#[inline]
fn into_response(self) -> Response {
// TODO: include the request id in these so that users can give us something that will point to logs
// TODO: status code is in the jsonrpc response and is also the first item in the tuple. DRY
let (status_code, response_data) = self.into_response_parts();
// this will be missing the jsonrpc id!
// its better to get request id and call from_response_data with it then to use this IntoResponse helper.
let response =
JsonRpcForwardedResponse::from_response_data(response_data, Default::default());
(status_code, Json(response)).into_response()
self.into_response_with_id(Default::default())
}
}
#[inline]
pub async fn handler_404() -> Response {
Web3ProxyError::NotFound.into_response()
}

@ -1,11 +1,12 @@
//! Take a user's HTTP JSON-RPC requests and either respond from local data or proxy the request to a backend rpc server.
use super::authorization::{ip_is_authorized, key_is_authorized};
use super::errors::Web3ProxyResponse;
use super::errors::Web3ProxyError;
use super::rpc_proxy_ws::ProxyMode;
use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum};
use axum::extract::Path;
use axum::headers::{Origin, Referer, UserAgent};
use axum::response::Response;
use axum::TypedHeader;
use axum::{response::IntoResponse, Extension, Json};
use axum_client_ip::InsecureClientIp;
@ -22,7 +23,7 @@ pub async fn proxy_web3_rpc(
ip: InsecureClientIp,
origin: Option<TypedHeader<Origin>>,
Json(payload): Json<JsonRpcRequestEnum>,
) -> Web3ProxyResponse {
) -> Result<Response, Response> {
_proxy_web3_rpc(app, ip, origin, payload, ProxyMode::Best).await
}
@ -32,7 +33,7 @@ pub async fn fastest_proxy_web3_rpc(
ip: InsecureClientIp,
origin: Option<TypedHeader<Origin>>,
Json(payload): Json<JsonRpcRequestEnum>,
) -> Web3ProxyResponse {
) -> Result<Response, Response> {
// TODO: read the fastest number from params
// TODO: check that the app allows this without authentication
_proxy_web3_rpc(app, ip, origin, payload, ProxyMode::Fastest(0)).await
@ -44,7 +45,7 @@ pub async fn versus_proxy_web3_rpc(
ip: InsecureClientIp,
origin: Option<TypedHeader<Origin>>,
Json(payload): Json<JsonRpcRequestEnum>,
) -> Web3ProxyResponse {
) -> Result<Response, Response> {
_proxy_web3_rpc(app, ip, origin, payload, ProxyMode::Versus).await
}
@ -54,12 +55,16 @@ async fn _proxy_web3_rpc(
origin: Option<TypedHeader<Origin>>,
payload: JsonRpcRequestEnum,
proxy_mode: ProxyMode,
) -> Web3ProxyResponse {
) -> Result<Response, Response> {
// TODO: benchmark spawning this
// TODO: do we care about keeping the TypedHeader wrapper?
let origin = origin.map(|x| x.0);
let (authorization, semaphore) = ip_is_authorized(&app, ip, origin, proxy_mode).await?;
let first_id = payload.first_id().map_err(|e| e.into_response())?;
let (authorization, semaphore) = ip_is_authorized(&app, ip, origin, proxy_mode)
.await
.map_err(|e| e.into_response_with_id(first_id.to_owned()))?;
let authorization = Arc::new(authorization);
@ -68,7 +73,8 @@ async fn _proxy_web3_rpc(
let (status_code, response, rpcs, _semaphore) = app
.proxy_web3_rpc(authorization, payload)
.await
.map(|(s, x, y)| (s, x, y, semaphore))?;
.map(|(s, x, y)| (s, x, y, semaphore))
.map_err(|e| e.into_response_with_id(first_id.to_owned()))?;
let mut response = (status_code, Json(response)).into_response();
@ -117,7 +123,7 @@ pub async fn proxy_web3_rpc_with_key(
user_agent: Option<TypedHeader<UserAgent>>,
Path(rpc_key): Path<String>,
Json(payload): Json<JsonRpcRequestEnum>,
) -> Web3ProxyResponse {
) -> Result<Response, Response> {
_proxy_web3_rpc_with_key(
app,
ip,
@ -142,7 +148,7 @@ pub async fn debug_proxy_web3_rpc_with_key(
user_agent: Option<TypedHeader<UserAgent>>,
Path(rpc_key): Path<String>,
Json(payload): Json<JsonRpcRequestEnum>,
) -> Web3ProxyResponse {
) -> Result<Response, Response> {
_proxy_web3_rpc_with_key(
app,
ip,
@ -165,7 +171,7 @@ pub async fn fastest_proxy_web3_rpc_with_key(
user_agent: Option<TypedHeader<UserAgent>>,
Path(rpc_key): Path<String>,
Json(payload): Json<JsonRpcRequestEnum>,
) -> Web3ProxyResponse {
) -> Result<Response, Response> {
_proxy_web3_rpc_with_key(
app,
ip,
@ -188,7 +194,7 @@ pub async fn versus_proxy_web3_rpc_with_key(
user_agent: Option<TypedHeader<UserAgent>>,
Path(rpc_key): Path<String>,
Json(payload): Json<JsonRpcRequestEnum>,
) -> Web3ProxyResponse {
) -> Result<Response, Response> {
_proxy_web3_rpc_with_key(
app,
ip,
@ -212,10 +218,14 @@ async fn _proxy_web3_rpc_with_key(
rpc_key: String,
payload: JsonRpcRequestEnum,
proxy_mode: ProxyMode,
) -> Web3ProxyResponse {
) -> Result<Response, Response> {
// TODO: DRY w/ proxy_web3_rpc
// the request can take a while, so we spawn so that we can start serving another request
let rpc_key = rpc_key.parse()?;
let first_id = payload.first_id().map_err(|e| e.into_response())?;
let rpc_key = rpc_key
.parse()
.map_err(|e: Web3ProxyError| e.into_response_with_id(first_id.to_owned()))?;
let (authorization, semaphore) = key_is_authorized(
&app,
@ -226,7 +236,8 @@ async fn _proxy_web3_rpc_with_key(
referer.map(|x| x.0),
user_agent.map(|x| x.0),
)
.await?;
.await
.map_err(|e| e.into_response_with_id(first_id.to_owned()))?;
let authorization = Arc::new(authorization);
@ -235,7 +246,8 @@ async fn _proxy_web3_rpc_with_key(
let (status_code, response, rpcs, _semaphore) = app
.proxy_web3_rpc(authorization, payload)
.await
.map(|(s, x, y)| (s, x, y, semaphore))?;
.map(|(s, x, y)| (s, x, y, semaphore))
.map_err(|e| e.into_response_with_id(first_id.to_owned()))?;
let mut response = (status_code, Json(response)).into_response();

@ -389,47 +389,41 @@ async fn handle_socket_payload(
#[derive(serde::Deserialize)]
struct EthUnsubscribeParams([U64; 1]);
if let Some(params) = json_request.params {
match serde_json::from_value(params) {
Ok::<EthUnsubscribeParams, _>(params) => {
let subscription_id = &params.0[0];
match serde_json::from_value(json_request.params) {
Ok::<EthUnsubscribeParams, _>(params) => {
let subscription_id = &params.0[0];
// TODO: is this the right response?
let partial_response = {
let mut x = subscriptions.write().await;
match x.remove(subscription_id) {
None => false,
Some(handle) => {
handle.abort();
true
}
// TODO: is this the right response?
let partial_response = {
let mut x = subscriptions.write().await;
match x.remove(subscription_id) {
None => false,
Some(handle) => {
handle.abort();
true
}
};
}
};
// TODO: don't create the response here. use a JsonRpcResponseData instead
let response = JsonRpcForwardedResponse::from_value(
json!(partial_response),
response_id.clone(),
);
// TODO: don't create the response here. use a JsonRpcResponseData instead
let response = JsonRpcForwardedResponse::from_value(
json!(partial_response),
response_id.clone(),
);
request_metadata.add_response(&response);
request_metadata.add_response(&response);
Ok(response.into())
}
Err(err) => Err(Web3ProxyError::BadRequest(f!(
"incorrect params given for eth_unsubscribe. {err:?}"
))),
Ok(response.into())
}
} else {
Err(Web3ProxyError::BadRequest(
"no params given for eth_unsubscribe".to_string(),
))
Err(err) => Err(Web3ProxyError::BadRequest(f!(
"incorrect params given for eth_unsubscribe. {err:?}"
))),
}
}
_ => app
.proxy_web3_rpc(authorization.clone(), json_request.into())
.await
.map(|(status_code, response, _)| response),
.map(|(_, response, _)| response),
};
(response_id, response)

@ -196,7 +196,7 @@ pub async fn user_login_post(
let user_pending_login = pending_login::Entity::find()
.filter(pending_login::Column::Nonce.eq(login_nonce_uuid))
.one(db_replica.conn())
.one(db_replica.as_ref())
.await
.web3_context("database error while finding pending_login")?
.web3_context("login nonce not found")?;
@ -244,7 +244,7 @@ pub async fn user_login_post(
// TODO: limit columns or load whole user?
let caller = user::Entity::find()
.filter(user::Column::Address.eq(our_msg.address.as_ref()))
.one(db_replica.conn())
.one(db_replica.as_ref())
.await?;
let db_conn = app.db_conn().web3_context("login requires a db")?;
@ -319,7 +319,7 @@ pub async fn user_login_post(
warn!("Using register referral code: {:?}", referral_code);
let user_referrer = referrer::Entity::find()
.filter(referrer::Column::ReferralCode.eq(referral_code))
.one(db_replica.conn())
.one(db_replica.as_ref())
.await?
.ok_or(Web3ProxyError::UnknownReferralCode)?;
@ -350,7 +350,7 @@ pub async fn user_login_post(
warn!("Using referral code: {:?}", referral_code);
let user_referrer = referrer::Entity::find()
.filter(referrer::Column::ReferralCode.eq(referral_code))
.one(db_replica.conn())
.one(db_replica.as_ref())
.await?
.ok_or(Web3ProxyError::BadRequest(format!(
"The referral_link you provided does not exist {}",
@ -375,7 +375,7 @@ pub async fn user_login_post(
// the user is already registered
let user_rpc_keys = rpc_key::Entity::find()
.filter(rpc_key::Column::UserId.eq(caller.id))
.all(db_replica.conn())
.all(db_replica.as_ref())
.await
.web3_context("failed loading user's key")?;

@ -1,7 +1,5 @@
use crate::app::Web3ProxyApp;
use crate::frontend::authorization::Authorization as InternalAuthorization;
use crate::frontend::errors::{Web3ProxyError, Web3ProxyResponse};
use crate::rpcs::request::OpenRequestResult;
use anyhow::{anyhow, Context};
use axum::{
extract::Path,
@ -12,23 +10,43 @@ use axum::{
use axum_macros::debug_handler;
use entities::{balance, increase_on_chain_balance_receipt, user, user_tier};
use ethers::abi::{AbiEncode, ParamType};
use ethers::prelude::abigen;
use ethers::types::{Address, TransactionReceipt, H256, U256};
use ethers::utils::{hex, keccak256};
use hashbrown::HashMap;
use hex_fmt::HexFmt;
use http::StatusCode;
use log::{debug, info, warn, Level};
use migration::sea_orm;
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::ActiveModelTrait;
// use http::StatusCode;
use log::{debug, info, trace, warn};
// use migration::sea_orm;
// use migration::sea_orm::prelude::Decimal;
// use migration::sea_orm::ActiveModelTrait;
use migration::sea_orm::ColumnTrait;
use migration::sea_orm::EntityTrait;
use migration::sea_orm::IntoActiveModel;
// use migration::sea_orm::IntoActiveModel;
use migration::sea_orm::QueryFilter;
use migration::sea_orm::TransactionTrait;
// use migration::sea_orm::TransactionTrait;
use serde_json::json;
use std::sync::Arc;
abigen!(
IERC20,
r#"[
event Transfer(address indexed from, address indexed to, uint256 value)
event Approval(address indexed owner, address indexed spender, uint256 value)
]"#,
);
abigen!(
PaymentFactory,
r#"[
event PaymentReceived(address indexed account, address token, uint256 amount)
]"#,
);
abigen!(
PaymentSweeper,
r#"[
]"#,
);
/// Implements any logic related to payments
/// Removed this mainly from "user" as this was getting clogged
///
@ -46,20 +64,16 @@ pub async fn user_balance_get(
let db_replica = app.db_replica().context("Getting database connection")?;
// Just return the balance for the user
let user_balance = match balance::Entity::find()
let user_balance = balance::Entity::find()
.filter(balance::Column::UserId.eq(_user.id))
.one(db_replica.conn())
.one(db_replica.as_ref())
.await?
{
Some(x) => x.available_balance,
None => Decimal::from(0), // That means the user has no balance as of yet
// (user exists, but balance entry does not exist)
// In that case add this guy here
// Err(FrontendErrorResponse::BadRequest("User not found!"))
};
.map(|x| x.available_balance)
.unwrap_or_default();
let mut response = HashMap::new();
response.insert("balance", json!(user_balance));
let response = json!({
"balance": user_balance,
});
// TODO: Gotta create a new table for the spend part
Ok(Json(response).into_response())
@ -80,7 +94,7 @@ pub async fn user_deposits_get(
// Filter by user ...
let receipts = increase_on_chain_balance_receipt::Entity::find()
.filter(increase_on_chain_balance_receipt::Column::DepositToUserId.eq(user.id))
.all(db_replica.conn())
.all(db_replica.as_ref())
.await?;
// Return the response, all except the user ...
@ -115,7 +129,8 @@ pub async fn user_balance_post(
Path(mut params): Path<HashMap<String, String>>,
) -> Web3ProxyResponse {
// I suppose this is ok / good, so people don't spam this endpoint as it is not "cheap"
// Check that the user is logged-in and authorized. We don't need a semaphore here btw
// Check that the user is logged-in and authorized
// The semaphore keeps a user from submitting tons of transactions in parallel which would DOS our backends
let (_, _semaphore) = app.bearer_is_authorized(bearer).await?;
// Get the transaction hash, and the amount that the user wants to top up by.
@ -130,13 +145,14 @@ pub async fn user_balance_post(
.context("unable to parse tx_hash")?;
let db_conn = app.db_conn().context("query_user_stats needs a db")?;
let db_replica = app
.db_replica()
.context("query_user_stats needs a db replica")?;
// let db_replica = app
// .db_replica()
// .context("query_user_stats needs a db replica")?;
// Return straight false if the tx was already added ...
// TODO: TxHash being string
let receipt = increase_on_chain_balance_receipt::Entity::find()
.filter(increase_on_chain_balance_receipt::Column::TxHash.eq(hex::encode(tx_hash)))
.filter(increase_on_chain_balance_receipt::Column::TxHash.eq(tx_hash.encode_hex()))
.one(&db_conn)
.await?;
if receipt.is_some() {
@ -148,151 +164,31 @@ pub async fn user_balance_post(
// Iterate through all logs, and add them to the transaction list if there is any
// Address will be hardcoded in the config
let authorization = Arc::new(InternalAuthorization::internal(None).unwrap());
let transaction_receipt: TransactionReceipt = app
.internal_request("eth_getTransactionReceipt", (tx_hash,))
.await?;
// Just make an rpc request, idk if i need to call this super extensive code
let transaction_receipt: TransactionReceipt = match app
.balanced_rpcs
.wait_for_best_rpc(&authorization, None, &mut vec![], None, None, None)
.await
{
Ok(OpenRequestResult::Handle(handle)) => {
debug!(
"Params are: {:?}",
&vec![format!("0x{}", hex::encode(tx_hash))]
);
handle
.request(
"eth_getTransactionReceipt",
&vec![format!("0x{}", hex::encode(tx_hash))],
Level::Trace.into(),
)
.await
// TODO: What kind of error would be here
.map_err(|err| Web3ProxyError::Anyhow(err.into()))
}
Ok(_) => {
// TODO: @Brllan Is this the right error message?
Err(Web3ProxyError::NoHandleReady)
}
Err(err) => {
log::trace!(
"cancelled funneling transaction {} from: {:?}",
tx_hash,
err,
);
Err(err)
}
}?;
debug!("Transaction receipt is: {:?}", transaction_receipt);
let accepted_token: Address = match app
.balanced_rpcs
.wait_for_best_rpc(&authorization, None, &mut vec![], None, None, None)
.await
{
Ok(OpenRequestResult::Handle(handle)) => {
let mut accepted_tokens_request_object: serde_json::Map<String, serde_json::Value> =
serde_json::Map::new();
// We want to send a request to the contract
accepted_tokens_request_object.insert(
"to".to_owned(),
serde_json::Value::String(format!(
"{:?}",
app.config.deposit_factory_contract.clone()
)),
);
// We then want to include the function that we want to call
accepted_tokens_request_object.insert(
"data".to_owned(),
serde_json::Value::String(format!(
"0x{}",
HexFmt(keccak256("get_approved_tokens()".to_owned().into_bytes()))
)),
// hex::encode(
);
let params = serde_json::Value::Array(vec![
serde_json::Value::Object(accepted_tokens_request_object),
serde_json::Value::String("latest".to_owned()),
]);
debug!("Params are: {:?}", &params);
let accepted_token: String = handle
.request("eth_call", &params, Level::Trace.into())
.await
// TODO: What kind of error would be here
.map_err(|err| Web3ProxyError::Anyhow(err.into()))?;
// Read the last
debug!("Accepted token response is: {:?}", accepted_token);
accepted_token[accepted_token.len() - 40..]
.parse::<Address>()
.map_err(|err| Web3ProxyError::Anyhow(err.into()))
}
Ok(_) => {
// TODO: @Brllan Is this the right error message?
Err(Web3ProxyError::NoHandleReady)
}
Err(err) => {
log::trace!(
"cancelled funneling transaction {} from: {:?}",
tx_hash,
err,
);
Err(err)
}
}?;
debug!("Accepted token is: {:?}", accepted_token);
let decimals: u32 = match app
.balanced_rpcs
.wait_for_best_rpc(&authorization, None, &mut vec![], None, None, None)
.await
{
Ok(OpenRequestResult::Handle(handle)) => {
// Now get decimals points of the stablecoin
let mut token_decimals_request_object: serde_json::Map<String, serde_json::Value> =
serde_json::Map::new();
token_decimals_request_object.insert(
"to".to_owned(),
serde_json::Value::String(format!("0x{}", HexFmt(accepted_token))),
);
token_decimals_request_object.insert(
"data".to_owned(),
serde_json::Value::String(format!(
"0x{}",
HexFmt(keccak256("decimals()".to_owned().into_bytes()))
)),
);
let params = serde_json::Value::Array(vec![
serde_json::Value::Object(token_decimals_request_object),
serde_json::Value::String("latest".to_owned()),
]);
debug!("ERC20 Decimal request params are: {:?}", &params);
let decimals: String = handle
.request("eth_call", &params, Level::Trace.into())
.await
.map_err(|err| Web3ProxyError::Anyhow(err.into()))?;
debug!("Decimals response is: {:?}", decimals);
u32::from_str_radix(&decimals[2..], 16)
.map_err(|err| Web3ProxyError::Anyhow(err.into()))
}
Ok(_) => {
// TODO: @Brllan Is this the right error message?
Err(Web3ProxyError::NoHandleReady)
}
Err(err) => {
log::trace!(
"cancelled funneling transaction {} from: {:?}",
tx_hash,
err,
);
Err(err)
}
}?;
debug!("Decimals are: {:?}", decimals);
debug!("Tx receipt: {:?}", transaction_receipt);
trace!("Transaction receipt: {:#?}", transaction_receipt);
// there is no need to check accepted tokens. the smart contract already does that
// parse the log from the transaction receipt to get the token address,
/*
event PaymentReceived:
account: indexed(address)
token: address
amount: uint256
*/
// TODO: get the decimals for the token
// Go through all logs, this should prob capture it,
// At least according to this SE logs are just concatenations of the underlying types (like a struct..)
// https://ethereum.stackexchange.com/questions/87653/how-to-decode-log-event-of-my-transaction-log
todo!("refactor this to use new helpers");
/*
let deposit_contract = match app.config.deposit_factory_contract {
Some(x) => Ok(x),
None => Err(Web3ProxyError::Anyhow(anyhow!(
@ -370,15 +266,6 @@ pub async fn user_balance_post(
continue;
}
// Skip if no accepted token. Right now we only accept a single stablecoin as input
if token != accepted_token {
warn!(
"Out: Token is not accepted: {:?} != {:?}",
token, accepted_token
);
continue;
}
info!(
"Found deposit transaction for: {:?} {:?} {:?}",
recipient_account, token, amount
@ -386,8 +273,8 @@ pub async fn user_balance_post(
// Encoding is inefficient, revisit later
let recipient = match user::Entity::find()
.filter(user::Column::Address.eq(&recipient_account.encode()[12..]))
.one(db_replica.conn())
.filter(user::Column::Address.eq(recipient_account.encode_hex()))
.one(db_replica.as_ref())
.await?
{
Some(x) => Ok(x),
@ -396,102 +283,103 @@ pub async fn user_balance_post(
)),
}?;
// For now we only accept stablecoins
// And we hardcode the peg (later we would have to depeg this, for example
// 1$ = Decimal(1) for any stablecoin
// TODO: Let's assume that people don't buy too much at _once_, we do support >$1M which should be fine for now
debug!("Arithmetic is: {:?} {:?}", amount, decimals);
debug!(
"Decimals arithmetic is: {:?} {:?}",
Decimal::from(amount.as_u128()),
Decimal::from(10_u64.pow(decimals))
);
let mut amount = Decimal::from(amount.as_u128());
let _ = amount.set_scale(decimals);
debug!("Amount is: {:?}", amount);
// For now we only accept stablecoins
// And we hardcode the peg (later we would have to depeg this, for example
// 1$ = Decimal(1) for any stablecoin
// TODO: Let's assume that people don't buy too much at _once_, we do support >$1M which should be fine for now
debug!("Arithmetic is: {:?} {:?}", amount, decimals);
debug!(
"Decimals arithmetic is: {:?} {:?}",
Decimal::from(amount.as_u128()),
Decimal::from(10_u64.pow(decimals))
);
let mut amount = Decimal::from(amount.as_u128());
let _ = amount.set_scale(decimals);
debug!("Amount is: {:?}", amount);
// Check if the item is in the database. If it is not, then add it into the database
let user_balance = balance::Entity::find()
.filter(balance::Column::UserId.eq(recipient.id))
.one(&db_conn)
.await?;
// Check if the item is in the database. If it is not, then add it into the database
let user_balance = balance::Entity::find()
.filter(balance::Column::UserId.eq(recipient.id))
.one(&db_conn)
.await?;
// Get the premium user-tier
let premium_user_tier = user_tier::Entity::find()
.filter(user_tier::Column::Title.eq("Premium"))
.one(&db_conn)
.await?
.context("Could not find 'Premium' Tier in user-database")?;
// Get the premium user-tier
let premium_user_tier = user_tier::Entity::find()
.filter(user_tier::Column::Title.eq("Premium"))
.one(&db_conn)
.await?
.context("Could not find 'Premium' Tier in user-database")?;
let txn = db_conn.begin().await?;
match user_balance {
Some(user_balance) => {
let balance_plus_amount = user_balance.available_balance + amount;
info!("New user balance is: {:?}", balance_plus_amount);
// Update the entry, adding the balance
let mut active_user_balance = user_balance.into_active_model();
active_user_balance.available_balance = sea_orm::Set(balance_plus_amount);
let txn = db_conn.begin().await?;
match user_balance {
Some(user_balance) => {
let balance_plus_amount = user_balance.available_balance + amount;
info!("New user balance is: {:?}", balance_plus_amount);
// Update the entry, adding the balance
let mut active_user_balance = user_balance.into_active_model();
active_user_balance.available_balance = sea_orm::Set(balance_plus_amount);
if balance_plus_amount >= Decimal::new(10, 0) {
// Also make the user premium at this point ...
let mut active_recipient = recipient.clone().into_active_model();
// Make the recipient premium "Effectively Unlimited"
active_recipient.user_tier_id = sea_orm::Set(premium_user_tier.id);
active_recipient.save(&txn).await?;
if balance_plus_amount >= Decimal::new(10, 0) {
// Also make the user premium at this point ...
let mut active_recipient = recipient.clone().into_active_model();
// Make the recipient premium "Effectively Unlimited"
active_recipient.user_tier_id = sea_orm::Set(premium_user_tier.id);
active_recipient.save(&txn).await?;
}
debug!("New user balance model is: {:?}", active_user_balance);
active_user_balance.save(&txn).await?;
// txn.commit().await?;
// user_balance
}
None => {
// Create the entry with the respective balance
let active_user_balance = balance::ActiveModel {
available_balance: sea_orm::ActiveValue::Set(amount),
user_id: sea_orm::ActiveValue::Set(recipient.id),
..Default::default()
};
debug!("New user balance model is: {:?}", active_user_balance);
active_user_balance.save(&txn).await?;
// txn.commit().await?;
// user_balance
}
None => {
// Create the entry with the respective balance
let active_user_balance = balance::ActiveModel {
available_balance: sea_orm::ActiveValue::Set(amount),
user_id: sea_orm::ActiveValue::Set(recipient.id),
..Default::default()
};
if amount >= Decimal::new(10, 0) {
// Also make the user premium at this point ...
let mut active_recipient = recipient.clone().into_active_model();
// Make the recipient premium "Effectively Unlimited"
active_recipient.user_tier_id = sea_orm::Set(premium_user_tier.id);
active_recipient.save(&txn).await?;
}
if amount >= Decimal::new(10, 0) {
// Also make the user premium at this point ...
let mut active_recipient = recipient.clone().into_active_model();
// Make the recipient premium "Effectively Unlimited"
active_recipient.user_tier_id = sea_orm::Set(premium_user_tier.id);
active_recipient.save(&txn).await?;
info!("New user balance model is: {:?}", active_user_balance);
active_user_balance.save(&txn).await?;
// txn.commit().await?;
// user_balance // .try_into_model().unwrap()
}
};
debug!("Setting tx_hash: {:?}", tx_hash);
let receipt = increase_on_chain_balance_receipt::ActiveModel {
tx_hash: sea_orm::ActiveValue::Set(tx_hash.encode_hex()),
chain_id: sea_orm::ActiveValue::Set(app.config.chain_id),
amount: sea_orm::ActiveValue::Set(amount),
deposit_to_user_id: sea_orm::ActiveValue::Set(recipient.id),
..Default::default()
};
info!("New user balance model is: {:?}", active_user_balance);
active_user_balance.save(&txn).await?;
// txn.commit().await?;
// user_balance // .try_into_model().unwrap()
}
};
debug!("Setting tx_hash: {:?}", tx_hash);
let receipt = increase_on_chain_balance_receipt::ActiveModel {
tx_hash: sea_orm::ActiveValue::Set(hex::encode(tx_hash)),
chain_id: sea_orm::ActiveValue::Set(app.config.chain_id),
amount: sea_orm::ActiveValue::Set(amount),
deposit_to_user_id: sea_orm::ActiveValue::Set(recipient.id),
..Default::default()
};
receipt.save(&txn).await?;
txn.commit().await?;
debug!("Saved to db");
receipt.save(&txn).await?;
txn.commit().await?;
debug!("Saved to db");
let response = (
StatusCode::CREATED,
Json(json!({
"tx_hash": tx_hash,
"amount": amount
})),
)
.into_response();
let response = (
StatusCode::CREATED,
Json(json!({
"tx_hash": tx_hash,
"amount": amount
})),
)
.into_response();
// Return early if the log was added, assume there is at most one valid log per transaction
return Ok(response);
}
// Return early if the log was added, assume there is at most one valid log per transaction
return Ok(response);
}
*/
Err(Web3ProxyError::BadRequest(
"No such transaction was found, or token is not supported!".to_string(),

@ -40,7 +40,7 @@ pub async fn user_referral_link_get(
// Then get the referral token. If one doesn't exist, create one
let user_referrer = referrer::Entity::find()
.filter(referrer::Column::UserId.eq(user.id))
.one(db_replica.conn())
.one(db_replica.as_ref())
.await?;
let (referral_code, status_code) = match user_referrer {

@ -37,7 +37,7 @@ pub async fn rpc_keys_get(
let uks = rpc_key::Entity::find()
.filter(rpc_key::Column::UserId.eq(user.id))
.all(db_replica.conn())
.all(db_replica.as_ref())
.await
.web3_context("failed loading user's key")?;
@ -103,7 +103,7 @@ pub async fn rpc_keys_management(
rpc_key::Entity::find()
.filter(rpc_key::Column::UserId.eq(user.id))
.filter(rpc_key::Column::Id.eq(existing_key_id))
.one(db_replica.conn())
.one(db_replica.as_ref())
.await
.web3_context("failed loading user's key")?
.web3_context("key does not exist or is not controlled by this bearer token")?

@ -49,7 +49,7 @@ pub async fn user_revert_logs_get(
let uks = rpc_key::Entity::find()
.filter(rpc_key::Column::UserId.eq(user.id))
.all(db_replica.conn())
.all(db_replica.as_ref())
.await
.web3_context("failed loading user's key")?;
@ -72,7 +72,7 @@ pub async fn user_revert_logs_get(
// query the database for number of items and pages
let pages_result = q
.clone()
.paginate(db_replica.conn(), page_size)
.paginate(db_replica.as_ref(), page_size)
.num_items_and_pages()
.await?;
@ -81,7 +81,7 @@ pub async fn user_revert_logs_get(
// query the database for the revert logs
let revert_logs = q
.paginate(db_replica.conn(), page_size)
.paginate(db_replica.as_ref(), page_size)
.fetch_page(page)
.await?;

@ -16,7 +16,7 @@ use entities::{balance, rpc_key, secondary_user, user, user_tier};
use ethers::types::Address;
use hashbrown::HashMap;
use http::StatusCode;
use log::{debug, warn};
use log::{debug, trace, warn};
use migration::sea_orm;
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::ActiveModelTrait;
@ -47,7 +47,7 @@ pub async fn get_keys_as_subuser(
// Get all secondary users that have access to this rpc key
let secondary_user_entities = secondary_user::Entity::find()
.filter(secondary_user::Column::UserId.eq(subuser.id))
.all(db_replica.conn())
.all(db_replica.as_ref())
.await?
.into_iter()
.map(|x| (x.rpc_secret_key_id, x))
@ -64,7 +64,7 @@ pub async fn get_keys_as_subuser(
),
)
.find_also_related(user::Entity)
.all(db_replica.conn())
.all(db_replica.as_ref())
.await?;
// TODO: Merge rpc-key with respective user (join is probably easiest ...)
@ -119,7 +119,7 @@ pub async fn get_subusers(
// Get the rpc key id
let rpc_key = rpc_key::Entity::find()
.filter(rpc_key::Column::SecretKey.eq(Uuid::from(rpc_key)))
.one(db_replica.conn())
.one(db_replica.as_ref())
.await?
.ok_or(Web3ProxyError::BadRequest(
"The provided RPC key cannot be found".to_string(),
@ -128,7 +128,7 @@ pub async fn get_subusers(
// Get all secondary users that have access to this rpc key
let secondary_user_entities = secondary_user::Entity::find()
.filter(secondary_user::Column::RpcSecretKeyId.eq(rpc_key.id))
.all(db_replica.conn())
.all(db_replica.as_ref())
.await?
.into_iter()
.map(|x| (x.user_id, x))
@ -144,7 +144,7 @@ pub async fn get_subusers(
.collect::<Vec<_>>(),
),
)
.all(db_replica.conn())
.all(db_replica.as_ref())
.await?;
warn!("Subusers are: {:?}", subusers);
@ -181,7 +181,7 @@ pub async fn modify_subuser(
.db_replica()
.context("getting replica db for user's revert logs")?;
debug!("Parameters are: {:?}", params);
trace!("Parameters are: {:?}", params);
// Then, distinguish the endpoint to modify
let rpc_key_to_modify: Ulid = params
@ -245,12 +245,12 @@ pub async fn modify_subuser(
// ---------------------------
let subuser = user::Entity::find()
.filter(user::Column::Address.eq(subuser_address.as_ref()))
.one(db_replica.conn())
.one(db_replica.as_ref())
.await?;
let rpc_key_entity = rpc_key::Entity::find()
.filter(rpc_key::Column::SecretKey.eq(Uuid::from(rpc_key_to_modify)))
.one(db_replica.conn())
.one(db_replica.as_ref())
.await?
.ok_or(Web3ProxyError::BadRequest(
"Provided RPC key does not exist!".to_owned(),
@ -316,7 +316,7 @@ pub async fn modify_subuser(
// the user is already registered
let subuser_rpc_keys = rpc_key::Entity::find()
.filter(rpc_key::Column::UserId.eq(subuser.id))
.all(db_replica.conn())
.all(db_replica.as_ref())
.await
.web3_context("failed loading user's key")?;
@ -335,7 +335,7 @@ pub async fn modify_subuser(
let subuser_entry_secondary_user = secondary_user::Entity::find()
.filter(secondary_user::Column::UserId.eq(subuser.id))
.filter(secondary_user::Column::RpcSecretKeyId.eq(rpc_key_entity.id))
.one(db_replica.conn())
.one(db_replica.as_ref())
.await
.web3_context("failed using the db to check for a subuser")?;

@ -1,5 +1,5 @@
use crate::app::DatabaseReplica;
use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult};
use crate::relational_db::{DatabaseConnection, DatabaseReplica};
use crate::{app::Web3ProxyApp, user_token::UserBearerToken};
use anyhow::Context;
use axum::{
@ -10,7 +10,7 @@ use chrono::{NaiveDateTime, Utc};
use entities::login;
use hashbrown::HashMap;
use log::{debug, warn};
use migration::sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use migration::sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
use redis_rate_limiter::{redis::AsyncCommands, RedisConnection};
/// get the attached address for the given bearer token.
@ -42,7 +42,7 @@ pub async fn get_user_id_from_params(
let user_login = login::Entity::find()
.filter(login::Column::BearerToken.eq(user_bearer_token.uuid()))
.one(db_replica.conn())
.one(db_replica.as_ref())
.await
.context("database error while querying for user")?
.ok_or(Web3ProxyError::AccessDenied)?;

@ -1,5 +1,5 @@
use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult};
use crate::response_cache::JsonRpcResponseData;
use crate::response_cache::JsonRpcResponseEnum;
use derive_more::From;
use ethers::prelude::ProviderError;
use serde::de::{self, Deserializer, MapAccess, SeqAccess, Visitor};
@ -9,14 +9,19 @@ use serde_json::value::{to_raw_value, RawValue};
use std::borrow::Cow;
use std::fmt;
pub trait JsonRpcParams = Clone + fmt::Debug + serde::Serialize + Send + Sync + 'static;
pub trait JsonRpcResultData = serde::Serialize + serde::de::DeserializeOwned + fmt::Debug + Send;
// TODO: &str here instead of String should save a lot of allocations
// TODO: generic type for params?
#[derive(Clone, Deserialize, Serialize)]
pub struct JsonRpcRequest {
pub jsonrpc: String,
/// id could be a stricter type, but many rpcs do things against the spec
pub id: Box<RawValue>,
pub method: String,
pub params: Option<serde_json::Value>,
/// TODO: skip serializing if serde_json::Value::Null
pub params: serde_json::Value,
}
#[derive(From)]
@ -27,7 +32,7 @@ pub enum JsonRpcId {
}
impl JsonRpcId {
pub fn to_raw_value(&self) -> Box<RawValue> {
pub fn to_raw_value(self) -> Box<RawValue> {
// TODO: is this a good way to do this? we should probably use references
match self {
Self::None => {
@ -36,17 +41,13 @@ impl JsonRpcId {
Self::Number(x) => {
serde_json::from_value(json!(x)).expect("number id should always work")
}
Self::String(x) => serde_json::from_str(x).expect("string id should always work"),
Self::String(x) => serde_json::from_str(&x).expect("string id should always work"),
}
}
}
impl JsonRpcRequest {
pub fn new(
id: JsonRpcId,
method: String,
params: Option<serde_json::Value>,
) -> anyhow::Result<Self> {
pub fn new(id: JsonRpcId, method: String, params: serde_json::Value) -> anyhow::Result<Self> {
let x = Self {
jsonrpc: "2.0".to_string(),
id: id.to_raw_value(),
@ -77,6 +78,20 @@ pub enum JsonRpcRequestEnum {
Single(JsonRpcRequest),
}
impl JsonRpcRequestEnum {
pub fn first_id(&self) -> Web3ProxyResult<Box<RawValue>> {
match self {
Self::Batch(x) => match x.first() {
Some(x) => Ok(x.id.clone()),
None => Err(Web3ProxyError::BadRequest(
"no requests in the batch".to_string(),
)),
},
Self::Single(x) => Ok(x.id.clone()),
}
}
}
impl<'de> Deserialize<'de> for JsonRpcRequestEnum {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
@ -89,7 +104,6 @@ impl<'de> Deserialize<'de> for JsonRpcRequestEnum {
Id,
Method,
Params,
// TODO: jsonrpc here, too?
}
struct JsonRpcBatchVisitor;
@ -162,16 +176,11 @@ impl<'de> Deserialize<'de> for JsonRpcRequestEnum {
let id = id.ok_or_else(|| de::Error::missing_field("id"))?;
let method = method.ok_or_else(|| de::Error::missing_field("method"))?;
let params: Option<serde_json::Value> = match params {
None => Some(serde_json::Value::Array(vec![])),
Some(x) => Some(x),
};
let single = JsonRpcRequest {
jsonrpc,
id,
method,
params,
params: params.unwrap_or_default(),
};
Ok(JsonRpcRequestEnum::Single(single))
@ -339,10 +348,12 @@ impl JsonRpcForwardedResponse {
}
}
pub fn from_response_data(data: JsonRpcResponseData, id: Box<RawValue>) -> Self {
pub fn from_response_data(data: JsonRpcResponseEnum<Box<RawValue>>, id: Box<RawValue>) -> Self {
match data {
JsonRpcResponseData::Result { value, .. } => Self::from_raw_response(value, id),
JsonRpcResponseData::Error { value, .. } => JsonRpcForwardedResponse {
JsonRpcResponseEnum::Result { value, .. } => Self::from_raw_response(value, id),
JsonRpcResponseEnum::RpcError {
error_data: value, ..
} => JsonRpcForwardedResponse {
jsonrpc: "2.0",
id,
result: None,
@ -373,7 +384,7 @@ mod tests {
assert_eq!(output.id.to_string(), "1");
assert_eq!(output.method, "eth_blockNumber");
assert_eq!(output.params.unwrap().to_string(), "[]");
assert_eq!(output.params.to_string(), "[]");
// test deserializing it into an enum
let output: JsonRpcRequestEnum = serde_json::from_str(input).unwrap();
@ -393,7 +404,7 @@ mod tests {
assert_eq!(output[0].id.to_string(), "27");
assert_eq!(output[0].method, "eth_getCode");
assert_eq!(
output[0].params.as_ref().unwrap().to_string(),
output[0].params.to_string(),
r#"["0x5ba1e12693dc8f9c48aad8770482f4739beed696","0xe0e6a4"]"#
);

@ -0,0 +1,28 @@
use crate::app::Web3ProxyApp;
// use crate::frontend::errors::Web3ProxyError;
use ethers::providers::{JsonRpcClient, ProviderError};
use serde::{de::DeserializeOwned, Serialize};
use std::fmt::Debug;
use std::sync::Arc;
/// use the app as an ether's JsonRpcClient
#[derive(Debug)]
struct Web3ProxyAppClient(Arc<Web3ProxyApp>);
#[async_trait::async_trait]
impl JsonRpcClient for Web3ProxyAppClient {
type Error = ProviderError;
async fn request<T, R>(&self, method: &str, params: T) -> Result<R, Self::Error>
where
T: Debug + Serialize + Send + Sync,
R: DeserializeOwned + Send,
{
todo!("figure out traits");
// match self.0.internal_request(method, &params).await {
// Ok(x) => Ok(x),
// Err(Web3ProxyError::EthersProvider(err)) => Err(err),
// Err(err) => Err(ProviderError::CustomError(format!("{}", err))),
// }
}
}

@ -1,3 +1,5 @@
#![feature(trait_alias)]
pub mod admin_queries;
pub mod app;
pub mod block_number;
@ -5,9 +7,11 @@ pub mod config;
pub mod frontend;
pub mod http_params;
pub mod jsonrpc;
pub mod jsonrpc_client;
pub mod pagerduty;
pub mod prometheus;
pub mod referral_code;
pub mod relational_db;
pub mod response_cache;
pub mod rpcs;
pub mod stats;

@ -0,0 +1,119 @@
use derive_more::From;
use log::{debug, info, warn};
use migration::sea_orm::{self, ConnectionTrait, Database};
use migration::sea_query::table::ColumnDef;
use migration::{Alias, DbErr, Migrator, MigratorTrait, Table};
use std::time::Duration;
use tokio::time::sleep;
pub use migration::sea_orm::DatabaseConnection;
/// Simple wrapper so that we can keep track of read only connections.
/// WARNING! This does not actually block writing in the compiler!
/// There will be runtime errors if this is used to write though.
#[derive(Clone, From)]
pub struct DatabaseReplica(DatabaseConnection);
impl AsRef<DatabaseConnection> for DatabaseReplica {
fn as_ref(&self) -> &DatabaseConnection {
&self.0
}
}
pub async fn get_db(
db_url: String,
min_connections: u32,
max_connections: u32,
) -> Result<DatabaseConnection, DbErr> {
// TODO: scrub credentials and then include the db_url in logs
info!("Connecting to db");
let mut db_opt = sea_orm::ConnectOptions::new(db_url);
// TODO: load all these options from the config file. i think mysql default max is 100
// TODO: sqlx logging only in debug. way too verbose for production
db_opt
.connect_timeout(Duration::from_secs(30))
.min_connections(min_connections)
.max_connections(max_connections)
.sqlx_logging(false);
// .sqlx_logging_level(log::LevelFilter::Info);
Database::connect(db_opt).await
}
pub async fn drop_migration_lock(db_conn: &DatabaseConnection) -> Result<(), DbErr> {
let db_backend = db_conn.get_database_backend();
let drop_lock_statment = db_backend.build(Table::drop().table(Alias::new("migration_lock")));
db_conn.execute(drop_lock_statment).await?;
debug!("migration lock unlocked");
Ok(())
}
/// Be super careful with override_existing_lock! It is very important that only one process is running the migrations at a time!
pub async fn migrate_db(
db_conn: &DatabaseConnection,
override_existing_lock: bool,
) -> Result<(), DbErr> {
let db_backend = db_conn.get_database_backend();
// TODO: put the timestamp and hostname into this as columns?
let create_lock_statment = db_backend.build(
Table::create()
.table(Alias::new("migration_lock"))
.col(ColumnDef::new(Alias::new("locked")).boolean().default(true)),
);
loop {
if Migrator::get_pending_migrations(db_conn).await?.is_empty() {
info!("no migrations to apply");
return Ok(());
}
// there are migrations to apply
// acquire a lock
if let Err(err) = db_conn.execute(create_lock_statment.clone()).await {
if override_existing_lock {
warn!("OVERRIDING EXISTING LOCK in 10 seconds! ctrl+c now if other migrations are actually running!");
sleep(Duration::from_secs(10)).await
} else {
debug!("Unable to acquire lock. if you are positive no migration is running, run \"web3_proxy_cli drop_migration_lock\". err={:?}", err);
// TODO: exponential backoff with jitter?
sleep(Duration::from_secs(1)).await;
continue;
}
}
debug!("migration lock acquired");
break;
}
let migration_result = Migrator::up(db_conn, None).await;
// drop the distributed lock
drop_migration_lock(db_conn).await?;
// return if migrations erred
migration_result
}
/// Connect to the database and run migrations
pub async fn get_migrated_db(
db_url: String,
min_connections: u32,
max_connections: u32,
) -> Result<DatabaseConnection, DbErr> {
// TODO: this seems to fail silently
let db_conn = get_db(db_url, min_connections, max_connections).await?;
migrate_db(&db_conn, false).await?;
Ok(db_conn)
}

@ -2,66 +2,114 @@ use crate::{
frontend::errors::Web3ProxyError, jsonrpc::JsonRpcErrorData, rpcs::blockchain::ArcBlock,
};
use derive_more::From;
use ethers::providers::ProviderError;
use ethers::{providers::ProviderError, types::U64};
use hashbrown::hash_map::DefaultHashBuilder;
use quick_cache_ttl::{CacheWithTTL, Weighter};
use serde_json::value::RawValue;
use std::{
borrow::Cow,
hash::{Hash, Hasher},
hash::{BuildHasher, Hash, Hasher},
num::NonZeroU32,
};
#[derive(Clone, Debug, From, PartialEq, Eq)]
pub struct JsonRpcResponseCacheKey {
pub from_block: Option<ArcBlock>,
pub to_block: Option<ArcBlock>,
pub method: String,
pub params: Option<serde_json::Value>,
pub cache_errors: bool,
#[derive(Clone, Debug, Eq, From)]
pub struct JsonRpcQueryCacheKey {
hash: u64,
from_block_num: Option<U64>,
to_block_num: Option<U64>,
cache_errors: bool,
}
impl Hash for JsonRpcResponseCacheKey {
fn hash<H: Hasher>(&self, state: &mut H) {
self.from_block.as_ref().map(|x| x.hash).hash(state);
self.to_block.as_ref().map(|x| x.hash).hash(state);
self.method.hash(state);
// make sure preserve_order feature is OFF
self.params.as_ref().map(|x| x.to_string()).hash(state);
self.cache_errors.hash(state)
impl JsonRpcQueryCacheKey {
pub fn from_block_num(&self) -> Option<U64> {
self.from_block_num
}
pub fn to_block_num(&self) -> Option<U64> {
self.to_block_num
}
pub fn cache_errors(&self) -> bool {
self.cache_errors
}
}
pub type JsonRpcResponseCache =
CacheWithTTL<JsonRpcResponseCacheKey, JsonRpcResponseData, JsonRpcResponseWeigher>;
#[derive(Clone)]
pub struct JsonRpcResponseWeigher;
#[derive(Clone, Debug)]
pub enum JsonRpcResponseData {
Result {
value: Box<RawValue>,
num_bytes: NonZeroU32,
},
Error {
value: JsonRpcErrorData,
num_bytes: NonZeroU32,
},
impl PartialEq for JsonRpcQueryCacheKey {
fn eq(&self, other: &Self) -> bool {
self.hash.eq(&other.hash)
}
}
impl JsonRpcResponseData {
pub fn num_bytes(&self) -> NonZeroU32 {
// TODO: dry this somehow
match self {
JsonRpcResponseData::Result { num_bytes, .. } => *num_bytes,
JsonRpcResponseData::Error { num_bytes, .. } => *num_bytes,
impl Hash for JsonRpcQueryCacheKey {
fn hash<H: Hasher>(&self, state: &mut H) {
// TODO: i feel like this hashes twice. oh well
self.hash.hash(state);
}
}
impl JsonRpcQueryCacheKey {
pub fn new(
from_block: Option<ArcBlock>,
to_block: Option<ArcBlock>,
method: &str,
params: &serde_json::Value,
cache_errors: bool,
) -> Self {
let from_block_num = from_block.as_ref().and_then(|x| x.number);
let to_block_num = to_block.as_ref().and_then(|x| x.number);
let mut hasher = DefaultHashBuilder::default().build_hasher();
from_block.as_ref().and_then(|x| x.hash).hash(&mut hasher);
to_block.as_ref().and_then(|x| x.hash).hash(&mut hasher);
method.hash(&mut hasher);
// TODO: make sure preserve_order feature is OFF
// TODO: is there a faster way to do this?
params.to_string().hash(&mut hasher);
cache_errors.hash(&mut hasher);
let hash = hasher.finish();
Self {
hash,
from_block_num,
to_block_num,
cache_errors,
}
}
}
impl From<serde_json::Value> for JsonRpcResponseData {
pub type JsonRpcResponseCache =
CacheWithTTL<JsonRpcQueryCacheKey, JsonRpcResponseEnum<Box<RawValue>>, JsonRpcResponseWeigher>;
#[derive(Clone)]
pub struct JsonRpcResponseWeigher;
/// TODO: we might need one that holds RawValue and one that holds serde_json::Value
#[derive(Clone, Debug)]
pub enum JsonRpcResponseEnum<R> {
Result {
value: R,
num_bytes: NonZeroU32,
},
RpcError {
error_data: JsonRpcErrorData,
num_bytes: NonZeroU32,
},
}
// TODO: impl for other inner result types?
impl<R> JsonRpcResponseEnum<R> {
pub fn num_bytes(&self) -> NonZeroU32 {
match self {
Self::Result { num_bytes, .. } => *num_bytes,
Self::RpcError { num_bytes, .. } => *num_bytes,
}
}
}
impl From<serde_json::Value> for JsonRpcResponseEnum<Box<RawValue>> {
fn from(value: serde_json::Value) -> Self {
let value = RawValue::from_string(value.to_string()).unwrap();
@ -69,7 +117,7 @@ impl From<serde_json::Value> for JsonRpcResponseData {
}
}
impl From<Box<RawValue>> for JsonRpcResponseData {
impl From<Box<RawValue>> for JsonRpcResponseEnum<Box<RawValue>> {
fn from(value: Box<RawValue>) -> Self {
let num_bytes = value.get().len();
@ -79,14 +127,47 @@ impl From<Box<RawValue>> for JsonRpcResponseData {
}
}
impl From<JsonRpcErrorData> for JsonRpcResponseData {
impl<R> TryFrom<Web3ProxyError> for JsonRpcResponseEnum<R> {
type Error = Web3ProxyError;
fn try_from(value: Web3ProxyError) -> Result<Self, Self::Error> {
match value {
Web3ProxyError::EthersProvider(provider_err) => {
let err = JsonRpcErrorData::try_from(provider_err)?;
Ok(err.into())
}
err => Err(err),
}
}
}
impl TryFrom<Result<Box<RawValue>, Web3ProxyError>> for JsonRpcResponseEnum<Box<RawValue>> {
type Error = Web3ProxyError;
fn try_from(value: Result<Box<RawValue>, Web3ProxyError>) -> Result<Self, Self::Error> {
match value {
Ok(x) => Ok(x.into()),
Err(err) => {
let x: Self = err.try_into()?;
Ok(x)
}
}
}
}
impl<R> From<JsonRpcErrorData> for JsonRpcResponseEnum<R> {
fn from(value: JsonRpcErrorData) -> Self {
// TODO: wrap the error in a complete response?
let num_bytes = serde_json::to_string(&value).unwrap().len();
let num_bytes = NonZeroU32::try_from(num_bytes as u32).unwrap();
Self::Error { value, num_bytes }
Self::RpcError {
error_data: value,
num_bytes,
}
}
}
@ -126,16 +207,17 @@ impl TryFrom<ProviderError> for JsonRpcErrorData {
}
}
impl<K, Q> Weighter<K, Q, JsonRpcResponseData> for JsonRpcResponseWeigher {
fn weight(&self, _key: &K, _qey: &Q, value: &JsonRpcResponseData) -> NonZeroU32 {
impl<K, Q> Weighter<K, Q, JsonRpcResponseEnum<Box<RawValue>>> for JsonRpcResponseWeigher {
fn weight(&self, _key: &K, _qey: &Q, value: &JsonRpcResponseEnum<Box<RawValue>>) -> NonZeroU32 {
value.num_bytes()
}
}
#[cfg(test)]
mod tests {
use super::{JsonRpcResponseData, JsonRpcResponseWeigher};
use super::{JsonRpcResponseEnum, JsonRpcResponseWeigher};
use quick_cache_ttl::CacheWithTTL;
use serde_json::value::RawValue;
use std::{num::NonZeroU32, time::Duration};
#[tokio::test(start_paused = true)]
@ -143,45 +225,48 @@ mod tests {
let max_item_weight = 200;
let weight_capacity = 1_000;
let response_cache: CacheWithTTL<u32, JsonRpcResponseData, JsonRpcResponseWeigher> =
CacheWithTTL::new_with_weights(
"test",
5,
max_item_weight.try_into().unwrap(),
weight_capacity,
JsonRpcResponseWeigher,
Duration::from_secs(2),
)
.await;
let test_cache: CacheWithTTL<
u32,
JsonRpcResponseEnum<Box<RawValue>>,
JsonRpcResponseWeigher,
> = CacheWithTTL::new_with_weights(
"test",
5,
max_item_weight.try_into().unwrap(),
weight_capacity,
JsonRpcResponseWeigher,
Duration::from_secs(2),
)
.await;
let small_data: JsonRpcResponseData = JsonRpcResponseData::Result {
let small_data = JsonRpcResponseEnum::Result {
value: Default::default(),
num_bytes: NonZeroU32::try_from(max_item_weight / 2).unwrap(),
};
let max_sized_data = JsonRpcResponseData::Result {
let max_sized_data = JsonRpcResponseEnum::Result {
value: Default::default(),
num_bytes: NonZeroU32::try_from(max_item_weight).unwrap(),
};
let oversized_data = JsonRpcResponseData::Result {
let oversized_data = JsonRpcResponseEnum::Result {
value: Default::default(),
num_bytes: NonZeroU32::try_from(max_item_weight * 2).unwrap(),
};
response_cache.try_insert(0, small_data).unwrap();
test_cache.try_insert(0, small_data).unwrap();
response_cache.get(&0).unwrap();
test_cache.get(&0).unwrap();
response_cache.try_insert(1, max_sized_data).unwrap();
test_cache.try_insert(1, max_sized_data).unwrap();
response_cache.get(&0).unwrap();
response_cache.get(&1).unwrap();
test_cache.get(&0).unwrap();
test_cache.get(&1).unwrap();
response_cache.try_insert(2, oversized_data).unwrap_err();
test_cache.try_insert(2, oversized_data).unwrap_err();
response_cache.get(&0).unwrap();
response_cache.get(&1).unwrap();
assert!(response_cache.get(&2).is_none());
test_cache.get(&0).unwrap();
test_cache.get(&1).unwrap();
assert!(test_cache.get(&2).is_none());
}
}

@ -1,15 +1,14 @@
///! Keep track of the blockchain as seen by a Web3Rpcs.
//! Keep track of the blockchain as seen by a Web3Rpcs.
use super::consensus::ConsensusFinder;
use super::many::Web3Rpcs;
use super::one::Web3Rpc;
use super::transactions::TxStatus;
use crate::config::BlockAndRpc;
use crate::frontend::authorization::Authorization;
use crate::frontend::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::response_cache::JsonRpcResponseData;
use crate::{config::BlockAndRpc, jsonrpc::JsonRpcRequest};
use derive_more::From;
use ethers::prelude::{Block, TxHash, H256, U64};
use log::{debug, trace, warn, Level};
use log::{debug, trace, warn};
use quick_cache_ttl::CacheWithTTL;
use serde::ser::SerializeStruct;
use serde::Serialize;
@ -199,7 +198,6 @@ impl Web3Rpcs {
/// Get a block from caches with fallback.
/// Will query a specific node or the best available.
/// TODO: return `Web3ProxyResult<Option<ArcBlock>>`?
pub async fn block(
&self,
authorization: &Arc<Authorization>,
@ -215,57 +213,30 @@ impl Web3Rpcs {
// block not in cache. we need to ask an rpc for it
let get_block_params = (*hash, false);
// TODO: if error, retry?
let block: Web3ProxyBlock = match rpc {
Some(rpc) => rpc
.request::<_, Option<ArcBlock>>(
"eth_getBlockByHash",
&json!(get_block_params),
Level::Error.into(),
authorization.clone(),
)
let block: Option<ArcBlock> = if let Some(rpc) = rpc {
// TODO: request_with_metadata would probably be better
rpc.authorized_request::<_, Option<ArcBlock>>(
"eth_getBlockByHash",
&get_block_params,
authorization,
None,
)
.await?
} else {
// TODO: request_with_metadata would probably be better
self.internal_request::<_, Option<ArcBlock>>("eth_getBlockByHash", &get_block_params)
.await?
.and_then(|x| {
if x.number.is_none() {
None
} else {
x.try_into().ok()
}
})
.web3_context("no block!")?,
None => {
// TODO: helper for method+params => JsonRpcRequest
// TODO: does this id matter?
let request = json!({ "jsonrpc": "2.0", "id": "1", "method": "eth_getBlockByHash", "params": get_block_params });
let request: JsonRpcRequest = serde_json::from_value(request)?;
// TODO: request_metadata? maybe we should put it in the authorization?
// TODO: think more about this wait_for_sync
let response = self
.try_send_best_connection(authorization, &request, None, None, None)
.await?;
let value = match response {
JsonRpcResponseData::Error { .. } => {
return Err(anyhow::anyhow!("failed fetching block").into());
}
JsonRpcResponseData::Result { value, .. } => value,
};
let block: Option<ArcBlock> = serde_json::from_str(value.get())?;
let block: ArcBlock = block.web3_context("no block in the response")?;
// TODO: received time is going to be weird
Web3ProxyBlock::try_from(block)?
}
};
// the block was fetched using eth_getBlockByHash, so it should have all fields
// TODO: fill in heaviest_chain! if the block is old enough, is this definitely true?
let block = self.try_cache_block(block, false).await?;
Ok(block)
match block {
Some(block) => {
let block = self.try_cache_block(block.try_into()?, false).await?;
Ok(block)
}
// TODO: better error. some blocks are known, just not this one
None => Err(Web3ProxyError::NoBlocksKnown),
}
}
/// Convenience method to get the cannonical block at a given block height.
@ -283,6 +254,7 @@ impl Web3Rpcs {
/// Get the heaviest chain's block from cache or backend rpc
/// Caution! If a future block is requested, this might wait forever. Be sure to have a timeout outside of this!
/// TODO: take a RequestMetadata
pub async fn cannonical_block(
&self,
authorization: &Arc<Authorization>,
@ -332,24 +304,13 @@ impl Web3Rpcs {
}
// block number not in cache. we need to ask an rpc for it
// TODO: helper for method+params => JsonRpcRequest
let request = json!({ "jsonrpc": "2.0", "id": "1", "method": "eth_getBlockByNumber", "params": (num, false) });
let request: JsonRpcRequest = serde_json::from_value(request)?;
// TODO: this error is too broad
let response = self
.try_send_best_connection(authorization, &request, None, Some(num), None)
.await?;
.internal_request::<_, Option<ArcBlock>>("eth_getBlockByNumber", &(*num, false))
.await?
.ok_or(Web3ProxyError::NoBlocksKnown)?;
let value = match response {
JsonRpcResponseData::Error { .. } => {
return Err(anyhow::anyhow!("failed fetching block").into());
}
JsonRpcResponseData::Result { value, .. } => value,
};
let block: ArcBlock = serde_json::from_str(value.get())?;
let block = Web3ProxyBlock::try_from(block)?;
let block = Web3ProxyBlock::try_from(response)?;
// the block was fetched using eth_getBlockByNumber, so it should have all fields and be on the heaviest chain
let block = self.try_cache_block(block, true).await?;

@ -1,4 +1,4 @@
///! Load balanced communication with a group of web3 rpc providers
//! Load balanced communication with a group of web3 rpc providers
use super::blockchain::{BlocksByHashCache, BlocksByNumberCache, Web3ProxyBlock};
use super::consensus::{ConsensusWeb3Rpcs, ShouldWaitForBlock};
use super::one::Web3Rpc;
@ -8,8 +8,7 @@ use crate::config::{BlockAndRpc, TxHashAndRpc, Web3RpcConfig};
use crate::frontend::authorization::{Authorization, RequestMetadata};
use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult};
use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::jsonrpc::{JsonRpcErrorData, JsonRpcRequest};
use crate::response_cache::JsonRpcResponseData;
use crate::jsonrpc::{JsonRpcErrorData, JsonRpcParams, JsonRpcResultData};
use crate::rpcs::transactions::TxStatus;
use arc_swap::ArcSwap;
use counter::Counter;
@ -20,7 +19,7 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;
use hashbrown::{HashMap, HashSet};
use itertools::Itertools;
use log::{debug, error, info, trace, warn, Level};
use log::{debug, error, info, trace, warn};
use migration::sea_orm::DatabaseConnection;
use ordered_float::OrderedFloat;
use quick_cache_ttl::CacheWithTTL;
@ -359,22 +358,21 @@ impl Web3Rpcs {
/// Send the same request to all the handles. Returning the most common success or most common error.
/// TODO: option to return the fastest response and handles for all the others instead?
pub async fn try_send_parallel_requests(
pub async fn try_send_parallel_requests<P: JsonRpcParams>(
&self,
active_request_handles: Vec<OpenRequestHandle>,
method: &str,
params: Option<&serde_json::Value>,
error_level: Level,
params: &P,
// TODO: remove this box once i figure out how to do the options
) -> Web3ProxyResult<JsonRpcResponseData> {
) -> Result<Box<RawValue>, ProviderError> {
// TODO: if only 1 active_request_handles, do self.try_send_request?
// TODO: iter stream
let responses = active_request_handles
.into_iter()
.map(|active_request_handle| async move {
let result: Result<Box<RawValue>, _> = active_request_handle
.request(method, &json!(&params), error_level.into())
.await;
let result: Result<Box<RawValue>, _> =
active_request_handle.request(method, &json!(&params)).await;
result
})
.collect::<FuturesUnordered<_>>()
@ -400,6 +398,7 @@ impl Web3Rpcs {
counts.update([s].into_iter());
}
// return the most_common success if any. otherwise return the most_common error
for (most_common, _) in counts.most_common_ordered() {
let most_common = count_map
.remove(&most_common)
@ -408,17 +407,15 @@ impl Web3Rpcs {
match most_common {
Ok(x) => {
// return the most common success
return Ok(x.into());
return Ok(x);
}
Err(err) => {
if any_ok_with_json_result {
// the most common is an error, but there is an Ok in here somewhere. loop to find it
// the most common is an error, but there is an Ok in here somewhere. continue the loop to find it
continue;
}
let err: JsonRpcErrorData = err.try_into()?;
return Ok(err.into());
return Err(err);
}
}
}
@ -430,6 +427,7 @@ impl Web3Rpcs {
async fn _best_available_rpc(
&self,
authorization: &Arc<Authorization>,
error_handler: Option<RequestErrorHandler>,
potential_rpcs: &[Arc<Web3Rpc>],
skip: &mut Vec<Arc<Web3Rpc>>,
) -> OpenRequestResult {
@ -446,7 +444,11 @@ impl Web3Rpcs {
skip.push(Arc::clone(faster_rpc));
// just because it has lower latency doesn't mean we are sure to get a connection. there might be rate limits
match faster_rpc.try_request_handle(authorization).await {
// TODO: what error_handler?
match faster_rpc
.try_request_handle(authorization, error_handler)
.await
{
Ok(OpenRequestResult::Handle(handle)) => {
trace!("opened handle: {}", faster_rpc);
return OpenRequestResult::Handle(handle);
@ -483,17 +485,22 @@ impl Web3Rpcs {
pub async fn wait_for_best_rpc(
&self,
authorization: &Arc<Authorization>,
request_metadata: Option<&Arc<RequestMetadata>>,
skip_rpcs: &mut Vec<Arc<Web3Rpc>>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
max_wait: Option<Duration>,
error_handler: Option<RequestErrorHandler>,
) -> Web3ProxyResult<OpenRequestResult> {
let mut earliest_retry_at: Option<Instant> = None;
// TODO: pass db_conn to the "default" authorization for revert logging
let authorization = request_metadata
.and_then(|x| x.authorization.clone())
.unwrap_or_default();
if self.watch_consensus_head_sender.is_none() {
trace!("this Web3Rpcs is not tracking head blocks. pick any server");
// this Web3Rpcs is not tracking head blocks. pick any server
let by_name = self.by_name.load();
@ -517,7 +524,7 @@ impl Web3Rpcs {
potential_rpcs.shuffle(&mut thread_fast_rng::thread_fast_rng());
match self
._best_available_rpc(authorization, &potential_rpcs, skip_rpcs)
._best_available_rpc(&authorization, error_handler, &potential_rpcs, skip_rpcs)
.await
{
OpenRequestResult::Handle(x) => return Ok(OpenRequestResult::Handle(x)),
@ -568,7 +575,12 @@ impl Web3Rpcs {
// we have enough potential rpcs. try to load balance
match self
._best_available_rpc(authorization, &potential_rpcs, skip_rpcs)
._best_available_rpc(
&authorization,
error_handler,
&potential_rpcs,
skip_rpcs,
)
.await
{
OpenRequestResult::Handle(x) => {
@ -611,7 +623,12 @@ impl Web3Rpcs {
if potential_rpcs.len() >= self.min_head_rpcs {
// we have enough potential rpcs. try to load balance
match self
._best_available_rpc(authorization, &potential_rpcs, skip_rpcs)
._best_available_rpc(
&authorization,
error_handler,
&potential_rpcs,
skip_rpcs,
)
.await
{
OpenRequestResult::Handle(x) => {
@ -635,7 +652,12 @@ impl Web3Rpcs {
if !potential_rpcs.is_empty() {
// even after scanning all the tiers, there are not enough rpcs that can serve this request. try anyways
match self
._best_available_rpc(authorization, &potential_rpcs, skip_rpcs)
._best_available_rpc(
&authorization,
error_handler,
&potential_rpcs,
skip_rpcs,
)
.await
{
OpenRequestResult::Handle(x) => {
@ -703,11 +725,12 @@ impl Web3Rpcs {
// TODO: this is broken
pub async fn all_connections(
&self,
authorization: &Arc<Authorization>,
request_metadata: Option<&Arc<RequestMetadata>>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
max_count: Option<usize>,
allow_backups: bool,
error_level: Option<RequestErrorHandler>,
) -> Result<Vec<OpenRequestHandle>, Option<Instant>> {
let mut earliest_retry_at = None;
@ -745,6 +768,10 @@ impl Web3Rpcs {
trace!("all_rpcs: {:#?}", all_rpcs);
let authorization = request_metadata
.and_then(|x| x.authorization.clone())
.unwrap_or_default();
for rpc in itertools::chain(synced_rpcs, all_rpcs) {
if max_count == 0 {
break;
@ -778,7 +805,7 @@ impl Web3Rpcs {
}
// check rate limits and increment our connection counter
match rpc.try_request_handle(authorization).await {
match rpc.try_request_handle(&authorization, error_level).await {
Ok(OpenRequestResult::RetryAt(retry_at)) => {
// this rpc is not available. skip it
trace!("{} is rate limited. skipping", rpc);
@ -806,16 +833,25 @@ impl Web3Rpcs {
Err(earliest_retry_at)
}
/// be sure there is a timeout on this or it might loop forever
/// TODO: think more about wait_for_sync
pub async fn try_send_best_connection(
pub async fn internal_request<P: JsonRpcParams, R: JsonRpcResultData>(
&self,
authorization: &Arc<Authorization>,
request: &JsonRpcRequest,
method: &str,
params: &P,
) -> Web3ProxyResult<R> {
// TODO: no request_metadata means we won't have stats on this internal request.
self.request_with_metadata(method, params, None, None, None)
.await
}
/// Track stats
pub async fn request_with_metadata<P: JsonRpcParams, R: JsonRpcResultData>(
&self,
method: &str,
params: &P,
request_metadata: Option<&Arc<RequestMetadata>>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
) -> Web3ProxyResult<JsonRpcResponseData> {
) -> Web3ProxyResult<R> {
let mut skip_rpcs = vec![];
let mut method_not_available_response = None;
@ -823,19 +859,21 @@ impl Web3Rpcs {
let start = Instant::now();
// TODO: get from config
// TODO: get from config or arguments
let max_wait = Duration::from_secs(10);
let error_handler = Some(RequestErrorHandler::Save);
// TODO: the loop here feels somewhat redundant with the loop in best_available_rpc
while start.elapsed() < max_wait {
match self
.wait_for_best_rpc(
authorization,
request_metadata,
&mut skip_rpcs,
min_block_needed,
max_block_needed,
None,
error_handler,
)
.await?
{
@ -850,16 +888,7 @@ impl Web3Rpcs {
let is_backup_response = rpc.backup;
// TODO: get the log percent from the user data
let response_result: Result<Box<RawValue>, _> = active_request_handle
.request(
&request.method,
&json!(request.params),
RequestErrorHandler::Save,
)
.await;
match response_result {
match active_request_handle.request::<P, R>(method, params).await {
Ok(response) => {
// TODO: if there are multiple responses being aggregated, this will only use the last server's backup type
if let Some(request_metadata) = request_metadata {
@ -868,7 +897,7 @@ impl Web3Rpcs {
.store(is_backup_response, Ordering::Release);
}
return Ok(response.into());
return Ok(response);
}
Err(error) => {
// trace!(?response, "rpc error");
@ -963,7 +992,7 @@ impl Web3Rpcs {
// hard_limit_until.send_replace(retry_at);
// }
return Ok(error.into());
return Err(error.into());
}
}
}
@ -1005,11 +1034,9 @@ impl Web3Rpcs {
}
if let Some(err) = method_not_available_response {
// TODO: this error response is likely the user's fault. do we actually want it marked as an error? maybe keep user and server error bools?
// TODO: emit a stat for unsupported methods? it would be best to block them at the proxy instead of at the backend
// TODO: this is too verbose!
debug!("{}", serde_json::to_string(&err)?);
return Ok(err.into());
// this error response is likely the user's fault
// TODO: emit a stat for unsupported methods. then we can know what there is demand for or if we are missing a feature
return Err(err.into());
}
let num_conns = self.by_name.load().len();
@ -1046,7 +1073,7 @@ impl Web3Rpcs {
// TODO: what error code?
// cloudflare gives {"jsonrpc":"2.0","error":{"code":-32043,"message":"Requested data cannot be older than 128 blocks."},"id":1}
Ok(JsonRpcErrorData {
Err(JsonRpcErrorData {
message: Cow::Borrowed("Requested data is not available"),
code: -32043,
data: None,
@ -1056,32 +1083,33 @@ impl Web3Rpcs {
/// be sure there is a timeout on this or it might loop forever
#[allow(clippy::too_many_arguments)]
pub async fn try_send_all_synced_connections(
pub async fn try_send_all_synced_connections<P: JsonRpcParams>(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
request: &JsonRpcRequest,
request_metadata: Option<Arc<RequestMetadata>>,
method: &str,
params: &P,
request_metadata: Option<&Arc<RequestMetadata>>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
error_level: Level,
max_count: Option<usize>,
always_include_backups: bool,
) -> Web3ProxyResult<JsonRpcResponseData> {
error_level: Option<RequestErrorHandler>,
max_sends: Option<usize>,
include_backups: bool,
) -> Web3ProxyResult<Box<RawValue>> {
let mut watch_consensus_rpcs = self.watch_consensus_rpcs_sender.subscribe();
let start = Instant::now();
// TODO: get from config or function arguments
let max_wait = Duration::from_secs(5);
// TODO: get from config
let max_wait = Duration::from_secs(3);
let wait_until = Instant::now() + max_wait;
while start.elapsed() < max_wait {
while Instant::now() < wait_until {
match self
.all_connections(
authorization,
request_metadata,
min_block_needed,
max_block_needed,
max_count,
always_include_backups,
max_sends,
include_backups,
error_level,
)
.await
{
@ -1107,14 +1135,11 @@ impl Web3Rpcs {
.store(only_backups_used, Ordering::Release);
}
return self
.try_send_parallel_requests(
active_request_handles,
request.method.as_ref(),
request.params.as_ref(),
error_level,
)
.await;
let x = self
.try_send_parallel_requests(active_request_handles, method, params)
.await?;
return Ok(x);
}
Err(None) => {
warn!(
@ -1127,21 +1152,26 @@ impl Web3Rpcs {
request_metadata.no_servers.fetch_add(1, Ordering::AcqRel);
}
watch_consensus_rpcs.changed().await?;
watch_consensus_rpcs.borrow_and_update();
tokio::select! {
_ = sleep_until(wait_until) => break,
_ = watch_consensus_rpcs.changed() => {
watch_consensus_rpcs.borrow_and_update();
}
}
continue;
}
Err(Some(retry_at)) => {
// TODO: move this to a helper function
// sleep (TODO: with a lock?) until our rate limits should be available
// TODO: if a server catches up sync while we are waiting, we could stop waiting
warn!("All rate limits exceeded. Sleeping");
if let Some(request_metadata) = &request_metadata {
request_metadata.no_servers.fetch_add(1, Ordering::AcqRel);
}
if retry_at > wait_until {
warn!("All rate limits exceeded. And sleeping would take too long");
break;
}
warn!("All rate limits exceeded. Sleeping");
tokio::select! {
_ = sleep_until(retry_at) => {}
_ = watch_consensus_rpcs.changed() => {
@ -1157,19 +1187,21 @@ impl Web3Rpcs {
Err(Web3ProxyError::NoServersSynced)
}
pub async fn try_proxy_connection(
pub async fn try_proxy_connection<P: JsonRpcParams, R: JsonRpcResultData>(
&self,
authorization: &Arc<Authorization>,
request: &JsonRpcRequest,
method: &str,
params: &P,
request_metadata: Option<&Arc<RequestMetadata>>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
) -> Web3ProxyResult<JsonRpcResponseData> {
match authorization.checks.proxy_mode {
) -> Web3ProxyResult<R> {
let proxy_mode = request_metadata.map(|x| x.proxy_mode()).unwrap_or_default();
match proxy_mode {
ProxyMode::Debug | ProxyMode::Best => {
self.try_send_best_connection(
authorization,
request,
self.request_with_metadata(
method,
params,
request_metadata,
min_block_needed,
max_block_needed,
@ -1502,7 +1534,7 @@ mod tests {
// all_backend_connections gives all non-backup servers regardless of sync status
assert_eq!(
rpcs.all_connections(&authorization, None, None, None, false)
rpcs.all_connections(None, None, None, None, false, None)
.await
.unwrap()
.len(),
@ -1512,12 +1544,12 @@ mod tests {
// best_synced_backend_connection which servers to be synced with the head block should not find any nodes
let x = rpcs
.wait_for_best_rpc(
&authorization,
None,
&mut vec![],
Some(head_block.number.as_ref().unwrap()),
None,
Some(Duration::from_secs(0)),
Some(RequestErrorHandler::DebugLevel),
)
.await
.unwrap();
@ -1610,12 +1642,12 @@ mod tests {
// TODO: make sure the handle is for the expected rpc
assert!(matches!(
rpcs.wait_for_best_rpc(
&authorization,
None,
&mut vec![],
None,
None,
Some(Duration::from_secs(0))
Some(Duration::from_secs(0)),
None,
)
.await,
Ok(OpenRequestResult::Handle(_))
@ -1624,12 +1656,12 @@ mod tests {
// TODO: make sure the handle is for the expected rpc
assert!(matches!(
rpcs.wait_for_best_rpc(
&authorization,
None,
&mut vec![],
Some(&0.into()),
None,
Some(Duration::from_secs(0)),
None,
)
.await,
Ok(OpenRequestResult::Handle(_))
@ -1638,12 +1670,12 @@ mod tests {
// TODO: make sure the handle is for the expected rpc
assert!(matches!(
rpcs.wait_for_best_rpc(
&authorization,
None,
&mut vec![],
Some(&1.into()),
None,
Some(Duration::from_secs(0)),
None,
)
.await,
Ok(OpenRequestResult::Handle(_))
@ -1652,12 +1684,12 @@ mod tests {
// future block should not get a handle
let future_rpc = rpcs
.wait_for_best_rpc(
&authorization,
None,
&mut vec![],
Some(&2.into()),
None,
Some(Duration::from_secs(0)),
None,
)
.await;
assert!(matches!(future_rpc, Ok(OpenRequestResult::NotReady)));
@ -1791,12 +1823,12 @@ mod tests {
// TODO: test with and without passing the head_block.number?
let best_available_server = rpcs
.wait_for_best_rpc(
&authorization,
None,
&mut vec![],
Some(head_block.number()),
None,
Some(Duration::from_secs(0)),
None,
)
.await;
@ -1809,12 +1841,12 @@ mod tests {
let _best_available_server_from_none = rpcs
.wait_for_best_rpc(
&authorization,
None,
&mut vec![],
None,
None,
Some(Duration::from_secs(0)),
None,
)
.await;
@ -1822,12 +1854,12 @@ mod tests {
let best_archive_server = rpcs
.wait_for_best_rpc(
&authorization,
None,
&mut vec![],
Some(&1.into()),
None,
Some(Duration::from_secs(0)),
None,
)
.await;
@ -1984,7 +2016,7 @@ mod tests {
// best_synced_backend_connection requires servers to be synced with the head block
// TODO: test with and without passing the head_block.number?
let head_connections = rpcs
.all_connections(&authorization, Some(block_2.number()), None, None, false)
.all_connections(None, Some(block_2.number()), None, None, false, None)
.await;
debug!("head_connections: {:#?}", head_connections);
@ -1996,7 +2028,7 @@ mod tests {
);
let all_connections = rpcs
.all_connections(&authorization, Some(block_1.number()), None, None, false)
.all_connections(None, Some(block_1.number()), None, None, false, None)
.await;
debug!("all_connections: {:#?}", all_connections);
@ -2008,7 +2040,7 @@ mod tests {
);
let all_connections = rpcs
.all_connections(&authorization, None, None, None, false)
.all_connections(None, None, None, None, false, None)
.await;
debug!("all_connections: {:#?}", all_connections);

@ -1,4 +1,4 @@
///! Rate-limited communication with a web3 provider.
//! Rate-limited communication with a web3 provider.
use super::blockchain::{ArcBlock, BlocksByHashCache, Web3ProxyBlock};
use super::provider::{connect_http, connect_ws, EthersHttpProvider, EthersWsProvider};
use super::request::{OpenRequestHandle, OpenRequestResult};
@ -6,6 +6,7 @@ use crate::app::{flatten_handle, Web3ProxyJoinHandle};
use crate::config::{BlockAndRpc, Web3RpcConfig};
use crate::frontend::authorization::Authorization;
use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult};
use crate::jsonrpc::{JsonRpcParams, JsonRpcResultData};
use crate::rpcs::request::RequestErrorHandler;
use anyhow::{anyhow, Context};
use ethers::prelude::{Bytes, Middleware, TxHash, U64};
@ -208,16 +209,9 @@ impl Web3Rpc {
// TODO: make transaction subscription optional (just pass None for tx_id_sender)
let handle = {
let new_connection = new_connection.clone();
let authorization = Arc::new(Authorization::internal(db_conn)?);
tokio::spawn(async move {
new_connection
.subscribe(
&authorization,
block_map,
block_sender,
chain_id,
tx_id_sender,
)
.subscribe(block_map, block_sender, chain_id, tx_id_sender)
.await
})
};
@ -239,10 +233,7 @@ impl Web3Rpc {
}
// TODO: would be great if rpcs exposed this. see https://github.com/ledgerwatch/erigon/issues/6391
async fn check_block_data_limit(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
) -> anyhow::Result<Option<u64>> {
async fn check_block_data_limit(self: &Arc<Self>) -> anyhow::Result<Option<u64>> {
if !self.automatic_block_limit {
// TODO: is this a good thing to return?
return Ok(None);
@ -255,12 +246,11 @@ impl Web3Rpc {
// TODO: binary search between 90k and max?
// TODO: start at 0 or 1?
for block_data_limit in [0, 32, 64, 128, 256, 512, 1024, 90_000, u64::MAX] {
let head_block_num_future = self.request::<Option<()>, U256>(
let head_block_num_future = self.internal_request::<_, U256>(
"eth_blockNumber",
&None,
&(),
// error here are expected, so keep the level low
Level::Debug.into(),
authorization.clone(),
Some(Level::Debug.into()),
);
let head_block_num = timeout(Duration::from_secs(5), head_block_num_future)
@ -279,15 +269,14 @@ impl Web3Rpc {
// TODO: wait for the handle BEFORE we check the current block number. it might be delayed too!
// TODO: what should the request be?
let archive_result: Result<Bytes, _> = self
.request(
.internal_request(
"eth_getCode",
&json!((
"0xdead00000000000000000000000000000000beef",
maybe_archive_block,
)),
// error here are expected, so keep the level low
Level::Trace.into(),
authorization.clone(),
Some(Level::Trace.into()),
)
.await;
@ -367,43 +356,27 @@ impl Web3Rpc {
/// query the web3 provider to confirm it is on the expected chain with the expected data available
async fn check_provider(self: &Arc<Self>, chain_id: u64) -> Web3ProxyResult<()> {
let authorization = Arc::new(Authorization::internal(self.db_conn.clone())?);
// check the server's chain_id here
// TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
// TODO: what should the timeout be? should there be a request timeout?
// trace!("waiting on chain id for {}", self);
let found_chain_id: Result<U64, _> = self
.request(
"eth_chainId",
&json!(Vec::<()>::new()),
Level::Trace.into(),
authorization.clone(),
)
.await;
let found_chain_id: U64 = self
.internal_request("eth_chainId", &(), Some(Level::Trace.into()))
.await?;
trace!("found_chain_id: {:#?}", found_chain_id);
match found_chain_id {
Ok(found_chain_id) => {
// TODO: there has to be a cleaner way to do this
if chain_id != found_chain_id.as_u64() {
return Err(anyhow::anyhow!(
"incorrect chain id! Config has {}, but RPC has {}",
chain_id,
found_chain_id
)
.context(format!("failed @ {}", self))
.into());
}
}
Err(e) => {
return Err(anyhow::Error::from(e)
.context(format!("unable to parse eth_chainId from {}", self))
.into());
}
if chain_id != found_chain_id.as_u64() {
return Err(anyhow::anyhow!(
"incorrect chain id! Config has {}, but RPC has {}",
chain_id,
found_chain_id
)
.context(format!("failed @ {}", self))
.into());
}
self.check_block_data_limit(&authorization)
self.check_block_data_limit()
.await
.context(format!("unable to check_block_data_limit of {}", self))?;
@ -454,8 +427,7 @@ impl Web3Rpc {
.send_replace(Some(new_head_block.clone()));
if self.block_data_limit() == U64::zero() {
let authorization = Arc::new(Authorization::internal(self.db_conn.clone())?);
if let Err(err) = self.check_block_data_limit(&authorization).await {
if let Err(err) = self.check_block_data_limit().await {
warn!(
"failed checking block limit after {} finished syncing. {:?}",
self, err
@ -489,8 +461,7 @@ impl Web3Rpc {
async fn healthcheck(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
error_handler: RequestErrorHandler,
error_handler: Option<RequestErrorHandler>,
) -> Web3ProxyResult<()> {
let head_block = self.head_block.as_ref().unwrap().borrow().clone();
@ -503,11 +474,10 @@ impl Web3Rpc {
let to = if let Some(txid) = head_block.transactions.last().cloned() {
let tx = self
.request::<_, Option<Transaction>>(
.internal_request::<_, Option<Transaction>>(
"eth_getTransactionByHash",
&(txid,),
error_handler,
authorization.clone(),
)
.await?
.context("no transaction")?;
@ -525,11 +495,10 @@ impl Web3Rpc {
};
let _code = self
.request::<_, Option<Bytes>>(
.internal_request::<_, Option<Bytes>>(
"eth_getCode",
&(to, block_number),
error_handler,
authorization.clone(),
)
.await?;
} else {
@ -545,16 +514,15 @@ impl Web3Rpc {
#[allow(clippy::too_many_arguments)]
async fn subscribe(
self: Arc<Self>,
authorization: &Arc<Authorization>,
block_map: BlocksByHashCache,
block_sender: Option<flume::Sender<BlockAndRpc>>,
chain_id: u64,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
) -> Web3ProxyResult<()> {
let error_handler = if self.backup {
RequestErrorHandler::DebugLevel
Some(RequestErrorHandler::DebugLevel)
} else {
RequestErrorHandler::ErrorLevel
Some(RequestErrorHandler::ErrorLevel)
};
debug!("starting subscriptions on {}", self);
@ -566,7 +534,6 @@ impl Web3Rpc {
// health check that runs if there haven't been any recent requests
{
// TODO: move this into a proper function
let authorization = authorization.clone();
let rpc = self.clone();
// TODO: how often? different depending on the chain?
@ -586,7 +553,7 @@ impl Web3Rpc {
if new_total_requests - old_total_requests < 10 {
// TODO: if this fails too many times, reset the connection
// TODO: move this into a function and the chaining should be easier
if let Err(err) = rpc.healthcheck(&authorization, error_handler).await {
if let Err(err) = rpc.healthcheck(error_handler).await {
// TODO: different level depending on the error handler
warn!("health checking {} failed: {:?}", rpc, err);
}
@ -609,11 +576,9 @@ impl Web3Rpc {
// subscribe to new heads
if let Some(block_sender) = &block_sender {
// TODO: do we need this to be abortable?
let f = self.clone().subscribe_new_heads(
authorization.clone(),
block_sender.clone(),
block_map.clone(),
);
let f = self
.clone()
.subscribe_new_heads(block_sender.clone(), block_map.clone());
futures.push(flatten_handle(tokio::spawn(f)));
}
@ -622,9 +587,7 @@ impl Web3Rpc {
// TODO: make this opt-in. its a lot of bandwidth
if let Some(tx_id_sender) = tx_id_sender {
// TODO: do we need this to be abortable?
let f = self
.clone()
.subscribe_pending_transactions(authorization.clone(), tx_id_sender);
let f = self.clone().subscribe_pending_transactions(tx_id_sender);
futures.push(flatten_handle(tokio::spawn(f)));
}
@ -647,15 +610,20 @@ impl Web3Rpc {
/// Subscribe to new blocks.
async fn subscribe_new_heads(
self: Arc<Self>,
authorization: Arc<Authorization>,
block_sender: flume::Sender<BlockAndRpc>,
block_map: BlocksByHashCache,
) -> Web3ProxyResult<()> {
debug!("subscribing to new heads on {}", self);
// TODO: different handler depending on backup or not
let error_handler = None;
let authorization = Default::default();
if let Some(ws_provider) = self.ws_provider.as_ref() {
// todo: move subscribe_blocks onto the request handle
let active_request_handle = self.wait_for_request_handle(&authorization, None).await;
let active_request_handle = self
.wait_for_request_handle(&authorization, None, error_handler)
.await;
let mut blocks = ws_provider.subscribe_blocks().await?;
drop(active_request_handle);
@ -665,11 +633,11 @@ impl Web3Rpc {
// TODO: how does this get wrapped in an arc? does ethers handle that?
// TODO: can we force this to use the websocket?
let latest_block: Result<Option<ArcBlock>, _> = self
.request(
.authorized_request(
"eth_getBlockByNumber",
&json!(("latest", false)),
Level::Warn.into(),
authorization,
&("latest", false),
&authorization,
Some(Level::Warn.into()),
)
.await;
@ -724,7 +692,6 @@ impl Web3Rpc {
/// Turn on the firehose of pending transactions
async fn subscribe_pending_transactions(
self: Arc<Self>,
authorization: Arc<Authorization>,
tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
) -> Web3ProxyResult<()> {
// TODO: make this subscription optional
@ -780,16 +747,16 @@ impl Web3Rpc {
}
}
/// be careful with this; it might wait forever!
pub async fn wait_for_request_handle<'a>(
self: &'a Arc<Self>,
authorization: &'a Arc<Authorization>,
pub async fn wait_for_request_handle(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
max_wait: Option<Duration>,
error_handler: Option<RequestErrorHandler>,
) -> Web3ProxyResult<OpenRequestHandle> {
let max_wait = max_wait.map(|x| Instant::now() + x);
let max_wait_until = max_wait.map(|x| Instant::now() + x);
loop {
match self.try_request_handle(authorization).await {
match self.try_request_handle(authorization, error_handler).await {
Ok(OpenRequestResult::Handle(handle)) => return Ok(handle),
Ok(OpenRequestResult::RetryAt(retry_at)) => {
// TODO: emit a stat?
@ -801,8 +768,8 @@ impl Web3Rpc {
self
);
if let Some(max_wait) = max_wait {
if retry_at > max_wait {
if let Some(max_wait_until) = max_wait_until {
if retry_at > max_wait_until {
// break now since we will wait past our maximum wait time
return Err(Web3ProxyError::Timeout(None));
}
@ -814,10 +781,8 @@ impl Web3Rpc {
// TODO: when can this happen? log? emit a stat?
trace!("{} has no handle ready", self);
if let Some(max_wait) = max_wait {
let now = Instant::now();
if now > max_wait {
if let Some(max_wait_until) = max_wait_until {
if Instant::now() > max_wait_until {
return Err(Web3ProxyError::NoHandleReady);
}
}
@ -834,6 +799,7 @@ impl Web3Rpc {
pub async fn try_request_handle(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
error_handler: Option<RequestErrorHandler>,
) -> Web3ProxyResult<OpenRequestResult> {
// TODO: if websocket is reconnecting, return an error?
@ -882,9 +848,10 @@ impl Web3Rpc {
}
};
let handle = OpenRequestHandle::new(authorization.clone(), self.clone()).await;
let handle =
OpenRequestHandle::new(authorization.clone(), self.clone(), error_handler).await;
Ok(OpenRequestResult::Handle(handle))
Ok(handle.into())
}
async fn wait_for_disconnect(&self) -> Result<(), tokio::sync::watch::error::RecvError> {
@ -901,23 +868,30 @@ impl Web3Rpc {
}
}
pub async fn request<P, R>(
pub async fn internal_request<P: JsonRpcParams, R: JsonRpcResultData>(
self: &Arc<Self>,
method: &str,
params: &P,
revert_handler: RequestErrorHandler,
authorization: Arc<Authorization>,
) -> Web3ProxyResult<R>
where
// TODO: not sure about this type. would be better to not need clones, but measure and spawns combine to need it
P: Clone + fmt::Debug + serde::Serialize + Send + Sync + 'static,
R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug + Send,
{
error_handler: Option<RequestErrorHandler>,
) -> Web3ProxyResult<R> {
let authorization = Default::default();
self.authorized_request(method, params, &authorization, error_handler)
.await
}
pub async fn authorized_request<P: JsonRpcParams, R: JsonRpcResultData>(
self: &Arc<Self>,
method: &str,
params: &P,
authorization: &Arc<Authorization>,
error_handler: Option<RequestErrorHandler>,
) -> Web3ProxyResult<R> {
// TODO: take max_wait as a function argument?
let x = self
.wait_for_request_handle(&authorization, None)
.wait_for_request_handle(authorization, None, error_handler)
.await?
.request::<P, R>(method, params, revert_handler)
.request::<P, R>(method, params)
.await?;
Ok(x)

@ -1,8 +1,10 @@
use super::one::Web3Rpc;
use crate::frontend::authorization::Authorization;
use crate::frontend::errors::Web3ProxyResult;
use crate::jsonrpc::{JsonRpcParams, JsonRpcResultData};
use anyhow::Context;
use chrono::Utc;
use derive_more::From;
use entities::revert_log;
use entities::sea_orm_active_enums::Method;
use ethers::providers::ProviderError;
@ -10,13 +12,12 @@ use ethers::types::{Address, Bytes};
use log::{debug, error, trace, warn, Level};
use migration::sea_orm::{self, ActiveEnum, ActiveModelTrait};
use serde_json::json;
use std::fmt;
use std::sync::atomic;
use std::sync::Arc;
use thread_fast_rng::rand::Rng;
use tokio::time::{Duration, Instant};
#[derive(Debug)]
#[derive(Debug, From)]
pub enum OpenRequestResult {
Handle(OpenRequestHandle),
/// Unable to start a request. Retry at the given time.
@ -30,11 +31,12 @@ pub enum OpenRequestResult {
#[derive(Debug)]
pub struct OpenRequestHandle {
authorization: Arc<Authorization>,
error_handler: RequestErrorHandler,
rpc: Arc<Web3Rpc>,
}
/// Depending on the context, RPC errors require different handling.
#[derive(Copy, Clone)]
#[derive(Copy, Debug, Clone)]
pub enum RequestErrorHandler {
/// Log at the trace level. Use when errors are expected.
TraceLevel,
@ -48,6 +50,12 @@ pub enum RequestErrorHandler {
Save,
}
impl Default for RequestErrorHandler {
fn default() -> Self {
Self::TraceLevel
}
}
// TODO: second param could be skipped since we don't need it here
#[derive(serde::Deserialize, serde::Serialize)]
struct EthCallParams((EthCallFirstParams, Option<serde_json::Value>));
@ -130,14 +138,24 @@ impl Drop for OpenRequestHandle {
}
impl OpenRequestHandle {
pub async fn new(authorization: Arc<Authorization>, rpc: Arc<Web3Rpc>) -> Self {
pub async fn new(
authorization: Arc<Authorization>,
rpc: Arc<Web3Rpc>,
error_handler: Option<RequestErrorHandler>,
) -> Self {
// TODO: take request_id as an argument?
// TODO: attach a unique id to this? customer requests have one, but not internal queries
// TODO: what ordering?!
rpc.active_requests
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
Self { authorization, rpc }
let error_handler = error_handler.unwrap_or_default();
Self {
authorization,
error_handler,
rpc,
}
}
pub fn connection_name(&self) -> String {
@ -153,17 +171,11 @@ impl OpenRequestHandle {
/// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
/// depending on how things are locked, you might need to pass the provider in
/// we take self to ensure this function only runs once
pub async fn request<P, R>(
pub async fn request<P: JsonRpcParams, R: JsonRpcResultData + serde::Serialize>(
self,
method: &str,
params: &P,
mut error_handler: RequestErrorHandler,
) -> Result<R, ProviderError>
where
// TODO: not sure about this type. would be better to not need clones, but measure and spawns combine to need it
P: Clone + fmt::Debug + serde::Serialize + Send + Sync + 'static,
R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug + Send,
{
) -> Result<R, ProviderError> {
// TODO: use tracing spans
// TODO: including params in this log is way too verbose
// trace!(rpc=%self.rpc, %method, "request");
@ -205,7 +217,7 @@ impl OpenRequestHandle {
if let Err(err) = &response {
// only save reverts for some types of calls
// TODO: do something special for eth_sendRawTransaction too
error_handler = if let RequestErrorHandler::Save = error_handler {
let error_handler = if let RequestErrorHandler::Save = self.error_handler {
// TODO: should all these be Trace or Debug or a mix?
if !["eth_call", "eth_estimateGas"].contains(&method) {
// trace!(%method, "skipping save on revert");
@ -218,7 +230,7 @@ impl OpenRequestHandle {
RequestErrorHandler::TraceLevel
} else if log_revert_chance == 1.0 {
// trace!(%method, "gaurenteed chance. SAVING on revert");
error_handler
self.error_handler
} else if thread_fast_rng::thread_fast_rng().gen_range(0.0f64..=1.0)
< log_revert_chance
{
@ -227,14 +239,14 @@ impl OpenRequestHandle {
} else {
// trace!("Saving on revert");
// TODO: is always logging at debug level fine?
error_handler
self.error_handler
}
} else {
// trace!(%method, "no database. skipping save on revert");
RequestErrorHandler::TraceLevel
}
} else {
error_handler
self.error_handler
};
// TODO: simple enum -> string derive?
@ -297,6 +309,7 @@ impl OpenRequestHandle {
}
// TODO: think more about the method and param logs. those can be sensitive information
// we do **NOT** use self.error_handler here because it might have been modified
match error_handler {
RequestErrorHandler::DebugLevel => {
// TODO: think about this revert check more. sometimes we might want reverts logged so this needs a flag

@ -1,9 +1,8 @@
use crate::frontend::{authorization::Authorization, errors::Web3ProxyResult};
//! Load balanced communication with a group of web3 providers
use super::many::Web3Rpcs;
///! Load balanced communication with a group of web3 providers
use super::one::Web3Rpc;
use super::request::OpenRequestResult;
use crate::frontend::{authorization::Authorization, errors::Web3ProxyResult};
use ethers::prelude::{ProviderError, Transaction, TxHash};
use log::{debug, trace, Level};
use std::sync::Arc;
@ -29,14 +28,13 @@ impl Web3Rpcs {
// TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself
// TODO: if one rpc fails, try another?
// TODO: try_request_handle, or wait_for_request_handle? I think we want wait here
let tx: Transaction = match rpc.try_request_handle(authorization).await {
let tx: Transaction = match rpc
.try_request_handle(authorization, Some(Level::Warn.into()))
.await
{
Ok(OpenRequestResult::Handle(handle)) => {
handle
.request(
"eth_getTransactionByHash",
&(pending_tx_id,),
Level::Error.into(),
)
.request("eth_getTransactionByHash", &(pending_tx_id,))
.await?
}
Ok(_) => {

@ -241,7 +241,7 @@ pub async fn query_user_stats<'a>(
// query the database for number of items and pages
let pages_result = q
.clone()
.paginate(db_replica.conn(), page_size)
.paginate(db_replica.as_ref(), page_size)
.num_items_and_pages()
.await?;
@ -251,7 +251,7 @@ pub async fn query_user_stats<'a>(
// query the database (todo: combine with the pages_result query?)
let query_response = q
.into_json()
.paginate(db_replica.conn(), page_size)
.paginate(db_replica.as_ref(), page_size)
.fetch_page(page)
.await?;

@ -86,7 +86,7 @@ pub async fn query_user_stats<'a>(
// Fetch all rpc_secret_key_ids, and filter for these
let mut user_rpc_keys = rpc_key::Entity::find()
.filter(rpc_key::Column::UserId.eq(user_id))
.all(db_replica.conn())
.all(db_replica.as_ref())
.await
.web3_context("failed loading user's key")?
.into_iter()
@ -102,7 +102,7 @@ pub async fn query_user_stats<'a>(
let mut subuser_rpc_keys = secondary_user::Entity::find()
.filter(secondary_user::Column::UserId.eq(user_id))
.find_also_related(rpc_key::Entity)
.all(db_replica.conn())
.all(db_replica.as_ref())
// TODO: Do a join with rpc-keys
.await
.web3_context("failed loading subuser keys")?