storagefsm: cleanup openSectors better; pendingPieces by pieceCid
This commit is contained in:
parent
e92b8b24b1
commit
e27a530cbc
28
extern/storage-sealing/input.go
vendored
28
extern/storage-sealing/input.go
vendored
@ -28,6 +28,8 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e
|
||||
|
||||
started, err := m.maybeStartSealing(ctx, sector, used)
|
||||
if err != nil || started {
|
||||
delete(m.openSectors, m.minerSectorID(sector.SectorNumber))
|
||||
|
||||
m.inputLk.Unlock()
|
||||
|
||||
return err
|
||||
@ -243,14 +245,14 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec
|
||||
return 0, 0, xerrors.Errorf("piece cannot fit into a sector")
|
||||
}
|
||||
|
||||
if deal.PublishCid == nil {
|
||||
return 0, 0, xerrors.Errorf("piece must have a PublishCID")
|
||||
if _, err := deal.DealProposal.Cid(); err != nil {
|
||||
return 0, 0, xerrors.Errorf("getting proposal CID: %w", err)
|
||||
}
|
||||
|
||||
m.inputLk.Lock()
|
||||
if _, exist := m.pendingPieces[*deal.PublishCid]; exist {
|
||||
if _, exist := m.pendingPieces[proposalCID(deal)]; exist {
|
||||
m.inputLk.Unlock()
|
||||
return 0, 0, xerrors.Errorf("piece for deal %s already pending", *deal.PublishCid)
|
||||
return 0, 0, xerrors.Errorf("piece for deal %s already pending", proposalCID(deal))
|
||||
}
|
||||
|
||||
resCh := make(chan struct {
|
||||
@ -259,7 +261,7 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec
|
||||
err error
|
||||
}, 1)
|
||||
|
||||
m.pendingPieces[*deal.PublishCid] = &pendingPiece{
|
||||
m.pendingPieces[proposalCID(deal)] = &pendingPiece{
|
||||
size: size,
|
||||
deal: deal,
|
||||
data: data,
|
||||
@ -305,12 +307,12 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
|
||||
|
||||
// todo: this is distinctly O(n^2), may need to be optimized for tiny deals and large scale miners
|
||||
// (unlikely to be a problem now)
|
||||
for pieceCid, piece := range m.pendingPieces {
|
||||
for proposalCid, piece := range m.pendingPieces {
|
||||
if piece.assigned {
|
||||
continue // already assigned to a sector, skip
|
||||
}
|
||||
|
||||
toAssign[pieceCid] = struct{}{}
|
||||
toAssign[proposalCid] = struct{}{}
|
||||
|
||||
for id, sector := range m.openSectors {
|
||||
avail := abi.PaddedPieceSize(ssize).Unpadded() - sector.used
|
||||
@ -318,7 +320,7 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
|
||||
if piece.size <= avail { // (note: if we have enough space for the piece, we also have enough space for inter-piece padding)
|
||||
matches = append(matches, match{
|
||||
sector: id,
|
||||
deal: pieceCid,
|
||||
deal: proposalCid,
|
||||
|
||||
size: piece.size,
|
||||
padding: avail % piece.size,
|
||||
@ -410,3 +412,13 @@ func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSeal
|
||||
func (m *Sealing) StartPacking(sid abi.SectorNumber) error {
|
||||
return m.sectors.Send(uint64(sid), SectorStartPacking{})
|
||||
}
|
||||
|
||||
func proposalCID(deal DealInfo) cid.Cid {
|
||||
pc, err := deal.DealProposal.Cid()
|
||||
if err != nil {
|
||||
log.Errorf("DealProposal.Cid error: %+v", err)
|
||||
return cid.Undef
|
||||
}
|
||||
|
||||
return pc
|
||||
}
|
||||
|
18
extern/storage-sealing/states_sealing.go
vendored
18
extern/storage-sealing/states_sealing.go
vendored
@ -25,6 +25,24 @@ var DealSectorPriority = 1024
|
||||
var MaxTicketAge = abi.ChainEpoch(builtin0.EpochsInDay * 2)
|
||||
|
||||
func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) error {
|
||||
m.inputLk.Lock()
|
||||
// make sure we not accepting deals into this sector
|
||||
for _, c := range m.assignedPieces[m.minerSectorID(sector.SectorNumber)] {
|
||||
pp := m.pendingPieces[c]
|
||||
delete(m.pendingPieces, c)
|
||||
if pp == nil {
|
||||
log.Errorf("nil assigned pending piece %s", c)
|
||||
continue
|
||||
}
|
||||
|
||||
// todo: return to the sealing queue (this is extremely unlikely to happen)
|
||||
pp.accepted(sector.SectorNumber, 0, xerrors.Errorf("sector entered packing state early"))
|
||||
}
|
||||
|
||||
delete(m.openSectors, m.minerSectorID(sector.SectorNumber))
|
||||
delete(m.assignedPieces, m.minerSectorID(sector.SectorNumber))
|
||||
m.inputLk.Unlock()
|
||||
|
||||
log.Infow("performing filling up rest of the sector...", "sector", sector.SectorNumber)
|
||||
|
||||
var allocated abi.UnpaddedPieceSize
|
||||
|
Loading…
Reference in New Issue
Block a user