fix: reward actor processes null round states

This commit is contained in:
frrist 2020-07-27 16:55:21 -07:00
parent 291d2fe2de
commit ba7eaf3cc7
2 changed files with 52 additions and 10 deletions

View File

@ -119,7 +119,7 @@ func (p *Processor) Start(ctx context.Context) {
// TODO special case genesis state handling here to avoid all the special cases that will be needed for it else where // TODO special case genesis state handling here to avoid all the special cases that will be needed for it else where
// before doing "normal" processing. // before doing "normal" processing.
actorChanges, err := p.collectActorChanges(ctx, toProcess) actorChanges, nullRounds, err := p.collectActorChanges(ctx, toProcess)
if err != nil { if err != nil {
log.Fatalw("Failed to collect actor changes", "error", err) log.Fatalw("Failed to collect actor changes", "error", err)
} }
@ -141,7 +141,7 @@ func (p *Processor) Start(ctx context.Context) {
}) })
grp.Go(func() error { grp.Go(func() error {
if err := p.HandleRewardChanges(ctx, actorChanges[builtin.RewardActorCodeID]); err != nil { if err := p.HandleRewardChanges(ctx, actorChanges[builtin.RewardActorCodeID], nullRounds); err != nil {
return xerrors.Errorf("Failed to handle reward changes: %w", err) return xerrors.Errorf("Failed to handle reward changes: %w", err)
} }
return nil return nil
@ -191,7 +191,7 @@ func (p *Processor) refreshViews() error {
return nil return nil
} }
func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.Cid]*types.BlockHeader) (map[cid.Cid]ActorTips, error) { func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.Cid]*types.BlockHeader) (map[cid.Cid]ActorTips, []types.TipSetKey, error) {
start := time.Now() start := time.Now()
defer func() { defer func() {
log.Debugw("Collected Actor Changes", "duration", time.Since(start).String()) log.Debugw("Collected Actor Changes", "duration", time.Since(start).String())
@ -204,6 +204,9 @@ func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.C
var changes map[string]types.Actor var changes map[string]types.Actor
actorsSeen := map[cid.Cid]struct{}{} actorsSeen := map[cid.Cid]struct{}{}
var nullRounds []types.TipSetKey
var nullBlkMu sync.Mutex
// collect all actor state that has changes between block headers // collect all actor state that has changes between block headers
paDone := 0 paDone := 0
parmap.Par(50, parmap.MapArr(toProcess), func(bh *types.BlockHeader) { parmap.Par(50, parmap.MapArr(toProcess), func(bh *types.BlockHeader) {
@ -217,6 +220,12 @@ func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.C
panic(err) panic(err)
} }
if pts.ParentState().Equals(bh.ParentStateRoot) {
nullBlkMu.Lock()
nullRounds = append(nullRounds, pts.Key())
nullBlkMu.Unlock()
}
// collect all actors that had state changes between the blockheader parent-state and its grandparent-state. // collect all actors that had state changes between the blockheader parent-state and its grandparent-state.
// TODO: changes will contain deleted actors, this causes needless processing further down the pipeline, consider // TODO: changes will contain deleted actors, this causes needless processing further down the pipeline, consider
// a separate strategy for deleted actors // a separate strategy for deleted actors
@ -232,12 +241,12 @@ func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.C
addr, err := address.NewFromString(a) addr, err := address.NewFromString(a)
if err != nil { if err != nil {
panic(err) log.Fatal(err.Error())
} }
ast, err := p.node.StateReadState(ctx, addr, pts.Key()) ast, err := p.node.StateReadState(ctx, addr, pts.Key())
if err != nil { if err != nil {
panic(err) log.Fatal(err.Error())
} }
// TODO look here for an empty state, maybe thats a sign the actor was deleted? // TODO look here for an empty state, maybe thats a sign the actor was deleted?
@ -267,7 +276,7 @@ func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.C
outMu.Unlock() outMu.Unlock()
} }
}) })
return out, nil return out, nullRounds, nil
} }
func (p *Processor) unprocessedBlocks(ctx context.Context, batch int) (map[cid.Cid]*types.BlockHeader, error) { func (p *Processor) unprocessedBlocks(ctx context.Context, batch int) (map[cid.Cid]*types.BlockHeader, error) {

View File

@ -8,10 +8,12 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/reward" "github.com/filecoin-project/specs-actors/actors/builtin/reward"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
) )
type rewardActorInfo struct { type rewardActorInfo struct {
@ -58,8 +60,8 @@ create table if not exists chain_power
return tx.Commit() return tx.Commit()
} }
func (p *Processor) HandleRewardChanges(ctx context.Context, rewardTips ActorTips) error { func (p *Processor) HandleRewardChanges(ctx context.Context, rewardTips ActorTips, nullRounds []types.TipSetKey) error {
rewardChanges, err := p.processRewardActors(ctx, rewardTips) rewardChanges, err := p.processRewardActors(ctx, rewardTips, nullRounds)
if err != nil { if err != nil {
log.Fatalw("Failed to process reward actors", "error", err) log.Fatalw("Failed to process reward actors", "error", err)
} }
@ -71,7 +73,7 @@ func (p *Processor) HandleRewardChanges(ctx context.Context, rewardTips ActorTip
return nil return nil
} }
func (p *Processor) processRewardActors(ctx context.Context, rewardTips ActorTips) ([]rewardActorInfo, error) { func (p *Processor) processRewardActors(ctx context.Context, rewardTips ActorTips, nullRounds []types.TipSetKey) ([]rewardActorInfo, error) {
start := time.Now() start := time.Now()
defer func() { defer func() {
log.Debugw("Processed Reward Actors", "duration", time.Since(start).String()) log.Debugw("Processed Reward Actors", "duration", time.Since(start).String())
@ -104,6 +106,37 @@ func (p *Processor) processRewardActors(ctx context.Context, rewardTips ActorTip
out = append(out, rw) out = append(out, rw)
} }
} }
for _, tsKey := range nullRounds {
var rw rewardActorInfo
tipset , err := p.node.ChainGetTipSet(ctx, tsKey)
if err != nil {
return nil, err
}
rw.common.tsKey = tipset.Key()
rw.common.height = tipset.Height()
rw.common.stateroot = tipset.ParentState()
rw.common.parentTsKey = tipset.Parents()
// get reward actor states at each tipset once for all updates
rewardActor, err := p.node.StateGetActor(ctx, builtin.RewardActorAddr, tsKey)
if err != nil {
return nil, err
}
rewardStateRaw, err := p.node.ChainReadObj(ctx, rewardActor.Head)
if err != nil {
return nil, err
}
var rewardActorState reward.State
if err := rewardActorState.UnmarshalCBOR(bytes.NewReader(rewardStateRaw)); err != nil {
return nil, err
}
rw.baseBlockReward = rewardActorState.ThisEpochReward
rw.baselinePower = rewardActorState.ThisEpochBaselinePower
out = append(out, rw)
}
return out, nil return out, nil
} }