diff --git a/proxyd/proxyd/config.go b/proxyd/proxyd/config.go index ba60d67..0647074 100644 --- a/proxyd/proxyd/config.go +++ b/proxyd/proxyd/config.go @@ -95,19 +95,26 @@ type BackendGroupsConfig map[string]*BackendGroupConfig type MethodMappingsConfig map[string]string +type BatchConfig struct { + MaxSize int `toml:"max_size"` + ErrorMessage string `toml:"error_message"` +} + type Config struct { - WSBackendGroup string `toml:"ws_backend_group"` - Server ServerConfig `toml:"server"` - Cache CacheConfig `toml:"cache"` - Redis RedisConfig `toml:"redis"` - Metrics MetricsConfig `toml:"metrics"` - RateLimit RateLimitConfig `toml:"rate_limit"` - BackendOptions BackendOptions `toml:"backend"` - Backends BackendsConfig `toml:"backends"` - Authentication map[string]string `toml:"authentication"` - BackendGroups BackendGroupsConfig `toml:"backend_groups"` - RPCMethodMappings map[string]string `toml:"rpc_method_mappings"` - WSMethodWhitelist []string `toml:"ws_method_whitelist"` + WSBackendGroup string `toml:"ws_backend_group"` + Server ServerConfig `toml:"server"` + Cache CacheConfig `toml:"cache"` + Redis RedisConfig `toml:"redis"` + Metrics MetricsConfig `toml:"metrics"` + RateLimit RateLimitConfig `toml:"rate_limit"` + BackendOptions BackendOptions `toml:"backend"` + Backends BackendsConfig `toml:"backends"` + BatchConfig BatchConfig `toml:"batch"` + Authentication map[string]string `toml:"authentication"` + BackendGroups BackendGroupsConfig `toml:"backend_groups"` + RPCMethodMappings map[string]string `toml:"rpc_method_mappings"` + WSMethodWhitelist []string `toml:"ws_method_whitelist"` + WhitelistErrorMessage string `toml:"whitelist_error_message"` } func ReadFromEnvOrConfig(value string) (string, error) { diff --git a/proxyd/proxyd/integration_tests/batching_test.go b/proxyd/proxyd/integration_tests/batching_test.go index 1bcbba9..b0f811c 100644 --- a/proxyd/proxyd/integration_tests/batching_test.go +++ b/proxyd/proxyd/integration_tests/batching_test.go @@ -33,13 +33,13 @@ func TestBatching(t *testing.T) { callMock1 := mockResult{"eth_call", "1", "ekans1"} tests := []struct { - name string - handler http.Handler - mocks []mockResult - reqs []*proxyd.RPCReq - expectedRes string - maxBatchSize int - numExpectedForwards int + name string + handler http.Handler + mocks []mockResult + reqs []*proxyd.RPCReq + expectedRes string + maxUpstreamBatchSize int + numExpectedForwards int }{ { name: "backend returns batches out of order", @@ -49,9 +49,9 @@ func TestBatching(t *testing.T) { NewRPCReq("2", "eth_chainId", nil), NewRPCReq("3", "eth_chainId", nil), }, - expectedRes: asArray(chainIDResponse1, chainIDResponse2, chainIDResponse3), - maxBatchSize: 2, - numExpectedForwards: 2, + expectedRes: asArray(chainIDResponse1, chainIDResponse2, chainIDResponse3), + maxUpstreamBatchSize: 2, + numExpectedForwards: 2, }, { // infura behavior @@ -65,8 +65,8 @@ func TestBatching(t *testing.T) { `{"error":{"code":-32011,"message":"no backends available for method"},"id":1,"jsonrpc":"2.0"}`, `{"error":{"code":-32011,"message":"no backends available for method"},"id":2,"jsonrpc":"2.0"}`, ), - maxBatchSize: 10, - numExpectedForwards: 1, + maxUpstreamBatchSize: 10, + numExpectedForwards: 1, }, { name: "backend returns single RPC response object for minibatches", @@ -79,8 +79,8 @@ func TestBatching(t *testing.T) { `{"error":{"code":-32011,"message":"no backends available for method"},"id":1,"jsonrpc":"2.0"}`, `{"error":{"code":-32011,"message":"no backends available for method"},"id":2,"jsonrpc":"2.0"}`, ), - maxBatchSize: 1, - numExpectedForwards: 2, + maxUpstreamBatchSize: 1, + numExpectedForwards: 2, }, { name: "duplicate request ids are on distinct batches", @@ -96,9 +96,24 @@ func TestBatching(t *testing.T) { NewRPCReq("1", "eth_chainId", nil), NewRPCReq("1", "eth_call", nil), }, - expectedRes: asArray(netVersionResponse1, chainIDResponse2, chainIDResponse1, callResponse1), - maxBatchSize: 2, - numExpectedForwards: 3, + expectedRes: asArray(netVersionResponse1, chainIDResponse2, chainIDResponse1, callResponse1), + maxUpstreamBatchSize: 2, + numExpectedForwards: 3, + }, + { + name: "over max size", + mocks: []mockResult{}, + reqs: []*proxyd.RPCReq{ + NewRPCReq("1", "net_version", nil), + NewRPCReq("2", "eth_chainId", nil), + NewRPCReq("3", "eth_chainId", nil), + NewRPCReq("4", "eth_call", nil), + NewRPCReq("5", "eth_call", nil), + NewRPCReq("6", "eth_call", nil), + }, + expectedRes: "{\"error\":{\"code\":-32014,\"message\":\"over batch size custom message\"},\"id\":null,\"jsonrpc\":\"2.0\"}", + maxUpstreamBatchSize: 2, + numExpectedForwards: 0, }, { name: "eth_accounts does not get forwarded", @@ -109,15 +124,15 @@ func TestBatching(t *testing.T) { NewRPCReq("1", "eth_call", nil), NewRPCReq("2", "eth_accounts", nil), }, - expectedRes: asArray(callResponse1, ethAccountsResponse2), - maxBatchSize: 2, - numExpectedForwards: 1, + expectedRes: asArray(callResponse1, ethAccountsResponse2), + maxUpstreamBatchSize: 2, + numExpectedForwards: 1, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - config.Server.MaxUpstreamBatchSize = tt.maxBatchSize + config.Server.MaxUpstreamBatchSize = tt.maxUpstreamBatchSize handler := tt.handler if handler == nil { diff --git a/proxyd/proxyd/integration_tests/testdata/batching.toml b/proxyd/proxyd/integration_tests/testdata/batching.toml index 2aa591d..4762835 100644 --- a/proxyd/proxyd/integration_tests/testdata/batching.toml +++ b/proxyd/proxyd/integration_tests/testdata/batching.toml @@ -17,3 +17,7 @@ backends = ["good"] eth_chainId = "main" net_version = "main" eth_call = "main" + +[batch] +error_message = "over batch size custom message" +max_size = 5 \ No newline at end of file diff --git a/proxyd/proxyd/integration_tests/testdata/whitelist.toml b/proxyd/proxyd/integration_tests/testdata/whitelist.toml index 55b118c..4a65248 100644 --- a/proxyd/proxyd/integration_tests/testdata/whitelist.toml +++ b/proxyd/proxyd/integration_tests/testdata/whitelist.toml @@ -1,3 +1,5 @@ +whitelist_error_message = "rpc method is not whitelisted custom message" + [server] rpc_port = 8545 diff --git a/proxyd/proxyd/integration_tests/testdata/ws.toml b/proxyd/proxyd/integration_tests/testdata/ws.toml index f86b22f..4642e6b 100644 --- a/proxyd/proxyd/integration_tests/testdata/ws.toml +++ b/proxyd/proxyd/integration_tests/testdata/ws.toml @@ -1,3 +1,5 @@ +whitelist_error_message = "rpc method is not whitelisted" + ws_backend_group = "main" ws_method_whitelist = [ diff --git a/proxyd/proxyd/integration_tests/validation_test.go b/proxyd/proxyd/integration_tests/validation_test.go index be964c1..6f0653d 100644 --- a/proxyd/proxyd/integration_tests/validation_test.go +++ b/proxyd/proxyd/integration_tests/validation_test.go @@ -10,7 +10,7 @@ import ( ) const ( - notWhitelistedResponse = `{"jsonrpc":"2.0","error":{"code":-32001,"message":"rpc method is not whitelisted"},"id":999}` + notWhitelistedResponse = `{"jsonrpc":"2.0","error":{"code":-32001,"message":"rpc method is not whitelisted custom message"},"id":999}` parseErrResponse = `{"jsonrpc":"2.0","error":{"code":-32700,"message":"parse error"},"id":null}` invalidJSONRPCVersionResponse = `{"error":{"code":-32601,"message":"invalid JSON-RPC version"},"id":null,"jsonrpc":"2.0"}` invalidIDResponse = `{"error":{"code":-32601,"message":"invalid ID"},"id":null,"jsonrpc":"2.0"}` diff --git a/proxyd/proxyd/integration_tests/ws_test.go b/proxyd/proxyd/integration_tests/ws_test.go index d93d3a7..9ae12ab 100644 --- a/proxyd/proxyd/integration_tests/ws_test.go +++ b/proxyd/proxyd/integration_tests/ws_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum-optimism/optimism/proxyd" "github.com/gorilla/websocket" "github.com/stretchr/testify/require" @@ -42,6 +44,13 @@ func TestConcurrentWSPanic(t *testing.T) { require.NoError(t, err) defer shutdown() + // suppress tons of log messages + oldHandler := log.Root().GetHandler() + log.Root().SetHandler(log.DiscardHandler()) + defer func() { + log.Root().SetHandler(oldHandler) + }() + <-readyCh var wg sync.WaitGroup diff --git a/proxyd/proxyd/metrics.go b/proxyd/proxyd/metrics.go index 5241dfa..a3cfe45 100644 --- a/proxyd/proxyd/metrics.go +++ b/proxyd/proxyd/metrics.go @@ -222,6 +222,20 @@ var ( }, []string{ "backend_name", }) + + batchSizeHistogram = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: MetricsNamespace, + Name: "batch_size_summary", + Help: "Summary of batch sizes", + Buckets: []float64{ + 1, + 5, + 10, + 25, + 50, + 100, + }, + }) ) func RecordRedisError(source string) { @@ -278,3 +292,7 @@ func RecordCacheHit(method string) { func RecordCacheMiss(method string) { cacheMissesTotal.WithLabelValues(method).Inc() } + +func RecordBatchSize(size int) { + batchSizeHistogram.Observe(float64(size)) +} diff --git a/proxyd/proxyd/proxyd.go b/proxyd/proxyd/proxyd.go index e9bbe42..12a6a1a 100644 --- a/proxyd/proxyd/proxyd.go +++ b/proxyd/proxyd/proxyd.go @@ -55,6 +55,20 @@ func Start(config *Config) (func(), error) { } } + // While modifying shared globals is a bad practice, the alternative + // is to clone these errors on every invocation. This is inefficient. + // We'd also have to make sure that errors.Is and errors.As continue + // to function properly on the cloned errors. + if config.RateLimit.ErrorMessage != "" { + ErrOverRateLimit.Message = config.RateLimit.ErrorMessage + } + if config.WhitelistErrorMessage != "" { + ErrMethodNotWhitelisted.Message = config.WhitelistErrorMessage + } + if config.BatchConfig.ErrorMessage != "" { + ErrTooManyBatchRequests.Message = config.BatchConfig.ErrorMessage + } + maxConcurrentRPCs := config.Server.MaxConcurrentRPCs if maxConcurrentRPCs == 0 { maxConcurrentRPCs = math.MaxInt64 @@ -225,6 +239,7 @@ func Start(config *Config) (func(), error) { config.RateLimit, config.Server.EnableRequestLog, config.Server.MaxRequestBodyLogLen, + config.BatchConfig.MaxSize, ) if err != nil { return nil, fmt.Errorf("error creating server: %w", err) diff --git a/proxyd/proxyd/server.go b/proxyd/proxyd/server.go index 455989d..94787d1 100644 --- a/proxyd/proxyd/server.go +++ b/proxyd/proxyd/server.go @@ -28,7 +28,7 @@ const ( ContextKeyAuth = "authorization" ContextKeyReqID = "req_id" ContextKeyXForwardedFor = "x_forwarded_for" - MaxBatchRPCCalls = 100 + MaxBatchRPCCallsHardLimit = 100 cacheStatusHdr = "X-Proxyd-Cache-Status" defaultServerTimeout = time.Second * 10 maxRequestBodyLogLen = 2000 @@ -48,6 +48,7 @@ type Server struct { authenticatedPaths map[string]string timeout time.Duration maxUpstreamBatchSize int + maxBatchSize int upgrader *websocket.Upgrader mainLim limiter.Store overrideLims map[string]limiter.Store @@ -75,6 +76,7 @@ func NewServer( rateLimitConfig RateLimitConfig, enableRequestLog bool, maxRequestBodyLogLen int, + maxBatchSize int, ) (*Server, error) { if cache == nil { cache = &NoopRPCCache{} @@ -92,6 +94,10 @@ func NewServer( maxUpstreamBatchSize = defaultMaxUpstreamBatchSize } + if maxBatchSize == 0 || maxBatchSize > MaxBatchRPCCallsHardLimit { + maxBatchSize = MaxBatchRPCCallsHardLimit + } + var mainLim limiter.Store limExemptOrigins := make(map[string]bool) limExemptUserAgents := make(map[string]bool) @@ -139,6 +145,7 @@ func NewServer( cache: cache, enableRequestLog: enableRequestLog, maxRequestBodyLogLen: maxRequestBodyLogLen, + maxBatchSize: maxBatchSize, upgrader: &websocket.Upgrader{ HandshakeTimeout: 5 * time.Second, }, @@ -244,12 +251,7 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) { } if isLimited("") { - rpcErr := ErrOverRateLimit - if s.limConfig.ErrorMessage != "" { - rpcErr = ErrOverRateLimit.Clone() - rpcErr.Message = s.limConfig.ErrorMessage - } - RecordRPCError(ctx, BackendProxyd, "unknown", rpcErr) + RecordRPCError(ctx, BackendProxyd, "unknown", ErrOverRateLimit) log.Warn( "rate limited request", "req_id", GetReqID(ctx), @@ -258,7 +260,7 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) { "origin", origin, "remote_ip", xff, ) - writeRPCError(ctx, w, nil, rpcErr) + writeRPCError(ctx, w, nil, ErrOverRateLimit) return } @@ -296,7 +298,9 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) { return } - if len(reqs) > MaxBatchRPCCalls { + RecordBatchSize(len(reqs)) + + if len(reqs) > s.maxBatchSize { RecordRPCError(ctx, BackendProxyd, MethodUnknown, ErrTooManyBatchRequests) writeRPCError(ctx, w, nil, ErrTooManyBatchRequests) return @@ -394,13 +398,8 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL "req_id", GetReqID(ctx), "method", parsedReq.Method, ) - rpcErr := ErrOverRateLimit - if s.limConfig.ErrorMessage != "" { - rpcErr = rpcErr.Clone() - rpcErr.Message = s.limConfig.ErrorMessage - } - RecordRPCError(ctx, BackendProxyd, parsedReq.Method, rpcErr) - responses[i] = NewRPCErrorRes(parsedReq.ID, rpcErr) + RecordRPCError(ctx, BackendProxyd, parsedReq.Method, ErrOverRateLimit) + responses[i] = NewRPCErrorRes(parsedReq.ID, ErrOverRateLimit) continue }