Capture the unique identifier everywhere its needed. #51

Merged
abdulrabbani00 merged 8 commits from feature/50-graceful-historic-shutdown into develop 2022-06-03 16:47:13 +00:00
13 changed files with 128 additions and 65 deletions
Showing only changes of commit 4a85428b53 - Show all commits

View File

@ -23,6 +23,7 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/vulcanize/ipld-ethcl-indexer/internal/boot" "github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
"github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown" "github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
@ -44,8 +45,9 @@ func bootApp() {
log.Info("Starting the application in boot mode.") log.Info("Starting the application in boot mode.")
ctx := context.Background() ctx := context.Background()
BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, kgTableIncrement, "head", testDisregardSync) viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
viper.GetInt("kg.increment"), "boot", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier"))
if err != nil { if err != nil {
loghelper.LogError(err).Error("Unable to Start application") loghelper.LogError(err).Error("Unable to Start application")
} }
@ -58,7 +60,7 @@ func bootApp() {
notifierCh <- syscall.SIGTERM notifierCh <- syscall.SIGTERM
}() }()
err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC) err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc)
if err != nil { if err != nil {
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!") loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
} else { } else {

View File

@ -39,6 +39,7 @@ var (
bcConnectionProtocol string bcConnectionProtocol string
bcType string bcType string
bcMaxHistoricProcessWorker int bcMaxHistoricProcessWorker int
bcUniqueNodeIdentifier int
kgMaxWorker int kgMaxWorker int
kgTableIncrement int kgTableIncrement int
kgProcessGaps bool kgProcessGaps bool
@ -93,6 +94,7 @@ func init() {
captureCmd.PersistentFlags().IntVarP(&bcBootRetryInterval, "bc.bootRetryInterval", "", 30, "The amount of time to wait between retries while booting the application") captureCmd.PersistentFlags().IntVarP(&bcBootRetryInterval, "bc.bootRetryInterval", "", 30, "The amount of time to wait between retries while booting the application")
captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The amount of time to wait between retries while booting the application") captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The amount of time to wait between retries while booting the application")
captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.historic_process table. Be careful of system memory.") captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.historic_process table. Be careful of system memory.")
captureCmd.PersistentFlags().IntVarP(&bcUniqueNodeIdentifier, "bc.uniqueNodeIdentifier", "", 0, "The unique identifier of this application. Each application connecting to the DB should have a unique identifier.")
// err = captureCmd.MarkPersistentFlagRequired("bc.address") // err = captureCmd.MarkPersistentFlagRequired("bc.address")
// exitErr(err) // exitErr(err)
// err = captureCmd.MarkPersistentFlagRequired("bc.port") // err = captureCmd.MarkPersistentFlagRequired("bc.port")
@ -143,6 +145,8 @@ func init() {
exitErr(err) exitErr(err)
err = viper.BindPFlag("bc.maxHistoricProcessWorker", captureCmd.PersistentFlags().Lookup("bc.maxHistoricProcessWorker")) err = viper.BindPFlag("bc.maxHistoricProcessWorker", captureCmd.PersistentFlags().Lookup("bc.maxHistoricProcessWorker"))
exitErr(err) exitErr(err)
err = viper.BindPFlag("bc.uniqueNodeIdentifier", captureCmd.PersistentFlags().Lookup("bc.uniqueNodeIdentifier"))
exitErr(err)
// Here you will define your flags and configuration settings. // Here you will define your flags and configuration settings.
//// Known Gap Specific //// Known Gap Specific

View File

@ -50,7 +50,7 @@ func startHeadTracking() {
Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync")) viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier"))
if err != nil { if err != nil {
StopApplicationPreBoot(err, Db) StopApplicationPreBoot(err, Db)
} }

View File

@ -50,7 +50,7 @@ func startHistoricProcessing() {
Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync")) viper.GetInt("kg.increment"), "historic", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier"))
if err != nil { if err != nil {
StopApplicationPreBoot(err, Db) StopApplicationPreBoot(err, Db)
} }

View File

@ -14,7 +14,8 @@
"bootRetryInterval": 30, "bootRetryInterval": 30,
"bootMaxRetry": 5, "bootMaxRetry": 5,
"maxHistoricProcessWorker": 2, "maxHistoricProcessWorker": 2,
"connectionProtocol": "http" "connectionProtocol": "http",
"uniqueNodeIdentifier": 100
}, },
"t": { "t": {
"skipSync": true "skipSync": true

2
go.mod
View File

@ -9,6 +9,7 @@ require (
github.com/multiformats/go-multihash v0.1.0 github.com/multiformats/go-multihash v0.1.0
github.com/onsi/ginkgo/v2 v2.1.4 github.com/onsi/ginkgo/v2 v2.1.4
github.com/onsi/gomega v1.19.0 github.com/onsi/gomega v1.19.0
github.com/prometheus/client_golang v1.12.1
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc
github.com/sirupsen/logrus v1.8.1 github.com/sirupsen/logrus v1.8.1
) )
@ -57,7 +58,6 @@ require (
github.com/multiformats/go-varint v0.0.6 // indirect github.com/multiformats/go-varint v0.0.6 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.12.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect github.com/prometheus/procfs v0.7.3 // indirect

View File

@ -42,14 +42,17 @@ var (
// //
// 3. Make sure the node is synced, unless disregardSync is true. // 3. Make sure the node is synced, unless disregardSync is true.
func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string,
bcAddress string, bcPort int, bcConnectionProtocol string, bcKgTableIncrement int, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) { bcAddress string, bcPort int, bcConnectionProtocol string, bcKgTableIncrement int, disregardSync bool, uniqueNodeIdentifier int) (*beaconclient.BeaconClient, sql.Database, error) {
log.Info("Booting the Application") log.Info("Booting the Application")
log.Debug("Creating the Beacon Client") log.Debug("Creating the Beacon Client")
BC = beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort, bcKgTableIncrement) Bc, err := beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort, bcKgTableIncrement, uniqueNodeIdentifier)
if err != nil {
return Bc, nil, err
}
log.Debug("Checking Beacon Client") log.Debug("Checking Beacon Client")
err := BC.CheckBeaconClient() err = Bc.CheckBeaconClient()
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -60,36 +63,37 @@ func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName
return nil, nil, err return nil, nil, err
} }
BC.Db = DB Bc.Db = DB
var status bool var status bool
if !disregardSync { if !disregardSync {
status, err = BC.CheckHeadSync() status, err = Bc.CheckHeadSync()
if err != nil { if err != nil {
log.Error("Unable to get the nodes sync status") log.Error("Unable to get the nodes sync status")
return BC, DB, err return Bc, DB, err
} }
if status { if status {
log.Error("The node is still syncing..") log.Error("The node is still syncing..")
err = fmt.Errorf("The node is still syncing.") err = fmt.Errorf("The node is still syncing.")
return BC, DB, err return Bc, DB, err
} }
} else { } else {
log.Warn("We are not checking to see if the node has synced to head.") log.Warn("We are not checking to see if the node has synced to head.")
} }
return BC, DB, nil return Bc, DB, nil
} }
// Add retry logic to ensure that we are give the Beacon Client and the DB time to start. // Add retry logic to ensure that we are give the Beacon Client and the DB time to start.
func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string,
bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, bcKgTableIncrement int, startUpMode string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) { bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, bcKgTableIncrement int,
startUpMode string, disregardSync bool, uniqueNodeIdentifier int) (*beaconclient.BeaconClient, sql.Database, error) {
var err error var err error
if bcMaxRetry < 0 { if bcMaxRetry < 0 {
i := 0 i := 0
for { for {
BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName,
bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync) bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier)
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"retryNumber": i, "retryNumber": i,
@ -104,7 +108,7 @@ func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int
} else { } else {
for i := 0; i < bcMaxRetry; i++ { for i := 0; i < bcMaxRetry; i++ {
BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName,
bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync) bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier)
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"retryNumber": i, "retryNumber": i,
@ -136,6 +140,8 @@ func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int
} }
BC.UpdateLatestSlotInBeaconServer(int64(headSlot)) BC.UpdateLatestSlotInBeaconServer(int64(headSlot))
// Add another switch case for bcType if its ever needed. // Add another switch case for bcType if its ever needed.
case "boot":
log.Debug("Running application in boot mode.")
default: default:
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"startUpMode": startUpMode, "startUpMode": startUpMode,

View File

@ -38,44 +38,52 @@ var _ = Describe("Boot", func() {
bcBootRetryInterval int = 1 bcBootRetryInterval int = 1
bcBootMaxRetry int = 5 bcBootMaxRetry int = 5
bcKgTableIncrement int = 10 bcKgTableIncrement int = 10
bcUniqueIdentifier int = 100
) )
Describe("Booting the application", Label("integration"), func() { Describe("Booting the application", Label("integration"), func() {
Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing head", func() { Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing head", func() {
It("Should connect successfully", func() { It("Should connect successfully", func() {
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true) _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier)
defer db.Close() defer db.Close()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}) })
}) })
Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing historic ", func() { Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing historic ", func() {
It("Should connect successfully", func() { It("Should connect successfully", func() {
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "historic", true) _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "historic", true, bcUniqueIdentifier)
defer db.Close() defer db.Close()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}) })
}) })
Context("When the DB and BC are both up and running, and we check for a synced head", func() { Context("When the DB and BC are both up and running, and we check for a synced head", func() {
It("Should not connect successfully", func() { It("Should not connect successfully", func() {
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false) _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, bcUniqueIdentifier)
defer db.Close()
Expect(err).To(HaveOccurred())
})
})
Context("When the DB and BC are both up and running, we skip checking for a synced head, but the unique identifier is 0", func() {
It("Should not connect successfully", func() {
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, 0)
defer db.Close() defer db.Close()
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
}) })
}) })
Context("When the DB is running but not the BC", func() { Context("When the DB is running but not the BC", func() {
It("Should not connect successfully", func() { It("Should not connect successfully", func() {
_, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true) _, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier)
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
}) })
}) })
Context("When the BC is running but not the DB", func() { Context("When the BC is running but not the DB", func() {
It("Should not connect successfully", func() { It("Should not connect successfully", func() {
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, true) _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier)
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
}) })
}) })
Context("When neither the BC or DB are running", func() { Context("When neither the BC or DB are running", func() {
It("Should not connect successfully", func() { It("Should not connect successfully", func() {
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true) _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier)
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
}) })
}) })

