From 6d25c41faf80a5dd4766a440b8527fa461755d4a Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 19 Jun 2023 17:47:38 -0700 Subject: [PATCH] Compute units (#136) * wip * add basic compute unit calculator * calculate costs with compute units --- .../bin/web3_proxy_cli/migrate_stats_to_v2.rs | 9 +- web3_proxy/src/compute_units.rs | 134 ++++++++++++++++++ web3_proxy/src/frontend/authorization.rs | 36 ++--- web3_proxy/src/lib.rs | 1 + web3_proxy/src/stats/mod.rs | 107 ++++---------- 5 files changed, 191 insertions(+), 96 deletions(-) create mode 100644 web3_proxy/src/compute_units.rs diff --git a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs index fb04724f..0f3feadd 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs @@ -12,7 +12,7 @@ use migration::{Expr, Value}; use parking_lot::Mutex; use std::num::NonZeroU64; use std::sync::Arc; -use tokio::sync::{broadcast}; +use tokio::sync::broadcast; use tokio::time::Instant; use ulid::Ulid; use web3_proxy::app::BILLING_PERIOD_SECONDS; @@ -181,10 +181,15 @@ impl MigrateStatsToV2 { archive_request: x.archive_request.into(), authorization: Some(authorization.clone()), backend_requests: Mutex::new(backend_rpcs), + chain_id: x.chain_id, error_response: x.error_response.into(), // debug data is in kafka, not mysql or influx kafka_debug_logger: None, - method: x.method.clone(), + method: x + .method + .clone() + .unwrap_or_else(|| "unknown".to_string()) + .into(), // This is not relevant in the new version no_servers: 0.into(), // Get the mean of all the request bytes diff --git a/web3_proxy/src/compute_units.rs b/web3_proxy/src/compute_units.rs new file mode 100644 index 00000000..628c83dd --- /dev/null +++ b/web3_proxy/src/compute_units.rs @@ -0,0 +1,134 @@ +//! Compute Units based on median request latencies and sizes. +//! Designed to match Alchemy's system. +//! I'm sure there will be memes about us copying, but the user experience of consistency makes a lot of sense to me. +//! TODO? pricing based on latency and bytes and +//! TODO: rate limit on compute units +//! TODO: pricing on compute units +//! TODO: script that queries influx and calculates observed relative costs + +use log::warn; +use migration::sea_orm::prelude::Decimal; + +pub struct ComputeUnit(Decimal); + +impl ComputeUnit { + /// costs can vary widely depending on method and chain + pub fn new(method: &str, chain_id: u64) -> Self { + let cu = match (chain_id, method) { + (_, "debug_traceTransaction") => 309, + (_, "debug_traceCall") => 309, + (_, "debug_traceBlockByHash") => 497, + (_, "debug_traceBlockByNumber") => 497, + (_, "trace_get") => 17, + (_, "trace_block") => 24, + (_, "trace_transaction") => 26, + (_, "trace_call") => 75, + (_, "trace_rawTransaction") => 75, + (_, "trace_filter") => 75, + (_, "trace_replayTransaction") => 2983, + (_, "trace_replayBlockTransactions") => 2983, + (_, "net_version") => 0, + (_, "eth_chainId") => 0, + (_, "eth_syncing") => 0, + (_, "eth_protocolVersion") => 0, + (_, "net_listening") => 0, + (_, "eth_uninstallFilter") => 10, + (_, "eth_accounts") => 10, + (_, "eth_blockNumber") => 10, + (_, "eth_subscribe") => 10, + (_, "eth_unsubscribe") => 10, + (_, "eth_feeHistory") => 10, + (_, "eth_maxPriorityFeePerGas") => 10, + (_, "eth_createAccessList") => 10, + (_, "eth_getTransactionReceipt") => 15, + (_, "eth_getUncleByBlockHashAndIndex") => 15, + (_, "eth_getUncleByBlockNumberAndIndex") => 15, + (_, "eth_getTransactionByBlockHashAndIndex") => 15, + (_, "eth_getTransactionByBlockNumberAndIndex") => 15, + (_, "eth_getUncleCountByBlockHash") => 15, + (_, "eth_getUncleCountByBlockNumber") => 15, + (_, "web3_clientVersion") => 15, + (_, "web3_sha3") => 15, + (_, "eth_getBlockByNumber") => 16, + (_, "eth_getStorageAt") => 17, + (_, "eth_getTransactionByHash") => 17, + (_, "eth_gasPrice") => 19, + (_, "eth_getBalance") => 19, + (_, "eth_getCode") => 19, + (_, "eth_getFilterChanges") => 20, + (_, "eth_newBlockFilter") => 20, + (_, "eth_newFilter") => 20, + (_, "eth_newPendingTransactionFilter") => 20, + (_, "eth_getBlockTransactionCountByHash") => 20, + (_, "eth_getBlockTransactionCountByNumber") => 20, + (_, "eth_getProof") => 21, + (_, "eth_getBlockByHash") => 21, + (_, "erigon_forks") => 24, + (_, "erigon_getHeaderByHash") => 24, + (_, "erigon_getHeaderByNumber") => 24, + (_, "erigon_getLogsByHash") => 24, + (_, "erigon_issuance") => 24, + (_, "eth_getTransactionCount") => 26, + (_, "eth_call") => 26, + (_, "eth_getFilterLogs") => 75, + (_, "eth_getLogs") => 75, + (_, "eth_estimateGas") => 87, + (_, "eth_sendRawTransaction") => 250, + (_, "eth_getBlockReceipts") => 500, + (137, "bor_getAuthor") => 10, + (137, "bor_getCurrentProposer") => 10, + (137, "bor_getCurrentValidators") => 10, + (137, "bor_getRootHash") => 10, + (137, "bor_getSignersAtHash") => 10, + (1101, "zkevm_batchNumber") => 0, + (1101, "zkevm_batchNumberByBlockNumber") => 0, + (1101, "zkevm_consolidatedBlockNumber") => 0, + (1101, "zkevm_getBatchByNumber") => 0, + (1101, "zkevm_getBroadcastURI") => 0, + (1101, "zkevm_isBlockConsolidated") => 0, + (1101, "zkevm_isBlockVirtualized") => 0, + (1101, "zkevm_verifiedBatchNumber") => 0, + (1101, "zkevm_virtualBatchNumber") => 0, + (_, "eth_sendUserOperation") => 1000, + (_, "eth_estimateUserOperationGas") => 500, + (_, "eth_getUserOperationByHash") => 17, + (_, "eth_getUserOperationReceipt") => 15, + (_, "eth_supportedEntryPoints") => 5, + (_, method) => { + // default to 10 CU for methods that aren't included here + warn!("unknown method {}", method); + 10 + } + }; + + let cu = Decimal::from(cu); + + Self(cu) + } + + /// notifications and subscription responses cost per-byte + pub fn subscription_response>(num_bytes: D) -> Self { + let cu = num_bytes.into() * Decimal::new(4, 2); + + Self(cu) + } + + /// requesting an unimplemented function costs 2 CU + pub fn unimplemented() -> Self { + Self(2.into()) + } + + /// Compute cost per request + /// All methods cost the same + /// The number of bytes are based on input, and output bytes + pub fn cost(&self, cache_hit: bool, usd_per_cu: Decimal) -> Decimal { + let mut cost = self.0 * usd_per_cu; + + // cache hits get a 50% discount + if cache_hit { + cost /= Decimal::from(2) + } + + cost + } +} diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index dc2ae0f8..95c84a2f 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -31,6 +31,7 @@ use rdkafka::producer::{FutureProducer, FutureRecord}; use rdkafka::util::Timeout as KafkaTimeout; use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::RedisRateLimitResult; +use std::borrow::Cow; use std::fmt::Display; use std::hash::{Hash, Hasher}; use std::mem; @@ -296,14 +297,15 @@ pub struct RequestMetadata { pub authorization: Option>, + pub chain_id: u64, + pub request_ulid: Ulid, /// Size of the JSON request. Does not include headers or things like that. pub request_bytes: usize, - /// users can opt out of method tracking for their personal dashboads - /// but we still have to store the method at least temporarily for cost calculations - pub method: Option, + /// The JSON-RPC request method. + pub method: Cow<'static, str>, /// Instant that the request was received (or at least close to it) /// We use Instant and not timestamps to avoid problems with leap seconds and similar issues @@ -341,12 +343,14 @@ impl Default for Authorization { } } +/// this is only implemented so that we can use `mem::take`. You probably shouldn't use this. impl Default for RequestMetadata { fn default() -> Self { Self { archive_request: Default::default(), authorization: Default::default(), backend_requests: Default::default(), + chain_id: Default::default(), error_response: Default::default(), kafka_debug_logger: Default::default(), method: Default::default(), @@ -385,19 +389,19 @@ impl RequestMetadata { #[derive(From)] pub enum RequestOrMethod<'a> { - Request(&'a JsonRpcRequest), /// jsonrpc method (or similar label) and the size that the request should count as (sometimes 0) Method(&'a str, usize), - RequestSize(usize), + Request(&'a JsonRpcRequest), } impl<'a> RequestOrMethod<'a> { - fn method(&self) -> Option<&str> { - match self { - Self::Request(x) => Some(&x.method), - Self::Method(x, _) => Some(x), - _ => None, - } + fn method(&self) -> Cow<'static, str> { + let x = match self { + Self::Request(x) => x.method.to_string(), + Self::Method(x, _) => x.to_string(), + }; + + x.into() } fn jsonrpc_request(&self) -> Option<&JsonRpcRequest> { @@ -411,18 +415,13 @@ impl<'a> RequestOrMethod<'a> { match self { RequestOrMethod::Method(_, num_bytes) => *num_bytes, RequestOrMethod::Request(x) => x.num_bytes(), - RequestOrMethod::RequestSize(num_bytes) => *num_bytes, } } } impl<'a> From<&'a str> for RequestOrMethod<'a> { fn from(value: &'a str) -> Self { - if value.is_empty() { - Self::RequestSize(0) - } else { - Self::Method(value, 0) - } + Self::Method(value, 0) } } @@ -463,7 +462,7 @@ impl RequestMetadata { ) -> Arc { let request = request.into(); - let method = request.method().map(|x| x.to_string()); + let method = request.method(); let request_bytes = request.num_bytes(); @@ -499,6 +498,7 @@ impl RequestMetadata { archive_request: false.into(), authorization: Some(authorization), backend_requests: Default::default(), + chain_id: app.config.chain_id, error_response: false.into(), kafka_debug_logger, method, diff --git a/web3_proxy/src/lib.rs b/web3_proxy/src/lib.rs index 4038dcc7..56c5da5e 100644 --- a/web3_proxy/src/lib.rs +++ b/web3_proxy/src/lib.rs @@ -4,6 +4,7 @@ pub mod admin_queries; pub mod app; pub mod block_number; +pub mod compute_units; pub mod config; pub mod errors; pub mod frontend; diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index 86c8149f..e2c2711c 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -7,6 +7,7 @@ pub mod influxdb_queries; use self::stat_buffer::BufferedRpcQueryStats; use crate::app::{RpcSecretKeyCache, UserBalanceCache}; +use crate::compute_units::ComputeUnit; use crate::errors::{Web3ProxyError, Web3ProxyResult}; use crate::frontend::authorization::{Authorization, RequestMetadata}; use crate::rpcs::one::Web3Rpc; @@ -25,7 +26,9 @@ use migration::sea_orm::{DatabaseTransaction, QuerySelect}; use migration::{Expr, LockType, OnConflict}; use num_traits::ToPrimitive; use parking_lot::Mutex; +use std::borrow::Cow; use std::num::NonZeroU64; +use std::str::FromStr; use std::sync::atomic::{self, Ordering}; use std::sync::Arc; @@ -42,8 +45,9 @@ pub type BackendRequests = Mutex>>; /// TODO: better name? RpcQueryStatBuilder? #[derive(Clone, Debug)] pub struct RpcQueryStats { + pub chain_id: u64, pub authorization: Arc, - pub method: Option, + pub method: Cow<'static, str>, pub archive_request: bool, pub error_response: bool, pub request_bytes: u64, @@ -53,8 +57,9 @@ pub struct RpcQueryStats { pub response_bytes: u64, pub response_millis: u64, pub response_timestamp: i64, - /// Credits used signifies how how much money was used up - pub credits_used: Decimal, + /// The cost of the query in USD + /// If the user is on a free tier, this is still calculated so we know how much we are giving away. + pub compute_unit_cost: Decimal, } #[derive(Clone, Debug, From, Hash, PartialEq, Eq)] @@ -67,9 +72,9 @@ pub struct RpcQueryKey { archive_needed: bool, /// true if the response was some sort of JSONRPC error. error_response: bool, - /// method tracking is opt-in. - method: Option, - /// origin tracking is opt-in. + /// the rpc method used. + method: Cow<'static, str>, + /// origin tracking was opt-in. Now it is "None" origin: Option, /// None if the public url was used. rpc_secret_key_id: Option, @@ -192,7 +197,7 @@ impl BufferedRpcQueryStats { self.sum_request_bytes += stat.request_bytes; self.sum_response_bytes += stat.response_bytes; self.sum_response_millis += stat.response_millis; - self.sum_credits_used += stat.credits_used; + self.sum_credits_used += stat.compute_unit_cost; } async fn _save_db_stats( @@ -714,9 +719,7 @@ impl BufferedRpcQueryStats { builder = builder.tag("rpc_secret_key_id", rpc_secret_key_id.to_string()); } - if let Some(method) = key.method { - builder = builder.tag("method", method); - } + builder = builder.tag("method", key.method); // Read the latest balance ... let remaining = self.latest_balance.read().remaining(); @@ -803,83 +806,35 @@ impl TryFrom for RpcQueryStats { x => x, }; - let method = metadata.method.take(); + let method = metadata.method.clone(); + let chain_id = metadata.chain_id; - let credits_used = Self::compute_cost( - request_bytes, - response_bytes, - backend_rpcs_used.is_empty(), - method.as_deref(), - ); + let cu = ComputeUnit::new(&method, metadata.chain_id); + + // TODO: get from config? a helper function? how should we pick this? + let usd_per_cu = match chain_id { + 137 => Decimal::from_str("0.000000692307692307"), + _ => Decimal::from_str("0.000000692307692307"), + }?; + + let cache_hit = !backend_rpcs_used.is_empty(); + + let compute_unit_cost = cu.cost(cache_hit, usd_per_cu); let x = Self { - authorization, archive_request, - method, + authorization, backend_rpcs_used, - request_bytes, + chain_id, + compute_unit_cost, error_response, + method, + request_bytes, response_bytes, response_millis, response_timestamp, - credits_used, }; Ok(x) } } - -impl RpcQueryStats { - /// Compute cost per request - /// All methods cost the same - /// The number of bytes are based on input, and output bytes - pub fn compute_cost( - request_bytes: u64, - response_bytes: u64, - cache_hit: bool, - method: Option<&str>, - ) -> Decimal { - // some methods should be free. there might be cases where method isn't set (though they should be uncommon) - // TODO: get this list from config (and add more to it) - if let Some(method) = method.as_ref() { - if [ - "eth_chainId", - "eth_syncing", - "eth_protocolVersion", - "net_version", - "net_listening", - ] - .contains(method) - { - return 0.into(); - } - } - - // TODO: finalize cost calculation - - // TODO: get cost_minimum, cost_free_bytes, cost_per_byte, cache_hit_divisor from config. each chain will be different - // pays at least $0.000018 / credits per request - let cost_minimum = Decimal::new(18, 6); - - // 1kb is included on each call - let cost_free_bytes = 1024; - - // after that, we add cost per bytes, $0.000000006 / credits per byte - // amazon charges $.09/GB outbound - // but we also have to cover our RAM and expensive nics on the servers (haproxy/web3-proxy/blockchains) - let cost_per_byte = Decimal::new(6, 9); - - let total_bytes = request_bytes + response_bytes; - - let total_chargable_bytes = Decimal::from(total_bytes.saturating_sub(cost_free_bytes)); - - let mut cost = cost_minimum + cost_per_byte * total_chargable_bytes; - - // cache hits get a 50% discount - if cache_hit { - cost /= Decimal::from(2) - } - - cost - } -}