From 2aff31b2bff9448996df81d1064ff700b3c87b87 Mon Sep 17 00:00:00 2001 From: Abdul Rabbani Date: Thu, 12 May 2022 18:29:40 -0400 Subject: [PATCH] Update test + add logic for checking skipped slots --- internal/boot/boot.go | 2 +- internal/boot/boot_test.go | 8 +-- pkg/beaconclient/beaconclient.go | 5 +- pkg/beaconclient/models.go | 2 +- pkg/beaconclient/processslot.go | 96 +++++++++++++++----------------- pkg/beaconclient/queryserver.go | 44 +++++++++++++++ 6 files changed, 98 insertions(+), 59 deletions(-) diff --git a/internal/boot/boot.go b/internal/boot/boot.go index 88aa051..bafecd4 100644 --- a/internal/boot/boot.go +++ b/internal/boot/boot.go @@ -54,7 +54,7 @@ func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName log.Error("Unable to get the nodes sync status") return BC, DB, err } - if status != false { + if status { log.Error("The node is still syncing..") err = fmt.Errorf("The node is still syncing.") return BC, DB, err diff --git a/internal/boot/boot_test.go b/internal/boot/boot_test.go index 232d447..2c43791 100644 --- a/internal/boot/boot_test.go +++ b/internal/boot/boot_test.go @@ -25,7 +25,7 @@ var _ = Describe("Boot", func() { It("Should connect successfully", func() { _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true) defer db.Close() - Expect(err).To(HaveOccurred()) + Expect(err).ToNot(HaveOccurred()) }) }) Context("When the DB and BC are both up and running, and we check for a synced head", func() { @@ -38,19 +38,19 @@ var _ = Describe("Boot", func() { Context("When the DB is running but not the BC", func() { It("Should not connect successfully", func() { _, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, true) - Expect(err).ToNot(HaveOccurred()) + Expect(err).To(HaveOccurred()) }) }) Context("When the BC is running but not the DB", func() { It("Should not connect successfully", func() { _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true) - Expect(err).ToNot(HaveOccurred()) + Expect(err).To(HaveOccurred()) }) }) Context("When neither the BC or DB are running", func() { It("Should not connect successfully", func() { _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, true) - Expect(err).ToNot(HaveOccurred()) + Expect(err).To(HaveOccurred()) }) }) }) diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go index bef12a3..ff0c0ed 100644 --- a/pkg/beaconclient/beaconclient.go +++ b/pkg/beaconclient/beaconclient.go @@ -17,7 +17,10 @@ var ( BcBlockQueryEndpoint = "/eth/v2/beacon/blocks/" // Endpoint to query individual Blocks BcStateQueryEndpoint = "/eth/v2/debug/beacon/states/" // Endpoint to query individual States BcSyncStatusEndpoint = "/eth/v1/node/syncing" - bcSlotsPerEpoch = 32 // Number of slots in a single Epoch + BcBlockRootEndpoint = func(slot string) string { + return "/eth/v1/beacon/blocks/" + slot + "/root" + } + bcSlotsPerEpoch = 32 // Number of slots in a single Epoch //bcSlotPerHistoricalVector = 8192 // The number of slots in a historic vector. //bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain ) diff --git a/pkg/beaconclient/models.go b/pkg/beaconclient/models.go index 9c6cc68..836655e 100644 --- a/pkg/beaconclient/models.go +++ b/pkg/beaconclient/models.go @@ -42,7 +42,7 @@ type DbSlots struct { Slot string // The slot. BlockRoot string // The block root StateRoot string // The state root - Status string // The status, it can be proposed | forked | missed. + Status string // The status, it can be proposed | forked | skipped. } // A struct to capture whats being written to ethcl.signed_beacon_block table. diff --git a/pkg/beaconclient/processslot.go b/pkg/beaconclient/processslot.go index ead46d1..b275660 100644 --- a/pkg/beaconclient/processslot.go +++ b/pkg/beaconclient/processslot.go @@ -37,7 +37,7 @@ type ProcessSlot struct { StateRoot string // The hex encoded string of the StateRoot. ParentBlockRoot string // The hex encoded string of the parent block. Status string // The status of the block - HeadOrHistoric string // Is this the head or a historic slot. This is critical when trying to analyze errors and missed slots. + HeadOrHistoric string // Is this the head or a historic slot. This is critical when trying to analyze errors and skipped slots. Db sql.Database // The DB object used to write to the DB. Metrics *BeaconClientMetrics // An object to keep track of the beaconclient metrics // BeaconBlock @@ -66,26 +66,31 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot Metrics: metrics, } - // Get the SignedBeaconBlock. - err := ps.getSignedBeaconBlock(serverAddress) + // Get the BeaconState. + err := ps.getBeaconState(serverAddress) if err != nil { writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot") return err } - // Get the BeaconState. - err = ps.getBeaconState(serverAddress) + // Get the SignedBeaconBlock. + err = ps.getSignedBeaconBlock(serverAddress) if err != nil { writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot") return err } + if ps.HeadOrHistoric == "head" && previousSlot == 0 && previousBlockRoot == "" { writeStartUpGaps(db, knownGapsTableIncrement, ps.Slot) } // Get this object ready to write - dw := ps.createWriteObjects() - + blockRootEndpoint := serverAddress + BcBlockRootEndpoint(strconv.Itoa(ps.Slot)) + dw, err := ps.createWriteObjects(blockRootEndpoint) + if err != nil { + writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "blockRoot") + return err + } // Write the object to the DB. err = dw.writeFullSlot() if err != nil { @@ -97,7 +102,7 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot if headOrHistoric != "head" && headOrHistoric != "historic" { return fmt.Errorf("headOrHistoric must be either historic or head!") } - if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" { + if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" && ps.Status != "skipped" { ps.checkPreviousSlot(previousSlot, previousBlockRoot, knownGapsTableIncrement) } return nil @@ -135,7 +140,11 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string) error { } if rc != 200 { - ps.checkMissedSlot() + ps.FullSignedBeaconBlock = &st.SignedBeaconBlock{} + ps.SszSignedBeaconBlock = []byte{} + ps.ParentBlockRoot = "" + ps.Status = "skipped" + return nil } ps.FullSignedBeaconBlock = &st.SignedBeaconBlock{} @@ -199,8 +208,6 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str "currentSlot": ps.FullBeaconState.Slot, }).Error("We skipped a few slots.") writeKnownGaps(ps.Db, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot)-1, fmt.Errorf("Gaps during head processing"), "headGaps") - // Check to see if the slot was skipped. - // Call our known_gaps function. } else if previousBlockRoot != parentRoot { log.WithFields(log.Fields{ "previousBlockRoot": previousBlockRoot, @@ -213,50 +220,37 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str } } -// Add logic for checking a missed Slot -// IF the state is present but block is not, then it was skipped??? -// If the state and block are both absent, then the block might be missing?? -// IF state is absent but block is not, there might be an issue with the LH client. -// Check the previous and following slot? -// Check if head or historic. - -// 1. BeaconBlock is 404. -// 2. check check /lighthouse/database/info to make sure the oldest_block_slot == 0 and anchor == null. This indicates that I don't have any gaps in the DB. -// 3. I query BeaconState for slot X, and get a BeaconState. -// 4. Although for good measure you should also check that the head is at a slot >= X using something like /eth/v1/node/syncing/ or /eth/v1/beacon/headers/head -func (ps *ProcessSlot) checkMissedSlot() { - -} - // Transforms all the raw data into DB models that can be written to the DB. -func (ps *ProcessSlot) createWriteObjects() *DatabaseWriter { +func (ps *ProcessSlot) createWriteObjects(blockRootEndpoint string) (*DatabaseWriter, error) { var ( - stateRoot string - blockRoot string - status string + stateRoot string + blockRoot string + status string + eth1BlockHash string ) - if ps.StateRoot != "" { - stateRoot = ps.StateRoot + if ps.Status == "skipped" { + stateRoot = "" + blockRoot = "" + eth1BlockHash = "" } else { - stateRoot = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.StateRoot) - log.Debug("StateRoot: ", stateRoot) - } + if ps.StateRoot != "" { + stateRoot = ps.StateRoot + } else { + stateRoot = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.StateRoot) + log.Debug("StateRoot: ", stateRoot) + } - // MUST RESOLVE! - if ps.BlockRoot != "" { - blockRoot = ps.BlockRoot - } else { - log.Info("We need to add logic") - // We need to get the state of Slot + 1, then we can run the below. - // WE can query it for each run, or we can leave it blank, and update it. - // I just want to avoid getting the same state twice, especially since the state can get heavy. - // blockRoot = "0x" + hex.EncodeToString(ps.FullBeaconState.GetBlockRoots()[ps.Slot%bcSlotPerHistoricalVector][:]) - // log.Debug("Block Root: ", blockRoot) - // log.Debug("ps.Slott: ", ps.Slot) - // Maybe we can use the helper down the road. - //blockRootRaw, _ := helper.BlockRootAtSlot(ps.FullBeaconState, ps.FullSignedBeaconBlock.Block.Slot) - //blockRoot = string(blockRootRaw) + if ps.BlockRoot != "" { + blockRoot = ps.BlockRoot + } else { + var err error + blockRoot, err = queryBlockRoot(blockRootEndpoint, strconv.Itoa(ps.Slot)) + if err != nil { + return nil, err + } + } + eth1BlockHash = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.Body.Eth1Data.BlockHash) } if ps.Status != "" { @@ -265,11 +259,9 @@ func (ps *ProcessSlot) createWriteObjects() *DatabaseWriter { status = "proposed" } - eth1BlockHash := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.Body.Eth1Data.BlockHash) - dw := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.Metrics) dw.rawSignedBeaconBlock = ps.SszSignedBeaconBlock dw.rawBeaconState = ps.SszBeaconState - return dw + return dw, nil } diff --git a/pkg/beaconclient/queryserver.go b/pkg/beaconclient/queryserver.go index 31168ce..00376d5 100644 --- a/pkg/beaconclient/queryserver.go +++ b/pkg/beaconclient/queryserver.go @@ -3,6 +3,7 @@ package beaconclient import ( + "encoding/json" "fmt" "io/ioutil" "net/http" @@ -35,3 +36,46 @@ func querySsz(endpoint string, slot string) ([]byte, int, error) { } return body, rc, nil } + +// Object to unmarshal the BlockRootResponse +type BlockRootResponse struct { + Data BlockRootMessage `json:"data"` +} + +// Object to unmarshal the BlockRoot Message +type BlockRootMessage struct { + Root string `json:"root"` +} + +// A function to query the blockroot for a given slot. +func queryBlockRoot(endpoint string, slot string) (string, error) { + log.WithFields(log.Fields{"endpoint": endpoint}).Debug("Querying endpoint") + client := &http.Client{} + req, err := http.NewRequest("GET", endpoint, nil) + if err != nil { + loghelper.LogSlotError(slot, err).Error("Unable to create a request!") + return "", fmt.Errorf("Unable to create a request!: %s", err.Error()) + } + req.Header.Set("Accept", "application/json") + response, err := client.Do(req) + if err != nil { + loghelper.LogSlotError(slot, err).Error("Unable to query Beacon Node!") + return "", fmt.Errorf("Unable to query Beacon Node: %s", err.Error()) + } + defer response.Body.Close() + body, err := ioutil.ReadAll(response.Body) + if err != nil { + loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!") + return "", fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error()) + } + + resp := BlockRootResponse{} + if err := json.Unmarshal(body, &resp); err != nil { + loghelper.LogEndpoint(endpoint).WithFields(log.Fields{ + "rawMessage": string(body), + "err": err, + }).Error("Unable to unmarshal the block root") + return "", err + } + return resp.Data.Root, nil +}