fpost: better fault checks

This commit is contained in:
Łukasz Magiera 2020-01-30 01:50:58 +01:00
parent 03f07042ff
commit 4aaa758543
6 changed files with 93 additions and 32 deletions

View File

@ -108,6 +108,7 @@ type FullNode interface {
StateMinerPeerID(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error) StateMinerPeerID(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error)
StateMinerElectionPeriodStart(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) StateMinerElectionPeriodStart(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error)
StateMinerSectorSize(context.Context, address.Address, *types.TipSet) (uint64, error) StateMinerSectorSize(context.Context, address.Address, *types.TipSet) (uint64, error)
StateMinerFaults(context.Context, address.Address, *types.TipSet) ([]uint64, error)
StatePledgeCollateral(context.Context, *types.TipSet) (types.BigInt, error) StatePledgeCollateral(context.Context, *types.TipSet) (types.BigInt, error)
StateWaitMsg(context.Context, cid.Cid) (*MsgWait, error) StateWaitMsg(context.Context, cid.Cid) (*MsgWait, error)
StateListMiners(context.Context, *types.TipSet) ([]address.Address, error) StateListMiners(context.Context, *types.TipSet) ([]address.Address, error)

View File

@ -99,6 +99,7 @@ type FullNodeStruct struct {
StateMinerPeerID func(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error) `perm:"read"` StateMinerPeerID func(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error) `perm:"read"`
StateMinerElectionPeriodStart func(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) `perm:"read"` StateMinerElectionPeriodStart func(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) `perm:"read"`
StateMinerSectorSize func(context.Context, address.Address, *types.TipSet) (uint64, error) `perm:"read"` StateMinerSectorSize func(context.Context, address.Address, *types.TipSet) (uint64, error) `perm:"read"`
StateMinerFaults func(context.Context, address.Address, *types.TipSet) ([]uint64, error) `perm:"read"`
StateCall func(context.Context, *types.Message, *types.TipSet) (*api.MethodCall, error) `perm:"read"` StateCall func(context.Context, *types.Message, *types.TipSet) (*api.MethodCall, error) `perm:"read"`
StateReplay func(context.Context, *types.TipSet, cid.Cid) (*api.ReplayResults, error) `perm:"read"` StateReplay func(context.Context, *types.TipSet, cid.Cid) (*api.ReplayResults, error) `perm:"read"`
StateGetActor func(context.Context, address.Address, *types.TipSet) (*types.Actor, error) `perm:"read"` StateGetActor func(context.Context, address.Address, *types.TipSet) (*types.Actor, error) `perm:"read"`
@ -410,6 +411,10 @@ func (c *FullNodeStruct) StateMinerSectorSize(ctx context.Context, actor address
return c.Internal.StateMinerSectorSize(ctx, actor, ts) return c.Internal.StateMinerSectorSize(ctx, actor, ts)
} }
func (c *FullNodeStruct) StateMinerFaults(ctx context.Context, actor address.Address, ts *types.TipSet) ([]uint64, error) {
return c.Internal.StateMinerFaults(ctx, actor, ts)
}
func (c *FullNodeStruct) StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*api.MethodCall, error) { func (c *FullNodeStruct) StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*api.MethodCall, error) {
return c.Internal.StateCall(ctx, msg, ts) return c.Internal.StateCall(ctx, msg, ts)
} }

View File

