diff --git a/cmd/lotus-chainwatch/syncer/blockssub.go b/cmd/lotus-chainwatch/syncer/blockssub.go
index 04b78da0e..ea9c079e8 100644
--- a/cmd/lotus-chainwatch/syncer/blockssub.go
+++ b/cmd/lotus-chainwatch/syncer/blockssub.go
@@ -11,16 +11,17 @@ import (
 func (s *Syncer) subBlocks(ctx context.Context) {
 	sub, err := s.node.SyncIncomingBlocks(ctx)
 	if err != nil {
-		log.Error(err)
+		log.Errorf("opening incoming block channel: %+v", err)
 		return
 	}
 
+	log.Infow("Capturing incoming blocks")
 	for bh := range sub {
 		err := s.storeHeaders(map[cid.Cid]*types.BlockHeader{
 			bh.Cid(): bh,
 		}, false, time.Now())
 		if err != nil {
-			log.Errorf("%+v", err)
+			log.Errorf("storing incoming block header: %+v", err)
 		}
 	}
 }
diff --git a/cmd/lotus-chainwatch/syncer/sync.go b/cmd/lotus-chainwatch/syncer/sync.go
index 79b9dd6a3..9688a5f8a 100644
--- a/cmd/lotus-chainwatch/syncer/sync.go
+++ b/cmd/lotus-chainwatch/syncer/sync.go
@@ -160,11 +160,8 @@ func (s *Syncer) Start(ctx context.Context) {
 		log.Fatal(err)
 	}
 
-	// continue to keep the block headers table up to date.
-	notifs, err := s.node.ChainNotify(ctx)
-	if err != nil {
-		log.Fatal(err)
-	}
+	// capture all reported blocks
+	go s.subBlocks(ctx)
 
 	// we need to ensure that on a restart we don't reprocess the whole flarping chain
 	var sinceEpoch uint64
@@ -177,6 +174,13 @@ func (s *Syncer) Start(ctx context.Context) {
 			sinceEpoch = uint64(height)
 		}
 	}
+
+	// continue to keep the block headers table up to date.
+	notifs, err := s.node.ChainNotify(ctx)
+	if err != nil {
+		log.Fatal(err)
+	}
+
 	go func() {
 		for notif := range notifs {
 			for _, change := range notif {