Use a transaction for writing, knowngaps, and reorgs #53

Merged
abdulrabbani00 merged 4 commits from feature/52-database-write-improvements into develop 2022-06-06 13:02:43 +00:00
18 changed files with 276 additions and 121 deletions

View File

@ -47,7 +47,7 @@ func bootApp() {
Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
viper.GetInt("kg.increment"), "boot", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier")) viper.GetInt("kg.increment"), "boot", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier"), viper.GetBool("bc.checkDb"))
if err != nil { if err != nil {
StopApplicationPreBoot(err, Db) StopApplicationPreBoot(err, Db)
} }

View File

@ -40,6 +40,7 @@ var (
bcType string bcType string
bcMaxHistoricProcessWorker int bcMaxHistoricProcessWorker int
bcUniqueNodeIdentifier int bcUniqueNodeIdentifier int
bcCheckDb bool
kgMaxWorker int kgMaxWorker int
kgTableIncrement int kgTableIncrement int
kgProcessGaps bool kgProcessGaps bool
@ -95,6 +96,7 @@ func init() {
captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The amount of time to wait between retries while booting the application") captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The amount of time to wait between retries while booting the application")
captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.historic_process table. Be careful of system memory.") captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.historic_process table. Be careful of system memory.")
captureCmd.PersistentFlags().IntVarP(&bcUniqueNodeIdentifier, "bc.uniqueNodeIdentifier", "", 0, "The unique identifier of this application. Each application connecting to the DB should have a unique identifier.") captureCmd.PersistentFlags().IntVarP(&bcUniqueNodeIdentifier, "bc.uniqueNodeIdentifier", "", 0, "The unique identifier of this application. Each application connecting to the DB should have a unique identifier.")
captureCmd.PersistentFlags().BoolVarP(&bcCheckDb, "bc.checkDb", "", true, "Should we check to see if the slot exists in the DB before writing it?")
// err = captureCmd.MarkPersistentFlagRequired("bc.address") // err = captureCmd.MarkPersistentFlagRequired("bc.address")
// exitErr(err) // exitErr(err)
// err = captureCmd.MarkPersistentFlagRequired("bc.port") // err = captureCmd.MarkPersistentFlagRequired("bc.port")
@ -149,6 +151,8 @@ func init() {
exitErr(err) exitErr(err)
err = viper.BindPFlag("bc.uniqueNodeIdentifier", captureCmd.PersistentFlags().Lookup("bc.uniqueNodeIdentifier")) err = viper.BindPFlag("bc.uniqueNodeIdentifier", captureCmd.PersistentFlags().Lookup("bc.uniqueNodeIdentifier"))
exitErr(err) exitErr(err)
err = viper.BindPFlag("bc.checkDb", captureCmd.PersistentFlags().Lookup("bc.checkDb"))
exitErr(err)
// Here you will define your flags and configuration settings. // Here you will define your flags and configuration settings.
//// Known Gap Specific //// Known Gap Specific

View File

@ -50,7 +50,7 @@ func startHeadTracking() {
Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier")) viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier"), viper.GetBool("bc.checkDb"))
if err != nil { if err != nil {
StopApplicationPreBoot(err, Db) StopApplicationPreBoot(err, Db)
} }

View File

@ -50,7 +50,7 @@ func startHistoricProcessing() {
Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
viper.GetInt("kg.increment"), "historic", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier")) viper.GetInt("kg.increment"), "historic", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier"), viper.GetBool("bc.checkDb"))
if err != nil { if err != nil {
StopApplicationPreBoot(err, Db) StopApplicationPreBoot(err, Db)
} }

View File

@ -15,7 +15,8 @@
"bootMaxRetry": 5, "bootMaxRetry": 5,
"maxHistoricProcessWorker": 2, "maxHistoricProcessWorker": 2,
"connectionProtocol": "http", "connectionProtocol": "http",
"uniqueNodeIdentifier": 100 "uniqueNodeIdentifier": 100,
"checkDb": true
}, },
"t": { "t": {
"skipSync": true "skipSync": true

View File

@ -42,11 +42,11 @@ var (
// //
// 3. Make sure the node is synced, unless disregardSync is true. // 3. Make sure the node is synced, unless disregardSync is true.
func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string,
bcAddress string, bcPort int, bcConnectionProtocol string, bcKgTableIncrement int, disregardSync bool, uniqueNodeIdentifier int) (*beaconclient.BeaconClient, sql.Database, error) { bcAddress string, bcPort int, bcConnectionProtocol string, bcKgTableIncrement int, disregardSync bool, uniqueNodeIdentifier int, checkDb bool) (*beaconclient.BeaconClient, sql.Database, error) {
log.Info("Booting the Application") log.Info("Booting the Application")
log.Debug("Creating the Beacon Client") log.Debug("Creating the Beacon Client")
Bc, err := beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort, bcKgTableIncrement, uniqueNodeIdentifier) Bc, err := beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort, bcKgTableIncrement, uniqueNodeIdentifier, checkDb)
if err != nil { if err != nil {
return Bc, nil, err return Bc, nil, err
} }
@ -86,14 +86,14 @@ func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName
// Add retry logic to ensure that we are give the Beacon Client and the DB time to start. // Add retry logic to ensure that we are give the Beacon Client and the DB time to start.
func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string,
bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, bcKgTableIncrement int, bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, bcKgTableIncrement int,
startUpMode string, disregardSync bool, uniqueNodeIdentifier int) (*beaconclient.BeaconClient, sql.Database, error) { startUpMode string, disregardSync bool, uniqueNodeIdentifier int, checkDb bool) (*beaconclient.BeaconClient, sql.Database, error) {
var err error var err error
if bcMaxRetry < 0 { if bcMaxRetry < 0 {
i := 0 i := 0
for { for {
BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName,
bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier) bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier, checkDb)
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"retryNumber": i, "retryNumber": i,
@ -108,7 +108,7 @@ func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int
} else { } else {
for i := 0; i < bcMaxRetry; i++ { for i := 0; i < bcMaxRetry; i++ {
BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName,
bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier) bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier, checkDb)
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"retryNumber": i, "retryNumber": i,

View File

@ -39,51 +39,52 @@ var _ = Describe("Boot", func() {
bcBootMaxRetry int = 5 bcBootMaxRetry int = 5
bcKgTableIncrement int = 10 bcKgTableIncrement int = 10
bcUniqueIdentifier int = 100 bcUniqueIdentifier int = 100
bcCheckDb bool = false
) )
Describe("Booting the application", Label("integration"), func() { Describe("Booting the application", Label("integration"), func() {
Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing head", func() { Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing head", func() {
It("Should connect successfully", func() { It("Should connect successfully", func() {
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier) _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier, bcCheckDb)
defer db.Close() defer db.Close()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}) })
}) })
Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing historic ", func() { Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing historic ", func() {
It("Should connect successfully", func() { It("Should connect successfully", func() {
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "historic", true, bcUniqueIdentifier) _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "historic", true, bcUniqueIdentifier, bcCheckDb)
defer db.Close() defer db.Close()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}) })
}) })
Context("When the DB and BC are both up and running, and we check for a synced head", func() { Context("When the DB and BC are both up and running, and we check for a synced head", func() {
It("Should not connect successfully", func() { It("Should not connect successfully", func() {
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, bcUniqueIdentifier) _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, bcUniqueIdentifier, bcCheckDb)
defer db.Close() defer db.Close()
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
}) })
}) })
Context("When the DB and BC are both up and running, we skip checking for a synced head, but the unique identifier is 0", func() { Context("When the DB and BC are both up and running, we skip checking for a synced head, but the unique identifier is 0", func() {
It("Should not connect successfully", func() { It("Should not connect successfully", func() {
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, 0) _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, 0, bcCheckDb)
defer db.Close() defer db.Close()
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
}) })
}) })
Context("When the DB is running but not the BC", func() { Context("When the DB is running but not the BC", func() {
It("Should not connect successfully", func() { It("Should not connect successfully", func() {
_, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier) _, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier, bcCheckDb)
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
}) })
}) })
Context("When the BC is running but not the DB", func() { Context("When the BC is running but not the DB", func() {
It("Should not connect successfully", func() { It("Should not connect successfully", func() {
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier) _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier, bcCheckDb)
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
}) })
}) })
Context("When neither the BC or DB are running", func() { Context("When neither the BC or DB are running", func() {
It("Should not connect successfully", func() { It("Should not connect successfully", func() {
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier) _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier, bcCheckDb)
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
}) })
}) })

