Merge pull request #5917 from filecoin-project/feat/local-retrieval

Local retrieval support
This commit is contained in:
Łukasz Magiera 2021-04-02 13:07:00 +02:00 committed by GitHub
commit eede19fb0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 183 additions and 133 deletions

View File

@ -884,7 +884,7 @@ func (o *QueryOffer) Order(client address.Address) RetrievalOrder {
Client: client, Client: client,
Miner: o.Miner, Miner: o.Miner,
MinerPeer: o.MinerPeer, MinerPeer: &o.MinerPeer,
} }
} }
@ -903,6 +903,8 @@ type RetrievalOrder struct {
Root cid.Cid Root cid.Cid
Piece *cid.Cid Piece *cid.Cid
Size uint64 Size uint64
LocalStore *multistore.StoreID // if specified, get data from local store
// TODO: support offset // TODO: support offset
Total types.BigInt Total types.BigInt
UnsealPrice types.BigInt UnsealPrice types.BigInt
@ -910,7 +912,7 @@ type RetrievalOrder struct {
PaymentIntervalIncrease uint64 PaymentIntervalIncrease uint64
Client address.Address Client address.Address
Miner address.Address Miner address.Address
MinerPeer retrievalmarket.RetrievalPeer MinerPeer *retrievalmarket.RetrievalPeer
} }
type InvocResult struct { type InvocResult struct {

Binary file not shown.

View File

@ -1001,7 +1001,7 @@ var clientFindCmd = &cli.Command{
}, },
} }
const DefaultMaxRetrievePrice = 1 const DefaultMaxRetrievePrice = "0.01"
var clientRetrieveCmd = &cli.Command{ var clientRetrieveCmd = &cli.Command{
Name: "retrieve", Name: "retrieve",
@ -1022,12 +1022,15 @@ var clientRetrieveCmd = &cli.Command{
}, },
&cli.StringFlag{ &cli.StringFlag{
Name: "maxPrice", Name: "maxPrice",
Usage: fmt.Sprintf("maximum price the client is willing to consider (default: %d FIL)", DefaultMaxRetrievePrice), Usage: fmt.Sprintf("maximum price the client is willing to consider (default: %s FIL)", DefaultMaxRetrievePrice),
}, },
&cli.StringFlag{ &cli.StringFlag{
Name: "pieceCid", Name: "pieceCid",
Usage: "require data to be retrieved from a specific Piece CID", Usage: "require data to be retrieved from a specific Piece CID",
}, },
&cli.BoolFlag{
Name: "allow-local",
},
}, },
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
if cctx.NArg() != 2 { if cctx.NArg() != 2 {
@ -1057,18 +1060,6 @@ var clientRetrieveCmd = &cli.Command{
return err return err
} }
// Check if we already have this data locally
/*has, err := api.ClientHasLocal(ctx, file)
if err != nil {
return err
}
if has {
fmt.Println("Success: Already in local storage")
return nil
}*/ // TODO: fix
var pieceCid *cid.Cid var pieceCid *cid.Cid
if cctx.String("pieceCid") != "" { if cctx.String("pieceCid") != "" {
parsed, err := cid.Parse(cctx.String("pieceCid")) parsed, err := cid.Parse(cctx.String("pieceCid"))
@ -1078,69 +1069,93 @@ var clientRetrieveCmd = &cli.Command{
pieceCid = &parsed pieceCid = &parsed
} }
var offer api.QueryOffer var order *lapi.RetrievalOrder
minerStrAddr := cctx.String("miner") if cctx.Bool("allow-local") {
if minerStrAddr == "" { // Local discovery imports, err := fapi.ClientListImports(ctx)
offers, err := fapi.ClientFindData(ctx, file, pieceCid) if err != nil {
return err
}
var cleaned []api.QueryOffer for _, i := range imports {
// filter out offers that errored if i.Root != nil && i.Root.Equals(file) {
for _, o := range offers { order = &lapi.RetrievalOrder{
if o.Err == "" { Root: file,
cleaned = append(cleaned, o) LocalStore: &i.Key,
Total: big.Zero(),
UnsealPrice: big.Zero(),
}
break
}
}
}
if order == nil {
var offer api.QueryOffer
minerStrAddr := cctx.String("miner")
if minerStrAddr == "" { // Local discovery
offers, err := fapi.ClientFindData(ctx, file, pieceCid)
var cleaned []api.QueryOffer
// filter out offers that errored
for _, o := range offers {
if o.Err == "" {
cleaned = append(cleaned, o)
}
}
offers = cleaned
// sort by price low to high
sort.Slice(offers, func(i, j int) bool {
return offers[i].MinPrice.LessThan(offers[j].MinPrice)
})
if err != nil {
return err
}
// TODO: parse offer strings from `client find`, make this smarter
if len(offers) < 1 {
fmt.Println("Failed to find file")
return nil
}
offer = offers[0]
} else { // Directed retrieval
minerAddr, err := address.NewFromString(minerStrAddr)
if err != nil {
return err
}
offer, err = fapi.ClientMinerQueryOffer(ctx, minerAddr, file, pieceCid)
if err != nil {
return err
}
}
if offer.Err != "" {
return fmt.Errorf("The received offer errored: %s", offer.Err)
}
maxPrice := types.MustParseFIL(DefaultMaxRetrievePrice)
if cctx.String("maxPrice") != "" {
maxPrice, err = types.ParseFIL(cctx.String("maxPrice"))
if err != nil {
return xerrors.Errorf("parsing maxPrice: %w", err)
} }
} }
offers = cleaned if offer.MinPrice.GreaterThan(big.Int(maxPrice)) {
return xerrors.Errorf("failed to find offer satisfying maxPrice: %s", maxPrice)
// sort by price low to high
sort.Slice(offers, func(i, j int) bool {
return offers[i].MinPrice.LessThan(offers[j].MinPrice)
})
if err != nil {
return err
} }
// TODO: parse offer strings from `client find`, make this smarter o := offer.Order(payer)
if len(offers) < 1 { order = &o
fmt.Println("Failed to find file")
return nil
}
offer = offers[0]
} else { // Directed retrieval
minerAddr, err := address.NewFromString(minerStrAddr)
if err != nil {
return err
}
offer, err = fapi.ClientMinerQueryOffer(ctx, minerAddr, file, pieceCid)
if err != nil {
return err
}
} }
if offer.Err != "" {
return fmt.Errorf("The received offer errored: %s", offer.Err)
}
maxPrice := types.FromFil(DefaultMaxRetrievePrice)
if cctx.String("maxPrice") != "" {
maxPriceFil, err := types.ParseFIL(cctx.String("maxPrice"))
if err != nil {
return xerrors.Errorf("parsing maxPrice: %w", err)
}
maxPrice = types.BigInt(maxPriceFil)
}
if offer.MinPrice.GreaterThan(maxPrice) {
return xerrors.Errorf("failed to find offer satisfying maxPrice: %s", maxPrice)
}
ref := &lapi.FileRef{ ref := &lapi.FileRef{
Path: cctx.Args().Get(1), Path: cctx.Args().Get(1),
IsCAR: cctx.Bool("car"), IsCAR: cctx.Bool("car"),
} }
updates, err := fapi.ClientRetrieveWithEvents(ctx, offer.Order(payer), ref)
updates, err := fapi.ClientRetrieveWithEvents(ctx, *order, ref)
if err != nil { if err != nil {
return xerrors.Errorf("error setting up retrieval: %w", err) return xerrors.Errorf("error setting up retrieval: %w", err)
} }

View File

@ -1383,6 +1383,7 @@ Inputs:
}, },
"Piece": null, "Piece": null,
"Size": 42, "Size": 42,
"LocalStore": 12,
"Total": "0", "Total": "0",
"UnsealPrice": "0", "UnsealPrice": "0",
"PaymentInterval": 42, "PaymentInterval": 42,
@ -1436,6 +1437,7 @@ Inputs:
}, },
"Piece": null, "Piece": null,
"Size": 42, "Size": 42,
"LocalStore": 12,
"Total": "0", "Total": "0",
"UnsealPrice": "0", "UnsealPrice": "0",
"PaymentInterval": 42, "PaymentInterval": 42,

