diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go index 13dc40f0f..c999badfd 100644 --- a/extern/storage-sealing/input.go +++ b/extern/storage-sealing/input.go @@ -311,41 +311,68 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec } m.inputLk.Lock() - if _, exist := m.pendingPieces[proposalCID(deal)]; exist { + if pp, exist := m.pendingPieces[proposalCID(deal)]; exist { m.inputLk.Unlock() - return api.SectorOffset{}, xerrors.Errorf("piece for deal %s already pending", proposalCID(deal)) + + // we already have a pre-existing add piece call for this deal, let's wait for it to finish and see if it's successful + for { + res, err := waitAddPieceResp(ctx, pp) + if err != nil { + return api.SectorOffset{}, err + } + // there was an error waiting for a pre-existing add piece call, let's retry + if res.err != nil { + m.inputLk.Lock() + pp = m.addPendingPiece(ctx, size, data, deal, sp) + m.inputLk.Unlock() + continue + } + // all good, return the response + return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err + } } - resCh := make(chan struct { - sn abi.SectorNumber - offset abi.UnpaddedPieceSize - err error - }, 1) + pp := m.addPendingPiece(ctx, size, data, deal, sp) + m.inputLk.Unlock() + res, err := waitAddPieceResp(ctx, pp) + if err != nil { + return api.SectorOffset{}, err + } + return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err +} - m.pendingPieces[proposalCID(deal)] = &pendingPiece{ +func (m *Sealing) addPendingPiece(ctx context.Context, size abi.UnpaddedPieceSize, data storage.Data, deal api.PieceDealInfo, sp abi.RegisteredSealProof) *pendingPiece { + doneCh := make(chan struct{}) + pp := &pendingPiece{ + doneCh: doneCh, size: size, deal: deal, data: data, assigned: false, - accepted: func(sn abi.SectorNumber, offset abi.UnpaddedPieceSize, err error) { - resCh <- struct { - sn abi.SectorNumber - offset abi.UnpaddedPieceSize - err error - }{sn: sn, offset: offset, err: err} - }, + } + pp.accepted = func(sn abi.SectorNumber, offset abi.UnpaddedPieceSize, err error) { + pp.resp = &pieceAcceptResp{sn, offset, err} + close(pp.doneCh) } + m.pendingPieces[proposalCID(deal)] = pp go func() { - defer m.inputLk.Unlock() if err := m.updateInput(ctx, sp); err != nil { log.Errorf("%+v", err) } }() - res := <-resCh + return pp +} - return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err +func waitAddPieceResp(ctx context.Context, pp *pendingPiece) (*pieceAcceptResp, error) { + select { + case <-pp.doneCh: + res := pp.resp + return res, nil + case <-ctx.Done(): + return nil, ctx.Err() + } } func (m *Sealing) MatchPendingPiecesToOpenSectors(ctx context.Context) error { diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 81f6b38e9..907d7cdfd 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -140,7 +140,16 @@ func (o *openSector) dealFitsInLifetime(dealEnd abi.ChainEpoch, expF func(sn abi return expiration >= dealEnd, nil } +type pieceAcceptResp struct { + sn abi.SectorNumber + offset abi.UnpaddedPieceSize + err error +} + type pendingPiece struct { + doneCh chan struct{} + resp *pieceAcceptResp + size abi.UnpaddedPieceSize deal api.PieceDealInfo