Merge pull request #4280 from filecoin-project/fix/market-deal-race
fix a race when retrieving pieces
This commit is contained in:
commit
83ba7ec8cc
@ -462,13 +462,19 @@ type retrievalSubscribeEvent struct {
|
||||
state rm.ClientDealState
|
||||
}
|
||||
|
||||
func readSubscribeEvents(ctx context.Context, subscribeEvents chan retrievalSubscribeEvent, events chan marketevents.RetrievalEvent) error {
|
||||
func readSubscribeEvents(ctx context.Context, dealID retrievalmarket.DealID, subscribeEvents chan retrievalSubscribeEvent, events chan marketevents.RetrievalEvent) error {
|
||||
for {
|
||||
var subscribeEvent retrievalSubscribeEvent
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return xerrors.New("Retrieval Timed Out")
|
||||
case subscribeEvent = <-subscribeEvents:
|
||||
if subscribeEvent.state.ID != dealID {
|
||||
// we can't check the deal ID ahead of time because:
|
||||
// 1. We need to subscribe before retrieving.
|
||||
// 2. We won't know the deal ID until after retrieving.
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
@ -531,19 +537,6 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
|
||||
return err
|
||||
}*/
|
||||
|
||||
var dealID retrievalmarket.DealID
|
||||
subscribeEvents := make(chan retrievalSubscribeEvent, 1)
|
||||
subscribeCtx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) {
|
||||
if state.PayloadCID.Equals(order.Root) && state.ID == dealID {
|
||||
select {
|
||||
case <-subscribeCtx.Done():
|
||||
case subscribeEvents <- retrievalSubscribeEvent{event, state}:
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
ppb := types.BigDiv(order.Total, types.NewInt(order.Size))
|
||||
|
||||
params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, shared.AllSelector(), order.Piece, order.UnsealPrice)
|
||||
@ -562,7 +555,21 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
|
||||
_ = a.RetrievalStoreMgr.ReleaseStore(store)
|
||||
}()
|
||||
|
||||
dealID, err = a.Retrieval.Retrieve(
|
||||
// Subscribe to events before retrieving to avoid losing events.
|
||||
subscribeEvents := make(chan retrievalSubscribeEvent, 1)
|
||||
subscribeCtx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) {
|
||||
// We'll check the deal IDs inside readSubscribeEvents.
|
||||
if state.PayloadCID.Equals(order.Root) {
|
||||
select {
|
||||
case <-subscribeCtx.Done():
|
||||
case subscribeEvents <- retrievalSubscribeEvent{event, state}:
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
dealID, err := a.Retrieval.Retrieve(
|
||||
ctx,
|
||||
order.Root,
|
||||
params,
|
||||
@ -573,11 +580,12 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
|
||||
store.StoreID())
|
||||
|
||||
if err != nil {
|
||||
unsubscribe()
|
||||
finish(xerrors.Errorf("Retrieve failed: %w", err))
|
||||
return
|
||||
}
|
||||
|
||||
err = readSubscribeEvents(ctx, subscribeEvents, events)
|
||||
err = readSubscribeEvents(ctx, dealID, subscribeEvents, events)
|
||||
|
||||
unsubscribe()
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user