Merge pull request #5159 from iand/fix/event-hang

fix(events): avoid potential hang when starting event listener
This commit is contained in:
Łukasz Magiera 2020-12-09 19:40:02 +01:00 committed by GitHub
commit e1be89b442
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -20,8 +20,10 @@ import (
var log = logging.Logger("events")
// HeightHandler `curH`-`ts.Height` = `confidence`
type HeightHandler func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error
type RevertHandler func(ctx context.Context, ts *types.TipSet) error
type (
HeightHandler func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error
RevertHandler func(ctx context.Context, ts *types.TipSet) error
)
type heightHandler struct {
confidence int
@ -48,7 +50,7 @@ type Events struct {
tsc *tipSetCache
lk sync.Mutex
ready sync.WaitGroup
ready chan struct{}
readyOnce sync.Once
heightEvents
@ -76,15 +78,16 @@ func NewEvents(ctx context.Context, api eventAPI) *Events {
},
hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)),
ready: make(chan struct{}),
}
e.ready.Add(1)
go e.listenHeadChanges(ctx)
e.ready.Wait()
// TODO: cleanup/gc goroutine
// Wait for the first tipset to be seen or bail if shutting down
select {
case <-e.ready:
case <-ctx.Done():
}
return e
}
@ -111,13 +114,21 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
notifs, err := e.api.ChainNotify(ctx)
if err != nil {
// TODO: retry
// Retry is handled by caller
return xerrors.Errorf("listenHeadChanges ChainNotify call failed: %w", err)
}
cur, ok := <-notifs // TODO: timeout?
if !ok {
return xerrors.Errorf("notification channel closed")
var cur []*api.HeadChange
var ok bool
// Wait for first tipset or bail
select {
case cur, ok = <-notifs:
if !ok {
return xerrors.Errorf("notification channel closed")
}
case <-ctx.Done():
return ctx.Err()
}
if len(cur) != 1 {
@ -134,8 +145,8 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
e.readyOnce.Do(func() {
e.lastTs = cur[0].Val
e.ready.Done()
// Signal that we have seen first tipset
close(e.ready)
})
for notif := range notifs {