Compute units (#136)

* wip

* add basic compute unit calculator

* calculate costs with compute units
This commit is contained in:
Bryan Stitt 2023-06-19 17:47:38 -07:00 committed by GitHub
parent b7935f77fc
commit 6d25c41faf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 191 additions and 96 deletions

@ -12,7 +12,7 @@ use migration::{Expr, Value};
use parking_lot::Mutex;
use std::num::NonZeroU64;
use std::sync::Arc;
use tokio::sync::{broadcast};
use tokio::sync::broadcast;
use tokio::time::Instant;
use ulid::Ulid;
use web3_proxy::app::BILLING_PERIOD_SECONDS;
@ -181,10 +181,15 @@ impl MigrateStatsToV2 {
archive_request: x.archive_request.into(),
authorization: Some(authorization.clone()),
backend_requests: Mutex::new(backend_rpcs),
chain_id: x.chain_id,
error_response: x.error_response.into(),
// debug data is in kafka, not mysql or influx
kafka_debug_logger: None,
method: x.method.clone(),
method: x
.method
.clone()
.unwrap_or_else(|| "unknown".to_string())
.into(),
// This is not relevant in the new version
no_servers: 0.into(),
// Get the mean of all the request bytes

@ -0,0 +1,134 @@
//! Compute Units based on median request latencies and sizes.
//! Designed to match Alchemy's system.
//! I'm sure there will be memes about us copying, but the user experience of consistency makes a lot of sense to me.
//! TODO? pricing based on latency and bytes
//! TODO: rate limit on compute units
//! TODO: pricing on compute units
//! TODO: script that queries influx and calculates observed relative costs
use log::warn;
use migration::sea_orm::prelude::Decimal;
/// A number of compute units (CU): the abstract price of serving one request.
///
/// The wrapped `Decimal` is a CU count, not a currency amount. It only becomes
/// a price once multiplied by a USD-per-CU rate.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ComputeUnit(Decimal);
impl ComputeUnit {
    /// Look up the compute units charged for one call to `method` on `chain_id`.
    ///
    /// Most prices apply on every chain. The `bor_*` methods are only priced on
    /// chain 137 and the `zkevm_*` methods only on chain 1101. Anything that is
    /// not in the table (including those chain-specific methods on any other
    /// chain) is logged with `warn!` and charged the default of 10 CU.
    pub fn new(method: &str, chain_id: u64) -> Self {
        let units = match method {
            // debug_* tracing
            "debug_traceTransaction" | "debug_traceCall" => 309,
            "debug_traceBlockByHash" | "debug_traceBlockByNumber" => 497,

            // trace_*
            "trace_get" => 17,
            "trace_block" => 24,
            "trace_transaction" => 26,
            "trace_call" | "trace_rawTransaction" | "trace_filter" => 75,
            "trace_replayTransaction" | "trace_replayBlockTransactions" => 2983,

            // free methods
            "net_version" | "eth_chainId" | "eth_syncing" | "eth_protocolVersion"
            | "net_listening" => 0,

            // standard methods, cheapest first
            "eth_uninstallFilter"
            | "eth_accounts"
            | "eth_blockNumber"
            | "eth_subscribe"
            | "eth_unsubscribe"
            | "eth_feeHistory"
            | "eth_maxPriorityFeePerGas"
            | "eth_createAccessList" => 10,
            "eth_getTransactionReceipt"
            | "eth_getUncleByBlockHashAndIndex"
            | "eth_getUncleByBlockNumberAndIndex"
            | "eth_getTransactionByBlockHashAndIndex"
            | "eth_getTransactionByBlockNumberAndIndex"
            | "eth_getUncleCountByBlockHash"
            | "eth_getUncleCountByBlockNumber"
            | "web3_clientVersion"
            | "web3_sha3" => 15,
            "eth_getBlockByNumber" => 16,
            "eth_getStorageAt" | "eth_getTransactionByHash" => 17,
            "eth_gasPrice" | "eth_getBalance" | "eth_getCode" => 19,
            "eth_getFilterChanges"
            | "eth_newBlockFilter"
            | "eth_newFilter"
            | "eth_newPendingTransactionFilter"
            | "eth_getBlockTransactionCountByHash"
            | "eth_getBlockTransactionCountByNumber" => 20,
            "eth_getProof" | "eth_getBlockByHash" => 21,
            "erigon_forks"
            | "erigon_getHeaderByHash"
            | "erigon_getHeaderByNumber"
            | "erigon_getLogsByHash"
            | "erigon_issuance" => 24,
            "eth_getTransactionCount" | "eth_call" => 26,
            "eth_getFilterLogs" | "eth_getLogs" => 75,
            "eth_estimateGas" => 87,
            "eth_sendRawTransaction" => 250,
            "eth_getBlockReceipts" => 500,

            // user-operation and entry-point methods
            "eth_sendUserOperation" => 1000,
            "eth_estimateUserOperationGas" => 500,
            "eth_getUserOperationByHash" => 17,
            "eth_getUserOperationReceipt" => 15,
            "eth_supportedEntryPoints" => 5,

            // only priced on chain 137
            "bor_getAuthor"
            | "bor_getCurrentProposer"
            | "bor_getCurrentValidators"
            | "bor_getRootHash"
            | "bor_getSignersAtHash"
                if chain_id == 137 =>
            {
                10
            }

            // only priced on chain 1101
            "zkevm_batchNumber"
            | "zkevm_batchNumberByBlockNumber"
            | "zkevm_consolidatedBlockNumber"
            | "zkevm_getBatchByNumber"
            | "zkevm_getBroadcastURI"
            | "zkevm_isBlockConsolidated"
            | "zkevm_isBlockVirtualized"
            | "zkevm_verifiedBatchNumber"
            | "zkevm_virtualBatchNumber"
                if chain_id == 1101 =>
            {
                0
            }

            // default to 10 CU for methods that aren't included here
            unlisted => {
                warn!("unknown method {}", unlisted);
                10
            }
        };

        Self(Decimal::from(units))
    }

    /// Price a notification or subscription response by its size.
    ///
    /// Charges `Decimal::new(4, 2)` (0.04) CU for every byte in `num_bytes`.
    pub fn subscription_response<D: Into<Decimal>>(num_bytes: D) -> Self {
        let cu_per_byte = Decimal::new(4, 2);
        let byte_count: Decimal = num_bytes.into();

        Self(byte_count * cu_per_byte)
    }

    /// The flat price of requesting an unimplemented function: 2 CU.
    pub fn unimplemented() -> Self {
        Self(Decimal::from(2))
    }

    /// Convert these compute units into a cost.
    ///
    /// The cost is the CU count multiplied by `usd_per_cu`. When `cache_hit`
    /// is true the result is halved (a 50% discount).
    pub fn cost(&self, cache_hit: bool, usd_per_cu: Decimal) -> Decimal {
        let full_price = self.0 * usd_per_cu;

        if cache_hit {
            // cache hits get a 50% discount
            full_price / Decimal::from(2)
        } else {
            full_price
        }
    }
}

@ -31,6 +31,7 @@ use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout as KafkaTimeout;
use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::RedisRateLimitResult;
use std::borrow::Cow;
use std::fmt::Display;
use std::hash::{Hash, Hasher};
use std::mem;
@ -296,14 +297,15 @@ pub struct RequestMetadata {
pub authorization: Option<Arc<Authorization>>,
pub chain_id: u64,
pub request_ulid: Ulid,
/// Size of the JSON request. Does not include headers or things like that.
pub request_bytes: usize,
/// users can opt out of method tracking for their personal dashboards
/// but we still have to store the method at least temporarily for cost calculations
pub method: Option<String>,
/// The JSON-RPC request method.
pub method: Cow<'static, str>,
/// Instant that the request was received (or at least close to it)
/// We use Instant and not timestamps to avoid problems with leap seconds and similar issues
@ -341,12 +343,14 @@ impl Default for Authorization {
}
}
/// this is only implemented so that we can use `mem::take`. You probably shouldn't use this.
impl Default for RequestMetadata {
fn default() -> Self {
Self {
archive_request: Default::default(),
authorization: Default::default(),
backend_requests: Default::default(),
chain_id: Default::default(),
error_response: Default::default(),
kafka_debug_logger: Default::default(),
method: Default::default(),
@ -385,19 +389,19 @@ impl RequestMetadata {
#[derive(From)]
pub enum RequestOrMethod<'a> {
Request(&'a JsonRpcRequest),
/// jsonrpc method (or similar label) and the size that the request should count as (sometimes 0)
Method(&'a str, usize),
RequestSize(usize),
Request(&'a JsonRpcRequest),
}
impl<'a> RequestOrMethod<'a> {
fn method(&self) -> Option<&str> {
match self {
Self::Request(x) => Some(&x.method),
Self::Method(x, _) => Some(x),
_ => None,
}
fn method(&self) -> Cow<'static, str> {
let x = match self {
Self::Request(x) => x.method.to_string(),
Self::Method(x, _) => x.to_string(),
};
x.into()
}
fn jsonrpc_request(&self) -> Option<&JsonRpcRequest> {
@ -411,18 +415,13 @@ impl<'a> RequestOrMethod<'a> {
match self {
RequestOrMethod::Method(_, num_bytes) => *num_bytes,
RequestOrMethod::Request(x) => x.num_bytes(),
RequestOrMethod::RequestSize(num_bytes) => *num_bytes,
}
}
}
impl<'a> From<&'a str> for RequestOrMethod<'a> {
fn from(value: &'a str) -> Self {
if value.is_empty() {
Self::RequestSize(0)
} else {
Self::Method(value, 0)
}
Self::Method(value, 0)
}
}
@ -463,7 +462,7 @@ impl RequestMetadata {
) -> Arc<Self> {
let request = request.into();
let method = request.method().map(|x| x.to_string());
let method = request.method();
let request_bytes = request.num_bytes();
@ -499,6 +498,7 @@ impl RequestMetadata {
archive_request: false.into(),
authorization: Some(authorization),
backend_requests: Default::default(),
chain_id: app.config.chain_id,
error_response: false.into(),
kafka_debug_logger,
method,

@ -4,6 +4,7 @@
pub mod admin_queries;
pub mod app;
pub mod block_number;
pub mod compute_units;
pub mod config;
pub mod errors;
pub mod frontend;

@ -7,6 +7,7 @@ pub mod influxdb_queries;
use self::stat_buffer::BufferedRpcQueryStats;
use crate::app::{RpcSecretKeyCache, UserBalanceCache};
use crate::compute_units::ComputeUnit;
use crate::errors::{Web3ProxyError, Web3ProxyResult};
use crate::frontend::authorization::{Authorization, RequestMetadata};
use crate::rpcs::one::Web3Rpc;
@ -25,7 +26,9 @@ use migration::sea_orm::{DatabaseTransaction, QuerySelect};
use migration::{Expr, LockType, OnConflict};
use num_traits::ToPrimitive;
use parking_lot::Mutex;
use std::borrow::Cow;
use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::atomic::{self, Ordering};
use std::sync::Arc;
@ -42,8 +45,9 @@ pub type BackendRequests = Mutex<Vec<Arc<Web3Rpc>>>;
/// TODO: better name? RpcQueryStatBuilder?
#[derive(Clone, Debug)]
pub struct RpcQueryStats {
pub chain_id: u64,
pub authorization: Arc<Authorization>,
pub method: Option<String>,
pub method: Cow<'static, str>,
pub archive_request: bool,
pub error_response: bool,
pub request_bytes: u64,
@ -53,8 +57,9 @@ pub struct RpcQueryStats {
pub response_bytes: u64,
pub response_millis: u64,
pub response_timestamp: i64,
/// Credits used signifies how much money was used up
pub credits_used: Decimal,
/// The cost of the query in USD
/// If the user is on a free tier, this is still calculated so we know how much we are giving away.
pub compute_unit_cost: Decimal,
}
#[derive(Clone, Debug, From, Hash, PartialEq, Eq)]
@ -67,9 +72,9 @@ pub struct RpcQueryKey {
archive_needed: bool,
/// true if the response was some sort of JSONRPC error.
error_response: bool,
/// method tracking is opt-in.
method: Option<String>,
/// origin tracking is opt-in.
/// the rpc method used.
method: Cow<'static, str>,
/// origin tracking was opt-in. Now it is "None"
origin: Option<Origin>,
/// None if the public url was used.
rpc_secret_key_id: Option<NonZeroU64>,
@ -192,7 +197,7 @@ impl BufferedRpcQueryStats {
self.sum_request_bytes += stat.request_bytes;
self.sum_response_bytes += stat.response_bytes;
self.sum_response_millis += stat.response_millis;
self.sum_credits_used += stat.credits_used;
self.sum_credits_used += stat.compute_unit_cost;
}
async fn _save_db_stats(
@ -714,9 +719,7 @@ impl BufferedRpcQueryStats {
builder = builder.tag("rpc_secret_key_id", rpc_secret_key_id.to_string());
}
if let Some(method) = key.method {
builder = builder.tag("method", method);
}
builder = builder.tag("method", key.method);
// Read the latest balance ...
let remaining = self.latest_balance.read().remaining();
@ -803,83 +806,35 @@ impl TryFrom<RequestMetadata> for RpcQueryStats {
x => x,
};
let method = metadata.method.take();
let method = metadata.method.clone();
let chain_id = metadata.chain_id;
let credits_used = Self::compute_cost(
request_bytes,
response_bytes,
backend_rpcs_used.is_empty(),
method.as_deref(),
);
let cu = ComputeUnit::new(&method, metadata.chain_id);
// TODO: get from config? a helper function? how should we pick this?
let usd_per_cu = match chain_id {
137 => Decimal::from_str("0.000000692307692307"),
_ => Decimal::from_str("0.000000692307692307"),
}?;
let cache_hit = !backend_rpcs_used.is_empty();
let compute_unit_cost = cu.cost(cache_hit, usd_per_cu);
let x = Self {
authorization,
archive_request,
method,
authorization,
backend_rpcs_used,
request_bytes,
chain_id,
compute_unit_cost,
error_response,
method,
request_bytes,
response_bytes,
response_millis,
response_timestamp,
credits_used,
};
Ok(x)
}
}
impl RpcQueryStats {
    /// Price a single request from the number of bytes it moved.
    ///
    /// A fixed list of methods is always free. Every other request (including
    /// one with no method set) pays a minimum charge plus a per-byte charge on
    /// the combined request and response bytes beyond the first 1024.
    /// When `cache_hit` is true the total is halved.
    pub fn compute_cost(
        request_bytes: u64,
        response_bytes: u64,
        cache_hit: bool,
        method: Option<&str>,
    ) -> Decimal {
        // some methods should be free. a missing method (should be uncommon) is billed normally
        // TODO: get this list from config (and add more to it)
        let is_free = matches!(
            method,
            Some(
                "eth_chainId"
                    | "eth_syncing"
                    | "eth_protocolVersion"
                    | "net_version"
                    | "net_listening"
            )
        );
        if is_free {
            return Decimal::from(0);
        }

        // TODO: finalize cost calculation
        // TODO: get cost_minimum, cost_free_bytes, cost_per_byte, cache_hit_divisor from config. each chain will be different

        // every request pays at least $0.000018 / credits
        let cost_minimum = Decimal::new(18, 6);

        // the first 1kb of each call is included in the minimum
        let cost_free_bytes: u64 = 1024;

        // beyond that, each byte costs $0.000000006 / credits
        // amazon charges $.09/GB outbound
        // but we also have to cover our RAM and expensive nics on the servers (haproxy/web3-proxy/blockchains)
        let cost_per_byte = Decimal::new(6, 9);

        let billable_bytes = (request_bytes + response_bytes).saturating_sub(cost_free_bytes);
        let full_price = cost_minimum + cost_per_byte * Decimal::from(billable_bytes);

        if cache_hit {
            // cache hits get a 50% discount
            full_price / Decimal::from(2)
        } else {
            full_price
        }
    }
}