fix: avoid potential hang when starting event listener
It was possible for NewEvents to never return, blocked on waiting for a WaitGroup to be done. The call to Done was in a goroutine that could exit before reaching the Done call. Replace the WaitGroup with a channel that is closed to signal that initialisation is complete. Also, while we are waiting on the channel, wait on the context so we can exit clealy if the context is canceled.
This commit is contained in:
parent
a999e41677
commit
696469aae7
@ -20,8 +20,10 @@ import (
|
||||
var log = logging.Logger("events")
|
||||
|
||||
// HeightHandler `curH`-`ts.Height` = `confidence`
|
||||
type HeightHandler func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error
|
||||
type RevertHandler func(ctx context.Context, ts *types.TipSet) error
|
||||
type (
|
||||
HeightHandler func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error
|
||||
RevertHandler func(ctx context.Context, ts *types.TipSet) error
|
||||
)
|
||||
|
||||
type heightHandler struct {
|
||||
confidence int
|
||||
@ -48,7 +50,7 @@ type Events struct {
|
||||
tsc *tipSetCache
|
||||
lk sync.Mutex
|
||||
|
||||
ready sync.WaitGroup
|
||||
ready chan struct{}
|
||||
readyOnce sync.Once
|
||||
|
||||
heightEvents
|
||||
@ -76,15 +78,16 @@ func NewEvents(ctx context.Context, api eventAPI) *Events {
|
||||
},
|
||||
|
||||
hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)),
|
||||
ready: make(chan struct{}),
|
||||
}
|
||||
|
||||
e.ready.Add(1)
|
||||
|
||||
go e.listenHeadChanges(ctx)
|
||||
|
||||
e.ready.Wait()
|
||||
|
||||
// TODO: cleanup/gc goroutine
|
||||
// Wait for the first tipset to be seen or bail if shutting down
|
||||
select {
|
||||
case <-e.ready:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
|
||||
return e
|
||||
}
|
||||
@ -111,14 +114,22 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
|
||||
|
||||
notifs, err := e.api.ChainNotify(ctx)
|
||||
if err != nil {
|
||||
// TODO: retry
|
||||
// Retry is handled by caller
|
||||
return xerrors.Errorf("listenHeadChanges ChainNotify call failed: %w", err)
|
||||
}
|
||||
|
||||
cur, ok := <-notifs // TODO: timeout?
|
||||
var cur []*api.HeadChange
|
||||
var ok bool
|
||||
|
||||
// Wait for first tipset or bail
|
||||
select {
|
||||
case cur, ok = <-notifs:
|
||||
if !ok {
|
||||
return xerrors.Errorf("notification channel closed")
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
if len(cur) != 1 {
|
||||
return xerrors.Errorf("unexpected initial head notification length: %d", len(cur))
|
||||
@ -134,8 +145,8 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
|
||||
|
||||
e.readyOnce.Do(func() {
|
||||
e.lastTs = cur[0].Val
|
||||
|
||||
e.ready.Done()
|
||||
// Signal that we have seen first tipset
|
||||
close(e.ready)
|
||||
})
|
||||
|
||||
for notif := range notifs {
|
||||
|
Loading…
Reference in New Issue
Block a user