Update the boot process #45

Merged
abdulrabbani00 merged 2 commits from feature/43-historic-boot into develop 2022-05-19 13:46:38 +00:00
12 changed files with 306 additions and 100 deletions

View File

@ -25,7 +25,7 @@ on:
- "**" - "**"
env: env:
stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || 'feature/client-build'}} stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || 'main'}}
ipld-ethcl-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref || 'feature/schema-ipld-ethcl-indexer' }} ipld-ethcl-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref || 'feature/schema-ipld-ethcl-indexer' }}
ssz-data-ref: ${{ github.event.inputs.ssz-data-ref || 'main' }} ssz-data-ref: ${{ github.event.inputs.ssz-data-ref || 'main' }}
GOPATH: /tmp/go GOPATH: /tmp/go

View File

@ -44,7 +44,7 @@ func bootApp() {
log.Info("Starting the application in boot mode.") log.Info("Starting the application in boot mode.")
ctx := context.Background() ctx := context.Background()
BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, testDisregardSync) BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "head", testDisregardSync)
if err != nil { if err != nil {
loghelper.LogError(err).Error("Unable to Start application") loghelper.LogError(err).Error("Unable to Start application")
} }

View File

@ -33,6 +33,8 @@ var (
dbPort int dbPort int
bcAddress string bcAddress string
bcPort int bcPort int
bcBootRetryInterval int
bcBootMaxRetry int
bcConnectionProtocol string bcConnectionProtocol string
bcType string bcType string
maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second
@ -80,6 +82,8 @@ func init() {
captureCmd.PersistentFlags().StringVarP(&bcType, "bc.type", "", "lighthouse", "The beacon client we are using, options are prysm and lighthouse.") captureCmd.PersistentFlags().StringVarP(&bcType, "bc.type", "", "lighthouse", "The beacon client we are using, options are prysm and lighthouse.")
captureCmd.PersistentFlags().IntVarP(&bcPort, "bc.port", "r", 0, "Port to connect to beacon node (required )") captureCmd.PersistentFlags().IntVarP(&bcPort, "bc.port", "r", 0, "Port to connect to beacon node (required )")
captureCmd.PersistentFlags().StringVarP(&bcConnectionProtocol, "bc.connectionProtocol", "", "http", "protocol for connecting to the beacon node.") captureCmd.PersistentFlags().StringVarP(&bcConnectionProtocol, "bc.connectionProtocol", "", "http", "protocol for connecting to the beacon node.")
captureCmd.PersistentFlags().IntVarP(&bcBootRetryInterval, "bc.bootRetryInterval", "", 30, "The amount of time to wait between retries while booting the application")
captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The amount of time to wait between retries while booting the application")
err = captureCmd.MarkPersistentFlagRequired("bc.address") err = captureCmd.MarkPersistentFlagRequired("bc.address")
exitErr(err) exitErr(err)
err = captureCmd.MarkPersistentFlagRequired("bc.port") err = captureCmd.MarkPersistentFlagRequired("bc.port")
@ -116,6 +120,10 @@ func init() {
exitErr(err) exitErr(err)
err = viper.BindPFlag("bc.connectionProtocol", captureCmd.PersistentFlags().Lookup("bc.connectionProtocol")) err = viper.BindPFlag("bc.connectionProtocol", captureCmd.PersistentFlags().Lookup("bc.connectionProtocol"))
exitErr(err) exitErr(err)
err = viper.BindPFlag("bc.bootRetryInterval", captureCmd.PersistentFlags().Lookup("bc.bootRetryInterval"))
exitErr(err)
err = viper.BindPFlag("bc.bootMaxRetry", captureCmd.PersistentFlags().Lookup("bc.bootMaxRetry"))
exitErr(err)
// Here you will define your flags and configuration settings. // Here you will define your flags and configuration settings.
} }

View File

@ -48,7 +48,7 @@ func startHeadTracking() {
log.Info("Starting the application in head tracking mode.") log.Info("Starting the application in head tracking mode.")
ctx := context.Background() ctx := context.Background()
BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, testDisregardSync) BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "historic", testDisregardSync)
if err != nil { if err != nil {
loghelper.LogError(err).Error("Unable to Start application") loghelper.LogError(err).Error("Unable to Start application")
if DB != nil { if DB != nil {

View File

@ -17,9 +17,14 @@
package cmd package cmd
import ( import (
"fmt" "context"
"os"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
) )
// historicCmd represents the historic command // historicCmd represents the historic command
@ -28,10 +33,22 @@ var historicCmd = &cobra.Command{
Short: "Capture the historic blocks and states.", Short: "Capture the historic blocks and states.",
Long: `Capture the historic blocks and states.`, Long: `Capture the historic blocks and states.`,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
fmt.Println("historic called") startHistoricProcessing()
}, },
} }
// Start the application to process historical slots.
func startHistoricProcessing() {
// Boot the application
log.Info("Starting the application in head tracking mode.")
ctx := context.Background()
_, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "historic", testDisregardSync)
if err != nil {
StopApplicationPreBoot(err, DB)
}
}
func init() { func init() {
captureCmd.AddCommand(historicCmd) captureCmd.AddCommand(historicCmd)
@ -45,3 +62,12 @@ func init() {
// is called directly, e.g.: // is called directly, e.g.:
// historicCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") // historicCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
} }
// Stop the application during its initial boot phases.
func StopApplicationPreBoot(startErr error, db sql.Database) {
loghelper.LogError(startErr).Error("Unable to Start application")
if db != nil {
db.Close()
}
os.Exit(1)
}

View File

@ -18,6 +18,7 @@ package boot
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
"time" "time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -27,8 +28,6 @@ import (
) )
var ( var (
maxRetry = 5 // Max times to try to connect to the DB or BC at boot.
retryInterval = 30 // The time to wait between each try.
DB sql.Database = &postgres.DB{} DB sql.Database = &postgres.DB{}
BC *beaconclient.BeaconClient = &beaconclient.BeaconClient{} BC *beaconclient.BeaconClient = &beaconclient.BeaconClient{}
) )
@ -81,19 +80,33 @@ func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName
} }
// Add retry logic to ensure that we are give the Beacon Client and the DB time to start. // Add retry logic to ensure that we are give the Beacon Client and the DB time to start.
func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) { func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string,
bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, startUpMode string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) {
var err error var err error
for i := 0; i < maxRetry; i++ { for i := 0; i < bcMaxRetry; i++ {
BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort, bcConnectionProtocol, disregardSync) BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort, bcConnectionProtocol, disregardSync)
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"retryNumber": i, "retryNumber": i,
"err": err, "err": err,
}).Warn("Unable to boot application. Going to try again") }).Warn("Unable to boot application. Going to try again")
time.Sleep(time.Duration(retryInterval) * time.Second) time.Sleep(time.Duration(bcRetryInterval) * time.Second)
continue continue
} }
break break
} }
switch strings.ToLower(startUpMode) {
case "head":
log.Debug("No further actions needed to boot the application at this phase.")
case "historic":
log.Debug("Performing additional boot steps for historical processing")
headSlot, err := BC.GetLatestSlotInBeaconServer(bcType)
if err != nil {
return BC, DB, err
}
BC.UpdateLatestSlotInBeaconServer(int64(headSlot))
// Add another switch case for bcType if its ever needed.
}
return BC, DB, err return BC, DB, err
} }

