diff --git a/cmd/lotus-chainwatch/main.go b/cmd/lotus-chainwatch/main.go index 704c4d457..b5ceb7348 100644 --- a/cmd/lotus-chainwatch/main.go +++ b/cmd/lotus-chainwatch/main.go @@ -18,6 +18,9 @@ var log = logging.Logger("chainwatch") func main() { _ = logging.SetLogLevel("*", "INFO") + if err := logging.SetLogLevel("rpc", "error"); err != nil { + panic(err) + } log.Info("Starting chainwatch") diff --git a/cmd/lotus-chainwatch/mpool.go b/cmd/lotus-chainwatch/mpool.go index ea45380b7..74ffa8771 100644 --- a/cmd/lotus-chainwatch/mpool.go +++ b/cmd/lotus-chainwatch/mpool.go @@ -46,7 +46,7 @@ func subMpool(ctx context.Context, api aapi.FullNode, st *storage) { msgs[v.Message.Message.Cid()] = &v.Message.Message } - log.Infof("Processing %d mpool updates", len(msgs)) + log.Debugf("Processing %d mpool updates", len(msgs)) err := st.storeMessages(msgs) if err != nil { diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index f7f80a9c6..e68c586b5 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -1,11 +1,17 @@ package main import ( + "context" "database/sql" + "fmt" + "github.com/filecoin-project/specs-actors/actors/util/adt" + "github.com/libp2p/go-libp2p-core/peer" "sync" "time" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/specs-actors/actors/abi" + miner_spec "github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/ipfs/go-cid" _ "github.com/lib/pq" "golang.org/x/xerrors" @@ -18,6 +24,9 @@ type storage struct { db *sql.DB headerLk sync.Mutex + + // stateful miner data + minerSectors map[cid.Cid]struct{} } func openStorage(dbSource string) (*storage, error) { @@ -28,7 +37,10 @@ func openStorage(dbSource string) (*storage, error) { db.SetMaxOpenConns(1350) - st := &storage{db: db} + ms := make(map[cid.Cid]struct{}) + ms[cid.Undef] = struct{}{} + + st := &storage{db: db, minerSectors: ms} return st, st.setup() } @@ -252,31 +264,56 @@ create table if not exists receipts create index if not exists receipts_msg_state_index on receipts (msg, state); -/* -create table if not exists miner_heads + +create table if not exists miner_sectors ( - head text not null, - addr text not null, - stateroot text not null, - sectorset text not null, - setsize decimal not null, - provingset text not null, - provingsize decimal not null, - owner text not null, - worker text not null, - peerid text not null, - sectorsize bigint not null, - power decimal not null, - active bool, - ppe bigint not null, - slashed_at bigint not null, - constraint miner_heads_pk - primary key (head, addr) + miner_id text not null, + sector_id bigint not null, + + activation_epoch bigint not null, + expiration_epoch bigint not null, + termination_epoch bigint, + + deal_weight text not null, + verified_deal_weight text not null, + seal_cid text not null, + seal_rand_epoch bigint not null, + constraint miner_sectors_pk + primary key (miner_id, sector_id) ); -create index if not exists miner_heads_stateroot_index - on miner_heads (stateroot); +create index if not exists miner_sectors_miner_sectorid_index + on miner_sectors (miner_id, sector_id); +create table if not exists miner_info +( + miner_id text not null, + owner_addr text not null, + worker_addr text not null, + peer_id text, + sector_size text not null, + + precommit_deposits text not null, + locked_funds text not null, + next_deadline_process_faults bigint not null, + constraint miner_info_pk + primary key (miner_id) +); + +/* used to tell when a miners sectors 
(proven but not yet expired) changed: if the miner_sectors_cids differ, a sector was added or removed (terminated/expired) */ +create table if not exists miner_sectors_heads +( + miner_id text not null, + miner_sectors_cid text not null, + + state_root text not null, + + constraint miner_sectors_heads_pk + primary key (miner_id,miner_sectors_cid) + +); + +/* create or replace function miner_tips(epoch bigint) returns table (head text, addr text, @@ -456,54 +493,261 @@ func (st *storage) storeActors(actors map[address.Address]map[types.Actor]actorI return nil } -func (st *storage) storeMiners(miners map[minerKey]*minerInfo) error { - /*tx, err := st.db.Begin() - if err != nil { - return err - } +type storeSectorsAPI interface { + StateMinerSectors(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error) +} - if _, err := tx.Exec(` +func (st *storage) storeSectors(minerTips map[types.TipSetKey][]*minerStateInfo, sectorApi storeSectorsAPI) error { + tx, err := st.db.Begin() + if err != nil { + return err + } - create temp table mh (like miner_heads excluding constraints) on commit drop; + if _, err := tx.Exec(`create temp table ms (like miner_sectors excluding constraints) on commit drop;`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + stmt, err := tx.Prepare(`copy ms (miner_id, sector_id, activation_epoch, expiration_epoch, deal_weight, verified_deal_weight, seal_cid, seal_rand_epoch) from STDIN`) + if err != nil { + return err + } - `); err != nil { - return xerrors.Errorf("prep temp: %w", err) - } + for tipset, miners := range minerTips { + for _, miner := range miners { + sectors, err := sectorApi.StateMinerSectors(context.TODO(), miner.addr, nil, true, tipset) + if err != nil { + log.Debugw("Failed to load sectors", "tipset", tipset.String(), "miner", miner.addr.String(), "error", err) + } - stmt, err := tx.Prepare(`copy mh (head, addr, stateroot, sectorset, setsize, provingset, provingsize, owner, worker, peerid, sectorsize, power, ppe) from STDIN`) - if err != nil { - return err - } - for k, i := range miners { - if _, err := stmt.Exec( - k.act.Head.String(), - k.addr.String(), - k.stateroot.String(), - i.state.Sectors.String(), - fmt.Sprint(i.ssize), - i.state.ProvingSet.String(), - fmt.Sprint(i.psize), - i.info.Owner.String(), - i.info.Worker.String(), - i.info.PeerId.String(), - i.info.SectorSize, - i.power.String(), // TODO: SPA - i.state.PoStState.ProvingPeriodStart, - ); err != nil { - return err + for _, sector := range sectors { + if _, err := stmt.Exec( + miner.addr.String(), + uint64(sector.ID), + int64(sector.Info.ActivationEpoch), + int64(sector.Info.Info.Expiration), + sector.Info.DealWeight.String(), + sector.Info.VerifiedDealWeight.String(), + sector.Info.Info.SealedCID.String(), + int64(sector.Info.Info.SealRandEpoch), + ); err != nil { + return err + } } } - if err := stmt.Close(); err != nil { + } + + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into miner_sectors select * from ms on conflict do nothing `); err != nil { + return xerrors.Errorf("sector put: %w", err) + } + + return tx.Commit() +} + +func (st *storage) storeMiners(minerTips map[types.TipSetKey][]*minerStateInfo) error { + tx, err := st.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(`create temp table mi (like miner_info excluding constraints) on commit drop;`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy mi 
(miner_id, owner_addr, worker_addr, peer_id, sector_size, precommit_deposits, locked_funds, next_deadline_process_faults) from STDIN`) + if err != nil { + return err + } + for ts, miners := range minerTips { + for _, miner := range miners { + var pid string + if len(miner.info.PeerId) != 0 { + peerid, err := peer.IDFromBytes(miner.info.PeerId) + if err != nil { + // this should "never happen", but if it does we should still store info about the miner. + log.Warnw("failed to decode peerID", "peerID (bytes)", miner.info.PeerId, "miner", miner.addr, "tipset", ts.String()) + } else { + pid = peerid.String() + } + } + if _, err := stmt.Exec( + miner.addr.String(), + miner.info.Owner.String(), + miner.info.Worker.String(), + pid, + miner.info.SectorSize.ShortString(), + miner.state.PreCommitDeposits.String(), + miner.state.LockedFunds.String(), + miner.state.NextDeadlineToProcessFaults, + ); err != nil { + log.Errorw("failed to store miner state", "state", miner.state, "info", miner.info, "error", err) + return err + } + + } + } + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into miner_info select * from mi on conflict do nothing `); err != nil { + return xerrors.Errorf("miner info put: %w", err) + } + + return tx.Commit() +} + +type minerSectorUpdate struct { + minerState *minerStateInfo + tskey types.TipSetKey + oldSector cid.Cid +} + +func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*minerStateInfo, api api.FullNode) error { + tx, err := st.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(`create temp table msh (like miner_sectors_heads excluding constraints) on commit drop;`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy msh (miner_id, miner_sectors_cid, state_root) from STDIN`) + if err != nil { + return err + } + + var updateMiners []*minerSectorUpdate + for tsk, miners := range minerTips { + for _, miner := range miners { + sectorCID, err := st.getLatestMinerSectorCID(context.TODO(), miner.addr) + if err != nil { + panic(err) + } + if sectorCID == cid.Undef { + continue + } + if _, found := st.minerSectors[sectorCID]; !found { + // schedule miner table update + updateMiners = append(updateMiners, &minerSectorUpdate{ + minerState: miner, + tskey: tsk, + oldSector: sectorCID, + }) + } + st.minerSectors[sectorCID] = struct{}{} + log.Debugw("got sector CID", "miner", miner.addr, "cid", sectorCID.String()) + if _, err := stmt.Exec( + miner.addr.String(), + miner.state.Sectors.String(), + miner.stateroot.String(), + ); err != nil { + log.Errorw("failed to store miner sectors head", "state", miner.state, "info", miner.info, "error", err) + return err + } + + } + } + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into miner_sectors_heads select * from msh on conflict do nothing `); err != nil { + return xerrors.Errorf("sectors heads put: %w", err) + } + + if err := tx.Commit(); err != nil { + return err + } + return st.updateMinerSectors(updateMiners, api) +} + +type deletedSector struct { + deletedSector miner_spec.SectorOnChainInfo + miner address.Address + tskey types.TipSetKey +} + +func (st *storage) updateMinerSectors(miners []*minerSectorUpdate, api api.FullNode) error { + log.Info("updating miner sectors table") + var deletedSectors []*deletedSector + for _, miner := range miners { + s := &apiIpldStore{context.TODO(), api} + newSectors, err := adt.AsArray(s, miner.minerState.state.Sectors) + if err != nil { + 
log.Warnw("new sectors as array", "error", err, "cid", miner.minerState.state.Sectors) return err } - if _, err := tx.Exec(`insert into miner_heads select * from mh on conflict do nothing `); err != nil { - return xerrors.Errorf("actor put: %w", err) + oldSectors, err := adt.AsArray(s, miner.oldSector) + if err != nil { + log.Warnw("old sectors as array", "error", err, "cid", miner.oldSector.String()) + return err } - return tx.Commit()*/ - return nil + var oldSecInfo miner_spec.SectorOnChainInfo + var newSecInfo miner_spec.SectorOnChainInfo + // if we cannot find an old sector in the new list then it was removed. + if err := oldSectors.ForEach(&oldSecInfo, func(i int64) error { + found, err := newSectors.Get(uint64(oldSecInfo.Info.SectorNumber), &newSecInfo) + if err != nil { + log.Warnw("new sectors get", "error", err) + return err + } + if !found { + log.Infow("MINER DELETED SECTOR", "miner", miner.minerState.addr.String(), "sector", oldSecInfo.Info.SectorNumber, "tipset", miner.tskey.String()) + deletedSectors = append(deletedSectors, &deletedSector{ + deletedSector: oldSecInfo, + miner: miner.minerState.addr, + tskey: miner.tskey, + }) + } + return nil + }); err != nil { + log.Warnw("old sectors foreach", "error", err) + return err + } + if len(deletedSectors) > 0 { + log.Infow("Calculated updates", "miner", miner.minerState.addr, "deleted sectors", len(deletedSectors)) + } + } + // now we have all the sectors that were removed, update the database + tx, err := st.db.Begin() + if err != nil { + return err + } + stmt, err := tx.Prepare(`UPDATE miner_sectors SET termination_epoch=$1 WHERE miner_id=$2 AND sector_id=$3`) + if err != nil { + return err + } + for _, ds := range deletedSectors { + ts, err := api.ChainGetTipSet(context.TODO(), ds.tskey) + if err != nil { + log.Warnw("get tipset", "error", err) + return err + } + // TODO validate this shits right + if ts.Height() >= ds.deletedSector.Info.Expiration { + // means it expired, do nothing + log.Infow("expired sector", "miner", ds.miner.String(), "sector", ds.deletedSector.Info.SectorNumber) + continue + } + log.Infow("terminated sector", "miner", ds.miner.String(), "sector", ds.deletedSector.Info.SectorNumber) + // means it was terminated. 
+ if _, err := stmt.Exec(int64(ts.Height()), ds.miner.String(), int64(ds.deletedSector.Info.SectorNumber)); err != nil { + return err + } + } + + if err := stmt.Close(); err != nil { + return err + } + defer log.Info("update miner sectors complete") + return tx.Commit() } func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) error { @@ -1008,3 +1252,27 @@ func (st *storage) refreshViews() error { func (st *storage) close() error { return st.db.Close() } + +func (st *storage) getLatestMinerSectorCID(ctx context.Context, miner address.Address) (cid.Cid, error) { + queryStr := fmt.Sprintf(` +SELECT miner_sectors_cid +FROM miner_sectors_heads +LEFT JOIN blocks ON miner_sectors_heads.state_root = blocks.parentstateroot +WHERE miner_id = '%s' +ORDER BY blocks.height DESC +LIMIT 1; +`, + miner.String()) + + var cidstr string + err := st.db.QueryRowContext(ctx, queryStr).Scan(&cidstr) + switch { + case err == sql.ErrNoRows: + log.Warnf("no miner with miner_id: %s in table", miner) + return cid.Undef, nil + case err != nil: + return cid.Undef, err + default: + return cid.Decode(cidstr) + } +} diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index 059dcf9d6..59e77e4a0 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -5,17 +5,21 @@ import ( "container/list" "context" "encoding/json" + "fmt" "math" "sync" - - "github.com/filecoin-project/specs-actors/actors/builtin" - "github.com/filecoin-project/specs-actors/actors/builtin/miner" + "time" "github.com/filecoin-project/go-address" + "github.com/ipfs/go-cid" + cbg "github.com/whyrusleeping/cbor-gen" + "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" - - "github.com/ipfs/go-cid" + "github.com/filecoin-project/specs-actors/actors/builtin" + "github.com/filecoin-project/specs-actors/actors/builtin/miner" + "github.com/filecoin-project/specs-actors/actors/builtin/power" + "github.com/filecoin-project/specs-actors/actors/util/adt" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/store" @@ -49,20 +53,21 @@ func runSyncer(ctx context.Context, api api.FullNode, st *storage, maxBatch int) }() } -type minerKey struct { +type minerStateInfo struct { + // common addr address.Address act types.Actor stateroot cid.Cid - tsKey types.TipSetKey -} -type minerInfo struct { + // miner specific state miner.State info miner.MinerInfo - power big.Int - ssize uint64 - psize uint64 + // tracked by power actor + rawPower big.Int + qalPower big.Int + ssize uint64 + psize uint64 } type actorInfo struct { @@ -80,25 +85,28 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. 
log.Infof("Getting headers / actors") + // global list of all blocks that need to be synced allToSync := map[cid.Cid]*types.BlockHeader{} + // a stack toVisit := list.New() for _, header := range headTs.Blocks() { toVisit.PushBack(header) } + // TODO consider making a db query to check where syncing left off at in the case of a restart and avoid reprocessing + // those entries, or write value to file on shutdown + // walk the entire chain starting from headTS for toVisit.Len() > 0 { bh := toVisit.Remove(toVisit.Back()).(*types.BlockHeader) - _, has := hazlist[bh.Cid()] if _, seen := allToSync[bh.Cid()]; seen || has { continue } allToSync[bh.Cid()] = bh - if len(allToSync)%500 == 10 { - log.Infof("todo: (%d) %s @%d", len(allToSync), bh.Cid(), bh.Height) + log.Debugf("to visit: (%d) %s @%d", len(allToSync), bh.Cid(), bh.Height) } if len(bh.Parents) == 0 { @@ -116,17 +124,25 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. } } + // Main worker loop, this loop runs until all tipse from headTS to genesis have been processed. for len(allToSync) > 0 { + // first map is addresses -> common actors states (head, code, balance, nonce) + // second map common actor states -> chain state (tipset, stateroot) & unique actor state (deserialization of their head CID) represented as json. actors := map[address.Address]map[types.Actor]actorInfo{} + + // map of actor public key address to ID address addressToID := map[address.Address]address.Address{} minH := abi.ChainEpoch(math.MaxInt64) + // find the blockheader with the lowest height for _, header := range allToSync { if header.Height < minH { minH = header.Height } } + // toSync maps block cids to their headers and contains all block headers that will be synced in this batch + // `maxBatch` is a tunable parameter to control how many blocks we sync per iteration. toSync := map[cid.Cid]*types.BlockHeader{} for c, header := range allToSync { if header.Height < minH+abi.ChainEpoch(maxBatch) { @@ -134,12 +150,16 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. addressToID[header.Miner] = address.Undef } } + // remove everything we are syncing this round from the global list of blocks to sync for c := range toSync { delete(allToSync, c) } - log.Infof("Syncing %d blocks", len(toSync)) + log.Infow("Starting Sync", "height", minH, "numBlocks", len(toSync), "maxBatch", maxBatch) + // map of addresses to changed actors + var changes map[string]types.Actor + // collect all actor state that has changes between block headers paDone := 0 parmap.Par(50, parmap.MapArr(toSync), func(bh *types.BlockHeader) { paDone++ @@ -155,6 +175,8 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. return } + // TODO suspicious there is not a lot to be gained by doing this in parallel since the genesis state + // is unlikely to contain a lot of actors, why not for loop here? parmap.Par(50, aadrs, func(addr address.Address) { act, err := api.StateGetActor(ctx, addr, genesisTs.Key()) if err != nil { @@ -196,12 +218,15 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. return } - changes, err := api.StateChangedActors(ctx, pts.ParentState(), bh.ParentStateRoot) + // TODO Does this return actors that have been deleted between states? + // collect all actors that had state changes between the blockheader parent-state and its grandparent-state. 
+ changes, err = api.StateChangedActors(ctx, pts.ParentState(), bh.ParentStateRoot) if err != nil { log.Error(err) return } + // record the state of all actors that have changed for a, act := range changes { act := act @@ -229,6 +254,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. if !ok { actors[addr] = map[types.Actor]actorInfo{} } + // a change occurred for the actor with address `addr` and state `act` at tipset `pts`. actors[addr][act] = actorInfo{ stateroot: bh.ParentStateRoot, state: string(state), @@ -239,6 +265,11 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. } }) + // map of tipset to all miners that had a head-change at that tipset. + minerTips := make(map[types.TipSetKey][]*minerStateInfo, len(changes)) + // heads we've seen; I'm being paranoid + headsSeen := make(map[cid.Cid]struct{}, len(actors)) + log.Infof("Getting messages") msgs, incls := fetchMessages(ctx, api, toSync) @@ -265,57 +296,90 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. log.Infof("Getting miner info") - miners := map[minerKey]*minerInfo{} - + minerChanges := 0 for addr, m := range actors { for actor, c := range m { if actor.Code != builtin.StorageMinerActorCodeID { continue } - miners[minerKey{ + // we only want miner actors with head change events + if _, found := headsSeen[actor.Head]; found { + continue + } + minerChanges++ + + minerTips[c.tsKey] = append(minerTips[c.tsKey], &minerStateInfo{ addr: addr, act: actor, stateroot: c.stateroot, - tsKey: c.tsKey, - }] = &minerInfo{} + + state: miner.State{}, + info: miner.MinerInfo{}, + + rawPower: big.Zero(), + qalPower: big.Zero(), + }) + + headsSeen[actor.Head] = struct{}{} } } - parmap.Par(50, parmap.KVMapArr(miners), func(it func() (minerKey, *minerInfo)) { - k, info := it() + minerProcessingStartedAt := time.Now() + log.Infow("Processing miners", "numTips", len(minerTips), "numMinerChanges", minerChanges) + // extract the power actor state at each tipset, loop over all miners that changed at said tipset and extract their + // claims from the power actor state. This ensures we only fetch the power actor's state once for each tipset. + parmap.Par(50, parmap.KVMapArr(minerTips), func(it func() (types.TipSetKey, []*minerStateInfo)) { + tsKey, minerInfo := it() - // TODO: get the storage power actors state and and pull the miner power from there, currently this hits the - // storage power actor once for each miner for each tipset, we can do better by just getting it for each tipset - // and reading each miner power from the result. 
- pow, err := api.StateMinerPower(ctx, k.addr, k.tsKey) - if err != nil { - log.Error(err) - // Not sure why this would fail, but its probably worth continuing - } - info.power = pow.MinerPower.QualityAdjPower - - sszs, err := api.StateMinerSectorCount(ctx, k.addr, k.tsKey) + // get the power actor's claims map + mp, err := getPowerActorClaimsMap(ctx, api, tsKey) if err != nil { log.Error(err) return } - info.psize = sszs.Pset - info.ssize = sszs.Sset + // Get miner raw and quality power + for _, mi := range minerInfo { + var claim power.Claim + // get the miner's claim from the power actor's claims map and store it if found; otherwise the miner had no claim at + // this tipset + found, err := mp.Get(adt.AddrKey(mi.addr), &claim) + if err != nil { + log.Error(err) + } + if found { + mi.qalPower = claim.QualityAdjPower + mi.rawPower = claim.RawBytePower + } - astb, err := api.ChainReadObj(ctx, k.act.Head) - if err != nil { - log.Error(err) - return + // Get the miner state info + astb, err := api.ChainReadObj(ctx, mi.act.Head) + if err != nil { + log.Error(err) + return + } + if err := mi.state.UnmarshalCBOR(bytes.NewReader(astb)); err != nil { + log.Error(err) + return + } + mi.info = mi.state.Info } - if err := info.state.UnmarshalCBOR(bytes.NewReader(astb)); err != nil { - log.Error(err) - return - } - - info.info = info.state.Info + // TODO Get the Sector Count + // FIXME this is returning a lot of "address not found" errors, which is strange given that StateChangedActors + // returns all actors that had a state change at tipset `k.tsKey`; maybe it's returning deleted miners too?? + /* + sszs, err := api.StateMinerSectorCount(ctx, k.addr, k.tsKey) + if err != nil { + info.psize = 0 + info.ssize = 0 + } else { + info.psize = sszs.Pset + info.ssize = sszs.Sset + } + */ }) + log.Infow("Completed Miner Processing", "duration", time.Since(minerProcessingStartedAt).String(), "processed", minerChanges) log.Info("Getting receipts") @@ -343,8 +407,21 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. } log.Info("Storing miners") + if err := st.storeMiners(minerTips); err != nil { + log.Error(err) + return + } - if err := st.storeMiners(miners); err != nil { + log.Info("Storing miner sectors") + sectorStart := time.Now() + if err := st.storeSectors(minerTips, api); err != nil { + log.Error(err) + return + } + log.Infow("Finished storing miner sectors", "duration", time.Since(sectorStart).String()) + + log.Info("Storing miner sectors heads") + if err := st.storeMinerSectorsHeads(minerTips, api); err != nil { log.Error(err) return } @@ -465,3 +542,55 @@ func fetchParentReceipts(ctx context.Context, api api.FullNode, toSync map[cid.C return out } + +// load the power actor state's claims as an adt.Map at the tipset `ts`. 
+func getPowerActorClaimsMap(ctx context.Context, api api.FullNode, ts types.TipSetKey) (*adt.Map, error) { + powerActor, err := api.StateGetActor(ctx, builtin.StoragePowerActorAddr, ts) + if err != nil { + return nil, err + } + + powerRaw, err := api.ChainReadObj(ctx, powerActor.Head) + if err != nil { + return nil, err + } + + var powerActorState power.State + if err := powerActorState.UnmarshalCBOR(bytes.NewReader(powerRaw)); err != nil { + return nil, fmt.Errorf("failed to unmarshal power actor state: %w", err) + } + + s := &apiIpldStore{ctx, api} + return adt.AsMap(s, powerActorState.Claims) +} + +// required for AMT and HAMT access +// TODO extract this to a common location in lotus and reuse the code +type apiIpldStore struct { + ctx context.Context + api api.FullNode +} + +func (ht *apiIpldStore) Context() context.Context { + return ht.ctx +} + +func (ht *apiIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) error { + raw, err := ht.api.ChainReadObj(ctx, c) + if err != nil { + return err + } + + cu, ok := out.(cbg.CBORUnmarshaler) + if ok { + if err := cu.UnmarshalCBOR(bytes.NewReader(raw)); err != nil { + return err + } + return nil + } + return fmt.Errorf("Object does not implement CBORUnmarshaler: %T", out) +} + +func (ht *apiIpldStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) { + return cid.Undef, fmt.Errorf("Put is not implemented on apiIpldStore") +} diff --git a/node/impl/full/state.go b/node/impl/full/state.go index 43e16d34f..e810cf1a4 100644 --- a/node/impl/full/state.go +++ b/node/impl/full/state.go @@ -15,6 +15,15 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-amt-ipld/v2" + "github.com/filecoin-project/sector-storage/ffiwrapper" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/abi/big" + "github.com/filecoin-project/specs-actors/actors/builtin" + "github.com/filecoin-project/specs-actors/actors/builtin/market" + "github.com/filecoin-project/specs-actors/actors/builtin/miner" + samsig "github.com/filecoin-project/specs-actors/actors/builtin/multisig" + "github.com/filecoin-project/specs-actors/actors/builtin/power" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/beacon" @@ -27,14 +36,6 @@ import ( "github.com/filecoin-project/lotus/chain/wallet" "github.com/filecoin-project/lotus/lib/bufbstore" "github.com/filecoin-project/lotus/node/modules/dtypes" - "github.com/filecoin-project/sector-storage/ffiwrapper" - "github.com/filecoin-project/specs-actors/actors/abi" - "github.com/filecoin-project/specs-actors/actors/abi/big" - "github.com/filecoin-project/specs-actors/actors/builtin" - "github.com/filecoin-project/specs-actors/actors/builtin/market" - "github.com/filecoin-project/specs-actors/actors/builtin/miner" - samsig "github.com/filecoin-project/specs-actors/actors/builtin/multisig" - "github.com/filecoin-project/specs-actors/actors/builtin/power" ) type StateAPI struct {
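A minimal read-side sketch (not part of the patch above): it shows how the miner_sectors table introduced in this diff could be queried for sectors that were terminated before their scheduled expiration, i.e. rows whose termination_epoch was set by updateMinerSectors. It assumes the same database/sql + lib/pq stack the chainwatch storage layer already uses; the function name, the placeholder DSN, and the example miner ID are illustrative assumptions only.

package main

import (
	"database/sql"
	"fmt"

	_ "github.com/lib/pq"
)

// terminatedSectors prints sectors for a miner whose termination_epoch is set,
// i.e. sectors removed from the miner's sector set before expiration_epoch.
func terminatedSectors(db *sql.DB, minerID string) error {
	rows, err := db.Query(
		`SELECT sector_id, activation_epoch, expiration_epoch, termination_epoch
		 FROM miner_sectors
		 WHERE miner_id = $1 AND termination_epoch IS NOT NULL
		 ORDER BY sector_id`, minerID)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var sectorID, activation, expiration, termination int64
		if err := rows.Scan(&sectorID, &activation, &expiration, &termination); err != nil {
			return err
		}
		fmt.Printf("sector %d: activated %d, scheduled expiry %d, terminated %d\n",
			sectorID, activation, expiration, termination)
	}
	return rows.Err()
}

func main() {
	// Placeholder DSN; substitute the connection string chainwatch is configured with.
	db, err := sql.Open("postgres", "postgres://user:pass@localhost/chainwatch?sslmode=disable")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// "t01000" is an illustrative miner ID address.
	if err := terminatedSectors(db, "t01000"); err != nil {
		panic(err)
	}
}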