implement job recovery; fix traversal & iterator

This commit is contained in:
Roy Crihfield 2022-02-18 19:12:53 +08:00
parent a99d0a5ad6
commit b5d0010581
7 changed files with 696 additions and 63 deletions

View File

@ -44,13 +44,18 @@ func stateSnapshot() {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
recoveryFile := viper.GetString("snapshot.recoveryFile")
if recoveryFile == "" {
recoveryFile = "./snapshot_recovery"
}
mode := viper.GetString("snapshot.mode") mode := viper.GetString("snapshot.mode")
pub, err := snapshot.NewPublisher(snapshot.SnapshotMode(mode), config) pub, err := snapshot.NewPublisher(snapshot.SnapshotMode(mode), config)
if err != nil { if err != nil {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
snapshotService, err := snapshot.NewSnapshotService(edb, pub) snapshotService, err := snapshot.NewSnapshotService(edb, pub, recoveryFile)
if err != nil { if err != nil {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }

359
fixture/node_paths.go Normal file
View File

@ -0,0 +1,359 @@
package fixture
var Block1_StateNodePaths = [][]byte{
[]byte{},
[]byte{0},
[]byte{0, 0},
[]byte{0, 2},
[]byte{0, 2, 1},
[]byte{0, 2, 8},
[]byte{0, 2, 12},
[]byte{0, 3},
[]byte{0, 4},
[]byte{0, 6},
[]byte{0, 6, 3},
[]byte{0, 6, 13},
[]byte{0, 7},
[]byte{0, 8},
[]byte{0, 8, 7},
[]byte{0, 8, 11},
[]byte{0, 9},
[]byte{0, 9, 9},
[]byte{0, 9, 10},
[]byte{0, 12},
[]byte{0, 13},
[]byte{0, 14},
[]byte{1},
[]byte{1, 2},
[]byte{1, 2, 5},
[]byte{1, 2, 7},
[]byte{1, 3},
[]byte{1, 3, 1},
[]byte{1, 3, 11},
[]byte{1, 4},
[]byte{1, 5},
[]byte{1, 5, 11},
[]byte{1, 5, 12},
[]byte{1, 5, 15},
[]byte{1, 6},
[]byte{1, 8},
[]byte{1, 10},
[]byte{1, 13},
[]byte{1, 14},
[]byte{1, 14, 2},
[]byte{1, 14, 11},
[]byte{1, 15},
[]byte{1, 15, 9},
[]byte{1, 15, 15},
[]byte{2},
[]byte{2, 0},
[]byte{2, 0, 9},
[]byte{2, 0, 14},
[]byte{2, 1},
[]byte{2, 1, 1},
[]byte{2, 1, 3},
[]byte{2, 1, 14},
[]byte{2, 5},
[]byte{2, 6},
[]byte{2, 9},
[]byte{2, 9, 1},
[]byte{2, 9, 7},
[]byte{2, 11},
[]byte{2, 11, 7},
[]byte{2, 11, 13},
[]byte{2, 13},
[]byte{2, 13, 1},
[]byte{2, 13, 15},
[]byte{2, 15},
[]byte{3},
[]byte{3, 0},
[]byte{3, 0, 0},
[]byte{3, 0, 1},
[]byte{3, 2},
[]byte{3, 2, 3},
[]byte{3, 2, 15},
[]byte{3, 3},
[]byte{3, 4},
[]byte{3, 4, 2},
[]byte{3, 4, 4},
[]byte{3, 4, 5},
[]byte{3, 6},
[]byte{3, 8},
[]byte{3, 9},
[]byte{3, 10},
[]byte{3, 10, 2},
[]byte{3, 10, 8},
[]byte{3, 10, 12},
[]byte{3, 11},
[]byte{3, 12},
[]byte{3, 13},
[]byte{3, 14},
[]byte{3, 14, 4},
[]byte{3, 14, 9},
[]byte{3, 14, 14},
[]byte{3, 14, 14, 10},
[]byte{3, 14, 14, 15},
[]byte{4},
[]byte{4, 0},
[]byte{4, 0, 6},
[]byte{4, 0, 15},
[]byte{4, 1},
[]byte{4, 2},
[]byte{4, 2, 1},
[]byte{4, 2, 11},
[]byte{4, 3},
[]byte{4, 5},
[]byte{4, 6},
[]byte{4, 7},
[]byte{4, 8},
[]byte{4, 11},
[]byte{4, 11, 6},
[]byte{4, 11, 9},
[]byte{4, 11, 12},
[]byte{4, 14},
[]byte{5},
[]byte{5, 0},
[]byte{5, 0, 3},
[]byte{5, 0, 9},
[]byte{5, 0, 15},
[]byte{5, 1},
[]byte{5, 1, 14},
[]byte{5, 1, 15},
[]byte{5, 2},
[]byte{5, 2, 8},
[]byte{5, 2, 10},
[]byte{5, 3},
[]byte{5, 4},
[]byte{5, 4, 6},
[]byte{5, 4, 12},
[]byte{5, 6},
[]byte{5, 8},
[]byte{5, 8, 3},
[]byte{5, 8, 11},
[]byte{5, 10},
[]byte{5, 11},
[]byte{5, 12},
[]byte{5, 13},
[]byte{5, 15},
[]byte{6},
[]byte{6, 0},
[]byte{6, 2},
[]byte{6, 2, 3},
[]byte{6, 2, 9},
[]byte{6, 4},
[]byte{6, 4, 0},
[]byte{6, 4, 0, 0},
[]byte{6, 4, 0, 5},
[]byte{6, 5},
[]byte{6, 5, 4},
[]byte{6, 5, 10},
[]byte{6, 5, 12},
[]byte{6, 5, 13},
[]byte{6, 6},
[]byte{6, 6, 0},
[]byte{6, 6, 8},
[]byte{6, 8},
[]byte{6, 8, 4},
[]byte{6, 8, 4, 2},
[]byte{6, 8, 4, 9},
[]byte{6, 8, 9},
[]byte{6, 10},
[]byte{6, 10, 1},
[]byte{6, 10, 14},
[]byte{6, 11},
[]byte{6, 11, 2},
[]byte{6, 11, 12},
[]byte{6, 11, 14},
[]byte{6, 13},
[]byte{6, 13, 2},
[]byte{6, 13, 12},
[]byte{7},
[]byte{7, 1},
[]byte{7, 5},
[]byte{7, 7},
[]byte{7, 8},
[]byte{7, 8, 2},
[]byte{7, 8, 5},
[]byte{7, 9},
[]byte{7, 13},
[]byte{7, 13, 1},
[]byte{7, 13, 1, 0},
[]byte{7, 13, 1, 13},
[]byte{7, 13, 7},
[]byte{7, 14},
[]byte{7, 14, 8},
[]byte{7, 14, 11},
[]byte{8},
[]byte{8, 0},
[]byte{8, 0, 3},
[]byte{8, 0, 11},
[]byte{8, 2},
[]byte{8, 4},
[]byte{8, 8},
[]byte{8, 9},
[]byte{8, 9, 3},
[]byte{8, 9, 13},
[]byte{8, 10},
[]byte{8, 12},
[]byte{8, 12, 3},
[]byte{8, 12, 15},
[]byte{8, 13},
[]byte{8, 15},
[]byte{8, 15, 8},
[]byte{8, 15, 13},
[]byte{9},
[]byte{9, 0},
[]byte{9, 5},
[]byte{9, 6},
[]byte{9, 6, 10},
[]byte{9, 6, 14},
[]byte{9, 7},
[]byte{9, 9},
[]byte{9, 14},
[]byte{9, 15},
[]byte{9, 15, 0},
[]byte{9, 15, 4},
[]byte{9, 15, 10},
[]byte{10},
[]byte{10, 0},
[]byte{10, 0, 9},
[]byte{10, 0, 10},
[]byte{10, 0, 15},
[]byte{10, 2},
[]byte{10, 3},
[]byte{10, 6},
[]byte{10, 8},
[]byte{10, 9},
[]byte{10, 10},
[]byte{10, 10, 5},
[]byte{10, 10, 8},
[]byte{10, 13},
[]byte{10, 13, 0},
[]byte{10, 13, 13},
[]byte{10, 14},
[]byte{10, 14, 4},
[]byte{10, 14, 11},
[]byte{10, 14, 11, 8},
[]byte{10, 14, 11, 14},
[]byte{10, 15},
[]byte{11},
[]byte{11, 0},
[]byte{11, 0, 2},
[]byte{11, 0, 15},
[]byte{11, 1},
[]byte{11, 2},
[]byte{11, 3},
[]byte{11, 4},
[]byte{11, 5},
[]byte{11, 7},
[]byte{11, 7, 12},
[]byte{11, 7, 15},
[]byte{11, 8},
[]byte{11, 8, 8},
[]byte{11, 8, 15},
[]byte{11, 9},
[]byte{11, 11},
[]byte{11, 12},
[]byte{11, 13},
[]byte{11, 14},
[]byte{11, 14, 0},
[]byte{11, 14, 0, 1},
[]byte{11, 14, 0, 3},
[]byte{11, 14, 8},
[]byte{11, 14, 13},
[]byte{12},
[]byte{12, 0},
[]byte{12, 0, 0},
[]byte{12, 0, 1},
[]byte{12, 0, 1, 3},
[]byte{12, 0, 1, 11},
[]byte{12, 0, 15},
[]byte{12, 2},
[]byte{12, 2, 9},
[]byte{12, 2, 12},
[]byte{12, 4},
[]byte{12, 5},
[]byte{12, 6},
[]byte{12, 6, 0},
[]byte{12, 6, 4},
[]byte{12, 6, 14},
[]byte{12, 7},
[]byte{12, 7, 0},
[]byte{12, 7, 12},
[]byte{12, 7, 13},
[]byte{12, 9},
[]byte{12, 11},
[]byte{12, 12},
[]byte{13},
[]byte{13, 2},
[]byte{13, 2, 0},
[]byte{13, 2, 2},
[]byte{13, 2, 4},
[]byte{13, 3},
[]byte{13, 3, 7},
[]byte{13, 3, 10},
[]byte{13, 5},
[]byte{13, 8},
[]byte{13, 8, 1},
[]byte{13, 8, 15},
[]byte{13, 9},
[]byte{13, 9, 0},
[]byte{13, 9, 14},
[]byte{13, 10},
[]byte{13, 12},
[]byte{13, 12, 8},
[]byte{13, 12, 11},
[]byte{13, 13},
[]byte{13, 13, 7},
[]byte{13, 13, 12},
[]byte{13, 14},
[]byte{14},
[]byte{14, 0},
[]byte{14, 1},
[]byte{14, 2},
[]byte{14, 2, 2},
[]byte{14, 2, 12},
[]byte{14, 3},
[]byte{14, 4},
[]byte{14, 5},
[]byte{14, 6},
[]byte{14, 6, 9},
[]byte{14, 6, 12},
[]byte{14, 7},
[]byte{14, 7, 4},
[]byte{14, 7, 12},
[]byte{14, 8},
[]byte{14, 8, 3},
[]byte{14, 8, 12},
[]byte{14, 8, 12, 0},
[]byte{14, 8, 12, 6},
[]byte{14, 10},
[]byte{14, 10, 6},
[]byte{14, 10, 12},
[]byte{14, 11},
[]byte{14, 11, 8},
[]byte{14, 11, 13},
[]byte{14, 12},
[]byte{14, 14},
[]byte{14, 14, 3},
[]byte{14, 14, 9},
[]byte{15},
[]byte{15, 0},
[]byte{15, 5},
[]byte{15, 6},
[]byte{15, 9},
[]byte{15, 9, 0},
[]byte{15, 9, 2},
[]byte{15, 9, 3},
[]byte{15, 11},
[]byte{15, 11, 1},
[]byte{15, 11, 6},
[]byte{15, 12},
[]byte{15, 12, 3},
[]byte{15, 12, 14},
[]byte{15, 12, 14, 7},
[]byte{15, 12, 14, 13},
[]byte{15, 13},
[]byte{15, 14},
[]byte{15, 15},
}

View File

@ -10,7 +10,7 @@ import (
snapt "github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/types" snapt "github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/types"
) )
var Header1 = types.Header{ var Block1_Header = types.Header{
ParentHash: common.HexToHash("0x6341fd3daf94b748c72ced5a5b26028f2474f5f00d824504e4fa37a75767e177"), ParentHash: common.HexToHash("0x6341fd3daf94b748c72ced5a5b26028f2474f5f00d824504e4fa37a75767e177"),
UncleHash: common.HexToHash("0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347"), UncleHash: common.HexToHash("0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347"),
Coinbase: common.HexToAddress("0x0000000000000000000000000000000000000000"), Coinbase: common.HexToAddress("0x0000000000000000000000000000000000000000"),
@ -29,7 +29,7 @@ var Header1 = types.Header{
BaseFee: nil, BaseFee: nil,
} }
var StateNode1 = snapt.Node{ var Block1_StateNode0 = snapt.Node{
NodeType: 0, NodeType: 0,
Path: []byte{12, 0}, Path: []byte{12, 0},
Key: common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000"), Key: common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000"),

View File

@ -50,6 +50,8 @@ type Service struct {
stateDB state.Database stateDB state.Database
ipfsPublisher Publisher ipfsPublisher Publisher
maxBatchSize uint maxBatchSize uint
tracker iteratorTracker
recoveryFile string
} }
func NewLevelDB(con *EthConfig) (ethdb.Database, error) { func NewLevelDB(con *EthConfig) (ethdb.Database, error) {
@ -59,12 +61,13 @@ func NewLevelDB(con *EthConfig) (ethdb.Database, error) {
} }
// NewSnapshotService creates Service. // NewSnapshotService creates Service.
func NewSnapshotService(edb ethdb.Database, pub Publisher) (*Service, error) { func NewSnapshotService(edb ethdb.Database, pub Publisher, recoveryFile string) (*Service, error) {
return &Service{ return &Service{
ethDB: edb, ethDB: edb,
stateDB: state.NewDatabase(edb), stateDB: state.NewDatabase(edb),
ipfsPublisher: pub, ipfsPublisher: pub,
maxBatchSize: defaultBatchSize, maxBatchSize: defaultBatchSize,
recoveryFile: recoveryFile,
}, nil }, nil
} }
@ -90,16 +93,49 @@ func (s *Service) CreateSnapshot(params SnapshotParams) error {
return err return err
} }
t, err := s.stateDB.OpenTrie(header.Root) tree, err := s.stateDB.OpenTrie(header.Root)
if err != nil { if err != nil {
return err return err
} }
headerID := header.Hash().String() headerID := header.Hash().String()
s.tracker = newTracker(int(params.Workers))
go s.tracker.run()
if params.Workers > 0 { var iters []trie.NodeIterator
return s.createSnapshotAsync(t, headerID, params.Workers) // attempt to restore from recovery file if it exists
iters, err = s.tracker.restore(tree, s.recoveryFile)
if err != nil {
return err
}
if iters != nil {
if params.Workers < uint(len(iters)) {
return fmt.Errorf(
"number of recovered workers (%d) is greater than number configured (%d)",
len(iters), params.Workers,
)
}
} else { // nothing to restore
if params.Workers > 1 {
iters = iter.SubtrieIterators(tree, params.Workers)
} else { } else {
return s.createSnapshot(t.NodeIterator(nil), headerID) iters = []trie.NodeIterator{tree.NodeIterator(nil)}
}
for i, it := range iters {
iters[i] = s.tracker.tracked(it)
}
}
defer func() {
err := s.tracker.haltAndDump(s.recoveryFile)
if err != nil {
logrus.Error("failed to write recovery file: ", err)
}
}()
if len(iters) > 0 {
return s.createSnapshotAsync(iters, headerID)
} else {
return s.createSnapshot(iters[0], headerID)
} }
return nil return nil
} }
@ -176,7 +212,8 @@ func (s *Service) createSnapshot(it trie.NodeIterator, headerID string) error {
switch res.node.NodeType { switch res.node.NodeType {
case Leaf: case Leaf:
// if the node is a leaf, decode the account and publish the associated storage trie nodes if there are any // if the node is a leaf, decode the account and publish the associated storage trie
// nodes if there are any
var account types.StateAccount var account types.StateAccount
if err := rlp.DecodeBytes(res.elements[1].([]byte), &account); err != nil { if err := rlp.DecodeBytes(res.elements[1].([]byte), &account); err != nil {
return fmt.Errorf( return fmt.Errorf(
@ -217,34 +254,37 @@ func (s *Service) createSnapshot(it trie.NodeIterator, headerID string) error {
default: default:
return errors.New("unexpected node type") return errors.New("unexpected node type")
} }
return nil
} }
return it.Error() return it.Error()
} }
// Full-trie concurrent snapshot // Full-trie concurrent snapshot
func (s *Service) createSnapshotAsync(tree state.Trie, headerID string, workers uint) error { func (s *Service) createSnapshotAsync(iters []trie.NodeIterator, headerID string) error {
errors := make(chan error) errors := make(chan error)
var wg sync.WaitGroup var wg sync.WaitGroup
for _, it := range iter.SubtrieIterators(tree, workers) { for _, it := range iters {
wg.Add(1) wg.Add(1)
go func() { go func(it trie.NodeIterator) {
defer wg.Done() defer wg.Done()
if err := s.createSnapshot(it, headerID); err != nil { if err := s.createSnapshot(it, headerID); err != nil {
errors <- err errors <- err
} }
}() }(it)
} }
done := make(chan struct{})
go func() { go func() {
defer close(errors)
wg.Wait() wg.Wait()
done <- struct{}{}
}() }()
var err error
select { select {
case err := <-errors: case err = <-errors:
return err case <-done:
close(errors)
} }
return nil return err
} }
func (s *Service) storageSnapshot(sr common.Hash, headerID string, statePath []byte, tx Tx) (Tx, error) { func (s *Service) storageSnapshot(sr common.Hash, headerID string, statePath []byte, tx Tx) (Tx, error) {

View File

@ -1,12 +1,16 @@
package snapshot package snapshot
import ( import (
"errors"
"os"
"path/filepath"
"testing" "testing"
"github.com/golang/mock/gomock" "github.com/golang/mock/gomock"
fixt "github.com/vulcanize/eth-pg-ipfs-state-snapshot/fixture" fixt "github.com/vulcanize/eth-pg-ipfs-state-snapshot/fixture"
mock "github.com/vulcanize/eth-pg-ipfs-state-snapshot/mocks/snapshot" mock "github.com/vulcanize/eth-pg-ipfs-state-snapshot/mocks/snapshot"
snapt "github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/types"
"github.com/vulcanize/eth-pg-ipfs-state-snapshot/test" "github.com/vulcanize/eth-pg-ipfs-state-snapshot/test"
) )
@ -24,37 +28,39 @@ func testConfig(leveldbpath, ancientdbpath string) *Config {
} }
} }
func TestCreateSnapshot(t *testing.T) { func makeMocks(t *testing.T) (*mock.MockPublisher, *mock.MockTx) {
config := testConfig(fixt.ChaindataPath, fixt.AncientdataPath)
edb, err := NewLevelDB(config.Eth)
if err != nil {
t.Fatal(err)
}
workers := 4
ctl := gomock.NewController(t) ctl := gomock.NewController(t)
tx := mock.NewMockTx(ctl)
pub := mock.NewMockPublisher(ctl) pub := mock.NewMockPublisher(ctl)
tx := mock.NewMockTx(ctl)
return pub, tx
}
pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Header1)) func TestCreateSnapshot(t *testing.T) {
pub.EXPECT().BeginTx(). runCase := func(t *testing.T, workers int) {
Return(tx, nil). pub, tx := makeMocks(t)
Times(workers) pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Block1_Header))
pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()). pub.EXPECT().BeginTx().Return(tx, nil).
Return(tx, nil).
Times(workers) Times(workers)
pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).
AnyTimes()
pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any()). pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any()).
Times(workers) Times(len(fixt.Block1_StateNodePaths))
// TODO: fixtures for storage node // TODO: fixtures for storage node
// pub.EXPECT().PublishStorageNode(gomock.Eq(fixt.StorageNode), gomock.Eq(int64(0)), gomock.Any()) // pub.EXPECT().PublishStorageNode(gomock.Eq(fixt.StorageNode), gomock.Eq(int64(0)), gomock.Any())
// pub.EXPECT().CommitTx(gomock.Any()).
// Times(workers)
tx.EXPECT().Commit(). tx.EXPECT().Commit().
Times(workers) Times(workers)
service, err := NewSnapshotService(edb, pub) config := testConfig(fixt.ChaindataPath, fixt.AncientdataPath)
edb, err := NewLevelDB(config.Eth)
if err != nil {
t.Fatal(err)
}
defer edb.Close()
recovery := filepath.Join(t.TempDir(), "recover.csv")
service, err := NewSnapshotService(edb, pub, recovery)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -64,9 +70,71 @@ func TestCreateSnapshot(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
}
// err = service.CreateLatestSnapshot(0)
// if err != nil { testCases := []int{1, 4, 16, 32}
// t.Fatal(err) for _, tc := range testCases {
// } t.Run("case", func(t *testing.T) { runCase(t, tc) })
}
}
func failingPublishStateNode(_ *snapt.Node, _ string, _ snapt.Tx) error {
return errors.New("failingPublishStateNode")
}
func TestRecovery(t *testing.T) {
runCase := func(t *testing.T, workers int) {
pub, tx := makeMocks(t)
pub.EXPECT().PublishHeader(gomock.Any()).AnyTimes()
pub.EXPECT().BeginTx().Return(tx, nil).AnyTimes()
pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes()
pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any()).
Times(workers).
DoAndReturn(failingPublishStateNode)
tx.EXPECT().Commit().AnyTimes()
config := testConfig(fixt.ChaindataPath, fixt.AncientdataPath)
edb, err := NewLevelDB(config.Eth)
if err != nil {
t.Fatal(err)
}
defer edb.Close()
recovery := filepath.Join(t.TempDir(), "recover.csv")
service, err := NewSnapshotService(edb, pub, recovery)
if err != nil {
t.Fatal(err)
}
params := SnapshotParams{Height: 1, Workers: uint(workers)}
err = service.CreateSnapshot(params)
if err == nil {
t.Fatal("expected an error")
}
if _, err = os.Stat(recovery); err != nil {
t.Fatal("cannot stat recovery file:", err)
}
pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
err = service.CreateSnapshot(params)
if err != nil {
t.Fatal(err)
}
_, err = os.Stat(recovery)
if err == nil {
t.Fatal("recovery file still present")
} else {
if !os.IsNotExist(err) {
t.Fatal(err)
}
}
}
testCases := []int{1, 4, 32}
for _, tc := range testCases {
t.Run("case", func(t *testing.T) { runCase(t, tc) })
}
} }

