fix a race when retrieving pieces
We'd read the deal ID without synchronizing. This could (and probably did given the history of flaky deal tests) cause us to miss events. This patch also makes sure to always unsubscribe from events, even on error.
This commit is contained in:
parent
09bff14d85
commit
283fd054e8
@ -462,13 +462,19 @@ type retrievalSubscribeEvent struct {
|
||||
state rm.ClientDealState
|
||||
}
|
||||
|
||||
func readSubscribeEvents(ctx context.Context, subscribeEvents chan retrievalSubscribeEvent, events chan marketevents.RetrievalEvent) error {
|
||||
func readSubscribeEvents(ctx context.Context, dealID retrievalmarket.DealID, subscribeEvents chan retrievalSubscribeEvent, events chan marketevents.RetrievalEvent) error {
|
||||
for {
|
||||
var subscribeEvent retrievalSubscribeEvent
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return xerrors.New("Retrieval Timed Out")
|
||||
case subscribeEvent = <-subscribeEvents:
|
||||
if subscribeEvent.state.ID != dealID {
|
||||
// we can't check the deal ID ahead of time because:
|
||||
// 1. We need to subscribe before retrieving.
|
||||
// 2. We won't know the deal ID until after retrieving.
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
@ -531,19 +537,6 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
|
||||
return err
|
||||
}*/
|
||||
|
||||
var dealID retrievalmarket.DealID
|
||||
subscribeEvents := make(chan retrievalSubscribeEvent, 1)
|
||||
subscribeCtx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) {
|
||||
if state.PayloadCID.Equals(order.Root) && state.ID == dealID {
|
||||
select {
|
||||
case <-subscribeCtx.Done():
|
||||
case subscribeEvents <- retrievalSubscribeEvent{event, state}:
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
ppb := types.BigDiv(order.Total, types.NewInt(order.Size))
|
||||
|
||||
params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, shared.AllSelector(), order.Piece, order.UnsealPrice)
|
||||
@ -562,7 +555,21 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
|
||||
_ = a.RetrievalStoreMgr.ReleaseStore(store)
|
||||
}()
|
||||
|
||||
dealID, err = a.Retrieval.Retrieve(
|
||||
// Subscribe to events before retrieving to avoid losing events.
|
||||
subscribeEvents := make(chan retrievalSubscribeEvent, 1)
|
||||
subscribeCtx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) {
|
||||
// We'll check the deal IDs inside readSubscribeEvents.
|
||||
if state.PayloadCID.Equals(order.Root) {
|
||||
select {
|
||||
case <-subscribeCtx.Done():
|
||||
case subscribeEvents <- retrievalSubscribeEvent{event, state}:
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
dealID, err := a.Retrieval.Retrieve(
|
||||
ctx,
|
||||
order.Root,
|
||||
params,
|
||||
@ -573,11 +580,12 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
|
||||
store.StoreID())
|
||||
|
||||
if err != nil {
|
||||
unsubscribe()
|
||||
finish(xerrors.Errorf("Retrieve failed: %w", err))
|
||||
return
|
||||
}
|
||||
|
||||
err = readSubscribeEvents(ctx, subscribeEvents, events)
|
||||
err = readSubscribeEvents(ctx, dealID, subscribeEvents, events)
|
||||
|
||||
unsubscribe()
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user