diff --git a/cmd/lotus-chainwatch/processor/reward.go b/cmd/lotus-chainwatch/processor/reward.go
index 3d5eaa7b4..5411cb68a 100644
--- a/cmd/lotus-chainwatch/processor/reward.go
+++ b/cmd/lotus-chainwatch/processor/reward.go
@@ -3,6 +3,7 @@ package processor
 import (
 	"bytes"
 	"context"
+	"github.com/filecoin-project/specs-actors/actors/util/smoothing"
 	"time"
 
 	"golang.org/x/sync/errgroup"
@@ -24,6 +25,8 @@ type rewardActorInfo struct {
 
 	// base reward in attofil for each block found during this epoch
 	baseBlockReward big.Int
+
+	epochSmoothingEstimate *smoothing.FilterEstimate
 }
 
 func (p *Processor) setupRewards() error {
@@ -53,6 +56,15 @@ create table if not exists chain_power
 		primary key,
 	baseline_power text not null
 );
+
+create table if not exists reward_smoothing_estimates
+(
+	state_root text not null
+		constraint reward_smoothing_estimates_pk
+			primary key,
+	position_estimate text not null,
+	velocity_estimate text not null
+);
 `); err != nil {
 		return err
 	}
@@ -103,6 +115,7 @@ func (p *Processor) processRewardActors(ctx context.Context, rewardTips ActorTip
 
 		rw.baseBlockReward = rewardActorState.ThisEpochReward
 		rw.baselinePower = rewardActorState.ThisEpochBaselinePower
+		rw.epochSmoothingEstimate = rewardActorState.ThisEpochRewardSmoothed
 		out = append(out, rw)
 	}
 }
@@ -162,6 +175,13 @@ func (p *Processor) persistRewardActors(ctx context.Context, rewards []rewardAct
 		return nil
 	})
 
+	grp.Go(func() error {
+		if err := p.storeRewardSmoothingEstimates(rewards); err != nil {
+			return err
+		}
+		return nil
+	})
+
 	return grp.Wait()
 }
 
@@ -243,3 +263,43 @@ func (p *Processor) storeBaseBlockReward(rewards []rewardActorInfo) error {
 
 	return nil
 }
+
+func (p *Processor) storeRewardSmoothingEstimates(rewards []rewardActorInfo) error {
+	tx, err := p.db.Begin()
+	if err != nil {
+		return xerrors.Errorf("begin reward_smoothing_estimates tx: %w", err)
+	}
+
+	if _, err := tx.Exec(`create temp table rse (like reward_smoothing_estimates) on commit drop`); err != nil {
+		return xerrors.Errorf("prep reward_smoothing_estimates: %w", err)
+	}
+
+	stmt, err := tx.Prepare(`copy rse (state_root, position_estimate, velocity_estimate) from stdin;`)
+	if err != nil {
+		return xerrors.Errorf("prepare tmp reward_smoothing_estimates: %w", err)
+	}
+
+	for _, rewardState := range rewards {
+		if _, err := stmt.Exec(
+			rewardState.common.stateroot.String(),
+			rewardState.epochSmoothingEstimate.PositionEstimate.String(),
+			rewardState.epochSmoothingEstimate.VelocityEstimate.String(),
+		); err != nil {
+			return xerrors.Errorf("failed to store smoothing estimate: %w", err)
+		}
+	}
+
+	if err := stmt.Close(); err != nil {
+		return xerrors.Errorf("close prepared reward_smoothing_estimates: %w", err)
+	}
+
+	if _, err := tx.Exec(`insert into reward_smoothing_estimates select * from rse on conflict do nothing`); err != nil {
+		return xerrors.Errorf("insert reward_smoothing_estimates from tmp: %w", err)
+	}
+
+	if err := tx.Commit(); err != nil {
+		return xerrors.Errorf("commit reward_smoothing_estimates tx: %w", err)
+	}
+
+	return nil
+}
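
The sketch below is not part of the change; it is a minimal, hypothetical example of how the estimates persisted by storeRewardSmoothingEstimates could be read back from the new reward_smoothing_estimates table. The package name, function name, and *sql.DB handle are illustrative assumptions; only the table and column names come from the schema added above.

// Package rewardqueries is a hypothetical helper package, shown only to
// illustrate reading back the values written by storeRewardSmoothingEstimates.
package rewardqueries

import (
	"database/sql"
	"fmt"
	"math/big"
)

// readSmoothingEstimate fetches the position and velocity estimates recorded
// for a given state root. storeRewardSmoothingEstimates stores both values as
// decimal strings, so they are parsed back into big.Ints here.
func readSmoothingEstimate(db *sql.DB, stateRoot string) (*big.Int, *big.Int, error) {
	var posStr, velStr string
	err := db.QueryRow(
		`select position_estimate, velocity_estimate
		   from reward_smoothing_estimates
		  where state_root = $1`,
		stateRoot,
	).Scan(&posStr, &velStr)
	if err != nil {
		return nil, nil, fmt.Errorf("query reward_smoothing_estimates: %w", err)
	}

	pos, ok := new(big.Int).SetString(posStr, 10)
	if !ok {
		return nil, nil, fmt.Errorf("parse position_estimate %q", posStr)
	}
	vel, ok := new(big.Int).SetString(velStr, 10)
	if !ok {
		return nil, nil, fmt.Errorf("parse velocity_estimate %q", velStr)
	}
	return pos, vel, nil
}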