feat:splitstore:single compaction that can handle prune aka two marksets one compaction (#9571)
* begin * rough draft -- this should probably actually work? * WIP * Start testing * message mode * Fix tests, make gen * Better default * docsgen-cli * Review Response Co-authored-by: zenground0 <ZenGround0@users.noreply.github.com>
This commit is contained in:
parent
65ee059591
commit
4ffded6fef
@ -98,6 +98,10 @@ type Config struct {
|
||||
// and directly purges cold blocks.
|
||||
DiscardColdBlocks bool
|
||||
|
||||
// UniversalColdBlocks indicates whether all blocks being garbage collected and purged
|
||||
// from the hotstore should be written to the cold store
|
||||
UniversalColdBlocks bool
|
||||
|
||||
// HotstoreMessageRetention indicates the hotstore retention policy for messages.
|
||||
// It has the following semantics:
|
||||
// - a value of 0 will only retain messages within the compaction boundary (4 finalities)
|
||||
@ -111,21 +115,6 @@ type Config struct {
|
||||
// A positive value is the number of compactions before a full GC is performed;
|
||||
// a value of 1 will perform full GC in every compaction.
|
||||
HotStoreFullGCFrequency uint64
|
||||
|
||||
// EnableColdStoreAutoPrune turns on compaction of the cold store i.e. pruning
|
||||
// where hotstore compaction occurs every finality epochs pruning happens every 3 finalities
|
||||
// Default is false
|
||||
EnableColdStoreAutoPrune bool
|
||||
|
||||
// ColdStoreFullGCFrequency specifies how often to performa a full (moving) GC on the coldstore.
|
||||
// Only applies if auto prune is enabled. A value of 0 disables while a value of 1 will do
|
||||
// full GC in every prune.
|
||||
// Default is 7 (about once every a week)
|
||||
ColdStoreFullGCFrequency uint64
|
||||
|
||||
// ColdStoreRetention specifies the retention policy for data reachable from the chain, in
|
||||
// finalities beyond the compaction boundary, default is 0, -1 retains everything
|
||||
ColdStoreRetention int64
|
||||
}
|
||||
|
||||
// ChainAccessor allows the Splitstore to access the chain. It will most likely
|
||||
|
@ -125,7 +125,7 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func(cid.Cid) error { return nil })
|
||||
|
||||
if err != nil {
|
||||
err = xerrors.Errorf("error walking chain: %w", err)
|
||||
|
@ -20,7 +20,6 @@ import (
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
bstore "github.com/filecoin-project/lotus/blockstore"
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/metrics"
|
||||
@ -134,39 +133,6 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
|
||||
log.Infow("compaction done", "took", time.Since(start))
|
||||
}()
|
||||
// only prune if auto prune is enabled and after at least one compaction
|
||||
} else if s.cfg.EnableColdStoreAutoPrune && epoch-s.pruneEpoch > PruneThreshold && s.compactionIndex > 0 {
|
||||
s.beginTxnProtect()
|
||||
s.compactType = cold
|
||||
go func() {
|
||||
defer atomic.StoreInt32(&s.compacting, 0)
|
||||
defer s.endTxnProtect()
|
||||
|
||||
log.Info("pruning splitstore")
|
||||
start := time.Now()
|
||||
|
||||
var retainP func(int64) bool
|
||||
switch {
|
||||
case s.cfg.ColdStoreRetention > int64(0):
|
||||
retainP = func(depth int64) bool {
|
||||
return depth <= int64(CompactionBoundary)+s.cfg.ColdStoreRetention*int64(build.Finality)
|
||||
}
|
||||
case s.cfg.ColdStoreRetention < 0:
|
||||
retainP = func(_ int64) bool { return true }
|
||||
default:
|
||||
retainP = func(depth int64) bool {
|
||||
return depth <= int64(CompactionBoundary)
|
||||
}
|
||||
}
|
||||
movingGC := s.cfg.ColdStoreFullGCFrequency > 0 && s.pruneIndex%int64(s.cfg.ColdStoreFullGCFrequency) == 0
|
||||
var gcOpts []bstore.BlockstoreGCOption
|
||||
if movingGC {
|
||||
gcOpts = append(gcOpts, bstore.WithFullGC(true))
|
||||
}
|
||||
doGC := func() error { return s.gcBlockstore(s.cold, gcOpts) }
|
||||
|
||||
s.prune(curTs, retainP, doGC)
|
||||
log.Infow("prune done", "took", time.Since(start))
|
||||
}()
|
||||
} else {
|
||||
// no compaction necessary
|
||||
atomic.StoreInt32(&s.compacting, 0)
|
||||
@ -562,6 +528,12 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
||||
defer markSet.Close() //nolint:errcheck
|
||||
defer s.debug.Flush()
|
||||
|
||||
coldSet, err := s.markSetEnv.New("cold", s.markSetSize)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error creating cold mark set: %w", err)
|
||||
}
|
||||
defer coldSet.Close() //nolint:errcheck
|
||||
|
||||
if err := s.checkClosing(); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -580,24 +552,52 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
||||
startMark := time.Now()
|
||||
|
||||
count := new(int64)
|
||||
err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch, &noopVisitor{},
|
||||
func(c cid.Cid) error {
|
||||
if isUnitaryObject(c) {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
visit, err := markSet.Visit(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error visiting object: %w", err)
|
||||
}
|
||||
|
||||
if !visit {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
atomic.AddInt64(count, 1)
|
||||
coldCount := new(int64)
|
||||
fCold := func(c cid.Cid) error {
|
||||
// Writes to cold set optimized away in universal and discard mode
|
||||
//
|
||||
// Nothing gets written to cold store in discard mode so no cold objects to write
|
||||
// Everything not marked hot gets written to cold store in universal mode so no need to track cold objects separately
|
||||
if s.cfg.DiscardColdBlocks || s.cfg.UniversalColdBlocks {
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
if isUnitaryObject(c) {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
visit, err := coldSet.Visit(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error visiting object: %w", err)
|
||||
}
|
||||
|
||||
if !visit {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
atomic.AddInt64(coldCount, 1)
|
||||
return nil
|
||||
}
|
||||
fHot := func(c cid.Cid) error {
|
||||
if isUnitaryObject(c) {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
visit, err := markSet.Visit(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error visiting object: %w", err)
|
||||
}
|
||||
|
||||
if !visit {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
atomic.AddInt64(count, 1)
|
||||
return nil
|
||||
}
|
||||
|
||||
err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch, &noopVisitor{}, fHot, fCold)
|
||||
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error marking: %w", err)
|
||||
@ -631,8 +631,14 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
||||
}
|
||||
defer coldw.Close() //nolint:errcheck
|
||||
|
||||
purgew, err := NewColdSetWriter(s.discardSetPath())
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error creating deadset: %w", err)
|
||||
}
|
||||
defer purgew.Close() //nolint:errcheck
|
||||
|
||||
// some stats for logging
|
||||
var hotCnt, coldCnt int
|
||||
var hotCnt, coldCnt, purgeCnt int
|
||||
err = s.hot.ForEachKey(func(c cid.Cid) error {
|
||||
// was it marked?
|
||||
mark, err := markSet.Has(c)
|
||||
@ -645,9 +651,27 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// it's cold, mark it as candidate for move
|
||||
// it needs to be removed from hot store, mark it as candidate for purge
|
||||
if err := purgew.Write(c); err != nil {
|
||||
return xerrors.Errorf("error writing cid to purge set: %w", err)
|
||||
}
|
||||
purgeCnt++
|
||||
|
||||
coldMark, err := coldSet.Has(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error checking cold mark set for %s: %w", c, err)
|
||||
}
|
||||
|
||||
// Discard mode: coldMark == false, s.cfg.UniversalColdBlocks == false, always return here, no writes to cold store
|
||||
// Universal mode: coldMark == false, s.cfg.UniversalColdBlocks == true, never stop here, all writes to cold store
|
||||
// Otherwise: s.cfg.UniversalColdBlocks == false, if !coldMark stop here and don't write to cold store, if coldMark continue and write to cold store
|
||||
if !coldMark && !s.cfg.UniversalColdBlocks { // universal mode means mark everything as cold
|
||||
return nil
|
||||
}
|
||||
|
||||
// it's cold, mark as candidate for move
|
||||
if err := coldw.Write(c); err != nil {
|
||||
return xerrors.Errorf("error writing cid to coldstore: %w", err)
|
||||
return xerrors.Errorf("error writing cid to cold set")
|
||||
}
|
||||
coldCnt++
|
||||
|
||||
@ -656,7 +680,9 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error collecting cold objects: %w", err)
|
||||
}
|
||||
|
||||
if err := purgew.Close(); err != nil {
|
||||
return xerrors.Errorf("erroring closing purgeset: %w", err)
|
||||
}
|
||||
if err := coldw.Close(); err != nil {
|
||||
return xerrors.Errorf("error closing coldset: %w", err)
|
||||
}
|
||||
@ -705,6 +731,12 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
||||
}
|
||||
}
|
||||
|
||||
purger, err := NewColdSetReader(s.discardSetPath())
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error opening coldset: %w", err)
|
||||
}
|
||||
defer purger.Close() //nolint:errcheck
|
||||
|
||||
// 4. Purge cold objects with checkpointing for recovery.
|
||||
// This is the critical section of compaction, whereby any cold object not in the markSet is
|
||||
// considered already deleted.
|
||||
@ -736,7 +768,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
||||
// 5. purge cold objects from the hotstore, taking protected references into account
|
||||
log.Info("purging cold objects from the hotstore")
|
||||
startPurge := time.Now()
|
||||
err = s.purge(coldr, checkpoint, markSet)
|
||||
err = s.purge(purger, checkpoint, markSet)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error purging cold objects: %w", err)
|
||||
}
|
||||
@ -864,7 +896,7 @@ func (s *SplitStore) endCriticalSection() {
|
||||
}
|
||||
|
||||
func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEpoch,
|
||||
visitor ObjectVisitor, f func(cid.Cid) error) error {
|
||||
visitor ObjectVisitor, fHot, fCold func(cid.Cid) error) error {
|
||||
var walked ObjectVisitor
|
||||
var mx sync.Mutex
|
||||
// we copy the tipset first into a new slice, which allows us to reuse it in every epoch.
|
||||
@ -886,7 +918,7 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
|
||||
|
||||
atomic.AddInt64(walkCnt, 1)
|
||||
|
||||
if err := f(c); err != nil {
|
||||
if err := fHot(c); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -904,27 +936,37 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
|
||||
if inclMsgs < inclState {
|
||||
// we need to use walkObjectIncomplete here, as messages/receipts may be missing early on if we
|
||||
// synced from snapshot and have a long HotStoreMessageRetentionPolicy.
|
||||
if err := s.walkObjectIncomplete(hdr.Messages, visitor, f, stopWalk); err != nil {
|
||||
if err := s.walkObjectIncomplete(hdr.Messages, visitor, fHot, stopWalk); err != nil {
|
||||
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
|
||||
}
|
||||
|
||||
if err := s.walkObjectIncomplete(hdr.ParentMessageReceipts, visitor, f, stopWalk); err != nil {
|
||||
if err := s.walkObjectIncomplete(hdr.ParentMessageReceipts, visitor, fHot, stopWalk); err != nil {
|
||||
return xerrors.Errorf("error walking messages receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
|
||||
}
|
||||
} else {
|
||||
if err := s.walkObject(hdr.Messages, visitor, f); err != nil {
|
||||
if err := s.walkObject(hdr.Messages, visitor, fHot); err != nil {
|
||||
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
|
||||
}
|
||||
|
||||
if err := s.walkObject(hdr.ParentMessageReceipts, visitor, f); err != nil {
|
||||
if err := s.walkObject(hdr.ParentMessageReceipts, visitor, fHot); err != nil {
|
||||
return xerrors.Errorf("error walking message receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// messages and receipts outside of inclMsgs are included in the cold store
|
||||
if hdr.Height < inclMsgs && hdr.Height > 0 {
|
||||
if err := s.walkObjectIncomplete(hdr.Messages, visitor, fCold, stopWalk); err != nil {
|
||||
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
|
||||
}
|
||||
if err := s.walkObjectIncomplete(hdr.ParentMessageReceipts, visitor, fCold, stopWalk); err != nil {
|
||||
return xerrors.Errorf("error walking messages receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
|
||||
}
|
||||
}
|
||||
|
||||
// state is only retained if within the inclState boundary, with the exception of genesis
|
||||
if hdr.Height >= inclState || hdr.Height == 0 {
|
||||
if err := s.walkObject(hdr.ParentStateRoot, visitor, f); err != nil {
|
||||
if err := s.walkObject(hdr.ParentStateRoot, visitor, fHot); err != nil {
|
||||
return xerrors.Errorf("error walking state root (cid: %s): %w", hdr.ParentStateRoot, err)
|
||||
}
|
||||
atomic.AddInt64(scanCnt, 1)
|
||||
@ -1296,7 +1338,7 @@ func (s *SplitStore) coldSetPath() string {
|
||||
return filepath.Join(s.path, "coldset")
|
||||
}
|
||||
|
||||
func (s *SplitStore) deadSetPath() string {
|
||||
func (s *SplitStore) discardSetPath() string {
|
||||
return filepath.Join(s.path, "deadset")
|
||||
}
|
||||
|
||||
|
@ -208,7 +208,7 @@ func (s *SplitStore) doPrune(curTs *types.TipSet, retainStateP func(int64) bool,
|
||||
log.Info("collecting dead objects")
|
||||
startCollect := time.Now()
|
||||
|
||||
deadw, err := NewColdSetWriter(s.deadSetPath())
|
||||
deadw, err := NewColdSetWriter(s.discardSetPath())
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error creating coldset: %w", err)
|
||||
}
|
||||
@ -267,7 +267,7 @@ func (s *SplitStore) doPrune(curTs *types.TipSet, retainStateP func(int64) bool,
|
||||
return err
|
||||
}
|
||||
|
||||
deadr, err := NewColdSetReader(s.deadSetPath())
|
||||
deadr, err := NewColdSetReader(s.discardSetPath())
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error opening deadset: %w", err)
|
||||
}
|
||||
@ -311,10 +311,10 @@ func (s *SplitStore) doPrune(curTs *types.TipSet, retainStateP func(int64) bool,
|
||||
log.Warnf("error removing checkpoint: %s", err)
|
||||
}
|
||||
if err := deadr.Close(); err != nil {
|
||||
log.Warnf("error closing deadset: %s", err)
|
||||
log.Warnf("error closing discard set: %s", err)
|
||||
}
|
||||
if err := os.Remove(s.deadSetPath()); err != nil {
|
||||
log.Warnf("error removing deadset: %s", err)
|
||||
if err := os.Remove(s.discardSetPath()); err != nil {
|
||||
log.Warnf("error removing discard set: %s", err)
|
||||
}
|
||||
|
||||
// we are done; do some housekeeping
|
||||
@ -344,7 +344,7 @@ func (s *SplitStore) completePrune() error {
|
||||
}
|
||||
defer checkpoint.Close() //nolint:errcheck
|
||||
|
||||
deadr, err := NewColdSetReader(s.deadSetPath())
|
||||
deadr, err := NewColdSetReader(s.discardSetPath())
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error opening deadset: %w", err)
|
||||
}
|
||||
@ -378,7 +378,7 @@ func (s *SplitStore) completePrune() error {
|
||||
if err := deadr.Close(); err != nil {
|
||||
log.Warnf("error closing deadset: %s", err)
|
||||
}
|
||||
if err := os.Remove(s.deadSetPath()); err != nil {
|
||||
if err := os.Remove(s.discardSetPath()); err != nil {
|
||||
log.Warnf("error removing deadset: %s", err)
|
||||
}
|
||||
|
||||
|
@ -38,6 +38,7 @@ func init() {
|
||||
func testSplitStore(t *testing.T, cfg *Config) {
|
||||
ctx := context.Background()
|
||||
chain := &mockChain{t: t}
|
||||
fmt.Printf("Config: %v\n", cfg)
|
||||
|
||||
// the myriads of stores
|
||||
ds := dssync.MutexWrap(datastore.NewMapDatastore())
|
||||
@ -225,7 +226,7 @@ func TestSplitStoreCompaction(t *testing.T) {
|
||||
//stm: @SPLITSTORE_SPLITSTORE_OPEN_001, @SPLITSTORE_SPLITSTORE_CLOSE_001
|
||||
//stm: @SPLITSTORE_SPLITSTORE_PUT_001, @SPLITSTORE_SPLITSTORE_ADD_PROTECTOR_001
|
||||
//stm: @SPLITSTORE_SPLITSTORE_CLOSE_001
|
||||
testSplitStore(t, &Config{MarkSetType: "map"})
|
||||
testSplitStore(t, &Config{MarkSetType: "map", UniversalColdBlocks: true})
|
||||
}
|
||||
|
||||
func TestSplitStoreCompactionWithBadger(t *testing.T) {
|
||||
@ -237,7 +238,7 @@ func TestSplitStoreCompactionWithBadger(t *testing.T) {
|
||||
t.Cleanup(func() {
|
||||
badgerMarkSetBatchSize = bs
|
||||
})
|
||||
testSplitStore(t, &Config{MarkSetType: "badger"})
|
||||
testSplitStore(t, &Config{MarkSetType: "badger", UniversalColdBlocks: true})
|
||||
}
|
||||
|
||||
func TestSplitStoreSuppressCompactionNearUpgrade(t *testing.T) {
|
||||
@ -283,7 +284,7 @@ func TestSplitStoreSuppressCompactionNearUpgrade(t *testing.T) {
|
||||
path := t.TempDir()
|
||||
|
||||
// open the splitstore
|
||||
ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map"})
|
||||
ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map", UniversalColdBlocks: true})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -422,7 +423,7 @@ func testSplitStoreReification(t *testing.T, f func(context.Context, blockstore.
|
||||
|
||||
path := t.TempDir()
|
||||
|
||||
ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map"})
|
||||
ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map", UniversalColdBlocks: true})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -522,7 +523,7 @@ func testSplitStoreReificationLimit(t *testing.T, f func(context.Context, blocks
|
||||
|
||||
path := t.TempDir()
|
||||
|
||||
ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map"})
|
||||
ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map", UniversalColdBlocks: true})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -110,7 +110,7 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
|
||||
mx.Unlock()
|
||||
|
||||
return nil
|
||||
})
|
||||
}, func(cid.Cid) error { return nil })
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -166,11 +166,11 @@
|
||||
|
||||
[Chainstore.Splitstore]
|
||||
# ColdStoreType specifies the type of the coldstore.
|
||||
# It can be "universal" (default) or "discard" for discarding cold blocks.
|
||||
# It can be "messages" (default) to store only messages, "universal" to store all chain state or "discard" for discarding cold blocks.
|
||||
#
|
||||
# type: string
|
||||
# env var: LOTUS_CHAINSTORE_SPLITSTORE_COLDSTORETYPE
|
||||
#ColdStoreType = "universal"
|
||||
#ColdStoreType = "messages"
|
||||
|
||||
# HotStoreType specifies the type of the hotstore.
|
||||
# Only currently supported value is "badger".
|
||||
@ -201,28 +201,4 @@
|
||||
# env var: LOTUS_CHAINSTORE_SPLITSTORE_HOTSTOREFULLGCFREQUENCY
|
||||
#HotStoreFullGCFrequency = 20
|
||||
|
||||
# EnableColdStoreAutoPrune turns on compaction of the cold store i.e. pruning
|
||||
# where hotstore compaction occurs every finality epochs pruning happens every 3 finalities
|
||||
# Default is false
|
||||
#
|
||||
# type: bool
|
||||
# env var: LOTUS_CHAINSTORE_SPLITSTORE_ENABLECOLDSTOREAUTOPRUNE
|
||||
#EnableColdStoreAutoPrune = false
|
||||
|
||||
# ColdStoreFullGCFrequency specifies how often to performa a full (moving) GC on the coldstore.
|
||||
# Only applies if auto prune is enabled. A value of 0 disables while a value of 1 will do
|
||||
# full GC in every prune.
|
||||
# Default is 7 (about once every a week)
|
||||
#
|
||||
# type: uint64
|
||||
# env var: LOTUS_CHAINSTORE_SPLITSTORE_COLDSTOREFULLGCFREQUENCY
|
||||
#ColdStoreFullGCFrequency = 7
|
||||
|
||||
# ColdStoreRetention specifies the retention policy for data reachable from the chain, in
|
||||
# finalities beyond the compaction boundary, default is 0, -1 retains everything
|
||||
#
|
||||
# type: int64
|
||||
# env var: LOTUS_CHAINSTORE_SPLITSTORE_COLDSTORERETENTION
|
||||
#ColdStoreRetention = 0
|
||||
|
||||
|
||||
|
@ -256,9 +256,6 @@ type CfgOption func(cfg *config.FullNode) error
|
||||
|
||||
func SplitstoreDiscard() NodeOpt {
|
||||
return WithCfgOpt(func(cfg *config.FullNode) error {
|
||||
//cfg.Chainstore.Splitstore.HotStoreType = "badger" // default
|
||||
//cfg.Chainstore.Splitstore.MarkSetType = "badger" // default
|
||||
//cfg.Chainstore.Splitstore.HotStoreMessageRetention = 0 // default
|
||||
cfg.Chainstore.EnableSplitstore = true
|
||||
cfg.Chainstore.Splitstore.HotStoreFullGCFrequency = 0 // turn off full gc
|
||||
cfg.Chainstore.Splitstore.ColdStoreType = "discard" // no cold store
|
||||
@ -268,9 +265,6 @@ func SplitstoreDiscard() NodeOpt {
|
||||
|
||||
func SplitstoreUniversal() NodeOpt {
|
||||
return WithCfgOpt(func(cfg *config.FullNode) error {
|
||||
//cfg.Chainstore.Splitstore.HotStoreType = "badger" // default
|
||||
//cfg.Chainstore.Splitstore.MarkSetType = "badger" // default
|
||||
//cfg.Chainstore.Splitstore.HotStoreMessageRetention = 0 // default
|
||||
cfg.Chainstore.EnableSplitstore = true
|
||||
cfg.Chainstore.Splitstore.HotStoreFullGCFrequency = 0 // turn off full gc
|
||||
cfg.Chainstore.Splitstore.ColdStoreType = "universal" // universal bs is coldstore
|
||||
@ -278,10 +272,11 @@ func SplitstoreUniversal() NodeOpt {
|
||||
})
|
||||
}
|
||||
|
||||
func SplitstoreAutoPrune() NodeOpt {
|
||||
func SplitstoreMessges() NodeOpt {
|
||||
return WithCfgOpt(func(cfg *config.FullNode) error {
|
||||
cfg.Chainstore.Splitstore.EnableColdStoreAutoPrune = true // turn on
|
||||
cfg.Chainstore.Splitstore.ColdStoreFullGCFrequency = 0 // turn off full gc
|
||||
cfg.Chainstore.EnableSplitstore = true
|
||||
cfg.Chainstore.Splitstore.HotStoreFullGCFrequency = 0 // turn off full gc
|
||||
cfg.Chainstore.Splitstore.ColdStoreType = "messages" // universal bs is coldstore, and it accepts messages
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
@ -63,7 +63,16 @@ func TestHotstoreCompactCleansGarbage(t *testing.T) {
|
||||
|
||||
// create garbage
|
||||
g := NewGarbager(ctx, t, full)
|
||||
garbage, e := g.Drop(ctx)
|
||||
// state
|
||||
garbageS, eS := g.Drop(ctx)
|
||||
// message
|
||||
garbageM, eM := g.Message(ctx)
|
||||
e := eM
|
||||
if eS > eM {
|
||||
e = eS
|
||||
}
|
||||
assert.True(g.t, g.Exists(ctx, garbageS), "Garbage state not found in splitstore")
|
||||
assert.True(g.t, g.Exists(ctx, garbageM), "Garbage message not found in splitstore")
|
||||
|
||||
// calculate next compaction where we should actually see cleanup
|
||||
|
||||
@ -94,7 +103,8 @@ func TestHotstoreCompactCleansGarbage(t *testing.T) {
|
||||
waitForCompaction(ctx, t, garbageCompactionIndex, full)
|
||||
|
||||
// check that garbage is cleaned up
|
||||
assert.False(t, g.Exists(ctx, garbage), "Garbage still exists in blockstore")
|
||||
assert.False(t, g.Exists(ctx, garbageS), "Garbage state still exists in blockstore")
|
||||
assert.False(t, g.Exists(ctx, garbageM), "Garbage message still exists in blockstore")
|
||||
}
|
||||
|
||||
// Create unreachable state
|
||||
@ -112,8 +122,16 @@ func TestColdStorePrune(t *testing.T) {
|
||||
|
||||
// create garbage
|
||||
g := NewGarbager(ctx, t, full)
|
||||
garbage, e := g.Drop(ctx)
|
||||
assert.True(g.t, g.Exists(ctx, garbage), "Garbage not found in splitstore")
|
||||
// state
|
||||
garbageS, eS := g.Drop(ctx)
|
||||
// message
|
||||
garbageM, eM := g.Message(ctx)
|
||||
e := eM
|
||||
if eS > eM {
|
||||
e = eS
|
||||
}
|
||||
assert.True(g.t, g.Exists(ctx, garbageS), "Garbage state not found in splitstore")
|
||||
assert.True(g.t, g.Exists(ctx, garbageM), "Garbage message not found in splitstore")
|
||||
|
||||
// calculate next compaction where we should actually see cleanup
|
||||
|
||||
@ -148,7 +166,8 @@ func TestColdStorePrune(t *testing.T) {
|
||||
// This data should now be moved to the coldstore.
|
||||
// Access it without hotview to keep it there while checking that it still exists
|
||||
// Only state compute uses hot view so garbager Exists backed by ChainReadObj is all good
|
||||
assert.True(g.t, g.Exists(ctx, garbage), "Garbage not found in splitstore")
|
||||
assert.True(g.t, g.Exists(ctx, garbageS), "Garbage state not found in splitstore")
|
||||
assert.True(g.t, g.Exists(ctx, garbageM), "Garbage message not found in splitstore")
|
||||
bm.Restart()
|
||||
|
||||
// wait for compaction to finsih and pause to make sure it doesn't start to avoid racing
|
||||
@ -165,14 +184,15 @@ func TestColdStorePrune(t *testing.T) {
|
||||
require.NoError(t, full.ChainPrune(ctx, pruneOpts))
|
||||
bm.Restart()
|
||||
waitForPrune(ctx, t, 1, full)
|
||||
assert.False(g.t, g.Exists(ctx, garbage), "Garbage should be removed from cold store after prune but it's still there")
|
||||
assert.False(g.t, g.Exists(ctx, garbageS), "Garbage state should be removed from cold store after prune but it's still there")
|
||||
assert.True(g.t, g.Exists(ctx, garbageM), "Garbage message should be on the cold store after prune")
|
||||
}
|
||||
|
||||
func TestAutoPrune(t *testing.T) {
|
||||
func TestMessagesMode(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
// disable sync checking because efficient itests require that the node is out of sync : /
|
||||
splitstore.CheckSyncGap = false
|
||||
opts := []interface{}{kit.MockProofs(), kit.SplitstoreUniversal(), kit.SplitstoreAutoPrune(), kit.FsRepo()}
|
||||
opts := []interface{}{kit.MockProofs(), kit.SplitstoreMessges(), kit.FsRepo()}
|
||||
full, genesisMiner, ens := kit.EnsembleMinimal(t, opts...)
|
||||
bm := ens.InterconnectAll().BeginMining(4 * time.Millisecond)[0]
|
||||
_ = full
|
||||
@ -180,8 +200,16 @@ func TestAutoPrune(t *testing.T) {
|
||||
|
||||
// create garbage
|
||||
g := NewGarbager(ctx, t, full)
|
||||
garbage, e := g.Drop(ctx)
|
||||
assert.True(g.t, g.Exists(ctx, garbage), "Garbage not found in splitstore")
|
||||
// state
|
||||
garbageS, eS := g.Drop(ctx)
|
||||
// message
|
||||
garbageM, eM := g.Message(ctx)
|
||||
e := eM
|
||||
if eS > eM {
|
||||
e = eS
|
||||
}
|
||||
assert.True(g.t, g.Exists(ctx, garbageS), "Garbage state not found in splitstore")
|
||||
assert.True(g.t, g.Exists(ctx, garbageM), "Garbage message not found in splitstore")
|
||||
|
||||
// calculate next compaction where we should actually see cleanup
|
||||
|
||||
@ -213,13 +241,12 @@ func TestAutoPrune(t *testing.T) {
|
||||
|
||||
bm.Pause()
|
||||
|
||||
// This data should now be moved to the coldstore.
|
||||
// Messages should be moved to the coldstore
|
||||
// State should be gced
|
||||
// Access it without hotview to keep it there while checking that it still exists
|
||||
// Only state compute uses hot view so garbager Exists backed by ChainReadObj is all good
|
||||
assert.True(g.t, g.Exists(ctx, garbage), "Garbage not found in splitstore")
|
||||
bm.Restart()
|
||||
waitForPrune(ctx, t, 1, full)
|
||||
assert.False(g.t, g.Exists(ctx, garbage), "Garbage should be removed from cold store through auto prune but it's still there")
|
||||
assert.False(g.t, g.Exists(ctx, garbageS), "Garbage state not found in splitstore")
|
||||
assert.True(g.t, g.Exists(ctx, garbageM), "Garbage message not found in splitstore")
|
||||
}
|
||||
|
||||
func waitForCompaction(ctx context.Context, t *testing.T, cIdx int64, n *kit.TestFullNode) {
|
||||
@ -304,7 +331,7 @@ func NewGarbager(ctx context.Context, t *testing.T, n *kit.TestFullNode) *Garbag
|
||||
latest: 0,
|
||||
maddr4Data: address.Undef,
|
||||
}
|
||||
g.createMiner(ctx)
|
||||
g.createMiner4Data(ctx)
|
||||
g.newPeerID(ctx)
|
||||
return g
|
||||
}
|
||||
@ -320,6 +347,12 @@ func (g *Garbager) Drop(ctx context.Context) (cid.Cid, abi.ChainEpoch) {
|
||||
return c, g.newPeerID(ctx)
|
||||
}
|
||||
|
||||
// message returns the cid referencing a message and the chain epoch it went on chain
|
||||
func (g *Garbager) Message(ctx context.Context) (cid.Cid, abi.ChainEpoch) {
|
||||
mw := g.createMiner(ctx)
|
||||
return mw.Message, mw.Height
|
||||
}
|
||||
|
||||
// exists checks whether the cid is reachable through the node
|
||||
func (g *Garbager) Exists(ctx context.Context, c cid.Cid) bool {
|
||||
// check chain get / blockstore get
|
||||
@ -374,8 +407,15 @@ func (g *Garbager) mInfoCid(ctx context.Context) cid.Cid {
|
||||
return mSt.Info
|
||||
}
|
||||
|
||||
func (g *Garbager) createMiner(ctx context.Context) {
|
||||
func (g *Garbager) createMiner4Data(ctx context.Context) {
|
||||
require.True(g.t, g.maddr4Data == address.Undef, "garbager miner actor already created")
|
||||
mw := g.createMiner(ctx)
|
||||
var retval power6.CreateMinerReturn
|
||||
require.NoError(g.t, retval.UnmarshalCBOR(bytes.NewReader(mw.Receipt.Return)))
|
||||
g.maddr4Data = retval.IDAddress
|
||||
}
|
||||
|
||||
func (g *Garbager) createMiner(ctx context.Context) *lapi.MsgLookup {
|
||||
owner, err := g.node.WalletDefaultAddress(ctx)
|
||||
require.NoError(g.t, err)
|
||||
worker := owner
|
||||
@ -401,8 +441,5 @@ func (g *Garbager) createMiner(ctx context.Context) {
|
||||
mw, err := g.node.StateWaitMsg(ctx, signed.Cid(), build.MessageConfidence, lapi.LookbackNoLimit, true)
|
||||
require.NoError(g.t, err)
|
||||
require.True(g.t, mw.Receipt.ExitCode == 0, "garbager's internal create miner message failed")
|
||||
|
||||
var retval power6.CreateMinerReturn
|
||||
require.NoError(g.t, retval.UnmarshalCBOR(bytes.NewReader(mw.Receipt.Return)))
|
||||
g.maddr4Data = retval.IDAddress
|
||||
return mw
|
||||
}
|
||||
|
@ -181,7 +181,7 @@ func ConfigFullNode(c interface{}) Option {
|
||||
Override(new(dtypes.UniversalBlockstore), modules.UniversalBlockstore),
|
||||
|
||||
If(cfg.Chainstore.EnableSplitstore,
|
||||
If(cfg.Chainstore.Splitstore.ColdStoreType == "universal",
|
||||
If(cfg.Chainstore.Splitstore.ColdStoreType == "universal" || cfg.Chainstore.Splitstore.ColdStoreType == "messages",
|
||||
Override(new(dtypes.ColdBlockstore), From(new(dtypes.UniversalBlockstore)))),
|
||||
If(cfg.Chainstore.Splitstore.ColdStoreType == "discard",
|
||||
Override(new(dtypes.ColdBlockstore), modules.DiscardColdBlockstore)),
|
||||
|
@ -91,12 +91,11 @@ func DefaultFullNode() *FullNode {
|
||||
Chainstore: Chainstore{
|
||||
EnableSplitstore: false,
|
||||
Splitstore: Splitstore{
|
||||
ColdStoreType: "universal",
|
||||
ColdStoreType: "messages",
|
||||
HotStoreType: "badger",
|
||||
MarkSetType: "badger",
|
||||
|
||||
HotStoreFullGCFrequency: 20,
|
||||
ColdStoreFullGCFrequency: 7,
|
||||
HotStoreFullGCFrequency: 20,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -1116,7 +1116,7 @@ submitting proofs to the chain individually`,
|
||||
Type: "string",
|
||||
|
||||
Comment: `ColdStoreType specifies the type of the coldstore.
|
||||
It can be "universal" (default) or "discard" for discarding cold blocks.`,
|
||||
It can be "messages" (default) to store only messages, "universal" to store all chain state or "discard" for discarding cold blocks.`,
|
||||
},
|
||||
{
|
||||
Name: "HotStoreType",
|
||||
@ -1147,30 +1147,6 @@ the compaction boundary; default is 0.`,
|
||||
A value of 0 disables, while a value 1 will do full GC in every compaction.
|
||||
Default is 20 (about once a week).`,
|
||||
},
|
||||
{
|
||||
Name: "EnableColdStoreAutoPrune",
|
||||
Type: "bool",
|
||||
|
||||
Comment: `EnableColdStoreAutoPrune turns on compaction of the cold store i.e. pruning
|
||||
where hotstore compaction occurs every finality epochs pruning happens every 3 finalities
|
||||
Default is false`,
|
||||
},
|
||||
{
|
||||
Name: "ColdStoreFullGCFrequency",
|
||||
Type: "uint64",
|
||||
|
||||
Comment: `ColdStoreFullGCFrequency specifies how often to performa a full (moving) GC on the coldstore.
|
||||
Only applies if auto prune is enabled. A value of 0 disables while a value of 1 will do
|
||||
full GC in every prune.
|
||||
Default is 7 (about once every a week)`,
|
||||
},
|
||||
{
|
||||
Name: "ColdStoreRetention",
|
||||
Type: "int64",
|
||||
|
||||
Comment: `ColdStoreRetention specifies the retention policy for data reachable from the chain, in
|
||||
finalities beyond the compaction boundary, default is 0, -1 retains everything`,
|
||||
},
|
||||
},
|
||||
"StorageMiner": []DocField{
|
||||
{
|
||||
|
@ -555,7 +555,7 @@ type Chainstore struct {
|
||||
|
||||
type Splitstore struct {
|
||||
// ColdStoreType specifies the type of the coldstore.
|
||||
// It can be "universal" (default) or "discard" for discarding cold blocks.
|
||||
// It can be "messages" (default) to store only messages, "universal" to store all chain state or "discard" for discarding cold blocks.
|
||||
ColdStoreType string
|
||||
// HotStoreType specifies the type of the hotstore.
|
||||
// Only currently supported value is "badger".
|
||||
@ -571,21 +571,6 @@ type Splitstore struct {
|
||||
// A value of 0 disables, while a value 1 will do full GC in every compaction.
|
||||
// Default is 20 (about once a week).
|
||||
HotStoreFullGCFrequency uint64
|
||||
|
||||
// EnableColdStoreAutoPrune turns on compaction of the cold store i.e. pruning
|
||||
// where hotstore compaction occurs every finality epochs pruning happens every 3 finalities
|
||||
// Default is false
|
||||
EnableColdStoreAutoPrune bool
|
||||
|
||||
// ColdStoreFullGCFrequency specifies how often to performa a full (moving) GC on the coldstore.
|
||||
// Only applies if auto prune is enabled. A value of 0 disables while a value of 1 will do
|
||||
// full GC in every prune.
|
||||
// Default is 7 (about once every a week)
|
||||
ColdStoreFullGCFrequency uint64
|
||||
|
||||
// ColdStoreRetention specifies the retention policy for data reachable from the chain, in
|
||||
// finalities beyond the compaction boundary, default is 0, -1 retains everything
|
||||
ColdStoreRetention int64
|
||||
}
|
||||
|
||||
// // Full Node
|
||||
|
@ -84,11 +84,9 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.Locked
|
||||
cfg := &splitstore.Config{
|
||||
MarkSetType: cfg.Splitstore.MarkSetType,
|
||||
DiscardColdBlocks: cfg.Splitstore.ColdStoreType == "discard",
|
||||
UniversalColdBlocks: cfg.Splitstore.ColdStoreType == "universal",
|
||||
HotStoreMessageRetention: cfg.Splitstore.HotStoreMessageRetention,
|
||||
HotStoreFullGCFrequency: cfg.Splitstore.HotStoreFullGCFrequency,
|
||||
EnableColdStoreAutoPrune: cfg.Splitstore.EnableColdStoreAutoPrune,
|
||||
ColdStoreFullGCFrequency: cfg.Splitstore.ColdStoreFullGCFrequency,
|
||||
ColdStoreRetention: cfg.Splitstore.ColdStoreRetention,
|
||||
}
|
||||
ss, err := splitstore.Open(path, ds, hot, cold, cfg)
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user