Replace the entire SseClient on timeout to avoid any internal races.

This commit is contained in:
Thomas E Lackey 2022-09-22 22:14:27 -05:00
parent 3626029c11
commit 6378bc9cde
3 changed files with 42 additions and 18 deletions

View File

@ -80,7 +80,7 @@ type SseEvents[P ProcessedEvents] struct {
MessagesCh chan *sse.Event // Contains all the messages from the SSE Channel MessagesCh chan *sse.Event // Contains all the messages from the SSE Channel
ErrorCh chan *SseError // Contains any errors while SSE streaming occurred ErrorCh chan *SseError // Contains any errors while SSE streaming occurred
ProcessCh chan *P // Used to capture processed data in its proper struct. ProcessCh chan *P // Used to capture processed data in its proper struct.
SseClient *sse.Client // sse.Client object that is used to interact with the SSE stream sseClient *sse.Client // sse.Client object that is used to interact with the SSE stream
} }
// An object to capture any errors when turning an SSE message to JSON. // An object to capture any errors when turning an SSE message to JSON.
@ -126,17 +126,40 @@ func createSseEvent[P ProcessedEvents](baseEndpoint string, path string) *SseEve
MessagesCh: make(chan *sse.Event, 1), MessagesCh: make(chan *sse.Event, 1),
ErrorCh: make(chan *SseError), ErrorCh: make(chan *SseError),
ProcessCh: make(chan *P), ProcessCh: make(chan *P),
SseClient: func(endpoint string) *sse.Client {
log.WithFields(log.Fields{"endpoint": endpoint}).Info("Creating SSE client")
client := sse.NewClient(endpoint)
client.ReconnectNotify = func(err error, duration time.Duration) {
log.WithFields(log.Fields{"endpoint": endpoint}).Warn("Reconnecting SSE client")
}
client.OnDisconnect(func(c *sse.Client) {
log.WithFields(log.Fields{"endpoint": endpoint}).Warn("SSE client disconnected")
})
return client
}(endpoint),
} }
return sseEvents return sseEvents
} }
func (se *SseEvents[P]) Connect() error {
if nil == se.sseClient {
se.initClient()
}
return se.sseClient.SubscribeChanRaw(se.MessagesCh)
}
func (se *SseEvents[P]) Disconnect() {
if nil == se.sseClient {
return
}
log.WithFields(log.Fields{"endpoint": se.Endpoint}).Info("Disconnecting and destroying SSE client")
se.sseClient.Unsubscribe(se.MessagesCh)
se.sseClient.Connection.CloseIdleConnections()
se.sseClient = nil
}
func (se *SseEvents[P]) initClient() {
if nil != se.sseClient {
se.Disconnect()
}
log.WithFields(log.Fields{"endpoint": se.Endpoint}).Info("Creating SSE client")
client := sse.NewClient(se.Endpoint)
client.ReconnectNotify = func(err error, duration time.Duration) {
log.WithFields(log.Fields{"endpoint": se.Endpoint}).Warn("Reconnecting SSE client")
}
client.OnDisconnect(func(c *sse.Client) {
log.WithFields(log.Fields{"endpoint": se.Endpoint}).Warn("SSE client disconnected")
})
se.sseClient = client
}

View File

@ -50,7 +50,7 @@ func (bc *BeaconClient) StopHeadTracking() error {
// This function closes the SSE subscription, but waits until the MessagesCh is empty // This function closes the SSE subscription, but waits until the MessagesCh is empty
func (se *SseEvents[ProcessedEvents]) finishProcessingChannel(finish chan<- bool) { func (se *SseEvents[ProcessedEvents]) finishProcessingChannel(finish chan<- bool) {
loghelper.LogEndpoint(se.Endpoint).Info("Received a close event.") loghelper.LogEndpoint(se.Endpoint).Info("Received a close event.")
se.SseClient.Unsubscribe(se.MessagesCh) se.Disconnect()
for len(se.MessagesCh) != 0 || len(se.ProcessCh) != 0 { for len(se.MessagesCh) != 0 || len(se.ProcessCh) != 0 {
time.Sleep(time.Duration(shutdownWaitInterval) * time.Millisecond) time.Sleep(time.Duration(shutdownWaitInterval) * time.Millisecond)
} }

View File

@ -38,7 +38,7 @@ func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMe
go func() { go func() {
errG := new(errgroup.Group) errG := new(errgroup.Group)
errG.Go(func() error { errG.Go(func() error {
err := eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh) err := eventHandler.Connect()
if err != nil { if err != nil {
return err return err
} }
@ -55,6 +55,8 @@ func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMe
} }
}() }()
// TODO(telackey): Doesn't there need to be a check here that the handler hasn't been shutdown?
for { for {
var idleTimer *time.Timer = nil var idleTimer *time.Timer = nil
var idleTimerC <-chan time.Time = nil var idleTimerC <-chan time.Time = nil
@ -95,9 +97,8 @@ func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMe
}, },
).Error("TIMEOUT - Attempting to resubscribe") ).Error("TIMEOUT - Attempting to resubscribe")
errMetricInc(1) errMetricInc(1)
eventHandler.SseClient.Unsubscribe(eventHandler.MessagesCh) eventHandler.Disconnect()
eventHandler.SseClient.Connection.CloseIdleConnections() err = eventHandler.Connect()
err = eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh)
if err != nil { if err != nil {
log.Error("Unable to re-subscribe.", err) log.Error("Unable to re-subscribe.", err)
} }
@ -123,6 +124,6 @@ func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan
// Capture all of the event topics. // Capture all of the event topics.
func (bc *BeaconClient) captureEventTopic() { func (bc *BeaconClient) captureEventTopic() {
log.Info("We are capturing all SSE events") log.Info("We are capturing all SSE events")
go handleIncomingSseEvent(bc.HeadTracking, bc.Metrics.IncrementHeadError, time.Second*15) go handleIncomingSseEvent(bc.HeadTracking, bc.Metrics.IncrementHeadError, time.Second*30)
go handleIncomingSseEvent(bc.ReOrgTracking, bc.Metrics.IncrementReorgError, 0) go handleIncomingSseEvent(bc.ReOrgTracking, bc.Metrics.IncrementReorgError, 0)
} }