diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index c18dc88fa..25397ecfb 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -283,29 +283,52 @@ func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error { // Caller should hold m.unsealedInfoMap.lk func (m *Sealing) getSectorAndPadding(ctx context.Context, size abi.UnpaddedPieceSize) (abi.SectorNumber, []abi.PaddedPieceSize, error) { - for k, v := range m.unsealedInfoMap.infos { - pads, padLength := ffiwrapper.GetRequiredPadding(v.stored, size.Padded()) + for tries := 0; tries < 100; tries++ { + for k, v := range m.unsealedInfoMap.infos { + pads, padLength := ffiwrapper.GetRequiredPadding(v.stored, size.Padded()) - if v.stored+size.Padded()+padLength <= abi.PaddedPieceSize(v.ssize) { - return k, pads, nil + if v.stored+size.Padded()+padLength <= abi.PaddedPieceSize(v.ssize) { + return k, pads, nil + } } + + if len(m.unsealedInfoMap.infos) > 0 { + log.Infow("tried to put a piece into an open sector, found none with enough space", "open", len(m.unsealedInfoMap.infos), "size", size, "tries", tries) + } + + ns, ssize, err := m.newDealSector(ctx) + switch err { + case nil: + m.unsealedInfoMap.infos[ns] = UnsealedSectorInfo{ + numDeals: 0, + stored: 0, + pieceSizes: nil, + ssize: ssize, + } + case errTooManySealing: + m.unsealedInfoMap.lk.Unlock() + + select { + case <-time.After(2 * time.Second): + case <-ctx.Done(): + m.unsealedInfoMap.lk.Lock() + return 0, nil, xerrors.Errorf("getting sector for piece: %w", ctx.Err()) + } + + m.unsealedInfoMap.lk.Lock() + continue + default: + return 0, nil, xerrors.Errorf("creating new sector: %w", err) + } + + return ns, nil, nil } - ns, ssize, err := m.newDealSector(ctx) - if err != nil { - return 0, nil, err - } - - m.unsealedInfoMap.infos[ns] = UnsealedSectorInfo{ - numDeals: 0, - stored: 0, - pieceSizes: nil, - ssize: ssize, - } - - return ns, nil, nil + return 0, nil, xerrors.Errorf("failed to allocate piece to a sector") } +var errTooManySealing = errors.New("too many sectors sealing") + // newDealSector creates a new sector for deal storage func (m *Sealing) newDealSector(ctx context.Context) (abi.SectorNumber, abi.SectorSize, error) { // First make sure we don't have too many 'open' sectors @@ -321,47 +344,34 @@ func (m *Sealing) newDealSector(ctx context.Context) (abi.SectorNumber, abi.Sect } } - if cfg.MaxWaitDealsSectors > 0 { - // run in a loop because we have to drop the map lock here for a bit - tries := 0 + if cfg.MaxWaitDealsSectors > 0 && uint64(len(m.unsealedInfoMap.infos)) >= cfg.MaxWaitDealsSectors { + // Too many sectors are sealing in parallel. Start sealing one, and retry + // allocating the piece to a sector (we're dropping the lock here, so in + // case other goroutines are also trying to create a sector, we retry in + // getSectorAndPadding instead of here - otherwise if we have lots of + // parallel deals in progress, we can start creating a ton of sectors + // with just a single deal in them) + var mostStored abi.PaddedPieceSize = math.MaxUint64 + var best abi.SectorNumber = math.MaxUint64 - // we have to run in a loop as we're dropping unsealedInfoMap.lk - // to actually call StartPacking. When we do that, another entry can - // get added to unsealedInfoMap. - for uint64(len(m.unsealedInfoMap.infos)) >= cfg.MaxWaitDealsSectors { - if tries > 10 { - // whatever... - break + for sn, info := range m.unsealedInfoMap.infos { + if info.stored+1 > mostStored+1 { // 18446744073709551615 + 1 = 0 + best = sn } - - if tries > 0 { - m.unsealedInfoMap.lk.Unlock() - time.Sleep(time.Second) - m.unsealedInfoMap.lk.Lock() - } - - tries++ - var mostStored abi.PaddedPieceSize = math.MaxUint64 - var best abi.SectorNumber = math.MaxUint64 - - for sn, info := range m.unsealedInfoMap.infos { - if info.stored+1 > mostStored+1 { // 18446744073709551615 + 1 = 0 - best = sn - } - } - - if best == math.MaxUint64 { - // probably not possible, but who knows - break - } - - m.unsealedInfoMap.lk.Unlock() - if err := m.StartPacking(best); err != nil { - log.Errorf("newDealSector StartPacking error: %+v", err) - continue // let's pretend this is fine - } - m.unsealedInfoMap.lk.Lock() } + + if best != math.MaxUint64 { + m.unsealedInfoMap.lk.Unlock() + err := m.StartPacking(best) + m.unsealedInfoMap.lk.Lock() + + if err != nil { + log.Errorf("newDealSector StartPacking error: %+v", err) + // let's pretend this is fine + } + } + + return 0, 0, errTooManySealing // will wait a bit and retry } spt, err := m.currentSealProof(ctx)