Merge pull request #3275 from filecoin-project/fix/chainwatch/sync

fix(chainwatch): use height to determine unsynced blocks and fix deadlock in sector deal table
This commit is contained in:
Łukasz Magiera 2020-08-28 12:16:00 +02:00 committed by GitHub
commit 932ab61c2d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 245 additions and 304 deletions

View File

@ -96,12 +96,6 @@ func (p *Processor) HandleMarketChanges(ctx context.Context, marketTips ActorTip
log.Fatalw("Failed to persist market actors", "error", err) log.Fatalw("Failed to persist market actors", "error", err)
} }
// we persist the dealID <--> minerID,sectorID here since the dealID needs to be stored above first
if err := p.storePreCommitDealInfo(p.sectorDealEvents); err != nil {
close(p.sectorDealEvents)
return err
}
if err := p.updateMarket(ctx, marketChanges); err != nil { if err := p.updateMarket(ctx, marketChanges); err != nil {
log.Fatalw("Failed to update market actors", "error", err) log.Fatalw("Failed to update market actors", "error", err)
} }
@ -272,48 +266,6 @@ func (p *Processor) storeMarketActorDealProposals(ctx context.Context, marketTip
} }
func (p *Processor) storePreCommitDealInfo(dealEvents <-chan *SectorDealEvent) error {
tx, err := p.db.Begin()
if err != nil {
return err
}
if _, err := tx.Exec(`create temp table mds (like minerid_dealid_sectorid excluding constraints) on commit drop;`); err != nil {
return xerrors.Errorf("Failed to create temp table for minerid_dealid_sectorid: %w", err)
}
stmt, err := tx.Prepare(`copy mds (deal_id, miner_id, sector_id) from STDIN`)
if err != nil {
return xerrors.Errorf("Failed to prepare minerid_dealid_sectorid statement: %w", err)
}
for sde := range dealEvents {
for _, did := range sde.DealIDs {
if _, err := stmt.Exec(
uint64(did),
sde.MinerID.String(),
sde.SectorID,
); err != nil {
return err
}
}
}
if err := stmt.Close(); err != nil {
return xerrors.Errorf("Failed to close miner sector deals statement: %w", err)
}
if _, err := tx.Exec(`insert into minerid_dealid_sectorid select * from mds on conflict do nothing`); err != nil {
return xerrors.Errorf("Failed to insert into miner deal sector table: %w", err)
}
if err := tx.Commit(); err != nil {
return xerrors.Errorf("Failed to commit miner deal sector table: %w", err)
}
return nil
}
func (p *Processor) updateMarketActorDealProposals(ctx context.Context, marketTip []marketActorInfo) error { func (p *Processor) updateMarketActorDealProposals(ctx context.Context, marketTip []marketActorInfo) error {
start := time.Now() start := time.Now()
defer func() { defer func() {

View File

@ -3,7 +3,6 @@ package processor
import ( import (
"context" "context"
"sync" "sync"
"time"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -120,10 +119,6 @@ func (p *Processor) persistMessagesAndReceipts(ctx context.Context, blocks map[c
} }
func (p *Processor) storeReceipts(recs map[mrec]*types.MessageReceipt) error { func (p *Processor) storeReceipts(recs map[mrec]*types.MessageReceipt) error {
start := time.Now()
defer func() {
log.Debugw("Persisted Receipts", "duration", time.Since(start).String())
}()
tx, err := p.db.Begin() tx, err := p.db.Begin()
if err != nil { if err != nil {
return err return err
@ -164,10 +159,6 @@ create temp table recs (like receipts excluding constraints) on commit drop;
} }
func (p *Processor) storeMsgInclusions(incls map[cid.Cid][]cid.Cid) error { func (p *Processor) storeMsgInclusions(incls map[cid.Cid][]cid.Cid) error {
start := time.Now()
defer func() {
log.Debugw("Persisted Message Inclusions", "duration", time.Since(start).String())
}()
tx, err := p.db.Begin() tx, err := p.db.Begin()
if err != nil { if err != nil {
return err return err
@ -206,10 +197,6 @@ create temp table mi (like block_messages excluding constraints) on commit drop;
} }
func (p *Processor) storeMessages(msgs map[cid.Cid]*types.Message) error { func (p *Processor) storeMessages(msgs map[cid.Cid]*types.Message) error {
start := time.Now()
defer func() {
log.Debugw("Persisted Messages", "duration", time.Since(start).String())
}()
tx, err := p.db.Begin() tx, err := p.db.Begin()
if err != nil { if err != nil {
return err return err

View File

@ -271,7 +271,11 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo)
preCommitEvents := make(chan *MinerSectorsEvent, 8) preCommitEvents := make(chan *MinerSectorsEvent, 8)
sectorEvents := make(chan *MinerSectorsEvent, 8) sectorEvents := make(chan *MinerSectorsEvent, 8)
partitionEvents := make(chan *MinerSectorsEvent, 8) partitionEvents := make(chan *MinerSectorsEvent, 8)
p.sectorDealEvents = make(chan *SectorDealEvent, 8) dealEvents := make(chan *SectorDealEvent, 8)
grp.Go(func() error {
return p.storePreCommitDealInfo(dealEvents)
})
grp.Go(func() error { grp.Go(func() error {
return p.storeMinerSectorEvents(ctx, sectorEvents, preCommitEvents, partitionEvents) return p.storeMinerSectorEvents(ctx, sectorEvents, preCommitEvents, partitionEvents)
@ -280,9 +284,9 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo)
grp.Go(func() error { grp.Go(func() error {
defer func() { defer func() {
close(preCommitEvents) close(preCommitEvents)
close(p.sectorDealEvents) close(dealEvents)
}() }()
return p.storeMinerPreCommitInfo(ctx, miners, preCommitEvents, p.sectorDealEvents) return p.storeMinerPreCommitInfo(ctx, miners, preCommitEvents, dealEvents)
}) })
grp.Go(func() error { grp.Go(func() error {
@ -911,6 +915,48 @@ func (p *Processor) storeMinersActorInfoState(ctx context.Context, miners []mine
return tx.Commit() return tx.Commit()
} }
func (p *Processor) storePreCommitDealInfo(dealEvents <-chan *SectorDealEvent) error {
tx, err := p.db.Begin()
if err != nil {
return err
}
if _, err := tx.Exec(`create temp table mds (like minerid_dealid_sectorid excluding constraints) on commit drop;`); err != nil {
return xerrors.Errorf("Failed to create temp table for minerid_dealid_sectorid: %w", err)
}
stmt, err := tx.Prepare(`copy mds (deal_id, miner_id, sector_id) from STDIN`)
if err != nil {
return xerrors.Errorf("Failed to prepare minerid_dealid_sectorid statement: %w", err)
}
for sde := range dealEvents {
for _, did := range sde.DealIDs {
if _, err := stmt.Exec(
uint64(did),
sde.MinerID.String(),
sde.SectorID,
); err != nil {
return err
}
}
}
if err := stmt.Close(); err != nil {
return xerrors.Errorf("Failed to close miner sector deals statement: %w", err)
}
if _, err := tx.Exec(`insert into minerid_dealid_sectorid select * from mds on conflict do nothing`); err != nil {
return xerrors.Errorf("Failed to insert into miner deal sector table: %w", err)
}
if err := tx.Commit(); err != nil {
return xerrors.Errorf("Failed to commit miner deal sector table: %w", err)
}
return nil
}
func (p *Processor) storeMinersPower(miners []minerActorInfo) error { func (p *Processor) storeMinersPower(miners []minerActorInfo) error {
start := time.Now() start := time.Now()
defer func() { defer func() {

View File

@ -47,8 +47,6 @@ func (p *Processor) subMpool(ctx context.Context) {
msgs[v.Message.Message.Cid()] = &v.Message.Message msgs[v.Message.Message.Cid()] = &v.Message.Message
} }
log.Debugf("Processing %d mpool updates", len(msgs))
err := p.storeMessages(msgs) err := p.storeMessages(msgs)
if err != nil { if err != nil {
log.Error(err) log.Error(err)

View File

@ -7,6 +7,7 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/power" "github.com/filecoin-project/specs-actors/actors/builtin/power"
"github.com/filecoin-project/specs-actors/actors/util/smoothing" "github.com/filecoin-project/specs-actors/actors/util/smoothing"
@ -15,7 +16,19 @@ import (
type powerActorInfo struct { type powerActorInfo struct {
common actorInfo common actorInfo
epochSmoothingEstimate *smoothing.FilterEstimate totalRawBytes big.Int
totalRawBytesCommitted big.Int
totalQualityAdjustedBytes big.Int
totalQualityAdjustedBytesCommitted big.Int
totalPledgeCollateral big.Int
newRawBytes big.Int
newQualityAdjustedBytes big.Int
newPledgeCollateral big.Int
newQAPowerSmoothed *smoothing.FilterEstimate
minerCount int64
minerCountAboveMinimumPower int64
} }
func (p *Processor) setupPower() error { func (p *Processor) setupPower() error {
@ -25,13 +38,27 @@ func (p *Processor) setupPower() error {
} }
if _, err := tx.Exec(` if _, err := tx.Exec(`
create table if not exists power_smoothing_estimates create table if not exists chain_power
( (
state_root text not null state_root text not null
constraint power_smoothing_estimates_pk constraint power_smoothing_estimates_pk
primary key, primary key,
position_estimate text not null,
velocity_estimate text not null new_raw_bytes_power text not null,
new_qa_bytes_power text not null,
new_pledge_collateral text not null,
total_raw_bytes_power text not null,
total_raw_bytes_committed text not null,
total_qa_bytes_power text not null,
total_qa_bytes_committed text not null,
total_pledge_collateral text not null,
qa_smoothed_position_estimate text not null,
qa_smoothed_velocity_estimate text not null,
miner_count int not null,
minimum_consensus_miner_count int not null
); );
`); err != nil { `); err != nil {
return err return err
@ -60,8 +87,8 @@ func (p *Processor) processPowerActors(ctx context.Context, powerTips ActorTips)
}() }()
var out []powerActorInfo var out []powerActorInfo
for tipset, powers := range powerTips { for tipset, powerStates := range powerTips {
for _, act := range powers { for _, act := range powerStates {
var pw powerActorInfo var pw powerActorInfo
pw.common = act pw.common = act
@ -80,7 +107,19 @@ func (p *Processor) processPowerActors(ctx context.Context, powerTips ActorTips)
return nil, xerrors.Errorf("unmarshal state (@ %s): %w", pw.common.stateroot.String(), err) return nil, xerrors.Errorf("unmarshal state (@ %s): %w", pw.common.stateroot.String(), err)
} }
pw.epochSmoothingEstimate = powerActorState.ThisEpochQAPowerSmoothed pw.totalRawBytes = powerActorState.TotalRawBytePower
pw.totalRawBytesCommitted = powerActorState.TotalBytesCommitted
pw.totalQualityAdjustedBytes = powerActorState.TotalQualityAdjPower
pw.totalQualityAdjustedBytesCommitted = powerActorState.TotalQABytesCommitted
pw.totalPledgeCollateral = powerActorState.TotalPledgeCollateral
pw.newRawBytes = powerActorState.ThisEpochRawBytePower
pw.newQualityAdjustedBytes = powerActorState.ThisEpochQualityAdjPower
pw.newPledgeCollateral = powerActorState.ThisEpochPledgeCollateral
pw.newQAPowerSmoothed = powerActorState.ThisEpochQAPowerSmoothed
pw.minerCount = powerActorState.MinerCount
pw.minerCountAboveMinimumPower = powerActorState.MinerAboveMinPowerCount
out = append(out, pw) out = append(out, pw)
} }
} }
@ -88,46 +127,59 @@ func (p *Processor) processPowerActors(ctx context.Context, powerTips ActorTips)
return out, nil return out, nil
} }
func (p *Processor) persistPowerActors(ctx context.Context, powers []powerActorInfo) error { func (p *Processor) persistPowerActors(ctx context.Context, powerStates []powerActorInfo) error {
// NB: use errgroup when there is more than a single store operation // NB: use errgroup when there is more than a single store operation
return p.storePowerSmoothingEstimates(powers) return p.storePowerSmoothingEstimates(powerStates)
} }
func (p *Processor) storePowerSmoothingEstimates(powers []powerActorInfo) error { func (p *Processor) storePowerSmoothingEstimates(powerStates []powerActorInfo) error {
tx, err := p.db.Begin() tx, err := p.db.Begin()
if err != nil { if err != nil {
return xerrors.Errorf("begin power_smoothing_estimates tx: %w", err) return xerrors.Errorf("begin chain_power tx: %w", err)
} }
if _, err := tx.Exec(`create temp table rse (like power_smoothing_estimates) on commit drop`); err != nil { if _, err := tx.Exec(`create temp table cp (like chain_power) on commit drop`); err != nil {
return xerrors.Errorf("prep power_smoothing_estimates: %w", err) return xerrors.Errorf("prep chain_power: %w", err)
} }
stmt, err := tx.Prepare(`copy rse (state_root, position_estimate, velocity_estimate) from stdin;`) stmt, err := tx.Prepare(`copy cp (state_root, new_raw_bytes_power, new_qa_bytes_power, new_pledge_collateral, total_raw_bytes_power, total_raw_bytes_committed, total_qa_bytes_power, total_qa_bytes_committed, total_pledge_collateral, qa_smoothed_position_estimate, qa_smoothed_velocity_estimate, miner_count, minimum_consensus_miner_count) from stdin;`)
if err != nil { if err != nil {
return xerrors.Errorf("prepare tmp power_smoothing_estimates: %w", err) return xerrors.Errorf("prepare tmp chain_power: %w", err)
} }
for _, powerState := range powers { for _, ps := range powerStates {
if _, err := stmt.Exec( if _, err := stmt.Exec(
powerState.common.stateroot.String(), ps.common.stateroot.String(),
powerState.epochSmoothingEstimate.PositionEstimate.String(), ps.newRawBytes.String(),
powerState.epochSmoothingEstimate.VelocityEstimate.String(), ps.newQualityAdjustedBytes.String(),
ps.newPledgeCollateral.String(),
ps.totalRawBytes.String(),
ps.totalRawBytesCommitted.String(),
ps.totalQualityAdjustedBytes.String(),
ps.totalQualityAdjustedBytesCommitted.String(),
ps.totalPledgeCollateral.String(),
ps.newQAPowerSmoothed.PositionEstimate.String(),
ps.newQAPowerSmoothed.VelocityEstimate.String(),
ps.minerCount,
ps.minerCountAboveMinimumPower,
); err != nil { ); err != nil {
return xerrors.Errorf("failed to store smoothing estimate: %w", err) return xerrors.Errorf("failed to store smoothing estimate: %w", err)
} }
} }
if err := stmt.Close(); err != nil { if err := stmt.Close(); err != nil {
return xerrors.Errorf("close prepared power_smoothing_estimates: %w", err) return xerrors.Errorf("close prepared chain_power: %w", err)
} }
if _, err := tx.Exec(`insert into power_smoothing_estimates select * from rse on conflict do nothing`); err != nil { if _, err := tx.Exec(`insert into chain_power select * from cp on conflict do nothing`); err != nil {
return xerrors.Errorf("insert power_smoothing_estimates from tmp: %w", err) return xerrors.Errorf("insert chain_power from tmp: %w", err)
} }
if err := tx.Commit(); err != nil { if err := tx.Commit(); err != nil {
return xerrors.Errorf("commit power_smoothing_estimates tx: %w", err) return xerrors.Errorf("commit chain_power tx: %w", err)
} }
return nil return nil

View File

@ -35,9 +35,6 @@ type Processor struct {
// number of blocks processed at a time // number of blocks processed at a time
batch int batch int
// process communication channels
sectorDealEvents chan *SectorDealEvent
} }
type ActorTips map[types.TipSetKey][]actorInfo type ActorTips map[types.TipSetKey][]actorInfo
@ -152,7 +149,6 @@ func (p *Processor) Start(ctx context.Context) {
log.Errorf("Failed to handle market changes: %w", err) log.Errorf("Failed to handle market changes: %w", err)
return return
} }
log.Info("Processed Market Changes")
}() }()
grp.Add(1) grp.Add(1)
@ -162,7 +158,6 @@ func (p *Processor) Start(ctx context.Context) {
log.Errorf("Failed to handle miner changes: %w", err) log.Errorf("Failed to handle miner changes: %w", err)
return return
} }
log.Info("Processed Miner Changes")
}() }()
grp.Add(1) grp.Add(1)
@ -172,7 +167,6 @@ func (p *Processor) Start(ctx context.Context) {
log.Errorf("Failed to handle reward changes: %w", err) log.Errorf("Failed to handle reward changes: %w", err)
return return
} }
log.Info("Processed Reward Changes")
}() }()
grp.Add(1) grp.Add(1)
@ -182,7 +176,6 @@ func (p *Processor) Start(ctx context.Context) {
log.Errorf("Failed to handle power actor changes: %w", err) log.Errorf("Failed to handle power actor changes: %w", err)
return return
} }
log.Info("Processes Power Changes")
}() }()
grp.Add(1) grp.Add(1)
@ -192,7 +185,6 @@ func (p *Processor) Start(ctx context.Context) {
log.Errorf("Failed to handle message changes: %w", err) log.Errorf("Failed to handle message changes: %w", err)
return return
} }
log.Info("Processed Message Changes")
}() }()
grp.Add(1) grp.Add(1)
@ -202,7 +194,6 @@ func (p *Processor) Start(ctx context.Context) {
log.Errorf("Failed to handle common actor changes: %w", err) log.Errorf("Failed to handle common actor changes: %w", err)
return return
} }
log.Info("Processed CommonActor Changes")
}() }()
grp.Wait() grp.Wait()
@ -214,7 +205,7 @@ func (p *Processor) Start(ctx context.Context) {
if err := p.refreshViews(); err != nil { if err := p.refreshViews(); err != nil {
log.Errorw("Failed to refresh views", "error", err) log.Errorw("Failed to refresh views", "error", err)
} }
log.Infow("Processed Batch", "duration", time.Since(loopStart).String()) log.Infow("Processed Batch Complete", "duration", time.Since(loopStart).String())
} }
} }
}() }()

