Testing for Batch Processing #56

Merged
abdulrabbani00 merged 30 commits from feature/48-test-historical into develop 2022-06-09 21:32:46 +00:00
9 changed files with 263 additions and 91 deletions
Showing only changes of commit 66fc40a37a - Show all commits

View File

@ -26,7 +26,7 @@ env:
db_user: vdbm
db_password: password
db_driver: "pgx"
DOCKER_HOST: 127.0.0.1:2375
#DOCKER_HOST: 127.0.0.1:2375
jobs:
system-testing:
@ -62,7 +62,8 @@ jobs:
- name: Run docker compose
id: compose
run: |
docker-compose \
ls "./stack-orchestrator/docker/local/docker-compose-ethcl-db.yml"
sudo docker-compose \
-f "./stack-orchestrator/docker/local/docker-compose-ethcl-db.yml" \
--env-file ./config.sh \
up -d --build
@ -85,7 +86,7 @@ jobs:
- name: Clean up the docker containers
if: steps.compose.outcome == 'success'
run: |
docker-compose \
sudo docker-compose \
-f "./stack-orchestrator/docker/local/docker-compose-ethcl-db.yml" \
--env-file ./config.sh \
down -v

118
cmd/full.go Normal file
View File

@ -0,0 +1,118 @@
// VulcanizeDB
// Copyright © 2022 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
"fmt"
"strconv"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
"github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
"golang.org/x/sync/errgroup"
)
// fullCmd represents the full command
var fullCmd = &cobra.Command{
Use: "full",
Short: "Capture all components of the application (head and historical)",
Long: `Capture all components of the application (head and historical`,
Run: func(cmd *cobra.Command, args []string) {
startFullProcessing()
},
}
func init() {
captureCmd.AddCommand(fullCmd)
// Here you will define your flags and configuration settings.
// Cobra supports Persistent Flags which will work for this command
// and all subcommands, e.g.:
// fullCmd.PersistentFlags().String("foo", "", "A help for foo")
// Cobra supports local flags which will only run when this command
// is called directly, e.g.:
// fullCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
}
// Start the application to track at head and historical processing.
func startFullProcessing() {
// Boot the application
log.Info("Starting the application in head tracking mode.")
ctx := context.Background()
Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier"), viper.GetBool("bc.checkDb"))
if err != nil {
StopApplicationPreBoot(err, Db)
}
if viper.GetBool("pm.metrics") {
addr := viper.GetString("pm.address") + ":" + strconv.Itoa(viper.GetInt("pm.port"))
serveProm(addr)
}
log.Info("The Beacon Client has booted successfully!")
// Capture head blocks
go Bc.CaptureHead()
hpContext, hpCancel := context.WithCancel(context.Background())
errG, _ := errgroup.WithContext(context.Background())
errG.Go(func() error {
errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker"))
if len(errs) != 0 {
if len(errs) != 0 {
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events")
return fmt.Errorf("Application ended because there were too many error when attempting to process historic")
}
}
return nil
})
kgCtx, KgCancel := context.WithCancel(context.Background())
if viper.GetBool("kg.processKnownGaps") {
go func() {
errG := new(errgroup.Group)
errG.Go(func() error {
errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker"))
if len(errs) != 0 {
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps")
return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps")
}
return nil
})
if err := errG.Wait(); err != nil {
loghelper.LogError(err).Error("Error with knownGaps processing")
}
}()
}
// Shutdown when the time is right.
err = shutdown.ShutdownFull(ctx, KgCancel, hpCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc)
if err != nil {
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
} else {
log.Info("Gracefully shutdown ipld-ethcl-indexer")
}
}

View File

