Make channels buffered

This commit is contained in:
Abdul Rabbani 2022-06-23 07:50:36 -04:00
parent 2299e54197
commit 041f862c11
8 changed files with 27 additions and 47 deletions

View File

@ -76,8 +76,8 @@ type BeaconClient struct {
type SseEvents[P ProcessedEvents] struct { type SseEvents[P ProcessedEvents] struct {
Endpoint string // The endpoint for the subscription. Primarily used for logging Endpoint string // The endpoint for the subscription. Primarily used for logging
MessagesCh chan *sse.Event // Contains all the messages from the SSE Channel MessagesCh chan *sse.Event // Contains all the messages from the SSE Channel
ErrorCh chan *SseError // Contains any errors while SSE streaming occurred ErrorCh chan SseError // Contains any errors while SSE streaming occurred
ProcessCh chan *P // Used to capture processed data in its proper struct. ProcessCh chan P // Used to capture processed data in its proper struct.
SseClient *sse.Client // sse.Client object that is used to interact with the SSE stream SseClient *sse.Client // sse.Client object that is used to interact with the SSE stream
} }
@ -119,9 +119,9 @@ func createSseEvent[P ProcessedEvents](baseEndpoint string, path string) *SseEve
endpoint := baseEndpoint + path endpoint := baseEndpoint + path
sseEvents := &SseEvents[P]{ sseEvents := &SseEvents[P]{
Endpoint: endpoint, Endpoint: endpoint,
MessagesCh: make(chan *sse.Event, 1), MessagesCh: make(chan *sse.Event, 10),
ErrorCh: make(chan *SseError), ErrorCh: make(chan SseError, 10),
ProcessCh: make(chan *P), ProcessCh: make(chan P, 10),
SseClient: func(endpoint string) *sse.Client { SseClient: func(endpoint string) *sse.Client {
log.WithFields(log.Fields{"endpoint": endpoint}).Info("Creating SSE client") log.WithFields(log.Fields{"endpoint": endpoint}).Info("Creating SSE client")
return sse.NewClient(endpoint) return sse.NewClient(endpoint)

View File

@ -38,6 +38,7 @@ func (bc *BeaconClient) StopHeadTracking(ctx context.Context, skipSee bool) {
if !skipSee { if !skipSee {
bc.HeadTracking.SseClient.Unsubscribe(bc.HeadTracking.MessagesCh) bc.HeadTracking.SseClient.Unsubscribe(bc.HeadTracking.MessagesCh)
bc.ReOrgTracking.SseClient.Unsubscribe(bc.ReOrgTracking.MessagesCh) bc.ReOrgTracking.SseClient.Unsubscribe(bc.ReOrgTracking.MessagesCh)
log.Info("Successfully unsubscribed to SSE client")
} }
log.Info("Successfully stopped the head tracking service.") log.Info("Successfully stopped the head tracking service.")
default: default:

View File

@ -24,6 +24,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"runtime" "runtime"
"runtime/pprof"
"strconv" "strconv"
"sync/atomic" "sync/atomic"
"time" "time"
@ -913,7 +914,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs
validateSlot(bc, thirdHead, epoch, "forked") validateSlot(bc, thirdHead, epoch, "forked")
cancel() cancel()
testStopHeadTracking(ctx, bc, startGoRoutines) testStopHeadTracking(ctx, bc, startGoRoutines, true)
} }
// A test to validate a single block was processed correctly // A test to validate a single block was processed correctly
@ -947,7 +948,7 @@ func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head b
validateSlot(bc, head, epoch, "proposed") validateSlot(bc, head, epoch, "proposed")
} }
cancel() cancel()
testStopHeadTracking(ctx, bc, startGoRoutines) testStopHeadTracking(ctx, bc, startGoRoutines, true)
} }
// A test that ensures that if two HeadMessages occur for a single slot they are marked // A test that ensures that if two HeadMessages occur for a single slot they are marked
@ -978,7 +979,7 @@ func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstH
validateSlot(bc, firstHead, epoch, "forked") validateSlot(bc, firstHead, epoch, "forked")
validateSlot(bc, secondHead, epoch, "proposed") validateSlot(bc, secondHead, epoch, "proposed")
cancel() cancel()
testStopHeadTracking(ctx, bc, startGoRoutines) testStopHeadTracking(ctx, bc, startGoRoutines, true)
} }
// A test that ensures that if two HeadMessages occur for a single slot they are marked // A test that ensures that if two HeadMessages occur for a single slot they are marked
@ -1013,7 +1014,7 @@ func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, t
Fail("We found reorgs when we didn't expect it") Fail("We found reorgs when we didn't expect it")
} }
cancel() cancel()
testStopHeadTracking(ctx, bc, startGoRoutines) testStopHeadTracking(ctx, bc, startGoRoutines, true)
} }
// This function will make sure we are properly able to get the SszRoot of the SignedBeaconBlock and the BeaconState. // This function will make sure we are properly able to get the SszRoot of the SignedBeaconBlock and the BeaconState.
@ -1032,12 +1033,12 @@ func testSszRoot(msg Message) {
} }
// A make shift function to stop head tracking and insure we dont have any goroutine leaks // A make shift function to stop head tracking and insure we dont have any goroutine leaks
func testStopHeadTracking(ctx context.Context, bc *beaconclient.BeaconClient, startGoRoutines int) { func testStopHeadTracking(ctx context.Context, bc *beaconclient.BeaconClient, startGoRoutines int, skipSse bool) {
bc.StopHeadTracking(ctx, true) bc.StopHeadTracking(ctx, skipSse)
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
endNum := runtime.NumGoroutine() endNum := runtime.NumGoroutine()
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
log.WithField("startNum", startGoRoutines).Info("Start Go routine number") log.WithField("startNum", startGoRoutines).Info("Start Go routine number")
log.WithField("endNum", endNum).Info("End Go routine number") log.WithField("endNum", endNum).Info("End Go routine number")
//Expect(endNum <= startGoRoutines).To(BeTrue()) //Expect(endNum <= startGoRoutines).To(BeTrue())

View File

@ -96,10 +96,10 @@ type batchHistoricError struct {
// //
// 5. Handle any errors. // 5. Handle any errors.
func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics, checkDb bool, incrementTracker func(uint64)) []error { func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics, checkDb bool, incrementTracker func(uint64)) []error {
slotsCh := make(chan slotsToProcess) slotsCh := make(chan slotsToProcess, 5)
workCh := make(chan int) workCh := make(chan int, 5)
processedCh := make(chan slotsToProcess) processedCh := make(chan slotsToProcess, 5)
errCh := make(chan batchHistoricError) errCh := make(chan batchHistoricError, 5)
finalErrCh := make(chan []error, 1) finalErrCh := make(chan []error, 1)
// Checkout Rows with same node Identifier. // Checkout Rows with same node Identifier.

View File

@ -30,30 +30,6 @@ import (
// When new messages come in, it will ensure that they are decoded into JSON. // When new messages come in, it will ensure that they are decoded into JSON.
// If any errors occur, it log the error information. // If any errors occur, it log the error information.
func handleIncomingSseEvent[P ProcessedEvents](ctx context.Context, eventHandler *SseEvents[P], errMetricInc func(uint64), skipSse bool) { func handleIncomingSseEvent[P ProcessedEvents](ctx context.Context, eventHandler *SseEvents[P], errMetricInc func(uint64), skipSse bool) {
//go func() {
// subCh := make(chan error, 1)
// go func() {
// err := eventHandler.SseClient.SubscribeChanRawWithContext(ctx, eventHandler.MessagesCh)
// if err != nil {
// subCh <- err
// }
// subCh <- nil
// }()
// select {
// case err := <-subCh:
// if err != nil {
// log.WithFields(log.Fields{
// "err": err,
// "endpoint": eventHandler.Endpoint,
// }).Error("Unable to subscribe to the SSE endpoint.")
// return
// } else {
// loghelper.LogEndpoint(eventHandler.Endpoint).Info("Successfully subscribed to the event stream.")
// }
// case <-ctx.Done():
// return
// }
//}()
if !skipSse { if !skipSse {
for { for {
err := eventHandler.SseClient.SubscribeChanRawWithContext(ctx, eventHandler.MessagesCh) err := eventHandler.SseClient.SubscribeChanRawWithContext(ctx, eventHandler.MessagesCh)
@ -94,18 +70,19 @@ func handleIncomingSseEvent[P ProcessedEvents](ctx context.Context, eventHandler
} }
// Turn the data object into a Struct. // Turn the data object into a Struct.
func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan<- *SseError) { func processMsg[P ProcessedEvents](msg []byte, processCh chan<- P, errorCh chan<- SseError) {
var msgMarshaled P var msgMarshaled P
err := json.Unmarshal(msg, &msgMarshaled) err := json.Unmarshal(msg, &msgMarshaled)
if err != nil { if err != nil {
loghelper.LogError(err).Error("Unable to parse message") loghelper.LogError(err).Error("Unable to parse message")
errorCh <- &SseError{ errorCh <- SseError{
err: err, err: err,
msg: msg, msg: msg,
} }
return return
} }
processCh <- &msgMarshaled processCh <- msgMarshaled
log.Info("Done sending")
} }
// Capture all of the event topics. // Capture all of the event topics.

View File

@ -46,7 +46,7 @@ func (bc *BeaconClient) handleReorg(ctx context.Context) {
func (bc *BeaconClient) handleHead(ctx context.Context, maxWorkers int) { func (bc *BeaconClient) handleHead(ctx context.Context, maxWorkers int) {
log.Info("Starting to process head.") log.Info("Starting to process head.")
workCh := make(chan workParams) workCh := make(chan workParams, 5)
log.WithField("workerNumber", maxWorkers).Info("Creating Workers") log.WithField("workerNumber", maxWorkers).Info("Creating Workers")
for i := 1; i < maxWorkers; i++ { for i := 1; i < maxWorkers; i++ {
go bc.headBlockProcessor(ctx, workCh) go bc.headBlockProcessor(ctx, workCh)
@ -56,13 +56,14 @@ func (bc *BeaconClient) handleHead(ctx context.Context, maxWorkers int) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
close(bc.HeadTracking.ProcessCh) close(bc.HeadTracking.ProcessCh)
close(workCh)
return return
case head := <-bc.HeadTracking.ProcessCh: case head := <-bc.HeadTracking.ProcessCh:
// Process all the work here. // Process all the work here.
slot, err := strconv.Atoi(head.Slot) slot, err := strconv.Atoi(head.Slot)
if err != nil { if err != nil {
bc.HeadTracking.ErrorCh <- &SseError{ bc.HeadTracking.ErrorCh <- SseError{
err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot), err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot),
} }
errorSlots = errorSlots + 1 errorSlots = errorSlots + 1

View File

@ -228,7 +228,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm
// After a row has been processed it should be removed from its appropriate table. // After a row has been processed it should be removed from its appropriate table.
func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan slotsToProcess, checkProcessedStmt, removeStmt string) error { func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan slotsToProcess, checkProcessedStmt, removeStmt string) error {
errCh := make(chan error) errCh := make(chan error, 1)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -263,7 +263,6 @@ func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
} }
} }
}() }()
if len(errCh) != 0 { if len(errCh) != 0 {
return <-errCh return <-errCh

View File

@ -54,6 +54,7 @@ func querySsz(endpoint string, slot string) (*[]byte, int, error) {
loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!") loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!")
return nil, rc, fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error()) return nil, rc, fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error())
} }
//log.WithField("body", unsafe.Sizeof(body)).Debug("Size of the raw SSZ object")
return &body, rc, nil return &body, rc, nil
} }