Handle uncle transactions (#129)

* add more columns to handle uncled transactions

* handle payment uncles

* put relations back

* include all the new columns

* lower log levels

* improve block caching

if we have a block with a number, its canonical. uncles don't get returned

* improve disconnect logic

* lint

* clear first changed for new_top_config_receiver

* better logs around config changing

* i guess we do want one apply top_config at the start

* check correct variable for data limits
This commit is contained in:
Bryan Stitt 2023-06-16 00:46:27 -07:00 committed by GitHub
parent 6040ca297f
commit a083bc652d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 596 additions and 229 deletions

6
Cargo.lock generated

@ -1701,7 +1701,7 @@ dependencies = [
[[package]]
name = "entities"
version = "0.28.0"
version = "0.30.0"
dependencies = [
"ethers",
"sea-orm",
@ -3368,7 +3368,7 @@ dependencies = [
[[package]]
name = "migration"
version = "0.28.0"
version = "0.30.0"
dependencies = [
"sea-orm-migration",
"tokio",
@ -7002,7 +7002,7 @@ dependencies = [
[[package]]
name = "web3_proxy"
version = "0.28.0"
version = "0.30.0"
dependencies = [
"anyhow",
"arc-swap",

@ -122,7 +122,11 @@ cargo install sea-orm-cli
1. (optional) drop the current dev db
2. `sea-orm-cli migrate`
3. `sea-orm-cli generate entity -u mysql://root:dev_web3_proxy@127.0.0.1:13306/dev_web3_proxy -o entities/src --with-serde both`
4. After running the above, you will need to manually fix some columns: `Vec<u8>` -> `sea_orm::prelude::Uuid` and `i8` -> `bool`. Related: <https://github.com/SeaQL/sea-query/issues/375> <https://github.com/SeaQL/sea-orm/issues/924>
- Be careful when adding the `--tables THE,MODIFIED,TABLES` flag. It will delete relationships if they aren't listed
4. After running the above, you will need to manually fix some things
- `Vec<u8>` -> `sea_orm::prelude::Uuid` (Related: <https://github.com/SeaQL/sea-query/issues/375>)
- `i8` -> `bool` (Related: <https://github.com/SeaQL/sea-orm/issues/924>)
- add all the tables back into `mod.rs`
## Flame Graphs

@ -1,6 +1,6 @@
[package]
name = "entities"
version = "0.28.0"
version = "0.30.0"
edition = "2021"
[lib]

@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.6
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
@ -14,6 +14,8 @@ pub struct Model {
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(has_many = "super::admin_increase_balance_receipt::Entity")]
AdminIncreaseBalanceReceipt,
#[sea_orm(
belongs_to = "super::user::Entity",
from = "Column::UserId",
@ -24,6 +26,12 @@ pub enum Relation {
User,
}
impl Related<super::admin_increase_balance_receipt::Entity> for Entity {
fn to() -> RelationDef {
Relation::AdminIncreaseBalanceReceipt.def()
}
}
impl Related<super::user::Entity> for Entity {
fn to() -> RelationDef {
Relation::User.def()

@ -1,8 +1,9 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.6
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "admin_increase_balance_receipt")]
pub struct Model {
#[sea_orm(primary_key)]

@ -1,8 +1,9 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.6
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "admin_trail")]
pub struct Model {
#[sea_orm(primary_key)]

@ -1,16 +1,17 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.6
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "balance")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
#[sea_orm(unique)]
pub user_id: u64,
#[sea_orm(column_type = "Decimal(Some((20, 10)))")]
pub total_spent_including_free_tier: Decimal,
#[sea_orm(unique)]
pub user_id: u64,
#[sea_orm(column_type = "Decimal(Some((20, 10)))")]
pub total_spent_outside_free_tier: Decimal,
#[sea_orm(column_type = "Decimal(Some((20, 10)))")]

@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.6
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
@ -13,6 +13,9 @@ pub struct Model {
#[sea_orm(column_type = "Decimal(Some((20, 10)))")]
pub amount: Decimal,
pub deposit_to_user_id: u64,
pub block_hash: String,
pub log_index: u64,
pub token_address: String,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

@ -1,8 +1,9 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.6
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "referee")]
pub struct Model {
#[sea_orm(primary_key)]

@ -1,8 +1,9 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.6
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.10.7
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "referrer")]
pub struct Model {
#[sea_orm(primary_key)]

@ -40,7 +40,7 @@ pub struct Model {
pub max_response_bytes: u64,
pub archive_request: bool,
pub origin: Option<String>,
pub migrated: Option<DateTime>,
pub migrated: Option<DateTimeUtc>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

@ -3,10 +3,11 @@
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};
// TODO: rename to StatLevel? AccountingLevel? What?
// TODO: rename TrackingLevel to StatLevel? AccountingLevel? What?
#[derive(Debug, Clone, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "log_level")]
pub enum TrackingLevel {
/// TODO: rename to minimal
#[sea_orm(string_value = "none")]
None,
#[sea_orm(string_value = "aggregated")]

@ -11,20 +11,12 @@ pub struct Model {
pub id: u64,
pub user_id: u64,
pub description: Option<String>,
pub rpc_secret_key_id: u64,
pub role: Role,
pub rpc_secret_key_id: u64,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::user::Entity",
from = "Column::UserId",
to = "super::user::Column::Id",
on_update = "NoAction",
on_delete = "NoAction"
)]
User,
#[sea_orm(
belongs_to = "super::rpc_key::Entity",
from = "Column::RpcSecretKeyId",
@ -33,12 +25,14 @@ pub enum Relation {
on_delete = "NoAction"
)]
RpcKey,
}
impl Related<super::user::Entity> for Entity {
fn to() -> RelationDef {
Relation::User.def()
}
#[sea_orm(
belongs_to = "super::user::Entity",
from = "Column::UserId",
to = "super::user::Column::Id",
on_update = "NoAction",
on_delete = "NoAction"
)]
User,
}
impl Related<super::rpc_key::Entity> for Entity {
@ -47,4 +41,10 @@ impl Related<super::rpc_key::Entity> for Entity {
}
}
impl Related<super::user::Entity> for Entity {
fn to() -> RelationDef {
Relation::User.def()
}
}
impl ActiveModelBehavior for ActiveModel {}

@ -23,6 +23,8 @@ pub enum Relation {
Login,
#[sea_orm(has_many = "super::rpc_key::Entity")]
RpcKey,
#[sea_orm(has_many = "super::increase_on_chain_balance_receipt::Entity")]
IncreaseOnChainBalanceReceipt,
#[sea_orm(has_many = "super::secondary_user::Entity")]
SecondaryUser,
#[sea_orm(
@ -47,6 +49,12 @@ impl Related<super::rpc_key::Entity> for Entity {
}
}
impl Related<super::increase_on_chain_balance_receipt::Entity> for Entity {
fn to() -> RelationDef {
Relation::IncreaseOnChainBalanceReceipt.def()
}
}
impl Related<super::secondary_user::Entity> for Entity {
fn to() -> RelationDef {
Relation::SecondaryUser.def()

@ -1,6 +1,6 @@
[package]
name = "migration"
version = "0.28.0"
version = "0.30.0"
edition = "2021"
publish = false

@ -29,6 +29,7 @@ mod m20230511_161214_remove_columns_statsv2_origin_and_method;
mod m20230512_220213_allow_null_rpc_key_id_in_stats_v2;
mod m20230514_114803_admin_add_credits;
mod m20230607_221917_total_deposits;
mod m20230615_221201_handle_payment_uncles;
pub struct Migrator;
@ -65,6 +66,7 @@ impl MigratorTrait for Migrator {
Box::new(m20230512_220213_allow_null_rpc_key_id_in_stats_v2::Migration),
Box::new(m20230514_114803_admin_add_credits::Migration),
Box::new(m20230607_221917_total_deposits::Migration),
Box::new(m20230615_221201_handle_payment_uncles::Migration),
]
}
}

@ -0,0 +1,71 @@
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// TODO: also alter the index to include the BlockHash? or
manager
.alter_table(
Table::alter()
.table(IncreaseOnChainBalanceReceipt::Table)
.add_column(
ColumnDef::new(IncreaseOnChainBalanceReceipt::BlockHash)
.string()
.not_null(),
)
.add_column(
ColumnDef::new(IncreaseOnChainBalanceReceipt::LogIndex)
.big_integer()
.unsigned()
.not_null(),
)
.add_column(
ColumnDef::new(IncreaseOnChainBalanceReceipt::TokenAddress)
.string()
.not_null(),
)
.drop_foreign_key(Alias::new("fk-deposit_to_user_id"))
.add_foreign_key(
TableForeignKey::new()
.name("fk-deposit_to_user_id-v2")
.from_col(IncreaseOnChainBalanceReceipt::DepositToUserId)
.to_tbl(User::Table)
.to_col(User::Id),
)
.to_owned(),
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(IncreaseOnChainBalanceReceipt::Table)
.drop_column(IncreaseOnChainBalanceReceipt::BlockHash)
.drop_column(IncreaseOnChainBalanceReceipt::LogIndex)
.drop_column(IncreaseOnChainBalanceReceipt::TokenAddress)
.to_owned(),
)
.await
}
}
/// Learn more at https://docs.rs/sea-query#iden
#[derive(Iden)]
enum IncreaseOnChainBalanceReceipt {
Table,
BlockHash,
LogIndex,
TokenAddress,
DepositToUserId,
}
#[derive(Iden)]
enum User {
Table,
Id,
}

@ -1,6 +1,6 @@
[package]
name = "web3_proxy"
version = "0.28.0"
version = "0.30.0"
edition = "2021"
default-run = "web3_proxy_cli"

@ -41,7 +41,7 @@ use hashbrown::{HashMap, HashSet};
use ipnet::IpNet;
use log::{error, info, trace, warn, Level};
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{EntityTrait, PaginatorTrait};
use migration::sea_orm::{DatabaseTransaction, EntityTrait, PaginatorTrait, TransactionTrait};
use moka::future::{Cache, CacheBuilder};
use parking_lot::Mutex;
use redis_rate_limiter::redis::AsyncCommands;
@ -679,8 +679,6 @@ impl Web3ProxyApp {
.changed()
.await
.context("failed awaiting top_config change")?;
info!("config changed");
}
});
@ -688,7 +686,7 @@ impl Web3ProxyApp {
}
if important_background_handles.is_empty() {
info!("no important background handles");
trace!("no important background handles");
let f = tokio::spawn(async move {
let _ = background_shutdown_receiver.recv().await;
@ -1063,10 +1061,22 @@ impl Web3ProxyApp {
}
/// TODO: i don't think we want or need this. just use app.db_conn, or maybe app.db_conn.clone() or app.db_conn.as_ref()
#[inline]
pub fn db_conn(&self) -> Option<DatabaseConnection> {
self.db_conn.clone()
}
#[inline]
pub async fn db_transaction(&self) -> Web3ProxyResult<DatabaseTransaction> {
if let Some(ref db_conn) = self.db_conn {
let x = db_conn.begin().await?;
Ok(x)
} else {
Err(Web3ProxyError::NoDatabase)
}
}
#[inline]
pub fn db_replica(&self) -> Option<DatabaseReplica> {
self.db_replica.clone()
}

@ -48,7 +48,7 @@ impl ProxydSubCommand {
}
async fn run(
mut top_config: TopConfig,
top_config: TopConfig,
top_config_path: Option<PathBuf>,
frontend_port: u16,
prometheus_port: u16,
@ -87,13 +87,18 @@ async fn run(
if let Some(top_config_path) = top_config_path {
let config_sender = spawned_app.new_top_config_sender;
{
let mut current_config = config_sender.borrow().clone();
thread::spawn(move || loop {
match fs::read_to_string(&top_config_path) {
Ok(new_top_config) => match toml::from_str(&new_top_config) {
Ok(new_top_config) => match toml::from_str::<TopConfig>(&new_top_config) {
Ok(new_top_config) => {
if new_top_config != top_config {
top_config = new_top_config;
config_sender.send(top_config.clone()).unwrap();
if new_top_config != current_config {
// TODO: print the differences
// TODO: first run seems to always see differences. why?
info!("config @ {:?} changed", top_config_path);
config_sender.send(new_top_config.clone()).unwrap();
current_config = new_top_config;
}
}
Err(err) => {
@ -358,12 +363,12 @@ mod tests {
let proxy_provider = Provider::<Http>::try_from(anvil.endpoint()).unwrap();
let anvil_result = anvil_provider
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", true))
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
.await
.unwrap()
.unwrap();
let proxy_result = proxy_provider
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", true))
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
.await
.unwrap()
.unwrap();
@ -378,12 +383,12 @@ mod tests {
.unwrap();
let anvil_result = anvil_provider
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", true))
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
.await
.unwrap()
.unwrap();
let proxy_result = proxy_provider
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", true))
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
.await
.unwrap()
.unwrap();

@ -15,7 +15,7 @@ use derive_more::{Display, Error, From};
use ethers::prelude::ContractError;
use http::header::InvalidHeaderValue;
use ipnet::AddrParseError;
use log::{debug, error, info, trace, warn};
use log::{debug, error, trace, warn};
use migration::sea_orm::DbErr;
use redis_rate_limiter::redis::RedisError;
use reqwest::header::ToStrError;
@ -97,6 +97,7 @@ pub enum Web3ProxyError {
NoBlockNumberOrHash,
NoBlocksKnown,
NoConsensusHeadBlock,
NoDatabase,
NoHandleReady,
NoServersSynced,
#[display(fmt = "{}/{}", num_known, min_head_rpcs)]
@ -201,7 +202,7 @@ impl Web3ProxyError {
return err.as_response_parts::<R>();
}
Self::BadRequest(err) => {
debug!("BAD_REQUEST: {}", err);
trace!("BAD_REQUEST: {}", err);
(
StatusCode::BAD_REQUEST,
JsonRpcErrorData {
@ -268,9 +269,10 @@ impl Web3ProxyError {
)
}
Self::EipVerificationFailed(err_1, err_191) => {
info!(
trace!(
"EipVerificationFailed err_1={:#?} err2={:#?}",
err_1, err_191
err_1,
err_191
);
(
StatusCode::UNAUTHORIZED,
@ -331,7 +333,7 @@ impl Web3ProxyError {
}
// Self::JsonRpcForwardedError(x) => (StatusCode::OK, x),
Self::GasEstimateNotU256 => {
warn!("GasEstimateNotU256");
trace!("GasEstimateNotU256");
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcErrorData {
@ -353,7 +355,7 @@ impl Web3ProxyError {
)
}
Self::Headers(err) => {
warn!("HeadersError {:?}", err);
trace!("HeadersError {:?}", err);
(
StatusCode::BAD_REQUEST,
JsonRpcErrorData {
@ -388,7 +390,7 @@ impl Web3ProxyError {
)
}
Self::InvalidBlockBounds { min, max } => {
debug!("InvalidBlockBounds min={} max={}", min, max);
trace!("InvalidBlockBounds min={} max={}", min, max);
(
StatusCode::BAD_REQUEST,
JsonRpcErrorData {
@ -414,7 +416,7 @@ impl Web3ProxyError {
)
}
Self::IpNotAllowed(ip) => {
debug!("IpNotAllowed ip={})", ip);
trace!("IpNotAllowed ip={})", ip);
(
StatusCode::FORBIDDEN,
JsonRpcErrorData {
@ -425,7 +427,7 @@ impl Web3ProxyError {
)
}
Self::InvalidHeaderValue(err) => {
debug!("InvalidHeaderValue err={:?}", err);
trace!("InvalidHeaderValue err={:?}", err);
(
StatusCode::BAD_REQUEST,
JsonRpcErrorData {
@ -436,7 +438,7 @@ impl Web3ProxyError {
)
}
Self::InvalidEip => {
debug!("InvalidEip");
trace!("InvalidEip");
(
StatusCode::BAD_REQUEST,
JsonRpcErrorData {
@ -447,7 +449,7 @@ impl Web3ProxyError {
)
}
Self::InvalidInviteCode => {
debug!("InvalidInviteCode");
trace!("InvalidInviteCode");
(
StatusCode::UNAUTHORIZED,
JsonRpcErrorData {
@ -470,7 +472,7 @@ impl Web3ProxyError {
)
}
Self::UnknownReferralCode => {
debug!("UnknownReferralCode");
trace!("UnknownReferralCode");
(
StatusCode::UNAUTHORIZED,
JsonRpcErrorData {
@ -481,7 +483,7 @@ impl Web3ProxyError {
)
}
Self::InvalidReferer => {
debug!("InvalidReferer");
trace!("InvalidReferer");
(
StatusCode::BAD_REQUEST,
JsonRpcErrorData {
@ -492,7 +494,7 @@ impl Web3ProxyError {
)
}
Self::InvalidSignatureLength => {
debug!("InvalidSignatureLength");
trace!("InvalidSignatureLength");
(
StatusCode::BAD_REQUEST,
JsonRpcErrorData {
@ -503,7 +505,7 @@ impl Web3ProxyError {
)
}
Self::InvalidUserAgent => {
debug!("InvalidUserAgent");
trace!("InvalidUserAgent");
(
StatusCode::FORBIDDEN,
JsonRpcErrorData {
@ -514,7 +516,7 @@ impl Web3ProxyError {
)
}
Self::InvalidUserKey => {
warn!("InvalidUserKey");
trace!("InvalidUserKey");
(
StatusCode::BAD_REQUEST,
JsonRpcErrorData {
@ -602,6 +604,17 @@ impl Web3ProxyError {
},
)
}
Self::NoDatabase => {
error!("no database configured");
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcErrorData {
message: "no database configured!".into(),
code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(),
data: None,
},
)
}
Self::NoHandleReady => {
error!("NoHandleReady");
(
@ -790,7 +803,7 @@ impl Web3ProxyError {
)
}
Self::RefererRequired => {
debug!("referer required");
trace!("referer required");
(
StatusCode::BAD_REQUEST,
JsonRpcErrorData {
@ -801,7 +814,7 @@ impl Web3ProxyError {
)
}
Self::RefererNotAllowed(referer) => {
debug!("referer not allowed referer={:?}", referer);
trace!("referer not allowed referer={:?}", referer);
(
StatusCode::FORBIDDEN,
JsonRpcErrorData {
@ -915,7 +928,7 @@ impl Web3ProxyError {
},
),
Self::UserAgentRequired => {
debug!("UserAgentRequired");
trace!("UserAgentRequired");
(
StatusCode::BAD_REQUEST,
JsonRpcErrorData {
@ -926,7 +939,7 @@ impl Web3ProxyError {
)
}
Self::UserAgentNotAllowed(ua) => {
debug!("UserAgentNotAllowed ua={}", ua);
trace!("UserAgentNotAllowed ua={}", ua);
(
StatusCode::FORBIDDEN,
JsonRpcErrorData {

@ -1022,9 +1022,11 @@ impl Web3ProxyApp {
)?;
// no semaphore is needed here because login rate limits are low
// TODO: are we sure do we want a semaphore here?
// TODO: are we sure do not we want a semaphore here?
let semaphore = None;
// TODO: if ip is on the local network, always allow?
if let Some(rate_limiter) = &self.login_rate_limiter {
match rate_limiter.throttle_label(&ip.to_string(), None, 1).await {
Ok(RedisRateLimitResult::Allowed(_)) => {

@ -174,6 +174,10 @@ pub async fn serve(
"/user/balance/:tx_hash",
post(users::payment::user_balance_post),
)
.route(
"/user/balance_uncle/:uncle_hash",
post(users::payment::user_balance_uncle_post),
)
.route("/user/keys", get(users::rpc_keys::rpc_keys_get))
.route("/user/keys", post(users::rpc_keys::rpc_keys_management))
.route("/user/keys", put(users::rpc_keys::rpc_keys_management))

@ -339,8 +339,6 @@ async fn handle_socket_payload(
let key: U64 = serde_json::from_str(subscription_id.get()).unwrap();
info!("key: {}", key);
x.insert(key, handle);
}
@ -373,8 +371,6 @@ async fn handle_socket_payload(
}
};
info!("key: {}", subscription_id);
// TODO: is this the right response?
let partial_response = {
let mut x = subscriptions.write().await;

@ -21,7 +21,7 @@ use http::StatusCode;
use log::{debug, trace, warn};
use migration::sea_orm::prelude::{Decimal, Uuid};
use migration::sea_orm::{
self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel,
self, ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel,
QueryFilter, TransactionTrait,
};
use serde_json::json;
@ -143,13 +143,11 @@ pub async fn user_login_get(
Ok(message.into_response())
}
/// you MUST commit the `txn` after calling this function!
pub async fn register_new_user(
db_conn: &DatabaseConnection,
txn: &DatabaseTransaction,
address: Address,
) -> anyhow::Result<(user::Model, rpc_key::Model, balance::Model)> {
// all or nothing
let txn = db_conn.begin().await?;
// the only thing we need from them is an address
// everything else is optional
// TODO: different invite codes should allow different levels
@ -160,7 +158,7 @@ pub async fn register_new_user(
..Default::default()
};
let new_user = new_user.insert(&txn).await?;
let new_user = new_user.insert(txn).await?;
// create the user's first api key
let rpc_secret_key = RpcSecretKey::new();
@ -173,7 +171,7 @@ pub async fn register_new_user(
};
let user_rpc_key = user_rpc_key
.insert(&txn)
.insert(txn)
.await
.web3_context("Failed saving new user key")?;
@ -183,10 +181,7 @@ pub async fn register_new_user(
..Default::default()
};
let user_balance = user_balance.insert(&txn).await?;
// save the user and key and balance to the database
txn.commit().await?;
let user_balance = user_balance.insert(txn).await?;
Ok((new_user, user_rpc_key, user_balance))
}
@ -312,10 +307,14 @@ pub async fn user_login_post(
}
}
let (caller, caller_key, _) =
register_new_user(&db_conn, our_msg.address.into()).await?;
let txn = db_conn.begin().await?;
let (caller, caller_key, _) = register_new_user(&txn, our_msg.address.into()).await?;
txn.commit().await?;
let txn = db_conn.begin().await?;
// First, optionally catch a referral code from the parameters if there is any
debug!("Refferal code is: {:?}", payload.referral_code);
if let Some(referral_code) = payload.referral_code.as_ref() {
@ -323,7 +322,7 @@ pub async fn user_login_post(
trace!("Using register referral code: {:?}", referral_code);
let user_referrer = referrer::Entity::find()
.filter(referrer::Column::ReferralCode.eq(referral_code))
.one(db_replica.as_ref())
.one(&txn)
.await?
.ok_or(Web3ProxyError::UnknownReferralCode)?;
@ -354,7 +353,7 @@ pub async fn user_login_post(
trace!("Using referral code: {:?}", referral_code);
let user_referrer = referrer::Entity::find()
.filter(referrer::Column::ReferralCode.eq(referral_code))
.one(db_replica.as_ref())
.one(&txn)
.await?
.ok_or(Web3ProxyError::BadRequest(
format!(
@ -382,7 +381,7 @@ pub async fn user_login_post(
// the user is already registered
let user_rpc_keys = rpc_key::Entity::find()
.filter(rpc_key::Column::UserId.eq(caller.id))
.all(db_replica.as_ref())
.all(&db_conn)
.await
.web3_context("failed loading user's key")?;

@ -1,6 +1,8 @@
use crate::app::Web3ProxyApp;
use crate::errors::{Web3ProxyError, Web3ProxyResponse};
use crate::frontend::authorization::login_is_authorized;
use crate::errors::{Web3ProxyError, Web3ProxyResponse, Web3ProxyResult};
use crate::frontend::authorization::{
login_is_authorized, Authorization as Web3ProxyAuthorization,
};
use crate::frontend::users::authentication::register_new_user;
use anyhow::Context;
use axum::{
@ -13,13 +15,14 @@ use axum_client_ip::InsecureClientIp;
use axum_macros::debug_handler;
use entities::{balance, increase_on_chain_balance_receipt, rpc_key, user};
use ethers::abi::AbiEncode;
use ethers::types::{Address, TransactionReceipt, H256};
use hashbrown::HashMap;
use ethers::types::{Address, Block, TransactionReceipt, TxHash, H256};
use hashbrown::{HashMap, HashSet};
use http::StatusCode;
use log::{debug, info, trace};
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{
self, ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, TransactionTrait,
self, ActiveModelTrait, ActiveValue, ColumnTrait, EntityTrait, IntoActiveModel, ModelTrait,
QueryFilter, QuerySelect, TransactionTrait,
};
use migration::{Expr, OnConflict};
use payment_contracts::ierc20::IERC20;
@ -100,10 +103,7 @@ pub async fn user_deposits_get(
Ok(Json(response).into_response())
}
/// `POST /user/balance/:tx_hash` -- Manually process a confirmed txid to update a user's balance.
///
/// We will subscribe to events to watch for any user deposits, but sometimes events can be missed.
/// TODO: change this. just have a /tx/:txhash that is open to anyone. rate limit like we rate limit /login
/// `POST /user/balance/:tx_hash` -- Process a confirmed txid to update a user's balance.
#[debug_handler]
pub async fn user_balance_post(
Extension(app): Extension<Arc<Web3ProxyApp>>,
@ -113,10 +113,9 @@ pub async fn user_balance_post(
// I suppose this is ok / good, so people don't spam this endpoint as it is not "cheap"
// we rate limit by ip instead of bearer token so transactions are easy to submit from scripts
// TODO: if ip is a 10. or a 172., allow unlimited
login_is_authorized(&app, ip).await?;
let authorization = login_is_authorized(&app, ip).await?;
// Get the transaction hash, and the amount that the user wants to top up by.
// Let's say that for now, 1 credit is equivalent to 1 dollar (assuming any stablecoin has a 1:1 peg)
// Get the transaction hash
let tx_hash: H256 = params
.remove("tx_hash")
.ok_or(Web3ProxyError::BadRequest(
@ -129,36 +128,89 @@ pub async fn user_balance_post(
let db_conn = app.db_conn().context("query_user_stats needs a db")?;
// Return early if the tx was already added
if increase_on_chain_balance_receipt::Entity::find()
.filter(increase_on_chain_balance_receipt::Column::TxHash.eq(tx_hash.encode_hex()))
.one(&db_conn)
.await?
.is_some()
{
// TODO: double check that the transaction is still seen as "confirmed" if it is NOT, we need to remove credits!
// this will be status code 200, not 204
let response = Json(json!({
"result": "success",
"message": "this transaction was already in the database",
}))
.into_response();
return Ok(response);
};
let authorization = Arc::new(authorization);
// get the transaction receipt
let transaction_receipt = app
.internal_request::<_, Option<TransactionReceipt>>("eth_getTransactionReceipt", (tx_hash,))
.await?
.ok_or(Web3ProxyError::BadRequest(
format!("transaction receipt not found for {}", tx_hash,).into(),
))?;
.authorized_request::<_, Option<TransactionReceipt>>(
"eth_getTransactionReceipt",
(tx_hash,),
authorization.clone(),
)
.await?;
// check for uncles
let mut find_uncles = increase_on_chain_balance_receipt::Entity::find()
// .lock_exclusive()
.filter(increase_on_chain_balance_receipt::Column::TxHash.eq(tx_hash.encode_hex()))
.filter(increase_on_chain_balance_receipt::Column::ChainId.eq(app.config.chain_id));
let tx_pending =
if let Some(block_hash) = transaction_receipt.as_ref().and_then(|x| x.block_hash) {
// check for uncles
// this transaction is confirmed
// any rows in the db with a block hash that doesn't match the receipt should be deleted
find_uncles = find_uncles.filter(
increase_on_chain_balance_receipt::Column::BlockHash.ne(block_hash.encode_hex()),
);
false
} else {
// no block_hash to check
// this transaction is not confirmed
// any rows in the db should be deleted
true
};
let uncle_hashes = find_uncles.all(&db_conn).await?;
let uncle_hashes: HashSet<_> = uncle_hashes
.into_iter()
.map(|x| serde_json::from_str(x.block_hash.as_str()).unwrap())
.collect();
for uncle_hash in uncle_hashes.into_iter() {
if let Some(x) = handle_uncle_block(&app, &authorization, uncle_hash).await? {
info!("balance changes from uncle: {:#?}", x);
}
}
if tx_pending {
// the transaction isn't confirmed. return early
// TODO: BadRequest, or something else?
return Err(Web3ProxyError::BadRequest(
"this transaction has not confirmed yet. Please try again later.".into(),
));
}
let transaction_receipt =
transaction_receipt.expect("if tx_pending is false, transaction_receipt must be set");
let block_hash = transaction_receipt
.block_hash
.expect("if tx_pending is false, block_hash must be set");
trace!("Transaction receipt: {:#?}", transaction_receipt);
// TODO: if the transaction doesn't have enough confirmations yet, add it to a queue to try again later
// 1 confirmation should be fine though
let txn = db_conn.begin().await?;
// if the transaction is already saved, return early
if increase_on_chain_balance_receipt::Entity::find()
.filter(increase_on_chain_balance_receipt::Column::TxHash.eq(tx_hash.encode_hex()))
.filter(increase_on_chain_balance_receipt::Column::ChainId.eq(app.config.chain_id))
.filter(increase_on_chain_balance_receipt::Column::BlockHash.eq(block_hash.encode_hex()))
.one(&txn)
.await?
.is_some()
{
return Ok(Json(json!({
"result": "tx_hash already saved",
}))
.into_response());
};
let payment_factory_address = app
.config
@ -168,47 +220,17 @@ pub async fn user_balance_post(
let payment_factory_contract =
PaymentFactory::new(payment_factory_address, app.internal_provider().clone());
debug!(
"Payment Factory Filter: {:?}",
payment_factory_contract.payment_received_filter()
);
// check bloom filter to be sure this transaction contains any relevant logs
// TODO: This does not work properly right now, get back this eventually
// TODO: compare to code in llamanodes/web3-this-then-that
// if let Some(ValueOrArray::Value(Some(x))) = payment_factory_contract
// .payment_received_filter()
// .filter
// .topics[0]
// {
// debug!("Bloom input bytes is: {:?}", x);
// debug!("Bloom input bytes is: {:?}", x.as_fixed_bytes());
// debug!("Bloom input as hex is: {:?}", hex!(x));
// let bloom_input = BloomInput::Raw(hex!(x));
// debug!(
// "Transaction receipt logs_bloom: {:?}",
// transaction_receipt.logs_bloom
// );
//
// // do a quick check that this transaction contains the required log
// if !transaction_receipt.logs_bloom.contains_input(x) {
// return Err(Web3ProxyError::BadRequest("no matching logs found".into()));
// }
// }
// TODO: check bloom filters
// the transaction might contain multiple relevant logs. collect them all
let mut response_data = vec![];
// all or nothing
let txn = db_conn.begin().await?;
// parse the logs from the transaction receipt
for log in transaction_receipt.logs {
if let Some(true) = log.removed {
todo!("delete this transaction from the database");
// TODO: do we need to make sure this row is deleted? it should be handled by `handle_uncle_block`
continue;
}
// Create a new transaction that will be used for joint transaction
// Parse the log into an event
if let Ok(event) = payment_factory_contract
.decode_event::<payment_factory::PaymentReceivedFilter>(
"PaymentReceived",
@ -228,7 +250,8 @@ pub async fn user_balance_post(
let log_index = log
.log_index
.context("no log_index. transaction must not be confirmed")?;
.context("no log_index. transaction must not be confirmed")?
.as_u64();
// the internal provider will handle caching of requests
let payment_token = IERC20::new(payment_token_address, app.internal_provider().clone());
@ -241,31 +264,33 @@ pub async fn user_balance_post(
// Setting the scale already does the decimal shift, no need to divide a second time
payment_token_amount.set_scale(payment_token_decimals)?;
info!(
"Found deposit transaction for: {:?} {:?} {:?}",
debug!(
"Found deposit event for: {:?} {:?} {:?}",
recipient_account, payment_token_address, payment_token_amount
);
let recipient = match user::Entity::find()
.filter(user::Column::Address.eq(recipient_account.to_fixed_bytes().as_slice()))
.one(&db_conn)
.one(&txn)
.await?
{
Some(x) => x,
None => {
let (user, _, _) = register_new_user(&db_conn, recipient_account).await?;
let (user, _, _) = register_new_user(&txn, recipient_account).await?;
user
}
};
// For now we only accept stablecoins
// And we hardcode the peg (later we would have to depeg this, for example
// For now we only accept stablecoins. This will need conversions if we accept other tokens.
// 1$ = Decimal(1) for any stablecoin
// TODO: Let's assume that people don't buy too much at _once_, we do support >$1M which should be fine for now
debug!(
// TODO: double check. why >$1M? Decimal type in the database?
trace!(
"Arithmetic is: {:?} / 10 ^ {:?} = {:?}",
payment_token_wei, payment_token_decimals, payment_token_amount
payment_token_wei,
payment_token_decimals,
payment_token_amount
);
// create or update the balance
@ -275,7 +300,7 @@ pub async fn user_balance_post(
user_id: sea_orm::Set(recipient.id),
..Default::default()
};
info!("Trying to insert into balance entry: {:?}", balance_entry);
trace!("Trying to insert into balance entry: {:?}", balance_entry);
balance::Entity::insert(balance_entry)
.on_conflict(
OnConflict::new()
@ -288,17 +313,18 @@ pub async fn user_balance_post(
.exec(&txn)
.await?;
debug!("Saving tx_hash: {:?}", tx_hash);
debug!("Saving log {} of txid {:?}", log_index, tx_hash);
let receipt = increase_on_chain_balance_receipt::ActiveModel {
tx_hash: sea_orm::ActiveValue::Set(tx_hash.encode_hex()),
chain_id: sea_orm::ActiveValue::Set(app.config.chain_id),
// TODO: need a migration that adds log_index
// TODO: need a migration that adds payment_token_address. will be useful for stats
id: sea_orm::ActiveValue::NotSet,
amount: sea_orm::ActiveValue::Set(payment_token_amount),
block_hash: sea_orm::ActiveValue::Set(block_hash.encode_hex()),
chain_id: sea_orm::ActiveValue::Set(app.config.chain_id),
deposit_to_user_id: sea_orm::ActiveValue::Set(recipient.id),
..Default::default()
log_index: sea_orm::ActiveValue::Set(log_index),
token_address: sea_orm::ActiveValue::Set(payment_token_address.encode_hex()),
tx_hash: sea_orm::ActiveValue::Set(tx_hash.encode_hex()),
};
info!("Trying to insert receipt {:?}", receipt);
trace!("Trying to insert receipt {:?}", receipt);
receipt.save(&txn).await?;
@ -323,19 +349,118 @@ pub async fn user_balance_post(
let x = json!({
"tx_hash": tx_hash,
"block_hash": block_hash,
"log_index": log_index,
"token": payment_token_address,
"amount": payment_token_amount,
});
debug!("deposit data: {:#?}", x);
response_data.push(x);
}
}
txn.commit().await?;
debug!("Saved to db");
let response = (StatusCode::CREATED, Json(json!(response_data))).into_response();
Ok(response)
}
/// `POST /user/balance_uncle/:uncle_hash` -- Process an uncle block to potentially update a user's balance.
#[debug_handler]
pub async fn user_balance_uncle_post(
Extension(app): Extension<Arc<Web3ProxyApp>>,
InsecureClientIp(ip): InsecureClientIp,
Path(mut params): Path<HashMap<String, String>>,
) -> Web3ProxyResponse {
let authorization = login_is_authorized(&app, ip).await?;
// Get the transaction hash, and the amount that the user wants to top up by.
// Let's say that for now, 1 credit is equivalent to 1 dollar (assuming any stablecoin has a 1:1 peg)
let uncle_hash: H256 = params
.remove("uncle_hash")
.ok_or(Web3ProxyError::BadRequest(
"You have not provided a uncle_hash".into(),
))?
.parse()
.map_err(|err| {
Web3ProxyError::BadRequest(format!("unable to parse uncle_hash: {}", err).into())
})?;
let authorization = Arc::new(authorization);
if let Some(x) = handle_uncle_block(&app, &authorization, uncle_hash).await? {
Ok(Json(x).into_response())
} else {
// TODO: is BadRequest the right error to use?
Err(Web3ProxyError::BadRequest("block is not an uncle".into()))
}
}
pub async fn handle_uncle_block(
app: &Arc<Web3ProxyApp>,
authorization: &Arc<Web3ProxyAuthorization>,
uncle_hash: H256,
) -> Web3ProxyResult<Option<HashMap<u64, Decimal>>> {
info!("handling uncle: {:?}", uncle_hash);
// cancel if uncle_hash is actually a confirmed block
if app
.authorized_request::<_, Option<Block<TxHash>>>(
"eth_getBlockByHash",
(uncle_hash, false),
authorization.clone(),
)
.await
.context("eth_getBlockByHash failed")?
.is_some()
{
return Ok(None);
}
// user_id -> balance that we need to subtract
let mut reversed_balances: HashMap<u64, Decimal> = HashMap::new();
let txn = app.db_transaction().await?;
// delete any deposit txids with uncle_hash
for reversed_deposit in increase_on_chain_balance_receipt::Entity::find()
.lock_exclusive()
.filter(increase_on_chain_balance_receipt::Column::BlockHash.eq(uncle_hash.encode_hex()))
.all(&txn)
.await?
{
let reversed_balance = reversed_balances
.entry(reversed_deposit.deposit_to_user_id)
.or_default();
*reversed_balance += reversed_deposit.amount;
// TODO: instead of delete, mark as uncled? seems like it would bloat the db unnecessarily. a stat should be enough
reversed_deposit.delete(&txn).await?;
}
debug!("removing balances: {:#?}", reversed_balances);
for (user_id, reversed_balance) in reversed_balances.iter() {
if let Some(user_balance) = balance::Entity::find()
.lock_exclusive()
.filter(balance::Column::Id.eq(*user_id))
.one(&txn)
.await?
{
let mut user_balance = user_balance.into_active_model();
user_balance.total_deposits =
ActiveValue::Set(user_balance.total_deposits.as_ref() - reversed_balance);
user_balance.update(&txn).await?;
}
}
txn.commit().await?;
Ok(Some(reversed_balances))
}

@ -151,7 +151,7 @@ pub async fn rpc_keys_management(
Err(Web3ProxyError::AccessDenied)
}
}
Some((x, None)) => Err(Web3ProxyError::BadResponse(
Some((_, None)) => Err(Web3ProxyError::BadResponse(
"a subuser record was found, but no corresponding RPC key".into(),
)),
// Match statement here, check in the user's RPC keys directly if it's not part of the secondary user

@ -128,6 +128,10 @@ impl Web3ProxyBlock {
.as_ref()
.expect("saved blocks must have a number")
}
pub fn uncles(&self) -> &[H256] {
&self.block.uncles
}
}
impl TryFrom<ArcBlock> for Web3ProxyBlock {
@ -164,28 +168,87 @@ impl Web3Rpcs {
pub async fn try_cache_block(
&self,
block: Web3ProxyBlock,
heaviest_chain: bool,
consensus_head: bool,
) -> Web3ProxyResult<Web3ProxyBlock> {
let block_hash = *block.hash();
// TODO: i think we can rearrange this function to make it faster on the hot path
if block.hash().is_zero() {
if block_hash.is_zero() {
debug!("Skipping block without hash!");
return Ok(block);
}
// this block is very likely already in block_hashes
// TODO: use their get_with
let block_hash = *block.hash();
// TODO: think more about heaviest_chain. would be better to do the check inside this function
if heaviest_chain {
// this is the only place that writes to block_numbers
// multiple inserts should be okay though
// TODO: info if there was a fork?
if consensus_head {
let block_num = block.number();
self.blocks_by_number
.get_with_by_ref(block_num, async move { block_hash })
.await;
// TODO: if there is an existing entry with a different block_hash,
// TODO: use entry api to handle changing existing entries
self.blocks_by_number.insert(*block_num, block_hash).await;
for uncle in block.uncles() {
self.blocks_by_hash.invalidate(uncle).await;
// TODO: save uncles somewhere?
}
// loop to make sure parent hashes match our caches
// set the first ancestor to the blocks' parent hash. but keep going up the chain
if let Some(parent_num) = block.number().checked_sub(1.into()) {
struct Ancestor {
num: U64,
hash: H256,
}
let mut ancestor = Ancestor {
num: parent_num,
hash: *block.parent_hash(),
};
loop {
let ancestor_number_to_hash_entry = self
.blocks_by_number
.entry_by_ref(&ancestor.num)
.or_insert(ancestor.hash)
.await;
if *ancestor_number_to_hash_entry.value() == ancestor.hash {
// the existing number entry matches. all good
break;
}
// oh no! ancestor_number_to_hash_entry is different
// remove the uncled entry in blocks_by_hash
// we will look it up later if necessary
self.blocks_by_hash
.invalidate(ancestor_number_to_hash_entry.value())
.await;
// TODO: delete any cached entries for eth_getBlockByHash or eth_getBlockByNumber
// TODO: race on this drop and insert?
drop(ancestor_number_to_hash_entry);
// update the entry in blocks_by_number
self.blocks_by_number
.insert(ancestor.num, ancestor.hash)
.await;
// try to check the parent of this ancestor
if let Some(ancestor_block) = self.blocks_by_hash.get(&ancestor.hash) {
match ancestor_block.number().checked_sub(1.into()) {
None => break,
Some(ancestor_parent_num) => {
ancestor = Ancestor {
num: ancestor_parent_num,
hash: *ancestor_block.parent_hash(),
}
}
}
} else {
break;
}
}
}
}
let block = self
@ -208,14 +271,26 @@ impl Web3Rpcs {
// the cache is set last, so if its here, its everywhere
// TODO: use try_get_with
if let Some(block) = self.blocks_by_hash.get(hash) {
return Ok(block);
// double check that it matches the blocks_by_number cache
let cached_hash = self
.blocks_by_number
.get_with_by_ref(block.number(), async { *hash })
.await;
if cached_hash == *hash {
return Ok(block);
}
// hashes don't match! this block must be in the middle of being uncled
// TODO: check known uncles
}
// block not in cache. we need to ask an rpc for it
let get_block_params = (*hash, false);
let block: Option<ArcBlock> = if let Some(rpc) = rpc {
// TODO: request_with_metadata would probably be better
// ask a specific rpc
// TODO: request_with_metadata would probably be better than authorized_request
rpc.authorized_request::<_, Option<ArcBlock>>(
"eth_getBlockByHash",
&get_block_params,
@ -224,7 +299,8 @@ impl Web3Rpcs {
)
.await?
} else {
// TODO: request_with_metadata would probably be better
// ask any rpc
// TODO: request_with_metadata instead of internal_request
self.internal_request::<_, Option<ArcBlock>>("eth_getBlockByHash", &get_block_params)
.await?
};
@ -254,7 +330,6 @@ impl Web3Rpcs {
/// Get the heaviest chain's block from cache or backend rpc
/// Caution! If a future block is requested, this might wait forever. Be sure to have a timeout outside of this!
/// TODO: take a RequestMetadata
pub async fn cannonical_block(
&self,
authorization: &Arc<Authorization>,

@ -217,26 +217,36 @@ impl Web3Rpcs {
// web3 connection worked
let mut new_by_name = (*self.by_name.load_full()).clone();
// make sure that any new requests use the new connection
let old_rpc = new_by_name.insert(rpc.name.clone(), rpc.clone());
// update the arc swap
self.by_name.store(Arc::new(new_by_name));
// clean up the old rpc
if let Some(old_rpc) = old_rpc {
trace!("old_rpc: {}", old_rpc);
// if the old rpc was synced, wait for the new one to sync
if old_rpc.head_block.as_ref().unwrap().borrow().is_some() {
let mut new_head_receiver =
rpc.head_block.as_ref().unwrap().subscribe();
debug!("waiting for new {} to sync", rpc);
trace!("waiting for new {} connection to sync", rpc);
// TODO: maximum wait time or this could block things for too long
// TODO: maximum wait time
while new_head_receiver.borrow_and_update().is_none() {
new_head_receiver.changed().await?;
if new_head_receiver.changed().await.is_err() {
break;
};
}
}
// TODO: tell ethers to disconnect? is there a function for that?
// tell the old rpc to disconnect
if let Some(ref disconnect_sender) = old_rpc.disconnect_watch {
trace!("telling {} to disconnect", old_rpc);
disconnect_sender.send_replace(true);
}
}
// TODO: what should we do with the new handle? make sure error logs aren't dropped
}
Ok(Err(err)) => {
// if we got an error here, the app can continue on
@ -923,11 +933,11 @@ impl Web3Rpcs {
let rate_limit_substrings = ["limit", "exceeded", "quota usage"];
for rate_limit_substr in rate_limit_substrings {
if error_msg.contains(rate_limit_substr) {
if rate_limit_substr.contains("result on length") {
if error_msg.contains("result on length") {
// this error contains "limit" but is not a rate limit error
// TODO: make the expected limit configurable
// TODO: parse the rate_limit_substr and only continue if it is < expected limit
if rate_limit_substr.contains("exceeding limit 2000000") {
if error_msg.contains("exceeding limit 2000000") {
// they hit our expected limit. return the error now
return Err(error.into());
} else {

@ -29,7 +29,6 @@ use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{self, AtomicU32, AtomicU64, AtomicUsize};
use std::{cmp::Ordering, sync::Arc};
use tokio::select;
use tokio::sync::watch;
use tokio::time::{interval, sleep, sleep_until, timeout, Duration, Instant, MissedTickBehavior};
use url::Url;
@ -96,7 +95,7 @@ impl Web3Rpc {
redis_pool: Option<RedisPool>,
block_interval: Duration,
block_map: BlocksByHashCache,
block_sender: Option<flume::Sender<BlockAndRpc>>,
block_and_rpc_sender: Option<flume::Sender<BlockAndRpc>>,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
) -> anyhow::Result<(Arc<Web3Rpc>, Web3ProxyJoinHandle<()>)> {
let created_at = Instant::now();
@ -132,8 +131,8 @@ impl Web3Rpc {
let backup = config.backup;
let block_data_limit: AtomicU64 = config.block_data_limit.unwrap_or_default().into();
let automatic_block_limit =
(block_data_limit.load(atomic::Ordering::Acquire) == 0) && block_sender.is_some();
let automatic_block_limit = (block_data_limit.load(atomic::Ordering::Acquire) == 0)
&& block_and_rpc_sender.is_some();
// have a sender for tracking hard limit anywhere. we use this in case we
// and track on servers that have a configured hard limit
@ -219,7 +218,12 @@ impl Web3Rpc {
tokio::spawn(async move {
// TODO: this needs to be a subscribe_with_reconnect that does a retry with jitter and exponential backoff
new_connection
.subscribe_with_reconnect(block_map, block_sender, chain_id, tx_id_sender)
.subscribe_with_reconnect(
block_map,
block_and_rpc_sender,
chain_id,
tx_id_sender,
)
.await
})
};
@ -580,7 +584,7 @@ impl Web3Rpc {
async fn subscribe_with_reconnect(
self: Arc<Self>,
block_map: BlocksByHashCache,
block_sender: Option<flume::Sender<BlockAndRpc>>,
block_and_rpc_sender: Option<flume::Sender<BlockAndRpc>>,
chain_id: u64,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
) -> Web3ProxyResult<()> {
@ -589,7 +593,7 @@ impl Web3Rpc {
.clone()
.subscribe(
block_map.clone(),
block_sender.clone(),
block_and_rpc_sender.clone(),
chain_id,
tx_id_sender.clone(),
)
@ -623,7 +627,7 @@ impl Web3Rpc {
async fn subscribe(
self: Arc<Self>,
block_map: BlocksByHashCache,
block_sender: Option<flume::Sender<BlockAndRpc>>,
block_and_rpc_sender: Option<flume::Sender<BlockAndRpc>>,
chain_id: u64,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
) -> Web3ProxyResult<()> {
@ -633,6 +637,10 @@ impl Web3Rpc {
Some(RequestErrorHandler::ErrorLevel)
};
if self.should_disconnect() {
return Ok(());
}
if let Some(url) = self.ws_url.clone() {
debug!("starting websocket provider on {}", self);
@ -643,6 +651,10 @@ impl Web3Rpc {
self.ws_provider.store(Some(x));
}
if self.should_disconnect() {
return Ok(());
}
debug!("starting subscriptions on {}", self);
self.check_provider(chain_id).await?;
@ -650,21 +662,21 @@ impl Web3Rpc {
let mut futures = vec![];
// TODO: use this channel instead of self.disconnect_watch
let (subscribe_stop_tx, mut subscribe_stop_rx) = watch::channel(false);
let (subscribe_stop_tx, subscribe_stop_rx) = watch::channel(false);
// subscribe to the disconnect watch. the app uses this when shutting down
// subscribe to the disconnect watch. the app uses this when shutting down or when configs change
if let Some(disconnect_watch_tx) = self.disconnect_watch.as_ref() {
let clone = self.clone();
let mut disconnect_watch_rx = disconnect_watch_tx.subscribe();
let f = async move {
// TODO: make sure it changed to "true"
select! {
x = disconnect_watch_rx.changed() => {
x?;
},
x = subscribe_stop_rx.changed() => {
x?;
},
loop {
if *disconnect_watch_rx.borrow_and_update() {
info!("disconnect triggered on {}", clone);
break;
}
disconnect_watch_rx.changed().await?;
}
Ok(())
};
@ -681,8 +693,6 @@ impl Web3Rpc {
// TODO: reset this timeout when a new block is seen? we need to keep request_latency updated though
let health_sleep_seconds = 5;
let subscribe_stop_rx = subscribe_stop_tx.subscribe();
// health check loop
let f = async move {
// TODO: benchmark this and lock contention
@ -718,18 +728,22 @@ impl Web3Rpc {
}
// subscribe to new heads
if let Some(block_sender) = block_sender.clone() {
if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() {
let clone = self.clone();
let subscribe_stop_rx = subscribe_stop_tx.subscribe();
let f = async move {
let x = clone
.subscribe_new_heads(block_sender.clone(), block_map.clone(), subscribe_stop_rx)
.subscribe_new_heads(
block_and_rpc_sender.clone(),
block_map.clone(),
subscribe_stop_rx,
)
.await;
// error or success, we clear the block when subscribe_new_heads exits
clone
.send_head_block_result(Ok(None), &block_sender, &block_map)
.send_head_block_result(Ok(None), &block_and_rpc_sender, &block_map)
.await?;
x
@ -763,6 +777,9 @@ impl Web3Rpc {
// TODO: wait for all of the futures to exit?
// TODO: tell ethers to disconnect?
self.ws_provider.store(None);
Ok(())
}
@ -806,6 +823,7 @@ impl Web3Rpc {
while let Some(block) = blocks.next().await {
if *subscribe_stop_rx.borrow() {
trace!("stopping ws block subscription on {}", self);
break;
}
@ -822,6 +840,7 @@ impl Web3Rpc {
loop {
if *subscribe_stop_rx.borrow() {
trace!("stopping http block subscription on {}", self);
break;
}
@ -848,6 +867,7 @@ impl Web3Rpc {
.await?;
if *subscribe_stop_rx.borrow() {
debug!("new heads subscription exited");
Ok(())
} else {
Err(anyhow!("new_heads subscription exited. reconnect needed").into())
@ -860,7 +880,14 @@ impl Web3Rpc {
tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
mut subscribe_stop_rx: watch::Receiver<bool>,
) -> Web3ProxyResult<()> {
subscribe_stop_rx.changed().await?;
// TODO: check that it actually changed to true
loop {
if *subscribe_stop_rx.borrow_and_update() {
break;
}
subscribe_stop_rx.changed().await?;
}
/*
trace!("watching pending transactions on {}", self);
@ -1174,7 +1201,6 @@ impl fmt::Debug for Web3Rpc {
impl fmt::Display for Web3Rpc {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: filter basic auth and api keys
write!(f, "{}", &self.name)
}
}