@ -8,7 +8,7 @@
"driver": "PGX"
},
"bc": {
"address": "localhost",
"address": "10.203.8.51",
"port": 5052,
"type": "lighthouse",
"bootRetryInterval": 30,

View File

@ -81,6 +81,33 @@ func ShutdownHistoricProcessing(ctx context.Context, kgCancel, hpCancel context.
})
}
// Shutdown the head and historical processing
func ShutdownFull(ctx context.Context, kgCancel, hpCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{
// Combining DB shutdown with BC because BC needs DB open to cleanly shutdown.
"beaconClient": func(ctx context.Context) error {
defer DB.Close()
err := BC.StopHistoric(hpCancel)
if err != nil {
loghelper.LogError(err).Error("Unable to stop processing historic")
}
if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) {
err = BC.StopKnownGapsProcessing(kgCancel)
if err != nil {
loghelper.LogError(err).Error("Unable to stop processing known gaps")
}
}
err = BC.StopHeadTracking()
if err != nil {
loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking")
}
return err
},
})
}
// Wrapper function for shutting down the application in boot mode.
func ShutdownBoot(ctx context.Context, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{

View File

@ -162,9 +162,6 @@ func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing,
log.Debug("Waiting for shutdown signal from channel")
select {
case <-ctx.Done():
close(workCh)
close(processedCh)
close(errCh)
log.Debug("Received shutdown signal from channel")
return nil
case errs := <-finalErrCh:

View File

@ -480,7 +480,11 @@ func writeStartUpGaps(db sql.Database, tableIncrement int, firstSlot int, metric
}
if maxSlot != firstSlot-1 {
if maxSlot < firstSlot-1 {
if maxSlot == 0 {
writeKnownGaps(db, tableIncrement, maxSlot, firstSlot-1, fmt.Errorf(""), "startup", metric)
} else {
writeKnownGaps(db, tableIncrement, maxSlot+1, firstSlot-1, fmt.Errorf(""), "startup", metric)
}
} else {
log.WithFields(log.Fields{
"maxSlot": maxSlot,
@ -537,7 +541,7 @@ func isSlotProcessed(db sql.Database, checkProcessStmt string, slot string) (boo
// Check to see if this slot is in the DB. Check ethcl.slots, ethcl.signed_beacon_block
// and ethcl.beacon_state. If the slot exists, return true
func IsSlotInDb(db sql.Database, slot string, blockRoot string, stateRoot string) (bool, error) {
func IsSlotInDb(ctx context.Context, db sql.Database, slot string, blockRoot string, stateRoot string) (bool, error) {
var (
isInBeaconState bool
isInSignedBeaconBlock bool
@ -545,18 +549,28 @@ func IsSlotInDb(db sql.Database, slot string, blockRoot string, stateRoot string
)
errG, _ := errgroup.WithContext(context.Background())
errG.Go(func() error {
select {
case <-ctx.Done():
return nil
default:
isInBeaconState, err = checkSlotAndRoot(db, CheckBeaconStateStmt, slot, stateRoot)
if err != nil {
loghelper.LogError(err).Error("Unable to check if the slot and stateroot exist in ethcl.beacon_state")
}
return err
}
})
errG.Go(func() error {
select {
case <-ctx.Done():
return nil
default:
isInSignedBeaconBlock, err = checkSlotAndRoot(db, CheckSignedBeaconBlockStmt, slot, blockRoot)
if err != nil {
loghelper.LogError(err).Error("Unable to check if the slot and block_root exist in ethcl.signed_beacon_block")
}
return err
}
})
if err := errG.Wait(); err != nil {
return false, err

View File

@ -82,7 +82,7 @@ func (hp HistoricProcessing) handleProcessingErrors(ctx context.Context, errMess
// "un"-checkout the rows held by this DB in the ethcl.historical_process table.
func (hp HistoricProcessing) releaseDbLocks(cancel context.CancelFunc) error {
go func() { cancel() }()
cancel()
log.Debug("Updating all the entries to ethcl.historical processing")
log.Debug("Db: ", hp.db)
log.Debug("hp.uniqueNodeIdentifier ", hp.uniqueNodeIdentifier)
@ -107,7 +107,7 @@ func processSlotRangeWorker(ctx context.Context, workCh <-chan int, errCh chan<-
return
case slot := <-workCh:
log.Debug("Handling slot: ", slot)
err, errProcess := handleHistoricSlot(db, serverAddress, slot, metrics, checkDb)
err, errProcess := handleHistoricSlot(ctx, db, serverAddress, slot, metrics, checkDb)
if err != nil {
errMs := batchHistoricError{
err: err,

View File

@ -122,7 +122,7 @@ func (kgp KnownGapsProcessing) handleProcessingErrors(ctx context.Context, errMe
// Updated checked_out column for the uniqueNodeIdentifier.
func (kgp KnownGapsProcessing) releaseDbLocks(cancel context.CancelFunc) error {
go func() { cancel() }()
cancel()
log.Debug("Updating all the entries to ethcl.known_gaps")
log.Debug("Db: ", kgp.db)
log.Debug("kgp.uniqueNodeIdentifier ", kgp.uniqueNodeIdentifier)

View File

@ -77,7 +77,11 @@ type ProcessSlot struct {
// This function will do all the work to process the slot and write it to the DB.
// It will return the error and error process. The error process is used for providing reach detail to the
// known_gaps table.
func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) (error, string) {
func processFullSlot(ctx context.Context, db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) (error, string) {
select {
case <-ctx.Done():
return nil, ""
default:
ps := &ProcessSlot{
Slot: slot,
BlockRoot: blockRoot,
@ -92,20 +96,30 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
// Get the BeaconState.
g.Go(func() error {
select {
case <-ctx.Done():
return nil
default:
err := ps.getBeaconState(serverAddress, vUnmarshalerCh)
if err != nil {
return err
}
return nil
}
})
// Get the SignedBeaconBlock.
g.Go(func() error {
select {
case <-ctx.Done():
return nil
default:
err := ps.getSignedBeaconBlock(serverAddress, vUnmarshalerCh)
if err != nil {
return err
}
return nil
}
})
if err := g.Wait(); err != nil {
@ -117,7 +131,7 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
return err, "CalculateBlockRoot"
}
if checkDb {
inDb, err := IsSlotInDb(ps.Db, strconv.Itoa(ps.Slot), finalBlockRoot, finalStateRoot)
inDb, err := IsSlotInDb(ctx, ps.Db, strconv.Itoa(ps.Slot), finalBlockRoot, finalStateRoot)
if err != nil {
return err, "checkDb"
}
@ -160,6 +174,7 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
return nil, ""
}
}
// Handle a slot that is at head. A wrapper function for calling `handleFullSlot`.
func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) {
@ -167,15 +182,15 @@ func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot
if previousSlot == 0 && previousBlockRoot == "" {
writeStartUpGaps(db, knownGapsTableIncrement, slot, metrics)
}
err, errReason := processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement, checkDb)
err, errReason := processFullSlot(context.Background(), db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement, checkDb)
if err != nil {
writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics)
}
}
// Handle a historic slot. A wrapper function for calling `handleFullSlot`.
func handleHistoricSlot(db sql.Database, serverAddress string, slot int, metrics *BeaconClientMetrics, checkDb bool) (error, string) {
return processFullSlot(db, serverAddress, slot, "", "", 0, "", "historic", metrics, 1, checkDb)
func handleHistoricSlot(ctx context.Context, db sql.Database, serverAddress string, slot int, metrics *BeaconClientMetrics, checkDb bool) (error, string) {
return processFullSlot(ctx, db, serverAddress, slot, "", "", 0, "", "historic", metrics, 1, checkDb)
}
// Update the SszSignedBeaconBlock and FullSignedBeaconBlock object with their respective values.