Merge pull request #2615 from filecoin-project/fix/chainwatch-null-block-reward-actor
fix: Chainwatch handle null rounds
This commit is contained in:
commit
c0f0e2ba45
@ -119,7 +119,7 @@ func (p *Processor) Start(ctx context.Context) {
|
|||||||
// TODO special case genesis state handling here to avoid all the special cases that will be needed for it else where
|
// TODO special case genesis state handling here to avoid all the special cases that will be needed for it else where
|
||||||
// before doing "normal" processing.
|
// before doing "normal" processing.
|
||||||
|
|
||||||
actorChanges, err := p.collectActorChanges(ctx, toProcess)
|
actorChanges, nullRounds, err := p.collectActorChanges(ctx, toProcess)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalw("Failed to collect actor changes", "error", err)
|
log.Fatalw("Failed to collect actor changes", "error", err)
|
||||||
}
|
}
|
||||||
@ -141,7 +141,7 @@ func (p *Processor) Start(ctx context.Context) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
grp.Go(func() error {
|
grp.Go(func() error {
|
||||||
if err := p.HandleRewardChanges(ctx, actorChanges[builtin.RewardActorCodeID]); err != nil {
|
if err := p.HandleRewardChanges(ctx, actorChanges[builtin.RewardActorCodeID], nullRounds); err != nil {
|
||||||
return xerrors.Errorf("Failed to handle reward changes: %w", err)
|
return xerrors.Errorf("Failed to handle reward changes: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -191,7 +191,7 @@ func (p *Processor) refreshViews() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.Cid]*types.BlockHeader) (map[cid.Cid]ActorTips, error) {
|
func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.Cid]*types.BlockHeader) (map[cid.Cid]ActorTips, []types.TipSetKey, error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Debugw("Collected Actor Changes", "duration", time.Since(start).String())
|
log.Debugw("Collected Actor Changes", "duration", time.Since(start).String())
|
||||||
@ -204,6 +204,9 @@ func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.C
|
|||||||
var changes map[string]types.Actor
|
var changes map[string]types.Actor
|
||||||
actorsSeen := map[cid.Cid]struct{}{}
|
actorsSeen := map[cid.Cid]struct{}{}
|
||||||
|
|
||||||
|
var nullRounds []types.TipSetKey
|
||||||
|
var nullBlkMu sync.Mutex
|
||||||
|
|
||||||
// collect all actor state that has changes between block headers
|
// collect all actor state that has changes between block headers
|
||||||
paDone := 0
|
paDone := 0
|
||||||
parmap.Par(50, parmap.MapArr(toProcess), func(bh *types.BlockHeader) {
|
parmap.Par(50, parmap.MapArr(toProcess), func(bh *types.BlockHeader) {
|
||||||
@ -217,6 +220,12 @@ func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.C
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if pts.ParentState().Equals(bh.ParentStateRoot) {
|
||||||
|
nullBlkMu.Lock()
|
||||||
|
nullRounds = append(nullRounds, pts.Key())
|
||||||
|
nullBlkMu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
// collect all actors that had state changes between the blockheader parent-state and its grandparent-state.
|
// collect all actors that had state changes between the blockheader parent-state and its grandparent-state.
|
||||||
// TODO: changes will contain deleted actors, this causes needless processing further down the pipeline, consider
|
// TODO: changes will contain deleted actors, this causes needless processing further down the pipeline, consider
|
||||||
// a separate strategy for deleted actors
|
// a separate strategy for deleted actors
|
||||||
@ -232,12 +241,12 @@ func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.C
|
|||||||
|
|
||||||
addr, err := address.NewFromString(a)
|
addr, err := address.NewFromString(a)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
log.Fatal(err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
ast, err := p.node.StateReadState(ctx, addr, pts.Key())
|
ast, err := p.node.StateReadState(ctx, addr, pts.Key())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
log.Fatal(err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO look here for an empty state, maybe thats a sign the actor was deleted?
|
// TODO look here for an empty state, maybe thats a sign the actor was deleted?
|
||||||
@ -267,7 +276,7 @@ func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.C
|
|||||||
outMu.Unlock()
|
outMu.Unlock()
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
return out, nil
|
return out, nullRounds, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Processor) unprocessedBlocks(ctx context.Context, batch int) (map[cid.Cid]*types.BlockHeader, error) {
|
func (p *Processor) unprocessedBlocks(ctx context.Context, batch int) (map[cid.Cid]*types.BlockHeader, error) {
|
||||||
|
@ -8,10 +8,12 @@ import (
|
|||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/build"
|
|
||||||
"github.com/filecoin-project/specs-actors/actors/abi/big"
|
"github.com/filecoin-project/specs-actors/actors/abi/big"
|
||||||
"github.com/filecoin-project/specs-actors/actors/builtin"
|
"github.com/filecoin-project/specs-actors/actors/builtin"
|
||||||
"github.com/filecoin-project/specs-actors/actors/builtin/reward"
|
"github.com/filecoin-project/specs-actors/actors/builtin/reward"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/build"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
type rewardActorInfo struct {
|
type rewardActorInfo struct {
|
||||||
@ -58,8 +60,8 @@ create table if not exists chain_power
|
|||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Processor) HandleRewardChanges(ctx context.Context, rewardTips ActorTips) error {
|
func (p *Processor) HandleRewardChanges(ctx context.Context, rewardTips ActorTips, nullRounds []types.TipSetKey) error {
|
||||||
rewardChanges, err := p.processRewardActors(ctx, rewardTips)
|
rewardChanges, err := p.processRewardActors(ctx, rewardTips, nullRounds)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalw("Failed to process reward actors", "error", err)
|
log.Fatalw("Failed to process reward actors", "error", err)
|
||||||
}
|
}
|
||||||
@ -71,7 +73,7 @@ func (p *Processor) HandleRewardChanges(ctx context.Context, rewardTips ActorTip
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Processor) processRewardActors(ctx context.Context, rewardTips ActorTips) ([]rewardActorInfo, error) {
|
func (p *Processor) processRewardActors(ctx context.Context, rewardTips ActorTips, nullRounds []types.TipSetKey) ([]rewardActorInfo, error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Debugw("Processed Reward Actors", "duration", time.Since(start).String())
|
log.Debugw("Processed Reward Actors", "duration", time.Since(start).String())
|
||||||
@ -104,6 +106,37 @@ func (p *Processor) processRewardActors(ctx context.Context, rewardTips ActorTip
|
|||||||
out = append(out, rw)
|
out = append(out, rw)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for _, tsKey := range nullRounds {
|
||||||
|
var rw rewardActorInfo
|
||||||
|
tipset , err := p.node.ChainGetTipSet(ctx, tsKey)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
rw.common.tsKey = tipset.Key()
|
||||||
|
rw.common.height = tipset.Height()
|
||||||
|
rw.common.stateroot = tipset.ParentState()
|
||||||
|
rw.common.parentTsKey = tipset.Parents()
|
||||||
|
// get reward actor states at each tipset once for all updates
|
||||||
|
rewardActor, err := p.node.StateGetActor(ctx, builtin.RewardActorAddr, tsKey)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
rewardStateRaw, err := p.node.ChainReadObj(ctx, rewardActor.Head)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var rewardActorState reward.State
|
||||||
|
if err := rewardActorState.UnmarshalCBOR(bytes.NewReader(rewardStateRaw)); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
rw.baseBlockReward = rewardActorState.ThisEpochReward
|
||||||
|
rw.baselinePower = rewardActorState.ThisEpochBaselinePower
|
||||||
|
out = append(out, rw)
|
||||||
|
}
|
||||||
|
|
||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user