package processor import ( "context" "time" "golang.org/x/xerrors" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/lotus/chain/actors/builtin" ) type powerActorInfo struct { common actorInfo totalRawBytes big.Int totalRawBytesCommitted big.Int totalQualityAdjustedBytes big.Int totalQualityAdjustedBytesCommitted big.Int totalPledgeCollateral big.Int qaPowerSmoothed builtin.FilterEstimate minerCount int64 minerCountAboveMinimumPower int64 } func (p *Processor) setupPower() error { tx, err := p.db.Begin() if err != nil { return err } if _, err := tx.Exec(` create table if not exists chain_power ( state_root text not null constraint power_smoothing_estimates_pk primary key, total_raw_bytes_power text not null, total_raw_bytes_committed text not null, total_qa_bytes_power text not null, total_qa_bytes_committed text not null, total_pledge_collateral text not null, qa_smoothed_position_estimate text not null, qa_smoothed_velocity_estimate text not null, miner_count int not null, minimum_consensus_miner_count int not null ); `); err != nil { return err } return tx.Commit() } func (p *Processor) HandlePowerChanges(ctx context.Context, powerTips ActorTips) error { powerChanges, err := p.processPowerActors(ctx, powerTips) if err != nil { return xerrors.Errorf("Failed to process power actors: %w", err) } if err := p.persistPowerActors(ctx, powerChanges); err != nil { return err } return nil } func (p *Processor) processPowerActors(ctx context.Context, powerTips ActorTips) ([]powerActorInfo, error) { start := time.Now() defer func() { log.Debugw("Processed Power Actors", "duration", time.Since(start).String()) }() var out []powerActorInfo for tipset, powerStates := range powerTips { for _, act := range powerStates { var pw powerActorInfo pw.common = act powerActorState, err := getPowerActorState(ctx, p.node, tipset) if err != nil { return nil, xerrors.Errorf("get power state (@ %s): %w", pw.common.stateroot.String(), err) } totalPower, err := powerActorState.TotalPower() if err != nil { return nil, xerrors.Errorf("failed to compute total power: %w", err) } totalCommitted, err := powerActorState.TotalCommitted() if err != nil { return nil, xerrors.Errorf("failed to compute total committed: %w", err) } totalLocked, err := powerActorState.TotalLocked() if err != nil { return nil, xerrors.Errorf("failed to compute total locked: %w", err) } powerSmoothed, err := powerActorState.TotalPowerSmoothed() if err != nil { return nil, xerrors.Errorf("failed to determine smoothed power: %w", err) } // NOTE: this doesn't set new* fields. Previously, we // filled these using ThisEpoch* fields from the actor // state, but these fields are effectively internal // state and don't represent "new" power, as was // assumed. participatingMiners, totalMiners, err := powerActorState.MinerCounts() if err != nil { return nil, xerrors.Errorf("failed to count miners: %w", err) } pw.totalRawBytes = totalPower.RawBytePower pw.totalQualityAdjustedBytes = totalPower.QualityAdjPower pw.totalRawBytesCommitted = totalCommitted.RawBytePower pw.totalQualityAdjustedBytesCommitted = totalCommitted.QualityAdjPower pw.totalPledgeCollateral = totalLocked pw.qaPowerSmoothed = powerSmoothed pw.minerCountAboveMinimumPower = int64(participatingMiners) pw.minerCount = int64(totalMiners) } } return out, nil } func (p *Processor) persistPowerActors(ctx context.Context, powerStates []powerActorInfo) error { // NB: use errgroup when there is more than a single store operation return p.storePowerSmoothingEstimates(powerStates) } func (p *Processor) storePowerSmoothingEstimates(powerStates []powerActorInfo) error { tx, err := p.db.Begin() if err != nil { return xerrors.Errorf("begin chain_power tx: %w", err) } if _, err := tx.Exec(`create temp table cp (like chain_power) on commit drop`); err != nil { return xerrors.Errorf("prep chain_power: %w", err) } stmt, err := tx.Prepare(`copy cp (state_root, total_raw_bytes_power, total_raw_bytes_committed, total_qa_bytes_power, total_qa_bytes_committed, total_pledge_collateral, qa_smoothed_position_estimate, qa_smoothed_velocity_estimate, miner_count, minimum_consensus_miner_count) from stdin;`) if err != nil { return xerrors.Errorf("prepare tmp chain_power: %w", err) } for _, ps := range powerStates { if _, err := stmt.Exec( ps.common.stateroot.String(), ps.totalRawBytes.String(), ps.totalRawBytesCommitted.String(), ps.totalQualityAdjustedBytes.String(), ps.totalQualityAdjustedBytesCommitted.String(), ps.totalPledgeCollateral.String(), ps.qaPowerSmoothed.PositionEstimate.String(), ps.qaPowerSmoothed.VelocityEstimate.String(), ps.minerCount, ps.minerCountAboveMinimumPower, ); err != nil { return xerrors.Errorf("failed to store smoothing estimate: %w", err) } } if err := stmt.Close(); err != nil { return xerrors.Errorf("close prepared chain_power: %w", err) } if _, err := tx.Exec(`insert into chain_power select * from cp on conflict do nothing`); err != nil { return xerrors.Errorf("insert chain_power from tmp: %w", err) } if err := tx.Commit(); err != nil { return xerrors.Errorf("commit chain_power tx: %w", err) } return nil }