diff --git a/cmd/boot.go b/cmd/boot.go index 9fc3532..bb543e6 100644 --- a/cmd/boot.go +++ b/cmd/boot.go @@ -23,6 +23,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "github.com/spf13/viper" "github.com/vulcanize/ipld-ethcl-indexer/internal/boot" "github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" @@ -44,8 +45,9 @@ func bootApp() { log.Info("Starting the application in boot mode.") ctx := context.Background() - BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, - bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, kgTableIncrement, "head", testDisregardSync) + Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), + viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), + viper.GetInt("kg.increment"), "boot", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier")) if err != nil { loghelper.LogError(err).Error("Unable to Start application") } @@ -58,7 +60,7 @@ func bootApp() { notifierCh <- syscall.SIGTERM }() - err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC) + err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc) if err != nil { loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!") } else { diff --git a/cmd/capture.go b/cmd/capture.go index e1b46c8..93bbe66 100644 --- a/cmd/capture.go +++ b/cmd/capture.go @@ -39,6 +39,7 @@ var ( bcConnectionProtocol string bcType string bcMaxHistoricProcessWorker int + bcUniqueNodeIdentifier int kgMaxWorker int kgTableIncrement int kgProcessGaps bool @@ -93,6 +94,7 @@ func init() { captureCmd.PersistentFlags().IntVarP(&bcBootRetryInterval, "bc.bootRetryInterval", "", 30, "The amount of time to wait between retries while booting the application") captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The amount of time to wait between retries while booting the application") captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.historic_process table. Be careful of system memory.") + captureCmd.PersistentFlags().IntVarP(&bcUniqueNodeIdentifier, "bc.uniqueNodeIdentifier", "", 0, "The unique identifier of this application. Each application connecting to the DB should have a unique identifier.") // err = captureCmd.MarkPersistentFlagRequired("bc.address") // exitErr(err) // err = captureCmd.MarkPersistentFlagRequired("bc.port") @@ -143,6 +145,8 @@ func init() { exitErr(err) err = viper.BindPFlag("bc.maxHistoricProcessWorker", captureCmd.PersistentFlags().Lookup("bc.maxHistoricProcessWorker")) exitErr(err) + err = viper.BindPFlag("bc.uniqueNodeIdentifier", captureCmd.PersistentFlags().Lookup("bc.uniqueNodeIdentifier")) + exitErr(err) // Here you will define your flags and configuration settings. //// Known Gap Specific diff --git a/cmd/head.go b/cmd/head.go index 1006fe1..3af009f 100644 --- a/cmd/head.go +++ b/cmd/head.go @@ -50,7 +50,7 @@ func startHeadTracking() { Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), - viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync")) + viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier")) if err != nil { StopApplicationPreBoot(err, Db) } diff --git a/cmd/historic.go b/cmd/historic.go index cfd9d03..2d89deb 100644 --- a/cmd/historic.go +++ b/cmd/historic.go @@ -50,7 +50,7 @@ func startHistoricProcessing() { Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), - viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync")) + viper.GetInt("kg.increment"), "historic", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier")) if err != nil { StopApplicationPreBoot(err, Db) } diff --git a/example.ipld-ethcl-indexer-config.json b/example.ipld-ethcl-indexer-config.json index 0929773..69746ca 100644 --- a/example.ipld-ethcl-indexer-config.json +++ b/example.ipld-ethcl-indexer-config.json @@ -14,7 +14,8 @@ "bootRetryInterval": 30, "bootMaxRetry": 5, "maxHistoricProcessWorker": 2, - "connectionProtocol": "http" + "connectionProtocol": "http", + "uniqueNodeIdentifier": 100 }, "t": { "skipSync": true diff --git a/go.mod b/go.mod index 5bb8589..3c844c8 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/multiformats/go-multihash v0.1.0 github.com/onsi/ginkgo/v2 v2.1.4 github.com/onsi/gomega v1.19.0 + github.com/prometheus/client_golang v1.12.1 github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc github.com/sirupsen/logrus v1.8.1 ) @@ -57,7 +58,6 @@ require ( github.com/multiformats/go-varint v0.0.6 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/prometheus/client_golang v1.12.1 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect diff --git a/internal/boot/boot.go b/internal/boot/boot.go index e726cf8..b1e476b 100644 --- a/internal/boot/boot.go +++ b/internal/boot/boot.go @@ -42,14 +42,17 @@ var ( // // 3. Make sure the node is synced, unless disregardSync is true. func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, - bcAddress string, bcPort int, bcConnectionProtocol string, bcKgTableIncrement int, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) { + bcAddress string, bcPort int, bcConnectionProtocol string, bcKgTableIncrement int, disregardSync bool, uniqueNodeIdentifier int) (*beaconclient.BeaconClient, sql.Database, error) { log.Info("Booting the Application") log.Debug("Creating the Beacon Client") - BC = beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort, bcKgTableIncrement) + Bc, err := beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort, bcKgTableIncrement, uniqueNodeIdentifier) + if err != nil { + return Bc, nil, err + } log.Debug("Checking Beacon Client") - err := BC.CheckBeaconClient() + err = Bc.CheckBeaconClient() if err != nil { return nil, nil, err } @@ -60,36 +63,37 @@ func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName return nil, nil, err } - BC.Db = DB + Bc.Db = DB var status bool if !disregardSync { - status, err = BC.CheckHeadSync() + status, err = Bc.CheckHeadSync() if err != nil { log.Error("Unable to get the nodes sync status") - return BC, DB, err + return Bc, DB, err } if status { log.Error("The node is still syncing..") err = fmt.Errorf("The node is still syncing.") - return BC, DB, err + return Bc, DB, err } } else { log.Warn("We are not checking to see if the node has synced to head.") } - return BC, DB, nil + return Bc, DB, nil } // Add retry logic to ensure that we are give the Beacon Client and the DB time to start. func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, - bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, bcKgTableIncrement int, startUpMode string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) { + bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, bcKgTableIncrement int, + startUpMode string, disregardSync bool, uniqueNodeIdentifier int) (*beaconclient.BeaconClient, sql.Database, error) { var err error if bcMaxRetry < 0 { i := 0 for { BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, - bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync) + bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier) if err != nil { log.WithFields(log.Fields{ "retryNumber": i, @@ -104,7 +108,7 @@ func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int } else { for i := 0; i < bcMaxRetry; i++ { BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, - bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync) + bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier) if err != nil { log.WithFields(log.Fields{ "retryNumber": i, @@ -136,6 +140,8 @@ func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int } BC.UpdateLatestSlotInBeaconServer(int64(headSlot)) // Add another switch case for bcType if its ever needed. + case "boot": + log.Debug("Running application in boot mode.") default: log.WithFields(log.Fields{ "startUpMode": startUpMode, diff --git a/internal/boot/boot_test.go b/internal/boot/boot_test.go index aa77e56..ef621ae 100644 --- a/internal/boot/boot_test.go +++ b/internal/boot/boot_test.go @@ -38,44 +38,52 @@ var _ = Describe("Boot", func() { bcBootRetryInterval int = 1 bcBootMaxRetry int = 5 bcKgTableIncrement int = 10 + bcUniqueIdentifier int = 100 ) Describe("Booting the application", Label("integration"), func() { Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing head", func() { It("Should connect successfully", func() { - _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true) + _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier) defer db.Close() Expect(err).ToNot(HaveOccurred()) }) }) Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing historic ", func() { It("Should connect successfully", func() { - _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "historic", true) + _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "historic", true, bcUniqueIdentifier) defer db.Close() Expect(err).ToNot(HaveOccurred()) }) }) Context("When the DB and BC are both up and running, and we check for a synced head", func() { It("Should not connect successfully", func() { - _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false) + _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, bcUniqueIdentifier) + defer db.Close() + Expect(err).To(HaveOccurred()) + }) + }) + Context("When the DB and BC are both up and running, we skip checking for a synced head, but the unique identifier is 0", func() { + It("Should not connect successfully", func() { + _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, 0) defer db.Close() Expect(err).To(HaveOccurred()) }) }) Context("When the DB is running but not the BC", func() { It("Should not connect successfully", func() { - _, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true) + _, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier) Expect(err).To(HaveOccurred()) }) }) Context("When the BC is running but not the DB", func() { It("Should not connect successfully", func() { - _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, true) + _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier) Expect(err).To(HaveOccurred()) }) }) Context("When neither the BC or DB are running", func() { It("Should not connect successfully", func() { - _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true) + _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier) Expect(err).To(HaveOccurred()) }) }) diff --git a/internal/shutdown/shutdown_test.go b/internal/shutdown/shutdown_test.go index 0bbf38d..8c75506 100644 --- a/internal/shutdown/shutdown_test.go +++ b/internal/shutdown/shutdown_test.go @@ -35,32 +35,34 @@ import ( "github.com/vulcanize/ipld-ethcl-indexer/pkg/gracefulshutdown" ) +var ( + dbAddress string = "localhost" + dbPort int = 8076 + dbName string = "vulcanize_testing" + dbUsername string = "vdbm" + dbPassword string = "password" + dbDriver string = "PGX" + bcAddress string = "localhost" + bcPort int = 5052 + bcConnectionProtocol string = "http" + bcType string = "lighthouse" + bcBootRetryInterval int = 1 + bcBootMaxRetry int = 5 + bcKgTableIncrement int = 10 + bcUniqueIdentifier int = 100 + maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second + DB sql.Database + BC *beaconclient.BeaconClient + err error + ctx context.Context + notifierCh chan os.Signal +) + var _ = Describe("Shutdown", func() { - var ( - dbAddress string = "localhost" - dbPort int = 8076 - dbName string = "vulcanize_testing" - dbUsername string = "vdbm" - dbPassword string = "password" - dbDriver string = "PGX" - bcAddress string = "localhost" - bcPort int = 5052 - bcConnectionProtocol string = "http" - bcType string = "lighthouse" - bcBootRetryInterval int = 1 - bcBootMaxRetry int = 5 - bcKgTableIncrement int = 10 - maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second - DB sql.Database - BC *beaconclient.BeaconClient - err error - ctx context.Context - notifierCh chan os.Signal - ) BeforeEach(func() { ctx = context.Background() BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, - bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true) + bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier) notifierCh = make(chan os.Signal, 1) Expect(err).To(BeNil()) }) diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go index 15f24b2..33d3cac 100644 --- a/pkg/beaconclient/beaconclient.go +++ b/pkg/beaconclient/beaconclient.go @@ -48,6 +48,7 @@ type BeaconClient struct { Db sql.Database // Database object used for reads and writes. Metrics *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics. KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry. + UniqueNodeIdentifier int // The unique identifier within the cluster of this individual node. // Used for Head Tracking @@ -84,7 +85,16 @@ type SseError struct { } // A Function to create the BeaconClient. -func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int, bcKgTableIncrement int) *BeaconClient { +func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int, bcKgTableIncrement int, uniqueNodeIdentifier int) (*BeaconClient, error) { + if uniqueNodeIdentifier == 0 { + return nil, fmt.Errorf("The unique node identifier provided is 0, it must be a non-zero value!!!!") + } + + metrics, err := CreateBeaconClientMetrics() + if err != nil { + return nil, err + } + endpoint := fmt.Sprintf("%s://%s:%d", connectionProtocol, bcAddress, bcPort) log.Info("Creating the BeaconClient") return &BeaconClient{ @@ -93,9 +103,9 @@ func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddres KnownGapTableIncrement: bcKgTableIncrement, HeadTracking: createSseEvent[Head](endpoint, BcHeadTopicEndpoint), ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint), - Metrics: CreateBeaconClientMetrics(), + Metrics: metrics, //FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint), - } + }, nil } // Create all the channels to handle a SSE events diff --git a/pkg/beaconclient/capturehead_test.go b/pkg/beaconclient/capturehead_test.go index 459b2af..0856cbe 100644 --- a/pkg/beaconclient/capturehead_test.go +++ b/pkg/beaconclient/capturehead_test.go @@ -54,6 +54,7 @@ var ( dbUser string = "vdbm" dbPassword string = "password" dbDriver string = "pgx" + bcUniqueIdentifier int = 100 dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42" knownGapsTableIncrement int = 100000 maxRetry int = 120 @@ -202,6 +203,7 @@ var ( dbPassword: dbPassword, dbDriver: dbDriver, knownGapsTableIncrement: knownGapsTableIncrement, + bcUniqueIdentifier: bcUniqueIdentifier, } BeaconNodeTester = TestBeaconNode{ @@ -421,6 +423,7 @@ type Config struct { dbPassword string dbDriver string knownGapsTableIncrement int + bcUniqueIdentifier int } ////////////////////////////////////////////////////// @@ -430,7 +433,8 @@ type Config struct { // Must run before each test. We can't use the beforeEach because of the way // Gingko treats race conditions. func setUpTest(config Config, maxSlot string) *beaconclient.BeaconClient { - bc := *beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port, config.knownGapsTableIncrement) + bc, err := beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port, config.knownGapsTableIncrement, config.bcUniqueIdentifier) + Expect(err).ToNot(HaveOccurred()) db, err := postgres.SetupPostgresDb(config.dbHost, config.dbPort, config.dbName, config.dbUser, config.dbPassword, config.dbDriver) Expect(err).ToNot(HaveOccurred()) @@ -441,7 +445,7 @@ func setUpTest(config Config, maxSlot string) *beaconclient.BeaconClient { writeSlot(db, maxSlot) bc.Db = db - return &bc + return bc } // A helper function to validate the expected output from the ethcl.slots table. diff --git a/pkg/beaconclient/healthcheck_test.go b/pkg/beaconclient/healthcheck_test.go index df92398..c09e3d8 100644 --- a/pkg/beaconclient/healthcheck_test.go +++ b/pkg/beaconclient/healthcheck_test.go @@ -25,19 +25,22 @@ import ( var _ = Describe("Healthcheck", func() { var ( - BC *beaconclient.BeaconClient + Bc *beaconclient.BeaconClient errBc *beaconclient.BeaconClient ) BeforeEach(func() { - BC = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10) - errBc = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10) + var err error + Bc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10, bcUniqueIdentifier) + Expect(err).ToNot(HaveOccurred()) + errBc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10, bcUniqueIdentifier) + Expect(err).ToNot(HaveOccurred()) }) Describe("Connecting to the lighthouse client", Label("integration"), func() { Context("When the client is running", func() { It("We should connect successfully", func() { - err := BC.CheckBeaconClient() + err := Bc.CheckBeaconClient() Expect(err).To(BeNil()) }) }) diff --git a/pkg/beaconclient/metrics.go b/pkg/beaconclient/metrics.go index 6156e2d..086d7a7 100644 --- a/pkg/beaconclient/metrics.go +++ b/pkg/beaconclient/metrics.go @@ -24,7 +24,7 @@ import ( ) //Create a metric struct and register each channel with prometheus -func CreateBeaconClientMetrics() *BeaconClientMetrics { +func CreateBeaconClientMetrics() (*BeaconClientMetrics, error) { metrics := &BeaconClientMetrics{ SlotInserts: 0, ReorgInserts: 0, @@ -34,17 +34,38 @@ func CreateBeaconClientMetrics() *BeaconClientMetrics { HeadError: 0, HeadReorgError: 0, } - prometheusRegisterHelper("slot_inserts", "Keeps track of the number of slots we have inserted.", &metrics.SlotInserts) - prometheusRegisterHelper("reorg_inserts", "Keeps track of the number of reorgs we have inserted.", &metrics.ReorgInserts) - prometheusRegisterHelper("known_gaps_inserts", "Keeps track of the number of known gaps we have inserted.", &metrics.KnownGapsInserts) - prometheusRegisterHelper("known_gaps_processed", "Keeps track of the number of known gaps we processed.", &metrics.knownGapsProcessed) - prometheusRegisterHelper("known_gaps_processing_error", "Keeps track of the number of known gaps we had errors processing.", &metrics.KnownGapsProcessingError) - prometheusRegisterHelper("head_error", "Keeps track of the number of errors we had processing head messages.", &metrics.HeadError) - prometheusRegisterHelper("head_reorg_error", "Keeps track of the number of errors we had processing reorg messages.", &metrics.HeadReorgError) - return metrics + err := prometheusRegisterHelper("slot_inserts", "Keeps track of the number of slots we have inserted.", &metrics.SlotInserts) + if err != nil { + return nil, err + } + err = prometheusRegisterHelper("reorg_inserts", "Keeps track of the number of reorgs we have inserted.", &metrics.ReorgInserts) + if err != nil { + return nil, err + } + err = prometheusRegisterHelper("known_gaps_inserts", "Keeps track of the number of known gaps we have inserted.", &metrics.KnownGapsInserts) + if err != nil { + return nil, err + } + err = prometheusRegisterHelper("known_gaps_processed", "Keeps track of the number of known gaps we processed.", &metrics.knownGapsProcessed) + if err != nil { + return nil, err + } + err = prometheusRegisterHelper("known_gaps_processing_error", "Keeps track of the number of known gaps we had errors processing.", &metrics.KnownGapsProcessingError) + if err != nil { + return nil, err + } + err = prometheusRegisterHelper("head_error", "Keeps track of the number of errors we had processing head messages.", &metrics.HeadError) + if err != nil { + return nil, err + } + err = prometheusRegisterHelper("head_reorg_error", "Keeps track of the number of errors we had processing reorg messages.", &metrics.HeadReorgError) + if err != nil { + return nil, err + } + return metrics, nil } -func prometheusRegisterHelper(name string, help string, varPointer *uint64) { +func prometheusRegisterHelper(name string, help string, varPointer *uint64) error { err := prometheus.Register(prometheus.NewCounterFunc( prometheus.CounterOpts{ Namespace: "beacon_client", @@ -58,7 +79,9 @@ func prometheusRegisterHelper(name string, help string, varPointer *uint64) { })) if err != nil && err.Error() != "duplicate metrics collector registration attempted" { loghelper.LogError(err).WithField("name", name).Error("Unable to register counter.") + return err } + return nil } // A structure utilized for keeping track of various metrics. Currently, mostly used in testing.