From a586d42c3b7396a0bc704f683d1471d66ff7497b Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 26 Feb 2021 15:45:30 +0200 Subject: [PATCH] make hot store DI injectable in the split store, default to badger. --- chain/store/splitstore/splitstore.go | 19 +---- node/builder.go | 40 ++++++++- node/config/def.go | 12 ++- node/modules/blockstore.go | 119 +++++++++++++++++---------- node/modules/chain.go | 8 +- node/modules/dtypes/storage.go | 6 ++ node/repo/interface.go | 1 + 7 files changed, 133 insertions(+), 72 deletions(-) diff --git a/chain/store/splitstore/splitstore.go b/chain/store/splitstore/splitstore.go index d1edd9715..cd6390cae 100644 --- a/chain/store/splitstore/splitstore.go +++ b/chain/store/splitstore/splitstore.go @@ -16,7 +16,6 @@ import ( dstore "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log/v2" - lmdbbs "github.com/filecoin-project/go-bs-lmdb" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/store" @@ -59,32 +58,16 @@ var _ bstore.Blockstore = (*SplitStore)(nil) // NewSplitStore creates a new SplitStore instance, given a path for the hotstore dbs and a cold // blockstore. The SplitStore must be attached to the ChainStore with Start in order to trigger // compaction. -func NewSplitStore(path string, ds dstore.Datastore, cold bstore.Blockstore) (*SplitStore, error) { - // the hot store - path = filepath.Join(path, "hot.db") - hot, err := lmdbbs.Open(&lmdbbs.Options{ - Path: path, - InitialMmapSize: 4 << 30, // 4GiB. - MmapGrowthStepFactor: 1.25, // scale slower than the default of 1.5 - MmapGrowthStepMax: 4 << 30, // 4GiB - RetryDelay: 10 * time.Microsecond, - MaxReaders: 16384, - }) - if err != nil { - return nil, err - } - +func NewSplitStore(path string, ds dstore.Datastore, cold, hot bstore.Blockstore) (*SplitStore, error) { // the tracking store snoop, err := NewTrackingStore(filepath.Join(path, "snoop.db")) if err != nil { - hot.Close() //nolint:errcheck return nil, err } // the liveset env env, err := NewLiveSetEnv(filepath.Join(path, "sweep.db")) if err != nil { - hot.Close() //nolint:errcheck snoop.Close() //nolint:errcheck return nil, err } diff --git a/node/builder.go b/node/builder.go index f4aebd429..d4569a402 100644 --- a/node/builder.go +++ b/node/builder.go @@ -586,15 +586,47 @@ func Repo(r repo.Repo) Option { return err } + var cfg *config.Blockstore + switch settings.nodeType { + case repo.FullNode: + cfgp, ok := c.(*config.FullNode) + if !ok { + return xerrors.Errorf("invalid config from repo, got: %T", c) + } + cfg = &cfgp.Blockstore + case repo.StorageMiner: + cfgp, ok := c.(*config.StorageMiner) + if !ok { + return xerrors.Errorf("invalid config from repo, got: %T", c) + } + cfg = &cfgp.Blockstore + default: + cfg = &config.Blockstore{} + } + return Options( Override(new(repo.LockedRepo), modules.LockedRepo(lr)), // module handles closing Override(new(dtypes.MetadataDS), modules.Datastore), Override(new(dtypes.UniversalBlockstore), modules.UniversalBlockstore), - Override(new(dtypes.SplitBlockstore), modules.SplitBlockstore), - Override(new(dtypes.ChainBlockstore), modules.ChainBlockstore), - Override(new(dtypes.StateBlockstore), modules.StateBlockstore), - Override(new(dtypes.ExposedBlockstore), From(new(dtypes.UniversalBlockstore))), + + If(cfg.Splitstore, + If(cfg.UseLMDB, + Override(new(dtypes.HotBlockstore), modules.LMDBHotBlockstore)), + If(!cfg.UseLMDB, + Override(new(dtypes.HotBlockstore), modules.BadgerHotBlockstore)), + Override(new(dtypes.SplitBlockstore), modules.SplitBlockstore), + Override(new(dtypes.ChainBlockstore), modules.ChainSplitBlockstore), + Override(new(dtypes.StateBlockstore), modules.StateSplitBlockstore), + Override(new(dtypes.BaseBlockstore), From(new(dtypes.SplitBlockstore))), + Override(new(dtypes.ExposedBlockstore), From(new(dtypes.SplitBlockstore))), + ), + If(!cfg.Splitstore, + Override(new(dtypes.ChainBlockstore), modules.ChainFlatBlockstore), + Override(new(dtypes.StateBlockstore), modules.StateFlatBlockstore), + Override(new(dtypes.BaseBlockstore), From(new(dtypes.UniversalBlockstore))), + Override(new(dtypes.ExposedBlockstore), From(new(dtypes.UniversalBlockstore))), + ), If(os.Getenv("LOTUS_ENABLE_CHAINSTORE_FALLBACK") == "1", Override(new(dtypes.ChainBlockstore), modules.FallbackChainBlockstore), diff --git a/node/config/def.go b/node/config/def.go index 579f123c8..56bba08ed 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -12,9 +12,10 @@ import ( // Common is common config between full node and miner type Common struct { - API API - Libp2p Libp2p - Pubsub Pubsub + API API + Libp2p Libp2p + Pubsub Pubsub + Blockstore Blockstore } // FullNode is a full node config @@ -119,6 +120,11 @@ type Pubsub struct { RemoteTracer string } +type Blockstore struct { + Splitstore bool + UseLMDB bool +} + // // Full Node type Metrics struct { diff --git a/node/modules/blockstore.go b/node/modules/blockstore.go index 160ac8fc4..57c916865 100644 --- a/node/modules/blockstore.go +++ b/node/modules/blockstore.go @@ -3,7 +3,12 @@ package modules import ( "context" "io" + "os" + "path/filepath" + "time" + lmdbbs "github.com/filecoin-project/go-bs-lmdb" + badgerbs "github.com/filecoin-project/lotus/lib/blockstore/badger" bstore "github.com/ipfs/go-ipfs-blockstore" "go.uber.org/fx" "golang.org/x/xerrors" @@ -32,13 +37,71 @@ func UniversalBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.Locked return bs, err } -func SplitBlockstore(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, bs dtypes.UniversalBlockstore) (dtypes.SplitBlockstore, error) { +func LMDBHotBlockstore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.HotBlockstore, error) { path, err := r.SplitstorePath() if err != nil { return nil, err } - ss, err := splitstore.NewSplitStore(path, ds, bs) + path = filepath.Join(path, "hot.db") + bs, err := lmdbbs.Open(&lmdbbs.Options{ + Path: path, + InitialMmapSize: 4 << 30, // 4GiB. + MmapGrowthStepFactor: 1.25, // scale slower than the default of 1.5 + MmapGrowthStepMax: 4 << 30, // 4GiB + RetryDelay: 10 * time.Microsecond, + MaxReaders: 1024, + }) + if err != nil { + return nil, err + } + + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { + return bs.Close() + }}) + + hot := blockstore.WrapIDStore(bs) + return hot, err +} + +func BadgerHotBlockstore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.HotBlockstore, error) { + path, err := r.SplitstorePath() + if err != nil { + return nil, err + } + + path = filepath.Join(path, "hot.bs") + if err := os.MkdirAll(path, 0755); err != nil { + return nil, err + } + + opts, err := repo.BadgerBlockstoreOptions(repo.HotBlockstore, path, r.Readonly()) + if err != nil { + return nil, err + } + + bs, err := badgerbs.Open(opts) + if err != nil { + return nil, err + } + + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { + return bs.Close() + }}) + + hot := blockstore.WrapIDStore(bs) + return hot, err +} + +func SplitBlockstore(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.ColdBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, error) { + path, err := r.SplitstorePath() + if err != nil { + return nil, err + } + + ss, err := splitstore.NewSplitStore(path, ds, cold, hot) if err != nil { return nil, err } @@ -51,48 +114,20 @@ func SplitBlockstore(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, b return ss, err } -// StateBlockstore returns the blockstore to use to store the state tree. -// StateBlockstore is a hook to overlay caches for state objects, or in the -// future, to segregate the universal blockstore into different physical state -// and chain stores. -func StateBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.SplitBlockstore) (dtypes.StateBlockstore, error) { - sbs, err := blockstore.WrapFreecacheCache(helpers.LifecycleCtx(mctx, lc), bs, blockstore.FreecacheConfig{ - Name: "state", - BlockCapacity: 288 * 1024 * 1024, // 288MiB. - ExistsCapacity: 48 * 1024 * 1024, // 48MiB. - }) - if err != nil { - return nil, err - } - // this may end up double closing the underlying blockstore, but all - // blockstores should be lenient or idempotent on double-close. The native - // badger blockstore is (and unit tested). - if c, ok := bs.(io.Closer); ok { - lc.Append(closerStopHook(c)) - } - return sbs, nil +func StateFlatBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.ColdBlockstore) (dtypes.StateBlockstore, error) { + return bs, nil } -// ChainBlockstore returns the blockstore to use for chain data (tipsets, blocks, messages). -// ChainBlockstore is a hook to overlay caches for state objects, or in the -// future, to segregate the universal blockstore into different physical state -// and chain stores. -func ChainBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.SplitBlockstore) (dtypes.ChainBlockstore, error) { - cbs, err := blockstore.WrapFreecacheCache(helpers.LifecycleCtx(mctx, lc), bs, blockstore.FreecacheConfig{ - Name: "chain", - BlockCapacity: 64 * 1024 * 1024, // 64MiB. - ExistsCapacity: 16 * 1024, // 16MiB. - }) - if err != nil { - return nil, err - } - // this may end up double closing the underlying blockstore, but all - // blockstores should be lenient or idempotent on double-close. The native - // badger blockstore is (and unit tested). - if c, ok := bs.(io.Closer); ok { - lc.Append(closerStopHook(c)) - } - return cbs, nil +func StateSplitBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.SplitBlockstore) (dtypes.StateBlockstore, error) { + return bs, nil +} + +func ChainFlatBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.ColdBlockstore) (dtypes.ChainBlockstore, error) { + return bs, nil +} + +func ChainSplitBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.SplitBlockstore) (dtypes.ChainBlockstore, error) { + return bs, nil } func FallbackChainBlockstore(cbs dtypes.ChainBlockstore) dtypes.ChainBlockstore { diff --git a/node/modules/chain.go b/node/modules/chain.go index 0108a6282..a59418688 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -73,20 +73,18 @@ func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds return mp, nil } -func ChainStore(lc fx.Lifecycle, cbs dtypes.ChainBlockstore, sbs dtypes.StateBlockstore, ds dtypes.MetadataDS, ss dtypes.SplitBlockstore, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore { +func ChainStore(lc fx.Lifecycle, cbs dtypes.ChainBlockstore, sbs dtypes.StateBlockstore, ds dtypes.MetadataDS, basebs dtypes.BaseBlockstore, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore { chain := store.NewChainStore(cbs, sbs, ds, syscalls, j) if err := chain.Load(); err != nil { log.Warnf("loading chain state from disk: %s", err) } - if ssp, ok := ss.(*splitstore.SplitStore); ok { - err := ssp.Start(chain) + if ss, ok := basebs.(*splitstore.SplitStore); ok { + err := ss.Start(chain) if err != nil { log.Errorf("error starting splitstore: %s", err) } - } else { - log.Warnf("unexpected splitstore type: %+v", ss) } lc.Append(fx.Hook{ diff --git a/node/modules/dtypes/storage.go b/node/modules/dtypes/storage.go index 4d1d957c0..216ccc1b1 100644 --- a/node/modules/dtypes/storage.go +++ b/node/modules/dtypes/storage.go @@ -27,9 +27,15 @@ type ( // UniversalBlockstore is the cold blockstore. UniversalBlockstore blockstore.Blockstore + // HotBlockstore is the Hot blockstore abstraction for the splitstore + HotBlockstore blockstore.Blockstore + // SplitBlockstore is the hot/cold blockstore that sits on top of the ColdBlockstore. SplitBlockstore blockstore.Blockstore + // BaseBlockstore is something, coz DI + BaseBlockstore blockstore.Blockstore + // ChainBlockstore is a blockstore to store chain data (tipsets, blocks, // messages). It is physically backed by the BareMonolithBlockstore, but it // has a cache on top that is specially tuned for chain data access diff --git a/node/repo/interface.go b/node/repo/interface.go index 8c24caac4..d4afbe2a0 100644 --- a/node/repo/interface.go +++ b/node/repo/interface.go @@ -23,6 +23,7 @@ const ( // well as state. In the future, they may get segregated into different // domains. UniversalBlockstore = BlockstoreDomain("universal") + HotBlockstore = BlockstoreDomain("hot") ) var (