Merge pull request #3526 from filecoin-project/fix/chainwatch/no-panic-during-processing
Stop SyncIncomingBlocks from leaking into chainwatch processing; No panics during processing
This commit is contained in:
commit
0a620514b7
@ -254,7 +254,9 @@ func (p *Processor) fetchMessages(ctx context.Context, blocks map[cid.Cid]*types
|
|||||||
parmap.Par(50, parmap.MapArr(blocks), func(header *types.BlockHeader) {
|
parmap.Par(50, parmap.MapArr(blocks), func(header *types.BlockHeader) {
|
||||||
msgs, err := p.node.ChainGetBlockMessages(ctx, header.Cid())
|
msgs, err := p.node.ChainGetBlockMessages(ctx, header.Cid())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
log.Error(err)
|
||||||
|
log.Debugw("ChainGetBlockMessages", "header_cid", header.Cid())
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
vmm := make([]*types.Message, 0, len(msgs.Cids))
|
vmm := make([]*types.Message, 0, len(msgs.Cids))
|
||||||
@ -290,11 +292,15 @@ func (p *Processor) fetchParentReceipts(ctx context.Context, toSync map[cid.Cid]
|
|||||||
parmap.Par(50, parmap.MapArr(toSync), func(header *types.BlockHeader) {
|
parmap.Par(50, parmap.MapArr(toSync), func(header *types.BlockHeader) {
|
||||||
recs, err := p.node.ChainGetParentReceipts(ctx, header.Cid())
|
recs, err := p.node.ChainGetParentReceipts(ctx, header.Cid())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
log.Error(err)
|
||||||
|
log.Debugw("ChainGetParentReceipts", "header_cid", header.Cid())
|
||||||
|
return
|
||||||
}
|
}
|
||||||
msgs, err := p.node.ChainGetParentMessages(ctx, header.Cid())
|
msgs, err := p.node.ChainGetParentMessages(ctx, header.Cid())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
log.Error(err)
|
||||||
|
log.Debugw("ChainGetParentMessages", "header_cid", header.Cid())
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
lk.Lock()
|
lk.Lock()
|
||||||
|
@ -246,7 +246,8 @@ func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.C
|
|||||||
|
|
||||||
pts, err := p.node.ChainGetTipSet(ctx, types.NewTipSetKey(bh.Parents...))
|
pts, err := p.node.ChainGetTipSet(ctx, types.NewTipSetKey(bh.Parents...))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
log.Error(err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if pts.ParentState().Equals(bh.ParentStateRoot) {
|
if pts.ParentState().Equals(bh.ParentStateRoot) {
|
||||||
@ -260,7 +261,9 @@ func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.C
|
|||||||
// a separate strategy for deleted actors
|
// a separate strategy for deleted actors
|
||||||
changes, err = p.node.StateChangedActors(ctx, pts.ParentState(), bh.ParentStateRoot)
|
changes, err = p.node.StateChangedActors(ctx, pts.ParentState(), bh.ParentStateRoot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
log.Error(err)
|
||||||
|
log.Debugw("StateChangedActors", "grandparent_state", pts.ParentState(), "parent_state", bh.ParentStateRoot)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// record the state of all actors that have changed
|
// record the state of all actors that have changed
|
||||||
@ -271,7 +274,9 @@ func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.C
|
|||||||
// ignore actors that were deleted.
|
// ignore actors that were deleted.
|
||||||
has, err := p.node.ChainHasObj(ctx, act.Head)
|
has, err := p.node.ChainHasObj(ctx, act.Head)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Error(err)
|
||||||
|
log.Debugw("ChanHasObj", "actor_head", act.Head)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
if !has {
|
if !has {
|
||||||
continue
|
continue
|
||||||
@ -279,19 +284,24 @@ func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.C
|
|||||||
|
|
||||||
addr, err := address.NewFromString(a)
|
addr, err := address.NewFromString(a)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err.Error())
|
log.Error(err)
|
||||||
|
log.Debugw("NewFromString", "address_string", a)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ast, err := p.node.StateReadState(ctx, addr, pts.Key())
|
ast, err := p.node.StateReadState(ctx, addr, pts.Key())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err.Error())
|
log.Error(err)
|
||||||
|
log.Debugw("StateReadState", "address_string", a, "parent_tipset_key", pts.Key())
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO look here for an empty state, maybe thats a sign the actor was deleted?
|
// TODO look here for an empty state, maybe thats a sign the actor was deleted?
|
||||||
|
|
||||||
state, err := json.Marshal(ast.State)
|
state, err := json.Marshal(ast.State)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
log.Error(err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
outMu.Lock()
|
outMu.Lock()
|
||||||
@ -324,10 +334,10 @@ func (p *Processor) unprocessedBlocks(ctx context.Context, batch int) (map[cid.C
|
|||||||
}()
|
}()
|
||||||
rows, err := p.db.Query(`
|
rows, err := p.db.Query(`
|
||||||
with toProcess as (
|
with toProcess as (
|
||||||
select blocks.cid, blocks.height, rank() over (order by height) as rnk
|
select b.cid, b.height, rank() over (order by height) as rnk
|
||||||
from blocks
|
from blocks_synced bs
|
||||||
left join blocks_synced bs on blocks.cid = bs.cid
|
left join blocks b on bs.cid = b.cid
|
||||||
where bs.processed_at is null and blocks.height > 0
|
where bs.processed_at is null and b.height > 0
|
||||||
)
|
)
|
||||||
select cid
|
select cid
|
||||||
from toProcess
|
from toProcess
|
||||||
|
Loading…
Reference in New Issue
Block a user