diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go index 1efcaa7..066c455 100644 --- a/pkg/beaconclient/beaconclient.go +++ b/pkg/beaconclient/beaconclient.go @@ -119,8 +119,8 @@ func createSseEvent[P ProcessedEvents](baseEndpoint string, path string) *SseEve endpoint := baseEndpoint + path sseEvents := &SseEvents[P]{ Endpoint: endpoint, - MessagesCh: make(chan *sse.Event, 10), - ErrorCh: make(chan SseError, 10), + MessagesCh: make(chan *sse.Event), + ErrorCh: make(chan SseError), ProcessCh: make(chan P, 10), SseClient: func(endpoint string) *sse.Client { log.WithFields(log.Fields{"endpoint": endpoint}).Info("Creating SSE client") diff --git a/pkg/beaconclient/capturehistoric.go b/pkg/beaconclient/capturehistoric.go index a4f4400..9627965 100644 --- a/pkg/beaconclient/capturehistoric.go +++ b/pkg/beaconclient/capturehistoric.go @@ -96,7 +96,7 @@ type batchHistoricError struct { // // 5. Handle any errors. func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics, checkDb bool, incrementTracker func(uint64)) []error { - slotsCh := make(chan slotsToProcess, 5) + slotsCh := make(chan slotsToProcess) workCh := make(chan int, 5) processedCh := make(chan slotsToProcess, 5) errCh := make(chan batchHistoricError, 5) @@ -120,6 +120,8 @@ func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, for { select { case <-ctx.Done(): + close(workCh) + close(processedCh) return case slots := <-slotsCh: if slots.startSlot > slots.endSlot { diff --git a/pkg/beaconclient/capturehistoric_test.go b/pkg/beaconclient/capturehistoric_test.go index fe5ec83..b97827d 100644 --- a/pkg/beaconclient/capturehistoric_test.go +++ b/pkg/beaconclient/capturehistoric_test.go @@ -3,7 +3,9 @@ package beaconclient_test import ( "context" "fmt" + "os" "runtime" + "runtime/pprof" "sync/atomic" "time" @@ -328,7 +330,7 @@ func testStopKnownGapProcessing(ctx context.Context, bc *beaconclient.BeaconClie time.Sleep(3 * time.Second) endNum := runtime.NumGoroutine() - //pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) //Expect(endNum <= startGoRoutines).To(BeTrue()) Expect(endNum).To(Equal(startGoRoutines)) } diff --git a/pkg/beaconclient/processhistoric.go b/pkg/beaconclient/processhistoric.go index 49b0fd1..8b52fc9 100644 --- a/pkg/beaconclient/processhistoric.go +++ b/pkg/beaconclient/processhistoric.go @@ -132,6 +132,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm for len(errCount) < 5 { select { case <-ctx.Done(): + close(slotCh) return errCount default: if len(errCount) != prevErrCount { diff --git a/pkg/beaconclient/queryserver.go b/pkg/beaconclient/queryserver.go index ee2ac4d..e9bb9e2 100644 --- a/pkg/beaconclient/queryserver.go +++ b/pkg/beaconclient/queryserver.go @@ -43,6 +43,8 @@ func querySsz(endpoint string, slot string) (*[]byte, int, error) { return nil, 0, fmt.Errorf("Unable to query Beacon Node: %s", err.Error()) } defer response.Body.Close() + // Needed for testing.... But might be interesting to test with... + defer client.CloseIdleConnections() rc := response.StatusCode //var body []byte