ethdb: add DeleteRange feature (#30668)
This PR adds `DeleteRange` to `ethdb.KeyValueWriter`. While range deletion using an iterator can be really slow, `DeleteRange` is natively supported by pebble and apparently runs in O(1) time (typically 20-30ms in my tests for removing hundreds of millions of keys and gigabytes of data). For leveldb and memorydb an iterator based fallback is implemented. Note that since the iterator method can be slow and a database function should not unexpectedly block for a very long time, the number of deleted keys is limited at 10000 which should ensure that it does not block for more than a second. ErrTooManyKeys is returned if the range has only been partially deleted. In this case the caller can repeat the call until it finally succeeds.
This commit is contained in:
parent
6c6bf6fe64
commit
80bdab757d
@ -129,6 +129,12 @@ func (t *table) Delete(key []byte) error {
|
|||||||
return t.db.Delete(append([]byte(t.prefix), key...))
|
return t.db.Delete(append([]byte(t.prefix), key...))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeleteRange deletes all of the keys (and values) in the range [start,end)
|
||||||
|
// (inclusive on start, exclusive on end).
|
||||||
|
func (t *table) DeleteRange(start, end []byte) error {
|
||||||
|
return t.db.DeleteRange(append([]byte(t.prefix), start...), append([]byte(t.prefix), end...))
|
||||||
|
}
|
||||||
|
|
||||||
// NewIterator creates a binary-alphabetical iterator over a subset
|
// NewIterator creates a binary-alphabetical iterator over a subset
|
||||||
// of database content with a particular key prefix, starting at a particular
|
// of database content with a particular key prefix, starting at a particular
|
||||||
// initial key (or after, if it does not exist).
|
// initial key (or after, if it does not exist).
|
||||||
|
@ -37,6 +37,13 @@ type KeyValueWriter interface {
|
|||||||
Delete(key []byte) error
|
Delete(key []byte) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// KeyValueRangeDeleter wraps the DeleteRange method of a backing data store.
|
||||||
|
type KeyValueRangeDeleter interface {
|
||||||
|
// DeleteRange deletes all of the keys (and values) in the range [start,end)
|
||||||
|
// (inclusive on start, exclusive on end).
|
||||||
|
DeleteRange(start, end []byte) error
|
||||||
|
}
|
||||||
|
|
||||||
// KeyValueStater wraps the Stat method of a backing data store.
|
// KeyValueStater wraps the Stat method of a backing data store.
|
||||||
type KeyValueStater interface {
|
type KeyValueStater interface {
|
||||||
// Stat returns the statistic data of the database.
|
// Stat returns the statistic data of the database.
|
||||||
@ -61,6 +68,7 @@ type KeyValueStore interface {
|
|||||||
KeyValueReader
|
KeyValueReader
|
||||||
KeyValueWriter
|
KeyValueWriter
|
||||||
KeyValueStater
|
KeyValueStater
|
||||||
|
KeyValueRangeDeleter
|
||||||
Batcher
|
Batcher
|
||||||
Iteratee
|
Iteratee
|
||||||
Compacter
|
Compacter
|
||||||
@ -158,6 +166,7 @@ type Reader interface {
|
|||||||
// immutable ancient data.
|
// immutable ancient data.
|
||||||
type Writer interface {
|
type Writer interface {
|
||||||
KeyValueWriter
|
KeyValueWriter
|
||||||
|
KeyValueRangeDeleter
|
||||||
AncientWriter
|
AncientWriter
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -21,6 +21,7 @@ import (
|
|||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
"slices"
|
"slices"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/ethdb"
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
@ -343,6 +344,64 @@ func TestDatabaseSuite(t *testing.T, New func() ethdb.KeyValueStore) {
|
|||||||
t.Fatalf("expected error on batch.Write after Close")
|
t.Fatalf("expected error on batch.Write after Close")
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("DeleteRange", func(t *testing.T) {
|
||||||
|
db := New()
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
addRange := func(start, stop int) {
|
||||||
|
for i := start; i <= stop; i++ {
|
||||||
|
db.Put([]byte(strconv.Itoa(i)), nil)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
checkRange := func(start, stop int, exp bool) {
|
||||||
|
for i := start; i <= stop; i++ {
|
||||||
|
has, _ := db.Has([]byte(strconv.Itoa(i)))
|
||||||
|
if has && !exp {
|
||||||
|
t.Fatalf("unexpected key %d", i)
|
||||||
|
}
|
||||||
|
if !has && exp {
|
||||||
|
t.Fatalf("missing expected key %d", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
addRange(1, 9)
|
||||||
|
db.DeleteRange([]byte("9"), []byte("1"))
|
||||||
|
checkRange(1, 9, true)
|
||||||
|
db.DeleteRange([]byte("5"), []byte("5"))
|
||||||
|
checkRange(1, 9, true)
|
||||||
|
db.DeleteRange([]byte("5"), []byte("50"))
|
||||||
|
checkRange(1, 4, true)
|
||||||
|
checkRange(5, 5, false)
|
||||||
|
checkRange(6, 9, true)
|
||||||
|
db.DeleteRange([]byte(""), []byte("a"))
|
||||||
|
checkRange(1, 9, false)
|
||||||
|
|
||||||
|
addRange(1, 999)
|
||||||
|
db.DeleteRange([]byte("12345"), []byte("54321"))
|
||||||
|
checkRange(1, 1, true)
|
||||||
|
checkRange(2, 5, false)
|
||||||
|
checkRange(6, 12, true)
|
||||||
|
checkRange(13, 54, false)
|
||||||
|
checkRange(55, 123, true)
|
||||||
|
checkRange(124, 543, false)
|
||||||
|
checkRange(544, 999, true)
|
||||||
|
|
||||||
|
addRange(1, 999)
|
||||||
|
db.DeleteRange([]byte("3"), []byte("7"))
|
||||||
|
checkRange(1, 2, true)
|
||||||
|
checkRange(3, 6, false)
|
||||||
|
checkRange(7, 29, true)
|
||||||
|
checkRange(30, 69, false)
|
||||||
|
checkRange(70, 299, true)
|
||||||
|
checkRange(300, 699, false)
|
||||||
|
checkRange(700, 999, true)
|
||||||
|
|
||||||
|
db.DeleteRange([]byte(""), []byte("a"))
|
||||||
|
checkRange(1, 999, false)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// BenchDatabaseSuite runs a suite of benchmarks against a KeyValueStore database
|
// BenchDatabaseSuite runs a suite of benchmarks against a KeyValueStore database
|
||||||
@ -438,6 +497,29 @@ func BenchDatabaseSuite(b *testing.B, New func() ethdb.KeyValueStore) {
|
|||||||
benchBatchWrite(b, keys, vals)
|
benchBatchWrite(b, keys, vals)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
b.Run("DeleteRange", func(b *testing.B) {
|
||||||
|
benchDeleteRange := func(b *testing.B, count int) {
|
||||||
|
db := New()
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
for i := 0; i < count; i++ {
|
||||||
|
db.Put([]byte(strconv.Itoa(i)), nil)
|
||||||
|
}
|
||||||
|
b.ResetTimer()
|
||||||
|
b.ReportAllocs()
|
||||||
|
|
||||||
|
db.DeleteRange([]byte("0"), []byte("999999999"))
|
||||||
|
}
|
||||||
|
b.Run("DeleteRange100", func(b *testing.B) {
|
||||||
|
benchDeleteRange(b, 100)
|
||||||
|
})
|
||||||
|
b.Run("DeleteRange1k", func(b *testing.B) {
|
||||||
|
benchDeleteRange(b, 1000)
|
||||||
|
})
|
||||||
|
b.Run("DeleteRange10k", func(b *testing.B) {
|
||||||
|
benchDeleteRange(b, 10000)
|
||||||
|
})
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func iterateKeys(it ethdb.Iterator) []string {
|
func iterateKeys(it ethdb.Iterator) []string {
|
||||||
|
@ -21,6 +21,7 @@
|
|||||||
package leveldb
|
package leveldb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -206,6 +207,36 @@ func (db *Database) Delete(key []byte) error {
|
|||||||
return db.db.Delete(key, nil)
|
return db.db.Delete(key, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var ErrTooManyKeys = errors.New("too many keys in deleted range")
|
||||||
|
|
||||||
|
// DeleteRange deletes all of the keys (and values) in the range [start,end)
|
||||||
|
// (inclusive on start, exclusive on end).
|
||||||
|
// Note that this is a fallback implementation as leveldb does not natively
|
||||||
|
// support range deletion. It can be slow and therefore the number of deleted
|
||||||
|
// keys is limited in order to avoid blocking for a very long time.
|
||||||
|
// ErrTooManyKeys is returned if the range has only been partially deleted.
|
||||||
|
// In this case the caller can repeat the call until it finally succeeds.
|
||||||
|
func (db *Database) DeleteRange(start, end []byte) error {
|
||||||
|
batch := db.NewBatch()
|
||||||
|
it := db.NewIterator(nil, start)
|
||||||
|
defer it.Release()
|
||||||
|
|
||||||
|
var count int
|
||||||
|
for it.Next() && bytes.Compare(end, it.Key()) > 0 {
|
||||||
|
count++
|
||||||
|
if count > 10000 { // should not block for more than a second
|
||||||
|
if err := batch.Write(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return ErrTooManyKeys
|
||||||
|
}
|
||||||
|
if err := batch.Delete(it.Key()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return batch.Write()
|
||||||
|
}
|
||||||
|
|
||||||
// NewBatch creates a write-only key-value store that buffers changes to its host
|
// NewBatch creates a write-only key-value store that buffers changes to its host
|
||||||
// database until a final write is called.
|
// database until a final write is called.
|
||||||
func (db *Database) NewBatch() ethdb.Batch {
|
func (db *Database) NewBatch() ethdb.Batch {
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
package memorydb
|
package memorydb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"errors"
|
"errors"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
@ -121,6 +122,20 @@ func (db *Database) Delete(key []byte) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeleteRange deletes all of the keys (and values) in the range [start,end)
|
||||||
|
// (inclusive on start, exclusive on end).
|
||||||
|
func (db *Database) DeleteRange(start, end []byte) error {
|
||||||
|
it := db.NewIterator(nil, start)
|
||||||
|
defer it.Release()
|
||||||
|
|
||||||
|
for it.Next() && bytes.Compare(end, it.Key()) > 0 {
|
||||||
|
if err := db.Delete(it.Key()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// NewBatch creates a write-only key-value store that buffers changes to its host
|
// NewBatch creates a write-only key-value store that buffers changes to its host
|
||||||
// database until a final write is called.
|
// database until a final write is called.
|
||||||
func (db *Database) NewBatch() ethdb.Batch {
|
func (db *Database) NewBatch() ethdb.Batch {
|
||||||
|
@ -335,7 +335,18 @@ func (d *Database) Delete(key []byte) error {
|
|||||||
if d.closed {
|
if d.closed {
|
||||||
return pebble.ErrClosed
|
return pebble.ErrClosed
|
||||||
}
|
}
|
||||||
return d.db.Delete(key, nil)
|
return d.db.Delete(key, d.writeOptions)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteRange deletes all of the keys (and values) in the range [start,end)
|
||||||
|
// (inclusive on start, exclusive on end).
|
||||||
|
func (d *Database) DeleteRange(start, end []byte) error {
|
||||||
|
d.quitLock.RLock()
|
||||||
|
defer d.quitLock.RUnlock()
|
||||||
|
if d.closed {
|
||||||
|
return pebble.ErrClosed
|
||||||
|
}
|
||||||
|
return d.db.DeleteRange(start, end, d.writeOptions)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBatch creates a write-only key-value store that buffers changes to its host
|
// NewBatch creates a write-only key-value store that buffers changes to its host
|
||||||
|
@ -94,6 +94,10 @@ func (db *Database) Delete(key []byte) error {
|
|||||||
panic("not supported")
|
panic("not supported")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *Database) DeleteRange(start, end []byte) error {
|
||||||
|
panic("not supported")
|
||||||
|
}
|
||||||
|
|
||||||
func (db *Database) ModifyAncients(f func(ethdb.AncientWriteOp) error) (int64, error) {
|
func (db *Database) ModifyAncients(f func(ethdb.AncientWriteOp) error) (int64, error) {
|
||||||
panic("not supported")
|
panic("not supported")
|
||||||
}
|
}
|
||||||
|
@ -819,6 +819,7 @@ type spongeDb struct {
|
|||||||
func (s *spongeDb) Has(key []byte) (bool, error) { panic("implement me") }
|
func (s *spongeDb) Has(key []byte) (bool, error) { panic("implement me") }
|
||||||
func (s *spongeDb) Get(key []byte) ([]byte, error) { return nil, errors.New("no such elem") }
|
func (s *spongeDb) Get(key []byte) ([]byte, error) { return nil, errors.New("no such elem") }
|
||||||
func (s *spongeDb) Delete(key []byte) error { panic("implement me") }
|
func (s *spongeDb) Delete(key []byte) error { panic("implement me") }
|
||||||
|
func (s *spongeDb) DeleteRange(start, end []byte) error { panic("implement me") }
|
||||||
func (s *spongeDb) NewBatch() ethdb.Batch { return &spongeBatch{s} }
|
func (s *spongeDb) NewBatch() ethdb.Batch { return &spongeBatch{s} }
|
||||||
func (s *spongeDb) NewBatchWithSize(size int) ethdb.Batch { return &spongeBatch{s} }
|
func (s *spongeDb) NewBatchWithSize(size int) ethdb.Batch { return &spongeBatch{s} }
|
||||||
func (s *spongeDb) Stat() (string, error) { panic("implement me") }
|
func (s *spongeDb) Stat() (string, error) { panic("implement me") }
|
||||||
|
@ -69,6 +69,10 @@ func (db *ProofSet) Delete(key []byte) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *ProofSet) DeleteRange(start, end []byte) error {
|
||||||
|
panic("not supported")
|
||||||
|
}
|
||||||
|
|
||||||
// Get returns a stored node
|
// Get returns a stored node
|
||||||
func (db *ProofSet) Get(key []byte) ([]byte, error) {
|
func (db *ProofSet) Get(key []byte) ([]byte, error) {
|
||||||
db.lock.RLock()
|
db.lock.RLock()
|
||||||
|
Loading…
Reference in New Issue
Block a user