diff --git a/api/api_full.go b/api/api_full.go index 2a1d830ed..6feeb12cf 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -244,7 +244,10 @@ type FullNode interface { // ClientMinerQueryOffer returns a QueryOffer for the specific miner and file. ClientMinerQueryOffer(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (QueryOffer, error) // ClientRetrieve initiates the retrieval of a file, as specified in the order. - ClientRetrieve(ctx context.Context, order RetrievalOrder, ref *FileRef) (<-chan marketevents.RetrievalEvent, error) + ClientRetrieve(ctx context.Context, order RetrievalOrder, ref *FileRef) error + // ClientRetrieveWithEvents initiates the retrieval of a file, as specified in the order, and provides a channel + // of status updates. + ClientRetrieveWithEvents(ctx context.Context, order RetrievalOrder, ref *FileRef) (<-chan marketevents.RetrievalEvent, error) // ClientQueryAsk returns a signed StorageAsk from the specified miner. ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) // ClientCalcCommP calculates the CommP for a specified file diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index eabf4fa26..8fb8e68ed 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -127,20 +127,21 @@ type FullNodeStruct struct { WalletImport func(context.Context, *types.KeyInfo) (address.Address, error) `perm:"admin"` WalletDelete func(context.Context, address.Address) error `perm:"write"` - ClientImport func(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) `perm:"admin"` - ClientListImports func(ctx context.Context) ([]api.Import, error) `perm:"write"` - ClientRemoveImport func(ctx context.Context, importID multistore.StoreID) error `perm:"admin"` - ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"` - ClientFindData func(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) `perm:"read"` - ClientMinerQueryOffer func(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) `perm:"read"` - ClientStartDeal func(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) `perm:"admin"` - ClientGetDealInfo func(context.Context, cid.Cid) (*api.DealInfo, error) `perm:"read"` - ClientListDeals func(ctx context.Context) ([]api.DealInfo, error) `perm:"write"` - ClientRetrieve func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"` - ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) `perm:"read"` - ClientCalcCommP func(ctx context.Context, inpath string) (*api.CommPRet, error) `perm:"read"` - ClientGenCar func(ctx context.Context, ref api.FileRef, outpath string) error `perm:"write"` - ClientDealSize func(ctx context.Context, root cid.Cid) (api.DataSize, error) `perm:"read"` + ClientImport func(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) `perm:"admin"` + ClientListImports func(ctx context.Context) ([]api.Import, error) `perm:"write"` + ClientRemoveImport func(ctx context.Context, importID multistore.StoreID) error `perm:"admin"` + ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"` + ClientFindData func(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) `perm:"read"` + ClientMinerQueryOffer func(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) `perm:"read"` + ClientStartDeal func(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) `perm:"admin"` + ClientGetDealInfo func(context.Context, cid.Cid) (*api.DealInfo, error) `perm:"read"` + ClientListDeals func(ctx context.Context) ([]api.DealInfo, error) `perm:"write"` + ClientRetrieve func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error `perm:"admin"` + ClientRetrieveWithEvents func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"` + ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) `perm:"read"` + ClientCalcCommP func(ctx context.Context, inpath string) (*api.CommPRet, error) `perm:"read"` + ClientGenCar func(ctx context.Context, ref api.FileRef, outpath string) error `perm:"write"` + ClientDealSize func(ctx context.Context, root cid.Cid) (api.DataSize, error) `perm:"read"` StateNetworkName func(context.Context) (dtypes.NetworkName, error) `perm:"read"` StateMinerSectors func(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error) `perm:"read"` @@ -425,10 +426,14 @@ func (c *FullNodeStruct) ClientListDeals(ctx context.Context) ([]api.DealInfo, e return c.Internal.ClientListDeals(ctx) } -func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) { +func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error { return c.Internal.ClientRetrieve(ctx, order, ref) } +func (c *FullNodeStruct) ClientRetrieveWithEvents(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) { + return c.Internal.ClientRetrieveWithEvents(ctx, order, ref) +} + func (c *FullNodeStruct) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) { return c.Internal.ClientQueryAsk(ctx, p, miner) } diff --git a/api/test/deals.go b/api/test/deals.go index f30b5b8ac..054f26b8a 100644 --- a/api/test/deals.go +++ b/api/test/deals.go @@ -398,7 +398,7 @@ func testRetrieval(t *testing.T, ctx context.Context, err error, client *impl.Fu Path: filepath.Join(rpath, "ret"), IsCAR: carExport, } - updates, err := client.ClientRetrieve(ctx, offers[0].Order(caddr), ref) + updates, err := client.ClientRetrieveWithEvents(ctx, offers[0].Order(caddr), ref) for update := range updates { if update.Err != "" { t.Fatalf("%v", err) diff --git a/cli/client.go b/cli/client.go index c0d5972b0..777073292 100644 --- a/cli/client.go +++ b/cli/client.go @@ -847,7 +847,7 @@ var clientRetrieveCmd = &cli.Command{ Path: cctx.Args().Get(1), IsCAR: cctx.Bool("car"), } - updates, err := fapi.ClientRetrieve(ctx, offer.Order(payer), ref) + updates, err := fapi.ClientRetrieveWithEvents(ctx, offer.Order(payer), ref) if err != nil { return xerrors.Errorf("error setting up retrieval: %w", err) } @@ -868,7 +868,7 @@ var clientRetrieveCmd = &cli.Command{ } if evt.Err != "" { - return xerrors.Errorf("retrieval failed: %v", err) + return xerrors.Errorf("retrieval failed: %s", evt.Err) } case <-ctx.Done(): return xerrors.Errorf("retrieval timed out") diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 70cd770f4..3b992fe54 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -400,7 +400,27 @@ func (a *API) ClientListImports(ctx context.Context) ([]api.Import, error) { return out, nil } -func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) { +func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error { + events := make(chan marketevents.RetrievalEvent) + go a.clientRetrieve(ctx, order, ref, events) + + for { + select { + case evt, ok := <-events: + if !ok { // done successfully + return nil + } + + if evt.Err != "" { + return xerrors.Errorf("retrieval failed: %s", evt.Err) + } + case <-ctx.Done(): + return xerrors.Errorf("retrieval timed out") + } + } +} + +func (a *API) ClientRetrieveWithEvents(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) { events := make(chan marketevents.RetrievalEvent) go a.clientRetrieve(ctx, order, ref, events) return events, nil