From 3df85ca784de27eb3b6041bcaa061054a7f04c26 Mon Sep 17 00:00:00 2001 From: frrist Date: Wed, 19 Aug 2020 11:50:29 -0700 Subject: [PATCH 1/3] refactor(chainwatch): add new values to sector_info table --- cmd/lotus-chainwatch/processor/miner.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cmd/lotus-chainwatch/processor/miner.go b/cmd/lotus-chainwatch/processor/miner.go index 19fa3da95..6e4d40dec 100644 --- a/cmd/lotus-chainwatch/processor/miner.go +++ b/cmd/lotus-chainwatch/processor/miner.go @@ -88,6 +88,8 @@ create table if not exists sector_info verified_deal_weight text not null, initial_pledge text not null, + expected_day_reward text not null, + expected_storage_pledge text not null, constraint sector_info_pk primary key (miner_id, sector_id, sealed_cid) @@ -307,6 +309,7 @@ func (p *Processor) storeMinerPreCommitInfo(ctx context.Context, miners []minerA } stmt, err := tx.Prepare(`copy spi (miner_id, sector_id, sealed_cid, state_root, seal_rand_epoch, expiration_epoch, precommit_deposit, precommit_epoch, deal_weight, verified_deal_weight, is_replace_capacity, replace_sector_deadline, replace_sector_partition, replace_sector_number) from STDIN`) + if err != nil { return xerrors.Errorf("Failed to prepare miner precommit info statement: %w", err) } @@ -431,7 +434,7 @@ func (p *Processor) storeMinerSectorInfo(ctx context.Context, miners []minerActo return xerrors.Errorf("Failed to create temp table for sector_: %w", err) } - stmt, err := tx.Prepare(`copy si (miner_id, sector_id, sealed_cid, state_root, activation_epoch, expiration_epoch, deal_weight, verified_deal_weight, initial_pledge) from STDIN`) + stmt, err := tx.Prepare(`copy si (miner_id, sector_id, sealed_cid, state_root, activation_epoch, expiration_epoch, deal_weight, verified_deal_weight, initial_pledge, expected_day_reward, expected_storage_pledge) from STDIN`) if err != nil { return xerrors.Errorf("Failed to prepare miner sector info statement: %w", err) } @@ -463,6 +466,8 @@ func (p *Processor) storeMinerSectorInfo(ctx context.Context, miners []minerActo added.DealWeight.String(), added.VerifiedDealWeight.String(), added.InitialPledge.String(), + added.ExpectedDayReward.String(), + added.ExpectedStoragePledge.String(), ); err != nil { return err } From fecaeda382310f2625aec6ed529c4e705a7ab504 Mon Sep 17 00:00:00 2001 From: frrist Date: Wed, 19 Aug 2020 12:55:53 -0700 Subject: [PATCH 2/3] feat(chainwatch): reward actor smoothing estimate --- cmd/lotus-chainwatch/processor/reward.go | 60 ++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/cmd/lotus-chainwatch/processor/reward.go b/cmd/lotus-chainwatch/processor/reward.go index 3d5eaa7b4..5411cb68a 100644 --- a/cmd/lotus-chainwatch/processor/reward.go +++ b/cmd/lotus-chainwatch/processor/reward.go @@ -3,6 +3,7 @@ package processor import ( "bytes" "context" + "github.com/filecoin-project/specs-actors/actors/util/smoothing" "time" "golang.org/x/sync/errgroup" @@ -24,6 +25,8 @@ type rewardActorInfo struct { // base reward in attofil for each block found during this epoch baseBlockReward big.Int + + epochSmoothingEstimate *smoothing.FilterEstimate } func (p *Processor) setupRewards() error { @@ -53,6 +56,15 @@ create table if not exists chain_power primary key, baseline_power text not null ); + +create table if not exists reward_smoothing_estimates +( + state_root text not null + constraint reward_smoothing_estimates_pk + primary key, + position_estimate text not null, + velocity_estimate text not null +); `); err != nil { return err } @@ -103,6 +115,7 @@ func (p *Processor) processRewardActors(ctx context.Context, rewardTips ActorTip rw.baseBlockReward = rewardActorState.ThisEpochReward rw.baselinePower = rewardActorState.ThisEpochBaselinePower + rw.epochSmoothingEstimate = rewardActorState.ThisEpochRewardSmoothed out = append(out, rw) } } @@ -162,6 +175,13 @@ func (p *Processor) persistRewardActors(ctx context.Context, rewards []rewardAct return nil }) + grp.Go(func() error { + if err := p.storeRewardSmoothingEstimates(rewards); err != nil { + return err + } + return nil + }) + return grp.Wait() } @@ -243,3 +263,43 @@ func (p *Processor) storeBaseBlockReward(rewards []rewardActorInfo) error { return nil } + +func (p *Processor) storeRewardSmoothingEstimates(rewards []rewardActorInfo) error { + tx, err := p.db.Begin() + if err != nil { + return xerrors.Errorf("begin reward_smoothing_estimates tx: %w", err) + } + + if _, err := tx.Exec(`create temp table rse (like reward_smoothing_estimates) on commit drop`); err != nil { + return xerrors.Errorf("prep reward_smoothing_estimates: %w", err) + } + + stmt, err := tx.Prepare(`copy rse (state_root, position_estimate, velocity_estimate) from stdin;`) + if err != nil { + return xerrors.Errorf("prepare tmp reward_smoothing_estimates: %w", err) + } + + for _, rewardState := range rewards { + if _, err := stmt.Exec( + rewardState.common.stateroot.String(), + rewardState.epochSmoothingEstimate.PositionEstimate.String(), + rewardState.epochSmoothingEstimate.VelocityEstimate.String(), + ); err != nil { + return xerrors.Errorf("failed to store smoothing estimate: %w", err) + } + } + + if err := stmt.Close(); err != nil { + return xerrors.Errorf("close prepared reward_smoothing_estimates: %w", err) + } + + if _, err := tx.Exec(`insert into reward_smoothing_estimates select * from rse on conflict do nothing`); err != nil { + return xerrors.Errorf("insert reward_smoothing_estimates from tmp: %w", err) + } + + if err := tx.Commit(); err != nil { + return xerrors.Errorf("commit reward_smoothing_estimates tx: %w", err) + } + + return nil +} From 05f1a23c949783d1b0cb299ab8e1b8ed817a6739 Mon Sep 17 00:00:00 2001 From: frrist Date: Wed, 19 Aug 2020 13:34:58 -0700 Subject: [PATCH 3/3] feat(chainwatch): power actor smoothing estimate --- cmd/lotus-chainwatch/processor/power.go | 135 ++++++++++++++++++++ cmd/lotus-chainwatch/processor/processor.go | 12 ++ cmd/lotus-chainwatch/processor/reward.go | 4 +- 3 files changed, 149 insertions(+), 2 deletions(-) create mode 100644 cmd/lotus-chainwatch/processor/power.go diff --git a/cmd/lotus-chainwatch/processor/power.go b/cmd/lotus-chainwatch/processor/power.go new file mode 100644 index 000000000..daf17a884 --- /dev/null +++ b/cmd/lotus-chainwatch/processor/power.go @@ -0,0 +1,135 @@ +package processor + +import ( + "bytes" + "context" + "time" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/specs-actors/actors/builtin" + "github.com/filecoin-project/specs-actors/actors/builtin/power" + "github.com/filecoin-project/specs-actors/actors/util/smoothing" +) + +type powerActorInfo struct { + common actorInfo + + epochSmoothingEstimate *smoothing.FilterEstimate +} + +func (p *Processor) setupPower() error { + tx, err := p.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(` +create table if not exists power_smoothing_estimates +( + state_root text not null + constraint power_smoothing_estimates_pk + primary key, + position_estimate text not null, + velocity_estimate text not null +); +`); err != nil { + return err + } + + return tx.Commit() +} + +func (p *Processor) HandlePowerChanges(ctx context.Context, powerTips ActorTips) error { + powerChanges, err := p.processPowerActors(ctx, powerTips) + if err != nil { + return xerrors.Errorf("Failed to process power actors: %w", err) + } + + if err := p.persistPowerActors(ctx, powerChanges); err != nil { + return err + } + + return nil +} + +func (p *Processor) processPowerActors(ctx context.Context, powerTips ActorTips) ([]powerActorInfo, error) { + start := time.Now() + defer func() { + log.Debugw("Processed Power Actors", "duration", time.Since(start).String()) + }() + + var out []powerActorInfo + for tipset, powers := range powerTips { + for _, act := range powers { + var pw powerActorInfo + pw.common = act + + powerActor, err := p.node.StateGetActor(ctx, builtin.StoragePowerActorAddr, tipset) + if err != nil { + return nil, xerrors.Errorf("get power state (@ %s): %w", pw.common.stateroot.String(), err) + } + + powerStateRaw, err := p.node.ChainReadObj(ctx, powerActor.Head) + if err != nil { + return nil, xerrors.Errorf("read state obj (@ %s): %w", pw.common.stateroot.String(), err) + } + + var powerActorState power.State + if err := powerActorState.UnmarshalCBOR(bytes.NewReader(powerStateRaw)); err != nil { + return nil, xerrors.Errorf("unmarshal state (@ %s): %w", pw.common.stateroot.String(), err) + } + + pw.epochSmoothingEstimate = powerActorState.ThisEpochQAPowerSmoothed + out = append(out, pw) + } + } + + return out, nil +} + +func (p *Processor) persistPowerActors(ctx context.Context, powers []powerActorInfo) error { + // NB: use errgroup when there is more than a single store operation + return p.storePowerSmoothingEstimates(powers) +} + +func (p *Processor) storePowerSmoothingEstimates(powers []powerActorInfo) error { + tx, err := p.db.Begin() + if err != nil { + return xerrors.Errorf("begin power_smoothing_estimates tx: %w", err) + } + + if _, err := tx.Exec(`create temp table rse (like power_smoothing_estimates) on commit drop`); err != nil { + return xerrors.Errorf("prep power_smoothing_estimates: %w", err) + } + + stmt, err := tx.Prepare(`copy rse (state_root, position_estimate, velocity_estimate) from stdin;`) + if err != nil { + return xerrors.Errorf("prepare tmp power_smoothing_estimates: %w", err) + } + + for _, powerState := range powers { + if _, err := stmt.Exec( + powerState.common.stateroot.String(), + powerState.epochSmoothingEstimate.PositionEstimate.String(), + powerState.epochSmoothingEstimate.VelocityEstimate.String(), + ); err != nil { + return xerrors.Errorf("failed to store smoothing estimate: %w", err) + } + } + + if err := stmt.Close(); err != nil { + return xerrors.Errorf("close prepared power_smoothing_estimates: %w", err) + } + + if _, err := tx.Exec(`insert into power_smoothing_estimates select * from rse on conflict do nothing`); err != nil { + return xerrors.Errorf("insert power_smoothing_estimates from tmp: %w", err) + } + + if err := tx.Commit(); err != nil { + return xerrors.Errorf("commit power_smoothing_estimates tx: %w", err) + } + + return nil + +} diff --git a/cmd/lotus-chainwatch/processor/processor.go b/cmd/lotus-chainwatch/processor/processor.go index 417cdd03b..2f70f1cb3 100644 --- a/cmd/lotus-chainwatch/processor/processor.go +++ b/cmd/lotus-chainwatch/processor/processor.go @@ -88,6 +88,10 @@ func (p *Processor) setupSchemas() error { return err } + if err := p.setupPower(); err != nil { + return err + } + return nil } @@ -166,6 +170,14 @@ func (p *Processor) Start(ctx context.Context) { return nil }) + grp.Go(func() error { + if err := p.HandlePowerChanges(ctx, actorChanges[builtin.StoragePowerActorCodeID]); err != nil { + return xerrors.Errorf("Failed to handle power actor changes: %w", err) + } + log.Info("Processes Power Changes") + return nil + }) + grp.Go(func() error { if err := p.HandleMessageChanges(ctx, toProcess); err != nil { return xerrors.Errorf("Failed to handle message changes: %w", err) diff --git a/cmd/lotus-chainwatch/processor/reward.go b/cmd/lotus-chainwatch/processor/reward.go index 5411cb68a..84aa1d990 100644 --- a/cmd/lotus-chainwatch/processor/reward.go +++ b/cmd/lotus-chainwatch/processor/reward.go @@ -3,7 +3,6 @@ package processor import ( "bytes" "context" - "github.com/filecoin-project/specs-actors/actors/util/smoothing" "time" "golang.org/x/sync/errgroup" @@ -12,6 +11,7 @@ import ( "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin/reward" + "github.com/filecoin-project/specs-actors/actors/util/smoothing" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" @@ -75,7 +75,7 @@ create table if not exists reward_smoothing_estimates func (p *Processor) HandleRewardChanges(ctx context.Context, rewardTips ActorTips, nullRounds []types.TipSetKey) error { rewardChanges, err := p.processRewardActors(ctx, rewardTips, nullRounds) if err != nil { - log.Fatalw("Failed to process reward actors", "error", err) + return xerrors.Errorf("Failed to process reward actors: %w", err) } if err := p.persistRewardActors(ctx, rewardChanges); err != nil {