events: Safer restarting in listenHeadChanges

This commit is contained in:
Łukasz Magiera 2019-09-18 20:07:39 +02:00
parent cc82cc9675
commit 689b1e5b3c
3 changed files with 27 additions and 31 deletions

View File

@ -47,7 +47,7 @@ type Events struct {
calledEvents
}
func NewEvents(api eventApi) *Events {
func NewEvents(ctx context.Context, api eventApi) *Events {
gcConfidence := 2 * build.ForkLengthThreshold
tsc := newTSCache(gcConfidence, api.ChainGetTipSetByHeight)
@ -81,7 +81,7 @@ func NewEvents(api eventApi) *Events {
e.ready.Add(1)
go e.listenHeadChanges(context.TODO())
go e.listenHeadChanges(ctx)
e.ready.Wait()
@ -90,48 +90,44 @@ func NewEvents(api eventApi) *Events {
return e
}
func (e *Events) restartHeadChanges(ctx context.Context) {
go func() {
func (e *Events) listenHeadChanges(ctx context.Context) {
for {
if err := e.listenHeadChangesOnce(ctx); err != nil {
log.Errorf("listen head changes errored: %s", err)
} else {
log.Warn("listenHeadChanges quit")
}
if ctx.Err() != nil {
log.Warnf("not restarting listenHeadChanges: context error: %s", ctx.Err())
return
}
time.Sleep(time.Second)
log.Info("restarting listenHeadChanges")
e.listenHeadChanges(ctx)
}()
}
}
func (e *Events) listenHeadChanges(ctx context.Context) {
func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
notifs, err := e.api.ChainNotify(ctx)
if err != nil {
// TODO: retry
log.Errorf("listenHeadChanges ChainNotify call failed: %s", err)
e.restartHeadChanges(ctx)
return
return xerrors.Errorf("listenHeadChanges ChainNotify call failed: %w", err)
}
cur, ok := <-notifs // TODO: timeout?
if !ok {
log.Error("notification channel closed")
e.restartHeadChanges(ctx)
return
return xerrors.Errorf("notification channel closed")
}
if len(cur) != 1 {
log.Errorf("unexpected initial head notification length: %d", len(cur))
e.restartHeadChanges(ctx)
return
return xerrors.Errorf("unexpected initial head notification length: %d", len(cur))
}
if cur[0].Type != store.HCCurrent {
log.Errorf("expected first head notification type to be 'current', was '%s'", cur[0].Type)
e.restartHeadChanges(ctx)
return
return xerrors.Errorf("expected first head notification type to be 'current', was '%s'", cur[0].Type)
}
if err := e.tsc.add(cur[0].Val); err != nil {
log.Warn("tsc.add: adding current tipset failed: %s", err)
log.Warn("tsc.add: adding current tipset failed: %w", err)
}
e.readyOnce.Do(func() {
@ -156,8 +152,7 @@ func (e *Events) listenHeadChanges(ctx context.Context) {
}
}
log.Warn("listenHeadChanges loop quit")
e.restartHeadChanges(ctx)
return nil
}
func (e *Events) headChange(rev, app []*types.TipSet) error {

View File

@ -160,7 +160,7 @@ func TestAt(t *testing.T) {
}
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
events := NewEvents(fcs)
events := NewEvents(context.Background(), fcs)
var applied bool
var reverted bool
@ -219,7 +219,7 @@ func TestAtStart(t *testing.T) {
}
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
events := NewEvents(fcs)
events := NewEvents(context.Background(), fcs)
fcs.advance(0, 5, nil) // 6
@ -253,7 +253,7 @@ func TestAtStartConfidence(t *testing.T) {
}
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
events := NewEvents(fcs)
events := NewEvents(context.Background(), fcs)
fcs.advance(0, 10, nil) // 11
@ -286,7 +286,7 @@ func TestCalled(t *testing.T) {
}
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
events := NewEvents(fcs)
events := NewEvents(context.Background(), fcs)
t0123, err := address.NewFromString("t0123")
require.NoError(t, err)
@ -484,7 +484,7 @@ func TestCalledTimeout(t *testing.T) {
}
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
events := NewEvents(fcs)
events := NewEvents(context.Background(), fcs)
t0123, err := address.NewFromString("t0123")
require.NoError(t, err)
@ -524,7 +524,7 @@ func TestCalledTimeout(t *testing.T) {
}
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
events = NewEvents(fcs)
events = NewEvents(context.Background(), fcs)
err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
return true, true, nil
@ -558,7 +558,7 @@ func TestCalledOrder(t *testing.T) {
}
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
events := NewEvents(fcs)
events := NewEvents(context.Background(), fcs)
t0123, err := address.NewFromString("t0123")
require.NoError(t, err)

View File

@ -69,7 +69,6 @@ type storageMinerApi interface {
func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, secst *sector.Store, commt *commitment.Tracker) (*Miner, error) {
return &Miner{
api: api,
events: events.NewEvents(api),
maddr: addr,
h: h,
@ -84,6 +83,8 @@ func (m *Miner) Run(ctx context.Context) error {
return errors.Wrap(err, "miner preflight checks failed")
}
m.events = events.NewEvents(ctx, m.api)
ts, err := m.api.ChainHead(ctx)
if err != nil {
return err