diff --git a/cmd/full.go b/cmd/full.go index 840d39d..6d27670 100644 --- a/cmd/full.go +++ b/cmd/full.go @@ -76,7 +76,7 @@ func startFullProcessing() { log.Info("The Beacon Client has booted successfully!") // Capture head blocks hdCtx, hdCancel := context.WithCancel(context.Background()) - go Bc.CaptureHead(hdCtx) + go Bc.CaptureHead(hdCtx, false) hpContext, hpCancel := context.WithCancel(context.Background()) diff --git a/cmd/head.go b/cmd/head.go index a4f88ec..41c5b43 100644 --- a/cmd/head.go +++ b/cmd/head.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "net/http" + "os" "strconv" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -63,7 +64,7 @@ func startHeadTracking() { log.Info("The Beacon Client has booted successfully!") // Capture head blocks hdCtx, hdCancel := context.WithCancel(context.Background()) - go Bc.CaptureHead(hdCtx) + go Bc.CaptureHead(hdCtx, false) kgCtx, kgCancel := context.WithCancel(context.Background()) if viper.GetBool("kg.processKnownGaps") { @@ -96,7 +97,8 @@ func startHeadTracking() { } else { log.Info("Gracefully shutdown ipld-eth-beacon-indexer") } - + log.Debug("WTF") + os.Exit(0) } func init() { diff --git a/pkg/beaconclient/capturehead.go b/pkg/beaconclient/capturehead.go index 0b54881..a18a70d 100644 --- a/pkg/beaconclient/capturehead.go +++ b/pkg/beaconclient/capturehead.go @@ -24,11 +24,11 @@ import ( ) // This function will perform all the heavy lifting for tracking the head of the chain. -func (bc *BeaconClient) CaptureHead(ctx context.Context) { +func (bc *BeaconClient) CaptureHead(ctx context.Context, skipSee bool) { log.Info("We are tracking the head of the chain.") go bc.handleHead(ctx) go bc.handleReorg(ctx) - bc.captureEventTopic(ctx) + bc.captureEventTopic(ctx, skipSee) } // Stop the head tracking service. diff --git a/pkg/beaconclient/capturehead_test.go b/pkg/beaconclient/capturehead_test.go index adb2bab..107041f 100644 --- a/pkg/beaconclient/capturehead_test.go +++ b/pkg/beaconclient/capturehead_test.go @@ -899,7 +899,7 @@ func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string // Helper function to test three reorg messages. There are going to be many functions like this, // Because we need to test the same logic for multiple phases. func (tbc TestBeaconNode) testMultipleReorgs(ctx context.Context, bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch int, maxRetry int) { - go bc.CaptureHead(ctx) + go bc.CaptureHead(ctx, true) time.Sleep(1 * time.Second) log.Info("Sending Messages to BeaconClient") @@ -961,7 +961,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(ctx context.Context, bc *beaconclie // A test to validate a single block was processed correctly func (tbc TestBeaconNode) testProcessBlock(ctx context.Context, bc *beaconclient.BeaconClient, head beaconclient.Head, epoch int, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) { - go bc.CaptureHead(ctx) + go bc.CaptureHead(ctx, true) time.Sleep(1 * time.Second) sendHeadMessage(bc, head, maxRetry, expectedSuccessInsert) @@ -991,7 +991,7 @@ func (tbc TestBeaconNode) testProcessBlock(ctx context.Context, bc *beaconclient // A test that ensures that if two HeadMessages occur for a single slot they are marked // as proposed and forked correctly. func (tbc TestBeaconNode) testMultipleHead(ctx context.Context, bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, epoch int, maxRetry int) { - go bc.CaptureHead(ctx) + go bc.CaptureHead(ctx, true) time.Sleep(1 * time.Second) sendHeadMessage(bc, firstHead, maxRetry, 1) @@ -1019,7 +1019,7 @@ func (tbc TestBeaconNode) testMultipleHead(ctx context.Context, bc *beaconclient // as proposed and forked correctly. func (tbc TestBeaconNode) testKnownGapsMessages(ctx context.Context, bc *beaconclient.BeaconClient, tableIncrement int, expectedEntries uint64, maxRetry int, msg ...beaconclient.Head) { bc.KnownGapTableIncrement = tableIncrement - go bc.CaptureHead(ctx) + go bc.CaptureHead(ctx, true) time.Sleep(1 * time.Second) for _, headMsg := range msg { @@ -1067,5 +1067,6 @@ func testStopHeadTracking(cancel context.CancelFunc, bc *beaconclient.BeaconClie time.Sleep(3 * time.Second) endNum := runtime.NumGoroutine() + //pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) Expect(startGoRoutines).To(Equal(endNum)) } diff --git a/pkg/beaconclient/incomingsse.go b/pkg/beaconclient/incomingsse.go index 0833f44..ab94c07 100644 --- a/pkg/beaconclient/incomingsse.go +++ b/pkg/beaconclient/incomingsse.go @@ -24,37 +24,50 @@ import ( log "github.com/sirupsen/logrus" "github.com/vulcanize/ipld-eth-beacon-indexer/pkg/loghelper" - "golang.org/x/sync/errgroup" -) - -var ( - shutdownWaitInterval = time.Duration(5) * time.Second ) // This function will capture all the SSE events for a given SseEvents object. // When new messages come in, it will ensure that they are decoded into JSON. // If any errors occur, it log the error information. -func handleIncomingSseEvent[P ProcessedEvents](ctx context.Context, eventHandler *SseEvents[P], errMetricInc func(uint64)) { - go func() { - errG := new(errgroup.Group) - errG.Go(func() error { - err := eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh) +func handleIncomingSseEvent[P ProcessedEvents](ctx context.Context, eventHandler *SseEvents[P], errMetricInc func(uint64), skipSse bool) { + //go func() { + // subCh := make(chan error, 1) + // go func() { + // err := eventHandler.SseClient.SubscribeChanRawWithContext(ctx, eventHandler.MessagesCh) + // if err != nil { + // subCh <- err + // } + // subCh <- nil + // }() + // select { + // case err := <-subCh: + // if err != nil { + // log.WithFields(log.Fields{ + // "err": err, + // "endpoint": eventHandler.Endpoint, + // }).Error("Unable to subscribe to the SSE endpoint.") + // return + // } else { + // loghelper.LogEndpoint(eventHandler.Endpoint).Info("Successfully subscribed to the event stream.") + // } + // case <-ctx.Done(): + // return + // } + //}() + if !skipSse { + for { + err := eventHandler.SseClient.SubscribeChanRawWithContext(ctx, eventHandler.MessagesCh) if err != nil { - return err + loghelper.LogEndpoint(eventHandler.Endpoint).WithFields(log.Fields{ + "err": err}).Error("We are unable to subscribe to the SSE endpoint") + time.Sleep(3 * time.Second) + continue } - return nil - }) - if err := errG.Wait(); err != nil { - log.WithFields(log.Fields{ - "err": err, - "endpoint": eventHandler.Endpoint, - }).Error("Unable to subscribe to the SSE endpoint.") - return - } else { loghelper.LogEndpoint(eventHandler.Endpoint).Info("Successfully subscribed to the event stream.") + break } + } - }() for { select { case <-ctx.Done(): @@ -96,8 +109,8 @@ func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan } // Capture all of the event topics. -func (bc *BeaconClient) captureEventTopic(ctx context.Context) { +func (bc *BeaconClient) captureEventTopic(ctx context.Context, skipSse bool) { log.Info("We are capturing all SSE events") - go handleIncomingSseEvent(ctx, bc.HeadTracking, bc.Metrics.IncrementHeadError) - go handleIncomingSseEvent(ctx, bc.ReOrgTracking, bc.Metrics.IncrementReorgError) + go handleIncomingSseEvent(ctx, bc.HeadTracking, bc.Metrics.IncrementHeadError, skipSse) + go handleIncomingSseEvent(ctx, bc.ReOrgTracking, bc.Metrics.IncrementReorgError, skipSse) } diff --git a/pkg/beaconclient/systemvalidation_test.go b/pkg/beaconclient/systemvalidation_test.go index 9187858..fe67772 100644 --- a/pkg/beaconclient/systemvalidation_test.go +++ b/pkg/beaconclient/systemvalidation_test.go @@ -67,7 +67,7 @@ func getEnvInt(envVar string) int { func processProdHeadBlocks(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { startGoRoutines := runtime.NumGoroutine() ctx, cancel := context.WithCancel(context.Background()) - go bc.CaptureHead(ctx) + go bc.CaptureHead(ctx, false) time.Sleep(1 * time.Second) validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError)