feat(proxyd): add consensus_getReceipts meta method

This commit is contained in:
Felipe Andrade 2023-06-01 13:16:40 -07:00
parent 0ad110cbd4
commit 33881542a9
13 changed files with 458 additions and 27 deletions

@ -89,6 +89,59 @@ Cache use Redis and can be enabled for the following immutable methods:
* `eth_getUncleByBlockHashAndIndex` * `eth_getUncleByBlockHashAndIndex`
* `debug_getRawReceipts` (block hash only) * `debug_getRawReceipts` (block hash only)
## Meta method `consensus_getReceipts`
To support backends with different specifications in the same backend group,
proxyd exposes a convenient method to fetch receipts abstracting away
what specific backend will serve the request.
Each backend can specify their preferred method to fetch receipts with `consensus_receipts_target`.
This method takes **both** the blockNumberOrHash **and** list of transaction hashes to fetch the receipts,
and then after selecting the backend to serve the request,
it translates to the correct target with the appropriate parameters.
Note that only one of the parameters will be actually used depending on the target.
Request params
```json
{
"jsonrpc":"2.0",
"id": 1,
"params": {
"blockNumberOrHash": "0xc6ef2fc5426d6ad6fd9e2a26abeab0aa2411b7ab17f30a99d3cb96aed1d1055b",
"transactions": [
"0xe670ec64341771606e55d6b4ca35a1a6b75ee3d5145a99d05921026d1527331",
"0x88df016429689c079f3b2f6ad39fa052532c56795b733da78a91ebe6a713944b"
]
}
}
```
It currently supports translation to the following targets:
* `debug_getRawReceipts(blockOrHash)` (default)
* `alchemy_getTransactionReceipts(blockOrHash)`
* `eth_getTransactionReceipt(txHash)` batched
The selected target is returned in the response, in a wrapped result.
Response
```json
{
"jsonrpc": "2.0",
"id": 1,
"result": {
"method": "eth_getTransactionReceipt",
"result": {
// the actual raw result from backend
}
}
}
```
See [op-node receipt fetcher](https://github.com/ethereum-optimism/optimism/blob/186e46a47647a51a658e699e9ff047d39444c2de/op-node/sources/receipts.go#L186-L253).
## Metrics ## Metrics
See `metrics.go` for a list of all available metrics. See `metrics.go` for a list of all available metrics.

@ -7,6 +7,9 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
"github.com/google/uuid"
"io" "io"
"math" "math"
"math/rand" "math/rand"
@ -97,6 +100,8 @@ var (
} }
ErrBackendUnexpectedJSONRPC = errors.New("backend returned an unexpected JSON-RPC response") ErrBackendUnexpectedJSONRPC = errors.New("backend returned an unexpected JSON-RPC response")
ErrConsensusGetReceiptsCantBeBatched = errors.New("consensus_getReceipts cannot be batched")
) )
func ErrInvalidRequest(msg string) *RPCErr { func ErrInvalidRequest(msg string) *RPCErr {
@ -118,6 +123,7 @@ func ErrInvalidParams(msg string) *RPCErr {
type Backend struct { type Backend struct {
Name string Name string
rpcURL string rpcURL string
receiptsTarget string
wsURL string wsURL string
authUsername string authUsername string
authPassword string authPassword string
@ -208,7 +214,7 @@ func WithProxydIP(ip string) BackendOpt {
} }
} }
func WithSkipPeerCountCheck(skipPeerCountCheck bool) BackendOpt { func WithConsensusSkipPeerCountCheck(skipPeerCountCheck bool) BackendOpt {
return func(b *Backend) { return func(b *Backend) {
b.skipPeerCountCheck = skipPeerCountCheck b.skipPeerCountCheck = skipPeerCountCheck
} }
@ -232,12 +238,33 @@ func WithMaxErrorRateThreshold(maxErrorRateThreshold float64) BackendOpt {
} }
} }
func WithConsensusReceiptTarget(receiptsTarget string) BackendOpt {
return func(b *Backend) {
b.receiptsTarget = receiptsTarget
}
}
type indexedReqRes struct { type indexedReqRes struct {
index int index int
req *RPCReq req *RPCReq
res *RPCRes res *RPCRes
} }
const ConsensusGetReceiptsMethod = "consensus_getReceipts"
const ReceiptsTargetEthTransactionReceipt = "eth_getTransactionReceipt"
const ReceiptsTargetDebugGetRawReceipts = "debug_getRawReceipts"
const ReceiptsTargetGetTransactionReceipts = "alchemy_getTransactionReceipts"
type ConsensusGetReceiptsReq struct {
BlockOrHash *rpc.BlockNumberOrHash `json:"blockOrHash"`
Transactions []common.Hash `json:"transactions"`
}
type ConsensusGetReceiptsRes struct {
Method string `json:"method"`
Result interface{} `json:"result"`
}
func NewBackend( func NewBackend(
name string, name string,
rpcURL string, rpcURL string,
@ -266,9 +293,7 @@ func NewBackend(
networkErrorsSlidingWindow: sw.NewSlidingWindow(), networkErrorsSlidingWindow: sw.NewSlidingWindow(),
} }
for _, opt := range opts { backend.Override(opts...)
opt(backend)
}
if !backend.stripTrailingXFF && backend.proxydIP == "" { if !backend.stripTrailingXFF && backend.proxydIP == "" {
log.Warn("proxied requests' XFF header will not contain the proxyd ip address") log.Warn("proxied requests' XFF header will not contain the proxyd ip address")
@ -277,6 +302,12 @@ func NewBackend(
return backend return backend
} }
func (b *Backend) Override(opts ...BackendOpt) {
for _, opt := range opts {
opt(b)
}
}
func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([]*RPCRes, error) { func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
var lastError error var lastError error
// <= to account for the first attempt not technically being // <= to account for the first attempt not technically being
@ -298,6 +329,13 @@ func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([]
res, err := b.doForward(ctx, reqs, isBatch) res, err := b.doForward(ctx, reqs, isBatch)
switch err { switch err {
case nil: // do nothing case nil: // do nothing
case ErrConsensusGetReceiptsCantBeBatched:
log.Debug(
"Received unsupported batch request for consensus_getReceipts",
"name", b.Name,
"req_id", GetReqID(ctx),
"err", err,
)
// ErrBackendUnexpectedJSONRPC occurs because infura responds with a single JSON-RPC object // ErrBackendUnexpectedJSONRPC occurs because infura responds with a single JSON-RPC object
// to a batch request whenever any Request Object in the batch would induce a partial error. // to a batch request whenever any Request Object in the batch would induce a partial error.
// We don't label the backend offline in this case. But the error is still returned to // We don't label the backend offline in this case. But the error is still returned to
@ -375,11 +413,66 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
// we are concerned about network error rates, so we record 1 request independently of how many are in the batch // we are concerned about network error rates, so we record 1 request independently of how many are in the batch
b.networkRequestsSlidingWindow.Incr() b.networkRequestsSlidingWindow.Incr()
originalRequests := rpcReqs
translatedReqs := make(map[string]*RPCReq, len(rpcReqs))
derivedRequests := make([]*RPCReq, 0, 0)
// translate consensus_getReceipts to receipts target
// right now we only support non-batched
if !isBatch {
for _, rpcReq := range rpcReqs {
if rpcReq.Method == ConsensusGetReceiptsMethod {
translatedReqs[string(rpcReq.ID)] = rpcReq
rpcReq.Method = b.receiptsTarget
var reqParams []ConsensusGetReceiptsReq
err := json.Unmarshal(rpcReq.Params, &reqParams)
if err != nil {
return nil, ErrInvalidRequest("invalid request")
}
bnh := reqParams[0].BlockOrHash
switch b.receiptsTarget {
case ReceiptsTargetDebugGetRawReceipts,
ReceiptsTargetGetTransactionReceipts: // block or hash
params := make([]string, 1)
if bnh.BlockNumber != nil {
params[0] = bnh.BlockNumber.String()
} else {
params[0] = bnh.BlockHash.Hex()
}
rawParams := mustMarshalJSON(params)
rpcReq.Params = rawParams
case ReceiptsTargetEthTransactionReceipt: // list of tx hashes
for _, txHash := range reqParams[0].Transactions {
params := make([]common.Hash, 1)
params[0] = txHash
rawParams := mustMarshalJSON(params)
randomID := mustMarshalJSON(uuid.New().String())
dReq := &RPCReq{
JSONRPC: rpcReq.JSONRPC,
Method: ReceiptsTargetEthTransactionReceipt,
Params: rawParams,
ID: randomID,
}
derivedRequests = append(derivedRequests, dReq)
}
}
}
}
// replace the original request with the derived requests
if len(derivedRequests) > 0 {
rpcReqs = derivedRequests
}
} else {
for _, rpcReq := range rpcReqs {
if rpcReq.Method == ConsensusGetReceiptsMethod {
return nil, ErrConsensusGetReceiptsCantBeBatched
}
}
}
isSingleElementBatch := len(rpcReqs) == 1 isSingleElementBatch := len(rpcReqs) == 1
// Single element batches are unwrapped before being sent // Single element batches are unwrapped before being sent
// since Alchemy handles single requests better than batches. // since Alchemy handles single requests better than batches.
var body []byte var body []byte
if isSingleElementBatch { if isSingleElementBatch {
body = mustMarshalJSON(rpcReqs[0]) body = mustMarshalJSON(rpcReqs[0])
@ -443,17 +536,17 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
return nil, wrapErr(err, "error reading response body") return nil, wrapErr(err, "error reading response body")
} }
var res []*RPCRes var rpcRes []*RPCRes
if isSingleElementBatch { if isSingleElementBatch {
var singleRes RPCRes var singleRes RPCRes
if err := json.Unmarshal(resB, &singleRes); err != nil { if err := json.Unmarshal(resB, &singleRes); err != nil {
return nil, ErrBackendBadResponse return nil, ErrBackendBadResponse
} }
res = []*RPCRes{ rpcRes = []*RPCRes{
&singleRes, &singleRes,
} }
} else { } else {
if err := json.Unmarshal(resB, &res); err != nil { if err := json.Unmarshal(resB, &rpcRes); err != nil {
// Infura may return a single JSON-RPC response if, for example, the batch contains a request for an unsupported method // Infura may return a single JSON-RPC response if, for example, the batch contains a request for an unsupported method
if responseIsNotBatched(resB) { if responseIsNotBatched(resB) {
b.networkErrorsSlidingWindow.Incr() b.networkErrorsSlidingWindow.Incr()
@ -466,7 +559,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
} }
} }
if len(rpcReqs) != len(res) { if len(rpcReqs) != len(rpcRes) {
b.networkErrorsSlidingWindow.Incr() b.networkErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, ErrBackendUnexpectedJSONRPC return nil, ErrBackendUnexpectedJSONRPC
@ -475,7 +568,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
// capture the HTTP status code in the response. this will only // capture the HTTP status code in the response. this will only
// ever be 400 given the status check on line 318 above. // ever be 400 given the status check on line 318 above.
if httpRes.StatusCode != 200 { if httpRes.StatusCode != 200 {
for _, res := range res { for _, res := range rpcRes {
res.Error.HTTPErrorCode = httpRes.StatusCode res.Error.HTTPErrorCode = httpRes.StatusCode
} }
} }
@ -484,8 +577,38 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
RecordBackendNetworkLatencyAverageSlidingWindow(b, time.Duration(b.latencySlidingWindow.Avg())) RecordBackendNetworkLatencyAverageSlidingWindow(b, time.Duration(b.latencySlidingWindow.Avg()))
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate()) RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
sortBatchRPCResponse(rpcReqs, res) // enrich the response with the actual request method
return res, nil for _, res := range rpcRes {
translatedReq, exist := translatedReqs[string(res.ID)]
if exist {
res.Result = ConsensusGetReceiptsRes{
Method: translatedReq.Method,
Result: res.Result,
}
}
}
sortBatchRPCResponse(rpcReqs, rpcRes)
// if the translated requests originated derived requests, wrap their results
if len(derivedRequests) > 0 {
results := make([]interface{}, 0, len(rpcRes))
for _, res := range rpcRes {
results = append(results, res.Result)
}
wrappedRes := &RPCRes{
JSONRPC: originalRequests[0].JSONRPC,
Result: ConsensusGetReceiptsRes{
Method: rpcReqs[0].Method,
Result: results,
},
ID: originalRequests[0].ID,
}
rpcRes = []*RPCRes{wrappedRes}
}
return rpcRes, nil
} }
// IsHealthy checks if the backend is able to serve traffic, based on dynamic parameters // IsHealthy checks if the backend is able to serve traffic, based on dynamic parameters
@ -604,6 +727,9 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
if len(rpcReqs) > 0 { if len(rpcReqs) > 0 {
res, err = back.Forward(ctx, rpcReqs, isBatch) res, err = back.Forward(ctx, rpcReqs, isBatch)
if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) {
return nil, err
}
if errors.Is(err, ErrMethodNotWhitelisted) { if errors.Is(err, ErrMethodNotWhitelisted) {
return nil, err return nil, err
} }

@ -79,18 +79,20 @@ type BackendOptions struct {
} }
type BackendConfig struct { type BackendConfig struct {
Username string `toml:"username"` Username string `toml:"username"`
Password string `toml:"password"` Password string `toml:"password"`
RPCURL string `toml:"rpc_url"` RPCURL string `toml:"rpc_url"`
WSURL string `toml:"ws_url"` WSURL string `toml:"ws_url"`
WSPort int `toml:"ws_port"` WSPort int `toml:"ws_port"`
MaxRPS int `toml:"max_rps"` MaxRPS int `toml:"max_rps"`
MaxWSConns int `toml:"max_ws_conns"` MaxWSConns int `toml:"max_ws_conns"`
CAFile string `toml:"ca_file"` CAFile string `toml:"ca_file"`
ClientCertFile string `toml:"client_cert_file"` ClientCertFile string `toml:"client_cert_file"`
ClientKeyFile string `toml:"client_key_file"` ClientKeyFile string `toml:"client_key_file"`
StripTrailingXFF bool `toml:"strip_trailing_xff"` StripTrailingXFF bool `toml:"strip_trailing_xff"`
SkipPeerCountCheck bool `toml:"consensus_skip_peer_count"`
ConsensusSkipPeerCountCheck bool `toml:"consensus_skip_peer_count"`
ConsensusReceiptsTarget string `toml:"consensus_receipts_target"`
} }
type BackendsConfig map[string]*BackendConfig type BackendsConfig map[string]*BackendConfig

@ -74,7 +74,9 @@ client_cert_file = ""
client_key_file = "" client_key_file = ""
# Allows backends to skip peer count checking, default false # Allows backends to skip peer count checking, default false
# consensus_skip_peer_count = true # consensus_skip_peer_count = true
# Specified the target method to get receipts, default "debug_getRawReceipts"
# See https://github.com/ethereum-optimism/optimism/blob/186e46a47647a51a658e699e9ff047d39444c2de/op-node/sources/receipts.go#L186-L253
consensus_receipts_target = "eth_getBlockReceipts"
[backends.alchemy] [backends.alchemy]
rpc_url = "" rpc_url = ""
@ -83,6 +85,7 @@ username = ""
password = "" password = ""
max_rps = 3 max_rps = 3
max_ws_conns = 1 max_ws_conns = 1
consensus_receipts_target = "alchemy_getTransactionReceipts"
[backend_groups] [backend_groups]
[backend_groups.main] [backend_groups.main]

@ -9,6 +9,7 @@ require (
github.com/ethereum/go-ethereum v1.12.0 github.com/ethereum/go-ethereum v1.12.0
github.com/go-redis/redis/v8 v8.11.4 github.com/go-redis/redis/v8 v8.11.4
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0 github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0 github.com/gorilla/websocket v1.5.0
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d

@ -157,6 +157,8 @@ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=

@ -784,6 +784,152 @@ func TestConsensus(t *testing.T) {
// dont rewrite for 0xe1 // dont rewrite for 0xe1
require.Equal(t, "0xe1", jsonMap[2]["result"].(map[string]interface{})["number"]) require.Equal(t, "0xe1", jsonMap[2]["result"].(map[string]interface{})["number"])
}) })
t.Run("translate consensus_getReceipts to debug_getRawReceipts", func(t *testing.T) {
reset()
useOnlyNode1()
resRaw, statusCode, err := client.SendRPC("consensus_getReceipts", []interface{}{map[string]interface{}{
"blockOrHash": "0xc6ef2fc5426d6ad6fd9e2a26abeab0aa2411b7ab17f30a99d3cb96aed1d1055b"}})
require.NoError(t, err)
require.Equal(t, 200, statusCode)
var jsonMap map[string]interface{}
err = json.Unmarshal(nodes["node1"].mockBackend.Requests()[0].Body, &jsonMap)
require.NoError(t, err)
require.Equal(t, "debug_getRawReceipts", jsonMap["method"])
require.Equal(t, "0xc6ef2fc5426d6ad6fd9e2a26abeab0aa2411b7ab17f30a99d3cb96aed1d1055b", jsonMap["params"].([]interface{})[0])
var resJsonMap map[string]interface{}
err = json.Unmarshal(resRaw, &resJsonMap)
require.Equal(t, "debug_getRawReceipts", resJsonMap["result"].(map[string]interface{})["method"].(string))
require.Equal(t, "debug_getRawReceipts", resJsonMap["result"].(map[string]interface{})["result"].(map[string]interface{})["_"])
})
t.Run("translate consensus_getReceipts to debug_getRawReceipts with latest block tag", func(t *testing.T) {
reset()
useOnlyNode1()
resRaw, statusCode, err := client.SendRPC("consensus_getReceipts", []interface{}{map[string]interface{}{
"blockOrHash": "latest"}})
require.NoError(t, err)
require.Equal(t, 200, statusCode)
var jsonMap map[string]interface{}
err = json.Unmarshal(nodes["node1"].mockBackend.Requests()[0].Body, &jsonMap)
require.NoError(t, err)
require.Equal(t, "debug_getRawReceipts", jsonMap["method"])
require.Equal(t, "0x101", jsonMap["params"].([]interface{})[0])
var resJsonMap map[string]interface{}
err = json.Unmarshal(resRaw, &resJsonMap)
require.Equal(t, "debug_getRawReceipts", resJsonMap["result"].(map[string]interface{})["method"].(string))
require.Equal(t, "debug_getRawReceipts", resJsonMap["result"].(map[string]interface{})["result"].(map[string]interface{})["_"])
})
t.Run("translate consensus_getReceipts to debug_getRawReceipts with block number", func(t *testing.T) {
reset()
useOnlyNode1()
resRaw, statusCode, err := client.SendRPC("consensus_getReceipts", []interface{}{map[string]interface{}{
"blockOrHash": "0x55"}})
require.NoError(t, err)
require.Equal(t, 200, statusCode)
var jsonMap map[string]interface{}
err = json.Unmarshal(nodes["node1"].mockBackend.Requests()[0].Body, &jsonMap)
require.NoError(t, err)
require.Equal(t, "debug_getRawReceipts", jsonMap["method"])
require.Equal(t, "0x55", jsonMap["params"].([]interface{})[0])
var resJsonMap map[string]interface{}
err = json.Unmarshal(resRaw, &resJsonMap)
require.Equal(t, "debug_getRawReceipts", resJsonMap["result"].(map[string]interface{})["method"].(string))
require.Equal(t, "debug_getRawReceipts", resJsonMap["result"].(map[string]interface{})["result"].(map[string]interface{})["_"])
})
t.Run("translate consensus_getReceipts to alchemy_getTransactionReceipts", func(t *testing.T) {
reset()
useOnlyNode1()
nodes["node1"].backend.Override(proxyd.WithConsensusReceiptTarget("alchemy_getTransactionReceipts"))
defer nodes["node1"].backend.Override(proxyd.WithConsensusReceiptTarget("debug_getRawReceipts"))
resRaw, statusCode, err := client.SendRPC("consensus_getReceipts", []interface{}{map[string]interface{}{
"blockOrHash": "0xc6ef2fc5426d6ad6fd9e2a26abeab0aa2411b7ab17f30a99d3cb96aed1d1055b"}})
require.NoError(t, err)
require.Equal(t, 200, statusCode)
var reqJsonMap map[string]interface{}
err = json.Unmarshal(nodes["node1"].mockBackend.Requests()[0].Body, &reqJsonMap)
require.NoError(t, err)
require.Equal(t, "alchemy_getTransactionReceipts", reqJsonMap["method"])
require.Equal(t, "0xc6ef2fc5426d6ad6fd9e2a26abeab0aa2411b7ab17f30a99d3cb96aed1d1055b", reqJsonMap["params"].([]interface{})[0])
var resJsonMap map[string]interface{}
err = json.Unmarshal(resRaw, &resJsonMap)
require.Equal(t, "alchemy_getTransactionReceipts", resJsonMap["result"].(map[string]interface{})["method"].(string))
require.Equal(t, "alchemy_getTransactionReceipts", resJsonMap["result"].(map[string]interface{})["result"].(map[string]interface{})["_"])
})
t.Run("translate consensus_getReceipts to eth_getTransactionReceipt batched", func(t *testing.T) {
reset()
useOnlyNode1()
nodes["node1"].backend.Override(proxyd.WithConsensusReceiptTarget("eth_getTransactionReceipt"))
defer nodes["node1"].backend.Override(proxyd.WithConsensusReceiptTarget("debug_getRawReceipts"))
resRaw, statusCode, err := client.SendRPC("consensus_getReceipts", []interface{}{map[string]interface{}{
"blockOrHash": "0xc6ef2fc5426d6ad6fd9e2a26abeab0aa2411b7ab17f30a99d3cb96aed1d1055b",
"transactions": []string{
"0x85d995eba9763907fdf35cd2034144dd9d53ce32cbec21349d4b12823c6860c5",
"0x85d995eba9763907fdf35cd2034144dd9d53ce32cbec21349d4b12823c6860c6",
"0x85d995eba9763907fdf35cd2034144dd9d53ce32cbec21349d4b12823c6860c7",
"0x85d995eba9763907fdf35cd2034144dd9d53ce32cbec21349d4b12823c6860c8",
}}})
require.NoError(t, err)
require.Equal(t, 200, statusCode)
var reqJsonMap []map[string]interface{}
err = json.Unmarshal(nodes["node1"].mockBackend.Requests()[0].Body, &reqJsonMap)
require.NoError(t, err)
require.Equal(t, 4, len(reqJsonMap))
for _, req := range reqJsonMap {
require.Equal(t, "eth_getTransactionReceipt", req["method"])
}
require.Equal(t, "0x85d995eba9763907fdf35cd2034144dd9d53ce32cbec21349d4b12823c6860c5", reqJsonMap[0]["params"].([]interface{})[0])
require.Equal(t, "0x85d995eba9763907fdf35cd2034144dd9d53ce32cbec21349d4b12823c6860c6", reqJsonMap[1]["params"].([]interface{})[0])
require.Equal(t, "0x85d995eba9763907fdf35cd2034144dd9d53ce32cbec21349d4b12823c6860c7", reqJsonMap[2]["params"].([]interface{})[0])
require.Equal(t, "0x85d995eba9763907fdf35cd2034144dd9d53ce32cbec21349d4b12823c6860c8", reqJsonMap[3]["params"].([]interface{})[0])
var resJsonMap map[string]interface{}
err = json.Unmarshal(resRaw, &resJsonMap)
require.Equal(t, "eth_getTransactionReceipt", resJsonMap["result"].(map[string]interface{})["method"].(string))
require.Equal(t, 4, len(resJsonMap["result"].(map[string]interface{})["result"].([]interface{})))
for _, res := range resJsonMap["result"].(map[string]interface{})["result"].([]interface{}) {
require.Equal(t, "eth_getTransactionReceipt", res.(map[string]interface{})["_"])
}
})
t.Run("consensus_getReceipts should not be used in a batch", func(t *testing.T) {
reset()
useOnlyNode1()
_, statusCode, err := client.SendBatchRPC(
NewRPCReq("1", "eth_getBlockByNumber", []interface{}{"latest"}),
NewRPCReq("2", "consensus_getReceipts", []interface{}{"0x102"}),
NewRPCReq("3", "eth_getBlockByNumber", []interface{}{"0xe1"}))
require.NoError(t, err)
require.Equal(t, 400, statusCode)
})
} }
func buildResponse(result interface{}) string { func buildResponse(result interface{}) string {

@ -27,3 +27,4 @@ eth_call = "node"
eth_chainId = "node" eth_chainId = "node"
eth_blockNumber = "node" eth_blockNumber = "node"
eth_getBlockByNumber = "node" eth_getBlockByNumber = "node"
consensus_getReceipts = "node"

@ -184,3 +184,30 @@
"number": "0xd1" "number": "0xd1"
} }
} }
- method: debug_getRawReceipts
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"_": "debug_getRawReceipts"
}
}
- method: eth_getTransactionReceipt
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"_": "eth_getTransactionReceipt"
}
}
- method: alchemy_getTransactionReceipts
response: >
{
"jsonrpc": "2.0",
"id": 67,
"result": {
"_": "alchemy_getTransactionReceipts"
}
}