View File

@ -5,7 +5,6 @@ import (
"context" "context"
"time" "time"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/abi/big"
@ -13,20 +12,23 @@ import (
"github.com/filecoin-project/specs-actors/actors/builtin/reward" "github.com/filecoin-project/specs-actors/actors/builtin/reward"
"github.com/filecoin-project/specs-actors/actors/util/smoothing" "github.com/filecoin-project/specs-actors/actors/util/smoothing"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
) )
type rewardActorInfo struct { type rewardActorInfo struct {
common actorInfo common actorInfo
// expected power in bytes during this epoch cumSumBaselinePower big.Int
baselinePower big.Int cumSumRealizedPower big.Int
// base reward in attofil for each block found during this epoch effectiveNetworkTime int64
baseBlockReward big.Int effectiveBaselinePower big.Int
epochSmoothingEstimate *smoothing.FilterEstimate newBaselinePower big.Int
newBaseReward big.Int
newSmoothingEstimate *smoothing.FilterEstimate
totalMinedReward big.Int
} }
func (p *Processor) setupRewards() error { func (p *Processor) setupRewards() error {
@ -36,34 +38,23 @@ func (p *Processor) setupRewards() error {
} }
if _, err := tx.Exec(` if _, err := tx.Exec(`
/*
* captures base block reward per miner per state root and does not
* include penalties or gas reward
*/
create table if not exists base_block_rewards
(
state_root text not null
constraint block_rewards_pk
primary key,
base_block_reward numeric not null
);
/* captures chain-specific power state for any given stateroot */ /* captures chain-specific power state for any given stateroot */
create table if not exists chain_power create table if not exists chain_reward
( (
state_root text not null state_root text not null
constraint chain_power_pk constraint chain_reward_pk
primary key, primary key,
baseline_power text not null cum_sum_baseline text not null,
); cum_sum_realized text not null,
effective_network_time int not null,
effective_baseline_power text not null,
create table if not exists reward_smoothing_estimates new_baseline_power text not null,
( new_reward numeric not null,
state_root text not null new_reward_smoothed_position_estimate text not null,
constraint reward_smoothing_estimates_pk new_reward_smoothed_velocity_estimate text not null,
primary key,
position_estimate text not null, total_mined_reward text not null
velocity_estimate text not null
); );
`); err != nil { `); err != nil {
return err return err
@ -113,9 +104,14 @@ func (p *Processor) processRewardActors(ctx context.Context, rewardTips ActorTip
return nil, xerrors.Errorf("unmarshal state (@ %s): %w", rw.common.stateroot.String(), err) return nil, xerrors.Errorf("unmarshal state (@ %s): %w", rw.common.stateroot.String(), err)
} }
rw.baseBlockReward = rewardActorState.ThisEpochReward rw.cumSumBaselinePower = rewardActorState.CumsumBaseline
rw.baselinePower = rewardActorState.ThisEpochBaselinePower rw.cumSumRealizedPower = rewardActorState.CumsumRealized
rw.epochSmoothingEstimate = rewardActorState.ThisEpochRewardSmoothed rw.effectiveNetworkTime = int64(rewardActorState.EffectiveNetworkTime)
rw.effectiveBaselinePower = rewardActorState.EffectiveBaselinePower
rw.newBaselinePower = rewardActorState.ThisEpochBaselinePower
rw.newBaseReward = rewardActorState.ThisEpochReward
rw.newSmoothingEstimate = rewardActorState.ThisEpochRewardSmoothed
rw.totalMinedReward = rewardActorState.TotalMined
out = append(out, rw) out = append(out, rw)
} }
} }
@ -145,8 +141,14 @@ func (p *Processor) processRewardActors(ctx context.Context, rewardTips ActorTip
return nil, err return nil, err
} }
rw.baseBlockReward = rewardActorState.ThisEpochReward rw.cumSumBaselinePower = rewardActorState.CumsumBaseline
rw.baselinePower = rewardActorState.ThisEpochBaselinePower rw.cumSumRealizedPower = rewardActorState.CumsumRealized
rw.effectiveNetworkTime = int64(rewardActorState.EffectiveNetworkTime)
rw.effectiveBaselinePower = rewardActorState.EffectiveBaselinePower
rw.newBaselinePower = rewardActorState.ThisEpochBaselinePower
rw.newBaseReward = rewardActorState.ThisEpochReward
rw.newSmoothingEstimate = rewardActorState.ThisEpochRewardSmoothed
rw.totalMinedReward = rewardActorState.TotalMined
out = append(out, rw) out = append(out, rw)
} }
@ -159,149 +161,47 @@ func (p *Processor) persistRewardActors(ctx context.Context, rewards []rewardAct
log.Debugw("Persisted Reward Actors", "duration", time.Since(start).String()) log.Debugw("Persisted Reward Actors", "duration", time.Since(start).String())
}() }()
grp, ctx := errgroup.WithContext(ctx) //nolint
grp.Go(func() error {
if err := p.storeChainPower(rewards); err != nil {
return err
}
return nil
})
grp.Go(func() error {
if err := p.storeBaseBlockReward(rewards); err != nil {
return err
}
return nil
})
grp.Go(func() error {
if err := p.storeRewardSmoothingEstimates(rewards); err != nil {
return err
}
return nil
})
return grp.Wait()
}
func (p *Processor) storeChainPower(rewards []rewardActorInfo) error {
tx, err := p.db.Begin() tx, err := p.db.Begin()
if err != nil { if err != nil {
return xerrors.Errorf("begin chain_power tx: %w", err) return xerrors.Errorf("begin chain_reward tx: %w", err)
} }
if _, err := tx.Exec(`create temp table cp (like chain_power excluding constraints) on commit drop`); err != nil { if _, err := tx.Exec(`create temp table cr (like chain_reward excluding constraints) on commit drop`); err != nil {
return xerrors.Errorf("prep chain_power temp: %w", err) return xerrors.Errorf("prep chain_reward temp: %w", err)
} }
stmt, err := tx.Prepare(`copy cp (state_root, baseline_power) from STDIN`) stmt, err := tx.Prepare(`copy cr ( state_root, cum_sum_baseline, cum_sum_realized, effective_network_time, effective_baseline_power, new_baseline_power, new_reward, new_reward_smoothed_position_estimate, new_reward_smoothed_velocity_estimate, total_mined_reward) from STDIN`)
if err != nil { if err != nil {
return xerrors.Errorf("prepare tmp chain_power: %w", err) return xerrors.Errorf("prepare tmp chain_reward: %w", err)
} }
for _, rewardState := range rewards { for _, rewardState := range rewards {
if _, err := stmt.Exec( if _, err := stmt.Exec(
rewardState.common.stateroot.String(), rewardState.common.stateroot.String(),
rewardState.baselinePower.String(), rewardState.cumSumBaselinePower.String(),
rewardState.cumSumRealizedPower.String(),
rewardState.effectiveNetworkTime,
rewardState.effectiveBaselinePower.String(),
rewardState.newBaselinePower.String(),
rewardState.newBaseReward.String(),
rewardState.newSmoothingEstimate.PositionEstimate.String(),
rewardState.newSmoothingEstimate.VelocityEstimate.String(),
rewardState.totalMinedReward.String(),
); err != nil { ); err != nil {
log.Errorw("failed to store chain power", "state_root", rewardState.common.stateroot, "error", err) log.Errorw("failed to store chain power", "state_root", rewardState.common.stateroot, "error", err)
} }
} }
if err := stmt.Close(); err != nil { if err := stmt.Close(); err != nil {
return xerrors.Errorf("close prepared chain_power: %w", err) return xerrors.Errorf("close prepared chain_reward: %w", err)
} }
if _, err := tx.Exec(`insert into chain_power select * from cp on conflict do nothing`); err != nil { if _, err := tx.Exec(`insert into chain_reward select * from cr on conflict do nothing`); err != nil {
return xerrors.Errorf("insert chain_power from tmp: %w", err) return xerrors.Errorf("insert chain_reward from tmp: %w", err)
} }
if err := tx.Commit(); err != nil { if err := tx.Commit(); err != nil {
return xerrors.Errorf("commit chain_power tx: %w", err) return xerrors.Errorf("commit chain_reward tx: %w", err)
}
return nil
}
func (p *Processor) storeBaseBlockReward(rewards []rewardActorInfo) error {
tx, err := p.db.Begin()
if err != nil {
return xerrors.Errorf("begin base_block_reward tx: %w", err)
}
if _, err := tx.Exec(`create temp table bbr (like base_block_rewards excluding constraints) on commit drop`); err != nil {
return xerrors.Errorf("prep base_block_reward temp: %w", err)
}
stmt, err := tx.Prepare(`copy bbr (state_root, base_block_reward) from STDIN`)
if err != nil {
return xerrors.Errorf("prepare tmp base_block_reward: %w", err)
}
for _, rewardState := range rewards {
baseBlockReward := big.Div(rewardState.baseBlockReward, big.NewIntUnsigned(build.BlocksPerEpoch))
if _, err := stmt.Exec(
rewardState.common.stateroot.String(),
baseBlockReward.String(),
); err != nil {
log.Errorw("failed to store base block reward", "state_root", rewardState.common.stateroot, "error", err)
}
}
if err := stmt.Close(); err != nil {
return xerrors.Errorf("close prepared base_block_reward: %w", err)
}
if _, err := tx.Exec(`insert into base_block_rewards select * from bbr on conflict do nothing`); err != nil {
return xerrors.Errorf("insert base_block_reward from tmp: %w", err)
}
if err := tx.Commit(); err != nil {
return xerrors.Errorf("commit base_block_reward tx: %w", err)
}
return nil
}
func (p *Processor) storeRewardSmoothingEstimates(rewards []rewardActorInfo) error {
tx, err := p.db.Begin()
if err != nil {
return xerrors.Errorf("begin reward_smoothing_estimates tx: %w", err)
}
if _, err := tx.Exec(`create temp table rse (like reward_smoothing_estimates) on commit drop`); err != nil {
return xerrors.Errorf("prep reward_smoothing_estimates: %w", err)
}
stmt, err := tx.Prepare(`copy rse (state_root, position_estimate, velocity_estimate) from stdin;`)
if err != nil {
return xerrors.Errorf("prepare tmp reward_smoothing_estimates: %w", err)
}
for _, rewardState := range rewards {
if rewardState.epochSmoothingEstimate == nil {
continue
}
if _, err := stmt.Exec(
rewardState.common.stateroot.String(),
rewardState.epochSmoothingEstimate.PositionEstimate.String(),
rewardState.epochSmoothingEstimate.VelocityEstimate.String(),
); err != nil {
return xerrors.Errorf("failed to store smoothing estimate: %w", err)
}
}
if err := stmt.Close(); err != nil {
return xerrors.Errorf("close prepared reward_smoothing_estimates: %w", err)
}
if _, err := tx.Exec(`insert into reward_smoothing_estimates select * from rse on conflict do nothing`); err != nil {
return xerrors.Errorf("insert reward_smoothing_estimates from tmp: %w", err)
}
if err := tx.Commit(); err != nil {
return xerrors.Errorf("commit reward_smoothing_estimates tx: %w", err)
} }
return nil return nil

View File

@ -90,7 +90,7 @@ var runCmd = &cli.Command{
} }
db.SetMaxOpenConns(1350) db.SetMaxOpenConns(1350)
sync := syncer.NewSyncer(db, api) sync := syncer.NewSyncer(db, api, 1400)
sync.Start(ctx) sync.Start(ctx)
proc := processor.NewProcessor(ctx, db, api, maxBatch) proc := processor.NewProcessor(ctx, db, api, maxBatch)