View File

@ -57,6 +57,7 @@ import (
"github.com/filecoin-project/lotus/node/impl/paych" "github.com/filecoin-project/lotus/node/impl/paych"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo/importmgr" "github.com/filecoin-project/lotus/node/repo/importmgr"
"github.com/filecoin-project/lotus/node/repo/retrievalstoremgr"
) )
var DefaultHashFunction = uint64(mh.BLAKE2B_MIN + 31) var DefaultHashFunction = uint64(mh.BLAKE2B_MIN + 31)
@ -77,6 +78,7 @@ type API struct {
Chain *store.ChainStore Chain *store.ChainStore
Imports dtypes.ClientImportMgr Imports dtypes.ClientImportMgr
Mds dtypes.ClientMultiDstore
CombinedBstore dtypes.ClientBlockstore // TODO: try to remove CombinedBstore dtypes.ClientBlockstore // TODO: try to remove
RetrievalStoreMgr dtypes.ClientRetrievalStoreManager RetrievalStoreMgr dtypes.ClientRetrievalStoreManager
@ -582,86 +584,102 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
} }
} }
if order.MinerPeer.ID == "" { var store retrievalstoremgr.RetrievalStore
mi, err := a.StateMinerInfo(ctx, order.Miner, types.EmptyTSK)
if err != nil { if order.LocalStore == nil {
finish(err) if order.MinerPeer == nil || order.MinerPeer.ID == "" {
mi, err := a.StateMinerInfo(ctx, order.Miner, types.EmptyTSK)
if err != nil {
finish(err)
return
}
order.MinerPeer = &retrievalmarket.RetrievalPeer{
ID: *mi.PeerId,
Address: order.Miner,
}
}
if order.Size == 0 {
finish(xerrors.Errorf("cannot make retrieval deal for zero bytes"))
return return
} }
order.MinerPeer = retrievalmarket.RetrievalPeer{ /*id, st, err := a.imgr().NewStore()
ID: *mi.PeerId, if err != nil {
Address: order.Miner, return err
} }
} if err := a.imgr().AddLabel(id, "source", "retrieval"); err != nil {
return err
}*/
if order.Size == 0 { ppb := types.BigDiv(order.Total, types.NewInt(order.Size))
finish(xerrors.Errorf("cannot make retrieval deal for zero bytes"))
return
}
/*id, st, err := a.imgr().NewStore() params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, shared.AllSelector(), order.Piece, order.UnsealPrice)
if err != nil { if err != nil {
return err finish(xerrors.Errorf("Error in retrieval params: %s", err))
} return
if err := a.imgr().AddLabel(id, "source", "retrieval"); err != nil { }
return err
}*/
ppb := types.BigDiv(order.Total, types.NewInt(order.Size)) store, err = a.RetrievalStoreMgr.NewStore()
if err != nil {
finish(xerrors.Errorf("Error setting up new store: %w", err))
return
}
params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, shared.AllSelector(), order.Piece, order.UnsealPrice) defer func() {
if err != nil { _ = a.RetrievalStoreMgr.ReleaseStore(store)
finish(xerrors.Errorf("Error in retrieval params: %s", err)) }()
return
}
store, err := a.RetrievalStoreMgr.NewStore() // Subscribe to events before retrieving to avoid losing events.
if err != nil { subscribeEvents := make(chan retrievalSubscribeEvent, 1)
finish(xerrors.Errorf("Error setting up new store: %w", err)) subscribeCtx, cancel := context.WithCancel(ctx)
return defer cancel()
} unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) {
// We'll check the deal IDs inside readSubscribeEvents.
defer func() { if state.PayloadCID.Equals(order.Root) {
_ = a.RetrievalStoreMgr.ReleaseStore(store) select {
}() case <-subscribeCtx.Done():
case subscribeEvents <- retrievalSubscribeEvent{event, state}:
// Subscribe to events before retrieving to avoid losing events. }
subscribeEvents := make(chan retrievalSubscribeEvent, 1)
subscribeCtx, cancel := context.WithCancel(ctx)
defer cancel()
unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) {
// We'll check the deal IDs inside readSubscribeEvents.
if state.PayloadCID.Equals(order.Root) {
select {
case <-subscribeCtx.Done():
case subscribeEvents <- retrievalSubscribeEvent{event, state}:
} }
})
dealID, err := a.Retrieval.Retrieve(
ctx,
order.Root,
params,
order.Total,
*order.MinerPeer,
order.Client,
order.Miner,
store.StoreID())
if err != nil {
unsubscribe()
finish(xerrors.Errorf("Retrieve failed: %w", err))
return
} }
})
dealID, err := a.Retrieval.Retrieve( err = readSubscribeEvents(ctx, dealID, subscribeEvents, events)
ctx,
order.Root,
params,
order.Total,
order.MinerPeer,
order.Client,
order.Miner,
store.StoreID())
if err != nil {
unsubscribe() unsubscribe()
finish(xerrors.Errorf("Retrieve failed: %w", err)) if err != nil {
return finish(xerrors.Errorf("Retrieve: %w", err))
} return
}
} else {
// local retrieval
st, err := ((*multistore.MultiStore)(a.Mds)).Get(*order.LocalStore)
if err != nil {
finish(xerrors.Errorf("Retrieve: %w", err))
return
}
err = readSubscribeEvents(ctx, dealID, subscribeEvents, events) store = &multiStoreRetrievalStore{
storeID: *order.LocalStore,
unsubscribe() store: st,
if err != nil { }
finish(xerrors.Errorf("Retrieve: %w", err))
return
} }
// If ref is nil, it only fetches the data into the configured blockstore. // If ref is nil, it only fetches the data into the configured blockstore.
@ -701,6 +719,19 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
return return
} }
type multiStoreRetrievalStore struct {
storeID multistore.StoreID
store *multistore.Store
}
func (mrs *multiStoreRetrievalStore) StoreID() *multistore.StoreID {
return &mrs.storeID
}
func (mrs *multiStoreRetrievalStore) DAGService() ipld.DAGService {
return mrs.store.DAG
}
func (a *API) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.StorageAsk, error) { func (a *API) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.StorageAsk, error) {
mi, err := a.StateMinerInfo(ctx, miner, types.EmptyTSK) mi, err := a.StateMinerInfo(ctx, miner, types.EmptyTSK)
if err != nil { if err != nil {