From 1e067202a2311e583238026da0c9032e28a3f0b2 Mon Sep 17 00:00:00 2001 From: Javier Peletier Date: Thu, 16 May 2019 15:47:11 +0200 Subject: [PATCH] swarm/feeds: Parallel feed lookups (#19414) --- swarm/storage/feed/cacheentry.go | 2 +- swarm/storage/feed/handler.go | 21 +- swarm/storage/feed/handler_test.go | 8 +- swarm/storage/feed/id_test.go | 4 +- .../feed/lookup/algorithm_fluzcapacitor.go | 63 ++ .../feed/lookup/algorithm_longearth.go | 185 ++++++ swarm/storage/feed/lookup/epoch.go | 2 +- swarm/storage/feed/lookup/lookup.go | 75 +-- swarm/storage/feed/lookup/lookup_test.go | 614 +++++++++++------- swarm/storage/feed/lookup/store_test.go | 154 +++++ swarm/storage/feed/lookup/timesim_test.go | 128 ++++ swarm/storage/feed/query_test.go | 2 +- swarm/storage/feed/request_test.go | 2 +- swarm/storage/feed/update_test.go | 2 +- 14 files changed, 936 insertions(+), 326 deletions(-) create mode 100644 swarm/storage/feed/lookup/algorithm_fluzcapacitor.go create mode 100644 swarm/storage/feed/lookup/algorithm_longearth.go create mode 100644 swarm/storage/feed/lookup/store_test.go create mode 100644 swarm/storage/feed/lookup/timesim_test.go diff --git a/swarm/storage/feed/cacheentry.go b/swarm/storage/feed/cacheentry.go index be42008e9..1c7e22619 100644 --- a/swarm/storage/feed/cacheentry.go +++ b/swarm/storage/feed/cacheentry.go @@ -27,7 +27,7 @@ import ( const ( hasherCount = 8 feedsHashAlgorithm = storage.SHA3Hash - defaultRetrieveTimeout = 100 * time.Millisecond + defaultRetrieveTimeout = 1000 * time.Millisecond ) // cacheEntry caches the last known update of a specific Swarm feed. diff --git a/swarm/storage/feed/handler.go b/swarm/storage/feed/handler.go index 0f6f2ba34..98ed7fa99 100644 --- a/swarm/storage/feed/handler.go +++ b/swarm/storage/feed/handler.go @@ -23,6 +23,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "github.com/ethereum/go-ethereum/swarm/chunk" @@ -178,12 +179,12 @@ func (h *Handler) Lookup(ctx context.Context, query *Query) (*cacheEntry, error) return nil, NewError(ErrInit, "Call Handler.SetStore() before performing lookups") } - var readCount int + var readCount int32 // Invoke the lookup engine. // The callback will be called every time the lookup algorithm needs to guess requestPtr, err := lookup.Lookup(ctx, timeLimit, query.Hint, func(ctx context.Context, epoch lookup.Epoch, now uint64) (interface{}, error) { - readCount++ + atomic.AddInt32(&readCount, 1) id := ID{ Feed: query.Feed, Epoch: epoch, @@ -228,17 +229,17 @@ func (h *Handler) updateCache(request *Request) (*cacheEntry, error) { updateAddr := request.Addr() log.Trace("feed cache update", "topic", request.Topic.Hex(), "updateaddr", updateAddr, "epoch time", request.Epoch.Time, "epoch level", request.Epoch.Level) - feedUpdate := h.get(&request.Feed) - if feedUpdate == nil { - feedUpdate = &cacheEntry{} - h.set(&request.Feed, feedUpdate) + entry := h.get(&request.Feed) + if entry == nil { + entry = &cacheEntry{} + h.set(&request.Feed, entry) } // update our rsrcs entry map - feedUpdate.lastKey = updateAddr - feedUpdate.Update = request.Update - feedUpdate.Reader = bytes.NewReader(feedUpdate.data) - return feedUpdate, nil + entry.lastKey = updateAddr + entry.Update = request.Update + entry.Reader = bytes.NewReader(entry.data) + return entry, nil } // Update publishes a feed update diff --git a/swarm/storage/feed/handler_test.go b/swarm/storage/feed/handler_test.go index c4f6fe689..3d8213e60 100644 --- a/swarm/storage/feed/handler_test.go +++ b/swarm/storage/feed/handler_test.go @@ -177,8 +177,8 @@ func TestFeedsHandler(t *testing.T) { if err != nil { t.Fatal(err) } - if request.Epoch.Base() != 0 || request.Epoch.Level != 22 { - t.Fatalf("Expected epoch base time to be %d, got %d. Expected epoch level to be %d, got %d", 0, request.Epoch.Base(), 22, request.Epoch.Level) + if request.Epoch.Base() != 0 || request.Epoch.Level != 28 { + t.Fatalf("Expected epoch base time to be %d, got %d. Expected epoch level to be %d, got %d", 0, request.Epoch.Base(), 28, request.Epoch.Level) } data = []byte(updates[3]) request.SetData(data) @@ -213,8 +213,8 @@ func TestFeedsHandler(t *testing.T) { if !bytes.Equal(update2.data, []byte(updates[len(updates)-1])) { t.Fatalf("feed update data was %v, expected %v", string(update2.data), updates[len(updates)-1]) } - if update2.Level != 22 { - t.Fatalf("feed update epoch level was %d, expected 22", update2.Level) + if update2.Level != 28 { + t.Fatalf("feed update epoch level was %d, expected 28", update2.Level) } if update2.Base() != 0 { t.Fatalf("feed update epoch base time was %d, expected 0", update2.Base()) diff --git a/swarm/storage/feed/id_test.go b/swarm/storage/feed/id_test.go index e561ff9b4..8a820abfe 100644 --- a/swarm/storage/feed/id_test.go +++ b/swarm/storage/feed/id_test.go @@ -16,11 +16,11 @@ func getTestID() *ID { func TestIDAddr(t *testing.T) { id := getTestID() updateAddr := id.Addr() - compareByteSliceToExpectedHex(t, "updateAddr", updateAddr, "0x8b24583ec293e085f4c78aaee66d1bc5abfb8b4233304d14a349afa57af2a783") + compareByteSliceToExpectedHex(t, "updateAddr", updateAddr, "0x842d0a81987b9755dfeaa5558f5c134c1c0af48b6545005cac7b533d9411453a") } func TestIDSerializer(t *testing.T) { - testBinarySerializerRecovery(t, getTestID(), "0x776f726c64206e657773207265706f72742c20657665727920686f7572000000876a8936a7cd0b79ef0735ad0896c1afe278781ce803000000000019") + testBinarySerializerRecovery(t, getTestID(), "0x776f726c64206e657773207265706f72742c20657665727920686f7572000000876a8936a7cd0b79ef0735ad0896c1afe278781ce80300000000001f") } func TestIDLengthCheck(t *testing.T) { diff --git a/swarm/storage/feed/lookup/algorithm_fluzcapacitor.go b/swarm/storage/feed/lookup/algorithm_fluzcapacitor.go new file mode 100644 index 000000000..3840bd0fd --- /dev/null +++ b/swarm/storage/feed/lookup/algorithm_fluzcapacitor.go @@ -0,0 +1,63 @@ +package lookup + +import "context" + +// FluzCapacitorAlgorithm works by narrowing the epoch search area if an update is found +// going back and forth in time +// First, it will attempt to find an update where it should be now if the hint was +// really the last update. If that lookup fails, then the last update must be either the hint itself +// or the epochs right below. If however, that lookup succeeds, then the update must be +// that one or within the epochs right below. +// see the guide for a more graphical representation +func FluzCapacitorAlgorithm(ctx context.Context, now uint64, hint Epoch, read ReadFunc) (value interface{}, err error) { + var lastFound interface{} + var epoch Epoch + if hint == NoClue { + hint = worstHint + } + + t := now + + for { + epoch = GetNextEpoch(hint, t) + value, err = read(ctx, epoch, now) + if err != nil { + return nil, err + } + if value != nil { + lastFound = value + if epoch.Level == LowestLevel || epoch.Equals(hint) { + return value, nil + } + hint = epoch + continue + } + if epoch.Base() == hint.Base() { + if lastFound != nil { + return lastFound, nil + } + // we have reached the hint itself + if hint == worstHint { + return nil, nil + } + // check it out + value, err = read(ctx, hint, now) + if err != nil { + return nil, err + } + if value != nil { + return value, nil + } + // bad hint. + t = hint.Base() + hint = worstHint + continue + } + base := epoch.Base() + if base == 0 { + return nil, nil + } + t = base - 1 + } + +} diff --git a/swarm/storage/feed/lookup/algorithm_longearth.go b/swarm/storage/feed/lookup/algorithm_longearth.go new file mode 100644 index 000000000..d0342f67c --- /dev/null +++ b/swarm/storage/feed/lookup/algorithm_longearth.go @@ -0,0 +1,185 @@ +package lookup + +import ( + "context" + "sync/atomic" + "time" +) + +type stepFunc func(ctx context.Context, t uint64, hint Epoch) interface{} + +// LongEarthLookaheadDelay is the headstart the lookahead gives R before it launches +var LongEarthLookaheadDelay = 250 * time.Millisecond + +// LongEarthLookbackDelay is the headstart the lookback gives R before it launches +var LongEarthLookbackDelay = 250 * time.Millisecond + +// LongEarthAlgorithm explores possible lookup paths in parallel, pruning paths as soon +// as a more promising lookup path is found. As a result, this lookup algorithm is an order +// of magnitude faster than the FluzCapacitor algorithm, but at the expense of more exploratory reads. +// This algorithm works as follows. On each step, the next epoch is immediately looked up (R) +// and given a head start, while two parallel "steps" are launched a short time after: +// look ahead (A) is the path the algorithm would take if the R lookup returns a value, whereas +// look back (B) is the path the algorithm would take if the R lookup failed. +// as soon as R is actually finished, the A or B paths are pruned depending on the value of R. +// if A returns earlier than R, then R and B read operations can be safely canceled, saving time. +// The maximum number of active read operations is calculated as 2^(timeout/headstart). +// If headstart is infinite, this algorithm behaves as FluzCapacitor. +// timeout is the maximum execution time of the passed `read` function. +// the two head starts can be configured by changing LongEarthLookaheadDelay or LongEarthLookbackDelay +func LongEarthAlgorithm(ctx context.Context, now uint64, hint Epoch, read ReadFunc) (interface{}, error) { + if hint == NoClue { + hint = worstHint + } + + var stepCounter int32 // for debugging, stepCounter allows to give an ID to each step instance + + errc := make(chan struct{}) // errc will help as an error shortcut signal + var gerr error // in case of error, this variable will be set + + var step stepFunc // For efficiency, the algorithm step is defined as a closure + step = func(ctxS context.Context, t uint64, last Epoch) interface{} { + stepID := atomic.AddInt32(&stepCounter, 1) // give an ID to this call instance + trace(stepID, "init: t=%d, last=%s", t, last.String()) + var valueA, valueB, valueR interface{} + + // initialize the three read contexts + ctxR, cancelR := context.WithCancel(ctxS) // will handle the current read operation + ctxA, cancelA := context.WithCancel(ctxS) // will handle the lookahead path + ctxB, cancelB := context.WithCancel(ctxS) // will handle the lookback path + + epoch := GetNextEpoch(last, t) // calculate the epoch to look up in this step instance + + // define the lookAhead function, which will follow the path as if R was successful + lookAhead := func() { + valueA = step(ctxA, t, epoch) // launch the next step, recursively. + if valueA != nil { // if this path is successful, we don't need R or B. + cancelB() + cancelR() + } + } + + // define the lookBack function, which will follow the path as if R was unsuccessful + lookBack := func() { + if epoch.Base() == last.Base() { + return + } + base := epoch.Base() + if base == 0 { + return + } + valueB = step(ctxB, base-1, last) + } + + go func() { //goroutine to read the current epoch (R) + defer cancelR() + var err error + valueR, err = read(ctxR, epoch, now) // read this epoch + if valueR == nil { // if unsuccessful, cancel lookahead, otherwise cancel lookback. + cancelA() + } else { + cancelB() + } + if err != nil && err != context.Canceled { + gerr = err + close(errc) + } + }() + + go func() { // goroutine to give a headstart to R and then launch lookahead. + defer cancelA() + + // if we are at the lowest level or the epoch to look up equals the last one, + // then we cannot lookahead (can't go lower or repeat the same lookup, this would + // cause an infinite loop) + if epoch.Level == LowestLevel || epoch.Equals(last) { + return + } + + // give a head start to R, or launch immediately if R finishes early enough + select { + case <-TimeAfter(LongEarthLookaheadDelay): + lookAhead() + case <-ctxR.Done(): + if valueR != nil { + lookAhead() // only look ahead if R was successful + } + case <-ctxA.Done(): + } + }() + + go func() { // goroutine to give a headstart to R and then launch lookback. + defer cancelB() + + // give a head start to R, or launch immediately if R finishes early enough + select { + case <-TimeAfter(LongEarthLookbackDelay): + lookBack() + case <-ctxR.Done(): + if valueR == nil { + lookBack() // only look back in case R failed + } + case <-ctxB.Done(): + } + }() + + <-ctxA.Done() + if valueA != nil { + trace(stepID, "Returning valueA=%v", valueA) + return valueA + } + + <-ctxR.Done() + if valueR != nil { + trace(stepID, "Returning valueR=%v", valueR) + return valueR + } + <-ctxB.Done() + trace(stepID, "Returning valueB=%v", valueB) + return valueB + } + + var value interface{} + stepCtx, cancel := context.WithCancel(ctx) + + go func() { // launch the root step in its own goroutine to allow cancellation + defer cancel() + value = step(stepCtx, now, hint) + }() + + // wait for the algorithm to finish, but shortcut in case + // of errors + select { + case <-stepCtx.Done(): + case <-errc: + cancel() + return nil, gerr + } + + if ctx.Err() != nil { + return nil, ctx.Err() + } + + if value != nil || hint == worstHint { + return value, nil + } + + // at this point the algorithm did not return a value, + // so we challenge the hint given. + value, err := read(ctx, hint, now) + if err != nil { + return nil, err + } + if value != nil { + return value, nil // hint is valid, return it. + } + + // hint is invalid. Invoke the algorithm + // without hint. + now = hint.Base() + if hint.Level == HighestLevel { + now-- + } + + return LongEarthAlgorithm(ctx, now, NoClue, read) +} diff --git a/swarm/storage/feed/lookup/epoch.go b/swarm/storage/feed/lookup/epoch.go index bafe95477..6d75ba243 100644 --- a/swarm/storage/feed/lookup/epoch.go +++ b/swarm/storage/feed/lookup/epoch.go @@ -87,5 +87,5 @@ func (e *Epoch) Equals(epoch Epoch) bool { // String implements the Stringer interface. func (e *Epoch) String() string { - return fmt.Sprintf("Epoch{Time:%d, Level:%d}", e.Time, e.Level) + return fmt.Sprintf("Epoch{Base: %d, Time:%d, Level:%d}", e.Base(), e.Time, e.Level) } diff --git a/swarm/storage/feed/lookup/lookup.go b/swarm/storage/feed/lookup/lookup.go index 1642c659a..4b233a0e0 100644 --- a/swarm/storage/feed/lookup/lookup.go +++ b/swarm/storage/feed/lookup/lookup.go @@ -20,7 +20,10 @@ so they can be found */ package lookup -import "context" +import ( + "context" + "time" +) const maxuint64 = ^uint64(0) @@ -28,8 +31,8 @@ const maxuint64 = ^uint64(0) const LowestLevel uint8 = 0 // default is 0 (1 second) // HighestLevel sets the lowest frequency the algorithm will operate at, as a power of 2. -// 25 -> 2^25 equals to roughly one year. -const HighestLevel = 25 // default is 25 (~1 year) +// 31 -> 2^31 equals to roughly 38 years. +const HighestLevel = 31 // DefaultLevel sets what level will be chosen to search when there is no hint const DefaultLevel = HighestLevel @@ -43,7 +46,12 @@ type Algorithm func(ctx context.Context, now uint64, hint Epoch, read ReadFunc) // read() will be called on each lookup attempt // Returns an error only if read() returns an error // Returns nil if an update was not found -var Lookup Algorithm = FluzCapacitorAlgorithm +var Lookup Algorithm = LongEarthAlgorithm + +// TimeAfter must point to a function that returns a timer +// This is here so that tests can replace it with +// a mock up timer factory to simulate time deterministically +var TimeAfter = time.After // ReadFunc is a handler called by Lookup each time it attempts to find a value // It should return if a value is not found @@ -123,61 +131,6 @@ func GetFirstEpoch(now uint64) Epoch { var worstHint = Epoch{Time: 0, Level: 63} -// FluzCapacitorAlgorithm works by narrowing the epoch search area if an update is found -// going back and forth in time -// First, it will attempt to find an update where it should be now if the hint was -// really the last update. If that lookup fails, then the last update must be either the hint itself -// or the epochs right below. If however, that lookup succeeds, then the update must be -// that one or within the epochs right below. -// see the guide for a more graphical representation -func FluzCapacitorAlgorithm(ctx context.Context, now uint64, hint Epoch, read ReadFunc) (value interface{}, err error) { - var lastFound interface{} - var epoch Epoch - if hint == NoClue { - hint = worstHint - } - - t := now - - for { - epoch = GetNextEpoch(hint, t) - value, err = read(ctx, epoch, now) - if err != nil { - return nil, err - } - if value != nil { - lastFound = value - if epoch.Level == LowestLevel || epoch.Equals(hint) { - return value, nil - } - hint = epoch - continue - } - if epoch.Base() == hint.Base() { - if lastFound != nil { - return lastFound, nil - } - // we have reached the hint itself - if hint == worstHint { - return nil, nil - } - // check it out - value, err = read(ctx, hint, now) - if err != nil { - return nil, err - } - if value != nil { - return value, nil - } - // bad hint. - t = hint.Base() - hint = worstHint - continue - } - base := epoch.Base() - if base == 0 { - return nil, nil - } - t = base - 1 - } +var trace = func(id int32, formatString string, a ...interface{}) { + //fmt.Printf("Step ID #%d "+formatString+"\n", append([]interface{}{id}, a...)...) } diff --git a/swarm/storage/feed/lookup/lookup_test.go b/swarm/storage/feed/lookup/lookup_test.go index 60d77b709..b5d1a3b0b 100644 --- a/swarm/storage/feed/lookup/lookup_test.go +++ b/swarm/storage/feed/lookup/lookup_test.go @@ -21,56 +21,55 @@ import ( "fmt" "math/rand" "testing" + "time" - "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/storage/feed/lookup" ) -type Data struct { - Payload uint64 - Time uint64 +type AlgorithmInfo struct { + Lookup lookup.Algorithm + Name string } -type Store map[lookup.EpochID]*Data - -func write(store Store, epoch lookup.Epoch, value *Data) { - log.Debug("Write: %d-%d, value='%d'\n", epoch.Base(), epoch.Level, value.Payload) - store[epoch.ID()] = value +var algorithms = []AlgorithmInfo{ + {lookup.FluzCapacitorAlgorithm, "FluzCapacitor"}, + {lookup.LongEarthAlgorithm, "LongEarth"}, } -func update(store Store, last lookup.Epoch, now uint64, value *Data) lookup.Epoch { - epoch := lookup.GetNextEpoch(last, now) +const enablePrintMetrics = false // set to true to display algorithm benchmarking stats - write(store, epoch, value) - - return epoch +func printMetric(metric string, store *Store, elapsed time.Duration) { + if enablePrintMetrics { + fmt.Printf("metric=%s, readcount=%d (successful=%d, failed=%d), cached=%d, canceled=%d, maxSimult=%d, elapsed=%s\n", metric, + store.reads, store.successful, store.failed, store.cacheHits, store.canceled, store.maxSimultaneous, elapsed) + } } const Day = 60 * 60 * 24 const Year = Day * 365 const Month = Day * 30 -func makeReadFunc(store Store, counter *int) lookup.ReadFunc { - return func(ctx context.Context, epoch lookup.Epoch, now uint64) (interface{}, error) { - *counter++ - data := store[epoch.ID()] - var valueStr string - if data != nil { - valueStr = fmt.Sprintf("%d", data.Payload) - } - log.Debug("Read: %d-%d, value='%s'\n", epoch.Base(), epoch.Level, valueStr) - if data != nil && data.Time <= now { - return data, nil - } - return nil, nil - } +// DefaultStoreConfig indicates the time the different read +// operations will take in the simulation +// This allows to measure an algorithm performance relative +// to other +var DefaultStoreConfig = &StoreConfig{ + CacheReadTime: 50 * time.Millisecond, + FailedReadTime: 1000 * time.Millisecond, + SuccessfulReadTime: 500 * time.Millisecond, } +// TestLookup verifies if the last update and intermediates are +// found and if that same last update is found faster if a hint is given func TestLookup(t *testing.T) { + // ### 1.- Initialize stopwatch time sim + stopwatch := NewStopwatch(50 * time.Millisecond) + lookup.TimeAfter = stopwatch.TimeAfter() + defer stopwatch.Stop() - store := make(Store) - readCount := 0 - readFunc := makeReadFunc(store, &readCount) + // ### 2.- Setup mock storage and generate updates + store := NewStore(DefaultStoreConfig) + readFunc := store.MakeReadFunc() // write an update every month for 12 months 3 years ago and then silence for two years now := uint64(1533799046) @@ -83,68 +82,90 @@ func TestLookup(t *testing.T) { Payload: t, //our "payload" will be the timestamp itself. Time: t, } - epoch = update(store, epoch, t, &data) + epoch = store.Update(epoch, t, &data) lastData = &data } - // try to get the last value + // ### 3.- Test all algorithms + for _, algo := range algorithms { + t.Run(algo.Name, func(t *testing.T) { - value, err := lookup.Lookup(context.Background(), now, lookup.NoClue, readFunc) - if err != nil { - t.Fatal(err) + store.Reset() // reset the store read counters + + // ### 3.1.- Test how long it takes to find the last update without a hint: + timeElapsedWithoutHint := stopwatch.Measure(func() { + + // try to get the last value + value, err := algo.Lookup(context.Background(), now, lookup.NoClue, readFunc) + if err != nil { + t.Fatal(err) + } + if value != lastData { + t.Fatalf("Expected lookup to return the last written value: %v. Got %v", lastData, value) + } + + }) + printMetric("SIMPLE READ", store, timeElapsedWithoutHint) + + store.Reset() // reset the read counters for the next test + + // ### 3.2.- Test how long it takes to find the last update *with* a hint. + // it should take less time! + timeElapsed := stopwatch.Measure(func() { + // Provide a hint to get a faster lookup. In particular, we give the exact location of the last update + value, err := algo.Lookup(context.Background(), now, epoch, readFunc) + if err != nil { + t.Fatal(err) + } + if value != lastData { + t.Fatalf("Expected lookup to return the last written value: %v. Got %v", lastData, value) + } + }) + printMetric("WITH HINT", store, stopwatch.Elapsed()) + + if timeElapsed > timeElapsedWithoutHint { + t.Fatalf("Expected lookup to complete faster than %s since we provided a hint. Took %s", timeElapsedWithoutHint, timeElapsed) + } + + store.Reset() // reset the read counters for the next test + + // ### 3.3.- try to get an intermediate value + // if we look for a value in, e.g., now - Year*3 + 6*Month, we should get that value + // Since the "payload" is the timestamp itself, we can check this. + expectedTime := now - Year*3 + 6*Month + timeElapsed = stopwatch.Measure(func() { + value, err := algo.Lookup(context.Background(), expectedTime, lookup.NoClue, readFunc) + if err != nil { + t.Fatal(err) + } + + data, ok := value.(*Data) + + if !ok { + t.Fatal("Expected value to contain data") + } + + if data.Time != expectedTime { + t.Fatalf("Expected value timestamp to be %d, got %d", data.Time, expectedTime) + } + }) + printMetric("INTERMEDIATE READ", store, timeElapsed) + }) } - - readCountWithoutHint := readCount - - if value != lastData { - t.Fatalf("Expected lookup to return the last written value: %v. Got %v", lastData, value) - } - - // reset the read count for the next test - readCount = 0 - // Provide a hint to get a faster lookup. In particular, we give the exact location of the last update - value, err = lookup.Lookup(context.Background(), now, epoch, readFunc) - if err != nil { - t.Fatal(err) - } - - if value != lastData { - t.Fatalf("Expected lookup to return the last written value: %v. Got %v", lastData, value) - } - - if readCount > readCountWithoutHint { - t.Fatalf("Expected lookup to complete with fewer or same reads than %d since we provided a hint. Did %d reads.", readCountWithoutHint, readCount) - } - - // try to get an intermediate value - // if we look for a value in now - Year*3 + 6*Month, we should get that value - // Since the "payload" is the timestamp itself, we can check this. - - expectedTime := now - Year*3 + 6*Month - - value, err = lookup.Lookup(context.Background(), expectedTime, lookup.NoClue, readFunc) - if err != nil { - t.Fatal(err) - } - - data, ok := value.(*Data) - - if !ok { - t.Fatal("Expected value to contain data") - } - - if data.Time != expectedTime { - t.Fatalf("Expected value timestamp to be %d, got %d", data.Time, expectedTime) - } - } +// TestOneUpdateAt0 checks if the lookup algorithm can return an update that +// is precisely set at t=0 func TestOneUpdateAt0(t *testing.T) { + // ### 1.- Initialize stopwatch time sim + stopwatch := NewStopwatch(50 * time.Millisecond) + lookup.TimeAfter = stopwatch.TimeAfter() + defer stopwatch.Stop() - store := make(Store) - readCount := 0 + // ### 2.- Setup mock storage and generate updates + store := NewStore(DefaultStoreConfig) + readFunc := store.MakeReadFunc() - readFunc := makeReadFunc(store, &readCount) now := uint64(1533903729) var epoch lookup.Epoch @@ -152,24 +173,37 @@ func TestOneUpdateAt0(t *testing.T) { Payload: 79, Time: 0, } - update(store, epoch, 0, &data) + store.Update(epoch, 0, &data) //place 1 update in t=0 - value, err := lookup.Lookup(context.Background(), now, lookup.NoClue, readFunc) - if err != nil { - t.Fatal(err) - } - if value != &data { - t.Fatalf("Expected lookup to return the last written value: %v. Got %v", data, value) + // ### 3.- Test all algorithms + for _, algo := range algorithms { + t.Run(algo.Name, func(t *testing.T) { + store.Reset() // reset the read counters for the next test + timeElapsed := stopwatch.Measure(func() { + value, err := algo.Lookup(context.Background(), now, lookup.NoClue, readFunc) + if err != nil { + t.Fatal(err) + } + if value != &data { + t.Fatalf("Expected lookup to return the last written value: %v. Got %v", data, value) + } + }) + printMetric("SIMPLE", store, timeElapsed) + }) } } -// Tests the update is found even when a bad hint is given +// TestBadHint tests if the update is found even when a bad hint is given func TestBadHint(t *testing.T) { + // ### 1.- Initialize stopwatch time sim + stopwatch := NewStopwatch(50 * time.Millisecond) + lookup.TimeAfter = stopwatch.TimeAfter() + defer stopwatch.Stop() - store := make(Store) - readCount := 0 + // ### 2.- Setup mock storage and generate updates + store := NewStore(DefaultStoreConfig) + readFunc := store.MakeReadFunc() - readFunc := makeReadFunc(store, &readCount) now := uint64(1533903729) var epoch lookup.Epoch @@ -179,7 +213,7 @@ func TestBadHint(t *testing.T) { } // place an update for t=1200 - update(store, epoch, 1200, &data) + store.Update(epoch, 1200, &data) // come up with some evil hint badHint := lookup.Epoch{ @@ -187,21 +221,35 @@ func TestBadHint(t *testing.T) { Time: 1200000000, } - value, err := lookup.Lookup(context.Background(), now, badHint, readFunc) - if err != nil { - t.Fatal(err) - } - if value != &data { - t.Fatalf("Expected lookup to return the last written value: %v. Got %v", data, value) + // ### 3.- Test all algorithms + for _, algo := range algorithms { + t.Run(algo.Name, func(t *testing.T) { + store.Reset() + timeElapsed := stopwatch.Measure(func() { + value, err := algo.Lookup(context.Background(), now, badHint, readFunc) + if err != nil { + t.Fatal(err) + } + if value != &data { + t.Fatalf("Expected lookup to return the last written value: %v. Got %v", data, value) + } + }) + printMetric("SIMPLE", store, timeElapsed) + }) } } -// Tests whether the update is found when the bad hint is exactly below the last update +// TestBadHintNextToUpdate checks whether the update is found when the bad hint is exactly below the last update func TestBadHintNextToUpdate(t *testing.T) { - store := make(Store) - readCount := 0 + // ### 1.- Initialize stopwatch time sim + stopwatch := NewStopwatch(50 * time.Millisecond) + lookup.TimeAfter = stopwatch.TimeAfter() + defer stopwatch.Stop() + + // ### 2.- Setup mock storage and generate updates + store := NewStore(DefaultStoreConfig) + readFunc := store.MakeReadFunc() - readFunc := makeReadFunc(store, &readCount) now := uint64(1533903729) var last *Data @@ -227,7 +275,7 @@ func TestBadHintNextToUpdate(t *testing.T) { Time: 0, } last = &data - epoch = update(store, epoch, 1200000000+i, &data) + epoch = store.Update(epoch, 1200000000+i, &data) } // come up with some evil hint: @@ -237,99 +285,132 @@ func TestBadHintNextToUpdate(t *testing.T) { Time: 1200000005, } - value, err := lookup.Lookup(context.Background(), now, badHint, readFunc) - if err != nil { - t.Fatal(err) - } - if value != last { - t.Fatalf("Expected lookup to return the last written value: %v. Got %v", last, value) + // ### 3.- Test all algorithms + for _, algo := range algorithms { + t.Run(algo.Name, func(t *testing.T) { + store.Reset() // reset read counters for next test + + timeElapsed := stopwatch.Measure(func() { + value, err := algo.Lookup(context.Background(), now, badHint, readFunc) + if err != nil { + t.Fatal(err) + } + if value != last { + t.Fatalf("Expected lookup to return the last written value: %v. Got %v", last, value) + } + }) + printMetric("SIMPLE", store, timeElapsed) + }) } } +// TestContextCancellation checks whether a lookup can be canceled func TestContextCancellation(t *testing.T) { - readFunc := func(ctx context.Context, epoch lookup.Epoch, now uint64) (interface{}, error) { - <-ctx.Done() - return nil, ctx.Err() - } + // ### 1.- Test all algorithms + for _, algo := range algorithms { + t.Run(algo.Name, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + // ### 2.1.- Test a simple cancel of an always blocking read function + readFunc := func(ctx context.Context, epoch lookup.Epoch, now uint64) (interface{}, error) { + <-ctx.Done() + return nil, ctx.Err() + } - errc := make(chan error) + ctx, cancel := context.WithCancel(context.Background()) + errc := make(chan error) - go func() { - _, err := lookup.Lookup(ctx, 1200000000, lookup.NoClue, readFunc) - errc <- err - }() + go func() { + _, err := algo.Lookup(ctx, 1200000000, lookup.NoClue, readFunc) + errc <- err + }() - cancel() + cancel() //actually cancel the lookup - if err := <-errc; err != context.Canceled { - t.Fatalf("Expected lookup to return a context Cancelled error, got %v", err) - } + if err := <-errc; err != context.Canceled { + t.Fatalf("Expected lookup to return a context canceled error, got %v", err) + } - // text context cancellation during hint lookup: - ctx, cancel = context.WithCancel(context.Background()) - errc = make(chan error) - someHint := lookup.Epoch{ - Level: 25, - Time: 300, - } + // ### 2.2.- Test context cancellation during hint lookup: + ctx, cancel = context.WithCancel(context.Background()) + errc = make(chan error) + someHint := lookup.Epoch{ + Level: 25, + Time: 300, + } + // put up a read function that gets canceled only on hint lookup + readFunc = func(ctx context.Context, epoch lookup.Epoch, now uint64) (interface{}, error) { + if epoch == someHint { + go cancel() + <-ctx.Done() + return nil, ctx.Err() + } + return nil, nil + } - readFunc = func(ctx context.Context, epoch lookup.Epoch, now uint64) (interface{}, error) { - if epoch == someHint { - go cancel() - <-ctx.Done() - return nil, ctx.Err() - } - return nil, nil - } + go func() { + _, err := algo.Lookup(ctx, 301, someHint, readFunc) + errc <- err + }() - go func() { - _, err := lookup.Lookup(ctx, 301, someHint, readFunc) - errc <- err - }() - - if err := <-errc; err != context.Canceled { - t.Fatalf("Expected lookup to return a context Cancelled error, got %v", err) + if err := <-errc; err != context.Canceled { + t.Fatalf("Expected lookup to return a context canceled error, got %v", err) + } + }) } } +// TestLookupFail makes sure the lookup function fails on a timely manner +// when there are no updates at all func TestLookupFail(t *testing.T) { + // ### 1.- Initialize stopwatch time sim + stopwatch := NewStopwatch(50 * time.Millisecond) + lookup.TimeAfter = stopwatch.TimeAfter() + defer stopwatch.Stop() - store := make(Store) - readCount := 0 + // ### 2.- Setup mock storage, without adding updates + // don't write anything and try to look up. + // we're testing we don't get stuck in a loop and that the lookup + // function converges in a timely fashion + store := NewStore(DefaultStoreConfig) + readFunc := store.MakeReadFunc() - readFunc := makeReadFunc(store, &readCount) now := uint64(1533903729) - // don't write anything and try to look up. - // we're testing we don't get stuck in a loop + // ### 3.- Test all algorithms + for _, algo := range algorithms { + t.Run(algo.Name, func(t *testing.T) { + store.Reset() - value, err := lookup.Lookup(context.Background(), now, lookup.NoClue, readFunc) - if err != nil { - t.Fatal(err) - } - if value != nil { - t.Fatal("Expected value to be nil, since the update should've failed") - } + stopwatch.Measure(func() { + value, err := algo.Lookup(context.Background(), now, lookup.NoClue, readFunc) + if err != nil { + t.Fatal(err) + } + if value != nil { + t.Fatal("Expected value to be nil, since the update should've failed") + } + }) - expectedReads := now/(1< readCountWithoutHint { - t.Fatalf("Expected lookup to complete with fewer or equal reads than %d since we provided a hint. Did %d reads.", readCountWithoutHint, readCount) - } + // ### 3.2.- Now test how long it takes to find the last update *with* a hint, + // it should take less time! + timeElapsed := stopwatch.Measure(func() { + // Provide a hint to get a faster lookup. In particular, we give the exact location of the last update + value, err := algo.Lookup(context.Background(), now, epoch, readFunc) + stopwatch.Stop() + if err != nil { + t.Fatal(err) + } - for i := uint64(0); i <= 994; i++ { - T := uint64(now - 1000 + i) // update every second for the last 1000 seconds - value, err := lookup.Lookup(context.Background(), T, lookup.NoClue, readFunc) - if err != nil { - t.Fatal(err) - } - data, _ := value.(*Data) - if data == nil { - t.Fatalf("Expected lookup to return %d, got nil", T) - } - if data.Payload != T { - t.Fatalf("Expected lookup to return %d, got %d", T, data.Time) - } + if value != lastData { + t.Fatalf("Expected lookup to return the last written value: %v. Got %v", lastData, value) + } + + }) + if timeElapsed > timeElapsedWithoutHint { + t.Fatalf("Expected lookup to complete faster than %s since we provided a hint. Took %s", timeElapsedWithoutHint, timeElapsed) + } + printMetric("WITH HINT", store, timeElapsed) + + store.Reset() // reset read counters + + // ### 3.3.- Test multiple lookups at different intervals + timeElapsed = stopwatch.Measure(func() { + for i := uint64(0); i <= 10; i++ { + T := uint64(now - 1000 + i) + value, err := algo.Lookup(context.Background(), T, lookup.NoClue, readFunc) + if err != nil { + t.Fatal(err) + } + data, _ := value.(*Data) + if data == nil { + t.Fatalf("Expected lookup to return %d, got nil", T) + } + if data.Payload != T { + t.Fatalf("Expected lookup to return %d, got %d", T, data.Time) + } + } + }) + printMetric("MULTIPLE", store, timeElapsed) + }) } } +// TestSparseUpdates checks the lookup algorithm when +// updates come sparsely and in bursts func TestSparseUpdates(t *testing.T) { + // ### 1.- Initialize stopwatch time sim + stopwatch := NewStopwatch(50 * time.Millisecond) + lookup.TimeAfter = stopwatch.TimeAfter() + defer stopwatch.Stop() - store := make(Store) - readCount := 0 - readFunc := makeReadFunc(store, &readCount) + // ### 2.- Setup mock storage and write an updates sparsely in bursts, + // every 5 years 3 times starting in Jan 1st 1970 and then silence + store := NewStore(DefaultStoreConfig) + readFunc := store.MakeReadFunc() - // write an update every 5 years 3 times starting in Jan 1st 1970 and then silence - - now := uint64(1533799046) + now := uint64(633799046) var epoch lookup.Epoch var lastData *Data - for i := uint64(0); i < 5; i++ { - T := uint64(Year * 5 * i) // write an update every 5 years 3 times starting in Jan 1st 1970 and then silence - data := Data{ - Payload: T, //our "payload" will be the timestamp itself. - Time: T, + for i := uint64(0); i < 3; i++ { + for j := uint64(0); j < 10; j++ { + T := uint64(Year*5*i + j) // write a burst of 10 updates every 5 years 3 times starting in Jan 1st 1970 and then silence + data := Data{ + Payload: T, //our "payload" will be the timestamp itself. + Time: T, + } + epoch = store.Update(epoch, T, &data) + lastData = &data } - epoch = update(store, epoch, T, &data) - lastData = &data } - // try to get the last value + // ### 3.- Test all algorithms + for _, algo := range algorithms { + t.Run(algo.Name, func(t *testing.T) { + store.Reset() // reset read counters for next test - value, err := lookup.Lookup(context.Background(), now, lookup.NoClue, readFunc) - if err != nil { - t.Fatal(err) + // ### 3.1.- Test how long it takes to find the last update without a hint: + timeElapsedWithoutHint := stopwatch.Measure(func() { + value, err := algo.Lookup(context.Background(), now, lookup.NoClue, readFunc) + stopwatch.Stop() + if err != nil { + t.Fatal(err) + } + + if value != lastData { + t.Fatalf("Expected lookup to return the last written value: %v. Got %v", lastData, value) + } + }) + printMetric("SIMPLE", store, timeElapsedWithoutHint) + + // reset the read count for the next test + store.Reset() + + // ### 3.2.- Now test how long it takes to find the last update *with* a hint, + // it should take less time! + timeElapsed := stopwatch.Measure(func() { + value, err := algo.Lookup(context.Background(), now, epoch, readFunc) + if err != nil { + t.Fatal(err) + } + + if value != lastData { + t.Fatalf("Expected lookup to return the last written value: %v. Got %v", lastData, value) + } + }) + if timeElapsed > timeElapsedWithoutHint { + t.Fatalf("Expected lookup to complete faster than %s since we provided a hint. Took %s", timeElapsedWithoutHint, timeElapsed) + } + + printMetric("WITH HINT", store, stopwatch.Elapsed()) + + }) } - - readCountWithoutHint := readCount - - if value != lastData { - t.Fatalf("Expected lookup to return the last written value: %v. Got %v", lastData, value) - } - - // reset the read count for the next test - readCount = 0 - // Provide a hint to get a faster lookup. In particular, we give the exact location of the last update - value, err = lookup.Lookup(context.Background(), now, epoch, readFunc) - if err != nil { - t.Fatal(err) - } - - if value != lastData { - t.Fatalf("Expected lookup to return the last written value: %v. Got %v", lastData, value) - } - - if readCount > readCountWithoutHint { - t.Fatalf("Expected lookup to complete with fewer reads than %d since we provided a hint. Did %d reads.", readCountWithoutHint, readCount) - } - } // testG will hold precooked test results @@ -447,8 +572,9 @@ type testG struct { } // test cases -var testGetNextLevelCases = []testG{{e: lookup.Epoch{Time: 989875233, Level: 12}, n: 989875233, x: 11}, {e: lookup.Epoch{Time: 995807650, Level: 18}, n: 995598156, x: 19}, {e: lookup.Epoch{Time: 969167082, Level: 0}, n: 968990357, x: 18}, {e: lookup.Epoch{Time: 993087628, Level: 14}, n: 992987044, x: 20}, {e: lookup.Epoch{Time: 963364631, Level: 20}, n: 963364630, x: 19}, {e: lookup.Epoch{Time: 963497510, Level: 16}, n: 963370732, x: 18}, {e: lookup.Epoch{Time: 955421349, Level: 22}, n: 955421348, x: 21}, {e: lookup.Epoch{Time: 968220379, Level: 15}, n: 968220378, x: 14}, {e: lookup.Epoch{Time: 939129014, Level: 6}, n: 939128771, x: 11}, {e: lookup.Epoch{Time: 907847903, Level: 6}, n: 907791833, x: 18}, {e: lookup.Epoch{Time: 910835564, Level: 15}, n: 910835564, x: 14}, {e: lookup.Epoch{Time: 913578333, Level: 22}, n: 881808431, x: 25}, {e: lookup.Epoch{Time: 895818460, Level: 3}, n: 895818132, x: 9}, {e: lookup.Epoch{Time: 903843025, Level: 24}, n: 895609561, x: 23}, {e: lookup.Epoch{Time: 877889433, Level: 13}, n: 877877093, x: 15}, {e: lookup.Epoch{Time: 901450396, Level: 10}, n: 901450058, x: 9}, {e: lookup.Epoch{Time: 925179910, Level: 3}, n: 925168393, x: 16}, {e: lookup.Epoch{Time: 913485477, Level: 21}, n: 913485476, x: 20}, {e: lookup.Epoch{Time: 924462991, Level: 18}, n: 924462990, x: 17}, {e: lookup.Epoch{Time: 941175128, Level: 13}, n: 941175127, x: 12}, {e: lookup.Epoch{Time: 920126583, Level: 3}, n: 920100782, x: 19}, {e: lookup.Epoch{Time: 932403200, Level: 9}, n: 932279891, x: 17}, {e: lookup.Epoch{Time: 948284931, Level: 2}, n: 948284921, x: 9}, {e: lookup.Epoch{Time: 953540997, Level: 7}, n: 950547986, x: 22}, {e: lookup.Epoch{Time: 926639837, Level: 18}, n: 918608882, x: 24}, {e: lookup.Epoch{Time: 954637598, Level: 1}, n: 954578761, x: 17}, {e: lookup.Epoch{Time: 943482981, Level: 10}, n: 942924151, x: 19}, {e: lookup.Epoch{Time: 963580771, Level: 7}, n: 963580771, x: 6}, {e: lookup.Epoch{Time: 993744930, Level: 7}, n: 993690858, x: 16}, {e: lookup.Epoch{Time: 1018890213, Level: 12}, n: 1018890212, x: 11}, {e: lookup.Epoch{Time: 1030309411, Level: 2}, n: 1030309227, x: 9}, {e: lookup.Epoch{Time: 1063204997, Level: 20}, n: 1063204996, x: 19}, {e: lookup.Epoch{Time: 1094340832, Level: 6}, n: 1094340633, x: 7}, {e: lookup.Epoch{Time: 1077880597, Level: 10}, n: 1075914292, x: 20}, {e: lookup.Epoch{Time: 1051114957, Level: 18}, n: 1051114957, x: 17}, {e: lookup.Epoch{Time: 1045649701, Level: 22}, n: 1045649700, x: 21}, {e: lookup.Epoch{Time: 1066198885, Level: 14}, n: 1066198884, x: 13}, {e: lookup.Epoch{Time: 1053231952, Level: 1}, n: 1053210845, x: 16}, {e: lookup.Epoch{Time: 1068763404, Level: 14}, n: 1068675428, x: 18}, {e: lookup.Epoch{Time: 1039042173, Level: 15}, n: 1038973110, x: 17}, {e: lookup.Epoch{Time: 1050747636, Level: 6}, n: 1050747364, x: 9}, {e: lookup.Epoch{Time: 1030034434, Level: 23}, n: 1030034433, x: 22}, {e: lookup.Epoch{Time: 1003783425, Level: 18}, n: 1003783424, x: 17}, {e: lookup.Epoch{Time: 988163976, Level: 15}, n: 988084064, x: 17}, {e: lookup.Epoch{Time: 1007222377, Level: 15}, n: 1007222377, x: 14}, {e: lookup.Epoch{Time: 1001211375, Level: 13}, n: 1001208178, x: 14}, {e: lookup.Epoch{Time: 997623199, Level: 8}, n: 997623198, x: 7}, {e: lookup.Epoch{Time: 1026283830, Level: 10}, n: 1006681704, x: 24}, {e: lookup.Epoch{Time: 1019421907, Level: 20}, n: 1019421906, x: 19}, {e: lookup.Epoch{Time: 1043154306, Level: 16}, n: 1043108343, x: 16}, {e: lookup.Epoch{Time: 1075643767, Level: 17}, n: 1075325898, x: 18}, {e: lookup.Epoch{Time: 1043726309, Level: 20}, n: 1043726308, x: 19}, {e: lookup.Epoch{Time: 1056415324, Level: 17}, n: 1056415324, x: 16}, {e: lookup.Epoch{Time: 1088650219, Level: 13}, n: 1088650218, x: 12}, {e: lookup.Epoch{Time: 1088551662, Level: 7}, n: 1088543355, x: 13}, {e: lookup.Epoch{Time: 1069667265, Level: 6}, n: 1069667075, x: 7}, {e: lookup.Epoch{Time: 1079145970, Level: 18}, n: 1079145969, x: 17}, {e: lookup.Epoch{Time: 1083338876, Level: 7}, n: 1083338875, x: 6}, {e: lookup.Epoch{Time: 1051581086, Level: 4}, n: 1051568869, x: 14}, {e: lookup.Epoch{Time: 1028430882, Level: 4}, n: 1028430864, x: 5}, {e: lookup.Epoch{Time: 1057356462, Level: 1}, n: 1057356417, x: 5}, {e: lookup.Epoch{Time: 1033104266, Level: 0}, n: 1033097479, x: 13}, {e: lookup.Epoch{Time: 1031391367, Level: 11}, n: 1031387304, x: 14}, {e: lookup.Epoch{Time: 1049781164, Level: 15}, n: 1049781163, x: 14}, {e: lookup.Epoch{Time: 1027271628, Level: 12}, n: 1027271627, x: 11}, {e: lookup.Epoch{Time: 1057270560, Level: 23}, n: 1057270560, x: 22}, {e: lookup.Epoch{Time: 1047501317, Level: 15}, n: 1047501317, x: 14}, {e: lookup.Epoch{Time: 1058349035, Level: 11}, n: 1045175573, x: 24}, {e: lookup.Epoch{Time: 1057396147, Level: 20}, n: 1057396147, x: 19}, {e: lookup.Epoch{Time: 1048906375, Level: 18}, n: 1039616919, x: 25}, {e: lookup.Epoch{Time: 1074294831, Level: 20}, n: 1074294831, x: 19}, {e: lookup.Epoch{Time: 1088946052, Level: 1}, n: 1088917364, x: 14}, {e: lookup.Epoch{Time: 1112337595, Level: 17}, n: 1111008110, x: 22}, {e: lookup.Epoch{Time: 1099990284, Level: 5}, n: 1099968370, x: 15}, {e: lookup.Epoch{Time: 1087036441, Level: 16}, n: 1053967855, x: 25}, {e: lookup.Epoch{Time: 1069225185, Level: 8}, n: 1069224660, x: 10}, {e: lookup.Epoch{Time: 1057505479, Level: 9}, n: 1057505170, x: 14}, {e: lookup.Epoch{Time: 1072381377, Level: 12}, n: 1065950959, x: 22}, {e: lookup.Epoch{Time: 1093887139, Level: 8}, n: 1093863305, x: 14}, {e: lookup.Epoch{Time: 1082366510, Level: 24}, n: 1082366510, x: 23}, {e: lookup.Epoch{Time: 1103231132, Level: 14}, n: 1102292201, x: 22}, {e: lookup.Epoch{Time: 1094502355, Level: 3}, n: 1094324652, x: 18}, {e: lookup.Epoch{Time: 1068488344, Level: 12}, n: 1067577330, x: 19}, {e: lookup.Epoch{Time: 1050278233, Level: 12}, n: 1050278232, x: 11}, {e: lookup.Epoch{Time: 1047660768, Level: 5}, n: 1047652137, x: 17}, {e: lookup.Epoch{Time: 1060116167, Level: 11}, n: 1060114091, x: 12}, {e: lookup.Epoch{Time: 1068149392, Level: 21}, n: 1052074801, x: 24}, {e: lookup.Epoch{Time: 1081934120, Level: 6}, n: 1081933847, x: 8}, {e: lookup.Epoch{Time: 1107943693, Level: 16}, n: 1107096139, x: 25}, {e: lookup.Epoch{Time: 1131571649, Level: 9}, n: 1131570428, x: 11}, {e: lookup.Epoch{Time: 1123139367, Level: 0}, n: 1122912198, x: 20}, {e: lookup.Epoch{Time: 1121144423, Level: 6}, n: 1120568289, x: 20}, {e: lookup.Epoch{Time: 1089932411, Level: 17}, n: 1089932410, x: 16}, {e: lookup.Epoch{Time: 1104899012, Level: 22}, n: 1098978789, x: 22}, {e: lookup.Epoch{Time: 1094588059, Level: 21}, n: 1094588059, x: 20}, {e: lookup.Epoch{Time: 1114987438, Level: 24}, n: 1114987437, x: 23}, {e: lookup.Epoch{Time: 1084186305, Level: 7}, n: 1084186241, x: 6}, {e: lookup.Epoch{Time: 1058827111, Level: 8}, n: 1058826504, x: 9}, {e: lookup.Epoch{Time: 1090679810, Level: 12}, n: 1090616539, x: 17}, {e: lookup.Epoch{Time: 1084299475, Level: 23}, n: 1084299475, x: 22}} +var testGetNextLevelCases = []testG{testG{e: lookup.Epoch{Time: 989875233, Level: 12}, n: 989807323, x: 24}, testG{e: lookup.Epoch{Time: 995807650, Level: 18}, n: 995807649, x: 17}, testG{e: lookup.Epoch{Time: 969167082, Level: 0}, n: 969111431, x: 18}, testG{e: lookup.Epoch{Time: 993087628, Level: 14}, n: 993087627, x: 13}, testG{e: lookup.Epoch{Time: 963364631, Level: 20}, n: 962941578, x: 19}, testG{e: lookup.Epoch{Time: 963497510, Level: 16}, n: 963497509, x: 15}, testG{e: lookup.Epoch{Time: 955421349, Level: 22}, n: 929292183, x: 27}, testG{e: lookup.Epoch{Time: 968220379, Level: 15}, n: 968220378, x: 14}, testG{e: lookup.Epoch{Time: 939129014, Level: 6}, n: 939126953, x: 11}, testG{e: lookup.Epoch{Time: 907847903, Level: 6}, n: 907846146, x: 11}, testG{e: lookup.Epoch{Time: 910835564, Level: 15}, n: 703619757, x: 28}, testG{e: lookup.Epoch{Time: 913578333, Level: 22}, n: 913578332, x: 21}, testG{e: lookup.Epoch{Time: 895818460, Level: 3}, n: 895818132, x: 9}, testG{e: lookup.Epoch{Time: 903843025, Level: 24}, n: 903843025, x: 23}, testG{e: lookup.Epoch{Time: 877889433, Level: 13}, n: 149120378, x: 29}, testG{e: lookup.Epoch{Time: 901450396, Level: 10}, n: 858997793, x: 26}, testG{e: lookup.Epoch{Time: 925179910, Level: 3}, n: 925177237, x: 13}, testG{e: lookup.Epoch{Time: 913485477, Level: 21}, n: 907146511, x: 22}, testG{e: lookup.Epoch{Time: 924462991, Level: 18}, n: 924462990, x: 17}, testG{e: lookup.Epoch{Time: 941175128, Level: 13}, n: 941168924, x: 13}, testG{e: lookup.Epoch{Time: 920126583, Level: 3}, n: 538054817, x: 28}, testG{e: lookup.Epoch{Time: 891721312, Level: 18}, n: 890975671, x: 21}, testG{e: lookup.Epoch{Time: 920397342, Level: 11}, n: 920396960, x: 10}, testG{e: lookup.Epoch{Time: 953406530, Level: 3}, n: 953406530, x: 2}, testG{e: lookup.Epoch{Time: 920024527, Level: 23}, n: 920024527, x: 22}, testG{e: lookup.Epoch{Time: 927050922, Level: 7}, n: 927049632, x: 11}, testG{e: lookup.Epoch{Time: 894599900, Level: 10}, n: 890021707, x: 22}, testG{e: lookup.Epoch{Time: 883010150, Level: 3}, n: 882969902, x: 15}, testG{e: lookup.Epoch{Time: 855561102, Level: 22}, n: 855561102, x: 21}, testG{e: lookup.Epoch{Time: 828245477, Level: 19}, n: 825245571, x: 22}, testG{e: lookup.Epoch{Time: 851095026, Level: 4}, n: 851083702, x: 13}, testG{e: lookup.Epoch{Time: 879209039, Level: 11}, n: 879209039, x: 10}, testG{e: lookup.Epoch{Time: 859265651, Level: 0}, n: 840582083, x: 24}, testG{e: lookup.Epoch{Time: 827349870, Level: 24}, n: 827349869, x: 23}, testG{e: lookup.Epoch{Time: 819602318, Level: 3}, n: 18446744073490860182, x: 31}, testG{e: lookup.Epoch{Time: 849708538, Level: 7}, n: 849708538, x: 6}, testG{e: lookup.Epoch{Time: 873885094, Level: 11}, n: 873881798, x: 11}, testG{e: lookup.Epoch{Time: 852169070, Level: 1}, n: 852049399, x: 17}, testG{e: lookup.Epoch{Time: 852885343, Level: 8}, n: 852875652, x: 13}, testG{e: lookup.Epoch{Time: 830957057, Level: 8}, n: 830955867, x: 10}, testG{e: lookup.Epoch{Time: 807353611, Level: 4}, n: 807325211, x: 16}, testG{e: lookup.Epoch{Time: 803198793, Level: 8}, n: 696477575, x: 26}, testG{e: lookup.Epoch{Time: 791356887, Level: 10}, n: 791356003, x: 10}, testG{e: lookup.Epoch{Time: 817771215, Level: 12}, n: 817708431, x: 17}, testG{e: lookup.Epoch{Time: 846211146, Level: 14}, n: 846211146, x: 13}, testG{e: lookup.Epoch{Time: 821849822, Level: 9}, n: 821849229, x: 9}, testG{e: lookup.Epoch{Time: 789508756, Level: 9}, n: 789508755, x: 8}, testG{e: lookup.Epoch{Time: 814088521, Level: 12}, n: 814088512, x: 11}, testG{e: lookup.Epoch{Time: 813665673, Level: 6}, n: 813548257, x: 17}, testG{e: lookup.Epoch{Time: 791472209, Level: 6}, n: 720857845, x: 26}, testG{e: lookup.Epoch{Time: 805687744, Level: 2}, n: 805687720, x: 6}, testG{e: lookup.Epoch{Time: 783153927, Level: 12}, n: 783134053, x: 14}, testG{e: lookup.Epoch{Time: 815033655, Level: 11}, n: 815033654, x: 10}, testG{e: lookup.Epoch{Time: 821184581, Level: 6}, n: 821184464, x: 11}, testG{e: lookup.Epoch{Time: 841908114, Level: 2}, n: 841636025, x: 18}, testG{e: lookup.Epoch{Time: 862969167, Level: 20}, n: 862919955, x: 19}, testG{e: lookup.Epoch{Time: 887604565, Level: 21}, n: 887604564, x: 20}, testG{e: lookup.Epoch{Time: 863723789, Level: 10}, n: 858274530, x: 22}, testG{e: lookup.Epoch{Time: 851533290, Level: 10}, n: 851531385, x: 11}, testG{e: lookup.Epoch{Time: 826032484, Level: 14}, n: 826032484, x: 13}, testG{e: lookup.Epoch{Time: 819401505, Level: 7}, n: 818943526, x: 18}, testG{e: lookup.Epoch{Time: 800886832, Level: 12}, n: 800563106, x: 19}, testG{e: lookup.Epoch{Time: 780767476, Level: 10}, n: 694450997, x: 26}, testG{e: lookup.Epoch{Time: 789209418, Level: 15}, n: 789209417, x: 14}, testG{e: lookup.Epoch{Time: 816086666, Level: 9}, n: 816034646, x: 18}, testG{e: lookup.Epoch{Time: 835407077, Level: 21}, n: 835407076, x: 20}, testG{e: lookup.Epoch{Time: 846527322, Level: 20}, n: 846527321, x: 19}, testG{e: lookup.Epoch{Time: 850131130, Level: 19}, n: 18446744073670013406, x: 31}, testG{e: lookup.Epoch{Time: 842248607, Level: 24}, n: 783963834, x: 28}, testG{e: lookup.Epoch{Time: 816181999, Level: 2}, n: 816124867, x: 15}, testG{e: lookup.Epoch{Time: 806627026, Level: 17}, n: 756013427, x: 28}, testG{e: lookup.Epoch{Time: 826223084, Level: 4}, n: 826169865, x: 16}, testG{e: lookup.Epoch{Time: 835380147, Level: 21}, n: 835380147, x: 20}, testG{e: lookup.Epoch{Time: 860137874, Level: 3}, n: 860137782, x: 7}, testG{e: lookup.Epoch{Time: 860623757, Level: 8}, n: 860621582, x: 12}, testG{e: lookup.Epoch{Time: 875464114, Level: 24}, n: 875464114, x: 23}, testG{e: lookup.Epoch{Time: 853804052, Level: 6}, n: 853804051, x: 5}, testG{e: lookup.Epoch{Time: 864150903, Level: 14}, n: 854360673, x: 24}, testG{e: lookup.Epoch{Time: 850104561, Level: 23}, n: 850104561, x: 22}, testG{e: lookup.Epoch{Time: 878020186, Level: 24}, n: 878020186, x: 23}, testG{e: lookup.Epoch{Time: 900150940, Level: 8}, n: 899224760, x: 21}, testG{e: lookup.Epoch{Time: 869566202, Level: 2}, n: 869566199, x: 3}, testG{e: lookup.Epoch{Time: 851878045, Level: 5}, n: 851878045, x: 4}, testG{e: lookup.Epoch{Time: 824469671, Level: 12}, n: 824466504, x: 13}, testG{e: lookup.Epoch{Time: 819830223, Level: 9}, n: 816550241, x: 22}, testG{e: lookup.Epoch{Time: 813720249, Level: 20}, n: 801351581, x: 28}, testG{e: lookup.Epoch{Time: 831200185, Level: 20}, n: 830760165, x: 19}, testG{e: lookup.Epoch{Time: 838915973, Level: 9}, n: 838915972, x: 8}, testG{e: lookup.Epoch{Time: 812902644, Level: 5}, n: 812902644, x: 4}, testG{e: lookup.Epoch{Time: 812755887, Level: 3}, n: 812755887, x: 2}, testG{e: lookup.Epoch{Time: 822497779, Level: 8}, n: 822486000, x: 14}, testG{e: lookup.Epoch{Time: 832407585, Level: 9}, n: 579450238, x: 28}, testG{e: lookup.Epoch{Time: 799645403, Level: 23}, n: 799645403, x: 22}, testG{e: lookup.Epoch{Time: 827279665, Level: 2}, n: 826723872, x: 19}, testG{e: lookup.Epoch{Time: 846062554, Level: 6}, n: 765881119, x: 28}, testG{e: lookup.Epoch{Time: 855122998, Level: 6}, n: 855122978, x: 5}, testG{e: lookup.Epoch{Time: 841905104, Level: 4}, n: 751401236, x: 28}, testG{e: lookup.Epoch{Time: 857737438, Level: 12}, n: 325468127, x: 29}, testG{e: lookup.Epoch{Time: 838103691, Level: 18}, n: 779030823, x: 28}, testG{e: lookup.Epoch{Time: 841581240, Level: 22}, n: 841581239, x: 21}} +// TestGetNextLevel tests the lookup.GetNextLevel function func TestGetNextLevel(t *testing.T) { // First, test well-known cases @@ -492,7 +618,7 @@ func TestGetNextLevel(t *testing.T) { } -// cookGetNextLevelTests is used to generate a deterministic +// CookGetNextLevelTests is used to generate a deterministic // set of cases for TestGetNextLevel and thus "freeze" its current behavior func CookGetNextLevelTests(t *testing.T) { st := "" diff --git a/swarm/storage/feed/lookup/store_test.go b/swarm/storage/feed/lookup/store_test.go new file mode 100644 index 000000000..ed5209319 --- /dev/null +++ b/swarm/storage/feed/lookup/store_test.go @@ -0,0 +1,154 @@ +package lookup_test + +/* +This file contains components to mock a storage for testing +lookup algorithms and measure the number of reads. +*/ + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/ethereum/go-ethereum/swarm/log" + "github.com/ethereum/go-ethereum/swarm/storage/feed/lookup" +) + +// Data is a struct to keep a value to store/retrieve during testing +type Data struct { + Payload uint64 + Time uint64 +} + +// String implements fmt.Stringer +func (d *Data) String() string { + return fmt.Sprintf("%d-%d", d.Payload, d.Time) +} + +// Datamap is an internal map to hold the mocked storage +type DataMap map[lookup.EpochID]*Data + +// StoreConfig allows to specify the simulated delays for each type of +// read operation +type StoreConfig struct { + CacheReadTime time.Duration // time it takes to read from the cache + FailedReadTime time.Duration // time it takes to acknowledge a read as failed + SuccessfulReadTime time.Duration // time it takes to fetch data +} + +// StoreCounters will track read count metrics +type StoreCounters struct { + reads int + cacheHits int + failed int + successful int + canceled int + maxSimultaneous int +} + +// Store simulates a store and keeps track of performance counters +type Store struct { + StoreConfig + StoreCounters + data DataMap + cache DataMap + lock sync.RWMutex + activeReads int +} + +// NewStore returns a new mock store ready for use +func NewStore(config *StoreConfig) *Store { + store := &Store{ + StoreConfig: *config, + data: make(DataMap), + } + + store.Reset() + return store +} + +// Reset reset performance counters and clears the cache +func (s *Store) Reset() { + s.cache = make(DataMap) + s.StoreCounters = StoreCounters{} +} + +// Put stores a value in the mock store at the given epoch +func (s *Store) Put(epoch lookup.Epoch, value *Data) { + log.Debug("Write: %d-%d, value='%d'\n", epoch.Base(), epoch.Level, value.Payload) + s.data[epoch.ID()] = value +} + +// Update runs the seed algorithm to place the update in the appropriate epoch +func (s *Store) Update(last lookup.Epoch, now uint64, value *Data) lookup.Epoch { + epoch := lookup.GetNextEpoch(last, now) + s.Put(epoch, value) + return epoch +} + +// Get retrieves data at the specified epoch, simulating a delay +func (s *Store) Get(ctx context.Context, epoch lookup.Epoch, now uint64) (value interface{}, err error) { + epochID := epoch.ID() + var operationTime time.Duration + + defer func() { // simulate a delay according to what has actually happened + select { + case <-lookup.TimeAfter(operationTime): + case <-ctx.Done(): + s.lock.Lock() + s.canceled++ + s.lock.Unlock() + value = nil + err = ctx.Err() + } + s.lock.Lock() + s.activeReads-- + s.lock.Unlock() + }() + + s.lock.Lock() + defer s.lock.Unlock() + s.reads++ + s.activeReads++ + if s.activeReads > s.maxSimultaneous { + s.maxSimultaneous = s.activeReads + } + + // 1.- Simulate a cache read + item := s.cache[epochID] + operationTime += s.CacheReadTime + + if item != nil { + s.cacheHits++ + if item.Time <= now { + s.successful++ + return item, nil + } + return nil, nil + } + + // 2.- simulate a full read + + item = s.data[epochID] + if item != nil { + operationTime += s.SuccessfulReadTime + s.successful++ + s.cache[epochID] = item + if item.Time <= now { + return item, nil + } + } else { + operationTime += s.FailedReadTime + s.failed++ + } + return nil, nil +} + +// MakeReadFunc returns a read function suitable for the lookup algorithm, mapped +// to this mock storage +func (s *Store) MakeReadFunc() lookup.ReadFunc { + return func(ctx context.Context, epoch lookup.Epoch, now uint64) (interface{}, error) { + return s.Get(ctx, epoch, now) + } +} diff --git a/swarm/storage/feed/lookup/timesim_test.go b/swarm/storage/feed/lookup/timesim_test.go new file mode 100644 index 000000000..2a254188c --- /dev/null +++ b/swarm/storage/feed/lookup/timesim_test.go @@ -0,0 +1,128 @@ +package lookup_test + +// This file contains simple time simulation tools for testing +// and measuring time-aware algorithms + +import ( + "sync" + "time" +) + +// Timer tracks information about a simulated timer +type Timer struct { + deadline time.Time + signal chan time.Time + id int +} + +// Stopwatch measures simulated execution time and manages simulated timers +type Stopwatch struct { + t time.Time + resolution time.Duration + timers map[int]*Timer + timerCounter int + stopSignal chan struct{} + lock sync.RWMutex +} + +// NewStopwatch returns a simulated clock that ticks on `resolution` intervals +func NewStopwatch(resolution time.Duration) *Stopwatch { + s := &Stopwatch{ + resolution: resolution, + } + s.Reset() + return s +} + +// Reset clears all timers and sents the stopwatch to zero +func (s *Stopwatch) Reset() { + s.t = time.Time{} + s.timers = make(map[int]*Timer) + s.Stop() +} + +// Tick advances simulated time by the stopwatch's resolution and triggers +// all due timers +func (s *Stopwatch) Tick() { + s.t = s.t.Add(s.resolution) + + s.lock.Lock() + defer s.lock.Unlock() + + for id, timer := range s.timers { + if s.t.After(timer.deadline) || s.t.Equal(timer.deadline) { + timer.signal <- s.t + close(timer.signal) + delete(s.timers, id) + } + } +} + +// NewTimer returns a new timer that will trigger after `duration` elapses in the +// simulation +func (s *Stopwatch) NewTimer(duration time.Duration) <-chan time.Time { + s.lock.Lock() + defer s.lock.Unlock() + + s.timerCounter++ + timer := &Timer{ + deadline: s.t.Add(duration), + signal: make(chan time.Time, 1), + id: s.timerCounter, + } + + s.timers[timer.id] = timer + return timer.signal +} + +// TimeAfter returns a simulated timer factory that can replace `time.After` +func (s *Stopwatch) TimeAfter() func(d time.Duration) <-chan time.Time { + return func(d time.Duration) <-chan time.Time { + return s.NewTimer(d) + } +} + +// Elapsed returns the time that has passed in the simulation +func (s *Stopwatch) Elapsed() time.Duration { + return s.t.Sub(time.Time{}) +} + +// Run starts the time simulation +func (s *Stopwatch) Run() { + go func() { + stopSignal := make(chan struct{}) + s.lock.Lock() + if s.stopSignal != nil { + close(s.stopSignal) + } + s.stopSignal = stopSignal + s.lock.Unlock() + for { + select { + case <-time.After(1 * time.Millisecond): + s.Tick() + case <-stopSignal: + return + } + } + }() +} + +// Stop stops the time simulation +func (s *Stopwatch) Stop() { + s.lock.Lock() + defer s.lock.Unlock() + + if s.stopSignal != nil { + close(s.stopSignal) + s.stopSignal = nil + } +} + +func (s *Stopwatch) Measure(measuredFunc func()) time.Duration { + s.Reset() + s.Run() + defer s.Stop() + measuredFunc() + return s.Elapsed() +} diff --git a/swarm/storage/feed/query_test.go b/swarm/storage/feed/query_test.go index 9fa5e2980..1ec45762e 100644 --- a/swarm/storage/feed/query_test.go +++ b/swarm/storage/feed/query_test.go @@ -30,7 +30,7 @@ func getTestQuery() *Query { } func TestQueryValues(t *testing.T) { - var expected = KV{"hint.level": "25", "hint.time": "1000", "time": "5000", "topic": "0x776f726c64206e657773207265706f72742c20657665727920686f7572000000", "user": "0x876A8936A7Cd0b79Ef0735AD0896c1AFe278781c"} + var expected = KV{"hint.level": "31", "hint.time": "1000", "time": "5000", "topic": "0x776f726c64206e657773207265706f72742c20657665727920686f7572000000", "user": "0x876A8936A7Cd0b79Ef0735AD0896c1AFe278781c"} query := getTestQuery() testValueSerializer(t, query, expected) diff --git a/swarm/storage/feed/request_test.go b/swarm/storage/feed/request_test.go index c30158fdd..b9c1381c6 100644 --- a/swarm/storage/feed/request_test.go +++ b/swarm/storage/feed/request_test.go @@ -223,7 +223,7 @@ func TestUpdateChunkSerializationErrorChecking(t *testing.T) { t.Fatalf("error creating update chunk:%s", err) } - compareByteSliceToExpectedHex(t, "chunk", chunk.Data(), "0x0000000000000000776f726c64206e657773207265706f72742c20657665727920686f7572000000876a8936a7cd0b79ef0735ad0896c1afe278781ce803000000000019416c206269656e206861636572206a616dc3a173206c652066616c7461207072656d696f5a0ffe0bc27f207cd5b00944c8b9cee93e08b89b5ada777f123ac535189333f174a6a4ca2f43a92c4a477a49d774813c36ce8288552c58e6205b0ac35d0507eb00") + compareByteSliceToExpectedHex(t, "chunk", chunk.Data(), "0x0000000000000000776f726c64206e657773207265706f72742c20657665727920686f7572000000876a8936a7cd0b79ef0735ad0896c1afe278781ce80300000000001f416c206269656e206861636572206a616dc3a173206c652066616c7461207072656d696f9896df5937e64e51a7994479ff3fe0ed790d539b9b3e85e93c0014a8a64374f23603c79d16e99b50a757896d3816d7022ac594ad1415679a9b164afb2e5926d801") var recovered Request recovered.fromChunk(chunk) diff --git a/swarm/storage/feed/update_test.go b/swarm/storage/feed/update_test.go index 24c09b361..e4e0963e9 100644 --- a/swarm/storage/feed/update_test.go +++ b/swarm/storage/feed/update_test.go @@ -28,7 +28,7 @@ func getTestFeedUpdate() *Update { } func TestUpdateSerializer(t *testing.T) { - testBinarySerializerRecovery(t, getTestFeedUpdate(), "0x0000000000000000776f726c64206e657773207265706f72742c20657665727920686f7572000000876a8936a7cd0b79ef0735ad0896c1afe278781ce803000000000019456c20717565206c6565206d7563686f207920616e6461206d7563686f2c207665206d7563686f20792073616265206d7563686f") + testBinarySerializerRecovery(t, getTestFeedUpdate(), "0x0000000000000000776f726c64206e657773207265706f72742c20657665727920686f7572000000876a8936a7cd0b79ef0735ad0896c1afe278781ce80300000000001f456c20717565206c6565206d7563686f207920616e6461206d7563686f2c207665206d7563686f20792073616265206d7563686f") } func TestUpdateLengthCheck(t *testing.T) {