diff --git a/.github/workflows/on-pr.yml b/.github/workflows/on-pr.yml index c9d3fe6..259788e 100644 --- a/.github/workflows/on-pr.yml +++ b/.github/workflows/on-pr.yml @@ -25,7 +25,7 @@ on: - "**" env: - stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || 'feature/client-build'}} + stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || 'main'}} ipld-ethcl-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref || 'feature/schema-ipld-ethcl-indexer' }} ssz-data-ref: ${{ github.event.inputs.ssz-data-ref || 'main' }} GOPATH: /tmp/go diff --git a/cmd/boot.go b/cmd/boot.go index 9cab893..3f31e35 100644 --- a/cmd/boot.go +++ b/cmd/boot.go @@ -44,7 +44,7 @@ func bootApp() { log.Info("Starting the application in boot mode.") ctx := context.Background() - BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, testDisregardSync) + BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "head", testDisregardSync) if err != nil { loghelper.LogError(err).Error("Unable to Start application") } diff --git a/cmd/capture.go b/cmd/capture.go index e8da70b..5916bfa 100644 --- a/cmd/capture.go +++ b/cmd/capture.go @@ -33,6 +33,8 @@ var ( dbPort int bcAddress string bcPort int + bcBootRetryInterval int + bcBootMaxRetry int bcConnectionProtocol string bcType string maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second @@ -80,6 +82,8 @@ func init() { captureCmd.PersistentFlags().StringVarP(&bcType, "bc.type", "", "lighthouse", "The beacon client we are using, options are prysm and lighthouse.") captureCmd.PersistentFlags().IntVarP(&bcPort, "bc.port", "r", 0, "Port to connect to beacon node (required )") captureCmd.PersistentFlags().StringVarP(&bcConnectionProtocol, "bc.connectionProtocol", "", "http", "protocol for connecting to the beacon node.") + captureCmd.PersistentFlags().IntVarP(&bcBootRetryInterval, "bc.bootRetryInterval", "", 30, "The amount of time to wait between retries while booting the application") + captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The amount of time to wait between retries while booting the application") err = captureCmd.MarkPersistentFlagRequired("bc.address") exitErr(err) err = captureCmd.MarkPersistentFlagRequired("bc.port") @@ -116,6 +120,10 @@ func init() { exitErr(err) err = viper.BindPFlag("bc.connectionProtocol", captureCmd.PersistentFlags().Lookup("bc.connectionProtocol")) exitErr(err) + err = viper.BindPFlag("bc.bootRetryInterval", captureCmd.PersistentFlags().Lookup("bc.bootRetryInterval")) + exitErr(err) + err = viper.BindPFlag("bc.bootMaxRetry", captureCmd.PersistentFlags().Lookup("bc.bootMaxRetry")) + exitErr(err) // Here you will define your flags and configuration settings. } diff --git a/cmd/head.go b/cmd/head.go index 9e95f9f..d5544a4 100644 --- a/cmd/head.go +++ b/cmd/head.go @@ -48,7 +48,7 @@ func startHeadTracking() { log.Info("Starting the application in head tracking mode.") ctx := context.Background() - BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, testDisregardSync) + BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "historic", testDisregardSync) if err != nil { loghelper.LogError(err).Error("Unable to Start application") if DB != nil { diff --git a/cmd/historic.go b/cmd/historic.go index 2211503..d52cb0d 100644 --- a/cmd/historic.go +++ b/cmd/historic.go @@ -17,9 +17,14 @@ package cmd import ( - "fmt" + "context" + "os" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "github.com/vulcanize/ipld-ethcl-indexer/internal/boot" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" ) // historicCmd represents the historic command @@ -28,10 +33,22 @@ var historicCmd = &cobra.Command{ Short: "Capture the historic blocks and states.", Long: `Capture the historic blocks and states.`, Run: func(cmd *cobra.Command, args []string) { - fmt.Println("historic called") + startHistoricProcessing() }, } +// Start the application to process historical slots. +func startHistoricProcessing() { + // Boot the application + log.Info("Starting the application in head tracking mode.") + ctx := context.Background() + + _, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "historic", testDisregardSync) + if err != nil { + StopApplicationPreBoot(err, DB) + } +} + func init() { captureCmd.AddCommand(historicCmd) @@ -45,3 +62,12 @@ func init() { // is called directly, e.g.: // historicCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") } + +// Stop the application during its initial boot phases. +func StopApplicationPreBoot(startErr error, db sql.Database) { + loghelper.LogError(startErr).Error("Unable to Start application") + if db != nil { + db.Close() + } + os.Exit(1) +} diff --git a/internal/boot/boot.go b/internal/boot/boot.go index 7edeafa..3802407 100644 --- a/internal/boot/boot.go +++ b/internal/boot/boot.go @@ -18,6 +18,7 @@ package boot import ( "context" "fmt" + "strings" "time" log "github.com/sirupsen/logrus" @@ -27,10 +28,8 @@ import ( ) var ( - maxRetry = 5 // Max times to try to connect to the DB or BC at boot. - retryInterval = 30 // The time to wait between each try. - DB sql.Database = &postgres.DB{} - BC *beaconclient.BeaconClient = &beaconclient.BeaconClient{} + DB sql.Database = &postgres.DB{} + BC *beaconclient.BeaconClient = &beaconclient.BeaconClient{} ) // This function will perform some boot operations. If any steps fail, the application will fail to start. @@ -81,19 +80,33 @@ func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName } // Add retry logic to ensure that we are give the Beacon Client and the DB time to start. -func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) { +func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, + bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, startUpMode string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) { var err error - for i := 0; i < maxRetry; i++ { + for i := 0; i < bcMaxRetry; i++ { BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort, bcConnectionProtocol, disregardSync) if err != nil { log.WithFields(log.Fields{ "retryNumber": i, "err": err, }).Warn("Unable to boot application. Going to try again") - time.Sleep(time.Duration(retryInterval) * time.Second) + time.Sleep(time.Duration(bcRetryInterval) * time.Second) continue } break } + + switch strings.ToLower(startUpMode) { + case "head": + log.Debug("No further actions needed to boot the application at this phase.") + case "historic": + log.Debug("Performing additional boot steps for historical processing") + headSlot, err := BC.GetLatestSlotInBeaconServer(bcType) + if err != nil { + return BC, DB, err + } + BC.UpdateLatestSlotInBeaconServer(int64(headSlot)) + // Add another switch case for bcType if its ever needed. + } return BC, DB, err } diff --git a/internal/boot/boot_test.go b/internal/boot/boot_test.go index 21daba1..44e1b2c 100644 --- a/internal/boot/boot_test.go +++ b/internal/boot/boot_test.go @@ -34,18 +34,28 @@ var _ = Describe("Boot", func() { bcAddress string = "localhost" bcPort int = 5052 bcConnectionProtocol string = "http" + bcType string = "lighthouse" + bcBootRetryInterval int = 1 + bcBootMaxRetry int = 5 ) Describe("Booting the application", Label("integration"), func() { - Context("When the DB and BC are both up and running, and we skip checking for a synced head", func() { + Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing head", func() { It("Should connect successfully", func() { - _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true) + _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "head", true) + defer db.Close() + Expect(err).ToNot(HaveOccurred()) + }) + }) + Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing historic ", func() { + It("Should connect successfully", func() { + _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "historic", true) defer db.Close() Expect(err).ToNot(HaveOccurred()) }) }) Context("When the DB and BC are both up and running, and we check for a synced head", func() { It("Should not connect successfully", func() { - _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, false) + _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "head", false) defer db.Close() Expect(err).To(HaveOccurred()) }) diff --git a/internal/shutdown/shutdown_test.go b/internal/shutdown/shutdown_test.go index b26bc7c..705b9df 100644 --- a/internal/shutdown/shutdown_test.go +++ b/internal/shutdown/shutdown_test.go @@ -46,6 +46,9 @@ var _ = Describe("Shutdown", func() { bcAddress string = "localhost" bcPort int = 5052 bcConnectionProtocol string = "http" + bcType string = "lighthouse" + bcBootRetryInterval int = 1 + bcBootMaxRetry int = 5 maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second DB sql.Database BC *beaconclient.BeaconClient @@ -55,7 +58,7 @@ var _ = Describe("Shutdown", func() { ) BeforeEach(func() { ctx = context.Background() - BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true) + BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "head", true) notifierCh = make(chan os.Signal, 1) Expect(err).To(BeNil()) }) diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go index b1a83ea..357eee3 100644 --- a/pkg/beaconclient/beaconclient.go +++ b/pkg/beaconclient/beaconclient.go @@ -31,7 +31,8 @@ var ( bcReorgTopicEndpoint = "/eth/v1/events?topics=chain_reorg" // Endpoint used to subscribe to the head of the chain BcBlockQueryEndpoint = "/eth/v2/beacon/blocks/" // Endpoint to query individual Blocks BcStateQueryEndpoint = "/eth/v2/debug/beacon/states/" // Endpoint to query individual States - BcSyncStatusEndpoint = "/eth/v1/node/syncing" + BcSyncStatusEndpoint = "/eth/v1/node/syncing" // The endpoint to check to see if the beacon server is still trying to sync to head. + LhDbInfoEndpoint = "/lighthouse/database/info" // The endpoint for the LIGHTHOUSE server to get the database information. BcBlockRootEndpoint = func(slot string) string { return "/eth/v1/beacon/blocks/" + slot + "/root" } @@ -40,15 +41,6 @@ var ( //bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain ) -// A structure utilized for keeping track of various metrics. Currently, mostly used in testing. -type BeaconClientMetrics struct { - HeadTrackingInserts uint64 // Number of head events we successfully wrote to the DB. - HeadTrackingReorgs uint64 // Number of reorg events we successfully wrote to the DB. - HeadTrackingKnownGaps uint64 // Number of known_gaps we successfully wrote to the DB. - HeadError uint64 // Number of errors that occurred when decoding the head message. - HeadReorgError uint64 // Number of errors that occurred when decoding the reorg message. -} - // A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying. type BeaconClient struct { Context context.Context // A context generic context with multiple uses. @@ -59,6 +51,7 @@ type BeaconClient struct { KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry. // Used for Head Tracking + PerformHeadTracking bool // Should we track head? StartingSlot int // If we're performing head tracking. What is the first slot we processed. PreviousSlot int // Whats the previous slot we processed @@ -67,6 +60,12 @@ type BeaconClient struct { HeadTracking *SseEvents[Head] // Track the head block ReOrgTracking *SseEvents[ChainReorg] // Track all Reorgs //FinalizationTracking *SseEvents[FinalizedCheckpoint] // Track all finalization checkpoints + + // Used for Historical Processing + + // The latest available slot within the Beacon Server. We can't query any slot greater than this. + // This value is lazily updated. Therefore at times it will be outdated. + LatestSlotInBeaconServer int64 } // A struct to keep track of relevant the head event topic. @@ -94,8 +93,11 @@ func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddres HeadTracking: createSseEvent[Head](endpoint, BcHeadTopicEndpoint), ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint), Metrics: &BeaconClientMetrics{ - HeadTrackingInserts: 0, - HeadTrackingReorgs: 0, + HeadTrackingInserts: 0, + HeadTrackingReorgs: 0, + HeadTrackingKnownGaps: 0, + HeadError: 0, + HeadReorgError: 0, }, //FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint), } diff --git a/pkg/beaconclient/checkbeaconserverstatus.go b/pkg/beaconclient/checkbeaconserverstatus.go new file mode 100644 index 0000000..0d39498 --- /dev/null +++ b/pkg/beaconclient/checkbeaconserverstatus.go @@ -0,0 +1,207 @@ +// VulcanizeDB +// Copyright © 2022 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . +package beaconclient + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "strconv" + "strings" + "sync/atomic" + + log "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" +) + +var ( + MissingBeaconServerType error = fmt.Errorf("The beacon server type provided is not handled.") + LighthouseMissingSlots error = fmt.Errorf("Anchor is not nil. This means lighthouse has not backfilled all the slots from Genesis to head.") +) + +// The sync response when checking if the node is synced. +type Sync struct { + Data SyncData `json:"data"` +} + +// The sync data +type SyncData struct { + IsSync bool `json:"is_syncing"` + HeadSlot string `json:"head_slot"` + SyncDistance string `json:"sync_distance"` +} + +// This function will check to see if we are synced up with the head of chain. +//{"data":{"is_syncing":true,"head_slot":"62528","sync_distance":"3734299"}} +func (bc BeaconClient) CheckHeadSync() (bool, error) { + syncStatus, err := bc.QueryHeadSync() + if err != nil { + return true, nil + } + return syncStatus.Data.IsSync, nil +} + +func (bc BeaconClient) QueryHeadSync() (Sync, error) { + var syncStatus Sync + bcSync := bc.ServerEndpoint + BcSyncStatusEndpoint + resp, err := http.Get(bcSync) + + if err != nil { + loghelper.LogEndpoint(bcSync).Error("Unable to check the sync status") + return syncStatus, err + } + + if resp.StatusCode < 200 || resp.StatusCode > 299 { + loghelper.LogEndpoint(bcSync).WithFields(log.Fields{"returnCode": resp.StatusCode}).Error("Error when getting the sync status") + return syncStatus, fmt.Errorf("Querying the sync status returned a non 2xx status code, code provided: %d", resp.StatusCode) + } + + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return syncStatus, err + } + + if err := json.Unmarshal(body, &syncStatus); err != nil { + loghelper.LogEndpoint(bcSync).WithFields(log.Fields{ + "rawMessage": string(body), + "err": err, + }).Error("Unable to unmarshal sync status") + return syncStatus, err + } + return syncStatus, nil +} + +// The response when checking the lighthouse nodes DB info: /lighthouse/database/info +type LighthouseDatabaseInfo struct { + SchemaVersion int `json:"schema_version"` + Config LhDbConfig `json:"config"` + Split LhDbSplit `json:"split"` + Anchor LhDbAnchor `json:"anchor"` +} + +// The config field within the DatabaseInfo response. +type LhDbConfig struct { + SlotsPerRestorePoint int `json:"slots_per_restore_point"` + SlotsPerRestorePointSetExplicitly bool `json:"slots_per_restore_point_set_explicitly"` + BlockCacheSize int `json:"block_cache_size"` + CompactOnInit bool `json:"compact_on_init"` + CompactOnPrune bool `json:"compact_on_prune"` +} + +// The split field within the DatabaseInfo response. +type LhDbSplit struct { + Slot string `json:"slot"` + StateRoot string `json:"state_root"` +} + +// The anchor field within the DatabaseInfo response. +type LhDbAnchor struct { + AnchorSlot string `json:"anchor_slot"` + OldestBlockSlot string `json:"oldest_block_slot"` + OldestBlockParent string `json:"oldest_block_parent"` + StateUpperLimit string `json:"state_upper_limit"` + StateLowerLimit string `json:"state_lower_limit"` +} + +// This function will notify us what the head slot is. +func (bc BeaconClient) queryHeadSlotInBeaconServer() (int, error) { + syncStatus, err := bc.QueryHeadSync() + if err != nil { + return 0, nil + } + headSlot, err := strconv.Atoi(syncStatus.Data.HeadSlot) + if err != nil { + return 0, nil + } + return headSlot, nil +} + +// return the lighthouse Database Info +func (bc BeaconClient) queryLighthouseDbInfo() (LighthouseDatabaseInfo, error) { + var dbInfo LighthouseDatabaseInfo + + lhDbInfo := bc.ServerEndpoint + LhDbInfoEndpoint + resp, err := http.Get(lhDbInfo) + + if err != nil { + loghelper.LogEndpoint(lhDbInfo).Error("Unable to get the lighthouse database information") + return dbInfo, err + } + + if resp.StatusCode < 200 || resp.StatusCode > 299 { + loghelper.LogEndpoint(lhDbInfo).WithFields(log.Fields{"returnCode": resp.StatusCode}).Error("Error when getting the lighthouse database information") + return dbInfo, fmt.Errorf("Querying the lighthouse database information returned a non 2xx status code, code provided: %d", resp.StatusCode) + } + + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return dbInfo, err + } + + if err := json.Unmarshal(body, &dbInfo); err != nil { + loghelper.LogEndpoint(lhDbInfo).WithFields(log.Fields{ + "rawMessage": string(body), + "err": err, + }).Error("Unable to unmarshal the lighthouse database information") + return dbInfo, err + } + return dbInfo, nil +} + +// This function will tell us what the latest slot is that the beacon server has available. This is important as +// it will ensure us that we have all slots prior to the given slot. +func (bc BeaconClient) GetLatestSlotInBeaconServer(beaconServerType string) (int, error) { + switch strings.ToLower(beaconServerType) { + case "lighthouse": + headSlot, err := bc.queryHeadSlotInBeaconServer() + if err != nil { + return 0, err + } + lhDb, err := bc.queryLighthouseDbInfo() + if err != nil { + return 0, err + } + if lhDb.Anchor == (LhDbAnchor{}) { + //atomic.StoreInt64(&bc.LatestSlotInBeaconServer, int64(headSlot)) + log.WithFields(log.Fields{ + "headSlot": headSlot, + }).Info("Anchor is nil, the lighthouse client has all the nodes from genesis to head.") + return headSlot, nil + } else { + log.WithFields(log.Fields{ + "lhDb.Anchor": lhDb.Anchor, + }).Info(LighthouseMissingSlots.Error()) + log.Info("We will add a feature down the road to wait for anchor to be null, if its needed.") + return 0, LighthouseMissingSlots + } + default: + log.WithFields(log.Fields{"BeaconServerType": beaconServerType}).Error(MissingBeaconServerType.Error()) + return 0, MissingBeaconServerType + } +} + +// A wrapper function for updating the latest slot. +func (bc BeaconClient) UpdateLatestSlotInBeaconServer(headSlot int64) { + curr := atomic.LoadInt64(&bc.LatestSlotInBeaconServer) + log.WithFields(log.Fields{ + "Previous Latest Slot": curr, + "New Latest Slot": headSlot, + }).Debug("Swapping Head Slot") + atomic.SwapInt64(&bc.LatestSlotInBeaconServer, int64(headSlot)) +} diff --git a/pkg/beaconclient/checksyncstatus.go b/pkg/beaconclient/checksyncstatus.go deleted file mode 100644 index 3f88398..0000000 --- a/pkg/beaconclient/checksyncstatus.go +++ /dev/null @@ -1,72 +0,0 @@ -// VulcanizeDB -// Copyright © 2022 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . -package beaconclient - -import ( - "encoding/json" - "fmt" - "io/ioutil" - "net/http" - - log "github.com/sirupsen/logrus" - "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" -) - -// The sync response -type Sync struct { - Data SyncData `json:"data"` -} - -// The sync data -type SyncData struct { - IsSync bool `json:"is_syncing"` - HeadSlot string `json:"head_slot"` - SyncDistance string `json:"sync_distance"` -} - -// This function will check to see if we are synced up with the head of chain. -//{"data":{"is_syncing":true,"head_slot":"62528","sync_distance":"3734299"}} -func (bc BeaconClient) CheckHeadSync() (bool, error) { - bcSync := bc.ServerEndpoint + BcSyncStatusEndpoint - resp, err := http.Get(bcSync) - - if err != nil { - loghelper.LogEndpoint(bcSync).Error("Unable to check the sync status") - return true, err - } - - if resp.StatusCode < 200 || resp.StatusCode > 299 { - loghelper.LogEndpoint(bcSync).WithFields(log.Fields{"returnCode": resp.StatusCode}).Error("Error when getting the sync status") - return true, fmt.Errorf("Querying the sync status returned a non 2xx status code, code provided: %d", resp.StatusCode) - } - - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return true, err - } - - var syncStatus Sync - if err := json.Unmarshal(body, &syncStatus); err != nil { - loghelper.LogEndpoint(bcSync).WithFields(log.Fields{ - "rawMessage": string(body), - "err": err, - }).Error("Unable to unmarshal sync status") - return true, err - } - - return syncStatus.Data.IsSync, nil -} diff --git a/pkg/beaconclient/metrics.go b/pkg/beaconclient/metrics.go index a6a45c4..ecf8da5 100644 --- a/pkg/beaconclient/metrics.go +++ b/pkg/beaconclient/metrics.go @@ -19,6 +19,15 @@ import ( "sync/atomic" ) +// A structure utilized for keeping track of various metrics. Currently, mostly used in testing. +type BeaconClientMetrics struct { + HeadTrackingInserts uint64 // Number of head events we successfully wrote to the DB. + HeadTrackingReorgs uint64 // Number of reorg events we successfully wrote to the DB. + HeadTrackingKnownGaps uint64 // Number of known_gaps we successfully wrote to the DB. + HeadError uint64 // Number of errors that occurred when decoding the head message. + HeadReorgError uint64 // Number of errors that occurred when decoding the reorg message. +} + // Wrapper function to increment inserts. If we want to use mutexes later we can easily update all // occurrences here. func (m *BeaconClientMetrics) IncrementHeadTrackingInserts(inc uint64) {