streaming responses 2 (#219)

* streaming responses compile, still small TODOs & cleanup

* refactor to allow short circuits on uncacheable requests (#220)

* refactor to allow short circuits on uncacheable requests

* handle error inside an Arc from moka

* arc instead of box

* lint

* more lint

* add bonus rate limits and keep all semaphores after rate limits (#221)

* add bonus rate limits and keep all semaphores after rate limits

* remove stale todos

* better function names

* cargo upgrade and update

* make some panics warn. more todo

* pass request_metadata through so streaming responses include their final size

* remove stale TODO

* remove more stale todos

* add file i missed

* make to_json_string work well enough

* check config for free_subscriptions

---------

Co-authored-by: Rory Neithinger <rory@llamanodes.com>
Authored by Bryan Stitt on 2023-09-26 18:18:06 -07:00; committed by GitHub
parent 5b0aebb6e5
commit f0b6465069
22 changed files with 1275 additions and 567 deletions
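
Much of the diff below calls into new response types in web3_proxy::jsonrpc (Response, SingleResponse, ParsedResponse, Payload) whose definitions are not part of this changeset. The following is a minimal, self-contained sketch of the Parsed-versus-Stream split those call sites imply; the shapes and helper bodies are illustrative assumptions (with assumed tokio, futures, bytes, serde, and serde_json dependencies), not the crate's real definitions.

use bytes::Bytes;
use futures::{stream, Stream, StreamExt};
use serde::Deserialize;
use std::pin::Pin;

#[derive(Debug, Deserialize)]
struct ParsedResponse {
    jsonrpc: String,
    id: serde_json::Value,
    #[serde(default)]
    result: Option<serde_json::Value>,
    #[serde(default)]
    error: Option<serde_json::Value>,
}

enum SingleResponse {
    // already deserialized and ready to hand to callers
    Parsed(ParsedResponse),
    // forwarded to the client chunk by chunk; the full JSON only exists if collected
    Stream(Pin<Box<dyn Stream<Item = Bytes> + Send>>),
}

impl SingleResponse {
    // collect a streaming body (or pass the parsed payload through), mirroring
    // the `response.parsed().await?` calls in the hunks below
    async fn parsed(self) -> Result<ParsedResponse, serde_json::Error> {
        match self {
            Self::Parsed(x) => Ok(x),
            Self::Stream(mut s) => {
                let mut buf = Vec::new();
                while let Some(chunk) = s.next().await {
                    buf.extend_from_slice(&chunk);
                }
                serde_json::from_slice(&buf)
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let body = br#"{"jsonrpc":"2.0","id":1,"result":"0x1"}"#;
    let chunks: Vec<Bytes> = body.chunks(8).map(Bytes::copy_from_slice).collect();
    let streamed = SingleResponse::Stream(Box::pin(stream::iter(chunks)));
    let parsed = streamed.parsed().await.expect("valid json-rpc");
    println!("{} id={} result={:?}", parsed.jsonrpc, parsed.id, parsed.result);
}

In the hunks below, proxy_request returns this kind of value and request_metadata.add_response() accepts either variant, which is what lets streaming responses report their final size to the stats pipeline.
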

Cargo.lock (generated): 63 lines changed

@ -777,9 +777,9 @@ dependencies = [
[[package]]
name = "clap"
version = "4.4.4"
version = "4.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1d7b8d5ec32af0fadc644bf1fd509a688c2103b185644bb1e29d164e0703136"
checksum = "824956d0dca8334758a5b7f7e50518d66ea319330cbceedcf76905c2f6ab30e3"
dependencies = [
"clap_builder",
"clap_derive 4.4.2",
@ -787,9 +787,9 @@ dependencies = [
[[package]]
name = "clap_builder"
version = "4.4.4"
version = "4.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5179bb514e4d7c2051749d8fcefa2ed6d06a9f4e6d69faf3805f5d80b8cf8d56"
checksum = "122ec64120a49b4563ccaedcbea7818d069ed8e9aa6d829b82d8a4128936b2ab"
dependencies = [
"anstream",
"anstyle",
@ -1240,23 +1240,22 @@ checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308"
[[package]]
name = "deadpool"
version = "0.9.5"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "421fe0f90f2ab22016f32a9881be5134fdd71c65298917084b0c7477cbc3856e"
checksum = "fb84100978c1c7b37f09ed3ce3e5f843af02c2a2c431bae5b19230dad2c1b490"
dependencies = [
"async-trait",
"deadpool-runtime",
"num_cpus",
"retain_mut",
"serde",
"tokio",
]
[[package]]
name = "deadpool-redis"
version = "0.12.0"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f1760f60ffc6653b4afd924c5792098d8c00d9a3deb6b3d989eac17949dc422"
checksum = "84930e585871d35b8e06d3e03d03e3a8a4c5dc71afa4376c7cd5f9223e1da1ea"
dependencies = [
"deadpool",
"redis",
@ -1265,9 +1264,9 @@ dependencies = [
[[package]]
name = "deadpool-runtime"
version = "0.1.2"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eaa37046cc0f6c3cc6090fbdbf73ef0b8ef4cfcc37f6befc0020f63e8cf121e1"
checksum = "63dfa964fe2a66f3fde91fc70b267fe193d822c7e603e2a675a49a7f46ad3f49"
dependencies = [
"tokio",
]
@ -1952,9 +1951,9 @@ dependencies = [
[[package]]
name = "fastrand"
version = "2.0.0"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6999dc1837253364c2ebb0704ba97994bd874e8f195d665c50b7548f6ea92764"
checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5"
[[package]]
name = "fdlimit"
@ -3553,9 +3552,9 @@ dependencies = [
[[package]]
name = "parking"
version = "2.1.0"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e"
checksum = "e52c774a4c39359c1d1c52e43f73dd91a75a614652c825408eec30c95a9b2067"
[[package]]
name = "parking_lot"
@ -3662,9 +3661,9 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
[[package]]
name = "pest"
version = "2.7.3"
version = "2.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7a4d085fd991ac8d5b05a147b437791b4260b76326baf0fc60cf7c9c27ecd33"
checksum = "c022f1e7b65d6a24c0dbbd5fb344c66881bc01f3e5ae74a1c8100f2f985d98a4"
dependencies = [
"memchr",
"thiserror",
@ -3673,9 +3672,9 @@ dependencies = [
[[package]]
name = "pest_derive"
version = "2.7.3"
version = "2.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2bee7be22ce7918f641a33f08e3f43388c7656772244e2bbb2477f44cc9021a"
checksum = "35513f630d46400a977c4cb58f78e1bfbe01434316e60c37d27b9ad6139c66d8"
dependencies = [
"pest",
"pest_generator",
@ -3683,9 +3682,9 @@ dependencies = [
[[package]]
name = "pest_generator"
version = "2.7.3"
version = "2.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1511785c5e98d79a05e8a6bc34b4ac2168a0e3e92161862030ad84daa223141"
checksum = "bc9fc1b9e7057baba189b5c626e2d6f40681ae5b6eb064dc7c7834101ec8123a"
dependencies = [
"pest",
"pest_meta",
@ -3696,9 +3695,9 @@ dependencies = [
[[package]]
name = "pest_meta"
version = "2.7.3"
version = "2.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b42f0394d3123e33353ca5e1e89092e533d2cc490389f2bd6131c43c634ebc5f"
checksum = "1df74e9e7ec4053ceb980e7c0c8bd3594e977fde1af91daba9c928e8e8c6708d"
dependencies = [
"once_cell",
"pest",
@ -4351,12 +4350,6 @@ dependencies = [
"winreg",
]
[[package]]
name = "retain_mut"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4389f1d5789befaf6029ebd9f7dac4af7f7e3d61b69d4f30e2ac02b57e7712b0"
[[package]]
name = "rfc6979"
version = "0.4.0"
@ -4736,7 +4729,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bef60732e6016c5643350c87f43a697e8c074e41e4e2a9d961c056cb1310915"
dependencies = [
"chrono",
"clap 4.4.4",
"clap 4.4.5",
"dotenvy",
"glob",
"regex",
@ -4767,7 +4760,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e53b6ddaf6dbb84e5dfc3fb78634ed0a4d6d64e7479500ab2585db239747031"
dependencies = [
"async-trait",
"clap 4.4.4",
"clap 4.4.5",
"dotenvy",
"futures",
"sea-orm",
@ -5172,9 +5165,9 @@ dependencies = [
[[package]]
name = "sha2"
version = "0.10.7"
version = "0.10.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "479fb9d862239e610720565ca91403019f2f00410f1864c5aa7479b950a76ed8"
checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8"
dependencies = [
"cfg-if",
"cpufeatures",
@ -5777,7 +5770,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef"
dependencies = [
"cfg-if",
"fastrand 2.0.0",
"fastrand 2.0.1",
"redox_syscall 0.3.5",
"rustix",
"windows-sys",
@ -6632,6 +6625,7 @@ dependencies = [
"axum-client-ip",
"axum-macros",
"base64 0.21.4",
"bytes",
"chrono",
"console-subscriber",
"counter",
@ -6646,6 +6640,7 @@ dependencies = [
"fdlimit",
"fstrings",
"futures",
"futures-util",
"glob",
"handlebars",
"hashbrown 0.14.0",

@ -97,6 +97,6 @@ impl MigrationTrait for Migration {
async fn down(&self, _manager: &SchemaManager) -> Result<(), DbErr> {
// Replace the sample below with your own migration scripts
todo!();
unimplemented!();
}
}

@ -27,7 +27,7 @@ impl MigrationTrait for Migration {
}
async fn down(&self, _manager: &SchemaManager) -> Result<(), DbErr> {
todo!();
unimplemented!();
}
}

@ -7,5 +7,5 @@ edition = "2021"
[dependencies]
anyhow = "1.0.75"
chrono = "0.4.31"
deadpool-redis = { version = "0.12.0", features = ["rt_tokio_1", "serde"] }
deadpool-redis = { version = "0.13.0", features = ["rt_tokio_1", "serde"] }
tokio = "1.32.0"

@ -99,6 +99,8 @@ uuid = { version = "1.4.1", default-features = false, features = ["fast-rng", "v
# TODO: why doesn't this work in dev-dependencies
test-log = { version = "0.2.12", default-features = false, features = ["trace"] }
bytes = "1.5.0"
futures-util = "0.3.28"
# # TODO: bring this back
# check-if-email-exists = "0.9.0"

@ -10,8 +10,8 @@ use crate::frontend::authorization::{
use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::globals::{global_db_conn, DatabaseError, DB_CONN, DB_REPLICA};
use crate::jsonrpc::{
JsonRpcErrorData, JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcId,
JsonRpcParams, JsonRpcRequest, JsonRpcRequestEnum, JsonRpcResultData,
self, JsonRpcErrorData, JsonRpcId, JsonRpcParams, JsonRpcRequest, JsonRpcRequestEnum,
JsonRpcResultData, SingleResponse,
};
use crate::relational_db::{connect_db, migrate_db};
use crate::response_cache::{
@ -84,6 +84,10 @@ pub struct Web3ProxyApp {
pub http_client: Option<reqwest::Client>,
/// track JSONRPC responses
pub jsonrpc_response_cache: JsonRpcResponseCache,
/// track JSONRPC cache keys that have failed caching
pub jsonrpc_response_failed_cache_keys: Cache<u64, ()>,
/// de-dupe requests (but with easy timeouts)
pub jsonrpc_response_semaphores: Cache<u64, Arc<Semaphore>>,
/// rpc clients that subscribe to newHeads use this channel
/// don't drop this or the sender will stop working
/// TODO: broadcast channel instead?
@ -93,10 +97,13 @@ pub struct Web3ProxyApp {
pub hostname: Option<String>,
pub frontend_port: Arc<AtomicU16>,
/// rate limit anonymous users
pub frontend_ip_rate_limiter: Option<DeferredRateLimiter<IpAddr>>,
pub frontend_public_rate_limiter: Option<DeferredRateLimiter<IpAddr>>,
/// bonus rate limit for anonymous users
pub bonus_frontend_public_rate_limiter: Option<RedisRateLimiter>,
/// rate limit authenticated users
pub frontend_registered_user_rate_limiter:
Option<DeferredRateLimiter<RegisteredUserRateLimitKey>>,
pub frontend_premium_rate_limiter: Option<DeferredRateLimiter<RegisteredUserRateLimitKey>>,
/// bonus rate limit for authenticated users
pub bonus_frontend_premium_rate_limiter: Option<RedisRateLimiter>,
/// concurrent/parallel request limits for anonymous users
pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>>,
/// give some bonus capacity to public users
@ -359,9 +366,11 @@ impl Web3ProxyApp {
// create rate limiters
// these are optional. they require redis
let mut frontend_ip_rate_limiter = None;
let mut frontend_registered_user_rate_limiter = None;
let mut frontend_public_rate_limiter = None;
let mut frontend_premium_rate_limiter = None;
let mut login_rate_limiter = None;
let mut bonus_frontend_public_rate_limiter: Option<RedisRateLimiter> = None;
let mut bonus_frontend_premium_rate_limiter: Option<RedisRateLimiter> = None;
if let Some(ref redis_pool) = vredis_pool {
if let Some(public_requests_per_period) = top_config.app.public_requests_per_period {
@ -377,10 +386,29 @@ impl Web3ProxyApp {
// these two rate limiters can share the base limiter
// these are deferred rate limiters because we don't want redis network requests on the hot path
// TODO: take cache_size from config
frontend_ip_rate_limiter =
frontend_public_rate_limiter =
Some(DeferredRateLimiter::new(20_000, "ip", rpc_rrl.clone(), None).await);
frontend_registered_user_rate_limiter =
frontend_premium_rate_limiter =
Some(DeferredRateLimiter::new(20_000, "key", rpc_rrl, None).await);
if top_config.app.bonus_frontend_public_rate_limit > 0 {
bonus_frontend_public_rate_limiter = Some(RedisRateLimiter::new(
"web3_proxy",
"bonus_frontend_public",
top_config.app.bonus_frontend_public_rate_limit,
60.0,
redis_pool.clone(),
));
}
if top_config.app.bonus_frontend_premium_rate_limit > 0 {
bonus_frontend_premium_rate_limiter = Some(RedisRateLimiter::new(
"web3_proxy",
"bonus_frontend_premium",
top_config.app.bonus_frontend_premium_rate_limit,
60.0,
redis_pool.clone(),
));
}
}
// login rate limiter
@ -493,26 +521,39 @@ impl Web3ProxyApp {
.ok()
.and_then(|x| x.to_str().map(|x| x.to_string()));
// TODO: get the size out of the config
let bonus_ip_concurrency = Arc::new(Semaphore::new(top_config.app.bonus_ip_concurrency));
let bonus_ip_concurrency =
Arc::new(Semaphore::new(top_config.app.bonus_public_concurrency));
let bonus_user_concurrency =
Arc::new(Semaphore::new(top_config.app.bonus_user_concurrency));
Arc::new(Semaphore::new(top_config.app.bonus_premium_concurrency));
// TODO: what size?
let jsonrpc_response_semaphores = CacheBuilder::new(10_000)
.name("jsonrpc_response_semaphores")
.build();
let jsonrpc_response_failed_cache_keys = CacheBuilder::new(100_000)
.name("jsonrpc_response_failed_cache_keys")
.build();
let app = Self {
balanced_rpcs,
bonus_frontend_public_rate_limiter,
bonus_frontend_premium_rate_limiter,
bonus_ip_concurrency,
bonus_user_concurrency,
bundler_4337_rpcs,
config: top_config.app.clone(),
frontend_ip_rate_limiter,
frontend_public_rate_limiter,
frontend_port: frontend_port.clone(),
frontend_registered_user_rate_limiter,
frontend_premium_rate_limiter,
hostname,
http_client,
influxdb_client,
internal_provider: Default::default(),
ip_semaphores,
jsonrpc_response_cache,
jsonrpc_response_failed_cache_keys,
jsonrpc_response_semaphores,
kafka_producer,
login_rate_limiter,
pending_txid_firehose: deduped_txid_firehose,
@ -983,15 +1024,10 @@ impl Web3ProxyApp {
let (_, response, _) = self.proxy_request(request, authorization, None).await;
if let Some(result) = response.result {
let result = serde_json::from_str(result.get())?;
Ok(result)
} else if let Some(error_data) = response.error {
// TODO: this might lose the http error code
Err(Web3ProxyError::JsonRpcErrorData(error_data))
} else {
unimplemented!();
// TODO: error handling?
match response.parsed().await?.payload {
jsonrpc::Payload::Success { result } => Ok(serde_json::from_str(result.get())?),
jsonrpc::Payload::Error { error } => Err(Web3ProxyError::JsonRpcErrorData(error)),
}
}
@ -1000,7 +1036,7 @@ impl Web3ProxyApp {
self: &Arc<Self>,
authorization: Arc<Authorization>,
request: JsonRpcRequestEnum,
) -> Web3ProxyResult<(StatusCode, JsonRpcForwardedResponseEnum, Vec<Arc<Web3Rpc>>)> {
) -> Web3ProxyResult<(StatusCode, jsonrpc::Response, Vec<Arc<Web3Rpc>>)> {
// trace!(?request, "proxy_web3_rpc");
let response = match request {
@ -1009,11 +1045,7 @@ impl Web3ProxyApp {
.proxy_request(request, authorization.clone(), None)
.await;
(
status_code,
JsonRpcForwardedResponseEnum::Single(response),
rpcs,
)
(status_code, jsonrpc::Response::Single(response), rpcs)
}
JsonRpcRequestEnum::Batch(requests) => {
let (responses, rpcs) = self
@ -1021,11 +1053,7 @@ impl Web3ProxyApp {
.await?;
// TODO: real status code. if an error happens, i don't think we are following the spec here
(
StatusCode::OK,
JsonRpcForwardedResponseEnum::Batch(responses),
rpcs,
)
(StatusCode::OK, jsonrpc::Response::Batch(responses), rpcs)
}
};
@ -1038,7 +1066,7 @@ impl Web3ProxyApp {
self: &Arc<Self>,
authorization: &Arc<Authorization>,
requests: Vec<JsonRpcRequest>,
) -> Web3ProxyResult<(Vec<JsonRpcForwardedResponse>, Vec<Arc<Web3Rpc>>)> {
) -> Web3ProxyResult<(Vec<jsonrpc::ParsedResponse>, Vec<Arc<Web3Rpc>>)> {
// TODO: we should probably change ethers-rs to support this directly. they pushed this off to v2 though
let num_requests = requests.len();
@ -1065,14 +1093,15 @@ impl Web3ProxyApp {
)
.await;
let mut collected: Vec<JsonRpcForwardedResponse> = Vec::with_capacity(num_requests);
let mut collected: Vec<jsonrpc::ParsedResponse> = Vec::with_capacity(num_requests);
let mut collected_rpc_names: HashSet<String> = HashSet::new();
let mut collected_rpcs: Vec<Arc<Web3Rpc>> = vec![];
for response in responses {
// TODO: any way to attach the tried rpcs to the error? it is likely helpful
let (_status_code, response, rpcs) = response;
collected.push(response);
// TODO: individual error handling
collected.push(response.parsed().await?);
collected_rpcs.extend(rpcs.into_iter().filter(|x| {
if collected_rpc_names.contains(&x.name) {
false
@ -1107,14 +1136,14 @@ impl Web3ProxyApp {
method: &str,
params: &P,
request_metadata: &Arc<RequestMetadata>,
) -> Web3ProxyResult<Box<RawValue>> {
) -> Web3ProxyResult<Arc<RawValue>> {
if let Some(protected_rpcs) = self.private_rpcs.as_ref() {
if !protected_rpcs.is_empty() {
let protected_response = protected_rpcs
.try_send_all_synced_connections(
method,
params,
Some(request_metadata),
request_metadata,
None,
None,
Some(Duration::from_secs(10)),
@ -1144,7 +1173,7 @@ impl Web3ProxyApp {
.try_send_all_synced_connections(
method,
params,
Some(request_metadata),
request_metadata,
None,
None,
Some(Duration::from_secs(10)),
@ -1160,7 +1189,7 @@ impl Web3ProxyApp {
mut request: JsonRpcRequest,
authorization: Arc<Authorization>,
head_block: Option<&Web3ProxyBlock>,
) -> (StatusCode, JsonRpcForwardedResponse, Vec<Arc<Web3Rpc>>) {
) -> (StatusCode, jsonrpc::SingleResponse, Vec<Arc<Web3Rpc>>) {
let request_metadata = RequestMetadata::new(
self,
authorization,
@ -1184,8 +1213,10 @@ impl Web3ProxyApp {
tries += 1;
let (code, response_data) = match self
let (code, response) = match self
._proxy_request_with_caching(
// TODO: avoid clone here
response_id.clone(),
&request.method,
&mut request.params,
head_block,
@ -1211,7 +1242,7 @@ impl Web3ProxyApp {
.user_error_response
.store(false, Ordering::Relaxed);
err.as_response_parts()
err.as_json_response_parts(response_id)
}
Err(Web3ProxyError::JsonRpcResponse(response_data)) => {
request_metadata
@ -1221,7 +1252,9 @@ impl Web3ProxyApp {
.user_error_response
.store(response_data.is_error(), Ordering::Relaxed);
(StatusCode::OK, response_data)
let response =
jsonrpc::ParsedResponse::from_response_data(response_data, response_id);
(StatusCode::OK, response.into())
}
Err(err) => {
if tries <= max_tries {
@ -1238,12 +1271,10 @@ impl Web3ProxyApp {
.user_error_response
.store(false, Ordering::Relaxed);
err.as_response_parts()
err.as_json_response_parts(response_id)
}
};
let response = JsonRpcForwardedResponse::from_response_data(response_data, response_id);
// TODO: this serializes twice :/
request_metadata.add_response(ResponseOrBytes::Response(&response));
@ -1260,14 +1291,15 @@ impl Web3ProxyApp {
/// TODO: how can we make this generic?
async fn _proxy_request_with_caching(
self: &Arc<Self>,
id: Box<RawValue>,
method: &str,
params: &mut serde_json::Value,
head_block: Option<&Web3ProxyBlock>,
request_metadata: &Arc<RequestMetadata>,
) -> Web3ProxyResult<JsonRpcResponseEnum<Arc<RawValue>>> {
) -> Web3ProxyResult<jsonrpc::SingleResponse> {
// TODO: serve net_version without querying the backend
// TODO: don't force RawValue
let response_data: JsonRpcResponseEnum<Arc<RawValue>> = match method {
let response: jsonrpc::SingleResponse = match method {
// lots of commands are blocked
method @ ("db_getHex"
| "db_getString"
@ -1357,18 +1389,16 @@ impl Web3ProxyApp {
| "eth_supportedEntryPoints"
| "web3_bundlerVersion") => match self.bundler_4337_rpcs.as_ref() {
Some(bundler_4337_rpcs) => {
let x = bundler_4337_rpcs
.try_proxy_connection::<_, Box<RawValue>>(
bundler_4337_rpcs
.try_proxy_connection::<_, Arc<RawValue>>(
method,
params,
Some(request_metadata),
request_metadata,
Some(Duration::from_secs(30)),
None,
None,
)
.await?;
x.into()
.await?
}
None => {
// TODO: stats even when we error!
@ -1376,22 +1406,23 @@ impl Web3ProxyApp {
return Err(Web3ProxyError::NoServersSynced);
}
},
"eth_accounts" => JsonRpcResponseEnum::from(serde_json::Value::Array(vec![])),
// TODO: id
"eth_accounts" => jsonrpc::ParsedResponse::from_value(serde_json::Value::Array(vec![]), id).into(),
"eth_blockNumber" => {
match head_block.cloned().or(self.balanced_rpcs.head_block()) {
Some(head_block) => JsonRpcResponseEnum::from(json!(head_block.number())),
Some(head_block) => jsonrpc::ParsedResponse::from_value(json!(head_block.number()), id).into(),
None => {
return Err(Web3ProxyError::NoServersSynced);
}
}
}
"eth_chainId" => JsonRpcResponseEnum::from(json!(U64::from(self.config.chain_id))),
"eth_chainId" => jsonrpc::ParsedResponse::from_value(json!(U64::from(self.config.chain_id)), id).into(),
// TODO: eth_callBundle (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_callbundle)
// TODO: eth_cancelPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_cancelprivatetransaction, but maybe just reject)
// TODO: eth_sendPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_sendprivatetransaction)
"eth_coinbase" => {
// no need for serving coinbase
JsonRpcResponseEnum::from(json!(Address::zero()))
jsonrpc::ParsedResponse::from_value(json!(Address::zero()), id).into()
}
"eth_estimateGas" => {
// TODO: timeout
@ -1400,12 +1431,15 @@ impl Web3ProxyApp {
.try_proxy_connection::<_, U256>(
method,
params,
Some(request_metadata),
request_metadata,
Some(Duration::from_secs(30)),
None,
None,
)
.await?;
.await?
.parsed()
.await?
.into_result()?;
let gas_increase = if let Some(gas_increase_percent) =
self.config.gas_increase_percent
@ -1422,59 +1456,62 @@ impl Web3ProxyApp {
gas_estimate += gas_increase;
// TODO: from_serializable?
JsonRpcResponseEnum::from(json!(gas_estimate))
jsonrpc::ParsedResponse::from_value(json!(gas_estimate), id).into()
}
"eth_getTransactionReceipt" | "eth_getTransactionByHash" => {
// try to get the transaction without specifying a min_block_height
// TODO: timeout
let mut response_data = self
let parsed = match self
.balanced_rpcs
.try_proxy_connection::<_, Box<RawValue>>(
.try_proxy_connection::<_, Arc<RawValue>>(
method,
params,
Some(request_metadata),
request_metadata,
Some(Duration::from_secs(30)),
None,
None,
)
.await;
.await {
Ok(response) => response.parsed().await.map_err(Into::into),
Err(err) => Err(err),
};
// if we got "null", it is probably because the tx is old. retry on nodes with old block data
let try_archive = if let Ok(value) = &response_data {
let try_archive = if let Ok(Some(value)) = parsed.as_ref().map(|r| r.result()) {
value.get() == "null" || value.get() == "" || value.get() == "\"\""
} else {
true
};
if try_archive {
if let Some(head_block_num) = head_block.map(|x| x.number()) {
// TODO: only charge for archive if it gave a result
request_metadata
.archive_request
.store(true, atomic::Ordering::Relaxed);
if try_archive && let Some(head_block_num) = head_block.map(|x| x.number()) {
// TODO: only charge for archive if it gave a result
request_metadata
.archive_request
.store(true, atomic::Ordering::Relaxed);
response_data = self
.balanced_rpcs
.try_proxy_connection::<_, Box<RawValue>>(
method,
params,
Some(request_metadata),
Some(Duration::from_secs(30)),
// TODO: should this be block 0 instead?
Some(&U64::one()),
// TODO: is this a good way to allow lagged archive nodes a try
Some(&head_block_num.saturating_sub(5.into()).clamp(U64::one(), U64::MAX)),
)
.await;
}
self
.balanced_rpcs
.try_proxy_connection::<_, Arc<RawValue>>(
method,
params,
request_metadata,
Some(Duration::from_secs(30)),
// TODO: should this be block 0 instead?
Some(&U64::one()),
// TODO: is this a good way to allow lagged archive nodes a try
Some(&head_block_num.saturating_sub(5.into()).clamp(U64::one(), U64::MAX)),
)
.await?
} else {
parsed?.into()
}
response_data.try_into()?
// TODO: if parsed is an error, return a null instead
}
// TODO: eth_gasPrice that does awesome magic to predict the future
"eth_hashrate" => JsonRpcResponseEnum::from(json!(U64::zero())),
"eth_mining" => JsonRpcResponseEnum::from(serde_json::Value::Bool(false)),
"eth_hashrate" => jsonrpc::ParsedResponse::from_value(json!(U64::zero()), id).into(),
"eth_mining" => jsonrpc::ParsedResponse::from_value(serde_json::Value::Bool(false), id).into(),
// TODO: eth_sendBundle (flashbots/eden command)
// broadcast transactions to all private rpcs at once
"eth_sendRawTransaction" => {
@ -1574,35 +1611,34 @@ impl Web3ProxyApp {
}
}
response
jsonrpc::ParsedResponse::from_response_data(response, id).into()
}
"eth_syncing" => {
// no stats on this. its cheap
// TODO: return a real response if all backends are syncing or if no servers in sync
// TODO: const
JsonRpcResponseEnum::from(serde_json::Value::Bool(false))
jsonrpc::ParsedResponse::from_value(serde_json::Value::Bool(false), id).into()
}
"eth_subscribe" => JsonRpcErrorData {
"eth_subscribe" => jsonrpc::ParsedResponse::from_error(JsonRpcErrorData {
message: "notifications not supported. eth_subscribe is only available over a websocket".into(),
code: -32601,
data: None,
}
.into(),
"eth_unsubscribe" => JsonRpcErrorData {
}, id).into(),
"eth_unsubscribe" => jsonrpc::ParsedResponse::from_error(JsonRpcErrorData {
message: "notifications not supported. eth_unsubscribe is only available over a websocket".into(),
code: -32601,
data: None,
}.into(),
}, id).into(),
"net_listening" => {
// TODO: only true if there are some backends on balanced_rpcs?
// TODO: const
JsonRpcResponseEnum::from(serde_json::Value::Bool(true))
jsonrpc::ParsedResponse::from_value(serde_json::Value::Bool(true), id).into()
}
"net_peerCount" =>
JsonRpcResponseEnum::from(json!(U64::from(self.balanced_rpcs.num_synced_rpcs())))
"net_peerCount" =>
jsonrpc::ParsedResponse::from_value(json!(U64::from(self.balanced_rpcs.num_synced_rpcs())), id).into()
,
"web3_clientVersion" =>
JsonRpcResponseEnum::from(serde_json::Value::String(APP_USER_AGENT.to_string()))
"web3_clientVersion" =>
jsonrpc::ParsedResponse::from_value(serde_json::Value::String(APP_USER_AGENT.to_string()), id).into()
,
"web3_sha3" => {
// returns Keccak-256 (not the standardized SHA3-256) of the given data.
@ -1615,11 +1651,11 @@ impl Web3ProxyApp {
{
// TODO: what error code?
// TODO: use Web3ProxyError::BadRequest
JsonRpcErrorData {
jsonrpc::ParsedResponse::from_error(JsonRpcErrorData {
message: "Invalid request".into(),
code: -32600,
data: None
}.into()
}, id).into()
} else {
// TODO: BadRequest instead of web3_context
let param = Bytes::from_str(
@ -1637,25 +1673,25 @@ impl Web3ProxyApp {
let hash = H256::from(keccak256(param));
JsonRpcResponseEnum::from(json!(hash))
jsonrpc::ParsedResponse::from_value(json!(hash), id).into()
}
}
_ => {
// TODO: this needs the correct error code in the response
// TODO: Web3ProxyError::BadRequest instead?
JsonRpcErrorData {
jsonrpc::ParsedResponse::from_error(JsonRpcErrorData {
message: "invalid request".into(),
code: StatusCode::BAD_REQUEST.as_u16().into(),
data: None,
}.into()
}, id).into()
}
}
}
"test" => JsonRpcErrorData {
"test" => jsonrpc::ParsedResponse::from_error(JsonRpcErrorData {
message: "The method test does not exist/is not available.".into(),
code: -32601,
data: None,
}.into(),
}, id).into(),
// anything else gets sent to backend rpcs and cached
method => {
if method.starts_with("admin_") {
@ -1744,89 +1780,157 @@ impl Web3ProxyApp {
// TODO: think more about this timeout. we should probably have a `request_expires_at` Duration on the request_metadata
// TODO: different user tiers should have different timeouts
// erigon's timeout is 300, so keep this a few seconds shorter
let max_wait = Some(Duration::from_secs(295));
let max_wait = Some(Duration::from_secs(290));
if let Some(cache_key) = cache_key {
let from_block_num = cache_key.from_block_num().copied();
let to_block_num = cache_key.to_block_num().copied();
let cache_jsonrpc_errors = cache_key.cache_errors();
let cache_key_hash = cache_key.hash();
// don't cache anything larger than 16 MiB
let max_response_cache_bytes = 16 * 1024 * 1024; // self.config.max_response_cache_bytes;
// TODO: try to fetch out of s3
// TODO: clone less?
let app = self.clone();
let method = method.to_string();
let params = params.clone();
let request_metadata = request_metadata.clone();
let x: SingleResponse = if let Some(data) = self.jsonrpc_response_cache.get(&cache_key_hash).await {
// it was cached! easy!
// TODO: wait. this currently panics. why?
jsonrpc::ParsedResponse::from_response_data(data, id).into()
} else if self.jsonrpc_response_failed_cache_keys.contains_key(&cache_key_hash) {
// this is a cache_key that we know won't cache
// NOTICE! We do **NOT** use get which means the key's hotness is not updated. we don't use time-to-idle here so that's fine. but be careful if that changes
timeout(
Duration::from_secs(295),
self.balanced_rpcs
.try_proxy_connection::<_, Arc<RawValue>>(
method,
params,
request_metadata,
max_wait,
None,
None,
)
).await??
} else {
// TODO: acquire a semaphore from a map with the cache key as the key
// TODO: try it, if that fails, then we are already running. wait until the semaphore completes and then run on. they will all go only one at a time though
// TODO: if we got the semaphore, do the try_get_with
// TODO: if the response is too big to cache mark the cache_key as not cacheable. maybe CacheMode can check that cache?
let f = async move {
app
.jsonrpc_response_cache
.try_get_with::<_, Web3ProxyError>(cache_key.hash(), async {
let response_data = timeout(Duration::from_secs(290), app.balanced_rpcs
let s = self.jsonrpc_response_semaphores.get_with(cache_key_hash, async move {
Arc::new(Semaphore::new(1))
}).await;
// TODO: don't always do 1 second. use the median request latency instead
match timeout(Duration::from_secs(1), s.acquire_owned()).await {
Err(_) => {
// TODO: should we try to cache this? whatever has the semaphore //should// handle that for us
timeout(
Duration::from_secs(295),
self.balanced_rpcs
.try_proxy_connection::<_, Arc<RawValue>>(
&method,
&params,
Some(&request_metadata),
method,
params,
request_metadata,
max_wait,
from_block_num.as_ref(),
to_block_num.as_ref(),
)).await;
None,
None,
)
).await??
}
Ok(_p) => {
// we got the permit! we are either first, or we were waiting a short time to get it in which case this response should be cached
// TODO: clone less?
let f = {
let app = self.clone();
let method = method.to_string();
let params = params.clone();
let request_metadata = request_metadata.clone();
match response_data {
Ok(response_data) => {
if !cache_jsonrpc_errors && let Err(err) = response_data {
// if we are not supposed to cache jsonrpc errors,
// then we must not convert Provider errors into a JsonRpcResponseEnum
// return all the errors now. moka will not cache Err results
Err(err)
} else {
// convert jsonrpc errors into JsonRpcResponseEnum, but leave the rest as errors
let response_data: JsonRpcResponseEnum<Arc<RawValue>> = response_data.try_into()?;
async move {
app
.jsonrpc_response_cache
.try_get_with::<_, Web3ProxyError>(cache_key.hash(), async {
let response_data = timeout(Duration::from_secs(290), app.balanced_rpcs
.try_proxy_connection::<_, Arc<RawValue>>(
&method,
&params,
&request_metadata,
max_wait,
from_block_num.as_ref(),
to_block_num.as_ref(),
)).await;
if response_data.is_null() {
// don't ever cache "null" as a success. its too likely to be a problem
Err(Web3ProxyError::NullJsonRpcResult)
} else if response_data.num_bytes() > max_response_cache_bytes {
// don't cache really large requests
// TODO: emit a stat
Err(Web3ProxyError::JsonRpcResponse(response_data))
} else {
// TODO: response data should maybe be Arc<JsonRpcResponseEnum<Box<RawValue>>>, but that's more work
Ok(response_data)
}
}
match response_data {
Ok(response_data) => {
if !cache_jsonrpc_errors && let Err(err) = response_data {
// if we are not supposed to cache jsonrpc errors,
// then we must not convert Provider errors into a JsonRpcResponseEnum
// return all the errors now. moka will not cache Err results
Err(err)
} else {
// convert jsonrpc errors into JsonRpcResponseEnum, but leave the rest as errors
let response_data: JsonRpcResponseEnum<Arc<RawValue>> = response_data.try_into()?;
if response_data.is_null() {
// don't ever cache "null" as a success. its too likely to be a problem
Err(Web3ProxyError::NullJsonRpcResult)
} else if response_data.num_bytes() > max_response_cache_bytes {
// don't cache really large requests
// TODO: emit a stat
Err(Web3ProxyError::JsonRpcResponse(response_data))
} else {
// TODO: response data should maybe be Arc<JsonRpcResponseEnum<Box<RawValue>>>, but that's more work
Ok(response_data)
}
}
}
Err(err) => Err(Web3ProxyError::from(err)),
}
}).await
}
Err(err) => Err(Web3ProxyError::from(err)),
}
}).await
};
};
// this is spawned so that if the client disconnects, the app keeps polling the future with a lock inside the moka cache
tokio::spawn(f).await??
// this is spawned so that if the client disconnects, the app keeps polling the future with a lock inside the moka cache
// TODO: is this expect actually safe!? could there be a background process that still has the arc?
match tokio::spawn(f).await? {
Ok(response_data) => Ok(jsonrpc::ParsedResponse::from_response_data(response_data, id).into()),
Err(err) => {
self.jsonrpc_response_failed_cache_keys.insert(cache_key_hash, ()).await;
if let Web3ProxyError::StreamResponse(x) = err.as_ref() {
let x = x.lock().take().expect("stream processing should only happen once");
Ok(jsonrpc::SingleResponse::Stream(x))
} else {
Err(err)
}
},
}?
}
}
};
x
} else {
let x = timeout(
Duration::from_secs(300),
timeout(
Duration::from_secs(295),
self.balanced_rpcs
.try_proxy_connection::<_, Arc<RawValue>>(
method,
params,
Some(request_metadata),
request_metadata,
max_wait,
None,
None,
)
).await??;
x.into()
).await??
}
}
};
Ok(response_data)
Ok(response)
}
}
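
Two details in the caching hunk above are easy to miss. moka's try_get_with runs the init future once per key and shares the Result with every concurrent waiter, so errors come back wrapped in an Arc; that is why the code matches on err.as_ref(), and why an uncacheable streaming body is carried out through a Mutex<Option<StreamResponse>> that is take()n exactly once. A small, self-contained sketch of the Arc-wrapped error pattern, using a stand-in error type rather than Web3ProxyError (assumes moka with the "future" feature and tokio):

use moka::future::Cache;
use std::sync::Arc;

#[derive(Debug)]
enum FetchError {
    // stand-in for the commit's StreamResponse / JsonRpcResponse "too big to cache" cases
    TooBigToCache(String),
    Backend(String),
}

#[tokio::main]
async fn main() {
    let cache: Cache<u64, String> = Cache::new(10_000);

    // try_get_with shares one init future per key, so the error type is Arc<FetchError>
    let res: Result<String, Arc<FetchError>> = cache
        .try_get_with(42u64, async {
            Err::<String, _>(FetchError::TooBigToCache("streamed body".into()))
        })
        .await;

    match res {
        Ok(cached) => println!("cached: {cached}"),
        Err(err) => match err.as_ref() {
            // the commit pulls the un-cacheable response back out of the error here
            FetchError::TooBigToCache(body) => println!("stream it instead: {body}"),
            FetchError::Backend(msg) => eprintln!("real failure: {msg}"),
        },
    }
}

The one-permit semaphore keyed by cache_key_hash plays a similar de-duplication role: whoever wins the permit populates the cache, and callers that time out waiting simply proxy the request directly.
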

@ -3,8 +3,7 @@
use super::Web3ProxyApp;
use crate::errors::{Web3ProxyError, Web3ProxyResult};
use crate::frontend::authorization::{Authorization, RequestMetadata, RequestOrMethod};
use crate::jsonrpc::JsonRpcForwardedResponse;
use crate::jsonrpc::JsonRpcRequest;
use crate::jsonrpc::{self, JsonRpcRequest};
use crate::response_cache::JsonRpcResponseEnum;
use axum::extract::ws::{CloseFrame, Message};
use deferred_rate_limiter::DeferredRateLimitResult;
@ -30,7 +29,7 @@ impl Web3ProxyApp {
subscription_count: &'a AtomicU64,
// TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now
response_sender: mpsc::Sender<Message>,
) -> Web3ProxyResult<(AbortHandle, JsonRpcForwardedResponse)> {
) -> Web3ProxyResult<(AbortHandle, jsonrpc::ParsedResponse)> {
let subscribe_to = jsonrpc_request
.params
.get(0)
@ -41,7 +40,10 @@ impl Web3ProxyApp {
// anyone can subscribe to newHeads
// only premium users are allowed to subscribe to the other things
if !(subscribe_to == "newHeads" || authorization.active_premium().await) {
if !(self.config.free_subscriptions
|| subscribe_to == "newHeads"
|| authorization.active_premium().await)
{
return Err(Web3ProxyError::AccessDenied(
"eth_subscribe for this event requires an active premium account".into(),
));
@ -214,10 +216,13 @@ impl Web3ProxyApp {
let response_data = JsonRpcResponseEnum::from(json!(subscription_id));
let response = JsonRpcForwardedResponse::from_response_data(response_data, id);
let response = jsonrpc::ParsedResponse::from_response_data(response_data, id);
// TODO: better way of passing in ParsedResponse
let response = jsonrpc::SingleResponse::Parsed(response);
// TODO: this serializes twice
request_metadata.add_response(&response);
let response = response.parsed().await.expect("Response already parsed");
// TODO: make a `SubscriptionHandle(AbortHandle, JoinHandle)` struct?
Ok((subscription_abort_handle, response))
@ -227,45 +232,44 @@ impl Web3ProxyApp {
&self,
request_metadata: &RequestMetadata,
) -> Option<Message> {
if let Some(authorization) = request_metadata.authorization.as_ref() {
if authorization.checks.rpc_secret_key_id.is_none() {
if let Some(rate_limiter) = &self.frontend_ip_rate_limiter {
match rate_limiter
.throttle(
authorization.ip,
authorization.checks.max_requests_per_period,
1,
)
.await
{
Ok(DeferredRateLimitResult::RetryNever) => {
let close_frame = CloseFrame {
let authorization = &request_metadata.authorization;
if !authorization.active_premium().await {
if let Some(rate_limiter) = &self.frontend_public_rate_limiter {
match rate_limiter
.throttle(
authorization.ip,
authorization.checks.max_requests_per_period,
1,
)
.await
{
Ok(DeferredRateLimitResult::RetryNever) => {
let close_frame = CloseFrame {
code: StatusCode::TOO_MANY_REQUESTS.as_u16(),
reason:
"rate limited. upgrade to premium for unlimited websocket messages"
.into(),
};
return Some(Message::Close(Some(close_frame)));
}
Ok(DeferredRateLimitResult::RetryAt(retry_at)) => {
let retry_at = retry_at.duration_since(Instant::now());
return Some(Message::Close(Some(close_frame)));
}
Ok(DeferredRateLimitResult::RetryAt(retry_at)) => {
let retry_at = retry_at.duration_since(Instant::now());
let reason = format!("rate limited. upgrade to premium for unlimited websocket messages. retry in {}s", retry_at.as_secs_f32());
let reason = format!("rate limited. upgrade to premium for unlimited websocket messages. retry in {}s", retry_at.as_secs_f32());
let close_frame = CloseFrame {
code: StatusCode::TOO_MANY_REQUESTS.as_u16(),
reason: reason.into(),
};
let close_frame = CloseFrame {
code: StatusCode::TOO_MANY_REQUESTS.as_u16(),
reason: reason.into(),
};
return Some(Message::Close(Some(close_frame)));
}
Ok(_) => {}
Err(err) => {
// this an internal error of some kind, not the rate limit being hit
// TODO: i really want axum to do this for us in a single place.
error!("rate limiter is unhappy. allowing ip. err={:?}", err);
}
return Some(Message::Close(Some(close_frame)));
}
Ok(_) => {}
Err(err) => {
// this an internal error of some kind, not the rate limit being hit
error!(?err, "rate limiter is unhappy. allowing ip");
}
}
}

@ -85,10 +85,17 @@ pub struct AppConfig {
/// pool of extra connections allowed for authenticated users
#[serde_inline_default(0usize)]
pub bonus_user_concurrency: usize,
pub bonus_premium_concurrency: usize,
/// pool of extra connections allowed for anonymous users
#[serde_inline_default(0usize)]
pub bonus_ip_concurrency: usize,
pub bonus_public_concurrency: usize,
/// pool of extra requests per second allowed for authenticated users
#[serde_inline_default(0u64)]
pub bonus_frontend_public_rate_limit: u64,
#[serde_inline_default(0u64)]
pub bonus_frontend_premium_rate_limit: u64,
/// EVM chain id. 1 for ETH
/// TODO: better type for chain_id? max of `u64::MAX / 2 - 36` <https://github.com/ethereum/EIPs/issues/2294>

@ -1,7 +1,7 @@
//! Utilities for logging errors for admins and displaying errors to users.
use crate::frontend::authorization::Authorization;
use crate::jsonrpc::{JsonRpcErrorData, JsonRpcForwardedResponse};
use crate::jsonrpc::{self, JsonRpcErrorData, JsonRpcForwardedResponse};
use crate::response_cache::JsonRpcResponseEnum;
use crate::rpcs::provider::EthersHttpProvider;
use axum::extract::rejection::JsonRejection;
@ -19,6 +19,7 @@ use http::header::InvalidHeaderValue;
use http::uri::InvalidUri;
use ipnet::AddrParseError;
use migration::sea_orm::DbErr;
use parking_lot::Mutex;
use redis_rate_limiter::redis::RedisError;
use redis_rate_limiter::RedisPoolError;
use reqwest::header::ToStrError;
@ -127,6 +128,10 @@ pub enum Web3ProxyError {
#[error(ignore)]
#[display(fmt = "{:?}", _0)]
JsonRpcResponse(JsonRpcResponseEnum<Arc<RawValue>>),
/// make it easy to skip caching streaming results
#[error(ignore)]
#[display(fmt = "{:?}", _0)]
StreamResponse(Mutex<Option<jsonrpc::StreamResponse>>),
/// make it easy to skip caching null results
NullJsonRpcResult,
OriginRequired,
@ -184,6 +189,15 @@ pub enum Web3ProxyError {
}
impl Web3ProxyError {
pub fn as_json_response_parts(
&self,
id: Box<RawValue>,
) -> (StatusCode, jsonrpc::SingleResponse) {
let (code, response_data) = self.as_response_parts();
let response = jsonrpc::ParsedResponse::from_response_data(response_data, id);
(code, response.into())
}
/// turn the error into an axum response.
/// <https://www.jsonrpc.org/specification#error_object>
pub fn as_response_parts(&self) -> (StatusCode, JsonRpcResponseEnum<Arc<RawValue>>) {
@ -769,6 +783,10 @@ impl Web3ProxyError {
// TODO: shame we have to clone, but its an Arc so its not terrible
return (StatusCode::OK, response_enum.clone());
}
Self::StreamResponse(_resp) => {
// TODO: better way of doing this?
unreachable!("stream is pulled out, not used here");
}
Self::NullJsonRpcResult => {
return (StatusCode::OK, JsonRpcResponseEnum::NullResult);
}

@ -4,9 +4,10 @@ use super::rpc_proxy_ws::ProxyMode;
use crate::app::{Web3ProxyApp, APP_USER_AGENT};
use crate::balance::Balance;
use crate::caches::RegisteredUserRateLimitKey;
use crate::compute_units::default_usd_per_cu;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::globals::global_db_replica_conn;
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
use crate::jsonrpc::{self, JsonRpcParams, JsonRpcRequest};
use crate::rpcs::blockchain::Web3ProxyBlock;
use crate::rpcs::one::Web3Rpc;
use crate::stats::{AppStat, BackendRequests};
@ -16,7 +17,7 @@ use axum::headers::authorization::Bearer;
use axum::headers::{Header, Origin, Referer, UserAgent};
use chrono::Utc;
use core::fmt;
use deferred_rate_limiter::DeferredRateLimitResult;
use deferred_rate_limiter::{DeferredRateLimitResult, DeferredRateLimiter};
use derivative::Derivative;
use derive_more::From;
use entities::{login, rpc_key, user, user_tier};
@ -32,8 +33,9 @@ use rdkafka::message::{Header as KafkaHeader, OwnedHeaders as KafkaOwnedHeaders,
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout as KafkaTimeout;
use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::RedisRateLimitResult;
use redis_rate_limiter::{RedisRateLimitResult, RedisRateLimiter};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::borrow::Cow;
use std::fmt::Debug;
use std::fmt::Display;
@ -339,7 +341,7 @@ pub struct RequestMetadata {
/// TODO: this is more complex than "requires a block older than X height". different types of data can be pruned differently
pub archive_request: AtomicBool,
pub authorization: Option<Arc<Authorization>>,
pub authorization: Arc<Authorization>,
pub chain_id: u64,
@ -394,10 +396,7 @@ impl Default for Authorization {
impl RequestMetadata {
pub fn proxy_mode(&self) -> ProxyMode {
self.authorization
.as_ref()
.map(|x| x.checks.proxy_mode)
.unwrap_or_default()
self.authorization.checks.proxy_mode
}
}
@ -443,7 +442,7 @@ impl<'a> From<&'a str> for RequestOrMethod<'a> {
#[derive(From)]
pub enum ResponseOrBytes<'a> {
Json(&'a serde_json::Value),
Response(&'a JsonRpcForwardedResponse),
Response(&'a jsonrpc::SingleResponse),
Error(&'a Web3ProxyError),
Bytes(usize),
}
@ -460,9 +459,7 @@ impl ResponseOrBytes<'_> {
Self::Json(x) => serde_json::to_string(x)
.expect("this should always serialize")
.len(),
Self::Response(x) => serde_json::to_string(x)
.expect("this should always serialize")
.len(),
Self::Response(x) => x.num_bytes(),
Self::Bytes(num_bytes) => *num_bytes,
Self::Error(x) => {
let (_, x) = x.as_response_parts();
@ -518,7 +515,7 @@ impl RequestMetadata {
let x = Self {
archive_request: false.into(),
authorization: Some(authorization),
authorization,
backend_requests: Default::default(),
chain_id,
error_response: false.into(),
@ -540,6 +537,51 @@ impl RequestMetadata {
Arc::new(x)
}
pub fn new_internal<P: JsonRpcParams>(chain_id: u64, method: &str, params: &P) -> Arc<Self> {
let authorization = Arc::new(Authorization::internal().unwrap());
let request_ulid = Ulid::new();
let method = method.to_string().into();
// TODO: how can we get this?
let stat_sender = None;
// TODO: how can we do this efficiently? having to serialize sucks
let request_bytes = json!({
"jsonrpc": "2.0",
"id": 1,
"method": method,
"params": params,
})
.to_string()
.len();
// TODO: we should be getting this from config instead!
let usd_per_cu = default_usd_per_cu(chain_id);
let x = Self {
archive_request: false.into(),
authorization,
backend_requests: Default::default(),
chain_id,
error_response: false.into(),
kafka_debug_logger: None,
method,
no_servers: 0.into(),
request_bytes,
request_ulid,
response_bytes: 0.into(),
response_from_backup_rpc: false.into(),
response_millis: 0.into(),
response_timestamp: 0.into(),
start_instant: Instant::now(),
stat_sender,
usd_per_cu,
user_error_response: false.into(),
};
Arc::new(x)
}
pub fn backend_rpcs_used(&self) -> Vec<Arc<Web3Rpc>> {
self.backend_requests.lock().clone()
}
@ -584,7 +626,14 @@ impl RequestMetadata {
if let Some(kafka_debug_logger) = self.kafka_debug_logger.as_ref() {
if let ResponseOrBytes::Response(response) = response {
kafka_debug_logger.log_debug_response(response);
match response {
jsonrpc::SingleResponse::Parsed(response) => {
kafka_debug_logger.log_debug_response(response);
}
jsonrpc::SingleResponse::Stream(_) => {
warn!("need to handle streaming response debug logging");
}
}
}
}
}
@ -828,7 +877,7 @@ pub async fn ip_is_authorized(
) -> Web3ProxyResult<(Authorization, Option<OwnedSemaphorePermit>)> {
// TODO: i think we could write an `impl From` for this
// TODO: move this to an AuthorizedUser extrator
let (authorization, semaphore) = match app.rate_limit_by_ip(ip, origin, proxy_mode).await? {
let (authorization, semaphore) = match app.rate_limit_public(ip, origin, proxy_mode).await? {
RateLimitResult::Allowed(authorization, semaphore) => (authorization, semaphore),
RateLimitResult::RateLimited(authorization, retry_at) => {
// TODO: in the background, emit a stat (maybe simplest to use a channel?)
@ -892,7 +941,7 @@ pub async fn key_is_authorized(
// check the rate limits. error if over the limit
// TODO: i think this should be in an "impl From" or "impl Into"
let (authorization, semaphore) = match app
.rate_limit_by_rpc_key(ip, origin, proxy_mode, referer, rpc_key, user_agent)
.rate_limit_premium(ip, origin, proxy_mode, referer, rpc_key, user_agent)
.await?
{
RateLimitResult::Allowed(authorization, semaphore) => (authorization, semaphore),
@ -944,7 +993,10 @@ pub async fn key_is_authorized(
impl Web3ProxyApp {
/// Limit the number of concurrent requests from the given ip address.
pub async fn ip_semaphore(&self, ip: &IpAddr) -> Web3ProxyResult<Option<OwnedSemaphorePermit>> {
pub async fn permit_public_concurrency(
&self,
ip: &IpAddr,
) -> Web3ProxyResult<Option<OwnedSemaphorePermit>> {
if let Some(max_concurrent_requests) = self.config.public_max_concurrent_requests {
let semaphore = self
.ip_semaphores
@ -974,11 +1026,13 @@ impl Web3ProxyApp {
/// Limit the number of concurrent requests for a given user across all of their keys
/// keep the semaphore alive until the user's request is entirely complete
pub async fn user_semaphore(
pub async fn permit_premium_concurrency(
&self,
authorization_checks: &AuthorizationChecks,
authorization: &Authorization,
ip: &IpAddr,
) -> Web3ProxyResult<Option<OwnedSemaphorePermit>> {
let authorization_checks = &authorization.checks;
if let Some(max_concurrent_requests) = authorization_checks.max_concurrent_requests {
let user_id = authorization_checks
.user_id
@ -1042,7 +1096,7 @@ impl Web3ProxyApp {
ip: IpAddr,
proxy_mode: ProxyMode,
) -> Web3ProxyResult<RateLimitResult> {
// TODO: dry this up with rate_limit_by_rpc_key?
// TODO: if ip is on the local network, always allow?
// we don't care about user agent or origin or referer
let authorization = Authorization::external(
@ -1054,46 +1108,20 @@ impl Web3ProxyApp {
None,
)?;
// no semaphore is needed here because login rate limits are low
// TODO: are we sure do not we want a semaphore here?
let semaphore = None;
let label = ip.to_string();
// TODO: if ip is on the local network, always allow?
if let Some(rate_limiter) = &self.login_rate_limiter {
match rate_limiter.throttle_label(&ip.to_string(), None, 1).await {
Ok(RedisRateLimitResult::Allowed(_)) => {
Ok(RateLimitResult::Allowed(authorization, semaphore))
}
Ok(RedisRateLimitResult::RetryAt(retry_at, _)) => {
// TODO: set headers so they know when they can retry
// TODO: debug or trace?
// this is too verbose, but a stat might be good
// // trace!(?ip, "login rate limit exceeded until {:?}", retry_at);
Ok(RateLimitResult::RateLimited(authorization, Some(retry_at)))
}
Ok(RedisRateLimitResult::RetryNever) => {
// TODO: i don't think we'll get here. maybe if we ban an IP forever? seems unlikely
// // trace!(?ip, "login rate limit is 0");
Ok(RateLimitResult::RateLimited(authorization, None))
}
Err(err) => {
// internal error, not rate limit being hit
// TODO: i really want axum to do this for us in a single place.
error!("login rate limiter is unhappy. allowing ip. err={:?}", err);
Ok(RateLimitResult::Allowed(authorization, None))
}
}
} else {
// TODO: if no redis, rate limit with a local cache? "warn!" probably isn't right
Ok(RateLimitResult::Allowed(authorization, None))
}
redis_rate_limit(
&self.login_rate_limiter,
authorization,
None,
Some(&label),
None,
)
.await
}
/// origin is included because it can override the default rate limits
pub async fn rate_limit_by_ip(
pub async fn rate_limit_public(
&self,
ip: &IpAddr,
origin: Option<&Origin>,
@ -1119,44 +1147,38 @@ impl Web3ProxyApp {
None,
)?;
if let Some(rate_limiter) = &self.frontend_ip_rate_limiter {
match rate_limiter
.throttle(*ip, authorization.checks.max_requests_per_period, 1)
.await
{
Ok(DeferredRateLimitResult::Allowed) => {
// rate limit allowed us. check concurrent request limits
let semaphore = self.ip_semaphore(ip).await?;
if let Some(rate_limiter) = &self.frontend_public_rate_limiter {
let mut x = deferred_redis_rate_limit(authorization, *ip, None, rate_limiter).await?;
Ok(RateLimitResult::Allowed(authorization, semaphore))
}
Ok(DeferredRateLimitResult::RetryAt(retry_at)) => {
// TODO: set headers so they know when they can retry
// // trace!(?ip, "rate limit exceeded until {:?}", retry_at);
Ok(RateLimitResult::RateLimited(authorization, Some(retry_at)))
}
Ok(DeferredRateLimitResult::RetryNever) => {
// TODO: i don't think we'll get here. maybe if we ban an IP forever? seems unlikely
// // trace!(?ip, "rate limit is 0");
Ok(RateLimitResult::RateLimited(authorization, None))
}
Err(err) => {
// this an internal error of some kind, not the rate limit being hit
// TODO: i really want axum to do this for us in a single place.
error!("rate limiter is unhappy. allowing ip. err={:?}", err);
// at least we can still check the semaphore
let semaphore = self.ip_semaphore(ip).await?;
Ok(RateLimitResult::Allowed(authorization, semaphore))
}
if let RateLimitResult::RateLimited(authorization, retry_at) = x {
// we got rate limited, try bonus_frontend_public_rate_limiter
x = redis_rate_limit(
&self.bonus_frontend_public_rate_limiter,
authorization,
retry_at,
None,
None,
)
.await?;
}
if let RateLimitResult::Allowed(a, b) = x {
debug_assert!(b.is_none());
let permit = self.permit_public_concurrency(ip).await?;
x = RateLimitResult::Allowed(a, permit)
}
debug_assert!(!matches!(x, RateLimitResult::UnknownKey));
Ok(x)
} else {
// no redis, but we can still check the ip semaphore
let semaphore = self.ip_semaphore(ip).await?;
let permit = self.permit_public_concurrency(ip).await?;
// TODO: if no redis, rate limit with a local cache? "warn!" probably isn't right
Ok(RateLimitResult::Allowed(authorization, semaphore))
Ok(RateLimitResult::Allowed(authorization, permit))
}
}
@ -1323,8 +1345,8 @@ impl Web3ProxyApp {
Ok(x)
}
/// Authorized the ip/origin/referer/useragent and rate limit and concurrency
pub async fn rate_limit_by_rpc_key(
/// Authorize the key/ip/origin/referer/useragent and handle rate and concurrency limits
pub async fn rate_limit_premium(
&self,
ip: &IpAddr,
origin: Option<&Origin>,
@ -1342,7 +1364,7 @@ impl Web3ProxyApp {
// ?err,
// "db is down. cannot check rpc key. fallback to ip rate limits"
// );
return self.rate_limit_by_ip(ip, origin, proxy_mode).await;
return self.rate_limit_public(ip, origin, proxy_mode).await;
}
return Err(err);
@ -1352,13 +1374,9 @@ impl Web3ProxyApp {
// if no rpc_key_id matching the given rpc was found, then we can't rate limit by key
if authorization_checks.rpc_secret_key_id.is_none() {
trace!("unknown key. falling back to free limits");
return self.rate_limit_by_ip(ip, origin, proxy_mode).await;
return self.rate_limit_public(ip, origin, proxy_mode).await;
}
// only allow this rpc_key to run a limited amount of concurrent requests
// TODO: rate limit should be BEFORE the semaphore!
let semaphore = self.user_semaphore(&authorization_checks, ip).await?;
let authorization = Authorization::try_new(
authorization_checks,
ip,
@ -1370,47 +1388,61 @@ impl Web3ProxyApp {
// user key is valid. now check rate limits
if let Some(user_max_requests_per_period) = authorization.checks.max_requests_per_period {
if let Some(rate_limiter) = &self.frontend_registered_user_rate_limiter {
match rate_limiter
.throttle(
RegisteredUserRateLimitKey(authorization.checks.user_id, *ip),
Some(user_max_requests_per_period),
1,
)
.await
{
Ok(DeferredRateLimitResult::Allowed) => {
return Ok(RateLimitResult::Allowed(authorization, semaphore))
}
Ok(DeferredRateLimitResult::RetryAt(retry_at)) => {
// TODO: set headers so they know when they can retry
// TODO: debug or trace?
// this is too verbose, but a stat might be good
// TODO: keys are secrets! use the id instead
// TODO: emit a stat
// trace!(?rpc_key, "rate limit exceeded until {:?}", retry_at);
return Ok(RateLimitResult::RateLimited(authorization, Some(retry_at)));
}
Ok(DeferredRateLimitResult::RetryNever) => {
// TODO: keys are secret. don't log them!
// trace!(?rpc_key, "rate limit is 0");
// TODO: emit a stat
return Ok(RateLimitResult::RateLimited(authorization, None));
}
Err(err) => {
// internal error, not rate limit being hit
// TODO: i really want axum to do this for us in a single place.
error!(?err, "rate limiter is unhappy. allowing rpc_key");
if let Some(rate_limiter) = &self.frontend_premium_rate_limiter {
let key = RegisteredUserRateLimitKey(authorization.checks.user_id, *ip);
return Ok(RateLimitResult::Allowed(authorization, semaphore));
}
let mut x = deferred_redis_rate_limit(
authorization,
key,
Some(user_max_requests_per_period),
rate_limiter,
)
.await?;
if let RateLimitResult::RateLimited(authorization, retry_at) = x {
// rate limited by the user's key+ip. check to see if there are any limits available in the bonus premium pool
x = redis_rate_limit(
&self.bonus_frontend_premium_rate_limiter,
authorization,
retry_at,
None,
None,
)
.await?;
}
if let RateLimitResult::RateLimited(authorization, retry_at) = x {
// premium got rate limited too. check the bonus public pool
x = redis_rate_limit(
&self.bonus_frontend_public_rate_limiter,
authorization,
retry_at,
None,
None,
)
.await?;
}
if let RateLimitResult::Allowed(a, b) = x {
debug_assert!(b.is_none());
// only allow this rpc_key to run a limited amount of concurrent requests
let permit = self.permit_premium_concurrency(&a, ip).await?;
x = RateLimitResult::Allowed(a, permit)
}
debug_assert!(!matches!(x, RateLimitResult::UnknownKey));
return Ok(x);
} else {
// TODO: if no redis, rate limit with just a local cache?
}
}
Ok(RateLimitResult::Allowed(authorization, semaphore))
let permit = self.permit_premium_concurrency(&authorization, ip).await?;
Ok(RateLimitResult::Allowed(authorization, permit))
}
}
@ -1440,3 +1472,85 @@ impl Authorization {
Ok((a, s))
}
}
/// this fails open!
/// this never includes a semaphore! if you want one, add it after this call
/// if `max_requests_per_period` is none, the limit in the authorization is used
pub async fn deferred_redis_rate_limit<K>(
authorization: Authorization,
key: K,
max_requests_per_period: Option<u64>,
rate_limiter: &DeferredRateLimiter<K>,
) -> Web3ProxyResult<RateLimitResult>
where
K: Send + Sync + Copy + Clone + Display + Hash + Eq + PartialEq + 'static,
{
let max_requests_per_period =
max_requests_per_period.or(authorization.checks.max_requests_per_period);
let x = match rate_limiter.throttle(key, max_requests_per_period, 1).await {
Ok(DeferredRateLimitResult::Allowed) => RateLimitResult::Allowed(authorization, None),
Ok(DeferredRateLimitResult::RetryAt(retry_at)) => {
// TODO: set headers so they know when they can retry
// TODO: debug or trace?
// this is too verbose, but a stat might be good
// TODO: emit a stat
// trace!(?rpc_key, "rate limit exceeded until {:?}", retry_at);
RateLimitResult::RateLimited(authorization, Some(retry_at))
}
Ok(DeferredRateLimitResult::RetryNever) => {
// TODO: keys are secret. don't log them!
// trace!(?rpc_key, "rate limit is 0");
// TODO: emit a stat
RateLimitResult::RateLimited(authorization, None)
}
Err(err) => {
// internal error, not rate limit being hit
error!(?err, %key, "rate limiter is unhappy. allowing key");
RateLimitResult::Allowed(authorization, None)
}
};
Ok(x)
}
/// this never includes a semaphore! if you want one, add it after this call
/// if `max_requests_per_period` is none, the limit in the authorization is used
pub async fn redis_rate_limit(
rate_limiter: &Option<RedisRateLimiter>,
authorization: Authorization,
mut retry_at: Option<Instant>,
label: Option<&str>,
max_requests_per_period: Option<u64>,
) -> Web3ProxyResult<RateLimitResult> {
let max_requests_per_period =
max_requests_per_period.or(authorization.checks.max_requests_per_period);
let x = if let Some(rate_limiter) = rate_limiter {
match rate_limiter
.throttle_label(label.unwrap_or_default(), max_requests_per_period, 1)
.await
{
Ok(RedisRateLimitResult::Allowed(..)) => RateLimitResult::Allowed(authorization, None),
Ok(RedisRateLimitResult::RetryAt(new_retry_at, ..)) => {
retry_at = retry_at.min(Some(new_retry_at));
RateLimitResult::RateLimited(authorization, retry_at)
}
Ok(RedisRateLimitResult::RetryNever) => {
RateLimitResult::RateLimited(authorization, retry_at)
}
Err(err) => {
// this is an internal error of some kind, not the rate limit being hit
error!(?err, "rate limiter is unhappy. allowing the request");
RateLimitResult::Allowed(authorization, None)
}
}
} else {
RateLimitResult::Allowed(authorization, None)
};
Ok(x)
}
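// A small standalone check (a sketch, not from the original change) of the Option
// ordering that `retry_at.min(Some(new_retry_at))` above relies on: `None` sorts before
// any `Some`, so an unknown retry time stays `None` instead of adopting the limiter's hint.
#[test]
fn option_min_keeps_none() {
    assert_eq!(None.min(Some(5)), None); // no known retry time stays unknown
    assert_eq!(Some(10).min(Some(5)), Some(5)); // otherwise the earliest retry time wins
}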

@ -85,7 +85,7 @@ async fn _proxy_web3_rpc(
.await
.map_err(|e| e.into_response_with_id(first_id))?;
let mut response = (status_code, Json(response)).into_response();
let mut response = (status_code, response).into_response();
// TODO: DRY this up. it is the same code for public and private queries
let response_headers = response.headers_mut();
@ -280,7 +280,7 @@ async fn _proxy_web3_rpc_with_key(
.await
.map_err(|e| e.into_response_with_id(first_id))?;
let mut response = (status_code, Json(response)).into_response();
let mut response = (status_code, response).into_response();
let headers = response.headers_mut();

@ -4,11 +4,11 @@
use super::authorization::{ip_is_authorized, key_is_authorized, Authorization, RequestMetadata};
use crate::errors::{Web3ProxyError, Web3ProxyResponse};
use crate::jsonrpc::JsonRpcId;
use crate::jsonrpc::{self, JsonRpcId};
use crate::{
app::Web3ProxyApp,
errors::Web3ProxyResult,
jsonrpc::{JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest},
jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest},
};
use axum::headers::{Origin, Referer, UserAgent};
use axum::{
@ -323,11 +323,11 @@ async fn websocket_proxy_web3_rpc(
response_sender: &mpsc::Sender<Message>,
subscription_count: &AtomicU64,
subscriptions: &AsyncRwLock<HashMap<U64, AbortHandle>>,
) -> (Box<RawValue>, Web3ProxyResult<JsonRpcForwardedResponseEnum>) {
) -> (Box<RawValue>, Web3ProxyResult<jsonrpc::Response>) {
let response_id = json_request.id.clone();
// TODO: move this to a separate function so we can use the try operator
let response: Web3ProxyResult<JsonRpcForwardedResponseEnum> = match &json_request.method[..] {
let response: Web3ProxyResult<jsonrpc::Response> = match &json_request.method[..] {
"eth_subscribe" => {
// TODO: how can we subscribe with proxy_mode?
match app
@ -340,7 +340,10 @@ async fn websocket_proxy_web3_rpc(
.await
{
Ok((handle, response)) => {
if let Some(subscription_id) = response.result.clone() {
if let jsonrpc::Payload::Success {
result: ref subscription_id,
} = response.payload
{
let mut x = subscriptions.write().await;
let key: U64 = serde_json::from_str(subscription_id.get()).unwrap();
@ -389,9 +392,12 @@ async fn websocket_proxy_web3_rpc(
};
let response =
JsonRpcForwardedResponse::from_value(json!(partial_response), response_id.clone());
jsonrpc::ParsedResponse::from_value(json!(partial_response), response_id.clone());
// TODO: better way of passing in ParsedResponse
let response = jsonrpc::SingleResponse::Parsed(response);
request_metadata.add_response(&response);
let response = response.parsed().await.expect("Response already parsed");
Ok(response.into())
}
@ -441,7 +447,7 @@ async fn handle_socket_payload(
};
let response_str = match response {
Ok(x) => serde_json::to_string(&x).expect("to_string should always work here"),
Ok(x) => x.to_json_string().await?,
Err(err) => {
let (_, response_data) = err.as_response_parts();

@ -0,0 +1,40 @@
use crate::frontend::authorization::RequestMetadata;
use bytes::Bytes;
use futures::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
/// wraps a response body stream and keeps a running total of the bytes that pass through it
struct SizingBody<B> {
inner: B,
request_metadata: RequestMetadata,
size: usize,
}
impl<B> SizingBody<B> {
fn new(inner: B, request_metadata: RequestMetadata) -> Self {
Self {
inner,
request_metadata,
size: 0,
}
}
}
impl<B> Stream for SizingBody<B>
where
B: Stream<Item = Result<Bytes, Box<dyn std::error::Error + Send + Sync>>> + Unpin,
{
type Item = Result<Bytes, Box<dyn std::error::Error + Send + Sync>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.inner).poll_next(cx) {
Poll::Ready(Some(Ok(chunk))) => {
self.size += chunk.len();
Poll::Ready(Some(Ok(chunk)))
}
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
Poll::Ready(None) => {
// the stream is finished. report the final size so streaming responses include it in their stats
// (mirrors the add_response(len) accounting in StreamResponse::into_response)
self.request_metadata.add_response(self.size);
Poll::Ready(None)
}
Poll::Pending => Poll::Pending,
}
}
}
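// A hedged usage sketch (not from the original change): wrap an in-memory stream in
// SizingBody and drain it so the running byte count is recorded when the stream ends.
// Assumes `RequestMetadata::default()` is available (the rpc tests construct it that way)
// and uses the `futures` crate for the in-memory stream and a blocking executor.
#[cfg(test)]
mod tests {
use super::*;
use futures::{executor::block_on, stream, StreamExt};

#[test]
fn sizing_body_counts_bytes() {
    let chunks: Vec<Result<Bytes, Box<dyn std::error::Error + Send + Sync>>> =
        vec![Ok(Bytes::from_static(b"hello ")), Ok(Bytes::from_static(b"world"))];

    let mut body = SizingBody::new(stream::iter(chunks), RequestMetadata::default());

    block_on(async {
        // 11 bytes pass through; the total is reported to the request metadata at the end
        while let Some(chunk) = body.next().await {
            chunk.unwrap();
        }
    });
}
}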

@ -1,9 +1,11 @@
use crate::app::Web3ProxyApp;
use crate::errors::Web3ProxyError;
use crate::frontend::authorization::{Authorization, RequestMetadata, RequestOrMethod};
use crate::response_cache::JsonRpcResponseEnum;
use axum::response::Response;
use axum::body::StreamBody;
use axum::response::{IntoResponse, Response as AxumResponse};
use axum::Json;
use bytes::{Bytes, BytesMut};
use derive_more::From;
use ethers::providers::ProviderError;
use futures_util::stream::{self, StreamExt};
use futures_util::TryStreamExt;
use serde::de::{self, Deserializer, MapAccess, SeqAccess, Visitor};
use serde::{Deserialize, Serialize};
use serde_inline_default::serde_inline_default;
@ -11,13 +13,367 @@ use serde_json::json;
use serde_json::value::{to_raw_value, RawValue};
use std::borrow::Cow;
use std::fmt;
use std::marker::PhantomData;
use std::sync::{atomic, Arc};
use std::time::Duration;
use tokio::time::sleep;
use crate::app::Web3ProxyApp;
use crate::errors::{Web3ProxyError, Web3ProxyResult};
use crate::frontend::authorization::{Authorization, RequestMetadata, RequestOrMethod};
use crate::response_cache::JsonRpcResponseEnum;
pub trait JsonRpcParams = fmt::Debug + serde::Serialize + Send + Sync + 'static;
pub trait JsonRpcResultData = serde::Serialize + serde::de::DeserializeOwned + fmt::Debug + Send;
// TODO: borrow values to avoid allocs if possible
#[derive(Debug, Serialize)]
pub struct ParsedResponse<T = Arc<RawValue>> {
jsonrpc: String,
id: Option<Box<RawValue>>,
#[serde(flatten)]
pub payload: Payload<T>,
}
impl ParsedResponse {
pub fn from_value(value: serde_json::Value, id: Box<RawValue>) -> Self {
let result = serde_json::value::to_raw_value(&value)
.expect("this should not fail")
.into();
Self::from_result(result, Some(id))
}
}
impl ParsedResponse<Arc<RawValue>> {
pub fn from_response_data(data: JsonRpcResponseEnum<Arc<RawValue>>, id: Box<RawValue>) -> Self {
match data {
JsonRpcResponseEnum::NullResult => {
let x: Box<RawValue> = Default::default();
Self::from_result(Arc::from(x), Some(id))
}
JsonRpcResponseEnum::RpcError { error_data, .. } => Self::from_error(error_data, id),
JsonRpcResponseEnum::Result { value, .. } => Self::from_result(value, Some(id)),
}
}
}
impl<T> ParsedResponse<T> {
pub fn from_result(result: T, id: Option<Box<RawValue>>) -> Self {
Self {
jsonrpc: "2.0".to_string(),
id,
payload: Payload::Success { result },
}
}
pub fn from_error(error: JsonRpcErrorData, id: Box<RawValue>) -> Self {
Self {
jsonrpc: "2.0".to_string(),
id: Some(id),
payload: Payload::Error { error },
}
}
pub fn result(&self) -> Option<&T> {
match &self.payload {
Payload::Success { result } => Some(result),
Payload::Error { .. } => None,
}
}
pub fn into_result(self) -> Web3ProxyResult<T> {
match self.payload {
Payload::Success { result } => Ok(result),
Payload::Error { error } => Err(Web3ProxyError::JsonRpcErrorData(error)),
}
}
}
impl<'de, T> Deserialize<'de> for ParsedResponse<T>
where
T: de::DeserializeOwned,
{
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct ResponseVisitor<T>(PhantomData<T>);
impl<'de, T> de::Visitor<'de> for ResponseVisitor<T>
where
T: de::DeserializeOwned,
{
type Value = ParsedResponse<T>;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a valid jsonrpc 2.0 response object")
}
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: de::MapAccess<'de>,
{
let mut jsonrpc = None;
// response & error
let mut id = None;
// only response
let mut result = None;
// only error
let mut error = None;
while let Some(key) = map.next_key()? {
match key {
"jsonrpc" => {
if jsonrpc.is_some() {
return Err(de::Error::duplicate_field("jsonrpc"));
}
let value = map.next_value()?;
if value != "2.0" {
return Err(de::Error::invalid_value(
de::Unexpected::Str(value),
&"2.0",
));
}
jsonrpc = Some(value);
}
"id" => {
if id.is_some() {
return Err(de::Error::duplicate_field("id"));
}
let value: Box<RawValue> = map.next_value()?;
id = Some(value);
}
"result" => {
if result.is_some() {
return Err(de::Error::duplicate_field("result"));
}
let value: T = map.next_value()?;
result = Some(value);
}
"error" => {
if error.is_some() {
return Err(de::Error::duplicate_field("Error"));
}
let value: JsonRpcErrorData = map.next_value()?;
error = Some(value);
}
key => {
return Err(de::Error::unknown_field(
key,
&["jsonrpc", "id", "result", "error"],
));
}
}
}
// jsonrpc version must be present in all responses
let jsonrpc = jsonrpc
.ok_or_else(|| de::Error::missing_field("jsonrpc"))?
.to_string();
let payload = match (result, error) {
(Some(result), None) => Payload::Success { result },
(None, Some(error)) => Payload::Error { error },
_ => {
return Err(de::Error::custom(
"response must be either a success or error object",
))
}
};
Ok(ParsedResponse {
jsonrpc,
id,
payload,
})
}
}
deserializer.deserialize_map(ResponseVisitor(PhantomData))
}
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(untagged)]
pub enum Payload<T> {
Success { result: T },
Error { error: JsonRpcErrorData },
}
#[derive(Debug)]
pub struct StreamResponse {
buffer: Bytes,
response: reqwest::Response,
request_metadata: Arc<RequestMetadata>,
}
impl StreamResponse {
// TODO: error handling
pub async fn read<T>(self) -> Result<ParsedResponse<T>, ProviderError>
where
T: de::DeserializeOwned,
{
let mut buffer = BytesMut::with_capacity(self.buffer.len());
buffer.extend_from_slice(&self.buffer);
buffer.extend_from_slice(&self.response.bytes().await?);
let parsed = serde_json::from_slice(&buffer)?;
Ok(parsed)
}
}
impl IntoResponse for StreamResponse {
fn into_response(self) -> axum::response::Response {
let stream = stream::once(async { Ok::<_, reqwest::Error>(self.buffer) })
.chain(self.response.bytes_stream())
.map_ok(move |x| {
let len = x.len();
self.request_metadata.add_response(len);
x
});
let body = StreamBody::new(stream);
body.into_response()
}
}
#[derive(Debug)]
pub enum SingleResponse<T = Arc<RawValue>> {
Parsed(ParsedResponse<T>),
Stream(StreamResponse),
}
impl<T> SingleResponse<T>
where
T: de::DeserializeOwned + Serialize,
{
// TODO: threshold from configs
// TODO: error handling
pub async fn read_if_short(
mut response: reqwest::Response,
nbytes: u64,
request_metadata: Arc<RequestMetadata>,
) -> Result<SingleResponse<T>, ProviderError> {
match response.content_length() {
// short
Some(len) if len <= nbytes => Ok(Self::from_bytes(response.bytes().await?)?),
// long
Some(_) => Ok(Self::Stream(StreamResponse {
buffer: Bytes::new(),
response,
request_metadata,
})),
None => {
let mut buffer = BytesMut::new();
while (buffer.len() as u64) < nbytes {
match response.chunk().await? {
Some(chunk) => {
buffer.extend_from_slice(&chunk);
}
None => return Ok(Self::from_bytes(buffer.freeze())?),
}
}
let buffer = buffer.freeze();
Ok(Self::Stream(StreamResponse {
buffer,
response,
request_metadata,
}))
}
}
}
fn from_bytes(buf: Bytes) -> Result<Self, serde_json::Error> {
let val = serde_json::from_slice(&buf)?;
Ok(Self::Parsed(val))
}
// TODO: error handling
pub async fn parsed(self) -> Result<ParsedResponse<T>, ProviderError> {
match self {
Self::Parsed(resp) => Ok(resp),
Self::Stream(resp) => resp.read().await,
}
}
pub fn num_bytes(&self) -> usize {
match self {
Self::Parsed(response) => serde_json::to_string(response)
.expect("this should always serialize")
.len(),
Self::Stream(response) => match response.response.content_length() {
Some(len) => len as usize,
None => 0,
},
}
}
}
impl<T> From<ParsedResponse<T>> for SingleResponse<T> {
fn from(response: ParsedResponse<T>) -> Self {
Self::Parsed(response)
}
}
impl<T> IntoResponse for SingleResponse<T>
where
T: Serialize,
{
fn into_response(self) -> axum::response::Response {
match self {
Self::Parsed(resp) => Json(resp).into_response(),
Self::Stream(resp) => resp.into_response(),
}
}
}
#[derive(Debug)]
pub enum Response<T = Arc<RawValue>> {
Single(SingleResponse<T>),
Batch(Vec<ParsedResponse<T>>),
}
impl Response<Arc<RawValue>> {
pub async fn to_json_string(self) -> Result<String, ProviderError> {
let x = match self {
Self::Single(resp) => {
// TODO: handle streaming differently?
let parsed = resp.parsed().await?;
serde_json::to_string(&parsed)
}
Self::Batch(resps) => serde_json::to_string(&resps),
};
let x = x.expect("to_string should always work");
Ok(x)
}
}
impl<T> From<ParsedResponse<T>> for Response<T> {
fn from(response: ParsedResponse<T>) -> Self {
Self::Single(SingleResponse::Parsed(response))
}
}
impl<T> IntoResponse for Response<T>
where
T: Serialize,
{
fn into_response(self) -> axum::response::Response {
match self {
Self::Single(resp) => resp.into_response(),
Self::Batch(resps) => Json(resps).into_response(),
}
}
}
// TODO: &str here instead of String should save a lot of allocations
// TODO: generic type for params?
#[serde_inline_default]
@ -120,7 +476,7 @@ impl JsonRpcRequestEnum {
app: &Web3ProxyApp,
authorization: &Arc<Authorization>,
duration: Duration,
) -> Result<(), Response> {
) -> Result<(), AxumResponse> {
let err_id = match self.validate() {
None => return Ok(()),
Some(x) => x,
@ -269,6 +625,14 @@ pub struct JsonRpcErrorData {
pub data: Option<serde_json::Value>,
}
impl JsonRpcErrorData {
pub fn num_bytes(&self) -> usize {
serde_json::to_string(self)
.expect("should always serialize")
.len()
}
}
impl From<&'static str> for JsonRpcErrorData {
fn from(value: &'static str) -> Self {
Self {
@ -397,6 +761,26 @@ pub enum JsonRpcForwardedResponseEnum {
mod tests {
use super::*;
#[test]
fn deserialize_response() {
let json = r#"{"jsonrpc":"2.0","id":null,"result":100}"#;
let obj: ParsedResponse = serde_json::from_str(json).unwrap();
assert!(matches!(obj.payload, Payload::Success { .. }));
}
#[test]
fn serialize_response() {
let obj = ParsedResponse {
jsonrpc: "2.0".to_string(),
id: None,
payload: Payload::Success {
result: serde_json::value::RawValue::from_string("100".to_string()).unwrap(),
},
};
let json = serde_json::to_string(&obj).unwrap();
assert_eq!(json, r#"{"jsonrpc":"2.0","id":null,"result":100}"#);
}
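// A few extra round-trip checks (sketches, not from the original change). The error test
// assumes JsonRpcErrorData deserializes from the standard JSON-RPC `{code, message}` shape
// with `data` optional; the other two only rely on the types defined above.
#[test]
fn deserialize_error_response() {
    let json =
        r#"{"jsonrpc":"2.0","id":1,"error":{"code":-32601,"message":"Method not found"}}"#;
    let obj: ParsedResponse = serde_json::from_str(json).unwrap();
    assert!(matches!(obj.payload, Payload::Error { .. }));
}
#[test]
fn serialize_batch() {
    // a batch response is just a JSON array of single response objects
    let one = ParsedResponse::from_result(
        serde_json::value::RawValue::from_string("1".to_string()).unwrap(),
        Some(serde_json::value::RawValue::from_string("1".to_string()).unwrap()),
    );
    let json = serde_json::to_string(&vec![one]).unwrap();
    assert_eq!(json, r#"[{"jsonrpc":"2.0","id":1,"result":1}]"#);
}
#[test]
fn num_bytes_matches_serialized_length() {
    // num_bytes on a parsed response is the length of its JSON serialization
    let parsed = ParsedResponse::from_result(
        std::sync::Arc::from(
            serde_json::value::RawValue::from_string("100".to_string()).unwrap(),
        ),
        None,
    );
    let single = SingleResponse::Parsed(parsed);
    assert_eq!(
        single.num_bytes(),
        r#"{"jsonrpc":"2.0","id":null,"result":100}"#.len()
    );
}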
#[test]
fn this_deserialize_single() {
let input = r#"{"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":1}"#;

@ -1,4 +1,8 @@
use crate::{block_number::BlockNumAndHash, errors::Web3ProxyError, jsonrpc::JsonRpcErrorData};
use crate::{
block_number::BlockNumAndHash,
errors::{Web3ProxyError, Web3ProxyResult},
jsonrpc::{self, JsonRpcErrorData},
};
use derive_more::From;
use ethers::{
providers::{HttpClientError, JsonRpcError, ProviderError, WsClientError},
@ -6,6 +10,7 @@ use ethers::{
};
use hashbrown::hash_map::DefaultHashBuilder;
use moka::future::Cache;
use parking_lot::Mutex;
use serde_json::value::RawValue;
use std::{
hash::{BuildHasher, Hash, Hasher},
@ -135,6 +140,35 @@ impl JsonRpcResponseEnum<Arc<RawValue>> {
}
}
impl TryFrom<Web3ProxyResult<jsonrpc::SingleResponse>> for JsonRpcResponseEnum<Arc<RawValue>> {
type Error = Web3ProxyError;
fn try_from(response: Web3ProxyResult<jsonrpc::SingleResponse>) -> Result<Self, Self::Error> {
match response {
Ok(jsonrpc::SingleResponse::Parsed(parsed)) => match parsed.payload {
jsonrpc::Payload::Success { result } => {
let num_bytes = result.get().len() as u32;
Ok(JsonRpcResponseEnum::Result {
value: result,
num_bytes,
})
}
jsonrpc::Payload::Error { error } => {
let num_bytes = error.num_bytes() as u32;
Ok(JsonRpcResponseEnum::RpcError {
error_data: error,
// TODO: this double serializes
num_bytes,
})
}
},
Ok(jsonrpc::SingleResponse::Stream(stream)) => {
Err(Web3ProxyError::StreamResponse(Mutex::new(Some(stream))))
}
Err(err) => err.try_into(),
}
}
}
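// A hedged sketch (not from the original change): a parsed success converts into the
// cacheable Result variant, while a streaming body falls through to the StreamResponse
// error that short-circuits caching. Uses only paths already imported at the top of this file.
#[test]
fn parsed_success_is_cacheable() {
    let result: std::sync::Arc<RawValue> =
        RawValue::from_string("100".to_string()).unwrap().into();
    let parsed = jsonrpc::ParsedResponse::from_result(result, None);
    let single = jsonrpc::SingleResponse::Parsed(parsed);

    let input: Web3ProxyResult<jsonrpc::SingleResponse> = Ok(single);
    let converted = JsonRpcResponseEnum::<std::sync::Arc<RawValue>>::try_from(input);

    assert!(matches!(converted, Ok(JsonRpcResponseEnum::Result { .. })));
}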
impl From<serde_json::Value> for JsonRpcResponseEnum<Arc<RawValue>> {
fn from(value: serde_json::Value) -> Self {
let value = RawValue::from_string(value.to_string()).unwrap();

@ -108,20 +108,6 @@ pub struct RankedRpcs {
}
impl RankedRpcs {
// /// useful when Web3Rpcs does not track the head block
// pub fn from_all(rpcs: &Web3Rpcs) -> Self {
// let inner = vec![(
// RpcRanking::default(),
// rpcs.by_name
// .read()
// .values()
// .cloned()
// .collect::<Vec<Arc<_>>>(),
// )];
// todo!()
// }
pub fn from_votes(
min_synced_rpcs: usize,
min_sum_soft_limit: u32,
@ -915,11 +901,3 @@ impl ConsensusFinder {
.max()
}
}
#[cfg(test)]
mod test {
// #[test]
// fn test_simplest_case_consensus_head_connections() {
// todo!();
// }
}

@ -6,16 +6,16 @@ use super::request::{OpenRequestHandle, OpenRequestResult, RequestErrorHandler};
use crate::app::{flatten_handle, Web3ProxyApp, Web3ProxyJoinHandle};
use crate::config::{average_block_interval, BlockAndRpc, Web3RpcConfig};
use crate::errors::{Web3ProxyError, Web3ProxyResult};
use crate::frontend::authorization::{Authorization, RequestMetadata};
use crate::frontend::authorization::RequestMetadata;
use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::frontend::status::MokaCacheSerializer;
use crate::jsonrpc::{JsonRpcErrorData, JsonRpcParams, JsonRpcResultData};
use crate::jsonrpc::{self, JsonRpcErrorData, JsonRpcParams, JsonRpcResultData};
use counter::Counter;
use derive_more::From;
use ethers::prelude::{TxHash, U64};
use futures::future::try_join_all;
use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryFutureExt};
use futures::StreamExt;
use hashbrown::HashMap;
use itertools::Itertools;
use moka::future::CacheBuilder;
@ -371,7 +371,7 @@ impl Web3Rpcs {
method: &str,
params: &P,
max_wait: Option<Duration>,
) -> Result<Box<RawValue>, Web3ProxyError> {
) -> Result<Arc<RawValue>, Web3ProxyError> {
// TODO: if only 1 active_request_handles, do self.try_send_request?
let max_wait = max_wait.unwrap_or_else(|| Duration::from_secs(300));
@ -380,20 +380,23 @@ impl Web3Rpcs {
let responses = active_request_handles
.into_iter()
.map(|active_request_handle| async move {
let result: Result<Result<Box<RawValue>, Web3ProxyError>, Web3ProxyError> =
timeout(
max_wait,
active_request_handle
.request(method, &json!(&params))
.map_err(Web3ProxyError::EthersProvider),
)
let result: Result<Result<Arc<RawValue>, Web3ProxyError>, Web3ProxyError> =
timeout(max_wait, async {
match active_request_handle.request(method, &json!(&params)).await {
Ok(response) => match response.parsed().await {
Ok(parsed) => parsed.into_result(),
Err(err) => Err(Web3ProxyError::EthersProvider(err)),
},
Err(err) => Err(Web3ProxyError::EthersProvider(err)),
}
})
.await
.map_err(Web3ProxyError::from);
result.flatten()
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<Result<Box<RawValue>, Web3ProxyError>>>()
.collect::<Vec<Result<Arc<RawValue>, Web3ProxyError>>>()
.await;
// TODO: Strings are not great keys, but we can't use RawValue or ProviderError as keys because they don't implement Hash or Eq
@ -443,7 +446,7 @@ impl Web3Rpcs {
async fn _best_available_rpc(
&self,
authorization: &Arc<Authorization>,
request_metadata: &Arc<RequestMetadata>,
error_handler: Option<RequestErrorHandler>,
potential_rpcs: &[Arc<Web3Rpc>],
skip: &mut Vec<Arc<Web3Rpc>>,
@ -463,7 +466,7 @@ impl Web3Rpcs {
// just because it has lower latency doesn't mean we are sure to get a connection. there might be rate limits
// TODO: what error_handler?
match faster_rpc
.try_request_handle(authorization, error_handler)
.try_request_handle(request_metadata, error_handler)
.await
{
Ok(OpenRequestResult::Handle(handle)) => {
@ -503,7 +506,7 @@ impl Web3Rpcs {
#[instrument(level = "trace")]
pub async fn wait_for_best_rpc(
&self,
request_metadata: Option<&Arc<RequestMetadata>>,
request_metadata: &Arc<RequestMetadata>,
skip_rpcs: &mut Vec<Arc<Web3Rpc>>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
@ -514,18 +517,13 @@ impl Web3Rpcs {
let mut earliest_retry_at: Option<Instant> = None;
// TODO: pass db_conn to the "default" authorization for revert logging
let authorization = request_metadata
.and_then(|x| x.authorization.clone())
.unwrap_or_default();
if self.watch_head_block.is_none() {
// if this group of servers is not watching the head block, we don't know what is "best" based on block height
// TODO: do this without cloning here
let potential_rpcs = self.by_name.read().values().cloned().collect::<Vec<_>>();
let x = self
._best_available_rpc(&authorization, error_handler, &potential_rpcs, skip_rpcs)
._best_available_rpc(request_metadata, error_handler, &potential_rpcs, skip_rpcs)
.await;
return Ok(x);
@ -569,7 +567,7 @@ impl Web3Rpcs {
match self
._best_available_rpc(
&authorization,
request_metadata,
error_handler,
&potential_rpcs,
skip_rpcs,
@ -632,9 +630,7 @@ impl Web3Rpcs {
potential_rpcs.clear();
}
if let Some(request_metadata) = request_metadata {
request_metadata.no_servers.fetch_add(1, Ordering::Relaxed);
}
request_metadata.no_servers.fetch_add(1, Ordering::Relaxed);
if let Some(retry_at) = earliest_retry_at {
// TODO: log the server that retry_at came from
@ -660,7 +656,7 @@ impl Web3Rpcs {
// TODO: this is broken
pub async fn all_connections(
&self,
request_metadata: Option<&Arc<RequestMetadata>>,
request_metadata: &Arc<RequestMetadata>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
max_count: Option<usize>,
@ -691,10 +687,6 @@ impl Web3Rpcs {
trace!("all_rpcs: {:#?}", all_rpcs);
let authorization = request_metadata
.and_then(|x| x.authorization.clone())
.unwrap_or_default();
for rpc in all_rpcs {
trace!("trying {}", rpc);
@ -714,7 +706,7 @@ impl Web3Rpcs {
}
// check rate limits and increment our connection counter
match rpc.try_request_handle(&authorization, error_level).await {
match rpc.try_request_handle(request_metadata, error_level).await {
Ok(OpenRequestResult::RetryAt(retry_at)) => {
// this rpc is not available. skip it
trace!("{} is rate limited. skipping", rpc);
@ -752,9 +744,17 @@ impl Web3Rpcs {
params: &P,
max_wait: Option<Duration>,
) -> Web3ProxyResult<R> {
// TODO: no request_metadata means we won't have stats on this internal request.
self.request_with_metadata(method, params, None, max_wait, None, None)
.await
let request_metadata = RequestMetadata::new_internal(self.chain_id, method, params);
let response = self
.request_with_metadata(method, params, &request_metadata, max_wait, None, None)
.await?;
let parsed = response.parsed().await?;
match parsed.payload {
jsonrpc::Payload::Success { result } => Ok(result),
// TODO: confirm this error type is correct
jsonrpc::Payload::Error { error } => Err(error.into()),
}
}
/// Make a request with stat tracking.
@ -762,11 +762,11 @@ impl Web3Rpcs {
&self,
method: &str,
params: &P,
request_metadata: Option<&Arc<RequestMetadata>>,
request_metadata: &Arc<RequestMetadata>,
max_wait: Option<Duration>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
) -> Web3ProxyResult<R> {
) -> Web3ProxyResult<jsonrpc::SingleResponse<R>> {
let mut skip_rpcs = vec![];
let mut method_not_available_response = None;
@ -804,28 +804,24 @@ impl Web3Rpcs {
// TODO: look at backend_requests instead
let rpc = active_request_handle.clone_connection();
if let Some(request_metadata) = request_metadata {
request_metadata.backend_requests.lock().push(rpc.clone());
}
request_metadata.backend_requests.lock().push(rpc.clone());
let is_backup_response = rpc.backup;
match active_request_handle.request::<P, R>(method, params).await {
Ok(response) => {
// TODO: if there are multiple responses being aggregated, this will only use the last server's backup type
if let Some(request_metadata) = request_metadata {
request_metadata
.response_from_backup_rpc
.store(is_backup_response, Ordering::Relaxed);
request_metadata
.response_from_backup_rpc
.store(is_backup_response, Ordering::Relaxed);
request_metadata
.user_error_response
.store(false, Ordering::Relaxed);
request_metadata
.user_error_response
.store(false, Ordering::Relaxed);
request_metadata
.error_response
.store(false, Ordering::Relaxed);
}
request_metadata
.error_response
.store(false, Ordering::Relaxed);
return Ok(response);
}
@ -833,25 +829,21 @@ impl Web3Rpcs {
// TODO: if this is an error, do NOT return. continue to try on another server
let error = match JsonRpcErrorData::try_from(&error) {
Ok(x) => {
if let Some(request_metadata) = request_metadata {
request_metadata
.user_error_response
.store(true, Ordering::Relaxed);
}
request_metadata
.user_error_response
.store(true, Ordering::Relaxed);
x
}
Err(err) => {
warn!(?err, "error from {}", rpc);
if let Some(request_metadata) = request_metadata {
request_metadata
.error_response
.store(true, Ordering::Relaxed);
request_metadata
.error_response
.store(true, Ordering::Relaxed);
request_metadata
.user_error_response
.store(false, Ordering::Relaxed);
}
request_metadata
.user_error_response
.store(false, Ordering::Relaxed);
last_provider_error = Some(error);
@ -980,9 +972,7 @@ impl Web3Rpcs {
);
// TODO: have a separate column for rate limited?
if let Some(request_metadata) = request_metadata {
request_metadata.no_servers.fetch_add(1, Ordering::Relaxed);
}
request_metadata.no_servers.fetch_add(1, Ordering::Relaxed);
select! {
_ = sleep_until(retry_at) => {
@ -997,26 +987,22 @@ impl Web3Rpcs {
}
}
OpenRequestResult::NotReady => {
if let Some(request_metadata) = request_metadata {
request_metadata
.error_response
.store(true, Ordering::Relaxed);
}
request_metadata
.error_response
.store(true, Ordering::Relaxed);
break;
}
}
}
if let Some(err) = method_not_available_response {
if let Some(request_metadata) = request_metadata {
request_metadata
.error_response
.store(false, Ordering::Relaxed);
request_metadata
.error_response
.store(false, Ordering::Relaxed);
request_metadata
.user_error_response
.store(true, Ordering::Relaxed);
}
request_metadata
.user_error_response
.store(true, Ordering::Relaxed);
// this error response is likely the user's fault
// TODO: emit a stat for unsupported methods. then we can know what there is demand for or if we are missing a feature
@ -1092,13 +1078,13 @@ impl Web3Rpcs {
self: &Arc<Self>,
method: &str,
params: &P,
request_metadata: Option<&Arc<RequestMetadata>>,
request_metadata: &Arc<RequestMetadata>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
max_wait: Option<Duration>,
error_level: Option<RequestErrorHandler>,
max_sends: Option<usize>,
) -> Web3ProxyResult<Box<RawValue>> {
) -> Web3ProxyResult<Arc<RawValue>> {
let mut watch_consensus_rpcs = self.watch_ranked_rpcs.subscribe();
let start = Instant::now();
@ -1121,26 +1107,24 @@ impl Web3Rpcs {
.await
{
Ok(active_request_handles) => {
if let Some(request_metadata) = request_metadata {
let mut only_backups_used = true;
let mut only_backups_used = true;
request_metadata.backend_requests.lock().extend(
active_request_handles.iter().map(|x| {
let rpc = x.clone_connection();
request_metadata.backend_requests.lock().extend(
active_request_handles.iter().map(|x| {
let rpc = x.clone_connection();
if !rpc.backup {
// TODO: even if a backup is included, it is possible the response is still from a primary connection. think about this more
only_backups_used = false;
}
if !rpc.backup {
// TODO: even if a backup is included, it is possible the response is still from a primary connection. think about this more
only_backups_used = false;
}
rpc
}),
);
rpc
}),
);
request_metadata
.response_from_backup_rpc
.store(only_backups_used, Ordering::Relaxed);
}
request_metadata
.response_from_backup_rpc
.store(only_backups_used, Ordering::Relaxed);
let x = self
.try_send_parallel_requests(
@ -1161,10 +1145,8 @@ impl Web3Rpcs {
"No servers in sync on! Retrying",
);
if let Some(request_metadata) = &request_metadata {
// TODO: if this times out, i think we drop this
request_metadata.no_servers.fetch_add(1, Ordering::Relaxed);
}
// TODO: if this times out, i think we drop this
request_metadata.no_servers.fetch_add(1, Ordering::Relaxed);
let max_sleep = if let Some(max_wait) = max_wait {
start + max_wait
@ -1188,9 +1170,7 @@ impl Web3Rpcs {
}
}
Err(Some(retry_at)) => {
if let Some(request_metadata) = &request_metadata {
request_metadata.no_servers.fetch_add(1, Ordering::Relaxed);
}
request_metadata.no_servers.fetch_add(1, Ordering::Relaxed);
if let Some(max_wait) = max_wait {
if start.elapsed() > max_wait {
@ -1240,12 +1220,12 @@ impl Web3Rpcs {
&self,
method: &str,
params: &P,
request_metadata: Option<&Arc<RequestMetadata>>,
request_metadata: &Arc<RequestMetadata>,
max_wait: Option<Duration>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
) -> Web3ProxyResult<R> {
let proxy_mode = request_metadata.map(|x| x.proxy_mode()).unwrap_or_default();
) -> Web3ProxyResult<jsonrpc::SingleResponse<R>> {
let proxy_mode = request_metadata.proxy_mode();
match proxy_mode {
ProxyMode::Debug | ProxyMode::Best => {
@ -1547,8 +1527,9 @@ mod tests {
assert!(rpcs.head_block_hash().is_none());
// all_backend_connections gives all non-backup servers regardless of sync status
let m = Arc::new(RequestMetadata::default());
assert_eq!(
rpcs.all_connections(None, None, None, None, None)
rpcs.all_connections(&m, None, None, None, None)
.await
.unwrap()
.len(),
@ -1556,9 +1537,10 @@ mod tests {
);
// best_synced_backend_connection requires servers to be synced with the head block, so it should not find any nodes here
let m = Arc::new(RequestMetadata::default());
let x = rpcs
.wait_for_best_rpc(
None,
&m,
&mut vec![],
Some(head_block.number.as_ref().unwrap()),
None,
@ -1651,9 +1633,10 @@ mod tests {
assert!(!lagged_rpc.has_block_data(head_block.number.as_ref().unwrap()));
// TODO: make sure the handle is for the expected rpc
let m = Arc::new(RequestMetadata::default());
assert!(matches!(
rpcs.wait_for_best_rpc(
None,
&m,
&mut vec![],
None,
None,
@ -1665,9 +1648,10 @@ mod tests {
));
// TODO: make sure the handle is for the expected rpc
let m = Arc::new(RequestMetadata::default());
assert!(matches!(
rpcs.wait_for_best_rpc(
None,
&m,
&mut vec![],
Some(&0.into()),
None,
@ -1679,9 +1663,10 @@ mod tests {
));
// TODO: make sure the handle is for the expected rpc
let m = Arc::new(RequestMetadata::default());
assert!(matches!(
rpcs.wait_for_best_rpc(
None,
&m,
&mut vec![],
Some(&1.into()),
None,
@ -1693,9 +1678,10 @@ mod tests {
));
// future block should not get a handle
let m = Arc::new(RequestMetadata::default());
let future_rpc = rpcs
.wait_for_best_rpc(
None,
&m,
&mut vec![],
Some(&2.into()),
None,
@ -1801,9 +1787,10 @@ mod tests {
// best_synced_backend_connection requires servers to be synced with the head block
// TODO: test with and without passing the head_block.number?
let m = Arc::new(RequestMetadata::default());
let best_available_server = rpcs
.wait_for_best_rpc(
None,
&m,
&mut vec![],
Some(head_block.number()),
None,
@ -1819,9 +1806,10 @@ mod tests {
OpenRequestResult::Handle(_)
));
let m = Arc::new(RequestMetadata::default());
let _best_available_server_from_none = rpcs
.wait_for_best_rpc(
None,
&m,
&mut vec![],
None,
None,
@ -1832,9 +1820,10 @@ mod tests {
// assert_eq!(best_available_server, best_available_server_from_none);
let m = Arc::new(RequestMetadata::default());
let best_archive_server = rpcs
.wait_for_best_rpc(
None,
&m,
&mut vec![],
Some(&1.into()),
None,
@ -1961,8 +1950,9 @@ mod tests {
// best_synced_backend_connection requires servers to be synced with the head block
// TODO: test with and without passing the head_block.number?
let m = Arc::new(RequestMetadata::default());
let head_connections = rpcs
.all_connections(None, Some(block_2.number()), None, None, None)
.all_connections(&m, Some(block_2.number()), None, None, None)
.await;
debug!("head_connections: {:#?}", head_connections);
@ -1973,8 +1963,9 @@ mod tests {
"wrong number of connections"
);
let m = Arc::new(RequestMetadata::default());
let all_connections = rpcs
.all_connections(None, Some(block_1.number()), None, None, None)
.all_connections(&m, Some(block_1.number()), None, None, None)
.await;
debug!("all_connections: {:#?}", all_connections);
@ -1985,7 +1976,8 @@ mod tests {
"wrong number of connections"
);
let all_connections = rpcs.all_connections(None, None, None, None, None).await;
let m = Arc::new(RequestMetadata::default());
let all_connections = rpcs.all_connections(&m, None, None, None, None).await;
debug!("all_connections: {:#?}", all_connections);

@ -1,12 +1,12 @@
//! Rate-limited communication with a web3 provider.
use super::blockchain::{ArcBlock, BlocksByHashCache, Web3ProxyBlock};
use super::provider::{connect_http, connect_ws, EthersHttpProvider, EthersWsProvider};
use super::provider::{connect_ws, EthersWsProvider};
use super::request::{OpenRequestHandle, OpenRequestResult};
use crate::app::{flatten_handle, Web3ProxyJoinHandle};
use crate::config::{BlockAndRpc, Web3RpcConfig};
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::authorization::Authorization;
use crate::jsonrpc::{JsonRpcParams, JsonRpcResultData};
use crate::frontend::authorization::RequestMetadata;
use crate::jsonrpc::{self, JsonRpcParams, JsonRpcResultData};
use crate::rpcs::request::RequestErrorHandler;
use anyhow::{anyhow, Context};
use arc_swap::ArcSwapOption;
@ -39,7 +39,8 @@ pub struct Web3Rpc {
pub db_conn: Option<DatabaseConnection>,
pub subscribe_txs: bool,
/// almost all requests prefer to use the http client
pub(super) http_provider: Option<EthersHttpProvider>,
pub(super) http_client: Option<reqwest::Client>,
pub(super) http_url: Option<Url>,
/// the websocket url is only used for subscriptions
pub(super) ws_url: Option<Url>,
/// the websocket provider is only used for subscriptions
@ -164,14 +165,13 @@ impl Web3Rpc {
let median_request_latency = RollingQuantileLatency::spawn_median(1_000).await;
let http_provider = if let Some(http_url) = config.http_url {
let (http_url, http_client) = if let Some(http_url) = config.http_url {
let http_url = http_url.parse::<Url>()?;
Some(connect_http(http_url, http_client, block_interval)?)
// TODO: check the provider is on the right chain
// TODO: double-check not missing anything from connect_http()
let http_client = http_client.unwrap_or_default();
(Some(http_url), Some(http_client))
} else {
None
(None, None)
};
let ws_url = if let Some(ws_url) = config.ws_url {
@ -194,7 +194,8 @@ impl Web3Rpc {
hard_limit,
hard_limit_until: Some(hard_limit_until),
head_block: Some(head_block),
http_provider,
http_url,
http_client,
max_head_block_age,
name,
peak_latency: Some(peak_latency),
@ -916,7 +917,7 @@ impl Web3Rpc {
self.send_head_block_result(Ok(Some(block)), &block_sender, &block_map)
.await?;
}
} else if self.http_provider.is_some() {
} else if self.http_client.is_some() {
// there is a "watch_blocks" function, but a lot of public nodes (including llamanodes) do not support the necessary rpc endpoints
// TODO: is 1/2 the block time okay?
let mut i = interval(self.block_interval / 2);
@ -960,7 +961,7 @@ impl Web3Rpc {
pub async fn wait_for_request_handle(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
request_metadata: &Arc<RequestMetadata>,
max_wait: Option<Duration>,
error_handler: Option<RequestErrorHandler>,
) -> Web3ProxyResult<OpenRequestHandle> {
@ -969,7 +970,10 @@ impl Web3Rpc {
let max_wait_until = max_wait.map(|x| Instant::now() + x);
loop {
match self.try_request_handle(authorization, error_handler).await {
match self
.try_request_handle(request_metadata, error_handler)
.await
{
Ok(OpenRequestResult::Handle(handle)) => return Ok(handle),
Ok(OpenRequestResult::RetryAt(retry_at)) => {
// TODO: emit a stat?
@ -1011,7 +1015,7 @@ impl Web3Rpc {
pub async fn try_request_handle(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
request_metadata: &Arc<RequestMetadata>,
error_handler: Option<RequestErrorHandler>,
) -> Web3ProxyResult<OpenRequestResult> {
// TODO: if websocket is reconnecting, return an error?
@ -1062,7 +1066,7 @@ impl Web3Rpc {
};
let handle =
OpenRequestHandle::new(authorization.clone(), self.clone(), error_handler).await;
OpenRequestHandle::new(request_metadata.clone(), self.clone(), error_handler).await;
Ok(handle.into())
}
@ -1084,17 +1088,20 @@ impl Web3Rpc {
self: &Arc<Self>,
method: &str,
params: &P,
authorization: &Arc<Authorization>,
request_metadata: &Arc<RequestMetadata>,
error_handler: Option<RequestErrorHandler>,
max_wait: Option<Duration>,
) -> Web3ProxyResult<R> {
let handle = self
.wait_for_request_handle(authorization, max_wait, error_handler)
.wait_for_request_handle(request_metadata, max_wait, error_handler)
.await?;
let x = handle.request::<P, R>(method, params).await?;
Ok(x)
let response = handle.request::<P, R>(method, params).await?;
let parsed = response.parsed().await?;
match parsed.payload {
jsonrpc::Payload::Success { result } => Ok(result),
jsonrpc::Payload::Error { error } => Err(error.into()),
}
}
}
@ -1108,7 +1115,7 @@ impl Hash for Web3Rpc {
self.name.hash(state);
// TODO: url does NOT include the authorization data. i think created_at should protect us if auth changes without anything else
self.http_provider.as_ref().map(|x| x.url()).hash(state);
self.http_url.hash(state);
// TODO: figure out how to get the url for the ws provider
// self.ws_provider.map(|x| x.url()).hash(state);

@ -1,8 +1,8 @@
use super::one::Web3Rpc;
use crate::errors::{Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::authorization::{Authorization, AuthorizationType};
use crate::frontend::authorization::{Authorization, AuthorizationType, RequestMetadata};
use crate::globals::{global_db_conn, DB_CONN};
use crate::jsonrpc::{JsonRpcParams, JsonRpcResultData};
use crate::jsonrpc::{self, JsonRpcParams, JsonRpcResultData};
use chrono::Utc;
use derive_more::From;
use entities::revert_log;
@ -30,7 +30,7 @@ pub enum OpenRequestResult {
/// Opening this handle checks rate limits. Developers: try to open a handle and use it as close together in time as possible
#[derive(Debug)]
pub struct OpenRequestHandle {
authorization: Arc<Authorization>,
request_metadata: Arc<RequestMetadata>,
error_handler: RequestErrorHandler,
rpc: Arc<Web3Rpc>,
}
@ -133,7 +133,7 @@ impl Drop for OpenRequestHandle {
impl OpenRequestHandle {
pub async fn new(
authorization: Arc<Authorization>,
request_metadata: Arc<RequestMetadata>,
rpc: Arc<Web3Rpc>,
error_handler: Option<RequestErrorHandler>,
) -> Self {
@ -146,7 +146,7 @@ impl OpenRequestHandle {
let error_handler = error_handler.unwrap_or_default();
Self {
authorization,
request_metadata,
error_handler,
rpc,
}
@ -169,13 +169,15 @@ impl OpenRequestHandle {
self,
method: &str,
params: &P,
) -> Result<R, ProviderError> {
) -> Result<jsonrpc::SingleResponse<R>, ProviderError> {
// TODO: use tracing spans
// TODO: including params in this log is way too verbose
// trace!(rpc=%self.rpc, %method, "request");
trace!("requesting from {}", self.rpc);
match self.authorization.authorization_type {
let authorization = &self.request_metadata.authorization;
match &authorization.authorization_type {
AuthorizationType::Frontend => {
self.rpc
.external_requests
@ -194,10 +196,37 @@ impl OpenRequestHandle {
// TODO: replace ethers-rs providers with our own that supports streaming the responses
// TODO: replace ethers-rs providers with our own that handles "id" being null
let response: Result<R, _> = if let Some(ref p) = self.rpc.http_provider {
p.request(method, params).await
let response: Result<jsonrpc::SingleResponse<R>, _> = if let (
Some(ref url),
Some(ref client),
) =
(&self.rpc.http_url, &self.rpc.http_client)
{
let params: serde_json::Value = serde_json::to_value(params)?;
let request = jsonrpc::JsonRpcRequest::new(
// TODO: proper id
jsonrpc::JsonRpcId::Number(1),
method.to_string(),
params,
)
.expect("request creation cannot fail");
match client.post(url.clone()).json(&request).send().await {
// TODO: threshold from configs
Ok(response) => {
jsonrpc::SingleResponse::read_if_short(
response,
1024,
self.request_metadata.clone(),
)
.await
}
Err(err) => Err(err.into()),
}
} else if let Some(p) = self.rpc.ws_provider.load().as_ref() {
p.request(method, params).await
p.request(method, params)
.await
// TODO: Id here
.map(|result| jsonrpc::ParsedResponse::from_result(result, None).into())
} else {
return Err(ProviderError::CustomError(
"no provider configured!".to_string(),
@ -227,7 +256,8 @@ impl OpenRequestHandle {
// trace!(%method, "skipping save on revert");
RequestErrorHandler::TraceLevel
} else if DB_CONN.read().await.is_ok() {
let log_revert_chance = self.authorization.checks.log_revert_chance;
let log_revert_chance =
self.request_metadata.authorization.checks.log_revert_chance;
if log_revert_chance == 0 {
// trace!(%method, "no chance. skipping save on revert");
@ -385,7 +415,7 @@ impl OpenRequestHandle {
Ok(params) => {
// spawn saving to the database so we don't slow down the request
// TODO: log if this errors
let f = self.authorization.clone().save_revert(method, params.0 .0);
let f = authorization.clone().save_revert(method, params.0 .0);
tokio::spawn(f);
}

@ -539,13 +539,8 @@ impl BufferedRpcQueryStats {
/// There are often multiple copies if a request is being sent to multiple servers in parallel
impl RpcQueryStats {
fn try_from_metadata(mut metadata: RequestMetadata) -> Web3ProxyResult<Self> {
let mut authorization = metadata.authorization.take();
if authorization.is_none() {
authorization = Some(Arc::new(Authorization::internal()?));
}
let authorization = authorization.expect("Authorization will always be set");
// TODO: do this without a clone
let authorization = metadata.authorization.clone();
let archive_request = metadata.archive_request.load(Ordering::Relaxed);

@ -44,8 +44,6 @@ async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "wait_for_sync=debug");
}
// todo!("set up tracing");
// this probably won't matter for us in docker, but better safe than sorry
fdlimit::raise_fd_limit();

@ -187,7 +187,7 @@ impl MigrateStatsToV2SubCommand {
// Create RequestMetadata
let request_metadata = RequestMetadata {
archive_request: x.archive_request.into(),
authorization: Some(authorization.clone()),
authorization: authorization.clone(),
backend_requests: Mutex::new(backend_rpcs),
chain_id,
error_response: x.error_response.into(),