From 4457888cc17999278791d41689e30ed02a5f6b11 Mon Sep 17 00:00:00 2001 From: Abdul Rabbani Date: Tue, 24 May 2022 16:03:48 -0400 Subject: [PATCH] Update remove entry for knownGaps, viper --- cmd/capture.go | 69 ++++++++++++++------------ cmd/head.go | 40 +++++++-------- cmd/historic.go | 35 +++++++------ cmd/root.go | 1 + example.ipld-ethcl-indexer-config.json | 33 ++++++++++++ internal/boot/boot.go | 48 ++++++++++++++---- pkg/beaconclient/databasewrite.go | 30 +++++++++++ pkg/beaconclient/metrics.go | 24 +++++++-- pkg/beaconclient/processevents.go | 1 - pkg/beaconclient/processknowngaps.go | 29 ++++++++++- pkg/beaconclient/processslot.go | 18 ++++--- 11 files changed, 232 insertions(+), 96 deletions(-) create mode 100644 example.ipld-ethcl-indexer-config.json diff --git a/cmd/capture.go b/cmd/capture.go index c6bf8e5..694d110 100644 --- a/cmd/capture.go +++ b/cmd/capture.go @@ -17,6 +17,7 @@ package cmd import ( + "fmt" "os" "time" @@ -39,8 +40,10 @@ var ( bcType string bcIsProcessKnownGaps bool bcMaxHistoricProcessWorker int - bcMaxKnownGapsWorker int - maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second + kgMaxWorker int + kgTableIncrement int + kgProcessGaps bool + maxWaitSecondsShutdown time.Duration = time.Duration(20) * time.Second notifierCh chan os.Signal = make(chan os.Signal, 1) testDisregardSync bool ) @@ -67,18 +70,18 @@ func init() { captureCmd.PersistentFlags().StringVarP(&dbName, "db.name", "n", "", "Database name connect to DB(required)") captureCmd.PersistentFlags().StringVarP(&dbDriver, "db.driver", "", "", "Database Driver to connect to DB(required)") captureCmd.PersistentFlags().IntVarP(&dbPort, "db.port", "", 0, "Port to connect to DB(required)") - err := captureCmd.MarkPersistentFlagRequired("db.username") - exitErr(err) - err = captureCmd.MarkPersistentFlagRequired("db.password") - exitErr(err) - err = captureCmd.MarkPersistentFlagRequired("db.address") - exitErr(err) - err = captureCmd.MarkPersistentFlagRequired("db.port") - exitErr(err) - err = captureCmd.MarkPersistentFlagRequired("db.name") - exitErr(err) - err = captureCmd.MarkPersistentFlagRequired("db.driver") - exitErr(err) + //err := captureCmd.MarkPersistentFlagRequired("db.username") + // exitErr(err) + // err = captureCmd.MarkPersistentFlagRequired("db.password") + // exitErr(err) + // err = captureCmd.MarkPersistentFlagRequired("db.address") + // exitErr(err) + // err = captureCmd.MarkPersistentFlagRequired("db.port") + // exitErr(err) + // err = captureCmd.MarkPersistentFlagRequired("db.name") + // exitErr(err) + // err = captureCmd.MarkPersistentFlagRequired("db.driver") + // exitErr(err) //// Beacon Client Specific captureCmd.PersistentFlags().StringVarP(&bcAddress, "bc.address", "l", "", "Address to connect to beacon node (required)") @@ -88,19 +91,22 @@ func init() { captureCmd.PersistentFlags().IntVarP(&bcBootRetryInterval, "bc.bootRetryInterval", "", 30, "The amount of time to wait between retries while booting the application") captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The amount of time to wait between retries while booting the application") captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.historic_process table. Be careful of system memory.") - captureCmd.PersistentFlags().IntVarP(&bcMaxKnownGapsWorker, "bc.maxKnownGapsWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.historic_process table. Be careful of system memory.") - captureCmd.PersistentFlags().BoolVar(&bcIsProcessKnownGaps, "bc.knownGapsProcess", false, "Should we process entries from the knownGaps table as they occur?") - err = captureCmd.MarkPersistentFlagRequired("bc.address") - exitErr(err) - err = captureCmd.MarkPersistentFlagRequired("bc.port") - exitErr(err) + // err = captureCmd.MarkPersistentFlagRequired("bc.address") + // exitErr(err) + // err = captureCmd.MarkPersistentFlagRequired("bc.port") + // exitErr(err) + + //// Known Gaps specific + captureCmd.PersistentFlags().BoolVarP(&kgProcessGaps, "kg.processKnownGaps", "", true, "Should we process the slots within the ethcl.known_gaps table.") + captureCmd.PersistentFlags().IntVarP(&kgTableIncrement, "kg.increment", "", 10000, "The max slots within a single entry to the known_gaps table.") + captureCmd.PersistentFlags().IntVarP(&kgMaxWorker, "kg.maxKnownGapsWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.known_gaps table. Be careful of system memory.") //// Testing Specific captureCmd.PersistentFlags().BoolVar(&testDisregardSync, "t.skipSync", false, "Should we disregard the head sync?") // Bind Flags with Viper //// DB Flags - err = viper.BindPFlag("db.username", captureCmd.PersistentFlags().Lookup("db.username")) + err := viper.BindPFlag("db.username", captureCmd.PersistentFlags().Lookup("db.username")) exitErr(err) err = viper.BindPFlag("db.password", captureCmd.PersistentFlags().Lookup("db.password")) exitErr(err) @@ -111,13 +117,11 @@ func init() { err = viper.BindPFlag("db.name", captureCmd.PersistentFlags().Lookup("db.name")) exitErr(err) - // Testing Specific + //// Testing Specific err = viper.BindPFlag("t.skipSync", captureCmd.PersistentFlags().Lookup("t.skipSync")) exitErr(err) - err = viper.BindPFlag("t.driver", captureCmd.PersistentFlags().Lookup("db.driver")) - exitErr(err) - // LH specific + //// LH specific err = viper.BindPFlag("bc.address", captureCmd.PersistentFlags().Lookup("bc.address")) exitErr(err) err = viper.BindPFlag("bc.type", captureCmd.PersistentFlags().Lookup("bc.type")) @@ -130,22 +134,25 @@ func init() { exitErr(err) err = viper.BindPFlag("bc.bootMaxRetry", captureCmd.PersistentFlags().Lookup("bc.bootMaxRetry")) exitErr(err) - err = viper.BindPFlag("bc.bootMaxRetry", captureCmd.PersistentFlags().Lookup("bc.bootMaxRetry")) - exitErr(err) - err = viper.BindPFlag("bc.knownGapsProcess", captureCmd.PersistentFlags().Lookup("bc.knownGapsProcess")) - exitErr(err) err = viper.BindPFlag("bc.maxHistoricProcessWorker", captureCmd.PersistentFlags().Lookup("bc.maxHistoricProcessWorker")) exitErr(err) - err = viper.BindPFlag("bc.maxKnownGapsWorker", captureCmd.PersistentFlags().Lookup("bc.maxKnownGapsWorker")) - exitErr(err) // Here you will define your flags and configuration settings. + //// Known Gap Specific + err = viper.BindPFlag("kg.processKnownGaps", captureCmd.PersistentFlags().Lookup("kg.processKnownGaps")) + exitErr(err) + err = viper.BindPFlag("kg.increment", captureCmd.PersistentFlags().Lookup("kg.increment")) + exitErr(err) + err = viper.BindPFlag("kg.processKnownGaps", captureCmd.PersistentFlags().Lookup("kg.maxKnownGapsWorker")) + exitErr(err) + } // Helper function to catch any errors. // We need to capture these errors for the linter. func exitErr(err error) { if err != nil { + fmt.Println("Error: ", err) os.Exit(1) } } diff --git a/cmd/head.go b/cmd/head.go index 67392a7..7589fd5 100644 --- a/cmd/head.go +++ b/cmd/head.go @@ -29,10 +29,6 @@ import ( "golang.org/x/sync/errgroup" ) -var ( - kgTableIncrement int -) - // headCmd represents the head command var headCmd = &cobra.Command{ Use: "head", @@ -49,8 +45,9 @@ func startHeadTracking() { log.Info("Starting the application in head tracking mode.") ctx := context.Background() - Bc, Db, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, - bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, kgTableIncrement, "head", testDisregardSync) + Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), + viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), + viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync")) if err != nil { StopApplicationPreBoot(err, Db) } @@ -58,19 +55,21 @@ func startHeadTracking() { log.Info("The Beacon Client has booted successfully!") // Capture head blocks go Bc.CaptureHead() - if bcIsProcessKnownGaps { - errG := new(errgroup.Group) - errG.Go(func() error { - errs := Bc.ProcessKnownGaps(bcMaxKnownGapsWorker) - if len(errs) != 0 { - log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps") - return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps") + if viper.GetBool("kg.processKnownGaps") { + go func() { + errG := new(errgroup.Group) + errG.Go(func() error { + errs := Bc.ProcessKnownGaps(viper.GetInt("kg.maxKnownGapsWorker")) + if len(errs) != 0 { + log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps") + return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps") + } + return nil + }) + if err := errG.Wait(); err != nil { + loghelper.LogError(err).Error("Error with knownGaps processing") } - return nil - }) - if err := errG.Wait(); err != nil { - loghelper.LogError(err).Error("Error with knownGaps processing") - } + }() } // Shutdown when the time is right. @@ -85,9 +84,4 @@ func startHeadTracking() { func init() { captureCmd.AddCommand(headCmd) - - // Known Gaps specific - captureCmd.PersistentFlags().IntVarP(&kgTableIncrement, "kg.increment", "", 10000, "The max slots within a single entry to the known_gaps table.") - err := viper.BindPFlag("kg.increment", captureCmd.PersistentFlags().Lookup("kg.increment")) - exitErr(err) } diff --git a/cmd/historic.go b/cmd/historic.go index 8618ff9..519d00e 100644 --- a/cmd/historic.go +++ b/cmd/historic.go @@ -23,6 +23,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "github.com/spf13/viper" "github.com/vulcanize/ipld-ethcl-indexer/internal/boot" "github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown" "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" @@ -46,8 +47,9 @@ func startHistoricProcessing() { log.Info("Starting the application in head tracking mode.") ctx := context.Background() - Bc, Db, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, - bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, kgTableIncrement, "historic", testDisregardSync) + Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), + viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), + viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync")) if err != nil { StopApplicationPreBoot(err, Db) } @@ -55,7 +57,7 @@ func startHistoricProcessing() { errG, _ := errgroup.WithContext(context.Background()) errG.Go(func() error { - errs := Bc.CaptureHistoric(bcMaxHistoricProcessWorker) + errs := Bc.CaptureHistoric(viper.GetInt("bc.maxHistoricProcessWorker")) if len(errs) != 0 { if len(errs) != 0 { log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events") @@ -65,21 +67,22 @@ func startHistoricProcessing() { return nil }) - if bcIsProcessKnownGaps { - errG.Go(func() error { - errs := Bc.ProcessKnownGaps(bcMaxKnownGapsWorker) - if len(errs) != 0 { - log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps") - return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps") + if viper.GetBool("kg.processKnownGaps") { + go func() { + errG := new(errgroup.Group) + errG.Go(func() error { + errs := Bc.ProcessKnownGaps(viper.GetInt("kg.maxKnownGapsWorker")) + if len(errs) != 0 { + log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps") + return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps") + } + return nil + }) + if err := errG.Wait(); err != nil { + loghelper.LogError(err).Error("Error with knownGaps processing") } - return nil - }) + }() } - if err := errG.Wait(); err != nil { - loghelper.LogError(err).Error("Error with knownGaps processing") - } - - log.Debug("WE ARE AT CHECKPOINT") // Shutdown when the time is right. err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc) diff --git a/cmd/root.go b/cmd/root.go index 2625dbe..68e8e78 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -47,6 +47,7 @@ It can either do this will keeping track of head, or backfilling historic data.` func Execute() { err := rootCmd.Execute() if err != nil { + fmt.Println("Err when executing rootCmd", err) os.Exit(1) } } diff --git a/example.ipld-ethcl-indexer-config.json b/example.ipld-ethcl-indexer-config.json new file mode 100644 index 0000000..82513b4 --- /dev/null +++ b/example.ipld-ethcl-indexer-config.json @@ -0,0 +1,33 @@ +{ + "db": { + "address": "localhost", + "password": "password", + "port": 8076, + "username": "vdbm", + "name": "vulcanize_testing", + "driver": "PGX" + }, + "bc": { + "address": "10.203.8.51", + "port": 5052, + "type": "lighthouse", + "bootRetryInterval": 30, + "bootMaxRetry": 5, + "maxHistoricProcessWorker": 2, + "connectionProtocol": "http" + }, + "t": { + "skipSync": false + }, + "log": { + "level": "debug", + "output": true, + "file": "./ipld-ethcl-indexer.log", + "format": "json" + }, + "kg": { + "increment": 10000, + "processKnownGaps": true, + "maxKnownGapsWorker": 2 + } +} diff --git a/internal/boot/boot.go b/internal/boot/boot.go index 37c9537..e726cf8 100644 --- a/internal/boot/boot.go +++ b/internal/boot/boot.go @@ -84,18 +84,37 @@ func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, bcKgTableIncrement int, startUpMode string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) { var err error - for i := 0; i < bcMaxRetry; i++ { - BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, - bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync) - if err != nil { - log.WithFields(log.Fields{ - "retryNumber": i, - "err": err, - }).Warn("Unable to boot application. Going to try again") - time.Sleep(time.Duration(bcRetryInterval) * time.Second) - continue + + if bcMaxRetry < 0 { + i := 0 + for { + BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, + bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync) + if err != nil { + log.WithFields(log.Fields{ + "retryNumber": i, + "err": err, + }).Warn("Unable to boot application. Going to try again") + time.Sleep(time.Duration(bcRetryInterval) * time.Second) + i = i + 1 + continue + } + break + } + } else { + for i := 0; i < bcMaxRetry; i++ { + BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, + bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync) + if err != nil { + log.WithFields(log.Fields{ + "retryNumber": i, + "err": err, + }).Warn("Unable to boot application. Going to try again") + time.Sleep(time.Duration(bcRetryInterval) * time.Second) + continue + } + break } - break } switch strings.ToLower(startUpMode) { @@ -104,6 +123,13 @@ func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int case "historic": log.Debug("Performing additional boot steps for historical processing") BC.PerformHistoricalProcessing = true + // This field is not currently used. + // The idea is, that if we are doing historially processing and we get a slot + // greater than this slot, then we would rerun this function. + // this would ensure that we have the slots necessary for processing + // within the beacon server. + + // We can implement this feature if we notice any errors. headSlot, err := BC.GetLatestSlotInBeaconServer(bcType) if err != nil { return BC, DB, err diff --git a/pkg/beaconclient/databasewrite.go b/pkg/beaconclient/databasewrite.go index a559eea..fd0ac6c 100644 --- a/pkg/beaconclient/databasewrite.go +++ b/pkg/beaconclient/databasewrite.go @@ -62,6 +62,11 @@ VALUES ($1, $2) ON CONFLICT (key) DO NOTHING` UpsertKnownGapsStmt string = ` INSERT INTO ethcl.known_gaps (start_slot, end_slot, checked_out, reprocessing_error, entry_error, entry_process) VALUES ($1, $2, $3, $4, $5, $6) on CONFLICT (start_slot, end_slot) DO NOTHING` + UpsertKnownGapsErrorStmt string = ` + UPDATE ethcl.known_gaps + SET reprocessing_error=$3 + WHERE start_slot=$1 AND end_slot=$2;` + // Get the highest slot if one exists QueryHighestSlotStmt string = "SELECT COALESCE(MAX(slot), 0) FROM ethcl.slots" ) @@ -414,6 +419,31 @@ func writeStartUpGaps(db sql.Database, tableIncrement int, firstSlot int, metric } } +// A function to update a knownGap range with a reprocessing error. +func updateKnownGapErrors(db sql.Database, startSlot int, endSlot int, reprocessingErr error, metric *BeaconClientMetrics) error { + res, err := db.Exec(context.Background(), UpsertKnownGapsErrorStmt, startSlot, endSlot, reprocessingErr.Error()) + if err != nil { + loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).Error("Unable to update reprocessing_error") + metric.IncrementKnownGapsProcessingError(1) + return err + } + row, err := res.RowsAffected() + if err != nil { + loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).Error("Unable to count rows affected when trying to update reprocessing_error.") + metric.IncrementKnownGapsProcessingError(1) + return err + } + if row != 1 { + loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).WithFields(log.Fields{ + "rowCount": row, + }).Error("The rows affected by the upsert for reprocessing_error is not 1.") + metric.IncrementKnownGapsProcessingError(1) + return err + } + metric.IncrementKnownGapsProcessed(1) + return nil +} + // A quick helper function to calculate the epoch. func calculateEpoch(slot int, slotPerEpoch int) string { epoch := slot / slotPerEpoch diff --git a/pkg/beaconclient/metrics.go b/pkg/beaconclient/metrics.go index 8b8067d..3304ea5 100644 --- a/pkg/beaconclient/metrics.go +++ b/pkg/beaconclient/metrics.go @@ -21,11 +21,13 @@ import ( // A structure utilized for keeping track of various metrics. Currently, mostly used in testing. type BeaconClientMetrics struct { - SlotInserts uint64 // Number of head events we successfully wrote to the DB. - ReorgInserts uint64 // Number of reorg events we successfully wrote to the DB. - KnownGapsInserts uint64 // Number of known_gaps we successfully wrote to the DB. - HeadError uint64 // Number of errors that occurred when decoding the head message. - HeadReorgError uint64 // Number of errors that occurred when decoding the reorg message. + SlotInserts uint64 // Number of head events we successfully wrote to the DB. + ReorgInserts uint64 // Number of reorg events we successfully wrote to the DB. + KnownGapsInserts uint64 // Number of known_gaps we successfully wrote to the DB. + knownGapsProcessed uint64 // Number of knownGaps processed. + KnownGapsProcessingError uint64 // Number of errors that occurred while processing a knownGap + HeadError uint64 // Number of errors that occurred when decoding the head message. + HeadReorgError uint64 // Number of errors that occurred when decoding the reorg message. } // Wrapper function to increment inserts. If we want to use mutexes later we can easily update all @@ -46,6 +48,18 @@ func (m *BeaconClientMetrics) IncrementKnownGapsInserts(inc uint64) { atomic.AddUint64(&m.KnownGapsInserts, inc) } +// Wrapper function to increment known gaps processed. If we want to use mutexes later we can easily update all +// occurrences here. +func (m *BeaconClientMetrics) IncrementKnownGapsProcessed(inc uint64) { + atomic.AddUint64(&m.knownGapsProcessed, inc) +} + +// Wrapper function to increment known gaps processing error. If we want to use mutexes later we can easily update all +// occurrences here. +func (m *BeaconClientMetrics) IncrementKnownGapsProcessingError(inc uint64) { + atomic.AddUint64(&m.KnownGapsProcessingError, inc) +} + // Wrapper function to increment head errors. If we want to use mutexes later we can easily update all // occurrences here. func (m *BeaconClientMetrics) IncrementHeadError(inc uint64) { diff --git a/pkg/beaconclient/processevents.go b/pkg/beaconclient/processevents.go index 0af605e..7610191 100644 --- a/pkg/beaconclient/processevents.go +++ b/pkg/beaconclient/processevents.go @@ -68,5 +68,4 @@ func (bc *BeaconClient) handleHead() { bc.PreviousSlot = slot bc.PreviousBlockRoot = head.Block } - } diff --git a/pkg/beaconclient/processknowngaps.go b/pkg/beaconclient/processknowngaps.go index b727122..6c07c02 100644 --- a/pkg/beaconclient/processknowngaps.go +++ b/pkg/beaconclient/processknowngaps.go @@ -19,6 +19,7 @@ package beaconclient import ( + "context" "strconv" log "github.com/sirupsen/logrus" @@ -40,6 +41,9 @@ var ( // Used to delete an entry from the knownGaps table deleteKgEntryStmt string = `DELETE FROM ethcl.known_gaps WHERE start_slot=$1 AND end_slot=$2;` + // Used to check to see if a single slot exists in the known_gaps table. + checkKgSingleSlotStmt string = `SELECT start_slot, end_slot FROM ethcl.known_gaps + WHERE start_slot=$1 AND end_slot=$2;` ) type knownGapsProcessing struct { @@ -70,7 +74,28 @@ func (kgp knownGapsProcessing) removeTableEntry(processCh <-chan slotsToProcess) func (kgp knownGapsProcessing) handleProcessingErrors(errMessages <-chan batchHistoricError) { for { errMs := <-errMessages - loghelper.LogSlotError(strconv.Itoa(errMs.slot), errMs.err) - writeKnownGaps(kgp.db, 1, errMs.slot, errMs.slot, errMs.err, errMs.errProcess, kgp.metrics) + + // Check to see if this if this entry already exists. + res, err := kgp.db.Exec(context.Background(), checkKgSingleSlotStmt, errMs.slot, errMs.slot) + if err != nil { + loghelper.LogSlotError(strconv.Itoa(errMs.slot), err).Error("Unable to see if this slot is in the ethcl.known_gaps table") + } + + rows, err := res.RowsAffected() + if err != nil { + loghelper.LogSlotError(strconv.Itoa(errMs.slot), err).WithFields(log.Fields{ + "queryStatement": checkKgSingleSlotStmt, + }).Error("Unable to get the number of rows affected by this statement.") + } + + if rows > 0 { + loghelper.LogSlotError(strconv.Itoa(errMs.slot), errMs.err).Error("We received an error when processing a knownGap") + err = updateKnownGapErrors(kgp.db, errMs.slot, errMs.slot, errMs.err, kgp.metrics) + if err != nil { + loghelper.LogSlotError(strconv.Itoa(errMs.slot), err).Error("Error processing known gap") + } + } else { + writeKnownGaps(kgp.db, 1, errMs.slot, errMs.slot, errMs.err, errMs.errProcess, kgp.metrics) + } } } diff --git a/pkg/beaconclient/processslot.go b/pkg/beaconclient/processslot.go index e4c15b1..146d53c 100644 --- a/pkg/beaconclient/processslot.go +++ b/pkg/beaconclient/processslot.go @@ -114,13 +114,8 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot return err, "processSlot" } - if ps.HeadOrHistoric == "head" && previousSlot == 0 && previousBlockRoot == "" { - writeStartUpGaps(db, knownGapsTableIncrement, ps.Slot, ps.Metrics) - } - // Get this object ready to write - blockRootEndpoint := serverAddress + BcBlockRootEndpoint(strconv.Itoa(ps.Slot)) - dw, err := ps.createWriteObjects(blockRootEndpoint) + dw, err := ps.createWriteObjects() if err != nil { return err, "blockRoot" } @@ -143,6 +138,10 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot // Handle a slot that is at head. A wrapper function for calling `handleFullSlot`. func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) { + // Get the knownGaps at startUp. + if previousSlot == 0 && previousBlockRoot == "" { + writeStartUpGaps(db, knownGapsTableIncrement, slot, metrics) + } err, errReason := processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement) if err != nil { writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics) @@ -241,6 +240,11 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str "fork": true, }).Warn("A fork occurred! The previous slot and current slot match.") writeReorgs(ps.Db, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics) + } else if previousSlot > int(ps.FullBeaconState.Slot()) { + log.WithFields(log.Fields{ + "previousSlot": previousSlot, + "curSlot": int(ps.FullBeaconState.Slot()), + }).Warn("We noticed the previous slot is greater than the current slot.") } else if previousSlot+1 != int(ps.FullBeaconState.Slot()) { log.WithFields(log.Fields{ "previousSlot": previousSlot, @@ -259,7 +263,7 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str } // Transforms all the raw data into DB models that can be written to the DB. -func (ps *ProcessSlot) createWriteObjects(blockRootEndpoint string) (*DatabaseWriter, error) { +func (ps *ProcessSlot) createWriteObjects() (*DatabaseWriter, error) { var ( stateRoot string blockRoot string