go-ethereum/swarm/storage/ldbstore.go
Matthew Halpern fbedf62f3d swarm/storage: fix loop bound for database cleanup (#19085)
The current loop continuation condition is always true as a uint8
is always being checked whether it is less than 255 (its maximum
value). Since the loop starts with the value 1, termination can be
guaranteed by exiting once the value overflows to 0.
2019-02-21 06:37:32 +01:00

1078 lines
27 KiB
Go

// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// disk storage layer for the package bzz
// DbStore implements the ChunkStore interface and is used by the FileStore as
// persistent storage of chunks
// it implements purging based on access count allowing for external control of
// max capacity
package storage
import (
"archive/tar"
"bytes"
"context"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"io"
"io/ioutil"
"sync"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/storage/mock"
"github.com/syndtr/goleveldb/leveldb"
)
// Garbage collection defaults and the flags understood by writeBatch.
const (
// defaultGCRatio is the default garbage.ratio: startGC divides the entry
// count by it to obtain the deletion target of a round.
defaultGCRatio = 10
// defaultMaxGCRound is the default garbage.maxRound, the upper bound on the
// number of chunks deleted in one garbage collection round.
defaultMaxGCRound = 10000
// defaultMaxGCBatch is the default garbage.maxBatch, the upper bound on the
// number of chunks deleted per database batch.
defaultMaxGCBatch = 5000
// wEntryCnt makes writeBatch add the entry counter (keyEntryCnt) to the batch.
wEntryCnt = 1 << 0
// wIndexCnt makes writeBatch add the data index counter (keyDataIdx) to the batch.
wIndexCnt = 1 << 1
// wAccessCnt makes writeBatch add the access counter (keyAccessCnt) to the batch.
wAccessCnt = 1 << 2
)
var (
// dbEntryCount is the "ldbstore.entryCnt" metrics counter; doPut increments
// it and delete decrements it alongside LDBStore.entryCnt.
dbEntryCount = metrics.NewRegisteredCounter("ldbstore.entryCnt", nil)
)
// Database keys and key prefixes used by LDBStore.
var (
// keyIndex is the first byte of chunk index keys (see getIndexKey).
keyIndex = byte(0)
// keyAccessCnt is the key under which the access counter is persisted.
keyAccessCnt = []byte{2}
// keyEntryCnt is the key under which the entry counter is persisted.
keyEntryCnt = []byte{3}
// keyDataIdx is the key under which the data index counter is persisted.
keyDataIdx = []byte{4}
// keyData is the first byte of chunk data keys (see getDataKey).
keyData = byte(6)
// keyDistanceCnt is the first byte of the per-proximity-order counter keys;
// the second byte of such a key is the proximity order.
keyDistanceCnt = byte(7)
// keySchema is the key under which the schema name is stored.
keySchema = []byte{8}
keyGCIdx = byte(9) // access to chunk data index, used by garbage collection in ascending order from first entry
)
var (
// ErrDBClosed is returned by Put and get once the store has been closed.
ErrDBClosed = errors.New("LDBStore closed")
)
// LDBStoreParams bundles the parameters NewLDBStore needs to open a store.
type LDBStoreParams struct {
// StoreParams carries the generic store settings; NewLDBStore reads Hash and
// DbCapacity from it and NewLDBStoreParams reads BaseKey.
*StoreParams
// Path is passed to NewLDBDatabase when the store is opened.
Path string
// Po maps a chunk address to its proximity order.
Po func(Address) uint8
}
// NewLDBStoreParams builds the parameters for an LDBStore located at path.
// The proximity order function is derived from storeparams.BaseKey via
// Proximity.
func NewLDBStoreParams(storeparams *StoreParams, path string) *LDBStoreParams {
	proximityOrder := func(k Address) uint8 {
		return uint8(Proximity(storeparams.BaseKey, k[:]))
	}

	params := new(LDBStoreParams)
	params.StoreParams = storeparams
	params.Path = path
	params.Po = proximityOrder
	return params
}
// garbage holds the configuration and the running state of the garbage
// collector of an LDBStore.
type garbage struct {
maxRound int // maximum number of chunks to delete in one garbage collection round
maxBatch int // maximum number of chunks to delete in one db request batch
ratio int // 1/x ratio to calculate the number of chunks to gc on a low capacity db
count int // number of chunks deleted in running round
target int // number of chunks to delete in running round
batch *dbBatch // the delete batch
runC chan struct{} // struct in chan means gc is NOT running
}
// LDBStore is a chunk store backed by a LevelDB database. It keeps a chunk
// index, the chunk data, an access-ordered garbage collection index and a
// handful of counters in the same database.
type LDBStore struct {
// db is the database opened by NewLDBStore.
db *LDBDatabase
// this should be stored in db, accessed transactionally
entryCnt uint64 // number of items in the LevelDB
accessCnt uint64 // ever-accumulating number increased every time we read/access an entry
dataIdx uint64 // similar to entryCnt, but we only increment it
// capacity is the entry count at which writeCurrentBatch starts garbage collection.
capacity uint64
// bucketCnt has one slot per proximity order; doPut stores the data index
// of the latest chunk put into that order and BinIndex returns it.
bucketCnt []uint64
// hashfunc is copied from the store parameters (params.Hash).
hashfunc SwarmHasher
// po maps a chunk address to its proximity order.
po func(Address) uint8
// batchesC wakes up the writeBatches loop; it has a buffer of one.
batchesC chan struct{}
// closed is set by Close; Put and get fail with ErrDBClosed afterwards.
closed bool
// batch collects pending writes until writeCurrentBatch flushes it.
batch *dbBatch
// lock guards the counters, the current batch and the closed flag.
lock sync.RWMutex
// quit is closed by Close to stop the writeBatches loop.
quit chan struct{}
// gc is the garbage collector state.
gc *garbage
// Functions encodeDataFunc is used to bypass
// the default functionality of DbStore with
// mock.NodeStore for testing purposes.
encodeDataFunc func(chunk Chunk) []byte
// If getDataFunc is defined, it will be used for
// retrieving the chunk data instead from the local
// LevelDB database.
getDataFunc func(key Address) (data []byte, err error)
}
// dbBatch is a LevelDB write batch together with the outcome of writing it.
type dbBatch struct {
*leveldb.Batch
// err is the result of writing the batch; writeCurrentBatch sets it before
// closing c.
err error
// c is closed by writeCurrentBatch once the batch has been written, which
// releases the Put calls waiting on it.
c chan struct{}
}
// newBatch returns an empty batch with a fresh, open completion channel.
func newBatch() *dbBatch {
	b := new(dbBatch)
	b.Batch = new(leveldb.Batch)
	b.c = make(chan struct{})
	return b
}
// NewLDBStore opens the LevelDB database at params.Path and returns a store
// backed by it. The persisted counters (per proximity order counters, entry
// count, access count and data index) are loaded from the database, the
// garbage collector is configured with the package defaults and the
// background batch writer is started.
//
// An error is returned if the database cannot be opened; in that case no
// goroutine is left running.
//
// TODO: Instead of passing the distance function, just pass the address from which distances are calculated
// to avoid the appearance of a pluggable distance metric and opportunities of bugs associated with providing
// a function different from the one that is actually used.
func NewLDBStore(params *LDBStoreParams) (s *LDBStore, err error) {
	// Open the database before anything else: starting the batch writer
	// first would leak its goroutine when opening fails, because nothing
	// would ever close the quit channel.
	db, err := NewLDBDatabase(params.Path)
	if err != nil {
		return nil, err
	}

	s = new(LDBStore)
	s.db = db
	s.hashfunc = params.Hash
	s.quit = make(chan struct{})
	s.batchesC = make(chan struct{}, 1)
	s.batch = newBatch()
	// associate encodeData with default functionality
	s.encodeDataFunc = encodeData
	s.po = params.Po
	// entryCnt is still zero here, so this only records the capacity.
	s.setCapacity(params.DbCapacity)

	s.bucketCnt = make([]uint64, 0x100)
	k := []byte{keyDistanceCnt, 0}
	for i := range s.bucketCnt {
		k[1] = uint8(i)
		// Lookup errors are ignored: BytesToU64 maps any value shorter
		// than 8 bytes to 0.
		cnt, _ := s.db.Get(k)
		s.bucketCnt[i] = BytesToU64(cnt)
	}

	// The same best-effort loading applies to the global counters.
	data, _ := s.db.Get(keyEntryCnt)
	s.entryCnt = BytesToU64(data)
	data, _ = s.db.Get(keyAccessCnt)
	s.accessCnt = BytesToU64(data)
	data, _ = s.db.Get(keyDataIdx)
	s.dataIdx = BytesToU64(data)

	// set up garbage collection
	s.gc = &garbage{
		maxBatch: defaultMaxGCBatch,
		maxRound: defaultMaxGCRound,
		ratio:    defaultGCRatio,
	}
	// a token in runC means that no gc round is running
	s.gc.runC = make(chan struct{}, 1)
	s.gc.runC <- struct{}{}

	// start the writer only once the store is fully initialised
	go s.writeBatches()

	return s, nil
}
// MarkAccessed bumps the access counter of the chunk at addr on a best effort
// basis, which moves it to the end of the garbage collection order. It does
// nothing once the store is closed.
func (s *LDBStore) MarkAccessed(addr Address) {
	s.lock.Lock()
	defer s.lock.Unlock()

	if !s.closed {
		s.tryAccessIdx(addr, s.po(addr))
	}
}
// startGC resets the garbage collector state for a new round over a store
// holding c entries: the deletion counter is zeroed, the target is computed
// and a fresh delete batch is allocated.
func (s *LDBStore) startGC(c int) {
	gc := s.gc
	gc.count = 0

	// the target is capped at maxRound; below the cap it is a 1/ratio share
	var target int
	if c < gc.maxRound {
		target = c / gc.ratio
	} else {
		target = gc.maxRound
	}
	gc.target = target

	gc.batch = newBatch()
	log.Debug("startgc", "requested", c, "target", gc.target)
}
// NewMockDbStore creates an LDBStore whose chunk data is kept in mockStore
// instead of the local database. With a nil mockStore the result is the same
// as that of NewLDBStore.
func NewMockDbStore(params *LDBStoreParams, mockStore *mock.NodeStore) (s *LDBStore, err error) {
	store, err := NewLDBStore(params)
	if err != nil {
		return nil, err
	}
	if mockStore == nil {
		return store, nil
	}

	// route encoding and retrieval of chunk data through the mock store
	store.encodeDataFunc = newMockEncodeDataFunc(mockStore)
	store.getDataFunc = newMockGetDataFunc(mockStore)
	return store, nil
}
// dpaDBIndex is the index entry stored (RLP encoded) for every chunk.
type dpaDBIndex struct {
// Idx is the data index of the chunk; together with the proximity order it
// forms the data key (see getDataKey).
Idx uint64
// Access is the access counter value assigned on the last access; it forms
// the garbage collection index key (see getGCIdxKey).
Access uint64
}
// BytesToU64 decodes the first 8 bytes of data as a big-endian uint64.
// It returns 0 when data holds fewer than 8 bytes.
func BytesToU64(data []byte) uint64 {
	if len(data) >= 8 {
		return binary.BigEndian.Uint64(data[:8])
	}
	return 0
}
// U64ToBytes encodes val as 8 big-endian bytes in a newly allocated slice.
func U64ToBytes(val uint64) []byte {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], val)
	return buf[:]
}
// getIndexKey returns the chunk index key for hash: the keyIndex prefix
// followed by the hash bytes.
func getIndexKey(hash Address) []byte {
	key := make([]byte, 1, len(hash)+1)
	key[0] = keyIndex
	return append(key, hash...)
}
// getDataKey returns the 10 byte chunk data key: the keyData prefix, the
// proximity order and the big-endian data index.
func getDataKey(idx uint64, po uint8) []byte {
	var key [10]byte
	key[0], key[1] = keyData, po
	binary.BigEndian.PutUint64(key[2:], idx)
	return key[:]
}
// getGCIdxKey returns the 9 byte garbage collection index key of index: the
// keyGCIdx prefix followed by the big-endian access counter.
func getGCIdxKey(index *dpaDBIndex) []byte {
	var key [9]byte
	key[0] = keyGCIdx
	binary.BigEndian.PutUint64(key[1:], index.Access)
	return key[:]
}
// getGCIdxValue returns the 41 byte garbage collection index value: the
// proximity order (1 byte), the big-endian data index (8 bytes) and up to 32
// bytes of the chunk address.
func getGCIdxValue(index *dpaDBIndex, po uint8, addr Address) []byte {
	var buf [41]byte
	buf[0] = po
	binary.BigEndian.PutUint64(buf[1:9], index.Idx)
	copy(buf[9:], addr)
	return buf[:]
}
// parseIdxKey splits a database key into its first byte (the row type) and
// the remaining bytes. The key must not be empty.
func parseIdxKey(key []byte) (byte, []byte) {
	rowType := key[0]
	rest := key[1:]
	return rowType, rest
}
// parseGCIdxEntry decodes a garbage collection index entry. accessCnt is the
// key without its prefix byte, val the value written by getGCIdxValue. The
// returned address aliases val.
func parseGCIdxEntry(accessCnt []byte, val []byte) (index *dpaDBIndex, po uint8, addr Address) {
	dataIdx := binary.BigEndian.Uint64(val[1:])
	access := binary.BigEndian.Uint64(accessCnt)
	return &dpaDBIndex{Idx: dataIdx, Access: access}, val[0], val[9:]
}
// encodeIndex returns the RLP encoding of index. An encoding error is
// discarded and whatever the encoder returned is passed on.
func encodeIndex(index *dpaDBIndex) []byte {
	encoded, _ := rlp.EncodeToBytes(index)
	return encoded
}
// encodeData returns the chunk address followed by the chunk data.
//
// The result is always backed by a new array: reusing the address array
// would let later modifications of the slice (by this code or by LevelDB)
// change the address as well.
func encodeData(chunk Chunk) []byte {
	addr := chunk.Address()
	payload := chunk.Data()

	buf := make([]byte, 0, len(addr)+len(payload))
	buf = append(buf, addr[:]...)
	return append(buf, payload...)
}
// decodeIndex RLP-decodes data into index and returns the decoder's error.
func decodeIndex(data []byte, index *dpaDBIndex) error {
	return rlp.NewStream(bytes.NewReader(data), 0).Decode(index)
}
// decodeData builds a chunk for addr from a stored data record. The record
// starts with the 32 byte chunk address (see encodeData), which is skipped;
// the rest is the chunk data.
//
// An error is returned when the record is shorter than the address prefix,
// instead of panicking on the slice expression.
func decodeData(addr Address, data []byte) (*chunk, error) {
	if len(data) < 32 {
		return nil, fmt.Errorf("chunk data too short: %d bytes, need at least 32", len(data))
	}
	return NewChunk(addr, data[32:]), nil
}
// collectGarbage runs one garbage collection round: it deletes the least
// recently accessed chunks, in batches of at most gc.maxBatch, until the
// target computed by startGC is reached or the gc index is exhausted.
//
// Only one round runs at a time; a call made while another round is running
// returns nil immediately. The error of a failed batch write is returned.
func (s *LDBStore) collectGarbage() error {
	// prevent duplicate gc from starting when one is already running
	select {
	case <-s.gc.runC:
	default:
		return nil
	}
	// hand the token back on every exit path so that gc can run again
	defer func() { s.gc.runC <- struct{}{} }()

	s.lock.Lock()
	entryCnt := s.entryCnt
	s.lock.Unlock()

	metrics.GetOrRegisterCounter("ldbstore.collectgarbage", nil).Inc(1)

	// calculate the amount of chunks to collect and reset counter
	s.startGC(int(entryCnt))
	log.Debug("collectGarbage", "target", s.gc.target, "entryCnt", entryCnt)

	var err error
	for s.gc.count < s.gc.target {
		var collected int
		collected, err = s.collectGarbageBatch()
		// A pass that collected nothing means the gc index holds no more
		// entries; without this check the loop would spin forever.
		if err != nil || collected == 0 {
			break
		}
	}

	metrics.GetOrRegisterCounter("ldbstore.collectgarbage.delete", nil).Inc(int64(s.gc.count))
	log.Debug("garbage collect done", "c", s.gc.count)
	return err
}

