Simplify retrieval APIs

This commit is contained in:
Łukasz Magiera 2021-11-10 15:45:46 +01:00
parent 60ea33b1c7
commit 89138bab4d
10 changed files with 364 additions and 286 deletions

View File

@ -28,7 +28,6 @@ import (
"github.com/filecoin-project/lotus/chain/actors/builtin/paych"
"github.com/filecoin-project/lotus/chain/actors/builtin/power"
"github.com/filecoin-project/lotus/chain/types"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo/imports"
)
@ -352,10 +351,9 @@ type FullNode interface {
// ClientMinerQueryOffer returns a QueryOffer for the specific miner and file.
ClientMinerQueryOffer(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (QueryOffer, error) //perm:read
// ClientRetrieve initiates the retrieval of a file, as specified in the order.
ClientRetrieve(ctx context.Context, order RetrievalOrder, ref *FileRef) error //perm:admin
// ClientRetrieveWithEvents initiates the retrieval of a file, as specified in the order, and provides a channel
// of status updates.
ClientRetrieveWithEvents(ctx context.Context, order RetrievalOrder, ref *FileRef) (<-chan marketevents.RetrievalEvent, error) //perm:admin
ClientRetrieve(ctx context.Context, params RetrievalOrder) (*RestrievalRes, error) //perm:admin
// ClientExport exports a file stored in the local filestore to a system file
ClientExport(ctx context.Context, exportRef ExportRef, fileRef FileRef) error //perm:admin
// ClientListRetrievals returns information about retrievals made by the local client
ClientListRetrievals(ctx context.Context) ([]RetrievalInfo, error) //perm:write
// ClientGetRetrievalUpdates returns status of updated retrieval deals
@ -940,8 +938,6 @@ type RetrievalOrder struct {
DatamodelPathSelector *textselector.Expression
Size uint64
FromLocalCAR string // if specified, get data from a local CARv2 file.
// TODO: support offset
Total types.BigInt
UnsealPrice types.BigInt
PaymentInterval uint64

View File

@ -28,7 +28,6 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
"github.com/filecoin-project/lotus/journal/alerting"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo/imports"
"github.com/filecoin-project/specs-storage/storage"
@ -162,6 +161,8 @@ type FullNodeStruct struct {
ClientDealSize func(p0 context.Context, p1 cid.Cid) (DataSize, error) `perm:"read"`
ClientExport func(p0 context.Context, p1 ExportRef, p2 FileRef) error `perm:"admin"`
ClientFindData func(p0 context.Context, p1 cid.Cid, p2 *cid.Cid) ([]QueryOffer, error) `perm:"read"`
ClientGenCar func(p0 context.Context, p1 FileRef, p2 string) error `perm:"write"`
@ -194,12 +195,10 @@ type FullNodeStruct struct {
ClientRestartDataTransfer func(p0 context.Context, p1 datatransfer.TransferID, p2 peer.ID, p3 bool) error `perm:"write"`
ClientRetrieve func(p0 context.Context, p1 RetrievalOrder, p2 *FileRef) error `perm:"admin"`
ClientRetrieve func(p0 context.Context, p1 RetrievalOrder) (*RestrievalRes, error) `perm:"admin"`
ClientRetrieveTryRestartInsufficientFunds func(p0 context.Context, p1 address.Address) error `perm:"write"`
ClientRetrieveWithEvents func(p0 context.Context, p1 RetrievalOrder, p2 *FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"`
ClientStartDeal func(p0 context.Context, p1 *StartDealParams) (*cid.Cid, error) `perm:"admin"`
ClientStatelessDeal func(p0 context.Context, p1 *StartDealParams) (*cid.Cid, error) `perm:"write"`
@ -1357,6 +1356,17 @@ func (s *FullNodeStub) ClientDealSize(p0 context.Context, p1 cid.Cid) (DataSize,
return *new(DataSize), ErrNotSupported
}
func (s *FullNodeStruct) ClientExport(p0 context.Context, p1 ExportRef, p2 FileRef) error {
if s.Internal.ClientExport == nil {
return ErrNotSupported
}
return s.Internal.ClientExport(p0, p1, p2)
}
func (s *FullNodeStub) ClientExport(p0 context.Context, p1 ExportRef, p2 FileRef) error {
return ErrNotSupported
}
func (s *FullNodeStruct) ClientFindData(p0 context.Context, p1 cid.Cid, p2 *cid.Cid) ([]QueryOffer, error) {
if s.Internal.ClientFindData == nil {
return *new([]QueryOffer), ErrNotSupported
@ -1533,15 +1543,15 @@ func (s *FullNodeStub) ClientRestartDataTransfer(p0 context.Context, p1 datatran
return ErrNotSupported
}
func (s *FullNodeStruct) ClientRetrieve(p0 context.Context, p1 RetrievalOrder, p2 *FileRef) error {
func (s *FullNodeStruct) ClientRetrieve(p0 context.Context, p1 RetrievalOrder) (*RestrievalRes, error) {
if s.Internal.ClientRetrieve == nil {
return ErrNotSupported
return nil, ErrNotSupported
}
return s.Internal.ClientRetrieve(p0, p1, p2)
return s.Internal.ClientRetrieve(p0, p1)
}
func (s *FullNodeStub) ClientRetrieve(p0 context.Context, p1 RetrievalOrder, p2 *FileRef) error {
return ErrNotSupported
func (s *FullNodeStub) ClientRetrieve(p0 context.Context, p1 RetrievalOrder) (*RestrievalRes, error) {
return nil, ErrNotSupported
}
func (s *FullNodeStruct) ClientRetrieveTryRestartInsufficientFunds(p0 context.Context, p1 address.Address) error {
@ -1555,17 +1565,6 @@ func (s *FullNodeStub) ClientRetrieveTryRestartInsufficientFunds(p0 context.Cont
return ErrNotSupported
}
func (s *FullNodeStruct) ClientRetrieveWithEvents(p0 context.Context, p1 RetrievalOrder, p2 *FileRef) (<-chan marketevents.RetrievalEvent, error) {
if s.Internal.ClientRetrieveWithEvents == nil {
return nil, ErrNotSupported
}
return s.Internal.ClientRetrieveWithEvents(p0, p1, p2)
}
func (s *FullNodeStub) ClientRetrieveWithEvents(p0 context.Context, p1 RetrievalOrder, p2 *FileRef) (<-chan marketevents.RetrievalEvent, error) {
return nil, ErrNotSupported
}
func (s *FullNodeStruct) ClientStartDeal(p0 context.Context, p1 *StartDealParams) (*cid.Cid, error) {
if s.Internal.ClientStartDeal == nil {
return nil, ErrNotSupported

View File

@ -7,6 +7,7 @@ import (
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/lotus/chain/types"
textselector "github.com/ipld/go-ipld-selector-text-lite"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-state-types/abi"
@ -194,4 +195,19 @@ type RetrievalInfo struct {
TransferChannelID *datatransfer.ChannelID
DataTransfer *DataTransferChannel
// optional event if part of ClientGetRetrievalUpdates
Event *retrievalmarket.ClientEvent
}
type RestrievalRes struct {
DealID retrievalmarket.DealID
}
type ExportRef struct {
Root cid.Cid
DatamodelPathSelector *textselector.Expression
FromLocalCAR string // if specified, get data from a local CARv2 file.
DealID retrievalmarket.DealID
}

View File

@ -12,6 +12,7 @@ import (
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/dline"
"github.com/ipfs/go-cid"
textselector "github.com/ipld/go-ipld-selector-text-lite"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/filecoin-project/lotus/api"
@ -325,10 +326,10 @@ type FullNode interface {
// ClientMinerQueryOffer returns a QueryOffer for the specific miner and file.
ClientMinerQueryOffer(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) //perm:read
// ClientRetrieve initiates the retrieval of a file, as specified in the order.
ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error //perm:admin
ClientRetrieve(ctx context.Context, order RetrievalOrder, ref *api.FileRef) error //perm:admin
// ClientRetrieveWithEvents initiates the retrieval of a file, as specified in the order, and provides a channel
// of status updates.
ClientRetrieveWithEvents(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) //perm:admin
ClientRetrieveWithEvents(ctx context.Context, order RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) //perm:admin
// ClientQueryAsk returns a signed StorageAsk from the specified miner.
// ClientListRetrievals returns information about retrievals made by the local client
ClientListRetrievals(ctx context.Context) ([]api.RetrievalInfo, error) //perm:write
@ -714,3 +715,21 @@ type FullNode interface {
// the path specified when calling CreateBackup is within the base path
CreateBackup(ctx context.Context, fpath string) error //perm:admin
}
type RetrievalOrder struct {
// TODO: make this less unixfs specific
Root cid.Cid
Piece *cid.Cid
DatamodelPathSelector *textselector.Expression
Size uint64
FromLocalCAR string // if specified, get data from a local CARv2 file.
// TODO: support offset
Total types.BigInt
UnsealPrice types.BigInt
PaymentInterval uint64
PaymentIntervalIncrease uint64
Client address.Address
Miner address.Address
MinerPeer *retrievalmarket.RetrievalPeer
}

View File

@ -125,11 +125,11 @@ type FullNodeStruct struct {
ClientRestartDataTransfer func(p0 context.Context, p1 datatransfer.TransferID, p2 peer.ID, p3 bool) error `perm:"write"`
ClientRetrieve func(p0 context.Context, p1 api.RetrievalOrder, p2 *api.FileRef) error `perm:"admin"`
ClientRetrieve func(p0 context.Context, p1 RetrievalOrder, p2 *api.FileRef) error `perm:"admin"`
ClientRetrieveTryRestartInsufficientFunds func(p0 context.Context, p1 address.Address) error `perm:"write"`
ClientRetrieveWithEvents func(p0 context.Context, p1 api.RetrievalOrder, p2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"`
ClientRetrieveWithEvents func(p0 context.Context, p1 RetrievalOrder, p2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"`
ClientStartDeal func(p0 context.Context, p1 *api.StartDealParams) (*cid.Cid, error) `perm:"admin"`
@ -965,14 +965,14 @@ func (s *FullNodeStub) ClientRestartDataTransfer(p0 context.Context, p1 datatran
return ErrNotSupported
}
func (s *FullNodeStruct) ClientRetrieve(p0 context.Context, p1 api.RetrievalOrder, p2 *api.FileRef) error {
func (s *FullNodeStruct) ClientRetrieve(p0 context.Context, p1 RetrievalOrder, p2 *api.FileRef) error {
if s.Internal.ClientRetrieve == nil {
return ErrNotSupported
}
return s.Internal.ClientRetrieve(p0, p1, p2)
}
func (s *FullNodeStub) ClientRetrieve(p0 context.Context, p1 api.RetrievalOrder, p2 *api.FileRef) error {
func (s *FullNodeStub) ClientRetrieve(p0 context.Context, p1 RetrievalOrder, p2 *api.FileRef) error {
return ErrNotSupported
}
@ -987,14 +987,14 @@ func (s *FullNodeStub) ClientRetrieveTryRestartInsufficientFunds(p0 context.Cont
return ErrNotSupported
}
func (s *FullNodeStruct) ClientRetrieveWithEvents(p0 context.Context, p1 api.RetrievalOrder, p2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) {
func (s *FullNodeStruct) ClientRetrieveWithEvents(p0 context.Context, p1 RetrievalOrder, p2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) {
if s.Internal.ClientRetrieveWithEvents == nil {
return nil, ErrNotSupported
}
return s.Internal.ClientRetrieveWithEvents(p0, p1, p2)
}
func (s *FullNodeStub) ClientRetrieveWithEvents(p0 context.Context, p1 api.RetrievalOrder, p2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) {
func (s *FullNodeStub) ClientRetrieveWithEvents(p0 context.Context, p1 RetrievalOrder, p2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) {
return nil, ErrNotSupported
}

View File

@ -3,7 +3,10 @@ package v0api
import (
"context"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/crypto"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/types"
@ -194,4 +197,135 @@ func (w *WrapperV1Full) ChainGetRandomnessFromBeacon(ctx context.Context, tsk ty
return w.StateGetRandomnessFromBeacon(ctx, personalization, randEpoch, entropy, tsk)
}
func (w *WrapperV1Full) ClientRetrieve(ctx context.Context, order RetrievalOrder, ref *api.FileRef) error {
events := make(chan marketevents.RetrievalEvent)
go w.clientRetrieve(ctx, order, ref, events)
for {
select {
case evt, ok := <-events:
if !ok { // done successfully
return nil
}
if evt.Err != "" {
return xerrors.Errorf("retrieval failed: %s", evt.Err)
}
case <-ctx.Done():
return xerrors.Errorf("retrieval timed out")
}
}
}
func (w *WrapperV1Full) ClientRetrieveWithEvents(ctx context.Context, order RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) {
events := make(chan marketevents.RetrievalEvent)
go w.clientRetrieve(ctx, order, ref, events)
return events, nil
}
func readSubscribeEvents(ctx context.Context, dealID retrievalmarket.DealID, subscribeEvents <-chan api.RetrievalInfo, events chan marketevents.RetrievalEvent) error {
for {
var subscribeEvent api.RetrievalInfo
var evt retrievalmarket.ClientEvent
select {
case <-ctx.Done():
return xerrors.New("Retrieval Timed Out")
case subscribeEvent = <-subscribeEvents:
if subscribeEvent.ID != dealID {
// we can't check the deal ID ahead of time because:
// 1. We need to subscribe before retrieving.
// 2. We won't know the deal ID until after retrieving.
continue
}
if subscribeEvent.Event != nil {
evt = *subscribeEvent.Event
}
}
select {
case <-ctx.Done():
return xerrors.New("Retrieval Timed Out")
case events <- marketevents.RetrievalEvent{
Event: evt,
Status: subscribeEvent.Status,
BytesReceived: subscribeEvent.BytesReceived,
FundsSpent: subscribeEvent.TotalPaid,
}:
}
switch subscribeEvent.Status {
case retrievalmarket.DealStatusCompleted:
return nil
case retrievalmarket.DealStatusRejected:
return xerrors.Errorf("Retrieval Proposal Rejected: %s", subscribeEvent.Message)
case
retrievalmarket.DealStatusDealNotFound,
retrievalmarket.DealStatusErrored:
return xerrors.Errorf("Retrieval Error: %s", subscribeEvent.Message)
}
}
}
func (w *WrapperV1Full) clientRetrieve(ctx context.Context, order RetrievalOrder, ref *api.FileRef, events chan marketevents.RetrievalEvent) {
defer close(events)
finish := func(e error) {
if e != nil {
events <- marketevents.RetrievalEvent{Err: e.Error(), FundsSpent: big.Zero()}
}
}
var dealID retrievalmarket.DealID
if order.FromLocalCAR == "" {
// Subscribe to events before retrieving to avoid losing events.
subscribeCtx, cancel := context.WithCancel(ctx)
defer cancel()
retrievalEvents, err := w.ClientGetRetrievalUpdates(subscribeCtx)
if err != nil {
finish(xerrors.Errorf("GetRetrievalUpdates failed: %w", err))
return
}
retrievalRes, err := w.FullNode.ClientRetrieve(ctx, api.RetrievalOrder{
Root: order.Root,
Piece: order.Piece,
Size: order.Size,
Total: order.Total,
UnsealPrice: order.UnsealPrice,
PaymentInterval: order.PaymentInterval,
PaymentIntervalIncrease: order.PaymentIntervalIncrease,
Client: order.Client,
Miner: order.Miner,
MinerPeer: order.MinerPeer,
})
if err != nil {
finish(xerrors.Errorf("Retrieve failed: %w", err))
return
}
dealID = retrievalRes.DealID
err = readSubscribeEvents(ctx, retrievalRes.DealID, retrievalEvents, events)
if err != nil {
finish(xerrors.Errorf("Retrieve: %w", err))
return
}
}
// If ref is nil, it only fetches the data into the configured blockstore.
if ref == nil {
finish(nil)
return
}
finish(w.ClientExport(ctx, api.ExportRef{
Root: order.Root,
DatamodelPathSelector: order.DatamodelPathSelector,
FromLocalCAR: order.FromLocalCAR,
DealID: dealID,
}, *ref))
}
var _ FullNode = &WrapperV1Full{}

View File

@ -1069,7 +1069,7 @@ var clientRetrieveCmd = &cli.Command{
return ShowHelp(cctx, fmt.Errorf("incorrect number of arguments"))
}
fapi, closer, err := GetFullNodeAPI(cctx)
fapi, closer, err := GetFullNodeAPIV1(cctx)
if err != nil {
return err
}
@ -1101,7 +1101,7 @@ var clientRetrieveCmd = &cli.Command{
pieceCid = &parsed
}
var order *lapi.RetrievalOrder
var eref *lapi.ExportRef
if cctx.Bool("allow-local") {
imports, err := fapi.ClientListImports(ctx)
if err != nil {
@ -1110,19 +1110,17 @@ var clientRetrieveCmd = &cli.Command{
for _, i := range imports {
if i.Root != nil && i.Root.Equals(file) {
order = &lapi.RetrievalOrder{
Root: file,
FromLocalCAR: i.CARPath,
Total: big.Zero(),
UnsealPrice: big.Zero(),
eref = &lapi.ExportRef{
Root: file,
FromLocalCAR: i.CARPath,
}
break
}
}
}
if order == nil {
// no local found, so make a retrieval
if eref == nil {
var offer api.QueryOffer
minerStrAddr := cctx.String("miner")
if minerStrAddr == "" { // Local discovery
@ -1163,7 +1161,7 @@ var clientRetrieveCmd = &cli.Command{
}
}
if offer.Err != "" {
return fmt.Errorf("The received offer errored: %s", offer.Err)
return fmt.Errorf("offer error: %s", offer.Err)
}
maxPrice := types.MustParseFIL(DefaultMaxRetrievePrice)
@ -1180,55 +1178,67 @@ var clientRetrieveCmd = &cli.Command{
}
o := offer.Order(payer)
order = &o
}
ref := &lapi.FileRef{
Path: cctx.Args().Get(1),
IsCAR: cctx.Bool("car"),
subscribeEvents, err := fapi.ClientGetRetrievalUpdates(ctx)
if err != nil {
return xerrors.Errorf("error setting up retrieval updates: %w", err)
}
retrievalRes, err := fapi.ClientRetrieve(ctx, o)
if err != nil {
return xerrors.Errorf("error setting up retrieval: %w", err)
}
readEvents:
for {
var evt api.RetrievalInfo
select {
case <-ctx.Done():
return xerrors.New("Retrieval Timed Out")
case evt = <-subscribeEvents:
if evt.ID != retrievalRes.DealID {
// we can't check the deal ID ahead of time because:
// 1. We need to subscribe before retrieving.
// 2. We won't know the deal ID until after retrieving.
continue
}
}
afmt.Printf("> Recv: %s, Paid %s, %s (%s)\n",
types.SizeStr(types.NewInt(evt.BytesReceived)),
types.FIL(evt.TotalPaid),
retrievalmarket.ClientEvents[*evt.Event],
retrievalmarket.DealStatuses[evt.Status],
)
switch evt.Status {
case retrievalmarket.DealStatusCompleted:
break readEvents
case retrievalmarket.DealStatusRejected:
return xerrors.Errorf("Retrieval Proposal Rejected: %s", evt.Message)
case
retrievalmarket.DealStatusDealNotFound,
retrievalmarket.DealStatusErrored:
return xerrors.Errorf("Retrieval Error: %s", evt.Message)
}
}
eref = &lapi.ExportRef{
Root: file,
DealID: retrievalRes.DealID,
}
}
if sel := textselector.Expression(cctx.String("datamodel-path-selector")); sel != "" {
order.DatamodelPathSelector = &sel
eref.DatamodelPathSelector = &sel
}
updates, err := fapi.ClientRetrieveWithEvents(ctx, *order, ref)
err = fapi.ClientExport(ctx, *eref, lapi.FileRef{
Path: cctx.Args().Get(1),
IsCAR: cctx.Bool("car"),
})
if err != nil {
return xerrors.Errorf("error setting up retrieval: %w", err)
}
var prevStatus retrievalmarket.DealStatus
for {
select {
case evt, ok := <-updates:
if ok {
afmt.Printf("> Recv: %s, Paid %s, %s (%s)\n",
types.SizeStr(types.NewInt(evt.BytesReceived)),
types.FIL(evt.FundsSpent),
retrievalmarket.ClientEvents[evt.Event],
retrievalmarket.DealStatuses[evt.Status],
)
prevStatus = evt.Status
}
if evt.Err != "" {
return xerrors.Errorf("retrieval failed: %s", evt.Err)
}
if !ok {
if prevStatus == retrievalmarket.DealStatusCompleted {
afmt.Println("Success")
} else {
afmt.Printf("saw final deal state %s instead of expected success state DealStatusCompleted\n",
retrievalmarket.DealStatuses[prevStatus])
}
return nil
}
case <-ctx.Done():
return xerrors.Errorf("retrieval timed out")
}
return err
}
afmt.Println("Success")
return nil
},
}

View File

@ -10,7 +10,6 @@ import (
"path/filepath"
"strings"
"text/template"
"unicode"
"golang.org/x/xerrors"
)
@ -71,9 +70,6 @@ func typeName(e ast.Expr, pkg string) (string, error) {
return t.X.(*ast.Ident).Name + "." + t.Sel.Name, nil
case *ast.Ident:
pstr := t.Name
if !unicode.IsLower(rune(pstr[0])) && pkg != "api" {
pstr = "api." + pstr // todo src pkg name
}
return pstr, nil
case *ast.ArrayType:
subt, err := typeName(t.Elt, pkg)

View File

@ -10,6 +10,7 @@ import (
"testing"
"time"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
@ -320,17 +321,44 @@ func (dh *DealHarness) PerformRetrieval(ctx context.Context, deal *cid.Cid, root
caddr, err := dh.client.WalletDefaultAddress(ctx)
require.NoError(dh.t, err)
ref := &api.FileRef{
Path: carFile.Name(),
IsCAR: carExport,
}
updatesCtx, cancel := context.WithCancel(ctx)
updates, err := dh.client.ClientGetRetrievalUpdates(updatesCtx)
updates, err := dh.client.ClientRetrieveWithEvents(ctx, offers[0].Order(caddr), ref)
retrievalRes, err := dh.client.ClientRetrieve(ctx, offers[0].Order(caddr))
require.NoError(dh.t, err)
for update := range updates {
require.Emptyf(dh.t, update.Err, "retrieval failed: %s", update.Err)
consumeEvents:
for {
var evt api.RetrievalInfo
select {
case <-updatesCtx.Done():
dh.t.Fatal("Retrieval Timed Out")
case evt = <-updates:
if evt.ID != retrievalRes.DealID {
continue
}
}
switch evt.Status {
case retrievalmarket.DealStatusCompleted:
break consumeEvents
case retrievalmarket.DealStatusRejected:
dh.t.Fatalf("Retrieval Proposal Rejected: %s", evt.Message)
case
retrievalmarket.DealStatusDealNotFound,
retrievalmarket.DealStatusErrored:
dh.t.Fatalf("Retrieval Error: %s", evt.Message)
}
}
cancel()
require.NoError(dh.t, dh.client.ClientExport(ctx,
api.ExportRef{
Root: root,
DealID: retrievalRes.DealID,
},
api.FileRef{
Path: carFile.Name(),
IsCAR: carExport,
}))
ret := carFile.Name()
if carExport {

View File

@ -60,7 +60,6 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/v3/actors/builtin/market"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/repo/imports"
@ -762,79 +761,6 @@ func (a *API) ClientCancelRetrievalDeal(ctx context.Context, dealID rm.DealID) e
}
}
func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error {
events := make(chan marketevents.RetrievalEvent)
go a.clientRetrieve(ctx, order, ref, events)
for {
select {
case evt, ok := <-events:
if !ok { // done successfully
return nil
}
if evt.Err != "" {
return xerrors.Errorf("retrieval failed: %s", evt.Err)
}
case <-ctx.Done():
return xerrors.Errorf("retrieval timed out")
}
}
}
func (a *API) ClientRetrieveWithEvents(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) {
events := make(chan marketevents.RetrievalEvent)
go a.clientRetrieve(ctx, order, ref, events)
return events, nil
}
type retrievalSubscribeEvent struct {
event rm.ClientEvent
state rm.ClientDealState
}
func consumeAllEvents(ctx context.Context, dealID rm.DealID, subscribeEvents chan retrievalSubscribeEvent, events chan marketevents.RetrievalEvent) error {
for {
var subscribeEvent retrievalSubscribeEvent
select {
case <-ctx.Done():
return xerrors.New("Retrieval Timed Out")
case subscribeEvent = <-subscribeEvents:
if subscribeEvent.state.ID != dealID {
// we can't check the deal ID ahead of time because:
// 1. We need to subscribe before retrieving.
// 2. We won't know the deal ID until after retrieving.
continue
}
}
select {
case <-ctx.Done():
return xerrors.New("Retrieval Timed Out")
case events <- marketevents.RetrievalEvent{
Event: subscribeEvent.event,
Status: subscribeEvent.state.Status,
BytesReceived: subscribeEvent.state.TotalReceived,
FundsSpent: subscribeEvent.state.FundsSpent,
}:
}
state := subscribeEvent.state
switch state.Status {
case rm.DealStatusCompleted:
return nil
case rm.DealStatusRejected:
return xerrors.Errorf("Retrieval Proposal Rejected: %s", state.Message)
case rm.DealStatusCancelled:
return xerrors.Errorf("Retrieval was cancelled externally: %s", state.Message)
case
rm.DealStatusDealNotFound,
rm.DealStatusErrored:
return xerrors.Errorf("Retrieval Error: %s", state.Message)
}
}
}
func getRetrievalSelector(dps *textselector.Expression) (datamodel.Node, error) {
sel := selectorparse.CommonSelector_ExploreAllRecursively
if dps != nil {
@ -870,7 +796,23 @@ func getRetrievalSelector(dps *textselector.Expression) (datamodel.Node, error)
return sel, nil
}
func (a *API) doRetrieval(ctx context.Context, order api.RetrievalOrder, sel datamodel.Node, events chan marketevents.RetrievalEvent) (rm.DealID, error) {
func (a *API) ClientRetrieve(ctx context.Context, params api.RetrievalOrder) (*api.RestrievalRes, error) {
sel, err := getRetrievalSelector(params.DatamodelPathSelector)
if err != nil {
return nil, err
}
di, err := a.doRetrieval(ctx, params, sel)
if err != nil {
return nil, err
}
return &api.RestrievalRes{
DealID: di,
}, nil
}
func (a *API) doRetrieval(ctx context.Context, order api.RetrievalOrder, sel datamodel.Node) (rm.DealID, error) {
if order.MinerPeer == nil || order.MinerPeer.ID == "" {
mi, err := a.StateMinerInfo(ctx, order.Miner, types.EmptyTSK)
if err != nil {
@ -898,20 +840,6 @@ func (a *API) doRetrieval(ctx context.Context, order api.RetrievalOrder, sel dat
return 0, xerrors.Errorf("Error in retrieval params: %s", err)
}
// Subscribe to events before retrieving to avoid losing events.
subscribeEvents := make(chan retrievalSubscribeEvent, 1)
subscribeCtx, cancel := context.WithCancel(ctx)
defer cancel()
unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) {
// We'll check the deal IDs inside consumeAllEvents.
if state.PayloadCID.Equals(order.Root) {
select {
case <-subscribeCtx.Done():
case subscribeEvents <- retrievalSubscribeEvent{event, state}:
}
}
})
id := a.Retrieval.NextID()
id, err = a.Retrieval.Retrieve(
ctx,
@ -925,21 +853,58 @@ func (a *API) doRetrieval(ctx context.Context, order api.RetrievalOrder, sel dat
)
if err != nil {
unsubscribe()
return 0, xerrors.Errorf("Retrieve failed: %w", err)
}
err = consumeAllEvents(ctx, id, subscribeEvents, events)
unsubscribe()
if err != nil {
return 0, xerrors.Errorf("Retrieve: %w", err)
}
return id, nil
}
func (a *API) outputCAR(ctx context.Context, order api.RetrievalOrder, sel datamodel.Node, bs bstore.Blockstore, ref *api.FileRef) error {
func (a *API) ClientExport(ctx context.Context, exportRef api.ExportRef, ref api.FileRef) error {
proxyBss, retrieveIntoIPFS := a.RtvlBlockstoreAccessor.(*retrievaladapter.ProxyBlockstoreAccessor)
carBss, retrieveIntoCAR := a.RtvlBlockstoreAccessor.(*retrievaladapter.CARBlockstoreAccessor)
carPath := exportRef.FromLocalCAR
sel, err := getRetrievalSelector(exportRef.DatamodelPathSelector)
if err != nil {
return err
}
if carPath == "" {
if !retrieveIntoIPFS && !retrieveIntoCAR {
return xerrors.Errorf("unsupported retrieval blockstore accessor")
}
if retrieveIntoCAR {
carPath = carBss.PathFor(exportRef.DealID)
}
}
var retrievalBs bstore.Blockstore
if retrieveIntoIPFS {
retrievalBs = proxyBss.Blockstore
} else {
cbs, err := stores.ReadOnlyFilestore(carPath)
if err != nil {
return err
}
defer cbs.Close() //nolint:errcheck
retrievalBs = cbs
}
// Are we outputting a CAR?
if ref.IsCAR {
// not IPFS and we do full selection - just extract the CARv1 from the CARv2 we stored the retrieval in
if !retrieveIntoIPFS && exportRef.DatamodelPathSelector == nil {
return carv2.ExtractV1File(carPath, ref.Path)
}
return a.outputCAR(ctx, exportRef.Root, sel, retrievalBs, ref)
}
return a.outputUnixFS(ctx, exportRef.Root, exportRef.DatamodelPathSelector != nil, sel, retrievalBs, ref)
}
func (a *API) outputCAR(ctx context.Context, root cid.Cid, sel datamodel.Node, bs bstore.Blockstore, ref api.FileRef) error {
// generating a CARv1 from the configured blockstore
f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
@ -950,7 +915,7 @@ func (a *API) outputCAR(ctx context.Context, order api.RetrievalOrder, sel datam
ctx,
bs,
[]car.Dag{{
Root: order.Root,
Root: root,
Selector: sel,
}},
car.MaxTraversalLinks(config.MaxTraversalLinks),
@ -962,12 +927,11 @@ func (a *API) outputCAR(ctx context.Context, order api.RetrievalOrder, sel datam
return f.Close()
}
func (a *API) outputUnixFS(ctx context.Context, order api.RetrievalOrder, sel datamodel.Node, bs bstore.Blockstore, ref *api.FileRef) error {
func (a *API) outputUnixFS(ctx context.Context, root cid.Cid, findSubroot bool, sel datamodel.Node, bs bstore.Blockstore, ref api.FileRef) error {
ds := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
root := order.Root
// if we used a selector - need to find the sub-root the user actually wanted to retrieve
if order.DatamodelPathSelector != nil {
if findSubroot {
var subRootFound bool
if err := utils.TraverseDag(
@ -1001,7 +965,7 @@ func (a *API) outputUnixFS(ctx context.Context, order api.RetrievalOrder, sel da
}
if !subRootFound {
return xerrors.Errorf("path selection '%s' does not match a node within %s", *order.DatamodelPathSelector, root)
return xerrors.Errorf("path selection does not match a node within %s", root)
}
}
@ -1017,90 +981,6 @@ func (a *API) outputUnixFS(ctx context.Context, order api.RetrievalOrder, sel da
return files.WriteTo(file, ref.Path)
}
func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef, events chan marketevents.RetrievalEvent) {
defer close(events)
finish := func(e error) {
if e != nil {
events <- marketevents.RetrievalEvent{Err: e.Error(), FundsSpent: big.Zero()}
}
}
sel, err := getRetrievalSelector(order.DatamodelPathSelector)
if err != nil {
finish(err)
return
}
// summary:
// 1. if we're retrieving from an import, FromLocalCAR will be set.
// Skip the retrieval itself, and use the provided car as a blockstore further down
// to extract a CAR or UnixFS export from.
// 2. if we're using an IPFS blockstore for retrieval, retrieve into it,
// then use the virtual blockstore to extract a CAR or UnixFS export from it.
// 3. if we have to retrieve, perform a CARv2 retrieval, then either
// extract the CARv1 (with ExtractV1File) or use it as a blockstore further down.
// this indicates we're proxying to IPFS.
proxyBss, retrieveIntoIPFS := a.RtvlBlockstoreAccessor.(*retrievaladapter.ProxyBlockstoreAccessor)
carBss, retrieveIntoCAR := a.RtvlBlockstoreAccessor.(*retrievaladapter.CARBlockstoreAccessor)
carPath := order.FromLocalCAR
// we actually need to retrieve from the network
if carPath == "" {
if !retrieveIntoIPFS && !retrieveIntoCAR {
// we don't recognize the blockstore accessor.
finish(xerrors.Errorf("unsupported retrieval blockstore accessor"))
return
}
id, err := a.doRetrieval(ctx, order, sel, events)
if err != nil {
finish(err)
return
}
if retrieveIntoCAR {
carPath = carBss.PathFor(id)
}
}
if ref == nil {
// If ref is nil, it only fetches the data into the configured blockstore
// (if fetching from network).
finish(nil)
return
}
// determine where did the retrieval go
var retrievalBs bstore.Blockstore
if retrieveIntoIPFS {
retrievalBs = proxyBss.Blockstore
} else {
cbs, err := stores.ReadOnlyFilestore(carPath)
if err != nil {
finish(err)
return
}
defer cbs.Close() //nolint:errcheck
retrievalBs = cbs
}
// Are we outputting a CAR?
if ref.IsCAR {
// not IPFS and we do full selection - just extract the CARv1 from the CARv2 we stored the retrieval in
if !retrieveIntoIPFS && order.DatamodelPathSelector == nil {
finish(carv2.ExtractV1File(carPath, ref.Path))
return
}
finish(a.outputCAR(ctx, order, sel, retrievalBs, ref))
return
}
finish(a.outputUnixFS(ctx, order, sel, retrievalBs, ref))
}
func (a *API) ClientListRetrievals(ctx context.Context) ([]api.RetrievalInfo, error) {
deals, err := a.Retrieval.ListDeals()
if err != nil {