@ -141,7 +141,17 @@ func Start(config *Config) (*Server, func(), error) {
opts = append(opts, WithStrippedTrailingXFF()) opts = append(opts, WithStrippedTrailingXFF())
} }
opts = append(opts, WithProxydIP(os.Getenv("PROXYD_IP"))) opts = append(opts, WithProxydIP(os.Getenv("PROXYD_IP")))
opts = append(opts, WithSkipPeerCountCheck(cfg.SkipPeerCountCheck)) opts = append(opts, WithConsensusSkipPeerCountCheck(cfg.ConsensusSkipPeerCountCheck))
receiptsTarget, err := ReadFromEnvOrConfig(cfg.ConsensusReceiptsTarget)
if err != nil {
return nil, nil, err
}
receiptsTarget, err = validateReceiptsTarget(receiptsTarget)
if err != nil {
return nil, nil, err
}
opts = append(opts, WithConsensusReceiptTarget(receiptsTarget))
back := NewBackend(name, rpcURL, wsURL, rpcRequestSemaphore, opts...) back := NewBackend(name, rpcURL, wsURL, rpcRequestSemaphore, opts...)
backendNames = append(backendNames, name) backendNames = append(backendNames, name)
@ -316,6 +326,20 @@ func Start(config *Config) (*Server, func(), error) {
return srv, shutdownFunc, nil return srv, shutdownFunc, nil
} }
func validateReceiptsTarget(val string) (string, error) {
if val == "" {
val = "debug_getRawReceipts"
}
switch val {
case "debug_getRawReceipts",
"eth_getTransactionReceipt",
"alchemy_getTransactionReceipts":
return val, nil
default:
return "", fmt.Errorf("invalid receipts target: %s", val)
}
}
func secondsToDuration(seconds int) time.Duration { func secondsToDuration(seconds int) time.Duration {
return time.Duration(seconds) * time.Second return time.Duration(seconds) * time.Second
} }

