diff --git a/.gitignore b/.gitignore index 5d7a86c..385c906 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ .idea/ +.vscode/ ipld-eth-state-snapshot +mocks/ diff --git a/Makefile b/Makefile index eebd422..b612d53 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ MOCKS_DIR = $(CURDIR)/mocks mockgen_cmd=mockgen -.PHONY: mocks +.PHONY: mocks test mocks: mocks/snapshot/publisher.go @@ -14,3 +14,6 @@ clean: build: go fmt ./... go build + +test: mocks + go clean -testcache && go test -v ./... diff --git a/README.md b/README.md index ca4744e..ac67170 100644 --- a/README.md +++ b/README.md @@ -52,3 +52,8 @@ Config format: chainID = "1" # $ETH_CHAIN_ID genesisBlock = "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3" # $ETH_GENESIS_BLOCK ``` + +## Tests + +* Install [mockgen](https://github.com/golang/mock#installation) +* `make test` diff --git a/cmd/stateSnapshot.go b/cmd/stateSnapshot.go index c44d84e..72a51fe 100644 --- a/cmd/stateSnapshot.go +++ b/cmd/stateSnapshot.go @@ -44,7 +44,7 @@ func stateSnapshot() { mode := snapshot.SnapshotMode(modeStr) config, err := snapshot.NewConfig(mode) if err != nil { - logWithCommand.Fatal("unable to initialize config: %v", err) + logWithCommand.Fatalf("unable to initialize config: %v", err) } logWithCommand.Infof("opening levelDB and ancient data at %s and %s", config.Eth.LevelDBPath, config.Eth.AncientDBPath) @@ -56,7 +56,7 @@ func stateSnapshot() { recoveryFile := viper.GetString(snapshot.SNAPSHOT_RECOVERY_FILE_TOML) if recoveryFile == "" { recoveryFile = fmt.Sprintf("./%d_snapshot_recovery", height) - logWithCommand.Infof("no recovery file set, creating default: %s", recoveryFile) + logWithCommand.Infof("no recovery file set, using default: %s", recoveryFile) } pub, err := snapshot.NewPublisher(mode, config) diff --git a/go.mod b/go.mod index 935e83a..d04cccd 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/sirupsen/logrus v1.6.0 github.com/spf13/cobra v1.0.0 github.com/spf13/viper v1.7.0 - github.com/vulcanize/go-eth-state-node-iterator v1.0.3 + github.com/vulcanize/go-eth-state-node-iterator v1.1.0 ) require ( diff --git a/go.sum b/go.sum index 82a97af..20fb308 100644 --- a/go.sum +++ b/go.sum @@ -776,8 +776,8 @@ github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/X github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= -github.com/vulcanize/go-eth-state-node-iterator v1.0.3 h1:gtkU0aP7aW0tPBfzxVSymRhX9eyIQv7HiGkGVGwoh8I= -github.com/vulcanize/go-eth-state-node-iterator v1.0.3/go.mod h1:a1aRW1/fEmcpY8AAU47p9TNL3QNjpeqCJd/hLtFKVL8= +github.com/vulcanize/go-eth-state-node-iterator v1.1.0 h1:wbOpP9BnqDcN8OWbuOw/QvR7Mde5JuuI15SGfLil0ZE= +github.com/vulcanize/go-eth-state-node-iterator v1.1.0/go.mod h1:a1aRW1/fEmcpY8AAU47p9TNL3QNjpeqCJd/hLtFKVL8= github.com/vulcanize/go-ethereum v1.10.18-statediff-4.0.2-alpha h1:gTcfBHoaRpsRbP1/dnurg5LFUaqsQMTNqs4R4S2wm9U= github.com/vulcanize/go-ethereum v1.10.18-statediff-4.0.2-alpha/go.mod h1:HelXH7UT1uWdb+St6UAj4pPf93GOggjIV7pVbrWIZ3o= github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM= diff --git a/pkg/snapshot/service.go b/pkg/snapshot/service.go index c6f1318..bd841cf 100644 --- a/pkg/snapshot/service.go +++ b/pkg/snapshot/service.go @@ -102,18 +102,21 @@ func (s *Service) CreateSnapshot(params SnapshotParams) error { if err != nil { return err } + headerID := header.Hash().String() s.tracker = newTracker(s.recoveryFile, int(params.Workers)) - go s.tracker.run() - go s.tracker.captureSignal() + s.tracker.captureSignal() var iters []trie.NodeIterator // attempt to restore from recovery file if it exists iters, err = s.tracker.restore(tree) if err != nil { + log.Errorf("restore error: %s", err.Error()) return err } + if iters != nil { + log.Debugf("restored iterators; count: %d", len(iters)) if params.Workers < uint(len(iters)) { return fmt.Errorf( "number of recovered workers (%d) is greater than number configured (%d)", @@ -121,6 +124,7 @@ func (s *Service) CreateSnapshot(params SnapshotParams) error { ) } } else { // nothing to restore + log.Debugf("no iterators to restore") if params.Workers > 1 { iters = iter.SubtrieIterators(tree, params.Workers) } else { @@ -134,7 +138,7 @@ func (s *Service) CreateSnapshot(params SnapshotParams) error { defer func() { err := s.tracker.haltAndDump() if err != nil { - log.Error("failed to write recovery file: ", err) + log.Errorf("failed to write recovery file: %v", err) } }() diff --git a/pkg/snapshot/service_test.go b/pkg/snapshot/service_test.go index 31bdc16..c213b01 100644 --- a/pkg/snapshot/service_test.go +++ b/pkg/snapshot/service_test.go @@ -45,7 +45,8 @@ func TestCreateSnapshot(t *testing.T) { pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil). AnyTimes() pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - Times(len(fixt.Block1_StateNodePaths)) + // Use MinTimes as duplicate nodes are expected at boundaries + MinTimes(len(fixt.Block1_StateNodePaths)) // TODO: fixtures for storage node // pub.EXPECT().PublishStorageNode(gomock.Eq(fixt.StorageNode), gomock.Eq(int64(0)), gomock.Any()) diff --git a/pkg/snapshot/tracker.go b/pkg/snapshot/tracker.go index 1e5fbdf..7192eec 100644 --- a/pkg/snapshot/tracker.go +++ b/pkg/snapshot/tracker.go @@ -22,7 +22,11 @@ type trackedIter struct { func (it *trackedIter) Next(descend bool) bool { ret := it.NodeIterator.Next(descend) if !ret { - it.tracker.stopChan <- it + if it.tracker.running { + it.tracker.stopChan <- it + } else { + log.Errorf("iterator stopped after tracker halted: path=%x", it.Path()) + } } return ret } @@ -34,9 +38,7 @@ type iteratorTracker struct { stopChan chan *trackedIter started map[*trackedIter]struct{} stopped []*trackedIter - - haltChan chan struct{} - done chan struct{} + running bool } func newTracker(file string, buf int) iteratorTracker { @@ -45,8 +47,7 @@ func newTracker(file string, buf int) iteratorTracker { startChan: make(chan *trackedIter, buf), stopChan: make(chan *trackedIter, buf), started: map[*trackedIter]struct{}{}, - haltChan: make(chan struct{}), - done: make(chan struct{}), + running: true, } } @@ -62,23 +63,7 @@ func (tr *iteratorTracker) captureSignal() { }() } -// listens for starts/stops and manages current state -func (tr *iteratorTracker) run() { -loop: - for { - select { - case start := <-tr.startChan: - tr.started[start] = struct{}{} - case stop := <-tr.stopChan: - tr.stopped = append(tr.stopped, stop) - case <-tr.haltChan: - break loop - default: - } - } - tr.done <- struct{}{} -} - +// Wraps an iterator in a trackedIter. This should not be called once halts are possible. func (tr *iteratorTracker) tracked(it trie.NodeIterator) (ret *trackedIter) { ret = &trackedIter{it, tr} tr.startChan <- ret @@ -87,7 +72,7 @@ func (tr *iteratorTracker) tracked(it trie.NodeIterator) (ret *trackedIter) { // dumps iterator path and bounds to a text file so it can be restored later func (tr *iteratorTracker) dump() error { - log.Info("Dumping recovery state to: ", tr.recoveryFile) + log.Debug("Dumping recovery state to: ", tr.recoveryFile) var rows [][]string for it, _ := range tr.started { var endPath []byte @@ -118,6 +103,7 @@ func (tr *iteratorTracker) restore(tree state.Trie) ([]trie.NodeIterator, error) } return nil, err } + log.Debug("Restoring recovery state from: ", tr.recoveryFile) defer file.Close() in := csv.NewReader(file) in.FieldsPerRecord = 2 @@ -137,15 +123,19 @@ func (tr *iteratorTracker) restore(tree state.Trie) ([]trie.NodeIterator, error) } } - it := iter.NewPrefixBoundIterator(tree, paths[0], paths[1]) + // Force the lower bound path to an even length + if len(paths[0])&0b1 == 1 { + decrementPath(paths[0]) // decrement first to avoid skipped nodes + paths[0] = append(paths[0], 0) + } + it := iter.NewPrefixBoundIterator(tree.NodeIterator(iter.HexToKeyBytes(paths[0])), paths[1]) ret = append(ret, tr.tracked(it)) } return ret, nil } func (tr *iteratorTracker) haltAndDump() error { - tr.haltChan <- struct{}{} - <-tr.done + tr.running = false // drain any pending events close(tr.startChan) diff --git a/pkg/snapshot/util.go b/pkg/snapshot/util.go index a2ca7ee..e9db47d 100644 --- a/pkg/snapshot/util.go +++ b/pkg/snapshot/util.go @@ -28,3 +28,26 @@ func NewPublisher(mode SnapshotMode, config *Config) (snapt.Publisher, error) { } return nil, fmt.Errorf("invalid snapshot mode: %s", mode) } + +// Subtracts 1 from the last byte in a path slice, carrying if needed. +// Does nothing, returning false, for all-zero inputs. +func decrementPath(path []byte) bool { + // check for all zeros + allzero := true + for i := 0; i < len(path); i++ { + allzero = allzero && path[i] == 0 + } + if allzero { + return false + } + for i := len(path) - 1; i >= 0; i-- { + val := path[i] + path[i]-- + if val == 0 { + path[i] = 0xf + } else { + return true + } + } + return true +}