// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package core

import (
	"bytes"
	"errors"
	"fmt"
	"math/big"
	"math/rand"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/gopool"
	"github.com/ethereum/go-ethereum/consensus"
	"github.com/ethereum/go-ethereum/consensus/misc"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/core/state"
	"github.com/ethereum/go-ethereum/core/state/snapshot"
	"github.com/ethereum/go-ethereum/core/systemcontracts"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/core/vm"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/params"
	"github.com/ethereum/go-ethereum/rlp"
)

const (
	fullProcessCheck          = 21 // On diff sync mode, will do full process every fullProcessCheck randomly
	minNumberOfAccountPerTask = 5
	recentTime                = 2048 * 3
	recentDiffLayerTimeout    = 20
	farDiffLayerTimeout       = 2
)

// StateProcessor is a basic Processor, which takes care of transitioning
// state from one point to another.
//
// StateProcessor implements Processor.
type StateProcessor struct { config *params.ChainConfig // Chain configuration options bc *BlockChain // Canonical block chain engine consensus.Engine // Consensus engine used for block rewards } // NewStateProcessor initialises a new StateProcessor. func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *StateProcessor { return &StateProcessor{ config: config, bc: bc, engine: engine, } } type LightStateProcessor struct { randomGenerator *rand.Rand StateProcessor } func NewLightStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *LightStateProcessor { randomGenerator := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))) return &LightStateProcessor{ randomGenerator: randomGenerator, StateProcessor: *NewStateProcessor(config, bc, engine), } } func (p *LightStateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (*state.StateDB, types.Receipts, []*types.Log, uint64, error) { allowLightProcess := true if posa, ok := p.engine.(consensus.PoSA); ok { allowLightProcess = posa.AllowLightProcess(p.bc, block.Header()) } // random fallback to full process if check := p.randomGenerator.Int63n(fullProcessCheck); allowLightProcess && check != 0 && len(block.Transactions()) != 0 { var pid string if peer, ok := block.ReceivedFrom.(PeerIDer); ok { pid = peer.ID() } var diffLayer *types.DiffLayer var diffLayerTimeout = recentDiffLayerTimeout if time.Now().Unix()-int64(block.Time()) > recentTime { diffLayerTimeout = farDiffLayerTimeout } for tried := 0; tried < diffLayerTimeout; tried++ { // wait a bit for the diff layer diffLayer = p.bc.GetUnTrustedDiffLayer(block.Hash(), pid) if diffLayer != nil { break } time.Sleep(time.Millisecond) } if diffLayer != nil { if err := diffLayer.Receipts.DeriveFields(p.bc.chainConfig, block.Hash(), block.NumberU64(), block.Transactions()); err != nil { log.Error("Failed to derive block receipts fields", "hash", block.Hash(), "number", 
block.NumberU64(), "err", err) // fallback to full process return p.StateProcessor.Process(block, statedb, cfg) } receipts, logs, gasUsed, err := p.LightProcess(diffLayer, block, statedb) if err == nil { log.Info("do light process success at block", "num", block.NumberU64()) return statedb, receipts, logs, gasUsed, nil } log.Error("do light process err at block", "num", block.NumberU64(), "err", err) p.bc.removeDiffLayers(diffLayer.DiffHash) // prepare new statedb statedb.StopPrefetcher() parent := p.bc.GetHeader(block.ParentHash(), block.NumberU64()-1) statedb, err = state.New(parent.Root, p.bc.stateCache, p.bc.snaps) if err != nil { return statedb, nil, nil, 0, err } // Enable prefetching to pull in trie node paths while processing transactions statedb.StartPrefetcher("chain") } } // fallback to full process return p.StateProcessor.Process(block, statedb, cfg) } func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *types.Block, statedb *state.StateDB) (types.Receipts, []*types.Log, uint64, error) { statedb.MarkLightProcessed() fullDiffCode := make(map[common.Hash][]byte, len(diffLayer.Codes)) diffTries := make(map[common.Address]state.Trie) diffCode := make(map[common.Hash][]byte) snapDestructs, snapAccounts, snapStorage, err := statedb.DiffLayerToSnap(diffLayer) if err != nil { return nil, nil, 0, err } for _, c := range diffLayer.Codes { fullDiffCode[c.Hash] = c.Code } for des := range snapDestructs { statedb.Trie().TryDelete(des[:]) } threads := gopool.Threads(len(snapAccounts)) iteAccounts := make([]common.Address, 0, len(snapAccounts)) for diffAccount := range snapAccounts { iteAccounts = append(iteAccounts, diffAccount) } errChan := make(chan error, threads) exitChan := make(chan struct{}) var snapMux sync.RWMutex var stateMux, diffMux sync.Mutex for i := 0; i < threads; i++ { start := i * len(iteAccounts) / threads end := (i + 1) * len(iteAccounts) / threads if i+1 == threads { end = len(iteAccounts) } go func(start, end int) { for 
index := start; index < end; index++ { select { // fast fail case <-exitChan: return default: } diffAccount := iteAccounts[index] snapMux.RLock() blob := snapAccounts[diffAccount] snapMux.RUnlock() addrHash := crypto.Keccak256Hash(diffAccount[:]) latestAccount, err := snapshot.FullAccount(blob) if err != nil { errChan <- err return } // fetch previous state var previousAccount state.Account stateMux.Lock() enc, err := statedb.Trie().TryGet(diffAccount[:]) stateMux.Unlock() if err != nil { errChan <- err return } if len(enc) != 0 { if err := rlp.DecodeBytes(enc, &previousAccount); err != nil { errChan <- err return } } if latestAccount.Balance == nil { latestAccount.Balance = new(big.Int) } if previousAccount.Balance == nil { previousAccount.Balance = new(big.Int) } if previousAccount.Root == (common.Hash{}) { previousAccount.Root = types.EmptyRootHash } if len(previousAccount.CodeHash) == 0 { previousAccount.CodeHash = types.EmptyCodeHash } // skip no change account if previousAccount.Nonce == latestAccount.Nonce && bytes.Equal(previousAccount.CodeHash, latestAccount.CodeHash) && previousAccount.Balance.Cmp(latestAccount.Balance) == 0 && previousAccount.Root == common.BytesToHash(latestAccount.Root) { // It is normal to receive redundant message since the collected message is redundant. 
log.Debug("receive redundant account change in diff layer", "account", diffAccount, "num", block.NumberU64()) snapMux.Lock() delete(snapAccounts, diffAccount) delete(snapStorage, diffAccount) snapMux.Unlock() continue } // update code codeHash := common.BytesToHash(latestAccount.CodeHash) if !bytes.Equal(latestAccount.CodeHash, previousAccount.CodeHash) && !bytes.Equal(latestAccount.CodeHash, types.EmptyCodeHash) { if code, exist := fullDiffCode[codeHash]; exist { if crypto.Keccak256Hash(code) != codeHash { errChan <- fmt.Errorf("code and code hash mismatch, account %s", diffAccount.String()) return } diffMux.Lock() diffCode[codeHash] = code diffMux.Unlock() } else { rawCode := rawdb.ReadCode(p.bc.db, codeHash) if len(rawCode) == 0 { errChan <- fmt.Errorf("missing code, account %s", diffAccount.String()) return } } } //update storage latestRoot := common.BytesToHash(latestAccount.Root) if latestRoot != previousAccount.Root && latestRoot != types.EmptyRootHash { accountTrie, err := statedb.Database().OpenStorageTrie(addrHash, previousAccount.Root) if err != nil { errChan <- err return } snapMux.RLock() storageChange, exist := snapStorage[diffAccount] snapMux.RUnlock() if !exist { errChan <- errors.New("missing storage change in difflayer") return } for k, v := range storageChange { if len(v) != 0 { accountTrie.TryUpdate([]byte(k), v) } else { accountTrie.TryDelete([]byte(k)) } } // check storage root accountRootHash := accountTrie.Hash() if latestRoot != accountRootHash { errChan <- errors.New("account storage root mismatch") return } diffMux.Lock() diffTries[diffAccount] = accountTrie diffMux.Unlock() } else { snapMux.Lock() delete(snapStorage, diffAccount) snapMux.Unlock() } // can't trust the blob, need encode by our-self. 
latestStateAccount := state.Account{ Nonce: latestAccount.Nonce, Balance: latestAccount.Balance, Root: common.BytesToHash(latestAccount.Root), CodeHash: latestAccount.CodeHash, } bz, err := rlp.EncodeToBytes(&latestStateAccount) if err != nil { errChan <- err return } stateMux.Lock() err = statedb.Trie().TryUpdate(diffAccount[:], bz) stateMux.Unlock() if err != nil { errChan <- err return } } errChan <- nil }(start, end) } for i := 0; i < threads; i++ { err := <-errChan if err != nil { close(exitChan) return nil, nil, 0, err } } var allLogs []*types.Log var gasUsed uint64 for _, receipt := range diffLayer.Receipts { allLogs = append(allLogs, receipt.Logs...) gasUsed += receipt.GasUsed } // Do validate in advance so that we can fall back to full process if err := p.bc.validator.ValidateState(block, statedb, diffLayer.Receipts, gasUsed); err != nil { log.Error("validate state failed during diff sync", "error", err) return nil, nil, 0, err } // remove redundant storage change for account := range snapStorage { if _, exist := snapAccounts[account]; !exist { log.Warn("receive redundant storage change in diff layer") delete(snapStorage, account) } } // remove redundant code if len(fullDiffCode) != len(diffLayer.Codes) { diffLayer.Codes = make([]types.DiffCode, 0, len(diffCode)) for hash, code := range diffCode { diffLayer.Codes = append(diffLayer.Codes, types.DiffCode{ Hash: hash, Code: code, }) } } statedb.SetSnapData(snapDestructs, snapAccounts, snapStorage) if len(snapAccounts) != len(diffLayer.Accounts) || len(snapStorage) != len(diffLayer.Storages) { diffLayer.Destructs, diffLayer.Accounts, diffLayer.Storages = statedb.SnapToDiffLayer() } statedb.SetDiff(diffLayer, diffTries, diffCode) return diffLayer.Receipts, allLogs, gasUsed, nil } // Process processes the state changes according to the Ethereum rules by running // the transaction messages using the statedb and applying any rewards to both // the processor (coinbase) and any included uncles. 
// // Process returns the receipts and logs accumulated during the process and // returns the amount of gas that was used in the process. If any of the // transactions failed to execute due to insufficient gas it will return an error. func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (*state.StateDB, types.Receipts, []*types.Log, uint64, error) { var ( usedGas = new(uint64) header = block.Header() allLogs []*types.Log gp = new(GasPool).AddGas(block.GasLimit()) ) signer := types.MakeSigner(p.bc.chainConfig, block.Number()) statedb.TryPreload(block, signer) var receipts = make([]*types.Receipt, 0) // Mutate the block and state according to any hard-fork specs if p.config.DAOForkSupport && p.config.DAOForkBlock != nil && p.config.DAOForkBlock.Cmp(block.Number()) == 0 { misc.ApplyDAOHardFork(statedb) } // Handle upgrade build-in system contract code systemcontracts.UpgradeBuildInSystemContract(p.config, block.Number(), statedb) blockContext := NewEVMBlockContext(header, p.bc, nil) vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg) // Iterate over and process the individual transactions posa, isPoSA := p.engine.(consensus.PoSA) commonTxs := make([]*types.Transaction, 0, len(block.Transactions())) // usually do have two tx, one for validator set contract, another for system reward contract. 
systemTxs := make([]*types.Transaction, 0, 2) for i, tx := range block.Transactions() { if isPoSA { if isSystemTx, err := posa.IsSystemTransaction(tx, block.Header()); err != nil { return statedb, nil, nil, 0, err } else if isSystemTx { systemTxs = append(systemTxs, tx) continue } } msg, err := tx.AsMessage(signer) if err != nil { return statedb, nil, nil, 0, err } statedb.Prepare(tx.Hash(), block.Hash(), i) receipt, err := applyTransaction(msg, p.config, p.bc, nil, gp, statedb, header, tx, usedGas, vmenv) if err != nil { return statedb, nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err) } commonTxs = append(commonTxs, tx) receipts = append(receipts, receipt) } // Finalize the block, applying any consensus engine specific extras (e.g. block rewards) err := p.engine.Finalize(p.bc, header, statedb, &commonTxs, block.Uncles(), &receipts, &systemTxs, usedGas) if err != nil { return statedb, receipts, allLogs, *usedGas, err } for _, receipt := range receipts { allLogs = append(allLogs, receipt.Logs...) } return statedb, receipts, allLogs, *usedGas, nil } func applyTransaction(msg types.Message, config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, evm *vm.EVM) (*types.Receipt, error) { // Create a new context to be used in the EVM environment. txContext := NewEVMTxContext(msg) evm.Reset(txContext, statedb) // Apply the transaction to the current state (included in the env). result, err := ApplyMessage(evm, msg, gp) if err != nil { return nil, err } // Update the state with pending changes. var root []byte if config.IsByzantium(header.Number) { statedb.Finalise(true) } else { root = statedb.IntermediateRoot(config.IsEIP158(header.Number)).Bytes() } *usedGas += result.UsedGas // Create a new receipt for the transaction, storing the intermediate root and gas used // by the tx. 
receipt := &types.Receipt{Type: tx.Type(), PostState: root, CumulativeGasUsed: *usedGas} if result.Failed() { receipt.Status = types.ReceiptStatusFailed } else { receipt.Status = types.ReceiptStatusSuccessful } receipt.TxHash = tx.Hash() receipt.GasUsed = result.UsedGas // If the transaction created a contract, store the creation address in the receipt. if msg.To() == nil { receipt.ContractAddress = crypto.CreateAddress(evm.TxContext.Origin, tx.Nonce()) } // Set the receipt logs and create the bloom filter. receipt.Logs = statedb.GetLogs(tx.Hash()) receipt.Bloom = types.CreateBloom(types.Receipts{receipt}) receipt.BlockHash = statedb.BlockHash() receipt.BlockNumber = header.Number receipt.TransactionIndex = uint(statedb.TxIndex()) return receipt, err } // ApplyTransaction attempts to apply a transaction to the given state database // and uses the input parameters for its environment. It returns the receipt // for the transaction, gas used and an error if the transaction failed, // indicating the block was invalid. func ApplyTransaction(config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, cfg vm.Config) (*types.Receipt, error) { msg, err := tx.AsMessage(types.MakeSigner(config, header.Number)) if err != nil { return nil, err } // Create a new context to be used in the EVM environment blockContext := NewEVMBlockContext(header, bc, author) vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, config, cfg) defer func() { ite := vmenv.Interpreter() vm.EVMInterpreterPool.Put(ite) vm.EvmPool.Put(vmenv) }() return applyTransaction(msg, config, bc, author, gp, statedb, header, tx, usedGas, vmenv) }