diff --git a/.gitignore b/.gitignore index 1a3bc3a..aa7c2e7 100644 --- a/.gitignore +++ b/.gitignore @@ -4,5 +4,6 @@ ipld-ethcl-indexer.log report.json cover.profile temp/* +.vscode/* pkg/beaconclient/ssz-data/ *.test \ No newline at end of file diff --git a/README.md b/README.md index 2b5ab8b..9096054 100644 --- a/README.md +++ b/README.md @@ -37,22 +37,7 @@ To run the application, do as follows: 2. Run the start up command. ``` -go run -race main.go capture historic --db.address localhost \ - --db.password password \ - --db.port 8076 \ - --db.username vdbm \ - --db.name vulcanize_testing \ - --db.driver PGX \ - --bc.address localhost \ - --bc.port 5052 \ - --bc.maxHistoricProcessWorker 2 \ - --bc.maxKnownGapsWorker 2 \ - --bc.knownGapsProcess=true \ - --bc.connectionProtocol http \ - --t.skipSync=false \ - --log.level debug \ - --log.output=true \ - --kg.increment 1000000 +go run -race main.go capture historic --config ./example.ipld-ethcl-indexer-config.json ``` ## Running Tests diff --git a/cmd/capture.go b/cmd/capture.go index aad3aef..e1b46c8 100644 --- a/cmd/capture.go +++ b/cmd/capture.go @@ -42,6 +42,9 @@ var ( kgMaxWorker int kgTableIncrement int kgProcessGaps bool + pmMetrics bool + pmAddress string + pmPort int maxWaitSecondsShutdown time.Duration = time.Duration(20) * time.Second notifierCh chan os.Signal = make(chan os.Signal, 1) testDisregardSync bool @@ -100,6 +103,11 @@ func init() { captureCmd.PersistentFlags().IntVarP(&kgTableIncrement, "kg.increment", "", 10000, "The max slots within a single entry to the known_gaps table.") captureCmd.PersistentFlags().IntVarP(&kgMaxWorker, "kg.maxKnownGapsWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.known_gaps table. Be careful of system memory.") + // Prometheus Specific + captureCmd.PersistentFlags().BoolVarP(&pmMetrics, "pm.metrics", "", true, "Should we capture prometheus metrics.") + captureCmd.PersistentFlags().StringVarP(&pmAddress, "pm.address", "", "localhost", "Address to send the prometheus metrics.") + captureCmd.PersistentFlags().IntVarP(&pmPort, "pm.port", "", 9000, "The port to send prometheus metrics.") + //// Testing Specific captureCmd.PersistentFlags().BoolVar(&testDisregardSync, "t.skipSync", false, "Should we disregard the head sync?") @@ -145,6 +153,13 @@ func init() { err = viper.BindPFlag("kg.processKnownGaps", captureCmd.PersistentFlags().Lookup("kg.maxKnownGapsWorker")) exitErr(err) + // Prometheus Specific + err = viper.BindPFlag("pm.metrics", captureCmd.PersistentFlags().Lookup("pm.metrics")) + exitErr(err) + err = viper.BindPFlag("pm.address", captureCmd.PersistentFlags().Lookup("pm.address")) + exitErr(err) + err = viper.BindPFlag("pm.port", captureCmd.PersistentFlags().Lookup("pm.port")) + exitErr(err) } // Helper function to catch any errors. diff --git a/cmd/head.go b/cmd/head.go index 7589fd5..1006fe1 100644 --- a/cmd/head.go +++ b/cmd/head.go @@ -19,7 +19,10 @@ package cmd import ( "context" "fmt" + "net/http" + "strconv" + "github.com/prometheus/client_golang/prometheus/promhttp" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -52,6 +55,11 @@ func startHeadTracking() { StopApplicationPreBoot(err, Db) } + if viper.GetBool("pm.metrics") { + addr := viper.GetString("pm.address") + ":" + strconv.Itoa(viper.GetInt("pm.port")) + serveProm(addr) + } + log.Info("The Beacon Client has booted successfully!") // Capture head blocks go Bc.CaptureHead() @@ -85,3 +93,18 @@ func startHeadTracking() { func init() { captureCmd.AddCommand(headCmd) } + +func serveProm(addr string) { + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.Handler()) + + srv := http.Server{ + Addr: addr, + Handler: mux, + } + go func() { + if err := srv.ListenAndServe(); err != nil { + loghelper.LogError(err).WithField("endpoint", addr).Error("Error with prometheus") + } + }() +} diff --git a/cmd/historic.go b/cmd/historic.go index 519d00e..cfd9d03 100644 --- a/cmd/historic.go +++ b/cmd/historic.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "os" + "strconv" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -54,6 +55,11 @@ func startHistoricProcessing() { StopApplicationPreBoot(err, Db) } + if viper.GetBool("pm.metrics") { + addr := viper.GetString("pm.address") + ":" + strconv.Itoa(viper.GetInt("pm.port")) + serveProm(addr) + } + errG, _ := errgroup.WithContext(context.Background()) errG.Go(func() error { diff --git a/example.ipld-ethcl-indexer-config.json b/example.ipld-ethcl-indexer-config.json index 82513b4..0929773 100644 --- a/example.ipld-ethcl-indexer-config.json +++ b/example.ipld-ethcl-indexer-config.json @@ -8,7 +8,7 @@ "driver": "PGX" }, "bc": { - "address": "10.203.8.51", + "address": "localhost", "port": 5052, "type": "lighthouse", "bootRetryInterval": 30, @@ -17,7 +17,7 @@ "connectionProtocol": "http" }, "t": { - "skipSync": false + "skipSync": true }, "log": { "level": "debug", @@ -29,5 +29,10 @@ "increment": 10000, "processKnownGaps": true, "maxKnownGapsWorker": 2 + }, + "pm": { + "address": "localhost", + "port": 9000, + "metrics": true } } diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go index d277201..15f24b2 100644 --- a/pkg/beaconclient/beaconclient.go +++ b/pkg/beaconclient/beaconclient.go @@ -93,13 +93,7 @@ func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddres KnownGapTableIncrement: bcKgTableIncrement, HeadTracking: createSseEvent[Head](endpoint, BcHeadTopicEndpoint), ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint), - Metrics: &BeaconClientMetrics{ - SlotInserts: 0, - ReorgInserts: 0, - KnownGapsInserts: 0, - HeadError: 0, - HeadReorgError: 0, - }, + Metrics: CreateBeaconClientMetrics(), //FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint), } } diff --git a/pkg/beaconclient/healthcheck_test.go b/pkg/beaconclient/healthcheck_test.go index 06ba2dd..df92398 100644 --- a/pkg/beaconclient/healthcheck_test.go +++ b/pkg/beaconclient/healthcheck_test.go @@ -25,9 +25,15 @@ import ( var _ = Describe("Healthcheck", func() { var ( - BC = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10) - errBc = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10) + BC *beaconclient.BeaconClient + errBc *beaconclient.BeaconClient ) + + BeforeEach(func() { + BC = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10) + errBc = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10) + + }) Describe("Connecting to the lighthouse client", Label("integration"), func() { Context("When the client is running", func() { It("We should connect successfully", func() { diff --git a/pkg/beaconclient/metrics.go b/pkg/beaconclient/metrics.go index 3304ea5..6156e2d 100644 --- a/pkg/beaconclient/metrics.go +++ b/pkg/beaconclient/metrics.go @@ -17,8 +17,50 @@ package beaconclient import ( "sync/atomic" + + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" ) +//Create a metric struct and register each channel with prometheus +func CreateBeaconClientMetrics() *BeaconClientMetrics { + metrics := &BeaconClientMetrics{ + SlotInserts: 0, + ReorgInserts: 0, + KnownGapsInserts: 0, + knownGapsProcessed: 0, + KnownGapsProcessingError: 0, + HeadError: 0, + HeadReorgError: 0, + } + prometheusRegisterHelper("slot_inserts", "Keeps track of the number of slots we have inserted.", &metrics.SlotInserts) + prometheusRegisterHelper("reorg_inserts", "Keeps track of the number of reorgs we have inserted.", &metrics.ReorgInserts) + prometheusRegisterHelper("known_gaps_inserts", "Keeps track of the number of known gaps we have inserted.", &metrics.KnownGapsInserts) + prometheusRegisterHelper("known_gaps_processed", "Keeps track of the number of known gaps we processed.", &metrics.knownGapsProcessed) + prometheusRegisterHelper("known_gaps_processing_error", "Keeps track of the number of known gaps we had errors processing.", &metrics.KnownGapsProcessingError) + prometheusRegisterHelper("head_error", "Keeps track of the number of errors we had processing head messages.", &metrics.HeadError) + prometheusRegisterHelper("head_reorg_error", "Keeps track of the number of errors we had processing reorg messages.", &metrics.HeadReorgError) + return metrics +} + +func prometheusRegisterHelper(name string, help string, varPointer *uint64) { + err := prometheus.Register(prometheus.NewCounterFunc( + prometheus.CounterOpts{ + Namespace: "beacon_client", + Subsystem: "", + Name: name, + Help: help, + ConstLabels: map[string]string{}, + }, + func() float64 { + return float64(atomic.LoadUint64(varPointer)) + })) + if err != nil && err.Error() != "duplicate metrics collector registration attempted" { + loghelper.LogError(err).WithField("name", name).Error("Unable to register counter.") + } +} + // A structure utilized for keeping track of various metrics. Currently, mostly used in testing. type BeaconClientMetrics struct { SlotInserts uint64 // Number of head events we successfully wrote to the DB. @@ -33,6 +75,7 @@ type BeaconClientMetrics struct { // Wrapper function to increment inserts. If we want to use mutexes later we can easily update all // occurrences here. func (m *BeaconClientMetrics) IncrementSlotInserts(inc uint64) { + logrus.Debug("Incrementing Slot Insert") atomic.AddUint64(&m.SlotInserts, inc) }