diff --git a/internal/shutdown/shutdown.go b/internal/shutdown/shutdown.go index fc978af..47f62d1 100644 --- a/internal/shutdown/shutdown.go +++ b/internal/shutdown/shutdown.go @@ -66,7 +66,7 @@ func ShutdownHistoricProcessing(ctx context.Context, cancel context.CancelFunc, "beaconClient": func(ctx context.Context) error { defer DB.Close() cancel() - err := BC.StopHistoric(ctx) + err := BC.StopHistoricProcess(ctx) if err != nil { loghelper.LogError(err).Error("Unable to stop processing historic") } @@ -89,7 +89,7 @@ func ShutdownFull(ctx context.Context, cancel context.CancelFunc, notifierCh cha "beaconClient": func(ctx context.Context) error { defer DB.Close() cancel() - err := BC.StopHistoric(ctx) + err := BC.StopHistoricProcess(ctx) if err != nil { loghelper.LogError(err).Error("Unable to stop processing historic") } diff --git a/pkg/beaconclient/capturehead_test.go b/pkg/beaconclient/capturehead_test.go index 94f089a..cc75ebd 100644 --- a/pkg/beaconclient/capturehead_test.go +++ b/pkg/beaconclient/capturehead_test.go @@ -269,49 +269,37 @@ var _ = Describe("Capturehead", Label("head"), func() { It("Should turn it into a struct successfully.", func() { BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - startGoRoutines := runtime.NumGoroutine() - ctx, cancel := context.WithCancel(context.Background()) - bc := setUpTest(BeaconNodeTester.TestConfig, "99") - BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0) + BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0) validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectParentRoot, BeaconNodeTester.TestEvents["100"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["100"].CorrectSignedBeaconBlockMhKey) validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectBeaconStateMhKey) - testStopHeadTracking(ctx, cancel, bc, startGoRoutines) - }) }) Context("Correctly formatted Altair Block", func() { It("Should turn it into a struct successfully.", func() { BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - startGoRoutines := runtime.NumGoroutine() - ctx, cancel := context.WithCancel(context.Background()) bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") - BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) + BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectParentRoot, BeaconNodeTester.TestEvents["2375703"].CorrectEth1BlockHash, BeaconNodeTester.TestEvents["2375703"].CorrectSignedBeaconBlockMhKey) validateBeaconState(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectBeaconStateMhKey) - testStopHeadTracking(ctx, cancel, bc, startGoRoutines) }) }) Context("Correctly formatted Altair Test Blocks", func() { It("Should turn it into a struct successfully.", func() { BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - startGoRoutines := runtime.NumGoroutine() - ctx, cancel := context.WithCancel(context.Background()) bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") - BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry, 1, 0, 0) + BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry, 1, 0, 0) bc = setUpTest(BeaconNodeTester.TestConfig, "2375702") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["2375703-dummy-2"].HeadMessage, 74240, maxRetry, 1, 0, 0) - - testStopHeadTracking(ctx, cancel, bc, startGoRoutines) + BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703-dummy-2"].HeadMessage, 74240, maxRetry, 1, 0, 0) }) }) @@ -319,18 +307,15 @@ var _ = Describe("Capturehead", Label("head"), func() { It("Should turn it into a struct successfully.", func() { BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - startGoRoutines := runtime.NumGoroutine() - ctx, cancel := context.WithCancel(context.Background()) bc := setUpTest(BeaconNodeTester.TestConfig, "99") - BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 1, 0, 0) + BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 1, 0, 0) bc = setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["100-dummy-2"].HeadMessage, 3, maxRetry, 1, 0, 0) + BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy-2"].HeadMessage, 3, maxRetry, 1, 0, 0) - testStopHeadTracking(ctx, cancel, bc, startGoRoutines) }) }) @@ -338,28 +323,22 @@ var _ = Describe("Capturehead", Label("head"), func() { It("Should handle both blocks correctly, without any reorgs or known_gaps", func() { BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - startGoRoutines := runtime.NumGoroutine() - ctx, cancel := context.WithCancel(context.Background()) bc := setUpTest(BeaconNodeTester.TestConfig, "99") - BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0) - BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 0, 0) + BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0) + BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 0, 0) - testStopHeadTracking(ctx, cancel, bc, startGoRoutines) }) }) Context("Two consecutive blocks with a bad parent", func() { It("Should add the previous block to the knownGaps table.", func() { BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - startGoRoutines := runtime.NumGoroutine() - ctx, cancel := context.WithCancel(context.Background()) bc := setUpTest(BeaconNodeTester.TestConfig, "99") - BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 1, 0, 0) - BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 1, 1) + BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100-dummy"].HeadMessage, 3, maxRetry, 1, 0, 0) + BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, maxRetry, 1, 1, 1) - testStopHeadTracking(ctx, cancel, bc, startGoRoutines) }) }) Context("Phase 0: We have a correctly formated SSZ SignedBeaconBlock and BeaconState", func() { @@ -381,11 +360,9 @@ var _ = Describe("Capturehead", Label("head"), func() { It("Should return an error, and add the slot to the knownGaps table.", func() { BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - startGoRoutines := runtime.NumGoroutine() - ctx, cancel := context.WithCancel(context.Background()) bc := setUpTest(BeaconNodeTester.TestConfig, "101") - BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["102-wrong-ssz-1"].HeadMessage, 3, maxRetry, 0, 1, 0) + BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["102-wrong-ssz-1"].HeadMessage, 3, maxRetry, 0, 1, 0) knownGapCount := countKnownGapsTable(bc.Db) Expect(knownGapCount).To(Equal(1)) @@ -394,7 +371,6 @@ var _ = Describe("Capturehead", Label("head"), func() { Expect(start).To(Equal(102)) Expect(end).To(Equal(102)) - testStopHeadTracking(ctx, cancel, bc, startGoRoutines) }) }) }) @@ -404,26 +380,21 @@ var _ = Describe("Capturehead", Label("head"), func() { It("Should add only a single entry to the knownGaps table.", func() { BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - startGoRoutines := runtime.NumGoroutine() - ctx, cancel := context.WithCancel(context.Background()) bc := setUpTest(BeaconNodeTester.TestConfig, "10") - BeaconNodeTester.testKnownGapsMessages(ctx, bc, 100, 1, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage) + BeaconNodeTester.testKnownGapsMessages(bc, 100, 1, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage) start, end := queryKnownGaps(bc.Db, "11", "99") Expect(start).To(Equal(11)) Expect(end).To(Equal(99)) - testStopHeadTracking(ctx, cancel, bc, startGoRoutines) }) }) Context("There is a gap at start up spanning multiple incrementing range.", func() { It("Should add multiple entries to the knownGaps table.", func() { BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - startGoRoutines := runtime.NumGoroutine() - ctx, cancel := context.WithCancel(context.Background()) bc := setUpTest(BeaconNodeTester.TestConfig, "5") - BeaconNodeTester.testKnownGapsMessages(ctx, bc, 10, 10, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage) + BeaconNodeTester.testKnownGapsMessages(bc, 10, 10, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage) start, end := queryKnownGaps(bc.Db, "6", "16") Expect(start).To(Equal(6)) @@ -432,19 +403,15 @@ var _ = Describe("Capturehead", Label("head"), func() { start, end = queryKnownGaps(bc.Db, "96", "99") Expect(start).To(Equal(96)) Expect(end).To(Equal(99)) - - testStopHeadTracking(ctx, cancel, bc, startGoRoutines) }) }) Context("Gaps between two head messages", func() { It("Should add the slots in-between", func() { BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - startGoRoutines := runtime.NumGoroutine() - ctx, cancel := context.WithCancel(context.Background()) bc := setUpTest(BeaconNodeTester.TestConfig, "99") - BeaconNodeTester.testKnownGapsMessages(ctx, bc, 1000000, 3, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].HeadMessage) + BeaconNodeTester.testKnownGapsMessages(bc, 1000000, 3, maxRetry, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].HeadMessage) start, end := queryKnownGaps(bc.Db, "101", "1000101") Expect(start).To(Equal(101)) @@ -453,7 +420,6 @@ var _ = Describe("Capturehead", Label("head"), func() { start, end = queryKnownGaps(bc.Db, "2000101", "2375702") Expect(start).To(Equal(2000101)) Expect(end).To(Equal(2375702)) - testStopHeadTracking(ctx, cancel, bc, startGoRoutines) }) }) }) @@ -463,48 +429,36 @@ var _ = Describe("Capturehead", Label("head"), func() { It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() { BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - startGoRoutines := runtime.NumGoroutine() - ctx, cancel := context.WithCancel(context.Background()) bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") - BeaconNodeTester.testMultipleHead(ctx, bc, TestEvents["2375703"].HeadMessage, TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry) - testStopHeadTracking(ctx, cancel, bc, startGoRoutines) + BeaconNodeTester.testMultipleHead(bc, TestEvents["2375703"].HeadMessage, TestEvents["2375703-dummy"].HeadMessage, 74240, maxRetry) }) }) Context("Phase0: Multiple head messages for the same slot.", func() { It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() { BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - startGoRoutines := runtime.NumGoroutine() - ctx, cancel := context.WithCancel(context.Background()) bc := setUpTest(BeaconNodeTester.TestConfig, "99") - BeaconNodeTester.testMultipleHead(ctx, bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry) - testStopHeadTracking(ctx, cancel, bc, startGoRoutines) + BeaconNodeTester.testMultipleHead(bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry) }) }) Context("Phase 0: Multiple reorgs have occurred on this slot", func() { It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() { BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - startGoRoutines := runtime.NumGoroutine() - ctx, cancel := context.WithCancel(context.Background()) bc := setUpTest(BeaconNodeTester.TestConfig, "99") - BeaconNodeTester.testMultipleReorgs(ctx, bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100-dummy-2"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry) - testStopHeadTracking(ctx, cancel, bc, startGoRoutines) + BeaconNodeTester.testMultipleReorgs(bc, TestEvents["100-dummy"].HeadMessage, TestEvents["100-dummy-2"].HeadMessage, TestEvents["100"].HeadMessage, 3, maxRetry) }) }) Context("Altair: Multiple reorgs have occurred on this slot", func() { It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() { BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - startGoRoutines := runtime.NumGoroutine() - ctx, cancel := context.WithCancel(context.Background()) bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") - BeaconNodeTester.testMultipleReorgs(ctx, bc, TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703-dummy-2"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240, maxRetry) - testStopHeadTracking(ctx, cancel, bc, startGoRoutines) + BeaconNodeTester.testMultipleReorgs(bc, TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703-dummy-2"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240, maxRetry) }) }) }) @@ -896,7 +850,9 @@ func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string // Helper function to test three reorg messages. There are going to be many functions like this, // Because we need to test the same logic for multiple phases. -func (tbc TestBeaconNode) testMultipleReorgs(ctx context.Context, bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch int, maxRetry int) { +func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch int, maxRetry int) { + startGoRoutines := runtime.NumGoroutine() + ctx, cancel := context.WithCancel(context.Background()) go bc.CaptureHead(ctx, 2, true) time.Sleep(1 * time.Second) @@ -955,10 +911,14 @@ func (tbc TestBeaconNode) testMultipleReorgs(ctx context.Context, bc *beaconclie validateSlot(bc, secondHead, epoch, "proposed") validateSlot(bc, thirdHead, epoch, "forked") + cancel() + testStopHeadTracking(ctx, bc, startGoRoutines) } // A test to validate a single block was processed correctly -func (tbc TestBeaconNode) testProcessBlock(ctx context.Context, bc *beaconclient.BeaconClient, head beaconclient.Head, epoch int, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) { +func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head beaconclient.Head, epoch int, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) { + startGoRoutines := runtime.NumGoroutine() + ctx, cancel := context.WithCancel(context.Background()) go bc.CaptureHead(ctx, 2, true) time.Sleep(1 * time.Second) sendHeadMessage(bc, head, maxRetry, expectedSuccessInsert) @@ -984,11 +944,15 @@ func (tbc TestBeaconNode) testProcessBlock(ctx context.Context, bc *beaconclient if expectedSuccessInsert > 0 { validateSlot(bc, head, epoch, "proposed") } + cancel() + testStopHeadTracking(ctx, bc, startGoRoutines) } // A test that ensures that if two HeadMessages occur for a single slot they are marked // as proposed and forked correctly. -func (tbc TestBeaconNode) testMultipleHead(ctx context.Context, bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, epoch int, maxRetry int) { +func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, epoch int, maxRetry int) { + startGoRoutines := runtime.NumGoroutine() + ctx, cancel := context.WithCancel(context.Background()) go bc.CaptureHead(ctx, 2, true) time.Sleep(1 * time.Second) @@ -1011,12 +975,17 @@ func (tbc TestBeaconNode) testMultipleHead(ctx context.Context, bc *beaconclient log.Info("Checking Altair to make sure the fork was marked properly.") validateSlot(bc, firstHead, epoch, "forked") validateSlot(bc, secondHead, epoch, "proposed") + cancel() + testStopHeadTracking(ctx, bc, startGoRoutines) } // A test that ensures that if two HeadMessages occur for a single slot they are marked // as proposed and forked correctly. -func (tbc TestBeaconNode) testKnownGapsMessages(ctx context.Context, bc *beaconclient.BeaconClient, tableIncrement int, expectedEntries uint64, maxRetry int, msg ...beaconclient.Head) { +func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, tableIncrement int, expectedEntries uint64, maxRetry int, msg ...beaconclient.Head) { bc.KnownGapTableIncrement = tableIncrement + + startGoRoutines := runtime.NumGoroutine() + ctx, cancel := context.WithCancel(context.Background()) go bc.CaptureHead(ctx, 2, true) time.Sleep(1 * time.Second) @@ -1040,6 +1009,8 @@ func (tbc TestBeaconNode) testKnownGapsMessages(ctx context.Context, bc *beaconc if atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 0 { Fail("We found reorgs when we didn't expect it") } + cancel() + testStopHeadTracking(ctx, bc, startGoRoutines) } // This function will make sure we are properly able to get the SszRoot of the SignedBeaconBlock and the BeaconState. @@ -1058,9 +1029,8 @@ func testSszRoot(msg Message) { } // A make shift function to stop head tracking and insure we dont have any goroutine leaks -func testStopHeadTracking(ctx context.Context, cancel context.CancelFunc, bc *beaconclient.BeaconClient, startGoRoutines int) { +func testStopHeadTracking(ctx context.Context, bc *beaconclient.BeaconClient, startGoRoutines int) { bc.Db.Close() - cancel() bc.StopHeadTracking(ctx, true) time.Sleep(3 * time.Second) diff --git a/pkg/beaconclient/capturehistoric.go b/pkg/beaconclient/capturehistoric.go index a7a56a2..defcb9e 100644 --- a/pkg/beaconclient/capturehistoric.go +++ b/pkg/beaconclient/capturehistoric.go @@ -37,7 +37,7 @@ func (bc *BeaconClient) CaptureHistoric(ctx context.Context, maxWorkers int) []e } // This function will perform all the necessary clean up tasks for stopping historical processing. -func (bc *BeaconClient) StopHistoric(ctx context.Context) error { +func (bc *BeaconClient) StopHistoricProcess(ctx context.Context) error { select { case <-ctx.Done(): log.Info("We are stopping the historical processing service.") diff --git a/pkg/beaconclient/capturehistoric_test.go b/pkg/beaconclient/capturehistoric_test.go index 565f97f..54e1a3c 100644 --- a/pkg/beaconclient/capturehistoric_test.go +++ b/pkg/beaconclient/capturehistoric_test.go @@ -27,7 +27,6 @@ var _ = Describe("Capturehistoric", func() { It("Successfully Process the Blocks", func() { BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - startNum := runtime.NumGoroutine() bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 101, 10) @@ -36,13 +35,8 @@ var _ = Describe("Capturehistoric", func() { BeaconNodeTester.writeEventToHistoricProcess(bc, 2375703, 2375703, 10) BeaconNodeTester.runHistoricalProcess(bc, 2, 3, 0, 0, 0) - time.Sleep(2 * time.Second) validatePopularBatchBlocks(bc) - time.Sleep(3 * time.Second) - bc.Db.Close() - endNum := runtime.NumGoroutine() - Expect(startNum).To(Equal(endNum)) }) }) Context("When the start block is greater than the endBlock", func() { @@ -125,12 +119,10 @@ var _ = Describe("Capturehistoric", func() { It("Should process them all successfully.", func() { BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - startGoRoutines := runtime.NumGoroutine() - ctx, cancel := context.WithCancel(context.Background()) bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") // Head - BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) + BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) // Historical BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 100, 10) @@ -143,16 +135,12 @@ var _ = Describe("Capturehistoric", func() { time.Sleep(2 * time.Second) validatePopularBatchBlocks(bc) - testStopHeadTracking(ctx, cancel, bc, startGoRoutines) - }) }) Context("When it recieves a historic, head and known Gaps message (in order)", func() { It("Should process them all successfully.", func() { BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - startGoRoutines := runtime.NumGoroutine() - ctx, cancel := context.WithCancel(context.Background()) bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") // Historical @@ -160,7 +148,7 @@ var _ = Describe("Capturehistoric", func() { BeaconNodeTester.runHistoricalProcess(bc, 2, 1, 0, 0, 0) // Head - BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) + BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) // Known Gaps BeaconNodeTester.writeEventToKnownGaps(bc, 101, 101) @@ -168,15 +156,12 @@ var _ = Describe("Capturehistoric", func() { time.Sleep(2 * time.Second) validatePopularBatchBlocks(bc) - testStopHeadTracking(ctx, cancel, bc, startGoRoutines) }) }) Context("When it recieves a known Gaps, historic and head message (in order)", func() { It("Should process them all successfully.", func() { BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() - startGoRoutines := runtime.NumGoroutine() - ctx, cancel := context.WithCancel(context.Background()) bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") // Known Gaps @@ -188,11 +173,10 @@ var _ = Describe("Capturehistoric", func() { BeaconNodeTester.runHistoricalProcess(bc, 2, 2, 0, 0, 0) // Head - BeaconNodeTester.testProcessBlock(ctx, bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) + BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) time.Sleep(2 * time.Second) validatePopularBatchBlocks(bc) - testStopHeadTracking(ctx, cancel, bc, startGoRoutines) }) }) }) @@ -232,7 +216,8 @@ func (tbc TestBeaconNode) runHistoricalProcess(bc *beaconclient.BeaconClient, ma ctx, cancel := context.WithCancel(context.Background()) go bc.CaptureHistoric(ctx, maxWorkers) validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError) - testStopHistoricTracking(ctx, cancel, bc, startGoRoutines) + cancel() + testStopHistoricProcessing(ctx, bc, startGoRoutines) } // Wrapper function that processes knownGaps @@ -241,7 +226,8 @@ func (tbc TestBeaconNode) runKnownGapsProcess(bc *beaconclient.BeaconClient, max ctx, cancel := context.WithCancel(context.Background()) go bc.ProcessKnownGaps(ctx, maxWorkers) validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError) - testStopHistoricTracking(ctx, cancel, bc, startGoRoutines) + cancel() + testStopKnownGapProcessing(ctx, bc, startGoRoutines) } func validateMetrics(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { @@ -313,9 +299,25 @@ func validateAllRowsCheckedOut(db sql.Database, checkStmt string) { } // A make shift function to stop head tracking and insure we dont have any goroutine leaks -func testStopHistoricTracking(ctx context.Context, cancel context.CancelFunc, bc *beaconclient.BeaconClient, startGoRoutines int) { +func testStopHistoricProcessing(ctx context.Context, bc *beaconclient.BeaconClient, startGoRoutines int) { log.Debug("Calling the stop function for historical processing..") - cancel() + err := bc.StopHistoricProcess(ctx) + Expect(err).ToNot(HaveOccurred()) + time.Sleep(5 * time.Second) + validateAllRowsCheckedOut(bc.Db, hpCheckCheckedOutStmt) + err = bc.Db.Close() + Expect(err).ToNot(HaveOccurred()) + + time.Sleep(3 * time.Second) + endNum := runtime.NumGoroutine() + + //pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + Expect(startGoRoutines).To(Equal(endNum)) +} + +// A make shift function to stop head tracking and insure we dont have any goroutine leaks +func testStopKnownGapProcessing(ctx context.Context, bc *beaconclient.BeaconClient, startGoRoutines int) { + log.Debug("Calling the stop function for knownGaps processing..") err := bc.StopKnownGapsProcessing(ctx) Expect(err).ToNot(HaveOccurred()) time.Sleep(5 * time.Second) diff --git a/pkg/beaconclient/queryserver.go b/pkg/beaconclient/queryserver.go index b21cacf..76bb14e 100644 --- a/pkg/beaconclient/queryserver.go +++ b/pkg/beaconclient/queryserver.go @@ -45,11 +45,11 @@ func querySsz(endpoint string, slot string) (*[]byte, int, error) { defer response.Body.Close() rc := response.StatusCode - var body []byte + //var body []byte //io.Copy(body, response.Body) //bytes.buffer... - _, err = response.Body.Read(body) - //body, err := ioutil.ReadAll(response.Body) + //_, err = response.Body.Read(body) + body, err := ioutil.ReadAll(response.Body) if err != nil { loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!") return nil, rc, fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error()) diff --git a/pkg/beaconclient/systemvalidation_test.go b/pkg/beaconclient/systemvalidation_test.go index 5b14d23..3c5cb17 100644 --- a/pkg/beaconclient/systemvalidation_test.go +++ b/pkg/beaconclient/systemvalidation_test.go @@ -71,5 +71,6 @@ func processProdHeadBlocks(bc *beaconclient.BeaconClient, expectedInserts, expec time.Sleep(1 * time.Second) validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError) - testStopHeadTracking(ctx, cancel, bc, startGoRoutines) + cancel() + testStopHeadTracking(ctx, bc, startGoRoutines) }