diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go index 13dc40f0f..184a1e9e3 100644 --- a/extern/storage-sealing/input.go +++ b/extern/storage-sealing/input.go @@ -311,28 +311,25 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec } m.inputLk.Lock() - if _, exist := m.pendingPieces[proposalCID(deal)]; exist { + if pp, exist := m.pendingPieces[proposalCID(deal)]; exist { m.inputLk.Unlock() - return api.SectorOffset{}, xerrors.Errorf("piece for deal %s already pending", proposalCID(deal)) + select { + case res := <-pp.resCh: + return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err + case <-ctx.Done(): + return api.SectorOffset{}, ctx.Err() + } } - resCh := make(chan struct { - sn abi.SectorNumber - offset abi.UnpaddedPieceSize - err error - }, 1) - + resCh := make(chan *pieceAcceptResp, 1) m.pendingPieces[proposalCID(deal)] = &pendingPiece{ + resCh: resCh, size: size, deal: deal, data: data, assigned: false, accepted: func(sn abi.SectorNumber, offset abi.UnpaddedPieceSize, err error) { - resCh <- struct { - sn abi.SectorNumber - offset abi.UnpaddedPieceSize - err error - }{sn: sn, offset: offset, err: err} + resCh <- &pieceAcceptResp{sn, offset, err} }, } @@ -344,7 +341,6 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec }() res := <-resCh - return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err } diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 81f6b38e9..66ad6ca8e 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -140,7 +140,15 @@ func (o *openSector) dealFitsInLifetime(dealEnd abi.ChainEpoch, expF func(sn abi return expiration >= dealEnd, nil } +type pieceAcceptResp struct { + sn abi.SectorNumber + offset abi.UnpaddedPieceSize + err error +} + type pendingPiece struct { + resCh chan *pieceAcceptResp + size abi.UnpaddedPieceSize deal api.PieceDealInfo