diff --git a/swarm/storage/localstore/gc.go b/swarm/storage/localstore/gc.go index ebaba2d8f3..84c4f596d1 100644 --- a/swarm/storage/localstore/gc.go +++ b/swarm/storage/localstore/gc.go @@ -14,84 +14,9 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -/* -Counting number of items in garbage collection index - -The number of items in garbage collection index is not the same as the number of -chunks in retrieval index (total number of stored chunks). Chunk can be garbage -collected only when it is set to a synced state by ModSetSync, and only then can -be counted into garbage collection size, which determines whether a number of -chunk should be removed from the storage by the garbage collection. This opens a -possibility that the storage size exceeds the limit if files are locally -uploaded and the node is not connected to other nodes or there is a problem with -syncing. - -Tracking of garbage collection size (gcSize) is focused on performance. Key -points: - - 1. counting the number of key/value pairs in LevelDB takes around 0.7s for 1e6 - on a very fast ssd (unacceptable long time in reality) - 2. locking leveldb batch writes with a global mutex (serial batch writes) is - not acceptable, we should use locking per chunk address - -Because of point 1. we cannot count the number of items in garbage collection -index in New constructor as it could last very long for realistic scenarios -where limit is 5e6 and nodes are running on slower hdd disks or cloud providers -with low IOPS. - -Point 2. is a performance optimization to allow parallel batch writes with -getters, putters and setters. Every single batch that they create contain only -information related to a single chunk, no relations with other chunks or shared -statistical data (like gcSize). This approach avoids race conditions on writing -batches in parallel, but creates a problem of synchronizing statistical data -values like gcSize. With global mutex lock, any data could be written by any -batch, but would not use utilize the full potential of leveldb parallel writes. - -To mitigate this two problems, the implementation of counting and persisting -gcSize is split into two parts. One is the in-memory value (gcSize) that is fast -to read and write with a dedicated mutex (gcSizeMu) if the batch which adds or -removes items from garbage collection index is successful. The second part is -the reliable persistence of this value to leveldb database, as storedGCSize -field. This database field is saved by writeGCSizeWorker and writeGCSize -functions when in-memory gcSize variable is changed, but no too often to avoid -very frequent database writes. This database writes are triggered by -writeGCSizeTrigger when a call is made to function incGCSize. Trigger ensures -that no database writes are done only when gcSize is changed (contrary to a -simpler periodic writes or checks). A backoff of 10s in writeGCSizeWorker -ensures that no frequent batch writes are made. Saving the storedGCSize on -database Close function ensures that in-memory gcSize is persisted when database -is closed. - -This persistence must be resilient to failures like panics. For this purpose, a -collection of hashes that are added to the garbage collection index, but still -not persisted to storedGCSize, must be tracked to count them in when DB is -constructed again with New function after the failure (swarm node restarts). On -every batch write that adds a new item to garbage collection index, the same -hash is added to gcUncountedHashesIndex. This ensures that there is a persisted -information which hashes were added to the garbage collection index. But, when -the storedGCSize is saved by writeGCSize function, this values are removed in -the same batch in which storedGCSize is changed to ensure consistency. When the -panic happen, or database Close method is not saved. The database storage -contains all information to reliably and efficiently get the correct number of -items in garbage collection index. This is performed in the New function when -all hashes in gcUncountedHashesIndex are counted, added to the storedGCSize and -saved to the disk before the database is constructed again. Index -gcUncountedHashesIndex is acting as dirty bit for recovery that provides -information what needs to be corrected. With a simple dirty bit, the whole -garbage collection index should me counted on recovery instead only the items in -gcUncountedHashesIndex. Because of the triggering mechanizm of writeGCSizeWorker -and relatively short backoff time, the number of hashes in -gcUncountedHashesIndex should be low and it should take a very short time to -recover from the previous failure. If there was no failure and -gcUncountedHashesIndex is empty, which is the usual case, New function will take -the minimal time to return. -*/ - package localstore import ( - "time" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/swarm/shed" "github.com/syndtr/goleveldb/leveldb" @@ -109,7 +34,7 @@ var ( gcTargetRatio = 0.9 // gcBatchSize limits the number of chunks in a single // leveldb batch on garbage collection. - gcBatchSize int64 = 1000 + gcBatchSize uint64 = 1000 ) // collectGarbageWorker is a long running function that waits for @@ -149,20 +74,21 @@ func (db *DB) collectGarbageWorker() { // is false, another call to this function is needed to collect // the rest of the garbage as the batch size limit is reached. // This function is called in collectGarbageWorker. -func (db *DB) collectGarbage() (collectedCount int64, done bool, err error) { +func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { batch := new(leveldb.Batch) target := db.gcTarget() + // protect database from changing idexes and gcSize + db.batchMu.Lock() + defer db.batchMu.Unlock() + + gcSize, err := db.gcSize.Get() + if err != nil { + return 0, true, err + } + done = true err = db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) { - // protect parallel updates - unlock, err := db.lockAddr(item.Address) - if err != nil { - return false, err - } - defer unlock() - - gcSize := db.getGCSize() if gcSize-collectedCount <= target { return true, nil } @@ -184,49 +110,19 @@ func (db *DB) collectGarbage() (collectedCount int64, done bool, err error) { return 0, false, err } + db.gcSize.PutInBatch(batch, gcSize-collectedCount) + err = db.shed.WriteBatch(batch) if err != nil { return 0, false, err } - // batch is written, decrement gcSize - db.incGCSize(-collectedCount) return collectedCount, done, nil } // gcTrigger retruns the absolute value for garbage collection // target value, calculated from db.capacity and gcTargetRatio. -func (db *DB) gcTarget() (target int64) { - return int64(float64(db.capacity) * gcTargetRatio) -} - -// incGCSize increments gcSize by the provided number. -// If count is negative, it will decrement gcSize. -func (db *DB) incGCSize(count int64) { - if count == 0 { - return - } - - db.gcSizeMu.Lock() - new := db.gcSize + count - db.gcSize = new - db.gcSizeMu.Unlock() - - select { - case db.writeGCSizeTrigger <- struct{}{}: - default: - } - if new >= db.capacity { - db.triggerGarbageCollection() - } -} - -// getGCSize returns gcSize value by locking it -// with gcSizeMu mutex. -func (db *DB) getGCSize() (count int64) { - db.gcSizeMu.RLock() - count = db.gcSize - db.gcSizeMu.RUnlock() - return count +func (db *DB) gcTarget() (target uint64) { + return uint64(float64(db.capacity) * gcTargetRatio) } // triggerGarbageCollection signals collectGarbageWorker @@ -239,68 +135,41 @@ func (db *DB) triggerGarbageCollection() { } } -// writeGCSizeWorker writes gcSize on trigger event -// and waits writeGCSizeDelay after each write. -// It implements a linear backoff with delay of -// writeGCSizeDelay duration to avoid very frequent -// database operations. -func (db *DB) writeGCSizeWorker() { - defer close(db.writeGCSizeWorkerDone) - - for { - select { - case <-db.writeGCSizeTrigger: - err := db.writeGCSize(db.getGCSize()) - if err != nil { - log.Error("localstore write gc size", "err", err) - } - // Wait some time before writing gc size in the next - // iteration. This prevents frequent I/O operations. - select { - case <-time.After(10 * time.Second): - case <-db.close: - return - } - case <-db.close: - return - } +// incGCSizeInBatch changes gcSize field value +// by change which can be negative. This function +// must be called under batchMu lock. +func (db *DB) incGCSizeInBatch(batch *leveldb.Batch, change int64) (err error) { + if change == 0 { + return nil } -} - -// writeGCSize stores the number of items in gcIndex. -// It removes all hashes from gcUncountedHashesIndex -// not to include them on the next DB initialization -// (New function) when gcSize is counted. -func (db *DB) writeGCSize(gcSize int64) (err error) { - const maxBatchSize = 1000 - - batch := new(leveldb.Batch) - db.storedGCSize.PutInBatch(batch, uint64(gcSize)) - batchSize := 1 - - // use only one iterator as it acquires its snapshot - // not to remove hashes from index that are added - // after stored gc size is written - err = db.gcUncountedHashesIndex.Iterate(func(item shed.Item) (stop bool, err error) { - db.gcUncountedHashesIndex.DeleteInBatch(batch, item) - batchSize++ - if batchSize >= maxBatchSize { - err = db.shed.WriteBatch(batch) - if err != nil { - return false, err - } - batch.Reset() - batchSize = 0 - } - return false, nil - }, nil) + gcSize, err := db.gcSize.Get() if err != nil { return err } - return db.shed.WriteBatch(batch) + + var new uint64 + if change > 0 { + new = gcSize + uint64(change) + } else { + // 'change' is an int64 and is negative + // a conversion is needed with correct sign + c := uint64(-change) + if c > gcSize { + // protect uint64 undeflow + return nil + } + new = gcSize - c + } + db.gcSize.PutInBatch(batch, new) + + // trigger garbage collection if we reached the capacity + if new >= db.capacity { + db.triggerGarbageCollection() + } + return nil } // testHookCollectGarbage is a hook that can provide // information when a garbage collection run is done // and how many items it removed. -var testHookCollectGarbage func(collectedCount int64) +var testHookCollectGarbage func(collectedCount uint64) diff --git a/swarm/storage/localstore/gc_test.go b/swarm/storage/localstore/gc_test.go index 8ed34384da..081e0af80b 100644 --- a/swarm/storage/localstore/gc_test.go +++ b/swarm/storage/localstore/gc_test.go @@ -38,7 +38,7 @@ func TestDB_collectGarbageWorker(t *testing.T) { func TestDB_collectGarbageWorker_multipleBatches(t *testing.T) { // lower the maximal number of chunks in a single // gc batch to ensure multiple batches. - defer func(s int64) { gcBatchSize = s }(gcBatchSize) + defer func(s uint64) { gcBatchSize = s }(gcBatchSize) gcBatchSize = 2 testDB_collectGarbageWorker(t) @@ -54,8 +54,8 @@ func testDB_collectGarbageWorker(t *testing.T) { db, cleanupFunc := newTestDB(t, &Options{ Capacity: 100, }) - testHookCollectGarbageChan := make(chan int64) - defer setTestHookCollectGarbage(func(collectedCount int64) { + testHookCollectGarbageChan := make(chan uint64) + defer setTestHookCollectGarbage(func(collectedCount uint64) { select { case testHookCollectGarbageChan <- collectedCount: case <-db.close: @@ -93,7 +93,10 @@ func testDB_collectGarbageWorker(t *testing.T) { case <-time.After(10 * time.Second): t.Error("collect garbage timeout") } - gcSize := db.getGCSize() + gcSize, err := db.gcSize.Get() + if err != nil { + t.Fatal(err) + } if gcSize == gcTarget { break } @@ -134,8 +137,8 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) { uploader := db.NewPutter(ModePutUpload) syncer := db.NewSetter(ModeSetSync) - testHookCollectGarbageChan := make(chan int64) - defer setTestHookCollectGarbage(func(collectedCount int64) { + testHookCollectGarbageChan := make(chan uint64) + defer setTestHookCollectGarbage(func(collectedCount uint64) { testHookCollectGarbageChan <- collectedCount })() @@ -202,7 +205,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) { gcTarget := db.gcTarget() - var totalCollectedCount int64 + var totalCollectedCount uint64 for { select { case c := <-testHookCollectGarbageChan: @@ -210,13 +213,16 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) { case <-time.After(10 * time.Second): t.Error("collect garbage timeout") } - gcSize := db.getGCSize() + gcSize, err := db.gcSize.Get() + if err != nil { + t.Fatal(err) + } if gcSize == gcTarget { break } } - wantTotalCollectedCount := int64(len(addrs)) - gcTarget + wantTotalCollectedCount := uint64(len(addrs)) - gcTarget if totalCollectedCount != wantTotalCollectedCount { t.Errorf("total collected chunks %v, want %v", totalCollectedCount, wantTotalCollectedCount) } @@ -288,10 +294,7 @@ func TestDB_gcSize(t *testing.T) { } } - // DB.Close writes gc size to disk, so - // Instead calling Close, close the database - // without it. - if err := db.closeWithOptions(false); err != nil { + if err := db.Close(); err != nil { t.Fatal(err) } @@ -302,14 +305,12 @@ func TestDB_gcSize(t *testing.T) { defer db.Close() t.Run("gc index size", newIndexGCSizeTest(db)) - - t.Run("gc uncounted hashes index count", newItemsCountTest(db.gcUncountedHashesIndex, 0)) } // setTestHookCollectGarbage sets testHookCollectGarbage and // returns a function that will reset it to the // value before the change. -func setTestHookCollectGarbage(h func(collectedCount int64)) (reset func()) { +func setTestHookCollectGarbage(h func(collectedCount uint64)) (reset func()) { current := testHookCollectGarbage reset = func() { testHookCollectGarbage = current } testHookCollectGarbage = h @@ -321,7 +322,7 @@ func setTestHookCollectGarbage(h func(collectedCount int64)) (reset func()) { // resets the original function. func TestSetTestHookCollectGarbage(t *testing.T) { // Set the current function after the test finishes. - defer func(h func(collectedCount int64)) { testHookCollectGarbage = h }(testHookCollectGarbage) + defer func(h func(collectedCount uint64)) { testHookCollectGarbage = h }(testHookCollectGarbage) // expected value for the unchanged function original := 1 @@ -332,7 +333,7 @@ func TestSetTestHookCollectGarbage(t *testing.T) { var got int // define the original (unchanged) functions - testHookCollectGarbage = func(_ int64) { + testHookCollectGarbage = func(_ uint64) { got = original } @@ -345,7 +346,7 @@ func TestSetTestHookCollectGarbage(t *testing.T) { } // set the new function - reset := setTestHookCollectGarbage(func(_ int64) { + reset := setTestHookCollectGarbage(func(_ uint64) { got = changed }) diff --git a/swarm/storage/localstore/localstore.go b/swarm/storage/localstore/localstore.go index 35044115b3..98d4c78816 100644 --- a/swarm/storage/localstore/localstore.go +++ b/swarm/storage/localstore/localstore.go @@ -18,7 +18,6 @@ package localstore import ( "encoding/binary" - "encoding/hex" "errors" "sync" "time" @@ -41,7 +40,7 @@ var ( var ( // Default value for Capacity DB option. - defaultCapacity int64 = 5000000 + defaultCapacity uint64 = 5000000 // Limit the number of goroutines created by Getters // that call updateGC function. Value 0 sets no limit. maxParallelUpdateGC = 1000 @@ -54,8 +53,6 @@ type DB struct { // schema name of loaded data schemaName shed.StringField - // field that stores number of intems in gc index - storedGCSize shed.Uint64Field // retrieval indexes retrievalDataIndex shed.Index @@ -74,23 +71,16 @@ type DB struct { // garbage collection index gcIndex shed.Index - // index that stores hashes that are not - // counted in and saved to storedGCSize - gcUncountedHashesIndex shed.Index - // number of elements in garbage collection index - // it must be always read by getGCSize and - // set with incGCSize which are locking gcSizeMu - gcSize int64 - gcSizeMu sync.RWMutex + // field that stores number of intems in gc index + gcSize shed.Uint64Field + // garbage collection is triggered when gcSize exceeds // the capacity value - capacity int64 + capacity uint64 // triggers garbage collection event loop collectGarbageTrigger chan struct{} - // triggers write gc size event loop - writeGCSizeTrigger chan struct{} // a buffered channel acting as a semaphore // to limit the maximal number of goroutines @@ -102,7 +92,7 @@ type DB struct { baseKey []byte - addressLocks sync.Map + batchMu sync.Mutex // this channel is closed when close function is called // to terminate other goroutines @@ -112,7 +102,6 @@ type DB struct { // garbage collection and gc size write workers // are done collectGarbageWorkerDone chan struct{} - writeGCSizeWorkerDone chan struct{} } // Options struct holds optional parameters for configuring DB. @@ -125,7 +114,7 @@ type Options struct { MockStore *mock.NodeStore // Capacity is a limit that triggers garbage collection when // number of items in gcIndex equals or exceeds it. - Capacity int64 + Capacity uint64 // MetricsPrefix defines a prefix for metrics names. MetricsPrefix string } @@ -140,15 +129,13 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { db = &DB{ capacity: o.Capacity, baseKey: baseKey, - // channels collectGarbageTrigger and writeGCSizeTrigger - // need to be buffered with the size of 1 + // channel collectGarbageTrigger + // needs to be buffered with the size of 1 // to signal another event if it // is triggered during already running function collectGarbageTrigger: make(chan struct{}, 1), - writeGCSizeTrigger: make(chan struct{}, 1), close: make(chan struct{}), collectGarbageWorkerDone: make(chan struct{}), - writeGCSizeWorkerDone: make(chan struct{}), } if db.capacity <= 0 { db.capacity = defaultCapacity @@ -167,7 +154,7 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { return nil, err } // Persist gc size. - db.storedGCSize, err = db.shed.NewUint64Field("gc-size") + db.gcSize, err = db.shed.NewUint64Field("gc-size") if err != nil { return nil, err } @@ -318,48 +305,6 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { if err != nil { return nil, err } - // gc uncounted hashes index keeps hashes that are in gc index - // but not counted in and saved to storedGCSize - db.gcUncountedHashesIndex, err = db.shed.NewIndex("Hash->nil", shed.IndexFuncs{ - EncodeKey: func(fields shed.Item) (key []byte, err error) { - return fields.Address, nil - }, - DecodeKey: func(key []byte) (e shed.Item, err error) { - e.Address = key - return e, nil - }, - EncodeValue: func(fields shed.Item) (value []byte, err error) { - return nil, nil - }, - DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) { - return e, nil - }, - }) - if err != nil { - return nil, err - } - - // count number of elements in garbage collection index - gcSize, err := db.storedGCSize.Get() - if err != nil { - return nil, err - } - // get number of uncounted hashes - gcUncountedSize, err := db.gcUncountedHashesIndex.Count() - if err != nil { - return nil, err - } - gcSize += uint64(gcUncountedSize) - // remove uncounted hashes from the index and - // save the total gcSize after uncounted hashes are removed - err = db.writeGCSize(int64(gcSize)) - if err != nil { - return nil, err - } - db.incGCSize(int64(gcSize)) - - // start worker to write gc size - go db.writeGCSizeWorker() // start garbage collection worker go db.collectGarbageWorker() return db, nil @@ -367,34 +312,16 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) { // Close closes the underlying database. func (db *DB) Close() (err error) { - return db.closeWithOptions(true) -} - -// closeWithOptions provides a more control which part of closing -// is done for tests. -func (db *DB) closeWithOptions(writeGCSize bool) (err error) { close(db.close) db.updateGCWG.Wait() - // wait for gc worker and gc size write workers to + // wait for gc worker to // return before closing the shed - timeout := time.After(5 * time.Second) select { case <-db.collectGarbageWorkerDone: - case <-timeout: + case <-time.After(5 * time.Second): log.Error("localstore: collect garbage worker did not return after db close") } - select { - case <-db.writeGCSizeWorkerDone: - case <-timeout: - log.Error("localstore: write gc size worker did not return after db close") - } - - if writeGCSize { - if err := db.writeGCSize(db.getGCSize()); err != nil { - log.Error("localstore: write gc size", "err", err) - } - } return db.shed.Close() } @@ -404,35 +331,6 @@ func (db *DB) po(addr chunk.Address) (bin uint8) { return uint8(chunk.Proximity(db.baseKey, addr)) } -var ( - // Maximal time for lockAddr to wait until it - // returns error. - addressLockTimeout = 3 * time.Second - // duration between two lock checks in lockAddr. - addressLockCheckDelay = 30 * time.Microsecond -) - -// lockAddr sets the lock on a particular address -// using addressLocks sync.Map and returns unlock function. -// If the address is locked this function will check it -// in a for loop for addressLockTimeout time, after which -// it will return ErrAddressLockTimeout error. -func (db *DB) lockAddr(addr chunk.Address) (unlock func(), err error) { - start := time.Now() - lockKey := hex.EncodeToString(addr) - for { - _, loaded := db.addressLocks.LoadOrStore(lockKey, struct{}{}) - if !loaded { - break - } - time.Sleep(addressLockCheckDelay) - if time.Since(start) > addressLockTimeout { - return nil, ErrAddressLockTimeout - } - } - return func() { db.addressLocks.Delete(lockKey) }, nil -} - // chunkToItem creates new Item with data provided by the Chunk. func chunkToItem(ch chunk.Chunk) shed.Item { return shed.Item{ diff --git a/swarm/storage/localstore/localstore_test.go b/swarm/storage/localstore/localstore_test.go index d106241733..42e762587f 100644 --- a/swarm/storage/localstore/localstore_test.go +++ b/swarm/storage/localstore/localstore_test.go @@ -24,7 +24,6 @@ import ( "os" "runtime" "sort" - "strconv" "sync" "testing" "time" @@ -137,89 +136,6 @@ func TestDB_updateGCSem(t *testing.T) { } } -// BenchmarkNew measures the time that New function -// needs to initialize and count the number of key/value -// pairs in GC index. -// This benchmark generates a number of chunks, uploads them, -// sets them to synced state for them to enter the GC index, -// and measures the execution time of New function by creating -// new databases with the same data directory. -// -// This benchmark takes significant amount of time. -// -// Measurements on MacBook Pro (Retina, 15-inch, Mid 2014) show -// that New function executes around 1s for database with 1M chunks. -// -// # go test -benchmem -run=none github.com/ethereum/go-ethereum/swarm/storage/localstore -bench BenchmarkNew -v -timeout 20m -// goos: darwin -// goarch: amd64 -// pkg: github.com/ethereum/go-ethereum/swarm/storage/localstore -// BenchmarkNew/1000-8 200 11672414 ns/op 9570960 B/op 10008 allocs/op -// BenchmarkNew/10000-8 100 14890609 ns/op 10490118 B/op 7759 allocs/op -// BenchmarkNew/100000-8 20 58334080 ns/op 17763157 B/op 22978 allocs/op -// BenchmarkNew/1000000-8 2 748595153 ns/op 45297404 B/op 253242 allocs/op -// PASS -func BenchmarkNew(b *testing.B) { - if testing.Short() { - b.Skip("skipping benchmark in short mode") - } - for _, count := range []int{ - 1000, - 10000, - 100000, - 1000000, - } { - b.Run(strconv.Itoa(count), func(b *testing.B) { - dir, err := ioutil.TempDir("", "localstore-new-benchmark") - if err != nil { - b.Fatal(err) - } - defer os.RemoveAll(dir) - baseKey := make([]byte, 32) - if _, err := rand.Read(baseKey); err != nil { - b.Fatal(err) - } - db, err := New(dir, baseKey, nil) - if err != nil { - b.Fatal(err) - } - defer db.Close() - uploader := db.NewPutter(ModePutUpload) - syncer := db.NewSetter(ModeSetSync) - for i := 0; i < count; i++ { - chunk := generateTestRandomChunk() - err := uploader.Put(chunk) - if err != nil { - b.Fatal(err) - } - err = syncer.Set(chunk.Address()) - if err != nil { - b.Fatal(err) - } - } - err = db.Close() - if err != nil { - b.Fatal(err) - } - b.ResetTimer() - - for n := 0; n < b.N; n++ { - b.StartTimer() - db, err := New(dir, baseKey, nil) - b.StopTimer() - - if err != nil { - b.Fatal(err) - } - err = db.Close() - if err != nil { - b.Fatal(err) - } - } - }) - } -} - // newTestDB is a helper function that constructs a // temporary database and returns a cleanup function that must // be called to remove the data. @@ -411,7 +327,7 @@ func newItemsCountTest(i shed.Index, want int) func(t *testing.T) { // value is the same as the number of items in DB.gcIndex. func newIndexGCSizeTest(db *DB) func(t *testing.T) { return func(t *testing.T) { - var want int64 + var want uint64 err := db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) { want++ return @@ -419,7 +335,10 @@ func newIndexGCSizeTest(db *DB) func(t *testing.T) { if err != nil { t.Fatal(err) } - got := db.getGCSize() + got, err := db.gcSize.Get() + if err != nil { + t.Fatal(err) + } if got != want { t.Errorf("got gc size %v, want %v", got, want) } diff --git a/swarm/storage/localstore/mode_get.go b/swarm/storage/localstore/mode_get.go index 9640cd27e1..a6353e1413 100644 --- a/swarm/storage/localstore/mode_get.go +++ b/swarm/storage/localstore/mode_get.go @@ -113,11 +113,8 @@ func (db *DB) get(mode ModeGet, addr chunk.Address) (out shed.Item, err error) { // only Address and Data fields with non zero values, // which is ensured by the get function. func (db *DB) updateGC(item shed.Item) (err error) { - unlock, err := db.lockAddr(item.Address) - if err != nil { - return err - } - defer unlock() + db.batchMu.Lock() + defer db.batchMu.Unlock() batch := new(leveldb.Batch) diff --git a/swarm/storage/localstore/mode_put.go b/swarm/storage/localstore/mode_put.go index 81df435351..1599ca8e34 100644 --- a/swarm/storage/localstore/mode_put.go +++ b/swarm/storage/localstore/mode_put.go @@ -64,11 +64,8 @@ func (p *Putter) Put(ch chunk.Chunk) (err error) { // with their nil values. func (db *DB) put(mode ModePut, item shed.Item) (err error) { // protect parallel updates - unlock, err := db.lockAddr(item.Address) - if err != nil { - return err - } - defer unlock() + db.batchMu.Lock() + defer db.batchMu.Unlock() batch := new(leveldb.Batch) @@ -116,7 +113,6 @@ func (db *DB) put(mode ModePut, item shed.Item) (err error) { db.retrievalAccessIndex.PutInBatch(batch, item) // add new entry to gc index db.gcIndex.PutInBatch(batch, item) - db.gcUncountedHashesIndex.PutInBatch(batch, item) gcSizeChange++ db.retrievalDataIndex.PutInBatch(batch, item) @@ -143,12 +139,14 @@ func (db *DB) put(mode ModePut, item shed.Item) (err error) { return ErrInvalidMode } - err = db.shed.WriteBatch(batch) + err = db.incGCSizeInBatch(batch, gcSizeChange) if err != nil { return err } - if gcSizeChange != 0 { - db.incGCSize(gcSizeChange) + + err = db.shed.WriteBatch(batch) + if err != nil { + return err } if triggerPullFeed { db.triggerPullSubscriptions(db.po(item.Address)) diff --git a/swarm/storage/localstore/mode_set.go b/swarm/storage/localstore/mode_set.go index a7c9875fed..83fcbea528 100644 --- a/swarm/storage/localstore/mode_set.go +++ b/swarm/storage/localstore/mode_set.go @@ -63,11 +63,8 @@ func (s *Setter) Set(addr chunk.Address) (err error) { // of this function for the same address in parallel. func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) { // protect parallel updates - unlock, err := db.lockAddr(addr) - if err != nil { - return err - } - defer unlock() + db.batchMu.Lock() + defer db.batchMu.Unlock() batch := new(leveldb.Batch) @@ -113,7 +110,6 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) { db.pullIndex.PutInBatch(batch, item) triggerPullFeed = true db.gcIndex.PutInBatch(batch, item) - db.gcUncountedHashesIndex.PutInBatch(batch, item) gcSizeChange++ case ModeSetSync: @@ -151,7 +147,6 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) { db.retrievalAccessIndex.PutInBatch(batch, item) db.pushIndex.DeleteInBatch(batch, item) db.gcIndex.PutInBatch(batch, item) - db.gcUncountedHashesIndex.PutInBatch(batch, item) gcSizeChange++ case modeSetRemove: @@ -179,7 +174,6 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) { db.retrievalAccessIndex.DeleteInBatch(batch, item) db.pullIndex.DeleteInBatch(batch, item) db.gcIndex.DeleteInBatch(batch, item) - db.gcUncountedHashesIndex.DeleteInBatch(batch, item) // a check is needed for decrementing gcSize // as delete is not reporting if the key/value pair // is deleted or not @@ -191,12 +185,14 @@ func (db *DB) set(mode ModeSet, addr chunk.Address) (err error) { return ErrInvalidMode } - err = db.shed.WriteBatch(batch) + err = db.incGCSizeInBatch(batch, gcSizeChange) if err != nil { return err } - if gcSizeChange != 0 { - db.incGCSize(gcSizeChange) + + err = db.shed.WriteBatch(batch) + if err != nil { + return err } if triggerPullFeed { db.triggerPullSubscriptions(db.po(item.Address))