From d72e3214c147f9abcac1aee75b2f34740e495835 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Fri, 19 Aug 2022 18:18:31 -0500 Subject: [PATCH] Add tracker package Moved from snapshot repo --- go.mod | 7 +- go.sum | 7 +- tracker/tracker.go | 243 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 254 insertions(+), 3 deletions(-) create mode 100644 tracker/tracker.go diff --git a/go.mod b/go.mod index 340c215..e08682c 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,10 @@ module github.com/vulcanize/go-eth-state-node-iterator go 1.18 -require github.com/ethereum/go-ethereum v1.10.23 +require ( + github.com/ethereum/go-ethereum v1.10.23 + github.com/sirupsen/logrus v1.9.0 +) require ( github.com/VictoriaMetrics/fastcache v1.6.0 // indirect @@ -24,7 +27,7 @@ require ( github.com/tklauser/numcpus v0.2.2 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 // indirect - golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect + golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect ) replace github.com/ethereum/go-ethereum v1.10.23 => github.com/vulcanize/go-ethereum v1.10.23-statediff-4.2.0-alpha diff --git a/go.sum b/go.sum index b95412a..fcac0e1 100644 --- a/go.sum +++ b/go.sum @@ -108,12 +108,15 @@ github.com/prometheus/tsdb v0.10.0/go.mod h1:oi49uRhEe9dPUTlS3JRZOwJuVi6tmh10QSg github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= +github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s= github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a h1:1ur3QoCqvE5fl+nylMaIr9PVV1w343YRDtsy+Rwu7XI= @@ -169,8 +172,9 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -205,5 +209,6 @@ gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/tracker/tracker.go b/tracker/tracker.go new file mode 100644 index 0000000..d3ea7b9 --- /dev/null +++ b/tracker/tracker.go @@ -0,0 +1,243 @@ +package tracker + +import ( + "bytes" + "context" + "encoding/csv" + "fmt" + "os" + "os/signal" + "syscall" + + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/trie" + log "github.com/sirupsen/logrus" + + iter "github.com/vulcanize/go-eth-state-node-iterator" +) + +type Tracker struct { + recoveryFile string + + startChan chan *Iterator + stopChan chan *Iterator + started map[*Iterator]struct{} + stopped []*Iterator + running bool +} + +type Iterator struct { + trie.NodeIterator + tracker *Tracker + + SeekedPath []byte // latest path seeked from the tracked iterator + EndPath []byte // endPath for the tracked iterator +} + +func New(file string, buf uint) Tracker { + return Tracker{ + recoveryFile: file, + startChan: make(chan *Iterator, buf), + stopChan: make(chan *Iterator, buf), + started: map[*Iterator]struct{}{}, + running: true, + } +} + +func (tr *Tracker) CaptureSignal(cancelCtx context.CancelFunc) { + sigChan := make(chan os.Signal, 1) + + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + go func() { + sig := <-sigChan + log.Errorf("Signal received (%v), stopping", sig) + // cancel context on receiving a signal + // on ctx cancellation, all the iterators complete processing of their current node before stopping + cancelCtx() + }() +} + +// Wraps an iterator in a Iterator. This should not be called once halts are possible. +func (tr *Tracker) Tracked(it trie.NodeIterator, recoveredPath []byte) (ret *Iterator) { + // create seeked path of max capacity (65) + iterSeekedPath := make([]byte, 0, 65) + // intially populate seeked path with the recovered path + // to be used in trie traversal + if recoveredPath != nil { + iterSeekedPath = append(iterSeekedPath, recoveredPath...) + } + + // if the iterator being tracked is a PrefixBoundIterator, capture it's end path + // to be used in trie traversal + var endPath []byte + if boundedIter, ok := it.(*iter.PrefixBoundIterator); ok { + endPath = boundedIter.EndPath + } + + ret = &Iterator{it, tr, iterSeekedPath, endPath} + tr.startChan <- ret + return +} + +// explicitly stops an iterator +func (tr *Tracker) StopIterator(it *Iterator) { + tr.stopChan <- it +} + +// dumps iterator path and bounds to a text file so it can be restored later +func (tr *Tracker) dump() error { + log.Debugf("Dumping recovery state to: %s", tr.recoveryFile) + var rows [][]string + for it := range tr.started { + var startPath []byte + var endPath []byte + if impl, ok := it.NodeIterator.(*iter.PrefixBoundIterator); ok { + // if the iterator being tracked is a PrefixBoundIterator, + // initialize start and end paths with its bounds + startPath = impl.StartPath + endPath = impl.EndPath + } + + // if seeked path and iterator path are non-empty, use iterator's path as startpath + if !bytes.Equal(it.SeekedPath, []byte{}) && !bytes.Equal(it.Path(), []byte{}) { + startPath = it.Path() + } + + rows = append(rows, []string{ + fmt.Sprintf("%x", startPath), + fmt.Sprintf("%x", endPath), + fmt.Sprintf("%x", it.SeekedPath), + }) + } + + file, err := os.Create(tr.recoveryFile) + if err != nil { + return err + } + defer file.Close() + out := csv.NewWriter(file) + + return out.WriteAll(rows) +} + +// attempts to read iterator state from file +// if file doesn't exist, returns an empty slice with no error +func (tr *Tracker) Restore(tree state.Trie) ([]trie.NodeIterator, error) { + file, err := os.Open(tr.recoveryFile) + if err != nil { + if os.IsNotExist(err) { + return nil, nil + } + return nil, err + } + log.Debugf("Restoring recovery state from: %s", tr.recoveryFile) + + defer file.Close() + in := csv.NewReader(file) + in.FieldsPerRecord = 3 + rows, err := in.ReadAll() + if err != nil { + return nil, err + } + + var ret []trie.NodeIterator + for _, row := range rows { + // pick up where each interval left off + var startPath []byte + var endPath []byte + var recoveredPath []byte + + if len(row[0]) != 0 { + if _, err = fmt.Sscanf(row[0], "%x", &startPath); err != nil { + return nil, err + } + } + if len(row[1]) != 0 { + if _, err = fmt.Sscanf(row[1], "%x", &endPath); err != nil { + return nil, err + } + } + if len(row[2]) != 0 { + if _, err = fmt.Sscanf(row[2], "%x", &recoveredPath); err != nil { + return nil, err + } + } + + // force the lower bound path to an even length (required by geth API/HexToKeyBytes) + if len(startPath)&0b1 == 1 { + // decrement first to avoid skipped nodes + decrementPath(startPath) + startPath = append(startPath, 0) + } + it := iter.NewPrefixBoundIterator(tree.NodeIterator(iter.HexToKeyBytes(startPath)), startPath, endPath) + ret = append(ret, tr.Tracked(it, recoveredPath)) + } + + log.Debugf("Restored %d iterators", len(ret)) + return ret, nil +} + +func (tr *Tracker) HaltAndDump() error { + tr.running = false + + // drain any pending iterators + close(tr.startChan) + for start := range tr.startChan { + tr.started[start] = struct{}{} + } + close(tr.stopChan) + for stop := range tr.stopChan { + tr.stopped = append(tr.stopped, stop) + } + + for _, stop := range tr.stopped { + delete(tr.started, stop) + } + + if len(tr.started) == 0 { + // if the tracker state is empty, erase any existing recovery file + err := os.Remove(tr.recoveryFile) + if os.IsNotExist(err) { + err = nil + } + return err + } + + return tr.dump() +} + +func (it *Iterator) Next(descend bool) bool { + ret := it.NodeIterator.Next(descend) + + if !ret { + if it.tracker.running { + it.tracker.stopChan <- it + } else { + log.Errorf("iterator stopped after tracker halted: path=%x", it.Path()) + } + } + return ret +} + +// Subtracts 1 from the last byte in a path slice, carrying if needed. +// Does nothing, returning false, for all-zero inputs. +func decrementPath(path []byte) bool { + // check for all zeros + allzero := true + for i := 0; i < len(path); i++ { + allzero = allzero && path[i] == 0 + } + if allzero { + return false + } + for i := len(path) - 1; i >= 0; i-- { + val := path[i] + path[i]-- + if val == 0 { + path[i] = 0xf + } else { + return true + } + } + return true +}