Merge pull request #10391 from filecoin-project/feat/record-hotstore-space
feat:splitstore:Configure max space used by hotstore and GC makes best effort to respect
This commit is contained in:
commit
faedc12531
@ -115,6 +115,23 @@ type Config struct {
|
|||||||
// A positive value is the number of compactions before a full GC is performed;
|
// A positive value is the number of compactions before a full GC is performed;
|
||||||
// a value of 1 will perform full GC in every compaction.
|
// a value of 1 will perform full GC in every compaction.
|
||||||
HotStoreFullGCFrequency uint64
|
HotStoreFullGCFrequency uint64
|
||||||
|
|
||||||
|
// HotstoreMaxSpaceTarget suggests the max allowed space the hotstore can take.
|
||||||
|
// This is not a hard limit, it is possible for the hotstore to exceed the target
|
||||||
|
// for example if state grows massively between compactions. The splitstore
|
||||||
|
// will make a best effort to avoid overflowing the target and in practice should
|
||||||
|
// never overflow. This field is used when doing GC at the end of a compaction to
|
||||||
|
// adaptively choose moving GC
|
||||||
|
HotstoreMaxSpaceTarget uint64
|
||||||
|
|
||||||
|
// Moving GC will be triggered when total moving size exceeds
|
||||||
|
// HotstoreMaxSpaceTarget - HotstoreMaxSpaceThreshold
|
||||||
|
HotstoreMaxSpaceThreshold uint64
|
||||||
|
|
||||||
|
// Safety buffer to prevent moving GC from overflowing disk.
|
||||||
|
// Moving GC will not occur when total moving size exceeds
|
||||||
|
// HotstoreMaxSpaceTarget - HotstoreMaxSpaceSafetyBuffer
|
||||||
|
HotstoreMaxSpaceSafetyBuffer uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// ChainAccessor allows the Splitstore to access the chain. It will most likely
|
// ChainAccessor allows the Splitstore to access the chain. It will most likely
|
||||||
@ -165,6 +182,7 @@ type SplitStore struct {
|
|||||||
|
|
||||||
compactionIndex int64
|
compactionIndex int64
|
||||||
pruneIndex int64
|
pruneIndex int64
|
||||||
|
onlineGCCnt int64
|
||||||
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel func()
|
cancel func()
|
||||||
@ -195,6 +213,17 @@ type SplitStore struct {
|
|||||||
|
|
||||||
// registered protectors
|
// registered protectors
|
||||||
protectors []func(func(cid.Cid) error) error
|
protectors []func(func(cid.Cid) error) error
|
||||||
|
|
||||||
|
// dag sizes measured during latest compaction
|
||||||
|
// logged and used for GC strategy
|
||||||
|
|
||||||
|
// protected by compaction lock
|
||||||
|
szWalk int64
|
||||||
|
szProtectedTxns int64
|
||||||
|
szKeys int64 // approximate, not counting keys protected when entering critical section
|
||||||
|
|
||||||
|
// protected by txnLk
|
||||||
|
szMarkedLiveRefs int64
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ bstore.Blockstore = (*SplitStore)(nil)
|
var _ bstore.Blockstore = (*SplitStore)(nil)
|
||||||
|
@ -95,7 +95,7 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
|
|||||||
}
|
}
|
||||||
defer visitor.Close() //nolint
|
defer visitor.Close() //nolint
|
||||||
|
|
||||||
err = s.walkChain(curTs, boundaryEpoch, boundaryEpoch, visitor,
|
size := s.walkChain(curTs, boundaryEpoch, boundaryEpoch, visitor,
|
||||||
func(c cid.Cid) error {
|
func(c cid.Cid) error {
|
||||||
if isUnitaryObject(c) {
|
if isUnitaryObject(c) {
|
||||||
return errStopWalk
|
return errStopWalk
|
||||||
@ -133,7 +133,7 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infow("check done", "cold", *coldCnt, "missing", *missingCnt)
|
log.Infow("check done", "cold", *coldCnt, "missing", *missingCnt, "walk size", size)
|
||||||
write("--")
|
write("--")
|
||||||
write("cold: %d missing: %d", *coldCnt, *missingCnt)
|
write("cold: %d missing: %d", *coldCnt, *missingCnt)
|
||||||
write("DONE")
|
write("DONE")
|
||||||
|
@ -66,7 +66,8 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
batchSize = 16384
|
batchSize = 16384
|
||||||
|
cidKeySize = 128
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
|
func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
|
||||||
@ -199,9 +200,11 @@ func (s *SplitStore) markLiveRefs(cids []cid.Cid) {
|
|||||||
log.Debugf("marking %d live refs", len(cids))
|
log.Debugf("marking %d live refs", len(cids))
|
||||||
startMark := time.Now()
|
startMark := time.Now()
|
||||||
|
|
||||||
|
szMarked := new(int64)
|
||||||
|
|
||||||
count := new(int32)
|
count := new(int32)
|
||||||
visitor := newConcurrentVisitor()
|
visitor := newConcurrentVisitor()
|
||||||
walkObject := func(c cid.Cid) error {
|
walkObject := func(c cid.Cid) (int64, error) {
|
||||||
return s.walkObjectIncomplete(c, visitor,
|
return s.walkObjectIncomplete(c, visitor,
|
||||||
func(c cid.Cid) error {
|
func(c cid.Cid) error {
|
||||||
if isUnitaryObject(c) {
|
if isUnitaryObject(c) {
|
||||||
@ -228,10 +231,12 @@ func (s *SplitStore) markLiveRefs(cids []cid.Cid) {
|
|||||||
|
|
||||||
// optimize the common case of single put
|
// optimize the common case of single put
|
||||||
if len(cids) == 1 {
|
if len(cids) == 1 {
|
||||||
if err := walkObject(cids[0]); err != nil {
|
sz, err := walkObject(cids[0])
|
||||||
|
if err != nil {
|
||||||
log.Errorf("error marking tipset refs: %s", err)
|
log.Errorf("error marking tipset refs: %s", err)
|
||||||
}
|
}
|
||||||
log.Debugw("marking live refs done", "took", time.Since(startMark), "marked", *count)
|
log.Debugw("marking live refs done", "took", time.Since(startMark), "marked", *count)
|
||||||
|
atomic.AddInt64(szMarked, sz)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -243,9 +248,11 @@ func (s *SplitStore) markLiveRefs(cids []cid.Cid) {
|
|||||||
|
|
||||||
worker := func() error {
|
worker := func() error {
|
||||||
for c := range workch {
|
for c := range workch {
|
||||||
if err := walkObject(c); err != nil {
|
sz, err := walkObject(c)
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
atomic.AddInt64(szMarked, sz)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -268,7 +275,8 @@ func (s *SplitStore) markLiveRefs(cids []cid.Cid) {
|
|||||||
log.Errorf("error marking tipset refs: %s", err)
|
log.Errorf("error marking tipset refs: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugw("marking live refs done", "took", time.Since(startMark), "marked", *count)
|
log.Debugw("marking live refs done", "took", time.Since(startMark), "marked", *count, "size marked", *szMarked)
|
||||||
|
s.szMarkedLiveRefs += atomic.LoadInt64(szMarked)
|
||||||
}
|
}
|
||||||
|
|
||||||
// transactionally protect a view
|
// transactionally protect a view
|
||||||
@ -361,6 +369,7 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error {
|
|||||||
|
|
||||||
log.Infow("protecting transactional references", "refs", len(txnRefs))
|
log.Infow("protecting transactional references", "refs", len(txnRefs))
|
||||||
count := 0
|
count := 0
|
||||||
|
sz := new(int64)
|
||||||
workch := make(chan cid.Cid, len(txnRefs))
|
workch := make(chan cid.Cid, len(txnRefs))
|
||||||
startProtect := time.Now()
|
startProtect := time.Now()
|
||||||
|
|
||||||
@ -393,10 +402,11 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error {
|
|||||||
|
|
||||||
worker := func() error {
|
worker := func() error {
|
||||||
for c := range workch {
|
for c := range workch {
|
||||||
err := s.doTxnProtect(c, markSet)
|
szTxn, err := s.doTxnProtect(c, markSet)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("error protecting transactional references to %s: %w", c, err)
|
return xerrors.Errorf("error protecting transactional references to %s: %w", c, err)
|
||||||
}
|
}
|
||||||
|
atomic.AddInt64(sz, szTxn)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -409,16 +419,16 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error {
|
|||||||
if err := g.Wait(); err != nil {
|
if err := g.Wait(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
s.szProtectedTxns += atomic.LoadInt64(sz)
|
||||||
log.Infow("protecting transactional refs done", "took", time.Since(startProtect), "protected", count)
|
log.Infow("protecting transactional refs done", "took", time.Since(startProtect), "protected", count, "protected size", sz)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// transactionally protect a reference by walking the object and marking.
|
// transactionally protect a reference by walking the object and marking.
|
||||||
// concurrent markings are short circuited by checking the markset.
|
// concurrent markings are short circuited by checking the markset.
|
||||||
func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) error {
|
func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) (int64, error) {
|
||||||
if err := s.checkClosing(); err != nil {
|
if err := s.checkClosing(); err != nil {
|
||||||
return err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Note: cold objects are deleted heaviest first, so the consituents of an object
|
// Note: cold objects are deleted heaviest first, so the consituents of an object
|
||||||
@ -509,6 +519,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
// might be potentially inconsistent; abort compaction and notify the user to intervene.
|
// might be potentially inconsistent; abort compaction and notify the user to intervene.
|
||||||
return xerrors.Errorf("checkpoint exists; aborting compaction")
|
return xerrors.Errorf("checkpoint exists; aborting compaction")
|
||||||
}
|
}
|
||||||
|
s.clearSizeMeasurements()
|
||||||
|
|
||||||
currentEpoch := curTs.Height()
|
currentEpoch := curTs.Height()
|
||||||
boundaryEpoch := currentEpoch - CompactionBoundary
|
boundaryEpoch := currentEpoch - CompactionBoundary
|
||||||
@ -598,7 +609,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch, &noopVisitor{}, fHot, fCold)
|
err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch, &noopVisitor{}, fHot, fCold)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("error marking: %w", err)
|
return xerrors.Errorf("error marking: %w", err)
|
||||||
}
|
}
|
||||||
@ -638,7 +648,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
defer purgew.Close() //nolint:errcheck
|
defer purgew.Close() //nolint:errcheck
|
||||||
|
|
||||||
// some stats for logging
|
// some stats for logging
|
||||||
var hotCnt, coldCnt, purgeCnt int
|
var hotCnt, coldCnt, purgeCnt int64
|
||||||
err = s.hot.ForEachKey(func(c cid.Cid) error {
|
err = s.hot.ForEachKey(func(c cid.Cid) error {
|
||||||
// was it marked?
|
// was it marked?
|
||||||
mark, err := markSet.Has(c)
|
mark, err := markSet.Has(c)
|
||||||
@ -690,8 +700,9 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
log.Infow("cold collection done", "took", time.Since(startCollect))
|
log.Infow("cold collection done", "took", time.Since(startCollect))
|
||||||
|
|
||||||
log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt, "purge", purgeCnt)
|
log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt, "purge", purgeCnt)
|
||||||
stats.Record(s.ctx, metrics.SplitstoreCompactionHot.M(int64(hotCnt)))
|
s.szKeys = hotCnt * cidKeySize
|
||||||
stats.Record(s.ctx, metrics.SplitstoreCompactionCold.M(int64(coldCnt)))
|
stats.Record(s.ctx, metrics.SplitstoreCompactionHot.M(hotCnt))
|
||||||
|
stats.Record(s.ctx, metrics.SplitstoreCompactionCold.M(coldCnt))
|
||||||
|
|
||||||
if err := s.checkClosing(); err != nil {
|
if err := s.checkClosing(); err != nil {
|
||||||
return err
|
return err
|
||||||
@ -773,8 +784,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
return xerrors.Errorf("error purging cold objects: %w", err)
|
return xerrors.Errorf("error purging cold objects: %w", err)
|
||||||
}
|
}
|
||||||
log.Infow("purging cold objects from hotstore done", "took", time.Since(startPurge))
|
log.Infow("purging cold objects from hotstore done", "took", time.Since(startPurge))
|
||||||
|
|
||||||
s.endCriticalSection()
|
s.endCriticalSection()
|
||||||
|
log.Infow("critical section done", "total protected size", s.szProtectedTxns, "total marked live size", s.szMarkedLiveRefs)
|
||||||
|
|
||||||
if err := checkpoint.Close(); err != nil {
|
if err := checkpoint.Close(); err != nil {
|
||||||
log.Warnf("error closing checkpoint: %s", err)
|
log.Warnf("error closing checkpoint: %s", err)
|
||||||
@ -907,6 +918,7 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
|
|||||||
copy(toWalk, ts.Cids())
|
copy(toWalk, ts.Cids())
|
||||||
walkCnt := new(int64)
|
walkCnt := new(int64)
|
||||||
scanCnt := new(int64)
|
scanCnt := new(int64)
|
||||||
|
szWalk := new(int64)
|
||||||
|
|
||||||
tsRef := func(blkCids []cid.Cid) (cid.Cid, error) {
|
tsRef := func(blkCids []cid.Cid) (cid.Cid, error) {
|
||||||
return types.NewTipSetKey(blkCids...).Cid()
|
return types.NewTipSetKey(blkCids...).Cid()
|
||||||
@ -942,48 +954,64 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("error computing cid reference to parent tipset")
|
return xerrors.Errorf("error computing cid reference to parent tipset")
|
||||||
}
|
}
|
||||||
if err := s.walkObjectIncomplete(pRef, visitor, fHot, stopWalk); err != nil {
|
sz, err := s.walkObjectIncomplete(pRef, visitor, fHot, stopWalk)
|
||||||
|
if err != nil {
|
||||||
return xerrors.Errorf("error walking parent tipset cid reference")
|
return xerrors.Errorf("error walking parent tipset cid reference")
|
||||||
}
|
}
|
||||||
|
atomic.AddInt64(szWalk, sz)
|
||||||
|
|
||||||
// message are retained if within the inclMsgs boundary
|
// message are retained if within the inclMsgs boundary
|
||||||
if hdr.Height >= inclMsgs && hdr.Height > 0 {
|
if hdr.Height >= inclMsgs && hdr.Height > 0 {
|
||||||
if inclMsgs < inclState {
|
if inclMsgs < inclState {
|
||||||
// we need to use walkObjectIncomplete here, as messages/receipts may be missing early on if we
|
// we need to use walkObjectIncomplete here, as messages/receipts may be missing early on if we
|
||||||
// synced from snapshot and have a long HotStoreMessageRetentionPolicy.
|
// synced from snapshot and have a long HotStoreMessageRetentionPolicy.
|
||||||
if err := s.walkObjectIncomplete(hdr.Messages, visitor, fHot, stopWalk); err != nil {
|
sz, err := s.walkObjectIncomplete(hdr.Messages, visitor, fHot, stopWalk)
|
||||||
|
if err != nil {
|
||||||
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
|
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
|
||||||
}
|
}
|
||||||
|
atomic.AddInt64(szWalk, sz)
|
||||||
|
|
||||||
if err := s.walkObjectIncomplete(hdr.ParentMessageReceipts, visitor, fHot, stopWalk); err != nil {
|
sz, err = s.walkObjectIncomplete(hdr.ParentMessageReceipts, visitor, fHot, stopWalk)
|
||||||
|
if err != nil {
|
||||||
return xerrors.Errorf("error walking messages receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
|
return xerrors.Errorf("error walking messages receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
|
||||||
}
|
}
|
||||||
|
atomic.AddInt64(szWalk, sz)
|
||||||
} else {
|
} else {
|
||||||
if err := s.walkObject(hdr.Messages, visitor, fHot); err != nil {
|
sz, err = s.walkObject(hdr.Messages, visitor, fHot)
|
||||||
|
if err != nil {
|
||||||
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
|
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
|
||||||
}
|
}
|
||||||
|
atomic.AddInt64(szWalk, sz)
|
||||||
|
|
||||||
if err := s.walkObject(hdr.ParentMessageReceipts, visitor, fHot); err != nil {
|
sz, err := s.walkObject(hdr.ParentMessageReceipts, visitor, fHot)
|
||||||
|
if err != nil {
|
||||||
return xerrors.Errorf("error walking message receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
|
return xerrors.Errorf("error walking message receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
|
||||||
}
|
}
|
||||||
|
atomic.AddInt64(szWalk, sz)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// messages and receipts outside of inclMsgs are included in the cold store
|
// messages and receipts outside of inclMsgs are included in the cold store
|
||||||
if hdr.Height < inclMsgs && hdr.Height > 0 {
|
if hdr.Height < inclMsgs && hdr.Height > 0 {
|
||||||
if err := s.walkObjectIncomplete(hdr.Messages, visitor, fCold, stopWalk); err != nil {
|
sz, err := s.walkObjectIncomplete(hdr.Messages, visitor, fCold, stopWalk)
|
||||||
|
if err != nil {
|
||||||
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
|
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
|
||||||
}
|
}
|
||||||
if err := s.walkObjectIncomplete(hdr.ParentMessageReceipts, visitor, fCold, stopWalk); err != nil {
|
atomic.AddInt64(szWalk, sz)
|
||||||
|
sz, err = s.walkObjectIncomplete(hdr.ParentMessageReceipts, visitor, fCold, stopWalk)
|
||||||
|
if err != nil {
|
||||||
return xerrors.Errorf("error walking messages receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
|
return xerrors.Errorf("error walking messages receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
|
||||||
}
|
}
|
||||||
|
atomic.AddInt64(szWalk, sz)
|
||||||
}
|
}
|
||||||
|
|
||||||
// state is only retained if within the inclState boundary, with the exception of genesis
|
// state is only retained if within the inclState boundary, with the exception of genesis
|
||||||
if hdr.Height >= inclState || hdr.Height == 0 {
|
if hdr.Height >= inclState || hdr.Height == 0 {
|
||||||
if err := s.walkObject(hdr.ParentStateRoot, visitor, fHot); err != nil {
|
sz, err := s.walkObject(hdr.ParentStateRoot, visitor, fHot)
|
||||||
|
if err != nil {
|
||||||
return xerrors.Errorf("error walking state root (cid: %s): %w", hdr.ParentStateRoot, err)
|
return xerrors.Errorf("error walking state root (cid: %s): %w", hdr.ParentStateRoot, err)
|
||||||
}
|
}
|
||||||
|
atomic.AddInt64(szWalk, sz)
|
||||||
atomic.AddInt64(scanCnt, 1)
|
atomic.AddInt64(scanCnt, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1001,9 +1029,11 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("error computing cid reference to parent tipset")
|
return xerrors.Errorf("error computing cid reference to parent tipset")
|
||||||
}
|
}
|
||||||
if err := s.walkObjectIncomplete(hRef, visitor, fHot, stopWalk); err != nil {
|
sz, err := s.walkObjectIncomplete(hRef, visitor, fHot, stopWalk)
|
||||||
|
if err != nil {
|
||||||
return xerrors.Errorf("error walking parent tipset cid reference")
|
return xerrors.Errorf("error walking parent tipset cid reference")
|
||||||
}
|
}
|
||||||
|
atomic.AddInt64(szWalk, sz)
|
||||||
|
|
||||||
for len(toWalk) > 0 {
|
for len(toWalk) > 0 {
|
||||||
// walking can take a while, so check this with every opportunity
|
// walking can take a while, so check this with every opportunity
|
||||||
@ -1047,123 +1077,129 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infow("chain walk done", "walked", *walkCnt, "scanned", *scanCnt)
|
log.Infow("chain walk done", "walked", *walkCnt, "scanned", *scanCnt, "walk size", szWalk)
|
||||||
|
s.szWalk = atomic.LoadInt64(szWalk)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SplitStore) walkObject(c cid.Cid, visitor ObjectVisitor, f func(cid.Cid) error) error {
|
func (s *SplitStore) walkObject(c cid.Cid, visitor ObjectVisitor, f func(cid.Cid) error) (int64, error) {
|
||||||
|
var sz int64
|
||||||
visit, err := visitor.Visit(c)
|
visit, err := visitor.Visit(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("error visiting object: %w", err)
|
return 0, xerrors.Errorf("error visiting object: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !visit {
|
if !visit {
|
||||||
return nil
|
return sz, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := f(c); err != nil {
|
if err := f(c); err != nil {
|
||||||
if err == errStopWalk {
|
if err == errStopWalk {
|
||||||
return nil
|
return sz, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.Prefix().Codec != cid.DagCBOR {
|
if c.Prefix().Codec != cid.DagCBOR {
|
||||||
return nil
|
return sz, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// check this before recursing
|
// check this before recursing
|
||||||
if err := s.checkClosing(); err != nil {
|
if err := s.checkClosing(); err != nil {
|
||||||
return err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var links []cid.Cid
|
var links []cid.Cid
|
||||||
err = s.view(c, func(data []byte) error {
|
err = s.view(c, func(data []byte) error {
|
||||||
|
sz += int64(len(data))
|
||||||
return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
|
return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
|
||||||
links = append(links, c)
|
links = append(links, c)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("error scanning linked block (cid: %s): %w", c, err)
|
return 0, xerrors.Errorf("error scanning linked block (cid: %s): %w", c, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, c := range links {
|
for _, c := range links {
|
||||||
err := s.walkObject(c, visitor, f)
|
szLink, err := s.walkObject(c, visitor, f)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("error walking link (cid: %s): %w", c, err)
|
return 0, xerrors.Errorf("error walking link (cid: %s): %w", c, err)
|
||||||
}
|
}
|
||||||
|
sz += szLink
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return sz, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// like walkObject, but the object may be potentially incomplete (references missing)
|
// like walkObject, but the object may be potentially incomplete (references missing)
|
||||||
func (s *SplitStore) walkObjectIncomplete(c cid.Cid, visitor ObjectVisitor, f, missing func(cid.Cid) error) error {
|
func (s *SplitStore) walkObjectIncomplete(c cid.Cid, visitor ObjectVisitor, f, missing func(cid.Cid) error) (int64, error) {
|
||||||
|
var sz int64
|
||||||
visit, err := visitor.Visit(c)
|
visit, err := visitor.Visit(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("error visiting object: %w", err)
|
return 0, xerrors.Errorf("error visiting object: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !visit {
|
if !visit {
|
||||||
return nil
|
return sz, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// occurs check -- only for DAGs
|
// occurs check -- only for DAGs
|
||||||
if c.Prefix().Codec == cid.DagCBOR {
|
if c.Prefix().Codec == cid.DagCBOR {
|
||||||
has, err := s.has(c)
|
has, err := s.has(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("error occur checking %s: %w", c, err)
|
return 0, xerrors.Errorf("error occur checking %s: %w", c, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !has {
|
if !has {
|
||||||
err = missing(c)
|
err = missing(c)
|
||||||
if err == errStopWalk {
|
if err == errStopWalk {
|
||||||
return nil
|
return sz, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return 0, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := f(c); err != nil {
|
if err := f(c); err != nil {
|
||||||
if err == errStopWalk {
|
if err == errStopWalk {
|
||||||
return nil
|
return sz, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.Prefix().Codec != cid.DagCBOR {
|
if c.Prefix().Codec != cid.DagCBOR {
|
||||||
return nil
|
return sz, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// check this before recursing
|
// check this before recursing
|
||||||
if err := s.checkClosing(); err != nil {
|
if err := s.checkClosing(); err != nil {
|
||||||
return err
|
return sz, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var links []cid.Cid
|
var links []cid.Cid
|
||||||
err = s.view(c, func(data []byte) error {
|
err = s.view(c, func(data []byte) error {
|
||||||
|
sz += int64(len(data))
|
||||||
return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
|
return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
|
||||||
links = append(links, c)
|
links = append(links, c)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("error scanning linked block (cid: %s): %w", c, err)
|
return 0, xerrors.Errorf("error scanning linked block (cid: %s): %w", c, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, c := range links {
|
for _, c := range links {
|
||||||
err := s.walkObjectIncomplete(c, visitor, f, missing)
|
szLink, err := s.walkObjectIncomplete(c, visitor, f, missing)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("error walking link (cid: %s): %w", c, err)
|
return 0, xerrors.Errorf("error walking link (cid: %s): %w", c, err)
|
||||||
}
|
}
|
||||||
|
sz += szLink
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return sz, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// internal version used during compaction and related operations
|
// internal version used during compaction and related operations
|
||||||
@ -1429,8 +1465,9 @@ func (s *SplitStore) completeCompaction() error {
|
|||||||
}
|
}
|
||||||
s.compactType = none
|
s.compactType = none
|
||||||
|
|
||||||
// Note: at this point we can start the splitstore; a compaction should run on
|
// Note: at this point we can start the splitstore; base epoch is not
|
||||||
// the first head change, which will trigger gc on the hotstore.
|
// incremented here so a compaction should run on the first head
|
||||||
|
// change, which will trigger gc on the hotstore.
|
||||||
// We don't mind the second (back-to-back) compaction as the head will
|
// We don't mind the second (back-to-back) compaction as the head will
|
||||||
// have advanced during marking and coldset accumulation.
|
// have advanced during marking and coldset accumulation.
|
||||||
return nil
|
return nil
|
||||||
@ -1488,6 +1525,13 @@ func (s *SplitStore) completePurge(coldr *ColdSetReader, checkpoint *Checkpoint,
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *SplitStore) clearSizeMeasurements() {
|
||||||
|
s.szKeys = 0
|
||||||
|
s.szMarkedLiveRefs = 0
|
||||||
|
s.szProtectedTxns = 0
|
||||||
|
s.szWalk = 0
|
||||||
|
}
|
||||||
|
|
||||||
// I really don't like having this code, but we seem to have some occasional DAG references with
|
// I really don't like having this code, but we seem to have some occasional DAG references with
|
||||||
// missing constituents. During testing in mainnet *some* of these references *sometimes* appeared
|
// missing constituents. During testing in mainnet *some* of these references *sometimes* appeared
|
||||||
// after a little bit.
|
// after a little bit.
|
||||||
@ -1528,7 +1572,7 @@ func (s *SplitStore) waitForMissingRefs(markSet MarkSet) {
|
|||||||
missing = make(map[cid.Cid]struct{})
|
missing = make(map[cid.Cid]struct{})
|
||||||
|
|
||||||
for c := range towalk {
|
for c := range towalk {
|
||||||
err := s.walkObjectIncomplete(c, visitor,
|
_, err := s.walkObjectIncomplete(c, visitor,
|
||||||
func(c cid.Cid) error {
|
func(c cid.Cid) error {
|
||||||
if isUnitaryObject(c) {
|
if isUnitaryObject(c) {
|
||||||
return errStopWalk
|
return errStopWalk
|
||||||
|
@ -7,15 +7,61 @@ import (
|
|||||||
bstore "github.com/filecoin-project/lotus/blockstore"
|
bstore "github.com/filecoin-project/lotus/blockstore"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// Fraction of garbage in badger vlog for online GC traversal to collect garbage
|
||||||
|
AggressiveOnlineGCThreshold = 0.0001
|
||||||
|
)
|
||||||
|
|
||||||
func (s *SplitStore) gcHotAfterCompaction() {
|
func (s *SplitStore) gcHotAfterCompaction() {
|
||||||
|
// Measure hotstore size, determine if we should do full GC, determine if we can do full GC.
|
||||||
|
// We should do full GC if
|
||||||
|
// FullGCFrequency is specified and compaction index matches frequency
|
||||||
|
// OR HotstoreMaxSpaceTarget is specified and total moving space is within 150 GB of target
|
||||||
|
// We can do full if
|
||||||
|
// HotstoreMaxSpaceTarget is not specified
|
||||||
|
// OR total moving space would not exceed 50 GB below target
|
||||||
|
//
|
||||||
|
// a) If we should not do full GC => online GC
|
||||||
|
// b) If we should do full GC and can => moving GC
|
||||||
|
// c) If we should do full GC and can't => aggressive online GC
|
||||||
|
getSize := func() int64 {
|
||||||
|
sizer, ok := s.hot.(bstore.BlockstoreSize)
|
||||||
|
if ok {
|
||||||
|
size, err := sizer.Size()
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("error getting hotstore size: %s, estimating empty hot store for targeting", err)
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return size
|
||||||
|
}
|
||||||
|
log.Errorf("Could not measure hotstore size, assuming it is 0 bytes, which it is not")
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
hotSize := getSize()
|
||||||
|
|
||||||
|
copySizeApprox := s.szKeys + s.szMarkedLiveRefs + s.szProtectedTxns + s.szWalk
|
||||||
|
shouldTarget := s.cfg.HotstoreMaxSpaceTarget > 0 && hotSize+copySizeApprox > int64(s.cfg.HotstoreMaxSpaceTarget)-int64(s.cfg.HotstoreMaxSpaceThreshold)
|
||||||
|
shouldFreq := s.cfg.HotStoreFullGCFrequency > 0 && s.compactionIndex%int64(s.cfg.HotStoreFullGCFrequency) == 0
|
||||||
|
shouldDoFull := shouldTarget || shouldFreq
|
||||||
|
canDoFull := s.cfg.HotstoreMaxSpaceTarget == 0 || hotSize+copySizeApprox < int64(s.cfg.HotstoreMaxSpaceTarget)-int64(s.cfg.HotstoreMaxSpaceSafetyBuffer)
|
||||||
|
log.Debugw("approximating new hot store size", "key size", s.szKeys, "marked live refs", s.szMarkedLiveRefs, "protected txns", s.szProtectedTxns, "walked DAG", s.szWalk)
|
||||||
|
log.Infof("measured hot store size: %d, approximate new size: %d, should do full %t, can do full %t", hotSize, copySizeApprox, shouldDoFull, canDoFull)
|
||||||
|
|
||||||
var opts []bstore.BlockstoreGCOption
|
var opts []bstore.BlockstoreGCOption
|
||||||
if s.cfg.HotStoreFullGCFrequency > 0 && s.compactionIndex%int64(s.cfg.HotStoreFullGCFrequency) == 0 {
|
if shouldDoFull && canDoFull {
|
||||||
opts = append(opts, bstore.WithFullGC(true))
|
opts = append(opts, bstore.WithFullGC(true))
|
||||||
|
} else if shouldDoFull && !canDoFull {
|
||||||
|
log.Warnf("Attention! Estimated moving GC size %d is not within safety buffer %d of target max %d, performing aggressive online GC to attempt to bring hotstore size down safely", copySizeApprox, s.cfg.HotstoreMaxSpaceSafetyBuffer, s.cfg.HotstoreMaxSpaceTarget)
|
||||||
|
log.Warn("If problem continues you can 1) temporarily allocate more disk space to hotstore and 2) reflect in HotstoreMaxSpaceTarget OR trigger manual move with `lotus chain prune hot-moving`")
|
||||||
|
log.Warn("If problem continues and you do not have any more disk space you can run continue to manually trigger online GC at aggressive thresholds (< 0.01) with `lotus chain prune hot`")
|
||||||
|
|
||||||
|
opts = append(opts, bstore.WithThreshold(AggressiveOnlineGCThreshold))
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.gcBlockstore(s.hot, opts); err != nil {
|
if err := s.gcBlockstore(s.hot, opts); err != nil {
|
||||||
log.Warnf("error garbage collecting hostore: %s", err)
|
log.Warnf("error garbage collecting hostore: %s", err)
|
||||||
}
|
}
|
||||||
|
log.Infof("measured hot store size after GC: %d", getSize())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SplitStore) gcBlockstore(b bstore.Blockstore, opts []bstore.BlockstoreGCOption) error {
|
func (s *SplitStore) gcBlockstore(b bstore.Blockstore, opts []bstore.BlockstoreGCOption) error {
|
||||||
|
@ -101,7 +101,7 @@ func (s *SplitStore) doReify(c cid.Cid) {
|
|||||||
defer s.txnLk.RUnlock()
|
defer s.txnLk.RUnlock()
|
||||||
|
|
||||||
count := 0
|
count := 0
|
||||||
err := s.walkObjectIncomplete(c, newTmpVisitor(),
|
_, err := s.walkObjectIncomplete(c, newTmpVisitor(),
|
||||||
func(c cid.Cid) error {
|
func(c cid.Cid) error {
|
||||||
if isUnitaryObject(c) {
|
if isUnitaryObject(c) {
|
||||||
return errStopWalk
|
return errStopWalk
|
||||||
|
@ -230,6 +230,35 @@
|
|||||||
# env var: LOTUS_CHAINSTORE_SPLITSTORE_HOTSTOREFULLGCFREQUENCY
|
# env var: LOTUS_CHAINSTORE_SPLITSTORE_HOTSTOREFULLGCFREQUENCY
|
||||||
#HotStoreFullGCFrequency = 20
|
#HotStoreFullGCFrequency = 20
|
||||||
|
|
||||||
|
# HotStoreMaxSpaceTarget sets a target max disk size for the hotstore. Splitstore GC
|
||||||
|
# will run moving GC if disk utilization gets within a threshold (150 GB) of the target.
|
||||||
|
# Splitstore GC will NOT run moving GC if the total size of the move would get
|
||||||
|
# within 50 GB of the target, and instead will run a more aggressive online GC.
|
||||||
|
# If both HotStoreFullGCFrequency and HotStoreMaxSpaceTarget are set then splitstore
|
||||||
|
# GC will trigger moving GC if either configuration condition is met.
|
||||||
|
# A reasonable minimum is 2x fully GCed hotstore size + 50 G buffer.
|
||||||
|
# At this minimum size moving GC happens every time, any smaller and moving GC won't
|
||||||
|
# be able to run. In spring 2023 this minimum is ~550 GB.
|
||||||
|
#
|
||||||
|
# type: uint64
|
||||||
|
# env var: LOTUS_CHAINSTORE_SPLITSTORE_HOTSTOREMAXSPACETARGET
|
||||||
|
#HotStoreMaxSpaceTarget = 0
|
||||||
|
|
||||||
|
# When HotStoreMaxSpaceTarget is set Moving GC will be triggered when total moving size
|
||||||
|
# exceeds HotstoreMaxSpaceTarget - HotstoreMaxSpaceThreshold
|
||||||
|
#
|
||||||
|
# type: uint64
|
||||||
|
# env var: LOTUS_CHAINSTORE_SPLITSTORE_HOTSTOREMAXSPACETHRESHOLD
|
||||||
|
#HotStoreMaxSpaceThreshold = 150000000000
|
||||||
|
|
||||||
|
# Safety buffer to prevent moving GC from overflowing disk when HotStoreMaxSpaceTarget
|
||||||
|
# is set. Moving GC will not occur when total moving size exceeds
|
||||||
|
# HotstoreMaxSpaceTarget - HotstoreMaxSpaceSafetyBuffer
|
||||||
|
#
|
||||||
|
# type: uint64
|
||||||
|
# env var: LOTUS_CHAINSTORE_SPLITSTORE_HOTSTOREMAXSPACESAFETYBUFFER
|
||||||
|
#HotstoreMaxSpaceSafetyBuffer = 50000000000
|
||||||
|
|
||||||
|
|
||||||
[Cluster]
|
[Cluster]
|
||||||
# EXPERIMENTAL. config to enabled node cluster with raft consensus
|
# EXPERIMENTAL. config to enabled node cluster with raft consensus
|
||||||
|
@ -95,7 +95,9 @@ func DefaultFullNode() *FullNode {
|
|||||||
HotStoreType: "badger",
|
HotStoreType: "badger",
|
||||||
MarkSetType: "badger",
|
MarkSetType: "badger",
|
||||||
|
|
||||||
HotStoreFullGCFrequency: 20,
|
HotStoreFullGCFrequency: 20,
|
||||||
|
HotStoreMaxSpaceThreshold: 150_000_000_000,
|
||||||
|
HotstoreMaxSpaceSafetyBuffer: 50_000_000_000,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
Cluster: *DefaultUserRaftConfig(),
|
Cluster: *DefaultUserRaftConfig(),
|
||||||
|
@ -1286,6 +1286,35 @@ the compaction boundary; default is 0.`,
|
|||||||
A value of 0 disables, while a value 1 will do full GC in every compaction.
|
A value of 0 disables, while a value 1 will do full GC in every compaction.
|
||||||
Default is 20 (about once a week).`,
|
Default is 20 (about once a week).`,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Name: "HotStoreMaxSpaceTarget",
|
||||||
|
Type: "uint64",
|
||||||
|
|
||||||
|
Comment: `HotStoreMaxSpaceTarget sets a target max disk size for the hotstore. Splitstore GC
|
||||||
|
will run moving GC if disk utilization gets within a threshold (150 GB) of the target.
|
||||||
|
Splitstore GC will NOT run moving GC if the total size of the move would get
|
||||||
|
within 50 GB of the target, and instead will run a more aggressive online GC.
|
||||||
|
If both HotStoreFullGCFrequency and HotStoreMaxSpaceTarget are set then splitstore
|
||||||
|
GC will trigger moving GC if either configuration condition is met.
|
||||||
|
A reasonable minimum is 2x fully GCed hotstore size + 50 G buffer.
|
||||||
|
At this minimum size moving GC happens every time, any smaller and moving GC won't
|
||||||
|
be able to run. In spring 2023 this minimum is ~550 GB.`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "HotStoreMaxSpaceThreshold",
|
||||||
|
Type: "uint64",
|
||||||
|
|
||||||
|
Comment: `When HotStoreMaxSpaceTarget is set Moving GC will be triggered when total moving size
|
||||||
|
exceeds HotstoreMaxSpaceTarget - HotstoreMaxSpaceThreshold`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "HotstoreMaxSpaceSafetyBuffer",
|
||||||
|
Type: "uint64",
|
||||||
|
|
||||||
|
Comment: `Safety buffer to prevent moving GC from overflowing disk when HotStoreMaxSpaceTarget
|
||||||
|
is set. Moving GC will not occur when total moving size exceeds
|
||||||
|
HotstoreMaxSpaceTarget - HotstoreMaxSpaceSafetyBuffer`,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
"StorageMiner": []DocField{
|
"StorageMiner": []DocField{
|
||||||
{
|
{
|
||||||
|
@ -601,6 +601,25 @@ type Splitstore struct {
|
|||||||
// A value of 0 disables, while a value 1 will do full GC in every compaction.
|
// A value of 0 disables, while a value 1 will do full GC in every compaction.
|
||||||
// Default is 20 (about once a week).
|
// Default is 20 (about once a week).
|
||||||
HotStoreFullGCFrequency uint64
|
HotStoreFullGCFrequency uint64
|
||||||
|
// HotStoreMaxSpaceTarget sets a target max disk size for the hotstore. Splitstore GC
|
||||||
|
// will run moving GC if disk utilization gets within a threshold (150 GB) of the target.
|
||||||
|
// Splitstore GC will NOT run moving GC if the total size of the move would get
|
||||||
|
// within 50 GB of the target, and instead will run a more aggressive online GC.
|
||||||
|
// If both HotStoreFullGCFrequency and HotStoreMaxSpaceTarget are set then splitstore
|
||||||
|
// GC will trigger moving GC if either configuration condition is met.
|
||||||
|
// A reasonable minimum is 2x fully GCed hotstore size + 50 G buffer.
|
||||||
|
// At this minimum size moving GC happens every time, any smaller and moving GC won't
|
||||||
|
// be able to run. In spring 2023 this minimum is ~550 GB.
|
||||||
|
HotStoreMaxSpaceTarget uint64
|
||||||
|
|
||||||
|
// When HotStoreMaxSpaceTarget is set Moving GC will be triggered when total moving size
|
||||||
|
// exceeds HotstoreMaxSpaceTarget - HotstoreMaxSpaceThreshold
|
||||||
|
HotStoreMaxSpaceThreshold uint64
|
||||||
|
|
||||||
|
// Safety buffer to prevent moving GC from overflowing disk when HotStoreMaxSpaceTarget
|
||||||
|
// is set. Moving GC will not occur when total moving size exceeds
|
||||||
|
// HotstoreMaxSpaceTarget - HotstoreMaxSpaceSafetyBuffer
|
||||||
|
HotstoreMaxSpaceSafetyBuffer uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// // Full Node
|
// // Full Node
|
||||||
|
@ -82,11 +82,14 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.Locked
|
|||||||
}
|
}
|
||||||
|
|
||||||
cfg := &splitstore.Config{
|
cfg := &splitstore.Config{
|
||||||
MarkSetType: cfg.Splitstore.MarkSetType,
|
MarkSetType: cfg.Splitstore.MarkSetType,
|
||||||
DiscardColdBlocks: cfg.Splitstore.ColdStoreType == "discard",
|
DiscardColdBlocks: cfg.Splitstore.ColdStoreType == "discard",
|
||||||
UniversalColdBlocks: cfg.Splitstore.ColdStoreType == "universal",
|
UniversalColdBlocks: cfg.Splitstore.ColdStoreType == "universal",
|
||||||
HotStoreMessageRetention: cfg.Splitstore.HotStoreMessageRetention,
|
HotStoreMessageRetention: cfg.Splitstore.HotStoreMessageRetention,
|
||||||
HotStoreFullGCFrequency: cfg.Splitstore.HotStoreFullGCFrequency,
|
HotStoreFullGCFrequency: cfg.Splitstore.HotStoreFullGCFrequency,
|
||||||
|
HotstoreMaxSpaceTarget: cfg.Splitstore.HotStoreMaxSpaceTarget,
|
||||||
|
HotstoreMaxSpaceThreshold: cfg.Splitstore.HotStoreMaxSpaceThreshold,
|
||||||
|
HotstoreMaxSpaceSafetyBuffer: cfg.Splitstore.HotstoreMaxSpaceSafetyBuffer,
|
||||||
}
|
}
|
||||||
ss, err := splitstore.Open(path, ds, hot, cold, cfg)
|
ss, err := splitstore.Open(path, ds, hot, cold, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user