diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index 0e3f6ba2b..a1739a1cc 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -50,6 +50,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto ), AddPiece: planOne( on(SectorPieceAdded{}, WaitDeals), + apply(SectorStartPacking{}), ), Packing: planOne(on(SectorPacked{}, GetTicket)), GetTicket: planOne( @@ -447,56 +448,72 @@ func final(events []statemachine.Event, state *SectorInfo) (uint64, error) { return 0, xerrors.Errorf("didn't expect any events in state %s, got %+v", state.State, events) } -func on(mut mutator, next SectorState) func() (mutator, func(*SectorInfo) error) { - return func() (mutator, func(*SectorInfo) error) { - return mut, func(state *SectorInfo) error { +func on(mut mutator, next SectorState) func() (mutator, func(*SectorInfo) (bool, error)) { + return func() (mutator, func(*SectorInfo) (bool, error)) { + return mut, func(state *SectorInfo) (bool, error) { state.State = next - return nil + return false, nil } } } -func onReturning(mut mutator) func() (mutator, func(*SectorInfo) error) { - return func() (mutator, func(*SectorInfo) error) { - return mut, func(state *SectorInfo) error { +// like `on`, but doesn't change state +func apply(mut mutator) func() (mutator, func(*SectorInfo) (bool, error)) { + return func() (mutator, func(*SectorInfo) (bool, error)) { + return mut, func(state *SectorInfo) (bool, error) { + return true, nil + } + } +} + +func onReturning(mut mutator) func() (mutator, func(*SectorInfo) (bool, error)) { + return func() (mutator, func(*SectorInfo) (bool, error)) { + return mut, func(state *SectorInfo) (bool, error) { if state.Return == "" { - return xerrors.Errorf("return state not set") + return false, xerrors.Errorf("return state not set") } state.State = SectorState(state.Return) state.Return = "" - return nil + return false, nil } } } -func planOne(ts ...func() (mut mutator, next func(*SectorInfo) error)) func(events []statemachine.Event, state *SectorInfo) (uint64, error) { +func planOne(ts ...func() (mut mutator, next func(*SectorInfo) (more bool, err error))) func(events []statemachine.Event, state *SectorInfo) (uint64, error) { return func(events []statemachine.Event, state *SectorInfo) (uint64, error) { - if gm, ok := events[0].User.(globalMutator); ok { - gm.applyGlobal(state) - return 1, nil - } + for i, event := range events { + if gm, ok := event.User.(globalMutator); ok { + gm.applyGlobal(state) + return uint64(i + 1), nil + } - for _, t := range ts { - mut, next := t() + for _, t := range ts { + mut, next := t() - if reflect.TypeOf(events[0].User) != reflect.TypeOf(mut) { + if reflect.TypeOf(event.User) != reflect.TypeOf(mut) { + continue + } + + if err, iserr := event.User.(error); iserr { + log.Warnf("sector %d got error event %T: %+v", state.SectorNumber, event.User, err) + } + + event.User.(mutator).apply(state) + more, err := next(state) + if err != nil || !more { + return uint64(i + 1), err + } + } + + _, ok := event.User.(Ignorable) + if ok { continue } - if err, iserr := events[0].User.(error); iserr { - log.Warnf("sector %d got error event %T: %+v", state.SectorNumber, events[0].User, err) - } - - events[0].User.(mutator).apply(state) - return 1, next(state) + return uint64(i + 1), xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", state.State, event.User, event) } - _, ok := events[0].User.(Ignorable) - if ok { - return 1, nil - } - - return 0, xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", state.State, events[0].User, events[0]) + return uint64(len(events)), nil } } diff --git a/extern/storage-sealing/fsm_events.go b/extern/storage-sealing/fsm_events.go index 969ccf1e2..f9c1553e3 100644 --- a/extern/storage-sealing/fsm_events.go +++ b/extern/storage-sealing/fsm_events.go @@ -1,13 +1,16 @@ package sealing import ( - "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "time" + "github.com/ipfs/go-cid" "golang.org/x/xerrors" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/specs-storage/storage" + + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" ) type mutator interface { @@ -81,6 +84,9 @@ type SectorAddPiece struct { } func (evt SectorAddPiece) apply(state *SectorInfo) { + if state.CreationTime.IsZero() { + state.CreationTime = time.Now() + } state.PendingPieces = append(state.PendingPieces, evt.NewPiece) } diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go index b0d06b326..0361f5c08 100644 --- a/extern/storage-sealing/input.go +++ b/extern/storage-sealing/input.go @@ -3,6 +3,7 @@ package sealing import ( "context" "sort" + "time" "golang.org/x/xerrors" @@ -21,9 +22,35 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e // if full / oldish / has oldish deals goto seal // ^ also per sector deal limit - // send SectorStartPacking - m.inputLk.Lock() + + now := time.Now() + st := m.sectorTimers[m.minerSectorID(sector.SectorNumber)] + if st != nil { + if !st.Stop() { // timer expired, SectorStartPacking was/is being sent + // we send another SectorStartPacking in case one was sent in the handleAddPiece state + return ctx.Send(SectorStartPacking{}) + } + } + + if !sector.CreationTime.IsZero() { + cfg, err := m.getConfig() + if err != nil { + return xerrors.Errorf("getting storage config: %w", err) + } + + sealTime := sector.CreationTime.Add(cfg.WaitDealsDelay) + if now.After(sealTime) { + return ctx.Send(SectorStartPacking{}) + } else { + m.sectorTimers[m.minerSectorID(sector.SectorNumber)] = time.AfterFunc(sealTime.Sub(now), func() { + if err := ctx.Send(SectorStartPacking{}); err != nil { + log.Errorw("sending SectorStartPacking event failed", "sector", sector.SectorNumber, "error", err) + } + }) + } + } + var used abi.UnpaddedPieceSize for _, piece := range sector.Pieces { used += piece.Piece.Size.Unpadded() @@ -32,7 +59,7 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e m.openSectors[m.minerSectorID(sector.SectorNumber)] = &openSector{ used: used, maybeAccept: func(cid cid.Cid) error { - // todo check space + // todo double check space // todo check deal expiration @@ -62,6 +89,13 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er res := SectorPieceAdded{} + var offset abi.UnpaddedPieceSize + pieceSizes := make([]abi.UnpaddedPieceSize, len(sector.Pieces)) + for i, p := range sector.Pieces { + pieceSizes[i] = p.Piece.Size.Unpadded() + offset += p.Piece.Size.Unpadded() + } + for _, piece := range sector.PendingPieces { m.inputLk.Lock() deal, ok := m.pendingPieces[piece] @@ -72,23 +106,13 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er return xerrors.Errorf("piece %s assigned to sector %d not found", piece, sector.SectorNumber) } - var stored abi.PaddedPieceSize - for _, piece := range sector.Pieces { - stored += piece.Piece.Size - } + pads, padLength := ffiwrapper.GetRequiredPadding(offset.Padded(), deal.size.Padded()) - pads, padLength := ffiwrapper.GetRequiredPadding(stored, deal.size.Padded()) - - if stored+padLength+deal.size.Padded() > abi.PaddedPieceSize(ssize) { + if offset.Padded()+padLength+deal.size.Padded() > abi.PaddedPieceSize(ssize) { return xerrors.Errorf("piece assigned to a sector with not enough space") } - offset := padLength - pieceSizes := make([]abi.UnpaddedPieceSize, len(sector.Pieces)) - for i, p := range sector.Pieces { - pieceSizes[i] = p.Piece.Size.Unpadded() - offset += p.Piece.Size - } + offset += padLength.Unpadded() for _, p := range pads { ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx.Context(), DealSectorPriority), @@ -115,9 +139,13 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er return xerrors.Errorf("writing padding piece: %w", err) // todo failed state } + deal.accepted(sector.SectorNumber, offset, nil) + + offset += deal.size pieceSizes = append(pieceSizes, deal.size) + res.NewPieces = append(res.NewPieces, Piece{ - Piece: ppi, + Piece: ppi, DealInfo: &deal.deal, }) } @@ -155,7 +183,11 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec return 0, 0, xerrors.Errorf("piece for deal %s already pending", *deal.PublishCid) } - resCh := make(chan struct{sn abi.SectorNumber; offset abi.UnpaddedPieceSize; err error}, 1) + resCh := make(chan struct { + sn abi.SectorNumber + offset abi.UnpaddedPieceSize + err error + }, 1) m.pendingPieces[*deal.PublishCid] = &pendingPiece{ size: size, @@ -164,9 +196,9 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec assigned: false, accepted: func(sn abi.SectorNumber, offset abi.UnpaddedPieceSize, err error) { resCh <- struct { - sn abi.SectorNumber + sn abi.SectorNumber offset abi.UnpaddedPieceSize - err error + err error }{sn: sn, offset: offset, err: err} }, } @@ -203,15 +235,15 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e // todo: this is distinctly O(n^2), may need to be optimized for tiny deals and large scale miners // (unlikely to be a problem now) - for id, sector := range m.openSectors { - avail := abi.PaddedPieceSize(ssize).Unpadded() - sector.used + for pieceCid, piece := range m.pendingPieces { + if piece.assigned { + continue // already assigned to a sector, skip + } - for pieceCid, piece := range m.pendingPieces { - if piece.assigned { - continue // already assigned to a sector, skip - } + toAssign[pieceCid] = struct{}{} - toAssign[pieceCid] = struct{}{} + for id, sector := range m.openSectors { + avail := abi.PaddedPieceSize(ssize).Unpadded() - sector.used if piece.size <= avail { // (note: if we have enough space for the piece, we also have enough space for inter-piece padding) matches = append(matches, match{ @@ -222,10 +254,8 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e padding: avail % piece.size, }) } - } } - sort.Slice(matches, func(i, j int) bool { if matches[i].padding != matches[j].padding { // less padding is better return matches[i].padding < matches[j].padding @@ -266,8 +296,9 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e } if len(toAssign) > 0 { - m.tryCreateDealSector(ctx, sp) - + if err := m.tryCreateDealSector(ctx, sp); err != nil { + log.Errorw("Failed to create a new sector for deals", "error", err) + } } return nil @@ -279,13 +310,12 @@ func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSeal return xerrors.Errorf("getting storage config: %w", err) } - if cfg.MaxSealingSectorsForDeals > 0 { - if m.stats.curSealing() > cfg.MaxSealingSectorsForDeals { - return nil - } - if m.stats.curStaging() > cfg.MaxWaitDealsSectors { - return nil - } + if cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() > cfg.MaxSealingSectorsForDeals { + return nil + } + + if cfg.MaxWaitDealsSectors > 0 && m.stats.curStaging() > cfg.MaxWaitDealsSectors { + return nil } // Now actually create a new sector @@ -306,3 +336,7 @@ func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSeal SectorType: sp, }) } + +func (m *Sealing) StartPacking(sid abi.SectorNumber) error { + return m.sectors.Send(uint64(sid), SectorStartPacking{}) +} diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index ce54b8a9e..6f4ed0976 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -4,6 +4,7 @@ import ( "context" "errors" "sync" + "time" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" @@ -85,6 +86,7 @@ type Sealing struct { inputLk sync.Mutex openSectors map[abi.SectorID]*openSector + sectorTimers map[abi.SectorID]*time.Timer pendingPieces map[cid.Cid]*pendingPiece upgradeLk sync.Mutex diff --git a/extern/storage-sealing/types.go b/extern/storage-sealing/types.go index 762fe227a..3f9f55b74 100644 --- a/extern/storage-sealing/types.go +++ b/extern/storage-sealing/types.go @@ -3,8 +3,8 @@ package sealing import ( "bytes" "context" - "github.com/ipfs/go-cid" + "time" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" @@ -70,6 +70,7 @@ type SectorInfo struct { SectorType abi.RegisteredSealProof // Packing + CreationTime time.Time Pieces []Piece PendingPieces []cid.Cid diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index f459cf2c4..64f62d6ae 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -805,7 +805,7 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error MaxSealingSectors: cfg.MaxSealingSectors, MaxSealingSectorsForDeals: cfg.MaxSealingSectorsForDeals, WaitDealsDelay: config.Duration(cfg.WaitDealsDelay), - TargetWaitDealsSectors: cfg.TargetWaitDealsSectors, + TargetWaitDealsSectors: cfg.TargetWaitDealsSectors, } }) return @@ -820,7 +820,7 @@ func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error MaxSealingSectors: cfg.Sealing.MaxSealingSectors, MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals, WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay), - TargetWaitDealsSectors: cfg.Sealing.TargetWaitDealsSectors, + TargetWaitDealsSectors: cfg.Sealing.TargetWaitDealsSectors, } }) return