View File

@ -34,18 +34,28 @@ var _ = Describe("Boot", func() {
bcAddress string = "localhost" bcAddress string = "localhost"
bcPort int = 5052 bcPort int = 5052
bcConnectionProtocol string = "http" bcConnectionProtocol string = "http"
bcType string = "lighthouse"
bcBootRetryInterval int = 1
bcBootMaxRetry int = 5
) )
Describe("Booting the application", Label("integration"), func() { Describe("Booting the application", Label("integration"), func() {
Context("When the DB and BC are both up and running, and we skip checking for a synced head", func() { Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing head", func() {
It("Should connect successfully", func() { It("Should connect successfully", func() {
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true) _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "head", true)
defer db.Close()
Expect(err).ToNot(HaveOccurred())
})
})
Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing historic ", func() {
It("Should connect successfully", func() {
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "historic", true)
defer db.Close() defer db.Close()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}) })
}) })
Context("When the DB and BC are both up and running, and we check for a synced head", func() { Context("When the DB and BC are both up and running, and we check for a synced head", func() {
It("Should not connect successfully", func() { It("Should not connect successfully", func() {
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, false) _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "head", false)
defer db.Close() defer db.Close()
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
}) })

View File

@ -46,6 +46,9 @@ var _ = Describe("Shutdown", func() {
bcAddress string = "localhost" bcAddress string = "localhost"
bcPort int = 5052 bcPort int = 5052
bcConnectionProtocol string = "http" bcConnectionProtocol string = "http"
bcType string = "lighthouse"
bcBootRetryInterval int = 1
bcBootMaxRetry int = 5
maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second
DB sql.Database DB sql.Database
BC *beaconclient.BeaconClient BC *beaconclient.BeaconClient
@ -55,7 +58,7 @@ var _ = Describe("Shutdown", func() {
) )
BeforeEach(func() { BeforeEach(func() {
ctx = context.Background() ctx = context.Background()
BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true) BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "head", true)
notifierCh = make(chan os.Signal, 1) notifierCh = make(chan os.Signal, 1)
Expect(err).To(BeNil()) Expect(err).To(BeNil())
}) })

View File

@ -31,7 +31,8 @@ var (
bcReorgTopicEndpoint = "/eth/v1/events?topics=chain_reorg" // Endpoint used to subscribe to the head of the chain bcReorgTopicEndpoint = "/eth/v1/events?topics=chain_reorg" // Endpoint used to subscribe to the head of the chain
BcBlockQueryEndpoint = "/eth/v2/beacon/blocks/" // Endpoint to query individual Blocks BcBlockQueryEndpoint = "/eth/v2/beacon/blocks/" // Endpoint to query individual Blocks
BcStateQueryEndpoint = "/eth/v2/debug/beacon/states/" // Endpoint to query individual States BcStateQueryEndpoint = "/eth/v2/debug/beacon/states/" // Endpoint to query individual States
BcSyncStatusEndpoint = "/eth/v1/node/syncing" BcSyncStatusEndpoint = "/eth/v1/node/syncing" // The endpoint to check to see if the beacon server is still trying to sync to head.
LhDbInfoEndpoint = "/lighthouse/database/info" // The endpoint for the LIGHTHOUSE server to get the database information.
BcBlockRootEndpoint = func(slot string) string { BcBlockRootEndpoint = func(slot string) string {
return "/eth/v1/beacon/blocks/" + slot + "/root" return "/eth/v1/beacon/blocks/" + slot + "/root"
} }
@ -40,15 +41,6 @@ var (
//bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain //bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain
) )
// A structure utilized for keeping track of various metrics. Currently, mostly used in testing.
type BeaconClientMetrics struct {
HeadTrackingInserts uint64 // Number of head events we successfully wrote to the DB.
HeadTrackingReorgs uint64 // Number of reorg events we successfully wrote to the DB.
HeadTrackingKnownGaps uint64 // Number of known_gaps we successfully wrote to the DB.
HeadError uint64 // Number of errors that occurred when decoding the head message.
HeadReorgError uint64 // Number of errors that occurred when decoding the reorg message.
}
// A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying. // A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying.
type BeaconClient struct { type BeaconClient struct {
Context context.Context // A context generic context with multiple uses. Context context.Context // A context generic context with multiple uses.
@ -59,6 +51,7 @@ type BeaconClient struct {
KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry. KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry.
// Used for Head Tracking // Used for Head Tracking
PerformHeadTracking bool // Should we track head? PerformHeadTracking bool // Should we track head?
StartingSlot int // If we're performing head tracking. What is the first slot we processed. StartingSlot int // If we're performing head tracking. What is the first slot we processed.
PreviousSlot int // Whats the previous slot we processed PreviousSlot int // Whats the previous slot we processed
@ -67,6 +60,12 @@ type BeaconClient struct {
HeadTracking *SseEvents[Head] // Track the head block HeadTracking *SseEvents[Head] // Track the head block
ReOrgTracking *SseEvents[ChainReorg] // Track all Reorgs ReOrgTracking *SseEvents[ChainReorg] // Track all Reorgs
//FinalizationTracking *SseEvents[FinalizedCheckpoint] // Track all finalization checkpoints //FinalizationTracking *SseEvents[FinalizedCheckpoint] // Track all finalization checkpoints
// Used for Historical Processing
// The latest available slot within the Beacon Server. We can't query any slot greater than this.
// This value is lazily updated. Therefore at times it will be outdated.
LatestSlotInBeaconServer int64
} }
// A struct to keep track of relevant the head event topic. // A struct to keep track of relevant the head event topic.
@ -96,6 +95,9 @@ func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddres
Metrics: &BeaconClientMetrics{ Metrics: &BeaconClientMetrics{
HeadTrackingInserts: 0, HeadTrackingInserts: 0,
HeadTrackingReorgs: 0, HeadTrackingReorgs: 0,
HeadTrackingKnownGaps: 0,
HeadError: 0,
HeadReorgError: 0,
}, },
//FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint), //FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint),
} }

View File

@ -0,0 +1,207 @@
// VulcanizeDB
// Copyright © 2022 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package beaconclient
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"
"sync/atomic"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
var (
MissingBeaconServerType error = fmt.Errorf("The beacon server type provided is not handled.")
LighthouseMissingSlots error = fmt.Errorf("Anchor is not nil. This means lighthouse has not backfilled all the slots from Genesis to head.")
)
// The sync response when checking if the node is synced.
type Sync struct {
Data SyncData `json:"data"`
}
// The sync data
type SyncData struct {
IsSync bool `json:"is_syncing"`
HeadSlot string `json:"head_slot"`
SyncDistance string `json:"sync_distance"`
}
// This function will check to see if we are synced up with the head of chain.
//{"data":{"is_syncing":true,"head_slot":"62528","sync_distance":"3734299"}}
func (bc BeaconClient) CheckHeadSync() (bool, error) {
syncStatus, err := bc.QueryHeadSync()
if err != nil {
return true, nil
}
return syncStatus.Data.IsSync, nil
}
func (bc BeaconClient) QueryHeadSync() (Sync, error) {
var syncStatus Sync
bcSync := bc.ServerEndpoint + BcSyncStatusEndpoint
resp, err := http.Get(bcSync)
if err != nil {
loghelper.LogEndpoint(bcSync).Error("Unable to check the sync status")
return syncStatus, err
}
if resp.StatusCode < 200 || resp.StatusCode > 299 {
loghelper.LogEndpoint(bcSync).WithFields(log.Fields{"returnCode": resp.StatusCode}).Error("Error when getting the sync status")
return syncStatus, fmt.Errorf("Querying the sync status returned a non 2xx status code, code provided: %d", resp.StatusCode)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return syncStatus, err
}
if err := json.Unmarshal(body, &syncStatus); err != nil {
loghelper.LogEndpoint(bcSync).WithFields(log.Fields{
"rawMessage": string(body),
"err": err,
}).Error("Unable to unmarshal sync status")
return syncStatus, err
}
return syncStatus, nil
}
// The response when checking the lighthouse nodes DB info: /lighthouse/database/info
type LighthouseDatabaseInfo struct {
SchemaVersion int `json:"schema_version"`
Config LhDbConfig `json:"config"`
Split LhDbSplit `json:"split"`
Anchor LhDbAnchor `json:"anchor"`
}
// The config field within the DatabaseInfo response.
type LhDbConfig struct {
SlotsPerRestorePoint int `json:"slots_per_restore_point"`
SlotsPerRestorePointSetExplicitly bool `json:"slots_per_restore_point_set_explicitly"`
BlockCacheSize int `json:"block_cache_size"`
CompactOnInit bool `json:"compact_on_init"`
CompactOnPrune bool `json:"compact_on_prune"`
}
// The split field within the DatabaseInfo response.
type LhDbSplit struct {
Slot string `json:"slot"`
StateRoot string `json:"state_root"`
}
// The anchor field within the DatabaseInfo response.
type LhDbAnchor struct {
AnchorSlot string `json:"anchor_slot"`
OldestBlockSlot string `json:"oldest_block_slot"`
OldestBlockParent string `json:"oldest_block_parent"`
StateUpperLimit string `json:"state_upper_limit"`
StateLowerLimit string `json:"state_lower_limit"`
}
// This function will notify us what the head slot is.
func (bc BeaconClient) queryHeadSlotInBeaconServer() (int, error) {
syncStatus, err := bc.QueryHeadSync()
if err != nil {
return 0, nil
}
headSlot, err := strconv.Atoi(syncStatus.Data.HeadSlot)
if err != nil {
return 0, nil
}
return headSlot, nil
}
// return the lighthouse Database Info
func (bc BeaconClient) queryLighthouseDbInfo() (LighthouseDatabaseInfo, error) {
var dbInfo LighthouseDatabaseInfo
lhDbInfo := bc.ServerEndpoint + LhDbInfoEndpoint
resp, err := http.Get(lhDbInfo)
if err != nil {
loghelper.LogEndpoint(lhDbInfo).Error("Unable to get the lighthouse database information")
return dbInfo, err
}
if resp.StatusCode < 200 || resp.StatusCode > 299 {
loghelper.LogEndpoint(lhDbInfo).WithFields(log.Fields{"returnCode": resp.StatusCode}).Error("Error when getting the lighthouse database information")
return dbInfo, fmt.Errorf("Querying the lighthouse database information returned a non 2xx status code, code provided: %d", resp.StatusCode)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return dbInfo, err
}
if err := json.Unmarshal(body, &dbInfo); err != nil {
loghelper.LogEndpoint(lhDbInfo).WithFields(log.Fields{
"rawMessage": string(body),
"err": err,
}).Error("Unable to unmarshal the lighthouse database information")
return dbInfo, err
}
return dbInfo, nil
}
// This function will tell us what the latest slot is that the beacon server has available. This is important as
// it will ensure us that we have all slots prior to the given slot.
func (bc BeaconClient) GetLatestSlotInBeaconServer(beaconServerType string) (int, error) {
switch strings.ToLower(beaconServerType) {
case "lighthouse":
headSlot, err := bc.queryHeadSlotInBeaconServer()
if err != nil {
return 0, err
}
lhDb, err := bc.queryLighthouseDbInfo()
if err != nil {
return 0, err
}
if lhDb.Anchor == (LhDbAnchor{}) {
//atomic.StoreInt64(&bc.LatestSlotInBeaconServer, int64(headSlot))
log.WithFields(log.Fields{
"headSlot": headSlot,
}).Info("Anchor is nil, the lighthouse client has all the nodes from genesis to head.")
return headSlot, nil
} else {
log.WithFields(log.Fields{
"lhDb.Anchor": lhDb.Anchor,
}).Info(LighthouseMissingSlots.Error())
log.Info("We will add a feature down the road to wait for anchor to be null, if its needed.")
return 0, LighthouseMissingSlots
}
default:
log.WithFields(log.Fields{"BeaconServerType": beaconServerType}).Error(MissingBeaconServerType.Error())
return 0, MissingBeaconServerType
}
}
// A wrapper function for updating the latest slot.
func (bc BeaconClient) UpdateLatestSlotInBeaconServer(headSlot int64) {
curr := atomic.LoadInt64(&bc.LatestSlotInBeaconServer)
log.WithFields(log.Fields{
"Previous Latest Slot": curr,
"New Latest Slot": headSlot,
}).Debug("Swapping Head Slot")
atomic.SwapInt64(&bc.LatestSlotInBeaconServer, int64(headSlot))
}

View File

@ -1,72 +0,0 @@
// VulcanizeDB
// Copyright © 2022 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package beaconclient
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
// The sync response
type Sync struct {
Data SyncData `json:"data"`
}
// The sync data
type SyncData struct {
IsSync bool `json:"is_syncing"`
HeadSlot string `json:"head_slot"`
SyncDistance string `json:"sync_distance"`
}
// This function will check to see if we are synced up with the head of chain.
//{"data":{"is_syncing":true,"head_slot":"62528","sync_distance":"3734299"}}
func (bc BeaconClient) CheckHeadSync() (bool, error) {
bcSync := bc.ServerEndpoint + BcSyncStatusEndpoint
resp, err := http.Get(bcSync)
if err != nil {
loghelper.LogEndpoint(bcSync).Error("Unable to check the sync status")
return true, err
}
if resp.StatusCode < 200 || resp.StatusCode > 299 {
loghelper.LogEndpoint(bcSync).WithFields(log.Fields{"returnCode": resp.StatusCode}).Error("Error when getting the sync status")
return true, fmt.Errorf("Querying the sync status returned a non 2xx status code, code provided: %d", resp.StatusCode)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return true, err
}
var syncStatus Sync
if err := json.Unmarshal(body, &syncStatus); err != nil {
loghelper.LogEndpoint(bcSync).WithFields(log.Fields{
"rawMessage": string(body),
"err": err,
}).Error("Unable to unmarshal sync status")
return true, err
}
return syncStatus.Data.IsSync, nil
}

View File

@ -19,6 +19,15 @@ import (
"sync/atomic" "sync/atomic"
) )
// A structure utilized for keeping track of various metrics. Currently, mostly used in testing.
type BeaconClientMetrics struct {
HeadTrackingInserts uint64 // Number of head events we successfully wrote to the DB.
HeadTrackingReorgs uint64 // Number of reorg events we successfully wrote to the DB.
HeadTrackingKnownGaps uint64 // Number of known_gaps we successfully wrote to the DB.
HeadError uint64 // Number of errors that occurred when decoding the head message.
HeadReorgError uint64 // Number of errors that occurred when decoding the reorg message.
}
// Wrapper function to increment inserts. If we want to use mutexes later we can easily update all // Wrapper function to increment inserts. If we want to use mutexes later we can easily update all
// occurrences here. // occurrences here.
func (m *BeaconClientMetrics) IncrementHeadTrackingInserts(inc uint64) { func (m *BeaconClientMetrics) IncrementHeadTrackingInserts(inc uint64) {