diff --git a/cmd/lotus-chainwatch/processor/messages.go b/cmd/lotus-chainwatch/processor/messages.go index e3d23f219..333477c6a 100644 --- a/cmd/lotus-chainwatch/processor/messages.go +++ b/cmd/lotus-chainwatch/processor/messages.go @@ -254,7 +254,9 @@ func (p *Processor) fetchMessages(ctx context.Context, blocks map[cid.Cid]*types parmap.Par(50, parmap.MapArr(blocks), func(header *types.BlockHeader) { msgs, err := p.node.ChainGetBlockMessages(ctx, header.Cid()) if err != nil { - panic(err) + log.Error(err) + log.Debugw("ChainGetBlockMessages", "header_cid", header.Cid()) + return } vmm := make([]*types.Message, 0, len(msgs.Cids)) @@ -290,11 +292,15 @@ func (p *Processor) fetchParentReceipts(ctx context.Context, toSync map[cid.Cid] parmap.Par(50, parmap.MapArr(toSync), func(header *types.BlockHeader) { recs, err := p.node.ChainGetParentReceipts(ctx, header.Cid()) if err != nil { - panic(err) + log.Error(err) + log.Debugw("ChainGetParentReceipts", "header_cid", header.Cid()) + return } msgs, err := p.node.ChainGetParentMessages(ctx, header.Cid()) if err != nil { - panic(err) + log.Error(err) + log.Debugw("ChainGetParentMessages", "header_cid", header.Cid()) + return } lk.Lock() diff --git a/cmd/lotus-chainwatch/processor/processor.go b/cmd/lotus-chainwatch/processor/processor.go index 59e3bbb7e..e6c2ffb94 100644 --- a/cmd/lotus-chainwatch/processor/processor.go +++ b/cmd/lotus-chainwatch/processor/processor.go @@ -246,7 +246,8 @@ func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.C pts, err := p.node.ChainGetTipSet(ctx, types.NewTipSetKey(bh.Parents...)) if err != nil { - panic(err) + log.Error(err) + return } if pts.ParentState().Equals(bh.ParentStateRoot) { @@ -260,7 +261,9 @@ func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.C // a separate strategy for deleted actors changes, err = p.node.StateChangedActors(ctx, pts.ParentState(), bh.ParentStateRoot) if err != nil { - panic(err) + log.Error(err) + log.Debugw("StateChangedActors", "grandparent_state", pts.ParentState(), "parent_state", bh.ParentStateRoot) + return } // record the state of all actors that have changed @@ -271,7 +274,9 @@ func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.C // ignore actors that were deleted. has, err := p.node.ChainHasObj(ctx, act.Head) if err != nil { - log.Fatal(err) + log.Error(err) + log.Debugw("ChanHasObj", "actor_head", act.Head) + return } if !has { continue @@ -279,19 +284,24 @@ func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.C addr, err := address.NewFromString(a) if err != nil { - log.Fatal(err.Error()) + log.Error(err) + log.Debugw("NewFromString", "address_string", a) + return } ast, err := p.node.StateReadState(ctx, addr, pts.Key()) if err != nil { - log.Fatal(err.Error()) + log.Error(err) + log.Debugw("StateReadState", "address_string", a, "parent_tipset_key", pts.Key()) + return } // TODO look here for an empty state, maybe thats a sign the actor was deleted? state, err := json.Marshal(ast.State) if err != nil { - panic(err) + log.Error(err) + return } outMu.Lock() @@ -324,10 +334,10 @@ func (p *Processor) unprocessedBlocks(ctx context.Context, batch int) (map[cid.C }() rows, err := p.db.Query(` with toProcess as ( - select blocks.cid, blocks.height, rank() over (order by height) as rnk - from blocks - left join blocks_synced bs on blocks.cid = bs.cid - where bs.processed_at is null and blocks.height > 0 + select b.cid, b.height, rank() over (order by height) as rnk + from blocks_synced bs + left join blocks b on bs.cid = b.cid + where bs.processed_at is null and b.height > 0 ) select cid from toProcess