diff --git a/pkg/snapshot/service.go b/pkg/snapshot/service.go index be7d903..bba80e8 100644 --- a/pkg/snapshot/service.go +++ b/pkg/snapshot/service.go @@ -109,8 +109,7 @@ func (s *Service) CreateSnapshot(params SnapshotParams) error { } headerID := header.Hash().String() s.tracker = newTracker(s.recoveryFile, int(params.Workers)) - go s.tracker.run() - go s.tracker.captureSignal() + s.tracker.captureSignal() log.Infof("after goroutines start") diff --git a/pkg/snapshot/tracker.go b/pkg/snapshot/tracker.go index 3bbdec6..e66b8d0 100644 --- a/pkg/snapshot/tracker.go +++ b/pkg/snapshot/tracker.go @@ -22,7 +22,11 @@ type trackedIter struct { func (it *trackedIter) Next(descend bool) bool { ret := it.NodeIterator.Next(descend) if !ret { - it.tracker.stopChan <- it + if it.tracker.running { + it.tracker.stopChan <- it + } else { + log.Errorf("iterator stopped after tracker halted: path=%x", it.Path()) + } } return ret } @@ -34,9 +38,7 @@ type iteratorTracker struct { stopChan chan *trackedIter started map[*trackedIter]struct{} stopped []*trackedIter - - haltChan chan struct{} - done chan struct{} + running bool } func newTracker(file string, buf int) iteratorTracker { @@ -45,8 +47,7 @@ func newTracker(file string, buf int) iteratorTracker { startChan: make(chan *trackedIter, buf), stopChan: make(chan *trackedIter, buf), started: map[*trackedIter]struct{}{}, - haltChan: make(chan struct{}), - done: make(chan struct{}), + running: true, } } @@ -62,23 +63,7 @@ func (tr *iteratorTracker) captureSignal() { }() } -// listens for starts/stops and manages current state -func (tr *iteratorTracker) run() { -loop: - for { - select { - case start := <-tr.startChan: - tr.started[start] = struct{}{} - case stop := <-tr.stopChan: - tr.stopped = append(tr.stopped, stop) - case <-tr.haltChan: - break loop - default: - } - } - tr.done <- struct{}{} -} - +// Wraps an iterator in a trackedIter. This should not be called once halts are possible. func (tr *iteratorTracker) tracked(it trie.NodeIterator) (ret *trackedIter) { ret = &trackedIter{it, tr} tr.startChan <- ret @@ -149,8 +134,7 @@ func (tr *iteratorTracker) restore(tree state.Trie) ([]trie.NodeIterator, error) } func (tr *iteratorTracker) haltAndDump() error { - tr.haltChan <- struct{}{} - <-tr.done + tr.running = false // drain any pending events close(tr.startChan)