eth: enforce announcement metadatas and drop peers violating the protocol (#28261)

* eth: enforce announcement metadatas and drop peers violating the protocol

* eth/fetcher: relax eth/68 validation a bit for flakey clients

* tests/fuzzers/txfetcher: pull in suggestion from Marius

* eth/fetcher: add tests for peer dropping

* eth/fetcher: linter linter linter linter linter
This commit is contained in:
Péter Szilágyi 2023-10-10 11:35:51 +03:00 committed by GitHub
parent 6505297456
commit 8afbcf4713
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 531 additions and 65 deletions

@ -20,6 +20,7 @@ import (
"bytes"
"errors"
"fmt"
"math"
mrand "math/rand"
"sort"
"time"
@ -105,6 +106,14 @@ var (
type txAnnounce struct {
origin string // Identifier of the peer originating the notification
hashes []common.Hash // Batch of transaction hashes being announced
metas []*txMetadata // Batch of metadatas associated with the hashes (nil before eth/68)
}
// txMetadata is a set of extra data transmitted along the announcement for better
// fetch scheduling.
type txMetadata struct {
kind byte // Transaction consensus type
size uint32 // Transaction size in bytes
}
// txRequest represents an in-flight transaction retrieval request destined to
@ -120,6 +129,7 @@ type txRequest struct {
type txDelivery struct {
origin string // Identifier of the peer originating the notification
hashes []common.Hash // Batch of transaction hashes having been delivered
metas []txMetadata // Batch of metadatas associated with the delivered hashes
direct bool // Whether this is a direct reply or a broadcast
}
@ -155,14 +165,14 @@ type TxFetcher struct {
// Stage 1: Waiting lists for newly discovered transactions that might be
// broadcast without needing explicit request/reply round trips.
waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast
waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist
waitslots map[string]map[common.Hash]struct{} // Waiting announcements grouped by peer (DoS protection)
waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast
waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist
waitslots map[string]map[common.Hash]*txMetadata // Waiting announcements grouped by peer (DoS protection)
// Stage 2: Queue of transactions that waiting to be allocated to some peer
// to be retrieved directly.
announces map[string]map[common.Hash]struct{} // Set of announced transactions, grouped by origin peer
announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash
announces map[string]map[common.Hash]*txMetadata // Set of announced transactions, grouped by origin peer
announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash
// Stage 3: Set of transactions currently being retrieved, some which may be
// fulfilled and some rescheduled. Note, this step shares 'announces' from the
@ -175,6 +185,7 @@ type TxFetcher struct {
hasTx func(common.Hash) bool // Retrieves a tx from the local txpool
addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool
fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer
dropPeer func(string) // Drops a peer in case of announcement violation
step chan struct{} // Notification channel when the fetcher loop iterates
clock mclock.Clock // Time wrapper to simulate in tests
@ -183,14 +194,14 @@ type TxFetcher struct {
// NewTxFetcher creates a transaction fetcher to retrieve transaction
// based on hash announcements.
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error) *TxFetcher {
return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, mclock.System{}, nil)
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string)) *TxFetcher {
return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, dropPeer, mclock.System{}, nil)
}
// NewTxFetcherForTests is a testing method to mock out the realtime clock with
// a simulated version and the internal randomness with a deterministic one.
func NewTxFetcherForTests(
hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error,
hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string),
clock mclock.Clock, rand *mrand.Rand) *TxFetcher {
return &TxFetcher{
notify: make(chan *txAnnounce),
@ -199,8 +210,8 @@ func NewTxFetcherForTests(
quit: make(chan struct{}),
waitlist: make(map[common.Hash]map[string]struct{}),
waittime: make(map[common.Hash]mclock.AbsTime),
waitslots: make(map[string]map[common.Hash]struct{}),
announces: make(map[string]map[common.Hash]struct{}),
waitslots: make(map[string]map[common.Hash]*txMetadata),
announces: make(map[string]map[common.Hash]*txMetadata),
announced: make(map[common.Hash]map[string]struct{}),
fetching: make(map[common.Hash]string),
requests: make(map[string]*txRequest),
@ -209,6 +220,7 @@ func NewTxFetcherForTests(
hasTx: hasTx,
addTxs: addTxs,
fetchTxs: fetchTxs,
dropPeer: dropPeer,
clock: clock,
rand: rand,
}
@ -216,7 +228,7 @@ func NewTxFetcherForTests(
// Notify announces the fetcher of the potential availability of a new batch of
// transactions in the network.
func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []common.Hash) error {
// Keep track of all the announced transactions
txAnnounceInMeter.Mark(int64(len(hashes)))
@ -226,28 +238,35 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
// still valuable to check here because it runs concurrent to the internal
// loop, so anything caught here is time saved internally.
var (
unknowns = make([]common.Hash, 0, len(hashes))
unknownHashes = make([]common.Hash, 0, len(hashes))
unknownMetas = make([]*txMetadata, 0, len(hashes))
duplicate int64
underpriced int64
)
for _, hash := range hashes {
for i, hash := range hashes {
switch {
case f.hasTx(hash):
duplicate++
case f.isKnownUnderpriced(hash):
underpriced++
default:
unknowns = append(unknowns, hash)
unknownHashes = append(unknownHashes, hash)
if types == nil {
unknownMetas = append(unknownMetas, nil)
} else {
unknownMetas = append(unknownMetas, &txMetadata{kind: types[i], size: sizes[i]})
}
}
}
txAnnounceKnownMeter.Mark(duplicate)
txAnnounceUnderpricedMeter.Mark(underpriced)
// If anything's left to announce, push it into the internal loop
if len(unknowns) == 0 {
if len(unknownHashes) == 0 {
return nil
}
announce := &txAnnounce{origin: peer, hashes: unknowns}
announce := &txAnnounce{origin: peer, hashes: unknownHashes, metas: unknownMetas}
select {
case f.notify <- announce:
return nil
@ -290,6 +309,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
// re-requesting them and dropping the peer in case of malicious transfers.
var (
added = make([]common.Hash, 0, len(txs))
metas = make([]txMetadata, 0, len(txs))
)
// proceed in batches
for i := 0; i < len(txs); i += 128 {
@ -325,6 +345,10 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
otherreject++
}
added = append(added, batch[j].Hash())
metas = append(metas, txMetadata{
kind: batch[j].Type(),
size: uint32(batch[j].Size()),
})
}
knownMeter.Mark(duplicate)
underpricedMeter.Mark(underpriced)
@ -337,7 +361,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
}
}
select {
case f.cleanup <- &txDelivery{origin: peer, hashes: added, direct: direct}:
case f.cleanup <- &txDelivery{origin: peer, hashes: added, metas: metas, direct: direct}:
return nil
case <-f.quit:
return errTerminated
@ -394,13 +418,15 @@ func (f *TxFetcher) loop() {
want := used + len(ann.hashes)
if want > maxTxAnnounces {
txAnnounceDOSMeter.Mark(int64(want - maxTxAnnounces))
ann.hashes = ann.hashes[:want-maxTxAnnounces]
ann.metas = ann.metas[:want-maxTxAnnounces]
}
// All is well, schedule the remainder of the transactions
idleWait := len(f.waittime) == 0
_, oldPeer := f.announces[ann.origin]
for _, hash := range ann.hashes {
for i, hash := range ann.hashes {
// If the transaction is already downloading, add it to the list
// of possible alternates (in case the current retrieval fails) and
// also account it for the peer.
@ -409,9 +435,9 @@ func (f *TxFetcher) loop() {
// Stage 2 and 3 share the set of origins per tx
if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = struct{}{}
announces[hash] = ann.metas[i]
} else {
f.announces[ann.origin] = map[common.Hash]struct{}{hash: {}}
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
}
continue
}
@ -422,9 +448,9 @@ func (f *TxFetcher) loop() {
// Stage 2 and 3 share the set of origins per tx
if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = struct{}{}
announces[hash] = ann.metas[i]
} else {
f.announces[ann.origin] = map[common.Hash]struct{}{hash: {}}
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
}
continue
}
@ -432,12 +458,18 @@ func (f *TxFetcher) loop() {
// yet downloading, add the peer as an alternate origin in the
// waiting list.
if f.waitlist[hash] != nil {
// Ignore double announcements from the same peer. This is
// especially important if metadata is also passed along to
// prevent malicious peers flip-flopping good/bad values.
if _, ok := f.waitlist[hash][ann.origin]; ok {
continue
}
f.waitlist[hash][ann.origin] = struct{}{}
if waitslots := f.waitslots[ann.origin]; waitslots != nil {
waitslots[hash] = struct{}{}
waitslots[hash] = ann.metas[i]
} else {
f.waitslots[ann.origin] = map[common.Hash]struct{}{hash: {}}
f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
}
continue
}
@ -446,9 +478,9 @@ func (f *TxFetcher) loop() {
f.waittime[hash] = f.clock.Now()
if waitslots := f.waitslots[ann.origin]; waitslots != nil {
waitslots[hash] = struct{}{}
waitslots[hash] = ann.metas[i]
} else {
f.waitslots[ann.origin] = map[common.Hash]struct{}{hash: {}}
f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
}
}
// If a new item was added to the waitlist, schedule it into the fetcher
@ -474,9 +506,9 @@ func (f *TxFetcher) loop() {
f.announced[hash] = f.waitlist[hash]
for peer := range f.waitlist[hash] {
if announces := f.announces[peer]; announces != nil {
announces[hash] = struct{}{}
announces[hash] = f.waitslots[peer][hash]
} else {
f.announces[peer] = map[common.Hash]struct{}{hash: {}}
f.announces[peer] = map[common.Hash]*txMetadata{hash: f.waitslots[peer][hash]}
}
delete(f.waitslots[peer], hash)
if len(f.waitslots[peer]) == 0 {
@ -545,10 +577,27 @@ func (f *TxFetcher) loop() {
case delivery := <-f.cleanup:
// Independent if the delivery was direct or broadcast, remove all
// traces of the hash from internal trackers
for _, hash := range delivery.hashes {
// traces of the hash from internal trackers. That said, compare any
// advertised metadata with the real ones and drop bad peers.
for i, hash := range delivery.hashes {
if _, ok := f.waitlist[hash]; ok {
for peer, txset := range f.waitslots {
if meta := txset[hash]; meta != nil {
if delivery.metas[i].kind != meta.kind {
log.Warn("Announced transaction type mismatch", "peer", peer, "tx", hash, "type", delivery.metas[i].kind, "ann", meta.kind)
f.dropPeer(peer)
} else if delivery.metas[i].size != meta.size {
log.Warn("Announced transaction size mismatch", "peer", peer, "tx", hash, "size", delivery.metas[i].size, "ann", meta.size)
if math.Abs(float64(delivery.metas[i].size)-float64(meta.size)) > 8 {
// Normally we should drop a peer considering this is a protocol violation.
// However, due to the RLP vs consensus format messyness, allow a few bytes
// wiggle-room where we only warn, but don't drop.
//
// TODO(karalabe): Get rid of this relaxation when clients are proven stable.
f.dropPeer(peer)
}
}
}
delete(txset, hash)
if len(txset) == 0 {
delete(f.waitslots, peer)
@ -558,6 +607,22 @@ func (f *TxFetcher) loop() {
delete(f.waittime, hash)
} else {
for peer, txset := range f.announces {
if meta := txset[hash]; meta != nil {
if delivery.metas[i].kind != meta.kind {
log.Warn("Announced transaction type mismatch", "peer", peer, "tx", hash, "type", delivery.metas[i].kind, "ann", meta.kind)
f.dropPeer(peer)
} else if delivery.metas[i].size != meta.size {
log.Warn("Announced transaction size mismatch", "peer", peer, "tx", hash, "size", delivery.metas[i].size, "ann", meta.size)
if math.Abs(float64(delivery.metas[i].size)-float64(meta.size)) > 8 {
// Normally we should drop a peer considering this is a protocol violation.
// However, due to the RLP vs consensus format messyness, allow a few bytes
// wiggle-room where we only warn, but don't drop.
//
// TODO(karalabe): Get rid of this relaxation when clients are proven stable.
f.dropPeer(peer)
}
}
}
delete(txset, hash)
if len(txset) == 0 {
delete(f.announces, peer)
@ -859,7 +924,7 @@ func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string))
// forEachHash does a range loop over a map of hashes in production, but during
// testing it does a deterministic sorted random to allow reproducing issues.
func (f *TxFetcher) forEachHash(hashes map[common.Hash]struct{}, do func(hash common.Hash) bool) {
func (f *TxFetcher) forEachHash(hashes map[common.Hash]*txMetadata, do func(hash common.Hash) bool) {
// If we're running production, use whatever Go's map gives us
if f.rand == nil {
for hash := range hashes {

@ -41,9 +41,20 @@ var (
testTxsHashes = []common.Hash{testTxs[0].Hash(), testTxs[1].Hash(), testTxs[2].Hash(), testTxs[3].Hash()}
)
type announce struct {
hash common.Hash
kind *byte
size *uint32
}
func typeptr(t byte) *byte { return &t }
func sizeptr(n uint32) *uint32 { return &n }
type doTxNotify struct {
peer string
hashes []common.Hash
types []byte
sizes []uint32
}
type doTxEnqueue struct {
peer string
@ -57,7 +68,14 @@ type doWait struct {
type doDrop string
type doFunc func()
type isWaitingWithMeta map[string][]announce
type isWaiting map[string][]common.Hash
type isScheduledWithMeta struct {
tracking map[string][]announce
fetching map[string][]common.Hash
dangling map[string][]common.Hash
}
type isScheduled struct {
tracking map[string][]common.Hash
fetching map[string][]common.Hash
@ -81,6 +99,7 @@ func TestTransactionFetcherWaiting(t *testing.T) {
func(common.Hash) bool { return false },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
@ -162,6 +181,212 @@ func TestTransactionFetcherWaiting(t *testing.T) {
})
}
// Tests that transaction announcements with associated metadata are added to a
// waitlist, and none of them are scheduled for retrieval until the wait expires.
//
// This test is an extended version of TestTransactionFetcherWaiting. It's mostly
// to cover the metadata checkes without bloating up the basic behavioral tests
// with all the useless extra fields.
func TestTransactionFetcherWaitingWithMeta(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
// Initial announcement to get something into the waitlist
doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x02}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{111, 222}},
isWaitingWithMeta(map[string][]announce{
"A": {
{common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(111)},
{common.Hash{0x02}, typeptr(types.LegacyTxType), sizeptr(222)},
},
}),
// Announce from a new peer to check that no overwrite happens
doTxNotify{peer: "B", hashes: []common.Hash{{0x03}, {0x04}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{333, 444}},
isWaitingWithMeta(map[string][]announce{
"A": {
{common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(111)},
{common.Hash{0x02}, typeptr(types.LegacyTxType), sizeptr(222)},
},
"B": {
{common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(333)},
{common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(444)},
},
}),
// Announce clashing hashes but unique new peer
doTxNotify{peer: "C", hashes: []common.Hash{{0x01}, {0x04}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{111, 444}},
isWaitingWithMeta(map[string][]announce{
"A": {
{common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(111)},
{common.Hash{0x02}, typeptr(types.LegacyTxType), sizeptr(222)},
},
"B": {
{common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(333)},
{common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(444)},
},
"C": {
{common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(111)},
{common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(444)},
},
}),
// Announce existing and clashing hashes from existing peer. Clashes
// should not overwrite previous announcements.
doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x03}, {0x05}}, types: []byte{types.LegacyTxType, types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{999, 333, 555}},
isWaitingWithMeta(map[string][]announce{
"A": {
{common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(111)},
{common.Hash{0x02}, typeptr(types.LegacyTxType), sizeptr(222)},
{common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(333)},
{common.Hash{0x05}, typeptr(types.LegacyTxType), sizeptr(555)},
},
"B": {
{common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(333)},
{common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(444)},
},
"C": {
{common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(111)},
{common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(444)},
},
}),
// Announce clashing hashes with conflicting metadata. Somebody will
// be in the wrong, but we don't know yet who.
doTxNotify{peer: "D", hashes: []common.Hash{{0x01}, {0x02}}, types: []byte{types.LegacyTxType, types.BlobTxType}, sizes: []uint32{999, 222}},
isWaitingWithMeta(map[string][]announce{
"A": {
{common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(111)},
{common.Hash{0x02}, typeptr(types.LegacyTxType), sizeptr(222)},
{common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(333)},
{common.Hash{0x05}, typeptr(types.LegacyTxType), sizeptr(555)},
},
"B": {
{common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(333)},
{common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(444)},
},
"C": {
{common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(111)},
{common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(444)},
},
"D": {
{common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(999)},
{common.Hash{0x02}, typeptr(types.BlobTxType), sizeptr(222)},
},
}),
isScheduled{tracking: nil, fetching: nil},
// Wait for the arrival timeout which should move all expired items
// from the wait list to the scheduler
doWait{time: txArriveTimeout, step: true},
isWaiting(nil),
isScheduledWithMeta{
tracking: map[string][]announce{
"A": {
{common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(111)},
{common.Hash{0x02}, typeptr(types.LegacyTxType), sizeptr(222)},
{common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(333)},
{common.Hash{0x05}, typeptr(types.LegacyTxType), sizeptr(555)},
},
"B": {
{common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(333)},
{common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(444)},
},
"C": {
{common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(111)},
{common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(444)},
},
"D": {
{common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(999)},
{common.Hash{0x02}, typeptr(types.BlobTxType), sizeptr(222)},
},
},
fetching: map[string][]common.Hash{ // Depends on deterministic test randomizer
"A": {{0x03}, {0x05}},
"C": {{0x01}, {0x04}},
"D": {{0x02}},
},
},
// Queue up a non-fetchable transaction and then trigger it with a new
// peer (weird case to test 1 line in the fetcher)
doTxNotify{peer: "C", hashes: []common.Hash{{0x06}, {0x07}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{666, 777}},
isWaitingWithMeta(map[string][]announce{
"C": {
{common.Hash{0x06}, typeptr(types.LegacyTxType), sizeptr(666)},
{common.Hash{0x07}, typeptr(types.LegacyTxType), sizeptr(777)},
},
}),
doWait{time: txArriveTimeout, step: true},
isScheduledWithMeta{
tracking: map[string][]announce{
"A": {
{common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(111)},
{common.Hash{0x02}, typeptr(types.LegacyTxType), sizeptr(222)},
{common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(333)},
{common.Hash{0x05}, typeptr(types.LegacyTxType), sizeptr(555)},
},
"B": {
{common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(333)},
{common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(444)},
},
"C": {
{common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(111)},
{common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(444)},
{common.Hash{0x06}, typeptr(types.LegacyTxType), sizeptr(666)},
{common.Hash{0x07}, typeptr(types.LegacyTxType), sizeptr(777)},
},
"D": {
{common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(999)},
{common.Hash{0x02}, typeptr(types.BlobTxType), sizeptr(222)},
},
},
fetching: map[string][]common.Hash{
"A": {{0x03}, {0x05}},
"C": {{0x01}, {0x04}},
"D": {{0x02}},
},
},
doTxNotify{peer: "E", hashes: []common.Hash{{0x06}, {0x07}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{666, 777}},
isScheduledWithMeta{
tracking: map[string][]announce{
"A": {
{common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(111)},
{common.Hash{0x02}, typeptr(types.LegacyTxType), sizeptr(222)},
{common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(333)},
{common.Hash{0x05}, typeptr(types.LegacyTxType), sizeptr(555)},
},
"B": {
{common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(333)},
{common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(444)},
},
"C": {
{common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(111)},
{common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(444)},
{common.Hash{0x06}, typeptr(types.LegacyTxType), sizeptr(666)},
{common.Hash{0x07}, typeptr(types.LegacyTxType), sizeptr(777)},
},
"D": {
{common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(999)},
{common.Hash{0x02}, typeptr(types.BlobTxType), sizeptr(222)},
},
"E": {
{common.Hash{0x06}, typeptr(types.LegacyTxType), sizeptr(666)},
{common.Hash{0x07}, typeptr(types.LegacyTxType), sizeptr(777)},
},
},
fetching: map[string][]common.Hash{
"A": {{0x03}, {0x05}},
"C": {{0x01}, {0x04}},
"D": {{0x02}},
"E": {{0x06}, {0x07}},
},
},
},
})
}
// Tests that transaction announcements skip the waiting list if they are
// already scheduled.
func TestTransactionFetcherSkipWaiting(t *testing.T) {
@ -171,6 +396,7 @@ func TestTransactionFetcherSkipWaiting(t *testing.T) {
func(common.Hash) bool { return false },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
@ -234,6 +460,7 @@ func TestTransactionFetcherSingletonRequesting(t *testing.T) {
func(common.Hash) bool { return false },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
@ -313,6 +540,7 @@ func TestTransactionFetcherFailedRescheduling(t *testing.T) {
<-proceed
return errors.New("peer disconnected")
},
nil,
)
},
steps: []interface{}{
@ -382,6 +610,7 @@ func TestTransactionFetcherCleanup(t *testing.T) {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
@ -421,6 +650,7 @@ func TestTransactionFetcherCleanupEmpty(t *testing.T) {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
@ -459,6 +689,7 @@ func TestTransactionFetcherMissingRescheduling(t *testing.T) {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
@ -505,6 +736,7 @@ func TestTransactionFetcherMissingCleanup(t *testing.T) {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
@ -543,6 +775,7 @@ func TestTransactionFetcherBroadcasts(t *testing.T) {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
@ -591,6 +824,7 @@ func TestTransactionFetcherWaitTimerResets(t *testing.T) {
func(common.Hash) bool { return false },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
@ -648,6 +882,7 @@ func TestTransactionFetcherTimeoutRescheduling(t *testing.T) {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
@ -713,6 +948,7 @@ func TestTransactionFetcherTimeoutTimerResets(t *testing.T) {
func(common.Hash) bool { return false },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
@ -772,6 +1008,7 @@ func TestTransactionFetcherRateLimiting(t *testing.T) {
func(common.Hash) bool { return false },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
@ -810,6 +1047,7 @@ func TestTransactionFetcherDoSProtection(t *testing.T) {
func(common.Hash) bool { return false },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
@ -877,6 +1115,7 @@ func TestTransactionFetcherUnderpricedDedup(t *testing.T) {
return errs
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
@ -946,6 +1185,7 @@ func TestTransactionFetcherUnderpricedDoSProtection(t *testing.T) {
return errs
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: append(steps, []interface{}{
@ -968,6 +1208,7 @@ func TestTransactionFetcherOutOfBoundDeliveries(t *testing.T) {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
@ -1021,6 +1262,7 @@ func TestTransactionFetcherDrop(t *testing.T) {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
@ -1087,6 +1329,7 @@ func TestTransactionFetcherDropRescheduling(t *testing.T) {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
@ -1120,6 +1363,74 @@ func TestTransactionFetcherDropRescheduling(t *testing.T) {
})
}
// Tests that announced transactions with the wrong transaction type or size will
// result in a dropped peer.
func TestInvalidAnnounceMetadata(t *testing.T) {
drop := make(chan string, 2)
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
func(peer string) { drop <- peer },
)
},
steps: []interface{}{
// Initial announcement to get something into the waitlist
doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1]}, types: []byte{testTxs[0].Type(), testTxs[1].Type()}, sizes: []uint32{uint32(testTxs[0].Size()), uint32(testTxs[1].Size())}},
isWaitingWithMeta(map[string][]announce{
"A": {
{testTxsHashes[0], typeptr(testTxs[0].Type()), sizeptr(uint32(testTxs[0].Size()))},
{testTxsHashes[1], typeptr(testTxs[1].Type()), sizeptr(uint32(testTxs[1].Size()))},
},
}),
// Announce from new peers conflicting transactions
doTxNotify{peer: "B", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{1024 + uint32(testTxs[0].Size())}},
doTxNotify{peer: "C", hashes: []common.Hash{testTxsHashes[1]}, types: []byte{1 + testTxs[1].Type()}, sizes: []uint32{uint32(testTxs[1].Size())}},
isWaitingWithMeta(map[string][]announce{
"A": {
{testTxsHashes[0], typeptr(testTxs[0].Type()), sizeptr(uint32(testTxs[0].Size()))},
{testTxsHashes[1], typeptr(testTxs[1].Type()), sizeptr(uint32(testTxs[1].Size()))},
},
"B": {
{testTxsHashes[0], typeptr(testTxs[0].Type()), sizeptr(1024 + uint32(testTxs[0].Size()))},
},
"C": {
{testTxsHashes[1], typeptr(1 + testTxs[1].Type()), sizeptr(uint32(testTxs[1].Size()))},
},
}),
// Schedule all the transactions for retrieval
doWait{time: txArriveTimeout, step: true},
isWaitingWithMeta(nil),
isScheduledWithMeta{
tracking: map[string][]announce{
"A": {
{testTxsHashes[0], typeptr(testTxs[0].Type()), sizeptr(uint32(testTxs[0].Size()))},
{testTxsHashes[1], typeptr(testTxs[1].Type()), sizeptr(uint32(testTxs[1].Size()))},
},
"B": {
{testTxsHashes[0], typeptr(testTxs[0].Type()), sizeptr(1024 + uint32(testTxs[0].Size()))},
},
"C": {
{testTxsHashes[1], typeptr(1 + testTxs[1].Type()), sizeptr(uint32(testTxs[1].Size()))},
},
},
fetching: map[string][]common.Hash{
"A": {testTxsHashes[0]},
"C": {testTxsHashes[1]},
},
},
// Deliver the transactions and wait for B to be dropped
doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0], testTxs[1]}},
doFunc(func() { <-drop }),
doFunc(func() { <-drop }),
},
})
}
// This test reproduces a crash caught by the fuzzer. The root cause was a
// dangling transaction timing out and clashing on re-add with a concurrently
// announced one.
@ -1132,6 +1443,7 @@ func TestTransactionFetcherFuzzCrash01(t *testing.T) {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
@ -1159,6 +1471,7 @@ func TestTransactionFetcherFuzzCrash02(t *testing.T) {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
@ -1188,6 +1501,7 @@ func TestTransactionFetcherFuzzCrash03(t *testing.T) {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
@ -1224,6 +1538,7 @@ func TestTransactionFetcherFuzzCrash04(t *testing.T) {
<-proceed
return errors.New("peer disconnected")
},
nil,
)
},
steps: []interface{}{
@ -1274,9 +1589,34 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
// Crunch through all the test steps and execute them
for i, step := range tt.steps {
// Auto-expand certain steps to ones with metadata
switch old := step.(type) {
case isWaiting:
new := make(isWaitingWithMeta)
for peer, hashes := range old {
for _, hash := range hashes {
new[peer] = append(new[peer], announce{hash, nil, nil})
}
}
step = new
case isScheduled:
new := isScheduledWithMeta{
tracking: make(map[string][]announce),
fetching: old.fetching,
dangling: old.dangling,
}
for peer, hashes := range old.tracking {
for _, hash := range hashes {
new.tracking[peer] = append(new.tracking[peer], announce{hash, nil, nil})
}
}
step = new
}
// Process the original or expanded steps
switch step := step.(type) {
case doTxNotify:
if err := fetcher.Notify(step.peer, step.hashes); err != nil {
if err := fetcher.Notify(step.peer, step.types, step.sizes, step.hashes); err != nil {
t.Errorf("step %d: %v", i, err)
}
<-wait // Fetcher needs to process this, wait until it's done
@ -1307,24 +1647,34 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
case doFunc:
step()
case isWaiting:
case isWaitingWithMeta:
// We need to check that the waiting list (stage 1) internals
// match with the expected set. Check the peer->hash mappings
// first.
for peer, hashes := range step {
for peer, announces := range step {
waiting := fetcher.waitslots[peer]
if waiting == nil {
t.Errorf("step %d: peer %s missing from waitslots", i, peer)
continue
}
for _, hash := range hashes {
if _, ok := waiting[hash]; !ok {
t.Errorf("step %d, peer %s: hash %x missing from waitslots", i, peer, hash)
for _, ann := range announces {
if meta, ok := waiting[ann.hash]; !ok {
t.Errorf("step %d, peer %s: hash %x missing from waitslots", i, peer, ann.hash)
} else {
if (meta == nil && (ann.kind != nil || ann.size != nil)) ||
(meta != nil && (ann.kind == nil || ann.size == nil)) ||
(meta != nil && (meta.kind != *ann.kind || meta.size != *ann.size)) {
t.Errorf("step %d, peer %s, hash %x: waitslot metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, ann.kind, ann.size)
}
}
}
for hash := range waiting {
if !containsHash(hashes, hash) {
t.Errorf("step %d, peer %s: hash %x extra in waitslots", i, peer, hash)
for hash, meta := range waiting {
ann := announce{hash: hash}
if meta != nil {
ann.kind, ann.size = &meta.kind, &meta.size
}
if !containsAnnounce(announces, ann) {
t.Errorf("step %d, peer %s: announce %v extra in waitslots", i, peer, ann)
}
}
}
@ -1334,13 +1684,13 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
}
}
// Peer->hash sets correct, check the hash->peer and timeout sets
for peer, hashes := range step {
for _, hash := range hashes {
if _, ok := fetcher.waitlist[hash][peer]; !ok {
t.Errorf("step %d, hash %x: peer %s missing from waitlist", i, hash, peer)
for peer, announces := range step {
for _, ann := range announces {
if _, ok := fetcher.waitlist[ann.hash][peer]; !ok {
t.Errorf("step %d, hash %x: peer %s missing from waitlist", i, ann.hash, peer)
}
if _, ok := fetcher.waittime[hash]; !ok {
t.Errorf("step %d: hash %x missing from waittime", i, hash)
if _, ok := fetcher.waittime[ann.hash]; !ok {
t.Errorf("step %d: hash %x missing from waittime", i, ann.hash)
}
}
}
@ -1349,15 +1699,15 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
t.Errorf("step %d, hash %x: empty peerset in waitlist", i, hash)
}
for peer := range peers {
if !containsHash(step[peer], hash) {
if !containsHashInAnnounces(step[peer], hash) {
t.Errorf("step %d, hash %x: peer %s extra in waitlist", i, hash, peer)
}
}
}
for hash := range fetcher.waittime {
var found bool
for _, hashes := range step {
if containsHash(hashes, hash) {
for _, announces := range step {
if containsHashInAnnounces(announces, hash) {
found = true
break
}
@ -1367,23 +1717,33 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
}
}
case isScheduled:
case isScheduledWithMeta:
// Check that all scheduled announces are accounted for and no
// extra ones are present.
for peer, hashes := range step.tracking {
for peer, announces := range step.tracking {
scheduled := fetcher.announces[peer]
if scheduled == nil {
t.Errorf("step %d: peer %s missing from announces", i, peer)
continue
}
for _, hash := range hashes {
if _, ok := scheduled[hash]; !ok {
t.Errorf("step %d, peer %s: hash %x missing from announces", i, peer, hash)
for _, ann := range announces {
if meta, ok := scheduled[ann.hash]; !ok {
t.Errorf("step %d, peer %s: hash %x missing from announces", i, peer, ann.hash)
} else {
if (meta == nil && (ann.kind != nil || ann.size != nil)) ||
(meta != nil && (ann.kind == nil || ann.size == nil)) ||
(meta != nil && (meta.kind != *ann.kind || meta.size != *ann.size)) {
t.Errorf("step %d, peer %s, hash %x: announce metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, ann.kind, ann.size)
}
}
}
for hash := range scheduled {
if !containsHash(hashes, hash) {
t.Errorf("step %d, peer %s: hash %x extra in announces", i, peer, hash)
for hash, meta := range scheduled {
ann := announce{hash: hash}
if meta != nil {
ann.kind, ann.size = &meta.kind, &meta.size
}
if !containsAnnounce(announces, ann) {
t.Errorf("step %d, peer %s: announce %x extra in announces", i, peer, hash)
}
}
}
@ -1483,17 +1843,17 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
// retrieval but not actively being downloaded are tracked only
// in the stage 2 `announced` map.
var queued []common.Hash
for _, hashes := range step.tracking {
for _, hash := range hashes {
for _, announces := range step.tracking {
for _, ann := range announces {
var found bool
for _, hs := range step.fetching {
if containsHash(hs, hash) {
if containsHash(hs, ann.hash) {
found = true
break
}
}
if !found {
queued = append(queued, hash)
queued = append(queued, ann.hash)
}
}
}
@ -1526,6 +1886,42 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
}
}
// containsAnnounce returns whether an announcement is contained within a slice
// of announcements.
func containsAnnounce(slice []announce, ann announce) bool {
for _, have := range slice {
if have.hash == ann.hash {
if have.kind == nil || ann.kind == nil {
if have.kind != ann.kind {
return false
}
} else if *have.kind != *ann.kind {
return false
}
if have.size == nil || ann.size == nil {
if have.size != ann.size {
return false
}
} else if *have.size != *ann.size {
return false
}
return true
}
}
return false
}
// containsHashInAnnounces returns whether a hash is contained within a slice
// of announcements.
func containsHashInAnnounces(slice []announce, hash common.Hash) bool {
for _, have := range slice {
if have.hash == hash {
return true
}
}
return false
}
// containsHash returns whether a hash is contained within a hash slice.
func containsHash(slice []common.Hash, hash common.Hash) bool {
for _, have := range slice {

@ -277,7 +277,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
addTxs := func(txs []*types.Transaction) []error {
return h.txpool.Add(txs, false, false)
}
h.txFetcher = fetcher.NewTxFetcher(h.txpool.Has, addTxs, fetchTx)
h.txFetcher = fetcher.NewTxFetcher(h.txpool.Has, addTxs, fetchTx, h.removePeer)
h.chainSync = newChainSyncer(h)
return h, nil
}

@ -68,10 +68,10 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
return h.handleBlockBroadcast(peer, packet.Block, packet.TD)
case *eth.NewPooledTransactionHashesPacket67:
return h.txFetcher.Notify(peer.ID(), *packet)
return h.txFetcher.Notify(peer.ID(), nil, nil, *packet)
case *eth.NewPooledTransactionHashesPacket68:
return h.txFetcher.Notify(peer.ID(), packet.Hashes)
return h.txFetcher.Notify(peer.ID(), packet.Types, packet.Sizes, packet.Hashes)
case *eth.TransactionsPacket:
for _, tx := range *packet {

@ -83,6 +83,7 @@ func Fuzz(input []byte) int {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
clock, rand,
)
f.Start()
@ -116,6 +117,8 @@ func Fuzz(input []byte) int {
var (
announceIdxs = make([]int, announce)
announces = make([]common.Hash, announce)
types = make([]byte, announce)
sizes = make([]uint32, announce)
)
for i := 0; i < len(announces); i++ {
annBuf := make([]byte, 2)
@ -124,11 +127,13 @@ func Fuzz(input []byte) int {
}
announceIdxs[i] = (int(annBuf[0])*256 + int(annBuf[1])) % len(txs)
announces[i] = txs[announceIdxs[i]].Hash()
types[i] = txs[announceIdxs[i]].Type()
sizes[i] = uint32(txs[announceIdxs[i]].Size())
}
if verbose {
fmt.Println("Notify", peer, announceIdxs)
}
if err := f.Notify(peer, announces); err != nil {
if err := f.Notify(peer, types, sizes, announces); err != nil {
panic(err)
}