Merge pull request #7754 from ethereum-optimism/felipe/avoid-caching-empty-get-raw-receipts

feat(proxyd): avoid caching debug_getRawReceipts responses with 0 receipts
This commit is contained in:
felipe andrade 2023-10-19 20:50:30 +00:00 committed by GitHub
commit 2e2259a0d9
13 changed files with 114 additions and 22 deletions

@ -665,9 +665,9 @@ type BackendGroup struct {
Consensus *ConsensusPoller Consensus *ConsensusPoller
} }
func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) { func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, string, error) {
if len(rpcReqs) == 0 { if len(rpcReqs) == 0 {
return nil, nil return nil, "", nil
} }
backends := bg.Backends backends := bg.Backends
@ -731,7 +731,7 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) || if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) ||
errors.Is(err, ErrConsensusGetReceiptsInvalidTarget) || errors.Is(err, ErrConsensusGetReceiptsInvalidTarget) ||
errors.Is(err, ErrMethodNotWhitelisted) { errors.Is(err, ErrMethodNotWhitelisted) {
return nil, err return nil, "", err
} }
if errors.Is(err, ErrBackendOffline) { if errors.Is(err, ErrBackendOffline) {
log.Warn( log.Warn(
@ -773,11 +773,12 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
} }
} }
return res, nil servedBy := fmt.Sprintf("%s/%s", bg.Name, back.Name)
return res, servedBy, nil
} }
RecordUnserviceableRequest(ctx, RPCRequestSourceHTTP) RecordUnserviceableRequest(ctx, RPCRequestSourceHTTP)
return nil, ErrNoBackends return nil, "", ErrNoBackends
} }
func (bg *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn, methodWhitelist *StringSet) (*WSProxier, error) { func (bg *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn, methodWhitelist *StringSet) (*WSProxier, error) {

@ -128,7 +128,7 @@ type rpcCache struct {
func newRPCCache(cache Cache) RPCCache { func newRPCCache(cache Cache) RPCCache {
staticHandler := &StaticMethodHandler{cache: cache} staticHandler := &StaticMethodHandler{cache: cache}
debugGetRawReceiptsHandler := &StaticMethodHandler{cache: cache, debugGetRawReceiptsHandler := &StaticMethodHandler{cache: cache,
filter: func(req *RPCReq) bool { filterGet: func(req *RPCReq) bool {
// cache only if the request is for a block hash // cache only if the request is for a block hash
var p []rpc.BlockNumberOrHash var p []rpc.BlockNumberOrHash
@ -141,6 +141,14 @@ func newRPCCache(cache Cache) RPCCache {
} }
return p[0].BlockHash != nil return p[0].BlockHash != nil
}, },
filterPut: func(req *RPCReq, res *RPCRes) bool {
// don't cache if response contains 0 receipts
rawReceipts, ok := res.Result.([]interface{})
if !ok {
return false
}
return len(rawReceipts) > 0
},
} }
handlers := map[string]RPCMethodHandler{ handlers := map[string]RPCMethodHandler{
"eth_chainId": staticHandler, "eth_chainId": staticHandler,

@ -110,7 +110,7 @@ func TestRPCCacheImmutableRPCs(t *testing.T) {
}, },
res: &RPCRes{ res: &RPCRes{
JSONRPC: "2.0", JSONRPC: "2.0",
Result: `{"debug_getRawReceipts":"!"}`, Result: []interface{}{"a"},
ID: ID, ID: ID,
}, },
name: "debug_getRawReceipts", name: "debug_getRawReceipts",

@ -24,6 +24,8 @@ type ServerConfig struct {
EnableRequestLog bool `toml:"enable_request_log"` EnableRequestLog bool `toml:"enable_request_log"`
MaxRequestBodyLogLen int `toml:"max_request_body_log_len"` MaxRequestBodyLogLen int `toml:"max_request_body_log_len"`
EnableXServedByHeader bool `toml:"enable_served_by_header"`
} }
type CacheConfig struct { type CacheConfig struct {

@ -27,6 +27,7 @@ func TestCaching(t *testing.T) {
hdlr.SetRoute("eth_getTransactionByBlockHashAndIndex", "999", "eth_getTransactionByBlockHashAndIndex") hdlr.SetRoute("eth_getTransactionByBlockHashAndIndex", "999", "eth_getTransactionByBlockHashAndIndex")
hdlr.SetRoute("eth_getUncleByBlockHashAndIndex", "999", "eth_getUncleByBlockHashAndIndex") hdlr.SetRoute("eth_getUncleByBlockHashAndIndex", "999", "eth_getUncleByBlockHashAndIndex")
hdlr.SetRoute("eth_getTransactionReceipt", "999", "eth_getTransactionReceipt") hdlr.SetRoute("eth_getTransactionReceipt", "999", "eth_getTransactionReceipt")
hdlr.SetRoute("debug_getRawReceipts", "999", "debug_getRawReceipts")
/* not cacheable */ /* not cacheable */
hdlr.SetRoute("eth_getBlockByNumber", "999", "eth_getBlockByNumber") hdlr.SetRoute("eth_getBlockByNumber", "999", "eth_getBlockByNumber")
hdlr.SetRoute("eth_blockNumber", "999", "eth_blockNumber") hdlr.SetRoute("eth_blockNumber", "999", "eth_blockNumber")
@ -180,6 +181,30 @@ func TestCaching(t *testing.T) {
RequireEqualJSON(t, resRaw, resCache) RequireEqualJSON(t, resRaw, resCache)
require.Equal(t, 2, countRequests(backend, "eth_getBlockByHash")) require.Equal(t, 2, countRequests(backend, "eth_getBlockByHash"))
}) })
t.Run("debug_getRawReceipts with 0 receipts should not be cached", func(t *testing.T) {
backend.Reset()
hdlr.SetRoute("debug_getRawReceipts", "999", []string{})
resRaw, _, err := client.SendRPC("debug_getRawReceipts", []interface{}{"0x88420081ab9c6d50dc57af36b541c6b8a7b3e9c0d837b0414512c4c5883560ff"})
require.NoError(t, err)
resCache, _, err := client.SendRPC("debug_getRawReceipts", []interface{}{"0x88420081ab9c6d50dc57af36b541c6b8a7b3e9c0d837b0414512c4c5883560ff"})
require.NoError(t, err)
RequireEqualJSON(t, []byte("{\"id\":999,\"jsonrpc\":\"2.0\",\"result\":[]}"), resRaw)
RequireEqualJSON(t, resRaw, resCache)
require.Equal(t, 2, countRequests(backend, "debug_getRawReceipts"))
})
t.Run("debug_getRawReceipts with more than 0 receipts should be cached", func(t *testing.T) {
backend.Reset()
hdlr.SetRoute("debug_getRawReceipts", "999", []string{"a"})
resRaw, _, err := client.SendRPC("debug_getRawReceipts", []interface{}{"0x88420081ab9c6d50dc57af36b541c6b8a7b3e9c0d837b0414512c4c5883560bb"})
require.NoError(t, err)
resCache, _, err := client.SendRPC("debug_getRawReceipts", []interface{}{"0x88420081ab9c6d50dc57af36b541c6b8a7b3e9c0d837b0414512c4c5883560bb"})
require.NoError(t, err)
RequireEqualJSON(t, []byte("{\"id\":999,\"jsonrpc\":\"2.0\",\"result\":[\"a\"]}"), resRaw)
RequireEqualJSON(t, resRaw, resCache)
require.Equal(t, 1, countRequests(backend, "debug_getRawReceipts"))
})
} }
func TestBatchCaching(t *testing.T) { func TestBatchCaching(t *testing.T) {

@ -77,6 +77,7 @@ func (h *BatchRPCResponseRouter) SetRoute(method string, id string, result inter
switch result.(type) { switch result.(type) {
case string: case string:
case []string:
case nil: case nil:
break break
default: default:

@ -33,3 +33,4 @@ eth_getTransactionByHash = "main"
eth_getTransactionByBlockHashAndIndex = "main" eth_getTransactionByBlockHashAndIndex = "main"
eth_getUncleByBlockHashAndIndex = "main" eth_getUncleByBlockHashAndIndex = "main"
eth_getTransactionReceipt = "main" eth_getTransactionReceipt = "main"
debug_getRawReceipts = "main"

@ -185,6 +185,27 @@
} }
} }
- method: debug_getRawReceipts - method: debug_getRawReceipts
block: 0x55
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"_": "debug_getRawReceipts"
}
}
- method: debug_getRawReceipts
block: 0xc6ef2fc5426d6ad6fd9e2a26abeab0aa2411b7ab17f30a99d3cb96aed1d1055b
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"_": "debug_getRawReceipts"
}
}
- method: debug_getRawReceipts
block: 0x101
response: > response: >
{ {
"jsonrpc": "2.0", "jsonrpc": "2.0",

@ -17,9 +17,10 @@ type RPCMethodHandler interface {
} }
type StaticMethodHandler struct { type StaticMethodHandler struct {
cache Cache cache Cache
m sync.RWMutex m sync.RWMutex
filter func(*RPCReq) bool filterGet func(*RPCReq) bool
filterPut func(*RPCReq, *RPCRes) bool
} }
func (e *StaticMethodHandler) key(req *RPCReq) string { func (e *StaticMethodHandler) key(req *RPCReq) string {
@ -34,7 +35,7 @@ func (e *StaticMethodHandler) GetRPCMethod(ctx context.Context, req *RPCReq) (*R
if e.cache == nil { if e.cache == nil {
return nil, nil return nil, nil
} }
if e.filter != nil && !e.filter(req) { if e.filterGet != nil && !e.filterGet(req) {
return nil, nil return nil, nil
} }
@ -67,7 +68,12 @@ func (e *StaticMethodHandler) PutRPCMethod(ctx context.Context, req *RPCReq, res
if e.cache == nil { if e.cache == nil {
return nil return nil
} }
if e.filter != nil && !e.filter(req) { // if there is a filter on get, we don't want to cache it because its irretrievable
if e.filterGet != nil && !e.filterGet(req) {
return nil
}
// response filter
if e.filterPut != nil && !e.filterPut(req, res) {
return nil return nil
} }

@ -235,6 +235,7 @@ func Start(config *Config) (*Server, func(), error) {
resolvedAuth, resolvedAuth,
secondsToDuration(config.Server.TimeoutSeconds), secondsToDuration(config.Server.TimeoutSeconds),
config.Server.MaxUpstreamBatchSize, config.Server.MaxUpstreamBatchSize,
config.Server.EnableXServedByHeader,
rpcCache, rpcCache,
config.RateLimit, config.RateLimit,
config.SenderRateLimit, config.SenderRateLimit,

@ -60,6 +60,7 @@ type Server struct {
timeout time.Duration timeout time.Duration
maxUpstreamBatchSize int maxUpstreamBatchSize int
maxBatchSize int maxBatchSize int
enableServedByHeader bool
upgrader *websocket.Upgrader upgrader *websocket.Upgrader
mainLim FrontendRateLimiter mainLim FrontendRateLimiter
overrideLims map[string]FrontendRateLimiter overrideLims map[string]FrontendRateLimiter
@ -85,6 +86,7 @@ func NewServer(
authenticatedPaths map[string]string, authenticatedPaths map[string]string,
timeout time.Duration, timeout time.Duration,
maxUpstreamBatchSize int, maxUpstreamBatchSize int,
enableServedByHeader bool,
cache RPCCache, cache RPCCache,
rateLimitConfig RateLimitConfig, rateLimitConfig RateLimitConfig,
senderRateLimitConfig SenderRateLimitConfig, senderRateLimitConfig SenderRateLimitConfig,
@ -175,6 +177,7 @@ func NewServer(
authenticatedPaths: authenticatedPaths, authenticatedPaths: authenticatedPaths,
timeout: timeout, timeout: timeout,
maxUpstreamBatchSize: maxUpstreamBatchSize, maxUpstreamBatchSize: maxUpstreamBatchSize,
enableServedByHeader: enableServedByHeader,
cache: cache, cache: cache,
enableRequestLog: enableRequestLog, enableRequestLog: enableRequestLog,
maxRequestBodyLogLen: maxRequestBodyLogLen, maxRequestBodyLogLen: maxRequestBodyLogLen,
@ -354,7 +357,7 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
return return
} }
batchRes, batchContainsCached, err := s.handleBatchRPC(ctx, reqs, isLimited, true) batchRes, batchContainsCached, servedBy, err := s.handleBatchRPC(ctx, reqs, isLimited, true)
if err == context.DeadlineExceeded { if err == context.DeadlineExceeded {
writeRPCError(ctx, w, nil, ErrGatewayTimeout) writeRPCError(ctx, w, nil, ErrGatewayTimeout)
return return
@ -368,14 +371,16 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
writeRPCError(ctx, w, nil, ErrInternal) writeRPCError(ctx, w, nil, ErrInternal)
return return
} }
if s.enableServedByHeader {
w.Header().Set("x-served-by", servedBy)
}
setCacheHeader(w, batchContainsCached) setCacheHeader(w, batchContainsCached)
writeBatchRPCRes(ctx, w, batchRes) writeBatchRPCRes(ctx, w, batchRes)
return return
} }
rawBody := json.RawMessage(body) rawBody := json.RawMessage(body)
backendRes, cached, err := s.handleBatchRPC(ctx, []json.RawMessage{rawBody}, isLimited, false) backendRes, cached, servedBy, err := s.handleBatchRPC(ctx, []json.RawMessage{rawBody}, isLimited, false)
if err != nil { if err != nil {
if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) || if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) ||
errors.Is(err, ErrConsensusGetReceiptsInvalidTarget) { errors.Is(err, ErrConsensusGetReceiptsInvalidTarget) {
@ -385,11 +390,14 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
writeRPCError(ctx, w, nil, ErrInternal) writeRPCError(ctx, w, nil, ErrInternal)
return return
} }
if s.enableServedByHeader {
w.Header().Set("x-served-by", servedBy)
}
setCacheHeader(w, cached) setCacheHeader(w, cached)
writeRPCRes(ctx, w, backendRes[0]) writeRPCRes(ctx, w, backendRes[0])
} }
func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isLimited limiterFunc, isBatch bool) ([]*RPCRes, bool, error) { func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isLimited limiterFunc, isBatch bool) ([]*RPCRes, bool, string, error) {
// A request set is transformed into groups of batches. // A request set is transformed into groups of batches.
// Each batch group maps to a forwarded JSON-RPC batch request (subject to maxUpstreamBatchSize constraints) // Each batch group maps to a forwarded JSON-RPC batch request (subject to maxUpstreamBatchSize constraints)
// A groupID is used to decouple Requests that have duplicate ID so they're not part of the same batch that's // A groupID is used to decouple Requests that have duplicate ID so they're not part of the same batch that's
@ -475,6 +483,7 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
batches[batchGroup] = append(batches[batchGroup], batchElem{parsedReq, i}) batches[batchGroup] = append(batches[batchGroup], batchElem{parsedReq, i})
} }
servedBy := make(map[string]bool, 0)
var cached bool var cached bool
for group, batch := range batches { for group, batch := range batches {
var cacheMisses []batchElem var cacheMisses []batchElem
@ -499,17 +508,18 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
"batch_index", i, "batch_index", i,
) )
batchRPCShortCircuitsTotal.Inc() batchRPCShortCircuitsTotal.Inc()
return nil, false, context.DeadlineExceeded return nil, false, "", context.DeadlineExceeded
} }
start := i * s.maxUpstreamBatchSize start := i * s.maxUpstreamBatchSize
end := int(math.Min(float64(start+s.maxUpstreamBatchSize), float64(len(cacheMisses)))) end := int(math.Min(float64(start+s.maxUpstreamBatchSize), float64(len(cacheMisses))))
elems := cacheMisses[start:end] elems := cacheMisses[start:end]
res, err := s.BackendGroups[group.backendGroup].Forward(ctx, createBatchRequest(elems), isBatch) res, sb, err := s.BackendGroups[group.backendGroup].Forward(ctx, createBatchRequest(elems), isBatch)
servedBy[sb] = true
if err != nil { if err != nil {
if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) || if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) ||
errors.Is(err, ErrConsensusGetReceiptsInvalidTarget) { errors.Is(err, ErrConsensusGetReceiptsInvalidTarget) {
return nil, false, err return nil, false, "", err
} }
log.Error( log.Error(
"error forwarding RPC batch", "error forwarding RPC batch",
@ -541,7 +551,15 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
} }
} }
return responses, cached, nil servedByString := ""
for sb, _ := range servedBy {
if servedByString != "" {
servedByString += ", "
}
servedByString += sb
}
return responses, cached, servedByString, nil
} }
func (s *Server) HandleWS(w http.ResponseWriter, r *http.Request) { func (s *Server) HandleWS(w http.ResponseWriter, r *http.Request) {

@ -77,7 +77,7 @@ func (mh *MockedHandler) Handler(w http.ResponseWriter, req *http.Request) {
for _, r := range requests { for _, r := range requests {
method := r["method"] method := r["method"]
block := "" block := ""
if method == "eth_getBlockByNumber" { if method == "eth_getBlockByNumber" || method == "debug_getRawReceipts" {
block = (r["params"].([]interface{})[0]).(string) block = (r["params"].([]interface{})[0]).(string)
} }

File diff suppressed because one or more lines are too long