From a2f2603d38470670532e470d6b7774afcc4a1c6c Mon Sep 17 00:00:00 2001 From: Abdul Rabbani Date: Tue, 21 Jun 2022 14:34:10 -0400 Subject: [PATCH] Use context to stop head tracking --- cmd/full.go | 7 +- cmd/head.go | 8 +- go.mod | 1 + go.sum | 3 +- internal/shutdown/shutdown.go | 8 +- internal/shutdown/shutdown_test.go | 17 ++- pkg/beaconclient/capturehead.go | 33 +---- pkg/beaconclient/capturehead_test.go | 160 ++++++++++++++++------ pkg/beaconclient/capturehistoric.go | 3 +- pkg/beaconclient/capturehistoric_test.go | 48 +++++-- pkg/beaconclient/incomingsse.go | 13 +- pkg/beaconclient/processevents.go | 84 +++++++----- pkg/beaconclient/processslot.go | 4 +- pkg/beaconclient/systemvalidation_test.go | 8 +- 14 files changed, 258 insertions(+), 139 deletions(-) diff --git a/cmd/full.go b/cmd/full.go index 7d50c8a..840d39d 100644 --- a/cmd/full.go +++ b/cmd/full.go @@ -75,7 +75,8 @@ func startFullProcessing() { log.Info("The Beacon Client has booted successfully!") // Capture head blocks - go Bc.CaptureHead() + hdCtx, hdCancel := context.WithCancel(context.Background()) + go Bc.CaptureHead(hdCtx) hpContext, hpCancel := context.WithCancel(context.Background()) @@ -90,7 +91,7 @@ func startFullProcessing() { } return nil }) - kgCtx, KgCancel := context.WithCancel(context.Background()) + kgCtx, kgCancel := context.WithCancel(context.Background()) if viper.GetBool("kg.processKnownGaps") { go func() { errG := new(errgroup.Group) @@ -115,7 +116,7 @@ func startFullProcessing() { } // Shutdown when the time is right. - err = shutdown.ShutdownFull(ctx, KgCancel, hpCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc) + err = shutdown.ShutdownFull(ctx, hdCancel, kgCancel, hpCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc) if err != nil { loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!") } else { diff --git a/cmd/head.go b/cmd/head.go index 4688087..a4f88ec 100644 --- a/cmd/head.go +++ b/cmd/head.go @@ -62,8 +62,10 @@ func startHeadTracking() { log.Info("The Beacon Client has booted successfully!") // Capture head blocks - go Bc.CaptureHead() - kgCtx, KgCancel := context.WithCancel(context.Background()) + hdCtx, hdCancel := context.WithCancel(context.Background()) + go Bc.CaptureHead(hdCtx) + + kgCtx, kgCancel := context.WithCancel(context.Background()) if viper.GetBool("kg.processKnownGaps") { go func() { errG := new(errgroup.Group) @@ -88,7 +90,7 @@ func startHeadTracking() { } // Shutdown when the time is right. - err = shutdown.ShutdownHeadTracking(ctx, KgCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc) + err = shutdown.ShutdownHeadTracking(ctx, hdCancel, kgCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc) if err != nil { loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!") } else { diff --git a/go.mod b/go.mod index f3082d4..efb8ad3 100644 --- a/go.mod +++ b/go.mod @@ -70,6 +70,7 @@ require ( github.com/urfave/cli/v2 v2.3.0 // indirect go.opencensus.io v0.23.0 // indirect go.uber.org/atomic v1.9.0 // indirect + go.uber.org/goleak v1.1.12 // indirect go.uber.org/multierr v1.8.0 // indirect go.uber.org/zap v1.21.0 // indirect golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f // indirect diff --git a/go.sum b/go.sum index 249cbfc..0e5dee9 100644 --- a/go.sum +++ b/go.sum @@ -833,8 +833,9 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= -go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= +go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= +go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= diff --git a/internal/shutdown/shutdown.go b/internal/shutdown/shutdown.go index 13181b4..7e03fb0 100644 --- a/internal/shutdown/shutdown.go +++ b/internal/shutdown/shutdown.go @@ -40,12 +40,12 @@ func ShutdownServices(ctx context.Context, notifierCh chan os.Signal, waitTime t } // Wrapper function for shutting down the head tracking process. -func ShutdownHeadTracking(ctx context.Context, kgCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { +func ShutdownHeadTracking(ctx context.Context, hdCancel context.CancelFunc, kgCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{ // Combining DB shutdown with BC because BC needs DB open to cleanly shutdown. "beaconClient": func(ctx context.Context) error { defer DB.Close() - err := BC.StopHeadTracking() + err := BC.StopHeadTracking(hdCancel) if err != nil { loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking") } @@ -82,7 +82,7 @@ func ShutdownHistoricProcessing(ctx context.Context, kgCancel, hpCancel context. } // Shutdown the head and historical processing -func ShutdownFull(ctx context.Context, kgCancel, hpCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { +func ShutdownFull(ctx context.Context, hdCancel context.CancelFunc, kgCancel, hpCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{ // Combining DB shutdown with BC because BC needs DB open to cleanly shutdown. "beaconClient": func(ctx context.Context) error { @@ -97,7 +97,7 @@ func ShutdownFull(ctx context.Context, kgCancel, hpCancel context.CancelFunc, no loghelper.LogError(err).Error("Unable to stop processing known gaps") } } - err = BC.StopHeadTracking() + err = BC.StopHeadTracking(hdCancel) if err != nil { loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking") } diff --git a/internal/shutdown/shutdown_test.go b/internal/shutdown/shutdown_test.go index 97d83af..99dc6dd 100644 --- a/internal/shutdown/shutdown_test.go +++ b/internal/shutdown/shutdown_test.go @@ -68,13 +68,14 @@ var _ = Describe("Shutdown", func() { Expect(err).To(BeNil()) }) - Describe("Run Shutdown Function for head tracking,", Label("integration"), func() { + Describe("Run Shutdown Function for head tracking,", Label("integration", "shutdown"), func() { Context("When Channels are empty,", func() { It("Should Shutdown Successfully.", func() { go func() { - _, cancel := context.WithCancel(context.Background()) + _, kgCancel := context.WithCancel(context.Background()) + _, hdCancel := context.WithCancel(context.Background()) log.Debug("Starting shutdown chan") - err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, DB, BC) + err = shutdown.ShutdownHeadTracking(ctx, hdCancel, kgCancel, notifierCh, maxWaitSecondsShutdown, DB, BC) log.Debug("We have completed the shutdown...") Expect(err).ToNot(HaveOccurred()) }() @@ -85,9 +86,10 @@ var _ = Describe("Shutdown", func() { shutdownCh := make(chan bool) //log.SetLevel(log.DebugLevel) go func() { - _, cancel := context.WithCancel(context.Background()) + _, kgCancel := context.WithCancel(context.Background()) + _, hdCancel := context.WithCancel(context.Background()) log.Debug("Starting shutdown chan") - err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, DB, BC) + err = shutdown.ShutdownHeadTracking(ctx, hdCancel, kgCancel, notifierCh, maxWaitSecondsShutdown, DB, BC) log.Debug("We have completed the shutdown...") Expect(err).ToNot(HaveOccurred()) shutdownCh <- true @@ -120,8 +122,9 @@ var _ = Describe("Shutdown", func() { //log.SetLevel(log.DebugLevel) go func() { log.Debug("Starting shutdown chan") - _, cancel := context.WithCancel(context.Background()) - err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, DB, BC) + _, kgCancel := context.WithCancel(context.Background()) + _, hdCancel := context.WithCancel(context.Background()) + err = shutdown.ShutdownHeadTracking(ctx, hdCancel, kgCancel, notifierCh, maxWaitSecondsShutdown, DB, BC) log.Debug("We have completed the shutdown...") Expect(err).To(MatchError(gracefulshutdown.TimeoutErr(maxWaitSecondsShutdown.String()))) shutdownCh <- true diff --git a/pkg/beaconclient/capturehead.go b/pkg/beaconclient/capturehead.go index a0b6e6b..0b54881 100644 --- a/pkg/beaconclient/capturehead.go +++ b/pkg/beaconclient/capturehead.go @@ -18,42 +18,23 @@ package beaconclient import ( - "time" + "context" log "github.com/sirupsen/logrus" - "github.com/vulcanize/ipld-eth-beacon-indexer/pkg/loghelper" ) // This function will perform all the heavy lifting for tracking the head of the chain. -func (bc *BeaconClient) CaptureHead() { +func (bc *BeaconClient) CaptureHead(ctx context.Context) { log.Info("We are tracking the head of the chain.") - go bc.handleHead() - go bc.handleReorg() - bc.captureEventTopic() + go bc.handleHead(ctx) + go bc.handleReorg(ctx) + bc.captureEventTopic(ctx) } // Stop the head tracking service. -func (bc *BeaconClient) StopHeadTracking() error { +func (bc *BeaconClient) StopHeadTracking(cancel context.CancelFunc) error { log.Info("We are going to stop tracking the head of chain because of the shutdown signal.") - chHead := make(chan bool) - chReorg := make(chan bool) - - go bc.HeadTracking.finishProcessingChannel(chHead) - go bc.ReOrgTracking.finishProcessingChannel(chReorg) - - <-chHead - <-chReorg + cancel() log.Info("Successfully stopped the head tracking service.") return nil } - -// This function closes the SSE subscription, but waits until the MessagesCh is empty -func (se *SseEvents[ProcessedEvents]) finishProcessingChannel(finish chan<- bool) { - loghelper.LogEndpoint(se.Endpoint).Info("Received a close event.") - se.SseClient.Unsubscribe(se.MessagesCh) - for len(se.MessagesCh) != 0 || len(se.ProcessCh) != 0 { - time.Sleep(time.Duration(shutdownWaitInterval) * time.Millisecond) - } - loghelper.LogEndpoint(se.Endpoint).Info("Done processing all messages, ready for shutdown") - finish <- true -} diff --git a/pkg/beaconclient/capturehead_test.go b/pkg/beaconclient/capturehead_test.go index 1c569df..adb2bab 100644 --- a/pkg/beaconclient/capturehead_test.go +++ b/pkg/beaconclient/capturehead_test.go @@ -23,6 +23,7 @@ import ( "net/http" "os" "path/filepath" + "runtime" "strconv" "sync/atomic" "time" @@ -264,71 +265,103 @@ type MimicConfig struct { var _ = Describe("Capturehead", Label("head"), func() { Describe("Receiving New Head SSE messages", Label("unit", "behavioral"), func() { - Context("Correctly formatted Phase0 Block", func() { + Context("Correctly formatted Phase0 Block", Label("leak-head"), func() { It("Should turn it into a struct successfully.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "99") + log.SetLevel(log.DebugLevel) + BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0) + startGoRoutines := runtime.NumGoroutine() + ctx, cancel := context.WithCancel(context.Background()) + + bc := setUpTest(BeaconNodeTester.TestConfig, "99") + + BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0) validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectParentRoot, BeaconNodeTester.TestEvents["100"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["100"].CorrectSignedBeaconBlockMhKey) validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectBeaconStateMhKey) + testStopHeadTracking(cancel, bc, startGoRoutines) + }) }) Context("Correctly formatted Altair Block", func() { It("Should turn it into a struct successfully.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) + startGoRoutines := runtime.NumGoroutine() + ctx, cancel := context.WithCancel(context.Background()) + + bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") + BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectParentRoot, BeaconNodeTester.TestEvents["2375703"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["2375703"].CorrectSignedBeaconBlockMhKey) validateBeaconState(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectBeaconStateMhKey) + testStopHeadTracking(cancel, bc, startGoRoutines) }) }) Context("Correctly formatted Altair Test Blocks", func() { It("Should turn it into a struct successfully.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry, 1, 0, 0) + startGoRoutines := runtime.NumGoroutine() + ctx, cancel := context.WithCancel(context.Background()) + + bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") + BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry, 1, 0, 0) bc = setUpTest(BeaconNodeTester.TestConfig, "2375702") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703-dummy-2"].HeadMessage, 74240, maxRetry, 1, 0, 0) + BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["2375703-dummy-2"].HeadMessage, 74240, maxRetry, 1, 0, 0) + + testStopHeadTracking(cancel, bc, startGoRoutines) }) }) Context("Correctly formatted Phase0 Test Blocks", func() { It("Should turn it into a struct successfully.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 1, 0, 0) + startGoRoutines := runtime.NumGoroutine() + ctx, cancel := context.WithCancel(context.Background()) + + bc := setUpTest(BeaconNodeTester.TestConfig, "99") + BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 1, 0, 0) bc = setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy-2"].HeadMessage, 3, maxRetry, 1, 0, 0) + BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["100-dummy-2"].HeadMessage, 3, maxRetry, 1, 0, 0) + + testStopHeadTracking(cancel, bc, startGoRoutines) }) }) Context("Two consecutive correct blocks", func() { It("Should handle both blocks correctly, without any reorgs or known_gaps", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0) - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 0, 0) + startGoRoutines := runtime.NumGoroutine() + ctx, cancel := context.WithCancel(context.Background()) + + bc := setUpTest(BeaconNodeTester.TestConfig, "99") + BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0) + BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 0, 0) + + testStopHeadTracking(cancel, bc, startGoRoutines) }) }) Context("Two consecutive blocks with a bad parent", func() { It("Should add the previous block to the knownGaps table.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 1, 0, 0) - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 1, 1) + startGoRoutines := runtime.NumGoroutine() + ctx, cancel := context.WithCancel(context.Background()) + + bc := setUpTest(BeaconNodeTester.TestConfig, "99") + BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 1, 0, 0) + BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 1, 1) + + testStopHeadTracking(cancel, bc, startGoRoutines) }) }) Context("Phase 0: We have a correctly formated SSZ SignedBeaconBlock and BeaconState", func() { @@ -348,10 +381,13 @@ var _ = Describe("Capturehead", Label("head"), func() { //}) Context("When the proper SSZ objects are not served", func() { It("Should return an error, and add the slot to the knownGaps table.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "101") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["102-wrong-ssz-1"].HeadMessage, 3, maxRetry, 0, 1, 0) + startGoRoutines := runtime.NumGoroutine() + ctx, cancel := context.WithCancel(context.Background()) + + bc := setUpTest(BeaconNodeTester.TestConfig, "101") + BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["102-wrong-ssz-1"].HeadMessage, 3, maxRetry, 0, 1, 0) knownGapCount := countKnownGapsTable(bc.Db) Expect(knownGapCount).To(Equal(1)) @@ -359,6 +395,8 @@ var _ = Describe("Capturehead", Label("head"), func() { start, end := queryKnownGaps(bc.Db, "102", "102") Expect(start).To(Equal(102)) Expect(end).To(Equal(102)) + + testStopHeadTracking(cancel, bc, startGoRoutines) }) }) }) @@ -366,21 +404,28 @@ var _ = Describe("Capturehead", Label("head"), func() { Describe("Known Gaps Scenario", Label("unit", "behavioral"), func() { Context("There is a gap at start up within one incrementing range.", func() { It("Should add only a single entry to the knownGaps table.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "10") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testKnownGapsMessages(bc, 100, 1, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage) + startGoRoutines := runtime.NumGoroutine() + ctx, cancel := context.WithCancel(context.Background()) + + bc := setUpTest(BeaconNodeTester.TestConfig, "10") + BeaconNodeTester.testKnownGapsMessages(ctx, bc, 100, 1, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage) start, end := queryKnownGaps(bc.Db, "11", "99") Expect(start).To(Equal(11)) Expect(end).To(Equal(99)) + testStopHeadTracking(cancel, bc, startGoRoutines) }) }) Context("There is a gap at start up spanning multiple incrementing range.", func() { It("Should add multiple entries to the knownGaps table.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "5") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testKnownGapsMessages(bc, 10, 10, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage) + startGoRoutines := runtime.NumGoroutine() + ctx, cancel := context.WithCancel(context.Background()) + + bc := setUpTest(BeaconNodeTester.TestConfig, "5") + BeaconNodeTester.testKnownGapsMessages(ctx, bc, 10, 10, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage) start, end := queryKnownGaps(bc.Db, "6", "16") Expect(start).To(Equal(6)) @@ -389,14 +434,19 @@ var _ = Describe("Capturehead", Label("head"), func() { start, end = queryKnownGaps(bc.Db, "96", "99") Expect(start).To(Equal(96)) Expect(end).To(Equal(99)) + + testStopHeadTracking(cancel, bc, startGoRoutines) }) }) Context("Gaps between two head messages", func() { It("Should add the slots in-between", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testKnownGapsMessages(bc, 1000000, 3, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].HeadMessage) + startGoRoutines := runtime.NumGoroutine() + ctx, cancel := context.WithCancel(context.Background()) + + bc := setUpTest(BeaconNodeTester.TestConfig, "99") + BeaconNodeTester.testKnownGapsMessages(ctx, bc, 1000000, 3, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].HeadMessage) start, end := queryKnownGaps(bc.Db, "101", "1000101") Expect(start).To(Equal(101)) @@ -405,6 +455,7 @@ var _ = Describe("Capturehead", Label("head"), func() { start, end = queryKnownGaps(bc.Db, "2000101", "2375702") Expect(start).To(Equal(2000101)) Expect(end).To(Equal(2375702)) + testStopHeadTracking(cancel, bc, startGoRoutines) }) }) }) @@ -412,34 +463,50 @@ var _ = Describe("Capturehead", Label("head"), func() { Describe("ReOrg Scenario", Label("unit", "behavioral"), func() { Context("Altair: Multiple head messages for the same slot.", func() { It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testMultipleHead(bc, TestEvents["2375703"].HeadMessage, TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry) + startGoRoutines := runtime.NumGoroutine() + ctx, cancel := context.WithCancel(context.Background()) + + bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") + BeaconNodeTester.testMultipleHead(ctx, bc, TestEvents["2375703"].HeadMessage, TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry) + testStopHeadTracking(cancel, bc, startGoRoutines) }) }) Context("Phase0: Multiple head messages for the same slot.", func() { It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testMultipleHead(bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry) + startGoRoutines := runtime.NumGoroutine() + ctx, cancel := context.WithCancel(context.Background()) + + bc := setUpTest(BeaconNodeTester.TestConfig, "99") + BeaconNodeTester.testMultipleHead(ctx, bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry) + testStopHeadTracking(cancel, bc, startGoRoutines) }) }) Context("Phase 0: Multiple reorgs have occurred on this slot", func() { It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testMultipleReorgs(bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100-dummy-2"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry) + startGoRoutines := runtime.NumGoroutine() + ctx, cancel := context.WithCancel(context.Background()) + + bc := setUpTest(BeaconNodeTester.TestConfig, "99") + BeaconNodeTester.testMultipleReorgs(ctx, bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100-dummy-2"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry) + testStopHeadTracking(cancel, bc, startGoRoutines) }) }) Context("Altair: Multiple reorgs have occurred on this slot", func() { It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testMultipleReorgs(bc, TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703-dummy-2"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240, maxRetry) + startGoRoutines := runtime.NumGoroutine() + ctx, cancel := context.WithCancel(context.Background()) + + bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") + BeaconNodeTester.testMultipleReorgs(ctx, bc, TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703-dummy-2"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240, maxRetry) + testStopHeadTracking(cancel, bc, startGoRoutines) }) }) }) @@ -831,8 +898,8 @@ func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string // Helper function to test three reorg messages. There are going to be many functions like this, // Because we need to test the same logic for multiple phases. -func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch int, maxRetry int) { - go bc.CaptureHead() +func (tbc TestBeaconNode) testMultipleReorgs(ctx context.Context, bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch int, maxRetry int) { + go bc.CaptureHead(ctx) time.Sleep(1 * time.Second) log.Info("Sending Messages to BeaconClient") @@ -893,8 +960,8 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs } // A test to validate a single block was processed correctly -func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head beaconclient.Head, epoch int, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) { - go bc.CaptureHead() +func (tbc TestBeaconNode) testProcessBlock(ctx context.Context, bc *beaconclient.BeaconClient, head beaconclient.Head, epoch int, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) { + go bc.CaptureHead(ctx) time.Sleep(1 * time.Second) sendHeadMessage(bc, head, maxRetry, expectedSuccessInsert) @@ -923,8 +990,8 @@ func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head b // A test that ensures that if two HeadMessages occur for a single slot they are marked // as proposed and forked correctly. -func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, epoch int, maxRetry int) { - go bc.CaptureHead() +func (tbc TestBeaconNode) testMultipleHead(ctx context.Context, bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, epoch int, maxRetry int) { + go bc.CaptureHead(ctx) time.Sleep(1 * time.Second) sendHeadMessage(bc, firstHead, maxRetry, 1) @@ -950,9 +1017,9 @@ func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstH // A test that ensures that if two HeadMessages occur for a single slot they are marked // as proposed and forked correctly. -func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, tableIncrement int, expectedEntries uint64, maxRetry int, msg ...beaconclient.Head) { +func (tbc TestBeaconNode) testKnownGapsMessages(ctx context.Context, bc *beaconclient.BeaconClient, tableIncrement int, expectedEntries uint64, maxRetry int, msg ...beaconclient.Head) { bc.KnownGapTableIncrement = tableIncrement - go bc.CaptureHead() + go bc.CaptureHead(ctx) time.Sleep(1 * time.Second) for _, headMsg := range msg { @@ -991,3 +1058,14 @@ func testSszRoot(msg Message) { Expect(err).ToNot(HaveOccurred()) Expect(msg.HeadMessage.Block).To(Equal("0x" + hex.EncodeToString(blockRoot[:]))) } + +// A make shift function to stop head tracking and insure we dont have any goroutine leaks +func testStopHeadTracking(cancel context.CancelFunc, bc *beaconclient.BeaconClient, startGoRoutines int) { + bc.Db.Close() + err := bc.StopHeadTracking(cancel) + Expect(err).ToNot(HaveOccurred()) + + time.Sleep(3 * time.Second) + endNum := runtime.NumGoroutine() + Expect(startGoRoutines).To(Equal(endNum)) +} diff --git a/pkg/beaconclient/capturehistoric.go b/pkg/beaconclient/capturehistoric.go index 2bf6dfc..50855de 100644 --- a/pkg/beaconclient/capturehistoric.go +++ b/pkg/beaconclient/capturehistoric.go @@ -164,8 +164,9 @@ func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, errs := bp.getSlotRange(ctx, slotsCh) // Periodically adds new entries.... if errs != nil { finalErrCh <- errs + } else { + finalErrCh <- nil } - finalErrCh <- nil log.Debug("We are stopping the processing of adding new entries") }() log.Debug("Waiting for shutdown signal from channel") diff --git a/pkg/beaconclient/capturehistoric_test.go b/pkg/beaconclient/capturehistoric_test.go index 5571a7e..5f36039 100644 --- a/pkg/beaconclient/capturehistoric_test.go +++ b/pkg/beaconclient/capturehistoric_test.go @@ -3,6 +3,7 @@ package beaconclient_test import ( "context" "fmt" + "runtime" "sync/atomic" "time" @@ -24,9 +25,11 @@ var _ = Describe("Capturehistoric", func() { Describe("Run the application in historic mode", Label("unit", "behavioral", "historical"), func() { Context("Phase0 + Altairs: When we need to process a multiple blocks in a multiple entries in the eth_beacon.historic_process table.", Label("deb"), func() { It("Successfully Process the Blocks", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() + startNum := runtime.NumGoroutine() + + bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 101, 10) BeaconNodeTester.runHistoricalProcess(bc, 2, 2, 0, 0, 0) // Run Two seperate processes @@ -35,6 +38,11 @@ var _ = Describe("Capturehistoric", func() { time.Sleep(2 * time.Second) validatePopularBatchBlocks(bc) + time.Sleep(3 * time.Second) + + bc.Db.Close() + endNum := runtime.NumGoroutine() + Expect(startNum).To(Equal(endNum)) }) }) Context("When the start block is greater than the endBlock", func() { @@ -70,11 +78,13 @@ var _ = Describe("Capturehistoric", func() { }) }) Describe("Running the Application to process Known Gaps", Label("unit", "behavioral", "knownGaps"), func() { - Context("Phase0 + Altairs: When we need to process a multiple blocks in a multiple entries in the eth_beacon.known_gaps table.", func() { + Context("Phase0 + Altairs: When we need to process a multiple blocks in a multiple entries in the eth_beacon.known_gaps table.", Label("leak"), func() { It("Successfully Process the Blocks", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() + startNum := runtime.NumGoroutine() + + bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.writeEventToKnownGaps(bc, 100, 101) BeaconNodeTester.runKnownGapsProcess(bc, 2, 2, 0, 0, 0) // Run Two seperate processes @@ -83,6 +93,10 @@ var _ = Describe("Capturehistoric", func() { time.Sleep(2 * time.Second) validatePopularBatchBlocks(bc) + + bc.Db.Close() + endNum := runtime.NumGoroutine() + Expect(startNum).To(Equal(endNum)) }) }) Context("When the start block is greater than the endBlock", func() { @@ -107,13 +121,16 @@ var _ = Describe("Capturehistoric", func() { }) }) Describe("Running the application in Historic, Head, and KnownGaps mode", Label("unit", "historical", "full"), func() { - Context("When it recieves a head, historic and known Gaps message (in order)", func() { + Context("When it recieves a head, historic and known Gaps message (in order)", Label("deb"), func() { It("Should process them all successfully.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() + startGoRoutines := runtime.NumGoroutine() + ctx, cancel := context.WithCancel(context.Background()) + + bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") // Head - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) + BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) // Historical BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 100, 10) @@ -125,19 +142,25 @@ var _ = Describe("Capturehistoric", func() { time.Sleep(2 * time.Second) validatePopularBatchBlocks(bc) + + testStopHeadTracking(cancel, bc, startGoRoutines) + }) }) Context("When it recieves a historic, head and known Gaps message (in order)", func() { It("Should process them all successfully.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() + startGoRoutines := runtime.NumGoroutine() + ctx, cancel := context.WithCancel(context.Background()) + + bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") // Historical BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 100, 10) BeaconNodeTester.runHistoricalProcess(bc, 2, 1, 0, 0, 0) // Head - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) + BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) // Known Gaps BeaconNodeTester.writeEventToKnownGaps(bc, 101, 101) @@ -145,13 +168,17 @@ var _ = Describe("Capturehistoric", func() { time.Sleep(2 * time.Second) validatePopularBatchBlocks(bc) + testStopHeadTracking(cancel, bc, startGoRoutines) }) }) Context("When it recieves a known Gaps, historic and head message (in order)", func() { It("Should process them all successfully.", func() { - bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() + startGoRoutines := runtime.NumGoroutine() + ctx, cancel := context.WithCancel(context.Background()) + + bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") // Known Gaps BeaconNodeTester.writeEventToKnownGaps(bc, 101, 101) BeaconNodeTester.runKnownGapsProcess(bc, 2, 1, 0, 0, 0) @@ -161,10 +188,11 @@ var _ = Describe("Capturehistoric", func() { BeaconNodeTester.runHistoricalProcess(bc, 2, 2, 0, 0, 0) // Head - BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) + BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) time.Sleep(2 * time.Second) validatePopularBatchBlocks(bc) + testStopHeadTracking(cancel, bc, startGoRoutines) }) }) }) diff --git a/pkg/beaconclient/incomingsse.go b/pkg/beaconclient/incomingsse.go index cdb4891..0833f44 100644 --- a/pkg/beaconclient/incomingsse.go +++ b/pkg/beaconclient/incomingsse.go @@ -18,6 +18,7 @@ package beaconclient import ( + "context" "encoding/json" "time" @@ -33,7 +34,7 @@ var ( // This function will capture all the SSE events for a given SseEvents object. // When new messages come in, it will ensure that they are decoded into JSON. // If any errors occur, it log the error information. -func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMetricInc func(uint64)) { +func handleIncomingSseEvent[P ProcessedEvents](ctx context.Context, eventHandler *SseEvents[P], errMetricInc func(uint64)) { go func() { errG := new(errgroup.Group) errG.Go(func() error { @@ -56,6 +57,10 @@ func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMe }() for { select { + case <-ctx.Done(): + close(eventHandler.MessagesCh) + close(eventHandler.ErrorCh) + return case message := <-eventHandler.MessagesCh: // Message can be nil if its a keep-alive message if len(message.Data) != 0 { @@ -91,8 +96,8 @@ func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan } // Capture all of the event topics. -func (bc *BeaconClient) captureEventTopic() { +func (bc *BeaconClient) captureEventTopic(ctx context.Context) { log.Info("We are capturing all SSE events") - go handleIncomingSseEvent(bc.HeadTracking, bc.Metrics.IncrementHeadError) - go handleIncomingSseEvent(bc.ReOrgTracking, bc.Metrics.IncrementReorgError) + go handleIncomingSseEvent(ctx, bc.HeadTracking, bc.Metrics.IncrementHeadError) + go handleIncomingSseEvent(ctx, bc.ReOrgTracking, bc.Metrics.IncrementReorgError) } diff --git a/pkg/beaconclient/processevents.go b/pkg/beaconclient/processevents.go index 8dd5520..15d183f 100644 --- a/pkg/beaconclient/processevents.go +++ b/pkg/beaconclient/processevents.go @@ -19,6 +19,7 @@ package beaconclient import ( + "context" "fmt" "strconv" @@ -26,52 +27,63 @@ import ( ) // This function will perform the necessary steps to handle a reorg. -func (bc *BeaconClient) handleReorg() { +func (bc *BeaconClient) handleReorg(ctx context.Context) { log.Info("Starting to process reorgs.") for { - reorg := <-bc.ReOrgTracking.ProcessCh - log.WithFields(log.Fields{"reorg": reorg}).Debug("Received a new reorg message.") - writeReorgs(bc.Db, reorg.Slot, reorg.NewHeadBlock, bc.Metrics) + select { + case <-ctx.Done(): + close(bc.ReOrgTracking.ProcessCh) + return + case reorg := <-bc.ReOrgTracking.ProcessCh: + log.WithFields(log.Fields{"reorg": reorg}).Debug("Received a new reorg message.") + writeReorgs(bc.Db, reorg.Slot, reorg.NewHeadBlock, bc.Metrics) + } } } // This function will handle the latest head event. -func (bc *BeaconClient) handleHead() { +func (bc *BeaconClient) handleHead(ctx context.Context) { log.Info("Starting to process head.") errorSlots := 0 for { - head := <-bc.HeadTracking.ProcessCh - // Process all the work here. - slot, err := strconv.Atoi(head.Slot) - if err != nil { - bc.HeadTracking.ErrorCh <- &SseError{ - err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot), + select { + case <-ctx.Done(): + close(bc.HeadTracking.ProcessCh) + return + case head := <-bc.HeadTracking.ProcessCh: + + // Process all the work here. + slot, err := strconv.Atoi(head.Slot) + if err != nil { + bc.HeadTracking.ErrorCh <- &SseError{ + err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot), + } + errorSlots = errorSlots + 1 + continue } - errorSlots = errorSlots + 1 - continue + if errorSlots != 0 && bc.PreviousSlot != 0 { + log.WithFields(log.Fields{ + "lastProcessedSlot": bc.PreviousSlot, + "errorSlots": errorSlots, + }).Warn("We added slots to the knownGaps table because we got bad head messages.") + writeKnownGaps(bc.Db, bc.KnownGapTableIncrement, bc.PreviousSlot+1, slot, fmt.Errorf("Bad Head Messages"), "headProcessing", bc.Metrics) + errorSlots = 0 + } + + log.WithFields(log.Fields{"head": head}).Debug("We are going to start processing the slot.") + + // Not used anywhere yet but might be useful to have. + if bc.PreviousSlot == 0 && bc.PreviousBlockRoot == "" { + bc.StartingSlot = slot + } + + go processHeadSlot(ctx, bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement, bc.CheckDb) + + log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished calling processHeadSlot.") + + // Update the previous block + bc.PreviousSlot = slot + bc.PreviousBlockRoot = head.Block } - if errorSlots != 0 && bc.PreviousSlot != 0 { - log.WithFields(log.Fields{ - "lastProcessedSlot": bc.PreviousSlot, - "errorSlots": errorSlots, - }).Warn("We added slots to the knownGaps table because we got bad head messages.") - writeKnownGaps(bc.Db, bc.KnownGapTableIncrement, bc.PreviousSlot+1, slot, fmt.Errorf("Bad Head Messages"), "headProcessing", bc.Metrics) - errorSlots = 0 - } - - log.WithFields(log.Fields{"head": head}).Debug("We are going to start processing the slot.") - - // Not used anywhere yet but might be useful to have. - if bc.PreviousSlot == 0 && bc.PreviousBlockRoot == "" { - bc.StartingSlot = slot - } - - go processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement, bc.CheckDb) - - log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished calling processHeadSlot.") - - // Update the previous block - bc.PreviousSlot = slot - bc.PreviousBlockRoot = head.Block } } diff --git a/pkg/beaconclient/processslot.go b/pkg/beaconclient/processslot.go index 4e075b4..4b1f55e 100644 --- a/pkg/beaconclient/processslot.go +++ b/pkg/beaconclient/processslot.go @@ -239,12 +239,12 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string, } // Handle a slot that is at head. A wrapper function for calling `handleFullSlot`. -func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) { +func processHeadSlot(ctx context.Context, db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) { // Get the knownGaps at startUp. if previousSlot == 0 && previousBlockRoot == "" { writeStartUpGaps(db, knownGapsTableIncrement, slot, metrics) } - err, errReason := processFullSlot(context.Background(), db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement, checkDb) + err, errReason := processFullSlot(ctx, db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement, checkDb) if err != nil { writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics) } diff --git a/pkg/beaconclient/systemvalidation_test.go b/pkg/beaconclient/systemvalidation_test.go index 7aabd41..9187858 100644 --- a/pkg/beaconclient/systemvalidation_test.go +++ b/pkg/beaconclient/systemvalidation_test.go @@ -1,7 +1,9 @@ package beaconclient_test import ( + "context" "os" + "runtime" "strconv" "time" @@ -63,7 +65,11 @@ func getEnvInt(envVar string) int { // Start head tracking and wait for the expected results. func processProdHeadBlocks(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { - go bc.CaptureHead() + startGoRoutines := runtime.NumGoroutine() + ctx, cancel := context.WithCancel(context.Background()) + go bc.CaptureHead(ctx) time.Sleep(1 * time.Second) validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError) + + testStopHeadTracking(cancel, bc, startGoRoutines) }