@ -2,6 +2,8 @@ package stmgr
import ( import (
"context" "context"
amt2 "github.com/filecoin-project/go-amt-ipld/v2"
"github.com/filecoin-project/lotus/chain/actors/aerrors"
ffi "github.com/filecoin-project/filecoin-ffi" ffi "github.com/filecoin-project/filecoin-ffi"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder" sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
@ -253,6 +255,21 @@ func GetMinerSlashed(ctx context.Context, sm *StateManager, ts *types.TipSet, ma
return mas.SlashedAt, nil return mas.SlashedAt, nil
} }
func GetMinerFaults(ctx context.Context, sm *StateManager, ts *types.TipSet, maddr address.Address) ([]uint64, error) {
var mas actors.StorageMinerActorState
_, err := sm.LoadActorState(ctx, maddr, &mas, ts)
if err != nil {
return nil, xerrors.Errorf("(get ssize) failed to load miner actor state: %w", err)
}
ss, lerr := amt2.LoadAMT(amt.WrapBlockstore(sm.cs.Blockstore()), mas.Sectors)
if lerr != nil {
return nil, aerrors.HandleExternalError(lerr, "could not load proving set node")
}
return mas.FaultSet.All(2 * ss.Count)
}
func GetStorageDeal(ctx context.Context, sm *StateManager, dealId uint64, ts *types.TipSet) (*actors.OnChainDeal, error) { func GetStorageDeal(ctx context.Context, sm *StateManager, dealId uint64, ts *types.TipSet) (*actors.OnChainDeal, error) {
var state actors.StorageMarketState var state actors.StorageMarketState
if _, err := sm.LoadActorState(ctx, actors.StorageMarketAddress, &state, ts); err != nil { if _, err := sm.LoadActorState(ctx, actors.StorageMarketAddress, &state, ts); err != nil {

View File

@ -84,6 +84,10 @@ func (a *StateAPI) StateMinerSectorSize(ctx context.Context, actor address.Addre
return stmgr.GetMinerSectorSize(ctx, a.StateManager, ts, actor) return stmgr.GetMinerSectorSize(ctx, a.StateManager, ts, actor)
} }
func (a *StateAPI) StateMinerFaults(ctx context.Context, addr address.Address, ts *types.TipSet) ([]uint64, error) {
return stmgr.GetMinerFaults(ctx, a.StateManager, ts, addr)
}
func (a *StateAPI) StatePledgeCollateral(ctx context.Context, ts *types.TipSet) (types.BigInt, error) { func (a *StateAPI) StatePledgeCollateral(ctx context.Context, ts *types.TipSet) (types.BigInt, error) {
param, err := actors.SerializeParams(&actors.PledgeCollateralParams{Size: types.NewInt(0)}) param, err := actors.SerializeParams(&actors.PledgeCollateralParams{Size: types.NewInt(0)})
if err != nil { if err != nil {

View File

@ -50,26 +50,12 @@ func (s *fpostScheduler) doPost(ctx context.Context, eps uint64, ts *types.TipSe
}() }()
} }
func (s *fpostScheduler) checkFaults(ctx context.Context, ssi sectorbuilder.SortedPublicSectorInfo) ([]uint64, error) { func (s *fpostScheduler) declareFaults(ctx context.Context, fc uint64, params *actors.DeclareFaultsParams) error {
faults := s.sb.Scrub(ssi) log.Warnf("DECLARING %d FAULTS", fc)
var faultIDs []uint64
if len(faults) > 0 {
params := &actors.DeclareFaultsParams{Faults: types.NewBitField()}
for _, fault := range faults {
log.Warnf("fault detected: sector %d: %s", fault.SectorID, fault.Err)
faultIDs = append(faultIDs, fault.SectorID)
// TODO: omit already declared (with finality in mind though..)
params.Faults.Set(fault.SectorID)
}
log.Warnf("DECLARING %d FAULTS", len(faults))
enc, aerr := actors.SerializeParams(params) enc, aerr := actors.SerializeParams(params)
if aerr != nil { if aerr != nil {
return nil, xerrors.Errorf("could not serialize declare faults parameters: %w", aerr) return xerrors.Errorf("could not serialize declare faults parameters: %w", aerr)
} }
msg := &types.Message{ msg := &types.Message{
@ -84,18 +70,65 @@ func (s *fpostScheduler) checkFaults(ctx context.Context, ssi sectorbuilder.Sort
sm, err := s.api.MpoolPushMessage(ctx, msg) sm, err := s.api.MpoolPushMessage(ctx, msg)
if err != nil { if err != nil {
return nil, xerrors.Errorf("pushing faults message to mpool: %w", err) return xerrors.Errorf("pushing faults message to mpool: %w", err)
} }
rec, err := s.api.StateWaitMsg(ctx, sm.Cid()) rec, err := s.api.StateWaitMsg(ctx, sm.Cid())
if err != nil { if err != nil {
return nil, xerrors.Errorf("waiting for declare faults: %w", err) return xerrors.Errorf("waiting for declare faults: %w", err)
} }
if rec.Receipt.ExitCode != 0 { if rec.Receipt.ExitCode != 0 {
return nil, xerrors.Errorf("declare faults exit %d", rec.Receipt.ExitCode) return xerrors.Errorf("declare faults exit %d", rec.Receipt.ExitCode)
} }
log.Infof("Faults declared successfully") log.Infof("Faults declared successfully")
return nil
}
func (s *fpostScheduler) checkFaults(ctx context.Context, ssi sectorbuilder.SortedPublicSectorInfo) ([]uint64, error) {
faults := s.sb.Scrub(ssi)
declaredFaults := map[uint64]struct{}{}
{
chainFaults, err := s.api.StateMinerFaults(ctx, s.actor, nil)
if err != nil {
return nil, xerrors.Errorf("checking on-chain faults: %w", err)
}
for _, fault := range chainFaults {
declaredFaults[fault] = struct{}{}
}
}
if len(faults) > 0 {
params := &actors.DeclareFaultsParams{Faults: types.NewBitField()}
for _, fault := range faults {
if _, ok := declaredFaults[fault.SectorID]; ok {
continue
}
log.Warnf("new fault detected: sector %d: %s", fault.SectorID, fault.Err)
declaredFaults[fault.SectorID] = struct{}{}
params.Faults.Set(fault.SectorID)
}
pc, err := params.Faults.Count()
if err != nil {
return nil, xerrors.Errorf("counting faults: %w", err)
}
if pc > 0 {
if err := s.declareFaults(ctx, pc, params); err != nil {
return nil, err
}
}
}
faultIDs := make([]uint64, 0, len(declaredFaults))
for fault := range declaredFaults {
faultIDs = append(faultIDs, fault)
} }
return faultIDs, nil return faultIDs, nil

View File

@ -50,6 +50,7 @@ type storageMinerApi interface {
StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error) StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error)
StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error) StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error)
StateMarketStorageDeal(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error) StateMarketStorageDeal(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error)
StateMinerFaults(context.Context, address.Address, *types.TipSet) ([]uint64, error)
MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error)