From 2481d0a28d8c303b06bcce8f5dbb1a3d567abc5b Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Fri, 11 Mar 2022 22:28:32 +0800 Subject: [PATCH] recovery - handle signals --- pkg/snapshot/service.go | 7 ++++--- pkg/snapshot/tracker.go | 45 +++++++++++++++++++++++++++++------------ 2 files changed, 36 insertions(+), 16 deletions(-) diff --git a/pkg/snapshot/service.go b/pkg/snapshot/service.go index 4efe153..8978dd0 100644 --- a/pkg/snapshot/service.go +++ b/pkg/snapshot/service.go @@ -98,12 +98,13 @@ func (s *Service) CreateSnapshot(params SnapshotParams) error { return err } headerID := header.Hash().String() - s.tracker = newTracker(int(params.Workers)) + s.tracker = newTracker(s.recoveryFile, int(params.Workers)) go s.tracker.run() + go s.tracker.captureSignal() var iters []trie.NodeIterator // attempt to restore from recovery file if it exists - iters, err = s.tracker.restore(tree, s.recoveryFile) + iters, err = s.tracker.restore(tree) if err != nil { return err } @@ -126,7 +127,7 @@ func (s *Service) CreateSnapshot(params SnapshotParams) error { } defer func() { - err := s.tracker.haltAndDump(s.recoveryFile) + err := s.tracker.haltAndDump() if err != nil { log.Error("failed to write recovery file: ", err) } diff --git a/pkg/snapshot/tracker.go b/pkg/snapshot/tracker.go index 772da3c..4ff7634 100644 --- a/pkg/snapshot/tracker.go +++ b/pkg/snapshot/tracker.go @@ -4,9 +4,12 @@ import ( "encoding/csv" "fmt" "os" + "os/signal" + "syscall" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/trie" + log "github.com/sirupsen/logrus" iter "github.com/vulcanize/go-eth-state-node-iterator" ) @@ -25,6 +28,8 @@ func (it *trackedIter) Next(descend bool) bool { } type iteratorTracker struct { + recoveryFile string + startChan chan *trackedIter stopChan chan *trackedIter started map[*trackedIter]struct{} @@ -34,16 +39,29 @@ type iteratorTracker struct { done chan struct{} } -func newTracker(buf int) iteratorTracker { +func newTracker(file string, buf int) iteratorTracker { return iteratorTracker{ - startChan: make(chan *trackedIter, buf), - stopChan: make(chan *trackedIter, buf), - started: map[*trackedIter]struct{}{}, - haltChan: make(chan struct{}), - done: make(chan struct{}), + recoveryFile: file, + startChan: make(chan *trackedIter, buf), + stopChan: make(chan *trackedIter, buf), + started: map[*trackedIter]struct{}{}, + haltChan: make(chan struct{}), + done: make(chan struct{}), } } +func (tr *iteratorTracker) captureSignal() { + sigChan := make(chan os.Signal, 1) + + signal.Notify(sigChan, syscall.SIGINT) + go func() { + sig := <-sigChan + log.Errorf("Signal received (%v), stopping", sig) + tr.haltAndDump() + os.Exit(1) + }() +} + // listens for starts/stops and manages current state func (tr *iteratorTracker) run() { loop: @@ -68,7 +86,8 @@ func (tr *iteratorTracker) tracked(it trie.NodeIterator) (ret *trackedIter) { } // dumps iterator path and bounds to a text file so it can be restored later -func (tr *iteratorTracker) dump(path string) error { +func (tr *iteratorTracker) dump() error { + log.Info("Dumping recovery state to:", tr.recoveryFile) var rows [][]string for it, _ := range tr.started { var endPath []byte @@ -80,7 +99,7 @@ func (tr *iteratorTracker) dump(path string) error { fmt.Sprintf("%x", endPath), }) } - file, err := os.Create(path) + file, err := os.Create(tr.recoveryFile) if err != nil { return err } @@ -91,8 +110,8 @@ func (tr *iteratorTracker) dump(path string) error { // attempts to read iterator state from file // if file doesn't exist, returns an empty slice with no error -func (tr *iteratorTracker) restore(tree state.Trie, path string) ([]trie.NodeIterator, error) { - file, err := os.Open(path) +func (tr *iteratorTracker) restore(tree state.Trie) ([]trie.NodeIterator, error) { + file, err := os.Open(tr.recoveryFile) if err != nil { if os.IsNotExist(err) { return nil, nil @@ -124,7 +143,7 @@ func (tr *iteratorTracker) restore(tree state.Trie, path string) ([]trie.NodeIte return ret, nil } -func (tr *iteratorTracker) haltAndDump(path string) error { +func (tr *iteratorTracker) haltAndDump() error { tr.haltChan <- struct{}{} <-tr.done @@ -144,11 +163,11 @@ func (tr *iteratorTracker) haltAndDump(path string) error { if len(tr.started) == 0 { // if the tracker state is empty, erase any existing recovery file - err := os.Remove(path) + err := os.Remove(tr.recoveryFile) if os.IsNotExist(err) { err = nil } return err } - return tr.dump(path) + return tr.dump() }