Add KnownGaps table and corresponding entries #33

Merged
abdulrabbani00 merged 1 commits from feature/29-batch-processing into develop 2022-05-12 19:44:05 +00:00
11 changed files with 282 additions and 107 deletions

View File

@ -9,7 +9,7 @@ COPY go.sum .
RUN go mod tidy; go mod download RUN go mod tidy; go mod download
COPY . . COPY . .
RUN GCO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -ldflags '-extldflags "-static"' -o ipld-ethcl-indexer . RUN GCO_ENABLED=0 GOOS=linux go build -race -a -installsuffix cgo -ldflags '-extldflags "-static"' -o ipld-ethcl-indexer .
RUN chmod +x ipld-ethcl-indexer RUN chmod +x ipld-ethcl-indexer
FROM frolvlad/alpine-bash:latest FROM frolvlad/alpine-bash:latest

View File

@ -22,6 +22,21 @@ The `database` package allows us to interact with a postgres DB. We utilize the
This package will contain code to interact with the beacon client. This package will contain code to interact with the beacon client.
### Known Gaps
Known Gaps tracking is handled within this package. The columns are as follows:
- StartSlot - The start slot for known_gaps, inclusive.
- EndSlot - The end slot for known_gaps, inclusive.
- CheckedOut - Indicates if any process is currently processing this entry.
- ErrorMessage - Captures any error message that might have occurred when previously processing this entry.
- EntryTime - The time this range was added to the DB. This can help us catch ranges that have not been processed for a long time due to some error.
- EntryProcess - The entry process that added this process. Potential options are StartUp, Error, HeadGap.
- This can help us understand how a specific entry was added. It can be useful for debugging the application.
- StartUp - Gaps found when we started the application.
- Error - Indicates that the entry was added due to an error with processing.
- HeadGap - Indicates that gaps where found when keeping up with Head.
## `pkg/version` ## `pkg/version`
A generic package which can be utilized to easily version our applications. A generic package which can be utilized to easily version our applications.

View File

@ -23,6 +23,7 @@ var (
bcPort int bcPort int
bcConnectionProtocol string bcConnectionProtocol string
maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second
notifierCh chan os.Signal = make(chan os.Signal, 1)
) )
// captureCmd represents the capture command // captureCmd represents the capture command

View File

