From 66fc40a37a7fabb00dcb5ad2791cfa7ad90ffc47 Mon Sep 17 00:00:00 2001 From: Abdul Rabbani Date: Thu, 9 Jun 2022 14:53:35 -0400 Subject: [PATCH] Update and test the shutdown --- .github/workflows/system-tests.yml | 7 +- cmd/full.go | 118 +++++++++++++++++++ example.ipld-ethcl-indexer-config.json | 2 +- internal/shutdown/shutdown.go | 27 +++++ pkg/beaconclient/capturehistoric.go | 3 - pkg/beaconclient/databasewrite.go | 34 ++++-- pkg/beaconclient/processhistoric.go | 4 +- pkg/beaconclient/processknowngaps.go | 2 +- pkg/beaconclient/processslot.go | 157 ++++++++++++++----------- 9 files changed, 263 insertions(+), 91 deletions(-) create mode 100644 cmd/full.go diff --git a/.github/workflows/system-tests.yml b/.github/workflows/system-tests.yml index fbdc324..f0d7aae 100644 --- a/.github/workflows/system-tests.yml +++ b/.github/workflows/system-tests.yml @@ -26,7 +26,7 @@ env: db_user: vdbm db_password: password db_driver: "pgx" - DOCKER_HOST: 127.0.0.1:2375 + #DOCKER_HOST: 127.0.0.1:2375 jobs: system-testing: @@ -62,7 +62,8 @@ jobs: - name: Run docker compose id: compose run: | - docker-compose \ + ls "./stack-orchestrator/docker/local/docker-compose-ethcl-db.yml" + sudo docker-compose \ -f "./stack-orchestrator/docker/local/docker-compose-ethcl-db.yml" \ --env-file ./config.sh \ up -d --build @@ -85,7 +86,7 @@ jobs: - name: Clean up the docker containers if: steps.compose.outcome == 'success' run: | - docker-compose \ + sudo docker-compose \ -f "./stack-orchestrator/docker/local/docker-compose-ethcl-db.yml" \ --env-file ./config.sh \ down -v diff --git a/cmd/full.go b/cmd/full.go new file mode 100644 index 0000000..17b80cd --- /dev/null +++ b/cmd/full.go @@ -0,0 +1,118 @@ +// VulcanizeDB +// Copyright © 2022 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any 
later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package cmd + +import ( + "context" + "fmt" + "strconv" + + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/spf13/viper" + "github.com/vulcanize/ipld-ethcl-indexer/internal/boot" + "github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" + "golang.org/x/sync/errgroup" +) + +// fullCmd represents the full command +var fullCmd = &cobra.Command{ + Use: "full", + Short: "Capture all components of the application (head and historical)", + Long: `Capture all components of the application (head and historical`, + Run: func(cmd *cobra.Command, args []string) { + startFullProcessing() + }, +} + +func init() { + captureCmd.AddCommand(fullCmd) + + // Here you will define your flags and configuration settings. + + // Cobra supports Persistent Flags which will work for this command + // and all subcommands, e.g.: + // fullCmd.PersistentFlags().String("foo", "", "A help for foo") + + // Cobra supports local flags which will only run when this command + // is called directly, e.g.: + // fullCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") +} + +// Start the application to track at head and historical processing. 
+func startFullProcessing() { + // Boot the application + log.Info("Starting the application in head tracking mode.") + ctx := context.Background() + + Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), + viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), + viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier"), viper.GetBool("bc.checkDb")) + if err != nil { + StopApplicationPreBoot(err, Db) + } + + if viper.GetBool("pm.metrics") { + addr := viper.GetString("pm.address") + ":" + strconv.Itoa(viper.GetInt("pm.port")) + serveProm(addr) + } + + log.Info("The Beacon Client has booted successfully!") + // Capture head blocks + go Bc.CaptureHead() + + hpContext, hpCancel := context.WithCancel(context.Background()) + + errG, _ := errgroup.WithContext(context.Background()) + errG.Go(func() error { + errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker")) + if len(errs) != 0 { + if len(errs) != 0 { + log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events") + return fmt.Errorf("Application ended because there were too many error when attempting to process historic") + } + } + return nil + }) + kgCtx, KgCancel := context.WithCancel(context.Background()) + if viper.GetBool("kg.processKnownGaps") { + go func() { + errG := new(errgroup.Group) + errG.Go(func() error { + errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker")) + if len(errs) != 0 { + log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps") + return fmt.Errorf("Application ended because there were too many error when attempting to process 
knownGaps") + } + return nil + }) + if err := errG.Wait(); err != nil { + loghelper.LogError(err).Error("Error with knownGaps processing") + } + }() + } + + // Shutdown when the time is right. + err = shutdown.ShutdownFull(ctx, KgCancel, hpCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc) + if err != nil { + loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!") + } else { + log.Info("Gracefully shutdown ipld-ethcl-indexer") + } + +} diff --git a/example.ipld-ethcl-indexer-config.json b/example.ipld-ethcl-indexer-config.json index 96c2c6f..e0ac947 100644 --- a/example.ipld-ethcl-indexer-config.json +++ b/example.ipld-ethcl-indexer-config.json @@ -8,7 +8,7 @@ "driver": "PGX" }, "bc": { - "address": "localhost", + "address": "10.203.8.51", "port": 5052, "type": "lighthouse", "bootRetryInterval": 30, diff --git a/internal/shutdown/shutdown.go b/internal/shutdown/shutdown.go index 742a100..87c5fc4 100644 --- a/internal/shutdown/shutdown.go +++ b/internal/shutdown/shutdown.go @@ -81,6 +81,33 @@ func ShutdownHistoricProcessing(ctx context.Context, kgCancel, hpCancel context. }) } +// Shutdown the head and historical processing +func ShutdownFull(ctx context.Context, kgCancel, hpCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { + return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{ + // Combining DB shutdown with BC because BC needs DB open to cleanly shutdown. 
+ "beaconClient": func(ctx context.Context) error { + defer DB.Close() + err := BC.StopHistoric(hpCancel) + if err != nil { + loghelper.LogError(err).Error("Unable to stop processing historic") + } + if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) { + err = BC.StopKnownGapsProcessing(kgCancel) + if err != nil { + loghelper.LogError(err).Error("Unable to stop processing known gaps") + } + } + err = BC.StopHeadTracking() + if err != nil { + loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking") + } + + return err + }, + }) + +} + // Wrapper function for shutting down the application in boot mode. func ShutdownBoot(ctx context.Context, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{ diff --git a/pkg/beaconclient/capturehistoric.go b/pkg/beaconclient/capturehistoric.go index 6b7ab40..b6d2655 100644 --- a/pkg/beaconclient/capturehistoric.go +++ b/pkg/beaconclient/capturehistoric.go @@ -162,9 +162,6 @@ func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, log.Debug("Waiting for shutdown signal from channel") select { case <-ctx.Done(): - close(workCh) - close(processedCh) - close(errCh) log.Debug("Received shutdown signal from channel") return nil case errs := <-finalErrCh: diff --git a/pkg/beaconclient/databasewrite.go b/pkg/beaconclient/databasewrite.go index c2a978f..8bf202c 100644 --- a/pkg/beaconclient/databasewrite.go +++ b/pkg/beaconclient/databasewrite.go @@ -480,7 +480,11 @@ func writeStartUpGaps(db sql.Database, tableIncrement int, firstSlot int, metric } if maxSlot != firstSlot-1 { if maxSlot < firstSlot-1 { - writeKnownGaps(db, tableIncrement, maxSlot+1, firstSlot-1, fmt.Errorf(""), "startup", metric) + if maxSlot == 0 { + writeKnownGaps(db, tableIncrement, maxSlot, firstSlot-1, fmt.Errorf(""), "startup", metric) + } else { + 
writeKnownGaps(db, tableIncrement, maxSlot+1, firstSlot-1, fmt.Errorf(""), "startup", metric) + } } else { log.WithFields(log.Fields{ "maxSlot": maxSlot, @@ -537,7 +541,7 @@ func isSlotProcessed(db sql.Database, checkProcessStmt string, slot string) (boo // Check to see if this slot is in the DB. Check ethcl.slots, ethcl.signed_beacon_block // and ethcl.beacon_state. If the slot exists, return true -func IsSlotInDb(db sql.Database, slot string, blockRoot string, stateRoot string) (bool, error) { +func IsSlotInDb(ctx context.Context, db sql.Database, slot string, blockRoot string, stateRoot string) (bool, error) { var ( isInBeaconState bool isInSignedBeaconBlock bool @@ -545,18 +549,28 @@ func IsSlotInDb(db sql.Database, slot string, blockRoot string, stateRoot string ) errG, _ := errgroup.WithContext(context.Background()) errG.Go(func() error { - isInBeaconState, err = checkSlotAndRoot(db, CheckBeaconStateStmt, slot, stateRoot) - if err != nil { - loghelper.LogError(err).Error("Unable to check if the slot and stateroot exist in ethcl.beacon_state") + select { + case <-ctx.Done(): + return nil + default: + isInBeaconState, err = checkSlotAndRoot(db, CheckBeaconStateStmt, slot, stateRoot) + if err != nil { + loghelper.LogError(err).Error("Unable to check if the slot and stateroot exist in ethcl.beacon_state") + } + return err } - return err }) errG.Go(func() error { - isInSignedBeaconBlock, err = checkSlotAndRoot(db, CheckSignedBeaconBlockStmt, slot, blockRoot) - if err != nil { - loghelper.LogError(err).Error("Unable to check if the slot and block_root exist in ethcl.signed_beacon_block") + select { + case <-ctx.Done(): + return nil + default: + isInSignedBeaconBlock, err = checkSlotAndRoot(db, CheckSignedBeaconBlockStmt, slot, blockRoot) + if err != nil { + loghelper.LogError(err).Error("Unable to check if the slot and block_root exist in ethcl.signed_beacon_block") + } + return err } - return err }) if err := errG.Wait(); err != nil { return false, err diff --git 
a/pkg/beaconclient/processhistoric.go b/pkg/beaconclient/processhistoric.go index f8d54a3..ecfff30 100644 --- a/pkg/beaconclient/processhistoric.go +++ b/pkg/beaconclient/processhistoric.go @@ -82,7 +82,7 @@ func (hp HistoricProcessing) handleProcessingErrors(ctx context.Context, errMess // "un"-checkout the rows held by this DB in the ethcl.historical_process table. func (hp HistoricProcessing) releaseDbLocks(cancel context.CancelFunc) error { - go func() { cancel() }() + cancel() log.Debug("Updating all the entries to ethcl.historical processing") log.Debug("Db: ", hp.db) log.Debug("hp.uniqueNodeIdentifier ", hp.uniqueNodeIdentifier) @@ -107,7 +107,7 @@ func processSlotRangeWorker(ctx context.Context, workCh <-chan int, errCh chan<- return case slot := <-workCh: log.Debug("Handling slot: ", slot) - err, errProcess := handleHistoricSlot(db, serverAddress, slot, metrics, checkDb) + err, errProcess := handleHistoricSlot(ctx, db, serverAddress, slot, metrics, checkDb) if err != nil { errMs := batchHistoricError{ err: err, diff --git a/pkg/beaconclient/processknowngaps.go b/pkg/beaconclient/processknowngaps.go index 79f2d53..4b4a55c 100644 --- a/pkg/beaconclient/processknowngaps.go +++ b/pkg/beaconclient/processknowngaps.go @@ -122,7 +122,7 @@ func (kgp KnownGapsProcessing) handleProcessingErrors(ctx context.Context, errMe // Updated checked_out column for the uniqueNodeIdentifier. func (kgp KnownGapsProcessing) releaseDbLocks(cancel context.CancelFunc) error { - go func() { cancel() }() + cancel() log.Debug("Updating all the entries to ethcl.known_gaps") log.Debug("Db: ", kgp.db) log.Debug("kgp.uniqueNodeIdentifier ", kgp.uniqueNodeIdentifier) diff --git a/pkg/beaconclient/processslot.go b/pkg/beaconclient/processslot.go index f40d7d1..ffb3b80 100644 --- a/pkg/beaconclient/processslot.go +++ b/pkg/beaconclient/processslot.go @@ -77,88 +77,103 @@ type ProcessSlot struct { // This function will do all the work to process the slot and write it to the DB. 
// It will return the error and error process. The error process is used for providing reach detail to the // known_gaps table. -func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) (error, string) { - ps := &ProcessSlot{ - Slot: slot, - BlockRoot: blockRoot, - StateRoot: stateRoot, - HeadOrHistoric: headOrHistoric, - Db: db, - Metrics: metrics, - } +func processFullSlot(ctx context.Context, db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) (error, string) { + select { + case <-ctx.Done(): + return nil, "" + default: + ps := &ProcessSlot{ + Slot: slot, + BlockRoot: blockRoot, + StateRoot: stateRoot, + HeadOrHistoric: headOrHistoric, + Db: db, + Metrics: metrics, + } - g, _ := errgroup.WithContext(context.Background()) - vUnmarshalerCh := make(chan *dt.VersionedUnmarshaler, 1) + g, _ := errgroup.WithContext(context.Background()) + vUnmarshalerCh := make(chan *dt.VersionedUnmarshaler, 1) - // Get the BeaconState. - g.Go(func() error { - err := ps.getBeaconState(serverAddress, vUnmarshalerCh) + // Get the BeaconState. + g.Go(func() error { + select { + case <-ctx.Done(): + return nil + default: + err := ps.getBeaconState(serverAddress, vUnmarshalerCh) + if err != nil { + return err + } + return nil + } + }) + + // Get the SignedBeaconBlock. 
+ g.Go(func() error { + select { + case <-ctx.Done(): + return nil + default: + err := ps.getSignedBeaconBlock(serverAddress, vUnmarshalerCh) + if err != nil { + return err + } + return nil + } + }) + + if err := g.Wait(); err != nil { + return err, "processSlot" + } + + finalBlockRoot, finalStateRoot, finalEth1BlockHash, err := ps.provideFinalHash() if err != nil { - return err + return err, "CalculateBlockRoot" + } + if checkDb { + inDb, err := IsSlotInDb(ctx, ps.Db, strconv.Itoa(ps.Slot), finalBlockRoot, finalStateRoot) + if err != nil { + return err, "checkDb" + } + if inDb { + log.WithField("slot", slot).Info("Slot already in the DB.") + return nil, "" + } } - return nil - }) - // Get the SignedBeaconBlock. - g.Go(func() error { - err := ps.getSignedBeaconBlock(serverAddress, vUnmarshalerCh) + // Get this object ready to write + dw, err := ps.createWriteObjects(finalBlockRoot, finalStateRoot, finalEth1BlockHash) if err != nil { - return err + return err, "blockRoot" } - return nil - }) - - if err := g.Wait(); err != nil { - return err, "processSlot" - } - - finalBlockRoot, finalStateRoot, finalEth1BlockHash, err := ps.provideFinalHash() - if err != nil { - return err, "CalculateBlockRoot" - } - if checkDb { - inDb, err := IsSlotInDb(ps.Db, strconv.Itoa(ps.Slot), finalBlockRoot, finalStateRoot) + // Write the object to the DB. + defer func() { + err := dw.Tx.Rollback(dw.Ctx) + if err != nil && err != pgx.ErrTxClosed { + loghelper.LogError(err).Error("We were unable to Rollback a transaction") + } + }() + err = dw.transactFullSlot() if err != nil { - return err, "checkDb" + return err, "processSlot" } - if inDb { - log.WithField("slot", slot).Info("Slot already in the DB.") - return nil, "" + + // Handle any reorgs or skipped slots. 
+ headOrHistoric = strings.ToLower(headOrHistoric) + if headOrHistoric != "head" && headOrHistoric != "historic" { + return fmt.Errorf("headOrHistoric must be either historic or head!"), "" } - } - - // Get this object ready to write - dw, err := ps.createWriteObjects(finalBlockRoot, finalStateRoot, finalEth1BlockHash) - if err != nil { - return err, "blockRoot" - } - // Write the object to the DB. - defer func() { - err := dw.Tx.Rollback(dw.Ctx) - if err != nil && err != pgx.ErrTxClosed { - loghelper.LogError(err).Error("We were unable to Rollback a transaction") + if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" && ps.Status != "skipped" { + ps.checkPreviousSlot(dw.Tx, dw.Ctx, previousSlot, previousBlockRoot, knownGapsTableIncrement) } - }() - err = dw.transactFullSlot() - if err != nil { - return err, "processSlot" - } - // Handle any reorgs or skipped slots. - headOrHistoric = strings.ToLower(headOrHistoric) - if headOrHistoric != "head" && headOrHistoric != "historic" { - return fmt.Errorf("headOrHistoric must be either historic or head!"), "" - } - if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" && ps.Status != "skipped" { - ps.checkPreviousSlot(dw.Tx, dw.Ctx, previousSlot, previousBlockRoot, knownGapsTableIncrement) - } + // Commit the transaction + if err = dw.Tx.Commit(dw.Ctx); err != nil { + return err, "transactionCommit" + } - // Commit the transaction - if err = dw.Tx.Commit(dw.Ctx); err != nil { - return err, "transactionCommit" + return nil, "" } - - return nil, "" } // Handle a slot that is at head. A wrapper function for calling `handleFullSlot`. 
@@ -167,15 +182,15 @@ func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot if previousSlot == 0 && previousBlockRoot == "" { writeStartUpGaps(db, knownGapsTableIncrement, slot, metrics) } - err, errReason := processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement, checkDb) + err, errReason := processFullSlot(context.Background(), db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement, checkDb) if err != nil { writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics) } } // Handle a historic slot. A wrapper function for calling `handleFullSlot`. -func handleHistoricSlot(db sql.Database, serverAddress string, slot int, metrics *BeaconClientMetrics, checkDb bool) (error, string) { - return processFullSlot(db, serverAddress, slot, "", "", 0, "", "historic", metrics, 1, checkDb) +func handleHistoricSlot(ctx context.Context, db sql.Database, serverAddress string, slot int, metrics *BeaconClientMetrics, checkDb bool) (error, string) { + return processFullSlot(ctx, db, serverAddress, slot, "", "", 0, "", "historic", metrics, 1, checkDb) } // Update the SszSignedBeaconBlock and FullSignedBeaconBlock object with their respective values.