retrieval: OffChainRetrieval config
This commit is contained in:
parent
8b19b84140
commit
4235a97cf4
@ -41,6 +41,8 @@ func extractPaychReusedCid(c cid.Cid) (address.Address, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type retrievalClientNode struct {
|
type retrievalClientNode struct {
|
||||||
|
forceOffChain bool
|
||||||
|
|
||||||
chainAPI full.ChainAPI
|
chainAPI full.ChainAPI
|
||||||
payAPI payapi.PaychAPI
|
payAPI payapi.PaychAPI
|
||||||
stateAPI full.StateAPI
|
stateAPI full.StateAPI
|
||||||
@ -48,8 +50,13 @@ type retrievalClientNode struct {
|
|||||||
|
|
||||||
// NewRetrievalClientNode returns a new node adapter for a retrieval client that talks to the
|
// NewRetrievalClientNode returns a new node adapter for a retrieval client that talks to the
|
||||||
// Lotus Node
|
// Lotus Node
|
||||||
func NewRetrievalClientNode(payAPI payapi.PaychAPI, chainAPI full.ChainAPI, stateAPI full.StateAPI) retrievalmarket.RetrievalClientNode {
|
func NewRetrievalClientNode(forceOffChain bool, payAPI payapi.PaychAPI, chainAPI full.ChainAPI, stateAPI full.StateAPI) retrievalmarket.RetrievalClientNode {
|
||||||
return &retrievalClientNode{payAPI: payAPI, chainAPI: chainAPI, stateAPI: stateAPI}
|
return &retrievalClientNode{
|
||||||
|
forceOffChain: forceOffChain,
|
||||||
|
chainAPI: chainAPI,
|
||||||
|
payAPI: payAPI,
|
||||||
|
stateAPI: stateAPI,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetOrCreatePaymentChannel sets up a new payment channel if one does not exist
|
// GetOrCreatePaymentChannel sets up a new payment channel if one does not exist
|
||||||
@ -60,7 +67,7 @@ func (rcn *retrievalClientNode) GetOrCreatePaymentChannel(ctx context.Context, c
|
|||||||
// querying the chain
|
// querying the chain
|
||||||
ci, err := rcn.payAPI.PaychGet(ctx, clientAddress, minerAddress, clientFundsAvailable, api.PaychGetOpts{
|
ci, err := rcn.payAPI.PaychGet(ctx, clientAddress, minerAddress, clientFundsAvailable, api.PaychGetOpts{
|
||||||
Reserve: true,
|
Reserve: true,
|
||||||
OffChain: false,
|
OffChain: rcn.forceOffChain,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return address.Undef, cid.Undef, err
|
return address.Undef, cid.Undef, err
|
||||||
|
@ -121,7 +121,7 @@ var ChainNode = Options(
|
|||||||
// Markets (retrieval)
|
// Markets (retrieval)
|
||||||
Override(new(discovery.PeerResolver), modules.RetrievalResolver),
|
Override(new(discovery.PeerResolver), modules.RetrievalResolver),
|
||||||
Override(new(retrievalmarket.BlockstoreAccessor), modules.RetrievalBlockstoreAccessor),
|
Override(new(retrievalmarket.BlockstoreAccessor), modules.RetrievalBlockstoreAccessor),
|
||||||
Override(new(retrievalmarket.RetrievalClient), modules.RetrievalClient),
|
Override(new(retrievalmarket.RetrievalClient), modules.RetrievalClient(false)),
|
||||||
Override(new(dtypes.ClientDataTransfer), modules.NewClientGraphsyncDataTransfer),
|
Override(new(dtypes.ClientDataTransfer), modules.NewClientGraphsyncDataTransfer),
|
||||||
|
|
||||||
// Markets (storage)
|
// Markets (storage)
|
||||||
@ -221,6 +221,8 @@ func ConfigFullNode(c interface{}) Option {
|
|||||||
),
|
),
|
||||||
Override(new(dtypes.Graphsync), modules.Graphsync(cfg.Client.SimultaneousTransfersForStorage, cfg.Client.SimultaneousTransfersForRetrieval)),
|
Override(new(dtypes.Graphsync), modules.Graphsync(cfg.Client.SimultaneousTransfersForStorage, cfg.Client.SimultaneousTransfersForRetrieval)),
|
||||||
|
|
||||||
|
Override(new(retrievalmarket.RetrievalClient), modules.RetrievalClient(cfg.Client.OffChainRetrieval)),
|
||||||
|
|
||||||
If(cfg.Wallet.RemoteBackend != "",
|
If(cfg.Wallet.RemoteBackend != "",
|
||||||
Override(new(*remotewallet.RemoteWallet), remotewallet.SetupRemoteWallet(cfg.Wallet.RemoteBackend)),
|
Override(new(*remotewallet.RemoteWallet), remotewallet.SetupRemoteWallet(cfg.Wallet.RemoteBackend)),
|
||||||
),
|
),
|
||||||
|
@ -105,6 +105,14 @@ and storage providers for storage deals`,
|
|||||||
Comment: `The maximum number of simultaneous data transfers between the client
|
Comment: `The maximum number of simultaneous data transfers between the client
|
||||||
and storage providers for retrieval deals`,
|
and storage providers for retrieval deals`,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Name: "OffChainRetrieval",
|
||||||
|
Type: "bool",
|
||||||
|
|
||||||
|
Comment: `Require that retrievals perform no on-chain retrievals. Paid retrievals
|
||||||
|
without existing payment channels with available funds will fail instead
|
||||||
|
of automatically performing on-chain operations.`,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
"Common": []DocField{
|
"Common": []DocField{
|
||||||
{
|
{
|
||||||
|
@ -383,6 +383,11 @@ type Client struct {
|
|||||||
// The maximum number of simultaneous data transfers between the client
|
// The maximum number of simultaneous data transfers between the client
|
||||||
// and storage providers for retrieval deals
|
// and storage providers for retrieval deals
|
||||||
SimultaneousTransfersForRetrieval uint64
|
SimultaneousTransfersForRetrieval uint64
|
||||||
|
|
||||||
|
// Require that retrievals perform no on-chain retrievals. Paid retrievals
|
||||||
|
// without existing payment channels with available funds will fail instead
|
||||||
|
// of automatically performing on-chain operations.
|
||||||
|
OffChainRetrieval bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type Wallet struct {
|
type Wallet struct {
|
||||||
|
@ -202,26 +202,28 @@ func StorageClient(lc fx.Lifecycle, h host.Host, dataTransfer dtypes.ClientDataT
|
|||||||
}
|
}
|
||||||
|
|
||||||
// RetrievalClient creates a new retrieval client attached to the client blockstore
|
// RetrievalClient creates a new retrieval client attached to the client blockstore
|
||||||
func RetrievalClient(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver,
|
func RetrievalClient(forceOffChain bool) func(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver,
|
||||||
ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, accessor retrievalmarket.BlockstoreAccessor, j journal.Journal) (retrievalmarket.RetrievalClient, error) {
|
ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, accessor retrievalmarket.BlockstoreAccessor, j journal.Journal) (retrievalmarket.RetrievalClient, error) {
|
||||||
|
return func(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver,
|
||||||
|
ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, accessor retrievalmarket.BlockstoreAccessor, j journal.Journal) (retrievalmarket.RetrievalClient, error) {
|
||||||
|
adapter := retrievaladapter.NewRetrievalClientNode(forceOffChain, payAPI, chainAPI, stateAPI)
|
||||||
|
network := rmnet.NewFromLibp2pHost(h)
|
||||||
|
ds = namespace.Wrap(ds, datastore.NewKey("/retrievals/client"))
|
||||||
|
client, err := retrievalimpl.NewClient(network, dt, adapter, resolver, ds, accessor)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
client.OnReady(marketevents.ReadyLogger("retrieval client"))
|
||||||
|
lc.Append(fx.Hook{
|
||||||
|
OnStart: func(ctx context.Context) error {
|
||||||
|
client.SubscribeToEvents(marketevents.RetrievalClientLogger)
|
||||||
|
|
||||||
adapter := retrievaladapter.NewRetrievalClientNode(payAPI, chainAPI, stateAPI)
|
evtType := j.RegisterEventType("markets/retrieval/client", "state_change")
|
||||||
network := rmnet.NewFromLibp2pHost(h)
|
client.SubscribeToEvents(markets.RetrievalClientJournaler(j, evtType))
|
||||||
ds = namespace.Wrap(ds, datastore.NewKey("/retrievals/client"))
|
|
||||||
client, err := retrievalimpl.NewClient(network, dt, adapter, resolver, ds, accessor)
|
return client.Start(ctx)
|
||||||
if err != nil {
|
},
|
||||||
return nil, err
|
})
|
||||||
|
return client, nil
|
||||||
}
|
}
|
||||||
client.OnReady(marketevents.ReadyLogger("retrieval client"))
|
|
||||||
lc.Append(fx.Hook{
|
|
||||||
OnStart: func(ctx context.Context) error {
|
|
||||||
client.SubscribeToEvents(marketevents.RetrievalClientLogger)
|
|
||||||
|
|
||||||
evtType := j.RegisterEventType("markets/retrieval/client", "state_change")
|
|
||||||
client.SubscribeToEvents(markets.RetrievalClientJournaler(j, evtType))
|
|
||||||
|
|
||||||
return client.Start(ctx)
|
|
||||||
},
|
|
||||||
})
|
|
||||||
return client, nil
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user