58 lines
1.2 KiB
Go
58 lines
1.2 KiB
Go
package blob_indexer
|
|
|
|
import (
|
|
"context"
|
|
log "log/slog"
|
|
"time"
|
|
|
|
"github.com/r3labs/sse/v2"
|
|
)
|
|
|
|
// Event is an alias for sse.Event, so callers of this package can name the
// event type without importing the sse package themselves.
type Event = sse.Event
|
|
|
|
type EventSubscription struct {
|
|
client *sse.Client
|
|
rawChan chan *sse.Event
|
|
errChan chan error
|
|
}
|
|
|
|
func (se *EventSubscription) Close() {
|
|
if nil == se.client {
|
|
return
|
|
}
|
|
|
|
log.Debug("Disconnecting and destroying SSE client", "endpoint", se.client.URL)
|
|
se.client.Unsubscribe(se.rawChan)
|
|
se.client.Connection.CloseIdleConnections()
|
|
se.client = nil
|
|
}
|
|
|
|
func (se *EventSubscription) Err() <-chan error {
|
|
return se.errChan
|
|
}
|
|
|
|
func CreateSubscription(ctx context.Context, endpoint string, sink chan *Event) *EventSubscription {
|
|
client := sse.NewClient(endpoint)
|
|
errChan := make(chan error)
|
|
listen := func() {
|
|
errChan <- client.SubscribeChanRawWithContext(ctx, sink)
|
|
}
|
|
|
|
client.ReconnectNotify = func(err error, duration time.Duration) {
|
|
log.Debug("Reconnecting SSE client", "endpoint", endpoint,
|
|
"error", err, "duration", duration)
|
|
}
|
|
client.OnDisconnect(func(c *sse.Client) {
|
|
log.Debug("SSE client disconnected", "endpoint", endpoint)
|
|
})
|
|
|
|
log.Debug("Subscribing to event stream", "endpoint", endpoint)
|
|
go listen()
|
|
|
|
return &EventSubscription{
|
|
client: client,
|
|
rawChan: sink,
|
|
errChan: errChan,
|
|
}
|
|
}
|