Add tracker package
Moved from snapshot repo
This commit is contained in:
parent
2508990948
commit
d72e3214c1
7
go.mod
7
go.mod
@ -2,7 +2,10 @@ module github.com/vulcanize/go-eth-state-node-iterator
|
||||
|
||||
go 1.18
|
||||
|
||||
require github.com/ethereum/go-ethereum v1.10.23
|
||||
require (
|
||||
github.com/ethereum/go-ethereum v1.10.23
|
||||
github.com/sirupsen/logrus v1.9.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/VictoriaMetrics/fastcache v1.6.0 // indirect
|
||||
@ -24,7 +27,7 @@ require (
|
||||
github.com/tklauser/numcpus v0.2.2 // indirect
|
||||
github.com/yusufpapurcu/wmi v1.2.2 // indirect
|
||||
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 // indirect
|
||||
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
|
||||
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
|
||||
)
|
||||
|
||||
replace github.com/ethereum/go-ethereum v1.10.23 => github.com/vulcanize/go-ethereum v1.10.23-statediff-4.2.0-alpha
|
||||
|
7
go.sum
7
go.sum
@ -108,12 +108,15 @@ github.com/prometheus/tsdb v0.10.0/go.mod h1:oi49uRhEe9dPUTlS3JRZOwJuVi6tmh10QSg
|
||||
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
|
||||
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
|
||||
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
|
||||
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
|
||||
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
|
||||
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s=
|
||||
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
|
||||
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a h1:1ur3QoCqvE5fl+nylMaIr9PVV1w343YRDtsy+Rwu7XI=
|
||||
@ -169,8 +172,9 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k=
|
||||
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
|
||||
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
@ -205,5 +209,6 @@ gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
|
||||
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
243
tracker/tracker.go
Normal file
243
tracker/tracker.go
Normal file
@ -0,0 +1,243 @@
|
||||
package tracker
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/csv"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"github.com/ethereum/go-ethereum/core/state"
|
||||
"github.com/ethereum/go-ethereum/trie"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
iter "github.com/vulcanize/go-eth-state-node-iterator"
|
||||
)
|
||||
|
||||
type Tracker struct {
|
||||
recoveryFile string
|
||||
|
||||
startChan chan *Iterator
|
||||
stopChan chan *Iterator
|
||||
started map[*Iterator]struct{}
|
||||
stopped []*Iterator
|
||||
running bool
|
||||
}
|
||||
|
||||
type Iterator struct {
|
||||
trie.NodeIterator
|
||||
tracker *Tracker
|
||||
|
||||
SeekedPath []byte // latest path seeked from the tracked iterator
|
||||
EndPath []byte // endPath for the tracked iterator
|
||||
}
|
||||
|
||||
func New(file string, buf uint) Tracker {
|
||||
return Tracker{
|
||||
recoveryFile: file,
|
||||
startChan: make(chan *Iterator, buf),
|
||||
stopChan: make(chan *Iterator, buf),
|
||||
started: map[*Iterator]struct{}{},
|
||||
running: true,
|
||||
}
|
||||
}
|
||||
|
||||
func (tr *Tracker) CaptureSignal(cancelCtx context.CancelFunc) {
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
|
||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||
go func() {
|
||||
sig := <-sigChan
|
||||
log.Errorf("Signal received (%v), stopping", sig)
|
||||
// cancel context on receiving a signal
|
||||
// on ctx cancellation, all the iterators complete processing of their current node before stopping
|
||||
cancelCtx()
|
||||
}()
|
||||
}
|
||||
|
||||
// Wraps an iterator in a Iterator. This should not be called once halts are possible.
|
||||
func (tr *Tracker) Tracked(it trie.NodeIterator, recoveredPath []byte) (ret *Iterator) {
|
||||
// create seeked path of max capacity (65)
|
||||
iterSeekedPath := make([]byte, 0, 65)
|
||||
// intially populate seeked path with the recovered path
|
||||
// to be used in trie traversal
|
||||
if recoveredPath != nil {
|
||||
iterSeekedPath = append(iterSeekedPath, recoveredPath...)
|
||||
}
|
||||
|
||||
// if the iterator being tracked is a PrefixBoundIterator, capture it's end path
|
||||
// to be used in trie traversal
|
||||
var endPath []byte
|
||||
if boundedIter, ok := it.(*iter.PrefixBoundIterator); ok {
|
||||
endPath = boundedIter.EndPath
|
||||
}
|
||||
|
||||
ret = &Iterator{it, tr, iterSeekedPath, endPath}
|
||||
tr.startChan <- ret
|
||||
return
|
||||
}
|
||||
|
||||
// explicitly stops an iterator
|
||||
func (tr *Tracker) StopIterator(it *Iterator) {
|
||||
tr.stopChan <- it
|
||||
}
|
||||
|
||||
// dumps iterator path and bounds to a text file so it can be restored later
|
||||
func (tr *Tracker) dump() error {
|
||||
log.Debugf("Dumping recovery state to: %s", tr.recoveryFile)
|
||||
var rows [][]string
|
||||
for it := range tr.started {
|
||||
var startPath []byte
|
||||
var endPath []byte
|
||||
if impl, ok := it.NodeIterator.(*iter.PrefixBoundIterator); ok {
|
||||
// if the iterator being tracked is a PrefixBoundIterator,
|
||||
// initialize start and end paths with its bounds
|
||||
startPath = impl.StartPath
|
||||
endPath = impl.EndPath
|
||||
}
|
||||
|
||||
// if seeked path and iterator path are non-empty, use iterator's path as startpath
|
||||
if !bytes.Equal(it.SeekedPath, []byte{}) && !bytes.Equal(it.Path(), []byte{}) {
|
||||
startPath = it.Path()
|
||||
}
|
||||
|
||||
rows = append(rows, []string{
|
||||
fmt.Sprintf("%x", startPath),
|
||||
fmt.Sprintf("%x", endPath),
|
||||
fmt.Sprintf("%x", it.SeekedPath),
|
||||
})
|
||||
}
|
||||
|
||||
file, err := os.Create(tr.recoveryFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer file.Close()
|
||||
out := csv.NewWriter(file)
|
||||
|
||||
return out.WriteAll(rows)
|
||||
}
|
||||
|
||||
// attempts to read iterator state from file
|
||||
// if file doesn't exist, returns an empty slice with no error
|
||||
func (tr *Tracker) Restore(tree state.Trie) ([]trie.NodeIterator, error) {
|
||||
file, err := os.Open(tr.recoveryFile)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
log.Debugf("Restoring recovery state from: %s", tr.recoveryFile)
|
||||
|
||||
defer file.Close()
|
||||
in := csv.NewReader(file)
|
||||
in.FieldsPerRecord = 3
|
||||
rows, err := in.ReadAll()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var ret []trie.NodeIterator
|
||||
for _, row := range rows {
|
||||
// pick up where each interval left off
|
||||
var startPath []byte
|
||||
var endPath []byte
|
||||
var recoveredPath []byte
|
||||
|
||||
if len(row[0]) != 0 {
|
||||
if _, err = fmt.Sscanf(row[0], "%x", &startPath); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if len(row[1]) != 0 {
|
||||
if _, err = fmt.Sscanf(row[1], "%x", &endPath); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if len(row[2]) != 0 {
|
||||
if _, err = fmt.Sscanf(row[2], "%x", &recoveredPath); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// force the lower bound path to an even length (required by geth API/HexToKeyBytes)
|
||||
if len(startPath)&0b1 == 1 {
|
||||
// decrement first to avoid skipped nodes
|
||||
decrementPath(startPath)
|
||||
startPath = append(startPath, 0)
|
||||
}
|
||||
it := iter.NewPrefixBoundIterator(tree.NodeIterator(iter.HexToKeyBytes(startPath)), startPath, endPath)
|
||||
ret = append(ret, tr.Tracked(it, recoveredPath))
|
||||
}
|
||||
|
||||
log.Debugf("Restored %d iterators", len(ret))
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (tr *Tracker) HaltAndDump() error {
|
||||
tr.running = false
|
||||
|
||||
// drain any pending iterators
|
||||
close(tr.startChan)
|
||||
for start := range tr.startChan {
|
||||
tr.started[start] = struct{}{}
|
||||
}
|
||||
close(tr.stopChan)
|
||||
for stop := range tr.stopChan {
|
||||
tr.stopped = append(tr.stopped, stop)
|
||||
}
|
||||
|
||||
for _, stop := range tr.stopped {
|
||||
delete(tr.started, stop)
|
||||
}
|
||||
|
||||
if len(tr.started) == 0 {
|
||||
// if the tracker state is empty, erase any existing recovery file
|
||||
err := os.Remove(tr.recoveryFile)
|
||||
if os.IsNotExist(err) {
|
||||
err = nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
return tr.dump()
|
||||
}
|
||||
|
||||
func (it *Iterator) Next(descend bool) bool {
|
||||
ret := it.NodeIterator.Next(descend)
|
||||
|
||||
if !ret {
|
||||
if it.tracker.running {
|
||||
it.tracker.stopChan <- it
|
||||
} else {
|
||||
log.Errorf("iterator stopped after tracker halted: path=%x", it.Path())
|
||||
}
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
// Subtracts 1 from the last byte in a path slice, carrying if needed.
|
||||
// Does nothing, returning false, for all-zero inputs.
|
||||
func decrementPath(path []byte) bool {
|
||||
// check for all zeros
|
||||
allzero := true
|
||||
for i := 0; i < len(path); i++ {
|
||||
allzero = allzero && path[i] == 0
|
||||
}
|
||||
if allzero {
|
||||
return false
|
||||
}
|
||||
for i := len(path) - 1; i >= 0; i-- {
|
||||
val := path[i]
|
||||
path[i]--
|
||||
if val == 0 {
|
||||
path[i] = 0xf
|
||||
} else {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
Loading…
Reference in New Issue
Block a user