@ -6,15 +6,19 @@ package cmd
import ( import (
"context" "context"
"os"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/vulcanize/ipld-ethcl-indexer/internal/boot" "github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
"github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown" "github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
) )
var (
kgTableIncrement int
)
// headCmd represents the head command // headCmd represents the head command
var headCmd = &cobra.Command{ var headCmd = &cobra.Command{
Use: "head", Use: "head",
@ -37,10 +41,9 @@ func startHeadTracking() {
} }
// Capture head blocks // Capture head blocks
go BC.CaptureHead() go BC.CaptureHead(kgTableIncrement)
// Shutdown when the time is right. // Shutdown when the time is right.
notifierCh := make(chan os.Signal, 1)
err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC) err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC)
if err != nil { if err != nil {
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!") loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
@ -53,13 +56,8 @@ func startHeadTracking() {
func init() { func init() {
captureCmd.AddCommand(headCmd) captureCmd.AddCommand(headCmd)
// Here you will define your flags and configuration settings. // Known Gaps specific
captureCmd.PersistentFlags().IntVarP(&kgTableIncrement, "kg.increment", "", 10000, "The max slots within a single entry to the known_gaps table.")
// Cobra supports Persistent Flags which will work for this command err := viper.BindPFlag("kg.increment", captureCmd.PersistentFlags().Lookup("kg.increment"))
// and all subcommands, e.g.: exitErr(err)
// headCmd.PersistentFlags().String("foo", "", "A help for foo")
// Cobra supports local flags which will only run when this command
// is called directly, e.g.:
// headCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
} }

View File

@ -34,6 +34,7 @@ type BeaconClient struct {
PerformHistoricalProcessing bool // Should we perform historical processing? PerformHistoricalProcessing bool // Should we perform historical processing?
Db sql.Database // Database object used for reads and writes. Db sql.Database // Database object used for reads and writes.
Metrics *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics. Metrics *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics.
KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry.
// Used for Head Tracking // Used for Head Tracking
PerformHeadTracking bool // Should we track head? PerformHeadTracking bool // Should we track head?

View File

@ -10,7 +10,8 @@ import (
) )
// This function will perform all the heavy lifting for tracking the head of the chain. // This function will perform all the heavy lifting for tracking the head of the chain.
func (bc *BeaconClient) CaptureHead() { func (bc *BeaconClient) CaptureHead(knownGapsTableIncrement int) {
bc.KnownGapTableIncrement = knownGapsTableIncrement
log.Info("We are tracking the head of the chain.") log.Info("We are tracking the head of the chain.")
//bc.tempHelper() //bc.tempHelper()
go bc.handleHead() go bc.handleHead()

View File

@ -56,6 +56,7 @@ var _ = Describe("Capturehead", func() {
dbPassword string = "password" dbPassword string = "password"
dbDriver string = "pgx" dbDriver string = "pgx"
dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42" dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42"
knownGapsTableIncrement int = 10000
) )
BeforeEach(func() { BeforeEach(func() {
@ -156,6 +157,7 @@ var _ = Describe("Capturehead", func() {
dbUser: dbUser, dbUser: dbUser,
dbPassword: dbPassword, dbPassword: dbPassword,
dbDriver: dbDriver, dbDriver: dbDriver,
knownGapsTableIncrement: knownGapsTableIncrement,
} }
BeaconNodeTester = TestBeaconNode{ BeaconNodeTester = TestBeaconNode{
@ -244,6 +246,7 @@ type Config struct {
dbUser string dbUser string
dbPassword string dbPassword string
dbDriver string dbDriver string
knownGapsTableIncrement int
} }
////////////////////////////////////////////////////// //////////////////////////////////////////////////////
@ -307,7 +310,7 @@ func queryDbSlotAndBlock(db sql.Database, querySlot string, queryBlockRoot strin
// A function that will remove all entries from the ethcl tables for you. // A function that will remove all entries from the ethcl tables for you.
func clearEthclDbTables(db sql.Database) { func clearEthclDbTables(db sql.Database) {
deleteQueries := []string{"DELETE FROM ethcl.slots;", "DELETE FROM ethcl.signed_beacon_block;", "DELETE FROM ethcl.beacon_state;", "DELETE FROM ethcl.batch_processing;"} deleteQueries := []string{"DELETE FROM ethcl.slots;", "DELETE FROM ethcl.signed_beacon_block;", "DELETE FROM ethcl.beacon_state;", "DELETE FROM ethcl.known_gaps;"}
for _, queries := range deleteQueries { for _, queries := range deleteQueries {
_, err := db.Exec(context.Background(), queries) _, err := db.Exec(context.Background(), queries)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
@ -436,7 +439,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(firstHead beaconclient.Head, second
tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot) tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
go bc.CaptureHead() go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
log.Info("Sending Phase0 Messages to BeaconClient") log.Info("Sending Phase0 Messages to BeaconClient")
@ -488,7 +491,7 @@ func (tbc TestBeaconNode) testProcessBlock(head beaconclient.Head, epoch int) {
tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot) tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
go bc.CaptureHead() go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
sendHeadMessage(bc, head) sendHeadMessage(bc, head)
validateSlot(bc, &head, epoch, "proposed") validateSlot(bc, &head, epoch, "proposed")
@ -501,7 +504,7 @@ func (tbc TestBeaconNode) testMultipleHead(firstHead beaconclient.Head, secondHe
tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot) tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
go bc.CaptureHead() go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
sendHeadMessage(bc, firstHead) sendHeadMessage(bc, firstHead)

View File

@ -19,8 +19,8 @@ INSERT INTO ethcl.slots (epoch, slot, block_root, state_root, status)
VALUES ($1, $2, $3, $4, $5) ON CONFLICT (slot, block_root) DO NOTHING` VALUES ($1, $2, $3, $4, $5) ON CONFLICT (slot, block_root) DO NOTHING`
// Statement to upsert to the ethcl.signed_beacon_blocks table. // Statement to upsert to the ethcl.signed_beacon_blocks table.
UpsertSignedBeaconBlockStmt string = ` UpsertSignedBeaconBlockStmt string = `
INSERT INTO ethcl.signed_beacon_block (slot, block_root, parent_block_root, mh_key) INSERT INTO ethcl.signed_beacon_block (slot, block_root, parent_block_root, eth1_block_hash, mh_key)
VALUES ($1, $2, $3, $4) ON CONFLICT (slot, block_root) DO NOTHING` VALUES ($1, $2, $3, $4, $5) ON CONFLICT (slot, block_root) DO NOTHING`
// Statement to upsert to the ethcl.beacon_state table. // Statement to upsert to the ethcl.beacon_state table.
UpsertBeaconState string = ` UpsertBeaconState string = `
INSERT INTO ethcl.beacon_state (slot, state_root, mh_key) INSERT INTO ethcl.beacon_state (slot, state_root, mh_key)
@ -40,6 +40,11 @@ VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`
CheckProposedStmt string = `SELECT slot, block_root CheckProposedStmt string = `SELECT slot, block_root
FROM ethcl.slots FROM ethcl.slots
WHERE slot=$1 AND block_root=$2;` WHERE slot=$1 AND block_root=$2;`
// Statement to insert known_gaps. We don't pass in timestamp, we let the server take care of that one.
UpsertKnownGapsStmt string = `
INSERT INTO ethcl.known_gaps (start_slot, end_slot, checked_out, reprocessing_error, entry_error, entry_process)
VALUES ($1, $2, $3, $4, $5, $6) on CONFLICT (start_slot, end_slot) DO NOTHING`
QueryHighestSlotStmt string = "SELECT COALESCE(MAX(slot), 0) FROM ethcl.slots"
) )
// Put all functionality to prepare the write object // Put all functionality to prepare the write object
@ -55,13 +60,13 @@ type DatabaseWriter struct {
rawSignedBeaconBlock []byte rawSignedBeaconBlock []byte
} }
func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string, status string, metrics *BeaconClientMetrics) *DatabaseWriter { func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string, eth1BlockHash string, status string, metrics *BeaconClientMetrics) *DatabaseWriter {
dw := &DatabaseWriter{ dw := &DatabaseWriter{
Db: db, Db: db,
Metrics: metrics, Metrics: metrics,
} }
dw.prepareSlotsModel(slot, stateRoot, blockRoot, status) dw.prepareSlotsModel(slot, stateRoot, blockRoot, status)
dw.prepareSignedBeaconBlockModel(slot, blockRoot, parentBlockRoot) dw.prepareSignedBeaconBlockModel(slot, blockRoot, parentBlockRoot, eth1BlockHash)
dw.prepareBeaconStateModel(slot, stateRoot) dw.prepareBeaconStateModel(slot, stateRoot)
return dw return dw
} }
@ -82,11 +87,12 @@ func (dw *DatabaseWriter) prepareSlotsModel(slot int, stateRoot string, blockRoo
} }
// Create the model for the ethcl.signed_beacon_block table. // Create the model for the ethcl.signed_beacon_block table.
func (dw *DatabaseWriter) prepareSignedBeaconBlockModel(slot int, blockRoot string, parentBlockRoot string) { func (dw *DatabaseWriter) prepareSignedBeaconBlockModel(slot int, blockRoot string, parentBlockRoot string, eth1BlockHash string) {
dw.DbSignedBeaconBlock = &DbSignedBeaconBlock{ dw.DbSignedBeaconBlock = &DbSignedBeaconBlock{
Slot: strconv.Itoa(slot), Slot: strconv.Itoa(slot),
BlockRoot: blockRoot, BlockRoot: blockRoot,
ParentBlock: parentBlockRoot, ParentBlock: parentBlockRoot,
Eth1BlockHash: eth1BlockHash,
MhKey: calculateMhKey(), MhKey: calculateMhKey(),
} }
log.Debug("dw.DbSignedBeaconBlock: ", dw.DbSignedBeaconBlock) log.Debug("dw.DbSignedBeaconBlock: ", dw.DbSignedBeaconBlock)
@ -104,79 +110,115 @@ func (dw *DatabaseWriter) prepareBeaconStateModel(slot int, stateRoot string) {
} }
// Write all the data for a given slot. // Write all the data for a given slot.
func (dw *DatabaseWriter) writeFullSlot() { func (dw *DatabaseWriter) writeFullSlot() error {
// Add errors for each function call // Add errors for each function call
// If an error occurs, write to knownGaps table. // If an error occurs, write to knownGaps table.
dw.writeSlots() err := dw.writeSlots()
dw.writeSignedBeaconBlocks() if err != nil {
dw.writeBeaconState() return err
}
err = dw.writeSignedBeaconBlocks()
if err != nil {
return err
}
err = dw.writeBeaconState()
if err != nil {
return err
}
dw.Metrics.IncrementHeadTrackingInserts(1) dw.Metrics.IncrementHeadTrackingInserts(1)
return nil
} }
// Write the information for the generic slots table. For now this is only one function. // Write the information for the generic slots table. For now this is only one function.
// But in the future if we need to incorporate any FK's or perform any actions to write to the // But in the future if we need to incorporate any FK's or perform any actions to write to the
// slots table we can do it all here. // slots table we can do it all here.
func (dw *DatabaseWriter) writeSlots() { func (dw *DatabaseWriter) writeSlots() error {
dw.upsertSlots() return dw.upsertSlots()
} }
// Upsert to the ethcl.slots table. // Upsert to the ethcl.slots table.
func (dw *DatabaseWriter) upsertSlots() { func (dw *DatabaseWriter) upsertSlots() error {
_, err := dw.Db.Exec(context.Background(), UpsertSlotsStmt, dw.DbSlots.Epoch, dw.DbSlots.Slot, dw.DbSlots.BlockRoot, dw.DbSlots.StateRoot, dw.DbSlots.Status) _, err := dw.Db.Exec(context.Background(), UpsertSlotsStmt, dw.DbSlots.Epoch, dw.DbSlots.Slot, dw.DbSlots.BlockRoot, dw.DbSlots.StateRoot, dw.DbSlots.Status)
if err != nil { if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.slots table") loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.slots table")
return err
} }
return nil
} }
// Write the information for the signed_beacon_block. // Write the information for the signed_beacon_block.
func (dw *DatabaseWriter) writeSignedBeaconBlocks() { func (dw *DatabaseWriter) writeSignedBeaconBlocks() error {
dw.upsertPublicBlocks(dw.DbSignedBeaconBlock.MhKey, dw.rawSignedBeaconBlock) err := dw.upsertPublicBlocks(dw.DbSignedBeaconBlock.MhKey, dw.rawSignedBeaconBlock)
dw.upsertSignedBeaconBlock() if err != nil {
return err
}
err = dw.upsertSignedBeaconBlock()
if err != nil {
return err
}
return nil
} }
// Upsert to public.blocks. // Upsert to public.blocks.
func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) { func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) error {
_, err := dw.Db.Exec(context.Background(), UpsertBlocksStmt, key, data) _, err := dw.Db.Exec(context.Background(), UpsertBlocksStmt, key, data)
if err != nil { if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the public.blocks table") loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the public.blocks table")
return err
} }
return nil
} }
// Upsert to the ethcl.signed_beacon_block table. // Upsert to the ethcl.signed_beacon_block table.
func (dw *DatabaseWriter) upsertSignedBeaconBlock() { func (dw *DatabaseWriter) upsertSignedBeaconBlock() error {
_, err := dw.Db.Exec(context.Background(), UpsertSignedBeaconBlockStmt, dw.DbSignedBeaconBlock.Slot, dw.DbSignedBeaconBlock.BlockRoot, dw.DbSignedBeaconBlock.ParentBlock, dw.DbSignedBeaconBlock.MhKey) _, err := dw.Db.Exec(context.Background(), UpsertSignedBeaconBlockStmt, dw.DbSignedBeaconBlock.Slot, dw.DbSignedBeaconBlock.BlockRoot, dw.DbSignedBeaconBlock.ParentBlock, dw.DbSignedBeaconBlock.Eth1BlockHash, dw.DbSignedBeaconBlock.MhKey)
if err != nil { if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).WithFields(log.Fields{"block_root": dw.DbSignedBeaconBlock.BlockRoot}).Error("Unable to write to the slot to the ethcl.signed_beacon_block table") loghelper.LogSlotError(dw.DbSlots.Slot, err).WithFields(log.Fields{"block_root": dw.DbSignedBeaconBlock.BlockRoot}).Error("Unable to write to the slot to the ethcl.signed_beacon_block table")
return err
} }
return nil
} }
// Write the information for the beacon_state. // Write the information for the beacon_state.
func (dw *DatabaseWriter) writeBeaconState() { func (dw *DatabaseWriter) writeBeaconState() error {
dw.upsertPublicBlocks(dw.DbBeaconState.MhKey, dw.rawBeaconState) err := dw.upsertPublicBlocks(dw.DbBeaconState.MhKey, dw.rawBeaconState)
dw.upsertBeaconState() if err != nil {
return err
}
err = dw.upsertBeaconState()
if err != nil {
return err
}
return nil
} }
// Upsert to the ethcl.beacon_state table. // Upsert to the ethcl.beacon_state table.
func (dw *DatabaseWriter) upsertBeaconState() { func (dw *DatabaseWriter) upsertBeaconState() error {
_, err := dw.Db.Exec(context.Background(), UpsertBeaconState, dw.DbBeaconState.Slot, dw.DbBeaconState.StateRoot, dw.DbBeaconState.MhKey) _, err := dw.Db.Exec(context.Background(), UpsertBeaconState, dw.DbBeaconState.Slot, dw.DbBeaconState.StateRoot, dw.DbBeaconState.MhKey)
if err != nil { if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.beacon_state table") loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.beacon_state table")
return err
} }
return nil
} }
// Update a given slot to be marked as forked. Provide the slot and the latest latestBlockRoot. // Update a given slot to be marked as forked. Provide the slot and the latest latestBlockRoot.
// We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked. // We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked.
func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) { func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) {
slotNum, strErr := strconv.Atoi(slot)
if strErr != nil {
loghelper.LogReorgError(slot, latestBlockRoot, strErr).Error("We can't convert the slot to an int...")
}
forkCount, err := updateForked(db, slot, latestBlockRoot) forkCount, err := updateForked(db, slot, latestBlockRoot)
if err != nil { if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while updating all forks.") loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while updating all forks.")
// Add to knownGaps Table writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
} }
proposedCount, err := updateProposed(db, slot, latestBlockRoot) proposedCount, err := updateProposed(db, slot, latestBlockRoot)
if err != nil { if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while trying to update the proposed slot.") loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while trying to update the proposed slot.")
// Add to knownGaps Table writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
} }
if forkCount > 0 { if forkCount > 0 {
@ -187,7 +229,6 @@ func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{ loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"forkCount": forkCount, "forkCount": forkCount,
}).Warn("There were no forked rows to update.") }).Warn("There were no forked rows to update.")
} }
if proposedCount == 1 { if proposedCount == 1 {
@ -198,16 +239,19 @@ func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{ loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"proposedCount": proposedCount, "proposedCount": proposedCount,
}).Error("Too many rows were marked as proposed!") }).Error("Too many rows were marked as proposed!")
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
} else if proposedCount == 0 { } else if proposedCount == 0 {
var count int var count int
err := db.QueryRow(context.Background(), CheckProposedStmt, slot, latestBlockRoot).Scan(count) err := db.QueryRow(context.Background(), CheckProposedStmt, slot, latestBlockRoot).Scan(count)
if err != nil { if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to query proposed rows after reorg.") loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to query proposed rows after reorg.")
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
} }
if count != 1 { if count != 1 {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{ loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"proposedCount": count, "proposedCount": count,
}).Warn("The proposed block was not marked as proposed...") }).Warn("The proposed block was not marked as proposed...")
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
} else { } else {
loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.") loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.")
} }
@ -246,6 +290,82 @@ func updateProposed(db sql.Database, slot string, latestBlockRoot string) (int64
return count, err return count, err
} }
// A wrapper function to call upsertKnownGaps. This function will break down the range of known_gaos into
// smaller chunks. For example, instead of having an entry of 1-101, if we increment the entries by 10 slots, we would
// have 10 entries as follows: 1-10, 11-20, etc...
func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot int, entryError error, entryProcess string) {
if endSlot-startSlot <= tableIncrement {
kgModel := DbKnownGaps{
StartSlot: strconv.Itoa(startSlot),
EndSlot: strconv.Itoa(endSlot),
CheckedOut: false,
ReprocessingError: "",
EntryError: entryError.Error(),
EntryProcess: entryProcess,
}
upsertKnownGaps(db, kgModel)
}
totalSlots := endSlot - startSlot
var chunks int
chunks = totalSlots / tableIncrement
if totalSlots%tableIncrement != 0 {
chunks = chunks + 1
}
for i := 0; i < chunks; i++ {
var tempStart, tempEnd int
tempStart = startSlot + (i * tableIncrement)
if i+1 == chunks {
tempEnd = endSlot
} else {
tempEnd = startSlot + ((i + 1) * tableIncrement)
}
kgModel := DbKnownGaps{
StartSlot: strconv.Itoa(tempStart),
EndSlot: strconv.Itoa(tempEnd),
CheckedOut: false,
ReprocessingError: "",
EntryError: entryError.Error(),
EntryProcess: entryProcess,
}
upsertKnownGaps(db, kgModel)
}
}
// A function to upsert a single entry to the ethcl.known_gaps table.
func upsertKnownGaps(db sql.Database, knModel DbKnownGaps) {
_, err := db.Exec(context.Background(), UpsertKnownGapsStmt, knModel.StartSlot, knModel.EndSlot,
knModel.CheckedOut, knModel.ReprocessingError, knModel.EntryError, knModel.EntryProcess)
if err != nil {
log.WithFields(log.Fields{
"err": err,
"startSlot": knModel.StartSlot,
"endSlot": knModel.EndSlot,
}).Fatal("We are unable to write to the ethcl.known_gaps table!!! We will stop the application because of that.")
}
log.WithFields(log.Fields{
"startSlot": knModel.StartSlot,
"endSlot": knModel.EndSlot,
}).Warn("A new gap has been added to the ethcl.known_gaps table.")
}
// A function to write the gap between the highest slot in the DB and the first processed slot.
func writeStartUpGaps(db sql.Database, tableIncrement int, firstSlot int) {
var maxSlot int
err := db.QueryRow(context.Background(), QueryHighestSlotStmt).Scan(&maxSlot)
if err != nil {
loghelper.LogError(err).Fatal("Unable to get the max block from the DB. We must close the application or we might have undetected gaps.")
}
if err != nil {
loghelper.LogError(err).WithFields(log.Fields{
"maxSlot": maxSlot,
}).Fatal("Unable to get convert max block from DB to int. We must close the application or we might have undetected gaps.")
}
writeKnownGaps(db, tableIncrement, maxSlot, firstSlot, fmt.Errorf(""), "startup")
}
// Dummy function for calculating the mhKey. // Dummy function for calculating the mhKey.
func calculateMhKey() string { func calculateMhKey() string {
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())

View File

@ -50,6 +50,7 @@ type DbSignedBeaconBlock struct {
Slot string // The slot. Slot string // The slot.
BlockRoot string // The block root BlockRoot string // The block root
ParentBlock string // The parent block root. ParentBlock string // The parent block root.
Eth1BlockHash string // The eth1 block_hash
MhKey string // The ipld multihash key. MhKey string // The ipld multihash key.
} }
@ -60,3 +61,14 @@ type DbBeaconState struct {
StateRoot string // The state root StateRoot string // The state root
MhKey string // The ipld multihash key. MhKey string // The ipld multihash key.
} }
// A structure to capture whats being written to the ethcl.known_gaps table.
type DbKnownGaps struct {
StartSlot string // The start slot for known_gaps, inclusive.
EndSlot string // The end slot for known_gaps, inclusive.
CheckedOut bool // Indicates if any process is currently processing this entry.
ReprocessingError string // The error that occurred when attempting to reprocess these entries.
EntryError string // The error that caused this entry to be added to the table. Could be null.
EntryTime string // The time this range was added to the DB. This can help us catch ranges that have not been processed for a long time due to some error.
EntryProcess string // The entry process that added this process. Potential options are StartUp, Error, ManualEntry, HeadGap.
}

View File

@ -24,6 +24,7 @@ func (bc *BeaconClient) handleReorg() {
// This function will handle the latest head event. // This function will handle the latest head event.
func (bc *BeaconClient) handleHead() { func (bc *BeaconClient) handleHead() {
log.Info("Starting to process head.") log.Info("Starting to process head.")
errorSlots := 0
for { for {
head := <-bc.HeadTracking.ProcessCh head := <-bc.HeadTracking.ProcessCh
// Process all the work here. // Process all the work here.
@ -32,10 +33,20 @@ func (bc *BeaconClient) handleHead() {
bc.HeadTracking.ErrorCh <- &SseError{ bc.HeadTracking.ErrorCh <- &SseError{
err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot), err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot),
} }
errorSlots = errorSlots + 1
continue
} }
err = processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics) if errorSlots != 0 && bc.PreviousSlot != 0 {
log.WithFields(log.Fields{
"lastProcessedSlot": bc.PreviousSlot,
"errorMessages": errorSlots,
}).Warn("We added slots to the knownGaps table because we got bad head messages.")
writeKnownGaps(bc.Db, bc.KnownGapTableIncrement, bc.PreviousSlot, bcSlotsPerEpoch+errorSlots, fmt.Errorf("Bad Head Messages"), "headProcessing")
}
err = processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement)
if err != nil { if err != nil {
loghelper.LogSlotError(head.Slot, err) loghelper.LogSlotError(head.Slot, err).Error("Unable to process a slot")
} }
log.WithFields(log.Fields{"head": head}).Debug("Received a new head event.") log.WithFields(log.Fields{"head": head}).Debug("Received a new head event.")

View File

@ -25,6 +25,7 @@ var (
} }
ParentRootUnmarshalError = "Unable to properly unmarshal the ParentRoot field in the SignedBeaconBlock." ParentRootUnmarshalError = "Unable to properly unmarshal the ParentRoot field in the SignedBeaconBlock."
MissingIdentifiedError = "Can't query state without a set slot or block_root" MissingIdentifiedError = "Can't query state without a set slot or block_root"
MissingEth1Data = "Can't get the Eth1 block_hash"
) )
type ProcessSlot struct { type ProcessSlot struct {
@ -55,11 +56,7 @@ type ProcessSlot struct {
} }
// This function will do all the work to process the slot and write it to the DB. // This function will do all the work to process the slot and write it to the DB.
func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics) error { func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) error {
headOrHistoric = strings.ToLower(headOrHistoric)
if headOrHistoric != "head" && headOrHistoric != "historic" {
return fmt.Errorf("headOrBatch must be either historic or head!")
}
ps := &ProcessSlot{ ps := &ProcessSlot{
Slot: slot, Slot: slot,
BlockRoot: blockRoot, BlockRoot: blockRoot,
@ -72,33 +69,43 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
// Get the SignedBeaconBlock. // Get the SignedBeaconBlock.
err := ps.getSignedBeaconBlock(serverAddress) err := ps.getSignedBeaconBlock(serverAddress)
if err != nil { if err != nil {
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot")
return err return err
} }
// Get the BeaconState. // Get the BeaconState.
err = ps.getBeaconState(serverAddress) err = ps.getBeaconState(serverAddress)
if err != nil { if err != nil {
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot")
return err return err
} }
if ps.HeadOrHistoric == "head" && previousSlot == 0 && previousBlockRoot == "" {
writeStartUpGaps(db, knownGapsTableIncrement, ps.Slot)
}
// Get this object ready to write // Get this object ready to write
dw := ps.createWriteObjects() dw := ps.createWriteObjects()
// Write the object to the DB. // Write the object to the DB.
dw.writeFullSlot() err = dw.writeFullSlot()
if err != nil {
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot")
}
// Handle any reorgs or skipped slots. // Handle any reorgs or skipped slots.
if ps.HeadOrHistoric == "head" { headOrHistoric = strings.ToLower(headOrHistoric)
if previousSlot != 0 && previousBlockRoot != "" { if headOrHistoric != "head" && headOrHistoric != "historic" {
ps.checkPreviousSlot(previousSlot, previousBlockRoot) return fmt.Errorf("headOrHistoric must be either historic or head!")
} }
if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" {
ps.checkPreviousSlot(previousSlot, previousBlockRoot, knownGapsTableIncrement)
} }
return nil return nil
} }
// Handle a slot that is at head. A wrapper function for calling `handleFullSlot`. // Handle a slot that is at head. A wrapper function for calling `handleFullSlot`.
func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics) error { func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) error {
return processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics) return processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement)
} }
// Handle a historic slot. A wrapper function for calling `handleFullSlot`. // Handle a historic slot. A wrapper function for calling `handleFullSlot`.
@ -141,7 +148,11 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string) error {
} else if ps.FullSignedBeaconBlock.Block.ParentRoot == nil { } else if ps.FullSignedBeaconBlock.Block.ParentRoot == nil {
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(ParentRootUnmarshalError) loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(ParentRootUnmarshalError)
return fmt.Errorf(ParentRootUnmarshalError) return fmt.Errorf(ParentRootUnmarshalError)
} else if hex.EncodeToString(ps.FullBeaconState.Eth1Data.BlockHash) == "" {
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(MissingEth1Data)
return fmt.Errorf(MissingEth1Data)
} }
log.Warn("We received a processing error: ", err)
} }
ps.ParentBlockRoot = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.ParentRoot) ps.ParentBlockRoot = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.ParentRoot)
return nil return nil
@ -174,7 +185,7 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string) error {
} }
// Check to make sure that the previous block we processed is the parent of the current block. // Check to make sure that the previous block we processed is the parent of the current block.
func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot string) { func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot string, knownGapsTableIncrement int) {
parentRoot := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.ParentRoot) parentRoot := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.ParentRoot)
if previousSlot == int(ps.FullBeaconState.Slot) { if previousSlot == int(ps.FullBeaconState.Slot) {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
@ -182,21 +193,21 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
"fork": true, "fork": true,
}).Warn("A fork occurred! The previous slot and current slot match.") }).Warn("A fork occurred! The previous slot and current slot match.")
writeReorgs(ps.Db, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics) writeReorgs(ps.Db, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics)
} else if previousSlot-1 != int(ps.FullBeaconState.Slot) { } else if previousSlot+1 != int(ps.FullBeaconState.Slot) {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"previousSlot": previousSlot, "previousSlot": previousSlot,
"currentSlot": ps.FullBeaconState.Slot, "currentSlot": ps.FullBeaconState.Slot,
}).Error("We skipped a few slots.") }).Error("We skipped a few slots.")
writeKnownGaps(ps.Db, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot)-1, fmt.Errorf("Gaps during head processing"), "headGaps")
// Check to see if the slot was skipped. // Check to see if the slot was skipped.
// Call our batch processing function. // Call our known_gaps function.
} else if previousBlockRoot != parentRoot { } else if previousBlockRoot != parentRoot {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"previousBlockRoot": previousBlockRoot, "previousBlockRoot": previousBlockRoot,
"currentBlockParent": parentRoot, "currentBlockParent": parentRoot,
}).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.") }).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.")
writeReorgs(ps.Db, strconv.Itoa(previousSlot), parentRoot, ps.Metrics) writeReorgs(ps.Db, strconv.Itoa(previousSlot), parentRoot, ps.Metrics)
// Call our batch processing function. writeKnownGaps(ps.Db, 1, ps.Slot-1, ps.Slot-1, fmt.Errorf("Incorrect Parent"), "processSlot")
// Continue with this slot.
} else { } else {
log.Debug("Previous Slot and Current Slot are one distance from each other.") log.Debug("Previous Slot and Current Slot are one distance from each other.")
} }
@ -210,7 +221,7 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
// Check if head or historic. // Check if head or historic.
// 1. BeaconBlock is 404. // 1. BeaconBlock is 404.
// 2. check heck /lighthouse/database/info to make sure the oldest_block_slot == 0 and anchor == null. This indicates that I don't have any gaps in the DB. // 2. check check /lighthouse/database/info to make sure the oldest_block_slot == 0 and anchor == null. This indicates that I don't have any gaps in the DB.
// 3. I query BeaconState for slot X, and get a BeaconState. // 3. I query BeaconState for slot X, and get a BeaconState.
// 4. Although for good measure you should also check that the head is at a slot >= X using something like /eth/v1/node/syncing/ or /eth/v1/beacon/headers/head // 4. Although for good measure you should also check that the head is at a slot >= X using something like /eth/v1/node/syncing/ or /eth/v1/beacon/headers/head
func (ps *ProcessSlot) checkMissedSlot() { func (ps *ProcessSlot) checkMissedSlot() {
@ -254,7 +265,9 @@ func (ps *ProcessSlot) createWriteObjects() *DatabaseWriter {
status = "proposed" status = "proposed"
} }
dw := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, status, ps.Metrics) eth1BlockHash := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.Body.Eth1Data.BlockHash)
dw := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.Metrics)
dw.rawSignedBeaconBlock = ps.SszSignedBeaconBlock dw.rawSignedBeaconBlock = ps.SszSignedBeaconBlock
dw.rawBeaconState = ps.SszBeaconState dw.rawBeaconState = ps.SszBeaconState