diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go index 280a8698e..e005c3263 100644 --- a/extern/storage-sealing/input.go +++ b/extern/storage-sealing/input.go @@ -19,70 +19,18 @@ import ( ) func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) error { - m.inputLk.Lock() - - now := time.Now() - st := m.sectorTimers[m.minerSectorID(sector.SectorNumber)] - if st != nil { - if !st.Stop() { // timer expired, SectorStartPacking was/is being sent - m.inputLk.Unlock() - - // we send another SectorStartPacking in case one was sent in the handleAddPiece state - log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timeout") - return ctx.Send(SectorStartPacking{}) - } - } - - ssize, err := sector.SectorType.SectorSize() - if err != nil { - return xerrors.Errorf("getting sector size") - } - - maxDeals, err := getDealPerSectorLimit(ssize) - if err != nil { - return xerrors.Errorf("getting per-sector deal limit: %w", err) - } - - if len(sector.dealIDs()) >= maxDeals { - // can't accept more deals - log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "maxdeals") - return ctx.Send(SectorStartPacking{}) - } - var used abi.UnpaddedPieceSize for _, piece := range sector.Pieces { used += piece.Piece.Size.Unpadded() } - if used.Padded() == abi.PaddedPieceSize(ssize) { - // sector full - log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "filled") - return ctx.Send(SectorStartPacking{}) - } + m.inputLk.Lock() - if sector.CreationTime != 0 { - cfg, err := m.getConfig() - if err != nil { - m.inputLk.Unlock() - return xerrors.Errorf("getting storage config: %w", err) - } + started, err := m.maybeStartSealing(ctx, sector, used) + if err != nil || started { + m.inputLk.Unlock() - // todo check deal age, start sealing if any deal has less than X (configurable) to start deadline - sealTime := time.Unix(sector.CreationTime, 0).Add(cfg.WaitDealsDelay) - - if now.After(sealTime) { - m.inputLk.Unlock() - log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timeout") - return ctx.Send(SectorStartPacking{}) - } - - m.sectorTimers[m.minerSectorID(sector.SectorNumber)] = time.AfterFunc(sealTime.Sub(now), func() { - log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timer") - - if err := ctx.Send(SectorStartPacking{}); err != nil { - log.Errorw("sending SectorStartPacking event failed", "sector", sector.SectorNumber, "error", err) - } - }) + return err } m.openSectors[m.minerSectorID(sector.SectorNumber)] = &openSector{ @@ -107,6 +55,67 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e return nil } +func (m *Sealing) maybeStartSealing(ctx statemachine.Context, sector SectorInfo, used abi.UnpaddedPieceSize) (bool, error) { + now := time.Now() + st := m.sectorTimers[m.minerSectorID(sector.SectorNumber)] + if st != nil { + if !st.Stop() { // timer expired, SectorStartPacking was/is being sent + // we send another SectorStartPacking in case one was sent in the handleAddPiece state + log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timeout") + return true, ctx.Send(SectorStartPacking{}) + } + } + + ssize, err := sector.SectorType.SectorSize() + if err != nil { + return false, xerrors.Errorf("getting sector size") + } + + maxDeals, err := getDealPerSectorLimit(ssize) + if err != nil { + return false, xerrors.Errorf("getting per-sector deal limit: %w", err) + } + + if len(sector.dealIDs()) >= maxDeals { + // can't accept more deals + log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "maxdeals") + return true, ctx.Send(SectorStartPacking{}) + } + + if used.Padded() == abi.PaddedPieceSize(ssize) { + // sector full + log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "filled") + return true, ctx.Send(SectorStartPacking{}) + } + + if sector.CreationTime != 0 { + cfg, err := m.getConfig() + if err != nil { + m.inputLk.Unlock() + return false, xerrors.Errorf("getting storage config: %w", err) + } + + // todo check deal age, start sealing if any deal has less than X (configurable) to start deadline + sealTime := time.Unix(sector.CreationTime, 0).Add(cfg.WaitDealsDelay) + + if now.After(sealTime) { + m.inputLk.Unlock() + log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timeout") + return true, ctx.Send(SectorStartPacking{}) + } + + m.sectorTimers[m.minerSectorID(sector.SectorNumber)] = time.AfterFunc(sealTime.Sub(now), func() { + log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timer") + + if err := ctx.Send(SectorStartPacking{}); err != nil { + log.Errorw("sending SectorStartPacking event failed", "sector", sector.SectorNumber, "error", err) + } + }) + } + + return false, nil +} + func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) error { ssize, err := sector.SectorType.SectorSize() if err != nil {