diff --git a/pkg/beaconclient/databasewrite.go b/pkg/beaconclient/databasewrite.go index fd0ac6c..5e51fb8 100644 --- a/pkg/beaconclient/databasewrite.go +++ b/pkg/beaconclient/databasewrite.go @@ -20,10 +20,10 @@ import ( "fmt" "strconv" + "github.com/jackc/pgx/v4" log "github.com/sirupsen/logrus" "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" - "golang.org/x/sync/errgroup" ) var ( @@ -75,6 +75,8 @@ VALUES ($1, $2, $3, $4, $5, $6) on CONFLICT (start_slot, end_slot) DO NOTHING` // Remove any of it from the processslot file. type DatabaseWriter struct { Db sql.Database + Tx sql.Tx + Ctx context.Context Metrics *BeaconClientMetrics DbSlots *DbSlots DbSignedBeaconBlock *DbSignedBeaconBlock @@ -85,14 +87,21 @@ type DatabaseWriter struct { func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string, eth1BlockHash string, status string, rawSignedBeaconBlock []byte, rawBeaconState []byte, metrics *BeaconClientMetrics) (*DatabaseWriter, error) { + ctx := context.Background() + tx, err := db.Begin(ctx) + if err != nil { + loghelper.LogError(err).Error("We are unable to Begin a SQL transaction") + } dw := &DatabaseWriter{ Db: db, + Tx: tx, + Ctx: ctx, rawBeaconState: rawBeaconState, rawSignedBeaconBlock: rawSignedBeaconBlock, Metrics: metrics, } dw.prepareSlotsModel(slot, stateRoot, blockRoot, status) - err := dw.prepareSignedBeaconBlockModel(slot, blockRoot, parentBlockRoot, eth1BlockHash) + err = dw.prepareSignedBeaconBlockModel(slot, blockRoot, parentBlockRoot, eth1BlockHash) if err != nil { return nil, err } @@ -150,28 +159,40 @@ func (dw *DatabaseWriter) prepareBeaconStateModel(slot int, stateRoot string) er return nil } -// Write all the data for a given slot. -func (dw *DatabaseWriter) writeFullSlot() error { +// Add all the data for a given slot to a SQL transaction. +// Originally it wrote to each table individually. +func (dw *DatabaseWriter) transactFullSlot() error { // If an error occurs, write to knownGaps table. log.WithFields(log.Fields{ "slot": dw.DbSlots.Slot, }).Debug("Starting to write to the DB.") - err := dw.writeSlots() + err := dw.transactSlots() if err != nil { loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl.slots table...") return err } log.Debug("We finished writing to the ethcl.slots table.") if dw.DbSlots.Status != "skipped" { - errG, _ := errgroup.WithContext(context.Background()) - errG.Go(func() error { - return dw.writeSignedBeaconBlocks() - }) - errG.Go(func() error { - return dw.writeBeaconState() - }) - if err := errG.Wait(); err != nil { - loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl block or state table...") + //errG, _ := errgroup.WithContext(context.Background()) + //errG.Go(func() error { + // return dw.transactSignedBeaconBlocks() + //}) + //errG.Go(func() error { + // return dw.transactBeaconState() + //}) + //if err := errG.Wait(); err != nil { + // loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl block or state table...") + // return err + //} + // Might want to seperate writing to public.blocks so we can do this concurrently... + err := dw.transactSignedBeaconBlocks() + if err != nil { + loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl block table...") + return err + } + err = dw.transactBeaconState() + if err != nil { + loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl state table...") return err } } @@ -179,16 +200,16 @@ func (dw *DatabaseWriter) writeFullSlot() error { return nil } -// Write the information for the generic slots table. For now this is only one function. +// Add data for the ethcl.slots table to a transaction. For now this is only one function. // But in the future if we need to incorporate any FK's or perform any actions to write to the // slots table we can do it all here. -func (dw *DatabaseWriter) writeSlots() error { +func (dw *DatabaseWriter) transactSlots() error { return dw.upsertSlots() } // Upsert to the ethcl.slots table. func (dw *DatabaseWriter) upsertSlots() error { - _, err := dw.Db.Exec(context.Background(), UpsertSlotsStmt, dw.DbSlots.Epoch, dw.DbSlots.Slot, dw.DbSlots.BlockRoot, dw.DbSlots.StateRoot, dw.DbSlots.Status) + _, err := dw.Tx.Exec(dw.Ctx, UpsertSlotsStmt, dw.DbSlots.Epoch, dw.DbSlots.Slot, dw.DbSlots.BlockRoot, dw.DbSlots.StateRoot, dw.DbSlots.Status) if err != nil { loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.slots table") return err @@ -196,8 +217,8 @@ func (dw *DatabaseWriter) upsertSlots() error { return nil } -// Write the information for the signed_beacon_block. -func (dw *DatabaseWriter) writeSignedBeaconBlocks() error { +// Add the information for the signed_beacon_block to a transaction. +func (dw *DatabaseWriter) transactSignedBeaconBlocks() error { err := dw.upsertPublicBlocks(dw.DbSignedBeaconBlock.MhKey, dw.rawSignedBeaconBlock) if err != nil { return err @@ -211,7 +232,7 @@ func (dw *DatabaseWriter) writeSignedBeaconBlocks() error { // Upsert to public.blocks. func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) error { - _, err := dw.Db.Exec(context.Background(), UpsertBlocksStmt, key, data) + _, err := dw.Tx.Exec(dw.Ctx, UpsertBlocksStmt, key, data) if err != nil { loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the public.blocks table") return err @@ -221,7 +242,7 @@ func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) error { // Upsert to the ethcl.signed_beacon_block table. func (dw *DatabaseWriter) upsertSignedBeaconBlock() error { - _, err := dw.Db.Exec(context.Background(), UpsertSignedBeaconBlockStmt, dw.DbSignedBeaconBlock.Slot, dw.DbSignedBeaconBlock.BlockRoot, dw.DbSignedBeaconBlock.ParentBlock, dw.DbSignedBeaconBlock.Eth1BlockHash, dw.DbSignedBeaconBlock.MhKey) + _, err := dw.Tx.Exec(dw.Ctx, UpsertSignedBeaconBlockStmt, dw.DbSignedBeaconBlock.Slot, dw.DbSignedBeaconBlock.BlockRoot, dw.DbSignedBeaconBlock.ParentBlock, dw.DbSignedBeaconBlock.Eth1BlockHash, dw.DbSignedBeaconBlock.MhKey) if err != nil { loghelper.LogSlotError(dw.DbSlots.Slot, err).WithFields(log.Fields{"block_root": dw.DbSignedBeaconBlock.BlockRoot}).Error("Unable to write to the slot to the ethcl.signed_beacon_block table") return err @@ -229,8 +250,8 @@ func (dw *DatabaseWriter) upsertSignedBeaconBlock() error { return nil } -// Write the information for the beacon_state. -func (dw *DatabaseWriter) writeBeaconState() error { +// Add the information for the beacon_state to a transaction. +func (dw *DatabaseWriter) transactBeaconState() error { err := dw.upsertPublicBlocks(dw.DbBeaconState.MhKey, dw.rawBeaconState) if err != nil { return err @@ -244,7 +265,7 @@ func (dw *DatabaseWriter) writeBeaconState() error { // Upsert to the ethcl.beacon_state table. func (dw *DatabaseWriter) upsertBeaconState() error { - _, err := dw.Db.Exec(context.Background(), UpsertBeaconState, dw.DbBeaconState.Slot, dw.DbBeaconState.StateRoot, dw.DbBeaconState.MhKey) + _, err := dw.Tx.Exec(dw.Ctx, UpsertBeaconState, dw.DbBeaconState.Slot, dw.DbBeaconState.StateRoot, dw.DbBeaconState.MhKey) if err != nil { loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.beacon_state table") return err @@ -252,23 +273,23 @@ func (dw *DatabaseWriter) upsertBeaconState() error { return nil } -// Update a given slot to be marked as forked. Provide the slot and the latest latestBlockRoot. +// Update a given slot to be marked as forked within a transaction. Provide the slot and the latest latestBlockRoot. // We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked. -func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) { +func transactReorgs(db sql.Database, tx sql.Tx, ctx context.Context, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) { slotNum, strErr := strconv.Atoi(slot) if strErr != nil { loghelper.LogReorgError(slot, latestBlockRoot, strErr).Error("We can't convert the slot to an int...") } - forkCount, err := updateForked(db, slot, latestBlockRoot) + forkCount, err := updateForked(tx, ctx, slot, latestBlockRoot) if err != nil { loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while updating all forks.") - writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics) + transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics) } - proposedCount, err := updateProposed(db, slot, latestBlockRoot) + proposedCount, err := updateProposed(tx, ctx, slot, latestBlockRoot) if err != nil { loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while trying to update the proposed slot.") - writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics) + transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics) } if forkCount > 0 { @@ -289,18 +310,18 @@ func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics * loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{ "proposedCount": proposedCount, }).Error("Too many rows were marked as proposed!") - writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics) + transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics) } else if proposedCount == 0 { var count int err := db.QueryRow(context.Background(), CheckProposedStmt, slot, latestBlockRoot).Scan(count) if err != nil { loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to query proposed rows after reorg.") - writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics) + transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics) } else if count != 1 { loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{ "proposedCount": count, }).Warn("The proposed block was not marked as proposed...") - writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics) + transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics) } else { loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.") } @@ -309,9 +330,28 @@ func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics * metrics.IncrementReorgsInsert(1) } +// Wrapper function that will create a transaction and execute the function. +func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) { + ctx := context.Background() + tx, err := db.Begin(ctx) + if err != nil { + loghelper.LogReorgError(slot, latestBlockRoot, err).Fatal("Unable to create a new transaction for reorgs") + } + defer func() { + err := tx.Rollback(ctx) + if err != nil && err != pgx.ErrTxClosed { + loghelper.LogError(err).Error("We were unable to Rollback a transaction for reorgs") + } + }() + transactReorgs(db, tx, ctx, slot, latestBlockRoot, metrics) + if err = tx.Commit(ctx); err != nil { + loghelper.LogReorgError(slot, latestBlockRoot, err).Fatal("Unable to execute the transaction for reorgs") + } +} + // Update the slots table by marking the old slot's as forked. -func updateForked(db sql.Database, slot string, latestBlockRoot string) (int64, error) { - res, err := db.Exec(context.Background(), UpdateForkedStmt, slot, latestBlockRoot) +func updateForked(tx sql.Tx, ctx context.Context, slot string, latestBlockRoot string) (int64, error) { + res, err := tx.Exec(ctx, UpdateForkedStmt, slot, latestBlockRoot) if err != nil { loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with the forked slots") return 0, err @@ -324,8 +364,9 @@ func updateForked(db sql.Database, slot string, latestBlockRoot string) (int64, return count, err } -func updateProposed(db sql.Database, slot string, latestBlockRoot string) (int64, error) { - res, err := db.Exec(context.Background(), UpdateProposedStmt, slot, latestBlockRoot) +// Mark a slot as proposed. +func updateProposed(tx sql.Tx, ctx context.Context, slot string, latestBlockRoot string) (int64, error) { + res, err := tx.Exec(ctx, UpdateProposedStmt, slot, latestBlockRoot) if err != nil { loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with the proposed slot.") return 0, err @@ -339,10 +380,10 @@ func updateProposed(db sql.Database, slot string, latestBlockRoot string) (int64 return count, err } -// A wrapper function to call upsertKnownGaps. This function will break down the range of known_gaos into +// A wrapper function to call upsertKnownGaps. This function will break down the range of known_gaps into // smaller chunks. For example, instead of having an entry of 1-101, if we increment the entries by 10 slots, we would // have 10 entries as follows: 1-10, 11-20, etc... -func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot int, entryError error, entryProcess string, metric *BeaconClientMetrics) { +func transactKnownGaps(tx sql.Tx, ctx context.Context, tableIncrement int, startSlot int, endSlot int, entryError error, entryProcess string, metric *BeaconClientMetrics) { if endSlot-startSlot <= tableIncrement { kgModel := DbKnownGaps{ StartSlot: strconv.Itoa(startSlot), @@ -352,7 +393,7 @@ func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot EntryError: entryError.Error(), EntryProcess: entryProcess, } - upsertKnownGaps(db, kgModel, metric) + upsertKnownGaps(tx, ctx, kgModel, metric) } else { totalSlots := endSlot - startSlot var chunks int @@ -377,15 +418,34 @@ func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot EntryError: entryError.Error(), EntryProcess: entryProcess, } - upsertKnownGaps(db, kgModel, metric) + upsertKnownGaps(tx, ctx, kgModel, metric) } } +} +// Wrapper function, instead of adding the knownGaps entries to a transaction, it will +// create the transaction and write it. +func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot int, entryError error, entryProcess string, metric *BeaconClientMetrics) { + ctx := context.Background() + tx, err := db.Begin(ctx) + if err != nil { + loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).Fatal("Unable to create a new transaction for knownGaps") + } + defer func() { + err := tx.Rollback(ctx) + if err != nil && err != pgx.ErrTxClosed { + loghelper.LogError(err).Error("We were unable to Rollback a transaction for reorgs") + } + }() + transactKnownGaps(tx, ctx, tableIncrement, startSlot, endSlot, entryError, entryProcess, metric) + if err = tx.Commit(ctx); err != nil { + loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).Fatal("Unable to execute the transaction for knownGaps") + } } // A function to upsert a single entry to the ethcl.known_gaps table. -func upsertKnownGaps(db sql.Database, knModel DbKnownGaps, metric *BeaconClientMetrics) { - _, err := db.Exec(context.Background(), UpsertKnownGapsStmt, knModel.StartSlot, knModel.EndSlot, +func upsertKnownGaps(tx sql.Tx, ctx context.Context, knModel DbKnownGaps, metric *BeaconClientMetrics) { + _, err := tx.Exec(ctx, UpsertKnownGapsStmt, knModel.StartSlot, knModel.EndSlot, knModel.CheckedOut, knModel.ReprocessingError, knModel.EntryError, knModel.EntryProcess) if err != nil { log.WithFields(log.Fields{ diff --git a/pkg/beaconclient/processhistoric.go b/pkg/beaconclient/processhistoric.go index eb3e71b..3433f77 100644 --- a/pkg/beaconclient/processhistoric.go +++ b/pkg/beaconclient/processhistoric.go @@ -150,7 +150,7 @@ func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRow } defer func() { err := tx.Rollback(ctx) - if err != nil { + if err != nil && err != pgx.ErrTxClosed { loghelper.LogError(err).Error("We were unable to Rollback a transaction") errCount = append(errCount, err) } diff --git a/pkg/beaconclient/processslot.go b/pkg/beaconclient/processslot.go index 146d53c..6ab3043 100644 --- a/pkg/beaconclient/processslot.go +++ b/pkg/beaconclient/processslot.go @@ -26,6 +26,7 @@ import ( "strconv" "strings" + "github.com/jackc/pgx/v4" si "github.com/prysmaticlabs/prysm/consensus-types/interfaces" "github.com/prysmaticlabs/prysm/consensus-types/wrapper" dt "github.com/prysmaticlabs/prysm/encoding/ssz/detect" @@ -79,7 +80,7 @@ type ProcessSlot struct { // This function will do all the work to process the slot and write it to the DB. // It will return the error and error process. The error process is used for providing reach detail to the // known_gaps table. -func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) (error, string) { +func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) (error, string) { ps := &ProcessSlot{ Slot: slot, BlockRoot: blockRoot, @@ -89,6 +90,10 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot Metrics: metrics, } + if checkDb { + // Check the DB to see if this slot exists. + } + g, _ := errgroup.WithContext(context.Background()) vUnmarshalerCh := make(chan *dt.VersionedUnmarshaler, 1) @@ -120,7 +125,13 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot return err, "blockRoot" } // Write the object to the DB. - err = dw.writeFullSlot() + defer func() { + err := dw.Tx.Rollback(dw.Ctx) + if err != nil && err != pgx.ErrTxClosed { + loghelper.LogError(err).Error("We were unable to Rollback a transaction") + } + }() + err = dw.transactFullSlot() if err != nil { return err, "processSlot" } @@ -131,8 +142,14 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot return fmt.Errorf("headOrHistoric must be either historic or head!"), "" } if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" && ps.Status != "skipped" { - ps.checkPreviousSlot(previousSlot, previousBlockRoot, knownGapsTableIncrement) + ps.checkPreviousSlot(dw.Tx, dw.Ctx, previousSlot, previousBlockRoot, knownGapsTableIncrement) } + + // Commit the transaction + if err = dw.Tx.Commit(dw.Ctx); err != nil { + return err, "transactionCommit" + } + return nil, "" } @@ -142,7 +159,7 @@ func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot if previousSlot == 0 && previousBlockRoot == "" { writeStartUpGaps(db, knownGapsTableIncrement, slot, metrics) } - err, errReason := processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement) + err, errReason := processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement, true) if err != nil { writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics) } @@ -150,7 +167,7 @@ func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot // Handle a historic slot. A wrapper function for calling `handleFullSlot`. func handleHistoricSlot(db sql.Database, serverAddress string, slot int, metrics *BeaconClientMetrics) (error, string) { - return processFullSlot(db, serverAddress, slot, "", "", 0, "", "historic", metrics, 1) + return processFullSlot(db, serverAddress, slot, "", "", 0, "", "historic", metrics, 1, true) } // Update the SszSignedBeaconBlock and FullSignedBeaconBlock object with their respective values. @@ -232,14 +249,14 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string, vmCh chan<- *dt.Ver } // Check to make sure that the previous block we processed is the parent of the current block. -func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot string, knownGapsTableIncrement int) { +func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previousSlot int, previousBlockRoot string, knownGapsTableIncrement int) { parentRoot := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().ParentRoot()) if previousSlot == int(ps.FullBeaconState.Slot()) { log.WithFields(log.Fields{ "slot": ps.FullBeaconState.Slot(), "fork": true, }).Warn("A fork occurred! The previous slot and current slot match.") - writeReorgs(ps.Db, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics) + transactReorgs(ps.Db, tx, ctx, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics) } else if previousSlot > int(ps.FullBeaconState.Slot()) { log.WithFields(log.Fields{ "previousSlot": previousSlot, @@ -250,13 +267,13 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str "previousSlot": previousSlot, "currentSlot": ps.FullBeaconState.Slot(), }).Error("We skipped a few slots.") - writeKnownGaps(ps.Db, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot())-1, fmt.Errorf("Gaps during head processing"), "headGaps", ps.Metrics) + transactKnownGaps(tx, ctx, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot())-1, fmt.Errorf("Gaps during head processing"), "headGaps", ps.Metrics) } else if previousBlockRoot != parentRoot { log.WithFields(log.Fields{ "previousBlockRoot": previousBlockRoot, "currentBlockParent": parentRoot, }).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.") - writeReorgs(ps.Db, strconv.Itoa(previousSlot), parentRoot, ps.Metrics) + transactReorgs(ps.Db, tx, ctx, strconv.Itoa(previousSlot), parentRoot, ps.Metrics) } else { log.Debug("Previous Slot and Current Slot are one distance from each other.") }