From 2f0f949784b8fc77b94e3d9ab8b3be460141954c Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Sat, 13 May 2023 20:54:27 -0700 Subject: [PATCH] refactor(proxyd): clean up backend rate limiter --- proxyd/proxyd/backend.go | 91 ------ proxyd/proxyd/backend_rate_limiter.go | 286 ------------------ proxyd/proxyd/consensus_poller.go | 19 +- .../proxyd/integration_tests/failover_test.go | 13 +- .../integration_tests/rate_limit_test.go | 17 -- proxyd/proxyd/integration_tests/ws_test.go | 29 -- proxyd/proxyd/proxyd.go | 24 +- proxyd/proxyd/server.go | 10 + 8 files changed, 25 insertions(+), 464 deletions(-) delete mode 100644 proxyd/proxyd/backend_rate_limiter.go diff --git a/proxyd/proxyd/backend.go b/proxyd/proxyd/backend.go index 9244315..4e8fd70 100644 --- a/proxyd/proxyd/backend.go +++ b/proxyd/proxyd/backend.go @@ -121,7 +121,6 @@ type Backend struct { wsURL string authUsername string authPassword string - rateLimiter BackendRateLimiter client *LimitedHTTPClient dialer *websocket.Dialer maxRetries int @@ -243,7 +242,6 @@ func NewBackend( name string, rpcURL string, wsURL string, - rateLimiter BackendRateLimiter, rpcSemaphore *semaphore.Weighted, opts ...BackendOpt, ) *Backend { @@ -251,7 +249,6 @@ func NewBackend( Name: name, rpcURL: rpcURL, wsURL: wsURL, - rateLimiter: rateLimiter, maxResponseSize: math.MaxInt64, client: &LimitedHTTPClient{ Client: http.Client{Timeout: 5 * time.Second}, @@ -281,15 +278,6 @@ func NewBackend( } func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([]*RPCRes, error) { - if !b.Online() { - RecordBatchRPCError(ctx, b.Name, reqs, ErrBackendOffline) - return nil, ErrBackendOffline - } - if b.IsRateLimited() { - RecordBatchRPCError(ctx, b.Name, reqs, ErrBackendOverCapacity) - return nil, ErrBackendOverCapacity - } - var lastError error // <= to account for the first attempt not technically being // a retry @@ -340,24 +328,12 @@ func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([] return res, err } - b.setOffline() return nil, wrapErr(lastError, "permanent error forwarding request") } func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet) (*WSProxier, error) { - if !b.Online() { - return nil, ErrBackendOffline - } - if b.IsWSSaturated() { - return nil, ErrBackendOverCapacity - } - backendConn, _, err := b.dialer.Dial(b.wsURL, nil) // nolint:bodyclose if err != nil { - b.setOffline() - if err := b.rateLimiter.DecBackendWSConns(b.Name); err != nil { - log.Error("error decrementing backend ws conns", "name", b.Name, "err", err) - } return nil, wrapErr(err, "error dialing backend") } @@ -365,66 +341,6 @@ func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet return NewWSProxier(b, clientConn, backendConn, methodWhitelist), nil } -func (b *Backend) Online() bool { - online, err := b.rateLimiter.IsBackendOnline(b.Name) - if err != nil { - log.Warn( - "error getting backend availability, assuming it is offline", - "name", b.Name, - "err", err, - ) - return false - } - return online -} - -func (b *Backend) IsRateLimited() bool { - if b.maxRPS == 0 { - return false - } - - usedLimit, err := b.rateLimiter.IncBackendRPS(b.Name) - if err != nil { - log.Error( - "error getting backend used rate limit, assuming limit is exhausted", - "name", b.Name, - "err", err, - ) - return true - } - - return b.maxRPS < usedLimit -} - -func (b *Backend) IsWSSaturated() bool { - if b.maxWSConns == 0 { - return false - } - - incremented, err := b.rateLimiter.IncBackendWSConns(b.Name, b.maxWSConns) - if err != nil { - log.Error( - "error getting backend used ws conns, assuming limit is exhausted", - "name", b.Name, - "err", err, - ) - return true - } - - return !incremented -} - -func (b *Backend) setOffline() { - err := b.rateLimiter.SetBackendOffline(b.Name, b.outOfServiceInterval) - if err != nil { - log.Warn( - "error setting backend offline", - "name", b.Name, - "err", err, - ) - } -} - // ForwardRPC makes a call directly to a backend and populate the response into `res` func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method string, params ...any) error { jsonParams, err := json.Marshal(params) @@ -968,9 +884,6 @@ func (w *WSProxier) backendPump(ctx context.Context, errC chan error) { func (w *WSProxier) close() { w.clientConn.Close() w.backendConn.Close() - if err := w.backend.rateLimiter.DecBackendWSConns(w.backend.Name); err != nil { - log.Error("error decrementing backend ws conns", "name", w.backend.Name, "err", err) - } activeBackendWsConnsGauge.WithLabelValues(w.backend.Name).Dec() } @@ -984,10 +897,6 @@ func (w *WSProxier) prepareClientMsg(msg []byte) (*RPCReq, error) { return req, ErrMethodNotWhitelisted } - if w.backend.IsRateLimited() { - return req, ErrBackendOverCapacity - } - return req, nil } diff --git a/proxyd/proxyd/backend_rate_limiter.go b/proxyd/proxyd/backend_rate_limiter.go deleted file mode 100644 index 3cc6fae..0000000 --- a/proxyd/proxyd/backend_rate_limiter.go +++ /dev/null @@ -1,286 +0,0 @@ -package proxyd - -import ( - "context" - "crypto/rand" - "encoding/hex" - "fmt" - "math" - "sync" - "time" - - "github.com/ethereum/go-ethereum/log" - "github.com/go-redis/redis/v8" -) - -const MaxRPSScript = ` -local current -current = redis.call("incr", KEYS[1]) -if current == 1 then - redis.call("expire", KEYS[1], 1) -end -return current -` - -const MaxConcurrentWSConnsScript = ` -redis.call("sadd", KEYS[1], KEYS[2]) -local total = 0 -local scanres = redis.call("sscan", KEYS[1], 0) -for _, k in ipairs(scanres[2]) do - local value = redis.call("get", k) - if value then - total = total + value - end -end - -if total < tonumber(ARGV[1]) then - redis.call("incr", KEYS[2]) - redis.call("expire", KEYS[2], 300) - return true -end - -return false -` - -type BackendRateLimiter interface { - IsBackendOnline(name string) (bool, error) - SetBackendOffline(name string, duration time.Duration) error - IncBackendRPS(name string) (int, error) - IncBackendWSConns(name string, max int) (bool, error) - DecBackendWSConns(name string) error - FlushBackendWSConns(names []string) error -} - -type RedisBackendRateLimiter struct { - rdb *redis.Client - randID string - touchKeys map[string]time.Duration - tkMtx sync.Mutex -} - -func NewRedisRateLimiter(rdb *redis.Client) BackendRateLimiter { - out := &RedisBackendRateLimiter{ - rdb: rdb, - randID: randStr(20), - touchKeys: make(map[string]time.Duration), - } - go out.touch() - return out -} - -func (r *RedisBackendRateLimiter) IsBackendOnline(name string) (bool, error) { - exists, err := r.rdb.Exists(context.Background(), fmt.Sprintf("backend:%s:offline", name)).Result() - if err != nil { - RecordRedisError("IsBackendOnline") - return false, wrapErr(err, "error getting backend availability") - } - - return exists == 0, nil -} - -func (r *RedisBackendRateLimiter) SetBackendOffline(name string, duration time.Duration) error { - if duration == 0 { - return nil - } - err := r.rdb.SetEX( - context.Background(), - fmt.Sprintf("backend:%s:offline", name), - 1, - duration, - ).Err() - if err != nil { - RecordRedisError("SetBackendOffline") - return wrapErr(err, "error setting backend unavailable") - } - return nil -} - -func (r *RedisBackendRateLimiter) IncBackendRPS(name string) (int, error) { - cmd := r.rdb.Eval( - context.Background(), - MaxRPSScript, - []string{fmt.Sprintf("backend:%s:ratelimit", name)}, - ) - rps, err := cmd.Int() - if err != nil { - RecordRedisError("IncBackendRPS") - return -1, wrapErr(err, "error upserting backend rate limit") - } - return rps, nil -} - -func (r *RedisBackendRateLimiter) IncBackendWSConns(name string, max int) (bool, error) { - connsKey := fmt.Sprintf("proxy:%s:wsconns:%s", r.randID, name) - r.tkMtx.Lock() - r.touchKeys[connsKey] = 5 * time.Minute - r.tkMtx.Unlock() - cmd := r.rdb.Eval( - context.Background(), - MaxConcurrentWSConnsScript, - []string{ - fmt.Sprintf("backend:%s:proxies", name), - connsKey, - }, - max, - ) - incremented, err := cmd.Bool() - // false gets coerced to redis.nil, see https://redis.io/commands/eval#conversion-between-lua-and-redis-data-types - if err == redis.Nil { - return false, nil - } - if err != nil { - RecordRedisError("IncBackendWSConns") - return false, wrapErr(err, "error incrementing backend ws conns") - } - return incremented, nil -} - -func (r *RedisBackendRateLimiter) DecBackendWSConns(name string) error { - connsKey := fmt.Sprintf("proxy:%s:wsconns:%s", r.randID, name) - err := r.rdb.Decr(context.Background(), connsKey).Err() - if err != nil { - RecordRedisError("DecBackendWSConns") - return wrapErr(err, "error decrementing backend ws conns") - } - return nil -} - -func (r *RedisBackendRateLimiter) FlushBackendWSConns(names []string) error { - ctx := context.Background() - for _, name := range names { - connsKey := fmt.Sprintf("proxy:%s:wsconns:%s", r.randID, name) - err := r.rdb.SRem( - ctx, - fmt.Sprintf("backend:%s:proxies", name), - connsKey, - ).Err() - if err != nil { - return wrapErr(err, "error flushing backend ws conns") - } - err = r.rdb.Del(ctx, connsKey).Err() - if err != nil { - return wrapErr(err, "error flushing backend ws conns") - } - } - return nil -} - -func (r *RedisBackendRateLimiter) touch() { - for { - r.tkMtx.Lock() - for key, dur := range r.touchKeys { - if err := r.rdb.Expire(context.Background(), key, dur).Err(); err != nil { - RecordRedisError("touch") - log.Error("error touching redis key", "key", key, "err", err) - } - } - r.tkMtx.Unlock() - time.Sleep(5 * time.Second) - } -} - -type LocalBackendRateLimiter struct { - deadBackends map[string]time.Time - backendRPS map[string]int - backendWSConns map[string]int - mtx sync.RWMutex -} - -func NewLocalBackendRateLimiter() *LocalBackendRateLimiter { - out := &LocalBackendRateLimiter{ - deadBackends: make(map[string]time.Time), - backendRPS: make(map[string]int), - backendWSConns: make(map[string]int), - } - go out.clear() - return out -} - -func (l *LocalBackendRateLimiter) IsBackendOnline(name string) (bool, error) { - l.mtx.RLock() - defer l.mtx.RUnlock() - return l.deadBackends[name].Before(time.Now()), nil -} - -func (l *LocalBackendRateLimiter) SetBackendOffline(name string, duration time.Duration) error { - l.mtx.Lock() - defer l.mtx.Unlock() - l.deadBackends[name] = time.Now().Add(duration) - return nil -} - -func (l *LocalBackendRateLimiter) IncBackendRPS(name string) (int, error) { - l.mtx.Lock() - defer l.mtx.Unlock() - l.backendRPS[name] += 1 - return l.backendRPS[name], nil -} - -func (l *LocalBackendRateLimiter) IncBackendWSConns(name string, max int) (bool, error) { - l.mtx.Lock() - defer l.mtx.Unlock() - if l.backendWSConns[name] == max { - return false, nil - } - l.backendWSConns[name] += 1 - return true, nil -} - -func (l *LocalBackendRateLimiter) DecBackendWSConns(name string) error { - l.mtx.Lock() - defer l.mtx.Unlock() - if l.backendWSConns[name] == 0 { - return nil - } - l.backendWSConns[name] -= 1 - return nil -} - -func (l *LocalBackendRateLimiter) FlushBackendWSConns(names []string) error { - return nil -} - -func (l *LocalBackendRateLimiter) clear() { - for { - time.Sleep(time.Second) - l.mtx.Lock() - l.backendRPS = make(map[string]int) - l.mtx.Unlock() - } -} - -func randStr(l int) string { - b := make([]byte, l) - if _, err := rand.Read(b); err != nil { - panic(err) - } - return hex.EncodeToString(b) -} - -type NoopBackendRateLimiter struct{} - -var noopBackendRateLimiter = &NoopBackendRateLimiter{} - -func (n *NoopBackendRateLimiter) IsBackendOnline(name string) (bool, error) { - return true, nil -} - -func (n *NoopBackendRateLimiter) SetBackendOffline(name string, duration time.Duration) error { - return nil -} - -func (n *NoopBackendRateLimiter) IncBackendRPS(name string) (int, error) { - return math.MaxInt, nil -} - -func (n *NoopBackendRateLimiter) IncBackendWSConns(name string, max int) (bool, error) { - return true, nil -} - -func (n *NoopBackendRateLimiter) DecBackendWSConns(name string) error { - return nil -} - -func (n *NoopBackendRateLimiter) FlushBackendWSConns(names []string) error { - return nil -} diff --git a/proxyd/proxyd/consensus_poller.go b/proxyd/proxyd/consensus_poller.go index f077a0f..a170aca 100644 --- a/proxyd/proxyd/consensus_poller.go +++ b/proxyd/proxyd/consensus_poller.go @@ -233,14 +233,8 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) { return } - // if backend exhausted rate limit we'll skip it for now - if be.IsRateLimited() { - log.Debug("skipping backend - rate limited", "backend", be.Name) - return - } - - // if backend it not online or not in a health state we'll only resume checkin it after ban - if !be.Online() || !be.IsHealthy() { + // if backend is not healthy state we'll only resume checking it after ban + if !be.IsHealthy() { log.Warn("backend banned - not online or not healthy", "backend", be.Name) cp.Ban(be) return @@ -361,12 +355,10 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { /* a serving node needs to be: - healthy (network) - - not rate limited - - online + - updated recently - not banned - with minimum peer count - - updated recently - - not lagging + - not lagging latest block - in sync */ @@ -375,7 +367,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { isBanned := time.Now().Before(bannedUntil) notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount lagging := latestBlockNumber < proposedBlock - if !be.IsHealthy() || be.IsRateLimited() || !be.Online() || notUpdated || isBanned || notEnoughPeers || lagging || !inSync { + if !be.IsHealthy() || notUpdated || isBanned || notEnoughPeers || lagging || !inSync { filteredBackendsNames = append(filteredBackendsNames, be.Name) continue } @@ -411,6 +403,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) { } if broken { + // propagate event to other interested parts, such as cache invalidator for _, l := range cp.listeners { l() } diff --git a/proxyd/proxyd/integration_tests/failover_test.go b/proxyd/proxyd/integration_tests/failover_test.go index 119a901..501542a 100644 --- a/proxyd/proxyd/integration_tests/failover_test.go +++ b/proxyd/proxyd/integration_tests/failover_test.go @@ -190,7 +190,7 @@ func TestOutOfServiceInterval(t *testing.T) { require.NoError(t, err) require.Equal(t, 200, statusCode) RequireEqualJSON(t, []byte(goodResponse), res) - require.Equal(t, 2, len(badBackend.Requests())) + require.Equal(t, 4, len(badBackend.Requests())) require.Equal(t, 2, len(goodBackend.Requests())) _, statusCode, err = client.SendBatchRPC( @@ -199,7 +199,7 @@ func TestOutOfServiceInterval(t *testing.T) { ) require.NoError(t, err) require.Equal(t, 200, statusCode) - require.Equal(t, 2, len(badBackend.Requests())) + require.Equal(t, 8, len(badBackend.Requests())) require.Equal(t, 4, len(goodBackend.Requests())) time.Sleep(time.Second) @@ -209,7 +209,7 @@ func TestOutOfServiceInterval(t *testing.T) { require.NoError(t, err) require.Equal(t, 200, statusCode) RequireEqualJSON(t, []byte(goodResponse), res) - require.Equal(t, 3, len(badBackend.Requests())) + require.Equal(t, 9, len(badBackend.Requests())) require.Equal(t, 4, len(goodBackend.Requests())) } @@ -261,7 +261,6 @@ func TestInfuraFailoverOnUnexpectedResponse(t *testing.T) { config.BackendOptions.MaxRetries = 2 // Setup redis to detect offline backends config.Redis.URL = fmt.Sprintf("redis://127.0.0.1:%s", redis.Port()) - redisClient, err := proxyd.NewRedisClient(config.Redis.URL) require.NoError(t, err) goodBackend := NewMockBackend(BatchedResponseHandler(200, goodResponse, goodResponse)) @@ -286,10 +285,4 @@ func TestInfuraFailoverOnUnexpectedResponse(t *testing.T) { RequireEqualJSON(t, []byte(asArray(goodResponse, goodResponse)), res) require.Equal(t, 1, len(badBackend.Requests())) require.Equal(t, 1, len(goodBackend.Requests())) - - rr := proxyd.NewRedisRateLimiter(redisClient) - require.NoError(t, err) - online, err := rr.IsBackendOnline("bad") - require.NoError(t, err) - require.Equal(t, true, online) } diff --git a/proxyd/proxyd/integration_tests/rate_limit_test.go b/proxyd/proxyd/integration_tests/rate_limit_test.go index dd69089..4e17f62 100644 --- a/proxyd/proxyd/integration_tests/rate_limit_test.go +++ b/proxyd/proxyd/integration_tests/rate_limit_test.go @@ -21,23 +21,6 @@ const frontendOverLimitResponseWithID = `{"error":{"code":-32016,"message":"over var ethChainID = "eth_chainId" -func TestBackendMaxRPSLimit(t *testing.T) { - goodBackend := NewMockBackend(BatchedResponseHandler(200, goodResponse)) - defer goodBackend.Close() - - require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", goodBackend.URL())) - - config := ReadConfig("backend_rate_limit") - client := NewProxydClient("http://127.0.0.1:8545") - _, shutdown, err := proxyd.Start(config) - require.NoError(t, err) - defer shutdown() - limitedRes, codes := spamReqs(t, client, ethChainID, 503, 3) - require.Equal(t, 2, codes[200]) - require.Equal(t, 1, codes[503]) - RequireEqualJSON(t, []byte(noBackendsResponse), limitedRes) -} - func TestFrontendMaxRPSLimit(t *testing.T) { goodBackend := NewMockBackend(BatchedResponseHandler(200, goodResponse)) defer goodBackend.Close() diff --git a/proxyd/proxyd/integration_tests/ws_test.go b/proxyd/proxyd/integration_tests/ws_test.go index ed33779..c0907fe 100644 --- a/proxyd/proxyd/integration_tests/ws_test.go +++ b/proxyd/proxyd/integration_tests/ws_test.go @@ -270,32 +270,3 @@ func TestWSClientClosure(t *testing.T) { }) } } - -func TestWSClientMaxConns(t *testing.T) { - backend := NewMockWSBackend(nil, nil, nil) - defer backend.Close() - - require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL())) - - config := ReadConfig("ws") - _, shutdown, err := proxyd.Start(config) - require.NoError(t, err) - defer shutdown() - - doneCh := make(chan struct{}, 1) - _, err = NewProxydWSClient("ws://127.0.0.1:8546", nil, nil) - require.NoError(t, err) - _, err = NewProxydWSClient("ws://127.0.0.1:8546", nil, func(err error) { - require.Contains(t, err.Error(), "unexpected EOF") - doneCh <- struct{}{} - }) - require.NoError(t, err) - - timeout := time.NewTicker(30 * time.Second) - select { - case <-timeout.C: - t.Fatalf("timed out") - case <-doneCh: - return - } -} diff --git a/proxyd/proxyd/proxyd.go b/proxyd/proxyd/proxyd.go index 4fc286e..20faa67 100644 --- a/proxyd/proxyd/proxyd.go +++ b/proxyd/proxyd/proxyd.go @@ -51,19 +51,6 @@ func Start(config *Config) (*Server, func(), error) { return nil, nil, errors.New("must specify a Redis URL if UseRedis is true in rate limit config") } - var lim BackendRateLimiter - var err error - if config.RateLimit.EnableBackendRateLimiter { - if redisClient != nil { - lim = NewRedisRateLimiter(redisClient) - } else { - log.Warn("redis is not configured, using local rate limiter") - lim = NewLocalBackendRateLimiter() - } - } else { - lim = noopBackendRateLimiter - } - // While modifying shared globals is a bad practice, the alternative // is to clone these errors on every invocation. This is inefficient. // We'd also have to make sure that errors.Is and errors.As continue @@ -159,10 +146,14 @@ func Start(config *Config) (*Server, func(), error) { opts = append(opts, WithProxydIP(os.Getenv("PROXYD_IP"))) opts = append(opts, WithSkipPeerCountCheck(cfg.SkipPeerCountCheck)) - back := NewBackend(name, rpcURL, wsURL, lim, rpcRequestSemaphore, opts...) + back := NewBackend(name, rpcURL, wsURL, rpcRequestSemaphore, opts...) backendNames = append(backendNames, name) backendsByName[name] = back - log.Info("configured backend", "name", name, "rpc_url", rpcURL, "ws_url", wsURL) + log.Info("configured backend", + "name", name, + "backend_names", backendNames, + "rpc_url", rpcURL, + "ws_url", wsURL) } backendGroups := make(map[string]*BackendGroup) @@ -352,9 +343,6 @@ func Start(config *Config) (*Server, func(), error) { gasPriceLVC.Stop() } srv.Shutdown() - if err := lim.FlushBackendWSConns(backendNames); err != nil { - log.Error("error flushing backend ws conns", "err", err) - } log.Info("goodbye") } diff --git a/proxyd/proxyd/server.go b/proxyd/proxyd/server.go index b52224c..794d4f8 100644 --- a/proxyd/proxyd/server.go +++ b/proxyd/proxyd/server.go @@ -2,6 +2,8 @@ package proxyd import ( "context" + "crypto/rand" + "encoding/hex" "encoding/json" "errors" "fmt" @@ -591,6 +593,14 @@ func (s *Server) populateContext(w http.ResponseWriter, r *http.Request) context ) } +func randStr(l int) string { + b := make([]byte, l) + if _, err := rand.Read(b); err != nil { + panic(err) + } + return hex.EncodeToString(b) +} + func (s *Server) isUnlimitedOrigin(origin string) bool { for _, pat := range s.limExemptOrigins { if pat.MatchString(origin) {