changes as per review

This commit is contained in:
Aarsh Shah 2022-02-22 14:22:51 +04:00
parent 8e62fec1d3
commit f0f2b3e1f4

View File

@ -313,15 +313,35 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec
m.inputLk.Lock()
if pp, exist := m.pendingPieces[proposalCID(deal)]; exist {
m.inputLk.Unlock()
select {
case <-pp.doneCh:
res := pp.resp
// we already have a pre-existing add piece call for this deal, let's wait for it to finish and see if it's successful
for {
res, err := waitAddPieceResp(ctx, pp)
if err != nil {
return api.SectorOffset{}, err
}
// there was an error waiting for a pre-existing add piece call, let's retry
if res.err != nil {
m.inputLk.Lock()
pp = m.addPendingPiece(ctx, size, data, deal, sp)
m.inputLk.Unlock()
continue
}
// all good, return the response
return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
case <-ctx.Done():
return api.SectorOffset{}, ctx.Err()
}
}
pp := m.addPendingPiece(ctx, size, data, deal, sp)
m.inputLk.Unlock()
res, err := waitAddPieceResp(ctx, pp)
if err != nil {
return api.SectorOffset{}, err
}
return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
}
func (m *Sealing) addPendingPiece(ctx context.Context, size abi.UnpaddedPieceSize, data storage.Data, deal api.PieceDealInfo, sp abi.RegisteredSealProof) *pendingPiece {
doneCh := make(chan struct{})
pp := &pendingPiece{
doneCh: doneCh,
@ -337,18 +357,21 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec
m.pendingPieces[proposalCID(deal)] = pp
go func() {
defer m.inputLk.Unlock()
if err := m.updateInput(ctx, sp); err != nil {
log.Errorf("%+v", err)
}
}()
return pp
}
func waitAddPieceResp(ctx context.Context, pp *pendingPiece) (*pieceAcceptResp, error) {
select {
case <-doneCh:
case <-pp.doneCh:
res := pp.resp
return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
return res, nil
case <-ctx.Done():
return api.SectorOffset{}, ctx.Err()
return nil, ctx.Err()
}
}