proxyd: Support Redis read endpoint for caching (#45)

This commit is contained in:
Tei Im 2024-08-19 14:23:09 -06:00 committed by GitHub
parent 30560d3d8c
commit 98e261e7f9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 122 additions and 8 deletions

@ -45,13 +45,14 @@ func (c *cache) Put(ctx context.Context, key string, value string) error {
} }
type redisCache struct { type redisCache struct {
rdb *redis.Client redisClient *redis.Client
prefix string redisReadClient *redis.Client
ttl time.Duration prefix string
ttl time.Duration
} }
func newRedisCache(rdb *redis.Client, prefix string, ttl time.Duration) *redisCache { func newRedisCache(redisClient *redis.Client, redisReadClient *redis.Client, prefix string, ttl time.Duration) *redisCache {
return &redisCache{rdb, prefix, ttl} return &redisCache{redisClient, redisReadClient, prefix, ttl}
} }
func (c *redisCache) namespaced(key string) string { func (c *redisCache) namespaced(key string) string {
@ -63,7 +64,7 @@ func (c *redisCache) namespaced(key string) string {
func (c *redisCache) Get(ctx context.Context, key string) (string, error) { func (c *redisCache) Get(ctx context.Context, key string) (string, error) {
start := time.Now() start := time.Now()
val, err := c.rdb.Get(ctx, c.namespaced(key)).Result() val, err := c.redisReadClient.Get(ctx, c.namespaced(key)).Result()
redisCacheDurationSumm.WithLabelValues("GET").Observe(float64(time.Since(start).Milliseconds())) redisCacheDurationSumm.WithLabelValues("GET").Observe(float64(time.Since(start).Milliseconds()))
if err == redis.Nil { if err == redis.Nil {
@ -77,7 +78,7 @@ func (c *redisCache) Get(ctx context.Context, key string) (string, error) {
func (c *redisCache) Put(ctx context.Context, key string, value string) error { func (c *redisCache) Put(ctx context.Context, key string, value string) error {
start := time.Now() start := time.Now()
err := c.rdb.SetEx(ctx, c.namespaced(key), value, c.ttl).Err() err := c.redisClient.SetEx(ctx, c.namespaced(key), value, c.ttl).Err()
redisCacheDurationSumm.WithLabelValues("SETEX").Observe(float64(time.Since(start).Milliseconds())) redisCacheDurationSumm.WithLabelValues("SETEX").Observe(float64(time.Since(start).Milliseconds()))
if err != nil { if err != nil {

@ -39,6 +39,7 @@ type CacheConfig struct {
type RedisConfig struct { type RedisConfig struct {
URL string `toml:"url"` URL string `toml:"url"`
Namespace string `toml:"namespace"` Namespace string `toml:"namespace"`
ReadURL string `toml:"read_url"`
} }
type MetricsConfig struct { type MetricsConfig struct {

@ -264,6 +264,63 @@ func TestBatchCaching(t *testing.T) {
require.Equal(t, 1, countRequests(backend, "eth_call")) require.Equal(t, 1, countRequests(backend, "eth_call"))
} }
func TestCachingWithReadReplica(t *testing.T) {
primary, err := miniredis.Run()
require.NoError(t, err)
defer primary.Close()
replica, err := miniredis.Run()
require.NoError(t, err)
defer replica.Close()
hdlr := NewBatchRPCResponseRouter()
hdlr.SetRoute("eth_getBlockByHash", "999", "eth_getBlockByHash")
backend := NewMockBackend(hdlr)
defer backend.Close()
require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL()))
require.NoError(t, os.Setenv("REDIS_URL", fmt.Sprintf("redis://%s", primary.Addr())))
require.NoError(t, os.Setenv("REDIS_READ_URL", fmt.Sprintf("redis://%s", replica.Addr())))
config := ReadConfig("caching_replica")
client := NewProxydClient("http://127.0.0.1:8545")
_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()
// allow time for the block number fetcher to fire
time.Sleep(1500 * time.Millisecond)
params := []interface{}{"0xc6ef2fc5426d6ad6fd9e2a26abeab0aa2411b7ab17f30a99d3cb96aed1d1055b", "false"}
response := "{\"jsonrpc\": \"2.0\", \"result\": \"eth_getBlockByHash\", \"id\": 999}"
resRaw, _, err := client.SendRPC("eth_getBlockByHash", params)
require.NoError(t, err)
// because the cache is not replicated to the replica, count request must be increased
resCache, _, err := client.SendRPC("eth_getBlockByHash", params)
require.NoError(t, err)
RequireEqualJSON(t, []byte(response), resCache)
RequireEqualJSON(t, resRaw, resCache)
require.Equal(t, 2, countRequests(backend, "eth_getBlockByHash"))
// replicate cache data
for _, key := range primary.Keys() {
value, err := primary.Get(key)
require.NoError(t, err)
err = replica.Set(key, value)
require.NoError(t, err)
}
// now cache hit. count request must be same
resCache, _, err = client.SendRPC("eth_getBlockByHash", params)
require.NoError(t, err)
RequireEqualJSON(t, []byte(response), resCache)
RequireEqualJSON(t, resRaw, resCache)
require.Equal(t, 2, countRequests(backend, "eth_getBlockByHash"))
}
func countRequests(backend *MockBackend, name string) int { func countRequests(backend *MockBackend, name string) int {
var count int var count int
for _, req := range backend.Requests() { for _, req := range backend.Requests() {

@ -0,0 +1,37 @@
[server]
rpc_port = 8545
[backend]
response_timeout_seconds = 1
[redis]
url = "$REDIS_URL"
read_url = "$REDIS_READ_URL"
namespace = "proxyd"
[cache]
enabled = true
[backends]
[backends.good]
rpc_url = "$GOOD_BACKEND_RPC_URL"
ws_url = "$GOOD_BACKEND_RPC_URL"
[backend_groups]
[backend_groups.main]
backends = ["good"]
[rpc_method_mappings]
eth_chainId = "main"
net_version = "main"
eth_getBlockByNumber = "main"
eth_blockNumber = "main"
eth_call = "main"
eth_getBlockTransactionCountByHash = "main"
eth_getUncleCountByBlockHash = "main"
eth_getBlockByHash = "main"
eth_getTransactionByHash = "main"
eth_getTransactionByBlockHashAndIndex = "main"
eth_getUncleByBlockHashAndIndex = "main"
eth_getTransactionReceipt = "main"
debug_getRawReceipts = "main"

@ -39,6 +39,7 @@ func Start(config *Config) (*Server, func(), error) {
} }
} }
// redis primary client
var redisClient *redis.Client var redisClient *redis.Client
if config.Redis.URL != "" { if config.Redis.URL != "" {
rURL, err := ReadFromEnvOrConfig(config.Redis.URL) rURL, err := ReadFromEnvOrConfig(config.Redis.URL)
@ -51,6 +52,23 @@ func Start(config *Config) (*Server, func(), error) {
} }
} }
// redis read replica client
// if read endpoint is not set, use primary endpoint
var redisReadClient = redisClient
if config.Redis.ReadURL != "" {
if redisClient == nil {
return nil, nil, errors.New("must specify a Redis primary URL. only read endpoint is set")
}
rURL, err := ReadFromEnvOrConfig(config.Redis.ReadURL)
if err != nil {
return nil, nil, err
}
redisReadClient, err = NewRedisClient(rURL)
if err != nil {
return nil, nil, err
}
}
if redisClient == nil && config.RateLimit.UseRedis { if redisClient == nil && config.RateLimit.UseRedis {
return nil, nil, errors.New("must specify a Redis URL if UseRedis is true in rate limit config") return nil, nil, errors.New("must specify a Redis URL if UseRedis is true in rate limit config")
} }
@ -276,7 +294,7 @@ func Start(config *Config) (*Server, func(), error) {
if config.Cache.TTL != 0 { if config.Cache.TTL != 0 {
ttl = time.Duration(config.Cache.TTL) ttl = time.Duration(config.Cache.TTL)
} }
cache = newRedisCache(redisClient, config.Redis.Namespace, ttl) cache = newRedisCache(redisClient, redisReadClient, config.Redis.Namespace, ttl)
} }
rpcCache = newRPCCache(newCacheWithCompression(cache)) rpcCache = newRPCCache(newCacheWithCompression(cache))
} }