Add tracker interface and tests #3

go.mod (2 changes)

@@ -57,7 +57,7 @@ require (
 )
 
 replace (
-	github.com/cerc-io/eth-testing => git.vdb.to/cerc-io/eth-testing v0.2.1
+	github.com/cerc-io/eth-testing => git.vdb.to/cerc-io/eth-testing v0.3.1
 	// Not strictly necessary, but tells go what dependency versions should be
 	github.com/ethereum/go-ethereum => git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1
 )

go.sum (4 changes)

@@ -1,6 +1,6 @@
 cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
-git.vdb.to/cerc-io/eth-testing v0.2.1 h1:IZAX7DVgzPkSmu1xdKZ5aOemdEYbvtgae7GUl/TUNtQ=
-git.vdb.to/cerc-io/eth-testing v0.2.1/go.mod h1:qdvpc/W1xvf2MKx3rMOqvFvYaYIHG77Z1g0lwsmw0Uk=
+git.vdb.to/cerc-io/eth-testing v0.3.1-0.20230925181540-2ea71042e7e0 h1:fWAvsSiuDqveuxwnfc8psInfLZhMqHlQnmOpOHsd8Tk=
+git.vdb.to/cerc-io/eth-testing v0.3.1-0.20230925181540-2ea71042e7e0/go.mod h1:qdvpc/W1xvf2MKx3rMOqvFvYaYIHG77Z1g0lwsmw0Uk=
 git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1 h1:KLjxHwp9Zp7xhECccmJS00RiL+VwTuUGLU7qeIctg8g=
 git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1/go.mod h1:cYXZu70+6xmDgIgrTD81GPasv16piiAFJnKyAbwVPMU=
 github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=

internal/test_helper.go (new file, 39 lines)

@@ -0,0 +1,39 @@
+package internal
+
+import (
+	"testing"
+
+	"github.com/cerc-io/eth-testing/chaindata/small2"
+	"github.com/ethereum/go-ethereum/core/rawdb"
+	"github.com/ethereum/go-ethereum/core/state"
+	"github.com/ethereum/go-ethereum/ethdb"
+)
+
+var (
+	FixtureNodePaths = small2.Block1_StateNodePaths
+	FixtureLeafKeys  = small2.Block1_StateNodeLeafKeys
+)
+
+func OpenFixtureTrie(t *testing.T, height uint64) (state.Trie, ethdb.Database) {
+	data := small2.ChainData
+	kvdb, ldberr := rawdb.NewLevelDBDatabase(data.ChainData, 1024, 256, t.Name(), true)
+	if ldberr != nil {
+		t.Fatal(ldberr)
+	}
+	edb, err := rawdb.NewDatabaseWithFreezer(kvdb, data.Ancient, t.Name(), true)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	hash := rawdb.ReadCanonicalHash(edb, height)
+	header := rawdb.ReadHeader(edb, hash, height)
+	if header == nil {
+		t.Fatalf("unable to read canonical header at height %d", height)
+	}
+	sdb := state.NewDatabase(edb)
+	tree, err := sdb.OpenTrie(header.Root)
+	if err != nil {
+		t.Fatal(err)
+	}
+	return tree, edb
+}

iterator.go (22 changes)

@@ -23,16 +23,21 @@ import (
 	"github.com/ethereum/go-ethereum/trie"
 )
 
-// PrefixBoundIterator is a NodeIterator constrained by a lower & upper bound (as hex path prefixes)
-type PrefixBoundIterator struct {
-	trie.NodeIterator
-	EndPath []byte
-}
-
 // IteratorConstructor is a constructor returning a NodeIterator, which is used to decouple this
 // code from the trie implementation.
 type IteratorConstructor = func(startKey []byte) trie.NodeIterator
 
+// PrefixBoundIterator is a NodeIterator constrained by a lower & upper bound (as hex path prefixes)
+type PrefixBoundIterator struct {
+	trie.NodeIterator
+	StartPath, EndPath []byte
+}
+
+// NewPrefixBoundIterator returns an iterator with an upper bound value (hex path prefix)
+func NewPrefixBoundIterator(it trie.NodeIterator, to []byte) *PrefixBoundIterator {
+	return &PrefixBoundIterator{NodeIterator: it, StartPath: it.Path(), EndPath: to}
+}
+
 func (it *PrefixBoundIterator) Next(descend bool) bool {
 	if it.EndPath == nil {
 		return it.NodeIterator.Next(descend)
@@ -49,9 +54,8 @@ func (it *PrefixBoundIterator) Next(descend bool) bool {
 	return bytes.Compare(it.Path(), it.EndPath) <= 0
 }
 
-// NewPrefixBoundIterator returns an iterator with an upper bound value (hex path prefix)
-func NewPrefixBoundIterator(it trie.NodeIterator, to []byte) *PrefixBoundIterator {
-	return &PrefixBoundIterator{NodeIterator: it, EndPath: to}
+func (it *PrefixBoundIterator) Bounds() ([]byte, []byte) {
+	return it.StartPath, it.EndPath
 }
 
 // generates nibble slice prefixes at uniform intervals

@@ -5,11 +5,8 @@ import (
 	"fmt"
 	"testing"
 
-	"github.com/ethereum/go-ethereum/core/rawdb"
-	"github.com/ethereum/go-ethereum/core/state"
-
 	iter "github.com/cerc-io/eth-iterator-utils"
-	fixture "github.com/cerc-io/eth-testing/chaindata/medium"
+	"github.com/cerc-io/eth-iterator-utils/internal"
 )
 
 func TestMakePaths(t *testing.T) {
@@ -24,30 +21,8 @@ func TestMakePaths(t *testing.T) {
 }
 
 func TestIterator(t *testing.T) {
-	kvdb, ldberr := rawdb.NewLevelDBDatabase(fixture.ChainDataPath, 1024, 256, "vdb-geth", true)
-	if ldberr != nil {
-		t.Fatal(ldberr)
-	}
-	edb, err := rawdb.NewDatabaseWithFreezer(kvdb, fixture.AncientDataPath, "vdb-geth", true)
-	if err != nil {
-		t.Fatal(err)
-	}
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer edb.Close()
-
-	height := uint64(1)
-	hash := rawdb.ReadCanonicalHash(edb, height)
-	header := rawdb.ReadHeader(edb, hash, height)
-	if header == nil {
-		t.Fatalf("unable to read canonical header at height %d", height)
-	}
-	sdb := state.NewDatabase(edb)
-	tree, err := sdb.OpenTrie(header.Root)
-	if err != nil {
-		t.Fatal(err)
-	}
+	tree, edb := internal.OpenFixtureTrie(t, 1)
+	t.Cleanup(func() { edb.Close() })
 
 	t.Run("in bounds", func(t *testing.T) {
 		type testCase struct {
@@ -79,7 +54,7 @@ func TestIterator(t *testing.T) {
 	})
 
 	t.Run("trie is covered", func(t *testing.T) {
-		allPaths := fixture.Block1_Paths
+		allPaths := internal.FixtureNodePaths
 		cases := []uint{1, 2, 4, 8, 16, 32}
 		runCase := func(t *testing.T, nbins uint) {
 			iters := iter.SubtrieIterators(tree.NodeIterator, nbins)

@@ -1,12 +1,34 @@
+// This package provides a way to track multiple concurrently running trie iterators, save their
+// state to a file on failures or interruptions, and restore them at the positions where they
+// stopped.
+//
+// Example usage:
+//
+//	tr := tracker.New("recovery.txt", 100)
+//	// Ensure the tracker is closed and saves its state
+//	defer tr.CloseAndSave()
+//
+//	// Iterate over the trie, from one or multiple threads
+//	it := tr.Tracked(tree.NodeIterator(nil))
+//	for it.Next(true) {
+//		// ... do work that could fail or be interrupted
+//	}
+//
+//	// Later, restore the iterators
+//	tr := tracker.New("recovery.txt", 100)
+//	defer tr.CloseAndSave()
+//
+//	its, _, err := tr.Restore(tree.NodeIterator)
+//	for _, it := range its {
+//		// ... resume traversal
+//	}
 package tracker
 
 import (
-	"context"
 	"encoding/csv"
 	"fmt"
 	"os"
-	"os/signal"
-	"syscall"
+	"sync"
 
 	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/trie"
@@ -14,23 +36,55 @@ import (
 	iter "github.com/cerc-io/eth-iterator-utils"
 )
 
+// IteratorTracker exposes a minimal interface to register and consume iterators.
+type IteratorTracker interface {
+	Restore(iter.IteratorConstructor) ([]trie.NodeIterator, []trie.NodeIterator, error)
+	Tracked(trie.NodeIterator) trie.NodeIterator
+}
+
+var _ IteratorTracker = &Tracker{}
+
+// Tracker is a trie iterator tracker which saves state to and restores it from a file.
 type Tracker struct {
-	recoveryFile string
-
-	startChan chan *Iterator
-	stopChan  chan *Iterator
-	started   map[*Iterator]struct{}
-	stopped   []*Iterator
-	running   bool
+	*TrackerImpl
 }
 
-type Iterator struct {
-	trie.NodeIterator
-	tracker *Tracker
+// New creates a new tracker which saves state to a given file. bufsize sets the size of the
+// channel buffers used internally to manage tracking. Note that passing a bufsize smaller than the expected
+// number of concurrent iterators could lead to deadlock.
+func New(file string, bufsize uint) *Tracker {
+	return &Tracker{NewImpl(file, bufsize)}
 }
 
-func New(file string, bufsize uint) Tracker {
-	return Tracker{
+// Restore attempts to read iterator state from the recovery file.
+// Returns:
+// - slice of tracked iterators
+// - slice of iterators originally returned by constructor
+// If the file doesn't exist, returns an empty slice with no error.
+// Restored iterators are constructed in the same order they appear in the returned slice.
+func (tr *Tracker) Restore(makeIterator iter.IteratorConstructor) (
+	[]trie.NodeIterator, []trie.NodeIterator, error,
+) {
+	its, bases, err := tr.TrackerImpl.Restore(makeIterator)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	var ret []trie.NodeIterator
+	for _, it := range its {
+		ret = append(ret, it)
+	}
+	return ret, bases, nil
+}
+
+// Tracked wraps an iterator in a tracked iterator. This should not be called when the tracker can
+// potentially be closed.
+func (tr *Tracker) Tracked(it trie.NodeIterator) trie.NodeIterator {
+	return tr.TrackerImpl.Tracked(it)
+}
+
+func NewImpl(file string, bufsize uint) *TrackerImpl {
+	return &TrackerImpl{
 		recoveryFile: file,
 		startChan:    make(chan *Iterator, bufsize),
 		stopChan:     make(chan *Iterator, bufsize),
@@ -39,41 +93,40 @@ func New(file string, bufsize uint) Tracker {
 	}
 }
 
-func (tr *Tracker) CaptureSignal(cancelCtx context.CancelFunc) {
-	sigChan := make(chan os.Signal, 1)
+type TrackerImpl struct {
+	recoveryFile string
 
-	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
-	go func() {
-		sig := <-sigChan
-		log.Error("Signal received (%v), stopping", "signal", sig)
-		// Cancel context on receiving a signal. On cancellation, all tracked iterators complete
-		// processing of their current node before stopping.
-		cancelCtx()
-	}()
+	startChan chan *Iterator
+	stopChan  chan *Iterator
+	started   map[*Iterator]struct{}
+	stopped   []*Iterator
+	running   bool
+	sync.RWMutex // guards closing of the tracker
 }
 
-// Tracked wraps an iterator in a Iterator. This should not be called once halts are possible.
-func (tr *Tracker) Tracked(it trie.NodeIterator) (ret *Iterator) {
-	ret = &Iterator{it, tr}
+type Iterator struct {
+	trie.NodeIterator
+	tracker *TrackerImpl
+}
+
+func (tr *TrackerImpl) Tracked(it trie.NodeIterator) *Iterator {
+	ret := &Iterator{it, tr}
 	tr.startChan <- ret
-	return
+	return ret
 }
 
-// StopIterator explicitly stops an iterator
-func (tr *Tracker) StopIterator(it *Iterator) {
-	tr.stopChan <- it
-}
+// Save dumps iterator path and bounds to a text file so it can be restored later.
+func (tr *TrackerImpl) Save() error {
+	log.Debug("Saving recovery state", "to", tr.recoveryFile)
 
-// dumps iterator path and bounds to a text file so it can be restored later
-func (tr *Tracker) dump() error {
-	log.Debug("Dumping recovery state", "to", tr.recoveryFile)
-	var rows [][]string
-	for it := range tr.started {
-		var endPath []byte
-		if impl, ok := it.NodeIterator.(*iter.PrefixBoundIterator); ok {
-			endPath = impl.EndPath
-		}
+	// if the tracker state is empty, erase any existing recovery file
+	if len(tr.started) == 0 {
+		return tr.removeRecoveryFile()
+	}
 
+	var rows [][]string
+	for it := range tr.started {
+		_, endPath := it.Bounds()
 		rows = append(rows, []string{
 			fmt.Sprintf("%x", it.Path()),
 			fmt.Sprintf("%x", endPath),
@@ -90,16 +143,23 @@ func (tr *Tracker) dump() error {
 	return out.WriteAll(rows)
 }
 
-// Restore attempts to read iterator state from the recovery file.
-// If the file doesn't exist, returns an empty slice with no error.
-// Restored iterators are constructed in the same order as in the returned slice.
-func (tr *Tracker) Restore(makeIterator iter.IteratorConstructor) ([]trie.NodeIterator, error) {
+func (tr *TrackerImpl) removeRecoveryFile() error {
+	err := os.Remove(tr.recoveryFile)
+	if os.IsNotExist(err) {
+		err = nil
+	}
+	return err
+}
+
+func (tr *TrackerImpl) Restore(makeIterator iter.IteratorConstructor) (
+	[]*Iterator, []trie.NodeIterator, error,
+) {
 	file, err := os.Open(tr.recoveryFile)
 	if err != nil {
 		if os.IsNotExist(err) {
-			return nil, nil
+			return nil, nil, nil
 		}
-		return nil, err
+		return nil, nil, err
 	}
 	defer file.Close()
 	log.Debug("Restoring recovery state", "from", tr.recoveryFile)
@@ -108,10 +168,11 @@ func (tr *Tracker) Restore(makeIterator iter.IteratorConstructor) ([]trie.NodeIt
 	in.FieldsPerRecord = 2
 	rows, err := in.ReadAll()
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 
-	var ret []trie.NodeIterator
+	var wrapped []*Iterator
+	var base []trie.NodeIterator
 	for _, row := range rows {
 		// pick up where each recovered iterator left off
 		var recoveredPath []byte
@@ -119,12 +180,12 @@ func (tr *Tracker) Restore(makeIterator iter.IteratorConstructor) ([]trie.NodeIt
 
 		if len(row[0]) != 0 {
 			if _, err = fmt.Sscanf(row[0], "%x", &recoveredPath); err != nil {
-				return nil, err
+				return nil, nil, err
Outdated
|
 			}
 		}
 		if len(row[1]) != 0 {
 			if _, err = fmt.Sscanf(row[1], "%x", &endPath); err != nil {
-				return nil, err
+				return nil, nil, err
 			}
 		}
 
@@ -135,59 +196,64 @@ func (tr *Tracker) Restore(makeIterator iter.IteratorConstructor) ([]trie.NodeIt
 		}
 		it := makeIterator(iter.HexToKeyBytes(recoveredPath))
 		boundIt := iter.NewPrefixBoundIterator(it, endPath)
-		ret = append(ret, tr.Tracked(boundIt))
+		wrapped = append(wrapped, tr.Tracked(boundIt))
+		base = append(base, it)
 	}
 
-	log.Debug("Restored iterators", "count", len(ret))
-	return ret, nil
+	return wrapped, base, tr.removeRecoveryFile()
 }
 
-func (tr *Tracker) HaltAndDump() error {
+// CloseAndSave stops all tracked iterators and dumps their state to a file.
+// This closes the tracker, so adding a new iterator afterwards will fail.
+// A new Tracker must be constructed in order to restore state.
+func (tr *TrackerImpl) CloseAndSave() error {

roysc marked this conversation as resolved

telackey commented:
Why not call `it.StopIterator()`? I am always nervous about code that requires a precise locking scheme being duplicated multiple places.

roysc commented:
That's fair - I don't think we actually need StopIterator any more, so I will just delete it for now.
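
For reference, a minimal sketch of the pattern this lands on, pieced together from the package doc comment and tracker_test.go (traverseUntil, stop, and recoveryFile are illustrative names, not code from this change): an early exit simply abandons the loop, and the deferred CloseAndSave drains the tracker's channels under the tracker's own lock, so the locking scheme lives in one place.

package example

import (
	"github.com/cerc-io/eth-iterator-utils/tracker"
	"github.com/ethereum/go-ethereum/core/state"
)

// traverseUntil (illustrative, not part of this PR) walks the trie under a
// tracker and abandons the loop when stop() returns true. No explicit
// StopIterator call is needed: the deferred CloseAndSave drains startChan and
// stopChan under the tracker's lock and records the iterator's current path.
func traverseUntil(tree state.Trie, recoveryFile string, stop func() bool) (err error) {
	tr := tracker.New(recoveryFile, 1)
	defer func() {
		if cerr := tr.CloseAndSave(); err == nil {
			err = cerr
		}
	}()

	for it := tr.Tracked(tree.NodeIterator(nil)); it.Next(true); {
		if stop() {
			break // CloseAndSave persists it.Path() for a later Restore
		}
	}
	return nil
}
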
+	tr.Lock()
 	tr.running = false
+	close(tr.stopChan)
+	tr.Unlock()
 
 	// drain any pending iterators
 	close(tr.startChan)
 	for start := range tr.startChan {
 		tr.started[start] = struct{}{}
 	}
-	close(tr.stopChan)
 	for stop := range tr.stopChan {
 		tr.stopped = append(tr.stopped, stop)
 	}
 
 	for _, stop := range tr.stopped {
 		delete(tr.started, stop)
 	}
 
-	if len(tr.started) == 0 {
-		// if the tracker state is empty, erase any existing recovery file
-		err := os.Remove(tr.recoveryFile)
-		if os.IsNotExist(err) {
-			err = nil
-		}
-		return err
-	}
-
-	return tr.dump()
+	return tr.Save()
 }
 
+// Next advances the iterator, notifying its owning tracker when it finishes.
 func (it *Iterator) Next(descend bool) bool {
 	ret := it.NodeIterator.Next(descend)
 
 	if !ret {
+		it.tracker.RLock()
+		defer it.tracker.RUnlock()
 		if it.tracker.running {
 			it.tracker.stopChan <- it
 		} else {
-			log.Error("Iterator stopped after tracker halted", "path", it.Path())
+			log.Error("Tracker was closed before iterator finished")
 		}
 	}
 	return ret
 }
 
+func (it *Iterator) Bounds() ([]byte, []byte) {
+	if impl, ok := it.NodeIterator.(*iter.PrefixBoundIterator); ok {
+		return impl.Bounds()
+	}
+	return nil, nil
+}
+
 // Rewinds to the path of the previous (pre-order) node:
-// If the last byte of the path is zero, pops it. Otherwise, decrements it
-// and pads with 0xF to 64 bytes (e.g. [1] => [0 f f f ...]).
-// Returns the passed path (which is also modified in place)
+// If the last byte of the path is zero, pops it (e.g. [1 0] => [1]).
+// Otherwise, decrements it and pads with 0xF to 64 bytes (e.g. [1] => [0 f f f ...]).
+// The passed slice is not modified.
 func rewindPath(path []byte) []byte {
 	if len(path) == 0 {
 		return path

tracker/tracker_test.go (new file, 69 lines)

@@ -0,0 +1,69 @@
+package tracker_test
+
+import (
+	"bytes"
+	"math/rand"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/cerc-io/eth-iterator-utils/internal"
+	"github.com/cerc-io/eth-iterator-utils/tracker"
+)
+
+func TestTracker(t *testing.T) {
+	NumIters := uint(1)
+	recoveryFile := filepath.Join(t.TempDir(), "tracker_test.csv")
+
+	tree, edb := internal.OpenFixtureTrie(t, 1)
+	t.Cleanup(func() { edb.Close() })
+
+	// traverse trie and trigger error at some intermediate point
+	N := len(internal.FixtureNodePaths)
+	interrupt := rand.Intn(N/2) + N/4
+	failedTraverse := func() []byte {
+		tr := tracker.New(recoveryFile, NumIters)
+		defer tr.CloseAndSave()
+
+		var prevPath []byte
+		count := 0
+		for it := tr.Tracked(tree.NodeIterator(nil)); it.Next(true); {
+			if count == interrupt {
+				return prevPath // tracker rewinds one node to prevent gaps
+			}
+			prevPath = it.Path()
+			count++
+		}
+		return nil
+	}
+
+	failedAt := failedTraverse()
+	if failedAt == nil {
+		t.Fatal("traversal wasn't interrupted")
+	}
+
+	if !fileExists(recoveryFile) {
+		t.Fatal("recovery file wasn't created")
+	}
+
+	tr := tracker.New(recoveryFile, NumIters)
+	its, _, err := tr.Restore(tree.NodeIterator)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if uint(len(its)) != NumIters {
+		t.Fatalf("expected to restore %d iterators, got %d", NumIters, len(its))
+	}
+	if !bytes.Equal(failedAt, its[0].Path()) {
+		t.Fatalf("iterator restored to wrong position: expected %v, got %v", failedAt, its[0].Path())
+	}
+
+	if fileExists(recoveryFile) {
+		t.Fatal("recovery file wasn't removed")
+	}
+}
+
+func fileExists(file string) bool {
+	_, err := os.Stat(file)
+	return !os.IsNotExist(err)
+}

Is it explicit anywhere that you must call CloseAndSave in order to close the channel, set running to false, etc.?
What happens if you don't call that, but the iterator completes?
If we need to call it, I think the pattern needs to be documented explicitly somewhere (unless I am just missing something).

If it's not called, it should just be sort of a leak: the channels won't be flushed or saved, but once all its iterators die, the tracker will still be gc'd. No critical consequences that I foresee, but I will document it.
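
To make that lifecycle concrete, here is a hedged sketch assembled from the package doc comment and tracker_test.go (resumeTraversal and its parameters are illustrative names, not code from this PR). CloseAndSave is the only call that sets running to false, closes and drains the channels, and writes or removes the recovery file; if it is skipped, the positions are simply never saved and the tracker is garbage-collected once its iterators finish.

package example

import (
	"github.com/cerc-io/eth-iterator-utils/tracker"
	"github.com/ethereum/go-ethereum/core/state"
)

// resumeTraversal (illustrative, not part of this PR) restores any iterators
// recorded in recoveryFile and finishes them. Restore removes the recovery
// file; the deferred CloseAndSave writes a fresh one only if an iterator is
// still unfinished when the function returns.
func resumeTraversal(tree state.Trie, recoveryFile string) (err error) {
	tr := tracker.New(recoveryFile, 1)
	defer func() {
		if cerr := tr.CloseAndSave(); err == nil {
			err = cerr
		}
	}()

	its, _, err := tr.Restore(tree.NodeIterator)
	if err != nil {
		return err
	}
	for _, it := range its {
		for it.Next(true) {
			// ... resume work from the saved position (rewound one node to avoid gaps)
		}
	}
	return nil
}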