diff --git a/cmd/boot.go b/cmd/boot.go index e5e7b3b..6f98729 100644 --- a/cmd/boot.go +++ b/cmd/boot.go @@ -49,7 +49,7 @@ func bootApp() { viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), viper.GetInt("kg.increment"), "boot", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier")) if err != nil { - loghelper.LogError(err).Error("Unable to Start application") + StopApplicationPreBoot(err, Db) } log.Info("Boot complete, we are going to shutdown.") diff --git a/cmd/capture.go b/cmd/capture.go index 93bbe66..d4ad6fc 100644 --- a/cmd/capture.go +++ b/cmd/capture.go @@ -125,6 +125,8 @@ func init() { exitErr(err) err = viper.BindPFlag("db.name", captureCmd.PersistentFlags().Lookup("db.name")) exitErr(err) + err = viper.BindPFlag("db.driver", captureCmd.PersistentFlags().Lookup("db.driver")) + exitErr(err) //// Testing Specific err = viper.BindPFlag("t.skipSync", captureCmd.PersistentFlags().Lookup("t.skipSync")) diff --git a/cmd/head.go b/cmd/head.go index 9ef4d63..f2dd0e5 100644 --- a/cmd/head.go +++ b/cmd/head.go @@ -94,6 +94,7 @@ func init() { captureCmd.AddCommand(headCmd) } +// Start prometheus server func serveProm(addr string) { mux := http.NewServeMux() mux.Handle("/metrics", promhttp.Handler()) diff --git a/internal/shutdown/shutdown.go b/internal/shutdown/shutdown.go index 9c8d71e..228b760 100644 --- a/internal/shutdown/shutdown.go +++ b/internal/shutdown/shutdown.go @@ -49,6 +49,12 @@ func ShutdownHeadTracking(ctx context.Context, notifierCh chan os.Signal, waitTi if err != nil { loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking") } + if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) { + err = BC.StopKnownGapsProcessing() + if err != nil { + loghelper.LogError(err).Error("Unable to stop processing known gaps") + } + } return err }, }) @@ -64,9 +70,11 @@ func ShutdownHistoricProcessing(ctx context.Context, notifierCh chan os.Signal, if err != nil { loghelper.LogError(err).Error("Unable to stop processing historic") } - err = BC.StopKnownGapsProcessing() - if err != nil { - loghelper.LogError(err).Error("Unable to stop processing known gaps") + if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) { + err = BC.StopKnownGapsProcessing() + if err != nil { + loghelper.LogError(err).Error("Unable to stop processing known gaps") + } } return err }, diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go index caa3a20..e9fd5c0 100644 --- a/pkg/beaconclient/beaconclient.go +++ b/pkg/beaconclient/beaconclient.go @@ -50,6 +50,7 @@ type BeaconClient struct { Metrics *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics. KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry. UniqueNodeIdentifier int // The unique identifier within the cluster of this individual node. + KnownGapsProcess KnownGapsProcessing // object keeping track of knowngaps processing // Used for Head Tracking @@ -67,9 +68,8 @@ type BeaconClient struct { // The latest available slot within the Beacon Server. We can't query any slot greater than this. // This value is lazily updated. Therefore at times it will be outdated. LatestSlotInBeaconServer int64 - PerformHistoricalProcessing bool // Should we perform historical processing? - HistoricalProcess historicProcessing - KnownGapsProcess knownGapsProcessing + PerformHistoricalProcessing bool // Should we perform historical processing? + HistoricalProcess historicProcessing // object keeping track of historical processing } // A struct to keep track of relevant the head event topic. diff --git a/pkg/beaconclient/processknowngaps.go b/pkg/beaconclient/processknowngaps.go index 5f3603e..01762da 100644 --- a/pkg/beaconclient/processknowngaps.go +++ b/pkg/beaconclient/processknowngaps.go @@ -50,7 +50,7 @@ var ( WHERE checked_out_by=$1` ) -type knownGapsProcessing struct { +type KnownGapsProcessing struct { db sql.Database //db connection metrics *BeaconClientMetrics // metrics for beaconclient uniqueNodeIdentifier int // node unique identifier. @@ -60,7 +60,7 @@ type knownGapsProcessing struct { // This function will perform all the heavy lifting for tracking the head of the chain. func (bc *BeaconClient) ProcessKnownGaps(maxWorkers int) []error { log.Info("We are starting the known gaps processing service.") - bc.KnownGapsProcess = knownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics, finishProcessing: make(chan int)} + bc.KnownGapsProcess = KnownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics, finishProcessing: make(chan int)} errs := handleBatchProcess(maxWorkers, bc.KnownGapsProcess, bc.KnownGapsProcess.finishProcessing, bc.KnownGapsProcess.db, bc.ServerEndpoint, bc.Metrics) log.Debug("Exiting known gaps processing service") return errs @@ -77,17 +77,17 @@ func (bc *BeaconClient) StopKnownGapsProcessing() error { } // Get a single row of historical slots from the table. -func (kgp knownGapsProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error { +func (kgp KnownGapsProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error { return getBatchProcessRow(kgp.db, getKgEntryStmt, checkKgEntryStmt, lockKgEntryStmt, slotCh, strconv.Itoa(kgp.uniqueNodeIdentifier)) } // Remove the table entry. -func (kgp knownGapsProcessing) removeTableEntry(processCh <-chan slotsToProcess) error { +func (kgp KnownGapsProcessing) removeTableEntry(processCh <-chan slotsToProcess) error { return removeRowPostProcess(kgp.db, processCh, QueryBySlotStmt, deleteKgEntryStmt) } // Remove the table entry. -func (kgp knownGapsProcessing) handleProcessingErrors(errMessages <-chan batchHistoricError) { +func (kgp KnownGapsProcessing) handleProcessingErrors(errMessages <-chan batchHistoricError) { for { errMs := <-errMessages @@ -117,7 +117,7 @@ func (kgp knownGapsProcessing) handleProcessingErrors(errMessages <-chan batchHi } // Updated checked_out column for the uniqueNodeIdentifier. -func (kgp knownGapsProcessing) releaseDbLocks() error { +func (kgp KnownGapsProcessing) releaseDbLocks() error { go func() { kgp.finishProcessing <- 1 }() res, err := kgp.db.Exec(context.Background(), releaseKgLockStmt, kgp.uniqueNodeIdentifier) if err != nil { diff --git a/pkg/database/sql/postgres/database.go b/pkg/database/sql/postgres/database.go index c8451f8..16cb936 100644 --- a/pkg/database/sql/postgres/database.go +++ b/pkg/database/sql/postgres/database.go @@ -49,7 +49,7 @@ func SetupPostgresDb(dbHostname string, dbPort int, dbName string, dbUsername st "driver_name_provided": driverName, }).Error("Can't resolve driver type") } - log.Info("Using Driver:", DbDriver) + log.Info("Using Driver: ", DbDriver) postgresConfig := Config{ Hostname: dbHostname,