From 041276da81218b851616dfc5e4bcaa0665f89c85 Mon Sep 17 00:00:00 2001
From: Abdul Rabbani <58230246+abdulrabbani00@users.noreply.github.com>
Date: Thu, 19 May 2022 09:46:38 -0400
Subject: [PATCH] Update the boot process (#45)
* Update the boot process
* Update the CI/CD reference for stack-orchestrator
---
.github/workflows/on-pr.yml | 2 +-
cmd/boot.go | 2 +-
cmd/capture.go | 8 +
cmd/head.go | 2 +-
cmd/historic.go | 30 ++-
internal/boot/boot.go | 27 ++-
internal/boot/boot_test.go | 16 +-
internal/shutdown/shutdown_test.go | 5 +-
pkg/beaconclient/beaconclient.go | 26 +--
pkg/beaconclient/checkbeaconserverstatus.go | 207 ++++++++++++++++++++
pkg/beaconclient/checksyncstatus.go | 72 -------
pkg/beaconclient/metrics.go | 9 +
12 files changed, 306 insertions(+), 100 deletions(-)
create mode 100644 pkg/beaconclient/checkbeaconserverstatus.go
delete mode 100644 pkg/beaconclient/checksyncstatus.go
diff --git a/.github/workflows/on-pr.yml b/.github/workflows/on-pr.yml
index c9d3fe6..259788e 100644
--- a/.github/workflows/on-pr.yml
+++ b/.github/workflows/on-pr.yml
@@ -25,7 +25,7 @@ on:
- "**"
env:
- stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || 'feature/client-build'}}
+ stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || 'main'}}
ipld-ethcl-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref || 'feature/schema-ipld-ethcl-indexer' }}
ssz-data-ref: ${{ github.event.inputs.ssz-data-ref || 'main' }}
GOPATH: /tmp/go
diff --git a/cmd/boot.go b/cmd/boot.go
index 9cab893..3f31e35 100644
--- a/cmd/boot.go
+++ b/cmd/boot.go
@@ -44,7 +44,7 @@ func bootApp() {
log.Info("Starting the application in boot mode.")
ctx := context.Background()
- BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, testDisregardSync)
+ BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "head", testDisregardSync)
if err != nil {
loghelper.LogError(err).Error("Unable to Start application")
}
diff --git a/cmd/capture.go b/cmd/capture.go
index e8da70b..5916bfa 100644
--- a/cmd/capture.go
+++ b/cmd/capture.go
@@ -33,6 +33,8 @@ var (
dbPort int
bcAddress string
bcPort int
+ bcBootRetryInterval int
+ bcBootMaxRetry int
bcConnectionProtocol string
bcType string
maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second
@@ -80,6 +82,8 @@ func init() {
captureCmd.PersistentFlags().StringVarP(&bcType, "bc.type", "", "lighthouse", "The beacon client we are using; options are prysm and lighthouse.")
captureCmd.PersistentFlags().IntVarP(&bcPort, "bc.port", "r", 0, "Port to connect to beacon node (required)")
captureCmd.PersistentFlags().StringVarP(&bcConnectionProtocol, "bc.connectionProtocol", "", "http", "protocol for connecting to the beacon node.")
+ captureCmd.PersistentFlags().IntVarP(&bcBootRetryInterval, "bc.bootRetryInterval", "", 30, "The amount of time, in seconds, to wait between retries while booting the application")
+ captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The maximum number of times to retry booting the application")
err = captureCmd.MarkPersistentFlagRequired("bc.address")
exitErr(err)
err = captureCmd.MarkPersistentFlagRequired("bc.port")
@@ -116,6 +120,10 @@ func init() {
exitErr(err)
err = viper.BindPFlag("bc.connectionProtocol", captureCmd.PersistentFlags().Lookup("bc.connectionProtocol"))
exitErr(err)
+ err = viper.BindPFlag("bc.bootRetryInterval", captureCmd.PersistentFlags().Lookup("bc.bootRetryInterval"))
+ exitErr(err)
+ err = viper.BindPFlag("bc.bootMaxRetry", captureCmd.PersistentFlags().Lookup("bc.bootMaxRetry"))
+ exitErr(err)
// Here you will define your flags and configuration settings.
}
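A quick usage note (not part of the patch): because the new flags are bound to viper above, the boot retry settings can also come from a config file or the environment. A minimal Go sketch, assuming the viper keys mirror the flag names registered here and that the command path in the comment exists:

    // Hypothetical: resolve the boot retry settings after the viper bindings above.
    // viper.GetInt returns the config-file value when present, otherwise the flag value or its default.
    bootRetryInterval := viper.GetInt("bc.bootRetryInterval") // defaults to 30 (seconds)
    bootMaxRetry := viper.GetInt("bc.bootMaxRetry")           // defaults to 5 (attempts)
    // Equivalent CLI usage (binary and subcommand names assumed):
    //   ipld-ethcl-indexer capture head --bc.bootRetryInterval 10 --bc.bootMaxRetry 10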
diff --git a/cmd/head.go b/cmd/head.go
index 9e95f9f..d5544a4 100644
--- a/cmd/head.go
+++ b/cmd/head.go
@@ -48,7 +48,7 @@ func startHeadTracking() {
log.Info("Starting the application in head tracking mode.")
ctx := context.Background()
- BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, testDisregardSync)
+ BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "head", testDisregardSync)
if err != nil {
loghelper.LogError(err).Error("Unable to Start application")
if DB != nil {
diff --git a/cmd/historic.go b/cmd/historic.go
index 2211503..d52cb0d 100644
--- a/cmd/historic.go
+++ b/cmd/historic.go
@@ -17,9 +17,14 @@
package cmd
import (
- "fmt"
+ "context"
+ "os"
+ log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
+ "github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
+ "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
+ "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
// historicCmd represents the historic command
@@ -28,10 +33,22 @@ var historicCmd = &cobra.Command{
Short: "Capture the historic blocks and states.",
Long: `Capture the historic blocks and states.`,
Run: func(cmd *cobra.Command, args []string) {
- fmt.Println("historic called")
+ startHistoricProcessing()
},
}
+// Start the application to process historical slots.
+func startHistoricProcessing() {
+ // Boot the application
+ log.Info("Starting the application in historic processing mode.")
+ ctx := context.Background()
+
+ _, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "historic", testDisregardSync)
+ if err != nil {
+ StopApplicationPreBoot(err, DB)
+ }
+}
+
func init() {
captureCmd.AddCommand(historicCmd)
@@ -45,3 +62,12 @@ func init() {
// is called directly, e.g.:
// historicCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
}
+
+// Stop the application during its initial boot phases.
+func StopApplicationPreBoot(startErr error, db sql.Database) {
+ loghelper.LogError(startErr).Error("Unable to Start application")
+ if db != nil {
+ db.Close()
+ }
+ os.Exit(1)
+}
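Aside, not part of this patch: cmd/head.go still handles the same pre-boot failure inline; a sketch of how it could reuse the helper added above (both files are in package cmd):

    // Hypothetical reuse inside startHeadTracking in cmd/head.go:
    BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver,
        bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "head", testDisregardSync)
    if err != nil {
        StopApplicationPreBoot(err, DB)
    }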
diff --git a/internal/boot/boot.go b/internal/boot/boot.go
index 7edeafa..3802407 100644
--- a/internal/boot/boot.go
+++ b/internal/boot/boot.go
@@ -18,6 +18,7 @@ package boot
import (
"context"
"fmt"
+ "strings"
"time"
log "github.com/sirupsen/logrus"
@@ -27,10 +28,8 @@ import (
)
var (
- maxRetry = 5 // Max times to try to connect to the DB or BC at boot.
- retryInterval = 30 // The time to wait between each try.
- DB sql.Database = &postgres.DB{}
- BC *beaconclient.BeaconClient = &beaconclient.BeaconClient{}
+ DB sql.Database = &postgres.DB{}
+ BC *beaconclient.BeaconClient = &beaconclient.BeaconClient{}
)
// This function will perform some boot operations. If any steps fail, the application will fail to start.
@@ -81,19 +80,33 @@ func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName
}
// Add retry logic to ensure that we give the Beacon Client and the DB time to start.
-func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) {
+func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string,
+ bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, startUpMode string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) {
var err error
- for i := 0; i < maxRetry; i++ {
+ for i := 0; i < bcMaxRetry; i++ {
BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort, bcConnectionProtocol, disregardSync)
if err != nil {
log.WithFields(log.Fields{
"retryNumber": i,
"err": err,
}).Warn("Unable to boot application. Going to try again")
- time.Sleep(time.Duration(retryInterval) * time.Second)
+ time.Sleep(time.Duration(bcRetryInterval) * time.Second)
continue
}
break
}
+
+ switch strings.ToLower(startUpMode) {
+ case "head":
+ log.Debug("No further actions needed to boot the application at this phase.")
+ case "historic":
+ log.Debug("Performing additional boot steps for historical processing")
+ headSlot, err := BC.GetLatestSlotInBeaconServer(bcType)
+ if err != nil {
+ return BC, DB, err
+ }
+ BC.UpdateLatestSlotInBeaconServer(int64(headSlot))
+ // Add another switch case for bcType if it's ever needed.
+ }
return BC, DB, err
}
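A usage sketch, not part of this patch: after a successful boot in "historic" mode, the switch above has already cached the beacon server's head slot, and a caller can read it atomically from the returned client (imports and variable names are illustrative):

    // Hypothetical historic-mode caller of the new signature.
    bc, db, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver,
        bcAddress, bcPort, bcConnectionProtocol, "lighthouse", 30, 5, "historic", false)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()
    // Populated by the "historic" branch via UpdateLatestSlotInBeaconServer.
    latestSlot := atomic.LoadInt64(&bc.LatestSlotInBeaconServer)
    log.WithField("latestSlot", latestSlot).Info("Latest slot available in the beacon server")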
diff --git a/internal/boot/boot_test.go b/internal/boot/boot_test.go
index 21daba1..44e1b2c 100644
--- a/internal/boot/boot_test.go
+++ b/internal/boot/boot_test.go
@@ -34,18 +34,28 @@ var _ = Describe("Boot", func() {
bcAddress string = "localhost"
bcPort int = 5052
bcConnectionProtocol string = "http"
+ bcType string = "lighthouse"
+ bcBootRetryInterval int = 1
+ bcBootMaxRetry int = 5
)
Describe("Booting the application", Label("integration"), func() {
- Context("When the DB and BC are both up and running, and we skip checking for a synced head", func() {
+ Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing head", func() {
It("Should connect successfully", func() {
- _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true)
+ _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "head", true)
+ defer db.Close()
+ Expect(err).ToNot(HaveOccurred())
+ })
+ })
+ Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing historic ", func() {
+ It("Should connect successfully", func() {
+ _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "historic", true)
defer db.Close()
Expect(err).ToNot(HaveOccurred())
})
})
Context("When the DB and BC are both up and running, and we check for a synced head", func() {
It("Should not connect successfully", func() {
- _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, false)
+ _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "head", false)
defer db.Close()
Expect(err).To(HaveOccurred())
})
diff --git a/internal/shutdown/shutdown_test.go b/internal/shutdown/shutdown_test.go
index b26bc7c..705b9df 100644
--- a/internal/shutdown/shutdown_test.go
+++ b/internal/shutdown/shutdown_test.go
@@ -46,6 +46,9 @@ var _ = Describe("Shutdown", func() {
bcAddress string = "localhost"
bcPort int = 5052
bcConnectionProtocol string = "http"
+ bcType string = "lighthouse"
+ bcBootRetryInterval int = 1
+ bcBootMaxRetry int = 5
maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second
DB sql.Database
BC *beaconclient.BeaconClient
@@ -55,7 +58,7 @@ var _ = Describe("Shutdown", func() {
)
BeforeEach(func() {
ctx = context.Background()
- BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true)
+ BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "head", true)
notifierCh = make(chan os.Signal, 1)
Expect(err).To(BeNil())
})
diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go
index b1a83ea..357eee3 100644
--- a/pkg/beaconclient/beaconclient.go
+++ b/pkg/beaconclient/beaconclient.go
@@ -31,7 +31,8 @@ var (
bcReorgTopicEndpoint = "/eth/v1/events?topics=chain_reorg" // Endpoint used to subscribe to chain reorg events
BcBlockQueryEndpoint = "/eth/v2/beacon/blocks/" // Endpoint to query individual Blocks
BcStateQueryEndpoint = "/eth/v2/debug/beacon/states/" // Endpoint to query individual States
- BcSyncStatusEndpoint = "/eth/v1/node/syncing"
+ BcSyncStatusEndpoint = "/eth/v1/node/syncing" // The endpoint to check to see if the beacon server is still trying to sync to head.
+ LhDbInfoEndpoint = "/lighthouse/database/info" // The endpoint for the LIGHTHOUSE server to get the database information.
BcBlockRootEndpoint = func(slot string) string {
return "/eth/v1/beacon/blocks/" + slot + "/root"
}
@@ -40,15 +41,6 @@ var (
//bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain
)
-// A structure utilized for keeping track of various metrics. Currently, mostly used in testing.
-type BeaconClientMetrics struct {
- HeadTrackingInserts uint64 // Number of head events we successfully wrote to the DB.
- HeadTrackingReorgs uint64 // Number of reorg events we successfully wrote to the DB.
- HeadTrackingKnownGaps uint64 // Number of known_gaps we successfully wrote to the DB.
- HeadError uint64 // Number of errors that occurred when decoding the head message.
- HeadReorgError uint64 // Number of errors that occurred when decoding the reorg message.
-}
-
// A struct that captures the Beacon Server that the Beacon Client will be interacting with and querying.
type BeaconClient struct {
Context context.Context // A context generic context with multiple uses.
@@ -59,6 +51,7 @@ type BeaconClient struct {
KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry.
// Used for Head Tracking
+
PerformHeadTracking bool // Should we track head?
StartingSlot int // If we're performing head tracking. What is the first slot we processed.
PreviousSlot int // What's the previous slot we processed
@@ -67,6 +60,12 @@ type BeaconClient struct {
HeadTracking *SseEvents[Head] // Track the head block
ReOrgTracking *SseEvents[ChainReorg] // Track all Reorgs
//FinalizationTracking *SseEvents[FinalizedCheckpoint] // Track all finalization checkpoints
+
+ // Used for Historical Processing
+
+ // The latest available slot within the Beacon Server. We can't query any slot greater than this.
+ // This value is lazily updated. Therefore at times it will be outdated.
+ LatestSlotInBeaconServer int64
}
// A struct to keep track of relevant the head event topic.
@@ -94,8 +93,11 @@ func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddres
HeadTracking: createSseEvent[Head](endpoint, BcHeadTopicEndpoint),
ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint),
Metrics: &BeaconClientMetrics{
- HeadTrackingInserts: 0,
- HeadTrackingReorgs: 0,
+ HeadTrackingInserts: 0,
+ HeadTrackingReorgs: 0,
+ HeadTrackingKnownGaps: 0,
+ HeadError: 0,
+ HeadReorgError: 0,
},
//FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint),
}
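Because LatestSlotInBeaconServer is lazily updated, a long-running historical process would need to refresh it from time to time; a minimal sketch of such a refresh, assuming a lighthouse beacon server (not part of this patch):

    // Hypothetical periodic refresh of the lazily-updated field.
    headSlot, err := bc.GetLatestSlotInBeaconServer("lighthouse")
    if err == nil {
        bc.UpdateLatestSlotInBeaconServer(int64(headSlot))
    }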
diff --git a/pkg/beaconclient/checkbeaconserverstatus.go b/pkg/beaconclient/checkbeaconserverstatus.go
new file mode 100644
index 0000000..0d39498
--- /dev/null
+++ b/pkg/beaconclient/checkbeaconserverstatus.go
@@ -0,0 +1,207 @@
+// VulcanizeDB
+// Copyright © 2022 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+package beaconclient
+
+import (
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "strconv"
+ "strings"
+ "sync/atomic"
+
+ log "github.com/sirupsen/logrus"
+ "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
+)
+
+var (
+ MissingBeaconServerType error = fmt.Errorf("The beacon server type provided is not handled.")
+ LighthouseMissingSlots error = fmt.Errorf("Anchor is not nil. This means lighthouse has not backfilled all the slots from Genesis to head.")
+)
+
+// The sync response when checking if the node is synced.
+type Sync struct {
+ Data SyncData `json:"data"`
+}
+
+// The sync data
+type SyncData struct {
+ IsSync bool `json:"is_syncing"`
+ HeadSlot string `json:"head_slot"`
+ SyncDistance string `json:"sync_distance"`
+}
+
+// This function will check to see if we are synced up with the head of chain.
+//{"data":{"is_syncing":true,"head_slot":"62528","sync_distance":"3734299"}}
+func (bc BeaconClient) CheckHeadSync() (bool, error) {
+ syncStatus, err := bc.QueryHeadSync()
+ if err != nil {
+ return true, err
+ }
+ return syncStatus.Data.IsSync, nil
+}
+
+func (bc BeaconClient) QueryHeadSync() (Sync, error) {
+ var syncStatus Sync
+ bcSync := bc.ServerEndpoint + BcSyncStatusEndpoint
+ resp, err := http.Get(bcSync)
+
+ if err != nil {
+ loghelper.LogEndpoint(bcSync).Error("Unable to check the sync status")
+ return syncStatus, err
+ }
+
+ if resp.StatusCode < 200 || resp.StatusCode > 299 {
+ loghelper.LogEndpoint(bcSync).WithFields(log.Fields{"returnCode": resp.StatusCode}).Error("Error when getting the sync status")
+ return syncStatus, fmt.Errorf("Querying the sync status returned a non 2xx status code, code provided: %d", resp.StatusCode)
+ }
+
+ defer resp.Body.Close()
+ body, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return syncStatus, err
+ }
+
+ if err := json.Unmarshal(body, &syncStatus); err != nil {
+ loghelper.LogEndpoint(bcSync).WithFields(log.Fields{
+ "rawMessage": string(body),
+ "err": err,
+ }).Error("Unable to unmarshal sync status")
+ return syncStatus, err
+ }
+ return syncStatus, nil
+}
+
+// The response when checking the lighthouse node's DB info: /lighthouse/database/info
+type LighthouseDatabaseInfo struct {
+ SchemaVersion int `json:"schema_version"`
+ Config LhDbConfig `json:"config"`
+ Split LhDbSplit `json:"split"`
+ Anchor LhDbAnchor `json:"anchor"`
+}
+
+// The config field within the DatabaseInfo response.
+type LhDbConfig struct {
+ SlotsPerRestorePoint int `json:"slots_per_restore_point"`
+ SlotsPerRestorePointSetExplicitly bool `json:"slots_per_restore_point_set_explicitly"`
+ BlockCacheSize int `json:"block_cache_size"`
+ CompactOnInit bool `json:"compact_on_init"`
+ CompactOnPrune bool `json:"compact_on_prune"`
+}
+
+// The split field within the DatabaseInfo response.
+type LhDbSplit struct {
+ Slot string `json:"slot"`
+ StateRoot string `json:"state_root"`
+}
+
+// The anchor field within the DatabaseInfo response.
+type LhDbAnchor struct {
+ AnchorSlot string `json:"anchor_slot"`
+ OldestBlockSlot string `json:"oldest_block_slot"`
+ OldestBlockParent string `json:"oldest_block_parent"`
+ StateUpperLimit string `json:"state_upper_limit"`
+ StateLowerLimit string `json:"state_lower_limit"`
+}
+
+// This function will notify us what the head slot is.
+func (bc BeaconClient) queryHeadSlotInBeaconServer() (int, error) {
+ syncStatus, err := bc.QueryHeadSync()
+ if err != nil {
+ return 0, err
+ }
+ headSlot, err := strconv.Atoi(syncStatus.Data.HeadSlot)
+ if err != nil {
+ return 0, err
+ }
+ return headSlot, nil
+}
+
+// Return the lighthouse database info.
+func (bc BeaconClient) queryLighthouseDbInfo() (LighthouseDatabaseInfo, error) {
+ var dbInfo LighthouseDatabaseInfo
+
+ lhDbInfo := bc.ServerEndpoint + LhDbInfoEndpoint
+ resp, err := http.Get(lhDbInfo)
+
+ if err != nil {
+ loghelper.LogEndpoint(lhDbInfo).Error("Unable to get the lighthouse database information")
+ return dbInfo, err
+ }
+
+ if resp.StatusCode < 200 || resp.StatusCode > 299 {
+ loghelper.LogEndpoint(lhDbInfo).WithFields(log.Fields{"returnCode": resp.StatusCode}).Error("Error when getting the lighthouse database information")
+ return dbInfo, fmt.Errorf("Querying the lighthouse database information returned a non 2xx status code, code provided: %d", resp.StatusCode)
+ }
+
+ defer resp.Body.Close()
+ body, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return dbInfo, err
+ }
+
+ if err := json.Unmarshal(body, &dbInfo); err != nil {
+ loghelper.LogEndpoint(lhDbInfo).WithFields(log.Fields{
+ "rawMessage": string(body),
+ "err": err,
+ }).Error("Unable to unmarshal the lighthouse database information")
+ return dbInfo, err
+ }
+ return dbInfo, nil
+}
+
+// This function will tell us the latest slot that the beacon server has available. This is important because it
+// guarantees that all slots prior to the returned slot are available in the beacon server.
+func (bc BeaconClient) GetLatestSlotInBeaconServer(beaconServerType string) (int, error) {
+ switch strings.ToLower(beaconServerType) {
+ case "lighthouse":
+ headSlot, err := bc.queryHeadSlotInBeaconServer()
+ if err != nil {
+ return 0, err
+ }
+ lhDb, err := bc.queryLighthouseDbInfo()
+ if err != nil {
+ return 0, err
+ }
+ if lhDb.Anchor == (LhDbAnchor{}) {
+ //atomic.StoreInt64(&bc.LatestSlotInBeaconServer, int64(headSlot))
+ log.WithFields(log.Fields{
+ "headSlot": headSlot,
+ }).Info("Anchor is nil, the lighthouse client has all the nodes from genesis to head.")
+ return headSlot, nil
+ } else {
+ log.WithFields(log.Fields{
+ "lhDb.Anchor": lhDb.Anchor,
+ }).Info(LighthouseMissingSlots.Error())
+ log.Info("We will add a feature down the road to wait for anchor to be null, if its needed.")
+ return 0, LighthouseMissingSlots
+ }
+ default:
+ log.WithFields(log.Fields{"BeaconServerType": beaconServerType}).Error(MissingBeaconServerType.Error())
+ return 0, MissingBeaconServerType
+ }
+}
+
+// A wrapper function for updating the latest slot.
+func (bc *BeaconClient) UpdateLatestSlotInBeaconServer(headSlot int64) {
+ curr := atomic.LoadInt64(&bc.LatestSlotInBeaconServer)
+ log.WithFields(log.Fields{
+ "Previous Latest Slot": curr,
+ "New Latest Slot": headSlot,
+ }).Debug("Swapping Head Slot")
+ atomic.SwapInt64(&bc.LatestSlotInBeaconServer, headSlot)
+}
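QueryHeadSync and queryLighthouseDbInfo share the same GET-and-unmarshal pattern; if more beacon server endpoints are added later, a small helper along these lines could remove the duplication (a sketch, not part of this patch):

    // Hypothetical helper: GET an endpoint on the beacon server and unmarshal the JSON body into out.
    func (bc BeaconClient) queryAndUnmarshal(path string, out interface{}) error {
        endpoint := bc.ServerEndpoint + path
        resp, err := http.Get(endpoint)
        if err != nil {
            loghelper.LogEndpoint(endpoint).Error("Unable to query the beacon server")
            return err
        }
        defer resp.Body.Close()
        if resp.StatusCode < 200 || resp.StatusCode > 299 {
            return fmt.Errorf("Querying %s returned a non 2xx status code, code provided: %d", endpoint, resp.StatusCode)
        }
        body, err := ioutil.ReadAll(resp.Body)
        if err != nil {
            return err
        }
        return json.Unmarshal(body, out)
    }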
diff --git a/pkg/beaconclient/checksyncstatus.go b/pkg/beaconclient/checksyncstatus.go
deleted file mode 100644
index 3f88398..0000000
--- a/pkg/beaconclient/checksyncstatus.go
+++ /dev/null
@@ -1,72 +0,0 @@
-// VulcanizeDB
-// Copyright © 2022 Vulcanize
-
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-package beaconclient
-
-import (
- "encoding/json"
- "fmt"
- "io/ioutil"
- "net/http"
-
- log "github.com/sirupsen/logrus"
- "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
-)
-
-// The sync response
-type Sync struct {
- Data SyncData `json:"data"`
-}
-
-// The sync data
-type SyncData struct {
- IsSync bool `json:"is_syncing"`
- HeadSlot string `json:"head_slot"`
- SyncDistance string `json:"sync_distance"`
-}
-
-// This function will check to see if we are synced up with the head of chain.
-//{"data":{"is_syncing":true,"head_slot":"62528","sync_distance":"3734299"}}
-func (bc BeaconClient) CheckHeadSync() (bool, error) {
- bcSync := bc.ServerEndpoint + BcSyncStatusEndpoint
- resp, err := http.Get(bcSync)
-
- if err != nil {
- loghelper.LogEndpoint(bcSync).Error("Unable to check the sync status")
- return true, err
- }
-
- if resp.StatusCode < 200 || resp.StatusCode > 299 {
- loghelper.LogEndpoint(bcSync).WithFields(log.Fields{"returnCode": resp.StatusCode}).Error("Error when getting the sync status")
- return true, fmt.Errorf("Querying the sync status returned a non 2xx status code, code provided: %d", resp.StatusCode)
- }
-
- defer resp.Body.Close()
- body, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- return true, err
- }
-
- var syncStatus Sync
- if err := json.Unmarshal(body, &syncStatus); err != nil {
- loghelper.LogEndpoint(bcSync).WithFields(log.Fields{
- "rawMessage": string(body),
- "err": err,
- }).Error("Unable to unmarshal sync status")
- return true, err
- }
-
- return syncStatus.Data.IsSync, nil
-}
diff --git a/pkg/beaconclient/metrics.go b/pkg/beaconclient/metrics.go
index a6a45c4..ecf8da5 100644
--- a/pkg/beaconclient/metrics.go
+++ b/pkg/beaconclient/metrics.go
@@ -19,6 +19,15 @@ import (
"sync/atomic"
)
+// A structure utilized for keeping track of various metrics. Currently, mostly used in testing.
+type BeaconClientMetrics struct {
+ HeadTrackingInserts uint64 // Number of head events we successfully wrote to the DB.
+ HeadTrackingReorgs uint64 // Number of reorg events we successfully wrote to the DB.
+ HeadTrackingKnownGaps uint64 // Number of known_gaps we successfully wrote to the DB.
+ HeadError uint64 // Number of errors that occurred when decoding the head message.
+ HeadReorgError uint64 // Number of errors that occurred when decoding the reorg message.
+}
+
// Wrapper function to increment inserts. If we want to use mutexes later we can easily update all
// occurrences here.
func (m *BeaconClientMetrics) IncrementHeadTrackingInserts(inc uint64) {