From dd998d6b2452e1d63eb5e64e52d2093c0aa45fe5 Mon Sep 17 00:00:00 2001 From: zenground0 Date: Fri, 3 Mar 2023 08:53:23 -0500 Subject: [PATCH] Begin account for size during walks --- blockstore/splitstore/splitstore_compact.go | 113 ++++++++++++-------- blockstore/splitstore/splitstore_reify.go | 2 +- 2 files changed, 71 insertions(+), 44 deletions(-) diff --git a/blockstore/splitstore/splitstore_compact.go b/blockstore/splitstore/splitstore_compact.go index 59bdd515d..79afe126c 100644 --- a/blockstore/splitstore/splitstore_compact.go +++ b/blockstore/splitstore/splitstore_compact.go @@ -201,7 +201,7 @@ func (s *SplitStore) markLiveRefs(cids []cid.Cid) { count := new(int32) visitor := newConcurrentVisitor() - walkObject := func(c cid.Cid) error { + walkObject := func(c cid.Cid) (int, error) { return s.walkObjectIncomplete(c, visitor, func(c cid.Cid) error { if isUnitaryObject(c) { @@ -228,7 +228,7 @@ func (s *SplitStore) markLiveRefs(cids []cid.Cid) { // optimize the common case of single put if len(cids) == 1 { - if err := walkObject(cids[0]); err != nil { + if _, err := walkObject(cids[0]); err != nil { log.Errorf("error marking tipset refs: %s", err) } log.Debugw("marking live refs done", "took", time.Since(startMark), "marked", *count) @@ -243,7 +243,7 @@ func (s *SplitStore) markLiveRefs(cids []cid.Cid) { worker := func() error { for c := range workch { - if err := walkObject(c); err != nil { + if _, err := walkObject(c); err != nil { return err } } @@ -361,6 +361,7 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error { log.Infow("protecting transactional references", "refs", len(txnRefs)) count := 0 + sz := new(int64) workch := make(chan cid.Cid, len(txnRefs)) startProtect := time.Now() @@ -393,10 +394,11 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error { worker := func() error { for c := range workch { - err := s.doTxnProtect(c, markSet) + szTxn, err := s.doTxnProtect(c, markSet) if err != nil { return xerrors.Errorf("error protecting transactional references to %s: %w", c, err) } + atomic.AddInt64(sz, int64(szTxn)) } return nil } @@ -410,15 +412,15 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error { return err } - log.Infow("protecting transactional refs done", "took", time.Since(startProtect), "protected", count) + log.Infow("protecting transactional refs done", "took", time.Since(startProtect), "protected", count, "protected size", sz) } } // transactionally protect a reference by walking the object and marking. // concurrent markings are short circuited by checking the markset. -func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) error { +func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) (int, error) { if err := s.checkClosing(); err != nil { - return err + return 0, err } // Note: cold objects are deleted heaviest first, so the consituents of an object @@ -907,6 +909,7 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp copy(toWalk, ts.Cids()) walkCnt := new(int64) scanCnt := new(int64) + szWalk := new(int64) tsRef := func(blkCids []cid.Cid) (cid.Cid, error) { return types.NewTipSetKey(blkCids...).Cid() @@ -942,8 +945,10 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp if err != nil { return xerrors.Errorf("error computing cid reference to parent tipset") } - if err := s.walkObjectIncomplete(pRef, visitor, fHot, stopWalk); err != nil { + if sz, err := s.walkObjectIncomplete(pRef, visitor, fHot, stopWalk); err != nil { return xerrors.Errorf("error walking parent tipset cid reference") + } else { + atomic.AddInt64(szWalk, int64(sz)) } // message are retained if within the inclMsgs boundary @@ -951,38 +956,52 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp if inclMsgs < inclState { // we need to use walkObjectIncomplete here, as messages/receipts may be missing early on if we // synced from snapshot and have a long HotStoreMessageRetentionPolicy. - if err := s.walkObjectIncomplete(hdr.Messages, visitor, fHot, stopWalk); err != nil { + if sz, err := s.walkObjectIncomplete(hdr.Messages, visitor, fHot, stopWalk); err != nil { return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err) + } else { + atomic.AddInt64(szWalk, int64(sz)) } - if err := s.walkObjectIncomplete(hdr.ParentMessageReceipts, visitor, fHot, stopWalk); err != nil { + if sz, err := s.walkObjectIncomplete(hdr.ParentMessageReceipts, visitor, fHot, stopWalk); err != nil { return xerrors.Errorf("error walking messages receipts (cid: %s): %w", hdr.ParentMessageReceipts, err) + } else { + atomic.AddInt64(szWalk, int64(sz)) } } else { - if err := s.walkObject(hdr.Messages, visitor, fHot); err != nil { + if sz, err := s.walkObject(hdr.Messages, visitor, fHot); err != nil { return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err) + } else { + atomic.AddInt64(szWalk, int64(sz)) } - if err := s.walkObject(hdr.ParentMessageReceipts, visitor, fHot); err != nil { + if sz, err := s.walkObject(hdr.ParentMessageReceipts, visitor, fHot); err != nil { return xerrors.Errorf("error walking message receipts (cid: %s): %w", hdr.ParentMessageReceipts, err) + } else { + atomic.AddInt64(szWalk, int64(sz)) } } } // messages and receipts outside of inclMsgs are included in the cold store if hdr.Height < inclMsgs && hdr.Height > 0 { - if err := s.walkObjectIncomplete(hdr.Messages, visitor, fCold, stopWalk); err != nil { + if sz, err := s.walkObjectIncomplete(hdr.Messages, visitor, fCold, stopWalk); err != nil { return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err) + } else { + atomic.AddInt64(szWalk, int64(sz)) } - if err := s.walkObjectIncomplete(hdr.ParentMessageReceipts, visitor, fCold, stopWalk); err != nil { + if sz, err := s.walkObjectIncomplete(hdr.ParentMessageReceipts, visitor, fCold, stopWalk); err != nil { return xerrors.Errorf("error walking messages receipts (cid: %s): %w", hdr.ParentMessageReceipts, err) + } else { + atomic.AddInt64(szWalk, int64(sz)) } } // state is only retained if within the inclState boundary, with the exception of genesis if hdr.Height >= inclState || hdr.Height == 0 { - if err := s.walkObject(hdr.ParentStateRoot, visitor, fHot); err != nil { + if sz, err := s.walkObject(hdr.ParentStateRoot, visitor, fHot); err != nil { return xerrors.Errorf("error walking state root (cid: %s): %w", hdr.ParentStateRoot, err) + } else { + atomic.AddInt64(szWalk, int64(sz)) } atomic.AddInt64(scanCnt, 1) } @@ -1001,8 +1020,10 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp if err != nil { return xerrors.Errorf("error computing cid reference to parent tipset") } - if err := s.walkObjectIncomplete(hRef, visitor, fHot, stopWalk); err != nil { + if sz, err := s.walkObjectIncomplete(hRef, visitor, fHot, stopWalk); err != nil { return xerrors.Errorf("error walking parent tipset cid reference") + } else { + atomic.AddInt64(szWalk, int64(sz)) } for len(toWalk) > 0 { @@ -1047,123 +1068,129 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp } } - log.Infow("chain walk done", "walked", *walkCnt, "scanned", *scanCnt) + log.Infow("chain walk done", "walked", *walkCnt, "scanned", *scanCnt, "walk size", szWalk) return nil } -func (s *SplitStore) walkObject(c cid.Cid, visitor ObjectVisitor, f func(cid.Cid) error) error { +func (s *SplitStore) walkObject(c cid.Cid, visitor ObjectVisitor, f func(cid.Cid) error) (int, error) { + var sz int visit, err := visitor.Visit(c) if err != nil { - return xerrors.Errorf("error visiting object: %w", err) + return 0, xerrors.Errorf("error visiting object: %w", err) } if !visit { - return nil + return sz, nil } if err := f(c); err != nil { if err == errStopWalk { - return nil + return sz, nil } - return err + return 0, err } if c.Prefix().Codec != cid.DagCBOR { - return nil + return sz, nil } // check this before recursing if err := s.checkClosing(); err != nil { - return err + return 0, err } var links []cid.Cid err = s.view(c, func(data []byte) error { + sz += len(data) return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) { links = append(links, c) }) }) if err != nil { - return xerrors.Errorf("error scanning linked block (cid: %s): %w", c, err) + return 0, xerrors.Errorf("error scanning linked block (cid: %s): %w", c, err) } for _, c := range links { - err := s.walkObject(c, visitor, f) + szLink, err := s.walkObject(c, visitor, f) if err != nil { - return xerrors.Errorf("error walking link (cid: %s): %w", c, err) + return 0, xerrors.Errorf("error walking link (cid: %s): %w", c, err) } + sz += szLink } - return nil + return sz, nil } // like walkObject, but the object may be potentially incomplete (references missing) -func (s *SplitStore) walkObjectIncomplete(c cid.Cid, visitor ObjectVisitor, f, missing func(cid.Cid) error) error { +func (s *SplitStore) walkObjectIncomplete(c cid.Cid, visitor ObjectVisitor, f, missing func(cid.Cid) error) (int, error) { + sz := 0 visit, err := visitor.Visit(c) if err != nil { - return xerrors.Errorf("error visiting object: %w", err) + return 0, xerrors.Errorf("error visiting object: %w", err) } if !visit { - return nil + return sz, nil } // occurs check -- only for DAGs if c.Prefix().Codec == cid.DagCBOR { has, err := s.has(c) if err != nil { - return xerrors.Errorf("error occur checking %s: %w", c, err) + return 0, xerrors.Errorf("error occur checking %s: %w", c, err) } if !has { err = missing(c) if err == errStopWalk { - return nil + return sz, nil } - return err + return 0, err } } if err := f(c); err != nil { if err == errStopWalk { - return nil + return sz, nil } - return err + return 0, err } if c.Prefix().Codec != cid.DagCBOR { - return nil + return sz, nil } // check this before recursing if err := s.checkClosing(); err != nil { - return err + return sz, err } var links []cid.Cid err = s.view(c, func(data []byte) error { + sz += len(data) return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) { links = append(links, c) }) }) if err != nil { - return xerrors.Errorf("error scanning linked block (cid: %s): %w", c, err) + return 0, xerrors.Errorf("error scanning linked block (cid: %s): %w", c, err) } for _, c := range links { - err := s.walkObjectIncomplete(c, visitor, f, missing) + szLink, err := s.walkObjectIncomplete(c, visitor, f, missing) if err != nil { - return xerrors.Errorf("error walking link (cid: %s): %w", c, err) + return 0, xerrors.Errorf("error walking link (cid: %s): %w", c, err) } + sz += szLink } - return nil + return sz, nil } // internal version used during compaction and related operations @@ -1528,7 +1555,7 @@ func (s *SplitStore) waitForMissingRefs(markSet MarkSet) { missing = make(map[cid.Cid]struct{}) for c := range towalk { - err := s.walkObjectIncomplete(c, visitor, + _, err := s.walkObjectIncomplete(c, visitor, func(c cid.Cid) error { if isUnitaryObject(c) { return errStopWalk diff --git a/blockstore/splitstore/splitstore_reify.go b/blockstore/splitstore/splitstore_reify.go index aa14f090a..07efedead 100644 --- a/blockstore/splitstore/splitstore_reify.go +++ b/blockstore/splitstore/splitstore_reify.go @@ -101,7 +101,7 @@ func (s *SplitStore) doReify(c cid.Cid) { defer s.txnLk.RUnlock() count := 0 - err := s.walkObjectIncomplete(c, newTmpVisitor(), + _, err := s.walkObjectIncomplete(c, newTmpVisitor(), func(c cid.Cid) error { if isUnitaryObject(c) { return errStopWalk