Merge pull request #8160 from filecoin-project/fix/addpiece-idempotent

feat: Make add piece idempotent
This commit is contained in:
Łukasz Magiera 2022-03-02 15:10:16 +00:00 committed by GitHub
commit 6d464c478e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 54 additions and 18 deletions

View File

@ -311,41 +311,68 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec
}
m.inputLk.Lock()
if _, exist := m.pendingPieces[proposalCID(deal)]; exist {
if pp, exist := m.pendingPieces[proposalCID(deal)]; exist {
m.inputLk.Unlock()
return api.SectorOffset{}, xerrors.Errorf("piece for deal %s already pending", proposalCID(deal))
// we already have a pre-existing add piece call for this deal, let's wait for it to finish and see if it's successful
for {
res, err := waitAddPieceResp(ctx, pp)
if err != nil {
return api.SectorOffset{}, err
}
// there was an error waiting for a pre-existing add piece call, let's retry
if res.err != nil {
m.inputLk.Lock()
pp = m.addPendingPiece(ctx, size, data, deal, sp)
m.inputLk.Unlock()
continue
}
// all good, return the response
return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
}
}
resCh := make(chan struct {
sn abi.SectorNumber
offset abi.UnpaddedPieceSize
err error
}, 1)
pp := m.addPendingPiece(ctx, size, data, deal, sp)
m.inputLk.Unlock()
res, err := waitAddPieceResp(ctx, pp)
if err != nil {
return api.SectorOffset{}, err
}
return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
}
m.pendingPieces[proposalCID(deal)] = &pendingPiece{
func (m *Sealing) addPendingPiece(ctx context.Context, size abi.UnpaddedPieceSize, data storage.Data, deal api.PieceDealInfo, sp abi.RegisteredSealProof) *pendingPiece {
doneCh := make(chan struct{})
pp := &pendingPiece{
doneCh: doneCh,
size: size,
deal: deal,
data: data,
assigned: false,
accepted: func(sn abi.SectorNumber, offset abi.UnpaddedPieceSize, err error) {
resCh <- struct {
sn abi.SectorNumber
offset abi.UnpaddedPieceSize
err error
}{sn: sn, offset: offset, err: err}
},
}
pp.accepted = func(sn abi.SectorNumber, offset abi.UnpaddedPieceSize, err error) {
pp.resp = &pieceAcceptResp{sn, offset, err}
close(pp.doneCh)
}
m.pendingPieces[proposalCID(deal)] = pp
go func() {
defer m.inputLk.Unlock()
if err := m.updateInput(ctx, sp); err != nil {
log.Errorf("%+v", err)
}
}()
res := <-resCh
return pp
}
return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
func waitAddPieceResp(ctx context.Context, pp *pendingPiece) (*pieceAcceptResp, error) {
select {
case <-pp.doneCh:
res := pp.resp
return res, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
func (m *Sealing) MatchPendingPiecesToOpenSectors(ctx context.Context) error {

View File

@ -140,7 +140,16 @@ func (o *openSector) dealFitsInLifetime(dealEnd abi.ChainEpoch, expF func(sn abi
return expiration >= dealEnd, nil
}
type pieceAcceptResp struct {
sn abi.SectorNumber
offset abi.UnpaddedPieceSize
err error
}
type pendingPiece struct {
doneCh chan struct{}
resp *pieceAcceptResp
size abi.UnpaddedPieceSize
deal api.PieceDealInfo