more kafka things

This commit is contained in:
Bryan Stitt 2023-03-03 06:58:45 -08:00
parent 2eb8df22ec
commit 4c18657387
4 changed files with 77 additions and 52 deletions

@ -11,6 +11,8 @@ db_url = "mysql://root:dev_web3_proxy@127.0.0.1:13306/dev_web3_proxy"
# read-only replica useful when running the proxy in multiple regions
db_replica_url = "mysql://root:dev_web3_proxy@127.0.0.1:13306/dev_web3_proxy"
kafka_urls = "127.0.0.1:19092"
# thundering herd protection
# only mark a block as the head block if the sum of their soft limits is greater than or equal to min_sum_soft_limit
min_sum_soft_limit = 2_000

@ -39,6 +39,18 @@ services:
- ./data/dev_influxdb/data:/var/lib/influxdb2
- ./data/dev_influxdb/config:/etc/influxdb2
dev-kafka:
image: bitnami/kafka:3.4
ports:
- "127.0.0.1:19092:9092"
volumes:
- "./data/dev_kafka:/bitnami"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=dev-zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- dev-zookeeper
# volatile redis for storing rate limits
dev-vredis:
extends:
@ -46,3 +58,13 @@ services:
service: volatile_redis
ports:
- 127.0.0.1:16379:6379
# TODO: kafka doesn't need zookeeper anymore, but all the docs still use it
dev-zookeeper:
image: bitnami/zookeeper:3.8
ports:
- "127.0.0.1:12181:2181"
volumes:
- "./data/zookeeper:/bitnami"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes

@ -13,7 +13,9 @@ RUSTFLAGS="--cfg tokio_unstable" cargo run create_user --address 0x077e43dcca20d
RUSTFLAGS="--cfg tokio_unstable" cargo run --release -- proxyd
# Check if the instance is running
curl -X POST -H "Content-Type: application/json" --data '{"jsonrpc":"2.0","method":"web3_clientVersion","id":1}' 127.0.0.1:8544
curl --verbose -X POST -H "Content-Type: application/json" --data '{"jsonrpc":"2.0","method":"web3_clientVersion","id":1}' 127.0.0.1:8544
curl --verbose -X POST -H "Content-Type: application/json" --data '{"jsonrpc":"2.0","method":"eth_getBlockByNumber","params": ["latest", false], "id":1}' 127.0.0.1:8544
# Open this website to get the nonce to log in
curl -X GET "http://127.0.0.1:8544/user/login/0xeB3E928A2E54BE013EF8241d4C9EaF4DfAE94D5a"

@ -474,10 +474,6 @@ impl Web3ProxyApp {
Err(err) => error!("Failed connecting to kafka. This will not retry. {:?}", err),
}
}
#[cfg(not(rdkafka))]
if top_config.app.kafka_urls.is_some() {
warn!("rdkafka rust feature is not enabled!");
}
// TODO: do this during apply_config so that we can change redis url while running
// create a connection pool for redis
@ -1085,61 +1081,64 @@ impl Web3ProxyApp {
let request_metadata = Arc::new(RequestMetadata::new(REQUEST_PERIOD, request.num_bytes())?);
let mut kafka_stuff = None;
if let Some(kafka_producer) = self.kafka_producer.clone() {
let kafka_topic = "proxy_cached_request".to_string();
let rpc_secret_key_id = authorization
.checks
.rpc_secret_key_id
.map(|x| x.get())
.unwrap_or_default();
if matches!(authorization.checks.proxy_mode, ProxyMode::Debug) {
if let Some(kafka_producer) = self.kafka_producer.clone() {
let kafka_topic = "proxy_cached_request".to_string();
let kafka_key = rmp_serde::to_vec(&rpc_secret_key_id)?;
let rpc_secret_key_id = authorization
.checks
.rpc_secret_key_id
.map(|x| x.get())
.unwrap_or_default();
let request_bytes = rmp_serde::to_vec(&request)?;
let kafka_key = rmp_serde::to_vec(&rpc_secret_key_id)?;
let request_hash = Some(keccak256(&request_bytes));
let request_bytes = rmp_serde::to_vec(&request)?;
let chain_id = self.config.chain_id;
let request_hash = Some(keccak256(&request_bytes));
// another item is added with the response, so initial_capacity is +1 what is needed here
let kafka_headers = OwnedHeaders::new_with_capacity(4)
.insert(Header {
key: "request_hash",
value: request_hash.as_ref(),
})
.insert(Header {
key: "head_block_num",
value: head_block_num.map(|x| x.to_string()).as_ref(),
})
.insert(Header {
key: "chain_id",
value: Some(&chain_id.to_le_bytes()),
});
let chain_id = self.config.chain_id;
// save the key and headers for when we log the response
kafka_stuff = Some((
kafka_topic.clone(),
kafka_key.clone(),
kafka_headers.clone(),
));
// another item is added with the response, so initial_capacity is +1 what is needed here
let kafka_headers = OwnedHeaders::new_with_capacity(4)
.insert(Header {
key: "request_hash",
value: request_hash.as_ref(),
})
.insert(Header {
key: "head_block_num",
value: head_block_num.map(|x| x.to_string()).as_ref(),
})
.insert(Header {
key: "chain_id",
value: Some(&chain_id.to_le_bytes()),
});
let f = async move {
let produce_future = kafka_producer.send(
FutureRecord::to(&kafka_topic)
.key(&kafka_key)
.payload(&request_bytes)
.headers(kafka_headers),
Duration::from_secs(0),
);
// save the key and headers for when we log the response
kafka_stuff = Some((
kafka_topic.clone(),
kafka_key.clone(),
kafka_headers.clone(),
));
if let Err((err, msg)) = produce_future.await {
error!("produce kafka request log: {}. {:#?}", err, msg);
// TODO: re-queue the msg?
}
};
let f = async move {
let produce_future = kafka_producer.send(
FutureRecord::to(&kafka_topic)
.key(&kafka_key)
.payload(&request_bytes)
.headers(kafka_headers),
Duration::from_secs(0),
);
tokio::spawn(f);
if let Err((err, _)) = produce_future.await {
error!("produce kafka request log: {}", err);
// TODO: re-queue the msg?
}
};
tokio::spawn(f);
}
}
// save the id so we can attach it to the response
@ -1767,8 +1766,8 @@ impl Web3ProxyApp {
Duration::from_secs(0),
);
if let Err((err, msg)) = produce_future.await {
error!("produce kafka request log: {}. {:#?}", err, msg);
if let Err((err, _)) = produce_future.await {
error!("produce kafka response log: {}", err);
}
};