Merge pull request #3198 from filecoin-project/fix/retrieval-streaming-cleanup
Retrieval CLI fixes
This commit is contained in:
commit
9bc48a8867
@ -469,8 +469,10 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
|
||||
|
||||
retrievalResult := make(chan error, 1)
|
||||
|
||||
var dealId retrievalmarket.DealID
|
||||
|
||||
unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) {
|
||||
if state.PayloadCID.Equals(order.Root) {
|
||||
if state.PayloadCID.Equals(order.Root) && state.ID == dealId {
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@ -514,7 +516,7 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
|
||||
_ = a.RetrievalStoreMgr.ReleaseStore(store)
|
||||
}()
|
||||
|
||||
_, err = a.Retrieval.Retrieve(
|
||||
dealId, err = a.Retrieval.Retrieve(
|
||||
ctx,
|
||||
order.Root,
|
||||
params,
|
||||
@ -531,17 +533,17 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
unsubscribe()
|
||||
finish(xerrors.New("Retrieval Timed Out"))
|
||||
return
|
||||
case err := <-retrievalResult:
|
||||
unsubscribe()
|
||||
if err != nil {
|
||||
finish(xerrors.Errorf("Retrieve: %w", err))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
unsubscribe()
|
||||
|
||||
// If ref is nil, it only fetches the data into the configured blockstore.
|
||||
if ref == nil {
|
||||
finish(nil)
|
||||
|
Loading…
Reference in New Issue
Block a user