From 6eb8f3225e23f392705a072b534627a9ff637fe1 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Wed, 21 Sep 2022 13:48:09 +0200 Subject: [PATCH] eth/catalyst: add locking around newpayload (#25816) Sometimes we get stuck on db compaction, and the CL re-issues the "same" command to us multiple times. Each request get stuck on the same place, in the middle of the handler. This changes makes it so we do not reprocess the same payload, but instead detects it early. --- eth/catalyst/api.go | 17 ++++++ eth/catalyst/api_test.go | 112 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 124 insertions(+), 5 deletions(-) diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index 754d8b214c..2756a02e2f 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -120,6 +120,7 @@ type ConsensusAPI struct { lastNewPayloadLock sync.Mutex forkchoiceLock sync.Mutex // Lock for the forkChoiceUpdated method + newPayloadLock sync.Mutex // Lock for the NewPayload method } // NewConsensusAPI creates a new consensus api for the given backend. @@ -342,6 +343,22 @@ func (api *ConsensusAPI) GetPayloadV1(payloadID beacon.PayloadID) (*beacon.Execu // NewPayloadV1 creates an Eth1 block, inserts it in the chain, and returns the status of the chain. func (api *ConsensusAPI) NewPayloadV1(params beacon.ExecutableDataV1) (beacon.PayloadStatusV1, error) { + // The locking here is, strictly, not required. Without these locks, this can happen: + // + // 1. NewPayload( execdata-N ) is invoked from the CL. It goes all the way down to + // api.eth.BlockChain().InsertBlockWithoutSetHead, where it is blocked on + // e.g database compaction. + // 2. The call times out on the CL layer, which issues another NewPayload (execdata-N) call. + // Similarly, this also get stuck on the same place. Importantly, since the + // first call has not gone through, the early checks for "do we already have this block" + // will all return false. + // 3. When the db compaction ends, then N calls inserting the same payload are processed + // sequentially. + // Hence, we use a lock here, to be sure that the previous call has finished before we + // check whether we already have the block locally. + api.newPayloadLock.Lock() + defer api.newPayloadLock.Unlock() + log.Trace("Engine API request received", "method", "ExecutePayload", "number", params.Number, "hash", params.BlockHash) block, err := beacon.ExecutableDataToBlock(params) if err != nil { diff --git a/eth/catalyst/api_test.go b/eth/catalyst/api_test.go index ae53462ff8..ec26f35e8d 100644 --- a/eth/catalyst/api_test.go +++ b/eth/catalyst/api_test.go @@ -20,6 +20,7 @@ import ( "bytes" "fmt" "math/big" + "sync" "testing" "time" @@ -277,10 +278,12 @@ func TestEth2NewBlock(t *testing.T) { t.Fatalf("Failed to convert executable data to block %v", err) } newResp, err := api.NewPayloadV1(*execData) - if err != nil || newResp.Status != "VALID" { + switch { + case err != nil: t.Fatalf("Failed to insert block: %v", err) - } - if ethservice.BlockChain().CurrentBlock().NumberU64() != block.NumberU64()-1 { + case newResp.Status != "VALID": + t.Fatalf("Failed to insert block: %v", newResp.Status) + case ethservice.BlockChain().CurrentBlock().NumberU64() != block.NumberU64()-1: t.Fatalf("Chain head shouldn't be updated") } checkLogEvents(t, newLogCh, rmLogsCh, 0, 0) @@ -292,8 +295,8 @@ func TestEth2NewBlock(t *testing.T) { if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil { t.Fatalf("Failed to insert block: %v", err) } - if ethservice.BlockChain().CurrentBlock().NumberU64() != block.NumberU64() { - t.Fatalf("Chain head should be updated") + if have, want := ethservice.BlockChain().CurrentBlock().NumberU64(), block.NumberU64(); have != want { + t.Fatalf("Chain head should be updated, have %d want %d", have, want) } checkLogEvents(t, newLogCh, rmLogsCh, 1, 0) @@ -855,3 +858,102 @@ func TestNewPayloadOnInvalidTerminalBlock(t *testing.T) { t.Fatalf("error sending invalid forkchoice, invalid status: %v", resp.PayloadStatus.Status) } } + +// TestSimultaneousNewBlock does several parallel inserts, both as +// newPayLoad and forkchoiceUpdate. This is to test that the api behaves +// well even of the caller is not being 'serial'. +func TestSimultaneousNewBlock(t *testing.T) { + genesis, preMergeBlocks := generatePreMergeChain(10) + n, ethservice := startEthService(t, genesis, preMergeBlocks) + defer n.Close() + + var ( + api = NewConsensusAPI(ethservice) + parent = preMergeBlocks[len(preMergeBlocks)-1] + ) + for i := 0; i < 10; i++ { + statedb, _ := ethservice.BlockChain().StateAt(parent.Root()) + ethservice.TxPool().AddLocal(types.MustSignNewTx(testKey, types.LatestSigner(ethservice.BlockChain().Config()), + &types.DynamicFeeTx{ + Nonce: statedb.GetNonce(testAddr), + Value: big.NewInt(0), + GasFeeCap: big.NewInt(2 * params.InitialBaseFee), + GasTipCap: big.NewInt(2 * params.InitialBaseFee), + ChainID: genesis.Config.ChainID, + Gas: 1000000, + To: &common.Address{99}, + })) + execData, err := assembleBlock(api, parent.Hash(), &beacon.PayloadAttributesV1{ + Timestamp: parent.Time() + 5, + }) + if err != nil { + t.Fatalf("Failed to create the executable data %v", err) + } + // Insert it 10 times in parallel. Should be ignored. + { + var ( + wg sync.WaitGroup + testErr error + errMu sync.Mutex + ) + wg.Add(10) + for ii := 0; ii < 10; ii++ { + go func() { + defer wg.Done() + if newResp, err := api.NewPayloadV1(*execData); err != nil { + errMu.Lock() + testErr = fmt.Errorf("Failed to insert block: %w", err) + errMu.Unlock() + } else if newResp.Status != "VALID" { + errMu.Lock() + testErr = fmt.Errorf("Failed to insert block: %v", newResp.Status) + errMu.Unlock() + } + }() + } + wg.Wait() + if testErr != nil { + t.Fatal(testErr) + } + } + block, err := beacon.ExecutableDataToBlock(*execData) + if err != nil { + t.Fatalf("Failed to convert executable data to block %v", err) + } + if ethservice.BlockChain().CurrentBlock().NumberU64() != block.NumberU64()-1 { + t.Fatalf("Chain head shouldn't be updated") + } + fcState := beacon.ForkchoiceStateV1{ + HeadBlockHash: block.Hash(), + SafeBlockHash: block.Hash(), + FinalizedBlockHash: block.Hash(), + } + { + var ( + wg sync.WaitGroup + testErr error + errMu sync.Mutex + ) + wg.Add(10) + // Do each FCU 10 times + for ii := 0; ii < 10; ii++ { + go func() { + defer wg.Done() + if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil { + errMu.Lock() + testErr = fmt.Errorf("Failed to insert block: %w", err) + errMu.Unlock() + } + }() + } + wg.Wait() + if testErr != nil { + t.Fatal(testErr) + } + } + if have, want := ethservice.BlockChain().CurrentBlock().NumberU64(), block.NumberU64(); have != want { + t.Fatalf("Chain head should be updated, have %d want %d", have, want) + } + parent = block + } +}