// collectGarbageBatch enqueues up to gc.maxBatch deletions, taken from the
// start of the gc index, writes them to the database and returns the number
// of chunks it enqueued.
func (s *LDBStore) collectGarbageBatch() (int, error) {
	it := s.db.NewIterator()
	defer it.Release()
	ok := it.Seek([]byte{keyGCIdx})

	// every batch needs a lock so we avoid entries changing accessidx in the meantime
	s.lock.Lock()
	defer s.lock.Unlock()

	var collected int
	for ; ok && collected < s.gc.maxBatch; ok = it.Next() {
		// quit if no more access index keys
		itkey := it.Key()
		if itkey == nil || itkey[0] != keyGCIdx {
			break
		}

		// get chunk data entry from access index
		val := it.Value()
		index, po, hash := parseGCIdxEntry(itkey[1:], val)
		keyIdx := make([]byte, 33)
		keyIdx[0] = keyIndex
		copy(keyIdx[1:], hash)

		// add delete operation to batch
		s.delete(s.gc.batch.Batch, index, keyIdx, po)
		collected++
		s.gc.count++
		log.Trace("garbage collect enqueued chunk for deletion", "key", hash)

		// break if target is not on max garbage batch boundary
		if s.gc.count >= s.gc.target {
			break
		}
	}

	err := s.writeBatch(s.gc.batch, wEntryCnt)
	// start the next pass with an empty batch instead of replaying this one
	s.gc.batch.Reset()
	log.Trace("garbage collect batch done", "batch", collected, "total", s.gc.count)
	return collected, err
}
// Export writes all chunks from the store to a tar archive, returning the
// number of chunks written. Every chunk becomes one archive entry named after
// the hex encoded chunk hash and holding the stored data record.
//
// Chunks whose index cannot be decoded or whose data cannot be read are
// logged and skipped. An error writing to the archive, including the final
// footer, aborts the export and is returned with the count so far.
func (s *LDBStore) Export(out io.Writer) (int64, error) {
	tw := tar.NewWriter(out)
	// Covers the early returns; after the explicit Close below this is a no-op.
	defer tw.Close()

	it := s.db.NewIterator()
	defer it.Release()

	var count int64
	for ok := it.Seek([]byte{keyIndex}); ok; ok = it.Next() {
		key := it.Key()
		if key == nil || key[0] != keyIndex {
			break
		}

		var index dpaDBIndex
		hash := key[1:]
		// Without a valid index the data key cannot be derived; exporting
		// with a zero index would pick up some other chunk's data.
		if err := decodeIndex(it.Value(), &index); err != nil {
			log.Warn(fmt.Sprintf("Chunk %x found but its index could not be decoded: %v", key, err))
			continue
		}

		po := s.po(hash)
		datakey := getDataKey(index.Idx, po)
		log.Trace("store.export", "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po)
		data, err := s.db.Get(datakey)
		if err != nil {
			log.Warn(fmt.Sprintf("Chunk %x found but could not be accessed: %v", key, err))
			continue
		}

		hdr := &tar.Header{
			Name: hex.EncodeToString(hash),
			Mode: 0644,
			Size: int64(len(data)),
		}
		if err := tw.WriteHeader(hdr); err != nil {
			return count, err
		}
		if _, err := tw.Write(data); err != nil {
			return count, err
		}
		count++
	}

	// Close flushes the last entry and writes the archive footer; report a
	// failure instead of returning a silently truncated archive.
	if err := tw.Close(); err != nil {
		return count, err
	}
	return count, nil
}
// Import reads chunks from a tar archive into the store, returning the number
// of chunks stored.
//
// Archive entries whose name is not a 64 character hex string, or whose
// content is too short to hold the 32 byte address prefix, are logged and
// skipped. The first error from reading the archive or from storing a chunk
// aborts the import; the count returned with it is the number of chunks
// stored up to that point.
func (s *LDBStore) Import(in io.Reader) (int64, error) {
	tr := tar.NewReader(in)

	// cancelling on return releases every goroutine still trying to report
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	countC := make(chan int64)
	errC := make(chan error)

	// report delivers err to the collecting loop unless Import has returned.
	report := func(err error) {
		select {
		case errC <- err:
		case <-ctx.Done():
		}
	}

	go func() {
		var count int64
		for {
			hdr, err := tr.Next()
			if err == io.EOF {
				break
			}
			if err != nil {
				// hdr is nil here, so reading must stop
				report(err)
				return
			}

			if len(hdr.Name) != 64 {
				log.Warn("ignoring non-chunk file", "name", hdr.Name)
				continue
			}

			keybytes, err := hex.DecodeString(hdr.Name)
			if err != nil {
				log.Warn("ignoring invalid chunk file", "name", hdr.Name, "err", err)
				continue
			}

			data, err := ioutil.ReadAll(tr)
			if err != nil {
				report(err)
				return
			}
			// the record starts with the 32 byte chunk address
			if len(data) < 32 {
				log.Warn("ignoring truncated chunk file", "name", hdr.Name, "size", len(data))
				continue
			}

			key := Address(keybytes)
			chunk := NewChunk(key, data[32:])
			go func() {
				report(s.Put(ctx, chunk))
			}()
			count++
		}
		select {
		case countC <- count:
		case <-ctx.Done():
		}
	}()

	// wait for all chunks to be stored
	var (
		stored     int64
		total      int64
		totalKnown bool
	)
	for {
		select {
		case err := <-errC:
			if err != nil {
				return stored, err
			}
			stored++
		case total = <-countC:
			// Tracked separately from the value so that an archive without
			// any chunk terminates as well.
			totalKnown = true
		case <-ctx.Done():
			return stored, ctx.Err()
		}
		if totalKnown && stored == total {
			return total, nil
		}
	}
}
// Cleanup iterates over the database and deletes chunks if they pass the `f` condition.
//
// For every chunk index entry the data record is looked up under the
// proximity order computed by s.po; if it is not there, all 256 proximity
// orders are tried. Entries that cannot be decoded, located or parsed are
// counted as errors and skipped. A summary is logged at the end.
func (s *LDBStore) Cleanup(f func(*chunk) bool) {
	var errorsFound, removed, total int

	it := s.db.NewIterator()
	defer it.Release()
	for ok := it.Seek([]byte{keyIndex}); ok; ok = it.Next() {
		key := it.Key()
		if key == nil || key[0] != keyIndex {
			break
		}
		total++

		var index dpaDBIndex
		err := decodeIndex(it.Value(), &index)
		if err != nil {
			log.Warn("Cannot decode")
			errorsFound++
			continue
		}

		hash := key[1:]
		po := s.po(hash)
		datakey := getDataKey(index.Idx, po)
		data, err := s.db.Get(datakey)
		if err != nil {
			found := false
			// Try every proximity order, including 0. An int counter is
			// used because a uint8 can never exceed 255.
			for p := 0; p <= 0xff; p++ {
				datakey = getDataKey(index.Idx, uint8(p))
				data, err = s.db.Get(datakey)
				if err == nil {
					found = true
					po = uint8(p)
					break
				}
			}
			if !found {
				log.Warn(fmt.Sprintf("Chunk %x found but count not be accessed with any po", key))
				errorsFound++
				continue
			}
		}

		// the record must at least hold the 32 byte address prefix
		if len(data) < 32 {
			log.Warn(fmt.Sprintf("Chunk %x data record too short: %d bytes", key, len(data)))
			errorsFound++
			continue
		}
		ck := data[:32]
		c, err := decodeData(ck, data)
		if err != nil {
			log.Error("decodeData error", "err", err)
			continue
		}
		// the first 8 bytes of the chunk data are read as its size
		if len(c.sdata) < 8 {
			log.Warn(fmt.Sprintf("Chunk %x data too short to hold a size: %d bytes", key, len(c.sdata)))
			errorsFound++
			continue
		}

		cs := int64(binary.LittleEndian.Uint64(c.sdata[:8]))
		log.Trace("chunk", "key", fmt.Sprintf("%x", key), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.sdata), "size", cs)

		// if chunk is to be removed
		if f(c) {
			log.Warn("chunk for cleanup", "key", fmt.Sprintf("%x", key), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.sdata), "size", cs)
			if err := s.deleteNow(&index, getIndexKey(key[1:]), po); err != nil {
				log.Error("chunk cleanup delete failed", "key", fmt.Sprintf("%x", key), "err", err)
				errorsFound++
				continue
			}
			removed++
			errorsFound++
		}
	}

	log.Warn(fmt.Sprintf("Found %v errors out of %v entries. Removed %v chunks.", errorsFound, total, removed))
}
// CleanGCIndex rebuilds the garbage collector index from scratch, while
// removing inconsistent elements, e.g., indices with missing data chunks.
//
// It first deletes every gc index entry, then walks the chunk index in
// batches of 4096 entries: index entries without a data record are deleted,
// for all others a gc index entry is written. Finally the persisted entry
// count and the per proximity order pointers are rewritten. The store lock is
// held for the whole run.
//
// An error is returned if the index holds an undecodable entry or if a
// database write fails.
//
// WARN: it's a pretty heavy, long running function.
func (s *LDBStore) CleanGCIndex() error {
	s.lock.Lock()
	defer s.lock.Unlock()

	batch := leveldb.Batch{}

	var okEntryCount uint64
	var totalEntryCount uint64

	// throw out all gc indices, we will rebuild from cleaned index
	it := s.db.NewIterator()
	it.Seek([]byte{keyGCIdx})
	var gcDeletes int
	for it.Valid() {
		rowType, _ := parseIdxKey(it.Key())
		if rowType != keyGCIdx {
			break
		}
		batch.Delete(it.Key())
		gcDeletes++
		it.Next()
	}
	// release before writing so that a failed write does not leak the iterator
	it.Release()
	log.Debug("gc", "deletes", gcDeletes)
	if err := s.db.Write(&batch); err != nil {
		return err
	}
	batch.Reset()

	// corrected po index pointer values
	var poPtrs [256]uint64

	// set to true once the end of the chunk index has been reached
	var doneIterating bool

	// key at which the next iteration resumes
	lastIdxKey := []byte{keyIndex}

	// counter for debug output
	var cleanBatchCount int

	// go through all key index entries
	for !doneIterating {
		cleanBatchCount++
		var idxs []dpaDBIndex
		var chunkHashes [][]byte
		var pos []uint8

		it := s.db.NewIterator()
		it.Seek(lastIdxKey)

		// 4096 is just a nice number, don't look for any hidden meaning here...
		var i int
		for i = 0; i < 4096; i++ {

			// this really shouldn't happen unless database is empty
			// but let's keep it to be safe
			if !it.Valid() {
				doneIterating = true
				break
			}

			// if it's not keyindex anymore we're done iterating
			rowType, chunkHash := parseIdxKey(it.Key())
			if rowType != keyIndex {
				doneIterating = true
				break
			}

			// decode the retrieved index
			var idx dpaDBIndex
			err := decodeIndex(it.Value(), &idx)
			if err != nil {
				it.Release()
				return fmt.Errorf("corrupt index: %v", err)
			}
			po := s.po(chunkHash)

			// if we don't find the data key, remove the entry
			// if we find it, add to the array of new gc indices to create
			dataKey := getDataKey(idx.Idx, po)
			_, err = s.db.Get(dataKey)
			if err != nil {
				log.Warn("deleting inconsistent index (missing data)", "key", chunkHash)
				batch.Delete(it.Key())
			} else {
				idxs = append(idxs, idx)
				// The iterator may reuse the key's memory on the next move,
				// so keep a private copy of the hash.
				chunkHashes = append(chunkHashes, append([]byte(nil), chunkHash...))
				pos = append(pos, po)
				okEntryCount++
				if idx.Idx > poPtrs[po] {
					poPtrs[po] = idx.Idx
				}
			}
			totalEntryCount++
			it.Next()
		}

		// A full batch was processed: resume at the first key that has not
		// been looked at yet. Resuming at the last processed key would
		// process, and count, that entry a second time.
		if !doneIterating {
			if it.Valid() {
				lastIdxKey = append([]byte(nil), it.Key()...)
			} else {
				doneIterating = true
			}
		}
		it.Release()

		// flush the key index corrections
		err := s.db.Write(&batch)
		if err != nil {
			return err
		}
		batch.Reset()

		// add correct gc indices
		for i, okIdx := range idxs {
			gcIdxKey := getGCIdxKey(&okIdx)
			gcIdxData := getGCIdxValue(&okIdx, pos[i], chunkHashes[i])
			batch.Put(gcIdxKey, gcIdxData)
			log.Trace("clean ok", "key", chunkHashes[i], "gcKey", gcIdxKey, "gcData", gcIdxData)
		}

		// flush them
		err = s.db.Write(&batch)
		if err != nil {
			return err
		}
		batch.Reset()

		log.Debug("clean gc index pass", "batch", cleanBatchCount, "checked", i, "kept", len(idxs))
	}

	log.Debug("gc cleanup entries", "ok", okEntryCount, "total", totalEntryCount, "batchlen", batch.Len())

	// lastly add updated entry count
	var entryCount [8]byte
	binary.BigEndian.PutUint64(entryCount[:], okEntryCount)
	batch.Put(keyEntryCnt, entryCount[:])

	// and add the new po index pointers
	var poKey [2]byte
	poKey[0] = keyDistanceCnt
	for i, poPtr := range poPtrs {
		poKey[1] = uint8(i)
		if poPtr == 0 {
			batch.Delete(poKey[:])
		} else {
			var idxCount [8]byte
			binary.BigEndian.PutUint64(idxCount[:], poPtr)
			batch.Put(poKey[:], idxCount[:])
		}
	}

	// if you made it this far your harddisk has survived. Congratulations
	return s.db.Write(&batch)
}
// Delete removes the chunk at addr and updates the indices.
// It returns the lookup error if the chunk has no index entry, and an error
// if the stored index entry cannot be decoded.
// Is thread safe
func (s *LDBStore) Delete(addr Address) error {
	s.lock.Lock()
	defer s.lock.Unlock()

	ikey := getIndexKey(addr)

	idata, err := s.db.Get(ikey)
	if err != nil {
		return err
	}

	var idx dpaDBIndex
	// Deleting with an undecoded (zero) index would remove the data record
	// and the gc entry that belong to index 0, i.e. to another chunk.
	if err := decodeIndex(idata, &idx); err != nil {
		return fmt.Errorf("decode index of chunk %x: %v", ikey[1:], err)
	}
	proximity := s.po(addr)
	return s.deleteNow(&idx, ikey, proximity)
}
// deleteNow performs a single delete operation right away: the operations
// produced by delete are collected in a private batch and written to the
// database immediately.
// see *LDBStore.delete
func (s *LDBStore) deleteNow(idx *dpaDBIndex, idxKey []byte, po uint8) error {
	var batch leveldb.Batch
	s.delete(&batch, idx, idxKey, po)
	return s.db.Write(&batch)
}
// delete appends the removal of one chunk to batch: its gc index entry, its
// data record and its index entry are deleted, and the decremented entry
// counter plus the counter of the chunk's proximity order are written.
//
// The entry counter is decremented whether or not the chunk exists, so
// calling this directly for a missing chunk risks wrapping it to max uint64.
func (s *LDBStore) delete(batch *leveldb.Batch, idx *dpaDBIndex, idxKey []byte, po uint8) {
	metrics.GetOrRegisterCounter("ldbstore.delete", nil).Inc(1)

	doomed := [][]byte{
		getGCIdxKey(idx),
		getDataKey(idx.Idx, po),
		idxKey,
	}
	for _, k := range doomed {
		batch.Delete(k)
	}

	s.entryCnt--
	dbEntryCount.Dec(1)

	batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt))
	batch.Put([]byte{keyDistanceCnt, po}, U64ToBytes(s.bucketCnt[po]))
}
// BinIndex returns the counter kept for proximity order po, read under the
// store's read lock.
func (s *LDBStore) BinIndex(po uint8) uint64 {
	s.lock.RLock()
	defer s.lock.RUnlock()

	current := s.bucketCnt[po]
	return current
}
// Put adds a chunk to the database, adding indices and incrementing global counters.
// If it already exists, it merely increments the access count of the existing entry.
//
// The write is queued in the current batch; Put returns once that batch has
// been written (with the result of the write) or ctx is done (with
// ctx.Err()). ErrDBClosed is returned when the store is closed.
// Is thread safe
func (s *LDBStore) Put(ctx context.Context, chunk Chunk) error {
	metrics.GetOrRegisterCounter("ldbstore.put", nil).Inc(1)
	log.Trace("ldbstore.put", "key", chunk.Address())

	ikey := getIndexKey(chunk.Address())
	po := s.po(chunk.Address())

	s.lock.Lock()

	if s.closed {
		s.lock.Unlock()
		return ErrDBClosed
	}
	// remember the batch this call contributes to; s.batch is replaced once
	// it gets written
	batch := s.batch

	log.Trace("ldbstore.put: s.db.Get", "key", chunk.Address(), "ikey", fmt.Sprintf("%x", ikey))
	if _, err := s.db.Get(ikey); err != nil {
		// unknown chunk: store the data and create its index entries
		var index dpaDBIndex
		s.doPut(chunk, &index, po)
		s.batch.Put(ikey, encodeIndex(&index))

		// add the access-chunkindex index for garbage collection
		gcIdxKey := getGCIdxKey(&index)
		gcIdxData := getGCIdxValue(&index, po, chunk.Address())
		s.batch.Put(gcIdxKey, gcIdxData)
	} else if _, ok := s.tryAccessIdx(chunk.Address(), po); !ok {
		// Known chunk: only the access count is bumped. Writing a fresh,
		// zero-valued index here would make the entry point at data index 0
		// and clobber the gc entry of access count 0.
		s.lock.Unlock()
		return fmt.Errorf("index entry of chunk %x could not be updated", ikey[1:])
	}
	s.lock.Unlock()

	// wake up the batch writer, unless a wake-up is pending already
	select {
	case s.batchesC <- struct{}{}:
	default:
	}

	select {
	case <-batch.c:
		return batch.err
	case <-ctx.Done():
		return ctx.Err()
	}
}
// doPut queues the chunk data for writing under the next data index and
// fills in index accordingly. It advances the data index, entry and access
// counters and records the data index as the counter of proximity order po.
// It does not check for an existing entry and does not write the chunk index
// or gc index entries; that is up to the caller.
func (s *LDBStore) doPut(chunk Chunk, index *dpaDBIndex, po uint8) {
	payload := s.encodeDataFunc(chunk)
	idx := s.dataIdx
	s.batch.Put(getDataKey(idx, po), payload)

	index.Idx, index.Access = idx, s.accessCnt
	s.bucketCnt[po] = idx

	s.dataIdx++
	s.accessCnt++
	s.entryCnt++
	dbEntryCount.Inc(1)

	s.batch.Put([]byte{keyDistanceCnt, po}, U64ToBytes(idx))
}
// writeBatches is the background writer loop: every signal on batchesC
// flushes the current batch. The loop ends when quit is closed or when
// flushing reports an error.
func (s *LDBStore) writeBatches() {
	for {
		select {
		case <-s.batchesC:
			if err := s.writeCurrentBatch(); err != nil {
				log.Debug("DbStore: quit batch write loop", "err", err.Error())
				return
			}
		case <-s.quit:
			log.Debug("DbStore: quit batch write loop")
			return
		}
	}
}
// writeCurrentBatch writes the pending batch, if it holds anything, and
// replaces it with an empty one. The result of the write is stored in the
// batch and its channel is closed to release waiting Put calls. Garbage
// collection is started in the background when the entry count has reached
// the capacity. The returned error is always nil.
func (s *LDBStore) writeCurrentBatch() error {
	s.lock.Lock()
	defer s.lock.Unlock()

	pending := s.batch
	if pending.Len() == 0 {
		return nil
	}

	s.batch = newBatch()
	pending.err = s.writeBatch(pending, wEntryCnt|wAccessCnt|wIndexCnt)
	close(pending.c)

	if s.capacity <= s.entryCnt {
		go s.collectGarbage()
	}
	return nil
}
// writeBatch appends the counters selected by wFlag (wEntryCnt, wIndexCnt,
// wAccessCnt) to b and writes b to the database.
// It must not be called concurrently.
func (s *LDBStore) writeBatch(b *dbBatch, wFlag uint8) error {
	counters := []struct {
		flag  uint8
		key   []byte
		value uint64
	}{
		{wEntryCnt, keyEntryCnt, s.entryCnt},
		{wIndexCnt, keyDataIdx, s.dataIdx},
		{wAccessCnt, keyAccessCnt, s.accessCnt},
	}
	for _, c := range counters {
		if wFlag&c.flag != 0 {
			b.Put(c.key, U64ToBytes(c.value))
		}
	}

	size := b.Len()
	if err := s.db.Write(b.Batch); err != nil {
		return fmt.Errorf("unable to write batch: %v", err)
	}
	log.Trace(fmt.Sprintf("batch write (%d entries)", size))
	return nil
}
// newMockEncodeDataFunc returns an encodeDataFunc replacement that stores the
// encoded chunk in mockStore and returns only the chunk address, which is
// what ends up as the data record in the local database. A failure to store
// the chunk in the mock store is logged, not returned.
func newMockEncodeDataFunc(mockStore *mock.NodeStore) func(chunk Chunk) []byte {
	encode := func(chunk Chunk) []byte {
		err := mockStore.Put(chunk.Address(), encodeData(chunk))
		if err != nil {
			log.Error(fmt.Sprintf("%T: Chunk %v put: %v", mockStore, chunk.Address().Log(), err))
		}
		return chunk.Address()[:]
	}
	return encode
}
// tryAccessIdx tries to find index entry. If found then increments the access
// count for garbage collection and returns the index entry and true for found,
// otherwise returns nil and false.
//
// An index entry that cannot be decoded is treated as not found and left
// untouched. The updates are queued in the current batch and the batch
// writer is signalled; the caller must hold the store lock.
func (s *LDBStore) tryAccessIdx(addr Address, po uint8) (*dpaDBIndex, bool) {
	ikey := getIndexKey(addr)
	idata, err := s.db.Get(ikey)
	if err != nil {
		return nil, false
	}

	index := new(dpaDBIndex)
	// Continuing with an undecoded index would rewrite the entry with data
	// index 0 and delete the gc entry of access count 0.
	if err := decodeIndex(idata, index); err != nil {
		log.Warn("ldbstore: cannot decode chunk index", "key", addr, "err", err)
		return nil, false
	}
	oldGCIdxKey := getGCIdxKey(index)
	s.batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
	index.Access = s.accessCnt
	idata = encodeIndex(index)
	s.accessCnt++
	s.batch.Put(ikey, idata)
	newGCIdxKey := getGCIdxKey(index)
	newGCIdxData := getGCIdxValue(index, po, ikey[1:])
	// move the gc entry from the old access count to the new one
	s.batch.Delete(oldGCIdxKey)
	s.batch.Put(newGCIdxKey, newGCIdxData)

	// wake up the batch writer, unless a wake-up is pending already
	select {
	case s.batchesC <- struct{}{}:
	default:
	}
	return index, true
}
// GetSchema returns the schema name stored in the database. DbSchemaNone is
// returned when no schema has been stored; any other lookup error is passed
// on.
func (s *LDBStore) GetSchema() (string, error) {
	s.lock.Lock()
	defer s.lock.Unlock()

	data, err := s.db.Get(keySchema)
	switch err {
	case nil:
		return string(data), nil
	case leveldb.ErrNotFound:
		return DbSchemaNone, nil
	default:
		return "", err
	}
}
// PutSchema stores schema as the schema name of the database.
func (s *LDBStore) PutSchema(schema string) error {
	s.lock.Lock()
	defer s.lock.Unlock()

	name := []byte(schema)
	return s.db.Put(keySchema, name)
}
// Get retrieves the chunk matching the provided key from the database.
// If the chunk entry does not exist, it returns an error
// Updates access count and is thread safe
//
// On error the returned Chunk is a nil interface value.
func (s *LDBStore) Get(_ context.Context, addr Address) (chunk Chunk, err error) {
	metrics.GetOrRegisterCounter("ldbstore.get", nil).Inc(1)
	log.Trace("ldbstore.get", "key", addr)

	s.lock.Lock()
	defer s.lock.Unlock()

	// Returning s.get's results directly would wrap its nil *chunk in a
	// non-nil Chunk interface whenever it fails.
	c, err := s.get(addr)
	if err != nil {
		return nil, err
	}
	return c, nil
}
// Has reports whether the database holds an index entry for the chunk at
// addr. Any lookup error counts as "not stored".
func (s *LDBStore) Has(_ context.Context, addr Address) bool {
	s.lock.RLock()
	defer s.lock.RUnlock()

	if _, err := s.db.Get(getIndexKey(addr)); err != nil {
		return false
	}
	return true
}
// get looks up the chunk at addr, bumping its access count on the way. It
// returns ErrDBClosed on a closed store and ErrChunkNotFound when there is no
// index entry. When the index entry exists but the data record cannot be
// read from the database, the entry is deleted and the read error returned.
// The caller must hold the store lock.
//
// TODO: To conform with other private methods of this object indices should not be updated
func (s *LDBStore) get(addr Address) (chunk *chunk, err error) {
	if s.closed {
		return nil, ErrDBClosed
	}

	proximity := s.po(addr)
	index, found := s.tryAccessIdx(addr, proximity)
	if !found {
		return nil, ErrChunkNotFound
	}

	var data []byte
	if s.getDataFunc == nil {
		// default DbStore functionality to retrieve chunk data
		datakey := getDataKey(index.Idx, proximity)
		data, err = s.db.Get(datakey)
		log.Trace("ldbstore.get retrieve", "key", addr, "indexkey", index.Idx, "datakey", fmt.Sprintf("%x", datakey), "proximity", proximity)
		if err != nil {
			log.Trace("ldbstore.get chunk found but could not be accessed", "key", addr, "err", err)
			s.deleteNow(index, getIndexKey(addr), s.po(addr))
			return nil, err
		}
	} else {
		// a custom retrieval function takes precedence over the database
		log.Trace("ldbstore.get retrieve with getDataFunc", "key", addr)
		data, err = s.getDataFunc(addr)
		if err != nil {
			return nil, err
		}
	}

	return decodeData(addr, data)
}
// newMockGetDataFunc returns a getDataFunc replacement that reads chunk data
// from mockStore. The mock store's not-found error is translated into
// ErrChunkNotFound; every other result is passed through unchanged.
func newMockGetDataFunc(mockStore *mock.NodeStore) func(addr Address) (data []byte, err error) {
	return func(addr Address) ([]byte, error) {
		stored, err := mockStore.Get(addr)
		if err == mock.ErrNotFound {
			return stored, ErrChunkNotFound
		}
		return stored, err
	}
}
// setCapacity sets the entry count at which garbage collection starts and,
// if the store already holds more than c entries, collects garbage until the
// count no longer exceeds c or a round frees nothing.
//
// The store lock is released before collectGarbage is called, because
// collectGarbage acquires the lock itself and the lock is not reentrant.
func (s *LDBStore) setCapacity(c uint64) {
	s.lock.Lock()
	s.capacity = c
	remaining := s.entryCnt
	s.lock.Unlock()

	for remaining > c {
		s.collectGarbage()

		s.lock.Lock()
		now := s.entryCnt
		s.lock.Unlock()

		// Stop when a round made no progress (nothing left to collect, a
		// zero target, or another round in flight) instead of spinning.
		if now >= remaining {
			return
		}
		remaining = now
	}
}
// Close shuts the store down: it stops the batch writer loop, marks the store
// as closed, writes out the pending batch and closes the database.
// NOTE(review): quit is closed unconditionally, so a second call to Close
// would panic on the already closed channel - confirm callers close once.
func (s *LDBStore) Close() {
// closing quit makes writeBatches return
close(s.quit)
s.lock.Lock()
// from here on Put and get fail with ErrDBClosed and MarkAccessed is a no-op
s.closed = true
s.lock.Unlock()
// force writing out current batch
s.writeCurrentBatch()
s.db.Close()
}
// SyncIterator walks the data records of proximity order po whose data index
// lies between since and until (both inclusive), in ascending order, and
// calls f with the chunk address and data index of each. Iteration stops
// early when f returns false. The iterator's error is returned.
func (s *LDBStore) SyncIterator(since uint64, until uint64, po uint8, f func(Address, uint64) bool) error {
	metrics.GetOrRegisterCounter("ldbstore.synciterator", nil).Inc(1)

	first := getDataKey(since, po)
	last := getDataKey(until, po)

	it := s.db.NewIterator()
	defer it.Release()

	for more := it.Seek(first); more; more = it.Next() {
		metrics.GetOrRegisterCounter("ldbstore.synciterator.seek", nil).Inc(1)

		dbkey := it.Key()
		// stop once the keys leave this bin or pass the upper bound
		inBin := dbkey[0] == keyData && dbkey[1] == po
		if !inBin || bytes.Compare(last, dbkey) < 0 {
			break
		}

		// the record starts with the chunk address; copy it because the
		// iterator owns the value's memory
		addr := make(Address, 32)
		copy(addr, it.Value()[:32])

		if !f(addr, binary.BigEndian.Uint64(dbkey[2:])) {
			break
		}
	}
	return it.Error()
}