Merge pull request #43 from roysc/fix-iterator-and-update
Fix iterator and update
This commit is contained in:
commit
4979e4090d
2
.gitignore
vendored
2
.gitignore
vendored
@ -1,2 +1,4 @@
|
|||||||
.idea/
|
.idea/
|
||||||
|
.vscode/
|
||||||
ipld-eth-state-snapshot
|
ipld-eth-state-snapshot
|
||||||
|
mocks/
|
||||||
|
5
Makefile
5
Makefile
@ -1,7 +1,7 @@
|
|||||||
MOCKS_DIR = $(CURDIR)/mocks
|
MOCKS_DIR = $(CURDIR)/mocks
|
||||||
mockgen_cmd=mockgen
|
mockgen_cmd=mockgen
|
||||||
|
|
||||||
.PHONY: mocks
|
.PHONY: mocks test
|
||||||
|
|
||||||
mocks: mocks/snapshot/publisher.go
|
mocks: mocks/snapshot/publisher.go
|
||||||
|
|
||||||
@ -14,3 +14,6 @@ clean:
|
|||||||
build:
|
build:
|
||||||
go fmt ./...
|
go fmt ./...
|
||||||
go build
|
go build
|
||||||
|
|
||||||
|
test: mocks
|
||||||
|
go clean -testcache && go test -v ./...
|
||||||
|
@ -52,3 +52,8 @@ Config format:
|
|||||||
chainID = "1" # $ETH_CHAIN_ID
|
chainID = "1" # $ETH_CHAIN_ID
|
||||||
genesisBlock = "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3" # $ETH_GENESIS_BLOCK
|
genesisBlock = "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3" # $ETH_GENESIS_BLOCK
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## Tests
|
||||||
|
|
||||||
|
* Install [mockgen](https://github.com/golang/mock#installation)
|
||||||
|
* `make test`
|
||||||
|
@ -44,7 +44,7 @@ func stateSnapshot() {
|
|||||||
mode := snapshot.SnapshotMode(modeStr)
|
mode := snapshot.SnapshotMode(modeStr)
|
||||||
config, err := snapshot.NewConfig(mode)
|
config, err := snapshot.NewConfig(mode)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logWithCommand.Fatal("unable to initialize config: %v", err)
|
logWithCommand.Fatalf("unable to initialize config: %v", err)
|
||||||
}
|
}
|
||||||
logWithCommand.Infof("opening levelDB and ancient data at %s and %s",
|
logWithCommand.Infof("opening levelDB and ancient data at %s and %s",
|
||||||
config.Eth.LevelDBPath, config.Eth.AncientDBPath)
|
config.Eth.LevelDBPath, config.Eth.AncientDBPath)
|
||||||
@ -56,7 +56,7 @@ func stateSnapshot() {
|
|||||||
recoveryFile := viper.GetString(snapshot.SNAPSHOT_RECOVERY_FILE_TOML)
|
recoveryFile := viper.GetString(snapshot.SNAPSHOT_RECOVERY_FILE_TOML)
|
||||||
if recoveryFile == "" {
|
if recoveryFile == "" {
|
||||||
recoveryFile = fmt.Sprintf("./%d_snapshot_recovery", height)
|
recoveryFile = fmt.Sprintf("./%d_snapshot_recovery", height)
|
||||||
logWithCommand.Infof("no recovery file set, creating default: %s", recoveryFile)
|
logWithCommand.Infof("no recovery file set, using default: %s", recoveryFile)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub, err := snapshot.NewPublisher(mode, config)
|
pub, err := snapshot.NewPublisher(mode, config)
|
||||||
|
2
go.mod
2
go.mod
@ -14,7 +14,7 @@ require (
|
|||||||
github.com/sirupsen/logrus v1.6.0
|
github.com/sirupsen/logrus v1.6.0
|
||||||
github.com/spf13/cobra v1.0.0
|
github.com/spf13/cobra v1.0.0
|
||||||
github.com/spf13/viper v1.7.0
|
github.com/spf13/viper v1.7.0
|
||||||
github.com/vulcanize/go-eth-state-node-iterator v1.0.3
|
github.com/vulcanize/go-eth-state-node-iterator v1.1.0
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
4
go.sum
4
go.sum
@ -776,8 +776,8 @@ github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/X
|
|||||||
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
|
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
|
||||||
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
|
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
|
||||||
github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
|
github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
|
||||||
github.com/vulcanize/go-eth-state-node-iterator v1.0.3 h1:gtkU0aP7aW0tPBfzxVSymRhX9eyIQv7HiGkGVGwoh8I=
|
github.com/vulcanize/go-eth-state-node-iterator v1.1.0 h1:wbOpP9BnqDcN8OWbuOw/QvR7Mde5JuuI15SGfLil0ZE=
|
||||||
github.com/vulcanize/go-eth-state-node-iterator v1.0.3/go.mod h1:a1aRW1/fEmcpY8AAU47p9TNL3QNjpeqCJd/hLtFKVL8=
|
github.com/vulcanize/go-eth-state-node-iterator v1.1.0/go.mod h1:a1aRW1/fEmcpY8AAU47p9TNL3QNjpeqCJd/hLtFKVL8=
|
||||||
github.com/vulcanize/go-ethereum v1.10.18-statediff-3.2.2 h1:hAZFOtnfdHtVgiwtcNFZDxCEN01c4W5Xsw+1K/IlHBA=
|
github.com/vulcanize/go-ethereum v1.10.18-statediff-3.2.2 h1:hAZFOtnfdHtVgiwtcNFZDxCEN01c4W5Xsw+1K/IlHBA=
|
||||||
github.com/vulcanize/go-ethereum v1.10.18-statediff-3.2.2/go.mod h1:HelXH7UT1uWdb+St6UAj4pPf93GOggjIV7pVbrWIZ3o=
|
github.com/vulcanize/go-ethereum v1.10.18-statediff-3.2.2/go.mod h1:HelXH7UT1uWdb+St6UAj4pPf93GOggjIV7pVbrWIZ3o=
|
||||||
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM=
|
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM=
|
||||||
|
@ -101,18 +101,21 @@ func (s *Service) CreateSnapshot(params SnapshotParams) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
headerID := header.Hash().String()
|
headerID := header.Hash().String()
|
||||||
s.tracker = newTracker(s.recoveryFile, int(params.Workers))
|
s.tracker = newTracker(s.recoveryFile, int(params.Workers))
|
||||||
go s.tracker.run()
|
s.tracker.captureSignal()
|
||||||
go s.tracker.captureSignal()
|
|
||||||
|
|
||||||
var iters []trie.NodeIterator
|
var iters []trie.NodeIterator
|
||||||
// attempt to restore from recovery file if it exists
|
// attempt to restore from recovery file if it exists
|
||||||
iters, err = s.tracker.restore(tree)
|
iters, err = s.tracker.restore(tree)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Errorf("restore error: %s", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if iters != nil {
|
if iters != nil {
|
||||||
|
log.Debugf("restored iterators; count: %d", len(iters))
|
||||||
if params.Workers < uint(len(iters)) {
|
if params.Workers < uint(len(iters)) {
|
||||||
return fmt.Errorf(
|
return fmt.Errorf(
|
||||||
"number of recovered workers (%d) is greater than number configured (%d)",
|
"number of recovered workers (%d) is greater than number configured (%d)",
|
||||||
@ -120,6 +123,7 @@ func (s *Service) CreateSnapshot(params SnapshotParams) error {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
} else { // nothing to restore
|
} else { // nothing to restore
|
||||||
|
log.Debugf("no iterators to restore")
|
||||||
if params.Workers > 1 {
|
if params.Workers > 1 {
|
||||||
iters = iter.SubtrieIterators(tree, params.Workers)
|
iters = iter.SubtrieIterators(tree, params.Workers)
|
||||||
} else {
|
} else {
|
||||||
@ -133,7 +137,7 @@ func (s *Service) CreateSnapshot(params SnapshotParams) error {
|
|||||||
defer func() {
|
defer func() {
|
||||||
err := s.tracker.haltAndDump()
|
err := s.tracker.haltAndDump()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("failed to write recovery file: ", err)
|
log.Errorf("failed to write recovery file: %v", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -44,7 +44,8 @@ func TestCreateSnapshot(t *testing.T) {
|
|||||||
pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).
|
pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).
|
||||||
AnyTimes()
|
AnyTimes()
|
||||||
pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any()).
|
pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any()).
|
||||||
Times(len(fixt.Block1_StateNodePaths))
|
// Use MinTimes as duplicate nodes are expected at boundaries
|
||||||
|
MinTimes(len(fixt.Block1_StateNodePaths))
|
||||||
|
|
||||||
// TODO: fixtures for storage node
|
// TODO: fixtures for storage node
|
||||||
// pub.EXPECT().PublishStorageNode(gomock.Eq(fixt.StorageNode), gomock.Eq(int64(0)), gomock.Any())
|
// pub.EXPECT().PublishStorageNode(gomock.Eq(fixt.StorageNode), gomock.Eq(int64(0)), gomock.Any())
|
||||||
|
@ -22,7 +22,11 @@ type trackedIter struct {
|
|||||||
func (it *trackedIter) Next(descend bool) bool {
|
func (it *trackedIter) Next(descend bool) bool {
|
||||||
ret := it.NodeIterator.Next(descend)
|
ret := it.NodeIterator.Next(descend)
|
||||||
if !ret {
|
if !ret {
|
||||||
|
if it.tracker.running {
|
||||||
it.tracker.stopChan <- it
|
it.tracker.stopChan <- it
|
||||||
|
} else {
|
||||||
|
log.Errorf("iterator stopped after tracker halted: path=%x", it.Path())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
@ -34,9 +38,7 @@ type iteratorTracker struct {
|
|||||||
stopChan chan *trackedIter
|
stopChan chan *trackedIter
|
||||||
started map[*trackedIter]struct{}
|
started map[*trackedIter]struct{}
|
||||||
stopped []*trackedIter
|
stopped []*trackedIter
|
||||||
|
running bool
|
||||||
haltChan chan struct{}
|
|
||||||
done chan struct{}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTracker(file string, buf int) iteratorTracker {
|
func newTracker(file string, buf int) iteratorTracker {
|
||||||
@ -45,8 +47,7 @@ func newTracker(file string, buf int) iteratorTracker {
|
|||||||
startChan: make(chan *trackedIter, buf),
|
startChan: make(chan *trackedIter, buf),
|
||||||
stopChan: make(chan *trackedIter, buf),
|
stopChan: make(chan *trackedIter, buf),
|
||||||
started: map[*trackedIter]struct{}{},
|
started: map[*trackedIter]struct{}{},
|
||||||
haltChan: make(chan struct{}),
|
running: true,
|
||||||
done: make(chan struct{}),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -62,23 +63,7 @@ func (tr *iteratorTracker) captureSignal() {
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// listens for starts/stops and manages current state
|
// Wraps an iterator in a trackedIter. This should not be called once halts are possible.
|
||||||
func (tr *iteratorTracker) run() {
|
|
||||||
loop:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case start := <-tr.startChan:
|
|
||||||
tr.started[start] = struct{}{}
|
|
||||||
case stop := <-tr.stopChan:
|
|
||||||
tr.stopped = append(tr.stopped, stop)
|
|
||||||
case <-tr.haltChan:
|
|
||||||
break loop
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tr.done <- struct{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (tr *iteratorTracker) tracked(it trie.NodeIterator) (ret *trackedIter) {
|
func (tr *iteratorTracker) tracked(it trie.NodeIterator) (ret *trackedIter) {
|
||||||
ret = &trackedIter{it, tr}
|
ret = &trackedIter{it, tr}
|
||||||
tr.startChan <- ret
|
tr.startChan <- ret
|
||||||
@ -87,7 +72,7 @@ func (tr *iteratorTracker) tracked(it trie.NodeIterator) (ret *trackedIter) {
|
|||||||
|
|
||||||
// dumps iterator path and bounds to a text file so it can be restored later
|
// dumps iterator path and bounds to a text file so it can be restored later
|
||||||
func (tr *iteratorTracker) dump() error {
|
func (tr *iteratorTracker) dump() error {
|
||||||
log.Info("Dumping recovery state to: ", tr.recoveryFile)
|
log.Debug("Dumping recovery state to: ", tr.recoveryFile)
|
||||||
var rows [][]string
|
var rows [][]string
|
||||||
for it, _ := range tr.started {
|
for it, _ := range tr.started {
|
||||||
var endPath []byte
|
var endPath []byte
|
||||||
@ -118,6 +103,7 @@ func (tr *iteratorTracker) restore(tree state.Trie) ([]trie.NodeIterator, error)
|
|||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
log.Debug("Restoring recovery state from: ", tr.recoveryFile)
|
||||||
defer file.Close()
|
defer file.Close()
|
||||||
in := csv.NewReader(file)
|
in := csv.NewReader(file)
|
||||||
in.FieldsPerRecord = 2
|
in.FieldsPerRecord = 2
|
||||||
@ -137,15 +123,19 @@ func (tr *iteratorTracker) restore(tree state.Trie) ([]trie.NodeIterator, error)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
it := iter.NewPrefixBoundIterator(tree, paths[0], paths[1])
|
// Force the lower bound path to an even length
|
||||||
|
if len(paths[0])&0b1 == 1 {
|
||||||
|
decrementPath(paths[0]) // decrement first to avoid skipped nodes
|
||||||
|
paths[0] = append(paths[0], 0)
|
||||||
|
}
|
||||||
|
it := iter.NewPrefixBoundIterator(tree.NodeIterator(iter.HexToKeyBytes(paths[0])), paths[1])
|
||||||
ret = append(ret, tr.tracked(it))
|
ret = append(ret, tr.tracked(it))
|
||||||
}
|
}
|
||||||
return ret, nil
|
return ret, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tr *iteratorTracker) haltAndDump() error {
|
func (tr *iteratorTracker) haltAndDump() error {
|
||||||
tr.haltChan <- struct{}{}
|
tr.running = false
|
||||||
<-tr.done
|
|
||||||
|
|
||||||
// drain any pending events
|
// drain any pending events
|
||||||
close(tr.startChan)
|
close(tr.startChan)
|
||||||
|
@ -28,3 +28,26 @@ func NewPublisher(mode SnapshotMode, config *Config) (snapt.Publisher, error) {
|
|||||||
}
|
}
|
||||||
return nil, fmt.Errorf("invalid snapshot mode: %s", mode)
|
return nil, fmt.Errorf("invalid snapshot mode: %s", mode)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Subtracts 1 from the last byte in a path slice, carrying if needed.
|
||||||
|
// Does nothing, returning false, for all-zero inputs.
|
||||||
|
func decrementPath(path []byte) bool {
|
||||||
|
// check for all zeros
|
||||||
|
allzero := true
|
||||||
|
for i := 0; i < len(path); i++ {
|
||||||
|
allzero = allzero && path[i] == 0
|
||||||
|
}
|
||||||
|
if allzero {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
for i := len(path) - 1; i >= 0; i-- {
|
||||||
|
val := path[i]
|
||||||
|
path[i]--
|
||||||
|
if val == 0 {
|
||||||
|
path[i] = 0xf
|
||||||
|
} else {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user