From 8af61d2a2a21be347dce4e86704c7f58efb5fafe Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Sat, 24 Sep 2022 01:45:43 -0500 Subject: [PATCH] Set minimum slot for historic and known-gaps processing. --- cmd/full.go | 4 ++-- cmd/head.go | 2 +- cmd/historic.go | 4 ++-- ipld-eth-beacon-config.json | 12 +++++++----- pkg/beaconclient/capturehistoric.go | 16 ++++++++-------- pkg/beaconclient/capturehistoric_test.go | 4 ++-- pkg/beaconclient/processhistoric.go | 14 +++++++------- pkg/beaconclient/processknowngaps.go | 12 ++++++------ 8 files changed, 35 insertions(+), 33 deletions(-) diff --git a/cmd/full.go b/cmd/full.go index 1f8a5e9..31c7658 100644 --- a/cmd/full.go +++ b/cmd/full.go @@ -81,7 +81,7 @@ func startFullProcessing() { errG, _ := errgroup.WithContext(context.Background()) errG.Go(func() error { - errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker")) + errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker"), viper.GetUint64("bc.minimumSlot")) if len(errs) != 0 { if len(errs) != 0 { log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events") @@ -95,7 +95,7 @@ func startFullProcessing() { go func() { errG := new(errgroup.Group) errG.Go(func() error { - errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker")) + errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker"), viper.GetUint64("kg.minimumSlot")) if len(errs) != 0 { log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps") return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps") diff --git a/cmd/head.go b/cmd/head.go index e20cdda..789440b 100644 --- a/cmd/head.go +++ b/cmd/head.go @@ -69,7 +69,7 @@ func startHeadTracking() { go func() { errG := new(errgroup.Group) errG.Go(func() error { - errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker")) + errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker"), viper.GetUint64("kg.minimumSlot")) if len(errs) != 0 { log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps") return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps") diff --git a/cmd/historic.go b/cmd/historic.go index 96c6004..482e200 100644 --- a/cmd/historic.go +++ b/cmd/historic.go @@ -65,7 +65,7 @@ func startHistoricProcessing() { errG, _ := errgroup.WithContext(context.Background()) errG.Go(func() error { - errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker")) + errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker"), viper.GetUint64("bc.minimumSlot")) if len(errs) != 0 { if len(errs) != 0 { log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events") @@ -80,7 +80,7 @@ func startHistoricProcessing() { go func() { errG := new(errgroup.Group) errG.Go(func() error { - errs := Bc.ProcessKnownGaps(kgContext, viper.GetInt("kg.maxKnownGapsWorker")) + errs := Bc.ProcessKnownGaps(kgContext, viper.GetInt("kg.maxKnownGapsWorker"), viper.GetUint64("kg.minimumSlot")) if len(errs) != 0 { log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps") return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps") diff --git a/ipld-eth-beacon-config.json b/ipld-eth-beacon-config.json index cce9d9d..7a22376 100644 --- a/ipld-eth-beacon-config.json +++ b/ipld-eth-beacon-config.json @@ -1,6 +1,6 @@ { "db": { - "address": "vulcanize_db", + "address": "localhost", "password": "password", "port": 5432, "username": "vdbm", @@ -8,7 +8,7 @@ "driver": "PGX" }, "bc": { - "address": "host.docker.internal", + "address": "localhost", "port": 5052, "type": "lighthouse", "bootRetryInterval": 30, @@ -18,7 +18,8 @@ "uniqueNodeIdentifier": 100, "checkDb": true, "performBeaconStateProcessing": false, - "performBeaconBlockProcessing": true + "performBeaconBlockProcessing": true, + "minimumSlot": 4700013 }, "t": { "skipSync": true @@ -31,8 +32,9 @@ }, "kg": { "increment": 10000, - "processKnownGaps": false, - "maxKnownGapsWorker": 2 + "processKnownGaps": true, + "maxKnownGapsWorker": 2, + "minimumSlot": 4700013 }, "pm": { "address": "localhost", diff --git a/pkg/beaconclient/capturehistoric.go b/pkg/beaconclient/capturehistoric.go index ada07f1..24312bf 100644 --- a/pkg/beaconclient/capturehistoric.go +++ b/pkg/beaconclient/capturehistoric.go @@ -27,10 +27,10 @@ import ( ) // This function will perform all the heavy lifting for tracking the head of the chain. -func (bc *BeaconClient) CaptureHistoric(ctx context.Context, maxWorkers int) []error { +func (bc *BeaconClient) CaptureHistoric(ctx context.Context, maxWorkers int, minimumSlot uint64) []error { log.Info("We are starting the historical processing service.") bc.HistoricalProcess = HistoricProcessing{db: bc.Db, metrics: bc.Metrics, uniqueNodeIdentifier: bc.UniqueNodeIdentifier} - errs := handleBatchProcess(ctx, maxWorkers, bc.HistoricalProcess, bc.SlotProcessingDetails(), bc.Metrics.IncrementHistoricSlotProcessed) + errs := handleBatchProcess(ctx, maxWorkers, bc.HistoricalProcess, bc.SlotProcessingDetails(), bc.Metrics.IncrementHistoricSlotProcessed, minimumSlot) log.Debug("Exiting Historical") return errs } @@ -52,10 +52,10 @@ func (bc *BeaconClient) StopHistoric(cancel context.CancelFunc) error { // // 2. Known Gaps Processing type BatchProcessing interface { - getSlotRange(context.Context, chan<- slotsToProcess) []error // Write the slots to process in a channel, return an error if you cant get the next slots to write. - handleProcessingErrors(context.Context, <-chan batchHistoricError) // Custom logic to handle errors. - removeTableEntry(context.Context, <-chan slotsToProcess) error // With the provided start and end slot, remove the entry from the database. - releaseDbLocks() error // Update the checked_out column to false for whatever table is being updated. + getSlotRange(context.Context, chan<- slotsToProcess, uint64) []error // Write the slots to process in a channel, return an error if you cant get the next slots to write. + handleProcessingErrors(context.Context, <-chan batchHistoricError) // Custom logic to handle errors. + removeTableEntry(context.Context, <-chan slotsToProcess) error // With the provided start and end slot, remove the entry from the database. + releaseDbLocks() error // Update the checked_out column to false for whatever table is being updated. } /// ^^^ @@ -90,7 +90,7 @@ type batchHistoricError struct { // 4. Remove the slot entry from the DB. // // 5. Handle any errors. -func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, spd SlotProcessingDetails, incrementTracker func(uint64)) []error { +func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, spd SlotProcessingDetails, incrementTracker func(uint64), minimumSlot uint64) []error { slotsCh := make(chan slotsToProcess) workCh := make(chan uint64) processedCh := make(chan slotsToProcess) @@ -160,7 +160,7 @@ func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, // Get slots from the DB. go func() { - errs := bp.getSlotRange(ctx, slotsCh) // Periodically adds new entries.... + errs := bp.getSlotRange(ctx, slotsCh, minimumSlot) // Periodically adds new entries.... if errs != nil { finalErrCh <- errs } diff --git a/pkg/beaconclient/capturehistoric_test.go b/pkg/beaconclient/capturehistoric_test.go index c6dc5c6..569700b 100644 --- a/pkg/beaconclient/capturehistoric_test.go +++ b/pkg/beaconclient/capturehistoric_test.go @@ -205,7 +205,7 @@ func (tbc TestBeaconNode) writeEventToHistoricProcess(bc *beaconclient.BeaconCli // Start the CaptureHistoric function, and check for the correct inserted slots. func (tbc TestBeaconNode) runHistoricalProcess(bc *beaconclient.BeaconClient, maxWorkers int, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { ctx, cancel := context.WithCancel(context.Background()) - go bc.CaptureHistoric(ctx, maxWorkers) + go bc.CaptureHistoric(ctx, maxWorkers, 0) validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError) log.Debug("Calling the stop function for historical processing..") err := bc.StopHistoric(cancel) @@ -217,7 +217,7 @@ func (tbc TestBeaconNode) runHistoricalProcess(bc *beaconclient.BeaconClient, ma // Wrapper function that processes knownGaps func (tbc TestBeaconNode) runKnownGapsProcess(bc *beaconclient.BeaconClient, maxWorkers int, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { ctx, cancel := context.WithCancel(context.Background()) - go bc.ProcessKnownGaps(ctx, maxWorkers) + go bc.ProcessKnownGaps(ctx, maxWorkers, 0) validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError) err := bc.StopKnownGapsProcessing(cancel) time.Sleep(5 * time.Second) diff --git a/pkg/beaconclient/processhistoric.go b/pkg/beaconclient/processhistoric.go index 4bc1ab9..cf32d77 100644 --- a/pkg/beaconclient/processhistoric.go +++ b/pkg/beaconclient/processhistoric.go @@ -33,11 +33,11 @@ import ( var ( // Get a single highest priority and non-checked out row row from eth_beacon.historical_process getHpEntryStmt string = `SELECT start_slot, end_slot FROM eth_beacon.historic_process - WHERE checked_out=false + WHERE checked_out=false AND end_slot >= $1 ORDER BY priority ASC LIMIT 1;` // Used to periodically check to see if there is a new entry in the eth_beacon.historic_process table. - checkHpEntryStmt string = `SELECT * FROM eth_beacon.historic_process WHERE checked_out=false;` + checkHpEntryStmt string = `SELECT * FROM eth_beacon.historic_process WHERE checked_out=false AND end_slot >= $1;` // Used to checkout a row from the eth_beacon.historic_process table lockHpEntryStmt string = `UPDATE eth_beacon.historic_process SET checked_out=true, checked_out_by=$3 @@ -58,8 +58,8 @@ type HistoricProcessing struct { } // Get a single row of historical slots from the table. -func (hp HistoricProcessing) getSlotRange(ctx context.Context, slotCh chan<- slotsToProcess) []error { - return getBatchProcessRow(ctx, hp.db, getHpEntryStmt, checkHpEntryStmt, lockHpEntryStmt, slotCh, strconv.Itoa(hp.uniqueNodeIdentifier)) +func (hp HistoricProcessing) getSlotRange(ctx context.Context, slotCh chan<- slotsToProcess, minimumSlot uint64) []error { + return getBatchProcessRow(ctx, hp.db, getHpEntryStmt, checkHpEntryStmt, lockHpEntryStmt, slotCh, strconv.Itoa(hp.uniqueNodeIdentifier), minimumSlot) } // Remove the table entry. @@ -123,7 +123,7 @@ func processSlotRangeWorker(ctx context.Context, workCh <-chan uint64, errCh cha // It also locks the row by updating the checked_out column. // The statement for getting the start_slot and end_slot must be provided. // The statement for "locking" the row must also be provided. -func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStmt string, checkNewRowsStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess, uniqueNodeIdentifier string) []error { +func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStmt string, checkNewRowsStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess, uniqueNodeIdentifier string, minimumSlot uint64) []error { errCount := make([]error, 0) // 5 is an arbitrary number. It allows us to retry a few times before @@ -139,7 +139,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm "errCount": errCount, }).Error("New error entry added") } - processRow, err := db.Exec(context.Background(), checkNewRowsStmt) + processRow, err := db.Exec(context.Background(), checkNewRowsStmt, minimumSlot) if err != nil { errCount = append(errCount, err) } @@ -172,7 +172,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm // Query the DB for slots. sp := slotsToProcess{} - err = tx.QueryRow(dbCtx, getStartEndSlotStmt).Scan(&sp.startSlot, &sp.endSlot) + err = tx.QueryRow(dbCtx, getStartEndSlotStmt, minimumSlot).Scan(&sp.startSlot, &sp.endSlot) if err != nil { if err == pgx.ErrNoRows { time.Sleep(1 * time.Second) diff --git a/pkg/beaconclient/processknowngaps.go b/pkg/beaconclient/processknowngaps.go index 60a2187..7bc9168 100644 --- a/pkg/beaconclient/processknowngaps.go +++ b/pkg/beaconclient/processknowngaps.go @@ -30,11 +30,11 @@ import ( var ( // Get a single non-checked out row row from eth_beacon.known_gaps. getKgEntryStmt string = `SELECT start_slot, end_slot FROM eth_beacon.known_gaps - WHERE checked_out=false + WHERE checked_out=false AND end_slot >= $1 ORDER BY priority ASC LIMIT 1;` // Used to periodically check to see if there is a new entry in the eth_beacon.known_gaps table. - checkKgEntryStmt string = `SELECT * FROM eth_beacon.known_gaps WHERE checked_out=false;` + checkKgEntryStmt string = `SELECT * FROM eth_beacon.known_gaps WHERE checked_out=false AND end_slot >= $1;` // Used to checkout a row from the eth_beacon.known_gaps table lockKgEntryStmt string = `UPDATE eth_beacon.known_gaps SET checked_out=true, checked_out_by=$3 @@ -58,10 +58,10 @@ type KnownGapsProcessing struct { } // This function will perform all the heavy lifting for tracking the head of the chain. -func (bc *BeaconClient) ProcessKnownGaps(ctx context.Context, maxWorkers int) []error { +func (bc *BeaconClient) ProcessKnownGaps(ctx context.Context, maxWorkers int, minimumSlot uint64) []error { log.Info("We are starting the known gaps processing service.") bc.KnownGapsProcess = KnownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics} - errs := handleBatchProcess(ctx, maxWorkers, bc.KnownGapsProcess, bc.SlotProcessingDetails(), bc.Metrics.IncrementKnownGapsProcessed) + errs := handleBatchProcess(ctx, maxWorkers, bc.KnownGapsProcess, bc.SlotProcessingDetails(), bc.Metrics.IncrementKnownGapsProcessed, minimumSlot) log.Debug("Exiting known gaps processing service") return errs } @@ -78,8 +78,8 @@ func (bc *BeaconClient) StopKnownGapsProcessing(cancel context.CancelFunc) error } // Get a single row of historical slots from the table. -func (kgp KnownGapsProcessing) getSlotRange(ctx context.Context, slotCh chan<- slotsToProcess) []error { - return getBatchProcessRow(ctx, kgp.db, getKgEntryStmt, checkKgEntryStmt, lockKgEntryStmt, slotCh, strconv.Itoa(kgp.uniqueNodeIdentifier)) +func (kgp KnownGapsProcessing) getSlotRange(ctx context.Context, slotCh chan<- slotsToProcess, minimumSlot uint64) []error { + return getBatchProcessRow(ctx, kgp.db, getKgEntryStmt, checkKgEntryStmt, lockKgEntryStmt, slotCh, strconv.Itoa(kgp.uniqueNodeIdentifier), minimumSlot) } // Remove the table entry.