diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index 27cf361a0..e68c586b5 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -3,13 +3,15 @@ package main import ( "context" "database/sql" - "github.com/filecoin-project/specs-actors/actors/abi" + "fmt" "github.com/filecoin-project/specs-actors/actors/util/adt" "github.com/libp2p/go-libp2p-core/peer" "sync" "time" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/specs-actors/actors/abi" + miner_spec "github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/ipfs/go-cid" _ "github.com/lib/pq" "golang.org/x/xerrors" @@ -22,6 +24,9 @@ type storage struct { db *sql.DB headerLk sync.Mutex + + // stateful miner data + minerSectors map[cid.Cid]struct{} } func openStorage(dbSource string) (*storage, error) { @@ -32,7 +37,10 @@ func openStorage(dbSource string) (*storage, error) { db.SetMaxOpenConns(1350) - st := &storage{db: db} + ms := make(map[cid.Cid]struct{}) + ms[cid.Undef] = struct{}{} + + st := &storage{db: db, minerSectors: ms} return st, st.setup() } @@ -261,8 +269,11 @@ create table if not exists miner_sectors ( miner_id text not null, sector_id bigint not null, + activation_epoch bigint not null, expiration_epoch bigint not null, + termination_epoch bigint, + deal_weight text not null, verified_deal_weight text not null, seal_cid text not null, @@ -589,7 +600,13 @@ func (st *storage) storeMiners(minerTips map[types.TipSetKey][]*minerStateInfo) return tx.Commit() } -func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*minerStateInfo) error { +type minerSectorUpdate struct { + minerState *minerStateInfo + tskey types.TipSetKey + oldSector cid.Cid +} + +func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*minerStateInfo, api api.FullNode) error { tx, err := st.db.Begin() if err != nil { return err @@ -604,8 +621,26 @@ func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*miner return err } - for _, miners := range minerTips { + var updateMiners []*minerSectorUpdate + for tsk, miners := range minerTips { for _, miner := range miners { + sectorCID, err := st.getLatestMinerSectorCID(context.TODO(), miner.addr) + if err != nil { + panic(err) + } + if sectorCID == cid.Undef { + continue + } + if _, found := st.minerSectors[sectorCID]; !found { + // schedule miner table update + updateMiners = append(updateMiners, &minerSectorUpdate{ + minerState: miner, + tskey: tsk, + oldSector: sectorCID, + }) + } + st.minerSectors[sectorCID] = struct{}{} + log.Debugw("got sector CID", "miner", miner.addr, "cid", sectorCID.String()) if _, err := stmt.Exec( miner.addr.String(), miner.state.Sectors.String(), @@ -625,6 +660,93 @@ func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*miner return xerrors.Errorf("actor put: %w", err) } + if err := tx.Commit(); err != nil { + return err + } + return st.updateMinerSectors(updateMiners, api) +} + +type deletedSector struct { + deletedSector miner_spec.SectorOnChainInfo + miner address.Address + tskey types.TipSetKey +} + +func (st *storage) updateMinerSectors(miners []*minerSectorUpdate, api api.FullNode) error { + log.Info("updating miners constant sector table") + var deletedSectors []*deletedSector + for _, miner := range miners { + s := &apiIpldStore{context.TODO(), api} + newSectors, err := adt.AsArray(s, miner.minerState.state.Sectors) + if err != nil { + log.Warnw("new sectors as array", "error", err, "cid", miner.minerState.state.Sectors) + return err + } + + oldSectors, err := adt.AsArray(s, miner.oldSector) + if err != nil { + log.Warnw("old sectors as array", "error", err, "cid", miner.oldSector.String()) + return err + } + + var oldSecInfo miner_spec.SectorOnChainInfo + var newSecInfo miner_spec.SectorOnChainInfo + // if we cannot find an old sector in the new list then it was removed. + if err := oldSectors.ForEach(&oldSecInfo, func(i int64) error { + found, err := newSectors.Get(uint64(oldSecInfo.Info.SectorNumber), &newSecInfo) + if err != nil { + log.Warnw("new sectors get", "error", err) + return err + } + if !found { + log.Infow("MINER DELETED SECTOR", "miner", miner.minerState.addr.String(), "sector", oldSecInfo.Info.SectorNumber, "tipset", miner.tskey.String()) + deletedSectors = append(deletedSectors, &deletedSector{ + deletedSector: oldSecInfo, + miner: miner.minerState.addr, + tskey: miner.tskey, + }) + } + return nil + }); err != nil { + log.Warnw("old sectors foreach", "error", err) + return err + } + if len(deletedSectors) > 0 { + log.Infow("Calculated updates", "miner", miner.minerState.addr, "deleted sectors", len(deletedSectors)) + } + } + // now we have all the sectors that were removed, update the database + tx, err := st.db.Begin() + if err != nil { + return err + } + stmt, err := tx.Prepare(`UPDATE miner_sectors SET termination_epoch=$1 WHERE miner_id=$2 AND sector_id=$3`) + if err != nil { + return err + } + for _, ds := range deletedSectors { + ts, err := api.ChainGetTipSet(context.TODO(), ds.tskey) + if err != nil { + log.Warnw("get tipset", "error", err) + return err + } + // TODO validate this shits right + if ts.Height() >= ds.deletedSector.Info.Expiration { + // means it expired, do nothing + log.Infow("expired sector", "miner", ds.miner.String(), "sector", ds.deletedSector.Info.SectorNumber) + continue + } + log.Infow("terminated sector", "miner", ds.miner.String(), "sector", ds.deletedSector.Info.SectorNumber) + // means it was terminated. + if _, err := stmt.Exec(int64(ts.Height()), ds.miner.String(), int64(ds.deletedSector.Info.SectorNumber)); err != nil { + return err + } + } + + if err := stmt.Close(); err != nil { + return err + } + defer log.Info("update miner sectors complete") return tx.Commit() } @@ -1130,3 +1252,27 @@ func (st *storage) refreshViews() error { func (st *storage) close() error { return st.db.Close() } + +func (st *storage) getLatestMinerSectorCID(ctx context.Context, miner address.Address) (cid.Cid, error) { + queryStr := fmt.Sprintf(` +SELECT miner_sectors_cid +FROM miner_sectors_heads +LEFT JOIN blocks ON miner_sectors_heads.state_root = blocks.parentstateroot +WHERE miner_id = '%s' +ORDER BY blocks.height DESC +LIMIT 1; +`, + miner.String()) + + var cidstr string + err := st.db.QueryRowContext(ctx, queryStr).Scan(&cidstr) + switch { + case err == sql.ErrNoRows: + log.Warnf("no miner with miner_id: %s in table", miner) + return cid.Undef, nil + case err != nil: + return cid.Undef, err + default: + return cid.Decode(cidstr) + } +} diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index d67f013e0..96821a262 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -419,7 +419,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. log.Infow("Finished storing miner sectors", "duration", time.Since(sectorStart).String()) log.Info("Storing miner sectors heads") - if err := st.storeMinerSectorsHeads(minerTips); err != nil { + if err := st.storeMinerSectorsHeads(minerTips, api); err != nil { log.Error(err) return }