diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go index 4f959e3..1efcaa7 100644 --- a/pkg/beaconclient/beaconclient.go +++ b/pkg/beaconclient/beaconclient.go @@ -76,8 +76,8 @@ type BeaconClient struct { type SseEvents[P ProcessedEvents] struct { Endpoint string // The endpoint for the subscription. Primarily used for logging MessagesCh chan *sse.Event // Contains all the messages from the SSE Channel - ErrorCh chan *SseError // Contains any errors while SSE streaming occurred - ProcessCh chan *P // Used to capture processed data in its proper struct. + ErrorCh chan SseError // Contains any errors while SSE streaming occurred + ProcessCh chan P // Used to capture processed data in its proper struct. SseClient *sse.Client // sse.Client object that is used to interact with the SSE stream } @@ -119,9 +119,9 @@ func createSseEvent[P ProcessedEvents](baseEndpoint string, path string) *SseEve endpoint := baseEndpoint + path sseEvents := &SseEvents[P]{ Endpoint: endpoint, - MessagesCh: make(chan *sse.Event, 1), - ErrorCh: make(chan *SseError), - ProcessCh: make(chan *P), + MessagesCh: make(chan *sse.Event, 10), + ErrorCh: make(chan SseError, 10), + ProcessCh: make(chan P, 10), SseClient: func(endpoint string) *sse.Client { log.WithFields(log.Fields{"endpoint": endpoint}).Info("Creating SSE client") return sse.NewClient(endpoint) diff --git a/pkg/beaconclient/capturehead.go b/pkg/beaconclient/capturehead.go index 9392d45..51cecae 100644 --- a/pkg/beaconclient/capturehead.go +++ b/pkg/beaconclient/capturehead.go @@ -38,6 +38,7 @@ func (bc *BeaconClient) StopHeadTracking(ctx context.Context, skipSee bool) { if !skipSee { bc.HeadTracking.SseClient.Unsubscribe(bc.HeadTracking.MessagesCh) bc.ReOrgTracking.SseClient.Unsubscribe(bc.ReOrgTracking.MessagesCh) + log.Info("Successfully unsubscribed to SSE client") } log.Info("Successfully stopped the head tracking service.") default: diff --git a/pkg/beaconclient/capturehead_test.go b/pkg/beaconclient/capturehead_test.go index 9c350e2..40587e3 100644 --- a/pkg/beaconclient/capturehead_test.go +++ b/pkg/beaconclient/capturehead_test.go @@ -24,6 +24,7 @@ import ( "os" "path/filepath" "runtime" + "runtime/pprof" "strconv" "sync/atomic" "time" @@ -913,7 +914,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs validateSlot(bc, thirdHead, epoch, "forked") cancel() - testStopHeadTracking(ctx, bc, startGoRoutines) + testStopHeadTracking(ctx, bc, startGoRoutines, true) } // A test to validate a single block was processed correctly @@ -947,7 +948,7 @@ func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head b validateSlot(bc, head, epoch, "proposed") } cancel() - testStopHeadTracking(ctx, bc, startGoRoutines) + testStopHeadTracking(ctx, bc, startGoRoutines, true) } // A test that ensures that if two HeadMessages occur for a single slot they are marked @@ -978,7 +979,7 @@ func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstH validateSlot(bc, firstHead, epoch, "forked") validateSlot(bc, secondHead, epoch, "proposed") cancel() - testStopHeadTracking(ctx, bc, startGoRoutines) + testStopHeadTracking(ctx, bc, startGoRoutines, true) } // A test that ensures that if two HeadMessages occur for a single slot they are marked @@ -1013,7 +1014,7 @@ func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, t Fail("We found reorgs when we didn't expect it") } cancel() - testStopHeadTracking(ctx, bc, startGoRoutines) + testStopHeadTracking(ctx, bc, startGoRoutines, true) } // This function will make sure we are properly able to get the SszRoot of the SignedBeaconBlock and the BeaconState. @@ -1032,12 +1033,12 @@ func testSszRoot(msg Message) { } // A make shift function to stop head tracking and insure we dont have any goroutine leaks -func testStopHeadTracking(ctx context.Context, bc *beaconclient.BeaconClient, startGoRoutines int) { - bc.StopHeadTracking(ctx, true) +func testStopHeadTracking(ctx context.Context, bc *beaconclient.BeaconClient, startGoRoutines int, skipSse bool) { + bc.StopHeadTracking(ctx, skipSse) time.Sleep(3 * time.Second) endNum := runtime.NumGoroutine() - //pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) log.WithField("startNum", startGoRoutines).Info("Start Go routine number") log.WithField("endNum", endNum).Info("End Go routine number") //Expect(endNum <= startGoRoutines).To(BeTrue()) diff --git a/pkg/beaconclient/capturehistoric.go b/pkg/beaconclient/capturehistoric.go index defcb9e..a4f4400 100644 --- a/pkg/beaconclient/capturehistoric.go +++ b/pkg/beaconclient/capturehistoric.go @@ -96,10 +96,10 @@ type batchHistoricError struct { // // 5. Handle any errors. func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics, checkDb bool, incrementTracker func(uint64)) []error { - slotsCh := make(chan slotsToProcess) - workCh := make(chan int) - processedCh := make(chan slotsToProcess) - errCh := make(chan batchHistoricError) + slotsCh := make(chan slotsToProcess, 5) + workCh := make(chan int, 5) + processedCh := make(chan slotsToProcess, 5) + errCh := make(chan batchHistoricError, 5) finalErrCh := make(chan []error, 1) // Checkout Rows with same node Identifier. diff --git a/pkg/beaconclient/incomingsse.go b/pkg/beaconclient/incomingsse.go index ab94c07..6859764 100644 --- a/pkg/beaconclient/incomingsse.go +++ b/pkg/beaconclient/incomingsse.go @@ -30,30 +30,6 @@ import ( // When new messages come in, it will ensure that they are decoded into JSON. // If any errors occur, it log the error information. func handleIncomingSseEvent[P ProcessedEvents](ctx context.Context, eventHandler *SseEvents[P], errMetricInc func(uint64), skipSse bool) { - //go func() { - // subCh := make(chan error, 1) - // go func() { - // err := eventHandler.SseClient.SubscribeChanRawWithContext(ctx, eventHandler.MessagesCh) - // if err != nil { - // subCh <- err - // } - // subCh <- nil - // }() - // select { - // case err := <-subCh: - // if err != nil { - // log.WithFields(log.Fields{ - // "err": err, - // "endpoint": eventHandler.Endpoint, - // }).Error("Unable to subscribe to the SSE endpoint.") - // return - // } else { - // loghelper.LogEndpoint(eventHandler.Endpoint).Info("Successfully subscribed to the event stream.") - // } - // case <-ctx.Done(): - // return - // } - //}() if !skipSse { for { err := eventHandler.SseClient.SubscribeChanRawWithContext(ctx, eventHandler.MessagesCh) @@ -94,18 +70,19 @@ func handleIncomingSseEvent[P ProcessedEvents](ctx context.Context, eventHandler } // Turn the data object into a Struct. -func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan<- *SseError) { +func processMsg[P ProcessedEvents](msg []byte, processCh chan<- P, errorCh chan<- SseError) { var msgMarshaled P err := json.Unmarshal(msg, &msgMarshaled) if err != nil { loghelper.LogError(err).Error("Unable to parse message") - errorCh <- &SseError{ + errorCh <- SseError{ err: err, msg: msg, } return } - processCh <- &msgMarshaled + processCh <- msgMarshaled + log.Info("Done sending") } // Capture all of the event topics. diff --git a/pkg/beaconclient/processevents.go b/pkg/beaconclient/processevents.go index 9aa604e..ed8da5c 100644 --- a/pkg/beaconclient/processevents.go +++ b/pkg/beaconclient/processevents.go @@ -46,7 +46,7 @@ func (bc *BeaconClient) handleReorg(ctx context.Context) { func (bc *BeaconClient) handleHead(ctx context.Context, maxWorkers int) { log.Info("Starting to process head.") - workCh := make(chan workParams) + workCh := make(chan workParams, 5) log.WithField("workerNumber", maxWorkers).Info("Creating Workers") for i := 1; i < maxWorkers; i++ { go bc.headBlockProcessor(ctx, workCh) @@ -56,13 +56,14 @@ func (bc *BeaconClient) handleHead(ctx context.Context, maxWorkers int) { select { case <-ctx.Done(): close(bc.HeadTracking.ProcessCh) + close(workCh) return case head := <-bc.HeadTracking.ProcessCh: // Process all the work here. slot, err := strconv.Atoi(head.Slot) if err != nil { - bc.HeadTracking.ErrorCh <- &SseError{ + bc.HeadTracking.ErrorCh <- SseError{ err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot), } errorSlots = errorSlots + 1 diff --git a/pkg/beaconclient/processhistoric.go b/pkg/beaconclient/processhistoric.go index 9b2392b..49b0fd1 100644 --- a/pkg/beaconclient/processhistoric.go +++ b/pkg/beaconclient/processhistoric.go @@ -228,7 +228,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm // After a row has been processed it should be removed from its appropriate table. func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan slotsToProcess, checkProcessedStmt, removeStmt string) error { - errCh := make(chan error) + errCh := make(chan error, 1) for { select { case <-ctx.Done(): @@ -263,7 +263,6 @@ func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan time.Sleep(3 * time.Second) } } - }() if len(errCh) != 0 { return <-errCh diff --git a/pkg/beaconclient/queryserver.go b/pkg/beaconclient/queryserver.go index 76bb14e..ee2ac4d 100644 --- a/pkg/beaconclient/queryserver.go +++ b/pkg/beaconclient/queryserver.go @@ -54,6 +54,7 @@ func querySsz(endpoint string, slot string) (*[]byte, int, error) { loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!") return nil, rc, fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error()) } + //log.WithField("body", unsafe.Sizeof(body)).Debug("Size of the raw SSZ object") return &body, rc, nil }