diff --git a/.github/workflows/on-pr.yml b/.github/workflows/on-pr.yml index c9ad8d1..1c988f7 100644 --- a/.github/workflows/on-pr.yml +++ b/.github/workflows/on-pr.yml @@ -206,4 +206,4 @@ jobs: - name: golangci-lint uses: golangci/golangci-lint-action@v3 with: - args: --timeout 90s + args: --timeout 90s --disable deadcode diff --git a/README.md b/README.md index cf285e2..2b5ab8b 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,7 @@ To run the application, do as follows: 2. Run the start up command. ``` -go run -race main.go capture head --db.address localhost \ +go run -race main.go capture historic --db.address localhost \ --db.password password \ --db.port 8076 \ --db.username vdbm \ @@ -45,11 +45,14 @@ go run -race main.go capture head --db.address localhost \ --db.driver PGX \ --bc.address localhost \ --bc.port 5052 \ + --bc.maxHistoricProcessWorker 2 \ + --bc.maxKnownGapsWorker 2 \ + --bc.knownGapsProcess=true \ --bc.connectionProtocol http \ - --t.skipSync=true \ - --log.level info \ + --t.skipSync=false \ + --log.level debug \ --log.output=true \ - --kg.increment 100 + --kg.increment 1000000 ``` ## Running Tests diff --git a/cmd/capture.go b/cmd/capture.go index 5916bfa..c6bf8e5 100644 --- a/cmd/capture.go +++ b/cmd/capture.go @@ -25,21 +25,24 @@ import ( ) var ( - dbUsername string - dbPassword string - dbName string - dbAddress string - dbDriver string - dbPort int - bcAddress string - bcPort int - bcBootRetryInterval int - bcBootMaxRetry int - bcConnectionProtocol string - bcType string - maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second - notifierCh chan os.Signal = make(chan os.Signal, 1) - testDisregardSync bool + dbUsername string + dbPassword string + dbName string + dbAddress string + dbDriver string + dbPort int + bcAddress string + bcPort int + bcBootRetryInterval int + bcBootMaxRetry int + bcConnectionProtocol string + bcType string + bcIsProcessKnownGaps bool + bcMaxHistoricProcessWorker int + bcMaxKnownGapsWorker int + maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second + notifierCh chan os.Signal = make(chan os.Signal, 1) + testDisregardSync bool ) // captureCmd represents the capture command @@ -84,6 +87,9 @@ func init() { captureCmd.PersistentFlags().StringVarP(&bcConnectionProtocol, "bc.connectionProtocol", "", "http", "protocol for connecting to the beacon node.") captureCmd.PersistentFlags().IntVarP(&bcBootRetryInterval, "bc.bootRetryInterval", "", 30, "The amount of time to wait between retries while booting the application") captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The amount of time to wait between retries while booting the application") + captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.historic_process table. Be careful of system memory.") + captureCmd.PersistentFlags().IntVarP(&bcMaxKnownGapsWorker, "bc.maxKnownGapsWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.historic_process table. Be careful of system memory.") + captureCmd.PersistentFlags().BoolVar(&bcIsProcessKnownGaps, "bc.knownGapsProcess", false, "Should we process entries from the knownGaps table as they occur?") err = captureCmd.MarkPersistentFlagRequired("bc.address") exitErr(err) err = captureCmd.MarkPersistentFlagRequired("bc.port") @@ -104,10 +110,10 @@ func init() { exitErr(err) err = viper.BindPFlag("db.name", captureCmd.PersistentFlags().Lookup("db.name")) exitErr(err) - err = viper.BindPFlag("t.skipSync", captureCmd.PersistentFlags().Lookup("t.skipSync")) - exitErr(err) // Testing Specific + err = viper.BindPFlag("t.skipSync", captureCmd.PersistentFlags().Lookup("t.skipSync")) + exitErr(err) err = viper.BindPFlag("t.driver", captureCmd.PersistentFlags().Lookup("db.driver")) exitErr(err) @@ -124,6 +130,14 @@ func init() { exitErr(err) err = viper.BindPFlag("bc.bootMaxRetry", captureCmd.PersistentFlags().Lookup("bc.bootMaxRetry")) exitErr(err) + err = viper.BindPFlag("bc.bootMaxRetry", captureCmd.PersistentFlags().Lookup("bc.bootMaxRetry")) + exitErr(err) + err = viper.BindPFlag("bc.knownGapsProcess", captureCmd.PersistentFlags().Lookup("bc.knownGapsProcess")) + exitErr(err) + err = viper.BindPFlag("bc.maxHistoricProcessWorker", captureCmd.PersistentFlags().Lookup("bc.maxHistoricProcessWorker")) + exitErr(err) + err = viper.BindPFlag("bc.maxKnownGapsWorker", captureCmd.PersistentFlags().Lookup("bc.maxKnownGapsWorker")) + exitErr(err) // Here you will define your flags and configuration settings. } diff --git a/cmd/head.go b/cmd/head.go index 2f360bf..67392a7 100644 --- a/cmd/head.go +++ b/cmd/head.go @@ -18,6 +18,7 @@ package cmd import ( "context" + "fmt" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -25,6 +26,7 @@ import ( "github.com/vulcanize/ipld-ethcl-indexer/internal/boot" "github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" + "golang.org/x/sync/errgroup" ) var ( @@ -56,6 +58,20 @@ func startHeadTracking() { log.Info("The Beacon Client has booted successfully!") // Capture head blocks go Bc.CaptureHead() + if bcIsProcessKnownGaps { + errG := new(errgroup.Group) + errG.Go(func() error { + errs := Bc.ProcessKnownGaps(bcMaxKnownGapsWorker) + if len(errs) != 0 { + log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps") + return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps") + } + return nil + }) + if err := errG.Wait(); err != nil { + loghelper.LogError(err).Error("Error with knownGaps processing") + } + } // Shutdown when the time is right. err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc) diff --git a/cmd/historic.go b/cmd/historic.go index c7fb93f..8618ff9 100644 --- a/cmd/historic.go +++ b/cmd/historic.go @@ -18,6 +18,7 @@ package cmd import ( "context" + "fmt" "os" log "github.com/sirupsen/logrus" @@ -26,6 +27,7 @@ import ( "github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown" "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" + "golang.org/x/sync/errgroup" ) // historicCmd represents the historic command @@ -49,12 +51,35 @@ func startHistoricProcessing() { if err != nil { StopApplicationPreBoot(err, Db) } - errs := Bc.CaptureHistoric(2) - if errs != nil { - log.WithFields(log.Fields{ - "TotalErrors": errs, - }).Error("The historical processing service ended after receiving too many errors.") + + errG, _ := errgroup.WithContext(context.Background()) + + errG.Go(func() error { + errs := Bc.CaptureHistoric(bcMaxHistoricProcessWorker) + if len(errs) != 0 { + if len(errs) != 0 { + log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events") + return fmt.Errorf("Application ended because there were too many error when attempting to process historic") + } + } + return nil + }) + + if bcIsProcessKnownGaps { + errG.Go(func() error { + errs := Bc.ProcessKnownGaps(bcMaxKnownGapsWorker) + if len(errs) != 0 { + log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps") + return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps") + } + return nil + }) } + if err := errG.Wait(); err != nil { + loghelper.LogError(err).Error("Error with knownGaps processing") + } + + log.Debug("WE ARE AT CHECKPOINT") // Shutdown when the time is right. err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc) diff --git a/pkg/beaconclient/capturehistoric.go b/pkg/beaconclient/capturehistoric.go index f210952..422ebd2 100644 --- a/pkg/beaconclient/capturehistoric.go +++ b/pkg/beaconclient/capturehistoric.go @@ -78,7 +78,7 @@ func handleBatchProcess(maxWorkers int, bp BatchProcessing, db sql.Database, ser // Start workers for w := 1; w <= maxWorkers; w++ { - log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting historic processing workers") + log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting batch processing workers") go processSlotRangeWorker(workCh, errCh, db, serverEndpoint, metrics) } diff --git a/pkg/beaconclient/capturehistoric_test.go b/pkg/beaconclient/capturehistoric_test.go index 23a6785..0a288c3 100644 --- a/pkg/beaconclient/capturehistoric_test.go +++ b/pkg/beaconclient/capturehistoric_test.go @@ -26,8 +26,8 @@ var _ = Describe("Capturehistoric", func() { time.Sleep(2 * time.Second) validateSlot(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, "proposed") - //validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, "0x629ae1587895043076500f4f5dcb202a47c2fc95d5b5c548cb83bc97bd2dbfe1", "0x8d3f027beef5cbd4f8b29fc831aba67a5d74768edca529f5596f07fd207865e1", "/blocks/QHVAEQBQGQ4TKNJUGAYDGNZRGM2DOZJSGZTDMMLEG5QTIYTCMRQTKYRSGNTGCMDCGI2WINLGMM2DMNJRGYYGMMTBHEZGINJSME3DGYRZGE4WE") - //validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, "/blocks/QHVAEQRQPBTDEOBWMEYDGNZZMMYDGOBWMEZWGN3CMUZDQZBQGVSDQMRZMY4GKYRXMIZDQMDDMM4WKZDFGE2TINBZMFTDEMDFMJRWIMBWME3WCNJW") + validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, "0x629ae1587895043076500f4f5dcb202a47c2fc95d5b5c548cb83bc97bd2dbfe1", "0x8d3f027beef5cbd4f8b29fc831aba67a5d74768edca529f5596f07fd207865e1", "/blocks/QHVAEQBQGQ4TKNJUGAYDGNZRGM2DOZJSGZTDMMLEG5QTIYTCMRQTKYRSGNTGCMDCGI2WINLGMM2DMNJRGYYGMMTBHEZGINJSME3DGYRZGE4WE") + validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, "/blocks/QHVAEQRQPBTDEOBWMEYDGNZZMMYDGOBWMEZWGN3CMUZDQZBQGVSDQMRZMY4GKYRXMIZDQMDDMM4WKZDFGE2TINBZMFTDEMDFMJRWIMBWME3WCNJW") //validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, "0x629ae1587895043076500f4f5dcb202a47c2fc95d5b5c548cb83bc97bd2dbfe1", "0x8d3f027beef5cbd4f8b29fc831aba67a5d74768edca529f5596f07fd207865e1", "/blocks/QHVAEQBQGQ4TKNJUGAYDGNZRGM2DOZJSGZTDMMLEG5QTIYTCMRQTKYRSGNTGCMDCGI2WINLGMM2DMNJRGYYGMMTBHEZGINJSME3DGYRZGE4WE") //validateBeaconState(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, "/blocks/QHVAEQRQPBTDEOBWMEYDGNZZMMYDGOBWMEZWGN3CMUZDQZBQGVSDQMRZMY4GKYRXMIZDQMDDMM4WKZDFGE2TINBZMFTDEMDFMJRWIMBWME3WCNJW") diff --git a/pkg/beaconclient/databasewrite.go b/pkg/beaconclient/databasewrite.go index d044f88..a559eea 100644 --- a/pkg/beaconclient/databasewrite.go +++ b/pkg/beaconclient/databasewrite.go @@ -419,3 +419,20 @@ func calculateEpoch(slot int, slotPerEpoch int) string { epoch := slot / slotPerEpoch return strconv.Itoa(epoch) } + +// A helper function to check to see if the slot is processed. +func isSlotProcessed(db sql.Database, checkProcessStmt string, slot string) (bool, error) { + processRow, err := db.Exec(context.Background(), checkProcessStmt, slot) + if err != nil { + return false, err + } + row, err := processRow.RowsAffected() + if err != nil { + return false, err + } + + if row > 0 { + return true, nil + } + return false, nil +} diff --git a/pkg/beaconclient/processhistoric.go b/pkg/beaconclient/processhistoric.go index 7a853ce..2ba3bd4 100644 --- a/pkg/beaconclient/processhistoric.go +++ b/pkg/beaconclient/processhistoric.go @@ -31,18 +31,19 @@ import ( ) var ( - // Get a single highest priority and non-checked out row. + // Get a single highest priority and non-checked out row row from ethcl.historical_process getHpEntryStmt string = `SELECT start_slot, end_slot FROM ethcl.historic_process WHERE checked_out=false ORDER BY priority ASC LIMIT 1;` - // Used to periodically check to see if there is a new entry in the ethcl_historic_process table. - checkHpEntryStmt string = `INSERT INTO ethcl.historic_process (start_slot, end_slot) VALUES ($1, $2) ON CONFLICT (start_slot, end_slot) DO NOTHING;` - // Used to get the highest priority row that is not checked out, and to check it out within the ethcl.historic_process table. + // Used to periodically check to see if there is a new entry in the ethcl.historic_process table. + checkHpEntryStmt string = `SELECT * FROM ethcl.historic_process WHERE checked_out=false;` + // Used to checkout a row from the ethcl.historic_process table lockHpEntryStmt string = `UPDATE ethcl.historic_process SET checked_out=true WHERE start_slot=$1 AND end_slot=$2;` - deleteSlotsEntryStmt string = `DELETE FROM ethcl.historic_process + // Used to delete an entry from the knownGaps table + deleteHpEntryStmt string = `DELETE FROM ethcl.historic_process WHERE start_slot=$1 AND end_slot=$2;` ) @@ -58,7 +59,7 @@ func (hp historicProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error // Remove the table entry. func (hp historicProcessing) removeTableEntry(processCh <-chan slotsToProcess) error { - return removeRowPostProcess(hp.db, processCh, QueryBySlotStmt, deleteSlotsEntryStmt) + return removeRowPostProcess(hp.db, processCh, QueryBySlotStmt, deleteHpEntryStmt) } // Remove the table entry. @@ -93,7 +94,15 @@ func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError, func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRowsStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess) []error { errCount := make([]error, 0) + // 5 is an arbitrary number. It allows us to retry a few times before + // ending the application. + prevErrCount := 0 for len(errCount) < 5 { + if len(errCount) != prevErrCount { + log.WithFields(log.Fields{ + "errCount": errCount, + }).Error("New error entry added") + } processRow, err := db.Exec(context.Background(), checkNewRowsStmt) if err != nil { errCount = append(errCount, err) @@ -103,13 +112,11 @@ func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRow errCount = append(errCount, err) } if row < 1 { - time.Sleep(100 * time.Millisecond) + time.Sleep(1000 * time.Millisecond) + log.Debug("We are checking rows, be patient") continue } - log.Debug("Found a row, going to start processing.") - log.WithFields(log.Fields{ - "ErrCount": errCount, - }).Debug("The ErrCounter") + log.Debug("We found a new row") ctx := context.Background() // Setup TX @@ -160,7 +167,7 @@ func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRow errCount = append(errCount, err) continue } - if rows != 1 { + if rows == 0 { loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).WithFields(log.Fields{ "rowsReturn": rows, }).Error("We did not lock a single row.") @@ -183,13 +190,13 @@ func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRow // After a row has been processed it should be removed from its appropriate table. func removeRowPostProcess(db sql.Database, processCh <-chan slotsToProcess, checkProcessedStmt, removeStmt string) error { - errCh := make(chan error, 0) + errCh := make(chan error) for { slots := <-processCh // Make sure the start and end slot exist in the slots table. go func() { finishedProcess := false - for finishedProcess == false { + for !finishedProcess { isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.startSlot)) if err != nil { errCh <- err @@ -214,20 +221,3 @@ func removeRowPostProcess(db sql.Database, processCh <-chan slotsToProcess, chec } } } - -// A helper function to check to see if the slot is processed. -func isSlotProcessed(db sql.Database, checkProcessStmt string, slot string) (bool, error) { - processRow, err := db.Exec(context.Background(), checkProcessStmt, slot) - if err != nil { - return false, err - } - row, err := processRow.RowsAffected() - if err != nil { - return false, err - } - - if row > 0 { - return true, nil - } - return false, nil -} diff --git a/pkg/beaconclient/processknowngaps.go b/pkg/beaconclient/processknowngaps.go new file mode 100644 index 0000000..b727122 --- /dev/null +++ b/pkg/beaconclient/processknowngaps.go @@ -0,0 +1,76 @@ +// VulcanizeDB +// Copyright © 2022 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +// This file contains all the code to process historic slots. + +package beaconclient + +import ( + "strconv" + + log "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" +) + +var ( + // Get a single non-checked out row row from ethcl.known_gaps. + getKgEntryStmt string = `SELECT start_slot, end_slot FROM ethcl.known_gaps + WHERE checked_out=false + LIMIT 1;` + // Used to periodically check to see if there is a new entry in the ethcl.known_gaps table. + checkKgEntryStmt string = `SELECT * FROM ethcl.known_gaps WHERE checked_out=false;` + // Used to checkout a row from the ethcl.known_gaps table + lockKgEntryStmt string = `UPDATE ethcl.known_gaps + SET checked_out=true + WHERE start_slot=$1 AND end_slot=$2;` + // Used to delete an entry from the knownGaps table + deleteKgEntryStmt string = `DELETE FROM ethcl.known_gaps + WHERE start_slot=$1 AND end_slot=$2;` +) + +type knownGapsProcessing struct { + db sql.Database + metrics *BeaconClientMetrics +} + +// This function will perform all the heavy lifting for tracking the head of the chain. +func (bc *BeaconClient) ProcessKnownGaps(maxWorkers int) []error { + log.Info("We are starting the known gaps processing service.") + hp := knownGapsProcessing{db: bc.Db, metrics: bc.Metrics} + errs := handleBatchProcess(maxWorkers, hp, hp.db, bc.ServerEndpoint, bc.Metrics) + log.Debug("Exiting known gaps processing service") + return errs +} + +// Get a single row of historical slots from the table. +func (kgp knownGapsProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error { + return getBatchProcessRow(kgp.db, getKgEntryStmt, checkKgEntryStmt, lockKgEntryStmt, slotCh) +} + +// Remove the table entry. +func (kgp knownGapsProcessing) removeTableEntry(processCh <-chan slotsToProcess) error { + return removeRowPostProcess(kgp.db, processCh, QueryBySlotStmt, deleteKgEntryStmt) +} + +// Remove the table entry. +func (kgp knownGapsProcessing) handleProcessingErrors(errMessages <-chan batchHistoricError) { + for { + errMs := <-errMessages + loghelper.LogSlotError(strconv.Itoa(errMs.slot), errMs.err) + writeKnownGaps(kgp.db, 1, errMs.slot, errMs.slot, errMs.err, errMs.errProcess, kgp.metrics) + } +}