Merge pull request #6452 from filecoin-project/feat/early-finalize

sealing: Early finalization option
This commit is contained in:
Łukasz Magiera 2021-06-11 18:16:43 +02:00 committed by GitHub
commit 7f8718d805
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 131 additions and 4 deletions

View File

@ -295,6 +295,7 @@ var stateList = []stateMeta{
{col: color.FgYellow, state: sealing.PreCommitBatchWait},
{col: color.FgYellow, state: sealing.WaitSeed},
{col: color.FgYellow, state: sealing.Committing},
{col: color.FgYellow, state: sealing.CommitFinalize},
{col: color.FgYellow, state: sealing.SubmitCommit},
{col: color.FgYellow, state: sealing.CommitWait},
{col: color.FgYellow, state: sealing.SubmitCommitAggregate},
@ -315,6 +316,7 @@ var stateList = []stateMeta{
{col: color.FgRed, state: sealing.PreCommitFailed},
{col: color.FgRed, state: sealing.ComputeProofFailed},
{col: color.FgRed, state: sealing.CommitFailed},
{col: color.FgRed, state: sealing.CommitFinalizeFailed},
{col: color.FgRed, state: sealing.PackingFailed},
{col: color.FgRed, state: sealing.FinalizeFailed},
{col: color.FgRed, state: sealing.Faulty},

View File

@ -102,6 +102,10 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorChainPreCommitFailed{}, PreCommitFailed),
),
Committing: planCommitting,
CommitFinalize: planOne(
on(SectorFinalized{}, SubmitCommit),
on(SectorFinalizeFailed{}, CommitFinalizeFailed),
),
SubmitCommit: planOne(
on(SectorCommitSubmitted{}, CommitWait),
on(SectorSubmitCommitAggregate{}, SubmitCommitAggregate),
@ -150,6 +154,9 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorRetryComputeProof{}, Committing),
on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
),
CommitFinalizeFailed: planOne(
on(SectorRetryFinalize{}, CommitFinalizeFailed),
),
CommitFailed: planOne(
on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
on(SectorRetryWaitSeed{}, WaitSeed),
@ -372,6 +379,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
fallthrough
case CommitWait:
return m.handleCommitWait, processed, nil
case CommitFinalize:
fallthrough
case FinalizeSector:
return m.handleFinalizeSector, processed, nil
@ -386,6 +395,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
return m.handleComputeProofFailed, processed, nil
case CommitFailed:
return m.handleCommitFailed, processed, nil
case CommitFinalizeFailed:
fallthrough
case FinalizeFailed:
return m.handleFinalizeFailed, processed, nil
case PackingFailed: // DEPRECATED: remove this for the next reset
@ -474,6 +485,9 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, err
case SectorCommitted: // the normal case
e.apply(state)
state.State = SubmitCommit
case SectorProofReady: // early finalize
e.apply(state)
state.State = CommitFinalize
case SectorSeedReady: // seed changed :/
if e.SeedEpoch == state.SeedEpoch && bytes.Equal(e.SeedValue, state.SeedValue) {
log.Warnf("planCommitting: got SectorSeedReady, but the seed didn't change")

View File

@ -245,6 +245,15 @@ func (evt SectorCommitted) apply(state *SectorInfo) {
state.Proof = evt.Proof
}
// like SectorCommitted, but finalizes before sending the proof to the chain
type SectorProofReady struct {
Proof []byte
}
func (evt SectorProofReady) apply(state *SectorInfo) {
state.Proof = evt.Proof
}
type SectorSubmitCommitAggregate struct{}
func (evt SectorSubmitCommitAggregate) apply(*SectorInfo) {}

View File

@ -87,6 +87,73 @@ func TestHappyPath(t *testing.T) {
}
}
func TestHappyPathFinalizeEarly(t *testing.T) {
var notif []struct{ before, after SectorInfo }
ma, _ := address.NewIDAddress(55151)
m := test{
s: &Sealing{
maddr: ma,
stats: SectorStats{
bySector: map[abi.SectorID]statSectorState{},
},
notifee: func(before, after SectorInfo) {
notif = append(notif, struct{ before, after SectorInfo }{before, after})
},
},
t: t,
state: &SectorInfo{State: Packing},
}
m.planSingle(SectorPacked{})
require.Equal(m.t, m.state.State, GetTicket)
m.planSingle(SectorTicket{})
require.Equal(m.t, m.state.State, PreCommit1)
m.planSingle(SectorPreCommit1{})
require.Equal(m.t, m.state.State, PreCommit2)
m.planSingle(SectorPreCommit2{})
require.Equal(m.t, m.state.State, PreCommitting)
m.planSingle(SectorPreCommitted{})
require.Equal(m.t, m.state.State, PreCommitWait)
m.planSingle(SectorPreCommitLanded{})
require.Equal(m.t, m.state.State, WaitSeed)
m.planSingle(SectorSeedReady{})
require.Equal(m.t, m.state.State, Committing)
m.planSingle(SectorProofReady{})
require.Equal(m.t, m.state.State, CommitFinalize)
m.planSingle(SectorFinalized{})
require.Equal(m.t, m.state.State, SubmitCommit)
m.planSingle(SectorSubmitCommitAggregate{})
require.Equal(m.t, m.state.State, SubmitCommitAggregate)
m.planSingle(SectorCommitAggregateSent{})
require.Equal(m.t, m.state.State, CommitWait)
m.planSingle(SectorProving{})
require.Equal(m.t, m.state.State, FinalizeSector)
m.planSingle(SectorFinalized{})
require.Equal(m.t, m.state.State, Proving)
expected := []SectorState{Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, CommitFinalize, SubmitCommit, SubmitCommitAggregate, CommitWait, FinalizeSector, Proving}
for i, n := range notif {
if n.before.State != expected[i] {
t.Fatalf("expected before state: %s, got: %s", expected[i], n.before.State)
}
if n.after.State != expected[i+1] {
t.Fatalf("expected after state: %s, got: %s", expected[i+1], n.after.State)
}
}
}
func TestSeedRevert(t *testing.T) {
ma, _ := address.NewIDAddress(55151)
m := test{

View File

@ -18,6 +18,8 @@ type Config struct {
AlwaysKeepUnsealedCopy bool
FinalizeEarly bool
BatchPreCommits bool
MaxPreCommitBatch int
MinPreCommitBatch int

View File

@ -17,6 +17,8 @@ var ExistSectorStateList = map[SectorState]struct{}{
PreCommitBatchWait: {},
WaitSeed: {},
Committing: {},
CommitFinalize: {},
CommitFinalizeFailed: {},
SubmitCommit: {},
CommitWait: {},
SubmitCommitAggregate: {},
@ -63,8 +65,10 @@ const (
SubmitPreCommitBatch SectorState = "SubmitPreCommitBatch"
PreCommitBatchWait SectorState = "PreCommitBatchWait"
WaitSeed SectorState = "WaitSeed" // waiting for seed
Committing SectorState = "Committing" // compute PoRep
WaitSeed SectorState = "WaitSeed" // waiting for seed
Committing SectorState = "Committing" // compute PoRep
CommitFinalize SectorState = "CommitFinalize" // cleanup sector metadata before submitting the proof (early finalize)
CommitFinalizeFailed SectorState = "CommitFinalizeFailed"
// single commit
SubmitCommit SectorState = "SubmitCommit" // send commit message to the chain
@ -106,7 +110,7 @@ func toStatState(st SectorState) statSectorState {
switch st {
case UndefinedSectorState, Empty, WaitDeals, AddPiece:
return sstStaging
case Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, SubmitCommit, CommitWait, SubmitCommitAggregate, CommitAggregateWait, FinalizeSector:
case Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, CommitFinalize, SubmitCommit, CommitWait, SubmitCommitAggregate, CommitAggregateWait, FinalizeSector:
return sstSealing
case Proving, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed:
return sstProving

View File

@ -478,6 +478,11 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
}
}
cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting config: %w", err)
}
log.Info("scheduling seal proof computation...")
log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%x; d:%x", sector.SectorNumber, sector.TicketValue, sector.TicketEpoch, sector.SeedValue, sector.SeedEpoch, sector.pieceInfos(), sector.CommR, sector.CommD)
@ -500,6 +505,24 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed(2): %w", err)})
}
{
tok, _, err := m.api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
return nil
}
if err := m.checkCommit(ctx.Context(), sector, proof, tok); err != nil {
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("commit check error: %w", err)})
}
}
if cfg.FinalizeEarly {
return ctx.Send(SectorProofReady{
Proof: proof,
})
}
return ctx.Send(SectorCommitted{
Proof: proof,
})
@ -524,7 +547,7 @@ func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo
tok, _, err := m.api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
log.Errorf("handleSubmitCommit: api error, not proceeding: %+v", err)
return nil
}