View File

@ -50,6 +50,7 @@ var (
bcBootMaxRetry int = 5 bcBootMaxRetry int = 5
bcKgTableIncrement int = 10 bcKgTableIncrement int = 10
bcUniqueIdentifier int = 100 bcUniqueIdentifier int = 100
bcCheckDb bool = false
maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second
DB sql.Database DB sql.Database
BC *beaconclient.BeaconClient BC *beaconclient.BeaconClient
@ -62,7 +63,7 @@ var _ = Describe("Shutdown", func() {
BeforeEach(func() { BeforeEach(func() {
ctx = context.Background() ctx = context.Background()
BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress,
bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier) bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier, bcCheckDb)
notifierCh = make(chan os.Signal, 1) notifierCh = make(chan os.Signal, 1)
Expect(err).To(BeNil()) Expect(err).To(BeNil())
}) })

View File

@ -51,6 +51,7 @@ type BeaconClient struct {
KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry. KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry.
UniqueNodeIdentifier int // The unique identifier within the cluster of this individual node. UniqueNodeIdentifier int // The unique identifier within the cluster of this individual node.
KnownGapsProcess KnownGapsProcessing // object keeping track of knowngaps processing KnownGapsProcess KnownGapsProcessing // object keeping track of knowngaps processing
CheckDb bool // Should we check the DB to see if the slot exists before processing it?
// Used for Head Tracking // Used for Head Tracking
@ -88,7 +89,7 @@ type SseError struct {
} }
// A Function to create the BeaconClient. // A Function to create the BeaconClient.
func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int, bcKgTableIncrement int, uniqueNodeIdentifier int) (*BeaconClient, error) { func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int, bcKgTableIncrement int, uniqueNodeIdentifier int, checkDb bool) (*BeaconClient, error) {
if uniqueNodeIdentifier == 0 { if uniqueNodeIdentifier == 0 {
uniqueNodeIdentifier := rand.Int() uniqueNodeIdentifier := rand.Int()
log.WithField("randomUniqueNodeIdentifier", uniqueNodeIdentifier).Warn("No uniqueNodeIdentifier provided, we are going to use a randomly generated one.") log.WithField("randomUniqueNodeIdentifier", uniqueNodeIdentifier).Warn("No uniqueNodeIdentifier provided, we are going to use a randomly generated one.")
@ -109,6 +110,7 @@ func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddres
ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint), ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint),
Metrics: metrics, Metrics: metrics,
UniqueNodeIdentifier: uniqueNodeIdentifier, UniqueNodeIdentifier: uniqueNodeIdentifier,
CheckDb: checkDb,
//FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint), //FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint),
}, nil }, nil
} }

View File

