From 8c54c09ab79742e877b7d97879e9304e453fd630 Mon Sep 17 00:00:00 2001 From: frrist Date: Tue, 30 Jun 2020 13:22:58 -0700 Subject: [PATCH] pair: work from pair with placer --- cmd/lotus-chainwatch/storage.go | 122 ++++++++++++++++++++++---------- cmd/lotus-chainwatch/sync.go | 34 +++------ 2 files changed, 93 insertions(+), 63 deletions(-) diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index e56cc9a9b..27cf361a0 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/util/adt" "github.com/libp2p/go-libp2p-core/peer" "sync" "time" @@ -258,35 +259,47 @@ create index if not exists receipts_msg_state_index create table if not exists miner_sectors ( - miner text not null, - sectorid bigint not null, - activation bigint not null, - dealweight bigint not null, - verifieddealweight bigint not null, - expiration bigint not null, - sealcid text not null, - sealrandepoch bigint not null, + miner_id text not null, + sector_id bigint not null, + activation_epoch bigint not null, + expiration_epoch bigint not null, + deal_weight text not null, + verified_deal_weight text not null, + seal_cid text not null, + seal_rand_epoch bigint not null, constraint miner_sectors_pk - primary key (miner, sectorid) + primary key (miner_id, sector_id) ); create index if not exists miner_sectors_miner_sectorid_index - on miner_sectors (miner, sectorid); + on miner_sectors (miner_id, sector_id); -create table if not exists miner_heads +create table if not exists miner_info ( - addr text not null, - owner text not null, - worker text not null, - peerid text, - sectorsize text not null, - windowpostpartitionsectors bigint not null, + miner_id text not null, + owner_addr text not null, + worker_addr text not null, + peer_id text, + sector_size text not null, - precommitdeposits text not null, - lockedfunds text not null, - nextdeadlineprocessfaults bigint not null, - constraint miner_heads_pk - primary key (addr) + precommit_deposits text not null, + locked_funds text not null, + next_deadline_process_faults bigint not null, + constraint miner_info_pk + primary key (miner_id) +); + +/* used to tell when a miners sectors (proven-not-yet-expired) changed if the miner_sectors_cid's are different a new sector was added or removed (terminated/expired) */ +create table if not exists miner_sectors_heads +( + miner_id text not null, + miner_sectors_cid text not null, + + state_root text not null, + + constraint miner_sectors_heads_pk + primary key (miner_id,miner_sectors_cid) + ); /* @@ -473,7 +486,7 @@ type storeSectorsAPI interface { StateMinerSectors(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error) } -func (st *storage) storeSectors(minerTips map[types.TipSetKey][]*newMinerInfo, sectorApi storeSectorsAPI) error { +func (st *storage) storeSectors(minerTips map[types.TipSetKey][]*minerStateInfo, sectorApi storeSectorsAPI) error { tx, err := st.db.Begin() if err != nil { return err @@ -483,7 +496,7 @@ func (st *storage) storeSectors(minerTips map[types.TipSetKey][]*newMinerInfo, s return xerrors.Errorf("prep temp: %w", err) } - stmt, err := tx.Prepare(`copy ms (miner, sectorid, activation, dealweight, verifieddealweight, expiration, sealcid, sealrandepoch) from STDIN `) + stmt, err := tx.Prepare(`copy ms (miner_id, sector_id, activation_epoch, expiration_epoch, deal_weight, verified_deal_weight, seal_cid, seal_rand_epoch) from STDIN`) if err != nil { return err } @@ -500,9 +513,9 @@ func (st *storage) storeSectors(minerTips map[types.TipSetKey][]*newMinerInfo, s miner.addr.String(), uint64(sector.ID), int64(sector.Info.ActivationEpoch), - sector.Info.DealWeight.Uint64(), - sector.Info.VerifiedDealWeight.Uint64(), int64(sector.Info.Info.Expiration), + sector.Info.DealWeight.String(), + sector.Info.VerifiedDealWeight.String(), sector.Info.Info.SealedCID.String(), int64(sector.Info.Info.SealRandEpoch), ); err != nil { @@ -523,22 +536,17 @@ func (st *storage) storeSectors(minerTips map[types.TipSetKey][]*newMinerInfo, s return tx.Commit() } -func (st *storage) storeMiners(minerTips map[types.TipSetKey][]*newMinerInfo) error { +func (st *storage) storeMiners(minerTips map[types.TipSetKey][]*minerStateInfo) error { tx, err := st.db.Begin() if err != nil { return err } - if _, err := tx.Exec(` - - create temp table mh (like miner_heads excluding constraints) on commit drop; - - - `); err != nil { + if _, err := tx.Exec(`create temp table mi (like miner_info excluding constraints) on commit drop;`); err != nil { return xerrors.Errorf("prep temp: %w", err) } - stmt, err := tx.Prepare(`copy mh (addr, owner, worker, peerid, sectorsize, windowpostpartitionsectors, precommitdeposits, lockedfunds, nextdeadlineprocessfaults) from STDIN`) + stmt, err := tx.Prepare(`copy mi (miner_id, owner_addr, worker_addr, peer_id, sector_size, precommit_deposits, locked_funds, next_deadline_process_faults) from STDIN`) if err != nil { return err } @@ -560,14 +568,11 @@ func (st *storage) storeMiners(minerTips map[types.TipSetKey][]*newMinerInfo) er miner.info.Worker.String(), pid, miner.info.SectorSize.ShortString(), - miner.info.WindowPoStPartitionSectors, - miner.state.PreCommitDeposits.String(), miner.state.LockedFunds.String(), miner.state.NextDeadlineToProcessFaults, ); err != nil { - log.Errorw("failed to store miner state", miner.state) - log.Errorw("failed to store miner info", miner.info) + log.Errorw("failed to store miner state", "state", miner.state, "info", miner.info, "error", err) return err } @@ -577,7 +582,46 @@ func (st *storage) storeMiners(minerTips map[types.TipSetKey][]*newMinerInfo) er return err } - if _, err := tx.Exec(`insert into miner_heads select * from mh on conflict do nothing `); err != nil { + if _, err := tx.Exec(`insert into miner_info select * from mi on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } + + return tx.Commit() +} + +func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*minerStateInfo) error { + tx, err := st.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(`create temp table msh (like miner_sectors_heads excluding constraints) on commit drop;`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy msh (miner_id, miner_sectors_cid, state_root) from STDIN`) + if err != nil { + return err + } + + for _, miners := range minerTips { + for _, miner := range miners { + if _, err := stmt.Exec( + miner.addr.String(), + miner.state.Sectors.String(), + miner.stateroot.String(), + ); err != nil { + log.Errorw("failed to store miners sectors head", "state", miner.state, "info", miner.info, "error", err) + return err + } + + } + } + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into miner_sectors_heads select * from msh on conflict do nothing `); err != nil { return xerrors.Errorf("actor put: %w", err) } diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index 9483113b6..d67f013e0 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -53,24 +53,7 @@ func runSyncer(ctx context.Context, api api.FullNode, st *storage, maxBatch int) }() } -type minerKey struct { - addr address.Address - act types.Actor - stateroot cid.Cid - tsKey types.TipSetKey -} - -type minerInfo struct { - state miner.State - info miner.MinerInfo - - rawPower big.Int - qalPower big.Int - ssize uint64 - psize uint64 -} - -type newMinerInfo struct { +type minerStateInfo struct { // common addr address.Address act types.Actor @@ -281,7 +264,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. }) // map of tipset to all miners that had a head-change at that tipset. - minerTips := make(map[types.TipSetKey][]*newMinerInfo, len(changes)) + minerTips := make(map[types.TipSetKey][]*minerStateInfo, len(changes)) // heads we've seen, im being paranoid headsSeen := make(map[cid.Cid]struct{}, len(actors)) @@ -324,7 +307,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. } minerChanges++ - minerTips[c.tsKey] = append(minerTips[c.tsKey], &newMinerInfo{ + minerTips[c.tsKey] = append(minerTips[c.tsKey], &minerStateInfo{ addr: addr, act: actor, stateroot: c.stateroot, @@ -344,7 +327,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. log.Infow("Processing miners", "numTips", len(minerTips), "numMinerChanges", minerChanges) // extract the power actor state at each tipset, loop over all miners that changed at said tipset and extract their // claims from the power actor state. This ensures we only fetch the power actors state once for each tipset. - parmap.Par(50, parmap.KVMapArr(minerTips), func(it func() (types.TipSetKey, []*newMinerInfo)) { + parmap.Par(50, parmap.KVMapArr(minerTips), func(it func() (types.TipSetKey, []*minerStateInfo)) { tsKey, minerInfo := it() // get the power actors claims map @@ -421,16 +404,13 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. return } - // TODO re-enable when ready to fill miner metadata, the contents of storeMiners is commented out too. log.Info("Storing miners") - if err := st.storeMiners(minerTips); err != nil { log.Error(err) return } log.Info("Storing miner sectors") - sectorStart := time.Now() if err := st.storeSectors(minerTips, api); err != nil { log.Error(err) @@ -438,6 +418,12 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. } log.Infow("Finished storing miner sectors", "duration", time.Since(sectorStart).String()) + log.Info("Storing miner sectors heads") + if err := st.storeMinerSectorsHeads(minerTips); err != nil { + log.Error(err) + return + } + log.Infof("Storing messages") if err := st.storeMessages(msgs); err != nil {