From 537717610e56dee4f9b553dc19bce797044093b6 Mon Sep 17 00:00:00 2001 From: Matthew Slipper Date: Fri, 23 Sep 2022 23:06:02 +0200 Subject: [PATCH] proxyd: Add batch size metric and configurable max (#3545) * proxyd: Add batch size metric and configurable max The max batch size will be overwritten if it is over `MaxBatchRPCCallsHardLimit`. Builds on https://github.com/ethereum-optimism/optimism/pull/3544. * changeset * fix lint * fix test --- proxyd/proxyd/config.go | 6 ++ .../proxyd/integration_tests/batching_test.go | 57 ++++++++++++------- .../integration_tests/testdata/batching.toml | 4 ++ proxyd/proxyd/metrics.go | 18 ++++++ proxyd/proxyd/proxyd.go | 4 ++ proxyd/proxyd/server.go | 13 ++++- 6 files changed, 79 insertions(+), 23 deletions(-) diff --git a/proxyd/proxyd/config.go b/proxyd/proxyd/config.go index 466deb9..0647074 100644 --- a/proxyd/proxyd/config.go +++ b/proxyd/proxyd/config.go @@ -95,6 +95,11 @@ type BackendGroupsConfig map[string]*BackendGroupConfig type MethodMappingsConfig map[string]string +type BatchConfig struct { + MaxSize int `toml:"max_size"` + ErrorMessage string `toml:"error_message"` +} + type Config struct { WSBackendGroup string `toml:"ws_backend_group"` Server ServerConfig `toml:"server"` @@ -104,6 +109,7 @@ type Config struct { RateLimit RateLimitConfig `toml:"rate_limit"` BackendOptions BackendOptions `toml:"backend"` Backends BackendsConfig `toml:"backends"` + BatchConfig BatchConfig `toml:"batch"` Authentication map[string]string `toml:"authentication"` BackendGroups BackendGroupsConfig `toml:"backend_groups"` RPCMethodMappings map[string]string `toml:"rpc_method_mappings"` diff --git a/proxyd/proxyd/integration_tests/batching_test.go b/proxyd/proxyd/integration_tests/batching_test.go index 1bcbba9..b0f811c 100644 --- a/proxyd/proxyd/integration_tests/batching_test.go +++ b/proxyd/proxyd/integration_tests/batching_test.go @@ -33,13 +33,13 @@ func TestBatching(t *testing.T) { callMock1 := mockResult{"eth_call", "1", "ekans1"} tests := []struct { - name string - handler http.Handler - mocks []mockResult - reqs []*proxyd.RPCReq - expectedRes string - maxBatchSize int - numExpectedForwards int + name string + handler http.Handler + mocks []mockResult + reqs []*proxyd.RPCReq + expectedRes string + maxUpstreamBatchSize int + numExpectedForwards int }{ { name: "backend returns batches out of order", @@ -49,9 +49,9 @@ func TestBatching(t *testing.T) { NewRPCReq("2", "eth_chainId", nil), NewRPCReq("3", "eth_chainId", nil), }, - expectedRes: asArray(chainIDResponse1, chainIDResponse2, chainIDResponse3), - maxBatchSize: 2, - numExpectedForwards: 2, + expectedRes: asArray(chainIDResponse1, chainIDResponse2, chainIDResponse3), + maxUpstreamBatchSize: 2, + numExpectedForwards: 2, }, { // infura behavior @@ -65,8 +65,8 @@ func TestBatching(t *testing.T) { `{"error":{"code":-32011,"message":"no backends available for method"},"id":1,"jsonrpc":"2.0"}`, `{"error":{"code":-32011,"message":"no backends available for method"},"id":2,"jsonrpc":"2.0"}`, ), - maxBatchSize: 10, - numExpectedForwards: 1, + maxUpstreamBatchSize: 10, + numExpectedForwards: 1, }, { name: "backend returns single RPC response object for minibatches", @@ -79,8 +79,8 @@ func TestBatching(t *testing.T) { `{"error":{"code":-32011,"message":"no backends available for method"},"id":1,"jsonrpc":"2.0"}`, `{"error":{"code":-32011,"message":"no backends available for method"},"id":2,"jsonrpc":"2.0"}`, ), - maxBatchSize: 1, - numExpectedForwards: 2, + maxUpstreamBatchSize: 1, + numExpectedForwards: 2, }, { name: "duplicate request ids are on distinct batches", @@ -96,9 +96,24 @@ func TestBatching(t *testing.T) { NewRPCReq("1", "eth_chainId", nil), NewRPCReq("1", "eth_call", nil), }, - expectedRes: asArray(netVersionResponse1, chainIDResponse2, chainIDResponse1, callResponse1), - maxBatchSize: 2, - numExpectedForwards: 3, + expectedRes: asArray(netVersionResponse1, chainIDResponse2, chainIDResponse1, callResponse1), + maxUpstreamBatchSize: 2, + numExpectedForwards: 3, + }, + { + name: "over max size", + mocks: []mockResult{}, + reqs: []*proxyd.RPCReq{ + NewRPCReq("1", "net_version", nil), + NewRPCReq("2", "eth_chainId", nil), + NewRPCReq("3", "eth_chainId", nil), + NewRPCReq("4", "eth_call", nil), + NewRPCReq("5", "eth_call", nil), + NewRPCReq("6", "eth_call", nil), + }, + expectedRes: "{\"error\":{\"code\":-32014,\"message\":\"over batch size custom message\"},\"id\":null,\"jsonrpc\":\"2.0\"}", + maxUpstreamBatchSize: 2, + numExpectedForwards: 0, }, { name: "eth_accounts does not get forwarded", @@ -109,15 +124,15 @@ func TestBatching(t *testing.T) { NewRPCReq("1", "eth_call", nil), NewRPCReq("2", "eth_accounts", nil), }, - expectedRes: asArray(callResponse1, ethAccountsResponse2), - maxBatchSize: 2, - numExpectedForwards: 1, + expectedRes: asArray(callResponse1, ethAccountsResponse2), + maxUpstreamBatchSize: 2, + numExpectedForwards: 1, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - config.Server.MaxUpstreamBatchSize = tt.maxBatchSize + config.Server.MaxUpstreamBatchSize = tt.maxUpstreamBatchSize handler := tt.handler if handler == nil { diff --git a/proxyd/proxyd/integration_tests/testdata/batching.toml b/proxyd/proxyd/integration_tests/testdata/batching.toml index 2aa591d..4762835 100644 --- a/proxyd/proxyd/integration_tests/testdata/batching.toml +++ b/proxyd/proxyd/integration_tests/testdata/batching.toml @@ -17,3 +17,7 @@ backends = ["good"] eth_chainId = "main" net_version = "main" eth_call = "main" + +[batch] +error_message = "over batch size custom message" +max_size = 5 \ No newline at end of file diff --git a/proxyd/proxyd/metrics.go b/proxyd/proxyd/metrics.go index 5241dfa..a3cfe45 100644 --- a/proxyd/proxyd/metrics.go +++ b/proxyd/proxyd/metrics.go @@ -222,6 +222,20 @@ var ( }, []string{ "backend_name", }) + + batchSizeHistogram = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: MetricsNamespace, + Name: "batch_size_summary", + Help: "Summary of batch sizes", + Buckets: []float64{ + 1, + 5, + 10, + 25, + 50, + 100, + }, + }) ) func RecordRedisError(source string) { @@ -278,3 +292,7 @@ func RecordCacheHit(method string) { func RecordCacheMiss(method string) { cacheMissesTotal.WithLabelValues(method).Inc() } + +func RecordBatchSize(size int) { + batchSizeHistogram.Observe(float64(size)) +} diff --git a/proxyd/proxyd/proxyd.go b/proxyd/proxyd/proxyd.go index af9f44e..12a6a1a 100644 --- a/proxyd/proxyd/proxyd.go +++ b/proxyd/proxyd/proxyd.go @@ -65,6 +65,9 @@ func Start(config *Config) (func(), error) { if config.WhitelistErrorMessage != "" { ErrMethodNotWhitelisted.Message = config.WhitelistErrorMessage } + if config.BatchConfig.ErrorMessage != "" { + ErrTooManyBatchRequests.Message = config.BatchConfig.ErrorMessage + } maxConcurrentRPCs := config.Server.MaxConcurrentRPCs if maxConcurrentRPCs == 0 { @@ -236,6 +239,7 @@ func Start(config *Config) (func(), error) { config.RateLimit, config.Server.EnableRequestLog, config.Server.MaxRequestBodyLogLen, + config.BatchConfig.MaxSize, ) if err != nil { return nil, fmt.Errorf("error creating server: %w", err) diff --git a/proxyd/proxyd/server.go b/proxyd/proxyd/server.go index 509dc6a..94787d1 100644 --- a/proxyd/proxyd/server.go +++ b/proxyd/proxyd/server.go @@ -28,7 +28,7 @@ const ( ContextKeyAuth = "authorization" ContextKeyReqID = "req_id" ContextKeyXForwardedFor = "x_forwarded_for" - MaxBatchRPCCalls = 100 + MaxBatchRPCCallsHardLimit = 100 cacheStatusHdr = "X-Proxyd-Cache-Status" defaultServerTimeout = time.Second * 10 maxRequestBodyLogLen = 2000 @@ -48,6 +48,7 @@ type Server struct { authenticatedPaths map[string]string timeout time.Duration maxUpstreamBatchSize int + maxBatchSize int upgrader *websocket.Upgrader mainLim limiter.Store overrideLims map[string]limiter.Store @@ -75,6 +76,7 @@ func NewServer( rateLimitConfig RateLimitConfig, enableRequestLog bool, maxRequestBodyLogLen int, + maxBatchSize int, ) (*Server, error) { if cache == nil { cache = &NoopRPCCache{} @@ -92,6 +94,10 @@ func NewServer( maxUpstreamBatchSize = defaultMaxUpstreamBatchSize } + if maxBatchSize == 0 || maxBatchSize > MaxBatchRPCCallsHardLimit { + maxBatchSize = MaxBatchRPCCallsHardLimit + } + var mainLim limiter.Store limExemptOrigins := make(map[string]bool) limExemptUserAgents := make(map[string]bool) @@ -139,6 +145,7 @@ func NewServer( cache: cache, enableRequestLog: enableRequestLog, maxRequestBodyLogLen: maxRequestBodyLogLen, + maxBatchSize: maxBatchSize, upgrader: &websocket.Upgrader{ HandshakeTimeout: 5 * time.Second, }, @@ -291,7 +298,9 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) { return } - if len(reqs) > MaxBatchRPCCalls { + RecordBatchSize(len(reqs)) + + if len(reqs) > s.maxBatchSize { RecordRPCError(ctx, BackendProxyd, MethodUnknown, ErrTooManyBatchRequests) writeRPCError(ctx, w, nil, ErrTooManyBatchRequests) return