Use a transaction for writing, knowngaps, and reorgs #53

Merged
abdulrabbani00 merged 4 commits from feature/52-database-write-improvements into develop 2022-06-06 13:02:43 +00:00
3 changed files with 130 additions and 53 deletions
Showing only changes of commit 11dce3db34 - Show all commits

View File

@ -20,10 +20,10 @@ import (
"fmt" "fmt"
"strconv" "strconv"
"github.com/jackc/pgx/v4"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
"golang.org/x/sync/errgroup"
) )
var ( var (
@ -75,6 +75,8 @@ VALUES ($1, $2, $3, $4, $5, $6) on CONFLICT (start_slot, end_slot) DO NOTHING`
// Remove any of it from the processslot file. // Remove any of it from the processslot file.
type DatabaseWriter struct { type DatabaseWriter struct {
Db sql.Database Db sql.Database
Tx sql.Tx
Ctx context.Context
Metrics *BeaconClientMetrics Metrics *BeaconClientMetrics
DbSlots *DbSlots DbSlots *DbSlots
DbSignedBeaconBlock *DbSignedBeaconBlock DbSignedBeaconBlock *DbSignedBeaconBlock
@ -85,14 +87,21 @@ type DatabaseWriter struct {
func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string, func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string,
eth1BlockHash string, status string, rawSignedBeaconBlock []byte, rawBeaconState []byte, metrics *BeaconClientMetrics) (*DatabaseWriter, error) { eth1BlockHash string, status string, rawSignedBeaconBlock []byte, rawBeaconState []byte, metrics *BeaconClientMetrics) (*DatabaseWriter, error) {
ctx := context.Background()
tx, err := db.Begin(ctx)
if err != nil {
loghelper.LogError(err).Error("We are unable to Begin a SQL transaction")
}
dw := &DatabaseWriter{ dw := &DatabaseWriter{
Db: db, Db: db,
Tx: tx,
Ctx: ctx,
rawBeaconState: rawBeaconState, rawBeaconState: rawBeaconState,
rawSignedBeaconBlock: rawSignedBeaconBlock, rawSignedBeaconBlock: rawSignedBeaconBlock,
Metrics: metrics, Metrics: metrics,
} }
dw.prepareSlotsModel(slot, stateRoot, blockRoot, status) dw.prepareSlotsModel(slot, stateRoot, blockRoot, status)
err := dw.prepareSignedBeaconBlockModel(slot, blockRoot, parentBlockRoot, eth1BlockHash) err = dw.prepareSignedBeaconBlockModel(slot, blockRoot, parentBlockRoot, eth1BlockHash)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -150,28 +159,40 @@ func (dw *DatabaseWriter) prepareBeaconStateModel(slot int, stateRoot string) er
return nil return nil
} }
// Write all the data for a given slot. // Add all the data for a given slot to a SQL transaction.
func (dw *DatabaseWriter) writeFullSlot() error { // Originally it wrote to each table individually.
func (dw *DatabaseWriter) transactFullSlot() error {
// If an error occurs, write to knownGaps table. // If an error occurs, write to knownGaps table.
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"slot": dw.DbSlots.Slot, "slot": dw.DbSlots.Slot,
}).Debug("Starting to write to the DB.") }).Debug("Starting to write to the DB.")
err := dw.writeSlots() err := dw.transactSlots()
if err != nil { if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl.slots table...") loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl.slots table...")
return err return err
} }
log.Debug("We finished writing to the ethcl.slots table.") log.Debug("We finished writing to the ethcl.slots table.")
if dw.DbSlots.Status != "skipped" { if dw.DbSlots.Status != "skipped" {
errG, _ := errgroup.WithContext(context.Background()) //errG, _ := errgroup.WithContext(context.Background())
errG.Go(func() error { //errG.Go(func() error {
return dw.writeSignedBeaconBlocks() // return dw.transactSignedBeaconBlocks()
}) //})
errG.Go(func() error { //errG.Go(func() error {
return dw.writeBeaconState() // return dw.transactBeaconState()
}) //})
if err := errG.Wait(); err != nil { //if err := errG.Wait(); err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl block or state table...") // loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl block or state table...")
// return err
//}
// Might want to seperate writing to public.blocks so we can do this concurrently...
err := dw.transactSignedBeaconBlocks()
if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl block table...")
return err
}
err = dw.transactBeaconState()
if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl state table...")
return err return err
} }
} }
@ -179,16 +200,16 @@ func (dw *DatabaseWriter) writeFullSlot() error {
return nil return nil
} }
// Write the information for the generic slots table. For now this is only one function. // Add data for the ethcl.slots table to a transaction. For now this is only one function.
// But in the future if we need to incorporate any FK's or perform any actions to write to the // But in the future if we need to incorporate any FK's or perform any actions to write to the
// slots table we can do it all here. // slots table we can do it all here.
func (dw *DatabaseWriter) writeSlots() error { func (dw *DatabaseWriter) transactSlots() error {
return dw.upsertSlots() return dw.upsertSlots()
} }
// Upsert to the ethcl.slots table. // Upsert to the ethcl.slots table.
func (dw *DatabaseWriter) upsertSlots() error { func (dw *DatabaseWriter) upsertSlots() error {
_, err := dw.Db.Exec(context.Background(), UpsertSlotsStmt, dw.DbSlots.Epoch, dw.DbSlots.Slot, dw.DbSlots.BlockRoot, dw.DbSlots.StateRoot, dw.DbSlots.Status) _, err := dw.Tx.Exec(dw.Ctx, UpsertSlotsStmt, dw.DbSlots.Epoch, dw.DbSlots.Slot, dw.DbSlots.BlockRoot, dw.DbSlots.StateRoot, dw.DbSlots.Status)
if err != nil { if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.slots table") loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.slots table")
return err return err
@ -196,8 +217,8 @@ func (dw *DatabaseWriter) upsertSlots() error {
return nil return nil
} }
// Write the information for the signed_beacon_block. // Add the information for the signed_beacon_block to a transaction.
func (dw *DatabaseWriter) writeSignedBeaconBlocks() error { func (dw *DatabaseWriter) transactSignedBeaconBlocks() error {
err := dw.upsertPublicBlocks(dw.DbSignedBeaconBlock.MhKey, dw.rawSignedBeaconBlock) err := dw.upsertPublicBlocks(dw.DbSignedBeaconBlock.MhKey, dw.rawSignedBeaconBlock)
if err != nil { if err != nil {
return err return err
@ -211,7 +232,7 @@ func (dw *DatabaseWriter) writeSignedBeaconBlocks() error {
// Upsert to public.blocks. // Upsert to public.blocks.
func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) error { func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) error {
_, err := dw.Db.Exec(context.Background(), UpsertBlocksStmt, key, data) _, err := dw.Tx.Exec(dw.Ctx, UpsertBlocksStmt, key, data)
if err != nil { if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the public.blocks table") loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the public.blocks table")
return err return err
@ -221,7 +242,7 @@ func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) error {
// Upsert to the ethcl.signed_beacon_block table. // Upsert to the ethcl.signed_beacon_block table.
func (dw *DatabaseWriter) upsertSignedBeaconBlock() error { func (dw *DatabaseWriter) upsertSignedBeaconBlock() error {
_, err := dw.Db.Exec(context.Background(), UpsertSignedBeaconBlockStmt, dw.DbSignedBeaconBlock.Slot, dw.DbSignedBeaconBlock.BlockRoot, dw.DbSignedBeaconBlock.ParentBlock, dw.DbSignedBeaconBlock.Eth1BlockHash, dw.DbSignedBeaconBlock.MhKey) _, err := dw.Tx.Exec(dw.Ctx, UpsertSignedBeaconBlockStmt, dw.DbSignedBeaconBlock.Slot, dw.DbSignedBeaconBlock.BlockRoot, dw.DbSignedBeaconBlock.ParentBlock, dw.DbSignedBeaconBlock.Eth1BlockHash, dw.DbSignedBeaconBlock.MhKey)
if err != nil { if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).WithFields(log.Fields{"block_root": dw.DbSignedBeaconBlock.BlockRoot}).Error("Unable to write to the slot to the ethcl.signed_beacon_block table") loghelper.LogSlotError(dw.DbSlots.Slot, err).WithFields(log.Fields{"block_root": dw.DbSignedBeaconBlock.BlockRoot}).Error("Unable to write to the slot to the ethcl.signed_beacon_block table")
return err return err
@ -229,8 +250,8 @@ func (dw *DatabaseWriter) upsertSignedBeaconBlock() error {
return nil return nil
} }
// Write the information for the beacon_state. // Add the information for the beacon_state to a transaction.
func (dw *DatabaseWriter) writeBeaconState() error { func (dw *DatabaseWriter) transactBeaconState() error {
err := dw.upsertPublicBlocks(dw.DbBeaconState.MhKey, dw.rawBeaconState) err := dw.upsertPublicBlocks(dw.DbBeaconState.MhKey, dw.rawBeaconState)
if err != nil { if err != nil {
return err return err
@ -244,7 +265,7 @@ func (dw *DatabaseWriter) writeBeaconState() error {
// Upsert to the ethcl.beacon_state table. // Upsert to the ethcl.beacon_state table.
func (dw *DatabaseWriter) upsertBeaconState() error { func (dw *DatabaseWriter) upsertBeaconState() error {
_, err := dw.Db.Exec(context.Background(), UpsertBeaconState, dw.DbBeaconState.Slot, dw.DbBeaconState.StateRoot, dw.DbBeaconState.MhKey) _, err := dw.Tx.Exec(dw.Ctx, UpsertBeaconState, dw.DbBeaconState.Slot, dw.DbBeaconState.StateRoot, dw.DbBeaconState.MhKey)
if err != nil { if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.beacon_state table") loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.beacon_state table")
return err return err
@ -252,23 +273,23 @@ func (dw *DatabaseWriter) upsertBeaconState() error {
return nil return nil
} }
// Update a given slot to be marked as forked. Provide the slot and the latest latestBlockRoot. // Update a given slot to be marked as forked within a transaction. Provide the slot and the latest latestBlockRoot.
// We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked. // We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked.
func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) { func transactReorgs(db sql.Database, tx sql.Tx, ctx context.Context, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) {
slotNum, strErr := strconv.Atoi(slot) slotNum, strErr := strconv.Atoi(slot)
if strErr != nil { if strErr != nil {
loghelper.LogReorgError(slot, latestBlockRoot, strErr).Error("We can't convert the slot to an int...") loghelper.LogReorgError(slot, latestBlockRoot, strErr).Error("We can't convert the slot to an int...")
} }
forkCount, err := updateForked(db, slot, latestBlockRoot) forkCount, err := updateForked(tx, ctx, slot, latestBlockRoot)
if err != nil { if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while updating all forks.") loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while updating all forks.")
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics) transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics)
} }
proposedCount, err := updateProposed(db, slot, latestBlockRoot) proposedCount, err := updateProposed(tx, ctx, slot, latestBlockRoot)
if err != nil { if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while trying to update the proposed slot.") loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while trying to update the proposed slot.")
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics) transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics)
} }
if forkCount > 0 { if forkCount > 0 {
@ -289,18 +310,18 @@ func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{ loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"proposedCount": proposedCount, "proposedCount": proposedCount,
}).Error("Too many rows were marked as proposed!") }).Error("Too many rows were marked as proposed!")
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics) transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics)
} else if proposedCount == 0 { } else if proposedCount == 0 {
var count int var count int
err := db.QueryRow(context.Background(), CheckProposedStmt, slot, latestBlockRoot).Scan(count) err := db.QueryRow(context.Background(), CheckProposedStmt, slot, latestBlockRoot).Scan(count)
if err != nil { if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to query proposed rows after reorg.") loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to query proposed rows after reorg.")
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics) transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics)
} else if count != 1 { } else if count != 1 {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{ loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"proposedCount": count, "proposedCount": count,
}).Warn("The proposed block was not marked as proposed...") }).Warn("The proposed block was not marked as proposed...")
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics) transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics)
} else { } else {
loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.") loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.")
} }
@ -309,9 +330,28 @@ func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *
metrics.IncrementReorgsInsert(1) metrics.IncrementReorgsInsert(1)
} }
// Wrapper function that will create a transaction and execute the function.
func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) {
ctx := context.Background()
tx, err := db.Begin(ctx)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Fatal("Unable to create a new transaction for reorgs")
}
defer func() {
err := tx.Rollback(ctx)
if err != nil && err != pgx.ErrTxClosed {
loghelper.LogError(err).Error("We were unable to Rollback a transaction for reorgs")
}
}()
transactReorgs(db, tx, ctx, slot, latestBlockRoot, metrics)
if err = tx.Commit(ctx); err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Fatal("Unable to execute the transaction for reorgs")
}
}
// Update the slots table by marking the old slot's as forked. // Update the slots table by marking the old slot's as forked.
func updateForked(db sql.Database, slot string, latestBlockRoot string) (int64, error) { func updateForked(tx sql.Tx, ctx context.Context, slot string, latestBlockRoot string) (int64, error) {
res, err := db.Exec(context.Background(), UpdateForkedStmt, slot, latestBlockRoot) res, err := tx.Exec(ctx, UpdateForkedStmt, slot, latestBlockRoot)
if err != nil { if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with the forked slots") loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with the forked slots")
return 0, err return 0, err
@ -324,8 +364,9 @@ func updateForked(db sql.Database, slot string, latestBlockRoot string) (int64,
return count, err return count, err
} }
func updateProposed(db sql.Database, slot string, latestBlockRoot string) (int64, error) { // Mark a slot as proposed.
res, err := db.Exec(context.Background(), UpdateProposedStmt, slot, latestBlockRoot) func updateProposed(tx sql.Tx, ctx context.Context, slot string, latestBlockRoot string) (int64, error) {
res, err := tx.Exec(ctx, UpdateProposedStmt, slot, latestBlockRoot)
if err != nil { if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with the proposed slot.") loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with the proposed slot.")
return 0, err return 0, err
@ -339,10 +380,10 @@ func updateProposed(db sql.Database, slot string, latestBlockRoot string) (int64
return count, err return count, err
} }
// A wrapper function to call upsertKnownGaps. This function will break down the range of known_gaos into // A wrapper function to call upsertKnownGaps. This function will break down the range of known_gaps into
// smaller chunks. For example, instead of having an entry of 1-101, if we increment the entries by 10 slots, we would // smaller chunks. For example, instead of having an entry of 1-101, if we increment the entries by 10 slots, we would
// have 10 entries as follows: 1-10, 11-20, etc... // have 10 entries as follows: 1-10, 11-20, etc...
func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot int, entryError error, entryProcess string, metric *BeaconClientMetrics) { func transactKnownGaps(tx sql.Tx, ctx context.Context, tableIncrement int, startSlot int, endSlot int, entryError error, entryProcess string, metric *BeaconClientMetrics) {
if endSlot-startSlot <= tableIncrement { if endSlot-startSlot <= tableIncrement {
kgModel := DbKnownGaps{ kgModel := DbKnownGaps{
StartSlot: strconv.Itoa(startSlot), StartSlot: strconv.Itoa(startSlot),
@ -352,7 +393,7 @@ func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot
EntryError: entryError.Error(), EntryError: entryError.Error(),
EntryProcess: entryProcess, EntryProcess: entryProcess,
} }
upsertKnownGaps(db, kgModel, metric) upsertKnownGaps(tx, ctx, kgModel, metric)
} else { } else {
totalSlots := endSlot - startSlot totalSlots := endSlot - startSlot
var chunks int var chunks int
@ -377,15 +418,34 @@ func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot
EntryError: entryError.Error(), EntryError: entryError.Error(),
EntryProcess: entryProcess, EntryProcess: entryProcess,
} }
upsertKnownGaps(db, kgModel, metric) upsertKnownGaps(tx, ctx, kgModel, metric)
} }
} }
}
// Wrapper function, instead of adding the knownGaps entries to a transaction, it will
// create the transaction and write it.
func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot int, entryError error, entryProcess string, metric *BeaconClientMetrics) {
ctx := context.Background()
tx, err := db.Begin(ctx)
if err != nil {
loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).Fatal("Unable to create a new transaction for knownGaps")
}
defer func() {
err := tx.Rollback(ctx)
if err != nil && err != pgx.ErrTxClosed {
loghelper.LogError(err).Error("We were unable to Rollback a transaction for reorgs")
}
}()
transactKnownGaps(tx, ctx, tableIncrement, startSlot, endSlot, entryError, entryProcess, metric)
if err = tx.Commit(ctx); err != nil {
loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).Fatal("Unable to execute the transaction for knownGaps")
}
} }
// A function to upsert a single entry to the ethcl.known_gaps table. // A function to upsert a single entry to the ethcl.known_gaps table.
func upsertKnownGaps(db sql.Database, knModel DbKnownGaps, metric *BeaconClientMetrics) { func upsertKnownGaps(tx sql.Tx, ctx context.Context, knModel DbKnownGaps, metric *BeaconClientMetrics) {
_, err := db.Exec(context.Background(), UpsertKnownGapsStmt, knModel.StartSlot, knModel.EndSlot, _, err := tx.Exec(ctx, UpsertKnownGapsStmt, knModel.StartSlot, knModel.EndSlot,
knModel.CheckedOut, knModel.ReprocessingError, knModel.EntryError, knModel.EntryProcess) knModel.CheckedOut, knModel.ReprocessingError, knModel.EntryError, knModel.EntryProcess)
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{

View File

@ -150,7 +150,7 @@ func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRow
} }
defer func() { defer func() {
err := tx.Rollback(ctx) err := tx.Rollback(ctx)
if err != nil { if err != nil && err != pgx.ErrTxClosed {
loghelper.LogError(err).Error("We were unable to Rollback a transaction") loghelper.LogError(err).Error("We were unable to Rollback a transaction")
errCount = append(errCount, err) errCount = append(errCount, err)
} }

View File

@ -26,6 +26,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"github.com/jackc/pgx/v4"
si "github.com/prysmaticlabs/prysm/consensus-types/interfaces" si "github.com/prysmaticlabs/prysm/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/consensus-types/wrapper" "github.com/prysmaticlabs/prysm/consensus-types/wrapper"
dt "github.com/prysmaticlabs/prysm/encoding/ssz/detect" dt "github.com/prysmaticlabs/prysm/encoding/ssz/detect"
@ -79,7 +80,7 @@ type ProcessSlot struct {
// This function will do all the work to process the slot and write it to the DB. // This function will do all the work to process the slot and write it to the DB.
// It will return the error and error process. The error process is used for providing reach detail to the // It will return the error and error process. The error process is used for providing reach detail to the
// known_gaps table. // known_gaps table.
func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) (error, string) { func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) (error, string) {
ps := &ProcessSlot{ ps := &ProcessSlot{
Slot: slot, Slot: slot,
BlockRoot: blockRoot, BlockRoot: blockRoot,
@ -89,6 +90,10 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
Metrics: metrics, Metrics: metrics,
} }
if checkDb {
// Check the DB to see if this slot exists.
}
g, _ := errgroup.WithContext(context.Background()) g, _ := errgroup.WithContext(context.Background())
vUnmarshalerCh := make(chan *dt.VersionedUnmarshaler, 1) vUnmarshalerCh := make(chan *dt.VersionedUnmarshaler, 1)
@ -120,7 +125,13 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
return err, "blockRoot" return err, "blockRoot"
} }
// Write the object to the DB. // Write the object to the DB.
err = dw.writeFullSlot() defer func() {
err := dw.Tx.Rollback(dw.Ctx)
if err != nil && err != pgx.ErrTxClosed {
loghelper.LogError(err).Error("We were unable to Rollback a transaction")
}
}()
err = dw.transactFullSlot()
if err != nil { if err != nil {
return err, "processSlot" return err, "processSlot"
} }
@ -131,8 +142,14 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
return fmt.Errorf("headOrHistoric must be either historic or head!"), "" return fmt.Errorf("headOrHistoric must be either historic or head!"), ""
} }
if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" && ps.Status != "skipped" { if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" && ps.Status != "skipped" {
ps.checkPreviousSlot(previousSlot, previousBlockRoot, knownGapsTableIncrement) ps.checkPreviousSlot(dw.Tx, dw.Ctx, previousSlot, previousBlockRoot, knownGapsTableIncrement)
} }
// Commit the transaction
if err = dw.Tx.Commit(dw.Ctx); err != nil {
return err, "transactionCommit"
}
return nil, "" return nil, ""
} }
@ -142,7 +159,7 @@ func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot
if previousSlot == 0 && previousBlockRoot == "" { if previousSlot == 0 && previousBlockRoot == "" {
writeStartUpGaps(db, knownGapsTableIncrement, slot, metrics) writeStartUpGaps(db, knownGapsTableIncrement, slot, metrics)
} }
err, errReason := processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement) err, errReason := processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement, true)
if err != nil { if err != nil {
writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics) writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics)
} }
@ -150,7 +167,7 @@ func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot
// Handle a historic slot. A wrapper function for calling `handleFullSlot`. // Handle a historic slot. A wrapper function for calling `handleFullSlot`.
func handleHistoricSlot(db sql.Database, serverAddress string, slot int, metrics *BeaconClientMetrics) (error, string) { func handleHistoricSlot(db sql.Database, serverAddress string, slot int, metrics *BeaconClientMetrics) (error, string) {
return processFullSlot(db, serverAddress, slot, "", "", 0, "", "historic", metrics, 1) return processFullSlot(db, serverAddress, slot, "", "", 0, "", "historic", metrics, 1, true)
} }
// Update the SszSignedBeaconBlock and FullSignedBeaconBlock object with their respective values. // Update the SszSignedBeaconBlock and FullSignedBeaconBlock object with their respective values.
@ -232,14 +249,14 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string, vmCh chan<- *dt.Ver
} }
// Check to make sure that the previous block we processed is the parent of the current block. // Check to make sure that the previous block we processed is the parent of the current block.
func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot string, knownGapsTableIncrement int) { func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previousSlot int, previousBlockRoot string, knownGapsTableIncrement int) {
parentRoot := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().ParentRoot()) parentRoot := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().ParentRoot())
if previousSlot == int(ps.FullBeaconState.Slot()) { if previousSlot == int(ps.FullBeaconState.Slot()) {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"slot": ps.FullBeaconState.Slot(), "slot": ps.FullBeaconState.Slot(),
"fork": true, "fork": true,
}).Warn("A fork occurred! The previous slot and current slot match.") }).Warn("A fork occurred! The previous slot and current slot match.")
writeReorgs(ps.Db, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics) transactReorgs(ps.Db, tx, ctx, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics)
} else if previousSlot > int(ps.FullBeaconState.Slot()) { } else if previousSlot > int(ps.FullBeaconState.Slot()) {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"previousSlot": previousSlot, "previousSlot": previousSlot,
@ -250,13 +267,13 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
"previousSlot": previousSlot, "previousSlot": previousSlot,
"currentSlot": ps.FullBeaconState.Slot(), "currentSlot": ps.FullBeaconState.Slot(),
}).Error("We skipped a few slots.") }).Error("We skipped a few slots.")
writeKnownGaps(ps.Db, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot())-1, fmt.Errorf("Gaps during head processing"), "headGaps", ps.Metrics) transactKnownGaps(tx, ctx, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot())-1, fmt.Errorf("Gaps during head processing"), "headGaps", ps.Metrics)
} else if previousBlockRoot != parentRoot { } else if previousBlockRoot != parentRoot {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"previousBlockRoot": previousBlockRoot, "previousBlockRoot": previousBlockRoot,
"currentBlockParent": parentRoot, "currentBlockParent": parentRoot,
}).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.") }).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.")
writeReorgs(ps.Db, strconv.Itoa(previousSlot), parentRoot, ps.Metrics) transactReorgs(ps.Db, tx, ctx, strconv.Itoa(previousSlot), parentRoot, ps.Metrics)
} else { } else {
log.Debug("Previous Slot and Current Slot are one distance from each other.") log.Debug("Previous Slot and Current Slot are one distance from each other.")
} }