Merge pull request #3170 from filecoin-project/frrist/chainwatch-SectorOnChainInfo-ext
refactor(cw): add new values to sector_info table and smoothing estimate
This commit is contained in:
commit
f943ea7d5d
@ -88,6 +88,8 @@ create table if not exists sector_info
|
|||||||
verified_deal_weight text not null,
|
verified_deal_weight text not null,
|
||||||
|
|
||||||
initial_pledge text not null,
|
initial_pledge text not null,
|
||||||
|
expected_day_reward text not null,
|
||||||
|
expected_storage_pledge text not null,
|
||||||
|
|
||||||
constraint sector_info_pk
|
constraint sector_info_pk
|
||||||
primary key (miner_id, sector_id, sealed_cid)
|
primary key (miner_id, sector_id, sealed_cid)
|
||||||
@ -307,6 +309,7 @@ func (p *Processor) storeMinerPreCommitInfo(ctx context.Context, miners []minerA
|
|||||||
}
|
}
|
||||||
|
|
||||||
stmt, err := tx.Prepare(`copy spi (miner_id, sector_id, sealed_cid, state_root, seal_rand_epoch, expiration_epoch, precommit_deposit, precommit_epoch, deal_weight, verified_deal_weight, is_replace_capacity, replace_sector_deadline, replace_sector_partition, replace_sector_number) from STDIN`)
|
stmt, err := tx.Prepare(`copy spi (miner_id, sector_id, sealed_cid, state_root, seal_rand_epoch, expiration_epoch, precommit_deposit, precommit_epoch, deal_weight, verified_deal_weight, is_replace_capacity, replace_sector_deadline, replace_sector_partition, replace_sector_number) from STDIN`)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("Failed to prepare miner precommit info statement: %w", err)
|
return xerrors.Errorf("Failed to prepare miner precommit info statement: %w", err)
|
||||||
}
|
}
|
||||||
@ -431,7 +434,7 @@ func (p *Processor) storeMinerSectorInfo(ctx context.Context, miners []minerActo
|
|||||||
return xerrors.Errorf("Failed to create temp table for sector_: %w", err)
|
return xerrors.Errorf("Failed to create temp table for sector_: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
stmt, err := tx.Prepare(`copy si (miner_id, sector_id, sealed_cid, state_root, activation_epoch, expiration_epoch, deal_weight, verified_deal_weight, initial_pledge) from STDIN`)
|
stmt, err := tx.Prepare(`copy si (miner_id, sector_id, sealed_cid, state_root, activation_epoch, expiration_epoch, deal_weight, verified_deal_weight, initial_pledge, expected_day_reward, expected_storage_pledge) from STDIN`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("Failed to prepare miner sector info statement: %w", err)
|
return xerrors.Errorf("Failed to prepare miner sector info statement: %w", err)
|
||||||
}
|
}
|
||||||
@ -463,6 +466,8 @@ func (p *Processor) storeMinerSectorInfo(ctx context.Context, miners []minerActo
|
|||||||
added.DealWeight.String(),
|
added.DealWeight.String(),
|
||||||
added.VerifiedDealWeight.String(),
|
added.VerifiedDealWeight.String(),
|
||||||
added.InitialPledge.String(),
|
added.InitialPledge.String(),
|
||||||
|
added.ExpectedDayReward.String(),
|
||||||
|
added.ExpectedStoragePledge.String(),
|
||||||
); err != nil {
|
); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
135
cmd/lotus-chainwatch/processor/power.go
Normal file
135
cmd/lotus-chainwatch/processor/power.go
Normal file
@ -0,0 +1,135 @@
|
|||||||
|
package processor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/specs-actors/actors/builtin"
|
||||||
|
"github.com/filecoin-project/specs-actors/actors/builtin/power"
|
||||||
|
"github.com/filecoin-project/specs-actors/actors/util/smoothing"
|
||||||
|
)
|
||||||
|
|
||||||
|
type powerActorInfo struct {
|
||||||
|
common actorInfo
|
||||||
|
|
||||||
|
epochSmoothingEstimate *smoothing.FilterEstimate
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Processor) setupPower() error {
|
||||||
|
tx, err := p.db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := tx.Exec(`
|
||||||
|
create table if not exists power_smoothing_estimates
|
||||||
|
(
|
||||||
|
state_root text not null
|
||||||
|
constraint power_smoothing_estimates_pk
|
||||||
|
primary key,
|
||||||
|
position_estimate text not null,
|
||||||
|
velocity_estimate text not null
|
||||||
|
);
|
||||||
|
`); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return tx.Commit()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Processor) HandlePowerChanges(ctx context.Context, powerTips ActorTips) error {
|
||||||
|
powerChanges, err := p.processPowerActors(ctx, powerTips)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("Failed to process power actors: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := p.persistPowerActors(ctx, powerChanges); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Processor) processPowerActors(ctx context.Context, powerTips ActorTips) ([]powerActorInfo, error) {
|
||||||
|
start := time.Now()
|
||||||
|
defer func() {
|
||||||
|
log.Debugw("Processed Power Actors", "duration", time.Since(start).String())
|
||||||
|
}()
|
||||||
|
|
||||||
|
var out []powerActorInfo
|
||||||
|
for tipset, powers := range powerTips {
|
||||||
|
for _, act := range powers {
|
||||||
|
var pw powerActorInfo
|
||||||
|
pw.common = act
|
||||||
|
|
||||||
|
powerActor, err := p.node.StateGetActor(ctx, builtin.StoragePowerActorAddr, tipset)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("get power state (@ %s): %w", pw.common.stateroot.String(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
powerStateRaw, err := p.node.ChainReadObj(ctx, powerActor.Head)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("read state obj (@ %s): %w", pw.common.stateroot.String(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var powerActorState power.State
|
||||||
|
if err := powerActorState.UnmarshalCBOR(bytes.NewReader(powerStateRaw)); err != nil {
|
||||||
|
return nil, xerrors.Errorf("unmarshal state (@ %s): %w", pw.common.stateroot.String(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
pw.epochSmoothingEstimate = powerActorState.ThisEpochQAPowerSmoothed
|
||||||
|
out = append(out, pw)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Processor) persistPowerActors(ctx context.Context, powers []powerActorInfo) error {
|
||||||
|
// NB: use errgroup when there is more than a single store operation
|
||||||
|
return p.storePowerSmoothingEstimates(powers)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Processor) storePowerSmoothingEstimates(powers []powerActorInfo) error {
|
||||||
|
tx, err := p.db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("begin power_smoothing_estimates tx: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := tx.Exec(`create temp table rse (like power_smoothing_estimates) on commit drop`); err != nil {
|
||||||
|
return xerrors.Errorf("prep power_smoothing_estimates: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
stmt, err := tx.Prepare(`copy rse (state_root, position_estimate, velocity_estimate) from stdin;`)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("prepare tmp power_smoothing_estimates: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, powerState := range powers {
|
||||||
|
if _, err := stmt.Exec(
|
||||||
|
powerState.common.stateroot.String(),
|
||||||
|
powerState.epochSmoothingEstimate.PositionEstimate.String(),
|
||||||
|
powerState.epochSmoothingEstimate.VelocityEstimate.String(),
|
||||||
|
); err != nil {
|
||||||
|
return xerrors.Errorf("failed to store smoothing estimate: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := stmt.Close(); err != nil {
|
||||||
|
return xerrors.Errorf("close prepared power_smoothing_estimates: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := tx.Exec(`insert into power_smoothing_estimates select * from rse on conflict do nothing`); err != nil {
|
||||||
|
return xerrors.Errorf("insert power_smoothing_estimates from tmp: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := tx.Commit(); err != nil {
|
||||||
|
return xerrors.Errorf("commit power_smoothing_estimates tx: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
|
||||||
|
}
|
@ -88,6 +88,10 @@ func (p *Processor) setupSchemas() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := p.setupPower(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -166,6 +170,14 @@ func (p *Processor) Start(ctx context.Context) {
|
|||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
grp.Go(func() error {
|
||||||
|
if err := p.HandlePowerChanges(ctx, actorChanges[builtin.StoragePowerActorCodeID]); err != nil {
|
||||||
|
return xerrors.Errorf("Failed to handle power actor changes: %w", err)
|
||||||
|
}
|
||||||
|
log.Info("Processes Power Changes")
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
grp.Go(func() error {
|
grp.Go(func() error {
|
||||||
if err := p.HandleMessageChanges(ctx, toProcess); err != nil {
|
if err := p.HandleMessageChanges(ctx, toProcess); err != nil {
|
||||||
return xerrors.Errorf("Failed to handle message changes: %w", err)
|
return xerrors.Errorf("Failed to handle message changes: %w", err)
|
||||||
|
@ -11,6 +11,7 @@ import (
|
|||||||
"github.com/filecoin-project/specs-actors/actors/abi/big"
|
"github.com/filecoin-project/specs-actors/actors/abi/big"
|
||||||
"github.com/filecoin-project/specs-actors/actors/builtin"
|
"github.com/filecoin-project/specs-actors/actors/builtin"
|
||||||
"github.com/filecoin-project/specs-actors/actors/builtin/reward"
|
"github.com/filecoin-project/specs-actors/actors/builtin/reward"
|
||||||
|
"github.com/filecoin-project/specs-actors/actors/util/smoothing"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
@ -24,6 +25,8 @@ type rewardActorInfo struct {
|
|||||||
|
|
||||||
// base reward in attofil for each block found during this epoch
|
// base reward in attofil for each block found during this epoch
|
||||||
baseBlockReward big.Int
|
baseBlockReward big.Int
|
||||||
|
|
||||||
|
epochSmoothingEstimate *smoothing.FilterEstimate
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Processor) setupRewards() error {
|
func (p *Processor) setupRewards() error {
|
||||||
@ -53,6 +56,15 @@ create table if not exists chain_power
|
|||||||
primary key,
|
primary key,
|
||||||
baseline_power text not null
|
baseline_power text not null
|
||||||
);
|
);
|
||||||
|
|
||||||
|
create table if not exists reward_smoothing_estimates
|
||||||
|
(
|
||||||
|
state_root text not null
|
||||||
|
constraint reward_smoothing_estimates_pk
|
||||||
|
primary key,
|
||||||
|
position_estimate text not null,
|
||||||
|
velocity_estimate text not null
|
||||||
|
);
|
||||||
`); err != nil {
|
`); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -63,7 +75,7 @@ create table if not exists chain_power
|
|||||||
func (p *Processor) HandleRewardChanges(ctx context.Context, rewardTips ActorTips, nullRounds []types.TipSetKey) error {
|
func (p *Processor) HandleRewardChanges(ctx context.Context, rewardTips ActorTips, nullRounds []types.TipSetKey) error {
|
||||||
rewardChanges, err := p.processRewardActors(ctx, rewardTips, nullRounds)
|
rewardChanges, err := p.processRewardActors(ctx, rewardTips, nullRounds)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalw("Failed to process reward actors", "error", err)
|
return xerrors.Errorf("Failed to process reward actors: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := p.persistRewardActors(ctx, rewardChanges); err != nil {
|
if err := p.persistRewardActors(ctx, rewardChanges); err != nil {
|
||||||
@ -103,6 +115,7 @@ func (p *Processor) processRewardActors(ctx context.Context, rewardTips ActorTip
|
|||||||
|
|
||||||
rw.baseBlockReward = rewardActorState.ThisEpochReward
|
rw.baseBlockReward = rewardActorState.ThisEpochReward
|
||||||
rw.baselinePower = rewardActorState.ThisEpochBaselinePower
|
rw.baselinePower = rewardActorState.ThisEpochBaselinePower
|
||||||
|
rw.epochSmoothingEstimate = rewardActorState.ThisEpochRewardSmoothed
|
||||||
out = append(out, rw)
|
out = append(out, rw)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -162,6 +175,13 @@ func (p *Processor) persistRewardActors(ctx context.Context, rewards []rewardAct
|
|||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
grp.Go(func() error {
|
||||||
|
if err := p.storeRewardSmoothingEstimates(rewards); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
return grp.Wait()
|
return grp.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -243,3 +263,43 @@ func (p *Processor) storeBaseBlockReward(rewards []rewardActorInfo) error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *Processor) storeRewardSmoothingEstimates(rewards []rewardActorInfo) error {
|
||||||
|
tx, err := p.db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("begin reward_smoothing_estimates tx: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := tx.Exec(`create temp table rse (like reward_smoothing_estimates) on commit drop`); err != nil {
|
||||||
|
return xerrors.Errorf("prep reward_smoothing_estimates: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
stmt, err := tx.Prepare(`copy rse (state_root, position_estimate, velocity_estimate) from stdin;`)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("prepare tmp reward_smoothing_estimates: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, rewardState := range rewards {
|
||||||
|
if _, err := stmt.Exec(
|
||||||
|
rewardState.common.stateroot.String(),
|
||||||
|
rewardState.epochSmoothingEstimate.PositionEstimate.String(),
|
||||||
|
rewardState.epochSmoothingEstimate.VelocityEstimate.String(),
|
||||||
|
); err != nil {
|
||||||
|
return xerrors.Errorf("failed to store smoothing estimate: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := stmt.Close(); err != nil {
|
||||||
|
return xerrors.Errorf("close prepared reward_smoothing_estimates: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := tx.Exec(`insert into reward_smoothing_estimates select * from rse on conflict do nothing`); err != nil {
|
||||||
|
return xerrors.Errorf("insert reward_smoothing_estimates from tmp: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := tx.Commit(); err != nil {
|
||||||
|
return xerrors.Errorf("commit reward_smoothing_estimates tx: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user