Include all changes needed for historical and knownGaps processing #59

Merged
abdulrabbani00 merged 11 commits from develop into main 2022-06-10 13:45:42 +00:00
9 changed files with 105 additions and 27 deletions
Showing only changes of commit 9160dded11 - Show all commits

1
.gitignore vendored
View File

@ -4,5 +4,6 @@ ipld-ethcl-indexer.log
report.json
cover.profile
temp/*
.vscode/*
pkg/beaconclient/ssz-data/
*.test

View File

@ -37,22 +37,7 @@ To run the application, do as follows:
2. Run the start up command.
```
go run -race main.go capture historic --db.address localhost \
--db.password password \
--db.port 8076 \
--db.username vdbm \
--db.name vulcanize_testing \
--db.driver PGX \
--bc.address localhost \
--bc.port 5052 \
--bc.maxHistoricProcessWorker 2 \
--bc.maxKnownGapsWorker 2 \
--bc.knownGapsProcess=true \
--bc.connectionProtocol http \
--t.skipSync=false \
--log.level debug \
--log.output=true \
--kg.increment 1000000
go run -race main.go capture historic --config ./example.ipld-ethcl-indexer-config.json
```
## Running Tests

View File

@ -42,6 +42,9 @@ var (
kgMaxWorker int
kgTableIncrement int
kgProcessGaps bool
pmMetrics bool
pmAddress string
pmPort int
maxWaitSecondsShutdown time.Duration = time.Duration(20) * time.Second
notifierCh chan os.Signal = make(chan os.Signal, 1)
testDisregardSync bool
@ -100,6 +103,11 @@ func init() {
captureCmd.PersistentFlags().IntVarP(&kgTableIncrement, "kg.increment", "", 10000, "The max slots within a single entry to the known_gaps table.")
captureCmd.PersistentFlags().IntVarP(&kgMaxWorker, "kg.maxKnownGapsWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.known_gaps table. Be careful of system memory.")
// Prometheus Specific
captureCmd.PersistentFlags().BoolVarP(&pmMetrics, "pm.metrics", "", true, "Should we capture prometheus metrics.")
captureCmd.PersistentFlags().StringVarP(&pmAddress, "pm.address", "", "localhost", "Address to send the prometheus metrics.")
captureCmd.PersistentFlags().IntVarP(&pmPort, "pm.port", "", 9000, "The port to send prometheus metrics.")
//// Testing Specific
captureCmd.PersistentFlags().BoolVar(&testDisregardSync, "t.skipSync", false, "Should we disregard the head sync?")
@ -145,6 +153,13 @@ func init() {
err = viper.BindPFlag("kg.processKnownGaps", captureCmd.PersistentFlags().Lookup("kg.maxKnownGapsWorker"))
exitErr(err)
// Prometheus Specific
err = viper.BindPFlag("pm.metrics", captureCmd.PersistentFlags().Lookup("pm.metrics"))
exitErr(err)
err = viper.BindPFlag("pm.address", captureCmd.PersistentFlags().Lookup("pm.address"))
exitErr(err)
err = viper.BindPFlag("pm.port", captureCmd.PersistentFlags().Lookup("pm.port"))
exitErr(err)
}
// Helper function to catch any errors.

View File

@ -19,7 +19,10 @@ package cmd
import (
"context"
"fmt"
"net/http"
"strconv"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
@ -52,6 +55,11 @@ func startHeadTracking() {
StopApplicationPreBoot(err, Db)
}
if viper.GetBool("pm.metrics") {
addr := viper.GetString("pm.address") + ":" + strconv.Itoa(viper.GetInt("pm.port"))
serveProm(addr)
}
log.Info("The Beacon Client has booted successfully!")
// Capture head blocks
go Bc.CaptureHead()
@ -85,3 +93,18 @@ func startHeadTracking() {
func init() {
captureCmd.AddCommand(headCmd)
}
func serveProm(addr string) {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
srv := http.Server{
Addr: addr,
Handler: mux,
}
go func() {
if err := srv.ListenAndServe(); err != nil {
loghelper.LogError(err).WithField("endpoint", addr).Error("Error with prometheus")
}
}()
}

View File

@ -20,6 +20,7 @@ import (
"context"
"fmt"
"os"
"strconv"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
@ -54,6 +55,11 @@ func startHistoricProcessing() {
StopApplicationPreBoot(err, Db)
}
if viper.GetBool("pm.metrics") {
addr := viper.GetString("pm.address") + ":" + strconv.Itoa(viper.GetInt("pm.port"))
serveProm(addr)
}
errG, _ := errgroup.WithContext(context.Background())
errG.Go(func() error {

View File

@ -8,7 +8,7 @@
"driver": "PGX"
},
"bc": {
"address": "10.203.8.51",
"address": "localhost",
"port": 5052,
"type": "lighthouse",
"bootRetryInterval": 30,
@ -17,7 +17,7 @@
"connectionProtocol": "http"
},
"t": {
"skipSync": false
"skipSync": true
},
"log": {
"level": "debug",
@ -29,5 +29,10 @@
"increment": 10000,
"processKnownGaps": true,
"maxKnownGapsWorker": 2
},
"pm": {
"address": "localhost",
"port": 9000,
"metrics": true
}
}

View File

@ -93,13 +93,7 @@ func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddres
KnownGapTableIncrement: bcKgTableIncrement,
HeadTracking: createSseEvent[Head](endpoint, BcHeadTopicEndpoint),
ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint),
Metrics: &BeaconClientMetrics{
SlotInserts: 0,
ReorgInserts: 0,
KnownGapsInserts: 0,
HeadError: 0,
HeadReorgError: 0,
},
Metrics: CreateBeaconClientMetrics(),
//FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint),
}
}

View File

@ -25,9 +25,15 @@ import (
var _ = Describe("Healthcheck", func() {
var (
BC = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10)
errBc = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10)
BC *beaconclient.BeaconClient
errBc *beaconclient.BeaconClient
)
BeforeEach(func() {
BC = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10)
errBc = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10)
})
Describe("Connecting to the lighthouse client", Label("integration"), func() {
Context("When the client is running", func() {
It("We should connect successfully", func() {

View File

@ -17,8 +17,50 @@ package beaconclient
import (
"sync/atomic"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
//Create a metric struct and register each channel with prometheus
func CreateBeaconClientMetrics() *BeaconClientMetrics {
metrics := &BeaconClientMetrics{
SlotInserts: 0,
ReorgInserts: 0,
KnownGapsInserts: 0,
knownGapsProcessed: 0,
KnownGapsProcessingError: 0,
HeadError: 0,
HeadReorgError: 0,
}
prometheusRegisterHelper("slot_inserts", "Keeps track of the number of slots we have inserted.", &metrics.SlotInserts)
prometheusRegisterHelper("reorg_inserts", "Keeps track of the number of reorgs we have inserted.", &metrics.ReorgInserts)
prometheusRegisterHelper("known_gaps_inserts", "Keeps track of the number of known gaps we have inserted.", &metrics.KnownGapsInserts)
prometheusRegisterHelper("known_gaps_processed", "Keeps track of the number of known gaps we processed.", &metrics.knownGapsProcessed)
prometheusRegisterHelper("known_gaps_processing_error", "Keeps track of the number of known gaps we had errors processing.", &metrics.KnownGapsProcessingError)
prometheusRegisterHelper("head_error", "Keeps track of the number of errors we had processing head messages.", &metrics.HeadError)
prometheusRegisterHelper("head_reorg_error", "Keeps track of the number of errors we had processing reorg messages.", &metrics.HeadReorgError)
return metrics
}
func prometheusRegisterHelper(name string, help string, varPointer *uint64) {
err := prometheus.Register(prometheus.NewCounterFunc(
prometheus.CounterOpts{
Namespace: "beacon_client",
Subsystem: "",
Name: name,
Help: help,
ConstLabels: map[string]string{},
},
func() float64 {
return float64(atomic.LoadUint64(varPointer))
}))
if err != nil && err.Error() != "duplicate metrics collector registration attempted" {
loghelper.LogError(err).WithField("name", name).Error("Unable to register counter.")
}
}
// A structure utilized for keeping track of various metrics. Currently, mostly used in testing.
type BeaconClientMetrics struct {
SlotInserts uint64 // Number of head events we successfully wrote to the DB.
@ -33,6 +75,7 @@ type BeaconClientMetrics struct {
// Wrapper function to increment inserts. If we want to use mutexes later we can easily update all
// occurrences here.
func (m *BeaconClientMetrics) IncrementSlotInserts(inc uint64) {
logrus.Debug("Incrementing Slot Insert")
atomic.AddUint64(&m.SlotInserts, inc)
}