From 84565dc899ebad48862a1fcec039594833f83669 Mon Sep 17 00:00:00 2001 From: lightclient <14004106+lightclient@users.noreply.github.com> Date: Mon, 19 Aug 2024 15:32:15 -0600 Subject: [PATCH] eth/catalyst: ensure period zero mode leaves no pending txs in pool (#30264) closes #29475, replaces #29657, #30104 Fixes two issues. First is a deadlock where the txpool attempts to reorg, but can't complete because there are no readers left for the new txs subscription. Second, resolves a problem with on demand mode where txs may be left pending when there are more pending txs than block space. Co-authored-by: Martin Holst Swende --- eth/catalyst/api.go | 21 ++----- eth/catalyst/simulated_beacon.go | 87 ++++++++++++++++----------- eth/catalyst/simulated_beacon_api.go | 72 +++++++++++++++++----- eth/catalyst/simulated_beacon_test.go | 68 ++++++++++++++++++++- 4 files changed, 178 insertions(+), 70 deletions(-) diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index 5a001e1ee8..48bfb021b9 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -184,7 +184,7 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update engine.ForkchoiceStateV1, pa return engine.STATUS_INVALID, engine.InvalidParams.With(errors.New("forkChoiceUpdateV1 called post-shanghai")) } } - return api.forkchoiceUpdated(update, payloadAttributes, engine.PayloadV1, false) + return api.forkchoiceUpdated(update, payloadAttributes, engine.PayloadV1) } // ForkchoiceUpdatedV2 is equivalent to V1 with the addition of withdrawals in the payload @@ -207,7 +207,7 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV2(update engine.ForkchoiceStateV1, pa return engine.STATUS_INVALID, engine.UnsupportedFork.With(errors.New("forkchoiceUpdatedV2 must only be called with paris and shanghai payloads")) } } - return api.forkchoiceUpdated(update, params, engine.PayloadV2, false) + return api.forkchoiceUpdated(update, params, engine.PayloadV2) } // ForkchoiceUpdatedV3 is equivalent to V2 with the addition of parent beacon block root @@ -228,10 +228,10 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV3(update engine.ForkchoiceStateV1, pa // hash, even if params are wrong. To do this we need to split up // forkchoiceUpdate into a function that only updates the head and then a // function that kicks off block construction. - return api.forkchoiceUpdated(update, params, engine.PayloadV3, false) + return api.forkchoiceUpdated(update, params, engine.PayloadV3) } -func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes, payloadVersion engine.PayloadVersion, simulatorMode bool) (engine.ForkChoiceResponse, error) { +func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes, payloadVersion engine.PayloadVersion) (engine.ForkChoiceResponse, error) { api.forkchoiceLock.Lock() defer api.forkchoiceLock.Unlock() @@ -374,19 +374,6 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl if api.localBlocks.has(id) { return valid(&id), nil } - // If the beacon chain is ran by a simulator, then transaction insertion, - // block insertion and block production will happen without any timing - // delay between them. This will cause flaky simulator executions due to - // the transaction pool running its internal reset operation on a back- - // ground thread. To avoid the racey behavior - in simulator mode - the - // pool will be explicitly blocked on its reset before continuing to the - // block production below. - if simulatorMode { - if err := api.eth.TxPool().Sync(); err != nil { - log.Error("Failed to sync transaction pool", "err", err) - return valid(nil), engine.InvalidPayloadAttributes.With(err) - } - } payload, err := api.eth.Miner().BuildPayload(args) if err != nil { log.Error("Failed to build payload", "err", err) diff --git a/eth/catalyst/simulated_beacon.go b/eth/catalyst/simulated_beacon.go index 8bdf94b80e..86355a1533 100644 --- a/eth/catalyst/simulated_beacon.go +++ b/eth/catalyst/simulated_beacon.go @@ -20,6 +20,7 @@ import ( "crypto/rand" "crypto/sha256" "errors" + "fmt" "math/big" "sync" "time" @@ -30,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto/kzg4844" "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/params" @@ -41,36 +43,46 @@ const devEpochLength = 32 // withdrawalQueue implements a FIFO queue which holds withdrawals that are // pending inclusion. type withdrawalQueue struct { - pending chan *types.Withdrawal + pending types.Withdrawals + mu sync.Mutex + feed event.Feed + subs event.SubscriptionScope } +type newWithdrawalsEvent struct{ Withdrawals types.Withdrawals } + // add queues a withdrawal for future inclusion. func (w *withdrawalQueue) add(withdrawal *types.Withdrawal) error { - select { - case w.pending <- withdrawal: - break - default: - return errors.New("withdrawal queue full") - } + w.mu.Lock() + w.pending = append(w.pending, withdrawal) + w.mu.Unlock() + + w.feed.Send(newWithdrawalsEvent{types.Withdrawals{withdrawal}}) return nil } -// gatherPending returns a number of queued withdrawals up to a maximum count. -func (w *withdrawalQueue) gatherPending(maxCount int) []*types.Withdrawal { - withdrawals := []*types.Withdrawal{} - for { - select { - case withdrawal := <-w.pending: - withdrawals = append(withdrawals, withdrawal) - if len(withdrawals) == maxCount { - return withdrawals - } - default: - return withdrawals - } - } +// pop dequeues the specified number of withdrawals from the queue. +func (w *withdrawalQueue) pop(count int) types.Withdrawals { + w.mu.Lock() + defer w.mu.Unlock() + + count = min(count, len(w.pending)) + popped := w.pending[0:count] + w.pending = w.pending[count:] + + return popped } +// subscribe allows a listener to be updated when new withdrawals are added to +// the queue. +func (w *withdrawalQueue) subscribe(ch chan<- newWithdrawalsEvent) event.Subscription { + sub := w.feed.Subscribe(ch) + return w.subs.Track(sub) +} + +// SimulatedBeacon drives an Ethereum instance as if it were a real beacon +// client. It can run in period mode where it mines a new block every period +// (seconds) or on every transaction via Commit, Fork and AdjustTime. type SimulatedBeacon struct { shutdownCh chan struct{} eth *eth.Ethereum @@ -86,10 +98,6 @@ type SimulatedBeacon struct { } // NewSimulatedBeacon constructs a new simulated beacon chain. -// Period sets the period in which blocks should be produced. -// -// - If period is set to 0, a block is produced on every transaction. -// via Commit, Fork and AdjustTime. func NewSimulatedBeacon(period uint64, eth *eth.Ethereum) (*SimulatedBeacon, error) { block := eth.BlockChain().CurrentBlock() current := engine.ForkchoiceStateV1{ @@ -112,7 +120,6 @@ func NewSimulatedBeacon(period uint64, eth *eth.Ethereum) (*SimulatedBeacon, err engineAPI: engineAPI, lastBlockTime: block.Time, curForkchoiceState: current, - withdrawals: withdrawalQueue{make(chan *types.Withdrawal, 20)}, }, nil } @@ -156,6 +163,16 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u c.setCurrentState(header.Hash(), *finalizedHash) } + // Because transaction insertion, block insertion, and block production will + // happen without any timing delay between them in simulator mode and the + // transaction pool will be running its internal reset operation on a + // background thread, flaky executions can happen. To avoid the racey + // behavior, the pool will be explicitly blocked on its reset before + // continuing to the block production below. + if err := c.eth.APIBackend.TxPool().Sync(); err != nil { + return fmt.Errorf("failed to sync txpool: %w", err) + } + var random [32]byte rand.Read(random[:]) fcResponse, err := c.engineAPI.forkchoiceUpdated(c.curForkchoiceState, &engine.PayloadAttributes{ @@ -164,13 +181,14 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u Withdrawals: withdrawals, Random: random, BeaconRoot: &common.Hash{}, - }, engine.PayloadV3, true) + }, engine.PayloadV3) if err != nil { return err } if fcResponse == engine.STATUS_SYNCING { return errors.New("chain rewind prevented invocation of payload creation") } + envelope, err := c.engineAPI.getPayload(*fcResponse.PayloadID, true) if err != nil { return err @@ -223,8 +241,7 @@ func (c *SimulatedBeacon) loop() { case <-c.shutdownCh: return case <-timer.C: - withdrawals := c.withdrawals.gatherPending(10) - if err := c.sealBlock(withdrawals, uint64(time.Now().Unix())); err != nil { + if err := c.sealBlock(c.withdrawals.pop(10), uint64(time.Now().Unix())); err != nil { log.Warn("Error performing sealing work", "err", err) } else { timer.Reset(time.Second * time.Duration(c.period)) @@ -260,7 +277,7 @@ func (c *SimulatedBeacon) setCurrentState(headHash, finalizedHash common.Hash) { // Commit seals a block on demand. func (c *SimulatedBeacon) Commit() common.Hash { - withdrawals := c.withdrawals.gatherPending(10) + withdrawals := c.withdrawals.pop(10) if err := c.sealBlock(withdrawals, uint64(time.Now().Unix())); err != nil { log.Warn("Error performing sealing work", "err", err) } @@ -301,16 +318,14 @@ func (c *SimulatedBeacon) AdjustTime(adjustment time.Duration) error { if parent == nil { return errors.New("parent not found") } - withdrawals := c.withdrawals.gatherPending(10) + withdrawals := c.withdrawals.pop(10) return c.sealBlock(withdrawals, parent.Time+uint64(adjustment/time.Second)) } +// RegisterSimulatedBeaconAPIs registers the simulated beacon's API with the +// stack. func RegisterSimulatedBeaconAPIs(stack *node.Node, sim *SimulatedBeacon) { - api := &api{sim} - if sim.period == 0 { - // mine on demand if period is set to 0 - go api.loop() - } + api := newSimulatedBeaconAPI(sim) stack.RegisterAPIs([]rpc.API{ { Namespace: "dev", diff --git a/eth/catalyst/simulated_beacon_api.go b/eth/catalyst/simulated_beacon_api.go index 73d0a5921d..6687805315 100644 --- a/eth/catalyst/simulated_beacon_api.go +++ b/eth/catalyst/simulated_beacon_api.go @@ -18,44 +18,88 @@ package catalyst import ( "context" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/log" ) -type api struct { +// simulatedBeaconAPI provides a RPC API for SimulatedBeacon. +type simulatedBeaconAPI struct { sim *SimulatedBeacon } -func (a *api) loop() { +// newSimulatedBeaconAPI returns an instance of simulatedBeaconAPI with a +// buffered commit channel. If period is zero, it starts a goroutine to handle +// new tx events. +func newSimulatedBeaconAPI(sim *SimulatedBeacon) *simulatedBeaconAPI { + api := &simulatedBeaconAPI{sim: sim} + if sim.period == 0 { + // mine on demand if period is set to 0 + go api.loop() + } + return api +} + +// loop is the main loop for the API when it's running in period = 0 mode. It +// ensures that block production is triggered as soon as a new withdrawal or +// transaction is received. +func (a *simulatedBeaconAPI) loop() { var ( - newTxs = make(chan core.NewTxsEvent) - sub = a.sim.eth.TxPool().SubscribeTransactions(newTxs, true) + newTxs = make(chan core.NewTxsEvent) + newWxs = make(chan newWithdrawalsEvent) + newTxsSub = a.sim.eth.TxPool().SubscribeTransactions(newTxs, true) + newWxsSub = a.sim.withdrawals.subscribe(newWxs) + doCommit = make(chan struct{}, 1) ) - defer sub.Unsubscribe() + defer newTxsSub.Unsubscribe() + defer newWxsSub.Unsubscribe() + + // A background thread which signals to the simulator when to commit + // based on messages over doCommit. + go func() { + for range doCommit { + a.sim.Commit() + a.sim.eth.TxPool().Sync() + + // It's worth noting that in case a tx ends up in the pool listed as + // "executable", but for whatever reason the miner does not include it in + // a block -- maybe the miner is enforcing a higher tip than the pool -- + // this code will spinloop. + for { + if executable, _ := a.sim.eth.TxPool().Stats(); executable == 0 { + break + } + a.sim.Commit() + } + } + }() for { select { case <-a.sim.shutdownCh: + close(doCommit) return - case w := <-a.sim.withdrawals.pending: - withdrawals := append(a.sim.withdrawals.gatherPending(9), w) - if err := a.sim.sealBlock(withdrawals, uint64(time.Now().Unix())); err != nil { - log.Warn("Error performing sealing work", "err", err) + case <-newWxs: + select { + case doCommit <- struct{}{}: + default: } case <-newTxs: - a.sim.Commit() + select { + case doCommit <- struct{}{}: + default: + } } } } -func (a *api) AddWithdrawal(ctx context.Context, withdrawal *types.Withdrawal) error { +// AddWithdrawal adds a withdrawal to the pending queue. +func (a *simulatedBeaconAPI) AddWithdrawal(ctx context.Context, withdrawal *types.Withdrawal) error { return a.sim.withdrawals.add(withdrawal) } -func (a *api) SetFeeRecipient(ctx context.Context, feeRecipient common.Address) { +// SetFeeRecipient sets the fee recipient for block building purposes. +func (a *simulatedBeaconAPI) SetFeeRecipient(ctx context.Context, feeRecipient common.Address) { a.sim.setFeeRecipient(feeRecipient) } diff --git a/eth/catalyst/simulated_beacon_test.go b/eth/catalyst/simulated_beacon_test.go index bb10938c35..711e8f1d60 100644 --- a/eth/catalyst/simulated_beacon_test.go +++ b/eth/catalyst/simulated_beacon_test.go @@ -35,7 +35,7 @@ import ( "github.com/ethereum/go-ethereum/params" ) -func startSimulatedBeaconEthService(t *testing.T, genesis *core.Genesis) (*node.Node, *eth.Ethereum, *SimulatedBeacon) { +func startSimulatedBeaconEthService(t *testing.T, genesis *core.Genesis, period uint64) (*node.Node, *eth.Ethereum, *SimulatedBeacon) { t.Helper() n, err := node.New(&node.Config{ @@ -55,7 +55,7 @@ func startSimulatedBeaconEthService(t *testing.T, genesis *core.Genesis) (*node. t.Fatal("can't create eth service:", err) } - simBeacon, err := NewSimulatedBeacon(1, ethservice) + simBeacon, err := NewSimulatedBeacon(period, ethservice) if err != nil { t.Fatal("can't create simulated beacon:", err) } @@ -87,7 +87,7 @@ func TestSimulatedBeaconSendWithdrawals(t *testing.T) { // short period (1 second) for testing purposes var gasLimit uint64 = 10_000_000 genesis := core.DeveloperGenesisBlock(gasLimit, &testAddr) - node, ethService, mock := startSimulatedBeaconEthService(t, genesis) + node, ethService, mock := startSimulatedBeaconEthService(t, genesis, 1) _ = mock defer node.Close() @@ -140,3 +140,65 @@ func TestSimulatedBeaconSendWithdrawals(t *testing.T) { } } } + +// Tests that zero-period dev mode can handle a lot of simultaneous +// transactions/withdrawals +func TestOnDemandSpam(t *testing.T) { + var ( + withdrawals []types.Withdrawal + txs = make(map[common.Hash]*types.Transaction) + testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + testAddr = crypto.PubkeyToAddress(testKey.PublicKey) + gasLimit uint64 = 10_000_000 + genesis = core.DeveloperGenesisBlock(gasLimit, &testAddr) + node, eth, mock = startSimulatedBeaconEthService(t, genesis, 0) + _ = newSimulatedBeaconAPI(mock) + signer = types.LatestSigner(eth.BlockChain().Config()) + chainHeadCh = make(chan core.ChainHeadEvent, 100) + sub = eth.BlockChain().SubscribeChainHeadEvent(chainHeadCh) + ) + defer node.Close() + defer sub.Unsubscribe() + + // generate some withdrawals + for i := 0; i < 20; i++ { + withdrawals = append(withdrawals, types.Withdrawal{Index: uint64(i)}) + if err := mock.withdrawals.add(&withdrawals[i]); err != nil { + t.Fatal("addWithdrawal failed", err) + } + } + + // generate a bunch of transactions + for i := 0; i < 20000; i++ { + tx, err := types.SignTx(types.NewTransaction(uint64(i), common.Address{byte(i), byte(1)}, big.NewInt(1000), params.TxGas, big.NewInt(params.InitialBaseFee*2), nil), signer, testKey) + if err != nil { + t.Fatal("error signing transaction", err) + } + txs[tx.Hash()] = tx + if err := eth.APIBackend.SendTx(context.Background(), tx); err != nil { + t.Fatal("error adding txs to pool", err) + } + } + + var ( + includedTxs = make(map[common.Hash]struct{}) + includedWxs []uint64 + ) + for { + select { + case evt := <-chainHeadCh: + for _, itx := range evt.Block.Transactions() { + includedTxs[itx.Hash()] = struct{}{} + } + for _, iwx := range evt.Block.Withdrawals() { + includedWxs = append(includedWxs, iwx.Index) + } + // ensure all withdrawals/txs included. this will take two blocks b/c number of withdrawals > 10 + if len(includedTxs) == len(txs) && len(includedWxs) == len(withdrawals) { + return + } + case <-time.After(10 * time.Second): + t.Fatalf("timed out without including all withdrawals/txs: have txs %d, want %d, have wxs %d, want %d", len(includedTxs), len(txs), len(includedWxs), len(withdrawals)) + } + } +}