Add tracker interface and tests #3

Merged
roysc merged 7 commits from tracker-interface into main 2023-09-26 11:34:42 +00:00
2 changed files with 24 additions and 15 deletions
Showing only changes of commit 8025510126 - Show all commits

View File

@ -38,7 +38,7 @@ import (
// IteratorTracker exposes a minimal interface to register and consume iterators.
type IteratorTracker interface {
Restore(iter.IteratorConstructor) ([]trie.NodeIterator, error)
Restore(iter.IteratorConstructor) ([]trie.NodeIterator, []trie.NodeIterator, error)
Tracked(trie.NodeIterator) trie.NodeIterator
}
@ -57,19 +57,24 @@ func New(file string, bufsize uint) *Tracker {
}
// Restore attempts to read iterator state from the recovery file.
// Returns:
// - slice of tracked iterators
// - slice of iterators originally returned by constructor
// If the file doesn't exist, returns an empty slice with no error.
// Restored iterators are constructed in the same order they appear in the returned slice.
func (tr *Tracker) Restore(makeIterator iter.IteratorConstructor) ([]trie.NodeIterator, error) {
its, err := tr.TrackerImpl.Restore(makeIterator)
func (tr *Tracker) Restore(makeIterator iter.IteratorConstructor) (
[]trie.NodeIterator, []trie.NodeIterator, error,
) {
its, bases, err := tr.TrackerImpl.Restore(makeIterator)
if err != nil {
return nil, err
return nil, nil, err
}
var ret []trie.NodeIterator
for _, it := range its {
ret = append(ret, it)
}
return ret, nil
return ret, bases, nil
}
// Tracked wraps an iterator in a tracked iterator. This should not be called when the tracker can
@ -146,13 +151,15 @@ func (tr *TrackerImpl) removeRecoveryFile() error {
return err
}
func (tr *TrackerImpl) Restore(makeIterator iter.IteratorConstructor) ([]*Iterator, error) {
func (tr *TrackerImpl) Restore(makeIterator iter.IteratorConstructor) (
[]*Iterator, []trie.NodeIterator, error,
) {
file, err := os.Open(tr.recoveryFile)
if err != nil {
if os.IsNotExist(err) {
return nil, nil
return nil, nil, nil
}
return nil, err
return nil, nil, err
}
defer file.Close()
log.Debug("Restoring recovery state", "from", tr.recoveryFile)
@ -161,10 +168,11 @@ func (tr *TrackerImpl) Restore(makeIterator iter.IteratorConstructor) ([]*Iterat
in.FieldsPerRecord = 2
rows, err := in.ReadAll()
if err != nil {
return nil, err
return nil, nil, err
}
var ret []*Iterator
var wrapped []*Iterator
var base []trie.NodeIterator
for _, row := range rows {
// pick up where each recovered iterator left off
var recoveredPath []byte
@ -172,12 +180,12 @@ func (tr *TrackerImpl) Restore(makeIterator iter.IteratorConstructor) ([]*Iterat
if len(row[0]) != 0 {
if _, err = fmt.Sscanf(row[0], "%x", &recoveredPath); err != nil {
return nil, err
return nil, nil, err
roysc marked this conversation as resolved Outdated

Is it explicit anywhere that you must call CloseAndSave in order to close the channel, set running to false, etc?

What happens if you don't call that, but the iterator completes?

If we need to call it, I think the pattern:

      tr := tracker.New(recoveryFile, NumIters)
      defer tr.CloseAndSave()  // <--- You must do this!

Needs to be documented explicitly somewhere (unless I am just missing something).

Is it explicit anywhere that you *must* call CloseAndSave in order to close the channel, set running to false, etc? What happens if you don't call that, but the iterator completes? If we need to call it, I think the pattern: ``` tr := tracker.New(recoveryFile, NumIters) defer tr.CloseAndSave() // <--- You must do this! ``` Needs to be documented explicitly somewhere (unless I am just missing something).
Outdated
Review

If it's not called, it should just be sort of a leak. The channels won't be flushed or saved, but once all its iterators die, the tracker will still be gc'd. No critical consequences that I foresee, but I will document it.

If it's not called, it should just be sort of a leak. The channels won't be flushed or saved, but once all its iterators die, the tracker will still be gc'd. No critical consequences that I foresee, but I will document it.
}
}
if len(row[1]) != 0 {
if _, err = fmt.Sscanf(row[1], "%x", &endPath); err != nil {
return nil, err
return nil, nil, err
}
}
@ -188,10 +196,11 @@ func (tr *TrackerImpl) Restore(makeIterator iter.IteratorConstructor) ([]*Iterat
}
it := makeIterator(iter.HexToKeyBytes(recoveredPath))
boundIt := iter.NewPrefixBoundIterator(it, endPath)
ret = append(ret, tr.Tracked(boundIt))
wrapped = append(wrapped, tr.Tracked(boundIt))
base = append(base, it)
}
return ret, tr.removeRecoveryFile()
return wrapped, base, tr.removeRecoveryFile()
}
// CloseAndSave stops all tracked iterators and dumps their state to a file.

View File

@ -47,7 +47,7 @@ func TestTracker(t *testing.T) {
}
tr := tracker.New(recoveryFile, NumIters)
its, err := tr.Restore(tree.NodeIterator)
its, _, err := tr.Restore(tree.NodeIterator)
if err != nil {
t.Fatal(err)
}