de38c531d5
* Allow the application to process events in parallel: a main thread tracks incoming messages, then spawns goroutines to process each slot so that they can run concurrently. * Control knownGaps in existing test * Use interfaces for different fork versions: use interfaces for `SignedBeaconBlock` and `BeaconState`, which allows the application to determine the correct forked struct. In the test we also use a switch condition to properly serve the correct mimics. * Utilize new ipld-ethcl-db repository * Add final tests * Update timeout and secret * Update token * Update docker compose * Update expected inserts
73 lines
2.5 KiB
Go
73 lines
2.5 KiB
Go
// This file contains all the functions that handle SSE events after they have been
// converted into structs.
|
|
|
|
package beaconclient
|
|
|
|
import (
|
|
"fmt"
|
|
"strconv"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
|
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
// This function will perform the necessary steps to handle a reorg.
|
|
func (bc *BeaconClient) handleReorg() {
|
|
log.Info("Starting to process reorgs.")
|
|
for {
|
|
reorg := <-bc.ReOrgTracking.ProcessCh
|
|
log.WithFields(log.Fields{"reorg": reorg}).Debug("Received a new reorg message.")
|
|
writeReorgs(bc.Db, reorg.Slot, reorg.NewHeadBlock, bc.Metrics)
|
|
}
|
|
}
|
|
|
|
// This function will handle the latest head event.
|
|
func (bc *BeaconClient) handleHead() {
|
|
log.Info("Starting to process head.")
|
|
errorSlots := 0
|
|
for {
|
|
head := <-bc.HeadTracking.ProcessCh
|
|
// Process all the work here.
|
|
slot, err := strconv.Atoi(head.Slot)
|
|
if err != nil {
|
|
bc.HeadTracking.ErrorCh <- &SseError{
|
|
err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot),
|
|
}
|
|
errorSlots = errorSlots + 1
|
|
continue
|
|
}
|
|
if errorSlots != 0 && bc.PreviousSlot != 0 {
|
|
log.WithFields(log.Fields{
|
|
"lastProcessedSlot": bc.PreviousSlot,
|
|
"errorMessages": errorSlots,
|
|
}).Warn("We added slots to the knownGaps table because we got bad head messages.")
|
|
writeKnownGaps(bc.Db, bc.KnownGapTableIncrement, bc.PreviousSlot, bcSlotsPerEpoch+errorSlots, fmt.Errorf("Bad Head Messages"), "headProcessing", bc.Metrics)
|
|
}
|
|
|
|
log.WithFields(log.Fields{"head": head}).Debug("We are going to start processing the slot.")
|
|
|
|
go func(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) {
|
|
errG := new(errgroup.Group)
|
|
errG.Go(func() error {
|
|
err = processHeadSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, metrics, knownGapsTableIncrement)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
})
|
|
if err := errG.Wait(); err != nil {
|
|
loghelper.LogSlotError(strconv.Itoa(slot), err).Error("Unable to process a slot")
|
|
}
|
|
}(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement)
|
|
|
|
log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished calling processHeadSlot.")
|
|
|
|
// Update the previous block
|
|
bc.PreviousSlot = slot
|
|
bc.PreviousBlockRoot = head.Block
|
|
}
|
|
|
|
}
|