Close idle connections and chan's properly.

This commit is contained in:
Abdul Rabbani 2022-06-23 09:17:46 -04:00
parent 32ec784e1e
commit 16d034c844
5 changed files with 11 additions and 4 deletions

View File

@ -119,8 +119,8 @@ func createSseEvent[P ProcessedEvents](baseEndpoint string, path string) *SseEve
endpoint := baseEndpoint + path endpoint := baseEndpoint + path
sseEvents := &SseEvents[P]{ sseEvents := &SseEvents[P]{
Endpoint: endpoint, Endpoint: endpoint,
MessagesCh: make(chan *sse.Event, 10), MessagesCh: make(chan *sse.Event),
ErrorCh: make(chan SseError, 10), ErrorCh: make(chan SseError),
ProcessCh: make(chan P, 10), ProcessCh: make(chan P, 10),
SseClient: func(endpoint string) *sse.Client { SseClient: func(endpoint string) *sse.Client {
log.WithFields(log.Fields{"endpoint": endpoint}).Info("Creating SSE client") log.WithFields(log.Fields{"endpoint": endpoint}).Info("Creating SSE client")

View File

@ -96,7 +96,7 @@ type batchHistoricError struct {
// //
// 5. Handle any errors. // 5. Handle any errors.
func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics, checkDb bool, incrementTracker func(uint64)) []error { func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics, checkDb bool, incrementTracker func(uint64)) []error {
slotsCh := make(chan slotsToProcess, 5) slotsCh := make(chan slotsToProcess)
workCh := make(chan int, 5) workCh := make(chan int, 5)
processedCh := make(chan slotsToProcess, 5) processedCh := make(chan slotsToProcess, 5)
errCh := make(chan batchHistoricError, 5) errCh := make(chan batchHistoricError, 5)
@ -120,6 +120,8 @@ func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing,
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
close(workCh)
close(processedCh)
return return
case slots := <-slotsCh: case slots := <-slotsCh:
if slots.startSlot > slots.endSlot { if slots.startSlot > slots.endSlot {

View File

@ -3,7 +3,9 @@ package beaconclient_test
import ( import (
"context" "context"
"fmt" "fmt"
"os"
"runtime" "runtime"
"runtime/pprof"
"sync/atomic" "sync/atomic"
"time" "time"
@ -328,7 +330,7 @@ func testStopKnownGapProcessing(ctx context.Context, bc *beaconclient.BeaconClie
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
endNum := runtime.NumGoroutine() endNum := runtime.NumGoroutine()
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
//Expect(endNum <= startGoRoutines).To(BeTrue()) //Expect(endNum <= startGoRoutines).To(BeTrue())
Expect(endNum).To(Equal(startGoRoutines)) Expect(endNum).To(Equal(startGoRoutines))
} }

View File

@ -132,6 +132,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm
for len(errCount) < 5 { for len(errCount) < 5 {
select { select {
case <-ctx.Done(): case <-ctx.Done():
close(slotCh)
return errCount return errCount
default: default:
if len(errCount) != prevErrCount { if len(errCount) != prevErrCount {

View File

@ -43,6 +43,8 @@ func querySsz(endpoint string, slot string) (*[]byte, int, error) {
return nil, 0, fmt.Errorf("Unable to query Beacon Node: %s", err.Error()) return nil, 0, fmt.Errorf("Unable to query Beacon Node: %s", err.Error())
} }
defer response.Body.Close() defer response.Body.Close()
// Needed for testing.... But might be interesting to test with...
defer client.CloseIdleConnections()
rc := response.StatusCode rc := response.StatusCode
//var body []byte //var body []byte