Merge pull request #2329 from filecoin-project/feat/chainwatch/capture-power
chainwatch: Capture power from miner and reward actors
This commit is contained in:
commit
69125f15f9
@ -51,7 +51,7 @@ func main() {
|
|||||||
|
|
||||||
if err := app.Run(os.Args); err != nil {
|
if err := app.Run(os.Args); err != nil {
|
||||||
log.Warnf("%+v", err)
|
log.Warnf("%+v", err)
|
||||||
return
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -294,6 +294,30 @@ create table if not exists miner_info
|
|||||||
primary key (miner_id)
|
primary key (miner_id)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* captures chain-specific power state for any given stateroot
|
||||||
|
*/
|
||||||
|
create table if not exists chain_power
|
||||||
|
(
|
||||||
|
state_root text not null
|
||||||
|
constraint chain_power_pk
|
||||||
|
primary key,
|
||||||
|
baseline_power text not null
|
||||||
|
);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* captures miner-specific power state for any given stateroot
|
||||||
|
*/
|
||||||
|
create table if not exists miner_power
|
||||||
|
(
|
||||||
|
miner_id text not null,
|
||||||
|
state_root text not null,
|
||||||
|
raw_bytes_power text not null,
|
||||||
|
quality_adjusted_power text not null,
|
||||||
|
constraint miner_power_pk
|
||||||
|
primary key (miner_id, state_root)
|
||||||
|
);
|
||||||
|
|
||||||
/* used to tell when a miners sectors (proven-not-yet-expired) changed if the miner_sectors_cid's are different a new sector was added or removed (terminated/expired) */
|
/* used to tell when a miners sectors (proven-not-yet-expired) changed if the miner_sectors_cid's are different a new sector was added or removed (terminated/expired) */
|
||||||
create table if not exists miner_sectors_heads
|
create table if not exists miner_sectors_heads
|
||||||
(
|
(
|
||||||
@ -500,6 +524,46 @@ func (st *storage) storeActors(actors map[address.Address]map[types.Actor]actorI
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// storeChainPower captures reward actor state as it relates to power captured on-chain
|
||||||
|
func (st *storage) storeChainPower(rewardTips map[types.TipSetKey]*rewardStateInfo) error {
|
||||||
|
tx, err := st.db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("begin chain_power tx: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := tx.Exec(`create temp table cp (like chain_power excluding constraints) on commit drop`); err != nil {
|
||||||
|
return xerrors.Errorf("prep chain_power temp: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
stmt, err := tx.Prepare(`copy cp (state_root, baseline_power) from STDIN`)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("prepare tmp chain_power: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, rewardState := range rewardTips {
|
||||||
|
if _, err := stmt.Exec(
|
||||||
|
rewardState.stateroot.String(),
|
||||||
|
rewardState.baselinePower.String(),
|
||||||
|
); err != nil {
|
||||||
|
log.Errorw("failed to store chain power", "state_root", rewardState.stateroot, "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := stmt.Close(); err != nil {
|
||||||
|
return xerrors.Errorf("close prepared chain_power: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := tx.Exec(`insert into chain_power select * from cp on conflict do nothing`); err != nil {
|
||||||
|
return xerrors.Errorf("insert chain_power from tmp: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := tx.Commit(); err != nil {
|
||||||
|
return xerrors.Errorf("commit chain_power tx: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
type storeSectorsAPI interface {
|
type storeSectorsAPI interface {
|
||||||
StateMinerSectors(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error)
|
StateMinerSectors(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error)
|
||||||
}
|
}
|
||||||
@ -607,6 +671,50 @@ func (st *storage) storeMiners(minerTips map[types.TipSetKey][]*minerStateInfo)
|
|||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// storeMinerPower captures miner actor state as it relates to power per miner captured on-chain
|
||||||
|
func (st *storage) storeMinerPower(minerTips map[types.TipSetKey][]*minerStateInfo) error {
|
||||||
|
tx, err := st.db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("begin miner_power tx: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := tx.Exec(`create temp table mp (like miner_power excluding constraints) on commit drop`); err != nil {
|
||||||
|
return xerrors.Errorf("prep miner_power temp: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
stmt, err := tx.Prepare(`copy mp (miner_id, state_root, raw_bytes_power, quality_adjusted_power) from STDIN`)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("prepare tmp miner_power: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, miners := range minerTips {
|
||||||
|
for _, minerInfo := range miners {
|
||||||
|
if _, err := stmt.Exec(
|
||||||
|
minerInfo.addr.String(),
|
||||||
|
minerInfo.stateroot.String(),
|
||||||
|
minerInfo.rawPower.String(),
|
||||||
|
minerInfo.qalPower.String(),
|
||||||
|
); err != nil {
|
||||||
|
log.Errorw("failed to store miner power", "miner", minerInfo.addr, "stateroot", minerInfo.stateroot, "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := stmt.Close(); err != nil {
|
||||||
|
return xerrors.Errorf("close prepared miner_power: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := tx.Exec(`insert into miner_power select * from mp on conflict do nothing`); err != nil {
|
||||||
|
return xerrors.Errorf("insert miner_power from tmp: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := tx.Commit(); err != nil {
|
||||||
|
return xerrors.Errorf("commit miner_power tx: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*minerStateInfo, api api.FullNode) error {
|
func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*minerStateInfo, api api.FullNode) error {
|
||||||
tx, err := st.db.Begin()
|
tx, err := st.db.Begin()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -13,12 +13,14 @@ import (
|
|||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
cbg "github.com/whyrusleeping/cbor-gen"
|
cbg "github.com/whyrusleeping/cbor-gen"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||||
"github.com/filecoin-project/specs-actors/actors/abi/big"
|
"github.com/filecoin-project/specs-actors/actors/abi/big"
|
||||||
"github.com/filecoin-project/specs-actors/actors/builtin"
|
"github.com/filecoin-project/specs-actors/actors/builtin"
|
||||||
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
|
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
|
||||||
"github.com/filecoin-project/specs-actors/actors/builtin/power"
|
"github.com/filecoin-project/specs-actors/actors/builtin/power"
|
||||||
|
"github.com/filecoin-project/specs-actors/actors/builtin/reward"
|
||||||
"github.com/filecoin-project/specs-actors/actors/util/adt"
|
"github.com/filecoin-project/specs-actors/actors/util/adt"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
@ -53,6 +55,11 @@ func runSyncer(ctx context.Context, api api.FullNode, st *storage, maxBatch int)
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type rewardStateInfo struct {
|
||||||
|
stateroot cid.Cid
|
||||||
|
baselinePower big.Int
|
||||||
|
}
|
||||||
|
|
||||||
type minerStateInfo struct {
|
type minerStateInfo struct {
|
||||||
// common
|
// common
|
||||||
addr address.Address
|
addr address.Address
|
||||||
@ -273,6 +280,8 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// map of tipset to reward state
|
||||||
|
rewardTips := make(map[types.TipSetKey]*rewardStateInfo, len(changes))
|
||||||
// map of tipset to all miners that had a head-change at that tipset.
|
// map of tipset to all miners that had a head-change at that tipset.
|
||||||
minerTips := make(map[types.TipSetKey][]*minerStateInfo, len(changes))
|
minerTips := make(map[types.TipSetKey][]*minerStateInfo, len(changes))
|
||||||
// heads we've seen, im being paranoid
|
// heads we've seen, im being paranoid
|
||||||
@ -302,40 +311,74 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
|
|||||||
alk.Unlock()
|
alk.Unlock()
|
||||||
})
|
})
|
||||||
|
|
||||||
log.Infof("Getting miner info")
|
log.Infof("Getting actor change info")
|
||||||
|
|
||||||
minerChanges := 0
|
minerChanges := 0
|
||||||
for addr, m := range actors {
|
for addr, m := range actors {
|
||||||
for actor, c := range m {
|
for actor, c := range m {
|
||||||
if actor.Code != builtin.StorageMinerActorCodeID {
|
// reward actor
|
||||||
|
if actor.Code == builtin.RewardActorCodeID {
|
||||||
|
rewardTips[c.tsKey] = &rewardStateInfo{
|
||||||
|
stateroot: c.stateroot,
|
||||||
|
baselinePower: big.Zero(),
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// only want miner actors with head change events
|
// miner actors with head change events
|
||||||
if _, found := headsSeen[actor.Head]; found {
|
if actor.Code == builtin.StorageMinerActorCodeID {
|
||||||
continue
|
if _, found := headsSeen[actor.Head]; found {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
minerChanges++
|
||||||
|
|
||||||
|
minerTips[c.tsKey] = append(minerTips[c.tsKey], &minerStateInfo{
|
||||||
|
addr: addr,
|
||||||
|
act: actor,
|
||||||
|
stateroot: c.stateroot,
|
||||||
|
|
||||||
|
tsKey: c.tsKey,
|
||||||
|
parentTsKey: c.parentTsKey,
|
||||||
|
|
||||||
|
state: miner.State{},
|
||||||
|
info: miner.MinerInfo{},
|
||||||
|
|
||||||
|
rawPower: big.Zero(),
|
||||||
|
qalPower: big.Zero(),
|
||||||
|
})
|
||||||
|
|
||||||
|
headsSeen[actor.Head] = struct{}{}
|
||||||
}
|
}
|
||||||
minerChanges++
|
continue
|
||||||
|
|
||||||
minerTips[c.tsKey] = append(minerTips[c.tsKey], &minerStateInfo{
|
|
||||||
addr: addr,
|
|
||||||
act: actor,
|
|
||||||
stateroot: c.stateroot,
|
|
||||||
|
|
||||||
tsKey: c.tsKey,
|
|
||||||
parentTsKey: c.parentTsKey,
|
|
||||||
|
|
||||||
state: miner.State{},
|
|
||||||
info: miner.MinerInfo{},
|
|
||||||
|
|
||||||
rawPower: big.Zero(),
|
|
||||||
qalPower: big.Zero(),
|
|
||||||
})
|
|
||||||
|
|
||||||
headsSeen[actor.Head] = struct{}{}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rewardProcessingStartedAt := time.Now()
|
||||||
|
parmap.Par(50, parmap.KVMapArr(rewardTips), func(it func() (types.TipSetKey, *rewardStateInfo)) {
|
||||||
|
tsKey, rewardInfo := it()
|
||||||
|
// get reward actor states at each tipset once for all updates
|
||||||
|
rewardActor, err := api.StateGetActor(ctx, builtin.RewardActorAddr, tsKey)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(xerrors.Errorf("get reward state (@ %s): %w", rewardInfo.stateroot.String(), err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
rewardStateRaw, err := api.ChainReadObj(ctx, rewardActor.Head)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(xerrors.Errorf("read state obj (@ %s): %w", rewardInfo.stateroot.String(), err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var rewardActorState reward.State
|
||||||
|
if err := rewardActorState.UnmarshalCBOR(bytes.NewReader(rewardStateRaw)); err != nil {
|
||||||
|
log.Error(xerrors.Errorf("unmarshal state (@ %s): %w", rewardInfo.stateroot.String(), err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
rewardInfo.baselinePower = rewardActorState.BaselinePower
|
||||||
|
})
|
||||||
|
log.Infow("Completed Reward Processing", "duration", time.Since(rewardProcessingStartedAt).String(), "processed", len(rewardTips))
|
||||||
|
|
||||||
minerProcessingStartedAt := time.Now()
|
minerProcessingStartedAt := time.Now()
|
||||||
log.Infow("Processing miners", "numTips", len(minerTips), "numMinerChanges", minerChanges)
|
log.Infow("Processing miners", "numTips", len(minerTips), "numMinerChanges", minerChanges)
|
||||||
// extract the power actor state at each tipset, loop over all miners that changed at said tipset and extract their
|
// extract the power actor state at each tipset, loop over all miners that changed at said tipset and extract their
|
||||||
@ -411,25 +454,35 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
|
|||||||
}
|
}
|
||||||
|
|
||||||
log.Info("Storing actors")
|
log.Info("Storing actors")
|
||||||
|
|
||||||
if err := st.storeActors(actors); err != nil {
|
if err := st.storeActors(actors); err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
chainPowerStartedAt := time.Now()
|
||||||
|
if err := st.storeChainPower(rewardTips); err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
}
|
||||||
|
log.Infow("Stored chain power", "duration", time.Since(chainPowerStartedAt).String())
|
||||||
|
|
||||||
log.Info("Storing miners")
|
log.Info("Storing miners")
|
||||||
if err := st.storeMiners(minerTips); err != nil {
|
if err := st.storeMiners(minerTips); err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("Storing miner sectors")
|
minerPowerStartedAt := time.Now()
|
||||||
|
if err := st.storeMinerPower(minerTips); err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
}
|
||||||
|
log.Infow("Stored miner power", "duration", time.Since(minerPowerStartedAt).String())
|
||||||
|
|
||||||
sectorStart := time.Now()
|
sectorStart := time.Now()
|
||||||
if err := st.storeSectors(minerTips, api); err != nil {
|
if err := st.storeSectors(minerTips, api); err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.Infow("Finished storing miner sectors", "duration", time.Since(sectorStart).String())
|
log.Infow("Stored miner sectors", "duration", time.Since(sectorStart).String())
|
||||||
|
|
||||||
log.Info("Storing miner sectors heads")
|
log.Info("Storing miner sectors heads")
|
||||||
if err := st.storeMinerSectorsHeads(minerTips, api); err != nil {
|
if err := st.storeMinerSectorsHeads(minerTips, api); err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user