diff --git a/cmd/lotus-chainwatch/processor/processor.go b/cmd/lotus-chainwatch/processor/processor.go index f91709b56..ef43601ec 100644 --- a/cmd/lotus-chainwatch/processor/processor.go +++ b/cmd/lotus-chainwatch/processor/processor.go @@ -119,7 +119,7 @@ func (p *Processor) Start(ctx context.Context) { // TODO special case genesis state handling here to avoid all the special cases that will be needed for it else where // before doing "normal" processing. - actorChanges, err := p.collectActorChanges(ctx, toProcess) + actorChanges, nullRounds, err := p.collectActorChanges(ctx, toProcess) if err != nil { log.Fatalw("Failed to collect actor changes", "error", err) } @@ -141,7 +141,7 @@ func (p *Processor) Start(ctx context.Context) { }) grp.Go(func() error { - if err := p.HandleRewardChanges(ctx, actorChanges[builtin.RewardActorCodeID]); err != nil { + if err := p.HandleRewardChanges(ctx, actorChanges[builtin.RewardActorCodeID], nullRounds); err != nil { return xerrors.Errorf("Failed to handle reward changes: %w", err) } return nil @@ -191,7 +191,7 @@ func (p *Processor) refreshViews() error { return nil } -func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.Cid]*types.BlockHeader) (map[cid.Cid]ActorTips, error) { +func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.Cid]*types.BlockHeader) (map[cid.Cid]ActorTips, []types.TipSetKey, error) { start := time.Now() defer func() { log.Debugw("Collected Actor Changes", "duration", time.Since(start).String()) @@ -204,6 +204,9 @@ func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.C var changes map[string]types.Actor actorsSeen := map[cid.Cid]struct{}{} + var nullRounds []types.TipSetKey + var nullBlkMu sync.Mutex + // collect all actor state that has changes between block headers paDone := 0 parmap.Par(50, parmap.MapArr(toProcess), func(bh *types.BlockHeader) { @@ -217,6 +220,12 @@ func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.C panic(err) } + if pts.ParentState().Equals(bh.ParentStateRoot) { + nullBlkMu.Lock() + nullRounds = append(nullRounds, pts.Key()) + nullBlkMu.Unlock() + } + // collect all actors that had state changes between the blockheader parent-state and its grandparent-state. // TODO: changes will contain deleted actors, this causes needless processing further down the pipeline, consider // a separate strategy for deleted actors @@ -232,12 +241,12 @@ func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.C addr, err := address.NewFromString(a) if err != nil { - panic(err) + log.Fatal(err.Error()) } ast, err := p.node.StateReadState(ctx, addr, pts.Key()) if err != nil { - panic(err) + log.Fatal(err.Error()) } // TODO look here for an empty state, maybe thats a sign the actor was deleted? @@ -267,7 +276,7 @@ func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.C outMu.Unlock() } }) - return out, nil + return out, nullRounds, nil } func (p *Processor) unprocessedBlocks(ctx context.Context, batch int) (map[cid.Cid]*types.BlockHeader, error) { diff --git a/cmd/lotus-chainwatch/processor/reward.go b/cmd/lotus-chainwatch/processor/reward.go index af339b74d..19c34fa61 100644 --- a/cmd/lotus-chainwatch/processor/reward.go +++ b/cmd/lotus-chainwatch/processor/reward.go @@ -8,10 +8,12 @@ import ( "golang.org/x/sync/errgroup" "golang.org/x/xerrors" - "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin/reward" + + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/types" ) type rewardActorInfo struct { @@ -58,8 +60,8 @@ create table if not exists chain_power return tx.Commit() } -func (p *Processor) HandleRewardChanges(ctx context.Context, rewardTips ActorTips) error { - rewardChanges, err := p.processRewardActors(ctx, rewardTips) +func (p *Processor) HandleRewardChanges(ctx context.Context, rewardTips ActorTips, nullRounds []types.TipSetKey) error { + rewardChanges, err := p.processRewardActors(ctx, rewardTips, nullRounds) if err != nil { log.Fatalw("Failed to process reward actors", "error", err) } @@ -71,7 +73,7 @@ func (p *Processor) HandleRewardChanges(ctx context.Context, rewardTips ActorTip return nil } -func (p *Processor) processRewardActors(ctx context.Context, rewardTips ActorTips) ([]rewardActorInfo, error) { +func (p *Processor) processRewardActors(ctx context.Context, rewardTips ActorTips, nullRounds []types.TipSetKey) ([]rewardActorInfo, error) { start := time.Now() defer func() { log.Debugw("Processed Reward Actors", "duration", time.Since(start).String()) @@ -104,6 +106,37 @@ func (p *Processor) processRewardActors(ctx context.Context, rewardTips ActorTip out = append(out, rw) } } + for _, tsKey := range nullRounds { + var rw rewardActorInfo + tipset , err := p.node.ChainGetTipSet(ctx, tsKey) + if err != nil { + return nil, err + } + rw.common.tsKey = tipset.Key() + rw.common.height = tipset.Height() + rw.common.stateroot = tipset.ParentState() + rw.common.parentTsKey = tipset.Parents() + // get reward actor states at each tipset once for all updates + rewardActor, err := p.node.StateGetActor(ctx, builtin.RewardActorAddr, tsKey) + if err != nil { + return nil, err + } + + rewardStateRaw, err := p.node.ChainReadObj(ctx, rewardActor.Head) + if err != nil { + return nil, err + } + + var rewardActorState reward.State + if err := rewardActorState.UnmarshalCBOR(bytes.NewReader(rewardStateRaw)); err != nil { + return nil, err + } + + rw.baseBlockReward = rewardActorState.ThisEpochReward + rw.baselinePower = rewardActorState.ThisEpochBaselinePower + out = append(out, rw) + } + return out, nil }