diff --git a/api/api_full.go b/api/api_full.go index 6235e98d6..3bbfda178 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -28,7 +28,6 @@ import ( "github.com/filecoin-project/lotus/chain/actors/builtin/paych" "github.com/filecoin-project/lotus/chain/actors/builtin/power" "github.com/filecoin-project/lotus/chain/types" - marketevents "github.com/filecoin-project/lotus/markets/loggers" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo/imports" ) @@ -352,10 +351,9 @@ type FullNode interface { // ClientMinerQueryOffer returns a QueryOffer for the specific miner and file. ClientMinerQueryOffer(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (QueryOffer, error) //perm:read // ClientRetrieve initiates the retrieval of a file, as specified in the order. - ClientRetrieve(ctx context.Context, order RetrievalOrder, ref *FileRef) error //perm:admin - // ClientRetrieveWithEvents initiates the retrieval of a file, as specified in the order, and provides a channel - // of status updates. - ClientRetrieveWithEvents(ctx context.Context, order RetrievalOrder, ref *FileRef) (<-chan marketevents.RetrievalEvent, error) //perm:admin + ClientRetrieve(ctx context.Context, params RetrievalOrder) (*RestrievalRes, error) //perm:admin + // ClientExport exports a file stored in the local filestore to a system file + ClientExport(ctx context.Context, exportRef ExportRef, fileRef FileRef) error //perm:admin // ClientListRetrievals returns information about retrievals made by the local client ClientListRetrievals(ctx context.Context) ([]RetrievalInfo, error) //perm:write // ClientGetRetrievalUpdates returns status of updated retrieval deals @@ -940,8 +938,6 @@ type RetrievalOrder struct { DatamodelPathSelector *textselector.Expression Size uint64 - FromLocalCAR string // if specified, get data from a local CARv2 file. - // TODO: support offset Total types.BigInt UnsealPrice types.BigInt PaymentInterval uint64 diff --git a/api/proxy_gen.go b/api/proxy_gen.go index 78ae607bf..d78304397 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -28,7 +28,6 @@ import ( "github.com/filecoin-project/lotus/extern/sector-storage/storiface" "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" "github.com/filecoin-project/lotus/journal/alerting" - marketevents "github.com/filecoin-project/lotus/markets/loggers" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo/imports" "github.com/filecoin-project/specs-storage/storage" @@ -162,6 +161,8 @@ type FullNodeStruct struct { ClientDealSize func(p0 context.Context, p1 cid.Cid) (DataSize, error) `perm:"read"` + ClientExport func(p0 context.Context, p1 ExportRef, p2 FileRef) error `perm:"admin"` + ClientFindData func(p0 context.Context, p1 cid.Cid, p2 *cid.Cid) ([]QueryOffer, error) `perm:"read"` ClientGenCar func(p0 context.Context, p1 FileRef, p2 string) error `perm:"write"` @@ -194,12 +195,10 @@ type FullNodeStruct struct { ClientRestartDataTransfer func(p0 context.Context, p1 datatransfer.TransferID, p2 peer.ID, p3 bool) error `perm:"write"` - ClientRetrieve func(p0 context.Context, p1 RetrievalOrder, p2 *FileRef) error `perm:"admin"` + ClientRetrieve func(p0 context.Context, p1 RetrievalOrder) (*RestrievalRes, error) `perm:"admin"` ClientRetrieveTryRestartInsufficientFunds func(p0 context.Context, p1 address.Address) error `perm:"write"` - ClientRetrieveWithEvents func(p0 context.Context, p1 RetrievalOrder, p2 *FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"` - ClientStartDeal func(p0 context.Context, p1 *StartDealParams) (*cid.Cid, error) `perm:"admin"` ClientStatelessDeal func(p0 context.Context, p1 *StartDealParams) (*cid.Cid, error) `perm:"write"` @@ -1357,6 +1356,17 @@ func (s *FullNodeStub) ClientDealSize(p0 context.Context, p1 cid.Cid) (DataSize, return *new(DataSize), ErrNotSupported } +func (s *FullNodeStruct) ClientExport(p0 context.Context, p1 ExportRef, p2 FileRef) error { + if s.Internal.ClientExport == nil { + return ErrNotSupported + } + return s.Internal.ClientExport(p0, p1, p2) +} + +func (s *FullNodeStub) ClientExport(p0 context.Context, p1 ExportRef, p2 FileRef) error { + return ErrNotSupported +} + func (s *FullNodeStruct) ClientFindData(p0 context.Context, p1 cid.Cid, p2 *cid.Cid) ([]QueryOffer, error) { if s.Internal.ClientFindData == nil { return *new([]QueryOffer), ErrNotSupported @@ -1533,15 +1543,15 @@ func (s *FullNodeStub) ClientRestartDataTransfer(p0 context.Context, p1 datatran return ErrNotSupported } -func (s *FullNodeStruct) ClientRetrieve(p0 context.Context, p1 RetrievalOrder, p2 *FileRef) error { +func (s *FullNodeStruct) ClientRetrieve(p0 context.Context, p1 RetrievalOrder) (*RestrievalRes, error) { if s.Internal.ClientRetrieve == nil { - return ErrNotSupported + return nil, ErrNotSupported } - return s.Internal.ClientRetrieve(p0, p1, p2) + return s.Internal.ClientRetrieve(p0, p1) } -func (s *FullNodeStub) ClientRetrieve(p0 context.Context, p1 RetrievalOrder, p2 *FileRef) error { - return ErrNotSupported +func (s *FullNodeStub) ClientRetrieve(p0 context.Context, p1 RetrievalOrder) (*RestrievalRes, error) { + return nil, ErrNotSupported } func (s *FullNodeStruct) ClientRetrieveTryRestartInsufficientFunds(p0 context.Context, p1 address.Address) error { @@ -1555,17 +1565,6 @@ func (s *FullNodeStub) ClientRetrieveTryRestartInsufficientFunds(p0 context.Cont return ErrNotSupported } -func (s *FullNodeStruct) ClientRetrieveWithEvents(p0 context.Context, p1 RetrievalOrder, p2 *FileRef) (<-chan marketevents.RetrievalEvent, error) { - if s.Internal.ClientRetrieveWithEvents == nil { - return nil, ErrNotSupported - } - return s.Internal.ClientRetrieveWithEvents(p0, p1, p2) -} - -func (s *FullNodeStub) ClientRetrieveWithEvents(p0 context.Context, p1 RetrievalOrder, p2 *FileRef) (<-chan marketevents.RetrievalEvent, error) { - return nil, ErrNotSupported -} - func (s *FullNodeStruct) ClientStartDeal(p0 context.Context, p1 *StartDealParams) (*cid.Cid, error) { if s.Internal.ClientStartDeal == nil { return nil, ErrNotSupported diff --git a/api/types.go b/api/types.go index 9d887b0a1..a4c477545 100644 --- a/api/types.go +++ b/api/types.go @@ -7,6 +7,7 @@ import ( "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/lotus/chain/types" + textselector "github.com/ipld/go-ipld-selector-text-lite" datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-state-types/abi" @@ -194,4 +195,19 @@ type RetrievalInfo struct { TransferChannelID *datatransfer.ChannelID DataTransfer *DataTransferChannel + + // optional event if part of ClientGetRetrievalUpdates + Event *retrievalmarket.ClientEvent +} + +type RestrievalRes struct { + DealID retrievalmarket.DealID +} + +type ExportRef struct { + Root cid.Cid + DatamodelPathSelector *textselector.Expression + + FromLocalCAR string // if specified, get data from a local CARv2 file. + DealID retrievalmarket.DealID } diff --git a/api/v0api/full.go b/api/v0api/full.go index d7e38ce97..20e5a7179 100644 --- a/api/v0api/full.go +++ b/api/v0api/full.go @@ -12,6 +12,7 @@ import ( "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/dline" "github.com/ipfs/go-cid" + textselector "github.com/ipld/go-ipld-selector-text-lite" "github.com/libp2p/go-libp2p-core/peer" "github.com/filecoin-project/lotus/api" @@ -325,10 +326,10 @@ type FullNode interface { // ClientMinerQueryOffer returns a QueryOffer for the specific miner and file. ClientMinerQueryOffer(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) //perm:read // ClientRetrieve initiates the retrieval of a file, as specified in the order. - ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error //perm:admin + ClientRetrieve(ctx context.Context, order RetrievalOrder, ref *api.FileRef) error //perm:admin // ClientRetrieveWithEvents initiates the retrieval of a file, as specified in the order, and provides a channel // of status updates. - ClientRetrieveWithEvents(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) //perm:admin + ClientRetrieveWithEvents(ctx context.Context, order RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) //perm:admin // ClientQueryAsk returns a signed StorageAsk from the specified miner. // ClientListRetrievals returns information about retrievals made by the local client ClientListRetrievals(ctx context.Context) ([]api.RetrievalInfo, error) //perm:write @@ -714,3 +715,21 @@ type FullNode interface { // the path specified when calling CreateBackup is within the base path CreateBackup(ctx context.Context, fpath string) error //perm:admin } + +type RetrievalOrder struct { + // TODO: make this less unixfs specific + Root cid.Cid + Piece *cid.Cid + DatamodelPathSelector *textselector.Expression + Size uint64 + + FromLocalCAR string // if specified, get data from a local CARv2 file. + // TODO: support offset + Total types.BigInt + UnsealPrice types.BigInt + PaymentInterval uint64 + PaymentIntervalIncrease uint64 + Client address.Address + Miner address.Address + MinerPeer *retrievalmarket.RetrievalPeer +} diff --git a/api/v0api/proxy_gen.go b/api/v0api/proxy_gen.go index dd6330a02..af0687fe5 100644 --- a/api/v0api/proxy_gen.go +++ b/api/v0api/proxy_gen.go @@ -125,11 +125,11 @@ type FullNodeStruct struct { ClientRestartDataTransfer func(p0 context.Context, p1 datatransfer.TransferID, p2 peer.ID, p3 bool) error `perm:"write"` - ClientRetrieve func(p0 context.Context, p1 api.RetrievalOrder, p2 *api.FileRef) error `perm:"admin"` + ClientRetrieve func(p0 context.Context, p1 RetrievalOrder, p2 *api.FileRef) error `perm:"admin"` ClientRetrieveTryRestartInsufficientFunds func(p0 context.Context, p1 address.Address) error `perm:"write"` - ClientRetrieveWithEvents func(p0 context.Context, p1 api.RetrievalOrder, p2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"` + ClientRetrieveWithEvents func(p0 context.Context, p1 RetrievalOrder, p2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"` ClientStartDeal func(p0 context.Context, p1 *api.StartDealParams) (*cid.Cid, error) `perm:"admin"` @@ -965,14 +965,14 @@ func (s *FullNodeStub) ClientRestartDataTransfer(p0 context.Context, p1 datatran return ErrNotSupported } -func (s *FullNodeStruct) ClientRetrieve(p0 context.Context, p1 api.RetrievalOrder, p2 *api.FileRef) error { +func (s *FullNodeStruct) ClientRetrieve(p0 context.Context, p1 RetrievalOrder, p2 *api.FileRef) error { if s.Internal.ClientRetrieve == nil { return ErrNotSupported } return s.Internal.ClientRetrieve(p0, p1, p2) } -func (s *FullNodeStub) ClientRetrieve(p0 context.Context, p1 api.RetrievalOrder, p2 *api.FileRef) error { +func (s *FullNodeStub) ClientRetrieve(p0 context.Context, p1 RetrievalOrder, p2 *api.FileRef) error { return ErrNotSupported } @@ -987,14 +987,14 @@ func (s *FullNodeStub) ClientRetrieveTryRestartInsufficientFunds(p0 context.Cont return ErrNotSupported } -func (s *FullNodeStruct) ClientRetrieveWithEvents(p0 context.Context, p1 api.RetrievalOrder, p2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) { +func (s *FullNodeStruct) ClientRetrieveWithEvents(p0 context.Context, p1 RetrievalOrder, p2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) { if s.Internal.ClientRetrieveWithEvents == nil { return nil, ErrNotSupported } return s.Internal.ClientRetrieveWithEvents(p0, p1, p2) } -func (s *FullNodeStub) ClientRetrieveWithEvents(p0 context.Context, p1 api.RetrievalOrder, p2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) { +func (s *FullNodeStub) ClientRetrieveWithEvents(p0 context.Context, p1 RetrievalOrder, p2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) { return nil, ErrNotSupported } diff --git a/api/v0api/v1_wrapper.go b/api/v0api/v1_wrapper.go index e36f478f5..0a1a463e5 100644 --- a/api/v0api/v1_wrapper.go +++ b/api/v0api/v1_wrapper.go @@ -3,7 +3,10 @@ package v0api import ( "context" + "github.com/filecoin-project/go-fil-markets/retrievalmarket" + "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/crypto" + marketevents "github.com/filecoin-project/lotus/markets/loggers" "github.com/filecoin-project/go-address" "github.com/filecoin-project/lotus/chain/types" @@ -194,4 +197,135 @@ func (w *WrapperV1Full) ChainGetRandomnessFromBeacon(ctx context.Context, tsk ty return w.StateGetRandomnessFromBeacon(ctx, personalization, randEpoch, entropy, tsk) } +func (w *WrapperV1Full) ClientRetrieve(ctx context.Context, order RetrievalOrder, ref *api.FileRef) error { + events := make(chan marketevents.RetrievalEvent) + go w.clientRetrieve(ctx, order, ref, events) + + for { + select { + case evt, ok := <-events: + if !ok { // done successfully + return nil + } + + if evt.Err != "" { + return xerrors.Errorf("retrieval failed: %s", evt.Err) + } + case <-ctx.Done(): + return xerrors.Errorf("retrieval timed out") + } + } +} + +func (w *WrapperV1Full) ClientRetrieveWithEvents(ctx context.Context, order RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) { + events := make(chan marketevents.RetrievalEvent) + go w.clientRetrieve(ctx, order, ref, events) + return events, nil +} + +func readSubscribeEvents(ctx context.Context, dealID retrievalmarket.DealID, subscribeEvents <-chan api.RetrievalInfo, events chan marketevents.RetrievalEvent) error { + for { + var subscribeEvent api.RetrievalInfo + var evt retrievalmarket.ClientEvent + select { + case <-ctx.Done(): + return xerrors.New("Retrieval Timed Out") + case subscribeEvent = <-subscribeEvents: + if subscribeEvent.ID != dealID { + // we can't check the deal ID ahead of time because: + // 1. We need to subscribe before retrieving. + // 2. We won't know the deal ID until after retrieving. + continue + } + if subscribeEvent.Event != nil { + evt = *subscribeEvent.Event + } + } + + select { + case <-ctx.Done(): + return xerrors.New("Retrieval Timed Out") + case events <- marketevents.RetrievalEvent{ + Event: evt, + Status: subscribeEvent.Status, + BytesReceived: subscribeEvent.BytesReceived, + FundsSpent: subscribeEvent.TotalPaid, + }: + } + + switch subscribeEvent.Status { + case retrievalmarket.DealStatusCompleted: + return nil + case retrievalmarket.DealStatusRejected: + return xerrors.Errorf("Retrieval Proposal Rejected: %s", subscribeEvent.Message) + case + retrievalmarket.DealStatusDealNotFound, + retrievalmarket.DealStatusErrored: + return xerrors.Errorf("Retrieval Error: %s", subscribeEvent.Message) + } + } +} + +func (w *WrapperV1Full) clientRetrieve(ctx context.Context, order RetrievalOrder, ref *api.FileRef, events chan marketevents.RetrievalEvent) { + defer close(events) + + finish := func(e error) { + if e != nil { + events <- marketevents.RetrievalEvent{Err: e.Error(), FundsSpent: big.Zero()} + } + } + + var dealID retrievalmarket.DealID + if order.FromLocalCAR == "" { + // Subscribe to events before retrieving to avoid losing events. + subscribeCtx, cancel := context.WithCancel(ctx) + defer cancel() + retrievalEvents, err := w.ClientGetRetrievalUpdates(subscribeCtx) + + if err != nil { + finish(xerrors.Errorf("GetRetrievalUpdates failed: %w", err)) + return + } + + retrievalRes, err := w.FullNode.ClientRetrieve(ctx, api.RetrievalOrder{ + Root: order.Root, + Piece: order.Piece, + Size: order.Size, + Total: order.Total, + UnsealPrice: order.UnsealPrice, + PaymentInterval: order.PaymentInterval, + PaymentIntervalIncrease: order.PaymentIntervalIncrease, + Client: order.Client, + Miner: order.Miner, + MinerPeer: order.MinerPeer, + }) + + if err != nil { + finish(xerrors.Errorf("Retrieve failed: %w", err)) + return + } + + dealID = retrievalRes.DealID + + err = readSubscribeEvents(ctx, retrievalRes.DealID, retrievalEvents, events) + if err != nil { + finish(xerrors.Errorf("Retrieve: %w", err)) + return + } + } + + // If ref is nil, it only fetches the data into the configured blockstore. + if ref == nil { + finish(nil) + return + } + + finish(w.ClientExport(ctx, api.ExportRef{ + Root: order.Root, + DatamodelPathSelector: order.DatamodelPathSelector, + FromLocalCAR: order.FromLocalCAR, + DealID: dealID, + }, *ref)) +} + var _ FullNode = &WrapperV1Full{} diff --git a/cli/client.go b/cli/client.go index daaf5f3fe..e715ec9fb 100644 --- a/cli/client.go +++ b/cli/client.go @@ -1069,7 +1069,7 @@ var clientRetrieveCmd = &cli.Command{ return ShowHelp(cctx, fmt.Errorf("incorrect number of arguments")) } - fapi, closer, err := GetFullNodeAPI(cctx) + fapi, closer, err := GetFullNodeAPIV1(cctx) if err != nil { return err } @@ -1101,7 +1101,7 @@ var clientRetrieveCmd = &cli.Command{ pieceCid = &parsed } - var order *lapi.RetrievalOrder + var eref *lapi.ExportRef if cctx.Bool("allow-local") { imports, err := fapi.ClientListImports(ctx) if err != nil { @@ -1110,19 +1110,17 @@ var clientRetrieveCmd = &cli.Command{ for _, i := range imports { if i.Root != nil && i.Root.Equals(file) { - order = &lapi.RetrievalOrder{ - Root: file, - FromLocalCAR: i.CARPath, - - Total: big.Zero(), - UnsealPrice: big.Zero(), + eref = &lapi.ExportRef{ + Root: file, + FromLocalCAR: i.CARPath, } break } } } - if order == nil { + // no local found, so make a retrieval + if eref == nil { var offer api.QueryOffer minerStrAddr := cctx.String("miner") if minerStrAddr == "" { // Local discovery @@ -1163,7 +1161,7 @@ var clientRetrieveCmd = &cli.Command{ } } if offer.Err != "" { - return fmt.Errorf("The received offer errored: %s", offer.Err) + return fmt.Errorf("offer error: %s", offer.Err) } maxPrice := types.MustParseFIL(DefaultMaxRetrievePrice) @@ -1180,55 +1178,67 @@ var clientRetrieveCmd = &cli.Command{ } o := offer.Order(payer) - order = &o - } - ref := &lapi.FileRef{ - Path: cctx.Args().Get(1), - IsCAR: cctx.Bool("car"), + + subscribeEvents, err := fapi.ClientGetRetrievalUpdates(ctx) + if err != nil { + return xerrors.Errorf("error setting up retrieval updates: %w", err) + } + retrievalRes, err := fapi.ClientRetrieve(ctx, o) + if err != nil { + return xerrors.Errorf("error setting up retrieval: %w", err) + } + + readEvents: + for { + var evt api.RetrievalInfo + select { + case <-ctx.Done(): + return xerrors.New("Retrieval Timed Out") + case evt = <-subscribeEvents: + if evt.ID != retrievalRes.DealID { + // we can't check the deal ID ahead of time because: + // 1. We need to subscribe before retrieving. + // 2. We won't know the deal ID until after retrieving. + continue + } + } + afmt.Printf("> Recv: %s, Paid %s, %s (%s)\n", + types.SizeStr(types.NewInt(evt.BytesReceived)), + types.FIL(evt.TotalPaid), + retrievalmarket.ClientEvents[*evt.Event], + retrievalmarket.DealStatuses[evt.Status], + ) + switch evt.Status { + case retrievalmarket.DealStatusCompleted: + break readEvents + case retrievalmarket.DealStatusRejected: + return xerrors.Errorf("Retrieval Proposal Rejected: %s", evt.Message) + case + retrievalmarket.DealStatusDealNotFound, + retrievalmarket.DealStatusErrored: + return xerrors.Errorf("Retrieval Error: %s", evt.Message) + } + } + + eref = &lapi.ExportRef{ + Root: file, + DealID: retrievalRes.DealID, + } } if sel := textselector.Expression(cctx.String("datamodel-path-selector")); sel != "" { - order.DatamodelPathSelector = &sel + eref.DatamodelPathSelector = &sel } - updates, err := fapi.ClientRetrieveWithEvents(ctx, *order, ref) + err = fapi.ClientExport(ctx, *eref, lapi.FileRef{ + Path: cctx.Args().Get(1), + IsCAR: cctx.Bool("car"), + }) if err != nil { - return xerrors.Errorf("error setting up retrieval: %w", err) - } - - var prevStatus retrievalmarket.DealStatus - - for { - select { - case evt, ok := <-updates: - if ok { - afmt.Printf("> Recv: %s, Paid %s, %s (%s)\n", - types.SizeStr(types.NewInt(evt.BytesReceived)), - types.FIL(evt.FundsSpent), - retrievalmarket.ClientEvents[evt.Event], - retrievalmarket.DealStatuses[evt.Status], - ) - prevStatus = evt.Status - } - - if evt.Err != "" { - return xerrors.Errorf("retrieval failed: %s", evt.Err) - } - - if !ok { - if prevStatus == retrievalmarket.DealStatusCompleted { - afmt.Println("Success") - } else { - afmt.Printf("saw final deal state %s instead of expected success state DealStatusCompleted\n", - retrievalmarket.DealStatuses[prevStatus]) - } - return nil - } - - case <-ctx.Done(): - return xerrors.Errorf("retrieval timed out") - } + return err } + afmt.Println("Success") + return nil }, } diff --git a/gen/api/proxygen.go b/gen/api/proxygen.go index 3e0766c31..df39132ff 100644 --- a/gen/api/proxygen.go +++ b/gen/api/proxygen.go @@ -10,7 +10,6 @@ import ( "path/filepath" "strings" "text/template" - "unicode" "golang.org/x/xerrors" ) @@ -71,9 +70,6 @@ func typeName(e ast.Expr, pkg string) (string, error) { return t.X.(*ast.Ident).Name + "." + t.Sel.Name, nil case *ast.Ident: pstr := t.Name - if !unicode.IsLower(rune(pstr[0])) && pkg != "api" { - pstr = "api." + pstr // todo src pkg name - } return pstr, nil case *ast.ArrayType: subt, err := typeName(t.Elt, pkg) diff --git a/itests/kit/deals.go b/itests/kit/deals.go index 4a9af69e6..b8534982a 100644 --- a/itests/kit/deals.go +++ b/itests/kit/deals.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" @@ -320,17 +321,44 @@ func (dh *DealHarness) PerformRetrieval(ctx context.Context, deal *cid.Cid, root caddr, err := dh.client.WalletDefaultAddress(ctx) require.NoError(dh.t, err) - ref := &api.FileRef{ - Path: carFile.Name(), - IsCAR: carExport, - } + updatesCtx, cancel := context.WithCancel(ctx) + updates, err := dh.client.ClientGetRetrievalUpdates(updatesCtx) - updates, err := dh.client.ClientRetrieveWithEvents(ctx, offers[0].Order(caddr), ref) + retrievalRes, err := dh.client.ClientRetrieve(ctx, offers[0].Order(caddr)) require.NoError(dh.t, err) - - for update := range updates { - require.Emptyf(dh.t, update.Err, "retrieval failed: %s", update.Err) +consumeEvents: + for { + var evt api.RetrievalInfo + select { + case <-updatesCtx.Done(): + dh.t.Fatal("Retrieval Timed Out") + case evt = <-updates: + if evt.ID != retrievalRes.DealID { + continue + } + } + switch evt.Status { + case retrievalmarket.DealStatusCompleted: + break consumeEvents + case retrievalmarket.DealStatusRejected: + dh.t.Fatalf("Retrieval Proposal Rejected: %s", evt.Message) + case + retrievalmarket.DealStatusDealNotFound, + retrievalmarket.DealStatusErrored: + dh.t.Fatalf("Retrieval Error: %s", evt.Message) + } } + cancel() + + require.NoError(dh.t, dh.client.ClientExport(ctx, + api.ExportRef{ + Root: root, + DealID: retrievalRes.DealID, + }, + api.FileRef{ + Path: carFile.Name(), + IsCAR: carExport, + })) ret := carFile.Name() if carExport { diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 3122e76a5..8f82e6513 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -60,7 +60,6 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/specs-actors/v3/actors/builtin/market" - marketevents "github.com/filecoin-project/lotus/markets/loggers" "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/repo/imports" @@ -762,79 +761,6 @@ func (a *API) ClientCancelRetrievalDeal(ctx context.Context, dealID rm.DealID) e } } -func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error { - events := make(chan marketevents.RetrievalEvent) - go a.clientRetrieve(ctx, order, ref, events) - - for { - select { - case evt, ok := <-events: - if !ok { // done successfully - return nil - } - - if evt.Err != "" { - return xerrors.Errorf("retrieval failed: %s", evt.Err) - } - case <-ctx.Done(): - return xerrors.Errorf("retrieval timed out") - } - } -} - -func (a *API) ClientRetrieveWithEvents(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) { - events := make(chan marketevents.RetrievalEvent) - go a.clientRetrieve(ctx, order, ref, events) - return events, nil -} - -type retrievalSubscribeEvent struct { - event rm.ClientEvent - state rm.ClientDealState -} - -func consumeAllEvents(ctx context.Context, dealID rm.DealID, subscribeEvents chan retrievalSubscribeEvent, events chan marketevents.RetrievalEvent) error { - for { - var subscribeEvent retrievalSubscribeEvent - select { - case <-ctx.Done(): - return xerrors.New("Retrieval Timed Out") - case subscribeEvent = <-subscribeEvents: - if subscribeEvent.state.ID != dealID { - // we can't check the deal ID ahead of time because: - // 1. We need to subscribe before retrieving. - // 2. We won't know the deal ID until after retrieving. - continue - } - } - - select { - case <-ctx.Done(): - return xerrors.New("Retrieval Timed Out") - case events <- marketevents.RetrievalEvent{ - Event: subscribeEvent.event, - Status: subscribeEvent.state.Status, - BytesReceived: subscribeEvent.state.TotalReceived, - FundsSpent: subscribeEvent.state.FundsSpent, - }: - } - - state := subscribeEvent.state - switch state.Status { - case rm.DealStatusCompleted: - return nil - case rm.DealStatusRejected: - return xerrors.Errorf("Retrieval Proposal Rejected: %s", state.Message) - case rm.DealStatusCancelled: - return xerrors.Errorf("Retrieval was cancelled externally: %s", state.Message) - case - rm.DealStatusDealNotFound, - rm.DealStatusErrored: - return xerrors.Errorf("Retrieval Error: %s", state.Message) - } - } -} - func getRetrievalSelector(dps *textselector.Expression) (datamodel.Node, error) { sel := selectorparse.CommonSelector_ExploreAllRecursively if dps != nil { @@ -870,7 +796,23 @@ func getRetrievalSelector(dps *textselector.Expression) (datamodel.Node, error) return sel, nil } -func (a *API) doRetrieval(ctx context.Context, order api.RetrievalOrder, sel datamodel.Node, events chan marketevents.RetrievalEvent) (rm.DealID, error) { +func (a *API) ClientRetrieve(ctx context.Context, params api.RetrievalOrder) (*api.RestrievalRes, error) { + sel, err := getRetrievalSelector(params.DatamodelPathSelector) + if err != nil { + return nil, err + } + + di, err := a.doRetrieval(ctx, params, sel) + if err != nil { + return nil, err + } + + return &api.RestrievalRes{ + DealID: di, + }, nil +} + +func (a *API) doRetrieval(ctx context.Context, order api.RetrievalOrder, sel datamodel.Node) (rm.DealID, error) { if order.MinerPeer == nil || order.MinerPeer.ID == "" { mi, err := a.StateMinerInfo(ctx, order.Miner, types.EmptyTSK) if err != nil { @@ -898,20 +840,6 @@ func (a *API) doRetrieval(ctx context.Context, order api.RetrievalOrder, sel dat return 0, xerrors.Errorf("Error in retrieval params: %s", err) } - // Subscribe to events before retrieving to avoid losing events. - subscribeEvents := make(chan retrievalSubscribeEvent, 1) - subscribeCtx, cancel := context.WithCancel(ctx) - defer cancel() - unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) { - // We'll check the deal IDs inside consumeAllEvents. - if state.PayloadCID.Equals(order.Root) { - select { - case <-subscribeCtx.Done(): - case subscribeEvents <- retrievalSubscribeEvent{event, state}: - } - } - }) - id := a.Retrieval.NextID() id, err = a.Retrieval.Retrieve( ctx, @@ -925,21 +853,58 @@ func (a *API) doRetrieval(ctx context.Context, order api.RetrievalOrder, sel dat ) if err != nil { - unsubscribe() return 0, xerrors.Errorf("Retrieve failed: %w", err) } - err = consumeAllEvents(ctx, id, subscribeEvents, events) - - unsubscribe() - if err != nil { - return 0, xerrors.Errorf("Retrieve: %w", err) - } - return id, nil } -func (a *API) outputCAR(ctx context.Context, order api.RetrievalOrder, sel datamodel.Node, bs bstore.Blockstore, ref *api.FileRef) error { +func (a *API) ClientExport(ctx context.Context, exportRef api.ExportRef, ref api.FileRef) error { + proxyBss, retrieveIntoIPFS := a.RtvlBlockstoreAccessor.(*retrievaladapter.ProxyBlockstoreAccessor) + carBss, retrieveIntoCAR := a.RtvlBlockstoreAccessor.(*retrievaladapter.CARBlockstoreAccessor) + carPath := exportRef.FromLocalCAR + + sel, err := getRetrievalSelector(exportRef.DatamodelPathSelector) + if err != nil { + return err + } + + if carPath == "" { + if !retrieveIntoIPFS && !retrieveIntoCAR { + return xerrors.Errorf("unsupported retrieval blockstore accessor") + } + + if retrieveIntoCAR { + carPath = carBss.PathFor(exportRef.DealID) + } + } + + var retrievalBs bstore.Blockstore + if retrieveIntoIPFS { + retrievalBs = proxyBss.Blockstore + } else { + cbs, err := stores.ReadOnlyFilestore(carPath) + if err != nil { + return err + } + defer cbs.Close() //nolint:errcheck + retrievalBs = cbs + } + + // Are we outputting a CAR? + if ref.IsCAR { + // not IPFS and we do full selection - just extract the CARv1 from the CARv2 we stored the retrieval in + if !retrieveIntoIPFS && exportRef.DatamodelPathSelector == nil { + return carv2.ExtractV1File(carPath, ref.Path) + } + + return a.outputCAR(ctx, exportRef.Root, sel, retrievalBs, ref) + } + + return a.outputUnixFS(ctx, exportRef.Root, exportRef.DatamodelPathSelector != nil, sel, retrievalBs, ref) +} + +func (a *API) outputCAR(ctx context.Context, root cid.Cid, sel datamodel.Node, bs bstore.Blockstore, ref api.FileRef) error { // generating a CARv1 from the configured blockstore f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644) if err != nil { @@ -950,7 +915,7 @@ func (a *API) outputCAR(ctx context.Context, order api.RetrievalOrder, sel datam ctx, bs, []car.Dag{{ - Root: order.Root, + Root: root, Selector: sel, }}, car.MaxTraversalLinks(config.MaxTraversalLinks), @@ -962,12 +927,11 @@ func (a *API) outputCAR(ctx context.Context, order api.RetrievalOrder, sel datam return f.Close() } -func (a *API) outputUnixFS(ctx context.Context, order api.RetrievalOrder, sel datamodel.Node, bs bstore.Blockstore, ref *api.FileRef) error { +func (a *API) outputUnixFS(ctx context.Context, root cid.Cid, findSubroot bool, sel datamodel.Node, bs bstore.Blockstore, ref api.FileRef) error { ds := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) - root := order.Root // if we used a selector - need to find the sub-root the user actually wanted to retrieve - if order.DatamodelPathSelector != nil { + if findSubroot { var subRootFound bool if err := utils.TraverseDag( @@ -1001,7 +965,7 @@ func (a *API) outputUnixFS(ctx context.Context, order api.RetrievalOrder, sel da } if !subRootFound { - return xerrors.Errorf("path selection '%s' does not match a node within %s", *order.DatamodelPathSelector, root) + return xerrors.Errorf("path selection does not match a node within %s", root) } } @@ -1017,90 +981,6 @@ func (a *API) outputUnixFS(ctx context.Context, order api.RetrievalOrder, sel da return files.WriteTo(file, ref.Path) } -func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef, events chan marketevents.RetrievalEvent) { - defer close(events) - - finish := func(e error) { - if e != nil { - events <- marketevents.RetrievalEvent{Err: e.Error(), FundsSpent: big.Zero()} - } - } - - sel, err := getRetrievalSelector(order.DatamodelPathSelector) - if err != nil { - finish(err) - return - } - - // summary: - // 1. if we're retrieving from an import, FromLocalCAR will be set. - // Skip the retrieval itself, and use the provided car as a blockstore further down - // to extract a CAR or UnixFS export from. - // 2. if we're using an IPFS blockstore for retrieval, retrieve into it, - // then use the virtual blockstore to extract a CAR or UnixFS export from it. - // 3. if we have to retrieve, perform a CARv2 retrieval, then either - // extract the CARv1 (with ExtractV1File) or use it as a blockstore further down. - - // this indicates we're proxying to IPFS. - proxyBss, retrieveIntoIPFS := a.RtvlBlockstoreAccessor.(*retrievaladapter.ProxyBlockstoreAccessor) - carBss, retrieveIntoCAR := a.RtvlBlockstoreAccessor.(*retrievaladapter.CARBlockstoreAccessor) - carPath := order.FromLocalCAR - - // we actually need to retrieve from the network - if carPath == "" { - if !retrieveIntoIPFS && !retrieveIntoCAR { - // we don't recognize the blockstore accessor. - finish(xerrors.Errorf("unsupported retrieval blockstore accessor")) - return - } - - id, err := a.doRetrieval(ctx, order, sel, events) - if err != nil { - finish(err) - return - } - - if retrieveIntoCAR { - carPath = carBss.PathFor(id) - } - } - - if ref == nil { - // If ref is nil, it only fetches the data into the configured blockstore - // (if fetching from network). - finish(nil) - return - } - - // determine where did the retrieval go - var retrievalBs bstore.Blockstore - if retrieveIntoIPFS { - retrievalBs = proxyBss.Blockstore - } else { - cbs, err := stores.ReadOnlyFilestore(carPath) - if err != nil { - finish(err) - return - } - defer cbs.Close() //nolint:errcheck - retrievalBs = cbs - } - - // Are we outputting a CAR? - if ref.IsCAR { - // not IPFS and we do full selection - just extract the CARv1 from the CARv2 we stored the retrieval in - if !retrieveIntoIPFS && order.DatamodelPathSelector == nil { - finish(carv2.ExtractV1File(carPath, ref.Path)) - return - } - - finish(a.outputCAR(ctx, order, sel, retrievalBs, ref)) - return - } - - finish(a.outputUnixFS(ctx, order, sel, retrievalBs, ref)) -} - func (a *API) ClientListRetrievals(ctx context.Context) ([]api.RetrievalInfo, error) { deals, err := a.Retrieval.ListDeals() if err != nil {