View File

@ -86,6 +86,9 @@ type SealingConfig struct {
AlwaysKeepUnsealedCopy bool
// Run sector finalization before submitting sector proof to the chain
FinalizeEarly bool
// enable / disable precommit batching (takes effect after nv13)
BatchPreCommits bool
// maximum precommit batch size - batches will be sent immediately above this size
@ -279,6 +282,7 @@ func DefaultStorageMiner() *StorageMiner {
MaxSealingSectorsForDeals: 0,
WaitDealsDelay: Duration(time.Hour * 6),
AlwaysKeepUnsealedCopy: true,
FinalizeEarly: false,
BatchPreCommits: true,
MinPreCommitBatch: 1, // we must have at least one precommit to batch

View File

@ -826,6 +826,7 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
MaxSealingSectorsForDeals: cfg.MaxSealingSectorsForDeals,
WaitDealsDelay: config.Duration(cfg.WaitDealsDelay),
AlwaysKeepUnsealedCopy: cfg.AlwaysKeepUnsealedCopy,
FinalizeEarly: cfg.FinalizeEarly,
BatchPreCommits: cfg.BatchPreCommits,
MinPreCommitBatch: cfg.MinPreCommitBatch,
@ -857,6 +858,7 @@ func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error
MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals,
WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay),
AlwaysKeepUnsealedCopy: cfg.Sealing.AlwaysKeepUnsealedCopy,
FinalizeEarly: cfg.Sealing.FinalizeEarly,
BatchPreCommits: cfg.Sealing.BatchPreCommits,
MinPreCommitBatch: cfg.Sealing.MinPreCommitBatch,