Use context to stop head tracking

This commit is contained in:
Abdul Rabbani 2022-06-21 14:34:10 -04:00
parent 30e8e955ee
commit a2f2603d38
14 changed files with 258 additions and 139 deletions

View File

@ -75,7 +75,8 @@ func startFullProcessing() {
log.Info("The Beacon Client has booted successfully!") log.Info("The Beacon Client has booted successfully!")
// Capture head blocks // Capture head blocks
go Bc.CaptureHead() hdCtx, hdCancel := context.WithCancel(context.Background())
go Bc.CaptureHead(hdCtx)
hpContext, hpCancel := context.WithCancel(context.Background()) hpContext, hpCancel := context.WithCancel(context.Background())
@ -90,7 +91,7 @@ func startFullProcessing() {
} }
return nil return nil
}) })
kgCtx, KgCancel := context.WithCancel(context.Background()) kgCtx, kgCancel := context.WithCancel(context.Background())
if viper.GetBool("kg.processKnownGaps") { if viper.GetBool("kg.processKnownGaps") {
go func() { go func() {
errG := new(errgroup.Group) errG := new(errgroup.Group)
@ -115,7 +116,7 @@ func startFullProcessing() {
} }
// Shutdown when the time is right. // Shutdown when the time is right.
err = shutdown.ShutdownFull(ctx, KgCancel, hpCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc) err = shutdown.ShutdownFull(ctx, hdCancel, kgCancel, hpCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc)
if err != nil { if err != nil {
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!") loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!")
} else { } else {

View File

@ -62,8 +62,10 @@ func startHeadTracking() {
log.Info("The Beacon Client has booted successfully!") log.Info("The Beacon Client has booted successfully!")
// Capture head blocks // Capture head blocks
go Bc.CaptureHead() hdCtx, hdCancel := context.WithCancel(context.Background())
kgCtx, KgCancel := context.WithCancel(context.Background()) go Bc.CaptureHead(hdCtx)
kgCtx, kgCancel := context.WithCancel(context.Background())
if viper.GetBool("kg.processKnownGaps") { if viper.GetBool("kg.processKnownGaps") {
go func() { go func() {
errG := new(errgroup.Group) errG := new(errgroup.Group)
@ -88,7 +90,7 @@ func startHeadTracking() {
} }
// Shutdown when the time is right. // Shutdown when the time is right.
err = shutdown.ShutdownHeadTracking(ctx, KgCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc) err = shutdown.ShutdownHeadTracking(ctx, hdCancel, kgCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc)
if err != nil { if err != nil {
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!") loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!")
} else { } else {

1
go.mod
View File

@ -70,6 +70,7 @@ require (
github.com/urfave/cli/v2 v2.3.0 // indirect github.com/urfave/cli/v2 v2.3.0 // indirect
go.opencensus.io v0.23.0 // indirect go.opencensus.io v0.23.0 // indirect
go.uber.org/atomic v1.9.0 // indirect go.uber.org/atomic v1.9.0 // indirect
go.uber.org/goleak v1.1.12 // indirect
go.uber.org/multierr v1.8.0 // indirect go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.21.0 // indirect go.uber.org/zap v1.21.0 // indirect
golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f // indirect golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f // indirect

3
go.sum
View File

@ -833,8 +833,9 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=

View File

@ -40,12 +40,12 @@ func ShutdownServices(ctx context.Context, notifierCh chan os.Signal, waitTime t
} }
// Wrapper function for shutting down the head tracking process. // Wrapper function for shutting down the head tracking process.
func ShutdownHeadTracking(ctx context.Context, kgCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { func ShutdownHeadTracking(ctx context.Context, hdCancel context.CancelFunc, kgCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{ return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{
// Combining DB shutdown with BC because BC needs DB open to cleanly shutdown. // Combining DB shutdown with BC because BC needs DB open to cleanly shutdown.
"beaconClient": func(ctx context.Context) error { "beaconClient": func(ctx context.Context) error {
defer DB.Close() defer DB.Close()
err := BC.StopHeadTracking() err := BC.StopHeadTracking(hdCancel)
if err != nil { if err != nil {
loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking") loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking")
} }
@ -82,7 +82,7 @@ func ShutdownHistoricProcessing(ctx context.Context, kgCancel, hpCancel context.
} }
// Shutdown the head and historical processing // Shutdown the head and historical processing
func ShutdownFull(ctx context.Context, kgCancel, hpCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { func ShutdownFull(ctx context.Context, hdCancel context.CancelFunc, kgCancel, hpCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error {
return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{ return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{
// Combining DB shutdown with BC because BC needs DB open to cleanly shutdown. // Combining DB shutdown with BC because BC needs DB open to cleanly shutdown.
"beaconClient": func(ctx context.Context) error { "beaconClient": func(ctx context.Context) error {
@ -97,7 +97,7 @@ func ShutdownFull(ctx context.Context, kgCancel, hpCancel context.CancelFunc, no
loghelper.LogError(err).Error("Unable to stop processing known gaps") loghelper.LogError(err).Error("Unable to stop processing known gaps")
} }
} }
err = BC.StopHeadTracking() err = BC.StopHeadTracking(hdCancel)
if err != nil { if err != nil {
loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking") loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking")
} }

View File

@ -68,13 +68,14 @@ var _ = Describe("Shutdown", func() {
Expect(err).To(BeNil()) Expect(err).To(BeNil())
}) })
Describe("Run Shutdown Function for head tracking,", Label("integration"), func() { Describe("Run Shutdown Function for head tracking,", Label("integration", "shutdown"), func() {
Context("When Channels are empty,", func() { Context("When Channels are empty,", func() {
It("Should Shutdown Successfully.", func() { It("Should Shutdown Successfully.", func() {
go func() { go func() {
_, cancel := context.WithCancel(context.Background()) _, kgCancel := context.WithCancel(context.Background())
_, hdCancel := context.WithCancel(context.Background())
log.Debug("Starting shutdown chan") log.Debug("Starting shutdown chan")
err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, DB, BC) err = shutdown.ShutdownHeadTracking(ctx, hdCancel, kgCancel, notifierCh, maxWaitSecondsShutdown, DB, BC)
log.Debug("We have completed the shutdown...") log.Debug("We have completed the shutdown...")
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}() }()
@ -85,9 +86,10 @@ var _ = Describe("Shutdown", func() {
shutdownCh := make(chan bool) shutdownCh := make(chan bool)
//log.SetLevel(log.DebugLevel) //log.SetLevel(log.DebugLevel)
go func() { go func() {
_, cancel := context.WithCancel(context.Background()) _, kgCancel := context.WithCancel(context.Background())
_, hdCancel := context.WithCancel(context.Background())
log.Debug("Starting shutdown chan") log.Debug("Starting shutdown chan")
err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, DB, BC) err = shutdown.ShutdownHeadTracking(ctx, hdCancel, kgCancel, notifierCh, maxWaitSecondsShutdown, DB, BC)
log.Debug("We have completed the shutdown...") log.Debug("We have completed the shutdown...")
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
shutdownCh <- true shutdownCh <- true
@ -120,8 +122,9 @@ var _ = Describe("Shutdown", func() {
//log.SetLevel(log.DebugLevel) //log.SetLevel(log.DebugLevel)
go func() { go func() {
log.Debug("Starting shutdown chan") log.Debug("Starting shutdown chan")
_, cancel := context.WithCancel(context.Background()) _, kgCancel := context.WithCancel(context.Background())
err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, DB, BC) _, hdCancel := context.WithCancel(context.Background())
err = shutdown.ShutdownHeadTracking(ctx, hdCancel, kgCancel, notifierCh, maxWaitSecondsShutdown, DB, BC)
log.Debug("We have completed the shutdown...") log.Debug("We have completed the shutdown...")
Expect(err).To(MatchError(gracefulshutdown.TimeoutErr(maxWaitSecondsShutdown.String()))) Expect(err).To(MatchError(gracefulshutdown.TimeoutErr(maxWaitSecondsShutdown.String())))
shutdownCh <- true shutdownCh <- true

View File

@ -18,42 +18,23 @@
package beaconclient package beaconclient
import ( import (
"time" "context"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/loghelper"
) )
// This function will perform all the heavy lifting for tracking the head of the chain. // This function will perform all the heavy lifting for tracking the head of the chain.
func (bc *BeaconClient) CaptureHead() { func (bc *BeaconClient) CaptureHead(ctx context.Context) {
log.Info("We are tracking the head of the chain.") log.Info("We are tracking the head of the chain.")
go bc.handleHead() go bc.handleHead(ctx)
go bc.handleReorg() go bc.handleReorg(ctx)
bc.captureEventTopic() bc.captureEventTopic(ctx)
} }
// Stop the head tracking service. // Stop the head tracking service.
func (bc *BeaconClient) StopHeadTracking() error { func (bc *BeaconClient) StopHeadTracking(cancel context.CancelFunc) error {
log.Info("We are going to stop tracking the head of chain because of the shutdown signal.") log.Info("We are going to stop tracking the head of chain because of the shutdown signal.")
chHead := make(chan bool) cancel()
chReorg := make(chan bool)
go bc.HeadTracking.finishProcessingChannel(chHead)
go bc.ReOrgTracking.finishProcessingChannel(chReorg)
<-chHead
<-chReorg
log.Info("Successfully stopped the head tracking service.") log.Info("Successfully stopped the head tracking service.")
return nil return nil
} }
// This function closes the SSE subscription, but waits until the MessagesCh is empty
func (se *SseEvents[ProcessedEvents]) finishProcessingChannel(finish chan<- bool) {
loghelper.LogEndpoint(se.Endpoint).Info("Received a close event.")
se.SseClient.Unsubscribe(se.MessagesCh)
for len(se.MessagesCh) != 0 || len(se.ProcessCh) != 0 {
time.Sleep(time.Duration(shutdownWaitInterval) * time.Millisecond)
}
loghelper.LogEndpoint(se.Endpoint).Info("Done processing all messages, ready for shutdown")
finish <- true
}

View File

@ -23,6 +23,7 @@ import (
"net/http" "net/http"
"os" "os"
"path/filepath" "path/filepath"
"runtime"
"strconv" "strconv"
"sync/atomic" "sync/atomic"
"time" "time"
@ -264,71 +265,103 @@ type MimicConfig struct {
var _ = Describe("Capturehead", Label("head"), func() { var _ = Describe("Capturehead", Label("head"), func() {
Describe("Receiving New Head SSE messages", Label("unit", "behavioral"), func() { Describe("Receiving New Head SSE messages", Label("unit", "behavioral"), func() {
Context("Correctly formatted Phase0 Block", func() { Context("Correctly formatted Phase0 Block", Label("leak-head"), func() {
It("Should turn it into a struct successfully.", func() { It("Should turn it into a struct successfully.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99") log.SetLevel(log.DebugLevel)
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0) startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background())
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0)
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectParentRoot, BeaconNodeTester.TestEvents["100"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["100"].CorrectSignedBeaconBlockMhKey) validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectParentRoot, BeaconNodeTester.TestEvents["100"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["100"].CorrectSignedBeaconBlockMhKey)
validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectBeaconStateMhKey) validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectBeaconStateMhKey)
testStopHeadTracking(cancel, bc, startGoRoutines)
}) })
}) })
Context("Correctly formatted Altair Block", func() { Context("Correctly formatted Altair Block", func() {
It("Should turn it into a struct successfully.", func() { It("Should turn it into a struct successfully.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background())
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0)
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectParentRoot, BeaconNodeTester.TestEvents["2375703"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["2375703"].CorrectSignedBeaconBlockMhKey) validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectParentRoot, BeaconNodeTester.TestEvents["2375703"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["2375703"].CorrectSignedBeaconBlockMhKey)
validateBeaconState(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectBeaconStateMhKey) validateBeaconState(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectBeaconStateMhKey)
testStopHeadTracking(cancel, bc, startGoRoutines)
}) })
}) })
Context("Correctly formatted Altair Test Blocks", func() { Context("Correctly formatted Altair Test Blocks", func() {
It("Should turn it into a struct successfully.", func() { It("Should turn it into a struct successfully.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry, 1, 0, 0) startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background())
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry, 1, 0, 0)
bc = setUpTest(BeaconNodeTester.TestConfig, "2375702") bc = setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703-dummy-2"].HeadMessage, 74240, maxRetry, 1, 0, 0) BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["2375703-dummy-2"].HeadMessage, 74240, maxRetry, 1, 0, 0)
testStopHeadTracking(cancel, bc, startGoRoutines)
}) })
}) })
Context("Correctly formatted Phase0 Test Blocks", func() { Context("Correctly formatted Phase0 Test Blocks", func() {
It("Should turn it into a struct successfully.", func() { It("Should turn it into a struct successfully.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 1, 0, 0) startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background())
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 1, 0, 0)
bc = setUpTest(BeaconNodeTester.TestConfig, "99") bc = setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy-2"].HeadMessage, 3, maxRetry, 1, 0, 0) BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["100-dummy-2"].HeadMessage, 3, maxRetry, 1, 0, 0)
testStopHeadTracking(cancel, bc, startGoRoutines)
}) })
}) })
Context("Two consecutive correct blocks", func() { Context("Two consecutive correct blocks", func() {
It("Should handle both blocks correctly, without any reorgs or known_gaps", func() { It("Should handle both blocks correctly, without any reorgs or known_gaps", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0) startGoRoutines := runtime.NumGoroutine()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 0, 0) ctx, cancel := context.WithCancel(context.Background())
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0)
BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 0, 0)
testStopHeadTracking(cancel, bc, startGoRoutines)
}) })
}) })
Context("Two consecutive blocks with a bad parent", func() { Context("Two consecutive blocks with a bad parent", func() {
It("Should add the previous block to the knownGaps table.", func() { It("Should add the previous block to the knownGaps table.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 1, 0, 0) startGoRoutines := runtime.NumGoroutine()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 1, 1) ctx, cancel := context.WithCancel(context.Background())
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 1, 0, 0)
BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 1, 1)
testStopHeadTracking(cancel, bc, startGoRoutines)
}) })
}) })
Context("Phase 0: We have a correctly formated SSZ SignedBeaconBlock and BeaconState", func() { Context("Phase 0: We have a correctly formated SSZ SignedBeaconBlock and BeaconState", func() {
@ -348,10 +381,13 @@ var _ = Describe("Capturehead", Label("head"), func() {
//}) //})
Context("When the proper SSZ objects are not served", func() { Context("When the proper SSZ objects are not served", func() {
It("Should return an error, and add the slot to the knownGaps table.", func() { It("Should return an error, and add the slot to the knownGaps table.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "101")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["102-wrong-ssz-1"].HeadMessage, 3, maxRetry, 0, 1, 0) startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background())
bc := setUpTest(BeaconNodeTester.TestConfig, "101")
BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["102-wrong-ssz-1"].HeadMessage, 3, maxRetry, 0, 1, 0)
knownGapCount := countKnownGapsTable(bc.Db) knownGapCount := countKnownGapsTable(bc.Db)
Expect(knownGapCount).To(Equal(1)) Expect(knownGapCount).To(Equal(1))
@ -359,6 +395,8 @@ var _ = Describe("Capturehead", Label("head"), func() {
start, end := queryKnownGaps(bc.Db, "102", "102") start, end := queryKnownGaps(bc.Db, "102", "102")
Expect(start).To(Equal(102)) Expect(start).To(Equal(102))
Expect(end).To(Equal(102)) Expect(end).To(Equal(102))
testStopHeadTracking(cancel, bc, startGoRoutines)
}) })
}) })
}) })
@ -366,21 +404,28 @@ var _ = Describe("Capturehead", Label("head"), func() {
Describe("Known Gaps Scenario", Label("unit", "behavioral"), func() { Describe("Known Gaps Scenario", Label("unit", "behavioral"), func() {
Context("There is a gap at start up within one incrementing range.", func() { Context("There is a gap at start up within one incrementing range.", func() {
It("Should add only a single entry to the knownGaps table.", func() { It("Should add only a single entry to the knownGaps table.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "10")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
BeaconNodeTester.testKnownGapsMessages(bc, 100, 1, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage) startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background())
bc := setUpTest(BeaconNodeTester.TestConfig, "10")
BeaconNodeTester.testKnownGapsMessages(ctx, bc, 100, 1, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage)
start, end := queryKnownGaps(bc.Db, "11", "99") start, end := queryKnownGaps(bc.Db, "11", "99")
Expect(start).To(Equal(11)) Expect(start).To(Equal(11))
Expect(end).To(Equal(99)) Expect(end).To(Equal(99))
testStopHeadTracking(cancel, bc, startGoRoutines)
}) })
}) })
Context("There is a gap at start up spanning multiple incrementing range.", func() { Context("There is a gap at start up spanning multiple incrementing range.", func() {
It("Should add multiple entries to the knownGaps table.", func() { It("Should add multiple entries to the knownGaps table.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "5")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
BeaconNodeTester.testKnownGapsMessages(bc, 10, 10, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage) startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background())
bc := setUpTest(BeaconNodeTester.TestConfig, "5")
BeaconNodeTester.testKnownGapsMessages(ctx, bc, 10, 10, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage)
start, end := queryKnownGaps(bc.Db, "6", "16") start, end := queryKnownGaps(bc.Db, "6", "16")
Expect(start).To(Equal(6)) Expect(start).To(Equal(6))
@ -389,14 +434,19 @@ var _ = Describe("Capturehead", Label("head"), func() {
start, end = queryKnownGaps(bc.Db, "96", "99") start, end = queryKnownGaps(bc.Db, "96", "99")
Expect(start).To(Equal(96)) Expect(start).To(Equal(96))
Expect(end).To(Equal(99)) Expect(end).To(Equal(99))
testStopHeadTracking(cancel, bc, startGoRoutines)
}) })
}) })
Context("Gaps between two head messages", func() { Context("Gaps between two head messages", func() {
It("Should add the slots in-between", func() { It("Should add the slots in-between", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
BeaconNodeTester.testKnownGapsMessages(bc, 1000000, 3, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].HeadMessage) startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background())
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.testKnownGapsMessages(ctx, bc, 1000000, 3, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].HeadMessage)
start, end := queryKnownGaps(bc.Db, "101", "1000101") start, end := queryKnownGaps(bc.Db, "101", "1000101")
Expect(start).To(Equal(101)) Expect(start).To(Equal(101))
@ -405,6 +455,7 @@ var _ = Describe("Capturehead", Label("head"), func() {
start, end = queryKnownGaps(bc.Db, "2000101", "2375702") start, end = queryKnownGaps(bc.Db, "2000101", "2375702")
Expect(start).To(Equal(2000101)) Expect(start).To(Equal(2000101))
Expect(end).To(Equal(2375702)) Expect(end).To(Equal(2375702))
testStopHeadTracking(cancel, bc, startGoRoutines)
}) })
}) })
}) })
@ -412,34 +463,50 @@ var _ = Describe("Capturehead", Label("head"), func() {
Describe("ReOrg Scenario", Label("unit", "behavioral"), func() { Describe("ReOrg Scenario", Label("unit", "behavioral"), func() {
Context("Altair: Multiple head messages for the same slot.", func() { Context("Altair: Multiple head messages for the same slot.", func() {
It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() { It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
BeaconNodeTester.testMultipleHead(bc, TestEvents["2375703"].HeadMessage, TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry) startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background())
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.testMultipleHead(ctx, bc, TestEvents["2375703"].HeadMessage, TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry)
testStopHeadTracking(cancel, bc, startGoRoutines)
}) })
}) })
Context("Phase0: Multiple head messages for the same slot.", func() { Context("Phase0: Multiple head messages for the same slot.", func() {
It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() { It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
BeaconNodeTester.testMultipleHead(bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry) startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background())
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.testMultipleHead(ctx, bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry)
testStopHeadTracking(cancel, bc, startGoRoutines)
}) })
}) })
Context("Phase 0: Multiple reorgs have occurred on this slot", func() { Context("Phase 0: Multiple reorgs have occurred on this slot", func() {
It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() { It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
BeaconNodeTester.testMultipleReorgs(bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100-dummy-2"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry) startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background())
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.testMultipleReorgs(ctx, bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100-dummy-2"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry)
testStopHeadTracking(cancel, bc, startGoRoutines)
}) })
}) })
Context("Altair: Multiple reorgs have occurred on this slot", func() { Context("Altair: Multiple reorgs have occurred on this slot", func() {
It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() { It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
BeaconNodeTester.testMultipleReorgs(bc, TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703-dummy-2"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240, maxRetry) startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background())
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.testMultipleReorgs(ctx, bc, TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703-dummy-2"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240, maxRetry)
testStopHeadTracking(cancel, bc, startGoRoutines)
}) })
}) })
}) })
@ -831,8 +898,8 @@ func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string
// Helper function to test three reorg messages. There are going to be many functions like this, // Helper function to test three reorg messages. There are going to be many functions like this,
// Because we need to test the same logic for multiple phases. // Because we need to test the same logic for multiple phases.
func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch int, maxRetry int) { func (tbc TestBeaconNode) testMultipleReorgs(ctx context.Context, bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch int, maxRetry int) {
go bc.CaptureHead() go bc.CaptureHead(ctx)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
log.Info("Sending Messages to BeaconClient") log.Info("Sending Messages to BeaconClient")
@ -893,8 +960,8 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs
} }
// A test to validate a single block was processed correctly // A test to validate a single block was processed correctly
func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head beaconclient.Head, epoch int, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) { func (tbc TestBeaconNode) testProcessBlock(ctx context.Context, bc *beaconclient.BeaconClient, head beaconclient.Head, epoch int, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) {
go bc.CaptureHead() go bc.CaptureHead(ctx)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
sendHeadMessage(bc, head, maxRetry, expectedSuccessInsert) sendHeadMessage(bc, head, maxRetry, expectedSuccessInsert)
@ -923,8 +990,8 @@ func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head b
// A test that ensures that if two HeadMessages occur for a single slot they are marked // A test that ensures that if two HeadMessages occur for a single slot they are marked
// as proposed and forked correctly. // as proposed and forked correctly.
func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, epoch int, maxRetry int) { func (tbc TestBeaconNode) testMultipleHead(ctx context.Context, bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, epoch int, maxRetry int) {
go bc.CaptureHead() go bc.CaptureHead(ctx)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
sendHeadMessage(bc, firstHead, maxRetry, 1) sendHeadMessage(bc, firstHead, maxRetry, 1)
@ -950,9 +1017,9 @@ func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstH
// A test that ensures that if two HeadMessages occur for a single slot they are marked // A test that ensures that if two HeadMessages occur for a single slot they are marked
// as proposed and forked correctly. // as proposed and forked correctly.
func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, tableIncrement int, expectedEntries uint64, maxRetry int, msg ...beaconclient.Head) { func (tbc TestBeaconNode) testKnownGapsMessages(ctx context.Context, bc *beaconclient.BeaconClient, tableIncrement int, expectedEntries uint64, maxRetry int, msg ...beaconclient.Head) {
bc.KnownGapTableIncrement = tableIncrement bc.KnownGapTableIncrement = tableIncrement
go bc.CaptureHead() go bc.CaptureHead(ctx)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
for _, headMsg := range msg { for _, headMsg := range msg {
@ -991,3 +1058,14 @@ func testSszRoot(msg Message) {
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(msg.HeadMessage.Block).To(Equal("0x" + hex.EncodeToString(blockRoot[:]))) Expect(msg.HeadMessage.Block).To(Equal("0x" + hex.EncodeToString(blockRoot[:])))
} }
// A make shift function to stop head tracking and insure we dont have any goroutine leaks
func testStopHeadTracking(cancel context.CancelFunc, bc *beaconclient.BeaconClient, startGoRoutines int) {
bc.Db.Close()
err := bc.StopHeadTracking(cancel)
Expect(err).ToNot(HaveOccurred())
time.Sleep(3 * time.Second)
endNum := runtime.NumGoroutine()
Expect(startGoRoutines).To(Equal(endNum))
}

View File

@ -164,8 +164,9 @@ func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing,
errs := bp.getSlotRange(ctx, slotsCh) // Periodically adds new entries.... errs := bp.getSlotRange(ctx, slotsCh) // Periodically adds new entries....
if errs != nil { if errs != nil {
finalErrCh <- errs finalErrCh <- errs
} else {
finalErrCh <- nil
} }
finalErrCh <- nil
log.Debug("We are stopping the processing of adding new entries") log.Debug("We are stopping the processing of adding new entries")
}() }()
log.Debug("Waiting for shutdown signal from channel") log.Debug("Waiting for shutdown signal from channel")

View File

@ -3,6 +3,7 @@ package beaconclient_test
import ( import (
"context" "context"
"fmt" "fmt"
"runtime"
"sync/atomic" "sync/atomic"
"time" "time"
@ -24,9 +25,11 @@ var _ = Describe("Capturehistoric", func() {
Describe("Run the application in historic mode", Label("unit", "behavioral", "historical"), func() { Describe("Run the application in historic mode", Label("unit", "behavioral", "historical"), func() {
Context("Phase0 + Altairs: When we need to process a multiple blocks in a multiple entries in the eth_beacon.historic_process table.", Label("deb"), func() { Context("Phase0 + Altairs: When we need to process a multiple blocks in a multiple entries in the eth_beacon.historic_process table.", Label("deb"), func() {
It("Successfully Process the Blocks", func() { It("Successfully Process the Blocks", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
startNum := runtime.NumGoroutine()
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 101, 10) BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 101, 10)
BeaconNodeTester.runHistoricalProcess(bc, 2, 2, 0, 0, 0) BeaconNodeTester.runHistoricalProcess(bc, 2, 2, 0, 0, 0)
// Run Two seperate processes // Run Two seperate processes
@ -35,6 +38,11 @@ var _ = Describe("Capturehistoric", func() {
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
validatePopularBatchBlocks(bc) validatePopularBatchBlocks(bc)
time.Sleep(3 * time.Second)
bc.Db.Close()
endNum := runtime.NumGoroutine()
Expect(startNum).To(Equal(endNum))
}) })
}) })
Context("When the start block is greater than the endBlock", func() { Context("When the start block is greater than the endBlock", func() {
@ -70,11 +78,13 @@ var _ = Describe("Capturehistoric", func() {
}) })
}) })
Describe("Running the Application to process Known Gaps", Label("unit", "behavioral", "knownGaps"), func() { Describe("Running the Application to process Known Gaps", Label("unit", "behavioral", "knownGaps"), func() {
Context("Phase0 + Altairs: When we need to process a multiple blocks in a multiple entries in the eth_beacon.known_gaps table.", func() { Context("Phase0 + Altairs: When we need to process a multiple blocks in a multiple entries in the eth_beacon.known_gaps table.", Label("leak"), func() {
It("Successfully Process the Blocks", func() { It("Successfully Process the Blocks", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
startNum := runtime.NumGoroutine()
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.writeEventToKnownGaps(bc, 100, 101) BeaconNodeTester.writeEventToKnownGaps(bc, 100, 101)
BeaconNodeTester.runKnownGapsProcess(bc, 2, 2, 0, 0, 0) BeaconNodeTester.runKnownGapsProcess(bc, 2, 2, 0, 0, 0)
// Run Two seperate processes // Run Two seperate processes
@ -83,6 +93,10 @@ var _ = Describe("Capturehistoric", func() {
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
validatePopularBatchBlocks(bc) validatePopularBatchBlocks(bc)
bc.Db.Close()
endNum := runtime.NumGoroutine()
Expect(startNum).To(Equal(endNum))
}) })
}) })
Context("When the start block is greater than the endBlock", func() { Context("When the start block is greater than the endBlock", func() {
@ -107,13 +121,16 @@ var _ = Describe("Capturehistoric", func() {
}) })
}) })
Describe("Running the application in Historic, Head, and KnownGaps mode", Label("unit", "historical", "full"), func() { Describe("Running the application in Historic, Head, and KnownGaps mode", Label("unit", "historical", "full"), func() {
Context("When it recieves a head, historic and known Gaps message (in order)", func() { Context("When it recieves a head, historic and known Gaps message (in order)", Label("deb"), func() {
It("Should process them all successfully.", func() { It("Should process them all successfully.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background())
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
// Head // Head
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0)
// Historical // Historical
BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 100, 10) BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 100, 10)
@ -125,19 +142,25 @@ var _ = Describe("Capturehistoric", func() {
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
validatePopularBatchBlocks(bc) validatePopularBatchBlocks(bc)
testStopHeadTracking(cancel, bc, startGoRoutines)
}) })
}) })
Context("When it recieves a historic, head and known Gaps message (in order)", func() { Context("When it recieves a historic, head and known Gaps message (in order)", func() {
It("Should process them all successfully.", func() { It("Should process them all successfully.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background())
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
// Historical // Historical
BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 100, 10) BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 100, 10)
BeaconNodeTester.runHistoricalProcess(bc, 2, 1, 0, 0, 0) BeaconNodeTester.runHistoricalProcess(bc, 2, 1, 0, 0, 0)
// Head // Head
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0)
// Known Gaps // Known Gaps
BeaconNodeTester.writeEventToKnownGaps(bc, 101, 101) BeaconNodeTester.writeEventToKnownGaps(bc, 101, 101)
@ -145,13 +168,17 @@ var _ = Describe("Capturehistoric", func() {
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
validatePopularBatchBlocks(bc) validatePopularBatchBlocks(bc)
testStopHeadTracking(cancel, bc, startGoRoutines)
}) })
}) })
Context("When it recieves a known Gaps, historic and head message (in order)", func() { Context("When it recieves a known Gaps, historic and head message (in order)", func() {
It("Should process them all successfully.", func() { It("Should process them all successfully.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background())
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
// Known Gaps // Known Gaps
BeaconNodeTester.writeEventToKnownGaps(bc, 101, 101) BeaconNodeTester.writeEventToKnownGaps(bc, 101, 101)
BeaconNodeTester.runKnownGapsProcess(bc, 2, 1, 0, 0, 0) BeaconNodeTester.runKnownGapsProcess(bc, 2, 1, 0, 0, 0)
@ -161,10 +188,11 @@ var _ = Describe("Capturehistoric", func() {
BeaconNodeTester.runHistoricalProcess(bc, 2, 2, 0, 0, 0) BeaconNodeTester.runHistoricalProcess(bc, 2, 2, 0, 0, 0)
// Head // Head
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0)
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
validatePopularBatchBlocks(bc) validatePopularBatchBlocks(bc)
testStopHeadTracking(cancel, bc, startGoRoutines)
}) })
}) })
}) })

View File

@ -18,6 +18,7 @@
package beaconclient package beaconclient
import ( import (
"context"
"encoding/json" "encoding/json"
"time" "time"
@ -33,7 +34,7 @@ var (
// This function will capture all the SSE events for a given SseEvents object. // This function will capture all the SSE events for a given SseEvents object.
// When new messages come in, it will ensure that they are decoded into JSON. // When new messages come in, it will ensure that they are decoded into JSON.
// If any errors occur, it log the error information. // If any errors occur, it log the error information.
func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMetricInc func(uint64)) { func handleIncomingSseEvent[P ProcessedEvents](ctx context.Context, eventHandler *SseEvents[P], errMetricInc func(uint64)) {
go func() { go func() {
errG := new(errgroup.Group) errG := new(errgroup.Group)
errG.Go(func() error { errG.Go(func() error {
@ -56,6 +57,10 @@ func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMe
}() }()
for { for {
select { select {
case <-ctx.Done():
close(eventHandler.MessagesCh)
close(eventHandler.ErrorCh)
return
case message := <-eventHandler.MessagesCh: case message := <-eventHandler.MessagesCh:
// Message can be nil if its a keep-alive message // Message can be nil if its a keep-alive message
if len(message.Data) != 0 { if len(message.Data) != 0 {
@ -91,8 +96,8 @@ func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan
} }
// Capture all of the event topics. // Capture all of the event topics.
func (bc *BeaconClient) captureEventTopic() { func (bc *BeaconClient) captureEventTopic(ctx context.Context) {
log.Info("We are capturing all SSE events") log.Info("We are capturing all SSE events")
go handleIncomingSseEvent(bc.HeadTracking, bc.Metrics.IncrementHeadError) go handleIncomingSseEvent(ctx, bc.HeadTracking, bc.Metrics.IncrementHeadError)
go handleIncomingSseEvent(bc.ReOrgTracking, bc.Metrics.IncrementReorgError) go handleIncomingSseEvent(ctx, bc.ReOrgTracking, bc.Metrics.IncrementReorgError)
} }

View File

@ -19,6 +19,7 @@
package beaconclient package beaconclient
import ( import (
"context"
"fmt" "fmt"
"strconv" "strconv"
@ -26,52 +27,63 @@ import (
) )
// This function will perform the necessary steps to handle a reorg. // This function will perform the necessary steps to handle a reorg.
func (bc *BeaconClient) handleReorg() { func (bc *BeaconClient) handleReorg(ctx context.Context) {
log.Info("Starting to process reorgs.") log.Info("Starting to process reorgs.")
for { for {
reorg := <-bc.ReOrgTracking.ProcessCh select {
log.WithFields(log.Fields{"reorg": reorg}).Debug("Received a new reorg message.") case <-ctx.Done():
writeReorgs(bc.Db, reorg.Slot, reorg.NewHeadBlock, bc.Metrics) close(bc.ReOrgTracking.ProcessCh)
return
case reorg := <-bc.ReOrgTracking.ProcessCh:
log.WithFields(log.Fields{"reorg": reorg}).Debug("Received a new reorg message.")
writeReorgs(bc.Db, reorg.Slot, reorg.NewHeadBlock, bc.Metrics)
}
} }
} }
// This function will handle the latest head event. // This function will handle the latest head event.
func (bc *BeaconClient) handleHead() { func (bc *BeaconClient) handleHead(ctx context.Context) {
log.Info("Starting to process head.") log.Info("Starting to process head.")
errorSlots := 0 errorSlots := 0
for { for {
head := <-bc.HeadTracking.ProcessCh select {
// Process all the work here. case <-ctx.Done():
slot, err := strconv.Atoi(head.Slot) close(bc.HeadTracking.ProcessCh)
if err != nil { return
bc.HeadTracking.ErrorCh <- &SseError{ case head := <-bc.HeadTracking.ProcessCh:
err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot),
// Process all the work here.
slot, err := strconv.Atoi(head.Slot)
if err != nil {
bc.HeadTracking.ErrorCh <- &SseError{
err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot),
}
errorSlots = errorSlots + 1
continue
} }
errorSlots = errorSlots + 1 if errorSlots != 0 && bc.PreviousSlot != 0 {
continue log.WithFields(log.Fields{
"lastProcessedSlot": bc.PreviousSlot,
"errorSlots": errorSlots,
}).Warn("We added slots to the knownGaps table because we got bad head messages.")
writeKnownGaps(bc.Db, bc.KnownGapTableIncrement, bc.PreviousSlot+1, slot, fmt.Errorf("Bad Head Messages"), "headProcessing", bc.Metrics)
errorSlots = 0
}
log.WithFields(log.Fields{"head": head}).Debug("We are going to start processing the slot.")
// Not used anywhere yet but might be useful to have.
if bc.PreviousSlot == 0 && bc.PreviousBlockRoot == "" {
bc.StartingSlot = slot
}
go processHeadSlot(ctx, bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement, bc.CheckDb)
log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished calling processHeadSlot.")
// Update the previous block
bc.PreviousSlot = slot
bc.PreviousBlockRoot = head.Block
} }
if errorSlots != 0 && bc.PreviousSlot != 0 {
log.WithFields(log.Fields{
"lastProcessedSlot": bc.PreviousSlot,
"errorSlots": errorSlots,
}).Warn("We added slots to the knownGaps table because we got bad head messages.")
writeKnownGaps(bc.Db, bc.KnownGapTableIncrement, bc.PreviousSlot+1, slot, fmt.Errorf("Bad Head Messages"), "headProcessing", bc.Metrics)
errorSlots = 0
}
log.WithFields(log.Fields{"head": head}).Debug("We are going to start processing the slot.")
// Not used anywhere yet but might be useful to have.
if bc.PreviousSlot == 0 && bc.PreviousBlockRoot == "" {
bc.StartingSlot = slot
}
go processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement, bc.CheckDb)
log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished calling processHeadSlot.")
// Update the previous block
bc.PreviousSlot = slot
bc.PreviousBlockRoot = head.Block
} }
} }

View File

@ -239,12 +239,12 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string,
} }
// Handle a slot that is at head. A wrapper function for calling `handleFullSlot`. // Handle a slot that is at head. A wrapper function for calling `handleFullSlot`.
func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) { func processHeadSlot(ctx context.Context, db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) {
// Get the knownGaps at startUp. // Get the knownGaps at startUp.
if previousSlot == 0 && previousBlockRoot == "" { if previousSlot == 0 && previousBlockRoot == "" {
writeStartUpGaps(db, knownGapsTableIncrement, slot, metrics) writeStartUpGaps(db, knownGapsTableIncrement, slot, metrics)
} }
err, errReason := processFullSlot(context.Background(), db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement, checkDb) err, errReason := processFullSlot(ctx, db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement, checkDb)
if err != nil { if err != nil {
writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics) writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics)
} }

View File

@ -1,7 +1,9 @@
package beaconclient_test package beaconclient_test
import ( import (
"context"
"os" "os"
"runtime"
"strconv" "strconv"
"time" "time"
@ -63,7 +65,11 @@ func getEnvInt(envVar string) int {
// Start head tracking and wait for the expected results. // Start head tracking and wait for the expected results.
func processProdHeadBlocks(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { func processProdHeadBlocks(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) {
go bc.CaptureHead() startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background())
go bc.CaptureHead(ctx)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError) validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError)
testStopHeadTracking(cancel, bc, startGoRoutines)
} }