@ -204,6 +204,7 @@ var (
dbDriver: dbDriver, dbDriver: dbDriver,
knownGapsTableIncrement: knownGapsTableIncrement, knownGapsTableIncrement: knownGapsTableIncrement,
bcUniqueIdentifier: bcUniqueIdentifier, bcUniqueIdentifier: bcUniqueIdentifier,
checkDb: true,
} }
BeaconNodeTester = TestBeaconNode{ BeaconNodeTester = TestBeaconNode{
@ -424,6 +425,7 @@ type Config struct {
dbDriver string dbDriver string
knownGapsTableIncrement int knownGapsTableIncrement int
bcUniqueIdentifier int bcUniqueIdentifier int
checkDb bool
} }
////////////////////////////////////////////////////// //////////////////////////////////////////////////////
@ -433,7 +435,7 @@ type Config struct {
// Must run before each test. We can't use the beforeEach because of the way // Must run before each test. We can't use the beforeEach because of the way
// Gingko treats race conditions. // Gingko treats race conditions.
func setUpTest(config Config, maxSlot string) *beaconclient.BeaconClient { func setUpTest(config Config, maxSlot string) *beaconclient.BeaconClient {
bc, err := beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port, config.knownGapsTableIncrement, config.bcUniqueIdentifier) bc, err := beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port, config.knownGapsTableIncrement, config.bcUniqueIdentifier, config.checkDb)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
db, err := postgres.SetupPostgresDb(config.dbHost, config.dbPort, config.dbName, config.dbUser, config.dbPassword, config.dbDriver) db, err := postgres.SetupPostgresDb(config.dbHost, config.dbPort, config.dbName, config.dbUser, config.dbPassword, config.dbDriver)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())

View File

@ -30,7 +30,7 @@ import (
func (bc *BeaconClient) CaptureHistoric(maxWorkers int) []error { func (bc *BeaconClient) CaptureHistoric(maxWorkers int) []error {
log.Info("We are starting the historical processing service.") log.Info("We are starting the historical processing service.")
bc.HistoricalProcess = historicProcessing{db: bc.Db, metrics: bc.Metrics} bc.HistoricalProcess = historicProcessing{db: bc.Db, metrics: bc.Metrics}
errs := handleBatchProcess(maxWorkers, bc.HistoricalProcess, bc.HistoricalProcess.finishProcessing, bc.HistoricalProcess.db, bc.ServerEndpoint, bc.Metrics) errs := handleBatchProcess(maxWorkers, bc.HistoricalProcess, bc.HistoricalProcess.finishProcessing, bc.HistoricalProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb)
log.Debug("Exiting Historical") log.Debug("Exiting Historical")
return errs return errs
} }
@ -89,7 +89,7 @@ type batchHistoricError struct {
// 4. Remove the slot entry from the DB. // 4. Remove the slot entry from the DB.
// //
// 5. Handle any errors. // 5. Handle any errors.
func handleBatchProcess(maxWorkers int, bp BatchProcessing, finishCh chan int, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics) []error { func handleBatchProcess(maxWorkers int, bp BatchProcessing, finishCh chan int, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics, checkDb bool) []error {
slotsCh := make(chan slotsToProcess) slotsCh := make(chan slotsToProcess)
workCh := make(chan int) workCh := make(chan int)
processedCh := make(chan slotsToProcess) processedCh := make(chan slotsToProcess)
@ -99,7 +99,7 @@ func handleBatchProcess(maxWorkers int, bp BatchProcessing, finishCh chan int, d
// Start workers // Start workers
for w := 1; w <= maxWorkers; w++ { for w := 1; w <= maxWorkers; w++ {
log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting batch processing workers") log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting batch processing workers")
go processSlotRangeWorker(workCh, errCh, db, serverEndpoint, metrics) go processSlotRangeWorker(workCh, errCh, db, serverEndpoint, metrics, checkDb)
} }
// Process all ranges and send each individual slot to the worker. // Process all ranges and send each individual slot to the worker.

View File

@ -16,7 +16,7 @@ import (
var _ = Describe("Capturehistoric", func() { var _ = Describe("Capturehistoric", func() {
Describe("Run the application in historic mode", Label("unit", "behavioral"), func() { Describe("Run the application in historic mode", Label("unit", "behavioral"), func() {
Context("Phase0: When we need to process a single block in the ethcl.historic_process table.", Label("now"), func() { Context("Phase0: When we need to process a single block in the ethcl.historic_process table.", func() {
It("Successfully Process the Block", func() { It("Successfully Process the Block", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99") bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)

View File

@ -20,6 +20,7 @@ import (
"fmt" "fmt"
"strconv" "strconv"
"github.com/jackc/pgx/v4"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
@ -54,6 +55,14 @@ VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`
CheckProposedStmt string = `SELECT slot, block_root CheckProposedStmt string = `SELECT slot, block_root
FROM ethcl.slots FROM ethcl.slots
WHERE slot=$1 AND block_root=$2;` WHERE slot=$1 AND block_root=$2;`
// Check to see if the slot and block_root exist in ethcl.signed_beacon_block
CheckSignedBeaconBlockStmt string = `SELECT slot, block_root
FROM ethcl.signed_beacon_block
WHERE slot=$1 AND block_root=$2`
// Check to see if the slot and state_root exist in ethcl.beacon_state
CheckBeaconStateStmt string = `SELECT slot, state_root
FROM ethcl.beacon_state
WHERE slot=$1 AND state_root=$2`
// Used to get a single slot from the table if it exists // Used to get a single slot from the table if it exists
QueryBySlotStmt string = `SELECT slot QueryBySlotStmt string = `SELECT slot
FROM ethcl.slots FROM ethcl.slots
@ -75,6 +84,8 @@ VALUES ($1, $2, $3, $4, $5, $6) on CONFLICT (start_slot, end_slot) DO NOTHING`
// Remove any of it from the processslot file. // Remove any of it from the processslot file.
type DatabaseWriter struct { type DatabaseWriter struct {
Db sql.Database Db sql.Database
Tx sql.Tx
Ctx context.Context
Metrics *BeaconClientMetrics Metrics *BeaconClientMetrics
DbSlots *DbSlots DbSlots *DbSlots
DbSignedBeaconBlock *DbSignedBeaconBlock DbSignedBeaconBlock *DbSignedBeaconBlock
@ -85,14 +96,21 @@ type DatabaseWriter struct {
func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string, func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string,
eth1BlockHash string, status string, rawSignedBeaconBlock []byte, rawBeaconState []byte, metrics *BeaconClientMetrics) (*DatabaseWriter, error) { eth1BlockHash string, status string, rawSignedBeaconBlock []byte, rawBeaconState []byte, metrics *BeaconClientMetrics) (*DatabaseWriter, error) {
ctx := context.Background()
tx, err := db.Begin(ctx)
if err != nil {
loghelper.LogError(err).Error("We are unable to Begin a SQL transaction")
}
dw := &DatabaseWriter{ dw := &DatabaseWriter{
Db: db, Db: db,
Tx: tx,
Ctx: ctx,
rawBeaconState: rawBeaconState, rawBeaconState: rawBeaconState,
rawSignedBeaconBlock: rawSignedBeaconBlock, rawSignedBeaconBlock: rawSignedBeaconBlock,
Metrics: metrics, Metrics: metrics,
} }
dw.prepareSlotsModel(slot, stateRoot, blockRoot, status) dw.prepareSlotsModel(slot, stateRoot, blockRoot, status)
err := dw.prepareSignedBeaconBlockModel(slot, blockRoot, parentBlockRoot, eth1BlockHash) err = dw.prepareSignedBeaconBlockModel(slot, blockRoot, parentBlockRoot, eth1BlockHash)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -150,28 +168,40 @@ func (dw *DatabaseWriter) prepareBeaconStateModel(slot int, stateRoot string) er
return nil return nil
} }
// Write all the data for a given slot. // Add all the data for a given slot to a SQL transaction.
func (dw *DatabaseWriter) writeFullSlot() error { // Originally it wrote to each table individually.
func (dw *DatabaseWriter) transactFullSlot() error {
// If an error occurs, write to knownGaps table. // If an error occurs, write to knownGaps table.
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"slot": dw.DbSlots.Slot, "slot": dw.DbSlots.Slot,
}).Debug("Starting to write to the DB.") }).Debug("Starting to write to the DB.")
err := dw.writeSlots() err := dw.transactSlots()
if err != nil { if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl.slots table...") loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl.slots table...")
return err return err
} }
log.Debug("We finished writing to the ethcl.slots table.") log.Debug("We finished writing to the ethcl.slots table.")
if dw.DbSlots.Status != "skipped" { if dw.DbSlots.Status != "skipped" {
errG, _ := errgroup.WithContext(context.Background()) //errG, _ := errgroup.WithContext(context.Background())
errG.Go(func() error { //errG.Go(func() error {
return dw.writeSignedBeaconBlocks() // return dw.transactSignedBeaconBlocks()
}) //})
errG.Go(func() error { //errG.Go(func() error {
return dw.writeBeaconState() // return dw.transactBeaconState()
}) //})
if err := errG.Wait(); err != nil { //if err := errG.Wait(); err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl block or state table...") // loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl block or state table...")
// return err
//}
// Might want to seperate writing to public.blocks so we can do this concurrently...
err := dw.transactSignedBeaconBlocks()
if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl block table...")
return err
}
err = dw.transactBeaconState()
if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl state table...")
return err return err
} }
} }
@ -179,16 +209,16 @@ func (dw *DatabaseWriter) writeFullSlot() error {
return nil return nil
} }
// Write the information for the generic slots table. For now this is only one function. // Add data for the ethcl.slots table to a transaction. For now this is only one function.
// But in the future if we need to incorporate any FK's or perform any actions to write to the // But in the future if we need to incorporate any FK's or perform any actions to write to the
// slots table we can do it all here. // slots table we can do it all here.
func (dw *DatabaseWriter) writeSlots() error { func (dw *DatabaseWriter) transactSlots() error {
return dw.upsertSlots() return dw.upsertSlots()
} }
// Upsert to the ethcl.slots table. // Upsert to the ethcl.slots table.
func (dw *DatabaseWriter) upsertSlots() error { func (dw *DatabaseWriter) upsertSlots() error {
_, err := dw.Db.Exec(context.Background(), UpsertSlotsStmt, dw.DbSlots.Epoch, dw.DbSlots.Slot, dw.DbSlots.BlockRoot, dw.DbSlots.StateRoot, dw.DbSlots.Status) _, err := dw.Tx.Exec(dw.Ctx, UpsertSlotsStmt, dw.DbSlots.Epoch, dw.DbSlots.Slot, dw.DbSlots.BlockRoot, dw.DbSlots.StateRoot, dw.DbSlots.Status)
if err != nil { if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.slots table") loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.slots table")
return err return err
@ -196,8 +226,8 @@ func (dw *DatabaseWriter) upsertSlots() error {
return nil return nil
} }
// Write the information for the signed_beacon_block. // Add the information for the signed_beacon_block to a transaction.
func (dw *DatabaseWriter) writeSignedBeaconBlocks() error { func (dw *DatabaseWriter) transactSignedBeaconBlocks() error {
err := dw.upsertPublicBlocks(dw.DbSignedBeaconBlock.MhKey, dw.rawSignedBeaconBlock) err := dw.upsertPublicBlocks(dw.DbSignedBeaconBlock.MhKey, dw.rawSignedBeaconBlock)
if err != nil { if err != nil {
return err return err
@ -211,7 +241,7 @@ func (dw *DatabaseWriter) writeSignedBeaconBlocks() error {
// Upsert to public.blocks. // Upsert to public.blocks.
func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) error { func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) error {
_, err := dw.Db.Exec(context.Background(), UpsertBlocksStmt, key, data) _, err := dw.Tx.Exec(dw.Ctx, UpsertBlocksStmt, key, data)
if err != nil { if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the public.blocks table") loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the public.blocks table")
return err return err
@ -221,7 +251,7 @@ func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) error {
// Upsert to the ethcl.signed_beacon_block table. // Upsert to the ethcl.signed_beacon_block table.
func (dw *DatabaseWriter) upsertSignedBeaconBlock() error { func (dw *DatabaseWriter) upsertSignedBeaconBlock() error {
_, err := dw.Db.Exec(context.Background(), UpsertSignedBeaconBlockStmt, dw.DbSignedBeaconBlock.Slot, dw.DbSignedBeaconBlock.BlockRoot, dw.DbSignedBeaconBlock.ParentBlock, dw.DbSignedBeaconBlock.Eth1BlockHash, dw.DbSignedBeaconBlock.MhKey) _, err := dw.Tx.Exec(dw.Ctx, UpsertSignedBeaconBlockStmt, dw.DbSignedBeaconBlock.Slot, dw.DbSignedBeaconBlock.BlockRoot, dw.DbSignedBeaconBlock.ParentBlock, dw.DbSignedBeaconBlock.Eth1BlockHash, dw.DbSignedBeaconBlock.MhKey)
if err != nil { if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).WithFields(log.Fields{"block_root": dw.DbSignedBeaconBlock.BlockRoot}).Error("Unable to write to the slot to the ethcl.signed_beacon_block table") loghelper.LogSlotError(dw.DbSlots.Slot, err).WithFields(log.Fields{"block_root": dw.DbSignedBeaconBlock.BlockRoot}).Error("Unable to write to the slot to the ethcl.signed_beacon_block table")
return err return err
@ -229,8 +259,8 @@ func (dw *DatabaseWriter) upsertSignedBeaconBlock() error {
return nil return nil
} }
// Write the information for the beacon_state. // Add the information for the beacon_state to a transaction.
func (dw *DatabaseWriter) writeBeaconState() error { func (dw *DatabaseWriter) transactBeaconState() error {
err := dw.upsertPublicBlocks(dw.DbBeaconState.MhKey, dw.rawBeaconState) err := dw.upsertPublicBlocks(dw.DbBeaconState.MhKey, dw.rawBeaconState)
if err != nil { if err != nil {
return err return err
@ -244,7 +274,7 @@ func (dw *DatabaseWriter) writeBeaconState() error {
// Upsert to the ethcl.beacon_state table. // Upsert to the ethcl.beacon_state table.
func (dw *DatabaseWriter) upsertBeaconState() error { func (dw *DatabaseWriter) upsertBeaconState() error {
_, err := dw.Db.Exec(context.Background(), UpsertBeaconState, dw.DbBeaconState.Slot, dw.DbBeaconState.StateRoot, dw.DbBeaconState.MhKey) _, err := dw.Tx.Exec(dw.Ctx, UpsertBeaconState, dw.DbBeaconState.Slot, dw.DbBeaconState.StateRoot, dw.DbBeaconState.MhKey)
if err != nil { if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.beacon_state table") loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.beacon_state table")
return err return err
@ -252,23 +282,23 @@ func (dw *DatabaseWriter) upsertBeaconState() error {
return nil return nil
} }
// Update a given slot to be marked as forked. Provide the slot and the latest latestBlockRoot. // Update a given slot to be marked as forked within a transaction. Provide the slot and the latest latestBlockRoot.
// We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked. // We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked.
func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) { func transactReorgs(tx sql.Tx, ctx context.Context, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) {
slotNum, strErr := strconv.Atoi(slot) slotNum, strErr := strconv.Atoi(slot)
if strErr != nil { if strErr != nil {
loghelper.LogReorgError(slot, latestBlockRoot, strErr).Error("We can't convert the slot to an int...") loghelper.LogReorgError(slot, latestBlockRoot, strErr).Error("We can't convert the slot to an int...")
} }
forkCount, err := updateForked(db, slot, latestBlockRoot) forkCount, err := updateForked(tx, ctx, slot, latestBlockRoot)
if err != nil { if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while updating all forks.") loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while updating all forks.")
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics) transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics)
} }
proposedCount, err := updateProposed(db, slot, latestBlockRoot) proposedCount, err := updateProposed(tx, ctx, slot, latestBlockRoot)
if err != nil { if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while trying to update the proposed slot.") loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while trying to update the proposed slot.")
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics) transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics)
} }
if forkCount > 0 { if forkCount > 0 {
@ -289,29 +319,37 @@ func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{ loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"proposedCount": proposedCount, "proposedCount": proposedCount,
}).Error("Too many rows were marked as proposed!") }).Error("Too many rows were marked as proposed!")
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics) transactKnownGaps(tx, ctx, 1, slotNum, slotNum, fmt.Errorf("Too many rows were marked as unproposed."), "reorg", metrics)
} else if proposedCount == 0 { } else if proposedCount == 0 {
var count int transactKnownGaps(tx, ctx, 1, slotNum, slotNum, fmt.Errorf("Unable to find properly proposed row in DB"), "reorg", metrics)
err := db.QueryRow(context.Background(), CheckProposedStmt, slot, latestBlockRoot).Scan(count)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to query proposed rows after reorg.")
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics)
} else if count != 1 {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"proposedCount": count,
}).Warn("The proposed block was not marked as proposed...")
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics)
} else {
loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.") loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.")
} }
}
metrics.IncrementReorgsInsert(1) metrics.IncrementReorgsInsert(1)
} }
// Wrapper function that will create a transaction and execute the function.
func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) {
ctx := context.Background()
tx, err := db.Begin(ctx)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Fatal("Unable to create a new transaction for reorgs")
}
defer func() {
err := tx.Rollback(ctx)
if err != nil && err != pgx.ErrTxClosed {
loghelper.LogError(err).Error("We were unable to Rollback a transaction for reorgs")
}
}()
transactReorgs(tx, ctx, slot, latestBlockRoot, metrics)
if err = tx.Commit(ctx); err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Fatal("Unable to execute the transaction for reorgs")
}
}
// Update the slots table by marking the old slot's as forked. // Update the slots table by marking the old slot's as forked.
func updateForked(db sql.Database, slot string, latestBlockRoot string) (int64, error) { func updateForked(tx sql.Tx, ctx context.Context, slot string, latestBlockRoot string) (int64, error) {
res, err := db.Exec(context.Background(), UpdateForkedStmt, slot, latestBlockRoot) res, err := tx.Exec(ctx, UpdateForkedStmt, slot, latestBlockRoot)
if err != nil { if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with the forked slots") loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with the forked slots")
return 0, err return 0, err
@ -324,8 +362,9 @@ func updateForked(db sql.Database, slot string, latestBlockRoot string) (int64,
return count, err return count, err
} }
func updateProposed(db sql.Database, slot string, latestBlockRoot string) (int64, error) { // Mark a slot as proposed.
res, err := db.Exec(context.Background(), UpdateProposedStmt, slot, latestBlockRoot) func updateProposed(tx sql.Tx, ctx context.Context, slot string, latestBlockRoot string) (int64, error) {
res, err := tx.Exec(ctx, UpdateProposedStmt, slot, latestBlockRoot)
if err != nil { if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with the proposed slot.") loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with the proposed slot.")
return 0, err return 0, err
@ -339,20 +378,26 @@ func updateProposed(db sql.Database, slot string, latestBlockRoot string) (int64
return count, err return count, err
} }
// A wrapper function to call upsertKnownGaps. This function will break down the range of known_gaos into // A wrapper function to call upsertKnownGaps. This function will break down the range of known_gaps into
// smaller chunks. For example, instead of having an entry of 1-101, if we increment the entries by 10 slots, we would // smaller chunks. For example, instead of having an entry of 1-101, if we increment the entries by 10 slots, we would
// have 10 entries as follows: 1-10, 11-20, etc... // have 10 entries as follows: 1-10, 11-20, etc...
func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot int, entryError error, entryProcess string, metric *BeaconClientMetrics) { func transactKnownGaps(tx sql.Tx, ctx context.Context, tableIncrement int, startSlot int, endSlot int, entryError error, entryProcess string, metric *BeaconClientMetrics) {
var entryErrorMsg string
if entryError == nil {
entryErrorMsg = ""
} else {
entryErrorMsg = entryError.Error()
}
if endSlot-startSlot <= tableIncrement { if endSlot-startSlot <= tableIncrement {
kgModel := DbKnownGaps{ kgModel := DbKnownGaps{
StartSlot: strconv.Itoa(startSlot), StartSlot: strconv.Itoa(startSlot),
EndSlot: strconv.Itoa(endSlot), EndSlot: strconv.Itoa(endSlot),
CheckedOut: false, CheckedOut: false,
ReprocessingError: "", ReprocessingError: "",
EntryError: entryError.Error(), EntryError: entryErrorMsg,
EntryProcess: entryProcess, EntryProcess: entryProcess,
} }
upsertKnownGaps(db, kgModel, metric) upsertKnownGaps(tx, ctx, kgModel, metric)
} else { } else {
totalSlots := endSlot - startSlot totalSlots := endSlot - startSlot
var chunks int var chunks int
@ -374,18 +419,37 @@ func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot
EndSlot: strconv.Itoa(tempEnd), EndSlot: strconv.Itoa(tempEnd),
CheckedOut: false, CheckedOut: false,
ReprocessingError: "", ReprocessingError: "",
EntryError: entryError.Error(), EntryError: entryErrorMsg,
EntryProcess: entryProcess, EntryProcess: entryProcess,
} }
upsertKnownGaps(db, kgModel, metric) upsertKnownGaps(tx, ctx, kgModel, metric)
}
} }
} }
// Wrapper function, instead of adding the knownGaps entries to a transaction, it will
// create the transaction and write it.
func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot int, entryError error, entryProcess string, metric *BeaconClientMetrics) {
ctx := context.Background()
tx, err := db.Begin(ctx)
if err != nil {
loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).Fatal("Unable to create a new transaction for knownGaps")
}
defer func() {
err := tx.Rollback(ctx)
if err != nil && err != pgx.ErrTxClosed {
loghelper.LogError(err).Error("We were unable to Rollback a transaction for reorgs")
}
}()
transactKnownGaps(tx, ctx, tableIncrement, startSlot, endSlot, entryError, entryProcess, metric)
if err = tx.Commit(ctx); err != nil {
loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).Fatal("Unable to execute the transaction for knownGaps")
}
} }
// A function to upsert a single entry to the ethcl.known_gaps table. // A function to upsert a single entry to the ethcl.known_gaps table.
func upsertKnownGaps(db sql.Database, knModel DbKnownGaps, metric *BeaconClientMetrics) { func upsertKnownGaps(tx sql.Tx, ctx context.Context, knModel DbKnownGaps, metric *BeaconClientMetrics) {
_, err := db.Exec(context.Background(), UpsertKnownGapsStmt, knModel.StartSlot, knModel.EndSlot, _, err := tx.Exec(ctx, UpsertKnownGapsStmt, knModel.StartSlot, knModel.EndSlot,
knModel.CheckedOut, knModel.ReprocessingError, knModel.EntryError, knModel.EntryProcess) knModel.CheckedOut, knModel.ReprocessingError, knModel.EntryError, knModel.EntryProcess)
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
@ -460,7 +524,55 @@ func isSlotProcessed(db sql.Database, checkProcessStmt string, slot string) (boo
if err != nil { if err != nil {
return false, err return false, err
} }
if row > 0 {
return true, nil
}
return false, nil
}
// Check to see if this slot is in the DB. Check ethcl.slots, ethcl.signed_beacon_block
// and ethcl.beacon_state. If the slot exists, return true
func IsSlotInDb(db sql.Database, slot string, blockRoot string, stateRoot string) (bool, error) {
var (
isInBeaconState bool
isInSignedBeaconBlock bool
err error
)
errG, _ := errgroup.WithContext(context.Background())
errG.Go(func() error {
isInBeaconState, err = checkSlotAndRoot(db, CheckBeaconStateStmt, slot, stateRoot)
if err != nil {
loghelper.LogError(err).Error("Unable to check if the slot and stateroot exist in ethcl.beacon_state")
}
return err
})
errG.Go(func() error {
isInSignedBeaconBlock, err = checkSlotAndRoot(db, CheckSignedBeaconBlockStmt, slot, blockRoot)
if err != nil {
loghelper.LogError(err).Error("Unable to check if the slot and block_root exist in ethcl.signed_beacon_block")
}
return err
})
if err := errG.Wait(); err != nil {
return false, err
}
if isInBeaconState && isInSignedBeaconBlock {
return true, nil
}
return false, nil
}
// Provide a statement, slot, and root, and this function will check to see
// if the slot and root exist in the table.
func checkSlotAndRoot(db sql.Database, statement, slot, root string) (bool, error) {
processRow, err := db.Exec(context.Background(), statement, slot, root)
if err != nil {
return false, err
}
row, err := processRow.RowsAffected()
if err != nil {
return false, err
}
if row > 0 { if row > 0 {
return true, nil return true, nil
} }

View File

@ -31,9 +31,9 @@ var _ = Describe("Healthcheck", func() {
BeforeEach(func() { BeforeEach(func() {
var err error var err error
Bc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10, bcUniqueIdentifier) Bc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10, bcUniqueIdentifier, false)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
errBc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10, bcUniqueIdentifier) errBc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10, bcUniqueIdentifier, false)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}) })

View File

@ -60,7 +60,7 @@ func (bc *BeaconClient) handleHead() {
log.WithFields(log.Fields{"head": head}).Debug("We are going to start processing the slot.") log.WithFields(log.Fields{"head": head}).Debug("We are going to start processing the slot.")
go processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement) go processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement, bc.CheckDb)
log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished calling processHeadSlot.") log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished calling processHeadSlot.")

