From 33fefda124e893b2291b7dbe383a99cc3569193a Mon Sep 17 00:00:00 2001 From: Abdul Rabbani Date: Wed, 4 May 2022 09:34:23 -0400 Subject: [PATCH] Create DB models ready for write. --- pkg/beaconclient/beaconclient.go | 1 + pkg/beaconclient/capturehead.go | 57 ++++---- pkg/beaconclient/databasewrite.go | 13 ++ pkg/beaconclient/models.go | 7 +- pkg/beaconclient/processevents.go | 23 ++-- pkg/beaconclient/processslot.go | 211 +++++++++++++++++++++++++----- pkg/beaconclient/queryserver.go | 12 +- 7 files changed, 243 insertions(+), 81 deletions(-) create mode 100644 pkg/beaconclient/databasewrite.go diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go index e1c6d1b..a114263 100644 --- a/pkg/beaconclient/beaconclient.go +++ b/pkg/beaconclient/beaconclient.go @@ -15,6 +15,7 @@ var ( bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain bcBlockQueryEndpoint = "/eth/v2/beacon/blocks/" // Endpoint to query individual Blocks bcStateQueryEndpoint = "/eth/v2/debug/beacon/states/" // Endpoint to query individual States + bcSlotsPerEpoch = 32 // Number of slots in a single Epoch ) // A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying. diff --git a/pkg/beaconclient/capturehead.go b/pkg/beaconclient/capturehead.go index 58583bd..145aac3 100644 --- a/pkg/beaconclient/capturehead.go +++ b/pkg/beaconclient/capturehead.go @@ -5,7 +5,6 @@ package beaconclient import ( "time" - "github.com/ferranbt/fastssz/spectests" log "github.com/sirupsen/logrus" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" ) @@ -21,34 +20,34 @@ func (bc *BeaconClient) CaptureHead() { } // A temporary helper function to see the output of beacon block and states. -func (bc *BeaconClient) tempHelper() { - slot := "3200" - blockEndpoint := bc.ServerEndpoint + bcBlockQueryEndpoint + slot - stateEndpoint := bc.ServerEndpoint + bcStateQueryEndpoint + slot - // Query - log.Info("Get") - blockSsz, _ := querySsz(blockEndpoint, slot) - stateSsz, _ := querySsz(stateEndpoint, slot) - // Transform - log.Info("Tranform") - stateObj := new(spectests.BeaconState) - err := stateObj.UnmarshalSSZ(stateSsz) - if err != nil { - loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!") - } - - blockObj := new(spectests.SignedBeaconBlock) - err = blockObj.UnmarshalSSZ(blockSsz) - if err != nil { - loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!") - } - - // Check - log.Info("Check") - log.Info("State Slot: ", stateObj.Slot) - log.Info("Block Slot: ", blockObj.Block.Slot) -} - +//func (bc *BeaconClient) tempHelper() { +// slot := "3200" +// blockEndpoint := bc.ServerEndpoint + bcBlockQueryEndpoint + slot +// stateEndpoint := bc.ServerEndpoint + bcStateQueryEndpoint + slot +// // Query +// log.Info("Get") +// blockSsz, _ := querySsz(blockEndpoint, slot) +// stateSsz, _ := querySsz(stateEndpoint, slot) +// // Transform +// log.Info("Tranform") +// stateObj := new(spectests.BeaconState) +// err := stateObj.UnmarshalSSZ(stateSsz) +// if err != nil { +// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!") +// } +// +// blockObj := new(spectests.SignedBeaconBlock) +// err = blockObj.UnmarshalSSZ(blockSsz) +// if err != nil { +// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!") +// } +// +// // Check +// log.Info("Check") +// log.Info("State Slot: ", stateObj.Slot) +// log.Info("Block Slot: ", blockObj.Block.Slot) +//} +// // Stop the head tracking service. func (bc *BeaconClient) StopHeadTracking() error { log.Info("We are going to stop tracking the head of chain because of the shutdown signal.") diff --git a/pkg/beaconclient/databasewrite.go b/pkg/beaconclient/databasewrite.go new file mode 100644 index 0000000..8bd49e4 --- /dev/null +++ b/pkg/beaconclient/databasewrite.go @@ -0,0 +1,13 @@ +package beaconclient + +import "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" + +type DatabaseWriter struct { + Db sql.Database + DbSlots *DbSlots + DbSignedBeaconBlock *DbSignedBeaconBlock + DbBeaconState *DbBeaconState +} + +// Write functions to write each all together... +// Should I do one atomic write? diff --git a/pkg/beaconclient/models.go b/pkg/beaconclient/models.go index fe520db..7ac828b 100644 --- a/pkg/beaconclient/models.go +++ b/pkg/beaconclient/models.go @@ -44,7 +44,7 @@ type DbSlots struct { Slot *big.Int // The slot. BlockRoot string // The block root StateRoot string // The state root - status string // The status, it can be proposed | forked | missed. + Status string // The status, it can be proposed | forked | missed. } // A struct to capture whats being written to ethcl.signed_beacon_block table. @@ -52,12 +52,13 @@ type DbSignedBeaconBlock struct { Slot *big.Int // The slot. BlockRoot string // The block root ParentBlock string // The parent block root. - mh_key string // The ipld multihash key. + MhKey string // The ipld multihash key. } +// A struct to capture whats being written to ethcl.beacon_state table. type DbBeaconState struct { Slot *big.Int // The slot. StateRoot string // The state root - mh_key string // The ipld multihash key. + MhKey string // The ipld multihash key. } diff --git a/pkg/beaconclient/processevents.go b/pkg/beaconclient/processevents.go index db29e56..0d39fde 100644 --- a/pkg/beaconclient/processevents.go +++ b/pkg/beaconclient/processevents.go @@ -8,6 +8,7 @@ import ( "strconv" log "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" ) // This function will perform the necessary steps to handle a reorg. @@ -37,19 +38,21 @@ func (bc *BeaconClient) handleHead() { for { head := <-bc.HeadTracking.ProcessCh // Process all the work here. - - // Update the previous block if its the first message. - if bc.PreviousSlot == 0 && bc.PreviousBlockRoot == "" { - var err error - bc.PreviousSlot, err = strconv.Atoi(head.Slot) - if err != nil { - bc.HeadTracking.ErrorCh <- &SseError{ - err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot), - } + slot, err := strconv.Atoi(head.Slot) + if err != nil { + bc.HeadTracking.ErrorCh <- &SseError{ + err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot), } - bc.PreviousBlockRoot = head.Block + } + err = handleHeadSlot(bc.ServerEndpoint, slot, head.Block, head.State, uint64(bc.PreviousSlot), bc.PreviousBlockRoot) + if err != nil { + loghelper.LogSlotError(head.Slot, err) } log.WithFields(log.Fields{"head": head}).Debug("Received a new head event.") + + // Update the previous block + bc.PreviousSlot = slot + bc.PreviousBlockRoot = head.Block } } diff --git a/pkg/beaconclient/processslot.go b/pkg/beaconclient/processslot.go index 5da56cb..f05c0d7 100644 --- a/pkg/beaconclient/processslot.go +++ b/pkg/beaconclient/processslot.go @@ -7,6 +7,9 @@ package beaconclient import ( "encoding/hex" "fmt" + "math/big" + "strconv" + "strings" "github.com/ferranbt/fastssz/spectests" log "github.com/sirupsen/logrus" @@ -14,119 +17,160 @@ import ( ) var ( - SlotUnmarshalError = "Unable to properly unmarshal the Slot field in the SignedBeaconBlock." + SlotUnmarshalError = func(obj string) string { + return fmt.Sprintf("Unable to properly unmarshal the Slot field in the %s.", obj) + } + + //SlotUnmarshalError = "Unable to properly unmarshal the Slot field in the SignedBeaconBlock." ParentRootUnmarshalError = "Unable to properly unmarshal the ParentRoot field in the SignedBeaconBlock." - MissingIdentifiedError = "Can't Query state without a set slot or block_root" + MissingIdentifiedError = "Can't query state without a set slot or block_root" ) type ProcessSlot struct { - Slot string // The slot number. + Slot int // The slot number. + Epoch int // The epoch number. BlockRoot string // The hex encoded string of the BlockRoot. StateRoot string // The hex encoded string of the StateRoot. ParentBlockRoot string // The hex encoded string of the parent block. + Status string // The status of the block + HeadOrHistoric string // Is this the head or a historic slot. This is critical when trying to analyze errors and missed slots. SszSignedBeaconBlock []byte // The entire SSZ encoded SignedBeaconBlock SszBeaconState []byte // The entire SSZ encoded BeaconState FullBeaconState *spectests.BeaconState // The unmarshaled BeaconState object, the unmarshalling could have errors. FullSignedBeaconBlock *spectests.SignedBeaconBlock // The unmarshaled BeaconState object, the unmarshalling could have errors. } -// This function will do all the work to process the slot at head. -func processHeadSlot(baseEndpoint string, slot string, blockRoot string, stateRoot string, parentBlockRoot string, previousSlot uint64, previousBlockRoot string) error { - pc := &ProcessSlot{ - Slot: slot, - BlockRoot: blockRoot, - StateRoot: stateRoot, - ParentBlockRoot: parentBlockRoot, +// This function will do all the work to process the slot and write it to the DB. +func handleFullSlot(serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot uint64, previousBlockRoot string, headOrHistoric string) error { + headOrHistoric = strings.ToLower(headOrHistoric) + if headOrHistoric != "head" && headOrHistoric != "historic" { + return fmt.Errorf("headOrBatch must be either historic or head!") } - err := pc.getSignedBeaconBlock(baseEndpoint) + ps := &ProcessSlot{ + Slot: slot, + BlockRoot: blockRoot, + StateRoot: stateRoot, + HeadOrHistoric: headOrHistoric, + } + + // Get the SignedBeaconBlock. + err := ps.getSignedBeaconBlock(serverAddress) if err != nil { return err } - err = pc.getBeaconState(baseEndpoint) + // Get the BeaconState. + err = ps.getBeaconState(serverAddress) if err != nil { return err } // Handle any reorgs or skipped slots. - if previousSlot != 0 && previousBlockRoot != "" { - pc.checkPreviousSlot(previousSlot, previousBlockRoot) + if ps.HeadOrHistoric == "head" { + if previousSlot != 0 && previousBlockRoot != "" { + ps.checkPreviousSlot(previousSlot, previousBlockRoot) + } } // Get this object ready to write + ps.createWriteObjects() // Write the object to the DB. return nil } +// Handle a slot that is at head. A wrapper function for calling `handleFullSlot`. +func handleHeadSlot(serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot uint64, previousBlockRoot string) error { + log.Debug("handleHeadSlot") + return handleFullSlot(serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head") +} + +// Handle a historic slot. A wrapper function for calling `handleFullSlot`. +func handleHistoricSlot(serverAddress string, slot int) error { + return handleFullSlot(serverAddress, slot, "", "", 0, "", "historic") +} + // Update the SszSignedBeaconBlock and FullSignedBeaconBlock object with their respective values. -func (ps *ProcessSlot) getSignedBeaconBlock(baseEndpoint string) error { +func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string) error { var blockIdentifier string // Used to query the block if ps.BlockRoot != "" { blockIdentifier = ps.BlockRoot - } else if ps.Slot != "" { - blockIdentifier = ps.Slot + } else if ps.Slot != 0 { + blockIdentifier = strconv.Itoa(ps.Slot) } else { log.Error(MissingIdentifiedError) return fmt.Errorf(MissingIdentifiedError) } - blockEndpoint := baseEndpoint + blockIdentifier - ps.SszSignedBeaconBlock, _ = querySsz(blockEndpoint, ps.Slot) + blockEndpoint := serverAddress + bcBlockQueryEndpoint + blockIdentifier + var err error + var rc int + ps.SszSignedBeaconBlock, rc, err = querySsz(blockEndpoint, strconv.Itoa(ps.Slot)) + if err != nil { + loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error("Unable to properly query the slot.") + return err + } + + if rc != 200 { + ps.checkMissedSlot() + } ps.FullSignedBeaconBlock = new(spectests.SignedBeaconBlock) - err := ps.FullSignedBeaconBlock.UnmarshalSSZ(ps.SszSignedBeaconBlock) + err = ps.FullSignedBeaconBlock.UnmarshalSSZ(ps.SszSignedBeaconBlock) if err != nil { if ps.FullSignedBeaconBlock.Block.Slot == 0 { - loghelper.LogSlotError(ps.Slot, err).Error(SlotUnmarshalError) - return fmt.Errorf(SlotUnmarshalError) + loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(SlotUnmarshalError("SignedBeaconBlock")) + return fmt.Errorf(SlotUnmarshalError("SignedBeaconBlock")) } else if ps.FullSignedBeaconBlock.Block.ParentRoot == nil { - loghelper.LogSlotError(ps.Slot, err).Error(ParentRootUnmarshalError) + loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(ParentRootUnmarshalError) return fmt.Errorf(ParentRootUnmarshalError) } } + ps.ParentBlockRoot = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.ParentRoot) return nil } // Update the SszBeaconState and FullBeaconState object with their respective values. -func (ps *ProcessSlot) getBeaconState(baseEndpoint string) error { +func (ps *ProcessSlot) getBeaconState(serverEndpoint string) error { var stateIdentifier string // Used to query the state if ps.StateRoot != "" { - stateIdentifier = ps.BlockRoot - } else if ps.Slot != "" { - stateIdentifier = ps.Slot + stateIdentifier = ps.StateRoot + } else if ps.Slot != 0 { + stateIdentifier = strconv.Itoa(ps.Slot) } else { log.Error(MissingIdentifiedError) return fmt.Errorf(MissingIdentifiedError) } - stateEndpoint := baseEndpoint + stateIdentifier - ps.SszBeaconState, _ = querySsz(stateEndpoint, ps.Slot) + stateEndpoint := serverEndpoint + bcStateQueryEndpoint + stateIdentifier + ps.SszBeaconState, _, _ = querySsz(stateEndpoint, strconv.Itoa(ps.Slot)) ps.FullBeaconState = new(spectests.BeaconState) - err := ps.FullSignedBeaconBlock.UnmarshalSSZ(ps.SszBeaconState) + err := ps.FullBeaconState.UnmarshalSSZ(ps.SszBeaconState) if err != nil { if ps.FullBeaconState.Slot == 0 { - loghelper.LogSlotError(ps.Slot, err).Error(SlotUnmarshalError) - return fmt.Errorf(SlotUnmarshalError) + loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(SlotUnmarshalError("BeaconState")) + return fmt.Errorf(SlotUnmarshalError("BeaconState")) } } return nil } +// Check to make sure that the previous block we processed is the parent of the current block. func (ps *ProcessSlot) checkPreviousSlot(previousSlot uint64, previousBlockRoot string) { if previousSlot == ps.FullBeaconState.Slot { log.WithFields(log.Fields{ "slot": ps.FullBeaconState.Slot, "fork": true, }).Warn("A fork occurred! The previous slot and current slot match.") - // Handle Forks + // mark old slot as forked. } else if previousSlot-1 != ps.FullBeaconState.Slot { log.WithFields(log.Fields{ "previousSlot": previousSlot, "currentSlot": ps.FullBeaconState.Slot, }).Error("We skipped a few slots.") + // Check to see if the slot was skipped. // Call our batch processing function. } else if previousBlockRoot != "0x"+hex.EncodeToString(ps.FullSignedBeaconBlock.Block.ParentRoot) { log.WithFields(log.Fields{ @@ -134,5 +178,106 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot uint64, previousBlockRoot "currentBlockParent": ps.FullSignedBeaconBlock.Block.ParentRoot, }).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.") // Handle Forks + // Mark the previous slot in the DB as a fork. + // Continue with this slot. } } + +// Add logic for checking a missed Slot +// IF the state is present but block is not, then it was skipped??? +// If the state and block are both absent, then the block might be missing?? +// IF state is absent but block is not, there might be an issue with the LH client. +// Check the previous and following slot? +// Check if head or historic. + +// 1. BeaconBlock is 404. +// 2. check heck /lighthouse/database/info to make sure the oldest_block_slot == 0 and anchor == null. This indicates that I don't have any gaps in the DB. +// 3. I query BeaconState for slot X, and get a BeaconState. +// 4. Although for good measure you should also check that the head is at a slot >= X using something like /eth/v1/node/syncing/ or /eth/v1/beacon/headers/head +func (ps *ProcessSlot) checkMissedSlot() { + +} + +// Transforms all the raw data into DB models that can be written to the DB. +func (ps *ProcessSlot) createWriteObjects() { + var ( + stateRoot string + blockRoot string + status string + ) + + if ps.StateRoot != "" { + stateRoot = ps.StateRoot + } else { + stateRoot = hex.EncodeToString(ps.FullSignedBeaconBlock.Block.StateRoot) + } + + if ps.BlockRoot != "" { + blockRoot = ps.BlockRoot + } else { + // wait for response from Prysm. + // get the blockroot + } + + if ps.Status != "" { + status = ps.StateRoot + } else { + status = "proposed" + } + // Function to write slots table + ps.prepareSlotsModel(stateRoot, blockRoot, status) + + // function to write block + ps.prepareSignedBeaconBlockModel(blockRoot) + + // function to write state + ps.prepareBeaconStateModel(stateRoot) +} + +// Create the model for the ethcl.slots table +func (ps *ProcessSlot) prepareSlotsModel(stateRoot string, blockRoot string, status string) *DbSlots { + dbSlots := &DbSlots{ + Epoch: calculateEpoch(ps.Slot, bcSlotsPerEpoch), + Slot: big.NewInt(int64(ps.Slot)), + StateRoot: stateRoot, + BlockRoot: blockRoot, + Status: status, + } + log.Debug("DbSlots: ", dbSlots) + return dbSlots + +} + +// Create the model for the ethcl.signed_beacon_block table. +func (ps *ProcessSlot) prepareSignedBeaconBlockModel(blockRoot string) *DbSignedBeaconBlock { + dbSignedBeaconBlock := &DbSignedBeaconBlock{ + Slot: big.NewInt(int64(ps.Slot)), + BlockRoot: blockRoot, + ParentBlock: ps.ParentBlockRoot, + MhKey: calculateMhKey(), + } + log.Debug("dbSignedBeaconBlock: ", dbSignedBeaconBlock) + return dbSignedBeaconBlock +} + +// Create the model for the ethcl.beacon_state table. +func (ps *ProcessSlot) prepareBeaconStateModel(stateRoot string) *DbBeaconState { + dbBeaconState := &DbBeaconState{ + Slot: big.NewInt(int64(ps.Slot)), + StateRoot: stateRoot, + MhKey: calculateMhKey(), + } + + log.Debug("dbBeaconState: ", dbBeaconState) + return dbBeaconState +} + +// A quick helper function to calculate the epoch. +func calculateEpoch(slot int, slotPerEpoch int) *big.Int { + epoch := slot / slotPerEpoch + return big.NewInt(int64(epoch)) +} + +func calculateMhKey() string { + return "" +} diff --git a/pkg/beaconclient/queryserver.go b/pkg/beaconclient/queryserver.go index 14dd0a1..272d429 100644 --- a/pkg/beaconclient/queryserver.go +++ b/pkg/beaconclient/queryserver.go @@ -76,26 +76,26 @@ import ( // } // A helper function to query endpoints that utilize slots. -func querySsz(endpoint string, slot string) ([]byte, error) { +func querySsz(endpoint string, slot string) ([]byte, int, error) { log.WithFields(log.Fields{"endpoint": endpoint}).Info("Querying endpoint") client := &http.Client{} req, err := http.NewRequest("GET", endpoint, nil) if err != nil { loghelper.LogSlotError(slot, err).Error("Unable to create a request!") - return nil, fmt.Errorf("Unable to create a request!: %s", err.Error()) + return nil, 0, fmt.Errorf("Unable to create a request!: %s", err.Error()) } - // Not set correctly req.Header.Set("Accept", "application/octet-stream") response, err := client.Do(req) if err != nil { loghelper.LogSlotError(slot, err).Error("Unable to query Beacon Node!") - return nil, fmt.Errorf("Unable to query Beacon Node: %s", err.Error()) + return nil, 0, fmt.Errorf("Unable to query Beacon Node: %s", err.Error()) } defer response.Body.Close() + rc := response.StatusCode body, err := ioutil.ReadAll(response.Body) if err != nil { loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!") - return nil, fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error()) + return nil, rc, fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error()) } - return body, nil + return body, rc, nil }