Add piece cid to retrieval queries

This commit is contained in:
Ingar Shu 2020-07-09 09:29:57 -07:00
parent 18199ff323
commit 171ce39e8c
No known key found for this signature in database
GPG Key ID: BE3D9CE79F22E769
5 changed files with 55 additions and 18 deletions

View File

@ -203,7 +203,7 @@ type FullNode interface {
// ClientHasLocal indicates whether a certain CID is locally stored.
ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error)
// ClientFindData identifies peers that have a certain file, and returns QueryOffers (one per peer).
ClientFindData(ctx context.Context, root cid.Cid) ([]QueryOffer, error)
ClientFindData(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]QueryOffer, error)
// ClientMinerQueryOffer returns a QueryOffer for the specific miner and file.
ClientMinerQueryOffer(ctx context.Context, root cid.Cid, miner address.Address) (QueryOffer, error)
// ClientRetrieve initiates the retrieval of a file, as specified in the order.
@ -461,6 +461,7 @@ type QueryOffer struct {
Err string
Root cid.Cid
Piece *cid.Cid
Size uint64
MinPrice types.BigInt
@ -473,6 +474,7 @@ type QueryOffer struct {
func (o *QueryOffer) Order(client address.Address) RetrievalOrder {
return RetrievalOrder{
Root: o.Root,
Piece: o.Piece,
Size: o.Size,
Total: o.MinPrice,
PaymentInterval: o.PaymentInterval,
@ -497,6 +499,7 @@ type MarketDeal struct {
type RetrievalOrder struct {
// TODO: make this less unixfs specific
Root cid.Cid
Piece *cid.Cid
Size uint64
// TODO: support offset
Total types.BigInt

View File

@ -115,7 +115,7 @@ type FullNodeStruct struct {
ClientListImports func(ctx context.Context) ([]api.Import, error) `perm:"write"`
ClientRemoveImport func(ctx context.Context, importID int64) error `perm:"admin"`
ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"`
ClientFindData func(ctx context.Context, root cid.Cid) ([]api.QueryOffer, error) `perm:"read"`
ClientFindData func(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) `perm:"read"`
ClientMinerQueryOffer func(ctx context.Context, root cid.Cid, miner address.Address) (api.QueryOffer, error) `perm:"read"`
ClientStartDeal func(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) `perm:"admin"`
ClientGetDealInfo func(context.Context, cid.Cid) (*api.DealInfo, error) `perm:"read"`
@ -358,8 +358,8 @@ func (c *FullNodeStruct) ClientHasLocal(ctx context.Context, root cid.Cid) (bool
return c.Internal.ClientHasLocal(ctx, root)
}
func (c *FullNodeStruct) ClientFindData(ctx context.Context, root cid.Cid) ([]api.QueryOffer, error) {
return c.Internal.ClientFindData(ctx, root)
func (c *FullNodeStruct) ClientFindData(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) {
return c.Internal.ClientFindData(ctx, root, piece)
}
func (c *FullNodeStruct) ClientMinerQueryOffer(ctx context.Context, root cid.Cid, miner address.Address) (api.QueryOffer, error) {

View File

@ -132,8 +132,10 @@ func makeDeal(t *testing.T, ctx context.Context, rseed int, client *impl.FullNod
waitDealSealed(t, ctx, miner, client, deal)
// Retrieval
info, err := client.ClientGetDealInfo(ctx, *deal)
require.NoError(t, err)
testRetrieval(t, ctx, err, client, fcid, carExport, data)
testRetrieval(t, ctx, err, client, fcid, &info.PieceCID, carExport, data)
}
func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client *impl.FullNodeAPI, fcid cid.Cid, fastRet bool) *cid.Cid {
@ -199,8 +201,8 @@ func startSealingWaiting(t *testing.T, ctx context.Context, miner TestStorageNod
}
}
func testRetrieval(t *testing.T, ctx context.Context, err error, client *impl.FullNodeAPI, fcid cid.Cid, carExport bool, data []byte) {
offers, err := client.ClientFindData(ctx, fcid)
func testRetrieval(t *testing.T, ctx context.Context, err error, client *impl.FullNodeAPI, fcid cid.Cid, piece *cid.Cid, carExport bool, data []byte) {
offers, err := client.ClientFindData(ctx, fcid, piece)
if err != nil {
t.Fatal(err)
}

View File

@ -424,6 +424,12 @@ var clientFindCmd = &cli.Command{
Name: "find",
Usage: "find data in the network",
ArgsUsage: "[dataCid]",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "pieceCid",
Usage: "require data to be retrieved from a specific Piece CID",
},
},
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
fmt.Println("Usage: find [CID]")
@ -453,7 +459,16 @@ var clientFindCmd = &cli.Command{
fmt.Println("LOCAL")
}
offers, err := api.ClientFindData(ctx, file)
var pieceCid *cid.Cid
if cctx.String("pieceCid") != "" {
parsed, err := cid.Parse(cctx.String("pieceCid"))
if err != nil {
return err
}
pieceCid = &parsed
}
offers, err := api.ClientFindData(ctx, file, pieceCid)
if err != nil {
return err
}
@ -487,6 +502,10 @@ var clientRetrieveCmd = &cli.Command{
Name: "miner",
Usage: "miner address for retrieval, if not present it'll use local discovery",
},
&cli.StringFlag{
Name: "pieceCid",
Usage: "require data to be retrieved from a specific Piece CID",
},
},
Action: func(cctx *cli.Context) error {
if cctx.NArg() != 2 {
@ -528,10 +547,19 @@ var clientRetrieveCmd = &cli.Command{
return nil
}*/ // TODO: fix
var pieceCid *cid.Cid
if cctx.String("pieceCid") != "" {
parsed, err := cid.Parse(cctx.String("pieceCid"))
if err != nil {
return err
}
pieceCid = &parsed
}
var offer api.QueryOffer
minerStrAddr := cctx.String("miner")
if minerStrAddr == "" { // Local discovery
offers, err := fapi.ClientFindData(ctx, file)
offers, err := fapi.ClientFindData(ctx, file, pieceCid)
if err != nil {
return err
}

View File

@ -211,15 +211,18 @@ func (a *API) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) {
return true, nil
}
func (a *API) ClientFindData(ctx context.Context, root cid.Cid) ([]api.QueryOffer, error) {
func (a *API) ClientFindData(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) {
peers, err := a.RetDiscovery.GetPeers(root)
if err != nil {
return nil, err
}
out := make([]api.QueryOffer, len(peers))
for k, p := range peers {
out[k] = a.makeRetrievalQuery(ctx, p, root, rm.QueryParams{})
out := make([]api.QueryOffer, 0, len(peers))
for _, p := range peers {
if piece != nil && !piece.Equals(*p.PieceCID) {
continue
}
out = append(out, a.makeRetrievalQuery(ctx, p, root, piece, rm.QueryParams{}))
}
return out, nil
@ -234,10 +237,10 @@ func (a *API) ClientMinerQueryOffer(ctx context.Context, payload cid.Cid, miner
Address: miner,
ID: mi.PeerId,
}
return a.makeRetrievalQuery(ctx, rp, payload, rm.QueryParams{}), nil
return a.makeRetrievalQuery(ctx, rp, payload, nil, rm.QueryParams{}), nil
}
func (a *API) makeRetrievalQuery(ctx context.Context, rp rm.RetrievalPeer, payload cid.Cid, qp rm.QueryParams) api.QueryOffer {
func (a *API) makeRetrievalQuery(ctx context.Context, rp rm.RetrievalPeer, payload cid.Cid, piece *cid.Cid, qp rm.QueryParams) api.QueryOffer {
queryResponse, err := a.Retrieval.Query(ctx, rp, payload, qp)
if err != nil {
return api.QueryOffer{Err: err.Error(), Miner: rp.Address, MinerPeerID: rp.ID}
@ -254,6 +257,7 @@ func (a *API) makeRetrievalQuery(ctx context.Context, rp rm.RetrievalPeer, paylo
return api.QueryOffer{
Root: payload,
Piece: piece,
Size: queryResponse.Size,
MinPrice: queryResponse.PieceRetrievalPrice(),
PaymentInterval: queryResponse.MaxPaymentInterval,