View File

@ -94,16 +94,16 @@ func (hp historicProcessing) releaseDbLocks() error {
} }
// Process the slot range. // Process the slot range.
func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics) { func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics, checkDb bool) {
for slot := range workCh { for slot := range workCh {
log.Debug("Handling slot: ", slot) log.Debug("Handling slot: ", slot)
err, errProcess := handleHistoricSlot(db, serverAddress, slot, metrics) err, errProcess := handleHistoricSlot(db, serverAddress, slot, metrics, checkDb)
if err != nil {
errMs := batchHistoricError{ errMs := batchHistoricError{
err: err, err: err,
errProcess: errProcess, errProcess: errProcess,
slot: slot, slot: slot,
} }
if err != nil {
errCh <- errMs errCh <- errMs
} }
} }
@ -150,7 +150,7 @@ func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRow
} }
defer func() { defer func() {
err := tx.Rollback(ctx) err := tx.Rollback(ctx)
if err != nil { if err != nil && err != pgx.ErrTxClosed {
loghelper.LogError(err).Error("We were unable to Rollback a transaction") loghelper.LogError(err).Error("We were unable to Rollback a transaction")
errCount = append(errCount, err) errCount = append(errCount, err)
} }

View File

@ -61,7 +61,7 @@ type KnownGapsProcessing struct {
func (bc *BeaconClient) ProcessKnownGaps(maxWorkers int) []error { func (bc *BeaconClient) ProcessKnownGaps(maxWorkers int) []error {
log.Info("We are starting the known gaps processing service.") log.Info("We are starting the known gaps processing service.")
bc.KnownGapsProcess = KnownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics, finishProcessing: make(chan int)} bc.KnownGapsProcess = KnownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics, finishProcessing: make(chan int)}
errs := handleBatchProcess(maxWorkers, bc.KnownGapsProcess, bc.KnownGapsProcess.finishProcessing, bc.KnownGapsProcess.db, bc.ServerEndpoint, bc.Metrics) errs := handleBatchProcess(maxWorkers, bc.KnownGapsProcess, bc.KnownGapsProcess.finishProcessing, bc.KnownGapsProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb)
log.Debug("Exiting known gaps processing service") log.Debug("Exiting known gaps processing service")
return errs return errs
} }

View File

@ -26,6 +26,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"github.com/jackc/pgx/v4"
si "github.com/prysmaticlabs/prysm/consensus-types/interfaces" si "github.com/prysmaticlabs/prysm/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/consensus-types/wrapper" "github.com/prysmaticlabs/prysm/consensus-types/wrapper"
dt "github.com/prysmaticlabs/prysm/encoding/ssz/detect" dt "github.com/prysmaticlabs/prysm/encoding/ssz/detect"
@ -79,7 +80,7 @@ type ProcessSlot struct {
// This function will do all the work to process the slot and write it to the DB. // This function will do all the work to process the slot and write it to the DB.
// It will return the error and error process. The error process is used for providing reach detail to the // It will return the error and error process. The error process is used for providing reach detail to the
// known_gaps table. // known_gaps table.
func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) (error, string) { func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) (error, string) {
ps := &ProcessSlot{ ps := &ProcessSlot{
Slot: slot, Slot: slot,
BlockRoot: blockRoot, BlockRoot: blockRoot,
@ -114,13 +115,34 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
return err, "processSlot" return err, "processSlot"
} }
finalBlockRoot, finalStateRoot, finalEth1BlockHash, err := ps.provideFinalHash()
if err != nil {
return err, "CalculateBlockRoot"
}
if checkDb {
inDb, err := IsSlotInDb(ps.Db, strconv.Itoa(ps.Slot), finalBlockRoot, finalStateRoot)
if err != nil {
return err, "checkDb"
}
if inDb {
log.WithField("slot", slot).Info("Slot already in the DB.")
return nil, ""
}
}
// Get this object ready to write // Get this object ready to write
dw, err := ps.createWriteObjects() dw, err := ps.createWriteObjects(finalBlockRoot, finalStateRoot, finalEth1BlockHash)
if err != nil { if err != nil {
return err, "blockRoot" return err, "blockRoot"
} }
// Write the object to the DB. // Write the object to the DB.
err = dw.writeFullSlot() defer func() {
err := dw.Tx.Rollback(dw.Ctx)
if err != nil && err != pgx.ErrTxClosed {
loghelper.LogError(err).Error("We were unable to Rollback a transaction")
}
}()
err = dw.transactFullSlot()
if err != nil { if err != nil {
return err, "processSlot" return err, "processSlot"
} }
@ -131,26 +153,32 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
return fmt.Errorf("headOrHistoric must be either historic or head!"), "" return fmt.Errorf("headOrHistoric must be either historic or head!"), ""
} }
if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" && ps.Status != "skipped" { if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" && ps.Status != "skipped" {
ps.checkPreviousSlot(previousSlot, previousBlockRoot, knownGapsTableIncrement) ps.checkPreviousSlot(dw.Tx, dw.Ctx, previousSlot, previousBlockRoot, knownGapsTableIncrement)
} }
// Commit the transaction
if err = dw.Tx.Commit(dw.Ctx); err != nil {
return err, "transactionCommit"
}
return nil, "" return nil, ""
} }
// Handle a slot that is at head. A wrapper function for calling `handleFullSlot`. // Handle a slot that is at head. A wrapper function for calling `handleFullSlot`.
func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) { func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) {
// Get the knownGaps at startUp. // Get the knownGaps at startUp.
if previousSlot == 0 && previousBlockRoot == "" { if previousSlot == 0 && previousBlockRoot == "" {
writeStartUpGaps(db, knownGapsTableIncrement, slot, metrics) writeStartUpGaps(db, knownGapsTableIncrement, slot, metrics)
} }
err, errReason := processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement) err, errReason := processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement, checkDb)
if err != nil { if err != nil {
writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics) writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics)
} }
} }
// Handle a historic slot. A wrapper function for calling `handleFullSlot`. // Handle a historic slot. A wrapper function for calling `handleFullSlot`.
func handleHistoricSlot(db sql.Database, serverAddress string, slot int, metrics *BeaconClientMetrics) (error, string) { func handleHistoricSlot(db sql.Database, serverAddress string, slot int, metrics *BeaconClientMetrics, checkDb bool) (error, string) {
return processFullSlot(db, serverAddress, slot, "", "", 0, "", "historic", metrics, 1) return processFullSlot(db, serverAddress, slot, "", "", 0, "", "historic", metrics, 1, checkDb)
} }
// Update the SszSignedBeaconBlock and FullSignedBeaconBlock object with their respective values. // Update the SszSignedBeaconBlock and FullSignedBeaconBlock object with their respective values.
@ -232,14 +260,14 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string, vmCh chan<- *dt.Ver
} }
// Check to make sure that the previous block we processed is the parent of the current block. // Check to make sure that the previous block we processed is the parent of the current block.
func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot string, knownGapsTableIncrement int) { func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previousSlot int, previousBlockRoot string, knownGapsTableIncrement int) {
parentRoot := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().ParentRoot()) parentRoot := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().ParentRoot())
if previousSlot == int(ps.FullBeaconState.Slot()) { if previousSlot == int(ps.FullBeaconState.Slot()) {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"slot": ps.FullBeaconState.Slot(), "slot": ps.FullBeaconState.Slot(),
"fork": true, "fork": true,
}).Warn("A fork occurred! The previous slot and current slot match.") }).Warn("A fork occurred! The previous slot and current slot match.")
writeReorgs(ps.Db, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics) transactReorgs(tx, ctx, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics)
} else if previousSlot > int(ps.FullBeaconState.Slot()) { } else if previousSlot > int(ps.FullBeaconState.Slot()) {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"previousSlot": previousSlot, "previousSlot": previousSlot,
@ -250,27 +278,43 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
"previousSlot": previousSlot, "previousSlot": previousSlot,
"currentSlot": ps.FullBeaconState.Slot(), "currentSlot": ps.FullBeaconState.Slot(),
}).Error("We skipped a few slots.") }).Error("We skipped a few slots.")
writeKnownGaps(ps.Db, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot())-1, fmt.Errorf("Gaps during head processing"), "headGaps", ps.Metrics) transactKnownGaps(tx, ctx, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot())-1, fmt.Errorf("Gaps during head processing"), "headGaps", ps.Metrics)
} else if previousBlockRoot != parentRoot { } else if previousBlockRoot != parentRoot {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"previousBlockRoot": previousBlockRoot, "previousBlockRoot": previousBlockRoot,
"currentBlockParent": parentRoot, "currentBlockParent": parentRoot,
}).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.") }).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.")
writeReorgs(ps.Db, strconv.Itoa(previousSlot), parentRoot, ps.Metrics) transactReorgs(tx, ctx, strconv.Itoa(previousSlot), parentRoot, ps.Metrics)
} else { } else {
log.Debug("Previous Slot and Current Slot are one distance from each other.") log.Debug("Previous Slot and Current Slot are one distance from each other.")
} }
} }
// Transforms all the raw data into DB models that can be written to the DB. // Transforms all the raw data into DB models that can be written to the DB.
func (ps *ProcessSlot) createWriteObjects() (*DatabaseWriter, error) { func (ps *ProcessSlot) createWriteObjects(blockRoot, stateRoot, eth1BlockHash string) (*DatabaseWriter, error) {
var status string
if ps.Status != "" {
status = ps.Status
} else {
status = "proposed"
}
dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.SszSignedBeaconBlock, ps.SszBeaconState, ps.Metrics)
if err != nil {
return dw, err
}
return dw, nil
}
// This function will return the final blockRoot, stateRoot, and eth1BlockHash that will be
// used to write to a DB
func (ps *ProcessSlot) provideFinalHash() (string, string, string, error) {
var ( var (
stateRoot string stateRoot string
blockRoot string blockRoot string
status string
eth1BlockHash string eth1BlockHash string
) )
if ps.Status == "skipped" { if ps.Status == "skipped" {
stateRoot = "" stateRoot = ""
blockRoot = "" blockRoot = ""
@ -290,24 +334,12 @@ func (ps *ProcessSlot) createWriteObjects() (*DatabaseWriter, error) {
rawBlockRoot, err := ps.FullSignedBeaconBlock.Block().HashTreeRoot() rawBlockRoot, err := ps.FullSignedBeaconBlock.Block().HashTreeRoot()
//blockRoot, err = queryBlockRoot(blockRootEndpoint, strconv.Itoa(ps.Slot)) //blockRoot, err = queryBlockRoot(blockRootEndpoint, strconv.Itoa(ps.Slot))
if err != nil { if err != nil {
return nil, err return "", "", "", err
} }
blockRoot = "0x" + hex.EncodeToString(rawBlockRoot[:]) blockRoot = "0x" + hex.EncodeToString(rawBlockRoot[:])
log.WithFields(log.Fields{"blockRoot": blockRoot}).Debug("Block Root from ssz:") log.WithFields(log.Fields{"blockRoot": blockRoot}).Debug("Block Root from ssz")
} }
eth1BlockHash = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().Body().Eth1Data().BlockHash) eth1BlockHash = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().Body().Eth1Data().BlockHash)
} }
return blockRoot, stateRoot, eth1BlockHash, nil
if ps.Status != "" {
status = ps.Status
} else {
status = "proposed"
}
dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.SszSignedBeaconBlock, ps.SszBeaconState, ps.Metrics)
if err != nil {
return dw, err
}
return dw, nil
} }