View File

@ -3,7 +3,6 @@ package scheduler
import ( import (
"context" "context"
"database/sql" "database/sql"
"time"
"golang.org/x/xerrors" "golang.org/x/xerrors"
) )
@ -24,9 +23,9 @@ func setupTopMinerByBaseRewardSchema(ctx context.Context, db *sql.DB) error {
with total_rewards_by_miner as ( with total_rewards_by_miner as (
select select
b.miner, b.miner,
sum(bbr.base_block_reward * b.win_count) as total_reward sum(cr.new_reward * b.win_count) as total_reward
from blocks b from blocks b
inner join base_block_rewards bbr on b.parentstateroot = bbr.state_root inner join chain_reward cr on b.parentstateroot = cr.state_root
group by 1 group by 1
) select ) select
rank() over (order by total_reward desc), rank() over (order by total_reward desc),
@ -43,17 +42,17 @@ func setupTopMinerByBaseRewardSchema(ctx context.Context, db *sql.DB) error {
b."timestamp"as current_timestamp, b."timestamp"as current_timestamp,
max(b.height) as current_height max(b.height) as current_height
from blocks b from blocks b
join base_block_rewards bbr on b.parentstateroot = bbr.state_root join chain_reward cr on b.parentstateroot = cr.state_root
where bbr.base_block_reward is not null where cr.new_reward is not null
group by 1 group by 1
order by 1 desc order by 1 desc
limit 1; limit 1;
`); err != nil { `); err != nil {
return xerrors.Errorf("create top_miner_by_base_reward views: %w", err) return xerrors.Errorf("create top_miners_by_base_reward views: %w", err)
} }
if err := tx.Commit(); err != nil { if err := tx.Commit(); err != nil {
return xerrors.Errorf("committing top_miner_by_base_reward views; %w", err) return xerrors.Errorf("committing top_miners_by_base_reward views; %w", err)
} }
return nil return nil
} }
@ -65,11 +64,6 @@ func refreshTopMinerByBaseReward(ctx context.Context, db *sql.DB) error {
default: default:
} }
t := time.Now()
defer func() {
log.Debugw("refresh top_miners_by_base_reward", "duration", time.Since(t).String())
}()
_, err := db.Exec("refresh materialized view top_miners_by_base_reward;") _, err := db.Exec("refresh materialized view top_miners_by_base_reward;")
if err != nil { if err != nil {
return xerrors.Errorf("refresh top_miners_by_base_reward: %w", err) return xerrors.Errorf("refresh top_miners_by_base_reward: %w", err)

View File

@ -40,7 +40,7 @@ func (s *Scheduler) Start(ctx context.Context) {
go func() { go func() {
// run once on start after schema has initialized // run once on start after schema has initialized
time.Sleep(5 * time.Second) time.Sleep(1 * time.Minute)
if err := refreshTopMinerByBaseReward(ctx, s.db); err != nil { if err := refreshTopMinerByBaseReward(ctx, s.db); err != nil {
log.Errorw("failed to refresh top miner", "error", err) log.Errorw("failed to refresh top miner", "error", err)
} }

View File

@ -23,14 +23,17 @@ var log = logging.Logger("syncer")
type Syncer struct { type Syncer struct {
db *sql.DB db *sql.DB
lookbackLimit uint64
headerLk sync.Mutex headerLk sync.Mutex
node api.FullNode node api.FullNode
} }
func NewSyncer(db *sql.DB, node api.FullNode) *Syncer { func NewSyncer(db *sql.DB, node api.FullNode, lookbackLimit uint64) *Syncer {
return &Syncer{ return &Syncer{
db: db, db: db,
node: node, node: node,
lookbackLimit: lookbackLimit,
} }
} }
@ -148,40 +151,34 @@ create index if not exists state_heights_parentstateroot_index
} }
func (s *Syncer) Start(ctx context.Context) { func (s *Syncer) Start(ctx context.Context) {
if err := logging.SetLogLevel("syncer", "info"); err != nil {
log.Fatal(err)
}
log.Debug("Starting Syncer") log.Debug("Starting Syncer")
if err := s.setupSchemas(); err != nil { if err := s.setupSchemas(); err != nil {
log.Fatal(err) log.Fatal(err)
} }
// doing the initial sync here lets us avoid the HCCurrent case in the switch
head, err := s.node.ChainHead(ctx)
if err != nil {
log.Fatalw("Failed to get chain head form lotus", "error", err)
}
unsynced, err := s.unsyncedBlocks(ctx, head, time.Unix(0, 0))
if err != nil {
log.Fatalw("failed to gather unsynced blocks", "error", err)
}
if err := s.storeHeaders(unsynced, true, time.Now()); err != nil {
log.Fatalw("failed to store unsynced blocks", "error", err)
}
// continue to keep the block headers table up to date. // continue to keep the block headers table up to date.
notifs, err := s.node.ChainNotify(ctx) notifs, err := s.node.ChainNotify(ctx)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
lastSynced := time.Now() // we need to ensure that on a restart we don't reprocess the whole flarping chain
blkCID, height, err := s.mostRecentlySyncedBlockHeight()
if err != nil {
log.Fatalw("failed to find most recently synced block", "error", err)
}
log.Infow("Found starting point for syncing", "blockCID", blkCID.String(), "height", height)
sinceEpoch := uint64(height)
go func() { go func() {
for notif := range notifs { for notif := range notifs {
for _, change := range notif { for _, change := range notif {
switch change.Type { switch change.Type {
case store.HCApply: case store.HCApply:
unsynced, err := s.unsyncedBlocks(ctx, change.Val, lastSynced) unsynced, err := s.unsyncedBlocks(ctx, change.Val, sinceEpoch)
if err != nil { if err != nil {
log.Errorw("failed to gather unsynced blocks", "error", err) log.Errorw("failed to gather unsynced blocks", "error", err)
} }
@ -194,13 +191,13 @@ func (s *Syncer) Start(ctx context.Context) {
continue continue
} }
if err := s.storeHeaders(unsynced, true, lastSynced); err != nil { if err := s.storeHeaders(unsynced, true, time.Now()); err != nil {
// so this is pretty bad, need some kind of retry.. // so this is pretty bad, need some kind of retry..
// for now just log an error and the blocks will be attempted again on next notifi // for now just log an error and the blocks will be attempted again on next notifi
log.Errorw("failed to store unsynced blocks", "error", err) log.Errorw("failed to store unsynced blocks", "error", err)
} }
lastSynced = time.Now() sinceEpoch = uint64(change.Val.Height())
case store.HCRevert: case store.HCRevert:
log.Debug("revert todo") log.Debug("revert todo")
} }
@ -209,12 +206,8 @@ func (s *Syncer) Start(ctx context.Context) {
}() }()
} }
func (s *Syncer) unsyncedBlocks(ctx context.Context, head *types.TipSet, since time.Time) (map[cid.Cid]*types.BlockHeader, error) { func (s *Syncer) unsyncedBlocks(ctx context.Context, head *types.TipSet, since uint64) (map[cid.Cid]*types.BlockHeader, error) {
// get a list of blocks we have already synced in the past 3 mins. This ensures we aren't returning the entire hasList, err := s.syncedBlocks(since, s.lookbackLimit)
// table every time.
lookback := since.Add(-(time.Minute * 3))
log.Debugw("Gathering unsynced blocks", "since", lookback.String())
hasList, err := s.syncedBlocks(lookback)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -257,9 +250,8 @@ func (s *Syncer) unsyncedBlocks(ctx context.Context, head *types.TipSet, since t
return toSync, nil return toSync, nil
} }
func (s *Syncer) syncedBlocks(timestamp time.Time) (map[cid.Cid]struct{}, error) { func (s *Syncer) syncedBlocks(since, limit uint64) (map[cid.Cid]struct{}, error) {
// timestamp is used to return a configurable amount of rows based on when they were last added. rws, err := s.db.Query(`select bs.cid FROM blocks_synced bs left join blocks b on b.cid = bs.cid where b.height <= $1 and bs.processed_at is not null limit $2`, since, limit)
rws, err := s.db.Query(`select cid FROM blocks_synced where synced_at > $1`, timestamp.Unix())
if err != nil { if err != nil {
return nil, xerrors.Errorf("Failed to query blocks_synced: %w", err) return nil, xerrors.Errorf("Failed to query blocks_synced: %w", err)
} }
@ -281,14 +273,43 @@ func (s *Syncer) syncedBlocks(timestamp time.Time) (map[cid.Cid]struct{}, error)
return out, nil return out, nil
} }
func (s *Syncer) mostRecentlySyncedBlockHeight() (cid.Cid, int64, error) {
rw := s.db.QueryRow(`
select blocks_synced.cid, b.height
from blocks_synced
left join blocks b on blocks_synced.cid = b.cid
where processed_at is not null
order by height desc
limit 1
`)
var c string
var h int64
if err := rw.Scan(&c, &h); err != nil {
if err == sql.ErrNoRows {
return cid.Undef, 0, nil
}
return cid.Undef, -1, err
}
ci, err := cid.Parse(c)
if err != nil {
return cid.Undef, -1, err
}
return ci, h, nil
}
func (s *Syncer) storeCirculatingSupply(ctx context.Context, tipset *types.TipSet) error { func (s *Syncer) storeCirculatingSupply(ctx context.Context, tipset *types.TipSet) error {
supply, err := s.node.StateCirculatingSupply(ctx, tipset.Key()) supply, err := s.node.StateCirculatingSupply(ctx, tipset.Key())
if err != nil { if err != nil {
return err return err
} }
ceInsert := `insert into chain_economics (parent_state_root, circulating_fil, vested_fil, mined_fil, burnt_fil, locked_fil)` + ceInsert := `insert into chain_economics (parent_state_root, circulating_fil, vested_fil, mined_fil, burnt_fil, locked_fil) ` +
`values ('%s', '%s', '%s', '%s', '%s', '%s');` `values ('%s', '%s', '%s', '%s', '%s', '%s') on conflict on constraint chain_economics_pk do ` +
`update set (circulating_fil, vested_fil, mined_fil, burnt_fil, locked_fil) = ('%[2]s', '%[3]s', '%[4]s', '%[5]s', '%[6]s') ` +
`where chain_economics.parent_state_root = '%[1]s';`
if _, err := s.db.Exec(fmt.Sprintf(ceInsert, if _, err := s.db.Exec(fmt.Sprintf(ceInsert,
tipset.ParentState().String(), tipset.ParentState().String(),