ethdb, core: implement delete for db batch (#17101)

This commit is contained in:
gary rong 2018-07-02 16:16:30 +08:00 committed by Péter Szilágyi
parent fdfd6d3c39
commit a4a2343cdc
6 changed files with 51 additions and 14 deletions

@ -269,8 +269,8 @@ func (bc *BlockChain) SetHead(head uint64) error {
defer bc.mu.Unlock() defer bc.mu.Unlock()
// Rewind the header chain, deleting all block bodies until then // Rewind the header chain, deleting all block bodies until then
delFn := func(hash common.Hash, num uint64) { delFn := func(db rawdb.DatabaseDeleter, hash common.Hash, num uint64) {
rawdb.DeleteBody(bc.db, hash, num) rawdb.DeleteBody(db, hash, num)
} }
bc.hc.SetHead(head, delFn) bc.hc.SetHead(head, delFn)
currentHeader := bc.hc.CurrentHeader() currentHeader := bc.hc.CurrentHeader()
@ -1340,9 +1340,12 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
diff := types.TxDifference(deletedTxs, addedTxs) diff := types.TxDifference(deletedTxs, addedTxs)
// When transactions get deleted from the database that means the // When transactions get deleted from the database that means the
// receipts that were created in the fork must also be deleted // receipts that were created in the fork must also be deleted
batch := bc.db.NewBatch()
for _, tx := range diff { for _, tx := range diff {
rawdb.DeleteTxLookupEntry(bc.db, tx.Hash()) rawdb.DeleteTxLookupEntry(batch, tx.Hash())
} }
batch.Write()
if len(deletedLogs) > 0 { if len(deletedLogs) > 0 {
go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
} }

@ -156,13 +156,16 @@ func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, er
// Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf // Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
if externTd.Cmp(localTd) > 0 || (externTd.Cmp(localTd) == 0 && mrand.Float64() < 0.5) { if externTd.Cmp(localTd) > 0 || (externTd.Cmp(localTd) == 0 && mrand.Float64() < 0.5) {
// Delete any canonical number assignments above the new head // Delete any canonical number assignments above the new head
batch := hc.chainDb.NewBatch()
for i := number + 1; ; i++ { for i := number + 1; ; i++ {
hash := rawdb.ReadCanonicalHash(hc.chainDb, i) hash := rawdb.ReadCanonicalHash(hc.chainDb, i)
if hash == (common.Hash{}) { if hash == (common.Hash{}) {
break break
} }
rawdb.DeleteCanonicalHash(hc.chainDb, i) rawdb.DeleteCanonicalHash(batch, i)
} }
batch.Write()
// Overwrite any stale canonical number assignments // Overwrite any stale canonical number assignments
var ( var (
headHash = header.ParentHash headHash = header.ParentHash
@ -438,7 +441,7 @@ func (hc *HeaderChain) SetCurrentHeader(head *types.Header) {
// DeleteCallback is a callback function that is called by SetHead before // DeleteCallback is a callback function that is called by SetHead before
// each header is deleted. // each header is deleted.
type DeleteCallback func(common.Hash, uint64) type DeleteCallback func(rawdb.DatabaseDeleter, common.Hash, uint64)
// SetHead rewinds the local chain to a new head. Everything above the new head // SetHead rewinds the local chain to a new head. Everything above the new head
// will be deleted and the new one set. // will be deleted and the new one set.
@ -448,22 +451,24 @@ func (hc *HeaderChain) SetHead(head uint64, delFn DeleteCallback) {
if hdr := hc.CurrentHeader(); hdr != nil { if hdr := hc.CurrentHeader(); hdr != nil {
height = hdr.Number.Uint64() height = hdr.Number.Uint64()
} }
batch := hc.chainDb.NewBatch()
for hdr := hc.CurrentHeader(); hdr != nil && hdr.Number.Uint64() > head; hdr = hc.CurrentHeader() { for hdr := hc.CurrentHeader(); hdr != nil && hdr.Number.Uint64() > head; hdr = hc.CurrentHeader() {
hash := hdr.Hash() hash := hdr.Hash()
num := hdr.Number.Uint64() num := hdr.Number.Uint64()
if delFn != nil { if delFn != nil {
delFn(hash, num) delFn(batch, hash, num)
} }
rawdb.DeleteHeader(hc.chainDb, hash, num) rawdb.DeleteHeader(batch, hash, num)
rawdb.DeleteTd(hc.chainDb, hash, num) rawdb.DeleteTd(batch, hash, num)
hc.currentHeader.Store(hc.GetHeader(hdr.ParentHash, hdr.Number.Uint64()-1)) hc.currentHeader.Store(hc.GetHeader(hdr.ParentHash, hdr.Number.Uint64()-1))
} }
// Roll back the canonical chain numbering // Roll back the canonical chain numbering
for i := height; i > head; i-- { for i := height; i > head; i-- {
rawdb.DeleteCanonicalHash(hc.chainDb, i) rawdb.DeleteCanonicalHash(batch, i)
} }
batch.Write()
// Clear out any stale content from the caches // Clear out any stale content from the caches
hc.headerCache.Purge() hc.headerCache.Purge()
hc.tdCache.Purge() hc.tdCache.Purge()

@ -388,6 +388,12 @@ func (b *ldbBatch) Put(key, value []byte) error {
return nil return nil
} }
func (b *ldbBatch) Delete(key []byte) error {
b.b.Delete(key)
b.size += 1
return nil
}
func (b *ldbBatch) Write() error { func (b *ldbBatch) Write() error {
return b.db.Write(b.b, nil) return b.db.Write(b.b, nil)
} }
@ -453,6 +459,10 @@ func (tb *tableBatch) Put(key, value []byte) error {
return tb.batch.Put(append([]byte(tb.prefix), key...), value) return tb.batch.Put(append([]byte(tb.prefix), key...), value)
} }
func (tb *tableBatch) Delete(key []byte) error {
return tb.batch.Delete(append([]byte(tb.prefix), key...))
}
func (tb *tableBatch) Write() error { func (tb *tableBatch) Write() error {
return tb.batch.Write() return tb.batch.Write()
} }

@ -25,12 +25,17 @@ type Putter interface {
Put(key []byte, value []byte) error Put(key []byte, value []byte) error
} }
// Deleter wraps the database delete operation supported by both batches and regular databases.
type Deleter interface {
Delete(key []byte) error
}
// Database wraps all database operations. All methods are safe for concurrent use. // Database wraps all database operations. All methods are safe for concurrent use.
type Database interface { type Database interface {
Putter Putter
Deleter
Get(key []byte) ([]byte, error) Get(key []byte) ([]byte, error)
Has(key []byte) (bool, error) Has(key []byte) (bool, error)
Delete(key []byte) error
Close() Close()
NewBatch() Batch NewBatch() Batch
} }
@ -39,6 +44,7 @@ type Database interface {
// when Write is called. Batch cannot be used concurrently. // when Write is called. Batch cannot be used concurrently.
type Batch interface { type Batch interface {
Putter Putter
Deleter
ValueSize() int // amount of data in the batch ValueSize() int // amount of data in the batch
Write() error Write() error
// Reset resets the batch for reuse // Reset resets the batch for reuse

@ -110,11 +110,20 @@ func (b *memBatch) Put(key, value []byte) error {
return nil return nil
} }
func (b *memBatch) Delete(key []byte) error {
b.writes = append(b.writes, kv{common.CopyBytes(key), nil})
return nil
}
func (b *memBatch) Write() error { func (b *memBatch) Write() error {
b.db.lock.Lock() b.db.lock.Lock()
defer b.db.lock.Unlock() defer b.db.lock.Unlock()
for _, kv := range b.writes { for _, kv := range b.writes {
if kv.v == nil {
delete(b.db.db, string(kv.k))
continue
}
b.db.db[string(kv.k)] = kv.v b.db.db[string(kv.k)] = kv.v
} }
return nil return nil

@ -199,15 +199,17 @@ func (pool *TxPool) checkMinedTxs(ctx context.Context, hash common.Hash, number
// rollbackTxs marks the transactions contained in recently rolled back blocks // rollbackTxs marks the transactions contained in recently rolled back blocks
// as rolled back. It also removes any positional lookup entries. // as rolled back. It also removes any positional lookup entries.
func (pool *TxPool) rollbackTxs(hash common.Hash, txc txStateChanges) { func (pool *TxPool) rollbackTxs(hash common.Hash, txc txStateChanges) {
batch := pool.chainDb.NewBatch()
if list, ok := pool.mined[hash]; ok { if list, ok := pool.mined[hash]; ok {
for _, tx := range list { for _, tx := range list {
txHash := tx.Hash() txHash := tx.Hash()
rawdb.DeleteTxLookupEntry(pool.chainDb, txHash) rawdb.DeleteTxLookupEntry(batch, txHash)
pool.pending[txHash] = tx pool.pending[txHash] = tx
txc.setState(txHash, false) txc.setState(txHash, false)
} }
delete(pool.mined, hash) delete(pool.mined, hash)
} }
batch.Write()
} }
// reorgOnNewHead sets a new head header, processing (and rolling back if necessary) // reorgOnNewHead sets a new head header, processing (and rolling back if necessary)
@ -504,14 +506,16 @@ func (self *TxPool) Content() (map[common.Address]types.Transactions, map[common
func (self *TxPool) RemoveTransactions(txs types.Transactions) { func (self *TxPool) RemoveTransactions(txs types.Transactions) {
self.mu.Lock() self.mu.Lock()
defer self.mu.Unlock() defer self.mu.Unlock()
var hashes []common.Hash var hashes []common.Hash
batch := self.chainDb.NewBatch()
for _, tx := range txs { for _, tx := range txs {
//self.RemoveTx(tx.Hash())
hash := tx.Hash() hash := tx.Hash()
delete(self.pending, hash) delete(self.pending, hash)
self.chainDb.Delete(hash[:]) batch.Delete(hash.Bytes())
hashes = append(hashes, hash) hashes = append(hashes, hash)
} }
batch.Write()
self.relay.Discard(hashes) self.relay.Discard(hashes)
} }