diff --git a/.github/workflows/generic-testing.yml b/.github/workflows/generic-testing.yml index d5f906b..3a9b32d 100644 --- a/.github/workflows/generic-testing.yml +++ b/.github/workflows/generic-testing.yml @@ -3,22 +3,22 @@ on: workflow_call: inputs: stack-orchestrator-ref: - required: false + required: true type: string ipld-eth-beacon-db-ref: - required: false + required: true type: string ssz-data-ref: - required: false + required: true type: string secrets: GHA_KEY: required: true env: - stack-orchestrator-ref: ${{ inputs.stack-orchestrator-ref || '7fb664270a0ba09e2caa3095e8c91f3fdb5b38af' }} - ipld-eth-beacon-db-ref: ${{ inputs.ipld-eth-beacon-db-ref || '3dfe416302d553f8240f6051c08a7899b0e39e12' }} - ssz-data-ref: ${{ inputs.ssz-data-ref || 'main' }} + stack-orchestrator-ref: ${{ inputs.stack-orchestrator-ref }} + ipld-eth-beacon-db-ref: ${{ inputs.ipld-eth-beacon-db-ref }} + ssz-data-ref: ${{ inputs.ssz-data-ref }} GOPATH: /tmp/go jobs: build: @@ -66,7 +66,7 @@ jobs: run: | until $(docker compose -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-eth-beacon-indexer.yml" --env-file ./config.sh cp ipld-eth-beacon-indexer:/root/HEALTH ./HEALTH) ; do sleep 10; done cat ./HEALTH - if [[ "$(cat ./HEALTH)" -eq "0" ]]; then echo "Application boot successful" && (exit 0); else docker compose -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-eth-beacon-indexer.yml" cp ipld-eth-beacon-indexer:/root/ipld-eth-beacon-indexer.log . && cat ipld-eth-beacon-indexer.log && (exit 1); fi + if [[ "$(cat ./HEALTH)" -eq "0" ]]; then echo "Application boot successful" && (exit 0); else docker compose -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-eth-beacon-indexer.yml" --env-file ./config.sh cp ipld-eth-beacon-indexer:/root/ipld-eth-beacon-indexer.log . && cat ipld-eth-beacon-indexer.log && (exit 1); fi unit-test: name: Run Unit Tests diff --git a/.github/workflows/on-publish.yml b/.github/workflows/on-pr-publish.yml similarity index 58% rename from .github/workflows/on-publish.yml rename to .github/workflows/on-pr-publish.yml index 7ed6b38..41b08fe 100644 --- a/.github/workflows/on-publish.yml +++ b/.github/workflows/on-pr-publish.yml @@ -2,9 +2,50 @@ name: Publish Docker image on: release: types: [published, edited] + workflow_dispatch: + inputs: + stack-orchestrator-ref: + description: "The branch, commit or sha from stack-orchestrator to checkout" + required: false + default: "7fb664270a0ba09e2caa3095e8c91f3fdb5b38af" + ipld-eth-beacon-db-ref: + description: "The branch, commit or sha from ipld-eth-beacon-db to checkout" + required: false + default: "3dfe416302d553f8240f6051c08a7899b0e39e12" + ssz-data-ref: + description: "The branch, commit or sha from ssz-data to checkout" + required: false + default: "main" + pull_request: + paths: + - "!**.md" + - "!.gitignore" + - "!LICENSE" + - "!.github/workflows/**" + - ".github/workflows/on-pr.yml" + - ".github/workflows/tests.yml" + - "**" + schedule: + - cron: '0 13 * * *' # Must be single quotes!! 
jobs: + pre_job: + # continue-on-error: true # Uncomment once integration is finished + runs-on: ubuntu-latest + # Map a step output to a job output + outputs: + should_skip: ${{ steps.skip_check.outputs.should_skip }} + steps: + - id: skip_check + uses: fkirc/skip-duplicate-actions@v4 + with: + # All of these options are optional, so you can remove them if you are happy with the defaults + concurrent_skipping: "never" + skip_after_successful_duplicate: "true" + do_not_skip: '["workflow_dispatch", "schedule"]' trigger-tests: uses: ./.github/workflows/generic-testing.yml + if: ${{ needs.pre_job.outputs.should_skip != 'true' }} + needs: pre_job with: stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref }} ipld-eth-beacon-db-ref: ${{ github.event.inputs.ipld-eth-beacon-db-ref }} @@ -12,6 +53,8 @@ jobs: secrets: GHA_KEY: ${{secrets.GHA_KEY}} system-testing: + if: ${{ needs.pre_job.outputs.should_skip != 'true' }} + needs: pre_job uses: ./.github/workflows/system-tests.yml with: stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref }} @@ -25,6 +68,11 @@ jobs: needs: - trigger-tests - system-testing + if: | + always() && + (needs.trigger-tests.result == 'success' || needs.trigger-tests.result == 'skipped') && + (needs.system-testing.result == 'success' || needs.system-testing.result == 'skipped') && + github.event_name == 'release' steps: - uses: actions/checkout@v2 - name: Get the version @@ -42,6 +90,10 @@ jobs: name: Push Docker image to Docker Hub runs-on: ubuntu-latest needs: build + if: | + always() && + (needs.build.result == 'success') && + github.event_name == 'release' steps: - name: Get the version id: vars diff --git a/.github/workflows/on-pr.yml b/.github/workflows/on-pr.yml deleted file mode 100644 index f3254a0..0000000 --- a/.github/workflows/on-pr.yml +++ /dev/null @@ -1,47 +0,0 @@ -name: Test Application On PR - -on: - workflow_dispatch: - inputs: - stack-orchestrator-ref: - description: "The branch, commit or sha from stack-orchestrator to checkout" - required: false - default: "main" - ipld-eth-beacon-db-ref: - description: "The branch, commit or sha from ipld-eth-beacon-db to checkout" - required: false - default: "main" - ssz-data-ref: - description: "The branch, commit or sha from ssz-data to checkout" - required: false - default: "main" - pull_request: - paths: - - "!**.md" - - "!.gitignore" - - "!LICENSE" - - "!.github/workflows/**" - - ".github/workflows/on-pr.yml" - - ".github/workflows/tests.yml" - - "**" - schedule: - - cron: '0 13 * * *' # Must be single quotes!! - -jobs: - trigger-tests: - if: github.event_name != 'schedule' - uses: ./.github/workflows/generic-testing.yml - with: - stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref }} - ipld-eth-beacon-db-ref: ${{ github.event.inputs.ipld-eth-beacon-db-ref }} - ssz-data-ref: ${{ github.event.inputs.ssz-data-ref }} - secrets: - GHA_KEY: ${{secrets.GHA_KEY}} - system-testing: - uses: ./.github/workflows/system-tests.yml - with: - stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref }} - ipld-eth-beacon-db-ref: ${{ github.event.inputs.ipld-eth-beacon-db-ref }} - secrets: - GHA_KEY: ${{secrets.GHA_KEY}} - BC_ADDRESS: ${{secrets.BC_ADDRESS}} diff --git a/Makefile b/Makefile index 7f9f925..ca856ca 100644 --- a/Makefile +++ b/Makefile @@ -25,6 +25,7 @@ integration-test-ci: go fmt ./... 
$(GINKGO) -r --label-filter integration \ --procs=4 --compilers=4 \ + --flake-attempts=3 \ --randomize-all --randomize-suites \ --fail-on-pending --keep-going \ --cover --coverprofile=cover.profile \ @@ -76,7 +77,7 @@ unit-test-ci: go vet ./... go fmt ./... $(GINKGO) -r --label-filter unit \ - --randomize-all --randomize-suites + --randomize-all --randomize-suites \ --flake-attempts=3 \ --fail-on-pending --keep-going \ --cover --coverprofile=cover.profile \ @@ -88,6 +89,7 @@ system-test-ci: go fmt ./... $(GINKGO) -r --label-filter system \ --randomize-all --randomize-suites \ + --flake-attempts=3 \ --fail-on-pending --keep-going \ --cover --coverprofile=cover.profile \ --trace --json-report=report.json diff --git a/cmd/boot.go b/cmd/boot.go index 63e15b9..c40f8b2 100644 --- a/cmd/boot.go +++ b/cmd/boot.go @@ -18,7 +18,10 @@ package cmd import ( "context" + "fmt" + "net/http" "os" + "strconv" "syscall" log "github.com/sirupsen/logrus" @@ -60,6 +63,12 @@ func bootApp() { notifierCh <- syscall.SIGTERM }() + if viper.GetBool("t.pprof") { + go func() { + log.Println(http.ListenAndServe(fmt.Sprint("localhost:"+strconv.Itoa(viper.GetInt("t.pprofPort"))), nil)) + }() + } + err = shutdown.ShutdownBoot(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc) if err != nil { loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!") diff --git a/cmd/capture.go b/cmd/capture.go index f5693a6..6d48027 100644 --- a/cmd/capture.go +++ b/cmd/capture.go @@ -39,6 +39,7 @@ var ( bcConnectionProtocol string bcType string bcMaxHistoricProcessWorker int + bcMaxHeadProcessWorker int bcUniqueNodeIdentifier int bcCheckDb bool kgMaxWorker int @@ -50,6 +51,8 @@ var ( maxWaitSecondsShutdown time.Duration = time.Duration(20) * time.Second notifierCh chan os.Signal = make(chan os.Signal, 1) testDisregardSync bool + isTestPprof bool + testPprofPort int ) // captureCmd represents the capture command @@ -94,7 +97,8 @@ func init() { captureCmd.PersistentFlags().StringVarP(&bcConnectionProtocol, "bc.connectionProtocol", "", "http", "protocol for connecting to the beacon node.") captureCmd.PersistentFlags().IntVarP(&bcBootRetryInterval, "bc.bootRetryInterval", "", 30, "The amount of time to wait between retries while booting the application") captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The amount of time to wait between retries while booting the application") - captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 30, "The number of workers that should be actively processing slots from the eth-beacon.historic_process table. Be careful of system memory.") + captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 3, "The number of workers that should be actively processing slots from the eth-beacon.historic_process table. Be careful of system memory.") + captureCmd.PersistentFlags().IntVarP(&bcMaxHeadProcessWorker, "bc.maxHeadProcessWorker", "", 3, "The number of workers that should be actively processing slots head slots. Be careful of system memory.") captureCmd.PersistentFlags().IntVarP(&bcUniqueNodeIdentifier, "bc.uniqueNodeIdentifier", "", 0, "The unique identifier of this application. 
Each application connecting to the DB should have a unique identifier.") captureCmd.PersistentFlags().BoolVarP(&bcCheckDb, "bc.checkDb", "", true, "Should we check to see if the slot exists in the DB before writing it?") // err = captureCmd.MarkPersistentFlagRequired("bc.address") @@ -105,7 +109,7 @@ func init() { //// Known Gaps specific captureCmd.PersistentFlags().BoolVarP(&kgProcessGaps, "kg.processKnownGaps", "", true, "Should we process the slots within the eth-beacon.known_gaps table.") captureCmd.PersistentFlags().IntVarP(&kgTableIncrement, "kg.increment", "", 10000, "The max slots within a single entry to the known_gaps table.") - captureCmd.PersistentFlags().IntVarP(&kgMaxWorker, "kg.maxKnownGapsWorker", "", 30, "The number of workers that should be actively processing slots from the eth-beacon.known_gaps table. Be careful of system memory.") + captureCmd.PersistentFlags().IntVarP(&kgMaxWorker, "kg.maxKnownGapsWorker", "", 3, "The number of workers that should be actively processing slots from the eth-beacon.known_gaps table. Be careful of system memory.") // Prometheus Specific captureCmd.PersistentFlags().BoolVarP(&pmMetrics, "pm.metrics", "", true, "Should we capture prometheus metrics.") @@ -114,6 +118,8 @@ func init() { //// Testing Specific captureCmd.PersistentFlags().BoolVar(&testDisregardSync, "t.skipSync", false, "Should we disregard the head sync?") + captureCmd.PersistentFlags().BoolVar(&isTestPprof, "t.pprof", false, "Should we start pprof?") + captureCmd.PersistentFlags().IntVar(&testPprofPort, "t.pprofPort", 6060, "What port should we export pprof at?") // Bind Flags with Viper //// DB Flags @@ -133,6 +139,10 @@ func init() { //// Testing Specific err = viper.BindPFlag("t.skipSync", captureCmd.PersistentFlags().Lookup("t.skipSync")) exitErr(err) + err = viper.BindPFlag("t.pprof", captureCmd.PersistentFlags().Lookup("t.pprof")) + exitErr(err) + err = viper.BindPFlag("t.pprofPort", captureCmd.PersistentFlags().Lookup("t.pprofPort")) + exitErr(err) //// LH specific err = viper.BindPFlag("bc.address", captureCmd.PersistentFlags().Lookup("bc.address")) @@ -149,6 +159,8 @@ func init() { exitErr(err) err = viper.BindPFlag("bc.maxHistoricProcessWorker", captureCmd.PersistentFlags().Lookup("bc.maxHistoricProcessWorker")) exitErr(err) + err = viper.BindPFlag("bc.maxHeadProcessWorker", captureCmd.PersistentFlags().Lookup("bc.maxHeadProcessWorker")) + exitErr(err) err = viper.BindPFlag("bc.uniqueNodeIdentifier", captureCmd.PersistentFlags().Lookup("bc.uniqueNodeIdentifier")) exitErr(err) err = viper.BindPFlag("bc.checkDb", captureCmd.PersistentFlags().Lookup("bc.checkDb")) diff --git a/cmd/full.go b/cmd/full.go index 0c4b9d2..1ba83a9 100644 --- a/cmd/full.go +++ b/cmd/full.go @@ -19,6 +19,7 @@ package cmd import ( "context" "fmt" + "net/http" "strconv" log "github.com/sirupsen/logrus" @@ -58,7 +59,7 @@ func init() { func startFullProcessing() { // Boot the application log.Info("Starting the application in head tracking mode.") - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), @@ -74,13 +75,11 @@ func startFullProcessing() 
{ log.Info("The Beacon Client has booted successfully!") // Capture head blocks - go Bc.CaptureHead() - - hpContext, hpCancel := context.WithCancel(context.Background()) + go Bc.CaptureHead(ctx, viper.GetInt("bc.maxHeadProcessWorker"), false) errG, _ := errgroup.WithContext(context.Background()) errG.Go(func() error { - errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker")) + errs := Bc.CaptureHistoric(ctx, viper.GetInt("bc.maxHistoricProcessWorker")) if len(errs) != 0 { if len(errs) != 0 { log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events") @@ -89,12 +88,11 @@ func startFullProcessing() { } return nil }) - kgCtx, KgCancel := context.WithCancel(context.Background()) if viper.GetBool("kg.processKnownGaps") { go func() { errG := new(errgroup.Group) errG.Go(func() error { - errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker")) + errs := Bc.ProcessKnownGaps(ctx, viper.GetInt("kg.maxKnownGapsWorker")) if len(errs) != 0 { log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps") return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps") @@ -107,8 +105,14 @@ func startFullProcessing() { }() } + if viper.GetBool("t.pprof") { + go func() { + log.Println(http.ListenAndServe(fmt.Sprint("localhost:"+strconv.Itoa(viper.GetInt("t.pprofPort"))), nil)) + }() + } + // Shutdown when the time is right. - err = shutdown.ShutdownFull(ctx, KgCancel, hpCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc) + err = shutdown.ShutdownFull(ctx, cancel, notifierCh, maxWaitSecondsShutdown, Db, Bc) if err != nil { loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!") } else { diff --git a/cmd/head.go b/cmd/head.go index ba70f8c..1aea4ea 100644 --- a/cmd/head.go +++ b/cmd/head.go @@ -46,7 +46,7 @@ var headCmd = &cobra.Command{ func startHeadTracking() { // Boot the application log.Info("Starting the application in head tracking mode.") - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), @@ -62,13 +62,13 @@ func startHeadTracking() { log.Info("The Beacon Client has booted successfully!") // Capture head blocks - go Bc.CaptureHead() - kgCtx, KgCancel := context.WithCancel(context.Background()) + go Bc.CaptureHead(ctx, viper.GetInt("bc.maxHeadProcessWorker"), false) + if viper.GetBool("kg.processKnownGaps") { go func() { errG := new(errgroup.Group) errG.Go(func() error { - errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker")) + errs := Bc.ProcessKnownGaps(ctx, viper.GetInt("kg.maxKnownGapsWorker")) if len(errs) != 0 { log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps") return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps") @@ -81,14 +81,19 @@ func startHeadTracking() { }() } + if viper.GetBool("t.pprof") { + go func() { + log.Println(http.ListenAndServe(fmt.Sprint("localhost:"+strconv.Itoa(viper.GetInt("t.pprofPort"))), nil)) + }() + } + // Shutdown when the time is right. 
- err = shutdown.ShutdownHeadTracking(ctx, KgCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc) + err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, Db, Bc) if err != nil { loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!") } else { log.Info("Gracefully shutdown ipld-eth-beacon-indexer") } - } func init() { diff --git a/cmd/historic.go b/cmd/historic.go index 1c6b653..78797d8 100644 --- a/cmd/historic.go +++ b/cmd/historic.go @@ -22,6 +22,9 @@ import ( "os" "strconv" + "net/http" + _ "net/http/pprof" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -46,7 +49,7 @@ var historicCmd = &cobra.Command{ func startHistoricProcessing() { // Boot the application log.Info("Starting the application in head tracking mode.") - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), @@ -60,11 +63,9 @@ func startHistoricProcessing() { serveProm(addr) } - hpContext, hpCancel := context.WithCancel(context.Background()) - errG, _ := errgroup.WithContext(context.Background()) errG.Go(func() error { - errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker")) + errs := Bc.CaptureHistoric(ctx, viper.GetInt("bc.maxHistoricProcessWorker")) if len(errs) != 0 { if len(errs) != 0 { log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events") @@ -74,12 +75,11 @@ func startHistoricProcessing() { return nil }) - kgContext, kgCancel := context.WithCancel(context.Background()) if viper.GetBool("kg.processKnownGaps") { go func() { errG := new(errgroup.Group) errG.Go(func() error { - errs := Bc.ProcessKnownGaps(kgContext, viper.GetInt("kg.maxKnownGapsWorker")) + errs := Bc.ProcessKnownGaps(ctx, viper.GetInt("kg.maxKnownGapsWorker")) if len(errs) != 0 { log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps") return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps") @@ -92,8 +92,14 @@ func startHistoricProcessing() { }() } + if viper.GetBool("t.pprof") { + go func() { + log.Println(http.ListenAndServe(fmt.Sprint("localhost:"+strconv.Itoa(viper.GetInt("t.pprofPort"))), nil)) + }() + } + // Shutdown when the time is right. 
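The `t.pprof` / `t.pprofPort` toggle added above in `boot.go`, `full.go`, `head.go`, and `historic.go` follows the same pattern each time. A minimal, self-contained sketch of that pattern (the `startPprof` helper and `main` below are illustrative, not part of the diff; the real commands read the values through viper):

```go
// Minimal sketch of the pprof toggle used in the diff, assuming the same
// "t.pprof" / "t.pprofPort" settings; startPprof and main are illustrative.
package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers /debug/pprof/* handlers on http.DefaultServeMux
	"strconv"
)

// startPprof conditionally serves the pprof endpoints on localhost:<port>.
func startPprof(enabled bool, port int) {
	if !enabled {
		return
	}
	go func() {
		addr := "localhost:" + strconv.Itoa(port)
		// A nil handler means http.DefaultServeMux, where net/http/pprof
		// registered its handlers at init time.
		if err := http.ListenAndServe(addr, nil); err != nil {
			log.Println("pprof server stopped:", err)
		}
	}()
}

func main() {
	startPprof(true, 6060) // the indexer reads these from viper ("t.pprof", "t.pprofPort")
	select {}              // block; profiles are served at http://localhost:6060/debug/pprof/
}
```

Because the blank `_ "net/http/pprof"` import registers its handlers at init time, importing it once (the diff adds it in `cmd/historic.go`) is enough for every command built into the binary.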
- err = shutdown.ShutdownHistoricProcessing(ctx, kgCancel, hpCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc) + err = shutdown.ShutdownHistoricProcessing(ctx, cancel, notifierCh, maxWaitSecondsShutdown, Db, Bc) if err != nil { loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!") } else { diff --git a/config/cicd/boot.ipld-eth-beacon-indexer.json b/config/cicd/boot.ipld-eth-beacon-indexer.json index b10cc13..a042f7b 100644 --- a/config/cicd/boot.ipld-eth-beacon-indexer.json +++ b/config/cicd/boot.ipld-eth-beacon-indexer.json @@ -19,7 +19,9 @@ "checkDb": true }, "t": { - "skipSync": true + "skipSync": true, + "pprof": true, + "pprofPort": 6060 }, "log": { "level": "debug", diff --git a/config/example.ipld-eth-beacon-indexer-config.json b/config/example.ipld-eth-beacon-indexer-config.json index 7481284..b41b32c 100644 --- a/config/example.ipld-eth-beacon-indexer-config.json +++ b/config/example.ipld-eth-beacon-indexer-config.json @@ -19,7 +19,9 @@ "checkDb": true }, "t": { - "skipSync": true + "skipSync": true, + "pprof": true, + "pprofPort": 6060 }, "log": { "level": "debug", diff --git a/go.mod b/go.mod index f3082d4..efb8ad3 100644 --- a/go.mod +++ b/go.mod @@ -70,6 +70,7 @@ require ( github.com/urfave/cli/v2 v2.3.0 // indirect go.opencensus.io v0.23.0 // indirect go.uber.org/atomic v1.9.0 // indirect + go.uber.org/goleak v1.1.12 // indirect go.uber.org/multierr v1.8.0 // indirect go.uber.org/zap v1.21.0 // indirect golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f // indirect diff --git a/go.sum b/go.sum index 249cbfc..0e5dee9 100644 --- a/go.sum +++ b/go.sum @@ -833,8 +833,9 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= -go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= +go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= +go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= diff --git a/internal/shutdown/shutdown.go b/internal/shutdown/shutdown.go index 13181b4..47f62d1 100644 --- a/internal/shutdown/shutdown.go +++ b/internal/shutdown/shutdown.go @@ -40,68 +40,66 @@ func ShutdownServices(ctx context.Context, notifierCh chan os.Signal, waitTime t } // Wrapper function for shutting down the head tracking process. -func ShutdownHeadTracking(ctx context.Context, kgCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { +func ShutdownHeadTracking(ctx context.Context, cancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{ // Combining DB shutdown with BC because BC needs DB open to cleanly shutdown. 
"beaconClient": func(ctx context.Context) error { defer DB.Close() - err := BC.StopHeadTracking() - if err != nil { - loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking") - } + cancel() + BC.StopHeadTracking(ctx, false) if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) { - err = BC.StopKnownGapsProcessing(kgCancel) + err := BC.StopKnownGapsProcessing(ctx) if err != nil { loghelper.LogError(err).Error("Unable to stop processing known gaps") + return err } } - return err + return nil }, }) } // Wrapper function for shutting down the head tracking process. -func ShutdownHistoricProcessing(ctx context.Context, kgCancel, hpCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { +func ShutdownHistoricProcessing(ctx context.Context, cancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{ // Combining DB shutdown with BC because BC needs DB open to cleanly shutdown. "beaconClient": func(ctx context.Context) error { defer DB.Close() - err := BC.StopHistoric(hpCancel) + cancel() + err := BC.StopHistoricProcess(ctx) if err != nil { loghelper.LogError(err).Error("Unable to stop processing historic") } if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) { - err = BC.StopKnownGapsProcessing(kgCancel) + err = BC.StopKnownGapsProcessing(ctx) if err != nil { loghelper.LogError(err).Error("Unable to stop processing known gaps") + return err } } - return err + return nil }, }) } // Shutdown the head and historical processing -func ShutdownFull(ctx context.Context, kgCancel, hpCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { +func ShutdownFull(ctx context.Context, cancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{ // Combining DB shutdown with BC because BC needs DB open to cleanly shutdown. 
"beaconClient": func(ctx context.Context) error { defer DB.Close() - err := BC.StopHistoric(hpCancel) + cancel() + err := BC.StopHistoricProcess(ctx) if err != nil { loghelper.LogError(err).Error("Unable to stop processing historic") } if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) { - err = BC.StopKnownGapsProcessing(kgCancel) + err = BC.StopKnownGapsProcessing(ctx) if err != nil { loghelper.LogError(err).Error("Unable to stop processing known gaps") } } - err = BC.StopHeadTracking() - if err != nil { - loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking") - } - + BC.StopHeadTracking(ctx, false) return err }, }) diff --git a/internal/shutdown/shutdown_test.go b/internal/shutdown/shutdown_test.go index 97d83af..eb56d10 100644 --- a/internal/shutdown/shutdown_test.go +++ b/internal/shutdown/shutdown_test.go @@ -56,23 +56,23 @@ var ( BC *beaconclient.BeaconClient err error ctx context.Context + cancel context.CancelFunc notifierCh chan os.Signal ) var _ = Describe("Shutdown", func() { BeforeEach(func() { - ctx = context.Background() + ctx, cancel = context.WithCancel(context.Background()) BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier, bcCheckDb) notifierCh = make(chan os.Signal, 1) Expect(err).To(BeNil()) }) - Describe("Run Shutdown Function for head tracking,", Label("integration"), func() { + Describe("Run Shutdown Function for head tracking,", Label("integration", "shutdown"), func() { Context("When Channels are empty,", func() { It("Should Shutdown Successfully.", func() { go func() { - _, cancel := context.WithCancel(context.Background()) log.Debug("Starting shutdown chan") err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, DB, BC) log.Debug("We have completed the shutdown...") @@ -85,7 +85,6 @@ var _ = Describe("Shutdown", func() { shutdownCh := make(chan bool) //log.SetLevel(log.DebugLevel) go func() { - _, cancel := context.WithCancel(context.Background()) log.Debug("Starting shutdown chan") err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, DB, BC) log.Debug("We have completed the shutdown...") @@ -120,7 +119,6 @@ var _ = Describe("Shutdown", func() { //log.SetLevel(log.DebugLevel) go func() { log.Debug("Starting shutdown chan") - _, cancel := context.WithCancel(context.Background()) err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, DB, BC) log.Debug("We have completed the shutdown...") Expect(err).To(MatchError(gracefulshutdown.TimeoutErr(maxWaitSecondsShutdown.String()))) diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go index 4f959e3..066c455 100644 --- a/pkg/beaconclient/beaconclient.go +++ b/pkg/beaconclient/beaconclient.go @@ -76,8 +76,8 @@ type BeaconClient struct { type SseEvents[P ProcessedEvents] struct { Endpoint string // The endpoint for the subscription. Primarily used for logging MessagesCh chan *sse.Event // Contains all the messages from the SSE Channel - ErrorCh chan *SseError // Contains any errors while SSE streaming occurred - ProcessCh chan *P // Used to capture processed data in its proper struct. + ErrorCh chan SseError // Contains any errors while SSE streaming occurred + ProcessCh chan P // Used to capture processed data in its proper struct. 
SseClient *sse.Client // sse.Client object that is used to interact with the SSE stream } @@ -119,9 +119,9 @@ func createSseEvent[P ProcessedEvents](baseEndpoint string, path string) *SseEve endpoint := baseEndpoint + path sseEvents := &SseEvents[P]{ Endpoint: endpoint, - MessagesCh: make(chan *sse.Event, 1), - ErrorCh: make(chan *SseError), - ProcessCh: make(chan *P), + MessagesCh: make(chan *sse.Event), + ErrorCh: make(chan SseError), + ProcessCh: make(chan P, 10), SseClient: func(endpoint string) *sse.Client { log.WithFields(log.Fields{"endpoint": endpoint}).Info("Creating SSE client") return sse.NewClient(endpoint) diff --git a/pkg/beaconclient/capturehead.go b/pkg/beaconclient/capturehead.go index a0b6e6b..5ab00e2 100644 --- a/pkg/beaconclient/capturehead.go +++ b/pkg/beaconclient/capturehead.go @@ -18,42 +18,32 @@ package beaconclient import ( - "time" + "context" log "github.com/sirupsen/logrus" - "github.com/vulcanize/ipld-eth-beacon-indexer/pkg/loghelper" ) // This function will perform all the heavy lifting for tracking the head of the chain. -func (bc *BeaconClient) CaptureHead() { +func (bc *BeaconClient) CaptureHead(ctx context.Context, maxHeadWorkers int, skipSee bool) { log.Info("We are tracking the head of the chain.") - go bc.handleHead() - go bc.handleReorg() - bc.captureEventTopic() + go bc.handleHead(ctx, maxHeadWorkers) + go bc.handleReorg(ctx) + bc.captureEventTopic(ctx, skipSee) } // Stop the head tracking service. -func (bc *BeaconClient) StopHeadTracking() error { - log.Info("We are going to stop tracking the head of chain because of the shutdown signal.") - chHead := make(chan bool) - chReorg := make(chan bool) - - go bc.HeadTracking.finishProcessingChannel(chHead) - go bc.ReOrgTracking.finishProcessingChannel(chReorg) - - <-chHead - <-chReorg - log.Info("Successfully stopped the head tracking service.") - return nil -} - -// This function closes the SSE subscription, but waits until the MessagesCh is empty -func (se *SseEvents[ProcessedEvents]) finishProcessingChannel(finish chan<- bool) { - loghelper.LogEndpoint(se.Endpoint).Info("Received a close event.") - se.SseClient.Unsubscribe(se.MessagesCh) - for len(se.MessagesCh) != 0 || len(se.ProcessCh) != 0 { - time.Sleep(time.Duration(shutdownWaitInterval) * time.Millisecond) +func (bc *BeaconClient) StopHeadTracking(ctx context.Context, skipSee bool) { + select { + case <-ctx.Done(): + if !skipSee { + bc.HeadTracking.SseClient.Unsubscribe(bc.HeadTracking.MessagesCh) + bc.ReOrgTracking.SseClient.Unsubscribe(bc.ReOrgTracking.MessagesCh) + log.Info("Successfully unsubscribed to SSE client") + close(bc.ReOrgTracking.MessagesCh) + close(bc.HeadTracking.MessagesCh) + } + log.Info("Successfully stopped the head tracking service.") + default: + log.Error("The context has not completed....") } - loghelper.LogEndpoint(se.Endpoint).Info("Done processing all messages, ready for shutdown") - finish <- true } diff --git a/pkg/beaconclient/capturehead_test.go b/pkg/beaconclient/capturehead_test.go index 1c569df..dd5b1e0 100644 --- a/pkg/beaconclient/capturehead_test.go +++ b/pkg/beaconclient/capturehead_test.go @@ -23,6 +23,7 @@ import ( "net/http" "os" "path/filepath" + "runtime" "strconv" "sync/atomic" "time" @@ -265,70 +266,121 @@ var _ = Describe("Capturehead", Label("head"), func() { Describe("Receiving New Head SSE messages", Label("unit", "behavioral"), func() { Context("Correctly formatted Phase0 Block", func() { - It("Should turn it into a struct successfully.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, 
"99") + It("Should process it successfully.", func() { BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0) + bc := setUpTest(BeaconNodeTester.TestConfig, "99") + + BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{ + head: BeaconNodeTester.TestEvents["100"].HeadMessage, + expectedEpoch: 3, + expectStatus: "proposed", + }) validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectParentRoot, BeaconNodeTester.TestEvents["100"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["100"].CorrectSignedBeaconBlockMhKey) validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectBeaconStateMhKey) }) }) - Context("Correctly formatted Altair Block", func() { - It("Should turn it into a struct successfully.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") + Context("Correctly formatted Altair Block", Label("leak-head"), func() { + It("Should process it successfully.", func() { + log.SetLevel(log.DebugLevel) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) + + bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") + BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{ + head: BeaconNodeTester.TestEvents["2375703"].HeadMessage, + expectedEpoch: 74240, + expectStatus: "proposed", + }) validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectParentRoot, BeaconNodeTester.TestEvents["2375703"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["2375703"].CorrectSignedBeaconBlockMhKey) validateBeaconState(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectBeaconStateMhKey) }) }) - Context("Correctly formatted Altair Test Blocks", func() { + Context("Correctly formatted Altair Test Blocks", Label("correct-test-altairs"), func() { It("Should turn it into a struct successfully.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry, 1, 0, 0) + + bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") + BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{ + head: BeaconNodeTester.TestEvents["2375703-dummy"].HeadMessage, + expectedEpoch: 74240, + expectStatus: "proposed", + }) bc = setUpTest(BeaconNodeTester.TestConfig, "2375702") - BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, 
BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) - defer httpmock.DeactivateAndReset() - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703-dummy-2"].HeadMessage, 74240, maxRetry, 1, 0, 0) + BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{ + head: BeaconNodeTester.TestEvents["2375703-dummy-2"].HeadMessage, + expectedEpoch: 74240, + expectStatus: "proposed", + }) }) }) - Context("Correctly formatted Phase0 Test Blocks", func() { + Context("Correctly formatted Phase0 Test Blocks", Label("correct-test-phase0"), func() { It("Should turn it into a struct successfully.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 1, 0, 0) + + bc := setUpTest(BeaconNodeTester.TestConfig, "99") + BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{ + head: BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, + expectedEpoch: 3, + expectStatus: "proposed", + }) bc = setUpTest(BeaconNodeTester.TestConfig, "99") - BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) - defer httpmock.DeactivateAndReset() - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy-2"].HeadMessage, 3, maxRetry, 1, 0, 0) + BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{ + head: BeaconNodeTester.TestEvents["100-dummy-2"].HeadMessage, + expectedEpoch: 3, + expectStatus: "proposed", + }) + + //bc = setUpTest(BeaconNodeTester.TestConfig, "99") + //BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) + //defer httpmock.DeactivateAndReset() + //BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy-2"].HeadMessage, 3, maxRetry, 1, 0, 0) + }) }) - Context("Two consecutive correct blocks", func() { + Context("Two consecutive correct blocks", Label("bug"), func() { It("Should handle both blocks correctly, without any reorgs or known_gaps", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0) - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 0, 0) + + bc := setUpTest(BeaconNodeTester.TestConfig, "99") + BeaconNodeTester.testProcessBlock(bc, maxRetry, 2, 0, 0, headBlocksSent{ + head: BeaconNodeTester.TestEvents["100"].HeadMessage, + expectedEpoch: 3, + expectStatus: "proposed", + }, headBlocksSent{ + head: BeaconNodeTester.TestEvents["101"].HeadMessage, + expectedEpoch: 3, + expectStatus: "proposed", + }) + }) }) - Context("Two consecutive blocks with a bad 
parent", func() { + Context("Two consecutive blocks with a bad parent", Label("bad-parent"), func() { It("Should add the previous block to the knownGaps table.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 1, 0, 0) - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 1, 1) + + bc := setUpTest(BeaconNodeTester.TestConfig, "99") + BeaconNodeTester.testProcessBlock(bc, maxRetry, 2, 1, 1, + headBlocksSent{ + head: BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, + expectedEpoch: 3, + expectStatus: "forked", + }, + headBlocksSent{ + head: BeaconNodeTester.TestEvents["101"].HeadMessage, + expectedEpoch: 3, + expectStatus: "proposed", + }) + }) }) Context("Phase 0: We have a correctly formated SSZ SignedBeaconBlock and BeaconState", func() { @@ -348,10 +400,15 @@ var _ = Describe("Capturehead", Label("head"), func() { //}) Context("When the proper SSZ objects are not served", func() { It("Should return an error, and add the slot to the knownGaps table.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "101") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["102-wrong-ssz-1"].HeadMessage, 3, maxRetry, 0, 1, 0) + + bc := setUpTest(BeaconNodeTester.TestConfig, "101") + BeaconNodeTester.testProcessBlock(bc, maxRetry, 0, 1, 0, headBlocksSent{ + head: BeaconNodeTester.TestEvents["102-wrong-ssz-1"].HeadMessage, + expectedEpoch: 3, + expectStatus: "proposed", + }) knownGapCount := countKnownGapsTable(bc.Db) Expect(knownGapCount).To(Equal(1)) @@ -359,6 +416,7 @@ var _ = Describe("Capturehead", Label("head"), func() { start, end := queryKnownGaps(bc.Db, "102", "102") Expect(start).To(Equal(102)) Expect(end).To(Equal(102)) + }) }) }) @@ -366,9 +424,10 @@ var _ = Describe("Capturehead", Label("head"), func() { Describe("Known Gaps Scenario", Label("unit", "behavioral"), func() { Context("There is a gap at start up within one incrementing range.", func() { It("Should add only a single entry to the knownGaps table.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "10") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() + + bc := setUpTest(BeaconNodeTester.TestConfig, "10") BeaconNodeTester.testKnownGapsMessages(bc, 100, 1, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage) start, end := queryKnownGaps(bc.Db, "11", "99") Expect(start).To(Equal(11)) @@ -377,9 +436,10 @@ var _ = Describe("Capturehead", Label("head"), func() { }) Context("There is a gap at start up spanning multiple incrementing range.", func() { It("Should add multiple entries to the knownGaps table.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "5") 
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() + + bc := setUpTest(BeaconNodeTester.TestConfig, "5") BeaconNodeTester.testKnownGapsMessages(bc, 10, 10, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage) start, end := queryKnownGaps(bc.Db, "6", "16") @@ -391,11 +451,12 @@ var _ = Describe("Capturehead", Label("head"), func() { Expect(end).To(Equal(99)) }) }) - Context("Gaps between two head messages", func() { + Context("Gaps between two head messages", Label("gap-head"), func() { It("Should add the slots in-between", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() + + bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.testKnownGapsMessages(bc, 1000000, 3, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].HeadMessage) start, end := queryKnownGaps(bc.Db, "101", "1000101") @@ -412,33 +473,37 @@ var _ = Describe("Capturehead", Label("head"), func() { Describe("ReOrg Scenario", Label("unit", "behavioral"), func() { Context("Altair: Multiple head messages for the same slot.", func() { It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() + + bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") BeaconNodeTester.testMultipleHead(bc, TestEvents["2375703"].HeadMessage, TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry) }) }) Context("Phase0: Multiple head messages for the same slot.", func() { It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() + + bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.testMultipleHead(bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry) }) }) Context("Phase 0: Multiple reorgs have occurred on this slot", func() { It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() + + bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.testMultipleReorgs(bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100-dummy-2"].HeadMessage, 
TestEvents["100"].HeadMessage, 3, maxRetry) }) }) Context("Altair: Multiple reorgs have occurred on this slot", func() { It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() + + bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") BeaconNodeTester.testMultipleReorgs(bc, TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703-dummy-2"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240, maxRetry) }) }) @@ -523,18 +588,25 @@ func validateBeaconState(bc *beaconclient.BeaconClient, headMessage beaconclient } // Wrapper function to send a head message to the beaconclient -func sendHeadMessage(bc *beaconclient.BeaconClient, head beaconclient.Head, maxRetry int, expectedSuccessfulInserts uint64) { - - data, err := json.Marshal(head) - Expect(err).ToNot(HaveOccurred()) +func sendHeadMessage(bc *beaconclient.BeaconClient, maxRetry int, expectedSuccessfulInserts uint64, head ...beaconclient.Head) { + var ( + data []byte + err error + ) startInserts := atomic.LoadUint64(&bc.Metrics.SlotInserts) - bc.HeadTracking.MessagesCh <- &sse.Event{ - ID: []byte{}, - Data: data, - Event: []byte{}, - Retry: []byte{}, + for _, ms := range head { + data, err = json.Marshal(ms) + Expect(err).ToNot(HaveOccurred()) + time.Sleep(1 * time.Second) + bc.HeadTracking.MessagesCh <- &sse.Event{ + ID: []byte{}, + Data: data, + Event: []byte{}, + Retry: []byte{}, + } } + curRetry := 0 for atomic.LoadUint64(&bc.Metrics.SlotInserts) != startInserts+expectedSuccessfulInserts { time.Sleep(1 * time.Second) @@ -832,13 +904,13 @@ func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string // Helper function to test three reorg messages. There are going to be many functions like this, // Because we need to test the same logic for multiple phases. 
func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch int, maxRetry int) { - go bc.CaptureHead() + startGoRoutines := runtime.NumGoroutine() + ctx, cancel := context.WithCancel(context.Background()) + go bc.CaptureHead(ctx, 2, true) time.Sleep(1 * time.Second) log.Info("Sending Messages to BeaconClient") - sendHeadMessage(bc, firstHead, maxRetry, 1) - sendHeadMessage(bc, secondHead, maxRetry, 1) - sendHeadMessage(bc, thirdHead, maxRetry, 1) + sendHeadMessage(bc, maxRetry, 3, firstHead, secondHead, thirdHead) curRetry := 0 for atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 2 { @@ -890,13 +962,22 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs validateSlot(bc, secondHead, epoch, "proposed") validateSlot(bc, thirdHead, epoch, "forked") + cancel() + testStopHeadTracking(ctx, bc, startGoRoutines, true) } // A test to validate a single block was processed correctly -func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head beaconclient.Head, epoch int, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) { - go bc.CaptureHead() +func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64, head ...headBlocksSent) { + //pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + startGoRoutines := runtime.NumGoroutine() + ctx, cancel := context.WithCancel(context.Background()) + go bc.CaptureHead(ctx, 2, true) time.Sleep(1 * time.Second) - sendHeadMessage(bc, head, maxRetry, expectedSuccessInsert) + heads := make([]beaconclient.Head, 0) + for _, msgs := range head { + heads = append(heads, msgs.head) + } + sendHeadMessage(bc, maxRetry, expectedSuccessInsert, heads...) curRetry := 0 for atomic.LoadUint64(&bc.Metrics.KnownGapsInserts) != expectedKnownGaps { @@ -912,23 +993,28 @@ func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head b time.Sleep(1 * time.Second) curRetry = curRetry + 1 if curRetry == maxRetry { - Fail(fmt.Sprintf("Wrong reorg metrics, got: %d, wanted %d", bc.Metrics.KnownGapsInserts, expectedKnownGaps)) + Fail(fmt.Sprintf("Wrong reorg metrics, got: %d, wanted %d", bc.Metrics.ReorgInserts, expectedReorgs)) } } if expectedSuccessInsert > 0 { - validateSlot(bc, head, epoch, "proposed") + for _, msg := range head { + validateSlot(bc, msg.head, msg.expectedEpoch, msg.expectStatus) + } } + cancel() + testStopHeadTracking(ctx, bc, startGoRoutines, true) } // A test that ensures that if two HeadMessages occur for a single slot they are marked // as proposed and forked correctly. 
func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, epoch int, maxRetry int) { - go bc.CaptureHead() + startGoRoutines := runtime.NumGoroutine() + ctx, cancel := context.WithCancel(context.Background()) + go bc.CaptureHead(ctx, 2, true) time.Sleep(1 * time.Second) - sendHeadMessage(bc, firstHead, maxRetry, 1) - sendHeadMessage(bc, secondHead, maxRetry, 1) + sendHeadMessage(bc, maxRetry, 2, firstHead, secondHead) curRetry := 0 for atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 1 { @@ -946,18 +1032,22 @@ func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstH log.Info("Checking Altair to make sure the fork was marked properly.") validateSlot(bc, firstHead, epoch, "forked") validateSlot(bc, secondHead, epoch, "proposed") + cancel() + testStopHeadTracking(ctx, bc, startGoRoutines, true) } // A test that ensures that if two HeadMessages occur for a single slot they are marked // as proposed and forked correctly. func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, tableIncrement int, expectedEntries uint64, maxRetry int, msg ...beaconclient.Head) { bc.KnownGapTableIncrement = tableIncrement - go bc.CaptureHead() + + //pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + startGoRoutines := runtime.NumGoroutine() + ctx, cancel := context.WithCancel(context.Background()) + go bc.CaptureHead(ctx, 2, true) time.Sleep(1 * time.Second) - for _, headMsg := range msg { - sendHeadMessage(bc, headMsg, maxRetry, 1) - } + sendHeadMessage(bc, maxRetry, 1, msg...) curRetry := 0 for atomic.LoadUint64(&bc.Metrics.KnownGapsInserts) != expectedEntries { @@ -975,6 +1065,8 @@ func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, t if atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 0 { Fail("We found reorgs when we didn't expect it") } + cancel() + testStopHeadTracking(ctx, bc, startGoRoutines, true) } // This function will make sure we are properly able to get the SszRoot of the SignedBeaconBlock and the BeaconState. @@ -991,3 +1083,22 @@ func testSszRoot(msg Message) { Expect(err).ToNot(HaveOccurred()) Expect(msg.HeadMessage.Block).To(Equal("0x" + hex.EncodeToString(blockRoot[:]))) } + +// A make shift function to stop head tracking and insure we dont have any goroutine leaks +func testStopHeadTracking(ctx context.Context, bc *beaconclient.BeaconClient, startGoRoutines int, skipSse bool) { + bc.StopHeadTracking(ctx, skipSse) + + time.Sleep(3 * time.Second) + endNum := runtime.NumGoroutine() + //pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + log.WithField("startNum", startGoRoutines).Info("Start Go routine number") + log.WithField("endNum", endNum).Info("End Go routine number") + //Expect(endNum <= startGoRoutines).To(BeTrue()) + Expect(endNum).To(Equal(startGoRoutines)) +} + +type headBlocksSent struct { + head beaconclient.Head + expectedEpoch int + expectStatus string +} diff --git a/pkg/beaconclient/capturehistoric.go b/pkg/beaconclient/capturehistoric.go index 2bf6dfc..9627965 100644 --- a/pkg/beaconclient/capturehistoric.go +++ b/pkg/beaconclient/capturehistoric.go @@ -37,14 +37,18 @@ func (bc *BeaconClient) CaptureHistoric(ctx context.Context, maxWorkers int) []e } // This function will perform all the necessary clean up tasks for stopping historical processing. 
-func (bc *BeaconClient) StopHistoric(cancel context.CancelFunc) error { - log.Info("We are stopping the historical processing service.") - cancel() - err := bc.HistoricalProcess.releaseDbLocks() - if err != nil { - loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.historic_processing table. Manual Intervention is needed!") +func (bc *BeaconClient) StopHistoricProcess(ctx context.Context) error { + select { + case <-ctx.Done(): + log.Info("We are stopping the historical processing service.") + err := bc.HistoricalProcess.releaseDbLocks() + if err != nil { + loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.historic_processing table. Manual Intervention is needed!") + } + return nil + default: + return fmt.Errorf("Tried to stop historic before the context ended...") } - return nil } // An interface to enforce any batch processing. Currently there are two use cases for this. @@ -93,9 +97,9 @@ type batchHistoricError struct { // 5. Handle any errors. func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics, checkDb bool, incrementTracker func(uint64)) []error { slotsCh := make(chan slotsToProcess) - workCh := make(chan int) - processedCh := make(chan slotsToProcess) - errCh := make(chan batchHistoricError) + workCh := make(chan int, 5) + processedCh := make(chan slotsToProcess, 5) + errCh := make(chan batchHistoricError, 5) finalErrCh := make(chan []error, 1) // Checkout Rows with same node Identifier. @@ -116,6 +120,8 @@ func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, for { select { case <-ctx.Done(): + close(workCh) + close(processedCh) return case slots := <-slotsCh: if slots.startSlot > slots.endSlot { @@ -164,8 +170,9 @@ func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, errs := bp.getSlotRange(ctx, slotsCh) // Periodically adds new entries.... 
if errs != nil { finalErrCh <- errs + } else { + finalErrCh <- nil } - finalErrCh <- nil log.Debug("We are stopping the processing of adding new entries") }() log.Debug("Waiting for shutdown signal from channel") diff --git a/pkg/beaconclient/capturehistoric_test.go b/pkg/beaconclient/capturehistoric_test.go index 5571a7e..c644129 100644 --- a/pkg/beaconclient/capturehistoric_test.go +++ b/pkg/beaconclient/capturehistoric_test.go @@ -3,6 +3,7 @@ package beaconclient_test import ( "context" "fmt" + "runtime" "sync/atomic" "time" @@ -24,17 +25,18 @@ var _ = Describe("Capturehistoric", func() { Describe("Run the application in historic mode", Label("unit", "behavioral", "historical"), func() { Context("Phase0 + Altairs: When we need to process a multiple blocks in a multiple entries in the eth_beacon.historic_process table.", Label("deb"), func() { It("Successfully Process the Blocks", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() + + bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 101, 10) BeaconNodeTester.runHistoricalProcess(bc, 2, 2, 0, 0, 0) // Run Two seperate processes BeaconNodeTester.writeEventToHistoricProcess(bc, 2375703, 2375703, 10) BeaconNodeTester.runHistoricalProcess(bc, 2, 3, 0, 0, 0) - time.Sleep(2 * time.Second) validatePopularBatchBlocks(bc) + }) }) Context("When the start block is greater than the endBlock", func() { @@ -70,11 +72,12 @@ var _ = Describe("Capturehistoric", func() { }) }) Describe("Running the Application to process Known Gaps", Label("unit", "behavioral", "knownGaps"), func() { - Context("Phase0 + Altairs: When we need to process a multiple blocks in a multiple entries in the eth_beacon.known_gaps table.", func() { + Context("Phase0 + Altairs: When we need to process a multiple blocks in a multiple entries in the eth_beacon.known_gaps table.", Label("leak"), func() { It("Successfully Process the Blocks", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() + + bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.writeEventToKnownGaps(bc, 100, 101) BeaconNodeTester.runKnownGapsProcess(bc, 2, 2, 0, 0, 0) // Run Two seperate processes @@ -83,6 +86,7 @@ var _ = Describe("Capturehistoric", func() { time.Sleep(2 * time.Second) validatePopularBatchBlocks(bc) + }) }) Context("When the start block is greater than the endBlock", func() { @@ -107,13 +111,18 @@ var _ = Describe("Capturehistoric", func() { }) }) Describe("Running the application in Historic, Head, and KnownGaps mode", Label("unit", "historical", "full"), func() { - Context("When it recieves a head, historic and known Gaps message (in order)", func() { + Context("When it recieves a head, historic and known Gaps message (in order)", Label("deb"), func() { It("Should process them all successfully.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, 
BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() + + bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") // Head - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) + BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{ + head: BeaconNodeTester.TestEvents["2375703"].HeadMessage, + expectedEpoch: 74240, + expectStatus: "proposed", + }) // Historical BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 100, 10) @@ -125,19 +134,25 @@ var _ = Describe("Capturehistoric", func() { time.Sleep(2 * time.Second) validatePopularBatchBlocks(bc) + }) }) Context("When it recieves a historic, head and known Gaps message (in order)", func() { It("Should process them all successfully.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() + + bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") // Historical BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 100, 10) BeaconNodeTester.runHistoricalProcess(bc, 2, 1, 0, 0, 0) // Head - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) + BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{ + head: BeaconNodeTester.TestEvents["2375703"].HeadMessage, + expectedEpoch: 74240, + expectStatus: "proposed", + }) // Known Gaps BeaconNodeTester.writeEventToKnownGaps(bc, 101, 101) @@ -149,9 +164,10 @@ var _ = Describe("Capturehistoric", func() { }) Context("When it recieves a known Gaps, historic and head message (in order)", func() { It("Should process them all successfully.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() + + bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") // Known Gaps BeaconNodeTester.writeEventToKnownGaps(bc, 101, 101) BeaconNodeTester.runKnownGapsProcess(bc, 2, 1, 0, 0, 0) @@ -161,7 +177,11 @@ var _ = Describe("Capturehistoric", func() { BeaconNodeTester.runHistoricalProcess(bc, 2, 2, 0, 0, 0) // Head - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) + BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{ + head: BeaconNodeTester.TestEvents["2375703"].HeadMessage, + expectedEpoch: 74240, + expectStatus: "proposed", + }) time.Sleep(2 * time.Second) validatePopularBatchBlocks(bc) @@ -200,25 +220,23 @@ func (tbc TestBeaconNode) writeEventToHistoricProcess(bc *beaconclient.BeaconCli // Start the CaptureHistoric function, and check for the correct inserted slots. 
func (tbc TestBeaconNode) runHistoricalProcess(bc *beaconclient.BeaconClient, maxWorkers int, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { + //pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + startGoRoutines := runtime.NumGoroutine() ctx, cancel := context.WithCancel(context.Background()) go bc.CaptureHistoric(ctx, maxWorkers) validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError) - log.Debug("Calling the stop function for historical processing..") - err := bc.StopHistoric(cancel) - time.Sleep(5 * time.Second) - Expect(err).ToNot(HaveOccurred()) - validateAllRowsCheckedOut(bc.Db, hpCheckCheckedOutStmt) + cancel() + testStopHistoricProcessing(ctx, bc, startGoRoutines) } // Wrapper function that processes knownGaps func (tbc TestBeaconNode) runKnownGapsProcess(bc *beaconclient.BeaconClient, maxWorkers int, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { + startGoRoutines := runtime.NumGoroutine() ctx, cancel := context.WithCancel(context.Background()) go bc.ProcessKnownGaps(ctx, maxWorkers) validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError) - err := bc.StopKnownGapsProcessing(cancel) - time.Sleep(5 * time.Second) - Expect(err).ToNot(HaveOccurred()) - validateAllRowsCheckedOut(bc.Db, kgCheckCheckedOutStmt) + cancel() + testStopKnownGapProcessing(ctx, bc, startGoRoutines) } func validateMetrics(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { @@ -288,3 +306,37 @@ func validateAllRowsCheckedOut(db sql.Database, checkStmt string) { Expect(err).ToNot(HaveOccurred()) Expect(rows).To(Equal(int64(0))) } + +// A make shift function to stop head tracking and insure we dont have any goroutine leaks +func testStopHistoricProcessing(ctx context.Context, bc *beaconclient.BeaconClient, startGoRoutines int) { + log.Debug("Calling the stop function for historical processing..") + err := bc.StopHistoricProcess(ctx) + Expect(err).ToNot(HaveOccurred()) + time.Sleep(5 * time.Second) + validateAllRowsCheckedOut(bc.Db, hpCheckCheckedOutStmt) + + time.Sleep(5 * time.Second) + endNum := runtime.NumGoroutine() + + //pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + //Expect(endNum <= startGoRoutines).To(BeTrue()) + log.WithField("startNum", startGoRoutines).Info("Start Go routine number") + log.WithField("endNum", endNum).Info("End Go routine number") + Expect(endNum).To(Equal(startGoRoutines)) +} + +// A make shift function to stop head tracking and insure we dont have any goroutine leaks +func testStopKnownGapProcessing(ctx context.Context, bc *beaconclient.BeaconClient, startGoRoutines int) { + log.Debug("Calling the stop function for knownGaps processing..") + err := bc.StopKnownGapsProcessing(ctx) + Expect(err).ToNot(HaveOccurred()) + time.Sleep(5 * time.Second) + validateAllRowsCheckedOut(bc.Db, kgCheckCheckedOutStmt) + + time.Sleep(3 * time.Second) + endNum := runtime.NumGoroutine() + + //pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + //Expect(endNum <= startGoRoutines).To(BeTrue()) + Expect(endNum).To(Equal(startGoRoutines)) +} diff --git a/pkg/beaconclient/incomingsse.go b/pkg/beaconclient/incomingsse.go index cdb4891..0ebd1d5 100644 --- a/pkg/beaconclient/incomingsse.go +++ b/pkg/beaconclient/incomingsse.go @@ -18,44 +18,37 @@ package beaconclient import ( + "context" "encoding/json" "time" log "github.com/sirupsen/logrus" 
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/loghelper" - "golang.org/x/sync/errgroup" -) - -var ( - shutdownWaitInterval = time.Duration(5) * time.Second ) // This function will capture all the SSE events for a given SseEvents object. // When new messages come in, it will ensure that they are decoded into JSON. // If any errors occur, it log the error information. -func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMetricInc func(uint64)) { - go func() { - errG := new(errgroup.Group) - errG.Go(func() error { - err := eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh) +func handleIncomingSseEvent[P ProcessedEvents](ctx context.Context, eventHandler *SseEvents[P], errMetricInc func(uint64), skipSse bool) { + if !skipSse { + for { + err := eventHandler.SseClient.SubscribeChanRawWithContext(ctx, eventHandler.MessagesCh) if err != nil { - return err + loghelper.LogEndpoint(eventHandler.Endpoint).WithFields(log.Fields{ + "err": err}).Error("We are unable to subscribe to the SSE endpoint") + time.Sleep(3 * time.Second) + continue } - return nil - }) - if err := errG.Wait(); err != nil { - log.WithFields(log.Fields{ - "err": err, - "endpoint": eventHandler.Endpoint, - }).Error("Unable to subscribe to the SSE endpoint.") - return - } else { loghelper.LogEndpoint(eventHandler.Endpoint).Info("Successfully subscribed to the event stream.") + break } + } - }() for { select { + case <-ctx.Done(): + close(eventHandler.ProcessCh) + return case message := <-eventHandler.MessagesCh: // Message can be nil if its a keep-alive message if len(message.Data) != 0 { @@ -76,23 +69,24 @@ func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMe } // Turn the data object into a Struct. -func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan<- *SseError) { +func processMsg[P ProcessedEvents](msg []byte, processCh chan<- P, errorCh chan<- SseError) { var msgMarshaled P err := json.Unmarshal(msg, &msgMarshaled) if err != nil { loghelper.LogError(err).Error("Unable to parse message") - errorCh <- &SseError{ + errorCh <- SseError{ err: err, msg: msg, } return } - processCh <- &msgMarshaled + processCh <- msgMarshaled + log.Debug("Done sending") } // Capture all of the event topics. -func (bc *BeaconClient) captureEventTopic() { +func (bc *BeaconClient) captureEventTopic(ctx context.Context, skipSse bool) { log.Info("We are capturing all SSE events") - go handleIncomingSseEvent(bc.HeadTracking, bc.Metrics.IncrementHeadError) - go handleIncomingSseEvent(bc.ReOrgTracking, bc.Metrics.IncrementReorgError) + go handleIncomingSseEvent(ctx, bc.HeadTracking, bc.Metrics.IncrementHeadError, skipSse) + go handleIncomingSseEvent(ctx, bc.ReOrgTracking, bc.Metrics.IncrementReorgError, skipSse) } diff --git a/pkg/beaconclient/processevents.go b/pkg/beaconclient/processevents.go index 8dd5520..f95cb31 100644 --- a/pkg/beaconclient/processevents.go +++ b/pkg/beaconclient/processevents.go @@ -19,59 +19,102 @@ package beaconclient import ( + "context" "fmt" "strconv" log "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-eth-beacon-indexer/pkg/database/sql" ) // This function will perform the necessary steps to handle a reorg. 
-func (bc *BeaconClient) handleReorg() { +func (bc *BeaconClient) handleReorg(ctx context.Context) { log.Info("Starting to process reorgs.") for { - reorg := <-bc.ReOrgTracking.ProcessCh - log.WithFields(log.Fields{"reorg": reorg}).Debug("Received a new reorg message.") - writeReorgs(bc.Db, reorg.Slot, reorg.NewHeadBlock, bc.Metrics) + select { + case <-ctx.Done(): + return + case reorg := <-bc.ReOrgTracking.ProcessCh: + log.WithFields(log.Fields{"reorg": reorg}).Debug("Received a new reorg message.") + writeReorgs(bc.Db, reorg.Slot, reorg.NewHeadBlock, bc.Metrics) + } } } // This function will handle the latest head event. -func (bc *BeaconClient) handleHead() { +func (bc *BeaconClient) handleHead(ctx context.Context, maxWorkers int) { log.Info("Starting to process head.") + + workCh := make(chan workParams, 5) + log.WithField("workerNumber", maxWorkers).Info("Creating Workers") + for i := 1; i < maxWorkers; i++ { + go bc.headBlockProcessor(ctx, workCh) + } errorSlots := 0 for { - head := <-bc.HeadTracking.ProcessCh - // Process all the work here. - slot, err := strconv.Atoi(head.Slot) - if err != nil { - bc.HeadTracking.ErrorCh <- &SseError{ - err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot), + select { + case <-ctx.Done(): + close(workCh) + return + case head := <-bc.HeadTracking.ProcessCh: + + // Process all the work here. + slot, err := strconv.Atoi(head.Slot) + if err != nil { + bc.HeadTracking.ErrorCh <- SseError{ + err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot), + } + errorSlots = errorSlots + 1 + continue } - errorSlots = errorSlots + 1 - continue + if errorSlots != 0 && bc.PreviousSlot != 0 { + log.WithFields(log.Fields{ + "lastProcessedSlot": bc.PreviousSlot, + "errorSlots": errorSlots, + }).Warn("We added slots to the knownGaps table because we got bad head messages.") + writeKnownGaps(bc.Db, bc.KnownGapTableIncrement, bc.PreviousSlot+1, slot, fmt.Errorf("Bad Head Messages"), "headProcessing", bc.Metrics) + errorSlots = 0 + } + + log.WithFields(log.Fields{"head": head}).Debug("We are going to start processing the slot.") + + // Not used anywhere yet but might be useful to have. + if bc.PreviousSlot == 0 && bc.PreviousBlockRoot == "" { + bc.StartingSlot = slot + } + + workCh <- workParams{db: bc.Db, serverEndpoint: bc.ServerEndpoint, slot: slot, blockRoot: head.Block, stateRoot: head.State, previousSlot: bc.PreviousSlot, previousBlockRoot: bc.PreviousBlockRoot, metrics: bc.Metrics, knownGapsTableIncrement: bc.KnownGapTableIncrement, checkDb: bc.CheckDb} + log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished sending this slot to the workCh") + + // Update the previous block + bc.PreviousSlot = slot + bc.PreviousBlockRoot = head.Block } - if errorSlots != 0 && bc.PreviousSlot != 0 { - log.WithFields(log.Fields{ - "lastProcessedSlot": bc.PreviousSlot, - "errorSlots": errorSlots, - }).Warn("We added slots to the knownGaps table because we got bad head messages.") - writeKnownGaps(bc.Db, bc.KnownGapTableIncrement, bc.PreviousSlot+1, slot, fmt.Errorf("Bad Head Messages"), "headProcessing", bc.Metrics) - errorSlots = 0 - } - - log.WithFields(log.Fields{"head": head}).Debug("We are going to start processing the slot.") - - // Not used anywhere yet but might be useful to have. 
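The rewritten handleHead above fans head messages out to a fixed pool of workers over a small buffered channel, and both the dispatcher and the workers exit when the context is cancelled. A reduced sketch of that shape, with a hypothetical work struct and a print statement standing in for workParams and processHeadSlot:

package main

import (
	"context"
	"fmt"
	"sync"
)

type work struct{ slot int }

// worker drains the buffered channel until the context is cancelled.
func worker(ctx context.Context, id int, workCh <-chan work, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case w := <-workCh:
			fmt.Printf("worker %d processed slot %d\n", id, w.slot)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	workCh := make(chan work, 5) // small buffer so the dispatcher rarely blocks
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go worker(ctx, i, workCh, &wg)
	}
	for slot := 100; slot < 110; slot++ {
		workCh <- work{slot: slot}
	}
	cancel() // workers observe ctx.Done() and return; items still buffered may go unprocessed
	wg.Wait()
}
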
- if bc.PreviousSlot == 0 && bc.PreviousBlockRoot == "" { - bc.StartingSlot = slot - } - - go processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement, bc.CheckDb) - - log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished calling processHeadSlot.") - - // Update the previous block - bc.PreviousSlot = slot - bc.PreviousBlockRoot = head.Block } } + +// A worker that will process head slots. +func (bc *BeaconClient) headBlockProcessor(ctx context.Context, workCh <-chan workParams) { + for { + select { + case <-ctx.Done(): + return + case wp := <-workCh: + processHeadSlot(ctx, wp.db, wp.serverEndpoint, wp.slot, wp.blockRoot, wp.stateRoot, wp.previousSlot, wp.previousBlockRoot, wp.metrics, wp.knownGapsTableIncrement, wp.checkDb) + } + } +} + +// A struct used to pass parameters to the worker. +type workParams struct { + db sql.Database + serverEndpoint string + slot int + blockRoot string + stateRoot string + previousSlot int + previousBlockRoot string + metrics *BeaconClientMetrics + knownGapsTableIncrement int + checkDb bool +} diff --git a/pkg/beaconclient/processhistoric.go b/pkg/beaconclient/processhistoric.go index c520e41..8b52fc9 100644 --- a/pkg/beaconclient/processhistoric.go +++ b/pkg/beaconclient/processhistoric.go @@ -132,6 +132,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm for len(errCount) < 5 { select { case <-ctx.Done(): + close(slotCh) return errCount default: if len(errCount) != prevErrCount { @@ -228,7 +229,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm // After a row has been processed it should be removed from its appropriate table. func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan slotsToProcess, checkProcessedStmt, removeStmt string) error { - errCh := make(chan error) + errCh := make(chan error, 1) for { select { case <-ctx.Done(): @@ -241,25 +242,28 @@ func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan "endSlot": slots.endSlot, }).Debug("Starting to check to see if the following slots have been processed") for { - isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.startSlot)) - if err != nil { - errCh <- err + select { + case <-ctx.Done(): + return + default: + isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.startSlot)) + if err != nil { + errCh <- err + } + isEndProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.endSlot)) + if err != nil { + errCh <- err + } + if isStartProcess && isEndProcess { + _, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), strconv.Itoa(slots.endSlot)) + if err != nil { + errCh <- err + } + return + } + time.Sleep(3 * time.Second) } - isEndProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.endSlot)) - if err != nil { - errCh <- err - } - if isStartProcess && isEndProcess { - break - } - time.Sleep(3 * time.Second) } - - _, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), strconv.Itoa(slots.endSlot)) - if err != nil { - errCh <- err - } - }() if len(errCh) != 0 { return <-errCh diff --git a/pkg/beaconclient/processknowngaps.go b/pkg/beaconclient/processknowngaps.go index 343fc4a..f389e49 100644 --- a/pkg/beaconclient/processknowngaps.go +++ b/pkg/beaconclient/processknowngaps.go @@ -20,6 +20,7 @@ package beaconclient import ( 
"context" + "fmt" "strconv" log "github.com/sirupsen/logrus" @@ -67,14 +68,18 @@ func (bc *BeaconClient) ProcessKnownGaps(ctx context.Context, maxWorkers int) [] } // This function will perform all the necessary clean up tasks for stopping historical processing. -func (bc *BeaconClient) StopKnownGapsProcessing(cancel context.CancelFunc) error { - log.Info("We are stopping the known gaps processing service.") - cancel() - err := bc.KnownGapsProcess.releaseDbLocks() - if err != nil { - loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.known_gaps table. Manual Intervention is needed!") +func (bc *BeaconClient) StopKnownGapsProcessing(ctx context.Context) error { + select { + case <-ctx.Done(): + log.Info("We are stopping the known gaps processing service.") + err := bc.KnownGapsProcess.releaseDbLocks() + if err != nil { + loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.known_gaps table. Manual Intervention is needed!") + } + return nil + default: + return fmt.Errorf("Tried to stop knownGaps Processing without closing the context..") } - return nil } // Get a single row of historical slots from the table. diff --git a/pkg/beaconclient/processslot.go b/pkg/beaconclient/processslot.go index 1b8f619..9e09c3a 100644 --- a/pkg/beaconclient/processslot.go +++ b/pkg/beaconclient/processslot.go @@ -63,12 +63,12 @@ type ProcessSlot struct { PerformanceMetrics PerformanceMetrics // An object to keep track of performance metrics. // BeaconBlock - SszSignedBeaconBlock []byte // The entire SSZ encoded SignedBeaconBlock + SszSignedBeaconBlock *[]byte // The entire SSZ encoded SignedBeaconBlock FullSignedBeaconBlock si.SignedBeaconBlock // The unmarshaled BeaconState object, the unmarshalling could have errors. // BeaconState FullBeaconState state.BeaconState // The unmarshaled BeaconState object, the unmarshalling could have errors. - SszBeaconState []byte // The entire SSZ encoded BeaconState + SszBeaconState *[]byte // The entire SSZ encoded BeaconState // DB Write objects DbSlotsModel *DbSlots // The model being written to the slots table. @@ -155,6 +155,7 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string, }) if err := g.Wait(); err != nil { + // Make sure channel is empty. return err, "processSlot" } @@ -234,12 +235,12 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string, } // Handle a slot that is at head. A wrapper function for calling `handleFullSlot`. -func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) { +func processHeadSlot(ctx context.Context, db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) { // Get the knownGaps at startUp. 
if previousSlot == 0 && previousBlockRoot == "" { writeStartUpGaps(db, knownGapsTableIncrement, slot, metrics) } - err, errReason := processFullSlot(context.Background(), db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement, checkDb) + err, errReason := processFullSlot(ctx, db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement, checkDb) if err != nil { writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics) } @@ -270,7 +271,7 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string, vmCh <-chan *d vm := <-vmCh if rc != 200 { ps.FullSignedBeaconBlock = &wrapper.Phase0SignedBeaconBlock{} - ps.SszSignedBeaconBlock = []byte{} + ps.SszSignedBeaconBlock = &[]byte{} ps.ParentBlockRoot = "" ps.Status = "skipped" return nil @@ -280,7 +281,7 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string, vmCh <-chan *d return fmt.Errorf(VersionedUnmarshalerError) } - ps.FullSignedBeaconBlock, err = vm.UnmarshalBeaconBlock(ps.SszSignedBeaconBlock) + ps.FullSignedBeaconBlock, err = vm.UnmarshalBeaconBlock(*ps.SszSignedBeaconBlock) if err != nil { loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Warn("Unable to process the slots SignedBeaconBlock") return nil @@ -291,23 +292,29 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string, vmCh <-chan *d // Update the SszBeaconState and FullBeaconState object with their respective values. func (ps *ProcessSlot) getBeaconState(serverEndpoint string, vmCh chan<- *dt.VersionedUnmarshaler) error { - var stateIdentifier string // Used to query the state + var ( + stateIdentifier string // Used to query the state + err error + ) if ps.StateRoot != "" { stateIdentifier = ps.StateRoot } else { stateIdentifier = strconv.Itoa(ps.Slot) } stateEndpoint := serverEndpoint + BcStateQueryEndpoint + stateIdentifier - ps.SszBeaconState, _, _ = querySsz(stateEndpoint, strconv.Itoa(ps.Slot)) + ps.SszBeaconState, _, err = querySsz(stateEndpoint, strconv.Itoa(ps.Slot)) + if err != nil { + return fmt.Errorf("Unable to querrySSZ") + } - versionedUnmarshaler, err := dt.FromState(ps.SszBeaconState) + versionedUnmarshaler, err := dt.FromState(*ps.SszBeaconState) if err != nil { loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(VersionedUnmarshalerError) vmCh <- nil return fmt.Errorf(VersionedUnmarshalerError) } vmCh <- versionedUnmarshaler - ps.FullBeaconState, err = versionedUnmarshaler.UnmarshalBeaconState(ps.SszBeaconState) + ps.FullBeaconState, err = versionedUnmarshaler.UnmarshalBeaconState(*ps.SszBeaconState) if err != nil { loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error("Unable to process the slots BeaconState") return err @@ -356,7 +363,7 @@ func (ps *ProcessSlot) createWriteObjects(blockRoot, stateRoot, eth1BlockHash st status = "proposed" } - dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, &ps.SszSignedBeaconBlock, &ps.SszBeaconState, ps.Metrics) + dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.SszSignedBeaconBlock, ps.SszBeaconState, ps.Metrics) if err != nil { return dw, err } diff --git a/pkg/beaconclient/queryserver.go b/pkg/beaconclient/queryserver.go index 5294335..e9bb9e2 100644 --- a/pkg/beaconclient/queryserver.go +++ b/pkg/beaconclient/queryserver.go @@ -28,7 +28,7 @@ import ( ) // A helper function to query endpoints that utilize 
slots. -func querySsz(endpoint string, slot string) ([]byte, int, error) { +func querySsz(endpoint string, slot string) (*[]byte, int, error) { log.WithFields(log.Fields{"endpoint": endpoint}).Debug("Querying endpoint") client := &http.Client{} req, err := http.NewRequest("GET", endpoint, nil) @@ -43,13 +43,21 @@ func querySsz(endpoint string, slot string) ([]byte, int, error) { return nil, 0, fmt.Errorf("Unable to query Beacon Node: %s", err.Error()) } defer response.Body.Close() + // Needed for testing.... But might be interesting to test with... + defer client.CloseIdleConnections() rc := response.StatusCode + + //var body []byte + //io.Copy(body, response.Body) + //bytes.buffer... + //_, err = response.Body.Read(body) body, err := ioutil.ReadAll(response.Body) if err != nil { loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!") return nil, rc, fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error()) } - return body, rc, nil + //log.WithField("body", unsafe.Sizeof(body)).Debug("Size of the raw SSZ object") + return &body, rc, nil } // Object to unmarshal the BlockRootResponse diff --git a/pkg/beaconclient/systemvalidation_test.go b/pkg/beaconclient/systemvalidation_test.go index 7aabd41..807ea59 100644 --- a/pkg/beaconclient/systemvalidation_test.go +++ b/pkg/beaconclient/systemvalidation_test.go @@ -1,11 +1,13 @@ package beaconclient_test import ( + "context" "os" "strconv" "time" . "github.com/onsi/ginkgo/v2" + //. "github.com/onsi/gomega" "github.com/vulcanize/ipld-eth-beacon-indexer/pkg/beaconclient" ) @@ -28,7 +30,7 @@ var ( ) var _ = Describe("Systemvalidation", Label("system"), func() { Describe("Run the application against a running lighthouse node", func() { - Context("When we receive head messages", func() { + Context("When we receive head messages", Label("system-head"), func() { It("We should process the messages successfully", func() { bc := setUpTest(prodConfig, "10000000000") processProdHeadBlocks(bc, 3, 0, 0, 0) @@ -63,7 +65,23 @@ func getEnvInt(envVar string) int { // Start head tracking and wait for the expected results. func processProdHeadBlocks(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { - go bc.CaptureHead() - time.Sleep(1 * time.Second) + //startGoRoutines := runtime.NumGoroutine() + //pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + ctx, cancel := context.WithCancel(context.Background()) + go bc.CaptureHead(ctx, 2, false) + time.Sleep(5 * time.Second) validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError) + + cancel() + time.Sleep(4 * time.Second) + testStopSystemHeadTracking(ctx, bc) +} + +// Custom stop for system testing +func testStopSystemHeadTracking(ctx context.Context, bc *beaconclient.BeaconClient) { + bc.StopHeadTracking(ctx, false) + + time.Sleep(3 * time.Second) + //pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + //Expect(endNum <= startGoRoutines).To(BeTrue()) } diff --git a/pkg/gracefulshutdown/gracefulshutdown.go b/pkg/gracefulshutdown/gracefulshutdown.go index 29fe19e..b5a2f9e 100644 --- a/pkg/gracefulshutdown/gracefulshutdown.go +++ b/pkg/gracefulshutdown/gracefulshutdown.go @@ -45,7 +45,11 @@ func Shutdown(ctx context.Context, notifierCh chan os.Signal, timeout time.Durat // add any other syscalls that you want to be notified with signal.Notify(notifierCh, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) - <-notifierCh + // Wait for one or the other... 
+ select { + case <-notifierCh: + case <-ctx.Done(): + } log.Info("Shutting Down your application")
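The test changes earlier in this diff record runtime.NumGoroutine() before starting a process and assert that the count returns to that baseline after cancellation, with fixed sleeps in between. A small stand-alone sketch of the same leak check, polling for the baseline instead of sleeping a fixed interval (an alternative approach, not what the tests themselves do):

package main

import (
	"context"
	"fmt"
	"runtime"
	"time"
)

func main() {
	baseline := runtime.NumGoroutine()

	ctx, cancel := context.WithCancel(context.Background())
	for i := 0; i < 5; i++ {
		go func() {
			<-ctx.Done() // stand-in worker: block until shutdown
		}()
	}

	cancel()

	// Poll instead of sleeping a fixed amount: goroutines take a moment to unwind.
	deadline := time.Now().Add(5 * time.Second)
	for time.Now().Before(deadline) && runtime.NumGoroutine() > baseline {
		time.Sleep(50 * time.Millisecond)
	}
	fmt.Printf("baseline=%d now=%d (a leak if now > baseline)\n", baseline, runtime.NumGoroutine())
}
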
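The rewritten handleIncomingSseEvent in pkg/beaconclient/incomingsse.go keeps retrying the SSE subscription every few seconds until it succeeds. The sketch below shows that retry loop in isolation with a pluggable subscribe function; unlike the diff, it also gives up if the context is cancelled while waiting, which is an assumption layered on top rather than the package's behaviour:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// subscribeWithRetry keeps retrying until subscribe succeeds or the context ends.
func subscribeWithRetry(ctx context.Context, subscribe func(context.Context) error) error {
	for {
		if err := subscribe(ctx); err != nil {
			fmt.Println("subscribe failed, retrying:", err)
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(3 * time.Second):
			}
			continue
		}
		fmt.Println("successfully subscribed")
		return nil
	}
}

func main() {
	attempts := 0
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	err := subscribeWithRetry(ctx, func(context.Context) error {
		attempts++
		if attempts < 3 {
			return errors.New("endpoint not ready")
		}
		return nil
	})
	fmt.Println("done:", err)
}
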
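Finally, the gracefulshutdown change directly above makes Shutdown return on whichever comes first: an OS signal or context cancellation. A compact, runnable version of that race (the waitForShutdown wrapper is illustrative, not the package's API):

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// waitForShutdown returns when either an OS signal arrives or the context is
// cancelled, the same race the new select in gracefulshutdown.go waits on.
func waitForShutdown(ctx context.Context, notifierCh chan os.Signal) {
	signal.Notify(notifierCh, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
	select {
	case sig := <-notifierCh:
		fmt.Println("received signal:", sig)
	case <-ctx.Done():
		fmt.Println("context cancelled:", ctx.Err())
	}
	fmt.Println("shutting down")
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	notifierCh := make(chan os.Signal, 1)
	go func() {
		time.Sleep(100 * time.Millisecond)
		cancel() // simulate an internal shutdown request; Ctrl-C would take the other branch
	}()
	waitForShutdown(ctx, notifierCh)
}
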