idempotent add piece

This commit is contained in:
Aarsh Shah 2022-02-21 13:51:43 +04:00
parent 1245871004
commit c2522e8ca9
2 changed files with 15 additions and 9 deletions

View File

@ -314,25 +314,28 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec
if pp, exist := m.pendingPieces[proposalCID(deal)]; exist { if pp, exist := m.pendingPieces[proposalCID(deal)]; exist {
m.inputLk.Unlock() m.inputLk.Unlock()
select { select {
case res := <-pp.resCh: case <-pp.doneCh:
res := pp.resp.Load().(*pieceAcceptResp)
return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
case <-ctx.Done(): case <-ctx.Done():
return api.SectorOffset{}, ctx.Err() return api.SectorOffset{}, ctx.Err()
} }
} }
resCh := make(chan *pieceAcceptResp, 1) doneCh := make(chan struct{}, 1)
m.pendingPieces[proposalCID(deal)] = &pendingPiece{ pp := &pendingPiece{
resCh: resCh, doneCh: doneCh,
size: size, size: size,
deal: deal, deal: deal,
data: data, data: data,
assigned: false, assigned: false,
accepted: func(sn abi.SectorNumber, offset abi.UnpaddedPieceSize, err error) { }
resCh <- &pieceAcceptResp{sn, offset, err} pp.accepted = func(sn abi.SectorNumber, offset abi.UnpaddedPieceSize, err error) {
}, pp.resp.Store(&pieceAcceptResp{sn, offset, err})
close(pp.doneCh)
} }
m.pendingPieces[proposalCID(deal)] = pp
go func() { go func() {
defer m.inputLk.Unlock() defer m.inputLk.Unlock()
if err := m.updateInput(ctx, sp); err != nil { if err := m.updateInput(ctx, sp); err != nil {
@ -341,7 +344,8 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec
}() }()
select { select {
case res := <-resCh: case <-doneCh:
res := pp.resp.Load().(*pieceAcceptResp)
return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
case <-ctx.Done(): case <-ctx.Done():
return api.SectorOffset{}, ctx.Err() return api.SectorOffset{}, ctx.Err()

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
@ -147,7 +148,8 @@ type pieceAcceptResp struct {
} }
type pendingPiece struct { type pendingPiece struct {
resCh chan *pieceAcceptResp doneCh chan struct{}
resp atomic.Value
size abi.UnpaddedPieceSize size abi.UnpaddedPieceSize
deal api.PieceDealInfo deal api.PieceDealInfo