lotus/cmd/lotus-chainwatch/processor/reward.go
2020-09-18 13:52:07 -07:00

235 lines
7.0 KiB
Go

package processor
import (
"context"
"time"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/chain/actors/builtin"
"github.com/filecoin-project/lotus/chain/actors/builtin/reward"
"github.com/filecoin-project/lotus/chain/types"
cw_util "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/util"
)
// rewardActorInfo is a snapshot of the reward actor's state at one state root.
// Every field except common is filled in by set from a reward.State, and
// persistRewardActors writes one chain_reward row per value.
type rewardActorInfo struct {
	// common carries the tipset context of the snapshot: tipset key, height,
	// state root and parent tipset key.
	common actorInfo

	// Results of CumsumBaseline and CumsumRealized, in that order.
	cumSumBaselinePower, cumSumRealizedPower big.Int

	// Result of EffectiveNetworkTime.
	effectiveNetworkTime abi.ChainEpoch

	// Result of EffectiveBaselinePower.
	effectiveBaselinePower big.Int

	// NOTE: These variables are wrong. Talk to @ZX about fixing. These _do
	// not_ represent "new" anything.
	//
	// They hold the results of ThisEpochBaselinePower and ThisEpochReward,
	// in that order.
	newBaselinePower, newBaseReward big.Int

	// Result of ThisEpochRewardSmoothed; the same NOTE applies.
	newSmoothingEstimate builtin.FilterEstimate

	// Result of TotalStoragePowerReward.
	totalMinedReward big.Int
}
// set copies the values exposed by the reward actor state s into rw.
//
// The accessors are called one after another and the first failure is
// returned, wrapped with the name of the value being read and the state root
// stored in rw.common. Fields assigned before the failing accessor keep their
// new values, so rw must be treated as partially populated when err != nil.
// rw.common is only read here; the caller is expected to have set it already.
func (rw *rewardActorInfo) set(s reward.State) (err error) {
	rw.cumSumBaselinePower, err = s.CumsumBaseline()
	if err != nil {
		return xerrors.Errorf("getting cumsum baseline power (@ %s): %w", rw.common.stateroot.String(), err)
	}

	rw.cumSumRealizedPower, err = s.CumsumRealized()
	if err != nil {
		return xerrors.Errorf("getting cumsum realized power (@ %s): %w", rw.common.stateroot.String(), err)
	}

	rw.effectiveNetworkTime, err = s.EffectiveNetworkTime()
	if err != nil {
		return xerrors.Errorf("getting effective network time (@ %s): %w", rw.common.stateroot.String(), err)
	}

	rw.effectiveBaselinePower, err = s.EffectiveBaselinePower()
	if err != nil {
		return xerrors.Errorf("getting effective baseline power (@ %s): %w", rw.common.stateroot.String(), err)
	}

	rw.totalMinedReward, err = s.TotalStoragePowerReward()
	if err != nil {
		return xerrors.Errorf("getting total mined (@ %s): %w", rw.common.stateroot.String(), err)
	}

	rw.newBaselinePower, err = s.ThisEpochBaselinePower()
	if err != nil {
		return xerrors.Errorf("getting this epoch baseline power (@ %s): %w", rw.common.stateroot.String(), err)
	}

	// The two messages below used to repeat "this epoch baseline power",
	// which pointed at the wrong accessor when one of these calls failed.
	rw.newBaseReward, err = s.ThisEpochReward()
	if err != nil {
		return xerrors.Errorf("getting this epoch reward (@ %s): %w", rw.common.stateroot.String(), err)
	}

	rw.newSmoothingEstimate, err = s.ThisEpochRewardSmoothed()
	if err != nil {
		return xerrors.Errorf("getting this epoch reward smoothed (@ %s): %w", rw.common.stateroot.String(), err)
	}

	return nil
}
// setupRewards creates the chain_reward table if it does not already exist.
//
// The DDL runs inside its own transaction. If the statement fails, the
// transaction is rolled back before the error is returned so that it is not
// left open. Errors from Begin, Exec and Commit are returned unwrapped.
func (p *Processor) setupRewards() error {
	tx, err := p.db.Begin()
	if err != nil {
		return err
	}

	if _, err := tx.Exec(`
/* captures chain-specific power state for any given stateroot */
create table if not exists chain_reward
(
state_root text not null
constraint chain_reward_pk
primary key,
cum_sum_baseline text not null,
cum_sum_realized text not null,
effective_network_time int not null,
effective_baseline_power text not null,
new_baseline_power text not null,
new_reward numeric not null,
new_reward_smoothed_position_estimate text not null,
new_reward_smoothed_velocity_estimate text not null,
total_mined_reward text not null
);
`); err != nil {
		// The Exec error is the one worth reporting; a rollback failure
		// here would add nothing actionable, so it is deliberately dropped.
		_ = tx.Rollback()
		return err
	}

	return tx.Commit()
}
// HandleRewardChanges collects reward actor snapshots for the given tipsets
// and null rounds via processRewardActors and then stores them with
// persistRewardActors.
//
// A failure while collecting is wrapped with additional context; a failure
// while persisting is returned as-is.
func (p *Processor) HandleRewardChanges(ctx context.Context, rewardTips ActorTips, nullRounds []types.TipSetKey) error {
	infos, err := p.processRewardActors(ctx, rewardTips, nullRounds)
	if err != nil {
		return xerrors.Errorf("Failed to process reward actors: %w", err)
	}

	return p.persistRewardActors(ctx, infos)
}
// processRewardActors builds one rewardActorInfo per entry in rewardTips and
// one per key in nullRounds.
//
// For each entry it fetches the reward actor at the relevant tipset, loads its
// state through an API-backed IPLD store, and copies the values out with set.
// For null rounds the common fields (tipset key, height, parent state root and
// parent key) are taken from the tipset returned by ChainGetTipSet.
//
// The first error aborts processing and is returned with a nil slice. Every
// failure is wrapped with the step that failed; errors produced by set already
// carry their own context and are passed through unchanged. The elapsed time
// is logged at debug level on return.
func (p *Processor) processRewardActors(ctx context.Context, rewardTips ActorTips, nullRounds []types.TipSetKey) ([]rewardActorInfo, error) {
	start := time.Now()
	defer func() {
		log.Debugw("Processed Reward Actors", "duration", time.Since(start).String())
	}()

	var out []rewardActorInfo
	for tipset, rewards := range rewardTips {
		for _, act := range rewards {
			var rw rewardActorInfo
			rw.common = act

			// Load the reward actor and its state as of this tipset. Note
			// that this happens for every entry, not once per tipset.
			rewardActor, err := p.node.StateGetActor(ctx, reward.Address, tipset)
			if err != nil {
				return nil, xerrors.Errorf("get reward state (@ %s): %w", rw.common.stateroot.String(), err)
			}

			rewardActorState, err := reward.Load(cw_util.NewAPIIpldStore(ctx, p.node), rewardActor)
			if err != nil {
				return nil, xerrors.Errorf("read state obj (@ %s): %w", rw.common.stateroot.String(), err)
			}

			if err := rw.set(rewardActorState); err != nil {
				return nil, err
			}

			out = append(out, rw)
		}
	}

	for _, tsKey := range nullRounds {
		var rw rewardActorInfo

		tipset, err := p.node.ChainGetTipSet(ctx, tsKey)
		if err != nil {
			// The state root is not known yet, so identify the failure by
			// the tipset key that was requested.
			return nil, xerrors.Errorf("get null round tipset (@ %s): %w", tsKey, err)
		}
		rw.common.tsKey = tipset.Key()
		rw.common.height = tipset.Height()
		rw.common.stateroot = tipset.ParentState()
		rw.common.parentTsKey = tipset.Parents()

		// Load the reward actor and its state as of the null round's tipset.
		rewardActor, err := p.node.StateGetActor(ctx, reward.Address, tsKey)
		if err != nil {
			return nil, xerrors.Errorf("get reward state (@ %s): %w", rw.common.stateroot.String(), err)
		}

		rewardActorState, err := reward.Load(cw_util.NewAPIIpldStore(ctx, p.node), rewardActor)
		if err != nil {
			return nil, xerrors.Errorf("read state obj (@ %s): %w", rw.common.stateroot.String(), err)
		}

		if err := rw.set(rewardActorState); err != nil {
			return nil, err
		}

		out = append(out, rw)
	}

	return out, nil
}
// persistRewardActors writes the given reward snapshots to chain_reward.
//
// Inside a single transaction, rows are first copied into a temporary table
// (cr, dropped on commit) and then inserted into chain_reward with
// "on conflict do nothing", so rows that conflict with existing ones are
// skipped.
//
// A failure to copy an individual row is logged and does not abort the loop;
// this best-effort behaviour is kept as in the original code. Any other
// failure is returned wrapped with the step that failed, and the transaction
// is rolled back so it is not left open. The elapsed time is logged at debug
// level on return. ctx is currently unused.
func (p *Processor) persistRewardActors(ctx context.Context, rewards []rewardActorInfo) error {
	start := time.Now()
	defer func() {
		log.Debugw("Persisted Reward Actors", "duration", time.Since(start).String())
	}()

	tx, err := p.db.Begin()
	if err != nil {
		return xerrors.Errorf("begin chain_reward tx: %w", err)
	}

	// Roll back on every exit path that did not commit successfully. The
	// rollback error is dropped on purpose: the error that caused the early
	// return is the one reported to the caller.
	committed := false
	defer func() {
		if !committed {
			_ = tx.Rollback()
		}
	}()

	if _, err := tx.Exec(`create temp table cr (like chain_reward excluding constraints) on commit drop`); err != nil {
		return xerrors.Errorf("prep chain_reward temp: %w", err)
	}

	stmt, err := tx.Prepare(`copy cr ( state_root, cum_sum_baseline, cum_sum_realized, effective_network_time, effective_baseline_power, new_baseline_power, new_reward, new_reward_smoothed_position_estimate, new_reward_smoothed_velocity_estimate, total_mined_reward) from STDIN`)
	if err != nil {
		return xerrors.Errorf("prepare tmp chain_reward: %w", err)
	}

	for _, rewardState := range rewards {
		// Argument order must match the column list of the copy statement.
		if _, err := stmt.Exec(
			rewardState.common.stateroot.String(),
			rewardState.cumSumBaselinePower.String(),
			rewardState.cumSumRealizedPower.String(),
			uint64(rewardState.effectiveNetworkTime),
			rewardState.effectiveBaselinePower.String(),
			rewardState.newBaselinePower.String(),
			rewardState.newBaseReward.String(),
			rewardState.newSmoothingEstimate.PositionEstimate.String(),
			rewardState.newSmoothingEstimate.VelocityEstimate.String(),
			rewardState.totalMinedReward.String(),
		); err != nil {
			log.Errorw("failed to store chain reward", "state_root", rewardState.common.stateroot, "error", err)
		}
	}

	if err := stmt.Close(); err != nil {
		return xerrors.Errorf("close prepared chain_reward: %w", err)
	}

	if _, err := tx.Exec(`insert into chain_reward select * from cr on conflict do nothing`); err != nil {
		return xerrors.Errorf("insert chain_reward from tmp: %w", err)
	}

	if err := tx.Commit(); err != nil {
		return xerrors.Errorf("commit chain_reward tx: %w", err)
	}
	committed = true

	return nil
}