document more and cache in block_map

This commit is contained in:
Bryan Stitt 2022-07-19 06:41:04 +00:00
parent 85b8a3a4e8
commit 88cb2cd1d1
6 changed files with 85 additions and 30 deletions

5
Cargo.lock generated

@ -2730,9 +2730,9 @@ checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "petgraph"
version = "0.6.1"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51b305cc4569dd4e8765bab46261f67ef5d4d11a4b6e745100ee5dad8948b46c"
checksum = "e6d5014253a1331579ce62aa67443b4a658c5e7dd03d4bc6d302b94474888143"
dependencies = [
"fixedbitset",
"indexmap",
@ -4338,6 +4338,7 @@ dependencies = [
"notify",
"num",
"parking_lot 0.12.1",
"petgraph",
"proctitle",
"redis-cell-client",
"regex",

@ -28,19 +28,35 @@ Options:
--help display usage information
```
Start the server with the defaults (listen on `http://localhost:8544` and use `./config/example.toml` which proxies to a local websocket on 8546 and ankr's public ETH node):
Start the server with the defaults (listen on `http://localhost:8544` and use `./config/example.toml` which proxies to a bunch of public nodes:
```
cargo run --release -p web3-proxy
cargo run --release -p web3-proxy -- --config ./config/example.toml
```
## Common commands
Check that the proxy is working:
```
curl -X POST -H "Content-Type: application/json" --data '{"jsonrpc":"2.0","method":"web3_clientVersion","id":1}' 127.0.0.1:8544
```
You can copy `config/example.toml` to `config/production-$CHAINNAME.toml` and then run `docker-compose up --build -d` start a proxies for many chains.
Check that the websocket is working:
```
$ websocat ws://127.0.0.1:8544
{"id": 1, "method": "eth_subscribe", "params": ["newHeads"]}
{"id": 2, "method": "eth_subscribe", "params": ["newPendingTransactions"]}
{"id": 3, "method": "eth_subscribe", "params": ["newPendingFullTransactions"]}
{"id": 4, "method": "eth_subscribe", "params": ["newPendingRawTransactions"]}
```
You can copy `config/example.toml` to `config/production-$CHAINNAME.toml` and then run `docker-compose up --build -d` start proxies for many chains.
## Flame Graphs
@ -67,15 +83,18 @@ Test the proxy:
wrk -s ./data/wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8544
wrk -s ./data/wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8544
Test geth:
Test geth (assuming it is on 8545):
wrk -s ./data/wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8545
wrk -s ./data/wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8545
Test erigon:
Test erigon (assuming it is on 8945):
wrk -s ./data/wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8945
wrk -s ./data/wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8945
Note: Testing with `getLatestBlockByNumber.lua` is not great because the latest block changes and so one run is likely to be very different than another.
Run ethspam for a more realistic load test:
docker run --rm --name spam shazow/ethspam --rpc http://$LOCAL_IP:8544 | versus --concurrency=100 --stop-after=10000 http://$LOCAL_IP:8544; docker stop spam

@ -139,4 +139,5 @@ in another repo: event subscriber
- [ ] i saw "WebSocket connection closed unexpectedly" but no auto reconnect. need better logs on these
- [ ] if archive servers are added to the rotation while they are still syncing, they might get requests too soon. keep archive servers out of the configs until they are done syncing. full nodes should be fine to add to the configs even while syncing, though its a wasted connection
- [x] when under load, i'm seeing "http interval lagging!". sometimes it happens when not loaded.
- we were skipping our delay interval when block hash wasn't changed. so if a block was ever slow, the http provider would get the same hash twice and then would try eth_getBlockByNumber a ton of times
- we were skipping our delay interval when block hash wasn't changed. so if a block was ever slow, the http provider would get the same hash twice and then would try eth_getBlockByNumber a ton of times
- [ ] load tests: docker run --rm --name spam shazow/ethspam --rpc http://$LOCAL_IP:8544 | versus --concurrency=100 --stop-after=10000 http://$LOCAL_IP:8544; docker stop spam

@ -27,8 +27,8 @@ linkedhashmap = { path = "../linkedhashmap", features = ["inline-more"] }
notify = "4.0.17"
num = "0.4.0"
redis-cell-client = { path = "../redis-cell-client" }
# TODO: parking_lot has an "arc_lock" feature that we might want to use
parking_lot = "0.12.1"
parking_lot = { version = "0.12.1", features = ["arc_lock"] }
petgraph = "0.6.2"
proctitle = "0.1.1"
# TODO: regex has several "perf" features that we might want to use
regex = "1.6.0"

@ -86,11 +86,14 @@ impl Serialize for Web3Connection {
S: Serializer,
{
// 3 is the number of fields in the struct.
let mut state = serializer.serialize_struct("Web3Connection", 3)?;
let mut state = serializer.serialize_struct("Web3Connection", 4)?;
// TODO: sanitize any credentials in the url
state.serialize_field("url", &self.url)?;
let block_data_limit = self.block_data_limit.load(atomic::Ordering::Relaxed);
state.serialize_field("block_data_limit", &block_data_limit)?;
state.serialize_field("soft_limit", &self.soft_limit)?;
state.serialize_field(
@ -120,6 +123,7 @@ impl fmt::Debug for Web3Connection {
impl fmt::Display for Web3Connection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: filter basic auth and api keys
write!(f, "{}", &self.url)
}
}
@ -188,12 +192,11 @@ impl Web3Connection {
chain_id,
found_chain_id
)
.context(format!("failed spawning {}", new_connection)));
.context(format!("failed @ {}", new_connection)));
}
}
Err(e) => {
let e =
anyhow::Error::from(e).context(format!("failed spawning {}", new_connection));
let e = anyhow::Error::from(e).context(format!("failed @ {}", new_connection));
return Err(e);
}
}

