Set minimum slot for historic and known-gaps processing.

This commit is contained in:
Thomas E Lackey 2022-09-24 01:45:43 -05:00
parent 6034679ec1
commit 8af61d2a2a
8 changed files with 35 additions and 33 deletions

View File

@ -81,7 +81,7 @@ func startFullProcessing() {
errG, _ := errgroup.WithContext(context.Background())
errG.Go(func() error {
errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker"))
errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker"), viper.GetUint64("bc.minimumSlot"))
if len(errs) != 0 {
if len(errs) != 0 {
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events")
@ -95,7 +95,7 @@ func startFullProcessing() {
go func() {
errG := new(errgroup.Group)
errG.Go(func() error {
errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker"))
errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker"), viper.GetUint64("kg.minimumSlot"))
if len(errs) != 0 {
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps")
return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps")

View File

@ -69,7 +69,7 @@ func startHeadTracking() {
go func() {
errG := new(errgroup.Group)
errG.Go(func() error {
errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker"))
errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker"), viper.GetUint64("kg.minimumSlot"))
if len(errs) != 0 {
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps")
return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps")

View File

@ -65,7 +65,7 @@ func startHistoricProcessing() {
errG, _ := errgroup.WithContext(context.Background())
errG.Go(func() error {
errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker"))
errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker"), viper.GetUint64("bc.minimumSlot"))
if len(errs) != 0 {
if len(errs) != 0 {
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events")
@ -80,7 +80,7 @@ func startHistoricProcessing() {
go func() {
errG := new(errgroup.Group)
errG.Go(func() error {
errs := Bc.ProcessKnownGaps(kgContext, viper.GetInt("kg.maxKnownGapsWorker"))
errs := Bc.ProcessKnownGaps(kgContext, viper.GetInt("kg.maxKnownGapsWorker"), viper.GetUint64("kg.minimumSlot"))
if len(errs) != 0 {
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps")
return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps")

View File

@ -1,6 +1,6 @@
{
"db": {
"address": "vulcanize_db",
"address": "localhost",
"password": "password",
"port": 5432,
"username": "vdbm",
@ -8,7 +8,7 @@
"driver": "PGX"
},
"bc": {
"address": "host.docker.internal",
"address": "localhost",
"port": 5052,
"type": "lighthouse",
"bootRetryInterval": 30,
@ -18,7 +18,8 @@
"uniqueNodeIdentifier": 100,
"checkDb": true,
"performBeaconStateProcessing": false,
"performBeaconBlockProcessing": true
"performBeaconBlockProcessing": true,
"minimumSlot": 4700013
},
"t": {
"skipSync": true
@ -31,8 +32,9 @@
},
"kg": {
"increment": 10000,
"processKnownGaps": false,
"maxKnownGapsWorker": 2
"processKnownGaps": true,
"maxKnownGapsWorker": 2,
"minimumSlot": 4700013
},
"pm": {
"address": "localhost",

View File

@ -27,10 +27,10 @@ import (
)
// This function will perform all the heavy lifting for tracking the head of the chain.
func (bc *BeaconClient) CaptureHistoric(ctx context.Context, maxWorkers int) []error {
func (bc *BeaconClient) CaptureHistoric(ctx context.Context, maxWorkers int, minimumSlot uint64) []error {
log.Info("We are starting the historical processing service.")
bc.HistoricalProcess = HistoricProcessing{db: bc.Db, metrics: bc.Metrics, uniqueNodeIdentifier: bc.UniqueNodeIdentifier}
errs := handleBatchProcess(ctx, maxWorkers, bc.HistoricalProcess, bc.SlotProcessingDetails(), bc.Metrics.IncrementHistoricSlotProcessed)
errs := handleBatchProcess(ctx, maxWorkers, bc.HistoricalProcess, bc.SlotProcessingDetails(), bc.Metrics.IncrementHistoricSlotProcessed, minimumSlot)
log.Debug("Exiting Historical")
return errs
}
@ -52,7 +52,7 @@ func (bc *BeaconClient) StopHistoric(cancel context.CancelFunc) error {
//
// 2. Known Gaps Processing
type BatchProcessing interface {
getSlotRange(context.Context, chan<- slotsToProcess) []error // Write the slots to process in a channel, return an error if you cant get the next slots to write.
getSlotRange(context.Context, chan<- slotsToProcess, uint64) []error // Write the slots to process in a channel, return an error if you cant get the next slots to write.
handleProcessingErrors(context.Context, <-chan batchHistoricError) // Custom logic to handle errors.
removeTableEntry(context.Context, <-chan slotsToProcess) error // With the provided start and end slot, remove the entry from the database.
releaseDbLocks() error // Update the checked_out column to false for whatever table is being updated.
@ -90,7 +90,7 @@ type batchHistoricError struct {
// 4. Remove the slot entry from the DB.
//
// 5. Handle any errors.
func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, spd SlotProcessingDetails, incrementTracker func(uint64)) []error {
func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, spd SlotProcessingDetails, incrementTracker func(uint64), minimumSlot uint64) []error {
slotsCh := make(chan slotsToProcess)
workCh := make(chan uint64)
processedCh := make(chan slotsToProcess)
@ -160,7 +160,7 @@ func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing,
// Get slots from the DB.
go func() {
errs := bp.getSlotRange(ctx, slotsCh) // Periodically adds new entries....
errs := bp.getSlotRange(ctx, slotsCh, minimumSlot) // Periodically adds new entries....
if errs != nil {
finalErrCh <- errs
}

View File

@ -205,7 +205,7 @@ func (tbc TestBeaconNode) writeEventToHistoricProcess(bc *beaconclient.BeaconCli
// Start the CaptureHistoric function, and check for the correct inserted slots.
func (tbc TestBeaconNode) runHistoricalProcess(bc *beaconclient.BeaconClient, maxWorkers int, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) {
ctx, cancel := context.WithCancel(context.Background())
go bc.CaptureHistoric(ctx, maxWorkers)
go bc.CaptureHistoric(ctx, maxWorkers, 0)
validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError)
log.Debug("Calling the stop function for historical processing..")
err := bc.StopHistoric(cancel)
@ -217,7 +217,7 @@ func (tbc TestBeaconNode) runHistoricalProcess(bc *beaconclient.BeaconClient, ma
// Wrapper function that processes knownGaps
func (tbc TestBeaconNode) runKnownGapsProcess(bc *beaconclient.BeaconClient, maxWorkers int, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) {
ctx, cancel := context.WithCancel(context.Background())
go bc.ProcessKnownGaps(ctx, maxWorkers)
go bc.ProcessKnownGaps(ctx, maxWorkers, 0)
validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError)
err := bc.StopKnownGapsProcessing(cancel)
time.Sleep(5 * time.Second)

View File

@ -33,11 +33,11 @@ import (
var (
// Get a single highest priority and non-checked out row row from eth_beacon.historical_process
getHpEntryStmt string = `SELECT start_slot, end_slot FROM eth_beacon.historic_process
WHERE checked_out=false
WHERE checked_out=false AND end_slot >= $1
ORDER BY priority ASC
LIMIT 1;`
// Used to periodically check to see if there is a new entry in the eth_beacon.historic_process table.
checkHpEntryStmt string = `SELECT * FROM eth_beacon.historic_process WHERE checked_out=false;`
checkHpEntryStmt string = `SELECT * FROM eth_beacon.historic_process WHERE checked_out=false AND end_slot >= $1;`
// Used to checkout a row from the eth_beacon.historic_process table
lockHpEntryStmt string = `UPDATE eth_beacon.historic_process
SET checked_out=true, checked_out_by=$3
@ -58,8 +58,8 @@ type HistoricProcessing struct {
}
// Get a single row of historical slots from the table.
func (hp HistoricProcessing) getSlotRange(ctx context.Context, slotCh chan<- slotsToProcess) []error {
return getBatchProcessRow(ctx, hp.db, getHpEntryStmt, checkHpEntryStmt, lockHpEntryStmt, slotCh, strconv.Itoa(hp.uniqueNodeIdentifier))
func (hp HistoricProcessing) getSlotRange(ctx context.Context, slotCh chan<- slotsToProcess, minimumSlot uint64) []error {
return getBatchProcessRow(ctx, hp.db, getHpEntryStmt, checkHpEntryStmt, lockHpEntryStmt, slotCh, strconv.Itoa(hp.uniqueNodeIdentifier), minimumSlot)
}
// Remove the table entry.
@ -123,7 +123,7 @@ func processSlotRangeWorker(ctx context.Context, workCh <-chan uint64, errCh cha
// It also locks the row by updating the checked_out column.
// The statement for getting the start_slot and end_slot must be provided.
// The statement for "locking" the row must also be provided.
func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStmt string, checkNewRowsStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess, uniqueNodeIdentifier string) []error {
func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStmt string, checkNewRowsStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess, uniqueNodeIdentifier string, minimumSlot uint64) []error {
errCount := make([]error, 0)
// 5 is an arbitrary number. It allows us to retry a few times before
@ -139,7 +139,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm
"errCount": errCount,
}).Error("New error entry added")
}
processRow, err := db.Exec(context.Background(), checkNewRowsStmt)
processRow, err := db.Exec(context.Background(), checkNewRowsStmt, minimumSlot)
if err != nil {
errCount = append(errCount, err)
}
@ -172,7 +172,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm
// Query the DB for slots.
sp := slotsToProcess{}
err = tx.QueryRow(dbCtx, getStartEndSlotStmt).Scan(&sp.startSlot, &sp.endSlot)
err = tx.QueryRow(dbCtx, getStartEndSlotStmt, minimumSlot).Scan(&sp.startSlot, &sp.endSlot)
if err != nil {
if err == pgx.ErrNoRows {
time.Sleep(1 * time.Second)

View File

@ -30,11 +30,11 @@ import (
var (
// Get a single non-checked out row row from eth_beacon.known_gaps.
getKgEntryStmt string = `SELECT start_slot, end_slot FROM eth_beacon.known_gaps
WHERE checked_out=false
WHERE checked_out=false AND end_slot >= $1
ORDER BY priority ASC
LIMIT 1;`
// Used to periodically check to see if there is a new entry in the eth_beacon.known_gaps table.
checkKgEntryStmt string = `SELECT * FROM eth_beacon.known_gaps WHERE checked_out=false;`
checkKgEntryStmt string = `SELECT * FROM eth_beacon.known_gaps WHERE checked_out=false AND end_slot >= $1;`
// Used to checkout a row from the eth_beacon.known_gaps table
lockKgEntryStmt string = `UPDATE eth_beacon.known_gaps
SET checked_out=true, checked_out_by=$3
@ -58,10 +58,10 @@ type KnownGapsProcessing struct {
}
// This function will perform all the heavy lifting for tracking the head of the chain.
func (bc *BeaconClient) ProcessKnownGaps(ctx context.Context, maxWorkers int) []error {
func (bc *BeaconClient) ProcessKnownGaps(ctx context.Context, maxWorkers int, minimumSlot uint64) []error {
log.Info("We are starting the known gaps processing service.")
bc.KnownGapsProcess = KnownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics}
errs := handleBatchProcess(ctx, maxWorkers, bc.KnownGapsProcess, bc.SlotProcessingDetails(), bc.Metrics.IncrementKnownGapsProcessed)
errs := handleBatchProcess(ctx, maxWorkers, bc.KnownGapsProcess, bc.SlotProcessingDetails(), bc.Metrics.IncrementKnownGapsProcessed, minimumSlot)
log.Debug("Exiting known gaps processing service")
return errs
}
@ -78,8 +78,8 @@ func (bc *BeaconClient) StopKnownGapsProcessing(cancel context.CancelFunc) error
}
// Get a single row of historical slots from the table.
func (kgp KnownGapsProcessing) getSlotRange(ctx context.Context, slotCh chan<- slotsToProcess) []error {
return getBatchProcessRow(ctx, kgp.db, getKgEntryStmt, checkKgEntryStmt, lockKgEntryStmt, slotCh, strconv.Itoa(kgp.uniqueNodeIdentifier))
func (kgp KnownGapsProcessing) getSlotRange(ctx context.Context, slotCh chan<- slotsToProcess, minimumSlot uint64) []error {
return getBatchProcessRow(ctx, kgp.db, getKgEntryStmt, checkKgEntryStmt, lockKgEntryStmt, slotCh, strconv.Itoa(kgp.uniqueNodeIdentifier), minimumSlot)
}
// Remove the table entry.