rpc,eth: output more info about failed txs(#2041)

This commit is contained in:
buddho 2023-12-07 16:07:48 +08:00 committed by GitHub
parent fa5d0cf287
commit e3ef62f3bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 62 additions and 25 deletions

@ -170,9 +170,9 @@ type TxFetcher struct {
alternates map[common.Hash]map[string]struct{} // In-flight transaction alternate origins if retrieval fails
// Callbacks
hasTx func(common.Hash) bool // Retrieves a tx from the local txpool
addTxs func([]*txpool.Transaction) []error // Insert a batch of transactions into local txpool
fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer
hasTx func(common.Hash) bool // Retrieves a tx from the local txpool
addTxs func(string, []*txpool.Transaction) []error // Insert a batch of transactions into local txpool
fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer
step chan struct{} // Notification channel when the fetcher loop iterates
clock mclock.Clock // Time wrapper to simulate in tests
@ -181,14 +181,14 @@ type TxFetcher struct {
// NewTxFetcher creates a transaction fetcher to retrieve transaction
// based on hash announcements.
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*txpool.Transaction) []error, fetchTxs func(string, []common.Hash) error) *TxFetcher {
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func(string, []*txpool.Transaction) []error, fetchTxs func(string, []common.Hash) error) *TxFetcher {
return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, mclock.System{}, nil)
}
// NewTxFetcherForTests is a testing method to mock out the realtime clock with
// a simulated version and the internal randomness with a deterministic one.
func NewTxFetcherForTests(
hasTx func(common.Hash) bool, addTxs func([]*txpool.Transaction) []error, fetchTxs func(string, []common.Hash) error,
hasTx func(common.Hash) bool, addTxs func(string, []*txpool.Transaction) []error, fetchTxs func(string, []common.Hash) error,
clock mclock.Clock, rand *mrand.Rand) *TxFetcher {
return &TxFetcher{
notify: make(chan *txAnnounce),
@ -300,7 +300,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
for j, tx := range batch {
wrapped[j] = &txpool.Transaction{Tx: tx}
}
for j, err := range f.addTxs(wrapped) {
for j, err := range f.addTxs(peer, wrapped) {
// Track the transaction hash if the price is too low for us.
// Avoid re-request this transaction when we receive another
// announcement.

@ -378,7 +378,7 @@ func TestTransactionFetcherCleanup(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
@ -417,7 +417,7 @@ func TestTransactionFetcherCleanupEmpty(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
@ -455,7 +455,7 @@ func TestTransactionFetcherMissingRescheduling(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
@ -501,7 +501,7 @@ func TestTransactionFetcherMissingCleanup(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
@ -539,7 +539,7 @@ func TestTransactionFetcherBroadcasts(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
@ -644,7 +644,7 @@ func TestTransactionFetcherTimeoutRescheduling(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
@ -865,7 +865,7 @@ func TestTransactionFetcherUnderpricedDedup(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
errs := make([]error, len(txs))
for i := 0; i < len(errs); i++ {
if i%2 == 0 {
@ -938,7 +938,7 @@ func TestTransactionFetcherUnderpricedDoSProtection(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
errs := make([]error, len(txs))
for i := 0; i < len(errs); i++ {
errs[i] = txpool.ErrUnderpriced
@ -964,7 +964,7 @@ func TestTransactionFetcherOutOfBoundDeliveries(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
@ -1017,7 +1017,7 @@ func TestTransactionFetcherDrop(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
@ -1083,7 +1083,7 @@ func TestTransactionFetcherDropRescheduling(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
@ -1128,7 +1128,7 @@ func TestTransactionFetcherFuzzCrash01(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
@ -1155,7 +1155,7 @@ func TestTransactionFetcherFuzzCrash02(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
@ -1184,7 +1184,7 @@ func TestTransactionFetcherFuzzCrash03(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
@ -1217,7 +1217,7 @@ func TestTransactionFetcherFuzzCrash04(t *testing.T) {
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error {

@ -33,6 +33,7 @@ import (
"github.com/ethereum/go-ethereum/core/monitor"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/fetcher"
@ -65,7 +66,8 @@ const (
)
var (
syncChallengeTimeout = 15 * time.Second // Time allowance for a node to reply to the sync progress challenge
syncChallengeTimeout = 15 * time.Second // Time allowance for a node to reply to the sync progress challenge
accountBlacklistPeerCounter = metrics.NewRegisteredCounter("eth/count/blacklist", nil)
)
// txPool defines the methods needed from a transaction pool implementation to
@ -342,8 +344,21 @@ func newHandler(config *handlerConfig) (*handler, error) {
}
return p.RequestTxs(hashes)
}
addTxs := func(txs []*txpool.Transaction) []error {
return h.txpool.Add(txs, false, false)
addTxs := func(peer string, txs []*txpool.Transaction) []error {
errors := h.txpool.Add(txs, false, false)
for _, err := range errors {
if err == legacypool.ErrInBlackList {
accountBlacklistPeerCounter.Inc(1)
p := h.peers.peer(peer)
if p != nil {
remoteAddr := p.remoteAddr()
if remoteAddr != nil {
log.Warn("blacklist account detected from other peer", "remoteAddr", remoteAddr, "ID", p.ID())
}
}
}
}
return errors
}
h.txFetcher = fetcher.NewTxFetcher(h.txpool.Has, addTxs, fetchTx)
h.chainSync = newChainSyncer(h)

@ -17,6 +17,8 @@
package eth
import (
"net"
"github.com/ethereum/go-ethereum/eth/protocols/bsc"
"github.com/ethereum/go-ethereum/eth/protocols/trust"
@ -45,6 +47,13 @@ func (p *ethPeer) info() *ethPeerInfo {
}
}
func (p *ethPeer) remoteAddr() net.Addr {
if p.Peer != nil && p.Peer.Peer != nil {
return p.Peer.Peer.RemoteAddr()
}
return nil
}
// snapPeerInfo represents a short summary of the `snap` sub-protocol metadata known
// about a connected peer.
type snapPeerInfo struct {

@ -210,6 +210,9 @@ func (p *Peer) RemoteAddr() net.Addr {
}
log.Warn("RemoteAddr", "invalid testRemoteAddr", p.testRemoteAddr)
}
if p.rw == nil {
return nil
}
return p.rw.fd.RemoteAddr()
}

@ -26,10 +26,15 @@ import (
"time"
"github.com/ethereum/go-ethereum/common/gopool"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/log"
)
var (
accountBlacklistRpcCounter = metrics.NewRegisteredCounter("rpc/count/blacklist", nil)
)
// handler handles JSON-RPC messages. There is one handler per connection. Note that
// handler is not safe for concurrent use. Message handling never blocks indefinitely
// because RPCs are processed on background goroutines launched by handler.
@ -476,6 +481,11 @@ func (h *handler) handleCallMsg(ctx *callProc, reqCtx context.Context, msg *json
xForward := reqCtx.Value("X-Forwarded-For")
h.log.Warn("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start), "err", resp.Error.Message, "X-Forwarded-For", xForward)
monitoredError := "sender or to in black list" // using legacypool.ErrInBlackList.Error() will cause `import cycle`
if strings.Contains(resp.Error.Message, monitoredError) {
accountBlacklistRpcCounter.Inc(1)
log.Warn("blacklist account detected from direct rpc", "remoteAddr", h.conn.remoteAddr())
}
ctx = append(ctx, "err", resp.Error.Message)
if resp.Error.Data != nil {
ctx = append(ctx, "errdata", resp.Error.Data)

@ -80,7 +80,7 @@ func Fuzz(input []byte) int {
f := fetcher.NewTxFetcherForTests(
func(common.Hash) bool { return false },
func(txs []*txpool.Transaction) []error {
func(peer string, txs []*txpool.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },