add optional kafka feature

This commit is contained in:
Bryan Stitt 2023-03-03 01:39:50 +00:00
parent a54e33f598
commit 3098791ad9
12 changed files with 356 additions and 52 deletions

93
Cargo.lock generated

@ -2708,6 +2708,18 @@ version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb"
[[package]]
name = "libz-sys"
version = "1.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9702761c3935f8cc2f101793272e202c72b99da8f4224a19ddcf1279a6450bbf"
dependencies = [
"cc",
"libc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "link-cplusplus"
version = "1.0.8"
@ -2997,6 +3009,27 @@ dependencies = [
"libc",
]
[[package]]
name = "num_enum"
version = "0.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f646caf906c20226733ed5b1374287eb97e3c2a5c227ce668c1f2ce20ae57c9"
dependencies = [
"num_enum_derive",
]
[[package]]
name = "num_enum_derive"
version = "0.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcbff9bc912032c62bf65ef1d5aea88983b420f4f839db1e9b0c281a25c9c799"
dependencies = [
"proc-macro-crate 1.3.0",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "number_prefix"
version = "0.4.0"
@ -3677,6 +3710,36 @@ dependencies = [
"num_cpus",
]
[[package]]
name = "rdkafka"
version = "0.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd7c5d6d17442bcb9f943aae96d67d98c6d36af60442dd5da62aaa7fcbb25c48"
dependencies = [
"futures-channel",
"futures-util",
"libc",
"log",
"rdkafka-sys",
"serde",
"serde_derive",
"serde_json",
"slab",
"tokio",
]
[[package]]
name = "rdkafka-sys"
version = "4.3.0+1.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d222a401698c7f2010e3967353eae566d9934dcda49c29910da922414ab4e3f4"
dependencies = [
"libc",
"libz-sys",
"num_enum",
"pkg-config",
]
[[package]]
name = "redis"
version = "0.22.3"
@ -3895,6 +3958,28 @@ dependencies = [
"syn",
]
[[package]]
name = "rmp"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44519172358fd6d58656c86ab8e7fbc9e1490c3e8f14d35ed78ca0dd07403c9f"
dependencies = [
"byteorder",
"num-traits",
"paste",
]
[[package]]
name = "rmp-serde"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5b13be192e0220b8afb7222aa5813cb62cc269ebb5cac346ca6487681d2913e"
dependencies = [
"byteorder",
"rmp",
"serde",
]
[[package]]
name = "rsa"
version = "0.6.1"
@ -5579,6 +5664,12 @@ dependencies = [
"serde",
]
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "version_check"
version = "0.9.4"
@ -5755,9 +5846,11 @@ dependencies = [
"parking_lot 0.12.1",
"prettytable",
"proctitle",
"rdkafka",
"redis-rate-limiter",
"regex",
"reqwest",
"rmp-serde",
"rustc-hash",
"sentry",
"serde",

@ -17,6 +17,8 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \
ENV PATH /root/.foundry/bin:$PATH
RUN curl -L https://foundry.paradigm.xyz | bash && foundryup
RUN apt-get update && apt-get install librdkafka && rm -rf /var/lib/apt/lists/*
# copy the application
COPY . .
@ -32,6 +34,7 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \
cargo install \
--locked \
--no-default-features \
--features rdkafka \
--profile faster_release \
--root /opt/bin \
--path ./web3_proxy

@ -393,6 +393,7 @@ These are not yet ordered. There might be duplicates. We might not actually need
- [ ] rename "concurrent" requests to "parallel" requests
- [ ] minimum allowed query_start on query_user_stats
- [ ] setting request limits to None is broken. it does maxu64 and then internal deferred rate limiter counts try to *99/100
- [ ] if kafka fails to connect at the start, automatically reconnect
- [ ] during shutdown, mark the proxy unhealthy and send unsubscribe responses for any open websocket subscriptions
- [ ] some chains still use total_difficulty. have total_difficulty be used only if the chain needs it
- if total difficulty is not on the block and we aren't on ETH, fetch the full block instead of just the header

@ -56,8 +56,10 @@ pagerduty-rs = { version = "0.1.6", default-features = false, features = ["async
parking_lot = { version = "0.12.1", features = ["arc_lock"] }
prettytable = "*"
proctitle = "0.1.1"
rdkafka = { version = "0.29.0", optional = true }
regex = "1.7.1"
reqwest = { version = "0.11.14", default-features = false, features = ["json", "tokio-rustls"] }
rmp-serde = "1.1.1"
rustc-hash = "1.1.0"
sentry = { version = "0.30.0", default-features = false, features = ["backtrace", "contexts", "panic", "anyhow", "reqwest", "rustls", "log", "sentry-log"] }
serde = { version = "1.0.152", features = [] }

@ -37,6 +37,10 @@ use migration::sea_orm::{
use migration::sea_query::table::ColumnDef;
use migration::{Alias, DbErr, Migrator, MigratorTrait, Table};
use moka::future::Cache;
#[cfg(rdkafka)]
use rdkafka::message::{Header, OwnedHeaders};
#[cfg(rdkafka)]
use rdkafka::producer::FutureRecord;
use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter};
use serde::Serialize;
@ -179,6 +183,7 @@ pub struct AuthorizationChecks {
pub log_revert_chance: f64,
/// if true, transactions are broadcast to private mempools. They will still be public on the blockchain!
pub private_txs: bool,
pub proxy_mode: ProxyMode,
}
/// Simple wrapper so that we can keep track of read only connections.
@ -225,6 +230,11 @@ pub struct Web3ProxyApp {
pub bearer_token_semaphores:
Cache<UserBearerToken, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
pub stat_sender: Option<flume::Sender<Web3ProxyStat>>,
#[cfg(rdkafka)]
pub kafka_producer: Option<rdkafka::producer::FutureProducer>,
#[cfg(not(rdkafka))]
pub kafka_producer: Option<()>,
}
/// flatten a JoinError into an anyhow error
@ -457,6 +467,29 @@ impl Web3ProxyApp {
warn!("no database. some features will be disabled");
};
// connect to kafka for logging requests from the /debug/ urls
#[cfg(rdkafka)]
let mut kafka_producer: Option<rdkafka::producer::FutureProducer> = None;
#[cfg(not(rdkafka))]
let kafka_producer: Option<()> = None;
#[cfg(rdkafka)]
if let Some(kafka_brokers) = top_config.app.kafka_urls.clone() {
match rdkafka::ClientConfig::new()
.set("bootstrap.servers", kafka_brokers)
.set("message.timeout.ms", "5000")
.create()
{
Ok(k) => kafka_producer = Some(k),
Err(err) => error!("Failed connecting to kafka. This will not retry. {:?}", err),
}
}
#[cfg(not(rdkafka))]
if top_config.app.kafka_urls.is_some() {
warn!("rdkafka rust feature is not enabled!");
}
// TODO: do this during apply_config so that we can change redis url while running
// create a connection pool for redis
// a failure to connect does NOT block the application from starting
@ -680,6 +713,7 @@ impl Web3ProxyApp {
config: top_config.app.clone(),
balanced_rpcs,
http_client,
kafka_producer,
private_rpcs,
response_cache,
watch_consensus_head_receiver,
@ -943,7 +977,6 @@ impl Web3ProxyApp {
self: &Arc<Self>,
authorization: Arc<Authorization>,
request: JsonRpcRequestEnum,
proxy_mode: ProxyMode,
) -> Result<(JsonRpcForwardedResponseEnum, Vec<Arc<Web3Rpc>>), FrontendErrorResponse> {
// trace!(?request, "proxy_web3_rpc");
@ -956,7 +989,7 @@ impl Web3ProxyApp {
JsonRpcRequestEnum::Single(request) => {
let (response, rpcs) = timeout(
max_time,
self.proxy_cached_request(&authorization, request, proxy_mode, None),
self.proxy_cached_request(&authorization, request, None),
)
.await??;
@ -965,7 +998,7 @@ impl Web3ProxyApp {
JsonRpcRequestEnum::Batch(requests) => {
let (responses, rpcs) = timeout(
max_time,
self.proxy_web3_rpc_requests(&authorization, requests, proxy_mode),
self.proxy_web3_rpc_requests(&authorization, requests),
)
.await??;
@ -982,7 +1015,6 @@ impl Web3ProxyApp {
self: &Arc<Self>,
authorization: &Arc<Authorization>,
requests: Vec<JsonRpcRequest>,
proxy_mode: ProxyMode,
) -> Result<(Vec<JsonRpcForwardedResponse>, Vec<Arc<Web3Rpc>>), FrontendErrorResponse> {
// TODO: we should probably change ethers-rs to support this directly. they pushed this off to v2 though
let num_requests = requests.len();
@ -1002,12 +1034,7 @@ impl Web3ProxyApp {
requests
.into_iter()
.map(|request| {
self.proxy_cached_request(
authorization,
request,
proxy_mode,
Some(head_block_num),
)
self.proxy_cached_request(authorization, request, Some(head_block_num))
})
.collect::<Vec<_>>(),
)
@ -1062,13 +1089,62 @@ impl Web3ProxyApp {
self: &Arc<Self>,
authorization: &Arc<Authorization>,
mut request: JsonRpcRequest,
proxy_mode: ProxyMode,
head_block_num: Option<U64>,
) -> Result<(JsonRpcForwardedResponse, Vec<Arc<Web3Rpc>>), FrontendErrorResponse> {
// trace!("Received request: {:?}", request);
let request_metadata = Arc::new(RequestMetadata::new(REQUEST_PERIOD, request.num_bytes())?);
#[cfg(rdkafka)]
let mut kafka_stuff = None;
#[cfg(rdkafka)]
if let Some(kafka_producer) = self.kafka_producer.clone() {
let request_bytes = rmp_serde::to_vec(&request)?;
let rpc_secret_key_id = authorization
.checks
.rpc_secret_key_id
.map(|x| x.get())
.unwrap_or_default();
let kafka_key = rmp_serde::to_vec(&rpc_secret_key_id)
.context("failed serializing kafka key")
.unwrap();
let request_hash = Some(keccak256(&request_bytes));
// the third item is added with the response
let kafka_headers = OwnedHeaders::new_with_capacity(3)
.insert(Header {
key: "request_hash",
value: request_hash.as_ref(),
})
.insert(Header {
key: "head_block_num",
value: head_block_num.map(|x| x.to_string()).as_ref(),
});
// save the key and headers for when we log the response
kafka_stuff = Some((kafka_key.clone(), kafka_headers.clone()));
let f = async move {
let produce_future = kafka_producer.send(
FutureRecord::to("proxy_rpc_request")
.key(&kafka_key)
.payload(&request_bytes)
.headers(kafka_headers),
Duration::from_secs(0),
);
if let Err((err, msg)) = produce_future.await {
error!("produce kafka request log: {}. {:#?}", err, msg);
}
};
tokio::spawn(f);
}
// save the id so we can attach it to the response
// TODO: instead of cloning, take the id out?
let request_id = request.id.clone();
@ -1206,7 +1282,6 @@ impl Web3ProxyApp {
let mut response = self
.balanced_rpcs
.try_proxy_connection(
proxy_mode,
authorization,
request,
Some(&request_metadata),
@ -1253,9 +1328,9 @@ impl Web3ProxyApp {
// broadcast transactions to all private rpcs at once
"eth_sendRawTransaction" => {
// TODO: how should we handle private_mode here?
let default_num = match proxy_mode {
let default_num = match authorization.checks.proxy_mode {
// TODO: how many balanced rpcs should we send to? configurable? percentage of total?
ProxyMode::Best => Some(4),
ProxyMode::Best | ProxyMode::Debug => Some(4),
ProxyMode::Fastest(0) => None,
// TODO: how many balanced rpcs should we send to? configurable? percentage of total?
// TODO: what if we do 2 per tier? we want to blast the third party rpcs
@ -1595,7 +1670,6 @@ impl Web3ProxyApp {
let mut response = self
.balanced_rpcs
.try_proxy_connection(
proxy_mode,
&authorization,
request,
Some(&request_metadata),
@ -1624,7 +1698,6 @@ impl Web3ProxyApp {
} else {
self.balanced_rpcs
.try_proxy_connection(
proxy_mode,
&authorization,
request,
Some(&request_metadata),
@ -1679,6 +1752,33 @@ impl Web3ProxyApp {
.context("stat_sender sending response stat")?;
}
#[cfg(rdkafka)]
if let Some((kafka_key, kafka_headers)) = kafka_stuff {
let kafka_producer = self
.kafka_producer
.clone()
.expect("if headers are set, producer must exist");
let response_bytes =
rmp_serde::to_vec(&response).context("failed msgpack serialize response")?;
let f = async move {
let produce_future = kafka_producer.send(
FutureRecord::to("proxy_rpc_response")
.key(&kafka_key)
.payload(&response_bytes)
.headers(kafka_headers),
Duration::from_secs(0),
);
if let Err((err, msg)) = produce_future.await {
error!("produce kafka request log: {}. {:#?}", err, msg);
}
};
tokio::spawn(f);
}
Ok((response, rpcs))
}
}

@ -103,6 +103,12 @@ pub struct AppConfig {
/// Restrict user registration.
/// None = no code needed
pub invite_code: Option<String>,
/// Optional kafka brokers
/// Used by /debug/:rpc_key urls for logging requests and responses. No other endpoints log request/response data.
pub kafka_urls: Option<String>,
/// domain in sign-in-with-ethereum messages
pub login_domain: Option<String>,
/// do not serve any requests if the best known block is older than this many seconds.

@ -1,6 +1,7 @@
//! Utilities for authorization of logged in and anonymous users.
use super::errors::FrontendErrorResponse;
use super::rpc_proxy_ws::ProxyMode;
use crate::app::{AuthorizationChecks, Web3ProxyApp, APP_USER_AGENT};
use crate::rpcs::one::Web3Rpc;
use crate::user_token::UserBearerToken;
@ -205,6 +206,7 @@ impl Authorization {
db_conn: Option<DatabaseConnection>,
ip: IpAddr,
origin: Option<Origin>,
proxy_mode: ProxyMode,
referer: Option<Referer>,
user_agent: Option<UserAgent>,
) -> anyhow::Result<Self> {
@ -221,6 +223,7 @@ impl Authorization {
// TODO: default or None?
let authorization_checks = AuthorizationChecks {
max_requests_per_period,
proxy_mode,
..Default::default()
};
@ -235,6 +238,7 @@ impl Authorization {
)
}
#[allow(clippy::too_many_arguments)]
pub fn try_new(
authorization_checks: AuthorizationChecks,
db_conn: Option<DatabaseConnection>,
@ -311,7 +315,7 @@ pub async fn login_is_authorized(
app: &Web3ProxyApp,
ip: IpAddr,
) -> Result<Authorization, FrontendErrorResponse> {
let authorization = match app.rate_limit_login(ip).await? {
let authorization = match app.rate_limit_login(ip, ProxyMode::Best).await? {
RateLimitResult::Allowed(authorization, None) => authorization,
RateLimitResult::RateLimited(authorization, retry_at) => {
return Err(FrontendErrorResponse::RateLimited(authorization, retry_at));
@ -328,11 +332,17 @@ pub async fn ip_is_authorized(
app: &Arc<Web3ProxyApp>,
ip: IpAddr,
origin: Option<Origin>,
proxy_mode: ProxyMode,
) -> Result<(Authorization, Option<OwnedSemaphorePermit>), FrontendErrorResponse> {
// TODO: i think we could write an `impl From` for this
// TODO: move this to an AuthorizedUser extrator
let (authorization, semaphore) = match app
.rate_limit_by_ip(&app.config.allowed_origin_requests_per_period, ip, origin)
.rate_limit_by_ip(
&app.config.allowed_origin_requests_per_period,
ip,
origin,
proxy_mode,
)
.await?
{
RateLimitResult::Allowed(authorization, semaphore) => (authorization, semaphore),
@ -388,13 +398,14 @@ pub async fn key_is_authorized(
rpc_key: RpcSecretKey,
ip: IpAddr,
origin: Option<Origin>,
proxy_mode: ProxyMode,
referer: Option<Referer>,
user_agent: Option<UserAgent>,
) -> Result<(Authorization, Option<OwnedSemaphorePermit>), FrontendErrorResponse> {
// check the rate limits. error if over the limit
// TODO: i think this should be in an "impl From" or "impl Into"
let (authorization, semaphore) = match app
.rate_limit_by_rpc_key(ip, origin, referer, rpc_key, user_agent)
.rate_limit_by_rpc_key(ip, origin, proxy_mode, referer, rpc_key, user_agent)
.await?
{
RateLimitResult::Allowed(authorization, semaphore) => (authorization, semaphore),
@ -542,7 +553,11 @@ impl Web3ProxyApp {
Ok((user, semaphore_permit))
}
pub async fn rate_limit_login(&self, ip: IpAddr) -> anyhow::Result<RateLimitResult> {
pub async fn rate_limit_login(
&self,
ip: IpAddr,
proxy_mode: ProxyMode,
) -> anyhow::Result<RateLimitResult> {
// TODO: dry this up with rate_limit_by_rpc_key?
// we don't care about user agent or origin or referer
@ -551,6 +566,7 @@ impl Web3ProxyApp {
self.db_conn(),
ip,
None,
proxy_mode,
None,
None,
)?;
@ -597,6 +613,7 @@ impl Web3ProxyApp {
allowed_origin_requests_per_period: &HashMap<String, u64>,
ip: IpAddr,
origin: Option<Origin>,
proxy_mode: ProxyMode,
) -> anyhow::Result<RateLimitResult> {
// ip rate limits don't check referer or user agent
// the do check
@ -605,6 +622,7 @@ impl Web3ProxyApp {
self.db_conn.clone(),
ip,
origin,
proxy_mode,
None,
None,
)?;
@ -653,6 +671,7 @@ impl Web3ProxyApp {
// check the local cache for user data, or query the database
pub(crate) async fn authorization_checks(
&self,
proxy_mode: ProxyMode,
rpc_secret_key: RpcSecretKey,
) -> anyhow::Result<AuthorizationChecks> {
let authorization_checks: Result<_, Arc<anyhow::Error>> = self
@ -752,6 +771,7 @@ impl Web3ProxyApp {
max_concurrent_requests: user_tier_model.max_concurrent_requests,
max_requests_per_period: user_tier_model.max_requests_per_period,
private_txs: rpc_key_model.private_txs,
proxy_mode,
})
}
None => Ok(AuthorizationChecks::default()),
@ -768,11 +788,12 @@ impl Web3ProxyApp {
&self,
ip: IpAddr,
origin: Option<Origin>,
proxy_mode: ProxyMode,
referer: Option<Referer>,
rpc_key: RpcSecretKey,
user_agent: Option<UserAgent>,
) -> anyhow::Result<RateLimitResult> {
let authorization_checks = self.authorization_checks(rpc_key).await?;
let authorization_checks = self.authorization_checks(proxy_mode, rpc_key).await?;
// if no rpc_key_id matching the given rpc was found, then we can't rate limit by key
if authorization_checks.rpc_secret_key_id.is_none() {
@ -859,12 +880,13 @@ impl Authorization {
rpc_secret_key,
self.ip,
self.origin.clone(),
self.checks.proxy_mode,
self.referer.clone(),
self.user_agent.clone(),
)
.await?
} else {
ip_is_authorized(app, self.ip, self.origin.clone()).await?
ip_is_authorized(app, self.ip, self.origin.clone(), self.checks.proxy_mode).await?
};
let a = Arc::new(a);

@ -28,11 +28,12 @@ pub enum FrontendErrorResponse {
BadRequest(String),
SemaphoreAcquireError(AcquireError),
Database(DbErr),
HeadersError(headers::Error),
Headers(headers::Error),
HeaderToString(ToStrError),
InvalidHeaderValue(InvalidHeaderValue),
IpAddrParse(AddrParseError),
JoinError(JoinError),
MsgPackEncode(rmp_serde::encode::Error),
NotFound,
RateLimited(Authorization, Option<Instant>),
Redis(RedisError),
@ -40,7 +41,7 @@ pub enum FrontendErrorResponse {
StatusCode(StatusCode, String, Option<anyhow::Error>),
/// TODO: what should be attached to the timout?
Timeout(tokio::time::error::Elapsed),
UlidDecodeError(ulid::DecodeError),
UlidDecode(ulid::DecodeError),
UnknownKey,
}
@ -94,7 +95,7 @@ impl FrontendErrorResponse {
),
)
}
Self::HeadersError(err) => {
Self::Headers(err) => {
warn!("HeadersError {:?}", err);
(
StatusCode::BAD_REQUEST,
@ -146,6 +147,17 @@ impl FrontendErrorResponse {
),
)
}
Self::MsgPackEncode(err) => {
debug!("MsgPackEncode Error: {}", err);
(
StatusCode::BAD_REQUEST,
JsonRpcForwardedResponse::from_str(
&format!("msgpack encode error: {}", err),
Some(StatusCode::BAD_REQUEST.as_u16().into()),
None,
),
)
}
Self::NotFound => {
// TODO: emit a stat?
// TODO: instead of an error, show a normal html page for 404
@ -247,7 +259,7 @@ impl FrontendErrorResponse {
),
)
}
Self::UlidDecodeError(err) => {
Self::UlidDecode(err) => {
// trace!(?err, "UlidDecodeError");
(
StatusCode::BAD_REQUEST,

@ -67,6 +67,15 @@ pub async fn serve(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()
"/rpc/:rpc_key",
post(rpc_proxy_http::proxy_web3_rpc_with_key),
)
// authenticated debug route with and without trailing slash
.route(
"/debug/:rpc_key/",
post(rpc_proxy_http::debug_proxy_web3_rpc_with_key),
)
.route(
"/debug/:rpc_key",
post(rpc_proxy_http::debug_proxy_web3_rpc_with_key),
)
// public fastest with and without trailing slash
.route("/fastest/", post(rpc_proxy_http::fastest_proxy_web3_rpc))
.route("/fastest", post(rpc_proxy_http::fastest_proxy_web3_rpc))
@ -106,7 +115,15 @@ pub async fn serve(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()
"/rpc/:rpc_key",
get(rpc_proxy_ws::websocket_handler_with_key),
)
// public fastest with and without trailing slash
// debug with and without trailing slash
.route(
"/debug/:rpc_key/",
get(rpc_proxy_ws::websocket_handler_with_key),
)
.route(
"/debug/:rpc_key",
get(rpc_proxy_ws::websocket_handler_with_key),
) // public fastest with and without trailing slash
.route("/fastest/", get(rpc_proxy_ws::fastest_websocket_handler))
.route("/fastest", get(rpc_proxy_ws::fastest_websocket_handler))
// authenticated fastest with and without trailing slash
@ -169,7 +186,10 @@ pub async fn serve(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()
.route("/user/stats/detailed", get(users::user_stats_detailed_get))
.route("/user/logout", post(users::user_logout_post))
.route("/admin/modify_role", get(admin::admin_change_user_roles))
.route("/admin/imitate-login/:admin_address/:user_address", get(admin::admin_login_get))
.route(
"/admin/imitate-login/:admin_address/:user_address",
get(admin::admin_login_get),
)
.route(
"/admin/imitate-login/:admin_address/:user_address/:message_eip",
get(admin::admin_login_get),

@ -59,12 +59,12 @@ async fn _proxy_web3_rpc(
// TODO: do we care about keeping the TypedHeader wrapper?
let origin = origin.map(|x| x.0);
let (authorization, semaphore) = ip_is_authorized(&app, ip, origin).await?;
let (authorization, semaphore) = ip_is_authorized(&app, ip, origin, proxy_mode).await?;
let authorization = Arc::new(authorization);
let (response, rpcs, _semaphore) = app
.proxy_web3_rpc(authorization, payload, proxy_mode)
.proxy_web3_rpc(authorization, payload)
.await
.map(|(x, y)| (x, y, semaphore))?;
@ -138,6 +138,29 @@ pub async fn proxy_web3_rpc_with_key(
.await
}
#[debug_handler]
pub async fn debug_proxy_web3_rpc_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ip: InsecureClientIp,
origin: Option<TypedHeader<Origin>>,
referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>,
Path(rpc_key): Path<String>,
Json(payload): Json<JsonRpcRequestEnum>,
) -> FrontendResult {
_proxy_web3_rpc_with_key(
app,
ip,
origin,
referer,
user_agent,
rpc_key,
payload,
ProxyMode::Debug,
)
.await
}
#[debug_handler]
pub async fn fastest_proxy_web3_rpc_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>,
@ -204,6 +227,7 @@ async fn _proxy_web3_rpc_with_key(
rpc_key,
ip,
origin.map(|x| x.0),
proxy_mode,
referer.map(|x| x.0),
user_agent.map(|x| x.0),
)
@ -214,7 +238,7 @@ async fn _proxy_web3_rpc_with_key(
let rpc_secret_key_id = authorization.checks.rpc_secret_key_id;
let (response, rpcs, _semaphore) = app
.proxy_web3_rpc(authorization, payload, proxy_mode)
.proxy_web3_rpc(authorization, payload)
.await
.map(|(x, y)| (x, y, semaphore))?;

@ -34,7 +34,7 @@ use std::sync::Arc;
use std::{str::from_utf8_mut, sync::atomic::AtomicUsize};
use tokio::sync::{broadcast, OwnedSemaphorePermit, RwLock};
#[derive(Copy, Clone)]
#[derive(Copy, Clone, Debug)]
pub enum ProxyMode {
/// send to the "best" synced server
Best,
@ -42,6 +42,14 @@ pub enum ProxyMode {
Fastest(usize),
/// send to all servers for benchmarking. return the fastest non-error response
Versus,
/// send all requests and responses to kafka
Debug,
}
impl Default for ProxyMode {
fn default() -> Self {
Self::Best
}
}
/// Public entrypoint for WebSocket JSON-RPC requests.
@ -92,13 +100,13 @@ async fn _websocket_handler(
) -> FrontendResult {
let origin = origin.map(|x| x.0);
let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin).await?;
let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin, proxy_mode).await?;
let authorization = Arc::new(authorization);
match ws_upgrade {
Some(ws) => Ok(ws
.on_upgrade(move |socket| proxy_web3_socket(app, authorization, socket, proxy_mode))
.on_upgrade(move |socket| proxy_web3_socket(app, authorization, socket))
.into_response()),
None => {
if let Some(redirect) = &app.config.redirect_public_url {
@ -141,6 +149,29 @@ pub async fn websocket_handler_with_key(
.await
}
#[debug_handler]
pub async fn debug_websocket_handler_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ip: InsecureClientIp,
Path(rpc_key): Path<String>,
origin: Option<TypedHeader<Origin>>,
referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>,
ws_upgrade: Option<WebSocketUpgrade>,
) -> FrontendResult {
_websocket_handler_with_key(
ProxyMode::Debug,
app,
ip,
rpc_key,
origin,
referer,
user_agent,
ws_upgrade,
)
.await
}
#[debug_handler]
pub async fn fastest_websocket_handler_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>,
@ -206,6 +237,7 @@ async fn _websocket_handler_with_key(
rpc_key,
ip,
origin.map(|x| x.0),
proxy_mode,
referer.map(|x| x.0),
user_agent.map(|x| x.0),
)
@ -216,8 +248,9 @@ async fn _websocket_handler_with_key(
let authorization = Arc::new(authorization);
match ws_upgrade {
Some(ws_upgrade) => Ok(ws_upgrade
.on_upgrade(move |socket| proxy_web3_socket(app, authorization, socket, proxy_mode))),
Some(ws_upgrade) => {
Ok(ws_upgrade.on_upgrade(move |socket| proxy_web3_socket(app, authorization, socket)))
}
None => {
// if no websocket upgrade, this is probably a user loading the url with their browser
@ -273,7 +306,6 @@ async fn proxy_web3_socket(
app: Arc<Web3ProxyApp>,
authorization: Arc<Authorization>,
socket: WebSocket,
proxy_mode: ProxyMode,
) {
// split the websocket so we can read and write concurrently
let (ws_tx, ws_rx) = socket.split();
@ -282,13 +314,7 @@ async fn proxy_web3_socket(
let (response_sender, response_receiver) = flume::unbounded::<Message>();
tokio::spawn(write_web3_socket(response_receiver, ws_tx));
tokio::spawn(read_web3_socket(
app,
authorization,
ws_rx,
response_sender,
proxy_mode,
));
tokio::spawn(read_web3_socket(app, authorization, ws_rx, response_sender));
}
/// websockets support a few more methods than http clients
@ -299,7 +325,6 @@ async fn handle_socket_payload(
response_sender: &flume::Sender<Message>,
subscription_count: &AtomicUsize,
subscriptions: Arc<RwLock<HashMap<String, AbortHandle>>>,
proxy_mode: ProxyMode,
) -> (Message, Option<OwnedSemaphorePermit>) {
let (authorization, semaphore) = match authorization.check_again(&app).await {
Ok((a, s)) => (a, s),
@ -392,7 +417,7 @@ async fn handle_socket_payload(
Ok(response.into())
}
_ => app
.proxy_web3_rpc(authorization.clone(), json_request.into(), proxy_mode)
.proxy_web3_rpc(authorization.clone(), json_request.into())
.await
.map_or_else(
|err| match err {
@ -434,7 +459,6 @@ async fn read_web3_socket(
authorization: Arc<Authorization>,
mut ws_rx: SplitStream<WebSocket>,
response_sender: flume::Sender<Message>,
proxy_mode: ProxyMode,
) {
// TODO: need a concurrent hashmap
let subscriptions = Arc::new(RwLock::new(HashMap::new()));
@ -468,7 +492,6 @@ async fn read_web3_socket(
&response_sender,
&subscription_count,
subscriptions,
proxy_mode,
)
.await;
@ -500,7 +523,6 @@ async fn read_web3_socket(
&response_sender,
&subscription_count,
subscriptions,
proxy_mode,
)
.await;

@ -1136,15 +1136,14 @@ impl Web3Rpcs {
pub async fn try_proxy_connection(
&self,
proxy_mode: ProxyMode,
authorization: &Arc<Authorization>,
request: JsonRpcRequest,
request_metadata: Option<&Arc<RequestMetadata>>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
) -> anyhow::Result<JsonRpcForwardedResponse> {
match proxy_mode {
ProxyMode::Best => {
match authorization.checks.proxy_mode {
ProxyMode::Debug | ProxyMode::Best => {
self.try_send_best_consensus_head_connection(
authorization,
request,