Track size of dags relevant to compaction

This commit is contained in:
zenground0 2023-03-04 09:38:18 -07:00
parent dd998d6b24
commit 5d9739a863
3 changed files with 38 additions and 10 deletions

View File

@ -195,6 +195,17 @@ type SplitStore struct {
// registered protectors
protectors []func(func(cid.Cid) error) error
// dag sizes measured during latest compaction
// logged and used for GC strategy
// protected by compaction lock
szWalk int64
szProtectedTxns int64
szToPurge int64 // expected purges before critical section protections and live marking
// protected by txnLk
szMarkedLiveRefs int64
}
var _ bstore.Blockstore = (*SplitStore)(nil)

View File

@ -95,7 +95,7 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
}
defer visitor.Close() //nolint
err = s.walkChain(curTs, boundaryEpoch, boundaryEpoch, visitor,
size, err := s.walkChain(curTs, boundaryEpoch, boundaryEpoch, visitor,
func(c cid.Cid) error {
if isUnitaryObject(c) {
return errStopWalk
@ -133,7 +133,7 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
return err
}
log.Infow("check done", "cold", *coldCnt, "missing", *missingCnt)
log.Infow("check done", "cold", *coldCnt, "missing", *missingCnt, "walk size", size)
write("--")
write("cold: %d missing: %d", *coldCnt, *missingCnt)
write("DONE")

View File

@ -199,6 +199,8 @@ func (s *SplitStore) markLiveRefs(cids []cid.Cid) {
log.Debugf("marking %d live refs", len(cids))
startMark := time.Now()
szMarked := new(int64)
count := new(int32)
visitor := newConcurrentVisitor()
walkObject := func(c cid.Cid) (int, error) {
@ -228,10 +230,12 @@ func (s *SplitStore) markLiveRefs(cids []cid.Cid) {
// optimize the common case of single put
if len(cids) == 1 {
if _, err := walkObject(cids[0]); err != nil {
sz, err := walkObject(cids[0])
if err != nil {
log.Errorf("error marking tipset refs: %s", err)
}
log.Debugw("marking live refs done", "took", time.Since(startMark), "marked", *count)
atomic.AddInt64(szMarked, int64(sz))
return
}
@ -243,9 +247,11 @@ func (s *SplitStore) markLiveRefs(cids []cid.Cid) {
worker := func() error {
for c := range workch {
if _, err := walkObject(c); err != nil {
sz, err := walkObject(c)
if err != nil {
return err
}
atomic.AddInt64(szMarked, int64(sz))
}
return nil
@ -268,7 +274,8 @@ func (s *SplitStore) markLiveRefs(cids []cid.Cid) {
log.Errorf("error marking tipset refs: %s", err)
}
log.Debugw("marking live refs done", "took", time.Since(startMark), "marked", *count)
log.Debugw("marking live refs done", "took", time.Since(startMark), "marked", *count, "size marked", *szMarked)
s.szMarkedLiveRefs += *szMarked
}
// transactionally protect a view
@ -600,7 +607,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
}
err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch, &noopVisitor{}, fHot, fCold)
if err != nil {
return xerrors.Errorf("error marking: %w", err)
}
@ -640,7 +646,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
defer purgew.Close() //nolint:errcheck
// some stats for logging
var hotCnt, coldCnt, purgeCnt int
var hotCnt, coldCnt, purgeCnt, szPurge int
err = s.hot.ForEachKey(func(c cid.Cid) error {
// was it marked?
mark, err := markSet.Has(c)
@ -652,6 +658,16 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
hotCnt++
return nil
}
if sz, err := s.hot.GetSize(s.ctx, c); err != nil {
if ipld.IsNotFound(err) {
log.Warnf("hotstore missing expected block %s", c)
return nil
}
return xerrors.Errorf("error retrieving block %s from hotstore: %w", c, err)
} else {
szPurge += sz
}
// it needs to be removed from hot store, mark it as candidate for purge
if err := purgew.Write(c); err != nil {
@ -691,7 +707,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
log.Infow("cold collection done", "took", time.Since(startCollect))
log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt, "purge", purgeCnt)
log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt, "purge", purgeCnt, "purge size", szPurge)
s.szToPurge = int64(szPurge)
stats.Record(s.ctx, metrics.SplitstoreCompactionHot.M(int64(hotCnt)))
stats.Record(s.ctx, metrics.SplitstoreCompactionCold.M(int64(coldCnt)))
@ -775,8 +792,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return xerrors.Errorf("error purging cold objects: %w", err)
}
log.Infow("purging cold objects from hotstore done", "took", time.Since(startPurge))
s.endCriticalSection()
log.Infow("total protected size", s.szProtectedTxns, "total marked live size", s.szMarkedLiveRefs)
if err := checkpoint.Close(); err != nil {
log.Warnf("error closing checkpoint: %s", err)
@ -1069,7 +1086,7 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
}
log.Infow("chain walk done", "walked", *walkCnt, "scanned", *scanCnt, "walk size", szWalk)
s.szWalk = *szWalk
return nil
}