From f0f2b3e1f42d1f32db295e115ff5230013d99353 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Tue, 22 Feb 2022 14:22:51 +0400 Subject: [PATCH] changes as per review --- extern/storage-sealing/input.go | 41 +++++++++++++++++++++++++-------- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go index 1ad85eb08..c999badfd 100644 --- a/extern/storage-sealing/input.go +++ b/extern/storage-sealing/input.go @@ -313,15 +313,35 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec m.inputLk.Lock() if pp, exist := m.pendingPieces[proposalCID(deal)]; exist { m.inputLk.Unlock() - select { - case <-pp.doneCh: - res := pp.resp + + // we already have a pre-existing add piece call for this deal, let's wait for it to finish and see if it's successful + for { + res, err := waitAddPieceResp(ctx, pp) + if err != nil { + return api.SectorOffset{}, err + } + // there was an error waiting for a pre-existing add piece call, let's retry + if res.err != nil { + m.inputLk.Lock() + pp = m.addPendingPiece(ctx, size, data, deal, sp) + m.inputLk.Unlock() + continue + } + // all good, return the response return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err - case <-ctx.Done(): - return api.SectorOffset{}, ctx.Err() } } + pp := m.addPendingPiece(ctx, size, data, deal, sp) + m.inputLk.Unlock() + res, err := waitAddPieceResp(ctx, pp) + if err != nil { + return api.SectorOffset{}, err + } + return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err +} + +func (m *Sealing) addPendingPiece(ctx context.Context, size abi.UnpaddedPieceSize, data storage.Data, deal api.PieceDealInfo, sp abi.RegisteredSealProof) *pendingPiece { doneCh := make(chan struct{}) pp := &pendingPiece{ doneCh: doneCh, @@ -337,18 +357,21 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec m.pendingPieces[proposalCID(deal)] = pp go func() { - defer m.inputLk.Unlock() if err := m.updateInput(ctx, sp); err != nil { log.Errorf("%+v", err) } }() + return pp +} + +func waitAddPieceResp(ctx context.Context, pp *pendingPiece) (*pieceAcceptResp, error) { select { - case <-doneCh: + case <-pp.doneCh: res := pp.resp - return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err + return res, nil case <-ctx.Done(): - return api.SectorOffset{}, ctx.Err() + return nil, ctx.Err() } }