ipld-eth-beacon-indexer/pkg/beaconclient/processhistoric.go
Thomas E Lackey 27fa54c6dc
76: Add indexing of ExecutionPayloads (and other Merge-related updates). (#73)
1. Updates or replaces outdated dependencies (eg, replacing a version of the Prysm client with the latest zrnt).

2. Add support for parsing Bellatrix-era BeaconState and BeaconBlocks

3. Adds flags for toggling the processing of BeaconBlocks and BeaconState. This is particularly important because processing and storing the BeaconState at this time would be too expensive to really do (see: Temporarily disable BeaconState indexing #75 and [Feature] Reduce the Amount of DB Space the Beacon Chain Needs #71)

4. Fixes flaky event handling. The previous code would not reconnect in the case of errors with the SSE connection. This enables automatic reconnection in the case of error (default in the updated v2 SSE library dependency), and also adds a timeout so that if no event is received in 2.5x the block time, the SSE connection is closed and re-established.

5. Other refactoring and cleanup (eg, changing the type of slot from int to Slot (uint64)).
2022-09-28 20:39:56 -05:00

270 lines
9.5 KiB
Go

// VulcanizeDB
// Copyright © 2022 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// This file contains all the code to process historic slots.
package beaconclient
import (
"context"
"fmt"
"strconv"
"time"
"github.com/jackc/pgx/v4"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/loghelper"
)
// SQL statements used to coordinate work on the eth_beacon.historic_process table.
var (
	// Selects the start_slot and end_slot of the one row that is not checked out
	// and sorts first by ascending priority. $1 is the minimum end_slot.
	getHpEntryStmt = `SELECT start_slot, end_slot FROM eth_beacon.historic_process
WHERE checked_out=false AND end_slot >= $1
ORDER BY priority ASC
LIMIT 1;`

	// Used to periodically check whether eth_beacon.historic_process contains
	// any row that is not checked out. $1 is the minimum end_slot.
	checkHpEntryStmt = `SELECT * FROM eth_beacon.historic_process WHERE checked_out=false AND end_slot >= $1;`

	// Checks out a row of eth_beacon.historic_process.
	// $1 is the start_slot, $2 the end_slot and $3 the value stored in checked_out_by.
	lockHpEntryStmt = `UPDATE eth_beacon.historic_process
SET checked_out=true, checked_out_by=$3
WHERE start_slot=$1 AND end_slot=$2;`

	// Deletes the row of eth_beacon.historic_process identified by
	// start_slot ($1) and end_slot ($2).
	deleteHpEntryStmt = `DELETE FROM eth_beacon.historic_process
WHERE start_slot=$1 AND end_slot=$2;`

	// Releases every row whose checked_out_by equals $1.
	releaseHpLockStmt = `UPDATE eth_beacon.historic_process
SET checked_out=false, checked_out_by=null
WHERE checked_out_by=$1`
)
// HistoricProcessing bundles what the methods in this file need to work through
// the rows of the eth_beacon.historic_process table.
type HistoricProcessing struct {
db sql.Database // Database handle handed to the historic_process helpers and to writeKnownGaps.
metrics *BeaconClientMetrics // Beacon client metrics; passed to writeKnownGaps when a slot fails.
uniqueNodeIdentifier int // Identifier of this node; stored in checked_out_by when a row is locked and matched against it when locks are released.
}
// getSlotRange delegates to getBatchProcessRow using the eth_beacon.historic_process
// statements. Checked out slot ranges are delivered on slotCh, and rows are locked
// under this node's identifier rendered as a decimal string.
// It returns the errors collected by getBatchProcessRow.
func (hp HistoricProcessing) getSlotRange(ctx context.Context, slotCh chan<- slotsToProcess, minimumSlot Slot) []error {
	nodeID := strconv.Itoa(hp.uniqueNodeIdentifier)
	return getBatchProcessRow(
		ctx,
		hp.db,
		getHpEntryStmt,
		checkHpEntryStmt,
		lockHpEntryStmt,
		slotCh,
		nodeID,
		minimumSlot,
	)
}
// removeTableEntry delegates to removeRowPostProcess for every slot range received
// on processCh. QueryBySlotStmt is supplied as the "has this slot been processed"
// check and deleteHpEntryStmt as the statement that removes the row from
// eth_beacon.historic_process.
func (hp HistoricProcessing) removeTableEntry(ctx context.Context, processCh <-chan slotsToProcess) error {
	err := removeRowPostProcess(
		ctx,
		hp.db,
		processCh,
		QueryBySlotStmt,
		deleteHpEntryStmt,
	)
	return err
}
// handleProcessingErrors consumes errMessages until ctx is cancelled.
// Every received failure is handed to loghelper.LogSlotError and then to
// writeKnownGaps, with the failed slot used as both the start and the end slot.
func (hp HistoricProcessing) handleProcessingErrors(ctx context.Context, errMessages <-chan batchHistoricError) {
	for {
		select {
		case failure := <-errMessages:
			failedSlot := failure.slot
			// NOTE(review): the value returned by LogSlotError is discarded. If it
			// returns a log entry like the other loghelper functions used in this
			// file, nothing is emitted here - confirm.
			loghelper.LogSlotError(failedSlot.Number(), failure.err)
			writeKnownGaps(hp.db, 1, failedSlot, failedSlot, failure.err, failure.errProcess, hp.metrics)
		case <-ctx.Done():
			return
		}
	}
}
// releaseDbLocks "un"-checks-out the rows of the eth_beacon.historic_process table
// held by this node by running releaseHpLockStmt with the node's identifier.
//
// It returns an error when the statement fails or when the number of affected
// rows cannot be determined. The underlying error is wrapped with %w so callers
// can inspect it with errors.Is / errors.As.
func (hp HistoricProcessing) releaseDbLocks() error {
	log.Debug("Updating all the entries to eth_beacon.historical processing")
	res, err := hp.db.Exec(context.Background(), releaseHpLockStmt, hp.uniqueNodeIdentifier)
	if err != nil {
		// %w, not %e: %e is a floating point verb and renders an error as "%!e(...)".
		return fmt.Errorf("Unable to remove lock from eth_beacon.historical_processing table for node %d, error is %w", hp.uniqueNodeIdentifier, err)
	}
	log.Debug("Update all the entries to eth_beacon.historical processing")
	rows, err := res.RowsAffected()
	if err != nil {
		return fmt.Errorf("Unable to calculated number of rows affected by releasing locks from eth_beacon.historical_processing table for node %d, error is %w", hp.uniqueNodeIdentifier, err)
	}
	log.WithField("rowCount", rows).Info("Released historicalProcess locks for specified rows.")
	return nil
}
// processSlotRangeWorker handles slots received on workCh until ctx is cancelled.
// Each slot is passed to handleHistoricSlot. When that succeeds incrementTracker(1)
// is called; when it fails the error, the failing process and the slot are sent
// on errCh as a batchHistoricError.
func processSlotRangeWorker(ctx context.Context, workCh <-chan Slot, errCh chan<- batchHistoricError, spd SlotProcessingDetails, incrementTracker func(uint64)) {
	for {
		select {
		case <-ctx.Done():
			return
		case current := <-workCh:
			log.Debug("Handling slot: ", current)
			err, errProcess := handleHistoricSlot(ctx, current, spd)
			if err == nil {
				incrementTracker(1)
				continue
			}
			errCh <- batchHistoricError{
				err:        err,
				errProcess: errProcess,
				slot:       current,
			}
		}
	}
}
// getBatchProcessRow repeatedly checks out one row of a batch-processing table and
// sends its start_slot and end_slot on slotCh.
//
// Parameters:
//   - getStartEndSlotStmt: statement returning the start_slot and end_slot of the row to check out.
//   - checkNewRowsStmt: statement used to detect whether any candidate row exists.
//   - checkOutRowStmt: statement that "locks" the row by updating its checked_out columns.
//   - uniqueNodeIdentifier: value passed to checkOutRowStmt to mark who holds the row.
//   - minimumSlot: passed to checkNewRowsStmt and getStartEndSlotStmt as their only argument.
//
// The loop ends when ctx is cancelled or once 5 errors have been collected.
// The collected errors are returned; every entry is non-nil.
func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStmt string, checkNewRowsStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess, uniqueNodeIdentifier string, minimumSlot Slot) []error {
	errCount := make([]error, 0)
	prevErrCount := 0
	// 5 is an arbitrary number. It allows us to retry a few times before
	// ending the application.
	for len(errCount) < 5 {
		select {
		case <-ctx.Done():
			return errCount
		default:
		}

		// Log each newly collected error once instead of on every iteration.
		if len(errCount) != prevErrCount {
			log.WithFields(log.Fields{
				"errCount": errCount,
			}).Error("New error entry added")
			prevErrCount = len(errCount)
		}

		processRow, err := db.Exec(context.Background(), checkNewRowsStmt, minimumSlot)
		if err != nil {
			// processRow must not be used when Exec failed.
			errCount = append(errCount, err)
			continue
		}
		row, err := processRow.RowsAffected()
		if err != nil {
			errCount = append(errCount, err)
			continue
		}
		if row < 1 {
			time.Sleep(3 * time.Second)
			log.Debug("We are checking rows, be patient")
			continue
		}
		log.Debug("We found a new row")

		sp, found, err := checkOutBatchRow(db, getStartEndSlotStmt, checkOutRowStmt, uniqueNodeIdentifier, minimumSlot)
		if err != nil {
			errCount = append(errCount, err)
			continue
		}
		if !found {
			// The candidate row disappeared between the check and the select.
			time.Sleep(1 * time.Second)
			continue
		}
		log.WithField("slots", sp).Debug("Added a new slots to be processed")
		slotCh <- sp
	}
	log.WithFields(log.Fields{
		"ErrCount": errCount,
	}).Error("The ErrCounter")
	return errCount
}

// checkOutBatchRow selects one row with getStartEndSlotStmt and checks it out with
// checkOutRowStmt inside a single transaction. It lives in its own function so the
// deferred rollback runs at the end of every attempt rather than when the caller's
// loop finally returns.
//
// It returns found=false with a nil error when the select yields pgx.ErrNoRows.
// The row is only reported as found after the transaction has been committed.
func checkOutBatchRow(db sql.Database, getStartEndSlotStmt string, checkOutRowStmt string, uniqueNodeIdentifier string, minimumSlot Slot) (slotsToProcess, bool, error) {
	sp := slotsToProcess{}
	dbCtx := context.Background()

	// Setup TX
	tx, err := db.Begin(dbCtx)
	if err != nil {
		loghelper.LogError(err).Error("We are unable to Begin a SQL transaction")
		return sp, false, err
	}
	defer func() {
		// After a successful Commit this reports pgx.ErrTxClosed, which is expected.
		err := tx.Rollback(dbCtx)
		if err != nil && err != pgx.ErrTxClosed {
			loghelper.LogError(err).Error("We were unable to Rollback a transaction")
		}
	}()

	// Query the DB for slots.
	err = tx.QueryRow(dbCtx, getStartEndSlotStmt, minimumSlot).Scan(&sp.startSlot, &sp.endSlot)
	if err != nil {
		if err == pgx.ErrNoRows {
			return sp, false, nil
		}
		loghelper.LogSlotRangeStatementError(sp.startSlot.Number(), sp.endSlot.Number(), getStartEndSlotStmt, err).Error("Unable to get a row")
		return sp, false, err
	}

	// Checkout the Row
	res, err := tx.Exec(dbCtx, checkOutRowStmt, sp.startSlot, sp.endSlot, uniqueNodeIdentifier)
	if err != nil {
		loghelper.LogSlotRangeStatementError(sp.startSlot.Number(), sp.endSlot.Number(), checkOutRowStmt, err).Error("Unable to checkout the row")
		return sp, false, err
	}
	rows, err := res.RowsAffected()
	if err != nil {
		loghelper.LogSlotRangeStatementError(sp.startSlot.Number(), sp.endSlot.Number(), checkOutRowStmt, err).Error("Unable to determine the rows affected when trying to checkout a row.")
		return sp, false, err
	}
	if rows != 1 {
		// Build a real error: there is no underlying error in this case, and the
		// caller must never collect a nil entry.
		err = fmt.Errorf("expected to check out exactly one row, but %d rows were affected", rows)
		message := "We locked too many rows....."
		if rows == 0 {
			message = "We did not lock a single row."
		}
		loghelper.LogSlotRangeStatementError(sp.startSlot.Number(), sp.endSlot.Number(), checkOutRowStmt, err).WithFields(log.Fields{
			"rowsReturn": rows,
		}).Error(message)
		return sp, false, err
	}

	err = tx.Commit(dbCtx)
	if err != nil {
		loghelper.LogSlotRangeError(sp.startSlot.Number(), sp.endSlot.Number(), err).Error("Unable commit transactions.")
		return sp, false, err
	}
	return sp, true, nil
}
// removeRowPostProcess removes a row from its table once the row's slots have been
// processed.
//
// For every slot range received on processCh a goroutine is started that polls
// checkProcessedStmt for the start and end slot and, once both are reported as
// processed, runs removeStmt with the start and end slot numbers.
//
// It returns nil when ctx is cancelled, or the first error reported by one of the
// goroutines. Goroutines that are still polling are told to stop when the function
// returns.
func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan slotsToProcess, checkProcessedStmt, removeStmt string) error {
	// Cancelled on return so that goroutines which are still polling, or waiting
	// to report an error nobody will read, do not leak.
	workerCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	errCh := make(chan error)
	for {
		select {
		case <-ctx.Done():
			return nil
		case err := <-errCh:
			// len() of an unbuffered channel is always 0, so errors have to be
			// received here rather than detected through len(errCh).
			return err
		case slots := <-processCh:
			go func(slots slotsToProcess) {
				err := waitAndRemoveRow(workerCtx, db, slots, checkProcessedStmt, removeStmt)
				if err == nil {
					return
				}
				select {
				case errCh <- err:
				case <-workerCtx.Done():
				}
			}(slots)
		}
	}
}

// waitAndRemoveRow polls until both the start and end slot of slots are reported as
// processed by checkProcessedStmt, then executes removeStmt for that range.
// It returns nil without removing anything when ctx is cancelled while waiting.
func waitAndRemoveRow(ctx context.Context, db sql.Database, slots slotsToProcess, checkProcessedStmt, removeStmt string) error {
	// Make sure the start and end slot exist in the slots table.
	log.WithFields(log.Fields{
		"startSlot": slots.startSlot,
		"endSlot":   slots.endSlot,
	}).Debug("Starting to check to see if the following slots have been processed")
	for {
		isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, slots.startSlot)
		if err != nil {
			return err
		}
		isEndProcess, err := isSlotProcessed(db, checkProcessedStmt, slots.endSlot)
		if err != nil {
			return err
		}
		if isStartProcess && isEndProcess {
			break
		}
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(3 * time.Second):
		}
	}

	_, err := db.Exec(context.Background(), removeStmt, slots.startSlot.Number(), slots.endSlot.Number())
	return err
}