Merge pull request #4759 from ethereum-optimism/feat/sender-rate-limit

proxyd: Add sender-based rate limiter
This commit is contained in:
mergify[bot] 2023-01-23 18:04:55 +00:00 committed by GitHub
commit 98a303ae77
8 changed files with 282 additions and 19 deletions

@ -78,13 +78,26 @@ var (
Message: "over rate limit",
HTTPErrorCode: 429,
}
ErrOverSenderRateLimit = &RPCErr{
Code: JSONRPCErrorInternal - 17,
Message: "sender is over rate limit",
HTTPErrorCode: 429,
}
ErrBackendUnexpectedJSONRPC = errors.New("backend returned an unexpected JSON-RPC response")
)
func ErrInvalidRequest(msg string) *RPCErr {
return &RPCErr{
Code: -32601,
Code: -32600,
Message: msg,
HTTPErrorCode: 400,
}
}
func ErrInvalidParams(msg string) *RPCErr {
return &RPCErr{
Code: -32602,
Message: msg,
HTTPErrorCode: 400,
}

@ -104,21 +104,30 @@ type BatchConfig struct {
ErrorMessage string `toml:"error_message"`
}
// SenderRateLimitConfig configures the sender-based rate limiter
// for eth_sendRawTransaction requests.
type SenderRateLimitConfig struct {
Enabled bool
Interval TOMLDuration
Limit int
}
type Config struct {
WSBackendGroup string `toml:"ws_backend_group"`
Server ServerConfig `toml:"server"`
Cache CacheConfig `toml:"cache"`
Redis RedisConfig `toml:"redis"`
Metrics MetricsConfig `toml:"metrics"`
RateLimit RateLimitConfig `toml:"rate_limit"`
BackendOptions BackendOptions `toml:"backend"`
Backends BackendsConfig `toml:"backends"`
BatchConfig BatchConfig `toml:"batch"`
Authentication map[string]string `toml:"authentication"`
BackendGroups BackendGroupsConfig `toml:"backend_groups"`
RPCMethodMappings map[string]string `toml:"rpc_method_mappings"`
WSMethodWhitelist []string `toml:"ws_method_whitelist"`
WhitelistErrorMessage string `toml:"whitelist_error_message"`
WSBackendGroup string `toml:"ws_backend_group"`
Server ServerConfig `toml:"server"`
Cache CacheConfig `toml:"cache"`
Redis RedisConfig `toml:"redis"`
Metrics MetricsConfig `toml:"metrics"`
RateLimit RateLimitConfig `toml:"rate_limit"`
BackendOptions BackendOptions `toml:"backend"`
Backends BackendsConfig `toml:"backends"`
BatchConfig BatchConfig `toml:"batch"`
Authentication map[string]string `toml:"authentication"`
BackendGroups BackendGroupsConfig `toml:"backend_groups"`
RPCMethodMappings map[string]string `toml:"rpc_method_mappings"`
WSMethodWhitelist []string `toml:"ws_method_whitelist"`
WhitelistErrorMessage string `toml:"whitelist_error_message"`
SenderRateLimit SenderRateLimitConfig `toml:"sender_rate_limit"`
}
func ReadFromEnvOrConfig(value string) (string, error) {

@ -0,0 +1,128 @@
package integration_tests
import (
"bufio"
"fmt"
"math"
"os"
"strings"
"testing"
"time"
"github.com/ethereum-optimism/optimism/proxyd"
"github.com/stretchr/testify/require"
)
const txHex1 = "0x02f8b28201a406849502f931849502f931830147f9948f3ddd0fbf3e78ca1d6c" +
"d17379ed88e261249b5280b84447e7ef2400000000000000000000000089c8b1" +
"b2774201bac50f627403eac1b732459cf7000000000000000000000000000000" +
"0000000000000000056bc75e2d63100000c080a0473c95566026c312c9664cd6" +
"1145d2f3e759d49209fe96011ac012884ec5b017a0763b58f6fa6096e6ba28ee" +
"08bfac58f58fb3b8bcef5af98578bdeaddf40bde42"
const txHex2 = "0xf8aa82afd2830f4240830493e094464959ad46e64046b891f562cff202a465d5" +
"22f380b844d5bade070000000000000000000000004200000000000000000000" +
"0000000000000000060000000000000000000000000000000000000000000000" +
"0000000025ef43fc0038a05d8ea9837ea81469bda4dadbe852fdd37fcfbcd666" +
"5641a35e4726fbc04364e7a0107e20bb34aea53c695a551204a11d42fe465055" +
"510ee240e8f884fb70289be6"
const dummyRes = `{"id": 123, "jsonrpc": "2.0", "result": "dummy"}`
const limRes = `{"error":{"code":-32017,"message":"sender is over rate limit"},"id":1,"jsonrpc":"2.0"}`
func TestSenderRateLimitValidation(t *testing.T) {
goodBackend := NewMockBackend(SingleResponseHandler(200, dummyRes))
defer goodBackend.Close()
require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", goodBackend.URL()))
config := ReadConfig("sender_rate_limit")
// Don't perform rate limiting in this test since we're only testing
// validation.
config.SenderRateLimit.Limit = math.MaxInt
client := NewProxydClient("http://127.0.0.1:8545")
shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()
f, err := os.Open("testdata/testdata.txt")
require.NoError(t, err)
defer f.Close()
scanner := bufio.NewScanner(f)
scanner.Scan() // skip header
for scanner.Scan() {
record := strings.Split(scanner.Text(), "|")
name, body, expResponseBody := record[0], record[1], record[2]
require.NoError(t, err)
t.Run(name, func(t *testing.T) {
res, _, err := client.SendRequest([]byte(body))
require.NoError(t, err)
RequireEqualJSON(t, []byte(expResponseBody), res)
})
}
}
func TestSenderRateLimitLimiting(t *testing.T) {
goodBackend := NewMockBackend(SingleResponseHandler(200, dummyRes))
defer goodBackend.Close()
require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", goodBackend.URL()))
config := ReadConfig("sender_rate_limit")
client := NewProxydClient("http://127.0.0.1:8545")
shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()
// Two separate requests from the same sender
// should be rate limited.
res1, code1, err := client.SendRequest(makeSendRawTransaction(txHex1))
require.NoError(t, err)
res2, code2, err := client.SendRequest(makeSendRawTransaction(txHex1))
require.NoError(t, err)
RequireEqualJSON(t, []byte(dummyRes), res1)
require.Equal(t, 200, code1)
RequireEqualJSON(t, []byte(limRes), res2)
require.Equal(t, 429, code2)
// Clear the limiter.
time.Sleep(1100 * time.Millisecond)
// Two separate requests from different senders
// should not be rate limited.
res1, code1, err = client.SendRequest(makeSendRawTransaction(txHex1))
require.NoError(t, err)
res2, code2, err = client.SendRequest(makeSendRawTransaction(txHex2))
require.NoError(t, err)
RequireEqualJSON(t, []byte(dummyRes), res1)
require.Equal(t, 200, code1)
RequireEqualJSON(t, []byte(dummyRes), res2)
require.Equal(t, 200, code2)
// Clear the limiter.
time.Sleep(1100 * time.Millisecond)
// A batch request should rate limit within the batch itself.
batch := []byte(fmt.Sprintf(
`[%s, %s, %s]`,
makeSendRawTransaction(txHex1),
makeSendRawTransaction(txHex1),
makeSendRawTransaction(txHex2),
))
res, code, err := client.SendRequest(batch)
require.NoError(t, err)
require.Equal(t, 200, code)
RequireEqualJSON(t, []byte(fmt.Sprintf(
`[%s, %s, %s]`,
dummyRes,
limRes,
dummyRes,
)), res)
}
func makeSendRawTransaction(dataHex string) []byte {
return []byte(`{"jsonrpc":"2.0","method":"eth_sendRawTransaction","params":["` + dataHex + `"],"id":1}`)
}

@ -0,0 +1,23 @@
[server]
rpc_port = 8545
[backend]
response_timeout_seconds = 1
[backends]
[backends.good]
rpc_url = "$GOOD_BACKEND_RPC_URL"
ws_url = "$GOOD_BACKEND_RPC_URL"
[backend_groups]
[backend_groups.main]
backends = ["good"]
[rpc_method_mappings]
eth_chainId = "main"
eth_sendRawTransaction = "main"
[sender_rate_limit]
enabled = true
interval = "1s"
limit = 1

@ -0,0 +1,11 @@
name|body|responseBody
not json|not json|{"jsonrpc":"2.0","error":{"code":-32700,"message":"parse error"},"id":null}
not json-rpc|{"foo":"bar"}|{"jsonrpc":"2.0","error":{"code":-32600,"message":"invalid JSON-RPC version"},"id":null}
missing fields json-rpc|{"jsonrpc":"2.0"}|{"jsonrpc":"2.0","error":{"code":-32600,"message":"no method specified"},"id":null}
bad method json-rpc|{"jsonrpc":"2.0","method":"eth_notSendRawTransaction","id":1}|{"jsonrpc":"2.0","error":{"code":-32001,"message":"rpc method is not whitelisted"},"id":1}
no transaction data|{"jsonrpc":"2.0","method":"eth_sendRawTransaction","params":[],"id":1}|{"jsonrpc":"2.0","error":{"code":-32602,"message":"missing value for required argument 0"},"id":1}
invalid transaction data|{"jsonrpc":"2.0","method":"eth_sendRawTransaction","params":["0xf6806872fcc650ad4e77e0629206426cd183d751e9ddcc8d5e77"],"id":1}|{"jsonrpc":"2.0","error":{"code":-32602,"message":"rlp: value size exceeds available input length"},"id":1}
invalid transaction data|{"jsonrpc":"2.0","method":"eth_sendRawTransaction","params":["0x1234"],"id":1}|{"jsonrpc":"2.0","error":{"code":-32602,"message":"transaction type not supported"},"id":1}
valid transaction data - simple send|{"jsonrpc":"2.0","method":"eth_sendRawTransaction","params":["0x02f8748201a415843b9aca31843b9aca3182520894f80267194936da1e98db10bce06f3147d580a62e880de0b6b3a764000080c001a0b50ee053102360ff5fedf0933b912b7e140c90fe57fa07a0cebe70dbd72339dda072974cb7bfe5c3dc54dde110e2b049408ccab8a879949c3b4d42a3a7555a618b"],"id":1}|{"id": 123, "jsonrpc": "2.0", "result": "dummy"}
valid transaction data - contract call|{"jsonrpc":"2.0","method":"eth_sendRawTransaction","params":["0x02f8b28201a406849502f931849502f931830147f9948f3ddd0fbf3e78ca1d6cd17379ed88e261249b5280b84447e7ef2400000000000000000000000089c8b1b2774201bac50f627403eac1b732459cf70000000000000000000000000000000000000000000000056bc75e2d63100000c080a0473c95566026c312c9664cd61145d2f3e759d49209fe96011ac012884ec5b017a0763b58f6fa6096e6ba28ee08bfac58f58fb3b8bcef5af98578bdeaddf40bde42"],"id":1}|{"id": 123, "jsonrpc": "2.0", "result": "dummy"}
batch with mixed results|[{"jsonrpc":"2.0","method":"eth_sendRawTransaction","params":["0x02f8748201a415843b9aca31843b9aca3182520894f80267194936da1e98db10bce06f3147d580a62e880de0b6b3a764000080c001a0b50ee053102360ff5fedf0933b912b7e140c90fe57fa07a0cebe70dbd72339dda072974cb7bfe5c3dc54dde110e2b049408ccab8a879949c3b4d42a3a7555a618b"],"id":1},{"bad":"json"},{"jsonrpc":"2.0","method":"eth_fooTheBar","params":[],"id":123}]|[{"id": 123, "jsonrpc": "2.0", "result": "dummy"},{"jsonrpc":"2.0","error":{"code":-32600,"message":"invalid JSON-RPC version"},"id":null},{"jsonrpc":"2.0","error":{"code":-32001,"message":"rpc method is not whitelisted"},"id":123}]

@ -12,10 +12,10 @@ import (
const (
notWhitelistedResponse = `{"jsonrpc":"2.0","error":{"code":-32001,"message":"rpc method is not whitelisted custom message"},"id":999}`
parseErrResponse = `{"jsonrpc":"2.0","error":{"code":-32700,"message":"parse error"},"id":null}`
invalidJSONRPCVersionResponse = `{"error":{"code":-32601,"message":"invalid JSON-RPC version"},"id":null,"jsonrpc":"2.0"}`
invalidIDResponse = `{"error":{"code":-32601,"message":"invalid ID"},"id":null,"jsonrpc":"2.0"}`
invalidMethodResponse = `{"error":{"code":-32601,"message":"no method specified"},"id":null,"jsonrpc":"2.0"}`
invalidBatchLenResponse = `{"error":{"code":-32601,"message":"must specify at least one batch call"},"id":null,"jsonrpc":"2.0"}`
invalidJSONRPCVersionResponse = `{"error":{"code":-32600,"message":"invalid JSON-RPC version"},"id":null,"jsonrpc":"2.0"}`
invalidIDResponse = `{"error":{"code":-32600,"message":"invalid ID"},"id":null,"jsonrpc":"2.0"}`
invalidMethodResponse = `{"error":{"code":-32600,"message":"no method specified"},"id":null,"jsonrpc":"2.0"}`
invalidBatchLenResponse = `{"error":{"code":-32600,"message":"must specify at least one batch call"},"id":null,"jsonrpc":"2.0"}`
)
func TestSingleRPCValidation(t *testing.T) {

@ -78,6 +78,15 @@ func Start(config *Config) (func(), error) {
ErrTooManyBatchRequests.Message = config.BatchConfig.ErrorMessage
}
if config.SenderRateLimit.Enabled {
if config.SenderRateLimit.Limit <= 0 {
return nil, errors.New("limit in sender_rate_limit must be > 0")
}
if time.Duration(config.SenderRateLimit.Interval) < time.Second {
return nil, errors.New("interval in sender_rate_limit must be >= 1s")
}
}
maxConcurrentRPCs := config.Server.MaxConcurrentRPCs
if maxConcurrentRPCs == 0 {
maxConcurrentRPCs = math.MaxInt64
@ -244,6 +253,7 @@ func Start(config *Config) (func(), error) {
config.Server.MaxUpstreamBatchSize,
rpcCache,
config.RateLimit,
config.SenderRateLimit,
config.Server.EnableRequestLog,
config.Server.MaxRequestBodyLogLen,
config.BatchConfig.MaxSize,

@ -14,6 +14,9 @@ import (
"sync"
"time"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/go-redis/redis/v8"
"github.com/gorilla/mux"
@ -50,6 +53,7 @@ type Server struct {
upgrader *websocket.Upgrader
mainLim FrontendRateLimiter
overrideLims map[string]FrontendRateLimiter
senderLim FrontendRateLimiter
limExemptOrigins []*regexp.Regexp
limExemptUserAgents []*regexp.Regexp
rpcServer *http.Server
@ -71,6 +75,7 @@ func NewServer(
maxUpstreamBatchSize int,
cache RPCCache,
rateLimitConfig RateLimitConfig,
senderRateLimitConfig SenderRateLimitConfig,
enableRequestLog bool,
maxRequestBodyLogLen int,
maxBatchSize int,
@ -135,6 +140,10 @@ func NewServer(
return nil, err
}
}
var senderLim FrontendRateLimiter
if senderRateLimitConfig.Enabled {
senderLim = limiterFactory(time.Duration(senderRateLimitConfig.Interval), senderRateLimitConfig.Limit, "senders")
}
return &Server{
backendGroups: backendGroups,
@ -154,6 +163,7 @@ func NewServer(
},
mainLim: mainLim,
overrideLims: overrideLims,
senderLim: senderLim,
limExemptOrigins: limExemptOrigins,
limExemptUserAgents: limExemptUserAgents,
}, nil
@ -409,6 +419,17 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
continue
}
// Apply a sender-based rate limit if it is enabled. Note that sender-based rate
// limits apply regardless of origin or user-agent. As such, they don't use the
// isLimited method.
if parsedReq.Method == "eth_sendRawTransaction" && s.senderLim != nil {
if err := s.rateLimitSender(ctx, parsedReq); err != nil {
RecordRPCError(ctx, BackendProxyd, parsedReq.Method, err)
responses[i] = NewRPCErrorRes(parsedReq.ID, err)
continue
}
}
id := string(parsedReq.ID)
// If this is a duplicate Request ID, move the Request to a new batchGroup
ids[id]++
@ -575,6 +596,54 @@ func (s *Server) isUnlimitedUserAgent(origin string) bool {
return false
}
func (s *Server) rateLimitSender(ctx context.Context, req *RPCReq) error {
var params []string
if err := json.Unmarshal(req.Params, &params); err != nil {
log.Debug("error unmarshaling raw transaction params", "err", err, "req_Id", GetReqID(ctx))
return ErrParseErr
}
if len(params) != 1 {
log.Debug("raw transaction request has invalid number of params", "req_id", GetReqID(ctx))
// The error below is identical to the one Geth responds with.
return ErrInvalidParams("missing value for required argument 0")
}
var data hexutil.Bytes
if err := data.UnmarshalText([]byte(params[0])); err != nil {
log.Debug("error decoding raw tx data", "err", err, "req_id", GetReqID(ctx))
// Geth returns the raw error from UnmarshalText.
return ErrInvalidParams(err.Error())
}
// Inflates a types.Transaction object from the transaction's raw bytes.
tx := new(types.Transaction)
if err := tx.UnmarshalBinary(data); err != nil {
log.Debug("could not unmarshal transaction", "err", err, "req_id", GetReqID(ctx))
return ErrInvalidParams(err.Error())
}
// Convert the transaction into a Message object so that we can get the
// sender. This method performs an ecrecover, which can be expensive.
msg, err := tx.AsMessage(types.LatestSignerForChainID(tx.ChainId()), nil)
if err != nil {
log.Debug("could not get message from transaction", "err", err, "req_id", GetReqID(ctx))
return ErrInvalidParams(err.Error())
}
ok, err := s.senderLim.Take(ctx, msg.From().Hex())
if err != nil {
log.Error("error taking from sender limiter", "err", err, "req_id", GetReqID(ctx))
return ErrInternal
}
if !ok {
log.Debug("sender rate limit exceeded", "sender", msg.From(), "req_id", GetReqID(ctx))
return ErrOverSenderRateLimit
}
return nil
}
func setCacheHeader(w http.ResponseWriter, cached bool) {
if cached {
w.Header().Set(cacheStatusHdr, "HIT")