forked from cerc-io/plugeth
swarm/feeds: Parallel feed lookups (#19414)
This commit is contained in:
parent
0c5f8c078a
commit
1e067202a2
@ -27,7 +27,7 @@ import (
|
|||||||
const (
|
const (
|
||||||
hasherCount = 8
|
hasherCount = 8
|
||||||
feedsHashAlgorithm = storage.SHA3Hash
|
feedsHashAlgorithm = storage.SHA3Hash
|
||||||
defaultRetrieveTimeout = 100 * time.Millisecond
|
defaultRetrieveTimeout = 1000 * time.Millisecond
|
||||||
)
|
)
|
||||||
|
|
||||||
// cacheEntry caches the last known update of a specific Swarm feed.
|
// cacheEntry caches the last known update of a specific Swarm feed.
|
||||||
|
@ -23,6 +23,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/swarm/chunk"
|
"github.com/ethereum/go-ethereum/swarm/chunk"
|
||||||
|
|
||||||
@ -178,12 +179,12 @@ func (h *Handler) Lookup(ctx context.Context, query *Query) (*cacheEntry, error)
|
|||||||
return nil, NewError(ErrInit, "Call Handler.SetStore() before performing lookups")
|
return nil, NewError(ErrInit, "Call Handler.SetStore() before performing lookups")
|
||||||
}
|
}
|
||||||
|
|
||||||
var readCount int
|
var readCount int32
|
||||||
|
|
||||||
// Invoke the lookup engine.
|
// Invoke the lookup engine.
|
||||||
// The callback will be called every time the lookup algorithm needs to guess
|
// The callback will be called every time the lookup algorithm needs to guess
|
||||||
requestPtr, err := lookup.Lookup(ctx, timeLimit, query.Hint, func(ctx context.Context, epoch lookup.Epoch, now uint64) (interface{}, error) {
|
requestPtr, err := lookup.Lookup(ctx, timeLimit, query.Hint, func(ctx context.Context, epoch lookup.Epoch, now uint64) (interface{}, error) {
|
||||||
readCount++
|
atomic.AddInt32(&readCount, 1)
|
||||||
id := ID{
|
id := ID{
|
||||||
Feed: query.Feed,
|
Feed: query.Feed,
|
||||||
Epoch: epoch,
|
Epoch: epoch,
|
||||||
@ -228,17 +229,17 @@ func (h *Handler) updateCache(request *Request) (*cacheEntry, error) {
|
|||||||
updateAddr := request.Addr()
|
updateAddr := request.Addr()
|
||||||
log.Trace("feed cache update", "topic", request.Topic.Hex(), "updateaddr", updateAddr, "epoch time", request.Epoch.Time, "epoch level", request.Epoch.Level)
|
log.Trace("feed cache update", "topic", request.Topic.Hex(), "updateaddr", updateAddr, "epoch time", request.Epoch.Time, "epoch level", request.Epoch.Level)
|
||||||
|
|
||||||
feedUpdate := h.get(&request.Feed)
|
entry := h.get(&request.Feed)
|
||||||
if feedUpdate == nil {
|
if entry == nil {
|
||||||
feedUpdate = &cacheEntry{}
|
entry = &cacheEntry{}
|
||||||
h.set(&request.Feed, feedUpdate)
|
h.set(&request.Feed, entry)
|
||||||
}
|
}
|
||||||
|
|
||||||
// update our rsrcs entry map
|
// update our rsrcs entry map
|
||||||
feedUpdate.lastKey = updateAddr
|
entry.lastKey = updateAddr
|
||||||
feedUpdate.Update = request.Update
|
entry.Update = request.Update
|
||||||
feedUpdate.Reader = bytes.NewReader(feedUpdate.data)
|
entry.Reader = bytes.NewReader(entry.data)
|
||||||
return feedUpdate, nil
|
return entry, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update publishes a feed update
|
// Update publishes a feed update
|
||||||
|
@ -177,8 +177,8 @@ func TestFeedsHandler(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if request.Epoch.Base() != 0 || request.Epoch.Level != 22 {
|
if request.Epoch.Base() != 0 || request.Epoch.Level != 28 {
|
||||||
t.Fatalf("Expected epoch base time to be %d, got %d. Expected epoch level to be %d, got %d", 0, request.Epoch.Base(), 22, request.Epoch.Level)
|
t.Fatalf("Expected epoch base time to be %d, got %d. Expected epoch level to be %d, got %d", 0, request.Epoch.Base(), 28, request.Epoch.Level)
|
||||||
}
|
}
|
||||||
data = []byte(updates[3])
|
data = []byte(updates[3])
|
||||||
request.SetData(data)
|
request.SetData(data)
|
||||||
@ -213,8 +213,8 @@ func TestFeedsHandler(t *testing.T) {
|
|||||||
if !bytes.Equal(update2.data, []byte(updates[len(updates)-1])) {
|
if !bytes.Equal(update2.data, []byte(updates[len(updates)-1])) {
|
||||||
t.Fatalf("feed update data was %v, expected %v", string(update2.data), updates[len(updates)-1])
|
t.Fatalf("feed update data was %v, expected %v", string(update2.data), updates[len(updates)-1])
|
||||||
}
|
}
|
||||||
if update2.Level != 22 {
|
if update2.Level != 28 {
|
||||||
t.Fatalf("feed update epoch level was %d, expected 22", update2.Level)
|
t.Fatalf("feed update epoch level was %d, expected 28", update2.Level)
|
||||||
}
|
}
|
||||||
if update2.Base() != 0 {
|
if update2.Base() != 0 {
|
||||||
t.Fatalf("feed update epoch base time was %d, expected 0", update2.Base())
|
t.Fatalf("feed update epoch base time was %d, expected 0", update2.Base())
|
||||||
|
@ -16,11 +16,11 @@ func getTestID() *ID {
|
|||||||
func TestIDAddr(t *testing.T) {
|
func TestIDAddr(t *testing.T) {
|
||||||
id := getTestID()
|
id := getTestID()
|
||||||
updateAddr := id.Addr()
|
updateAddr := id.Addr()
|
||||||
compareByteSliceToExpectedHex(t, "updateAddr", updateAddr, "0x8b24583ec293e085f4c78aaee66d1bc5abfb8b4233304d14a349afa57af2a783")
|
compareByteSliceToExpectedHex(t, "updateAddr", updateAddr, "0x842d0a81987b9755dfeaa5558f5c134c1c0af48b6545005cac7b533d9411453a")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestIDSerializer(t *testing.T) {
|
func TestIDSerializer(t *testing.T) {
|
||||||
testBinarySerializerRecovery(t, getTestID(), "0x776f726c64206e657773207265706f72742c20657665727920686f7572000000876a8936a7cd0b79ef0735ad0896c1afe278781ce803000000000019")
|
testBinarySerializerRecovery(t, getTestID(), "0x776f726c64206e657773207265706f72742c20657665727920686f7572000000876a8936a7cd0b79ef0735ad0896c1afe278781ce80300000000001f")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestIDLengthCheck(t *testing.T) {
|
func TestIDLengthCheck(t *testing.T) {
|
||||||
|
63
swarm/storage/feed/lookup/algorithm_fluzcapacitor.go
Normal file
63
swarm/storage/feed/lookup/algorithm_fluzcapacitor.go
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
package lookup
|
||||||
|
|
||||||
|
import "context"
|
||||||
|
|
||||||
|
// FluzCapacitorAlgorithm works by narrowing the epoch search area if an update is found
|
||||||
|
// going back and forth in time
|
||||||
|
// First, it will attempt to find an update where it should be now if the hint was
|
||||||
|
// really the last update. If that lookup fails, then the last update must be either the hint itself
|
||||||
|
// or the epochs right below. If however, that lookup succeeds, then the update must be
|
||||||
|
// that one or within the epochs right below.
|
||||||
|
// see the guide for a more graphical representation
|
||||||
|
func FluzCapacitorAlgorithm(ctx context.Context, now uint64, hint Epoch, read ReadFunc) (value interface{}, err error) {
|
||||||
|
var lastFound interface{}
|
||||||
|
var epoch Epoch
|
||||||
|
if hint == NoClue {
|
||||||
|
hint = worstHint
|
||||||
|
}
|
||||||
|
|
||||||
|
t := now
|
||||||
|
|
||||||
|
for {
|
||||||
|
epoch = GetNextEpoch(hint, t)
|
||||||
|
value, err = read(ctx, epoch, now)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if value != nil {
|
||||||
|
lastFound = value
|
||||||
|
if epoch.Level == LowestLevel || epoch.Equals(hint) {
|
||||||
|
return value, nil
|
||||||
|
}
|
||||||
|
hint = epoch
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if epoch.Base() == hint.Base() {
|
||||||
|
if lastFound != nil {
|
||||||
|
return lastFound, nil
|
||||||
|
}
|
||||||
|
// we have reached the hint itself
|
||||||
|
if hint == worstHint {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
// check it out
|
||||||
|
value, err = read(ctx, hint, now)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if value != nil {
|
||||||
|
return value, nil
|
||||||
|
}
|
||||||
|
// bad hint.
|
||||||
|
t = hint.Base()
|
||||||
|
hint = worstHint
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
base := epoch.Base()
|
||||||
|
if base == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
t = base - 1
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
185
swarm/storage/feed/lookup/algorithm_longearth.go
Normal file
185
swarm/storage/feed/lookup/algorithm_longearth.go
Normal file
@ -0,0 +1,185 @@
|
|||||||
|
package lookup
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type stepFunc func(ctx context.Context, t uint64, hint Epoch) interface{}
|
||||||
|
|
||||||
|
// LongEarthLookaheadDelay is the headstart the lookahead gives R before it launches
|
||||||
|
var LongEarthLookaheadDelay = 250 * time.Millisecond
|
||||||
|
|
||||||
|
// LongEarthLookbackDelay is the headstart the lookback gives R before it launches
|
||||||
|
var LongEarthLookbackDelay = 250 * time.Millisecond
|
||||||
|
|
||||||
|
// LongEarthAlgorithm explores possible lookup paths in parallel, pruning paths as soon
|
||||||
|
// as a more promising lookup path is found. As a result, this lookup algorithm is an order
|
||||||
|
// of magnitude faster than the FluzCapacitor algorithm, but at the expense of more exploratory reads.
|
||||||
|
// This algorithm works as follows. On each step, the next epoch is immediately looked up (R)
|
||||||
|
// and given a head start, while two parallel "steps" are launched a short time after:
|
||||||
|
// look ahead (A) is the path the algorithm would take if the R lookup returns a value, whereas
|
||||||
|
// look back (B) is the path the algorithm would take if the R lookup failed.
|
||||||
|
// as soon as R is actually finished, the A or B paths are pruned depending on the value of R.
|
||||||
|
// if A returns earlier than R, then R and B read operations can be safely canceled, saving time.
|
||||||
|
// The maximum number of active read operations is calculated as 2^(timeout/headstart).
|
||||||
|
// If headstart is infinite, this algorithm behaves as FluzCapacitor.
|
||||||
|
// timeout is the maximum execution time of the passed `read` function.
|
||||||
|
// the two head starts can be configured by changing LongEarthLookaheadDelay or LongEarthLookbackDelay
|
||||||
|
func LongEarthAlgorithm(ctx context.Context, now uint64, hint Epoch, read ReadFunc) (interface{}, error) {
|
||||||
|
if hint == NoClue {
|
||||||
|
hint = worstHint
|
||||||
|
}
|
||||||
|
|
||||||
|
var stepCounter int32 // for debugging, stepCounter allows to give an ID to each step instance
|
||||||
|
|
||||||
|
errc := make(chan struct{}) // errc will help as an error shortcut signal
|
||||||
|
var gerr error // in case of error, this variable will be set
|
||||||
|
|
||||||
|
var step stepFunc // For efficiency, the algorithm step is defined as a closure
|
||||||
|
step = func(ctxS context.Context, t uint64, last Epoch) interface{} {
|
||||||
|
stepID := atomic.AddInt32(&stepCounter, 1) // give an ID to this call instance
|
||||||
|
trace(stepID, "init: t=%d, last=%s", t, last.String())
|
||||||
|
var valueA, valueB, valueR interface{}
|
||||||
|
|
||||||
|
// initialize the three read contexts
|
||||||
|
ctxR, cancelR := context.WithCancel(ctxS) // will handle the current read operation
|
||||||
|
ctxA, cancelA := context.WithCancel(ctxS) // will handle the lookahead path
|
||||||
|
ctxB, cancelB := context.WithCancel(ctxS) // will handle the lookback path
|
||||||
|
|
||||||
|
epoch := GetNextEpoch(last, t) // calculate the epoch to look up in this step instance
|
||||||
|
|
||||||
|
// define the lookAhead function, which will follow the path as if R was successful
|
||||||
|
lookAhead := func() {
|
||||||
|
valueA = step(ctxA, t, epoch) // launch the next step, recursively.
|
||||||
|
if valueA != nil { // if this path is successful, we don't need R or B.
|
||||||
|
cancelB()
|
||||||
|
cancelR()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// define the lookBack function, which will follow the path as if R was unsuccessful
|
||||||
|
lookBack := func() {
|
||||||
|
if epoch.Base() == last.Base() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
base := epoch.Base()
|
||||||
|
if base == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
valueB = step(ctxB, base-1, last)
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() { //goroutine to read the current epoch (R)
|
||||||
|
defer cancelR()
|
||||||
|
var err error
|
||||||
|
valueR, err = read(ctxR, epoch, now) // read this epoch
|
||||||
|
if valueR == nil { // if unsuccessful, cancel lookahead, otherwise cancel lookback.
|
||||||
|
cancelA()
|
||||||
|
} else {
|
||||||
|
cancelB()
|
||||||
|
}
|
||||||
|
if err != nil && err != context.Canceled {
|
||||||
|
gerr = err
|
||||||
|
close(errc)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
go func() { // goroutine to give a headstart to R and then launch lookahead.
|
||||||
|
defer cancelA()
|
||||||
|
|
||||||
|
// if we are at the lowest level or the epoch to look up equals the last one,
|
||||||
|
// then we cannot lookahead (can't go lower or repeat the same lookup, this would
|
||||||
|
// cause an infinite loop)
|
||||||
|
if epoch.Level == LowestLevel || epoch.Equals(last) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// give a head start to R, or launch immediately if R finishes early enough
|
||||||
|
select {
|
||||||
|
case <-TimeAfter(LongEarthLookaheadDelay):
|
||||||
|
lookAhead()
|
||||||
|
case <-ctxR.Done():
|
||||||
|
if valueR != nil {
|
||||||
|
lookAhead() // only look ahead if R was successful
|
||||||
|
}
|
||||||
|
case <-ctxA.Done():
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
go func() { // goroutine to give a headstart to R and then launch lookback.
|
||||||
|
defer cancelB()
|
||||||
|
|
||||||
|
// give a head start to R, or launch immediately if R finishes early enough
|
||||||
|
select {
|
||||||
|
case <-TimeAfter(LongEarthLookbackDelay):
|
||||||
|
lookBack()
|
||||||
|
case <-ctxR.Done():
|
||||||
|
if valueR == nil {
|
||||||
|
lookBack() // only look back in case R failed
|
||||||
|
}
|
||||||
|
case <-ctxB.Done():
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
<-ctxA.Done()
|
||||||
|
if valueA != nil {
|
||||||
|
trace(stepID, "Returning valueA=%v", valueA)
|
||||||
|
return valueA
|
||||||
|
}
|
||||||
|
|
||||||
|
<-ctxR.Done()
|
||||||
|
if valueR != nil {
|
||||||
|
trace(stepID, "Returning valueR=%v", valueR)
|
||||||
|
return valueR
|
||||||
|
}
|
||||||
|
<-ctxB.Done()
|
||||||
|
trace(stepID, "Returning valueB=%v", valueB)
|
||||||
|
return valueB
|
||||||
|
}
|
||||||
|
|
||||||
|
var value interface{}
|
||||||
|
stepCtx, cancel := context.WithCancel(ctx)
|
||||||
|
|
||||||
|
go func() { // launch the root step in its own goroutine to allow cancellation
|
||||||
|
defer cancel()
|
||||||
|
value = step(stepCtx, now, hint)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// wait for the algorithm to finish, but shortcut in case
|
||||||
|
// of errors
|
||||||
|
select {
|
||||||
|
case <-stepCtx.Done():
|
||||||
|
case <-errc:
|
||||||
|
cancel()
|
||||||
|
return nil, gerr
|
||||||
|
}
|
||||||
|
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return nil, ctx.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
if value != nil || hint == worstHint {
|
||||||
|
return value, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// at this point the algorithm did not return a value,
|
||||||
|
// so we challenge the hint given.
|
||||||
|
value, err := read(ctx, hint, now)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if value != nil {
|
||||||
|
return value, nil // hint is valid, return it.
|
||||||
|
}
|
||||||
|
|
||||||
|
// hint is invalid. Invoke the algorithm
|
||||||
|
// without hint.
|
||||||
|
now = hint.Base()
|
||||||
|
if hint.Level == HighestLevel {
|
||||||
|
now--
|
||||||
|
}
|
||||||
|
|
||||||
|
return LongEarthAlgorithm(ctx, now, NoClue, read)
|
||||||
|
}
|
@ -87,5 +87,5 @@ func (e *Epoch) Equals(epoch Epoch) bool {
|
|||||||
|
|
||||||
// String implements the Stringer interface.
|
// String implements the Stringer interface.
|
||||||
func (e *Epoch) String() string {
|
func (e *Epoch) String() string {
|
||||||
return fmt.Sprintf("Epoch{Time:%d, Level:%d}", e.Time, e.Level)
|
return fmt.Sprintf("Epoch{Base: %d, Time:%d, Level:%d}", e.Base(), e.Time, e.Level)
|
||||||
}
|
}
|
||||||
|
@ -20,7 +20,10 @@ so they can be found
|
|||||||
*/
|
*/
|
||||||
package lookup
|
package lookup
|
||||||
|
|
||||||
import "context"
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
const maxuint64 = ^uint64(0)
|
const maxuint64 = ^uint64(0)
|
||||||
|
|
||||||
@ -28,8 +31,8 @@ const maxuint64 = ^uint64(0)
|
|||||||
const LowestLevel uint8 = 0 // default is 0 (1 second)
|
const LowestLevel uint8 = 0 // default is 0 (1 second)
|
||||||
|
|
||||||
// HighestLevel sets the lowest frequency the algorithm will operate at, as a power of 2.
|
// HighestLevel sets the lowest frequency the algorithm will operate at, as a power of 2.
|
||||||
// 25 -> 2^25 equals to roughly one year.
|
// 31 -> 2^31 equals to roughly 38 years.
|
||||||
const HighestLevel = 25 // default is 25 (~1 year)
|
const HighestLevel = 31
|
||||||
|
|
||||||
// DefaultLevel sets what level will be chosen to search when there is no hint
|
// DefaultLevel sets what level will be chosen to search when there is no hint
|
||||||
const DefaultLevel = HighestLevel
|
const DefaultLevel = HighestLevel
|
||||||
@ -43,7 +46,12 @@ type Algorithm func(ctx context.Context, now uint64, hint Epoch, read ReadFunc)
|
|||||||
// read() will be called on each lookup attempt
|
// read() will be called on each lookup attempt
|
||||||
// Returns an error only if read() returns an error
|
// Returns an error only if read() returns an error
|
||||||
// Returns nil if an update was not found
|
// Returns nil if an update was not found
|
||||||
var Lookup Algorithm = FluzCapacitorAlgorithm
|
var Lookup Algorithm = LongEarthAlgorithm
|
||||||
|
|
||||||
|
// TimeAfter must point to a function that returns a timer
|
||||||
|
// This is here so that tests can replace it with
|
||||||
|
// a mock up timer factory to simulate time deterministically
|
||||||
|
var TimeAfter = time.After
|
||||||
|
|
||||||
// ReadFunc is a handler called by Lookup each time it attempts to find a value
|
// ReadFunc is a handler called by Lookup each time it attempts to find a value
|
||||||
// It should return <nil> if a value is not found
|
// It should return <nil> if a value is not found
|
||||||
@ -123,61 +131,6 @@ func GetFirstEpoch(now uint64) Epoch {
|
|||||||
|
|
||||||
var worstHint = Epoch{Time: 0, Level: 63}
|
var worstHint = Epoch{Time: 0, Level: 63}
|
||||||
|
|
||||||
// FluzCapacitorAlgorithm works by narrowing the epoch search area if an update is found
|
var trace = func(id int32, formatString string, a ...interface{}) {
|
||||||
// going back and forth in time
|
//fmt.Printf("Step ID #%d "+formatString+"\n", append([]interface{}{id}, a...)...)
|
||||||
// First, it will attempt to find an update where it should be now if the hint was
|
|
||||||
// really the last update. If that lookup fails, then the last update must be either the hint itself
|
|
||||||
// or the epochs right below. If however, that lookup succeeds, then the update must be
|
|
||||||
// that one or within the epochs right below.
|
|
||||||
// see the guide for a more graphical representation
|
|
||||||
func FluzCapacitorAlgorithm(ctx context.Context, now uint64, hint Epoch, read ReadFunc) (value interface{}, err error) {
|
|
||||||
var lastFound interface{}
|
|
||||||
var epoch Epoch
|
|
||||||
if hint == NoClue {
|
|
||||||
hint = worstHint
|
|
||||||
}
|
|
||||||
|
|
||||||
t := now
|
|
||||||
|
|
||||||
for {
|
|
||||||
epoch = GetNextEpoch(hint, t)
|
|
||||||
value, err = read(ctx, epoch, now)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if value != nil {
|
|
||||||
lastFound = value
|
|
||||||
if epoch.Level == LowestLevel || epoch.Equals(hint) {
|
|
||||||
return value, nil
|
|
||||||
}
|
|
||||||
hint = epoch
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if epoch.Base() == hint.Base() {
|
|
||||||
if lastFound != nil {
|
|
||||||
return lastFound, nil
|
|
||||||
}
|
|
||||||
// we have reached the hint itself
|
|
||||||
if hint == worstHint {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
// check it out
|
|
||||||
value, err = read(ctx, hint, now)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if value != nil {
|
|
||||||
return value, nil
|
|
||||||
}
|
|
||||||
// bad hint.
|
|
||||||
t = hint.Base()
|
|
||||||
hint = worstHint
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
base := epoch.Base()
|
|
||||||
if base == 0 {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
t = base - 1
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
File diff suppressed because one or more lines are too long
154
swarm/storage/feed/lookup/store_test.go
Normal file
154
swarm/storage/feed/lookup/store_test.go
Normal file
@ -0,0 +1,154 @@
|
|||||||
|
package lookup_test
|
||||||
|
|
||||||
|
/*
|
||||||
|
This file contains components to mock a storage for testing
|
||||||
|
lookup algorithms and measure the number of reads.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/swarm/log"
|
||||||
|
"github.com/ethereum/go-ethereum/swarm/storage/feed/lookup"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Data is a struct to keep a value to store/retrieve during testing
|
||||||
|
type Data struct {
|
||||||
|
Payload uint64
|
||||||
|
Time uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
// String implements fmt.Stringer
|
||||||
|
func (d *Data) String() string {
|
||||||
|
return fmt.Sprintf("%d-%d", d.Payload, d.Time)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Datamap is an internal map to hold the mocked storage
|
||||||
|
type DataMap map[lookup.EpochID]*Data
|
||||||
|
|
||||||
|
// StoreConfig allows to specify the simulated delays for each type of
|
||||||
|
// read operation
|
||||||
|
type StoreConfig struct {
|
||||||
|
CacheReadTime time.Duration // time it takes to read from the cache
|
||||||
|
FailedReadTime time.Duration // time it takes to acknowledge a read as failed
|
||||||
|
SuccessfulReadTime time.Duration // time it takes to fetch data
|
||||||
|
}
|
||||||
|
|
||||||
|
// StoreCounters will track read count metrics
|
||||||
|
type StoreCounters struct {
|
||||||
|
reads int
|
||||||
|
cacheHits int
|
||||||
|
failed int
|
||||||
|
successful int
|
||||||
|
canceled int
|
||||||
|
maxSimultaneous int
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store simulates a store and keeps track of performance counters
|
||||||
|
type Store struct {
|
||||||
|
StoreConfig
|
||||||
|
StoreCounters
|
||||||
|
data DataMap
|
||||||
|
cache DataMap
|
||||||
|
lock sync.RWMutex
|
||||||
|
activeReads int
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewStore returns a new mock store ready for use
|
||||||
|
func NewStore(config *StoreConfig) *Store {
|
||||||
|
store := &Store{
|
||||||
|
StoreConfig: *config,
|
||||||
|
data: make(DataMap),
|
||||||
|
}
|
||||||
|
|
||||||
|
store.Reset()
|
||||||
|
return store
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset reset performance counters and clears the cache
|
||||||
|
func (s *Store) Reset() {
|
||||||
|
s.cache = make(DataMap)
|
||||||
|
s.StoreCounters = StoreCounters{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put stores a value in the mock store at the given epoch
|
||||||
|
func (s *Store) Put(epoch lookup.Epoch, value *Data) {
|
||||||
|
log.Debug("Write: %d-%d, value='%d'\n", epoch.Base(), epoch.Level, value.Payload)
|
||||||
|
s.data[epoch.ID()] = value
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update runs the seed algorithm to place the update in the appropriate epoch
|
||||||
|
func (s *Store) Update(last lookup.Epoch, now uint64, value *Data) lookup.Epoch {
|
||||||
|
epoch := lookup.GetNextEpoch(last, now)
|
||||||
|
s.Put(epoch, value)
|
||||||
|
return epoch
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get retrieves data at the specified epoch, simulating a delay
|
||||||
|
func (s *Store) Get(ctx context.Context, epoch lookup.Epoch, now uint64) (value interface{}, err error) {
|
||||||
|
epochID := epoch.ID()
|
||||||
|
var operationTime time.Duration
|
||||||
|
|
||||||
|
defer func() { // simulate a delay according to what has actually happened
|
||||||
|
select {
|
||||||
|
case <-lookup.TimeAfter(operationTime):
|
||||||
|
case <-ctx.Done():
|
||||||
|
s.lock.Lock()
|
||||||
|
s.canceled++
|
||||||
|
s.lock.Unlock()
|
||||||
|
value = nil
|
||||||
|
err = ctx.Err()
|
||||||
|
}
|
||||||
|
s.lock.Lock()
|
||||||
|
s.activeReads--
|
||||||
|
s.lock.Unlock()
|
||||||
|
}()
|
||||||
|
|
||||||
|
s.lock.Lock()
|
||||||
|
defer s.lock.Unlock()
|
||||||
|
s.reads++
|
||||||
|
s.activeReads++
|
||||||
|
if s.activeReads > s.maxSimultaneous {
|
||||||
|
s.maxSimultaneous = s.activeReads
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1.- Simulate a cache read
|
||||||
|
item := s.cache[epochID]
|
||||||
|
operationTime += s.CacheReadTime
|
||||||
|
|
||||||
|
if item != nil {
|
||||||
|
s.cacheHits++
|
||||||
|
if item.Time <= now {
|
||||||
|
s.successful++
|
||||||
|
return item, nil
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2.- simulate a full read
|
||||||
|
|
||||||
|
item = s.data[epochID]
|
||||||
|
if item != nil {
|
||||||
|
operationTime += s.SuccessfulReadTime
|
||||||
|
s.successful++
|
||||||
|
s.cache[epochID] = item
|
||||||
|
if item.Time <= now {
|
||||||
|
return item, nil
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
operationTime += s.FailedReadTime
|
||||||
|
s.failed++
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// MakeReadFunc returns a read function suitable for the lookup algorithm, mapped
|
||||||
|
// to this mock storage
|
||||||
|
func (s *Store) MakeReadFunc() lookup.ReadFunc {
|
||||||
|
return func(ctx context.Context, epoch lookup.Epoch, now uint64) (interface{}, error) {
|
||||||
|
return s.Get(ctx, epoch, now)
|
||||||
|
}
|
||||||
|
}
|
128
swarm/storage/feed/lookup/timesim_test.go
Normal file
128
swarm/storage/feed/lookup/timesim_test.go
Normal file
@ -0,0 +1,128 @@
|
|||||||
|
package lookup_test
|
||||||
|
|
||||||
|
// This file contains simple time simulation tools for testing
|
||||||
|
// and measuring time-aware algorithms
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Timer tracks information about a simulated timer
|
||||||
|
type Timer struct {
|
||||||
|
deadline time.Time
|
||||||
|
signal chan time.Time
|
||||||
|
id int
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stopwatch measures simulated execution time and manages simulated timers
|
||||||
|
type Stopwatch struct {
|
||||||
|
t time.Time
|
||||||
|
resolution time.Duration
|
||||||
|
timers map[int]*Timer
|
||||||
|
timerCounter int
|
||||||
|
stopSignal chan struct{}
|
||||||
|
lock sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewStopwatch returns a simulated clock that ticks on `resolution` intervals
|
||||||
|
func NewStopwatch(resolution time.Duration) *Stopwatch {
|
||||||
|
s := &Stopwatch{
|
||||||
|
resolution: resolution,
|
||||||
|
}
|
||||||
|
s.Reset()
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset clears all timers and sents the stopwatch to zero
|
||||||
|
func (s *Stopwatch) Reset() {
|
||||||
|
s.t = time.Time{}
|
||||||
|
s.timers = make(map[int]*Timer)
|
||||||
|
s.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tick advances simulated time by the stopwatch's resolution and triggers
|
||||||
|
// all due timers
|
||||||
|
func (s *Stopwatch) Tick() {
|
||||||
|
s.t = s.t.Add(s.resolution)
|
||||||
|
|
||||||
|
s.lock.Lock()
|
||||||
|
defer s.lock.Unlock()
|
||||||
|
|
||||||
|
for id, timer := range s.timers {
|
||||||
|
if s.t.After(timer.deadline) || s.t.Equal(timer.deadline) {
|
||||||
|
timer.signal <- s.t
|
||||||
|
close(timer.signal)
|
||||||
|
delete(s.timers, id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewTimer returns a new timer that will trigger after `duration` elapses in the
|
||||||
|
// simulation
|
||||||
|
func (s *Stopwatch) NewTimer(duration time.Duration) <-chan time.Time {
|
||||||
|
s.lock.Lock()
|
||||||
|
defer s.lock.Unlock()
|
||||||
|
|
||||||
|
s.timerCounter++
|
||||||
|
timer := &Timer{
|
||||||
|
deadline: s.t.Add(duration),
|
||||||
|
signal: make(chan time.Time, 1),
|
||||||
|
id: s.timerCounter,
|
||||||
|
}
|
||||||
|
|
||||||
|
s.timers[timer.id] = timer
|
||||||
|
return timer.signal
|
||||||
|
}
|
||||||
|
|
||||||
|
// TimeAfter returns a simulated timer factory that can replace `time.After`
|
||||||
|
func (s *Stopwatch) TimeAfter() func(d time.Duration) <-chan time.Time {
|
||||||
|
return func(d time.Duration) <-chan time.Time {
|
||||||
|
return s.NewTimer(d)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Elapsed returns the time that has passed in the simulation
|
||||||
|
func (s *Stopwatch) Elapsed() time.Duration {
|
||||||
|
return s.t.Sub(time.Time{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run starts the time simulation
|
||||||
|
func (s *Stopwatch) Run() {
|
||||||
|
go func() {
|
||||||
|
stopSignal := make(chan struct{})
|
||||||
|
s.lock.Lock()
|
||||||
|
if s.stopSignal != nil {
|
||||||
|
close(s.stopSignal)
|
||||||
|
}
|
||||||
|
s.stopSignal = stopSignal
|
||||||
|
s.lock.Unlock()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-time.After(1 * time.Millisecond):
|
||||||
|
s.Tick()
|
||||||
|
case <-stopSignal:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop stops the time simulation
|
||||||
|
func (s *Stopwatch) Stop() {
|
||||||
|
s.lock.Lock()
|
||||||
|
defer s.lock.Unlock()
|
||||||
|
|
||||||
|
if s.stopSignal != nil {
|
||||||
|
close(s.stopSignal)
|
||||||
|
s.stopSignal = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Stopwatch) Measure(measuredFunc func()) time.Duration {
|
||||||
|
s.Reset()
|
||||||
|
s.Run()
|
||||||
|
defer s.Stop()
|
||||||
|
measuredFunc()
|
||||||
|
return s.Elapsed()
|
||||||
|
}
|
@ -30,7 +30,7 @@ func getTestQuery() *Query {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestQueryValues(t *testing.T) {
|
func TestQueryValues(t *testing.T) {
|
||||||
var expected = KV{"hint.level": "25", "hint.time": "1000", "time": "5000", "topic": "0x776f726c64206e657773207265706f72742c20657665727920686f7572000000", "user": "0x876A8936A7Cd0b79Ef0735AD0896c1AFe278781c"}
|
var expected = KV{"hint.level": "31", "hint.time": "1000", "time": "5000", "topic": "0x776f726c64206e657773207265706f72742c20657665727920686f7572000000", "user": "0x876A8936A7Cd0b79Ef0735AD0896c1AFe278781c"}
|
||||||
|
|
||||||
query := getTestQuery()
|
query := getTestQuery()
|
||||||
testValueSerializer(t, query, expected)
|
testValueSerializer(t, query, expected)
|
||||||
|
@ -223,7 +223,7 @@ func TestUpdateChunkSerializationErrorChecking(t *testing.T) {
|
|||||||
t.Fatalf("error creating update chunk:%s", err)
|
t.Fatalf("error creating update chunk:%s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
compareByteSliceToExpectedHex(t, "chunk", chunk.Data(), "0x0000000000000000776f726c64206e657773207265706f72742c20657665727920686f7572000000876a8936a7cd0b79ef0735ad0896c1afe278781ce803000000000019416c206269656e206861636572206a616dc3a173206c652066616c7461207072656d696f5a0ffe0bc27f207cd5b00944c8b9cee93e08b89b5ada777f123ac535189333f174a6a4ca2f43a92c4a477a49d774813c36ce8288552c58e6205b0ac35d0507eb00")
|
compareByteSliceToExpectedHex(t, "chunk", chunk.Data(), "0x0000000000000000776f726c64206e657773207265706f72742c20657665727920686f7572000000876a8936a7cd0b79ef0735ad0896c1afe278781ce80300000000001f416c206269656e206861636572206a616dc3a173206c652066616c7461207072656d696f9896df5937e64e51a7994479ff3fe0ed790d539b9b3e85e93c0014a8a64374f23603c79d16e99b50a757896d3816d7022ac594ad1415679a9b164afb2e5926d801")
|
||||||
|
|
||||||
var recovered Request
|
var recovered Request
|
||||||
recovered.fromChunk(chunk)
|
recovered.fromChunk(chunk)
|
||||||
|
@ -28,7 +28,7 @@ func getTestFeedUpdate() *Update {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestUpdateSerializer(t *testing.T) {
|
func TestUpdateSerializer(t *testing.T) {
|
||||||
testBinarySerializerRecovery(t, getTestFeedUpdate(), "0x0000000000000000776f726c64206e657773207265706f72742c20657665727920686f7572000000876a8936a7cd0b79ef0735ad0896c1afe278781ce803000000000019456c20717565206c6565206d7563686f207920616e6461206d7563686f2c207665206d7563686f20792073616265206d7563686f")
|
testBinarySerializerRecovery(t, getTestFeedUpdate(), "0x0000000000000000776f726c64206e657773207265706f72742c20657665727920686f7572000000876a8936a7cd0b79ef0735ad0896c1afe278781ce80300000000001f456c20717565206c6565206d7563686f207920616e6461206d7563686f2c207665206d7563686f20792073616265206d7563686f")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestUpdateLengthCheck(t *testing.T) {
|
func TestUpdateLengthCheck(t *testing.T) {
|
||||||
|
Loading…
Reference in New Issue
Block a user