diff --git a/cmd/capture.go b/cmd/capture.go index d144834..6d48027 100644 --- a/cmd/capture.go +++ b/cmd/capture.go @@ -39,6 +39,7 @@ var ( bcConnectionProtocol string bcType string bcMaxHistoricProcessWorker int + bcMaxHeadProcessWorker int bcUniqueNodeIdentifier int bcCheckDb bool kgMaxWorker int @@ -96,7 +97,8 @@ func init() { captureCmd.PersistentFlags().StringVarP(&bcConnectionProtocol, "bc.connectionProtocol", "", "http", "protocol for connecting to the beacon node.") captureCmd.PersistentFlags().IntVarP(&bcBootRetryInterval, "bc.bootRetryInterval", "", 30, "The amount of time to wait between retries while booting the application") captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The amount of time to wait between retries while booting the application") - captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 30, "The number of workers that should be actively processing slots from the eth-beacon.historic_process table. Be careful of system memory.") + captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 3, "The number of workers that should be actively processing slots from the eth-beacon.historic_process table. Be careful of system memory.") + captureCmd.PersistentFlags().IntVarP(&bcMaxHeadProcessWorker, "bc.maxHeadProcessWorker", "", 3, "The number of workers that should be actively processing head slots. Be careful of system memory.") captureCmd.PersistentFlags().IntVarP(&bcUniqueNodeIdentifier, "bc.uniqueNodeIdentifier", "", 0, "The unique identifier of this application. Each application connecting to the DB should have a unique identifier.") captureCmd.PersistentFlags().BoolVarP(&bcCheckDb, "bc.checkDb", "", true, "Should we check to see if the slot exists in the DB before writing it?") // err = captureCmd.MarkPersistentFlagRequired("bc.address") @@ -107,7 +109,7 @@ func init() { //// Known Gaps specific captureCmd.PersistentFlags().BoolVarP(&kgProcessGaps, "kg.processKnownGaps", "", true, "Should we process the slots within the eth-beacon.known_gaps table.") captureCmd.PersistentFlags().IntVarP(&kgTableIncrement, "kg.increment", "", 10000, "The max slots within a single entry to the known_gaps table.") - captureCmd.PersistentFlags().IntVarP(&kgMaxWorker, "kg.maxKnownGapsWorker", "", 30, "The number of workers that should be actively processing slots from the eth-beacon.known_gaps table. Be careful of system memory.") + captureCmd.PersistentFlags().IntVarP(&kgMaxWorker, "kg.maxKnownGapsWorker", "", 3, "The number of workers that should be actively processing slots from the eth-beacon.known_gaps table. 
Be careful of system memory.") // Prometheus Specific captureCmd.PersistentFlags().BoolVarP(&pmMetrics, "pm.metrics", "", true, "Should we capture prometheus metrics.") @@ -157,6 +159,8 @@ func init() { exitErr(err) err = viper.BindPFlag("bc.maxHistoricProcessWorker", captureCmd.PersistentFlags().Lookup("bc.maxHistoricProcessWorker")) exitErr(err) + err = viper.BindPFlag("bc.maxHeadProcessWorker", captureCmd.PersistentFlags().Lookup("bc.maxHeadProcessWorker")) + exitErr(err) err = viper.BindPFlag("bc.uniqueNodeIdentifier", captureCmd.PersistentFlags().Lookup("bc.uniqueNodeIdentifier")) exitErr(err) err = viper.BindPFlag("bc.checkDb", captureCmd.PersistentFlags().Lookup("bc.checkDb")) diff --git a/cmd/full.go b/cmd/full.go index 6d27670..1ba83a9 100644 --- a/cmd/full.go +++ b/cmd/full.go @@ -59,7 +59,7 @@ func init() { func startFullProcessing() { // Boot the application log.Info("Starting the application in head tracking mode.") - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), @@ -75,14 +75,11 @@ func startFullProcessing() { log.Info("The Beacon Client has booted successfully!") // Capture head blocks - hdCtx, hdCancel := context.WithCancel(context.Background()) - go Bc.CaptureHead(hdCtx, false) - - hpContext, hpCancel := context.WithCancel(context.Background()) + go Bc.CaptureHead(ctx, viper.GetInt("bc.maxHeadProcessWorker"), false) errG, _ := errgroup.WithContext(context.Background()) errG.Go(func() error { - errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker")) + errs := Bc.CaptureHistoric(ctx, viper.GetInt("bc.maxHistoricProcessWorker")) if len(errs) != 0 { if len(errs) != 0 { log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events") @@ -91,12 +88,11 @@ func startFullProcessing() { } return nil }) - kgCtx, kgCancel := context.WithCancel(context.Background()) if viper.GetBool("kg.processKnownGaps") { go func() { errG := new(errgroup.Group) errG.Go(func() error { - errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker")) + errs := Bc.ProcessKnownGaps(ctx, viper.GetInt("kg.maxKnownGapsWorker")) if len(errs) != 0 { log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps") return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps") @@ -116,7 +112,7 @@ func startFullProcessing() { } // Shutdown when the time is right. 
- err = shutdown.ShutdownFull(ctx, hdCancel, kgCancel, hpCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc) + err = shutdown.ShutdownFull(ctx, cancel, notifierCh, maxWaitSecondsShutdown, Db, Bc) if err != nil { loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!") } else { diff --git a/cmd/head.go b/cmd/head.go index 41c5b43..1aea4ea 100644 --- a/cmd/head.go +++ b/cmd/head.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "net/http" - "os" "strconv" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -47,7 +46,7 @@ var headCmd = &cobra.Command{ func startHeadTracking() { // Boot the application log.Info("Starting the application in head tracking mode.") - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), @@ -63,15 +62,13 @@ func startHeadTracking() { log.Info("The Beacon Client has booted successfully!") // Capture head blocks - hdCtx, hdCancel := context.WithCancel(context.Background()) - go Bc.CaptureHead(hdCtx, false) + go Bc.CaptureHead(ctx, viper.GetInt("bc.maxHeadProcessWorker"), false) - kgCtx, kgCancel := context.WithCancel(context.Background()) if viper.GetBool("kg.processKnownGaps") { go func() { errG := new(errgroup.Group) errG.Go(func() error { - errs := Bc.ProcessKnownGaps(kgCtx, viper.GetInt("kg.maxKnownGapsWorker")) + errs := Bc.ProcessKnownGaps(ctx, viper.GetInt("kg.maxKnownGapsWorker")) if len(errs) != 0 { log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps") return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps") @@ -91,14 +88,12 @@ func startHeadTracking() { } // Shutdown when the time is right. 
- err = shutdown.ShutdownHeadTracking(ctx, hdCancel, kgCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc) + err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, Db, Bc) if err != nil { loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!") } else { log.Info("Gracefully shutdown ipld-eth-beacon-indexer") } - log.Debug("WTF") - os.Exit(0) } func init() { diff --git a/cmd/historic.go b/cmd/historic.go index 6e0a03e..78797d8 100644 --- a/cmd/historic.go +++ b/cmd/historic.go @@ -49,7 +49,7 @@ var historicCmd = &cobra.Command{ func startHistoricProcessing() { // Boot the application log.Info("Starting the application in head tracking mode.") - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), @@ -63,11 +63,9 @@ func startHistoricProcessing() { serveProm(addr) } - hpContext, hpCancel := context.WithCancel(context.Background()) - errG, _ := errgroup.WithContext(context.Background()) errG.Go(func() error { - errs := Bc.CaptureHistoric(hpContext, viper.GetInt("bc.maxHistoricProcessWorker")) + errs := Bc.CaptureHistoric(ctx, viper.GetInt("bc.maxHistoricProcessWorker")) if len(errs) != 0 { if len(errs) != 0 { log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events") @@ -77,12 +75,11 @@ func startHistoricProcessing() { return nil }) - kgContext, kgCancel := context.WithCancel(context.Background()) if viper.GetBool("kg.processKnownGaps") { go func() { errG := new(errgroup.Group) errG.Go(func() error { - errs := Bc.ProcessKnownGaps(kgContext, viper.GetInt("kg.maxKnownGapsWorker")) + errs := Bc.ProcessKnownGaps(ctx, viper.GetInt("kg.maxKnownGapsWorker")) if len(errs) != 0 { log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps") return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps") @@ -102,7 +99,7 @@ func startHistoricProcessing() { } // Shutdown when the time is right. - err = shutdown.ShutdownHistoricProcessing(ctx, kgCancel, hpCancel, notifierCh, maxWaitSecondsShutdown, Db, Bc) + err = shutdown.ShutdownHistoricProcessing(ctx, cancel, notifierCh, maxWaitSecondsShutdown, Db, Bc) if err != nil { loghelper.LogError(err).Error("Ungracefully Shutdown ipld-eth-beacon-indexer!") } else { diff --git a/internal/shutdown/shutdown.go b/internal/shutdown/shutdown.go index 7e03fb0..fc978af 100644 --- a/internal/shutdown/shutdown.go +++ b/internal/shutdown/shutdown.go @@ -40,68 +40,66 @@ func ShutdownServices(ctx context.Context, notifierCh chan os.Signal, waitTime t } // Wrapper function for shutting down the head tracking process. 
-func ShutdownHeadTracking(ctx context.Context, hdCancel context.CancelFunc, kgCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { +func ShutdownHeadTracking(ctx context.Context, cancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{ // Combining DB shutdown with BC because BC needs DB open to cleanly shutdown. "beaconClient": func(ctx context.Context) error { defer DB.Close() - err := BC.StopHeadTracking(hdCancel) - if err != nil { - loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking") - } + cancel() + BC.StopHeadTracking(ctx, false) if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) { - err = BC.StopKnownGapsProcessing(kgCancel) + err := BC.StopKnownGapsProcessing(ctx) if err != nil { loghelper.LogError(err).Error("Unable to stop processing known gaps") + return err } } - return err + return nil }, }) } // Wrapper function for shutting down the head tracking process. -func ShutdownHistoricProcessing(ctx context.Context, kgCancel, hpCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { +func ShutdownHistoricProcessing(ctx context.Context, cancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{ // Combining DB shutdown with BC because BC needs DB open to cleanly shutdown. "beaconClient": func(ctx context.Context) error { defer DB.Close() - err := BC.StopHistoric(hpCancel) + cancel() + err := BC.StopHistoric(ctx) if err != nil { loghelper.LogError(err).Error("Unable to stop processing historic") } if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) { - err = BC.StopKnownGapsProcessing(kgCancel) + err = BC.StopKnownGapsProcessing(ctx) if err != nil { loghelper.LogError(err).Error("Unable to stop processing known gaps") + return err } } - return err + return nil }, }) } // Shutdown the head and historical processing -func ShutdownFull(ctx context.Context, hdCancel context.CancelFunc, kgCancel, hpCancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { +func ShutdownFull(ctx context.Context, cancel context.CancelFunc, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{ // Combining DB shutdown with BC because BC needs DB open to cleanly shutdown. 
"beaconClient": func(ctx context.Context) error { defer DB.Close() - err := BC.StopHistoric(hpCancel) + cancel() + err := BC.StopHistoric(ctx) if err != nil { loghelper.LogError(err).Error("Unable to stop processing historic") } if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) { - err = BC.StopKnownGapsProcessing(kgCancel) + err = BC.StopKnownGapsProcessing(ctx) if err != nil { loghelper.LogError(err).Error("Unable to stop processing known gaps") } } - err = BC.StopHeadTracking(hdCancel) - if err != nil { - loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking") - } - + BC.StopHeadTracking(ctx, false) return err }, }) diff --git a/internal/shutdown/shutdown_test.go b/internal/shutdown/shutdown_test.go index 99dc6dd..eb56d10 100644 --- a/internal/shutdown/shutdown_test.go +++ b/internal/shutdown/shutdown_test.go @@ -56,12 +56,13 @@ var ( BC *beaconclient.BeaconClient err error ctx context.Context + cancel context.CancelFunc notifierCh chan os.Signal ) var _ = Describe("Shutdown", func() { BeforeEach(func() { - ctx = context.Background() + ctx, cancel = context.WithCancel(context.Background()) BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier, bcCheckDb) notifierCh = make(chan os.Signal, 1) @@ -72,10 +73,8 @@ var _ = Describe("Shutdown", func() { Context("When Channels are empty,", func() { It("Should Shutdown Successfully.", func() { go func() { - _, kgCancel := context.WithCancel(context.Background()) - _, hdCancel := context.WithCancel(context.Background()) log.Debug("Starting shutdown chan") - err = shutdown.ShutdownHeadTracking(ctx, hdCancel, kgCancel, notifierCh, maxWaitSecondsShutdown, DB, BC) + err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, DB, BC) log.Debug("We have completed the shutdown...") Expect(err).ToNot(HaveOccurred()) }() @@ -86,10 +85,8 @@ var _ = Describe("Shutdown", func() { shutdownCh := make(chan bool) //log.SetLevel(log.DebugLevel) go func() { - _, kgCancel := context.WithCancel(context.Background()) - _, hdCancel := context.WithCancel(context.Background()) log.Debug("Starting shutdown chan") - err = shutdown.ShutdownHeadTracking(ctx, hdCancel, kgCancel, notifierCh, maxWaitSecondsShutdown, DB, BC) + err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, DB, BC) log.Debug("We have completed the shutdown...") Expect(err).ToNot(HaveOccurred()) shutdownCh <- true @@ -122,9 +119,7 @@ var _ = Describe("Shutdown", func() { //log.SetLevel(log.DebugLevel) go func() { log.Debug("Starting shutdown chan") - _, kgCancel := context.WithCancel(context.Background()) - _, hdCancel := context.WithCancel(context.Background()) - err = shutdown.ShutdownHeadTracking(ctx, hdCancel, kgCancel, notifierCh, maxWaitSecondsShutdown, DB, BC) + err = shutdown.ShutdownHeadTracking(ctx, cancel, notifierCh, maxWaitSecondsShutdown, DB, BC) log.Debug("We have completed the shutdown...") Expect(err).To(MatchError(gracefulshutdown.TimeoutErr(maxWaitSecondsShutdown.String()))) shutdownCh <- true diff --git a/pkg/beaconclient/capturehead.go b/pkg/beaconclient/capturehead.go index a18a70d..9392d45 100644 --- a/pkg/beaconclient/capturehead.go +++ b/pkg/beaconclient/capturehead.go @@ -24,17 +24,23 @@ import ( ) // This function will perform all the heavy lifting for tracking the head of the chain. 
-func (bc *BeaconClient) CaptureHead(ctx context.Context, skipSee bool) { +func (bc *BeaconClient) CaptureHead(ctx context.Context, maxHeadWorkers int, skipSee bool) { log.Info("We are tracking the head of the chain.") - go bc.handleHead(ctx) + go bc.handleHead(ctx, maxHeadWorkers) go bc.handleReorg(ctx) bc.captureEventTopic(ctx, skipSee) } // Stop the head tracking service. -func (bc *BeaconClient) StopHeadTracking(cancel context.CancelFunc) error { - log.Info("We are going to stop tracking the head of chain because of the shutdown signal.") - cancel() - log.Info("Successfully stopped the head tracking service.") - return nil +func (bc *BeaconClient) StopHeadTracking(ctx context.Context, skipSee bool) { + select { + case <-ctx.Done(): + if !skipSee { + bc.HeadTracking.SseClient.Unsubscribe(bc.HeadTracking.MessagesCh) + bc.ReOrgTracking.SseClient.Unsubscribe(bc.ReOrgTracking.MessagesCh) + } + log.Info("Successfully stopped the head tracking service.") + default: + log.Error("The context has not completed....") + } } diff --git a/pkg/beaconclient/capturehead_test.go b/pkg/beaconclient/capturehead_test.go index 107041f..94f089a 100644 --- a/pkg/beaconclient/capturehead_test.go +++ b/pkg/beaconclient/capturehead_test.go @@ -267,8 +267,6 @@ var _ = Describe("Capturehead", Label("head"), func() { Describe("Receiving New Head SSE messages", Label("unit", "behavioral"), func() { Context("Correctly formatted Phase0 Block", Label("leak-head"), func() { It("Should turn it into a struct successfully.", func() { - log.SetLevel(log.DebugLevel) - BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() startGoRoutines := runtime.NumGoroutine() @@ -280,7 +278,7 @@ var _ = Describe("Capturehead", Label("head"), func() { validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectParentRoot, BeaconNodeTester.TestEvents["100"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["100"].CorrectSignedBeaconBlockMhKey) validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectBeaconStateMhKey) - testStopHeadTracking(cancel, bc, startGoRoutines) + testStopHeadTracking(ctx, cancel, bc, startGoRoutines) }) }) @@ -295,7 +293,7 @@ var _ = Describe("Capturehead", Label("head"), func() { BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectParentRoot, BeaconNodeTester.TestEvents["2375703"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["2375703"].CorrectSignedBeaconBlockMhKey) validateBeaconState(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectBeaconStateMhKey) - testStopHeadTracking(cancel, bc, startGoRoutines) + testStopHeadTracking(ctx, cancel, bc, startGoRoutines) }) }) Context("Correctly formatted Altair Test Blocks", func() { @@ -313,7 +311,7 @@ var _ = Describe("Capturehead", Label("head"), func() { defer httpmock.DeactivateAndReset() BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["2375703-dummy-2"].HeadMessage, 74240, maxRetry, 1, 0, 0) - testStopHeadTracking(cancel, bc, startGoRoutines) + testStopHeadTracking(ctx, cancel, bc, 
startGoRoutines) }) }) @@ -332,7 +330,7 @@ var _ = Describe("Capturehead", Label("head"), func() { defer httpmock.DeactivateAndReset() BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["100-dummy-2"].HeadMessage, 3, maxRetry, 1, 0, 0) - testStopHeadTracking(cancel, bc, startGoRoutines) + testStopHeadTracking(ctx, cancel, bc, startGoRoutines) }) }) @@ -347,7 +345,7 @@ var _ = Describe("Capturehead", Label("head"), func() { BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0) BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 0, 0) - testStopHeadTracking(cancel, bc, startGoRoutines) + testStopHeadTracking(ctx, cancel, bc, startGoRoutines) }) }) Context("Two consecutive blocks with a bad parent", func() { @@ -361,7 +359,7 @@ var _ = Describe("Capturehead", Label("head"), func() { BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 1, 0, 0) BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 1, 1) - testStopHeadTracking(cancel, bc, startGoRoutines) + testStopHeadTracking(ctx, cancel, bc, startGoRoutines) }) }) Context("Phase 0: We have a correctly formated SSZ SignedBeaconBlock and BeaconState", func() { @@ -396,7 +394,7 @@ var _ = Describe("Capturehead", Label("head"), func() { Expect(start).To(Equal(102)) Expect(end).To(Equal(102)) - testStopHeadTracking(cancel, bc, startGoRoutines) + testStopHeadTracking(ctx, cancel, bc, startGoRoutines) }) }) }) @@ -414,7 +412,7 @@ var _ = Describe("Capturehead", Label("head"), func() { start, end := queryKnownGaps(bc.Db, "11", "99") Expect(start).To(Equal(11)) Expect(end).To(Equal(99)) - testStopHeadTracking(cancel, bc, startGoRoutines) + testStopHeadTracking(ctx, cancel, bc, startGoRoutines) }) }) Context("There is a gap at start up spanning multiple incrementing range.", func() { @@ -435,7 +433,7 @@ var _ = Describe("Capturehead", Label("head"), func() { Expect(start).To(Equal(96)) Expect(end).To(Equal(99)) - testStopHeadTracking(cancel, bc, startGoRoutines) + testStopHeadTracking(ctx, cancel, bc, startGoRoutines) }) }) Context("Gaps between two head messages", func() { @@ -455,7 +453,7 @@ var _ = Describe("Capturehead", Label("head"), func() { start, end = queryKnownGaps(bc.Db, "2000101", "2375702") Expect(start).To(Equal(2000101)) Expect(end).To(Equal(2375702)) - testStopHeadTracking(cancel, bc, startGoRoutines) + testStopHeadTracking(ctx, cancel, bc, startGoRoutines) }) }) }) @@ -470,7 +468,7 @@ var _ = Describe("Capturehead", Label("head"), func() { bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") BeaconNodeTester.testMultipleHead(ctx, bc, TestEvents["2375703"].HeadMessage, TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry) - testStopHeadTracking(cancel, bc, startGoRoutines) + testStopHeadTracking(ctx, cancel, bc, startGoRoutines) }) }) Context("Phase0: Multiple head messages for the same slot.", func() { @@ -482,7 +480,7 @@ var _ = Describe("Capturehead", Label("head"), func() { bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.testMultipleHead(ctx, bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry) - testStopHeadTracking(cancel, bc, startGoRoutines) + testStopHeadTracking(ctx, cancel, bc, startGoRoutines) }) }) Context("Phase 0: Multiple reorgs have occurred on this slot", func() { @@ -494,7 +492,7 @@ var _ = Describe("Capturehead", 
Label("head"), func() { bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.testMultipleReorgs(ctx, bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100-dummy-2"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry) - testStopHeadTracking(cancel, bc, startGoRoutines) + testStopHeadTracking(ctx, cancel, bc, startGoRoutines) }) }) Context("Altair: Multiple reorgs have occurred on this slot", func() { @@ -506,7 +504,7 @@ var _ = Describe("Capturehead", Label("head"), func() { bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") BeaconNodeTester.testMultipleReorgs(ctx, bc, TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703-dummy-2"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240, maxRetry) - testStopHeadTracking(cancel, bc, startGoRoutines) + testStopHeadTracking(ctx, cancel, bc, startGoRoutines) }) }) }) @@ -899,7 +897,7 @@ func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string // Helper function to test three reorg messages. There are going to be many functions like this, // Because we need to test the same logic for multiple phases. func (tbc TestBeaconNode) testMultipleReorgs(ctx context.Context, bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch int, maxRetry int) { - go bc.CaptureHead(ctx, true) + go bc.CaptureHead(ctx, 2, true) time.Sleep(1 * time.Second) log.Info("Sending Messages to BeaconClient") @@ -961,7 +959,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(ctx context.Context, bc *beaconclie // A test to validate a single block was processed correctly func (tbc TestBeaconNode) testProcessBlock(ctx context.Context, bc *beaconclient.BeaconClient, head beaconclient.Head, epoch int, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) { - go bc.CaptureHead(ctx, true) + go bc.CaptureHead(ctx, 2, true) time.Sleep(1 * time.Second) sendHeadMessage(bc, head, maxRetry, expectedSuccessInsert) @@ -991,7 +989,7 @@ func (tbc TestBeaconNode) testProcessBlock(ctx context.Context, bc *beaconclient // A test that ensures that if two HeadMessages occur for a single slot they are marked // as proposed and forked correctly. func (tbc TestBeaconNode) testMultipleHead(ctx context.Context, bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, epoch int, maxRetry int) { - go bc.CaptureHead(ctx, true) + go bc.CaptureHead(ctx, 2, true) time.Sleep(1 * time.Second) sendHeadMessage(bc, firstHead, maxRetry, 1) @@ -1019,7 +1017,7 @@ func (tbc TestBeaconNode) testMultipleHead(ctx context.Context, bc *beaconclient // as proposed and forked correctly. 
func (tbc TestBeaconNode) testKnownGapsMessages(ctx context.Context, bc *beaconclient.BeaconClient, tableIncrement int, expectedEntries uint64, maxRetry int, msg ...beaconclient.Head) { bc.KnownGapTableIncrement = tableIncrement - go bc.CaptureHead(ctx, true) + go bc.CaptureHead(ctx, 2, true) time.Sleep(1 * time.Second) for _, headMsg := range msg { @@ -1060,10 +1058,10 @@ func testSszRoot(msg Message) { } // A make shift function to stop head tracking and insure we dont have any goroutine leaks -func testStopHeadTracking(cancel context.CancelFunc, bc *beaconclient.BeaconClient, startGoRoutines int) { +func testStopHeadTracking(ctx context.Context, cancel context.CancelFunc, bc *beaconclient.BeaconClient, startGoRoutines int) { bc.Db.Close() - err := bc.StopHeadTracking(cancel) - Expect(err).ToNot(HaveOccurred()) + cancel() + bc.StopHeadTracking(ctx, true) time.Sleep(3 * time.Second) endNum := runtime.NumGoroutine() diff --git a/pkg/beaconclient/capturehistoric.go b/pkg/beaconclient/capturehistoric.go index 50855de..a7a56a2 100644 --- a/pkg/beaconclient/capturehistoric.go +++ b/pkg/beaconclient/capturehistoric.go @@ -37,14 +37,18 @@ func (bc *BeaconClient) CaptureHistoric(ctx context.Context, maxWorkers int) []e } // This function will perform all the necessary clean up tasks for stopping historical processing. -func (bc *BeaconClient) StopHistoric(cancel context.CancelFunc) error { - log.Info("We are stopping the historical processing service.") - cancel() - err := bc.HistoricalProcess.releaseDbLocks() - if err != nil { - loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.historic_processing table. Manual Intervention is needed!") +func (bc *BeaconClient) StopHistoric(ctx context.Context) error { + select { + case <-ctx.Done(): + log.Info("We are stopping the historical processing service.") + err := bc.HistoricalProcess.releaseDbLocks() + if err != nil { + loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.historic_processing table. Manual Intervention is needed!") + } + return nil + default: + return fmt.Errorf("Tried to stop historic before the context ended...") } - return nil } // An interface to enforce any batch processing. Currently there are two use cases for this. 
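For reference, the shutdown changes in this patch all follow the same pattern: a single cancellable context is shared by the head, historic, and known-gaps processing loops; the caller invokes cancel(), and each Stop* method performs its cleanup only once it can observe ctx.Done(). The sketch below illustrates that pattern in isolation. It is a minimal, self-contained example, and the service type and its methods are hypothetical stand-ins rather than the indexer's actual API.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// service is a hypothetical stand-in for a long-running process such as
// historic or known-gaps processing.
type service struct{}

// run does work until the shared context is canceled.
func (s *service) run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(100 * time.Millisecond):
			// Process one unit of work here.
		}
	}
}

// stop mirrors the shape of StopHistoric/StopKnownGapsProcessing in this diff:
// cleanup only runs once the context has actually been canceled.
func (s *service) stop(ctx context.Context) error {
	select {
	case <-ctx.Done():
		// Release DB locks, unsubscribe from streams, etc.
		return nil
	default:
		return fmt.Errorf("tried to stop the service before the context ended")
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	s := &service{}
	go s.run(ctx)

	// Cancel first, then ask the service to clean up, as the Shutdown* wrappers do.
	cancel()
	if err := s.stop(ctx); err != nil {
		fmt.Println(err)
	}
}
```

Cancelling before calling stop is what lets the workers unblock, while the stop call verifies the context really is done before it releases any shared resources.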
diff --git a/pkg/beaconclient/capturehistoric_test.go b/pkg/beaconclient/capturehistoric_test.go index 5f36039..565f97f 100644 --- a/pkg/beaconclient/capturehistoric_test.go +++ b/pkg/beaconclient/capturehistoric_test.go @@ -143,7 +143,7 @@ var _ = Describe("Capturehistoric", func() { time.Sleep(2 * time.Second) validatePopularBatchBlocks(bc) - testStopHeadTracking(cancel, bc, startGoRoutines) + testStopHeadTracking(ctx, cancel, bc, startGoRoutines) }) }) @@ -168,7 +168,7 @@ var _ = Describe("Capturehistoric", func() { time.Sleep(2 * time.Second) validatePopularBatchBlocks(bc) - testStopHeadTracking(cancel, bc, startGoRoutines) + testStopHeadTracking(ctx, cancel, bc, startGoRoutines) }) }) Context("When it recieves a known Gaps, historic and head message (in order)", func() { @@ -192,7 +192,7 @@ var _ = Describe("Capturehistoric", func() { time.Sleep(2 * time.Second) validatePopularBatchBlocks(bc) - testStopHeadTracking(cancel, bc, startGoRoutines) + testStopHeadTracking(ctx, cancel, bc, startGoRoutines) }) }) }) @@ -228,25 +228,20 @@ func (tbc TestBeaconNode) writeEventToHistoricProcess(bc *beaconclient.BeaconCli // Start the CaptureHistoric function, and check for the correct inserted slots. func (tbc TestBeaconNode) runHistoricalProcess(bc *beaconclient.BeaconClient, maxWorkers int, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { + startGoRoutines := runtime.NumGoroutine() ctx, cancel := context.WithCancel(context.Background()) go bc.CaptureHistoric(ctx, maxWorkers) validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError) - log.Debug("Calling the stop function for historical processing..") - err := bc.StopHistoric(cancel) - time.Sleep(5 * time.Second) - Expect(err).ToNot(HaveOccurred()) - validateAllRowsCheckedOut(bc.Db, hpCheckCheckedOutStmt) + testStopHistoricTracking(ctx, cancel, bc, startGoRoutines) } // Wrapper function that processes knownGaps func (tbc TestBeaconNode) runKnownGapsProcess(bc *beaconclient.BeaconClient, maxWorkers int, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { + startGoRoutines := runtime.NumGoroutine() ctx, cancel := context.WithCancel(context.Background()) go bc.ProcessKnownGaps(ctx, maxWorkers) validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError) - err := bc.StopKnownGapsProcessing(cancel) - time.Sleep(5 * time.Second) - Expect(err).ToNot(HaveOccurred()) - validateAllRowsCheckedOut(bc.Db, kgCheckCheckedOutStmt) + testStopHistoricTracking(ctx, cancel, bc, startGoRoutines) } func validateMetrics(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { @@ -316,3 +311,21 @@ func validateAllRowsCheckedOut(db sql.Database, checkStmt string) { Expect(err).ToNot(HaveOccurred()) Expect(rows).To(Equal(int64(0))) } + +// A makeshift function to stop historic and known-gaps processing and ensure we don't have any goroutine leaks +func testStopHistoricTracking(ctx context.Context, cancel context.CancelFunc, bc *beaconclient.BeaconClient, startGoRoutines int) { + log.Debug("Calling the stop function for historical processing..") + cancel() + err := bc.StopKnownGapsProcessing(ctx) + Expect(err).ToNot(HaveOccurred()) + time.Sleep(5 * time.Second) + validateAllRowsCheckedOut(bc.Db, kgCheckCheckedOutStmt) + err = bc.Db.Close() + Expect(err).ToNot(HaveOccurred()) + + time.Sleep(3 * time.Second) + endNum := runtime.NumGoroutine() + + 
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + Expect(startGoRoutines).To(Equal(endNum)) +} diff --git a/pkg/beaconclient/processevents.go b/pkg/beaconclient/processevents.go index 15d183f..9aa604e 100644 --- a/pkg/beaconclient/processevents.go +++ b/pkg/beaconclient/processevents.go @@ -24,6 +24,7 @@ import ( "strconv" log "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-eth-beacon-indexer/pkg/database/sql" ) // This function will perform the necessary steps to handle a reorg. @@ -42,8 +43,14 @@ func (bc *BeaconClient) handleReorg(ctx context.Context) { } // This function will handle the latest head event. -func (bc *BeaconClient) handleHead(ctx context.Context) { +func (bc *BeaconClient) handleHead(ctx context.Context, maxWorkers int) { log.Info("Starting to process head.") + + workCh := make(chan workParams) + log.WithField("workerNumber", maxWorkers).Info("Creating Workers") + for i := 1; i < maxWorkers; i++ { + go bc.headBlockProcessor(ctx, workCh) + } errorSlots := 0 for { select { @@ -77,9 +84,8 @@ func (bc *BeaconClient) handleHead(ctx context.Context) { bc.StartingSlot = slot } - go processHeadSlot(ctx, bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement, bc.CheckDb) - - log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished calling processHeadSlot.") + workCh <- workParams{db: bc.Db, serverEndpoint: bc.ServerEndpoint, slot: slot, blockRoot: head.Block, stateRoot: head.State, previousSlot: bc.PreviousSlot, previousBlockRoot: bc.PreviousBlockRoot, metrics: bc.Metrics, knownGapsTableIncrement: bc.KnownGapTableIncrement, checkDb: bc.CheckDb} + log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished sending this slot to the workCh") // Update the previous block bc.PreviousSlot = slot @@ -87,3 +93,29 @@ func (bc *BeaconClient) handleHead(ctx context.Context) { } } } + +// A worker that will process head slots. +func (bc *BeaconClient) headBlockProcessor(ctx context.Context, workCh <-chan workParams) { + for { + select { + case <-ctx.Done(): + return + case wp := <-workCh: + processHeadSlot(ctx, wp.db, wp.serverEndpoint, wp.slot, wp.blockRoot, wp.stateRoot, wp.previousSlot, wp.previousBlockRoot, wp.metrics, wp.knownGapsTableIncrement, wp.checkDb) + } + } +} + +// A struct used to pass parameters to the worker. +type workParams struct { + db sql.Database + serverEndpoint string + slot int + blockRoot string + stateRoot string + previousSlot int + previousBlockRoot string + metrics *BeaconClientMetrics + knownGapsTableIncrement int + checkDb bool +} diff --git a/pkg/beaconclient/processknowngaps.go b/pkg/beaconclient/processknowngaps.go index 343fc4a..f389e49 100644 --- a/pkg/beaconclient/processknowngaps.go +++ b/pkg/beaconclient/processknowngaps.go @@ -20,6 +20,7 @@ package beaconclient import ( "context" + "fmt" "strconv" log "github.com/sirupsen/logrus" @@ -67,14 +68,18 @@ func (bc *BeaconClient) ProcessKnownGaps(ctx context.Context, maxWorkers int) [] } // This function will perform all the necessary clean up tasks for stopping historical processing. -func (bc *BeaconClient) StopKnownGapsProcessing(cancel context.CancelFunc) error { - log.Info("We are stopping the known gaps processing service.") - cancel() - err := bc.KnownGapsProcess.releaseDbLocks() - if err != nil { - loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.known_gaps table. 
Manual Intervention is needed!") +func (bc *BeaconClient) StopKnownGapsProcessing(ctx context.Context) error { + select { + case <-ctx.Done(): + log.Info("We are stopping the known gaps processing service.") + err := bc.KnownGapsProcess.releaseDbLocks() + if err != nil { + loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the eth_beacon.known_gaps table. Manual Intervention is needed!") + } + return nil + default: + return fmt.Errorf("Tried to stop knownGaps processing before the context ended...") } - return nil } // Get a single row of historical slots from the table. diff --git a/pkg/beaconclient/queryserver.go b/pkg/beaconclient/queryserver.go index 4962d2b..b21cacf 100644 --- a/pkg/beaconclient/queryserver.go +++ b/pkg/beaconclient/queryserver.go @@ -44,7 +44,12 @@ func querySsz(endpoint string, slot string) (*[]byte, int, error) { } defer response.Body.Close() rc := response.StatusCode - body, err := ioutil.ReadAll(response.Body) + + // Read the full response body; a bare response.Body.Read into a nil slice would read 0 bytes. + var body []byte + body, err = ioutil.ReadAll(response.Body) if err != nil { loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!") return nil, rc, fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error()) } diff --git a/pkg/beaconclient/systemvalidation_test.go b/pkg/beaconclient/systemvalidation_test.go index fe67772..5b14d23 100644 --- a/pkg/beaconclient/systemvalidation_test.go +++ b/pkg/beaconclient/systemvalidation_test.go @@ -67,9 +67,9 @@ func getEnvInt(envVar string) int { func processProdHeadBlocks(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { startGoRoutines := runtime.NumGoroutine() ctx, cancel := context.WithCancel(context.Background()) - go bc.CaptureHead(ctx, false) + go bc.CaptureHead(ctx, 2, false) time.Sleep(1 * time.Second) validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError) - testStopHeadTracking(cancel, bc, startGoRoutines) + testStopHeadTracking(ctx, cancel, bc, startGoRoutines) } diff --git a/pkg/gracefulshutdown/gracefulshutdown.go b/pkg/gracefulshutdown/gracefulshutdown.go index 29fe19e..b5a2f9e 100644 --- a/pkg/gracefulshutdown/gracefulshutdown.go +++ b/pkg/gracefulshutdown/gracefulshutdown.go @@ -45,7 +45,11 @@ func Shutdown(ctx context.Context, notifierCh chan os.Signal, timeout time.Durat // add any other syscalls that you want to be notified with signal.Notify(notifierCh, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) - <-notifierCh + // Wait for either an OS signal or cancellation of the context. + select { + case <-notifierCh: + case <-ctx.Done(): + } log.Info("Shutting Down your application")
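The gracefulshutdown.go hunk above replaces the bare <-notifierCh receive with a select, so the blocking wait returns on either an operator signal or a programmatic cancel() of the shared context. A minimal, self-contained sketch of that wait, using only the standard library (the names here are illustrative, not the package's exports):

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	notifierCh := make(chan os.Signal, 1)
	signal.Notify(notifierCh, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)

	// Simulate a programmatic shutdown after two seconds so the example exits
	// even when no signal is delivered.
	go func() {
		time.Sleep(2 * time.Second)
		cancel()
	}()

	// Wait for whichever arrives first: an OS signal or context cancellation.
	select {
	case sig := <-notifierCh:
		fmt.Println("received signal:", sig)
	case <-ctx.Done():
		fmt.Println("context canceled")
	}
	fmt.Println("Shutting Down your application")
}
```

Either branch falls through to the same shutdown path, which is what allows ShutdownServices to proceed whether the trigger came from an operator or from an internal cancel().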