add configuration for splitstore and default to a simple compaction algorithm
This commit is contained in:
parent
2e4d45ef07
commit
73259aa350
@ -38,12 +38,29 @@ func init() {
|
||||
logging.SetLogLevel("splitstore", "DEBUG")
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
// use LMDB for tracking store and liveset instead of BoltDB
|
||||
UseLMDB bool
|
||||
// perform full reachability analysis (expensive) for compaction
|
||||
// You should enable this option if you plan to use the splitstore without a backing coldstore
|
||||
EnableFullCompaction bool
|
||||
// EXPERIMENTAL enable pruning of unreachable objects.
|
||||
// This has not been sufficiently tested yet; only enable if you know what you are doing.
|
||||
// Only applies if you enable full compaction.
|
||||
EnableGC bool
|
||||
// full archival nodes should enable this if EnableFullCompaction is enabled
|
||||
// do NOT enable this if you synced from a snapshot.
|
||||
// Only applies if you enabled full compaction
|
||||
Archival bool
|
||||
}
|
||||
|
||||
type SplitStore struct {
|
||||
compacting int32
|
||||
|
||||
enableGC bool // TODO disabled for now, as it needs testing
|
||||
skipOldMsgs bool // TODO this should be false for full archival nodes
|
||||
skipMsgReceipts bool // TODO this should be false for full archival nodes
|
||||
fullCompaction bool
|
||||
enableGC bool
|
||||
skipOldMsgs bool
|
||||
skipMsgReceipts bool
|
||||
|
||||
baseEpoch abi.ChainEpoch
|
||||
|
||||
@ -64,19 +81,22 @@ var _ bstore.Blockstore = (*SplitStore)(nil)
|
||||
// NewSplitStore creates a new SplitStore instance, given a path for the hotstore dbs and a cold
|
||||
// blockstore. The SplitStore must be attached to the ChainStore with Start in order to trigger
|
||||
// compaction.
|
||||
func NewSplitStore(path string, ds dstore.Datastore, cold, hot bstore.Blockstore, useLMDB bool) (*SplitStore, error) {
|
||||
func NewSplitStore(path string, ds dstore.Datastore, cold, hot bstore.Blockstore, cfg *Config) (*SplitStore, error) {
|
||||
// the tracking store
|
||||
snoop, err := NewTrackingStore(path, useLMDB)
|
||||
snoop, err := NewTrackingStore(path, cfg.UseLMDB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// the liveset env
|
||||
env, err := NewLiveSetEnv(path, useLMDB)
|
||||
var env LiveSetEnv
|
||||
if cfg.EnableFullCompaction {
|
||||
env, err = NewLiveSetEnv(path, cfg.UseLMDB)
|
||||
if err != nil {
|
||||
snoop.Close() //nolint:errcheck
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// and now we can make a SplitStore
|
||||
ss := &SplitStore{
|
||||
@ -85,9 +105,11 @@ func NewSplitStore(path string, ds dstore.Datastore, cold, hot bstore.Blockstore
|
||||
cold: cold,
|
||||
snoop: snoop,
|
||||
env: env,
|
||||
enableGC: false, // TODO option for this
|
||||
skipOldMsgs: true, // TODO option for this
|
||||
skipMsgReceipts: true, // TODO option for this
|
||||
|
||||
fullCompaction: cfg.EnableFullCompaction,
|
||||
enableGC: cfg.EnableGC,
|
||||
skipOldMsgs: !cfg.Archival,
|
||||
skipMsgReceipts: !cfg.Archival,
|
||||
}
|
||||
|
||||
return ss, nil
|
||||
@ -284,9 +306,13 @@ func (s *SplitStore) Close() error {
|
||||
}
|
||||
}
|
||||
|
||||
if s.env != nil {
|
||||
return s.env.Close()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SplitStore) HeadChange(revert, apply []*types.TipSet) error {
|
||||
s.mx.Lock()
|
||||
s.curTs = apply[len(apply)-1]
|
||||
@ -319,6 +345,98 @@ func (s *SplitStore) HeadChange(revert, apply []*types.TipSet) error {
|
||||
|
||||
// Compaction/GC Algorithm
|
||||
func (s *SplitStore) compact() {
|
||||
if s.fullCompaction {
|
||||
s.compactFull()
|
||||
} else {
|
||||
s.compactSimple()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SplitStore) compactSimple() {
|
||||
// some stats for logging
|
||||
var stHot, stCold int
|
||||
|
||||
coldEpoch := s.baseEpoch + CompactionCold
|
||||
cold := make(map[cid.Cid]struct{})
|
||||
|
||||
log.Info("collecting cold objects")
|
||||
startCollect := time.Now()
|
||||
|
||||
err := s.snoop.ForEach(func(cid cid.Cid, wrEpoch abi.ChainEpoch) error {
|
||||
// is the object stil hot?
|
||||
if wrEpoch > coldEpoch {
|
||||
// yes, stay in the hotstore
|
||||
stHot++
|
||||
return nil
|
||||
}
|
||||
|
||||
cold[cid] = struct{}{}
|
||||
stCold++
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
// TODO do something better here
|
||||
panic(err)
|
||||
}
|
||||
|
||||
log.Infow("collection done", "took", time.Since(startCollect))
|
||||
|
||||
log.Info("moving cold objects to the coldstore")
|
||||
startMove := time.Now()
|
||||
|
||||
for cid := range cold {
|
||||
blk, err := s.hot.Get(cid)
|
||||
if err != nil {
|
||||
if err == dstore.ErrNotFound {
|
||||
// this can happen if the node is killed after we have deleted the block from the hotstore
|
||||
// but before we have deleted it from the snoop; just delete the snoop.
|
||||
err = s.snoop.Delete(cid)
|
||||
if err != nil {
|
||||
log.Errorf("error deleting cid %s from tracking store: %w", cid, err)
|
||||
// TODO do something better here -- just continue?
|
||||
panic(err)
|
||||
|
||||
}
|
||||
} else {
|
||||
log.Errorf("error retrieving tracked block %s from hotstore: %w ", cid, err)
|
||||
// TODO do something better here -- just continue?
|
||||
panic(err)
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// put the object in the coldstore
|
||||
err = s.cold.Put(blk)
|
||||
if err != nil {
|
||||
log.Errorf("error puting block %s to coldstore: %w", cid, err)
|
||||
// TODO do something better here -- just continue?
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// delete the object from the hotstore
|
||||
err = s.hot.DeleteBlock(cid)
|
||||
if err != nil {
|
||||
log.Errorf("error deleting block %s from hotstore: %w", cid, err)
|
||||
// TODO do something better here -- just continue?
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// remove the snoop tracking
|
||||
err = s.snoop.Delete(cid)
|
||||
if err != nil {
|
||||
log.Errorf("error deleting cid %s from tracking store: %w", cid, err)
|
||||
// TODO do something better here -- just continue?
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
log.Infow("moving done", "took", time.Since(startMove))
|
||||
log.Infow("compaction stats", "hot", stHot, "cold", stCold)
|
||||
}
|
||||
|
||||
func (s *SplitStore) compactFull() {
|
||||
// create two on disk live sets, one for marking the cold finality region
|
||||
// and one for marking the hot region
|
||||
hotSet, err := s.env.NewLiveSet("hot")
|
||||
|
@ -610,10 +610,10 @@ func Repo(r repo.Repo) Option {
|
||||
Override(new(dtypes.MetadataDS), modules.Datastore),
|
||||
Override(new(dtypes.UniversalBlockstore), modules.UniversalBlockstore),
|
||||
|
||||
If(cfg.Splitstore,
|
||||
If(cfg.UseLMDBHotstore,
|
||||
If(cfg.EnableSplitstore,
|
||||
If(cfg.Splitstore.UseLMDBHotstore,
|
||||
Override(new(dtypes.HotBlockstore), modules.LMDBHotBlockstore)),
|
||||
If(!cfg.UseLMDBHotstore,
|
||||
If(!cfg.Splitstore.UseLMDBHotstore,
|
||||
Override(new(dtypes.HotBlockstore), modules.BadgerHotBlockstore)),
|
||||
Override(new(dtypes.SplitBlockstore), modules.SplitBlockstore(cfg)),
|
||||
Override(new(dtypes.ChainBlockstore), modules.ChainSplitBlockstore),
|
||||
@ -621,7 +621,7 @@ func Repo(r repo.Repo) Option {
|
||||
Override(new(dtypes.BaseBlockstore), From(new(dtypes.SplitBlockstore))),
|
||||
Override(new(dtypes.ExposedBlockstore), From(new(dtypes.SplitBlockstore))),
|
||||
),
|
||||
If(!cfg.Splitstore,
|
||||
If(!cfg.EnableSplitstore,
|
||||
Override(new(dtypes.ChainBlockstore), modules.ChainFlatBlockstore),
|
||||
Override(new(dtypes.StateBlockstore), modules.StateFlatBlockstore),
|
||||
Override(new(dtypes.BaseBlockstore), From(new(dtypes.UniversalBlockstore))),
|
||||
|
@ -121,9 +121,16 @@ type Pubsub struct {
|
||||
}
|
||||
|
||||
type Blockstore struct {
|
||||
Splitstore bool
|
||||
EnableSplitstore bool
|
||||
Splitstore Splitstore
|
||||
}
|
||||
|
||||
type Splitstore struct {
|
||||
UseLMDBHotstore bool
|
||||
UseLMDBTracking bool
|
||||
EnableFullCompaction bool
|
||||
EnableGC bool // EXPERIMENTAL
|
||||
Archival bool
|
||||
}
|
||||
|
||||
// // Full Node
|
||||
|
@ -103,7 +103,13 @@ func SplitBlockstore(cfg *config.Blockstore) func(lc fx.Lifecycle, r repo.Locked
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ss, err := splitstore.NewSplitStore(path, ds, cold, hot, cfg.UseLMDBTracking)
|
||||
ss, err := splitstore.NewSplitStore(path, ds, cold, hot,
|
||||
&splitstore.Config{
|
||||
UseLMDB: cfg.Splitstore.UseLMDBTracking,
|
||||
EnableFullCompaction: cfg.Splitstore.EnableFullCompaction,
|
||||
EnableGC: cfg.Splitstore.EnableGC,
|
||||
Archival: cfg.Splitstore.Archival,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user