View File

@ -35,32 +35,34 @@ import (
"github.com/vulcanize/ipld-ethcl-indexer/pkg/gracefulshutdown" "github.com/vulcanize/ipld-ethcl-indexer/pkg/gracefulshutdown"
) )
var (
dbAddress string = "localhost"
dbPort int = 8076
dbName string = "vulcanize_testing"
dbUsername string = "vdbm"
dbPassword string = "password"
dbDriver string = "PGX"
bcAddress string = "localhost"
bcPort int = 5052
bcConnectionProtocol string = "http"
bcType string = "lighthouse"
bcBootRetryInterval int = 1
bcBootMaxRetry int = 5
bcKgTableIncrement int = 10
bcUniqueIdentifier int = 100
maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second
DB sql.Database
BC *beaconclient.BeaconClient
err error
ctx context.Context
notifierCh chan os.Signal
)
var _ = Describe("Shutdown", func() { var _ = Describe("Shutdown", func() {
var (
dbAddress string = "localhost"
dbPort int = 8076
dbName string = "vulcanize_testing"
dbUsername string = "vdbm"
dbPassword string = "password"
dbDriver string = "PGX"
bcAddress string = "localhost"
bcPort int = 5052
bcConnectionProtocol string = "http"
bcType string = "lighthouse"
bcBootRetryInterval int = 1
bcBootMaxRetry int = 5
bcKgTableIncrement int = 10
maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second
DB sql.Database
BC *beaconclient.BeaconClient
err error
ctx context.Context
notifierCh chan os.Signal
)
BeforeEach(func() { BeforeEach(func() {
ctx = context.Background() ctx = context.Background()
BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress,
bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true) bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier)
notifierCh = make(chan os.Signal, 1) notifierCh = make(chan os.Signal, 1)
Expect(err).To(BeNil()) Expect(err).To(BeNil())
}) })

View File

@ -48,6 +48,7 @@ type BeaconClient struct {
Db sql.Database // Database object used for reads and writes. Db sql.Database // Database object used for reads and writes.
Metrics *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics. Metrics *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics.
KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry. KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry.
UniqueNodeIdentifier int // The unique identifier within the cluster of this individual node.
// Used for Head Tracking // Used for Head Tracking
@ -84,7 +85,16 @@ type SseError struct {
} }
// A Function to create the BeaconClient. // A Function to create the BeaconClient.
func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int, bcKgTableIncrement int) *BeaconClient { func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int, bcKgTableIncrement int, uniqueNodeIdentifier int) (*BeaconClient, error) {
if uniqueNodeIdentifier == 0 {
return nil, fmt.Errorf("The unique node identifier provided is 0, it must be a non-zero value!!!!")
}
metrics, err := CreateBeaconClientMetrics()
if err != nil {
return nil, err
}
endpoint := fmt.Sprintf("%s://%s:%d", connectionProtocol, bcAddress, bcPort) endpoint := fmt.Sprintf("%s://%s:%d", connectionProtocol, bcAddress, bcPort)
log.Info("Creating the BeaconClient") log.Info("Creating the BeaconClient")
return &BeaconClient{ return &BeaconClient{
@ -93,9 +103,9 @@ func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddres
KnownGapTableIncrement: bcKgTableIncrement, KnownGapTableIncrement: bcKgTableIncrement,
HeadTracking: createSseEvent[Head](endpoint, BcHeadTopicEndpoint), HeadTracking: createSseEvent[Head](endpoint, BcHeadTopicEndpoint),
ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint), ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint),
Metrics: CreateBeaconClientMetrics(), Metrics: metrics,
//FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint), //FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint),
} }, nil
} }
// Create all the channels to handle a SSE events // Create all the channels to handle a SSE events

