From 2c0a4914cff1c60d5cfad8c81f815fef558ce1db Mon Sep 17 00:00:00 2001
From: frrist
Date: Wed, 24 Jun 2020 17:57:51 -0700
Subject: [PATCH] feat: define miner sector schema

- define a miner sector schema that stores miner sector details at each
  tipset at which the miner experiences a state change. This solution stores
  redundant data, since a miner state change (its head CID changing) does not
  necessarily mean that its sectors changed (see the read-pattern sketch
  below).
- makes progress towards sentinel/issues/10
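- a sketch of the intended read pattern (hypothetical reader code, not part
  of this patch; the table and column names come from the schema added
  below):

      // sectorExpirations is a hypothetical helper showing how the new
      // miner_sectors table might be read back: all sector expirations for
      // one miner at one state root.
      func sectorExpirations(db *sql.DB, miner, stateroot string) (map[int64]int64, error) {
          rows, err := db.Query(
              `select sectorid, expiration from miner_sectors where miner = $1 and stateroot = $2`,
              miner, stateroot)
          if err != nil {
              return nil, err
          }
          defer rows.Close()

          out := map[int64]int64{}
          for rows.Next() {
              var id, exp int64
              if err := rows.Scan(&id, &exp); err != nil {
                  return nil, err
              }
              out[id] = exp
          }
          return out, rows.Err()
      }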
---
 cmd/lotus-chainwatch/mpool.go   |   2 +-
 cmd/lotus-chainwatch/storage.go |  75 +++++++++++
 cmd/lotus-chainwatch/sync.go    | 229 ++++++++++++++++++++++++++------
 3 files changed, 264 insertions(+), 42 deletions(-)

diff --git a/cmd/lotus-chainwatch/mpool.go b/cmd/lotus-chainwatch/mpool.go
index ea45380b7..74ffa8771 100644
--- a/cmd/lotus-chainwatch/mpool.go
+++ b/cmd/lotus-chainwatch/mpool.go
@@ -46,7 +46,7 @@ func subMpool(ctx context.Context, api aapi.FullNode, st *storage) {
 			msgs[v.Message.Message.Cid()] = &v.Message.Message
 		}
 
-		log.Infof("Processing %d mpool updates", len(msgs))
+		log.Debugf("Processing %d mpool updates", len(msgs))
 
 		err := st.storeMessages(msgs)
 		if err != nil {
diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go
index f7f80a9c6..dfdde08e3 100644
--- a/cmd/lotus-chainwatch/storage.go
+++ b/cmd/lotus-chainwatch/storage.go
@@ -1,7 +1,9 @@
 package main
 
 import (
+	"context"
 	"database/sql"
+	"github.com/filecoin-project/specs-actors/actors/abi"
 	"sync"
 	"time"
 
@@ -252,6 +254,24 @@ create table if not exists receipts
 create index if not exists receipts_msg_state_index
 	on receipts (msg, state);
 
+
+create table if not exists miner_sectors
+(
+	stateroot text not null,
+	miner text not null,
+	sectorid bigint not null,
+	activation bigint not null,
+	dealweight bigint not null,
+	verifieddealweight bigint not null,
+	expiration bigint not null,
+	sealcid text not null,
+	sealrandepoch bigint not null,
+	constraint miner_sectors_pk
+		primary key (stateroot, miner, sectorid)
+);
+
+create index if not exists miner_sectors_state_index
+	on miner_sectors (stateroot, miner, sectorid);
 /*
 create table if not exists miner_heads
 (
@@ -456,6 +476,61 @@ func (st *storage) storeActors(actors map[address.Address]map[types.Actor]actorI
 	return nil
 }
 
+type storeSectorsAPI interface {
+	StateMinerSectors(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error)
+}
+
+func (st *storage) storeSectors(minerTips map[types.TipSetKey][]*newMinerInfo, sectorApi storeSectorsAPI) error {
+	tx, err := st.db.Begin()
+	if err != nil {
+		return err
+	}
+
+	if _, err := tx.Exec(`create temp table ms (like miner_sectors excluding constraints) on commit drop;`); err != nil {
+		return xerrors.Errorf("prep temp: %w", err)
+	}
+
+	stmt, err := tx.Prepare(`copy ms (stateroot, miner, sectorid, activation, dealweight, verifieddealweight, expiration, sealcid, sealrandepoch) from STDIN `)
+	if err != nil {
+		return err
+	}
+
+	for tipset, miners := range minerTips {
+		for _, miner := range miners {
+			sectors, err := sectorApi.StateMinerSectors(context.TODO(), miner.addr, nil, true, tipset)
+			if err != nil {
+				log.Debugw("Failed to load sectors", "tipset", tipset.String(), "miner", miner.addr.String(), "error", err)
+			}
+
+			for _, sector := range sectors {
+				if _, err := stmt.Exec(
+					miner.stateroot.String(),
+					miner.addr.String(),
+					uint64(sector.ID),
+					int64(sector.Info.ActivationEpoch),
+					sector.Info.DealWeight.Uint64(),
+					sector.Info.VerifiedDealWeight.Uint64(),
+					int64(sector.Info.Info.Expiration),
+					sector.Info.Info.SealedCID.String(),
+					int64(sector.Info.Info.SealRandEpoch),
+				); err != nil {
+					return err
+				}
+			}
+		}
+	}
+
+	if err := stmt.Close(); err != nil {
+		return err
+	}
+
+	if _, err := tx.Exec(`insert into miner_sectors select * from ms on conflict do nothing `); err != nil {
+		return xerrors.Errorf("miner sectors put: %w", err)
+	}
+
+	return tx.Commit()
+}
+
 func (st *storage) storeMiners(miners map[minerKey]*minerInfo) error {
 	/*tx, err := st.db.Begin()
 	if err != nil {
diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go
index 88afb647e..0a0b812d5 100644
--- a/cmd/lotus-chainwatch/sync.go
+++ b/cmd/lotus-chainwatch/sync.go
@@ -5,11 +5,15 @@ import (
 	"container/list"
 	"context"
 	"encoding/json"
-	"math"
-	"sync"
-
+	"fmt"
 	"github.com/filecoin-project/specs-actors/actors/builtin"
 	"github.com/filecoin-project/specs-actors/actors/builtin/miner"
+	"github.com/filecoin-project/specs-actors/actors/builtin/power"
+	"github.com/filecoin-project/specs-actors/actors/util/adt"
+	cbg "github.com/whyrusleeping/cbor-gen"
+	"math"
+	"sync"
+	"time"
 
 	"github.com/filecoin-project/go-address"
 	"github.com/filecoin-project/specs-actors/actors/abi"
@@ -60,9 +64,27 @@ type minerInfo struct {
 	state miner.State
 	info  miner.MinerInfo
 
-	power big.Int
-	ssize uint64
-	psize uint64
+	rawPower big.Int
+	qalPower big.Int
+	ssize    uint64
+	psize    uint64
+}
+
+type newMinerInfo struct {
+	// common
+	addr      address.Address
+	act       types.Actor
+	stateroot cid.Cid
+
+	// miner specific
+	state miner.State
+	info  miner.MinerInfo
+
+	// tracked by power actor
+	rawPower big.Int
+	qalPower big.Int
+	ssize    uint64
+	psize    uint64
 }
 
 type actorInfo struct {
@@ -80,25 +102,28 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
 
 	log.Infof("Getting headers / actors")
 
+	// global list of all blocks that need to be synced
 	allToSync := map[cid.Cid]*types.BlockHeader{}
+	// a stack
 	toVisit := list.New()
 
 	for _, header := range headTs.Blocks() {
 		toVisit.PushBack(header)
 	}
 
+	// TODO consider making a db query to check where syncing left off in the case of a restart and avoid
+	// reprocessing those entries, or write the value to a file on shutdown
+	// walk the entire chain starting from headTs
 	for toVisit.Len() > 0 {
 		bh := toVisit.Remove(toVisit.Back()).(*types.BlockHeader)
-
 		_, has := hazlist[bh.Cid()]
 		if _, seen := allToSync[bh.Cid()]; seen || has {
 			continue
 		}
 
 		allToSync[bh.Cid()] = bh
-
 		if len(allToSync)%500 == 10 {
-			log.Infof("todo: (%d) %s @%d", len(allToSync), bh.Cid(), bh.Height)
+			log.Infof("to visit: (%d) %s @%d", len(allToSync), bh.Cid(), bh.Height)
 		}
 
 		if len(bh.Parents) == 0 {
@@ -116,17 +141,25 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
 		}
 	}
 
+	// Main worker loop; it runs until all tipsets from headTs to genesis have been processed.
 	for len(allToSync) > 0 {
+		// first map: actor address -> common actor state (head, code, balance, nonce);
+		// second map: common actor state -> chain state (tipset, stateroot) and actor-specific state (the deserialization of its head CID) represented as JSON.
 		actors := map[address.Address]map[types.Actor]actorInfo{}
+
+		// map of actor public key address to ID address
 		addressToID := map[address.Address]address.Address{}
 		minH := abi.ChainEpoch(math.MaxInt64)
 
+		// find the blockheader with the lowest height
 		for _, header := range allToSync {
 			if header.Height < minH {
 				minH = header.Height
 			}
 		}
 
+		// toSync maps block cids to their headers and contains all block headers that will be synced in this batch;
+		// `maxBatch` is a tunable parameter controlling how many blocks we sync per iteration.
 		toSync := map[cid.Cid]*types.BlockHeader{}
 		for c, header := range allToSync {
 			if header.Height < minH+abi.ChainEpoch(maxBatch) {
@@ -134,12 +167,14 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
 				toSync[c] = header
 				addressToID[header.Miner] = address.Undef
 			}
 		}
+		// remove everything we are syncing this round from the global list of blocks to sync
 		for c := range toSync {
 			delete(allToSync, c)
 		}
 
-		log.Infof("Syncing %d blocks", len(toSync))
+		log.Infow("Starting Sync", "height", minH, "numBlocks", len(toSync), "maxBatch", maxBatch)
 
+		// collect all actor state that has changed between block headers
 		paDone := 0
 		parmap.Par(50, parmap.MapArr(toSync), func(bh *types.BlockHeader) {
 			paDone++
@@ -155,6 +190,8 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
 					return
 				}
 
+				// TODO there is probably not a lot to be gained by doing this in parallel, since the genesis
+				// state is unlikely to contain many actors; why not a plain for loop here?
 				parmap.Par(50, aadrs, func(addr address.Address) {
 					act, err := api.StateGetActor(ctx, addr, genesisTs.Key())
 					if err != nil {
@@ -195,12 +232,15 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
 				return
 			}
 
+			// TODO does this return actors that have been deleted between states?
+			// collect all actors that had state changes between the blockheader parent-state and its grandparent-state.
 			changes, err := api.StateChangedActors(ctx, pts.ParentState(), bh.ParentStateRoot)
 			if err != nil {
 				log.Error(err)
 				return
 			}
 
+			// record the state of all actors that have changed
 			for a, act := range changes {
 				act := act
 
@@ -227,12 +267,14 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
 				if !ok {
 					actors[addr] = map[types.Actor]actorInfo{}
 				}
+				// a change occurred for the actor with address `addr` and state `act` at tipset `pts`.
 				actors[addr][act] = actorInfo{
 					stateroot: bh.ParentStateRoot,
 					state:     string(state),
 					tsKey:     pts.Key(),
 				}
 				addressToID[addr] = address.Undef
+				// alk.Unlock()
 			}
 		})
 
@@ -263,57 +305,97 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
 
 	log.Infof("Getting miner info")
 
-	miners := map[minerKey]*minerInfo{}
+	// map of tipset to all miners that had a head-change at that tipset.
+	minerTips := map[types.TipSetKey][]*newMinerInfo{}
+	// heads we've seen; guards against processing the same miner head twice
+	headsSeen := map[cid.Cid]bool{}
+
+	minerChanges := 0
 	for addr, m := range actors {
 		for actor, c := range m {
 			if actor.Code != builtin.StorageMinerActorCodeID {
 				continue
 			}
 
-			miners[minerKey{
+			// only want miner actors with head change events
+			if headsSeen[actor.Head] {
+				continue
+			}
+			minerChanges++
+
+			minerTips[c.tsKey] = append(minerTips[c.tsKey], &newMinerInfo{
 				addr:      addr,
 				act:       actor,
 				stateroot: c.stateroot,
-				tsKey:     c.tsKey,
-			}] = &minerInfo{}
+
+				state: miner.State{},
+				info:  miner.MinerInfo{},
+
+				rawPower: big.Zero(),
+				qalPower: big.Zero(),
+				ssize:    0,
+				psize:    0,
+			})
+
+			headsSeen[actor.Head] = true
 		}
 	}
 
-	parmap.Par(50, parmap.KVMapArr(miners), func(it func() (minerKey, *minerInfo)) {
-		k, info := it()
+	minerProcessingState := time.Now()
+	log.Infow("Processing miners", "numTips", len(minerTips), "numMinerChanges", minerChanges)
+	// extract the power actor state at each tipset, then loop over all miners that changed at that tipset and
+	// extract their claims from the power actor state. This ensures we fetch the power actor state only once per tipset.
+	parmap.Par(50, parmap.KVMapArr(minerTips), func(it func() (types.TipSetKey, []*newMinerInfo)) {
+		tsKey, minerInfo := it()
 
-		// TODO: get the storage power actors state and and pull the miner power from there, currently this hits the
-		// storage power actor once for each miner for each tipset, we can do better by just getting it for each tipset
-		// and reading each miner power from the result.
-		pow, err := api.StateMinerPower(ctx, k.addr, k.tsKey)
-		if err != nil {
-			log.Error(err)
-			// Not sure why this would fail, but its probably worth continuing
-		}
-		info.power = pow.MinerPower.QualityAdjPower
-
-		sszs, err := api.StateMinerSectorCount(ctx, k.addr, k.tsKey)
+		// get the power actor's claims map
+		mp, err := getPowerActorClaimsMap(ctx, api, tsKey)
 		if err != nil {
 			log.Error(err)
 			return
 		}
-		info.psize = sszs.Pset
-		info.ssize = sszs.Sset
+		// get miner raw and quality power
+		for _, mi := range minerInfo {
+			var claim power.Claim
+			// get the miner's claim from the power actor's claims map and store it if found; otherwise the
+			// miner had no claim at this tipset
+			found, err := mp.Get(adt.AddrKey(mi.addr), &claim)
+			if err != nil {
+				log.Error(err)
+			}
+			if found {
+				mi.qalPower = claim.QualityAdjPower
+				mi.rawPower = claim.RawBytePower
+			}
 
-		astb, err := api.ChainReadObj(ctx, k.act.Head)
-		if err != nil {
-			log.Error(err)
-			return
+			// get the miner state info
+			astb, err := api.ChainReadObj(ctx, mi.act.Head)
+			if err != nil {
+				log.Error(err)
+				return
+			}
+			if err := mi.state.UnmarshalCBOR(bytes.NewReader(astb)); err != nil {
+				log.Error(err)
+				return
+			}
+			mi.info = mi.state.Info
 		}
 
-		if err := info.state.UnmarshalCBOR(bytes.NewReader(astb)); err != nil {
-			log.Error(err)
-			return
-		}
-
-		info.info = info.state.Info
+		// TODO get the sector count
+		// FIXME this is returning a lot of "address not found" errors, which is strange given that StateChangedActors
+		// returns all actors that had a state change at tipset `k.tsKey`; maybe it is returning deleted miners too?
+		/*
+			sszs, err := api.StateMinerSectorCount(ctx, k.addr, k.tsKey)
+			if err != nil {
+				info.psize = 0
+				info.ssize = 0
+			} else {
+				info.psize = sszs.Pset
+				info.ssize = sszs.Sset
+			}
+		*/
 	})
+	log.Infow("Completed Miner Processing", "duration", time.Since(minerProcessingState).String(), "processed", minerChanges)
 
 	log.Info("Getting receipts")
 
@@ -340,12 +422,24 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
 		return
 	}
 
-	log.Info("Storing miners")
+	// TODO re-enable when ready to fill in miner metadata; the contents of storeMiners are commented out too.
+	/*
+		log.Info("Storing miners")
 
-	if err := st.storeMiners(miners); err != nil {
+		if err := st.storeMiners(miners); err != nil {
+			log.Error(err)
+			return
+		}
+	*/
+
+	log.Info("Storing miner sectors")
+
+	sectorStart := time.Now()
+	if err := st.storeSectors(minerTips, api); err != nil {
 		log.Error(err)
 		return
 	}
+	log.Infow("Finished storing miner sectors", "duration", time.Since(sectorStart).String())
 
 	log.Infof("Storing messages")
 
@@ -463,3 +557,56 @@ func fetchParentReceipts(ctx context.Context, api api.FullNode, toSync map[cid.C
 
 	return out
 }
+
+// load the power actor state claims as an adt.Map at the tipset `ts`.
+func getPowerActorClaimsMap(ctx context.Context, api api.FullNode, ts types.TipSetKey) (*adt.Map, error) {
+	powerActor, err := api.StateGetActor(ctx, builtin.StoragePowerActorAddr, ts)
+	if err != nil {
+		return nil, err
+	}
+
+	powerRaw, err := api.ChainReadObj(ctx, powerActor.Head)
+	if err != nil {
+		return nil, err
+	}
+
+	var powerActorState power.State
+	if err := powerActorState.UnmarshalCBOR(bytes.NewReader(powerRaw)); err != nil {
+		return nil, fmt.Errorf("failed to unmarshal power actor state: %w", err)
+	}
+
+	s := &apiIpldStore{ctx, api}
+	return adt.AsMap(s, powerActorState.Claims)
+}
+
+// required for AMT and HAMT access
+// TODO extract this to a common location in lotus and reuse the code
+type apiIpldStore struct {
+	ctx context.Context
+	api api.FullNode
+}
+
+func (ht *apiIpldStore) Context() context.Context {
+	return ht.ctx
+}
+
+func (ht *apiIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) error {
+	raw, err := ht.api.ChainReadObj(ctx, c)
+	if err != nil {
+		return err
+	}
+
+	cu, ok := out.(cbg.CBORUnmarshaler)
+	if ok {
+		if err := cu.UnmarshalCBOR(bytes.NewReader(raw)); err != nil {
+			return err
+		}
+		return nil
+	}
+
+	return fmt.Errorf("object does not implement CBORUnmarshaler")
+}
+
+func (ht *apiIpldStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) {
+	return cid.Undef, fmt.Errorf("Put is not implemented on apiIpldStore")
+}
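Note (outside the patch): storeSectors takes the narrow storeSectorsAPI
interface rather than a full api.FullNode, which suggests the COPY path is
meant to be testable without a running node. A minimal sketch of a
hypothetical stub satisfying that interface:

    // stubSectorsAPI is a hypothetical test double for storeSectorsAPI: it
    // serves canned sector info per tipset so storeSectors can be exercised
    // against a test database without a lotus node.
    type stubSectorsAPI struct {
        sectors map[types.TipSetKey][]*api.ChainSectorInfo
    }

    func (s *stubSectorsAPI) StateMinerSectors(_ context.Context, _ address.Address, _ *abi.BitField, _ bool, tsk types.TipSetKey) ([]*api.ChainSectorInfo, error) {
        return s.sectors[tsk], nil
    }

With such a stub, st.storeSectors(minerTips, &stubSectorsAPI{...}) exercises
the same temp-table COPY and insert path as production.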