storagefsm: Separate satte for submitting commit message

This commit is contained in:
Łukasz Magiera 2020-08-27 12:57:08 +02:00
parent 99ecef89b8
commit 788c7dbf48
5 changed files with 79 additions and 58 deletions

View File

@ -247,6 +247,7 @@ var stateList = []stateMeta{
{col: color.FgYellow, state: sealing.PreCommitWait}, {col: color.FgYellow, state: sealing.PreCommitWait},
{col: color.FgYellow, state: sealing.WaitSeed}, {col: color.FgYellow, state: sealing.WaitSeed},
{col: color.FgYellow, state: sealing.Committing}, {col: color.FgYellow, state: sealing.Committing},
{col: color.FgYellow, state: sealing.SubmitCommit},
{col: color.FgYellow, state: sealing.CommitWait}, {col: color.FgYellow, state: sealing.CommitWait},
{col: color.FgYellow, state: sealing.FinalizeSector}, {col: color.FgYellow, state: sealing.FinalizeSector},

View File

@ -71,6 +71,10 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorChainPreCommitFailed{}, PreCommitFailed), on(SectorChainPreCommitFailed{}, PreCommitFailed),
), ),
Committing: planCommitting, Committing: planCommitting,
SubmitCommit: planOne(
on(SectorCommitSubmitted{}, CommitWait),
on(SectorCommitFailed{}, CommitFailed),
),
CommitWait: planOne( CommitWait: planOne(
on(SectorProving{}, FinalizeSector), on(SectorProving{}, FinalizeSector),
on(SectorCommitFailed{}, CommitFailed), on(SectorCommitFailed{}, CommitFailed),
@ -182,47 +186,50 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
/* /*
* Empty <- incoming deals * Empty <- incoming deals
| | | |
| v | v
*<- WaitDeals <- incoming deals *<- WaitDeals <- incoming deals
| | | |
| v | v
*<- Packing <- incoming committed capacity *<- Packing <- incoming committed capacity
| | | |
| v | v
*<- PreCommit1 <--> SealPreCommit1Failed *<- PreCommit1 <--> SealPreCommit1Failed
| | ^ ^^ | | ^ ^^
| | *----------++----\ | | *----------++----\
| v v || | | v v || |
*<- PreCommit2 --------++--> SealPreCommit2Failed *<- PreCommit2 --------++--> SealPreCommit2Failed
| | || | | ||
| v /-------/| | v /-------/|
* PreCommitting <-----+---> PreCommitFailed * PreCommitting <-----+---> PreCommitFailed
| | | ^ | | | ^
| v | | | v | |
*<- WaitSeed -----------+-----/ *<- WaitSeed -----------+-----/
| ||| ^ | | ||| ^ |
| ||| \--------*-----/ | ||| \--------*-----/
| ||| | | ||| |
| vvv v----+----> ComputeProofFailed | vvv v----+----> ComputeProofFailed
*<- Committing | *<- Committing |
| | ^--> CommitFailed | | ^--> CommitFailed
| v ^ | v ^
*<- CommitWait ---/ | SubmitCommit |
| | | | |
| v | v |
| FinalizeSector <--> FinalizeFailed *<- CommitWait ---/
| | | |
| v | v
*<- Proving | FinalizeSector <--> FinalizeFailed
| | |
v | v
FailedUnrecoverable *<- Proving
|
v
FailedUnrecoverable
UndefinedSectorState <- ¯\_()_/¯ UndefinedSectorState <- ¯\_()_/¯
| ^ | ^
*---------------------/ *---------------------/
*/ */
@ -248,6 +255,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
return m.handleWaitSeed, processed, nil return m.handleWaitSeed, processed, nil
case Committing: case Committing:
return m.handleCommitting, processed, nil return m.handleCommitting, processed, nil
case SubmitCommit:
return m.handleSubmitCommit, processed, nil
case CommitWait: case CommitWait:
return m.handleCommitWait, processed, nil return m.handleCommitWait, processed, nil
case FinalizeSector: case FinalizeSector:
@ -294,24 +303,25 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
} }
func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, error) { func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, error) {
for _, event := range events { for i, event := range events {
switch e := event.User.(type) { switch e := event.User.(type) {
case globalMutator: case globalMutator:
if e.applyGlobal(state) { if e.applyGlobal(state) {
return 1, nil return uint64(i + 1), nil
} }
case SectorCommitted: // the normal case case SectorCommitted: // the normal case
e.apply(state) e.apply(state)
state.State = CommitWait state.State = SubmitCommit
case SectorSeedReady: // seed changed :/ case SectorSeedReady: // seed changed :/
if e.SeedEpoch == state.SeedEpoch && bytes.Equal(e.SeedValue, state.SeedValue) { if e.SeedEpoch == state.SeedEpoch && bytes.Equal(e.SeedValue, state.SeedValue) {
log.Warnf("planCommitting: got SectorSeedReady, but the seed didn't change") log.Warnf("planCommitting: got SectorSeedReady, but the seed didn't change")
continue // or it didn't! continue // or it didn't!
} }
log.Warnf("planCommitting: commit Seed changed") log.Warnf("planCommitting: commit Seed changed")
e.apply(state) e.apply(state)
state.State = Committing state.State = Committing
return 1, nil return uint64(i + 1), nil
case SectorComputeProofFailed: case SectorComputeProofFailed:
state.State = ComputeProofFailed state.State = ComputeProofFailed
case SectorSealPreCommit1Failed: case SectorSealPreCommit1Failed:
@ -321,10 +331,10 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, err
case SectorRetryCommitWait: case SectorRetryCommitWait:
state.State = CommitWait state.State = CommitWait
default: default:
return 0, xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events) return uint64(i), xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events)
} }
} }
return 1, nil return uint64(len(events)), nil
} }
func (m *Sealing) restartSectors(ctx context.Context) error { func (m *Sealing) restartSectors(ctx context.Context) error {

View File

@ -192,12 +192,18 @@ func (evt SectorCommitFailed) FormatError(xerrors.Printer) (next error) { return
func (evt SectorCommitFailed) apply(*SectorInfo) {} func (evt SectorCommitFailed) apply(*SectorInfo) {}
type SectorCommitted struct { type SectorCommitted struct {
Message cid.Cid Proof []byte
Proof []byte
} }
func (evt SectorCommitted) apply(state *SectorInfo) { func (evt SectorCommitted) apply(state *SectorInfo) {
state.Proof = evt.Proof state.Proof = evt.Proof
}
type SectorCommitSubmitted struct {
Message cid.Cid
}
func (evt SectorCommitSubmitted) apply(state *SectorInfo) {
state.CommitMessage = &evt.Message state.CommitMessage = &evt.Message
} }

View File

@ -10,12 +10,13 @@ const (
WaitDeals SectorState = "WaitDeals" // waiting for more pieces (deals) to be added to the sector WaitDeals SectorState = "WaitDeals" // waiting for more pieces (deals) to be added to the sector
Packing SectorState = "Packing" // sector not in sealStore, and not on chain Packing SectorState = "Packing" // sector not in sealStore, and not on chain
PreCommit1 SectorState = "PreCommit1" // do PreCommit1 PreCommit1 SectorState = "PreCommit1" // do PreCommit1
PreCommit2 SectorState = "PreCommit2" // do PreCommit1 PreCommit2 SectorState = "PreCommit2" // do PreCommit2
PreCommitting SectorState = "PreCommitting" // on chain pre-commit PreCommitting SectorState = "PreCommitting" // on chain pre-commit
PreCommitWait SectorState = "PreCommitWait" // waiting for precommit to land on chain PreCommitWait SectorState = "PreCommitWait" // waiting for precommit to land on chain
WaitSeed SectorState = "WaitSeed" // waiting for seed WaitSeed SectorState = "WaitSeed" // waiting for seed
Committing SectorState = "Committing" Committing SectorState = "Committing" // compute PoRep
CommitWait SectorState = "CommitWait" // waiting for message to land on chain SubmitCommit SectorState = "SubmitCommit" // send commit message to the chain
CommitWait SectorState = "CommitWait" // wait for the commit message to land on chain
FinalizeSector SectorState = "FinalizeSector" FinalizeSector SectorState = "FinalizeSector"
Proving SectorState = "Proving" Proving SectorState = "Proving"
// error modes // error modes

View File

@ -326,21 +326,25 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed(2): %w", err)}) return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed(2): %w", err)})
} }
return ctx.Send(SectorCommitted{
Proof: proof,
})
}
func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo) error {
tok, _, err := m.api.ChainHead(ctx.Context()) tok, _, err := m.api.ChainHead(ctx.Context())
if err != nil { if err != nil {
log.Errorf("handleCommitting: api error, not proceeding: %+v", err) log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
return nil return nil
} }
if err := m.checkCommit(ctx.Context(), sector, proof, tok); err != nil { if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)}) return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)})
} }
// TODO: Consider splitting states and persist proof for faster recovery
params := &miner.ProveCommitSectorParams{ params := &miner.ProveCommitSectorParams{
SectorNumber: sector.SectorNumber, SectorNumber: sector.SectorNumber,
Proof: proof, Proof: sector.Proof,
} }
enc := new(bytes.Buffer) enc := new(bytes.Buffer)
@ -372,14 +376,13 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
collateral = big.Zero() collateral = big.Zero()
} }
// TODO: check seed / ticket are up to date // TODO: check seed / ticket / deals are up to date
mcid, err := m.api.SendMsg(ctx.Context(), waddr, m.maddr, builtin.MethodsMiner.ProveCommitSector, collateral, m.feeCfg.MaxCommitGasFee, enc.Bytes()) mcid, err := m.api.SendMsg(ctx.Context(), waddr, m.maddr, builtin.MethodsMiner.ProveCommitSector, collateral, m.feeCfg.MaxCommitGasFee, enc.Bytes())
if err != nil { if err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)}) return ctx.Send(SectorCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
} }
return ctx.Send(SectorCommitted{ return ctx.Send(SectorCommitSubmitted{
Proof: proof,
Message: mcid, Message: mcid,
}) })
} }