diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index db2b5abfd..9647c9f47 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -294,6 +294,17 @@ create table if not exists miner_info primary key (miner_id) ); +/* +* captures chain-specific power state for any given stateroot +*/ +create table if not exists chain_power +( + stateroot text not null + constraint chain_power_pk + primary key, + baseline_power text not null +); + /* used to tell when a miners sectors (proven-not-yet-expired) changed if the miner_sectors_cid's are different a new sector was added or removed (terminated/expired) */ create table if not exists miner_sectors_heads ( @@ -500,6 +511,46 @@ func (st *storage) storeActors(actors map[address.Address]map[types.Actor]actorI return nil } +// storeChainPower captures reward actor state as it relates to power captured on-chain +func (st *storage) storeChainPower(rewardTips map[types.TipSetKey]*rewardStateInfo) error { + tx, err := st.db.Begin() + if err != nil { + return xerrors.Errorf("begin chain_power tx: %w", err) + } + + if _, err := tx.Exec(`create temp table cp (like chain_power excluding constraints) on commit drop`); err != nil { + return xerrors.Errorf("prep chain_power temp: %w", err) + } + + stmt, err := tx.Prepare(`copy cp (stateroot, baseline_power) from STDIN`) + if err != nil { + return xerrors.Errorf("prepare tmp chain_power: %w", err) + } + + for _, rewardState := range rewardTips { + if _, err := stmt.Exec( + rewardState.stateroot.String(), + rewardState.baselinePower.String(), + ); err != nil { + return xerrors.Errorf("exec prepared chain_power: %w", err) + } + } + + if err := stmt.Close(); err != nil { + return xerrors.Errorf("close prepared chain_power: %w", err) + } + + if _, err := tx.Exec(`insert into chain_power select * from cp on conflict do nothing`); err != nil { + return xerrors.Errorf("insert chain_power from tmp: %w", err) + } + + if err := tx.Commit(); err != nil { + return xerrors.Errorf("commit chain_power tx: %w", err) + } + + return nil +} + type storeSectorsAPI interface { StateMinerSectors(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error) } diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index bbefb3e2c..fe4970ffb 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -13,12 +13,14 @@ import ( "github.com/filecoin-project/go-address" "github.com/ipfs/go-cid" cbg "github.com/whyrusleeping/cbor-gen" + "golang.org/x/xerrors" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/builtin/power" + "github.com/filecoin-project/specs-actors/actors/builtin/reward" "github.com/filecoin-project/specs-actors/actors/util/adt" "github.com/filecoin-project/lotus/api" @@ -53,6 +55,11 @@ func runSyncer(ctx context.Context, api api.FullNode, st *storage, maxBatch int) }() } +type rewardStateInfo struct { + stateroot cid.Cid + baselinePower big.Int +} + type minerStateInfo struct { // common addr address.Address @@ -273,6 +280,8 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. } }) + // map of tipset to reward state + rewardTips := make(map[types.TipSetKey]*rewardStateInfo, len(changes)) // map of tipset to all miners that had a head-change at that tipset. minerTips := make(map[types.TipSetKey][]*minerStateInfo, len(changes)) // heads we've seen, im being paranoid @@ -302,40 +311,74 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. alk.Unlock() }) - log.Infof("Getting miner info") + log.Infof("Getting actor change info") minerChanges := 0 for addr, m := range actors { for actor, c := range m { + // reward actor + if actor.Code != builtin.RewardActorCodeID { + rewardTips[c.tsKey] = &rewardStateInfo{ + stateroot: c.stateroot, + baselinePower: big.Zero(), + } + continue + } + + // miner actors with head change events if actor.Code != builtin.StorageMinerActorCodeID { - continue + if _, found := headsSeen[actor.Head]; found { + continue + } + minerChanges++ + + minerTips[c.tsKey] = append(minerTips[c.tsKey], &minerStateInfo{ + addr: addr, + act: actor, + stateroot: c.stateroot, + + tsKey: c.tsKey, + parentTsKey: c.parentTsKey, + + state: miner.State{}, + info: miner.MinerInfo{}, + + rawPower: big.Zero(), + qalPower: big.Zero(), + }) + + headsSeen[actor.Head] = struct{}{} } - - // only want miner actors with head change events - if _, found := headsSeen[actor.Head]; found { - continue - } - minerChanges++ - - minerTips[c.tsKey] = append(minerTips[c.tsKey], &minerStateInfo{ - addr: addr, - act: actor, - stateroot: c.stateroot, - - tsKey: c.tsKey, - parentTsKey: c.parentTsKey, - - state: miner.State{}, - info: miner.MinerInfo{}, - - rawPower: big.Zero(), - qalPower: big.Zero(), - }) - - headsSeen[actor.Head] = struct{}{} + continue } } + rewardProcessingStartedAt := time.Now() + parmap.Par(50, parmap.KVMapArr(rewardTips), func(it func() (types.TipSetKey, *rewardStateInfo)) { + tsKey, rewardInfo := it() + // get reward actor states at each tipset once for all updates + rewardActor, err := api.StateGetActor(ctx, builtin.RewardActorAddr, tsKey) + if err != nil { + log.Error(xerrors.Errorf("get reward state (@ %s): %w", rewardInfo.stateroot.String(), err)) + return + } + + rewardStateRaw, err := api.ChainReadObj(ctx, rewardActor.Head) + if err != nil { + log.Error(xerrors.Errorf("read state obj (@ %s): %w", rewardInfo.stateroot.String(), err)) + return + } + + var rewardActorState reward.State + if err := rewardActorState.UnmarshalCBOR(bytes.NewReader(rewardStateRaw)); err != nil { + log.Error(xerrors.Errorf("unmarshal state (@ %s): %w", rewardInfo.stateroot.String(), err)) + return + } + + rewardInfo.baselinePower = rewardActorState.BaselinePower + }) + log.Infow("Completed Reward Processing", "duration", time.Since(rewardProcessingStartedAt).String(), "processed", len(rewardTips)) + minerProcessingStartedAt := time.Now() log.Infow("Processing miners", "numTips", len(minerTips), "numMinerChanges", minerChanges) // extract the power actor state at each tipset, loop over all miners that changed at said tipset and extract their @@ -411,25 +454,29 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. } log.Info("Storing actors") - if err := st.storeActors(actors); err != nil { log.Error(err) return } + chainPowerStartedAt := time.Now() + if err := st.storeChainPower(rewardTips); err != nil { + log.Error(err) + } + log.Infow("Stored chain power", "duration", time.Since(chainPowerStartedAt).String()) + log.Info("Storing miners") if err := st.storeMiners(minerTips); err != nil { log.Error(err) return } - log.Info("Storing miner sectors") sectorStart := time.Now() if err := st.storeSectors(minerTips, api); err != nil { log.Error(err) return } - log.Infow("Finished storing miner sectors", "duration", time.Since(sectorStart).String()) + log.Infow("Stored miner sectors", "duration", time.Since(sectorStart).String()) log.Info("Storing miner sectors heads") if err := st.storeMinerSectorsHeads(minerTips, api); err != nil {