recovery - handle signals

This commit is contained in:
Roy Crihfield 2022-03-11 22:28:32 +08:00
parent 02488e2b79
commit 2481d0a28d
2 changed files with 36 additions and 16 deletions

View File

@ -98,12 +98,13 @@ func (s *Service) CreateSnapshot(params SnapshotParams) error {
return err return err
} }
headerID := header.Hash().String() headerID := header.Hash().String()
s.tracker = newTracker(int(params.Workers)) s.tracker = newTracker(s.recoveryFile, int(params.Workers))
go s.tracker.run() go s.tracker.run()
go s.tracker.captureSignal()
var iters []trie.NodeIterator var iters []trie.NodeIterator
// attempt to restore from recovery file if it exists // attempt to restore from recovery file if it exists
iters, err = s.tracker.restore(tree, s.recoveryFile) iters, err = s.tracker.restore(tree)
if err != nil { if err != nil {
return err return err
} }
@ -126,7 +127,7 @@ func (s *Service) CreateSnapshot(params SnapshotParams) error {
} }
defer func() { defer func() {
err := s.tracker.haltAndDump(s.recoveryFile) err := s.tracker.haltAndDump()
if err != nil { if err != nil {
log.Error("failed to write recovery file: ", err) log.Error("failed to write recovery file: ", err)
} }

View File

@ -4,9 +4,12 @@ import (
"encoding/csv" "encoding/csv"
"fmt" "fmt"
"os" "os"
"os/signal"
"syscall"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
log "github.com/sirupsen/logrus"
iter "github.com/vulcanize/go-eth-state-node-iterator" iter "github.com/vulcanize/go-eth-state-node-iterator"
) )
@ -25,6 +28,8 @@ func (it *trackedIter) Next(descend bool) bool {
} }
type iteratorTracker struct { type iteratorTracker struct {
recoveryFile string
startChan chan *trackedIter startChan chan *trackedIter
stopChan chan *trackedIter stopChan chan *trackedIter
started map[*trackedIter]struct{} started map[*trackedIter]struct{}
@ -34,8 +39,9 @@ type iteratorTracker struct {
done chan struct{} done chan struct{}
} }
func newTracker(buf int) iteratorTracker { func newTracker(file string, buf int) iteratorTracker {
return iteratorTracker{ return iteratorTracker{
recoveryFile: file,
startChan: make(chan *trackedIter, buf), startChan: make(chan *trackedIter, buf),
stopChan: make(chan *trackedIter, buf), stopChan: make(chan *trackedIter, buf),
started: map[*trackedIter]struct{}{}, started: map[*trackedIter]struct{}{},
@ -44,6 +50,18 @@ func newTracker(buf int) iteratorTracker {
} }
} }
func (tr *iteratorTracker) captureSignal() {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT)
go func() {
sig := <-sigChan
log.Errorf("Signal received (%v), stopping", sig)
tr.haltAndDump()
os.Exit(1)
}()
}
// listens for starts/stops and manages current state // listens for starts/stops and manages current state
func (tr *iteratorTracker) run() { func (tr *iteratorTracker) run() {
loop: loop:
@ -68,7 +86,8 @@ func (tr *iteratorTracker) tracked(it trie.NodeIterator) (ret *trackedIter) {
} }
// dumps iterator path and bounds to a text file so it can be restored later // dumps iterator path and bounds to a text file so it can be restored later
func (tr *iteratorTracker) dump(path string) error { func (tr *iteratorTracker) dump() error {
log.Info("Dumping recovery state to:", tr.recoveryFile)
var rows [][]string var rows [][]string
for it, _ := range tr.started { for it, _ := range tr.started {
var endPath []byte var endPath []byte
@ -80,7 +99,7 @@ func (tr *iteratorTracker) dump(path string) error {
fmt.Sprintf("%x", endPath), fmt.Sprintf("%x", endPath),
}) })
} }
file, err := os.Create(path) file, err := os.Create(tr.recoveryFile)
if err != nil { if err != nil {
return err return err
} }
@ -91,8 +110,8 @@ func (tr *iteratorTracker) dump(path string) error {
// attempts to read iterator state from file // attempts to read iterator state from file
// if file doesn't exist, returns an empty slice with no error // if file doesn't exist, returns an empty slice with no error
func (tr *iteratorTracker) restore(tree state.Trie, path string) ([]trie.NodeIterator, error) { func (tr *iteratorTracker) restore(tree state.Trie) ([]trie.NodeIterator, error) {
file, err := os.Open(path) file, err := os.Open(tr.recoveryFile)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
return nil, nil return nil, nil
@ -124,7 +143,7 @@ func (tr *iteratorTracker) restore(tree state.Trie, path string) ([]trie.NodeIte
return ret, nil return ret, nil
} }
func (tr *iteratorTracker) haltAndDump(path string) error { func (tr *iteratorTracker) haltAndDump() error {
tr.haltChan <- struct{}{} tr.haltChan <- struct{}{}
<-tr.done <-tr.done
@ -144,11 +163,11 @@ func (tr *iteratorTracker) haltAndDump(path string) error {
if len(tr.started) == 0 { if len(tr.started) == 0 {
// if the tracker state is empty, erase any existing recovery file // if the tracker state is empty, erase any existing recovery file
err := os.Remove(path) err := os.Remove(tr.recoveryFile)
if os.IsNotExist(err) { if os.IsNotExist(err) {
err = nil err = nil
} }
return err return err
} }
return tr.dump(path) return tr.dump()
} }