Use context to end goroutine
This commit is contained in:
parent
6cecc69520
commit
2299e54197
@ -24,7 +24,6 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"runtime/pprof"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@ -407,7 +406,7 @@ var _ = Describe("Capturehead", Label("head"), func() {
|
||||
Expect(end).To(Equal(99))
|
||||
})
|
||||
})
|
||||
Context("Gaps between two head messages", func() {
|
||||
Context("Gaps between two head messages", Label("gap-head"), func() {
|
||||
It("Should add the slots in-between", func() {
|
||||
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
||||
defer httpmock.DeactivateAndReset()
|
||||
@ -919,7 +918,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs
|
||||
|
||||
// A test to validate a single block was processed correctly
|
||||
func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head beaconclient.Head, epoch int, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) {
|
||||
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
||||
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
||||
startGoRoutines := runtime.NumGoroutine()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go bc.CaptureHead(ctx, 2, true)
|
||||
@ -987,6 +986,7 @@ func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstH
|
||||
func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, tableIncrement int, expectedEntries uint64, maxRetry int, msg ...beaconclient.Head) {
|
||||
bc.KnownGapTableIncrement = tableIncrement
|
||||
|
||||
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
||||
startGoRoutines := runtime.NumGoroutine()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go bc.CaptureHead(ctx, 2, true)
|
||||
@ -1037,7 +1037,7 @@ func testStopHeadTracking(ctx context.Context, bc *beaconclient.BeaconClient, st
|
||||
|
||||
time.Sleep(3 * time.Second)
|
||||
endNum := runtime.NumGoroutine()
|
||||
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
||||
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
||||
log.WithField("startNum", startGoRoutines).Info("Start Go routine number")
|
||||
log.WithField("endNum", endNum).Info("End Go routine number")
|
||||
//Expect(endNum <= startGoRoutines).To(BeTrue())
|
||||
|
@ -212,6 +212,7 @@ func (tbc TestBeaconNode) writeEventToHistoricProcess(bc *beaconclient.BeaconCli
|
||||
|
||||
// Start the CaptureHistoric function, and check for the correct inserted slots.
|
||||
func (tbc TestBeaconNode) runHistoricalProcess(bc *beaconclient.BeaconClient, maxWorkers int, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) {
|
||||
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
||||
startGoRoutines := runtime.NumGoroutine()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go bc.CaptureHistoric(ctx, maxWorkers)
|
||||
@ -306,11 +307,13 @@ func testStopHistoricProcessing(ctx context.Context, bc *beaconclient.BeaconClie
|
||||
time.Sleep(5 * time.Second)
|
||||
validateAllRowsCheckedOut(bc.Db, hpCheckCheckedOutStmt)
|
||||
|
||||
time.Sleep(3 * time.Second)
|
||||
time.Sleep(5 * time.Second)
|
||||
endNum := runtime.NumGoroutine()
|
||||
|
||||
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
||||
//Expect(endNum <= startGoRoutines).To(BeTrue())
|
||||
log.WithField("startNum", startGoRoutines).Info("Start Go routine number")
|
||||
log.WithField("endNum", endNum).Info("End Go routine number")
|
||||
Expect(endNum).To(Equal(startGoRoutines))
|
||||
}
|
||||
|
||||
|
@ -241,23 +241,27 @@ func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan
|
||||
"endSlot": slots.endSlot,
|
||||
}).Debug("Starting to check to see if the following slots have been processed")
|
||||
for {
|
||||
isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.startSlot))
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.startSlot))
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
isEndProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.endSlot))
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
if isStartProcess && isEndProcess {
|
||||
_, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), strconv.Itoa(slots.endSlot))
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
return
|
||||
}
|
||||
time.Sleep(3 * time.Second)
|
||||
}
|
||||
isEndProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.endSlot))
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
if isStartProcess && isEndProcess {
|
||||
break
|
||||
}
|
||||
time.Sleep(3 * time.Second)
|
||||
}
|
||||
|
||||
_, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), strconv.Itoa(slots.endSlot))
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
|
||||
}()
|
||||
|
@ -30,7 +30,7 @@ var (
|
||||
)
|
||||
var _ = Describe("Systemvalidation", Label("system"), func() {
|
||||
Describe("Run the application against a running lighthouse node", func() {
|
||||
Context("When we receive head messages", func() {
|
||||
Context("When we receive head messages", Label("system-head"), func() {
|
||||
It("We should process the messages successfully", func() {
|
||||
bc := setUpTest(prodConfig, "10000000000")
|
||||
processProdHeadBlocks(bc, 3, 0, 0, 0)
|
||||
|
Loading…
Reference in New Issue
Block a user