Compare commits

..

27 Commits

Author SHA1 Message Date
galaio
3a3eb1d34f metrics: add more metrics; 2024-06-12 20:26:16 +08:00
galaio
a702225a27 metrics: add more metrics; 2024-06-12 20:18:23 +08:00
galaio
78b0f2940a metrics: add more metrics; 2024-06-12 15:17:25 +08:00
galaio
31a16b3fb8 metrics: add more metrics; 2024-06-12 14:55:32 +08:00
galaio
adf9b91f8f log: opt some log format; 2024-06-12 14:23:02 +08:00
galaio
312f1208f8 dag: fix some unexpected writes; 2024-06-11 21:07:14 +08:00
galaio
6957903492 log: add some logs; 2024-06-11 18:06:54 +08:00
galaio
2da9923d4f log: add some logs; 2024-06-11 17:43:21 +08:00
galaio
c6368a513c log: add some logs; 2024-06-11 16:57:10 +08:00
galaio
a55650056a log: add some logs; 2024-06-11 16:32:42 +08:00
galaio
71520fb59e log: add some logs; 2024-06-11 16:04:45 +08:00
galaio
7cb7800397 log: add some logs; 2024-06-11 15:55:46 +08:00
galaio
d97a34b0cb log: add some logs; 2024-06-11 14:14:14 +08:00
galaio
c20c0e680c log: add some logs; 2024-06-11 14:08:04 +08:00
galaio
b5194c043b log: add some logs; 2024-06-11 12:20:58 +08:00
galaio
6f8e9c570c log: add some logs; 2024-06-11 11:25:34 +08:00
galaio
68fa227b0c dag: opt read recoder logic; 2024-06-10 22:31:39 +08:00
galaio
2d8613a296 dag: opt read recoder logic; 2024-06-10 21:51:37 +08:00
galaio
533f592a05 dag: opt read recoder logic; 2024-06-10 21:44:44 +08:00
galaio
6d1d2ddea5 dag: opt evaluate function; 2024-06-10 20:53:14 +08:00
galaio
bb578de461 dag: opt evaluate function; 2024-06-09 00:01:14 +08:00
galaio
2d0ddc1f67 dag: support stats; 2024-06-08 14:33:24 +08:00
galaio
eff9d23bd2 mvstates: fix val equal check issue; 2024-06-07 22:46:30 +08:00
galaio
98fa1e6c44 mvstates: fix val equal check issue; 2024-06-07 22:39:44 +08:00
galaio
eafdc65814 statedb: fix some system tx issues; 2024-06-07 17:08:23 +08:00
galaio
1a787b6081 rwset: support collect rwset from statedb;
mvstates: support export DAG;
dag: support travel all execution paths;
2024-06-07 16:46:13 +08:00
galaio
1cb6989a30 dag: add basic data structures; 2024-06-06 16:25:12 +08:00
44 changed files with 1370 additions and 575 deletions

View File

