diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index a3a0f9da4..d82c83873 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -170,9 +170,9 @@ type TxFetcher struct { alternates map[common.Hash]map[string]struct{} // In-flight transaction alternate origins if retrieval fails // Callbacks - hasTx func(common.Hash) bool // Retrieves a tx from the local txpool - addTxs func([]*txpool.Transaction) []error // Insert a batch of transactions into local txpool - fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer + hasTx func(common.Hash) bool // Retrieves a tx from the local txpool + addTxs func(string, []*txpool.Transaction) []error // Insert a batch of transactions into local txpool + fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer step chan struct{} // Notification channel when the fetcher loop iterates clock mclock.Clock // Time wrapper to simulate in tests @@ -181,14 +181,14 @@ type TxFetcher struct { // NewTxFetcher creates a transaction fetcher to retrieve transaction // based on hash announcements. -func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*txpool.Transaction) []error, fetchTxs func(string, []common.Hash) error) *TxFetcher { +func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func(string, []*txpool.Transaction) []error, fetchTxs func(string, []common.Hash) error) *TxFetcher { return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, mclock.System{}, nil) } // NewTxFetcherForTests is a testing method to mock out the realtime clock with // a simulated version and the internal randomness with a deterministic one. func NewTxFetcherForTests( - hasTx func(common.Hash) bool, addTxs func([]*txpool.Transaction) []error, fetchTxs func(string, []common.Hash) error, + hasTx func(common.Hash) bool, addTxs func(string, []*txpool.Transaction) []error, fetchTxs func(string, []common.Hash) error, clock mclock.Clock, rand *mrand.Rand) *TxFetcher { return &TxFetcher{ notify: make(chan *txAnnounce), @@ -300,7 +300,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) for j, tx := range batch { wrapped[j] = &txpool.Transaction{Tx: tx} } - for j, err := range f.addTxs(wrapped) { + for j, err := range f.addTxs(peer, wrapped) { // Track the transaction hash if the price is too low for us. // Avoid re-request this transaction when we receive another // announcement. diff --git a/eth/fetcher/tx_fetcher_test.go b/eth/fetcher/tx_fetcher_test.go index c5805d6ef..1125f1fbb 100644 --- a/eth/fetcher/tx_fetcher_test.go +++ b/eth/fetcher/tx_fetcher_test.go @@ -378,7 +378,7 @@ func TestTransactionFetcherCleanup(t *testing.T) { init: func() *TxFetcher { return NewTxFetcher( func(common.Hash) bool { return false }, - func(txs []*txpool.Transaction) []error { + func(peer string, txs []*txpool.Transaction) []error { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, @@ -417,7 +417,7 @@ func TestTransactionFetcherCleanupEmpty(t *testing.T) { init: func() *TxFetcher { return NewTxFetcher( func(common.Hash) bool { return false }, - func(txs []*txpool.Transaction) []error { + func(peer string, txs []*txpool.Transaction) []error { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, @@ -455,7 +455,7 @@ func TestTransactionFetcherMissingRescheduling(t *testing.T) { init: func() *TxFetcher { return NewTxFetcher( func(common.Hash) bool { return false }, - func(txs []*txpool.Transaction) []error { + func(peer string, txs []*txpool.Transaction) []error { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, @@ -501,7 +501,7 @@ func TestTransactionFetcherMissingCleanup(t *testing.T) { init: func() *TxFetcher { return NewTxFetcher( func(common.Hash) bool { return false }, - func(txs []*txpool.Transaction) []error { + func(peer string, txs []*txpool.Transaction) []error { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, @@ -539,7 +539,7 @@ func TestTransactionFetcherBroadcasts(t *testing.T) { init: func() *TxFetcher { return NewTxFetcher( func(common.Hash) bool { return false }, - func(txs []*txpool.Transaction) []error { + func(peer string, txs []*txpool.Transaction) []error { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, @@ -644,7 +644,7 @@ func TestTransactionFetcherTimeoutRescheduling(t *testing.T) { init: func() *TxFetcher { return NewTxFetcher( func(common.Hash) bool { return false }, - func(txs []*txpool.Transaction) []error { + func(peer string, txs []*txpool.Transaction) []error { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, @@ -865,7 +865,7 @@ func TestTransactionFetcherUnderpricedDedup(t *testing.T) { init: func() *TxFetcher { return NewTxFetcher( func(common.Hash) bool { return false }, - func(txs []*txpool.Transaction) []error { + func(peer string, txs []*txpool.Transaction) []error { errs := make([]error, len(txs)) for i := 0; i < len(errs); i++ { if i%2 == 0 { @@ -938,7 +938,7 @@ func TestTransactionFetcherUnderpricedDoSProtection(t *testing.T) { init: func() *TxFetcher { return NewTxFetcher( func(common.Hash) bool { return false }, - func(txs []*txpool.Transaction) []error { + func(peer string, txs []*txpool.Transaction) []error { errs := make([]error, len(txs)) for i := 0; i < len(errs); i++ { errs[i] = txpool.ErrUnderpriced @@ -964,7 +964,7 @@ func TestTransactionFetcherOutOfBoundDeliveries(t *testing.T) { init: func() *TxFetcher { return NewTxFetcher( func(common.Hash) bool { return false }, - func(txs []*txpool.Transaction) []error { + func(peer string, txs []*txpool.Transaction) []error { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, @@ -1017,7 +1017,7 @@ func TestTransactionFetcherDrop(t *testing.T) { init: func() *TxFetcher { return NewTxFetcher( func(common.Hash) bool { return false }, - func(txs []*txpool.Transaction) []error { + func(peer string, txs []*txpool.Transaction) []error { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, @@ -1083,7 +1083,7 @@ func TestTransactionFetcherDropRescheduling(t *testing.T) { init: func() *TxFetcher { return NewTxFetcher( func(common.Hash) bool { return false }, - func(txs []*txpool.Transaction) []error { + func(peer string, txs []*txpool.Transaction) []error { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, @@ -1128,7 +1128,7 @@ func TestTransactionFetcherFuzzCrash01(t *testing.T) { init: func() *TxFetcher { return NewTxFetcher( func(common.Hash) bool { return false }, - func(txs []*txpool.Transaction) []error { + func(peer string, txs []*txpool.Transaction) []error { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, @@ -1155,7 +1155,7 @@ func TestTransactionFetcherFuzzCrash02(t *testing.T) { init: func() *TxFetcher { return NewTxFetcher( func(common.Hash) bool { return false }, - func(txs []*txpool.Transaction) []error { + func(peer string, txs []*txpool.Transaction) []error { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, @@ -1184,7 +1184,7 @@ func TestTransactionFetcherFuzzCrash03(t *testing.T) { init: func() *TxFetcher { return NewTxFetcher( func(common.Hash) bool { return false }, - func(txs []*txpool.Transaction) []error { + func(peer string, txs []*txpool.Transaction) []error { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, @@ -1217,7 +1217,7 @@ func TestTransactionFetcherFuzzCrash04(t *testing.T) { init: func() *TxFetcher { return NewTxFetcher( func(common.Hash) bool { return false }, - func(txs []*txpool.Transaction) []error { + func(peer string, txs []*txpool.Transaction) []error { return make([]error, len(txs)) }, func(string, []common.Hash) error { diff --git a/eth/handler.go b/eth/handler.go index e59bbb488..b93382402 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -33,6 +33,7 @@ import ( "github.com/ethereum/go-ethereum/core/monitor" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/txpool" + "github.com/ethereum/go-ethereum/core/txpool/legacypool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/fetcher" @@ -65,7 +66,8 @@ const ( ) var ( - syncChallengeTimeout = 15 * time.Second // Time allowance for a node to reply to the sync progress challenge + syncChallengeTimeout = 15 * time.Second // Time allowance for a node to reply to the sync progress challenge + accountBlacklistPeerCounter = metrics.NewRegisteredCounter("eth/count/blacklist", nil) ) // txPool defines the methods needed from a transaction pool implementation to @@ -342,8 +344,21 @@ func newHandler(config *handlerConfig) (*handler, error) { } return p.RequestTxs(hashes) } - addTxs := func(txs []*txpool.Transaction) []error { - return h.txpool.Add(txs, false, false) + addTxs := func(peer string, txs []*txpool.Transaction) []error { + errors := h.txpool.Add(txs, false, false) + for _, err := range errors { + if err == legacypool.ErrInBlackList { + accountBlacklistPeerCounter.Inc(1) + p := h.peers.peer(peer) + if p != nil { + remoteAddr := p.remoteAddr() + if remoteAddr != nil { + log.Warn("blacklist account detected from other peer", "remoteAddr", remoteAddr, "ID", p.ID()) + } + } + } + } + return errors } h.txFetcher = fetcher.NewTxFetcher(h.txpool.Has, addTxs, fetchTx) h.chainSync = newChainSyncer(h) diff --git a/eth/peer.go b/eth/peer.go index dd6724219..312575304 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -17,6 +17,8 @@ package eth import ( + "net" + "github.com/ethereum/go-ethereum/eth/protocols/bsc" "github.com/ethereum/go-ethereum/eth/protocols/trust" @@ -45,6 +47,13 @@ func (p *ethPeer) info() *ethPeerInfo { } } +func (p *ethPeer) remoteAddr() net.Addr { + if p.Peer != nil && p.Peer.Peer != nil { + return p.Peer.Peer.RemoteAddr() + } + return nil +} + // snapPeerInfo represents a short summary of the `snap` sub-protocol metadata known // about a connected peer. type snapPeerInfo struct { diff --git a/p2p/peer.go b/p2p/peer.go index 723a142ff..ed80b226b 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -210,6 +210,9 @@ func (p *Peer) RemoteAddr() net.Addr { } log.Warn("RemoteAddr", "invalid testRemoteAddr", p.testRemoteAddr) } + if p.rw == nil { + return nil + } return p.rw.fd.RemoteAddr() } diff --git a/rpc/handler.go b/rpc/handler.go index e7d138e92..8f799a41f 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -26,10 +26,15 @@ import ( "time" "github.com/ethereum/go-ethereum/common/gopool" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/log" ) +var ( + accountBlacklistRpcCounter = metrics.NewRegisteredCounter("rpc/count/blacklist", nil) +) + // handler handles JSON-RPC messages. There is one handler per connection. Note that // handler is not safe for concurrent use. Message handling never blocks indefinitely // because RPCs are processed on background goroutines launched by handler. @@ -476,6 +481,11 @@ func (h *handler) handleCallMsg(ctx *callProc, reqCtx context.Context, msg *json xForward := reqCtx.Value("X-Forwarded-For") h.log.Warn("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start), "err", resp.Error.Message, "X-Forwarded-For", xForward) + monitoredError := "sender or to in black list" // using legacypool.ErrInBlackList.Error() will cause `import cycle` + if strings.Contains(resp.Error.Message, monitoredError) { + accountBlacklistRpcCounter.Inc(1) + log.Warn("blacklist account detected from direct rpc", "remoteAddr", h.conn.remoteAddr()) + } ctx = append(ctx, "err", resp.Error.Message) if resp.Error.Data != nil { ctx = append(ctx, "errdata", resp.Error.Data) diff --git a/tests/fuzzers/txfetcher/txfetcher_fuzzer.go b/tests/fuzzers/txfetcher/txfetcher_fuzzer.go index 56b6b1e64..91ca2eefb 100644 --- a/tests/fuzzers/txfetcher/txfetcher_fuzzer.go +++ b/tests/fuzzers/txfetcher/txfetcher_fuzzer.go @@ -80,7 +80,7 @@ func Fuzz(input []byte) int { f := fetcher.NewTxFetcherForTests( func(common.Hash) bool { return false }, - func(txs []*txpool.Transaction) []error { + func(peer string, txs []*txpool.Transaction) []error { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil },