Checkpoint - Check the DB before writing

This commit is contained in:
Abdul Rabbani 2022-06-03 19:32:08 -04:00
parent 11dce3db34
commit fd053a240f
17 changed files with 145 additions and 73 deletions

View File

@ -47,7 +47,7 @@ func bootApp() {
Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
viper.GetInt("kg.increment"), "boot", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier")) viper.GetInt("kg.increment"), "boot", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier"), viper.GetBool("bc.checkDb"))
if err != nil { if err != nil {
StopApplicationPreBoot(err, Db) StopApplicationPreBoot(err, Db)
} }

View File

@ -40,6 +40,7 @@ var (
bcType string bcType string
bcMaxHistoricProcessWorker int bcMaxHistoricProcessWorker int
bcUniqueNodeIdentifier int bcUniqueNodeIdentifier int
bcCheckDb bool
kgMaxWorker int kgMaxWorker int
kgTableIncrement int kgTableIncrement int
kgProcessGaps bool kgProcessGaps bool
@ -95,6 +96,7 @@ func init() {
captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The amount of time to wait between retries while booting the application") captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The amount of time to wait between retries while booting the application")
captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.historic_process table. Be careful of system memory.") captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.historic_process table. Be careful of system memory.")
captureCmd.PersistentFlags().IntVarP(&bcUniqueNodeIdentifier, "bc.uniqueNodeIdentifier", "", 0, "The unique identifier of this application. Each application connecting to the DB should have a unique identifier.") captureCmd.PersistentFlags().IntVarP(&bcUniqueNodeIdentifier, "bc.uniqueNodeIdentifier", "", 0, "The unique identifier of this application. Each application connecting to the DB should have a unique identifier.")
captureCmd.PersistentFlags().BoolVarP(&bcCheckDb, "bc.checkDb", "", true, "Should we check to see if the slot exists in the DB before writing it?")
// err = captureCmd.MarkPersistentFlagRequired("bc.address") // err = captureCmd.MarkPersistentFlagRequired("bc.address")
// exitErr(err) // exitErr(err)
// err = captureCmd.MarkPersistentFlagRequired("bc.port") // err = captureCmd.MarkPersistentFlagRequired("bc.port")
@ -149,6 +151,8 @@ func init() {
exitErr(err) exitErr(err)
err = viper.BindPFlag("bc.uniqueNodeIdentifier", captureCmd.PersistentFlags().Lookup("bc.uniqueNodeIdentifier")) err = viper.BindPFlag("bc.uniqueNodeIdentifier", captureCmd.PersistentFlags().Lookup("bc.uniqueNodeIdentifier"))
exitErr(err) exitErr(err)
err = viper.BindPFlag("bc.checkDb", captureCmd.PersistentFlags().Lookup("bc.checkDb"))
exitErr(err)
// Here you will define your flags and configuration settings. // Here you will define your flags and configuration settings.
//// Known Gap Specific //// Known Gap Specific

View File

@ -50,7 +50,7 @@ func startHeadTracking() {
Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier")) viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier"), viper.GetBool("bc.checkDb"))
if err != nil { if err != nil {
StopApplicationPreBoot(err, Db) StopApplicationPreBoot(err, Db)
} }

View File

@ -50,7 +50,7 @@ func startHistoricProcessing() {
Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
viper.GetInt("kg.increment"), "historic", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier")) viper.GetInt("kg.increment"), "historic", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier"), viper.GetBool("bc.checkDb"))
if err != nil { if err != nil {
StopApplicationPreBoot(err, Db) StopApplicationPreBoot(err, Db)
} }

View File

@ -15,7 +15,8 @@
"bootMaxRetry": 5, "bootMaxRetry": 5,
"maxHistoricProcessWorker": 2, "maxHistoricProcessWorker": 2,
"connectionProtocol": "http", "connectionProtocol": "http",
"uniqueNodeIdentifier": 100 "uniqueNodeIdentifier": 100,
"checkDb": true
}, },
"t": { "t": {
"skipSync": true "skipSync": true

View File

@ -42,11 +42,11 @@ var (
// //
// 3. Make sure the node is synced, unless disregardSync is true. // 3. Make sure the node is synced, unless disregardSync is true.
func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string,
bcAddress string, bcPort int, bcConnectionProtocol string, bcKgTableIncrement int, disregardSync bool, uniqueNodeIdentifier int) (*beaconclient.BeaconClient, sql.Database, error) { bcAddress string, bcPort int, bcConnectionProtocol string, bcKgTableIncrement int, disregardSync bool, uniqueNodeIdentifier int, checkDb bool) (*beaconclient.BeaconClient, sql.Database, error) {
log.Info("Booting the Application") log.Info("Booting the Application")
log.Debug("Creating the Beacon Client") log.Debug("Creating the Beacon Client")
Bc, err := beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort, bcKgTableIncrement, uniqueNodeIdentifier) Bc, err := beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort, bcKgTableIncrement, uniqueNodeIdentifier, checkDb)
if err != nil { if err != nil {
return Bc, nil, err return Bc, nil, err
} }
@ -86,14 +86,14 @@ func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName
// Add retry logic to ensure that we are give the Beacon Client and the DB time to start. // Add retry logic to ensure that we are give the Beacon Client and the DB time to start.
func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string,
bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, bcKgTableIncrement int, bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, bcKgTableIncrement int,
startUpMode string, disregardSync bool, uniqueNodeIdentifier int) (*beaconclient.BeaconClient, sql.Database, error) { startUpMode string, disregardSync bool, uniqueNodeIdentifier int, checkDb bool) (*beaconclient.BeaconClient, sql.Database, error) {
var err error var err error
if bcMaxRetry < 0 { if bcMaxRetry < 0 {
i := 0 i := 0
for { for {
BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName,
bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier) bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier, checkDb)
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"retryNumber": i, "retryNumber": i,
@ -108,7 +108,7 @@ func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int
} else { } else {
for i := 0; i < bcMaxRetry; i++ { for i := 0; i < bcMaxRetry; i++ {
BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName,
bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier) bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier, checkDb)
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"retryNumber": i, "retryNumber": i,

View File

@ -39,51 +39,52 @@ var _ = Describe("Boot", func() {
bcBootMaxRetry int = 5 bcBootMaxRetry int = 5
bcKgTableIncrement int = 10 bcKgTableIncrement int = 10
bcUniqueIdentifier int = 100 bcUniqueIdentifier int = 100
bcCheckDb bool = false
) )
Describe("Booting the application", Label("integration"), func() { Describe("Booting the application", Label("integration"), func() {
Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing head", func() { Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing head", func() {
It("Should connect successfully", func() { It("Should connect successfully", func() {
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier) _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier, bcCheckDb)
defer db.Close() defer db.Close()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}) })
}) })
Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing historic ", func() { Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing historic ", func() {
It("Should connect successfully", func() { It("Should connect successfully", func() {
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "historic", true, bcUniqueIdentifier) _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "historic", true, bcUniqueIdentifier, bcCheckDb)
defer db.Close() defer db.Close()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}) })
}) })
Context("When the DB and BC are both up and running, and we check for a synced head", func() { Context("When the DB and BC are both up and running, and we check for a synced head", func() {
It("Should not connect successfully", func() { It("Should not connect successfully", func() {
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, bcUniqueIdentifier) _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, bcUniqueIdentifier, bcCheckDb)
defer db.Close() defer db.Close()
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
}) })
}) })
Context("When the DB and BC are both up and running, we skip checking for a synced head, but the unique identifier is 0", func() { Context("When the DB and BC are both up and running, we skip checking for a synced head, but the unique identifier is 0", func() {
It("Should not connect successfully", func() { It("Should not connect successfully", func() {
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, 0) _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, 0, bcCheckDb)
defer db.Close() defer db.Close()
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
}) })
}) })
Context("When the DB is running but not the BC", func() { Context("When the DB is running but not the BC", func() {
It("Should not connect successfully", func() { It("Should not connect successfully", func() {
_, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier) _, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier, bcCheckDb)
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
}) })
}) })
Context("When the BC is running but not the DB", func() { Context("When the BC is running but not the DB", func() {
It("Should not connect successfully", func() { It("Should not connect successfully", func() {
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier) _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier, bcCheckDb)
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
}) })
}) })
Context("When neither the BC or DB are running", func() { Context("When neither the BC or DB are running", func() {
It("Should not connect successfully", func() { It("Should not connect successfully", func() {
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier) _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier, bcCheckDb)
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
}) })
}) })

View File

@ -50,6 +50,7 @@ var (
bcBootMaxRetry int = 5 bcBootMaxRetry int = 5
bcKgTableIncrement int = 10 bcKgTableIncrement int = 10
bcUniqueIdentifier int = 100 bcUniqueIdentifier int = 100
bcCheckDb bool = false
maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second
DB sql.Database DB sql.Database
BC *beaconclient.BeaconClient BC *beaconclient.BeaconClient
@ -62,7 +63,7 @@ var _ = Describe("Shutdown", func() {
BeforeEach(func() { BeforeEach(func() {
ctx = context.Background() ctx = context.Background()
BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress,
bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier) bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier, bcCheckDb)
notifierCh = make(chan os.Signal, 1) notifierCh = make(chan os.Signal, 1)
Expect(err).To(BeNil()) Expect(err).To(BeNil())
}) })

View File

@ -51,6 +51,7 @@ type BeaconClient struct {
KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry. KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry.
UniqueNodeIdentifier int // The unique identifier within the cluster of this individual node. UniqueNodeIdentifier int // The unique identifier within the cluster of this individual node.
KnownGapsProcess KnownGapsProcessing // object keeping track of knowngaps processing KnownGapsProcess KnownGapsProcessing // object keeping track of knowngaps processing
CheckDb bool // Should we check the DB to see if the slot exists before processing it?
// Used for Head Tracking // Used for Head Tracking
@ -88,7 +89,7 @@ type SseError struct {
} }
// A Function to create the BeaconClient. // A Function to create the BeaconClient.
func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int, bcKgTableIncrement int, uniqueNodeIdentifier int) (*BeaconClient, error) { func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int, bcKgTableIncrement int, uniqueNodeIdentifier int, checkDb bool) (*BeaconClient, error) {
if uniqueNodeIdentifier == 0 { if uniqueNodeIdentifier == 0 {
uniqueNodeIdentifier := rand.Int() uniqueNodeIdentifier := rand.Int()
log.WithField("randomUniqueNodeIdentifier", uniqueNodeIdentifier).Warn("No uniqueNodeIdentifier provided, we are going to use a randomly generated one.") log.WithField("randomUniqueNodeIdentifier", uniqueNodeIdentifier).Warn("No uniqueNodeIdentifier provided, we are going to use a randomly generated one.")
@ -109,6 +110,7 @@ func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddres
ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint), ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint),
Metrics: metrics, Metrics: metrics,
UniqueNodeIdentifier: uniqueNodeIdentifier, UniqueNodeIdentifier: uniqueNodeIdentifier,
CheckDb: checkDb,
//FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint), //FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint),
}, nil }, nil
} }

View File

