Merge pull request #7610 from filecoin-project/feat/partret-ux

Partial retrieval ux improvements
This commit is contained in:
Łukasz Magiera 2021-11-24 20:25:05 +01:00 committed by GitHub
commit 58dd814765
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 1470 additions and 621 deletions

View File

@ -7,7 +7,6 @@ import (
"time" "time"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
textselector "github.com/ipld/go-ipld-selector-text-lite"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
@ -28,7 +27,6 @@ import (
"github.com/filecoin-project/lotus/chain/actors/builtin/paych" "github.com/filecoin-project/lotus/chain/actors/builtin/paych"
"github.com/filecoin-project/lotus/chain/actors/builtin/power" "github.com/filecoin-project/lotus/chain/actors/builtin/power"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo/imports" "github.com/filecoin-project/lotus/node/repo/imports"
) )
@ -352,10 +350,11 @@ type FullNode interface {
// ClientMinerQueryOffer returns a QueryOffer for the specific miner and file. // ClientMinerQueryOffer returns a QueryOffer for the specific miner and file.
ClientMinerQueryOffer(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (QueryOffer, error) //perm:read ClientMinerQueryOffer(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (QueryOffer, error) //perm:read
// ClientRetrieve initiates the retrieval of a file, as specified in the order. // ClientRetrieve initiates the retrieval of a file, as specified in the order.
ClientRetrieve(ctx context.Context, order RetrievalOrder, ref *FileRef) error //perm:admin ClientRetrieve(ctx context.Context, params RetrievalOrder) (*RestrievalRes, error) //perm:admin
// ClientRetrieveWithEvents initiates the retrieval of a file, as specified in the order, and provides a channel // ClientRetrieveWait waits for retrieval to be complete
// of status updates. ClientRetrieveWait(ctx context.Context, deal retrievalmarket.DealID) error //perm:admin
ClientRetrieveWithEvents(ctx context.Context, order RetrievalOrder, ref *FileRef) (<-chan marketevents.RetrievalEvent, error) //perm:admin // ClientExport exports a file stored in the local filestore to a system file
ClientExport(ctx context.Context, exportRef ExportRef, fileRef FileRef) error //perm:admin
// ClientListRetrievals returns information about retrievals made by the local client // ClientListRetrievals returns information about retrievals made by the local client
ClientListRetrievals(ctx context.Context) ([]RetrievalInfo, error) //perm:write ClientListRetrievals(ctx context.Context) ([]RetrievalInfo, error) //perm:write
// ClientGetRetrievalUpdates returns status of updated retrieval deals // ClientGetRetrievalUpdates returns status of updated retrieval deals
@ -934,15 +933,14 @@ type MarketDeal struct {
} }
type RetrievalOrder struct { type RetrievalOrder struct {
// TODO: make this less unixfs specific Root cid.Cid
Root cid.Cid Piece *cid.Cid
Piece *cid.Cid DataSelector *Selector
DatamodelPathSelector *textselector.Expression
Size uint64 // todo: Size/Total are only used for calculating price per byte; we should let users just pass that
Size uint64
Total types.BigInt
FromLocalCAR string // if specified, get data from a local CARv2 file.
// TODO: support offset
Total types.BigInt
UnsealPrice types.BigInt UnsealPrice types.BigInt
PaymentInterval uint64 PaymentInterval uint64
PaymentIntervalIncrease uint64 PaymentIntervalIncrease uint64

View File

@ -91,6 +91,8 @@ func init() {
storeIDExample := imports.ID(50) storeIDExample := imports.ID(50)
textSelExample := textselector.Expression("Links/21/Hash/Links/42/Hash") textSelExample := textselector.Expression("Links/21/Hash/Links/42/Hash")
apiSelExample := api.Selector("Links/21/Hash/Links/42/Hash")
clientEvent := retrievalmarket.ClientEventDealAccepted
addExample(bitfield.NewFromSet([]uint64{5})) addExample(bitfield.NewFromSet([]uint64{5}))
addExample(abi.RegisteredSealProof_StackedDrg32GiBV1_1) addExample(abi.RegisteredSealProof_StackedDrg32GiBV1_1)
@ -122,9 +124,12 @@ func init() {
addExample(datatransfer.Ongoing) addExample(datatransfer.Ongoing)
addExample(storeIDExample) addExample(storeIDExample)
addExample(&storeIDExample) addExample(&storeIDExample)
addExample(clientEvent)
addExample(&clientEvent)
addExample(retrievalmarket.ClientEventDealAccepted) addExample(retrievalmarket.ClientEventDealAccepted)
addExample(retrievalmarket.DealStatusNew) addExample(retrievalmarket.DealStatusNew)
addExample(&textSelExample) addExample(&textSelExample)
addExample(&apiSelExample)
addExample(network.ReachabilityPublic) addExample(network.ReachabilityPublic)
addExample(build.NewestNetworkVersion) addExample(build.NewestNetworkVersion)
addExample(map[string]int{"name": 42}) addExample(map[string]int{"name": 42})

View File

@ -25,7 +25,6 @@ import (
miner "github.com/filecoin-project/lotus/chain/actors/builtin/miner" miner "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
types "github.com/filecoin-project/lotus/chain/types" types "github.com/filecoin-project/lotus/chain/types"
alerting "github.com/filecoin-project/lotus/journal/alerting" alerting "github.com/filecoin-project/lotus/journal/alerting"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
dtypes "github.com/filecoin-project/lotus/node/modules/dtypes" dtypes "github.com/filecoin-project/lotus/node/modules/dtypes"
imports "github.com/filecoin-project/lotus/node/repo/imports" imports "github.com/filecoin-project/lotus/node/repo/imports"
miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner" miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner"
@ -537,6 +536,20 @@ func (mr *MockFullNodeMockRecorder) ClientDealSize(arg0, arg1 interface{}) *gomo
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientDealSize", reflect.TypeOf((*MockFullNode)(nil).ClientDealSize), arg0, arg1) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientDealSize", reflect.TypeOf((*MockFullNode)(nil).ClientDealSize), arg0, arg1)
} }
// ClientExport mocks base method.
func (m *MockFullNode) ClientExport(arg0 context.Context, arg1 api.ExportRef, arg2 api.FileRef) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ClientExport", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
}
// ClientExport indicates an expected call of ClientExport.
func (mr *MockFullNodeMockRecorder) ClientExport(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientExport", reflect.TypeOf((*MockFullNode)(nil).ClientExport), arg0, arg1, arg2)
}
// ClientFindData mocks base method. // ClientFindData mocks base method.
func (m *MockFullNode) ClientFindData(arg0 context.Context, arg1 cid.Cid, arg2 *cid.Cid) ([]api.QueryOffer, error) { func (m *MockFullNode) ClientFindData(arg0 context.Context, arg1 cid.Cid, arg2 *cid.Cid) ([]api.QueryOffer, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
@ -775,17 +788,18 @@ func (mr *MockFullNodeMockRecorder) ClientRestartDataTransfer(arg0, arg1, arg2,
} }
// ClientRetrieve mocks base method. // ClientRetrieve mocks base method.
func (m *MockFullNode) ClientRetrieve(arg0 context.Context, arg1 api.RetrievalOrder, arg2 *api.FileRef) error { func (m *MockFullNode) ClientRetrieve(arg0 context.Context, arg1 api.RetrievalOrder) (*api.RestrievalRes, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ClientRetrieve", arg0, arg1, arg2) ret := m.ctrl.Call(m, "ClientRetrieve", arg0, arg1)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(*api.RestrievalRes)
return ret0 ret1, _ := ret[1].(error)
return ret0, ret1
} }
// ClientRetrieve indicates an expected call of ClientRetrieve. // ClientRetrieve indicates an expected call of ClientRetrieve.
func (mr *MockFullNodeMockRecorder) ClientRetrieve(arg0, arg1, arg2 interface{}) *gomock.Call { func (mr *MockFullNodeMockRecorder) ClientRetrieve(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientRetrieve", reflect.TypeOf((*MockFullNode)(nil).ClientRetrieve), arg0, arg1, arg2) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientRetrieve", reflect.TypeOf((*MockFullNode)(nil).ClientRetrieve), arg0, arg1)
} }
// ClientRetrieveTryRestartInsufficientFunds mocks base method. // ClientRetrieveTryRestartInsufficientFunds mocks base method.
@ -802,19 +816,18 @@ func (mr *MockFullNodeMockRecorder) ClientRetrieveTryRestartInsufficientFunds(ar
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientRetrieveTryRestartInsufficientFunds", reflect.TypeOf((*MockFullNode)(nil).ClientRetrieveTryRestartInsufficientFunds), arg0, arg1) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientRetrieveTryRestartInsufficientFunds", reflect.TypeOf((*MockFullNode)(nil).ClientRetrieveTryRestartInsufficientFunds), arg0, arg1)
} }
// ClientRetrieveWithEvents mocks base method. // ClientRetrieveWait mocks base method.
func (m *MockFullNode) ClientRetrieveWithEvents(arg0 context.Context, arg1 api.RetrievalOrder, arg2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) { func (m *MockFullNode) ClientRetrieveWait(arg0 context.Context, arg1 retrievalmarket.DealID) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ClientRetrieveWithEvents", arg0, arg1, arg2) ret := m.ctrl.Call(m, "ClientRetrieveWait", arg0, arg1)
ret0, _ := ret[0].(<-chan marketevents.RetrievalEvent) ret0, _ := ret[0].(error)
ret1, _ := ret[1].(error) return ret0
return ret0, ret1
} }
// ClientRetrieveWithEvents indicates an expected call of ClientRetrieveWithEvents. // ClientRetrieveWait indicates an expected call of ClientRetrieveWait.
func (mr *MockFullNodeMockRecorder) ClientRetrieveWithEvents(arg0, arg1, arg2 interface{}) *gomock.Call { func (mr *MockFullNodeMockRecorder) ClientRetrieveWait(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientRetrieveWithEvents", reflect.TypeOf((*MockFullNode)(nil).ClientRetrieveWithEvents), arg0, arg1, arg2) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientRetrieveWait", reflect.TypeOf((*MockFullNode)(nil).ClientRetrieveWait), arg0, arg1)
} }
// ClientStartDeal mocks base method. // ClientStartDeal mocks base method.

View File