View File

@ -54,6 +54,7 @@ var (
dbUser string = "vdbm" dbUser string = "vdbm"
dbPassword string = "password" dbPassword string = "password"
dbDriver string = "pgx" dbDriver string = "pgx"
bcUniqueIdentifier int = 100
dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42" dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42"
knownGapsTableIncrement int = 100000 knownGapsTableIncrement int = 100000
maxRetry int = 120 maxRetry int = 120
@ -202,6 +203,7 @@ var (
dbPassword: dbPassword, dbPassword: dbPassword,
dbDriver: dbDriver, dbDriver: dbDriver,
knownGapsTableIncrement: knownGapsTableIncrement, knownGapsTableIncrement: knownGapsTableIncrement,
bcUniqueIdentifier: bcUniqueIdentifier,
} }
BeaconNodeTester = TestBeaconNode{ BeaconNodeTester = TestBeaconNode{
@ -421,6 +423,7 @@ type Config struct {
dbPassword string dbPassword string
dbDriver string dbDriver string
knownGapsTableIncrement int knownGapsTableIncrement int
bcUniqueIdentifier int
} }
////////////////////////////////////////////////////// //////////////////////////////////////////////////////
@ -430,7 +433,8 @@ type Config struct {
// Must run before each test. We can't use the beforeEach because of the way // Must run before each test. We can't use the beforeEach because of the way
// Gingko treats race conditions. // Gingko treats race conditions.
func setUpTest(config Config, maxSlot string) *beaconclient.BeaconClient { func setUpTest(config Config, maxSlot string) *beaconclient.BeaconClient {
bc := *beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port, config.knownGapsTableIncrement) bc, err := beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port, config.knownGapsTableIncrement, config.bcUniqueIdentifier)
Expect(err).ToNot(HaveOccurred())
db, err := postgres.SetupPostgresDb(config.dbHost, config.dbPort, config.dbName, config.dbUser, config.dbPassword, config.dbDriver) db, err := postgres.SetupPostgresDb(config.dbHost, config.dbPort, config.dbName, config.dbUser, config.dbPassword, config.dbDriver)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
@ -441,7 +445,7 @@ func setUpTest(config Config, maxSlot string) *beaconclient.BeaconClient {
writeSlot(db, maxSlot) writeSlot(db, maxSlot)
bc.Db = db bc.Db = db
return &bc return bc
} }
// A helper function to validate the expected output from the ethcl.slots table. // A helper function to validate the expected output from the ethcl.slots table.

View File

@ -25,19 +25,22 @@ import (
var _ = Describe("Healthcheck", func() { var _ = Describe("Healthcheck", func() {
var ( var (
BC *beaconclient.BeaconClient Bc *beaconclient.BeaconClient
errBc *beaconclient.BeaconClient errBc *beaconclient.BeaconClient
) )
BeforeEach(func() { BeforeEach(func() {
BC = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10) var err error
errBc = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10) Bc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10, bcUniqueIdentifier)
Expect(err).ToNot(HaveOccurred())
errBc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10, bcUniqueIdentifier)
Expect(err).ToNot(HaveOccurred())
}) })
Describe("Connecting to the lighthouse client", Label("integration"), func() { Describe("Connecting to the lighthouse client", Label("integration"), func() {
Context("When the client is running", func() { Context("When the client is running", func() {
It("We should connect successfully", func() { It("We should connect successfully", func() {
err := BC.CheckBeaconClient() err := Bc.CheckBeaconClient()
Expect(err).To(BeNil()) Expect(err).To(BeNil())
}) })
}) })

View File

@ -24,7 +24,7 @@ import (
) )
//Create a metric struct and register each channel with prometheus //Create a metric struct and register each channel with prometheus
func CreateBeaconClientMetrics() *BeaconClientMetrics { func CreateBeaconClientMetrics() (*BeaconClientMetrics, error) {
metrics := &BeaconClientMetrics{ metrics := &BeaconClientMetrics{
SlotInserts: 0, SlotInserts: 0,
ReorgInserts: 0, ReorgInserts: 0,
@ -34,17 +34,38 @@ func CreateBeaconClientMetrics() *BeaconClientMetrics {
HeadError: 0, HeadError: 0,
HeadReorgError: 0, HeadReorgError: 0,
} }
prometheusRegisterHelper("slot_inserts", "Keeps track of the number of slots we have inserted.", &metrics.SlotInserts) err := prometheusRegisterHelper("slot_inserts", "Keeps track of the number of slots we have inserted.", &metrics.SlotInserts)
prometheusRegisterHelper("reorg_inserts", "Keeps track of the number of reorgs we have inserted.", &metrics.ReorgInserts) if err != nil {
prometheusRegisterHelper("known_gaps_inserts", "Keeps track of the number of known gaps we have inserted.", &metrics.KnownGapsInserts) return nil, err
prometheusRegisterHelper("known_gaps_processed", "Keeps track of the number of known gaps we processed.", &metrics.knownGapsProcessed) }
prometheusRegisterHelper("known_gaps_processing_error", "Keeps track of the number of known gaps we had errors processing.", &metrics.KnownGapsProcessingError) err = prometheusRegisterHelper("reorg_inserts", "Keeps track of the number of reorgs we have inserted.", &metrics.ReorgInserts)
prometheusRegisterHelper("head_error", "Keeps track of the number of errors we had processing head messages.", &metrics.HeadError) if err != nil {
prometheusRegisterHelper("head_reorg_error", "Keeps track of the number of errors we had processing reorg messages.", &metrics.HeadReorgError) return nil, err
return metrics }
err = prometheusRegisterHelper("known_gaps_inserts", "Keeps track of the number of known gaps we have inserted.", &metrics.KnownGapsInserts)
if err != nil {
return nil, err
}
err = prometheusRegisterHelper("known_gaps_processed", "Keeps track of the number of known gaps we processed.", &metrics.knownGapsProcessed)
if err != nil {
return nil, err
}
err = prometheusRegisterHelper("known_gaps_processing_error", "Keeps track of the number of known gaps we had errors processing.", &metrics.KnownGapsProcessingError)
if err != nil {
return nil, err
}
err = prometheusRegisterHelper("head_error", "Keeps track of the number of errors we had processing head messages.", &metrics.HeadError)
if err != nil {
return nil, err
}
err = prometheusRegisterHelper("head_reorg_error", "Keeps track of the number of errors we had processing reorg messages.", &metrics.HeadReorgError)
if err != nil {
return nil, err
}
return metrics, nil
} }
func prometheusRegisterHelper(name string, help string, varPointer *uint64) { func prometheusRegisterHelper(name string, help string, varPointer *uint64) error {
err := prometheus.Register(prometheus.NewCounterFunc( err := prometheus.Register(prometheus.NewCounterFunc(
prometheus.CounterOpts{ prometheus.CounterOpts{
Namespace: "beacon_client", Namespace: "beacon_client",
@ -58,7 +79,9 @@ func prometheusRegisterHelper(name string, help string, varPointer *uint64) {
})) }))
if err != nil && err.Error() != "duplicate metrics collector registration attempted" { if err != nil && err.Error() != "duplicate metrics collector registration attempted" {
loghelper.LogError(err).WithField("name", name).Error("Unable to register counter.") loghelper.LogError(err).WithField("name", name).Error("Unable to register counter.")
return err
} }
return nil
} }
// A structure utilized for keeping track of various metrics. Currently, mostly used in testing. // A structure utilized for keeping track of various metrics. Currently, mostly used in testing.