Add KnownGaps Errors (#33)

This commit is contained in:
Abdul Rabbani 2022-05-12 15:44:05 -04:00 committed by Abdul Rabbani
parent e9f86e82ec
commit 8cab81f12e
11 changed files with 282 additions and 107 deletions

View File

@ -9,7 +9,7 @@ COPY go.sum .
RUN go mod tidy; go mod download
COPY . .
RUN GCO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -ldflags '-extldflags "-static"' -o ipld-ethcl-indexer .
RUN GCO_ENABLED=0 GOOS=linux go build -race -a -installsuffix cgo -ldflags '-extldflags "-static"' -o ipld-ethcl-indexer .
RUN chmod +x ipld-ethcl-indexer
FROM frolvlad/alpine-bash:latest

View File

@ -22,6 +22,21 @@ The `database` package allows us to interact with a postgres DB. We utilize the
This package will contain code to interact with the beacon client.
### Known Gaps
Known Gaps tracking is handled within this package. The columns are as follows:
- StartSlot - The start slot for known_gaps, inclusive.
- EndSlot - The end slot for known_gaps, inclusive.
- CheckedOut - Indicates if any process is currently processing this entry.
- ErrorMessage - Captures any error message that might have occurred when previously processing this entry.
- EntryTime - The time this range was added to the DB. This can help us catch ranges that have not been processed for a long time due to some error.
- EntryProcess - The entry process that added this process. Potential options are StartUp, Error, HeadGap.
- This can help us understand how a specific entry was added. It can be useful for debugging the application.
- StartUp - Gaps found when we started the application.
- Error - Indicates that the entry was added due to an error with processing.
- HeadGap - Indicates that gaps where found when keeping up with Head.
## `pkg/version`
A generic package which can be utilized to easily version our applications.

View File

@ -23,6 +23,7 @@ var (
bcPort int
bcConnectionProtocol string
maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second
notifierCh chan os.Signal = make(chan os.Signal, 1)
)
// captureCmd represents the capture command

View File

@ -6,15 +6,19 @@ package cmd
import (
"context"
"os"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
"github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
var (
kgTableIncrement int
)
// headCmd represents the head command
var headCmd = &cobra.Command{
Use: "head",
@ -37,10 +41,9 @@ func startHeadTracking() {
}
// Capture head blocks
go BC.CaptureHead()
go BC.CaptureHead(kgTableIncrement)
// Shutdown when the time is right.
notifierCh := make(chan os.Signal, 1)
err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC)
if err != nil {
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
@ -53,13 +56,8 @@ func startHeadTracking() {
func init() {
captureCmd.AddCommand(headCmd)
// Here you will define your flags and configuration settings.
// Cobra supports Persistent Flags which will work for this command
// and all subcommands, e.g.:
// headCmd.PersistentFlags().String("foo", "", "A help for foo")
// Cobra supports local flags which will only run when this command
// is called directly, e.g.:
// headCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
// Known Gaps specific
captureCmd.PersistentFlags().IntVarP(&kgTableIncrement, "kg.increment", "", 10000, "The max slots within a single entry to the known_gaps table.")
err := viper.BindPFlag("kg.increment", captureCmd.PersistentFlags().Lookup("kg.increment"))
exitErr(err)
}

View File

@ -34,6 +34,7 @@ type BeaconClient struct {
PerformHistoricalProcessing bool // Should we perform historical processing?
Db sql.Database // Database object used for reads and writes.
Metrics *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics.
KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry.
// Used for Head Tracking
PerformHeadTracking bool // Should we track head?

View File

@ -10,7 +10,8 @@ import (
)
// This function will perform all the heavy lifting for tracking the head of the chain.
func (bc *BeaconClient) CaptureHead() {
func (bc *BeaconClient) CaptureHead(knownGapsTableIncrement int) {
bc.KnownGapTableIncrement = knownGapsTableIncrement
log.Info("We are tracking the head of the chain.")
//bc.tempHelper()
go bc.handleHead()

View File

@ -56,6 +56,7 @@ var _ = Describe("Capturehead", func() {
dbPassword string = "password"
dbDriver string = "pgx"
dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42"
knownGapsTableIncrement int = 10000
)
BeforeEach(func() {
@ -156,6 +157,7 @@ var _ = Describe("Capturehead", func() {
dbUser: dbUser,
dbPassword: dbPassword,
dbDriver: dbDriver,
knownGapsTableIncrement: knownGapsTableIncrement,
}
BeaconNodeTester = TestBeaconNode{
@ -244,6 +246,7 @@ type Config struct {
dbUser string
dbPassword string
dbDriver string
knownGapsTableIncrement int
}
//////////////////////////////////////////////////////
@ -307,7 +310,7 @@ func queryDbSlotAndBlock(db sql.Database, querySlot string, queryBlockRoot strin
// A function that will remove all entries from the ethcl tables for you.
func clearEthclDbTables(db sql.Database) {
deleteQueries := []string{"DELETE FROM ethcl.slots;", "DELETE FROM ethcl.signed_beacon_block;", "DELETE FROM ethcl.beacon_state;", "DELETE FROM ethcl.batch_processing;"}
deleteQueries := []string{"DELETE FROM ethcl.slots;", "DELETE FROM ethcl.signed_beacon_block;", "DELETE FROM ethcl.beacon_state;", "DELETE FROM ethcl.known_gaps;"}
for _, queries := range deleteQueries {
_, err := db.Exec(context.Background(), queries)
Expect(err).ToNot(HaveOccurred())
@ -436,7 +439,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(firstHead beaconclient.Head, second
tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
go bc.CaptureHead()
go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement)
time.Sleep(1 * time.Second)
log.Info("Sending Phase0 Messages to BeaconClient")
@ -488,7 +491,7 @@ func (tbc TestBeaconNode) testProcessBlock(head beaconclient.Head, epoch int) {
tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
go bc.CaptureHead()
go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement)
time.Sleep(1 * time.Second)
sendHeadMessage(bc, head)
validateSlot(bc, &head, epoch, "proposed")
@ -501,7 +504,7 @@ func (tbc TestBeaconNode) testMultipleHead(firstHead beaconclient.Head, secondHe
tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
go bc.CaptureHead()
go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement)
time.Sleep(1 * time.Second)
sendHeadMessage(bc, firstHead)

View File

@ -19,8 +19,8 @@ INSERT INTO ethcl.slots (epoch, slot, block_root, state_root, status)
VALUES ($1, $2, $3, $4, $5) ON CONFLICT (slot, block_root) DO NOTHING`
// Statement to upsert to the ethcl.signed_beacon_blocks table.
UpsertSignedBeaconBlockStmt string = `
INSERT INTO ethcl.signed_beacon_block (slot, block_root, parent_block_root, mh_key)
VALUES ($1, $2, $3, $4) ON CONFLICT (slot, block_root) DO NOTHING`
INSERT INTO ethcl.signed_beacon_block (slot, block_root, parent_block_root, eth1_block_hash, mh_key)
VALUES ($1, $2, $3, $4, $5) ON CONFLICT (slot, block_root) DO NOTHING`
// Statement to upsert to the ethcl.beacon_state table.
UpsertBeaconState string = `
INSERT INTO ethcl.beacon_state (slot, state_root, mh_key)
@ -40,6 +40,11 @@ VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`
CheckProposedStmt string = `SELECT slot, block_root
FROM ethcl.slots
WHERE slot=$1 AND block_root=$2;`
// Statement to insert known_gaps. We don't pass in timestamp, we let the server take care of that one.
UpsertKnownGapsStmt string = `
INSERT INTO ethcl.known_gaps (start_slot, end_slot, checked_out, reprocessing_error, entry_error, entry_process)
VALUES ($1, $2, $3, $4, $5, $6) on CONFLICT (start_slot, end_slot) DO NOTHING`
QueryHighestSlotStmt string = "SELECT COALESCE(MAX(slot), 0) FROM ethcl.slots"
)
// Put all functionality to prepare the write object
@ -55,13 +60,13 @@ type DatabaseWriter struct {
rawSignedBeaconBlock []byte
}
func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string, status string, metrics *BeaconClientMetrics) *DatabaseWriter {
func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string, eth1BlockHash string, status string, metrics *BeaconClientMetrics) *DatabaseWriter {
dw := &DatabaseWriter{
Db: db,
Metrics: metrics,
}
dw.prepareSlotsModel(slot, stateRoot, blockRoot, status)
dw.prepareSignedBeaconBlockModel(slot, blockRoot, parentBlockRoot)
dw.prepareSignedBeaconBlockModel(slot, blockRoot, parentBlockRoot, eth1BlockHash)
dw.prepareBeaconStateModel(slot, stateRoot)
return dw
}
@ -82,11 +87,12 @@ func (dw *DatabaseWriter) prepareSlotsModel(slot int, stateRoot string, blockRoo
}
// Create the model for the ethcl.signed_beacon_block table.
func (dw *DatabaseWriter) prepareSignedBeaconBlockModel(slot int, blockRoot string, parentBlockRoot string) {
func (dw *DatabaseWriter) prepareSignedBeaconBlockModel(slot int, blockRoot string, parentBlockRoot string, eth1BlockHash string) {
dw.DbSignedBeaconBlock = &DbSignedBeaconBlock{
Slot: strconv.Itoa(slot),
BlockRoot: blockRoot,
ParentBlock: parentBlockRoot,
Eth1BlockHash: eth1BlockHash,
MhKey: calculateMhKey(),
}
log.Debug("dw.DbSignedBeaconBlock: ", dw.DbSignedBeaconBlock)
@ -104,79 +110,115 @@ func (dw *DatabaseWriter) prepareBeaconStateModel(slot int, stateRoot string) {
}
// Write all the data for a given slot.
func (dw *DatabaseWriter) writeFullSlot() {
func (dw *DatabaseWriter) writeFullSlot() error {
// Add errors for each function call
// If an error occurs, write to knownGaps table.
dw.writeSlots()
dw.writeSignedBeaconBlocks()
dw.writeBeaconState()
err := dw.writeSlots()
if err != nil {
return err
}
err = dw.writeSignedBeaconBlocks()
if err != nil {
return err
}
err = dw.writeBeaconState()
if err != nil {
return err
}
dw.Metrics.IncrementHeadTrackingInserts(1)
return nil
}
// Write the information for the generic slots table. For now this is only one function.
// But in the future if we need to incorporate any FK's or perform any actions to write to the
// slots table we can do it all here.
func (dw *DatabaseWriter) writeSlots() {
dw.upsertSlots()
func (dw *DatabaseWriter) writeSlots() error {
return dw.upsertSlots()
}
// Upsert to the ethcl.slots table.
func (dw *DatabaseWriter) upsertSlots() {
func (dw *DatabaseWriter) upsertSlots() error {
_, err := dw.Db.Exec(context.Background(), UpsertSlotsStmt, dw.DbSlots.Epoch, dw.DbSlots.Slot, dw.DbSlots.BlockRoot, dw.DbSlots.StateRoot, dw.DbSlots.Status)
if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.slots table")
return err
}
return nil
}
// Write the information for the signed_beacon_block.
func (dw *DatabaseWriter) writeSignedBeaconBlocks() {
dw.upsertPublicBlocks(dw.DbSignedBeaconBlock.MhKey, dw.rawSignedBeaconBlock)
dw.upsertSignedBeaconBlock()
func (dw *DatabaseWriter) writeSignedBeaconBlocks() error {
err := dw.upsertPublicBlocks(dw.DbSignedBeaconBlock.MhKey, dw.rawSignedBeaconBlock)
if err != nil {
return err
}
err = dw.upsertSignedBeaconBlock()
if err != nil {
return err
}
return nil
}
// Upsert to public.blocks.
func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) {
func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) error {
_, err := dw.Db.Exec(context.Background(), UpsertBlocksStmt, key, data)
if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the public.blocks table")
return err
}
return nil
}
// Upsert to the ethcl.signed_beacon_block table.
func (dw *DatabaseWriter) upsertSignedBeaconBlock() {
_, err := dw.Db.Exec(context.Background(), UpsertSignedBeaconBlockStmt, dw.DbSignedBeaconBlock.Slot, dw.DbSignedBeaconBlock.BlockRoot, dw.DbSignedBeaconBlock.ParentBlock, dw.DbSignedBeaconBlock.MhKey)
func (dw *DatabaseWriter) upsertSignedBeaconBlock() error {
_, err := dw.Db.Exec(context.Background(), UpsertSignedBeaconBlockStmt, dw.DbSignedBeaconBlock.Slot, dw.DbSignedBeaconBlock.BlockRoot, dw.DbSignedBeaconBlock.ParentBlock, dw.DbSignedBeaconBlock.Eth1BlockHash, dw.DbSignedBeaconBlock.MhKey)
if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).WithFields(log.Fields{"block_root": dw.DbSignedBeaconBlock.BlockRoot}).Error("Unable to write to the slot to the ethcl.signed_beacon_block table")
return err
}
return nil
}
// Write the information for the beacon_state.
func (dw *DatabaseWriter) writeBeaconState() {
dw.upsertPublicBlocks(dw.DbBeaconState.MhKey, dw.rawBeaconState)
dw.upsertBeaconState()
func (dw *DatabaseWriter) writeBeaconState() error {
err := dw.upsertPublicBlocks(dw.DbBeaconState.MhKey, dw.rawBeaconState)
if err != nil {
return err
}
err = dw.upsertBeaconState()
if err != nil {
return err
}
return nil
}
// Upsert to the ethcl.beacon_state table.
func (dw *DatabaseWriter) upsertBeaconState() {
func (dw *DatabaseWriter) upsertBeaconState() error {
_, err := dw.Db.Exec(context.Background(), UpsertBeaconState, dw.DbBeaconState.Slot, dw.DbBeaconState.StateRoot, dw.DbBeaconState.MhKey)
if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.beacon_state table")
return err
}
return nil
}
// Update a given slot to be marked as forked. Provide the slot and the latest latestBlockRoot.
// We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked.
func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) {
slotNum, strErr := strconv.Atoi(slot)
if strErr != nil {
loghelper.LogReorgError(slot, latestBlockRoot, strErr).Error("We can't convert the slot to an int...")
}
forkCount, err := updateForked(db, slot, latestBlockRoot)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while updating all forks.")
// Add to knownGaps Table
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
}
proposedCount, err := updateProposed(db, slot, latestBlockRoot)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while trying to update the proposed slot.")
// Add to knownGaps Table
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
}
if forkCount > 0 {
@ -187,7 +229,6 @@ func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"forkCount": forkCount,
}).Warn("There were no forked rows to update.")
}
if proposedCount == 1 {
@ -198,16 +239,19 @@ func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"proposedCount": proposedCount,
}).Error("Too many rows were marked as proposed!")
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
} else if proposedCount == 0 {
var count int
err := db.QueryRow(context.Background(), CheckProposedStmt, slot, latestBlockRoot).Scan(count)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to query proposed rows after reorg.")
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
}
if count != 1 {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"proposedCount": count,
}).Warn("The proposed block was not marked as proposed...")
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
} else {
loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.")
}
@ -246,6 +290,82 @@ func updateProposed(db sql.Database, slot string, latestBlockRoot string) (int64
return count, err
}
// A wrapper function to call upsertKnownGaps. This function will break down the range of known_gaos into
// smaller chunks. For example, instead of having an entry of 1-101, if we increment the entries by 10 slots, we would
// have 10 entries as follows: 1-10, 11-20, etc...
func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot int, entryError error, entryProcess string) {
if endSlot-startSlot <= tableIncrement {
kgModel := DbKnownGaps{
StartSlot: strconv.Itoa(startSlot),
EndSlot: strconv.Itoa(endSlot),
CheckedOut: false,
ReprocessingError: "",
EntryError: entryError.Error(),
EntryProcess: entryProcess,
}
upsertKnownGaps(db, kgModel)
}
totalSlots := endSlot - startSlot
var chunks int
chunks = totalSlots / tableIncrement
if totalSlots%tableIncrement != 0 {
chunks = chunks + 1
}
for i := 0; i < chunks; i++ {
var tempStart, tempEnd int
tempStart = startSlot + (i * tableIncrement)
if i+1 == chunks {
tempEnd = endSlot
} else {
tempEnd = startSlot + ((i + 1) * tableIncrement)
}
kgModel := DbKnownGaps{
StartSlot: strconv.Itoa(tempStart),
EndSlot: strconv.Itoa(tempEnd),
CheckedOut: false,
ReprocessingError: "",
EntryError: entryError.Error(),
EntryProcess: entryProcess,
}
upsertKnownGaps(db, kgModel)
}
}
// A function to upsert a single entry to the ethcl.known_gaps table.
func upsertKnownGaps(db sql.Database, knModel DbKnownGaps) {
_, err := db.Exec(context.Background(), UpsertKnownGapsStmt, knModel.StartSlot, knModel.EndSlot,
knModel.CheckedOut, knModel.ReprocessingError, knModel.EntryError, knModel.EntryProcess)
if err != nil {
log.WithFields(log.Fields{
"err": err,
"startSlot": knModel.StartSlot,
"endSlot": knModel.EndSlot,
}).Fatal("We are unable to write to the ethcl.known_gaps table!!! We will stop the application because of that.")
}
log.WithFields(log.Fields{
"startSlot": knModel.StartSlot,
"endSlot": knModel.EndSlot,
}).Warn("A new gap has been added to the ethcl.known_gaps table.")
}
// A function to write the gap between the highest slot in the DB and the first processed slot.
func writeStartUpGaps(db sql.Database, tableIncrement int, firstSlot int) {
var maxSlot int
err := db.QueryRow(context.Background(), QueryHighestSlotStmt).Scan(&maxSlot)
if err != nil {
loghelper.LogError(err).Fatal("Unable to get the max block from the DB. We must close the application or we might have undetected gaps.")
}
if err != nil {
loghelper.LogError(err).WithFields(log.Fields{
"maxSlot": maxSlot,
}).Fatal("Unable to get convert max block from DB to int. We must close the application or we might have undetected gaps.")
}
writeKnownGaps(db, tableIncrement, maxSlot, firstSlot, fmt.Errorf(""), "startup")
}
// Dummy function for calculating the mhKey.
func calculateMhKey() string {
rand.Seed(time.Now().UnixNano())

View File

@ -50,6 +50,7 @@ type DbSignedBeaconBlock struct {
Slot string // The slot.
BlockRoot string // The block root
ParentBlock string // The parent block root.
Eth1BlockHash string // The eth1 block_hash
MhKey string // The ipld multihash key.
}
@ -60,3 +61,14 @@ type DbBeaconState struct {
StateRoot string // The state root
MhKey string // The ipld multihash key.
}
// A structure to capture whats being written to the ethcl.known_gaps table.
type DbKnownGaps struct {
StartSlot string // The start slot for known_gaps, inclusive.
EndSlot string // The end slot for known_gaps, inclusive.
CheckedOut bool // Indicates if any process is currently processing this entry.
ReprocessingError string // The error that occurred when attempting to reprocess these entries.
EntryError string // The error that caused this entry to be added to the table. Could be null.
EntryTime string // The time this range was added to the DB. This can help us catch ranges that have not been processed for a long time due to some error.
EntryProcess string // The entry process that added this process. Potential options are StartUp, Error, ManualEntry, HeadGap.
}

View File

@ -24,6 +24,7 @@ func (bc *BeaconClient) handleReorg() {
// This function will handle the latest head event.
func (bc *BeaconClient) handleHead() {
log.Info("Starting to process head.")
errorSlots := 0
for {
head := <-bc.HeadTracking.ProcessCh
// Process all the work here.
@ -32,10 +33,20 @@ func (bc *BeaconClient) handleHead() {
bc.HeadTracking.ErrorCh <- &SseError{
err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot),
}
errorSlots = errorSlots + 1
continue
}
err = processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics)
if errorSlots != 0 && bc.PreviousSlot != 0 {
log.WithFields(log.Fields{
"lastProcessedSlot": bc.PreviousSlot,
"errorMessages": errorSlots,
}).Warn("We added slots to the knownGaps table because we got bad head messages.")
writeKnownGaps(bc.Db, bc.KnownGapTableIncrement, bc.PreviousSlot, bcSlotsPerEpoch+errorSlots, fmt.Errorf("Bad Head Messages"), "headProcessing")
}
err = processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement)
if err != nil {
loghelper.LogSlotError(head.Slot, err)
loghelper.LogSlotError(head.Slot, err).Error("Unable to process a slot")
}
log.WithFields(log.Fields{"head": head}).Debug("Received a new head event.")

View File

@ -25,6 +25,7 @@ var (
}
ParentRootUnmarshalError = "Unable to properly unmarshal the ParentRoot field in the SignedBeaconBlock."
MissingIdentifiedError = "Can't query state without a set slot or block_root"
MissingEth1Data = "Can't get the Eth1 block_hash"
)
type ProcessSlot struct {
@ -55,11 +56,7 @@ type ProcessSlot struct {
}
// This function will do all the work to process the slot and write it to the DB.
func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics) error {
headOrHistoric = strings.ToLower(headOrHistoric)
if headOrHistoric != "head" && headOrHistoric != "historic" {
return fmt.Errorf("headOrBatch must be either historic or head!")
}
func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) error {
ps := &ProcessSlot{
Slot: slot,
BlockRoot: blockRoot,
@ -72,33 +69,43 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
// Get the SignedBeaconBlock.
err := ps.getSignedBeaconBlock(serverAddress)
if err != nil {
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot")
return err
}
// Get the BeaconState.
err = ps.getBeaconState(serverAddress)
if err != nil {
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot")
return err
}
if ps.HeadOrHistoric == "head" && previousSlot == 0 && previousBlockRoot == "" {
writeStartUpGaps(db, knownGapsTableIncrement, ps.Slot)
}
// Get this object ready to write
dw := ps.createWriteObjects()
// Write the object to the DB.
dw.writeFullSlot()
err = dw.writeFullSlot()
if err != nil {
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot")
}
// Handle any reorgs or skipped slots.
if ps.HeadOrHistoric == "head" {
if previousSlot != 0 && previousBlockRoot != "" {
ps.checkPreviousSlot(previousSlot, previousBlockRoot)
headOrHistoric = strings.ToLower(headOrHistoric)
if headOrHistoric != "head" && headOrHistoric != "historic" {
return fmt.Errorf("headOrHistoric must be either historic or head!")
}
if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" {
ps.checkPreviousSlot(previousSlot, previousBlockRoot, knownGapsTableIncrement)
}
return nil
}
// Handle a slot that is at head. A wrapper function for calling `handleFullSlot`.
func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics) error {
return processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics)
func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) error {
return processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement)
}
// Handle a historic slot. A wrapper function for calling `handleFullSlot`.
@ -141,7 +148,11 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string) error {
} else if ps.FullSignedBeaconBlock.Block.ParentRoot == nil {
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(ParentRootUnmarshalError)
return fmt.Errorf(ParentRootUnmarshalError)
} else if hex.EncodeToString(ps.FullBeaconState.Eth1Data.BlockHash) == "" {
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(MissingEth1Data)
return fmt.Errorf(MissingEth1Data)
}
log.Warn("We received a processing error: ", err)
}
ps.ParentBlockRoot = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.ParentRoot)
return nil
@ -174,7 +185,7 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string) error {
}
// Check to make sure that the previous block we processed is the parent of the current block.
func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot string) {
func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot string, knownGapsTableIncrement int) {
parentRoot := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.ParentRoot)
if previousSlot == int(ps.FullBeaconState.Slot) {
log.WithFields(log.Fields{
@ -182,21 +193,21 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
"fork": true,
}).Warn("A fork occurred! The previous slot and current slot match.")
writeReorgs(ps.Db, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics)
} else if previousSlot-1 != int(ps.FullBeaconState.Slot) {
} else if previousSlot+1 != int(ps.FullBeaconState.Slot) {
log.WithFields(log.Fields{
"previousSlot": previousSlot,
"currentSlot": ps.FullBeaconState.Slot,
}).Error("We skipped a few slots.")
writeKnownGaps(ps.Db, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot)-1, fmt.Errorf("Gaps during head processing"), "headGaps")
// Check to see if the slot was skipped.
// Call our batch processing function.
// Call our known_gaps function.
} else if previousBlockRoot != parentRoot {
log.WithFields(log.Fields{
"previousBlockRoot": previousBlockRoot,
"currentBlockParent": parentRoot,
}).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.")
writeReorgs(ps.Db, strconv.Itoa(previousSlot), parentRoot, ps.Metrics)
// Call our batch processing function.
// Continue with this slot.
writeKnownGaps(ps.Db, 1, ps.Slot-1, ps.Slot-1, fmt.Errorf("Incorrect Parent"), "processSlot")
} else {
log.Debug("Previous Slot and Current Slot are one distance from each other.")
}
@ -210,7 +221,7 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
// Check if head or historic.
// 1. BeaconBlock is 404.
// 2. check heck /lighthouse/database/info to make sure the oldest_block_slot == 0 and anchor == null. This indicates that I don't have any gaps in the DB.
// 2. check check /lighthouse/database/info to make sure the oldest_block_slot == 0 and anchor == null. This indicates that I don't have any gaps in the DB.
// 3. I query BeaconState for slot X, and get a BeaconState.
// 4. Although for good measure you should also check that the head is at a slot >= X using something like /eth/v1/node/syncing/ or /eth/v1/beacon/headers/head
func (ps *ProcessSlot) checkMissedSlot() {
@ -254,7 +265,9 @@ func (ps *ProcessSlot) createWriteObjects() *DatabaseWriter {
status = "proposed"
}
dw := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, status, ps.Metrics)
eth1BlockHash := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.Body.Eth1Data.BlockHash)
dw := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.Metrics)
dw.rawSignedBeaconBlock = ps.SszSignedBeaconBlock
dw.rawBeaconState = ps.SszBeaconState