Merge pull request #3142 from filecoin-project/asr/client-retrieve

Create eventless version of ClientRetrieve
This commit is contained in:
Łukasz Magiera 2020-08-18 18:15:16 +02:00 committed by GitHub
commit 255bd6881a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 48 additions and 20 deletions

View File

@ -244,7 +244,10 @@ type FullNode interface {
// ClientMinerQueryOffer returns a QueryOffer for the specific miner and file. // ClientMinerQueryOffer returns a QueryOffer for the specific miner and file.
ClientMinerQueryOffer(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (QueryOffer, error) ClientMinerQueryOffer(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (QueryOffer, error)
// ClientRetrieve initiates the retrieval of a file, as specified in the order. // ClientRetrieve initiates the retrieval of a file, as specified in the order.
ClientRetrieve(ctx context.Context, order RetrievalOrder, ref *FileRef) (<-chan marketevents.RetrievalEvent, error) ClientRetrieve(ctx context.Context, order RetrievalOrder, ref *FileRef) error
// ClientRetrieveWithEvents initiates the retrieval of a file, as specified in the order, and provides a channel
// of status updates.
ClientRetrieveWithEvents(ctx context.Context, order RetrievalOrder, ref *FileRef) (<-chan marketevents.RetrievalEvent, error)
// ClientQueryAsk returns a signed StorageAsk from the specified miner. // ClientQueryAsk returns a signed StorageAsk from the specified miner.
ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error)
// ClientCalcCommP calculates the CommP for a specified file // ClientCalcCommP calculates the CommP for a specified file

View File

@ -127,20 +127,21 @@ type FullNodeStruct struct {
WalletImport func(context.Context, *types.KeyInfo) (address.Address, error) `perm:"admin"` WalletImport func(context.Context, *types.KeyInfo) (address.Address, error) `perm:"admin"`
WalletDelete func(context.Context, address.Address) error `perm:"write"` WalletDelete func(context.Context, address.Address) error `perm:"write"`
ClientImport func(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) `perm:"admin"` ClientImport func(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) `perm:"admin"`
ClientListImports func(ctx context.Context) ([]api.Import, error) `perm:"write"` ClientListImports func(ctx context.Context) ([]api.Import, error) `perm:"write"`
ClientRemoveImport func(ctx context.Context, importID multistore.StoreID) error `perm:"admin"` ClientRemoveImport func(ctx context.Context, importID multistore.StoreID) error `perm:"admin"`
ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"` ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"`
ClientFindData func(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) `perm:"read"` ClientFindData func(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) `perm:"read"`
ClientMinerQueryOffer func(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) `perm:"read"` ClientMinerQueryOffer func(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) `perm:"read"`
ClientStartDeal func(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) `perm:"admin"` ClientStartDeal func(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) `perm:"admin"`
ClientGetDealInfo func(context.Context, cid.Cid) (*api.DealInfo, error) `perm:"read"` ClientGetDealInfo func(context.Context, cid.Cid) (*api.DealInfo, error) `perm:"read"`
ClientListDeals func(ctx context.Context) ([]api.DealInfo, error) `perm:"write"` ClientListDeals func(ctx context.Context) ([]api.DealInfo, error) `perm:"write"`
ClientRetrieve func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"` ClientRetrieve func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error `perm:"admin"`
ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) `perm:"read"` ClientRetrieveWithEvents func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"`
ClientCalcCommP func(ctx context.Context, inpath string) (*api.CommPRet, error) `perm:"read"` ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) `perm:"read"`
ClientGenCar func(ctx context.Context, ref api.FileRef, outpath string) error `perm:"write"` ClientCalcCommP func(ctx context.Context, inpath string) (*api.CommPRet, error) `perm:"read"`
ClientDealSize func(ctx context.Context, root cid.Cid) (api.DataSize, error) `perm:"read"` ClientGenCar func(ctx context.Context, ref api.FileRef, outpath string) error `perm:"write"`
ClientDealSize func(ctx context.Context, root cid.Cid) (api.DataSize, error) `perm:"read"`
StateNetworkName func(context.Context) (dtypes.NetworkName, error) `perm:"read"` StateNetworkName func(context.Context) (dtypes.NetworkName, error) `perm:"read"`
StateMinerSectors func(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error) `perm:"read"` StateMinerSectors func(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error) `perm:"read"`
@ -425,10 +426,14 @@ func (c *FullNodeStruct) ClientListDeals(ctx context.Context) ([]api.DealInfo, e
return c.Internal.ClientListDeals(ctx) return c.Internal.ClientListDeals(ctx)
} }
func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) { func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error {
return c.Internal.ClientRetrieve(ctx, order, ref) return c.Internal.ClientRetrieve(ctx, order, ref)
} }
func (c *FullNodeStruct) ClientRetrieveWithEvents(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) {
return c.Internal.ClientRetrieveWithEvents(ctx, order, ref)
}
func (c *FullNodeStruct) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) { func (c *FullNodeStruct) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) {
return c.Internal.ClientQueryAsk(ctx, p, miner) return c.Internal.ClientQueryAsk(ctx, p, miner)
} }

View File

@ -398,7 +398,7 @@ func testRetrieval(t *testing.T, ctx context.Context, err error, client *impl.Fu
Path: filepath.Join(rpath, "ret"), Path: filepath.Join(rpath, "ret"),
IsCAR: carExport, IsCAR: carExport,
} }
updates, err := client.ClientRetrieve(ctx, offers[0].Order(caddr), ref) updates, err := client.ClientRetrieveWithEvents(ctx, offers[0].Order(caddr), ref)
for update := range updates { for update := range updates {
if update.Err != "" { if update.Err != "" {
t.Fatalf("%v", err) t.Fatalf("%v", err)

View File

@ -847,7 +847,7 @@ var clientRetrieveCmd = &cli.Command{
Path: cctx.Args().Get(1), Path: cctx.Args().Get(1),
IsCAR: cctx.Bool("car"), IsCAR: cctx.Bool("car"),
} }
updates, err := fapi.ClientRetrieve(ctx, offer.Order(payer), ref) updates, err := fapi.ClientRetrieveWithEvents(ctx, offer.Order(payer), ref)
if err != nil { if err != nil {
return xerrors.Errorf("error setting up retrieval: %w", err) return xerrors.Errorf("error setting up retrieval: %w", err)
} }
@ -868,7 +868,7 @@ var clientRetrieveCmd = &cli.Command{
} }
if evt.Err != "" { if evt.Err != "" {
return xerrors.Errorf("retrieval failed: %v", err) return xerrors.Errorf("retrieval failed: %s", evt.Err)
} }
case <-ctx.Done(): case <-ctx.Done():
return xerrors.Errorf("retrieval timed out") return xerrors.Errorf("retrieval timed out")

View File

@ -400,7 +400,27 @@ func (a *API) ClientListImports(ctx context.Context) ([]api.Import, error) {
return out, nil return out, nil
} }
func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) { func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error {
events := make(chan marketevents.RetrievalEvent)
go a.clientRetrieve(ctx, order, ref, events)
for {
select {
case evt, ok := <-events:
if !ok { // done successfully
return nil
}
if evt.Err != "" {
return xerrors.Errorf("retrieval failed: %s", evt.Err)
}
case <-ctx.Done():
return xerrors.Errorf("retrieval timed out")
}
}
}
func (a *API) ClientRetrieveWithEvents(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) {
events := make(chan marketevents.RetrievalEvent) events := make(chan marketevents.RetrievalEvent)
go a.clientRetrieve(ctx, order, ref, events) go a.clientRetrieve(ctx, order, ref, events)
return events, nil return events, nil