@ -204,6 +204,7 @@ var (
dbDriver: dbDriver, dbDriver: dbDriver,
knownGapsTableIncrement: knownGapsTableIncrement, knownGapsTableIncrement: knownGapsTableIncrement,
bcUniqueIdentifier: bcUniqueIdentifier, bcUniqueIdentifier: bcUniqueIdentifier,
checkDb: true,
} }
BeaconNodeTester = TestBeaconNode{ BeaconNodeTester = TestBeaconNode{
@ -424,6 +425,7 @@ type Config struct {
dbDriver string dbDriver string
knownGapsTableIncrement int knownGapsTableIncrement int
bcUniqueIdentifier int bcUniqueIdentifier int
checkDb bool
} }
////////////////////////////////////////////////////// //////////////////////////////////////////////////////
@ -433,7 +435,7 @@ type Config struct {
// Must run before each test. We can't use the beforeEach because of the way // Must run before each test. We can't use the beforeEach because of the way
// Gingko treats race conditions. // Gingko treats race conditions.
func setUpTest(config Config, maxSlot string) *beaconclient.BeaconClient { func setUpTest(config Config, maxSlot string) *beaconclient.BeaconClient {
bc, err := beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port, config.knownGapsTableIncrement, config.bcUniqueIdentifier) bc, err := beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port, config.knownGapsTableIncrement, config.bcUniqueIdentifier, config.checkDb)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
db, err := postgres.SetupPostgresDb(config.dbHost, config.dbPort, config.dbName, config.dbUser, config.dbPassword, config.dbDriver) db, err := postgres.SetupPostgresDb(config.dbHost, config.dbPort, config.dbName, config.dbUser, config.dbPassword, config.dbDriver)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())

View File

@ -30,7 +30,7 @@ import (
func (bc *BeaconClient) CaptureHistoric(maxWorkers int) []error { func (bc *BeaconClient) CaptureHistoric(maxWorkers int) []error {
log.Info("We are starting the historical processing service.") log.Info("We are starting the historical processing service.")
bc.HistoricalProcess = historicProcessing{db: bc.Db, metrics: bc.Metrics} bc.HistoricalProcess = historicProcessing{db: bc.Db, metrics: bc.Metrics}
errs := handleBatchProcess(maxWorkers, bc.HistoricalProcess, bc.HistoricalProcess.finishProcessing, bc.HistoricalProcess.db, bc.ServerEndpoint, bc.Metrics) errs := handleBatchProcess(maxWorkers, bc.HistoricalProcess, bc.HistoricalProcess.finishProcessing, bc.HistoricalProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb)
log.Debug("Exiting Historical") log.Debug("Exiting Historical")
return errs return errs
} }
@ -89,7 +89,7 @@ type batchHistoricError struct {
// 4. Remove the slot entry from the DB. // 4. Remove the slot entry from the DB.
// //
// 5. Handle any errors. // 5. Handle any errors.
func handleBatchProcess(maxWorkers int, bp BatchProcessing, finishCh chan int, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics) []error { func handleBatchProcess(maxWorkers int, bp BatchProcessing, finishCh chan int, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics, checkDb bool) []error {
slotsCh := make(chan slotsToProcess) slotsCh := make(chan slotsToProcess)
workCh := make(chan int) workCh := make(chan int)
processedCh := make(chan slotsToProcess) processedCh := make(chan slotsToProcess)
@ -99,7 +99,7 @@ func handleBatchProcess(maxWorkers int, bp BatchProcessing, finishCh chan int, d
// Start workers // Start workers
for w := 1; w <= maxWorkers; w++ { for w := 1; w <= maxWorkers; w++ {
log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting batch processing workers") log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting batch processing workers")
go processSlotRangeWorker(workCh, errCh, db, serverEndpoint, metrics) go processSlotRangeWorker(workCh, errCh, db, serverEndpoint, metrics, checkDb)
} }
// Process all ranges and send each individual slot to the worker. // Process all ranges and send each individual slot to the worker.

View File

@ -24,6 +24,7 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
"golang.org/x/sync/errgroup"
) )
var ( var (
@ -54,6 +55,14 @@ VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`
CheckProposedStmt string = `SELECT slot, block_root CheckProposedStmt string = `SELECT slot, block_root
FROM ethcl.slots FROM ethcl.slots
WHERE slot=$1 AND block_root=$2;` WHERE slot=$1 AND block_root=$2;`
// Check to see if the slot and block_root exist in ethcl.signed_beacon_block
CheckSignedBeaconBlockStmt string = `SELECT slot, block_root
FROM ethcl.signed_beacon_block
WHERE slot=$1 AND block_root=$2`
// Check to see if the slot and state_root exist in ethcl.beacon_state
CheckBeaconStateStmt string = `SELECT slot, state_root
FROM ethcl.beacon_state
WHERE slot=$1 AND state_root=$2`
// Used to get a single slot from the table if it exists // Used to get a single slot from the table if it exists
QueryBySlotStmt string = `SELECT slot QueryBySlotStmt string = `SELECT slot
FROM ethcl.slots FROM ethcl.slots
@ -275,7 +284,7 @@ func (dw *DatabaseWriter) upsertBeaconState() error {
// Update a given slot to be marked as forked within a transaction. Provide the slot and the latest latestBlockRoot. // Update a given slot to be marked as forked within a transaction. Provide the slot and the latest latestBlockRoot.
// We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked. // We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked.
func transactReorgs(db sql.Database, tx sql.Tx, ctx context.Context, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) { func transactReorgs(tx sql.Tx, ctx context.Context, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) {
slotNum, strErr := strconv.Atoi(slot) slotNum, strErr := strconv.Atoi(slot)
if strErr != nil { if strErr != nil {
loghelper.LogReorgError(slot, latestBlockRoot, strErr).Error("We can't convert the slot to an int...") loghelper.LogReorgError(slot, latestBlockRoot, strErr).Error("We can't convert the slot to an int...")
@ -312,19 +321,8 @@ func transactReorgs(db sql.Database, tx sql.Tx, ctx context.Context, slot string
}).Error("Too many rows were marked as proposed!") }).Error("Too many rows were marked as proposed!")
transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics) transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics)
} else if proposedCount == 0 { } else if proposedCount == 0 {
var count int transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics)
err := db.QueryRow(context.Background(), CheckProposedStmt, slot, latestBlockRoot).Scan(count) loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.")
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to query proposed rows after reorg.")
transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics)
} else if count != 1 {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"proposedCount": count,
}).Warn("The proposed block was not marked as proposed...")
transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics)
} else {
loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.")
}
} }
metrics.IncrementReorgsInsert(1) metrics.IncrementReorgsInsert(1)
@ -343,7 +341,7 @@ func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *
loghelper.LogError(err).Error("We were unable to Rollback a transaction for reorgs") loghelper.LogError(err).Error("We were unable to Rollback a transaction for reorgs")
} }
}() }()
transactReorgs(db, tx, ctx, slot, latestBlockRoot, metrics) transactReorgs(tx, ctx, slot, latestBlockRoot, metrics)
if err = tx.Commit(ctx); err != nil { if err = tx.Commit(ctx); err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Fatal("Unable to execute the transaction for reorgs") loghelper.LogReorgError(slot, latestBlockRoot, err).Fatal("Unable to execute the transaction for reorgs")
} }
@ -520,7 +518,55 @@ func isSlotProcessed(db sql.Database, checkProcessStmt string, slot string) (boo
if err != nil { if err != nil {
return false, err return false, err
} }
if row > 0 {
return true, nil
}
return false, nil
}
// Check to see if this slot is in the DB. Check ethcl.slots, ethcl.signed_beacon_block
// and ethcl.beacon_state. If the slot exists, return true
func IsSlotInDb(db sql.Database, slot string, blockRoot string, stateRoot string) (bool, error) {
var (
isInBeaconState bool
isInSignedBeaconBlock bool
err error
)
errG, _ := errgroup.WithContext(context.Background())
errG.Go(func() error {
isInBeaconState, err = checkSlotAndRoot(db, CheckBeaconStateStmt, slot, stateRoot)
if err != nil {
loghelper.LogError(err).Error("Unable to check if the slot and stateroot exist in ethcl.beacon_state")
}
return err
})
errG.Go(func() error {
isInSignedBeaconBlock, err = checkSlotAndRoot(db, CheckSignedBeaconBlockStmt, slot, blockRoot)
if err != nil {
loghelper.LogError(err).Error("Unable to check if the slot and block_root exist in ethcl.signed_beacon_block")
}
return err
})
if err := errG.Wait(); err != nil {
return false, err
}
if isInBeaconState && isInSignedBeaconBlock {
return true, nil
}
return false, nil
}
// Provide a statement, slot, and root, and this function will check to see
// if the slot and root exist in the table.
func checkSlotAndRoot(db sql.Database, statement, slot, root string) (bool, error) {
processRow, err := db.Exec(context.Background(), statement, slot, root)
if err != nil {
return false, err
}
row, err := processRow.RowsAffected()
if err != nil {
return false, err
}
if row > 0 { if row > 0 {
return true, nil return true, nil
} }

View File

@ -31,9 +31,9 @@ var _ = Describe("Healthcheck", func() {
BeforeEach(func() { BeforeEach(func() {
var err error var err error
Bc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10, bcUniqueIdentifier) Bc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10, bcUniqueIdentifier, false)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
errBc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10, bcUniqueIdentifier) errBc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10, bcUniqueIdentifier, false)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}) })

View File

@ -60,7 +60,7 @@ func (bc *BeaconClient) handleHead() {
log.WithFields(log.Fields{"head": head}).Debug("We are going to start processing the slot.") log.WithFields(log.Fields{"head": head}).Debug("We are going to start processing the slot.")
go processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement) go processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement, bc.CheckDb)
log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished calling processHeadSlot.") log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished calling processHeadSlot.")

View File

@ -94,10 +94,10 @@ func (hp historicProcessing) releaseDbLocks() error {
} }
// Process the slot range. // Process the slot range.
func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics) { func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics, checkDb bool) {
for slot := range workCh { for slot := range workCh {
log.Debug("Handling slot: ", slot) log.Debug("Handling slot: ", slot)
err, errProcess := handleHistoricSlot(db, serverAddress, slot, metrics) err, errProcess := handleHistoricSlot(db, serverAddress, slot, metrics, checkDb)
errMs := batchHistoricError{ errMs := batchHistoricError{
err: err, err: err,
errProcess: errProcess, errProcess: errProcess,

View File

@ -61,7 +61,7 @@ type KnownGapsProcessing struct {
func (bc *BeaconClient) ProcessKnownGaps(maxWorkers int) []error { func (bc *BeaconClient) ProcessKnownGaps(maxWorkers int) []error {
log.Info("We are starting the known gaps processing service.") log.Info("We are starting the known gaps processing service.")
bc.KnownGapsProcess = KnownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics, finishProcessing: make(chan int)} bc.KnownGapsProcess = KnownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics, finishProcessing: make(chan int)}
errs := handleBatchProcess(maxWorkers, bc.KnownGapsProcess, bc.KnownGapsProcess.finishProcessing, bc.KnownGapsProcess.db, bc.ServerEndpoint, bc.Metrics) errs := handleBatchProcess(maxWorkers, bc.KnownGapsProcess, bc.KnownGapsProcess.finishProcessing, bc.KnownGapsProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb)
log.Debug("Exiting known gaps processing service") log.Debug("Exiting known gaps processing service")
return errs return errs
} }

View File

@ -90,10 +90,6 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
Metrics: metrics, Metrics: metrics,
} }
if checkDb {
// Check the DB to see if this slot exists.
}
g, _ := errgroup.WithContext(context.Background()) g, _ := errgroup.WithContext(context.Background())
vUnmarshalerCh := make(chan *dt.VersionedUnmarshaler, 1) vUnmarshalerCh := make(chan *dt.VersionedUnmarshaler, 1)
@ -119,8 +115,23 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
return err, "processSlot" return err, "processSlot"
} }
finalBlockRoot, finalStateRoot, finalEth1BlockHash, err := ps.provideFinalHash()
if err != nil {
return err, "CalculateBlockRoot"
}
if checkDb {
inDb, err := IsSlotInDb(ps.Db, strconv.Itoa(ps.Slot), finalBlockRoot, finalStateRoot)
if err != nil {
return err, "checkDb"
}
if inDb {
log.WithField("slot", slot).Info("Slot already in the DB.")
return nil, ""
}
}
// Get this object ready to write // Get this object ready to write
dw, err := ps.createWriteObjects() dw, err := ps.createWriteObjects(finalBlockRoot, finalStateRoot, ps.Status, finalEth1BlockHash)
if err != nil { if err != nil {
return err, "blockRoot" return err, "blockRoot"
} }
@ -154,20 +165,20 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
} }
// Handle a slot that is at head. A wrapper function for calling `handleFullSlot`. // Handle a slot that is at head. A wrapper function for calling `handleFullSlot`.
func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) { func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) {
// Get the knownGaps at startUp. // Get the knownGaps at startUp.
if previousSlot == 0 && previousBlockRoot == "" { if previousSlot == 0 && previousBlockRoot == "" {
writeStartUpGaps(db, knownGapsTableIncrement, slot, metrics) writeStartUpGaps(db, knownGapsTableIncrement, slot, metrics)
} }
err, errReason := processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement, true) err, errReason := processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement, checkDb)
if err != nil { if err != nil {
writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics) writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics)
} }
} }
// Handle a historic slot. A wrapper function for calling `handleFullSlot`. // Handle a historic slot. A wrapper function for calling `handleFullSlot`.
func handleHistoricSlot(db sql.Database, serverAddress string, slot int, metrics *BeaconClientMetrics) (error, string) { func handleHistoricSlot(db sql.Database, serverAddress string, slot int, metrics *BeaconClientMetrics, checkDb bool) (error, string) {
return processFullSlot(db, serverAddress, slot, "", "", 0, "", "historic", metrics, 1, true) return processFullSlot(db, serverAddress, slot, "", "", 0, "", "historic", metrics, 1, checkDb)
} }
// Update the SszSignedBeaconBlock and FullSignedBeaconBlock object with their respective values. // Update the SszSignedBeaconBlock and FullSignedBeaconBlock object with their respective values.
@ -256,7 +267,7 @@ func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previou
"slot": ps.FullBeaconState.Slot(), "slot": ps.FullBeaconState.Slot(),
"fork": true, "fork": true,
}).Warn("A fork occurred! The previous slot and current slot match.") }).Warn("A fork occurred! The previous slot and current slot match.")
transactReorgs(ps.Db, tx, ctx, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics) transactReorgs(tx, ctx, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics)
} else if previousSlot > int(ps.FullBeaconState.Slot()) { } else if previousSlot > int(ps.FullBeaconState.Slot()) {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"previousSlot": previousSlot, "previousSlot": previousSlot,
@ -273,21 +284,37 @@ func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previou
"previousBlockRoot": previousBlockRoot, "previousBlockRoot": previousBlockRoot,
"currentBlockParent": parentRoot, "currentBlockParent": parentRoot,
}).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.") }).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.")
transactReorgs(ps.Db, tx, ctx, strconv.Itoa(previousSlot), parentRoot, ps.Metrics) transactReorgs(tx, ctx, strconv.Itoa(previousSlot), parentRoot, ps.Metrics)
} else { } else {
log.Debug("Previous Slot and Current Slot are one distance from each other.") log.Debug("Previous Slot and Current Slot are one distance from each other.")
} }
} }
// Transforms all the raw data into DB models that can be written to the DB. // Transforms all the raw data into DB models that can be written to the DB.
func (ps *ProcessSlot) createWriteObjects() (*DatabaseWriter, error) { func (ps *ProcessSlot) createWriteObjects(blockRoot, stateRoot, status, eth1BlockHash string) (*DatabaseWriter, error) {
if ps.Status != "" {
status = ps.Status
} else {
status = "proposed"
}
dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.SszSignedBeaconBlock, ps.SszBeaconState, ps.Metrics)
if err != nil {
return dw, err
}
return dw, nil
}
// This function will return the final blockRoot, stateRoot, and eth1BlockHash that will be
// used to write to a DB
func (ps *ProcessSlot) provideFinalHash() (string, string, string, error) {
var ( var (
stateRoot string stateRoot string
blockRoot string blockRoot string
status string
eth1BlockHash string eth1BlockHash string
) )
if ps.Status == "skipped" { if ps.Status == "skipped" {
stateRoot = "" stateRoot = ""
blockRoot = "" blockRoot = ""
@ -307,24 +334,12 @@ func (ps *ProcessSlot) createWriteObjects() (*DatabaseWriter, error) {
rawBlockRoot, err := ps.FullSignedBeaconBlock.Block().HashTreeRoot() rawBlockRoot, err := ps.FullSignedBeaconBlock.Block().HashTreeRoot()
//blockRoot, err = queryBlockRoot(blockRootEndpoint, strconv.Itoa(ps.Slot)) //blockRoot, err = queryBlockRoot(blockRootEndpoint, strconv.Itoa(ps.Slot))
if err != nil { if err != nil {
return nil, err return "", "", "", err
} }
blockRoot = "0x" + hex.EncodeToString(rawBlockRoot[:]) blockRoot = "0x" + hex.EncodeToString(rawBlockRoot[:])
log.WithFields(log.Fields{"blockRoot": blockRoot}).Debug("Block Root from ssz:") log.WithFields(log.Fields{"blockRoot": blockRoot}).Debug("Block Root from ssz")
} }
eth1BlockHash = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().Body().Eth1Data().BlockHash) eth1BlockHash = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().Body().Eth1Data().BlockHash)
} }
return blockRoot, stateRoot, eth1BlockHash, nil
if ps.Status != "" {
status = ps.Status
} else {
status = "proposed"
}
dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.SszSignedBeaconBlock, ps.SszBeaconState, ps.Metrics)
if err != nil {
return dw, err
}
return dw, nil
} }