@ -28,7 +28,6 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/storiface" "github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
"github.com/filecoin-project/lotus/journal/alerting" "github.com/filecoin-project/lotus/journal/alerting"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo/imports" "github.com/filecoin-project/lotus/node/repo/imports"
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
@ -162,6 +161,8 @@ type FullNodeStruct struct {
ClientDealSize func(p0 context.Context, p1 cid.Cid) (DataSize, error) `perm:"read"` ClientDealSize func(p0 context.Context, p1 cid.Cid) (DataSize, error) `perm:"read"`
ClientExport func(p0 context.Context, p1 ExportRef, p2 FileRef) error `perm:"admin"`
ClientFindData func(p0 context.Context, p1 cid.Cid, p2 *cid.Cid) ([]QueryOffer, error) `perm:"read"` ClientFindData func(p0 context.Context, p1 cid.Cid, p2 *cid.Cid) ([]QueryOffer, error) `perm:"read"`
ClientGenCar func(p0 context.Context, p1 FileRef, p2 string) error `perm:"write"` ClientGenCar func(p0 context.Context, p1 FileRef, p2 string) error `perm:"write"`
@ -194,11 +195,11 @@ type FullNodeStruct struct {
ClientRestartDataTransfer func(p0 context.Context, p1 datatransfer.TransferID, p2 peer.ID, p3 bool) error `perm:"write"` ClientRestartDataTransfer func(p0 context.Context, p1 datatransfer.TransferID, p2 peer.ID, p3 bool) error `perm:"write"`
ClientRetrieve func(p0 context.Context, p1 RetrievalOrder, p2 *FileRef) error `perm:"admin"` ClientRetrieve func(p0 context.Context, p1 RetrievalOrder) (*RestrievalRes, error) `perm:"admin"`
ClientRetrieveTryRestartInsufficientFunds func(p0 context.Context, p1 address.Address) error `perm:"write"` ClientRetrieveTryRestartInsufficientFunds func(p0 context.Context, p1 address.Address) error `perm:"write"`
ClientRetrieveWithEvents func(p0 context.Context, p1 RetrievalOrder, p2 *FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"` ClientRetrieveWait func(p0 context.Context, p1 retrievalmarket.DealID) error `perm:"admin"`
ClientStartDeal func(p0 context.Context, p1 *StartDealParams) (*cid.Cid, error) `perm:"admin"` ClientStartDeal func(p0 context.Context, p1 *StartDealParams) (*cid.Cid, error) `perm:"admin"`
@ -1357,6 +1358,17 @@ func (s *FullNodeStub) ClientDealSize(p0 context.Context, p1 cid.Cid) (DataSize,
return *new(DataSize), ErrNotSupported return *new(DataSize), ErrNotSupported
} }
func (s *FullNodeStruct) ClientExport(p0 context.Context, p1 ExportRef, p2 FileRef) error {
if s.Internal.ClientExport == nil {
return ErrNotSupported
}
return s.Internal.ClientExport(p0, p1, p2)
}
func (s *FullNodeStub) ClientExport(p0 context.Context, p1 ExportRef, p2 FileRef) error {
return ErrNotSupported
}
func (s *FullNodeStruct) ClientFindData(p0 context.Context, p1 cid.Cid, p2 *cid.Cid) ([]QueryOffer, error) { func (s *FullNodeStruct) ClientFindData(p0 context.Context, p1 cid.Cid, p2 *cid.Cid) ([]QueryOffer, error) {
if s.Internal.ClientFindData == nil { if s.Internal.ClientFindData == nil {
return *new([]QueryOffer), ErrNotSupported return *new([]QueryOffer), ErrNotSupported
@ -1533,15 +1545,15 @@ func (s *FullNodeStub) ClientRestartDataTransfer(p0 context.Context, p1 datatran
return ErrNotSupported return ErrNotSupported
} }
func (s *FullNodeStruct) ClientRetrieve(p0 context.Context, p1 RetrievalOrder, p2 *FileRef) error { func (s *FullNodeStruct) ClientRetrieve(p0 context.Context, p1 RetrievalOrder) (*RestrievalRes, error) {
if s.Internal.ClientRetrieve == nil { if s.Internal.ClientRetrieve == nil {
return ErrNotSupported return nil, ErrNotSupported
} }
return s.Internal.ClientRetrieve(p0, p1, p2) return s.Internal.ClientRetrieve(p0, p1)
} }
func (s *FullNodeStub) ClientRetrieve(p0 context.Context, p1 RetrievalOrder, p2 *FileRef) error { func (s *FullNodeStub) ClientRetrieve(p0 context.Context, p1 RetrievalOrder) (*RestrievalRes, error) {
return ErrNotSupported return nil, ErrNotSupported
} }
func (s *FullNodeStruct) ClientRetrieveTryRestartInsufficientFunds(p0 context.Context, p1 address.Address) error { func (s *FullNodeStruct) ClientRetrieveTryRestartInsufficientFunds(p0 context.Context, p1 address.Address) error {
@ -1555,15 +1567,15 @@ func (s *FullNodeStub) ClientRetrieveTryRestartInsufficientFunds(p0 context.Cont
return ErrNotSupported return ErrNotSupported
} }
func (s *FullNodeStruct) ClientRetrieveWithEvents(p0 context.Context, p1 RetrievalOrder, p2 *FileRef) (<-chan marketevents.RetrievalEvent, error) { func (s *FullNodeStruct) ClientRetrieveWait(p0 context.Context, p1 retrievalmarket.DealID) error {
if s.Internal.ClientRetrieveWithEvents == nil { if s.Internal.ClientRetrieveWait == nil {
return nil, ErrNotSupported return ErrNotSupported
} }
return s.Internal.ClientRetrieveWithEvents(p0, p1, p2) return s.Internal.ClientRetrieveWait(p0, p1)
} }
func (s *FullNodeStub) ClientRetrieveWithEvents(p0 context.Context, p1 RetrievalOrder, p2 *FileRef) (<-chan marketevents.RetrievalEvent, error) { func (s *FullNodeStub) ClientRetrieveWait(p0 context.Context, p1 retrievalmarket.DealID) error {
return nil, ErrNotSupported return ErrNotSupported
} }
func (s *FullNodeStruct) ClientStartDeal(p0 context.Context, p1 *StartDealParams) (*cid.Cid, error) { func (s *FullNodeStruct) ClientStartDeal(p0 context.Context, p1 *StartDealParams) (*cid.Cid, error) {

View File

@ -5,11 +5,10 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/lotus/chain/types"
datatransfer "github.com/filecoin-project/go-data-transfer" datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
@ -194,4 +193,39 @@ type RetrievalInfo struct {
TransferChannelID *datatransfer.ChannelID TransferChannelID *datatransfer.ChannelID
DataTransfer *DataTransferChannel DataTransfer *DataTransferChannel
// optional event if part of ClientGetRetrievalUpdates
Event *retrievalmarket.ClientEvent
}
type RestrievalRes struct {
DealID retrievalmarket.DealID
}
// Selector specifies ipld selector string
// - if the string starts with '{', it's interpreted as json selector string
// see https://ipld.io/specs/selectors/ and https://ipld.io/specs/selectors/fixtures/selector-fixtures-1/
// - otherwise the string is interpreted as ipld-selector-text-lite (simple ipld path)
// see https://github.com/ipld/go-ipld-selector-text-lite
type Selector string
type DagSpec struct {
// DataSelector matches data to be retrieved
// - when using textselector, the path specifies subtree
// - the matched graph must have a single root
DataSelector *Selector
}
type ExportRef struct {
Root cid.Cid
// DAGs array specifies a list of DAGs to export
// - If exporting into a car file, defines car roots
// - If exporting into unixfs files, only one DAG is supported, DataSelector is only used to find the root node
// - When not specified defaults to a single DAG:
// - Data - the entire DAG: `{"R":{"l":{"none":{}},":>":{"a":{">":{"@":{}}}}}}`
DAGs []DagSpec
FromLocalCAR string // if specified, get data from a local CARv2 file.
DealID retrievalmarket.DealID
} }

View File

@ -12,6 +12,7 @@ import (
"github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/dline" "github.com/filecoin-project/go-state-types/dline"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
textselector "github.com/ipld/go-ipld-selector-text-lite"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
@ -325,10 +326,10 @@ type FullNode interface {
// ClientMinerQueryOffer returns a QueryOffer for the specific miner and file. // ClientMinerQueryOffer returns a QueryOffer for the specific miner and file.
ClientMinerQueryOffer(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) //perm:read ClientMinerQueryOffer(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) //perm:read
// ClientRetrieve initiates the retrieval of a file, as specified in the order. // ClientRetrieve initiates the retrieval of a file, as specified in the order.
ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error //perm:admin ClientRetrieve(ctx context.Context, order RetrievalOrder, ref *api.FileRef) error //perm:admin
// ClientRetrieveWithEvents initiates the retrieval of a file, as specified in the order, and provides a channel // ClientRetrieveWithEvents initiates the retrieval of a file, as specified in the order, and provides a channel
// of status updates. // of status updates.
ClientRetrieveWithEvents(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) //perm:admin ClientRetrieveWithEvents(ctx context.Context, order RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) //perm:admin
// ClientQueryAsk returns a signed StorageAsk from the specified miner. // ClientQueryAsk returns a signed StorageAsk from the specified miner.
// ClientListRetrievals returns information about retrievals made by the local client // ClientListRetrievals returns information about retrievals made by the local client
ClientListRetrievals(ctx context.Context) ([]api.RetrievalInfo, error) //perm:write ClientListRetrievals(ctx context.Context) ([]api.RetrievalInfo, error) //perm:write
@ -714,3 +715,37 @@ type FullNode interface {
// the path specified when calling CreateBackup is within the base path // the path specified when calling CreateBackup is within the base path
CreateBackup(ctx context.Context, fpath string) error //perm:admin CreateBackup(ctx context.Context, fpath string) error //perm:admin
} }
func OfferOrder(o api.QueryOffer, client address.Address) RetrievalOrder {
return RetrievalOrder{
Root: o.Root,
Piece: o.Piece,
Size: o.Size,
Total: o.MinPrice,
UnsealPrice: o.UnsealPrice,
PaymentInterval: o.PaymentInterval,
PaymentIntervalIncrease: o.PaymentIntervalIncrease,
Client: client,
Miner: o.Miner,
MinerPeer: &o.MinerPeer,
}
}
type RetrievalOrder struct {
// TODO: make this less unixfs specific
Root cid.Cid
Piece *cid.Cid
DatamodelPathSelector *textselector.Expression
Size uint64
FromLocalCAR string // if specified, get data from a local CARv2 file.
// TODO: support offset
Total types.BigInt
UnsealPrice types.BigInt
PaymentInterval uint64
PaymentIntervalIncrease uint64
Client address.Address
Miner address.Address
MinerPeer *retrievalmarket.RetrievalPeer
}

View File

@ -125,11 +125,11 @@ type FullNodeStruct struct {
ClientRestartDataTransfer func(p0 context.Context, p1 datatransfer.TransferID, p2 peer.ID, p3 bool) error `perm:"write"` ClientRestartDataTransfer func(p0 context.Context, p1 datatransfer.TransferID, p2 peer.ID, p3 bool) error `perm:"write"`
ClientRetrieve func(p0 context.Context, p1 api.RetrievalOrder, p2 *api.FileRef) error `perm:"admin"` ClientRetrieve func(p0 context.Context, p1 RetrievalOrder, p2 *api.FileRef) error `perm:"admin"`
ClientRetrieveTryRestartInsufficientFunds func(p0 context.Context, p1 address.Address) error `perm:"write"` ClientRetrieveTryRestartInsufficientFunds func(p0 context.Context, p1 address.Address) error `perm:"write"`
ClientRetrieveWithEvents func(p0 context.Context, p1 api.RetrievalOrder, p2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"` ClientRetrieveWithEvents func(p0 context.Context, p1 RetrievalOrder, p2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"`
ClientStartDeal func(p0 context.Context, p1 *api.StartDealParams) (*cid.Cid, error) `perm:"admin"` ClientStartDeal func(p0 context.Context, p1 *api.StartDealParams) (*cid.Cid, error) `perm:"admin"`
@ -965,14 +965,14 @@ func (s *FullNodeStub) ClientRestartDataTransfer(p0 context.Context, p1 datatran
return ErrNotSupported return ErrNotSupported
} }
func (s *FullNodeStruct) ClientRetrieve(p0 context.Context, p1 api.RetrievalOrder, p2 *api.FileRef) error { func (s *FullNodeStruct) ClientRetrieve(p0 context.Context, p1 RetrievalOrder, p2 *api.FileRef) error {
if s.Internal.ClientRetrieve == nil { if s.Internal.ClientRetrieve == nil {
return ErrNotSupported return ErrNotSupported
} }
return s.Internal.ClientRetrieve(p0, p1, p2) return s.Internal.ClientRetrieve(p0, p1, p2)
} }
func (s *FullNodeStub) ClientRetrieve(p0 context.Context, p1 api.RetrievalOrder, p2 *api.FileRef) error { func (s *FullNodeStub) ClientRetrieve(p0 context.Context, p1 RetrievalOrder, p2 *api.FileRef) error {
return ErrNotSupported return ErrNotSupported
} }
@ -987,14 +987,14 @@ func (s *FullNodeStub) ClientRetrieveTryRestartInsufficientFunds(p0 context.Cont
return ErrNotSupported return ErrNotSupported
} }
func (s *FullNodeStruct) ClientRetrieveWithEvents(p0 context.Context, p1 api.RetrievalOrder, p2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) { func (s *FullNodeStruct) ClientRetrieveWithEvents(p0 context.Context, p1 RetrievalOrder, p2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) {
if s.Internal.ClientRetrieveWithEvents == nil { if s.Internal.ClientRetrieveWithEvents == nil {
return nil, ErrNotSupported return nil, ErrNotSupported
} }
return s.Internal.ClientRetrieveWithEvents(p0, p1, p2) return s.Internal.ClientRetrieveWithEvents(p0, p1, p2)
} }
func (s *FullNodeStub) ClientRetrieveWithEvents(p0 context.Context, p1 api.RetrievalOrder, p2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) { func (s *FullNodeStub) ClientRetrieveWithEvents(p0 context.Context, p1 RetrievalOrder, p2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) {
return nil, ErrNotSupported return nil, ErrNotSupported
} }

View File

@ -21,6 +21,7 @@ import (
network "github.com/filecoin-project/go-state-types/network" network "github.com/filecoin-project/go-state-types/network"
api "github.com/filecoin-project/lotus/api" api "github.com/filecoin-project/lotus/api"
apitypes "github.com/filecoin-project/lotus/api/types" apitypes "github.com/filecoin-project/lotus/api/types"
v0api "github.com/filecoin-project/lotus/api/v0api"
miner "github.com/filecoin-project/lotus/chain/actors/builtin/miner" miner "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
types "github.com/filecoin-project/lotus/chain/types" types "github.com/filecoin-project/lotus/chain/types"
alerting "github.com/filecoin-project/lotus/journal/alerting" alerting "github.com/filecoin-project/lotus/journal/alerting"
@ -760,7 +761,7 @@ func (mr *MockFullNodeMockRecorder) ClientRestartDataTransfer(arg0, arg1, arg2,
} }
// ClientRetrieve mocks base method. // ClientRetrieve mocks base method.
func (m *MockFullNode) ClientRetrieve(arg0 context.Context, arg1 api.RetrievalOrder, arg2 *api.FileRef) error { func (m *MockFullNode) ClientRetrieve(arg0 context.Context, arg1 v0api.RetrievalOrder, arg2 *api.FileRef) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ClientRetrieve", arg0, arg1, arg2) ret := m.ctrl.Call(m, "ClientRetrieve", arg0, arg1, arg2)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)
@ -788,7 +789,7 @@ func (mr *MockFullNodeMockRecorder) ClientRetrieveTryRestartInsufficientFunds(ar
} }
// ClientRetrieveWithEvents mocks base method. // ClientRetrieveWithEvents mocks base method.
func (m *MockFullNode) ClientRetrieveWithEvents(arg0 context.Context, arg1 api.RetrievalOrder, arg2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) { func (m *MockFullNode) ClientRetrieveWithEvents(arg0 context.Context, arg1 v0api.RetrievalOrder, arg2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ClientRetrieveWithEvents", arg0, arg1, arg2) ret := m.ctrl.Call(m, "ClientRetrieveWithEvents", arg0, arg1, arg2)
ret0, _ := ret[0].(<-chan marketevents.RetrievalEvent) ret0, _ := ret[0].(<-chan marketevents.RetrievalEvent)

View File

@ -3,7 +3,10 @@ package v0api
import ( import (
"context" "context"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/crypto"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
@ -194,4 +197,143 @@ func (w *WrapperV1Full) ChainGetRandomnessFromBeacon(ctx context.Context, tsk ty
return w.StateGetRandomnessFromBeacon(ctx, personalization, randEpoch, entropy, tsk) return w.StateGetRandomnessFromBeacon(ctx, personalization, randEpoch, entropy, tsk)
} }
func (w *WrapperV1Full) ClientRetrieve(ctx context.Context, order RetrievalOrder, ref *api.FileRef) error {
events := make(chan marketevents.RetrievalEvent)
go w.clientRetrieve(ctx, order, ref, events)
for {
select {
case evt, ok := <-events:
if !ok { // done successfully
return nil
}
if evt.Err != "" {
return xerrors.Errorf("retrieval failed: %s", evt.Err)
}
case <-ctx.Done():
return xerrors.Errorf("retrieval timed out")
}
}
}
func (w *WrapperV1Full) ClientRetrieveWithEvents(ctx context.Context, order RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) {
events := make(chan marketevents.RetrievalEvent)
go w.clientRetrieve(ctx, order, ref, events)
return events, nil
}
func readSubscribeEvents(ctx context.Context, dealID retrievalmarket.DealID, subscribeEvents <-chan api.RetrievalInfo, events chan marketevents.RetrievalEvent) error {
for {
var subscribeEvent api.RetrievalInfo
var evt retrievalmarket.ClientEvent
select {
case <-ctx.Done():
return xerrors.New("Retrieval Timed Out")
case subscribeEvent = <-subscribeEvents:
if subscribeEvent.ID != dealID {
// we can't check the deal ID ahead of time because:
// 1. We need to subscribe before retrieving.
// 2. We won't know the deal ID until after retrieving.
continue
}
if subscribeEvent.Event != nil {
evt = *subscribeEvent.Event
}
}
select {
case <-ctx.Done():
return xerrors.New("Retrieval Timed Out")
case events <- marketevents.RetrievalEvent{
Event: evt,
Status: subscribeEvent.Status,
BytesReceived: subscribeEvent.BytesReceived,
FundsSpent: subscribeEvent.TotalPaid,
}:
}
switch subscribeEvent.Status {
case retrievalmarket.DealStatusCompleted:
return nil
case retrievalmarket.DealStatusRejected:
return xerrors.Errorf("Retrieval Proposal Rejected: %s", subscribeEvent.Message)
case
retrievalmarket.DealStatusDealNotFound,
retrievalmarket.DealStatusErrored:
return xerrors.Errorf("Retrieval Error: %s", subscribeEvent.Message)
}
}
}
func (w *WrapperV1Full) clientRetrieve(ctx context.Context, order RetrievalOrder, ref *api.FileRef, events chan marketevents.RetrievalEvent) {
defer close(events)
finish := func(e error) {
if e != nil {
events <- marketevents.RetrievalEvent{Err: e.Error(), FundsSpent: big.Zero()}
}
}
var dealID retrievalmarket.DealID
if order.FromLocalCAR == "" {
// Subscribe to events before retrieving to avoid losing events.
subscribeCtx, cancel := context.WithCancel(ctx)
defer cancel()
retrievalEvents, err := w.ClientGetRetrievalUpdates(subscribeCtx)
if err != nil {
finish(xerrors.Errorf("GetRetrievalUpdates failed: %w", err))
return
}
retrievalRes, err := w.FullNode.ClientRetrieve(ctx, api.RetrievalOrder{
Root: order.Root,
Piece: order.Piece,
Size: order.Size,
Total: order.Total,
UnsealPrice: order.UnsealPrice,
PaymentInterval: order.PaymentInterval,
PaymentIntervalIncrease: order.PaymentIntervalIncrease,
Client: order.Client,
Miner: order.Miner,
MinerPeer: order.MinerPeer,
})
if err != nil {
finish(xerrors.Errorf("Retrieve failed: %w", err))
return
}
dealID = retrievalRes.DealID
err = readSubscribeEvents(ctx, retrievalRes.DealID, retrievalEvents, events)
if err != nil {
finish(xerrors.Errorf("Retrieve: %w", err))
return
}
}
// If ref is nil, it only fetches the data into the configured blockstore.
if ref == nil {
finish(nil)
return
}
eref := api.ExportRef{
Root: order.Root,
FromLocalCAR: order.FromLocalCAR,
DealID: dealID,
}
if order.DatamodelPathSelector != nil {
s := api.Selector(*order.DatamodelPathSelector)
eref.DAGs = append(eref.DAGs, api.DagSpec{
DataSelector: &s,
})
}
finish(w.ClientExport(ctx, eref, *ref))
}
var _ FullNode = &WrapperV1Full{} var _ FullNode = &WrapperV1Full{}

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -26,7 +26,6 @@ import (
datatransfer "github.com/filecoin-project/go-data-transfer" datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-cidutil/cidenc" "github.com/ipfs/go-cidutil/cidenc"
textselector "github.com/ipld/go-ipld-selector-text-lite"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multibase" "github.com/multiformats/go-multibase"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
@ -94,6 +93,8 @@ var clientCmd = &cli.Command{
WithCategory("data", clientStat), WithCategory("data", clientStat),
WithCategory("retrieval", clientFindCmd), WithCategory("retrieval", clientFindCmd),
WithCategory("retrieval", clientRetrieveCmd), WithCategory("retrieval", clientRetrieveCmd),
WithCategory("retrieval", clientRetrieveCatCmd),
WithCategory("retrieval", clientRetrieveLsCmd),
WithCategory("retrieval", clientCancelRetrievalDealCmd), WithCategory("retrieval", clientCancelRetrievalDealCmd),
WithCategory("retrieval", clientListRetrievalsCmd), WithCategory("retrieval", clientListRetrievalsCmd),
WithCategory("util", clientCommPCmd), WithCategory("util", clientCommPCmd),
@ -1029,209 +1030,6 @@ var clientFindCmd = &cli.Command{
}, },
} }
const DefaultMaxRetrievePrice = "0.01"
var clientRetrieveCmd = &cli.Command{
Name: "retrieve",
Usage: "Retrieve data from network",
ArgsUsage: "[dataCid outputPath]",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "from",
Usage: "address to send transactions from",
},
&cli.BoolFlag{
Name: "car",
Usage: "export to a car file instead of a regular file",
},
&cli.StringFlag{
Name: "miner",
Usage: "miner address for retrieval, if not present it'll use local discovery",
},
&cli.StringFlag{
Name: "datamodel-path-selector",
Usage: "a rudimentary (DM-level-only) text-path selector, allowing for sub-selection within a deal",
},
&cli.StringFlag{
Name: "maxPrice",
Usage: fmt.Sprintf("maximum price the client is willing to consider (default: %s FIL)", DefaultMaxRetrievePrice),
},
&cli.StringFlag{
Name: "pieceCid",
Usage: "require data to be retrieved from a specific Piece CID",
},
&cli.BoolFlag{
Name: "allow-local",
},
},
Action: func(cctx *cli.Context) error {
if cctx.NArg() != 2 {
return ShowHelp(cctx, fmt.Errorf("incorrect number of arguments"))
}
fapi, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
afmt := NewAppFmt(cctx.App)
var payer address.Address
if cctx.String("from") != "" {
payer, err = address.NewFromString(cctx.String("from"))
} else {
payer, err = fapi.WalletDefaultAddress(ctx)
}
if err != nil {
return err
}
file, err := cid.Parse(cctx.Args().Get(0))
if err != nil {
return err
}
var pieceCid *cid.Cid
if cctx.String("pieceCid") != "" {
parsed, err := cid.Parse(cctx.String("pieceCid"))
if err != nil {
return err
}
pieceCid = &parsed
}
var order *lapi.RetrievalOrder
if cctx.Bool("allow-local") {
imports, err := fapi.ClientListImports(ctx)
if err != nil {
return err
}
for _, i := range imports {
if i.Root != nil && i.Root.Equals(file) {
order = &lapi.RetrievalOrder{
Root: file,
FromLocalCAR: i.CARPath,
Total: big.Zero(),
UnsealPrice: big.Zero(),
}
break
}
}
}
if order == nil {
var offer api.QueryOffer
minerStrAddr := cctx.String("miner")
if minerStrAddr == "" { // Local discovery
offers, err := fapi.ClientFindData(ctx, file, pieceCid)
var cleaned []api.QueryOffer
// filter out offers that errored
for _, o := range offers {
if o.Err == "" {
cleaned = append(cleaned, o)
}
}
offers = cleaned
// sort by price low to high
sort.Slice(offers, func(i, j int) bool {
return offers[i].MinPrice.LessThan(offers[j].MinPrice)
})
if err != nil {
return err
}
// TODO: parse offer strings from `client find`, make this smarter
if len(offers) < 1 {
fmt.Println("Failed to find file")
return nil
}
offer = offers[0]
} else { // Directed retrieval
minerAddr, err := address.NewFromString(minerStrAddr)
if err != nil {
return err
}
offer, err = fapi.ClientMinerQueryOffer(ctx, minerAddr, file, pieceCid)
if err != nil {
return err
}
}
if offer.Err != "" {
return fmt.Errorf("The received offer errored: %s", offer.Err)
}
maxPrice := types.MustParseFIL(DefaultMaxRetrievePrice)
if cctx.String("maxPrice") != "" {
maxPrice, err = types.ParseFIL(cctx.String("maxPrice"))
if err != nil {
return xerrors.Errorf("parsing maxPrice: %w", err)
}
}
if offer.MinPrice.GreaterThan(big.Int(maxPrice)) {
return xerrors.Errorf("failed to find offer satisfying maxPrice: %s", maxPrice)
}
o := offer.Order(payer)
order = &o
}
ref := &lapi.FileRef{
Path: cctx.Args().Get(1),
IsCAR: cctx.Bool("car"),
}
if sel := textselector.Expression(cctx.String("datamodel-path-selector")); sel != "" {
order.DatamodelPathSelector = &sel
}
updates, err := fapi.ClientRetrieveWithEvents(ctx, *order, ref)
if err != nil {
return xerrors.Errorf("error setting up retrieval: %w", err)
}
var prevStatus retrievalmarket.DealStatus
for {
select {
case evt, ok := <-updates:
if ok {
afmt.Printf("> Recv: %s, Paid %s, %s (%s)\n",
types.SizeStr(types.NewInt(evt.BytesReceived)),
types.FIL(evt.FundsSpent),
retrievalmarket.ClientEvents[evt.Event],
retrievalmarket.DealStatuses[evt.Status],
)
prevStatus = evt.Status
}
if evt.Err != "" {
return xerrors.Errorf("retrieval failed: %s", evt.Err)
}
if !ok {
if prevStatus == retrievalmarket.DealStatusCompleted {
afmt.Println("Success")
} else {
afmt.Printf("saw final deal state %s instead of expected success state DealStatusCompleted\n",
retrievalmarket.DealStatuses[prevStatus])
}
return nil
}
case <-ctx.Done():
return xerrors.Errorf("retrieval timed out")
}
}
},
}
var clientListRetrievalsCmd = &cli.Command{ var clientListRetrievalsCmd = &cli.Command{
Name: "list-retrievals", Name: "list-retrievals",
Usage: "List retrieval market deals", Usage: "List retrieval market deals",

592
cli/client_retr.go Normal file
View File

@ -0,0 +1,592 @@
package cli
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"path"
"sort"
"strings"
"time"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
offline "github.com/ipfs/go-ipfs-exchange-offline"
"github.com/ipfs/go-merkledag"
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/blockstore"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagjson"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
textselector "github.com/ipld/go-ipld-selector-text-lite"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-state-types/big"
lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/markets/utils"
"github.com/filecoin-project/lotus/node/repo"
)
const DefaultMaxRetrievePrice = "0"
func retrieve(ctx context.Context, cctx *cli.Context, fapi lapi.FullNode, sel *lapi.Selector, printf func(string, ...interface{})) (*lapi.ExportRef, error) {
var payer address.Address
var err error
if cctx.String("from") != "" {
payer, err = address.NewFromString(cctx.String("from"))
} else {
payer, err = fapi.WalletDefaultAddress(ctx)
}
if err != nil {
return nil, err
}
file, err := cid.Parse(cctx.Args().Get(0))
if err != nil {
return nil, err
}
var pieceCid *cid.Cid
if cctx.String("pieceCid") != "" {
parsed, err := cid.Parse(cctx.String("pieceCid"))
if err != nil {
return nil, err
}
pieceCid = &parsed
}
var eref *lapi.ExportRef
if cctx.Bool("allow-local") {
imports, err := fapi.ClientListImports(ctx)
if err != nil {
return nil, err
}
for _, i := range imports {
if i.Root != nil && i.Root.Equals(file) {
eref = &lapi.ExportRef{
Root: file,
FromLocalCAR: i.CARPath,
}
break
}
}
}
// no local found, so make a retrieval
if eref == nil {
var offer lapi.QueryOffer
minerStrAddr := cctx.String("provider")
if minerStrAddr == "" { // Local discovery
offers, err := fapi.ClientFindData(ctx, file, pieceCid)
var cleaned []lapi.QueryOffer
// filter out offers that errored
for _, o := range offers {
if o.Err == "" {
cleaned = append(cleaned, o)
}
}
offers = cleaned
// sort by price low to high
sort.Slice(offers, func(i, j int) bool {
return offers[i].MinPrice.LessThan(offers[j].MinPrice)
})
if err != nil {
return nil, err
}
// TODO: parse offer strings from `client find`, make this smarter
if len(offers) < 1 {
fmt.Println("Failed to find file")
return nil, nil
}
offer = offers[0]
} else { // Directed retrieval
minerAddr, err := address.NewFromString(minerStrAddr)
if err != nil {
return nil, err
}
offer, err = fapi.ClientMinerQueryOffer(ctx, minerAddr, file, pieceCid)
if err != nil {
return nil, err
}
}
if offer.Err != "" {
return nil, fmt.Errorf("offer error: %s", offer.Err)
}
maxPrice := types.MustParseFIL(DefaultMaxRetrievePrice)
if cctx.String("maxPrice") != "" {
maxPrice, err = types.ParseFIL(cctx.String("maxPrice"))
if err != nil {
return nil, xerrors.Errorf("parsing maxPrice: %w", err)
}
}
if offer.MinPrice.GreaterThan(big.Int(maxPrice)) {
return nil, xerrors.Errorf("failed to find offer satisfying maxPrice: %s", maxPrice)
}
o := offer.Order(payer)
o.DataSelector = sel
subscribeEvents, err := fapi.ClientGetRetrievalUpdates(ctx)
if err != nil {
return nil, xerrors.Errorf("error setting up retrieval updates: %w", err)
}
retrievalRes, err := fapi.ClientRetrieve(ctx, o)
if err != nil {
return nil, xerrors.Errorf("error setting up retrieval: %w", err)
}
start := time.Now()
readEvents:
for {
var evt lapi.RetrievalInfo
select {
case <-ctx.Done():
return nil, xerrors.New("Retrieval Timed Out")
case evt = <-subscribeEvents:
if evt.ID != retrievalRes.DealID {
// we can't check the deal ID ahead of time because:
// 1. We need to subscribe before retrieving.
// 2. We won't know the deal ID until after retrieving.
continue
}
}
event := "New"
if evt.Event != nil {
event = retrievalmarket.ClientEvents[*evt.Event]
}
printf("Recv %s, Paid %s, %s (%s), %s\n",
types.SizeStr(types.NewInt(evt.BytesReceived)),
types.FIL(evt.TotalPaid),
strings.TrimPrefix(event, "ClientEvent"),
strings.TrimPrefix(retrievalmarket.DealStatuses[evt.Status], "DealStatus"),
time.Now().Sub(start).Truncate(time.Millisecond),
)
switch evt.Status {
case retrievalmarket.DealStatusCompleted:
break readEvents
case retrievalmarket.DealStatusRejected:
return nil, xerrors.Errorf("Retrieval Proposal Rejected: %s", evt.Message)
case
retrievalmarket.DealStatusDealNotFound,
retrievalmarket.DealStatusErrored:
return nil, xerrors.Errorf("Retrieval Error: %s", evt.Message)
}
}
eref = &lapi.ExportRef{
Root: file,
DealID: retrievalRes.DealID,
}
}
return eref, nil
}
var retrFlagsCommon = []cli.Flag{
&cli.StringFlag{
Name: "from",
Usage: "address to send transactions from",
},
&cli.StringFlag{
Name: "provider",
Usage: "provider to use for retrieval, if not present it'll use local discovery",
Aliases: []string{"miner"},
},
&cli.StringFlag{
Name: "maxPrice",
Usage: fmt.Sprintf("maximum price the client is willing to consider (default: %s FIL)", DefaultMaxRetrievePrice),
},
&cli.StringFlag{
Name: "pieceCid",
Usage: "require data to be retrieved from a specific Piece CID",
},
&cli.BoolFlag{
Name: "allow-local",
// todo: default to true?
},
}
var clientRetrieveCmd = &cli.Command{
Name: "retrieve",
Usage: "Retrieve data from network",
ArgsUsage: "[dataCid outputPath]",
Description: `Retrieve data from the Filecoin network.
The retrieve command will attempt to find a provider make a retrieval deal with
them. In case a provider can't be found, it can be specified with the --provider
flag.
By default the data will be interpreted as DAG-PB UnixFSv1 File. Alternatively
a CAR file containing the raw IPLD graph can be exported by setting the --car
flag.
Partial Retrieval:
The --data-selector flag can be used to specify a sub-graph to fetch. The
selector can be specified as either IPLD datamodel text-path selector, or IPLD
json selector.
In case of unixfs retrieval, the selector must point at a single root node, and
match the entire graph under that node.
In case of CAR retrieval, the selector must have one common "sub-root" node.
Examples:
- Retrieve a file by CID
$ lotus client retrieve Qm... my-file.txt
- Retrieve a file by CID from f0123
$ lotus client retrieve --provider f0123 Qm... my-file.txt
- Retrieve a first file from a specified directory
$ lotus client retrieve --data-selector /Links/0/Hash Qm... my-file.txt
`,
Flags: append([]cli.Flag{
&cli.BoolFlag{
Name: "car",
Usage: "export to a car file instead of a regular file",
},
&cli.StringFlag{
Name: "data-selector",
Aliases: []string{"data-selector-selector"},
Usage: "IPLD datamodel text-path selector, or IPLD json selector",
},
}, retrFlagsCommon...),
Action: func(cctx *cli.Context) error {
if cctx.NArg() != 2 {
return ShowHelp(cctx, fmt.Errorf("incorrect number of arguments"))
}
fapi, closer, err := GetFullNodeAPIV1(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
afmt := NewAppFmt(cctx.App)
var s *lapi.Selector
if sel := lapi.Selector(cctx.String("data-selector")); sel != "" {
s = &sel
}
eref, err := retrieve(ctx, cctx, fapi, s, afmt.Printf)
if err != nil {
return err
}
if s != nil {
eref.DAGs = append(eref.DAGs, lapi.DagSpec{DataSelector: s})
}
err = fapi.ClientExport(ctx, *eref, lapi.FileRef{
Path: cctx.Args().Get(1),
IsCAR: cctx.Bool("car"),
})
if err != nil {
return err
}
afmt.Println("Success")
return nil
},
}
func ClientExportStream(apiAddr string, apiAuth http.Header, eref lapi.ExportRef, car bool) (io.ReadCloser, error) {
rj, err := json.Marshal(eref)
if err != nil {
return nil, xerrors.Errorf("marshaling export ref: %w", err)
}
ma, err := multiaddr.NewMultiaddr(apiAddr)
if err == nil {
_, addr, err := manet.DialArgs(ma)
if err != nil {
return nil, err
}
// todo: make cliutil helpers for this
apiAddr = "http://" + addr
}
aa, err := url.Parse(apiAddr)
if err != nil {
return nil, xerrors.Errorf("parsing api address: %w", err)
}
switch aa.Scheme {
case "ws":
aa.Scheme = "http"
case "wss":
aa.Scheme = "https"
}
aa.Path = path.Join(aa.Path, "rest/v0/export")
req, err := http.NewRequest("GET", fmt.Sprintf("%s?car=%t&export=%s", aa, car, url.QueryEscape(string(rj))), nil)
if err != nil {
return nil, err
}
req.Header = apiAuth
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
em, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, xerrors.Errorf("reading error body: %w", err)
}
resp.Body.Close() // nolint
return nil, xerrors.Errorf("getting root car: http %d: %s", resp.StatusCode, string(em))
}
return resp.Body, nil
}
var clientRetrieveCatCmd = &cli.Command{
Name: "cat",
Usage: "Show data from network",
ArgsUsage: "[dataCid]",
Flags: append([]cli.Flag{
&cli.BoolFlag{
Name: "ipld",
Usage: "list IPLD datamodel links",
},
&cli.StringFlag{
Name: "data-selector",
Usage: "IPLD datamodel text-path selector, or IPLD json selector",
},
}, retrFlagsCommon...),
Action: func(cctx *cli.Context) error {
if cctx.NArg() != 1 {
return ShowHelp(cctx, fmt.Errorf("incorrect number of arguments"))
}
ainfo, err := GetAPIInfo(cctx, repo.FullNode)
if err != nil {
return xerrors.Errorf("could not get API info: %w", err)
}
fapi, closer, err := GetFullNodeAPIV1(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
afmt := NewAppFmt(cctx.App)
sel := lapi.Selector(cctx.String("data-selector"))
selp := &sel
if sel == "" {
selp = nil
}
eref, err := retrieve(ctx, cctx, fapi, selp, afmt.Printf)
if err != nil {
return err
}
fmt.Println() // separate retrieval events from results
if sel != "" {
eref.DAGs = append(eref.DAGs, lapi.DagSpec{DataSelector: &sel})
}
rc, err := ClientExportStream(ainfo.Addr, ainfo.AuthHeader(), *eref, false)
if err != nil {
return err
}
defer rc.Close() // nolint
_, err = io.Copy(os.Stdout, rc)
return err
},
}
func pathToSel(psel string, sub builder.SelectorSpec) (lapi.Selector, error) {
rs, err := textselector.SelectorSpecFromPath(textselector.Expression(psel), sub)
if err != nil {
return "", xerrors.Errorf("failed to parse path-selector: %w", err)
}
var b bytes.Buffer
if err := dagjson.Encode(rs.Node(), &b); err != nil {
return "", err
}
return lapi.Selector(b.String()), nil
}
var clientRetrieveLsCmd = &cli.Command{
Name: "ls",
Usage: "List object links",
ArgsUsage: "[dataCid]",
Flags: append([]cli.Flag{
&cli.BoolFlag{
Name: "ipld",
Usage: "list IPLD datamodel links",
},
&cli.IntFlag{
Name: "depth",
Usage: "list links recursively up to the specified depth",
Value: 1,
},
&cli.StringFlag{
Name: "data-selector",
Usage: "IPLD datamodel text-path selector, or IPLD json selector",
},
}, retrFlagsCommon...),
Action: func(cctx *cli.Context) error {
if cctx.NArg() != 1 {
return ShowHelp(cctx, fmt.Errorf("incorrect number of arguments"))
}
ainfo, err := GetAPIInfo(cctx, repo.FullNode)
if err != nil {
return xerrors.Errorf("could not get API info: %w", err)
}
fapi, closer, err := GetFullNodeAPIV1(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
afmt := NewAppFmt(cctx.App)
dataSelector := lapi.Selector(fmt.Sprintf(`{"R":{"l":{"depth":%d},":>":{"a":{">":{"|":[{"@":{}},{".":{}}]}}}}}`, cctx.Int("depth")))
if cctx.IsSet("data-selector") {
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
dataSelector, err = pathToSel(cctx.String("data-selector"),
ssb.ExploreUnion(
ssb.Matcher(),
ssb.ExploreAll(
ssb.ExploreRecursive(selector.RecursionLimitDepth(int64(cctx.Int("depth"))), ssb.ExploreAll(ssb.ExploreUnion(ssb.Matcher(), ssb.ExploreRecursiveEdge()))),
)))
if err != nil {
return xerrors.Errorf("parsing datamodel path: %w", err)
}
}
eref, err := retrieve(ctx, cctx, fapi, &dataSelector, afmt.Printf)
if err != nil {
return xerrors.Errorf("retrieve: %w", err)
}
fmt.Println() // separate retrieval events from results
eref.DAGs = append(eref.DAGs, lapi.DagSpec{
DataSelector: &dataSelector,
})
rc, err := ClientExportStream(ainfo.Addr, ainfo.AuthHeader(), *eref, true)
if err != nil {
return xerrors.Errorf("export: %w", err)
}
defer rc.Close() // nolint
var memcar bytes.Buffer
_, err = io.Copy(&memcar, rc)
if err != nil {
return err
}
cbs, err := blockstore.NewReadOnly(&bytesReaderAt{bytes.NewReader(memcar.Bytes())}, nil,
carv2.ZeroLengthSectionAsEOF(true),
blockstore.UseWholeCIDs(true))
if err != nil {
return xerrors.Errorf("opening car blockstore: %w", err)
}
roots, err := cbs.Roots()
if err != nil {
return xerrors.Errorf("getting roots: %w", err)
}
if len(roots) != 1 {
return xerrors.Errorf("expected 1 car root, got %d", len(roots))
}
dserv := merkledag.NewDAGService(blockservice.New(cbs, offline.Exchange(cbs)))
if !cctx.Bool("ipld") {
links, err := dserv.GetLinks(ctx, roots[0])
if err != nil {
return xerrors.Errorf("getting links: %w", err)
}
for _, link := range links {
fmt.Printf("%s %s\t%d\n", link.Cid, link.Name, link.Size)
}
} else {
jsel := lapi.Selector(fmt.Sprintf(`{"R":{"l":{"depth":%d},":>":{"a":{">":{"|":[{"@":{}},{".":{}}]}}}}}`, cctx.Int("depth")))
if cctx.IsSet("data-selector") {
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
jsel, err = pathToSel(cctx.String("data-selector"),
ssb.ExploreRecursive(selector.RecursionLimitDepth(int64(cctx.Int("depth"))), ssb.ExploreAll(ssb.ExploreUnion(ssb.Matcher(), ssb.ExploreRecursiveEdge()))),
)
}
sel, _ := selectorparse.ParseJSONSelector(string(jsel))
if err := utils.TraverseDag(
ctx,
dserv,
roots[0],
sel,
func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error {
if r == traversal.VisitReason_SelectionMatch {
fmt.Println(p.Path)
}
return nil
},
); err != nil {
return err
}
}
return err
},
}
type bytesReaderAt struct {
btr *bytes.Reader
}
func (b bytesReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
return b.btr.ReadAt(p, off)
}
var _ io.ReaderAt = &bytesReaderAt{}

View File

@ -1269,7 +1269,8 @@ Response:
"Stages": { "Stages": {
"Stages": null "Stages": null
} }
} },
"Event": 5
} }
``` ```

View File

@ -41,6 +41,7 @@
* [ClientDataTransferUpdates](#ClientDataTransferUpdates) * [ClientDataTransferUpdates](#ClientDataTransferUpdates)
* [ClientDealPieceCID](#ClientDealPieceCID) * [ClientDealPieceCID](#ClientDealPieceCID)
* [ClientDealSize](#ClientDealSize) * [ClientDealSize](#ClientDealSize)
* [ClientExport](#ClientExport)
* [ClientFindData](#ClientFindData) * [ClientFindData](#ClientFindData)
* [ClientGenCar](#ClientGenCar) * [ClientGenCar](#ClientGenCar)
* [ClientGetDealInfo](#ClientGetDealInfo) * [ClientGetDealInfo](#ClientGetDealInfo)
@ -59,7 +60,7 @@
* [ClientRestartDataTransfer](#ClientRestartDataTransfer) * [ClientRestartDataTransfer](#ClientRestartDataTransfer)
* [ClientRetrieve](#ClientRetrieve) * [ClientRetrieve](#ClientRetrieve)
* [ClientRetrieveTryRestartInsufficientFunds](#ClientRetrieveTryRestartInsufficientFunds) * [ClientRetrieveTryRestartInsufficientFunds](#ClientRetrieveTryRestartInsufficientFunds)
* [ClientRetrieveWithEvents](#ClientRetrieveWithEvents) * [ClientRetrieveWait](#ClientRetrieveWait)
* [ClientStartDeal](#ClientStartDeal) * [ClientStartDeal](#ClientStartDeal)
* [ClientStatelessDeal](#ClientStatelessDeal) * [ClientStatelessDeal](#ClientStatelessDeal)
* [Create](#Create) * [Create](#Create)
@ -1055,6 +1056,32 @@ Response:
} }
``` ```
### ClientExport
ClientExport exports a file stored in the local filestore to a system file
Perms: admin
Inputs:
```json
[
{
"Root": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
"DAGs": null,
"FromLocalCAR": "string value",
"DealID": 5
},
{
"Path": "string value",
"IsCAR": true
}
]
```
Response: `{}`
### ClientFindData ### ClientFindData
ClientFindData identifies peers that have a certain file, and returns QueryOffers (one per peer). ClientFindData identifies peers that have a certain file, and returns QueryOffers (one per peer).
@ -1282,7 +1309,8 @@ Response:
"Stages": { "Stages": {
"Stages": null "Stages": null
} }
} },
"Event": 5
} }
``` ```
@ -1482,9 +1510,8 @@ Inputs:
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
}, },
"Piece": null, "Piece": null,
"DatamodelPathSelector": "Links/21/Hash/Links/42/Hash", "DataSelector": "Links/21/Hash/Links/42/Hash",
"Size": 42, "Size": 42,
"FromLocalCAR": "string value",
"Total": "0", "Total": "0",
"UnsealPrice": "0", "UnsealPrice": "0",
"PaymentInterval": 42, "PaymentInterval": 42,
@ -1496,15 +1523,16 @@ Inputs:
"ID": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf", "ID": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf",
"PieceCID": null "PieceCID": null
} }
},
{
"Path": "string value",
"IsCAR": true
} }
] ]
``` ```
Response: `{}` Response:
```json
{
"DealID": 5
}
```
### ClientRetrieveTryRestartInsufficientFunds ### ClientRetrieveTryRestartInsufficientFunds
ClientRetrieveTryRestartInsufficientFunds attempts to restart stalled retrievals on a given payment channel ClientRetrieveTryRestartInsufficientFunds attempts to restart stalled retrievals on a given payment channel
@ -1522,9 +1550,8 @@ Inputs:
Response: `{}` Response: `{}`
### ClientRetrieveWithEvents ### ClientRetrieveWait
ClientRetrieveWithEvents initiates the retrieval of a file, as specified in the order, and provides a channel ClientRetrieveWait waits for retrieval to be complete
of status updates.
Perms: admin Perms: admin
@ -1532,43 +1559,11 @@ Perms: admin
Inputs: Inputs:
```json ```json
[ [
{ 5
"Root": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
"Piece": null,
"DatamodelPathSelector": "Links/21/Hash/Links/42/Hash",
"Size": 42,
"FromLocalCAR": "string value",
"Total": "0",
"UnsealPrice": "0",
"PaymentInterval": 42,
"PaymentIntervalIncrease": 42,
"Client": "f01234",
"Miner": "f01234",
"MinerPeer": {
"Address": "f01234",
"ID": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf",
"PieceCID": null
}
},
{
"Path": "string value",
"IsCAR": true
}
] ]
``` ```
Response: Response: `{}`
```json
{
"Event": 5,
"Status": 0,
"BytesReceived": 42,
"FundsSpent": "0",
"Err": "string value"
}
```
### ClientStartDeal ### ClientStartDeal
ClientStartDeal proposes a deal with a miner. ClientStartDeal proposes a deal with a miner.

View File

@ -426,6 +426,8 @@ COMMANDS:
RETRIEVAL: RETRIEVAL:
find Find data in the network find Find data in the network
retrieve Retrieve data from network retrieve Retrieve data from network
cat Show data from network
ls List object links
cancel-retrieval Cancel a retrieval deal by deal ID; this also cancels the associated transfer cancel-retrieval Cancel a retrieval deal by deal ID; this also cancels the associated transfer
list-retrievals List retrieval market deals list-retrievals List retrieval market deals
STORAGE: STORAGE:
@ -544,12 +546,93 @@ USAGE:
CATEGORY: CATEGORY:
RETRIEVAL RETRIEVAL
DESCRIPTION:
Retrieve data from the Filecoin network.
The retrieve command will attempt to find a provider make a retrieval deal with
them. In case a provider can't be found, it can be specified with the --provider
flag.
By default the data will be interpreted as DAG-PB UnixFSv1 File. Alternatively
a CAR file containing the raw IPLD graph can be exported by setting the --car
flag.
Partial Retrieval:
The --data-selector flag can be used to specify a sub-graph to fetch. The
selector can be specified as either IPLD datamodel text-path selector, or IPLD
json selector.
In case of unixfs retrieval, the selector must point at a single root node, and
match the entire graph under that node.
In case of CAR retrieval, the selector must have one common "sub-root" node.
Examples:
- Retrieve a file by CID
$ lotus client retrieve Qm... my-file.txt
- Retrieve a file by CID from f0123
$ lotus client retrieve --provider f0123 Qm... my-file.txt
- Retrieve a first file from a specified directory
$ lotus client retrieve --data-selector /Links/0/Hash Qm... my-file.txt
OPTIONS: OPTIONS:
--car export to a car file instead of a regular file (default: false)
--data-selector value, --data-selector-selector value IPLD datamodel text-path selector, or IPLD json selector
--from value address to send transactions from
--provider value, --miner value provider to use for retrieval, if not present it'll use local discovery
--maxPrice value maximum price the client is willing to consider (default: 0 FIL)
--pieceCid value require data to be retrieved from a specific Piece CID
--allow-local (default: false)
--help, -h show help (default: false)
```
### lotus client cat
```
NAME:
lotus client cat - Show data from network
USAGE:
lotus client cat [command options] [dataCid]
CATEGORY:
RETRIEVAL
OPTIONS:
--ipld list IPLD datamodel links (default: false)
--data-selector value IPLD datamodel text-path selector, or IPLD json selector
--from value address to send transactions from --from value address to send transactions from
--car export to a car file instead of a regular file (default: false) --provider value, --miner value provider to use for retrieval, if not present it'll use local discovery
--miner value miner address for retrieval, if not present it'll use local discovery --maxPrice value maximum price the client is willing to consider (default: 0 FIL)
--datamodel-path-selector value a rudimentary (DM-level-only) text-path selector, allowing for sub-selection within a deal --pieceCid value require data to be retrieved from a specific Piece CID
--maxPrice value maximum price the client is willing to consider (default: 0.01 FIL) --allow-local (default: false)
--help, -h show help (default: false)
```
### lotus client ls
```
NAME:
lotus client ls - List object links
USAGE:
lotus client ls [command options] [dataCid]
CATEGORY:
RETRIEVAL
OPTIONS:
--ipld list IPLD datamodel links (default: false)
--depth value list links recursively up to the specified depth (default: 1)
--data-selector value IPLD datamodel text-path selector, or IPLD json selector
--from value address to send transactions from
--provider value, --miner value provider to use for retrieval, if not present it'll use local discovery
--maxPrice value maximum price the client is willing to consider (default: 0 FIL)
--pieceCid value require data to be retrieved from a specific Piece CID --pieceCid value require data to be retrieved from a specific Piece CID
--allow-local (default: false) --allow-local (default: false)
--help, -h show help (default: false) --help, -h show help (default: false)

View File

@ -10,7 +10,6 @@ import (
"path/filepath" "path/filepath"
"strings" "strings"
"text/template" "text/template"
"unicode"
"golang.org/x/xerrors" "golang.org/x/xerrors"
) )
@ -71,9 +70,6 @@ func typeName(e ast.Expr, pkg string) (string, error) {
return t.X.(*ast.Ident).Name + "." + t.Sel.Name, nil return t.X.(*ast.Ident).Name + "." + t.Sel.Name, nil
case *ast.Ident: case *ast.Ident:
pstr := t.Name pstr := t.Name
if !unicode.IsLower(rune(pstr[0])) && pkg != "api" {
pstr = "api." + pstr // todo src pkg name
}
return pstr, nil return pstr, nil
case *ast.ArrayType: case *ast.ArrayType:
subt, err := typeName(t.Elt, pkg) subt, err := typeName(t.Elt, pkg)

View File

@ -9,6 +9,8 @@ import (
"testing" "testing"
"time" "time"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/big"
@ -18,7 +20,6 @@ import (
blocks "github.com/ipfs/go-block-format" blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipld/go-car" "github.com/ipld/go-car"
textselector "github.com/ipld/go-ipld-selector-text-lite"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -29,9 +30,10 @@ var (
carRoot, _ = cid.Parse("bafy2bzacecnamqgqmifpluoeldx7zzglxcljo6oja4vrmtj7432rphldpdmm2") carRoot, _ = cid.Parse("bafy2bzacecnamqgqmifpluoeldx7zzglxcljo6oja4vrmtj7432rphldpdmm2")
carCommp, _ = cid.Parse("baga6ea4seaqmrivgzei3fmx5qxtppwankmtou6zvigyjaveu3z2zzwhysgzuina") carCommp, _ = cid.Parse("baga6ea4seaqmrivgzei3fmx5qxtppwankmtou6zvigyjaveu3z2zzwhysgzuina")
carPieceSize = abi.PaddedPieceSize(2097152) carPieceSize = abi.PaddedPieceSize(2097152)
textSelector = textselector.Expression("8/1/8/1/0/1/0") textSelector = api.Selector("8/1/8/1/0/1/0")
textSelectorNonLink = textselector.Expression("8/1/8/1/0/1") storPowCid, _ = cid.Parse("bafkqaetgnfwc6mjpon2g64tbm5sxa33xmvza")
textSelectorNonexistent = textselector.Expression("42") textSelectorNonLink = api.Selector("8/1/8/1/0/1")
textSelectorNonexistent = api.Selector("42")
expectedResult = "fil/1/storagepower" expectedResult = "fil/1/storagepower"
) )
@ -56,14 +58,11 @@ func TestPartialRetrieval(t *testing.T) {
for _, fullCycle := range []bool{false, true} { for _, fullCycle := range []bool{false, true} {
var retOrder api.RetrievalOrder var retOrder api.RetrievalOrder
var eref api.ExportRef
if !fullCycle { if !fullCycle {
eref.FromLocalCAR = sourceCar
retOrder.FromLocalCAR = sourceCar
retOrder.Root = carRoot
} else { } else {
dp := dh.DefaultStartDealParams() dp := dh.DefaultStartDealParams()
dp.Data = &storagemarket.DataRef{ dp.Data = &storagemarket.DataRef{
// FIXME: figure out how to do this with an online partial transfer // FIXME: figure out how to do this with an online partial transfer
@ -95,7 +94,11 @@ func TestPartialRetrieval(t *testing.T) {
retOrder = offers[0].Order(caddr) retOrder = offers[0].Order(caddr)
} }
retOrder.DatamodelPathSelector = &textSelector retOrder.DataSelector = &textSelector
eref.DAGs = append(eref.DAGs, api.DagSpec{
DataSelector: &textSelector,
})
eref.Root = carRoot
// test retrieval of either data or constructing a partial selective-car // test retrieval of either data or constructing a partial selective-car
for _, retrieveAsCar := range []bool{false, true} { for _, retrieveAsCar := range []bool{false, true} {
@ -107,10 +110,12 @@ func TestPartialRetrieval(t *testing.T) {
ctx, ctx,
client, client,
retOrder, retOrder,
eref,
&api.FileRef{ &api.FileRef{
Path: outFile.Name(), Path: outFile.Name(),
IsCAR: retrieveAsCar, IsCAR: retrieveAsCar,
}, },
storPowCid,
outFile, outFile,
)) ))
@ -131,14 +136,19 @@ func TestPartialRetrieval(t *testing.T) {
ctx, ctx,
client, client,
api.RetrievalOrder{ api.RetrievalOrder{
FromLocalCAR: sourceCar, Root: carRoot,
Root: carRoot, DataSelector: &textSelectorNonexistent,
DatamodelPathSelector: &textSelectorNonexistent, },
api.ExportRef{
Root: carRoot,
FromLocalCAR: sourceCar,
DAGs: []api.DagSpec{{DataSelector: &textSelectorNonexistent}},
}, },
&api.FileRef{}, &api.FileRef{},
storPowCid,
nil, nil,
), ),
fmt.Sprintf("retrieval failed: path selection '%s' does not match a node within %s", textSelectorNonexistent, carRoot), fmt.Sprintf("parsing dag spec: path selection does not match a node within %s", carRoot),
) )
// ensure non-boundary retrievals fail // ensure non-boundary retrievals fail
@ -148,18 +158,23 @@ func TestPartialRetrieval(t *testing.T) {
ctx, ctx,
client, client,
api.RetrievalOrder{ api.RetrievalOrder{
FromLocalCAR: sourceCar, Root: carRoot,
Root: carRoot, DataSelector: &textSelectorNonLink,
DatamodelPathSelector: &textSelectorNonLink, },
api.ExportRef{
Root: carRoot,
FromLocalCAR: sourceCar,
DAGs: []api.DagSpec{{DataSelector: &textSelectorNonLink}},
}, },
&api.FileRef{}, &api.FileRef{},
storPowCid,
nil, nil,
), ),
fmt.Sprintf("retrieval failed: error while locating partial retrieval sub-root: unsupported selection path '%s' does not correspond to a block boundary (a.k.a. CID link)", textSelectorNonLink), fmt.Sprintf("parsing dag spec: error while locating partial retrieval sub-root: unsupported selection path '%s' does not correspond to a block boundary (a.k.a. CID link)", textSelectorNonLink),
) )
} }
func testGenesisRetrieval(ctx context.Context, client *kit.TestFullNode, retOrder api.RetrievalOrder, retRef *api.FileRef, outFile *os.File) error { func testGenesisRetrieval(ctx context.Context, client *kit.TestFullNode, retOrder api.RetrievalOrder, eref api.ExportRef, retRef *api.FileRef, expRootCid cid.Cid, outFile *os.File) error {
if retOrder.Total.Nil() { if retOrder.Total.Nil() {
retOrder.Total = big.Zero() retOrder.Total = big.Zero()
@ -168,7 +183,19 @@ func testGenesisRetrieval(ctx context.Context, client *kit.TestFullNode, retOrde
retOrder.UnsealPrice = big.Zero() retOrder.UnsealPrice = big.Zero()
} }
err := client.ClientRetrieve(ctx, retOrder, retRef) if eref.FromLocalCAR == "" {
rr, err := client.ClientRetrieve(ctx, retOrder)
if err != nil {
return err
}
eref.DealID = rr.DealID
if err := client.ClientRetrieveWait(ctx, rr.DealID); err != nil {
return xerrors.Errorf("retrieval wait: %w", err)
}
}
err := client.ClientExport(ctx, eref, *retRef)
if err != nil { if err != nil {
return err return err
} }
@ -190,7 +217,7 @@ func testGenesisRetrieval(ctx context.Context, client *kit.TestFullNode, retOrde
if len(cr.Header.Roots) != 1 { if len(cr.Header.Roots) != 1 {
return fmt.Errorf("expected a single root in result car, got %d", len(cr.Header.Roots)) return fmt.Errorf("expected a single root in result car, got %d", len(cr.Header.Roots))
} else if cr.Header.Roots[0].String() != carRoot.String() { } else if cr.Header.Roots[0].String() != expRootCid.String() {
return fmt.Errorf("expected root cid '%s', got '%s'", carRoot.String(), cr.Header.Roots[0].String()) return fmt.Errorf("expected root cid '%s', got '%s'", carRoot.String(), cr.Header.Roots[0].String())
} }
@ -206,11 +233,11 @@ func testGenesisRetrieval(ctx context.Context, client *kit.TestFullNode, retOrde
blks = append(blks, b) blks = append(blks, b)
} }
if len(blks) != 3 { if len(blks) != 1 {
return fmt.Errorf("expected a car file with 3 blocks, got one with %d instead", len(blks)) return fmt.Errorf("expected a car file with 1 blocks, got one with %d instead", len(blks))
} }
data = blks[2].RawData() data = blks[0].RawData()
} }
if string(data) != expectedResult { if string(data) != expectedResult {

View File

@ -10,6 +10,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
@ -320,17 +321,45 @@ func (dh *DealHarness) PerformRetrieval(ctx context.Context, deal *cid.Cid, root
caddr, err := dh.client.WalletDefaultAddress(ctx) caddr, err := dh.client.WalletDefaultAddress(ctx)
require.NoError(dh.t, err) require.NoError(dh.t, err)
ref := &api.FileRef{ updatesCtx, cancel := context.WithCancel(ctx)
Path: carFile.Name(), updates, err := dh.client.ClientGetRetrievalUpdates(updatesCtx)
IsCAR: carExport,
}
updates, err := dh.client.ClientRetrieveWithEvents(ctx, offers[0].Order(caddr), ref)
require.NoError(dh.t, err) require.NoError(dh.t, err)
for update := range updates { retrievalRes, err := dh.client.ClientRetrieve(ctx, offers[0].Order(caddr))
require.Emptyf(dh.t, update.Err, "retrieval failed: %s", update.Err) require.NoError(dh.t, err)
consumeEvents:
for {
var evt api.RetrievalInfo
select {
case <-updatesCtx.Done():
dh.t.Fatal("Retrieval Timed Out")
case evt = <-updates:
if evt.ID != retrievalRes.DealID {
continue
}
}
switch evt.Status {
case retrievalmarket.DealStatusCompleted:
break consumeEvents
case retrievalmarket.DealStatusRejected:
dh.t.Fatalf("Retrieval Proposal Rejected: %s", evt.Message)
case
retrievalmarket.DealStatusDealNotFound,
retrievalmarket.DealStatusErrored:
dh.t.Fatalf("Retrieval Error: %s", evt.Message)
}
} }
cancel()
require.NoError(dh.t, dh.client.ClientExport(ctx,
api.ExportRef{
Root: root,
DealID: retrievalRes.DealID,
},
api.FileRef{
Path: carFile.Name(),
IsCAR: carExport,
}))
ret := carFile.Name() ret := carFile.Name()
if carExport { if carExport {

View File

@ -4,17 +4,21 @@ import (
"bufio" "bufio"
"bytes" "bytes"
"context" "context"
"errors"
"fmt" "fmt"
"io" "io"
"os" "os"
"sort" "sort"
"strings"
"time" "time"
bstore "github.com/ipfs/go-ipfs-blockstore" bstore "github.com/ipfs/go-ipfs-blockstore"
format "github.com/ipfs/go-ipld-format"
unixfile "github.com/ipfs/go-unixfs/file" unixfile "github.com/ipfs/go-unixfs/file"
"github.com/ipld/go-car" "github.com/ipld/go-car"
carv2 "github.com/ipld/go-car/v2" carv2 "github.com/ipld/go-car/v2"
carv2bs "github.com/ipld/go-car/v2/blockstore" carv2bs "github.com/ipld/go-car/v2/blockstore"
"github.com/ipld/go-ipld-prime/datamodel"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-padreader" "github.com/filecoin-project/go-padreader"
@ -58,7 +62,6 @@ import (
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/v3/actors/builtin/market" "github.com/filecoin-project/specs-actors/v3/actors/builtin/market"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/repo/imports" "github.com/filecoin-project/lotus/node/repo/imports"
@ -760,325 +763,363 @@ func (a *API) ClientCancelRetrievalDeal(ctx context.Context, dealID rm.DealID) e
} }
} }
func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error { func getDataSelector(dps *api.Selector) (datamodel.Node, error) {
events := make(chan marketevents.RetrievalEvent)
go a.clientRetrieve(ctx, order, ref, events)
for {
select {
case evt, ok := <-events:
if !ok { // done successfully
return nil
}
if evt.Err != "" {
return xerrors.Errorf("retrieval failed: %s", evt.Err)
}
case <-ctx.Done():
return xerrors.Errorf("retrieval timed out")
}
}
}
func (a *API) ClientRetrieveWithEvents(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) {
events := make(chan marketevents.RetrievalEvent)
go a.clientRetrieve(ctx, order, ref, events)
return events, nil
}
type retrievalSubscribeEvent struct {
event rm.ClientEvent
state rm.ClientDealState
}
func consumeAllEvents(ctx context.Context, dealID rm.DealID, subscribeEvents chan retrievalSubscribeEvent, events chan marketevents.RetrievalEvent) error {
for {
var subscribeEvent retrievalSubscribeEvent
select {
case <-ctx.Done():
return xerrors.New("Retrieval Timed Out")
case subscribeEvent = <-subscribeEvents:
if subscribeEvent.state.ID != dealID {
// we can't check the deal ID ahead of time because:
// 1. We need to subscribe before retrieving.
// 2. We won't know the deal ID until after retrieving.
continue
}
}
select {
case <-ctx.Done():
return xerrors.New("Retrieval Timed Out")
case events <- marketevents.RetrievalEvent{
Event: subscribeEvent.event,
Status: subscribeEvent.state.Status,
BytesReceived: subscribeEvent.state.TotalReceived,
FundsSpent: subscribeEvent.state.FundsSpent,
}:
}
state := subscribeEvent.state
switch state.Status {
case rm.DealStatusCompleted:
return nil
case rm.DealStatusRejected:
return xerrors.Errorf("Retrieval Proposal Rejected: %s", state.Message)
case rm.DealStatusCancelled:
return xerrors.Errorf("Retrieval was cancelled externally: %s", state.Message)
case
rm.DealStatusDealNotFound,
rm.DealStatusErrored:
return xerrors.Errorf("Retrieval Error: %s", state.Message)
}
}
}
func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef, events chan marketevents.RetrievalEvent) {
defer close(events)
finish := func(e error) {
if e != nil {
events <- marketevents.RetrievalEvent{Err: e.Error(), FundsSpent: big.Zero()}
}
}
sel := selectorparse.CommonSelector_ExploreAllRecursively sel := selectorparse.CommonSelector_ExploreAllRecursively
if order.DatamodelPathSelector != nil { if dps != nil {
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) if strings.HasPrefix(string(*dps), "{") {
var err error
sel, err = selectorparse.ParseJSONSelector(string(*dps))
if err != nil {
return nil, xerrors.Errorf("failed to parse json-selector '%s': %w", *dps, err)
}
} else {
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
selspec, err := textselector.SelectorSpecFromPath( selspec, err := textselector.SelectorSpecFromPath(
textselector.Expression(*dps),
*order.DatamodelPathSelector, // URGH - this is a direct copy from https://github.com/filecoin-project/go-fil-markets/blob/v1.12.0/shared/selectors.go#L10-L16
// Unable to use it because we need the SelectorSpec, and markets exposes just a reified node
ssb.ExploreRecursive(
selector.RecursionLimitNone(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge()),
),
)
if err != nil {
return nil, xerrors.Errorf("failed to parse text-selector '%s': %w", *dps, err)
}
// URGH - this is a direct copy from https://github.com/filecoin-project/go-fil-markets/blob/v1.12.0/shared/selectors.go#L10-L16 sel = selspec.Node()
// Unable to use it because we need the SelectorSpec, and markets exposes just a reified node log.Infof("partial retrieval of datamodel-path-selector %s/*", *dps)
ssb.ExploreRecursive(
selector.RecursionLimitNone(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge()),
),
)
if err != nil {
finish(xerrors.Errorf("failed to parse text-selector '%s': %w", *order.DatamodelPathSelector, err))
return
} }
sel = selspec.Node()
log.Infof("partial retrieval of datamodel-path-selector %s/*", *order.DatamodelPathSelector)
} }
// summary: return sel, nil
// 1. if we're retrieving from an import, FromLocalCAR will be set. }
// Skip the retrieval itself, and use the provided car as a blockstore further down
// to extract a CAR or UnixFS export from.
// 2. if we're using an IPFS blockstore for retrieval, retrieve into it,
// then use the virtual blockstore to extract a CAR or UnixFS export from it.
// 3. if we have to retrieve, perform a CARv2 retrieval, then either
// extract the CARv1 (with ExtractV1File) or use it as a blockstore further down.
// this indicates we're proxying to IPFS. func (a *API) ClientRetrieve(ctx context.Context, params api.RetrievalOrder) (*api.RestrievalRes, error) {
sel, err := getDataSelector(params.DataSelector)
if err != nil {
return nil, err
}
di, err := a.doRetrieval(ctx, params, sel)
if err != nil {
return nil, err
}
return &api.RestrievalRes{
DealID: di,
}, nil
}
func (a *API) doRetrieval(ctx context.Context, order api.RetrievalOrder, sel datamodel.Node) (rm.DealID, error) {
if order.MinerPeer == nil || order.MinerPeer.ID == "" {
mi, err := a.StateMinerInfo(ctx, order.Miner, types.EmptyTSK)
if err != nil {
return 0, err
}
order.MinerPeer = &rm.RetrievalPeer{
ID: *mi.PeerId,
Address: order.Miner,
}
}
if order.Total.Int == nil {
return 0, xerrors.Errorf("cannot make retrieval deal for null total")
}
if order.Size == 0 {
return 0, xerrors.Errorf("cannot make retrieval deal for zero bytes")
}
ppb := types.BigDiv(order.Total, types.NewInt(order.Size))
params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, sel, order.Piece, order.UnsealPrice)
if err != nil {
return 0, xerrors.Errorf("Error in retrieval params: %s", err)
}
id := a.Retrieval.NextID()
id, err = a.Retrieval.Retrieve(
ctx,
id,
order.Root,
params,
order.Total,
*order.MinerPeer,
order.Client,
order.Miner,
)
if err != nil {
return 0, xerrors.Errorf("Retrieve failed: %w", err)
}
return id, nil
}
func (a *API) ClientRetrieveWait(ctx context.Context, deal rm.DealID) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
subscribeEvents := make(chan rm.ClientDealState, 1)
unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) {
// We'll check the deal IDs inside consumeAllEvents.
if state.ID != deal {
return
}
select {
case <-ctx.Done():
case subscribeEvents <- state:
}
})
defer unsubscribe()
{
state, err := a.Retrieval.GetDeal(deal)
if err != nil {
return xerrors.Errorf("getting deal state: %w", err)
}
select {
case subscribeEvents <- state:
default: // already have an event queued from the subscription
}
}
for {
select {
case <-ctx.Done():
return xerrors.New("Retrieval Timed Out")
case state := <-subscribeEvents:
switch state.Status {
case rm.DealStatusCompleted:
return nil
case rm.DealStatusRejected:
return xerrors.Errorf("Retrieval Proposal Rejected: %s", state.Message)
case rm.DealStatusCancelled:
return xerrors.Errorf("Retrieval was cancelled externally: %s", state.Message)
case
rm.DealStatusDealNotFound,
rm.DealStatusErrored:
return xerrors.Errorf("Retrieval Error: %s", state.Message)
}
}
}
}
type ExportDest struct {
Writer io.Writer
Path string
}
func (ed *ExportDest) doWrite(cb func(io.Writer) error) error {
if ed.Writer != nil {
return cb(ed.Writer)
}
f, err := os.OpenFile(ed.Path, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
if err := cb(f); err != nil {
_ = f.Close()
return err
}
return f.Close()
}
func (a *API) ClientExport(ctx context.Context, exportRef api.ExportRef, ref api.FileRef) error {
return a.ClientExportInto(ctx, exportRef, ref.IsCAR, ExportDest{Path: ref.Path})
}
func (a *API) ClientExportInto(ctx context.Context, exportRef api.ExportRef, car bool, dest ExportDest) error {
proxyBss, retrieveIntoIPFS := a.RtvlBlockstoreAccessor.(*retrievaladapter.ProxyBlockstoreAccessor) proxyBss, retrieveIntoIPFS := a.RtvlBlockstoreAccessor.(*retrievaladapter.ProxyBlockstoreAccessor)
carBss, retrieveIntoCAR := a.RtvlBlockstoreAccessor.(*retrievaladapter.CARBlockstoreAccessor) carBss, retrieveIntoCAR := a.RtvlBlockstoreAccessor.(*retrievaladapter.CARBlockstoreAccessor)
carPath := exportRef.FromLocalCAR
carPath := order.FromLocalCAR
// we actually need to retrieve from the network
if carPath == "" { if carPath == "" {
if !retrieveIntoIPFS && !retrieveIntoCAR { if !retrieveIntoIPFS && !retrieveIntoCAR {
// we don't recognize the blockstore accessor. return xerrors.Errorf("unsupported retrieval blockstore accessor")
finish(xerrors.Errorf("unsupported retrieval blockstore accessor"))
return
}
if order.MinerPeer == nil || order.MinerPeer.ID == "" {
mi, err := a.StateMinerInfo(ctx, order.Miner, types.EmptyTSK)
if err != nil {
finish(err)
return
}
order.MinerPeer = &rm.RetrievalPeer{
ID: *mi.PeerId,
Address: order.Miner,
}
}
if order.Total.Int == nil {
finish(xerrors.Errorf("cannot make retrieval deal for null total"))
return
}
if order.Size == 0 {
finish(xerrors.Errorf("cannot make retrieval deal for zero bytes"))
return
}
ppb := types.BigDiv(order.Total, types.NewInt(order.Size))
params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, sel, order.Piece, order.UnsealPrice)
if err != nil {
finish(xerrors.Errorf("Error in retrieval params: %s", err))
return
}
// Subscribe to events before retrieving to avoid losing events.
subscribeEvents := make(chan retrievalSubscribeEvent, 1)
subscribeCtx, cancel := context.WithCancel(ctx)
defer cancel()
unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) {
// We'll check the deal IDs inside consumeAllEvents.
if state.PayloadCID.Equals(order.Root) {
select {
case <-subscribeCtx.Done():
case subscribeEvents <- retrievalSubscribeEvent{event, state}:
}
}
})
id := a.Retrieval.NextID()
id, err = a.Retrieval.Retrieve(
ctx,
id,
order.Root,
params,
order.Total,
*order.MinerPeer,
order.Client,
order.Miner,
)
if err != nil {
unsubscribe()
finish(xerrors.Errorf("Retrieve failed: %w", err))
return
}
err = consumeAllEvents(ctx, id, subscribeEvents, events)
unsubscribe()
if err != nil {
finish(xerrors.Errorf("Retrieve: %w", err))
return
} }
if retrieveIntoCAR { if retrieveIntoCAR {
carPath = carBss.PathFor(id) carPath = carBss.PathFor(exportRef.DealID)
} }
} }
if ref == nil {
// If ref is nil, it only fetches the data into the configured blockstore
// (if fetching from network).
finish(nil)
return
}
// determine where did the retrieval go
var retrievalBs bstore.Blockstore var retrievalBs bstore.Blockstore
if retrieveIntoIPFS { if retrieveIntoIPFS {
retrievalBs = proxyBss.Blockstore retrievalBs = proxyBss.Blockstore
} else { } else {
cbs, err := stores.ReadOnlyFilestore(carPath) cbs, err := stores.ReadOnlyFilestore(carPath)
if err != nil { if err != nil {
finish(err) return err
return
} }
defer cbs.Close() //nolint:errcheck defer cbs.Close() //nolint:errcheck
retrievalBs = cbs retrievalBs = cbs
} }
dserv := merkledag.NewDAGService(blockservice.New(retrievalBs, offline.Exchange(retrievalBs)))
roots, err := parseDagSpec(ctx, exportRef.Root, exportRef.DAGs, dserv, !car)
if err != nil {
return xerrors.Errorf("parsing dag spec: %w", err)
}
// Are we outputting a CAR? // Are we outputting a CAR?
if ref.IsCAR { if car {
// not IPFS and we do full selection - just extract the CARv1 from the CARv2 we stored the retrieval in // not IPFS and we do full selection - just extract the CARv1 from the CARv2 we stored the retrieval in
if !retrieveIntoIPFS && order.DatamodelPathSelector == nil { if !retrieveIntoIPFS && len(exportRef.DAGs) == 0 && dest.Writer == nil {
finish(carv2.ExtractV1File(carPath, ref.Path)) return carv2.ExtractV1File(carPath, dest.Path)
return
} }
// generating a CARv1 from the configured blockstore return a.outputCAR(ctx, roots, retrievalBs, dest)
f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644) }
if err != nil {
finish(err)
return
}
err = car.NewSelectiveCar( if len(roots) != 1 {
return xerrors.Errorf("unixfs retrieval requires one root node, got %d", len(roots))
}
return a.outputUnixFS(ctx, roots[0].root, dserv, dest)
}
func (a *API) outputCAR(ctx context.Context, dags []dagSpec, bs bstore.Blockstore, dest ExportDest) error {
// generating a CARv1 from the configured blockstore
carDags := make([]car.Dag, len(dags))
for i, dag := range dags {
carDags[i] = car.Dag{
Root: dag.root,
Selector: dag.selector,
}
}
return dest.doWrite(func(w io.Writer) error {
return car.NewSelectiveCar(
ctx, ctx,
retrievalBs, bs,
[]car.Dag{{ carDags,
Root: order.Root,
Selector: sel,
}},
car.MaxTraversalLinks(config.MaxTraversalLinks), car.MaxTraversalLinks(config.MaxTraversalLinks),
).Write(f) ).Write(w)
if err != nil { })
finish(err) }
return
}
finish(f.Close()) type dagSpec struct {
return root cid.Cid
} selector ipld.Node
}
// we are extracting a UnixFS file. func parseDagSpec(ctx context.Context, root cid.Cid, dsp []api.DagSpec, ds format.DAGService, rootOnNodeBoundary bool) ([]dagSpec, error) {
ds := merkledag.NewDAGService(blockservice.New(retrievalBs, offline.Exchange(retrievalBs))) if len(dsp) == 0 {
root := order.Root return []dagSpec{
{
// if we used a selector - need to find the sub-root the user actually wanted to retrieve root: root,
if order.DatamodelPathSelector != nil { selector: nil,
var subRootFound bool
// no err check - we just compiled this before starting, but now we do not wrap a `*`
selspec, _ := textselector.SelectorSpecFromPath(*order.DatamodelPathSelector, nil) //nolint:errcheck
if err := utils.TraverseDag(
ctx,
ds,
root,
selspec.Node(),
func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error {
if r == traversal.VisitReason_SelectionMatch {
if p.LastBlock.Path.String() != p.Path.String() {
return xerrors.Errorf("unsupported selection path '%s' does not correspond to a block boundary (a.k.a. CID link)", p.Path.String())
}
cidLnk, castOK := p.LastBlock.Link.(cidlink.Link)
if !castOK {
return xerrors.Errorf("cidlink cast unexpectedly failed on '%s'", p.LastBlock.Link.String())
}
root = cidLnk.Cid
subRootFound = true
}
return nil
}, },
); err != nil { }, nil
finish(xerrors.Errorf("error while locating partial retrieval sub-root: %w", err)) }
return
out := make([]dagSpec, len(dsp))
for i, spec := range dsp {
// if a selector is specified, find it's root node
if spec.DataSelector != nil {
var rsn ipld.Node
if strings.HasPrefix(string(*spec.DataSelector), "{") {
var err error
rsn, err = selectorparse.ParseJSONSelector(string(*spec.DataSelector))
if err != nil {
return nil, xerrors.Errorf("failed to parse json-selector '%s': %w", *spec.DataSelector, err)
}
} else {
selspec, _ := textselector.SelectorSpecFromPath(textselector.Expression(*spec.DataSelector), nil) //nolint:errcheck
rsn = selspec.Node()
}
var newRoot cid.Cid
var errHalt = errors.New("halt walk")
if err := utils.TraverseDag(
ctx,
ds,
root,
rsn,
func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error {
if r == traversal.VisitReason_SelectionMatch {
if rootOnNodeBoundary && p.LastBlock.Path.String() != p.Path.String() {
return xerrors.Errorf("unsupported selection path '%s' does not correspond to a block boundary (a.k.a. CID link)", p.Path.String())
}
if p.LastBlock.Link == nil {
// this is likely the root node that we've matched here
newRoot = root
return errHalt
}
cidLnk, castOK := p.LastBlock.Link.(cidlink.Link)
if !castOK {
return xerrors.Errorf("cidlink cast unexpectedly failed on '%s'", p.LastBlock.Link)
}
newRoot = cidLnk.Cid
return errHalt
}
return nil
},
); err != nil && err != errHalt {
return nil, xerrors.Errorf("error while locating partial retrieval sub-root: %w", err)
}
if newRoot == cid.Undef {
return nil, xerrors.Errorf("path selection does not match a node within %s", root)
}
out[i].root = newRoot
} }
if !subRootFound { if spec.DataSelector != nil {
finish(xerrors.Errorf("path selection '%s' does not match a node within %s", *order.DatamodelPathSelector, root)) var err error
return out[i].selector, err = getDataSelector(spec.DataSelector)
if err != nil {
return nil, err
}
} }
} }
return out, nil
}
func (a *API) outputUnixFS(ctx context.Context, root cid.Cid, ds format.DAGService, dest ExportDest) error {
nd, err := ds.Get(ctx, root) nd, err := ds.Get(ctx, root)
if err != nil { if err != nil {
finish(xerrors.Errorf("ClientRetrieve: %w", err)) return xerrors.Errorf("ClientRetrieve: %w", err)
return
} }
file, err := unixfile.NewUnixfsFile(ctx, ds, nd) file, err := unixfile.NewUnixfsFile(ctx, ds, nd)
if err != nil { if err != nil {
finish(xerrors.Errorf("ClientRetrieve: %w", err)) return xerrors.Errorf("ClientRetrieve: %w", err)
return
} }
finish(files.WriteTo(file, ref.Path)) if dest.Writer == nil {
return files.WriteTo(file, dest.Path)
}
switch f := file.(type) {
case files.File:
_, err = io.Copy(dest.Writer, f)
if err != nil {
return err
}
return nil
default:
return fmt.Errorf("file type %T is not supported", nd)
}
} }
func (a *API) ClientListRetrievals(ctx context.Context) ([]api.RetrievalInfo, error) { func (a *API) ClientListRetrievals(ctx context.Context) ([]api.RetrievalInfo, error) {
@ -1110,8 +1151,13 @@ func (a *API) ClientListRetrievals(ctx context.Context) ([]api.RetrievalInfo, er
func (a *API) ClientGetRetrievalUpdates(ctx context.Context) (<-chan api.RetrievalInfo, error) { func (a *API) ClientGetRetrievalUpdates(ctx context.Context) (<-chan api.RetrievalInfo, error) {
updates := make(chan api.RetrievalInfo) updates := make(chan api.RetrievalInfo)
unsub := a.Retrieval.SubscribeToEvents(func(_ rm.ClientEvent, deal rm.ClientDealState) { unsub := a.Retrieval.SubscribeToEvents(func(evt rm.ClientEvent, deal rm.ClientDealState) {
updates <- a.newRetrievalInfo(ctx, deal) update := a.newRetrievalInfo(ctx, deal)
update.Event = &evt
select {
case updates <- update:
case <-ctx.Done():
}
}) })
go func() { go func() {

View File

@ -60,7 +60,7 @@ func TestImportLocal(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.True(t, local) require.True(t, local)
order := api.RetrievalOrder{ order := api.ExportRef{
Root: root, Root: root,
FromLocalCAR: it.CARPath, FromLocalCAR: it.CARPath,
} }
@ -68,7 +68,7 @@ func TestImportLocal(t *testing.T) {
// retrieve as UnixFS. // retrieve as UnixFS.
out1 := filepath.Join(dir, "retrieval1.data") // as unixfs out1 := filepath.Join(dir, "retrieval1.data") // as unixfs
out2 := filepath.Join(dir, "retrieval2.data") // as car out2 := filepath.Join(dir, "retrieval2.data") // as car
err = a.ClientRetrieve(ctx, order, &api.FileRef{ err = a.ClientExport(ctx, order, api.FileRef{
Path: out1, Path: out1,
}) })
require.NoError(t, err) require.NoError(t, err)
@ -77,7 +77,7 @@ func TestImportLocal(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, b, outBytes) require.Equal(t, b, outBytes)
err = a.ClientRetrieve(ctx, order, &api.FileRef{ err = a.ClientExport(ctx, order, api.FileRef{
Path: out2, Path: out2,
IsCAR: true, IsCAR: true,
}) })

View File

@ -32,6 +32,7 @@ import (
"github.com/filecoin-project/go-jsonrpc/auth" "github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/go-paramfetch" "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-statestore" "github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/go-storedcounter" "github.com/filecoin-project/go-storedcounter"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
@ -683,6 +684,9 @@ func RetrievalProvider(
dagStore *dagstore.Wrapper, dagStore *dagstore.Wrapper,
) (retrievalmarket.RetrievalProvider, error) { ) (retrievalmarket.RetrievalProvider, error) {
opt := retrievalimpl.DealDeciderOpt(retrievalimpl.DealDecider(userFilter)) opt := retrievalimpl.DealDeciderOpt(retrievalimpl.DealDecider(userFilter))
retrievalmarket.DefaultPricePerByte = big.Zero() // todo: for whatever reason this is a global var in markets
return retrievalimpl.NewProvider( return retrievalimpl.NewProvider(
address.Address(maddr), address.Address(maddr),
adapter, adapter,

View File

@ -27,6 +27,7 @@ import (
"github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/metrics/proxy" "github.com/filecoin-project/lotus/metrics/proxy"
"github.com/filecoin-project/lotus/node/impl" "github.com/filecoin-project/lotus/node/impl"
"github.com/filecoin-project/lotus/node/impl/client"
) )
var rpclog = logging.Logger("rpc") var rpclog = logging.Logger("rpc")
@ -89,14 +90,22 @@ func FullNodeHandler(a v1api.FullNode, permissioned bool, opts ...jsonrpc.Server
// Import handler // Import handler
handleImportFunc := handleImport(a.(*impl.FullNodeAPI)) handleImportFunc := handleImport(a.(*impl.FullNodeAPI))
handleExportFunc := handleExport(a.(*impl.FullNodeAPI))
if permissioned { if permissioned {
importAH := &auth.Handler{ importAH := &auth.Handler{
Verify: a.AuthVerify, Verify: a.AuthVerify,
Next: handleImportFunc, Next: handleImportFunc,
} }
m.Handle("/rest/v0/import", importAH) m.Handle("/rest/v0/import", importAH)
exportAH := &auth.Handler{
Verify: a.AuthVerify,
Next: handleExportFunc,
}
m.Handle("/rest/v0/export", exportAH)
} else { } else {
m.HandleFunc("/rest/v0/import", handleImportFunc) m.HandleFunc("/rest/v0/import", handleImportFunc)
m.HandleFunc("/rest/v0/export", handleExportFunc)
} }
// debugging // debugging
@ -169,6 +178,34 @@ func handleImport(a *impl.FullNodeAPI) func(w http.ResponseWriter, r *http.Reque
} }
} }
func handleExport(a *impl.FullNodeAPI) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
w.WriteHeader(404)
return
}
if !auth.HasPerm(r.Context(), nil, api.PermWrite) {
w.WriteHeader(401)
_ = json.NewEncoder(w).Encode(struct{ Error string }{"unauthorized: missing write permission"})
return
}
var eref api.ExportRef
if err := json.Unmarshal([]byte(r.FormValue("export")), &eref); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
car := r.FormValue("car") == "true"
err := a.ClientExportInto(r.Context(), eref, car, client.ExportDest{Writer: w})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
func handleFractionOpt(name string, setter func(int)) http.HandlerFunc { func handleFractionOpt(name string, setter func(int)) http.HandlerFunc {
return func(rw http.ResponseWriter, r *http.Request) { return func(rw http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost { if r.Method != http.MethodPost {

View File

@ -11,6 +11,7 @@ import (
"time" "time"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/v0api"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
files "github.com/ipfs/go-ipfs-files" files "github.com/ipfs/go-ipfs-files"
ipld "github.com/ipfs/go-ipld-format" ipld "github.com/ipfs/go-ipld-format"
@ -51,7 +52,7 @@ func RetrieveData(t *TestEnvironment, ctx context.Context, client api.FullNode,
IsCAR: carExport, IsCAR: carExport,
} }
t1 = time.Now() t1 = time.Now()
err = client.ClientRetrieve(ctx, offers[0].Order(caddr), ref) err = (&v0api.WrapperV1Full{FullNode: client}).ClientRetrieve(ctx, v0api.OfferOrder(offers[0], caddr), ref)
if err != nil { if err != nil {
return err return err
} }