From 689b1e5b3cc5a3060f4c0ee862adb777dda0dc13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 18 Sep 2019 20:07:39 +0200 Subject: [PATCH] events: Safer restarting in listenHeadChanges --- chain/events/events.go | 39 ++++++++++++++++--------------------- chain/events/events_test.go | 14 ++++++------- storage/miner.go | 5 +++-- 3 files changed, 27 insertions(+), 31 deletions(-) diff --git a/chain/events/events.go b/chain/events/events.go index 3fdad1cb3..c58ab4e81 100644 --- a/chain/events/events.go +++ b/chain/events/events.go @@ -47,7 +47,7 @@ type Events struct { calledEvents } -func NewEvents(api eventApi) *Events { +func NewEvents(ctx context.Context, api eventApi) *Events { gcConfidence := 2 * build.ForkLengthThreshold tsc := newTSCache(gcConfidence, api.ChainGetTipSetByHeight) @@ -81,7 +81,7 @@ func NewEvents(api eventApi) *Events { e.ready.Add(1) - go e.listenHeadChanges(context.TODO()) + go e.listenHeadChanges(ctx) e.ready.Wait() @@ -90,48 +90,44 @@ func NewEvents(api eventApi) *Events { return e } -func (e *Events) restartHeadChanges(ctx context.Context) { - go func() { +func (e *Events) listenHeadChanges(ctx context.Context) { + for { + if err := e.listenHeadChangesOnce(ctx); err != nil { + log.Errorf("listen head changes errored: %s", err) + } else { + log.Warn("listenHeadChanges quit") + } if ctx.Err() != nil { log.Warnf("not restarting listenHeadChanges: context error: %s", ctx.Err()) return } time.Sleep(time.Second) log.Info("restarting listenHeadChanges") - e.listenHeadChanges(ctx) - }() + } } -func (e *Events) listenHeadChanges(ctx context.Context) { +func (e *Events) listenHeadChangesOnce(ctx context.Context) error { notifs, err := e.api.ChainNotify(ctx) if err != nil { // TODO: retry - log.Errorf("listenHeadChanges ChainNotify call failed: %s", err) - e.restartHeadChanges(ctx) - return + return xerrors.Errorf("listenHeadChanges ChainNotify call failed: %w", err) } cur, ok := <-notifs // TODO: timeout? if !ok { - log.Error("notification channel closed") - e.restartHeadChanges(ctx) - return + return xerrors.Errorf("notification channel closed") } if len(cur) != 1 { - log.Errorf("unexpected initial head notification length: %d", len(cur)) - e.restartHeadChanges(ctx) - return + return xerrors.Errorf("unexpected initial head notification length: %d", len(cur)) } if cur[0].Type != store.HCCurrent { - log.Errorf("expected first head notification type to be 'current', was '%s'", cur[0].Type) - e.restartHeadChanges(ctx) - return + return xerrors.Errorf("expected first head notification type to be 'current', was '%s'", cur[0].Type) } if err := e.tsc.add(cur[0].Val); err != nil { - log.Warn("tsc.add: adding current tipset failed: %s", err) + log.Warn("tsc.add: adding current tipset failed: %w", err) } e.readyOnce.Do(func() { @@ -156,8 +152,7 @@ func (e *Events) listenHeadChanges(ctx context.Context) { } } - log.Warn("listenHeadChanges loop quit") - e.restartHeadChanges(ctx) + return nil } func (e *Events) headChange(rev, app []*types.TipSet) error { diff --git a/chain/events/events_test.go b/chain/events/events_test.go index 2d61edf35..4628b5fae 100644 --- a/chain/events/events_test.go +++ b/chain/events/events_test.go @@ -160,7 +160,7 @@ func TestAt(t *testing.T) { } require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) - events := NewEvents(fcs) + events := NewEvents(context.Background(), fcs) var applied bool var reverted bool @@ -219,7 +219,7 @@ func TestAtStart(t *testing.T) { } require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) - events := NewEvents(fcs) + events := NewEvents(context.Background(), fcs) fcs.advance(0, 5, nil) // 6 @@ -253,7 +253,7 @@ func TestAtStartConfidence(t *testing.T) { } require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) - events := NewEvents(fcs) + events := NewEvents(context.Background(), fcs) fcs.advance(0, 10, nil) // 11 @@ -286,7 +286,7 @@ func TestCalled(t *testing.T) { } require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) - events := NewEvents(fcs) + events := NewEvents(context.Background(), fcs) t0123, err := address.NewFromString("t0123") require.NoError(t, err) @@ -484,7 +484,7 @@ func TestCalledTimeout(t *testing.T) { } require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) - events := NewEvents(fcs) + events := NewEvents(context.Background(), fcs) t0123, err := address.NewFromString("t0123") require.NoError(t, err) @@ -524,7 +524,7 @@ func TestCalledTimeout(t *testing.T) { } require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) - events = NewEvents(fcs) + events = NewEvents(context.Background(), fcs) err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) { return true, true, nil @@ -558,7 +558,7 @@ func TestCalledOrder(t *testing.T) { } require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) - events := NewEvents(fcs) + events := NewEvents(context.Background(), fcs) t0123, err := address.NewFromString("t0123") require.NoError(t, err) diff --git a/storage/miner.go b/storage/miner.go index 5cbb7015d..2ffaa0c8b 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -68,8 +68,7 @@ type storageMinerApi interface { func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, secst *sector.Store, commt *commitment.Tracker) (*Miner, error) { return &Miner{ - api: api, - events: events.NewEvents(api), + api: api, maddr: addr, h: h, @@ -84,6 +83,8 @@ func (m *Miner) Run(ctx context.Context) error { return errors.Wrap(err, "miner preflight checks failed") } + m.events = events.NewEvents(ctx, m.api) + ts, err := m.api.ChainHead(ctx) if err != nil { return err