From e1c29ca4698735c913f2f244350bdb230ec4dc94 Mon Sep 17 00:00:00 2001 From: frrist Date: Wed, 1 Jul 2020 21:08:45 -0700 Subject: [PATCH] polish: wire up miner state predicate --- cmd/lotus-chainwatch/storage.go | 282 +++++++++++++++++--------------- cmd/lotus-chainwatch/sync.go | 37 +++-- 2 files changed, 177 insertions(+), 142 deletions(-) diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index e68c586b5..db2b5abfd 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -3,20 +3,18 @@ package main import ( "context" "database/sql" - "fmt" - "github.com/filecoin-project/specs-actors/actors/util/adt" - "github.com/libp2p/go-libp2p-core/peer" "sync" "time" "github.com/filecoin-project/go-address" "github.com/filecoin-project/specs-actors/actors/abi" - miner_spec "github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/ipfs/go-cid" _ "github.com/lib/pq" + "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/events/state" "github.com/filecoin-project/lotus/chain/types" ) @@ -25,8 +23,7 @@ type storage struct { headerLk sync.Mutex - // stateful miner data - minerSectors map[cid.Cid]struct{} + genesisTs *types.TipSet } func openStorage(dbSource string) (*storage, error) { @@ -37,10 +34,7 @@ func openStorage(dbSource string) (*storage, error) { db.SetMaxOpenConns(1350) - ms := make(map[cid.Cid]struct{}) - ms[cid.Undef] = struct{}{} - - st := &storage{db: db, minerSectors: ms} + st := &storage{db: db} return st, st.setup() } @@ -313,6 +307,19 @@ create table if not exists miner_sectors_heads ); +create type miner_sector_event_type as enum ('ADDED', 'EXTENDED', 'EXPIRED', 'TERMINATED'); + +create table if not exists miner_sector_events +( + miner_id text not null, + sector_id bigint not null, + state_root text not null, + event miner_sector_event_type not null, + + constraint miner_sector_events_pk + primary key (sector_id, event, miner_id, state_root) +) + /* create or replace function miner_tips(epoch bigint) returns table (head text, @@ -600,12 +607,6 @@ func (st *storage) storeMiners(minerTips map[types.TipSetKey][]*minerStateInfo) return tx.Commit() } -type minerSectorUpdate struct { - minerState *minerStateInfo - tskey types.TipSetKey - oldSector cid.Cid -} - func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*minerStateInfo, api api.FullNode) error { tx, err := st.db.Begin() if err != nil { @@ -621,26 +622,8 @@ func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*miner return err } - var updateMiners []*minerSectorUpdate - for tsk, miners := range minerTips { + for _, miners := range minerTips { for _, miner := range miners { - sectorCID, err := st.getLatestMinerSectorCID(context.TODO(), miner.addr) - if err != nil { - panic(err) - } - if sectorCID == cid.Undef { - continue - } - if _, found := st.minerSectors[sectorCID]; !found { - // schedule miner table update - updateMiners = append(updateMiners, &minerSectorUpdate{ - minerState: miner, - tskey: tsk, - oldSector: sectorCID, - }) - } - st.minerSectors[sectorCID] = struct{}{} - log.Debugw("got sector CID", "miner", miner.addr, "cid", sectorCID.String()) if _, err := stmt.Exec( miner.addr.String(), miner.state.Sectors.String(), @@ -660,94 +643,153 @@ func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*miner return xerrors.Errorf("actor put: %w", err) } - if err := tx.Commit(); err != nil { + return tx.Commit() +} + +type sectorUpdate struct { + terminationEpoch abi.ChainEpoch + terminated bool + + expirationEpoch abi.ChainEpoch + + sectorID abi.SectorNumber + minerID address.Address +} + +func (st *storage) updateMinerSectors(minerTips map[types.TipSetKey][]*minerStateInfo, api api.FullNode) error { + log.Debugw("updating miners constant sector table", "#tipsets", len(minerTips)) + pred := state.NewStatePredicates(api) + + eventTx, err := st.db.Begin() + if err != nil { return err } - return st.updateMinerSectors(updateMiners, api) -} -type deletedSector struct { - deletedSector miner_spec.SectorOnChainInfo - miner address.Address - tskey types.TipSetKey -} + if _, err := eventTx.Exec(`create temp table mse (like miner_sector_events excluding constraints) on commit drop;`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } -func (st *storage) updateMinerSectors(miners []*minerSectorUpdate, api api.FullNode) error { - log.Info("updating miners constant sector table") - var deletedSectors []*deletedSector - for _, miner := range miners { - s := &apiIpldStore{context.TODO(), api} - newSectors, err := adt.AsArray(s, miner.minerState.state.Sectors) - if err != nil { - log.Warnw("new sectors as array", "error", err, "cid", miner.minerState.state.Sectors) - return err + eventStmt, err := eventTx.Prepare(`copy mse (sector_id, event, miner_id, state_root) from STDIN `) + if err != nil { + return err + } + + var sectorUpdates []sectorUpdate + // TODO consider performing the miner sector diffing in parallel and performing the database update after. + for _, miners := range minerTips { + for _, miner := range miners { + // special case genesis miners + if miner.tsKey == st.genesisTs.Key() { + sectors, err := api.StateMinerSectors(context.TODO(), miner.addr, nil, true, miner.tsKey) + if err != nil { + log.Debugw("failed to get miner info for genesis", "miner", miner.addr.String()) + continue + } + + for _, sector := range sectors { + if _, err := eventStmt.Exec(sector.Info.Info.SectorNumber, "ADDED", miner.addr.String(), miner.stateroot.String()); err != nil { + return err + } + } + } else { + sectorDiffFn := pred.OnMinerActorChange(miner.addr, pred.OnMinerSectorChange()) + changed, val, err := sectorDiffFn(context.TODO(), miner.parentTsKey, miner.tsKey) + if err != nil { + log.Debugw("error getting miner sector diff", "miner", miner.addr, "error", err) + continue + } + if !changed { + continue + } + changes := val.(*state.MinerSectorChanges) + log.Debugw("sector changes for miner", "miner", miner.addr.String(), "Added", len(changes.Added), "Extended", len(changes.Extended), "Removed", len(changes.Removed), "oldState", miner.parentTsKey, "newState", miner.tsKey) + + for _, extended := range changes.Extended { + if _, err := eventStmt.Exec(extended.To.Info.SectorNumber, "EXTENDED", miner.addr.String(), miner.stateroot.String()); err != nil { + return err + } + sectorUpdates = append(sectorUpdates, sectorUpdate{ + terminationEpoch: 0, + terminated: false, + expirationEpoch: extended.To.Info.Expiration, + sectorID: extended.To.Info.SectorNumber, + minerID: miner.addr, + }) + log.Debugw("sector extended", "miner", miner.addr.String(), "sector", extended.To.Info.SectorNumber, "old", extended.To.Info.Expiration, "new", extended.From.Info.Expiration) + } + curTs, err := api.ChainGetTipSet(context.TODO(), miner.tsKey) + if err != nil { + return err + } + + for _, removed := range changes.Removed { + // decide if they were terminated or extended + if removed.Info.Expiration > curTs.Height() { + if _, err := eventStmt.Exec(removed.Info.SectorNumber, "TERMINATED", miner.addr.String(), miner.stateroot.String()); err != nil { + return err + } + log.Debugw("sector terminated", "miner", miner.addr.String(), "sector", removed.Info.SectorNumber, "old", "sectorExpiration", removed.Info.Expiration, "terminationEpoch", curTs.Height()) + sectorUpdates = append(sectorUpdates, sectorUpdate{ + terminationEpoch: curTs.Height(), + terminated: true, + expirationEpoch: removed.Info.Expiration, + sectorID: removed.Info.SectorNumber, + minerID: miner.addr, + }) + } + if _, err := eventStmt.Exec(removed.Info.SectorNumber, "EXPIRED", miner.addr.String(), miner.stateroot.String()); err != nil { + return err + } + log.Debugw("sector removed", "miner", miner.addr.String(), "sector", removed.Info.SectorNumber, "old", "sectorExpiration", removed.Info.Expiration, "currEpoch", curTs.Height()) + } + + for _, added := range changes.Added { + if _, err := eventStmt.Exec(miner.addr.String(), added.Info.SectorNumber, miner.stateroot.String(), "ADDED"); err != nil { + return err + } + } + } } + } + if err := eventStmt.Close(); err != nil { + return err + } - oldSectors, err := adt.AsArray(s, miner.oldSector) - if err != nil { - log.Warnw("old sectors as array", "error", err, "cid", miner.oldSector.String()) - return err - } + if _, err := eventTx.Exec(`insert into miner_sector_events select * from mse on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } - var oldSecInfo miner_spec.SectorOnChainInfo - var newSecInfo miner_spec.SectorOnChainInfo - // if we cannot find an old sector in the new list then it was removed. - if err := oldSectors.ForEach(&oldSecInfo, func(i int64) error { - found, err := newSectors.Get(uint64(oldSecInfo.Info.SectorNumber), &newSecInfo) - if err != nil { - log.Warnw("new sectors get", "error", err) + if err := eventTx.Commit(); err != nil { + return err + } + + updateTx, err := st.db.Begin() + if err != nil { + return err + } + + updateStmt, err := updateTx.Prepare(`UPDATE miner_sectors SET termination_epoch=$1, expiration_epoch=$2 WHERE miner_id=$3 AND sector_id=$4`) + if err != nil { + return err + } + + for _, update := range sectorUpdates { + if update.terminated { + if _, err := updateStmt.Exec(update.terminationEpoch, update.expirationEpoch, update.minerID.String(), update.sectorID); err != nil { return err } - if !found { - log.Infow("MINER DELETED SECTOR", "miner", miner.minerState.addr.String(), "sector", oldSecInfo.Info.SectorNumber, "tipset", miner.tskey.String()) - deletedSectors = append(deletedSectors, &deletedSector{ - deletedSector: oldSecInfo, - miner: miner.minerState.addr, - tskey: miner.tskey, - }) + } else { + if _, err := updateStmt.Exec(nil, update.expirationEpoch, update.minerID.String(), update.sectorID); err != nil { + return err } - return nil - }); err != nil { - log.Warnw("old sectors foreach", "error", err) - return err - } - if len(deletedSectors) > 0 { - log.Infow("Calculated updates", "miner", miner.minerState.addr, "deleted sectors", len(deletedSectors)) - } - } - // now we have all the sectors that were removed, update the database - tx, err := st.db.Begin() - if err != nil { - return err - } - stmt, err := tx.Prepare(`UPDATE miner_sectors SET termination_epoch=$1 WHERE miner_id=$2 AND sector_id=$3`) - if err != nil { - return err - } - for _, ds := range deletedSectors { - ts, err := api.ChainGetTipSet(context.TODO(), ds.tskey) - if err != nil { - log.Warnw("get tipset", "error", err) - return err - } - // TODO validate this shits right - if ts.Height() >= ds.deletedSector.Info.Expiration { - // means it expired, do nothing - log.Infow("expired sector", "miner", ds.miner.String(), "sector", ds.deletedSector.Info.SectorNumber) - continue - } - log.Infow("terminated sector", "miner", ds.miner.String(), "sector", ds.deletedSector.Info.SectorNumber) - // means it was terminated. - if _, err := stmt.Exec(int64(ts.Height()), ds.miner.String(), int64(ds.deletedSector.Info.SectorNumber)); err != nil { - return err } } - if err := stmt.Close(); err != nil { + if err := updateStmt.Close(); err != nil { return err } - defer log.Info("update miner sectors complete") - return tx.Commit() + + return updateTx.Commit() } func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) error { @@ -1252,27 +1294,3 @@ func (st *storage) refreshViews() error { func (st *storage) close() error { return st.db.Close() } - -func (st *storage) getLatestMinerSectorCID(ctx context.Context, miner address.Address) (cid.Cid, error) { - queryStr := fmt.Sprintf(` -SELECT miner_sectors_cid -FROM miner_sectors_heads -LEFT JOIN blocks ON miner_sectors_heads.state_root = blocks.parentstateroot -WHERE miner_id = '%s' -ORDER BY blocks.height DESC -LIMIT 1; -`, - miner.String()) - - var cidstr string - err := st.db.QueryRowContext(ctx, queryStr).Scan(&cidstr) - switch { - case err == sql.ErrNoRows: - log.Warnf("no miner with miner_id: %s in table", miner) - return cid.Undef, nil - case err != nil: - return cid.Undef, err - default: - return cid.Decode(cidstr) - } -} diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index 59e77e4a0..bbefb3e2c 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -59,6 +59,10 @@ type minerStateInfo struct { act types.Actor stateroot cid.Cid + // calculating changes + tsKey types.TipSetKey + parentTsKey types.TipSetKey + // miner specific state miner.State info miner.MinerInfo @@ -71,9 +75,10 @@ type minerStateInfo struct { } type actorInfo struct { - stateroot cid.Cid - tsKey types.TipSetKey - state string + stateroot cid.Cid + tsKey types.TipSetKey + parentTsKey types.TipSetKey + state string } func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.TipSet, maxBatch int) { @@ -169,6 +174,8 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. if len(bh.Parents) == 0 { // genesis case genesisTs, _ := types.NewTipSet([]*types.BlockHeader{bh}) + st.genesisTs = genesisTs + aadrs, err := api.StateListActors(ctx, genesisTs.Key()) if err != nil { log.Error(err) @@ -201,9 +208,10 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. actors[addr] = map[types.Actor]actorInfo{} } actors[addr][*act] = actorInfo{ - stateroot: bh.ParentStateRoot, - tsKey: genesisTs.Key(), - state: string(state), + stateroot: bh.ParentStateRoot, + tsKey: genesisTs.Key(), + parentTsKey: genesisTs.Key(), + state: string(state), } addressToID[addr] = address.Undef alk.Unlock() @@ -237,7 +245,6 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. } ast, err := api.StateReadState(ctx, addr, pts.Key()) - if err != nil { log.Error(err) return @@ -256,9 +263,10 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. } // a change occurred for the actor with address `addr` and state `act` at tipset `pts`. actors[addr][act] = actorInfo{ - stateroot: bh.ParentStateRoot, - state: string(state), - tsKey: pts.Key(), + stateroot: bh.ParentStateRoot, + state: string(state), + tsKey: pts.Key(), + parentTsKey: pts.Parents(), } addressToID[addr] = address.Undef alk.Unlock() @@ -314,6 +322,9 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. act: actor, stateroot: c.stateroot, + tsKey: c.tsKey, + parentTsKey: c.parentTsKey, + state: miner.State{}, info: miner.MinerInfo{}, @@ -426,6 +437,12 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. return } + log.Info("updating miner sectors heads") + if err := st.updateMinerSectors(minerTips, api); err != nil { + log.Error(err) + return + } + log.Infof("Storing messages") if err := st.storeMessages(msgs); err != nil {