refactor(deals): move type instantiation to modules
Move all type instantiation to dtypes & modules for any type unique to provider/client
This commit is contained in:
parent
ccfc3c5b19
commit
4b3bab371b
@ -4,8 +4,6 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-datastore/namespace"
|
||||
logging "github.com/ipfs/go-log"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
inet "github.com/libp2p/go-libp2p-core/network"
|
||||
@ -21,7 +19,6 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/store"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/chain/wallet"
|
||||
"github.com/filecoin-project/lotus/datatransfer"
|
||||
"github.com/filecoin-project/lotus/lib/cborutil"
|
||||
"github.com/filecoin-project/lotus/lib/statestore"
|
||||
"github.com/filecoin-project/lotus/node/impl/full"
|
||||
@ -54,7 +51,7 @@ type Client struct {
|
||||
// client will listen to events on the data transfer module
|
||||
// Because we are using only a fake DAGService
|
||||
// implementation, there's no validation or events on the client side
|
||||
dataTransfer datatransfer.ClientDataTransfer
|
||||
dataTransfer dtypes.ClientDataTransfer
|
||||
dag dtypes.ClientDAG
|
||||
discovery *discovery.Local
|
||||
events *events.Events
|
||||
@ -77,7 +74,7 @@ type clientDealUpdate struct {
|
||||
mut func(*ClientDeal)
|
||||
}
|
||||
|
||||
func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.MetadataDS, dag dtypes.ClientDAG, dataTransfer datatransfer.ClientDataTransfer, discovery *discovery.Local, fm *market.FundMgr, chainapi full.ChainAPI) *Client {
|
||||
func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w *wallet.Wallet, dag dtypes.ClientDAG, dataTransfer dtypes.ClientDataTransfer, discovery *discovery.Local, fm *market.FundMgr, deals dtypes.ClientDealStore, chainapi full.ChainAPI) *Client {
|
||||
c := &Client{
|
||||
sm: sm,
|
||||
chain: chain,
|
||||
@ -89,7 +86,7 @@ func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w *
|
||||
fm: fm,
|
||||
events: events.NewEvents(context.TODO(), &chainapi),
|
||||
|
||||
deals: statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client"))),
|
||||
deals: deals,
|
||||
conns: map[cid.Cid]inet.Stream{},
|
||||
|
||||
incoming: make(chan *ClientDeal, 16),
|
||||
|
@ -6,8 +6,6 @@ import (
|
||||
"runtime"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-datastore/namespace"
|
||||
files "github.com/ipfs/go-ipfs-files"
|
||||
unixfile "github.com/ipfs/go-unixfs/file"
|
||||
"github.com/ipld/go-ipld-prime"
|
||||
@ -117,9 +115,9 @@ type ClientRequestValidator struct {
|
||||
|
||||
// NewClientRequestValidator returns a new client request validator for the
|
||||
// given datastore
|
||||
func NewClientRequestValidator(ds dtypes.MetadataDS) *ClientRequestValidator {
|
||||
func NewClientRequestValidator(deals dtypes.ClientDealStore) *ClientRequestValidator {
|
||||
crv := &ClientRequestValidator{
|
||||
deals: statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client"))),
|
||||
deals: deals,
|
||||
}
|
||||
return crv
|
||||
}
|
||||
|
@ -54,7 +54,7 @@ type Provider struct {
|
||||
dag dtypes.StagingDAG
|
||||
|
||||
// dataTransfer is the manager of data transfers used by this storage provider
|
||||
dataTransfer datatransfer.ProviderDataTransfer
|
||||
dataTransfer dtypes.ProviderDataTransfer
|
||||
|
||||
deals *statestore.StateStore
|
||||
ds dtypes.MetadataDS
|
||||
@ -81,7 +81,7 @@ var (
|
||||
ErrDataTransferFailed = errors.New("Deal data transfer failed")
|
||||
)
|
||||
|
||||
func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, secb *sectorblocks.SectorBlocks, dag dtypes.StagingDAG, dataTransfer datatransfer.ProviderDataTransfer, fullNode api.FullNode) (*Provider, error) {
|
||||
func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, secb *sectorblocks.SectorBlocks, dag dtypes.StagingDAG, dataTransfer dtypes.ProviderDataTransfer, fullNode api.FullNode) (*Provider, error) {
|
||||
addr, err := ds.Get(datastore.NewKey("miner-address"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -17,8 +17,6 @@ import (
|
||||
"github.com/filecoin-project/lotus/lib/statestore"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-datastore/namespace"
|
||||
inet "github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"golang.org/x/xerrors"
|
||||
@ -145,9 +143,9 @@ type ProviderRequestValidator struct {
|
||||
|
||||
// NewProviderRequestValidator returns a new client request validator for the
|
||||
// given datastore
|
||||
func NewProviderRequestValidator(ds dtypes.MetadataDS) *ProviderRequestValidator {
|
||||
func NewProviderRequestValidator(deals dtypes.ProviderDealStore) *ProviderRequestValidator {
|
||||
return &ProviderRequestValidator{
|
||||
deals: statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client"))),
|
||||
deals: deals,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -106,7 +106,7 @@ func TestClientRequestValidation(t *testing.T) {
|
||||
ds := dss.MutexWrap(datastore.NewMapDatastore())
|
||||
state := statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client")))
|
||||
|
||||
crv := deals.NewClientRequestValidator(ds)
|
||||
crv := deals.NewClientRequestValidator(state)
|
||||
minerID := peer.ID("fakepeerid")
|
||||
block := blockGenerator.Next()
|
||||
t.Run("ValidatePush fails", func(t *testing.T) {
|
||||
@ -198,7 +198,7 @@ func TestProviderRequestValidation(t *testing.T) {
|
||||
ds := dss.MutexWrap(datastore.NewMapDatastore())
|
||||
state := statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client")))
|
||||
|
||||
mrv := deals.NewProviderRequestValidator(ds)
|
||||
mrv := deals.NewProviderRequestValidator(state)
|
||||
clientID := peer.ID("fakepeerid")
|
||||
block := blockGenerator.Next()
|
||||
t.Run("ValidatePull fails", func(t *testing.T) {
|
||||
|
@ -10,8 +10,6 @@ import (
|
||||
ipld "github.com/ipld/go-ipld-prime"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
)
|
||||
|
||||
// This file implements a VERY simple, incomplete version of the data transfer
|
||||
@ -28,15 +26,9 @@ type dagserviceImpl struct {
|
||||
subscriber Subscriber
|
||||
}
|
||||
|
||||
// NewProviderDAGServiceDataTransfer returns a data transfer manager that just
|
||||
// uses the provider's Staging DAG service for transfers
|
||||
func NewProviderDAGServiceDataTransfer(dag dtypes.StagingDAG) Manager {
|
||||
return &dagserviceImpl{dag, nil}
|
||||
}
|
||||
|
||||
// NewClientDAGServiceDataTransfer returns a data transfer manager that just
|
||||
// uses the clients's Client DAG service for transfers
|
||||
func NewClientDAGServiceDataTransfer(dag dtypes.ClientDAG) Manager {
|
||||
// NewDAGServiceDataTransfer returns a data transfer manager based on
|
||||
// an IPLD DAGService
|
||||
func NewDAGServiceDataTransfer(dag ipldformat.DAGService) Manager {
|
||||
return &dagserviceImpl{dag, nil}
|
||||
}
|
||||
|
||||
|
@ -171,9 +171,3 @@ type Manager interface {
|
||||
// get all in progress transfers
|
||||
InProgressChannels() map[ChannelID]ChannelState
|
||||
}
|
||||
|
||||
// ClientDataTransfer is a data transfer manager for the client
|
||||
type ClientDataTransfer Manager
|
||||
|
||||
// ProviderDataTransfer is a data transfer manager for the provider
|
||||
type ProviderDataTransfer Manager
|
||||
|
@ -27,7 +27,6 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/store"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/chain/wallet"
|
||||
"github.com/filecoin-project/lotus/datatransfer"
|
||||
"github.com/filecoin-project/lotus/lib/sectorbuilder"
|
||||
"github.com/filecoin-project/lotus/miner"
|
||||
"github.com/filecoin-project/lotus/node/config"
|
||||
@ -221,7 +220,8 @@ func Online() Option {
|
||||
Override(new(discovery.PeerResolver), modules.RetrievalResolver),
|
||||
|
||||
Override(new(*retrieval.Client), retrieval.NewClient),
|
||||
Override(new(datatransfer.ClientDataTransfer), datatransfer.NewClientDAGServiceDataTransfer),
|
||||
Override(new(dtypes.ClientDealStore), modules.NewClientDealStore),
|
||||
Override(new(dtypes.ClientDataTransfer), modules.NewClientDAGServiceDataTransfer),
|
||||
Override(new(*deals.ClientRequestValidator), deals.NewClientRequestValidator),
|
||||
Override(new(*deals.Client), deals.NewClient),
|
||||
Override(RegisterClientValidatorKey, modules.RegisterClientValidator),
|
||||
@ -244,7 +244,8 @@ func Online() Option {
|
||||
Override(new(dtypes.StagingDAG), modules.StagingDAG),
|
||||
|
||||
Override(new(*retrieval.Miner), retrieval.NewMiner),
|
||||
Override(new(datatransfer.ProviderDataTransfer), datatransfer.NewProviderDAGServiceDataTransfer),
|
||||
Override(new(dtypes.ProviderDealStore), modules.NewProviderDealStore),
|
||||
Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer),
|
||||
Override(new(*deals.ProviderRequestValidator), deals.NewProviderRequestValidator),
|
||||
Override(new(*deals.Provider), deals.NewProvider),
|
||||
Override(RegisterProviderValidatorKey, modules.RegisterProviderValidator),
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
|
||||
"github.com/filecoin-project/lotus/lib/statestore"
|
||||
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||
"github.com/ipfs/go-bitswap"
|
||||
"github.com/ipfs/go-bitswap/network"
|
||||
@ -47,10 +48,21 @@ func ClientBlockstore(fstore dtypes.ClientFilestore) dtypes.ClientBlockstore {
|
||||
// RegisterClientValidator is an initialization hook that registers the client
|
||||
// request validator with the data transfer module as the validator for
|
||||
// StorageDataTransferVoucher types
|
||||
func RegisterClientValidator(crv *deals.ClientRequestValidator, dtm datatransfer.ClientDataTransfer) {
|
||||
func RegisterClientValidator(crv *deals.ClientRequestValidator, dtm dtypes.ClientDataTransfer) {
|
||||
dtm.RegisterVoucherType(reflect.TypeOf(deals.StorageDataTransferVoucher{}), crv)
|
||||
}
|
||||
|
||||
// NewClientDAGServiceDataTransfer returns a data transfer manager that just
|
||||
// uses the clients's Client DAG service for transfers
|
||||
func NewClientDAGServiceDataTransfer(dag dtypes.ClientDAG) dtypes.ClientDataTransfer {
|
||||
return datatransfer.NewDAGServiceDataTransfer(dag)
|
||||
}
|
||||
|
||||
// NewClientDealStore creates a statestore for the client to store its deals
|
||||
func NewClientDealStore(ds dtypes.MetadataDS) dtypes.ClientDealStore {
|
||||
return statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client")))
|
||||
}
|
||||
|
||||
func ClientDAG(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.ClientBlockstore, rt routing.Routing, h host.Host) dtypes.ClientDAG {
|
||||
bitswapNetwork := network.NewFromIpfsHost(h, rt)
|
||||
exch := bitswap.New(helpers.LifecycleCtx(mctx, lc), bitswapNetwork, ibs)
|
||||
|
@ -7,6 +7,9 @@ import (
|
||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
exchange "github.com/ipfs/go-ipfs-exchange-interface"
|
||||
ipld "github.com/ipfs/go-ipld-format"
|
||||
|
||||
"github.com/filecoin-project/lotus/datatransfer"
|
||||
"github.com/filecoin-project/lotus/lib/statestore"
|
||||
)
|
||||
|
||||
// MetadataDS stores metadata
|
||||
@ -23,5 +26,14 @@ type ChainBlockService bserv.BlockService
|
||||
type ClientFilestore *filestore.Filestore
|
||||
type ClientBlockstore blockstore.Blockstore
|
||||
type ClientDAG ipld.DAGService
|
||||
type ClientDealStore *statestore.StateStore
|
||||
|
||||
// ClientDataTransfer is a data transfer manager for the client
|
||||
type ClientDataTransfer datatransfer.Manager
|
||||
|
||||
type ProviderDealStore *statestore.StateStore
|
||||
|
||||
// ProviderDataTransfer is a data transfer manager for the provider
|
||||
type ProviderDataTransfer datatransfer.Manager
|
||||
|
||||
type StagingDAG ipld.DAGService
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
"github.com/ipfs/go-bitswap/network"
|
||||
"github.com/ipfs/go-blockservice"
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-datastore/namespace"
|
||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
"github.com/ipfs/go-merkledag"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
@ -25,6 +26,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/deals"
|
||||
"github.com/filecoin-project/lotus/datatransfer"
|
||||
"github.com/filecoin-project/lotus/lib/sectorbuilder"
|
||||
"github.com/filecoin-project/lotus/lib/statestore"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
@ -135,10 +137,21 @@ func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h *de
|
||||
// RegisterProviderValidator is an initialization hook that registers the provider
|
||||
// request validator with the data transfer module as the validator for
|
||||
// StorageDataTransferVoucher types
|
||||
func RegisterProviderValidator(mrv *deals.ProviderRequestValidator, dtm datatransfer.ProviderDataTransfer) {
|
||||
func RegisterProviderValidator(mrv *deals.ProviderRequestValidator, dtm dtypes.ProviderDataTransfer) {
|
||||
dtm.RegisterVoucherType(reflect.TypeOf(deals.StorageDataTransferVoucher{}), mrv)
|
||||
}
|
||||
|
||||
// NewProviderDAGServiceDataTransfer returns a data transfer manager that just
|
||||
// uses the provider's Staging DAG service for transfers
|
||||
func NewProviderDAGServiceDataTransfer(dag dtypes.StagingDAG) dtypes.ProviderDataTransfer {
|
||||
return datatransfer.NewDAGServiceDataTransfer(dag)
|
||||
}
|
||||
|
||||
// NewProviderDealStore creates a statestore for the client to store its deals
|
||||
func NewProviderDealStore(ds dtypes.MetadataDS) dtypes.ProviderDealStore {
|
||||
return statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client")))
|
||||
}
|
||||
|
||||
func StagingDAG(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, rt routing.Routing, h host.Host) (dtypes.StagingDAG, error) {
|
||||
stagingds, err := r.Datastore("/staging")
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user