Add condition for SSE subscription

This commit is contained in:
Abdul Rabbani 2022-06-21 16:42:51 -04:00
parent a2f2603d38
commit 67305e07c3
6 changed files with 50 additions and 34 deletions

View File

@ -76,7 +76,7 @@ func startFullProcessing() {
log.Info("The Beacon Client has booted successfully!") log.Info("The Beacon Client has booted successfully!")
// Capture head blocks // Capture head blocks
hdCtx, hdCancel := context.WithCancel(context.Background()) hdCtx, hdCancel := context.WithCancel(context.Background())
go Bc.CaptureHead(hdCtx) go Bc.CaptureHead(hdCtx, false)
hpContext, hpCancel := context.WithCancel(context.Background()) hpContext, hpCancel := context.WithCancel(context.Background())

View File

@ -20,6 +20,7 @@ import (
"context" "context"
"fmt" "fmt"
"net/http" "net/http"
"os"
"strconv" "strconv"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
@ -63,7 +64,7 @@ func startHeadTracking() {
log.Info("The Beacon Client has booted successfully!") log.Info("The Beacon Client has booted successfully!")
// Capture head blocks // Capture head blocks
hdCtx, hdCancel := context.WithCancel(context.Background()) hdCtx, hdCancel := context.WithCancel(context.Background())
go Bc.CaptureHead(hdCtx) go Bc.CaptureHead(hdCtx, false)
kgCtx, kgCancel := context.WithCancel(context.Background()) kgCtx, kgCancel := context.WithCancel(context.Background())
if viper.GetBool("kg.processKnownGaps") { if viper.GetBool("kg.processKnownGaps") {
@ -96,7 +97,8 @@ func startHeadTracking() {
} else { } else {
log.Info("Gracefully shutdown ipld-eth-beacon-indexer") log.Info("Gracefully shutdown ipld-eth-beacon-indexer")
} }
log.Debug("WTF")
os.Exit(0)
} }
func init() { func init() {

View File

@ -24,11 +24,11 @@ import (
) )
// This function will perform all the heavy lifting for tracking the head of the chain. // This function will perform all the heavy lifting for tracking the head of the chain.
func (bc *BeaconClient) CaptureHead(ctx context.Context) { func (bc *BeaconClient) CaptureHead(ctx context.Context, skipSee bool) {
log.Info("We are tracking the head of the chain.") log.Info("We are tracking the head of the chain.")
go bc.handleHead(ctx) go bc.handleHead(ctx)
go bc.handleReorg(ctx) go bc.handleReorg(ctx)
bc.captureEventTopic(ctx) bc.captureEventTopic(ctx, skipSee)
} }
// Stop the head tracking service. // Stop the head tracking service.

View File

@ -899,7 +899,7 @@ func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string
// Helper function to test three reorg messages. There are going to be many functions like this, // Helper function to test three reorg messages. There are going to be many functions like this,
// Because we need to test the same logic for multiple phases. // Because we need to test the same logic for multiple phases.
func (tbc TestBeaconNode) testMultipleReorgs(ctx context.Context, bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch int, maxRetry int) { func (tbc TestBeaconNode) testMultipleReorgs(ctx context.Context, bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch int, maxRetry int) {
go bc.CaptureHead(ctx) go bc.CaptureHead(ctx, true)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
log.Info("Sending Messages to BeaconClient") log.Info("Sending Messages to BeaconClient")
@ -961,7 +961,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(ctx context.Context, bc *beaconclie
// A test to validate a single block was processed correctly // A test to validate a single block was processed correctly
func (tbc TestBeaconNode) testProcessBlock(ctx context.Context, bc *beaconclient.BeaconClient, head beaconclient.Head, epoch int, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) { func (tbc TestBeaconNode) testProcessBlock(ctx context.Context, bc *beaconclient.BeaconClient, head beaconclient.Head, epoch int, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) {
go bc.CaptureHead(ctx) go bc.CaptureHead(ctx, true)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
sendHeadMessage(bc, head, maxRetry, expectedSuccessInsert) sendHeadMessage(bc, head, maxRetry, expectedSuccessInsert)
@ -991,7 +991,7 @@ func (tbc TestBeaconNode) testProcessBlock(ctx context.Context, bc *beaconclient
// A test that ensures that if two HeadMessages occur for a single slot they are marked // A test that ensures that if two HeadMessages occur for a single slot they are marked
// as proposed and forked correctly. // as proposed and forked correctly.
func (tbc TestBeaconNode) testMultipleHead(ctx context.Context, bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, epoch int, maxRetry int) { func (tbc TestBeaconNode) testMultipleHead(ctx context.Context, bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, epoch int, maxRetry int) {
go bc.CaptureHead(ctx) go bc.CaptureHead(ctx, true)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
sendHeadMessage(bc, firstHead, maxRetry, 1) sendHeadMessage(bc, firstHead, maxRetry, 1)
@ -1019,7 +1019,7 @@ func (tbc TestBeaconNode) testMultipleHead(ctx context.Context, bc *beaconclient
// as proposed and forked correctly. // as proposed and forked correctly.
func (tbc TestBeaconNode) testKnownGapsMessages(ctx context.Context, bc *beaconclient.BeaconClient, tableIncrement int, expectedEntries uint64, maxRetry int, msg ...beaconclient.Head) { func (tbc TestBeaconNode) testKnownGapsMessages(ctx context.Context, bc *beaconclient.BeaconClient, tableIncrement int, expectedEntries uint64, maxRetry int, msg ...beaconclient.Head) {
bc.KnownGapTableIncrement = tableIncrement bc.KnownGapTableIncrement = tableIncrement
go bc.CaptureHead(ctx) go bc.CaptureHead(ctx, true)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
for _, headMsg := range msg { for _, headMsg := range msg {
@ -1067,5 +1067,6 @@ func testStopHeadTracking(cancel context.CancelFunc, bc *beaconclient.BeaconClie
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
endNum := runtime.NumGoroutine() endNum := runtime.NumGoroutine()
//pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
Expect(startGoRoutines).To(Equal(endNum)) Expect(startGoRoutines).To(Equal(endNum))
} }

View File

@ -24,37 +24,50 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/loghelper" "github.com/vulcanize/ipld-eth-beacon-indexer/pkg/loghelper"
"golang.org/x/sync/errgroup"
)
var (
shutdownWaitInterval = time.Duration(5) * time.Second
) )
// This function will capture all the SSE events for a given SseEvents object. // This function will capture all the SSE events for a given SseEvents object.
// When new messages come in, it will ensure that they are decoded into JSON. // When new messages come in, it will ensure that they are decoded into JSON.
// If any errors occur, it log the error information. // If any errors occur, it log the error information.
func handleIncomingSseEvent[P ProcessedEvents](ctx context.Context, eventHandler *SseEvents[P], errMetricInc func(uint64)) { func handleIncomingSseEvent[P ProcessedEvents](ctx context.Context, eventHandler *SseEvents[P], errMetricInc func(uint64), skipSse bool) {
go func() { //go func() {
errG := new(errgroup.Group) // subCh := make(chan error, 1)
errG.Go(func() error { // go func() {
err := eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh) // err := eventHandler.SseClient.SubscribeChanRawWithContext(ctx, eventHandler.MessagesCh)
// if err != nil {
// subCh <- err
// }
// subCh <- nil
// }()
// select {
// case err := <-subCh:
// if err != nil {
// log.WithFields(log.Fields{
// "err": err,
// "endpoint": eventHandler.Endpoint,
// }).Error("Unable to subscribe to the SSE endpoint.")
// return
// } else {
// loghelper.LogEndpoint(eventHandler.Endpoint).Info("Successfully subscribed to the event stream.")
// }
// case <-ctx.Done():
// return
// }
//}()
if !skipSse {
for {
err := eventHandler.SseClient.SubscribeChanRawWithContext(ctx, eventHandler.MessagesCh)
if err != nil { if err != nil {
return err loghelper.LogEndpoint(eventHandler.Endpoint).WithFields(log.Fields{
"err": err}).Error("We are unable to subscribe to the SSE endpoint")
time.Sleep(3 * time.Second)
continue
} }
return nil
})
if err := errG.Wait(); err != nil {
log.WithFields(log.Fields{
"err": err,
"endpoint": eventHandler.Endpoint,
}).Error("Unable to subscribe to the SSE endpoint.")
return
} else {
loghelper.LogEndpoint(eventHandler.Endpoint).Info("Successfully subscribed to the event stream.") loghelper.LogEndpoint(eventHandler.Endpoint).Info("Successfully subscribed to the event stream.")
break
}
} }
}()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -96,8 +109,8 @@ func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan
} }
// Capture all of the event topics. // Capture all of the event topics.
func (bc *BeaconClient) captureEventTopic(ctx context.Context) { func (bc *BeaconClient) captureEventTopic(ctx context.Context, skipSse bool) {
log.Info("We are capturing all SSE events") log.Info("We are capturing all SSE events")
go handleIncomingSseEvent(ctx, bc.HeadTracking, bc.Metrics.IncrementHeadError) go handleIncomingSseEvent(ctx, bc.HeadTracking, bc.Metrics.IncrementHeadError, skipSse)
go handleIncomingSseEvent(ctx, bc.ReOrgTracking, bc.Metrics.IncrementReorgError) go handleIncomingSseEvent(ctx, bc.ReOrgTracking, bc.Metrics.IncrementReorgError, skipSse)
} }

View File

@ -67,7 +67,7 @@ func getEnvInt(envVar string) int {
func processProdHeadBlocks(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) { func processProdHeadBlocks(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) {
startGoRoutines := runtime.NumGoroutine() startGoRoutines := runtime.NumGoroutine()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go bc.CaptureHead(ctx) go bc.CaptureHead(ctx, false)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError) validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError)