swarm: LocalStore metrics
* swarm/shed: remove metrics fields from DB struct * swarm/schunk: add String methods to modes * swarm/storage/localstore: add metrics and traces * swarm/chunk: unknown modes without spaces in String methods * swarm/storage/localstore: remove bin number from pull subscription metrics * swarm/storage/localstore: add resetting time metrics and code improvements
This commit is contained in:
parent
993b145f25
commit
c1213bd00c
@ -112,6 +112,19 @@ func Proximity(one, other []byte) (ret int) {
|
|||||||
// ModeGet enumerates different Getter modes.
|
// ModeGet enumerates different Getter modes.
|
||||||
type ModeGet int
|
type ModeGet int
|
||||||
|
|
||||||
|
func (m ModeGet) String() string {
|
||||||
|
switch m {
|
||||||
|
case ModeGetRequest:
|
||||||
|
return "Request"
|
||||||
|
case ModeGetSync:
|
||||||
|
return "Sync"
|
||||||
|
case ModeGetLookup:
|
||||||
|
return "Lookup"
|
||||||
|
default:
|
||||||
|
return "Unknown"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Getter modes.
|
// Getter modes.
|
||||||
const (
|
const (
|
||||||
// ModeGetRequest: when accessed for retrieval
|
// ModeGetRequest: when accessed for retrieval
|
||||||
@ -125,6 +138,19 @@ const (
|
|||||||
// ModePut enumerates different Putter modes.
|
// ModePut enumerates different Putter modes.
|
||||||
type ModePut int
|
type ModePut int
|
||||||
|
|
||||||
|
func (m ModePut) String() string {
|
||||||
|
switch m {
|
||||||
|
case ModePutRequest:
|
||||||
|
return "Request"
|
||||||
|
case ModePutSync:
|
||||||
|
return "Sync"
|
||||||
|
case ModePutUpload:
|
||||||
|
return "Upload"
|
||||||
|
default:
|
||||||
|
return "Unknown"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Putter modes.
|
// Putter modes.
|
||||||
const (
|
const (
|
||||||
// ModePutRequest: when a chunk is received as a result of retrieve request and delivery
|
// ModePutRequest: when a chunk is received as a result of retrieve request and delivery
|
||||||
@ -138,6 +164,19 @@ const (
|
|||||||
// ModeSet enumerates different Setter modes.
|
// ModeSet enumerates different Setter modes.
|
||||||
type ModeSet int
|
type ModeSet int
|
||||||
|
|
||||||
|
func (m ModeSet) String() string {
|
||||||
|
switch m {
|
||||||
|
case ModeSetAccess:
|
||||||
|
return "Access"
|
||||||
|
case ModeSetSync:
|
||||||
|
return "Sync"
|
||||||
|
case ModeSetRemove:
|
||||||
|
return "Remove"
|
||||||
|
default:
|
||||||
|
return "Unknown"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Setter modes.
|
// Setter modes.
|
||||||
const (
|
const (
|
||||||
// ModeSetAccess: when an update request is received for a chunk or chunk is retrieved for delivery
|
// ModeSetAccess: when an update request is received for a chunk or chunk is retrieved for delivery
|
||||||
|
@ -45,16 +45,7 @@ const (
|
|||||||
// It provides a schema functionality to store fields and indexes
|
// It provides a schema functionality to store fields and indexes
|
||||||
// information about naming and types.
|
// information about naming and types.
|
||||||
type DB struct {
|
type DB struct {
|
||||||
ldb *leveldb.DB
|
ldb *leveldb.DB
|
||||||
|
|
||||||
compTimeMeter metrics.Meter // Meter for measuring the total time spent in database compaction
|
|
||||||
compReadMeter metrics.Meter // Meter for measuring the data read during compaction
|
|
||||||
compWriteMeter metrics.Meter // Meter for measuring the data written during compaction
|
|
||||||
writeDelayNMeter metrics.Meter // Meter for measuring the write delay number due to database compaction
|
|
||||||
writeDelayMeter metrics.Meter // Meter for measuring the write delay duration due to database compaction
|
|
||||||
diskReadMeter metrics.Meter // Meter for measuring the effective amount of data read
|
|
||||||
diskWriteMeter metrics.Meter // Meter for measuring the effective amount of data written
|
|
||||||
|
|
||||||
quit chan struct{} // Quit channel to stop the metrics collection before closing the database
|
quit chan struct{} // Quit channel to stop the metrics collection before closing the database
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -86,13 +77,10 @@ func NewDB(path string, metricsPrefix string) (db *DB, err error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Configure meters for DB
|
|
||||||
db.configure(metricsPrefix)
|
|
||||||
|
|
||||||
// Create a quit channel for the periodic metrics collector and run it
|
// Create a quit channel for the periodic metrics collector and run it
|
||||||
db.quit = make(chan struct{})
|
db.quit = make(chan struct{})
|
||||||
|
|
||||||
go db.meter(10 * time.Second)
|
go db.meter(metricsPrefix, 10*time.Second)
|
||||||
|
|
||||||
return db, nil
|
return db, nil
|
||||||
}
|
}
|
||||||
@ -169,19 +157,22 @@ func (db *DB) Close() (err error) {
|
|||||||
return db.ldb.Close()
|
return db.ldb.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Configure configures the database metrics collectors
|
func (db *DB) meter(prefix string, refresh time.Duration) {
|
||||||
func (db *DB) configure(prefix string) {
|
// Meter for measuring the total time spent in database compaction
|
||||||
// Initialize all the metrics collector at the requested prefix
|
compTimeMeter := metrics.NewRegisteredMeter(prefix+"compact/time", nil)
|
||||||
db.compTimeMeter = metrics.NewRegisteredMeter(prefix+"compact/time", nil)
|
// Meter for measuring the data read during compaction
|
||||||
db.compReadMeter = metrics.NewRegisteredMeter(prefix+"compact/input", nil)
|
compReadMeter := metrics.NewRegisteredMeter(prefix+"compact/input", nil)
|
||||||
db.compWriteMeter = metrics.NewRegisteredMeter(prefix+"compact/output", nil)
|
// Meter for measuring the data written during compaction
|
||||||
db.diskReadMeter = metrics.NewRegisteredMeter(prefix+"disk/read", nil)
|
compWriteMeter := metrics.NewRegisteredMeter(prefix+"compact/output", nil)
|
||||||
db.diskWriteMeter = metrics.NewRegisteredMeter(prefix+"disk/write", nil)
|
// Meter for measuring the write delay number due to database compaction
|
||||||
db.writeDelayMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/duration", nil)
|
writeDelayMeter := metrics.NewRegisteredMeter(prefix+"compact/writedelay/duration", nil)
|
||||||
db.writeDelayNMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/counter", nil)
|
// Meter for measuring the write delay duration due to database compaction
|
||||||
}
|
writeDelayNMeter := metrics.NewRegisteredMeter(prefix+"compact/writedelay/counter", nil)
|
||||||
|
// Meter for measuring the effective amount of data read
|
||||||
|
diskReadMeter := metrics.NewRegisteredMeter(prefix+"disk/read", nil)
|
||||||
|
// Meter for measuring the effective amount of data written
|
||||||
|
diskWriteMeter := metrics.NewRegisteredMeter(prefix+"disk/write", nil)
|
||||||
|
|
||||||
func (db *DB) meter(refresh time.Duration) {
|
|
||||||
// Create the counters to store current and previous compaction values
|
// Create the counters to store current and previous compaction values
|
||||||
compactions := make([][]float64, 2)
|
compactions := make([][]float64, 2)
|
||||||
for i := 0; i < 2; i++ {
|
for i := 0; i < 2; i++ {
|
||||||
@ -234,14 +225,14 @@ func (db *DB) meter(refresh time.Duration) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Update all the requested meters
|
// Update all the requested meters
|
||||||
if db.compTimeMeter != nil {
|
if compTimeMeter != nil {
|
||||||
db.compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000))
|
compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000))
|
||||||
}
|
}
|
||||||
if db.compReadMeter != nil {
|
if compReadMeter != nil {
|
||||||
db.compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024))
|
compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024))
|
||||||
}
|
}
|
||||||
if db.compWriteMeter != nil {
|
if compWriteMeter != nil {
|
||||||
db.compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024))
|
compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Retrieve the write delay statistic
|
// Retrieve the write delay statistic
|
||||||
@ -265,11 +256,11 @@ func (db *DB) meter(refresh time.Duration) {
|
|||||||
log.Error("Failed to parse delay duration", "err", err)
|
log.Error("Failed to parse delay duration", "err", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if db.writeDelayNMeter != nil {
|
if writeDelayNMeter != nil {
|
||||||
db.writeDelayNMeter.Mark(delayN - delaystats[0])
|
writeDelayNMeter.Mark(delayN - delaystats[0])
|
||||||
}
|
}
|
||||||
if db.writeDelayMeter != nil {
|
if writeDelayMeter != nil {
|
||||||
db.writeDelayMeter.Mark(duration.Nanoseconds() - delaystats[1])
|
writeDelayMeter.Mark(duration.Nanoseconds() - delaystats[1])
|
||||||
}
|
}
|
||||||
// If a warning that db is performing compaction has been displayed, any subsequent
|
// If a warning that db is performing compaction has been displayed, any subsequent
|
||||||
// warnings will be withheld for one minute not to overwhelm the user.
|
// warnings will be withheld for one minute not to overwhelm the user.
|
||||||
@ -300,11 +291,11 @@ func (db *DB) meter(refresh time.Duration) {
|
|||||||
log.Error("Bad syntax of write entry", "entry", parts[1])
|
log.Error("Bad syntax of write entry", "entry", parts[1])
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if db.diskReadMeter != nil {
|
if diskReadMeter != nil {
|
||||||
db.diskReadMeter.Mark(int64((nRead - iostats[0]) * 1024 * 1024))
|
diskReadMeter.Mark(int64((nRead - iostats[0]) * 1024 * 1024))
|
||||||
}
|
}
|
||||||
if db.diskWriteMeter != nil {
|
if diskWriteMeter != nil {
|
||||||
db.diskWriteMeter.Mark(int64((nWrite - iostats[1]) * 1024 * 1024))
|
diskWriteMeter.Mark(int64((nWrite - iostats[1]) * 1024 * 1024))
|
||||||
}
|
}
|
||||||
iostats[0], iostats[1] = nRead, nWrite
|
iostats[0], iostats[1] = nRead, nWrite
|
||||||
|
|
||||||
|
@ -17,7 +17,10 @@
|
|||||||
package localstore
|
package localstore
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
|
"github.com/ethereum/go-ethereum/metrics"
|
||||||
"github.com/ethereum/go-ethereum/swarm/shed"
|
"github.com/ethereum/go-ethereum/swarm/shed"
|
||||||
"github.com/syndtr/goleveldb/leveldb"
|
"github.com/syndtr/goleveldb/leveldb"
|
||||||
)
|
)
|
||||||
@ -75,6 +78,15 @@ func (db *DB) collectGarbageWorker() {
|
|||||||
// the rest of the garbage as the batch size limit is reached.
|
// the rest of the garbage as the batch size limit is reached.
|
||||||
// This function is called in collectGarbageWorker.
|
// This function is called in collectGarbageWorker.
|
||||||
func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
|
func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) {
|
||||||
|
metricName := "localstore.gc"
|
||||||
|
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
|
||||||
|
defer totalTimeMetric(metricName, time.Now())
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
batch := new(leveldb.Batch)
|
batch := new(leveldb.Batch)
|
||||||
target := db.gcTarget()
|
target := db.gcTarget()
|
||||||
|
|
||||||
|
@ -23,6 +23,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
|
"github.com/ethereum/go-ethereum/metrics"
|
||||||
"github.com/ethereum/go-ethereum/swarm/chunk"
|
"github.com/ethereum/go-ethereum/swarm/chunk"
|
||||||
"github.com/ethereum/go-ethereum/swarm/shed"
|
"github.com/ethereum/go-ethereum/swarm/shed"
|
||||||
"github.com/ethereum/go-ethereum/swarm/storage/mock"
|
"github.com/ethereum/go-ethereum/swarm/storage/mock"
|
||||||
@ -388,3 +389,12 @@ func init() {
|
|||||||
return time.Now().UTC().UnixNano()
|
return time.Now().UTC().UnixNano()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// totalTimeMetric logs a message about time between provided start time
|
||||||
|
// and the time when the function is called and sends a resetting timer metric
|
||||||
|
// with provided name appended with ".total-time".
|
||||||
|
func totalTimeMetric(name string, start time.Time) {
|
||||||
|
totalTime := time.Since(start)
|
||||||
|
log.Trace(name+" total time", "time", totalTime)
|
||||||
|
metrics.GetOrRegisterResettingTimer(name+".total-time", nil).Update(totalTime)
|
||||||
|
}
|
||||||
|
@ -18,10 +18,15 @@ package localstore
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
|
"github.com/ethereum/go-ethereum/metrics"
|
||||||
"github.com/ethereum/go-ethereum/swarm/chunk"
|
"github.com/ethereum/go-ethereum/swarm/chunk"
|
||||||
"github.com/ethereum/go-ethereum/swarm/shed"
|
"github.com/ethereum/go-ethereum/swarm/shed"
|
||||||
|
"github.com/ethereum/go-ethereum/swarm/spancontext"
|
||||||
|
olog "github.com/opentracing/opentracing-go/log"
|
||||||
"github.com/syndtr/goleveldb/leveldb"
|
"github.com/syndtr/goleveldb/leveldb"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -30,7 +35,22 @@ import (
|
|||||||
// All required indexes will be updated required by the
|
// All required indexes will be updated required by the
|
||||||
// Getter Mode. Get is required to implement chunk.Store
|
// Getter Mode. Get is required to implement chunk.Store
|
||||||
// interface.
|
// interface.
|
||||||
func (db *DB) Get(_ context.Context, mode chunk.ModeGet, addr chunk.Address) (ch chunk.Chunk, err error) {
|
func (db *DB) Get(ctx context.Context, mode chunk.ModeGet, addr chunk.Address) (ch chunk.Chunk, err error) {
|
||||||
|
metricName := fmt.Sprintf("localstore.Get.%s", mode)
|
||||||
|
|
||||||
|
ctx, sp := spancontext.StartSpan(ctx, metricName)
|
||||||
|
defer sp.Finish()
|
||||||
|
sp.LogFields(olog.String("ref", addr.String()), olog.String("mode-get", mode.String()))
|
||||||
|
|
||||||
|
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
|
||||||
|
defer totalTimeMetric(metricName, time.Now())
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
metrics.GetOrRegisterCounter(fmt.Sprintf(metricName+".error", mode), nil).Inc(1)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
out, err := db.get(mode, addr)
|
out, err := db.get(mode, addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == leveldb.ErrNotFound {
|
if err == leveldb.ErrNotFound {
|
||||||
@ -66,8 +86,14 @@ func (db *DB) get(mode chunk.ModeGet, addr chunk.Address) (out shed.Item, err er
|
|||||||
// for a new goroutine
|
// for a new goroutine
|
||||||
defer func() { <-db.updateGCSem }()
|
defer func() { <-db.updateGCSem }()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metricName := "localstore.updateGC"
|
||||||
|
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
|
||||||
|
defer totalTimeMetric(metricName, time.Now())
|
||||||
|
|
||||||
err := db.updateGC(out)
|
err := db.updateGC(out)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
|
||||||
log.Error("localstore update gc", "err", err)
|
log.Error("localstore update gc", "err", err)
|
||||||
}
|
}
|
||||||
// if gc update hook is defined, call it
|
// if gc update hook is defined, call it
|
||||||
|
@ -18,11 +18,28 @@ package localstore
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/metrics"
|
||||||
"github.com/ethereum/go-ethereum/swarm/chunk"
|
"github.com/ethereum/go-ethereum/swarm/chunk"
|
||||||
|
"github.com/ethereum/go-ethereum/swarm/spancontext"
|
||||||
|
olog "github.com/opentracing/opentracing-go/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Has returns true if the chunk is stored in database.
|
// Has returns true if the chunk is stored in database.
|
||||||
func (db *DB) Has(_ context.Context, addr chunk.Address) (bool, error) {
|
func (db *DB) Has(ctx context.Context, addr chunk.Address) (bool, error) {
|
||||||
return db.retrievalDataIndex.Has(addressToItem(addr))
|
metricName := "localstore.Has"
|
||||||
|
|
||||||
|
ctx, sp := spancontext.StartSpan(ctx, metricName)
|
||||||
|
defer sp.Finish()
|
||||||
|
sp.LogFields(olog.String("ref", addr.String()))
|
||||||
|
|
||||||
|
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
|
||||||
|
defer totalTimeMetric(metricName, time.Now())
|
||||||
|
|
||||||
|
has, err := db.retrievalDataIndex.Has(addressToItem(addr))
|
||||||
|
if err != nil {
|
||||||
|
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
|
||||||
|
}
|
||||||
|
return has, err
|
||||||
}
|
}
|
||||||
|
@ -18,9 +18,14 @@ package localstore
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/metrics"
|
||||||
"github.com/ethereum/go-ethereum/swarm/chunk"
|
"github.com/ethereum/go-ethereum/swarm/chunk"
|
||||||
"github.com/ethereum/go-ethereum/swarm/shed"
|
"github.com/ethereum/go-ethereum/swarm/shed"
|
||||||
|
"github.com/ethereum/go-ethereum/swarm/spancontext"
|
||||||
|
olog "github.com/opentracing/opentracing-go/log"
|
||||||
"github.com/syndtr/goleveldb/leveldb"
|
"github.com/syndtr/goleveldb/leveldb"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -28,8 +33,21 @@ import (
|
|||||||
// on the Putter mode, it updates required indexes.
|
// on the Putter mode, it updates required indexes.
|
||||||
// Put is required to implement chunk.Store
|
// Put is required to implement chunk.Store
|
||||||
// interface.
|
// interface.
|
||||||
func (db *DB) Put(_ context.Context, mode chunk.ModePut, ch chunk.Chunk) (exists bool, err error) {
|
func (db *DB) Put(ctx context.Context, mode chunk.ModePut, ch chunk.Chunk) (exists bool, err error) {
|
||||||
return db.put(mode, chunkToItem(ch))
|
metricName := fmt.Sprintf("localstore.Put.%s", mode)
|
||||||
|
|
||||||
|
ctx, sp := spancontext.StartSpan(ctx, metricName)
|
||||||
|
defer sp.Finish()
|
||||||
|
sp.LogFields(olog.String("ref", ch.Address().String()), olog.String("mode-put", mode.String()))
|
||||||
|
|
||||||
|
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
|
||||||
|
defer totalTimeMetric(metricName, time.Now())
|
||||||
|
|
||||||
|
exists, err = db.put(mode, chunkToItem(ch))
|
||||||
|
if err != nil {
|
||||||
|
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
|
||||||
|
}
|
||||||
|
return exists, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// put stores Item to database and updates other
|
// put stores Item to database and updates other
|
||||||
|
@ -18,8 +18,13 @@ package localstore
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/metrics"
|
||||||
"github.com/ethereum/go-ethereum/swarm/chunk"
|
"github.com/ethereum/go-ethereum/swarm/chunk"
|
||||||
|
"github.com/ethereum/go-ethereum/swarm/spancontext"
|
||||||
|
olog "github.com/opentracing/opentracing-go/log"
|
||||||
"github.com/syndtr/goleveldb/leveldb"
|
"github.com/syndtr/goleveldb/leveldb"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -27,8 +32,21 @@ import (
|
|||||||
// chunk represented by the address.
|
// chunk represented by the address.
|
||||||
// Set is required to implement chunk.Store
|
// Set is required to implement chunk.Store
|
||||||
// interface.
|
// interface.
|
||||||
func (db *DB) Set(_ context.Context, mode chunk.ModeSet, addr chunk.Address) (err error) {
|
func (db *DB) Set(ctx context.Context, mode chunk.ModeSet, addr chunk.Address) (err error) {
|
||||||
return db.set(mode, addr)
|
metricName := fmt.Sprintf("localstore.Set.%s", mode)
|
||||||
|
|
||||||
|
ctx, sp := spancontext.StartSpan(ctx, metricName)
|
||||||
|
defer sp.Finish()
|
||||||
|
sp.LogFields(olog.String("ref", addr.String()), olog.String("mode-set", mode.String()))
|
||||||
|
|
||||||
|
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
|
||||||
|
defer totalTimeMetric(metricName, time.Now())
|
||||||
|
|
||||||
|
err = db.set(mode, addr)
|
||||||
|
if err != nil {
|
||||||
|
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
|
||||||
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// set updates database indexes for a specific
|
// set updates database indexes for a specific
|
||||||
|
@ -20,10 +20,15 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
|
"github.com/ethereum/go-ethereum/metrics"
|
||||||
"github.com/ethereum/go-ethereum/swarm/chunk"
|
"github.com/ethereum/go-ethereum/swarm/chunk"
|
||||||
"github.com/ethereum/go-ethereum/swarm/shed"
|
"github.com/ethereum/go-ethereum/swarm/shed"
|
||||||
|
"github.com/ethereum/go-ethereum/swarm/spancontext"
|
||||||
|
"github.com/opentracing/opentracing-go"
|
||||||
|
olog "github.com/opentracing/opentracing-go/log"
|
||||||
"github.com/syndtr/goleveldb/leveldb"
|
"github.com/syndtr/goleveldb/leveldb"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -36,6 +41,9 @@ import (
|
|||||||
// Make sure that you check the second returned parameter from the channel to stop iteration when its value
|
// Make sure that you check the second returned parameter from the channel to stop iteration when its value
|
||||||
// is false.
|
// is false.
|
||||||
func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) (c <-chan chunk.Descriptor, stop func()) {
|
func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) (c <-chan chunk.Descriptor, stop func()) {
|
||||||
|
metricName := "localstore.SubscribePull"
|
||||||
|
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
|
||||||
|
|
||||||
chunkDescriptors := make(chan chunk.Descriptor)
|
chunkDescriptors := make(chan chunk.Descriptor)
|
||||||
trigger := make(chan struct{}, 1)
|
trigger := make(chan struct{}, 1)
|
||||||
|
|
||||||
@ -57,6 +65,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
|
|||||||
var errStopSubscription = errors.New("stop subscription")
|
var errStopSubscription = errors.New("stop subscription")
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
defer metrics.GetOrRegisterCounter(metricName+".stop", nil).Inc(1)
|
||||||
// close the returned chunk.Descriptor channel at the end to
|
// close the returned chunk.Descriptor channel at the end to
|
||||||
// signal that the subscription is done
|
// signal that the subscription is done
|
||||||
defer close(chunkDescriptors)
|
defer close(chunkDescriptors)
|
||||||
@ -77,12 +86,20 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
|
|||||||
// - last index Item is reached
|
// - last index Item is reached
|
||||||
// - subscription stop is called
|
// - subscription stop is called
|
||||||
// - context is done
|
// - context is done
|
||||||
|
metrics.GetOrRegisterCounter(metricName+".iter", nil).Inc(1)
|
||||||
|
|
||||||
|
ctx, sp := spancontext.StartSpan(ctx, metricName+".iter")
|
||||||
|
sp.LogFields(olog.Int("bin", int(bin)), olog.Uint64("since", since), olog.Uint64("until", until))
|
||||||
|
|
||||||
|
iterStart := time.Now()
|
||||||
|
var count int
|
||||||
err := db.pullIndex.Iterate(func(item shed.Item) (stop bool, err error) {
|
err := db.pullIndex.Iterate(func(item shed.Item) (stop bool, err error) {
|
||||||
select {
|
select {
|
||||||
case chunkDescriptors <- chunk.Descriptor{
|
case chunkDescriptors <- chunk.Descriptor{
|
||||||
Address: item.Address,
|
Address: item.Address,
|
||||||
BinID: item.BinID,
|
BinID: item.BinID,
|
||||||
}:
|
}:
|
||||||
|
count++
|
||||||
// until chunk descriptor is sent
|
// until chunk descriptor is sent
|
||||||
// break the iteration
|
// break the iteration
|
||||||
if until > 0 && item.BinID >= until {
|
if until > 0 && item.BinID >= until {
|
||||||
@ -111,12 +128,25 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
|
|||||||
SkipStartFromItem: !first,
|
SkipStartFromItem: !first,
|
||||||
Prefix: []byte{bin},
|
Prefix: []byte{bin},
|
||||||
})
|
})
|
||||||
|
|
||||||
|
totalTimeMetric(metricName+".iter", iterStart)
|
||||||
|
|
||||||
|
sp.FinishWithOptions(opentracing.FinishOptions{
|
||||||
|
LogRecords: []opentracing.LogRecord{
|
||||||
|
{
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
Fields: []olog.Field{olog.Int("count", count)},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == errStopSubscription {
|
if err == errStopSubscription {
|
||||||
// stop subscription without any errors
|
// stop subscription without any errors
|
||||||
// if until is reached
|
// if until is reached
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
metrics.GetOrRegisterCounter(metricName+".iter.error", nil).Inc(1)
|
||||||
log.Error("localstore pull subscription iteration", "bin", bin, "since", since, "until", until, "err", err)
|
log.Error("localstore pull subscription iteration", "bin", bin, "since", since, "until", until, "err", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -162,6 +192,8 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
|
|||||||
// in pull syncing index for a provided bin. If there are no chunks in
|
// in pull syncing index for a provided bin. If there are no chunks in
|
||||||
// that bin, 0 value is returned.
|
// that bin, 0 value is returned.
|
||||||
func (db *DB) LastPullSubscriptionBinID(bin uint8) (id uint64, err error) {
|
func (db *DB) LastPullSubscriptionBinID(bin uint8) (id uint64, err error) {
|
||||||
|
metrics.GetOrRegisterCounter("localstore.LastPullSubscriptionBinID", nil).Inc(1)
|
||||||
|
|
||||||
item, err := db.pullIndex.Last([]byte{bin})
|
item, err := db.pullIndex.Last([]byte{bin})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == leveldb.ErrNotFound {
|
if err == leveldb.ErrNotFound {
|
||||||
|
@ -19,10 +19,15 @@ package localstore
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
|
"github.com/ethereum/go-ethereum/metrics"
|
||||||
"github.com/ethereum/go-ethereum/swarm/chunk"
|
"github.com/ethereum/go-ethereum/swarm/chunk"
|
||||||
"github.com/ethereum/go-ethereum/swarm/shed"
|
"github.com/ethereum/go-ethereum/swarm/shed"
|
||||||
|
"github.com/ethereum/go-ethereum/swarm/spancontext"
|
||||||
|
"github.com/opentracing/opentracing-go"
|
||||||
|
olog "github.com/opentracing/opentracing-go/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SubscribePush returns a channel that provides storage chunks with ordering from push syncing index.
|
// SubscribePush returns a channel that provides storage chunks with ordering from push syncing index.
|
||||||
@ -30,6 +35,9 @@ import (
|
|||||||
// the returned channel without any errors. Make sure that you check the second returned parameter
|
// the returned channel without any errors. Make sure that you check the second returned parameter
|
||||||
// from the channel to stop iteration when its value is false.
|
// from the channel to stop iteration when its value is false.
|
||||||
func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop func()) {
|
func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop func()) {
|
||||||
|
metricName := "localstore.SubscribePush"
|
||||||
|
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
|
||||||
|
|
||||||
chunks := make(chan chunk.Chunk)
|
chunks := make(chan chunk.Chunk)
|
||||||
trigger := make(chan struct{}, 1)
|
trigger := make(chan struct{}, 1)
|
||||||
|
|
||||||
@ -44,6 +52,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
|
|||||||
var stopChanOnce sync.Once
|
var stopChanOnce sync.Once
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
defer metrics.GetOrRegisterCounter(metricName+".done", nil).Inc(1)
|
||||||
// close the returned chunkInfo channel at the end to
|
// close the returned chunkInfo channel at the end to
|
||||||
// signal that the subscription is done
|
// signal that the subscription is done
|
||||||
defer close(chunks)
|
defer close(chunks)
|
||||||
@ -57,6 +66,12 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
|
|||||||
// - last index Item is reached
|
// - last index Item is reached
|
||||||
// - subscription stop is called
|
// - subscription stop is called
|
||||||
// - context is done
|
// - context is done
|
||||||
|
metrics.GetOrRegisterCounter(metricName+".iter", nil).Inc(1)
|
||||||
|
|
||||||
|
ctx, sp := spancontext.StartSpan(ctx, metricName+".iter")
|
||||||
|
|
||||||
|
iterStart := time.Now()
|
||||||
|
var count int
|
||||||
err := db.pushIndex.Iterate(func(item shed.Item) (stop bool, err error) {
|
err := db.pushIndex.Iterate(func(item shed.Item) (stop bool, err error) {
|
||||||
// get chunk data
|
// get chunk data
|
||||||
dataItem, err := db.retrievalDataIndex.Get(item)
|
dataItem, err := db.retrievalDataIndex.Get(item)
|
||||||
@ -66,6 +81,7 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
|
|||||||
|
|
||||||
select {
|
select {
|
||||||
case chunks <- chunk.NewChunk(dataItem.Address, dataItem.Data):
|
case chunks <- chunk.NewChunk(dataItem.Address, dataItem.Data):
|
||||||
|
count++
|
||||||
// set next iteration start item
|
// set next iteration start item
|
||||||
// when its chunk is successfully sent to channel
|
// when its chunk is successfully sent to channel
|
||||||
sinceItem = &item
|
sinceItem = &item
|
||||||
@ -87,7 +103,20 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
|
|||||||
// iterator call, skip it in this one
|
// iterator call, skip it in this one
|
||||||
SkipStartFromItem: true,
|
SkipStartFromItem: true,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
totalTimeMetric(metricName+".iter", iterStart)
|
||||||
|
|
||||||
|
sp.FinishWithOptions(opentracing.FinishOptions{
|
||||||
|
LogRecords: []opentracing.LogRecord{
|
||||||
|
{
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
Fields: []olog.Field{olog.Int("count", count)},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
metrics.GetOrRegisterCounter(metricName+".iter.error", nil).Inc(1)
|
||||||
log.Error("localstore push subscription iteration", "err", err)
|
log.Error("localstore push subscription iteration", "err", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user