Add tracker interface and tests #3
2
go.mod
2
go.mod
@ -57,7 +57,7 @@ require (
|
|||||||
)
|
)
|
||||||
|
|
||||||
replace (
|
replace (
|
||||||
github.com/cerc-io/eth-testing => git.vdb.to/cerc-io/eth-testing v0.2.1
|
github.com/cerc-io/eth-testing => git.vdb.to/cerc-io/eth-testing v0.3.1
|
||||||
// Not strictly necessary, but tells go what dependency versions should be
|
// Not strictly necessary, but tells go what dependency versions should be
|
||||||
github.com/ethereum/go-ethereum => git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1
|
github.com/ethereum/go-ethereum => git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1
|
||||||
)
|
)
|
||||||
|
4
go.sum
4
go.sum
@ -1,6 +1,6 @@
|
|||||||
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
||||||
git.vdb.to/cerc-io/eth-testing v0.2.1 h1:IZAX7DVgzPkSmu1xdKZ5aOemdEYbvtgae7GUl/TUNtQ=
|
git.vdb.to/cerc-io/eth-testing v0.3.1-0.20230925181540-2ea71042e7e0 h1:fWAvsSiuDqveuxwnfc8psInfLZhMqHlQnmOpOHsd8Tk=
|
||||||
git.vdb.to/cerc-io/eth-testing v0.2.1/go.mod h1:qdvpc/W1xvf2MKx3rMOqvFvYaYIHG77Z1g0lwsmw0Uk=
|
git.vdb.to/cerc-io/eth-testing v0.3.1-0.20230925181540-2ea71042e7e0/go.mod h1:qdvpc/W1xvf2MKx3rMOqvFvYaYIHG77Z1g0lwsmw0Uk=
|
||||||
git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1 h1:KLjxHwp9Zp7xhECccmJS00RiL+VwTuUGLU7qeIctg8g=
|
git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1 h1:KLjxHwp9Zp7xhECccmJS00RiL+VwTuUGLU7qeIctg8g=
|
||||||
git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1/go.mod h1:cYXZu70+6xmDgIgrTD81GPasv16piiAFJnKyAbwVPMU=
|
git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1/go.mod h1:cYXZu70+6xmDgIgrTD81GPasv16piiAFJnKyAbwVPMU=
|
||||||
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
|
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
|
||||||
|
39
internal/test_helper.go
Normal file
39
internal/test_helper.go
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
package internal
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/cerc-io/eth-testing/chaindata/small2"
|
||||||
|
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||||
|
"github.com/ethereum/go-ethereum/core/state"
|
||||||
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
FixtureNodePaths = small2.Block1_StateNodePaths
|
||||||
|
FixtureLeafKeys = small2.Block1_StateNodeLeafKeys
|
||||||
|
)
|
||||||
|
|
||||||
|
func OpenFixtureTrie(t *testing.T, height uint64) (state.Trie, ethdb.Database) {
|
||||||
|
data := small2.ChainData
|
||||||
|
kvdb, ldberr := rawdb.NewLevelDBDatabase(data.ChainData, 1024, 256, t.Name(), true)
|
||||||
|
if ldberr != nil {
|
||||||
|
t.Fatal(ldberr)
|
||||||
|
}
|
||||||
|
edb, err := rawdb.NewDatabaseWithFreezer(kvdb, data.Ancient, t.Name(), true)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
hash := rawdb.ReadCanonicalHash(edb, height)
|
||||||
|
header := rawdb.ReadHeader(edb, hash, height)
|
||||||
|
if header == nil {
|
||||||
|
t.Fatalf("unable to read canonical header at height %d", height)
|
||||||
|
}
|
||||||
|
sdb := state.NewDatabase(edb)
|
||||||
|
tree, err := sdb.OpenTrie(header.Root)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
return tree, edb
|
||||||
|
}
|
22
iterator.go
22
iterator.go
@ -23,16 +23,21 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/trie"
|
"github.com/ethereum/go-ethereum/trie"
|
||||||
)
|
)
|
||||||
|
|
||||||
// PrefixBoundIterator is a NodeIterator constrained by a lower & upper bound (as hex path prefixes)
|
|
||||||
type PrefixBoundIterator struct {
|
|
||||||
trie.NodeIterator
|
|
||||||
EndPath []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
// IteratorConstructor is a constructor returning a NodeIterator, which is used to decouple this
|
// IteratorConstructor is a constructor returning a NodeIterator, which is used to decouple this
|
||||||
// code from the trie implementation.
|
// code from the trie implementation.
|
||||||
type IteratorConstructor = func(startKey []byte) trie.NodeIterator
|
type IteratorConstructor = func(startKey []byte) trie.NodeIterator
|
||||||
|
|
||||||
|
// PrefixBoundIterator is a NodeIterator constrained by a lower & upper bound (as hex path prefixes)
|
||||||
|
type PrefixBoundIterator struct {
|
||||||
|
trie.NodeIterator
|
||||||
|
StartPath, EndPath []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPrefixBoundIterator returns an iterator with an upper bound value (hex path prefix)
|
||||||
|
func NewPrefixBoundIterator(it trie.NodeIterator, to []byte) *PrefixBoundIterator {
|
||||||
|
return &PrefixBoundIterator{NodeIterator: it, StartPath: it.Path(), EndPath: to}
|
||||||
|
}
|
||||||
|
|
||||||
func (it *PrefixBoundIterator) Next(descend bool) bool {
|
func (it *PrefixBoundIterator) Next(descend bool) bool {
|
||||||
if it.EndPath == nil {
|
if it.EndPath == nil {
|
||||||
return it.NodeIterator.Next(descend)
|
return it.NodeIterator.Next(descend)
|
||||||
@ -49,9 +54,8 @@ func (it *PrefixBoundIterator) Next(descend bool) bool {
|
|||||||
return bytes.Compare(it.Path(), it.EndPath) <= 0
|
return bytes.Compare(it.Path(), it.EndPath) <= 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPrefixBoundIterator returns an iterator with an upper bound value (hex path prefix)
|
func (it *PrefixBoundIterator) Bounds() ([]byte, []byte) {
|
||||||
func NewPrefixBoundIterator(it trie.NodeIterator, to []byte) *PrefixBoundIterator {
|
return it.StartPath, it.EndPath
|
||||||
return &PrefixBoundIterator{NodeIterator: it, EndPath: to}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// generates nibble slice prefixes at uniform intervals
|
// generates nibble slice prefixes at uniform intervals
|
||||||
|
@ -5,11 +5,8 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
|
||||||
"github.com/ethereum/go-ethereum/core/state"
|
|
||||||
|
|
||||||
iter "github.com/cerc-io/eth-iterator-utils"
|
iter "github.com/cerc-io/eth-iterator-utils"
|
||||||
fixture "github.com/cerc-io/eth-testing/chaindata/medium"
|
"github.com/cerc-io/eth-iterator-utils/internal"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestMakePaths(t *testing.T) {
|
func TestMakePaths(t *testing.T) {
|
||||||
@ -24,30 +21,8 @@ func TestMakePaths(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestIterator(t *testing.T) {
|
func TestIterator(t *testing.T) {
|
||||||
kvdb, ldberr := rawdb.NewLevelDBDatabase(fixture.ChainDataPath, 1024, 256, "vdb-geth", true)
|
tree, edb := internal.OpenFixtureTrie(t, 1)
|
||||||
if ldberr != nil {
|
t.Cleanup(func() { edb.Close() })
|
||||||
t.Fatal(ldberr)
|
|
||||||
}
|
|
||||||
edb, err := rawdb.NewDatabaseWithFreezer(kvdb, fixture.AncientDataPath, "vdb-geth", true)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer edb.Close()
|
|
||||||
|
|
||||||
height := uint64(1)
|
|
||||||
hash := rawdb.ReadCanonicalHash(edb, height)
|
|
||||||
header := rawdb.ReadHeader(edb, hash, height)
|
|
||||||
if header == nil {
|
|
||||||
t.Fatalf("unable to read canonical header at height %d", height)
|
|
||||||
}
|
|
||||||
sdb := state.NewDatabase(edb)
|
|
||||||
tree, err := sdb.OpenTrie(header.Root)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Run("in bounds", func(t *testing.T) {
|
t.Run("in bounds", func(t *testing.T) {
|
||||||
type testCase struct {
|
type testCase struct {
|
||||||
@ -79,7 +54,7 @@ func TestIterator(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
t.Run("trie is covered", func(t *testing.T) {
|
t.Run("trie is covered", func(t *testing.T) {
|
||||||
allPaths := fixture.Block1_Paths
|
allPaths := internal.FixtureNodePaths
|
||||||
cases := []uint{1, 2, 4, 8, 16, 32}
|
cases := []uint{1, 2, 4, 8, 16, 32}
|
||||||
runCase := func(t *testing.T, nbins uint) {
|
runCase := func(t *testing.T, nbins uint) {
|
||||||
iters := iter.SubtrieIterators(tree.NodeIterator, nbins)
|
iters := iter.SubtrieIterators(tree.NodeIterator, nbins)
|
||||||
|
@ -1,12 +1,34 @@
|
|||||||
|
// This package provides a way to track multiple concurrently running trie iterators, save their
|
||||||
|
// state to a file on failures or interruptions, and restore them at the positions where they
|
||||||
|
// stopped.
|
||||||
|
//
|
||||||
|
// Example usage:
|
||||||
|
//
|
||||||
|
// tr := tracker.New("recovery.txt", 100)
|
||||||
|
// // Ensure the tracker is closed and saves its state
|
||||||
|
// defer tr.CloseAndSave()
|
||||||
|
//
|
||||||
|
// // Iterate over the trie, from one or multiple threads
|
||||||
|
// it := tr.Tracked(tree.NodeIterator(nil))
|
||||||
|
// for it.Next(true) {
|
||||||
|
// // ... do work that could fail or be interrupted
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// // Later, restore the iterators
|
||||||
|
// tr := tracker.New("recovery.txt", 100)
|
||||||
|
// defer tr.CloseAndSave()
|
||||||
|
//
|
||||||
|
// its, err := tr.Restore(tree.NodeIterator)
|
||||||
|
// for _, it := range its {
|
||||||
|
// // ... resume traversal
|
||||||
|
// }
|
||||||
package tracker
|
package tracker
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"encoding/csv"
|
"encoding/csv"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"sync"
|
||||||
"syscall"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
"github.com/ethereum/go-ethereum/trie"
|
"github.com/ethereum/go-ethereum/trie"
|
||||||
@ -14,23 +36,55 @@ import (
|
|||||||
iter "github.com/cerc-io/eth-iterator-utils"
|
iter "github.com/cerc-io/eth-iterator-utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// IteratorTracker exposes a minimal interface to register and consume iterators.
|
||||||
|
type IteratorTracker interface {
|
||||||
|
Restore(iter.IteratorConstructor) ([]trie.NodeIterator, []trie.NodeIterator, error)
|
||||||
|
Tracked(trie.NodeIterator) trie.NodeIterator
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ IteratorTracker = &Tracker{}
|
||||||
|
|
||||||
|
// Tracker is a trie iterator tracker which saves state to and restores it from a file.
|
||||||
type Tracker struct {
|
type Tracker struct {
|
||||||
recoveryFile string
|
*TrackerImpl
|
||||||
|
|
||||||
startChan chan *Iterator
|
|
||||||
stopChan chan *Iterator
|
|
||||||
started map[*Iterator]struct{}
|
|
||||||
stopped []*Iterator
|
|
||||||
running bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type Iterator struct {
|
// New creates a new tracker which saves state to a given file. bufsize sets the size of the
|
||||||
trie.NodeIterator
|
// channel buffers used internally to manage tracking. Note that passing a bufsize smaller than the expected
|
||||||
tracker *Tracker
|
// number of concurrent iterators could lead to deadlock.
|
||||||
|
func New(file string, bufsize uint) *Tracker {
|
||||||
|
return &Tracker{NewImpl(file, bufsize)}
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(file string, bufsize uint) Tracker {
|
// Restore attempts to read iterator state from the recovery file.
|
||||||
return Tracker{
|
// Returns:
|
||||||
|
// - slice of tracked iterators
|
||||||
|
// - slice of iterators originally returned by constructor
|
||||||
|
// If the file doesn't exist, returns an empty slice with no error.
|
||||||
|
// Restored iterators are constructed in the same order they appear in the returned slice.
|
||||||
|
func (tr *Tracker) Restore(makeIterator iter.IteratorConstructor) (
|
||||||
|
[]trie.NodeIterator, []trie.NodeIterator, error,
|
||||||
|
) {
|
||||||
|
its, bases, err := tr.TrackerImpl.Restore(makeIterator)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var ret []trie.NodeIterator
|
||||||
|
for _, it := range its {
|
||||||
|
ret = append(ret, it)
|
||||||
|
}
|
||||||
|
return ret, bases, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tracked wraps an iterator in a tracked iterator. This should not be called when the tracker can
|
||||||
|
// potentially be closed.
|
||||||
|
func (tr *Tracker) Tracked(it trie.NodeIterator) trie.NodeIterator {
|
||||||
|
return tr.TrackerImpl.Tracked(it)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewImpl(file string, bufsize uint) *TrackerImpl {
|
||||||
|
return &TrackerImpl{
|
||||||
recoveryFile: file,
|
recoveryFile: file,
|
||||||
startChan: make(chan *Iterator, bufsize),
|
startChan: make(chan *Iterator, bufsize),
|
||||||
stopChan: make(chan *Iterator, bufsize),
|
stopChan: make(chan *Iterator, bufsize),
|
||||||
@ -39,41 +93,40 @@ func New(file string, bufsize uint) Tracker {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tr *Tracker) CaptureSignal(cancelCtx context.CancelFunc) {
|
type TrackerImpl struct {
|
||||||
sigChan := make(chan os.Signal, 1)
|
recoveryFile string
|
||||||
|
|
||||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
startChan chan *Iterator
|
||||||
go func() {
|
stopChan chan *Iterator
|
||||||
sig := <-sigChan
|
started map[*Iterator]struct{}
|
||||||
log.Error("Signal received (%v), stopping", "signal", sig)
|
stopped []*Iterator
|
||||||
// Cancel context on receiving a signal. On cancellation, all tracked iterators complete
|
running bool
|
||||||
// processing of their current node before stopping.
|
sync.RWMutex // guards closing of the tracker
|
||||||
cancelCtx()
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tracked wraps an iterator in a Iterator. This should not be called once halts are possible.
|
type Iterator struct {
|
||||||
func (tr *Tracker) Tracked(it trie.NodeIterator) (ret *Iterator) {
|
trie.NodeIterator
|
||||||
ret = &Iterator{it, tr}
|
tracker *TrackerImpl
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tr *TrackerImpl) Tracked(it trie.NodeIterator) *Iterator {
|
||||||
|
ret := &Iterator{it, tr}
|
||||||
tr.startChan <- ret
|
tr.startChan <- ret
|
||||||
return
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
// StopIterator explicitly stops an iterator
|
// Save dumps iterator path and bounds to a text file so it can be restored later.
|
||||||
func (tr *Tracker) StopIterator(it *Iterator) {
|
func (tr *TrackerImpl) Save() error {
|
||||||
tr.stopChan <- it
|
log.Debug("Saving recovery state", "to", tr.recoveryFile)
|
||||||
}
|
|
||||||
|
// if the tracker state is empty, erase any existing recovery file
|
||||||
|
if len(tr.started) == 0 {
|
||||||
|
return tr.removeRecoveryFile()
|
||||||
|
}
|
||||||
|
|
||||||
// dumps iterator path and bounds to a text file so it can be restored later
|
|
||||||
func (tr *Tracker) dump() error {
|
|
||||||
log.Debug("Dumping recovery state", "to", tr.recoveryFile)
|
|
||||||
var rows [][]string
|
var rows [][]string
|
||||||
for it := range tr.started {
|
for it := range tr.started {
|
||||||
var endPath []byte
|
_, endPath := it.Bounds()
|
||||||
if impl, ok := it.NodeIterator.(*iter.PrefixBoundIterator); ok {
|
|
||||||
endPath = impl.EndPath
|
|
||||||
}
|
|
||||||
|
|
||||||
rows = append(rows, []string{
|
rows = append(rows, []string{
|
||||||
fmt.Sprintf("%x", it.Path()),
|
fmt.Sprintf("%x", it.Path()),
|
||||||
fmt.Sprintf("%x", endPath),
|
fmt.Sprintf("%x", endPath),
|
||||||
@ -90,16 +143,23 @@ func (tr *Tracker) dump() error {
|
|||||||
return out.WriteAll(rows)
|
return out.WriteAll(rows)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Restore attempts to read iterator state from the recovery file.
|
func (tr *TrackerImpl) removeRecoveryFile() error {
|
||||||
// If the file doesn't exist, returns an empty slice with no error.
|
err := os.Remove(tr.recoveryFile)
|
||||||
// Restored iterators are constructed in the same order as in the returned slice.
|
if os.IsNotExist(err) {
|
||||||
func (tr *Tracker) Restore(makeIterator iter.IteratorConstructor) ([]trie.NodeIterator, error) {
|
err = nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tr *TrackerImpl) Restore(makeIterator iter.IteratorConstructor) (
|
||||||
|
[]*Iterator, []trie.NodeIterator, error,
|
||||||
|
) {
|
||||||
file, err := os.Open(tr.recoveryFile)
|
file, err := os.Open(tr.recoveryFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
return nil, nil
|
return nil, nil, nil
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
defer file.Close()
|
defer file.Close()
|
||||||
log.Debug("Restoring recovery state", "from", tr.recoveryFile)
|
log.Debug("Restoring recovery state", "from", tr.recoveryFile)
|
||||||
@ -108,10 +168,11 @@ func (tr *Tracker) Restore(makeIterator iter.IteratorConstructor) ([]trie.NodeIt
|
|||||||
in.FieldsPerRecord = 2
|
in.FieldsPerRecord = 2
|
||||||
rows, err := in.ReadAll()
|
rows, err := in.ReadAll()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var ret []trie.NodeIterator
|
var wrapped []*Iterator
|
||||||
|
var base []trie.NodeIterator
|
||||||
for _, row := range rows {
|
for _, row := range rows {
|
||||||
// pick up where each recovered iterator left off
|
// pick up where each recovered iterator left off
|
||||||
var recoveredPath []byte
|
var recoveredPath []byte
|
||||||
@ -119,12 +180,12 @@ func (tr *Tracker) Restore(makeIterator iter.IteratorConstructor) ([]trie.NodeIt
|
|||||||
|
|
||||||
if len(row[0]) != 0 {
|
if len(row[0]) != 0 {
|
||||||
if _, err = fmt.Sscanf(row[0], "%x", &recoveredPath); err != nil {
|
if _, err = fmt.Sscanf(row[0], "%x", &recoveredPath); err != nil {
|
||||||
return nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(row[1]) != 0 {
|
if len(row[1]) != 0 {
|
||||||
if _, err = fmt.Sscanf(row[1], "%x", &endPath); err != nil {
|
if _, err = fmt.Sscanf(row[1], "%x", &endPath); err != nil {
|
||||||
return nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -135,59 +196,64 @@ func (tr *Tracker) Restore(makeIterator iter.IteratorConstructor) ([]trie.NodeIt
|
|||||||
}
|
}
|
||||||
it := makeIterator(iter.HexToKeyBytes(recoveredPath))
|
it := makeIterator(iter.HexToKeyBytes(recoveredPath))
|
||||||
boundIt := iter.NewPrefixBoundIterator(it, endPath)
|
boundIt := iter.NewPrefixBoundIterator(it, endPath)
|
||||||
ret = append(ret, tr.Tracked(boundIt))
|
wrapped = append(wrapped, tr.Tracked(boundIt))
|
||||||
|
base = append(base, it)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug("Restored iterators", "count", len(ret))
|
return wrapped, base, tr.removeRecoveryFile()
|
||||||
return ret, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tr *Tracker) HaltAndDump() error {
|
// CloseAndSave stops all tracked iterators and dumps their state to a file.
|
||||||
|
// This closes the tracker, so adding a new iterator afterwards will fail.
|
||||||
|
// A new Tracker must be constructed in order to restore state.
|
||||||
|
func (tr *TrackerImpl) CloseAndSave() error {
|
||||||
|
tr.Lock()
|
||||||
tr.running = false
|
tr.running = false
|
||||||
|
close(tr.stopChan)
|
||||||
|
tr.Unlock()
|
||||||
|
|
||||||
// drain any pending iterators
|
// drain any pending iterators
|
||||||
close(tr.startChan)
|
close(tr.startChan)
|
||||||
for start := range tr.startChan {
|
for start := range tr.startChan {
|
||||||
tr.started[start] = struct{}{}
|
tr.started[start] = struct{}{}
|
||||||
}
|
}
|
||||||
close(tr.stopChan)
|
|
||||||
for stop := range tr.stopChan {
|
for stop := range tr.stopChan {
|
||||||
tr.stopped = append(tr.stopped, stop)
|
tr.stopped = append(tr.stopped, stop)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, stop := range tr.stopped {
|
for _, stop := range tr.stopped {
|
||||||
delete(tr.started, stop)
|
delete(tr.started, stop)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(tr.started) == 0 {
|
return tr.Save()
|
||||||
// if the tracker state is empty, erase any existing recovery file
|
|
||||||
err := os.Remove(tr.recoveryFile)
|
|
||||||
if os.IsNotExist(err) {
|
|
||||||
err = nil
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return tr.dump()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Next advances the iterator, notifying its owning tracker when it finishes.
|
||||||
func (it *Iterator) Next(descend bool) bool {
|
func (it *Iterator) Next(descend bool) bool {
|
||||||
ret := it.NodeIterator.Next(descend)
|
ret := it.NodeIterator.Next(descend)
|
||||||
|
|
||||||
if !ret {
|
if !ret {
|
||||||
|
it.tracker.RLock()
|
||||||
|
defer it.tracker.RUnlock()
|
||||||
if it.tracker.running {
|
if it.tracker.running {
|
||||||
it.tracker.stopChan <- it
|
it.tracker.stopChan <- it
|
||||||
} else {
|
} else {
|
||||||
log.Error("Iterator stopped after tracker halted", "path", it.Path())
|
log.Error("Tracker was closed before iterator finished")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (it *Iterator) Bounds() ([]byte, []byte) {
|
||||||
|
if impl, ok := it.NodeIterator.(*iter.PrefixBoundIterator); ok {
|
||||||
|
return impl.Bounds()
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Rewinds to the path of the previous (pre-order) node:
|
// Rewinds to the path of the previous (pre-order) node:
|
||||||
// If the last byte of the path is zero, pops it. Otherwise, decrements it
|
// If the last byte of the path is zero, pops it (e.g. [1 0] => [1]).
|
||||||
// and pads with 0xF to 64 bytes (e.g. [1] => [0 f f f ...]).
|
// Otherwise, decrements it and pads with 0xF to 64 bytes (e.g. [1] => [0 f f f ...]).
|
||||||
// Returns the passed path (which is also modified in place)
|
// The passed slice is not modified.
|
||||||
func rewindPath(path []byte) []byte {
|
func rewindPath(path []byte) []byte {
|
||||||
if len(path) == 0 {
|
if len(path) == 0 {
|
||||||
return path
|
return path
|
||||||
|
69
tracker/tracker_test.go
Normal file
69
tracker/tracker_test.go
Normal file
@ -0,0 +1,69 @@
|
|||||||
|
package tracker_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"math/rand"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/cerc-io/eth-iterator-utils/internal"
|
||||||
|
"github.com/cerc-io/eth-iterator-utils/tracker"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestTracker(t *testing.T) {
|
||||||
|
NumIters := uint(1)
|
||||||
|
recoveryFile := filepath.Join(t.TempDir(), "tracker_test.csv")
|
||||||
|
|
||||||
|
tree, edb := internal.OpenFixtureTrie(t, 1)
|
||||||
|
t.Cleanup(func() { edb.Close() })
|
||||||
|
|
||||||
|
// traverse trie and trigger error at some intermediate point
|
||||||
|
N := len(internal.FixtureNodePaths)
|
||||||
|
interrupt := rand.Intn(N/2) + N/4
|
||||||
|
failedTraverse := func() []byte {
|
||||||
|
tr := tracker.New(recoveryFile, NumIters)
|
||||||
|
defer tr.CloseAndSave()
|
||||||
|
|
||||||
|
var prevPath []byte
|
||||||
|
count := 0
|
||||||
|
for it := tr.Tracked(tree.NodeIterator(nil)); it.Next(true); {
|
||||||
|
if count == interrupt {
|
||||||
|
return prevPath // tracker rewinds one node to prevent gaps
|
||||||
|
}
|
||||||
|
prevPath = it.Path()
|
||||||
|
count++
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
failedAt := failedTraverse()
|
||||||
|
if failedAt == nil {
|
||||||
|
t.Fatal("traversal wasn't interrupted")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !fileExists(recoveryFile) {
|
||||||
|
t.Fatal("recovery file wasn't created")
|
||||||
|
}
|
||||||
|
|
||||||
|
tr := tracker.New(recoveryFile, NumIters)
|
||||||
|
its, _, err := tr.Restore(tree.NodeIterator)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if uint(len(its)) != NumIters {
|
||||||
|
t.Fatalf("expected to restore %d iterators, got %d", NumIters, len(its))
|
||||||
|
}
|
||||||
|
if !bytes.Equal(failedAt, its[0].Path()) {
|
||||||
|
t.Fatalf("iterator restored to wrong position: expected %v, got %v", failedAt, its[0].Path())
|
||||||
|
}
|
||||||
|
|
||||||
|
if fileExists(recoveryFile) {
|
||||||
|
t.Fatal("recovery file wasn't removed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func fileExists(file string) bool {
|
||||||
|
_, err := os.Stat(file)
|
||||||
|
return !os.IsNotExist(err)
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user