From 12675f85c270df94bae8de9af9bba205b04c73b4 Mon Sep 17 00:00:00 2001 From: Abdul Rabbani <58230246+abdulrabbani00@users.noreply.github.com> Date: Thu, 12 May 2022 15:44:05 -0400 Subject: [PATCH] Add KnownGaps Errors (#33) --- Dockerfile | 2 +- application_component.md | 15 +++ cmd/capture.go | 3 +- cmd/head.go | 22 ++-- pkg/beaconclient/beaconclient.go | 1 + pkg/beaconclient/capturehead.go | 3 +- pkg/beaconclient/capturehead_test.go | 77 ++++++------ pkg/beaconclient/databasewrite.go | 180 ++++++++++++++++++++++----- pkg/beaconclient/models.go | 20 ++- pkg/beaconclient/processevents.go | 15 ++- pkg/beaconclient/processslot.go | 51 +++++--- 11 files changed, 282 insertions(+), 107 deletions(-) diff --git a/Dockerfile b/Dockerfile index 46f2575..a9a2c58 100644 --- a/Dockerfile +++ b/Dockerfile @@ -9,7 +9,7 @@ COPY go.sum . RUN go mod tidy; go mod download COPY . . -RUN GCO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -ldflags '-extldflags "-static"' -o ipld-ethcl-indexer . +RUN GCO_ENABLED=0 GOOS=linux go build -race -a -installsuffix cgo -ldflags '-extldflags "-static"' -o ipld-ethcl-indexer . RUN chmod +x ipld-ethcl-indexer FROM frolvlad/alpine-bash:latest diff --git a/application_component.md b/application_component.md index d8bd3ec..d4cba2f 100644 --- a/application_component.md +++ b/application_component.md @@ -22,6 +22,21 @@ The `database` package allows us to interact with a postgres DB. We utilize the This package will contain code to interact with the beacon client. +### Known Gaps + +Known Gaps tracking is handled within this package. The columns are as follows: + +- StartSlot - The start slot for known_gaps, inclusive. +- EndSlot - The end slot for known_gaps, inclusive. +- CheckedOut - Indicates if any process is currently processing this entry. +- ErrorMessage - Captures any error message that might have occurred when previously processing this entry. +- EntryTime - The time this range was added to the DB. This can help us catch ranges that have not been processed for a long time due to some error. +- EntryProcess - The entry process that added this process. Potential options are StartUp, Error, HeadGap. + - This can help us understand how a specific entry was added. It can be useful for debugging the application. + - StartUp - Gaps found when we started the application. + - Error - Indicates that the entry was added due to an error with processing. + - HeadGap - Indicates that gaps where found when keeping up with Head. + ## `pkg/version` A generic package which can be utilized to easily version our applications. diff --git a/cmd/capture.go b/cmd/capture.go index 32113a8..63ddbdc 100644 --- a/cmd/capture.go +++ b/cmd/capture.go @@ -22,7 +22,8 @@ var ( bcAddress string bcPort int bcConnectionProtocol string - maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second + maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second + notifierCh chan os.Signal = make(chan os.Signal, 1) ) // captureCmd represents the capture command diff --git a/cmd/head.go b/cmd/head.go index fc84ed1..0058f8d 100644 --- a/cmd/head.go +++ b/cmd/head.go @@ -6,15 +6,19 @@ package cmd import ( "context" - "os" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "github.com/spf13/viper" "github.com/vulcanize/ipld-ethcl-indexer/internal/boot" "github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" ) +var ( + kgTableIncrement int +) + // headCmd represents the head command var headCmd = &cobra.Command{ Use: "head", @@ -37,10 +41,9 @@ func startHeadTracking() { } // Capture head blocks - go BC.CaptureHead() + go BC.CaptureHead(kgTableIncrement) // Shutdown when the time is right. - notifierCh := make(chan os.Signal, 1) err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC) if err != nil { loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!") @@ -53,13 +56,8 @@ func startHeadTracking() { func init() { captureCmd.AddCommand(headCmd) - // Here you will define your flags and configuration settings. - - // Cobra supports Persistent Flags which will work for this command - // and all subcommands, e.g.: - // headCmd.PersistentFlags().String("foo", "", "A help for foo") - - // Cobra supports local flags which will only run when this command - // is called directly, e.g.: - // headCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") + // Known Gaps specific + captureCmd.PersistentFlags().IntVarP(&kgTableIncrement, "kg.increment", "", 10000, "The max slots within a single entry to the known_gaps table.") + err := viper.BindPFlag("kg.increment", captureCmd.PersistentFlags().Lookup("kg.increment")) + exitErr(err) } diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go index abd1953..a0d70c4 100644 --- a/pkg/beaconclient/beaconclient.go +++ b/pkg/beaconclient/beaconclient.go @@ -34,6 +34,7 @@ type BeaconClient struct { PerformHistoricalProcessing bool // Should we perform historical processing? Db sql.Database // Database object used for reads and writes. Metrics *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics. + KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry. // Used for Head Tracking PerformHeadTracking bool // Should we track head? diff --git a/pkg/beaconclient/capturehead.go b/pkg/beaconclient/capturehead.go index f57031e..ca171fb 100644 --- a/pkg/beaconclient/capturehead.go +++ b/pkg/beaconclient/capturehead.go @@ -10,7 +10,8 @@ import ( ) // This function will perform all the heavy lifting for tracking the head of the chain. -func (bc *BeaconClient) CaptureHead() { +func (bc *BeaconClient) CaptureHead(knownGapsTableIncrement int) { + bc.KnownGapTableIncrement = knownGapsTableIncrement log.Info("We are tracking the head of the chain.") //bc.tempHelper() go bc.handleHead() diff --git a/pkg/beaconclient/capturehead_test.go b/pkg/beaconclient/capturehead_test.go index 2896645..10e2df6 100644 --- a/pkg/beaconclient/capturehead_test.go +++ b/pkg/beaconclient/capturehead_test.go @@ -43,19 +43,20 @@ type MimicConfig struct { var _ = Describe("Capturehead", func() { var ( - TestConfig Config - BeaconNodeTester TestBeaconNode - address string = "localhost" - port int = 8080 - protocol string = "http" - TestEvents map[string]Message - dbHost string = "localhost" - dbPort int = 8077 - dbName string = "vulcanize_testing" - dbUser string = "vdbm" - dbPassword string = "password" - dbDriver string = "pgx" - dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42" + TestConfig Config + BeaconNodeTester TestBeaconNode + address string = "localhost" + port int = 8080 + protocol string = "http" + TestEvents map[string]Message + dbHost string = "localhost" + dbPort int = 8077 + dbName string = "vulcanize_testing" + dbUser string = "vdbm" + dbPassword string = "password" + dbDriver string = "pgx" + dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42" + knownGapsTableIncrement int = 10000 ) BeforeEach(func() { @@ -146,16 +147,17 @@ var _ = Describe("Capturehead", func() { }, } TestConfig = Config{ - protocol: protocol, - address: address, - port: port, - dummyParentRoot: dummyParentRoot, - dbHost: dbHost, - dbPort: dbPort, - dbName: dbName, - dbUser: dbUser, - dbPassword: dbPassword, - dbDriver: dbDriver, + protocol: protocol, + address: address, + port: port, + dummyParentRoot: dummyParentRoot, + dbHost: dbHost, + dbPort: dbPort, + dbName: dbName, + dbUser: dbUser, + dbPassword: dbPassword, + dbDriver: dbDriver, + knownGapsTableIncrement: knownGapsTableIncrement, } BeaconNodeTester = TestBeaconNode{ @@ -234,16 +236,17 @@ var _ = Describe("Capturehead", func() { }) type Config struct { - protocol string - address string - port int - dummyParentRoot string - dbHost string - dbPort int - dbName string - dbUser string - dbPassword string - dbDriver string + protocol string + address string + port int + dummyParentRoot string + dbHost string + dbPort int + dbName string + dbUser string + dbPassword string + dbDriver string + knownGapsTableIncrement int } ////////////////////////////////////////////////////// @@ -307,7 +310,7 @@ func queryDbSlotAndBlock(db sql.Database, querySlot string, queryBlockRoot strin // A function that will remove all entries from the ethcl tables for you. func clearEthclDbTables(db sql.Database) { - deleteQueries := []string{"DELETE FROM ethcl.slots;", "DELETE FROM ethcl.signed_beacon_block;", "DELETE FROM ethcl.beacon_state;", "DELETE FROM ethcl.batch_processing;"} + deleteQueries := []string{"DELETE FROM ethcl.slots;", "DELETE FROM ethcl.signed_beacon_block;", "DELETE FROM ethcl.beacon_state;", "DELETE FROM ethcl.known_gaps;"} for _, queries := range deleteQueries { _, err := db.Exec(context.Background(), queries) Expect(err).ToNot(HaveOccurred()) @@ -436,7 +439,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(firstHead beaconclient.Head, second tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - go bc.CaptureHead() + go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement) time.Sleep(1 * time.Second) log.Info("Sending Phase0 Messages to BeaconClient") @@ -488,7 +491,7 @@ func (tbc TestBeaconNode) testProcessBlock(head beaconclient.Head, epoch int) { tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - go bc.CaptureHead() + go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement) time.Sleep(1 * time.Second) sendHeadMessage(bc, head) validateSlot(bc, &head, epoch, "proposed") @@ -501,7 +504,7 @@ func (tbc TestBeaconNode) testMultipleHead(firstHead beaconclient.Head, secondHe tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - go bc.CaptureHead() + go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement) time.Sleep(1 * time.Second) sendHeadMessage(bc, firstHead) diff --git a/pkg/beaconclient/databasewrite.go b/pkg/beaconclient/databasewrite.go index c6ae8a0..7671a6b 100644 --- a/pkg/beaconclient/databasewrite.go +++ b/pkg/beaconclient/databasewrite.go @@ -19,8 +19,8 @@ INSERT INTO ethcl.slots (epoch, slot, block_root, state_root, status) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (slot, block_root) DO NOTHING` // Statement to upsert to the ethcl.signed_beacon_blocks table. UpsertSignedBeaconBlockStmt string = ` -INSERT INTO ethcl.signed_beacon_block (slot, block_root, parent_block_root, mh_key) -VALUES ($1, $2, $3, $4) ON CONFLICT (slot, block_root) DO NOTHING` +INSERT INTO ethcl.signed_beacon_block (slot, block_root, parent_block_root, eth1_block_hash, mh_key) +VALUES ($1, $2, $3, $4, $5) ON CONFLICT (slot, block_root) DO NOTHING` // Statement to upsert to the ethcl.beacon_state table. UpsertBeaconState string = ` INSERT INTO ethcl.beacon_state (slot, state_root, mh_key) @@ -40,6 +40,11 @@ VALUES ($1, $2) ON CONFLICT (key) DO NOTHING` CheckProposedStmt string = `SELECT slot, block_root FROM ethcl.slots WHERE slot=$1 AND block_root=$2;` + // Statement to insert known_gaps. We don't pass in timestamp, we let the server take care of that one. + UpsertKnownGapsStmt string = ` +INSERT INTO ethcl.known_gaps (start_slot, end_slot, checked_out, reprocessing_error, entry_error, entry_process) +VALUES ($1, $2, $3, $4, $5, $6) on CONFLICT (start_slot, end_slot) DO NOTHING` + QueryHighestSlotStmt string = "SELECT COALESCE(MAX(slot), 0) FROM ethcl.slots" ) // Put all functionality to prepare the write object @@ -55,13 +60,13 @@ type DatabaseWriter struct { rawSignedBeaconBlock []byte } -func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string, status string, metrics *BeaconClientMetrics) *DatabaseWriter { +func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string, eth1BlockHash string, status string, metrics *BeaconClientMetrics) *DatabaseWriter { dw := &DatabaseWriter{ Db: db, Metrics: metrics, } dw.prepareSlotsModel(slot, stateRoot, blockRoot, status) - dw.prepareSignedBeaconBlockModel(slot, blockRoot, parentBlockRoot) + dw.prepareSignedBeaconBlockModel(slot, blockRoot, parentBlockRoot, eth1BlockHash) dw.prepareBeaconStateModel(slot, stateRoot) return dw } @@ -82,12 +87,13 @@ func (dw *DatabaseWriter) prepareSlotsModel(slot int, stateRoot string, blockRoo } // Create the model for the ethcl.signed_beacon_block table. -func (dw *DatabaseWriter) prepareSignedBeaconBlockModel(slot int, blockRoot string, parentBlockRoot string) { +func (dw *DatabaseWriter) prepareSignedBeaconBlockModel(slot int, blockRoot string, parentBlockRoot string, eth1BlockHash string) { dw.DbSignedBeaconBlock = &DbSignedBeaconBlock{ - Slot: strconv.Itoa(slot), - BlockRoot: blockRoot, - ParentBlock: parentBlockRoot, - MhKey: calculateMhKey(), + Slot: strconv.Itoa(slot), + BlockRoot: blockRoot, + ParentBlock: parentBlockRoot, + Eth1BlockHash: eth1BlockHash, + MhKey: calculateMhKey(), } log.Debug("dw.DbSignedBeaconBlock: ", dw.DbSignedBeaconBlock) } @@ -104,79 +110,115 @@ func (dw *DatabaseWriter) prepareBeaconStateModel(slot int, stateRoot string) { } // Write all the data for a given slot. -func (dw *DatabaseWriter) writeFullSlot() { +func (dw *DatabaseWriter) writeFullSlot() error { // Add errors for each function call // If an error occurs, write to knownGaps table. - dw.writeSlots() - dw.writeSignedBeaconBlocks() - dw.writeBeaconState() + err := dw.writeSlots() + if err != nil { + return err + } + err = dw.writeSignedBeaconBlocks() + if err != nil { + return err + } + err = dw.writeBeaconState() + if err != nil { + return err + } dw.Metrics.IncrementHeadTrackingInserts(1) + return nil } // Write the information for the generic slots table. For now this is only one function. // But in the future if we need to incorporate any FK's or perform any actions to write to the // slots table we can do it all here. -func (dw *DatabaseWriter) writeSlots() { - dw.upsertSlots() - +func (dw *DatabaseWriter) writeSlots() error { + return dw.upsertSlots() } // Upsert to the ethcl.slots table. -func (dw *DatabaseWriter) upsertSlots() { +func (dw *DatabaseWriter) upsertSlots() error { _, err := dw.Db.Exec(context.Background(), UpsertSlotsStmt, dw.DbSlots.Epoch, dw.DbSlots.Slot, dw.DbSlots.BlockRoot, dw.DbSlots.StateRoot, dw.DbSlots.Status) if err != nil { loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.slots table") + return err } + return nil } // Write the information for the signed_beacon_block. -func (dw *DatabaseWriter) writeSignedBeaconBlocks() { - dw.upsertPublicBlocks(dw.DbSignedBeaconBlock.MhKey, dw.rawSignedBeaconBlock) - dw.upsertSignedBeaconBlock() +func (dw *DatabaseWriter) writeSignedBeaconBlocks() error { + err := dw.upsertPublicBlocks(dw.DbSignedBeaconBlock.MhKey, dw.rawSignedBeaconBlock) + if err != nil { + return err + } + err = dw.upsertSignedBeaconBlock() + if err != nil { + return err + } + return nil } // Upsert to public.blocks. -func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) { +func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) error { _, err := dw.Db.Exec(context.Background(), UpsertBlocksStmt, key, data) if err != nil { loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the public.blocks table") + return err } + return nil } // Upsert to the ethcl.signed_beacon_block table. -func (dw *DatabaseWriter) upsertSignedBeaconBlock() { - _, err := dw.Db.Exec(context.Background(), UpsertSignedBeaconBlockStmt, dw.DbSignedBeaconBlock.Slot, dw.DbSignedBeaconBlock.BlockRoot, dw.DbSignedBeaconBlock.ParentBlock, dw.DbSignedBeaconBlock.MhKey) +func (dw *DatabaseWriter) upsertSignedBeaconBlock() error { + _, err := dw.Db.Exec(context.Background(), UpsertSignedBeaconBlockStmt, dw.DbSignedBeaconBlock.Slot, dw.DbSignedBeaconBlock.BlockRoot, dw.DbSignedBeaconBlock.ParentBlock, dw.DbSignedBeaconBlock.Eth1BlockHash, dw.DbSignedBeaconBlock.MhKey) if err != nil { loghelper.LogSlotError(dw.DbSlots.Slot, err).WithFields(log.Fields{"block_root": dw.DbSignedBeaconBlock.BlockRoot}).Error("Unable to write to the slot to the ethcl.signed_beacon_block table") + return err } + return nil } // Write the information for the beacon_state. -func (dw *DatabaseWriter) writeBeaconState() { - dw.upsertPublicBlocks(dw.DbBeaconState.MhKey, dw.rawBeaconState) - dw.upsertBeaconState() +func (dw *DatabaseWriter) writeBeaconState() error { + err := dw.upsertPublicBlocks(dw.DbBeaconState.MhKey, dw.rawBeaconState) + if err != nil { + return err + } + err = dw.upsertBeaconState() + if err != nil { + return err + } + return nil } // Upsert to the ethcl.beacon_state table. -func (dw *DatabaseWriter) upsertBeaconState() { +func (dw *DatabaseWriter) upsertBeaconState() error { _, err := dw.Db.Exec(context.Background(), UpsertBeaconState, dw.DbBeaconState.Slot, dw.DbBeaconState.StateRoot, dw.DbBeaconState.MhKey) if err != nil { loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.beacon_state table") + return err } + return nil } // Update a given slot to be marked as forked. Provide the slot and the latest latestBlockRoot. // We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked. func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) { + slotNum, strErr := strconv.Atoi(slot) + if strErr != nil { + loghelper.LogReorgError(slot, latestBlockRoot, strErr).Error("We can't convert the slot to an int...") + } + forkCount, err := updateForked(db, slot, latestBlockRoot) if err != nil { loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while updating all forks.") - // Add to knownGaps Table + writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg") } proposedCount, err := updateProposed(db, slot, latestBlockRoot) if err != nil { loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while trying to update the proposed slot.") - // Add to knownGaps Table + writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg") } if forkCount > 0 { @@ -187,7 +229,6 @@ func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics * loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{ "forkCount": forkCount, }).Warn("There were no forked rows to update.") - } if proposedCount == 1 { @@ -198,16 +239,19 @@ func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics * loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{ "proposedCount": proposedCount, }).Error("Too many rows were marked as proposed!") + writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg") } else if proposedCount == 0 { var count int err := db.QueryRow(context.Background(), CheckProposedStmt, slot, latestBlockRoot).Scan(count) if err != nil { loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to query proposed rows after reorg.") + writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg") } if count != 1 { loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{ "proposedCount": count, }).Warn("The proposed block was not marked as proposed...") + writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg") } else { loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.") } @@ -246,6 +290,82 @@ func updateProposed(db sql.Database, slot string, latestBlockRoot string) (int64 return count, err } +// A wrapper function to call upsertKnownGaps. This function will break down the range of known_gaos into +// smaller chunks. For example, instead of having an entry of 1-101, if we increment the entries by 10 slots, we would +// have 10 entries as follows: 1-10, 11-20, etc... +func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot int, entryError error, entryProcess string) { + if endSlot-startSlot <= tableIncrement { + kgModel := DbKnownGaps{ + StartSlot: strconv.Itoa(startSlot), + EndSlot: strconv.Itoa(endSlot), + CheckedOut: false, + ReprocessingError: "", + EntryError: entryError.Error(), + EntryProcess: entryProcess, + } + upsertKnownGaps(db, kgModel) + } + totalSlots := endSlot - startSlot + var chunks int + chunks = totalSlots / tableIncrement + if totalSlots%tableIncrement != 0 { + chunks = chunks + 1 + } + + for i := 0; i < chunks; i++ { + var tempStart, tempEnd int + tempStart = startSlot + (i * tableIncrement) + if i+1 == chunks { + tempEnd = endSlot + } else { + tempEnd = startSlot + ((i + 1) * tableIncrement) + } + kgModel := DbKnownGaps{ + StartSlot: strconv.Itoa(tempStart), + EndSlot: strconv.Itoa(tempEnd), + CheckedOut: false, + ReprocessingError: "", + EntryError: entryError.Error(), + EntryProcess: entryProcess, + } + upsertKnownGaps(db, kgModel) + } + +} + +// A function to upsert a single entry to the ethcl.known_gaps table. +func upsertKnownGaps(db sql.Database, knModel DbKnownGaps) { + _, err := db.Exec(context.Background(), UpsertKnownGapsStmt, knModel.StartSlot, knModel.EndSlot, + knModel.CheckedOut, knModel.ReprocessingError, knModel.EntryError, knModel.EntryProcess) + if err != nil { + log.WithFields(log.Fields{ + "err": err, + "startSlot": knModel.StartSlot, + "endSlot": knModel.EndSlot, + }).Fatal("We are unable to write to the ethcl.known_gaps table!!! We will stop the application because of that.") + } + log.WithFields(log.Fields{ + "startSlot": knModel.StartSlot, + "endSlot": knModel.EndSlot, + }).Warn("A new gap has been added to the ethcl.known_gaps table.") +} + +// A function to write the gap between the highest slot in the DB and the first processed slot. +func writeStartUpGaps(db sql.Database, tableIncrement int, firstSlot int) { + var maxSlot int + err := db.QueryRow(context.Background(), QueryHighestSlotStmt).Scan(&maxSlot) + if err != nil { + loghelper.LogError(err).Fatal("Unable to get the max block from the DB. We must close the application or we might have undetected gaps.") + } + + if err != nil { + loghelper.LogError(err).WithFields(log.Fields{ + "maxSlot": maxSlot, + }).Fatal("Unable to get convert max block from DB to int. We must close the application or we might have undetected gaps.") + } + writeKnownGaps(db, tableIncrement, maxSlot, firstSlot, fmt.Errorf(""), "startup") +} + // Dummy function for calculating the mhKey. func calculateMhKey() string { rand.Seed(time.Now().UnixNano()) diff --git a/pkg/beaconclient/models.go b/pkg/beaconclient/models.go index 1cdb443..9c6cc68 100644 --- a/pkg/beaconclient/models.go +++ b/pkg/beaconclient/models.go @@ -47,10 +47,11 @@ type DbSlots struct { // A struct to capture whats being written to ethcl.signed_beacon_block table. type DbSignedBeaconBlock struct { - Slot string // The slot. - BlockRoot string // The block root - ParentBlock string // The parent block root. - MhKey string // The ipld multihash key. + Slot string // The slot. + BlockRoot string // The block root + ParentBlock string // The parent block root. + Eth1BlockHash string // The eth1 block_hash + MhKey string // The ipld multihash key. } @@ -60,3 +61,14 @@ type DbBeaconState struct { StateRoot string // The state root MhKey string // The ipld multihash key. } + +// A structure to capture whats being written to the ethcl.known_gaps table. +type DbKnownGaps struct { + StartSlot string // The start slot for known_gaps, inclusive. + EndSlot string // The end slot for known_gaps, inclusive. + CheckedOut bool // Indicates if any process is currently processing this entry. + ReprocessingError string // The error that occurred when attempting to reprocess these entries. + EntryError string // The error that caused this entry to be added to the table. Could be null. + EntryTime string // The time this range was added to the DB. This can help us catch ranges that have not been processed for a long time due to some error. + EntryProcess string // The entry process that added this process. Potential options are StartUp, Error, ManualEntry, HeadGap. +} diff --git a/pkg/beaconclient/processevents.go b/pkg/beaconclient/processevents.go index 14713e6..1361ac5 100644 --- a/pkg/beaconclient/processevents.go +++ b/pkg/beaconclient/processevents.go @@ -24,6 +24,7 @@ func (bc *BeaconClient) handleReorg() { // This function will handle the latest head event. func (bc *BeaconClient) handleHead() { log.Info("Starting to process head.") + errorSlots := 0 for { head := <-bc.HeadTracking.ProcessCh // Process all the work here. @@ -32,10 +33,20 @@ func (bc *BeaconClient) handleHead() { bc.HeadTracking.ErrorCh <- &SseError{ err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot), } + errorSlots = errorSlots + 1 + continue } - err = processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics) + if errorSlots != 0 && bc.PreviousSlot != 0 { + log.WithFields(log.Fields{ + "lastProcessedSlot": bc.PreviousSlot, + "errorMessages": errorSlots, + }).Warn("We added slots to the knownGaps table because we got bad head messages.") + writeKnownGaps(bc.Db, bc.KnownGapTableIncrement, bc.PreviousSlot, bcSlotsPerEpoch+errorSlots, fmt.Errorf("Bad Head Messages"), "headProcessing") + } + + err = processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement) if err != nil { - loghelper.LogSlotError(head.Slot, err) + loghelper.LogSlotError(head.Slot, err).Error("Unable to process a slot") } log.WithFields(log.Fields{"head": head}).Debug("Received a new head event.") diff --git a/pkg/beaconclient/processslot.go b/pkg/beaconclient/processslot.go index 61dfd85..ead46d1 100644 --- a/pkg/beaconclient/processslot.go +++ b/pkg/beaconclient/processslot.go @@ -25,6 +25,7 @@ var ( } ParentRootUnmarshalError = "Unable to properly unmarshal the ParentRoot field in the SignedBeaconBlock." MissingIdentifiedError = "Can't query state without a set slot or block_root" + MissingEth1Data = "Can't get the Eth1 block_hash" ) type ProcessSlot struct { @@ -55,11 +56,7 @@ type ProcessSlot struct { } // This function will do all the work to process the slot and write it to the DB. -func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics) error { - headOrHistoric = strings.ToLower(headOrHistoric) - if headOrHistoric != "head" && headOrHistoric != "historic" { - return fmt.Errorf("headOrBatch must be either historic or head!") - } +func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) error { ps := &ProcessSlot{ Slot: slot, BlockRoot: blockRoot, @@ -72,33 +69,43 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot // Get the SignedBeaconBlock. err := ps.getSignedBeaconBlock(serverAddress) if err != nil { + writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot") return err } // Get the BeaconState. err = ps.getBeaconState(serverAddress) if err != nil { + writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot") return err } + if ps.HeadOrHistoric == "head" && previousSlot == 0 && previousBlockRoot == "" { + writeStartUpGaps(db, knownGapsTableIncrement, ps.Slot) + } // Get this object ready to write dw := ps.createWriteObjects() // Write the object to the DB. - dw.writeFullSlot() + err = dw.writeFullSlot() + if err != nil { + writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot") + } // Handle any reorgs or skipped slots. - if ps.HeadOrHistoric == "head" { - if previousSlot != 0 && previousBlockRoot != "" { - ps.checkPreviousSlot(previousSlot, previousBlockRoot) - } + headOrHistoric = strings.ToLower(headOrHistoric) + if headOrHistoric != "head" && headOrHistoric != "historic" { + return fmt.Errorf("headOrHistoric must be either historic or head!") + } + if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" { + ps.checkPreviousSlot(previousSlot, previousBlockRoot, knownGapsTableIncrement) } return nil } // Handle a slot that is at head. A wrapper function for calling `handleFullSlot`. -func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics) error { - return processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics) +func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) error { + return processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement) } // Handle a historic slot. A wrapper function for calling `handleFullSlot`. @@ -141,7 +148,11 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string) error { } else if ps.FullSignedBeaconBlock.Block.ParentRoot == nil { loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(ParentRootUnmarshalError) return fmt.Errorf(ParentRootUnmarshalError) + } else if hex.EncodeToString(ps.FullBeaconState.Eth1Data.BlockHash) == "" { + loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(MissingEth1Data) + return fmt.Errorf(MissingEth1Data) } + log.Warn("We received a processing error: ", err) } ps.ParentBlockRoot = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.ParentRoot) return nil @@ -174,7 +185,7 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string) error { } // Check to make sure that the previous block we processed is the parent of the current block. -func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot string) { +func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot string, knownGapsTableIncrement int) { parentRoot := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.ParentRoot) if previousSlot == int(ps.FullBeaconState.Slot) { log.WithFields(log.Fields{ @@ -182,21 +193,21 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str "fork": true, }).Warn("A fork occurred! The previous slot and current slot match.") writeReorgs(ps.Db, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics) - } else if previousSlot-1 != int(ps.FullBeaconState.Slot) { + } else if previousSlot+1 != int(ps.FullBeaconState.Slot) { log.WithFields(log.Fields{ "previousSlot": previousSlot, "currentSlot": ps.FullBeaconState.Slot, }).Error("We skipped a few slots.") + writeKnownGaps(ps.Db, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot)-1, fmt.Errorf("Gaps during head processing"), "headGaps") // Check to see if the slot was skipped. - // Call our batch processing function. + // Call our known_gaps function. } else if previousBlockRoot != parentRoot { log.WithFields(log.Fields{ "previousBlockRoot": previousBlockRoot, "currentBlockParent": parentRoot, }).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.") writeReorgs(ps.Db, strconv.Itoa(previousSlot), parentRoot, ps.Metrics) - // Call our batch processing function. - // Continue with this slot. + writeKnownGaps(ps.Db, 1, ps.Slot-1, ps.Slot-1, fmt.Errorf("Incorrect Parent"), "processSlot") } else { log.Debug("Previous Slot and Current Slot are one distance from each other.") } @@ -210,7 +221,7 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str // Check if head or historic. // 1. BeaconBlock is 404. -// 2. check heck /lighthouse/database/info to make sure the oldest_block_slot == 0 and anchor == null. This indicates that I don't have any gaps in the DB. +// 2. check check /lighthouse/database/info to make sure the oldest_block_slot == 0 and anchor == null. This indicates that I don't have any gaps in the DB. // 3. I query BeaconState for slot X, and get a BeaconState. // 4. Although for good measure you should also check that the head is at a slot >= X using something like /eth/v1/node/syncing/ or /eth/v1/beacon/headers/head func (ps *ProcessSlot) checkMissedSlot() { @@ -254,7 +265,9 @@ func (ps *ProcessSlot) createWriteObjects() *DatabaseWriter { status = "proposed" } - dw := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, status, ps.Metrics) + eth1BlockHash := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.Body.Eth1Data.BlockHash) + + dw := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.Metrics) dw.rawSignedBeaconBlock = ps.SszSignedBeaconBlock dw.rawBeaconState = ps.SszBeaconState