Merge pull request #34 from filecoin-project/feat/cc-upgrade-1
Update storage interfaces, support removing sectors
This commit is contained in:
commit
fe71d5b42d
30
fsm.go
30
fsm.go
@ -32,6 +32,8 @@ func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface
|
|||||||
}
|
}
|
||||||
|
|
||||||
var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *SectorInfo) error{
|
var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *SectorInfo) error{
|
||||||
|
// Sealing
|
||||||
|
|
||||||
UndefinedSectorState: planOne(on(SectorStart{}, Packing)),
|
UndefinedSectorState: planOne(on(SectorStart{}, Packing)),
|
||||||
Packing: planOne(on(SectorPacked{}, PreCommit1)),
|
Packing: planOne(on(SectorPacked{}, PreCommit1)),
|
||||||
PreCommit1: planOne(
|
PreCommit1: planOne(
|
||||||
@ -69,10 +71,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
|
|||||||
on(SectorFinalizeFailed{}, FinalizeFailed),
|
on(SectorFinalizeFailed{}, FinalizeFailed),
|
||||||
),
|
),
|
||||||
|
|
||||||
Proving: planOne(
|
// Sealing errors
|
||||||
on(SectorFaultReported{}, FaultReported),
|
|
||||||
on(SectorFaulty{}, Faulty),
|
|
||||||
),
|
|
||||||
|
|
||||||
SealPreCommit1Failed: planOne(
|
SealPreCommit1Failed: planOne(
|
||||||
on(SectorRetrySealPreCommit1{}, PreCommit1),
|
on(SectorRetrySealPreCommit1{}, PreCommit1),
|
||||||
@ -102,10 +101,23 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
|
|||||||
on(SectorRetryFinalize{}, FinalizeSector),
|
on(SectorRetryFinalize{}, FinalizeSector),
|
||||||
),
|
),
|
||||||
|
|
||||||
|
// Post-seal
|
||||||
|
|
||||||
|
Proving: planOne(
|
||||||
|
on(SectorFaultReported{}, FaultReported),
|
||||||
|
on(SectorFaulty{}, Faulty),
|
||||||
|
on(SectorRemove{}, Removing),
|
||||||
|
),
|
||||||
|
Removing: planOne(
|
||||||
|
on(SectorRemoved{}, Removed),
|
||||||
|
on(SectorRemoveFailed{}, RemoveFailed),
|
||||||
|
),
|
||||||
Faulty: planOne(
|
Faulty: planOne(
|
||||||
on(SectorFaultReported{}, FaultReported),
|
on(SectorFaultReported{}, FaultReported),
|
||||||
),
|
),
|
||||||
|
|
||||||
FaultedFinal: final,
|
FaultedFinal: final,
|
||||||
|
Removed: final,
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, error) {
|
func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(statemachine.Context, SectorInfo) error, error) {
|
||||||
@ -207,9 +219,6 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
|
|||||||
return m.handleCommitWait, nil
|
return m.handleCommitWait, nil
|
||||||
case FinalizeSector:
|
case FinalizeSector:
|
||||||
return m.handleFinalizeSector, nil
|
return m.handleFinalizeSector, nil
|
||||||
case Proving:
|
|
||||||
// TODO: track sector health / expiration
|
|
||||||
log.Infof("Proving sector %d", state.SectorNumber)
|
|
||||||
|
|
||||||
// Handled failure modes
|
// Handled failure modes
|
||||||
case SealPreCommit1Failed:
|
case SealPreCommit1Failed:
|
||||||
@ -225,6 +234,13 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
|
|||||||
case FinalizeFailed:
|
case FinalizeFailed:
|
||||||
return m.handleFinalizeFailed, nil
|
return m.handleFinalizeFailed, nil
|
||||||
|
|
||||||
|
// Post-seal
|
||||||
|
case Proving:
|
||||||
|
// TODO: track sector health / expiration
|
||||||
|
log.Infof("Proving sector %d", state.SectorNumber)
|
||||||
|
case Removing:
|
||||||
|
return m.handleRemoving, nil
|
||||||
|
|
||||||
// Faults
|
// Faults
|
||||||
case Faulty:
|
case Faulty:
|
||||||
return m.handleFaulty, nil
|
return m.handleFaulty, nil
|
||||||
|
@ -230,3 +230,18 @@ func (evt SectorFaultReported) apply(state *SectorInfo) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type SectorFaultedFinal struct{}
|
type SectorFaultedFinal struct{}
|
||||||
|
|
||||||
|
// External events
|
||||||
|
|
||||||
|
type SectorRemove struct{}
|
||||||
|
|
||||||
|
func (evt SectorRemove) apply(state *SectorInfo) {}
|
||||||
|
|
||||||
|
type SectorRemoved struct{}
|
||||||
|
|
||||||
|
func (evt SectorRemoved) apply(state *SectorInfo) {}
|
||||||
|
|
||||||
|
type SectorRemoveFailed struct{ error }
|
||||||
|
|
||||||
|
func (evt SectorRemoveFailed) FormatError(xerrors.Printer) (next error) { return evt.error }
|
||||||
|
func (evt SectorRemoveFailed) apply(*SectorInfo) {}
|
||||||
|
4
go.mod
4
go.mod
@ -9,9 +9,9 @@ require (
|
|||||||
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6
|
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6
|
||||||
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663 // indirect
|
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663 // indirect
|
||||||
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9
|
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9
|
||||||
github.com/filecoin-project/sector-storage v0.0.0-20200615154852-728a47ab99d6
|
github.com/filecoin-project/sector-storage v0.0.0-20200623210524-47d93356586d
|
||||||
github.com/filecoin-project/specs-actors v0.6.0
|
github.com/filecoin-project/specs-actors v0.6.0
|
||||||
github.com/filecoin-project/specs-storage v0.1.0
|
github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea
|
||||||
github.com/ipfs/go-cid v0.0.5
|
github.com/ipfs/go-cid v0.0.5
|
||||||
github.com/ipfs/go-datastore v0.4.4
|
github.com/ipfs/go-datastore v0.4.4
|
||||||
github.com/ipfs/go-hamt-ipld v0.0.15-0.20200204200533-99b8553ef242 // indirect
|
github.com/ipfs/go-hamt-ipld v0.0.15-0.20200204200533-99b8553ef242 // indirect
|
||||||
|
8
go.sum
8
go.sum
@ -55,15 +55,15 @@ github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9 h
|
|||||||
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig=
|
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig=
|
||||||
github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ=
|
github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ=
|
||||||
github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI=
|
github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI=
|
||||||
github.com/filecoin-project/sector-storage v0.0.0-20200615154852-728a47ab99d6 h1:NIcubpeasVs++K5EFelMXeURRb8sWCuXQNOSWnvTc14=
|
github.com/filecoin-project/sector-storage v0.0.0-20200623210524-47d93356586d h1:yJJqXCMEhvXJoOS6T1O46FXl+A3mlttXhgjcTCp+Tgo=
|
||||||
github.com/filecoin-project/sector-storage v0.0.0-20200615154852-728a47ab99d6/go.mod h1:M59QnAeA/oV+Z8oHFLoNpGMv0LZ8Rll+vHVXX7GirPM=
|
github.com/filecoin-project/sector-storage v0.0.0-20200623210524-47d93356586d/go.mod h1:8f0hWDzzIi1hKs4IVKH9RnDsO4LEHVz8BNat0okDOuY=
|
||||||
github.com/filecoin-project/specs-actors v0.0.0-20200210130641-2d1fbd8672cf/go.mod h1:xtDZUB6pe4Pksa/bAJbJ693OilaC5Wbot9jMhLm3cZA=
|
github.com/filecoin-project/specs-actors v0.0.0-20200210130641-2d1fbd8672cf/go.mod h1:xtDZUB6pe4Pksa/bAJbJ693OilaC5Wbot9jMhLm3cZA=
|
||||||
github.com/filecoin-project/specs-actors v0.3.0 h1:QxgAuTrZr5TPqjyprZk0nTYW5o0JWpzbb5v+4UHHvN0=
|
github.com/filecoin-project/specs-actors v0.3.0 h1:QxgAuTrZr5TPqjyprZk0nTYW5o0JWpzbb5v+4UHHvN0=
|
||||||
github.com/filecoin-project/specs-actors v0.3.0/go.mod h1:nQYnFbQ7Y0bHZyq6HDEuVlCPR+U3z5Q3wMOQ+2aiV+Y=
|
github.com/filecoin-project/specs-actors v0.3.0/go.mod h1:nQYnFbQ7Y0bHZyq6HDEuVlCPR+U3z5Q3wMOQ+2aiV+Y=
|
||||||
github.com/filecoin-project/specs-actors v0.6.0 h1:IepUsmDGY60QliENVTkBTAkwqGWw9kNbbHOcU/9oiC0=
|
github.com/filecoin-project/specs-actors v0.6.0 h1:IepUsmDGY60QliENVTkBTAkwqGWw9kNbbHOcU/9oiC0=
|
||||||
github.com/filecoin-project/specs-actors v0.6.0/go.mod h1:dRdy3cURykh2R8O/DKqy8olScl70rmIS7GrB4hB1IDY=
|
github.com/filecoin-project/specs-actors v0.6.0/go.mod h1:dRdy3cURykh2R8O/DKqy8olScl70rmIS7GrB4hB1IDY=
|
||||||
github.com/filecoin-project/specs-storage v0.1.0 h1:PkDgTOT5W5Ao7752onjDl4QSv+sgOVdJbvFjOnD5w94=
|
github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea h1:iixjULRQFPn7Q9KlIqfwLJnlAXO10bbkI+xy5GKGdLY=
|
||||||
github.com/filecoin-project/specs-storage v0.1.0/go.mod h1:Pr5ntAaxsh+sLG/LYiL4tKzvA83Vk5vLODYhfNwOg7k=
|
github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea/go.mod h1:Pr5ntAaxsh+sLG/LYiL4tKzvA83Vk5vLODYhfNwOg7k=
|
||||||
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
||||||
github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98=
|
github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98=
|
||||||
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
|
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
|
||||||
|
@ -138,6 +138,10 @@ func (m *Sealing) newSector(sid abi.SectorNumber, rt abi.RegisteredSealProof, pi
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error {
|
||||||
|
return m.sectors.Send(uint64(sid), SectorRemove{})
|
||||||
|
}
|
||||||
|
|
||||||
func (m *Sealing) minerSector(num abi.SectorNumber) abi.SectorID {
|
func (m *Sealing) minerSector(num abi.SectorNumber) abi.SectorID {
|
||||||
mid, err := address.IDFromAddress(m.maddr)
|
mid, err := address.IDFromAddress(m.maddr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -30,4 +30,8 @@ const (
|
|||||||
Faulty SectorState = "Faulty" // sector is corrupted or gone for some reason
|
Faulty SectorState = "Faulty" // sector is corrupted or gone for some reason
|
||||||
FaultReported SectorState = "FaultReported" // sector has been declared as a fault on chain
|
FaultReported SectorState = "FaultReported" // sector has been declared as a fault on chain
|
||||||
FaultedFinal SectorState = "FaultedFinal" // fault declared on chain
|
FaultedFinal SectorState = "FaultedFinal" // fault declared on chain
|
||||||
|
|
||||||
|
Removing SectorState = "Removing"
|
||||||
|
RemoveFailed SectorState = "RemoveFailed"
|
||||||
|
Removed SectorState = "Removed"
|
||||||
)
|
)
|
||||||
|
40
states_proving.go
Normal file
40
states_proving.go
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
package sealing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-statemachine"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (m *Sealing) handleFaulty(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
// TODO: noop because this is now handled by the PoSt scheduler. We can reuse
|
||||||
|
// this state for tracking faulty sectors, or remove it when that won't be
|
||||||
|
// a breaking change
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) handleFaultReported(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
if sector.FaultReportMsg == nil {
|
||||||
|
return xerrors.Errorf("entered fault reported state without a FaultReportMsg cid")
|
||||||
|
}
|
||||||
|
|
||||||
|
mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.FaultReportMsg)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("failed to wait for fault declaration: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if mw.Receipt.ExitCode != 0 {
|
||||||
|
log.Errorf("UNHANDLED: declaring sector fault failed (exit=%d, msg=%s) (id: %d)", mw.Receipt.ExitCode, *sector.FaultReportMsg, sector.SectorNumber)
|
||||||
|
return xerrors.Errorf("UNHANDLED: submitting fault declaration failed (exit %d)", mw.Receipt.ExitCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx.Send(SectorFaultedFinal{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) handleRemoving(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
if err := m.sealer.Remove(ctx.Context(), m.minerSector(sector.SectorNumber)); err != nil {
|
||||||
|
return ctx.Send(SectorRemoveFailed{err})
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx.Send(SectorRemoved{})
|
||||||
|
}
|
@ -332,92 +332,9 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo)
|
|||||||
func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorInfo) error {
|
func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
// TODO: Maybe wait for some finality
|
// TODO: Maybe wait for some finality
|
||||||
|
|
||||||
if err := m.sealer.FinalizeSector(ctx.Context(), m.minerSector(sector.SectorNumber)); err != nil {
|
if err := m.sealer.FinalizeSector(ctx.Context(), m.minerSector(sector.SectorNumber), nil); err != nil {
|
||||||
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)})
|
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)})
|
||||||
}
|
}
|
||||||
|
|
||||||
return ctx.Send(SectorFinalized{})
|
return ctx.Send(SectorFinalized{})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Sealing) handleFaulty(ctx statemachine.Context, sector SectorInfo) error {
|
|
||||||
// TODO: check if the fault has already been reported, and that this sector is even valid
|
|
||||||
|
|
||||||
// TODO: coalesce faulty sector reporting
|
|
||||||
|
|
||||||
// TODO: ReportFaultFailed
|
|
||||||
bf := abi.NewBitField()
|
|
||||||
bf.Set(uint64(sector.SectorNumber))
|
|
||||||
|
|
||||||
deadlines, err := m.api.StateMinerDeadlines(ctx.Context(), m.maddr, nil)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("handleFaulty: api error, not proceeding: %+v", err)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
deadline := -1
|
|
||||||
for d, field := range deadlines.Due {
|
|
||||||
set, err := field.IsSet(uint64(sector.SectorNumber))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if set {
|
|
||||||
deadline = d
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if deadline == -1 {
|
|
||||||
log.Errorf("handleFaulty: deadline not found")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
params := &miner.DeclareFaultsParams{
|
|
||||||
Faults: []miner.FaultDeclaration{
|
|
||||||
{
|
|
||||||
Deadline: uint64(deadline),
|
|
||||||
Sectors: bf,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
enc := new(bytes.Buffer)
|
|
||||||
if err := params.MarshalCBOR(enc); err != nil {
|
|
||||||
return ctx.Send(SectorCommitFailed{xerrors.Errorf("failed to serialize declare fault params: %w", err)})
|
|
||||||
}
|
|
||||||
|
|
||||||
tok, _, err := m.api.ChainHead(ctx.Context())
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("handleFaulty: api error, not proceeding: %+v", err)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
waddr, err := m.api.StateMinerWorkerAddress(ctx.Context(), m.maddr, tok)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("handleFaulty: api error, not proceeding: %+v", err)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
mcid, err := m.api.SendMsg(ctx.Context(), waddr, m.maddr, builtin.MethodsMiner.DeclareFaults, big.NewInt(0), big.NewInt(1), 1000000, enc.Bytes())
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("failed to push declare faults message to network: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return ctx.Send(SectorFaultReported{reportMsg: mcid})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Sealing) handleFaultReported(ctx statemachine.Context, sector SectorInfo) error {
|
|
||||||
if sector.FaultReportMsg == nil {
|
|
||||||
return xerrors.Errorf("entered fault reported state without a FaultReportMsg cid")
|
|
||||||
}
|
|
||||||
|
|
||||||
mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.FaultReportMsg)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("failed to wait for fault declaration: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if mw.Receipt.ExitCode != 0 {
|
|
||||||
log.Errorf("UNHANDLED: declaring sector fault failed (exit=%d, msg=%s) (id: %d)", mw.Receipt.ExitCode, *sector.FaultReportMsg, sector.SectorNumber)
|
|
||||||
return xerrors.Errorf("UNHANDLED: submitting fault declaration failed (exit %d)", mw.Receipt.ExitCode)
|
|
||||||
}
|
|
||||||
|
|
||||||
return ctx.Send(SectorFaultedFinal{})
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user