@ -9,6 +9,8 @@ use futures::future::{join_all, try_join_all};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use hashbrown::HashMap;
// use parking_lot::RwLock;
// use petgraph::graphmap::DiGraphMap;
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use serde_json::json;
@ -36,15 +38,14 @@ struct SyncedConnections {
// TODO: this should be able to serialize, but it isn't
#[serde(skip_serializing)]
inner: BTreeSet<Arc<Web3Connection>>,
// TODO: use petgraph for keeping track of the chain so we can do better fork handling
}
impl fmt::Debug for SyncedConnections {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
f.debug_struct("SyncedConnections")
.field("head_block_num", &self.head_block_num)
.field("head_block_hash", &self.head_block_hash)
.field("head_num", &self.head_block_num)
.field("head_hash", &self.head_block_hash)
.finish_non_exhaustive()
}
}
@ -65,6 +66,10 @@ pub struct Web3Connections {
inner: Vec<Arc<Web3Connection>>,
synced_connections: ArcSwap<SyncedConnections>,
pending_transactions: Arc<DashMap<TxHash, TxState>>,
// TODO: i think chain is what we want, but i'm not sure how we'll use it yet
// TODO: this graph is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
// chain: Arc<RwLock<DiGraphMap<H256, Block<TxHash>>>>,
block_map: DashMap<u64, BTreeSet<H256>>,
}
impl Serialize for Web3Connections {
@ -197,6 +202,7 @@ impl Web3Connections {
inner: connections,
synced_connections: ArcSwap::new(Arc::new(synced_connections)),
pending_transactions,
block_map: Default::default(),
});
let handle = {
@ -377,7 +383,13 @@ impl Web3Connections {
pub async fn get_block_hash(&self, num: U64) -> anyhow::Result<H256> {
// TODO: this definitely needs caching
warn!("this needs to be much more efficient");
// first, try to get the hash from petgraph. but how do we get all blocks with a given num and then pick the one on the correct chain?
if let Some(hash) = self.block_map.get(&num.as_u64()) {
// for now, just return the first seen block. we actually want the winning block!
return Ok(*hash.iter().next().unwrap());
unimplemented!("use petgraph to find the heaviest chain");
}
// TODO: helper for this
let request =
@ -392,7 +404,11 @@ impl Web3Connections {
let block = response.result.unwrap().to_string();
let block: Block<TxHash> = serde_json::from_str(&block)?;
Ok(block.hash.unwrap())
let hash = block.hash.unwrap();
self.block_map.entry(num.as_u64()).or_default().insert(hash);
Ok(hash)
}
pub fn get_head_block_hash(&self) -> H256 {
@ -478,6 +494,7 @@ impl Web3Connections {
let mut connection_states: HashMap<Arc<Web3Connection>, _> =
HashMap::with_capacity(total_rpcs);
// keep a pending one so that we can delay publishing a new head block until multiple servers are synced
let mut pending_synced_connections = SyncedConnections::default();
while let Ok((new_block, rpc)) = block_receiver.recv_async().await {
@ -524,12 +541,18 @@ impl Web3Connections {
// TODO: if the parent hash isn't our previous best block, ignore it
pending_synced_connections.head_block_hash = new_block_hash;
// TODO: wait to send this until we publish
head_block_sender
.send(new_block.clone())
.context("head_block_sender")?;
// TODO: mark all transactions as confirmed
// TODO: mark any orphaned transactions as unconfirmed
self.block_map
.entry(new_block_num)
.or_default()
.insert(new_block_hash);
}
cmp::Ordering::Equal => {
if new_block_hash == pending_synced_connections.head_block_hash {
@ -541,6 +564,11 @@ impl Web3Connections {
} else {
// same height, but different chain
self.block_map
.entry(new_block_num)
.or_default()
.insert(new_block_hash);
// check connection_states to see which head block is more popular!
let mut rpc_ids_by_block: BTreeMap<H256, Vec<Arc<Web3Connection>>> =
BTreeMap::new();
@ -596,24 +624,26 @@ impl Web3Connections {
// we didn't remove anything. nothing more to do
continue;
}
// TODO: insert the hash if it isn't known?
// we removed. don't continue so that we update self.synced_connections
}
}
// the synced connections have changed
let synced_connections = Arc::new(pending_synced_connections.clone());
if synced_connections.inner.len() == total_rpcs {
if pending_synced_connections.inner.len() == total_rpcs {
// TODO: more metrics
trace!("all head: {}", new_block_hash);
} else {
trace!(
"rpcs at {}: {:?}",
pending_synced_connections.head_block_hash,
pending_synced_connections.inner
);
}
trace!(
"rpcs at {}: {:?}",
synced_connections.head_block_hash,
synced_connections.inner
);
// TODO: what if the hashes don't match?
if pending_synced_connections.head_block_hash == new_block_hash {
// mark all transactions in the block as confirmed
@ -630,9 +660,10 @@ impl Web3Connections {
// TODO: mark any orphaned transactions as unconfirmed
}
// TODO: only publish if there are x (default 2) nodes synced to this block?
// TODO: only publish if there are x (default 50%) nodes synced to this block?
// TODO: do this before or after processing all the transactions in this block?
self.synced_connections.swap(synced_connections);
self.synced_connections
.swap(Arc::new(pending_synced_connections.clone()));
}
// TODO: if there was an error, we should return it