Feature/study application memory #70
14
.github/workflows/generic-testing.yml
vendored
14
.github/workflows/generic-testing.yml
vendored
@ -3,22 +3,22 @@ on:
|
||||
workflow_call:
|
||||
inputs:
|
||||
stack-orchestrator-ref:
|
||||
required: false
|
||||
required: true
|
||||
type: string
|
||||
ipld-eth-beacon-db-ref:
|
||||
required: false
|
||||
required: true
|
||||
type: string
|
||||
ssz-data-ref:
|
||||
required: false
|
||||
required: true
|
||||
type: string
|
||||
secrets:
|
||||
GHA_KEY:
|
||||
required: true
|
||||
|
||||
env:
|
||||
stack-orchestrator-ref: ${{ inputs.stack-orchestrator-ref || '7fb664270a0ba09e2caa3095e8c91f3fdb5b38af' }}
|
||||
ipld-eth-beacon-db-ref: ${{ inputs.ipld-eth-beacon-db-ref || '3dfe416302d553f8240f6051c08a7899b0e39e12' }}
|
||||
ssz-data-ref: ${{ inputs.ssz-data-ref || 'main' }}
|
||||
stack-orchestrator-ref: ${{ inputs.stack-orchestrator-ref }}
|
||||
ipld-eth-beacon-db-ref: ${{ inputs.ipld-eth-beacon-db-ref }}
|
||||
ssz-data-ref: ${{ inputs.ssz-data-ref }}
|
||||
GOPATH: /tmp/go
|
||||
jobs:
|
||||
build:
|
||||
@ -66,7 +66,7 @@ jobs:
|
||||
run: |
|
||||
until $(docker compose -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-eth-beacon-indexer.yml" --env-file ./config.sh cp ipld-eth-beacon-indexer:/root/HEALTH ./HEALTH) ; do sleep 10; done
|
||||
cat ./HEALTH
|
||||
if [[ "$(cat ./HEALTH)" -eq "0" ]]; then echo "Application boot successful" && (exit 0); else docker compose -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-eth-beacon-indexer.yml" cp ipld-eth-beacon-indexer:/root/ipld-eth-beacon-indexer.log . && cat ipld-eth-beacon-indexer.log && (exit 1); fi
|
||||
if [[ "$(cat ./HEALTH)" -eq "0" ]]; then echo "Application boot successful" && (exit 0); else docker compose -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-eth-beacon-indexer.yml" --env-file ./config.sh cp ipld-eth-beacon-indexer:/root/ipld-eth-beacon-indexer.log . && cat ipld-eth-beacon-indexer.log && (exit 1); fi
|
||||
|
||||
unit-test:
|
||||
name: Run Unit Tests
|
||||
|
@ -2,9 +2,50 @@ name: Publish Docker image
|
||||
on:
|
||||
release:
|
||||
types: [published, edited]
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
stack-orchestrator-ref:
|
||||
description: "The branch, commit or sha from stack-orchestrator to checkout"
|
||||
required: false
|
||||
default: "7fb664270a0ba09e2caa3095e8c91f3fdb5b38af"
|
||||
ipld-eth-beacon-db-ref:
|
||||
description: "The branch, commit or sha from ipld-eth-beacon-db to checkout"
|
||||
required: false
|
||||
default: "3dfe416302d553f8240f6051c08a7899b0e39e12"
|
||||
ssz-data-ref:
|
||||
description: "The branch, commit or sha from ssz-data to checkout"
|
||||
required: false
|
||||
default: "main"
|
||||
pull_request:
|
||||
paths:
|
||||
- "!**.md"
|
||||
- "!.gitignore"
|
||||
- "!LICENSE"
|
||||
- "!.github/workflows/**"
|
||||
- ".github/workflows/on-pr.yml"
|
||||
- ".github/workflows/tests.yml"
|
||||
- "**"
|
||||
schedule:
|
||||
- cron: '0 13 * * *' # Must be single quotes!!
|
||||
jobs:
|
||||
pre_job:
|
||||
# continue-on-error: true # Uncomment once integration is finished
|
||||
runs-on: ubuntu-latest
|
||||
# Map a step output to a job output
|
||||
outputs:
|
||||
should_skip: ${{ steps.skip_check.outputs.should_skip }}
|
||||
steps:
|
||||
- id: skip_check
|
||||
uses: fkirc/skip-duplicate-actions@v4
|
||||
with:
|
||||
# All of these options are optional, so you can remove them if you are happy with the defaults
|
||||
concurrent_skipping: "never"
|
||||
skip_after_successful_duplicate: "true"
|
||||
do_not_skip: '["workflow_dispatch", "schedule"]'
|
||||
trigger-tests:
|
||||
uses: ./.github/workflows/generic-testing.yml
|
||||
if: ${{ needs.pre_job.outputs.should_skip != 'true' }}
|
||||
needs: pre_job
|
||||
with:
|
||||
stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref }}
|
||||
ipld-eth-beacon-db-ref: ${{ github.event.inputs.ipld-eth-beacon-db-ref }}
|
||||
@ -12,6 +53,8 @@ jobs:
|
||||
secrets:
|
||||
GHA_KEY: ${{secrets.GHA_KEY}}
|
||||
system-testing:
|
||||
if: ${{ needs.pre_job.outputs.should_skip != 'true' }}
|
||||
needs: pre_job
|
||||
uses: ./.github/workflows/system-tests.yml
|
||||
with:
|
||||
stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref }}
|
||||
@ -25,6 +68,11 @@ jobs:
|
||||
needs:
|
||||
- trigger-tests
|
||||
- system-testing
|
||||
if: |
|
||||
always() &&
|
||||
(needs.trigger-tests.result == 'success' || needs.trigger-tests.result == 'skipped') &&
|
||||
(needs.system-testing.result == 'success' || needs.system-testing.result == 'skipped') &&
|
||||
github.event_name == 'release'
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- name: Get the version
|
||||
@ -42,6 +90,10 @@ jobs:
|
||||
name: Push Docker image to Docker Hub
|
||||
runs-on: ubuntu-latest
|
||||
needs: build
|
||||
if: |
|
||||
always() &&
|
||||
(needs.build.result == 'success') &&
|
||||
github.event_name == 'release'
|
||||
steps:
|
||||
- name: Get the version
|
||||
id: vars
|
47
.github/workflows/on-pr.yml
vendored
47
.github/workflows/on-pr.yml
vendored
@ -1,47 +0,0 @@
|
||||
name: Test Application On PR
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
stack-orchestrator-ref:
|
||||
description: "The branch, commit or sha from stack-orchestrator to checkout"
|
||||
required: false
|
||||
default: "main"
|
||||
ipld-eth-beacon-db-ref:
|
||||
description: "The branch, commit or sha from ipld-eth-beacon-db to checkout"
|
||||
required: false
|
||||
default: "main"
|
||||
ssz-data-ref:
|
||||
description: "The branch, commit or sha from ssz-data to checkout"
|
||||
required: false
|
||||
default: "main"
|
||||
pull_request:
|
||||
paths:
|
||||
- "!**.md"
|
||||
- "!.gitignore"
|
||||
- "!LICENSE"
|
||||
- "!.github/workflows/**"
|
||||
- ".github/workflows/on-pr.yml"
|
||||
- ".github/workflows/tests.yml"
|
||||
- "**"
|
||||
schedule:
|
||||
- cron: '0 13 * * *' # Must be single quotes!!
|
||||
|
||||
jobs:
|
||||
trigger-tests:
|
||||
if: github.event_name != 'schedule'
|
||||
uses: ./.github/workflows/generic-testing.yml
|
||||
with:
|
||||
stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref }}
|
||||
ipld-eth-beacon-db-ref: ${{ github.event.inputs.ipld-eth-beacon-db-ref }}
|
||||
ssz-data-ref: ${{ github.event.inputs.ssz-data-ref }}
|
||||
secrets:
|
||||
GHA_KEY: ${{secrets.GHA_KEY}}
|
||||
system-testing:
|
||||
uses: ./.github/workflows/system-tests.yml
|
||||
with:
|
||||
stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref }}
|
||||
ipld-eth-beacon-db-ref: ${{ github.event.inputs.ipld-eth-beacon-db-ref }}
|
||||
secrets:
|
||||
GHA_KEY: ${{secrets.GHA_KEY}}
|
||||
BC_ADDRESS: ${{secrets.BC_ADDRESS}}
|
4
Makefile
4
Makefile
@ -25,6 +25,7 @@ integration-test-ci:
|
||||
go fmt ./...
|
||||
$(GINKGO) -r --label-filter integration \
|
||||
--procs=4 --compilers=4 \
|
||||
--flake-attempts=3 \
|
||||
--randomize-all --randomize-suites \
|
||||
--fail-on-pending --keep-going \
|
||||
--cover --coverprofile=cover.profile \
|
||||
@ -76,7 +77,7 @@ unit-test-ci:
|
||||
go vet ./...
|
||||
go fmt ./...
|
||||
$(GINKGO) -r --label-filter unit \
|
||||
--randomize-all --randomize-suites
|
||||
--randomize-all --randomize-suites \
|
||||
--flake-attempts=3 \
|
||||
--fail-on-pending --keep-going \
|
||||
--cover --coverprofile=cover.profile \
|
||||
@ -88,6 +89,7 @@ system-test-ci:
|
||||
go fmt ./...
|
||||
$(GINKGO) -r --label-filter system \
|
||||
--randomize-all --randomize-suites \
|
||||
--flake-attempts=3 \
|
||||
--fail-on-pending --keep-going \
|
||||
--cover --coverprofile=cover.profile \
|
||||
--trace --json-report=report.json
|
||||
|
@ -18,7 +18,10 @@ package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"syscall"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
@ -60,6 +63,12 @@ func bootApp() {
|
||||
notifierCh <- syscall.SIGTERM
|
||||
}()
|
||||
|
||||
if viper.GetBool("t.pprof") {
|
||||
go func() {
|
||||
log.Println(http.ListenAndServe(fmt.Sprint("localhost:"+strconv.Itoa(viper.GetInt("t.pprofPort"))), nil))
|
||||
}()
|
||||
}
|
||||
|
||||
err = shutdown.ShutdownBoot(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc)
|
||||
if err != nil {
|
||||
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!")
|
||||
|
@ -39,6 +39,7 @@ var (
|
||||
bcConnectionProtocol string
|
||||
bcType string
|
||||
bcMaxHistoricProcessWorker int
|
||||
bcMaxHeadProcessWorker int
|
||||
bcUniqueNodeIdentifier int
|
||||
bcCheckDb bool
|
||||
kgMaxWorker int
|
||||
@ -50,6 +51,8 @@ var (
|
||||
maxWaitSecondsShutdown time.Duration = time.Duration(20) * time.Second
|
||||
notifierCh chan os.Signal = make(chan os.Signal, 1)
|
||||
testDisregardSync bool
|
||||
isTestPprof bool
|
||||
testPprofPort int
|
||||
)
|
||||
|
||||
// captureCmd represents the capture command
|
||||
@ -94,7 +97,8 @@ func init() {
|
||||
captureCmd.PersistentFlags().StringVarP(&bcConnectionProtocol, "bc.connectionProtocol", "", "http", "protocol for connecting to the beacon node.")
|
||||
captureCmd.PersistentFlags().IntVarP(&bcBootRetryInterval, "bc.bootRetryInterval", "", 30, "The amount of time to wait between retries while booting the application")
|
||||
captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The amount of time to wait between retries while booting the application")
|
||||
captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 30, "The number of workers that should be actively processing slots from the eth-beacon.historic_process table. Be careful of system memory.")
|
||||
captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 3, "The number of workers that should be actively processing slots from the eth-beacon.historic_process table. Be careful of system memory.")
|
||||
captureCmd.PersistentFlags().IntVarP(&bcMaxHeadProcessWorker, "bc.maxHeadProcessWorker", "", 3, "The number of workers that should be actively processing slots head slots. Be careful of system memory.")
|
||||
captureCmd.PersistentFlags().IntVarP(&bcUniqueNodeIdentifier, "bc.uniqueNodeIdentifier", "", 0, "The unique identifier of this application. Each application connecting to the DB should have a unique identifier.")
|
||||
captureCmd.PersistentFlags().BoolVarP(&bcCheckDb, "bc.checkDb", "", true, "Should we check to see if the slot exists in the DB before writing it?")
|
||||
// err = captureCmd.MarkPersistentFlagRequired("bc.address")
|
||||
@ -105,7 +109,7 @@ func init() {
|
||||
//// Known Gaps specific
|
||||
captureCmd.PersistentFlags().BoolVarP(&kgProcessGaps, "kg.processKnownGaps", "", true, "Should we process the slots within the eth-beacon.known_gaps table.")
|
||||
captureCmd.PersistentFlags().IntVarP(&kgTableIncrement, "kg.increment", "", 10000, "The max slots within a single entry to the known_gaps table.")
|
||||
captureCmd.PersistentFlags().IntVarP(&kgMaxWorker, "kg.maxKnownGapsWorker", "", 30, "The number of workers that should be actively processing slots from the eth-beacon.known_gaps table. Be careful of system memory.")
|
||||
captureCmd.PersistentFlags().IntVarP(&kgMaxWorker, "kg.maxKnownGapsWorker", "", 3, "The number of workers that should be actively processing slots from the eth-beacon.known_gaps table. Be careful of system memory.")
|
||||
|
||||
// Prometheus Specific
|
||||
captureCmd.PersistentFlags().BoolVarP(&pmMetrics, "pm.metrics", "", true, "Should we capture prometheus metrics.")
|
||||
@ -114,6 +118,8 @@ func init() {
|
||||
|
||||
//// Testing Specific
|
||||
captureCmd.PersistentFlags().BoolVar(&testDisregardSync, "t.skipSync", false, "Should we disregard the head sync?")
|
||||
captureCmd.PersistentFlags().BoolVar(&isTestPprof, "t.pprof", false, "Should we start pprof?")
|
||||
captureCmd.PersistentFlags().IntVar(&testPprofPort, "t.pprofPort", 6060, "What port should we export pprof at?")
|
||||
|
||||
// Bind Flags with Viper
|
||||
//// DB Flags
|
||||
@ -133,6 +139,10 @@ func init() {
|
||||
//// Testing Specific
|
||||
err = viper.BindPFlag("t.skipSync", captureCmd.PersistentFlags().Lookup("t.skipSync"))
|
||||
exitErr(err)
|
||||
err = viper.BindPFlag("t.pprof", captureCmd.PersistentFlags().Lookup("t.pprof"))
|
||||
exitErr(err)
|
||||
err = viper.BindPFlag("t.pprofPort", captureCmd.PersistentFlags().Lookup("t.pprofPort"))
|
||||
exitErr(err)
|
||||
|
||||
//// LH specific
|
||||
err = viper.BindPFlag("bc.address", captureCmd.PersistentFlags().Lookup("bc.address"))
|
||||
@ -149,6 +159,8 @@ func init() {
|
||||
exitErr(err)
|
||||
err = viper.BindPFlag("bc.maxHistoricProcessWorker", captureCmd.PersistentFlags().Lookup("bc.maxHistoricProcessWorker"))
|
||||
exitErr(err)
|
||||
err = viper.BindPFlag("bc.maxHeadProcessWorker", captureCmd.PersistentFlags().Lookup("bc.maxHeadProcessWorker"))
|
||||
exitErr(err)
|
||||
err = viper.BindPFlag("bc.uniqueNodeIdentifier", captureCmd.PersistentFlags().Lookup("bc.uniqueNodeIdentifier"))
|
||||
exitErr(err)
|
||||
err = viper.BindPFlag("bc.checkDb", captureCmd.PersistentFlags().Lookup("bc.checkDb"))
|
||||
|
20
cmd/full.go
20
cmd/full.go
@ -19,6 +19,7 @@ package cmd
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
@ -58,7 +59,7 @@ func init() {
|
||||
func startFullProcessing() {
|
||||
// Boot the application
|
||||
log.Info("Starting the application in head tracking mode.")
|
||||
ctx := context.Background()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
|
||||
viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
|
||||
@ -74,13 +75,11 @@ func startFullProcessing() {
|
||||
|
||||
log.Info("The Beacon Client has booted successfully!")
|
||||
// Capture head blocks
|
||||
go Bc.CaptureHead()
|
||||
|
||||
hpContext, hpCancel := context.WithCancel(context.Background())
|
||||
go Bc.CaptureHead(ctx, viper.GetInt("bc.maxHeadProcessWorker"), false)
|
||||
|
||||
errG, _ := errgroup.WithContext(context.Background())
|
||||
errG.Go(func() error {
|
||||
errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker"))
|
||||
errs := Bc.CaptureHistoric(ctx, viper.GetInt("bc.maxHistoricProcessWorker"))
|
||||
if len(errs) != 0 {
|
||||
if len(errs) != 0 {
|
||||
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events")
|
||||
@ -89,12 +88,11 @@ func startFullProcessing() {
|
||||
}
|
||||
return nil
|
||||
})
|
||||
kgCtx, KgCancel := context.WithCancel(context.Background())
|
||||
if viper.GetBool("kg.processKnownGaps") {
|
||||
go func() {
|
||||
errG := new(errgroup.Group)
|
||||
errG.Go(func() error {
|
||||
errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker"))
|
||||
errs := Bc.ProcessKnownGaps(ctx, viper.GetInt("kg.maxKnownGapsWorker"))
|
||||
if len(errs) != 0 {
|
||||
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps")
|
||||
return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps")
|
||||
@ -107,8 +105,14 @@ func startFullProcessing() {
|
||||
}()
|
||||
}
|
||||
|
||||
if viper.GetBool("t.pprof") {
|
||||
go func() {
|
||||
log.Println(http.ListenAndServe(fmt.Sprint("localhost:"+strconv.Itoa(viper.GetInt("t.pprofPort"))), nil))
|
||||
}()
|
||||
}
|
||||
|
||||
// Shutdown when the time is right.
|
||||
err = shutdown.ShutdownFull(ctx, KgCancel, hpCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc)
|
||||
err = shutdown.ShutdownFull(ctx, cancel, notifierCh, maxWaitSecondsShutdown, Db, Bc)
|
||||
if err != nil {
|
||||
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!")
|
||||
} else {
|
||||
|
17
cmd/head.go
17
cmd/head.go
@ -46,7 +46,7 @@ var headCmd = &cobra.Command{
|
||||
func startHeadTracking() {
|
||||
// Boot the application
|
||||
log.Info("Starting the application in head tracking mode.")
|
||||
ctx := context.Background()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
|
||||
viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
|
||||
@ -62,13 +62,13 @@ func startHeadTracking() {
|
||||
|
||||
log.Info("The Beacon Client has booted successfully!")
|
||||
// Capture head blocks
|
||||
go Bc.CaptureHead()
|
||||
kgCtx, KgCancel := context.WithCancel(context.Background())
|
||||
go Bc.CaptureHead(ctx, viper.GetInt("bc.maxHeadProcessWorker"), false)
|
||||
|
||||
if viper.GetBool("kg.processKnownGaps") {
|
||||
go func() {
|
||||
errG := new(errgroup.Group)
|
||||
errG.Go(func() error {
|
||||
errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker"))
|
||||
errs := Bc.ProcessKnownGaps(ctx, viper.GetInt("kg.maxKnownGapsWorker"))
|
||||
if len(errs) != 0 {
|
||||
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps")
|
||||
return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps")
|
||||
@ -81,14 +81,19 @@ func startHeadTracking() {
|
||||
}()
|
||||
}
|
||||
|
||||
if viper.GetBool("t.pprof") {
|
||||
go func() {
|
||||
log.Println(http.ListenAndServe(fmt.Sprint("localhost:"+strconv.Itoa(viper.GetInt("t.pprofPort"))), nil))
|
||||
}()
|
||||
}
|
||||
|
||||
// Shutdown when the time is right.
|
||||
err = shutdown.ShutdownHeadTracking(ctx, KgCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc)
|
||||
err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, Db, Bc)
|
||||
if err != nil {
|
||||
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!")
|
||||
} else {
|
||||
log.Info("Gracefully shutdown ipld-eth-beacon-indexer")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
@ -22,6 +22,9 @@ import (
|
||||
"os"
|
||||
"strconv"
|
||||
|
||||
"net/http"
|
||||
_ "net/http/pprof"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
@ -46,7 +49,7 @@ var historicCmd = &cobra.Command{
|
||||
func startHistoricProcessing() {
|
||||
// Boot the application
|
||||
log.Info("Starting the application in head tracking mode.")
|
||||
ctx := context.Background()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
|
||||
viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
|
||||
@ -60,11 +63,9 @@ func startHistoricProcessing() {
|
||||
serveProm(addr)
|
||||
}
|
||||
|
||||
hpContext, hpCancel := context.WithCancel(context.Background())
|
||||
|
||||
errG, _ := errgroup.WithContext(context.Background())
|
||||
errG.Go(func() error {
|
||||
errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker"))
|
||||
errs := Bc.CaptureHistoric(ctx, viper.GetInt("bc.maxHistoricProcessWorker"))
|
||||
if len(errs) != 0 {
|
||||
if len(errs) != 0 {
|
||||
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events")
|
||||
@ -74,12 +75,11 @@ func startHistoricProcessing() {
|
||||
return nil
|
||||
})
|
||||
|
||||
kgContext, kgCancel := context.WithCancel(context.Background())
|
||||
if viper.GetBool("kg.processKnownGaps") {
|
||||
go func() {
|
||||
errG := new(errgroup.Group)
|
||||
errG.Go(func() error {
|
||||
errs := Bc.ProcessKnownGaps(kgContext, viper.GetInt("kg.maxKnownGapsWorker"))
|
||||
errs := Bc.ProcessKnownGaps(ctx, viper.GetInt("kg.maxKnownGapsWorker"))
|
||||
if len(errs) != 0 {
|
||||
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps")
|
||||
return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps")
|
||||
@ -92,8 +92,14 @@ func startHistoricProcessing() {
|
||||
}()
|
||||
}
|
||||
|
||||
if viper.GetBool("t.pprof") {
|
||||
go func() {
|
||||
log.Println(http.ListenAndServe(fmt.Sprint("localhost:"+strconv.Itoa(viper.GetInt("t.pprofPort"))), nil))
|
||||
}()
|
||||
}
|
||||
|
||||
// Shutdown when the time is right.
|
||||
err = shutdown.ShutdownHistoricProcessing(ctx, kgCancel, hpCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc)
|
||||
err = shutdown.ShutdownHistoricProcessing(ctx, cancel, notifierCh, maxWaitSecondsShutdown, Db, Bc)
|
||||
if err != nil {
|
||||
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!")
|
||||
} else {
|
||||
|
@ -19,7 +19,9 @@
|
||||
"checkDb": true
|
||||
},
|
||||
"t": {
|
||||
"skipSync": true
|
||||
"skipSync": true,
|
||||
"pprof": true,
|
||||
"pprofPort": 6060
|
||||
},
|
||||
"log": {
|
||||
"level": "debug",
|
||||
|
@ -19,7 +19,9 @@
|
||||
"checkDb": true
|
||||
},
|
||||
"t": {
|
||||
"skipSync": true
|
||||
"skipSync": true,
|
||||
"pprof": true,
|
||||
"pprofPort": 6060
|
||||
},
|
||||
"log": {
|
||||
"level": "debug",
|
||||
|
1
go.mod
1
go.mod
@ -70,6 +70,7 @@ require (
|
||||
github.com/urfave/cli/v2 v2.3.0 // indirect
|
||||
go.opencensus.io v0.23.0 // indirect
|
||||
go.uber.org/atomic v1.9.0 // indirect
|
||||
go.uber.org/goleak v1.1.12 // indirect
|
||||
go.uber.org/multierr v1.8.0 // indirect
|
||||
go.uber.org/zap v1.21.0 // indirect
|
||||
golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f // indirect
|
||||
|
3
go.sum
3
go.sum
@ -833,8 +833,9 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
|
||||
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
|
||||
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
|
||||
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
|
||||
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
|
||||
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
|
||||
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
|
||||
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
|
||||
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
|
||||
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
|
||||
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
|
||||
|
@ -40,68 +40,66 @@ func ShutdownServices(ctx context.Context, notifierCh chan os.Signal, waitTime t
|
||||
}
|
||||
|
||||
// Wrapper function for shutting down the head tracking process.
|
||||
func ShutdownHeadTracking(ctx context.Context, kgCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
|
||||
func ShutdownHeadTracking(ctx context.Context, cancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
|
||||
return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{
|
||||
// Combining DB shutdown with BC because BC needs DB open to cleanly shutdown.
|
||||
"beaconClient": func(ctx context.Context) error {
|
||||
defer DB.Close()
|
||||
err := BC.StopHeadTracking()
|
||||
if err != nil {
|
||||
loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking")
|
||||
}
|
||||
cancel()
|
||||
BC.StopHeadTracking(ctx, false)
|
||||
if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) {
|
||||
err = BC.StopKnownGapsProcessing(kgCancel)
|
||||
err := BC.StopKnownGapsProcessing(ctx)
|
||||
if err != nil {
|
||||
loghelper.LogError(err).Error("Unable to stop processing known gaps")
|
||||
return err
|
||||
}
|
||||
}
|
||||
return err
|
||||
return nil
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// Wrapper function for shutting down the head tracking process.
|
||||
func ShutdownHistoricProcessing(ctx context.Context, kgCancel, hpCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
|
||||
func ShutdownHistoricProcessing(ctx context.Context, cancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
|
||||
return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{
|
||||
// Combining DB shutdown with BC because BC needs DB open to cleanly shutdown.
|
||||
"beaconClient": func(ctx context.Context) error {
|
||||
defer DB.Close()
|
||||
err := BC.StopHistoric(hpCancel)
|
||||
cancel()
|
||||
err := BC.StopHistoricProcess(ctx)
|
||||
if err != nil {
|
||||
loghelper.LogError(err).Error("Unable to stop processing historic")
|
||||
}
|
||||
if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) {
|
||||
err = BC.StopKnownGapsProcessing(kgCancel)
|
||||
err = BC.StopKnownGapsProcessing(ctx)
|
||||
if err != nil {
|
||||
loghelper.LogError(err).Error("Unable to stop processing known gaps")
|
||||
return err
|
||||
}
|
||||
}
|
||||
return err
|
||||
return nil
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// Shutdown the head and historical processing
|
||||
func ShutdownFull(ctx context.Context, kgCancel, hpCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
|
||||
func ShutdownFull(ctx context.Context, cancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
|
||||
return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{
|
||||
// Combining DB shutdown with BC because BC needs DB open to cleanly shutdown.
|
||||
"beaconClient": func(ctx context.Context) error {
|
||||
defer DB.Close()
|
||||
err := BC.StopHistoric(hpCancel)
|
||||
cancel()
|
||||
err := BC.StopHistoricProcess(ctx)
|
||||
if err != nil {
|
||||
loghelper.LogError(err).Error("Unable to stop processing historic")
|
||||
}
|
||||
if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) {
|
||||
err = BC.StopKnownGapsProcessing(kgCancel)
|
||||
err = BC.StopKnownGapsProcessing(ctx)
|
||||
if err != nil {
|
||||
loghelper.LogError(err).Error("Unable to stop processing known gaps")
|
||||
}
|
||||
}
|
||||
err = BC.StopHeadTracking()
|
||||
if err != nil {
|
||||
loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking")
|
||||
}
|
||||
|
||||
BC.StopHeadTracking(ctx, false)
|
||||
return err
|
||||
},
|
||||
})
|
||||
|
@ -56,23 +56,23 @@ var (
|
||||
BC *beaconclient.BeaconClient
|
||||
err error
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
notifierCh chan os.Signal
|
||||
)
|
||||
|
||||
var _ = Describe("Shutdown", func() {
|
||||
BeforeEach(func() {
|
||||
ctx = context.Background()
|
||||
ctx, cancel = context.WithCancel(context.Background())
|
||||
BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress,
|
||||
bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier, bcCheckDb)
|
||||
notifierCh = make(chan os.Signal, 1)
|
||||
Expect(err).To(BeNil())
|
||||
})
|
||||
|
||||
Describe("Run Shutdown Function for head tracking,", Label("integration"), func() {
|
||||
Describe("Run Shutdown Function for head tracking,", Label("integration", "shutdown"), func() {
|
||||
Context("When Channels are empty,", func() {
|
||||
It("Should Shutdown Successfully.", func() {
|
||||
go func() {
|
||||
_, cancel := context.WithCancel(context.Background())
|
||||
log.Debug("Starting shutdown chan")
|
||||
err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, DB, BC)
|
||||
log.Debug("We have completed the shutdown...")
|
||||
@ -85,7 +85,6 @@ var _ = Describe("Shutdown", func() {
|
||||
shutdownCh := make(chan bool)
|
||||
//log.SetLevel(log.DebugLevel)
|
||||
go func() {
|
||||
_, cancel := context.WithCancel(context.Background())
|
||||
log.Debug("Starting shutdown chan")
|
||||
err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, DB, BC)
|
||||
log.Debug("We have completed the shutdown...")
|
||||
@ -120,7 +119,6 @@ var _ = Describe("Shutdown", func() {
|
||||
//log.SetLevel(log.DebugLevel)
|
||||
go func() {
|
||||
log.Debug("Starting shutdown chan")
|
||||
_, cancel := context.WithCancel(context.Background())
|
||||
err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, DB, BC)
|
||||
log.Debug("We have completed the shutdown...")
|
||||
Expect(err).To(MatchError(gracefulshutdown.TimeoutErr(maxWaitSecondsShutdown.String())))
|
||||
|
@ -76,8 +76,8 @@ type BeaconClient struct {
|
||||
type SseEvents[P ProcessedEvents] struct {
|
||||
Endpoint string // The endpoint for the subscription. Primarily used for logging
|
||||
MessagesCh chan *sse.Event // Contains all the messages from the SSE Channel
|
||||
ErrorCh chan *SseError // Contains any errors while SSE streaming occurred
|
||||
ProcessCh chan *P // Used to capture processed data in its proper struct.
|
||||
ErrorCh chan SseError // Contains any errors while SSE streaming occurred
|
||||
ProcessCh chan P // Used to capture processed data in its proper struct.
|
||||
SseClient *sse.Client // sse.Client object that is used to interact with the SSE stream
|
||||
}
|
||||
|
||||
@ -119,9 +119,9 @@ func createSseEvent[P ProcessedEvents](baseEndpoint string, path string) *SseEve
|
||||
endpoint := baseEndpoint + path
|
||||
sseEvents := &SseEvents[P]{
|
||||
Endpoint: endpoint,
|
||||
MessagesCh: make(chan *sse.Event, 1),
|
||||
ErrorCh: make(chan *SseError),
|
||||
ProcessCh: make(chan *P),
|
||||
MessagesCh: make(chan *sse.Event),
|
||||
ErrorCh: make(chan SseError),
|
||||
ProcessCh: make(chan P, 10),
|
||||
SseClient: func(endpoint string) *sse.Client {
|
||||
log.WithFields(log.Fields{"endpoint": endpoint}).Info("Creating SSE client")
|
||||
return sse.NewClient(endpoint)
|
||||
|
@ -18,42 +18,32 @@
|
||||
package beaconclient
|
||||
|
||||
import (
|
||||
"time"
|
||||
"context"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/loghelper"
|
||||
)
|
||||
|
||||
// This function will perform all the heavy lifting for tracking the head of the chain.
|
||||
func (bc *BeaconClient) CaptureHead() {
|
||||
func (bc *BeaconClient) CaptureHead(ctx context.Context, maxHeadWorkers int, skipSee bool) {
|
||||
log.Info("We are tracking the head of the chain.")
|
||||
go bc.handleHead()
|
||||
go bc.handleReorg()
|
||||
bc.captureEventTopic()
|
||||
go bc.handleHead(ctx, maxHeadWorkers)
|
||||
go bc.handleReorg(ctx)
|
||||
bc.captureEventTopic(ctx, skipSee)
|
||||
}
|
||||
|
||||
// Stop the head tracking service.
|
||||
func (bc *BeaconClient) StopHeadTracking() error {
|
||||
log.Info("We are going to stop tracking the head of chain because of the shutdown signal.")
|
||||
chHead := make(chan bool)
|
||||
chReorg := make(chan bool)
|
||||
|
||||
go bc.HeadTracking.finishProcessingChannel(chHead)
|
||||
go bc.ReOrgTracking.finishProcessingChannel(chReorg)
|
||||
|
||||
<-chHead
|
||||
<-chReorg
|
||||
log.Info("Successfully stopped the head tracking service.")
|
||||
return nil
|
||||
}
|
||||
|
||||
// This function closes the SSE subscription, but waits until the MessagesCh is empty
|
||||
func (se *SseEvents[ProcessedEvents]) finishProcessingChannel(finish chan<- bool) {
|
||||
loghelper.LogEndpoint(se.Endpoint).Info("Received a close event.")
|
||||
se.SseClient.Unsubscribe(se.MessagesCh)
|
||||
for len(se.MessagesCh) != 0 || len(se.ProcessCh) != 0 {
|
||||
time.Sleep(time.Duration(shutdownWaitInterval) * time.Millisecond)
|
||||
func (bc *BeaconClient) StopHeadTracking(ctx context.Context, skipSee bool) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if !skipSee {
|
||||
bc.HeadTracking.SseClient.Unsubscribe(bc.HeadTracking.MessagesCh)
|
||||
bc.ReOrgTracking.SseClient.Unsubscribe(bc.ReOrgTracking.MessagesCh)
|
||||
log.Info("Successfully unsubscribed to SSE client")
|
||||
close(bc.ReOrgTracking.MessagesCh)
|
||||
close(bc.HeadTracking.MessagesCh)
|
||||
}
|
||||
log.Info("Successfully stopped the head tracking service.")
|
||||
default:
|
||||
log.Error("The context has not completed....")
|
||||
}
|
||||
loghelper.LogEndpoint(se.Endpoint).Info("Done processing all messages, ready for shutdown")
|
||||
finish <- true
|
||||
}
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@ -265,70 +266,121 @@ var _ = Describe("Capturehead", Label("head"), func() {
|
||||
|
||||
Describe("Receiving New Head SSE messages", Label("unit", "behavioral"), func() {
|
||||
Context("Correctly formatted Phase0 Block", func() {
|
||||
It("Should turn it into a struct successfully.", func() {
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
|
||||
It("Should process it successfully.", func() {
|
||||
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
||||
defer httpmock.DeactivateAndReset()
|
||||
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0)
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
|
||||
|
||||
BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{
|
||||
head: BeaconNodeTester.TestEvents["100"].HeadMessage,
|
||||
expectedEpoch: 3,
|
||||
expectStatus: "proposed",
|
||||
})
|
||||
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectParentRoot, BeaconNodeTester.TestEvents["100"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["100"].CorrectSignedBeaconBlockMhKey)
|
||||
validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectBeaconStateMhKey)
|
||||
|
||||
})
|
||||
})
|
||||
Context("Correctly formatted Altair Block", func() {
|
||||
It("Should turn it into a struct successfully.", func() {
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
|
||||
Context("Correctly formatted Altair Block", Label("leak-head"), func() {
|
||||
It("Should process it successfully.", func() {
|
||||
log.SetLevel(log.DebugLevel)
|
||||
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
||||
defer httpmock.DeactivateAndReset()
|
||||
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0)
|
||||
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
|
||||
BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{
|
||||
head: BeaconNodeTester.TestEvents["2375703"].HeadMessage,
|
||||
expectedEpoch: 74240,
|
||||
expectStatus: "proposed",
|
||||
})
|
||||
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectParentRoot, BeaconNodeTester.TestEvents["2375703"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["2375703"].CorrectSignedBeaconBlockMhKey)
|
||||
validateBeaconState(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectBeaconStateMhKey)
|
||||
})
|
||||
})
|
||||
Context("Correctly formatted Altair Test Blocks", func() {
|
||||
Context("Correctly formatted Altair Test Blocks", Label("correct-test-altairs"), func() {
|
||||
It("Should turn it into a struct successfully.", func() {
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
|
||||
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
||||
defer httpmock.DeactivateAndReset()
|
||||
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry, 1, 0, 0)
|
||||
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
|
||||
BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{
|
||||
head: BeaconNodeTester.TestEvents["2375703-dummy"].HeadMessage,
|
||||
expectedEpoch: 74240,
|
||||
expectStatus: "proposed",
|
||||
})
|
||||
|
||||
bc = setUpTest(BeaconNodeTester.TestConfig, "2375702")
|
||||
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
||||
defer httpmock.DeactivateAndReset()
|
||||
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703-dummy-2"].HeadMessage, 74240, maxRetry, 1, 0, 0)
|
||||
BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{
|
||||
head: BeaconNodeTester.TestEvents["2375703-dummy-2"].HeadMessage,
|
||||
expectedEpoch: 74240,
|
||||
expectStatus: "proposed",
|
||||
})
|
||||
|
||||
})
|
||||
})
|
||||
Context("Correctly formatted Phase0 Test Blocks", func() {
|
||||
Context("Correctly formatted Phase0 Test Blocks", Label("correct-test-phase0"), func() {
|
||||
It("Should turn it into a struct successfully.", func() {
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
|
||||
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
||||
defer httpmock.DeactivateAndReset()
|
||||
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 1, 0, 0)
|
||||
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
|
||||
BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{
|
||||
head: BeaconNodeTester.TestEvents["100-dummy"].HeadMessage,
|
||||
expectedEpoch: 3,
|
||||
expectStatus: "proposed",
|
||||
})
|
||||
|
||||
bc = setUpTest(BeaconNodeTester.TestConfig, "99")
|
||||
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
||||
defer httpmock.DeactivateAndReset()
|
||||
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy-2"].HeadMessage, 3, maxRetry, 1, 0, 0)
|
||||
BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{
|
||||
head: BeaconNodeTester.TestEvents["100-dummy-2"].HeadMessage,
|
||||
expectedEpoch: 3,
|
||||
expectStatus: "proposed",
|
||||
})
|
||||
|
||||
//bc = setUpTest(BeaconNodeTester.TestConfig, "99")
|
||||
//BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
||||
//defer httpmock.DeactivateAndReset()
|
||||
//BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy-2"].HeadMessage, 3, maxRetry, 1, 0, 0)
|
||||
|
||||
})
|
||||
|
||||
})
|
||||
Context("Two consecutive correct blocks", func() {
|
||||
Context("Two consecutive correct blocks", Label("bug"), func() {
|
||||
It("Should handle both blocks correctly, without any reorgs or known_gaps", func() {
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
|
||||
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
||||
defer httpmock.DeactivateAndReset()
|
||||
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0)
|
||||
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 0, 0)
|
||||
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
|
||||
BeaconNodeTester.testProcessBlock(bc, maxRetry, 2, 0, 0, headBlocksSent{
|
||||
head: BeaconNodeTester.TestEvents["100"].HeadMessage,
|
||||
expectedEpoch: 3,
|
||||
expectStatus: "proposed",
|
||||
}, headBlocksSent{
|
||||
head: BeaconNodeTester.TestEvents["101"].HeadMessage,
|
||||
expectedEpoch: 3,
|
||||
expectStatus: "proposed",
|
||||
})
|
||||
|
||||
})
|
||||
})
|
||||
Context("Two consecutive blocks with a bad parent", func() {
|
||||
Context("Two consecutive blocks with a bad parent", Label("bad-parent"), func() {
|
||||
It("Should add the previous block to the knownGaps table.", func() {
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
|
||||
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
||||
defer httpmock.DeactivateAndReset()
|
||||
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 1, 0, 0)
|
||||
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 1, 1)
|
||||
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
|
||||
BeaconNodeTester.testProcessBlock(bc, maxRetry, 2, 1, 1,
|
||||
headBlocksSent{
|
||||
head: BeaconNodeTester.TestEvents["100-dummy"].HeadMessage,
|
||||
expectedEpoch: 3,
|
||||
expectStatus: "forked",
|
||||
},
|
||||
headBlocksSent{
|
||||
head: BeaconNodeTester.TestEvents["101"].HeadMessage,
|
||||
expectedEpoch: 3,
|
||||
expectStatus: "proposed",
|
||||
})
|
||||
|
||||
})
|
||||
})
|
||||
Context("Phase 0: We have a correctly formated SSZ SignedBeaconBlock and BeaconState", func() {
|
||||
@ -348,10 +400,15 @@ var _ = Describe("Capturehead", Label("head"), func() {
|
||||
//})
|
||||
Context("When the proper SSZ objects are not served", func() {
|
||||
It("Should return an error, and add the slot to the knownGaps table.", func() {
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "101")
|
||||
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
||||
defer httpmock.DeactivateAndReset()
|
||||
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["102-wrong-ssz-1"].HeadMessage, 3, maxRetry, 0, 1, 0)
|
||||
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "101")
|
||||
BeaconNodeTester.testProcessBlock(bc, maxRetry, 0, 1, 0, headBlocksSent{
|
||||
head: BeaconNodeTester.TestEvents["102-wrong-ssz-1"].HeadMessage,
|
||||
expectedEpoch: 3,
|
||||
expectStatus: "proposed",
|
||||
})
|
||||
|
||||
knownGapCount := countKnownGapsTable(bc.Db)
|
||||
Expect(knownGapCount).To(Equal(1))
|
||||
@ -359,6 +416,7 @@ var _ = Describe("Capturehead", Label("head"), func() {
|
||||
start, end := queryKnownGaps(bc.Db, "102", "102")
|
||||
Expect(start).To(Equal(102))
|
||||
Expect(end).To(Equal(102))
|
||||
|
||||
})
|
||||
})
|
||||
})
|
||||
@ -366,9 +424,10 @@ var _ = Describe("Capturehead", Label("head"), func() {
|
||||
Describe("Known Gaps Scenario", Label("unit", "behavioral"), func() {
|
||||
Context("There is a gap at start up within one incrementing range.", func() {
|
||||
It("Should add only a single entry to the knownGaps table.", func() {
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "10")
|
||||
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
||||
defer httpmock.DeactivateAndReset()
|
||||
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "10")
|
||||
BeaconNodeTester.testKnownGapsMessages(bc, 100, 1, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage)
|
||||
start, end := queryKnownGaps(bc.Db, "11", "99")
|
||||
Expect(start).To(Equal(11))
|
||||
@ -377,9 +436,10 @@ var _ = Describe("Capturehead", Label("head"), func() {
|
||||
})
|
||||
Context("There is a gap at start up spanning multiple incrementing range.", func() {
|
||||
It("Should add multiple entries to the knownGaps table.", func() {
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "5")
|
||||
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
||||
defer httpmock.DeactivateAndReset()
|
||||
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "5")
|
||||
BeaconNodeTester.testKnownGapsMessages(bc, 10, 10, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage)
|
||||
|
||||
start, end := queryKnownGaps(bc.Db, "6", "16")
|
||||
@ -391,11 +451,12 @@ var _ = Describe("Capturehead", Label("head"), func() {
|
||||
Expect(end).To(Equal(99))
|
||||
})
|
||||
})
|
||||
Context("Gaps between two head messages", func() {
|
||||
Context("Gaps between two head messages", Label("gap-head"), func() {
|
||||
It("Should add the slots in-between", func() {
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
|
||||
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
||||
defer httpmock.DeactivateAndReset()
|
||||
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
|
||||
BeaconNodeTester.testKnownGapsMessages(bc, 1000000, 3, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].HeadMessage)
|
||||
|
||||
start, end := queryKnownGaps(bc.Db, "101", "1000101")
|
||||
@ -412,33 +473,37 @@ var _ = Describe("Capturehead", Label("head"), func() {
|
||||
Describe("ReOrg Scenario", Label("unit", "behavioral"), func() {
|
||||
Context("Altair: Multiple head messages for the same slot.", func() {
|
||||
It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
|
||||
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
||||
defer httpmock.DeactivateAndReset()
|
||||
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
|
||||
BeaconNodeTester.testMultipleHead(bc, TestEvents["2375703"].HeadMessage, TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry)
|
||||
})
|
||||
})
|
||||
Context("Phase0: Multiple head messages for the same slot.", func() {
|
||||
It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
|
||||
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
||||
defer httpmock.DeactivateAndReset()
|
||||
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
|
||||
BeaconNodeTester.testMultipleHead(bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry)
|
||||
})
|
||||
})
|
||||
Context("Phase 0: Multiple reorgs have occurred on this slot", func() {
|
||||
It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
|
||||
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
||||
defer httpmock.DeactivateAndReset()
|
||||
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
|
||||
BeaconNodeTester.testMultipleReorgs(bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100-dummy-2"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry)
|
||||
})
|
||||
})
|
||||
Context("Altair: Multiple reorgs have occurred on this slot", func() {
|
||||
It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
|
||||
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
||||
defer httpmock.DeactivateAndReset()
|
||||
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
|
||||
BeaconNodeTester.testMultipleReorgs(bc, TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703-dummy-2"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240, maxRetry)
|
||||
})
|
||||
})
|
||||
@ -523,18 +588,25 @@ func validateBeaconState(bc *beaconclient.BeaconClient, headMessage beaconclient
|
||||
}
|
||||
|
||||
// Wrapper function to send a head message to the beaconclient
|
||||
func sendHeadMessage(bc *beaconclient.BeaconClient, head beaconclient.Head, maxRetry int, expectedSuccessfulInserts uint64) {
|
||||
|
||||
data, err := json.Marshal(head)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
func sendHeadMessage(bc *beaconclient.BeaconClient, maxRetry int, expectedSuccessfulInserts uint64, head ...beaconclient.Head) {
|
||||
|
||||
var (
|
||||
data []byte
|
||||
err error
|
||||
)
|
||||
startInserts := atomic.LoadUint64(&bc.Metrics.SlotInserts)
|
||||
bc.HeadTracking.MessagesCh <- &sse.Event{
|
||||
ID: []byte{},
|
||||
Data: data,
|
||||
Event: []byte{},
|
||||
Retry: []byte{},
|
||||
for _, ms := range head {
|
||||
data, err = json.Marshal(ms)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
time.Sleep(1 * time.Second)
|
||||
bc.HeadTracking.MessagesCh <- &sse.Event{
|
||||
ID: []byte{},
|
||||
Data: data,
|
||||
Event: []byte{},
|
||||
Retry: []byte{},
|
||||
}
|
||||
}
|
||||
|
||||
curRetry := 0
|
||||
for atomic.LoadUint64(&bc.Metrics.SlotInserts) != startInserts+expectedSuccessfulInserts {
|
||||
time.Sleep(1 * time.Second)
|
||||
@ -832,13 +904,13 @@ func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string
|
||||
// Helper function to test three reorg messages. There are going to be many functions like this,
|
||||
// Because we need to test the same logic for multiple phases.
|
||||
func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch int, maxRetry int) {
|
||||
go bc.CaptureHead()
|
||||
startGoRoutines := runtime.NumGoroutine()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go bc.CaptureHead(ctx, 2, true)
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
log.Info("Sending Messages to BeaconClient")
|
||||
sendHeadMessage(bc, firstHead, maxRetry, 1)
|
||||
sendHeadMessage(bc, secondHead, maxRetry, 1)
|
||||
sendHeadMessage(bc, thirdHead, maxRetry, 1)
|
||||
sendHeadMessage(bc, maxRetry, 3, firstHead, secondHead, thirdHead)
|
||||
|
||||
curRetry := 0
|
||||
for atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 2 {
|
||||
@ -890,13 +962,22 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs
|
||||
validateSlot(bc, secondHead, epoch, "proposed")
|
||||
validateSlot(bc, thirdHead, epoch, "forked")
|
||||
|
||||
cancel()
|
||||
testStopHeadTracking(ctx, bc, startGoRoutines, true)
|
||||
}
|
||||
|
||||
// A test to validate a single block was processed correctly
|
||||
func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head beaconclient.Head, epoch int, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) {
|
||||
go bc.CaptureHead()
|
||||
func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64, head ...headBlocksSent) {
|
||||
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
||||
startGoRoutines := runtime.NumGoroutine()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go bc.CaptureHead(ctx, 2, true)
|
||||
time.Sleep(1 * time.Second)
|
||||
sendHeadMessage(bc, head, maxRetry, expectedSuccessInsert)
|
||||
heads := make([]beaconclient.Head, 0)
|
||||
for _, msgs := range head {
|
||||
heads = append(heads, msgs.head)
|
||||
}
|
||||
sendHeadMessage(bc, maxRetry, expectedSuccessInsert, heads...)
|
||||
|
||||
curRetry := 0
|
||||
for atomic.LoadUint64(&bc.Metrics.KnownGapsInserts) != expectedKnownGaps {
|
||||
@ -912,23 +993,28 @@ func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head b
|
||||
time.Sleep(1 * time.Second)
|
||||
curRetry = curRetry + 1
|
||||
if curRetry == maxRetry {
|
||||
Fail(fmt.Sprintf("Wrong reorg metrics, got: %d, wanted %d", bc.Metrics.KnownGapsInserts, expectedKnownGaps))
|
||||
Fail(fmt.Sprintf("Wrong reorg metrics, got: %d, wanted %d", bc.Metrics.ReorgInserts, expectedReorgs))
|
||||
}
|
||||
}
|
||||
|
||||
if expectedSuccessInsert > 0 {
|
||||
validateSlot(bc, head, epoch, "proposed")
|
||||
for _, msg := range head {
|
||||
validateSlot(bc, msg.head, msg.expectedEpoch, msg.expectStatus)
|
||||
}
|
||||
}
|
||||
cancel()
|
||||
testStopHeadTracking(ctx, bc, startGoRoutines, true)
|
||||
}
|
||||
|
||||
// A test that ensures that if two HeadMessages occur for a single slot they are marked
|
||||
// as proposed and forked correctly.
|
||||
func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, epoch int, maxRetry int) {
|
||||
go bc.CaptureHead()
|
||||
startGoRoutines := runtime.NumGoroutine()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go bc.CaptureHead(ctx, 2, true)
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
sendHeadMessage(bc, firstHead, maxRetry, 1)
|
||||
sendHeadMessage(bc, secondHead, maxRetry, 1)
|
||||
sendHeadMessage(bc, maxRetry, 2, firstHead, secondHead)
|
||||
|
||||
curRetry := 0
|
||||
for atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 1 {
|
||||
@ -946,18 +1032,22 @@ func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstH
|
||||
log.Info("Checking Altair to make sure the fork was marked properly.")
|
||||
validateSlot(bc, firstHead, epoch, "forked")
|
||||
validateSlot(bc, secondHead, epoch, "proposed")
|
||||
cancel()
|
||||
testStopHeadTracking(ctx, bc, startGoRoutines, true)
|
||||
}
|
||||
|
||||
// A test that ensures that if two HeadMessages occur for a single slot they are marked
|
||||
// as proposed and forked correctly.
|
||||
func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, tableIncrement int, expectedEntries uint64, maxRetry int, msg ...beaconclient.Head) {
|
||||
bc.KnownGapTableIncrement = tableIncrement
|
||||
go bc.CaptureHead()
|
||||
|
||||
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
||||
startGoRoutines := runtime.NumGoroutine()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go bc.CaptureHead(ctx, 2, true)
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
for _, headMsg := range msg {
|
||||
sendHeadMessage(bc, headMsg, maxRetry, 1)
|
||||
}
|
||||
sendHeadMessage(bc, maxRetry, 1, msg...)
|
||||
|
||||
curRetry := 0
|
||||
for atomic.LoadUint64(&bc.Metrics.KnownGapsInserts) != expectedEntries {
|
||||
@ -975,6 +1065,8 @@ func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, t
|
||||
if atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 0 {
|
||||
Fail("We found reorgs when we didn't expect it")
|
||||
}
|
||||
cancel()
|
||||
testStopHeadTracking(ctx, bc, startGoRoutines, true)
|
||||
}
|
||||
|
||||
// This function will make sure we are properly able to get the SszRoot of the SignedBeaconBlock and the BeaconState.
|
||||
@ -991,3 +1083,22 @@ func testSszRoot(msg Message) {
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(msg.HeadMessage.Block).To(Equal("0x" + hex.EncodeToString(blockRoot[:])))
|
||||
}
|
||||
|
||||
// A make shift function to stop head tracking and insure we dont have any goroutine leaks
|
||||
func testStopHeadTracking(ctx context.Context, bc *beaconclient.BeaconClient, startGoRoutines int, skipSse bool) {
|
||||
bc.StopHeadTracking(ctx, skipSse)
|
||||
|
||||
time.Sleep(3 * time.Second)
|
||||
endNum := runtime.NumGoroutine()
|
||||
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
||||
log.WithField("startNum", startGoRoutines).Info("Start Go routine number")
|
||||
log.WithField("endNum", endNum).Info("End Go routine number")
|
||||
//Expect(endNum <= startGoRoutines).To(BeTrue())
|
||||
Expect(endNum).To(Equal(startGoRoutines))
|
||||
}
|
||||
|
||||
type headBlocksSent struct {
|
||||
head beaconclient.Head
|
||||
expectedEpoch int
|
||||
expectStatus string
|
||||
}
|
||||
|
@ -37,14 +37,18 @@ func (bc *BeaconClient) CaptureHistoric(ctx context.Context, maxWorkers int) []e
|
||||
}
|
||||
|
||||
// This function will perform all the necessary clean up tasks for stopping historical processing.
|
||||
func (bc *BeaconClient) StopHistoric(cancel context.CancelFunc) error {
|
||||
log.Info("We are stopping the historical processing service.")
|
||||
cancel()
|
||||
err := bc.HistoricalProcess.releaseDbLocks()
|
||||
if err != nil {
|
||||
loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.historic_processing table. Manual Intervention is needed!")
|
||||
func (bc *BeaconClient) StopHistoricProcess(ctx context.Context) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Info("We are stopping the historical processing service.")
|
||||
err := bc.HistoricalProcess.releaseDbLocks()
|
||||
if err != nil {
|
||||
loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.historic_processing table. Manual Intervention is needed!")
|
||||
}
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("Tried to stop historic before the context ended...")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// An interface to enforce any batch processing. Currently there are two use cases for this.
|
||||
@ -93,9 +97,9 @@ type batchHistoricError struct {
|
||||
// 5. Handle any errors.
|
||||
func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics, checkDb bool, incrementTracker func(uint64)) []error {
|
||||
slotsCh := make(chan slotsToProcess)
|
||||
workCh := make(chan int)
|
||||
processedCh := make(chan slotsToProcess)
|
||||
errCh := make(chan batchHistoricError)
|
||||
workCh := make(chan int, 5)
|
||||
processedCh := make(chan slotsToProcess, 5)
|
||||
errCh := make(chan batchHistoricError, 5)
|
||||
finalErrCh := make(chan []error, 1)
|
||||
|
||||
// Checkout Rows with same node Identifier.
|
||||
@ -116,6 +120,8 @@ func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing,
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
close(workCh)
|
||||
close(processedCh)
|
||||
return
|
||||
case slots := <-slotsCh:
|
||||
if slots.startSlot > slots.endSlot {
|
||||
@ -164,8 +170,9 @@ func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing,
|
||||
errs := bp.getSlotRange(ctx, slotsCh) // Periodically adds new entries....
|
||||
if errs != nil {
|
||||
finalErrCh <- errs
|
||||
} else {
|
||||
finalErrCh <- nil
|
||||
}
|
||||
finalErrCh <- nil
|
||||
log.Debug("We are stopping the processing of adding new entries")
|
||||
}()
|
||||
log.Debug("Waiting for shutdown signal from channel")
|
||||
|
@ -3,6 +3,7 @@ package beaconclient_test
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@ -24,17 +25,18 @@ var _ = Describe("Capturehistoric", func() {
|
||||
Describe("Run the application in historic mode", Label("unit", "behavioral", "historical"), func() {
|
||||
Context("Phase0 + Altairs: When we need to process a multiple blocks in a multiple entries in the eth_beacon.historic_process table.", Label("deb"), func() {
|
||||
It("Successfully Process the Blocks", func() {
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
|
||||
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
||||
defer httpmock.DeactivateAndReset()
|
||||
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
|
||||
BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 101, 10)
|
||||
BeaconNodeTester.runHistoricalProcess(bc, 2, 2, 0, 0, 0)
|
||||
// Run Two seperate processes
|
||||
BeaconNodeTester.writeEventToHistoricProcess(bc, 2375703, 2375703, 10)
|
||||
BeaconNodeTester.runHistoricalProcess(bc, 2, 3, 0, 0, 0)
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
validatePopularBatchBlocks(bc)
|
||||
|
||||
})
|
||||
})
|
||||
Context("When the start block is greater than the endBlock", func() {
|
||||
@ -70,11 +72,12 @@ var _ = Describe("Capturehistoric", func() {
|
||||
})
|
||||
})
|
||||
Describe("Running the Application to process Known Gaps", Label("unit", "behavioral", "knownGaps"), func() {
|
||||
Context("Phase0 + Altairs: When we need to process a multiple blocks in a multiple entries in the eth_beacon.known_gaps table.", func() {
|
||||
Context("Phase0 + Altairs: When we need to process a multiple blocks in a multiple entries in the eth_beacon.known_gaps table.", Label("leak"), func() {
|
||||
It("Successfully Process the Blocks", func() {
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
|
||||
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
||||
defer httpmock.DeactivateAndReset()
|
||||
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
|
||||
BeaconNodeTester.writeEventToKnownGaps(bc, 100, 101)
|
||||
BeaconNodeTester.runKnownGapsProcess(bc, 2, 2, 0, 0, 0)
|
||||
// Run Two seperate processes
|
||||
@ -83,6 +86,7 @@ var _ = Describe("Capturehistoric", func() {
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
validatePopularBatchBlocks(bc)
|
||||
|
||||
})
|
||||
})
|
||||
Context("When the start block is greater than the endBlock", func() {
|
||||
@ -107,13 +111,18 @@ var _ = Describe("Capturehistoric", func() {
|
||||
})
|
||||
})
|
||||
Describe("Running the application in Historic, Head, and KnownGaps mode", Label("unit", "historical", "full"), func() {
|
||||
Context("When it recieves a head, historic and known Gaps message (in order)", func() {
|
||||
Context("When it recieves a head, historic and known Gaps message (in order)", Label("deb"), func() {
|
||||
It("Should process them all successfully.", func() {
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
|
||||
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
||||
defer httpmock.DeactivateAndReset()
|
||||
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
|
||||
// Head
|
||||
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0)
|
||||
BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{
|
||||
head: BeaconNodeTester.TestEvents["2375703"].HeadMessage,
|
||||
expectedEpoch: 74240,
|
||||
expectStatus: "proposed",
|
||||
})
|
||||
|
||||
// Historical
|
||||
BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 100, 10)
|
||||
@ -125,19 +134,25 @@ var _ = Describe("Capturehistoric", func() {
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
validatePopularBatchBlocks(bc)
|
||||
|
||||
})
|
||||
})
|
||||
Context("When it recieves a historic, head and known Gaps message (in order)", func() {
|
||||
It("Should process them all successfully.", func() {
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
|
||||
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
||||
defer httpmock.DeactivateAndReset()
|
||||
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
|
||||
// Historical
|
||||
BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 100, 10)
|
||||
BeaconNodeTester.runHistoricalProcess(bc, 2, 1, 0, 0, 0)
|
||||
|
||||
// Head
|
||||
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0)
|
||||
BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{
|
||||
head: BeaconNodeTester.TestEvents["2375703"].HeadMessage,
|
||||
expectedEpoch: 74240,
|
||||
expectStatus: "proposed",
|
||||
})
|
||||
|
||||
// Known Gaps
|
||||
BeaconNodeTester.writeEventToKnownGaps(bc, 101, 101)
|
||||
@ -149,9 +164,10 @@ var _ = Describe("Capturehistoric", func() {
|
||||
})
|
||||
Context("When it recieves a known Gaps, historic and head message (in order)", func() {
|
||||
It("Should process them all successfully.", func() {
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
|
||||
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
||||
defer httpmock.DeactivateAndReset()
|
||||
|
||||
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
|
||||
// Known Gaps
|
||||
BeaconNodeTester.writeEventToKnownGaps(bc, 101, 101)
|
||||
BeaconNodeTester.runKnownGapsProcess(bc, 2, 1, 0, 0, 0)
|
||||
@ -161,7 +177,11 @@ var _ = Describe("Capturehistoric", func() {
|
||||
BeaconNodeTester.runHistoricalProcess(bc, 2, 2, 0, 0, 0)
|
||||
|
||||
// Head
|
||||
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0)
|
||||
BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{
|
||||
head: BeaconNodeTester.TestEvents["2375703"].HeadMessage,
|
||||
expectedEpoch: 74240,
|
||||
expectStatus: "proposed",
|
||||
})
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
validatePopularBatchBlocks(bc)
|
||||
@ -200,25 +220,23 @@ func (tbc TestBeaconNode) writeEventToHistoricProcess(bc *beaconclient.BeaconCli
|
||||
|
||||
// Start the CaptureHistoric function, and check for the correct inserted slots.
|
||||
func (tbc TestBeaconNode) runHistoricalProcess(bc *beaconclient.BeaconClient, maxWorkers int, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) {
|
||||
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
||||
startGoRoutines := runtime.NumGoroutine()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go bc.CaptureHistoric(ctx, maxWorkers)
|
||||
validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError)
|
||||
log.Debug("Calling the stop function for historical processing..")
|
||||
err := bc.StopHistoric(cancel)
|
||||
time.Sleep(5 * time.Second)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
validateAllRowsCheckedOut(bc.Db, hpCheckCheckedOutStmt)
|
||||
cancel()
|
||||
testStopHistoricProcessing(ctx, bc, startGoRoutines)
|
||||
}
|
||||
|
||||
// Wrapper function that processes knownGaps
|
||||
func (tbc TestBeaconNode) runKnownGapsProcess(bc *beaconclient.BeaconClient, maxWorkers int, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) {
|
||||
startGoRoutines := runtime.NumGoroutine()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go bc.ProcessKnownGaps(ctx, maxWorkers)
|
||||
validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError)
|
||||
err := bc.StopKnownGapsProcessing(cancel)
|
||||
time.Sleep(5 * time.Second)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
validateAllRowsCheckedOut(bc.Db, kgCheckCheckedOutStmt)
|
||||
cancel()
|
||||
testStopKnownGapProcessing(ctx, bc, startGoRoutines)
|
||||
}
|
||||
|
||||
func validateMetrics(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) {
|
||||
@ -288,3 +306,37 @@ func validateAllRowsCheckedOut(db sql.Database, checkStmt string) {
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(rows).To(Equal(int64(0)))
|
||||
}
|
||||
|
||||
// A make shift function to stop head tracking and insure we dont have any goroutine leaks
|
||||
func testStopHistoricProcessing(ctx context.Context, bc *beaconclient.BeaconClient, startGoRoutines int) {
|
||||
log.Debug("Calling the stop function for historical processing..")
|
||||
err := bc.StopHistoricProcess(ctx)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
time.Sleep(5 * time.Second)
|
||||
validateAllRowsCheckedOut(bc.Db, hpCheckCheckedOutStmt)
|
||||
|
||||
time.Sleep(5 * time.Second)
|
||||
endNum := runtime.NumGoroutine()
|
||||
|
||||
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
||||
//Expect(endNum <= startGoRoutines).To(BeTrue())
|
||||
log.WithField("startNum", startGoRoutines).Info("Start Go routine number")
|
||||
log.WithField("endNum", endNum).Info("End Go routine number")
|
||||
Expect(endNum).To(Equal(startGoRoutines))
|
||||
}
|
||||
|
||||
// A make shift function to stop head tracking and insure we dont have any goroutine leaks
|
||||
func testStopKnownGapProcessing(ctx context.Context, bc *beaconclient.BeaconClient, startGoRoutines int) {
|
||||
log.Debug("Calling the stop function for knownGaps processing..")
|
||||
err := bc.StopKnownGapsProcessing(ctx)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
time.Sleep(5 * time.Second)
|
||||
validateAllRowsCheckedOut(bc.Db, kgCheckCheckedOutStmt)
|
||||
|
||||
time.Sleep(3 * time.Second)
|
||||
endNum := runtime.NumGoroutine()
|
||||
|
||||
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
||||
//Expect(endNum <= startGoRoutines).To(BeTrue())
|
||||
Expect(endNum).To(Equal(startGoRoutines))
|
||||
}
|
||||
|
@ -18,44 +18,37 @@
|
||||
package beaconclient
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/loghelper"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
var (
|
||||
shutdownWaitInterval = time.Duration(5) * time.Second
|
||||
)
|
||||
|
||||
// This function will capture all the SSE events for a given SseEvents object.
|
||||
// When new messages come in, it will ensure that they are decoded into JSON.
|
||||
// If any errors occur, it log the error information.
|
||||
func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMetricInc func(uint64)) {
|
||||
go func() {
|
||||
errG := new(errgroup.Group)
|
||||
errG.Go(func() error {
|
||||
err := eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh)
|
||||
func handleIncomingSseEvent[P ProcessedEvents](ctx context.Context, eventHandler *SseEvents[P], errMetricInc func(uint64), skipSse bool) {
|
||||
if !skipSse {
|
||||
for {
|
||||
err := eventHandler.SseClient.SubscribeChanRawWithContext(ctx, eventHandler.MessagesCh)
|
||||
if err != nil {
|
||||
return err
|
||||
loghelper.LogEndpoint(eventHandler.Endpoint).WithFields(log.Fields{
|
||||
"err": err}).Error("We are unable to subscribe to the SSE endpoint")
|
||||
time.Sleep(3 * time.Second)
|
||||
continue
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err := errG.Wait(); err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"err": err,
|
||||
"endpoint": eventHandler.Endpoint,
|
||||
}).Error("Unable to subscribe to the SSE endpoint.")
|
||||
return
|
||||
} else {
|
||||
loghelper.LogEndpoint(eventHandler.Endpoint).Info("Successfully subscribed to the event stream.")
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
close(eventHandler.ProcessCh)
|
||||
return
|
||||
case message := <-eventHandler.MessagesCh:
|
||||
// Message can be nil if its a keep-alive message
|
||||
if len(message.Data) != 0 {
|
||||
@ -76,23 +69,24 @@ func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMe
|
||||
}
|
||||
|
||||
// Turn the data object into a Struct.
|
||||
func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan<- *SseError) {
|
||||
func processMsg[P ProcessedEvents](msg []byte, processCh chan<- P, errorCh chan<- SseError) {
|
||||
var msgMarshaled P
|
||||
err := json.Unmarshal(msg, &msgMarshaled)
|
||||
if err != nil {
|
||||
loghelper.LogError(err).Error("Unable to parse message")
|
||||
errorCh <- &SseError{
|
||||
errorCh <- SseError{
|
||||
err: err,
|
||||
msg: msg,
|
||||
}
|
||||
return
|
||||
}
|
||||
processCh <- &msgMarshaled
|
||||
processCh <- msgMarshaled
|
||||
log.Debug("Done sending")
|
||||
}
|
||||
|
||||
// Capture all of the event topics.
|
||||
func (bc *BeaconClient) captureEventTopic() {
|
||||
func (bc *BeaconClient) captureEventTopic(ctx context.Context, skipSse bool) {
|
||||
log.Info("We are capturing all SSE events")
|
||||
go handleIncomingSseEvent(bc.HeadTracking, bc.Metrics.IncrementHeadError)
|
||||
go handleIncomingSseEvent(bc.ReOrgTracking, bc.Metrics.IncrementReorgError)
|
||||
go handleIncomingSseEvent(ctx, bc.HeadTracking, bc.Metrics.IncrementHeadError, skipSse)
|
||||
go handleIncomingSseEvent(ctx, bc.ReOrgTracking, bc.Metrics.IncrementReorgError, skipSse)
|
||||
}
|
||||
|
@ -19,59 +19,102 @@
|
||||
package beaconclient
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/database/sql"
|
||||
)
|
||||
|
||||
// This function will perform the necessary steps to handle a reorg.
|
||||
func (bc *BeaconClient) handleReorg() {
|
||||
func (bc *BeaconClient) handleReorg(ctx context.Context) {
|
||||
log.Info("Starting to process reorgs.")
|
||||
for {
|
||||
reorg := <-bc.ReOrgTracking.ProcessCh
|
||||
log.WithFields(log.Fields{"reorg": reorg}).Debug("Received a new reorg message.")
|
||||
writeReorgs(bc.Db, reorg.Slot, reorg.NewHeadBlock, bc.Metrics)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case reorg := <-bc.ReOrgTracking.ProcessCh:
|
||||
log.WithFields(log.Fields{"reorg": reorg}).Debug("Received a new reorg message.")
|
||||
writeReorgs(bc.Db, reorg.Slot, reorg.NewHeadBlock, bc.Metrics)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// This function will handle the latest head event.
|
||||
func (bc *BeaconClient) handleHead() {
|
||||
func (bc *BeaconClient) handleHead(ctx context.Context, maxWorkers int) {
|
||||
log.Info("Starting to process head.")
|
||||
|
||||
workCh := make(chan workParams, 5)
|
||||
log.WithField("workerNumber", maxWorkers).Info("Creating Workers")
|
||||
for i := 1; i < maxWorkers; i++ {
|
||||
go bc.headBlockProcessor(ctx, workCh)
|
||||
}
|
||||
errorSlots := 0
|
||||
for {
|
||||
head := <-bc.HeadTracking.ProcessCh
|
||||
// Process all the work here.
|
||||
slot, err := strconv.Atoi(head.Slot)
|
||||
if err != nil {
|
||||
bc.HeadTracking.ErrorCh <- &SseError{
|
||||
err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot),
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
close(workCh)
|
||||
return
|
||||
case head := <-bc.HeadTracking.ProcessCh:
|
||||
|
||||
// Process all the work here.
|
||||
slot, err := strconv.Atoi(head.Slot)
|
||||
if err != nil {
|
||||
bc.HeadTracking.ErrorCh <- SseError{
|
||||
err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot),
|
||||
}
|
||||
errorSlots = errorSlots + 1
|
||||
continue
|
||||
}
|
||||
errorSlots = errorSlots + 1
|
||||
continue
|
||||
if errorSlots != 0 && bc.PreviousSlot != 0 {
|
||||
log.WithFields(log.Fields{
|
||||
"lastProcessedSlot": bc.PreviousSlot,
|
||||
"errorSlots": errorSlots,
|
||||
}).Warn("We added slots to the knownGaps table because we got bad head messages.")
|
||||
writeKnownGaps(bc.Db, bc.KnownGapTableIncrement, bc.PreviousSlot+1, slot, fmt.Errorf("Bad Head Messages"), "headProcessing", bc.Metrics)
|
||||
errorSlots = 0
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{"head": head}).Debug("We are going to start processing the slot.")
|
||||
|
||||
// Not used anywhere yet but might be useful to have.
|
||||
if bc.PreviousSlot == 0 && bc.PreviousBlockRoot == "" {
|
||||
bc.StartingSlot = slot
|
||||
}
|
||||
|
||||
workCh <- workParams{db: bc.Db, serverEndpoint: bc.ServerEndpoint, slot: slot, blockRoot: head.Block, stateRoot: head.State, previousSlot: bc.PreviousSlot, previousBlockRoot: bc.PreviousBlockRoot, metrics: bc.Metrics, knownGapsTableIncrement: bc.KnownGapTableIncrement, checkDb: bc.CheckDb}
|
||||
log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished sending this slot to the workCh")
|
||||
|
||||
// Update the previous block
|
||||
bc.PreviousSlot = slot
|
||||
bc.PreviousBlockRoot = head.Block
|
||||
}
|
||||
if errorSlots != 0 && bc.PreviousSlot != 0 {
|
||||
log.WithFields(log.Fields{
|
||||
"lastProcessedSlot": bc.PreviousSlot,
|
||||
"errorSlots": errorSlots,
|
||||
}).Warn("We added slots to the knownGaps table because we got bad head messages.")
|
||||
writeKnownGaps(bc.Db, bc.KnownGapTableIncrement, bc.PreviousSlot+1, slot, fmt.Errorf("Bad Head Messages"), "headProcessing", bc.Metrics)
|
||||
errorSlots = 0
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{"head": head}).Debug("We are going to start processing the slot.")
|
||||
|
||||
// Not used anywhere yet but might be useful to have.
|
||||
if bc.PreviousSlot == 0 && bc.PreviousBlockRoot == "" {
|
||||
bc.StartingSlot = slot
|
||||
}
|
||||
|
||||
go processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement, bc.CheckDb)
|
||||
|
||||
log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished calling processHeadSlot.")
|
||||
|
||||
// Update the previous block
|
||||
bc.PreviousSlot = slot
|
||||
bc.PreviousBlockRoot = head.Block
|
||||
}
|
||||
}
|
||||
|
||||
// A worker that will process head slots.
|
||||
func (bc *BeaconClient) headBlockProcessor(ctx context.Context, workCh <-chan workParams) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case wp := <-workCh:
|
||||
processHeadSlot(ctx, wp.db, wp.serverEndpoint, wp.slot, wp.blockRoot, wp.stateRoot, wp.previousSlot, wp.previousBlockRoot, wp.metrics, wp.knownGapsTableIncrement, wp.checkDb)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// A struct used to pass parameters to the worker.
|
||||
type workParams struct {
|
||||
db sql.Database
|
||||
serverEndpoint string
|
||||
slot int
|
||||
blockRoot string
|
||||
stateRoot string
|
||||
previousSlot int
|
||||
previousBlockRoot string
|
||||
metrics *BeaconClientMetrics
|
||||
knownGapsTableIncrement int
|
||||
checkDb bool
|
||||
}
|
||||
|
@ -132,6 +132,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm
|
||||
for len(errCount) < 5 {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
close(slotCh)
|
||||
return errCount
|
||||
default:
|
||||
if len(errCount) != prevErrCount {
|
||||
@ -228,7 +229,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm
|
||||
|
||||
// After a row has been processed it should be removed from its appropriate table.
|
||||
func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan slotsToProcess, checkProcessedStmt, removeStmt string) error {
|
||||
errCh := make(chan error)
|
||||
errCh := make(chan error, 1)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@ -241,25 +242,28 @@ func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan
|
||||
"endSlot": slots.endSlot,
|
||||
}).Debug("Starting to check to see if the following slots have been processed")
|
||||
for {
|
||||
isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.startSlot))
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.startSlot))
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
isEndProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.endSlot))
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
if isStartProcess && isEndProcess {
|
||||
_, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), strconv.Itoa(slots.endSlot))
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
return
|
||||
}
|
||||
time.Sleep(3 * time.Second)
|
||||
}
|
||||
isEndProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.endSlot))
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
if isStartProcess && isEndProcess {
|
||||
break
|
||||
}
|
||||
time.Sleep(3 * time.Second)
|
||||
}
|
||||
|
||||
_, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), strconv.Itoa(slots.endSlot))
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
|
||||
}()
|
||||
if len(errCh) != 0 {
|
||||
return <-errCh
|
||||
|
@ -20,6 +20,7 @@ package beaconclient
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
@ -67,14 +68,18 @@ func (bc *BeaconClient) ProcessKnownGaps(ctx context.Context, maxWorkers int) []
|
||||
}
|
||||
|
||||
// This function will perform all the necessary clean up tasks for stopping historical processing.
|
||||
func (bc *BeaconClient) StopKnownGapsProcessing(cancel context.CancelFunc) error {
|
||||
log.Info("We are stopping the known gaps processing service.")
|
||||
cancel()
|
||||
err := bc.KnownGapsProcess.releaseDbLocks()
|
||||
if err != nil {
|
||||
loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.known_gaps table. Manual Intervention is needed!")
|
||||
func (bc *BeaconClient) StopKnownGapsProcessing(ctx context.Context) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Info("We are stopping the known gaps processing service.")
|
||||
err := bc.KnownGapsProcess.releaseDbLocks()
|
||||
if err != nil {
|
||||
loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.known_gaps table. Manual Intervention is needed!")
|
||||
}
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("Tried to stop knownGaps Processing without closing the context..")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get a single row of historical slots from the table.
|
||||
|
@ -63,12 +63,12 @@ type ProcessSlot struct {
|
||||
PerformanceMetrics PerformanceMetrics // An object to keep track of performance metrics.
|
||||
// BeaconBlock
|
||||
|
||||
SszSignedBeaconBlock []byte // The entire SSZ encoded SignedBeaconBlock
|
||||
SszSignedBeaconBlock *[]byte // The entire SSZ encoded SignedBeaconBlock
|
||||
FullSignedBeaconBlock si.SignedBeaconBlock // The unmarshaled BeaconState object, the unmarshalling could have errors.
|
||||
|
||||
// BeaconState
|
||||
FullBeaconState state.BeaconState // The unmarshaled BeaconState object, the unmarshalling could have errors.
|
||||
SszBeaconState []byte // The entire SSZ encoded BeaconState
|
||||
SszBeaconState *[]byte // The entire SSZ encoded BeaconState
|
||||
|
||||
// DB Write objects
|
||||
DbSlotsModel *DbSlots // The model being written to the slots table.
|
||||
@ -155,6 +155,7 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
|
||||
})
|
||||
|
||||
if err := g.Wait(); err != nil {
|
||||
// Make sure channel is empty.
|
||||
return err, "processSlot"
|
||||
}
|
||||
|
||||
@ -234,12 +235,12 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
|
||||
}
|
||||
|
||||
// Handle a slot that is at head. A wrapper function for calling `handleFullSlot`.
|
||||
func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) {
|
||||
func processHeadSlot(ctx context.Context, db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) {
|
||||
// Get the knownGaps at startUp.
|
||||
if previousSlot == 0 && previousBlockRoot == "" {
|
||||
writeStartUpGaps(db, knownGapsTableIncrement, slot, metrics)
|
||||
}
|
||||
err, errReason := processFullSlot(context.Background(), db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement, checkDb)
|
||||
err, errReason := processFullSlot(ctx, db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement, checkDb)
|
||||
if err != nil {
|
||||
writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics)
|
||||
}
|
||||
@ -270,7 +271,7 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string, vmCh <-chan *d
|
||||
vm := <-vmCh
|
||||
if rc != 200 {
|
||||
ps.FullSignedBeaconBlock = &wrapper.Phase0SignedBeaconBlock{}
|
||||
ps.SszSignedBeaconBlock = []byte{}
|
||||
ps.SszSignedBeaconBlock = &[]byte{}
|
||||
ps.ParentBlockRoot = ""
|
||||
ps.Status = "skipped"
|
||||
return nil
|
||||
@ -280,7 +281,7 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string, vmCh <-chan *d
|
||||
return fmt.Errorf(VersionedUnmarshalerError)
|
||||
}
|
||||
|
||||
ps.FullSignedBeaconBlock, err = vm.UnmarshalBeaconBlock(ps.SszSignedBeaconBlock)
|
||||
ps.FullSignedBeaconBlock, err = vm.UnmarshalBeaconBlock(*ps.SszSignedBeaconBlock)
|
||||
if err != nil {
|
||||
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Warn("Unable to process the slots SignedBeaconBlock")
|
||||
return nil
|
||||
@ -291,23 +292,29 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string, vmCh <-chan *d
|
||||
|
||||
// Update the SszBeaconState and FullBeaconState object with their respective values.
|
||||
func (ps *ProcessSlot) getBeaconState(serverEndpoint string, vmCh chan<- *dt.VersionedUnmarshaler) error {
|
||||
var stateIdentifier string // Used to query the state
|
||||
var (
|
||||
stateIdentifier string // Used to query the state
|
||||
err error
|
||||
)
|
||||
if ps.StateRoot != "" {
|
||||
stateIdentifier = ps.StateRoot
|
||||
} else {
|
||||
stateIdentifier = strconv.Itoa(ps.Slot)
|
||||
}
|
||||
stateEndpoint := serverEndpoint + BcStateQueryEndpoint + stateIdentifier
|
||||
ps.SszBeaconState, _, _ = querySsz(stateEndpoint, strconv.Itoa(ps.Slot))
|
||||
ps.SszBeaconState, _, err = querySsz(stateEndpoint, strconv.Itoa(ps.Slot))
|
||||
if err != nil {
|
||||
return fmt.Errorf("Unable to querrySSZ")
|
||||
}
|
||||
|
||||
versionedUnmarshaler, err := dt.FromState(ps.SszBeaconState)
|
||||
versionedUnmarshaler, err := dt.FromState(*ps.SszBeaconState)
|
||||
if err != nil {
|
||||
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error(VersionedUnmarshalerError)
|
||||
vmCh <- nil
|
||||
return fmt.Errorf(VersionedUnmarshalerError)
|
||||
}
|
||||
vmCh <- versionedUnmarshaler
|
||||
ps.FullBeaconState, err = versionedUnmarshaler.UnmarshalBeaconState(ps.SszBeaconState)
|
||||
ps.FullBeaconState, err = versionedUnmarshaler.UnmarshalBeaconState(*ps.SszBeaconState)
|
||||
if err != nil {
|
||||
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error("Unable to process the slots BeaconState")
|
||||
return err
|
||||
@ -356,7 +363,7 @@ func (ps *ProcessSlot) createWriteObjects(blockRoot, stateRoot, eth1BlockHash st
|
||||
status = "proposed"
|
||||
}
|
||||
|
||||
dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, &ps.SszSignedBeaconBlock, &ps.SszBeaconState, ps.Metrics)
|
||||
dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.SszSignedBeaconBlock, ps.SszBeaconState, ps.Metrics)
|
||||
if err != nil {
|
||||
return dw, err
|
||||
}
|
||||
|
@ -28,7 +28,7 @@ import (
|
||||
)
|
||||
|
||||
// A helper function to query endpoints that utilize slots.
|
||||
func querySsz(endpoint string, slot string) ([]byte, int, error) {
|
||||
func querySsz(endpoint string, slot string) (*[]byte, int, error) {
|
||||
log.WithFields(log.Fields{"endpoint": endpoint}).Debug("Querying endpoint")
|
||||
client := &http.Client{}
|
||||
req, err := http.NewRequest("GET", endpoint, nil)
|
||||
@ -43,13 +43,21 @@ func querySsz(endpoint string, slot string) ([]byte, int, error) {
|
||||
return nil, 0, fmt.Errorf("Unable to query Beacon Node: %s", err.Error())
|
||||
}
|
||||
defer response.Body.Close()
|
||||
// Needed for testing.... But might be interesting to test with...
|
||||
defer client.CloseIdleConnections()
|
||||
rc := response.StatusCode
|
||||
|
||||
//var body []byte
|
||||
//io.Copy(body, response.Body)
|
||||
//bytes.buffer...
|
||||
//_, err = response.Body.Read(body)
|
||||
body, err := ioutil.ReadAll(response.Body)
|
||||
if err != nil {
|
||||
loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!")
|
||||
return nil, rc, fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error())
|
||||
}
|
||||
return body, rc, nil
|
||||
//log.WithField("body", unsafe.Sizeof(body)).Debug("Size of the raw SSZ object")
|
||||
return &body, rc, nil
|
||||
}
|
||||
|
||||
// Object to unmarshal the BlockRootResponse
|
||||
|
@ -1,11 +1,13 @@
|
||||
package beaconclient_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
|
||||
//. "github.com/onsi/gomega"
|
||||
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/beaconclient"
|
||||
)
|
||||
@ -28,7 +30,7 @@ var (
|
||||
)
|
||||
var _ = Describe("Systemvalidation", Label("system"), func() {
|
||||
Describe("Run the application against a running lighthouse node", func() {
|
||||
Context("When we receive head messages", func() {
|
||||
Context("When we receive head messages", Label("system-head"), func() {
|
||||
It("We should process the messages successfully", func() {
|
||||
bc := setUpTest(prodConfig, "10000000000")
|
||||
processProdHeadBlocks(bc, 3, 0, 0, 0)
|
||||
@ -63,7 +65,23 @@ func getEnvInt(envVar string) int {
|
||||
|
||||
// Start head tracking and wait for the expected results.
|
||||
func processProdHeadBlocks(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) {
|
||||
go bc.CaptureHead()
|
||||
time.Sleep(1 * time.Second)
|
||||
//startGoRoutines := runtime.NumGoroutine()
|
||||
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go bc.CaptureHead(ctx, 2, false)
|
||||
time.Sleep(5 * time.Second)
|
||||
validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError)
|
||||
|
||||
cancel()
|
||||
time.Sleep(4 * time.Second)
|
||||
testStopSystemHeadTracking(ctx, bc)
|
||||
}
|
||||
|
||||
// Custom stop for system testing
|
||||
func testStopSystemHeadTracking(ctx context.Context, bc *beaconclient.BeaconClient) {
|
||||
bc.StopHeadTracking(ctx, false)
|
||||
|
||||
time.Sleep(3 * time.Second)
|
||||
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
||||
//Expect(endNum <= startGoRoutines).To(BeTrue())
|
||||
}
|
||||
|
@ -45,7 +45,11 @@ func Shutdown(ctx context.Context, notifierCh chan os.Signal, timeout time.Durat
|
||||
|
||||
// add any other syscalls that you want to be notified with
|
||||
signal.Notify(notifierCh, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
|
||||
<-notifierCh
|
||||
// Wait for one or the other...
|
||||
select {
|
||||
case <-notifierCh:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
|
||||
log.Info("Shutting Down your application")
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user