More separation for storage types in di modules

This commit is contained in:
Łukasz Magiera 2019-08-01 16:14:16 +02:00
parent 5ef145463a
commit 6a4b9a6515
8 changed files with 124 additions and 81 deletions

View File

@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/lib/cborrpc"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
@ -189,7 +190,7 @@ type BlockSync struct {
syncPeers map[peer.ID]struct{}
}
func NewBlockSyncClient(bserv bserv.BlockService, h host.Host) *BlockSync {
func NewBlockSyncClient(bserv dtypes.ChainBlockService, h host.Host) *BlockSync {
return &BlockSync{
bserv: bserv,
newStream: h.NewStream,

View File

@ -6,13 +6,7 @@ import (
"reflect"
"time"
"github.com/ipfs/go-filestore"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
bserv "github.com/ipfs/go-blockservice"
"github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
ipld "github.com/ipfs/go-ipld-format"
ci "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
@ -33,6 +27,7 @@ import (
"github.com/filecoin-project/go-lotus/node/hello"
"github.com/filecoin-project/go-lotus/node/impl"
"github.com/filecoin-project/go-lotus/node/modules"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
"github.com/filecoin-project/go-lotus/node/modules/helpers"
"github.com/filecoin-project/go-lotus/node/modules/lp2p"
"github.com/filecoin-project/go-lotus/node/modules/testing"
@ -193,11 +188,11 @@ func Online() Option {
Override(new(*store.ChainStore), modules.ChainStore),
Override(new(blockstore.GCLocker), blockstore.NewGCLocker),
Override(new(blockstore.GCBlockstore), blockstore.NewGCBlockstore),
Override(new(exchange.Interface), modules.Bitswap),
Override(new(bserv.BlockService), bserv.New),
Override(new(ipld.DAGService), testing.MemoryClientDag),
Override(new(dtypes.ChainGCLocker), blockstore.NewGCLocker),
Override(new(dtypes.ChainGCBlockstore), modules.ChainGCBlockstore),
Override(new(dtypes.ChainExchange), modules.ChainExchange),
Override(new(dtypes.ChainBlockService), modules.ChainBlockservice),
Override(new(dtypes.ClientDAG), testing.MemoryClientDag),
// Filecoin services
Override(new(*chain.Syncer), chain.NewSyncer),
@ -274,11 +269,11 @@ func Repo(r repo.Repo) Option {
Config(cfg),
Override(new(repo.LockedRepo), modules.LockedRepo(lr)), // module handles closing
Override(new(datastore.Batching), modules.Datastore),
Override(new(blockstore.Blockstore), modules.Blockstore),
Override(new(dtypes.MetadataDS), modules.Datastore),
Override(new(dtypes.ChainBlockstore), modules.Blockstore),
Override(new(*filestore.Filestore), modules.ClientFstore),
Override(new(ipld.DAGService), modules.ClientDAG),
Override(new(dtypes.ClientFilestore), modules.ClientFstore),
Override(new(dtypes.ClientDAG), modules.ClientDAG),
Override(new(ci.PrivKey), pk),
Override(new(ci.PubKey), ci.PrivKey.GetPublic),

View File

@ -6,6 +6,8 @@ import (
"os"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
"github.com/ipfs/go-filestore"
"go.uber.org/fx"
@ -20,8 +22,8 @@ import (
type LocalStorage struct {
fx.In
LocalDAG ipld.DAGService
Filestore *filestore.Filestore `optional:"true"`
LocalDAG dtypes.ClientDAG
Filestore dtypes.ClientFilestore `optional:"true"`
}
func (s *LocalStorage) ClientImport(ctx context.Context, path string) (cid.Cid, error) {

View File

@ -11,17 +11,9 @@ import (
"github.com/gbrlsnchs/jwt/v3"
"github.com/ipfs/go-bitswap"
"github.com/ipfs/go-bitswap/network"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-car"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-filestore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
offline "github.com/ipfs/go-ipfs-exchange-offline"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
"github.com/ipfs/go-merkledag"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/routing"
@ -36,6 +28,7 @@ import (
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/chain/wallet"
"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
"github.com/filecoin-project/go-lotus/node/modules/helpers"
"github.com/filecoin-project/go-lotus/node/repo"
"github.com/filecoin-project/go-lotus/storage"
@ -52,7 +45,7 @@ func RecordValidator(ps peerstore.Peerstore) record.Validator {
}
}
func Bitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.Routing, bs blockstore.GCBlockstore) exchange.Interface {
func ChainExchange(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.Routing, bs dtypes.ChainGCBlockstore) dtypes.ChainExchange {
bitswapNetwork := network.NewFromIpfsHost(host, rt)
exch := bitswap.New(helpers.LifecycleCtx(mctx, lc), bitswapNetwork, bs)
lc.Append(fx.Hook{
@ -142,50 +135,7 @@ func APISecret(keystore types.KeyStore, lr repo.LockedRepo) (*APIAlg, error) {
return (*APIAlg)(jwt.NewHS256(key.PrivateKey)), nil
}
func Datastore(r repo.LockedRepo) (datastore.Batching, error) {
return r.Datastore("/metadata")
}
func Blockstore(r repo.LockedRepo) (blockstore.Blockstore, error) {
blocks, err := r.Datastore("/blocks")
if err != nil {
return nil, err
}
bs := blockstore.NewBlockstore(blocks)
return blockstore.NewIdStore(bs), nil
}
func ClientFstore(r repo.LockedRepo) (*filestore.Filestore, error) {
clientds, err := r.Datastore("/client")
if err != nil {
return nil, err
}
blocks := namespace.Wrap(clientds, datastore.NewKey("blocks"))
fm := filestore.NewFileManager(clientds, filepath.Dir(r.Path()))
fm.AllowFiles = true
// TODO: fm.AllowUrls (needs more code in client import)
bs := blockstore.NewBlockstore(blocks)
return filestore.NewFilestore(bs, fm), nil
}
func ClientDAG(lc fx.Lifecycle, fstore *filestore.Filestore) ipld.DAGService {
ibs := blockstore.NewIdStore(fstore)
bsvc := blockservice.New(ibs, offline.Exchange(ibs))
dag := merkledag.NewDAGService(bsvc)
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
return bsvc.Close()
},
})
return dag
}
func ChainStore(lc fx.Lifecycle, bs blockstore.Blockstore, ds datastore.Batching) *store.ChainStore {
func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, ds dtypes.MetadataDS) *store.ChainStore {
chain := store.NewChainStore(bs, ds)
lc.Append(fx.Hook{
@ -203,8 +153,8 @@ func ErrorGenesis() Genesis {
}
}
func LoadGenesis(genBytes []byte) func(blockstore.Blockstore) Genesis {
return func(bs blockstore.Blockstore) Genesis {
func LoadGenesis(genBytes []byte) func(dtypes.ChainBlockstore) Genesis {
return func(bs dtypes.ChainBlockstore) Genesis {
return func() (header *types.BlockHeader, e error) {
c, err := car.LoadCar(bs, bytes.NewReader(genBytes))
if err != nil {
@ -270,7 +220,7 @@ func SectorBuilder(mctx helpers.MetricsCtx, lc fx.Lifecycle, sbc *sectorbuilder.
return sb, nil
}
func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds datastore.Batching, sb *sectorbuilder.SectorBuilder, w *wallet.Wallet) (*storage.Miner, error) {
func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder, w *wallet.Wallet) (*storage.Miner, error) {
maddrb, err := ds.Get(datastore.NewKey("miner-address"))
if err != nil {
return nil, err

View File

@ -0,0 +1,25 @@
package dtypes
import (
bserv "github.com/ipfs/go-blockservice"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-filestore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
ipld "github.com/ipfs/go-ipld-format"
)
// MetadataDS stores metadata
// dy default it's namespaced under /metadata in main repo datastore
type MetadataDS datastore.Batching
type ChainBlockstore blockstore.Blockstore
type ChainGCLocker blockstore.GCLocker
type ChainGCBlockstore blockstore.GCBlockstore
type ChainExchange exchange.Interface
type ChainBlockService bserv.BlockService
type ClientFilestore *filestore.Filestore
type ClientDAG ipld.DAGService

View File

@ -4,12 +4,11 @@ import (
"context"
"fmt"
"github.com/ipfs/go-datastore"
nilrouting "github.com/ipfs/go-ipfs-routing/none"
"github.com/libp2p/go-libp2p"
host "github.com/libp2p/go-libp2p-core/host"
peer "github.com/libp2p/go-libp2p-core/peer"
peerstore "github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
dht "github.com/libp2p/go-libp2p-kad-dht"
dhtopts "github.com/libp2p/go-libp2p-kad-dht/opts"
record "github.com/libp2p/go-libp2p-record"
@ -17,6 +16,7 @@ import (
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"go.uber.org/fx"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
"github.com/filecoin-project/go-lotus/node/modules/helpers"
)
@ -65,7 +65,7 @@ func MockHost(mn mocknet.Mocknet, id peer.ID, ps peerstore.Peerstore) (RawHost,
}
func DHTRouting(client bool) interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host RawHost, dstore datastore.Batching, validator record.Validator) (BaseIpfsRouting, error) {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host RawHost, dstore dtypes.MetadataDS, validator record.Validator) (BaseIpfsRouting, error) {
ctx := helpers.LifecycleCtx(mctx, lc)
d, err := dht.New(

69
node/modules/storage.go Normal file
View File

@ -0,0 +1,69 @@
package modules
import (
"context"
"path/filepath"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-filestore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
offline "github.com/ipfs/go-ipfs-exchange-offline"
"github.com/ipfs/go-merkledag"
"go.uber.org/fx"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
"github.com/filecoin-project/go-lotus/node/repo"
)
func Datastore(r repo.LockedRepo) (dtypes.MetadataDS, error) {
return r.Datastore("/metadata")
}
func Blockstore(r repo.LockedRepo) (dtypes.ChainBlockstore, error) {
blocks, err := r.Datastore("/blocks")
if err != nil {
return nil, err
}
bs := blockstore.NewBlockstore(blocks)
return blockstore.NewIdStore(bs), nil
}
func ClientFstore(r repo.LockedRepo) (dtypes.ClientFilestore, error) {
clientds, err := r.Datastore("/client")
if err != nil {
return nil, err
}
blocks := namespace.Wrap(clientds, datastore.NewKey("blocks"))
fm := filestore.NewFileManager(clientds, filepath.Dir(r.Path()))
fm.AllowFiles = true
// TODO: fm.AllowUrls (needs more code in client import)
bs := blockstore.NewBlockstore(blocks)
return filestore.NewFilestore(bs, fm), nil
}
func ClientDAG(lc fx.Lifecycle, fstore dtypes.ClientFilestore) dtypes.ClientDAG {
ibs := blockstore.NewIdStore((*filestore.Filestore)(fstore))
bsvc := blockservice.New(ibs, offline.Exchange(ibs))
dag := merkledag.NewDAGService(bsvc)
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
return bsvc.Close()
},
})
return dag
}
func ChainGCBlockstore(bs dtypes.ChainBlockstore, gcl dtypes.ChainGCLocker) dtypes.ChainGCBlockstore {
return blockstore.NewGCBlockstore(bs, gcl)
}
func ChainBlockservice(bs dtypes.ChainBlockstore, rem dtypes.ChainExchange) dtypes.ChainBlockService {
return blockservice.New(bs, rem)
}

View File

@ -18,6 +18,7 @@ import (
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/chain/wallet"
"github.com/filecoin-project/go-lotus/node/modules"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
)
var glog = logging.Logger("genesis")
@ -44,8 +45,8 @@ func MakeGenesisMem(out io.Writer) func(bs blockstore.Blockstore, w *wallet.Wall
}
}
func MakeGenesis(outFile string) func(bs blockstore.Blockstore, w *wallet.Wallet) modules.Genesis {
return func(bs blockstore.Blockstore, w *wallet.Wallet) modules.Genesis {
func MakeGenesis(outFile string) func(bs dtypes.ChainBlockstore, w *wallet.Wallet) modules.Genesis {
return func(bs dtypes.ChainBlockstore, w *wallet.Wallet) modules.Genesis {
return func() (*types.BlockHeader, error) {
glog.Warn("Generating new random genesis block, note that this SHOULD NOT happen unless you are setting up new network")
minerAddr, err := w.GenerateKey(types.KTSecp256k1)