@ -63,6 +63,8 @@ func RewriteRequest(rctx RewriteContext, req *RPCReq, res *RPCRes) (RewriteResul
case "eth_getLogs", case "eth_getLogs",
"eth_newFilter": "eth_newFilter":
return rewriteRange(rctx, req, res, 0) return rewriteRange(rctx, req, res, 0)
case "consensus_getReceipts":
return rewriteGetReceiptsParams(rctx, req, res)
case "debug_getRawReceipts": case "debug_getRawReceipts":
return rewriteParam(rctx, req, res, 0, true) return rewriteParam(rctx, req, res, 0, true)
case "eth_getBalance", case "eth_getBalance",
@ -82,6 +84,38 @@ func RewriteRequest(rctx RewriteContext, req *RPCReq, res *RPCRes) (RewriteResul
return RewriteNone, nil return RewriteNone, nil
} }
func rewriteGetReceiptsParams(rctx RewriteContext, req *RPCReq, res *RPCRes) (RewriteResult, error) {
var p []interface{}
err := json.Unmarshal(req.Params, &p)
if err != nil {
return RewriteOverrideError, err
}
if len(p) != 1 {
return RewriteNone, nil
}
if m, ok := p[0].(map[string]interface{}); !ok || m["blockOrHash"] == nil {
return RewriteNone, nil
}
val, rw, err := rewriteTag(rctx, p[0].(map[string]interface{})["blockOrHash"].(string))
if err != nil {
return RewriteOverrideError, err
}
if rw {
p[0].(map[string]interface{})["blockOrHash"] = val
paramRaw, err := json.Marshal(p)
if err != nil {
return RewriteOverrideError, err
}
req.Params = paramRaw
return RewriteOverrideRequest, nil
}
return RewriteNone, nil
}
func rewriteParam(rctx RewriteContext, req *RPCReq, res *RPCRes, pos int, required bool) (RewriteResult, error) { func rewriteParam(rctx RewriteContext, req *RPCReq, res *RPCRes, pos int, required bool) (RewriteResult, error) {
var p []interface{} var p []interface{}
err := json.Unmarshal(req.Params, &p) err := json.Unmarshal(req.Params, &p)

@ -347,6 +347,10 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
writeRPCError(ctx, w, nil, ErrGatewayTimeout) writeRPCError(ctx, w, nil, ErrGatewayTimeout)
return return
} }
if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) {
writeRPCError(ctx, w, nil, ErrInvalidRequest(err.Error()))
return
}
if err != nil { if err != nil {
writeRPCError(ctx, w, nil, ErrInternal) writeRPCError(ctx, w, nil, ErrInternal)
return return
@ -485,6 +489,9 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
elems := cacheMisses[start:end] elems := cacheMisses[start:end]
res, err := s.BackendGroups[group.backendGroup].Forward(ctx, createBatchRequest(elems), isBatch) res, err := s.BackendGroups[group.backendGroup].Forward(ctx, createBatchRequest(elems), isBatch)
if err != nil { if err != nil {
if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) {
return nil, false, err
}
log.Error( log.Error(
"error forwarding RPC batch", "error forwarding RPC batch",
"batch_size", len(elems), "batch_size", len(elems),

@ -88,7 +88,12 @@ func (mh *MockedHandler) Handler(w http.ResponseWriter, req *http.Request) {
} }
} }
if selectedResponse != "" { if selectedResponse != "" {
responses = append(responses, selectedResponse) var rpcRes proxyd.RPCRes
err = json.Unmarshal([]byte(selectedResponse), &rpcRes)
idJson, _ := json.Marshal(r["id"])
rpcRes.ID = idJson
res, _ := json.Marshal(rpcRes)
responses = append(responses, string(res))
} }
} }