make add piece idempotent

This commit is contained in:
Aarsh Shah 2022-02-21 11:51:25 +04:00
parent 214c32d2ef
commit 26707a5e03
2 changed files with 18 additions and 14 deletions

View File

@ -311,28 +311,25 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec
} }
m.inputLk.Lock() m.inputLk.Lock()
if _, exist := m.pendingPieces[proposalCID(deal)]; exist { if pp, exist := m.pendingPieces[proposalCID(deal)]; exist {
m.inputLk.Unlock() m.inputLk.Unlock()
return api.SectorOffset{}, xerrors.Errorf("piece for deal %s already pending", proposalCID(deal)) select {
case res := <-pp.resCh:
return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
case <-ctx.Done():
return api.SectorOffset{}, ctx.Err()
}
} }
resCh := make(chan struct { resCh := make(chan *pieceAcceptResp, 1)
sn abi.SectorNumber
offset abi.UnpaddedPieceSize
err error
}, 1)
m.pendingPieces[proposalCID(deal)] = &pendingPiece{ m.pendingPieces[proposalCID(deal)] = &pendingPiece{
resCh: resCh,
size: size, size: size,
deal: deal, deal: deal,
data: data, data: data,
assigned: false, assigned: false,
accepted: func(sn abi.SectorNumber, offset abi.UnpaddedPieceSize, err error) { accepted: func(sn abi.SectorNumber, offset abi.UnpaddedPieceSize, err error) {
resCh <- struct { resCh <- &pieceAcceptResp{sn, offset, err}
sn abi.SectorNumber
offset abi.UnpaddedPieceSize
err error
}{sn: sn, offset: offset, err: err}
}, },
} }
@ -344,7 +341,6 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec
}() }()
res := <-resCh res := <-resCh
return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
} }

View File

@ -140,7 +140,15 @@ func (o *openSector) dealFitsInLifetime(dealEnd abi.ChainEpoch, expF func(sn abi
return expiration >= dealEnd, nil return expiration >= dealEnd, nil
} }
type pieceAcceptResp struct {
sn abi.SectorNumber
offset abi.UnpaddedPieceSize
err error
}
type pendingPiece struct { type pendingPiece struct {
resCh chan *pieceAcceptResp
size abi.UnpaddedPieceSize size abi.UnpaddedPieceSize
deal api.PieceDealInfo deal api.PieceDealInfo