package beacon import ( "context" log "log/slog" "time" "github.com/r3labs/sse/v2" ) type Event = sse.Event type EventSubscription struct { client *sse.Client rawChan chan *sse.Event errChan chan error } func (se *EventSubscription) Close() { if nil == se.client { return } log.Debug("Disconnecting and destroying SSE client", "endpoint", se.client.URL) se.client.Unsubscribe(se.rawChan) se.client.Connection.CloseIdleConnections() se.client = nil } func (se *EventSubscription) Err() <-chan error { return se.errChan } func CreateSubscription(ctx context.Context, endpoint string, sink chan *Event) *EventSubscription { client := sse.NewClient(endpoint) errChan := make(chan error) listen := func() { errChan <- client.SubscribeChanRawWithContext(ctx, sink) } client.ReconnectNotify = func(err error, duration time.Duration) { log.Debug("Reconnecting SSE client", "endpoint", endpoint, "error", err, "duration", duration) } client.OnDisconnect(func(c *sse.Client) { log.Debug("SSE client disconnected", "endpoint", endpoint) }) log.Debug("Subscribing to event stream", "endpoint", endpoint) go listen() return &EventSubscription{ client: client, rawChan: sink, errChan: errChan, } }