Make it possible to toggle off BeaconState or BeaconBlock processing selectively.

This commit is contained in:
Thomas E Lackey 2022-09-15 16:17:59 -05:00
parent e42da813bd
commit a0f5ec8d03
8 changed files with 155 additions and 79 deletions

View File

@ -52,6 +52,8 @@ type BeaconClient struct {
UniqueNodeIdentifier int // The unique identifier within the cluster of this individual node.
KnownGapsProcess KnownGapsProcessing // object keeping track of knowngaps processing
CheckDb bool // Should we check the DB to see if the slot exists before processing it?
PerformBeaconStateProcessing bool // Should we process BeaconStates?
ProcessBeaconBlockProcessing bool // Should we process BeaconBlocks?
// Used for Head Tracking
@ -110,6 +112,8 @@ func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddres
Metrics: metrics,
UniqueNodeIdentifier: uniqueNodeIdentifier,
CheckDb: checkDb,
ProcessBeaconBlockProcessing: true,
PerformBeaconStateProcessing: true,
//FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint),
}, nil
}

View File

@ -22,7 +22,6 @@ import (
"fmt"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/loghelper"
"golang.org/x/sync/errgroup"
)
@ -31,7 +30,7 @@ import (
func (bc *BeaconClient) CaptureHistoric(ctx context.Context, maxWorkers int) []error {
log.Info("We are starting the historical processing service.")
bc.HistoricalProcess = HistoricProcessing{db: bc.Db, metrics: bc.Metrics, uniqueNodeIdentifier: bc.UniqueNodeIdentifier}
errs := handleBatchProcess(ctx, maxWorkers, bc.HistoricalProcess, bc.HistoricalProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb, bc.Metrics.IncrementHistoricSlotProcessed)
errs := handleBatchProcess(ctx, maxWorkers, bc.HistoricalProcess, bc.SlotProcessingDetails(), bc.Metrics.IncrementHistoricSlotProcessed)
log.Debug("Exiting Historical")
return errs
}
@ -91,7 +90,7 @@ type batchHistoricError struct {
// 4. Remove the slot entry from the DB.
//
// 5. Handle any errors.
func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics, checkDb bool, incrementTracker func(uint64)) []error {
func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, spd SlotProcessingDetails, incrementTracker func(uint64)) []error {
slotsCh := make(chan slotsToProcess)
workCh := make(chan int)
processedCh := make(chan slotsToProcess)
@ -108,7 +107,7 @@ func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing,
for w := 1; w <= maxWorkers; w++ {
log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting batch processing workers")
go processSlotRangeWorker(ctx, workCh, errCh, db, serverEndpoint, metrics, checkDb, incrementTracker)
go processSlotRangeWorker(ctx, workCh, errCh, spd, incrementTracker)
}
// Process all ranges and send each individual slot to the worker.

View File

@ -229,6 +229,11 @@ func (dw *DatabaseWriter) upsertSlots() error {
// Add the information for the signed_block to a transaction.
func (dw *DatabaseWriter) transactSignedBeaconBlocks() error {
if nil == dw.rawSignedBeaconBlock || len(*dw.rawSignedBeaconBlock) == 0 {
log.Warn("Skipping writing of empty BeaconBlock.")
return nil
}
err := dw.upsertPublicBlocks(dw.DbSignedBeaconBlock.MhKey, dw.rawSignedBeaconBlock)
if err != nil {
return err
@ -262,6 +267,11 @@ func (dw *DatabaseWriter) upsertSignedBeaconBlock() error {
// Add the information for the state to a transaction.
func (dw *DatabaseWriter) transactBeaconState() error {
if nil == dw.rawBeaconState || len(*dw.rawBeaconState) == 0 {
log.Warn("Skipping writing of empty BeaconState.")
return nil
}
err := dw.upsertPublicBlocks(dw.DbBeaconState.MhKey, dw.rawBeaconState)
if err != nil {
return err

View File

@ -31,7 +31,7 @@ var _ = Describe("Healthcheck", func() {
BeforeEach(func() {
var err error
Bc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10, bcUniqueIdentifier, false)
Bc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 8005, 10, bcUniqueIdentifier, false)
Expect(err).ToNot(HaveOccurred())
errBc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10, bcUniqueIdentifier, false)
Expect(err).ToNot(HaveOccurred())

View File

@ -66,7 +66,7 @@ func (bc *BeaconClient) handleHead() {
bc.StartingSlot = slot
}
go processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement, bc.CheckDb)
go processHeadSlot(slot, head.Block, head.State, bc.SlotProcessingDetails())
log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished calling processHeadSlot.")

View File

@ -97,14 +97,14 @@ func (hp HistoricProcessing) releaseDbLocks() error {
}
// Process the slot range.
func processSlotRangeWorker(ctx context.Context, workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics, checkDb bool, incrementTracker func(uint64)) {
func processSlotRangeWorker(ctx context.Context, workCh <-chan int, errCh chan<- batchHistoricError, spd SlotProcessingDetails, incrementTracker func(uint64)) {
for {
select {
case <-ctx.Done():
return
case slot := <-workCh:
log.Debug("Handling slot: ", slot)
err, errProcess := handleHistoricSlot(ctx, db, serverAddress, slot, metrics, checkDb)
err, errProcess := handleHistoricSlot(ctx, slot, spd)
if err != nil {
errMs := batchHistoricError{
err: err,

View File

@ -61,7 +61,7 @@ type KnownGapsProcessing struct {
func (bc *BeaconClient) ProcessKnownGaps(ctx context.Context, maxWorkers int) []error {
log.Info("We are starting the known gaps processing service.")
bc.KnownGapsProcess = KnownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics}
errs := handleBatchProcess(ctx, maxWorkers, bc.KnownGapsProcess, bc.KnownGapsProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb, bc.Metrics.IncrementKnownGapsProcessed)
errs := handleBatchProcess(ctx, maxWorkers, bc.KnownGapsProcess, bc.SlotProcessingDetails(), bc.Metrics.IncrementKnownGapsProcessed)
log.Debug("Exiting known gaps processing service")
return errs
}

View File

@ -35,6 +35,39 @@ import (
"golang.org/x/sync/errgroup"
)
type SlotProcessingDetails struct {
Context context.Context // A context generic context with multiple uses.
ServerEndpoint string // What is the endpoint of the beacon server.
Db sql.Database // Database object used for reads and writes.
Metrics *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics.
KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry.
CheckDb bool // Should we check the DB to see if the slot exists before processing it?
PerformBeaconStateProcessing bool // Should we process BeaconStates?
ProcessBeaconBlockProcessing bool // Should we process BeaconBlocks?
StartingSlot int // If we're performing head tracking. What is the first slot we processed.
PreviousSlot int // Whats the previous slot we processed
PreviousBlockRoot string // Whats the previous block root, used to check the next blocks parent.
}
func (bc *BeaconClient) SlotProcessingDetails() SlotProcessingDetails {
return SlotProcessingDetails{
Context: bc.Context,
ServerEndpoint: bc.ServerEndpoint,
Db: bc.Db,
Metrics: bc.Metrics,
CheckDb: bc.CheckDb,
ProcessBeaconBlockProcessing: bc.ProcessBeaconBlockProcessing,
PerformBeaconStateProcessing: bc.PerformBeaconStateProcessing,
KnownGapTableIncrement: bc.KnownGapTableIncrement,
StartingSlot: bc.StartingSlot,
PreviousSlot: bc.PreviousSlot,
PreviousBlockRoot: bc.PreviousBlockRoot,
}
}
type ProcessSlot struct {
// Generic
@ -51,10 +84,10 @@ type ProcessSlot struct {
// BeaconBlock
SszSignedBeaconBlock []byte // The entire SSZ encoded SignedBeaconBlock
FullSignedBeaconBlock SignedBeaconBlock // The unmarshaled BeaconState object, the unmarshalling could have errors.
FullSignedBeaconBlock *SignedBeaconBlock // The unmarshaled BeaconState object, the unmarshalling could have errors.
// BeaconState
FullBeaconState BeaconState // The unmarshaled BeaconState object, the unmarshalling could have errors.
FullBeaconState *BeaconState // The unmarshaled BeaconState object, the unmarshalling could have errors.
SszBeaconState []byte // The entire SSZ encoded BeaconState
// DB Write objects
@ -79,7 +112,16 @@ type PerformanceMetrics struct {
// This function will do all the work to process the slot and write it to the DB.
// It will return the error and error process. The error process is used for providing reach detail to the
// known_gaps table.
func processFullSlot(ctx context.Context, db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) (error, string) {
func processFullSlot(
ctx context.Context,
slot int,
blockRoot string,
stateRoot string,
previousSlot int,
previousBlockRoot string,
knownGapsTableIncrement int,
headOrHistoric string,
spd *SlotProcessingDetails) (error, string) {
select {
case <-ctx.Done():
return nil, ""
@ -90,8 +132,8 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
BlockRoot: blockRoot,
StateRoot: stateRoot,
HeadOrHistoric: headOrHistoric,
Db: db,
Metrics: metrics,
Db: spd.Db,
Metrics: spd.Metrics,
PerformanceMetrics: PerformanceMetrics{
BeaconNodeBlockRetrievalTime: 0,
BeaconNodeStateRetrievalTime: 0,
@ -108,6 +150,7 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
g, _ := errgroup.WithContext(context.Background())
if spd.PerformBeaconStateProcessing {
// Get the BeaconState.
g.Go(func() error {
select {
@ -115,7 +158,7 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
return nil
default:
start := time.Now()
err := ps.getBeaconState(serverAddress)
err := ps.getBeaconState(spd.ServerEndpoint)
if err != nil {
return err
}
@ -123,7 +166,9 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
return nil
}
})
}
if spd.ProcessBeaconBlockProcessing {
// Get the SignedBeaconBlock.
g.Go(func() error {
select {
@ -131,7 +176,7 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
return nil
default:
start := time.Now()
err := ps.getSignedBeaconBlock(serverAddress)
err := ps.getSignedBeaconBlock(spd.ServerEndpoint)
if err != nil {
return err
}
@ -139,6 +184,7 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
return nil
}
})
}
if err := g.Wait(); err != nil {
return err, "processSlot"
@ -151,7 +197,7 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
}
ps.PerformanceMetrics.ParseBeaconObjectForHash = time.Since(parseBeaconTime)
if checkDb {
if spd.CheckDb {
checkDbTime := time.Now()
inDb, err := IsSlotInDb(ctx, ps.Db, strconv.Itoa(ps.Slot), finalBlockRoot, finalStateRoot)
if err != nil {
@ -220,20 +266,23 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
}
// Handle a slot that is at head. A wrapper function for calling `handleFullSlot`.
func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) {
// Get the knownGaps at startUp.
if previousSlot == 0 && previousBlockRoot == "" {
writeStartUpGaps(db, knownGapsTableIncrement, slot, metrics)
func processHeadSlot(slot int, blockRoot string, stateRoot string, spd SlotProcessingDetails) {
// Get the knownGaps at startUp
if spd.PreviousSlot == 0 && spd.PreviousBlockRoot == "" {
writeStartUpGaps(spd.Db, spd.KnownGapTableIncrement, slot, spd.Metrics)
}
err, errReason := processFullSlot(context.Background(), db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement, checkDb)
// TODO(telackey): Why context.Background()?
err, errReason := processFullSlot(context.Background(), slot, blockRoot, stateRoot,
spd.PreviousSlot, spd.PreviousBlockRoot, spd.KnownGapTableIncrement, "head", &spd)
if err != nil {
writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics)
writeKnownGaps(spd.Db, spd.KnownGapTableIncrement, slot, slot, err, errReason, spd.Metrics)
}
}
// Handle a historic slot. A wrapper function for calling `handleFullSlot`.
func handleHistoricSlot(ctx context.Context, db sql.Database, serverAddress string, slot int, metrics *BeaconClientMetrics, checkDb bool) (error, string) {
return processFullSlot(ctx, db, serverAddress, slot, "", "", 0, "", "historic", metrics, 1, checkDb)
func handleHistoricSlot(ctx context.Context, slot int, spd SlotProcessingDetails) (error, string) {
return processFullSlot(ctx, slot, "", "", 0, "",
1, "historic", &spd)
}
// Update the SszSignedBeaconBlock and FullSignedBeaconBlock object with their respective values.
@ -250,7 +299,7 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string) error {
if err != nil || rc != 200 {
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error("Unable to properly query the slot.")
ps.FullSignedBeaconBlock = SignedBeaconBlock{}
ps.FullSignedBeaconBlock = nil
ps.SszSignedBeaconBlock = []byte{}
ps.ParentBlockRoot = ""
ps.Status = "skipped"
@ -261,14 +310,14 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string) error {
err = signedBeaconBlock.UnmarshalSSZ(sszSignedBeaconBlock)
if err != nil {
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error("Unable to unmarshal SignedBeaconBlock for slot.")
ps.FullSignedBeaconBlock = SignedBeaconBlock{}
ps.FullSignedBeaconBlock = nil
ps.SszSignedBeaconBlock = []byte{}
ps.ParentBlockRoot = ""
ps.Status = "skipped"
return nil
}
ps.FullSignedBeaconBlock = signedBeaconBlock
ps.FullSignedBeaconBlock = &signedBeaconBlock
ps.SszSignedBeaconBlock = sszSignedBeaconBlock
ps.ParentBlockRoot = toHex(ps.FullSignedBeaconBlock.Block().ParentRoot())
@ -298,13 +347,17 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string) error {
return err
}
ps.FullBeaconState = beaconState
ps.FullBeaconState = &beaconState
ps.SszBeaconState = sszBeaconState
return nil
}
// Check to make sure that the previous block we processed is the parent of the current block.
func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previousSlot int, previousBlockRoot string, knownGapsTableIncrement int) {
if nil == ps.FullSignedBeaconBlock {
log.Debug("Can't check previous slot, no current slot.")
return
}
parentRoot := toHex(ps.FullSignedBeaconBlock.Block().ParentRoot())
slot := int(ps.FullBeaconState.Slot())
if previousSlot == slot {
@ -368,19 +421,29 @@ func (ps *ProcessSlot) provideFinalHash() (string, string, string, error) {
if ps.StateRoot != "" {
stateRoot = ps.StateRoot
} else {
if nil != ps.FullSignedBeaconBlock {
stateRoot = toHex(ps.FullSignedBeaconBlock.Block().StateRoot())
log.Debug("StateRoot: ", stateRoot)
log.Debug("BeaconBlock StateRoot: ", stateRoot)
} else {
log.Debug("BeaconBlock StateRoot: <nil beacon block>")
}
}
if ps.BlockRoot != "" {
blockRoot = ps.BlockRoot
} else {
if nil != ps.FullSignedBeaconBlock {
rawBlockRoot := ps.FullSignedBeaconBlock.Block().HashTreeRoot()
blockRoot = toHex(rawBlockRoot)
log.WithFields(log.Fields{"blockRoot": blockRoot}).Debug("Block Root from ssz")
} else {
log.Debug("BeaconBlock HashTreeRoot: <nil beacon block>")
}
}
if nil != ps.FullSignedBeaconBlock {
eth1BlockHash = toHex(ps.FullSignedBeaconBlock.Block().Body().Eth1Data().BlockHash)
}
}
return blockRoot, stateRoot, eth1BlockHash, nil
}