feat(chainwatch): Capture baseline power in chain_power

This commit is contained in:
Mike Greenberg 2020-07-08 18:30:24 -04:00
parent 24d8a84ad7
commit b9effac437
2 changed files with 126 additions and 28 deletions

View File

@ -294,6 +294,17 @@ create table if not exists miner_info
primary key (miner_id)
);
/*
* captures chain-specific power state for any given stateroot
*/
create table if not exists chain_power
(
stateroot text not null
constraint chain_power_pk
primary key,
baseline_power text not null
);
/* used to tell when a miners sectors (proven-not-yet-expired) changed if the miner_sectors_cid's are different a new sector was added or removed (terminated/expired) */
create table if not exists miner_sectors_heads
(
@ -500,6 +511,46 @@ func (st *storage) storeActors(actors map[address.Address]map[types.Actor]actorI
return nil
}
// storeChainPower captures reward actor state as it relates to power captured on-chain
func (st *storage) storeChainPower(rewardTips map[types.TipSetKey]*rewardStateInfo) error {
tx, err := st.db.Begin()
if err != nil {
return xerrors.Errorf("begin chain_power tx: %w", err)
}
if _, err := tx.Exec(`create temp table cp (like chain_power excluding constraints) on commit drop`); err != nil {
return xerrors.Errorf("prep chain_power temp: %w", err)
}
stmt, err := tx.Prepare(`copy cp (stateroot, baseline_power) from STDIN`)
if err != nil {
return xerrors.Errorf("prepare tmp chain_power: %w", err)
}
for _, rewardState := range rewardTips {
if _, err := stmt.Exec(
rewardState.stateroot.String(),
rewardState.baselinePower.String(),
); err != nil {
return xerrors.Errorf("exec prepared chain_power: %w", err)
}
}
if err := stmt.Close(); err != nil {
return xerrors.Errorf("close prepared chain_power: %w", err)
}
if _, err := tx.Exec(`insert into chain_power select * from cp on conflict do nothing`); err != nil {
return xerrors.Errorf("insert chain_power from tmp: %w", err)
}
if err := tx.Commit(); err != nil {
return xerrors.Errorf("commit chain_power tx: %w", err)
}
return nil
}
type storeSectorsAPI interface {
StateMinerSectors(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error)
}

View File

@ -13,12 +13,14 @@ import (
"github.com/filecoin-project/go-address"
"github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/builtin/power"
"github.com/filecoin-project/specs-actors/actors/builtin/reward"
"github.com/filecoin-project/specs-actors/actors/util/adt"
"github.com/filecoin-project/lotus/api"
@ -53,6 +55,11 @@ func runSyncer(ctx context.Context, api api.FullNode, st *storage, maxBatch int)
}()
}
type rewardStateInfo struct {
stateroot cid.Cid
baselinePower big.Int
}
type minerStateInfo struct {
// common
addr address.Address
@ -273,6 +280,8 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
}
})
// map of tipset to reward state
rewardTips := make(map[types.TipSetKey]*rewardStateInfo, len(changes))
// map of tipset to all miners that had a head-change at that tipset.
minerTips := make(map[types.TipSetKey][]*minerStateInfo, len(changes))
// heads we've seen, im being paranoid
@ -302,40 +311,74 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
alk.Unlock()
})
log.Infof("Getting miner info")
log.Infof("Getting actor change info")
minerChanges := 0
for addr, m := range actors {
for actor, c := range m {
// reward actor
if actor.Code != builtin.RewardActorCodeID {
rewardTips[c.tsKey] = &rewardStateInfo{
stateroot: c.stateroot,
baselinePower: big.Zero(),
}
continue
}
// miner actors with head change events
if actor.Code != builtin.StorageMinerActorCodeID {
continue
if _, found := headsSeen[actor.Head]; found {
continue
}
minerChanges++
minerTips[c.tsKey] = append(minerTips[c.tsKey], &minerStateInfo{
addr: addr,
act: actor,
stateroot: c.stateroot,
tsKey: c.tsKey,
parentTsKey: c.parentTsKey,
state: miner.State{},
info: miner.MinerInfo{},
rawPower: big.Zero(),
qalPower: big.Zero(),
})
headsSeen[actor.Head] = struct{}{}
}
// only want miner actors with head change events
if _, found := headsSeen[actor.Head]; found {
continue
}
minerChanges++
minerTips[c.tsKey] = append(minerTips[c.tsKey], &minerStateInfo{
addr: addr,
act: actor,
stateroot: c.stateroot,
tsKey: c.tsKey,
parentTsKey: c.parentTsKey,
state: miner.State{},
info: miner.MinerInfo{},
rawPower: big.Zero(),
qalPower: big.Zero(),
})
headsSeen[actor.Head] = struct{}{}
continue
}
}
rewardProcessingStartedAt := time.Now()
parmap.Par(50, parmap.KVMapArr(rewardTips), func(it func() (types.TipSetKey, *rewardStateInfo)) {
tsKey, rewardInfo := it()
// get reward actor states at each tipset once for all updates
rewardActor, err := api.StateGetActor(ctx, builtin.RewardActorAddr, tsKey)
if err != nil {
log.Error(xerrors.Errorf("get reward state (@ %s): %w", rewardInfo.stateroot.String(), err))
return
}
rewardStateRaw, err := api.ChainReadObj(ctx, rewardActor.Head)
if err != nil {
log.Error(xerrors.Errorf("read state obj (@ %s): %w", rewardInfo.stateroot.String(), err))
return
}
var rewardActorState reward.State
if err := rewardActorState.UnmarshalCBOR(bytes.NewReader(rewardStateRaw)); err != nil {
log.Error(xerrors.Errorf("unmarshal state (@ %s): %w", rewardInfo.stateroot.String(), err))
return
}
rewardInfo.baselinePower = rewardActorState.BaselinePower
})
log.Infow("Completed Reward Processing", "duration", time.Since(rewardProcessingStartedAt).String(), "processed", len(rewardTips))
minerProcessingStartedAt := time.Now()
log.Infow("Processing miners", "numTips", len(minerTips), "numMinerChanges", minerChanges)
// extract the power actor state at each tipset, loop over all miners that changed at said tipset and extract their
@ -411,25 +454,29 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
}
log.Info("Storing actors")
if err := st.storeActors(actors); err != nil {
log.Error(err)
return
}
chainPowerStartedAt := time.Now()
if err := st.storeChainPower(rewardTips); err != nil {
log.Error(err)
}
log.Infow("Stored chain power", "duration", time.Since(chainPowerStartedAt).String())
log.Info("Storing miners")
if err := st.storeMiners(minerTips); err != nil {
log.Error(err)
return
}
log.Info("Storing miner sectors")
sectorStart := time.Now()
if err := st.storeSectors(minerTips, api); err != nil {
log.Error(err)
return
}
log.Infow("Finished storing miner sectors", "duration", time.Since(sectorStart).String())
log.Infow("Stored miner sectors", "duration", time.Since(sectorStart).String())
log.Info("Storing miner sectors heads")
if err := st.storeMinerSectorsHeads(minerTips, api); err != nil {