diff --git a/swarm/chunk/chunk.go b/swarm/chunk/chunk.go index 2455904f3..9ae59c95f 100644 --- a/swarm/chunk/chunk.go +++ b/swarm/chunk/chunk.go @@ -112,6 +112,19 @@ func Proximity(one, other []byte) (ret int) { // ModeGet enumerates different Getter modes. type ModeGet int +func (m ModeGet) String() string { + switch m { + case ModeGetRequest: + return "Request" + case ModeGetSync: + return "Sync" + case ModeGetLookup: + return "Lookup" + default: + return "Unknown" + } +} + // Getter modes. const ( // ModeGetRequest: when accessed for retrieval @@ -125,6 +138,19 @@ const ( // ModePut enumerates different Putter modes. type ModePut int +func (m ModePut) String() string { + switch m { + case ModePutRequest: + return "Request" + case ModePutSync: + return "Sync" + case ModePutUpload: + return "Upload" + default: + return "Unknown" + } +} + // Putter modes. const ( // ModePutRequest: when a chunk is received as a result of retrieve request and delivery @@ -138,6 +164,19 @@ const ( // ModeSet enumerates different Setter modes. type ModeSet int +func (m ModeSet) String() string { + switch m { + case ModeSetAccess: + return "Access" + case ModeSetSync: + return "Sync" + case ModeSetRemove: + return "Remove" + default: + return "Unknown" + } +} + // Setter modes. const ( // ModeSetAccess: when an update request is received for a chunk or chunk is retrieved for delivery diff --git a/swarm/shed/db.go b/swarm/shed/db.go index 8c11bf48b..6fc520866 100644 --- a/swarm/shed/db.go +++ b/swarm/shed/db.go @@ -45,16 +45,7 @@ const ( // It provides a schema functionality to store fields and indexes // information about naming and types. type DB struct { - ldb *leveldb.DB - - compTimeMeter metrics.Meter // Meter for measuring the total time spent in database compaction - compReadMeter metrics.Meter // Meter for measuring the data read during compaction - compWriteMeter metrics.Meter // Meter for measuring the data written during compaction - writeDelayNMeter metrics.Meter // Meter for measuring the write delay number due to database compaction - writeDelayMeter metrics.Meter // Meter for measuring the write delay duration due to database compaction - diskReadMeter metrics.Meter // Meter for measuring the effective amount of data read - diskWriteMeter metrics.Meter // Meter for measuring the effective amount of data written - + ldb *leveldb.DB quit chan struct{} // Quit channel to stop the metrics collection before closing the database } @@ -86,13 +77,10 @@ func NewDB(path string, metricsPrefix string) (db *DB, err error) { } } - // Configure meters for DB - db.configure(metricsPrefix) - // Create a quit channel for the periodic metrics collector and run it db.quit = make(chan struct{}) - go db.meter(10 * time.Second) + go db.meter(metricsPrefix, 10*time.Second) return db, nil } @@ -169,19 +157,22 @@ func (db *DB) Close() (err error) { return db.ldb.Close() } -// Configure configures the database metrics collectors -func (db *DB) configure(prefix string) { - // Initialize all the metrics collector at the requested prefix - db.compTimeMeter = metrics.NewRegisteredMeter(prefix+"compact/time", nil) - db.compReadMeter = metrics.NewRegisteredMeter(prefix+"compact/input", nil) - db.compWriteMeter = metrics.NewRegisteredMeter(prefix+"compact/output", nil) - db.diskReadMeter = metrics.NewRegisteredMeter(prefix+"disk/read", nil) - db.diskWriteMeter = metrics.NewRegisteredMeter(prefix+"disk/write", nil) - db.writeDelayMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/duration", nil) - db.writeDelayNMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/counter", nil) -} +func (db *DB) meter(prefix string, refresh time.Duration) { + // Meter for measuring the total time spent in database compaction + compTimeMeter := metrics.NewRegisteredMeter(prefix+"compact/time", nil) + // Meter for measuring the data read during compaction + compReadMeter := metrics.NewRegisteredMeter(prefix+"compact/input", nil) + // Meter for measuring the data written during compaction + compWriteMeter := metrics.NewRegisteredMeter(prefix+"compact/output", nil) + // Meter for measuring the write delay number due to database compaction + writeDelayMeter := metrics.NewRegisteredMeter(prefix+"compact/writedelay/duration", nil) + // Meter for measuring the write delay duration due to database compaction + writeDelayNMeter := metrics.NewRegisteredMeter(prefix+"compact/writedelay/counter", nil) + // Meter for measuring the effective amount of data read + diskReadMeter := metrics.NewRegisteredMeter(prefix+"disk/read", nil) + // Meter for measuring the effective amount of data written + diskWriteMeter := metrics.NewRegisteredMeter(prefix+"disk/write", nil) -func (db *DB) meter(refresh time.Duration) { // Create the counters to store current and previous compaction values compactions := make([][]float64, 2) for i := 0; i < 2; i++ { @@ -234,14 +225,14 @@ func (db *DB) meter(refresh time.Duration) { } } // Update all the requested meters - if db.compTimeMeter != nil { - db.compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000)) + if compTimeMeter != nil { + compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000)) } - if db.compReadMeter != nil { - db.compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024)) + if compReadMeter != nil { + compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024)) } - if db.compWriteMeter != nil { - db.compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024)) + if compWriteMeter != nil { + compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024)) } // Retrieve the write delay statistic @@ -265,11 +256,11 @@ func (db *DB) meter(refresh time.Duration) { log.Error("Failed to parse delay duration", "err", err) continue } - if db.writeDelayNMeter != nil { - db.writeDelayNMeter.Mark(delayN - delaystats[0]) + if writeDelayNMeter != nil { + writeDelayNMeter.Mark(delayN - delaystats[0]) } - if db.writeDelayMeter != nil { - db.writeDelayMeter.Mark(duration.Nanoseconds() - delaystats[1]) + if writeDelayMeter != nil { + writeDelayMeter.Mark(duration.Nanoseconds() - delaystats[1]) } // If a warning that db is performing compaction has been displayed, any subsequent // warnings will be withheld for one minute not to overwhelm the user. @@ -300,11 +291,11 @@ func (db *DB) meter(refresh time.Duration) { log.Error("Bad syntax of write entry", "entry", parts[1]) continue } - if db.diskReadMeter != nil { - db.diskReadMeter.Mark(int64((nRead - iostats[0]) * 1024 * 1024)) + if diskReadMeter != nil { + diskReadMeter.Mark(int64((nRead - iostats[0]) * 1024 * 1024)) } - if db.diskWriteMeter != nil { - db.diskWriteMeter.Mark(int64((nWrite - iostats[1]) * 1024 * 1024)) + if diskWriteMeter != nil { + diskWriteMeter.Mark(int64((nWrite - iostats[1]) * 1024 * 1024)) } iostats[0], iostats[1] = nRead, nWrite diff --git a/swarm/storage/localstore/gc.go b/swarm/storage/localstore/gc.go index 84c4f596d..28c7b6db9 100644 --- a/swarm/storage/localstore/gc.go +++ b/swarm/storage/localstore/gc.go @@ -17,7 +17,10 @@ package localstore import ( + "time" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/shed" "github.com/syndtr/goleveldb/leveldb" ) @@ -75,6 +78,15 @@ func (db *DB) collectGarbageWorker() { // the rest of the garbage as the batch size limit is reached. // This function is called in collectGarbageWorker. func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { + metricName := "localstore.gc" + metrics.GetOrRegisterCounter(metricName, nil).Inc(1) + defer totalTimeMetric(metricName, time.Now()) + defer func() { + if err != nil { + metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1) + } + }() + batch := new(leveldb.Batch) target := db.gcTarget() diff --git a/swarm/storage/localstore/localstore.go b/swarm/storage/localstore/localstore.go index 56a6d10e6..c32d2972d 100644 --- a/swarm/storage/localstore/localstore.go +++ b/swarm/storage/localstore/localstore.go @@ -23,6 +23,7 @@ import ( "time" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/shed" "github.com/ethereum/go-ethereum/swarm/storage/mock" @@ -388,3 +389,12 @@ func init() { return time.Now().UTC().UnixNano() } } + +// totalTimeMetric logs a message about time between provided start time +// and the time when the function is called and sends a resetting timer metric +// with provided name appended with ".total-time". +func totalTimeMetric(name string, start time.Time) { + totalTime := time.Since(start) + log.Trace(name+" total time", "time", totalTime) + metrics.GetOrRegisterResettingTimer(name+".total-time", nil).Update(totalTime) +} diff --git a/swarm/storage/localstore/mode_get.go b/swarm/storage/localstore/mode_get.go index 0df0e9b7d..48603550c 100644 --- a/swarm/storage/localstore/mode_get.go +++ b/swarm/storage/localstore/mode_get.go @@ -18,10 +18,15 @@ package localstore import ( "context" + "fmt" + "time" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/shed" + "github.com/ethereum/go-ethereum/swarm/spancontext" + olog "github.com/opentracing/opentracing-go/log" "github.com/syndtr/goleveldb/leveldb" ) @@ -30,7 +35,22 @@ import ( // All required indexes will be updated required by the // Getter Mode. Get is required to implement chunk.Store // interface. -func (db *DB) Get(_ context.Context, mode chunk.ModeGet, addr chunk.Address) (ch chunk.Chunk, err error) { +func (db *DB) Get(ctx context.Context, mode chunk.ModeGet, addr chunk.Address) (ch chunk.Chunk, err error) { + metricName := fmt.Sprintf("localstore.Get.%s", mode) + + ctx, sp := spancontext.StartSpan(ctx, metricName) + defer sp.Finish() + sp.LogFields(olog.String("ref", addr.String()), olog.String("mode-get", mode.String())) + + metrics.GetOrRegisterCounter(metricName, nil).Inc(1) + defer totalTimeMetric(metricName, time.Now()) + + defer func() { + if err != nil { + metrics.GetOrRegisterCounter(fmt.Sprintf(metricName+".error", mode), nil).Inc(1) + } + }() + out, err := db.get(mode, addr) if err != nil { if err == leveldb.ErrNotFound { @@ -66,8 +86,14 @@ func (db *DB) get(mode chunk.ModeGet, addr chunk.Address) (out shed.Item, err er // for a new goroutine defer func() { <-db.updateGCSem }() } + + metricName := "localstore.updateGC" + metrics.GetOrRegisterCounter(metricName, nil).Inc(1) + defer totalTimeMetric(metricName, time.Now()) + err := db.updateGC(out) if err != nil { + metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1) log.Error("localstore update gc", "err", err) } // if gc update hook is defined, call it diff --git a/swarm/storage/localstore/mode_has.go b/swarm/storage/localstore/mode_has.go index fea8a50bf..ae1a8970a 100644 --- a/swarm/storage/localstore/mode_has.go +++ b/swarm/storage/localstore/mode_has.go @@ -18,11 +18,28 @@ package localstore import ( "context" + "time" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/chunk" + "github.com/ethereum/go-ethereum/swarm/spancontext" + olog "github.com/opentracing/opentracing-go/log" ) // Has returns true if the chunk is stored in database. -func (db *DB) Has(_ context.Context, addr chunk.Address) (bool, error) { - return db.retrievalDataIndex.Has(addressToItem(addr)) +func (db *DB) Has(ctx context.Context, addr chunk.Address) (bool, error) { + metricName := "localstore.Has" + + ctx, sp := spancontext.StartSpan(ctx, metricName) + defer sp.Finish() + sp.LogFields(olog.String("ref", addr.String())) + + metrics.GetOrRegisterCounter(metricName, nil).Inc(1) + defer totalTimeMetric(metricName, time.Now()) + + has, err := db.retrievalDataIndex.Has(addressToItem(addr)) + if err != nil { + metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1) + } + return has, err } diff --git a/swarm/storage/localstore/mode_put.go b/swarm/storage/localstore/mode_put.go index 488e4d8e1..c91a394a0 100644 --- a/swarm/storage/localstore/mode_put.go +++ b/swarm/storage/localstore/mode_put.go @@ -18,9 +18,14 @@ package localstore import ( "context" + "fmt" + "time" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/shed" + "github.com/ethereum/go-ethereum/swarm/spancontext" + olog "github.com/opentracing/opentracing-go/log" "github.com/syndtr/goleveldb/leveldb" ) @@ -28,8 +33,21 @@ import ( // on the Putter mode, it updates required indexes. // Put is required to implement chunk.Store // interface. -func (db *DB) Put(_ context.Context, mode chunk.ModePut, ch chunk.Chunk) (exists bool, err error) { - return db.put(mode, chunkToItem(ch)) +func (db *DB) Put(ctx context.Context, mode chunk.ModePut, ch chunk.Chunk) (exists bool, err error) { + metricName := fmt.Sprintf("localstore.Put.%s", mode) + + ctx, sp := spancontext.StartSpan(ctx, metricName) + defer sp.Finish() + sp.LogFields(olog.String("ref", ch.Address().String()), olog.String("mode-put", mode.String())) + + metrics.GetOrRegisterCounter(metricName, nil).Inc(1) + defer totalTimeMetric(metricName, time.Now()) + + exists, err = db.put(mode, chunkToItem(ch)) + if err != nil { + metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1) + } + return exists, err } // put stores Item to database and updates other diff --git a/swarm/storage/localstore/mode_set.go b/swarm/storage/localstore/mode_set.go index 13e98d1ec..7edfa6703 100644 --- a/swarm/storage/localstore/mode_set.go +++ b/swarm/storage/localstore/mode_set.go @@ -18,8 +18,13 @@ package localstore import ( "context" + "fmt" + "time" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/chunk" + "github.com/ethereum/go-ethereum/swarm/spancontext" + olog "github.com/opentracing/opentracing-go/log" "github.com/syndtr/goleveldb/leveldb" ) @@ -27,8 +32,21 @@ import ( // chunk represented by the address. // Set is required to implement chunk.Store // interface. -func (db *DB) Set(_ context.Context, mode chunk.ModeSet, addr chunk.Address) (err error) { - return db.set(mode, addr) +func (db *DB) Set(ctx context.Context, mode chunk.ModeSet, addr chunk.Address) (err error) { + metricName := fmt.Sprintf("localstore.Set.%s", mode) + + ctx, sp := spancontext.StartSpan(ctx, metricName) + defer sp.Finish() + sp.LogFields(olog.String("ref", addr.String()), olog.String("mode-set", mode.String())) + + metrics.GetOrRegisterCounter(metricName, nil).Inc(1) + defer totalTimeMetric(metricName, time.Now()) + + err = db.set(mode, addr) + if err != nil { + metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1) + } + return err } // set updates database indexes for a specific diff --git a/swarm/storage/localstore/subscription_pull.go b/swarm/storage/localstore/subscription_pull.go index fd81b045b..7a18141b3 100644 --- a/swarm/storage/localstore/subscription_pull.go +++ b/swarm/storage/localstore/subscription_pull.go @@ -20,10 +20,15 @@ import ( "context" "errors" "sync" + "time" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/shed" + "github.com/ethereum/go-ethereum/swarm/spancontext" + "github.com/opentracing/opentracing-go" + olog "github.com/opentracing/opentracing-go/log" "github.com/syndtr/goleveldb/leveldb" ) @@ -36,6 +41,9 @@ import ( // Make sure that you check the second returned parameter from the channel to stop iteration when its value // is false. func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) (c <-chan chunk.Descriptor, stop func()) { + metricName := "localstore.SubscribePull" + metrics.GetOrRegisterCounter(metricName, nil).Inc(1) + chunkDescriptors := make(chan chunk.Descriptor) trigger := make(chan struct{}, 1) @@ -57,6 +65,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) var errStopSubscription = errors.New("stop subscription") go func() { + defer metrics.GetOrRegisterCounter(metricName+".stop", nil).Inc(1) // close the returned chunk.Descriptor channel at the end to // signal that the subscription is done defer close(chunkDescriptors) @@ -77,12 +86,20 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) // - last index Item is reached // - subscription stop is called // - context is done + metrics.GetOrRegisterCounter(metricName+".iter", nil).Inc(1) + + ctx, sp := spancontext.StartSpan(ctx, metricName+".iter") + sp.LogFields(olog.Int("bin", int(bin)), olog.Uint64("since", since), olog.Uint64("until", until)) + + iterStart := time.Now() + var count int err := db.pullIndex.Iterate(func(item shed.Item) (stop bool, err error) { select { case chunkDescriptors <- chunk.Descriptor{ Address: item.Address, BinID: item.BinID, }: + count++ // until chunk descriptor is sent // break the iteration if until > 0 && item.BinID >= until { @@ -111,12 +128,25 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) SkipStartFromItem: !first, Prefix: []byte{bin}, }) + + totalTimeMetric(metricName+".iter", iterStart) + + sp.FinishWithOptions(opentracing.FinishOptions{ + LogRecords: []opentracing.LogRecord{ + { + Timestamp: time.Now(), + Fields: []olog.Field{olog.Int("count", count)}, + }, + }, + }) + if err != nil { if err == errStopSubscription { // stop subscription without any errors // if until is reached return } + metrics.GetOrRegisterCounter(metricName+".iter.error", nil).Inc(1) log.Error("localstore pull subscription iteration", "bin", bin, "since", since, "until", until, "err", err) return } @@ -162,6 +192,8 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) // in pull syncing index for a provided bin. If there are no chunks in // that bin, 0 value is returned. func (db *DB) LastPullSubscriptionBinID(bin uint8) (id uint64, err error) { + metrics.GetOrRegisterCounter("localstore.LastPullSubscriptionBinID", nil).Inc(1) + item, err := db.pullIndex.Last([]byte{bin}) if err != nil { if err == leveldb.ErrNotFound { diff --git a/swarm/storage/localstore/subscription_push.go b/swarm/storage/localstore/subscription_push.go index 5cbc2eb6f..f2463af2a 100644 --- a/swarm/storage/localstore/subscription_push.go +++ b/swarm/storage/localstore/subscription_push.go @@ -19,10 +19,15 @@ package localstore import ( "context" "sync" + "time" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/shed" + "github.com/ethereum/go-ethereum/swarm/spancontext" + "github.com/opentracing/opentracing-go" + olog "github.com/opentracing/opentracing-go/log" ) // SubscribePush returns a channel that provides storage chunks with ordering from push syncing index. @@ -30,6 +35,9 @@ import ( // the returned channel without any errors. Make sure that you check the second returned parameter // from the channel to stop iteration when its value is false. func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop func()) { + metricName := "localstore.SubscribePush" + metrics.GetOrRegisterCounter(metricName, nil).Inc(1) + chunks := make(chan chunk.Chunk) trigger := make(chan struct{}, 1) @@ -44,6 +52,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun var stopChanOnce sync.Once go func() { + defer metrics.GetOrRegisterCounter(metricName+".done", nil).Inc(1) // close the returned chunkInfo channel at the end to // signal that the subscription is done defer close(chunks) @@ -57,6 +66,12 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun // - last index Item is reached // - subscription stop is called // - context is done + metrics.GetOrRegisterCounter(metricName+".iter", nil).Inc(1) + + ctx, sp := spancontext.StartSpan(ctx, metricName+".iter") + + iterStart := time.Now() + var count int err := db.pushIndex.Iterate(func(item shed.Item) (stop bool, err error) { // get chunk data dataItem, err := db.retrievalDataIndex.Get(item) @@ -66,6 +81,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun select { case chunks <- chunk.NewChunk(dataItem.Address, dataItem.Data): + count++ // set next iteration start item // when its chunk is successfully sent to channel sinceItem = &item @@ -87,7 +103,20 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun // iterator call, skip it in this one SkipStartFromItem: true, }) + + totalTimeMetric(metricName+".iter", iterStart) + + sp.FinishWithOptions(opentracing.FinishOptions{ + LogRecords: []opentracing.LogRecord{ + { + Timestamp: time.Now(), + Fields: []olog.Field{olog.Int("count", count)}, + }, + }, + }) + if err != nil { + metrics.GetOrRegisterCounter(metricName+".iter.error", nil).Inc(1) log.Error("localstore push subscription iteration", "err", err) return }