diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 18c5ff007a..a113155009 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -114,7 +114,7 @@ var errTerminated = errors.New("terminated") type txAnnounce struct { origin string // Identifier of the peer originating the notification hashes []common.Hash // Batch of transaction hashes being announced - metas []*txMetadata // Batch of metadatas associated with the hashes (nil before eth/68) + metas []*txMetadata // Batch of metadata associated with the hashes } // txMetadata is a set of extra data transmitted along the announcement for better @@ -137,7 +137,7 @@ type txRequest struct { type txDelivery struct { origin string // Identifier of the peer originating the notification hashes []common.Hash // Batch of transaction hashes having been delivered - metas []txMetadata // Batch of metadatas associated with the delivered hashes + metas []txMetadata // Batch of metadata associated with the delivered hashes direct bool // Whether this is a direct reply or a broadcast } @@ -260,11 +260,11 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c underpriced++ default: unknownHashes = append(unknownHashes, hash) - if types == nil { - unknownMetas = append(unknownMetas, nil) - } else { - unknownMetas = append(unknownMetas, &txMetadata{kind: types[i], size: sizes[i]}) - } + + // Transaction metadata has been available since eth68, and all + // legacy eth protocols (prior to eth68) have been deprecated. + // Therefore, metadata is always expected in the announcement. + unknownMetas = append(unknownMetas, &txMetadata{kind: types[i], size: sizes[i]}) } } txAnnounceKnownMeter.Mark(duplicate) @@ -892,13 +892,8 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{}, if len(hashes) >= maxTxRetrievals { return false // break in the for-each } - if meta != nil { // Only set eth/68 and upwards - bytes += uint64(meta.size) - if bytes >= maxTxRetrievalSize { - return false - } - } - return true // scheduled, try to add more + bytes += uint64(meta.size) + return bytes < maxTxRetrievalSize }) // If any hashes were allocated, request them from the peer if len(hashes) > 0 { diff --git a/eth/fetcher/tx_fetcher_test.go b/eth/fetcher/tx_fetcher_test.go index 3d3ef81ede..0b47646669 100644 --- a/eth/fetcher/tx_fetcher_test.go +++ b/eth/fetcher/tx_fetcher_test.go @@ -45,13 +45,10 @@ var ( type announce struct { hash common.Hash - kind *byte - size *uint32 + kind byte + size uint32 } -func typeptr(t byte) *byte { return &t } -func sizeptr(n uint32) *uint32 { return &n } - type doTxNotify struct { peer string hashes []common.Hash @@ -70,16 +67,10 @@ type doWait struct { type doDrop string type doFunc func() -type isWaitingWithMeta map[string][]announce -type isWaiting map[string][]common.Hash +type isWaiting map[string][]announce -type isScheduledWithMeta struct { - tracking map[string][]announce - fetching map[string][]common.Hash - dangling map[string][]common.Hash -} type isScheduled struct { - tracking map[string][]common.Hash + tracking map[string][]announce fetching map[string][]common.Hash dangling map[string][]common.Hash } @@ -92,104 +83,13 @@ type txFetcherTest struct { steps []interface{} } -// Tests that transaction announcements are added to a waitlist, and none -// of them are scheduled for retrieval until the wait expires. +// Tests that transaction announcements with associated metadata are added to a +// waitlist, and none of them are scheduled for retrieval until the wait expires. +// +// This test is an extended version of TestTransactionFetcherWaiting. It's mostly +// to cover the metadata checks without bloating up the basic behavioral tests +// with all the useless extra fields. func TestTransactionFetcherWaiting(t *testing.T) { - testTransactionFetcherParallel(t, txFetcherTest{ - init: func() *TxFetcher { - return NewTxFetcher( - func(common.Hash) bool { return false }, - nil, - func(string, []common.Hash) error { return nil }, - nil, - ) - }, - steps: []interface{}{ - // Initial announcement to get something into the waitlist - doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x02}}}, - isWaiting(map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, - }), - // Announce from a new peer to check that no overwrite happens - doTxNotify{peer: "B", hashes: []common.Hash{{0x03}, {0x04}}}, - isWaiting(map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, - "B": {{0x03}, {0x04}}, - }), - // Announce clashing hashes but unique new peer - doTxNotify{peer: "C", hashes: []common.Hash{{0x01}, {0x04}}}, - isWaiting(map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, - "B": {{0x03}, {0x04}}, - "C": {{0x01}, {0x04}}, - }), - // Announce existing and clashing hashes from existing peer - doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x03}, {0x05}}}, - isWaiting(map[string][]common.Hash{ - "A": {{0x01}, {0x02}, {0x03}, {0x05}}, - "B": {{0x03}, {0x04}}, - "C": {{0x01}, {0x04}}, - }), - isScheduled{tracking: nil, fetching: nil}, - - // Wait for the arrival timeout which should move all expired items - // from the wait list to the scheduler - doWait{time: txArriveTimeout, step: true}, - isWaiting(nil), - isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}, {0x02}, {0x03}, {0x05}}, - "B": {{0x03}, {0x04}}, - "C": {{0x01}, {0x04}}, - }, - fetching: map[string][]common.Hash{ // Depends on deterministic test randomizer - "A": {{0x02}, {0x03}, {0x05}}, - "C": {{0x01}, {0x04}}, - }, - }, - // Queue up a non-fetchable transaction and then trigger it with a new - // peer (weird case to test 1 line in the fetcher) - doTxNotify{peer: "C", hashes: []common.Hash{{0x06}, {0x07}}}, - isWaiting(map[string][]common.Hash{ - "C": {{0x06}, {0x07}}, - }), - doWait{time: txArriveTimeout, step: true}, - isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}, {0x02}, {0x03}, {0x05}}, - "B": {{0x03}, {0x04}}, - "C": {{0x01}, {0x04}, {0x06}, {0x07}}, - }, - fetching: map[string][]common.Hash{ - "A": {{0x02}, {0x03}, {0x05}}, - "C": {{0x01}, {0x04}}, - }, - }, - doTxNotify{peer: "D", hashes: []common.Hash{{0x06}, {0x07}}}, - isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}, {0x02}, {0x03}, {0x05}}, - "B": {{0x03}, {0x04}}, - "C": {{0x01}, {0x04}, {0x06}, {0x07}}, - "D": {{0x06}, {0x07}}, - }, - fetching: map[string][]common.Hash{ - "A": {{0x02}, {0x03}, {0x05}}, - "C": {{0x01}, {0x04}}, - "D": {{0x06}, {0x07}}, - }, - }, - }, - }) -} - -// Tests that transaction announcements with associated metadata are added to a -// waitlist, and none of them are scheduled for retrieval until the wait expires. -// -// This test is an extended version of TestTransactionFetcherWaiting. It's mostly -// to cover the metadata checks without bloating up the basic behavioral tests -// with all the useless extra fields. -func TestTransactionFetcherWaitingWithMeta(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { return NewTxFetcher( @@ -202,80 +102,80 @@ func TestTransactionFetcherWaitingWithMeta(t *testing.T) { steps: []interface{}{ // Initial announcement to get something into the waitlist doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x02}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{111, 222}}, - isWaitingWithMeta(map[string][]announce{ + isWaiting(map[string][]announce{ "A": { - {common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(111)}, - {common.Hash{0x02}, typeptr(types.LegacyTxType), sizeptr(222)}, + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, }, }), // Announce from a new peer to check that no overwrite happens doTxNotify{peer: "B", hashes: []common.Hash{{0x03}, {0x04}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{333, 444}}, - isWaitingWithMeta(map[string][]announce{ + isWaiting(map[string][]announce{ "A": { - {common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(111)}, - {common.Hash{0x02}, typeptr(types.LegacyTxType), sizeptr(222)}, + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, }, "B": { - {common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(333)}, - {common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(444)}, + {common.Hash{0x03}, types.LegacyTxType, 333}, + {common.Hash{0x04}, types.LegacyTxType, 444}, }, }), // Announce clashing hashes but unique new peer doTxNotify{peer: "C", hashes: []common.Hash{{0x01}, {0x04}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{111, 444}}, - isWaitingWithMeta(map[string][]announce{ + isWaiting(map[string][]announce{ "A": { - {common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(111)}, - {common.Hash{0x02}, typeptr(types.LegacyTxType), sizeptr(222)}, + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, }, "B": { - {common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(333)}, - {common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(444)}, + {common.Hash{0x03}, types.LegacyTxType, 333}, + {common.Hash{0x04}, types.LegacyTxType, 444}, }, "C": { - {common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(111)}, - {common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(444)}, + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x04}, types.LegacyTxType, 444}, }, }), // Announce existing and clashing hashes from existing peer. Clashes // should not overwrite previous announcements. doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x03}, {0x05}}, types: []byte{types.LegacyTxType, types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{999, 333, 555}}, - isWaitingWithMeta(map[string][]announce{ + isWaiting(map[string][]announce{ "A": { - {common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(111)}, - {common.Hash{0x02}, typeptr(types.LegacyTxType), sizeptr(222)}, - {common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(333)}, - {common.Hash{0x05}, typeptr(types.LegacyTxType), sizeptr(555)}, + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + {common.Hash{0x03}, types.LegacyTxType, 333}, + {common.Hash{0x05}, types.LegacyTxType, 555}, }, "B": { - {common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(333)}, - {common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(444)}, + {common.Hash{0x03}, types.LegacyTxType, 333}, + {common.Hash{0x04}, types.LegacyTxType, 444}, }, "C": { - {common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(111)}, - {common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(444)}, + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x04}, types.LegacyTxType, 444}, }, }), // Announce clashing hashes with conflicting metadata. Somebody will // be in the wrong, but we don't know yet who. doTxNotify{peer: "D", hashes: []common.Hash{{0x01}, {0x02}}, types: []byte{types.LegacyTxType, types.BlobTxType}, sizes: []uint32{999, 222}}, - isWaitingWithMeta(map[string][]announce{ + isWaiting(map[string][]announce{ "A": { - {common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(111)}, - {common.Hash{0x02}, typeptr(types.LegacyTxType), sizeptr(222)}, - {common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(333)}, - {common.Hash{0x05}, typeptr(types.LegacyTxType), sizeptr(555)}, + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + {common.Hash{0x03}, types.LegacyTxType, 333}, + {common.Hash{0x05}, types.LegacyTxType, 555}, }, "B": { - {common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(333)}, - {common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(444)}, + {common.Hash{0x03}, types.LegacyTxType, 333}, + {common.Hash{0x04}, types.LegacyTxType, 444}, }, "C": { - {common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(111)}, - {common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(444)}, + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x04}, types.LegacyTxType, 444}, }, "D": { - {common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(999)}, - {common.Hash{0x02}, typeptr(types.BlobTxType), sizeptr(222)}, + {common.Hash{0x01}, types.LegacyTxType, 999}, + {common.Hash{0x02}, types.BlobTxType, 222}, }, }), isScheduled{tracking: nil, fetching: nil}, @@ -284,25 +184,25 @@ func TestTransactionFetcherWaitingWithMeta(t *testing.T) { // from the wait list to the scheduler doWait{time: txArriveTimeout, step: true}, isWaiting(nil), - isScheduledWithMeta{ + isScheduled{ tracking: map[string][]announce{ "A": { - {common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(111)}, - {common.Hash{0x02}, typeptr(types.LegacyTxType), sizeptr(222)}, - {common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(333)}, - {common.Hash{0x05}, typeptr(types.LegacyTxType), sizeptr(555)}, + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + {common.Hash{0x03}, types.LegacyTxType, 333}, + {common.Hash{0x05}, types.LegacyTxType, 555}, }, "B": { - {common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(333)}, - {common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(444)}, + {common.Hash{0x03}, types.LegacyTxType, 333}, + {common.Hash{0x04}, types.LegacyTxType, 444}, }, "C": { - {common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(111)}, - {common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(444)}, + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x04}, types.LegacyTxType, 444}, }, "D": { - {common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(999)}, - {common.Hash{0x02}, typeptr(types.BlobTxType), sizeptr(222)}, + {common.Hash{0x01}, types.LegacyTxType, 999}, + {common.Hash{0x02}, types.BlobTxType, 222}, }, }, fetching: map[string][]common.Hash{ // Depends on deterministic test randomizer @@ -314,34 +214,34 @@ func TestTransactionFetcherWaitingWithMeta(t *testing.T) { // Queue up a non-fetchable transaction and then trigger it with a new // peer (weird case to test 1 line in the fetcher) doTxNotify{peer: "C", hashes: []common.Hash{{0x06}, {0x07}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{666, 777}}, - isWaitingWithMeta(map[string][]announce{ + isWaiting(map[string][]announce{ "C": { - {common.Hash{0x06}, typeptr(types.LegacyTxType), sizeptr(666)}, - {common.Hash{0x07}, typeptr(types.LegacyTxType), sizeptr(777)}, + {common.Hash{0x06}, types.LegacyTxType, 666}, + {common.Hash{0x07}, types.LegacyTxType, 777}, }, }), doWait{time: txArriveTimeout, step: true}, - isScheduledWithMeta{ + isScheduled{ tracking: map[string][]announce{ "A": { - {common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(111)}, - {common.Hash{0x02}, typeptr(types.LegacyTxType), sizeptr(222)}, - {common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(333)}, - {common.Hash{0x05}, typeptr(types.LegacyTxType), sizeptr(555)}, + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + {common.Hash{0x03}, types.LegacyTxType, 333}, + {common.Hash{0x05}, types.LegacyTxType, 555}, }, "B": { - {common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(333)}, - {common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(444)}, + {common.Hash{0x03}, types.LegacyTxType, 333}, + {common.Hash{0x04}, types.LegacyTxType, 444}, }, "C": { - {common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(111)}, - {common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(444)}, - {common.Hash{0x06}, typeptr(types.LegacyTxType), sizeptr(666)}, - {common.Hash{0x07}, typeptr(types.LegacyTxType), sizeptr(777)}, + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x04}, types.LegacyTxType, 444}, + {common.Hash{0x06}, types.LegacyTxType, 666}, + {common.Hash{0x07}, types.LegacyTxType, 777}, }, "D": { - {common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(999)}, - {common.Hash{0x02}, typeptr(types.BlobTxType), sizeptr(222)}, + {common.Hash{0x01}, types.LegacyTxType, 999}, + {common.Hash{0x02}, types.BlobTxType, 222}, }, }, fetching: map[string][]common.Hash{ @@ -351,31 +251,31 @@ func TestTransactionFetcherWaitingWithMeta(t *testing.T) { }, }, doTxNotify{peer: "E", hashes: []common.Hash{{0x06}, {0x07}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{666, 777}}, - isScheduledWithMeta{ + isScheduled{ tracking: map[string][]announce{ "A": { - {common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(111)}, - {common.Hash{0x02}, typeptr(types.LegacyTxType), sizeptr(222)}, - {common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(333)}, - {common.Hash{0x05}, typeptr(types.LegacyTxType), sizeptr(555)}, + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + {common.Hash{0x03}, types.LegacyTxType, 333}, + {common.Hash{0x05}, types.LegacyTxType, 555}, }, "B": { - {common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(333)}, - {common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(444)}, + {common.Hash{0x03}, types.LegacyTxType, 333}, + {common.Hash{0x04}, types.LegacyTxType, 444}, }, "C": { - {common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(111)}, - {common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(444)}, - {common.Hash{0x06}, typeptr(types.LegacyTxType), sizeptr(666)}, - {common.Hash{0x07}, typeptr(types.LegacyTxType), sizeptr(777)}, + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x04}, types.LegacyTxType, 444}, + {common.Hash{0x06}, types.LegacyTxType, 666}, + {common.Hash{0x07}, types.LegacyTxType, 777}, }, "D": { - {common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(999)}, - {common.Hash{0x02}, typeptr(types.BlobTxType), sizeptr(222)}, + {common.Hash{0x01}, types.LegacyTxType, 999}, + {common.Hash{0x02}, types.BlobTxType, 222}, }, "E": { - {common.Hash{0x06}, typeptr(types.LegacyTxType), sizeptr(666)}, - {common.Hash{0x07}, typeptr(types.LegacyTxType), sizeptr(777)}, + {common.Hash{0x06}, types.LegacyTxType, 666}, + {common.Hash{0x07}, types.LegacyTxType, 777}, }, }, fetching: map[string][]common.Hash{ @@ -403,17 +303,28 @@ func TestTransactionFetcherSkipWaiting(t *testing.T) { }, steps: []interface{}{ // Push an initial announcement through to the scheduled stage - doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x02}}}, - isWaiting(map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + doTxNotify{ + peer: "A", + hashes: []common.Hash{{0x01}, {0x02}}, + types: []byte{types.LegacyTxType, types.LegacyTxType}, + sizes: []uint32{111, 222}, + }, + isWaiting(map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + }, }), isScheduled{tracking: nil, fetching: nil}, doWait{time: txArriveTimeout, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + tracking: map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + }, }, fetching: map[string][]common.Hash{ "A": {{0x01}, {0x02}}, @@ -421,13 +332,18 @@ func TestTransactionFetcherSkipWaiting(t *testing.T) { }, // Announce overlaps from the same peer, ensure the new ones end up // in stage one, and clashing ones don't get double tracked - doTxNotify{peer: "A", hashes: []common.Hash{{0x02}, {0x03}}}, - isWaiting(map[string][]common.Hash{ - "A": {{0x03}}, + doTxNotify{peer: "A", hashes: []common.Hash{{0x02}, {0x03}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{222, 333}}, + isWaiting(map[string][]announce{ + "A": { + {common.Hash{0x03}, types.LegacyTxType, 333}, + }, }), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + tracking: map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + }, }, fetching: map[string][]common.Hash{ "A": {{0x01}, {0x02}}, @@ -435,15 +351,25 @@ func TestTransactionFetcherSkipWaiting(t *testing.T) { }, // Announce overlaps from a new peer, ensure new transactions end up // in stage one and clashing ones get tracked for the new peer - doTxNotify{peer: "B", hashes: []common.Hash{{0x02}, {0x03}, {0x04}}}, - isWaiting(map[string][]common.Hash{ - "A": {{0x03}}, - "B": {{0x03}, {0x04}}, + doTxNotify{peer: "B", hashes: []common.Hash{{0x02}, {0x03}, {0x04}}, types: []byte{types.LegacyTxType, types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{222, 333, 444}}, + isWaiting(map[string][]announce{ + "A": { + {common.Hash{0x03}, types.LegacyTxType, 333}, + }, + "B": { + {common.Hash{0x03}, types.LegacyTxType, 333}, + {common.Hash{0x04}, types.LegacyTxType, 444}, + }, }), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, - "B": {{0x02}}, + tracking: map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + }, + "B": { + {common.Hash{0x02}, types.LegacyTxType, 222}, + }, }, fetching: map[string][]common.Hash{ "A": {{0x01}, {0x02}}, @@ -467,17 +393,23 @@ func TestTransactionFetcherSingletonRequesting(t *testing.T) { }, steps: []interface{}{ // Push an initial announcement through to the scheduled stage - doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x02}}}, - isWaiting(map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x02}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{111, 222}}, + isWaiting(map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + }, }), isScheduled{tracking: nil, fetching: nil}, doWait{time: txArriveTimeout, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + tracking: map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + }, }, fetching: map[string][]common.Hash{ "A": {{0x01}, {0x02}}, @@ -485,13 +417,19 @@ func TestTransactionFetcherSingletonRequesting(t *testing.T) { }, // Announce a new set of transactions from the same peer and ensure // they do not start fetching since the peer is already busy - doTxNotify{peer: "A", hashes: []common.Hash{{0x03}, {0x04}}}, - isWaiting(map[string][]common.Hash{ - "A": {{0x03}, {0x04}}, + doTxNotify{peer: "A", hashes: []common.Hash{{0x03}, {0x04}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{333, 444}}, + isWaiting(map[string][]announce{ + "A": { + {common.Hash{0x03}, types.LegacyTxType, 333}, + {common.Hash{0x04}, types.LegacyTxType, 444}, + }, }), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + tracking: map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + }, }, fetching: map[string][]common.Hash{ "A": {{0x01}, {0x02}}, @@ -500,8 +438,13 @@ func TestTransactionFetcherSingletonRequesting(t *testing.T) { doWait{time: txArriveTimeout, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}, {0x02}, {0x03}, {0x04}}, + tracking: map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + {common.Hash{0x03}, types.LegacyTxType, 333}, + {common.Hash{0x04}, types.LegacyTxType, 444}, + }, }, fetching: map[string][]common.Hash{ "A": {{0x01}, {0x02}}, @@ -509,14 +452,25 @@ func TestTransactionFetcherSingletonRequesting(t *testing.T) { }, // Announce a duplicate set of transactions from a new peer and ensure // uniquely new ones start downloading, even if clashing. - doTxNotify{peer: "B", hashes: []common.Hash{{0x02}, {0x03}, {0x05}, {0x06}}}, - isWaiting(map[string][]common.Hash{ - "B": {{0x05}, {0x06}}, + doTxNotify{peer: "B", hashes: []common.Hash{{0x02}, {0x03}, {0x05}, {0x06}}, types: []byte{types.LegacyTxType, types.LegacyTxType, types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{222, 333, 555, 666}}, + isWaiting(map[string][]announce{ + "B": { + {common.Hash{0x05}, types.LegacyTxType, 555}, + {common.Hash{0x06}, types.LegacyTxType, 666}, + }, }), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}, {0x02}, {0x03}, {0x04}}, - "B": {{0x02}, {0x03}}, + tracking: map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + {common.Hash{0x03}, types.LegacyTxType, 333}, + {common.Hash{0x04}, types.LegacyTxType, 444}, + }, + "B": { + {common.Hash{0x02}, types.LegacyTxType, 222}, + {common.Hash{0x03}, types.LegacyTxType, 333}, + }, }, fetching: map[string][]common.Hash{ "A": {{0x01}, {0x02}}, @@ -547,17 +501,23 @@ func TestTransactionFetcherFailedRescheduling(t *testing.T) { }, steps: []interface{}{ // Push an initial announcement through to the scheduled stage - doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x02}}}, - isWaiting(map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x02}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{111, 222}}, + isWaiting(map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + }, }), isScheduled{tracking: nil, fetching: nil}, doWait{time: txArriveTimeout, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + tracking: map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + }, }, fetching: map[string][]common.Hash{ "A": {{0x01}, {0x02}}, @@ -565,12 +525,17 @@ func TestTransactionFetcherFailedRescheduling(t *testing.T) { }, // While the original peer is stuck in the request, push in an second // data source. - doTxNotify{peer: "B", hashes: []common.Hash{{0x02}}}, + doTxNotify{peer: "B", hashes: []common.Hash{{0x02}}, types: []byte{types.LegacyTxType}, sizes: []uint32{222}}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, - "B": {{0x02}}, + tracking: map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + }, + "B": { + {common.Hash{0x02}, types.LegacyTxType, 222}, + }, }, fetching: map[string][]common.Hash{ "A": {{0x01}, {0x02}}, @@ -584,8 +549,10 @@ func TestTransactionFetcherFailedRescheduling(t *testing.T) { doWait{time: 0, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "B": {{0x02}}, + tracking: map[string][]announce{ + "B": { + {common.Hash{0x02}, types.LegacyTxType, 222}, + }, }, fetching: map[string][]common.Hash{ "B": {{0x02}}, @@ -617,17 +584,21 @@ func TestTransactionFetcherCleanup(t *testing.T) { }, steps: []interface{}{ // Push an initial announcement through to the scheduled stage - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, - isWaiting(map[string][]common.Hash{ - "A": {testTxsHashes[0]}, + doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}}, + isWaiting(map[string][]announce{ + "A": { + {testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())}, + }, }), isScheduled{tracking: nil, fetching: nil}, doWait{time: txArriveTimeout, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {testTxsHashes[0]}, + tracking: map[string][]announce{ + "A": { + {testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())}, + }, }, fetching: map[string][]common.Hash{ "A": {testTxsHashes[0]}, @@ -657,17 +628,21 @@ func TestTransactionFetcherCleanupEmpty(t *testing.T) { }, steps: []interface{}{ // Push an initial announcement through to the scheduled stage - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, - isWaiting(map[string][]common.Hash{ - "A": {testTxsHashes[0]}, + doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}}, + isWaiting(map[string][]announce{ + "A": { + {testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())}, + }, }), isScheduled{tracking: nil, fetching: nil}, doWait{time: txArriveTimeout, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {testTxsHashes[0]}, + tracking: map[string][]announce{ + "A": { + {testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())}, + }, }, fetching: map[string][]common.Hash{ "A": {testTxsHashes[0]}, @@ -696,17 +671,29 @@ func TestTransactionFetcherMissingRescheduling(t *testing.T) { }, steps: []interface{}{ // Push an initial announcement through to the scheduled stage - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1], testTxsHashes[2]}}, - isWaiting(map[string][]common.Hash{ - "A": {testTxsHashes[0], testTxsHashes[1], testTxsHashes[2]}, + doTxNotify{peer: "A", + hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1], testTxsHashes[2]}, + types: []byte{testTxs[0].Type(), testTxs[1].Type(), testTxs[2].Type()}, + sizes: []uint32{uint32(testTxs[0].Size()), uint32(testTxs[1].Size()), uint32(testTxs[2].Size())}, + }, + isWaiting(map[string][]announce{ + "A": { + {testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())}, + {testTxsHashes[1], testTxs[1].Type(), uint32(testTxs[1].Size())}, + {testTxsHashes[2], testTxs[2].Type(), uint32(testTxs[2].Size())}, + }, }), isScheduled{tracking: nil, fetching: nil}, doWait{time: txArriveTimeout, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {testTxsHashes[0], testTxsHashes[1], testTxsHashes[2]}, + tracking: map[string][]announce{ + "A": { + {testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())}, + {testTxsHashes[1], testTxs[1].Type(), uint32(testTxs[1].Size())}, + {testTxsHashes[2], testTxs[2].Type(), uint32(testTxs[2].Size())}, + }, }, fetching: map[string][]common.Hash{ "A": {testTxsHashes[0], testTxsHashes[1], testTxsHashes[2]}, @@ -716,8 +703,10 @@ func TestTransactionFetcherMissingRescheduling(t *testing.T) { // should be dropped and the one after re-requested. doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}, direct: true}, // This depends on the deterministic random isScheduled{ - tracking: map[string][]common.Hash{ - "A": {testTxsHashes[2]}, + tracking: map[string][]announce{ + "A": { + {testTxsHashes[2], testTxs[2].Type(), uint32(testTxs[2].Size())}, + }, }, fetching: map[string][]common.Hash{ "A": {testTxsHashes[2]}, @@ -743,17 +732,27 @@ func TestTransactionFetcherMissingCleanup(t *testing.T) { }, steps: []interface{}{ // Push an initial announcement through to the scheduled stage - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1]}}, - isWaiting(map[string][]common.Hash{ - "A": {testTxsHashes[0], testTxsHashes[1]}, + doTxNotify{peer: "A", + hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1]}, + types: []byte{testTxs[0].Type(), testTxs[1].Type()}, + sizes: []uint32{uint32(testTxs[0].Size()), uint32(testTxs[1].Size())}, + }, + isWaiting(map[string][]announce{ + "A": { + {testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())}, + {testTxsHashes[1], testTxs[1].Type(), uint32(testTxs[1].Size())}, + }, }), isScheduled{tracking: nil, fetching: nil}, doWait{time: txArriveTimeout, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {testTxsHashes[0], testTxsHashes[1]}, + tracking: map[string][]announce{ + "A": { + {testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())}, + {testTxsHashes[1], testTxs[1].Type(), uint32(testTxs[1].Size())}, + }, }, fetching: map[string][]common.Hash{ "A": {testTxsHashes[0], testTxsHashes[1]}, @@ -782,18 +781,23 @@ func TestTransactionFetcherBroadcasts(t *testing.T) { }, steps: []interface{}{ // Set up three transactions to be in different stats, waiting, queued and fetching - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, + doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}}, doWait{time: txArriveTimeout, step: true}, - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[1]}}, + doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[1]}, types: []byte{testTxs[1].Type()}, sizes: []uint32{uint32(testTxs[1].Size())}}, doWait{time: txArriveTimeout, step: true}, - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[2]}}, + doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[2]}, types: []byte{testTxs[2].Type()}, sizes: []uint32{uint32(testTxs[2].Size())}}, - isWaiting(map[string][]common.Hash{ - "A": {testTxsHashes[2]}, + isWaiting(map[string][]announce{ + "A": { + {testTxsHashes[2], testTxs[2].Type(), uint32(testTxs[2].Size())}, + }, }), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {testTxsHashes[0], testTxsHashes[1]}, + tracking: map[string][]announce{ + "A": { + {testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())}, + {testTxsHashes[1], testTxs[1].Type(), uint32(testTxs[1].Size())}, + }, }, fetching: map[string][]common.Hash{ "A": {testTxsHashes[0]}, @@ -830,29 +834,40 @@ func TestTransactionFetcherWaitTimerResets(t *testing.T) { ) }, steps: []interface{}{ - doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}}, - isWaiting(map[string][]common.Hash{ - "A": {{0x01}}, + doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}, types: []byte{types.LegacyTxType}, sizes: []uint32{111}}, + isWaiting(map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + }, }), isScheduled{nil, nil, nil}, doWait{time: txArriveTimeout / 2, step: false}, - isWaiting(map[string][]common.Hash{ - "A": {{0x01}}, + isWaiting(map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + }, }), isScheduled{nil, nil, nil}, - doTxNotify{peer: "A", hashes: []common.Hash{{0x02}}}, - isWaiting(map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + doTxNotify{peer: "A", hashes: []common.Hash{{0x02}}, types: []byte{types.LegacyTxType}, sizes: []uint32{222}}, + isWaiting(map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + }, }), isScheduled{nil, nil, nil}, doWait{time: txArriveTimeout / 2, step: true}, - isWaiting(map[string][]common.Hash{ - "A": {{0x02}}, + isWaiting(map[string][]announce{ + "A": { + {common.Hash{0x02}, types.LegacyTxType, 222}, + }, }), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}}, + tracking: map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + }, }, fetching: map[string][]common.Hash{ "A": {{0x01}}, @@ -862,8 +877,11 @@ func TestTransactionFetcherWaitTimerResets(t *testing.T) { doWait{time: txArriveTimeout / 2, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + tracking: map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + }, }, fetching: map[string][]common.Hash{ "A": {{0x01}}, @@ -889,17 +907,22 @@ func TestTransactionFetcherTimeoutRescheduling(t *testing.T) { }, steps: []interface{}{ // Push an initial announcement through to the scheduled stage - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, - isWaiting(map[string][]common.Hash{ - "A": {testTxsHashes[0]}, + doTxNotify{ + peer: "A", + hashes: []common.Hash{testTxsHashes[0]}, + types: []byte{testTxs[0].Type()}, + sizes: []uint32{uint32(testTxs[0].Size())}, + }, + isWaiting(map[string][]announce{ + "A": {{testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())}}, }), isScheduled{tracking: nil, fetching: nil}, doWait{time: txArriveTimeout, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {testTxsHashes[0]}, + tracking: map[string][]announce{ + "A": {{testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())}}, }, fetching: map[string][]common.Hash{ "A": {testTxsHashes[0]}, @@ -916,11 +939,16 @@ func TestTransactionFetcherTimeoutRescheduling(t *testing.T) { }, }, // Ensure that followup announcements don't get scheduled - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[1]}}, + doTxNotify{ + peer: "A", + hashes: []common.Hash{testTxsHashes[1]}, + types: []byte{testTxs[1].Type()}, + sizes: []uint32{uint32(testTxs[1].Size())}, + }, doWait{time: txArriveTimeout, step: true}, isScheduled{ - tracking: map[string][]common.Hash{ - "A": {testTxsHashes[1]}, + tracking: map[string][]announce{ + "A": {{testTxsHashes[1], testTxs[1].Type(), uint32(testTxs[1].Size())}}, }, fetching: nil, dangling: map[string][]common.Hash{ @@ -931,8 +959,8 @@ func TestTransactionFetcherTimeoutRescheduling(t *testing.T) { doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}, direct: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {testTxsHashes[1]}, + tracking: map[string][]announce{ + "A": {{testTxsHashes[1], testTxs[1].Type(), uint32(testTxs[1].Size())}}, }, fetching: map[string][]common.Hash{ "A": {testTxsHashes[1]}, @@ -954,16 +982,20 @@ func TestTransactionFetcherTimeoutTimerResets(t *testing.T) { ) }, steps: []interface{}{ - doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}}, + doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}, types: []byte{types.LegacyTxType}, sizes: []uint32{111}}, doWait{time: txArriveTimeout, step: true}, - doTxNotify{peer: "B", hashes: []common.Hash{{0x02}}}, + doTxNotify{peer: "B", hashes: []common.Hash{{0x02}}, types: []byte{types.LegacyTxType}, sizes: []uint32{222}}, doWait{time: txArriveTimeout, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}}, - "B": {{0x02}}, + tracking: map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + }, + "B": { + {common.Hash{0x02}, types.LegacyTxType, 222}, + }, }, fetching: map[string][]common.Hash{ "A": {{0x01}}, @@ -972,8 +1004,10 @@ func TestTransactionFetcherTimeoutTimerResets(t *testing.T) { }, doWait{time: txFetchTimeout - txArriveTimeout, step: true}, isScheduled{ - tracking: map[string][]common.Hash{ - "B": {{0x02}}, + tracking: map[string][]announce{ + "B": { + {common.Hash{0x02}, types.LegacyTxType, 222}, + }, }, fetching: map[string][]common.Hash{ "B": {{0x02}}, @@ -999,9 +1033,22 @@ func TestTransactionFetcherTimeoutTimerResets(t *testing.T) { // number of them will be requested at a time. func TestTransactionFetcherRateLimiting(t *testing.T) { // Create a slew of transactions and announce them - var hashes []common.Hash + var ( + hashes []common.Hash + ts []byte + sizes []uint32 + announces []announce + ) for i := 0; i < maxTxAnnounces; i++ { - hashes = append(hashes, common.Hash{byte(i / 256), byte(i % 256)}) + hash := common.Hash{byte(i / 256), byte(i % 256)} + hashes = append(hashes, hash) + ts = append(ts, types.LegacyTxType) + sizes = append(sizes, 111) + announces = append(announces, announce{ + hash: hash, + kind: types.LegacyTxType, + size: 111, + }) } testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { @@ -1015,12 +1062,12 @@ func TestTransactionFetcherRateLimiting(t *testing.T) { steps: []interface{}{ // Announce all the transactions, wait a bit and ensure only a small // percentage gets requested - doTxNotify{peer: "A", hashes: hashes}, + doTxNotify{peer: "A", hashes: hashes, types: ts, sizes: sizes}, doWait{time: txArriveTimeout, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": hashes, + tracking: map[string][]announce{ + "A": announces, }, fetching: map[string][]common.Hash{ "A": hashes[1643 : 1643+maxTxRetrievals], @@ -1065,21 +1112,21 @@ func TestTransactionFetcherBandwidthLimiting(t *testing.T) { }, doWait{time: txArriveTimeout, step: true}, isWaiting(nil), - isScheduledWithMeta{ + isScheduled{ tracking: map[string][]announce{ "A": { - {common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(48 * 1024)}, - {common.Hash{0x02}, typeptr(types.LegacyTxType), sizeptr(48 * 1024)}, - {common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(48 * 1024)}, - {common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(48 * 1024)}, + {common.Hash{0x01}, types.LegacyTxType, 48 * 1024}, + {common.Hash{0x02}, types.LegacyTxType, 48 * 1024}, + {common.Hash{0x03}, types.LegacyTxType, 48 * 1024}, + {common.Hash{0x04}, types.LegacyTxType, 48 * 1024}, }, "B": { - {common.Hash{0x05}, typeptr(types.LegacyTxType), sizeptr(maxTxRetrievalSize)}, - {common.Hash{0x06}, typeptr(types.LegacyTxType), sizeptr(maxTxRetrievalSize)}, + {common.Hash{0x05}, types.LegacyTxType, maxTxRetrievalSize}, + {common.Hash{0x06}, types.LegacyTxType, maxTxRetrievalSize}, }, "C": { - {common.Hash{0x07}, typeptr(types.BlobTxType), sizeptr(params.MaxBlobGasPerBlock)}, - {common.Hash{0x08}, typeptr(types.BlobTxType), sizeptr(params.MaxBlobGasPerBlock)}, + {common.Hash{0x07}, types.BlobTxType, params.MaxBlobGasPerBlock}, + {common.Hash{0x08}, types.BlobTxType, params.MaxBlobGasPerBlock}, }, }, fetching: map[string][]common.Hash{ @@ -1096,13 +1143,41 @@ func TestTransactionFetcherBandwidthLimiting(t *testing.T) { // request at the same time is hard capped. func TestTransactionFetcherDoSProtection(t *testing.T) { // Create a slew of transactions and to announce them - var hashesA []common.Hash + var ( + hashesA []common.Hash + typesA []byte + sizesA []uint32 + announceA []announce + ) for i := 0; i < maxTxAnnounces+1; i++ { - hashesA = append(hashesA, common.Hash{0x01, byte(i / 256), byte(i % 256)}) + hash := common.Hash{0x01, byte(i / 256), byte(i % 256)} + hashesA = append(hashesA, hash) + typesA = append(typesA, types.LegacyTxType) + sizesA = append(sizesA, 111) + + announceA = append(announceA, announce{ + hash: hash, + kind: types.LegacyTxType, + size: 111, + }) } - var hashesB []common.Hash + var ( + hashesB []common.Hash + typesB []byte + sizesB []uint32 + announceB []announce + ) for i := 0; i < maxTxAnnounces+1; i++ { - hashesB = append(hashesB, common.Hash{0x02, byte(i / 256), byte(i % 256)}) + hash := common.Hash{0x02, byte(i / 256), byte(i % 256)} + hashesB = append(hashesB, hash) + typesB = append(typesB, types.LegacyTxType) + sizesB = append(sizesB, 111) + + announceB = append(announceB, announce{ + hash: hash, + kind: types.LegacyTxType, + size: 111, + }) } testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { @@ -1115,23 +1190,23 @@ func TestTransactionFetcherDoSProtection(t *testing.T) { }, steps: []interface{}{ // Announce half of the transaction and wait for them to be scheduled - doTxNotify{peer: "A", hashes: hashesA[:maxTxAnnounces/2]}, - doTxNotify{peer: "B", hashes: hashesB[:maxTxAnnounces/2-1]}, + doTxNotify{peer: "A", hashes: hashesA[:maxTxAnnounces/2], types: typesA[:maxTxAnnounces/2], sizes: sizesA[:maxTxAnnounces/2]}, + doTxNotify{peer: "B", hashes: hashesB[:maxTxAnnounces/2-1], types: typesB[:maxTxAnnounces/2-1], sizes: sizesB[:maxTxAnnounces/2-1]}, doWait{time: txArriveTimeout, step: true}, // Announce the second half and keep them in the wait list - doTxNotify{peer: "A", hashes: hashesA[maxTxAnnounces/2 : maxTxAnnounces]}, - doTxNotify{peer: "B", hashes: hashesB[maxTxAnnounces/2-1 : maxTxAnnounces-1]}, + doTxNotify{peer: "A", hashes: hashesA[maxTxAnnounces/2 : maxTxAnnounces], types: typesA[maxTxAnnounces/2 : maxTxAnnounces], sizes: sizesA[maxTxAnnounces/2 : maxTxAnnounces]}, + doTxNotify{peer: "B", hashes: hashesB[maxTxAnnounces/2-1 : maxTxAnnounces-1], types: typesB[maxTxAnnounces/2-1 : maxTxAnnounces-1], sizes: sizesB[maxTxAnnounces/2-1 : maxTxAnnounces-1]}, // Ensure the hashes are split half and half - isWaiting(map[string][]common.Hash{ - "A": hashesA[maxTxAnnounces/2 : maxTxAnnounces], - "B": hashesB[maxTxAnnounces/2-1 : maxTxAnnounces-1], + isWaiting(map[string][]announce{ + "A": announceA[maxTxAnnounces/2 : maxTxAnnounces], + "B": announceB[maxTxAnnounces/2-1 : maxTxAnnounces-1], }), isScheduled{ - tracking: map[string][]common.Hash{ - "A": hashesA[:maxTxAnnounces/2], - "B": hashesB[:maxTxAnnounces/2-1], + tracking: map[string][]announce{ + "A": announceA[:maxTxAnnounces/2], + "B": announceB[:maxTxAnnounces/2-1], }, fetching: map[string][]common.Hash{ "A": hashesA[1643 : 1643+maxTxRetrievals], @@ -1139,17 +1214,17 @@ func TestTransactionFetcherDoSProtection(t *testing.T) { }, }, // Ensure that adding even one more hash results in dropping the hash - doTxNotify{peer: "A", hashes: []common.Hash{hashesA[maxTxAnnounces]}}, - doTxNotify{peer: "B", hashes: hashesB[maxTxAnnounces-1 : maxTxAnnounces+1]}, + doTxNotify{peer: "A", hashes: []common.Hash{hashesA[maxTxAnnounces]}, types: []byte{typesA[maxTxAnnounces]}, sizes: []uint32{sizesA[maxTxAnnounces]}}, + doTxNotify{peer: "B", hashes: hashesB[maxTxAnnounces-1 : maxTxAnnounces+1], types: typesB[maxTxAnnounces-1 : maxTxAnnounces+1], sizes: sizesB[maxTxAnnounces-1 : maxTxAnnounces+1]}, - isWaiting(map[string][]common.Hash{ - "A": hashesA[maxTxAnnounces/2 : maxTxAnnounces], - "B": hashesB[maxTxAnnounces/2-1 : maxTxAnnounces], + isWaiting(map[string][]announce{ + "A": announceA[maxTxAnnounces/2 : maxTxAnnounces], + "B": announceB[maxTxAnnounces/2-1 : maxTxAnnounces], }), isScheduled{ - tracking: map[string][]common.Hash{ - "A": hashesA[:maxTxAnnounces/2], - "B": hashesB[:maxTxAnnounces/2-1], + tracking: map[string][]announce{ + "A": announceA[:maxTxAnnounces/2], + "B": announceB[:maxTxAnnounces/2-1], }, fetching: map[string][]common.Hash{ "A": hashesA[1643 : 1643+maxTxRetrievals], @@ -1183,15 +1258,23 @@ func TestTransactionFetcherUnderpricedDedup(t *testing.T) { }, steps: []interface{}{ // Deliver a transaction through the fetcher, but reject as underpriced - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1]}}, + doTxNotify{peer: "A", + hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1]}, + types: []byte{testTxs[0].Type(), testTxs[1].Type()}, + sizes: []uint32{uint32(testTxs[0].Size()), uint32(testTxs[1].Size())}, + }, doWait{time: txArriveTimeout, step: true}, doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0], testTxs[1]}, direct: true}, isScheduled{nil, nil, nil}, // Try to announce the transaction again, ensure it's not scheduled back - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1], testTxsHashes[2]}}, // [2] is needed to force a step in the fetcher - isWaiting(map[string][]common.Hash{ - "A": {testTxsHashes[2]}, + doTxNotify{peer: "A", + hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1], testTxsHashes[2]}, + types: []byte{testTxs[0].Type(), testTxs[1].Type(), testTxs[2].Type()}, + sizes: []uint32{uint32(testTxs[0].Size()), uint32(testTxs[1].Size()), uint32(testTxs[2].Size())}, + }, // [2] is needed to force a step in the fetcher + isWaiting(map[string][]announce{ + "A": {{testTxsHashes[2], testTxs[2].Type(), uint32(testTxs[2].Size())}}, }), isScheduled{nil, nil, nil}, }, @@ -1211,21 +1294,38 @@ func TestTransactionFetcherUnderpricedDoSProtection(t *testing.T) { for i := 0; i < maxTxUnderpricedSetSize+1; i++ { txs = append(txs, types.NewTransaction(rand.Uint64(), common.Address{byte(rand.Intn(256))}, new(big.Int), 0, new(big.Int), nil)) } - hashes := make([]common.Hash, len(txs)) - for i, tx := range txs { - hashes[i] = tx.Hash() + var ( + hashes []common.Hash + ts []byte + sizes []uint32 + annos []announce + ) + for _, tx := range txs { + hashes = append(hashes, tx.Hash()) + ts = append(ts, tx.Type()) + sizes = append(sizes, uint32(tx.Size())) + annos = append(annos, announce{ + hash: tx.Hash(), + kind: tx.Type(), + size: uint32(tx.Size()), + }) } // Generate a set of steps to announce and deliver the entire set of transactions var steps []interface{} for i := 0; i < maxTxUnderpricedSetSize/maxTxRetrievals; i++ { - steps = append(steps, doTxNotify{peer: "A", hashes: hashes[i*maxTxRetrievals : (i+1)*maxTxRetrievals]}) - steps = append(steps, isWaiting(map[string][]common.Hash{ - "A": hashes[i*maxTxRetrievals : (i+1)*maxTxRetrievals], + steps = append(steps, doTxNotify{ + peer: "A", + hashes: hashes[i*maxTxRetrievals : (i+1)*maxTxRetrievals], + types: ts[i*maxTxRetrievals : (i+1)*maxTxRetrievals], + sizes: sizes[i*maxTxRetrievals : (i+1)*maxTxRetrievals], + }) + steps = append(steps, isWaiting(map[string][]announce{ + "A": annos[i*maxTxRetrievals : (i+1)*maxTxRetrievals], })) steps = append(steps, doWait{time: txArriveTimeout, step: true}) steps = append(steps, isScheduled{ - tracking: map[string][]common.Hash{ - "A": hashes[i*maxTxRetrievals : (i+1)*maxTxRetrievals], + tracking: map[string][]announce{ + "A": annos[i*maxTxRetrievals : (i+1)*maxTxRetrievals], }, fetching: map[string][]common.Hash{ "A": hashes[i*maxTxRetrievals : (i+1)*maxTxRetrievals], @@ -1253,7 +1353,12 @@ func TestTransactionFetcherUnderpricedDoSProtection(t *testing.T) { }, steps: append(steps, []interface{}{ // The preparation of the test has already been done in `steps`, add the last check - doTxNotify{peer: "A", hashes: []common.Hash{hashes[maxTxUnderpricedSetSize]}}, + doTxNotify{ + peer: "A", + hashes: []common.Hash{hashes[maxTxUnderpricedSetSize]}, + types: []byte{ts[maxTxUnderpricedSetSize]}, + sizes: []uint32{sizes[maxTxUnderpricedSetSize]}, + }, doWait{time: txArriveTimeout, step: true}, doTxEnqueue{peer: "A", txs: []*types.Transaction{txs[maxTxUnderpricedSetSize]}, direct: true}, isUnderpriced(maxTxUnderpricedSetSize), @@ -1283,18 +1388,23 @@ func TestTransactionFetcherOutOfBoundDeliveries(t *testing.T) { isScheduled{nil, nil, nil}, // Set up a few hashes into various stages - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, + doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}}, doWait{time: txArriveTimeout, step: true}, - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[1]}}, + doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[1]}, types: []byte{testTxs[1].Type()}, sizes: []uint32{uint32(testTxs[1].Size())}}, doWait{time: txArriveTimeout, step: true}, - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[2]}}, + doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[2]}, types: []byte{testTxs[2].Type()}, sizes: []uint32{uint32(testTxs[2].Size())}}, - isWaiting(map[string][]common.Hash{ - "A": {testTxsHashes[2]}, + isWaiting(map[string][]announce{ + "A": { + {testTxsHashes[2], testTxs[2].Type(), uint32(testTxs[2].Size())}, + }, }), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {testTxsHashes[0], testTxsHashes[1]}, + tracking: map[string][]announce{ + "A": { + {testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())}, + {testTxsHashes[1], testTxs[1].Type(), uint32(testTxs[1].Size())}, + }, }, fetching: map[string][]common.Hash{ "A": {testTxsHashes[0]}, @@ -1330,18 +1440,23 @@ func TestTransactionFetcherDrop(t *testing.T) { }, steps: []interface{}{ // Set up a few hashes into various stages - doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}}, + doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}, types: []byte{types.LegacyTxType}, sizes: []uint32{111}}, doWait{time: txArriveTimeout, step: true}, - doTxNotify{peer: "A", hashes: []common.Hash{{0x02}}}, + doTxNotify{peer: "A", hashes: []common.Hash{{0x02}}, types: []byte{types.LegacyTxType}, sizes: []uint32{222}}, doWait{time: txArriveTimeout, step: true}, - doTxNotify{peer: "A", hashes: []common.Hash{{0x03}}}, + doTxNotify{peer: "A", hashes: []common.Hash{{0x03}}, types: []byte{types.LegacyTxType}, sizes: []uint32{333}}, - isWaiting(map[string][]common.Hash{ - "A": {{0x03}}, + isWaiting(map[string][]announce{ + "A": { + {common.Hash{0x03}, types.LegacyTxType, 333}, + }, }), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}, {0x02}}, + tracking: map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + }, }, fetching: map[string][]common.Hash{ "A": {{0x01}}, @@ -1353,12 +1468,14 @@ func TestTransactionFetcherDrop(t *testing.T) { isScheduled{nil, nil, nil}, // Push the node into a dangling (timeout) state - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, + doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}}, doWait{time: txArriveTimeout, step: true}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {testTxsHashes[0]}, + tracking: map[string][]announce{ + "A": { + {testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())}, + }, }, fetching: map[string][]common.Hash{ "A": {testTxsHashes[0]}, @@ -1397,15 +1514,15 @@ func TestTransactionFetcherDropRescheduling(t *testing.T) { }, steps: []interface{}{ // Set up a few hashes into various stages - doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}}, + doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}, types: []byte{types.LegacyTxType}, sizes: []uint32{111}}, doWait{time: txArriveTimeout, step: true}, - doTxNotify{peer: "B", hashes: []common.Hash{{0x01}}}, + doTxNotify{peer: "B", hashes: []common.Hash{{0x01}}, types: []byte{types.LegacyTxType}, sizes: []uint32{111}}, isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "A": {{0x01}}, - "B": {{0x01}}, + tracking: map[string][]announce{ + "A": {{common.Hash{0x01}, types.LegacyTxType, 111}}, + "B": {{common.Hash{0x01}, types.LegacyTxType, 111}}, }, fetching: map[string][]common.Hash{ "A": {{0x01}}, @@ -1415,8 +1532,8 @@ func TestTransactionFetcherDropRescheduling(t *testing.T) { doDrop("A"), isWaiting(nil), isScheduled{ - tracking: map[string][]common.Hash{ - "B": {{0x01}}, + tracking: map[string][]announce{ + "B": {{common.Hash{0x01}, types.LegacyTxType, 111}}, }, fetching: map[string][]common.Hash{ "B": {{0x01}}, @@ -1443,42 +1560,57 @@ func TestInvalidAnnounceMetadata(t *testing.T) { }, steps: []interface{}{ // Initial announcement to get something into the waitlist - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1]}, types: []byte{testTxs[0].Type(), testTxs[1].Type()}, sizes: []uint32{uint32(testTxs[0].Size()), uint32(testTxs[1].Size())}}, - isWaitingWithMeta(map[string][]announce{ + doTxNotify{ + peer: "A", + hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1]}, + types: []byte{testTxs[0].Type(), testTxs[1].Type()}, + sizes: []uint32{uint32(testTxs[0].Size()), uint32(testTxs[1].Size())}, + }, + isWaiting(map[string][]announce{ "A": { - {testTxsHashes[0], typeptr(testTxs[0].Type()), sizeptr(uint32(testTxs[0].Size()))}, - {testTxsHashes[1], typeptr(testTxs[1].Type()), sizeptr(uint32(testTxs[1].Size()))}, + {testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())}, + {testTxsHashes[1], testTxs[1].Type(), uint32(testTxs[1].Size())}, }, }), // Announce from new peers conflicting transactions - doTxNotify{peer: "B", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{1024 + uint32(testTxs[0].Size())}}, - doTxNotify{peer: "C", hashes: []common.Hash{testTxsHashes[1]}, types: []byte{1 + testTxs[1].Type()}, sizes: []uint32{uint32(testTxs[1].Size())}}, - isWaitingWithMeta(map[string][]announce{ + doTxNotify{ + peer: "B", + hashes: []common.Hash{testTxsHashes[0]}, + types: []byte{testTxs[0].Type()}, + sizes: []uint32{1024 + uint32(testTxs[0].Size())}, + }, + doTxNotify{ + peer: "C", + hashes: []common.Hash{testTxsHashes[1]}, + types: []byte{1 + testTxs[1].Type()}, + sizes: []uint32{uint32(testTxs[1].Size())}, + }, + isWaiting(map[string][]announce{ "A": { - {testTxsHashes[0], typeptr(testTxs[0].Type()), sizeptr(uint32(testTxs[0].Size()))}, - {testTxsHashes[1], typeptr(testTxs[1].Type()), sizeptr(uint32(testTxs[1].Size()))}, + {testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())}, + {testTxsHashes[1], testTxs[1].Type(), uint32(testTxs[1].Size())}, }, "B": { - {testTxsHashes[0], typeptr(testTxs[0].Type()), sizeptr(1024 + uint32(testTxs[0].Size()))}, + {testTxsHashes[0], testTxs[0].Type(), 1024 + uint32(testTxs[0].Size())}, }, "C": { - {testTxsHashes[1], typeptr(1 + testTxs[1].Type()), sizeptr(uint32(testTxs[1].Size()))}, + {testTxsHashes[1], 1 + testTxs[1].Type(), uint32(testTxs[1].Size())}, }, }), // Schedule all the transactions for retrieval doWait{time: txArriveTimeout, step: true}, - isWaitingWithMeta(nil), - isScheduledWithMeta{ + isWaiting(nil), + isScheduled{ tracking: map[string][]announce{ "A": { - {testTxsHashes[0], typeptr(testTxs[0].Type()), sizeptr(uint32(testTxs[0].Size()))}, - {testTxsHashes[1], typeptr(testTxs[1].Type()), sizeptr(uint32(testTxs[1].Size()))}, + {testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())}, + {testTxsHashes[1], testTxs[1].Type(), uint32(testTxs[1].Size())}, }, "B": { - {testTxsHashes[0], typeptr(testTxs[0].Type()), sizeptr(1024 + uint32(testTxs[0].Size()))}, + {testTxsHashes[0], testTxs[0].Type(), 1024 + uint32(testTxs[0].Size())}, }, "C": { - {testTxsHashes[1], typeptr(1 + testTxs[1].Type()), sizeptr(uint32(testTxs[1].Size()))}, + {testTxsHashes[1], 1 + testTxs[1].Type(), uint32(testTxs[1].Size())}, }, }, fetching: map[string][]common.Hash{ @@ -1511,12 +1643,12 @@ func TestTransactionFetcherFuzzCrash01(t *testing.T) { }, steps: []interface{}{ // Get a transaction into fetching mode and make it dangling with a broadcast - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, + doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}}, doWait{time: txArriveTimeout, step: true}, doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}}, // Notify the dangling transaction once more and crash via a timeout - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, + doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}}, doWait{time: txFetchTimeout, step: true}, }, }) @@ -1539,12 +1671,12 @@ func TestTransactionFetcherFuzzCrash02(t *testing.T) { }, steps: []interface{}{ // Get a transaction into fetching mode and make it dangling with a broadcast - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, + doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}}, doWait{time: txArriveTimeout, step: true}, doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}}, // Notify the dangling transaction once more, re-fetch, and crash via a drop and timeout - doTxNotify{peer: "B", hashes: []common.Hash{testTxsHashes[0]}}, + doTxNotify{peer: "B", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}}, doWait{time: txArriveTimeout, step: true}, doDrop("A"), doWait{time: txFetchTimeout, step: true}, @@ -1569,12 +1701,17 @@ func TestTransactionFetcherFuzzCrash03(t *testing.T) { }, steps: []interface{}{ // Get a transaction into fetching mode and make it dangling with a broadcast - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1]}}, + doTxNotify{ + peer: "A", + hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1]}, + types: []byte{testTxs[0].Type(), testTxs[1].Type()}, + sizes: []uint32{uint32(testTxs[0].Size()), uint32(testTxs[1].Size())}, + }, doWait{time: txFetchTimeout, step: true}, doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0], testTxs[1]}}, // Notify the dangling transaction once more, partially deliver, clash&crash with a timeout - doTxNotify{peer: "B", hashes: []common.Hash{testTxsHashes[0]}}, + doTxNotify{peer: "B", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}}, doWait{time: txArriveTimeout, step: true}, doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[1]}, direct: true}, @@ -1606,12 +1743,12 @@ func TestTransactionFetcherFuzzCrash04(t *testing.T) { }, steps: []interface{}{ // Get a transaction into fetching mode and make it dangling with a broadcast - doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}}, + doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}}, doWait{time: txArriveTimeout, step: true}, doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}}, // Notify the dangling transaction once more, re-fetch, and crash via an in-flight disconnect - doTxNotify{peer: "B", hashes: []common.Hash{testTxsHashes[0]}}, + doTxNotify{peer: "B", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}}, doWait{time: txArriveTimeout, step: true}, doFunc(func() { proceed <- struct{}{} // Allow peer A to return the failure @@ -1652,30 +1789,6 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) { // Crunch through all the test steps and execute them for i, step := range tt.steps { - // Auto-expand certain steps to ones with metadata - switch old := step.(type) { - case isWaiting: - new := make(isWaitingWithMeta) - for peer, hashes := range old { - for _, hash := range hashes { - new[peer] = append(new[peer], announce{hash, nil, nil}) - } - } - step = new - - case isScheduled: - new := isScheduledWithMeta{ - tracking: make(map[string][]announce), - fetching: old.fetching, - dangling: old.dangling, - } - for peer, hashes := range old.tracking { - for _, hash := range hashes { - new.tracking[peer] = append(new.tracking[peer], announce{hash, nil, nil}) - } - } - step = new - } // Process the original or expanded steps switch step := step.(type) { case doTxNotify: @@ -1710,7 +1823,7 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) { case doFunc: step() - case isWaitingWithMeta: + case isWaiting: // We need to check that the waiting list (stage 1) internals // match with the expected set. Check the peer->hash mappings // first. @@ -1724,18 +1837,13 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) { if meta, ok := waiting[ann.hash]; !ok { t.Errorf("step %d, peer %s: hash %x missing from waitslots", i, peer, ann.hash) } else { - if (meta == nil && (ann.kind != nil || ann.size != nil)) || - (meta != nil && (ann.kind == nil || ann.size == nil)) || - (meta != nil && (meta.kind != *ann.kind || meta.size != *ann.size)) { - t.Errorf("step %d, peer %s, hash %x: waitslot metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, *ann.kind, *ann.size) + if meta.kind != ann.kind || meta.size != ann.size { + t.Errorf("step %d, peer %s, hash %x: waitslot metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, ann.kind, ann.size) } } } for hash, meta := range waiting { - ann := announce{hash: hash} - if meta != nil { - ann.kind, ann.size = &meta.kind, &meta.size - } + ann := announce{hash: hash, kind: meta.kind, size: meta.size} if !containsAnnounce(announces, ann) { t.Errorf("step %d, peer %s: announce %v extra in waitslots", i, peer, ann) } @@ -1780,7 +1888,7 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) { } } - case isScheduledWithMeta: + case isScheduled: // Check that all scheduled announces are accounted for and no // extra ones are present. for peer, announces := range step.tracking { @@ -1793,18 +1901,13 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) { if meta, ok := scheduled[ann.hash]; !ok { t.Errorf("step %d, peer %s: hash %x missing from announces", i, peer, ann.hash) } else { - if (meta == nil && (ann.kind != nil || ann.size != nil)) || - (meta != nil && (ann.kind == nil || ann.size == nil)) || - (meta != nil && (meta.kind != *ann.kind || meta.size != *ann.size)) { - t.Errorf("step %d, peer %s, hash %x: announce metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, *ann.kind, *ann.size) + if meta.kind != ann.kind || meta.size != ann.size { + t.Errorf("step %d, peer %s, hash %x: announce metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, ann.kind, ann.size) } } } for hash, meta := range scheduled { - ann := announce{hash: hash} - if meta != nil { - ann.kind, ann.size = &meta.kind, &meta.size - } + ann := announce{hash: hash, kind: meta.kind, size: meta.size} if !containsAnnounce(announces, ann) { t.Errorf("step %d, peer %s: announce %x extra in announces", i, peer, hash) } @@ -1954,18 +2057,10 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) { func containsAnnounce(slice []announce, ann announce) bool { for _, have := range slice { if have.hash == ann.hash { - if have.kind == nil || ann.kind == nil { - if have.kind != ann.kind { - return false - } - } else if *have.kind != *ann.kind { + if have.kind != ann.kind { return false } - if have.size == nil || ann.size == nil { - if have.size != ann.size { - return false - } - } else if *have.size != *ann.size { + if have.size != ann.size { return false } return true