Add tracker interface and tests #3

Merged
roysc merged 7 commits from tracker-interface into main 2023-09-26 11:34:42 +00:00
Showing only changes of commit 7625e51fe4 - Show all commits

View File

@ -1,3 +1,27 @@
// This package provides a way to track multiple concurrently running trie iterators, save their
// state to a file on failures or interruptions, and restore them at the positions where they
// stopped.
//
// Example usage:
//
// tr := tracker.New("recovery.txt", 100)
// // ensure the tracker is closed and saves its state
// defer tr.CloseAndSave()
//
// // iterate over the trie, from one or multiple threads
// it := tr.Tracked(tree.NodeIterator(nil))
// for it.Next(true) {
// // ... do work that could fail or be interrupted
// }
//
// // later, restore the iterators
// tr := tracker.New("recovery.txt", 100)
// defer tr.CloseAndSave()
//
// its, err := tr.Restore(tree.NodeIterator)
// for _, it := range its {
// // ... resume traversal
// }
package tracker
import (
@ -12,26 +36,30 @@ import (
iter "github.com/cerc-io/eth-iterator-utils"
)
type Tracker interface {
// IteratorTracker exposes a minimal interface to register and consume iterators.
type IteratorTracker interface {
Restore(iter.IteratorConstructor) ([]trie.NodeIterator, error)
Tracked(trie.NodeIterator) trie.NodeIterator
}
var _ Tracker = &trackerAdaptor{}
var _ IteratorTracker = &Tracker{}
// Wrap the tracker state to only expose NodeIterators
type trackerAdaptor struct {
// Tracker is a trie iterator tracker which saves state to and restores it from a file.
type Tracker struct {
*TrackerImpl
}
// New creates a new tracker which saves state to a given file. bufsize sets the size of the
// channel buffers used internally to manage tracking. Note that passing a bufsize smaller than the expected
// number of concurrent iterators could lead to deadlock.
func New(file string, bufsize uint) *trackerAdaptor {
return &trackerAdaptor{NewImpl(file, bufsize)}
func New(file string, bufsize uint) *Tracker {
return &Tracker{NewImpl(file, bufsize)}
}
func (tr *trackerAdaptor) Restore(makeIterator iter.IteratorConstructor) ([]trie.NodeIterator, error) {
// Restore attempts to read iterator state from the recovery file.
// If the file doesn't exist, returns an empty slice with no error.
// Restored iterators are constructed in the same order they appear in the returned slice.
func (tr *Tracker) Restore(makeIterator iter.IteratorConstructor) ([]trie.NodeIterator, error) {
its, err := tr.TrackerImpl.Restore(makeIterator)
if err != nil {
return nil, err
@ -44,9 +72,10 @@ func (tr *trackerAdaptor) Restore(makeIterator iter.IteratorConstructor) ([]trie
return ret, nil
}
func (tr *trackerAdaptor) Tracked(it trie.NodeIterator) trie.NodeIterator {
var trick any = tr.TrackerImpl.Tracked(it)
return trick.(trie.NodeIterator)
// Tracked wraps an iterator in a tracked iterator. This should not be called when the tracker can
// potentially be closed.
func (tr *Tracker) Tracked(it trie.NodeIterator) trie.NodeIterator {
return tr.TrackerImpl.Tracked(it)
}
func NewImpl(file string, bufsize uint) *TrackerImpl {
@ -75,22 +104,12 @@ type Iterator struct {
tracker *TrackerImpl
}
// Tracked wraps an iterator in a Iterator. This should not be called once halts are possible.
func (tr *TrackerImpl) Tracked(it trie.NodeIterator) *Iterator {
ret := &Iterator{it, tr}
tr.startChan <- ret
return ret
}
// StopIterator explicitly stops an iterator
func (tr *TrackerImpl) StopIterator(it *Iterator) {
tr.RLock()
defer tr.RUnlock()
if tr.running {
tr.stopChan <- it
}
}
// Save dumps iterator path and bounds to a text file so it can be restored later.
func (tr *TrackerImpl) Save() error {
log.Debug("Saving recovery state", "to", tr.recoveryFile)
@ -127,9 +146,6 @@ func (tr *TrackerImpl) removeRecoveryFile() error {
return err
}
// Restore attempts to read iterator state from the recovery file.
// If the file doesn't exist, returns an empty slice with no error.
// Restored iterators are constructed in the same order they appear in the returned slice.
func (tr *TrackerImpl) Restore(makeIterator iter.IteratorConstructor) ([]*Iterator, error) {
file, err := os.Open(tr.recoveryFile)
if err != nil {
@ -179,7 +195,8 @@ func (tr *TrackerImpl) Restore(makeIterator iter.IteratorConstructor) ([]*Iterat
}
// CloseAndSave stops all tracked iterators and dumps their state to a file.
// This closes the tracker, so adding a new iterator will fail.
// This closes the tracker, so adding a new iterator afterwards will fail.
// A new Tracker must be constructed in order to restore state.
func (tr *TrackerImpl) CloseAndSave() error {
tr.Lock()
tr.running = false
@ -211,7 +228,7 @@ func (it *Iterator) Next(descend bool) bool {
if it.tracker.running {
it.tracker.stopChan <- it
} else {
log.Error("Iterator stopped after tracker halted", "path", it.Path())
log.Error("Tracker was closed before iterator finished")
}
}
return ret