@@ -1,42 +1,4 @@
# Changelog
## v1.4.10
### FEATURE
NA
### IMPROVEMENT
* [\#2512](https://github.com/bnb-chain/bsc/pull/2512) feat: add mev helper params and func
* [\#2508](https://github.com/bnb-chain/bsc/pull/2508) perf: speedup pbss trienode read
* [\#2509](https://github.com/bnb-chain/bsc/pull/2509) perf: optimize chain commit performance for multi-database
* [\#2451](https://github.com/bnb-chain/bsc/pull/2451) core/forkchoice: improve stability when inturn block not generate
### BUGFIX
* [\#2518](https://github.com/bnb-chain/bsc/pull/2518) fix: remove zero gasprice check for BSC
* [\#2519](https://github.com/bnb-chain/bsc/pull/2519) UT: random failure of TestSnapSyncWithBlobs
* [\#2515](https://github.com/bnb-chain/bsc/pull/2515) fix getBlobSidecars by ethclient
* [\#2525](https://github.com/bnb-chain/bsc/pull/2525) fix: ensure empty withdrawals after cancun before broadcast
## v1.4.9
### FEATURE
* [\#2463](https://github.com/bnb-chain/bsc/pull/2463) utils: add check_blobtx.js
* [\#2470](https://github.com/bnb-chain/bsc/pull/2470) jsutils: faucet successful requests within blocks
* [\#2467](https://github.com/bnb-chain/bsc/pull/2467) internal/ethapi: add optional parameter for blobSidecars
### IMPROVEMENT
* [\#2462](https://github.com/bnb-chain/bsc/pull/2462) cmd/utils: add a flag to change breathe block interval for testing
* [\#2497](https://github.com/bnb-chain/bsc/pull/2497) params/config: add Bohr hardfork
* [\#2479](https://github.com/bnb-chain/bsc/pull/2479) dev: ensure consistency in BPS bundle result
### BUGFIX
* [\#2461](https://github.com/bnb-chain/bsc/pull/2461) eth/handler: check lists in body before broadcast blocks
* [\#2455](https://github.com/bnb-chain/bsc/pull/2455) cmd: fix memory leak when big dataset
* [\#2466](https://github.com/bnb-chain/bsc/pull/2466) sync: fix some sync issues caused by prune-block.
* [\#2475](https://github.com/bnb-chain/bsc/pull/2475) fix: move mev op to MinerAPI & add command to console
* [\#2473](https://github.com/bnb-chain/bsc/pull/2473) fix: limit the gas price of the mev bid
* [\#2484](https://github.com/bnb-chain/bsc/pull/2484) fix: fix inspect database error
* [\#2481](https://github.com/bnb-chain/bsc/pull/2481) fix: keep 9W blocks in ancient db when prune block
* [\#2495](https://github.com/bnb-chain/bsc/pull/2495) fix: add an empty freeze db
* [\#2507](https://github.com/bnb-chain/bsc/pull/2507) fix: waiting for the last simulation before pick best bid
## v1.4.8
### FEATURE
* [\#2483](https://github.com/bnb-chain/bsc/pull/2483) core/vm: add secp256r1 into PrecompiledContractsHaber

View File

@@ -227,7 +227,7 @@ func (pre *Prestate) Apply(vmConfig vm.Config, chainConfig *params.ChainConfig,
return nil, nil, nil, err
}
vmConfig.Tracer = tracer
statedb.SetTxContext(tx.Hash(), txIndex)
statedb.SetTxContext(tx.Hash(), txIndex, 0)
var (
txContext = core.NewEVMTxContext(msg)

View File

@@ -406,7 +406,7 @@ func inspectTrie(ctx *cli.Context) error {
var err error
blockNumber, err = strconv.ParseUint(ctx.Args().Get(0), 10, 64)
if err != nil {
return fmt.Errorf("failed to parse blocknum, Args[0]: %v, err: %v", ctx.Args().Get(0), err)
return fmt.Errorf("failed to Parse blocknum, Args[0]: %v, err: %v", ctx.Args().Get(0), err)
}
}
@@ -417,26 +417,26 @@ func inspectTrie(ctx *cli.Context) error {
var err error
jobnum, err = strconv.ParseUint(ctx.Args().Get(1), 10, 64)
if err != nil {
return fmt.Errorf("failed to parse jobnum, Args[1]: %v, err: %v", ctx.Args().Get(1), err)
return fmt.Errorf("failed to Parse jobnum, Args[1]: %v, err: %v", ctx.Args().Get(1), err)
}
topN = 10
} else {
var err error
jobnum, err = strconv.ParseUint(ctx.Args().Get(1), 10, 64)
if err != nil {
return fmt.Errorf("failed to parse jobnum, Args[1]: %v, err: %v", ctx.Args().Get(1), err)
return fmt.Errorf("failed to Parse jobnum, Args[1]: %v, err: %v", ctx.Args().Get(1), err)
}
topN, err = strconv.ParseUint(ctx.Args().Get(2), 10, 64)
if err != nil {
return fmt.Errorf("failed to parse topn, Args[1]: %v, err: %v", ctx.Args().Get(1), err)
return fmt.Errorf("failed to Parse topn, Args[1]: %v, err: %v", ctx.Args().Get(1), err)
}
}
if blockNumber != math.MaxUint64 {
headerBlockHash = rawdb.ReadCanonicalHash(db, blockNumber)
if headerBlockHash == (common.Hash{}) {
return errors.New("ReadHeadBlockHash empty hash")
return errors.New("ReadHeadBlockHash empry hash")
}
blockHeader := rawdb.ReadHeader(db, headerBlockHash, blockNumber)
trieRootHash = blockHeader.Root
@@ -508,7 +508,7 @@ func ancientInspect(ctx *cli.Context) error {
stack, _ := makeConfigNode(ctx)
defer stack.Close()
db := utils.MakeChainDatabase(ctx, stack, true, false)
db := utils.MakeChainDatabase(ctx, stack, true, true)
defer db.Close()
return rawdb.AncientInspect(db)
}

View File

@@ -1824,7 +1824,7 @@ func (p *Parlia) applyTransaction(
// move to next
*receivedTxs = (*receivedTxs)[1:]
}
state.SetTxContext(expectedTx.Hash(), len(*txs))
state.SetTxContext(expectedTx.Hash(), len(*txs), 0)
gasUsed, err := applyMessage(msg, state, header, p.chainConfig, chainContext)
if err != nil {
return err

View File

@@ -301,7 +301,6 @@ type BlockChain struct {
diffLayerFreezerBlockLimit uint64
wg sync.WaitGroup
dbWg sync.WaitGroup
quit chan struct{} // shutdown signal, closed in Stop.
stopping atomic.Bool // false if chain is running, true when stopped
procInterrupt atomic.Bool // interrupt signaler for block processing
@@ -670,7 +669,7 @@ func (bc *BlockChain) cacheBlock(hash common.Hash, block *types.Block) {
// into node seamlessly.
func (bc *BlockChain) empty() bool {
genesis := bc.genesisBlock.Hash()
for _, hash := range []common.Hash{rawdb.ReadHeadBlockHash(bc.db.BlockStore()), rawdb.ReadHeadHeaderHash(bc.db.BlockStore()), rawdb.ReadHeadFastBlockHash(bc.db.BlockStore())} {
for _, hash := range []common.Hash{rawdb.ReadHeadBlockHash(bc.db.BlockStore()), rawdb.ReadHeadHeaderHash(bc.db.BlockStore()), rawdb.ReadHeadFastBlockHash(bc.db)} {
if hash != genesis {
return false
}
@@ -739,7 +738,7 @@ func (bc *BlockChain) loadLastState() error {
bc.currentSnapBlock.Store(headBlock.Header())
headFastBlockGauge.Update(int64(headBlock.NumberU64()))
if head := rawdb.ReadHeadFastBlockHash(bc.db.BlockStore()); head != (common.Hash{}) {
if head := rawdb.ReadHeadFastBlockHash(bc.db); head != (common.Hash{}) {
if block := bc.GetBlockByHash(head); block != nil {
bc.currentSnapBlock.Store(block.Header())
headFastBlockGauge.Update(int64(block.NumberU64()))
@@ -1138,7 +1137,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
// If SetHead was only called as a chain reparation method, try to skip
// touching the header chain altogether, unless the freezer is broken
if repair {
if target, force := updateFn(bc.db.BlockStore(), bc.CurrentBlock()); force {
if target, force := updateFn(bc.db, bc.CurrentBlock()); force {
bc.hc.SetHead(target.Number.Uint64(), updateFn, delFn)
}
} else {
@@ -1299,33 +1298,19 @@ func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
//
// Note, this function assumes that the `mu` mutex is held!
func (bc *BlockChain) writeHeadBlock(block *types.Block) {
bc.dbWg.Add(2)
defer bc.dbWg.Wait()
go func() {
defer bc.dbWg.Done()
// Add the block to the canonical chain number scheme and mark as the head
blockBatch := bc.db.BlockStore().NewBatch()
rawdb.WriteCanonicalHash(blockBatch, block.Hash(), block.NumberU64())
rawdb.WriteHeadHeaderHash(blockBatch, block.Hash())
rawdb.WriteHeadBlockHash(blockBatch, block.Hash())
rawdb.WriteHeadFastBlockHash(blockBatch, block.Hash())
// Flush the whole batch into the disk, exit the node if failed
if err := blockBatch.Write(); err != nil {
log.Crit("Failed to update chain indexes and markers in block db", "err", err)
}
}()
go func() {
defer bc.dbWg.Done()
rawdb.WriteCanonicalHash(bc.db.BlockStore(), block.Hash(), block.NumberU64())
rawdb.WriteHeadHeaderHash(bc.db.BlockStore(), block.Hash())
rawdb.WriteHeadBlockHash(bc.db.BlockStore(), block.Hash())
batch := bc.db.NewBatch()
rawdb.WriteHeadFastBlockHash(batch, block.Hash())
rawdb.WriteTxLookupEntriesByBlock(batch, block)
// Flush the whole batch into the disk, exit the node if failed
if err := batch.Write(); err != nil {
log.Crit("Failed to update chain indexes in chain db", "err", err)
log.Crit("Failed to update chain indexes and markers", "err", err)
}
}()
// Update all in-memory chain markers in the last step
bc.hc.SetCurrentHeader(block.Header())
@@ -1546,7 +1531,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
} else if !reorg {
return false
}
rawdb.WriteHeadFastBlockHash(bc.db.BlockStore(), head.Hash())
rawdb.WriteHeadFastBlockHash(bc.db, head.Hash())
bc.currentSnapBlock.Store(head.Header())
headFastBlockGauge.Update(int64(head.NumberU64()))
return true
@@ -1789,6 +1774,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
rawdb.WritePreimages(bc.db, state.Preimages())
blockBatch := bc.db.BlockStore().NewBatch()
rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), externTd)
rawdb.WriteBlock(blockBatch, block)
@@ -1797,11 +1783,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
if bc.chainConfig.IsCancun(block.Number(), block.Time()) {
rawdb.WriteBlobSidecars(blockBatch, block.Hash(), block.NumberU64(), block.Sidecars())
}
if bc.db.StateStore() != nil {
rawdb.WritePreimages(bc.db.StateStore(), state.Preimages())
} else {
rawdb.WritePreimages(blockBatch, state.Preimages())
}
if err := blockBatch.Write(); err != nil {
log.Crit("Failed to write block into disk", "err", err)
}
@@ -2325,7 +2307,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
blockWriteTimer.Update(time.Since(wstart) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits - statedb.TrieDBCommits)
blockInsertTimer.UpdateSince(start)
// Report the import stats before returning the various results
stats.processed++
stats.usedGas += usedGas

View File

@@ -119,7 +119,7 @@ func (b *BlockGen) addTx(bc *BlockChain, vmConfig vm.Config, tx *types.Transacti
if b.gasPool == nil {
b.SetCoinbase(common.Address{})
}
b.statedb.SetTxContext(tx.Hash(), len(b.txs))
b.statedb.SetTxContext(tx.Hash(), len(b.txs), 0)
receipt, err := ApplyTransaction(b.cm.config, bc, &b.header.Coinbase, b.gasPool, b.statedb, b.header, tx, &b.header.GasUsed, vmConfig, NewReceiptBloomGenerator())
if err != nil {
panic(err)

View File

@@ -114,9 +114,7 @@ func (f *ForkChoice) ReorgNeeded(current *types.Header, extern *types.Header) (b
if f.preserve != nil {
currentPreserve, externPreserve = f.preserve(current), f.preserve(extern)
}
reorg = !currentPreserve && (externPreserve ||
extern.Time < current.Time ||
extern.Time == current.Time && f.rand.Float64() < 0.5)
reorg = !currentPreserve && (externPreserve || f.rand.Float64() < 0.5)
}
return reorg, nil
}

View File

@@ -498,7 +498,7 @@ func (g *Genesis) Commit(db ethdb.Database, triedb *triedb.Database) (*types.Blo
rawdb.WriteReceipts(db.BlockStore(), block.Hash(), block.NumberU64(), nil)
rawdb.WriteCanonicalHash(db.BlockStore(), block.Hash(), block.NumberU64())
rawdb.WriteHeadBlockHash(db.BlockStore(), block.Hash())
rawdb.WriteHeadFastBlockHash(db.BlockStore(), block.Hash())
rawdb.WriteHeadFastBlockHash(db, block.Hash())
rawdb.WriteHeadHeaderHash(db.BlockStore(), block.Hash())
rawdb.WriteChainConfig(db, block.Hash(), config)
return block, nil

View File

@@ -668,7 +668,7 @@ func (hc *HeaderChain) setHead(headBlock uint64, headTime uint64, updateFn Updat
// first then remove the relative data from the database.
//
// Update head first(head fast block, head full block) before deleting the data.
markerBatch := hc.chainDb.BlockStore().NewBatch()
markerBatch := hc.chainDb.NewBatch()
if updateFn != nil {
newHead, force := updateFn(markerBatch, parent)
if force && ((headTime > 0 && newHead.Time < headTime) || (headTime == 0 && newHead.Number.Uint64() < headBlock)) {
@@ -677,7 +677,7 @@ func (hc *HeaderChain) setHead(headBlock uint64, headTime uint64, updateFn Updat
}
}
// Update head header then.
rawdb.WriteHeadHeaderHash(markerBatch, parentHash)
rawdb.WriteHeadHeaderHash(hc.chainDb.BlockStore(), parentHash)
if err := markerBatch.Write(); err != nil {
log.Crit("Failed to update chain markers", "error", err)
}

View File

@@ -81,7 +81,7 @@ func InitDatabaseFromFreezer(db ethdb.Database) {
batch.Reset()
WriteHeadHeaderHash(db.BlockStore(), hash)
WriteHeadFastBlockHash(db.BlockStore(), hash)
WriteHeadFastBlockHash(db, hash)
log.Info("Initialized database from freezer", "blocks", frozen, "elapsed", common.PrettyDuration(time.Since(start)))
}

View File

@@ -61,7 +61,7 @@ func (frdb *freezerdb) BlockStoreReader() ethdb.Reader {
}
func (frdb *freezerdb) BlockStoreWriter() ethdb.Writer {
// TODO implement me
//TODO implement me
panic("implement me")
}
@@ -193,7 +193,7 @@ func (db *nofreezedb) Ancients() (uint64, error) {
return 0, errNotSupported
}
// ItemAmountInAncient returns an error as we don't have a backing chain freezer.
// Ancients returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) ItemAmountInAncient() (uint64, error) {
return 0, errNotSupported
}
@@ -331,110 +331,6 @@ func NewDatabase(db ethdb.KeyValueStore) ethdb.Database {
return &nofreezedb{KeyValueStore: db}
}
type emptyfreezedb struct {
ethdb.KeyValueStore
}
// HasAncient returns nil for pruned db that we don't have a backing chain freezer.
func (db *emptyfreezedb) HasAncient(kind string, number uint64) (bool, error) {
return false, nil
}
// Ancient returns nil for pruned db that we don't have a backing chain freezer.
func (db *emptyfreezedb) Ancient(kind string, number uint64) ([]byte, error) {
return nil, nil
}
// AncientRange returns nil for pruned db that we don't have a backing chain freezer.
func (db *emptyfreezedb) AncientRange(kind string, start, max, maxByteSize uint64) ([][]byte, error) {
return nil, nil
}
// Ancients returns nil for pruned db that we don't have a backing chain freezer.
func (db *emptyfreezedb) Ancients() (uint64, error) {
return 0, nil
}
// ItemAmountInAncient returns nil for pruned db that we don't have a backing chain freezer.
func (db *emptyfreezedb) ItemAmountInAncient() (uint64, error) {
return 0, nil
}
// Tail returns nil for pruned db that we don't have a backing chain freezer.
func (db *emptyfreezedb) Tail() (uint64, error) {
return 0, nil
}
// AncientSize returns nil for pruned db that we don't have a backing chain freezer.
func (db *emptyfreezedb) AncientSize(kind string) (uint64, error) {
return 0, nil
}
// ModifyAncients returns nil for pruned db that we don't have a backing chain freezer.
func (db *emptyfreezedb) ModifyAncients(func(ethdb.AncientWriteOp) error) (int64, error) {
return 0, nil
}
// TruncateHead returns nil for pruned db that we don't have a backing chain freezer.
func (db *emptyfreezedb) TruncateHead(items uint64) (uint64, error) {
return 0, nil
}
// TruncateTail returns nil for pruned db that we don't have a backing chain freezer.
func (db *emptyfreezedb) TruncateTail(items uint64) (uint64, error) {
return 0, nil
}
// TruncateTableTail returns nil for pruned db that we don't have a backing chain freezer.
func (db *emptyfreezedb) TruncateTableTail(kind string, tail uint64) (uint64, error) {
return 0, nil
}
// ResetTable returns nil for pruned db that we don't have a backing chain freezer.
func (db *emptyfreezedb) ResetTable(kind string, startAt uint64, onlyEmpty bool) error {
return nil
}
// Sync returns nil for pruned db that we don't have a backing chain freezer.
func (db *emptyfreezedb) Sync() error {
return nil
}
func (db *emptyfreezedb) DiffStore() ethdb.KeyValueStore { return db }
func (db *emptyfreezedb) SetDiffStore(diff ethdb.KeyValueStore) {}
func (db *emptyfreezedb) StateStore() ethdb.Database { return db }
func (db *emptyfreezedb) SetStateStore(state ethdb.Database) {}
func (db *emptyfreezedb) StateStoreReader() ethdb.Reader { return db }
func (db *emptyfreezedb) BlockStore() ethdb.Database { return db }
func (db *emptyfreezedb) SetBlockStore(block ethdb.Database) {}
func (db *emptyfreezedb) HasSeparateBlockStore() bool { return false }
func (db *emptyfreezedb) BlockStoreReader() ethdb.Reader { return db }
func (db *emptyfreezedb) BlockStoreWriter() ethdb.Writer { return db }
func (db *emptyfreezedb) ReadAncients(fn func(reader ethdb.AncientReaderOp) error) (err error) {
return nil
}
func (db *emptyfreezedb) AncientOffSet() uint64 { return 0 }
// MigrateTable returns nil for pruned db that we don't have a backing chain freezer.
func (db *emptyfreezedb) MigrateTable(kind string, convert convertLegacyFn) error {
return nil
}
// AncientDatadir returns nil for pruned db that we don't have a backing chain freezer.
func (db *emptyfreezedb) AncientDatadir() (string, error) {
return "", nil
}
func (db *emptyfreezedb) SetupFreezerEnv(env *ethdb.FreezerEnv) error {
return nil
}
// NewEmptyFreezeDB is used for CLI such as `geth db inspect` in pruned db that we don't
// have a backing chain freezer.
// WARNING: it must be only used in the above case.
func NewEmptyFreezeDB(db ethdb.KeyValueStore) ethdb.Database {
return &emptyfreezedb{KeyValueStore: db}
}
// NewFreezerDb only create a freezer without statedb.
func NewFreezerDb(db ethdb.KeyValueStore, frz, namespace string, readonly bool, newOffSet uint64) (*Freezer, error) {
// Create the idle freezer instance, this operation should be atomic to avoid mismatch between offset and acientDB.
@@ -484,12 +380,6 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace st
offset = ReadOffSetOfCurrentAncientFreezer(db)
}
// This case is used for someone who wants to execute geth db inspect CLI in a pruned db
if !disableFreeze && readonly && ReadAncientType(db) == PruneFreezerType {
log.Warn("Disk db is pruned, using an empty freezer db for CLI")
return NewEmptyFreezeDB(db), nil
}
if pruneAncientData && !disableFreeze && !readonly {
frdb, err := newPrunedFreezer(resolveChainFreezerDir(ancient), db, offset)
if err != nil {
@@ -744,7 +634,7 @@ func Open(o OpenOptions) (ethdb.Database, error) {
}
if ReadAncientType(kvdb) == PruneFreezerType {
if !o.PruneAncientData {
log.Warn("NOTICE: You're opening a pruned disk db!")
log.Warn("Disk db is pruned")
}
}
if len(o.AncientsDirectory) == 0 {
@@ -873,7 +763,7 @@ func DataTypeByKey(key []byte) DataType {
return StateDataType
}
}
for _, meta := range [][]byte{headHeaderKey, headFinalizedBlockKey, headBlockKey, headFastBlockKey} {
for _, meta := range [][]byte{headHeaderKey, headFinalizedBlockKey} {
if bytes.Equal(key, meta) {
return BlockDataType
}
@@ -1088,7 +978,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
hashNumPairings.Add(size)
default:
var accounted bool
for _, meta := range [][]byte{headHeaderKey, headFinalizedBlockKey, headBlockKey, headFastBlockKey} {
for _, meta := range [][]byte{headHeaderKey, headFinalizedBlockKey} {
if bytes.Equal(key, meta) {
metadata.Add(size)
accounted = true
@@ -1282,7 +1172,7 @@ func ReadChainMetadataFromMultiDatabase(db ethdb.Database) [][]string {
data := [][]string{
{"databaseVersion", pp(ReadDatabaseVersion(db))},
{"headBlockHash", fmt.Sprintf("%v", ReadHeadBlockHash(db.BlockStore()))},
{"headFastBlockHash", fmt.Sprintf("%v", ReadHeadFastBlockHash(db.BlockStore()))},
{"headFastBlockHash", fmt.Sprintf("%v", ReadHeadFastBlockHash(db))},
{"headHeaderHash", fmt.Sprintf("%v", ReadHeadHeaderHash(db.BlockStore()))},
{"lastPivotNumber", pp(ReadLastPivotNumber(db))},
{"len(snapshotSyncStatus)", fmt.Sprintf("%d bytes", len(ReadSnapshotSyncStatus(db)))},

View File

@@ -164,7 +164,7 @@ func (ch createObjectChange) dirtied() *common.Address {
func (ch resetObjectChange) revert(s *StateDB) {
s.setStateObject(ch.prev)
if !ch.prevdestruct {
delete(s.stateObjectsDestruct, ch.prev.address)
s.deleteStateObjectsDestruct(ch.prev.address)
}
if ch.prevAccount != nil {
s.accounts[ch.prev.addrHash] = ch.prevAccount

View File

@@ -19,6 +19,7 @@ package state
import (
"bytes"
"fmt"
"golang.org/x/exp/slices"
"io"
"sync"
"time"
@@ -68,6 +69,11 @@ type stateObject struct {
origin *types.StateAccount // Account original data without any change applied, nil means it was not existent
data types.StateAccount // Account data with all mutations applied in the scope of block
// dirty account state
dirtyBalance *uint256.Int
dirtyNonce *uint64
dirtyCodeHash []byte
// Write caches.
trie Trie // storage trie, which becomes non-nil on first access
code Code // contract bytecode, which gets set when code is loaded
@@ -95,7 +101,7 @@ type stateObject struct {
// empty returns whether the account is considered empty.
func (s *stateObject) empty() bool {
return s.data.Nonce == 0 && s.data.Balance.IsZero() && bytes.Equal(s.data.CodeHash, types.EmptyCodeHash.Bytes())
return s.Nonce() == 0 && s.Balance().IsZero() && bytes.Equal(s.CodeHash(), types.EmptyCodeHash.Bytes())
}
// newObject creates a state object.
@@ -113,7 +119,7 @@ func newObject(db *StateDB, address common.Address, acct *types.StateAccount) *s
storageMap = db.GetStorage(address)
}
return &stateObject{
s := &stateObject{
db: db,
address: address,
addrHash: crypto.Keccak256Hash(address[:]),
@@ -125,6 +131,15 @@ func newObject(db *StateDB, address common.Address, acct *types.StateAccount) *s
dirtyStorage: make(Storage),
created: created,
}
// dirty data when create a new account
if acct == nil {
s.dirtyBalance = acct.Balance.Clone()
s.dirtyNonce = new(uint64)
*s.dirtyNonce = acct.Nonce
s.dirtyCodeHash = acct.CodeHash
}
return s
}
// EncodeRLP implements rlp.Encoder.
@@ -219,7 +234,7 @@ func (s *stateObject) GetCommittedState(key common.Hash) common.Hash {
// 1) resurrect happened, and new slot values were set -- those should
// have been handles via pendingStorage above.
// 2) we don't have new values, and can deliver empty response back
if _, destructed := s.db.stateObjectsDestruct[s.address]; destructed {
if _, destructed := s.db.queryStateObjectsDestruct(s.address); destructed {
return common.Hash{}
}
// If no live objects are available, attempt to use snapshots
@@ -294,6 +309,18 @@ func (s *stateObject) finalise(prefetch bool) {
slotsToPrefetch = append(slotsToPrefetch, common.CopyBytes(key[:])) // Copy needed for closure
}
}
if s.dirtyNonce != nil {
s.data.Nonce = *s.dirtyNonce
s.dirtyNonce = nil
}
if s.dirtyBalance != nil {
s.data.Balance = s.dirtyBalance
s.dirtyBalance = nil
}
if s.dirtyCodeHash != nil {
s.data.CodeHash = s.dirtyCodeHash
s.dirtyCodeHash = nil
}
if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != types.EmptyRootHash {
s.db.prefetcher.prefetch(s.addrHash, s.data.Root, s.address, slotsToPrefetch)
}
@@ -302,6 +329,26 @@ func (s *stateObject) finalise(prefetch bool) {
}
}
func (s *stateObject) finaliseRWSet() {
for key, value := range s.dirtyStorage {
// three are some unclean dirtyStorage from previous reverted txs, it will skip finalise
// so add a new rule, if val has no change, then skip it
if value == s.GetCommittedState(key) {
continue
}
s.db.RecordWrite(types.StorageStateKey(s.address, key), value)
}
if s.dirtyNonce != nil && *s.dirtyNonce != s.data.Nonce {
s.db.RecordWrite(types.AccountStateKey(s.address, types.AccountNonce), *s.dirtyNonce)
}
if s.dirtyBalance != nil && !s.dirtyBalance.Eq(s.data.Balance) {
s.db.RecordWrite(types.AccountStateKey(s.address, types.AccountBalance), s.dirtyBalance.Clone())
}
if s.dirtyCodeHash != nil && !slices.Equal(s.dirtyCodeHash, s.data.CodeHash) {
s.db.RecordWrite(types.AccountStateKey(s.address, types.AccountCodeHash), s.dirtyCodeHash)
}
}
// updateTrie is responsible for persisting cached storage changes into the
// object's storage trie. In case the storage trie is not yet loaded, this
// function will load the trie automatically. If any issues arise during the
@@ -501,13 +548,13 @@ func (s *stateObject) SubBalance(amount *uint256.Int) {
func (s *stateObject) SetBalance(amount *uint256.Int) {
s.db.journal.append(balanceChange{
account: &s.address,
prev: new(uint256.Int).Set(s.data.Balance),
prev: new(uint256.Int).Set(s.Balance()),
})
s.setBalance(amount)
}
func (s *stateObject) setBalance(amount *uint256.Int) {
s.data.Balance = amount
s.dirtyBalance = amount
}
func (s *stateObject) deepCopy(db *StateDB) *stateObject {
@@ -528,6 +575,17 @@ func (s *stateObject) deepCopy(db *StateDB) *stateObject {
obj.selfDestructed = s.selfDestructed
obj.dirtyCode = s.dirtyCode
obj.deleted = s.deleted
// dirty states
if s.dirtyNonce != nil {
obj.dirtyNonce = new(uint64)
*obj.dirtyNonce = *s.dirtyNonce
}
if s.dirtyBalance != nil {
obj.dirtyBalance = s.dirtyBalance.Clone()
}
obj.dirtyCodeHash = s.dirtyCodeHash
return obj
}
@@ -585,32 +643,44 @@ func (s *stateObject) SetCode(codeHash common.Hash, code []byte) {
func (s *stateObject) setCode(codeHash common.Hash, code []byte) {
s.code = code
s.data.CodeHash = codeHash[:]
s.dirtyCodeHash = codeHash[:]
s.dirtyCode = true
}
func (s *stateObject) SetNonce(nonce uint64) {
s.db.journal.append(nonceChange{
account: &s.address,
prev: s.data.Nonce,
prev: s.Nonce(),
})
s.setNonce(nonce)
}
func (s *stateObject) setNonce(nonce uint64) {
s.data.Nonce = nonce
s.dirtyNonce = &nonce
}
func (s *stateObject) CodeHash() []byte {
return s.data.CodeHash
if len(s.dirtyCodeHash) > 0 {
return s.dirtyCodeHash
}
ret := s.data.CodeHash
return ret
}
func (s *stateObject) Balance() *uint256.Int {
return s.data.Balance
if s.dirtyBalance != nil {
return s.dirtyBalance
}
ret := s.data.Balance
return ret
}
func (s *stateObject) Nonce() uint64 {
return s.data.Nonce
if s.dirtyNonce != nil {
return *s.dirtyNonce
}
ret := s.data.Nonce
return ret
}
func (s *stateObject) Root() common.Hash {

View File

@@ -99,6 +99,7 @@ type StateDB struct {
stateObjectsPending map[common.Address]struct{} // State objects finalized but not yet written to the trie
stateObjectsDirty map[common.Address]struct{} // State objects modified in the current execution
stateObjectsDestruct map[common.Address]*types.StateAccount // State objects destructed in the block along with its previous value
stateObjectsDestructDirty map[common.Address]*types.StateAccount
storagePool *StoragePool // sharedPool to store L1 originStorage of stateObjects
writeOnSharedStorage bool // Write to the shared origin storage of a stateObject while reading from the underlying storage layer.
@@ -117,9 +118,15 @@ type StateDB struct {
// The tx context and all occurred logs in the scope of transaction.
thash common.Hash
txIndex int
txIncarnation int
logs map[common.Hash][]*types.Log
logSize uint
// parallel EVM related
rwSet *types.RWSet
mvStates *types.MVStates
es *types.ExeStat
// Preimages occurred seen by VM in the scope of block.
preimages map[common.Hash][]byte
@@ -184,6 +191,7 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error)
stateObjectsPending: make(map[common.Address]struct{}, defaultNumOfSlots),
stateObjectsDirty: make(map[common.Address]struct{}, defaultNumOfSlots),
stateObjectsDestruct: make(map[common.Address]*types.StateAccount, defaultNumOfSlots),
stateObjectsDestructDirty: make(map[common.Address]*types.StateAccount, defaultNumOfSlots),
logs: make(map[common.Hash][]*types.Log),
preimages: make(map[common.Hash][]byte),
journal: newJournal(),
@@ -425,7 +433,10 @@ func (s *StateDB) Empty(addr common.Address) bool {
}
// GetBalance retrieves the balance from the given address or 0 if object not found
func (s *StateDB) GetBalance(addr common.Address) *uint256.Int {
func (s *StateDB) GetBalance(addr common.Address) (ret *uint256.Int) {
defer func() {
s.RecordRead(types.AccountStateKey(addr, types.AccountBalance), ret)
}()
stateObject := s.getStateObject(addr)
if stateObject != nil {
return stateObject.Balance()
@@ -434,7 +445,10 @@ func (s *StateDB) GetBalance(addr common.Address) *uint256.Int {
}
// GetNonce retrieves the nonce from the given address or 0 if object not found
func (s *StateDB) GetNonce(addr common.Address) uint64 {
func (s *StateDB) GetNonce(addr common.Address) (ret uint64) {
defer func() {
s.RecordRead(types.AccountStateKey(addr, types.AccountNonce), ret)
}()
stateObject := s.getStateObject(addr)
if stateObject != nil {
return stateObject.Nonce()
@@ -482,7 +496,10 @@ func (s *StateDB) GetCodeSize(addr common.Address) int {
return 0
}
func (s *StateDB) GetCodeHash(addr common.Address) common.Hash {
func (s *StateDB) GetCodeHash(addr common.Address) (ret common.Hash) {
defer func() {
s.RecordRead(types.AccountStateKey(addr, types.AccountCodeHash), ret.Bytes())
}()
stateObject := s.getStateObject(addr)
if stateObject != nil {
return common.BytesToHash(stateObject.CodeHash())
@@ -491,7 +508,10 @@ func (s *StateDB) GetCodeHash(addr common.Address) common.Hash {
}
// GetState retrieves a value from the given account's storage trie.
func (s *StateDB) GetState(addr common.Address, hash common.Hash) common.Hash {
func (s *StateDB) GetState(addr common.Address, hash common.Hash) (ret common.Hash) {
defer func() {
s.RecordRead(types.StorageStateKey(addr, hash), ret)
}()
stateObject := s.getStateObject(addr)
if stateObject != nil {
return stateObject.GetState(hash)
@@ -500,7 +520,10 @@ func (s *StateDB) GetState(addr common.Address, hash common.Hash) common.Hash {
}
// GetCommittedState retrieves a value from the given account's committed storage trie.
func (s *StateDB) GetCommittedState(addr common.Address, hash common.Hash) common.Hash {
func (s *StateDB) GetCommittedState(addr common.Address, hash common.Hash) (ret common.Hash) {
defer func() {
s.RecordRead(types.StorageStateKey(addr, hash), ret)
}()
stateObject := s.getStateObject(addr)
if stateObject != nil {
return stateObject.GetCommittedState(hash)
@@ -529,6 +552,7 @@ func (s *StateDB) HasSelfDestructed(addr common.Address) bool {
func (s *StateDB) AddBalance(addr common.Address, amount *uint256.Int) {
stateObject := s.getOrNewStateObject(addr)
if stateObject != nil {
s.RecordRead(types.AccountStateKey(addr, types.AccountBalance), stateObject.Balance())
stateObject.AddBalance(amount)
}
}
@@ -537,6 +561,7 @@ func (s *StateDB) AddBalance(addr common.Address, amount *uint256.Int) {
func (s *StateDB) SubBalance(addr common.Address, amount *uint256.Int) {
stateObject := s.getOrNewStateObject(addr)
if stateObject != nil {
s.RecordRead(types.AccountStateKey(addr, types.AccountBalance), stateObject.Balance())
stateObject.SubBalance(amount)
}
}
@@ -581,8 +606,8 @@ func (s *StateDB) SetStorage(addr common.Address, storage map[common.Hash]common
//
// TODO(rjl493456442) this function should only be supported by 'unwritable'
// state and all mutations made should all be discarded afterwards.
if _, ok := s.stateObjectsDestruct[addr]; !ok {
s.stateObjectsDestruct[addr] = nil
if _, ok := s.queryStateObjectsDestruct(addr); !ok {
s.tagStateObjectsDestruct(addr, nil)
}
stateObject := s.getOrNewStateObject(addr)
for k, v := range storage {
@@ -606,7 +631,7 @@ func (s *StateDB) SelfDestruct(addr common.Address) {
prevbalance: new(uint256.Int).Set(stateObject.Balance()),
})
stateObject.markSelfdestructed()
stateObject.data.Balance = new(uint256.Int)
stateObject.setBalance(new(uint256.Int))
}
func (s *StateDB) Selfdestruct6780(addr common.Address) {
@@ -712,6 +737,7 @@ func (s *StateDB) getStateObject(addr common.Address) *stateObject {
// flag set. This is needed by the state journal to revert to the correct s-
// destructed object instead of wiping all knowledge about the state object.
func (s *StateDB) getDeletedStateObject(addr common.Address) *stateObject {
s.RecordRead(types.AccountStateKey(addr, types.AccountSelf), struct{}{})
// Prefer live objects if any is available
if obj := s.stateObjects[addr]; obj != nil {
return obj
@@ -798,9 +824,9 @@ func (s *StateDB) createObject(addr common.Address) (newobj, prev *stateObject)
// account and storage data should be cleared as well. Note, it must
// be done here, otherwise the destruction event of "original account"
// will be lost.
_, prevdestruct := s.stateObjectsDestruct[prev.address]
_, prevdestruct := s.queryStateObjectsDestruct(prev.address)
if !prevdestruct {
s.stateObjectsDestruct[prev.address] = prev.origin
s.tagStateObjectsDestruct(prev.address, prev.origin)
}
// There may be some cached account/storage data already since IntermediateRoot
// will be called for each transaction before byzantium fork which will always
@@ -841,7 +867,7 @@ func (s *StateDB) createObject(addr common.Address) (newobj, prev *stateObject)
func (s *StateDB) CreateAccount(addr common.Address) {
newObj, prev := s.createObject(addr)
if prev != nil {
newObj.setBalance(prev.data.Balance)
newObj.setBalance(prev.Balance())
}
}
@@ -877,6 +903,7 @@ func (s *StateDB) copyInternal(doPrefetch bool) *StateDB {
stateObjectsPending: make(map[common.Address]struct{}, len(s.stateObjectsPending)),
stateObjectsDirty: make(map[common.Address]struct{}, len(s.journal.dirties)),
stateObjectsDestruct: make(map[common.Address]*types.StateAccount, len(s.stateObjectsDestruct)),
stateObjectsDestructDirty: make(map[common.Address]*types.StateAccount, len(s.stateObjectsDestructDirty)),
storagePool: s.storagePool,
// writeOnSharedStorage: s.writeOnSharedStorage,
refund: s.refund,
@@ -929,6 +956,9 @@ func (s *StateDB) copyInternal(doPrefetch bool) *StateDB {
for addr, value := range s.stateObjectsDestruct {
state.stateObjectsDestruct[addr] = value
}
for addr, value := range s.stateObjectsDestructDirty {
state.stateObjectsDestructDirty[addr] = value
}
// Deep copy the state changes made in the scope of block
// along with their original values.
state.accounts = copySet(s.accounts)
@@ -967,6 +997,12 @@ func (s *StateDB) copyInternal(doPrefetch bool) *StateDB {
// know that they need to explicitly terminate an active copy).
state.prefetcher = state.prefetcher.copy()
}
// parallel EVM related
if s.mvStates != nil {
state.mvStates = s.mvStates
}
return state
}
@@ -1015,6 +1051,11 @@ func (s *StateDB) WaitPipeVerification() error {
// into the tries just yet. Only IntermediateRoot or Commit will do that.
func (s *StateDB) Finalise(deleteEmptyObjects bool) {
addressesToPrefetch := make([][]byte, 0, len(s.journal.dirties))
// finalise stateObjectsDestruct
for addr, acc := range s.stateObjectsDestructDirty {
s.stateObjectsDestruct[addr] = acc
}
s.stateObjectsDestructDirty = make(map[common.Address]*types.StateAccount)
for addr := range s.journal.dirties {
obj, exist := s.stateObjects[addr]
if !exist {
@@ -1249,9 +1290,10 @@ func (s *StateDB) StateIntermediateRoot() common.Hash {
// SetTxContext sets the current transaction hash and index which are
// used when the EVM emits new state logs. It should be invoked before
// transaction execution.
func (s *StateDB) SetTxContext(thash common.Hash, ti int) {
func (s *StateDB) SetTxContext(thash common.Hash, txIndex int, incarnation int) {
s.thash = thash
s.txIndex = ti
s.txIndex = txIndex
s.txIncarnation = incarnation
s.accessList = nil // can't delete this line now, because StateDB.Prepare is not called before processsing a system transaction
}
@@ -1900,6 +1942,131 @@ func (s *StateDB) GetSnap() snapshot.Snapshot {
return s.snap
}
func (s *StateDB) BeforeTxTransition() {
log.Debug("BeforeTxTransition", "mvStates", s.mvStates == nil, "rwSet", s.rwSet == nil)
if s.mvStates == nil {
return
}
s.rwSet = types.NewRWSet(types.StateVersion{
TxIndex: s.txIndex,
TxIncarnation: s.txIncarnation,
})
}
func (s *StateDB) BeginTxStat(index int) {
if s.mvStates == nil {
return
}
s.es = types.NewExeStat(index).Begin()
}
func (s *StateDB) StopTxStat(usedGas uint64) {
if s.mvStates == nil {
return
}
// record stat first
if s.es != nil {
s.es.Done().WithGas(usedGas).WithRead(len(s.rwSet.ReadSet()))
}
}
func (s *StateDB) RecordRead(key types.RWKey, val interface{}) {
if s.mvStates == nil || s.rwSet == nil {
return
}
// TODO: read from MVStates, record with ver
s.rwSet.RecordRead(key, types.StateVersion{
TxIndex: -1,
}, val)
}
func (s *StateDB) RecordWrite(key types.RWKey, val interface{}) {
if s.mvStates == nil || s.rwSet == nil {
return
}
s.rwSet.RecordWrite(key, val)
}
func (s *StateDB) ResetMVStates(txCount int) {
log.Debug("ResetMVStates", "mvStates", s.mvStates == nil, "rwSet", s.rwSet == nil)
s.mvStates = types.NewMVStates(txCount)
s.rwSet = nil
}
func (s *StateDB) FinaliseRWSet() error {
log.Debug("FinaliseRWSet", "mvStates", s.mvStates == nil, "rwSet", s.rwSet == nil)
if s.mvStates == nil || s.rwSet == nil {
return nil
}
// finalise stateObjectsDestruct
for addr, acc := range s.stateObjectsDestructDirty {
s.stateObjectsDestruct[addr] = acc
s.RecordWrite(types.AccountStateKey(addr, types.AccountSuicide), struct{}{})
}
for addr := range s.journal.dirties {
obj, exist := s.stateObjects[addr]
if !exist {
continue
}
if obj.selfDestructed || obj.empty() {
// We need to maintain account deletions explicitly (will remain
// set indefinitely). Note only the first occurred self-destruct
// event is tracked.
if _, ok := s.stateObjectsDestruct[obj.address]; !ok {
log.Debug("FinaliseRWSet find Destruct", "tx", s.txIndex, "addr", addr, "selfDestructed", obj.selfDestructed)
s.RecordWrite(types.AccountStateKey(addr, types.AccountSuicide), struct{}{})
}
} else {
// finalise account & storages
obj.finaliseRWSet()
}
}
ver := types.StateVersion{
TxIndex: s.txIndex,
TxIncarnation: s.txIncarnation,
}
if ver != s.rwSet.Version() {
return errors.New("you finalize a wrong ver of RWSet")
}
log.Debug("FinaliseRWSet", "rwset", s.rwSet)
return s.mvStates.FulfillRWSet(s.rwSet, s.es)
}
func (s *StateDB) queryStateObjectsDestruct(addr common.Address) (*types.StateAccount, bool) {
if acc, ok := s.stateObjectsDestructDirty[addr]; ok {
return acc, ok
}
acc, ok := s.stateObjectsDestruct[addr]
return acc, ok
}
func (s *StateDB) tagStateObjectsDestruct(addr common.Address, acc *types.StateAccount) {
s.stateObjectsDestructDirty[addr] = acc
}
func (s *StateDB) deleteStateObjectsDestruct(addr common.Address) {
delete(s.stateObjectsDestructDirty, addr)
}
func (s *StateDB) MVStates2TxDAG() (*types.TxDAG, []*types.ExeStat) {
if s.mvStates == nil {
return nil, nil
}
return s.mvStates.ResolveTxDAG(), s.mvStates.Stats()
}
func (s *StateDB) RecordSystemTxRWSet(index int) {
if s.mvStates == nil {
return
}
s.mvStates.FulfillRWSet(types.NewRWSet(types.StateVersion{
TxIndex: index,
TxIncarnation: 0,
}).WithSerialFlag(), types.NewExeStat(index).WithSerialFlag())
}
// copySet returns a deep-copied set.
func copySet[k comparable](set map[k][]byte) map[k][]byte {
copied := make(map[k][]byte, len(set))

View File

@@ -76,7 +76,7 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c
if err != nil {
return // Also invalid block, bail out
}
newStatedb.SetTxContext(tx.Hash(), txIndex)
newStatedb.SetTxContext(tx.Hash(), txIndex, 0)
precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm)
case <-interruptCh:
@@ -125,7 +125,7 @@ func (p *statePrefetcher) PrefetchMining(txs TransactionsByPriceAndNonce, header
return // Also invalid block, bail out
}
idx++
newStatedb.SetTxContext(tx.Hash(), idx)
newStatedb.SetTxContext(tx.Hash(), idx, 0)
precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm)
gaspool = new(GasPool).AddGas(gasLimit)
case <-stopCh:

View File

@@ -19,8 +19,6 @@ package core
import (
"errors"
"fmt"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/misc"
@@ -29,7 +27,11 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"math/big"
"time"
)
// StateProcessor is a basic Processor, which takes care of transitioning
@@ -51,6 +53,12 @@ func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consen
}
}
var (
dagExecutionTimer = metrics.NewRegisteredTimer("dag/executiontime", nil)
dagAccountReadTimer = metrics.NewRegisteredTimer("dag/accountreadtime", nil)
dagStorageReadTimer = metrics.NewRegisteredTimer("dag/storagereadtime", nil)
)
// Process processes the state changes according to the Ethereum rules by running
// the transaction messages using the statedb and applying any rewards to both
// the processor (coinbase) and any included uncles.
@@ -59,6 +67,7 @@ func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consen
// returns the amount of gas that was used in the process. If any of the
// transactions failed to execute due to insufficient gas it will return an error.
func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (*state.StateDB, types.Receipts, []*types.Log, uint64, error) {
var (
usedGas = new(uint64)
header = block.Header()
@@ -99,16 +108,20 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
// initialise bloom processors
bloomProcessors := NewAsyncReceiptBloomGenerator(txNum)
statedb.MarkFullProcessed()
statedb.ResetMVStates(len(block.Transactions()))
log.Debug("ResetMVStates", "block", block.NumberU64(), "txs", len(block.Transactions()))
// usually do have two tx, one for validator set contract, another for system reward contract.
systemTxs := make([]*types.Transaction, 0, 2)
start := time.Now()
for i, tx := range block.Transactions() {
statedb.BeginTxStat(i)
if isPoSA {
if isSystemTx, err := posa.IsSystemTransaction(tx, block.Header()); err != nil {
bloomProcessors.Close()
return statedb, nil, nil, 0, err
} else if isSystemTx {
statedb.RecordSystemTxRWSet(i)
systemTxs = append(systemTxs, tx)
continue
}
@@ -125,7 +138,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
bloomProcessors.Close()
return statedb, nil, nil, 0, err
}
statedb.SetTxContext(tx.Hash(), i)
statedb.SetTxContext(tx.Hash(), i, 0)
receipt, err := applyTransaction(msg, p.config, gp, statedb, blockNumber, blockHash, tx, usedGas, vmenv, bloomProcessors)
if err != nil {
@@ -134,7 +147,10 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
}
commonTxs = append(commonTxs, tx)
receipts = append(receipts, receipt)
statedb.StopTxStat(receipt.GasUsed)
}
eTime := time.Since(start)
// this bloomProcessors may take ~20ms
bloomProcessors.Close()
// Fail if Shanghai not enabled and len(withdrawals) is non-zero.
@@ -143,7 +159,20 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
return nil, nil, nil, 0, errors.New("withdrawals before shanghai")
}
// TODO: temporary add time metrics
dag, exrStats := statedb.MVStates2TxDAG()
//log.Info("MVStates2TxDAG", "block", block.NumberU64(), "tx", len(block.Transactions()), "dag", dag)
fmt.Printf("MVStates2TxDAG, block: %v|%v, tx: %v, time: %v\n", block.NumberU64(), block.Hash(), len(block.Transactions()), time.Now().Format(time.DateTime))
fmt.Print(types.EvaluateTxDAGPerformance(dag, exrStats))
fmt.Printf("block: %v, execution: %.2fms, accountRead: %.2fms, storageRead: %.2fms\n",
block.NumberU64(), float64(eTime.Microseconds())/1000, float64((statedb.SnapshotAccountReads+statedb.AccountReads).Microseconds())/1000,
float64((statedb.SnapshotStorageReads+statedb.StorageReads).Microseconds())/1000)
dagExecutionTimer.Update(eTime)
dagAccountReadTimer.Update(statedb.SnapshotAccountReads + statedb.AccountReads)
dagStorageReadTimer.Update(statedb.SnapshotStorageReads + statedb.StorageReads)
// Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
// TODO: system txs must execute at last
err := p.engine.Finalize(p.bc, header, statedb, &commonTxs, block.Uncles(), withdrawals, &receipts, &systemTxs, usedGas)
if err != nil {
return statedb, receipts, allLogs, *usedGas, err

View File

@@ -367,6 +367,8 @@ func (st *StateTransition) preCheck() error {
// However if any consensus issue encountered, return the error directly with
// nil evm execution result.
func (st *StateTransition) TransitionDb() (*ExecutionResult, error) {
// start record rw set in here
st.state.BeforeTxTransition()
// First check this message satisfies all consensus rules before
// applying the message. The rules include these clauses
//
@@ -446,6 +448,10 @@ func (st *StateTransition) TransitionDb() (*ExecutionResult, error) {
ret, st.gasRemaining, vmerr = st.evm.Call(sender, st.to(), msg.Data, st.gasRemaining, value)
}
// stop record rw set in here
if err := st.state.FinaliseRWSet(); err != nil {
return nil, err
}
var gasRefund uint64
if !rules.IsLondon {
// Before EIP-3529: refunds were capped to gasUsed / 2

View File

@@ -193,7 +193,5 @@ type MevParams struct {
ValidatorCommission uint64 // 100 means 1%
BidSimulationLeftOver time.Duration
GasCeil uint64
GasPrice *big.Int // Minimum avg gas price for bid block
BuilderFeeCeil *big.Int
Version string
}

View File

@@ -204,6 +204,8 @@ type Body struct {
Transactions []*Transaction
Uncles []*Header
Withdrawals []*Withdrawal `rlp:"optional"`
// TODO: add TxDAG in block body
//TxDAG *TxDAG `rlp:"optional"`
}
// Block represents an Ethereum block.

698
core/types/dag.go Normal file
View File

@@ -0,0 +1,698 @@
package types
import (
"encoding/hex"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/holiman/uint256"
"golang.org/x/exp/slices"
"sort"
"strings"
"sync"
"time"
)
// TxDep store the current tx dependency relation with other txs
type TxDep struct {
// It describes the Relation with below txs
// 0: this tx depends on below txs
// 1: this transaction does not depend on below txs, all other previous txs depend on
Relation uint8
TxIndexes []int
}
func (d *TxDep) AppendDep(i int) {
d.TxIndexes = append(d.TxIndexes, i)
}
func (d *TxDep) Exist(i int) bool {
for _, index := range d.TxIndexes {
if index == i {
return true
}
}
return false
}
// TxDAG indicate how to use the dependency of txs
type TxDAG struct {
// The TxDAG type
// 0: delay the distribution of GasFee, it will ignore all gas fee distribution when tx execute
// 1: timely distribution of transaction fees, it will keep partial serial execution when tx cannot delay the distribution
Type uint8
// Tx Dependency List, the list index is equal to TxIndex
TxDeps []TxDep
// It indicates the scheduling priority of the transactions
SchedulePriority []int
}
func NewTxDAG(txLen int) *TxDAG {
return &TxDAG{
Type: 0,
TxDeps: make([]TxDep, txLen),
}
}
func (d *TxDAG) String() string {
builder := strings.Builder{}
exePaths := d.travelExecutionPaths()
for _, path := range exePaths {
builder.WriteString(fmt.Sprintf("%v\n", path))
}
return builder.String()
}
func (d *TxDAG) travelExecutionPaths() [][]int {
// regenerate TxDAG
nd := NewTxDAG(len(d.TxDeps))
for i, txDep := range d.TxDeps {
nd.TxDeps[i].Relation = 0
if txDep.Relation == 0 {
nd.TxDeps[i] = txDep
continue
}
// recover to relation 0
for j := 0; j < i; j++ {
if !txDep.Exist(j) {
nd.TxDeps[i].AppendDep(j)
}
}
}
exePaths := make([][]int, 0)
// travel tx deps with BFS
for i := 0; i < len(nd.TxDeps); i++ {
exePaths = append(exePaths, travelTargetPath(nd.TxDeps, i))
}
return exePaths
}
var (
longestTimeTimer = metrics.NewRegisteredTimer("dag/longesttime", nil)
longestGasTimer = metrics.NewRegisteredTimer("dag/longestgas", nil)
serialTimeTimer = metrics.NewRegisteredTimer("dag/serialtime", nil)
totalTxMeter = metrics.NewRegisteredMeter("dag/txcnt", nil)
totalNoDepMeter = metrics.NewRegisteredMeter("dag/nodepcntcnt", nil)
total2DepMeter = metrics.NewRegisteredMeter("dag/2depcntcnt", nil)
total4DepMeter = metrics.NewRegisteredMeter("dag/4depcntcnt", nil)
total8DepMeter = metrics.NewRegisteredMeter("dag/8depcntcnt", nil)
total16DepMeter = metrics.NewRegisteredMeter("dag/16depcntcnt", nil)
total32DepMeter = metrics.NewRegisteredMeter("dag/32depcntcnt", nil)
)
func EvaluateTxDAGPerformance(dag *TxDAG, stats []*ExeStat) string {
if len(stats) != len(dag.TxDeps) || len(dag.TxDeps) == 0 {
return ""
}
sb := strings.Builder{}
sb.WriteString("TxDAG:\n")
for i, dep := range dag.TxDeps {
if stats[i].mustSerialFlag {
continue
}
sb.WriteString(fmt.Sprintf("%v: %v\n", i, dep.TxIndexes))
}
sb.WriteString("Parallel Execution Path:\n")
paths := dag.travelExecutionPaths()
// Attention: this is based on best schedule, it will reduce a lot by executing previous txs in parallel
// It assumes that there is no parallel thread limit
var (
maxGasIndex int
maxGas uint64
maxTimeIndex int
maxTime time.Duration
txTimes = make([]time.Duration, len(dag.TxDeps))
txGases = make([]uint64, len(dag.TxDeps))
txReads = make([]int, len(dag.TxDeps))
noDepdencyCount int
)
totalTxMeter.Mark(int64(len(dag.TxDeps)))
for i, path := range paths {
if stats[i].mustSerialFlag {
continue
}
if len(path) <= 1 {
noDepdencyCount++
totalNoDepMeter.Mark(1)
}
if len(path) <= 3 {
total2DepMeter.Mark(1)
}
if len(path) <= 5 {
total4DepMeter.Mark(1)
}
if len(path) <= 9 {
total8DepMeter.Mark(1)
}
if len(path) <= 17 {
total16DepMeter.Mark(1)
}
if len(path) <= 33 {
total32DepMeter.Mark(1)
}
// find the biggest cost time from dependency txs
for j := 0; j < len(path)-1; j++ {
prev := path[j]
if txTimes[prev] > txTimes[i] {
txTimes[i] = txTimes[prev]
}
if txGases[prev] > txGases[i] {
txGases[i] = txGases[prev]
}
if txReads[prev] > txReads[i] {
txReads[i] = txReads[prev]
}
}
txTimes[i] += stats[i].costTime
txGases[i] += stats[i].usedGas
txReads[i] += stats[i].readCount
//sb.WriteString(fmt.Sprintf("Tx%v, %.2fms|%vgas|%vreads\npath: %v\n", i, float64(txTimes[i].Microseconds())/1000, txGases[i], txReads[i], path))
sb.WriteString(fmt.Sprintf("%v: %v\n", i, path))
// try to find max gas
if txGases[i] > maxGas {
maxGas = txGases[i]
maxGasIndex = i
}
if txTimes[i] > maxTime {
maxTime = txTimes[i]
maxTimeIndex = i
}
}
sb.WriteString(fmt.Sprintf("LargestGasPath: %.2fms|%vgas|%vreads\npath: %v\n", float64(txTimes[maxGasIndex].Microseconds())/1000, txGases[maxGasIndex], txReads[maxGasIndex], paths[maxGasIndex]))
sb.WriteString(fmt.Sprintf("LongestTimePath: %.2fms|%vgas|%vreads\npath: %v\n", float64(txTimes[maxTimeIndex].Microseconds())/1000, txGases[maxTimeIndex], txReads[maxTimeIndex], paths[maxTimeIndex]))
longestTimeTimer.Update(txTimes[maxTimeIndex])
longestGasTimer.Update(txTimes[maxGasIndex])
// serial path
var (
sTime time.Duration
sGas uint64
sRead int
sPath []int
)
for i, stat := range stats {
if stat.mustSerialFlag {
continue
}
sPath = append(sPath, i)
sTime += stat.costTime
sGas += stat.usedGas
sRead += stat.readCount
}
if sTime == 0 {
return ""
}
sb.WriteString(fmt.Sprintf("SerialPath: %.2fms|%vgas|%vreads\npath: %v\n", float64(sTime.Microseconds())/1000, sGas, sRead, sPath))
maxParaTime := txTimes[maxTimeIndex]
sb.WriteString(fmt.Sprintf("Estimated saving: %.2fms, %.2f%%, %.2fX, noDepCnt: %v|%.2f%%\n",
float64((sTime-maxParaTime).Microseconds())/1000, float64(sTime-maxParaTime)/float64(sTime)*100,
float64(sTime)/float64(maxParaTime), noDepdencyCount, float64(noDepdencyCount)/float64(len(dag.TxDeps))*100))
serialTimeTimer.Update(sTime)
return sb.String()
}
func travelTargetPath(deps []TxDep, from int) []int {
q := make([]int, 0, len(deps))
path := make([]int, 0, len(deps))
q = append(q, from)
path = append(path, from)
for len(q) > 0 {
t := make([]int, 0, len(deps))
for _, i := range q {
for _, dep := range deps[i].TxIndexes {
if !slices.Contains(path, dep) {
path = append(path, dep)
t = append(t, dep)
}
}
}
q = t
}
sort.Ints(path)
return path
}
type ValidatorExtraItem struct {
ValidatorAddress common.Address
VoteAddress BLSPublicKey
}
type HeaderCustomExtra struct {
ValidatorSet ValidatorExtraItem
TxDAG TxDAG
}
// StateVersion record specific TxIndex & TxIncarnation
// if TxIndex equals to -1, it means the state read from DB.
type StateVersion struct {
TxIndex int
TxIncarnation int
}
// ReadRecord keep read value & its version
type ReadRecord struct {
StateVersion
Val interface{}
}
// WriteRecord keep latest state value & change count
type WriteRecord struct {
Val interface{}
}
// RWSet record all read & write set in txs
// Attention: this is not a concurrent safety structure
type RWSet struct {
ver StateVersion
readSet map[RWKey]*ReadRecord
writeSet map[RWKey]*WriteRecord
// some flags
mustSerial bool
}
func NewRWSet(ver StateVersion) *RWSet {
return &RWSet{
ver: ver,
readSet: make(map[RWKey]*ReadRecord),
writeSet: make(map[RWKey]*WriteRecord),
}
}
func (s *RWSet) RecordRead(key RWKey, ver StateVersion, val interface{}) {
// only record the first read version
if _, exist := s.readSet[key]; exist {
return
}
s.readSet[key] = &ReadRecord{
StateVersion: ver,
Val: val,
}
}
func (s *RWSet) RecordWrite(key RWKey, val interface{}) {
wr, exist := s.writeSet[key]
if !exist {
s.writeSet[key] = &WriteRecord{
Val: val,
}
return
}
wr.Val = val
}
func (s *RWSet) Version() StateVersion {
return s.ver
}
func (s *RWSet) ReadSet() map[RWKey]*ReadRecord {
return s.readSet
}
func (s *RWSet) WriteSet() map[RWKey]*WriteRecord {
return s.writeSet
}
func (s *RWSet) WithSerialFlag() *RWSet {
s.mustSerial = true
return s
}
func (s *RWSet) String() string {
builder := strings.Builder{}
builder.WriteString(fmt.Sprintf("tx: %v, inc: %v\nreadSet: [", s.ver.TxIndex, s.ver.TxIncarnation))
i := 0
for key, _ := range s.readSet {
if i > 0 {
builder.WriteString(fmt.Sprintf(", %v", key.String()))
continue
}
builder.WriteString(fmt.Sprintf("%v", key.String()))
i++
}
builder.WriteString("]\nwriteSet: [")
i = 0
for key, _ := range s.writeSet {
if i > 0 {
builder.WriteString(fmt.Sprintf(", %v", key.String()))
continue
}
builder.WriteString(fmt.Sprintf("%v", key.String()))
i++
}
builder.WriteString("]\n")
return builder.String()
}
const (
AccountStatePrefix = 'a'
StorageStatePrefix = 's'
)
type RWKey [1 + common.AddressLength + common.HashLength]byte
type AccountState byte
const (
AccountSelf AccountState = iota
AccountNonce
AccountBalance
AccountCodeHash
AccountSuicide
)
func AccountStateKey(account common.Address, state AccountState) RWKey {
var key RWKey
key[0] = AccountStatePrefix
copy(key[1:], account.Bytes())
key[1+common.AddressLength] = byte(state)
return key
}
func StorageStateKey(account common.Address, state common.Hash) RWKey {
var key RWKey
key[0] = StorageStatePrefix
copy(key[1:], account.Bytes())
copy(key[1+common.AddressLength:], state.Bytes())
return key
}
func (key *RWKey) IsAccountState() (bool, AccountState) {
return AccountStatePrefix == key[0], AccountState(key[1+common.AddressLength])
}
func (key *RWKey) IsAccountSelf() bool {
ok, s := key.IsAccountState()
if !ok {
return false
}
return s == AccountSelf
}
func (key *RWKey) IsAccountSuicide() bool {
ok, s := key.IsAccountState()
if !ok {
return false
}
return s == AccountSuicide
}
func (key *RWKey) ToAccountSelf() RWKey {
return AccountStateKey(key.Addr(), AccountSelf)
}
func (key *RWKey) IsStorageState() bool {
return StorageStatePrefix == key[0]
}
func (key *RWKey) String() string {
return hex.EncodeToString(key[:])
}
func (key *RWKey) Addr() common.Address {
return common.BytesToAddress(key[1 : 1+common.AddressLength])
}
type PendingWrite struct {
Ver StateVersion
Val interface{}
}
func NewPendingWrite(ver StateVersion, wr *WriteRecord) *PendingWrite {
return &PendingWrite{
Ver: ver,
Val: wr.Val,
}
}
func (w *PendingWrite) TxIndex() int {
return w.Ver.TxIndex
}
func (w *PendingWrite) TxIncarnation() int {
return w.Ver.TxIncarnation
}
type PendingWrites struct {
list []*PendingWrite
}
func NewPendingWrites() *PendingWrites {
return &PendingWrites{
list: make([]*PendingWrite, 0),
}
}
func (w *PendingWrites) Append(pw *PendingWrite) {
if i, found := w.SearchTxIndex(pw.TxIndex()); found {
w.list[i] = pw
return
}
w.list = append(w.list, pw)
for i := len(w.list) - 1; i > 0; i-- {
if w.list[i].TxIndex() > w.list[i-1].TxIndex() {
break
}
w.list[i-1], w.list[i] = w.list[i], w.list[i-1]
}
}
func (w *PendingWrites) SearchTxIndex(txIndex int) (int, bool) {
n := len(w.list)
i, j := 0, n
for i < j {
h := int(uint(i+j) >> 1)
// i ≤ h < j
if w.list[h].TxIndex() < txIndex {
i = h + 1
} else {
j = h
}
}
return i, i < n && w.list[i].TxIndex() == txIndex
}
func (w *PendingWrites) FindLastWrite(txIndex int) *PendingWrite {
var i, _ = w.SearchTxIndex(txIndex)
for j := i - 1; j >= 0; j-- {
if w.list[j].TxIndex() < txIndex {
return w.list[j]
}
}
return nil
}
type MVStates struct {
rwSets []*RWSet
stats []*ExeStat
pendingWriteSet map[RWKey]*PendingWrites
lock sync.RWMutex
}
func NewMVStates(txCount int) *MVStates {
return &MVStates{
rwSets: make([]*RWSet, txCount),
stats: make([]*ExeStat, txCount),
pendingWriteSet: make(map[RWKey]*PendingWrites, txCount*8),
}
}
func (s *MVStates) RWSets() []*RWSet {
s.lock.RLock()
defer s.lock.RUnlock()
return s.rwSets
}
func (s *MVStates) Stats() []*ExeStat {
s.lock.RLock()
defer s.lock.RUnlock()
return s.stats
}
func (s *MVStates) RWSet(index int) *RWSet {
s.lock.RLock()
defer s.lock.RUnlock()
return s.rwSets[index]
}
func (s *MVStates) FulfillRWSet(rwSet *RWSet, stat *ExeStat) error {
s.lock.Lock()
defer s.lock.Unlock()
index := rwSet.ver.TxIndex
if index >= len(s.rwSets) {
return errors.New("refill out of bound")
}
if s := s.rwSets[index]; s != nil {
return errors.New("refill a exist RWSet")
}
if stat != nil {
if stat.txIndex != index {
return errors.New("wrong execution stat")
}
s.stats[index] = stat
}
for k, v := range rwSet.writeSet {
// ignore no changed write record
checkRWSetInconsistent(index, k, rwSet.readSet, rwSet.writeSet)
// this will be handled by state object
//if rwSet.readSet[k] != nil && isEqualRWVal(k, rwSet.readSet[k].Val, v.Val) {
// delete(rwSet.writeSet, k)
// continue
//}
if _, exist := s.pendingWriteSet[k]; !exist {
s.pendingWriteSet[k] = NewPendingWrites()
}
s.pendingWriteSet[k].Append(NewPendingWrite(rwSet.ver, v))
}
s.rwSets[index] = rwSet
return nil
}
func checkRWSetInconsistent(index int, k RWKey, readSet map[RWKey]*ReadRecord, writeSet map[RWKey]*WriteRecord) bool {
var (
readOk bool
writeOk bool
r *WriteRecord
)
if k.IsAccountSuicide() {
_, readOk = readSet[k.ToAccountSelf()]
} else {
_, readOk = readSet[k]
}
r, writeOk = writeSet[k]
if readOk != writeOk {
// check if it's correct? read nil, write non-nil
log.Info("checkRWSetInconsistent find inconsistent", "tx", index, "k", k.String(), "read", readOk, "write", writeOk, "val", r.Val)
return true
}
return false
}
func isEqualRWVal(key RWKey, src interface{}, compared interface{}) bool {
if ok, state := key.IsAccountState(); ok {
switch state {
case AccountBalance:
if src != nil && compared != nil {
return equalUint256(src.(*uint256.Int), compared.(*uint256.Int))
}
return src == compared
case AccountNonce:
return src.(uint64) == compared.(uint64)
case AccountCodeHash:
if src != nil && compared != nil {
return slices.Equal(src.([]byte), compared.([]byte))
}
return src == compared
}
return false
}
if src != nil && compared != nil {
return src.(common.Hash) == compared.(common.Hash)
}
return src == compared
}
func equalUint256(s, c *uint256.Int) bool {
if s != nil && c != nil {
return s.Eq(c)
}
return s == c
}
func (s *MVStates) ResolveTxDAG() *TxDAG {
rwSets := s.RWSets()
txDAG := NewTxDAG(len(rwSets))
for i := len(rwSets) - 1; i >= 0; i-- {
txDAG.TxDeps[i].TxIndexes = []int{}
if rwSets[i].mustSerial {
txDAG.TxDeps[i].Relation = 1
continue
}
readSet := rwSets[i].ReadSet()
// TODO: check if there are RW with system address
// check if there has written op before i
for j := 0; j < i; j++ {
if checkDependency(rwSets[j].writeSet, readSet) {
txDAG.TxDeps[i].AppendDep(j)
}
}
}
return txDAG
}
func checkDependency(writeSet map[RWKey]*WriteRecord, readSet map[RWKey]*ReadRecord) bool {
// check tx dependency, only check key, skip version
for k, _ := range writeSet {
// check suicide, add read address flag, it only for check suicide quickly, and cannot for other scenarios.
if k.IsAccountSuicide() {
if _, ok := readSet[k.ToAccountSelf()]; ok {
return true
}
continue
}
if _, ok := readSet[k]; ok {
return true
}
}
return false
}
type ExeStat struct {
txIndex int
usedGas uint64
readCount int
startTime time.Time
costTime time.Duration
// TODO: consider system tx, gas fee issues, may need to use different flag
mustSerialFlag bool
}
func NewExeStat(txIndex int) *ExeStat {
return &ExeStat{
txIndex: txIndex,
}
}
func (s *ExeStat) Begin() *ExeStat {
s.startTime = time.Now()
return s
}
func (s *ExeStat) Done() *ExeStat {
s.costTime = time.Since(s.startTime)
return s
}
func (s *ExeStat) WithSerialFlag() *ExeStat {
s.mustSerialFlag = true
return s
}
func (s *ExeStat) WithGas(gas uint64) *ExeStat {
s.usedGas = gas
return s
}
func (s *ExeStat) WithRead(rc int) *ExeStat {
s.readCount = rc
return s
}

239
core/types/dag_test.go Normal file
View File

@@ -0,0 +1,239 @@
package types
import (
"github.com/ethereum/go-ethereum/common"
"github.com/holiman/uint256"
"github.com/stretchr/testify/require"
"testing"
)
var (
mockAddr = common.HexToAddress("0x482bA86399ab6Dcbe54071f8d22258688B4509b1")
mockHash = common.HexToHash("0xdc13f8d7bdb8ec4de02cd4a50a1aa2ab73ec8814e0cdb550341623be3dd8ab7a")
)
func TestTxDAG(t *testing.T) {
dag := mockSimpleDAG()
t.Log(dag.String())
dag = mockSystemTxDAG()
t.Log(dag.String())
}
func TestEvaluateTxDAG(t *testing.T) {
dag := mockSystemTxDAG()
stats := make([]*ExeStat, len(dag.TxDeps))
for i, dep := range dag.TxDeps {
stats[i] = NewExeStat(i).WithGas(uint64(i)).WithRead(i)
stats[i].costTime = int64(i)
if dep.Relation == 1 {
stats[i].WithSerialFlag()
}
}
t.Log(EvaluateTxDAGPerformance(dag, stats))
}
func TestSimpleMVStates2TxDAG(t *testing.T) {
ms := NewMVStates(10)
ms.rwSets[0] = mockRWSet(0, []string{"0x00"}, []string{"0x00"})
ms.rwSets[1] = mockRWSet(1, []string{"0x01"}, []string{"0x01"})
ms.rwSets[2] = mockRWSet(2, []string{"0x02"}, []string{"0x02"})
ms.rwSets[3] = mockRWSet(3, []string{"0x00", "0x03"}, []string{"0x03"})
ms.rwSets[4] = mockRWSet(4, []string{"0x00", "0x04"}, []string{"0x04"})
ms.rwSets[5] = mockRWSet(5, []string{"0x01", "0x02", "0x05"}, []string{"0x05"})
ms.rwSets[6] = mockRWSet(6, []string{"0x02", "0x05", "0x06"}, []string{"0x06"})
ms.rwSets[7] = mockRWSet(7, []string{"0x06", "0x07"}, []string{"0x07"})
ms.rwSets[8] = mockRWSet(8, []string{"0x08"}, []string{"0x08"})
ms.rwSets[9] = mockRWSet(9, []string{"0x08", "0x09"}, []string{"0x09"})
dag := ms.ResolveTxDAG()
require.Equal(t, mockSimpleDAG(), dag)
t.Log(dag.String())
}
func TestSystemTxMVStates2TxDAG(t *testing.T) {
ms := NewMVStates(12)
ms.rwSets[0] = mockRWSet(0, []string{"0x00"}, []string{"0x00"})
ms.rwSets[1] = mockRWSet(1, []string{"0x01"}, []string{"0x01"})
ms.rwSets[2] = mockRWSet(2, []string{"0x02"}, []string{"0x02"})
ms.rwSets[3] = mockRWSet(3, []string{"0x00", "0x03"}, []string{"0x03"})
ms.rwSets[4] = mockRWSet(4, []string{"0x00", "0x04"}, []string{"0x04"})
ms.rwSets[5] = mockRWSet(5, []string{"0x01", "0x02", "0x05"}, []string{"0x05"})
ms.rwSets[6] = mockRWSet(6, []string{"0x02", "0x05", "0x06"}, []string{"0x06"})
ms.rwSets[7] = mockRWSet(7, []string{"0x06", "0x07"}, []string{"0x07"})
ms.rwSets[8] = mockRWSet(8, []string{"0x08"}, []string{"0x08"})
ms.rwSets[9] = mockRWSet(9, []string{"0x08", "0x09"}, []string{"0x09"})
ms.rwSets[10] = mockRWSet(10, []string{"0x10"}, []string{"0x10"}).WithSerialFlag()
ms.rwSets[11] = mockRWSet(11, []string{"0x11"}, []string{"0x11"}).WithSerialFlag()
dag := ms.ResolveTxDAG()
require.Equal(t, mockSystemTxDAG(), dag)
t.Log(dag.String())
}
func TestIsEqualRWVal(t *testing.T) {
tests := []struct {
key RWKey
src interface{}
compared interface{}
isEqual bool
}{
{
key: AccountStateKey(mockAddr, AccountNonce),
src: uint64(0),
compared: uint64(0),
isEqual: true,
},
{
key: AccountStateKey(mockAddr, AccountNonce),
src: uint64(0),
compared: uint64(1),
isEqual: false,
},
{
key: AccountStateKey(mockAddr, AccountBalance),
src: new(uint256.Int).SetUint64(1),
compared: new(uint256.Int).SetUint64(1),
isEqual: true,
},
{
key: AccountStateKey(mockAddr, AccountBalance),
src: nil,
compared: new(uint256.Int).SetUint64(1),
isEqual: false,
},
{
key: AccountStateKey(mockAddr, AccountBalance),
src: (*uint256.Int)(nil),
compared: new(uint256.Int).SetUint64(1),
isEqual: false,
},
{
key: AccountStateKey(mockAddr, AccountBalance),
src: (*uint256.Int)(nil),
compared: (*uint256.Int)(nil),
isEqual: true,
},
{
key: AccountStateKey(mockAddr, AccountCodeHash),
src: []byte{1},
compared: []byte{1},
isEqual: true,
},
{
key: AccountStateKey(mockAddr, AccountCodeHash),
src: nil,
compared: []byte{1},
isEqual: false,
},
{
key: AccountStateKey(mockAddr, AccountCodeHash),
src: ([]byte)(nil),
compared: []byte{1},
isEqual: false,
},
{
key: AccountStateKey(mockAddr, AccountCodeHash),
src: ([]byte)(nil),
compared: ([]byte)(nil),
isEqual: true,
},
{
key: AccountStateKey(mockAddr, AccountSuicide),
src: struct{}{},
compared: struct{}{},
isEqual: false,
},
{
key: AccountStateKey(mockAddr, AccountSuicide),
src: nil,
compared: struct{}{},
isEqual: false,
},
{
key: StorageStateKey(mockAddr, mockHash),
src: mockHash,
compared: mockHash,
isEqual: true,
},
{
key: StorageStateKey(mockAddr, mockHash),
src: nil,
compared: mockHash,
isEqual: false,
},
}
for i, item := range tests {
require.Equal(t, item.isEqual, isEqualRWVal(item.key, item.src, item.compared), i)
}
}
func mockSimpleDAG() *TxDAG {
dag := NewTxDAG(10)
dag.TxDeps[0].TxIndexes = []int{}
dag.TxDeps[1].TxIndexes = []int{}
dag.TxDeps[2].TxIndexes = []int{}
dag.TxDeps[3].TxIndexes = []int{0}
dag.TxDeps[4].TxIndexes = []int{0}
dag.TxDeps[5].TxIndexes = []int{1, 2}
dag.TxDeps[6].TxIndexes = []int{2, 5}
dag.TxDeps[7].TxIndexes = []int{6}
dag.TxDeps[8].TxIndexes = []int{}
dag.TxDeps[9].TxIndexes = []int{8}
return dag
}
func mockSystemTxDAG() *TxDAG {
dag := NewTxDAG(12)
dag.TxDeps[0].TxIndexes = []int{}
dag.TxDeps[1].TxIndexes = []int{}
dag.TxDeps[2].TxIndexes = []int{}
dag.TxDeps[3].TxIndexes = []int{0}
dag.TxDeps[4].TxIndexes = []int{0}
dag.TxDeps[5].TxIndexes = []int{1, 2}
dag.TxDeps[6].TxIndexes = []int{2, 5}
dag.TxDeps[7].TxIndexes = []int{6}
dag.TxDeps[8].TxIndexes = []int{}
dag.TxDeps[9].TxIndexes = []int{8}
dag.TxDeps[10] = TxDep{
Relation: 1,
TxIndexes: []int{},
}
dag.TxDeps[11] = TxDep{
Relation: 1,
TxIndexes: []int{},
}
return dag
}
func mockRWSet(index int, read []string, write []string) *RWSet {
ver := StateVersion{
TxIndex: index,
}
set := NewRWSet(ver)
for _, k := range read {
key := RWKey{}
if len(k) > len(key) {
k = k[:len(key)]
}
copy(key[:], k)
set.readSet[key] = &ReadRecord{
StateVersion: ver,
Val: struct{}{},
}
}
for _, k := range write {
key := RWKey{}
if len(k) > len(key) {
k = k[:len(key)]
}
copy(key[:], k)
set.writeSet[key] = &WriteRecord{
Val: struct{}{},
}
}
return set
}

View File

@@ -79,6 +79,10 @@ type StateDB interface {
AddLog(*types.Log)
AddPreimage(common.Hash, []byte)
// parallel DAG related
BeforeTxTransition()
FinaliseRWSet() error
}
// CallContext provides a basic interface for the EVM calling conventions. The EVM

View File

@@ -484,10 +484,6 @@ func (b *EthAPIBackend) RemoveBuilder(builder common.Address) error {
return b.Miner().RemoveBuilder(builder)
}
func (b *EthAPIBackend) HasBuilder(builder common.Address) bool {
return b.Miner().HasBuilder(builder)
}
func (b *EthAPIBackend) SendBid(ctx context.Context, bid *types.BidArgs) (common.Hash, error) {
return b.Miner().SendBid(ctx, bid)
}

View File

@@ -161,18 +161,12 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
// Optimize memory distribution by reallocating surplus allowance from the
// dirty cache to the clean cache.
if config.StateScheme == rawdb.PathScheme && config.TrieDirtyCache > pathdb.MaxDirtyBufferSize/1024/1024 {
log.Info("Capped dirty cache size", "provided", common.StorageSize(config.TrieDirtyCache)*1024*1024,
"adjusted", common.StorageSize(pathdb.MaxDirtyBufferSize))
log.Info("Clean cache size", "provided", common.StorageSize(config.TrieCleanCache)*1024*1024,
"adjusted", common.StorageSize(config.TrieCleanCache+config.TrieDirtyCache-pathdb.MaxDirtyBufferSize/1024/1024)*1024*1024)
config.TrieCleanCache += config.TrieDirtyCache - pathdb.MaxDirtyBufferSize/1024/1024
log.Info("Capped dirty cache size", "provided", common.StorageSize(config.TrieDirtyCache)*1024*1024, "adjusted", common.StorageSize(pathdb.MaxDirtyBufferSize))
log.Info("Clean cache size", "provided", common.StorageSize(config.TrieCleanCache)*1024*1024)
config.TrieDirtyCache = pathdb.MaxDirtyBufferSize / 1024 / 1024
}
log.Info("Allocated memory caches",
"state_scheme", config.StateScheme,
"trie_clean_cache", common.StorageSize(config.TrieCleanCache)*1024*1024,
"trie_dirty_cache", common.StorageSize(config.TrieDirtyCache)*1024*1024,
"snapshot_cache", common.StorageSize(config.SnapshotCache)*1024*1024)
log.Info("Allocated trie memory caches", "clean", common.StorageSize(config.TrieCleanCache)*1024*1024, "dirty", common.StorageSize(config.TrieDirtyCache)*1024*1024)
// Try to recover offline state pruning only in hash-based.
if config.StateScheme == rawdb.HashScheme {
if err := pruner.RecoverPruning(stack.ResolvePath(""), chainDb, config.TriesInMemory); err != nil {

View File

@@ -731,6 +731,9 @@ func (f *BlockFetcher) loop() {
matched = true
if f.getBlock(hash) == nil {
block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
if block.Header().EmptyWithdrawalsHash() {
block = block.WithWithdrawals(make([]*types.Withdrawal, 0))
}
block = block.WithSidecars(task.sidecars[i])
block.ReceivedAt = task.time
blocks = append(blocks, block)
@@ -916,10 +919,6 @@ func (f *BlockFetcher) importBlocks(op *blockOrHeaderInject) {
return
}
if block.Header().EmptyWithdrawalsHash() {
block = block.WithWithdrawals(make([]*types.Withdrawal, 0))
}
defer func() { f.done <- hash }()
// Quickly validate the header and propagate the block if it passes
switch err := f.verifyHeader(block.Header()); err {

View File

@@ -158,7 +158,7 @@ func (f *fetcherTester) chainFinalizedHeight() uint64 {
return f.blocks[f.hashes[len(f.hashes)-3]].NumberU64()
}
// insertHeaders injects a new headers into the simulated chain.
// insertChain injects a new headers into the simulated chain.
func (f *fetcherTester) insertHeaders(headers []*types.Header) (int, error) {
f.lock.Lock()
defer f.lock.Unlock()

View File

@@ -633,9 +633,6 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) {
go source.handler.runEthPeer(sourcePeer, func(peer *eth.Peer) error {
return eth.Handle((*ethHandler)(source.handler), peer)
})
// Wait a bit for the above handlers to start
time.Sleep(100 * time.Millisecond)
if err := sinkPeer.Handshake(1, td, genesis.Hash(), genesis.Hash(), forkid.NewIDWithChain(source.chain), forkid.NewFilter(source.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake")
}

View File

@@ -281,7 +281,7 @@ func (eth *Ethereum) stateAtTransaction(ctx context.Context, block *types.Block,
}
// Not yet the searched for transaction, execute on top of the current state
vmenv := vm.NewEVM(context, txContext, statedb, eth.blockchain.Config(), vm.Config{})
statedb.SetTxContext(tx.Hash(), idx)
statedb.SetTxContext(tx.Hash(), idx, 0)
if _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(tx.Gas())); err != nil {
return nil, vm.BlockContext{}, nil, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err)
}

View File

@@ -151,8 +151,6 @@ func testChainSyncWithBlobs(t *testing.T, mode downloader.SyncMode, preCancunBlk
go full.handler.runEthPeer(fullPeerEth, func(peer *eth.Peer) error {
return eth.Handle((*ethHandler)(full.handler), peer)
})
// Wait a bit for the above handlers to start
time.Sleep(250 * time.Millisecond)
emptyPipeSnap, fullPipeSnap := p2p.MsgPipe()
defer emptyPipeSnap.Close()

View File

@@ -587,7 +587,7 @@ func (api *API) IntermediateRoots(ctx context.Context, hash common.Hash, config
}
}
statedb.SetTxContext(tx.Hash(), i)
statedb.SetTxContext(tx.Hash(), i, 0)
if _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(msg.GasLimit)); err != nil {
log.Warn("Tracing intermediate roots did not complete", "txindex", i, "txhash", tx.Hash(), "err", err)
// We intentionally don't return the error here: if we do, then the RPC server will not
@@ -786,7 +786,7 @@ txloop:
// Generate the next state snapshot fast without tracing
msg, _ := core.TransactionToMessage(tx, signer, block.BaseFee())
statedb.SetTxContext(tx.Hash(), i)
statedb.SetTxContext(tx.Hash(), i, 0)
vmenv := vm.NewEVM(blockCtx, core.NewEVMTxContext(msg), statedb, api.backend.ChainConfig(), vm.Config{})
if _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(msg.GasLimit)); err != nil {
failed = err
@@ -919,7 +919,7 @@ func (api *API) standardTraceBlockToFile(ctx context.Context, block *types.Block
}
// Execute the transaction and flush any traces to disk
vmenv := vm.NewEVM(vmctx, txContext, statedb, chainConfig, vmConf)
statedb.SetTxContext(tx.Hash(), i)
statedb.SetTxContext(tx.Hash(), i, 0)
_, err = core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(msg.GasLimit))
if writer != nil {
writer.Flush()
@@ -1127,7 +1127,7 @@ func (api *API) traceTx(ctx context.Context, message *core.Message, txctx *Conte
}
// Call Prepare to clear out the statedb access list
statedb.SetTxContext(txctx.TxHash, txctx.TxIndex)
statedb.SetTxContext(txctx.TxHash, txctx.TxIndex, 0)
if _, err = core.ApplyMessage(vmenv, message, new(core.GasPool).AddGas(message.GasLimit)); err != nil {
return nil, fmt.Errorf("tracing failed: %w", err)
}

View File

@@ -133,7 +133,7 @@ func (ec *Client) BlockReceipts(ctx context.Context, blockNrOrHash rpc.BlockNumb
// BlobSidecars return the Sidecars of a given block number or hash.
func (ec *Client) BlobSidecars(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) ([]*types.BlobTxSidecar, error) {
var r []*types.BlobTxSidecar
err := ec.c.CallContext(ctx, &r, "eth_getBlobSidecars", blockNrOrHash.String())
err := ec.c.CallContext(ctx, &r, "eth_getBlobSidecars", blockNrOrHash.String(), true)
if err == nil && r == nil {
return nil, ethereum.NotFound
}
@@ -143,7 +143,7 @@ func (ec *Client) BlobSidecars(ctx context.Context, blockNrOrHash rpc.BlockNumbe
// BlobSidecarByTxHash return a sidecar of a given blob transaction
func (ec *Client) BlobSidecarByTxHash(ctx context.Context, hash common.Hash) (*types.BlobTxSidecar, error) {
var r *types.BlobTxSidecar
err := ec.c.CallContext(ctx, &r, "eth_getBlobSidecarByTxHash", hash)
err := ec.c.CallContext(ctx, &r, "eth_getBlockSidecarByTxHash", hash, true)
if err == nil && r == nil {
return nil, ethereum.NotFound
}
@@ -752,13 +752,6 @@ func (ec *Client) MevRunning(ctx context.Context) (bool, error) {
return result, err
}
// HasBuilder returns whether the builder is registered
func (ec *Client) HasBuilder(ctx context.Context, address common.Address) (bool, error) {
var result bool
err := ec.c.CallContext(ctx, &result, "mev_hasBuilder", address)
return result, err
}
// SendBid sends a bid
func (ec *Client) SendBid(ctx context.Context, args types.BidArgs) (common.Hash, error) {
var hash common.Hash

View File

@@ -87,10 +87,6 @@ func (m *MevAPI) Params() *types.MevParams {
return m.b.MevParams()
}
func (m *MevAPI) HasBuilder(builder common.Address) bool {
return m.b.HasBuilder(builder)
}
// Running returns true if mev is running
func (m *MevAPI) Running() bool {
return m.b.MevRunning()

View File

@@ -651,7 +651,6 @@ func (b testBackend) ServiceFilter(ctx context.Context, session *bloombits.Match
}
func (b *testBackend) MevRunning() bool { return false }
func (b *testBackend) HasBuilder(builder common.Address) bool { return false }
func (b *testBackend) MevParams() *types.MevParams {
return &types.MevParams{}
}

View File

@@ -115,8 +115,6 @@ type Backend interface {
AddBuilder(builder common.Address, builderUrl string) error
// RemoveBuilder removes a builder from the bid simulator.
RemoveBuilder(builder common.Address) error
// HasBuilder returns true if the builder is in the builder list.
HasBuilder(builder common.Address) bool
// SendBid receives bid from the builders.
SendBid(ctx context.Context, bid *types.BidArgs) (common.Hash, error)
// BestBidGasFee returns the gas fee of the best bid for the given parent hash.

View File

@@ -204,8 +204,7 @@ func (args *TransactionArgs) setFeeDefaults(ctx context.Context, b Backend) erro
// Sanity check the EIP-1559 fee parameters if present.
if args.GasPrice == nil && eip1559ParamsSet {
if args.MaxFeePerGas.ToInt().Sign() == 0 {
// return errors.New("maxFeePerGas must be non-zero")
log.Warn("EIP-1559 Tx with zero maxFeePerGas") // BSC accepts zero gas price.
return errors.New("maxFeePerGas must be non-zero")
}
if args.MaxFeePerGas.ToInt().Cmp(args.MaxPriorityFeePerGas.ToInt()) < 0 {
return fmt.Errorf("maxFeePerGas (%v) < maxPriorityFeePerGas (%v)", args.MaxFeePerGas, args.MaxPriorityFeePerGas)
@@ -218,8 +217,7 @@ func (args *TransactionArgs) setFeeDefaults(ctx context.Context, b Backend) erro
if args.GasPrice != nil && !eip1559ParamsSet {
// Zero gas-price is not allowed after London fork
if args.GasPrice.ToInt().Sign() == 0 && isLondon {
// return errors.New("gasPrice must be non-zero after london fork")
log.Warn("non EIP-1559 Tx with zero gasPrice") // BSC accepts zero gas price.
return errors.New("gasPrice must be non-zero after london fork")
}
return nil // No need to set anything, user already set GasPrice
}

View File

@@ -85,8 +85,8 @@ func TestSetFeeDefaults(t *testing.T) {
"legacy tx post-London with zero price",
"london",
&TransactionArgs{GasPrice: zero},
&TransactionArgs{GasPrice: zero},
nil, // errors.New("gasPrice must be non-zero after london fork"),
nil,
errors.New("gasPrice must be non-zero after london fork"),
},
// Access list txs
@@ -180,8 +180,8 @@ func TestSetFeeDefaults(t *testing.T) {
"dynamic fee tx post-London, explicit gas price",
"london",
&TransactionArgs{MaxFeePerGas: zero, MaxPriorityFeePerGas: zero},
&TransactionArgs{MaxFeePerGas: zero, MaxPriorityFeePerGas: zero},
nil, // errors.New("maxFeePerGas must be non-zero"),
nil,
errors.New("maxFeePerGas must be non-zero"),
},
// Misc
@@ -417,7 +417,6 @@ func (b *backendMock) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent)
func (b *backendMock) Engine() consensus.Engine { return nil }
func (b *backendMock) MevRunning() bool { return false }
func (b *backendMock) HasBuilder(builder common.Address) bool { return false }
func (b *backendMock) MevParams() *types.MevParams {
return &types.MevParams{}
}

View File

@@ -29,6 +29,11 @@ import (
const (
// maxBidPerBuilderPerBlock is the max bid number per builder
maxBidPerBuilderPerBlock = 3
// leftOverTimeRate is the rate of left over time to simulate a bid
leftOverTimeRate = 11
// leftOverTimeScale is the scale of left over time to simulate a bid
leftOverTimeScale = 10
)
var (
@@ -313,6 +318,18 @@ func (b *bidSimulator) newBidLoop() {
// commit aborts in-flight bid execution with given signal and resubmits a new one.
commit := func(reason int32, bidRuntime *BidRuntime) {
// if the left time is not enough to do simulation, return
var simDuration time.Duration
if lastBid := b.GetBestBid(bidRuntime.bid.ParentHash); lastBid != nil && lastBid.duration != 0 {
simDuration = lastBid.duration
}
if time.Until(b.bidMustBefore(bidRuntime.bid.ParentHash)) <= simDuration*leftOverTimeRate/leftOverTimeScale {
log.Debug("BidSimulator: abort commit, not enough time to simulate",
"builder", bidRuntime.bid.Builder, "bidHash", bidRuntime.bid.Hash().Hex())
return
}
if interruptCh != nil {
// each commit work will have its own interruptCh to stop work with a reason
interruptCh <- reason
@@ -353,7 +370,6 @@ func (b *bidSimulator) newBidLoop() {
expectedValidatorReward: expectedValidatorReward,
packedBlockReward: big.NewInt(0),
packedValidatorReward: big.NewInt(0),
finished: make(chan struct{}),
}
simulatingBid := b.GetSimulatingBid(newBid.ParentHash)
@@ -394,6 +410,11 @@ func (b *bidSimulator) newBidLoop() {
}
}
func (b *bidSimulator) bidMustBefore(parentHash common.Hash) time.Time {
parentHeader := b.chain.GetHeaderByHash(parentHash)
return bidutil.BidMustBefore(parentHeader, b.chainConfig.Parlia.Period, b.delayLeftOver)
}
func (b *bidSimulator) bidBetterBefore(parentHash common.Hash) time.Time {
parentHeader := b.chain.GetHeaderByHash(parentHash)
return bidutil.BidBetterBefore(parentHeader, b.chainConfig.Parlia.Period, b.delayLeftOver, b.config.BidSimulationLeftOver)
@@ -509,6 +530,7 @@ func (b *bidSimulator) simBid(interruptCh chan int32, bidRuntime *BidRuntime) {
// ensure simulation exited then start next simulation
b.SetSimulatingBid(parentHash, bidRuntime)
start := time.Now()
defer func(simStart time.Time) {
logCtx := []any{
@@ -534,11 +556,10 @@ func (b *bidSimulator) simBid(interruptCh chan int32, bidRuntime *BidRuntime) {
}
b.RemoveSimulatingBid(parentHash)
close(bidRuntime.finished)
bidSimTimer.UpdateSince(start)
if success {
bidRuntime.duration = time.Since(simStart)
bidSimTimer.UpdateSince(simStart)
// only recommit self bid when newBidCh is empty
if len(b.newBidCh) > 0 {
@@ -562,14 +583,6 @@ func (b *bidSimulator) simBid(interruptCh chan int32, bidRuntime *BidRuntime) {
return
}
// if the left time is not enough to do simulation, return
delay := b.engine.Delay(b.chain, bidRuntime.env.header, &b.delayLeftOver)
if delay == nil || *delay <= 0 {
log.Info("BidSimulator: abort commit, not enough time to simulate",
"builder", bidRuntime.bid.Builder, "bidHash", bidRuntime.bid.Hash().Hex())
return
}
gasLimit := bidRuntime.env.header.GasLimit
if bidRuntime.env.gasPool == nil {
bidRuntime.env.gasPool = new(core.GasPool).AddGas(gasLimit)
@@ -637,12 +650,14 @@ func (b *bidSimulator) simBid(interruptCh chan int32, bidRuntime *BidRuntime) {
if b.config.GreedyMergeTx {
delay := b.engine.Delay(b.chain, bidRuntime.env.header, &b.delayLeftOver)
if delay != nil && *delay > 0 {
stopTimer := time.NewTimer(*delay)
bidTxsSet := mapset.NewSet[common.Hash]()
for _, tx := range bidRuntime.bid.Txs {
bidTxsSet.Add(tx.Hash())
}
fillErr := b.bidWorker.fillTransactions(interruptCh, bidRuntime.env, nil, bidTxsSet)
fillErr := b.bidWorker.fillTransactions(interruptCh, bidRuntime.env, stopTimer, bidTxsSet)
log.Trace("BidSimulator: greedy merge stopped", "block", bidRuntime.env.header.Number,
"builder", bidRuntime.bid.Builder, "tx count", bidRuntime.env.tcount-bidTxLen+1, "err", fillErr)
@@ -718,7 +733,6 @@ type BidRuntime struct {
packedBlockReward *big.Int
packedValidatorReward *big.Int
finished chan struct{}
duration time.Duration
}
@@ -742,7 +756,7 @@ func (r *BidRuntime) commitTransaction(chain *core.BlockChain, chainConfig *para
)
// Start executing the transaction
r.env.state.SetTxContext(tx.Hash(), r.env.tcount)
r.env.state.SetTxContext(tx.Hash(), r.env.tcount, 0)
if tx.Type() == types.BlobTxType {
sc = types.NewBlobSidecarFromTx(tx)

View File

@@ -9,7 +9,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
)
type BuilderConfig struct {
@@ -60,11 +59,6 @@ func (miner *Miner) RemoveBuilder(builderAddr common.Address) error {
return miner.bidSimulator.RemoveBuilder(builderAddr)
}
// HasBuilder returns true if the builder is in the builder list.
func (miner *Miner) HasBuilder(builder common.Address) bool {
return miner.bidSimulator.ExistBuilder(builder)
}
func (miner *Miner) SendBid(ctx context.Context, bidArgs *types.BidArgs) (common.Hash, error) {
builder, err := bidArgs.EcrecoverSender()
if err != nil {
@@ -123,8 +117,6 @@ func (miner *Miner) MevParams() *types.MevParams {
ValidatorCommission: miner.worker.config.Mev.ValidatorCommission,
BidSimulationLeftOver: miner.worker.config.Mev.BidSimulationLeftOver,
GasCeil: miner.worker.config.GasCeil,
GasPrice: miner.worker.config.GasPrice,
BuilderFeeCeil: builderFeeCeil,
Version: params.Version,
}
}

View File

@@ -67,9 +67,6 @@ const (
// the current 4 mining loops could have asynchronous risk of mining block with
// save height, keep recently mined blocks to avoid double sign for safety,
recentMinedCacheLimit = 20
// the default to wait for the mev miner to finish
waitMEVMinerEndTimeLimit = 50 * time.Millisecond
)
var (
@@ -174,7 +171,6 @@ type getWorkReq struct {
type bidFetcher interface {
GetBestBid(parentHash common.Hash) *BidRuntime
GetSimulatingBid(prevBlockHash common.Hash) *BidRuntime
}
// worker is the main object which takes care of submitting new work to consensus engine
@@ -907,7 +903,7 @@ LOOP:
continue
}
// Start executing the transaction
env.state.SetTxContext(tx.Hash(), env.tcount)
env.state.SetTxContext(tx.Hash(), env.tcount, 0)
logs, err := w.commitTransaction(env, tx, bloomProcessors)
switch {
@@ -1340,15 +1336,6 @@ LOOP:
// when in-turn, compare with remote work.
from := bestWork.coinbase
if w.bidFetcher != nil && bestWork.header.Difficulty.Cmp(diffInTurn) == 0 {
if pendingBid := w.bidFetcher.GetSimulatingBid(bestWork.header.ParentHash); pendingBid != nil {
waitBidTimer := time.NewTimer(waitMEVMinerEndTimeLimit)
defer waitBidTimer.Stop()
select {
case <-waitBidTimer.C:
case <-pendingBid.finished:
}
}
bestBid := w.bidFetcher.GetBestBid(bestWork.header.ParentHash)
if bestBid != nil {

View File

@@ -23,7 +23,7 @@ import (
const (
VersionMajor = 1 // Major version component of the current release
VersionMinor = 4 // Minor version component of the current release
VersionPatch = 10 // Patch version component of the current release
VersionPatch = 8 // Patch version component of the current release
VersionMeta = "" // Version metadata to append to the version string
)

View File

@@ -26,106 +26,6 @@ import (
"github.com/ethereum/go-ethereum/trie/triestate"
)
type RefTrieNode struct {
refCount uint32
node *trienode.Node
}
type HashNodeCache struct {
lock sync.RWMutex
cache map[common.Hash]*RefTrieNode
}
func (h *HashNodeCache) length() int {
if h == nil {
return 0
}
h.lock.RLock()
defer h.lock.RUnlock()
return len(h.cache)
}
func (h *HashNodeCache) set(hash common.Hash, node *trienode.Node) {
if h == nil {
return
}
h.lock.Lock()
defer h.lock.Unlock()
if n, ok := h.cache[hash]; ok {
n.refCount++
} else {
h.cache[hash] = &RefTrieNode{1, node}
}
}
func (h *HashNodeCache) Get(hash common.Hash) *trienode.Node {
if h == nil {
return nil
}
h.lock.RLock()
defer h.lock.RUnlock()
if n, ok := h.cache[hash]; ok {
return n.node
}
return nil
}
func (h *HashNodeCache) del(hash common.Hash) {
if h == nil {
return
}
h.lock.Lock()
defer h.lock.Unlock()
n, ok := h.cache[hash]
if !ok {
return
}
if n.refCount > 0 {
n.refCount--
}
if n.refCount == 0 {
delete(h.cache, hash)
}
}
func (h *HashNodeCache) Add(ly layer) {
if h == nil {
return
}
dl, ok := ly.(*diffLayer)
if !ok {
return
}
beforeAdd := h.length()
for _, subset := range dl.nodes {
for _, node := range subset {
h.set(node.Hash, node)
}
}
diffHashCacheLengthGauge.Update(int64(h.length()))
log.Debug("Add difflayer to hash map", "root", ly.rootHash(), "block_number", dl.block, "map_len", h.length(), "add_delta", h.length()-beforeAdd)
}
func (h *HashNodeCache) Remove(ly layer) {
if h == nil {
return
}
dl, ok := ly.(*diffLayer)
if !ok {
return
}
go func() {
beforeDel := h.length()
for _, subset := range dl.nodes {
for _, node := range subset {
h.del(node.Hash)
}
}
diffHashCacheLengthGauge.Update(int64(h.length()))
log.Debug("Remove difflayer from hash map", "root", ly.rootHash(), "block_number", dl.block, "map_len", h.length(), "del_delta", beforeDel-h.length())
}()
}
// diffLayer represents a collection of modifications made to the in-memory tries
// along with associated state changes after running a block on top.
//
@@ -139,10 +39,7 @@ type diffLayer struct {
nodes map[common.Hash]map[string]*trienode.Node // Cached trie nodes indexed by owner and path
states *triestate.Set // Associated state change set for building history
memory uint64 // Approximate guess as to how much memory we use
cache *HashNodeCache // trienode cache by hash key. cache is immutable, but cache's item can be add/del.
// mutables
origin *diskLayer // The current difflayer corresponds to the underlying disklayer and is updated during cap.
parent layer // Parent layer modified by this one, never nil, **can be changed**
lock sync.RWMutex // Lock used to protect parent
}
@@ -161,20 +58,6 @@ func newDiffLayer(parent layer, root common.Hash, id uint64, block uint64, nodes
states: states,
parent: parent,
}
switch l := parent.(type) {
case *diskLayer:
dl.origin = l
dl.cache = &HashNodeCache{
cache: make(map[common.Hash]*RefTrieNode),
}
case *diffLayer:
dl.origin = l.originDiskLayer()
dl.cache = l.cache
default:
panic("unknown parent type")
}
for _, subset := range nodes {
for path, n := range subset {
dl.memory += uint64(n.Size() + len(path))
@@ -192,12 +75,6 @@ func newDiffLayer(parent layer, root common.Hash, id uint64, block uint64, nodes
return dl
}
func (dl *diffLayer) originDiskLayer() *diskLayer {
dl.lock.RLock()
defer dl.lock.RUnlock()
return dl.origin
}
// rootHash implements the layer interface, returning the root hash of
// corresponding state.
func (dl *diffLayer) rootHash() common.Hash {
@@ -256,32 +133,6 @@ func (dl *diffLayer) node(owner common.Hash, path []byte, hash common.Hash, dept
// Node implements the layer interface, retrieving the trie node blob with the
// provided node information. No error will be returned if the node is not found.
func (dl *diffLayer) Node(owner common.Hash, path []byte, hash common.Hash) ([]byte, error) {
if n := dl.cache.Get(hash); n != nil {
// The query from the hash map is fastpath,
// avoiding recursive query of 128 difflayers.
diffHashCacheHitMeter.Mark(1)
diffHashCacheReadMeter.Mark(int64(len(n.Blob)))
return n.Blob, nil
}
diffHashCacheMissMeter.Mark(1)
persistLayer := dl.originDiskLayer()
if persistLayer != nil {
blob, err := persistLayer.Node(owner, path, hash)
if err != nil {
// This is a bad case with a very low probability.
// r/w the difflayer cache and r/w the disklayer are not in the same lock,
// so in extreme cases, both reading the difflayer cache and reading the disklayer may fail, eg, disklayer is stale.
// In this case, fallback to the original 128-layer recursive difflayer query path.
diffHashCacheSlowPathMeter.Mark(1)
log.Debug("Retry difflayer due to query origin failed", "owner", owner, "path", path, "hash", hash.String(), "error", err)
return dl.node(owner, path, hash, 0)
} else { // This is the fastpath.
return blob, nil
}
}
diffHashCacheSlowPathMeter.Mark(1)
log.Debug("Retry difflayer due to origin is nil", "owner", owner, "path", path, "hash", hash.String())
return dl.node(owner, path, hash, 0)
}

View File

@@ -286,9 +286,6 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
}
log.Debug("Pruned state history", "items", pruned, "tailid", oldest)
}
// The bottom has been eaten by disklayer, releasing the hash cache of bottom difflayer.
bottom.cache.Remove(bottom)
return ndl, nil
}

View File

@@ -51,20 +51,9 @@ func (tree *layerTree) reset(head layer) {
tree.lock.Lock()
defer tree.lock.Unlock()
for _, ly := range tree.layers {
if dl, ok := ly.(*diffLayer); ok {
// Clean up the hash cache of difflayers due to reset.
dl.cache.Remove(dl)
}
}
var layers = make(map[common.Hash]layer)
for head != nil {
layers[head.rootHash()] = head
if dl, ok := head.(*diffLayer); ok {
// Add the hash cache of difflayers due to reset.
dl.cache.Add(dl)
}
head = head.parentLayer()
}
tree.layers = layers
@@ -109,19 +98,12 @@ func (tree *layerTree) add(root common.Hash, parentRoot common.Hash, block uint6
if root == parentRoot {
return errors.New("layer cycle")
}
if tree.get(root) != nil {
log.Info("Skip add repeated difflayer", "root", root.String(), "block_id", block)
return nil
}
parent := tree.get(parentRoot)
if parent == nil {
return fmt.Errorf("triedb parent [%#x] layer missing", parentRoot)
}
l := parent.update(root, parent.stateID()+1, block, nodes.Flatten(), states)
// Before adding layertree, update the hash cache.
l.cache.Add(l)
tree.lock.Lock()
tree.layers[l.rootHash()] = l
tree.lock.Unlock()
@@ -150,15 +132,8 @@ func (tree *layerTree) cap(root common.Hash, layers int) error {
if err != nil {
return err
}
for _, ly := range tree.layers {
if dl, ok := ly.(*diffLayer); ok {
dl.cache.Remove(dl)
log.Debug("Cleanup difflayer hash cache due to cap all", "diff_root", dl.root.String(), "diff_block_number", dl.block)
}
}
// Replace the entire layer tree with the flat base
tree.layers = map[common.Hash]layer{base.rootHash(): base}
log.Debug("Cap all difflayers to disklayer", "disk_root", base.rootHash().String())
return nil
}
// Dive until we run out of layers or reach the persistent database
@@ -171,7 +146,6 @@ func (tree *layerTree) cap(root common.Hash, layers int) error {
return nil
}
}
var persisted *diskLayer
// We're out of layers, flatten anything below, stopping if it's the disk or if
// the memory limit is not yet exceeded.
switch parent := diff.parentLayer().(type) {
@@ -192,7 +166,6 @@ func (tree *layerTree) cap(root common.Hash, layers int) error {
diff.parent = base
diff.lock.Unlock()
persisted = base.(*diskLayer)
default:
panic(fmt.Sprintf("unknown data layer in triedb: %T", parent))
@@ -207,13 +180,6 @@ func (tree *layerTree) cap(root common.Hash, layers int) error {
}
var remove func(root common.Hash)
remove = func(root common.Hash) {
if df, exist := tree.layers[root]; exist {
if dl, ok := df.(*diffLayer); ok {
// Clean up the hash cache of the child difflayer corresponding to the stale parent, include the re-org case.
dl.cache.Remove(dl)
log.Debug("Cleanup difflayer hash cache due to reorg", "diff_root", dl.root.String(), "diff_block_number", dl.block)
}
}
delete(tree.layers, root)
for _, child := range children[root] {
remove(child)
@@ -223,25 +189,8 @@ func (tree *layerTree) cap(root common.Hash, layers int) error {
for root, layer := range tree.layers {
if dl, ok := layer.(*diskLayer); ok && dl.isStale() {
remove(root)
log.Debug("Remove stale the disklayer", "disk_root", dl.root.String())
}
}
if persisted != nil {
var updateOriginFunc func(root common.Hash)
updateOriginFunc = func(root common.Hash) {
if diff, ok := tree.layers[root].(*diffLayer); ok {
diff.lock.Lock()
diff.origin = persisted
diff.lock.Unlock()
}
for _, child := range children[root] {
updateOriginFunc(child)
}
}
updateOriginFunc(persisted.root)
}
return nil
}

View File

@@ -47,10 +47,4 @@ var (
historyBuildTimeMeter = metrics.NewRegisteredTimer("pathdb/history/time", nil)
historyDataBytesMeter = metrics.NewRegisteredMeter("pathdb/history/bytes/data", nil)
historyIndexBytesMeter = metrics.NewRegisteredMeter("pathdb/history/bytes/index", nil)
diffHashCacheHitMeter = metrics.NewRegisteredMeter("pathdb/difflayer/hashcache/hit", nil)
diffHashCacheReadMeter = metrics.NewRegisteredMeter("pathdb/difflayer/hashcache/read", nil)
diffHashCacheMissMeter = metrics.NewRegisteredMeter("pathdb/difflayer/hashcache/miss", nil)
diffHashCacheSlowPathMeter = metrics.NewRegisteredMeter("pathdb/difflayer/hashcache/slowpath", nil)
diffHashCacheLengthGauge = metrics.NewRegisteredGauge("pathdb/difflayer/hashcache/size", nil)
)