From f2dd0f046ebfc575b4008a36929c4b3b09e38826 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 7 Apr 2020 20:02:52 +0200 Subject: [PATCH] wdpost: Make chain notif loop reconnect friendly --- storage/wdpost_sched.go | 55 +++++++++++++++++++++++++++-------------- 1 file changed, 37 insertions(+), 18 deletions(-) diff --git a/storage/wdpost_sched.go b/storage/wdpost_sched.go index c4c319931..4838fe51e 100644 --- a/storage/wdpost_sched.go +++ b/storage/wdpost_sched.go @@ -3,6 +3,7 @@ package storage import ( "context" "sync" + "time" "go.opencensus.io/trace" "golang.org/x/xerrors" @@ -43,32 +44,50 @@ func NewWindowedPoStScheduler(api storageMinerApi, sb storage.Prover, actor addr } func (s *WindowPoStScheduler) Run(ctx context.Context) { - notifs, err := s.api.ChainNotify(ctx) - if err != nil { - return - } - - current := <-notifs - if len(current) != 1 { - panic("expected first notif to have len = 1") - } - if current[0].Type != store.HCCurrent { - panic("expected first notif to tell current ts") - } - - if err := s.update(ctx, current[0].Val); err != nil { - panic(err) - } - defer s.abortActivePoSt() + var notifs <-chan []*store.HeadChange + var err error + var gotCur bool + // not fine to panic after this point for { + if notifs == nil { + notifs, err = s.api.ChainNotify(ctx) + if err != nil { + log.Errorf("ChainNotify error: %+v") + + time.Sleep(10 * time.Second) + continue + } + + gotCur = false + } + select { case changes, ok := <-notifs: if !ok { log.Warn("WindowPoStScheduler notifs channel closed") - return + notifs = nil + continue + } + + if !gotCur { + if len(changes) != 1 { + log.Errorf("expected first notif to have len = 1") + continue + } + if changes[0].Type != store.HCCurrent { + log.Errorf("expected first notif to tell current ts") + continue + } + + if err := s.update(ctx, changes[0].Val); err != nil { + log.Errorf("%+v", err) + } + + gotCur = true + continue } ctx, span := trace.StartSpan(ctx, "WindowPoStScheduler.headChange")