From 6378bc9cde2090a3ed723fcc21a49db638f54ac8 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Thu, 22 Sep 2022 22:14:27 -0500 Subject: [PATCH] Replace the entire SseClient on timeout to avoid any internal races. --- pkg/beaconclient/beaconclient.go | 47 ++++++++++++++++++++++++-------- pkg/beaconclient/capturehead.go | 2 +- pkg/beaconclient/incomingsse.go | 11 ++++---- 3 files changed, 42 insertions(+), 18 deletions(-) diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go index 053a03c..d7337c4 100644 --- a/pkg/beaconclient/beaconclient.go +++ b/pkg/beaconclient/beaconclient.go @@ -80,7 +80,7 @@ type SseEvents[P ProcessedEvents] struct { MessagesCh chan *sse.Event // Contains all the messages from the SSE Channel ErrorCh chan *SseError // Contains any errors while SSE streaming occurred ProcessCh chan *P // Used to capture processed data in its proper struct. - SseClient *sse.Client // sse.Client object that is used to interact with the SSE stream + sseClient *sse.Client // sse.Client object that is used to interact with the SSE stream } // An object to capture any errors when turning an SSE message to JSON. @@ -126,17 +126,40 @@ func createSseEvent[P ProcessedEvents](baseEndpoint string, path string) *SseEve MessagesCh: make(chan *sse.Event, 1), ErrorCh: make(chan *SseError), ProcessCh: make(chan *P), - SseClient: func(endpoint string) *sse.Client { - log.WithFields(log.Fields{"endpoint": endpoint}).Info("Creating SSE client") - client := sse.NewClient(endpoint) - client.ReconnectNotify = func(err error, duration time.Duration) { - log.WithFields(log.Fields{"endpoint": endpoint}).Warn("Reconnecting SSE client") - } - client.OnDisconnect(func(c *sse.Client) { - log.WithFields(log.Fields{"endpoint": endpoint}).Warn("SSE client disconnected") - }) - return client - }(endpoint), } return sseEvents } + +func (se *SseEvents[P]) Connect() error { + if nil == se.sseClient { + se.initClient() + } + return se.sseClient.SubscribeChanRaw(se.MessagesCh) +} + +func (se *SseEvents[P]) Disconnect() { + if nil == se.sseClient { + return + } + + log.WithFields(log.Fields{"endpoint": se.Endpoint}).Info("Disconnecting and destroying SSE client") + se.sseClient.Unsubscribe(se.MessagesCh) + se.sseClient.Connection.CloseIdleConnections() + se.sseClient = nil +} + +func (se *SseEvents[P]) initClient() { + if nil != se.sseClient { + se.Disconnect() + } + + log.WithFields(log.Fields{"endpoint": se.Endpoint}).Info("Creating SSE client") + client := sse.NewClient(se.Endpoint) + client.ReconnectNotify = func(err error, duration time.Duration) { + log.WithFields(log.Fields{"endpoint": se.Endpoint}).Warn("Reconnecting SSE client") + } + client.OnDisconnect(func(c *sse.Client) { + log.WithFields(log.Fields{"endpoint": se.Endpoint}).Warn("SSE client disconnected") + }) + se.sseClient = client +} diff --git a/pkg/beaconclient/capturehead.go b/pkg/beaconclient/capturehead.go index a0b6e6b..1d43198 100644 --- a/pkg/beaconclient/capturehead.go +++ b/pkg/beaconclient/capturehead.go @@ -50,7 +50,7 @@ func (bc *BeaconClient) StopHeadTracking() error { // This function closes the SSE subscription, but waits until the MessagesCh is empty func (se *SseEvents[ProcessedEvents]) finishProcessingChannel(finish chan<- bool) { loghelper.LogEndpoint(se.Endpoint).Info("Received a close event.") - se.SseClient.Unsubscribe(se.MessagesCh) + se.Disconnect() for len(se.MessagesCh) != 0 || len(se.ProcessCh) != 0 { time.Sleep(time.Duration(shutdownWaitInterval) * time.Millisecond) } diff --git a/pkg/beaconclient/incomingsse.go b/pkg/beaconclient/incomingsse.go index 36cf2fe..0a57e93 100644 --- a/pkg/beaconclient/incomingsse.go +++ b/pkg/beaconclient/incomingsse.go @@ -38,7 +38,7 @@ func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMe go func() { errG := new(errgroup.Group) errG.Go(func() error { - err := eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh) + err := eventHandler.Connect() if err != nil { return err } @@ -55,6 +55,8 @@ func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMe } }() + + // TODO(telackey): Doesn't there need to be a check here that the handler hasn't been shutdown? for { var idleTimer *time.Timer = nil var idleTimerC <-chan time.Time = nil @@ -95,9 +97,8 @@ func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMe }, ).Error("TIMEOUT - Attempting to resubscribe") errMetricInc(1) - eventHandler.SseClient.Unsubscribe(eventHandler.MessagesCh) - eventHandler.SseClient.Connection.CloseIdleConnections() - err = eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh) + eventHandler.Disconnect() + err = eventHandler.Connect() if err != nil { log.Error("Unable to re-subscribe.", err) } @@ -123,6 +124,6 @@ func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan // Capture all of the event topics. func (bc *BeaconClient) captureEventTopic() { log.Info("We are capturing all SSE events") - go handleIncomingSseEvent(bc.HeadTracking, bc.Metrics.IncrementHeadError, time.Second*15) + go handleIncomingSseEvent(bc.HeadTracking, bc.Metrics.IncrementHeadError, time.Second*30) go handleIncomingSseEvent(bc.ReOrgTracking, bc.Metrics.IncrementReorgError, 0) }