From 4235a97cf4f7f485c02b090cee9f5b70cdcdf77c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 6 Jan 2022 16:26:25 +0100 Subject: [PATCH] retrieval: OffChainRetrieval config --- markets/retrievaladapter/client.go | 13 +++++++--- node/builder_chain.go | 4 ++- node/config/doc_gen.go | 8 ++++++ node/config/types.go | 5 ++++ node/modules/client.go | 40 ++++++++++++++++-------------- 5 files changed, 47 insertions(+), 23 deletions(-) diff --git a/markets/retrievaladapter/client.go b/markets/retrievaladapter/client.go index 601f9f255..60f41ca29 100644 --- a/markets/retrievaladapter/client.go +++ b/markets/retrievaladapter/client.go @@ -41,6 +41,8 @@ func extractPaychReusedCid(c cid.Cid) (address.Address, error) { } type retrievalClientNode struct { + forceOffChain bool + chainAPI full.ChainAPI payAPI payapi.PaychAPI stateAPI full.StateAPI @@ -48,8 +50,13 @@ type retrievalClientNode struct { // NewRetrievalClientNode returns a new node adapter for a retrieval client that talks to the // Lotus Node -func NewRetrievalClientNode(payAPI payapi.PaychAPI, chainAPI full.ChainAPI, stateAPI full.StateAPI) retrievalmarket.RetrievalClientNode { - return &retrievalClientNode{payAPI: payAPI, chainAPI: chainAPI, stateAPI: stateAPI} +func NewRetrievalClientNode(forceOffChain bool, payAPI payapi.PaychAPI, chainAPI full.ChainAPI, stateAPI full.StateAPI) retrievalmarket.RetrievalClientNode { + return &retrievalClientNode{ + forceOffChain: forceOffChain, + chainAPI: chainAPI, + payAPI: payAPI, + stateAPI: stateAPI, + } } // GetOrCreatePaymentChannel sets up a new payment channel if one does not exist @@ -60,7 +67,7 @@ func (rcn *retrievalClientNode) GetOrCreatePaymentChannel(ctx context.Context, c // querying the chain ci, err := rcn.payAPI.PaychGet(ctx, clientAddress, minerAddress, clientFundsAvailable, api.PaychGetOpts{ Reserve: true, - OffChain: false, + OffChain: rcn.forceOffChain, }) if err != nil { return address.Undef, cid.Undef, err diff --git a/node/builder_chain.go b/node/builder_chain.go index 11283ec3a..0d10dcb9b 100644 --- a/node/builder_chain.go +++ b/node/builder_chain.go @@ -121,7 +121,7 @@ var ChainNode = Options( // Markets (retrieval) Override(new(discovery.PeerResolver), modules.RetrievalResolver), Override(new(retrievalmarket.BlockstoreAccessor), modules.RetrievalBlockstoreAccessor), - Override(new(retrievalmarket.RetrievalClient), modules.RetrievalClient), + Override(new(retrievalmarket.RetrievalClient), modules.RetrievalClient(false)), Override(new(dtypes.ClientDataTransfer), modules.NewClientGraphsyncDataTransfer), // Markets (storage) @@ -221,6 +221,8 @@ func ConfigFullNode(c interface{}) Option { ), Override(new(dtypes.Graphsync), modules.Graphsync(cfg.Client.SimultaneousTransfersForStorage, cfg.Client.SimultaneousTransfersForRetrieval)), + Override(new(retrievalmarket.RetrievalClient), modules.RetrievalClient(cfg.Client.OffChainRetrieval)), + If(cfg.Wallet.RemoteBackend != "", Override(new(*remotewallet.RemoteWallet), remotewallet.SetupRemoteWallet(cfg.Wallet.RemoteBackend)), ), diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index c3730cbac..59181f9f6 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -105,6 +105,14 @@ and storage providers for storage deals`, Comment: `The maximum number of simultaneous data transfers between the client and storage providers for retrieval deals`, }, + { + Name: "OffChainRetrieval", + Type: "bool", + + Comment: `Require that retrievals perform no on-chain retrievals. Paid retrievals +without existing payment channels with available funds will fail instead +of automatically performing on-chain operations.`, + }, }, "Common": []DocField{ { diff --git a/node/config/types.go b/node/config/types.go index 715f48248..7e9064614 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -383,6 +383,11 @@ type Client struct { // The maximum number of simultaneous data transfers between the client // and storage providers for retrieval deals SimultaneousTransfersForRetrieval uint64 + + // Require that retrievals perform no on-chain retrievals. Paid retrievals + // without existing payment channels with available funds will fail instead + // of automatically performing on-chain operations. + OffChainRetrieval bool } type Wallet struct { diff --git a/node/modules/client.go b/node/modules/client.go index 48f9dc3d7..1e7418204 100644 --- a/node/modules/client.go +++ b/node/modules/client.go @@ -202,26 +202,28 @@ func StorageClient(lc fx.Lifecycle, h host.Host, dataTransfer dtypes.ClientDataT } // RetrievalClient creates a new retrieval client attached to the client blockstore -func RetrievalClient(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver, +func RetrievalClient(forceOffChain bool) func(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver, ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, accessor retrievalmarket.BlockstoreAccessor, j journal.Journal) (retrievalmarket.RetrievalClient, error) { + return func(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver, + ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, accessor retrievalmarket.BlockstoreAccessor, j journal.Journal) (retrievalmarket.RetrievalClient, error) { + adapter := retrievaladapter.NewRetrievalClientNode(forceOffChain, payAPI, chainAPI, stateAPI) + network := rmnet.NewFromLibp2pHost(h) + ds = namespace.Wrap(ds, datastore.NewKey("/retrievals/client")) + client, err := retrievalimpl.NewClient(network, dt, adapter, resolver, ds, accessor) + if err != nil { + return nil, err + } + client.OnReady(marketevents.ReadyLogger("retrieval client")) + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + client.SubscribeToEvents(marketevents.RetrievalClientLogger) - adapter := retrievaladapter.NewRetrievalClientNode(payAPI, chainAPI, stateAPI) - network := rmnet.NewFromLibp2pHost(h) - ds = namespace.Wrap(ds, datastore.NewKey("/retrievals/client")) - client, err := retrievalimpl.NewClient(network, dt, adapter, resolver, ds, accessor) - if err != nil { - return nil, err + evtType := j.RegisterEventType("markets/retrieval/client", "state_change") + client.SubscribeToEvents(markets.RetrievalClientJournaler(j, evtType)) + + return client.Start(ctx) + }, + }) + return client, nil } - client.OnReady(marketevents.ReadyLogger("retrieval client")) - lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - client.SubscribeToEvents(marketevents.RetrievalClientLogger) - - evtType := j.RegisterEventType("markets/retrieval/client", "state_change") - client.SubscribeToEvents(markets.RetrievalClientJournaler(j, evtType)) - - return client.Start(ctx) - }, - }) - return client, nil }