diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 29eefd71f..9cb00da06 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -432,6 +432,45 @@ func (a *API) ClientRetrieveWithEvents(ctx context.Context, order api.RetrievalO return events, nil } +type retrievalSubscribeEvent struct { + event rm.ClientEvent + state rm.ClientDealState +} + +func readSubscribeEvents(ctx context.Context, subscribeEvents chan retrievalSubscribeEvent, events chan marketevents.RetrievalEvent) error { + for { + var subscribeEvent retrievalSubscribeEvent + select { + case <-ctx.Done(): + return xerrors.New("Retrieval Timed Out") + case subscribeEvent = <-subscribeEvents: + } + + select { + case <-ctx.Done(): + return xerrors.New("Retrieval Timed Out") + case events <- marketevents.RetrievalEvent{ + Event: subscribeEvent.event, + Status: subscribeEvent.state.Status, + BytesReceived: subscribeEvent.state.TotalReceived, + FundsSpent: subscribeEvent.state.FundsSpent, + }: + } + + state := subscribeEvent.state + switch state.Status { + case rm.DealStatusCompleted: + return nil + case rm.DealStatusRejected: + return xerrors.Errorf("Retrieval Proposal Rejected: %s", state.Message) + case + rm.DealStatusDealNotFound, + rm.DealStatusErrored: + return xerrors.Errorf("Retrieval Error: %s", state.Message) + } + } +} + func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef, events chan marketevents.RetrievalEvent) { defer close(events) @@ -467,33 +506,15 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref return err }*/ - retrievalResult := make(chan error, 1) - - var dealId retrievalmarket.DealID - + var dealID retrievalmarket.DealID + subscribeEvents := make(chan retrievalSubscribeEvent, 1) + subscribeCtx, cancel := context.WithCancel(ctx) + defer cancel() unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) { - if state.PayloadCID.Equals(order.Root) && state.ID == dealId { - + if state.PayloadCID.Equals(order.Root) && state.ID == dealID { select { - case <-ctx.Done(): - return - case events <- marketevents.RetrievalEvent{ - Event: event, - Status: state.Status, - BytesReceived: state.TotalReceived, - FundsSpent: state.FundsSpent, - }: - } - - switch state.Status { - case rm.DealStatusCompleted: - retrievalResult <- nil - case rm.DealStatusRejected: - retrievalResult <- xerrors.Errorf("Retrieval Proposal Rejected: %s", state.Message) - case - rm.DealStatusDealNotFound, - rm.DealStatusErrored: - retrievalResult <- xerrors.Errorf("Retrieval Error: %s", state.Message) + case <-subscribeCtx.Done(): + case subscribeEvents <- retrievalSubscribeEvent{event, state}: } } }) @@ -516,7 +537,7 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref _ = a.RetrievalStoreMgr.ReleaseStore(store) }() - dealId, err = a.Retrieval.Retrieve( + dealID, err = a.Retrieval.Retrieve( ctx, order.Root, params, @@ -531,17 +552,12 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref return } - select { - case <-ctx.Done(): - unsubscribe() - finish(xerrors.New("Retrieval Timed Out")) + err = readSubscribeEvents(ctx, subscribeEvents, events) + + unsubscribe() + if err != nil { + finish(xerrors.Errorf("Retrieve: %w", err)) return - case err := <-retrievalResult: - unsubscribe() - if err != nil { - finish(xerrors.Errorf("Retrieve: %w", err)) - return - } } // If ref is nil, it only fetches the data into the configured blockstore.