154
pkg/snapshot/tracker.go Normal file
View File

@ -0,0 +1,154 @@
package snapshot
import (
"encoding/csv"
"fmt"
"os"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/trie"
iter "github.com/vulcanize/go-eth-state-node-iterator"
)
type trackedIter struct {
trie.NodeIterator
tracker *iteratorTracker
}
func (it *trackedIter) Next(descend bool) bool {
ret := it.NodeIterator.Next(descend)
if !ret {
it.tracker.stopChan <- it
}
return ret
}
type iteratorTracker struct {
startChan chan *trackedIter
stopChan chan *trackedIter
started map[*trackedIter]struct{}
stopped []*trackedIter
haltChan chan struct{}
done chan struct{}
}
func newTracker(buf int) iteratorTracker {
return iteratorTracker{
startChan: make(chan *trackedIter, buf),
stopChan: make(chan *trackedIter, buf),
started: map[*trackedIter]struct{}{},
haltChan: make(chan struct{}),
done: make(chan struct{}),
}
}
// listens for starts/stops and manages current state
func (tr *iteratorTracker) run() {
loop:
for {
select {
case start := <-tr.startChan:
tr.started[start] = struct{}{}
case stop := <-tr.stopChan:
tr.stopped = append(tr.stopped, stop)
case <-tr.haltChan:
break loop
default:
}
}
tr.done <- struct{}{}
}
func (tr *iteratorTracker) tracked(it trie.NodeIterator) (ret *trackedIter) {
ret = &trackedIter{it, tr}
tr.startChan <- ret
return
}
// dumps iterator path and bounds to a text file so it can be restored later
func (tr *iteratorTracker) dump(path string) error {
var rows [][]string
for it, _ := range tr.started {
var endPath []byte
if impl, ok := it.NodeIterator.(*iter.PrefixBoundIterator); ok {
endPath = impl.EndPath
}
rows = append(rows, []string{
fmt.Sprintf("%x", it.Path()),
fmt.Sprintf("%x", endPath),
})
}
file, err := os.Create(path)
if err != nil {
return err
}
defer file.Close()
out := csv.NewWriter(file)
return out.WriteAll(rows)
}
// attempts to read iterator state from file
// if file doesn't exist, returns an empty slice with no error
func (tr *iteratorTracker) restore(tree state.Trie, path string) ([]trie.NodeIterator, error) {
file, err := os.Open(path)
if err != nil {
if os.IsNotExist(err) {
return nil, nil
}
return nil, err
}
defer file.Close()
in := csv.NewReader(file)
in.FieldsPerRecord = 2
rows, err := in.ReadAll()
if err != nil {
return nil, err
}
var ret []trie.NodeIterator
for _, row := range rows {
// pick up where each interval left off
var paths [2][]byte
for i, val := range row {
if len(val) != 0 {
if _, err = fmt.Sscanf(val, "%x", &paths[i]); err != nil {
return nil, err
}
}
}
it := iter.NewPrefixBoundIterator(tree, paths[0], paths[1])
ret = append(ret, tr.tracked(it))
}
return ret, nil
}
func (tr *iteratorTracker) haltAndDump(path string) error {
tr.haltChan <- struct{}{}
<-tr.done
// drain any pending events
close(tr.startChan)
for start := range tr.startChan {
tr.started[start] = struct{}{}
}
close(tr.stopChan)
for stop := range tr.stopChan {
tr.stopped = append(tr.stopped, stop)
}
for _, stop := range tr.stopped {
delete(tr.started, stop)
}
if len(tr.started) == 0 {
// if the tracker state is empty, erase any existing recovery file
err := os.Remove(path)
if os.IsNotExist(err) {
err = nil
}
return err
}
return tr.dump(path)
}

View File

@ -1,6 +1,7 @@
package test package test
import ( import (
"bytes"
"os" "os"
"reflect" "reflect"
"testing" "testing"
@ -45,8 +46,14 @@ func NoError(t *testing.T, err error) {
} }
// ExpectEqual asserts the provided interfaces are deep equal // ExpectEqual asserts the provided interfaces are deep equal
func ExpectEqual(t *testing.T, want interface{}, got interface{}) { func ExpectEqual(t *testing.T, want, got interface{}) {
if !reflect.DeepEqual(want, got) { if !reflect.DeepEqual(want, got) {
t.Fatalf("Expected: %v\nActual: %v", want, got) t.Fatalf("Values not equal:\nExpected:\t%v\nActual:\t\t%v", want, got)
}
}
func ExpectEqualBytes(t *testing.T, want, got []byte) {
if !bytes.Equal(want, got) {
t.Fatalf("Bytes not equal:\nExpected:\t%v\nActual:\t\t%v", want, got)
} }
} }