commit
9e768dc2ba
@ -44,13 +44,18 @@ func stateSnapshot() {
|
||||
logWithCommand.Fatal(err)
|
||||
}
|
||||
|
||||
recoveryFile := viper.GetString("snapshot.recoveryFile")
|
||||
if recoveryFile == "" {
|
||||
recoveryFile = "./snapshot_recovery"
|
||||
}
|
||||
|
||||
mode := viper.GetString("snapshot.mode")
|
||||
pub, err := snapshot.NewPublisher(snapshot.SnapshotMode(mode), config)
|
||||
if err != nil {
|
||||
logWithCommand.Fatal(err)
|
||||
}
|
||||
|
||||
snapshotService, err := snapshot.NewSnapshotService(edb, pub)
|
||||
snapshotService, err := snapshot.NewSnapshotService(edb, pub, recoveryFile)
|
||||
if err != nil {
|
||||
logWithCommand.Fatal(err)
|
||||
}
|
||||
|
6
fixture/chaindata/.gitignore
vendored
Normal file
6
fixture/chaindata/.gitignore
vendored
Normal file
@ -0,0 +1,6 @@
|
||||
*.log
|
||||
CURRENT*
|
||||
LOCK
|
||||
LOG
|
||||
MANIFEST-*
|
||||
ancient/FLOCK
|
359
fixture/node_paths.go
Normal file
359
fixture/node_paths.go
Normal file
@ -0,0 +1,359 @@
|
||||
package fixture
|
||||
|
||||
var Block1_StateNodePaths = [][]byte{
|
||||
[]byte{},
|
||||
[]byte{0},
|
||||
[]byte{0, 0},
|
||||
[]byte{0, 2},
|
||||
[]byte{0, 2, 1},
|
||||
[]byte{0, 2, 8},
|
||||
[]byte{0, 2, 12},
|
||||
[]byte{0, 3},
|
||||
[]byte{0, 4},
|
||||
[]byte{0, 6},
|
||||
[]byte{0, 6, 3},
|
||||
[]byte{0, 6, 13},
|
||||
[]byte{0, 7},
|
||||
[]byte{0, 8},
|
||||
[]byte{0, 8, 7},
|
||||
[]byte{0, 8, 11},
|
||||
[]byte{0, 9},
|
||||
[]byte{0, 9, 9},
|
||||
[]byte{0, 9, 10},
|
||||
[]byte{0, 12},
|
||||
[]byte{0, 13},
|
||||
[]byte{0, 14},
|
||||
[]byte{1},
|
||||
[]byte{1, 2},
|
||||
[]byte{1, 2, 5},
|
||||
[]byte{1, 2, 7},
|
||||
[]byte{1, 3},
|
||||
[]byte{1, 3, 1},
|
||||
[]byte{1, 3, 11},
|
||||
[]byte{1, 4},
|
||||
[]byte{1, 5},
|
||||
[]byte{1, 5, 11},
|
||||
[]byte{1, 5, 12},
|
||||
[]byte{1, 5, 15},
|
||||
[]byte{1, 6},
|
||||
[]byte{1, 8},
|
||||
[]byte{1, 10},
|
||||
[]byte{1, 13},
|
||||
[]byte{1, 14},
|
||||
[]byte{1, 14, 2},
|
||||
[]byte{1, 14, 11},
|
||||
[]byte{1, 15},
|
||||
[]byte{1, 15, 9},
|
||||
[]byte{1, 15, 15},
|
||||
[]byte{2},
|
||||
[]byte{2, 0},
|
||||
[]byte{2, 0, 9},
|
||||
[]byte{2, 0, 14},
|
||||
[]byte{2, 1},
|
||||
[]byte{2, 1, 1},
|
||||
[]byte{2, 1, 3},
|
||||
[]byte{2, 1, 14},
|
||||
[]byte{2, 5},
|
||||
[]byte{2, 6},
|
||||
[]byte{2, 9},
|
||||
[]byte{2, 9, 1},
|
||||
[]byte{2, 9, 7},
|
||||
[]byte{2, 11},
|
||||
[]byte{2, 11, 7},
|
||||
[]byte{2, 11, 13},
|
||||
[]byte{2, 13},
|
||||
[]byte{2, 13, 1},
|
||||
[]byte{2, 13, 15},
|
||||
[]byte{2, 15},
|
||||
[]byte{3},
|
||||
[]byte{3, 0},
|
||||
[]byte{3, 0, 0},
|
||||
[]byte{3, 0, 1},
|
||||
[]byte{3, 2},
|
||||
[]byte{3, 2, 3},
|
||||
[]byte{3, 2, 15},
|
||||
[]byte{3, 3},
|
||||
[]byte{3, 4},
|
||||
[]byte{3, 4, 2},
|
||||
[]byte{3, 4, 4},
|
||||
[]byte{3, 4, 5},
|
||||
[]byte{3, 6},
|
||||
[]byte{3, 8},
|
||||
[]byte{3, 9},
|
||||
[]byte{3, 10},
|
||||
[]byte{3, 10, 2},
|
||||
[]byte{3, 10, 8},
|
||||
[]byte{3, 10, 12},
|
||||
[]byte{3, 11},
|
||||
[]byte{3, 12},
|
||||
[]byte{3, 13},
|
||||
[]byte{3, 14},
|
||||
[]byte{3, 14, 4},
|
||||
[]byte{3, 14, 9},
|
||||
[]byte{3, 14, 14},
|
||||
[]byte{3, 14, 14, 10},
|
||||
[]byte{3, 14, 14, 15},
|
||||
[]byte{4},
|
||||
[]byte{4, 0},
|
||||
[]byte{4, 0, 6},
|
||||
[]byte{4, 0, 15},
|
||||
[]byte{4, 1},
|
||||
[]byte{4, 2},
|
||||
[]byte{4, 2, 1},
|
||||
[]byte{4, 2, 11},
|
||||
[]byte{4, 3},
|
||||
[]byte{4, 5},
|
||||
[]byte{4, 6},
|
||||
[]byte{4, 7},
|
||||
[]byte{4, 8},
|
||||
[]byte{4, 11},
|
||||
[]byte{4, 11, 6},
|
||||
[]byte{4, 11, 9},
|
||||
[]byte{4, 11, 12},
|
||||
[]byte{4, 14},
|
||||
[]byte{5},
|
||||
[]byte{5, 0},
|
||||
[]byte{5, 0, 3},
|
||||
[]byte{5, 0, 9},
|
||||
[]byte{5, 0, 15},
|
||||
[]byte{5, 1},
|
||||
[]byte{5, 1, 14},
|
||||
[]byte{5, 1, 15},
|
||||
[]byte{5, 2},
|
||||
[]byte{5, 2, 8},
|
||||
[]byte{5, 2, 10},
|
||||
[]byte{5, 3},
|
||||
[]byte{5, 4},
|
||||
[]byte{5, 4, 6},
|
||||
[]byte{5, 4, 12},
|
||||
[]byte{5, 6},
|
||||
[]byte{5, 8},
|
||||
[]byte{5, 8, 3},
|
||||
[]byte{5, 8, 11},
|
||||
[]byte{5, 10},
|
||||
[]byte{5, 11},
|
||||
[]byte{5, 12},
|
||||
[]byte{5, 13},
|
||||
[]byte{5, 15},
|
||||
[]byte{6},
|
||||
[]byte{6, 0},
|
||||
[]byte{6, 2},
|
||||
[]byte{6, 2, 3},
|
||||
[]byte{6, 2, 9},
|
||||
[]byte{6, 4},
|
||||
[]byte{6, 4, 0},
|
||||
[]byte{6, 4, 0, 0},
|
||||
[]byte{6, 4, 0, 5},
|
||||
[]byte{6, 5},
|
||||
[]byte{6, 5, 4},
|
||||
[]byte{6, 5, 10},
|
||||
[]byte{6, 5, 12},
|
||||
[]byte{6, 5, 13},
|
||||
[]byte{6, 6},
|
||||
[]byte{6, 6, 0},
|
||||
[]byte{6, 6, 8},
|
||||
[]byte{6, 8},
|
||||
[]byte{6, 8, 4},
|
||||
[]byte{6, 8, 4, 2},
|
||||
[]byte{6, 8, 4, 9},
|
||||
[]byte{6, 8, 9},
|
||||
[]byte{6, 10},
|
||||
[]byte{6, 10, 1},
|
||||
[]byte{6, 10, 14},
|
||||
[]byte{6, 11},
|
||||
[]byte{6, 11, 2},
|
||||
[]byte{6, 11, 12},
|
||||
[]byte{6, 11, 14},
|
||||
[]byte{6, 13},
|
||||
[]byte{6, 13, 2},
|
||||
[]byte{6, 13, 12},
|
||||
[]byte{7},
|
||||
[]byte{7, 1},
|
||||
[]byte{7, 5},
|
||||
[]byte{7, 7},
|
||||
[]byte{7, 8},
|
||||
[]byte{7, 8, 2},
|
||||
[]byte{7, 8, 5},
|
||||
[]byte{7, 9},
|
||||
[]byte{7, 13},
|
||||
[]byte{7, 13, 1},
|
||||
[]byte{7, 13, 1, 0},
|
||||
[]byte{7, 13, 1, 13},
|
||||
[]byte{7, 13, 7},
|
||||
[]byte{7, 14},
|
||||
[]byte{7, 14, 8},
|
||||
[]byte{7, 14, 11},
|
||||
[]byte{8},
|
||||
[]byte{8, 0},
|
||||
[]byte{8, 0, 3},
|
||||
[]byte{8, 0, 11},
|
||||
[]byte{8, 2},
|
||||
[]byte{8, 4},
|
||||
[]byte{8, 8},
|
||||
[]byte{8, 9},
|
||||
[]byte{8, 9, 3},
|
||||
[]byte{8, 9, 13},
|
||||
[]byte{8, 10},
|
||||
[]byte{8, 12},
|
||||
[]byte{8, 12, 3},
|
||||
[]byte{8, 12, 15},
|
||||
[]byte{8, 13},
|
||||
[]byte{8, 15},
|
||||
[]byte{8, 15, 8},
|
||||
[]byte{8, 15, 13},
|
||||
[]byte{9},
|
||||
[]byte{9, 0},
|
||||
[]byte{9, 5},
|
||||
[]byte{9, 6},
|
||||
[]byte{9, 6, 10},
|
||||
[]byte{9, 6, 14},
|
||||
[]byte{9, 7},
|
||||
[]byte{9, 9},
|
||||
[]byte{9, 14},
|
||||
[]byte{9, 15},
|
||||
[]byte{9, 15, 0},
|
||||
[]byte{9, 15, 4},
|
||||
[]byte{9, 15, 10},
|
||||
[]byte{10},
|
||||
[]byte{10, 0},
|
||||
[]byte{10, 0, 9},
|
||||
[]byte{10, 0, 10},
|
||||
[]byte{10, 0, 15},
|
||||
[]byte{10, 2},
|
||||
[]byte{10, 3},
|
||||
[]byte{10, 6},
|
||||
[]byte{10, 8},
|
||||
[]byte{10, 9},
|
||||
[]byte{10, 10},
|
||||
[]byte{10, 10, 5},
|
||||
[]byte{10, 10, 8},
|
||||
[]byte{10, 13},
|
||||
[]byte{10, 13, 0},
|
||||
[]byte{10, 13, 13},
|
||||
[]byte{10, 14},
|
||||
[]byte{10, 14, 4},
|
||||
[]byte{10, 14, 11},
|
||||
[]byte{10, 14, 11, 8},
|
||||
[]byte{10, 14, 11, 14},
|
||||
[]byte{10, 15},
|
||||
[]byte{11},
|
||||
[]byte{11, 0},
|
||||
[]byte{11, 0, 2},
|
||||
[]byte{11, 0, 15},
|
||||
[]byte{11, 1},
|
||||
[]byte{11, 2},
|
||||
[]byte{11, 3},
|
||||
[]byte{11, 4},
|
||||
[]byte{11, 5},
|
||||
[]byte{11, 7},
|
||||
[]byte{11, 7, 12},
|
||||
[]byte{11, 7, 15},
|
||||
[]byte{11, 8},
|
||||
[]byte{11, 8, 8},
|
||||
[]byte{11, 8, 15},
|
||||
[]byte{11, 9},
|
||||
[]byte{11, 11},
|
||||
[]byte{11, 12},
|
||||
[]byte{11, 13},
|
||||
[]byte{11, 14},
|
||||
[]byte{11, 14, 0},
|
||||
[]byte{11, 14, 0, 1},
|
||||
[]byte{11, 14, 0, 3},
|
||||
[]byte{11, 14, 8},
|
||||
[]byte{11, 14, 13},
|
||||
[]byte{12},
|
||||
[]byte{12, 0},
|
||||
[]byte{12, 0, 0},
|
||||
[]byte{12, 0, 1},
|
||||
[]byte{12, 0, 1, 3},
|
||||
[]byte{12, 0, 1, 11},
|
||||
[]byte{12, 0, 15},
|
||||
[]byte{12, 2},
|
||||
[]byte{12, 2, 9},
|
||||
[]byte{12, 2, 12},
|
||||
[]byte{12, 4},
|
||||
[]byte{12, 5},
|
||||
[]byte{12, 6},
|
||||
[]byte{12, 6, 0},
|
||||
[]byte{12, 6, 4},
|
||||
[]byte{12, 6, 14},
|
||||
[]byte{12, 7},
|
||||
[]byte{12, 7, 0},
|
||||
[]byte{12, 7, 12},
|
||||
[]byte{12, 7, 13},
|
||||
[]byte{12, 9},
|
||||
[]byte{12, 11},
|
||||
[]byte{12, 12},
|
||||
[]byte{13},
|
||||
[]byte{13, 2},
|
||||
[]byte{13, 2, 0},
|
||||
[]byte{13, 2, 2},
|
||||
[]byte{13, 2, 4},
|
||||
[]byte{13, 3},
|
||||
[]byte{13, 3, 7},
|
||||
[]byte{13, 3, 10},
|
||||
[]byte{13, 5},
|
||||
[]byte{13, 8},
|
||||
[]byte{13, 8, 1},
|
||||
[]byte{13, 8, 15},
|
||||
[]byte{13, 9},
|
||||
[]byte{13, 9, 0},
|
||||
[]byte{13, 9, 14},
|
||||
[]byte{13, 10},
|
||||
[]byte{13, 12},
|
||||
[]byte{13, 12, 8},
|
||||
[]byte{13, 12, 11},
|
||||
[]byte{13, 13},
|
||||
[]byte{13, 13, 7},
|
||||
[]byte{13, 13, 12},
|
||||
[]byte{13, 14},
|
||||
[]byte{14},
|
||||
[]byte{14, 0},
|
||||
[]byte{14, 1},
|
||||
[]byte{14, 2},
|
||||
[]byte{14, 2, 2},
|
||||
[]byte{14, 2, 12},
|
||||
[]byte{14, 3},
|
||||
[]byte{14, 4},
|
||||
[]byte{14, 5},
|
||||
[]byte{14, 6},
|
||||
[]byte{14, 6, 9},
|
||||
[]byte{14, 6, 12},
|
||||
[]byte{14, 7},
|
||||
[]byte{14, 7, 4},
|
||||
[]byte{14, 7, 12},
|
||||
[]byte{14, 8},
|
||||
[]byte{14, 8, 3},
|
||||
[]byte{14, 8, 12},
|
||||
[]byte{14, 8, 12, 0},
|
||||
[]byte{14, 8, 12, 6},
|
||||
[]byte{14, 10},
|
||||
[]byte{14, 10, 6},
|
||||
[]byte{14, 10, 12},
|
||||
[]byte{14, 11},
|
||||
[]byte{14, 11, 8},
|
||||
[]byte{14, 11, 13},
|
||||
[]byte{14, 12},
|
||||
[]byte{14, 14},
|
||||
[]byte{14, 14, 3},
|
||||
[]byte{14, 14, 9},
|
||||
[]byte{15},
|
||||
[]byte{15, 0},
|
||||
[]byte{15, 5},
|
||||
[]byte{15, 6},
|
||||
[]byte{15, 9},
|
||||
[]byte{15, 9, 0},
|
||||
[]byte{15, 9, 2},
|
||||
[]byte{15, 9, 3},
|
||||
[]byte{15, 11},
|
||||
[]byte{15, 11, 1},
|
||||
[]byte{15, 11, 6},
|
||||
[]byte{15, 12},
|
||||
[]byte{15, 12, 3},
|
||||
[]byte{15, 12, 14},
|
||||
[]byte{15, 12, 14, 7},
|
||||
[]byte{15, 12, 14, 13},
|
||||
[]byte{15, 13},
|
||||
[]byte{15, 14},
|
||||
[]byte{15, 15},
|
||||
}
|
@ -10,7 +10,7 @@ import (
|
||||
snapt "github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/types"
|
||||
)
|
||||
|
||||
var Header1 = types.Header{
|
||||
var Block1_Header = types.Header{
|
||||
ParentHash: common.HexToHash("0x6341fd3daf94b748c72ced5a5b26028f2474f5f00d824504e4fa37a75767e177"),
|
||||
UncleHash: common.HexToHash("0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347"),
|
||||
Coinbase: common.HexToAddress("0x0000000000000000000000000000000000000000"),
|
||||
@ -29,7 +29,7 @@ var Header1 = types.Header{
|
||||
BaseFee: nil,
|
||||
}
|
||||
|
||||
var StateNode1 = snapt.Node{
|
||||
var Block1_StateNode0 = snapt.Node{
|
||||
NodeType: 0,
|
||||
Path: []byte{12, 0},
|
||||
Key: common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000"),
|
||||
|
2
go.mod
2
go.mod
@ -28,7 +28,7 @@ require (
|
||||
github.com/smartystreets/assertions v1.0.0 // indirect
|
||||
github.com/spf13/cobra v1.0.0
|
||||
github.com/spf13/viper v1.7.0
|
||||
github.com/vulcanize/go-eth-state-node-iterator v1.0.0
|
||||
github.com/vulcanize/go-eth-state-node-iterator v1.0.1
|
||||
go.uber.org/atomic v1.9.0 // indirect
|
||||
go.uber.org/goleak v1.1.11 // indirect
|
||||
go.uber.org/multierr v1.7.0 // indirect
|
||||
|
2
go.sum
2
go.sum
@ -805,6 +805,8 @@ github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPU
|
||||
github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
|
||||
github.com/vulcanize/go-eth-state-node-iterator v1.0.0 h1:FQ4s0K5TnRD44p3vO4uQtjfE8C1Gr2EDMT2vVGS3Lu8=
|
||||
github.com/vulcanize/go-eth-state-node-iterator v1.0.0/go.mod h1:uWhleTvUEZ+cEkNRIAmBpZ14KilTP71OxY5NZDrpNlo=
|
||||
github.com/vulcanize/go-eth-state-node-iterator v1.0.1 h1:lI8+moQ0Nv62NwmiCxgvWkC/Wj46KmDPX1dhBwyqRy8=
|
||||
github.com/vulcanize/go-eth-state-node-iterator v1.0.1/go.mod h1:uWhleTvUEZ+cEkNRIAmBpZ14KilTP71OxY5NZDrpNlo=
|
||||
github.com/vulcanize/go-ethereum v1.10.15-statediff-3.0.1 h1:MX7WcTwxpxOoYVMifrs9vsHDmM6gKOFF+2KnP19LddI=
|
||||
github.com/vulcanize/go-ethereum v1.10.15-statediff-3.0.1/go.mod h1:XO9WLkNXfwoJN05BZj0//xgOWHJyUrUPdnudbQfKlUo=
|
||||
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM=
|
||||
|
@ -147,14 +147,13 @@ func (p *publisher) txDir(index uint32) string {
|
||||
}
|
||||
|
||||
func (p *publisher) BeginTx() (snapt.Tx, error) {
|
||||
index := atomic.LoadUint32(&p.txCounter)
|
||||
index := atomic.AddUint32(&p.txCounter, 1) - 1
|
||||
dir := p.txDir(index)
|
||||
writers, err := makeFileWriters(dir, perNodeTables)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
atomic.AddUint32(&p.txCounter, 1)
|
||||
return fileTx{writers}, nil
|
||||
}
|
||||
|
||||
@ -281,13 +280,15 @@ func (p *publisher) PrepareTxForBatch(tx snapt.Tx, maxBatchSize uint) (snapt.Tx,
|
||||
func (p *publisher) logNodeCounters() {
|
||||
t := time.NewTicker(logInterval)
|
||||
for range t.C {
|
||||
p.printNodeCounters()
|
||||
p.printNodeCounters("progress")
|
||||
}
|
||||
}
|
||||
|
||||
func (p *publisher) printNodeCounters() {
|
||||
logrus.Infof("runtime: %s", time.Now().Sub(p.startTime).String())
|
||||
logrus.Infof("processed state nodes: %d", atomic.LoadUint64(&p.stateNodeCounter))
|
||||
logrus.Infof("processed storage nodes: %d", atomic.LoadUint64(&p.storageNodeCounter))
|
||||
logrus.Infof("processed code nodes: %d", atomic.LoadUint64(&p.codeNodeCounter))
|
||||
func (p *publisher) printNodeCounters(msg string) {
|
||||
log.WithFields(log.Fields{
|
||||
"runtime": time.Now().Sub(p.startTime).String(),
|
||||
"state nodes": atomic.LoadUint64(&p.stateNodeCounter),
|
||||
"storage nodes": atomic.LoadUint64(&p.storageNodeCounter),
|
||||
"code nodes": atomic.LoadUint64(&p.codeNodeCounter),
|
||||
}).Info(msg)
|
||||
}
|
||||
|
@ -33,12 +33,12 @@ var (
|
||||
func writeFiles(t *testing.T, dir string) *publisher {
|
||||
pub, err := NewPublisher(dir, nodeInfo)
|
||||
test.NoError(t, err)
|
||||
test.NoError(t, pub.PublishHeader(&fixt.Header1))
|
||||
test.NoError(t, pub.PublishHeader(&fixt.Block1_Header))
|
||||
tx, err := pub.BeginTx()
|
||||
test.NoError(t, err)
|
||||
|
||||
headerID := fixt.Header1.Hash().String()
|
||||
test.NoError(t, pub.PublishStateNode(&fixt.StateNode1, headerID, tx))
|
||||
headerID := fixt.Block1_Header.Hash().String()
|
||||
test.NoError(t, pub.PublishStateNode(&fixt.Block1_StateNode0, headerID, tx))
|
||||
|
||||
test.NoError(t, tx.Commit())
|
||||
return pub
|
||||
@ -122,11 +122,11 @@ func TestPgCopy(t *testing.T) {
|
||||
BlockHash string
|
||||
}
|
||||
var header res
|
||||
err = conn.QueryRow(ctx, pgQueryHeader, fixt.Header1.Number.Uint64()).Scan(
|
||||
err = conn.QueryRow(ctx, pgQueryHeader, fixt.Block1_Header.Number.Uint64()).Scan(
|
||||
&header.CID, &header.BlockHash)
|
||||
test.NoError(t, err)
|
||||
|
||||
headerNode, err := ipld.NewEthHeader(&fixt.Header1)
|
||||
headerNode, err := ipld.NewEthHeader(&fixt.Block1_Header)
|
||||
test.ExpectEqual(t, headerNode.Cid().String(), header.CID)
|
||||
test.ExpectEqual(t, fixt.Header1.Hash().String(), header.BlockHash)
|
||||
test.ExpectEqual(t, fixt.Block1_Header.Hash().String(), header.BlockHash)
|
||||
}
|
||||
|
@ -27,7 +27,7 @@ import (
|
||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
dshelp "github.com/ipfs/go-ipfs-ds-help"
|
||||
"github.com/multiformats/go-multihash"
|
||||
"github.com/sirupsen/logrus"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
|
||||
@ -81,8 +81,7 @@ func (p *publisher) BeginTx() (snapt.Tx, error) {
|
||||
}
|
||||
go p.logNodeCounters()
|
||||
return pubTx{tx, func() {
|
||||
logrus.Info("----- final counts -----")
|
||||
p.printNodeCounters()
|
||||
p.printNodeCounters("final stats")
|
||||
}}, nil
|
||||
}
|
||||
|
||||
@ -227,13 +226,15 @@ func (p *publisher) PrepareTxForBatch(tx snapt.Tx, maxBatchSize uint) (snapt.Tx,
|
||||
func (p *publisher) logNodeCounters() {
|
||||
t := time.NewTicker(logInterval)
|
||||
for range t.C {
|
||||
p.printNodeCounters()
|
||||
p.printNodeCounters("progress")
|
||||
}
|
||||
}
|
||||
|
||||
func (p *publisher) printNodeCounters() {
|
||||
logrus.Infof("runtime: %s", time.Now().Sub(p.startTime).String())
|
||||
logrus.Infof("processed state nodes: %d", atomic.LoadUint64(&p.stateNodeCounter))
|
||||
logrus.Infof("processed storage nodes: %d", atomic.LoadUint64(&p.storageNodeCounter))
|
||||
logrus.Infof("processed code nodes: %d", atomic.LoadUint64(&p.codeNodeCounter))
|
||||
func (p *publisher) printNodeCounters(msg string) {
|
||||
log.WithFields(log.Fields{
|
||||
"runtime": time.Now().Sub(p.startTime).String(),
|
||||
"state nodes": atomic.LoadUint64(&p.stateNodeCounter),
|
||||
"storage nodes": atomic.LoadUint64(&p.storageNodeCounter),
|
||||
"code nodes": atomic.LoadUint64(&p.codeNodeCounter),
|
||||
}).Info(msg)
|
||||
}
|
||||
|
@ -31,12 +31,12 @@ func writeData(t *testing.T) *publisher {
|
||||
driver, err := postgres.NewPGXDriver(context.Background(), pgConfig, nodeInfo)
|
||||
test.NoError(t, err)
|
||||
pub := NewPublisher(postgres.NewPostgresDB(driver))
|
||||
test.NoError(t, pub.PublishHeader(&fixt.Header1))
|
||||
test.NoError(t, pub.PublishHeader(&fixt.Block1_Header))
|
||||
tx, err := pub.BeginTx()
|
||||
test.NoError(t, err)
|
||||
|
||||
headerID := fixt.Header1.Hash().String()
|
||||
test.NoError(t, pub.PublishStateNode(&fixt.StateNode1, headerID, tx))
|
||||
headerID := fixt.Block1_Header.Hash().String()
|
||||
test.NoError(t, pub.PublishStateNode(&fixt.Block1_StateNode0, headerID, tx))
|
||||
|
||||
test.NoError(t, tx.Commit())
|
||||
return pub
|
||||
@ -68,11 +68,11 @@ func TestBasic(t *testing.T) {
|
||||
BlockHash string
|
||||
}
|
||||
var header res
|
||||
err = conn.QueryRow(ctx, pgQueryHeader, fixt.Header1.Number.Uint64()).Scan(
|
||||
err = conn.QueryRow(ctx, pgQueryHeader, fixt.Block1_Header.Number.Uint64()).Scan(
|
||||
&header.CID, &header.BlockHash)
|
||||
test.NoError(t, err)
|
||||
|
||||
headerNode, err := ipld.NewEthHeader(&fixt.Header1)
|
||||
headerNode, err := ipld.NewEthHeader(&fixt.Block1_Header)
|
||||
test.ExpectEqual(t, headerNode.Cid().String(), header.CID)
|
||||
test.ExpectEqual(t, fixt.Header1.Hash().String(), header.BlockHash)
|
||||
test.ExpectEqual(t, fixt.Block1_Header.Hash().String(), header.BlockHash)
|
||||
}
|
||||
|
@ -29,7 +29,7 @@ import (
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/trie"
|
||||
"github.com/sirupsen/logrus"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
. "github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/types"
|
||||
iter "github.com/vulcanize/go-eth-state-node-iterator"
|
||||
@ -50,6 +50,8 @@ type Service struct {
|
||||
stateDB state.Database
|
||||
ipfsPublisher Publisher
|
||||
maxBatchSize uint
|
||||
tracker iteratorTracker
|
||||
recoveryFile string
|
||||
}
|
||||
|
||||
func NewLevelDB(con *EthConfig) (ethdb.Database, error) {
|
||||
@ -59,12 +61,13 @@ func NewLevelDB(con *EthConfig) (ethdb.Database, error) {
|
||||
}
|
||||
|
||||
// NewSnapshotService creates Service.
|
||||
func NewSnapshotService(edb ethdb.Database, pub Publisher) (*Service, error) {
|
||||
func NewSnapshotService(edb ethdb.Database, pub Publisher, recoveryFile string) (*Service, error) {
|
||||
return &Service{
|
||||
ethDB: edb,
|
||||
stateDB: state.NewDatabase(edb),
|
||||
ipfsPublisher: pub,
|
||||
maxBatchSize: defaultBatchSize,
|
||||
recoveryFile: recoveryFile,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -76,37 +79,71 @@ type SnapshotParams struct {
|
||||
func (s *Service) CreateSnapshot(params SnapshotParams) error {
|
||||
// extract header from lvldb and publish to PG-IPFS
|
||||
// hold onto the headerID so that we can link the state nodes to this header
|
||||
logrus.Infof("Creating snapshot at height %d", params.Height)
|
||||
log.Infof("Creating snapshot at height %d", params.Height)
|
||||
hash := rawdb.ReadCanonicalHash(s.ethDB, params.Height)
|
||||
header := rawdb.ReadHeader(s.ethDB, hash, params.Height)
|
||||
if header == nil {
|
||||
return fmt.Errorf("unable to read canonical header at height %d", params.Height)
|
||||
}
|
||||
|
||||
logrus.Infof("head hash: %s head height: %d", hash.Hex(), params.Height)
|
||||
log.Infof("head hash: %s head height: %d", hash.Hex(), params.Height)
|
||||
|
||||
err := s.ipfsPublisher.PublishHeader(header)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
t, err := s.stateDB.OpenTrie(header.Root)
|
||||
tree, err := s.stateDB.OpenTrie(header.Root)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
headerID := header.Hash().String()
|
||||
s.tracker = newTracker(s.recoveryFile, int(params.Workers))
|
||||
go s.tracker.run()
|
||||
go s.tracker.captureSignal()
|
||||
|
||||
if params.Workers > 0 {
|
||||
return s.createSnapshotAsync(t, headerID, params.Workers)
|
||||
var iters []trie.NodeIterator
|
||||
// attempt to restore from recovery file if it exists
|
||||
iters, err = s.tracker.restore(tree)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if iters != nil {
|
||||
if params.Workers < uint(len(iters)) {
|
||||
return fmt.Errorf(
|
||||
"number of recovered workers (%d) is greater than number configured (%d)",
|
||||
len(iters), params.Workers,
|
||||
)
|
||||
}
|
||||
} else { // nothing to restore
|
||||
if params.Workers > 1 {
|
||||
iters = iter.SubtrieIterators(tree, params.Workers)
|
||||
} else {
|
||||
iters = []trie.NodeIterator{tree.NodeIterator(nil)}
|
||||
}
|
||||
for i, it := range iters {
|
||||
iters[i] = s.tracker.tracked(it)
|
||||
}
|
||||
}
|
||||
|
||||
defer func() {
|
||||
err := s.tracker.haltAndDump()
|
||||
if err != nil {
|
||||
log.Error("failed to write recovery file: ", err)
|
||||
}
|
||||
}()
|
||||
|
||||
if len(iters) > 0 {
|
||||
return s.createSnapshotAsync(iters, headerID)
|
||||
} else {
|
||||
return s.createSnapshot(t.NodeIterator(nil), headerID)
|
||||
return s.createSnapshot(iters[0], headerID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Create snapshot up to head (ignores height param)
|
||||
func (s *Service) CreateLatestSnapshot(workers uint) error {
|
||||
logrus.Info("Creating snapshot at head")
|
||||
log.Info("Creating snapshot at head")
|
||||
hash := rawdb.ReadHeadHeaderHash(s.ethDB)
|
||||
height := rawdb.ReadHeaderNumber(s.ethDB, hash)
|
||||
if height == nil {
|
||||
@ -176,7 +213,8 @@ func (s *Service) createSnapshot(it trie.NodeIterator, headerID string) error {
|
||||
|
||||
switch res.node.NodeType {
|
||||
case Leaf:
|
||||
// if the node is a leaf, decode the account and publish the associated storage trie nodes if there are any
|
||||
// if the node is a leaf, decode the account and publish the associated storage trie
|
||||
// nodes if there are any
|
||||
var account types.StateAccount
|
||||
if err := rlp.DecodeBytes(res.elements[1].([]byte), &account); err != nil {
|
||||
return fmt.Errorf(
|
||||
@ -197,7 +235,7 @@ func (s *Service) createSnapshot(it trie.NodeIterator, headerID string) error {
|
||||
codeHash := common.BytesToHash(account.CodeHash)
|
||||
codeBytes := rawdb.ReadCode(s.ethDB, codeHash)
|
||||
if len(codeBytes) == 0 {
|
||||
logrus.Error("Code is missing", "account", common.BytesToHash(it.LeafKey()))
|
||||
log.Error("Code is missing", "account", common.BytesToHash(it.LeafKey()))
|
||||
return errors.New("missing code")
|
||||
}
|
||||
|
||||
@ -217,34 +255,37 @@ func (s *Service) createSnapshot(it trie.NodeIterator, headerID string) error {
|
||||
default:
|
||||
return errors.New("unexpected node type")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return it.Error()
|
||||
}
|
||||
|
||||
// Full-trie concurrent snapshot
|
||||
func (s *Service) createSnapshotAsync(tree state.Trie, headerID string, workers uint) error {
|
||||
func (s *Service) createSnapshotAsync(iters []trie.NodeIterator, headerID string) error {
|
||||
errors := make(chan error)
|
||||
var wg sync.WaitGroup
|
||||
for _, it := range iter.SubtrieIterators(tree, workers) {
|
||||
for _, it := range iters {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
go func(it trie.NodeIterator) {
|
||||
defer wg.Done()
|
||||
if err := s.createSnapshot(it, headerID); err != nil {
|
||||
errors <- err
|
||||
}
|
||||
}()
|
||||
}(it)
|
||||
}
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer close(errors)
|
||||
wg.Wait()
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
var err error
|
||||
select {
|
||||
case err := <-errors:
|
||||
return err
|
||||
case err = <-errors:
|
||||
case <-done:
|
||||
close(errors)
|
||||
}
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Service) storageSnapshot(sr common.Hash, headerID string, statePath []byte, tx Tx) (Tx, error) {
|
||||
|
@ -1,12 +1,16 @@
|
||||
package snapshot
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
|
||||
fixt "github.com/vulcanize/eth-pg-ipfs-state-snapshot/fixture"
|
||||
mock "github.com/vulcanize/eth-pg-ipfs-state-snapshot/mocks/snapshot"
|
||||
snapt "github.com/vulcanize/eth-pg-ipfs-state-snapshot/pkg/types"
|
||||
"github.com/vulcanize/eth-pg-ipfs-state-snapshot/test"
|
||||
)
|
||||
|
||||
@ -24,49 +28,113 @@ func testConfig(leveldbpath, ancientdbpath string) *Config {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateSnapshot(t *testing.T) {
|
||||
config := testConfig(fixt.ChaindataPath, fixt.AncientdataPath)
|
||||
|
||||
edb, err := NewLevelDB(config.Eth)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
workers := 4
|
||||
|
||||
func makeMocks(t *testing.T) (*mock.MockPublisher, *mock.MockTx) {
|
||||
ctl := gomock.NewController(t)
|
||||
tx := mock.NewMockTx(ctl)
|
||||
pub := mock.NewMockPublisher(ctl)
|
||||
|
||||
pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Header1))
|
||||
pub.EXPECT().BeginTx().
|
||||
Return(tx, nil).
|
||||
Times(workers)
|
||||
pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).
|
||||
Return(tx, nil).
|
||||
Times(workers)
|
||||
pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any()).
|
||||
Times(workers)
|
||||
// TODO: fixtures for storage node
|
||||
// pub.EXPECT().PublishStorageNode(gomock.Eq(fixt.StorageNode), gomock.Eq(int64(0)), gomock.Any())
|
||||
// pub.EXPECT().CommitTx(gomock.Any()).
|
||||
// Times(workers)
|
||||
|
||||
tx.EXPECT().Commit().
|
||||
Times(workers)
|
||||
|
||||
service, err := NewSnapshotService(edb, pub)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
params := SnapshotParams{Height: 1, Workers: uint(workers)}
|
||||
err = service.CreateSnapshot(params)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// err = service.CreateLatestSnapshot(0)
|
||||
// if err != nil {
|
||||
// t.Fatal(err)
|
||||
// }
|
||||
tx := mock.NewMockTx(ctl)
|
||||
return pub, tx
|
||||
}
|
||||
|
||||
func TestCreateSnapshot(t *testing.T) {
|
||||
runCase := func(t *testing.T, workers int) {
|
||||
pub, tx := makeMocks(t)
|
||||
pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Block1_Header))
|
||||
pub.EXPECT().BeginTx().Return(tx, nil).
|
||||
Times(workers)
|
||||
pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).
|
||||
AnyTimes()
|
||||
pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any()).
|
||||
Times(len(fixt.Block1_StateNodePaths))
|
||||
|
||||
// TODO: fixtures for storage node
|
||||
// pub.EXPECT().PublishStorageNode(gomock.Eq(fixt.StorageNode), gomock.Eq(int64(0)), gomock.Any())
|
||||
|
||||
tx.EXPECT().Commit().
|
||||
Times(workers)
|
||||
|
||||
config := testConfig(fixt.ChaindataPath, fixt.AncientdataPath)
|
||||
edb, err := NewLevelDB(config.Eth)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer edb.Close()
|
||||
|
||||
recovery := filepath.Join(t.TempDir(), "recover.csv")
|
||||
service, err := NewSnapshotService(edb, pub, recovery)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
params := SnapshotParams{Height: 1, Workers: uint(workers)}
|
||||
err = service.CreateSnapshot(params)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
testCases := []int{1, 4, 16, 32}
|
||||
for _, tc := range testCases {
|
||||
t.Run("case", func(t *testing.T) { runCase(t, tc) })
|
||||
}
|
||||
}
|
||||
|
||||
func failingPublishStateNode(_ *snapt.Node, _ string, _ snapt.Tx) error {
|
||||
return errors.New("failingPublishStateNode")
|
||||
}
|
||||
|
||||
func TestRecovery(t *testing.T) {
|
||||
runCase := func(t *testing.T, workers int) {
|
||||
pub, tx := makeMocks(t)
|
||||
pub.EXPECT().PublishHeader(gomock.Any()).AnyTimes()
|
||||
pub.EXPECT().BeginTx().Return(tx, nil).AnyTimes()
|
||||
pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes()
|
||||
pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any()).
|
||||
Times(workers).
|
||||
DoAndReturn(failingPublishStateNode)
|
||||
tx.EXPECT().Commit().AnyTimes()
|
||||
|
||||
config := testConfig(fixt.ChaindataPath, fixt.AncientdataPath)
|
||||
edb, err := NewLevelDB(config.Eth)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer edb.Close()
|
||||
|
||||
recovery := filepath.Join(t.TempDir(), "recover.csv")
|
||||
service, err := NewSnapshotService(edb, pub, recovery)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
params := SnapshotParams{Height: 1, Workers: uint(workers)}
|
||||
err = service.CreateSnapshot(params)
|
||||
if err == nil {
|
||||
t.Fatal("expected an error")
|
||||
}
|
||||
|
||||
if _, err = os.Stat(recovery); err != nil {
|
||||
t.Fatal("cannot stat recovery file:", err)
|
||||
}
|
||||
|
||||
pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
|
||||
err = service.CreateSnapshot(params)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = os.Stat(recovery)
|
||||
if err == nil {
|
||||
t.Fatal("recovery file still present")
|
||||
} else {
|
||||
if !os.IsNotExist(err) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
testCases := []int{1, 4, 32}
|
||||
for _, tc := range testCases {
|
||||
t.Run("case", func(t *testing.T) { runCase(t, tc) })
|
||||
}
|
||||
|
||||
}
|
||||
|
173
pkg/snapshot/tracker.go
Normal file
173
pkg/snapshot/tracker.go
Normal file
@ -0,0 +1,173 @@
|
||||
package snapshot
|
||||
|
||||
import (
|
||||
"encoding/csv"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"github.com/ethereum/go-ethereum/core/state"
|
||||
"github.com/ethereum/go-ethereum/trie"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
iter "github.com/vulcanize/go-eth-state-node-iterator"
|
||||
)
|
||||
|
||||
type trackedIter struct {
|
||||
trie.NodeIterator
|
||||
tracker *iteratorTracker
|
||||
}
|
||||
|
||||
func (it *trackedIter) Next(descend bool) bool {
|
||||
ret := it.NodeIterator.Next(descend)
|
||||
if !ret {
|
||||
it.tracker.stopChan <- it
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
type iteratorTracker struct {
|
||||
recoveryFile string
|
||||
|
||||
startChan chan *trackedIter
|
||||
stopChan chan *trackedIter
|
||||
started map[*trackedIter]struct{}
|
||||
stopped []*trackedIter
|
||||
|
||||
haltChan chan struct{}
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
func newTracker(file string, buf int) iteratorTracker {
|
||||
return iteratorTracker{
|
||||
recoveryFile: file,
|
||||
startChan: make(chan *trackedIter, buf),
|
||||
stopChan: make(chan *trackedIter, buf),
|
||||
started: map[*trackedIter]struct{}{},
|
||||
haltChan: make(chan struct{}),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (tr *iteratorTracker) captureSignal() {
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
|
||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||
go func() {
|
||||
sig := <-sigChan
|
||||
log.Errorf("Signal received (%v), stopping", sig)
|
||||
tr.haltAndDump()
|
||||
os.Exit(1)
|
||||
}()
|
||||
}
|
||||
|
||||
// listens for starts/stops and manages current state
|
||||
func (tr *iteratorTracker) run() {
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case start := <-tr.startChan:
|
||||
tr.started[start] = struct{}{}
|
||||
case stop := <-tr.stopChan:
|
||||
tr.stopped = append(tr.stopped, stop)
|
||||
case <-tr.haltChan:
|
||||
break loop
|
||||
default:
|
||||
}
|
||||
}
|
||||
tr.done <- struct{}{}
|
||||
}
|
||||
|
||||
func (tr *iteratorTracker) tracked(it trie.NodeIterator) (ret *trackedIter) {
|
||||
ret = &trackedIter{it, tr}
|
||||
tr.startChan <- ret
|
||||
return
|
||||
}
|
||||
|
||||
// dumps iterator path and bounds to a text file so it can be restored later
|
||||
func (tr *iteratorTracker) dump() error {
|
||||
log.Info("Dumping recovery state to:", tr.recoveryFile)
|
||||
var rows [][]string
|
||||
for it, _ := range tr.started {
|
||||
var endPath []byte
|
||||
if impl, ok := it.NodeIterator.(*iter.PrefixBoundIterator); ok {
|
||||
endPath = impl.EndPath
|
||||
}
|
||||
rows = append(rows, []string{
|
||||
fmt.Sprintf("%x", it.Path()),
|
||||
fmt.Sprintf("%x", endPath),
|
||||
})
|
||||
}
|
||||
file, err := os.Create(tr.recoveryFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer file.Close()
|
||||
out := csv.NewWriter(file)
|
||||
return out.WriteAll(rows)
|
||||
}
|
||||
|
||||
// attempts to read iterator state from file
|
||||
// if file doesn't exist, returns an empty slice with no error
|
||||
func (tr *iteratorTracker) restore(tree state.Trie) ([]trie.NodeIterator, error) {
|
||||
file, err := os.Open(tr.recoveryFile)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
defer file.Close()
|
||||
in := csv.NewReader(file)
|
||||
in.FieldsPerRecord = 2
|
||||
rows, err := in.ReadAll()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var ret []trie.NodeIterator
|
||||
for _, row := range rows {
|
||||
// pick up where each interval left off
|
||||
var paths [2][]byte
|
||||
for i, val := range row {
|
||||
if len(val) != 0 {
|
||||
if _, err = fmt.Sscanf(val, "%x", &paths[i]); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
it := iter.NewPrefixBoundIterator(tree, paths[0], paths[1])
|
||||
ret = append(ret, tr.tracked(it))
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (tr *iteratorTracker) haltAndDump() error {
|
||||
tr.haltChan <- struct{}{}
|
||||
<-tr.done
|
||||
|
||||
// drain any pending events
|
||||
close(tr.startChan)
|
||||
for start := range tr.startChan {
|
||||
tr.started[start] = struct{}{}
|
||||
}
|
||||
close(tr.stopChan)
|
||||
for stop := range tr.stopChan {
|
||||
tr.stopped = append(tr.stopped, stop)
|
||||
}
|
||||
|
||||
for _, stop := range tr.stopped {
|
||||
delete(tr.started, stop)
|
||||
}
|
||||
|
||||
if len(tr.started) == 0 {
|
||||
// if the tracker state is empty, erase any existing recovery file
|
||||
err := os.Remove(tr.recoveryFile)
|
||||
if os.IsNotExist(err) {
|
||||
err = nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
return tr.dump()
|
||||
}
|
@ -1,6 +1,7 @@
|
||||
package test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"os"
|
||||
"reflect"
|
||||
"testing"
|
||||
@ -45,8 +46,14 @@ func NoError(t *testing.T, err error) {
|
||||
}
|
||||
|
||||
// ExpectEqual asserts the provided interfaces are deep equal
|
||||
func ExpectEqual(t *testing.T, want interface{}, got interface{}) {
|
||||
func ExpectEqual(t *testing.T, want, got interface{}) {
|
||||
if !reflect.DeepEqual(want, got) {
|
||||
t.Fatalf("Expected: %v\nActual: %v", want, got)
|
||||
t.Fatalf("Values not equal:\nExpected:\t%v\nActual:\t\t%v", want, got)
|
||||
}
|
||||
}
|
||||
|
||||
func ExpectEqualBytes(t *testing.T, want, got []byte) {
|
||||
if !bytes.Equal(want, got) {
|
||||
t.Fatalf("Bytes not equal:\nExpected:\t%v\nActual:\t\t%v", want, got)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user