feat: define miner sector schema
- define a miner sector schema to store miner sector details at each tipset the miner experiences a state change. This solution stores redundant data since a miner state change (head cid changes) does not necessarily indicate its sectors changes. - makes progress towards sentinel/issues/10
This commit is contained in:
parent
06dfe3c5ae
commit
2c0a4914cf
@ -46,7 +46,7 @@ func subMpool(ctx context.Context, api aapi.FullNode, st *storage) {
|
||||
msgs[v.Message.Message.Cid()] = &v.Message.Message
|
||||
}
|
||||
|
||||
log.Infof("Processing %d mpool updates", len(msgs))
|
||||
log.Debugf("Processing %d mpool updates", len(msgs))
|
||||
|
||||
err := st.storeMessages(msgs)
|
||||
if err != nil {
|
||||
|
@ -1,7 +1,9 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -252,6 +254,24 @@ create table if not exists receipts
|
||||
|
||||
create index if not exists receipts_msg_state_index
|
||||
on receipts (msg, state);
|
||||
|
||||
create table if not exists miner_sectors
|
||||
(
|
||||
stateroot text not null,
|
||||
miner text not null,
|
||||
sectorid bigint not null,
|
||||
activation bigint not null,
|
||||
dealweight bigint not null,
|
||||
verifieddealweight bigint not null,
|
||||
expiration bigint not null,
|
||||
sealcid text not null,
|
||||
sealrandepoch bigint not null,
|
||||
constraint miner_sectors_pk
|
||||
primary key (stateroot, miner, sectorid)
|
||||
);
|
||||
|
||||
create index if not exists miner_sectors_state_index
|
||||
on miner_sectors (stateroot, miner, sectorid);
|
||||
/*
|
||||
create table if not exists miner_heads
|
||||
(
|
||||
@ -456,6 +476,61 @@ func (st *storage) storeActors(actors map[address.Address]map[types.Actor]actorI
|
||||
return nil
|
||||
}
|
||||
|
||||
type storeSectorsAPI interface {
|
||||
StateMinerSectors(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error)
|
||||
}
|
||||
|
||||
func (st *storage) storeSectors(minerTips map[types.TipSetKey][]*newMinerInfo, sectorApi storeSectorsAPI) error {
|
||||
tx, err := st.db.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := tx.Exec(`create temp table ms (like miner_sectors excluding constraints) on commit drop;`); err != nil {
|
||||
return xerrors.Errorf("prep temp: %w", err)
|
||||
}
|
||||
|
||||
stmt, err := tx.Prepare(`copy ms (stateroot, miner, sectorid, activation, dealweight, verifieddealweight, expiration, sealcid, sealrandepoch) from STDIN `)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for tipset, miners := range minerTips {
|
||||
for _, miner := range miners {
|
||||
sectors, err := sectorApi.StateMinerSectors(context.TODO(), miner.addr, nil, true, tipset)
|
||||
if err != nil {
|
||||
log.Debugw("Failed to load sectors", "tipset", tipset.String(), "miner", miner.addr.String(), "error", err)
|
||||
}
|
||||
|
||||
for _, sector := range sectors {
|
||||
if _, err := stmt.Exec(
|
||||
miner.stateroot.String(),
|
||||
miner.addr.String(),
|
||||
uint64(sector.ID),
|
||||
int64(sector.Info.ActivationEpoch),
|
||||
sector.Info.DealWeight.Uint64(),
|
||||
sector.Info.VerifiedDealWeight.Uint64(),
|
||||
int64(sector.Info.Info.Expiration),
|
||||
sector.Info.Info.SealedCID.String(),
|
||||
int64(sector.Info.Info.SealRandEpoch),
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := stmt.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := tx.Exec(`insert into miner_sectors select * from ms on conflict do nothing `); err != nil {
|
||||
return xerrors.Errorf("actor put: %w", err)
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func (st *storage) storeMiners(miners map[minerKey]*minerInfo) error {
|
||||
/*tx, err := st.db.Begin()
|
||||
if err != nil {
|
||||
|
@ -5,11 +5,15 @@ import (
|
||||
"container/list"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"math"
|
||||
"sync"
|
||||
|
||||
"fmt"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin/power"
|
||||
"github.com/filecoin-project/specs-actors/actors/util/adt"
|
||||
cbg "github.com/whyrusleeping/cbor-gen"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||
@ -60,9 +64,27 @@ type minerInfo struct {
|
||||
state miner.State
|
||||
info miner.MinerInfo
|
||||
|
||||
power big.Int
|
||||
ssize uint64
|
||||
psize uint64
|
||||
rawPower big.Int
|
||||
qalPower big.Int
|
||||
ssize uint64
|
||||
psize uint64
|
||||
}
|
||||
|
||||
type newMinerInfo struct {
|
||||
// common
|
||||
addr address.Address
|
||||
act types.Actor
|
||||
stateroot cid.Cid
|
||||
|
||||
// miner specific
|
||||
state miner.State
|
||||
info miner.MinerInfo
|
||||
|
||||
// tracked by power actor
|
||||
rawPower big.Int
|
||||
qalPower big.Int
|
||||
ssize uint64
|
||||
psize uint64
|
||||
}
|
||||
|
||||
type actorInfo struct {
|
||||
@ -80,25 +102,28 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
|
||||
|
||||
log.Infof("Getting headers / actors")
|
||||
|
||||
// global list of all blocks that need to be synced
|
||||
allToSync := map[cid.Cid]*types.BlockHeader{}
|
||||
// a stack
|
||||
toVisit := list.New()
|
||||
|
||||
for _, header := range headTs.Blocks() {
|
||||
toVisit.PushBack(header)
|
||||
}
|
||||
|
||||
// TODO consider making a db query to check where syncing left off at in the case of a restart and avoid reprocessing
|
||||
// those entries, or write value to file on shutdown
|
||||
// walk the entire chain starting from headTS
|
||||
for toVisit.Len() > 0 {
|
||||
bh := toVisit.Remove(toVisit.Back()).(*types.BlockHeader)
|
||||
|
||||
_, has := hazlist[bh.Cid()]
|
||||
if _, seen := allToSync[bh.Cid()]; seen || has {
|
||||
continue
|
||||
}
|
||||
|
||||
allToSync[bh.Cid()] = bh
|
||||
|
||||
if len(allToSync)%500 == 10 {
|
||||
log.Infof("todo: (%d) %s @%d", len(allToSync), bh.Cid(), bh.Height)
|
||||
log.Infof("to visit: (%d) %s @%d", len(allToSync), bh.Cid(), bh.Height)
|
||||
}
|
||||
|
||||
if len(bh.Parents) == 0 {
|
||||
@ -116,17 +141,25 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
|
||||
}
|
||||
}
|
||||
|
||||
// Main worker loop, this loop runs until all tipse from headTS to genesis have been processed.
|
||||
for len(allToSync) > 0 {
|
||||
// first map is addresses -> common actors states (head, code, balance, nonce)
|
||||
// second map common actor states -> chain state (tipset, stateroot) & unique actor state (deserialization of their head CID) represented as json.
|
||||
actors := map[address.Address]map[types.Actor]actorInfo{}
|
||||
|
||||
// map of actor public key address to ID address
|
||||
addressToID := map[address.Address]address.Address{}
|
||||
minH := abi.ChainEpoch(math.MaxInt64)
|
||||
|
||||
// find the blockheader with the lowest height
|
||||
for _, header := range allToSync {
|
||||
if header.Height < minH {
|
||||
minH = header.Height
|
||||
}
|
||||
}
|
||||
|
||||
// toSync maps block cids to their headers and contains all block headers that will be synced in this batch
|
||||
// `maxBatch` is a tunable parameter to control how many blocks we sync per iteration.
|
||||
toSync := map[cid.Cid]*types.BlockHeader{}
|
||||
for c, header := range allToSync {
|
||||
if header.Height < minH+abi.ChainEpoch(maxBatch) {
|
||||
@ -134,12 +167,14 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
|
||||
addressToID[header.Miner] = address.Undef
|
||||
}
|
||||
}
|
||||
// remove everything we are syncing this round from the global list of blocks to sync
|
||||
for c := range toSync {
|
||||
delete(allToSync, c)
|
||||
}
|
||||
|
||||
log.Infof("Syncing %d blocks", len(toSync))
|
||||
log.Infow("Starting Sync", "height", minH, "numBlocks", len(toSync), "maxBatch", maxBatch)
|
||||
|
||||
// collect all actor state that has changes between block headers
|
||||
paDone := 0
|
||||
parmap.Par(50, parmap.MapArr(toSync), func(bh *types.BlockHeader) {
|
||||
paDone++
|
||||
@ -155,6 +190,8 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
|
||||
return
|
||||
}
|
||||
|
||||
// TODO suspicious there is not a lot to be gained by doing this in parallel since the genesis state
|
||||
// is unlikely to contain a lot of actors, why not for loop here?
|
||||
parmap.Par(50, aadrs, func(addr address.Address) {
|
||||
act, err := api.StateGetActor(ctx, addr, genesisTs.Key())
|
||||
if err != nil {
|
||||
@ -195,12 +232,15 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
|
||||
return
|
||||
}
|
||||
|
||||
// TODO Does this return actors that have been deleted between states?
|
||||
// collect all actors that had state changes between the blockheader parent-state and its grandparent-state.
|
||||
changes, err := api.StateChangedActors(ctx, pts.ParentState(), bh.ParentStateRoot)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
// record the state of all actors that have changed
|
||||
for a, act := range changes {
|
||||
act := act
|
||||
|
||||
@ -227,12 +267,14 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
|
||||
if !ok {
|
||||
actors[addr] = map[types.Actor]actorInfo{}
|
||||
}
|
||||
// a change occurred for the actor with address `addr` and state `act` at tipset `pts`.
|
||||
actors[addr][act] = actorInfo{
|
||||
stateroot: bh.ParentStateRoot,
|
||||
state: string(state),
|
||||
tsKey: pts.Key(),
|
||||
}
|
||||
addressToID[addr] = address.Undef
|
||||
//
|
||||
alk.Unlock()
|
||||
}
|
||||
})
|
||||
@ -263,57 +305,97 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
|
||||
|
||||
log.Infof("Getting miner info")
|
||||
|
||||
miners := map[minerKey]*minerInfo{}
|
||||
// map of tipset to all miners that had a head-change at that tipset.
|
||||
minerTips := map[types.TipSetKey][]*newMinerInfo{}
|
||||
// heads we've seen, im being paranoid
|
||||
headsSeen := map[cid.Cid]bool{}
|
||||
|
||||
minerChanges := 0
|
||||
for addr, m := range actors {
|
||||
for actor, c := range m {
|
||||
if actor.Code != builtin.StorageMinerActorCodeID {
|
||||
continue
|
||||
}
|
||||
|
||||
miners[minerKey{
|
||||
// only want miner actors with head change events
|
||||
if headsSeen[actor.Head] {
|
||||
continue
|
||||
}
|
||||
minerChanges++
|
||||
|
||||
minerTips[c.tsKey] = append(minerTips[c.tsKey], &newMinerInfo{
|
||||
addr: addr,
|
||||
act: actor,
|
||||
stateroot: c.stateroot,
|
||||
tsKey: c.tsKey,
|
||||
}] = &minerInfo{}
|
||||
|
||||
state: miner.State{},
|
||||
info: miner.MinerInfo{},
|
||||
|
||||
rawPower: big.Zero(),
|
||||
qalPower: big.Zero(),
|
||||
ssize: 0,
|
||||
psize: 0,
|
||||
})
|
||||
|
||||
headsSeen[actor.Head] = true
|
||||
}
|
||||
}
|
||||
|
||||
parmap.Par(50, parmap.KVMapArr(miners), func(it func() (minerKey, *minerInfo)) {
|
||||
k, info := it()
|
||||
minerProcessingState := time.Now()
|
||||
log.Infow("Processing miners", "numTips", len(minerTips), "numMinerChanges", minerChanges)
|
||||
// extract the power actor state at each tipset, loop over all miners that changed at said tipset and extract their
|
||||
// claims from the power actor state. This ensures we only fetch the power actors state once for each tipset.
|
||||
parmap.Par(50, parmap.KVMapArr(minerTips), func(it func() (types.TipSetKey, []*newMinerInfo)) {
|
||||
tsKey, minerInfo := it()
|
||||
|
||||
// TODO: get the storage power actors state and and pull the miner power from there, currently this hits the
|
||||
// storage power actor once for each miner for each tipset, we can do better by just getting it for each tipset
|
||||
// and reading each miner power from the result.
|
||||
pow, err := api.StateMinerPower(ctx, k.addr, k.tsKey)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
// Not sure why this would fail, but its probably worth continuing
|
||||
}
|
||||
info.power = pow.MinerPower.QualityAdjPower
|
||||
|
||||
sszs, err := api.StateMinerSectorCount(ctx, k.addr, k.tsKey)
|
||||
// get the power actors claims map
|
||||
mp, err := getPowerActorClaimsMap(ctx, api, tsKey)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
info.psize = sszs.Pset
|
||||
info.ssize = sszs.Sset
|
||||
// Get miner raw and quality power
|
||||
for _, mi := range minerInfo {
|
||||
var claim power.Claim
|
||||
// get miner claim from power actors claim map and store if found, else the miner had no claim at
|
||||
// this tipset
|
||||
found, err := mp.Get(adt.AddrKey(mi.addr), &claim)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
if found {
|
||||
mi.qalPower = claim.QualityAdjPower
|
||||
mi.rawPower = claim.RawBytePower
|
||||
}
|
||||
|
||||
astb, err := api.ChainReadObj(ctx, k.act.Head)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
// Get the miner state info
|
||||
astb, err := api.ChainReadObj(ctx, mi.act.Head)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
if err := mi.state.UnmarshalCBOR(bytes.NewReader(astb)); err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
mi.info = mi.state.Info
|
||||
}
|
||||
|
||||
if err := info.state.UnmarshalCBOR(bytes.NewReader(astb)); err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
info.info = info.state.Info
|
||||
// TODO Get the Sector Count
|
||||
// FIXME this is returning a lot of "address not found" errors, which is strange given that StateChangedActors
|
||||
// retruns all actors that had a state change at tipset `k.tsKey`, maybe its returning deleted miners too??
|
||||
/*
|
||||
sszs, err := api.StateMinerSectorCount(ctx, k.addr, k.tsKey)
|
||||
if err != nil {
|
||||
info.psize = 0
|
||||
info.ssize = 0
|
||||
} else {
|
||||
info.psize = sszs.Pset
|
||||
info.ssize = sszs.Sset
|
||||
}
|
||||
*/
|
||||
})
|
||||
log.Infow("Completed Miner Processing", "duration", time.Since(minerProcessingState).String(), "processed", minerChanges)
|
||||
|
||||
log.Info("Getting receipts")
|
||||
|
||||
@ -340,12 +422,24 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
|
||||
return
|
||||
}
|
||||
|
||||
log.Info("Storing miners")
|
||||
// TODO re-enable when ready to fill miner metadata, the contents of storeMiners is commented out too.
|
||||
/*
|
||||
log.Info("Storing miners")
|
||||
|
||||
if err := st.storeMiners(miners); err != nil {
|
||||
if err := st.storeMiners(miners); err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
*/
|
||||
|
||||
log.Info("Storing miner sectors")
|
||||
|
||||
sectorStart := time.Now()
|
||||
if err := st.storeSectors(minerTips, api); err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
log.Infow("Finished storing miner sectors", "duration", time.Since(sectorStart).String())
|
||||
|
||||
log.Infof("Storing messages")
|
||||
|
||||
@ -463,3 +557,56 @@ func fetchParentReceipts(ctx context.Context, api api.FullNode, toSync map[cid.C
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
// load the power actor state clam as an adt.Map at the tipset `ts`.
|
||||
func getPowerActorClaimsMap(ctx context.Context, api api.FullNode, ts types.TipSetKey) (*adt.Map, error) {
|
||||
powerActor, err := api.StateGetActor(ctx, builtin.StoragePowerActorAddr, ts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
powerRaw, err := api.ChainReadObj(ctx, powerActor.Head)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var powerActorState power.State
|
||||
if err := powerActorState.UnmarshalCBOR(bytes.NewReader(powerRaw)); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal power actor state: %w", err)
|
||||
}
|
||||
|
||||
s := &apiIpldStore{ctx, api}
|
||||
return adt.AsMap(s, powerActorState.Claims)
|
||||
}
|
||||
|
||||
// require for AMT and HAMT access
|
||||
// TODO extract this to a common locaiton in lotus and reuse the code
|
||||
type apiIpldStore struct {
|
||||
ctx context.Context
|
||||
api api.FullNode
|
||||
}
|
||||
|
||||
func (ht *apiIpldStore) Context() context.Context {
|
||||
return ht.ctx
|
||||
}
|
||||
|
||||
func (ht *apiIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) error {
|
||||
raw, err := ht.api.ChainReadObj(ctx, c)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cu, ok := out.(cbg.CBORUnmarshaler)
|
||||
if ok {
|
||||
if err := cu.UnmarshalCBOR(bytes.NewReader(raw)); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("Object does not implement CBORUnmarshaler")
|
||||
}
|
||||
|
||||
func (ht *apiIpldStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) {
|
||||
return cid.Undef, fmt.Errorf("Put is not implemented on apiIpldStore")
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user