fsm: Reuse tickets in PC1 on retry

This commit is contained in:
Łukasz Magiera 2020-09-29 09:57:36 +02:00
parent 1e6a69f8aa
commit 0f2dcf28b1
6 changed files with 45 additions and 17 deletions

View File

@ -115,11 +115,6 @@ func newTestMgr(ctx context.Context, t *testing.T, ds datastore.Datastore) (*Man
Prover: prover,
wt: &workTracker{
done: map[storiface.CallID]struct{}{},
running: map[storiface.CallID]trackedWork{},
},
work: statestore.New(ds),
callToWork: map[storiface.CallID]WorkID{},
callRes: map[storiface.CallID]chan result{},

View File

@ -439,7 +439,7 @@ func TestSched(t *testing.T) {
for _, job := range jobs {
lines = append(lines, line{
WorkerJob: job,
wid: wid,
wid: uint64(wid),
})
}
}

View File

@ -45,12 +45,14 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorAddPiece{}, WaitDeals),
on(SectorStartPacking{}, Packing),
),
Packing: planOne(on(SectorPacked{}, PreCommit1)),
Packing: planOne(on(SectorPacked{}, GetTicket)),
GetTicket: planOne(on(SectorTicket{}, PreCommit1)),
PreCommit1: planOne(
on(SectorPreCommit1{}, PreCommit2),
on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
on(SectorDealsExpired{}, DealsExpired),
on(SectorInvalidDealIDs{}, RecoverDealIDs),
on(SectorOldTicket{}, GetTicket),
),
PreCommit2: planOne(
on(SectorPreCommit2{}, PreCommitting),
@ -219,6 +221,9 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
*<- Packing <- incoming committed capacity
| |
| v
| GetTicket
| | ^
| v |
*<- PreCommit1 <--> SealPreCommit1Failed
| | ^ ^^
| | *----------++----\
@ -267,6 +272,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
log.Infof("Waiting for deals %d", state.SectorNumber)
case Packing:
return m.handlePacking, processed, nil
case GetTicket:
return m.handleGetTicket, processed, nil
case PreCommit1:
return m.handlePreCommit1, processed, nil
case PreCommit2:

View File

@ -101,16 +101,26 @@ func (evt SectorPacked) apply(state *SectorInfo) {
}
}
type SectorTicket struct {
TicketValue abi.SealRandomness
TicketEpoch abi.ChainEpoch
}
func (evt SectorTicket) apply(state *SectorInfo) {
state.TicketEpoch = evt.TicketEpoch
state.TicketValue = evt.TicketValue
}
type SectorOldTicket struct{}
func (evt SectorOldTicket) apply(*SectorInfo) {}
type SectorPreCommit1 struct {
PreCommit1Out storage.PreCommit1Out
TicketValue abi.SealRandomness
TicketEpoch abi.ChainEpoch
}
func (evt SectorPreCommit1) apply(state *SectorInfo) {
state.PreCommit1Out = evt.PreCommit1Out
state.TicketEpoch = evt.TicketEpoch
state.TicketValue = evt.TicketValue
state.PreCommit2Fails = 0
}

View File

@ -41,6 +41,7 @@ const (
Empty SectorState = "Empty"
WaitDeals SectorState = "WaitDeals" // waiting for more pieces (deals) to be added to the sector
Packing SectorState = "Packing" // sector not in sealStore, and not on chain
GetTicket SectorState = "GetTicket" // generate ticket
PreCommit1 SectorState = "PreCommit1" // do PreCommit1
PreCommit2 SectorState = "PreCommit2" // do PreCommit2
PreCommitting SectorState = "PreCommitting" // on chain pre-commit

View File

@ -21,6 +21,7 @@ import (
)
var DealSectorPriority = 1024
var MaxTicketAge = abi.ChainEpoch(builtin.EpochsInDay * 2)
func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) error {
log.Infow("performing filling up rest of the sector...", "sector", sector.SectorNumber)
@ -83,6 +84,18 @@ func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.Se
return abi.SealRandomness(rand), ticketEpoch, nil
}
func (m *Sealing) handleGetTicket(ctx statemachine.Context, sector SectorInfo) error {
ticketValue, ticketEpoch, err := m.getTicket(ctx, sector)
if err != nil {
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("getting ticket failed: %w", err)})
}
return ctx.Send(SectorTicket{
TicketValue: ticketValue,
TicketEpoch: ticketEpoch,
})
}
func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error {
if err := checkPieces(ctx.Context(), m.maddr, sector, m.api); err != nil { // Sanity check state
switch err.(type) {
@ -99,21 +112,23 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo)
}
}
log.Infow("performing sector replication...", "sector", sector.SectorNumber)
ticketValue, ticketEpoch, err := m.getTicket(ctx, sector)
_, height, err := m.api.ChainHead(ctx.Context())
if err != nil {
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("getting ticket failed: %w", err)})
log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err)
return nil
}
pc1o, err := m.sealer.SealPreCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorNumber), ticketValue, sector.pieceInfos())
if height-sector.TicketEpoch > MaxTicketAge {
return ctx.Send(SectorOldTicket{})
}
pc1o, err := m.sealer.SealPreCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorNumber), sector.TicketValue, sector.pieceInfos())
if err != nil {
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("seal pre commit(1) failed: %w", err)})
}
return ctx.Send(SectorPreCommit1{
PreCommit1Out: pc1o,
TicketValue: ticketValue,
TicketEpoch: ticketEpoch,
})
}