go-ethereum/eth/fetcher/tx_fetcher_test.go

2187 lines
70 KiB
Go
Raw Permalink Normal View History

// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package fetcher
import (
"errors"
"math/big"
"math/rand"
"slices"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/params"
)
var (
2020-05-25 16:21:28 +08:00
// testTxs is a set of transactions to use during testing that have meaningful hashes.
testTxs = []*types.Transaction{
types.NewTransaction(5577006791947779410, common.Address{0x0f}, new(big.Int), 0, new(big.Int), nil),
types.NewTransaction(15352856648520921629, common.Address{0xbb}, new(big.Int), 0, new(big.Int), nil),
types.NewTransaction(3916589616287113937, common.Address{0x86}, new(big.Int), 0, new(big.Int), nil),
types.NewTransaction(9828766684487745566, common.Address{0xac}, new(big.Int), 0, new(big.Int), nil),
}
// testTxsHashes is the hashes of the test transactions above
testTxsHashes = []common.Hash{testTxs[0].Hash(), testTxs[1].Hash(), testTxs[2].Hash(), testTxs[3].Hash()}
)
type announce struct {
hash common.Hash
kind byte
size uint32
}
type doTxNotify struct {
peer string
hashes []common.Hash
types []byte
sizes []uint32
}
type doTxEnqueue struct {
peer string
txs []*types.Transaction
direct bool
}
type doWait struct {
time time.Duration
step bool
}
type doDrop string
type doFunc func()
type isWaiting map[string][]announce
type isScheduled struct {
tracking map[string][]announce
fetching map[string][]common.Hash
dangling map[string][]common.Hash
}
type isUnderpriced int
// txFetcherTest represents a test scenario that can be executed by the test
// runner.
type txFetcherTest struct {
init func() *TxFetcher
steps []interface{}
}
// Tests that transaction announcements with associated metadata are added to a
// waitlist, and none of them are scheduled for retrieval until the wait expires.
//
// This test is an extended version of TestTransactionFetcherWaiting. It's mostly
// to cover the metadata checks without bloating up the basic behavioral tests
// with all the useless extra fields.
func TestTransactionFetcherWaiting(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
// Initial announcement to get something into the waitlist
doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x02}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{111, 222}},
isWaiting(map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
},
}),
// Announce from a new peer to check that no overwrite happens
doTxNotify{peer: "B", hashes: []common.Hash{{0x03}, {0x04}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{333, 444}},
isWaiting(map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
},
"B": {
{common.Hash{0x03}, types.LegacyTxType, 333},
{common.Hash{0x04}, types.LegacyTxType, 444},
},
}),
// Announce clashing hashes but unique new peer
doTxNotify{peer: "C", hashes: []common.Hash{{0x01}, {0x04}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{111, 444}},
isWaiting(map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
},
"B": {
{common.Hash{0x03}, types.LegacyTxType, 333},
{common.Hash{0x04}, types.LegacyTxType, 444},
},
"C": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x04}, types.LegacyTxType, 444},
},
}),
// Announce existing and clashing hashes from existing peer. Clashes
// should not overwrite previous announcements.
doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x03}, {0x05}}, types: []byte{types.LegacyTxType, types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{999, 333, 555}},
isWaiting(map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
{common.Hash{0x03}, types.LegacyTxType, 333},
{common.Hash{0x05}, types.LegacyTxType, 555},
},
"B": {
{common.Hash{0x03}, types.LegacyTxType, 333},
{common.Hash{0x04}, types.LegacyTxType, 444},
},
"C": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x04}, types.LegacyTxType, 444},
},
}),
// Announce clashing hashes with conflicting metadata. Somebody will
// be in the wrong, but we don't know yet who.
doTxNotify{peer: "D", hashes: []common.Hash{{0x01}, {0x02}}, types: []byte{types.LegacyTxType, types.BlobTxType}, sizes: []uint32{999, 222}},
isWaiting(map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
{common.Hash{0x03}, types.LegacyTxType, 333},
{common.Hash{0x05}, types.LegacyTxType, 555},
},
"B": {
{common.Hash{0x03}, types.LegacyTxType, 333},
{common.Hash{0x04}, types.LegacyTxType, 444},
},
"C": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x04}, types.LegacyTxType, 444},
},
"D": {
{common.Hash{0x01}, types.LegacyTxType, 999},
{common.Hash{0x02}, types.BlobTxType, 222},
},
}),
isScheduled{tracking: nil, fetching: nil},
// Wait for the arrival timeout which should move all expired items
// from the wait list to the scheduler
doWait{time: txArriveTimeout, step: true},
isWaiting(nil),
isScheduled{
tracking: map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
{common.Hash{0x03}, types.LegacyTxType, 333},
{common.Hash{0x05}, types.LegacyTxType, 555},
},
"B": {
{common.Hash{0x03}, types.LegacyTxType, 333},
{common.Hash{0x04}, types.LegacyTxType, 444},
},
"C": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x04}, types.LegacyTxType, 444},
},
"D": {
{common.Hash{0x01}, types.LegacyTxType, 999},
{common.Hash{0x02}, types.BlobTxType, 222},
},
},
fetching: map[string][]common.Hash{ // Depends on deterministic test randomizer
"A": {{0x03}, {0x05}},
"C": {{0x01}, {0x04}},
"D": {{0x02}},
},
},
// Queue up a non-fetchable transaction and then trigger it with a new
// peer (weird case to test 1 line in the fetcher)
doTxNotify{peer: "C", hashes: []common.Hash{{0x06}, {0x07}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{666, 777}},
isWaiting(map[string][]announce{
"C": {
{common.Hash{0x06}, types.LegacyTxType, 666},
{common.Hash{0x07}, types.LegacyTxType, 777},
},
}),
doWait{time: txArriveTimeout, step: true},
isScheduled{
tracking: map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
{common.Hash{0x03}, types.LegacyTxType, 333},
{common.Hash{0x05}, types.LegacyTxType, 555},
},
"B": {
{common.Hash{0x03}, types.LegacyTxType, 333},
{common.Hash{0x04}, types.LegacyTxType, 444},
},
"C": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x04}, types.LegacyTxType, 444},
{common.Hash{0x06}, types.LegacyTxType, 666},
{common.Hash{0x07}, types.LegacyTxType, 777},
},
"D": {
{common.Hash{0x01}, types.LegacyTxType, 999},
{common.Hash{0x02}, types.BlobTxType, 222},
},
},
fetching: map[string][]common.Hash{
"A": {{0x03}, {0x05}},
"C": {{0x01}, {0x04}},
"D": {{0x02}},
},
},
doTxNotify{peer: "E", hashes: []common.Hash{{0x06}, {0x07}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{666, 777}},
isScheduled{
tracking: map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
{common.Hash{0x03}, types.LegacyTxType, 333},
{common.Hash{0x05}, types.LegacyTxType, 555},
},
"B": {
{common.Hash{0x03}, types.LegacyTxType, 333},
{common.Hash{0x04}, types.LegacyTxType, 444},
},
"C": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x04}, types.LegacyTxType, 444},
{common.Hash{0x06}, types.LegacyTxType, 666},
{common.Hash{0x07}, types.LegacyTxType, 777},
},
"D": {
{common.Hash{0x01}, types.LegacyTxType, 999},
{common.Hash{0x02}, types.BlobTxType, 222},
},
"E": {
{common.Hash{0x06}, types.LegacyTxType, 666},
{common.Hash{0x07}, types.LegacyTxType, 777},
},
},
fetching: map[string][]common.Hash{
"A": {{0x03}, {0x05}},
"C": {{0x01}, {0x04}},
"D": {{0x02}},
"E": {{0x06}, {0x07}},
},
},
},
})
}
// Tests that transaction announcements skip the waiting list if they are
// already scheduled.
func TestTransactionFetcherSkipWaiting(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
// Push an initial announcement through to the scheduled stage
doTxNotify{
peer: "A",
hashes: []common.Hash{{0x01}, {0x02}},
types: []byte{types.LegacyTxType, types.LegacyTxType},
sizes: []uint32{111, 222},
},
isWaiting(map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
},
}),
isScheduled{tracking: nil, fetching: nil},
doWait{time: txArriveTimeout, step: true},
isWaiting(nil),
isScheduled{
tracking: map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
},
},
fetching: map[string][]common.Hash{
"A": {{0x01}, {0x02}},
},
},
// Announce overlaps from the same peer, ensure the new ones end up
// in stage one, and clashing ones don't get double tracked
doTxNotify{peer: "A", hashes: []common.Hash{{0x02}, {0x03}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{222, 333}},
isWaiting(map[string][]announce{
"A": {
{common.Hash{0x03}, types.LegacyTxType, 333},
},
}),
isScheduled{
tracking: map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
},
},
fetching: map[string][]common.Hash{
"A": {{0x01}, {0x02}},
},
},
// Announce overlaps from a new peer, ensure new transactions end up
// in stage one and clashing ones get tracked for the new peer
doTxNotify{peer: "B", hashes: []common.Hash{{0x02}, {0x03}, {0x04}}, types: []byte{types.LegacyTxType, types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{222, 333, 444}},
isWaiting(map[string][]announce{
"A": {
{common.Hash{0x03}, types.LegacyTxType, 333},
},
"B": {
{common.Hash{0x03}, types.LegacyTxType, 333},
{common.Hash{0x04}, types.LegacyTxType, 444},
},
}),
isScheduled{
tracking: map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
},
"B": {
{common.Hash{0x02}, types.LegacyTxType, 222},
},
},
fetching: map[string][]common.Hash{
"A": {{0x01}, {0x02}},
},
},
},
})
}
// Tests that only a single transaction request gets scheduled to a peer
// and subsequent announces block or get allotted to someone else.
func TestTransactionFetcherSingletonRequesting(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
// Push an initial announcement through to the scheduled stage
doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x02}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{111, 222}},
isWaiting(map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
},
}),
isScheduled{tracking: nil, fetching: nil},
doWait{time: txArriveTimeout, step: true},
isWaiting(nil),
isScheduled{
tracking: map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
},
},
fetching: map[string][]common.Hash{
"A": {{0x01}, {0x02}},
},
},
// Announce a new set of transactions from the same peer and ensure
// they do not start fetching since the peer is already busy
doTxNotify{peer: "A", hashes: []common.Hash{{0x03}, {0x04}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{333, 444}},
isWaiting(map[string][]announce{
"A": {
{common.Hash{0x03}, types.LegacyTxType, 333},
{common.Hash{0x04}, types.LegacyTxType, 444},
},
}),
isScheduled{
tracking: map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
},
},
fetching: map[string][]common.Hash{
"A": {{0x01}, {0x02}},
},
},
doWait{time: txArriveTimeout, step: true},
isWaiting(nil),
isScheduled{
tracking: map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
{common.Hash{0x03}, types.LegacyTxType, 333},
{common.Hash{0x04}, types.LegacyTxType, 444},
},
},
fetching: map[string][]common.Hash{
"A": {{0x01}, {0x02}},
},
},
// Announce a duplicate set of transactions from a new peer and ensure
// uniquely new ones start downloading, even if clashing.
doTxNotify{peer: "B", hashes: []common.Hash{{0x02}, {0x03}, {0x05}, {0x06}}, types: []byte{types.LegacyTxType, types.LegacyTxType, types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{222, 333, 555, 666}},
isWaiting(map[string][]announce{
"B": {
{common.Hash{0x05}, types.LegacyTxType, 555},
{common.Hash{0x06}, types.LegacyTxType, 666},
},
}),
isScheduled{
tracking: map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
{common.Hash{0x03}, types.LegacyTxType, 333},
{common.Hash{0x04}, types.LegacyTxType, 444},
},
"B": {
{common.Hash{0x02}, types.LegacyTxType, 222},
{common.Hash{0x03}, types.LegacyTxType, 333},
},
},
fetching: map[string][]common.Hash{
"A": {{0x01}, {0x02}},
"B": {{0x03}},
},
},
},
})
}
// Tests that if a transaction retrieval fails, all the transactions get
// instantly schedule back to someone else or the announcements dropped
// if no alternate source is available.
func TestTransactionFetcherFailedRescheduling(t *testing.T) {
// Create a channel to control when tx requests can fail
proceed := make(chan struct{})
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
nil,
func(origin string, hashes []common.Hash) error {
<-proceed
return errors.New("peer disconnected")
},
nil,
)
},
steps: []interface{}{
// Push an initial announcement through to the scheduled stage
doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x02}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{111, 222}},
isWaiting(map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
},
}),
isScheduled{tracking: nil, fetching: nil},
doWait{time: txArriveTimeout, step: true},
isWaiting(nil),
isScheduled{
tracking: map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
},
},
fetching: map[string][]common.Hash{
"A": {{0x01}, {0x02}},
},
},
// While the original peer is stuck in the request, push in an second
// data source.
doTxNotify{peer: "B", hashes: []common.Hash{{0x02}}, types: []byte{types.LegacyTxType}, sizes: []uint32{222}},
isWaiting(nil),
isScheduled{
tracking: map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
},
"B": {
{common.Hash{0x02}, types.LegacyTxType, 222},
},
},
fetching: map[string][]common.Hash{
"A": {{0x01}, {0x02}},
},
},
// Wait until the original request fails and check that transactions
// are either rescheduled or dropped
doFunc(func() {
proceed <- struct{}{} // Allow peer A to return the failure
}),
doWait{time: 0, step: true},
isWaiting(nil),
isScheduled{
tracking: map[string][]announce{
"B": {
{common.Hash{0x02}, types.LegacyTxType, 222},
},
},
fetching: map[string][]common.Hash{
"B": {{0x02}},
},
},
doFunc(func() {
proceed <- struct{}{} // Allow peer B to return the failure
}),
doWait{time: 0, step: true},
isWaiting(nil),
isScheduled{nil, nil, nil},
},
})
}
// Tests that if a transaction retrieval succeeds, all alternate origins
// are cleaned up.
func TestTransactionFetcherCleanup(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
core/types: support for optional blob sidecar in BlobTx (#27841) This PR removes the newly added txpool.Transaction wrapper type, and instead adds a way of keeping the blob sidecar within types.Transaction. It's better this way because most code in go-ethereum does not care about blob transactions, and probably never will. This will start mattering especially on the client side of RPC, where all APIs are based on types.Transaction. Users need to be able to use the same signing flows they already have. However, since blobs are only allowed in some places but not others, we will now need to add checks to avoid creating invalid blocks. I'm still trying to figure out the best place to do some of these. The way I have it currently is as follows: - In block validation (import), txs are verified not to have a blob sidecar. - In miner, we strip off the sidecar when committing the transaction into the block. - In TxPool validation, txs must have a sidecar to be added into the blobpool. - Note there is a special case here: when transactions are re-added because of a chain reorg, we cannot use the transactions gathered from the old chain blocks as-is, because they will be missing their blobs. This was previously handled by storing the blobs into the 'blobpool limbo'. The code has now changed to store the full transaction in the limbo instead, but it might be confusing for code readers why we're not simply adding the types.Transaction we already have. Code changes summary: - txpool.Transaction removed and all uses replaced by types.Transaction again - blobpool now stores types.Transaction instead of defining its own blobTx format for storage - the blobpool limbo now stores types.Transaction instead of storing only the blobs - checks to validate the presence/absence of the blob sidecar added in certain critical places
2023-08-14 10:13:34 +02:00
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
// Push an initial announcement through to the scheduled stage
doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}},
isWaiting(map[string][]announce{
"A": {
{testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())},
},
}),
isScheduled{tracking: nil, fetching: nil},
doWait{time: txArriveTimeout, step: true},
isWaiting(nil),
isScheduled{
tracking: map[string][]announce{
"A": {
{testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())},
},
},
fetching: map[string][]common.Hash{
"A": {testTxsHashes[0]},
},
},
// Request should be delivered
doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}, direct: true},
isScheduled{nil, nil, nil},
},
})
}
// Tests that if a transaction retrieval succeeds, but the response is empty (no
// transactions available, then all are nuked instead of being rescheduled (yes,
// this was a bug)).
func TestTransactionFetcherCleanupEmpty(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
core/types: support for optional blob sidecar in BlobTx (#27841) This PR removes the newly added txpool.Transaction wrapper type, and instead adds a way of keeping the blob sidecar within types.Transaction. It's better this way because most code in go-ethereum does not care about blob transactions, and probably never will. This will start mattering especially on the client side of RPC, where all APIs are based on types.Transaction. Users need to be able to use the same signing flows they already have. However, since blobs are only allowed in some places but not others, we will now need to add checks to avoid creating invalid blocks. I'm still trying to figure out the best place to do some of these. The way I have it currently is as follows: - In block validation (import), txs are verified not to have a blob sidecar. - In miner, we strip off the sidecar when committing the transaction into the block. - In TxPool validation, txs must have a sidecar to be added into the blobpool. - Note there is a special case here: when transactions are re-added because of a chain reorg, we cannot use the transactions gathered from the old chain blocks as-is, because they will be missing their blobs. This was previously handled by storing the blobs into the 'blobpool limbo'. The code has now changed to store the full transaction in the limbo instead, but it might be confusing for code readers why we're not simply adding the types.Transaction we already have. Code changes summary: - txpool.Transaction removed and all uses replaced by types.Transaction again - blobpool now stores types.Transaction instead of defining its own blobTx format for storage - the blobpool limbo now stores types.Transaction instead of storing only the blobs - checks to validate the presence/absence of the blob sidecar added in certain critical places
2023-08-14 10:13:34 +02:00
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
// Push an initial announcement through to the scheduled stage
doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}},
isWaiting(map[string][]announce{
"A": {
{testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())},
},
}),
isScheduled{tracking: nil, fetching: nil},
doWait{time: txArriveTimeout, step: true},
isWaiting(nil),
isScheduled{
tracking: map[string][]announce{
"A": {
{testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())},
},
},
fetching: map[string][]common.Hash{
"A": {testTxsHashes[0]},
},
},
// Deliver an empty response and ensure the transaction is cleared, not rescheduled
doTxEnqueue{peer: "A", txs: []*types.Transaction{}, direct: true},
isScheduled{nil, nil, nil},
},
})
}
2020-05-25 16:21:28 +08:00
// Tests that non-returned transactions are either re-scheduled from a
// different peer, or self if they are after the cutoff point.
func TestTransactionFetcherMissingRescheduling(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
core/types: support for optional blob sidecar in BlobTx (#27841) This PR removes the newly added txpool.Transaction wrapper type, and instead adds a way of keeping the blob sidecar within types.Transaction. It's better this way because most code in go-ethereum does not care about blob transactions, and probably never will. This will start mattering especially on the client side of RPC, where all APIs are based on types.Transaction. Users need to be able to use the same signing flows they already have. However, since blobs are only allowed in some places but not others, we will now need to add checks to avoid creating invalid blocks. I'm still trying to figure out the best place to do some of these. The way I have it currently is as follows: - In block validation (import), txs are verified not to have a blob sidecar. - In miner, we strip off the sidecar when committing the transaction into the block. - In TxPool validation, txs must have a sidecar to be added into the blobpool. - Note there is a special case here: when transactions are re-added because of a chain reorg, we cannot use the transactions gathered from the old chain blocks as-is, because they will be missing their blobs. This was previously handled by storing the blobs into the 'blobpool limbo'. The code has now changed to store the full transaction in the limbo instead, but it might be confusing for code readers why we're not simply adding the types.Transaction we already have. Code changes summary: - txpool.Transaction removed and all uses replaced by types.Transaction again - blobpool now stores types.Transaction instead of defining its own blobTx format for storage - the blobpool limbo now stores types.Transaction instead of storing only the blobs - checks to validate the presence/absence of the blob sidecar added in certain critical places
2023-08-14 10:13:34 +02:00
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
// Push an initial announcement through to the scheduled stage
doTxNotify{peer: "A",
hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1], testTxsHashes[2]},
types: []byte{testTxs[0].Type(), testTxs[1].Type(), testTxs[2].Type()},
sizes: []uint32{uint32(testTxs[0].Size()), uint32(testTxs[1].Size()), uint32(testTxs[2].Size())},
},
isWaiting(map[string][]announce{
"A": {
{testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())},
{testTxsHashes[1], testTxs[1].Type(), uint32(testTxs[1].Size())},
{testTxsHashes[2], testTxs[2].Type(), uint32(testTxs[2].Size())},
},
}),
isScheduled{tracking: nil, fetching: nil},
doWait{time: txArriveTimeout, step: true},
isWaiting(nil),
isScheduled{
tracking: map[string][]announce{
"A": {
{testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())},
{testTxsHashes[1], testTxs[1].Type(), uint32(testTxs[1].Size())},
{testTxsHashes[2], testTxs[2].Type(), uint32(testTxs[2].Size())},
},
},
fetching: map[string][]common.Hash{
"A": {testTxsHashes[0], testTxsHashes[1], testTxsHashes[2]},
},
},
// Deliver the middle transaction requested, the one before which
// should be dropped and the one after re-requested.
doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[1]}, direct: true},
isScheduled{
tracking: map[string][]announce{
"A": {
{testTxsHashes[2], testTxs[2].Type(), uint32(testTxs[2].Size())},
},
},
fetching: map[string][]common.Hash{
"A": {testTxsHashes[2]},
},
},
},
})
}
// Tests that out of two transactions, if one is missing and the last is
// delivered, the peer gets properly cleaned out from the internal state.
func TestTransactionFetcherMissingCleanup(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
core/types: support for optional blob sidecar in BlobTx (#27841) This PR removes the newly added txpool.Transaction wrapper type, and instead adds a way of keeping the blob sidecar within types.Transaction. It's better this way because most code in go-ethereum does not care about blob transactions, and probably never will. This will start mattering especially on the client side of RPC, where all APIs are based on types.Transaction. Users need to be able to use the same signing flows they already have. However, since blobs are only allowed in some places but not others, we will now need to add checks to avoid creating invalid blocks. I'm still trying to figure out the best place to do some of these. The way I have it currently is as follows: - In block validation (import), txs are verified not to have a blob sidecar. - In miner, we strip off the sidecar when committing the transaction into the block. - In TxPool validation, txs must have a sidecar to be added into the blobpool. - Note there is a special case here: when transactions are re-added because of a chain reorg, we cannot use the transactions gathered from the old chain blocks as-is, because they will be missing their blobs. This was previously handled by storing the blobs into the 'blobpool limbo'. The code has now changed to store the full transaction in the limbo instead, but it might be confusing for code readers why we're not simply adding the types.Transaction we already have. Code changes summary: - txpool.Transaction removed and all uses replaced by types.Transaction again - blobpool now stores types.Transaction instead of defining its own blobTx format for storage - the blobpool limbo now stores types.Transaction instead of storing only the blobs - checks to validate the presence/absence of the blob sidecar added in certain critical places
2023-08-14 10:13:34 +02:00
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
// Push an initial announcement through to the scheduled stage
doTxNotify{peer: "A",
hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1]},
types: []byte{testTxs[0].Type(), testTxs[1].Type()},
sizes: []uint32{uint32(testTxs[0].Size()), uint32(testTxs[1].Size())},
},
isWaiting(map[string][]announce{
"A": {
{testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())},
{testTxsHashes[1], testTxs[1].Type(), uint32(testTxs[1].Size())},
},
}),
isScheduled{tracking: nil, fetching: nil},
doWait{time: txArriveTimeout, step: true},
isWaiting(nil),
isScheduled{
tracking: map[string][]announce{
"A": {
{testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())},
{testTxsHashes[1], testTxs[1].Type(), uint32(testTxs[1].Size())},
},
},
fetching: map[string][]common.Hash{
"A": {testTxsHashes[0], testTxsHashes[1]},
},
},
// Deliver the middle transaction requested, the one before which
// should be dropped and the one after re-requested.
doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[1]}, direct: true}, // This depends on the deterministic random
isScheduled{nil, nil, nil},
},
})
}
// Tests that transaction broadcasts properly clean up announcements.
func TestTransactionFetcherBroadcasts(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
core/types: support for optional blob sidecar in BlobTx (#27841) This PR removes the newly added txpool.Transaction wrapper type, and instead adds a way of keeping the blob sidecar within types.Transaction. It's better this way because most code in go-ethereum does not care about blob transactions, and probably never will. This will start mattering especially on the client side of RPC, where all APIs are based on types.Transaction. Users need to be able to use the same signing flows they already have. However, since blobs are only allowed in some places but not others, we will now need to add checks to avoid creating invalid blocks. I'm still trying to figure out the best place to do some of these. The way I have it currently is as follows: - In block validation (import), txs are verified not to have a blob sidecar. - In miner, we strip off the sidecar when committing the transaction into the block. - In TxPool validation, txs must have a sidecar to be added into the blobpool. - Note there is a special case here: when transactions are re-added because of a chain reorg, we cannot use the transactions gathered from the old chain blocks as-is, because they will be missing their blobs. This was previously handled by storing the blobs into the 'blobpool limbo'. The code has now changed to store the full transaction in the limbo instead, but it might be confusing for code readers why we're not simply adding the types.Transaction we already have. Code changes summary: - txpool.Transaction removed and all uses replaced by types.Transaction again - blobpool now stores types.Transaction instead of defining its own blobTx format for storage - the blobpool limbo now stores types.Transaction instead of storing only the blobs - checks to validate the presence/absence of the blob sidecar added in certain critical places
2023-08-14 10:13:34 +02:00
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
// Set up three transactions to be in different stats, waiting, queued and fetching
doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}},
doWait{time: txArriveTimeout, step: true},
doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[1]}, types: []byte{testTxs[1].Type()}, sizes: []uint32{uint32(testTxs[1].Size())}},
doWait{time: txArriveTimeout, step: true},
doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[2]}, types: []byte{testTxs[2].Type()}, sizes: []uint32{uint32(testTxs[2].Size())}},
isWaiting(map[string][]announce{
"A": {
{testTxsHashes[2], testTxs[2].Type(), uint32(testTxs[2].Size())},
},
}),
isScheduled{
tracking: map[string][]announce{
"A": {
{testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())},
{testTxsHashes[1], testTxs[1].Type(), uint32(testTxs[1].Size())},
},
},
fetching: map[string][]common.Hash{
"A": {testTxsHashes[0]},
},
},
// Broadcast all the transactions and ensure everything gets cleaned
// up, but the dangling request is left alone to avoid doing multiple
// concurrent requests.
doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0], testTxs[1], testTxs[2]}, direct: false},
isWaiting(nil),
isScheduled{
tracking: nil,
fetching: nil,
dangling: map[string][]common.Hash{
"A": {testTxsHashes[0]},
},
},
// Deliver the requested hashes
doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0], testTxs[1], testTxs[2]}, direct: true},
isScheduled{nil, nil, nil},
},
})
}
// Tests that the waiting list timers properly reset and reschedule.
func TestTransactionFetcherWaitTimerResets(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}, types: []byte{types.LegacyTxType}, sizes: []uint32{111}},
isWaiting(map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
},
}),
isScheduled{nil, nil, nil},
doWait{time: txArriveTimeout / 2, step: false},
isWaiting(map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
},
}),
isScheduled{nil, nil, nil},
doTxNotify{peer: "A", hashes: []common.Hash{{0x02}}, types: []byte{types.LegacyTxType}, sizes: []uint32{222}},
isWaiting(map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
},
}),
isScheduled{nil, nil, nil},
doWait{time: txArriveTimeout / 2, step: true},
isWaiting(map[string][]announce{
"A": {
{common.Hash{0x02}, types.LegacyTxType, 222},
},
}),
isScheduled{
tracking: map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
},
},
fetching: map[string][]common.Hash{
"A": {{0x01}},
},
},
doWait{time: txArriveTimeout / 2, step: true},
isWaiting(nil),
isScheduled{
tracking: map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
},
},
fetching: map[string][]common.Hash{
"A": {{0x01}},
},
},
},
})
}
// Tests that if a transaction request is not replied to, it will time
// out and be re-scheduled for someone else.
func TestTransactionFetcherTimeoutRescheduling(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
core/types: support for optional blob sidecar in BlobTx (#27841) This PR removes the newly added txpool.Transaction wrapper type, and instead adds a way of keeping the blob sidecar within types.Transaction. It's better this way because most code in go-ethereum does not care about blob transactions, and probably never will. This will start mattering especially on the client side of RPC, where all APIs are based on types.Transaction. Users need to be able to use the same signing flows they already have. However, since blobs are only allowed in some places but not others, we will now need to add checks to avoid creating invalid blocks. I'm still trying to figure out the best place to do some of these. The way I have it currently is as follows: - In block validation (import), txs are verified not to have a blob sidecar. - In miner, we strip off the sidecar when committing the transaction into the block. - In TxPool validation, txs must have a sidecar to be added into the blobpool. - Note there is a special case here: when transactions are re-added because of a chain reorg, we cannot use the transactions gathered from the old chain blocks as-is, because they will be missing their blobs. This was previously handled by storing the blobs into the 'blobpool limbo'. The code has now changed to store the full transaction in the limbo instead, but it might be confusing for code readers why we're not simply adding the types.Transaction we already have. Code changes summary: - txpool.Transaction removed and all uses replaced by types.Transaction again - blobpool now stores types.Transaction instead of defining its own blobTx format for storage - the blobpool limbo now stores types.Transaction instead of storing only the blobs - checks to validate the presence/absence of the blob sidecar added in certain critical places
2023-08-14 10:13:34 +02:00
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
// Push an initial announcement through to the scheduled stage
doTxNotify{
peer: "A",
hashes: []common.Hash{testTxsHashes[0]},
types: []byte{testTxs[0].Type()},
sizes: []uint32{uint32(testTxs[0].Size())},
},
isWaiting(map[string][]announce{
"A": {{testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())}},
}),
isScheduled{tracking: nil, fetching: nil},
doWait{time: txArriveTimeout, step: true},
isWaiting(nil),
isScheduled{
tracking: map[string][]announce{
"A": {{testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())}},
},
fetching: map[string][]common.Hash{
"A": {testTxsHashes[0]},
},
},
// Wait until the delivery times out, everything should be cleaned up
doWait{time: txFetchTimeout, step: true},
isWaiting(nil),
isScheduled{
tracking: nil,
fetching: nil,
dangling: map[string][]common.Hash{
"A": {},
},
},
// Ensure that followup announcements don't get scheduled
doTxNotify{
peer: "A",
hashes: []common.Hash{testTxsHashes[1]},
types: []byte{testTxs[1].Type()},
sizes: []uint32{uint32(testTxs[1].Size())},
},
doWait{time: txArriveTimeout, step: true},
isScheduled{
tracking: map[string][]announce{
"A": {{testTxsHashes[1], testTxs[1].Type(), uint32(testTxs[1].Size())}},
},
fetching: nil,
dangling: map[string][]common.Hash{
"A": {},
},
},
// If the dangling request arrives a bit later, do not choke
doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}, direct: true},
isWaiting(nil),
isScheduled{
tracking: map[string][]announce{
"A": {{testTxsHashes[1], testTxs[1].Type(), uint32(testTxs[1].Size())}},
},
fetching: map[string][]common.Hash{
"A": {testTxsHashes[1]},
},
},
},
})
}
// Tests that the fetching timeout timers properly reset and reschedule.
func TestTransactionFetcherTimeoutTimerResets(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}, types: []byte{types.LegacyTxType}, sizes: []uint32{111}},
doWait{time: txArriveTimeout, step: true},
doTxNotify{peer: "B", hashes: []common.Hash{{0x02}}, types: []byte{types.LegacyTxType}, sizes: []uint32{222}},
doWait{time: txArriveTimeout, step: true},
isWaiting(nil),
isScheduled{
tracking: map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
},
"B": {
{common.Hash{0x02}, types.LegacyTxType, 222},
},
},
fetching: map[string][]common.Hash{
"A": {{0x01}},
"B": {{0x02}},
},
},
doWait{time: txFetchTimeout - txArriveTimeout, step: true},
isScheduled{
tracking: map[string][]announce{
"B": {
{common.Hash{0x02}, types.LegacyTxType, 222},
},
},
fetching: map[string][]common.Hash{
"B": {{0x02}},
},
dangling: map[string][]common.Hash{
"A": {},
},
},
doWait{time: txArriveTimeout, step: true},
isScheduled{
tracking: nil,
fetching: nil,
dangling: map[string][]common.Hash{
"A": {},
"B": {},
},
},
},
})
}
// Tests that if thousands of transactions are announced, only a small
// number of them will be requested at a time.
func TestTransactionFetcherRateLimiting(t *testing.T) {
// Create a slew of transactions and announce them
var (
hashes []common.Hash
ts []byte
sizes []uint32
announces []announce
)
for i := 0; i < maxTxAnnounces; i++ {
hash := common.Hash{byte(i / 256), byte(i % 256)}
hashes = append(hashes, hash)
ts = append(ts, types.LegacyTxType)
sizes = append(sizes, 111)
announces = append(announces, announce{
hash: hash,
kind: types.LegacyTxType,
size: 111,
})
}
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
// Announce all the transactions, wait a bit and ensure only a small
// percentage gets requested
doTxNotify{peer: "A", hashes: hashes, types: ts, sizes: sizes},
doWait{time: txArriveTimeout, step: true},
isWaiting(nil),
isScheduled{
tracking: map[string][]announce{
"A": announces,
},
fetching: map[string][]common.Hash{
"A": hashes[:maxTxRetrievals],
},
},
},
})
}
// Tests that if huge transactions are announced, only a small number of them will
// be requested at a time, to keep the responses below a reasonable level.
func TestTransactionFetcherBandwidthLimiting(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
// Announce mid size transactions from A to verify that multiple
// ones can be piled into a single request.
doTxNotify{peer: "A",
hashes: []common.Hash{{0x01}, {0x02}, {0x03}, {0x04}},
types: []byte{types.LegacyTxType, types.LegacyTxType, types.LegacyTxType, types.LegacyTxType},
sizes: []uint32{48 * 1024, 48 * 1024, 48 * 1024, 48 * 1024},
},
// Announce exactly on the limit transactions to see that only one
// gets requested
doTxNotify{peer: "B",
hashes: []common.Hash{{0x05}, {0x06}},
types: []byte{types.LegacyTxType, types.LegacyTxType},
sizes: []uint32{maxTxRetrievalSize, maxTxRetrievalSize},
},
// Announce oversized blob transactions to see that overflows are ok
doTxNotify{peer: "C",
hashes: []common.Hash{{0x07}, {0x08}},
types: []byte{types.BlobTxType, types.BlobTxType},
sizes: []uint32{params.MaxBlobGasPerBlock, params.MaxBlobGasPerBlock},
},
doWait{time: txArriveTimeout, step: true},
isWaiting(nil),
isScheduled{
tracking: map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 48 * 1024},
{common.Hash{0x02}, types.LegacyTxType, 48 * 1024},
{common.Hash{0x03}, types.LegacyTxType, 48 * 1024},
{common.Hash{0x04}, types.LegacyTxType, 48 * 1024},
},
"B": {
{common.Hash{0x05}, types.LegacyTxType, maxTxRetrievalSize},
{common.Hash{0x06}, types.LegacyTxType, maxTxRetrievalSize},
},
"C": {
{common.Hash{0x07}, types.BlobTxType, params.MaxBlobGasPerBlock},
{common.Hash{0x08}, types.BlobTxType, params.MaxBlobGasPerBlock},
},
},
fetching: map[string][]common.Hash{
"A": {{0x01}, {0x02}, {0x03}},
"B": {{0x05}},
"C": {{0x07}},
},
},
},
})
}
// Tests that then number of transactions a peer is allowed to announce and/or
// request at the same time is hard capped.
func TestTransactionFetcherDoSProtection(t *testing.T) {
// Create a slew of transactions and to announce them
var (
hashesA []common.Hash
typesA []byte
sizesA []uint32
announceA []announce
)
for i := 0; i < maxTxAnnounces+1; i++ {
hash := common.Hash{0x01, byte(i / 256), byte(i % 256)}
hashesA = append(hashesA, hash)
typesA = append(typesA, types.LegacyTxType)
sizesA = append(sizesA, 111)
announceA = append(announceA, announce{
hash: hash,
kind: types.LegacyTxType,
size: 111,
})
}
var (
hashesB []common.Hash
typesB []byte
sizesB []uint32
announceB []announce
)
for i := 0; i < maxTxAnnounces+1; i++ {
hash := common.Hash{0x02, byte(i / 256), byte(i % 256)}
hashesB = append(hashesB, hash)
typesB = append(typesB, types.LegacyTxType)
sizesB = append(sizesB, 111)
announceB = append(announceB, announce{
hash: hash,
kind: types.LegacyTxType,
size: 111,
})
}
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
// Announce half of the transaction and wait for them to be scheduled
doTxNotify{peer: "A", hashes: hashesA[:maxTxAnnounces/2], types: typesA[:maxTxAnnounces/2], sizes: sizesA[:maxTxAnnounces/2]},
doTxNotify{peer: "B", hashes: hashesB[:maxTxAnnounces/2-1], types: typesB[:maxTxAnnounces/2-1], sizes: sizesB[:maxTxAnnounces/2-1]},
doWait{time: txArriveTimeout, step: true},
// Announce the second half and keep them in the wait list
doTxNotify{peer: "A", hashes: hashesA[maxTxAnnounces/2 : maxTxAnnounces], types: typesA[maxTxAnnounces/2 : maxTxAnnounces], sizes: sizesA[maxTxAnnounces/2 : maxTxAnnounces]},
doTxNotify{peer: "B", hashes: hashesB[maxTxAnnounces/2-1 : maxTxAnnounces-1], types: typesB[maxTxAnnounces/2-1 : maxTxAnnounces-1], sizes: sizesB[maxTxAnnounces/2-1 : maxTxAnnounces-1]},
// Ensure the hashes are split half and half
isWaiting(map[string][]announce{
"A": announceA[maxTxAnnounces/2 : maxTxAnnounces],
"B": announceB[maxTxAnnounces/2-1 : maxTxAnnounces-1],
}),
isScheduled{
tracking: map[string][]announce{
"A": announceA[:maxTxAnnounces/2],
"B": announceB[:maxTxAnnounces/2-1],
},
fetching: map[string][]common.Hash{
"A": hashesA[:maxTxRetrievals],
"B": hashesB[:maxTxRetrievals],
},
},
// Ensure that adding even one more hash results in dropping the hash
doTxNotify{peer: "A", hashes: []common.Hash{hashesA[maxTxAnnounces]}, types: []byte{typesA[maxTxAnnounces]}, sizes: []uint32{sizesA[maxTxAnnounces]}},
doTxNotify{peer: "B", hashes: hashesB[maxTxAnnounces-1 : maxTxAnnounces+1], types: typesB[maxTxAnnounces-1 : maxTxAnnounces+1], sizes: sizesB[maxTxAnnounces-1 : maxTxAnnounces+1]},
isWaiting(map[string][]announce{
"A": announceA[maxTxAnnounces/2 : maxTxAnnounces],
"B": announceB[maxTxAnnounces/2-1 : maxTxAnnounces],
}),
isScheduled{
tracking: map[string][]announce{
"A": announceA[:maxTxAnnounces/2],
"B": announceB[:maxTxAnnounces/2-1],
},
fetching: map[string][]common.Hash{
"A": hashesA[:maxTxRetrievals],
"B": hashesB[:maxTxRetrievals],
},
},
},
})
}
// Tests that underpriced transactions don't get rescheduled after being rejected.
func TestTransactionFetcherUnderpricedDedup(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
core/types: support for optional blob sidecar in BlobTx (#27841) This PR removes the newly added txpool.Transaction wrapper type, and instead adds a way of keeping the blob sidecar within types.Transaction. It's better this way because most code in go-ethereum does not care about blob transactions, and probably never will. This will start mattering especially on the client side of RPC, where all APIs are based on types.Transaction. Users need to be able to use the same signing flows they already have. However, since blobs are only allowed in some places but not others, we will now need to add checks to avoid creating invalid blocks. I'm still trying to figure out the best place to do some of these. The way I have it currently is as follows: - In block validation (import), txs are verified not to have a blob sidecar. - In miner, we strip off the sidecar when committing the transaction into the block. - In TxPool validation, txs must have a sidecar to be added into the blobpool. - Note there is a special case here: when transactions are re-added because of a chain reorg, we cannot use the transactions gathered from the old chain blocks as-is, because they will be missing their blobs. This was previously handled by storing the blobs into the 'blobpool limbo'. The code has now changed to store the full transaction in the limbo instead, but it might be confusing for code readers why we're not simply adding the types.Transaction we already have. Code changes summary: - txpool.Transaction removed and all uses replaced by types.Transaction again - blobpool now stores types.Transaction instead of defining its own blobTx format for storage - the blobpool limbo now stores types.Transaction instead of storing only the blobs - checks to validate the presence/absence of the blob sidecar added in certain critical places
2023-08-14 10:13:34 +02:00
func(txs []*types.Transaction) []error {
errs := make([]error, len(txs))
for i := 0; i < len(errs); i++ {
if i%2 == 0 {
errs[i] = txpool.ErrUnderpriced
} else {
errs[i] = txpool.ErrReplaceUnderpriced
}
}
return errs
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
// Deliver a transaction through the fetcher, but reject as underpriced
doTxNotify{peer: "A",
hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1]},
types: []byte{testTxs[0].Type(), testTxs[1].Type()},
sizes: []uint32{uint32(testTxs[0].Size()), uint32(testTxs[1].Size())},
},
doWait{time: txArriveTimeout, step: true},
doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0], testTxs[1]}, direct: true},
isScheduled{nil, nil, nil},
// Try to announce the transaction again, ensure it's not scheduled back
doTxNotify{peer: "A",
hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1], testTxsHashes[2]},
types: []byte{testTxs[0].Type(), testTxs[1].Type(), testTxs[2].Type()},
sizes: []uint32{uint32(testTxs[0].Size()), uint32(testTxs[1].Size()), uint32(testTxs[2].Size())},
}, // [2] is needed to force a step in the fetcher
isWaiting(map[string][]announce{
"A": {{testTxsHashes[2], testTxs[2].Type(), uint32(testTxs[2].Size())}},
}),
isScheduled{nil, nil, nil},
},
})
}
// Tests that underpriced transactions don't get rescheduled after being rejected,
// but at the same time there's a hard cap on the number of transactions that are
// tracked.
func TestTransactionFetcherUnderpricedDoSProtection(t *testing.T) {
// Temporarily disable fetch timeouts as they massively mess up the simulated clock
defer func(timeout time.Duration) { txFetchTimeout = timeout }(txFetchTimeout)
txFetchTimeout = 24 * time.Hour
// Create a slew of transactions to max out the underpriced set
var txs []*types.Transaction
for i := 0; i < maxTxUnderpricedSetSize+1; i++ {
txs = append(txs, types.NewTransaction(rand.Uint64(), common.Address{byte(rand.Intn(256))}, new(big.Int), 0, new(big.Int), nil))
}
var (
hashes []common.Hash
ts []byte
sizes []uint32
annos []announce
)
for _, tx := range txs {
hashes = append(hashes, tx.Hash())
ts = append(ts, tx.Type())
sizes = append(sizes, uint32(tx.Size()))
annos = append(annos, announce{
hash: tx.Hash(),
kind: tx.Type(),
size: uint32(tx.Size()),
})
}
// Generate a set of steps to announce and deliver the entire set of transactions
var steps []interface{}
for i := 0; i < maxTxUnderpricedSetSize/maxTxRetrievals; i++ {
steps = append(steps, doTxNotify{
peer: "A",
hashes: hashes[i*maxTxRetrievals : (i+1)*maxTxRetrievals],
types: ts[i*maxTxRetrievals : (i+1)*maxTxRetrievals],
sizes: sizes[i*maxTxRetrievals : (i+1)*maxTxRetrievals],
})
steps = append(steps, isWaiting(map[string][]announce{
"A": annos[i*maxTxRetrievals : (i+1)*maxTxRetrievals],
}))
steps = append(steps, doWait{time: txArriveTimeout, step: true})
steps = append(steps, isScheduled{
tracking: map[string][]announce{
"A": annos[i*maxTxRetrievals : (i+1)*maxTxRetrievals],
},
fetching: map[string][]common.Hash{
"A": hashes[i*maxTxRetrievals : (i+1)*maxTxRetrievals],
},
})
steps = append(steps, doTxEnqueue{peer: "A", txs: txs[i*maxTxRetrievals : (i+1)*maxTxRetrievals], direct: true})
steps = append(steps, isWaiting(nil))
steps = append(steps, isScheduled{nil, nil, nil})
steps = append(steps, isUnderpriced((i+1)*maxTxRetrievals))
}
testTransactionFetcher(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
core/types: support for optional blob sidecar in BlobTx (#27841) This PR removes the newly added txpool.Transaction wrapper type, and instead adds a way of keeping the blob sidecar within types.Transaction. It's better this way because most code in go-ethereum does not care about blob transactions, and probably never will. This will start mattering especially on the client side of RPC, where all APIs are based on types.Transaction. Users need to be able to use the same signing flows they already have. However, since blobs are only allowed in some places but not others, we will now need to add checks to avoid creating invalid blocks. I'm still trying to figure out the best place to do some of these. The way I have it currently is as follows: - In block validation (import), txs are verified not to have a blob sidecar. - In miner, we strip off the sidecar when committing the transaction into the block. - In TxPool validation, txs must have a sidecar to be added into the blobpool. - Note there is a special case here: when transactions are re-added because of a chain reorg, we cannot use the transactions gathered from the old chain blocks as-is, because they will be missing their blobs. This was previously handled by storing the blobs into the 'blobpool limbo'. The code has now changed to store the full transaction in the limbo instead, but it might be confusing for code readers why we're not simply adding the types.Transaction we already have. Code changes summary: - txpool.Transaction removed and all uses replaced by types.Transaction again - blobpool now stores types.Transaction instead of defining its own blobTx format for storage - the blobpool limbo now stores types.Transaction instead of storing only the blobs - checks to validate the presence/absence of the blob sidecar added in certain critical places
2023-08-14 10:13:34 +02:00
func(txs []*types.Transaction) []error {
errs := make([]error, len(txs))
for i := 0; i < len(errs); i++ {
errs[i] = txpool.ErrUnderpriced
}
return errs
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: append(steps, []interface{}{
// The preparation of the test has already been done in `steps`, add the last check
doTxNotify{
peer: "A",
hashes: []common.Hash{hashes[maxTxUnderpricedSetSize]},
types: []byte{ts[maxTxUnderpricedSetSize]},
sizes: []uint32{sizes[maxTxUnderpricedSetSize]},
},
doWait{time: txArriveTimeout, step: true},
doTxEnqueue{peer: "A", txs: []*types.Transaction{txs[maxTxUnderpricedSetSize]}, direct: true},
isUnderpriced(maxTxUnderpricedSetSize),
}...),
})
}
// Tests that unexpected deliveries don't corrupt the internal state.
func TestTransactionFetcherOutOfBoundDeliveries(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
core/types: support for optional blob sidecar in BlobTx (#27841) This PR removes the newly added txpool.Transaction wrapper type, and instead adds a way of keeping the blob sidecar within types.Transaction. It's better this way because most code in go-ethereum does not care about blob transactions, and probably never will. This will start mattering especially on the client side of RPC, where all APIs are based on types.Transaction. Users need to be able to use the same signing flows they already have. However, since blobs are only allowed in some places but not others, we will now need to add checks to avoid creating invalid blocks. I'm still trying to figure out the best place to do some of these. The way I have it currently is as follows: - In block validation (import), txs are verified not to have a blob sidecar. - In miner, we strip off the sidecar when committing the transaction into the block. - In TxPool validation, txs must have a sidecar to be added into the blobpool. - Note there is a special case here: when transactions are re-added because of a chain reorg, we cannot use the transactions gathered from the old chain blocks as-is, because they will be missing their blobs. This was previously handled by storing the blobs into the 'blobpool limbo'. The code has now changed to store the full transaction in the limbo instead, but it might be confusing for code readers why we're not simply adding the types.Transaction we already have. Code changes summary: - txpool.Transaction removed and all uses replaced by types.Transaction again - blobpool now stores types.Transaction instead of defining its own blobTx format for storage - the blobpool limbo now stores types.Transaction instead of storing only the blobs - checks to validate the presence/absence of the blob sidecar added in certain critical places
2023-08-14 10:13:34 +02:00
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
// Deliver something out of the blue
isWaiting(nil),
isScheduled{nil, nil, nil},
doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}, direct: false},
isWaiting(nil),
isScheduled{nil, nil, nil},
// Set up a few hashes into various stages
doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}},
doWait{time: txArriveTimeout, step: true},
doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[1]}, types: []byte{testTxs[1].Type()}, sizes: []uint32{uint32(testTxs[1].Size())}},
doWait{time: txArriveTimeout, step: true},
doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[2]}, types: []byte{testTxs[2].Type()}, sizes: []uint32{uint32(testTxs[2].Size())}},
isWaiting(map[string][]announce{
"A": {
{testTxsHashes[2], testTxs[2].Type(), uint32(testTxs[2].Size())},
},
}),
isScheduled{
tracking: map[string][]announce{
"A": {
{testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())},
{testTxsHashes[1], testTxs[1].Type(), uint32(testTxs[1].Size())},
},
},
fetching: map[string][]common.Hash{
"A": {testTxsHashes[0]},
},
},
// Deliver everything and more out of the blue
doTxEnqueue{peer: "B", txs: []*types.Transaction{testTxs[0], testTxs[1], testTxs[2], testTxs[3]}, direct: true},
isWaiting(nil),
isScheduled{
tracking: nil,
fetching: nil,
dangling: map[string][]common.Hash{
"A": {testTxsHashes[0]},
},
},
},
})
}
// Tests that dropping a peer cleans out all internal data structures in all the
// live or dangling stages.
func TestTransactionFetcherDrop(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
core/types: support for optional blob sidecar in BlobTx (#27841) This PR removes the newly added txpool.Transaction wrapper type, and instead adds a way of keeping the blob sidecar within types.Transaction. It's better this way because most code in go-ethereum does not care about blob transactions, and probably never will. This will start mattering especially on the client side of RPC, where all APIs are based on types.Transaction. Users need to be able to use the same signing flows they already have. However, since blobs are only allowed in some places but not others, we will now need to add checks to avoid creating invalid blocks. I'm still trying to figure out the best place to do some of these. The way I have it currently is as follows: - In block validation (import), txs are verified not to have a blob sidecar. - In miner, we strip off the sidecar when committing the transaction into the block. - In TxPool validation, txs must have a sidecar to be added into the blobpool. - Note there is a special case here: when transactions are re-added because of a chain reorg, we cannot use the transactions gathered from the old chain blocks as-is, because they will be missing their blobs. This was previously handled by storing the blobs into the 'blobpool limbo'. The code has now changed to store the full transaction in the limbo instead, but it might be confusing for code readers why we're not simply adding the types.Transaction we already have. Code changes summary: - txpool.Transaction removed and all uses replaced by types.Transaction again - blobpool now stores types.Transaction instead of defining its own blobTx format for storage - the blobpool limbo now stores types.Transaction instead of storing only the blobs - checks to validate the presence/absence of the blob sidecar added in certain critical places
2023-08-14 10:13:34 +02:00
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
// Set up a few hashes into various stages
doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}, types: []byte{types.LegacyTxType}, sizes: []uint32{111}},
doWait{time: txArriveTimeout, step: true},
doTxNotify{peer: "A", hashes: []common.Hash{{0x02}}, types: []byte{types.LegacyTxType}, sizes: []uint32{222}},
doWait{time: txArriveTimeout, step: true},
doTxNotify{peer: "A", hashes: []common.Hash{{0x03}}, types: []byte{types.LegacyTxType}, sizes: []uint32{333}},
isWaiting(map[string][]announce{
"A": {
{common.Hash{0x03}, types.LegacyTxType, 333},
},
}),
isScheduled{
tracking: map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
},
},
fetching: map[string][]common.Hash{
"A": {{0x01}},
},
},
// Drop the peer and ensure everything's cleaned out
doDrop("A"),
isWaiting(nil),
isScheduled{nil, nil, nil},
// Push the node into a dangling (timeout) state
doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}},
doWait{time: txArriveTimeout, step: true},
isWaiting(nil),
isScheduled{
tracking: map[string][]announce{
"A": {
{testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())},
},
},
fetching: map[string][]common.Hash{
"A": {testTxsHashes[0]},
},
},
doWait{time: txFetchTimeout, step: true},
isWaiting(nil),
isScheduled{
tracking: nil,
fetching: nil,
dangling: map[string][]common.Hash{
"A": {},
},
},
// Drop the peer and ensure everything's cleaned out
doDrop("A"),
isWaiting(nil),
isScheduled{nil, nil, nil},
},
})
}
// Tests that dropping a peer instantly reschedules failed announcements to any
// available peer.
func TestTransactionFetcherDropRescheduling(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
core/types: support for optional blob sidecar in BlobTx (#27841) This PR removes the newly added txpool.Transaction wrapper type, and instead adds a way of keeping the blob sidecar within types.Transaction. It's better this way because most code in go-ethereum does not care about blob transactions, and probably never will. This will start mattering especially on the client side of RPC, where all APIs are based on types.Transaction. Users need to be able to use the same signing flows they already have. However, since blobs are only allowed in some places but not others, we will now need to add checks to avoid creating invalid blocks. I'm still trying to figure out the best place to do some of these. The way I have it currently is as follows: - In block validation (import), txs are verified not to have a blob sidecar. - In miner, we strip off the sidecar when committing the transaction into the block. - In TxPool validation, txs must have a sidecar to be added into the blobpool. - Note there is a special case here: when transactions are re-added because of a chain reorg, we cannot use the transactions gathered from the old chain blocks as-is, because they will be missing their blobs. This was previously handled by storing the blobs into the 'blobpool limbo'. The code has now changed to store the full transaction in the limbo instead, but it might be confusing for code readers why we're not simply adding the types.Transaction we already have. Code changes summary: - txpool.Transaction removed and all uses replaced by types.Transaction again - blobpool now stores types.Transaction instead of defining its own blobTx format for storage - the blobpool limbo now stores types.Transaction instead of storing only the blobs - checks to validate the presence/absence of the blob sidecar added in certain critical places
2023-08-14 10:13:34 +02:00
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
// Set up a few hashes into various stages
doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}, types: []byte{types.LegacyTxType}, sizes: []uint32{111}},
doWait{time: txArriveTimeout, step: true},
doTxNotify{peer: "B", hashes: []common.Hash{{0x01}}, types: []byte{types.LegacyTxType}, sizes: []uint32{111}},
isWaiting(nil),
isScheduled{
tracking: map[string][]announce{
"A": {{common.Hash{0x01}, types.LegacyTxType, 111}},
"B": {{common.Hash{0x01}, types.LegacyTxType, 111}},
},
fetching: map[string][]common.Hash{
"A": {{0x01}},
},
},
// Drop the peer and ensure everything's cleaned out
doDrop("A"),
isWaiting(nil),
isScheduled{
tracking: map[string][]announce{
"B": {{common.Hash{0x01}, types.LegacyTxType, 111}},
},
fetching: map[string][]common.Hash{
"B": {{0x01}},
},
},
},
})
}
// Tests that announced transactions with the wrong transaction type or size will
// result in a dropped peer.
func TestInvalidAnnounceMetadata(t *testing.T) {
drop := make(chan string, 2)
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
func(peer string) { drop <- peer },
)
},
steps: []interface{}{
// Initial announcement to get something into the waitlist
doTxNotify{
peer: "A",
hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1]},
types: []byte{testTxs[0].Type(), testTxs[1].Type()},
sizes: []uint32{uint32(testTxs[0].Size()), uint32(testTxs[1].Size())},
},
isWaiting(map[string][]announce{
"A": {
{testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())},
{testTxsHashes[1], testTxs[1].Type(), uint32(testTxs[1].Size())},
},
}),
// Announce from new peers conflicting transactions
doTxNotify{
peer: "B",
hashes: []common.Hash{testTxsHashes[0]},
types: []byte{testTxs[0].Type()},
sizes: []uint32{1024 + uint32(testTxs[0].Size())},
},
doTxNotify{
peer: "C",
hashes: []common.Hash{testTxsHashes[1]},
types: []byte{1 + testTxs[1].Type()},
sizes: []uint32{uint32(testTxs[1].Size())},
},
isWaiting(map[string][]announce{
"A": {
{testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())},
{testTxsHashes[1], testTxs[1].Type(), uint32(testTxs[1].Size())},
},
"B": {
{testTxsHashes[0], testTxs[0].Type(), 1024 + uint32(testTxs[0].Size())},
},
"C": {
{testTxsHashes[1], 1 + testTxs[1].Type(), uint32(testTxs[1].Size())},
},
}),
// Schedule all the transactions for retrieval
doWait{time: txArriveTimeout, step: true},
isWaiting(nil),
isScheduled{
tracking: map[string][]announce{
"A": {
{testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())},
{testTxsHashes[1], testTxs[1].Type(), uint32(testTxs[1].Size())},
},
"B": {
{testTxsHashes[0], testTxs[0].Type(), 1024 + uint32(testTxs[0].Size())},
},
"C": {
{testTxsHashes[1], 1 + testTxs[1].Type(), uint32(testTxs[1].Size())},
},
},
fetching: map[string][]common.Hash{
"A": {testTxsHashes[0]},
"C": {testTxsHashes[1]},
},
},
// Deliver the transactions and wait for B to be dropped
doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0], testTxs[1]}},
doFunc(func() { <-drop }),
doFunc(func() { <-drop }),
},
})
}
// This test reproduces a crash caught by the fuzzer. The root cause was a
// dangling transaction timing out and clashing on re-add with a concurrently
// announced one.
func TestTransactionFetcherFuzzCrash01(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
core/types: support for optional blob sidecar in BlobTx (#27841) This PR removes the newly added txpool.Transaction wrapper type, and instead adds a way of keeping the blob sidecar within types.Transaction. It's better this way because most code in go-ethereum does not care about blob transactions, and probably never will. This will start mattering especially on the client side of RPC, where all APIs are based on types.Transaction. Users need to be able to use the same signing flows they already have. However, since blobs are only allowed in some places but not others, we will now need to add checks to avoid creating invalid blocks. I'm still trying to figure out the best place to do some of these. The way I have it currently is as follows: - In block validation (import), txs are verified not to have a blob sidecar. - In miner, we strip off the sidecar when committing the transaction into the block. - In TxPool validation, txs must have a sidecar to be added into the blobpool. - Note there is a special case here: when transactions are re-added because of a chain reorg, we cannot use the transactions gathered from the old chain blocks as-is, because they will be missing their blobs. This was previously handled by storing the blobs into the 'blobpool limbo'. The code has now changed to store the full transaction in the limbo instead, but it might be confusing for code readers why we're not simply adding the types.Transaction we already have. Code changes summary: - txpool.Transaction removed and all uses replaced by types.Transaction again - blobpool now stores types.Transaction instead of defining its own blobTx format for storage - the blobpool limbo now stores types.Transaction instead of storing only the blobs - checks to validate the presence/absence of the blob sidecar added in certain critical places
2023-08-14 10:13:34 +02:00
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
// Get a transaction into fetching mode and make it dangling with a broadcast
doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}},
doWait{time: txArriveTimeout, step: true},
doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}},
// Notify the dangling transaction once more and crash via a timeout
doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}},
doWait{time: txFetchTimeout, step: true},
},
})
}
// This test reproduces a crash caught by the fuzzer. The root cause was a
// dangling transaction getting peer-dropped and clashing on re-add with a
// concurrently announced one.
func TestTransactionFetcherFuzzCrash02(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
core/types: support for optional blob sidecar in BlobTx (#27841) This PR removes the newly added txpool.Transaction wrapper type, and instead adds a way of keeping the blob sidecar within types.Transaction. It's better this way because most code in go-ethereum does not care about blob transactions, and probably never will. This will start mattering especially on the client side of RPC, where all APIs are based on types.Transaction. Users need to be able to use the same signing flows they already have. However, since blobs are only allowed in some places but not others, we will now need to add checks to avoid creating invalid blocks. I'm still trying to figure out the best place to do some of these. The way I have it currently is as follows: - In block validation (import), txs are verified not to have a blob sidecar. - In miner, we strip off the sidecar when committing the transaction into the block. - In TxPool validation, txs must have a sidecar to be added into the blobpool. - Note there is a special case here: when transactions are re-added because of a chain reorg, we cannot use the transactions gathered from the old chain blocks as-is, because they will be missing their blobs. This was previously handled by storing the blobs into the 'blobpool limbo'. The code has now changed to store the full transaction in the limbo instead, but it might be confusing for code readers why we're not simply adding the types.Transaction we already have. Code changes summary: - txpool.Transaction removed and all uses replaced by types.Transaction again - blobpool now stores types.Transaction instead of defining its own blobTx format for storage - the blobpool limbo now stores types.Transaction instead of storing only the blobs - checks to validate the presence/absence of the blob sidecar added in certain critical places
2023-08-14 10:13:34 +02:00
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
// Get a transaction into fetching mode and make it dangling with a broadcast
doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}},
doWait{time: txArriveTimeout, step: true},
doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}},
// Notify the dangling transaction once more, re-fetch, and crash via a drop and timeout
doTxNotify{peer: "B", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}},
doWait{time: txArriveTimeout, step: true},
doDrop("A"),
doWait{time: txFetchTimeout, step: true},
},
})
}
// This test reproduces a crash caught by the fuzzer. The root cause was a
// dangling transaction getting rescheduled via a partial delivery, clashing
// with a concurrent notify.
func TestTransactionFetcherFuzzCrash03(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
core/types: support for optional blob sidecar in BlobTx (#27841) This PR removes the newly added txpool.Transaction wrapper type, and instead adds a way of keeping the blob sidecar within types.Transaction. It's better this way because most code in go-ethereum does not care about blob transactions, and probably never will. This will start mattering especially on the client side of RPC, where all APIs are based on types.Transaction. Users need to be able to use the same signing flows they already have. However, since blobs are only allowed in some places but not others, we will now need to add checks to avoid creating invalid blocks. I'm still trying to figure out the best place to do some of these. The way I have it currently is as follows: - In block validation (import), txs are verified not to have a blob sidecar. - In miner, we strip off the sidecar when committing the transaction into the block. - In TxPool validation, txs must have a sidecar to be added into the blobpool. - Note there is a special case here: when transactions are re-added because of a chain reorg, we cannot use the transactions gathered from the old chain blocks as-is, because they will be missing their blobs. This was previously handled by storing the blobs into the 'blobpool limbo'. The code has now changed to store the full transaction in the limbo instead, but it might be confusing for code readers why we're not simply adding the types.Transaction we already have. Code changes summary: - txpool.Transaction removed and all uses replaced by types.Transaction again - blobpool now stores types.Transaction instead of defining its own blobTx format for storage - the blobpool limbo now stores types.Transaction instead of storing only the blobs - checks to validate the presence/absence of the blob sidecar added in certain critical places
2023-08-14 10:13:34 +02:00
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
// Get a transaction into fetching mode and make it dangling with a broadcast
doTxNotify{
peer: "A",
hashes: []common.Hash{testTxsHashes[0], testTxsHashes[1]},
types: []byte{testTxs[0].Type(), testTxs[1].Type()},
sizes: []uint32{uint32(testTxs[0].Size()), uint32(testTxs[1].Size())},
},
doWait{time: txFetchTimeout, step: true},
doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0], testTxs[1]}},
// Notify the dangling transaction once more, partially deliver, clash&crash with a timeout
doTxNotify{peer: "B", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}},
doWait{time: txArriveTimeout, step: true},
doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[1]}, direct: true},
doWait{time: txFetchTimeout, step: true},
},
})
}
// This test reproduces a crash caught by the fuzzer. The root cause was a
// dangling transaction getting rescheduled via a disconnect, clashing with
// a concurrent notify.
func TestTransactionFetcherFuzzCrash04(t *testing.T) {
// Create a channel to control when tx requests can fail
proceed := make(chan struct{})
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
core/types: support for optional blob sidecar in BlobTx (#27841) This PR removes the newly added txpool.Transaction wrapper type, and instead adds a way of keeping the blob sidecar within types.Transaction. It's better this way because most code in go-ethereum does not care about blob transactions, and probably never will. This will start mattering especially on the client side of RPC, where all APIs are based on types.Transaction. Users need to be able to use the same signing flows they already have. However, since blobs are only allowed in some places but not others, we will now need to add checks to avoid creating invalid blocks. I'm still trying to figure out the best place to do some of these. The way I have it currently is as follows: - In block validation (import), txs are verified not to have a blob sidecar. - In miner, we strip off the sidecar when committing the transaction into the block. - In TxPool validation, txs must have a sidecar to be added into the blobpool. - Note there is a special case here: when transactions are re-added because of a chain reorg, we cannot use the transactions gathered from the old chain blocks as-is, because they will be missing their blobs. This was previously handled by storing the blobs into the 'blobpool limbo'. The code has now changed to store the full transaction in the limbo instead, but it might be confusing for code readers why we're not simply adding the types.Transaction we already have. Code changes summary: - txpool.Transaction removed and all uses replaced by types.Transaction again - blobpool now stores types.Transaction instead of defining its own blobTx format for storage - the blobpool limbo now stores types.Transaction instead of storing only the blobs - checks to validate the presence/absence of the blob sidecar added in certain critical places
2023-08-14 10:13:34 +02:00
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error {
<-proceed
return errors.New("peer disconnected")
},
nil,
)
},
steps: []interface{}{
// Get a transaction into fetching mode and make it dangling with a broadcast
doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}},
doWait{time: txArriveTimeout, step: true},
doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}},
// Notify the dangling transaction once more, re-fetch, and crash via an in-flight disconnect
doTxNotify{peer: "B", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}},
doWait{time: txArriveTimeout, step: true},
doFunc(func() {
proceed <- struct{}{} // Allow peer A to return the failure
}),
doWait{time: 0, step: true},
doWait{time: txFetchTimeout, step: true},
},
})
}
// This test ensures the blob transactions will be scheduled for fetching
// once they are announced in the network.
func TestBlobTransactionAnnounce(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
// Initial announcement to get something into the waitlist
doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x02}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{111, 222}},
isWaiting(map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
},
}),
// Announce a blob transaction
doTxNotify{peer: "B", hashes: []common.Hash{{0x03}}, types: []byte{types.BlobTxType}, sizes: []uint32{333}},
isWaiting(map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
},
"B": {
{common.Hash{0x03}, types.BlobTxType, 333},
},
}),
doWait{time: 0, step: true}, // zero time, but the blob fetching should be scheduled
isWaiting(map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
},
}),
isScheduled{
tracking: map[string][]announce{
"B": {
{common.Hash{0x03}, types.BlobTxType, 333},
},
},
fetching: map[string][]common.Hash{ // Depends on deterministic test randomizer
"B": {{0x03}},
},
},
doWait{time: txArriveTimeout, step: true}, // zero time, but the blob fetching should be scheduled
isWaiting(nil),
isScheduled{
tracking: map[string][]announce{
"A": {
{common.Hash{0x01}, types.LegacyTxType, 111},
{common.Hash{0x02}, types.LegacyTxType, 222},
},
"B": {
{common.Hash{0x03}, types.BlobTxType, 333},
},
},
fetching: map[string][]common.Hash{ // Depends on deterministic test randomizer
"A": {{0x01}, {0x02}},
"B": {{0x03}},
},
},
},
})
}
func testTransactionFetcherParallel(t *testing.T, tt txFetcherTest) {
t.Parallel()
testTransactionFetcher(t, tt)
}
func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
// Create a fetcher and hook into it's simulated fields
clock := new(mclock.Simulated)
wait := make(chan struct{})
fetcher := tt.init()
fetcher.clock = clock
fetcher.step = wait
fetcher.rand = rand.New(rand.NewSource(0x3a29))
fetcher.Start()
defer fetcher.Stop()
defer func() { // drain the wait chan on exit
for {
select {
case <-wait:
default:
return
}
}
}()
// Crunch through all the test steps and execute them
for i, step := range tt.steps {
// Process the original or expanded steps
switch step := step.(type) {
case doTxNotify:
if err := fetcher.Notify(step.peer, step.types, step.sizes, step.hashes); err != nil {
t.Errorf("step %d: %v", i, err)
}
<-wait // Fetcher needs to process this, wait until it's done
select {
case <-wait:
panic("wtf")
case <-time.After(time.Millisecond):
}
case doTxEnqueue:
if err := fetcher.Enqueue(step.peer, step.txs, step.direct); err != nil {
t.Errorf("step %d: %v", i, err)
}
<-wait // Fetcher needs to process this, wait until it's done
case doWait:
clock.Run(step.time)
if step.step {
<-wait // Fetcher supposed to do something, wait until it's done
}
case doDrop:
if err := fetcher.Drop(string(step)); err != nil {
t.Errorf("step %d: %v", i, err)
}
<-wait // Fetcher needs to process this, wait until it's done
case doFunc:
step()
case isWaiting:
// We need to check that the waiting list (stage 1) internals
// match with the expected set. Check the peer->hash mappings
// first.
for peer, announces := range step {
waiting := fetcher.waitslots[peer]
if waiting == nil {
t.Errorf("step %d: peer %s missing from waitslots", i, peer)
continue
}
for _, ann := range announces {
if meta, ok := waiting[ann.hash]; !ok {
t.Errorf("step %d, peer %s: hash %x missing from waitslots", i, peer, ann.hash)
} else {
if meta.kind != ann.kind || meta.size != ann.size {
t.Errorf("step %d, peer %s, hash %x: waitslot metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, ann.kind, ann.size)
}
}
}
for hash, meta := range waiting {
ann := announce{hash: hash, kind: meta.kind, size: meta.size}
if !containsAnnounce(announces, ann) {
t.Errorf("step %d, peer %s: announce %v extra in waitslots", i, peer, ann)
}
}
}
for peer := range fetcher.waitslots {
if _, ok := step[peer]; !ok {
t.Errorf("step %d: peer %s extra in waitslots", i, peer)
}
}
// Peer->hash sets correct, check the hash->peer and timeout sets
for peer, announces := range step {
for _, ann := range announces {
if _, ok := fetcher.waitlist[ann.hash][peer]; !ok {
t.Errorf("step %d, hash %x: peer %s missing from waitlist", i, ann.hash, peer)
}
if _, ok := fetcher.waittime[ann.hash]; !ok {
t.Errorf("step %d: hash %x missing from waittime", i, ann.hash)
}
}
}
for hash, peers := range fetcher.waitlist {
if len(peers) == 0 {
t.Errorf("step %d, hash %x: empty peerset in waitlist", i, hash)
}
for peer := range peers {
if !containsHashInAnnounces(step[peer], hash) {
t.Errorf("step %d, hash %x: peer %s extra in waitlist", i, hash, peer)
}
}
}
for hash := range fetcher.waittime {
var found bool
for _, announces := range step {
if containsHashInAnnounces(announces, hash) {
found = true
break
}
}
if !found {
t.Errorf("step %d,: hash %x extra in waittime", i, hash)
}
}
case isScheduled:
// Check that all scheduled announces are accounted for and no
// extra ones are present.
for peer, announces := range step.tracking {
scheduled := fetcher.announces[peer]
if scheduled == nil {
t.Errorf("step %d: peer %s missing from announces", i, peer)
continue
}
for _, ann := range announces {
if meta, ok := scheduled[ann.hash]; !ok {
t.Errorf("step %d, peer %s: hash %x missing from announces", i, peer, ann.hash)
} else {
if meta.kind != ann.kind || meta.size != ann.size {
t.Errorf("step %d, peer %s, hash %x: announce metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, ann.kind, ann.size)
}
}
}
for hash, meta := range scheduled {
ann := announce{hash: hash, kind: meta.kind, size: meta.size}
if !containsAnnounce(announces, ann) {
t.Errorf("step %d, peer %s: announce %x extra in announces", i, peer, hash)
}
}
}
for peer := range fetcher.announces {
if _, ok := step.tracking[peer]; !ok {
t.Errorf("step %d: peer %s extra in announces", i, peer)
}
}
// Check that all announces required to be fetching are in the
// appropriate sets
for peer, hashes := range step.fetching {
request := fetcher.requests[peer]
if request == nil {
t.Errorf("step %d: peer %s missing from requests", i, peer)
continue
}
for _, hash := range hashes {
if !slices.Contains(request.hashes, hash) {
t.Errorf("step %d, peer %s: hash %x missing from requests", i, peer, hash)
}
}
for _, hash := range request.hashes {
if !slices.Contains(hashes, hash) {
t.Errorf("step %d, peer %s: hash %x extra in requests", i, peer, hash)
}
}
}
for peer := range fetcher.requests {
if _, ok := step.fetching[peer]; !ok {
if _, ok := step.dangling[peer]; !ok {
t.Errorf("step %d: peer %s extra in requests", i, peer)
}
}
}
for peer, hashes := range step.fetching {
for _, hash := range hashes {
if _, ok := fetcher.fetching[hash]; !ok {
t.Errorf("step %d, peer %s: hash %x missing from fetching", i, peer, hash)
}
}
}
for hash := range fetcher.fetching {
var found bool
for _, req := range fetcher.requests {
if slices.Contains(req.hashes, hash) {
found = true
break
}
}
if !found {
t.Errorf("step %d: hash %x extra in fetching", i, hash)
}
}
for _, hashes := range step.fetching {
for _, hash := range hashes {
alternates := fetcher.alternates[hash]
if alternates == nil {
t.Errorf("step %d: hash %x missing from alternates", i, hash)
continue
}
for peer := range alternates {
if _, ok := fetcher.announces[peer]; !ok {
t.Errorf("step %d: peer %s extra in alternates", i, peer)
continue
}
if _, ok := fetcher.announces[peer][hash]; !ok {
t.Errorf("step %d, peer %s: hash %x extra in alternates", i, hash, peer)
continue
}
}
for p := range fetcher.announced[hash] {
if _, ok := alternates[p]; !ok {
t.Errorf("step %d, hash %x: peer %s missing from alternates", i, hash, p)
continue
}
}
}
}
for peer, hashes := range step.dangling {
request := fetcher.requests[peer]
if request == nil {
t.Errorf("step %d: peer %s missing from requests", i, peer)
continue
}
for _, hash := range hashes {
if !slices.Contains(request.hashes, hash) {
t.Errorf("step %d, peer %s: hash %x missing from requests", i, peer, hash)
}
}
for _, hash := range request.hashes {
if !slices.Contains(hashes, hash) {
t.Errorf("step %d, peer %s: hash %x extra in requests", i, peer, hash)
}
}
}
// Check that all transaction announces that are scheduled for
// retrieval but not actively being downloaded are tracked only
// in the stage 2 `announced` map.
var queued []common.Hash
for _, announces := range step.tracking {
for _, ann := range announces {
var found bool
for _, hs := range step.fetching {
if slices.Contains(hs, ann.hash) {
found = true
break
}
}
if !found {
queued = append(queued, ann.hash)
}
}
}
for _, hash := range queued {
if _, ok := fetcher.announced[hash]; !ok {
t.Errorf("step %d: hash %x missing from announced", i, hash)
}
}
for hash := range fetcher.announced {
if !slices.Contains(queued, hash) {
t.Errorf("step %d: hash %x extra in announced", i, hash)
}
}
case isUnderpriced:
if fetcher.underpriced.Len() != int(step) {
t.Errorf("step %d: underpriced set size mismatch: have %d, want %d", i, fetcher.underpriced.Len(), step)
}
default:
t.Fatalf("step %d: unknown step type %T", i, step)
}
// After every step, cross validate the internal uniqueness invariants
// between stage one and stage two.
for hash := range fetcher.waittime {
if _, ok := fetcher.announced[hash]; ok {
t.Errorf("step %d: hash %s present in both stage 1 and 2", i, hash)
}
}
}
}
// containsAnnounce returns whether an announcement is contained within a slice
// of announcements.
func containsAnnounce(slice []announce, ann announce) bool {
for _, have := range slice {
if have.hash == ann.hash {
if have.kind != ann.kind {
return false
}
if have.size != ann.size {
return false
}
return true
}
}
return false
}
// containsHashInAnnounces returns whether a hash is contained within a slice
// of announcements.
func containsHashInAnnounces(slice []announce, hash common.Hash) bool {
for _, have := range slice {
if have.hash == hash {
return true
}
}
return false
}
// Tests that a transaction is forgotten after the timeout.
func TestTransactionForgotten(t *testing.T) {
fetcher := NewTxFetcher(
func(common.Hash) bool { return false },
func(txs []*types.Transaction) []error {
errs := make([]error, len(txs))
for i := 0; i < len(errs); i++ {
errs[i] = txpool.ErrUnderpriced
}
return errs
},
func(string, []common.Hash) error { return nil },
func(string) {},
)
fetcher.Start()
defer fetcher.Stop()
// Create one TX which is 5 minutes old, and one which is recent
tx1 := types.NewTx(&types.LegacyTx{Nonce: 0})
tx1.SetTime(time.Now().Add(-maxTxUnderpricedTimeout - 1*time.Second))
tx2 := types.NewTx(&types.LegacyTx{Nonce: 1})
// Enqueue both in the fetcher. They will be immediately tagged as underpriced
if err := fetcher.Enqueue("asdf", []*types.Transaction{tx1, tx2}, false); err != nil {
t.Fatal(err)
}
// isKnownUnderpriced should trigger removal of the first tx (no longer be known underpriced)
if fetcher.isKnownUnderpriced(tx1.Hash()) {
t.Fatal("transaction should be forgotten by now")
}
// isKnownUnderpriced should not trigger removal of the second
if !fetcher.isKnownUnderpriced(tx2.Hash()) {
t.Fatal("transaction should be known underpriced")
}
}