From 7ff3efb38076b1ce0214116fa7d311ef9b7cd376 Mon Sep 17 00:00:00 2001 From: Abdul Rabbani <58230246+abdulrabbani00@users.noreply.github.com> Date: Fri, 13 May 2022 08:48:31 -0400 Subject: [PATCH] Handle Skipped Slots (#34) * Ensure that the node is synced at boot time * Update test + add logic for checking skipped slots * Update boot check * Add skip_sync to config. * Update a test so it fails --- .github/workflows/on-pr.yml | 3 +- README.md | 6 +- cmd/boot.go | 2 +- cmd/capture.go | 18 +++++- cmd/head.go | 8 ++- entrypoint.sh | 6 +- internal/boot/boot.go | 27 ++++++-- internal/boot/boot_test.go | 25 +++++--- internal/shutdown/shutdown_test.go | 2 +- pkg/beaconclient/beaconclient.go | 6 +- pkg/beaconclient/checksyncstatus.go | 57 +++++++++++++++++ pkg/beaconclient/healthcheck.go | 4 +- pkg/beaconclient/models.go | 2 +- pkg/beaconclient/processslot.go | 96 +++++++++++++---------------- pkg/beaconclient/queryserver.go | 44 +++++++++++++ 15 files changed, 226 insertions(+), 80 deletions(-) create mode 100644 pkg/beaconclient/checksyncstatus.go diff --git a/.github/workflows/on-pr.yml b/.github/workflows/on-pr.yml index 75a38d7..ef69e42 100644 --- a/.github/workflows/on-pr.yml +++ b/.github/workflows/on-pr.yml @@ -25,7 +25,7 @@ on: - "**" env: - stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || '21d076268730e3f25fcec6371c1aca1bf48040d8'}} + stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || 'feature/client-build'}} ipld-eth-db-ref: ${{ github.event.inputs.ipld-eth-db-ref || 'minimal-beacon-chain-schema' }} ssz-data-ref: ${{ github.event.inputs.ssz-data-ref || 'main' }} GOPATH: /tmp/go @@ -57,6 +57,7 @@ jobs: echo vulcanize_ipld_eth_db=$GITHUB_WORKSPACE/ipld-eth-db/ > ./config.sh echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh echo ethcl_capture_mode=boot >> ./config.sh + echo ethcl_skip_sync=true >> ./config.sh cat ./config.sh - name: Run docker compose diff --git a/README.md b/README.md index 52b433b..cd268ca 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,7 @@ To run the application, do as follows: 2. Run the start up command. ``` -go run main.go capture head --db.address localhost \ +go run -race main.go capture head --db.address localhost \ --db.password password \ --db.port 8077 \ --db.username vdbm \ @@ -46,8 +46,10 @@ go run main.go capture head --db.address localhost \ --bc.address localhost \ --bc.port 5052 \ --bc.connectionProtocol http \ + --t.skipSync=true \ --log.level info \ - --log.output=true + --log.output=true \ + --kg.increment 100 ``` ## Running Tests diff --git a/cmd/boot.go b/cmd/boot.go index 8a0ffa1..91a958c 100644 --- a/cmd/boot.go +++ b/cmd/boot.go @@ -44,7 +44,7 @@ func bootApp() { log.Info("Starting the application in boot mode.") ctx := context.Background() - BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol) + BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, testDisregardSync) if err != nil { loghelper.LogError(err).Error("Unable to Start application") } diff --git a/cmd/capture.go b/cmd/capture.go index 63ddbdc..c2137db 100644 --- a/cmd/capture.go +++ b/cmd/capture.go @@ -22,8 +22,10 @@ var ( bcAddress string bcPort int bcConnectionProtocol string + bcType string maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second notifierCh chan os.Signal = make(chan os.Signal, 1) + testDisregardSync bool ) // captureCmd represents the capture command @@ -62,14 +64,18 @@ func init() { exitErr(err) //// Beacon Client Specific - captureCmd.PersistentFlags().StringVarP(&bcAddress, "bc.address", "l", "", "Address to connect to beacon node (required if username is set)") - captureCmd.PersistentFlags().IntVarP(&bcPort, "bc.port", "r", 0, "Port to connect to beacon node (required if username is set)") + captureCmd.PersistentFlags().StringVarP(&bcAddress, "bc.address", "l", "", "Address to connect to beacon node (required)") + captureCmd.PersistentFlags().StringVarP(&bcType, "bc.type", "", "lighthouse", "The beacon client we are using, options are prysm and lighthouse.") + captureCmd.PersistentFlags().IntVarP(&bcPort, "bc.port", "r", 0, "Port to connect to beacon node (required )") captureCmd.PersistentFlags().StringVarP(&bcConnectionProtocol, "bc.connectionProtocol", "", "http", "protocol for connecting to the beacon node.") err = captureCmd.MarkPersistentFlagRequired("bc.address") exitErr(err) err = captureCmd.MarkPersistentFlagRequired("bc.port") exitErr(err) + //// Testing Specific + captureCmd.PersistentFlags().BoolVar(&testDisregardSync, "t.skipSync", false, "Should we disregard the head sync?") + // Bind Flags with Viper //// DB Flags err = viper.BindPFlag("db.username", captureCmd.PersistentFlags().Lookup("db.username")) @@ -82,12 +88,18 @@ func init() { exitErr(err) err = viper.BindPFlag("db.name", captureCmd.PersistentFlags().Lookup("db.name")) exitErr(err) - err = viper.BindPFlag("db.driver", captureCmd.PersistentFlags().Lookup("db.driver")) + err = viper.BindPFlag("t.skipSync", captureCmd.PersistentFlags().Lookup("t.skipSync")) + exitErr(err) + + // Testing Specific + err = viper.BindPFlag("t.driver", captureCmd.PersistentFlags().Lookup("db.driver")) exitErr(err) // LH specific err = viper.BindPFlag("bc.address", captureCmd.PersistentFlags().Lookup("bc.address")) exitErr(err) + err = viper.BindPFlag("bc.type", captureCmd.PersistentFlags().Lookup("bc.type")) + exitErr(err) err = viper.BindPFlag("bc.port", captureCmd.PersistentFlags().Lookup("bc.port")) exitErr(err) err = viper.BindPFlag("bc.connectionProtocol", captureCmd.PersistentFlags().Lookup("bc.connectionProtocol")) diff --git a/cmd/head.go b/cmd/head.go index 0058f8d..54c5d7e 100644 --- a/cmd/head.go +++ b/cmd/head.go @@ -6,6 +6,7 @@ package cmd import ( "context" + "os" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -35,11 +36,16 @@ func startHeadTracking() { log.Info("Starting the application in head tracking mode.") ctx := context.Background() - BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol) + BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, testDisregardSync) if err != nil { loghelper.LogError(err).Error("Unable to Start application") + if DB != nil { + DB.Close() + } + os.Exit(1) } + log.Info("The Beacon Client has booted successfully!") // Capture head blocks go BC.CaptureHead(kgTableIncrement) diff --git a/entrypoint.sh b/entrypoint.sh index c9923e3..20f872f 100755 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -11,7 +11,8 @@ echo /root/ipld-ethcl-indexer capture ${CAPTURE_MODE} --db.address $DB_ADDRESS \ --db.driver $DB_DRIVER \ --bc.address $BC_ADDRESS \ --bc.port $BC_PORT \ - --log.level $LOG_LEVEL + --log.level $LOG_LEVEL\ + --t.skipSync=$SKIP_SYNC /root/ipld-ethcl-indexer capture ${CAPTURE_MODE} --db.address $DB_ADDRESS \ --db.password $DB_PASSWORD \ @@ -21,7 +22,8 @@ echo /root/ipld-ethcl-indexer capture ${CAPTURE_MODE} --db.address $DB_ADDRESS \ --db.driver $DB_DRIVER \ --bc.address $BC_ADDRESS \ --bc.port $BC_PORT \ - --log.level $LOG_LEVEL + --log.level $LOG_LEVEL \ + --t.skipSync=$SKIP_SYNC rv=$? diff --git a/internal/boot/boot.go b/internal/boot/boot.go index 48838ee..bafecd4 100644 --- a/internal/boot/boot.go +++ b/internal/boot/boot.go @@ -2,6 +2,7 @@ package boot import ( "context" + "fmt" "time" log "github.com/sirupsen/logrus" @@ -25,7 +26,8 @@ var ( // // 2. Connect to the database. // -func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string) (*beaconclient.BeaconClient, sql.Database, error) { +// 3. Make sure the node is synced, unless disregardSync is true. +func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) { log.Info("Booting the Application") log.Debug("Creating the Beacon Client") @@ -38,23 +40,40 @@ func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName } log.Debug("Setting up DB connection") - DB, err := postgres.SetupPostgresDb(dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName) + DB, err = postgres.SetupPostgresDb(dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName) if err != nil { return nil, nil, err } BC.Db = DB + + var status bool + if !disregardSync { + status, err = BC.CheckHeadSync() + if err != nil { + log.Error("Unable to get the nodes sync status") + return BC, DB, err + } + if status { + log.Error("The node is still syncing..") + err = fmt.Errorf("The node is still syncing.") + return BC, DB, err + } + } else { + log.Warn("We are not checking to see if the node has synced to head.") + } return BC, DB, nil } // Add retry logic to ensure that we are give the Beacon Client and the DB time to start. -func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string) (*beaconclient.BeaconClient, sql.Database, error) { +func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) { var err error for i := 0; i < maxRetry; i++ { - BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort, bcConnectionProtocol) + BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort, bcConnectionProtocol, disregardSync) if err != nil { log.WithFields(log.Fields{ "retryNumber": i, + "err": err, }).Warn("Unable to boot application. Going to try again") time.Sleep(time.Duration(retryInterval) * time.Second) continue diff --git a/internal/boot/boot_test.go b/internal/boot/boot_test.go index 3cbf71c..ca83653 100644 --- a/internal/boot/boot_test.go +++ b/internal/boot/boot_test.go @@ -21,29 +21,36 @@ var _ = Describe("Boot", func() { bcConnectionProtocol string = "http" ) Describe("Booting the application", Label("integration"), func() { - Context("When the DB and BC are both up and running", func() { + Context("When the DB and BC are both up and running, and we skip checking for a synced head", func() { It("Should connect successfully", func() { - _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol) + _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true) defer db.Close() - Expect(err).To(BeNil()) + Expect(err).ToNot(HaveOccurred()) + }) + }) + Context("When the DB and BC are both up and running, and we check for a synced head", func() { + It("Should not connect successfully", func() { + _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, false) + defer db.Close() + Expect(err).To(HaveOccurred()) }) }) Context("When the DB is running but not the BC", func() { It("Should not connect successfully", func() { - _, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol) - Expect(err).ToNot(BeNil()) + _, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, true) + Expect(err).To(HaveOccurred()) }) }) Context("When the BC is running but not the DB", func() { It("Should not connect successfully", func() { - _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol) - Expect(err).ToNot(BeNil()) + _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true) + Expect(err).To(HaveOccurred()) }) }) Context("When neither the BC or DB are running", func() { It("Should not connect successfully", func() { - _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol) - Expect(err).ToNot(BeNil()) + _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, true) + Expect(err).To(HaveOccurred()) }) }) }) diff --git a/internal/shutdown/shutdown_test.go b/internal/shutdown/shutdown_test.go index c978ac4..bfb430c 100644 --- a/internal/shutdown/shutdown_test.go +++ b/internal/shutdown/shutdown_test.go @@ -40,7 +40,7 @@ var _ = Describe("Shutdown", func() { ) BeforeEach(func() { ctx = context.Background() - BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol) + BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true) notifierCh = make(chan os.Signal, 1) Expect(err).To(BeNil()) }) diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go index a0d70c4..ff0c0ed 100644 --- a/pkg/beaconclient/beaconclient.go +++ b/pkg/beaconclient/beaconclient.go @@ -16,7 +16,11 @@ var ( bcReorgTopicEndpoint = "/eth/v1/events?topics=chain_reorg" // Endpoint used to subscribe to the head of the chain BcBlockQueryEndpoint = "/eth/v2/beacon/blocks/" // Endpoint to query individual Blocks BcStateQueryEndpoint = "/eth/v2/debug/beacon/states/" // Endpoint to query individual States - bcSlotsPerEpoch = 32 // Number of slots in a single Epoch + BcSyncStatusEndpoint = "/eth/v1/node/syncing" + BcBlockRootEndpoint = func(slot string) string { + return "/eth/v1/beacon/blocks/" + slot + "/root" + } + bcSlotsPerEpoch = 32 // Number of slots in a single Epoch //bcSlotPerHistoricalVector = 8192 // The number of slots in a historic vector. //bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain ) diff --git a/pkg/beaconclient/checksyncstatus.go b/pkg/beaconclient/checksyncstatus.go new file mode 100644 index 0000000..25c0398 --- /dev/null +++ b/pkg/beaconclient/checksyncstatus.go @@ -0,0 +1,57 @@ +package beaconclient + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + + log "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" +) + +// The sync response +type Sync struct { + Data SyncData `json:"data"` +} + +// The sync data +type SyncData struct { + IsSync bool `json:"is_syncing"` + HeadSlot string `json:"head_slot"` + SyncDistance string `json:"sync_distance"` +} + +// This function will check to see if we are synced up with the head of chain. +//{"data":{"is_syncing":true,"head_slot":"62528","sync_distance":"3734299"}} +func (bc BeaconClient) CheckHeadSync() (bool, error) { + bcSync := bc.ServerEndpoint + BcSyncStatusEndpoint + resp, err := http.Get(bcSync) + + if err != nil { + loghelper.LogEndpoint(bcSync).Error("Unable to check the sync status") + return true, err + } + + if resp.StatusCode < 200 || resp.StatusCode > 299 { + loghelper.LogEndpoint(bcSync).WithFields(log.Fields{"returnCode": resp.StatusCode}).Error("Error when getting the sync status") + return true, fmt.Errorf("Querying the sync status returned a non 2xx status code, code provided: %d", resp.StatusCode) + } + + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return true, err + } + + var syncStatus Sync + if err := json.Unmarshal(body, &syncStatus); err != nil { + loghelper.LogEndpoint(bcSync).WithFields(log.Fields{ + "rawMessage": string(body), + "err": err, + }).Error("Unable to unmarshal sync status") + return true, err + } + + return syncStatus.Data.IsSync, nil +} diff --git a/pkg/beaconclient/healthcheck.go b/pkg/beaconclient/healthcheck.go index c7f3fcd..27638fc 100644 --- a/pkg/beaconclient/healthcheck.go +++ b/pkg/beaconclient/healthcheck.go @@ -22,8 +22,8 @@ func (bc BeaconClient) CheckBeaconClient() error { } if resp.StatusCode < 200 || resp.StatusCode > 299 { - log.Error("We recieved a non 2xx status code when checking the health of the beacon node.") - log.Error("Health Endpoint Status Code: ", resp.StatusCode) + loghelper.LogEndpoint(bcEndpoint).Error("We recieved a non 2xx status code when checking the health of the beacon node.") + loghelper.LogEndpoint(bcEndpoint).Error("Health Endpoint Status Code: ", resp.StatusCode) return fmt.Errorf("beacon Node Provided a non 2xx status code, code provided: %d", resp.StatusCode) } diff --git a/pkg/beaconclient/models.go b/pkg/beaconclient/models.go index 9c6cc68..836655e 100644 --- a/pkg/beaconclient/models.go +++ b/pkg/beaconclient/models.go @@ -42,7 +42,7 @@ type DbSlots struct { Slot string // The slot. BlockRoot string // The block root StateRoot string // The state root - Status string // The status, it can be proposed | forked | missed. + Status string // The status, it can be proposed | forked | skipped. } // A struct to capture whats being written to ethcl.signed_beacon_block table. diff --git a/pkg/beaconclient/processslot.go b/pkg/beaconclient/processslot.go index ead46d1..b275660 100644 --- a/pkg/beaconclient/processslot.go +++ b/pkg/beaconclient/processslot.go @@ -37,7 +37,7 @@ type ProcessSlot struct { StateRoot string // The hex encoded string of the StateRoot. ParentBlockRoot string // The hex encoded string of the parent block. Status string // The status of the block - HeadOrHistoric string // Is this the head or a historic slot. This is critical when trying to analyze errors and missed slots. + HeadOrHistoric string // Is this the head or a historic slot. This is critical when trying to analyze errors and skipped slots. Db sql.Database // The DB object used to write to the DB. Metrics *BeaconClientMetrics // An object to keep track of the beaconclient metrics // BeaconBlock @@ -66,26 +66,31 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot Metrics: metrics, } - // Get the SignedBeaconBlock. - err := ps.getSignedBeaconBlock(serverAddress) + // Get the BeaconState. + err := ps.getBeaconState(serverAddress) if err != nil { writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot") return err } - // Get the BeaconState. - err = ps.getBeaconState(serverAddress) + // Get the SignedBeaconBlock. + err = ps.getSignedBeaconBlock(serverAddress) if err != nil { writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot") return err } + if ps.HeadOrHistoric == "head" && previousSlot == 0 && previousBlockRoot == "" { writeStartUpGaps(db, knownGapsTableIncrement, ps.Slot) } // Get this object ready to write - dw := ps.createWriteObjects() - + blockRootEndpoint := serverAddress + BcBlockRootEndpoint(strconv.Itoa(ps.Slot)) + dw, err := ps.createWriteObjects(blockRootEndpoint) + if err != nil { + writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "blockRoot") + return err + } // Write the object to the DB. err = dw.writeFullSlot() if err != nil { @@ -97,7 +102,7 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot if headOrHistoric != "head" && headOrHistoric != "historic" { return fmt.Errorf("headOrHistoric must be either historic or head!") } - if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" { + if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" && ps.Status != "skipped" { ps.checkPreviousSlot(previousSlot, previousBlockRoot, knownGapsTableIncrement) } return nil @@ -135,7 +140,11 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string) error { } if rc != 200 { - ps.checkMissedSlot() + ps.FullSignedBeaconBlock = &st.SignedBeaconBlock{} + ps.SszSignedBeaconBlock = []byte{} + ps.ParentBlockRoot = "" + ps.Status = "skipped" + return nil } ps.FullSignedBeaconBlock = &st.SignedBeaconBlock{} @@ -199,8 +208,6 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str "currentSlot": ps.FullBeaconState.Slot, }).Error("We skipped a few slots.") writeKnownGaps(ps.Db, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot)-1, fmt.Errorf("Gaps during head processing"), "headGaps") - // Check to see if the slot was skipped. - // Call our known_gaps function. } else if previousBlockRoot != parentRoot { log.WithFields(log.Fields{ "previousBlockRoot": previousBlockRoot, @@ -213,50 +220,37 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str } } -// Add logic for checking a missed Slot -// IF the state is present but block is not, then it was skipped??? -// If the state and block are both absent, then the block might be missing?? -// IF state is absent but block is not, there might be an issue with the LH client. -// Check the previous and following slot? -// Check if head or historic. - -// 1. BeaconBlock is 404. -// 2. check check /lighthouse/database/info to make sure the oldest_block_slot == 0 and anchor == null. This indicates that I don't have any gaps in the DB. -// 3. I query BeaconState for slot X, and get a BeaconState. -// 4. Although for good measure you should also check that the head is at a slot >= X using something like /eth/v1/node/syncing/ or /eth/v1/beacon/headers/head -func (ps *ProcessSlot) checkMissedSlot() { - -} - // Transforms all the raw data into DB models that can be written to the DB. -func (ps *ProcessSlot) createWriteObjects() *DatabaseWriter { +func (ps *ProcessSlot) createWriteObjects(blockRootEndpoint string) (*DatabaseWriter, error) { var ( - stateRoot string - blockRoot string - status string + stateRoot string + blockRoot string + status string + eth1BlockHash string ) - if ps.StateRoot != "" { - stateRoot = ps.StateRoot + if ps.Status == "skipped" { + stateRoot = "" + blockRoot = "" + eth1BlockHash = "" } else { - stateRoot = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.StateRoot) - log.Debug("StateRoot: ", stateRoot) - } + if ps.StateRoot != "" { + stateRoot = ps.StateRoot + } else { + stateRoot = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.StateRoot) + log.Debug("StateRoot: ", stateRoot) + } - // MUST RESOLVE! - if ps.BlockRoot != "" { - blockRoot = ps.BlockRoot - } else { - log.Info("We need to add logic") - // We need to get the state of Slot + 1, then we can run the below. - // WE can query it for each run, or we can leave it blank, and update it. - // I just want to avoid getting the same state twice, especially since the state can get heavy. - // blockRoot = "0x" + hex.EncodeToString(ps.FullBeaconState.GetBlockRoots()[ps.Slot%bcSlotPerHistoricalVector][:]) - // log.Debug("Block Root: ", blockRoot) - // log.Debug("ps.Slott: ", ps.Slot) - // Maybe we can use the helper down the road. - //blockRootRaw, _ := helper.BlockRootAtSlot(ps.FullBeaconState, ps.FullSignedBeaconBlock.Block.Slot) - //blockRoot = string(blockRootRaw) + if ps.BlockRoot != "" { + blockRoot = ps.BlockRoot + } else { + var err error + blockRoot, err = queryBlockRoot(blockRootEndpoint, strconv.Itoa(ps.Slot)) + if err != nil { + return nil, err + } + } + eth1BlockHash = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.Body.Eth1Data.BlockHash) } if ps.Status != "" { @@ -265,11 +259,9 @@ func (ps *ProcessSlot) createWriteObjects() *DatabaseWriter { status = "proposed" } - eth1BlockHash := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.Body.Eth1Data.BlockHash) - dw := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.Metrics) dw.rawSignedBeaconBlock = ps.SszSignedBeaconBlock dw.rawBeaconState = ps.SszBeaconState - return dw + return dw, nil } diff --git a/pkg/beaconclient/queryserver.go b/pkg/beaconclient/queryserver.go index 31168ce..00376d5 100644 --- a/pkg/beaconclient/queryserver.go +++ b/pkg/beaconclient/queryserver.go @@ -3,6 +3,7 @@ package beaconclient import ( + "encoding/json" "fmt" "io/ioutil" "net/http" @@ -35,3 +36,46 @@ func querySsz(endpoint string, slot string) ([]byte, int, error) { } return body, rc, nil } + +// Object to unmarshal the BlockRootResponse +type BlockRootResponse struct { + Data BlockRootMessage `json:"data"` +} + +// Object to unmarshal the BlockRoot Message +type BlockRootMessage struct { + Root string `json:"root"` +} + +// A function to query the blockroot for a given slot. +func queryBlockRoot(endpoint string, slot string) (string, error) { + log.WithFields(log.Fields{"endpoint": endpoint}).Debug("Querying endpoint") + client := &http.Client{} + req, err := http.NewRequest("GET", endpoint, nil) + if err != nil { + loghelper.LogSlotError(slot, err).Error("Unable to create a request!") + return "", fmt.Errorf("Unable to create a request!: %s", err.Error()) + } + req.Header.Set("Accept", "application/json") + response, err := client.Do(req) + if err != nil { + loghelper.LogSlotError(slot, err).Error("Unable to query Beacon Node!") + return "", fmt.Errorf("Unable to query Beacon Node: %s", err.Error()) + } + defer response.Body.Close() + body, err := ioutil.ReadAll(response.Body) + if err != nil { + loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!") + return "", fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error()) + } + + resp := BlockRootResponse{} + if err := json.Unmarshal(body, &resp); err != nil { + loghelper.LogEndpoint(endpoint).WithFields(log.Fields{ + "rawMessage": string(body), + "err": err, + }).Error("Unable to unmarshal the block root") + return "", err + } + return resp.Data.Root, nil +}