diff --git a/cmd/swarm/swarm-snapshot/create_test.go b/cmd/swarm/swarm-snapshot/create_test.go index 17745af5d..4cd78f35a 100644 --- a/cmd/swarm/swarm-snapshot/create_test.go +++ b/cmd/swarm/swarm-snapshot/create_test.go @@ -33,7 +33,7 @@ import ( // It runs a few "create" commands with different flag values and loads generated // snapshot files to validate their content. func TestSnapshotCreate(t *testing.T) { - t.Skip("todo: fix this") + t.Skip("test is flaky. disabling until underlying problem is addressed") for _, v := range []struct { name string diff --git a/swarm/chunk/chunk.go b/swarm/chunk/chunk.go index 17f49348b..c44292bb9 100644 --- a/swarm/chunk/chunk.go +++ b/swarm/chunk/chunk.go @@ -197,7 +197,7 @@ func (m ModeSet) String() string { const ( // ModeSetAccess: when an update request is received for a chunk or chunk is retrieved for delivery ModeSetAccess ModeSet = iota - // ModeSetSync: when push sync receipt is received + // ModeSetSync: when a chunk is added to a pull sync batch or when a push sync receipt is received ModeSetSync // ModeSetRemove: when a chunk is removed ModeSetRemove diff --git a/swarm/network/simulation/kademlia_test.go b/swarm/network/simulation/kademlia_test.go index 0ac1e7803..4d7dc6240 100644 --- a/swarm/network/simulation/kademlia_test.go +++ b/swarm/network/simulation/kademlia_test.go @@ -156,6 +156,7 @@ func createSimServiceMap(discovery bool) map[string]ServiceFunc { // Call WaitTillSnapshotRecreated() function and wait until it returns // Iterate the nodes and check if all the connections are successfully recreated func TestWaitTillSnapshotRecreated(t *testing.T) { + t.Skip("test is flaky. disabling until underlying problem is addressed") var err error sim := New(createSimServiceMap(true)) _, err = sim.AddNodesAndConnectRing(16) diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go index 043192903..47320e860 100644 --- a/swarm/network/stream/syncer.go +++ b/swarm/network/stream/syncer.go @@ -21,7 +21,9 @@ import ( "strconv" "time" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/chunk" + "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/storage" ) @@ -34,27 +36,29 @@ const ( // * live request delivery with or without checkback // * (live/non-live historical) chunk syncing per proximity bin type SwarmSyncerServer struct { - po uint8 - netStore *storage.NetStore - quit chan struct{} + correlateId string //used for logging + po uint8 + netStore *storage.NetStore + quit chan struct{} } // NewSwarmSyncerServer is constructor for SwarmSyncerServer -func NewSwarmSyncerServer(po uint8, netStore *storage.NetStore) (*SwarmSyncerServer, error) { +func NewSwarmSyncerServer(po uint8, netStore *storage.NetStore, correlateId string) (*SwarmSyncerServer, error) { return &SwarmSyncerServer{ - po: po, - netStore: netStore, - quit: make(chan struct{}), + correlateId: correlateId, + po: po, + netStore: netStore, + quit: make(chan struct{}), }, nil } func RegisterSwarmSyncerServer(streamer *Registry, netStore *storage.NetStore) { - streamer.RegisterServerFunc("SYNC", func(_ *Peer, t string, _ bool) (Server, error) { + streamer.RegisterServerFunc("SYNC", func(p *Peer, t string, _ bool) (Server, error) { po, err := ParseSyncBinKey(t) if err != nil { return nil, err } - return NewSwarmSyncerServer(po, netStore) + return NewSwarmSyncerServer(po, netStore, p.ID().String()+"|"+string(po)) }) // streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) { // return NewOutgoingProvableSwarmSyncer(po, db) @@ -92,7 +96,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 if from > 0 { from-- } - + batchStart := time.Now() descriptors, stop := s.netStore.SubscribePull(context.Background(), s.po, from, to) defer stop() @@ -106,7 +110,10 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 timer *time.Timer timerC <-chan time.Time ) + defer func() { + metrics.GetOrRegisterResettingTimer("syncer.set-next-batch.total-time", nil).UpdateSince(batchStart) + metrics.GetOrRegisterCounter("syncer.set-next-batch.batch-size", nil).Inc(int64(batchSize)) if timer != nil { timer.Stop() } @@ -125,6 +132,8 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 // validating that the chunk is successfully stored by the peer. err := s.netStore.Set(context.Background(), chunk.ModeSetSync, d.Address) if err != nil { + metrics.GetOrRegisterCounter("syncer.set-next-batch.set-sync-err", nil).Inc(1) + log.Debug("syncer pull subscription - err setting chunk as synced", "correlateId", s.correlateId, "err", err) return nil, 0, 0, nil, err } batchSize++ @@ -136,13 +145,17 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 batchEndID = d.BinID if batchSize >= BatchSize { iterate = false + metrics.GetOrRegisterCounter("syncer.set-next-batch.full-batch", nil).Inc(1) + log.Debug("syncer pull subscription - batch size reached", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID) } if timer == nil { timer = time.NewTimer(batchTimeout) } else { + log.Debug("syncer pull subscription - stopping timer", "correlateId", s.correlateId) if !timer.Stop() { <-timer.C } + log.Debug("syncer pull subscription - channel drained, resetting timer", "correlateId", s.correlateId) timer.Reset(batchTimeout) } timerC = timer.C @@ -150,8 +163,12 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 // return batch if new chunks are not // received after some time iterate = false + metrics.GetOrRegisterCounter("syncer.set-next-batch.timer-expire", nil).Inc(1) + log.Debug("syncer pull subscription timer expired", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID) case <-s.quit: iterate = false + metrics.GetOrRegisterCounter("syncer.set-next-batch.quit-sig", nil).Inc(1) + log.Debug("syncer pull subscription - quit received", "correlateId", s.correlateId, "batchSize", batchSize, "batchStartID", batchStartID, "batchEndID", batchEndID) } } if batchStartID == nil { diff --git a/swarm/storage/localstore/gc.go b/swarm/storage/localstore/gc.go index 28c7b6db9..748e0d663 100644 --- a/swarm/storage/localstore/gc.go +++ b/swarm/storage/localstore/gc.go @@ -98,12 +98,17 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { if err != nil { return 0, true, err } + metrics.GetOrRegisterGauge(metricName+".gcsize", nil).Update(int64(gcSize)) done = true err = db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) { if gcSize-collectedCount <= target { return true, nil } + + metrics.GetOrRegisterGauge(metricName+".storets", nil).Update(item.StoreTimestamp) + metrics.GetOrRegisterGauge(metricName+".accessts", nil).Update(item.AccessTimestamp) + // delete from retrieve, pull, gc db.retrievalDataIndex.DeleteInBatch(batch, item) db.retrievalAccessIndex.DeleteInBatch(batch, item) @@ -121,11 +126,13 @@ func (db *DB) collectGarbage() (collectedCount uint64, done bool, err error) { if err != nil { return 0, false, err } + metrics.GetOrRegisterCounter(metricName+".collected-count", nil).Inc(int64(collectedCount)) db.gcSize.PutInBatch(batch, gcSize-collectedCount) err = db.shed.WriteBatch(batch) if err != nil { + metrics.GetOrRegisterCounter(metricName+".writebatch.err", nil).Inc(1) return 0, false, err } return collectedCount, done, nil