Implement a way to remove sectors

This commit is contained in:
Łukasz Magiera 2020-06-22 18:42:38 +02:00
parent d8c81e712e
commit 24fd125223
6 changed files with 150 additions and 90 deletions

30
fsm.go
View File

@ -32,6 +32,8 @@ func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface
}
var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *SectorInfo) error{
// Sealing
UndefinedSectorState: planOne(on(SectorStart{}, Packing)),
Packing: planOne(on(SectorPacked{}, PreCommit1)),
PreCommit1: planOne(
@ -69,10 +71,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorFinalizeFailed{}, FinalizeFailed),
),
Proving: planOne(
on(SectorFaultReported{}, FaultReported),
on(SectorFaulty{}, Faulty),
),
// Sealing errors
SealPreCommit1Failed: planOne(
on(SectorRetrySealPreCommit1{}, PreCommit1),
@ -102,10 +101,23 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorRetryFinalize{}, FinalizeSector),
),
// Post-seal
Proving: planOne(
on(SectorFaultReported{}, FaultReported),
on(SectorFaulty{}, Faulty),
on(SectorRemove{}, Removing),
),
Removing: planOne(
on(SectorRemoved{}, Removed),
on(SectorRemoveFailed{}, RemoveFailed),
),
Faulty: planOne(
on(SectorFaultReported{}, FaultReported),
),
FaultedFinal: final,
Removed: final,
}
func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, error) {
@ -207,9 +219,6 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
return m.handleCommitWait, nil
case FinalizeSector:
return m.handleFinalizeSector, nil
case Proving:
// TODO: track sector health / expiration
log.Infof("Proving sector %d", state.SectorNumber)
// Handled failure modes
case SealPreCommit1Failed:
@ -225,6 +234,13 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
case FinalizeFailed:
return m.handleFinalizeFailed, nil
// Post-seal
case Proving:
// TODO: track sector health / expiration
log.Infof("Proving sector %d", state.SectorNumber)
case Removing:
return m.handleRemoving, nil
// Faults
case Faulty:
return m.handleFaulty, nil

View File

@ -230,3 +230,18 @@ func (evt SectorFaultReported) apply(state *SectorInfo) {
}
type SectorFaultedFinal struct{}
// External events
type SectorRemove struct{}
func (evt SectorRemove) apply(state *SectorInfo) {}
type SectorRemoved struct{}
func (evt SectorRemoved) apply(state *SectorInfo) {}
type SectorRemoveFailed struct{ error }
func (evt SectorRemoveFailed) FormatError(xerrors.Printer) (next error) { return evt.error }
func (evt SectorRemoveFailed) apply(*SectorInfo) {}

View File

@ -138,6 +138,10 @@ func (m *Sealing) newSector(sid abi.SectorNumber, rt abi.RegisteredSealProof, pi
})
}
func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error {
return m.sectors.Send(uint64(sid), SectorRemove{})
}
func (m *Sealing) minerSector(num abi.SectorNumber) abi.SectorID {
mid, err := address.IDFromAddress(m.maddr)
if err != nil {

View File

@ -30,4 +30,8 @@ const (
Faulty SectorState = "Faulty" // sector is corrupted or gone for some reason
FaultReported SectorState = "FaultReported" // sector has been declared as a fault on chain
FaultedFinal SectorState = "FaultedFinal" // fault declared on chain
Removing SectorState = "Removing"
RemoveFailed SectorState = "RemoveFailed"
Removed SectorState = "Removed"
)

104
states_proving.go Normal file
View File

@ -0,0 +1,104 @@
package sealing
import (
"bytes"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
)
func (m *Sealing) handleFaulty(ctx statemachine.Context, sector SectorInfo) error {
// TODO: check if the fault has already been reported, and that this sector is even valid
// TODO: coalesce faulty sector reporting
// TODO: ReportFaultFailed
bf := abi.NewBitField()
bf.Set(uint64(sector.SectorNumber))
deadlines, err := m.api.StateMinerDeadlines(ctx.Context(), m.maddr, nil)
if err != nil {
log.Errorf("handleFaulty: api error, not proceeding: %+v", err)
return nil
}
deadline := -1
for d, field := range deadlines.Due {
set, err := field.IsSet(uint64(sector.SectorNumber))
if err != nil {
return err
}
if set {
deadline = d
break
}
}
if deadline == -1 {
log.Errorf("handleFaulty: deadline not found")
return nil
}
params := &miner.DeclareFaultsParams{
Faults: []miner.FaultDeclaration{
{
Deadline: uint64(deadline),
Sectors: bf,
},
},
}
enc := new(bytes.Buffer)
if err := params.MarshalCBOR(enc); err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("failed to serialize declare fault params: %w", err)})
}
tok, _, err := m.api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleFaulty: api error, not proceeding: %+v", err)
return nil
}
waddr, err := m.api.StateMinerWorkerAddress(ctx.Context(), m.maddr, tok)
if err != nil {
log.Errorf("handleFaulty: api error, not proceeding: %+v", err)
return nil
}
mcid, err := m.api.SendMsg(ctx.Context(), waddr, m.maddr, builtin.MethodsMiner.DeclareFaults, big.NewInt(0), big.NewInt(1), 1000000, enc.Bytes())
if err != nil {
return xerrors.Errorf("failed to push declare faults message to network: %w", err)
}
return ctx.Send(SectorFaultReported{reportMsg: mcid})
}
func (m *Sealing) handleFaultReported(ctx statemachine.Context, sector SectorInfo) error {
if sector.FaultReportMsg == nil {
return xerrors.Errorf("entered fault reported state without a FaultReportMsg cid")
}
mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.FaultReportMsg)
if err != nil {
return xerrors.Errorf("failed to wait for fault declaration: %w", err)
}
if mw.Receipt.ExitCode != 0 {
log.Errorf("UNHANDLED: declaring sector fault failed (exit=%d, msg=%s) (id: %d)", mw.Receipt.ExitCode, *sector.FaultReportMsg, sector.SectorNumber)
return xerrors.Errorf("UNHANDLED: submitting fault declaration failed (exit %d)", mw.Receipt.ExitCode)
}
return ctx.Send(SectorFaultedFinal{})
}
func (m *Sealing) handleRemoving(ctx statemachine.Context, sector SectorInfo) error {
if err := m.sealer.Remove(ctx.Context(), m.minerSector(sector.SectorNumber)); err != nil {
return ctx.Send(SectorRemoveFailed{err})
}
return ctx.Send(SectorRemoved{})
}

