7a242a4c71
* multihash key gen func * go mod updates * Added test to ensure the application shuts down gracefully or within a timeframe. * Disregard race condition since its with the test not the application itself * Capture the head block in the DB entirely. (#27) * -- Intermediary Commit -- Just want to commit my code over the weekend, in case I spill coffee on my workstation. * Create DB models ready for write. * Handle SSE events * Update ref for stack-orchestrator * Use env in one place only. * Boot Application on PR * Update syntax * Update syntax * Correct command * Use bash instead of sh * Use until instead of while * Make linter happy and check sse subscription err * Handle Reorgs - Untested * Feature/22 test handling incoming events - Intermediary Commit (#28) * Checkpoint before the weekend * Update location for SetupPostgresDB * Feature/22 test handling incoming events (#30) * Checkpoint before the weekend * Update location for SetupPostgresDB * Include first functioning tests for processing head * Fix gitignore * Test CaptureHead | Add Metrics | Handle Test Race Conditions This Commit allows us to: * Test the `CaptureHead` function. * Test parsing a single Head message. * Test a Reorg condition. * Add Metrics. This is primarily used for testing but can have future use cases. * Rearrange the test due to race conditions introduced by reusing a variable. `BeforeEach` can't be used to update `BC`. * Update and finalize testing at this stage * Update code and CI/CD * Fix lint errors * Update CICD and fail when file not found. * Update test to have failed as expected. * Remove Test file * Add KnownGaps Errors (#33) * Handle Skipped Slots (#34) * Ensure that the node is synced at boot time * Update test + add logic for checking skipped slots * Update boot check * Add skip_sync to config. * Update a test so it fails * go mod updates * Integrate MHKey into existing code base. * Update go.mod and go.sum * Utilize the MHkey * Stop tests from running forever on failure. * Use sszRoot instead of sszObj for MhKey * Update entrypoint script * Update config parameter Co-authored-by: Abdul Rabbani <abdulrabbani00@gmail.com> Co-authored-by: Abdul Rabbani <58230246+abdulrabbani00@users.noreply.github.com>
392 lines
14 KiB
Go
392 lines
14 KiB
Go
package beaconclient
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strconv"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
|
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
|
|
)
|
|
|
|
var (
|
|
// Statement to upsert to the ethcl.slots table.
|
|
UpsertSlotsStmt string = `
|
|
INSERT INTO ethcl.slots (epoch, slot, block_root, state_root, status)
|
|
VALUES ($1, $2, $3, $4, $5) ON CONFLICT (slot, block_root) DO NOTHING`
|
|
// Statement to upsert to the ethcl.signed_beacon_blocks table.
|
|
UpsertSignedBeaconBlockStmt string = `
|
|
INSERT INTO ethcl.signed_beacon_block (slot, block_root, parent_block_root, eth1_block_hash, mh_key)
|
|
VALUES ($1, $2, $3, $4, $5) ON CONFLICT (slot, block_root) DO NOTHING`
|
|
// Statement to upsert to the ethcl.beacon_state table.
|
|
UpsertBeaconState string = `
|
|
INSERT INTO ethcl.beacon_state (slot, state_root, mh_key)
|
|
VALUES ($1, $2, $3) ON CONFLICT (slot, state_root) DO NOTHING`
|
|
// Statement to upsert to the public.blocks table.
|
|
UpsertBlocksStmt string = `
|
|
INSERT INTO public.blocks (key, data)
|
|
VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`
|
|
UpdateForkedStmt string = `UPDATE ethcl.slots
|
|
SET status='forked'
|
|
WHERE slot=$1 AND block_root<>$2
|
|
RETURNING block_root;`
|
|
UpdateProposedStmt string = `UPDATE ethcl.slots
|
|
SET status='proposed'
|
|
WHERE slot=$1 AND block_root=$2
|
|
RETURNING block_root;`
|
|
CheckProposedStmt string = `SELECT slot, block_root
|
|
FROM ethcl.slots
|
|
WHERE slot=$1 AND block_root=$2;`
|
|
// Statement to insert known_gaps. We don't pass in timestamp, we let the server take care of that one.
|
|
UpsertKnownGapsStmt string = `
|
|
INSERT INTO ethcl.known_gaps (start_slot, end_slot, checked_out, reprocessing_error, entry_error, entry_process)
|
|
VALUES ($1, $2, $3, $4, $5, $6) on CONFLICT (start_slot, end_slot) DO NOTHING`
|
|
QueryHighestSlotStmt string = "SELECT COALESCE(MAX(slot), 0) FROM ethcl.slots"
|
|
)
|
|
|
|
// Put all functionality to prepare the write object
|
|
// And write it in this file.
|
|
// Remove any of it from the processslot file.
|
|
type DatabaseWriter struct {
|
|
Db sql.Database
|
|
Metrics *BeaconClientMetrics
|
|
DbSlots *DbSlots
|
|
DbSignedBeaconBlock *DbSignedBeaconBlock
|
|
DbBeaconState *DbBeaconState
|
|
rawBeaconState []byte
|
|
rawSignedBeaconBlock []byte
|
|
}
|
|
|
|
func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string,
|
|
eth1BlockHash string, status string, rawSignedBeaconBlock []byte, rawBeaconState []byte, metrics *BeaconClientMetrics) (*DatabaseWriter, error) {
|
|
dw := &DatabaseWriter{
|
|
Db: db,
|
|
rawBeaconState: rawBeaconState,
|
|
rawSignedBeaconBlock: rawSignedBeaconBlock,
|
|
Metrics: metrics,
|
|
}
|
|
dw.prepareSlotsModel(slot, stateRoot, blockRoot, status)
|
|
err := dw.prepareSignedBeaconBlockModel(slot, blockRoot, parentBlockRoot, eth1BlockHash)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = dw.prepareBeaconStateModel(slot, stateRoot)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return dw, err
|
|
}
|
|
|
|
// Write functions to write each all together...
|
|
// Should I do one atomic write?
|
|
// Create the model for the ethcl.slots table
|
|
func (dw *DatabaseWriter) prepareSlotsModel(slot int, stateRoot string, blockRoot string, status string) {
|
|
dw.DbSlots = &DbSlots{
|
|
Epoch: calculateEpoch(slot, bcSlotsPerEpoch),
|
|
Slot: strconv.Itoa(slot),
|
|
StateRoot: stateRoot,
|
|
BlockRoot: blockRoot,
|
|
Status: status,
|
|
}
|
|
log.Debug("dw.DbSlots: ", dw.DbSlots)
|
|
|
|
}
|
|
|
|
// Create the model for the ethcl.signed_beacon_block table.
|
|
func (dw *DatabaseWriter) prepareSignedBeaconBlockModel(slot int, blockRoot string, parentBlockRoot string, eth1BlockHash string) error {
|
|
mhKey, err := MultihashKeyFromSSZRoot([]byte(dw.DbSlots.BlockRoot))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
dw.DbSignedBeaconBlock = &DbSignedBeaconBlock{
|
|
Slot: strconv.Itoa(slot),
|
|
BlockRoot: blockRoot,
|
|
ParentBlock: parentBlockRoot,
|
|
Eth1BlockHash: eth1BlockHash,
|
|
MhKey: mhKey,
|
|
}
|
|
log.Debug("dw.DbSignedBeaconBlock: ", dw.DbSignedBeaconBlock)
|
|
return nil
|
|
}
|
|
|
|
// Create the model for the ethcl.beacon_state table.
|
|
func (dw *DatabaseWriter) prepareBeaconStateModel(slot int, stateRoot string) error {
|
|
mhKey, err := MultihashKeyFromSSZRoot([]byte(dw.DbSlots.StateRoot))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
dw.DbBeaconState = &DbBeaconState{
|
|
Slot: strconv.Itoa(slot),
|
|
StateRoot: stateRoot,
|
|
MhKey: mhKey,
|
|
}
|
|
log.Debug("dw.DbBeaconState: ", dw.DbBeaconState)
|
|
return nil
|
|
}
|
|
|
|
// Write all the data for a given slot.
|
|
func (dw *DatabaseWriter) writeFullSlot() error {
|
|
// Add errors for each function call
|
|
// If an error occurs, write to knownGaps table.
|
|
err := dw.writeSlots()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if dw.DbSlots.Status != "skipped" {
|
|
err = dw.writeSignedBeaconBlocks()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = dw.writeBeaconState()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
dw.Metrics.IncrementHeadTrackingInserts(1)
|
|
return nil
|
|
}
|
|
|
|
// Write the information for the generic slots table. For now this is only one function.
|
|
// But in the future if we need to incorporate any FK's or perform any actions to write to the
|
|
// slots table we can do it all here.
|
|
func (dw *DatabaseWriter) writeSlots() error {
|
|
return dw.upsertSlots()
|
|
}
|
|
|
|
// Upsert to the ethcl.slots table.
|
|
func (dw *DatabaseWriter) upsertSlots() error {
|
|
_, err := dw.Db.Exec(context.Background(), UpsertSlotsStmt, dw.DbSlots.Epoch, dw.DbSlots.Slot, dw.DbSlots.BlockRoot, dw.DbSlots.StateRoot, dw.DbSlots.Status)
|
|
if err != nil {
|
|
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.slots table")
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Write the information for the signed_beacon_block.
|
|
func (dw *DatabaseWriter) writeSignedBeaconBlocks() error {
|
|
err := dw.upsertPublicBlocks(dw.DbSignedBeaconBlock.MhKey, dw.rawSignedBeaconBlock)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = dw.upsertSignedBeaconBlock()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Upsert to public.blocks.
|
|
func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) error {
|
|
_, err := dw.Db.Exec(context.Background(), UpsertBlocksStmt, key, data)
|
|
if err != nil {
|
|
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the public.blocks table")
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Upsert to the ethcl.signed_beacon_block table.
|
|
func (dw *DatabaseWriter) upsertSignedBeaconBlock() error {
|
|
_, err := dw.Db.Exec(context.Background(), UpsertSignedBeaconBlockStmt, dw.DbSignedBeaconBlock.Slot, dw.DbSignedBeaconBlock.BlockRoot, dw.DbSignedBeaconBlock.ParentBlock, dw.DbSignedBeaconBlock.Eth1BlockHash, dw.DbSignedBeaconBlock.MhKey)
|
|
if err != nil {
|
|
loghelper.LogSlotError(dw.DbSlots.Slot, err).WithFields(log.Fields{"block_root": dw.DbSignedBeaconBlock.BlockRoot}).Error("Unable to write to the slot to the ethcl.signed_beacon_block table")
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Write the information for the beacon_state.
|
|
func (dw *DatabaseWriter) writeBeaconState() error {
|
|
err := dw.upsertPublicBlocks(dw.DbBeaconState.MhKey, dw.rawBeaconState)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = dw.upsertBeaconState()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Upsert to the ethcl.beacon_state table.
|
|
func (dw *DatabaseWriter) upsertBeaconState() error {
|
|
_, err := dw.Db.Exec(context.Background(), UpsertBeaconState, dw.DbBeaconState.Slot, dw.DbBeaconState.StateRoot, dw.DbBeaconState.MhKey)
|
|
if err != nil {
|
|
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.beacon_state table")
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Update a given slot to be marked as forked. Provide the slot and the latest latestBlockRoot.
|
|
// We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked.
|
|
func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) {
|
|
slotNum, strErr := strconv.Atoi(slot)
|
|
if strErr != nil {
|
|
loghelper.LogReorgError(slot, latestBlockRoot, strErr).Error("We can't convert the slot to an int...")
|
|
}
|
|
|
|
forkCount, err := updateForked(db, slot, latestBlockRoot)
|
|
if err != nil {
|
|
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while updating all forks.")
|
|
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
|
|
}
|
|
proposedCount, err := updateProposed(db, slot, latestBlockRoot)
|
|
if err != nil {
|
|
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while trying to update the proposed slot.")
|
|
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
|
|
}
|
|
|
|
if forkCount > 0 {
|
|
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
|
|
"forkCount": forkCount,
|
|
}).Info("Updated rows that were forked.")
|
|
} else {
|
|
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
|
|
"forkCount": forkCount,
|
|
}).Warn("There were no forked rows to update.")
|
|
}
|
|
|
|
if proposedCount == 1 {
|
|
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
|
|
"proposedCount": proposedCount,
|
|
}).Info("Updated the row that should have been marked as proposed.")
|
|
} else if proposedCount > 1 {
|
|
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
|
|
"proposedCount": proposedCount,
|
|
}).Error("Too many rows were marked as proposed!")
|
|
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
|
|
} else if proposedCount == 0 {
|
|
var count int
|
|
err := db.QueryRow(context.Background(), CheckProposedStmt, slot, latestBlockRoot).Scan(count)
|
|
if err != nil {
|
|
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to query proposed rows after reorg.")
|
|
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
|
|
}
|
|
if count != 1 {
|
|
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
|
|
"proposedCount": count,
|
|
}).Warn("The proposed block was not marked as proposed...")
|
|
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg")
|
|
} else {
|
|
loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.")
|
|
}
|
|
}
|
|
|
|
metrics.IncrementHeadTrackingReorgs(1)
|
|
}
|
|
|
|
// Update the slots table by marking the old slot's as forked.
|
|
func updateForked(db sql.Database, slot string, latestBlockRoot string) (int64, error) {
|
|
res, err := db.Exec(context.Background(), UpdateForkedStmt, slot, latestBlockRoot)
|
|
if err != nil {
|
|
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with the forked slots")
|
|
return 0, err
|
|
}
|
|
count, err := res.RowsAffected()
|
|
if err != nil {
|
|
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to figure out how many entries were marked as forked.")
|
|
return 0, err
|
|
}
|
|
return count, err
|
|
}
|
|
|
|
func updateProposed(db sql.Database, slot string, latestBlockRoot string) (int64, error) {
|
|
res, err := db.Exec(context.Background(), UpdateProposedStmt, slot, latestBlockRoot)
|
|
if err != nil {
|
|
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with the proposed slot.")
|
|
return 0, err
|
|
}
|
|
count, err := res.RowsAffected()
|
|
if err != nil {
|
|
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to figure out how many entries were marked as proposed")
|
|
return 0, err
|
|
}
|
|
|
|
return count, err
|
|
}
|
|
|
|
// A wrapper function to call upsertKnownGaps. This function will break down the range of known_gaos into
|
|
// smaller chunks. For example, instead of having an entry of 1-101, if we increment the entries by 10 slots, we would
|
|
// have 10 entries as follows: 1-10, 11-20, etc...
|
|
func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot int, entryError error, entryProcess string) {
|
|
if endSlot-startSlot <= tableIncrement {
|
|
kgModel := DbKnownGaps{
|
|
StartSlot: strconv.Itoa(startSlot),
|
|
EndSlot: strconv.Itoa(endSlot),
|
|
CheckedOut: false,
|
|
ReprocessingError: "",
|
|
EntryError: entryError.Error(),
|
|
EntryProcess: entryProcess,
|
|
}
|
|
upsertKnownGaps(db, kgModel)
|
|
}
|
|
totalSlots := endSlot - startSlot
|
|
var chunks int
|
|
chunks = totalSlots / tableIncrement
|
|
if totalSlots%tableIncrement != 0 {
|
|
chunks = chunks + 1
|
|
}
|
|
|
|
for i := 0; i < chunks; i++ {
|
|
var tempStart, tempEnd int
|
|
tempStart = startSlot + (i * tableIncrement)
|
|
if i+1 == chunks {
|
|
tempEnd = endSlot
|
|
} else {
|
|
tempEnd = startSlot + ((i + 1) * tableIncrement)
|
|
}
|
|
kgModel := DbKnownGaps{
|
|
StartSlot: strconv.Itoa(tempStart),
|
|
EndSlot: strconv.Itoa(tempEnd),
|
|
CheckedOut: false,
|
|
ReprocessingError: "",
|
|
EntryError: entryError.Error(),
|
|
EntryProcess: entryProcess,
|
|
}
|
|
upsertKnownGaps(db, kgModel)
|
|
}
|
|
|
|
}
|
|
|
|
// A function to upsert a single entry to the ethcl.known_gaps table.
|
|
func upsertKnownGaps(db sql.Database, knModel DbKnownGaps) {
|
|
_, err := db.Exec(context.Background(), UpsertKnownGapsStmt, knModel.StartSlot, knModel.EndSlot,
|
|
knModel.CheckedOut, knModel.ReprocessingError, knModel.EntryError, knModel.EntryProcess)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"err": err,
|
|
"startSlot": knModel.StartSlot,
|
|
"endSlot": knModel.EndSlot,
|
|
}).Fatal("We are unable to write to the ethcl.known_gaps table!!! We will stop the application because of that.")
|
|
}
|
|
log.WithFields(log.Fields{
|
|
"startSlot": knModel.StartSlot,
|
|
"endSlot": knModel.EndSlot,
|
|
}).Warn("A new gap has been added to the ethcl.known_gaps table.")
|
|
}
|
|
|
|
// A function to write the gap between the highest slot in the DB and the first processed slot.
|
|
func writeStartUpGaps(db sql.Database, tableIncrement int, firstSlot int) {
|
|
var maxSlot int
|
|
err := db.QueryRow(context.Background(), QueryHighestSlotStmt).Scan(&maxSlot)
|
|
if err != nil {
|
|
loghelper.LogError(err).Fatal("Unable to get the max block from the DB. We must close the application or we might have undetected gaps.")
|
|
}
|
|
|
|
if err != nil {
|
|
loghelper.LogError(err).WithFields(log.Fields{
|
|
"maxSlot": maxSlot,
|
|
}).Fatal("Unable to get convert max block from DB to int. We must close the application or we might have undetected gaps.")
|
|
}
|
|
writeKnownGaps(db, tableIncrement, maxSlot, firstSlot, fmt.Errorf(""), "startup")
|
|
}
|
|
|
|
// A quick helper function to calculate the epoch.
|
|
func calculateEpoch(slot int, slotPerEpoch int) string {
|
|
epoch := slot / slotPerEpoch
|
|
return strconv.Itoa(epoch)
|
|
}
|