eth-iterator-utils/tracker/tracker.go

247 lines
6.0 KiB
Go

package tracker
import (
"encoding/csv"
"fmt"
"os"
"sync"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/trie"
iter "github.com/cerc-io/eth-iterator-utils"
)
type Tracker interface {
Restore(iter.IteratorConstructor) ([]trie.NodeIterator, error)
Tracked(trie.NodeIterator) trie.NodeIterator
CloseAndSave() error
}
// Compile-time check that *trackerAdaptor satisfies Tracker.
var _ Tracker = (*trackerAdaptor)(nil)

// trackerAdaptor embeds a *TrackerImpl and re-exposes its iterator-returning
// methods in terms of trie.NodeIterator instead of the concrete *Iterator.
type trackerAdaptor struct {
	*TrackerImpl
}
// New returns a Tracker-compatible adaptor that persists its state to file.
// bufsize is the capacity of the internal start and stop channels. Senders on
// those channels block once the buffer is full, and within this file they are
// only drained by CloseAndSave. A bufsize smaller than the expected number of
// concurrent iterators can therefore deadlock.
func New(file string, bufsize uint) *trackerAdaptor {
	impl := NewImpl(file, bufsize)
	return &trackerAdaptor{TrackerImpl: impl}
}
// Restore delegates to TrackerImpl.Restore and converts the restored
// *Iterator values to trie.NodeIterator. It returns a nil slice when nothing
// was restored.
func (tr *trackerAdaptor) Restore(makeIterator iter.IteratorConstructor) ([]trie.NodeIterator, error) {
	restored, err := tr.TrackerImpl.Restore(makeIterator)
	if err != nil {
		return nil, err
	}
	if len(restored) == 0 {
		return nil, nil
	}
	out := make([]trie.NodeIterator, len(restored))
	for i := range restored {
		out[i] = restored[i]
	}
	return out, nil
}
// Tracked wraps it through the underlying TrackerImpl and returns the result as
// a trie.NodeIterator. *Iterator embeds trie.NodeIterator, so it satisfies the
// interface statically. The direct return is therefore checked by the compiler
// and needs no runtime type assertion.
func (tr *trackerAdaptor) Tracked(it trie.NodeIterator) trie.NodeIterator {
	return tr.TrackerImpl.Tracked(it)
}
// NewImpl constructs a running TrackerImpl that saves to file. Its start and
// stop channels are buffered with capacity bufsize.
func NewImpl(file string, bufsize uint) *TrackerImpl {
	tr := &TrackerImpl{
		recoveryFile: file,
		started:      make(map[*Iterator]struct{}),
		running:      true,
	}
	tr.startChan = make(chan *Iterator, bufsize)
	tr.stopChan = make(chan *Iterator, bufsize)
	return tr
}
// TrackerImpl records which iterators were started and which have stopped,
// and on CloseAndSave writes the unfinished ones to recoveryFile.
type TrackerImpl struct {
	recoveryFile string // CSV file written by Save and read by Restore

	// startChan receives every iterator wrapped by Tracked. stopChan receives
	// iterators whose Next returned false or that were passed to StopIterator.
	// Both are buffered and are drained by CloseAndSave.
	startChan, stopChan chan *Iterator

	started map[*Iterator]struct{} // filled from startChan by CloseAndSave
	stopped []*Iterator            // filled from stopChan by CloseAndSave
	running bool                   // set to false when CloseAndSave begins

	sync.RWMutex // guards running and the closing of stopChan
}
// Iterator wraps a trie.NodeIterator and notifies its owning tracker when
// iteration finishes (see Next).
type Iterator struct {
trie.NodeIterator
tracker *TrackerImpl // tracker notified when this iterator stops
}
// Tracked wraps it in an *Iterator bound to this tracker and registers it on
// the start channel. The send blocks when the channel buffer is full. It panics
// once CloseAndSave has closed the channel, so Tracked must not be called once
// the tracker may be halted.
func (tr *TrackerImpl) Tracked(it trie.NodeIterator) *Iterator {
	wrapped := &Iterator{NodeIterator: it, tracker: tr}
	tr.startChan <- wrapped
	return wrapped
}
// StopIterator explicitly marks it as stopped by queueing it on the stop
// channel. Once the tracker has been closed, it does nothing.
func (tr *TrackerImpl) StopIterator(it *Iterator) {
	tr.RLock()
	defer tr.RUnlock()
	if !tr.running {
		return
	}
	tr.stopChan <- it
}
// Save writes one CSV row per started iterator to the recovery file so that
// Restore can rebuild it later. Each row holds the hex-encoded current path and
// the hex-encoded end bound. If no iterators are recorded, any existing
// recovery file is removed instead.
//
// Save reads tr.started without locking. Within this file it is only called by
// CloseAndSave, after the tracker's channels have been drained.
func (tr *TrackerImpl) Save() error {
	log.Debug("Saving recovery state", "to", tr.recoveryFile)
	// if the tracker state is empty, erase any existing recovery file
	if len(tr.started) == 0 {
		return tr.removeRecoveryFile()
	}
	rows := make([][]string, 0, len(tr.started))
	for it := range tr.started {
		_, endPath := it.Bounds()
		rows = append(rows, []string{
			fmt.Sprintf("%x", it.Path()),
			fmt.Sprintf("%x", endPath),
		})
	}
	file, err := os.Create(tr.recoveryFile)
	if err != nil {
		return err
	}
	if err := csv.NewWriter(file).WriteAll(rows); err != nil {
		file.Close() // the write error is the one worth reporting
		return err
	}
	// Close may surface write failures that were deferred until now. Ignoring
	// it could leave a truncated recovery file and silently lose progress.
	return file.Close()
}
// removeRecoveryFile deletes the recovery file. A file that is already
// missing counts as success.
func (tr *TrackerImpl) removeRecoveryFile() error {
	if err := os.Remove(tr.recoveryFile); err != nil && !os.IsNotExist(err) {
		return err
	}
	return nil
}
// Restore rebuilds iterators from the recovery file written by Save.
//
// If the file does not exist, it returns (nil, nil). Each CSV row must have
// exactly two fields: the hex-encoded path where an iterator stopped, and the
// hex-encoded end bound (either may be empty). For each row, a new iterator is
// built with makeIterator at the recovered path and bounded by the end path.
// An odd-length path is first rewound to an even length. The iterator is then
// registered via Tracked. Restored iterators are constructed in the same order
// they appear in the returned slice.
//
// After all rows are processed, the recovery file is removed. Any removal
// error is returned alongside the iterators.
//
// NOTE: Tracked blocks once the start channel buffer (bufsize) is full, so a
// file with more rows than bufsize will block here.
func (tr *TrackerImpl) Restore(makeIterator iter.IteratorConstructor) ([]*Iterator, error) {
	file, err := os.Open(tr.recoveryFile)
	if err != nil {
		if os.IsNotExist(err) {
			return nil, nil
		}
		return nil, err
	}
	log.Debug("Restoring recovery state", "from", tr.recoveryFile)
	in := csv.NewReader(file)
	in.FieldsPerRecord = 2
	rows, err := in.ReadAll()
	// Close now rather than deferring: the file is removed at the end of this
	// function, and some platforms (notably Windows) refuse to delete a file
	// that is still open. The file was only read, so a close error loses no data.
	file.Close()
	if err != nil {
		return nil, err
	}
	var ret []*Iterator
	for _, row := range rows {
		// pick up where each recovered iterator left off
		var recoveredPath []byte
		var endPath []byte
		if len(row[0]) != 0 {
			if _, err = fmt.Sscanf(row[0], "%x", &recoveredPath); err != nil {
				return nil, err
			}
		}
		if len(row[1]) != 0 {
			if _, err = fmt.Sscanf(row[1], "%x", &endPath); err != nil {
				return nil, err
			}
		}
		// force the lower bound path to an even length (required by geth API/HexToKeyBytes)
		if len(recoveredPath)&1 == 1 {
			// to avoid skipped nodes, we must rewind by one index
			recoveredPath = rewindPath(recoveredPath)
		}
		it := makeIterator(iter.HexToKeyBytes(recoveredPath))
		boundIt := iter.NewPrefixBoundIterator(it, endPath)
		ret = append(ret, tr.Tracked(boundIt))
	}
	return ret, tr.removeRecoveryFile()
}
// CloseAndSave halts the tracker and persists its state via Save.
//
// Under the write lock, it marks the tracker as not running and closes the stop
// channel. Next and StopIterator only send while holding the read lock and
// seeing running == true, so they can never send on the closed channel.
//
// It then closes and drains the start channel, recording every tracked iterator
// as started. Next it drains the stop channel and removes finished or
// explicitly stopped iterators, leaving only unfinished ones for Save to write.
//
// After this call, Tracked must not be used, because it would send on the
// closed start channel. A second call returns an error rather than panicking
// on a double close.
func (tr *TrackerImpl) CloseAndSave() error {
	tr.Lock()
	if !tr.running {
		tr.Unlock()
		return fmt.Errorf("tracker already closed")
	}
	tr.running = false
	close(tr.stopChan)
	tr.Unlock()

	// drain any pending iterators
	close(tr.startChan)
	for start := range tr.startChan {
		tr.started[start] = struct{}{}
	}
	for stop := range tr.stopChan {
		tr.stopped = append(tr.stopped, stop)
	}
	for _, stop := range tr.stopped {
		delete(tr.started, stop)
	}
	return tr.Save()
}
// Next advances the wrapped iterator. When it reports exhaustion (false), the
// iterator is queued on the tracker's stop channel if the tracker is still
// running; otherwise an error is logged. Every false-returning call queues the
// iterator again, so repeated calls after exhaustion may eventually block once
// the channel buffer fills.
func (it *Iterator) Next(descend bool) bool {
	if it.NodeIterator.Next(descend) {
		return true
	}
	tracker := it.tracker
	tracker.RLock()
	defer tracker.RUnlock()
	if !tracker.running {
		log.Error("Iterator stopped after tracker halted", "path", it.Path())
		return false
	}
	tracker.stopChan <- it
	return false
}
// Bounds returns the bounds reported by the wrapped iterator when it is an
// *iter.PrefixBoundIterator. Save uses the second value as the end path. For
// any other wrapped iterator, it returns (nil, nil).
func (it *Iterator) Bounds() ([]byte, []byte) {
	switch bounded := it.NodeIterator.(type) {
	case *iter.PrefixBoundIterator:
		return bounded.Bounds()
	default:
		return nil, nil
	}
}
// rewindPath returns the path of the previous node in pre-order. Paths are
// sequences of hex nibbles, as later passed to iter.HexToKeyBytes in Restore.
//   - An empty path is returned unchanged.
//   - If the last nibble is zero, it is popped (e.g. [1 0] => [1]). The result
//     is a subslice sharing path's backing array.
//   - Otherwise, the last nibble is decremented and the result is padded with
//     0xf up to 64 nibbles (e.g. [1] => [0 f f f ...]). Paths longer than 64
//     nibbles are truncated to their first 64, so the decremented nibble falls
//     outside the result.
//
// The passed slice is never modified.
func rewindPath(path []byte) []byte {
	if len(path) == 0 {
		return path
	}
	last := len(path) - 1
	if path[last] == 0 {
		return path[:last]
	}
	// Decrement in a fresh buffer so the caller's slice is left intact.
	padded := make([]byte, 64)
	n := copy(padded, path)
	if last < len(padded) {
		padded[last]--
	}
	for i := n; i < len(padded); i++ {
		padded[i] = 0xf
	}
	return padded
}