storagefsm: Add rest of checks in WaitDeals
This commit is contained in:
parent
069766ecc4
commit
f96f12c836
40
extern/storage-sealing/input.go
vendored
40
extern/storage-sealing/input.go
vendored
@ -19,9 +19,6 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) error {
|
func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
// if full / oldish / has oldish deals goto seal
|
|
||||||
// ^ also per sector deal limit
|
|
||||||
|
|
||||||
m.inputLk.Lock()
|
m.inputLk.Lock()
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
@ -35,12 +32,28 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
maxDeals, err := getDealPerSectorLimit(sector.SectorType)
|
ssize, err := sector.SectorType.SectorSize()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("getting sector size")
|
||||||
|
}
|
||||||
|
|
||||||
|
maxDeals, err := getDealPerSectorLimit(ssize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("getting per-sector deal limit: %w", err)
|
return xerrors.Errorf("getting per-sector deal limit: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(sector.dealIDs()) >= maxDeals {
|
if len(sector.dealIDs()) >= maxDeals {
|
||||||
|
// can't accept more deals
|
||||||
|
return ctx.Send(SectorStartPacking{})
|
||||||
|
}
|
||||||
|
|
||||||
|
var used abi.UnpaddedPieceSize
|
||||||
|
for _, piece := range sector.Pieces {
|
||||||
|
used += piece.Piece.Size.Unpadded()
|
||||||
|
}
|
||||||
|
|
||||||
|
if used.Padded() == abi.PaddedPieceSize(ssize) {
|
||||||
|
// sector full
|
||||||
return ctx.Send(SectorStartPacking{})
|
return ctx.Send(SectorStartPacking{})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -51,7 +64,9 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e
|
|||||||
return xerrors.Errorf("getting storage config: %w", err)
|
return xerrors.Errorf("getting storage config: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// todo check deal age, start sealing if any deal has less than X (configurable) to start deadline
|
||||||
sealTime := time.Unix(sector.CreationTime, 0).Add(cfg.WaitDealsDelay)
|
sealTime := time.Unix(sector.CreationTime, 0).Add(cfg.WaitDealsDelay)
|
||||||
|
|
||||||
if now.After(sealTime) {
|
if now.After(sealTime) {
|
||||||
m.inputLk.Unlock()
|
m.inputLk.Unlock()
|
||||||
return ctx.Send(SectorStartPacking{})
|
return ctx.Send(SectorStartPacking{})
|
||||||
@ -64,17 +79,10 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
var used abi.UnpaddedPieceSize
|
|
||||||
for _, piece := range sector.Pieces {
|
|
||||||
used += piece.Piece.Size.Unpadded()
|
|
||||||
}
|
|
||||||
|
|
||||||
m.openSectors[m.minerSectorID(sector.SectorNumber)] = &openSector{
|
m.openSectors[m.minerSectorID(sector.SectorNumber)] = &openSector{
|
||||||
used: used,
|
used: used,
|
||||||
maybeAccept: func(cid cid.Cid) error {
|
maybeAccept: func(cid cid.Cid) error {
|
||||||
// todo double check space
|
// todo check deal start deadline (configurable)
|
||||||
|
|
||||||
// todo check deal expiration
|
|
||||||
|
|
||||||
sid := m.minerSectorID(sector.SectorNumber)
|
sid := m.minerSectorID(sector.SectorNumber)
|
||||||
m.assignedPieces[sid] = append(m.assignedPieces[sid], cid)
|
m.assignedPieces[sid] = append(m.assignedPieces[sid], cid)
|
||||||
@ -120,7 +128,7 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er
|
|||||||
offset += p.Piece.Size.Unpadded()
|
offset += p.Piece.Size.Unpadded()
|
||||||
}
|
}
|
||||||
|
|
||||||
maxDeals, err := getDealPerSectorLimit(sector.SectorType)
|
maxDeals, err := getDealPerSectorLimit(ssize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("getting per-sector deal limit: %w", err)
|
return xerrors.Errorf("getting per-sector deal limit: %w", err)
|
||||||
}
|
}
|
||||||
@ -134,7 +142,7 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er
|
|||||||
}
|
}
|
||||||
|
|
||||||
if len(sector.dealIDs())+(i+1) > maxDeals {
|
if len(sector.dealIDs())+(i+1) > maxDeals {
|
||||||
// shouldn't happen, but just in case
|
// todo: this is rather unlikely to happen, but in case it does, return the deal to waiting queue instead of failing it
|
||||||
deal.accepted(sector.SectorNumber, offset, xerrors.Errorf("too many deals assigned to sector %d, dropping deal", sector.SectorNumber))
|
deal.accepted(sector.SectorNumber, offset, xerrors.Errorf("too many deals assigned to sector %d, dropping deal", sector.SectorNumber))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -142,7 +150,9 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) er
|
|||||||
pads, padLength := ffiwrapper.GetRequiredPadding(offset.Padded(), deal.size.Padded())
|
pads, padLength := ffiwrapper.GetRequiredPadding(offset.Padded(), deal.size.Padded())
|
||||||
|
|
||||||
if offset.Padded()+padLength+deal.size.Padded() > abi.PaddedPieceSize(ssize) {
|
if offset.Padded()+padLength+deal.size.Padded() > abi.PaddedPieceSize(ssize) {
|
||||||
return xerrors.Errorf("piece assigned to a sector with not enough space")
|
// todo: this is rather unlikely to happen, but in case it does, return the deal to waiting queue instead of failing it
|
||||||
|
deal.accepted(sector.SectorNumber, offset, xerrors.Errorf("piece %s assigned to sector %d with not enough space", piece, sector.SectorNumber))
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
offset += padLength.Unpadded()
|
offset += padLength.Unpadded()
|
||||||
|
7
extern/storage-sealing/sealing.go
vendored
7
extern/storage-sealing/sealing.go
vendored
@ -243,12 +243,7 @@ func (m *Sealing) Address() address.Address {
|
|||||||
return m.maddr
|
return m.maddr
|
||||||
}
|
}
|
||||||
|
|
||||||
func getDealPerSectorLimit(spt abi.RegisteredSealProof) (int, error) {
|
func getDealPerSectorLimit(size abi.SectorSize) (int, error) {
|
||||||
size, err := spt.SectorSize()
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if size < 64<<30 {
|
if size < 64<<30 {
|
||||||
return 256, nil
|
return 256, nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user