polish: wire up miner state predicate

This commit is contained in:
frrist 2020-07-01 21:08:45 -07:00
parent 3c6e46cd70
commit e1c29ca469
2 changed files with 177 additions and 142 deletions

View File

@ -3,20 +3,18 @@ package main
import (
"context"
"database/sql"
"fmt"
"github.com/filecoin-project/specs-actors/actors/util/adt"
"github.com/libp2p/go-libp2p-core/peer"
"sync"
"time"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/specs-actors/actors/abi"
miner_spec "github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/ipfs/go-cid"
_ "github.com/lib/pq"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/events/state"
"github.com/filecoin-project/lotus/chain/types"
)
@ -25,8 +23,7 @@ type storage struct {
headerLk sync.Mutex
// stateful miner data
minerSectors map[cid.Cid]struct{}
genesisTs *types.TipSet
}
func openStorage(dbSource string) (*storage, error) {
@ -37,10 +34,7 @@ func openStorage(dbSource string) (*storage, error) {
db.SetMaxOpenConns(1350)
ms := make(map[cid.Cid]struct{})
ms[cid.Undef] = struct{}{}
st := &storage{db: db, minerSectors: ms}
st := &storage{db: db}
return st, st.setup()
}
@ -313,6 +307,19 @@ create table if not exists miner_sectors_heads
);
create type miner_sector_event_type as enum ('ADDED', 'EXTENDED', 'EXPIRED', 'TERMINATED');
create table if not exists miner_sector_events
(
miner_id text not null,
sector_id bigint not null,
state_root text not null,
event miner_sector_event_type not null,
constraint miner_sector_events_pk
primary key (sector_id, event, miner_id, state_root)
)
/*
create or replace function miner_tips(epoch bigint)
returns table (head text,
@ -600,12 +607,6 @@ func (st *storage) storeMiners(minerTips map[types.TipSetKey][]*minerStateInfo)
return tx.Commit()
}
type minerSectorUpdate struct {
minerState *minerStateInfo
tskey types.TipSetKey
oldSector cid.Cid
}
func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*minerStateInfo, api api.FullNode) error {
tx, err := st.db.Begin()
if err != nil {
@ -621,26 +622,8 @@ func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*miner
return err
}
var updateMiners []*minerSectorUpdate
for tsk, miners := range minerTips {
for _, miners := range minerTips {
for _, miner := range miners {
sectorCID, err := st.getLatestMinerSectorCID(context.TODO(), miner.addr)
if err != nil {
panic(err)
}
if sectorCID == cid.Undef {
continue
}
if _, found := st.minerSectors[sectorCID]; !found {
// schedule miner table update
updateMiners = append(updateMiners, &minerSectorUpdate{
minerState: miner,
tskey: tsk,
oldSector: sectorCID,
})
}
st.minerSectors[sectorCID] = struct{}{}
log.Debugw("got sector CID", "miner", miner.addr, "cid", sectorCID.String())
if _, err := stmt.Exec(
miner.addr.String(),
miner.state.Sectors.String(),
@ -660,94 +643,153 @@ func (st *storage) storeMinerSectorsHeads(minerTips map[types.TipSetKey][]*miner
return xerrors.Errorf("actor put: %w", err)
}
if err := tx.Commit(); err != nil {
return tx.Commit()
}
type sectorUpdate struct {
terminationEpoch abi.ChainEpoch
terminated bool
expirationEpoch abi.ChainEpoch
sectorID abi.SectorNumber
minerID address.Address
}
func (st *storage) updateMinerSectors(minerTips map[types.TipSetKey][]*minerStateInfo, api api.FullNode) error {
log.Debugw("updating miners constant sector table", "#tipsets", len(minerTips))
pred := state.NewStatePredicates(api)
eventTx, err := st.db.Begin()
if err != nil {
return err
}
return st.updateMinerSectors(updateMiners, api)
}
type deletedSector struct {
deletedSector miner_spec.SectorOnChainInfo
miner address.Address
tskey types.TipSetKey
}
if _, err := eventTx.Exec(`create temp table mse (like miner_sector_events excluding constraints) on commit drop;`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
func (st *storage) updateMinerSectors(miners []*minerSectorUpdate, api api.FullNode) error {
log.Info("updating miners constant sector table")
var deletedSectors []*deletedSector
for _, miner := range miners {
s := &apiIpldStore{context.TODO(), api}
newSectors, err := adt.AsArray(s, miner.minerState.state.Sectors)
if err != nil {
log.Warnw("new sectors as array", "error", err, "cid", miner.minerState.state.Sectors)
return err
eventStmt, err := eventTx.Prepare(`copy mse (sector_id, event, miner_id, state_root) from STDIN `)
if err != nil {
return err
}
var sectorUpdates []sectorUpdate
// TODO consider performing the miner sector diffing in parallel and performing the database update after.
for _, miners := range minerTips {
for _, miner := range miners {
// special case genesis miners
if miner.tsKey == st.genesisTs.Key() {
sectors, err := api.StateMinerSectors(context.TODO(), miner.addr, nil, true, miner.tsKey)
if err != nil {
log.Debugw("failed to get miner info for genesis", "miner", miner.addr.String())
continue
}
for _, sector := range sectors {
if _, err := eventStmt.Exec(sector.Info.Info.SectorNumber, "ADDED", miner.addr.String(), miner.stateroot.String()); err != nil {
return err
}
}
} else {
sectorDiffFn := pred.OnMinerActorChange(miner.addr, pred.OnMinerSectorChange())
changed, val, err := sectorDiffFn(context.TODO(), miner.parentTsKey, miner.tsKey)
if err != nil {
log.Debugw("error getting miner sector diff", "miner", miner.addr, "error", err)
continue
}
if !changed {
continue
}
changes := val.(*state.MinerSectorChanges)
log.Debugw("sector changes for miner", "miner", miner.addr.String(), "Added", len(changes.Added), "Extended", len(changes.Extended), "Removed", len(changes.Removed), "oldState", miner.parentTsKey, "newState", miner.tsKey)
for _, extended := range changes.Extended {
if _, err := eventStmt.Exec(extended.To.Info.SectorNumber, "EXTENDED", miner.addr.String(), miner.stateroot.String()); err != nil {
return err
}
sectorUpdates = append(sectorUpdates, sectorUpdate{
terminationEpoch: 0,
terminated: false,
expirationEpoch: extended.To.Info.Expiration,
sectorID: extended.To.Info.SectorNumber,
minerID: miner.addr,
})
log.Debugw("sector extended", "miner", miner.addr.String(), "sector", extended.To.Info.SectorNumber, "old", extended.To.Info.Expiration, "new", extended.From.Info.Expiration)
}
curTs, err := api.ChainGetTipSet(context.TODO(), miner.tsKey)
if err != nil {
return err
}
for _, removed := range changes.Removed {
// decide if they were terminated or extended
if removed.Info.Expiration > curTs.Height() {
if _, err := eventStmt.Exec(removed.Info.SectorNumber, "TERMINATED", miner.addr.String(), miner.stateroot.String()); err != nil {
return err
}
log.Debugw("sector terminated", "miner", miner.addr.String(), "sector", removed.Info.SectorNumber, "old", "sectorExpiration", removed.Info.Expiration, "terminationEpoch", curTs.Height())
sectorUpdates = append(sectorUpdates, sectorUpdate{
terminationEpoch: curTs.Height(),
terminated: true,
expirationEpoch: removed.Info.Expiration,
sectorID: removed.Info.SectorNumber,
minerID: miner.addr,
})
}
if _, err := eventStmt.Exec(removed.Info.SectorNumber, "EXPIRED", miner.addr.String(), miner.stateroot.String()); err != nil {
return err
}
log.Debugw("sector removed", "miner", miner.addr.String(), "sector", removed.Info.SectorNumber, "old", "sectorExpiration", removed.Info.Expiration, "currEpoch", curTs.Height())
}
for _, added := range changes.Added {
if _, err := eventStmt.Exec(miner.addr.String(), added.Info.SectorNumber, miner.stateroot.String(), "ADDED"); err != nil {
return err
}
}
}
}
}
if err := eventStmt.Close(); err != nil {
return err
}
oldSectors, err := adt.AsArray(s, miner.oldSector)
if err != nil {
log.Warnw("old sectors as array", "error", err, "cid", miner.oldSector.String())
return err
}
if _, err := eventTx.Exec(`insert into miner_sector_events select * from mse on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
var oldSecInfo miner_spec.SectorOnChainInfo
var newSecInfo miner_spec.SectorOnChainInfo
// if we cannot find an old sector in the new list then it was removed.
if err := oldSectors.ForEach(&oldSecInfo, func(i int64) error {
found, err := newSectors.Get(uint64(oldSecInfo.Info.SectorNumber), &newSecInfo)
if err != nil {
log.Warnw("new sectors get", "error", err)
if err := eventTx.Commit(); err != nil {
return err
}
updateTx, err := st.db.Begin()
if err != nil {
return err
}
updateStmt, err := updateTx.Prepare(`UPDATE miner_sectors SET termination_epoch=$1, expiration_epoch=$2 WHERE miner_id=$3 AND sector_id=$4`)
if err != nil {
return err
}
for _, update := range sectorUpdates {
if update.terminated {
if _, err := updateStmt.Exec(update.terminationEpoch, update.expirationEpoch, update.minerID.String(), update.sectorID); err != nil {
return err
}
if !found {
log.Infow("MINER DELETED SECTOR", "miner", miner.minerState.addr.String(), "sector", oldSecInfo.Info.SectorNumber, "tipset", miner.tskey.String())
deletedSectors = append(deletedSectors, &deletedSector{
deletedSector: oldSecInfo,
miner: miner.minerState.addr,
tskey: miner.tskey,
})
} else {
if _, err := updateStmt.Exec(nil, update.expirationEpoch, update.minerID.String(), update.sectorID); err != nil {
return err
}
return nil
}); err != nil {
log.Warnw("old sectors foreach", "error", err)
return err
}
if len(deletedSectors) > 0 {
log.Infow("Calculated updates", "miner", miner.minerState.addr, "deleted sectors", len(deletedSectors))
}
}
// now we have all the sectors that were removed, update the database
tx, err := st.db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare(`UPDATE miner_sectors SET termination_epoch=$1 WHERE miner_id=$2 AND sector_id=$3`)
if err != nil {
return err
}
for _, ds := range deletedSectors {
ts, err := api.ChainGetTipSet(context.TODO(), ds.tskey)
if err != nil {
log.Warnw("get tipset", "error", err)
return err
}
// TODO validate this shits right
if ts.Height() >= ds.deletedSector.Info.Expiration {
// means it expired, do nothing
log.Infow("expired sector", "miner", ds.miner.String(), "sector", ds.deletedSector.Info.SectorNumber)
continue
}
log.Infow("terminated sector", "miner", ds.miner.String(), "sector", ds.deletedSector.Info.SectorNumber)
// means it was terminated.
if _, err := stmt.Exec(int64(ts.Height()), ds.miner.String(), int64(ds.deletedSector.Info.SectorNumber)); err != nil {
return err
}
}
if err := stmt.Close(); err != nil {
if err := updateStmt.Close(); err != nil {
return err
}
defer log.Info("update miner sectors complete")
return tx.Commit()
return updateTx.Commit()
}
func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) error {
@ -1252,27 +1294,3 @@ func (st *storage) refreshViews() error {
func (st *storage) close() error {
return st.db.Close()
}
func (st *storage) getLatestMinerSectorCID(ctx context.Context, miner address.Address) (cid.Cid, error) {
queryStr := fmt.Sprintf(`
SELECT miner_sectors_cid
FROM miner_sectors_heads
LEFT JOIN blocks ON miner_sectors_heads.state_root = blocks.parentstateroot
WHERE miner_id = '%s'
ORDER BY blocks.height DESC
LIMIT 1;
`,
miner.String())
var cidstr string
err := st.db.QueryRowContext(ctx, queryStr).Scan(&cidstr)
switch {
case err == sql.ErrNoRows:
log.Warnf("no miner with miner_id: %s in table", miner)
return cid.Undef, nil
case err != nil:
return cid.Undef, err
default:
return cid.Decode(cidstr)
}
}

View File

@ -59,6 +59,10 @@ type minerStateInfo struct {
act types.Actor
stateroot cid.Cid
// calculating changes
tsKey types.TipSetKey
parentTsKey types.TipSetKey
// miner specific
state miner.State
info miner.MinerInfo
@ -71,9 +75,10 @@ type minerStateInfo struct {
}
type actorInfo struct {
stateroot cid.Cid
tsKey types.TipSetKey
state string
stateroot cid.Cid
tsKey types.TipSetKey
parentTsKey types.TipSetKey
state string
}
func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.TipSet, maxBatch int) {
@ -169,6 +174,8 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
if len(bh.Parents) == 0 { // genesis case
genesisTs, _ := types.NewTipSet([]*types.BlockHeader{bh})
st.genesisTs = genesisTs
aadrs, err := api.StateListActors(ctx, genesisTs.Key())
if err != nil {
log.Error(err)
@ -201,9 +208,10 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
actors[addr] = map[types.Actor]actorInfo{}
}
actors[addr][*act] = actorInfo{
stateroot: bh.ParentStateRoot,
tsKey: genesisTs.Key(),
state: string(state),
stateroot: bh.ParentStateRoot,
tsKey: genesisTs.Key(),
parentTsKey: genesisTs.Key(),
state: string(state),
}
addressToID[addr] = address.Undef
alk.Unlock()
@ -237,7 +245,6 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
}
ast, err := api.StateReadState(ctx, addr, pts.Key())
if err != nil {
log.Error(err)
return
@ -256,9 +263,10 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
}
// a change occurred for the actor with address `addr` and state `act` at tipset `pts`.
actors[addr][act] = actorInfo{
stateroot: bh.ParentStateRoot,
state: string(state),
tsKey: pts.Key(),
stateroot: bh.ParentStateRoot,
state: string(state),
tsKey: pts.Key(),
parentTsKey: pts.Parents(),
}
addressToID[addr] = address.Undef
alk.Unlock()
@ -314,6 +322,9 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
act: actor,
stateroot: c.stateroot,
tsKey: c.tsKey,
parentTsKey: c.parentTsKey,
state: miner.State{},
info: miner.MinerInfo{},
@ -426,6 +437,12 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
return
}
log.Info("updating miner sectors heads")
if err := st.updateMinerSectors(minerTips, api); err != nil {
log.Error(err)
return
}
log.Infof("Storing messages")
if err := st.storeMessages(msgs); err != nil {