Handle Skipped Slots #34
@ -54,7 +54,7 @@ func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName
|
|||||||
log.Error("Unable to get the nodes sync status")
|
log.Error("Unable to get the nodes sync status")
|
||||||
return BC, DB, err
|
return BC, DB, err
|
||||||
}
|
}
|
||||||
if status != false {
|
if status {
|
||||||
log.Error("The node is still syncing..")
|
log.Error("The node is still syncing..")
|
||||||
err = fmt.Errorf("The node is still syncing.")
|
err = fmt.Errorf("The node is still syncing.")
|
||||||
return BC, DB, err
|
return BC, DB, err
|
||||||
|
@ -25,7 +25,7 @@ var _ = Describe("Boot", func() {
|
|||||||
It("Should connect successfully", func() {
|
It("Should connect successfully", func() {
|
||||||
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true)
|
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true)
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
Expect(err).To(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
Context("When the DB and BC are both up and running, and we check for a synced head", func() {
|
Context("When the DB and BC are both up and running, and we check for a synced head", func() {
|
||||||
@ -38,19 +38,19 @@ var _ = Describe("Boot", func() {
|
|||||||
Context("When the DB is running but not the BC", func() {
|
Context("When the DB is running but not the BC", func() {
|
||||||
It("Should not connect successfully", func() {
|
It("Should not connect successfully", func() {
|
||||||
_, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, true)
|
_, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, true)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).To(HaveOccurred())
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
Context("When the BC is running but not the DB", func() {
|
Context("When the BC is running but not the DB", func() {
|
||||||
It("Should not connect successfully", func() {
|
It("Should not connect successfully", func() {
|
||||||
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true)
|
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).To(HaveOccurred())
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
Context("When neither the BC or DB are running", func() {
|
Context("When neither the BC or DB are running", func() {
|
||||||
It("Should not connect successfully", func() {
|
It("Should not connect successfully", func() {
|
||||||
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, true)
|
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, true)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).To(HaveOccurred())
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -17,7 +17,10 @@ var (
|
|||||||
BcBlockQueryEndpoint = "/eth/v2/beacon/blocks/" // Endpoint to query individual Blocks
|
BcBlockQueryEndpoint = "/eth/v2/beacon/blocks/" // Endpoint to query individual Blocks
|
||||||
BcStateQueryEndpoint = "/eth/v2/debug/beacon/states/" // Endpoint to query individual States
|
BcStateQueryEndpoint = "/eth/v2/debug/beacon/states/" // Endpoint to query individual States
|
||||||
BcSyncStatusEndpoint = "/eth/v1/node/syncing"
|
BcSyncStatusEndpoint = "/eth/v1/node/syncing"
|
||||||
bcSlotsPerEpoch = 32 // Number of slots in a single Epoch
|
BcBlockRootEndpoint = func(slot string) string {
|
||||||
|
return "/eth/v1/beacon/blocks/" + slot + "/root"
|
||||||
|
}
|
||||||
|
bcSlotsPerEpoch = 32 // Number of slots in a single Epoch
|
||||||
//bcSlotPerHistoricalVector = 8192 // The number of slots in a historic vector.
|
//bcSlotPerHistoricalVector = 8192 // The number of slots in a historic vector.
|
||||||
//bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain
|
//bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain
|
||||||
)
|
)
|
||||||
|
@ -42,7 +42,7 @@ type DbSlots struct {
|
|||||||
Slot string // The slot.
|
Slot string // The slot.
|
||||||
BlockRoot string // The block root
|
BlockRoot string // The block root
|
||||||
StateRoot string // The state root
|
StateRoot string // The state root
|
||||||
Status string // The status, it can be proposed | forked | missed.
|
Status string // The status, it can be proposed | forked | skipped.
|
||||||
}
|
}
|
||||||
|
|
||||||
// A struct to capture whats being written to ethcl.signed_beacon_block table.
|
// A struct to capture whats being written to ethcl.signed_beacon_block table.
|
||||||
|
@ -37,7 +37,7 @@ type ProcessSlot struct {
|
|||||||
StateRoot string // The hex encoded string of the StateRoot.
|
StateRoot string // The hex encoded string of the StateRoot.
|
||||||
ParentBlockRoot string // The hex encoded string of the parent block.
|
ParentBlockRoot string // The hex encoded string of the parent block.
|
||||||
Status string // The status of the block
|
Status string // The status of the block
|
||||||
HeadOrHistoric string // Is this the head or a historic slot. This is critical when trying to analyze errors and missed slots.
|
HeadOrHistoric string // Is this the head or a historic slot. This is critical when trying to analyze errors and skipped slots.
|
||||||
Db sql.Database // The DB object used to write to the DB.
|
Db sql.Database // The DB object used to write to the DB.
|
||||||
Metrics *BeaconClientMetrics // An object to keep track of the beaconclient metrics
|
Metrics *BeaconClientMetrics // An object to keep track of the beaconclient metrics
|
||||||
// BeaconBlock
|
// BeaconBlock
|
||||||
@ -66,26 +66,31 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
|
|||||||
Metrics: metrics,
|
Metrics: metrics,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the SignedBeaconBlock.
|
// Get the BeaconState.
|
||||||
err := ps.getSignedBeaconBlock(serverAddress)
|
err := ps.getBeaconState(serverAddress)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot")
|
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the BeaconState.
|
// Get the SignedBeaconBlock.
|
||||||
err = ps.getBeaconState(serverAddress)
|
err = ps.getSignedBeaconBlock(serverAddress)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot")
|
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if ps.HeadOrHistoric == "head" && previousSlot == 0 && previousBlockRoot == "" {
|
if ps.HeadOrHistoric == "head" && previousSlot == 0 && previousBlockRoot == "" {
|
||||||
writeStartUpGaps(db, knownGapsTableIncrement, ps.Slot)
|
writeStartUpGaps(db, knownGapsTableIncrement, ps.Slot)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get this object ready to write
|
// Get this object ready to write
|
||||||
dw := ps.createWriteObjects()
|
blockRootEndpoint := serverAddress + BcBlockRootEndpoint(strconv.Itoa(ps.Slot))
|
||||||
|
dw, err := ps.createWriteObjects(blockRootEndpoint)
|
||||||
|
if err != nil {
|
||||||
|
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "blockRoot")
|
||||||
|
return err
|
||||||
|
}
|
||||||
// Write the object to the DB.
|
// Write the object to the DB.
|
||||||
err = dw.writeFullSlot()
|
err = dw.writeFullSlot()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -97,7 +102,7 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
|
|||||||
if headOrHistoric != "head" && headOrHistoric != "historic" {
|
if headOrHistoric != "head" && headOrHistoric != "historic" {
|
||||||
return fmt.Errorf("headOrHistoric must be either historic or head!")
|
return fmt.Errorf("headOrHistoric must be either historic or head!")
|
||||||
}
|
}
|
||||||
if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" {
|
if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" && ps.Status != "skipped" {
|
||||||
ps.checkPreviousSlot(previousSlot, previousBlockRoot, knownGapsTableIncrement)
|
ps.checkPreviousSlot(previousSlot, previousBlockRoot, knownGapsTableIncrement)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -135,7 +140,11 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if rc != 200 {
|
if rc != 200 {
|
||||||
ps.checkMissedSlot()
|
ps.FullSignedBeaconBlock = &st.SignedBeaconBlock{}
|
||||||
|
ps.SszSignedBeaconBlock = []byte{}
|
||||||
|
ps.ParentBlockRoot = ""
|
||||||
|
ps.Status = "skipped"
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
ps.FullSignedBeaconBlock = &st.SignedBeaconBlock{}
|
ps.FullSignedBeaconBlock = &st.SignedBeaconBlock{}
|
||||||
@ -199,8 +208,6 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
|
|||||||
"currentSlot": ps.FullBeaconState.Slot,
|
"currentSlot": ps.FullBeaconState.Slot,
|
||||||
}).Error("We skipped a few slots.")
|
}).Error("We skipped a few slots.")
|
||||||
writeKnownGaps(ps.Db, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot)-1, fmt.Errorf("Gaps during head processing"), "headGaps")
|
writeKnownGaps(ps.Db, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot)-1, fmt.Errorf("Gaps during head processing"), "headGaps")
|
||||||
// Check to see if the slot was skipped.
|
|
||||||
// Call our known_gaps function.
|
|
||||||
} else if previousBlockRoot != parentRoot {
|
} else if previousBlockRoot != parentRoot {
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"previousBlockRoot": previousBlockRoot,
|
"previousBlockRoot": previousBlockRoot,
|
||||||
@ -213,50 +220,37 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add logic for checking a missed Slot
|
|
||||||
// IF the state is present but block is not, then it was skipped???
|
|
||||||
// If the state and block are both absent, then the block might be missing??
|
|
||||||
// IF state is absent but block is not, there might be an issue with the LH client.
|
|
||||||
// Check the previous and following slot?
|
|
||||||
// Check if head or historic.
|
|
||||||
|
|
||||||
// 1. BeaconBlock is 404.
|
|
||||||
// 2. check check /lighthouse/database/info to make sure the oldest_block_slot == 0 and anchor == null. This indicates that I don't have any gaps in the DB.
|
|
||||||
// 3. I query BeaconState for slot X, and get a BeaconState.
|
|
||||||
// 4. Although for good measure you should also check that the head is at a slot >= X using something like /eth/v1/node/syncing/ or /eth/v1/beacon/headers/head
|
|
||||||
func (ps *ProcessSlot) checkMissedSlot() {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// Transforms all the raw data into DB models that can be written to the DB.
|
// Transforms all the raw data into DB models that can be written to the DB.
|
||||||
func (ps *ProcessSlot) createWriteObjects() *DatabaseWriter {
|
func (ps *ProcessSlot) createWriteObjects(blockRootEndpoint string) (*DatabaseWriter, error) {
|
||||||
var (
|
var (
|
||||||
stateRoot string
|
stateRoot string
|
||||||
blockRoot string
|
blockRoot string
|
||||||
status string
|
status string
|
||||||
|
eth1BlockHash string
|
||||||
)
|
)
|
||||||
|
|
||||||
if ps.StateRoot != "" {
|
if ps.Status == "skipped" {
|
||||||
stateRoot = ps.StateRoot
|
stateRoot = ""
|
||||||
|
blockRoot = ""
|
||||||
|
eth1BlockHash = ""
|
||||||
} else {
|
} else {
|
||||||
stateRoot = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.StateRoot)
|
if ps.StateRoot != "" {
|
||||||
log.Debug("StateRoot: ", stateRoot)
|
stateRoot = ps.StateRoot
|
||||||
}
|
} else {
|
||||||
|
stateRoot = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.StateRoot)
|
||||||
|
log.Debug("StateRoot: ", stateRoot)
|
||||||
|
}
|
||||||
|
|
||||||
// MUST RESOLVE!
|
if ps.BlockRoot != "" {
|
||||||
if ps.BlockRoot != "" {
|
blockRoot = ps.BlockRoot
|
||||||
blockRoot = ps.BlockRoot
|
} else {
|
||||||
} else {
|
var err error
|
||||||
log.Info("We need to add logic")
|
blockRoot, err = queryBlockRoot(blockRootEndpoint, strconv.Itoa(ps.Slot))
|
||||||
// We need to get the state of Slot + 1, then we can run the below.
|
if err != nil {
|
||||||
// WE can query it for each run, or we can leave it blank, and update it.
|
return nil, err
|
||||||
// I just want to avoid getting the same state twice, especially since the state can get heavy.
|
}
|
||||||
// blockRoot = "0x" + hex.EncodeToString(ps.FullBeaconState.GetBlockRoots()[ps.Slot%bcSlotPerHistoricalVector][:])
|
}
|
||||||
// log.Debug("Block Root: ", blockRoot)
|
eth1BlockHash = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.Body.Eth1Data.BlockHash)
|
||||||
// log.Debug("ps.Slott: ", ps.Slot)
|
|
||||||
// Maybe we can use the helper down the road.
|
|
||||||
//blockRootRaw, _ := helper.BlockRootAtSlot(ps.FullBeaconState, ps.FullSignedBeaconBlock.Block.Slot)
|
|
||||||
//blockRoot = string(blockRootRaw)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if ps.Status != "" {
|
if ps.Status != "" {
|
||||||
@ -265,11 +259,9 @@ func (ps *ProcessSlot) createWriteObjects() *DatabaseWriter {
|
|||||||
status = "proposed"
|
status = "proposed"
|
||||||
}
|
}
|
||||||
|
|
||||||
eth1BlockHash := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.Body.Eth1Data.BlockHash)
|
|
||||||
|
|
||||||
dw := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.Metrics)
|
dw := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.Metrics)
|
||||||
dw.rawSignedBeaconBlock = ps.SszSignedBeaconBlock
|
dw.rawSignedBeaconBlock = ps.SszSignedBeaconBlock
|
||||||
dw.rawBeaconState = ps.SszBeaconState
|
dw.rawBeaconState = ps.SszBeaconState
|
||||||
|
|
||||||
return dw
|
return dw, nil
|
||||||
}
|
}
|
||||||
|
@ -3,6 +3,7 @@
|
|||||||
package beaconclient
|
package beaconclient
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
@ -35,3 +36,46 @@ func querySsz(endpoint string, slot string) ([]byte, int, error) {
|
|||||||
}
|
}
|
||||||
return body, rc, nil
|
return body, rc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Object to unmarshal the BlockRootResponse
|
||||||
|
type BlockRootResponse struct {
|
||||||
|
Data BlockRootMessage `json:"data"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Object to unmarshal the BlockRoot Message
|
||||||
|
type BlockRootMessage struct {
|
||||||
|
Root string `json:"root"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// A function to query the blockroot for a given slot.
|
||||||
|
func queryBlockRoot(endpoint string, slot string) (string, error) {
|
||||||
|
log.WithFields(log.Fields{"endpoint": endpoint}).Debug("Querying endpoint")
|
||||||
|
client := &http.Client{}
|
||||||
|
req, err := http.NewRequest("GET", endpoint, nil)
|
||||||
|
if err != nil {
|
||||||
|
loghelper.LogSlotError(slot, err).Error("Unable to create a request!")
|
||||||
|
return "", fmt.Errorf("Unable to create a request!: %s", err.Error())
|
||||||
|
}
|
||||||
|
req.Header.Set("Accept", "application/json")
|
||||||
|
response, err := client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
loghelper.LogSlotError(slot, err).Error("Unable to query Beacon Node!")
|
||||||
|
return "", fmt.Errorf("Unable to query Beacon Node: %s", err.Error())
|
||||||
|
}
|
||||||
|
defer response.Body.Close()
|
||||||
|
body, err := ioutil.ReadAll(response.Body)
|
||||||
|
if err != nil {
|
||||||
|
loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!")
|
||||||
|
return "", fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := BlockRootResponse{}
|
||||||
|
if err := json.Unmarshal(body, &resp); err != nil {
|
||||||
|
loghelper.LogEndpoint(endpoint).WithFields(log.Fields{
|
||||||
|
"rawMessage": string(body),
|
||||||
|
"err": err,
|
||||||
|
}).Error("Unable to unmarshal the block root")
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return resp.Data.Root, nil
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user