diff --git a/pkg/beaconclient/capturehead_test.go b/pkg/beaconclient/capturehead_test.go index 2932da8..9c350e2 100644 --- a/pkg/beaconclient/capturehead_test.go +++ b/pkg/beaconclient/capturehead_test.go @@ -24,7 +24,6 @@ import ( "os" "path/filepath" "runtime" - "runtime/pprof" "strconv" "sync/atomic" "time" @@ -407,7 +406,7 @@ var _ = Describe("Capturehead", Label("head"), func() { Expect(end).To(Equal(99)) }) }) - Context("Gaps between two head messages", func() { + Context("Gaps between two head messages", Label("gap-head"), func() { It("Should add the slots in-between", func() { BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) defer httpmock.DeactivateAndReset() @@ -919,7 +918,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs // A test to validate a single block was processed correctly func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head beaconclient.Head, epoch int, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) { - pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + //pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) startGoRoutines := runtime.NumGoroutine() ctx, cancel := context.WithCancel(context.Background()) go bc.CaptureHead(ctx, 2, true) @@ -987,6 +986,7 @@ func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstH func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, tableIncrement int, expectedEntries uint64, maxRetry int, msg ...beaconclient.Head) { bc.KnownGapTableIncrement = tableIncrement + //pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) startGoRoutines := runtime.NumGoroutine() ctx, cancel := context.WithCancel(context.Background()) go bc.CaptureHead(ctx, 2, true) @@ -1037,7 +1037,7 @@ func testStopHeadTracking(ctx context.Context, bc *beaconclient.BeaconClient, st time.Sleep(3 * time.Second) endNum := runtime.NumGoroutine() - pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + //pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) log.WithField("startNum", startGoRoutines).Info("Start Go routine number") log.WithField("endNum", endNum).Info("End Go routine number") //Expect(endNum <= startGoRoutines).To(BeTrue()) diff --git a/pkg/beaconclient/capturehistoric_test.go b/pkg/beaconclient/capturehistoric_test.go index faef10b..fe5ec83 100644 --- a/pkg/beaconclient/capturehistoric_test.go +++ b/pkg/beaconclient/capturehistoric_test.go @@ -212,6 +212,7 @@ func (tbc TestBeaconNode) writeEventToHistoricProcess(bc *beaconclient.BeaconCli // Start the CaptureHistoric function, and check for the correct inserted slots. func (tbc TestBeaconNode) runHistoricalProcess(bc *beaconclient.BeaconClient, maxWorkers int, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { + //pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) startGoRoutines := runtime.NumGoroutine() ctx, cancel := context.WithCancel(context.Background()) go bc.CaptureHistoric(ctx, maxWorkers) @@ -306,11 +307,13 @@ func testStopHistoricProcessing(ctx context.Context, bc *beaconclient.BeaconClie time.Sleep(5 * time.Second) validateAllRowsCheckedOut(bc.Db, hpCheckCheckedOutStmt) - time.Sleep(3 * time.Second) + time.Sleep(5 * time.Second) endNum := runtime.NumGoroutine() //pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) //Expect(endNum <= startGoRoutines).To(BeTrue()) + log.WithField("startNum", startGoRoutines).Info("Start Go routine number") + log.WithField("endNum", endNum).Info("End Go routine number") Expect(endNum).To(Equal(startGoRoutines)) } diff --git a/pkg/beaconclient/processhistoric.go b/pkg/beaconclient/processhistoric.go index c520e41..9b2392b 100644 --- a/pkg/beaconclient/processhistoric.go +++ b/pkg/beaconclient/processhistoric.go @@ -241,23 +241,27 @@ func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan "endSlot": slots.endSlot, }).Debug("Starting to check to see if the following slots have been processed") for { - isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.startSlot)) - if err != nil { - errCh <- err + select { + case <-ctx.Done(): + return + default: + isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.startSlot)) + if err != nil { + errCh <- err + } + isEndProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.endSlot)) + if err != nil { + errCh <- err + } + if isStartProcess && isEndProcess { + _, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), strconv.Itoa(slots.endSlot)) + if err != nil { + errCh <- err + } + return + } + time.Sleep(3 * time.Second) } - isEndProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.endSlot)) - if err != nil { - errCh <- err - } - if isStartProcess && isEndProcess { - break - } - time.Sleep(3 * time.Second) - } - - _, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), strconv.Itoa(slots.endSlot)) - if err != nil { - errCh <- err } }() diff --git a/pkg/beaconclient/systemvalidation_test.go b/pkg/beaconclient/systemvalidation_test.go index 3c5cb17..b204404 100644 --- a/pkg/beaconclient/systemvalidation_test.go +++ b/pkg/beaconclient/systemvalidation_test.go @@ -30,7 +30,7 @@ var ( ) var _ = Describe("Systemvalidation", Label("system"), func() { Describe("Run the application against a running lighthouse node", func() { - Context("When we receive head messages", func() { + Context("When we receive head messages", Label("system-head"), func() { It("We should process the messages successfully", func() { bc := setUpTest(prodConfig, "10000000000") processProdHeadBlocks(bc, 3, 0, 0, 0)