136 lines
3.7 KiB
Go
136 lines
3.7 KiB
Go
|
package processor
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"context"
|
||
|
"time"
|
||
|
|
||
|
"golang.org/x/xerrors"
|
||
|
|
||
|
"github.com/filecoin-project/specs-actors/actors/builtin"
|
||
|
"github.com/filecoin-project/specs-actors/actors/builtin/power"
|
||
|
"github.com/filecoin-project/specs-actors/actors/util/smoothing"
|
||
|
)
|
||
|
|
||
|
type powerActorInfo struct {
|
||
|
common actorInfo
|
||
|
|
||
|
epochSmoothingEstimate *smoothing.FilterEstimate
|
||
|
}
|
||
|
|
||
|
func (p *Processor) setupPower() error {
|
||
|
tx, err := p.db.Begin()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if _, err := tx.Exec(`
|
||
|
create table if not exists power_smoothing_estimates
|
||
|
(
|
||
|
state_root text not null
|
||
|
constraint power_smoothing_estimates_pk
|
||
|
primary key,
|
||
|
position_estimate text not null,
|
||
|
velocity_estimate text not null
|
||
|
);
|
||
|
`); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
return tx.Commit()
|
||
|
}
|
||
|
|
||
|
func (p *Processor) HandlePowerChanges(ctx context.Context, powerTips ActorTips) error {
|
||
|
powerChanges, err := p.processPowerActors(ctx, powerTips)
|
||
|
if err != nil {
|
||
|
return xerrors.Errorf("Failed to process power actors: %w", err)
|
||
|
}
|
||
|
|
||
|
if err := p.persistPowerActors(ctx, powerChanges); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (p *Processor) processPowerActors(ctx context.Context, powerTips ActorTips) ([]powerActorInfo, error) {
|
||
|
start := time.Now()
|
||
|
defer func() {
|
||
|
log.Debugw("Processed Power Actors", "duration", time.Since(start).String())
|
||
|
}()
|
||
|
|
||
|
var out []powerActorInfo
|
||
|
for tipset, powers := range powerTips {
|
||
|
for _, act := range powers {
|
||
|
var pw powerActorInfo
|
||
|
pw.common = act
|
||
|
|
||
|
powerActor, err := p.node.StateGetActor(ctx, builtin.StoragePowerActorAddr, tipset)
|
||
|
if err != nil {
|
||
|
return nil, xerrors.Errorf("get power state (@ %s): %w", pw.common.stateroot.String(), err)
|
||
|
}
|
||
|
|
||
|
powerStateRaw, err := p.node.ChainReadObj(ctx, powerActor.Head)
|
||
|
if err != nil {
|
||
|
return nil, xerrors.Errorf("read state obj (@ %s): %w", pw.common.stateroot.String(), err)
|
||
|
}
|
||
|
|
||
|
var powerActorState power.State
|
||
|
if err := powerActorState.UnmarshalCBOR(bytes.NewReader(powerStateRaw)); err != nil {
|
||
|
return nil, xerrors.Errorf("unmarshal state (@ %s): %w", pw.common.stateroot.String(), err)
|
||
|
}
|
||
|
|
||
|
pw.epochSmoothingEstimate = powerActorState.ThisEpochQAPowerSmoothed
|
||
|
out = append(out, pw)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return out, nil
|
||
|
}
|
||
|
|
||
|
func (p *Processor) persistPowerActors(ctx context.Context, powers []powerActorInfo) error {
|
||
|
// NB: use errgroup when there is more than a single store operation
|
||
|
return p.storePowerSmoothingEstimates(powers)
|
||
|
}
|
||
|
|
||
|
func (p *Processor) storePowerSmoothingEstimates(powers []powerActorInfo) error {
|
||
|
tx, err := p.db.Begin()
|
||
|
if err != nil {
|
||
|
return xerrors.Errorf("begin power_smoothing_estimates tx: %w", err)
|
||
|
}
|
||
|
|
||
|
if _, err := tx.Exec(`create temp table rse (like power_smoothing_estimates) on commit drop`); err != nil {
|
||
|
return xerrors.Errorf("prep power_smoothing_estimates: %w", err)
|
||
|
}
|
||
|
|
||
|
stmt, err := tx.Prepare(`copy rse (state_root, position_estimate, velocity_estimate) from stdin;`)
|
||
|
if err != nil {
|
||
|
return xerrors.Errorf("prepare tmp power_smoothing_estimates: %w", err)
|
||
|
}
|
||
|
|
||
|
for _, powerState := range powers {
|
||
|
if _, err := stmt.Exec(
|
||
|
powerState.common.stateroot.String(),
|
||
|
powerState.epochSmoothingEstimate.PositionEstimate.String(),
|
||
|
powerState.epochSmoothingEstimate.VelocityEstimate.String(),
|
||
|
); err != nil {
|
||
|
return xerrors.Errorf("failed to store smoothing estimate: %w", err)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if err := stmt.Close(); err != nil {
|
||
|
return xerrors.Errorf("close prepared power_smoothing_estimates: %w", err)
|
||
|
}
|
||
|
|
||
|
if _, err := tx.Exec(`insert into power_smoothing_estimates select * from rse on conflict do nothing`); err != nil {
|
||
|
return xerrors.Errorf("insert power_smoothing_estimates from tmp: %w", err)
|
||
|
}
|
||
|
|
||
|
if err := tx.Commit(); err != nil {
|
||
|
return xerrors.Errorf("commit power_smoothing_estimates tx: %w", err)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
|
||
|
}
|