per-iterator prom metrics

This commit is contained in:
Roy Crihfield 2023-09-25 18:37:00 +08:00
parent 5b0cc124a5
commit e6c103bfb6

View File

@ -19,9 +19,12 @@ import (
"context" "context"
"fmt" "fmt"
"math/big" "math/big"
"os"
"os/signal"
"sync" "sync"
"syscall"
"github.com/cerc-io/eth-iterator-utils/tracker" "github.com/cerc-io/ipld-eth-state-snapshot/pkg/prom"
statediff "github.com/cerc-io/plugeth-statediff" statediff "github.com/cerc-io/plugeth-statediff"
"github.com/cerc-io/plugeth-statediff/adapt" "github.com/cerc-io/plugeth-statediff/adapt"
"github.com/cerc-io/plugeth-statediff/indexer" "github.com/cerc-io/plugeth-statediff/indexer"
@ -90,11 +93,14 @@ func (s *Service) CreateSnapshot(params SnapshotParams) error {
if header == nil { if header == nil {
return fmt.Errorf("unable to read canonical header at height %d", params.Height) return fmt.Errorf("unable to read canonical header at height %d", params.Height)
} }
log.Info("Creating snapshot", "height", params.Height, "hash", hash) log.WithField("height", params.Height).WithField("hash", hash).Info("Creating snapshot")
// Context for snapshot work // Context for snapshot work
ctx, cancelCtx := context.WithCancel(context.Background()) ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx() defer cancelCtx()
// Cancel context on receiving a signal. On cancellation, all tracked iterators complete
// processing of their current node before stopping.
captureSignal(cancelCtx)
var err error var err error
tx := s.indexer.BeginTx(header.Number, ctx) tx := s.indexer.BeginTx(header.Number, ctx)
@ -106,11 +112,9 @@ func (s *Service) CreateSnapshot(params SnapshotParams) error {
return err return err
} }
tracker := tracker.New(s.recoveryFile, params.Workers) tr := prom.NewTracker(s.recoveryFile, params.Workers)
tracker.CaptureSignal(cancelCtx)
defer func() { defer func() {
err := tracker.HaltAndDump() err := tr.CloseAndSave()
if err != nil { if err != nil {
log.Errorf("failed to write recovery file: %v", err) log.Errorf("failed to write recovery file: %v", err)
} }
@ -128,7 +132,7 @@ func (s *Service) CreateSnapshot(params SnapshotParams) error {
return s.indexer.PushIPLD(tx, c) return s.indexer.PushIPLD(tx, c)
} }
// Build a diff compared against the zero hash to get a full snapshot // Build a diff against the zero hash (empty trie) to get a full snapshot
sdargs := statediff.Args{ sdargs := statediff.Args{
NewStateRoot: header.Root, NewStateRoot: header.Root,
BlockHash: header.Hash(), BlockHash: header.Hash(),
@ -140,7 +144,7 @@ func (s *Service) CreateSnapshot(params SnapshotParams) error {
sdparams.ComputeWatchedAddressesLeafPaths() sdparams.ComputeWatchedAddressesLeafPaths()
builder := statediff.NewBuilder(adapt.GethStateView(s.stateDB)) builder := statediff.NewBuilder(adapt.GethStateView(s.stateDB))
builder.SetSubtrieWorkers(params.Workers) builder.SetSubtrieWorkers(params.Workers)
if err = builder.WriteStateDiffTracked(sdargs, sdparams, nodeSink, ipldSink, &tracker); err != nil { if err = builder.WriteStateDiffTracked(sdargs, sdparams, nodeSink, ipldSink, tr); err != nil {
return err return err
} }
@ -160,3 +164,14 @@ func (s *Service) CreateLatestSnapshot(workers uint, watchedAddresses []common.A
} }
return s.CreateSnapshot(SnapshotParams{Height: *height, Workers: workers, WatchedAddresses: watchedAddresses}) return s.CreateSnapshot(SnapshotParams{Height: *height, Workers: workers, WatchedAddresses: watchedAddresses})
} }
func captureSignal(cb func()) {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigChan
log.Errorf("Signal received (%v), stopping", sig)
cb()
}()
}