Compare commits


No commits in common. "feature/study-application-memory" and "v0.2.4-alpha1" have entirely different histories.

23 changed files with 277 additions and 512 deletions. In the hunks below, lines prefixed `-` come from feature/study-application-memory (the comparison base) and lines prefixed `+` from v0.2.4-alpha1.


@@ -66,7 +66,7 @@ jobs:
 run: |
 until $(docker compose -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-eth-beacon-indexer.yml" --env-file ./config.sh cp ipld-eth-beacon-indexer:/root/HEALTH ./HEALTH) ; do sleep 10; done
 cat ./HEALTH
-if [[ "$(cat ./HEALTH)" -eq "0" ]]; then echo "Application boot successful" && (exit 0); else docker compose -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-eth-beacon-indexer.yml" --env-file ./config.sh cp ipld-eth-beacon-indexer:/root/ipld-eth-beacon-indexer.log . && cat ipld-eth-beacon-indexer.log && (exit 1); fi
+if [[ "$(cat ./HEALTH)" -eq "0" ]]; then echo "Application boot successful" && (exit 0); else docker compose -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-eth-beacon-indexer.yml" cp ipld-eth-beacon-indexer:/root/ipld-eth-beacon-indexer.log . && cat ipld-eth-beacon-indexer.log && (exit 1); fi
 unit-test:
 name: Run Unit Tests


@@ -25,7 +25,6 @@ integration-test-ci:
 go fmt ./...
 $(GINKGO) -r --label-filter integration \
 --procs=4 --compilers=4 \
---flake-attempts=3 \
 --randomize-all --randomize-suites \
 --fail-on-pending --keep-going \
 --cover --coverprofile=cover.profile \
@@ -77,7 +76,7 @@ unit-test-ci:
 go vet ./...
 go fmt ./...
 $(GINKGO) -r --label-filter unit \
---randomize-all --randomize-suites \
+--randomize-all --randomize-suites
 --flake-attempts=3 \
 --fail-on-pending --keep-going \
 --cover --coverprofile=cover.profile \
@@ -89,7 +88,6 @@ system-test-ci:
 go fmt ./...
 $(GINKGO) -r --label-filter system \
 --randomize-all --randomize-suites \
---flake-attempts=3 \
 --fail-on-pending --keep-going \
 --cover --coverprofile=cover.profile \
 --trace --json-report=report.json
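With `--flake-attempts=3` gone from the integration and system targets, a failing spec in those suites now fails the run immediately instead of being retried; only unit-test-ci keeps retries. For readers unfamiliar with how these targets pick their specs, a minimal sketch of Ginkgo v2 label filtering follows (the suite and spec names are hypothetical, not from this repo):

```go
package demo_test

import (
	"testing"

	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"
)

func TestDemo(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "Demo Suite")
}

// `ginkgo -r --label-filter=unit` runs only the first spec;
// `--label-filter=integration` would run only the second.
var _ = Describe("Label filtering", func() {
	It("runs under the unit target", Label("unit"), func() {
		Expect(1 + 1).To(Equal(2))
	})
	It("runs under the integration target", Label("integration"), func() {
		Expect(true).To(BeTrue())
	})
})
```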


@@ -39,7 +39,6 @@ var (
 bcConnectionProtocol string
 bcType string
 bcMaxHistoricProcessWorker int
-bcMaxHeadProcessWorker int
 bcUniqueNodeIdentifier int
 bcCheckDb bool
 kgMaxWorker int
@@ -97,8 +96,7 @@ func init() {
 captureCmd.PersistentFlags().StringVarP(&bcConnectionProtocol, "bc.connectionProtocol", "", "http", "protocol for connecting to the beacon node.")
 captureCmd.PersistentFlags().IntVarP(&bcBootRetryInterval, "bc.bootRetryInterval", "", 30, "The amount of time to wait between retries while booting the application")
 captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The amount of time to wait between retries while booting the application")
-captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 3, "The number of workers that should be actively processing slots from the eth-beacon.historic_process table. Be careful of system memory.")
-captureCmd.PersistentFlags().IntVarP(&bcMaxHeadProcessWorker, "bc.maxHeadProcessWorker", "", 3, "The number of workers that should be actively processing slots head slots. Be careful of system memory.")
+captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 30, "The number of workers that should be actively processing slots from the eth-beacon.historic_process table. Be careful of system memory.")
 captureCmd.PersistentFlags().IntVarP(&bcUniqueNodeIdentifier, "bc.uniqueNodeIdentifier", "", 0, "The unique identifier of this application. Each application connecting to the DB should have a unique identifier.")
 captureCmd.PersistentFlags().BoolVarP(&bcCheckDb, "bc.checkDb", "", true, "Should we check to see if the slot exists in the DB before writing it?")
 // err = captureCmd.MarkPersistentFlagRequired("bc.address")
@@ -109,7 +107,7 @@ func init() {
 //// Known Gaps specific
 captureCmd.PersistentFlags().BoolVarP(&kgProcessGaps, "kg.processKnownGaps", "", true, "Should we process the slots within the eth-beacon.known_gaps table.")
 captureCmd.PersistentFlags().IntVarP(&kgTableIncrement, "kg.increment", "", 10000, "The max slots within a single entry to the known_gaps table.")
-captureCmd.PersistentFlags().IntVarP(&kgMaxWorker, "kg.maxKnownGapsWorker", "", 3, "The number of workers that should be actively processing slots from the eth-beacon.known_gaps table. Be careful of system memory.")
+captureCmd.PersistentFlags().IntVarP(&kgMaxWorker, "kg.maxKnownGapsWorker", "", 30, "The number of workers that should be actively processing slots from the eth-beacon.known_gaps table. Be careful of system memory.")
 // Prometheus Specific
 captureCmd.PersistentFlags().BoolVarP(&pmMetrics, "pm.metrics", "", true, "Should we capture prometheus metrics.")
@@ -159,8 +157,6 @@ func init() {
 exitErr(err)
 err = viper.BindPFlag("bc.maxHistoricProcessWorker", captureCmd.PersistentFlags().Lookup("bc.maxHistoricProcessWorker"))
 exitErr(err)
-err = viper.BindPFlag("bc.maxHeadProcessWorker", captureCmd.PersistentFlags().Lookup("bc.maxHeadProcessWorker"))
-exitErr(err)
 err = viper.BindPFlag("bc.uniqueNodeIdentifier", captureCmd.PersistentFlags().Lookup("bc.uniqueNodeIdentifier"))
 exitErr(err)
 err = viper.BindPFlag("bc.checkDb", captureCmd.PersistentFlags().Lookup("bc.checkDb"))


@@ -59,7 +59,7 @@ func init() {
 func startFullProcessing() {
 // Boot the application
 log.Info("Starting the application in head tracking mode.")
-ctx, cancel := context.WithCancel(context.Background())
+ctx := context.Background()
 Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
 viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
@@ -75,11 +75,13 @@ func startFullProcessing() {
 log.Info("The Beacon Client has booted successfully!")
 // Capture head blocks
-go Bc.CaptureHead(ctx, viper.GetInt("bc.maxHeadProcessWorker"), false)
+go Bc.CaptureHead()
+hpContext, hpCancel := context.WithCancel(context.Background())
 errG, _ := errgroup.WithContext(context.Background())
 errG.Go(func() error {
-errs := Bc.CaptureHistoric(ctx, viper.GetInt("bc.maxHistoricProcessWorker"))
+errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker"))
 if len(errs) != 0 {
 if len(errs) != 0 {
 log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events")
@@ -88,11 +90,12 @@ func startFullProcessing() {
 }
 return nil
 })
+kgCtx, KgCancel := context.WithCancel(context.Background())
 if viper.GetBool("kg.processKnownGaps") {
 go func() {
 errG := new(errgroup.Group)
 errG.Go(func() error {
-errs := Bc.ProcessKnownGaps(ctx, viper.GetInt("kg.maxKnownGapsWorker"))
+errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker"))
 if len(errs) != 0 {
 log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps")
 return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps")
@@ -112,7 +115,7 @@ func startFullProcessing() {
 }
 // Shutdown when the time is right.
-err = shutdown.ShutdownFull(ctx, cancel, notifierCh, maxWaitSecondsShutdown, Db, Bc)
+err = shutdown.ShutdownFull(ctx, KgCancel, hpCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc)
 if err != nil {
 loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!")
 } else {
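Both sides of this hunk run the historic and known-gaps workers in goroutines; the difference is cancellation plumbing. The feature branch threads one cancellable context through everything, while v0.2.4-alpha1 gives each worker its own context.WithCancel and passes the cancel funcs to the shutdown wrapper (the same shape recurs in the head and historic commands below). A stripped-down sketch of the v0.2.4-alpha1 shape, with placeholder worker bodies:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// worker stands in for Bc.CaptureHistoric / Bc.ProcessKnownGaps:
// it runs until its own context is cancelled.
func worker(ctx context.Context, name string) error {
	<-ctx.Done()
	fmt.Println(name, "stopped")
	return nil
}

func main() {
	// Each long-running process gets its own context so shutdown can
	// stop them independently, as ShutdownFull does with KgCancel/hpCancel.
	hpCtx, hpCancel := context.WithCancel(context.Background())
	kgCtx, kgCancel := context.WithCancel(context.Background())

	errG := new(errgroup.Group)
	errG.Go(func() error { return worker(hpCtx, "historic") })
	errG.Go(func() error { return worker(kgCtx, "knownGaps") })

	// Simulated shutdown signal: cancel both workers, then wait.
	time.AfterFunc(100*time.Millisecond, func() { hpCancel(); kgCancel() })
	if err := errG.Wait(); err != nil {
		fmt.Println("worker error:", err)
	}
}
```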


@@ -46,7 +46,7 @@ var headCmd = &cobra.Command{
 func startHeadTracking() {
 // Boot the application
 log.Info("Starting the application in head tracking mode.")
-ctx, cancel := context.WithCancel(context.Background())
+ctx := context.Background()
 Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
 viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
@@ -62,13 +62,13 @@ func startHeadTracking() {
 log.Info("The Beacon Client has booted successfully!")
 // Capture head blocks
-go Bc.CaptureHead(ctx, viper.GetInt("bc.maxHeadProcessWorker"), false)
+go Bc.CaptureHead()
+kgCtx, KgCancel := context.WithCancel(context.Background())
 if viper.GetBool("kg.processKnownGaps") {
 go func() {
 errG := new(errgroup.Group)
 errG.Go(func() error {
-errs := Bc.ProcessKnownGaps(ctx, viper.GetInt("kg.maxKnownGapsWorker"))
+errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker"))
 if len(errs) != 0 {
 log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps")
 return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps")
@@ -88,12 +88,13 @@ func startHeadTracking() {
 }
 // Shutdown when the time is right.
-err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, Db, Bc)
+err = shutdown.ShutdownHeadTracking(ctx, KgCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc)
 if err != nil {
 loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!")
 } else {
 log.Info("Gracefully shutdown ipld-eth-beacon-indexer")
 }
 }
 func init() {


@@ -49,7 +49,7 @@ var historicCmd = &cobra.Command{
 func startHistoricProcessing() {
 // Boot the application
 log.Info("Starting the application in head tracking mode.")
-ctx, cancel := context.WithCancel(context.Background())
+ctx := context.Background()
 Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
 viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
@@ -63,9 +63,11 @@ func startHistoricProcessing() {
 serveProm(addr)
 }
+hpContext, hpCancel := context.WithCancel(context.Background())
 errG, _ := errgroup.WithContext(context.Background())
 errG.Go(func() error {
-errs := Bc.CaptureHistoric(ctx, viper.GetInt("bc.maxHistoricProcessWorker"))
+errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker"))
 if len(errs) != 0 {
 if len(errs) != 0 {
 log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events")
@@ -75,11 +77,12 @@ func startHistoricProcessing() {
 return nil
 })
+kgContext, kgCancel := context.WithCancel(context.Background())
 if viper.GetBool("kg.processKnownGaps") {
 go func() {
 errG := new(errgroup.Group)
 errG.Go(func() error {
-errs := Bc.ProcessKnownGaps(ctx, viper.GetInt("kg.maxKnownGapsWorker"))
+errs := Bc.ProcessKnownGaps(kgContext, viper.GetInt("kg.maxKnownGapsWorker"))
 if len(errs) != 0 {
 log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps")
 return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps")
@@ -99,7 +102,7 @@ func startHistoricProcessing() {
 }
 // Shutdown when the time is right.
-err = shutdown.ShutdownHistoricProcessing(ctx, cancel, notifierCh, maxWaitSecondsShutdown, Db, Bc)
+err = shutdown.ShutdownHistoricProcessing(ctx, kgCancel, hpCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc)
 if err != nil {
 loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!")
 } else {

go.mod

@@ -70,7 +70,6 @@ require (
 github.com/urfave/cli/v2 v2.3.0 // indirect
 go.opencensus.io v0.23.0 // indirect
 go.uber.org/atomic v1.9.0 // indirect
-go.uber.org/goleak v1.1.12 // indirect
 go.uber.org/multierr v1.8.0 // indirect
 go.uber.org/zap v1.21.0 // indirect
 golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f // indirect
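The go.mod change drops go.uber.org/goleak (an indirect dependency on the feature-branch side); whether that is tied to the goroutine-leak bookkeeping removed from the tests later in this diff is not something the diff itself states. For reference, a minimal sketch of how goleak is normally wired into a test (a hypothetical test, not from this repo):

```go
package demo_test

import (
	"testing"

	"go.uber.org/goleak"
)

// goleak.VerifyNone fails the test if goroutines spawned during the
// test are still running when it returns.
func TestNoGoroutineLeak(t *testing.T) {
	defer goleak.VerifyNone(t)

	done := make(chan struct{})
	go func() { close(done) }()
	<-done // the goroutine has exited before VerifyNone runs
}
```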

go.sum

@@ -833,9 +833,8 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
 go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
+go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
 go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
-go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
-go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
 go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
 go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
 go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=


@@ -40,66 +40,68 @@ func ShutdownServices(ctx context.Context, notifierCh chan os.Signal, waitTime t
 }
 // Wrapper function for shutting down the head tracking process.
-func ShutdownHeadTracking(ctx context.Context, cancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
+func ShutdownHeadTracking(ctx context.Context, kgCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
 return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{
 // Combining DB shutdown with BC because BC needs DB open to cleanly shutdown.
 "beaconClient": func(ctx context.Context) error {
 defer DB.Close()
-cancel()
-BC.StopHeadTracking(ctx, false)
+err := BC.StopHeadTracking()
+if err != nil {
+loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking")
+}
 if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) {
-err := BC.StopKnownGapsProcessing(ctx)
+err = BC.StopKnownGapsProcessing(kgCancel)
 if err != nil {
 loghelper.LogError(err).Error("Unable to stop processing known gaps")
-return err
 }
 }
-return nil
+return err
 },
 })
 }
 // Wrapper function for shutting down the head tracking process.
-func ShutdownHistoricProcessing(ctx context.Context, cancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
+func ShutdownHistoricProcessing(ctx context.Context, kgCancel, hpCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
 return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{
 // Combining DB shutdown with BC because BC needs DB open to cleanly shutdown.
 "beaconClient": func(ctx context.Context) error {
 defer DB.Close()
-cancel()
-err := BC.StopHistoricProcess(ctx)
+err := BC.StopHistoric(hpCancel)
 if err != nil {
 loghelper.LogError(err).Error("Unable to stop processing historic")
 }
 if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) {
-err = BC.StopKnownGapsProcessing(ctx)
+err = BC.StopKnownGapsProcessing(kgCancel)
 if err != nil {
 loghelper.LogError(err).Error("Unable to stop processing known gaps")
-return err
 }
 }
-return nil
+return err
 },
 })
 }
 // Shutdown the head and historical processing
-func ShutdownFull(ctx context.Context, cancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
+func ShutdownFull(ctx context.Context, kgCancel, hpCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
 return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{
 // Combining DB shutdown with BC because BC needs DB open to cleanly shutdown.
 "beaconClient": func(ctx context.Context) error {
 defer DB.Close()
-cancel()
-err := BC.StopHistoricProcess(ctx)
+err := BC.StopHistoric(hpCancel)
 if err != nil {
 loghelper.LogError(err).Error("Unable to stop processing historic")
 }
 if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) {
-err = BC.StopKnownGapsProcessing(ctx)
+err = BC.StopKnownGapsProcessing(kgCancel)
 if err != nil {
 loghelper.LogError(err).Error("Unable to stop processing known gaps")
 }
 }
-BC.StopHeadTracking(ctx, false)
+err = BC.StopHeadTracking()
+if err != nil {
+loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking")
+}
 return err
 },
 })
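All three wrappers hand ShutdownServices a map of named gracefulshutdown.Operation callbacks. That package's internals are not part of this diff, so the following is a hedged reconstruction of the pattern rather than the repo's actual implementation: a name-to-function map executed under a shared deadline.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Operation mirrors the gracefulshutdown.Operation callbacks above: a
// named shutdown step that receives a deadline-bound context.
type Operation func(ctx context.Context) error

// runOperations executes each operation under one timeout, roughly what a
// ShutdownServices-style runner would do.
func runOperations(waitTime time.Duration, ops map[string]Operation) error {
	ctx, cancel := context.WithTimeout(context.Background(), waitTime)
	defer cancel()
	for name, op := range ops {
		if err := op(ctx); err != nil {
			return fmt.Errorf("shutdown operation %q failed: %w", name, err)
		}
	}
	return nil
}

func main() {
	err := runOperations(2*time.Second, map[string]Operation{
		"beaconClient": func(ctx context.Context) error {
			// The real callback stops head tracking, historic processing,
			// and known-gaps processing, then closes the DB.
			return nil
		},
	})
	fmt.Println("shutdown err:", err)
}
```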


@@ -56,23 +56,23 @@ var (
 BC *beaconclient.BeaconClient
 err error
 ctx context.Context
-cancel context.CancelFunc
 notifierCh chan os.Signal
 )
 var _ = Describe("Shutdown", func() {
 BeforeEach(func() {
-ctx, cancel = context.WithCancel(context.Background())
+ctx = context.Background()
 BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress,
 bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier, bcCheckDb)
 notifierCh = make(chan os.Signal, 1)
 Expect(err).To(BeNil())
 })
-Describe("Run Shutdown Function for head tracking,", Label("integration", "shutdown"), func() {
+Describe("Run Shutdown Function for head tracking,", Label("integration"), func() {
 Context("When Channels are empty,", func() {
 It("Should Shutdown Successfully.", func() {
 go func() {
+_, cancel := context.WithCancel(context.Background())
 log.Debug("Starting shutdown chan")
 err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, DB, BC)
 log.Debug("We have completed the shutdown...")
@@ -85,6 +85,7 @@ var _ = Describe("Shutdown", func() {
 shutdownCh := make(chan bool)
 //log.SetLevel(log.DebugLevel)
 go func() {
+_, cancel := context.WithCancel(context.Background())
 log.Debug("Starting shutdown chan")
 err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, DB, BC)
 log.Debug("We have completed the shutdown...")
@@ -119,6 +120,7 @@ var _ = Describe("Shutdown", func() {
 //log.SetLevel(log.DebugLevel)
 go func() {
 log.Debug("Starting shutdown chan")
+_, cancel := context.WithCancel(context.Background())
 err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, DB, BC)
 log.Debug("We have completed the shutdown...")
 Expect(err).To(MatchError(gracefulshutdown.TimeoutErr(maxWaitSecondsShutdown.String())))


@@ -76,8 +76,8 @@ type BeaconClient struct {
 type SseEvents[P ProcessedEvents] struct {
 Endpoint string // The endpoint for the subscription. Primarily used for logging
 MessagesCh chan *sse.Event // Contains all the messages from the SSE Channel
-ErrorCh chan SseError // Contains any errors while SSE streaming occurred
-ProcessCh chan P // Used to capture processed data in its proper struct.
+ErrorCh chan *SseError // Contains any errors while SSE streaming occurred
+ProcessCh chan *P // Used to capture processed data in its proper struct.
 SseClient *sse.Client // sse.Client object that is used to interact with the SSE stream
 }
@@ -119,9 +119,9 @@ func createSseEvent[P ProcessedEvents](baseEndpoint string, path string) *SseEve
 endpoint := baseEndpoint + path
 sseEvents := &SseEvents[P]{
 Endpoint: endpoint,
-MessagesCh: make(chan *sse.Event),
-ErrorCh: make(chan SseError),
-ProcessCh: make(chan P, 10),
+MessagesCh: make(chan *sse.Event, 1),
+ErrorCh: make(chan *SseError),
+ProcessCh: make(chan *P),
 SseClient: func(endpoint string) *sse.Client {
 log.WithFields(log.Fields{"endpoint": endpoint}).Info("Creating SSE client")
 return sse.NewClient(endpoint)
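Two things change in this struct: ErrorCh and ProcessCh move to pointer element types, and the buffering flips, with MessagesCh gaining a one-slot buffer while ProcessCh loses its buffer of ten. Buffering matters for the shutdown path later in this diff, because len() on an unbuffered channel is always zero, so a drain loop over it can never observe pending work. A tiny illustration of that property:

```go
package main

import "fmt"

func main() {
	unbuffered := make(chan int)  // len() can never exceed 0
	buffered := make(chan int, 1) // len() reflects queued items

	buffered <- 42
	fmt.Println(len(unbuffered), len(buffered)) // prints: 0 1

	// Draining brings len() back to 0, the condition the
	// finishProcessingChannel loop (below) waits for before shutdown.
	<-buffered
	fmt.Println(len(buffered)) // prints: 0
}
```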


@@ -18,32 +18,42 @@
 package beaconclient
 import (
-"context"
+"time"
 log "github.com/sirupsen/logrus"
+"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/loghelper"
 )
 // This function will perform all the heavy lifting for tracking the head of the chain.
-func (bc *BeaconClient) CaptureHead(ctx context.Context, maxHeadWorkers int, skipSee bool) {
+func (bc *BeaconClient) CaptureHead() {
 log.Info("We are tracking the head of the chain.")
-go bc.handleHead(ctx, maxHeadWorkers)
-go bc.handleReorg(ctx)
-bc.captureEventTopic(ctx, skipSee)
+go bc.handleHead()
+go bc.handleReorg()
+bc.captureEventTopic()
 }
 // Stop the head tracking service.
-func (bc *BeaconClient) StopHeadTracking(ctx context.Context, skipSee bool) {
-select {
-case <-ctx.Done():
-if !skipSee {
-bc.HeadTracking.SseClient.Unsubscribe(bc.HeadTracking.MessagesCh)
-bc.ReOrgTracking.SseClient.Unsubscribe(bc.ReOrgTracking.MessagesCh)
-log.Info("Successfully unsubscribed to SSE client")
-close(bc.ReOrgTracking.MessagesCh)
-close(bc.HeadTracking.MessagesCh)
-}
+func (bc *BeaconClient) StopHeadTracking() error {
+log.Info("We are going to stop tracking the head of chain because of the shutdown signal.")
+chHead := make(chan bool)
+chReorg := make(chan bool)
+go bc.HeadTracking.finishProcessingChannel(chHead)
+go bc.ReOrgTracking.finishProcessingChannel(chReorg)
+<-chHead
+<-chReorg
 log.Info("Successfully stopped the head tracking service.")
-default:
-log.Error("The context has not completed....")
-}
+return nil
+}
+// This function closes the SSE subscription, but waits until the MessagesCh is empty
+func (se *SseEvents[ProcessedEvents]) finishProcessingChannel(finish chan<- bool) {
+loghelper.LogEndpoint(se.Endpoint).Info("Received a close event.")
+se.SseClient.Unsubscribe(se.MessagesCh)
+for len(se.MessagesCh) != 0 || len(se.ProcessCh) != 0 {
+time.Sleep(time.Duration(shutdownWaitInterval) * time.Millisecond)
+}
+loghelper.LogEndpoint(se.Endpoint).Info("Done processing all messages, ready for shutdown")
+finish <- true
 }
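The replacement StopHeadTracking coordinates with each SSE stream through a done-channel handshake: unsubscribe, poll until the channels drain, then signal completion. A generic sketch of that handshake with a plain worker in place of the SSE client (shutdownWaitInterval is a stand-in constant; the repo defines its own):

```go
package main

import (
	"fmt"
	"time"
)

const shutdownWaitInterval = 50 // milliseconds between drain checks

// finishProcessing waits until the queue is empty, then signals finish,
// mirroring finishProcessingChannel above.
func finishProcessing(queue chan string, finish chan<- bool) {
	for len(queue) != 0 {
		time.Sleep(shutdownWaitInterval * time.Millisecond)
	}
	finish <- true
}

func main() {
	queue := make(chan string, 10)
	queue <- "pending message"

	// A consumer drains the queue while shutdown waits on the handshake.
	go func() {
		for msg := range queue {
			fmt.Println("processed:", msg)
		}
	}()

	done := make(chan bool)
	go finishProcessing(queue, done)
	<-done
	fmt.Println("queue drained, safe to shut down")
}
```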


@@ -23,7 +23,6 @@ import (
 "net/http"
 "os"
 "path/filepath"
-"runtime"
 "strconv"
 "sync/atomic"
 "time"
@ -266,121 +265,70 @@ var _ = Describe("Capturehead", Label("head"), func() {
Describe("Receiving New Head SSE messages", Label("unit", "behavioral"), func() { Describe("Receiving New Head SSE messages", Label("unit", "behavioral"), func() {
Context("Correctly formatted Phase0 Block", func() { Context("Correctly formatted Phase0 Block", func() {
It("Should process it successfully.", func() { It("Should turn it into a struct successfully.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0)
BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{
head: BeaconNodeTester.TestEvents["100"].HeadMessage,
expectedEpoch: 3,
expectStatus: "proposed",
})
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectParentRoot, BeaconNodeTester.TestEvents["100"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["100"].CorrectSignedBeaconBlockMhKey) validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectParentRoot, BeaconNodeTester.TestEvents["100"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["100"].CorrectSignedBeaconBlockMhKey)
validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectBeaconStateMhKey) validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectBeaconStateMhKey)
}) })
}) })
Context("Correctly formatted Altair Block", Label("leak-head"), func() { Context("Correctly formatted Altair Block", func() {
It("Should process it successfully.", func() { It("Should turn it into a struct successfully.", func() {
log.SetLevel(log.DebugLevel) bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0)
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{
head: BeaconNodeTester.TestEvents["2375703"].HeadMessage,
expectedEpoch: 74240,
expectStatus: "proposed",
})
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectParentRoot, BeaconNodeTester.TestEvents["2375703"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["2375703"].CorrectSignedBeaconBlockMhKey) validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectParentRoot, BeaconNodeTester.TestEvents["2375703"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["2375703"].CorrectSignedBeaconBlockMhKey)
validateBeaconState(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectBeaconStateMhKey) validateBeaconState(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectBeaconStateMhKey)
}) })
}) })
Context("Correctly formatted Altair Test Blocks", Label("correct-test-altairs"), func() { Context("Correctly formatted Altair Test Blocks", func() {
It("Should turn it into a struct successfully.", func() { It("Should turn it into a struct successfully.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry, 1, 0, 0)
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{
head: BeaconNodeTester.TestEvents["2375703-dummy"].HeadMessage,
expectedEpoch: 74240,
expectStatus: "proposed",
})
bc = setUpTest(BeaconNodeTester.TestConfig, "2375702") bc = setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{ BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
head: BeaconNodeTester.TestEvents["2375703-dummy-2"].HeadMessage, defer httpmock.DeactivateAndReset()
expectedEpoch: 74240, BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703-dummy-2"].HeadMessage, 74240, maxRetry, 1, 0, 0)
expectStatus: "proposed",
})
}) })
}) })
Context("Correctly formatted Phase0 Test Blocks", Label("correct-test-phase0"), func() { Context("Correctly formatted Phase0 Test Blocks", func() {
It("Should turn it into a struct successfully.", func() { It("Should turn it into a struct successfully.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 1, 0, 0)
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{
head: BeaconNodeTester.TestEvents["100-dummy"].HeadMessage,
expectedEpoch: 3,
expectStatus: "proposed",
})
bc = setUpTest(BeaconNodeTester.TestConfig, "99") bc = setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{ BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
head: BeaconNodeTester.TestEvents["100-dummy-2"].HeadMessage, defer httpmock.DeactivateAndReset()
expectedEpoch: 3, BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy-2"].HeadMessage, 3, maxRetry, 1, 0, 0)
expectStatus: "proposed",
})
//bc = setUpTest(BeaconNodeTester.TestConfig, "99")
//BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
//defer httpmock.DeactivateAndReset()
//BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy-2"].HeadMessage, 3, maxRetry, 1, 0, 0)
}) })
}) })
Context("Two consecutive correct blocks", Label("bug"), func() { Context("Two consecutive correct blocks", func() {
It("Should handle both blocks correctly, without any reorgs or known_gaps", func() { It("Should handle both blocks correctly, without any reorgs or known_gaps", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0)
bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 0, 0)
BeaconNodeTester.testProcessBlock(bc, maxRetry, 2, 0, 0, headBlocksSent{
head: BeaconNodeTester.TestEvents["100"].HeadMessage,
expectedEpoch: 3,
expectStatus: "proposed",
}, headBlocksSent{
head: BeaconNodeTester.TestEvents["101"].HeadMessage,
expectedEpoch: 3,
expectStatus: "proposed",
})
}) })
}) })
Context("Two consecutive blocks with a bad parent", Label("bad-parent"), func() { Context("Two consecutive blocks with a bad parent", func() {
It("Should add the previous block to the knownGaps table.", func() { It("Should add the previous block to the knownGaps table.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 1, 0, 0)
bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 1, 1)
BeaconNodeTester.testProcessBlock(bc, maxRetry, 2, 1, 1,
headBlocksSent{
head: BeaconNodeTester.TestEvents["100-dummy"].HeadMessage,
expectedEpoch: 3,
expectStatus: "forked",
},
headBlocksSent{
head: BeaconNodeTester.TestEvents["101"].HeadMessage,
expectedEpoch: 3,
expectStatus: "proposed",
})
}) })
}) })
Context("Phase 0: We have a correctly formated SSZ SignedBeaconBlock and BeaconState", func() { Context("Phase 0: We have a correctly formated SSZ SignedBeaconBlock and BeaconState", func() {
@@ -400,15 +348,10 @@ var _ = Describe("Capturehead", Label("head"), func() {
 //})
 Context("When the proper SSZ objects are not served", func() {
 It("Should return an error, and add the slot to the knownGaps table.", func() {
+bc := setUpTest(BeaconNodeTester.TestConfig, "101")
 BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
 defer httpmock.DeactivateAndReset()
-bc := setUpTest(BeaconNodeTester.TestConfig, "101")
-BeaconNodeTester.testProcessBlock(bc, maxRetry, 0, 1, 0, headBlocksSent{
-head: BeaconNodeTester.TestEvents["102-wrong-ssz-1"].HeadMessage,
-expectedEpoch: 3,
-expectStatus: "proposed",
-})
+BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["102-wrong-ssz-1"].HeadMessage, 3, maxRetry, 0, 1, 0)
 knownGapCount := countKnownGapsTable(bc.Db)
 Expect(knownGapCount).To(Equal(1))
@@ -416,7 +359,6 @@ var _ = Describe("Capturehead", Label("head"), func() {
 start, end := queryKnownGaps(bc.Db, "102", "102")
 Expect(start).To(Equal(102))
 Expect(end).To(Equal(102))
-})
 })
 })
@@ -424,10 +366,9 @@ var _ = Describe("Capturehead", Label("head"), func() {
 Describe("Known Gaps Scenario", Label("unit", "behavioral"), func() {
 Context("There is a gap at start up within one incrementing range.", func() {
 It("Should add only a single entry to the knownGaps table.", func() {
+bc := setUpTest(BeaconNodeTester.TestConfig, "10")
 BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
 defer httpmock.DeactivateAndReset()
-bc := setUpTest(BeaconNodeTester.TestConfig, "10")
 BeaconNodeTester.testKnownGapsMessages(bc, 100, 1, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage)
 start, end := queryKnownGaps(bc.Db, "11", "99")
 Expect(start).To(Equal(11))
@@ -436,10 +377,9 @@ var _ = Describe("Capturehead", Label("head"), func() {
 })
 Context("There is a gap at start up spanning multiple incrementing range.", func() {
 It("Should add multiple entries to the knownGaps table.", func() {
+bc := setUpTest(BeaconNodeTester.TestConfig, "5")
 BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
 defer httpmock.DeactivateAndReset()
-bc := setUpTest(BeaconNodeTester.TestConfig, "5")
 BeaconNodeTester.testKnownGapsMessages(bc, 10, 10, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage)
 start, end := queryKnownGaps(bc.Db, "6", "16")
@@ -451,12 +391,11 @@ var _ = Describe("Capturehead", Label("head"), func() {
 Expect(end).To(Equal(99))
 })
 })
-Context("Gaps between two head messages", Label("gap-head"), func() {
+Context("Gaps between two head messages", func() {
 It("Should add the slots in-between", func() {
+bc := setUpTest(BeaconNodeTester.TestConfig, "99")
 BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
 defer httpmock.DeactivateAndReset()
-bc := setUpTest(BeaconNodeTester.TestConfig, "99")
 BeaconNodeTester.testKnownGapsMessages(bc, 1000000, 3, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].HeadMessage)
 start, end := queryKnownGaps(bc.Db, "101", "1000101")
@@ -473,37 +412,33 @@ var _ = Describe("Capturehead", Label("head"), func() {
 Describe("ReOrg Scenario", Label("unit", "behavioral"), func() {
 Context("Altair: Multiple head messages for the same slot.", func() {
 It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
+bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
 BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
 defer httpmock.DeactivateAndReset()
-bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
 BeaconNodeTester.testMultipleHead(bc, TestEvents["2375703"].HeadMessage, TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry)
 })
 })
 Context("Phase0: Multiple head messages for the same slot.", func() {
 It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
+bc := setUpTest(BeaconNodeTester.TestConfig, "99")
 BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
 defer httpmock.DeactivateAndReset()
-bc := setUpTest(BeaconNodeTester.TestConfig, "99")
 BeaconNodeTester.testMultipleHead(bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry)
 })
 })
 Context("Phase 0: Multiple reorgs have occurred on this slot", func() {
 It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
+bc := setUpTest(BeaconNodeTester.TestConfig, "99")
 BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
 defer httpmock.DeactivateAndReset()
-bc := setUpTest(BeaconNodeTester.TestConfig, "99")
 BeaconNodeTester.testMultipleReorgs(bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100-dummy-2"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry)
 })
 })
 Context("Altair: Multiple reorgs have occurred on this slot", func() {
 It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
+bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
 BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
 defer httpmock.DeactivateAndReset()
-bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
 BeaconNodeTester.testMultipleReorgs(bc, TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703-dummy-2"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240, maxRetry)
 })
 })
@@ -588,25 +523,18 @@ func validateBeaconState(bc *beaconclient.BeaconClient, headMessage beaconclient
 }
 // Wrapper function to send a head message to the beaconclient
-func sendHeadMessage(bc *beaconclient.BeaconClient, maxRetry int, expectedSuccessfulInserts uint64, head ...beaconclient.Head) {
-var (
-data []byte
-err error
-)
+func sendHeadMessage(bc *beaconclient.BeaconClient, head beaconclient.Head, maxRetry int, expectedSuccessfulInserts uint64) {
+data, err := json.Marshal(head)
+Expect(err).ToNot(HaveOccurred())
 startInserts := atomic.LoadUint64(&bc.Metrics.SlotInserts)
-for _, ms := range head {
-data, err = json.Marshal(ms)
-Expect(err).ToNot(HaveOccurred())
-time.Sleep(1 * time.Second)
-bc.HeadTracking.MessagesCh <- &sse.Event{
-ID: []byte{},
-Data: data,
-Event: []byte{},
-Retry: []byte{},
-}
+bc.HeadTracking.MessagesCh <- &sse.Event{
+ID: []byte{},
+Data: data,
+Event: []byte{},
+Retry: []byte{},
 }
 curRetry := 0
 for atomic.LoadUint64(&bc.Metrics.SlotInserts) != startInserts+expectedSuccessfulInserts {
 time.Sleep(1 * time.Second)
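sendHeadMessage drives the client by marshalling a Head and injecting it straight into the SSE message channel, then polling a metrics counter until the expected inserts land. A reduced sketch of that inject-and-poll test pattern (types simplified, not the repo's actual structs):

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync/atomic"
	"time"
)

type Head struct {
	Slot string `json:"slot"`
}

func main() {
	var inserts uint64
	messages := make(chan []byte, 1)

	// Consumer: what CaptureHead's pipeline does, in miniature.
	go func() {
		for raw := range messages {
			var h Head
			if err := json.Unmarshal(raw, &h); err == nil {
				atomic.AddUint64(&inserts, 1)
			}
		}
	}()

	// Inject a fake event, then poll the counter like sendHeadMessage does.
	data, _ := json.Marshal(Head{Slot: "100"})
	messages <- data
	for atomic.LoadUint64(&inserts) != 1 {
		time.Sleep(10 * time.Millisecond)
	}
	fmt.Println("insert observed")
}
```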
@ -904,13 +832,13 @@ func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string
// Helper function to test three reorg messages. There are going to be many functions like this, // Helper function to test three reorg messages. There are going to be many functions like this,
// because we need to test the same logic for multiple phases. // because we need to test the same logic for multiple phases.
func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch int, maxRetry int) { func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch int, maxRetry int) {
startGoRoutines := runtime.NumGoroutine() go bc.CaptureHead()
ctx, cancel := context.WithCancel(context.Background())
go bc.CaptureHead(ctx, 2, true)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
log.Info("Sending Messages to BeaconClient") log.Info("Sending Messages to BeaconClient")
sendHeadMessage(bc, maxRetry, 3, firstHead, secondHead, thirdHead) sendHeadMessage(bc, firstHead, maxRetry, 1)
sendHeadMessage(bc, secondHead, maxRetry, 1)
sendHeadMessage(bc, thirdHead, maxRetry, 1)
curRetry := 0 curRetry := 0
for atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 2 { for atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 2 {
@ -962,22 +890,13 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs
validateSlot(bc, secondHead, epoch, "proposed") validateSlot(bc, secondHead, epoch, "proposed")
validateSlot(bc, thirdHead, epoch, "forked") validateSlot(bc, thirdHead, epoch, "forked")
cancel()
testStopHeadTracking(ctx, bc, startGoRoutines, true)
} }
// A test to validate a single block was processed correctly // A test to validate a single block was processed correctly
func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64, head ...headBlocksSent) { func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head beaconclient.Head, epoch int, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) {
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) go bc.CaptureHead()
startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background())
go bc.CaptureHead(ctx, 2, true)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
heads := make([]beaconclient.Head, 0) sendHeadMessage(bc, head, maxRetry, expectedSuccessInsert)
for _, msgs := range head {
heads = append(heads, msgs.head)
}
sendHeadMessage(bc, maxRetry, expectedSuccessInsert, heads...)
curRetry := 0 curRetry := 0
for atomic.LoadUint64(&bc.Metrics.KnownGapsInserts) != expectedKnownGaps { for atomic.LoadUint64(&bc.Metrics.KnownGapsInserts) != expectedKnownGaps {
@ -993,28 +912,23 @@ func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, maxRet
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
curRetry = curRetry + 1 curRetry = curRetry + 1
if curRetry == maxRetry { if curRetry == maxRetry {
Fail(fmt.Sprintf("Wrong reorg metrics, got: %d, wanted %d", bc.Metrics.ReorgInserts, expectedReorgs)) Fail(fmt.Sprintf("Wrong reorg metrics, got: %d, wanted %d", bc.Metrics.KnownGapsInserts, expectedKnownGaps))
} }
} }
if expectedSuccessInsert > 0 { if expectedSuccessInsert > 0 {
for _, msg := range head { validateSlot(bc, head, epoch, "proposed")
validateSlot(bc, msg.head, msg.expectedEpoch, msg.expectStatus)
}
} }
cancel()
testStopHeadTracking(ctx, bc, startGoRoutines, true)
} }
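Every retry loop in these helpers has the same shape: poll an atomic counter once per second and give up after maxRetry attempts. The wait as a hypothetical generic helper (an error return stands in for Ginkgo's Fail):

package sketch

import (
    "fmt"
    "sync/atomic"
    "time"
)

// waitForCounter polls counter until it reaches want, giving up after maxRetry one-second attempts.
func waitForCounter(counter *uint64, want uint64, maxRetry int, what string) error {
    for curRetry := 0; atomic.LoadUint64(counter) != want; curRetry++ {
        if curRetry == maxRetry {
            return fmt.Errorf("wrong %s metrics, got: %d, wanted %d", what, atomic.LoadUint64(counter), want)
        }
        time.Sleep(1 * time.Second)
    }
    return nil
}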
// A test that ensures that if two HeadMessages occur for a single slot they are marked // A test that ensures that if two HeadMessages occur for a single slot they are marked
// as proposed and forked correctly. // as proposed and forked correctly.
func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, epoch int, maxRetry int) { func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, epoch int, maxRetry int) {
startGoRoutines := runtime.NumGoroutine() go bc.CaptureHead()
ctx, cancel := context.WithCancel(context.Background())
go bc.CaptureHead(ctx, 2, true)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
sendHeadMessage(bc, maxRetry, 2, firstHead, secondHead) sendHeadMessage(bc, firstHead, maxRetry, 1)
sendHeadMessage(bc, secondHead, maxRetry, 1)
curRetry := 0 curRetry := 0
for atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 1 { for atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 1 {
@ -1032,22 +946,18 @@ func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstH
log.Info("Checking Altair to make sure the fork was marked properly.") log.Info("Checking Altair to make sure the fork was marked properly.")
validateSlot(bc, firstHead, epoch, "forked") validateSlot(bc, firstHead, epoch, "forked")
validateSlot(bc, secondHead, epoch, "proposed") validateSlot(bc, secondHead, epoch, "proposed")
cancel()
testStopHeadTracking(ctx, bc, startGoRoutines, true)
} }
// A test that ensures head messages arriving with slot gaps are recorded in the // A test that ensures head messages arriving with slot gaps are recorded in the
// known_gaps table correctly. // known_gaps table correctly.
func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, tableIncrement int, expectedEntries uint64, maxRetry int, msg ...beaconclient.Head) { func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, tableIncrement int, expectedEntries uint64, maxRetry int, msg ...beaconclient.Head) {
bc.KnownGapTableIncrement = tableIncrement bc.KnownGapTableIncrement = tableIncrement
go bc.CaptureHead()
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background())
go bc.CaptureHead(ctx, 2, true)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
sendHeadMessage(bc, maxRetry, 1, msg...) for _, headMsg := range msg {
sendHeadMessage(bc, headMsg, maxRetry, 1)
}
curRetry := 0 curRetry := 0
for atomic.LoadUint64(&bc.Metrics.KnownGapsInserts) != expectedEntries { for atomic.LoadUint64(&bc.Metrics.KnownGapsInserts) != expectedEntries {
@ -1065,8 +975,6 @@ func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, t
if atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 0 { if atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 0 {
Fail("We found reorgs when we didn't expect it") Fail("We found reorgs when we didn't expect it")
} }
cancel()
testStopHeadTracking(ctx, bc, startGoRoutines, true)
} }
// This function will make sure we are properly able to get the SszRoot of the SignedBeaconBlock and the BeaconState. // This function will make sure we are properly able to get the SszRoot of the SignedBeaconBlock and the BeaconState.
@ -1083,22 +991,3 @@ func testSszRoot(msg Message) {
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(msg.HeadMessage.Block).To(Equal("0x" + hex.EncodeToString(blockRoot[:]))) Expect(msg.HeadMessage.Block).To(Equal("0x" + hex.EncodeToString(blockRoot[:])))
} }
// A makeshift function to stop head tracking and ensure we don't have any goroutine leaks
func testStopHeadTracking(ctx context.Context, bc *beaconclient.BeaconClient, startGoRoutines int, skipSse bool) {
bc.StopHeadTracking(ctx, skipSse)
time.Sleep(3 * time.Second)
endNum := runtime.NumGoroutine()
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
log.WithField("startNum", startGoRoutines).Info("Start Go routine number")
log.WithField("endNum", endNum).Info("End Go routine number")
//Expect(endNum <= startGoRoutines).To(BeTrue())
Expect(endNum).To(Equal(startGoRoutines))
}
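The leak check is a plain before/after comparison of runtime.NumGoroutine. The same idea as a reusable sketch (hypothetical helper; the sleep is a pragmatic settle period, not a synchronization guarantee):

package sketch

import (
    "fmt"
    "runtime"
    "time"
)

// assertNoGoroutineLeak runs fn and verifies the goroutine count returns to its baseline.
func assertNoGoroutineLeak(settle time.Duration, fn func()) error {
    start := runtime.NumGoroutine()
    fn()
    time.Sleep(settle) // give stopped goroutines time to actually exit
    if end := runtime.NumGoroutine(); end != start {
        return fmt.Errorf("goroutine leak: started with %d goroutines, ended with %d", start, end)
    }
    return nil
}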
type headBlocksSent struct {
head beaconclient.Head
expectedEpoch int
expectStatus string
}

View File

@ -37,18 +37,14 @@ func (bc *BeaconClient) CaptureHistoric(ctx context.Context, maxWorkers int) []e
} }
// This function will perform all the necessary cleanup tasks for stopping historical processing. // This function will perform all the necessary cleanup tasks for stopping historical processing.
func (bc *BeaconClient) StopHistoricProcess(ctx context.Context) error { func (bc *BeaconClient) StopHistoric(cancel context.CancelFunc) error {
select { log.Info("We are stopping the historical processing service.")
case <-ctx.Done(): cancel()
log.Info("We are stopping the historical processing service.") err := bc.HistoricalProcess.releaseDbLocks()
err := bc.HistoricalProcess.releaseDbLocks() if err != nil {
if err != nil { loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.historic_processing table. Manual Intervention is needed!")
loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.historic_processing table. Manual Intervention is needed!")
}
return nil
default:
return fmt.Errorf("Tried to stop historic before the context ended...")
} }
return nil
} }
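One side of this hunk gates cleanup on the context actually being done, so DB locks are never released while workers may still be mid-flight. That guard, distilled into a sketch (releaseLocks stands in for releaseDbLocks):

package sketch

import (
    "context"
    "fmt"
    "log"
)

// stopAfterCancel performs cleanup only if ctx has already been cancelled.
func stopAfterCancel(ctx context.Context, releaseLocks func() error) error {
    select {
    case <-ctx.Done():
        log.Println("stopping the processing service")
        if err := releaseLocks(); err != nil {
            log.Printf("unable to release DB locks, manual intervention needed: %v", err)
        }
        return nil
    default:
        return fmt.Errorf("tried to stop before the context ended")
    }
}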
// An interface to enforce any batch processing. Currently there are two use cases for this. // An interface to enforce any batch processing. Currently there are two use cases for this.
@ -97,9 +93,9 @@ type batchHistoricError struct {
// 5. Handle any errors. // 5. Handle any errors.
func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics, checkDb bool, incrementTracker func(uint64)) []error { func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics, checkDb bool, incrementTracker func(uint64)) []error {
slotsCh := make(chan slotsToProcess) slotsCh := make(chan slotsToProcess)
workCh := make(chan int, 5) workCh := make(chan int)
processedCh := make(chan slotsToProcess, 5) processedCh := make(chan slotsToProcess)
errCh := make(chan batchHistoricError, 5) errCh := make(chan batchHistoricError)
finalErrCh := make(chan []error, 1) finalErrCh := make(chan []error, 1)
// Checkout Rows with same node Identifier. // Checkout Rows with same node Identifier.
@ -120,8 +116,6 @@ func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing,
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
close(workCh)
close(processedCh)
return return
case slots := <-slotsCh: case slots := <-slotsCh:
if slots.startSlot > slots.endSlot { if slots.startSlot > slots.endSlot {
@ -170,9 +164,8 @@ func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing,
errs := bp.getSlotRange(ctx, slotsCh) // Periodically adds new entries.... errs := bp.getSlotRange(ctx, slotsCh) // Periodically adds new entries....
if errs != nil { if errs != nil {
finalErrCh <- errs finalErrCh <- errs
} else {
finalErrCh <- nil
} }
finalErrCh <- nil
log.Debug("We are stopping the processing of adding new entries") log.Debug("We are stopping the processing of adding new entries")
}() }()
log.Debug("Waiting for shutdown signal from channel") log.Debug("Waiting for shutdown signal from channel")

View File

@ -3,7 +3,6 @@ package beaconclient_test
import ( import (
"context" "context"
"fmt" "fmt"
"runtime"
"sync/atomic" "sync/atomic"
"time" "time"
@ -25,18 +24,17 @@ var _ = Describe("Capturehistoric", func() {
Describe("Run the application in historic mode", Label("unit", "behavioral", "historical"), func() { Describe("Run the application in historic mode", Label("unit", "behavioral", "historical"), func() {
Context("Phase0 + Altairs: When we need to process a multiple blocks in a multiple entries in the eth_beacon.historic_process table.", Label("deb"), func() { Context("Phase0 + Altairs: When we need to process a multiple blocks in a multiple entries in the eth_beacon.historic_process table.", Label("deb"), func() {
It("Successfully Process the Blocks", func() { It("Successfully Process the Blocks", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 101, 10) BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 101, 10)
BeaconNodeTester.runHistoricalProcess(bc, 2, 2, 0, 0, 0) BeaconNodeTester.runHistoricalProcess(bc, 2, 2, 0, 0, 0)
// Run two separate processes // Run two separate processes
BeaconNodeTester.writeEventToHistoricProcess(bc, 2375703, 2375703, 10) BeaconNodeTester.writeEventToHistoricProcess(bc, 2375703, 2375703, 10)
BeaconNodeTester.runHistoricalProcess(bc, 2, 3, 0, 0, 0) BeaconNodeTester.runHistoricalProcess(bc, 2, 3, 0, 0, 0)
time.Sleep(2 * time.Second)
validatePopularBatchBlocks(bc) validatePopularBatchBlocks(bc)
}) })
}) })
Context("When the start block is greater than the endBlock", func() { Context("When the start block is greater than the endBlock", func() {
@ -72,12 +70,11 @@ var _ = Describe("Capturehistoric", func() {
}) })
}) })
Describe("Running the Application to process Known Gaps", Label("unit", "behavioral", "knownGaps"), func() { Describe("Running the Application to process Known Gaps", Label("unit", "behavioral", "knownGaps"), func() {
Context("Phase0 + Altairs: When we need to process a multiple blocks in a multiple entries in the eth_beacon.known_gaps table.", Label("leak"), func() { Context("Phase0 + Altairs: When we need to process a multiple blocks in a multiple entries in the eth_beacon.known_gaps table.", func() {
It("Successfully Process the Blocks", func() { It("Successfully Process the Blocks", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.writeEventToKnownGaps(bc, 100, 101) BeaconNodeTester.writeEventToKnownGaps(bc, 100, 101)
BeaconNodeTester.runKnownGapsProcess(bc, 2, 2, 0, 0, 0) BeaconNodeTester.runKnownGapsProcess(bc, 2, 2, 0, 0, 0)
// Run two separate processes // Run two separate processes
@ -86,7 +83,6 @@ var _ = Describe("Capturehistoric", func() {
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
validatePopularBatchBlocks(bc) validatePopularBatchBlocks(bc)
}) })
}) })
Context("When the start block is greater than the endBlock", func() { Context("When the start block is greater than the endBlock", func() {
@ -111,18 +107,13 @@ var _ = Describe("Capturehistoric", func() {
}) })
}) })
Describe("Running the application in Historic, Head, and KnownGaps mode", Label("unit", "historical", "full"), func() { Describe("Running the application in Historic, Head, and KnownGaps mode", Label("unit", "historical", "full"), func() {
Context("When it recieves a head, historic and known Gaps message (in order)", Label("deb"), func() { Context("When it recieves a head, historic and known Gaps message (in order)", func() {
It("Should process them all successfully.", func() { It("Should process them all successfully.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
// Head // Head
BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{ BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0)
head: BeaconNodeTester.TestEvents["2375703"].HeadMessage,
expectedEpoch: 74240,
expectStatus: "proposed",
})
// Historical // Historical
BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 100, 10) BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 100, 10)
@ -134,25 +125,19 @@ var _ = Describe("Capturehistoric", func() {
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
validatePopularBatchBlocks(bc) validatePopularBatchBlocks(bc)
}) })
}) })
Context("When it recieves a historic, head and known Gaps message (in order)", func() { Context("When it recieves a historic, head and known Gaps message (in order)", func() {
It("Should process them all successfully.", func() { It("Should process them all successfully.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
// Historical // Historical
BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 100, 10) BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 100, 10)
BeaconNodeTester.runHistoricalProcess(bc, 2, 1, 0, 0, 0) BeaconNodeTester.runHistoricalProcess(bc, 2, 1, 0, 0, 0)
// Head // Head
BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{ BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0)
head: BeaconNodeTester.TestEvents["2375703"].HeadMessage,
expectedEpoch: 74240,
expectStatus: "proposed",
})
// Known Gaps // Known Gaps
BeaconNodeTester.writeEventToKnownGaps(bc, 101, 101) BeaconNodeTester.writeEventToKnownGaps(bc, 101, 101)
@ -164,10 +149,9 @@ var _ = Describe("Capturehistoric", func() {
}) })
Context("When it recieves a known Gaps, historic and head message (in order)", func() { Context("When it recieves a known Gaps, historic and head message (in order)", func() {
It("Should process them all successfully.", func() { It("Should process them all successfully.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
// Known Gaps // Known Gaps
BeaconNodeTester.writeEventToKnownGaps(bc, 101, 101) BeaconNodeTester.writeEventToKnownGaps(bc, 101, 101)
BeaconNodeTester.runKnownGapsProcess(bc, 2, 1, 0, 0, 0) BeaconNodeTester.runKnownGapsProcess(bc, 2, 1, 0, 0, 0)
@ -177,11 +161,7 @@ var _ = Describe("Capturehistoric", func() {
BeaconNodeTester.runHistoricalProcess(bc, 2, 2, 0, 0, 0) BeaconNodeTester.runHistoricalProcess(bc, 2, 2, 0, 0, 0)
// Head // Head
BeaconNodeTester.testProcessBlock(bc, maxRetry, 1, 0, 0, headBlocksSent{ BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0)
head: BeaconNodeTester.TestEvents["2375703"].HeadMessage,
expectedEpoch: 74240,
expectStatus: "proposed",
})
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
validatePopularBatchBlocks(bc) validatePopularBatchBlocks(bc)
@ -220,23 +200,25 @@ func (tbc TestBeaconNode) writeEventToHistoricProcess(bc *beaconclient.BeaconCli
// Start the CaptureHistoric function, and check for the correct inserted slots. // Start the CaptureHistoric function, and check for the correct inserted slots.
func (tbc TestBeaconNode) runHistoricalProcess(bc *beaconclient.BeaconClient, maxWorkers int, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { func (tbc TestBeaconNode) runHistoricalProcess(bc *beaconclient.BeaconClient, maxWorkers int, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) {
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go bc.CaptureHistoric(ctx, maxWorkers) go bc.CaptureHistoric(ctx, maxWorkers)
validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError) validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError)
cancel() log.Debug("Calling the stop function for historical processing..")
testStopHistoricProcessing(ctx, bc, startGoRoutines) err := bc.StopHistoric(cancel)
time.Sleep(5 * time.Second)
Expect(err).ToNot(HaveOccurred())
validateAllRowsCheckedOut(bc.Db, hpCheckCheckedOutStmt)
} }
// Wrapper function that processes knownGaps // Wrapper function that processes knownGaps
func (tbc TestBeaconNode) runKnownGapsProcess(bc *beaconclient.BeaconClient, maxWorkers int, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { func (tbc TestBeaconNode) runKnownGapsProcess(bc *beaconclient.BeaconClient, maxWorkers int, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) {
startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go bc.ProcessKnownGaps(ctx, maxWorkers) go bc.ProcessKnownGaps(ctx, maxWorkers)
validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError) validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError)
cancel() err := bc.StopKnownGapsProcessing(cancel)
testStopKnownGapProcessing(ctx, bc, startGoRoutines) time.Sleep(5 * time.Second)
Expect(err).ToNot(HaveOccurred())
validateAllRowsCheckedOut(bc.Db, kgCheckCheckedOutStmt)
} }
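Both wrappers now share one lifecycle: derive a cancellable context, launch the service, assert on metrics, cancel, then confirm every checked-out row was released. Condensed into a sketch (run, validate, and verify stand in for the calls above):

package sketch

import "context"

// runAndStop drives a context-scoped service through one validate/stop cycle.
func runAndStop(run func(context.Context), validate func() error, verify func() error) error {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    go run(ctx)
    if err := validate(); err != nil { // e.g. wait for the expected inserts
        return err
    }
    cancel()        // signal the service to shut down
    return verify() // e.g. confirm all DB rows were checked back in
}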
func validateMetrics(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { func validateMetrics(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) {
@ -306,37 +288,3 @@ func validateAllRowsCheckedOut(db sql.Database, checkStmt string) {
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(rows).To(Equal(int64(0))) Expect(rows).To(Equal(int64(0)))
} }
// A makeshift function to stop historic processing and ensure we don't have any goroutine leaks
func testStopHistoricProcessing(ctx context.Context, bc *beaconclient.BeaconClient, startGoRoutines int) {
log.Debug("Calling the stop function for historical processing..")
err := bc.StopHistoricProcess(ctx)
Expect(err).ToNot(HaveOccurred())
time.Sleep(5 * time.Second)
validateAllRowsCheckedOut(bc.Db, hpCheckCheckedOutStmt)
time.Sleep(5 * time.Second)
endNum := runtime.NumGoroutine()
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
//Expect(endNum <= startGoRoutines).To(BeTrue())
log.WithField("startNum", startGoRoutines).Info("Start Go routine number")
log.WithField("endNum", endNum).Info("End Go routine number")
Expect(endNum).To(Equal(startGoRoutines))
}
// A makeshift function to stop known gaps processing and ensure we don't have any goroutine leaks
func testStopKnownGapProcessing(ctx context.Context, bc *beaconclient.BeaconClient, startGoRoutines int) {
log.Debug("Calling the stop function for knownGaps processing..")
err := bc.StopKnownGapsProcessing(ctx)
Expect(err).ToNot(HaveOccurred())
time.Sleep(5 * time.Second)
validateAllRowsCheckedOut(bc.Db, kgCheckCheckedOutStmt)
time.Sleep(3 * time.Second)
endNum := runtime.NumGoroutine()
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
//Expect(endNum <= startGoRoutines).To(BeTrue())
Expect(endNum).To(Equal(startGoRoutines))
}

View File

@ -18,37 +18,44 @@
package beaconclient package beaconclient
import ( import (
"context"
"encoding/json" "encoding/json"
"time" "time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/loghelper" "github.com/vulcanize/ipld-eth-beacon-indexer/pkg/loghelper"
"golang.org/x/sync/errgroup"
)
var (
shutdownWaitInterval = time.Duration(5) * time.Second
) )
// This function will capture all the SSE events for a given SseEvents object. // This function will capture all the SSE events for a given SseEvents object.
// When new messages come in, it will ensure that they are decoded into JSON. // When new messages come in, it will ensure that they are decoded into JSON.
// If any errors occur, it logs the error information. // If any errors occur, it logs the error information.
func handleIncomingSseEvent[P ProcessedEvents](ctx context.Context, eventHandler *SseEvents[P], errMetricInc func(uint64), skipSse bool) { func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMetricInc func(uint64)) {
if !skipSse { go func() {
for { errG := new(errgroup.Group)
err := eventHandler.SseClient.SubscribeChanRawWithContext(ctx, eventHandler.MessagesCh) errG.Go(func() error {
err := eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh)
if err != nil { if err != nil {
loghelper.LogEndpoint(eventHandler.Endpoint).WithFields(log.Fields{ return err
"err": err}).Error("We are unable to subscribe to the SSE endpoint")
time.Sleep(3 * time.Second)
continue
} }
return nil
})
if err := errG.Wait(); err != nil {
log.WithFields(log.Fields{
"err": err,
"endpoint": eventHandler.Endpoint,
}).Error("Unable to subscribe to the SSE endpoint.")
return
} else {
loghelper.LogEndpoint(eventHandler.Endpoint).Info("Successfully subscribed to the event stream.") loghelper.LogEndpoint(eventHandler.Endpoint).Info("Successfully subscribed to the event stream.")
break
} }
}
}()
for { for {
select { select {
case <-ctx.Done():
close(eventHandler.ProcessCh)
return
case message := <-eventHandler.MessagesCh: case message := <-eventHandler.MessagesCh:
// Message can be nil if it's a keep-alive message // Message can be nil if it's a keep-alive message
if len(message.Data) != 0 { if len(message.Data) != 0 {
@ -69,24 +76,23 @@ func handleIncomingSseEvent[P ProcessedEvents](ctx context.Context, eventHandler
} }
// Turn the data object into a struct. // Turn the data object into a struct.
func processMsg[P ProcessedEvents](msg []byte, processCh chan<- P, errorCh chan<- SseError) { func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan<- *SseError) {
var msgMarshaled P var msgMarshaled P
err := json.Unmarshal(msg, &msgMarshaled) err := json.Unmarshal(msg, &msgMarshaled)
if err != nil { if err != nil {
loghelper.LogError(err).Error("Unable to parse message") loghelper.LogError(err).Error("Unable to parse message")
errorCh <- SseError{ errorCh <- &SseError{
err: err, err: err,
msg: msg, msg: msg,
} }
return return
} }
processCh <- msgMarshaled processCh <- &msgMarshaled
log.Debug("Done sending")
} }
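Because processMsg is generic over the payload, one decoder serves both the head and reorg streams. The decode-into-channel pattern in a self-contained sketch (hypothetical ChainReorg type; constrained to any rather than ProcessedEvents for brevity, with value channels as in one side of the hunk above):

package main

import (
    "encoding/json"
    "fmt"
)

type ChainReorg struct {
    Slot         string `json:"slot"`
    NewHeadBlock string `json:"new_head_block"`
}

// decodeTo unmarshals raw JSON into P and forwards it, routing failures to errCh.
func decodeTo[P any](msg []byte, processCh chan<- P, errCh chan<- error) {
    var decoded P
    if err := json.Unmarshal(msg, &decoded); err != nil {
        errCh <- fmt.Errorf("unable to parse message %q: %w", msg, err)
        return
    }
    processCh <- decoded
}

func main() {
    processCh := make(chan ChainReorg, 1)
    errCh := make(chan error, 1)
    decodeTo([]byte(`{"slot":"100","new_head_block":"0xabc"}`), processCh, errCh)
    fmt.Printf("%+v\n", <-processCh)
}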
// Capture all of the event topics. // Capture all of the event topics.
func (bc *BeaconClient) captureEventTopic(ctx context.Context, skipSse bool) { func (bc *BeaconClient) captureEventTopic() {
log.Info("We are capturing all SSE events") log.Info("We are capturing all SSE events")
go handleIncomingSseEvent(ctx, bc.HeadTracking, bc.Metrics.IncrementHeadError, skipSse) go handleIncomingSseEvent(bc.HeadTracking, bc.Metrics.IncrementHeadError)
go handleIncomingSseEvent(ctx, bc.ReOrgTracking, bc.Metrics.IncrementReorgError, skipSse) go handleIncomingSseEvent(bc.ReOrgTracking, bc.Metrics.IncrementReorgError)
} }
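The retry-based subscription above drops the errgroup: instead of aborting on the first failure, it retries the context-aware subscribe call every three seconds until it succeeds. That loop, sketched against a generic subscribe function so no exact client signature is assumed:

package sketch

import (
    "context"
    "log"
    "time"
)

// subscribeWithRetry keeps trying to open the SSE stream until it succeeds or ctx ends.
func subscribeWithRetry(ctx context.Context, endpoint string, subscribe func(context.Context) error) {
    for {
        err := subscribe(ctx)
        if err == nil {
            log.Printf("successfully subscribed to the event stream at %s", endpoint)
            return
        }
        log.Printf("unable to subscribe to the SSE endpoint %s: %v", endpoint, err)
        select {
        case <-ctx.Done():
            return
        case <-time.After(3 * time.Second): // back off before the next attempt
        }
    }
}

Waiting on ctx.Done() alongside the backoff keeps shutdown prompt even while the endpoint is unreachable.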

View File

@ -19,102 +19,59 @@
package beaconclient package beaconclient
import ( import (
"context"
"fmt" "fmt"
"strconv" "strconv"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/database/sql"
) )
// This function will perform the necessary steps to handle a reorg. // This function will perform the necessary steps to handle a reorg.
func (bc *BeaconClient) handleReorg(ctx context.Context) { func (bc *BeaconClient) handleReorg() {
log.Info("Starting to process reorgs.") log.Info("Starting to process reorgs.")
for { for {
select { reorg := <-bc.ReOrgTracking.ProcessCh
case <-ctx.Done(): log.WithFields(log.Fields{"reorg": reorg}).Debug("Received a new reorg message.")
return writeReorgs(bc.Db, reorg.Slot, reorg.NewHeadBlock, bc.Metrics)
case reorg := <-bc.ReOrgTracking.ProcessCh:
log.WithFields(log.Fields{"reorg": reorg}).Debug("Received a new reorg message.")
writeReorgs(bc.Db, reorg.Slot, reorg.NewHeadBlock, bc.Metrics)
}
} }
} }
// This function will handle the latest head event. // This function will handle the latest head event.
func (bc *BeaconClient) handleHead(ctx context.Context, maxWorkers int) { func (bc *BeaconClient) handleHead() {
log.Info("Starting to process head.") log.Info("Starting to process head.")
workCh := make(chan workParams, 5)
log.WithField("workerNumber", maxWorkers).Info("Creating Workers")
for i := 0; i < maxWorkers; i++ {
go bc.headBlockProcessor(ctx, workCh)
}
errorSlots := 0 errorSlots := 0
for { for {
select { head := <-bc.HeadTracking.ProcessCh
case <-ctx.Done(): // Process all the work here.
close(workCh) slot, err := strconv.Atoi(head.Slot)
return if err != nil {
case head := <-bc.HeadTracking.ProcessCh: bc.HeadTracking.ErrorCh <- &SseError{
err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot),
// Process all the work here.
slot, err := strconv.Atoi(head.Slot)
if err != nil {
bc.HeadTracking.ErrorCh <- SseError{
err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot),
}
errorSlots = errorSlots + 1
continue
} }
if errorSlots != 0 && bc.PreviousSlot != 0 { errorSlots = errorSlots + 1
log.WithFields(log.Fields{ continue
"lastProcessedSlot": bc.PreviousSlot,
"errorSlots": errorSlots,
}).Warn("We added slots to the knownGaps table because we got bad head messages.")
writeKnownGaps(bc.Db, bc.KnownGapTableIncrement, bc.PreviousSlot+1, slot, fmt.Errorf("Bad Head Messages"), "headProcessing", bc.Metrics)
errorSlots = 0
}
log.WithFields(log.Fields{"head": head}).Debug("We are going to start processing the slot.")
// Not used anywhere yet but might be useful to have.
if bc.PreviousSlot == 0 && bc.PreviousBlockRoot == "" {
bc.StartingSlot = slot
}
workCh <- workParams{db: bc.Db, serverEndpoint: bc.ServerEndpoint, slot: slot, blockRoot: head.Block, stateRoot: head.State, previousSlot: bc.PreviousSlot, previousBlockRoot: bc.PreviousBlockRoot, metrics: bc.Metrics, knownGapsTableIncrement: bc.KnownGapTableIncrement, checkDb: bc.CheckDb}
log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished sending this slot to the workCh")
// Update the previous block
bc.PreviousSlot = slot
bc.PreviousBlockRoot = head.Block
} }
if errorSlots != 0 && bc.PreviousSlot != 0 {
log.WithFields(log.Fields{
"lastProcessedSlot": bc.PreviousSlot,
"errorSlots": errorSlots,
}).Warn("We added slots to the knownGaps table because we got bad head messages.")
writeKnownGaps(bc.Db, bc.KnownGapTableIncrement, bc.PreviousSlot+1, slot, fmt.Errorf("Bad Head Messages"), "headProcessing", bc.Metrics)
errorSlots = 0
}
log.WithFields(log.Fields{"head": head}).Debug("We are going to start processing the slot.")
// Not used anywhere yet but might be useful to have.
if bc.PreviousSlot == 0 && bc.PreviousBlockRoot == "" {
bc.StartingSlot = slot
}
go processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement, bc.CheckDb)
log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished calling processHeadSlot.")
// Update the previous block
bc.PreviousSlot = slot
bc.PreviousBlockRoot = head.Block
} }
} }
// A worker that will process head slots.
func (bc *BeaconClient) headBlockProcessor(ctx context.Context, workCh <-chan workParams) {
for {
select {
case <-ctx.Done():
return
case wp := <-workCh:
processHeadSlot(ctx, wp.db, wp.serverEndpoint, wp.slot, wp.blockRoot, wp.stateRoot, wp.previousSlot, wp.previousBlockRoot, wp.metrics, wp.knownGapsTableIncrement, wp.checkDb)
}
}
}
// A struct used to pass parameters to the worker.
type workParams struct {
db sql.Database
serverEndpoint string
slot int
blockRoot string
stateRoot string
previousSlot int
previousBlockRoot string
metrics *BeaconClientMetrics
knownGapsTableIncrement int
checkDb bool
}
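The pooled variant bounds head processing: a fixed number of workers drain a buffered channel, and cancellation gives every worker the same exit path. A self-contained sketch of that shape (process stands in for processHeadSlot):

package main

import (
    "context"
    "fmt"
    "sync"
)

// runPool starts n workers that drain workCh until ctx is cancelled or the channel closes.
func runPool(ctx context.Context, n int, workCh <-chan int, process func(slot int)) *sync.WaitGroup {
    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                case slot, ok := <-workCh:
                    if !ok {
                        return
                    }
                    process(slot)
                }
            }
        }()
    }
    return &wg
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    workCh := make(chan int, 5)
    wg := runPool(ctx, 2, workCh, func(slot int) { fmt.Println("processed slot", slot) })
    for slot := 100; slot < 105; slot++ {
        workCh <- slot
    }
    close(workCh)
    wg.Wait()
}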

View File

@ -132,7 +132,6 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm
for len(errCount) < 5 { for len(errCount) < 5 {
select { select {
case <-ctx.Done(): case <-ctx.Done():
close(slotCh)
return errCount return errCount
default: default:
if len(errCount) != prevErrCount { if len(errCount) != prevErrCount {
@ -229,7 +228,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm
// After a row has been processed it should be removed from its appropriate table. // After a row has been processed it should be removed from its appropriate table.
func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan slotsToProcess, checkProcessedStmt, removeStmt string) error { func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan slotsToProcess, checkProcessedStmt, removeStmt string) error {
errCh := make(chan error, 1) errCh := make(chan error)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -242,28 +241,25 @@ func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan
"endSlot": slots.endSlot, "endSlot": slots.endSlot,
}).Debug("Starting to check to see if the following slots have been processed") }).Debug("Starting to check to see if the following slots have been processed")
for { for {
select { isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.startSlot))
case <-ctx.Done(): if err != nil {
return errCh <- err
default:
isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.startSlot))
if err != nil {
errCh <- err
}
isEndProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.endSlot))
if err != nil {
errCh <- err
}
if isStartProcess && isEndProcess {
_, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), strconv.Itoa(slots.endSlot))
if err != nil {
errCh <- err
}
return
}
time.Sleep(3 * time.Second)
} }
isEndProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.endSlot))
if err != nil {
errCh <- err
}
if isStartProcess && isEndProcess {
break
}
time.Sleep(3 * time.Second)
} }
_, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), strconv.Itoa(slots.endSlot))
if err != nil {
errCh <- err
}
}() }()
if len(errCh) != 0 { if len(errCh) != 0 {
return <-errCh return <-errCh
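The select-gated variant of the polling loop yields to cancellation between checks instead of spinning until both slots report processed. Distilled (isProcessed stands in for isSlotProcessed):

package sketch

import (
    "context"
    "time"
)

// waitProcessed polls until both boundary slots are processed, or returns early on cancellation.
func waitProcessed(ctx context.Context, start, end int, isProcessed func(slot int) (bool, error)) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }
        startDone, err := isProcessed(start)
        if err != nil {
            return err
        }
        endDone, err := isProcessed(end)
        if err != nil {
            return err
        }
        if startDone && endDone {
            return nil
        }
        time.Sleep(3 * time.Second)
    }
}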

View File

@ -20,7 +20,6 @@ package beaconclient
import ( import (
"context" "context"
"fmt"
"strconv" "strconv"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -68,18 +67,14 @@ func (bc *BeaconClient) ProcessKnownGaps(ctx context.Context, maxWorkers int) []
} }
// This function will perform all the necessary cleanup tasks for stopping known gaps processing. // This function will perform all the necessary cleanup tasks for stopping known gaps processing.
func (bc *BeaconClient) StopKnownGapsProcessing(ctx context.Context) error { func (bc *BeaconClient) StopKnownGapsProcessing(cancel context.CancelFunc) error {
select { log.Info("We are stopping the known gaps processing service.")
case <-ctx.Done(): cancel()
log.Info("We are stopping the known gaps processing service.") err := bc.KnownGapsProcess.releaseDbLocks()
err := bc.KnownGapsProcess.releaseDbLocks() if err != nil {
if err != nil { loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.known_gaps table. Manual Intervention is needed!")
loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.known_gaps table. Manual Intervention is needed!")
}
return nil
default:
return fmt.Errorf("Tried to stop knownGaps Processing without closing the context..")
} }
return nil
} }
// Get a single row of historical slots from the table. // Get a single row of historical slots from the table.

View File

@ -156,6 +156,10 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
if err := g.Wait(); err != nil { if err := g.Wait(); err != nil {
// Make sure channel is empty. // Make sure channel is empty.
select {
case <-vUnmarshalerCh:
default:
}
return err, "processSlot" return err, "processSlot"
} }
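The select/default added here is Go's idiomatic non-blocking drain: if the unmarshaler channel still holds a value when the errgroup fails, discard it so the sender is not left blocked. In miniature:

package main

import "fmt"

func main() {
    ch := make(chan string, 1)
    ch <- "versioned unmarshaler"
    // Non-blocking drain: receive if a value is pending, fall through if not.
    select {
    case v := <-ch:
        fmt.Println("discarded pending value:", v)
    default:
        fmt.Println("channel already empty")
    }
}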
@ -235,12 +239,12 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
} }
// Handle a slot that is at head. A wrapper function for calling `processFullSlot`. // Handle a slot that is at head. A wrapper function for calling `processFullSlot`.
func processHeadSlot(ctx context.Context, db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) { func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) {
// Get the knownGaps at startup. // Get the knownGaps at startup.
if previousSlot == 0 && previousBlockRoot == "" { if previousSlot == 0 && previousBlockRoot == "" {
writeStartUpGaps(db, knownGapsTableIncrement, slot, metrics) writeStartUpGaps(db, knownGapsTableIncrement, slot, metrics)
} }
err, errReason := processFullSlot(ctx, db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement, checkDb) err, errReason := processFullSlot(context.Background(), db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement, checkDb)
if err != nil { if err != nil {
writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics) writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics)
} }
@ -292,20 +296,14 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string, vmCh <-chan *d
// Update the SszBeaconState and FullBeaconState object with their respective values. // Update the SszBeaconState and FullBeaconState object with their respective values.
func (ps *ProcessSlot) getBeaconState(serverEndpoint string, vmCh chan<- *dt.VersionedUnmarshaler) error { func (ps *ProcessSlot) getBeaconState(serverEndpoint string, vmCh chan<- *dt.VersionedUnmarshaler) error {
var ( var stateIdentifier string // Used to query the state
stateIdentifier string // Used to query the state
err error
)
if ps.StateRoot != "" { if ps.StateRoot != "" {
stateIdentifier = ps.StateRoot stateIdentifier = ps.StateRoot
} else { } else {
stateIdentifier = strconv.Itoa(ps.Slot) stateIdentifier = strconv.Itoa(ps.Slot)
} }
stateEndpoint := serverEndpoint + BcStateQueryEndpoint + stateIdentifier stateEndpoint := serverEndpoint + BcStateQueryEndpoint + stateIdentifier
ps.SszBeaconState, _, err = querySsz(stateEndpoint, strconv.Itoa(ps.Slot)) ps.SszBeaconState, _, _ = querySsz(stateEndpoint, strconv.Itoa(ps.Slot))
if err != nil {
return fmt.Errorf("Unable to querrySSZ")
}
versionedUnmarshaler, err := dt.FromState(*ps.SszBeaconState) versionedUnmarshaler, err := dt.FromState(*ps.SszBeaconState)
if err != nil { if err != nil {

View File

@ -43,20 +43,12 @@ func querySsz(endpoint string, slot string) (*[]byte, int, error) {
return nil, 0, fmt.Errorf("Unable to query Beacon Node: %s", err.Error()) return nil, 0, fmt.Errorf("Unable to query Beacon Node: %s", err.Error())
} }
defer response.Body.Close() defer response.Body.Close()
// Needed for testing.... But might be interesting to test with...
defer client.CloseIdleConnections()
rc := response.StatusCode rc := response.StatusCode
//var body []byte
//io.Copy(body, response.Body)
//bytes.buffer...
//_, err = response.Body.Read(body)
body, err := ioutil.ReadAll(response.Body) body, err := ioutil.ReadAll(response.Body)
if err != nil { if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!") loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!")
return nil, rc, fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error()) return nil, rc, fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error())
} }
//log.WithField("body", unsafe.Sizeof(body)).Debug("Size of the raw SSZ object")
return &body, rc, nil return &body, rc, nil
} }
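Stripped of the commented-out experiments, querySsz reduces to a single HTTP GET whose body and status code both matter to the caller. A sketch of that shape (io.ReadAll is the modern spelling of ioutil.ReadAll; the Accept header is an assumption about how a beacon node is asked for SSZ):

package sketch

import (
    "fmt"
    "io"
    "net/http"
)

// fetchSsz returns the raw response body and status code from an SSZ endpoint.
func fetchSsz(client *http.Client, endpoint string) ([]byte, int, error) {
    req, err := http.NewRequest(http.MethodGet, endpoint, nil)
    if err != nil {
        return nil, 0, err
    }
    req.Header.Set("Accept", "application/octet-stream") // request SSZ rather than JSON
    resp, err := client.Do(req)
    if err != nil {
        return nil, 0, fmt.Errorf("unable to query beacon node: %w", err)
    }
    defer resp.Body.Close()
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return nil, resp.StatusCode, fmt.Errorf("unable to read response body: %w", err)
    }
    return body, resp.StatusCode, nil
}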

View File

@ -1,13 +1,11 @@
package beaconclient_test package beaconclient_test
import ( import (
"context"
"os" "os"
"strconv" "strconv"
"time" "time"
. "github.com/onsi/ginkgo/v2" . "github.com/onsi/ginkgo/v2"
//. "github.com/onsi/gomega" //. "github.com/onsi/gomega"
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/beaconclient" "github.com/vulcanize/ipld-eth-beacon-indexer/pkg/beaconclient"
) )
@ -30,7 +28,7 @@ var (
) )
var _ = Describe("Systemvalidation", Label("system"), func() { var _ = Describe("Systemvalidation", Label("system"), func() {
Describe("Run the application against a running lighthouse node", func() { Describe("Run the application against a running lighthouse node", func() {
Context("When we receive head messages", Label("system-head"), func() { Context("When we receive head messages", func() {
It("We should process the messages successfully", func() { It("We should process the messages successfully", func() {
bc := setUpTest(prodConfig, "10000000000") bc := setUpTest(prodConfig, "10000000000")
processProdHeadBlocks(bc, 3, 0, 0, 0) processProdHeadBlocks(bc, 3, 0, 0, 0)
@ -65,23 +63,7 @@ func getEnvInt(envVar string) int {
// Start head tracking and wait for the expected results. // Start head tracking and wait for the expected results.
func processProdHeadBlocks(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { func processProdHeadBlocks(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) {
//startGoRoutines := runtime.NumGoroutine() go bc.CaptureHead()
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) time.Sleep(1 * time.Second)
ctx, cancel := context.WithCancel(context.Background())
go bc.CaptureHead(ctx, 2, false)
time.Sleep(5 * time.Second)
validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError) validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError)
cancel()
time.Sleep(4 * time.Second)
testStopSystemHeadTracking(ctx, bc)
}
// Custom stop for system testing
func testStopSystemHeadTracking(ctx context.Context, bc *beaconclient.BeaconClient) {
bc.StopHeadTracking(ctx, false)
time.Sleep(3 * time.Second)
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
//Expect(endNum <= startGoRoutines).To(BeTrue())
} }

View File

@ -45,11 +45,7 @@ func Shutdown(ctx context.Context, notifierCh chan os.Signal, timeout time.Durat
// add any other syscalls that you want to be notified with // add any other syscalls that you want to be notified with
signal.Notify(notifierCh, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) signal.Notify(notifierCh, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
// Wait for one or the other... <-notifierCh
select {
case <-notifierCh:
case <-ctx.Done():
}
log.Info("Shutting Down your application") log.Info("Shutting Down your application")