more retrieval api work

This commit is contained in:
Łukasz Magiera 2021-11-10 17:51:16 +01:00
parent 89138bab4d
commit b868769ec8
11 changed files with 163 additions and 95 deletions

View File

@ -352,6 +352,8 @@ type FullNode interface {
ClientMinerQueryOffer(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (QueryOffer, error) //perm:read
// ClientRetrieve initiates the retrieval of a file, as specified in the order.
ClientRetrieve(ctx context.Context, params RetrievalOrder) (*RestrievalRes, error) //perm:admin
// ClientRetrieveWait waits for retrieval to be complete
ClientRetrieveWait(ctx context.Context, deal retrievalmarket.DealID) error //perm:admin
// ClientExport exports a file stored in the local filestore to a system file
ClientExport(ctx context.Context, exportRef ExportRef, fileRef FileRef) error //perm:admin
// ClientListRetrievals returns information about retrievals made by the local client

View File

@ -91,6 +91,7 @@ func init() {
storeIDExample := imports.ID(50)
textSelExample := textselector.Expression("Links/21/Hash/Links/42/Hash")
clientEvent := retrievalmarket.ClientEventDealAccepted
addExample(bitfield.NewFromSet([]uint64{5}))
addExample(abi.RegisteredSealProof_StackedDrg32GiBV1_1)
@ -122,6 +123,8 @@ func init() {
addExample(datatransfer.Ongoing)
addExample(storeIDExample)
addExample(&storeIDExample)
addExample(clientEvent)
addExample(&clientEvent)
addExample(retrievalmarket.ClientEventDealAccepted)
addExample(retrievalmarket.DealStatusNew)
addExample(&textSelExample)

View File

@ -25,7 +25,6 @@ import (
miner "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
types "github.com/filecoin-project/lotus/chain/types"
alerting "github.com/filecoin-project/lotus/journal/alerting"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
dtypes "github.com/filecoin-project/lotus/node/modules/dtypes"
imports "github.com/filecoin-project/lotus/node/repo/imports"
miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner"
@ -537,6 +536,20 @@ func (mr *MockFullNodeMockRecorder) ClientDealSize(arg0, arg1 interface{}) *gomo
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientDealSize", reflect.TypeOf((*MockFullNode)(nil).ClientDealSize), arg0, arg1)
}
// ClientExport mocks base method.
func (m *MockFullNode) ClientExport(arg0 context.Context, arg1 api.ExportRef, arg2 api.FileRef) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ClientExport", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
}
// ClientExport indicates an expected call of ClientExport.
func (mr *MockFullNodeMockRecorder) ClientExport(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientExport", reflect.TypeOf((*MockFullNode)(nil).ClientExport), arg0, arg1, arg2)
}
// ClientFindData mocks base method.
func (m *MockFullNode) ClientFindData(arg0 context.Context, arg1 cid.Cid, arg2 *cid.Cid) ([]api.QueryOffer, error) {
m.ctrl.T.Helper()
@ -775,17 +788,18 @@ func (mr *MockFullNodeMockRecorder) ClientRestartDataTransfer(arg0, arg1, arg2,
}
// ClientRetrieve mocks base method.
func (m *MockFullNode) ClientRetrieve(arg0 context.Context, arg1 api.RetrievalOrder, arg2 *api.FileRef) error {
func (m *MockFullNode) ClientRetrieve(arg0 context.Context, arg1 api.RetrievalOrder) (*api.RestrievalRes, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ClientRetrieve", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
ret := m.ctrl.Call(m, "ClientRetrieve", arg0, arg1)
ret0, _ := ret[0].(*api.RestrievalRes)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ClientRetrieve indicates an expected call of ClientRetrieve.
func (mr *MockFullNodeMockRecorder) ClientRetrieve(arg0, arg1, arg2 interface{}) *gomock.Call {
func (mr *MockFullNodeMockRecorder) ClientRetrieve(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientRetrieve", reflect.TypeOf((*MockFullNode)(nil).ClientRetrieve), arg0, arg1, arg2)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientRetrieve", reflect.TypeOf((*MockFullNode)(nil).ClientRetrieve), arg0, arg1)
}
// ClientRetrieveTryRestartInsufficientFunds mocks base method.
@ -802,21 +816,6 @@ func (mr *MockFullNodeMockRecorder) ClientRetrieveTryRestartInsufficientFunds(ar
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientRetrieveTryRestartInsufficientFunds", reflect.TypeOf((*MockFullNode)(nil).ClientRetrieveTryRestartInsufficientFunds), arg0, arg1)
}
// ClientRetrieveWithEvents mocks base method.
func (m *MockFullNode) ClientRetrieveWithEvents(arg0 context.Context, arg1 api.RetrievalOrder, arg2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ClientRetrieveWithEvents", arg0, arg1, arg2)
ret0, _ := ret[0].(<-chan marketevents.RetrievalEvent)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ClientRetrieveWithEvents indicates an expected call of ClientRetrieveWithEvents.
func (mr *MockFullNodeMockRecorder) ClientRetrieveWithEvents(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientRetrieveWithEvents", reflect.TypeOf((*MockFullNode)(nil).ClientRetrieveWithEvents), arg0, arg1, arg2)
}
// ClientStartDeal mocks base method.
func (m *MockFullNode) ClientStartDeal(arg0 context.Context, arg1 *api.StartDealParams) (*cid.Cid, error) {
m.ctrl.T.Helper()

View File

@ -199,6 +199,8 @@ type FullNodeStruct struct {
ClientRetrieveTryRestartInsufficientFunds func(p0 context.Context, p1 address.Address) error `perm:"write"`
ClientRetrieveWait func(p0 context.Context, p1 retrievalmarket.DealID) error `perm:"admin"`
ClientStartDeal func(p0 context.Context, p1 *StartDealParams) (*cid.Cid, error) `perm:"admin"`
ClientStatelessDeal func(p0 context.Context, p1 *StartDealParams) (*cid.Cid, error) `perm:"write"`
@ -1565,6 +1567,17 @@ func (s *FullNodeStub) ClientRetrieveTryRestartInsufficientFunds(p0 context.Cont
return ErrNotSupported
}
func (s *FullNodeStruct) ClientRetrieveWait(p0 context.Context, p1 retrievalmarket.DealID) error {
if s.Internal.ClientRetrieveWait == nil {
return ErrNotSupported
}
return s.Internal.ClientRetrieveWait(p0, p1)
}
func (s *FullNodeStub) ClientRetrieveWait(p0 context.Context, p1 retrievalmarket.DealID) error {
return ErrNotSupported
}
func (s *FullNodeStruct) ClientStartDeal(p0 context.Context, p1 *StartDealParams) (*cid.Cid, error) {
if s.Internal.ClientStartDeal == nil {
return nil, ErrNotSupported

View File

@ -21,6 +21,7 @@ import (
network "github.com/filecoin-project/go-state-types/network"
api "github.com/filecoin-project/lotus/api"
apitypes "github.com/filecoin-project/lotus/api/types"
v0api "github.com/filecoin-project/lotus/api/v0api"
miner "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
types "github.com/filecoin-project/lotus/chain/types"
alerting "github.com/filecoin-project/lotus/journal/alerting"
@ -760,7 +761,7 @@ func (mr *MockFullNodeMockRecorder) ClientRestartDataTransfer(arg0, arg1, arg2,
}
// ClientRetrieve mocks base method.
func (m *MockFullNode) ClientRetrieve(arg0 context.Context, arg1 api.RetrievalOrder, arg2 *api.FileRef) error {
func (m *MockFullNode) ClientRetrieve(arg0 context.Context, arg1 v0api.RetrievalOrder, arg2 *api.FileRef) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ClientRetrieve", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
@ -788,7 +789,7 @@ func (mr *MockFullNodeMockRecorder) ClientRetrieveTryRestartInsufficientFunds(ar
}
// ClientRetrieveWithEvents mocks base method.
func (m *MockFullNode) ClientRetrieveWithEvents(arg0 context.Context, arg1 api.RetrievalOrder, arg2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) {
func (m *MockFullNode) ClientRetrieveWithEvents(arg0 context.Context, arg1 v0api.RetrievalOrder, arg2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ClientRetrieveWithEvents", arg0, arg1, arg2)
ret0, _ := ret[0].(<-chan marketevents.RetrievalEvent)

View File

@ -1111,8 +1111,8 @@ var clientRetrieveCmd = &cli.Command{
for _, i := range imports {
if i.Root != nil && i.Root.Equals(file) {
eref = &lapi.ExportRef{
Root: file,
FromLocalCAR: i.CARPath,
Root: file,
FromLocalCAR: i.CARPath,
}
break
}

View File

@ -1269,7 +1269,8 @@ Response:
"Stages": {
"Stages": null
}
}
},
"Event": 5
}
```

View File

@ -41,6 +41,7 @@
* [ClientDataTransferUpdates](#ClientDataTransferUpdates)
* [ClientDealPieceCID](#ClientDealPieceCID)
* [ClientDealSize](#ClientDealSize)
* [ClientExport](#ClientExport)
* [ClientFindData](#ClientFindData)
* [ClientGenCar](#ClientGenCar)
* [ClientGetDealInfo](#ClientGetDealInfo)
@ -59,7 +60,6 @@
* [ClientRestartDataTransfer](#ClientRestartDataTransfer)
* [ClientRetrieve](#ClientRetrieve)
* [ClientRetrieveTryRestartInsufficientFunds](#ClientRetrieveTryRestartInsufficientFunds)
* [ClientRetrieveWithEvents](#ClientRetrieveWithEvents)
* [ClientStartDeal](#ClientStartDeal)
* [ClientStatelessDeal](#ClientStatelessDeal)
* [Create](#Create)
@ -1055,6 +1055,32 @@ Response:
}
```
### ClientExport
ClientExport exports a file stored in the local filestore to a system file
Perms: admin
Inputs:
```json
[
{
"Root": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
"DatamodelPathSelector": "Links/21/Hash/Links/42/Hash",
"FromLocalCAR": "string value",
"DealID": 5
},
{
"Path": "string value",
"IsCAR": true
}
]
```
Response: `{}`
### ClientFindData
ClientFindData identifies peers that have a certain file, and returns QueryOffers (one per peer).
@ -1282,7 +1308,8 @@ Response:
"Stages": {
"Stages": null
}
}
},
"Event": 5
}
```
@ -1484,7 +1511,6 @@ Inputs:
"Piece": null,
"DatamodelPathSelector": "Links/21/Hash/Links/42/Hash",
"Size": 42,
"FromLocalCAR": "string value",
"Total": "0",
"UnsealPrice": "0",
"PaymentInterval": 42,
@ -1496,15 +1522,16 @@ Inputs:
"ID": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf",
"PieceCID": null
}
},
{
"Path": "string value",
"IsCAR": true
}
]
```
Response: `{}`
Response:
```json
{
"DealID": 5
}
```
### ClientRetrieveTryRestartInsufficientFunds
ClientRetrieveTryRestartInsufficientFunds attempts to restart stalled retrievals on a given payment channel
@ -1522,54 +1549,6 @@ Inputs:
Response: `{}`
### ClientRetrieveWithEvents
ClientRetrieveWithEvents initiates the retrieval of a file, as specified in the order, and provides a channel
of status updates.
Perms: admin
Inputs:
```json
[
{
"Root": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
"Piece": null,
"DatamodelPathSelector": "Links/21/Hash/Links/42/Hash",
"Size": 42,
"FromLocalCAR": "string value",
"Total": "0",
"UnsealPrice": "0",
"PaymentInterval": 42,
"PaymentIntervalIncrease": 42,
"Client": "f01234",
"Miner": "f01234",
"MinerPeer": {
"Address": "f01234",
"ID": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf",
"PieceCID": null
}
},
{
"Path": "string value",
"IsCAR": true
}
]
```
Response:
```json
{
"Event": 5,
"Status": 0,
"BytesReceived": 42,
"FundsSpent": "0",
"Err": "string value"
}
```
### ClientStartDeal
ClientStartDeal proposes a deal with a miner.

View File

@ -9,6 +9,8 @@ import (
"testing"
"time"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
@ -56,14 +58,11 @@ func TestPartialRetrieval(t *testing.T) {
for _, fullCycle := range []bool{false, true} {
var retOrder api.RetrievalOrder
var eref api.ExportRef
if !fullCycle {
retOrder.FromLocalCAR = sourceCar
retOrder.Root = carRoot
eref.FromLocalCAR = sourceCar
} else {
dp := dh.DefaultStartDealParams()
dp.Data = &storagemarket.DataRef{
// FIXME: figure out how to do this with an online partial transfer
@ -96,6 +95,8 @@ func TestPartialRetrieval(t *testing.T) {
}
retOrder.DatamodelPathSelector = &textSelector
eref.DatamodelPathSelector = &textSelector
eref.Root = carRoot
// test retrieval of either data or constructing a partial selective-car
for _, retrieveAsCar := range []bool{false, true} {
@ -107,6 +108,7 @@ func TestPartialRetrieval(t *testing.T) {
ctx,
client,
retOrder,
eref,
&api.FileRef{
Path: outFile.Name(),
IsCAR: retrieveAsCar,
@ -131,10 +133,13 @@ func TestPartialRetrieval(t *testing.T) {
ctx,
client,
api.RetrievalOrder{
FromLocalCAR: sourceCar,
Root: carRoot,
DatamodelPathSelector: &textSelectorNonexistent,
},
api.ExportRef{
FromLocalCAR: sourceCar,
DatamodelPathSelector: &textSelectorNonexistent,
},
&api.FileRef{},
nil,
),
@ -148,10 +153,13 @@ func TestPartialRetrieval(t *testing.T) {
ctx,
client,
api.RetrievalOrder{
FromLocalCAR: sourceCar,
Root: carRoot,
DatamodelPathSelector: &textSelectorNonLink,
},
api.ExportRef{
FromLocalCAR: sourceCar,
DatamodelPathSelector: &textSelectorNonLink,
},
&api.FileRef{},
nil,
),
@ -159,7 +167,7 @@ func TestPartialRetrieval(t *testing.T) {
)
}
func testGenesisRetrieval(ctx context.Context, client *kit.TestFullNode, retOrder api.RetrievalOrder, retRef *api.FileRef, outFile *os.File) error {
func testGenesisRetrieval(ctx context.Context, client *kit.TestFullNode, retOrder api.RetrievalOrder, eref api.ExportRef, retRef *api.FileRef, outFile *os.File) error {
if retOrder.Total.Nil() {
retOrder.Total = big.Zero()
@ -168,7 +176,19 @@ func testGenesisRetrieval(ctx context.Context, client *kit.TestFullNode, retOrde
retOrder.UnsealPrice = big.Zero()
}
err := client.ClientRetrieve(ctx, retOrder, retRef)
if eref.FromLocalCAR == "" {
rr, err := client.ClientRetrieve(ctx, retOrder)
if err != nil {
return err
}
eref.DealID = rr.DealID
if err := client.ClientRetrieveWait(ctx, rr.DealID); err != nil {
return xerrors.Errorf("retrieval wait: %w", err)
}
}
err := client.ClientExport(ctx, eref, *retRef)
if err != nil {
return err
}

View File

@ -859,6 +859,56 @@ func (a *API) doRetrieval(ctx context.Context, order api.RetrievalOrder, sel dat
return id, nil
}
func (a *API) ClientRetrieveWait(ctx context.Context, deal rm.DealID) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
subscribeEvents := make(chan rm.ClientDealState, 1)
unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) {
// We'll check the deal IDs inside consumeAllEvents.
if state.ID != deal {
return
}
select {
case <-ctx.Done():
case subscribeEvents <- state:
}
})
defer unsubscribe()
{
state, err := a.Retrieval.GetDeal(deal)
if err != nil {
return xerrors.Errorf("getting deal state: %w", err)
}
select {
case subscribeEvents <- state:
default: // already have an event queued from the subscription
}
}
for {
select {
case <-ctx.Done():
return xerrors.New("Retrieval Timed Out")
case state := <-subscribeEvents:
switch state.Status {
case rm.DealStatusCompleted:
return nil
case rm.DealStatusRejected:
return xerrors.Errorf("Retrieval Proposal Rejected: %s", state.Message)
case rm.DealStatusCancelled:
return xerrors.Errorf("Retrieval was cancelled externally: %s", state.Message)
case
rm.DealStatusDealNotFound,
rm.DealStatusErrored:
return xerrors.Errorf("Retrieval Error: %s", state.Message)
}
}
}
}
func (a *API) ClientExport(ctx context.Context, exportRef api.ExportRef, ref api.FileRef) error {
proxyBss, retrieveIntoIPFS := a.RtvlBlockstoreAccessor.(*retrievaladapter.ProxyBlockstoreAccessor)
carBss, retrieveIntoCAR := a.RtvlBlockstoreAccessor.(*retrievaladapter.CARBlockstoreAccessor)

View File

@ -60,7 +60,7 @@ func TestImportLocal(t *testing.T) {
require.NoError(t, err)
require.True(t, local)
order := api.RetrievalOrder{
order := api.ExportRef{
Root: root,
FromLocalCAR: it.CARPath,
}
@ -68,7 +68,7 @@ func TestImportLocal(t *testing.T) {
// retrieve as UnixFS.
out1 := filepath.Join(dir, "retrieval1.data") // as unixfs
out2 := filepath.Join(dir, "retrieval2.data") // as car
err = a.ClientRetrieve(ctx, order, &api.FileRef{
err = a.ClientExport(ctx, order, api.FileRef{
Path: out1,
})
require.NoError(t, err)
@ -77,7 +77,7 @@ func TestImportLocal(t *testing.T) {
require.NoError(t, err)
require.Equal(t, b, outBytes)
err = a.ClientRetrieve(ctx, order, &api.FileRef{
err = a.ClientExport(ctx, order, api.FileRef{
Path: out2,
IsCAR: true,
})