feat(chainwatch): power actor smoothing estimate

This commit is contained in:
frrist 2020-08-19 13:34:58 -07:00
parent fecaeda382
commit 05f1a23c94
3 changed files with 149 additions and 2 deletions

View File

@ -0,0 +1,135 @@
package processor
import (
"bytes"
"context"
"time"
"golang.org/x/xerrors"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/power"
"github.com/filecoin-project/specs-actors/actors/util/smoothing"
)
type powerActorInfo struct {
common actorInfo
epochSmoothingEstimate *smoothing.FilterEstimate
}
func (p *Processor) setupPower() error {
tx, err := p.db.Begin()
if err != nil {
return err
}
if _, err := tx.Exec(`
create table if not exists power_smoothing_estimates
(
state_root text not null
constraint power_smoothing_estimates_pk
primary key,
position_estimate text not null,
velocity_estimate text not null
);
`); err != nil {
return err
}
return tx.Commit()
}
func (p *Processor) HandlePowerChanges(ctx context.Context, powerTips ActorTips) error {
powerChanges, err := p.processPowerActors(ctx, powerTips)
if err != nil {
return xerrors.Errorf("Failed to process power actors: %w", err)
}
if err := p.persistPowerActors(ctx, powerChanges); err != nil {
return err
}
return nil
}
func (p *Processor) processPowerActors(ctx context.Context, powerTips ActorTips) ([]powerActorInfo, error) {
start := time.Now()
defer func() {
log.Debugw("Processed Power Actors", "duration", time.Since(start).String())
}()
var out []powerActorInfo
for tipset, powers := range powerTips {
for _, act := range powers {
var pw powerActorInfo
pw.common = act
powerActor, err := p.node.StateGetActor(ctx, builtin.StoragePowerActorAddr, tipset)
if err != nil {
return nil, xerrors.Errorf("get power state (@ %s): %w", pw.common.stateroot.String(), err)
}
powerStateRaw, err := p.node.ChainReadObj(ctx, powerActor.Head)
if err != nil {
return nil, xerrors.Errorf("read state obj (@ %s): %w", pw.common.stateroot.String(), err)
}
var powerActorState power.State
if err := powerActorState.UnmarshalCBOR(bytes.NewReader(powerStateRaw)); err != nil {
return nil, xerrors.Errorf("unmarshal state (@ %s): %w", pw.common.stateroot.String(), err)
}
pw.epochSmoothingEstimate = powerActorState.ThisEpochQAPowerSmoothed
out = append(out, pw)
}
}
return out, nil
}
func (p *Processor) persistPowerActors(ctx context.Context, powers []powerActorInfo) error {
// NB: use errgroup when there is more than a single store operation
return p.storePowerSmoothingEstimates(powers)
}
func (p *Processor) storePowerSmoothingEstimates(powers []powerActorInfo) error {
tx, err := p.db.Begin()
if err != nil {
return xerrors.Errorf("begin power_smoothing_estimates tx: %w", err)
}
if _, err := tx.Exec(`create temp table rse (like power_smoothing_estimates) on commit drop`); err != nil {
return xerrors.Errorf("prep power_smoothing_estimates: %w", err)
}
stmt, err := tx.Prepare(`copy rse (state_root, position_estimate, velocity_estimate) from stdin;`)
if err != nil {
return xerrors.Errorf("prepare tmp power_smoothing_estimates: %w", err)
}
for _, powerState := range powers {
if _, err := stmt.Exec(
powerState.common.stateroot.String(),
powerState.epochSmoothingEstimate.PositionEstimate.String(),
powerState.epochSmoothingEstimate.VelocityEstimate.String(),
); err != nil {
return xerrors.Errorf("failed to store smoothing estimate: %w", err)
}
}
if err := stmt.Close(); err != nil {
return xerrors.Errorf("close prepared power_smoothing_estimates: %w", err)
}
if _, err := tx.Exec(`insert into power_smoothing_estimates select * from rse on conflict do nothing`); err != nil {
return xerrors.Errorf("insert power_smoothing_estimates from tmp: %w", err)
}
if err := tx.Commit(); err != nil {
return xerrors.Errorf("commit power_smoothing_estimates tx: %w", err)
}
return nil
}

View File

@ -88,6 +88,10 @@ func (p *Processor) setupSchemas() error {
return err
}
if err := p.setupPower(); err != nil {
return err
}
return nil
}
@ -166,6 +170,14 @@ func (p *Processor) Start(ctx context.Context) {
return nil
})
grp.Go(func() error {
if err := p.HandlePowerChanges(ctx, actorChanges[builtin.StoragePowerActorCodeID]); err != nil {
return xerrors.Errorf("Failed to handle power actor changes: %w", err)
}
log.Info("Processes Power Changes")
return nil
})
grp.Go(func() error {
if err := p.HandleMessageChanges(ctx, toProcess); err != nil {
return xerrors.Errorf("Failed to handle message changes: %w", err)

View File

@ -3,7 +3,6 @@ package processor
import (
"bytes"
"context"
"github.com/filecoin-project/specs-actors/actors/util/smoothing"
"time"
"golang.org/x/sync/errgroup"
@ -12,6 +11,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/reward"
"github.com/filecoin-project/specs-actors/actors/util/smoothing"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
@ -75,7 +75,7 @@ create table if not exists reward_smoothing_estimates
func (p *Processor) HandleRewardChanges(ctx context.Context, rewardTips ActorTips, nullRounds []types.TipSetKey) error {
rewardChanges, err := p.processRewardActors(ctx, rewardTips, nullRounds)
if err != nil {
log.Fatalw("Failed to process reward actors", "error", err)
return xerrors.Errorf("Failed to process reward actors: %w", err)
}
if err := p.persistRewardActors(ctx, rewardChanges); err != nil {