Capture the head block in the DB entirely. #27

Merged
abdulrabbani00 merged 13 commits from feature/20-write-sse-events-to-the-db into develop 2022-05-06 15:03:16 +00:00
7 changed files with 243 additions and 81 deletions
Showing only changes of commit 33fefda124 - Show all commits

View File

@ -15,6 +15,7 @@ var (
bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain
bcBlockQueryEndpoint = "/eth/v2/beacon/blocks/" // Endpoint to query individual Blocks
bcStateQueryEndpoint = "/eth/v2/debug/beacon/states/" // Endpoint to query individual States
bcSlotsPerEpoch = 32 // Number of slots in a single Epoch
)
// A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying.

View File

@ -5,7 +5,6 @@ package beaconclient
import (
"time"
"github.com/ferranbt/fastssz/spectests"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
@ -21,34 +20,34 @@ func (bc *BeaconClient) CaptureHead() {
}
// A temporary helper function to see the output of beacon block and states.
func (bc *BeaconClient) tempHelper() {
slot := "3200"
blockEndpoint := bc.ServerEndpoint + bcBlockQueryEndpoint + slot
stateEndpoint := bc.ServerEndpoint + bcStateQueryEndpoint + slot
// Query
log.Info("Get")
blockSsz, _ := querySsz(blockEndpoint, slot)
stateSsz, _ := querySsz(stateEndpoint, slot)
// Transform
log.Info("Tranform")
stateObj := new(spectests.BeaconState)
err := stateObj.UnmarshalSSZ(stateSsz)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!")
}
blockObj := new(spectests.SignedBeaconBlock)
err = blockObj.UnmarshalSSZ(blockSsz)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!")
}
// Check
log.Info("Check")
log.Info("State Slot: ", stateObj.Slot)
log.Info("Block Slot: ", blockObj.Block.Slot)
}
//func (bc *BeaconClient) tempHelper() {
// slot := "3200"
// blockEndpoint := bc.ServerEndpoint + bcBlockQueryEndpoint + slot
// stateEndpoint := bc.ServerEndpoint + bcStateQueryEndpoint + slot
// // Query
// log.Info("Get")
// blockSsz, _ := querySsz(blockEndpoint, slot)
// stateSsz, _ := querySsz(stateEndpoint, slot)
// // Transform
// log.Info("Tranform")
// stateObj := new(spectests.BeaconState)
// err := stateObj.UnmarshalSSZ(stateSsz)
// if err != nil {
// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!")
// }
//
// blockObj := new(spectests.SignedBeaconBlock)
// err = blockObj.UnmarshalSSZ(blockSsz)
// if err != nil {
// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!")
// }
//
// // Check
// log.Info("Check")
// log.Info("State Slot: ", stateObj.Slot)
// log.Info("Block Slot: ", blockObj.Block.Slot)
//}
//
// Stop the head tracking service.
func (bc *BeaconClient) StopHeadTracking() error {
log.Info("We are going to stop tracking the head of chain because of the shutdown signal.")

View File

@ -0,0 +1,13 @@
package beaconclient
import "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
type DatabaseWriter struct {
Db sql.Database
DbSlots *DbSlots
DbSignedBeaconBlock *DbSignedBeaconBlock
DbBeaconState *DbBeaconState
}
// Write functions to write each all together...
// Should I do one atomic write?

View File

@ -44,7 +44,7 @@ type DbSlots struct {
Slot *big.Int // The slot.
BlockRoot string // The block root
StateRoot string // The state root
status string // The status, it can be proposed | forked | missed.
Status string // The status, it can be proposed | forked | missed.
}
// A struct to capture whats being written to ethcl.signed_beacon_block table.
@ -52,12 +52,13 @@ type DbSignedBeaconBlock struct {
Slot *big.Int // The slot.
BlockRoot string // The block root
ParentBlock string // The parent block root.
mh_key string // The ipld multihash key.
MhKey string // The ipld multihash key.
}
// A struct to capture whats being written to ethcl.beacon_state table.
type DbBeaconState struct {
Slot *big.Int // The slot.
StateRoot string // The state root
mh_key string // The ipld multihash key.
MhKey string // The ipld multihash key.
}

View File

@ -8,6 +8,7 @@ import (
"strconv"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
// This function will perform the necessary steps to handle a reorg.
@ -37,19 +38,21 @@ func (bc *BeaconClient) handleHead() {
for {
head := <-bc.HeadTracking.ProcessCh
// Process all the work here.
// Update the previous block if its the first message.
if bc.PreviousSlot == 0 && bc.PreviousBlockRoot == "" {
var err error
bc.PreviousSlot, err = strconv.Atoi(head.Slot)
if err != nil {
bc.HeadTracking.ErrorCh <- &SseError{
err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot),
}
slot, err := strconv.Atoi(head.Slot)
if err != nil {
bc.HeadTracking.ErrorCh <- &SseError{
err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot),
}
bc.PreviousBlockRoot = head.Block
}
err = handleHeadSlot(bc.ServerEndpoint, slot, head.Block, head.State, uint64(bc.PreviousSlot), bc.PreviousBlockRoot)
if err != nil {
loghelper.LogSlotError(head.Slot, err)
}
log.WithFields(log.Fields{"head": head}).Debug("Received a new head event.")
// Update the previous block
bc.PreviousSlot = slot
bc.PreviousBlockRoot = head.Block
}
}

View File

@ -7,6 +7,9 @@ package beaconclient
import (
"encoding/hex"
"fmt"
"math/big"
"strconv"
"strings"
"github.com/ferranbt/fastssz/spectests"
log "github.com/sirupsen/logrus"
@ -14,119 +17,160 @@ import (
)
var (
SlotUnmarshalError = "Unable to properly unmarshal the Slot field in the SignedBeaconBlock."
SlotUnmarshalError = func(obj string) string {
return fmt.Sprintf("Unable to properly unmarshal the Slot field in the %s.", obj)
}
//SlotUnmarshalError = "Unable to properly unmarshal the Slot field in the SignedBeaconBlock."
ParentRootUnmarshalError = "Unable to properly unmarshal the ParentRoot field in the SignedBeaconBlock."
MissingIdentifiedError = "Can't Query state without a set slot or block_root"
MissingIdentifiedError = "Can't query state without a set slot or block_root"
)
type ProcessSlot struct {
Slot string // The slot number.
Slot int // The slot number.
Epoch int // The epoch number.
BlockRoot string // The hex encoded string of the BlockRoot.
StateRoot string // The hex encoded string of the StateRoot.
ParentBlockRoot string // The hex encoded string of the parent block.
Status string // The status of the block
HeadOrHistoric string // Is this the head or a historic slot. This is critical when trying to analyze errors and missed slots.
SszSignedBeaconBlock []byte // The entire SSZ encoded SignedBeaconBlock
SszBeaconState []byte // The entire SSZ encoded BeaconState
FullBeaconState *spectests.BeaconState // The unmarshaled BeaconState object, the unmarshalling could have errors.
FullSignedBeaconBlock *spectests.SignedBeaconBlock // The unmarshaled BeaconState object, the unmarshalling could have errors.
}
// This function will do all the work to process the slot at head.
func processHeadSlot(baseEndpoint string, slot string, blockRoot string, stateRoot string, parentBlockRoot string, previousSlot uint64, previousBlockRoot string) error {
pc := &ProcessSlot{
Slot: slot,
BlockRoot: blockRoot,
StateRoot: stateRoot,
ParentBlockRoot: parentBlockRoot,
// This function will do all the work to process the slot and write it to the DB.
func handleFullSlot(serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot uint64, previousBlockRoot string, headOrHistoric string) error {
headOrHistoric = strings.ToLower(headOrHistoric)
if headOrHistoric != "head" && headOrHistoric != "historic" {
return fmt.Errorf("headOrBatch must be either historic or head!")
}
err := pc.getSignedBeaconBlock(baseEndpoint)
ps := &ProcessSlot{
Slot: slot,
BlockRoot: blockRoot,
StateRoot: stateRoot,
HeadOrHistoric: headOrHistoric,
}
// Get the SignedBeaconBlock.
err := ps.getSignedBeaconBlock(serverAddress)
if err != nil {
return err
}
err = pc.getBeaconState(baseEndpoint)
// Get the BeaconState.
err = ps.getBeaconState(serverAddress)
if err != nil {
return err
}
// Handle any reorgs or skipped slots.
if previousSlot != 0 && previousBlockRoot != "" {
pc.checkPreviousSlot(previousSlot, previousBlockRoot)
if ps.HeadOrHistoric == "head" {
if previousSlot != 0 && previousBlockRoot != "" {
ps.checkPreviousSlot(previousSlot, previousBlockRoot)
}
}
// Get this object ready to write
ps.createWriteObjects()
// Write the object to the DB.
return nil
}
// Handle a slot that is at head. A wrapper function for calling `handleFullSlot`.
func handleHeadSlot(serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot uint64, previousBlockRoot string) error {
log.Debug("handleHeadSlot")
return handleFullSlot(serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head")
}
// Handle a historic slot. A wrapper function for calling `handleFullSlot`.
func handleHistoricSlot(serverAddress string, slot int) error {
return handleFullSlot(serverAddress, slot, "", "", 0, "", "historic")
}
// Update the SszSignedBeaconBlock and FullSignedBeaconBlock object with their respective values.
func (ps *ProcessSlot) getSignedBeaconBlock(baseEndpoint string) error {
func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string) error {
var blockIdentifier string // Used to query the block
if ps.BlockRoot != "" {
blockIdentifier = ps.BlockRoot
} else if ps.Slot != "" {
blockIdentifier = ps.Slot
} else if ps.Slot != 0 {
blockIdentifier = strconv.Itoa(ps.Slot)
} else {
log.Error(MissingIdentifiedError)
return fmt.Errorf(MissingIdentifiedError)
}
blockEndpoint := baseEndpoint + blockIdentifier
ps.SszSignedBeaconBlock, _ = querySsz(blockEndpoint, ps.Slot)
blockEndpoint := serverAddress + bcBlockQueryEndpoint + blockIdentifier
var err error
var rc int
ps.SszSignedBeaconBlock, rc, err = querySsz(blockEndpoint, strconv.Itoa(ps.Slot))
if err != nil {
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error("Unable to properly query the slot.")
return err
}
if rc != 200 {
ps.checkMissedSlot()
}
ps.FullSignedBeaconBlock = new(spectests.SignedBeaconBlock)
err := ps.FullSignedBeaconBlock.UnmarshalSSZ(ps.SszSignedBeaconBlock)
err = ps.FullSignedBeaconBlock.UnmarshalSSZ(ps.SszSignedBeaconBlock)
if err != nil {
if ps.FullSignedBeaconBlock.Block.Slot == 0 {
loghelper.LogSlotError(ps.Slot, err).Error(SlotUnmarshalError)
return fmt.Errorf(SlotUnmarshalError)
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(SlotUnmarshalError("SignedBeaconBlock"))
return fmt.Errorf(SlotUnmarshalError("SignedBeaconBlock"))
} else if ps.FullSignedBeaconBlock.Block.ParentRoot == nil {
loghelper.LogSlotError(ps.Slot, err).Error(ParentRootUnmarshalError)
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(ParentRootUnmarshalError)
return fmt.Errorf(ParentRootUnmarshalError)
}
}
ps.ParentBlockRoot = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.ParentRoot)
return nil
}
// Update the SszBeaconState and FullBeaconState object with their respective values.
func (ps *ProcessSlot) getBeaconState(baseEndpoint string) error {
func (ps *ProcessSlot) getBeaconState(serverEndpoint string) error {
var stateIdentifier string // Used to query the state
if ps.StateRoot != "" {
stateIdentifier = ps.BlockRoot
} else if ps.Slot != "" {
stateIdentifier = ps.Slot
stateIdentifier = ps.StateRoot
} else if ps.Slot != 0 {
stateIdentifier = strconv.Itoa(ps.Slot)
} else {
log.Error(MissingIdentifiedError)
return fmt.Errorf(MissingIdentifiedError)
}
stateEndpoint := baseEndpoint + stateIdentifier
ps.SszBeaconState, _ = querySsz(stateEndpoint, ps.Slot)
stateEndpoint := serverEndpoint + bcStateQueryEndpoint + stateIdentifier
ps.SszBeaconState, _, _ = querySsz(stateEndpoint, strconv.Itoa(ps.Slot))
ps.FullBeaconState = new(spectests.BeaconState)
err := ps.FullSignedBeaconBlock.UnmarshalSSZ(ps.SszBeaconState)
err := ps.FullBeaconState.UnmarshalSSZ(ps.SszBeaconState)
if err != nil {
if ps.FullBeaconState.Slot == 0 {
loghelper.LogSlotError(ps.Slot, err).Error(SlotUnmarshalError)
return fmt.Errorf(SlotUnmarshalError)
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(SlotUnmarshalError("BeaconState"))
return fmt.Errorf(SlotUnmarshalError("BeaconState"))
}
}
return nil
}
// Check to make sure that the previous block we processed is the parent of the current block.
func (ps *ProcessSlot) checkPreviousSlot(previousSlot uint64, previousBlockRoot string) {
if previousSlot == ps.FullBeaconState.Slot {
log.WithFields(log.Fields{
"slot": ps.FullBeaconState.Slot,
"fork": true,
}).Warn("A fork occurred! The previous slot and current slot match.")
// Handle Forks
// mark old slot as forked.
} else if previousSlot-1 != ps.FullBeaconState.Slot {
log.WithFields(log.Fields{
"previousSlot": previousSlot,
"currentSlot": ps.FullBeaconState.Slot,
}).Error("We skipped a few slots.")
// Check to see if the slot was skipped.
// Call our batch processing function.
} else if previousBlockRoot != "0x"+hex.EncodeToString(ps.FullSignedBeaconBlock.Block.ParentRoot) {
log.WithFields(log.Fields{
@ -134,5 +178,106 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot uint64, previousBlockRoot
"currentBlockParent": ps.FullSignedBeaconBlock.Block.ParentRoot,
}).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.")
// Handle Forks
// Mark the previous slot in the DB as a fork.
// Continue with this slot.
}
}
// Add logic for checking a missed Slot
// IF the state is present but block is not, then it was skipped???
// If the state and block are both absent, then the block might be missing??
// IF state is absent but block is not, there might be an issue with the LH client.
// Check the previous and following slot?
// Check if head or historic.
// 1. BeaconBlock is 404.
// 2. check heck /lighthouse/database/info to make sure the oldest_block_slot == 0 and anchor == null. This indicates that I don't have any gaps in the DB.
// 3. I query BeaconState for slot X, and get a BeaconState.
// 4. Although for good measure you should also check that the head is at a slot >= X using something like /eth/v1/node/syncing/ or /eth/v1/beacon/headers/head
func (ps *ProcessSlot) checkMissedSlot() {
}
// Transforms all the raw data into DB models that can be written to the DB.
func (ps *ProcessSlot) createWriteObjects() {
var (
stateRoot string
blockRoot string
status string
)
if ps.StateRoot != "" {
stateRoot = ps.StateRoot
} else {
stateRoot = hex.EncodeToString(ps.FullSignedBeaconBlock.Block.StateRoot)
}
if ps.BlockRoot != "" {
blockRoot = ps.BlockRoot
} else {
// wait for response from Prysm.
// get the blockroot
}
if ps.Status != "" {
status = ps.StateRoot
} else {
status = "proposed"
}
// Function to write slots table
ps.prepareSlotsModel(stateRoot, blockRoot, status)
// function to write block
ps.prepareSignedBeaconBlockModel(blockRoot)
// function to write state
ps.prepareBeaconStateModel(stateRoot)
}
// Create the model for the ethcl.slots table
func (ps *ProcessSlot) prepareSlotsModel(stateRoot string, blockRoot string, status string) *DbSlots {
dbSlots := &DbSlots{
Epoch: calculateEpoch(ps.Slot, bcSlotsPerEpoch),
Slot: big.NewInt(int64(ps.Slot)),
StateRoot: stateRoot,
BlockRoot: blockRoot,
Status: status,
}
log.Debug("DbSlots: ", dbSlots)
return dbSlots
}
// Create the model for the ethcl.signed_beacon_block table.
func (ps *ProcessSlot) prepareSignedBeaconBlockModel(blockRoot string) *DbSignedBeaconBlock {
dbSignedBeaconBlock := &DbSignedBeaconBlock{
Slot: big.NewInt(int64(ps.Slot)),
BlockRoot: blockRoot,
ParentBlock: ps.ParentBlockRoot,
MhKey: calculateMhKey(),
}
log.Debug("dbSignedBeaconBlock: ", dbSignedBeaconBlock)
return dbSignedBeaconBlock
}
// Create the model for the ethcl.beacon_state table.
func (ps *ProcessSlot) prepareBeaconStateModel(stateRoot string) *DbBeaconState {
dbBeaconState := &DbBeaconState{
Slot: big.NewInt(int64(ps.Slot)),
StateRoot: stateRoot,
MhKey: calculateMhKey(),
}
log.Debug("dbBeaconState: ", dbBeaconState)
return dbBeaconState
}
// A quick helper function to calculate the epoch.
func calculateEpoch(slot int, slotPerEpoch int) *big.Int {
epoch := slot / slotPerEpoch
return big.NewInt(int64(epoch))
}
func calculateMhKey() string {
return ""
}

View File

@ -76,26 +76,26 @@ import (
// }
// A helper function to query endpoints that utilize slots.
func querySsz(endpoint string, slot string) ([]byte, error) {
func querySsz(endpoint string, slot string) ([]byte, int, error) {
log.WithFields(log.Fields{"endpoint": endpoint}).Info("Querying endpoint")
client := &http.Client{}
req, err := http.NewRequest("GET", endpoint, nil)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to create a request!")
return nil, fmt.Errorf("Unable to create a request!: %s", err.Error())
return nil, 0, fmt.Errorf("Unable to create a request!: %s", err.Error())
}
// Not set correctly
req.Header.Set("Accept", "application/octet-stream")
response, err := client.Do(req)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to query Beacon Node!")
return nil, fmt.Errorf("Unable to query Beacon Node: %s", err.Error())
return nil, 0, fmt.Errorf("Unable to query Beacon Node: %s", err.Error())
}
defer response.Body.Close()
rc := response.StatusCode
body, err := ioutil.ReadAll(response.Body)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!")
return nil, fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error())
return nil, rc, fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error())
}
return body, nil
return body, rc, nil
}