Check deal id when emitting events

Make sure to unsubscribe from retrieval events
This commit is contained in:
Ingar Shu 2020-08-20 09:16:18 -07:00
parent 2570712a29
commit c02c69a8d6
No known key found for this signature in database
GPG Key ID: BE3D9CE79F22E769

View File

@ -463,8 +463,10 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
retrievalResult := make(chan error, 1) retrievalResult := make(chan error, 1)
var dealId retrievalmarket.DealID
unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) { unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) {
if state.PayloadCID.Equals(order.Root) { if state.PayloadCID.Equals(order.Root) && state.ID == dealId {
events <- marketevents.RetrievalEvent{ events <- marketevents.RetrievalEvent{
Event: event, Event: event,
@ -504,7 +506,7 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
_ = a.RetrievalStoreMgr.ReleaseStore(store) _ = a.RetrievalStoreMgr.ReleaseStore(store)
}() }()
_, err = a.Retrieval.Retrieve( dealId, err = a.Retrieval.Retrieve(
ctx, ctx,
order.Root, order.Root,
params, params,
@ -521,17 +523,17 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
select { select {
case <-ctx.Done(): case <-ctx.Done():
unsubscribe()
finish(xerrors.New("Retrieval Timed Out")) finish(xerrors.New("Retrieval Timed Out"))
return return
case err := <-retrievalResult: case err := <-retrievalResult:
unsubscribe()
if err != nil { if err != nil {
finish(xerrors.Errorf("Retrieve: %w", err)) finish(xerrors.Errorf("Retrieve: %w", err))
return return
} }
} }
unsubscribe()
// If ref is nil, it only fetches the data into the configured blockstore. // If ref is nil, it only fetches the data into the configured blockstore.
if ref == nil { if ref == nil {
finish(nil) finish(nil)