make hot store DI injectable in the split store, default to badger.
This commit is contained in:
parent
7044e623f9
commit
a586d42c3b
@ -16,7 +16,6 @@ import (
|
|||||||
dstore "github.com/ipfs/go-datastore"
|
dstore "github.com/ipfs/go-datastore"
|
||||||
logging "github.com/ipfs/go-log/v2"
|
logging "github.com/ipfs/go-log/v2"
|
||||||
|
|
||||||
lmdbbs "github.com/filecoin-project/go-bs-lmdb"
|
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/store"
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
@ -59,32 +58,16 @@ var _ bstore.Blockstore = (*SplitStore)(nil)
|
|||||||
// NewSplitStore creates a new SplitStore instance, given a path for the hotstore dbs and a cold
|
// NewSplitStore creates a new SplitStore instance, given a path for the hotstore dbs and a cold
|
||||||
// blockstore. The SplitStore must be attached to the ChainStore with Start in order to trigger
|
// blockstore. The SplitStore must be attached to the ChainStore with Start in order to trigger
|
||||||
// compaction.
|
// compaction.
|
||||||
func NewSplitStore(path string, ds dstore.Datastore, cold bstore.Blockstore) (*SplitStore, error) {
|
func NewSplitStore(path string, ds dstore.Datastore, cold, hot bstore.Blockstore) (*SplitStore, error) {
|
||||||
// the hot store
|
|
||||||
path = filepath.Join(path, "hot.db")
|
|
||||||
hot, err := lmdbbs.Open(&lmdbbs.Options{
|
|
||||||
Path: path,
|
|
||||||
InitialMmapSize: 4 << 30, // 4GiB.
|
|
||||||
MmapGrowthStepFactor: 1.25, // scale slower than the default of 1.5
|
|
||||||
MmapGrowthStepMax: 4 << 30, // 4GiB
|
|
||||||
RetryDelay: 10 * time.Microsecond,
|
|
||||||
MaxReaders: 16384,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// the tracking store
|
// the tracking store
|
||||||
snoop, err := NewTrackingStore(filepath.Join(path, "snoop.db"))
|
snoop, err := NewTrackingStore(filepath.Join(path, "snoop.db"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
hot.Close() //nolint:errcheck
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// the liveset env
|
// the liveset env
|
||||||
env, err := NewLiveSetEnv(filepath.Join(path, "sweep.db"))
|
env, err := NewLiveSetEnv(filepath.Join(path, "sweep.db"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
hot.Close() //nolint:errcheck
|
|
||||||
snoop.Close() //nolint:errcheck
|
snoop.Close() //nolint:errcheck
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -586,15 +586,47 @@ func Repo(r repo.Repo) Option {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var cfg *config.Blockstore
|
||||||
|
switch settings.nodeType {
|
||||||
|
case repo.FullNode:
|
||||||
|
cfgp, ok := c.(*config.FullNode)
|
||||||
|
if !ok {
|
||||||
|
return xerrors.Errorf("invalid config from repo, got: %T", c)
|
||||||
|
}
|
||||||
|
cfg = &cfgp.Blockstore
|
||||||
|
case repo.StorageMiner:
|
||||||
|
cfgp, ok := c.(*config.StorageMiner)
|
||||||
|
if !ok {
|
||||||
|
return xerrors.Errorf("invalid config from repo, got: %T", c)
|
||||||
|
}
|
||||||
|
cfg = &cfgp.Blockstore
|
||||||
|
default:
|
||||||
|
cfg = &config.Blockstore{}
|
||||||
|
}
|
||||||
|
|
||||||
return Options(
|
return Options(
|
||||||
Override(new(repo.LockedRepo), modules.LockedRepo(lr)), // module handles closing
|
Override(new(repo.LockedRepo), modules.LockedRepo(lr)), // module handles closing
|
||||||
|
|
||||||
Override(new(dtypes.MetadataDS), modules.Datastore),
|
Override(new(dtypes.MetadataDS), modules.Datastore),
|
||||||
Override(new(dtypes.UniversalBlockstore), modules.UniversalBlockstore),
|
Override(new(dtypes.UniversalBlockstore), modules.UniversalBlockstore),
|
||||||
|
|
||||||
|
If(cfg.Splitstore,
|
||||||
|
If(cfg.UseLMDB,
|
||||||
|
Override(new(dtypes.HotBlockstore), modules.LMDBHotBlockstore)),
|
||||||
|
If(!cfg.UseLMDB,
|
||||||
|
Override(new(dtypes.HotBlockstore), modules.BadgerHotBlockstore)),
|
||||||
Override(new(dtypes.SplitBlockstore), modules.SplitBlockstore),
|
Override(new(dtypes.SplitBlockstore), modules.SplitBlockstore),
|
||||||
Override(new(dtypes.ChainBlockstore), modules.ChainBlockstore),
|
Override(new(dtypes.ChainBlockstore), modules.ChainSplitBlockstore),
|
||||||
Override(new(dtypes.StateBlockstore), modules.StateBlockstore),
|
Override(new(dtypes.StateBlockstore), modules.StateSplitBlockstore),
|
||||||
|
Override(new(dtypes.BaseBlockstore), From(new(dtypes.SplitBlockstore))),
|
||||||
|
Override(new(dtypes.ExposedBlockstore), From(new(dtypes.SplitBlockstore))),
|
||||||
|
),
|
||||||
|
If(!cfg.Splitstore,
|
||||||
|
Override(new(dtypes.ChainBlockstore), modules.ChainFlatBlockstore),
|
||||||
|
Override(new(dtypes.StateBlockstore), modules.StateFlatBlockstore),
|
||||||
|
Override(new(dtypes.BaseBlockstore), From(new(dtypes.UniversalBlockstore))),
|
||||||
Override(new(dtypes.ExposedBlockstore), From(new(dtypes.UniversalBlockstore))),
|
Override(new(dtypes.ExposedBlockstore), From(new(dtypes.UniversalBlockstore))),
|
||||||
|
),
|
||||||
|
|
||||||
If(os.Getenv("LOTUS_ENABLE_CHAINSTORE_FALLBACK") == "1",
|
If(os.Getenv("LOTUS_ENABLE_CHAINSTORE_FALLBACK") == "1",
|
||||||
Override(new(dtypes.ChainBlockstore), modules.FallbackChainBlockstore),
|
Override(new(dtypes.ChainBlockstore), modules.FallbackChainBlockstore),
|
||||||
|
@ -15,6 +15,7 @@ type Common struct {
|
|||||||
API API
|
API API
|
||||||
Libp2p Libp2p
|
Libp2p Libp2p
|
||||||
Pubsub Pubsub
|
Pubsub Pubsub
|
||||||
|
Blockstore Blockstore
|
||||||
}
|
}
|
||||||
|
|
||||||
// FullNode is a full node config
|
// FullNode is a full node config
|
||||||
@ -119,6 +120,11 @@ type Pubsub struct {
|
|||||||
RemoteTracer string
|
RemoteTracer string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Blockstore struct {
|
||||||
|
Splitstore bool
|
||||||
|
UseLMDB bool
|
||||||
|
}
|
||||||
|
|
||||||
// // Full Node
|
// // Full Node
|
||||||
|
|
||||||
type Metrics struct {
|
type Metrics struct {
|
||||||
|
@ -3,7 +3,12 @@ package modules
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
lmdbbs "github.com/filecoin-project/go-bs-lmdb"
|
||||||
|
badgerbs "github.com/filecoin-project/lotus/lib/blockstore/badger"
|
||||||
bstore "github.com/ipfs/go-ipfs-blockstore"
|
bstore "github.com/ipfs/go-ipfs-blockstore"
|
||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
@ -32,13 +37,71 @@ func UniversalBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.Locked
|
|||||||
return bs, err
|
return bs, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func SplitBlockstore(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, bs dtypes.UniversalBlockstore) (dtypes.SplitBlockstore, error) {
|
func LMDBHotBlockstore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.HotBlockstore, error) {
|
||||||
path, err := r.SplitstorePath()
|
path, err := r.SplitstorePath()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
ss, err := splitstore.NewSplitStore(path, ds, bs)
|
path = filepath.Join(path, "hot.db")
|
||||||
|
bs, err := lmdbbs.Open(&lmdbbs.Options{
|
||||||
|
Path: path,
|
||||||
|
InitialMmapSize: 4 << 30, // 4GiB.
|
||||||
|
MmapGrowthStepFactor: 1.25, // scale slower than the default of 1.5
|
||||||
|
MmapGrowthStepMax: 4 << 30, // 4GiB
|
||||||
|
RetryDelay: 10 * time.Microsecond,
|
||||||
|
MaxReaders: 1024,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
lc.Append(fx.Hook{
|
||||||
|
OnStop: func(_ context.Context) error {
|
||||||
|
return bs.Close()
|
||||||
|
}})
|
||||||
|
|
||||||
|
hot := blockstore.WrapIDStore(bs)
|
||||||
|
return hot, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func BadgerHotBlockstore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.HotBlockstore, error) {
|
||||||
|
path, err := r.SplitstorePath()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
path = filepath.Join(path, "hot.bs")
|
||||||
|
if err := os.MkdirAll(path, 0755); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
opts, err := repo.BadgerBlockstoreOptions(repo.HotBlockstore, path, r.Readonly())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
bs, err := badgerbs.Open(opts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
lc.Append(fx.Hook{
|
||||||
|
OnStop: func(_ context.Context) error {
|
||||||
|
return bs.Close()
|
||||||
|
}})
|
||||||
|
|
||||||
|
hot := blockstore.WrapIDStore(bs)
|
||||||
|
return hot, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func SplitBlockstore(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.ColdBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, error) {
|
||||||
|
path, err := r.SplitstorePath()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ss, err := splitstore.NewSplitStore(path, ds, cold, hot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -51,48 +114,20 @@ func SplitBlockstore(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, b
|
|||||||
return ss, err
|
return ss, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// StateBlockstore returns the blockstore to use to store the state tree.
|
func StateFlatBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.ColdBlockstore) (dtypes.StateBlockstore, error) {
|
||||||
// StateBlockstore is a hook to overlay caches for state objects, or in the
|
return bs, nil
|
||||||
// future, to segregate the universal blockstore into different physical state
|
|
||||||
// and chain stores.
|
|
||||||
func StateBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.SplitBlockstore) (dtypes.StateBlockstore, error) {
|
|
||||||
sbs, err := blockstore.WrapFreecacheCache(helpers.LifecycleCtx(mctx, lc), bs, blockstore.FreecacheConfig{
|
|
||||||
Name: "state",
|
|
||||||
BlockCapacity: 288 * 1024 * 1024, // 288MiB.
|
|
||||||
ExistsCapacity: 48 * 1024 * 1024, // 48MiB.
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
// this may end up double closing the underlying blockstore, but all
|
|
||||||
// blockstores should be lenient or idempotent on double-close. The native
|
|
||||||
// badger blockstore is (and unit tested).
|
|
||||||
if c, ok := bs.(io.Closer); ok {
|
|
||||||
lc.Append(closerStopHook(c))
|
|
||||||
}
|
|
||||||
return sbs, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ChainBlockstore returns the blockstore to use for chain data (tipsets, blocks, messages).
|
func StateSplitBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.SplitBlockstore) (dtypes.StateBlockstore, error) {
|
||||||
// ChainBlockstore is a hook to overlay caches for state objects, or in the
|
return bs, nil
|
||||||
// future, to segregate the universal blockstore into different physical state
|
|
||||||
// and chain stores.
|
|
||||||
func ChainBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.SplitBlockstore) (dtypes.ChainBlockstore, error) {
|
|
||||||
cbs, err := blockstore.WrapFreecacheCache(helpers.LifecycleCtx(mctx, lc), bs, blockstore.FreecacheConfig{
|
|
||||||
Name: "chain",
|
|
||||||
BlockCapacity: 64 * 1024 * 1024, // 64MiB.
|
|
||||||
ExistsCapacity: 16 * 1024, // 16MiB.
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
// this may end up double closing the underlying blockstore, but all
|
|
||||||
// blockstores should be lenient or idempotent on double-close. The native
|
func ChainFlatBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.ColdBlockstore) (dtypes.ChainBlockstore, error) {
|
||||||
// badger blockstore is (and unit tested).
|
return bs, nil
|
||||||
if c, ok := bs.(io.Closer); ok {
|
|
||||||
lc.Append(closerStopHook(c))
|
|
||||||
}
|
}
|
||||||
return cbs, nil
|
|
||||||
|
func ChainSplitBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.SplitBlockstore) (dtypes.ChainBlockstore, error) {
|
||||||
|
return bs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func FallbackChainBlockstore(cbs dtypes.ChainBlockstore) dtypes.ChainBlockstore {
|
func FallbackChainBlockstore(cbs dtypes.ChainBlockstore) dtypes.ChainBlockstore {
|
||||||
|
@ -73,20 +73,18 @@ func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds
|
|||||||
return mp, nil
|
return mp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func ChainStore(lc fx.Lifecycle, cbs dtypes.ChainBlockstore, sbs dtypes.StateBlockstore, ds dtypes.MetadataDS, ss dtypes.SplitBlockstore, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
|
func ChainStore(lc fx.Lifecycle, cbs dtypes.ChainBlockstore, sbs dtypes.StateBlockstore, ds dtypes.MetadataDS, basebs dtypes.BaseBlockstore, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
|
||||||
chain := store.NewChainStore(cbs, sbs, ds, syscalls, j)
|
chain := store.NewChainStore(cbs, sbs, ds, syscalls, j)
|
||||||
|
|
||||||
if err := chain.Load(); err != nil {
|
if err := chain.Load(); err != nil {
|
||||||
log.Warnf("loading chain state from disk: %s", err)
|
log.Warnf("loading chain state from disk: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if ssp, ok := ss.(*splitstore.SplitStore); ok {
|
if ss, ok := basebs.(*splitstore.SplitStore); ok {
|
||||||
err := ssp.Start(chain)
|
err := ss.Start(chain)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("error starting splitstore: %s", err)
|
log.Errorf("error starting splitstore: %s", err)
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
log.Warnf("unexpected splitstore type: %+v", ss)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
lc.Append(fx.Hook{
|
lc.Append(fx.Hook{
|
||||||
|
@ -27,9 +27,15 @@ type (
|
|||||||
// UniversalBlockstore is the cold blockstore.
|
// UniversalBlockstore is the cold blockstore.
|
||||||
UniversalBlockstore blockstore.Blockstore
|
UniversalBlockstore blockstore.Blockstore
|
||||||
|
|
||||||
|
// HotBlockstore is the Hot blockstore abstraction for the splitstore
|
||||||
|
HotBlockstore blockstore.Blockstore
|
||||||
|
|
||||||
// SplitBlockstore is the hot/cold blockstore that sits on top of the ColdBlockstore.
|
// SplitBlockstore is the hot/cold blockstore that sits on top of the ColdBlockstore.
|
||||||
SplitBlockstore blockstore.Blockstore
|
SplitBlockstore blockstore.Blockstore
|
||||||
|
|
||||||
|
// BaseBlockstore is something, coz DI
|
||||||
|
BaseBlockstore blockstore.Blockstore
|
||||||
|
|
||||||
// ChainBlockstore is a blockstore to store chain data (tipsets, blocks,
|
// ChainBlockstore is a blockstore to store chain data (tipsets, blocks,
|
||||||
// messages). It is physically backed by the BareMonolithBlockstore, but it
|
// messages). It is physically backed by the BareMonolithBlockstore, but it
|
||||||
// has a cache on top that is specially tuned for chain data access
|
// has a cache on top that is specially tuned for chain data access
|
||||||
|
@ -23,6 +23,7 @@ const (
|
|||||||
// well as state. In the future, they may get segregated into different
|
// well as state. In the future, they may get segregated into different
|
||||||
// domains.
|
// domains.
|
||||||
UniversalBlockstore = BlockstoreDomain("universal")
|
UniversalBlockstore = BlockstoreDomain("universal")
|
||||||
|
HotBlockstore = BlockstoreDomain("hot")
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
Loading…
Reference in New Issue
Block a user