From b868769ec8e313918e77bef91955ca91a6a0f0c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 10 Nov 2021 17:51:16 +0100 Subject: [PATCH] more retrieval api work --- api/api_full.go | 2 + api/docgen/docgen.go | 3 + api/mocks/mock_full.go | 43 +++++----- api/proxy_gen.go | 13 +++ api/v0api/v0mocks/mock_full.go | 5 +- cli/client.go | 4 +- documentation/en/api-v0-methods.md | 3 +- documentation/en/api-v1-unstable-methods.md | 91 ++++++++------------- itests/deals_partial_retrieval_test.go | 38 +++++++-- node/impl/client/client.go | 50 +++++++++++ node/impl/client/client_test.go | 6 +- 11 files changed, 163 insertions(+), 95 deletions(-) diff --git a/api/api_full.go b/api/api_full.go index 3bbfda178..48b5d0d3c 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -352,6 +352,8 @@ type FullNode interface { ClientMinerQueryOffer(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (QueryOffer, error) //perm:read // ClientRetrieve initiates the retrieval of a file, as specified in the order. ClientRetrieve(ctx context.Context, params RetrievalOrder) (*RestrievalRes, error) //perm:admin + // ClientRetrieveWait waits for retrieval to be complete + ClientRetrieveWait(ctx context.Context, deal retrievalmarket.DealID) error //perm:admin // ClientExport exports a file stored in the local filestore to a system file ClientExport(ctx context.Context, exportRef ExportRef, fileRef FileRef) error //perm:admin // ClientListRetrievals returns information about retrievals made by the local client diff --git a/api/docgen/docgen.go b/api/docgen/docgen.go index 25b9ac8c9..498a0747c 100644 --- a/api/docgen/docgen.go +++ b/api/docgen/docgen.go @@ -91,6 +91,7 @@ func init() { storeIDExample := imports.ID(50) textSelExample := textselector.Expression("Links/21/Hash/Links/42/Hash") + clientEvent := retrievalmarket.ClientEventDealAccepted addExample(bitfield.NewFromSet([]uint64{5})) addExample(abi.RegisteredSealProof_StackedDrg32GiBV1_1) @@ -122,6 +123,8 @@ func init() { addExample(datatransfer.Ongoing) addExample(storeIDExample) addExample(&storeIDExample) + addExample(clientEvent) + addExample(&clientEvent) addExample(retrievalmarket.ClientEventDealAccepted) addExample(retrievalmarket.DealStatusNew) addExample(&textSelExample) diff --git a/api/mocks/mock_full.go b/api/mocks/mock_full.go index 44fe82b60..4b18eb365 100644 --- a/api/mocks/mock_full.go +++ b/api/mocks/mock_full.go @@ -25,7 +25,6 @@ import ( miner "github.com/filecoin-project/lotus/chain/actors/builtin/miner" types "github.com/filecoin-project/lotus/chain/types" alerting "github.com/filecoin-project/lotus/journal/alerting" - marketevents "github.com/filecoin-project/lotus/markets/loggers" dtypes "github.com/filecoin-project/lotus/node/modules/dtypes" imports "github.com/filecoin-project/lotus/node/repo/imports" miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner" @@ -537,6 +536,20 @@ func (mr *MockFullNodeMockRecorder) ClientDealSize(arg0, arg1 interface{}) *gomo return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientDealSize", reflect.TypeOf((*MockFullNode)(nil).ClientDealSize), arg0, arg1) } +// ClientExport mocks base method. +func (m *MockFullNode) ClientExport(arg0 context.Context, arg1 api.ExportRef, arg2 api.FileRef) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ClientExport", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// ClientExport indicates an expected call of ClientExport. +func (mr *MockFullNodeMockRecorder) ClientExport(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientExport", reflect.TypeOf((*MockFullNode)(nil).ClientExport), arg0, arg1, arg2) +} + // ClientFindData mocks base method. func (m *MockFullNode) ClientFindData(arg0 context.Context, arg1 cid.Cid, arg2 *cid.Cid) ([]api.QueryOffer, error) { m.ctrl.T.Helper() @@ -775,17 +788,18 @@ func (mr *MockFullNodeMockRecorder) ClientRestartDataTransfer(arg0, arg1, arg2, } // ClientRetrieve mocks base method. -func (m *MockFullNode) ClientRetrieve(arg0 context.Context, arg1 api.RetrievalOrder, arg2 *api.FileRef) error { +func (m *MockFullNode) ClientRetrieve(arg0 context.Context, arg1 api.RetrievalOrder) (*api.RestrievalRes, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ClientRetrieve", arg0, arg1, arg2) - ret0, _ := ret[0].(error) - return ret0 + ret := m.ctrl.Call(m, "ClientRetrieve", arg0, arg1) + ret0, _ := ret[0].(*api.RestrievalRes) + ret1, _ := ret[1].(error) + return ret0, ret1 } // ClientRetrieve indicates an expected call of ClientRetrieve. -func (mr *MockFullNodeMockRecorder) ClientRetrieve(arg0, arg1, arg2 interface{}) *gomock.Call { +func (mr *MockFullNodeMockRecorder) ClientRetrieve(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientRetrieve", reflect.TypeOf((*MockFullNode)(nil).ClientRetrieve), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientRetrieve", reflect.TypeOf((*MockFullNode)(nil).ClientRetrieve), arg0, arg1) } // ClientRetrieveTryRestartInsufficientFunds mocks base method. @@ -802,21 +816,6 @@ func (mr *MockFullNodeMockRecorder) ClientRetrieveTryRestartInsufficientFunds(ar return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientRetrieveTryRestartInsufficientFunds", reflect.TypeOf((*MockFullNode)(nil).ClientRetrieveTryRestartInsufficientFunds), arg0, arg1) } -// ClientRetrieveWithEvents mocks base method. -func (m *MockFullNode) ClientRetrieveWithEvents(arg0 context.Context, arg1 api.RetrievalOrder, arg2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ClientRetrieveWithEvents", arg0, arg1, arg2) - ret0, _ := ret[0].(<-chan marketevents.RetrievalEvent) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ClientRetrieveWithEvents indicates an expected call of ClientRetrieveWithEvents. -func (mr *MockFullNodeMockRecorder) ClientRetrieveWithEvents(arg0, arg1, arg2 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientRetrieveWithEvents", reflect.TypeOf((*MockFullNode)(nil).ClientRetrieveWithEvents), arg0, arg1, arg2) -} - // ClientStartDeal mocks base method. func (m *MockFullNode) ClientStartDeal(arg0 context.Context, arg1 *api.StartDealParams) (*cid.Cid, error) { m.ctrl.T.Helper() diff --git a/api/proxy_gen.go b/api/proxy_gen.go index d78304397..feb08531f 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -199,6 +199,8 @@ type FullNodeStruct struct { ClientRetrieveTryRestartInsufficientFunds func(p0 context.Context, p1 address.Address) error `perm:"write"` + ClientRetrieveWait func(p0 context.Context, p1 retrievalmarket.DealID) error `perm:"admin"` + ClientStartDeal func(p0 context.Context, p1 *StartDealParams) (*cid.Cid, error) `perm:"admin"` ClientStatelessDeal func(p0 context.Context, p1 *StartDealParams) (*cid.Cid, error) `perm:"write"` @@ -1565,6 +1567,17 @@ func (s *FullNodeStub) ClientRetrieveTryRestartInsufficientFunds(p0 context.Cont return ErrNotSupported } +func (s *FullNodeStruct) ClientRetrieveWait(p0 context.Context, p1 retrievalmarket.DealID) error { + if s.Internal.ClientRetrieveWait == nil { + return ErrNotSupported + } + return s.Internal.ClientRetrieveWait(p0, p1) +} + +func (s *FullNodeStub) ClientRetrieveWait(p0 context.Context, p1 retrievalmarket.DealID) error { + return ErrNotSupported +} + func (s *FullNodeStruct) ClientStartDeal(p0 context.Context, p1 *StartDealParams) (*cid.Cid, error) { if s.Internal.ClientStartDeal == nil { return nil, ErrNotSupported diff --git a/api/v0api/v0mocks/mock_full.go b/api/v0api/v0mocks/mock_full.go index 0344eebf3..3e9caaee8 100644 --- a/api/v0api/v0mocks/mock_full.go +++ b/api/v0api/v0mocks/mock_full.go @@ -21,6 +21,7 @@ import ( network "github.com/filecoin-project/go-state-types/network" api "github.com/filecoin-project/lotus/api" apitypes "github.com/filecoin-project/lotus/api/types" + v0api "github.com/filecoin-project/lotus/api/v0api" miner "github.com/filecoin-project/lotus/chain/actors/builtin/miner" types "github.com/filecoin-project/lotus/chain/types" alerting "github.com/filecoin-project/lotus/journal/alerting" @@ -760,7 +761,7 @@ func (mr *MockFullNodeMockRecorder) ClientRestartDataTransfer(arg0, arg1, arg2, } // ClientRetrieve mocks base method. -func (m *MockFullNode) ClientRetrieve(arg0 context.Context, arg1 api.RetrievalOrder, arg2 *api.FileRef) error { +func (m *MockFullNode) ClientRetrieve(arg0 context.Context, arg1 v0api.RetrievalOrder, arg2 *api.FileRef) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ClientRetrieve", arg0, arg1, arg2) ret0, _ := ret[0].(error) @@ -788,7 +789,7 @@ func (mr *MockFullNodeMockRecorder) ClientRetrieveTryRestartInsufficientFunds(ar } // ClientRetrieveWithEvents mocks base method. -func (m *MockFullNode) ClientRetrieveWithEvents(arg0 context.Context, arg1 api.RetrievalOrder, arg2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) { +func (m *MockFullNode) ClientRetrieveWithEvents(arg0 context.Context, arg1 v0api.RetrievalOrder, arg2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ClientRetrieveWithEvents", arg0, arg1, arg2) ret0, _ := ret[0].(<-chan marketevents.RetrievalEvent) diff --git a/cli/client.go b/cli/client.go index e715ec9fb..4aa64ef55 100644 --- a/cli/client.go +++ b/cli/client.go @@ -1111,8 +1111,8 @@ var clientRetrieveCmd = &cli.Command{ for _, i := range imports { if i.Root != nil && i.Root.Equals(file) { eref = &lapi.ExportRef{ - Root: file, - FromLocalCAR: i.CARPath, + Root: file, + FromLocalCAR: i.CARPath, } break } diff --git a/documentation/en/api-v0-methods.md b/documentation/en/api-v0-methods.md index 4d9530821..2e57b8d7d 100644 --- a/documentation/en/api-v0-methods.md +++ b/documentation/en/api-v0-methods.md @@ -1269,7 +1269,8 @@ Response: "Stages": { "Stages": null } - } + }, + "Event": 5 } ``` diff --git a/documentation/en/api-v1-unstable-methods.md b/documentation/en/api-v1-unstable-methods.md index 24eba2d06..17cb16339 100644 --- a/documentation/en/api-v1-unstable-methods.md +++ b/documentation/en/api-v1-unstable-methods.md @@ -41,6 +41,7 @@ * [ClientDataTransferUpdates](#ClientDataTransferUpdates) * [ClientDealPieceCID](#ClientDealPieceCID) * [ClientDealSize](#ClientDealSize) + * [ClientExport](#ClientExport) * [ClientFindData](#ClientFindData) * [ClientGenCar](#ClientGenCar) * [ClientGetDealInfo](#ClientGetDealInfo) @@ -59,7 +60,6 @@ * [ClientRestartDataTransfer](#ClientRestartDataTransfer) * [ClientRetrieve](#ClientRetrieve) * [ClientRetrieveTryRestartInsufficientFunds](#ClientRetrieveTryRestartInsufficientFunds) - * [ClientRetrieveWithEvents](#ClientRetrieveWithEvents) * [ClientStartDeal](#ClientStartDeal) * [ClientStatelessDeal](#ClientStatelessDeal) * [Create](#Create) @@ -1055,6 +1055,32 @@ Response: } ``` +### ClientExport +ClientExport exports a file stored in the local filestore to a system file + + +Perms: admin + +Inputs: +```json +[ + { + "Root": { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + "DatamodelPathSelector": "Links/21/Hash/Links/42/Hash", + "FromLocalCAR": "string value", + "DealID": 5 + }, + { + "Path": "string value", + "IsCAR": true + } +] +``` + +Response: `{}` + ### ClientFindData ClientFindData identifies peers that have a certain file, and returns QueryOffers (one per peer). @@ -1282,7 +1308,8 @@ Response: "Stages": { "Stages": null } - } + }, + "Event": 5 } ``` @@ -1484,7 +1511,6 @@ Inputs: "Piece": null, "DatamodelPathSelector": "Links/21/Hash/Links/42/Hash", "Size": 42, - "FromLocalCAR": "string value", "Total": "0", "UnsealPrice": "0", "PaymentInterval": 42, @@ -1496,15 +1522,16 @@ Inputs: "ID": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf", "PieceCID": null } - }, - { - "Path": "string value", - "IsCAR": true } ] ``` -Response: `{}` +Response: +```json +{ + "DealID": 5 +} +``` ### ClientRetrieveTryRestartInsufficientFunds ClientRetrieveTryRestartInsufficientFunds attempts to restart stalled retrievals on a given payment channel @@ -1522,54 +1549,6 @@ Inputs: Response: `{}` -### ClientRetrieveWithEvents -ClientRetrieveWithEvents initiates the retrieval of a file, as specified in the order, and provides a channel -of status updates. - - -Perms: admin - -Inputs: -```json -[ - { - "Root": { - "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" - }, - "Piece": null, - "DatamodelPathSelector": "Links/21/Hash/Links/42/Hash", - "Size": 42, - "FromLocalCAR": "string value", - "Total": "0", - "UnsealPrice": "0", - "PaymentInterval": 42, - "PaymentIntervalIncrease": 42, - "Client": "f01234", - "Miner": "f01234", - "MinerPeer": { - "Address": "f01234", - "ID": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf", - "PieceCID": null - } - }, - { - "Path": "string value", - "IsCAR": true - } -] -``` - -Response: -```json -{ - "Event": 5, - "Status": 0, - "BytesReceived": 42, - "FundsSpent": "0", - "Err": "string value" -} -``` - ### ClientStartDeal ClientStartDeal proposes a deal with a miner. diff --git a/itests/deals_partial_retrieval_test.go b/itests/deals_partial_retrieval_test.go index ffc8c5e2c..8c7775588 100644 --- a/itests/deals_partial_retrieval_test.go +++ b/itests/deals_partial_retrieval_test.go @@ -9,6 +9,8 @@ import ( "testing" "time" + "golang.org/x/xerrors" + "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" @@ -56,14 +58,11 @@ func TestPartialRetrieval(t *testing.T) { for _, fullCycle := range []bool{false, true} { var retOrder api.RetrievalOrder + var eref api.ExportRef if !fullCycle { - - retOrder.FromLocalCAR = sourceCar - retOrder.Root = carRoot - + eref.FromLocalCAR = sourceCar } else { - dp := dh.DefaultStartDealParams() dp.Data = &storagemarket.DataRef{ // FIXME: figure out how to do this with an online partial transfer @@ -96,6 +95,8 @@ func TestPartialRetrieval(t *testing.T) { } retOrder.DatamodelPathSelector = &textSelector + eref.DatamodelPathSelector = &textSelector + eref.Root = carRoot // test retrieval of either data or constructing a partial selective-car for _, retrieveAsCar := range []bool{false, true} { @@ -107,6 +108,7 @@ func TestPartialRetrieval(t *testing.T) { ctx, client, retOrder, + eref, &api.FileRef{ Path: outFile.Name(), IsCAR: retrieveAsCar, @@ -131,10 +133,13 @@ func TestPartialRetrieval(t *testing.T) { ctx, client, api.RetrievalOrder{ - FromLocalCAR: sourceCar, Root: carRoot, DatamodelPathSelector: &textSelectorNonexistent, }, + api.ExportRef{ + FromLocalCAR: sourceCar, + DatamodelPathSelector: &textSelectorNonexistent, + }, &api.FileRef{}, nil, ), @@ -148,10 +153,13 @@ func TestPartialRetrieval(t *testing.T) { ctx, client, api.RetrievalOrder{ - FromLocalCAR: sourceCar, Root: carRoot, DatamodelPathSelector: &textSelectorNonLink, }, + api.ExportRef{ + FromLocalCAR: sourceCar, + DatamodelPathSelector: &textSelectorNonLink, + }, &api.FileRef{}, nil, ), @@ -159,7 +167,7 @@ func TestPartialRetrieval(t *testing.T) { ) } -func testGenesisRetrieval(ctx context.Context, client *kit.TestFullNode, retOrder api.RetrievalOrder, retRef *api.FileRef, outFile *os.File) error { +func testGenesisRetrieval(ctx context.Context, client *kit.TestFullNode, retOrder api.RetrievalOrder, eref api.ExportRef, retRef *api.FileRef, outFile *os.File) error { if retOrder.Total.Nil() { retOrder.Total = big.Zero() @@ -168,7 +176,19 @@ func testGenesisRetrieval(ctx context.Context, client *kit.TestFullNode, retOrde retOrder.UnsealPrice = big.Zero() } - err := client.ClientRetrieve(ctx, retOrder, retRef) + if eref.FromLocalCAR == "" { + rr, err := client.ClientRetrieve(ctx, retOrder) + if err != nil { + return err + } + eref.DealID = rr.DealID + + if err := client.ClientRetrieveWait(ctx, rr.DealID); err != nil { + return xerrors.Errorf("retrieval wait: %w", err) + } + } + + err := client.ClientExport(ctx, eref, *retRef) if err != nil { return err } diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 8f82e6513..34bde7e26 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -859,6 +859,56 @@ func (a *API) doRetrieval(ctx context.Context, order api.RetrievalOrder, sel dat return id, nil } +func (a *API) ClientRetrieveWait(ctx context.Context, deal rm.DealID) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + subscribeEvents := make(chan rm.ClientDealState, 1) + + unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) { + // We'll check the deal IDs inside consumeAllEvents. + if state.ID != deal { + return + } + select { + case <-ctx.Done(): + case subscribeEvents <- state: + } + }) + defer unsubscribe() + + { + state, err := a.Retrieval.GetDeal(deal) + if err != nil { + return xerrors.Errorf("getting deal state: %w", err) + } + select { + case subscribeEvents <- state: + default: // already have an event queued from the subscription + } + } + + for { + select { + case <-ctx.Done(): + return xerrors.New("Retrieval Timed Out") + case state := <-subscribeEvents: + switch state.Status { + case rm.DealStatusCompleted: + return nil + case rm.DealStatusRejected: + return xerrors.Errorf("Retrieval Proposal Rejected: %s", state.Message) + case rm.DealStatusCancelled: + return xerrors.Errorf("Retrieval was cancelled externally: %s", state.Message) + case + rm.DealStatusDealNotFound, + rm.DealStatusErrored: + return xerrors.Errorf("Retrieval Error: %s", state.Message) + } + } + } +} + func (a *API) ClientExport(ctx context.Context, exportRef api.ExportRef, ref api.FileRef) error { proxyBss, retrieveIntoIPFS := a.RtvlBlockstoreAccessor.(*retrievaladapter.ProxyBlockstoreAccessor) carBss, retrieveIntoCAR := a.RtvlBlockstoreAccessor.(*retrievaladapter.CARBlockstoreAccessor) diff --git a/node/impl/client/client_test.go b/node/impl/client/client_test.go index 834c980ab..bf7ff7735 100644 --- a/node/impl/client/client_test.go +++ b/node/impl/client/client_test.go @@ -60,7 +60,7 @@ func TestImportLocal(t *testing.T) { require.NoError(t, err) require.True(t, local) - order := api.RetrievalOrder{ + order := api.ExportRef{ Root: root, FromLocalCAR: it.CARPath, } @@ -68,7 +68,7 @@ func TestImportLocal(t *testing.T) { // retrieve as UnixFS. out1 := filepath.Join(dir, "retrieval1.data") // as unixfs out2 := filepath.Join(dir, "retrieval2.data") // as car - err = a.ClientRetrieve(ctx, order, &api.FileRef{ + err = a.ClientExport(ctx, order, api.FileRef{ Path: out1, }) require.NoError(t, err) @@ -77,7 +77,7 @@ func TestImportLocal(t *testing.T) { require.NoError(t, err) require.Equal(t, b, outBytes) - err = a.ClientRetrieve(ctx, order, &api.FileRef{ + err = a.ClientExport(ctx, order, api.FileRef{ Path: out2, IsCAR: true, })