Merge pull request #2140 from filecoin-project/chainwatch/sector-info-schema

feat: define miner sector schema
This commit is contained in:
Łukasz Magiera 2020-07-01 11:02:57 +02:00 committed by GitHub
commit abdafc8ee4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 516 additions and 115 deletions

View File

@ -18,6 +18,9 @@ var log = logging.Logger("chainwatch")
func main() { func main() {
_ = logging.SetLogLevel("*", "INFO") _ = logging.SetLogLevel("*", "INFO")
if err := logging.SetLogLevel("rpc", "error"); err != nil {
panic(err)
}
log.Info("Starting chainwatch") log.Info("Starting chainwatch")

View File

@ -46,7 +46,7 @@ func subMpool(ctx context.Context, api aapi.FullNode, st *storage) {
msgs[v.Message.Message.Cid()] = &v.Message.Message msgs[v.Message.Message.Cid()] = &v.Message.Message
} }
log.Infof("Processing %d mpool updates", len(msgs)) log.Debugf("Processing %d mpool updates", len(msgs))
err := st.storeMessages(msgs) err := st.storeMessages(msgs)
if err != nil { if err != nil {

View File

@ -1,11 +1,17 @@
package main package main
import ( import (
"context"
"database/sql" "database/sql"
"fmt"
"github.com/filecoin-project/specs-actors/actors/util/adt"
"github.com/libp2p/go-libp2p-core/peer"
"sync" "sync"
"time" "time"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/specs-actors/actors/abi"
miner_spec "github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
_ "github.com/lib/pq" _ "github.com/lib/pq"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -18,6 +24,9 @@ type storage struct {
db *sql.DB db *sql.DB
headerLk sync.Mutex headerLk sync.Mutex
// stateful miner data
minerSectors map[cid.Cid]struct{}
} }
func openStorage(dbSource string) (*storage, error) { func openStorage(dbSource string) (*storage, error) {
@ -28,7 +37,10 @@ func openStorage(dbSource string) (*storage, error) {
db.SetMaxOpenConns(1350) db.SetMaxOpenConns(1350)
st := &storage{db: db} ms := make(map[cid.Cid]struct{})
ms[cid.Undef] = struct{}{}
st := &storage{db: db, minerSectors: ms}
return st, st.setup() return st, st.setup()
} }
@ -252,31 +264,56 @@ create table if not exists receipts
create index if not exists receipts_msg_state_index create index if not exists receipts_msg_state_index
on receipts (msg, state); on receipts (msg, state);
/*
create table if not exists miner_heads create table if not exists miner_sectors
( (
head text not null, miner_id text not null,
addr text not null, sector_id bigint not null,
stateroot text not null,
sectorset text not null, activation_epoch bigint not null,
setsize decimal not null, expiration_epoch bigint not null,
provingset text not null, termination_epoch bigint,
provingsize decimal not null,
owner text not null, deal_weight text not null,
worker text not null, verified_deal_weight text not null,
peerid text not null, seal_cid text not null,
sectorsize bigint not null, seal_rand_epoch bigint not null,
power decimal not null, constraint miner_sectors_pk
active bool, primary key (miner_id, sector_id)
ppe bigint not null,
slashed_at bigint not null,
constraint miner_heads_pk
primary key (head, addr)
); );
create index if not exists miner_heads_stateroot_index create index if not exists miner_sectors_miner_sectorid_index
on miner_heads (stateroot); on miner_sectors (miner_id, sector_id);
create table if not exists miner_info
(
miner_id text not null,
owner_addr text not null,
worker_addr text not null,
peer_id text,
sector_size text not null,
precommit_deposits text not null,
locked_funds text not null,
next_deadline_process_faults bigint not null,
constraint miner_info_pk
primary key (miner_id)
);
/* used to tell when a miners sectors (proven-not-yet-expired) changed if the miner_sectors_cid's are different a new sector was added or removed (terminated/expired) */
create table if not exists miner_sectors_heads
(
miner_id text not null,
miner_sectors_cid text not null,
state_root text not null,
constraint miner_sectors_heads_pk
primary key (miner_id,miner_sectors_cid)
);
/*
create or replace function miner_tips(epoch bigint) create or replace function miner_tips(epoch bigint)
returns table (head text, returns table (head text,
addr text, addr text,
@ -456,54 +493,261 @@ func (st *storage) storeActors(actors map[address.Address]map[types.Actor]actorI
return nil return nil
} }
func (st *storage) storeMiners(miners map[minerKey]*minerInfo) error { type storeSectorsAPI interface {
/*tx, err := st.db.Begin() StateMinerSectors(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error)
}
func (st *storage) storeSectors(minerTips map[types.TipSetKey][]*minerStateInfo, sectorApi storeSectorsAPI) error {
tx, err := st.db.Begin()
if err != nil { if err != nil {
return err return err
} }
if _, err := tx.Exec(` if _, err := tx.Exec(`create temp table ms (like miner_sectors excluding constraints) on commit drop;`); err != nil {
create temp table mh (like miner_heads excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err) return xerrors.Errorf("prep temp: %w", err)
} }
stmt, err := tx.Prepare(`copy mh (head, addr, stateroot, sectorset, setsize, provingset, provingsize, owner, worker, peerid, sectorsize, power, ppe) from STDIN`) stmt, err := tx.Prepare(`copy ms (miner_id, sector_id, activation_epoch, expiration_epoch, deal_weight, verified_deal_weight, seal_cid, seal_rand_epoch) from STDIN`)
if err != nil { if err != nil {
return err return err
} }
for k, i := range miners {
for tipset, miners := range minerTips {
for _, miner := range miners {
sectors, err := sectorApi.StateMinerSectors(context.TODO(), miner.addr, nil, true, tipset)
if err != nil {
log.Debugw("Failed to load sectors", "tipset", tipset.String(), "miner", miner.addr.String(), "error", err)
}
for _, sector := range sectors {
if _, err := stmt.Exec( if _, err := stmt.Exec(
k.act.Head.String(), miner.addr.String(),
k.addr.String(), uint64(sector.ID),
k.stateroot.String(), int64(sector.Info.ActivationEpoch),
i.state.Sectors.String(), int64(sector.Info.Info.Expiration),
fmt.Sprint(i.ssize), sector.Info.DealWeight.String(),
i.state.ProvingSet.String(), sector.Info.VerifiedDealWeight.String(),
fmt.Sprint(i.psize), sector.Info.Info.SealedCID.String(),
i.info.Owner.String(), int64(sector.Info.Info.SealRandEpoch),
i.info.Worker.String(),
i.info.PeerId.String(),
i.info.SectorSize,
i.power.String(), // TODO: SPA
i.state.PoStState.ProvingPeriodStart,
); err != nil { ); err != nil {
return err return err
} }
} }
}
}
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into miner_sectors select * from ms on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
return tx.Commit()
}
func (st *storage) storeMiners(minerTips map[types.TipSetKey][]*minerStateInfo) error {
tx, err := st.db.Begin()
if err != nil {
return err
}
if _, err := tx.Exec(`create temp table mi (like miner_info excluding constraints) on commit drop;`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy mi (miner_id, owner_addr, worker_addr, peer_id, sector_size, precommit_deposits, locked_funds, next_deadline_process_faults) from STDIN`)
if err != nil {
return err
}
for ts, miners := range minerTips {
for _, miner := range miners {
var pid string
if len(miner.info.PeerId) != 0 {
peerid, err := peer.IDFromBytes(miner.info.PeerId)
if err != nil {
// this should "never happen", but if it does we should still store info about the miner.
log.Warnw("failed to decode peerID", "peerID (bytes)", miner.info.PeerId, "miner", miner.addr, "tipset", ts.String())
} else {
pid = peerid.String()
}
}
if _, err := stmt.Exec(
miner.addr.String(),
miner.info.Owner.String(),
miner.info.Worker.String(),
pid,
miner.info.SectorSize.ShortString(),
miner.state.PreCommitDeposits.String(),
miner.state.LockedFunds.String(),
miner.state.NextDeadlineToProcessFaults,
); err != nil {
log.Errorw("failed to store miner state", "state", miner.state, "info", miner.info, "error", err)
return err
}
}
}
if err := stmt.Close(); err != nil { if err := stmt.Close(); err != nil {
return err return err
} }
if _, err := tx.Exec(`insert into miner_heads select * from mh on conflict do nothing `); err != nil { if _, err := tx.Exec(`insert into miner_info select * from mi on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err) return xerrors.Errorf("actor put: %w", err)
} }
return tx.Commit()*/ return tx.Commit()
}
type minerSectorUpdate struct {
minerState *minerStateInfo
tskey types.TipSetKey
oldSector cid.Cid
}
func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*minerStateInfo, api api.FullNode) error {
tx, err := st.db.Begin()
if err != nil {
return err
}
if _, err := tx.Exec(`create temp table msh (like miner_sectors_heads excluding constraints) on commit drop;`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy msh (miner_id, miner_sectors_cid, state_root) from STDIN`)
if err != nil {
return err
}
var updateMiners []*minerSectorUpdate
for tsk, miners := range minerTips {
for _, miner := range miners {
sectorCID, err := st.getLatestMinerSectorCID(context.TODO(), miner.addr)
if err != nil {
panic(err)
}
if sectorCID == cid.Undef {
continue
}
if _, found := st.minerSectors[sectorCID]; !found {
// schedule miner table update
updateMiners = append(updateMiners, &minerSectorUpdate{
minerState: miner,
tskey: tsk,
oldSector: sectorCID,
})
}
st.minerSectors[sectorCID] = struct{}{}
log.Debugw("got sector CID", "miner", miner.addr, "cid", sectorCID.String())
if _, err := stmt.Exec(
miner.addr.String(),
miner.state.Sectors.String(),
miner.stateroot.String(),
); err != nil {
log.Errorw("failed to store miners sectors head", "state", miner.state, "info", miner.info, "error", err)
return err
}
}
}
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into miner_sectors_heads select * from msh on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
if err := tx.Commit(); err != nil {
return err
}
return st.updateMinerSectors(updateMiners, api)
}
type deletedSector struct {
deletedSector miner_spec.SectorOnChainInfo
miner address.Address
tskey types.TipSetKey
}
func (st *storage) updateMinerSectors(miners []*minerSectorUpdate, api api.FullNode) error {
log.Info("updating miners constant sector table")
var deletedSectors []*deletedSector
for _, miner := range miners {
s := &apiIpldStore{context.TODO(), api}
newSectors, err := adt.AsArray(s, miner.minerState.state.Sectors)
if err != nil {
log.Warnw("new sectors as array", "error", err, "cid", miner.minerState.state.Sectors)
return err
}
oldSectors, err := adt.AsArray(s, miner.oldSector)
if err != nil {
log.Warnw("old sectors as array", "error", err, "cid", miner.oldSector.String())
return err
}
var oldSecInfo miner_spec.SectorOnChainInfo
var newSecInfo miner_spec.SectorOnChainInfo
// if we cannot find an old sector in the new list then it was removed.
if err := oldSectors.ForEach(&oldSecInfo, func(i int64) error {
found, err := newSectors.Get(uint64(oldSecInfo.Info.SectorNumber), &newSecInfo)
if err != nil {
log.Warnw("new sectors get", "error", err)
return err
}
if !found {
log.Infow("MINER DELETED SECTOR", "miner", miner.minerState.addr.String(), "sector", oldSecInfo.Info.SectorNumber, "tipset", miner.tskey.String())
deletedSectors = append(deletedSectors, &deletedSector{
deletedSector: oldSecInfo,
miner: miner.minerState.addr,
tskey: miner.tskey,
})
}
return nil return nil
}); err != nil {
log.Warnw("old sectors foreach", "error", err)
return err
}
if len(deletedSectors) > 0 {
log.Infow("Calculated updates", "miner", miner.minerState.addr, "deleted sectors", len(deletedSectors))
}
}
// now we have all the sectors that were removed, update the database
tx, err := st.db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare(`UPDATE miner_sectors SET termination_epoch=$1 WHERE miner_id=$2 AND sector_id=$3`)
if err != nil {
return err
}
for _, ds := range deletedSectors {
ts, err := api.ChainGetTipSet(context.TODO(), ds.tskey)
if err != nil {
log.Warnw("get tipset", "error", err)
return err
}
// TODO validate this shits right
if ts.Height() >= ds.deletedSector.Info.Expiration {
// means it expired, do nothing
log.Infow("expired sector", "miner", ds.miner.String(), "sector", ds.deletedSector.Info.SectorNumber)
continue
}
log.Infow("terminated sector", "miner", ds.miner.String(), "sector", ds.deletedSector.Info.SectorNumber)
// means it was terminated.
if _, err := stmt.Exec(int64(ts.Height()), ds.miner.String(), int64(ds.deletedSector.Info.SectorNumber)); err != nil {
return err
}
}
if err := stmt.Close(); err != nil {
return err
}
defer log.Info("update miner sectors complete")
return tx.Commit()
} }
func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) error { func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) error {
@ -1008,3 +1252,27 @@ func (st *storage) refreshViews() error {
func (st *storage) close() error { func (st *storage) close() error {
return st.db.Close() return st.db.Close()
} }
func (st *storage) getLatestMinerSectorCID(ctx context.Context, miner address.Address) (cid.Cid, error) {
queryStr := fmt.Sprintf(`
SELECT miner_sectors_cid
FROM miner_sectors_heads
LEFT JOIN blocks ON miner_sectors_heads.state_root = blocks.parentstateroot
WHERE miner_id = '%s'
ORDER BY blocks.height DESC
LIMIT 1;
`,
miner.String())
var cidstr string
err := st.db.QueryRowContext(ctx, queryStr).Scan(&cidstr)
switch {
case err == sql.ErrNoRows:
log.Warnf("no miner with miner_id: %s in table", miner)
return cid.Undef, nil
case err != nil:
return cid.Undef, err
default:
return cid.Decode(cidstr)
}
}

View File

@ -5,17 +5,21 @@ import (
"container/list" "container/list"
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"math" "math"
"sync" "sync"
"time"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/ipfs/go-cid" "github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/builtin/power"
"github.com/filecoin-project/specs-actors/actors/util/adt"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/store"
@ -49,18 +53,19 @@ func runSyncer(ctx context.Context, api api.FullNode, st *storage, maxBatch int)
}() }()
} }
type minerKey struct { type minerStateInfo struct {
// common
addr address.Address addr address.Address
act types.Actor act types.Actor
stateroot cid.Cid stateroot cid.Cid
tsKey types.TipSetKey
}
type minerInfo struct { // miner specific
state miner.State state miner.State
info miner.MinerInfo info miner.MinerInfo
power big.Int // tracked by power actor
rawPower big.Int
qalPower big.Int
ssize uint64 ssize uint64
psize uint64 psize uint64
} }
@ -80,25 +85,28 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
log.Infof("Getting headers / actors") log.Infof("Getting headers / actors")
// global list of all blocks that need to be synced
allToSync := map[cid.Cid]*types.BlockHeader{} allToSync := map[cid.Cid]*types.BlockHeader{}
// a stack
toVisit := list.New() toVisit := list.New()
for _, header := range headTs.Blocks() { for _, header := range headTs.Blocks() {
toVisit.PushBack(header) toVisit.PushBack(header)
} }
// TODO consider making a db query to check where syncing left off at in the case of a restart and avoid reprocessing
// those entries, or write value to file on shutdown
// walk the entire chain starting from headTS
for toVisit.Len() > 0 { for toVisit.Len() > 0 {
bh := toVisit.Remove(toVisit.Back()).(*types.BlockHeader) bh := toVisit.Remove(toVisit.Back()).(*types.BlockHeader)
_, has := hazlist[bh.Cid()] _, has := hazlist[bh.Cid()]
if _, seen := allToSync[bh.Cid()]; seen || has { if _, seen := allToSync[bh.Cid()]; seen || has {
continue continue
} }
allToSync[bh.Cid()] = bh allToSync[bh.Cid()] = bh
if len(allToSync)%500 == 10 { if len(allToSync)%500 == 10 {
log.Infof("todo: (%d) %s @%d", len(allToSync), bh.Cid(), bh.Height) log.Debugf("to visit: (%d) %s @%d", len(allToSync), bh.Cid(), bh.Height)
} }
if len(bh.Parents) == 0 { if len(bh.Parents) == 0 {
@ -116,17 +124,25 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
} }
} }
// Main worker loop, this loop runs until all tipse from headTS to genesis have been processed.
for len(allToSync) > 0 { for len(allToSync) > 0 {
// first map is addresses -> common actors states (head, code, balance, nonce)
// second map common actor states -> chain state (tipset, stateroot) & unique actor state (deserialization of their head CID) represented as json.
actors := map[address.Address]map[types.Actor]actorInfo{} actors := map[address.Address]map[types.Actor]actorInfo{}
// map of actor public key address to ID address
addressToID := map[address.Address]address.Address{} addressToID := map[address.Address]address.Address{}
minH := abi.ChainEpoch(math.MaxInt64) minH := abi.ChainEpoch(math.MaxInt64)
// find the blockheader with the lowest height
for _, header := range allToSync { for _, header := range allToSync {
if header.Height < minH { if header.Height < minH {
minH = header.Height minH = header.Height
} }
} }
// toSync maps block cids to their headers and contains all block headers that will be synced in this batch
// `maxBatch` is a tunable parameter to control how many blocks we sync per iteration.
toSync := map[cid.Cid]*types.BlockHeader{} toSync := map[cid.Cid]*types.BlockHeader{}
for c, header := range allToSync { for c, header := range allToSync {
if header.Height < minH+abi.ChainEpoch(maxBatch) { if header.Height < minH+abi.ChainEpoch(maxBatch) {
@ -134,12 +150,16 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
addressToID[header.Miner] = address.Undef addressToID[header.Miner] = address.Undef
} }
} }
// remove everything we are syncing this round from the global list of blocks to sync
for c := range toSync { for c := range toSync {
delete(allToSync, c) delete(allToSync, c)
} }
log.Infof("Syncing %d blocks", len(toSync)) log.Infow("Starting Sync", "height", minH, "numBlocks", len(toSync), "maxBatch", maxBatch)
// map of addresses to changed actors
var changes map[string]types.Actor
// collect all actor state that has changes between block headers
paDone := 0 paDone := 0
parmap.Par(50, parmap.MapArr(toSync), func(bh *types.BlockHeader) { parmap.Par(50, parmap.MapArr(toSync), func(bh *types.BlockHeader) {
paDone++ paDone++
@ -155,6 +175,8 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
return return
} }
// TODO suspicious there is not a lot to be gained by doing this in parallel since the genesis state
// is unlikely to contain a lot of actors, why not for loop here?
parmap.Par(50, aadrs, func(addr address.Address) { parmap.Par(50, aadrs, func(addr address.Address) {
act, err := api.StateGetActor(ctx, addr, genesisTs.Key()) act, err := api.StateGetActor(ctx, addr, genesisTs.Key())
if err != nil { if err != nil {
@ -196,12 +218,15 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
return return
} }
changes, err := api.StateChangedActors(ctx, pts.ParentState(), bh.ParentStateRoot) // TODO Does this return actors that have been deleted between states?
// collect all actors that had state changes between the blockheader parent-state and its grandparent-state.
changes, err = api.StateChangedActors(ctx, pts.ParentState(), bh.ParentStateRoot)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
return return
} }
// record the state of all actors that have changed
for a, act := range changes { for a, act := range changes {
act := act act := act
@ -229,6 +254,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
if !ok { if !ok {
actors[addr] = map[types.Actor]actorInfo{} actors[addr] = map[types.Actor]actorInfo{}
} }
// a change occurred for the actor with address `addr` and state `act` at tipset `pts`.
actors[addr][act] = actorInfo{ actors[addr][act] = actorInfo{
stateroot: bh.ParentStateRoot, stateroot: bh.ParentStateRoot,
state: string(state), state: string(state),
@ -239,6 +265,11 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
} }
}) })
// map of tipset to all miners that had a head-change at that tipset.
minerTips := make(map[types.TipSetKey][]*minerStateInfo, len(changes))
// heads we've seen, im being paranoid
headsSeen := make(map[cid.Cid]struct{}, len(actors))
log.Infof("Getting messages") log.Infof("Getting messages")
msgs, incls := fetchMessages(ctx, api, toSync) msgs, incls := fetchMessages(ctx, api, toSync)
@ -265,57 +296,90 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
log.Infof("Getting miner info") log.Infof("Getting miner info")
miners := map[minerKey]*minerInfo{} minerChanges := 0
for addr, m := range actors { for addr, m := range actors {
for actor, c := range m { for actor, c := range m {
if actor.Code != builtin.StorageMinerActorCodeID { if actor.Code != builtin.StorageMinerActorCodeID {
continue continue
} }
miners[minerKey{ // only want miner actors with head change events
if _, found := headsSeen[actor.Head]; found {
continue
}
minerChanges++
minerTips[c.tsKey] = append(minerTips[c.tsKey], &minerStateInfo{
addr: addr, addr: addr,
act: actor, act: actor,
stateroot: c.stateroot, stateroot: c.stateroot,
tsKey: c.tsKey,
}] = &minerInfo{} state: miner.State{},
info: miner.MinerInfo{},
rawPower: big.Zero(),
qalPower: big.Zero(),
})
headsSeen[actor.Head] = struct{}{}
} }
} }
parmap.Par(50, parmap.KVMapArr(miners), func(it func() (minerKey, *minerInfo)) { minerProcessingStartedAt := time.Now()
k, info := it() log.Infow("Processing miners", "numTips", len(minerTips), "numMinerChanges", minerChanges)
// extract the power actor state at each tipset, loop over all miners that changed at said tipset and extract their
// claims from the power actor state. This ensures we only fetch the power actors state once for each tipset.
parmap.Par(50, parmap.KVMapArr(minerTips), func(it func() (types.TipSetKey, []*minerStateInfo)) {
tsKey, minerInfo := it()
// TODO: get the storage power actors state and and pull the miner power from there, currently this hits the // get the power actors claims map
// storage power actor once for each miner for each tipset, we can do better by just getting it for each tipset mp, err := getPowerActorClaimsMap(ctx, api, tsKey)
// and reading each miner power from the result.
pow, err := api.StateMinerPower(ctx, k.addr, k.tsKey)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
// Not sure why this would fail, but its probably worth continuing return
}
// Get miner raw and quality power
for _, mi := range minerInfo {
var claim power.Claim
// get miner claim from power actors claim map and store if found, else the miner had no claim at
// this tipset
found, err := mp.Get(adt.AddrKey(mi.addr), &claim)
if err != nil {
log.Error(err)
}
if found {
mi.qalPower = claim.QualityAdjPower
mi.rawPower = claim.RawBytePower
} }
info.power = pow.MinerPower.QualityAdjPower
// Get the miner state info
astb, err := api.ChainReadObj(ctx, mi.act.Head)
if err != nil {
log.Error(err)
return
}
if err := mi.state.UnmarshalCBOR(bytes.NewReader(astb)); err != nil {
log.Error(err)
return
}
mi.info = mi.state.Info
}
// TODO Get the Sector Count
// FIXME this is returning a lot of "address not found" errors, which is strange given that StateChangedActors
// retruns all actors that had a state change at tipset `k.tsKey`, maybe its returning deleted miners too??
/*
sszs, err := api.StateMinerSectorCount(ctx, k.addr, k.tsKey) sszs, err := api.StateMinerSectorCount(ctx, k.addr, k.tsKey)
if err != nil { if err != nil {
log.Error(err) info.psize = 0
return info.ssize = 0
} } else {
info.psize = sszs.Pset info.psize = sszs.Pset
info.ssize = sszs.Sset info.ssize = sszs.Sset
astb, err := api.ChainReadObj(ctx, k.act.Head)
if err != nil {
log.Error(err)
return
} }
*/
if err := info.state.UnmarshalCBOR(bytes.NewReader(astb)); err != nil {
log.Error(err)
return
}
info.info = info.state.Info
}) })
log.Infow("Completed Miner Processing", "duration", time.Since(minerProcessingStartedAt).String(), "processed", minerChanges)
log.Info("Getting receipts") log.Info("Getting receipts")
@ -343,8 +407,21 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
} }
log.Info("Storing miners") log.Info("Storing miners")
if err := st.storeMiners(minerTips); err != nil {
log.Error(err)
return
}
if err := st.storeMiners(miners); err != nil { log.Info("Storing miner sectors")
sectorStart := time.Now()
if err := st.storeSectors(minerTips, api); err != nil {
log.Error(err)
return
}
log.Infow("Finished storing miner sectors", "duration", time.Since(sectorStart).String())
log.Info("Storing miner sectors heads")
if err := st.storeMinerSectorsHeads(minerTips, api); err != nil {
log.Error(err) log.Error(err)
return return
} }
@ -465,3 +542,55 @@ func fetchParentReceipts(ctx context.Context, api api.FullNode, toSync map[cid.C
return out return out
} }
// load the power actor state clam as an adt.Map at the tipset `ts`.
func getPowerActorClaimsMap(ctx context.Context, api api.FullNode, ts types.TipSetKey) (*adt.Map, error) {
powerActor, err := api.StateGetActor(ctx, builtin.StoragePowerActorAddr, ts)
if err != nil {
return nil, err
}
powerRaw, err := api.ChainReadObj(ctx, powerActor.Head)
if err != nil {
return nil, err
}
var powerActorState power.State
if err := powerActorState.UnmarshalCBOR(bytes.NewReader(powerRaw)); err != nil {
return nil, fmt.Errorf("failed to unmarshal power actor state: %w", err)
}
s := &apiIpldStore{ctx, api}
return adt.AsMap(s, powerActorState.Claims)
}
// require for AMT and HAMT access
// TODO extract this to a common location in lotus and reuse the code
type apiIpldStore struct {
ctx context.Context
api api.FullNode
}
func (ht *apiIpldStore) Context() context.Context {
return ht.ctx
}
func (ht *apiIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) error {
raw, err := ht.api.ChainReadObj(ctx, c)
if err != nil {
return err
}
cu, ok := out.(cbg.CBORUnmarshaler)
if ok {
if err := cu.UnmarshalCBOR(bytes.NewReader(raw)); err != nil {
return err
}
return nil
}
return fmt.Errorf("Object does not implement CBORUnmarshaler: %T", out)
}
func (ht *apiIpldStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) {
return cid.Undef, fmt.Errorf("Put is not implemented on apiIpldStore")
}

View File

@ -15,6 +15,15 @@ import (
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-amt-ipld/v2" "github.com/filecoin-project/go-amt-ipld/v2"
"github.com/filecoin-project/sector-storage/ffiwrapper"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
samsig "github.com/filecoin-project/specs-actors/actors/builtin/multisig"
"github.com/filecoin-project/specs-actors/actors/builtin/power"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/beacon" "github.com/filecoin-project/lotus/chain/beacon"
@ -27,14 +36,6 @@ import (
"github.com/filecoin-project/lotus/chain/wallet" "github.com/filecoin-project/lotus/chain/wallet"
"github.com/filecoin-project/lotus/lib/bufbstore" "github.com/filecoin-project/lotus/lib/bufbstore"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/sector-storage/ffiwrapper"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
samsig "github.com/filecoin-project/specs-actors/actors/builtin/multisig"
"github.com/filecoin-project/specs-actors/actors/builtin/power"
) )
type StateAPI struct { type StateAPI struct {