Merge pull request #3602 from ethereum-optimism/develop

Develop -> Master
Matthew Slipper, 2022-09-28 19:34:57 -06:00 (committed by GitHub)
commit 3638b5575b
10 changed files with 121 additions and 50 deletions

@@ -95,19 +95,26 @@ type BackendGroupsConfig map[string]*BackendGroupConfig

 type MethodMappingsConfig map[string]string

+type BatchConfig struct {
+	MaxSize      int    `toml:"max_size"`
+	ErrorMessage string `toml:"error_message"`
+}
+
 type Config struct {
 	WSBackendGroup        string              `toml:"ws_backend_group"`
 	Server                ServerConfig        `toml:"server"`
 	Cache                 CacheConfig         `toml:"cache"`
 	Redis                 RedisConfig         `toml:"redis"`
 	Metrics               MetricsConfig       `toml:"metrics"`
 	RateLimit             RateLimitConfig     `toml:"rate_limit"`
 	BackendOptions        BackendOptions      `toml:"backend"`
 	Backends              BackendsConfig      `toml:"backends"`
+	BatchConfig           BatchConfig         `toml:"batch"`
 	Authentication        map[string]string   `toml:"authentication"`
 	BackendGroups         BackendGroupsConfig `toml:"backend_groups"`
 	RPCMethodMappings     map[string]string   `toml:"rpc_method_mappings"`
 	WSMethodWhitelist     []string            `toml:"ws_method_whitelist"`
+	WhitelistErrorMessage string              `toml:"whitelist_error_message"`
 }

 func ReadFromEnvOrConfig(value string) (string, error) {
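The two new knobs surface in TOML as a [batch] table and a top-level whitelist_error_message key. A minimal sketch of decoding them, assuming the BurntSushi/toml decoder the struct tags suggest; the inline config mirrors the test fixtures added later in this diff:

	package main

	import (
		"fmt"

		"github.com/BurntSushi/toml"
	)

	type BatchConfig struct {
		MaxSize      int    `toml:"max_size"`
		ErrorMessage string `toml:"error_message"`
	}

	type Config struct {
		WhitelistErrorMessage string      `toml:"whitelist_error_message"`
		BatchConfig           BatchConfig `toml:"batch"`
	}

	func main() {
		// Values copied from the fixtures in this PR.
		raw := `
	whitelist_error_message = "rpc method is not whitelisted custom message"

	[batch]
	max_size = 5
	error_message = "over batch size custom message"
	`
		var cfg Config
		if _, err := toml.Decode(raw, &cfg); err != nil {
			panic(err)
		}
		fmt.Println(cfg.BatchConfig.MaxSize, cfg.BatchConfig.ErrorMessage)
		fmt.Println(cfg.WhitelistErrorMessage)
	}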

@@ -33,13 +33,13 @@ func TestBatching(t *testing.T) {
 	callMock1 := mockResult{"eth_call", "1", "ekans1"}

 	tests := []struct {
 		name                 string
 		handler              http.Handler
 		mocks                []mockResult
 		reqs                 []*proxyd.RPCReq
 		expectedRes          string
-		maxBatchSize         int
+		maxUpstreamBatchSize int
 		numExpectedForwards  int
 	}{
 		{
 			name: "backend returns batches out of order",
@@ -49,9 +49,9 @@ func TestBatching(t *testing.T) {
 				NewRPCReq("2", "eth_chainId", nil),
 				NewRPCReq("3", "eth_chainId", nil),
 			},
 			expectedRes:          asArray(chainIDResponse1, chainIDResponse2, chainIDResponse3),
-			maxBatchSize:         2,
+			maxUpstreamBatchSize: 2,
 			numExpectedForwards:  2,
 		},
 		{
 			// infura behavior
@@ -65,8 +65,8 @@ func TestBatching(t *testing.T) {
 				`{"error":{"code":-32011,"message":"no backends available for method"},"id":1,"jsonrpc":"2.0"}`,
 				`{"error":{"code":-32011,"message":"no backends available for method"},"id":2,"jsonrpc":"2.0"}`,
 			),
-			maxBatchSize:         10,
+			maxUpstreamBatchSize: 10,
 			numExpectedForwards:  1,
 		},
 		{
 			name: "backend returns single RPC response object for minibatches",
@@ -79,8 +79,8 @@ func TestBatching(t *testing.T) {
 				`{"error":{"code":-32011,"message":"no backends available for method"},"id":1,"jsonrpc":"2.0"}`,
 				`{"error":{"code":-32011,"message":"no backends available for method"},"id":2,"jsonrpc":"2.0"}`,
 			),
-			maxBatchSize:         1,
+			maxUpstreamBatchSize: 1,
 			numExpectedForwards:  2,
 		},
 		{
 			name: "duplicate request ids are on distinct batches",
@@ -96,9 +96,24 @@ func TestBatching(t *testing.T) {
 				NewRPCReq("1", "eth_chainId", nil),
 				NewRPCReq("1", "eth_call", nil),
 			},
 			expectedRes:          asArray(netVersionResponse1, chainIDResponse2, chainIDResponse1, callResponse1),
-			maxBatchSize:         2,
+			maxUpstreamBatchSize: 2,
 			numExpectedForwards:  3,
 		},
+		{
+			name:  "over max size",
+			mocks: []mockResult{},
+			reqs: []*proxyd.RPCReq{
+				NewRPCReq("1", "net_version", nil),
+				NewRPCReq("2", "eth_chainId", nil),
+				NewRPCReq("3", "eth_chainId", nil),
+				NewRPCReq("4", "eth_call", nil),
+				NewRPCReq("5", "eth_call", nil),
+				NewRPCReq("6", "eth_call", nil),
+			},
+			expectedRes:          "{\"error\":{\"code\":-32014,\"message\":\"over batch size custom message\"},\"id\":null,\"jsonrpc\":\"2.0\"}",
+			maxUpstreamBatchSize: 2,
+			numExpectedForwards:  0,
+		},
 		{
 			name: "eth_accounts does not get forwarded",
@@ -109,15 +124,15 @@ func TestBatching(t *testing.T) {
 				NewRPCReq("1", "eth_call", nil),
 				NewRPCReq("2", "eth_accounts", nil),
 			},
 			expectedRes:          asArray(callResponse1, ethAccountsResponse2),
-			maxBatchSize:         2,
+			maxUpstreamBatchSize: 2,
 			numExpectedForwards:  1,
 		},
 	}

 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			config.Server.MaxUpstreamBatchSize = tt.maxBatchSize
+			config.Server.MaxUpstreamBatchSize = tt.maxUpstreamBatchSize
 			handler := tt.handler
 			if handler == nil {

@@ -17,3 +17,7 @@ backends = ["good"]
 eth_chainId = "main"
 net_version = "main"
 eth_call = "main"
+
+[batch]
+error_message = "over batch size custom message"
+max_size = 5
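This fixture drives the "over max size" test case above. The batch-level rejection a client sees can be reproduced with a short sketch; the response structs here are illustrative, not proxyd's own types, with field order chosen to match the expected test string:

	package main

	import (
		"encoding/json"
		"fmt"
	)

	type rpcError struct {
		Code    int    `json:"code"`
		Message string `json:"message"`
	}

	type rpcErrorRes struct {
		Error   rpcError        `json:"error"`
		ID      json.RawMessage `json:"id"`
		JSONRPC string          `json:"jsonrpc"`
	}

	func main() {
		res := rpcErrorRes{
			Error:   rpcError{Code: -32014, Message: "over batch size custom message"},
			ID:      nil, // batch-level rejections carry a null id
			JSONRPC: "2.0",
		}
		out, _ := json.Marshal(res)
		fmt.Println(string(out))
		// {"error":{"code":-32014,"message":"over batch size custom message"},"id":null,"jsonrpc":"2.0"}
	}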

@@ -1,3 +1,5 @@
+whitelist_error_message = "rpc method is not whitelisted custom message"
+
 [server]
 rpc_port = 8545

@@ -1,3 +1,5 @@
+whitelist_error_message = "rpc method is not whitelisted"
+
 ws_backend_group = "main"
 ws_method_whitelist = [

@@ -10,7 +10,7 @@ import (
 )

 const (
-	notWhitelistedResponse        = `{"jsonrpc":"2.0","error":{"code":-32001,"message":"rpc method is not whitelisted"},"id":999}`
+	notWhitelistedResponse        = `{"jsonrpc":"2.0","error":{"code":-32001,"message":"rpc method is not whitelisted custom message"},"id":999}`
 	parseErrResponse              = `{"jsonrpc":"2.0","error":{"code":-32700,"message":"parse error"},"id":null}`
 	invalidJSONRPCVersionResponse = `{"error":{"code":-32601,"message":"invalid JSON-RPC version"},"id":null,"jsonrpc":"2.0"}`
 	invalidIDResponse             = `{"error":{"code":-32601,"message":"invalid ID"},"id":null,"jsonrpc":"2.0"}`

@@ -7,6 +7,8 @@ import (
 	"testing"
 	"time"

+	"github.com/ethereum/go-ethereum/log"
+
 	"github.com/ethereum-optimism/optimism/proxyd"
 	"github.com/gorilla/websocket"
 	"github.com/stretchr/testify/require"
@@ -42,6 +44,13 @@ func TestConcurrentWSPanic(t *testing.T) {
 	require.NoError(t, err)
 	defer shutdown()

+	// suppress tons of log messages
+	oldHandler := log.Root().GetHandler()
+	log.Root().SetHandler(log.DiscardHandler())
+	defer func() {
+		log.Root().SetHandler(oldHandler)
+	}()
+
 	<-readyCh

 	var wg sync.WaitGroup
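The suppression added here is go-ethereum's handler-swap pattern: install a discard handler, restore the previous one on exit. A standalone sketch of the same idea:

	package main

	import "github.com/ethereum/go-ethereum/log"

	func main() {
		// Swap in a discard handler, restore the old one on exit,
		// the same shape as the test setup above.
		oldHandler := log.Root().GetHandler()
		log.Root().SetHandler(log.DiscardHandler())
		defer log.Root().SetHandler(oldHandler)

		log.Info("silenced while the discard handler is installed")
	}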

@@ -222,6 +222,20 @@ var (
 	}, []string{
 		"backend_name",
 	})
+
+	batchSizeHistogram = promauto.NewHistogram(prometheus.HistogramOpts{
+		Namespace: MetricsNamespace,
+		Name:      "batch_size_summary",
+		Help:      "Summary of batch sizes",
+		Buckets: []float64{
+			1,
+			5,
+			10,
+			25,
+			50,
+			100,
+		},
+	})
 )

 func RecordRedisError(source string) {
@@ -278,3 +292,7 @@ func RecordCacheHit(method string) {
 func RecordCacheMiss(method string) {
 	cacheMissesTotal.WithLabelValues(method).Inc()
 }
+
+func RecordBatchSize(size int) {
+	batchSizeHistogram.Observe(float64(size))
+}
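Reading the new histogram back makes the bucket semantics concrete. A hedged sketch using only client_golang's public API (all names local to the example); note that batch sizes above the top bucket of 100 only count toward the implicit +Inf bucket:

	package main

	import (
		"fmt"

		"github.com/prometheus/client_golang/prometheus"
		dto "github.com/prometheus/client_model/go"
	)

	func main() {
		h := prometheus.NewHistogram(prometheus.HistogramOpts{
			Name:    "batch_size_summary",
			Help:    "Summary of batch sizes",
			Buckets: []float64{1, 5, 10, 25, 50, 100},
		})

		// Stand-in for RecordBatchSize(len(reqs)) on a few batches.
		for _, size := range []int{1, 3, 7, 120} {
			h.Observe(float64(size))
		}

		// Read back the cumulative per-bucket counts.
		m := &dto.Metric{}
		if err := h.Write(m); err != nil {
			panic(err)
		}
		for _, b := range m.Histogram.Bucket {
			fmt.Printf("le=%v count=%d\n", b.GetUpperBound(), b.GetCumulativeCount())
		}
		// Includes the 120 observation that landed above the top bucket.
		fmt.Println("total observations:", m.Histogram.GetSampleCount())
	}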

@@ -55,6 +55,20 @@ func Start(config *Config) (func(), error) {
 		}
 	}

+	// While modifying shared globals is a bad practice, the alternative
+	// is to clone these errors on every invocation. This is inefficient.
+	// We'd also have to make sure that errors.Is and errors.As continue
+	// to function properly on the cloned errors.
+	if config.RateLimit.ErrorMessage != "" {
+		ErrOverRateLimit.Message = config.RateLimit.ErrorMessage
+	}
+	if config.WhitelistErrorMessage != "" {
+		ErrMethodNotWhitelisted.Message = config.WhitelistErrorMessage
+	}
+	if config.BatchConfig.ErrorMessage != "" {
+		ErrTooManyBatchRequests.Message = config.BatchConfig.ErrorMessage
+	}
+
 	maxConcurrentRPCs := config.Server.MaxConcurrentRPCs
 	if maxConcurrentRPCs == 0 {
 		maxConcurrentRPCs = math.MaxInt64
@@ -225,6 +239,7 @@ func Start(config *Config) (func(), error) {
 		config.RateLimit,
 		config.Server.EnableRequestLog,
 		config.Server.MaxRequestBodyLogLen,
+		config.BatchConfig.MaxSize,
 	)
 	if err != nil {
 		return nil, fmt.Errorf("error creating server: %w", err)
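The comment above justifies mutating package-level error values once at startup rather than cloning per request. A self-contained sketch of that pattern; the RPCErr shape and its default message are illustrative, not proxyd's exact definitions:

	package main

	import (
		"errors"
		"fmt"
	)

	type RPCErr struct {
		Code    int
		Message string
	}

	func (e *RPCErr) Error() string { return e.Message }

	// Default message is a placeholder, not proxyd's actual text.
	var ErrTooManyBatchRequests = &RPCErr{Code: -32014, Message: "too many batch requests"}

	func main() {
		// Stand-in for config.BatchConfig.ErrorMessage at startup.
		custom := "over batch size custom message"
		if custom != "" {
			ErrTooManyBatchRequests.Message = custom
		}

		// Pointer identity is unchanged, so errors.Is against the
		// shared value keeps working after the message rewrite.
		err := fmt.Errorf("handling batch: %w", ErrTooManyBatchRequests)
		fmt.Println(errors.Is(err, ErrTooManyBatchRequests)) // true
		fmt.Println(err)
	}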

@@ -28,7 +28,7 @@ const (
 	ContextKeyAuth            = "authorization"
 	ContextKeyReqID           = "req_id"
 	ContextKeyXForwardedFor   = "x_forwarded_for"
-	MaxBatchRPCCalls          = 100
+	MaxBatchRPCCallsHardLimit = 100
 	cacheStatusHdr            = "X-Proxyd-Cache-Status"
 	defaultServerTimeout      = time.Second * 10
 	maxRequestBodyLogLen      = 2000
@@ -48,6 +48,7 @@ type Server struct {
 	authenticatedPaths   map[string]string
 	timeout              time.Duration
 	maxUpstreamBatchSize int
+	maxBatchSize         int
 	upgrader             *websocket.Upgrader
 	mainLim              limiter.Store
 	overrideLims         map[string]limiter.Store
@@ -75,6 +76,7 @@ func NewServer(
 	rateLimitConfig RateLimitConfig,
 	enableRequestLog bool,
 	maxRequestBodyLogLen int,
+	maxBatchSize int,
 ) (*Server, error) {
 	if cache == nil {
 		cache = &NoopRPCCache{}
@@ -92,6 +94,10 @@ func NewServer(
 		maxUpstreamBatchSize = defaultMaxUpstreamBatchSize
 	}

+	if maxBatchSize == 0 || maxBatchSize > MaxBatchRPCCallsHardLimit {
+		maxBatchSize = MaxBatchRPCCallsHardLimit
+	}
+
 	var mainLim limiter.Store
 	limExemptOrigins := make(map[string]bool)
 	limExemptUserAgents := make(map[string]bool)
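The clamp added to NewServer means max_size = 0 (unset) and any value above the hard limit both collapse to MaxBatchRPCCallsHardLimit. A tiny sketch of that rule (clampBatchSize is a hypothetical helper; proxyd does this inline):

	package main

	import "fmt"

	const MaxBatchRPCCallsHardLimit = 100

	// clampBatchSize mirrors the inline check added to NewServer above.
	func clampBatchSize(maxBatchSize int) int {
		if maxBatchSize == 0 || maxBatchSize > MaxBatchRPCCallsHardLimit {
			return MaxBatchRPCCallsHardLimit
		}
		return maxBatchSize
	}

	func main() {
		fmt.Println(clampBatchSize(0))   // 100: unset falls back to the hard limit
		fmt.Println(clampBatchSize(5))   // 5: configured [batch] max_size wins
		fmt.Println(clampBatchSize(500)) // 100: cannot exceed the hard limit
	}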
@@ -139,6 +145,7 @@ func NewServer(
 		cache:                cache,
 		enableRequestLog:     enableRequestLog,
 		maxRequestBodyLogLen: maxRequestBodyLogLen,
+		maxBatchSize:         maxBatchSize,
 		upgrader: &websocket.Upgrader{
 			HandshakeTimeout: 5 * time.Second,
 		},
@@ -244,12 +251,7 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
 	}

 	if isLimited("") {
-		rpcErr := ErrOverRateLimit
-		if s.limConfig.ErrorMessage != "" {
-			rpcErr = ErrOverRateLimit.Clone()
-			rpcErr.Message = s.limConfig.ErrorMessage
-		}
-		RecordRPCError(ctx, BackendProxyd, "unknown", rpcErr)
+		RecordRPCError(ctx, BackendProxyd, "unknown", ErrOverRateLimit)
 		log.Warn(
 			"rate limited request",
 			"req_id", GetReqID(ctx),
@@ -258,7 +260,7 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
 			"origin", origin,
 			"remote_ip", xff,
 		)
-		writeRPCError(ctx, w, nil, rpcErr)
+		writeRPCError(ctx, w, nil, ErrOverRateLimit)
 		return
 	}
@@ -296,7 +298,9 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
 		return
 	}

-	if len(reqs) > MaxBatchRPCCalls {
+	RecordBatchSize(len(reqs))
+
+	if len(reqs) > s.maxBatchSize {
 		RecordRPCError(ctx, BackendProxyd, MethodUnknown, ErrTooManyBatchRequests)
 		writeRPCError(ctx, w, nil, ErrTooManyBatchRequests)
 		return
@@ -394,13 +398,8 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
 			"req_id", GetReqID(ctx),
 			"method", parsedReq.Method,
 		)
-		rpcErr := ErrOverRateLimit
-		if s.limConfig.ErrorMessage != "" {
-			rpcErr = rpcErr.Clone()
-			rpcErr.Message = s.limConfig.ErrorMessage
-		}
-		RecordRPCError(ctx, BackendProxyd, parsedReq.Method, rpcErr)
-		responses[i] = NewRPCErrorRes(parsedReq.ID, rpcErr)
+		RecordRPCError(ctx, BackendProxyd, parsedReq.Method, ErrOverRateLimit)
+		responses[i] = NewRPCErrorRes(parsedReq.ID, ErrOverRateLimit)
 		continue
 	}