hook splitstore into DI

This commit is contained in:
vyzo 2020-12-01 17:35:58 +02:00
parent e07c6c71c0
commit 622b4f7d9d
5 changed files with 72 additions and 5 deletions

View File

@ -591,6 +591,7 @@ func Repo(r repo.Repo) Option {
Override(new(dtypes.MetadataDS), modules.Datastore),
Override(new(dtypes.UniversalBlockstore), modules.UniversalBlockstore),
Override(new(dtypes.SplitBlockstore), modules.SplitBlockstore),
Override(new(dtypes.ChainBlockstore), modules.ChainBlockstore),
Override(new(dtypes.StateBlockstore), modules.StateBlockstore),
Override(new(dtypes.ExposedBlockstore), From(new(dtypes.UniversalBlockstore))),

View File

@ -9,6 +9,7 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/chain/store/splitstore"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo"
@ -31,18 +32,67 @@ func UniversalBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.Locked
return bs, err
}
func SplitBlockstore(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, bs dtypes.UniversalBlockstore) (dtypes.SplitBlockstore, error) {
path, err := r.SplitstorePath()
if err != nil {
return nil, err
}
ss, err := splitstore.NewSplitStore(path, ds, bs)
if err != nil {
return nil, err
}
lc.Append(fx.Hook{
OnStop: func(context.Context) error {
return ss.Close()
},
})
return ss, err
}
// StateBlockstore returns the blockstore to use to store the state tree.
// StateBlockstore is a hook to overlay caches for state objects, or in the
// future, to segregate the universal blockstore into different physical state
// and chain stores.
func StateBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.UniversalBlockstore) (dtypes.StateBlockstore, error) {
return bs, nil
func StateBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.SplitBlockstore) (dtypes.StateBlockstore, error) {
sbs, err := blockstore.WrapFreecacheCache(helpers.LifecycleCtx(mctx, lc), bs, blockstore.FreecacheConfig{
Name: "state",
BlockCapacity: 288 * 1024 * 1024, // 288MiB.
ExistsCapacity: 48 * 1024 * 1024, // 48MiB.
})
if err != nil {
return nil, err
}
// this may end up double closing the underlying blockstore, but all
// blockstores should be lenient or idempotent on double-close. The native
// badger blockstore is (and unit tested).
if c, ok := bs.(io.Closer); ok {
lc.Append(closerStopHook(c))
}
return sbs, nil
}
// ChainBlockstore returns the blockstore to use for chain data (tipsets, blocks, messages).
// ChainBlockstore is a hook to overlay caches for state objects, or in the
// future, to segregate the universal blockstore into different physical state
// and chain stores.
func ChainBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.UniversalBlockstore) (dtypes.ChainBlockstore, error) {
return bs, nil
func ChainBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.SplitBlockstore) (dtypes.ChainBlockstore, error) {
cbs, err := blockstore.WrapFreecacheCache(helpers.LifecycleCtx(mctx, lc), bs, blockstore.FreecacheConfig{
Name: "chain",
BlockCapacity: 64 * 1024 * 1024, // 64MiB.
ExistsCapacity: 16 * 1024, // 16MiB.
})
if err != nil {
return nil, err
}
// this may end up double closing the underlying blockstore, but all
// blockstores should be lenient or idempotent on double-close. The native
// badger blockstore is (and unit tested).
if c, ok := bs.(io.Closer); ok {
lc.Append(closerStopHook(c))
}
return cbs, nil
}
func FallbackChainBlockstore(cbs dtypes.ChainBlockstore) dtypes.ChainBlockstore {

View File

@ -22,6 +22,7 @@ import (
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/store/splitstore"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/journal"
@ -72,13 +73,22 @@ func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds
return mp, nil
}
func ChainStore(lc fx.Lifecycle, cbs dtypes.ChainBlockstore, sbs dtypes.StateBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
func ChainStore(lc fx.Lifecycle, cbs dtypes.ChainBlockstore, sbs dtypes.StateBlockstore, ds dtypes.MetadataDS, ss dtypes.SplitBlockstore, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
chain := store.NewChainStore(cbs, sbs, ds, syscalls, j)
if err := chain.Load(); err != nil {
log.Warnf("loading chain state from disk: %s", err)
}
if ssp, ok := ss.(*splitstore.SplitStore); ok {
err := ssp.Start(chain)
if err != nil {
log.Errorf("error starting splitstore: %s", err)
}
} else {
log.Warnf("unexpected splitstore type: %+v", ss)
}
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
return chain.Close()

View File

@ -27,6 +27,9 @@ type (
// UniversalBlockstore is the cold blockstore.
UniversalBlockstore blockstore.Blockstore
// SplitBlockstore is the hot/cold blockstore that sits on top of the ColdBlockstore.
SplitBlockstore blockstore.Blockstore
// ChainBlockstore is a blockstore to store chain data (tipsets, blocks,
// messages). It is physically backed by the BareMonolithBlockstore, but it
// has a cache on top that is specially tuned for chain data access

View File

@ -66,6 +66,9 @@ type LockedRepo interface {
// SplitstorePath returns the path for the SplitStore
SplitstorePath() (string, error)
// SplitstorePath returns the path for the SplitStore
SplitstorePath() (string, error)
// Returns config in this repo
Config() (interface{}, error)
SetConfig(func(interface{})) error