diff --git a/Cargo.lock b/Cargo.lock index 58ca010f..781cb665 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1701,7 +1701,7 @@ dependencies = [ [[package]] name = "entities" -version = "0.28.0" +version = "0.30.0" dependencies = [ "ethers", "sea-orm", @@ -3368,7 +3368,7 @@ dependencies = [ [[package]] name = "migration" -version = "0.28.0" +version = "0.30.0" dependencies = [ "sea-orm-migration", "tokio", @@ -7002,7 +7002,7 @@ dependencies = [ [[package]] name = "web3_proxy" -version = "0.28.0" +version = "0.30.0" dependencies = [ "anyhow", "arc-swap", diff --git a/README.md b/README.md index 0f8d4bc5..15feae4f 100644 --- a/README.md +++ b/README.md @@ -122,7 +122,11 @@ cargo install sea-orm-cli 1. (optional) drop the current dev db 2. `sea-orm-cli migrate` 3. `sea-orm-cli generate entity -u mysql://root:dev_web3_proxy@127.0.0.1:13306/dev_web3_proxy -o entities/src --with-serde both` -4. After running the above, you will need to manually fix some columns: `Vec` -> `sea_orm::prelude::Uuid` and `i8` -> `bool`. Related: + - Be careful when adding the `--tables THE,MODIFIED,TABLES` flag. It will delete relationships if they aren't listed +4. After running the above, you will need to manually fix some things + - `Vec` -> `sea_orm::prelude::Uuid` (Related: ) + - `i8` -> `bool` (Related: ) + - add all the tables back into `mod.rs` ## Flame Graphs diff --git a/entities/Cargo.toml b/entities/Cargo.toml index 60a85ce8..8b6eb419 100644 --- a/entities/Cargo.toml +++ b/entities/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "entities" -version = "0.28.0" +version = "0.30.0" edition = "2021" [lib] diff --git a/entities/src/admin.rs b/entities/src/admin.rs index d1d46999..adf88d0a 100644 --- a/entities/src/admin.rs +++ b/entities/src/admin.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.6 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7 use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; @@ -14,6 +14,8 @@ pub struct Model { #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] pub enum Relation { + #[sea_orm(has_many = "super::admin_increase_balance_receipt::Entity")] + AdminIncreaseBalanceReceipt, #[sea_orm( belongs_to = "super::user::Entity", from = "Column::UserId", @@ -24,6 +26,12 @@ pub enum Relation { User, } +impl Related for Entity { + fn to() -> RelationDef { + Relation::AdminIncreaseBalanceReceipt.def() + } +} + impl Related for Entity { fn to() -> RelationDef { Relation::User.def() diff --git a/entities/src/admin_increase_balance_receipt.rs b/entities/src/admin_increase_balance_receipt.rs index d3d3cf27..76dbda95 100644 --- a/entities/src/admin_increase_balance_receipt.rs +++ b/entities/src/admin_increase_balance_receipt.rs @@ -1,8 +1,9 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.6 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7 use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "admin_increase_balance_receipt")] pub struct Model { #[sea_orm(primary_key)] diff --git a/entities/src/admin_trail.rs b/entities/src/admin_trail.rs index 26ad1e3b..fe7348d5 100644 --- a/entities/src/admin_trail.rs +++ b/entities/src/admin_trail.rs @@ -1,8 +1,9 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.6 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7 use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "admin_trail")] pub struct Model { #[sea_orm(primary_key)] diff --git a/entities/src/balance.rs b/entities/src/balance.rs index b8e31269..75423e6d 100644 --- a/entities/src/balance.rs +++ b/entities/src/balance.rs @@ -1,16 +1,17 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.6 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7 use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "balance")] pub struct Model { #[sea_orm(primary_key)] pub id: i32, - #[sea_orm(unique)] - pub user_id: u64, #[sea_orm(column_type = "Decimal(Some((20, 10)))")] pub total_spent_including_free_tier: Decimal, + #[sea_orm(unique)] + pub user_id: u64, #[sea_orm(column_type = "Decimal(Some((20, 10)))")] pub total_spent_outside_free_tier: Decimal, #[sea_orm(column_type = "Decimal(Some((20, 10)))")] diff --git a/entities/src/increase_on_chain_balance_receipt.rs b/entities/src/increase_on_chain_balance_receipt.rs index 2e4821c7..d547fc69 100644 --- a/entities/src/increase_on_chain_balance_receipt.rs +++ b/entities/src/increase_on_chain_balance_receipt.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.6 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7 use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; @@ -13,6 +13,9 @@ pub struct Model { #[sea_orm(column_type = "Decimal(Some((20, 10)))")] pub amount: Decimal, pub deposit_to_user_id: u64, + pub block_hash: String, + pub log_index: u64, + pub token_address: String, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/entities/src/referee.rs b/entities/src/referee.rs index 5fad66a4..cf8ee3c3 100644 --- a/entities/src/referee.rs +++ b/entities/src/referee.rs @@ -1,8 +1,9 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.6 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7 use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "referee")] pub struct Model { #[sea_orm(primary_key)] diff --git a/entities/src/referrer.rs b/entities/src/referrer.rs index c2069776..7cbd3b98 100644 --- a/entities/src/referrer.rs +++ b/entities/src/referrer.rs @@ -1,8 +1,9 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.6 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7 use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "referrer")] pub struct Model { #[sea_orm(primary_key)] diff --git a/entities/src/rpc_accounting.rs b/entities/src/rpc_accounting.rs index df3d3e88..fac4d09f 100644 --- a/entities/src/rpc_accounting.rs +++ b/entities/src/rpc_accounting.rs @@ -40,7 +40,7 @@ pub struct Model { pub max_response_bytes: u64, pub archive_request: bool, pub origin: Option, - pub migrated: Option, + pub migrated: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/entities/src/sea_orm_active_enums.rs b/entities/src/sea_orm_active_enums.rs index 7882cf35..a6e536ae 100644 --- a/entities/src/sea_orm_active_enums.rs +++ b/entities/src/sea_orm_active_enums.rs @@ -3,10 +3,11 @@ use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; -// TODO: rename to StatLevel? AccountingLevel? What? +// TODO: rename TrackingLevel to StatLevel? AccountingLevel? What? #[derive(Debug, Clone, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "log_level")] pub enum TrackingLevel { + /// TODO: rename to minimal #[sea_orm(string_value = "none")] None, #[sea_orm(string_value = "aggregated")] diff --git a/entities/src/secondary_user.rs b/entities/src/secondary_user.rs index 11ce8c2d..15b44f70 100644 --- a/entities/src/secondary_user.rs +++ b/entities/src/secondary_user.rs @@ -11,20 +11,12 @@ pub struct Model { pub id: u64, pub user_id: u64, pub description: Option, - pub rpc_secret_key_id: u64, pub role: Role, + pub rpc_secret_key_id: u64, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] pub enum Relation { - #[sea_orm( - belongs_to = "super::user::Entity", - from = "Column::UserId", - to = "super::user::Column::Id", - on_update = "NoAction", - on_delete = "NoAction" - )] - User, #[sea_orm( belongs_to = "super::rpc_key::Entity", from = "Column::RpcSecretKeyId", @@ -33,12 +25,14 @@ pub enum Relation { on_delete = "NoAction" )] RpcKey, -} - -impl Related for Entity { - fn to() -> RelationDef { - Relation::User.def() - } + #[sea_orm( + belongs_to = "super::user::Entity", + from = "Column::UserId", + to = "super::user::Column::Id", + on_update = "NoAction", + on_delete = "NoAction" + )] + User, } impl Related for Entity { @@ -47,4 +41,10 @@ impl Related for Entity { } } +impl Related for Entity { + fn to() -> RelationDef { + Relation::User.def() + } +} + impl ActiveModelBehavior for ActiveModel {} diff --git a/entities/src/user.rs b/entities/src/user.rs index 289e1fd1..71ca1101 100644 --- a/entities/src/user.rs +++ b/entities/src/user.rs @@ -23,6 +23,8 @@ pub enum Relation { Login, #[sea_orm(has_many = "super::rpc_key::Entity")] RpcKey, + #[sea_orm(has_many = "super::increase_on_chain_balance_receipt::Entity")] + IncreaseOnChainBalanceReceipt, #[sea_orm(has_many = "super::secondary_user::Entity")] SecondaryUser, #[sea_orm( @@ -47,6 +49,12 @@ impl Related for Entity { } } +impl Related for Entity { + fn to() -> RelationDef { + Relation::IncreaseOnChainBalanceReceipt.def() + } +} + impl Related for Entity { fn to() -> RelationDef { Relation::SecondaryUser.def() diff --git a/migration/Cargo.toml b/migration/Cargo.toml index cc3585d4..520a9c8f 100644 --- a/migration/Cargo.toml +++ b/migration/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "migration" -version = "0.28.0" +version = "0.30.0" edition = "2021" publish = false diff --git a/migration/src/lib.rs b/migration/src/lib.rs index e788f77a..cccd18a7 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -29,6 +29,7 @@ mod m20230511_161214_remove_columns_statsv2_origin_and_method; mod m20230512_220213_allow_null_rpc_key_id_in_stats_v2; mod m20230514_114803_admin_add_credits; mod m20230607_221917_total_deposits; +mod m20230615_221201_handle_payment_uncles; pub struct Migrator; @@ -65,6 +66,7 @@ impl MigratorTrait for Migrator { Box::new(m20230512_220213_allow_null_rpc_key_id_in_stats_v2::Migration), Box::new(m20230514_114803_admin_add_credits::Migration), Box::new(m20230607_221917_total_deposits::Migration), + Box::new(m20230615_221201_handle_payment_uncles::Migration), ] } } diff --git a/migration/src/m20230615_221201_handle_payment_uncles.rs b/migration/src/m20230615_221201_handle_payment_uncles.rs new file mode 100644 index 00000000..7f0f84be --- /dev/null +++ b/migration/src/m20230615_221201_handle_payment_uncles.rs @@ -0,0 +1,71 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // TODO: also alter the index to include the BlockHash? or + manager + .alter_table( + Table::alter() + .table(IncreaseOnChainBalanceReceipt::Table) + .add_column( + ColumnDef::new(IncreaseOnChainBalanceReceipt::BlockHash) + .string() + .not_null(), + ) + .add_column( + ColumnDef::new(IncreaseOnChainBalanceReceipt::LogIndex) + .big_integer() + .unsigned() + .not_null(), + ) + .add_column( + ColumnDef::new(IncreaseOnChainBalanceReceipt::TokenAddress) + .string() + .not_null(), + ) + .drop_foreign_key(Alias::new("fk-deposit_to_user_id")) + .add_foreign_key( + TableForeignKey::new() + .name("fk-deposit_to_user_id-v2") + .from_col(IncreaseOnChainBalanceReceipt::DepositToUserId) + .to_tbl(User::Table) + .to_col(User::Id), + ) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(IncreaseOnChainBalanceReceipt::Table) + .drop_column(IncreaseOnChainBalanceReceipt::BlockHash) + .drop_column(IncreaseOnChainBalanceReceipt::LogIndex) + .drop_column(IncreaseOnChainBalanceReceipt::TokenAddress) + .to_owned(), + ) + .await + } +} + +/// Learn more at https://docs.rs/sea-query#iden +#[derive(Iden)] +enum IncreaseOnChainBalanceReceipt { + Table, + BlockHash, + LogIndex, + TokenAddress, + DepositToUserId, +} + +#[derive(Iden)] +enum User { + Table, + Id, +} diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index b762a27a..c4384fef 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "web3_proxy" -version = "0.28.0" +version = "0.30.0" edition = "2021" default-run = "web3_proxy_cli" diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 063c2144..8c68e80d 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -41,7 +41,7 @@ use hashbrown::{HashMap, HashSet}; use ipnet::IpNet; use log::{error, info, trace, warn, Level}; use migration::sea_orm::prelude::Decimal; -use migration::sea_orm::{EntityTrait, PaginatorTrait}; +use migration::sea_orm::{DatabaseTransaction, EntityTrait, PaginatorTrait, TransactionTrait}; use moka::future::{Cache, CacheBuilder}; use parking_lot::Mutex; use redis_rate_limiter::redis::AsyncCommands; @@ -679,8 +679,6 @@ impl Web3ProxyApp { .changed() .await .context("failed awaiting top_config change")?; - - info!("config changed"); } }); @@ -688,7 +686,7 @@ impl Web3ProxyApp { } if important_background_handles.is_empty() { - info!("no important background handles"); + trace!("no important background handles"); let f = tokio::spawn(async move { let _ = background_shutdown_receiver.recv().await; @@ -1063,10 +1061,22 @@ impl Web3ProxyApp { } /// TODO: i don't think we want or need this. just use app.db_conn, or maybe app.db_conn.clone() or app.db_conn.as_ref() + #[inline] pub fn db_conn(&self) -> Option { self.db_conn.clone() } + #[inline] + pub async fn db_transaction(&self) -> Web3ProxyResult { + if let Some(ref db_conn) = self.db_conn { + let x = db_conn.begin().await?; + Ok(x) + } else { + Err(Web3ProxyError::NoDatabase) + } + } + + #[inline] pub fn db_replica(&self) -> Option { self.db_replica.clone() } diff --git a/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs index f377bee0..0523057c 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs @@ -48,7 +48,7 @@ impl ProxydSubCommand { } async fn run( - mut top_config: TopConfig, + top_config: TopConfig, top_config_path: Option, frontend_port: u16, prometheus_port: u16, @@ -87,13 +87,18 @@ async fn run( if let Some(top_config_path) = top_config_path { let config_sender = spawned_app.new_top_config_sender; { + let mut current_config = config_sender.borrow().clone(); + thread::spawn(move || loop { match fs::read_to_string(&top_config_path) { - Ok(new_top_config) => match toml::from_str(&new_top_config) { + Ok(new_top_config) => match toml::from_str::(&new_top_config) { Ok(new_top_config) => { - if new_top_config != top_config { - top_config = new_top_config; - config_sender.send(top_config.clone()).unwrap(); + if new_top_config != current_config { + // TODO: print the differences + // TODO: first run seems to always see differences. why? + info!("config @ {:?} changed", top_config_path); + config_sender.send(new_top_config.clone()).unwrap(); + current_config = new_top_config; } } Err(err) => { @@ -358,12 +363,12 @@ mod tests { let proxy_provider = Provider::::try_from(anvil.endpoint()).unwrap(); let anvil_result = anvil_provider - .request::<_, Option>("eth_getBlockByNumber", ("latest", true)) + .request::<_, Option>("eth_getBlockByNumber", ("latest", false)) .await .unwrap() .unwrap(); let proxy_result = proxy_provider - .request::<_, Option>("eth_getBlockByNumber", ("latest", true)) + .request::<_, Option>("eth_getBlockByNumber", ("latest", false)) .await .unwrap() .unwrap(); @@ -378,12 +383,12 @@ mod tests { .unwrap(); let anvil_result = anvil_provider - .request::<_, Option>("eth_getBlockByNumber", ("latest", true)) + .request::<_, Option>("eth_getBlockByNumber", ("latest", false)) .await .unwrap() .unwrap(); let proxy_result = proxy_provider - .request::<_, Option>("eth_getBlockByNumber", ("latest", true)) + .request::<_, Option>("eth_getBlockByNumber", ("latest", false)) .await .unwrap() .unwrap(); diff --git a/web3_proxy/src/errors.rs b/web3_proxy/src/errors.rs index 45fc5eb2..e0be3c1a 100644 --- a/web3_proxy/src/errors.rs +++ b/web3_proxy/src/errors.rs @@ -15,7 +15,7 @@ use derive_more::{Display, Error, From}; use ethers::prelude::ContractError; use http::header::InvalidHeaderValue; use ipnet::AddrParseError; -use log::{debug, error, info, trace, warn}; +use log::{debug, error, trace, warn}; use migration::sea_orm::DbErr; use redis_rate_limiter::redis::RedisError; use reqwest::header::ToStrError; @@ -97,6 +97,7 @@ pub enum Web3ProxyError { NoBlockNumberOrHash, NoBlocksKnown, NoConsensusHeadBlock, + NoDatabase, NoHandleReady, NoServersSynced, #[display(fmt = "{}/{}", num_known, min_head_rpcs)] @@ -201,7 +202,7 @@ impl Web3ProxyError { return err.as_response_parts::(); } Self::BadRequest(err) => { - debug!("BAD_REQUEST: {}", err); + trace!("BAD_REQUEST: {}", err); ( StatusCode::BAD_REQUEST, JsonRpcErrorData { @@ -268,9 +269,10 @@ impl Web3ProxyError { ) } Self::EipVerificationFailed(err_1, err_191) => { - info!( + trace!( "EipVerificationFailed err_1={:#?} err2={:#?}", - err_1, err_191 + err_1, + err_191 ); ( StatusCode::UNAUTHORIZED, @@ -331,7 +333,7 @@ impl Web3ProxyError { } // Self::JsonRpcForwardedError(x) => (StatusCode::OK, x), Self::GasEstimateNotU256 => { - warn!("GasEstimateNotU256"); + trace!("GasEstimateNotU256"); ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { @@ -353,7 +355,7 @@ impl Web3ProxyError { ) } Self::Headers(err) => { - warn!("HeadersError {:?}", err); + trace!("HeadersError {:?}", err); ( StatusCode::BAD_REQUEST, JsonRpcErrorData { @@ -388,7 +390,7 @@ impl Web3ProxyError { ) } Self::InvalidBlockBounds { min, max } => { - debug!("InvalidBlockBounds min={} max={}", min, max); + trace!("InvalidBlockBounds min={} max={}", min, max); ( StatusCode::BAD_REQUEST, JsonRpcErrorData { @@ -414,7 +416,7 @@ impl Web3ProxyError { ) } Self::IpNotAllowed(ip) => { - debug!("IpNotAllowed ip={})", ip); + trace!("IpNotAllowed ip={})", ip); ( StatusCode::FORBIDDEN, JsonRpcErrorData { @@ -425,7 +427,7 @@ impl Web3ProxyError { ) } Self::InvalidHeaderValue(err) => { - debug!("InvalidHeaderValue err={:?}", err); + trace!("InvalidHeaderValue err={:?}", err); ( StatusCode::BAD_REQUEST, JsonRpcErrorData { @@ -436,7 +438,7 @@ impl Web3ProxyError { ) } Self::InvalidEip => { - debug!("InvalidEip"); + trace!("InvalidEip"); ( StatusCode::BAD_REQUEST, JsonRpcErrorData { @@ -447,7 +449,7 @@ impl Web3ProxyError { ) } Self::InvalidInviteCode => { - debug!("InvalidInviteCode"); + trace!("InvalidInviteCode"); ( StatusCode::UNAUTHORIZED, JsonRpcErrorData { @@ -470,7 +472,7 @@ impl Web3ProxyError { ) } Self::UnknownReferralCode => { - debug!("UnknownReferralCode"); + trace!("UnknownReferralCode"); ( StatusCode::UNAUTHORIZED, JsonRpcErrorData { @@ -481,7 +483,7 @@ impl Web3ProxyError { ) } Self::InvalidReferer => { - debug!("InvalidReferer"); + trace!("InvalidReferer"); ( StatusCode::BAD_REQUEST, JsonRpcErrorData { @@ -492,7 +494,7 @@ impl Web3ProxyError { ) } Self::InvalidSignatureLength => { - debug!("InvalidSignatureLength"); + trace!("InvalidSignatureLength"); ( StatusCode::BAD_REQUEST, JsonRpcErrorData { @@ -503,7 +505,7 @@ impl Web3ProxyError { ) } Self::InvalidUserAgent => { - debug!("InvalidUserAgent"); + trace!("InvalidUserAgent"); ( StatusCode::FORBIDDEN, JsonRpcErrorData { @@ -514,7 +516,7 @@ impl Web3ProxyError { ) } Self::InvalidUserKey => { - warn!("InvalidUserKey"); + trace!("InvalidUserKey"); ( StatusCode::BAD_REQUEST, JsonRpcErrorData { @@ -602,6 +604,17 @@ impl Web3ProxyError { }, ) } + Self::NoDatabase => { + error!("no database configured"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcErrorData { + message: "no database configured!".into(), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, + ) + } Self::NoHandleReady => { error!("NoHandleReady"); ( @@ -790,7 +803,7 @@ impl Web3ProxyError { ) } Self::RefererRequired => { - debug!("referer required"); + trace!("referer required"); ( StatusCode::BAD_REQUEST, JsonRpcErrorData { @@ -801,7 +814,7 @@ impl Web3ProxyError { ) } Self::RefererNotAllowed(referer) => { - debug!("referer not allowed referer={:?}", referer); + trace!("referer not allowed referer={:?}", referer); ( StatusCode::FORBIDDEN, JsonRpcErrorData { @@ -915,7 +928,7 @@ impl Web3ProxyError { }, ), Self::UserAgentRequired => { - debug!("UserAgentRequired"); + trace!("UserAgentRequired"); ( StatusCode::BAD_REQUEST, JsonRpcErrorData { @@ -926,7 +939,7 @@ impl Web3ProxyError { ) } Self::UserAgentNotAllowed(ua) => { - debug!("UserAgentNotAllowed ua={}", ua); + trace!("UserAgentNotAllowed ua={}", ua); ( StatusCode::FORBIDDEN, JsonRpcErrorData { diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index e437cc51..6e80df5f 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -1022,9 +1022,11 @@ impl Web3ProxyApp { )?; // no semaphore is needed here because login rate limits are low - // TODO: are we sure do we want a semaphore here? + // TODO: are we sure do not we want a semaphore here? let semaphore = None; + // TODO: if ip is on the local network, always allow? + if let Some(rate_limiter) = &self.login_rate_limiter { match rate_limiter.throttle_label(&ip.to_string(), None, 1).await { Ok(RedisRateLimitResult::Allowed(_)) => { diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 734e83e2..75d0fbbc 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -174,6 +174,10 @@ pub async fn serve( "/user/balance/:tx_hash", post(users::payment::user_balance_post), ) + .route( + "/user/balance_uncle/:uncle_hash", + post(users::payment::user_balance_uncle_post), + ) .route("/user/keys", get(users::rpc_keys::rpc_keys_get)) .route("/user/keys", post(users::rpc_keys::rpc_keys_management)) .route("/user/keys", put(users::rpc_keys::rpc_keys_management)) diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index 0285a440..050b5db5 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -339,8 +339,6 @@ async fn handle_socket_payload( let key: U64 = serde_json::from_str(subscription_id.get()).unwrap(); - info!("key: {}", key); - x.insert(key, handle); } @@ -373,8 +371,6 @@ async fn handle_socket_payload( } }; - info!("key: {}", subscription_id); - // TODO: is this the right response? let partial_response = { let mut x = subscriptions.write().await; diff --git a/web3_proxy/src/frontend/users/authentication.rs b/web3_proxy/src/frontend/users/authentication.rs index c283d979..e27d7b72 100644 --- a/web3_proxy/src/frontend/users/authentication.rs +++ b/web3_proxy/src/frontend/users/authentication.rs @@ -21,7 +21,7 @@ use http::StatusCode; use log::{debug, trace, warn}; use migration::sea_orm::prelude::{Decimal, Uuid}; use migration::sea_orm::{ - self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel, + self, ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, QueryFilter, TransactionTrait, }; use serde_json::json; @@ -143,13 +143,11 @@ pub async fn user_login_get( Ok(message.into_response()) } +/// you MUST commit the `txn` after calling this function! pub async fn register_new_user( - db_conn: &DatabaseConnection, + txn: &DatabaseTransaction, address: Address, ) -> anyhow::Result<(user::Model, rpc_key::Model, balance::Model)> { - // all or nothing - let txn = db_conn.begin().await?; - // the only thing we need from them is an address // everything else is optional // TODO: different invite codes should allow different levels @@ -160,7 +158,7 @@ pub async fn register_new_user( ..Default::default() }; - let new_user = new_user.insert(&txn).await?; + let new_user = new_user.insert(txn).await?; // create the user's first api key let rpc_secret_key = RpcSecretKey::new(); @@ -173,7 +171,7 @@ pub async fn register_new_user( }; let user_rpc_key = user_rpc_key - .insert(&txn) + .insert(txn) .await .web3_context("Failed saving new user key")?; @@ -183,10 +181,7 @@ pub async fn register_new_user( ..Default::default() }; - let user_balance = user_balance.insert(&txn).await?; - - // save the user and key and balance to the database - txn.commit().await?; + let user_balance = user_balance.insert(txn).await?; Ok((new_user, user_rpc_key, user_balance)) } @@ -312,10 +307,14 @@ pub async fn user_login_post( } } - let (caller, caller_key, _) = - register_new_user(&db_conn, our_msg.address.into()).await?; + let txn = db_conn.begin().await?; + + let (caller, caller_key, _) = register_new_user(&txn, our_msg.address.into()).await?; + + txn.commit().await?; let txn = db_conn.begin().await?; + // First, optionally catch a referral code from the parameters if there is any debug!("Refferal code is: {:?}", payload.referral_code); if let Some(referral_code) = payload.referral_code.as_ref() { @@ -323,7 +322,7 @@ pub async fn user_login_post( trace!("Using register referral code: {:?}", referral_code); let user_referrer = referrer::Entity::find() .filter(referrer::Column::ReferralCode.eq(referral_code)) - .one(db_replica.as_ref()) + .one(&txn) .await? .ok_or(Web3ProxyError::UnknownReferralCode)?; @@ -354,7 +353,7 @@ pub async fn user_login_post( trace!("Using referral code: {:?}", referral_code); let user_referrer = referrer::Entity::find() .filter(referrer::Column::ReferralCode.eq(referral_code)) - .one(db_replica.as_ref()) + .one(&txn) .await? .ok_or(Web3ProxyError::BadRequest( format!( @@ -382,7 +381,7 @@ pub async fn user_login_post( // the user is already registered let user_rpc_keys = rpc_key::Entity::find() .filter(rpc_key::Column::UserId.eq(caller.id)) - .all(db_replica.as_ref()) + .all(&db_conn) .await .web3_context("failed loading user's key")?; diff --git a/web3_proxy/src/frontend/users/payment.rs b/web3_proxy/src/frontend/users/payment.rs index 2378f44c..73a55b6c 100644 --- a/web3_proxy/src/frontend/users/payment.rs +++ b/web3_proxy/src/frontend/users/payment.rs @@ -1,6 +1,8 @@ use crate::app::Web3ProxyApp; -use crate::errors::{Web3ProxyError, Web3ProxyResponse}; -use crate::frontend::authorization::login_is_authorized; +use crate::errors::{Web3ProxyError, Web3ProxyResponse, Web3ProxyResult}; +use crate::frontend::authorization::{ + login_is_authorized, Authorization as Web3ProxyAuthorization, +}; use crate::frontend::users::authentication::register_new_user; use anyhow::Context; use axum::{ @@ -13,13 +15,14 @@ use axum_client_ip::InsecureClientIp; use axum_macros::debug_handler; use entities::{balance, increase_on_chain_balance_receipt, rpc_key, user}; use ethers::abi::AbiEncode; -use ethers::types::{Address, TransactionReceipt, H256}; -use hashbrown::HashMap; +use ethers::types::{Address, Block, TransactionReceipt, TxHash, H256}; +use hashbrown::{HashMap, HashSet}; use http::StatusCode; use log::{debug, info, trace}; use migration::sea_orm::prelude::Decimal; use migration::sea_orm::{ - self, ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, TransactionTrait, + self, ActiveModelTrait, ActiveValue, ColumnTrait, EntityTrait, IntoActiveModel, ModelTrait, + QueryFilter, QuerySelect, TransactionTrait, }; use migration::{Expr, OnConflict}; use payment_contracts::ierc20::IERC20; @@ -100,10 +103,7 @@ pub async fn user_deposits_get( Ok(Json(response).into_response()) } -/// `POST /user/balance/:tx_hash` -- Manually process a confirmed txid to update a user's balance. -/// -/// We will subscribe to events to watch for any user deposits, but sometimes events can be missed. -/// TODO: change this. just have a /tx/:txhash that is open to anyone. rate limit like we rate limit /login +/// `POST /user/balance/:tx_hash` -- Process a confirmed txid to update a user's balance. #[debug_handler] pub async fn user_balance_post( Extension(app): Extension>, @@ -113,10 +113,9 @@ pub async fn user_balance_post( // I suppose this is ok / good, so people don't spam this endpoint as it is not "cheap" // we rate limit by ip instead of bearer token so transactions are easy to submit from scripts // TODO: if ip is a 10. or a 172., allow unlimited - login_is_authorized(&app, ip).await?; + let authorization = login_is_authorized(&app, ip).await?; - // Get the transaction hash, and the amount that the user wants to top up by. - // Let's say that for now, 1 credit is equivalent to 1 dollar (assuming any stablecoin has a 1:1 peg) + // Get the transaction hash let tx_hash: H256 = params .remove("tx_hash") .ok_or(Web3ProxyError::BadRequest( @@ -129,36 +128,89 @@ pub async fn user_balance_post( let db_conn = app.db_conn().context("query_user_stats needs a db")?; - // Return early if the tx was already added - if increase_on_chain_balance_receipt::Entity::find() - .filter(increase_on_chain_balance_receipt::Column::TxHash.eq(tx_hash.encode_hex())) - .one(&db_conn) - .await? - .is_some() - { - // TODO: double check that the transaction is still seen as "confirmed" if it is NOT, we need to remove credits! - - // this will be status code 200, not 204 - let response = Json(json!({ - "result": "success", - "message": "this transaction was already in the database", - })) - .into_response(); - - return Ok(response); - }; + let authorization = Arc::new(authorization); // get the transaction receipt let transaction_receipt = app - .internal_request::<_, Option>("eth_getTransactionReceipt", (tx_hash,)) - .await? - .ok_or(Web3ProxyError::BadRequest( - format!("transaction receipt not found for {}", tx_hash,).into(), - ))?; + .authorized_request::<_, Option>( + "eth_getTransactionReceipt", + (tx_hash,), + authorization.clone(), + ) + .await?; + + // check for uncles + let mut find_uncles = increase_on_chain_balance_receipt::Entity::find() + // .lock_exclusive() + .filter(increase_on_chain_balance_receipt::Column::TxHash.eq(tx_hash.encode_hex())) + .filter(increase_on_chain_balance_receipt::Column::ChainId.eq(app.config.chain_id)); + + let tx_pending = + if let Some(block_hash) = transaction_receipt.as_ref().and_then(|x| x.block_hash) { + // check for uncles + // this transaction is confirmed + // any rows in the db with a block hash that doesn't match the receipt should be deleted + find_uncles = find_uncles.filter( + increase_on_chain_balance_receipt::Column::BlockHash.ne(block_hash.encode_hex()), + ); + + false + } else { + // no block_hash to check + // this transaction is not confirmed + // any rows in the db should be deleted + true + }; + + let uncle_hashes = find_uncles.all(&db_conn).await?; + + let uncle_hashes: HashSet<_> = uncle_hashes + .into_iter() + .map(|x| serde_json::from_str(x.block_hash.as_str()).unwrap()) + .collect(); + + for uncle_hash in uncle_hashes.into_iter() { + if let Some(x) = handle_uncle_block(&app, &authorization, uncle_hash).await? { + info!("balance changes from uncle: {:#?}", x); + } + } + + if tx_pending { + // the transaction isn't confirmed. return early + // TODO: BadRequest, or something else? + return Err(Web3ProxyError::BadRequest( + "this transaction has not confirmed yet. Please try again later.".into(), + )); + } + + let transaction_receipt = + transaction_receipt.expect("if tx_pending is false, transaction_receipt must be set"); + + let block_hash = transaction_receipt + .block_hash + .expect("if tx_pending is false, block_hash must be set"); trace!("Transaction receipt: {:#?}", transaction_receipt); // TODO: if the transaction doesn't have enough confirmations yet, add it to a queue to try again later + // 1 confirmation should be fine though + + let txn = db_conn.begin().await?; + + // if the transaction is already saved, return early + if increase_on_chain_balance_receipt::Entity::find() + .filter(increase_on_chain_balance_receipt::Column::TxHash.eq(tx_hash.encode_hex())) + .filter(increase_on_chain_balance_receipt::Column::ChainId.eq(app.config.chain_id)) + .filter(increase_on_chain_balance_receipt::Column::BlockHash.eq(block_hash.encode_hex())) + .one(&txn) + .await? + .is_some() + { + return Ok(Json(json!({ + "result": "tx_hash already saved", + })) + .into_response()); + }; let payment_factory_address = app .config @@ -168,47 +220,17 @@ pub async fn user_balance_post( let payment_factory_contract = PaymentFactory::new(payment_factory_address, app.internal_provider().clone()); - debug!( - "Payment Factory Filter: {:?}", - payment_factory_contract.payment_received_filter() - ); - - // check bloom filter to be sure this transaction contains any relevant logs - // TODO: This does not work properly right now, get back this eventually - // TODO: compare to code in llamanodes/web3-this-then-that - // if let Some(ValueOrArray::Value(Some(x))) = payment_factory_contract - // .payment_received_filter() - // .filter - // .topics[0] - // { - // debug!("Bloom input bytes is: {:?}", x); - // debug!("Bloom input bytes is: {:?}", x.as_fixed_bytes()); - // debug!("Bloom input as hex is: {:?}", hex!(x)); - // let bloom_input = BloomInput::Raw(hex!(x)); - // debug!( - // "Transaction receipt logs_bloom: {:?}", - // transaction_receipt.logs_bloom - // ); - // - // // do a quick check that this transaction contains the required log - // if !transaction_receipt.logs_bloom.contains_input(x) { - // return Err(Web3ProxyError::BadRequest("no matching logs found".into())); - // } - // } + // TODO: check bloom filters // the transaction might contain multiple relevant logs. collect them all let mut response_data = vec![]; - - // all or nothing - let txn = db_conn.begin().await?; - - // parse the logs from the transaction receipt for log in transaction_receipt.logs { if let Some(true) = log.removed { - todo!("delete this transaction from the database"); + // TODO: do we need to make sure this row is deleted? it should be handled by `handle_uncle_block` + continue; } - // Create a new transaction that will be used for joint transaction + // Parse the log into an event if let Ok(event) = payment_factory_contract .decode_event::( "PaymentReceived", @@ -228,7 +250,8 @@ pub async fn user_balance_post( let log_index = log .log_index - .context("no log_index. transaction must not be confirmed")?; + .context("no log_index. transaction must not be confirmed")? + .as_u64(); // the internal provider will handle caching of requests let payment_token = IERC20::new(payment_token_address, app.internal_provider().clone()); @@ -241,31 +264,33 @@ pub async fn user_balance_post( // Setting the scale already does the decimal shift, no need to divide a second time payment_token_amount.set_scale(payment_token_decimals)?; - info!( - "Found deposit transaction for: {:?} {:?} {:?}", + debug!( + "Found deposit event for: {:?} {:?} {:?}", recipient_account, payment_token_address, payment_token_amount ); let recipient = match user::Entity::find() .filter(user::Column::Address.eq(recipient_account.to_fixed_bytes().as_slice())) - .one(&db_conn) + .one(&txn) .await? { Some(x) => x, None => { - let (user, _, _) = register_new_user(&db_conn, recipient_account).await?; + let (user, _, _) = register_new_user(&txn, recipient_account).await?; user } }; - // For now we only accept stablecoins - // And we hardcode the peg (later we would have to depeg this, for example + // For now we only accept stablecoins. This will need conversions if we accept other tokens. // 1$ = Decimal(1) for any stablecoin // TODO: Let's assume that people don't buy too much at _once_, we do support >$1M which should be fine for now - debug!( + // TODO: double check. why >$1M? Decimal type in the database? + trace!( "Arithmetic is: {:?} / 10 ^ {:?} = {:?}", - payment_token_wei, payment_token_decimals, payment_token_amount + payment_token_wei, + payment_token_decimals, + payment_token_amount ); // create or update the balance @@ -275,7 +300,7 @@ pub async fn user_balance_post( user_id: sea_orm::Set(recipient.id), ..Default::default() }; - info!("Trying to insert into balance entry: {:?}", balance_entry); + trace!("Trying to insert into balance entry: {:?}", balance_entry); balance::Entity::insert(balance_entry) .on_conflict( OnConflict::new() @@ -288,17 +313,18 @@ pub async fn user_balance_post( .exec(&txn) .await?; - debug!("Saving tx_hash: {:?}", tx_hash); + debug!("Saving log {} of txid {:?}", log_index, tx_hash); let receipt = increase_on_chain_balance_receipt::ActiveModel { - tx_hash: sea_orm::ActiveValue::Set(tx_hash.encode_hex()), - chain_id: sea_orm::ActiveValue::Set(app.config.chain_id), - // TODO: need a migration that adds log_index - // TODO: need a migration that adds payment_token_address. will be useful for stats + id: sea_orm::ActiveValue::NotSet, amount: sea_orm::ActiveValue::Set(payment_token_amount), + block_hash: sea_orm::ActiveValue::Set(block_hash.encode_hex()), + chain_id: sea_orm::ActiveValue::Set(app.config.chain_id), deposit_to_user_id: sea_orm::ActiveValue::Set(recipient.id), - ..Default::default() + log_index: sea_orm::ActiveValue::Set(log_index), + token_address: sea_orm::ActiveValue::Set(payment_token_address.encode_hex()), + tx_hash: sea_orm::ActiveValue::Set(tx_hash.encode_hex()), }; - info!("Trying to insert receipt {:?}", receipt); + trace!("Trying to insert receipt {:?}", receipt); receipt.save(&txn).await?; @@ -323,19 +349,118 @@ pub async fn user_balance_post( let x = json!({ "tx_hash": tx_hash, + "block_hash": block_hash, "log_index": log_index, "token": payment_token_address, "amount": payment_token_amount, }); + debug!("deposit data: {:#?}", x); + response_data.push(x); } } txn.commit().await?; - debug!("Saved to db"); let response = (StatusCode::CREATED, Json(json!(response_data))).into_response(); Ok(response) } + +/// `POST /user/balance_uncle/:uncle_hash` -- Process an uncle block to potentially update a user's balance. +#[debug_handler] +pub async fn user_balance_uncle_post( + Extension(app): Extension>, + InsecureClientIp(ip): InsecureClientIp, + Path(mut params): Path>, +) -> Web3ProxyResponse { + let authorization = login_is_authorized(&app, ip).await?; + + // Get the transaction hash, and the amount that the user wants to top up by. + // Let's say that for now, 1 credit is equivalent to 1 dollar (assuming any stablecoin has a 1:1 peg) + let uncle_hash: H256 = params + .remove("uncle_hash") + .ok_or(Web3ProxyError::BadRequest( + "You have not provided a uncle_hash".into(), + ))? + .parse() + .map_err(|err| { + Web3ProxyError::BadRequest(format!("unable to parse uncle_hash: {}", err).into()) + })?; + + let authorization = Arc::new(authorization); + + if let Some(x) = handle_uncle_block(&app, &authorization, uncle_hash).await? { + Ok(Json(x).into_response()) + } else { + // TODO: is BadRequest the right error to use? + Err(Web3ProxyError::BadRequest("block is not an uncle".into())) + } +} + +pub async fn handle_uncle_block( + app: &Arc, + authorization: &Arc, + uncle_hash: H256, +) -> Web3ProxyResult>> { + info!("handling uncle: {:?}", uncle_hash); + + // cancel if uncle_hash is actually a confirmed block + if app + .authorized_request::<_, Option>>( + "eth_getBlockByHash", + (uncle_hash, false), + authorization.clone(), + ) + .await + .context("eth_getBlockByHash failed")? + .is_some() + { + return Ok(None); + } + + // user_id -> balance that we need to subtract + let mut reversed_balances: HashMap = HashMap::new(); + + let txn = app.db_transaction().await?; + + // delete any deposit txids with uncle_hash + for reversed_deposit in increase_on_chain_balance_receipt::Entity::find() + .lock_exclusive() + .filter(increase_on_chain_balance_receipt::Column::BlockHash.eq(uncle_hash.encode_hex())) + .all(&txn) + .await? + { + let reversed_balance = reversed_balances + .entry(reversed_deposit.deposit_to_user_id) + .or_default(); + + *reversed_balance += reversed_deposit.amount; + + // TODO: instead of delete, mark as uncled? seems like it would bloat the db unnecessarily. a stat should be enough + reversed_deposit.delete(&txn).await?; + } + + debug!("removing balances: {:#?}", reversed_balances); + + for (user_id, reversed_balance) in reversed_balances.iter() { + if let Some(user_balance) = balance::Entity::find() + .lock_exclusive() + .filter(balance::Column::Id.eq(*user_id)) + .one(&txn) + .await? + { + let mut user_balance = user_balance.into_active_model(); + + user_balance.total_deposits = + ActiveValue::Set(user_balance.total_deposits.as_ref() - reversed_balance); + + user_balance.update(&txn).await?; + } + } + + txn.commit().await?; + + Ok(Some(reversed_balances)) +} diff --git a/web3_proxy/src/frontend/users/rpc_keys.rs b/web3_proxy/src/frontend/users/rpc_keys.rs index b22b97db..c07f9bc5 100644 --- a/web3_proxy/src/frontend/users/rpc_keys.rs +++ b/web3_proxy/src/frontend/users/rpc_keys.rs @@ -151,7 +151,7 @@ pub async fn rpc_keys_management( Err(Web3ProxyError::AccessDenied) } } - Some((x, None)) => Err(Web3ProxyError::BadResponse( + Some((_, None)) => Err(Web3ProxyError::BadResponse( "a subuser record was found, but no corresponding RPC key".into(), )), // Match statement here, check in the user's RPC keys directly if it's not part of the secondary user diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index d8fd4255..e8fb77b8 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -128,6 +128,10 @@ impl Web3ProxyBlock { .as_ref() .expect("saved blocks must have a number") } + + pub fn uncles(&self) -> &[H256] { + &self.block.uncles + } } impl TryFrom for Web3ProxyBlock { @@ -164,28 +168,87 @@ impl Web3Rpcs { pub async fn try_cache_block( &self, block: Web3ProxyBlock, - heaviest_chain: bool, + consensus_head: bool, ) -> Web3ProxyResult { + let block_hash = *block.hash(); + // TODO: i think we can rearrange this function to make it faster on the hot path - if block.hash().is_zero() { + if block_hash.is_zero() { debug!("Skipping block without hash!"); return Ok(block); } // this block is very likely already in block_hashes - // TODO: use their get_with - let block_hash = *block.hash(); - // TODO: think more about heaviest_chain. would be better to do the check inside this function - if heaviest_chain { - // this is the only place that writes to block_numbers - // multiple inserts should be okay though - // TODO: info if there was a fork? + if consensus_head { let block_num = block.number(); - self.blocks_by_number - .get_with_by_ref(block_num, async move { block_hash }) - .await; + // TODO: if there is an existing entry with a different block_hash, + // TODO: use entry api to handle changing existing entries + self.blocks_by_number.insert(*block_num, block_hash).await; + + for uncle in block.uncles() { + self.blocks_by_hash.invalidate(uncle).await; + // TODO: save uncles somewhere? + } + + // loop to make sure parent hashes match our caches + // set the first ancestor to the blocks' parent hash. but keep going up the chain + if let Some(parent_num) = block.number().checked_sub(1.into()) { + struct Ancestor { + num: U64, + hash: H256, + } + let mut ancestor = Ancestor { + num: parent_num, + hash: *block.parent_hash(), + }; + loop { + let ancestor_number_to_hash_entry = self + .blocks_by_number + .entry_by_ref(&ancestor.num) + .or_insert(ancestor.hash) + .await; + + if *ancestor_number_to_hash_entry.value() == ancestor.hash { + // the existing number entry matches. all good + break; + } + + // oh no! ancestor_number_to_hash_entry is different + + // remove the uncled entry in blocks_by_hash + // we will look it up later if necessary + self.blocks_by_hash + .invalidate(ancestor_number_to_hash_entry.value()) + .await; + + // TODO: delete any cached entries for eth_getBlockByHash or eth_getBlockByNumber + + // TODO: race on this drop and insert? + drop(ancestor_number_to_hash_entry); + + // update the entry in blocks_by_number + self.blocks_by_number + .insert(ancestor.num, ancestor.hash) + .await; + + // try to check the parent of this ancestor + if let Some(ancestor_block) = self.blocks_by_hash.get(&ancestor.hash) { + match ancestor_block.number().checked_sub(1.into()) { + None => break, + Some(ancestor_parent_num) => { + ancestor = Ancestor { + num: ancestor_parent_num, + hash: *ancestor_block.parent_hash(), + } + } + } + } else { + break; + } + } + } } let block = self @@ -208,14 +271,26 @@ impl Web3Rpcs { // the cache is set last, so if its here, its everywhere // TODO: use try_get_with if let Some(block) = self.blocks_by_hash.get(hash) { - return Ok(block); + // double check that it matches the blocks_by_number cache + let cached_hash = self + .blocks_by_number + .get_with_by_ref(block.number(), async { *hash }) + .await; + + if cached_hash == *hash { + return Ok(block); + } + + // hashes don't match! this block must be in the middle of being uncled + // TODO: check known uncles } // block not in cache. we need to ask an rpc for it let get_block_params = (*hash, false); let block: Option = if let Some(rpc) = rpc { - // TODO: request_with_metadata would probably be better + // ask a specific rpc + // TODO: request_with_metadata would probably be better than authorized_request rpc.authorized_request::<_, Option>( "eth_getBlockByHash", &get_block_params, @@ -224,7 +299,8 @@ impl Web3Rpcs { ) .await? } else { - // TODO: request_with_metadata would probably be better + // ask any rpc + // TODO: request_with_metadata instead of internal_request self.internal_request::<_, Option>("eth_getBlockByHash", &get_block_params) .await? }; @@ -254,7 +330,6 @@ impl Web3Rpcs { /// Get the heaviest chain's block from cache or backend rpc /// Caution! If a future block is requested, this might wait forever. Be sure to have a timeout outside of this! - /// TODO: take a RequestMetadata pub async fn cannonical_block( &self, authorization: &Arc, diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 0e0b13fd..ecc62412 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -217,26 +217,36 @@ impl Web3Rpcs { // web3 connection worked let mut new_by_name = (*self.by_name.load_full()).clone(); + // make sure that any new requests use the new connection let old_rpc = new_by_name.insert(rpc.name.clone(), rpc.clone()); + // update the arc swap self.by_name.store(Arc::new(new_by_name)); + // clean up the old rpc if let Some(old_rpc) = old_rpc { + trace!("old_rpc: {}", old_rpc); + + // if the old rpc was synced, wait for the new one to sync if old_rpc.head_block.as_ref().unwrap().borrow().is_some() { let mut new_head_receiver = rpc.head_block.as_ref().unwrap().subscribe(); - debug!("waiting for new {} to sync", rpc); + trace!("waiting for new {} connection to sync", rpc); - // TODO: maximum wait time or this could block things for too long + // TODO: maximum wait time while new_head_receiver.borrow_and_update().is_none() { - new_head_receiver.changed().await?; + if new_head_receiver.changed().await.is_err() { + break; + }; } + } - // TODO: tell ethers to disconnect? is there a function for that? + // tell the old rpc to disconnect + if let Some(ref disconnect_sender) = old_rpc.disconnect_watch { + trace!("telling {} to disconnect", old_rpc); + disconnect_sender.send_replace(true); } } - - // TODO: what should we do with the new handle? make sure error logs aren't dropped } Ok(Err(err)) => { // if we got an error here, the app can continue on @@ -923,11 +933,11 @@ impl Web3Rpcs { let rate_limit_substrings = ["limit", "exceeded", "quota usage"]; for rate_limit_substr in rate_limit_substrings { if error_msg.contains(rate_limit_substr) { - if rate_limit_substr.contains("result on length") { + if error_msg.contains("result on length") { // this error contains "limit" but is not a rate limit error // TODO: make the expected limit configurable // TODO: parse the rate_limit_substr and only continue if it is < expected limit - if rate_limit_substr.contains("exceeding limit 2000000") { + if error_msg.contains("exceeding limit 2000000") { // they hit our expected limit. return the error now return Err(error.into()); } else { diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index f33710ee..208e08da 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -29,7 +29,6 @@ use std::fmt; use std::hash::{Hash, Hasher}; use std::sync::atomic::{self, AtomicU32, AtomicU64, AtomicUsize}; use std::{cmp::Ordering, sync::Arc}; -use tokio::select; use tokio::sync::watch; use tokio::time::{interval, sleep, sleep_until, timeout, Duration, Instant, MissedTickBehavior}; use url::Url; @@ -96,7 +95,7 @@ impl Web3Rpc { redis_pool: Option, block_interval: Duration, block_map: BlocksByHashCache, - block_sender: Option>, + block_and_rpc_sender: Option>, tx_id_sender: Option)>>, ) -> anyhow::Result<(Arc, Web3ProxyJoinHandle<()>)> { let created_at = Instant::now(); @@ -132,8 +131,8 @@ impl Web3Rpc { let backup = config.backup; let block_data_limit: AtomicU64 = config.block_data_limit.unwrap_or_default().into(); - let automatic_block_limit = - (block_data_limit.load(atomic::Ordering::Acquire) == 0) && block_sender.is_some(); + let automatic_block_limit = (block_data_limit.load(atomic::Ordering::Acquire) == 0) + && block_and_rpc_sender.is_some(); // have a sender for tracking hard limit anywhere. we use this in case we // and track on servers that have a configured hard limit @@ -219,7 +218,12 @@ impl Web3Rpc { tokio::spawn(async move { // TODO: this needs to be a subscribe_with_reconnect that does a retry with jitter and exponential backoff new_connection - .subscribe_with_reconnect(block_map, block_sender, chain_id, tx_id_sender) + .subscribe_with_reconnect( + block_map, + block_and_rpc_sender, + chain_id, + tx_id_sender, + ) .await }) }; @@ -580,7 +584,7 @@ impl Web3Rpc { async fn subscribe_with_reconnect( self: Arc, block_map: BlocksByHashCache, - block_sender: Option>, + block_and_rpc_sender: Option>, chain_id: u64, tx_id_sender: Option)>>, ) -> Web3ProxyResult<()> { @@ -589,7 +593,7 @@ impl Web3Rpc { .clone() .subscribe( block_map.clone(), - block_sender.clone(), + block_and_rpc_sender.clone(), chain_id, tx_id_sender.clone(), ) @@ -623,7 +627,7 @@ impl Web3Rpc { async fn subscribe( self: Arc, block_map: BlocksByHashCache, - block_sender: Option>, + block_and_rpc_sender: Option>, chain_id: u64, tx_id_sender: Option)>>, ) -> Web3ProxyResult<()> { @@ -633,6 +637,10 @@ impl Web3Rpc { Some(RequestErrorHandler::ErrorLevel) }; + if self.should_disconnect() { + return Ok(()); + } + if let Some(url) = self.ws_url.clone() { debug!("starting websocket provider on {}", self); @@ -643,6 +651,10 @@ impl Web3Rpc { self.ws_provider.store(Some(x)); } + if self.should_disconnect() { + return Ok(()); + } + debug!("starting subscriptions on {}", self); self.check_provider(chain_id).await?; @@ -650,21 +662,21 @@ impl Web3Rpc { let mut futures = vec![]; // TODO: use this channel instead of self.disconnect_watch - let (subscribe_stop_tx, mut subscribe_stop_rx) = watch::channel(false); + let (subscribe_stop_tx, subscribe_stop_rx) = watch::channel(false); - // subscribe to the disconnect watch. the app uses this when shutting down + // subscribe to the disconnect watch. the app uses this when shutting down or when configs change if let Some(disconnect_watch_tx) = self.disconnect_watch.as_ref() { + let clone = self.clone(); let mut disconnect_watch_rx = disconnect_watch_tx.subscribe(); let f = async move { - // TODO: make sure it changed to "true" - select! { - x = disconnect_watch_rx.changed() => { - x?; - }, - x = subscribe_stop_rx.changed() => { - x?; - }, + loop { + if *disconnect_watch_rx.borrow_and_update() { + info!("disconnect triggered on {}", clone); + break; + } + + disconnect_watch_rx.changed().await?; } Ok(()) }; @@ -681,8 +693,6 @@ impl Web3Rpc { // TODO: reset this timeout when a new block is seen? we need to keep request_latency updated though let health_sleep_seconds = 5; - let subscribe_stop_rx = subscribe_stop_tx.subscribe(); - // health check loop let f = async move { // TODO: benchmark this and lock contention @@ -718,18 +728,22 @@ impl Web3Rpc { } // subscribe to new heads - if let Some(block_sender) = block_sender.clone() { + if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() { let clone = self.clone(); let subscribe_stop_rx = subscribe_stop_tx.subscribe(); let f = async move { let x = clone - .subscribe_new_heads(block_sender.clone(), block_map.clone(), subscribe_stop_rx) + .subscribe_new_heads( + block_and_rpc_sender.clone(), + block_map.clone(), + subscribe_stop_rx, + ) .await; // error or success, we clear the block when subscribe_new_heads exits clone - .send_head_block_result(Ok(None), &block_sender, &block_map) + .send_head_block_result(Ok(None), &block_and_rpc_sender, &block_map) .await?; x @@ -763,6 +777,9 @@ impl Web3Rpc { // TODO: wait for all of the futures to exit? + // TODO: tell ethers to disconnect? + self.ws_provider.store(None); + Ok(()) } @@ -806,6 +823,7 @@ impl Web3Rpc { while let Some(block) = blocks.next().await { if *subscribe_stop_rx.borrow() { + trace!("stopping ws block subscription on {}", self); break; } @@ -822,6 +840,7 @@ impl Web3Rpc { loop { if *subscribe_stop_rx.borrow() { + trace!("stopping http block subscription on {}", self); break; } @@ -848,6 +867,7 @@ impl Web3Rpc { .await?; if *subscribe_stop_rx.borrow() { + debug!("new heads subscription exited"); Ok(()) } else { Err(anyhow!("new_heads subscription exited. reconnect needed").into()) @@ -860,7 +880,14 @@ impl Web3Rpc { tx_id_sender: flume::Sender<(TxHash, Arc)>, mut subscribe_stop_rx: watch::Receiver, ) -> Web3ProxyResult<()> { - subscribe_stop_rx.changed().await?; + // TODO: check that it actually changed to true + loop { + if *subscribe_stop_rx.borrow_and_update() { + break; + } + + subscribe_stop_rx.changed().await?; + } /* trace!("watching pending transactions on {}", self); @@ -1174,7 +1201,6 @@ impl fmt::Debug for Web3Rpc { impl fmt::Display for Web3Rpc { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // TODO: filter basic auth and api keys write!(f, "{}", &self.name) } }