"feat": add logic to update miner sector table on term

This commit is contained in:
frrist 2020-06-30 21:26:46 -07:00
parent 8c54c09ab7
commit e576c7a08e
2 changed files with 151 additions and 5 deletions

View File

@ -3,13 +3,15 @@ package main
import ( import (
"context" "context"
"database/sql" "database/sql"
"github.com/filecoin-project/specs-actors/actors/abi" "fmt"
"github.com/filecoin-project/specs-actors/actors/util/adt" "github.com/filecoin-project/specs-actors/actors/util/adt"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"sync" "sync"
"time" "time"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/specs-actors/actors/abi"
miner_spec "github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
_ "github.com/lib/pq" _ "github.com/lib/pq"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -22,6 +24,9 @@ type storage struct {
db *sql.DB db *sql.DB
headerLk sync.Mutex headerLk sync.Mutex
// stateful miner data
minerSectors map[cid.Cid]struct{}
} }
func openStorage(dbSource string) (*storage, error) { func openStorage(dbSource string) (*storage, error) {
@ -32,7 +37,10 @@ func openStorage(dbSource string) (*storage, error) {
db.SetMaxOpenConns(1350) db.SetMaxOpenConns(1350)
st := &storage{db: db} ms := make(map[cid.Cid]struct{})
ms[cid.Undef] = struct{}{}
st := &storage{db: db, minerSectors: ms}
return st, st.setup() return st, st.setup()
} }
@ -261,8 +269,11 @@ create table if not exists miner_sectors
( (
miner_id text not null, miner_id text not null,
sector_id bigint not null, sector_id bigint not null,
activation_epoch bigint not null, activation_epoch bigint not null,
expiration_epoch bigint not null, expiration_epoch bigint not null,
termination_epoch bigint,
deal_weight text not null, deal_weight text not null,
verified_deal_weight text not null, verified_deal_weight text not null,
seal_cid text not null, seal_cid text not null,
@ -589,7 +600,13 @@ func (st *storage) storeMiners(minerTips map[types.TipSetKey][]*minerStateInfo)
return tx.Commit() return tx.Commit()
} }
func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*minerStateInfo) error { type minerSectorUpdate struct {
minerState *minerStateInfo
tskey types.TipSetKey
oldSector cid.Cid
}
func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*minerStateInfo, api api.FullNode) error {
tx, err := st.db.Begin() tx, err := st.db.Begin()
if err != nil { if err != nil {
return err return err
@ -604,8 +621,26 @@ func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*miner
return err return err
} }
for _, miners := range minerTips { var updateMiners []*minerSectorUpdate
for tsk, miners := range minerTips {
for _, miner := range miners { for _, miner := range miners {
sectorCID, err := st.getLatestMinerSectorCID(context.TODO(), miner.addr)
if err != nil {
panic(err)
}
if sectorCID == cid.Undef {
continue
}
if _, found := st.minerSectors[sectorCID]; !found {
// schedule miner table update
updateMiners = append(updateMiners, &minerSectorUpdate{
minerState: miner,
tskey: tsk,
oldSector: sectorCID,
})
}
st.minerSectors[sectorCID] = struct{}{}
log.Debugw("got sector CID", "miner", miner.addr, "cid", sectorCID.String())
if _, err := stmt.Exec( if _, err := stmt.Exec(
miner.addr.String(), miner.addr.String(),
miner.state.Sectors.String(), miner.state.Sectors.String(),
@ -625,6 +660,93 @@ func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*miner
return xerrors.Errorf("actor put: %w", err) return xerrors.Errorf("actor put: %w", err)
} }
if err := tx.Commit(); err != nil {
return err
}
return st.updateMinerSectors(updateMiners, api)
}
type deletedSector struct {
deletedSector miner_spec.SectorOnChainInfo
miner address.Address
tskey types.TipSetKey
}
func (st *storage) updateMinerSectors(miners []*minerSectorUpdate, api api.FullNode) error {
log.Info("updating miners constant sector table")
var deletedSectors []*deletedSector
for _, miner := range miners {
s := &apiIpldStore{context.TODO(), api}
newSectors, err := adt.AsArray(s, miner.minerState.state.Sectors)
if err != nil {
log.Warnw("new sectors as array", "error", err, "cid", miner.minerState.state.Sectors)
return err
}
oldSectors, err := adt.AsArray(s, miner.oldSector)
if err != nil {
log.Warnw("old sectors as array", "error", err, "cid", miner.oldSector.String())
return err
}
var oldSecInfo miner_spec.SectorOnChainInfo
var newSecInfo miner_spec.SectorOnChainInfo
// if we cannot find an old sector in the new list then it was removed.
if err := oldSectors.ForEach(&oldSecInfo, func(i int64) error {
found, err := newSectors.Get(uint64(oldSecInfo.Info.SectorNumber), &newSecInfo)
if err != nil {
log.Warnw("new sectors get", "error", err)
return err
}
if !found {
log.Infow("MINER DELETED SECTOR", "miner", miner.minerState.addr.String(), "sector", oldSecInfo.Info.SectorNumber, "tipset", miner.tskey.String())
deletedSectors = append(deletedSectors, &deletedSector{
deletedSector: oldSecInfo,
miner: miner.minerState.addr,
tskey: miner.tskey,
})
}
return nil
}); err != nil {
log.Warnw("old sectors foreach", "error", err)
return err
}
if len(deletedSectors) > 0 {
log.Infow("Calculated updates", "miner", miner.minerState.addr, "deleted sectors", len(deletedSectors))
}
}
// now we have all the sectors that were removed, update the database
tx, err := st.db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare(`UPDATE miner_sectors SET termination_epoch=$1 WHERE miner_id=$2 AND sector_id=$3`)
if err != nil {
return err
}
for _, ds := range deletedSectors {
ts, err := api.ChainGetTipSet(context.TODO(), ds.tskey)
if err != nil {
log.Warnw("get tipset", "error", err)
return err
}
// TODO validate this shits right
if ts.Height() >= ds.deletedSector.Info.Expiration {
// means it expired, do nothing
log.Infow("expired sector", "miner", ds.miner.String(), "sector", ds.deletedSector.Info.SectorNumber)
continue
}
log.Infow("terminated sector", "miner", ds.miner.String(), "sector", ds.deletedSector.Info.SectorNumber)
// means it was terminated.
if _, err := stmt.Exec(int64(ts.Height()), ds.miner.String(), int64(ds.deletedSector.Info.SectorNumber)); err != nil {
return err
}
}
if err := stmt.Close(); err != nil {
return err
}
defer log.Info("update miner sectors complete")
return tx.Commit() return tx.Commit()
} }
@ -1130,3 +1252,27 @@ func (st *storage) refreshViews() error {
func (st *storage) close() error { func (st *storage) close() error {
return st.db.Close() return st.db.Close()
} }
func (st *storage) getLatestMinerSectorCID(ctx context.Context, miner address.Address) (cid.Cid, error) {
queryStr := fmt.Sprintf(`
SELECT miner_sectors_cid
FROM miner_sectors_heads
LEFT JOIN blocks ON miner_sectors_heads.state_root = blocks.parentstateroot
WHERE miner_id = '%s'
ORDER BY blocks.height DESC
LIMIT 1;
`,
miner.String())
var cidstr string
err := st.db.QueryRowContext(ctx, queryStr).Scan(&cidstr)
switch {
case err == sql.ErrNoRows:
log.Warnf("no miner with miner_id: %s in table", miner)
return cid.Undef, nil
case err != nil:
return cid.Undef, err
default:
return cid.Decode(cidstr)
}
}

View File

@ -419,7 +419,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
log.Infow("Finished storing miner sectors", "duration", time.Since(sectorStart).String()) log.Infow("Finished storing miner sectors", "duration", time.Since(sectorStart).String())
log.Info("Storing miner sectors heads") log.Info("Storing miner sectors heads")
if err := st.storeMinerSectorsHeads(minerTips); err != nil { if err := st.storeMinerSectorsHeads(minerTips, api); err != nil {
log.Error(err) log.Error(err)
return return
} }