ipld-eth-state-snapshot/pkg/snapshot/tracker.go

278 lines
6.6 KiB
Go
Raw Permalink Normal View History

package snapshot
import (
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
"bytes"
"context"
"encoding/csv"
"fmt"
"os"
2022-03-11 14:28:32 +00:00
"os/signal"
"sync"
"sync/atomic"
2022-03-11 14:28:32 +00:00
"syscall"
"github.com/cerc-io/ipld-eth-state-snapshot/pkg/prom"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/trie"
iter "github.com/ethereum/go-ethereum/trie/concurrent_iterator"
2022-03-11 14:28:32 +00:00
log "github.com/sirupsen/logrus"
)
var trackedIterCount int32
type trackedIter struct {
id int32
mu sync.Mutex
done atomic.Bool
trie.NodeIterator
tracker *iteratorTracker
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
seekedPath []byte // latest full node path seeked from the tracked iterator
startPath []byte // startPath for the tracked iterator
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
endPath []byte // endPath for the tracked iterator
lastPath []byte // latest it.Path() (not the full node path) seeked
}
func (it *trackedIter) getLastPath() []byte {
it.mu.Lock()
defer it.mu.Unlock()
return it.lastPath
}
func (it *trackedIter) setLastPath(val []byte) {
it.mu.Lock()
defer it.mu.Unlock()
it.lastPath = val
}
func (it *trackedIter) Next(descend bool) bool {
ret := it.NodeIterator.Next(descend)
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
if !ret {
if it.tracker.running {
it.tracker.stopChan <- it
} else {
log.Errorf("iterator stopped after tracker halted: path=%x", it.Path())
}
it.done.Store(true)
} else {
it.setLastPath(it.Path())
}
return ret
}
type iteratorTracker struct {
2022-03-11 14:28:32 +00:00
recoveryFile string
startChan chan *trackedIter
stopChan chan *trackedIter
started map[*trackedIter]struct{}
stopped []*trackedIter
running bool
}
2022-03-11 14:28:32 +00:00
func newTracker(file string, buf int) iteratorTracker {
return iteratorTracker{
2022-03-11 14:28:32 +00:00
recoveryFile: file,
startChan: make(chan *trackedIter, buf),
stopChan: make(chan *trackedIter, buf),
started: map[*trackedIter]struct{}{},
running: true,
}
}
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
func (tr *iteratorTracker) captureSignal(cancelCtx context.CancelFunc) {
2022-03-11 14:28:32 +00:00
sigChan := make(chan os.Signal, 1)
2022-03-16 10:48:51 +00:00
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
2022-03-11 14:28:32 +00:00
go func() {
sig := <-sigChan
log.Errorf("Signal received (%v), stopping", sig)
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
// cancel context on receiving a signal
// on ctx cancellation, all the iterators complete processing of their current node before stopping
cancelCtx()
2022-03-11 14:28:32 +00:00
}()
}
// Wraps an iterator in a trackedIter. This should not be called once halts are possible.
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
func (tr *iteratorTracker) tracked(it trie.NodeIterator, recoveredPath []byte) (ret *trackedIter) {
// create seeked path of max capacity (65)
iterSeekedPath := make([]byte, 0, 65)
// intially populate seeked path with the recovered path
// to be used in trie traversal
if recoveredPath != nil {
iterSeekedPath = append(iterSeekedPath, recoveredPath...)
}
// if the iterator being tracked is a PrefixBoundIterator, capture it's end path
// to be used in trie traversal
var endPath []byte
var startPath []byte
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
if boundedIter, ok := it.(*iter.PrefixBoundIterator); ok {
startPath = boundedIter.StartPath
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
endPath = boundedIter.EndPath
}
ret = &trackedIter{
atomic.AddInt32(&trackedIterCount, 1),
sync.Mutex{},
atomic.Bool{},
it,
tr,
iterSeekedPath,
startPath,
endPath,
nil,
}
tr.startChan <- ret
if prom.Enabled() {
pathDepth := max(max(len(startPath), len(endPath)), 1)
totalSteps := estimateSteps(startPath, endPath, pathDepth)
prom.RegisterGaugeFunc(
fmt.Sprintf("tracked_iterator_%d", ret.id),
func() float64 {
if ret.done.Load() {
return 100.0
}
lastPath := ret.getLastPath()
if nil == lastPath {
return 0.0
}
remainingSteps := estimateSteps(lastPath, endPath, pathDepth)
return (float64(totalSteps) - float64(remainingSteps)) / float64(totalSteps) * 100.0
})
}
return
}
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
// explicitly stops an iterator
func (tr *iteratorTracker) stopIter(it *trackedIter) {
tr.stopChan <- it
}
// dumps iterator path and bounds to a text file so it can be restored later
2022-03-11 14:28:32 +00:00
func (tr *iteratorTracker) dump() error {
2022-06-08 12:08:17 +00:00
log.Debug("Dumping recovery state to: ", tr.recoveryFile)
var rows [][]string
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
for it := range tr.started {
var startPath []byte
var endPath []byte
if impl, ok := it.NodeIterator.(*iter.PrefixBoundIterator); ok {
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
// if the iterator being tracked is a PrefixBoundIterator,
// initialize start and end paths with its bounds
startPath = impl.StartPath
endPath = impl.EndPath
}
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
// if seeked path and iterator path are non-empty, use iterator's path as startpath
if !bytes.Equal(it.seekedPath, []byte{}) && !bytes.Equal(it.Path(), []byte{}) {
startPath = it.Path()
}
rows = append(rows, []string{
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
fmt.Sprintf("%x", startPath),
fmt.Sprintf("%x", endPath),
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
fmt.Sprintf("%x", it.seekedPath),
})
}
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
2022-03-11 14:28:32 +00:00
file, err := os.Create(tr.recoveryFile)
if err != nil {
return err
}
defer file.Close()
out := csv.NewWriter(file)
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
return out.WriteAll(rows)
}
// attempts to read iterator state from file
// if file doesn't exist, returns an empty slice with no error
2022-03-11 14:28:32 +00:00
func (tr *iteratorTracker) restore(tree state.Trie) ([]trie.NodeIterator, error) {
file, err := os.Open(tr.recoveryFile)
if err != nil {
if os.IsNotExist(err) {
return nil, nil
}
return nil, err
}
2022-06-08 12:08:17 +00:00
log.Debug("Restoring recovery state from: ", tr.recoveryFile)
defer file.Close()
in := csv.NewReader(file)
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
in.FieldsPerRecord = 3
rows, err := in.ReadAll()
if err != nil {
return nil, err
}
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
var ret []trie.NodeIterator
for _, row := range rows {
// pick up where each interval left off
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
var startPath []byte
var endPath []byte
var recoveredPath []byte
if len(row[0]) != 0 {
if _, err = fmt.Sscanf(row[0], "%x", &startPath); err != nil {
return nil, err
}
}
if len(row[1]) != 0 {
if _, err = fmt.Sscanf(row[1], "%x", &endPath); err != nil {
return nil, err
}
}
if len(row[2]) != 0 {
if _, err = fmt.Sscanf(row[2], "%x", &recoveredPath); err != nil {
return nil, err
}
}
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
// force the lower bound path to an even length
// (required by HexToKeyBytes())
if len(startPath)&0b1 == 1 {
// decrement first to avoid skipped nodes
decrementPath(startPath)
startPath = append(startPath, 0)
2022-05-31 15:34:02 +00:00
}
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
it := iter.NewPrefixBoundIterator(tree.NodeIterator(iter.HexToKeyBytes(startPath)), startPath, endPath)
ret = append(ret, tr.tracked(it, recoveredPath))
}
return ret, nil
}
2022-03-11 14:28:32 +00:00
func (tr *iteratorTracker) haltAndDump() error {
tr.running = false
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
// drain any pending iterators
close(tr.startChan)
for start := range tr.startChan {
tr.started[start] = struct{}{}
}
close(tr.stopChan)
for stop := range tr.stopChan {
tr.stopped = append(tr.stopped, stop)
}
for _, stop := range tr.stopped {
delete(tr.started, stop)
}
if len(tr.started) == 0 {
// if the tracker state is empty, erase any existing recovery file
2022-03-11 14:28:32 +00:00
err := os.Remove(tr.recoveryFile)
if os.IsNotExist(err) {
err = nil
}
return err
}
Account selective snapshot (#46) * snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided * config and env updates * cmd update * Encode watched address path bytes to hex for comparison * actually ignore the subtries that are not along the paths of interest * Fixes for account selective snapshot * Use non-concurrent iterator when having a single worker * Only index root node when starting path of an iterator is nil * Upgrade deps * Avoid tracking iterators and skip recovery test * Fix recovery mechanism, use sync Map instead of buffered channels * Add test for account selective snapshot * Continue traversal with concurrent iterators with starting path nil * Use errgroup to simplify error handling with concurrent iterators * Check if all the nodes are indexed in the recovery test * Use concurrency safe sync Map in account selective snapshot test * Only track concurrent iterators and refactor code * Fix node and recovered path comparison * Revert back to using buffered channels for tracking iterators * Add a metric to monitor number of active iterators * Update docs * Update seeked path after node is processed * Return error on context cancellation from subtrie iteration * Add tests for account selective snapshot recovery * Explicity enforce concurrent iterator bounds to avoid duplicate nodes * Update full snapshot test to check nodes being indexed * Refactor code to simplify snapshot logic * Remove unnecessary function argument * Use ctx cancellation for handling signals * Add descriptive comments Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
2022-08-03 11:35:04 +00:00
2022-03-11 14:28:32 +00:00
return tr.dump()
}