storagefsm: Improve new deal sector logic
This commit is contained in:
parent
25070314ce
commit
6166204bac
96
extern/storage-sealing/sealing.go
vendored
96
extern/storage-sealing/sealing.go
vendored
@ -283,29 +283,47 @@ func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error {
|
|||||||
|
|
||||||
// Caller should hold m.unsealedInfoMap.lk
|
// Caller should hold m.unsealedInfoMap.lk
|
||||||
func (m *Sealing) getSectorAndPadding(ctx context.Context, size abi.UnpaddedPieceSize) (abi.SectorNumber, []abi.PaddedPieceSize, error) {
|
func (m *Sealing) getSectorAndPadding(ctx context.Context, size abi.UnpaddedPieceSize) (abi.SectorNumber, []abi.PaddedPieceSize, error) {
|
||||||
for k, v := range m.unsealedInfoMap.infos {
|
for tries := 0; tries < 100; tries++ {
|
||||||
pads, padLength := ffiwrapper.GetRequiredPadding(v.stored, size.Padded())
|
for k, v := range m.unsealedInfoMap.infos {
|
||||||
|
pads, padLength := ffiwrapper.GetRequiredPadding(v.stored, size.Padded())
|
||||||
|
|
||||||
if v.stored+size.Padded()+padLength <= abi.PaddedPieceSize(v.ssize) {
|
if v.stored+size.Padded()+padLength <= abi.PaddedPieceSize(v.ssize) {
|
||||||
return k, pads, nil
|
return k, pads, nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(m.unsealedInfoMap.infos) > 0 {
|
||||||
|
log.Infow("tried to put a piece into an open sector, found none with enough space", "open", len(m.unsealedInfoMap.infos), "size", size, "tries", tries)
|
||||||
|
}
|
||||||
|
|
||||||
|
ns, ssize, err := m.newDealSector(ctx)
|
||||||
|
switch err {
|
||||||
|
case nil:
|
||||||
|
m.unsealedInfoMap.infos[ns] = UnsealedSectorInfo{
|
||||||
|
numDeals: 0,
|
||||||
|
stored: 0,
|
||||||
|
pieceSizes: nil,
|
||||||
|
ssize: ssize,
|
||||||
|
}
|
||||||
|
case errTooManySealing:
|
||||||
|
log.Infow("")
|
||||||
|
m.unsealedInfoMap.lk.Unlock()
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
m.unsealedInfoMap.lk.Lock()
|
||||||
|
|
||||||
|
continue
|
||||||
|
default:
|
||||||
|
return 0, nil, xerrors.Errorf("creating new sector: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ns, nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
ns, ssize, err := m.newDealSector(ctx)
|
return 0, nil, xerrors.Errorf("failed to allocate piece to a sector")
|
||||||
if err != nil {
|
|
||||||
return 0, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
m.unsealedInfoMap.infos[ns] = UnsealedSectorInfo{
|
|
||||||
numDeals: 0,
|
|
||||||
stored: 0,
|
|
||||||
pieceSizes: nil,
|
|
||||||
ssize: ssize,
|
|
||||||
}
|
|
||||||
|
|
||||||
return ns, nil, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var errTooManySealing = errors.New("too many sectors sealing")
|
||||||
|
|
||||||
// newDealSector creates a new sector for deal storage
|
// newDealSector creates a new sector for deal storage
|
||||||
func (m *Sealing) newDealSector(ctx context.Context) (abi.SectorNumber, abi.SectorSize, error) {
|
func (m *Sealing) newDealSector(ctx context.Context) (abi.SectorNumber, abi.SectorSize, error) {
|
||||||
// First make sure we don't have too many 'open' sectors
|
// First make sure we don't have too many 'open' sectors
|
||||||
@ -322,25 +340,14 @@ func (m *Sealing) newDealSector(ctx context.Context) (abi.SectorNumber, abi.Sect
|
|||||||
}
|
}
|
||||||
|
|
||||||
if cfg.MaxWaitDealsSectors > 0 {
|
if cfg.MaxWaitDealsSectors > 0 {
|
||||||
// run in a loop because we have to drop the map lock here for a bit
|
// Too many sectors are sealing in parallel. Start sealing one, and retry
|
||||||
tries := 0
|
// allocating the piece to a sector (we're dropping the lock here, so in
|
||||||
|
// case other goroutines are also trying to create a sector, we retry in
|
||||||
|
// getSectorAndPadding instead of here - otherwise if we have lots of
|
||||||
|
// parallel deals in progress, we can start creating a ton of sectors
|
||||||
|
// with just a single deal in them)
|
||||||
|
if uint64(len(m.unsealedInfoMap.infos)) >= cfg.MaxWaitDealsSectors {
|
||||||
|
|
||||||
// we have to run in a loop as we're dropping unsealedInfoMap.lk
|
|
||||||
// to actually call StartPacking. When we do that, another entry can
|
|
||||||
// get added to unsealedInfoMap.
|
|
||||||
for uint64(len(m.unsealedInfoMap.infos)) >= cfg.MaxWaitDealsSectors {
|
|
||||||
if tries > 10 {
|
|
||||||
// whatever...
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
if tries > 0 {
|
|
||||||
m.unsealedInfoMap.lk.Unlock()
|
|
||||||
time.Sleep(time.Second)
|
|
||||||
m.unsealedInfoMap.lk.Lock()
|
|
||||||
}
|
|
||||||
|
|
||||||
tries++
|
|
||||||
var mostStored abi.PaddedPieceSize = math.MaxUint64
|
var mostStored abi.PaddedPieceSize = math.MaxUint64
|
||||||
var best abi.SectorNumber = math.MaxUint64
|
var best abi.SectorNumber = math.MaxUint64
|
||||||
|
|
||||||
@ -350,17 +357,18 @@ func (m *Sealing) newDealSector(ctx context.Context) (abi.SectorNumber, abi.Sect
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if best == math.MaxUint64 {
|
if best != math.MaxUint64 {
|
||||||
// probably not possible, but who knows
|
m.unsealedInfoMap.lk.Unlock()
|
||||||
break
|
err := m.StartPacking(best)
|
||||||
|
m.unsealedInfoMap.lk.Lock()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("newDealSector StartPacking error: %+v", err)
|
||||||
|
// let's pretend this is fine
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
m.unsealedInfoMap.lk.Unlock()
|
return 0, 0, errTooManySealing // will wait a bit and retry
|
||||||
if err := m.StartPacking(best); err != nil {
|
|
||||||
log.Errorf("newDealSector StartPacking error: %+v", err)
|
|
||||||
continue // let's pretend this is fine
|
|
||||||
}
|
|
||||||
m.unsealedInfoMap.lk.Lock()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user