From 98e261e7f914730efed6bcd098a32de84053ea3a Mon Sep 17 00:00:00 2001 From: Tei Im <40449056+ImTei@users.noreply.github.com> Date: Mon, 19 Aug 2024 14:23:09 -0600 Subject: [PATCH] proxyd: Support Redis read endpoint for caching (#45) --- proxyd/cache.go | 15 ++--- proxyd/config.go | 1 + proxyd/integration_tests/caching_test.go | 57 +++++++++++++++++++ .../testdata/caching_replica.toml | 37 ++++++++++++ proxyd/proxyd.go | 20 ++++++- 5 files changed, 122 insertions(+), 8 deletions(-) create mode 100644 proxyd/integration_tests/testdata/caching_replica.toml diff --git a/proxyd/cache.go b/proxyd/cache.go index 5add4f2..782cfa3 100644 --- a/proxyd/cache.go +++ b/proxyd/cache.go @@ -45,13 +45,14 @@ func (c *cache) Put(ctx context.Context, key string, value string) error { } type redisCache struct { - rdb *redis.Client - prefix string - ttl time.Duration + redisClient *redis.Client + redisReadClient *redis.Client + prefix string + ttl time.Duration } -func newRedisCache(rdb *redis.Client, prefix string, ttl time.Duration) *redisCache { - return &redisCache{rdb, prefix, ttl} +func newRedisCache(redisClient *redis.Client, redisReadClient *redis.Client, prefix string, ttl time.Duration) *redisCache { + return &redisCache{redisClient, redisReadClient, prefix, ttl} } func (c *redisCache) namespaced(key string) string { @@ -63,7 +64,7 @@ func (c *redisCache) namespaced(key string) string { func (c *redisCache) Get(ctx context.Context, key string) (string, error) { start := time.Now() - val, err := c.rdb.Get(ctx, c.namespaced(key)).Result() + val, err := c.redisReadClient.Get(ctx, c.namespaced(key)).Result() redisCacheDurationSumm.WithLabelValues("GET").Observe(float64(time.Since(start).Milliseconds())) if err == redis.Nil { @@ -77,7 +78,7 @@ func (c *redisCache) Get(ctx context.Context, key string) (string, error) { func (c *redisCache) Put(ctx context.Context, key string, value string) error { start := time.Now() - err := c.rdb.SetEx(ctx, c.namespaced(key), value, c.ttl).Err() + err := c.redisClient.SetEx(ctx, c.namespaced(key), value, c.ttl).Err() redisCacheDurationSumm.WithLabelValues("SETEX").Observe(float64(time.Since(start).Milliseconds())) if err != nil { diff --git a/proxyd/config.go b/proxyd/config.go index 83935e7..3dd310f 100644 --- a/proxyd/config.go +++ b/proxyd/config.go @@ -39,6 +39,7 @@ type CacheConfig struct { type RedisConfig struct { URL string `toml:"url"` Namespace string `toml:"namespace"` + ReadURL string `toml:"read_url"` } type MetricsConfig struct { diff --git a/proxyd/integration_tests/caching_test.go b/proxyd/integration_tests/caching_test.go index e74b85b..283d600 100644 --- a/proxyd/integration_tests/caching_test.go +++ b/proxyd/integration_tests/caching_test.go @@ -264,6 +264,63 @@ func TestBatchCaching(t *testing.T) { require.Equal(t, 1, countRequests(backend, "eth_call")) } +func TestCachingWithReadReplica(t *testing.T) { + primary, err := miniredis.Run() + require.NoError(t, err) + defer primary.Close() + + replica, err := miniredis.Run() + require.NoError(t, err) + defer replica.Close() + + hdlr := NewBatchRPCResponseRouter() + hdlr.SetRoute("eth_getBlockByHash", "999", "eth_getBlockByHash") + + backend := NewMockBackend(hdlr) + defer backend.Close() + + require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL())) + require.NoError(t, os.Setenv("REDIS_URL", fmt.Sprintf("redis://%s", primary.Addr()))) + require.NoError(t, os.Setenv("REDIS_READ_URL", fmt.Sprintf("redis://%s", replica.Addr()))) + + config := ReadConfig("caching_replica") + client := NewProxydClient("http://127.0.0.1:8545") + _, shutdown, err := proxyd.Start(config) + require.NoError(t, err) + defer shutdown() + + // allow time for the block number fetcher to fire + time.Sleep(1500 * time.Millisecond) + + params := []interface{}{"0xc6ef2fc5426d6ad6fd9e2a26abeab0aa2411b7ab17f30a99d3cb96aed1d1055b", "false"} + response := "{\"jsonrpc\": \"2.0\", \"result\": \"eth_getBlockByHash\", \"id\": 999}" + resRaw, _, err := client.SendRPC("eth_getBlockByHash", params) + require.NoError(t, err) + + // because the cache is not replicated to the replica, count request must be increased + resCache, _, err := client.SendRPC("eth_getBlockByHash", params) + require.NoError(t, err) + RequireEqualJSON(t, []byte(response), resCache) + RequireEqualJSON(t, resRaw, resCache) + require.Equal(t, 2, countRequests(backend, "eth_getBlockByHash")) + + // replicate cache data + for _, key := range primary.Keys() { + value, err := primary.Get(key) + require.NoError(t, err) + + err = replica.Set(key, value) + require.NoError(t, err) + } + + // now cache hit. count request must be same + resCache, _, err = client.SendRPC("eth_getBlockByHash", params) + require.NoError(t, err) + RequireEqualJSON(t, []byte(response), resCache) + RequireEqualJSON(t, resRaw, resCache) + require.Equal(t, 2, countRequests(backend, "eth_getBlockByHash")) +} + func countRequests(backend *MockBackend, name string) int { var count int for _, req := range backend.Requests() { diff --git a/proxyd/integration_tests/testdata/caching_replica.toml b/proxyd/integration_tests/testdata/caching_replica.toml new file mode 100644 index 0000000..ed9088d --- /dev/null +++ b/proxyd/integration_tests/testdata/caching_replica.toml @@ -0,0 +1,37 @@ +[server] +rpc_port = 8545 + +[backend] +response_timeout_seconds = 1 + +[redis] +url = "$REDIS_URL" +read_url = "$REDIS_READ_URL" +namespace = "proxyd" + +[cache] +enabled = true + +[backends] +[backends.good] +rpc_url = "$GOOD_BACKEND_RPC_URL" +ws_url = "$GOOD_BACKEND_RPC_URL" + +[backend_groups] +[backend_groups.main] +backends = ["good"] + +[rpc_method_mappings] +eth_chainId = "main" +net_version = "main" +eth_getBlockByNumber = "main" +eth_blockNumber = "main" +eth_call = "main" +eth_getBlockTransactionCountByHash = "main" +eth_getUncleCountByBlockHash = "main" +eth_getBlockByHash = "main" +eth_getTransactionByHash = "main" +eth_getTransactionByBlockHashAndIndex = "main" +eth_getUncleByBlockHashAndIndex = "main" +eth_getTransactionReceipt = "main" +debug_getRawReceipts = "main" diff --git a/proxyd/proxyd.go b/proxyd/proxyd.go index ba13d66..f7b8256 100644 --- a/proxyd/proxyd.go +++ b/proxyd/proxyd.go @@ -39,6 +39,7 @@ func Start(config *Config) (*Server, func(), error) { } } + // redis primary client var redisClient *redis.Client if config.Redis.URL != "" { rURL, err := ReadFromEnvOrConfig(config.Redis.URL) @@ -51,6 +52,23 @@ func Start(config *Config) (*Server, func(), error) { } } + // redis read replica client + // if read endpoint is not set, use primary endpoint + var redisReadClient = redisClient + if config.Redis.ReadURL != "" { + if redisClient == nil { + return nil, nil, errors.New("must specify a Redis primary URL. only read endpoint is set") + } + rURL, err := ReadFromEnvOrConfig(config.Redis.ReadURL) + if err != nil { + return nil, nil, err + } + redisReadClient, err = NewRedisClient(rURL) + if err != nil { + return nil, nil, err + } + } + if redisClient == nil && config.RateLimit.UseRedis { return nil, nil, errors.New("must specify a Redis URL if UseRedis is true in rate limit config") } @@ -276,7 +294,7 @@ func Start(config *Config) (*Server, func(), error) { if config.Cache.TTL != 0 { ttl = time.Duration(config.Cache.TTL) } - cache = newRedisCache(redisClient, config.Redis.Namespace, ttl) + cache = newRedisCache(redisClient, redisReadClient, config.Redis.Namespace, ttl) } rpcCache = newRPCCache(newCacheWithCompression(cache)) }