fix(retrieval): resolve retrieval close event panic

Refactor ClientRetrieve command to remove the possibility of a send on close channel race condition
This commit is contained in:
hannahhoward 2020-08-26 10:48:23 -07:00
parent 348dadf7ae
commit 10c1399474

View File

@ -432,6 +432,45 @@ func (a *API) ClientRetrieveWithEvents(ctx context.Context, order api.RetrievalO
return events, nil
}
type retrievalSubscribeEvent struct {
event rm.ClientEvent
state rm.ClientDealState
}
func readSubscribeEvents(ctx context.Context, subscribeEvents chan retrievalSubscribeEvent, events chan marketevents.RetrievalEvent) error {
for {
var subscribeEvent retrievalSubscribeEvent
select {
case <-ctx.Done():
return xerrors.New("Retrieval Timed Out")
case subscribeEvent = <-subscribeEvents:
}
select {
case <-ctx.Done():
return xerrors.New("Retrieval Timed Out")
case events <- marketevents.RetrievalEvent{
Event: subscribeEvent.event,
Status: subscribeEvent.state.Status,
BytesReceived: subscribeEvent.state.TotalReceived,
FundsSpent: subscribeEvent.state.FundsSpent,
}:
}
state := subscribeEvent.state
switch state.Status {
case rm.DealStatusCompleted:
return nil
case rm.DealStatusRejected:
return xerrors.Errorf("Retrieval Proposal Rejected: %s", state.Message)
case
rm.DealStatusDealNotFound,
rm.DealStatusErrored:
return xerrors.Errorf("Retrieval Error: %s", state.Message)
}
}
}
func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef, events chan marketevents.RetrievalEvent) {
defer close(events)
@ -467,33 +506,13 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
return err
}*/
retrievalResult := make(chan error, 1)
var dealId retrievalmarket.DealID
var dealID retrievalmarket.DealID
subscribeEvents := make(chan retrievalSubscribeEvent, 1)
unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) {
if state.PayloadCID.Equals(order.Root) && state.ID == dealId {
if state.PayloadCID.Equals(order.Root) && state.ID == dealID {
select {
case <-ctx.Done():
return
case events <- marketevents.RetrievalEvent{
Event: event,
Status: state.Status,
BytesReceived: state.TotalReceived,
FundsSpent: state.FundsSpent,
}:
}
switch state.Status {
case rm.DealStatusCompleted:
retrievalResult <- nil
case rm.DealStatusRejected:
retrievalResult <- xerrors.Errorf("Retrieval Proposal Rejected: %s", state.Message)
case
rm.DealStatusDealNotFound,
rm.DealStatusErrored:
retrievalResult <- xerrors.Errorf("Retrieval Error: %s", state.Message)
case subscribeEvents <- retrievalSubscribeEvent{event, state}:
}
}
})
@ -516,7 +535,7 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
_ = a.RetrievalStoreMgr.ReleaseStore(store)
}()
dealId, err = a.Retrieval.Retrieve(
dealID, err = a.Retrieval.Retrieve(
ctx,
order.Root,
params,
@ -531,17 +550,12 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
return
}
select {
case <-ctx.Done():
unsubscribe()
finish(xerrors.New("Retrieval Timed Out"))
err = readSubscribeEvents(ctx, subscribeEvents, events)
unsubscribe()
if err != nil {
finish(xerrors.Errorf("Retrieve: %w", err))
return
case err := <-retrievalResult:
unsubscribe()
if err != nil {
finish(xerrors.Errorf("Retrieve: %w", err))
return
}
}
// If ref is nil, it only fetches the data into the configured blockstore.