From c02c69a8d6e59396ecdd8f5f09c71d1e43dfa14d Mon Sep 17 00:00:00 2001 From: Ingar Shu Date: Thu, 20 Aug 2020 09:16:18 -0700 Subject: [PATCH] Check deal id when emitting events Make sure to unsubscribe from retrieval events --- node/impl/client/client.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 3b992fe54..cbdbf9cb7 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -463,8 +463,10 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref retrievalResult := make(chan error, 1) + var dealId retrievalmarket.DealID + unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) { - if state.PayloadCID.Equals(order.Root) { + if state.PayloadCID.Equals(order.Root) && state.ID == dealId { events <- marketevents.RetrievalEvent{ Event: event, @@ -504,7 +506,7 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref _ = a.RetrievalStoreMgr.ReleaseStore(store) }() - _, err = a.Retrieval.Retrieve( + dealId, err = a.Retrieval.Retrieve( ctx, order.Root, params, @@ -521,17 +523,17 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref select { case <-ctx.Done(): + unsubscribe() finish(xerrors.New("Retrieval Timed Out")) return case err := <-retrievalResult: + unsubscribe() if err != nil { finish(xerrors.Errorf("Retrieve: %w", err)) return } } - unsubscribe() - // If ref is nil, it only fetches the data into the configured blockstore. if ref == nil { finish(nil)