diff --git a/cmd/boot.go b/cmd/boot.go index 6f98729..18dc6dd 100644 --- a/cmd/boot.go +++ b/cmd/boot.go @@ -47,7 +47,7 @@ func bootApp() { Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), - viper.GetInt("kg.increment"), "boot", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier")) + viper.GetInt("kg.increment"), "boot", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier"), viper.GetBool("bc.checkDb")) if err != nil { StopApplicationPreBoot(err, Db) } diff --git a/cmd/capture.go b/cmd/capture.go index d4ad6fc..5735b11 100644 --- a/cmd/capture.go +++ b/cmd/capture.go @@ -40,6 +40,7 @@ var ( bcType string bcMaxHistoricProcessWorker int bcUniqueNodeIdentifier int + bcCheckDb bool kgMaxWorker int kgTableIncrement int kgProcessGaps bool @@ -95,6 +96,7 @@ func init() { captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The amount of time to wait between retries while booting the application") captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.historic_process table. Be careful of system memory.") captureCmd.PersistentFlags().IntVarP(&bcUniqueNodeIdentifier, "bc.uniqueNodeIdentifier", "", 0, "The unique identifier of this application. Each application connecting to the DB should have a unique identifier.") + captureCmd.PersistentFlags().BoolVarP(&bcCheckDb, "bc.checkDb", "", true, "Should we check to see if the slot exists in the DB before writing it?") // err = captureCmd.MarkPersistentFlagRequired("bc.address") // exitErr(err) // err = captureCmd.MarkPersistentFlagRequired("bc.port") @@ -149,6 +151,8 @@ func init() { exitErr(err) err = viper.BindPFlag("bc.uniqueNodeIdentifier", captureCmd.PersistentFlags().Lookup("bc.uniqueNodeIdentifier")) exitErr(err) + err = viper.BindPFlag("bc.checkDb", captureCmd.PersistentFlags().Lookup("bc.checkDb")) + exitErr(err) // Here you will define your flags and configuration settings. //// Known Gap Specific diff --git a/cmd/head.go b/cmd/head.go index f2dd0e5..fa7aa6a 100644 --- a/cmd/head.go +++ b/cmd/head.go @@ -50,7 +50,7 @@ func startHeadTracking() { Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), - viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier")) + viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier"), viper.GetBool("bc.checkDb")) if err != nil { StopApplicationPreBoot(err, Db) } diff --git a/cmd/historic.go b/cmd/historic.go index e9d8d95..34ad1b5 100644 --- a/cmd/historic.go +++ b/cmd/historic.go @@ -50,7 +50,7 @@ func startHistoricProcessing() { Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), - viper.GetInt("kg.increment"), "historic", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier")) + viper.GetInt("kg.increment"), "historic", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier"), viper.GetBool("bc.checkDb")) if err != nil { StopApplicationPreBoot(err, Db) } diff --git a/example.ipld-ethcl-indexer-config.json b/example.ipld-ethcl-indexer-config.json index 69746ca..96c2c6f 100644 --- a/example.ipld-ethcl-indexer-config.json +++ b/example.ipld-ethcl-indexer-config.json @@ -15,7 +15,8 @@ "bootMaxRetry": 5, "maxHistoricProcessWorker": 2, "connectionProtocol": "http", - "uniqueNodeIdentifier": 100 + "uniqueNodeIdentifier": 100, + "checkDb": true }, "t": { "skipSync": true diff --git a/internal/boot/boot.go b/internal/boot/boot.go index b1e476b..1e42cce 100644 --- a/internal/boot/boot.go +++ b/internal/boot/boot.go @@ -42,11 +42,11 @@ var ( // // 3. Make sure the node is synced, unless disregardSync is true. func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, - bcAddress string, bcPort int, bcConnectionProtocol string, bcKgTableIncrement int, disregardSync bool, uniqueNodeIdentifier int) (*beaconclient.BeaconClient, sql.Database, error) { + bcAddress string, bcPort int, bcConnectionProtocol string, bcKgTableIncrement int, disregardSync bool, uniqueNodeIdentifier int, checkDb bool) (*beaconclient.BeaconClient, sql.Database, error) { log.Info("Booting the Application") log.Debug("Creating the Beacon Client") - Bc, err := beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort, bcKgTableIncrement, uniqueNodeIdentifier) + Bc, err := beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort, bcKgTableIncrement, uniqueNodeIdentifier, checkDb) if err != nil { return Bc, nil, err } @@ -86,14 +86,14 @@ func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName // Add retry logic to ensure that we are give the Beacon Client and the DB time to start. func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, bcKgTableIncrement int, - startUpMode string, disregardSync bool, uniqueNodeIdentifier int) (*beaconclient.BeaconClient, sql.Database, error) { + startUpMode string, disregardSync bool, uniqueNodeIdentifier int, checkDb bool) (*beaconclient.BeaconClient, sql.Database, error) { var err error if bcMaxRetry < 0 { i := 0 for { BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, - bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier) + bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier, checkDb) if err != nil { log.WithFields(log.Fields{ "retryNumber": i, @@ -108,7 +108,7 @@ func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int } else { for i := 0; i < bcMaxRetry; i++ { BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, - bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier) + bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier, checkDb) if err != nil { log.WithFields(log.Fields{ "retryNumber": i, diff --git a/internal/boot/boot_test.go b/internal/boot/boot_test.go index ef621ae..fe8933f 100644 --- a/internal/boot/boot_test.go +++ b/internal/boot/boot_test.go @@ -39,51 +39,52 @@ var _ = Describe("Boot", func() { bcBootMaxRetry int = 5 bcKgTableIncrement int = 10 bcUniqueIdentifier int = 100 + bcCheckDb bool = false ) Describe("Booting the application", Label("integration"), func() { Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing head", func() { It("Should connect successfully", func() { - _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier) + _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier, bcCheckDb) defer db.Close() Expect(err).ToNot(HaveOccurred()) }) }) Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing historic ", func() { It("Should connect successfully", func() { - _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "historic", true, bcUniqueIdentifier) + _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "historic", true, bcUniqueIdentifier, bcCheckDb) defer db.Close() Expect(err).ToNot(HaveOccurred()) }) }) Context("When the DB and BC are both up and running, and we check for a synced head", func() { It("Should not connect successfully", func() { - _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, bcUniqueIdentifier) + _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, bcUniqueIdentifier, bcCheckDb) defer db.Close() Expect(err).To(HaveOccurred()) }) }) Context("When the DB and BC are both up and running, we skip checking for a synced head, but the unique identifier is 0", func() { It("Should not connect successfully", func() { - _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, 0) + _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, 0, bcCheckDb) defer db.Close() Expect(err).To(HaveOccurred()) }) }) Context("When the DB is running but not the BC", func() { It("Should not connect successfully", func() { - _, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier) + _, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier, bcCheckDb) Expect(err).To(HaveOccurred()) }) }) Context("When the BC is running but not the DB", func() { It("Should not connect successfully", func() { - _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier) + _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier, bcCheckDb) Expect(err).To(HaveOccurred()) }) }) Context("When neither the BC or DB are running", func() { It("Should not connect successfully", func() { - _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier) + _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier, bcCheckDb) Expect(err).To(HaveOccurred()) }) }) diff --git a/internal/shutdown/shutdown_test.go b/internal/shutdown/shutdown_test.go index c6fa00e..9dbb94d 100644 --- a/internal/shutdown/shutdown_test.go +++ b/internal/shutdown/shutdown_test.go @@ -50,6 +50,7 @@ var ( bcBootMaxRetry int = 5 bcKgTableIncrement int = 10 bcUniqueIdentifier int = 100 + bcCheckDb bool = false maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second DB sql.Database BC *beaconclient.BeaconClient @@ -62,7 +63,7 @@ var _ = Describe("Shutdown", func() { BeforeEach(func() { ctx = context.Background() BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, - bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier) + bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier, bcCheckDb) notifierCh = make(chan os.Signal, 1) Expect(err).To(BeNil()) }) diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go index e9fd5c0..8f24207 100644 --- a/pkg/beaconclient/beaconclient.go +++ b/pkg/beaconclient/beaconclient.go @@ -51,6 +51,7 @@ type BeaconClient struct { KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry. UniqueNodeIdentifier int // The unique identifier within the cluster of this individual node. KnownGapsProcess KnownGapsProcessing // object keeping track of knowngaps processing + CheckDb bool // Should we check the DB to see if the slot exists before processing it? // Used for Head Tracking @@ -88,7 +89,7 @@ type SseError struct { } // A Function to create the BeaconClient. -func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int, bcKgTableIncrement int, uniqueNodeIdentifier int) (*BeaconClient, error) { +func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int, bcKgTableIncrement int, uniqueNodeIdentifier int, checkDb bool) (*BeaconClient, error) { if uniqueNodeIdentifier == 0 { uniqueNodeIdentifier := rand.Int() log.WithField("randomUniqueNodeIdentifier", uniqueNodeIdentifier).Warn("No uniqueNodeIdentifier provided, we are going to use a randomly generated one.") @@ -109,6 +110,7 @@ func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddres ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint), Metrics: metrics, UniqueNodeIdentifier: uniqueNodeIdentifier, + CheckDb: checkDb, //FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint), }, nil } diff --git a/pkg/beaconclient/capturehead_test.go b/pkg/beaconclient/capturehead_test.go index 0856cbe..0561913 100644 --- a/pkg/beaconclient/capturehead_test.go +++ b/pkg/beaconclient/capturehead_test.go @@ -204,6 +204,7 @@ var ( dbDriver: dbDriver, knownGapsTableIncrement: knownGapsTableIncrement, bcUniqueIdentifier: bcUniqueIdentifier, + checkDb: true, } BeaconNodeTester = TestBeaconNode{ @@ -424,6 +425,7 @@ type Config struct { dbDriver string knownGapsTableIncrement int bcUniqueIdentifier int + checkDb bool } ////////////////////////////////////////////////////// @@ -433,7 +435,7 @@ type Config struct { // Must run before each test. We can't use the beforeEach because of the way // Gingko treats race conditions. func setUpTest(config Config, maxSlot string) *beaconclient.BeaconClient { - bc, err := beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port, config.knownGapsTableIncrement, config.bcUniqueIdentifier) + bc, err := beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port, config.knownGapsTableIncrement, config.bcUniqueIdentifier, config.checkDb) Expect(err).ToNot(HaveOccurred()) db, err := postgres.SetupPostgresDb(config.dbHost, config.dbPort, config.dbName, config.dbUser, config.dbPassword, config.dbDriver) Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/beaconclient/capturehistoric.go b/pkg/beaconclient/capturehistoric.go index cd88c90..a569568 100644 --- a/pkg/beaconclient/capturehistoric.go +++ b/pkg/beaconclient/capturehistoric.go @@ -30,7 +30,7 @@ import ( func (bc *BeaconClient) CaptureHistoric(maxWorkers int) []error { log.Info("We are starting the historical processing service.") bc.HistoricalProcess = historicProcessing{db: bc.Db, metrics: bc.Metrics} - errs := handleBatchProcess(maxWorkers, bc.HistoricalProcess, bc.HistoricalProcess.finishProcessing, bc.HistoricalProcess.db, bc.ServerEndpoint, bc.Metrics) + errs := handleBatchProcess(maxWorkers, bc.HistoricalProcess, bc.HistoricalProcess.finishProcessing, bc.HistoricalProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb) log.Debug("Exiting Historical") return errs } @@ -89,7 +89,7 @@ type batchHistoricError struct { // 4. Remove the slot entry from the DB. // // 5. Handle any errors. -func handleBatchProcess(maxWorkers int, bp BatchProcessing, finishCh chan int, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics) []error { +func handleBatchProcess(maxWorkers int, bp BatchProcessing, finishCh chan int, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics, checkDb bool) []error { slotsCh := make(chan slotsToProcess) workCh := make(chan int) processedCh := make(chan slotsToProcess) @@ -99,7 +99,7 @@ func handleBatchProcess(maxWorkers int, bp BatchProcessing, finishCh chan int, d // Start workers for w := 1; w <= maxWorkers; w++ { log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting batch processing workers") - go processSlotRangeWorker(workCh, errCh, db, serverEndpoint, metrics) + go processSlotRangeWorker(workCh, errCh, db, serverEndpoint, metrics, checkDb) } // Process all ranges and send each individual slot to the worker. diff --git a/pkg/beaconclient/capturehistoric_test.go b/pkg/beaconclient/capturehistoric_test.go index 0a288c3..d69f306 100644 --- a/pkg/beaconclient/capturehistoric_test.go +++ b/pkg/beaconclient/capturehistoric_test.go @@ -16,7 +16,7 @@ import ( var _ = Describe("Capturehistoric", func() { Describe("Run the application in historic mode", Label("unit", "behavioral"), func() { - Context("Phase0: When we need to process a single block in the ethcl.historic_process table.", Label("now"), func() { + Context("Phase0: When we need to process a single block in the ethcl.historic_process table.", func() { It("Successfully Process the Block", func() { bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) diff --git a/pkg/beaconclient/databasewrite.go b/pkg/beaconclient/databasewrite.go index fd0ac6c..cc53646 100644 --- a/pkg/beaconclient/databasewrite.go +++ b/pkg/beaconclient/databasewrite.go @@ -20,6 +20,7 @@ import ( "fmt" "strconv" + "github.com/jackc/pgx/v4" log "github.com/sirupsen/logrus" "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" @@ -54,6 +55,14 @@ VALUES ($1, $2) ON CONFLICT (key) DO NOTHING` CheckProposedStmt string = `SELECT slot, block_root FROM ethcl.slots WHERE slot=$1 AND block_root=$2;` + // Check to see if the slot and block_root exist in ethcl.signed_beacon_block + CheckSignedBeaconBlockStmt string = `SELECT slot, block_root + FROM ethcl.signed_beacon_block + WHERE slot=$1 AND block_root=$2` + // Check to see if the slot and state_root exist in ethcl.beacon_state + CheckBeaconStateStmt string = `SELECT slot, state_root + FROM ethcl.beacon_state + WHERE slot=$1 AND state_root=$2` // Used to get a single slot from the table if it exists QueryBySlotStmt string = `SELECT slot FROM ethcl.slots @@ -75,6 +84,8 @@ VALUES ($1, $2, $3, $4, $5, $6) on CONFLICT (start_slot, end_slot) DO NOTHING` // Remove any of it from the processslot file. type DatabaseWriter struct { Db sql.Database + Tx sql.Tx + Ctx context.Context Metrics *BeaconClientMetrics DbSlots *DbSlots DbSignedBeaconBlock *DbSignedBeaconBlock @@ -85,14 +96,21 @@ type DatabaseWriter struct { func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string, eth1BlockHash string, status string, rawSignedBeaconBlock []byte, rawBeaconState []byte, metrics *BeaconClientMetrics) (*DatabaseWriter, error) { + ctx := context.Background() + tx, err := db.Begin(ctx) + if err != nil { + loghelper.LogError(err).Error("We are unable to Begin a SQL transaction") + } dw := &DatabaseWriter{ Db: db, + Tx: tx, + Ctx: ctx, rawBeaconState: rawBeaconState, rawSignedBeaconBlock: rawSignedBeaconBlock, Metrics: metrics, } dw.prepareSlotsModel(slot, stateRoot, blockRoot, status) - err := dw.prepareSignedBeaconBlockModel(slot, blockRoot, parentBlockRoot, eth1BlockHash) + err = dw.prepareSignedBeaconBlockModel(slot, blockRoot, parentBlockRoot, eth1BlockHash) if err != nil { return nil, err } @@ -150,28 +168,40 @@ func (dw *DatabaseWriter) prepareBeaconStateModel(slot int, stateRoot string) er return nil } -// Write all the data for a given slot. -func (dw *DatabaseWriter) writeFullSlot() error { +// Add all the data for a given slot to a SQL transaction. +// Originally it wrote to each table individually. +func (dw *DatabaseWriter) transactFullSlot() error { // If an error occurs, write to knownGaps table. log.WithFields(log.Fields{ "slot": dw.DbSlots.Slot, }).Debug("Starting to write to the DB.") - err := dw.writeSlots() + err := dw.transactSlots() if err != nil { loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl.slots table...") return err } log.Debug("We finished writing to the ethcl.slots table.") if dw.DbSlots.Status != "skipped" { - errG, _ := errgroup.WithContext(context.Background()) - errG.Go(func() error { - return dw.writeSignedBeaconBlocks() - }) - errG.Go(func() error { - return dw.writeBeaconState() - }) - if err := errG.Wait(); err != nil { - loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl block or state table...") + //errG, _ := errgroup.WithContext(context.Background()) + //errG.Go(func() error { + // return dw.transactSignedBeaconBlocks() + //}) + //errG.Go(func() error { + // return dw.transactBeaconState() + //}) + //if err := errG.Wait(); err != nil { + // loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl block or state table...") + // return err + //} + // Might want to seperate writing to public.blocks so we can do this concurrently... + err := dw.transactSignedBeaconBlocks() + if err != nil { + loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl block table...") + return err + } + err = dw.transactBeaconState() + if err != nil { + loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl state table...") return err } } @@ -179,16 +209,16 @@ func (dw *DatabaseWriter) writeFullSlot() error { return nil } -// Write the information for the generic slots table. For now this is only one function. +// Add data for the ethcl.slots table to a transaction. For now this is only one function. // But in the future if we need to incorporate any FK's or perform any actions to write to the // slots table we can do it all here. -func (dw *DatabaseWriter) writeSlots() error { +func (dw *DatabaseWriter) transactSlots() error { return dw.upsertSlots() } // Upsert to the ethcl.slots table. func (dw *DatabaseWriter) upsertSlots() error { - _, err := dw.Db.Exec(context.Background(), UpsertSlotsStmt, dw.DbSlots.Epoch, dw.DbSlots.Slot, dw.DbSlots.BlockRoot, dw.DbSlots.StateRoot, dw.DbSlots.Status) + _, err := dw.Tx.Exec(dw.Ctx, UpsertSlotsStmt, dw.DbSlots.Epoch, dw.DbSlots.Slot, dw.DbSlots.BlockRoot, dw.DbSlots.StateRoot, dw.DbSlots.Status) if err != nil { loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.slots table") return err @@ -196,8 +226,8 @@ func (dw *DatabaseWriter) upsertSlots() error { return nil } -// Write the information for the signed_beacon_block. -func (dw *DatabaseWriter) writeSignedBeaconBlocks() error { +// Add the information for the signed_beacon_block to a transaction. +func (dw *DatabaseWriter) transactSignedBeaconBlocks() error { err := dw.upsertPublicBlocks(dw.DbSignedBeaconBlock.MhKey, dw.rawSignedBeaconBlock) if err != nil { return err @@ -211,7 +241,7 @@ func (dw *DatabaseWriter) writeSignedBeaconBlocks() error { // Upsert to public.blocks. func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) error { - _, err := dw.Db.Exec(context.Background(), UpsertBlocksStmt, key, data) + _, err := dw.Tx.Exec(dw.Ctx, UpsertBlocksStmt, key, data) if err != nil { loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the public.blocks table") return err @@ -221,7 +251,7 @@ func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) error { // Upsert to the ethcl.signed_beacon_block table. func (dw *DatabaseWriter) upsertSignedBeaconBlock() error { - _, err := dw.Db.Exec(context.Background(), UpsertSignedBeaconBlockStmt, dw.DbSignedBeaconBlock.Slot, dw.DbSignedBeaconBlock.BlockRoot, dw.DbSignedBeaconBlock.ParentBlock, dw.DbSignedBeaconBlock.Eth1BlockHash, dw.DbSignedBeaconBlock.MhKey) + _, err := dw.Tx.Exec(dw.Ctx, UpsertSignedBeaconBlockStmt, dw.DbSignedBeaconBlock.Slot, dw.DbSignedBeaconBlock.BlockRoot, dw.DbSignedBeaconBlock.ParentBlock, dw.DbSignedBeaconBlock.Eth1BlockHash, dw.DbSignedBeaconBlock.MhKey) if err != nil { loghelper.LogSlotError(dw.DbSlots.Slot, err).WithFields(log.Fields{"block_root": dw.DbSignedBeaconBlock.BlockRoot}).Error("Unable to write to the slot to the ethcl.signed_beacon_block table") return err @@ -229,8 +259,8 @@ func (dw *DatabaseWriter) upsertSignedBeaconBlock() error { return nil } -// Write the information for the beacon_state. -func (dw *DatabaseWriter) writeBeaconState() error { +// Add the information for the beacon_state to a transaction. +func (dw *DatabaseWriter) transactBeaconState() error { err := dw.upsertPublicBlocks(dw.DbBeaconState.MhKey, dw.rawBeaconState) if err != nil { return err @@ -244,7 +274,7 @@ func (dw *DatabaseWriter) writeBeaconState() error { // Upsert to the ethcl.beacon_state table. func (dw *DatabaseWriter) upsertBeaconState() error { - _, err := dw.Db.Exec(context.Background(), UpsertBeaconState, dw.DbBeaconState.Slot, dw.DbBeaconState.StateRoot, dw.DbBeaconState.MhKey) + _, err := dw.Tx.Exec(dw.Ctx, UpsertBeaconState, dw.DbBeaconState.Slot, dw.DbBeaconState.StateRoot, dw.DbBeaconState.MhKey) if err != nil { loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.beacon_state table") return err @@ -252,23 +282,23 @@ func (dw *DatabaseWriter) upsertBeaconState() error { return nil } -// Update a given slot to be marked as forked. Provide the slot and the latest latestBlockRoot. +// Update a given slot to be marked as forked within a transaction. Provide the slot and the latest latestBlockRoot. // We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked. -func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) { +func transactReorgs(tx sql.Tx, ctx context.Context, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) { slotNum, strErr := strconv.Atoi(slot) if strErr != nil { loghelper.LogReorgError(slot, latestBlockRoot, strErr).Error("We can't convert the slot to an int...") } - forkCount, err := updateForked(db, slot, latestBlockRoot) + forkCount, err := updateForked(tx, ctx, slot, latestBlockRoot) if err != nil { loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while updating all forks.") - writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics) + transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics) } - proposedCount, err := updateProposed(db, slot, latestBlockRoot) + proposedCount, err := updateProposed(tx, ctx, slot, latestBlockRoot) if err != nil { loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while trying to update the proposed slot.") - writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics) + transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics) } if forkCount > 0 { @@ -289,29 +319,37 @@ func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics * loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{ "proposedCount": proposedCount, }).Error("Too many rows were marked as proposed!") - writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics) + transactKnownGaps(tx, ctx, 1, slotNum, slotNum, fmt.Errorf("Too many rows were marked as unproposed."), "reorg", metrics) } else if proposedCount == 0 { - var count int - err := db.QueryRow(context.Background(), CheckProposedStmt, slot, latestBlockRoot).Scan(count) - if err != nil { - loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to query proposed rows after reorg.") - writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics) - } else if count != 1 { - loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{ - "proposedCount": count, - }).Warn("The proposed block was not marked as proposed...") - writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics) - } else { - loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.") - } + transactKnownGaps(tx, ctx, 1, slotNum, slotNum, fmt.Errorf("Unable to find properly proposed row in DB"), "reorg", metrics) + loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.") } metrics.IncrementReorgsInsert(1) } +// Wrapper function that will create a transaction and execute the function. +func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) { + ctx := context.Background() + tx, err := db.Begin(ctx) + if err != nil { + loghelper.LogReorgError(slot, latestBlockRoot, err).Fatal("Unable to create a new transaction for reorgs") + } + defer func() { + err := tx.Rollback(ctx) + if err != nil && err != pgx.ErrTxClosed { + loghelper.LogError(err).Error("We were unable to Rollback a transaction for reorgs") + } + }() + transactReorgs(tx, ctx, slot, latestBlockRoot, metrics) + if err = tx.Commit(ctx); err != nil { + loghelper.LogReorgError(slot, latestBlockRoot, err).Fatal("Unable to execute the transaction for reorgs") + } +} + // Update the slots table by marking the old slot's as forked. -func updateForked(db sql.Database, slot string, latestBlockRoot string) (int64, error) { - res, err := db.Exec(context.Background(), UpdateForkedStmt, slot, latestBlockRoot) +func updateForked(tx sql.Tx, ctx context.Context, slot string, latestBlockRoot string) (int64, error) { + res, err := tx.Exec(ctx, UpdateForkedStmt, slot, latestBlockRoot) if err != nil { loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with the forked slots") return 0, err @@ -324,8 +362,9 @@ func updateForked(db sql.Database, slot string, latestBlockRoot string) (int64, return count, err } -func updateProposed(db sql.Database, slot string, latestBlockRoot string) (int64, error) { - res, err := db.Exec(context.Background(), UpdateProposedStmt, slot, latestBlockRoot) +// Mark a slot as proposed. +func updateProposed(tx sql.Tx, ctx context.Context, slot string, latestBlockRoot string) (int64, error) { + res, err := tx.Exec(ctx, UpdateProposedStmt, slot, latestBlockRoot) if err != nil { loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with the proposed slot.") return 0, err @@ -339,20 +378,26 @@ func updateProposed(db sql.Database, slot string, latestBlockRoot string) (int64 return count, err } -// A wrapper function to call upsertKnownGaps. This function will break down the range of known_gaos into +// A wrapper function to call upsertKnownGaps. This function will break down the range of known_gaps into // smaller chunks. For example, instead of having an entry of 1-101, if we increment the entries by 10 slots, we would // have 10 entries as follows: 1-10, 11-20, etc... -func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot int, entryError error, entryProcess string, metric *BeaconClientMetrics) { +func transactKnownGaps(tx sql.Tx, ctx context.Context, tableIncrement int, startSlot int, endSlot int, entryError error, entryProcess string, metric *BeaconClientMetrics) { + var entryErrorMsg string + if entryError == nil { + entryErrorMsg = "" + } else { + entryErrorMsg = entryError.Error() + } if endSlot-startSlot <= tableIncrement { kgModel := DbKnownGaps{ StartSlot: strconv.Itoa(startSlot), EndSlot: strconv.Itoa(endSlot), CheckedOut: false, ReprocessingError: "", - EntryError: entryError.Error(), + EntryError: entryErrorMsg, EntryProcess: entryProcess, } - upsertKnownGaps(db, kgModel, metric) + upsertKnownGaps(tx, ctx, kgModel, metric) } else { totalSlots := endSlot - startSlot var chunks int @@ -374,18 +419,37 @@ func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot EndSlot: strconv.Itoa(tempEnd), CheckedOut: false, ReprocessingError: "", - EntryError: entryError.Error(), + EntryError: entryErrorMsg, EntryProcess: entryProcess, } - upsertKnownGaps(db, kgModel, metric) + upsertKnownGaps(tx, ctx, kgModel, metric) } } +} +// Wrapper function, instead of adding the knownGaps entries to a transaction, it will +// create the transaction and write it. +func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot int, entryError error, entryProcess string, metric *BeaconClientMetrics) { + ctx := context.Background() + tx, err := db.Begin(ctx) + if err != nil { + loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).Fatal("Unable to create a new transaction for knownGaps") + } + defer func() { + err := tx.Rollback(ctx) + if err != nil && err != pgx.ErrTxClosed { + loghelper.LogError(err).Error("We were unable to Rollback a transaction for reorgs") + } + }() + transactKnownGaps(tx, ctx, tableIncrement, startSlot, endSlot, entryError, entryProcess, metric) + if err = tx.Commit(ctx); err != nil { + loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).Fatal("Unable to execute the transaction for knownGaps") + } } // A function to upsert a single entry to the ethcl.known_gaps table. -func upsertKnownGaps(db sql.Database, knModel DbKnownGaps, metric *BeaconClientMetrics) { - _, err := db.Exec(context.Background(), UpsertKnownGapsStmt, knModel.StartSlot, knModel.EndSlot, +func upsertKnownGaps(tx sql.Tx, ctx context.Context, knModel DbKnownGaps, metric *BeaconClientMetrics) { + _, err := tx.Exec(ctx, UpsertKnownGapsStmt, knModel.StartSlot, knModel.EndSlot, knModel.CheckedOut, knModel.ReprocessingError, knModel.EntryError, knModel.EntryProcess) if err != nil { log.WithFields(log.Fields{ @@ -460,7 +524,55 @@ func isSlotProcessed(db sql.Database, checkProcessStmt string, slot string) (boo if err != nil { return false, err } - + if row > 0 { + return true, nil + } + return false, nil +} + +// Check to see if this slot is in the DB. Check ethcl.slots, ethcl.signed_beacon_block +// and ethcl.beacon_state. If the slot exists, return true +func IsSlotInDb(db sql.Database, slot string, blockRoot string, stateRoot string) (bool, error) { + var ( + isInBeaconState bool + isInSignedBeaconBlock bool + err error + ) + errG, _ := errgroup.WithContext(context.Background()) + errG.Go(func() error { + isInBeaconState, err = checkSlotAndRoot(db, CheckBeaconStateStmt, slot, stateRoot) + if err != nil { + loghelper.LogError(err).Error("Unable to check if the slot and stateroot exist in ethcl.beacon_state") + } + return err + }) + errG.Go(func() error { + isInSignedBeaconBlock, err = checkSlotAndRoot(db, CheckSignedBeaconBlockStmt, slot, blockRoot) + if err != nil { + loghelper.LogError(err).Error("Unable to check if the slot and block_root exist in ethcl.signed_beacon_block") + } + return err + }) + if err := errG.Wait(); err != nil { + return false, err + } + if isInBeaconState && isInSignedBeaconBlock { + return true, nil + } + return false, nil +} + +// Provide a statement, slot, and root, and this function will check to see +// if the slot and root exist in the table. +func checkSlotAndRoot(db sql.Database, statement, slot, root string) (bool, error) { + processRow, err := db.Exec(context.Background(), statement, slot, root) + if err != nil { + return false, err + } + row, err := processRow.RowsAffected() + if err != nil { + return false, err + } if row > 0 { return true, nil } diff --git a/pkg/beaconclient/healthcheck_test.go b/pkg/beaconclient/healthcheck_test.go index c09e3d8..23d55cc 100644 --- a/pkg/beaconclient/healthcheck_test.go +++ b/pkg/beaconclient/healthcheck_test.go @@ -31,9 +31,9 @@ var _ = Describe("Healthcheck", func() { BeforeEach(func() { var err error - Bc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10, bcUniqueIdentifier) + Bc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10, bcUniqueIdentifier, false) Expect(err).ToNot(HaveOccurred()) - errBc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10, bcUniqueIdentifier) + errBc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10, bcUniqueIdentifier, false) Expect(err).ToNot(HaveOccurred()) }) diff --git a/pkg/beaconclient/processevents.go b/pkg/beaconclient/processevents.go index 7610191..457a116 100644 --- a/pkg/beaconclient/processevents.go +++ b/pkg/beaconclient/processevents.go @@ -60,7 +60,7 @@ func (bc *BeaconClient) handleHead() { log.WithFields(log.Fields{"head": head}).Debug("We are going to start processing the slot.") - go processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement) + go processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement, bc.CheckDb) log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished calling processHeadSlot.") diff --git a/pkg/beaconclient/processhistoric.go b/pkg/beaconclient/processhistoric.go index eb3e71b..af6e735 100644 --- a/pkg/beaconclient/processhistoric.go +++ b/pkg/beaconclient/processhistoric.go @@ -94,16 +94,16 @@ func (hp historicProcessing) releaseDbLocks() error { } // Process the slot range. -func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics) { +func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics, checkDb bool) { for slot := range workCh { log.Debug("Handling slot: ", slot) - err, errProcess := handleHistoricSlot(db, serverAddress, slot, metrics) - errMs := batchHistoricError{ - err: err, - errProcess: errProcess, - slot: slot, - } + err, errProcess := handleHistoricSlot(db, serverAddress, slot, metrics, checkDb) if err != nil { + errMs := batchHistoricError{ + err: err, + errProcess: errProcess, + slot: slot, + } errCh <- errMs } } @@ -150,7 +150,7 @@ func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRow } defer func() { err := tx.Rollback(ctx) - if err != nil { + if err != nil && err != pgx.ErrTxClosed { loghelper.LogError(err).Error("We were unable to Rollback a transaction") errCount = append(errCount, err) } diff --git a/pkg/beaconclient/processknowngaps.go b/pkg/beaconclient/processknowngaps.go index 01762da..85f2834 100644 --- a/pkg/beaconclient/processknowngaps.go +++ b/pkg/beaconclient/processknowngaps.go @@ -61,7 +61,7 @@ type KnownGapsProcessing struct { func (bc *BeaconClient) ProcessKnownGaps(maxWorkers int) []error { log.Info("We are starting the known gaps processing service.") bc.KnownGapsProcess = KnownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics, finishProcessing: make(chan int)} - errs := handleBatchProcess(maxWorkers, bc.KnownGapsProcess, bc.KnownGapsProcess.finishProcessing, bc.KnownGapsProcess.db, bc.ServerEndpoint, bc.Metrics) + errs := handleBatchProcess(maxWorkers, bc.KnownGapsProcess, bc.KnownGapsProcess.finishProcessing, bc.KnownGapsProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb) log.Debug("Exiting known gaps processing service") return errs } diff --git a/pkg/beaconclient/processslot.go b/pkg/beaconclient/processslot.go index 146d53c..80fb2d8 100644 --- a/pkg/beaconclient/processslot.go +++ b/pkg/beaconclient/processslot.go @@ -26,6 +26,7 @@ import ( "strconv" "strings" + "github.com/jackc/pgx/v4" si "github.com/prysmaticlabs/prysm/consensus-types/interfaces" "github.com/prysmaticlabs/prysm/consensus-types/wrapper" dt "github.com/prysmaticlabs/prysm/encoding/ssz/detect" @@ -79,7 +80,7 @@ type ProcessSlot struct { // This function will do all the work to process the slot and write it to the DB. // It will return the error and error process. The error process is used for providing reach detail to the // known_gaps table. -func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) (error, string) { +func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) (error, string) { ps := &ProcessSlot{ Slot: slot, BlockRoot: blockRoot, @@ -114,13 +115,34 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot return err, "processSlot" } + finalBlockRoot, finalStateRoot, finalEth1BlockHash, err := ps.provideFinalHash() + if err != nil { + return err, "CalculateBlockRoot" + } + if checkDb { + inDb, err := IsSlotInDb(ps.Db, strconv.Itoa(ps.Slot), finalBlockRoot, finalStateRoot) + if err != nil { + return err, "checkDb" + } + if inDb { + log.WithField("slot", slot).Info("Slot already in the DB.") + return nil, "" + } + } + // Get this object ready to write - dw, err := ps.createWriteObjects() + dw, err := ps.createWriteObjects(finalBlockRoot, finalStateRoot, finalEth1BlockHash) if err != nil { return err, "blockRoot" } // Write the object to the DB. - err = dw.writeFullSlot() + defer func() { + err := dw.Tx.Rollback(dw.Ctx) + if err != nil && err != pgx.ErrTxClosed { + loghelper.LogError(err).Error("We were unable to Rollback a transaction") + } + }() + err = dw.transactFullSlot() if err != nil { return err, "processSlot" } @@ -131,26 +153,32 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot return fmt.Errorf("headOrHistoric must be either historic or head!"), "" } if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" && ps.Status != "skipped" { - ps.checkPreviousSlot(previousSlot, previousBlockRoot, knownGapsTableIncrement) + ps.checkPreviousSlot(dw.Tx, dw.Ctx, previousSlot, previousBlockRoot, knownGapsTableIncrement) } + + // Commit the transaction + if err = dw.Tx.Commit(dw.Ctx); err != nil { + return err, "transactionCommit" + } + return nil, "" } // Handle a slot that is at head. A wrapper function for calling `handleFullSlot`. -func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) { +func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) { // Get the knownGaps at startUp. if previousSlot == 0 && previousBlockRoot == "" { writeStartUpGaps(db, knownGapsTableIncrement, slot, metrics) } - err, errReason := processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement) + err, errReason := processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement, checkDb) if err != nil { writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics) } } // Handle a historic slot. A wrapper function for calling `handleFullSlot`. -func handleHistoricSlot(db sql.Database, serverAddress string, slot int, metrics *BeaconClientMetrics) (error, string) { - return processFullSlot(db, serverAddress, slot, "", "", 0, "", "historic", metrics, 1) +func handleHistoricSlot(db sql.Database, serverAddress string, slot int, metrics *BeaconClientMetrics, checkDb bool) (error, string) { + return processFullSlot(db, serverAddress, slot, "", "", 0, "", "historic", metrics, 1, checkDb) } // Update the SszSignedBeaconBlock and FullSignedBeaconBlock object with their respective values. @@ -232,14 +260,14 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string, vmCh chan<- *dt.Ver } // Check to make sure that the previous block we processed is the parent of the current block. -func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot string, knownGapsTableIncrement int) { +func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previousSlot int, previousBlockRoot string, knownGapsTableIncrement int) { parentRoot := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().ParentRoot()) if previousSlot == int(ps.FullBeaconState.Slot()) { log.WithFields(log.Fields{ "slot": ps.FullBeaconState.Slot(), "fork": true, }).Warn("A fork occurred! The previous slot and current slot match.") - writeReorgs(ps.Db, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics) + transactReorgs(tx, ctx, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics) } else if previousSlot > int(ps.FullBeaconState.Slot()) { log.WithFields(log.Fields{ "previousSlot": previousSlot, @@ -250,27 +278,43 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str "previousSlot": previousSlot, "currentSlot": ps.FullBeaconState.Slot(), }).Error("We skipped a few slots.") - writeKnownGaps(ps.Db, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot())-1, fmt.Errorf("Gaps during head processing"), "headGaps", ps.Metrics) + transactKnownGaps(tx, ctx, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot())-1, fmt.Errorf("Gaps during head processing"), "headGaps", ps.Metrics) } else if previousBlockRoot != parentRoot { log.WithFields(log.Fields{ "previousBlockRoot": previousBlockRoot, "currentBlockParent": parentRoot, }).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.") - writeReorgs(ps.Db, strconv.Itoa(previousSlot), parentRoot, ps.Metrics) + transactReorgs(tx, ctx, strconv.Itoa(previousSlot), parentRoot, ps.Metrics) } else { log.Debug("Previous Slot and Current Slot are one distance from each other.") } } // Transforms all the raw data into DB models that can be written to the DB. -func (ps *ProcessSlot) createWriteObjects() (*DatabaseWriter, error) { +func (ps *ProcessSlot) createWriteObjects(blockRoot, stateRoot, eth1BlockHash string) (*DatabaseWriter, error) { + var status string + if ps.Status != "" { + status = ps.Status + } else { + status = "proposed" + } + + dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.SszSignedBeaconBlock, ps.SszBeaconState, ps.Metrics) + if err != nil { + return dw, err + } + + return dw, nil +} + +// This function will return the final blockRoot, stateRoot, and eth1BlockHash that will be +// used to write to a DB +func (ps *ProcessSlot) provideFinalHash() (string, string, string, error) { var ( stateRoot string blockRoot string - status string eth1BlockHash string ) - if ps.Status == "skipped" { stateRoot = "" blockRoot = "" @@ -290,24 +334,12 @@ func (ps *ProcessSlot) createWriteObjects() (*DatabaseWriter, error) { rawBlockRoot, err := ps.FullSignedBeaconBlock.Block().HashTreeRoot() //blockRoot, err = queryBlockRoot(blockRootEndpoint, strconv.Itoa(ps.Slot)) if err != nil { - return nil, err + return "", "", "", err } blockRoot = "0x" + hex.EncodeToString(rawBlockRoot[:]) - log.WithFields(log.Fields{"blockRoot": blockRoot}).Debug("Block Root from ssz:") + log.WithFields(log.Fields{"blockRoot": blockRoot}).Debug("Block Root from ssz") } eth1BlockHash = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().Body().Eth1Data().BlockHash) } - - if ps.Status != "" { - status = ps.Status - } else { - status = "proposed" - } - - dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.SszSignedBeaconBlock, ps.SszBeaconState, ps.Metrics) - if err != nil { - return dw, err - } - - return dw, nil + return blockRoot, stateRoot, eth1BlockHash, nil }