Capture the head block in the DB entirely. #27

Merged
abdulrabbani00 merged 13 commits from feature/20-write-sse-events-to-the-db into develop 2022-05-06 15:03:16 +00:00
12 changed files with 109 additions and 40 deletions
Showing only changes of commit f80fb9d896 - Show all commits

View File

@ -37,7 +37,7 @@ func startHeadTracking() {
}
// Capture head blocks
go BC.CaptureHead(DB)
go BC.CaptureHead()
// Shutdown when the time is right.
notifierCh := make(chan os.Signal, 1)

View File

@ -73,6 +73,8 @@ func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName
if err != nil {
return nil, nil, err
}
BC.Db = DB
return BC, DB, nil
}

View File

@ -6,6 +6,7 @@ import (
"github.com/r3labs/sse"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
)
// TODO: Use prysms config values instead of hardcoding them here.
@ -25,6 +26,7 @@ type BeaconClient struct {
Context context.Context // A context generic context with multiple uses.
ServerEndpoint string // What is the endpoint of the beacon server.
PerformHistoricalProcessing bool // Should we perform historical processing?
Db sql.Database // Database object used for reads and writes.
// Used for Head Tracking
PerformHeadTracking bool // Should we track head?

View File

@ -6,17 +6,16 @@ import (
"time"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
// This function will perform all the heavy lifting for tracking the head of the chain.
func (bc *BeaconClient) CaptureHead(db sql.Database) {
func (bc *BeaconClient) CaptureHead() {
log.Info("We are tracking the head of the chain.")
//bc.tempHelper()
go bc.handleHead(db)
go bc.handleHead()
//go bc.handleFinalizedCheckpoint()
go bc.handleReorgs()
go bc.handleReorg()
bc.captureEventTopic()
}

View File

@ -29,6 +29,10 @@ VALUES ($1, $2, $3) ON CONFLICT (slot, state_root) DO NOTHING`
UpsertBlocksStmt string = `
INSERT INTO public.blocks (key, data)
VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`
UpdateReorgStmt string = `UPDATE ethcl.slots
SET status=forked
WHERE slot=$1 AND block_hash<>$2
RETURNING block_hash;`
)
// Put all functionality to prepare the write object
@ -92,6 +96,8 @@ func (dw *DatabaseWriter) prepareBeaconStateModel(slot int, stateRoot string) {
// Write all the data for a given slot.
func (dw *DatabaseWriter) writeFullSlot() {
// Add errors for each function call
// If an error occurs, write to knownGaps table.
dw.writeSlots()
dw.writeSignedBeaconBlocks()
dw.writeBeaconState()
@ -149,6 +155,23 @@ func (dw *DatabaseWriter) upsertBeaconState() {
}
}
// Update a given slot to be marked as forked. Provide the slot and the latest latestBlockRoot.
// We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked.
func updateReorgs(db sql.Database, slot string, latestBlockRoot string) (int64, error) {
res, err := db.Exec(context.Background(), UpdateReorgStmt, slot, latestBlockRoot)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with reorgs.")
return 0, err
}
count, err := res.RowsAffected()
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to figure out how many entries were effected by the reorg.")
return 0, err
}
return count, nil
}
// Dummy function for calculating the mhKey.
func calculateMhKey() string {
rand.Seed(time.Now().UnixNano())

View File

@ -8,33 +8,21 @@ import (
"strconv"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
// This function will perform the necessary steps to handle a reorg.
func (bc *BeaconClient) handleReorgs() {
func (bc *BeaconClient) handleReorg() {
log.Info("Starting to process reorgs.")
for {
// We will add real functionality later
reorg := <-bc.ReOrgTracking.ProcessCh
log.WithFields(log.Fields{"reorg": reorg}).Debug("Received a new reorg message.")
processReorg(bc.Db, reorg.Slot, reorg.NewHeadBlock)
}
}
// // This function will perform the necessary steps to handle a reorg.
// func (bc *BeaconClient) handleFinalizedCheckpoint() {
// log.Info("Starting to process finalized checkpoints.")
// for {
// // We will add real functionality later
// finalized := <-bc.FinalizationTracking.ProcessCh
// log.WithFields(log.Fields{"finalized": finalized}).Debug("Received a new finalized checkpoint.")
// }
//
// }
// This function will handle the latest head event.
func (bc *BeaconClient) handleHead(db sql.Database) {
func (bc *BeaconClient) handleHead() {
log.Info("Starting to process head.")
for {
head := <-bc.HeadTracking.ProcessCh
@ -45,7 +33,7 @@ func (bc *BeaconClient) handleHead(db sql.Database) {
err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot),
}
}
err = handleHeadSlot(db, bc.ServerEndpoint, slot, head.Block, head.State, uint64(bc.PreviousSlot), bc.PreviousBlockRoot)
err = processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot)
if err != nil {
loghelper.LogSlotError(head.Slot, err)
}

View File

@ -0,0 +1,31 @@
package beaconclient
import (
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
func processReorg(db sql.Database, slot string, latestBlockRoot string) {
// Check to see if there are slots in the DB with the given slot.
// Update them ALL to forked
// Upsert the new slot into the DB, mark the status to proposed.
// Query at the end to make sure that you have handled the reorg properly.
updatedRows, err := updateReorgs(db, slot, latestBlockRoot)
if err != nil {
// Add this slot to the knownGaps table..
// Maybe we need to rename the knownGaps table to the "batchProcess" table.
}
if updatedRows > 0 {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"updatedRows": updatedRows,
}).Info("Updated DB based on Reorgs.")
} else {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"updatedRows": updatedRows,
}).Warn("There were no rows to update.")
}
}

View File

@ -30,13 +30,14 @@ var (
type ProcessSlot struct {
// Generic
Slot int // The slot number.
Epoch int // The epoch number.
BlockRoot string // The hex encoded string of the BlockRoot.
StateRoot string // The hex encoded string of the StateRoot.
ParentBlockRoot string // The hex encoded string of the parent block.
Status string // The status of the block
HeadOrHistoric string // Is this the head or a historic slot. This is critical when trying to analyze errors and missed slots.
Slot int // The slot number.
Epoch int // The epoch number.
BlockRoot string // The hex encoded string of the BlockRoot.
StateRoot string // The hex encoded string of the StateRoot.
ParentBlockRoot string // The hex encoded string of the parent block.
Status string // The status of the block
HeadOrHistoric string // Is this the head or a historic slot. This is critical when trying to analyze errors and missed slots.
Db sql.Database // The DB object used to write to the DB.
// BeaconBlock
SszSignedBeaconBlock []byte // The entire SSZ encoded SignedBeaconBlock
@ -53,7 +54,7 @@ type ProcessSlot struct {
}
// This function will do all the work to process the slot and write it to the DB.
func handleFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot uint64, previousBlockRoot string, headOrHistoric string) error {
func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string) error {
headOrHistoric = strings.ToLower(headOrHistoric)
if headOrHistoric != "head" && headOrHistoric != "historic" {
return fmt.Errorf("headOrBatch must be either historic or head!")
@ -94,8 +95,8 @@ func handleFullSlot(db sql.Database, serverAddress string, slot int, blockRoot s
}
// Handle a slot that is at head. A wrapper function for calling `handleFullSlot`.
func handleHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot uint64, previousBlockRoot string) error {
return handleFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head")
func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string) error {
return processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head")
}
// Handle a historic slot. A wrapper function for calling `handleFullSlot`.
@ -171,27 +172,28 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string) error {
}
// Check to make sure that the previous block we processed is the parent of the current block.
func (ps *ProcessSlot) checkPreviousSlot(previousSlot uint64, previousBlockRoot string) {
if previousSlot == uint64(ps.FullBeaconState.Slot) {
func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot string) {
parentRoot := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block.ParentRoot)
if previousSlot == int(ps.FullBeaconState.Slot) {
log.WithFields(log.Fields{
"slot": ps.FullBeaconState.Slot,
"fork": true,
}).Warn("A fork occurred! The previous slot and current slot match.")
// mark old slot as forked.
} else if previousSlot-1 != uint64(ps.FullBeaconState.Slot) {
processReorg(ps.Db, strconv.Itoa(ps.Slot), ps.BlockRoot)
} else if previousSlot-1 != int(ps.FullBeaconState.Slot) {
log.WithFields(log.Fields{
"previousSlot": previousSlot,
"currentSlot": ps.FullBeaconState.Slot,
}).Error("We skipped a few slots.")
// Check to see if the slot was skipped.
// Call our batch processing function.
} else if previousBlockRoot != "0x"+hex.EncodeToString(ps.FullSignedBeaconBlock.Block.ParentRoot) {
} else if previousBlockRoot != parentRoot {
log.WithFields(log.Fields{
"previousBlockRoot": previousBlockRoot,
"currentBlockParent": ps.FullSignedBeaconBlock.Block.ParentRoot,
"currentBlockParent": parentRoot,
}).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.")
// Handle Forks
// Mark the previous slot in the DB as a fork.
processReorg(ps.Db, strconv.Itoa(previousSlot), parentRoot)
// Call our batch processing function.
// Continue with this slot.
} else {
log.Debug("Previous Slot and Current Slot are one distance from each other.")

View File

@ -13,7 +13,7 @@ import (
// A helper function to query endpoints that utilize slots.
func querySsz(endpoint string, slot string) ([]byte, int, error) {
log.WithFields(log.Fields{"endpoint": endpoint}).Info("Querying endpoint")
log.WithFields(log.Fields{"endpoint": endpoint}).Debug("Querying endpoint")
client := &http.Client{}
req, err := http.NewRequest("GET", endpoint, nil)
if err != nil {

22
pkg/loghelper/logreorg.go Normal file
View File

@ -0,0 +1,22 @@
package loghelper
import (
log "github.com/sirupsen/logrus"
)
// A simple helper function that will help wrap the reorg error messages.
func LogReorgError(slot string, latestBlockRoot string, err error) *log.Entry {
return log.WithFields(log.Fields{
"err": err,
"slot": slot,
"latestBlockRoot": latestBlockRoot,
})
}
// A simple helper function that will help wrap regular reorg messages.
func LogReorg(slot string, latestBlockRoot string) *log.Entry {
return log.WithFields(log.Fields{
"slot": slot,
"latestBlockRoot": latestBlockRoot,
})
}