quick lru cache

This commit is contained in:
Bryan Stitt 2022-05-05 19:07:09 +00:00
parent bc91bd1c6f
commit 89ced9b152
16 changed files with 5558 additions and 4 deletions

8
.gitignore vendored
View File

@ -1,5 +1,5 @@
/data/config/*.toml
/flamegraph.svg
/perf.data
/perf.data.old
/web3-proxy/data/config/*.toml
flamegraph.svg
perf.data
perf.data.old
/target

18
linkedhashmap/Cargo.toml Normal file
View File

@ -0,0 +1,18 @@
[package]
name = "linkedhashmap"
version = "0.2.0"
authors = ["quininer <quininer@live.com>"]
edition = "2018"
[dependencies]
slab = "0.4.6"
hashbrown = "0.12.1"
[dev-dependencies]
criterion = "0.3.5"
hashlink = "0.8.0"
linked-hash-map = "0.5.4"
[[bench]]
name = "lru"
harness = false

1
linkedhashmap/LICENSE Normal file
View File

@ -0,0 +1 @@
https://github.com/quininer/linkedhashmap

View File

@ -0,0 +1,68 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use std::collections::hash_map;
struct Bar([u64; 4]);
const CAP: usize = 128;
fn bench_linkedhsahmap(c: &mut Criterion) {
use linkedhashmap::LinkedHashMap;
c.bench_function("linkedhashmap", |b| {
let mut map = LinkedHashMap::with_capacity_and_hasher(CAP, hash_map::RandomState::new());
let mut count = 0;
b.iter(|| {
count += 1;
let bar = black_box(Bar([0x42; 4]));
map.insert(count, bar);
if map.len() >= CAP {
map.pop_front();
}
});
});
}
fn bench_hashlink(c: &mut Criterion) {
use hashlink::LinkedHashMap;
c.bench_function("hashlink", |b| {
let mut map = LinkedHashMap::with_capacity_and_hasher(CAP, hash_map::RandomState::new());
let mut count = 0;
b.iter(|| {
count += 1;
let bar = black_box(Bar([0x42; 4]));
map.insert(count, bar);
if map.len() >= CAP {
map.pop_front();
}
});
});
}
fn bench_linked_hash_map(c: &mut Criterion) {
use linked_hash_map::LinkedHashMap;
c.bench_function("linked-hash-map", |b| {
let mut map = LinkedHashMap::with_capacity_and_hasher(CAP, hash_map::RandomState::new());
let mut count = 0;
b.iter(|| {
count += 1;
let bar = black_box(Bar([0x42; 4]));
map.insert(count, bar);
if map.len() >= CAP {
map.pop_front();
}
});
});
}
criterion_group!(
lru,
bench_linkedhsahmap,
bench_hashlink,
bench_linked_hash_map
);
criterion_main!(lru);

191
linkedhashmap/src/lib.rs Normal file
View File

@ -0,0 +1,191 @@
pub mod linkedlist;
use hashbrown::HashMap;
use linkedlist::{LinkedList, NodeSlab};
use std::borrow::Borrow;
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hash};
pub struct LinkedHashMap<K, V, S = RandomState> {
slab: NodeSlab<(K, V)>,
list: LinkedList,
map: HashMap<K, usize, S>,
}
impl<K, V> LinkedHashMap<K, V>
where
K: Hash + Eq + Clone,
{
#[inline]
pub fn new() -> LinkedHashMap<K, V> {
LinkedHashMap {
slab: NodeSlab::new(),
list: LinkedList::new(),
map: HashMap::with_capacity_and_hasher(0, RandomState::default()),
}
}
#[inline]
pub fn with_capacity(cap: usize) -> LinkedHashMap<K, V> {
LinkedHashMap {
slab: NodeSlab::with_capacity(cap),
list: LinkedList::new(),
map: HashMap::with_capacity_and_hasher(cap, RandomState::default()),
}
}
}
impl<K, V, S> Default for LinkedHashMap<K, V, S>
where
K: Hash + Eq + Clone,
S: BuildHasher + Default,
{
#[inline]
fn default() -> Self {
Self::with_hasher(S::default())
}
}
impl<K, V, S> LinkedHashMap<K, V, S>
where
K: Hash + Eq + Clone,
S: BuildHasher,
{
pub fn with_hasher(hash_builder: S) -> Self {
LinkedHashMap {
slab: NodeSlab::new(),
list: LinkedList::new(),
map: HashMap::with_hasher(hash_builder),
}
}
pub fn with_capacity_and_hasher(capacity: usize, hash_builder: S) -> Self {
LinkedHashMap {
slab: NodeSlab::with_capacity(capacity),
list: LinkedList::new(),
map: HashMap::with_capacity_and_hasher(capacity, hash_builder),
}
}
#[inline]
pub fn len(&self) -> usize {
self.map.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.map.is_empty()
}
#[inline]
pub fn reserve(&mut self, additional: usize) {
self.slab.reserve(additional);
self.map.reserve(additional);
}
#[inline]
pub fn shrink_to_fit(&mut self) {
self.slab.shrink_to_fit();
self.map.shrink_to_fit();
}
#[inline]
pub fn clear(&mut self) {
self.slab.clear();
self.list = LinkedList::new();
self.map.clear();
}
#[inline]
pub fn insert(&mut self, key: K, value: V) -> Option<V> {
let index = self.list.push(&mut self.slab, (key.clone(), value));
let index = self.map.insert(key, index)?;
let (_, value) = self.list.remove(&mut self.slab, index)?;
Some(value)
}
#[inline]
pub fn get<Q>(&self, key: &Q) -> Option<&V>
where
K: Borrow<Q>,
Q: Hash + Eq,
{
let index = *self.map.get(key)?;
let (_, value) = self.slab.get(index)?;
Some(value)
}
#[inline]
pub fn get_mut<Q>(&mut self, key: &Q) -> Option<&mut V>
where
K: Borrow<Q>,
Q: Hash + Eq,
{
let index = *self.map.get(key)?;
let (_, value) = self.slab.get_mut(index)?;
Some(value)
}
#[inline]
pub fn touch<Q>(&mut self, key: &Q) -> Option<&mut V>
where
K: Borrow<Q>,
Q: Hash + Eq,
{
let index = *self.map.get(key)?;
let (_, value) = self.list.touch(&mut self.slab, index)?;
Some(value)
}
#[inline]
pub fn remove<Q>(&mut self, key: &Q) -> Option<V>
where
K: Borrow<Q>,
Q: Hash + Eq,
{
let index = self.map.remove(key)?;
let (_, value) = self.list.remove(&mut self.slab, index)?;
Some(value)
}
#[inline]
pub fn pop_front(&mut self) -> Option<(K, V)> {
let (k, v) = self.list.pop_front(&mut self.slab)?;
self.map.remove(&k)?;
Some((k, v))
}
#[inline]
pub fn pop_last(&mut self) -> Option<(K, V)> {
let (k, v) = self.list.pop_last(&mut self.slab)?;
self.map.remove(&k)?;
Some((k, v))
}
}
#[test]
fn test_linkedhashmap() {
#[derive(PartialEq, Eq, Debug)]
struct Bar(u64);
let mut map = LinkedHashMap::new();
map.insert(0, Bar(0));
map.insert(3, Bar(3));
map.insert(2, Bar(2));
map.insert(1, Bar(1));
assert_eq!(4, map.len());
assert_eq!(Some(&Bar(2)), map.get(&2));
assert_eq!(Some(&mut Bar(3)), map.touch(&3));
assert_eq!(Some((0, Bar(0))), map.pop_front());
assert_eq!(Some((3, Bar(3))), map.pop_last());
assert_eq!(Some((1, Bar(1))), map.pop_last());
assert_eq!(1, map.len());
assert_eq!(Some(&mut Bar(2)), map.get_mut(&2));
map.clear();
assert_eq!(0, map.len());
}

View File

@ -0,0 +1,249 @@
use slab::Slab;
#[derive(Default)]
pub struct NodeSlab<T>(Slab<Node<T>>);
#[derive(Default, Debug)]
pub struct LinkedList {
start: Option<usize>,
end: Option<usize>,
}
pub struct Node<T> {
value: T,
prev: Option<usize>,
next: Option<usize>,
}
impl LinkedList {
pub const fn new() -> LinkedList {
LinkedList {
start: None,
end: None,
}
}
#[cfg_attr(feature = "inline-more", inline(always))]
pub fn push<T>(&mut self, NodeSlab(slab): &mut NodeSlab<T>, value: T) -> usize {
let index = slab.insert(Node {
value,
prev: self.end.or(self.start),
next: None,
});
if let Some(old_end) = self.end.replace(index) {
slab[old_end].next = Some(index);
} else {
self.start = Some(index);
}
index
}
#[cfg_attr(feature = "inline-more", inline(always))]
pub fn pop_front<T>(&mut self, NodeSlab(slab): &mut NodeSlab<T>) -> Option<T> {
let index = self.start?;
let node = slab.remove(index);
debug_assert!(node.prev.is_none());
self.start = node.next;
if let Some(index) = self.start {
slab[index].prev.take();
}
if Some(index) == self.end {
self.end.take();
}
Some(node.value)
}
#[cfg_attr(feature = "inline-more", inline(always))]
pub fn pop_last<T>(&mut self, NodeSlab(slab): &mut NodeSlab<T>) -> Option<T> {
let index = self.end?;
let node = slab.remove(index);
debug_assert!(node.next.is_none());
self.end = node.prev;
if let Some(index) = self.end {
slab[index].next.take();
}
if Some(index) == self.start {
self.start.take();
}
Some(node.value)
}
#[cfg_attr(feature = "inline-more", inline(always))]
pub fn touch<'a, T>(
&mut self,
NodeSlab(slab): &'a mut NodeSlab<T>,
index: usize,
) -> Option<&'a mut T> {
let (node_prev, node_next) = {
let node = slab.get(index)?;
(node.prev, node.next)
};
if let Some(next) = node_next {
slab[next].prev = node_prev;
} else {
debug_assert_eq!(Some(index), self.end);
return Some(&mut slab[index].value);
}
if let Some(prev) = node_prev {
slab[prev].next = node_next;
} else {
self.start = node_next;
}
let end = self.end.replace(index)?;
slab[end].next = Some(index);
let node = &mut slab[index];
node.prev = Some(end);
node.next.take();
Some(&mut node.value)
}
#[cfg_attr(feature = "inline-more", inline(always))]
pub fn remove<T>(&mut self, NodeSlab(slab): &mut NodeSlab<T>, index: usize) -> Option<T> {
let node = if slab.contains(index) {
// why not return Option :(
slab.remove(index)
} else {
return None;
};
if let Some(prev) = node.prev {
slab[prev].next = node.next;
} else {
self.start = node.next;
}
if let Some(next) = node.next {
slab[next].prev = node.prev;
} else {
self.end = node.prev;
}
Some(node.value)
}
}
impl<T> NodeSlab<T> {
#[inline]
pub fn new() -> NodeSlab<T> {
NodeSlab(Slab::new())
}
#[inline]
pub fn with_capacity(cap: usize) -> NodeSlab<T> {
NodeSlab(Slab::with_capacity(cap))
}
#[inline]
pub fn get(&self, index: usize) -> Option<&T> {
Some(&self.0.get(index)?.value)
}
#[inline]
pub fn get_mut(&mut self, index: usize) -> Option<&mut T> {
Some(&mut self.0.get_mut(index)?.value)
}
#[inline]
pub fn reserve(&mut self, additional: usize) {
self.0.reserve(additional);
}
#[inline]
pub fn shrink_to_fit(&mut self) {
self.0.shrink_to_fit();
}
#[inline]
pub(crate) fn clear(&mut self) {
self.0.clear();
}
}
#[test]
fn test_linkedlist() {
let mut slab = NodeSlab::new();
let mut list = LinkedList::new();
list.push(&mut slab, 0);
assert_eq!(Some(0), list.pop_front(&mut slab));
assert_eq!(None, list.pop_front(&mut slab));
list.push(&mut slab, 1);
assert_eq!(Some(1), list.pop_last(&mut slab));
assert_eq!(None, list.pop_last(&mut slab));
list.push(&mut slab, 2);
list.push(&mut slab, 3);
assert_eq!(Some(2), list.pop_front(&mut slab));
assert_eq!(Some(3), list.pop_last(&mut slab));
eprintln!("{:?}", list);
assert_eq!(None, list.pop_front(&mut slab));
eprintln!("{:?}", list);
assert_eq!(None, list.pop_last(&mut slab));
list.push(&mut slab, 4);
list.push(&mut slab, 5);
assert_eq!(Some(5), list.pop_last(&mut slab));
assert_eq!(Some(4), list.pop_front(&mut slab));
assert_eq!(None, list.pop_last(&mut slab));
assert_eq!(None, list.pop_front(&mut slab));
let index6 = list.push(&mut slab, 6);
let index7 = list.push(&mut slab, 7);
let index8 = list.push(&mut slab, 8);
assert_eq!(Some(7), list.remove(&mut slab, index7));
assert_eq!(None, list.remove(&mut slab, index7));
assert_eq!(Some(&6), slab.get(index6));
assert_eq!(Some(&8), slab.get(index8));
assert_eq!(Some(6), list.pop_front(&mut slab));
assert_eq!(Some(8), list.pop_front(&mut slab));
let index9 = list.push(&mut slab, 9);
list.push(&mut slab, 10);
assert_eq!(Some(&mut 9), list.touch(&mut slab, index9));
assert_eq!(Some(10), list.pop_front(&mut slab));
assert_eq!(Some(9), list.pop_front(&mut slab));
let index11 = list.push(&mut slab, 11);
let index12 = list.push(&mut slab, 12);
list.push(&mut slab, 13);
assert_eq!(Some(&mut 12), list.touch(&mut slab, index12));
assert_eq!(Some(&mut 11), list.touch(&mut slab, index11));
assert_eq!(Some(13), list.pop_front(&mut slab));
assert_eq!(Some(12), list.pop_front(&mut slab));
assert_eq!(Some(11), list.pop_front(&mut slab));
for i in 0..32 {
list.push(&mut slab, i);
}
for i in 0..32 {
assert_eq!(Some(i), list.pop_front(&mut slab));
}
assert_eq!(None, list.pop_front(&mut slab));
for i in 0..32 {
list.push(&mut slab, i);
}
for i in (0..32).rev() {
assert_eq!(Some(i), list.pop_last(&mut slab));
}
assert_eq!(None, list.pop_last(&mut slab));
}

3945
web3-proxy/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

30
web3-proxy/Cargo.toml Normal file
View File

@ -0,0 +1,30 @@
[package]
name = "web3-proxy"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
arc-swap = "1.5.0"
argh = "0.1.7"
anyhow = "1.0.57"
derive_more = "0.99.17"
ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] }
flume = "0.10.12"
futures = { version = "0.3.21", features = ["thread-pool"] }
hashbrown = "0.12.1"
governor = { version = "0.4.2", features = ["dashmap", "std"] }
linkedhashmap = { path = "../linkedhashmap" }
tokio = { version = "1.18.1", features = ["full"] }
parking_lot = "0.12.0"
regex = "1.5.5"
reqwest = { version = "0.11.10", features = ["json", "rustls"] }
rustc-hash = "1.1.0"
serde = { version = "1.0.137", features = [] }
serde_json = { version = "1.0.81", default-features = false, features = ["alloc"] }
toml = "0.5.9"
tracing = "0.1.34"
tracing-subscriber = "0.3.11"
url = "2.2.2"
warp = "0.3.2"

View File

@ -0,0 +1,3 @@
wrk.method = "POST"
wrk.body = "{\"jsonrpc\":\"2.0\",\"method\":\"eth_blockNumber\",\"params\":[],\"id\":420}"
wrk.headers["Content-Type"] = "application/json"

View File

@ -0,0 +1,3 @@
wrk.method = "POST"
wrk.body = "{\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\"latest\", false],\"id\":420}"
wrk.headers["Content-Type"] = "application/json"

View File

@ -0,0 +1,29 @@
/// subscribe to a websocket rpc
use ethers::prelude::*;
use std::time::Duration;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// erigon
let url = "ws://10.11.12.16:8545";
// geth
// let url = "ws://10.11.12.16:8946";
println!("Subscribing to blocks from {}", url);
let provider = Ws::connect(url).await?;
let provider = Provider::new(provider).interval(Duration::from_secs(1));
let mut stream = provider.subscribe_blocks().await?.take(3);
while let Some(block) = stream.next().await {
println!(
"{:?} = Ts: {:?}, block number: {}",
block.hash.unwrap(),
block.timestamp,
block.number.unwrap(),
);
}
Ok(())
}

View File

@ -0,0 +1,30 @@
/// poll an http rpc
use ethers::prelude::*;
use std::{str::FromStr, time::Duration};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// erigon does not support most filters
// let url = "http://10.11.12.16:8545";
// geth
let url = "http://10.11.12.16:8945";
println!("Watching blocks from {:?}", url);
let provider = Http::from_str(url)?;
let provider = Provider::new(provider).interval(Duration::from_secs(1));
let mut stream = provider.watch_blocks().await?.take(3);
while let Some(block_number) = stream.next().await {
let block = provider.get_block(block_number).await?.unwrap();
println!(
"{:?} = Ts: {:?}, block number: {}",
block.hash.unwrap(),
block.timestamp,
block.number.unwrap(),
);
}
Ok(())
}

58
web3-proxy/src/config.rs Normal file
View File

@ -0,0 +1,58 @@
use governor::clock::QuantaClock;
use serde::Deserialize;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use crate::connection::Web3Connection;
use crate::Web3ProxyApp;
#[derive(Deserialize)]
pub struct RootConfig {
pub config: Web3ProxyConfig,
// BTreeMap so that iterating keeps the same order
pub balanced_rpc_tiers: BTreeMap<String, HashMap<String, Web3ConnectionConfig>>,
pub private_rpcs: HashMap<String, Web3ConnectionConfig>,
}
#[derive(Deserialize)]
pub struct Web3ProxyConfig {
pub listen_port: u16,
}
#[derive(Deserialize)]
pub struct Web3ConnectionConfig {
url: String,
soft_limit: u32,
hard_limit: Option<u32>,
}
impl RootConfig {
pub async fn try_build(self) -> anyhow::Result<Web3ProxyApp> {
let balanced_rpc_tiers = self
.balanced_rpc_tiers
.into_values()
.map(|x| x.into_values().collect())
.collect();
let private_rpcs = self.private_rpcs.into_values().collect();
Web3ProxyApp::try_new(balanced_rpc_tiers, private_rpcs).await
}
}
impl Web3ConnectionConfig {
pub async fn try_build(
self,
clock: &QuantaClock,
http_client: Option<reqwest::Client>,
) -> anyhow::Result<Arc<Web3Connection>> {
Web3Connection::try_new(
self.url,
http_client,
self.hard_limit,
Some(clock),
self.soft_limit,
)
.await
.map(Arc::new)
}
}

View File

@ -0,0 +1,310 @@
///! Communicate with a web3 provider
use derive_more::From;
use ethers::prelude::Middleware;
use futures::StreamExt;
use governor::clock::{QuantaClock, QuantaInstant};
use governor::middleware::NoOpMiddleware;
use governor::state::{InMemoryState, NotKeyed};
use governor::NotUntil;
use governor::RateLimiter;
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use std::fmt;
use std::num::NonZeroU32;
use std::sync::atomic::{self, AtomicU32, AtomicU64};
use std::time::Duration;
use std::{cmp::Ordering, sync::Arc};
use tokio::time::interval;
use tracing::{info, warn};
use crate::connections::Web3Connections;
type Web3RateLimiter =
RateLimiter<NotKeyed, InMemoryState, QuantaClock, NoOpMiddleware<QuantaInstant>>;
/// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit https://github.com/gakonst/ethers-rs/issues/592
#[derive(From)]
pub enum Web3Provider {
Http(ethers::providers::Provider<ethers::providers::Http>),
Ws(ethers::providers::Provider<ethers::providers::Ws>),
}
impl fmt::Debug for Web3Provider {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default Debug takes forever to write. this is too quiet though. we at least need the url
f.debug_struct("Web3Provider").finish_non_exhaustive()
}
}
/// An active connection to a Web3Rpc
pub struct Web3Connection {
/// TODO: can we get this from the provider? do we even need it?
url: String,
/// keep track of currently open requests. We sort on this
active_requests: AtomicU32,
provider: Web3Provider,
ratelimiter: Option<Web3RateLimiter>,
/// used for load balancing to the least loaded server
soft_limit: u32,
head_block_number: AtomicU64,
}
impl fmt::Debug for Web3Connection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Web3Connection")
.field("url", &self.url)
.finish_non_exhaustive()
}
}
impl fmt::Display for Web3Connection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", &self.url)
}
}
impl Web3Connection {
/// Connect to a web3 rpc and subscribe to new heads
pub async fn try_new(
url_str: String,
http_client: Option<reqwest::Client>,
hard_rate_limit: Option<u32>,
clock: Option<&QuantaClock>,
// TODO: think more about this type
soft_limit: u32,
) -> anyhow::Result<Web3Connection> {
let hard_rate_limiter = if let Some(hard_rate_limit) = hard_rate_limit {
let quota = governor::Quota::per_second(NonZeroU32::new(hard_rate_limit).unwrap());
let rate_limiter = governor::RateLimiter::direct_with_clock(quota, clock.unwrap());
Some(rate_limiter)
} else {
None
};
let provider = if url_str.starts_with("http") {
let url: url::Url = url_str.parse()?;
let http_client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?;
let provider = ethers::providers::Http::new_with_client(url, http_client);
// TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592)
ethers::providers::Provider::new(provider)
.interval(Duration::from_secs(1))
.into()
} else if url_str.starts_with("ws") {
let provider = ethers::providers::Ws::connect(url_str.clone()).await?;
// TODO: make sure this automatically reconnects
// TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592)
ethers::providers::Provider::new(provider)
.interval(Duration::from_secs(1))
.into()
} else {
return Err(anyhow::anyhow!("only http and ws servers are supported"));
};
Ok(Web3Connection {
url: url_str.clone(),
active_requests: Default::default(),
provider,
ratelimiter: hard_rate_limiter,
soft_limit,
head_block_number: 0.into(),
})
}
pub fn active_requests(&self) -> u32 {
self.active_requests.load(atomic::Ordering::Acquire)
}
pub fn url(&self) -> &str {
&self.url
}
/// Subscribe to new blocks
// #[instrument]
pub async fn new_heads(
self: Arc<Self>,
connections: Option<Arc<Web3Connections>>,
) -> anyhow::Result<()> {
info!("Watching new_heads on {}", self);
match &self.provider {
Web3Provider::Http(provider) => {
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
// TODO: what should this interval be? probably some fraction of block time
// TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
let mut interval = interval(Duration::from_secs(2));
loop {
// wait for the interval
interval.tick().await;
let block_number = provider.get_block_number().await.map(|x| x.as_u64())?;
// TODO: only store if this isn't already stored?
// TODO: also send something to the provider_tier so it can sort?
let old_block_number = self
.head_block_number
.swap(block_number, atomic::Ordering::AcqRel);
if old_block_number != block_number {
info!("new block on {}: {}", self, block_number);
if let Some(connections) = &connections {
connections.update_synced_rpcs(&self, block_number)?;
}
}
}
}
Web3Provider::Ws(provider) => {
// TODO: automatically reconnect?
// TODO: it would be faster to get the block number, but subscriptions don't provide that
// TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out?
let mut stream = provider.subscribe_blocks().await?;
// query the block once since the subscription doesn't send the current block
// there is a very small race condition here where the stream could send us a new block right now
// all it does is print "new block" for the same block as current block
let block_number = provider.get_block_number().await.map(|x| x.as_u64())?;
info!("current block on {}: {}", self, block_number);
self.head_block_number
.store(block_number, atomic::Ordering::Release);
if let Some(connections) = &connections {
connections.update_synced_rpcs(&self, block_number)?;
}
while let Some(block) = stream.next().await {
let block_number = block.number.unwrap().as_u64();
// TODO: only store if this isn't already stored?
// TODO: also send something to the provider_tier so it can sort?
// TODO: do we need this old block number check? its helpful on http, but here it shouldn't dupe except maybe on the first run
self.head_block_number
.store(block_number, atomic::Ordering::Release);
info!("new block on {}: {}", self, block_number);
if let Some(connections) = &connections {
connections.update_synced_rpcs(&self, block_number)?;
}
}
}
}
info!("Done watching new_heads on {}", self);
Ok(())
}
/// Send a web3 request
pub async fn request(
&self,
method: &str,
params: &serde_json::value::RawValue,
) -> Result<JsonRpcForwardedResponse, ethers::prelude::ProviderError> {
match &self.provider {
Web3Provider::Http(provider) => provider.request(method, params).await,
Web3Provider::Ws(provider) => provider.request(method, params).await,
}
}
pub fn try_inc_active_requests(&self) -> Result<(), NotUntil<QuantaInstant>> {
// check rate limits
if let Some(ratelimiter) = self.ratelimiter.as_ref() {
match ratelimiter.check() {
Ok(_) => {
// rate limit succeeded
}
Err(not_until) => {
// rate limit failed
// save the smallest not_until. if nothing succeeds, return an Err with not_until in it
// TODO: use tracing better
warn!("Exhausted rate limit on {:?}: {}", self, not_until);
return Err(not_until);
}
}
};
// TODO: what ordering?!
self.active_requests.fetch_add(1, atomic::Ordering::AcqRel);
Ok(())
}
pub fn dec_active_requests(&self) {
// TODO: what ordering?!
self.active_requests.fetch_sub(1, atomic::Ordering::AcqRel);
}
}
impl Eq for Web3Connection {}
impl Ord for Web3Connection {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// TODO: what atomic ordering?!
let a = self.active_requests.load(atomic::Ordering::Acquire);
let b = other.active_requests.load(atomic::Ordering::Acquire);
// TODO: how should we include the soft limit? floats are slower than integer math
let a = a as f32 / self.soft_limit as f32;
let b = b as f32 / other.soft_limit as f32;
a.partial_cmp(&b).unwrap()
}
}
impl PartialOrd for Web3Connection {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
/// note that this is just comparing the active requests. two providers with different rpc urls are equal!
impl PartialEq for Web3Connection {
fn eq(&self, other: &Self) -> bool {
// TODO: what ordering?!
self.active_requests.load(atomic::Ordering::Acquire)
== other.active_requests.load(atomic::Ordering::Acquire)
}
}
#[derive(Clone, Deserialize)]
pub struct JsonRpcRequest {
pub id: Box<RawValue>,
pub method: String,
pub params: Box<RawValue>,
}
impl fmt::Debug for JsonRpcRequest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
f.debug_struct("JsonRpcRequest")
.field("id", &self.id)
.finish_non_exhaustive()
}
}
// TODO: check for errors too!
#[derive(Clone, Deserialize, Serialize)]
pub struct JsonRpcForwardedResponse {
pub id: Box<RawValue>,
pub result: Box<RawValue>,
}
impl fmt::Debug for JsonRpcForwardedResponse {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
f.debug_struct("JsonRpcForwardedResponse")
.field("id", &self.id)
.finish_non_exhaustive()
}
}

View File

@ -0,0 +1,319 @@
///! Communicate with a group of web3 providers
use arc_swap::ArcSwap;
use derive_more::From;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use governor::clock::{QuantaClock, QuantaInstant};
use governor::NotUntil;
use hashbrown::HashMap;
use serde_json::value::RawValue;
use std::cmp;
use std::fmt;
use std::sync::Arc;
use tracing::warn;
use crate::config::Web3ConnectionConfig;
use crate::connection::{JsonRpcForwardedResponse, Web3Connection};
#[derive(Clone, Default)]
struct SyncedConnections {
head_block_number: u64,
inner: Vec<usize>,
}
impl fmt::Debug for SyncedConnections {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
f.debug_struct("SyncedConnections").finish_non_exhaustive()
}
}
impl SyncedConnections {
fn new(max_connections: usize) -> Self {
let inner = Vec::with_capacity(max_connections);
Self {
head_block_number: 0,
inner,
}
}
}
/// A collection of web3 connections. Sends requests either the current best server or all servers.
#[derive(From)]
pub struct Web3Connections {
inner: Vec<Arc<Web3Connection>>,
/// TODO: what is the best type for this? Heavy reads with writes every few seconds. When writes happen, there is a burst of them
/// TODO: arcswap was a lot faster, but i think we need a lock for proper logic
synced_connections: ArcSwap<SyncedConnections>,
}
impl fmt::Debug for Web3Connections {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
f.debug_struct("Web3Connections")
.field("inner", &self.inner)
.finish_non_exhaustive()
}
}
impl Web3Connections {
pub async fn try_new(
// TODO: servers should be a Web3ConnectionBuilder struct
servers: Vec<Web3ConnectionConfig>,
http_client: Option<reqwest::Client>,
clock: &QuantaClock,
) -> anyhow::Result<Arc<Self>> {
let mut connections = vec![];
let num_connections = servers.len();
for server_config in servers.into_iter() {
let connection = server_config.try_build(clock, http_client.clone()).await?;
connections.push(connection);
}
let connections = Arc::new(Self {
inner: connections,
synced_connections: ArcSwap::new(Arc::new(SyncedConnections::new(num_connections))),
});
for connection in connections.inner.iter() {
// subscribe to new heads in a spawned future
// TODO: channel instead. then we can have one future with write access to a left-right
let connection = Arc::clone(connection);
let connections = connections.clone();
tokio::spawn(async move {
// TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date
if let Err(e) = connection.new_heads(Some(connections)).await {
warn!("new_heads error: {:?}", e);
}
});
}
Ok(connections)
}
pub fn head_block_number(&self) -> u64 {
self.synced_connections.load().head_block_number
}
pub async fn try_send_request(
&self,
connection: &Web3Connection,
method: &str,
params: &RawValue,
) -> anyhow::Result<JsonRpcForwardedResponse> {
// connection.in_active_requests was called when this rpc was selected
let response = connection.request(method, params).await;
connection.dec_active_requests();
// TODO: if "no block with that header" or some other jsonrpc errors, skip this response
response.map_err(Into::into)
}
pub async fn try_send_requests(
self: Arc<Self>,
connections: Vec<Arc<Web3Connection>>,
method: String,
params: Box<RawValue>,
response_sender: flume::Sender<anyhow::Result<JsonRpcForwardedResponse>>,
) -> anyhow::Result<()> {
let mut unordered_futures = FuturesUnordered::new();
for connection in connections {
// clone things so we can pass them to a future
let connections = self.clone();
let method = method.clone();
let params = params.clone();
let response_sender = response_sender.clone();
let handle = tokio::spawn(async move {
// get the client for this rpc server
let response = connections
.try_send_request(connection.as_ref(), &method, &params)
.await?;
// send the first good response to a one shot channel. that way we respond quickly
// drop the result because errors are expected after the first send
response_sender.send(Ok(response)).map_err(Into::into)
});
unordered_futures.push(handle);
}
// TODO: use iterators instead of pushing into a vec
let mut errs = vec![];
if let Some(x) = unordered_futures.next().await {
match x.unwrap() {
Ok(_) => {}
Err(e) => {
// TODO: better errors
warn!("Got an error sending request: {}", e);
errs.push(e);
}
}
}
// get the first error (if any)
// TODO: why collect multiple errors if we only pop one?
let e = if !errs.is_empty() {
Err(errs.pop().unwrap())
} else {
Err(anyhow::anyhow!("no successful responses"))
};
// send the error to the channel
if response_sender.send(e).is_ok() {
// if we were able to send an error, then we never sent a success
return Err(anyhow::anyhow!("no successful responses"));
} else {
// if sending the error failed. the other side must be closed (which means we sent a success earlier)
Ok(())
}
}
pub fn update_synced_rpcs(
&self,
rpc: &Arc<Web3Connection>,
new_block: u64,
) -> anyhow::Result<()> {
// TODO: try a left_right instead of an ArcSwap.
let synced_connections = self.synced_connections.load();
// should we load new_block here?
let mut new_synced_connections: SyncedConnections =
match synced_connections.head_block_number.cmp(&new_block) {
cmp::Ordering::Equal => {
// this rpc is synced, but it isn't the first to this block
(**synced_connections).to_owned()
}
cmp::Ordering::Less => {
// this is a new head block. clear the current synced connections
// TODO: this is too verbose with a bunch of tiers. include the tier
// info!("new head block from {:?}: {}", rpc, new_block);
let mut new_synced_connections = SyncedConnections::new(self.inner.len());
// synced_connections.inner.clear();
new_synced_connections.head_block_number = new_block;
new_synced_connections
}
cmp::Ordering::Greater => {
// not the latest block. return now
return Ok(());
}
};
let rpc_index = self
.inner
.iter()
.position(|x| x.url() == rpc.url())
.unwrap();
new_synced_connections.inner.push(rpc_index);
self.synced_connections
.swap(Arc::new(new_synced_connections));
Ok(())
}
/// get the best available rpc server
pub async fn next_upstream_server(
&self,
) -> Result<Arc<Web3Connection>, Option<NotUntil<QuantaInstant>>> {
let mut earliest_not_until = None;
// TODO: this clone is probably not the best way to do this
let mut synced_rpc_indexes = self.synced_connections.load().inner.clone();
let cache: HashMap<usize, u32> = synced_rpc_indexes
.iter()
.map(|synced_index| {
(
*synced_index,
self.inner.get(*synced_index).unwrap().active_requests(),
)
})
.collect();
// TODO: i think we might need to load active connections and then
synced_rpc_indexes.sort_unstable_by(|a, b| {
let a = cache.get(a).unwrap();
let b = cache.get(b).unwrap();
a.cmp(b)
});
for selected_rpc in synced_rpc_indexes.into_iter() {
let selected_rpc = self.inner.get(selected_rpc).unwrap();
// increment our connection counter
if let Err(not_until) = selected_rpc.try_inc_active_requests() {
earliest_possible(&mut earliest_not_until, not_until);
continue;
}
// return the selected RPC
return Ok(selected_rpc.clone());
}
// this might be None
Err(earliest_not_until)
}
/// get all rpc servers that are not rate limited
/// even fetches if they aren't in sync. This is useful for broadcasting signed transactions
pub fn get_upstream_servers(
&self,
) -> Result<Vec<Arc<Web3Connection>>, Option<NotUntil<QuantaInstant>>> {
let mut earliest_not_until = None;
// TODO: with capacity?
let mut selected_rpcs = vec![];
for connection in self.inner.iter() {
// check rate limits and increment our connection counter
if let Err(not_until) = connection.try_inc_active_requests() {
earliest_possible(&mut earliest_not_until, not_until);
// this rpc is not available. skip it
continue;
}
selected_rpcs.push(connection.clone());
}
if !selected_rpcs.is_empty() {
return Ok(selected_rpcs);
}
// return the earliest not_until (if no rpcs are synced, this will be None)
Err(earliest_not_until)
}
}
fn earliest_possible(
earliest_not_until_option: &mut Option<NotUntil<QuantaInstant>>,
new_not_until: NotUntil<QuantaInstant>,
) {
match earliest_not_until_option.as_ref() {
None => *earliest_not_until_option = Some(new_not_until),
Some(earliest_not_until) => {
let earliest_possible = earliest_not_until.earliest_possible();
let new_earliest_possible = new_not_until.earliest_possible();
if earliest_possible > new_earliest_possible {
*earliest_not_until_option = Some(new_not_until);
}
}
}
}

300
web3-proxy/src/main.rs Normal file
View File

@ -0,0 +1,300 @@
mod config;
mod connection;
mod connections;
use config::Web3ConnectionConfig;
use futures::future;
use governor::clock::{Clock, QuantaClock};
use linkedhashmap::LinkedHashMap;
use serde_json::json;
use std::fmt;
use std::fs;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::time::sleep;
use tracing::warn;
use warp::Filter;
use warp::Reply;
use crate::config::RootConfig;
use crate::connection::JsonRpcRequest;
use crate::connections::Web3Connections;
static APP_USER_AGENT: &str = concat!(
"satoshiandkin/",
env!("CARGO_PKG_NAME"),
"/",
env!("CARGO_PKG_VERSION"),
);
const RESPONSE_CACHE_CAP: usize = 128;
/// The application
// TODO: this debug impl is way too verbose. make something smaller
pub struct Web3ProxyApp {
/// clock used for rate limiting
/// TODO: use tokio's clock (will require a different ratelimiting crate)
clock: QuantaClock,
/// Send requests to the best server available
balanced_rpc_tiers: Vec<Arc<Web3Connections>>,
/// Send private requests (like eth_sendRawTransaction) to all these servers
private_rpcs: Option<Arc<Web3Connections>>,
/// TODO: these types are probably very bad keys and values. i couldn't get caching of warp::reply::Json to work
response_cache: RwLock<LinkedHashMap<(u64, String, String), serde_json::Value>>,
}
impl fmt::Debug for Web3ProxyApp {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
f.debug_struct("Web3ProxyApp").finish_non_exhaustive()
}
}
impl Web3ProxyApp {
async fn try_new(
balanced_rpc_tiers: Vec<Vec<Web3ConnectionConfig>>,
private_rpcs: Vec<Web3ConnectionConfig>,
) -> anyhow::Result<Web3ProxyApp> {
let clock = QuantaClock::default();
// make a http shared client
// TODO: how should we configure the connection pool?
// TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server
let http_client = reqwest::ClientBuilder::new()
.connect_timeout(Duration::from_secs(5))
.timeout(Duration::from_secs(300))
.user_agent(APP_USER_AGENT)
.build()?;
let balanced_rpc_tiers =
future::join_all(balanced_rpc_tiers.into_iter().map(|balanced_rpc_tier| {
Web3Connections::try_new(balanced_rpc_tier, Some(http_client.clone()), &clock)
}))
.await
.into_iter()
.collect::<anyhow::Result<Vec<Arc<Web3Connections>>>>()?;
let private_rpcs = if private_rpcs.is_empty() {
warn!("No private relays configured. Any transactions will be broadcast to the public mempool!");
// TODO: instead of None, set it to a list of all the rpcs from balanced_rpc_tiers. that way we broadcast very loudly
None
} else {
Some(Web3Connections::try_new(private_rpcs, Some(http_client), &clock).await?)
};
Ok(Web3ProxyApp {
clock,
balanced_rpc_tiers,
private_rpcs,
response_cache: Default::default(),
})
}
/// send the request to the approriate RPCs
/// TODO: dry this up
async fn proxy_web3_rpc(
self: Arc<Web3ProxyApp>,
json_body: JsonRpcRequest,
) -> anyhow::Result<impl warp::Reply> {
if self.private_rpcs.is_some() && json_body.method == "eth_sendRawTransaction" {
let private_rpcs = self.private_rpcs.as_ref().unwrap();
// there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs
loop {
// TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
match private_rpcs.get_upstream_servers() {
Ok(upstream_servers) => {
let (tx, rx) = flume::unbounded();
let connections = private_rpcs.clone();
let method = json_body.method.clone();
let params = json_body.params.clone();
tokio::spawn(async move {
connections
.try_send_requests(upstream_servers, method, params, tx)
.await
});
// wait for the first response
let response = rx.recv_async().await?;
if let Ok(partial_response) = response {
let response = json!({
"jsonrpc": "2.0",
"id": json_body.id,
"result": partial_response
});
return Ok(warp::reply::json(&response));
}
}
Err(not_until) => {
// TODO: move this to a helper function
// sleep (with a lock) until our rate limits should be available
if let Some(not_until) = not_until {
let deadline = not_until.wait_time_from(self.clock.now());
sleep(deadline).await;
}
}
};
}
} else {
// this is not a private transaction (or no private relays are configured)
// try to send to each tier, stopping at the first success
loop {
// there are multiple tiers. save the earliest not_until (if any). if we don't return, we will sleep until then and then try again
let mut earliest_not_until = None;
for balanced_rpcs in self.balanced_rpc_tiers.iter() {
let current_block = balanced_rpcs.head_block_number(); // TODO: we don't store current block for everything anymore. we store it on the connections
// TODO: building this cache key is slow and its large, but i don't see a better way right now
// TODO: inspect the params and see if a block is specified. if so, use that block number instead of current_block
let cache_key = (
current_block,
json_body.method.clone(),
json_body.params.to_string(),
);
if let Some(cached) = self.response_cache.read().await.get(&cache_key) {
// TODO: this still serializes every time
return Ok(warp::reply::json(cached));
}
// TODO: what allowed lag?
match balanced_rpcs.next_upstream_server().await {
Ok(upstream_server) => {
let response = balanced_rpcs
.try_send_request(
&upstream_server,
&json_body.method,
&json_body.params,
)
.await;
let response = match response {
Ok(partial_response) => {
// TODO: trace here was really slow with millions of requests.
// info!("forwarding request from {}", upstream_server);
let response = json!({
// TODO: re-use their jsonrpc?
"jsonrpc": "2.0",
"id": json_body.id,
// TODO: since we only use the result here, should that be all we return from try_send_request?
"result": partial_response.result,
});
// TODO: small race condidition here. parallel requests with the same query will both be saved to the cache
let mut response_cache = self.response_cache.write().await;
response_cache.insert(cache_key, response.clone());
if response_cache.len() >= RESPONSE_CACHE_CAP {
response_cache.pop_front();
}
response
}
Err(e) => {
// TODO: what is the proper format for an error?
json!({
"jsonrpc": "2.0",
"id": json_body.id,
"error": format!("{}", e)
})
}
};
return Ok(warp::reply::json(&response));
}
Err(None) => {
// TODO: this is too verbose. if there are other servers in other tiers, use those!
warn!("No servers in sync!");
}
Err(Some(not_until)) => {
// save the smallest not_until. if nothing succeeds, return an Err with not_until in it
// TODO: helper function for this
if earliest_not_until.is_none() {
earliest_not_until.replace(not_until);
} else {
let earliest_possible =
earliest_not_until.as_ref().unwrap().earliest_possible();
let new_earliest_possible = not_until.earliest_possible();
if earliest_possible > new_earliest_possible {
earliest_not_until = Some(not_until);
}
}
}
}
}
// we haven't returned an Ok, sleep and try again
if let Some(earliest_not_until) = earliest_not_until {
let deadline = earliest_not_until.wait_time_from(self.clock.now());
sleep(deadline).await;
} else {
// TODO: how long should we wait?
// TODO: max wait time?
sleep(Duration::from_millis(500)).await;
};
}
}
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// install global collector configured based on RUST_LOG env var.
tracing_subscriber::fmt::init();
// TODO: use flags for the config path
let config = "./data/config/example.toml";
let config: String = fs::read_to_string(config)?;
let config: RootConfig = toml::from_str(&config)?;
// TODO: load the config from yaml instead of hard coding
// TODO: support multiple chains in one process? then we could just point "chain.stytt.com" at this and caddy wouldn't need anything else
// TODO: be smart about about using archive nodes? have a set that doesn't use archive nodes since queries to them are more valuable
let listen_port = config.config.listen_port;
let app = config.try_build().await?;
let app: Arc<Web3ProxyApp> = Arc::new(app);
let proxy_rpc_filter = warp::any()
.and(warp::post())
.and(warp::body::json())
.then(move |json_body| app.clone().proxy_web3_rpc(json_body));
// TODO: filter for displaying connections and their block heights
// TODO: warp trace is super verbose. how do we make this more readable?
// let routes = proxy_rpc_filter.with(warp::trace::request());
let routes = proxy_rpc_filter.map(handle_anyhow_errors);
warp::serve(routes).run(([0, 0, 0, 0], listen_port)).await;
Ok(())
}
/// convert result into an http response. use this at the end of your warp filter
/// TODO: using boxes can't be the best way. think about this more
fn handle_anyhow_errors<T: warp::Reply>(
res: anyhow::Result<T>,
) -> warp::http::Response<warp::hyper::Body> {
match res {
Ok(r) => r.into_response(),
Err(e) => warp::reply::with_status(
format!("{}", e),
warp::http::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response(),
}
}