From 7703ac8ea4809a8f790a50efb7d0da33a60b7afb Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Tue, 26 Sep 2023 11:34:41 +0000 Subject: [PATCH] Add tracker interface and test (#3) Refactors the tracker to expose an interface and an implementation type. Decoupling these will let us inject new state/functionality transparently to the client function. Reviewed-on: https://git.vdb.to/cerc-io/eth-iterator-utils/pulls/3 --- go.mod | 2 +- go.sum | 4 +- internal/test_helper.go | 39 ++++++++ iterator.go | 22 +++-- iterator_test.go | 33 +------ tracker/tracker.go | 208 ++++++++++++++++++++++++++-------------- tracker/tracker_test.go | 69 +++++++++++++ 7 files changed, 265 insertions(+), 112 deletions(-) create mode 100644 internal/test_helper.go create mode 100644 tracker/tracker_test.go diff --git a/go.mod b/go.mod index 5397dbd..5493b8d 100644 --- a/go.mod +++ b/go.mod @@ -57,7 +57,7 @@ require ( ) replace ( - github.com/cerc-io/eth-testing => git.vdb.to/cerc-io/eth-testing v0.2.1 + github.com/cerc-io/eth-testing => git.vdb.to/cerc-io/eth-testing v0.3.1 // Not strictly necessary, but tells go what dependency versions should be github.com/ethereum/go-ethereum => git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1 ) diff --git a/go.sum b/go.sum index 4653dfd..76f44ca 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,6 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -git.vdb.to/cerc-io/eth-testing v0.2.1 h1:IZAX7DVgzPkSmu1xdKZ5aOemdEYbvtgae7GUl/TUNtQ= -git.vdb.to/cerc-io/eth-testing v0.2.1/go.mod h1:qdvpc/W1xvf2MKx3rMOqvFvYaYIHG77Z1g0lwsmw0Uk= +git.vdb.to/cerc-io/eth-testing v0.3.1-0.20230925181540-2ea71042e7e0 h1:fWAvsSiuDqveuxwnfc8psInfLZhMqHlQnmOpOHsd8Tk= +git.vdb.to/cerc-io/eth-testing v0.3.1-0.20230925181540-2ea71042e7e0/go.mod h1:qdvpc/W1xvf2MKx3rMOqvFvYaYIHG77Z1g0lwsmw0Uk= git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1 h1:KLjxHwp9Zp7xhECccmJS00RiL+VwTuUGLU7qeIctg8g= git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1/go.mod h1:cYXZu70+6xmDgIgrTD81GPasv16piiAFJnKyAbwVPMU= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= diff --git a/internal/test_helper.go b/internal/test_helper.go new file mode 100644 index 0000000..f91eb3d --- /dev/null +++ b/internal/test_helper.go @@ -0,0 +1,39 @@ +package internal + +import ( + "testing" + + "github.com/cerc-io/eth-testing/chaindata/small2" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/ethdb" +) + +var ( + FixtureNodePaths = small2.Block1_StateNodePaths + FixtureLeafKeys = small2.Block1_StateNodeLeafKeys +) + +func OpenFixtureTrie(t *testing.T, height uint64) (state.Trie, ethdb.Database) { + data := small2.ChainData + kvdb, ldberr := rawdb.NewLevelDBDatabase(data.ChainData, 1024, 256, t.Name(), true) + if ldberr != nil { + t.Fatal(ldberr) + } + edb, err := rawdb.NewDatabaseWithFreezer(kvdb, data.Ancient, t.Name(), true) + if err != nil { + t.Fatal(err) + } + + hash := rawdb.ReadCanonicalHash(edb, height) + header := rawdb.ReadHeader(edb, hash, height) + if header == nil { + t.Fatalf("unable to read canonical header at height %d", height) + } + sdb := state.NewDatabase(edb) + tree, err := sdb.OpenTrie(header.Root) + if err != nil { + t.Fatal(err) + } + return tree, edb +} diff --git a/iterator.go b/iterator.go index 31cde21..fb093fd 100644 --- a/iterator.go +++ b/iterator.go @@ -23,16 +23,21 @@ import ( "github.com/ethereum/go-ethereum/trie" ) -// PrefixBoundIterator is a NodeIterator constrained by a lower & upper bound (as hex path prefixes) -type PrefixBoundIterator struct { - trie.NodeIterator - EndPath []byte -} - // IteratorConstructor is a constructor returning a NodeIterator, which is used to decouple this // code from the trie implementation. type IteratorConstructor = func(startKey []byte) trie.NodeIterator +// PrefixBoundIterator is a NodeIterator constrained by a lower & upper bound (as hex path prefixes) +type PrefixBoundIterator struct { + trie.NodeIterator + StartPath, EndPath []byte +} + +// NewPrefixBoundIterator returns an iterator with an upper bound value (hex path prefix) +func NewPrefixBoundIterator(it trie.NodeIterator, to []byte) *PrefixBoundIterator { + return &PrefixBoundIterator{NodeIterator: it, StartPath: it.Path(), EndPath: to} +} + func (it *PrefixBoundIterator) Next(descend bool) bool { if it.EndPath == nil { return it.NodeIterator.Next(descend) @@ -49,9 +54,8 @@ func (it *PrefixBoundIterator) Next(descend bool) bool { return bytes.Compare(it.Path(), it.EndPath) <= 0 } -// NewPrefixBoundIterator returns an iterator with an upper bound value (hex path prefix) -func NewPrefixBoundIterator(it trie.NodeIterator, to []byte) *PrefixBoundIterator { - return &PrefixBoundIterator{NodeIterator: it, EndPath: to} +func (it *PrefixBoundIterator) Bounds() ([]byte, []byte) { + return it.StartPath, it.EndPath } // generates nibble slice prefixes at uniform intervals diff --git a/iterator_test.go b/iterator_test.go index 339f41f..882a145 100644 --- a/iterator_test.go +++ b/iterator_test.go @@ -5,11 +5,8 @@ import ( "fmt" "testing" - "github.com/ethereum/go-ethereum/core/rawdb" - "github.com/ethereum/go-ethereum/core/state" - iter "github.com/cerc-io/eth-iterator-utils" - fixture "github.com/cerc-io/eth-testing/chaindata/medium" + "github.com/cerc-io/eth-iterator-utils/internal" ) func TestMakePaths(t *testing.T) { @@ -24,30 +21,8 @@ func TestMakePaths(t *testing.T) { } func TestIterator(t *testing.T) { - kvdb, ldberr := rawdb.NewLevelDBDatabase(fixture.ChainDataPath, 1024, 256, "vdb-geth", true) - if ldberr != nil { - t.Fatal(ldberr) - } - edb, err := rawdb.NewDatabaseWithFreezer(kvdb, fixture.AncientDataPath, "vdb-geth", true) - if err != nil { - t.Fatal(err) - } - if err != nil { - t.Fatal(err) - } - defer edb.Close() - - height := uint64(1) - hash := rawdb.ReadCanonicalHash(edb, height) - header := rawdb.ReadHeader(edb, hash, height) - if header == nil { - t.Fatalf("unable to read canonical header at height %d", height) - } - sdb := state.NewDatabase(edb) - tree, err := sdb.OpenTrie(header.Root) - if err != nil { - t.Fatal(err) - } + tree, edb := internal.OpenFixtureTrie(t, 1) + t.Cleanup(func() { edb.Close() }) t.Run("in bounds", func(t *testing.T) { type testCase struct { @@ -79,7 +54,7 @@ func TestIterator(t *testing.T) { }) t.Run("trie is covered", func(t *testing.T) { - allPaths := fixture.Block1_Paths + allPaths := internal.FixtureNodePaths cases := []uint{1, 2, 4, 8, 16, 32} runCase := func(t *testing.T, nbins uint) { iters := iter.SubtrieIterators(tree.NodeIterator, nbins) diff --git a/tracker/tracker.go b/tracker/tracker.go index b0996da..2e9025e 100644 --- a/tracker/tracker.go +++ b/tracker/tracker.go @@ -1,12 +1,34 @@ +// This package provides a way to track multiple concurrently running trie iterators, save their +// state to a file on failures or interruptions, and restore them at the positions where they +// stopped. +// +// Example usage: +// +// tr := tracker.New("recovery.txt", 100) +// // Ensure the tracker is closed and saves its state +// defer tr.CloseAndSave() +// +// // Iterate over the trie, from one or multiple threads +// it := tr.Tracked(tree.NodeIterator(nil)) +// for it.Next(true) { +// // ... do work that could fail or be interrupted +// } +// +// // Later, restore the iterators +// tr := tracker.New("recovery.txt", 100) +// defer tr.CloseAndSave() +// +// its, err := tr.Restore(tree.NodeIterator) +// for _, it := range its { +// // ... resume traversal +// } package tracker import ( - "context" "encoding/csv" "fmt" "os" - "os/signal" - "syscall" + "sync" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/trie" @@ -14,23 +36,55 @@ import ( iter "github.com/cerc-io/eth-iterator-utils" ) +// IteratorTracker exposes a minimal interface to register and consume iterators. +type IteratorTracker interface { + Restore(iter.IteratorConstructor) ([]trie.NodeIterator, []trie.NodeIterator, error) + Tracked(trie.NodeIterator) trie.NodeIterator +} + +var _ IteratorTracker = &Tracker{} + +// Tracker is a trie iterator tracker which saves state to and restores it from a file. type Tracker struct { - recoveryFile string - - startChan chan *Iterator - stopChan chan *Iterator - started map[*Iterator]struct{} - stopped []*Iterator - running bool + *TrackerImpl } -type Iterator struct { - trie.NodeIterator - tracker *Tracker +// New creates a new tracker which saves state to a given file. bufsize sets the size of the +// channel buffers used internally to manage tracking. Note that passing a bufsize smaller than the expected +// number of concurrent iterators could lead to deadlock. +func New(file string, bufsize uint) *Tracker { + return &Tracker{NewImpl(file, bufsize)} } -func New(file string, bufsize uint) Tracker { - return Tracker{ +// Restore attempts to read iterator state from the recovery file. +// Returns: +// - slice of tracked iterators +// - slice of iterators originally returned by constructor +// If the file doesn't exist, returns an empty slice with no error. +// Restored iterators are constructed in the same order they appear in the returned slice. +func (tr *Tracker) Restore(makeIterator iter.IteratorConstructor) ( + []trie.NodeIterator, []trie.NodeIterator, error, +) { + its, bases, err := tr.TrackerImpl.Restore(makeIterator) + if err != nil { + return nil, nil, err + } + + var ret []trie.NodeIterator + for _, it := range its { + ret = append(ret, it) + } + return ret, bases, nil +} + +// Tracked wraps an iterator in a tracked iterator. This should not be called when the tracker can +// potentially be closed. +func (tr *Tracker) Tracked(it trie.NodeIterator) trie.NodeIterator { + return tr.TrackerImpl.Tracked(it) +} + +func NewImpl(file string, bufsize uint) *TrackerImpl { + return &TrackerImpl{ recoveryFile: file, startChan: make(chan *Iterator, bufsize), stopChan: make(chan *Iterator, bufsize), @@ -39,41 +93,40 @@ func New(file string, bufsize uint) Tracker { } } -func (tr *Tracker) CaptureSignal(cancelCtx context.CancelFunc) { - sigChan := make(chan os.Signal, 1) +type TrackerImpl struct { + recoveryFile string - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - go func() { - sig := <-sigChan - log.Error("Signal received (%v), stopping", "signal", sig) - // Cancel context on receiving a signal. On cancellation, all tracked iterators complete - // processing of their current node before stopping. - cancelCtx() - }() + startChan chan *Iterator + stopChan chan *Iterator + started map[*Iterator]struct{} + stopped []*Iterator + running bool + sync.RWMutex // guards closing of the tracker } -// Tracked wraps an iterator in a Iterator. This should not be called once halts are possible. -func (tr *Tracker) Tracked(it trie.NodeIterator) (ret *Iterator) { - ret = &Iterator{it, tr} +type Iterator struct { + trie.NodeIterator + tracker *TrackerImpl +} + +func (tr *TrackerImpl) Tracked(it trie.NodeIterator) *Iterator { + ret := &Iterator{it, tr} tr.startChan <- ret - return + return ret } -// StopIterator explicitly stops an iterator -func (tr *Tracker) StopIterator(it *Iterator) { - tr.stopChan <- it -} +// Save dumps iterator path and bounds to a text file so it can be restored later. +func (tr *TrackerImpl) Save() error { + log.Debug("Saving recovery state", "to", tr.recoveryFile) + + // if the tracker state is empty, erase any existing recovery file + if len(tr.started) == 0 { + return tr.removeRecoveryFile() + } -// dumps iterator path and bounds to a text file so it can be restored later -func (tr *Tracker) dump() error { - log.Debug("Dumping recovery state", "to", tr.recoveryFile) var rows [][]string for it := range tr.started { - var endPath []byte - if impl, ok := it.NodeIterator.(*iter.PrefixBoundIterator); ok { - endPath = impl.EndPath - } - + _, endPath := it.Bounds() rows = append(rows, []string{ fmt.Sprintf("%x", it.Path()), fmt.Sprintf("%x", endPath), @@ -90,16 +143,23 @@ func (tr *Tracker) dump() error { return out.WriteAll(rows) } -// Restore attempts to read iterator state from the recovery file. -// If the file doesn't exist, returns an empty slice with no error. -// Restored iterators are constructed in the same order as in the returned slice. -func (tr *Tracker) Restore(makeIterator iter.IteratorConstructor) ([]trie.NodeIterator, error) { +func (tr *TrackerImpl) removeRecoveryFile() error { + err := os.Remove(tr.recoveryFile) + if os.IsNotExist(err) { + err = nil + } + return err +} + +func (tr *TrackerImpl) Restore(makeIterator iter.IteratorConstructor) ( + []*Iterator, []trie.NodeIterator, error, +) { file, err := os.Open(tr.recoveryFile) if err != nil { if os.IsNotExist(err) { - return nil, nil + return nil, nil, nil } - return nil, err + return nil, nil, err } defer file.Close() log.Debug("Restoring recovery state", "from", tr.recoveryFile) @@ -108,10 +168,11 @@ func (tr *Tracker) Restore(makeIterator iter.IteratorConstructor) ([]trie.NodeIt in.FieldsPerRecord = 2 rows, err := in.ReadAll() if err != nil { - return nil, err + return nil, nil, err } - var ret []trie.NodeIterator + var wrapped []*Iterator + var base []trie.NodeIterator for _, row := range rows { // pick up where each recovered iterator left off var recoveredPath []byte @@ -119,12 +180,12 @@ func (tr *Tracker) Restore(makeIterator iter.IteratorConstructor) ([]trie.NodeIt if len(row[0]) != 0 { if _, err = fmt.Sscanf(row[0], "%x", &recoveredPath); err != nil { - return nil, err + return nil, nil, err } } if len(row[1]) != 0 { if _, err = fmt.Sscanf(row[1], "%x", &endPath); err != nil { - return nil, err + return nil, nil, err } } @@ -135,59 +196,64 @@ func (tr *Tracker) Restore(makeIterator iter.IteratorConstructor) ([]trie.NodeIt } it := makeIterator(iter.HexToKeyBytes(recoveredPath)) boundIt := iter.NewPrefixBoundIterator(it, endPath) - ret = append(ret, tr.Tracked(boundIt)) + wrapped = append(wrapped, tr.Tracked(boundIt)) + base = append(base, it) } - log.Debug("Restored iterators", "count", len(ret)) - return ret, nil + return wrapped, base, tr.removeRecoveryFile() } -func (tr *Tracker) HaltAndDump() error { +// CloseAndSave stops all tracked iterators and dumps their state to a file. +// This closes the tracker, so adding a new iterator afterwards will fail. +// A new Tracker must be constructed in order to restore state. +func (tr *TrackerImpl) CloseAndSave() error { + tr.Lock() tr.running = false + close(tr.stopChan) + tr.Unlock() // drain any pending iterators close(tr.startChan) for start := range tr.startChan { tr.started[start] = struct{}{} } - close(tr.stopChan) for stop := range tr.stopChan { tr.stopped = append(tr.stopped, stop) } - for _, stop := range tr.stopped { delete(tr.started, stop) } - if len(tr.started) == 0 { - // if the tracker state is empty, erase any existing recovery file - err := os.Remove(tr.recoveryFile) - if os.IsNotExist(err) { - err = nil - } - return err - } - - return tr.dump() + return tr.Save() } +// Next advances the iterator, notifying its owning tracker when it finishes. func (it *Iterator) Next(descend bool) bool { ret := it.NodeIterator.Next(descend) if !ret { + it.tracker.RLock() + defer it.tracker.RUnlock() if it.tracker.running { it.tracker.stopChan <- it } else { - log.Error("Iterator stopped after tracker halted", "path", it.Path()) + log.Error("Tracker was closed before iterator finished") } } return ret } +func (it *Iterator) Bounds() ([]byte, []byte) { + if impl, ok := it.NodeIterator.(*iter.PrefixBoundIterator); ok { + return impl.Bounds() + } + return nil, nil +} + // Rewinds to the path of the previous (pre-order) node: -// If the last byte of the path is zero, pops it. Otherwise, decrements it -// and pads with 0xF to 64 bytes (e.g. [1] => [0 f f f ...]). -// Returns the passed path (which is also modified in place) +// If the last byte of the path is zero, pops it (e.g. [1 0] => [1]). +// Otherwise, decrements it and pads with 0xF to 64 bytes (e.g. [1] => [0 f f f ...]). +// The passed slice is not modified. func rewindPath(path []byte) []byte { if len(path) == 0 { return path diff --git a/tracker/tracker_test.go b/tracker/tracker_test.go new file mode 100644 index 0000000..92f7ec1 --- /dev/null +++ b/tracker/tracker_test.go @@ -0,0 +1,69 @@ +package tracker_test + +import ( + "bytes" + "math/rand" + "os" + "path/filepath" + "testing" + + "github.com/cerc-io/eth-iterator-utils/internal" + "github.com/cerc-io/eth-iterator-utils/tracker" +) + +func TestTracker(t *testing.T) { + NumIters := uint(1) + recoveryFile := filepath.Join(t.TempDir(), "tracker_test.csv") + + tree, edb := internal.OpenFixtureTrie(t, 1) + t.Cleanup(func() { edb.Close() }) + + // traverse trie and trigger error at some intermediate point + N := len(internal.FixtureNodePaths) + interrupt := rand.Intn(N/2) + N/4 + failedTraverse := func() []byte { + tr := tracker.New(recoveryFile, NumIters) + defer tr.CloseAndSave() + + var prevPath []byte + count := 0 + for it := tr.Tracked(tree.NodeIterator(nil)); it.Next(true); { + if count == interrupt { + return prevPath // tracker rewinds one node to prevent gaps + } + prevPath = it.Path() + count++ + } + return nil + } + + failedAt := failedTraverse() + if failedAt == nil { + t.Fatal("traversal wasn't interrupted") + } + + if !fileExists(recoveryFile) { + t.Fatal("recovery file wasn't created") + } + + tr := tracker.New(recoveryFile, NumIters) + its, _, err := tr.Restore(tree.NodeIterator) + if err != nil { + t.Fatal(err) + } + if uint(len(its)) != NumIters { + t.Fatalf("expected to restore %d iterators, got %d", NumIters, len(its)) + } + if !bytes.Equal(failedAt, its[0].Path()) { + t.Fatalf("iterator restored to wrong position: expected %v, got %v", failedAt, its[0].Path()) + } + + if fileExists(recoveryFile) { + t.Fatal("recovery file wasn't removed") + } +} + +func fileExists(file string) bool { + _, err := os.Stat(file) + return !os.IsNotExist(err) +}