proxyd: Add batch size metric and configurable max (#3545)
* proxyd: Add batch size metric and configurable max The max batch size will be overwritten if it is over `MaxBatchRPCCallsHardLimit`. Builds on https://github.com/ethereum-optimism/optimism/pull/3544. * changeset * fix lint * fix test
This commit is contained in:
parent
485258d3c2
commit
537717610e
@ -95,6 +95,11 @@ type BackendGroupsConfig map[string]*BackendGroupConfig
|
||||
|
||||
type MethodMappingsConfig map[string]string
|
||||
|
||||
type BatchConfig struct {
|
||||
MaxSize int `toml:"max_size"`
|
||||
ErrorMessage string `toml:"error_message"`
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
WSBackendGroup string `toml:"ws_backend_group"`
|
||||
Server ServerConfig `toml:"server"`
|
||||
@ -104,6 +109,7 @@ type Config struct {
|
||||
RateLimit RateLimitConfig `toml:"rate_limit"`
|
||||
BackendOptions BackendOptions `toml:"backend"`
|
||||
Backends BackendsConfig `toml:"backends"`
|
||||
BatchConfig BatchConfig `toml:"batch"`
|
||||
Authentication map[string]string `toml:"authentication"`
|
||||
BackendGroups BackendGroupsConfig `toml:"backend_groups"`
|
||||
RPCMethodMappings map[string]string `toml:"rpc_method_mappings"`
|
||||
|
@ -38,7 +38,7 @@ func TestBatching(t *testing.T) {
|
||||
mocks []mockResult
|
||||
reqs []*proxyd.RPCReq
|
||||
expectedRes string
|
||||
maxBatchSize int
|
||||
maxUpstreamBatchSize int
|
||||
numExpectedForwards int
|
||||
}{
|
||||
{
|
||||
@ -50,7 +50,7 @@ func TestBatching(t *testing.T) {
|
||||
NewRPCReq("3", "eth_chainId", nil),
|
||||
},
|
||||
expectedRes: asArray(chainIDResponse1, chainIDResponse2, chainIDResponse3),
|
||||
maxBatchSize: 2,
|
||||
maxUpstreamBatchSize: 2,
|
||||
numExpectedForwards: 2,
|
||||
},
|
||||
{
|
||||
@ -65,7 +65,7 @@ func TestBatching(t *testing.T) {
|
||||
`{"error":{"code":-32011,"message":"no backends available for method"},"id":1,"jsonrpc":"2.0"}`,
|
||||
`{"error":{"code":-32011,"message":"no backends available for method"},"id":2,"jsonrpc":"2.0"}`,
|
||||
),
|
||||
maxBatchSize: 10,
|
||||
maxUpstreamBatchSize: 10,
|
||||
numExpectedForwards: 1,
|
||||
},
|
||||
{
|
||||
@ -79,7 +79,7 @@ func TestBatching(t *testing.T) {
|
||||
`{"error":{"code":-32011,"message":"no backends available for method"},"id":1,"jsonrpc":"2.0"}`,
|
||||
`{"error":{"code":-32011,"message":"no backends available for method"},"id":2,"jsonrpc":"2.0"}`,
|
||||
),
|
||||
maxBatchSize: 1,
|
||||
maxUpstreamBatchSize: 1,
|
||||
numExpectedForwards: 2,
|
||||
},
|
||||
{
|
||||
@ -97,9 +97,24 @@ func TestBatching(t *testing.T) {
|
||||
NewRPCReq("1", "eth_call", nil),
|
||||
},
|
||||
expectedRes: asArray(netVersionResponse1, chainIDResponse2, chainIDResponse1, callResponse1),
|
||||
maxBatchSize: 2,
|
||||
maxUpstreamBatchSize: 2,
|
||||
numExpectedForwards: 3,
|
||||
},
|
||||
{
|
||||
name: "over max size",
|
||||
mocks: []mockResult{},
|
||||
reqs: []*proxyd.RPCReq{
|
||||
NewRPCReq("1", "net_version", nil),
|
||||
NewRPCReq("2", "eth_chainId", nil),
|
||||
NewRPCReq("3", "eth_chainId", nil),
|
||||
NewRPCReq("4", "eth_call", nil),
|
||||
NewRPCReq("5", "eth_call", nil),
|
||||
NewRPCReq("6", "eth_call", nil),
|
||||
},
|
||||
expectedRes: "{\"error\":{\"code\":-32014,\"message\":\"over batch size custom message\"},\"id\":null,\"jsonrpc\":\"2.0\"}",
|
||||
maxUpstreamBatchSize: 2,
|
||||
numExpectedForwards: 0,
|
||||
},
|
||||
{
|
||||
name: "eth_accounts does not get forwarded",
|
||||
mocks: []mockResult{
|
||||
@ -110,14 +125,14 @@ func TestBatching(t *testing.T) {
|
||||
NewRPCReq("2", "eth_accounts", nil),
|
||||
},
|
||||
expectedRes: asArray(callResponse1, ethAccountsResponse2),
|
||||
maxBatchSize: 2,
|
||||
maxUpstreamBatchSize: 2,
|
||||
numExpectedForwards: 1,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
config.Server.MaxUpstreamBatchSize = tt.maxBatchSize
|
||||
config.Server.MaxUpstreamBatchSize = tt.maxUpstreamBatchSize
|
||||
|
||||
handler := tt.handler
|
||||
if handler == nil {
|
||||
|
@ -17,3 +17,7 @@ backends = ["good"]
|
||||
eth_chainId = "main"
|
||||
net_version = "main"
|
||||
eth_call = "main"
|
||||
|
||||
[batch]
|
||||
error_message = "over batch size custom message"
|
||||
max_size = 5
|
@ -222,6 +222,20 @@ var (
|
||||
}, []string{
|
||||
"backend_name",
|
||||
})
|
||||
|
||||
batchSizeHistogram = promauto.NewHistogram(prometheus.HistogramOpts{
|
||||
Namespace: MetricsNamespace,
|
||||
Name: "batch_size_summary",
|
||||
Help: "Summary of batch sizes",
|
||||
Buckets: []float64{
|
||||
1,
|
||||
5,
|
||||
10,
|
||||
25,
|
||||
50,
|
||||
100,
|
||||
},
|
||||
})
|
||||
)
|
||||
|
||||
func RecordRedisError(source string) {
|
||||
@ -278,3 +292,7 @@ func RecordCacheHit(method string) {
|
||||
func RecordCacheMiss(method string) {
|
||||
cacheMissesTotal.WithLabelValues(method).Inc()
|
||||
}
|
||||
|
||||
func RecordBatchSize(size int) {
|
||||
batchSizeHistogram.Observe(float64(size))
|
||||
}
|
||||
|
@ -65,6 +65,9 @@ func Start(config *Config) (func(), error) {
|
||||
if config.WhitelistErrorMessage != "" {
|
||||
ErrMethodNotWhitelisted.Message = config.WhitelistErrorMessage
|
||||
}
|
||||
if config.BatchConfig.ErrorMessage != "" {
|
||||
ErrTooManyBatchRequests.Message = config.BatchConfig.ErrorMessage
|
||||
}
|
||||
|
||||
maxConcurrentRPCs := config.Server.MaxConcurrentRPCs
|
||||
if maxConcurrentRPCs == 0 {
|
||||
@ -236,6 +239,7 @@ func Start(config *Config) (func(), error) {
|
||||
config.RateLimit,
|
||||
config.Server.EnableRequestLog,
|
||||
config.Server.MaxRequestBodyLogLen,
|
||||
config.BatchConfig.MaxSize,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating server: %w", err)
|
||||
|
@ -28,7 +28,7 @@ const (
|
||||
ContextKeyAuth = "authorization"
|
||||
ContextKeyReqID = "req_id"
|
||||
ContextKeyXForwardedFor = "x_forwarded_for"
|
||||
MaxBatchRPCCalls = 100
|
||||
MaxBatchRPCCallsHardLimit = 100
|
||||
cacheStatusHdr = "X-Proxyd-Cache-Status"
|
||||
defaultServerTimeout = time.Second * 10
|
||||
maxRequestBodyLogLen = 2000
|
||||
@ -48,6 +48,7 @@ type Server struct {
|
||||
authenticatedPaths map[string]string
|
||||
timeout time.Duration
|
||||
maxUpstreamBatchSize int
|
||||
maxBatchSize int
|
||||
upgrader *websocket.Upgrader
|
||||
mainLim limiter.Store
|
||||
overrideLims map[string]limiter.Store
|
||||
@ -75,6 +76,7 @@ func NewServer(
|
||||
rateLimitConfig RateLimitConfig,
|
||||
enableRequestLog bool,
|
||||
maxRequestBodyLogLen int,
|
||||
maxBatchSize int,
|
||||
) (*Server, error) {
|
||||
if cache == nil {
|
||||
cache = &NoopRPCCache{}
|
||||
@ -92,6 +94,10 @@ func NewServer(
|
||||
maxUpstreamBatchSize = defaultMaxUpstreamBatchSize
|
||||
}
|
||||
|
||||
if maxBatchSize == 0 || maxBatchSize > MaxBatchRPCCallsHardLimit {
|
||||
maxBatchSize = MaxBatchRPCCallsHardLimit
|
||||
}
|
||||
|
||||
var mainLim limiter.Store
|
||||
limExemptOrigins := make(map[string]bool)
|
||||
limExemptUserAgents := make(map[string]bool)
|
||||
@ -139,6 +145,7 @@ func NewServer(
|
||||
cache: cache,
|
||||
enableRequestLog: enableRequestLog,
|
||||
maxRequestBodyLogLen: maxRequestBodyLogLen,
|
||||
maxBatchSize: maxBatchSize,
|
||||
upgrader: &websocket.Upgrader{
|
||||
HandshakeTimeout: 5 * time.Second,
|
||||
},
|
||||
@ -291,7 +298,9 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
if len(reqs) > MaxBatchRPCCalls {
|
||||
RecordBatchSize(len(reqs))
|
||||
|
||||
if len(reqs) > s.maxBatchSize {
|
||||
RecordRPCError(ctx, BackendProxyd, MethodUnknown, ErrTooManyBatchRequests)
|
||||
writeRPCError(ctx, w, nil, ErrTooManyBatchRequests)
|
||||
return
|
||||
|
Loading…
Reference in New Issue
Block a user