View File

@ -338,86 +338,3 @@ func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorIn
return ctx.Send(SectorFinalized{})
}
func (m *Sealing) handleFaulty(ctx statemachine.Context, sector SectorInfo) error {
// TODO: check if the fault has already been reported, and that this sector is even valid
// TODO: coalesce faulty sector reporting
// TODO: ReportFaultFailed
bf := abi.NewBitField()
bf.Set(uint64(sector.SectorNumber))
deadlines, err := m.api.StateMinerDeadlines(ctx.Context(), m.maddr, nil)
if err != nil {
log.Errorf("handleFaulty: api error, not proceeding: %+v", err)
return nil
}
deadline := -1
for d, field := range deadlines.Due {
set, err := field.IsSet(uint64(sector.SectorNumber))
if err != nil {
return err
}
if set {
deadline = d
break
}
}
if deadline == -1 {
log.Errorf("handleFaulty: deadline not found")
return nil
}
params := &miner.DeclareFaultsParams{
Faults: []miner.FaultDeclaration{
{
Deadline: uint64(deadline),
Sectors: bf,
},
},
}
enc := new(bytes.Buffer)
if err := params.MarshalCBOR(enc); err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("failed to serialize declare fault params: %w", err)})
}
tok, _, err := m.api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleFaulty: api error, not proceeding: %+v", err)
return nil
}
waddr, err := m.api.StateMinerWorkerAddress(ctx.Context(), m.maddr, tok)
if err != nil {
log.Errorf("handleFaulty: api error, not proceeding: %+v", err)
return nil
}
mcid, err := m.api.SendMsg(ctx.Context(), waddr, m.maddr, builtin.MethodsMiner.DeclareFaults, big.NewInt(0), big.NewInt(1), 1000000, enc.Bytes())
if err != nil {
return xerrors.Errorf("failed to push declare faults message to network: %w", err)
}
return ctx.Send(SectorFaultReported{reportMsg: mcid})
}
func (m *Sealing) handleFaultReported(ctx statemachine.Context, sector SectorInfo) error {
if sector.FaultReportMsg == nil {
return xerrors.Errorf("entered fault reported state without a FaultReportMsg cid")
}
mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.FaultReportMsg)
if err != nil {
return xerrors.Errorf("failed to wait for fault declaration: %w", err)
}
if mw.Receipt.ExitCode != 0 {
log.Errorf("UNHANDLED: declaring sector fault failed (exit=%d, msg=%s) (id: %d)", mw.Receipt.ExitCode, *sector.FaultReportMsg, sector.SectorNumber)
return xerrors.Errorf("UNHANDLED: submitting fault declaration failed (exit %d)", mw.Receipt.ExitCode)
}
return ctx.Send(SectorFaultedFinal{})
}