From fd053a240f4068cc16f11300d4d75b8875e2b548 Mon Sep 17 00:00:00 2001 From: Abdul Rabbani Date: Fri, 3 Jun 2022 19:32:08 -0400 Subject: [PATCH] Checkpoint - Check the DB before writing --- cmd/boot.go | 2 +- cmd/capture.go | 4 ++ cmd/head.go | 2 +- cmd/historic.go | 2 +- example.ipld-ethcl-indexer-config.json | 3 +- internal/boot/boot.go | 10 ++-- internal/boot/boot_test.go | 15 ++--- internal/shutdown/shutdown_test.go | 3 +- pkg/beaconclient/beaconclient.go | 4 +- pkg/beaconclient/capturehead_test.go | 4 +- pkg/beaconclient/capturehistoric.go | 6 +- pkg/beaconclient/databasewrite.go | 78 ++++++++++++++++++++------ pkg/beaconclient/healthcheck_test.go | 4 +- pkg/beaconclient/processevents.go | 2 +- pkg/beaconclient/processhistoric.go | 4 +- pkg/beaconclient/processknowngaps.go | 2 +- pkg/beaconclient/processslot.go | 73 ++++++++++++++---------- 17 files changed, 145 insertions(+), 73 deletions(-) diff --git a/cmd/boot.go b/cmd/boot.go index 6f98729..18dc6dd 100644 --- a/cmd/boot.go +++ b/cmd/boot.go @@ -47,7 +47,7 @@ func bootApp() { Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), - viper.GetInt("kg.increment"), "boot", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier")) + viper.GetInt("kg.increment"), "boot", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier"), viper.GetBool("bc.checkDb")) if err != nil { StopApplicationPreBoot(err, Db) } diff --git a/cmd/capture.go b/cmd/capture.go index d4ad6fc..5735b11 100644 --- a/cmd/capture.go +++ b/cmd/capture.go @@ -40,6 +40,7 @@ var ( bcType string bcMaxHistoricProcessWorker int bcUniqueNodeIdentifier int + bcCheckDb bool kgMaxWorker int kgTableIncrement int kgProcessGaps bool @@ -95,6 +96,7 @@ func init() { captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The amount of time to wait between retries while booting the application") captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.historic_process table. Be careful of system memory.") captureCmd.PersistentFlags().IntVarP(&bcUniqueNodeIdentifier, "bc.uniqueNodeIdentifier", "", 0, "The unique identifier of this application. Each application connecting to the DB should have a unique identifier.") + captureCmd.PersistentFlags().BoolVarP(&bcCheckDb, "bc.checkDb", "", true, "Should we check to see if the slot exists in the DB before writing it?") // err = captureCmd.MarkPersistentFlagRequired("bc.address") // exitErr(err) // err = captureCmd.MarkPersistentFlagRequired("bc.port") @@ -149,6 +151,8 @@ func init() { exitErr(err) err = viper.BindPFlag("bc.uniqueNodeIdentifier", captureCmd.PersistentFlags().Lookup("bc.uniqueNodeIdentifier")) exitErr(err) + err = viper.BindPFlag("bc.checkDb", captureCmd.PersistentFlags().Lookup("bc.checkDb")) + exitErr(err) // Here you will define your flags and configuration settings. 
//// Known Gap Specific diff --git a/cmd/head.go b/cmd/head.go index f2dd0e5..fa7aa6a 100644 --- a/cmd/head.go +++ b/cmd/head.go @@ -50,7 +50,7 @@ func startHeadTracking() { Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), - viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier")) + viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier"), viper.GetBool("bc.checkDb")) if err != nil { StopApplicationPreBoot(err, Db) } diff --git a/cmd/historic.go b/cmd/historic.go index e9d8d95..34ad1b5 100644 --- a/cmd/historic.go +++ b/cmd/historic.go @@ -50,7 +50,7 @@ func startHistoricProcessing() { Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), - viper.GetInt("kg.increment"), "historic", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier")) + viper.GetInt("kg.increment"), "historic", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier"), viper.GetBool("bc.checkDb")) if err != nil { StopApplicationPreBoot(err, Db) } diff --git a/example.ipld-ethcl-indexer-config.json b/example.ipld-ethcl-indexer-config.json index 69746ca..96c2c6f 100644 --- a/example.ipld-ethcl-indexer-config.json +++ b/example.ipld-ethcl-indexer-config.json @@ -15,7 +15,8 @@ "bootMaxRetry": 5, "maxHistoricProcessWorker": 2, "connectionProtocol": "http", - "uniqueNodeIdentifier": 100 + "uniqueNodeIdentifier": 100, + "checkDb": true }, "t": { "skipSync": true diff --git a/internal/boot/boot.go b/internal/boot/boot.go index b1e476b..1e42cce 100644 --- a/internal/boot/boot.go +++ b/internal/boot/boot.go @@ -42,11 +42,11 @@ var ( // // 3. Make sure the node is synced, unless disregardSync is true. 
func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, - bcAddress string, bcPort int, bcConnectionProtocol string, bcKgTableIncrement int, disregardSync bool, uniqueNodeIdentifier int) (*beaconclient.BeaconClient, sql.Database, error) { + bcAddress string, bcPort int, bcConnectionProtocol string, bcKgTableIncrement int, disregardSync bool, uniqueNodeIdentifier int, checkDb bool) (*beaconclient.BeaconClient, sql.Database, error) { log.Info("Booting the Application") log.Debug("Creating the Beacon Client") - Bc, err := beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort, bcKgTableIncrement, uniqueNodeIdentifier) + Bc, err := beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort, bcKgTableIncrement, uniqueNodeIdentifier, checkDb) if err != nil { return Bc, nil, err } @@ -86,14 +86,14 @@ func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName // Add retry logic to ensure that we are give the Beacon Client and the DB time to start. func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, bcKgTableIncrement int, - startUpMode string, disregardSync bool, uniqueNodeIdentifier int) (*beaconclient.BeaconClient, sql.Database, error) { + startUpMode string, disregardSync bool, uniqueNodeIdentifier int, checkDb bool) (*beaconclient.BeaconClient, sql.Database, error) { var err error if bcMaxRetry < 0 { i := 0 for { BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, - bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier) + bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier, checkDb) if err != nil { log.WithFields(log.Fields{ "retryNumber": i, @@ -108,7 +108,7 @@ func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int } else { for i := 0; i < bcMaxRetry; i++ { BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, - bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier) + bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier, checkDb) if err != nil { log.WithFields(log.Fields{ "retryNumber": i, diff --git a/internal/boot/boot_test.go b/internal/boot/boot_test.go index ef621ae..fe8933f 100644 --- a/internal/boot/boot_test.go +++ b/internal/boot/boot_test.go @@ -39,51 +39,52 @@ var _ = Describe("Boot", func() { bcBootMaxRetry int = 5 bcKgTableIncrement int = 10 bcUniqueIdentifier int = 100 + bcCheckDb bool = false ) Describe("Booting the application", Label("integration"), func() { Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing head", func() { It("Should connect successfully", func() { - _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier) + _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, 
bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier, bcCheckDb) defer db.Close() Expect(err).ToNot(HaveOccurred()) }) }) Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing historic ", func() { It("Should connect successfully", func() { - _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "historic", true, bcUniqueIdentifier) + _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "historic", true, bcUniqueIdentifier, bcCheckDb) defer db.Close() Expect(err).ToNot(HaveOccurred()) }) }) Context("When the DB and BC are both up and running, and we check for a synced head", func() { It("Should not connect successfully", func() { - _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, bcUniqueIdentifier) + _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, bcUniqueIdentifier, bcCheckDb) defer db.Close() Expect(err).To(HaveOccurred()) }) }) Context("When the DB and BC are both up and running, we skip checking for a synced head, but the unique identifier is 0", func() { It("Should not connect successfully", func() { - _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, 0) + _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, 0, bcCheckDb) defer db.Close() Expect(err).To(HaveOccurred()) }) }) Context("When the DB is running but not the BC", func() { It("Should not connect successfully", func() { - _, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier) + _, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier, bcCheckDb) Expect(err).To(HaveOccurred()) }) }) Context("When the BC is running but not the DB", func() { It("Should not connect successfully", func() { - _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier) + _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier, bcCheckDb) 
Expect(err).To(HaveOccurred()) }) }) Context("When neither the BC or DB are running", func() { It("Should not connect successfully", func() { - _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier) + _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier, bcCheckDb) Expect(err).To(HaveOccurred()) }) }) diff --git a/internal/shutdown/shutdown_test.go b/internal/shutdown/shutdown_test.go index c6fa00e..9dbb94d 100644 --- a/internal/shutdown/shutdown_test.go +++ b/internal/shutdown/shutdown_test.go @@ -50,6 +50,7 @@ var ( bcBootMaxRetry int = 5 bcKgTableIncrement int = 10 bcUniqueIdentifier int = 100 + bcCheckDb bool = false maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second DB sql.Database BC *beaconclient.BeaconClient @@ -62,7 +63,7 @@ var _ = Describe("Shutdown", func() { BeforeEach(func() { ctx = context.Background() BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, - bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier) + bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier, bcCheckDb) notifierCh = make(chan os.Signal, 1) Expect(err).To(BeNil()) }) diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go index e9fd5c0..8f24207 100644 --- a/pkg/beaconclient/beaconclient.go +++ b/pkg/beaconclient/beaconclient.go @@ -51,6 +51,7 @@ type BeaconClient struct { KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry. UniqueNodeIdentifier int // The unique identifier within the cluster of this individual node. KnownGapsProcess KnownGapsProcessing // object keeping track of knowngaps processing + CheckDb bool // Should we check the DB to see if the slot exists before processing it? // Used for Head Tracking @@ -88,7 +89,7 @@ type SseError struct { } // A Function to create the BeaconClient. 
-func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int, bcKgTableIncrement int, uniqueNodeIdentifier int) (*BeaconClient, error) { +func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int, bcKgTableIncrement int, uniqueNodeIdentifier int, checkDb bool) (*BeaconClient, error) { if uniqueNodeIdentifier == 0 { uniqueNodeIdentifier := rand.Int() log.WithField("randomUniqueNodeIdentifier", uniqueNodeIdentifier).Warn("No uniqueNodeIdentifier provided, we are going to use a randomly generated one.") @@ -109,6 +110,7 @@ func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddres ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint), Metrics: metrics, UniqueNodeIdentifier: uniqueNodeIdentifier, + CheckDb: checkDb, //FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint), }, nil } diff --git a/pkg/beaconclient/capturehead_test.go b/pkg/beaconclient/capturehead_test.go index 0856cbe..0561913 100644 --- a/pkg/beaconclient/capturehead_test.go +++ b/pkg/beaconclient/capturehead_test.go @@ -204,6 +204,7 @@ var ( dbDriver: dbDriver, knownGapsTableIncrement: knownGapsTableIncrement, bcUniqueIdentifier: bcUniqueIdentifier, + checkDb: true, } BeaconNodeTester = TestBeaconNode{ @@ -424,6 +425,7 @@ type Config struct { dbDriver string knownGapsTableIncrement int bcUniqueIdentifier int + checkDb bool } ////////////////////////////////////////////////////// @@ -433,7 +435,7 @@ type Config struct { // Must run before each test. We can't use the beforeEach because of the way // Gingko treats race conditions. func setUpTest(config Config, maxSlot string) *beaconclient.BeaconClient { - bc, err := beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port, config.knownGapsTableIncrement, config.bcUniqueIdentifier) + bc, err := beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port, config.knownGapsTableIncrement, config.bcUniqueIdentifier, config.checkDb) Expect(err).ToNot(HaveOccurred()) db, err := postgres.SetupPostgresDb(config.dbHost, config.dbPort, config.dbName, config.dbUser, config.dbPassword, config.dbDriver) Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/beaconclient/capturehistoric.go b/pkg/beaconclient/capturehistoric.go index cd88c90..a569568 100644 --- a/pkg/beaconclient/capturehistoric.go +++ b/pkg/beaconclient/capturehistoric.go @@ -30,7 +30,7 @@ import ( func (bc *BeaconClient) CaptureHistoric(maxWorkers int) []error { log.Info("We are starting the historical processing service.") bc.HistoricalProcess = historicProcessing{db: bc.Db, metrics: bc.Metrics} - errs := handleBatchProcess(maxWorkers, bc.HistoricalProcess, bc.HistoricalProcess.finishProcessing, bc.HistoricalProcess.db, bc.ServerEndpoint, bc.Metrics) + errs := handleBatchProcess(maxWorkers, bc.HistoricalProcess, bc.HistoricalProcess.finishProcessing, bc.HistoricalProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb) log.Debug("Exiting Historical") return errs } @@ -89,7 +89,7 @@ type batchHistoricError struct { // 4. Remove the slot entry from the DB. // // 5. Handle any errors. 
-func handleBatchProcess(maxWorkers int, bp BatchProcessing, finishCh chan int, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics) []error { +func handleBatchProcess(maxWorkers int, bp BatchProcessing, finishCh chan int, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics, checkDb bool) []error { slotsCh := make(chan slotsToProcess) workCh := make(chan int) processedCh := make(chan slotsToProcess) @@ -99,7 +99,7 @@ func handleBatchProcess(maxWorkers int, bp BatchProcessing, finishCh chan int, d // Start workers for w := 1; w <= maxWorkers; w++ { log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting batch processing workers") - go processSlotRangeWorker(workCh, errCh, db, serverEndpoint, metrics) + go processSlotRangeWorker(workCh, errCh, db, serverEndpoint, metrics, checkDb) } // Process all ranges and send each individual slot to the worker. diff --git a/pkg/beaconclient/databasewrite.go b/pkg/beaconclient/databasewrite.go index 5e51fb8..d351f9a 100644 --- a/pkg/beaconclient/databasewrite.go +++ b/pkg/beaconclient/databasewrite.go @@ -24,6 +24,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" + "golang.org/x/sync/errgroup" ) var ( @@ -54,6 +55,14 @@ VALUES ($1, $2) ON CONFLICT (key) DO NOTHING` CheckProposedStmt string = `SELECT slot, block_root FROM ethcl.slots WHERE slot=$1 AND block_root=$2;` + // Check to see if the slot and block_root exist in ethcl.signed_beacon_block + CheckSignedBeaconBlockStmt string = `SELECT slot, block_root + FROM ethcl.signed_beacon_block + WHERE slot=$1 AND block_root=$2` + // Check to see if the slot and state_root exist in ethcl.beacon_state + CheckBeaconStateStmt string = `SELECT slot, state_root + FROM ethcl.beacon_state + WHERE slot=$1 AND state_root=$2` // Used to get a single slot from the table if it exists QueryBySlotStmt string = `SELECT slot FROM ethcl.slots @@ -275,7 +284,7 @@ func (dw *DatabaseWriter) upsertBeaconState() error { // Update a given slot to be marked as forked within a transaction. Provide the slot and the latest latestBlockRoot. // We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked. 
-func transactReorgs(db sql.Database, tx sql.Tx, ctx context.Context, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) {
+func transactReorgs(tx sql.Tx, ctx context.Context, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) {
 	slotNum, strErr := strconv.Atoi(slot)
 	if strErr != nil {
 		loghelper.LogReorgError(slot, latestBlockRoot, strErr).Error("We can't convert the slot to an int...")
@@ -312,19 +321,8 @@ func transactReorgs(db sql.Database, tx sql.Tx, ctx context.Context, slot string
 		}).Error("Too many rows were marked as proposed!")
 		transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics)
 	} else if proposedCount == 0 {
-		var count int
-		err := db.QueryRow(context.Background(), CheckProposedStmt, slot, latestBlockRoot).Scan(count)
-		if err != nil {
-			loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to query proposed rows after reorg.")
-			transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics)
-		} else if count != 1 {
-			loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
-				"proposedCount": count,
-			}).Warn("The proposed block was not marked as proposed...")
-			transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics)
-		} else {
-			loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.")
-		}
+		transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics)
+		loghelper.LogReorg(slot, latestBlockRoot).Warn("No rows were marked as proposed after the reorg; writing the slot to known_gaps.")
 	}
 
 	metrics.IncrementReorgsInsert(1)
@@ -343,7 +341,7 @@ func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *
 			loghelper.LogError(err).Error("We were unable to Rollback a transaction for reorgs")
 		}
 	}()
-	transactReorgs(db, tx, ctx, slot, latestBlockRoot, metrics)
+	transactReorgs(tx, ctx, slot, latestBlockRoot, metrics)
 	if err = tx.Commit(ctx); err != nil {
 		loghelper.LogReorgError(slot, latestBlockRoot, err).Fatal("Unable to execute the transaction for reorgs")
 	}
@@ -520,7 +518,55 @@ func isSlotProcessed(db sql.Database, checkProcessStmt string, slot string) (boo
 	if err != nil {
 		return false, err
 	}
-
+	if row > 0 {
+		return true, nil
+	}
+	return false, nil
+}
+
+// Check to see if this slot is already in the DB. Check ethcl.signed_beacon_block
+// and ethcl.beacon_state. If the slot exists in both tables, return true.
+func IsSlotInDb(db sql.Database, slot string, blockRoot string, stateRoot string) (bool, error) {
+	var (
+		isInBeaconState       bool
+		isInSignedBeaconBlock bool
+		err                   error
+	)
+	errG, _ := errgroup.WithContext(context.Background())
+	errG.Go(func() (err error) {
+		isInBeaconState, err = checkSlotAndRoot(db, CheckBeaconStateStmt, slot, stateRoot)
+		if err != nil {
+			loghelper.LogError(err).Error("Unable to check if the slot and stateroot exist in ethcl.beacon_state")
+		}
+		return err
+	})
+	errG.Go(func() (err error) {
+		isInSignedBeaconBlock, err = checkSlotAndRoot(db, CheckSignedBeaconBlockStmt, slot, blockRoot)
+		if err != nil {
+			loghelper.LogError(err).Error("Unable to check if the slot and block_root exist in ethcl.signed_beacon_block")
+		}
+		return err
+	})
+	if err = errG.Wait(); err != nil {
+		return false, err
+	}
+	if isInBeaconState && isInSignedBeaconBlock {
+		return true, nil
+	}
+	return false, nil
+}
+
+// Provide a statement, slot, and root, and this function will check to see
+// if the slot and root exist in the table.
+func checkSlotAndRoot(db sql.Database, statement, slot, root string) (bool, error) { + processRow, err := db.Exec(context.Background(), statement, slot, root) + if err != nil { + return false, err + } + row, err := processRow.RowsAffected() + if err != nil { + return false, err + } if row > 0 { return true, nil } diff --git a/pkg/beaconclient/healthcheck_test.go b/pkg/beaconclient/healthcheck_test.go index c09e3d8..23d55cc 100644 --- a/pkg/beaconclient/healthcheck_test.go +++ b/pkg/beaconclient/healthcheck_test.go @@ -31,9 +31,9 @@ var _ = Describe("Healthcheck", func() { BeforeEach(func() { var err error - Bc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10, bcUniqueIdentifier) + Bc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10, bcUniqueIdentifier, false) Expect(err).ToNot(HaveOccurred()) - errBc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10, bcUniqueIdentifier) + errBc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10, bcUniqueIdentifier, false) Expect(err).ToNot(HaveOccurred()) }) diff --git a/pkg/beaconclient/processevents.go b/pkg/beaconclient/processevents.go index 7610191..457a116 100644 --- a/pkg/beaconclient/processevents.go +++ b/pkg/beaconclient/processevents.go @@ -60,7 +60,7 @@ func (bc *BeaconClient) handleHead() { log.WithFields(log.Fields{"head": head}).Debug("We are going to start processing the slot.") - go processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement) + go processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement, bc.CheckDb) log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished calling processHeadSlot.") diff --git a/pkg/beaconclient/processhistoric.go b/pkg/beaconclient/processhistoric.go index 3433f77..2e2b995 100644 --- a/pkg/beaconclient/processhistoric.go +++ b/pkg/beaconclient/processhistoric.go @@ -94,10 +94,10 @@ func (hp historicProcessing) releaseDbLocks() error { } // Process the slot range. 
-func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics) { +func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics, checkDb bool) { for slot := range workCh { log.Debug("Handling slot: ", slot) - err, errProcess := handleHistoricSlot(db, serverAddress, slot, metrics) + err, errProcess := handleHistoricSlot(db, serverAddress, slot, metrics, checkDb) errMs := batchHistoricError{ err: err, errProcess: errProcess, diff --git a/pkg/beaconclient/processknowngaps.go b/pkg/beaconclient/processknowngaps.go index 01762da..85f2834 100644 --- a/pkg/beaconclient/processknowngaps.go +++ b/pkg/beaconclient/processknowngaps.go @@ -61,7 +61,7 @@ type KnownGapsProcessing struct { func (bc *BeaconClient) ProcessKnownGaps(maxWorkers int) []error { log.Info("We are starting the known gaps processing service.") bc.KnownGapsProcess = KnownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics, finishProcessing: make(chan int)} - errs := handleBatchProcess(maxWorkers, bc.KnownGapsProcess, bc.KnownGapsProcess.finishProcessing, bc.KnownGapsProcess.db, bc.ServerEndpoint, bc.Metrics) + errs := handleBatchProcess(maxWorkers, bc.KnownGapsProcess, bc.KnownGapsProcess.finishProcessing, bc.KnownGapsProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb) log.Debug("Exiting known gaps processing service") return errs } diff --git a/pkg/beaconclient/processslot.go b/pkg/beaconclient/processslot.go index 6ab3043..e515f5d 100644 --- a/pkg/beaconclient/processslot.go +++ b/pkg/beaconclient/processslot.go @@ -90,10 +90,6 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot Metrics: metrics, } - if checkDb { - // Check the DB to see if this slot exists. - } - g, _ := errgroup.WithContext(context.Background()) vUnmarshalerCh := make(chan *dt.VersionedUnmarshaler, 1) @@ -119,8 +115,23 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot return err, "processSlot" } + finalBlockRoot, finalStateRoot, finalEth1BlockHash, err := ps.provideFinalHash() + if err != nil { + return err, "CalculateBlockRoot" + } + if checkDb { + inDb, err := IsSlotInDb(ps.Db, strconv.Itoa(ps.Slot), finalBlockRoot, finalStateRoot) + if err != nil { + return err, "checkDb" + } + if inDb { + log.WithField("slot", slot).Info("Slot already in the DB.") + return nil, "" + } + } + // Get this object ready to write - dw, err := ps.createWriteObjects() + dw, err := ps.createWriteObjects(finalBlockRoot, finalStateRoot, ps.Status, finalEth1BlockHash) if err != nil { return err, "blockRoot" } @@ -154,20 +165,20 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot } // Handle a slot that is at head. A wrapper function for calling `handleFullSlot`. -func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) { +func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) { // Get the knownGaps at startUp. 
if previousSlot == 0 && previousBlockRoot == "" { writeStartUpGaps(db, knownGapsTableIncrement, slot, metrics) } - err, errReason := processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement, true) + err, errReason := processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement, checkDb) if err != nil { writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics) } } // Handle a historic slot. A wrapper function for calling `handleFullSlot`. -func handleHistoricSlot(db sql.Database, serverAddress string, slot int, metrics *BeaconClientMetrics) (error, string) { - return processFullSlot(db, serverAddress, slot, "", "", 0, "", "historic", metrics, 1, true) +func handleHistoricSlot(db sql.Database, serverAddress string, slot int, metrics *BeaconClientMetrics, checkDb bool) (error, string) { + return processFullSlot(db, serverAddress, slot, "", "", 0, "", "historic", metrics, 1, checkDb) } // Update the SszSignedBeaconBlock and FullSignedBeaconBlock object with their respective values. @@ -256,7 +267,7 @@ func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previou "slot": ps.FullBeaconState.Slot(), "fork": true, }).Warn("A fork occurred! The previous slot and current slot match.") - transactReorgs(ps.Db, tx, ctx, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics) + transactReorgs(tx, ctx, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics) } else if previousSlot > int(ps.FullBeaconState.Slot()) { log.WithFields(log.Fields{ "previousSlot": previousSlot, @@ -273,21 +284,37 @@ func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previou "previousBlockRoot": previousBlockRoot, "currentBlockParent": parentRoot, }).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.") - transactReorgs(ps.Db, tx, ctx, strconv.Itoa(previousSlot), parentRoot, ps.Metrics) + transactReorgs(tx, ctx, strconv.Itoa(previousSlot), parentRoot, ps.Metrics) } else { log.Debug("Previous Slot and Current Slot are one distance from each other.") } } // Transforms all the raw data into DB models that can be written to the DB. 
-func (ps *ProcessSlot) createWriteObjects() (*DatabaseWriter, error) { +func (ps *ProcessSlot) createWriteObjects(blockRoot, stateRoot, status, eth1BlockHash string) (*DatabaseWriter, error) { + + if ps.Status != "" { + status = ps.Status + } else { + status = "proposed" + } + + dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.SszSignedBeaconBlock, ps.SszBeaconState, ps.Metrics) + if err != nil { + return dw, err + } + + return dw, nil +} + +// This function will return the final blockRoot, stateRoot, and eth1BlockHash that will be +// used to write to a DB +func (ps *ProcessSlot) provideFinalHash() (string, string, string, error) { var ( stateRoot string blockRoot string - status string eth1BlockHash string ) - if ps.Status == "skipped" { stateRoot = "" blockRoot = "" @@ -307,24 +334,12 @@ func (ps *ProcessSlot) createWriteObjects() (*DatabaseWriter, error) { rawBlockRoot, err := ps.FullSignedBeaconBlock.Block().HashTreeRoot() //blockRoot, err = queryBlockRoot(blockRootEndpoint, strconv.Itoa(ps.Slot)) if err != nil { - return nil, err + return "", "", "", err } blockRoot = "0x" + hex.EncodeToString(rawBlockRoot[:]) - log.WithFields(log.Fields{"blockRoot": blockRoot}).Debug("Block Root from ssz:") + log.WithFields(log.Fields{"blockRoot": blockRoot}).Debug("Block Root from ssz") } eth1BlockHash = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().Body().Eth1Data().BlockHash) } - - if ps.Status != "" { - status = ps.Status - } else { - status = "proposed" - } - - dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.SszSignedBeaconBlock, ps.SszBeaconState, ps.Metrics) - if err != nil { - return dw, err - } - - return dw, nil + return blockRoot, stateRoot, eth1BlockHash, nil }
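
A minimal sketch (not part of the patch) of how the new IsSlotInDb check is intended to be used outside of processFullSlot. SetupPostgresDb, db.Close, and IsSlotInDb are the calls exercised above; the postgres import path, the connection values, and the slot/root values are placeholders for illustration only.

package main

import (
	"fmt"

	"github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
	"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql/postgres"
)

func main() {
	// Placeholder connection values; substitute the ones your environment uses.
	db, err := postgres.SetupPostgresDb("localhost", 8076, "vulcanize_testing", "vdbm", "password", "pgx")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// IsSlotInDb returns true only when ethcl.signed_beacon_block contains the
	// slot with the given block_root AND ethcl.beacon_state contains the slot
	// with the given state_root, so the caller can skip writing it again.
	blockRoot := "0x..." // hypothetical roots, for illustration only
	stateRoot := "0x..."
	inDb, err := beaconclient.IsSlotInDb(db, "4636672", blockRoot, stateRoot)
	if err != nil {
		panic(err)
	}
	fmt.Println("slot already indexed:", inDb)
}

When bc.checkDb is set to false (the new flag defaults to true), processFullSlot skips this lookup entirely and writes the slot unconditionally, matching the previous behavior.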