From babd5d80261706a7f12ebe38882f92fb68dd68c5 Mon Sep 17 00:00:00 2001
From: Péter Szilágyi
Date: Sun, 20 Oct 2024 13:25:15 +0300
Subject: [PATCH] core/state: fix runaway alloc caused by prefetcher heap
 escape (#30629)

Co-authored-by: lightclient
---
 core/state/state_object.go         |  14 +--
 core/state/statedb.go              |  16 +--
 core/state/trie_prefetcher.go      | 150 ++++++++++++++++++-----
 core/state/trie_prefetcher_test.go |  12 +--
 4 files changed, 115 insertions(+), 77 deletions(-)

diff --git a/core/state/state_object.go b/core/state/state_object.go
index 422badb19b..1ab432e96e 100644
--- a/core/state/state_object.go
+++ b/core/state/state_object.go
@@ -199,7 +199,7 @@ func (s *stateObject) GetCommittedState(key common.Hash) common.Hash {
 
 	// Schedule the resolved storage slots for prefetching if it's enabled.
 	if s.db.prefetcher != nil && s.data.Root != types.EmptyRootHash {
-		if err = s.db.prefetcher.prefetch(s.addrHash, s.origin.Root, s.address, [][]byte{key[:]}, true); err != nil {
+		if err = s.db.prefetcher.prefetch(s.addrHash, s.origin.Root, s.address, nil, []common.Hash{key}, true); err != nil {
 			log.Error("Failed to prefetch storage slot", "addr", s.address, "key", key, "err", err)
 		}
 	}
@@ -237,7 +237,7 @@ func (s *stateObject) setState(key common.Hash, value common.Hash, origin common
 // finalise moves all dirty storage slots into the pending area to be hashed or
 // committed later. It is invoked at the end of every transaction.
 func (s *stateObject) finalise() {
-	slotsToPrefetch := make([][]byte, 0, len(s.dirtyStorage))
+	slotsToPrefetch := make([]common.Hash, 0, len(s.dirtyStorage))
 	for key, value := range s.dirtyStorage {
 		if origin, exist := s.uncommittedStorage[key]; exist && origin == value {
 			// The slot is reverted to its original value, delete the entry
@@ -250,7 +250,7 @@ func (s *stateObject) finalise() {
 			// The slot is different from its original value and hasn't been
 			// tracked for commit yet.
 			s.uncommittedStorage[key] = s.GetCommittedState(key)
-			slotsToPrefetch = append(slotsToPrefetch, common.CopyBytes(key[:])) // Copy needed for closure
+			slotsToPrefetch = append(slotsToPrefetch, key) // Copy needed for closure
 		}
 		// Aggregate the dirty storage slots into the pending area. It might
 		// be possible that the value of tracked slot here is same with the
@@ -261,7 +261,7 @@ func (s *stateObject) finalise() {
 		s.pendingStorage[key] = value
 	}
 	if s.db.prefetcher != nil && len(slotsToPrefetch) > 0 && s.data.Root != types.EmptyRootHash {
-		if err := s.db.prefetcher.prefetch(s.addrHash, s.data.Root, s.address, slotsToPrefetch, false); err != nil {
+		if err := s.db.prefetcher.prefetch(s.addrHash, s.data.Root, s.address, nil, slotsToPrefetch, false); err != nil {
 			log.Error("Failed to prefetch slots", "addr", s.address, "slots", len(slotsToPrefetch), "err", err)
 		}
 	}
@@ -323,7 +323,7 @@ func (s *stateObject) updateTrie() (Trie, error) {
 	// Whereas if the created node is handled first, then the collapse is avoided, and `B` is not resolved.
 	var (
 		deletions []common.Hash
-		used      = make([][]byte, 0, len(s.uncommittedStorage))
+		used      = make([]common.Hash, 0, len(s.uncommittedStorage))
 	)
 	for key, origin := range s.uncommittedStorage {
 		// Skip noop changes, persist actual changes
@@ -346,7 +346,7 @@ func (s *stateObject) updateTrie() (Trie, error) {
 			deletions = append(deletions, key)
 		}
 		// Cache the items for preloading
-		used = append(used, common.CopyBytes(key[:])) // Copy needed for closure
+		used = append(used, key) // Copy needed for closure
 	}
 	for _, key := range deletions {
 		if err := tr.DeleteStorage(s.address, key[:]); err != nil {
@@ -356,7 +356,7 @@ func (s *stateObject) updateTrie() (Trie, error) {
 		s.db.StorageDeleted.Add(1)
 	}
 	if s.db.prefetcher != nil {
-		s.db.prefetcher.used(s.addrHash, s.data.Root, used)
+		s.db.prefetcher.used(s.addrHash, s.data.Root, nil, used)
 	}
 	s.uncommittedStorage = make(Storage) // empties the commit markers
 	return tr, nil
diff --git a/core/state/statedb.go b/core/state/statedb.go
index 527d9bc08d..f7efc199b3 100644
--- a/core/state/statedb.go
+++ b/core/state/statedb.go
@@ -214,7 +214,7 @@ func (s *StateDB) StartPrefetcher(namespace string, witness *stateless.Witness)
 	// the prefetcher is constructed. For more details, see:
 	// https://github.com/ethereum/go-ethereum/issues/29880
 	s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace, witness == nil)
-	if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, nil, false); err != nil {
+	if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, nil, nil, false); err != nil {
 		log.Error("Failed to prefetch account trie", "root", s.originalRoot, "err", err)
 	}
 }
@@ -587,7 +587,7 @@ func (s *StateDB) getStateObject(addr common.Address) *stateObject {
 	}
 	// Schedule the resolved account for prefetching if it's enabled.
 	if s.prefetcher != nil {
-		if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, [][]byte{addr[:]}, true); err != nil {
+		if err = s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, []common.Address{addr}, nil, true); err != nil {
 			log.Error("Failed to prefetch account", "addr", addr, "err", err)
 		}
 	}
@@ -720,7 +720,7 @@ func (s *StateDB) GetRefund() uint64 {
 // the journal as well as the refunds. Finalise, however, will not push any updates
 // into the tries just yet. Only IntermediateRoot or Commit will do that.
 func (s *StateDB) Finalise(deleteEmptyObjects bool) {
-	addressesToPrefetch := make([][]byte, 0, len(s.journal.dirties))
+	addressesToPrefetch := make([]common.Address, 0, len(s.journal.dirties))
 	for addr := range s.journal.dirties {
 		obj, exist := s.stateObjects[addr]
 		if !exist {
@@ -753,10 +753,10 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) {
 		// At this point, also ship the address off to the precacher. The precacher
 		// will start loading tries, and when the change is eventually committed,
 		// the commit-phase will be a lot faster
-		addressesToPrefetch = append(addressesToPrefetch, common.CopyBytes(addr[:])) // Copy needed for closure
+		addressesToPrefetch = append(addressesToPrefetch, addr) // Copy needed for closure
 	}
 	if s.prefetcher != nil && len(addressesToPrefetch) > 0 {
-		if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, addressesToPrefetch, false); err != nil {
+		if err := s.prefetcher.prefetch(common.Hash{}, s.originalRoot, common.Address{}, addressesToPrefetch, nil, false); err != nil {
 			log.Error("Failed to prefetch addresses", "addresses", len(addressesToPrefetch), "err", err)
 		}
 	}
@@ -877,7 +877,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
 	// into a shortnode. This requires `B` to be resolved from disk.
 	// Whereas if the created node is handled first, then the collapse is avoided, and `B` is not resolved.
 	var (
-		usedAddrs    [][]byte
+		usedAddrs    []common.Address
 		deletedAddrs []common.Address
 	)
 	for addr, op := range s.mutations {
@@ -892,7 +892,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
 			s.updateStateObject(s.stateObjects[addr])
 			s.AccountUpdated += 1
 		}
-		usedAddrs = append(usedAddrs, common.CopyBytes(addr[:])) // Copy needed for closure
+		usedAddrs = append(usedAddrs, addr) // Copy needed for closure
 	}
 	for _, deletedAddr := range deletedAddrs {
 		s.deleteStateObject(deletedAddr)
@@ -901,7 +901,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
 	s.AccountUpdates += time.Since(start)
 
 	if s.prefetcher != nil {
-		s.prefetcher.used(common.Hash{}, s.originalRoot, usedAddrs)
+		s.prefetcher.used(common.Hash{}, s.originalRoot, usedAddrs, nil)
 	}
 	// Track the amount of time wasted on hashing the account trie
 	defer func(start time.Time) { s.AccountHashes += time.Since(start) }(time.Now())
diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go
index 458e965a77..5b64583432 100644
--- a/core/state/trie_prefetcher.go
+++ b/core/state/trie_prefetcher.go
@@ -118,31 +118,31 @@ func (p *triePrefetcher) report() {
 		fetcher.wait() // ensure the fetcher's idle before poking in its internals
 
 		if fetcher.root == p.root {
-			p.accountLoadReadMeter.Mark(int64(len(fetcher.seenRead)))
-			p.accountLoadWriteMeter.Mark(int64(len(fetcher.seenWrite)))
+			p.accountLoadReadMeter.Mark(int64(len(fetcher.seenReadAddr)))
+			p.accountLoadWriteMeter.Mark(int64(len(fetcher.seenWriteAddr)))
 
 			p.accountDupReadMeter.Mark(int64(fetcher.dupsRead))
 			p.accountDupWriteMeter.Mark(int64(fetcher.dupsWrite))
 			p.accountDupCrossMeter.Mark(int64(fetcher.dupsCross))
 
-			for _, key := range fetcher.used {
-				delete(fetcher.seenRead, string(key))
-				delete(fetcher.seenWrite, string(key))
+			for _, key := range fetcher.usedAddr {
+				delete(fetcher.seenReadAddr, key)
+				delete(fetcher.seenWriteAddr, key)
 			}
-			p.accountWasteMeter.Mark(int64(len(fetcher.seenRead) + len(fetcher.seenWrite)))
+			p.accountWasteMeter.Mark(int64(len(fetcher.seenReadAddr) + len(fetcher.seenWriteAddr)))
 		} else {
-			p.storageLoadReadMeter.Mark(int64(len(fetcher.seenRead)))
-			p.storageLoadWriteMeter.Mark(int64(len(fetcher.seenWrite)))
+			p.storageLoadReadMeter.Mark(int64(len(fetcher.seenReadSlot)))
+			p.storageLoadWriteMeter.Mark(int64(len(fetcher.seenWriteSlot)))
 
 			p.storageDupReadMeter.Mark(int64(fetcher.dupsRead))
 			p.storageDupWriteMeter.Mark(int64(fetcher.dupsWrite))
 			p.storageDupCrossMeter.Mark(int64(fetcher.dupsCross))
 
-			for _, key := range fetcher.used {
-				delete(fetcher.seenRead, string(key))
-				delete(fetcher.seenWrite, string(key))
+			for _, key := range fetcher.usedSlot {
+				delete(fetcher.seenReadSlot, key)
+				delete(fetcher.seenWriteSlot, key)
 			}
-			p.storageWasteMeter.Mark(int64(len(fetcher.seenRead) + len(fetcher.seenWrite)))
+			p.storageWasteMeter.Mark(int64(len(fetcher.seenReadSlot) + len(fetcher.seenWriteSlot)))
 		}
 	}
 }
@@ -158,7 +158,7 @@ func (p *triePrefetcher) report() {
 //     upon the same contract, the parameters invoking this method may be
 //     repeated.
 //  2. Finalize of the main account trie. This happens only once per block.
-func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr common.Address, keys [][]byte, read bool) error {
+func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr common.Address, addrs []common.Address, slots []common.Hash, read bool) error {
 	// If the state item is only being read, but reads are disabled, return
 	if read && p.noreads {
 		return nil
@@ -175,7 +175,7 @@ func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr comm
 		fetcher = newSubfetcher(p.db, p.root, owner, root, addr)
 		p.fetchers[id] = fetcher
 	}
-	return fetcher.schedule(keys, read)
+	return fetcher.schedule(addrs, slots, read)
 }
 
 // trie returns the trie matching the root hash, blocking until the fetcher of
@@ -195,10 +195,12 @@ func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) Trie {
 
 // used marks a batch of state items used to allow creating statistics as to
 // how useful or wasteful the fetcher is.
-func (p *triePrefetcher) used(owner common.Hash, root common.Hash, used [][]byte) {
+func (p *triePrefetcher) used(owner common.Hash, root common.Hash, usedAddr []common.Address, usedSlot []common.Hash) {
 	if fetcher := p.fetchers[p.trieID(owner, root)]; fetcher != nil {
 		fetcher.wait() // ensure the fetcher's idle before poking in its internals
-		fetcher.used = append(fetcher.used, used...)
+
+		fetcher.usedAddr = append(fetcher.usedAddr, usedAddr...)
+		fetcher.usedSlot = append(fetcher.usedSlot, usedSlot...)
 	}
 }
 
@@ -235,44 +237,50 @@ type subfetcher struct {
 	stop chan struct{} // Channel to interrupt processing
 	term chan struct{} // Channel to signal interruption
 
-	seenRead  map[string]struct{} // Tracks the entries already loaded via read operations
-	seenWrite map[string]struct{} // Tracks the entries already loaded via write operations
+	seenReadAddr  map[common.Address]struct{} // Tracks the accounts already loaded via read operations
+	seenWriteAddr map[common.Address]struct{} // Tracks the accounts already loaded via write operations
+	seenReadSlot  map[common.Hash]struct{}    // Tracks the storage already loaded via read operations
+	seenWriteSlot map[common.Hash]struct{}    // Tracks the storage already loaded via write operations
 
 	dupsRead  int // Number of duplicate preload tasks via reads only
 	dupsWrite int // Number of duplicate preload tasks via writes only
 	dupsCross int // Number of duplicate preload tasks via read-write-crosses
 
-	used [][]byte // Tracks the entries used in the end
+	usedAddr []common.Address // Tracks the accounts used in the end
+	usedSlot []common.Hash    // Tracks the storage used in the end
 }
 
 // subfetcherTask is a trie path to prefetch, tagged with whether it originates
 // from a read or a write request.
 type subfetcherTask struct {
 	read bool
-	key  []byte
+	addr *common.Address
+	slot *common.Hash
 }
 
 // newSubfetcher creates a goroutine to prefetch state items belonging to a
 // particular root hash.
 func newSubfetcher(db Database, state common.Hash, owner common.Hash, root common.Hash, addr common.Address) *subfetcher {
 	sf := &subfetcher{
-		db:        db,
-		state:     state,
-		owner:     owner,
-		root:      root,
-		addr:      addr,
-		wake:      make(chan struct{}, 1),
-		stop:      make(chan struct{}),
-		term:      make(chan struct{}),
-		seenRead:  make(map[string]struct{}),
-		seenWrite: make(map[string]struct{}),
+		db:            db,
+		state:         state,
+		owner:         owner,
+		root:          root,
+		addr:          addr,
+		wake:          make(chan struct{}, 1),
+		stop:          make(chan struct{}),
+		term:          make(chan struct{}),
+		seenReadAddr:  make(map[common.Address]struct{}),
+		seenWriteAddr: make(map[common.Address]struct{}),
+		seenReadSlot:  make(map[common.Hash]struct{}),
+		seenWriteSlot: make(map[common.Hash]struct{}),
 	}
 	go sf.loop()
 	return sf
}
 
 // schedule adds a batch of trie keys to the queue to prefetch.
-func (sf *subfetcher) schedule(keys [][]byte, read bool) error {
+func (sf *subfetcher) schedule(addrs []common.Address, slots []common.Hash, read bool) error {
 	// Ensure the subfetcher is still alive
 	select {
 	case <-sf.term:
@@ -281,8 +289,11 @@ func (sf *subfetcher) schedule(keys [][]byte, read bool) error {
 	}
 	// Append the tasks to the current queue
 	sf.lock.Lock()
-	for _, key := range keys {
-		sf.tasks = append(sf.tasks, &subfetcherTask{read: read, key: key})
+	for _, addr := range addrs {
+		sf.tasks = append(sf.tasks, &subfetcherTask{read: read, addr: &addr})
+	}
+	for _, slot := range slots {
+		sf.tasks = append(sf.tasks, &subfetcherTask{read: read, slot: &slot})
 	}
 	sf.lock.Unlock()
 
@@ -378,35 +389,66 @@ func (sf *subfetcher) loop() {
 			sf.lock.Unlock()
 
 			for _, task := range tasks {
-				key := string(task.key)
-				if task.read {
-					if _, ok := sf.seenRead[key]; ok {
-						sf.dupsRead++
-						continue
-					}
-					if _, ok := sf.seenWrite[key]; ok {
-						sf.dupsCross++
-						continue
+				if task.addr != nil {
+					key := *task.addr
+					if task.read {
+						if _, ok := sf.seenReadAddr[key]; ok {
+							sf.dupsRead++
+							continue
+						}
+						if _, ok := sf.seenWriteAddr[key]; ok {
+							sf.dupsCross++
+							continue
+						}
+					} else {
+						if _, ok := sf.seenReadAddr[key]; ok {
+							sf.dupsCross++
+							continue
+						}
+						if _, ok := sf.seenWriteAddr[key]; ok {
+							sf.dupsWrite++
+							continue
+						}
 					}
 				} else {
-					if _, ok := sf.seenRead[key]; ok {
-						sf.dupsCross++
-						continue
-					}
-					if _, ok := sf.seenWrite[key]; ok {
-						sf.dupsWrite++
-						continue
+					key := *task.slot
+					if task.read {
+						if _, ok := sf.seenReadSlot[key]; ok {
+							sf.dupsRead++
+							continue
+						}
+						if _, ok := sf.seenWriteSlot[key]; ok {
+							sf.dupsCross++
+							continue
+						}
+					} else {
+						if _, ok := sf.seenReadSlot[key]; ok {
+							sf.dupsCross++
+							continue
+						}
+						if _, ok := sf.seenWriteSlot[key]; ok {
+							sf.dupsWrite++
+							continue
+						}
 					}
 				}
-				if len(task.key) == common.AddressLength {
-					sf.trie.GetAccount(common.BytesToAddress(task.key))
+				if task.addr != nil {
+					sf.trie.GetAccount(*task.addr)
 				} else {
-					sf.trie.GetStorage(sf.addr, task.key)
+					sf.trie.GetStorage(sf.addr, (*task.slot)[:])
 				}
 				if task.read {
-					sf.seenRead[key] = struct{}{}
+					if task.addr != nil {
+						sf.seenReadAddr[*task.addr] = struct{}{}
+					} else {
+						sf.seenReadSlot[*task.slot] = struct{}{}
+					}
 				} else {
-					sf.seenWrite[key] = struct{}{}
+					if task.addr != nil {
+						sf.seenWriteAddr[*task.addr] = struct{}{}
+					} else {
+						sf.seenWriteSlot[*task.slot] = struct{}{}
+					}
 				}
 			}
 
diff --git a/core/state/trie_prefetcher_test.go b/core/state/trie_prefetcher_test.go
index 529b42d39c..d96727704c 100644
--- a/core/state/trie_prefetcher_test.go
+++ b/core/state/trie_prefetcher_test.go
@@ -53,12 +53,12 @@ func TestUseAfterTerminate(t *testing.T) {
 	prefetcher := newTriePrefetcher(db.db, db.originalRoot, "", true)
 	skey := common.HexToHash("aaa")
 
-	if err := prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}, false); err != nil {
+	if err := prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, nil, []common.Hash{skey}, false); err != nil {
 		t.Errorf("Prefetch failed before terminate: %v", err)
 	}
 	prefetcher.terminate(false)
 
-	if err := prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, [][]byte{skey.Bytes()}, false); err == nil {
+	if err := prefetcher.prefetch(common.Hash{}, db.originalRoot, common.Address{}, nil, []common.Hash{skey}, false); err == nil {
 		t.Errorf("Prefetch succeeded after terminate: %v", err)
 	}
 	if tr := prefetcher.trie(common.Hash{}, db.originalRoot); tr == nil {
@@ -90,14 +90,10 @@ func TestVerklePrefetcher(t *testing.T) {
 	fetcher := newTriePrefetcher(sdb, root, "", false)
 
 	// Read account
-	fetcher.prefetch(common.Hash{}, root, common.Address{}, [][]byte{
-		addr.Bytes(),
-	}, false)
+	fetcher.prefetch(common.Hash{}, root, common.Address{}, []common.Address{addr}, nil, false)
 
 	// Read storage slot
-	fetcher.prefetch(crypto.Keccak256Hash(addr.Bytes()), sRoot, addr, [][]byte{
-		skey.Bytes(),
-	}, false)
+	fetcher.prefetch(crypto.Keccak256Hash(addr.Bytes()), sRoot, addr, nil, []common.Hash{skey}, false)
 
 	fetcher.terminate(false)
 	accountTrie := fetcher.trie(common.Hash{}, root)
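
Note on the change (not part of the patch itself): the allocations this diff targets come from handing the prefetcher [][]byte keys, where every scheduled account or storage slot was copied with common.CopyBytes (each copy escaping to the heap as its own small allocation) and then converted to a string to key the seen maps. The patch passes fixed-size common.Address and common.Hash values instead, which sit inline in the slice backing array and key the typed seen/used maps directly. The sketch below is illustrative only — the package, helper, and benchmark names are hypothetical and do not appear in the patch — but under that assumption it contrasts the two allocation patterns:

package prefetchalloc

import (
	"testing"

	"github.com/ethereum/go-ethereum/common"
)

// oldStyle mirrors the pre-patch pattern: each key is copied into a fresh byte
// slice via common.CopyBytes, so every element is a separate heap allocation.
func oldStyle(keys []common.Hash) [][]byte {
	out := make([][]byte, 0, len(keys))
	for _, key := range keys {
		out = append(out, common.CopyBytes(key[:]))
	}
	return out
}

// newStyle mirrors the post-patch pattern: the fixed-size common.Hash values
// are stored inline in the slice backing array, so the whole batch costs a
// single allocation no matter how many slots are scheduled.
func newStyle(keys []common.Hash) []common.Hash {
	out := make([]common.Hash, 0, len(keys))
	return append(out, keys...)
}

func BenchmarkOldStyle(b *testing.B) {
	keys := make([]common.Hash, 256)
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		_ = oldStyle(keys) // expect roughly one alloc per key plus the outer slice
	}
}

func BenchmarkNewStyle(b *testing.B) {
	keys := make([]common.Hash, 256)
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		_ = newStyle(keys) // expect a single alloc: the backing array
	}
}

Running the pair with `go test -bench . -benchmem` shows the per-key allocation count of the old shape against the single allocation of the new one, which is the shape the prefetch/schedule/used signatures now take.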