2018-06-20 15:06:27 +03:00
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// disk storage layer for the package bzz
// DbStore implements the ChunkStore interface and is used by the FileStore as
// persistent storage of chunks
// it implements purging based on access count allowing for external control of
// max capacity
package storage
import (
"archive/tar"
"bytes"
2018-07-13 18:40:28 +03:00
"context"
2018-06-20 15:06:27 +03:00
"encoding/binary"
"encoding/hex"
2018-09-13 12:42:19 +03:00
"errors"
2018-06-20 15:06:27 +03:00
"fmt"
"io"
"io/ioutil"
"sort"
"sync"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/storage/mock"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
)
const (
gcArrayFreeRatio = 0.1
maxGCitems = 5000 // max number of items to be gc'd per call to collectGarbage()
)
2018-09-06 13:11:38 +03:00
var (
dbEntryCount = metrics . NewRegisteredCounter ( "ldbstore.entryCnt" , nil )
)
2018-06-20 15:06:27 +03:00
var (
keyIndex = byte ( 0 )
keyOldData = byte ( 1 )
keyAccessCnt = [ ] byte { 2 }
keyEntryCnt = [ ] byte { 3 }
keyDataIdx = [ ] byte { 4 }
keyData = byte ( 6 )
keyDistanceCnt = byte ( 7 )
2018-10-03 15:31:59 +03:00
keySchema = [ ] byte { 8 }
2018-06-20 15:06:27 +03:00
)
2018-09-13 12:42:19 +03:00
var (
ErrDBClosed = errors . New ( "LDBStore closed" )
)
2018-06-20 15:06:27 +03:00
type gcItem struct {
idx uint64
value uint64
idxKey [ ] byte
po uint8
}
type LDBStoreParams struct {
* StoreParams
Path string
Po func ( Address ) uint8
}
// NewLDBStoreParams constructs LDBStoreParams with the specified values.
func NewLDBStoreParams ( storeparams * StoreParams , path string ) * LDBStoreParams {
return & LDBStoreParams {
StoreParams : storeparams ,
Path : path ,
2018-09-14 23:07:13 +03:00
Po : func ( k Address ) ( ret uint8 ) { return uint8 ( Proximity ( storeparams . BaseKey , k [ : ] ) ) } ,
2018-06-20 15:06:27 +03:00
}
}
type LDBStore struct {
db * LDBDatabase
// this should be stored in db, accessed transactionally
entryCnt uint64 // number of items in the LevelDB
accessCnt uint64 // ever-accumulating number increased every time we read/access an entry
dataIdx uint64 // similar to entryCnt, but we only increment it
capacity uint64
bucketCnt [ ] uint64
hashfunc SwarmHasher
po func ( Address ) uint8
batchC chan bool
batchesC chan struct { }
2018-09-13 12:42:19 +03:00
closed bool
batch * dbBatch
2018-06-20 15:06:27 +03:00
lock sync . RWMutex
quit chan struct { }
// Functions encodeDataFunc is used to bypass
// the default functionality of DbStore with
// mock.NodeStore for testing purposes.
2018-09-13 12:42:19 +03:00
encodeDataFunc func ( chunk Chunk ) [ ] byte
2018-06-20 15:06:27 +03:00
// If getDataFunc is defined, it will be used for
// retrieving the chunk data instead from the local
// LevelDB database.
2018-09-13 12:42:19 +03:00
getDataFunc func ( key Address ) ( data [ ] byte , err error )
}
type dbBatch struct {
* leveldb . Batch
err error
c chan struct { }
}
func newBatch ( ) * dbBatch {
return & dbBatch { Batch : new ( leveldb . Batch ) , c : make ( chan struct { } ) }
2018-06-20 15:06:27 +03:00
}
// TODO: Instead of passing the distance function, just pass the address from which distances are calculated
// to avoid the appearance of a pluggable distance metric and opportunities of bugs associated with providing
// a function different from the one that is actually used.
func NewLDBStore ( params * LDBStoreParams ) ( s * LDBStore , err error ) {
s = new ( LDBStore )
s . hashfunc = params . Hash
s . quit = make ( chan struct { } )
s . batchesC = make ( chan struct { } , 1 )
go s . writeBatches ( )
2018-09-13 12:42:19 +03:00
s . batch = newBatch ( )
2018-06-20 15:06:27 +03:00
// associate encodeData with default functionality
s . encodeDataFunc = encodeData
s . db , err = NewLDBDatabase ( params . Path )
if err != nil {
return nil , err
}
s . po = params . Po
s . setCapacity ( params . DbCapacity )
s . bucketCnt = make ( [ ] uint64 , 0x100 )
for i := 0 ; i < 0x100 ; i ++ {
k := make ( [ ] byte , 2 )
k [ 0 ] = keyDistanceCnt
k [ 1 ] = uint8 ( i )
cnt , _ := s . db . Get ( k )
s . bucketCnt [ i ] = BytesToU64 ( cnt )
}
data , _ := s . db . Get ( keyEntryCnt )
s . entryCnt = BytesToU64 ( data )
data , _ = s . db . Get ( keyAccessCnt )
s . accessCnt = BytesToU64 ( data )
data , _ = s . db . Get ( keyDataIdx )
s . dataIdx = BytesToU64 ( data )
return s , nil
}
// NewMockDbStore creates a new instance of DbStore with
// mockStore set to a provided value. If mockStore argument is nil,
// this function behaves exactly as NewDbStore.
func NewMockDbStore ( params * LDBStoreParams , mockStore * mock . NodeStore ) ( s * LDBStore , err error ) {
s , err = NewLDBStore ( params )
if err != nil {
return nil , err
}
// replace put and get with mock store functionality
if mockStore != nil {
s . encodeDataFunc = newMockEncodeDataFunc ( mockStore )
s . getDataFunc = newMockGetDataFunc ( mockStore )
}
return
}
type dpaDBIndex struct {
Idx uint64
Access uint64
}
func BytesToU64 ( data [ ] byte ) uint64 {
if len ( data ) < 8 {
return 0
}
return binary . BigEndian . Uint64 ( data )
}
func U64ToBytes ( val uint64 ) [ ] byte {
data := make ( [ ] byte , 8 )
binary . BigEndian . PutUint64 ( data , val )
return data
}
func ( s * LDBStore ) updateIndexAccess ( index * dpaDBIndex ) {
index . Access = s . accessCnt
}
func getIndexKey ( hash Address ) [ ] byte {
hashSize := len ( hash )
key := make ( [ ] byte , hashSize + 1 )
key [ 0 ] = keyIndex
copy ( key [ 1 : ] , hash [ : ] )
return key
}
func getDataKey ( idx uint64 , po uint8 ) [ ] byte {
key := make ( [ ] byte , 10 )
key [ 0 ] = keyData
key [ 1 ] = po
binary . BigEndian . PutUint64 ( key [ 2 : ] , idx )
return key
}
func encodeIndex ( index * dpaDBIndex ) [ ] byte {
data , _ := rlp . EncodeToBytes ( index )
return data
}
2018-09-13 12:42:19 +03:00
func encodeData ( chunk Chunk ) [ ] byte {
2018-06-20 15:06:27 +03:00
// Always create a new underlying array for the returned byte slice.
2018-09-13 12:42:19 +03:00
// The chunk.Address array may be used in the returned slice which
2018-06-20 15:06:27 +03:00
// may be changed later in the code or by the LevelDB, resulting
2018-09-13 12:42:19 +03:00
// that the Address is changed as well.
return append ( append ( [ ] byte { } , chunk . Address ( ) [ : ] ... ) , chunk . Data ( ) ... )
2018-06-20 15:06:27 +03:00
}
func decodeIndex ( data [ ] byte , index * dpaDBIndex ) error {
dec := rlp . NewStream ( bytes . NewReader ( data ) , 0 )
return dec . Decode ( index )
}
2018-09-13 12:42:19 +03:00
func decodeData ( addr Address , data [ ] byte ) ( * chunk , error ) {
return NewChunk ( addr , data [ 32 : ] ) , nil
2018-06-20 15:06:27 +03:00
}
func ( s * LDBStore ) collectGarbage ( ratio float32 ) {
2018-09-12 15:39:45 +03:00
log . Trace ( "collectGarbage" , "ratio" , ratio )
2018-06-20 15:06:27 +03:00
metrics . GetOrRegisterCounter ( "ldbstore.collectgarbage" , nil ) . Inc ( 1 )
it := s . db . NewIterator ( )
defer it . Release ( )
garbage := [ ] * gcItem { }
gcnt := 0
for ok := it . Seek ( [ ] byte { keyIndex } ) ; ok && ( gcnt < maxGCitems ) && ( uint64 ( gcnt ) < s . entryCnt ) ; ok = it . Next ( ) {
itkey := it . Key ( )
if ( itkey == nil ) || ( itkey [ 0 ] != keyIndex ) {
break
}
// it.Key() contents change on next call to it.Next(), so we must copy it
key := make ( [ ] byte , len ( it . Key ( ) ) )
copy ( key , it . Key ( ) )
val := it . Value ( )
var index dpaDBIndex
hash := key [ 1 : ]
decodeIndex ( val , & index )
po := s . po ( hash )
gci := & gcItem {
idxKey : key ,
idx : index . Idx ,
value : index . Access , // the smaller, the more likely to be gc'd. see sort comparator below.
po : po ,
}
garbage = append ( garbage , gci )
gcnt ++
}
sort . Slice ( garbage [ : gcnt ] , func ( i , j int ) bool { return garbage [ i ] . value < garbage [ j ] . value } )
cutoff := int ( float32 ( gcnt ) * ratio )
metrics . GetOrRegisterCounter ( "ldbstore.collectgarbage.delete" , nil ) . Inc ( int64 ( cutoff ) )
for i := 0 ; i < cutoff ; i ++ {
s . delete ( garbage [ i ] . idx , garbage [ i ] . idxKey , garbage [ i ] . po )
}
}
// Export writes all chunks from the store to a tar archive, returning the
// number of chunks written.
func ( s * LDBStore ) Export ( out io . Writer ) ( int64 , error ) {
tw := tar . NewWriter ( out )
defer tw . Close ( )
it := s . db . NewIterator ( )
defer it . Release ( )
var count int64
for ok := it . Seek ( [ ] byte { keyIndex } ) ; ok ; ok = it . Next ( ) {
key := it . Key ( )
if ( key == nil ) || ( key [ 0 ] != keyIndex ) {
break
}
var index dpaDBIndex
hash := key [ 1 : ]
decodeIndex ( it . Value ( ) , & index )
po := s . po ( hash )
datakey := getDataKey ( index . Idx , po )
log . Trace ( "store.export" , "dkey" , fmt . Sprintf ( "%x" , datakey ) , "dataidx" , index . Idx , "po" , po )
data , err := s . db . Get ( datakey )
if err != nil {
2018-09-14 23:07:13 +03:00
log . Warn ( fmt . Sprintf ( "Chunk %x found but could not be accessed: %v" , key , err ) )
2018-06-20 15:06:27 +03:00
continue
}
hdr := & tar . Header {
Name : hex . EncodeToString ( hash ) ,
Mode : 0644 ,
Size : int64 ( len ( data ) ) ,
}
if err := tw . WriteHeader ( hdr ) ; err != nil {
return count , err
}
if _ , err := tw . Write ( data ) ; err != nil {
return count , err
}
count ++
}
return count , nil
}
// of chunks read.
func ( s * LDBStore ) Import ( in io . Reader ) ( int64 , error ) {
tr := tar . NewReader ( in )
2018-09-13 12:42:19 +03:00
ctx , cancel := context . WithCancel ( context . Background ( ) )
defer cancel ( )
countC := make ( chan int64 )
errC := make ( chan error )
2018-06-20 15:06:27 +03:00
var count int64
2018-09-13 12:42:19 +03:00
go func ( ) {
for {
hdr , err := tr . Next ( )
if err == io . EOF {
break
} else if err != nil {
select {
case errC <- err :
case <- ctx . Done ( ) :
}
}
2018-06-20 15:06:27 +03:00
2018-09-13 12:42:19 +03:00
if len ( hdr . Name ) != 64 {
log . Warn ( "ignoring non-chunk file" , "name" , hdr . Name )
continue
}
2018-06-20 15:06:27 +03:00
2018-09-13 12:42:19 +03:00
keybytes , err := hex . DecodeString ( hdr . Name )
if err != nil {
log . Warn ( "ignoring invalid chunk file" , "name" , hdr . Name , "err" , err )
continue
}
data , err := ioutil . ReadAll ( tr )
if err != nil {
select {
case errC <- err :
case <- ctx . Done ( ) :
}
}
key := Address ( keybytes )
chunk := NewChunk ( key , data [ 32 : ] )
go func ( ) {
select {
case errC <- s . Put ( ctx , chunk ) :
case <- ctx . Done ( ) :
}
} ( )
count ++
2018-06-20 15:06:27 +03:00
}
2018-09-13 12:42:19 +03:00
countC <- count
} ( )
2018-06-20 15:06:27 +03:00
2018-09-13 12:42:19 +03:00
// wait for all chunks to be stored
i := int64 ( 0 )
var total int64
for {
select {
case err := <- errC :
if err != nil {
return count , err
}
i ++
case total = <- countC :
case <- ctx . Done ( ) :
return i , ctx . Err ( )
}
if total > 0 && i == total {
return total , nil
2018-06-20 15:06:27 +03:00
}
}
}
2018-10-03 15:31:59 +03:00
//Cleanup iterates over the database and deletes chunks if they pass the `f` condition
func ( s * LDBStore ) Cleanup ( f func ( * chunk ) bool ) {
2018-08-20 15:10:30 +03:00
var errorsFound , removed , total int
2018-06-20 15:06:27 +03:00
it := s . db . NewIterator ( )
2018-08-20 15:10:30 +03:00
defer it . Release ( )
for ok := it . Seek ( [ ] byte { keyIndex } ) ; ok ; ok = it . Next ( ) {
key := it . Key ( )
2018-06-20 15:06:27 +03:00
if ( key == nil ) || ( key [ 0 ] != keyIndex ) {
break
}
total ++
var index dpaDBIndex
err := decodeIndex ( it . Value ( ) , & index )
if err != nil {
2018-08-20 15:10:30 +03:00
log . Warn ( "Cannot decode" )
errorsFound ++
2018-06-20 15:06:27 +03:00
continue
}
2018-08-20 15:10:30 +03:00
hash := key [ 1 : ]
po := s . po ( hash )
datakey := getDataKey ( index . Idx , po )
data , err := s . db . Get ( datakey )
2018-06-20 15:06:27 +03:00
if err != nil {
2018-08-20 15:10:30 +03:00
found := false
// highest possible proximity is 255
for po = 1 ; po <= 255 ; po ++ {
datakey = getDataKey ( index . Idx , po )
data , err = s . db . Get ( datakey )
if err == nil {
found = true
break
}
}
if ! found {
2018-09-14 23:07:13 +03:00
log . Warn ( fmt . Sprintf ( "Chunk %x found but count not be accessed with any po" , key ) )
2018-08-20 15:10:30 +03:00
errorsFound ++
continue
2018-06-20 15:06:27 +03:00
}
}
2018-08-20 15:10:30 +03:00
ck := data [ : 32 ]
2018-09-13 12:42:19 +03:00
c , err := decodeData ( ck , data )
if err != nil {
log . Error ( "decodeData error" , "err" , err )
continue
}
2018-08-20 15:10:30 +03:00
2018-09-13 12:42:19 +03:00
cs := int64 ( binary . LittleEndian . Uint64 ( c . sdata [ : 8 ] ) )
2018-09-14 23:07:13 +03:00
log . Trace ( "chunk" , "key" , fmt . Sprintf ( "%x" , key ) , "ck" , fmt . Sprintf ( "%x" , ck ) , "dkey" , fmt . Sprintf ( "%x" , datakey ) , "dataidx" , index . Idx , "po" , po , "len data" , len ( data ) , "len sdata" , len ( c . sdata ) , "size" , cs )
2018-08-20 15:10:30 +03:00
2018-10-03 15:31:59 +03:00
// if chunk is to be removed
if f ( c ) {
2018-09-14 23:07:13 +03:00
log . Warn ( "chunk for cleanup" , "key" , fmt . Sprintf ( "%x" , key ) , "ck" , fmt . Sprintf ( "%x" , ck ) , "dkey" , fmt . Sprintf ( "%x" , datakey ) , "dataidx" , index . Idx , "po" , po , "len data" , len ( data ) , "len sdata" , len ( c . sdata ) , "size" , cs )
2018-08-20 15:10:30 +03:00
s . delete ( index . Idx , getIndexKey ( key [ 1 : ] ) , po )
removed ++
errorsFound ++
}
2018-06-20 15:06:27 +03:00
}
2018-08-20 15:10:30 +03:00
log . Warn ( fmt . Sprintf ( "Found %v errors out of %v entries. Removed %v chunks." , errorsFound , total , removed ) )
2018-06-20 15:06:27 +03:00
}
func ( s * LDBStore ) ReIndex ( ) {
//Iterates over the database and checks that there are no faulty chunks
it := s . db . NewIterator ( )
startPosition := [ ] byte { keyOldData }
it . Seek ( startPosition )
var key [ ] byte
var errorsFound , total int
for it . Valid ( ) {
key = it . Key ( )
if ( key == nil ) || ( key [ 0 ] != keyOldData ) {
break
}
data := it . Value ( )
hasher := s . hashfunc ( )
hasher . Write ( data )
hash := hasher . Sum ( nil )
newKey := make ( [ ] byte , 10 )
oldCntKey := make ( [ ] byte , 2 )
newCntKey := make ( [ ] byte , 2 )
oldCntKey [ 0 ] = keyDistanceCnt
newCntKey [ 0 ] = keyDistanceCnt
key [ 0 ] = keyData
key [ 1 ] = s . po ( Address ( key [ 1 : ] ) )
oldCntKey [ 1 ] = key [ 1 ]
newCntKey [ 1 ] = s . po ( Address ( newKey [ 1 : ] ) )
copy ( newKey [ 2 : ] , key [ 1 : ] )
newValue := append ( hash , data ... )
batch := new ( leveldb . Batch )
batch . Delete ( key )
s . bucketCnt [ oldCntKey [ 1 ] ] --
batch . Put ( oldCntKey , U64ToBytes ( s . bucketCnt [ oldCntKey [ 1 ] ] ) )
batch . Put ( newKey , newValue )
s . bucketCnt [ newCntKey [ 1 ] ] ++
batch . Put ( newCntKey , U64ToBytes ( s . bucketCnt [ newCntKey [ 1 ] ] ) )
s . db . Write ( batch )
it . Next ( )
}
it . Release ( )
log . Warn ( fmt . Sprintf ( "Found %v errors out of %v entries" , errorsFound , total ) )
}
2018-09-12 15:39:45 +03:00
func ( s * LDBStore ) Delete ( addr Address ) {
s . lock . Lock ( )
defer s . lock . Unlock ( )
ikey := getIndexKey ( addr )
var indx dpaDBIndex
s . tryAccessIdx ( ikey , & indx )
s . delete ( indx . Idx , ikey , s . po ( addr ) )
}
2018-06-20 15:06:27 +03:00
func ( s * LDBStore ) delete ( idx uint64 , idxKey [ ] byte , po uint8 ) {
metrics . GetOrRegisterCounter ( "ldbstore.delete" , nil ) . Inc ( 1 )
batch := new ( leveldb . Batch )
batch . Delete ( idxKey )
batch . Delete ( getDataKey ( idx , po ) )
s . entryCnt --
2018-09-06 13:11:38 +03:00
dbEntryCount . Dec ( 1 )
2018-06-20 15:06:27 +03:00
cntKey := make ( [ ] byte , 2 )
cntKey [ 0 ] = keyDistanceCnt
cntKey [ 1 ] = po
batch . Put ( keyEntryCnt , U64ToBytes ( s . entryCnt ) )
batch . Put ( cntKey , U64ToBytes ( s . bucketCnt [ po ] ) )
s . db . Write ( batch )
}
2018-09-13 12:42:19 +03:00
func ( s * LDBStore ) BinIndex ( po uint8 ) uint64 {
2018-06-20 15:06:27 +03:00
s . lock . RLock ( )
defer s . lock . RUnlock ( )
return s . bucketCnt [ po ]
}
func ( s * LDBStore ) Size ( ) uint64 {
2018-09-12 15:39:45 +03:00
s . lock . RLock ( )
defer s . lock . RUnlock ( )
2018-06-20 15:06:27 +03:00
return s . entryCnt
}
func ( s * LDBStore ) CurrentStorageIndex ( ) uint64 {
s . lock . RLock ( )
defer s . lock . RUnlock ( )
return s . dataIdx
}
2018-09-13 12:42:19 +03:00
func ( s * LDBStore ) Put ( ctx context . Context , chunk Chunk ) error {
2018-06-20 15:06:27 +03:00
metrics . GetOrRegisterCounter ( "ldbstore.put" , nil ) . Inc ( 1 )
2018-09-13 12:42:19 +03:00
log . Trace ( "ldbstore.put" , "key" , chunk . Address ( ) )
2018-06-20 15:06:27 +03:00
2018-09-13 12:42:19 +03:00
ikey := getIndexKey ( chunk . Address ( ) )
2018-06-20 15:06:27 +03:00
var index dpaDBIndex
2018-09-13 12:42:19 +03:00
po := s . po ( chunk . Address ( ) )
2018-06-20 15:06:27 +03:00
s . lock . Lock ( )
2018-09-13 12:42:19 +03:00
if s . closed {
s . lock . Unlock ( )
return ErrDBClosed
}
batch := s . batch
log . Trace ( "ldbstore.put: s.db.Get" , "key" , chunk . Address ( ) , "ikey" , fmt . Sprintf ( "%x" , ikey ) )
2018-06-20 15:06:27 +03:00
idata , err := s . db . Get ( ikey )
if err != nil {
s . doPut ( chunk , & index , po )
} else {
2018-09-13 12:42:19 +03:00
log . Trace ( "ldbstore.put: chunk already exists, only update access" , "key" , chunk . Address )
2018-06-20 15:06:27 +03:00
decodeIndex ( idata , & index )
}
index . Access = s . accessCnt
s . accessCnt ++
idata = encodeIndex ( & index )
s . batch . Put ( ikey , idata )
2018-09-13 12:42:19 +03:00
s . lock . Unlock ( )
2018-06-20 15:06:27 +03:00
select {
case s . batchesC <- struct { } { } :
default :
}
2018-09-13 12:42:19 +03:00
select {
case <- batch . c :
return batch . err
case <- ctx . Done ( ) :
return ctx . Err ( )
}
2018-06-20 15:06:27 +03:00
}
// force putting into db, does not check access index
2018-09-13 12:42:19 +03:00
func ( s * LDBStore ) doPut ( chunk Chunk , index * dpaDBIndex , po uint8 ) {
2018-06-20 15:06:27 +03:00
data := s . encodeDataFunc ( chunk )
dkey := getDataKey ( s . dataIdx , po )
s . batch . Put ( dkey , data )
index . Idx = s . dataIdx
s . bucketCnt [ po ] = s . dataIdx
s . entryCnt ++
2018-09-06 13:11:38 +03:00
dbEntryCount . Inc ( 1 )
2018-06-20 15:06:27 +03:00
s . dataIdx ++
cntKey := make ( [ ] byte , 2 )
cntKey [ 0 ] = keyDistanceCnt
cntKey [ 1 ] = po
s . batch . Put ( cntKey , U64ToBytes ( s . bucketCnt [ po ] ) )
}
func ( s * LDBStore ) writeBatches ( ) {
for {
select {
case <- s . quit :
2018-09-13 12:42:19 +03:00
log . Debug ( "DbStore: quit batch write loop" )
return
2018-06-20 15:06:27 +03:00
case <- s . batchesC :
2018-09-13 12:42:19 +03:00
err := s . writeCurrentBatch ( )
2018-06-20 15:06:27 +03:00
if err != nil {
2018-09-13 12:42:19 +03:00
log . Debug ( "DbStore: quit batch write loop" , "err" , err . Error ( ) )
return
2018-06-20 15:06:27 +03:00
}
2018-09-13 12:42:19 +03:00
}
}
2018-06-20 15:06:27 +03:00
2018-09-13 12:42:19 +03:00
}
func ( s * LDBStore ) writeCurrentBatch ( ) error {
s . lock . Lock ( )
defer s . lock . Unlock ( )
b := s . batch
l := b . Len ( )
if l == 0 {
return nil
}
e := s . entryCnt
d := s . dataIdx
a := s . accessCnt
s . batch = newBatch ( )
b . err = s . writeBatch ( b , e , d , a )
close ( b . c )
for e > s . capacity {
log . Trace ( "for >" , "e" , e , "s.capacity" , s . capacity )
// Collect garbage in a separate goroutine
// to be able to interrupt this loop by s.quit.
done := make ( chan struct { } )
go func ( ) {
s . collectGarbage ( gcArrayFreeRatio )
log . Trace ( "collectGarbage closing done" )
close ( done )
} ( )
select {
case <- s . quit :
return errors . New ( "CollectGarbage terminated due to quit" )
case <- done :
2018-06-20 15:06:27 +03:00
}
2018-09-13 12:42:19 +03:00
e = s . entryCnt
2018-06-20 15:06:27 +03:00
}
2018-09-13 12:42:19 +03:00
return nil
2018-06-20 15:06:27 +03:00
}
// must be called non concurrently
2018-09-13 12:42:19 +03:00
func ( s * LDBStore ) writeBatch ( b * dbBatch , entryCnt , dataIdx , accessCnt uint64 ) error {
2018-06-20 15:06:27 +03:00
b . Put ( keyEntryCnt , U64ToBytes ( entryCnt ) )
b . Put ( keyDataIdx , U64ToBytes ( dataIdx ) )
b . Put ( keyAccessCnt , U64ToBytes ( accessCnt ) )
l := b . Len ( )
2018-09-13 12:42:19 +03:00
if err := s . db . Write ( b . Batch ) ; err != nil {
2018-06-20 15:06:27 +03:00
return fmt . Errorf ( "unable to write batch: %v" , err )
}
log . Trace ( fmt . Sprintf ( "batch write (%d entries)" , l ) )
return nil
}
// newMockEncodeDataFunc returns a function that stores the chunk data
// to a mock store to bypass the default functionality encodeData.
// The constructed function always returns the nil data, as DbStore does
// not need to store the data, but still need to create the index.
2018-09-13 12:42:19 +03:00
func newMockEncodeDataFunc ( mockStore * mock . NodeStore ) func ( chunk Chunk ) [ ] byte {
return func ( chunk Chunk ) [ ] byte {
if err := mockStore . Put ( chunk . Address ( ) , encodeData ( chunk ) ) ; err != nil {
log . Error ( fmt . Sprintf ( "%T: Chunk %v put: %v" , mockStore , chunk . Address ( ) . Log ( ) , err ) )
2018-06-20 15:06:27 +03:00
}
2018-09-13 12:42:19 +03:00
return chunk . Address ( ) [ : ]
2018-06-20 15:06:27 +03:00
}
}
// try to find index; if found, update access cnt and return true
func ( s * LDBStore ) tryAccessIdx ( ikey [ ] byte , index * dpaDBIndex ) bool {
idata , err := s . db . Get ( ikey )
if err != nil {
return false
}
decodeIndex ( idata , index )
s . batch . Put ( keyAccessCnt , U64ToBytes ( s . accessCnt ) )
s . accessCnt ++
index . Access = s . accessCnt
idata = encodeIndex ( index )
s . batch . Put ( ikey , idata )
select {
case s . batchesC <- struct { } { } :
default :
}
return true
}
2018-10-03 15:31:59 +03:00
// GetSchema is returning the current named schema of the datastore as read from LevelDB
func ( s * LDBStore ) GetSchema ( ) ( string , error ) {
s . lock . Lock ( )
defer s . lock . Unlock ( )
data , err := s . db . Get ( keySchema )
if err != nil {
if err == leveldb . ErrNotFound {
return "" , nil
}
return "" , err
}
return string ( data ) , nil
}
// PutSchema is saving a named schema to the LevelDB datastore
func ( s * LDBStore ) PutSchema ( schema string ) error {
s . lock . Lock ( )
defer s . lock . Unlock ( )
return s . db . Put ( keySchema , [ ] byte ( schema ) )
}
2018-09-13 12:42:19 +03:00
func ( s * LDBStore ) Get ( _ context . Context , addr Address ) ( chunk Chunk , err error ) {
2018-06-20 15:06:27 +03:00
metrics . GetOrRegisterCounter ( "ldbstore.get" , nil ) . Inc ( 1 )
log . Trace ( "ldbstore.get" , "key" , addr )
s . lock . Lock ( )
defer s . lock . Unlock ( )
return s . get ( addr )
}
2018-09-13 12:42:19 +03:00
func ( s * LDBStore ) get ( addr Address ) ( chunk * chunk , err error ) {
2018-06-20 15:06:27 +03:00
var indx dpaDBIndex
2018-09-13 12:42:19 +03:00
if s . closed {
return nil , ErrDBClosed
}
2018-06-20 15:06:27 +03:00
if s . tryAccessIdx ( getIndexKey ( addr ) , & indx ) {
var data [ ] byte
if s . getDataFunc != nil {
// if getDataFunc is defined, use it to retrieve the chunk data
log . Trace ( "ldbstore.get retrieve with getDataFunc" , "key" , addr )
data , err = s . getDataFunc ( addr )
if err != nil {
return
}
} else {
// default DbStore functionality to retrieve chunk data
proximity := s . po ( addr )
datakey := getDataKey ( indx . Idx , proximity )
data , err = s . db . Get ( datakey )
log . Trace ( "ldbstore.get retrieve" , "key" , addr , "indexkey" , indx . Idx , "datakey" , fmt . Sprintf ( "%x" , datakey ) , "proximity" , proximity )
if err != nil {
log . Trace ( "ldbstore.get chunk found but could not be accessed" , "key" , addr , "err" , err )
s . delete ( indx . Idx , getIndexKey ( addr ) , s . po ( addr ) )
return
}
}
2018-09-13 12:42:19 +03:00
return decodeData ( addr , data )
2018-06-20 15:06:27 +03:00
} else {
err = ErrChunkNotFound
}
return
}
// newMockGetFunc returns a function that reads chunk data from
// the mock database, which is used as the value for DbStore.getFunc
// to bypass the default functionality of DbStore with a mock store.
func newMockGetDataFunc ( mockStore * mock . NodeStore ) func ( addr Address ) ( data [ ] byte , err error ) {
return func ( addr Address ) ( data [ ] byte , err error ) {
data , err = mockStore . Get ( addr )
if err == mock . ErrNotFound {
// preserve ErrChunkNotFound error
err = ErrChunkNotFound
}
return data , err
}
}
func ( s * LDBStore ) updateAccessCnt ( addr Address ) {
s . lock . Lock ( )
defer s . lock . Unlock ( )
var index dpaDBIndex
s . tryAccessIdx ( getIndexKey ( addr ) , & index ) // result_chn == nil, only update access cnt
}
func ( s * LDBStore ) setCapacity ( c uint64 ) {
s . lock . Lock ( )
defer s . lock . Unlock ( )
s . capacity = c
if s . entryCnt > c {
ratio := float32 ( 1.01 ) - float32 ( c ) / float32 ( s . entryCnt )
if ratio < gcArrayFreeRatio {
ratio = gcArrayFreeRatio
}
if ratio > 1 {
ratio = 1
}
for s . entryCnt > c {
s . collectGarbage ( ratio )
}
}
}
func ( s * LDBStore ) Close ( ) {
close ( s . quit )
2018-09-13 12:42:19 +03:00
s . lock . Lock ( )
s . closed = true
s . lock . Unlock ( )
// force writing out current batch
s . writeCurrentBatch ( )
close ( s . batchesC )
2018-06-20 15:06:27 +03:00
s . db . Close ( )
}
// SyncIterator(start, stop, po, f) calls f on each hash of a bin po from start to stop
func ( s * LDBStore ) SyncIterator ( since uint64 , until uint64 , po uint8 , f func ( Address , uint64 ) bool ) error {
metrics . GetOrRegisterCounter ( "ldbstore.synciterator" , nil ) . Inc ( 1 )
sincekey := getDataKey ( since , po )
untilkey := getDataKey ( until , po )
it := s . db . NewIterator ( )
defer it . Release ( )
for ok := it . Seek ( sincekey ) ; ok ; ok = it . Next ( ) {
metrics . GetOrRegisterCounter ( "ldbstore.synciterator.seek" , nil ) . Inc ( 1 )
dbkey := it . Key ( )
if dbkey [ 0 ] != keyData || dbkey [ 1 ] != po || bytes . Compare ( untilkey , dbkey ) < 0 {
break
}
key := make ( [ ] byte , 32 )
val := it . Value ( )
copy ( key , val [ : 32 ] )
if ! f ( Address ( key ) , binary . BigEndian . Uint64 ( dbkey [ 2 : ] ) ) {
break
}
}
return it . Error ( )
}
func databaseExists ( path string ) bool {
o := & opt . Options {
ErrorIfMissing : true ,
}
tdb , err := leveldb . OpenFile ( path , o )
if err != nil {
return false
}
defer tdb . Close ( )
return true
}