From c1c25868cc6621a87a7da2f6c05fa160cf9676de Mon Sep 17 00:00:00 2001
From: vyzo
Date: Tue, 6 Jul 2021 15:09:04 +0300
Subject: [PATCH] improve comments

---
 blockstore/splitstore/splitstore.go | 77 ++++++++++++++++-------------
 1 file changed, 43 insertions(+), 34 deletions(-)

diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go
index 2fa191e7a..dc2ace461 100644
--- a/blockstore/splitstore/splitstore.go
+++ b/blockstore/splitstore/splitstore.go
@@ -114,13 +114,13 @@ type ChainAccessor interface {
 type SplitStore struct {
 	compacting  int32 // compaction (or warmp up) in progress
 	critsection int32 // compaction critical section
-	closing     int32 // the split store is closing
+	closing     int32 // the splitstore is closing
 
 	cfg *Config
 
 	baseEpoch   abi.ChainEpoch
 	warmupEpoch abi.ChainEpoch
-	writeEpoch  abi.ChainEpoch // for debug logging
+	writeEpoch  abi.ChainEpoch // for debug logging only
 
 	coldPurgeSize int
 
@@ -140,7 +140,7 @@ type SplitStore struct {
 
 	debug *debugLog
 
-	// protection for concurrent read/writes during compaction
+	// transactional protection for concurrent read/writes during compaction
 	txnLk            sync.RWMutex
 	txnActive        bool
 	txnLookbackEpoch abi.ChainEpoch
@@ -429,15 +429,13 @@ func (s *SplitStore) Start(chain ChainAccessor) error {
 	}
 
 	// load warmup epoch from metadata ds
-	// if none, then the splitstore will warm up the hotstore at first head change notif
-	// by walking the current tipset
 	bs, err = s.ds.Get(warmupEpochKey)
 	switch err {
 	case nil:
 		s.warmupEpoch = bytesToEpoch(bs)
 
 	case dstore.ErrNotFound:
-		// the hotstore hasn't warmed up, load the genesis into the hotstore
+		// the hotstore hasn't warmed up, start a concurrent warm up
 		err = s.warmup(s.curTs)
 		if err != nil {
 			return xerrors.Errorf("error warming up: %w", err)
@@ -447,8 +445,7 @@ func (s *SplitStore) Start(chain ChainAccessor) error {
 		return xerrors.Errorf("error loading warmup epoch: %w", err)
 	}
 
-	// load markSetSize from metadata ds
-	// if none, the splitstore will compute it during warmup and update in every compaction
+	// load markSetSize from metadata ds to provide a size hint for marksets
 	bs, err = s.ds.Get(markSetSizeKey)
 	switch err {
 	case nil:
@@ -504,7 +501,7 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
 	}
 
 	if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) {
-		// we are currently compacting, do nothing and wait for the next head change
+		// we are currently compacting (or warming up); do nothing and wait for the next head change
 		return nil
 	}
 
@@ -568,6 +565,7 @@ func (s *SplitStore) updateWriteEpoch() {
 	}
 }
 
+// transactionally protect a reference to an object
 func (s *SplitStore) trackTxnRef(c cid.Cid) error {
 	if !s.txnActive {
 		// not compacting
@@ -586,6 +584,7 @@ func (s *SplitStore) trackTxnRef(c cid.Cid) error {
 	return s.doTxnProtect(c, nil)
 }
 
+// transactionally protect a batch of references
 func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) error {
 	if !s.txnActive {
 		// not compacting
@@ -593,7 +592,7 @@ func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) error {
 	}
 
 	if s.txnRefs != nil {
-		// we haven't finished marking yet, so track the reference
+		// we haven't finished marking yet, so track the references
 		s.txnRefsMx.Lock()
 		for _, c := range cids {
 			s.txnRefs[c] = struct{}{}
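
For readers outside the codebase, here is a minimal sketch of the tracking pattern described by the comments in this hunk and the next: while the chain walk is still marking, references are merely recorded; once marking completes, each reference is protected on the spot, and concurrent markings short-circuit through the markset. The refTracker type and string keys below are hypothetical stand-ins for the real SplitStore, cid.Cid and MarkSet types, not the actual lotus API.

    package main

    import (
    	"fmt"
    	"sync"
    )

    // refTracker is a hypothetical stand-in for the SplitStore's transaction state.
    type refTracker struct {
    	mx     sync.Mutex
    	refs   map[string]struct{} // like txnRefs: refs recorded while the chain walk is still marking
    	marked map[string]struct{} // like the markset shared with the compactor
    }

    // trackRef mirrors trackTxnRef: while marking is in progress we only record
    // the reference; once marking is done we protect it immediately.
    func (t *refTracker) trackRef(c string) {
    	t.mx.Lock()
    	defer t.mx.Unlock()
    	if t.refs != nil { // the chain walk hasn't finished yet
    		t.refs[c] = struct{}{}
    		return
    	}
    	t.protect(c)
    }

    // protect mirrors doTxnProtect: mark the object; a concurrent marking of the
    // same object short-circuits here. The real code also walks the object's links.
    func (t *refTracker) protect(c string) {
    	if _, ok := t.marked[c]; ok {
    		return // already marked, nothing to do
    	}
    	t.marked[c] = struct{}{}
    }

    func main() {
    	t := &refTracker{refs: map[string]struct{}{}, marked: map[string]struct{}{}}
    	t.trackRef("objA") // marking still active: only recorded in refs
    	t.refs = nil       // marking finished; recorded refs would now be protected
    	t.trackRef("objB") // protected directly via the markset
    	fmt.Println(len(t.marked)) // 1
    }
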
@@ -618,6 +617,8 @@ func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) error {
 	return nil
 }
 
+// transactionally protect a reference by walking the object and marking.
+// concurrent markings are short-circuited by checking the markset.
 func (s *SplitStore) doTxnProtect(root cid.Cid, batch map[cid.Cid]struct{}) error {
 	// Note: cold objects are deleted heaviest first, so the consituents of an object
 	// cannot be deleted before the object itself.
@@ -674,6 +675,9 @@ func (s *SplitStore) doTxnProtect(root cid.Cid, batch map[cid.Cid]struct{}) erro
 	return err
 }
 
+// warmup acquires the compaction lock and spawns a goroutine to warm up the hotstore;
+// this is necessary when we sync from a snapshot or when we enable the splitstore
+// on top of an existing blockstore (which becomes the coldstore).
 func (s *SplitStore) warmup(curTs *types.TipSet) error {
 	if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) {
 		return xerrors.Errorf("error locking compaction")
@@ -697,6 +701,9 @@ func (s *SplitStore) warmup(curTs *types.TipSet) error {
 	return nil
 }
 
+// the actual warmup procedure; it walks the chain, loading all state roots at the boundary
+// and all headers all the way up to genesis.
+// objects are written in batches so as to minimize overhead.
 func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
 	epoch := curTs.Height()
 	batchHot := make([]blocks.Block, 0, batchSize)
@@ -772,7 +779,17 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
 	return nil
 }
 
-// Compaction/GC Algorithm
+// --- Compaction ---
+// Compaction works transactionally with the following algorithm:
+// - We prepare a transaction, whereby all objects referenced through the API during i/o are tracked.
+// - We walk the chain and mark reachable objects, keeping 4 finalities of state roots and messages and all headers all the way to genesis.
+// - Once the chain walk is complete, we begin full transaction protection with concurrent marking; we walk and mark all references created during the chain walk. At the same time, all I/O through the API concurrently marks objects as live references.
+// - We collect cold objects by iterating through the hotstore and checking the mark set; if an object is not marked, then it is a candidate for purge.
+// - When running with a coldstore, we next copy all cold objects to the coldstore.
+// - At this point we are ready to begin purging:
+//   - We sort cold objects heaviest first, so as to never delete the constituents of a DAG before the DAG itself (which would leave dangling references)
+//   - We delete in small batches, taking a lock; each batch is checked again against the concurrent transactional marks, so that we never delete anything live
+// - We then end the transaction and compact/gc the hotstore.
 func (s *SplitStore) compact(curTs *types.TipSet) {
 	start := time.Now()
 	err := s.doCompact(curTs)
@@ -801,7 +818,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 	// 0. Prepare the transaction
 	s.prepareTxnProtect(lookbackEpoch)
 
-	// 1. mark reachable objects by walking the chain from the current epoch to the boundary epoch
+	// 1. mark reachable objects by walking the chain from the current epoch; we keep state roots
+	//    and messages until the boundary epoch.
 	log.Info("marking reachable objects")
 	startMark := time.Now()
 
@@ -928,7 +946,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 	})
 
 	if err != nil {
-		return xerrors.Errorf("error collecting candidate cold objects: %w", err)
+		return xerrors.Errorf("error collecting cold objects: %w", err)
 	}
 
 	log.Infow("cold collection done", "took", time.Since(startCollect))
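
To make the cold-collection step of the algorithm above concrete, here is a hedged sketch of the check performed against the markset; the hotstore and markset types below are toy stand-ins for the real blockstore iterator (ForEachKey) and MarkSet (Has) interfaces, under the assumption of in-memory maps.

    package main

    import "fmt"

    // toy stand-ins for the hotstore and the markset produced by the chain walk
    type hotstore map[string][]byte
    type markset map[string]struct{}

    // collectCold mirrors the collection step: anything in the hotstore that the
    // chain walk did not mark is a candidate for purge (or for moving to the
    // coldstore, when one is configured).
    func collectCold(hot hotstore, marked markset) []string {
    	var cold []string
    	for c := range hot {
    		if _, ok := marked[c]; !ok {
    			cold = append(cold, c)
    		}
    	}
    	return cold
    }

    func main() {
    	hot := hotstore{"stateRoot": nil, "header": nil, "staleObj": nil}
    	marked := markset{"stateRoot": {}, "header": {}}
    	fmt.Println(collectCold(hot, marked)) // [staleObj]
    }
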
@@ -958,7 +976,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 	}
 
 	// 4. sort cold objects so that the dags with most references are deleted first
-	//    this ensures that we can't refer to a dag with its consituents already deleted
+	//    this ensures that we can't refer to a dag with its constituents already deleted, i.e.
+	//    we leave no dangling references.
 	log.Info("sorting cold objects")
 	startSort := time.Now()
 	err = s.sortObjects(cold)
@@ -1144,7 +1163,7 @@ func (s *SplitStore) walkObject(c cid.Cid, walked *cid.Set, f func(cid.Cid) erro
 	return nil
 }
 
-// like walkObject, but the object may be potentially incomplete (references missing from the hotstore)
+// like walkObject, but the object may be potentially incomplete (references missing)
 func (s *SplitStore) walkObjectIncomplete(c cid.Cid, walked *cid.Set, f, missing func(cid.Cid) error) error {
 	if !walked.Visit(c) {
 		return nil
@@ -1238,20 +1257,6 @@ func (s *SplitStore) isOldBlockHeader(c cid.Cid, epoch abi.ChainEpoch) (isOldBlo
 	return isOldBlock, err
 }
 
-func (s *SplitStore) isBlockHeader(c cid.Cid) (isBlock bool, err error) {
-	if c.Prefix().Codec != cid.DagCBOR {
-		return false, nil
-	}
-
-	err = s.view(c, func(data []byte) error {
-		var hdr types.BlockHeader
-		isBlock = hdr.UnmarshalCBOR(bytes.NewBuffer(data)) == nil
-		return nil
-	})
-
-	return isBlock, err
-}
-
 func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error {
 	batch := make([]blocks.Block, 0, batchSize)
 
@@ -1279,13 +1284,16 @@ func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error {
 	if len(batch) > 0 {
 		err := s.cold.PutMany(batch)
 		if err != nil {
-			return xerrors.Errorf("error putting cold to coldstore: %w", err)
+			return xerrors.Errorf("error putting batch to coldstore: %w", err)
 		}
 	}
 
 	return nil
 }
 
+// sorts a slice of objects heaviest first -- it's a little expensive but worth the
+// guarantee that we don't leave dangling references behind, e.g. if we die in the middle
+// of a purge.
 func (s *SplitStore) sortObjects(cids []cid.Cid) error {
 	// we cache the keys to avoid making a gazillion of strings
 	keys := make(map[cid.Cid]string)
@@ -1431,12 +1439,13 @@ func (s *SplitStore) purge(curTs *types.TipSet, cids []cid.Cid) error {
 	})
 }
 
-// I really don't like having this code, but we seem to have some DAG references with missing
-// constituents. During testing in mainnet *some* of these references *sometimes* appeared after a
-// little bit.
+// I really don't like having this code, but we seem to have some occasional DAG references with
+// missing constituents. During testing in mainnet *some* of these references *sometimes* appeared
+// after a little bit.
 // We need to figure out where they are coming from and eliminate that vector, but until then we
 // have this gem[TM].
-// My best guess is that they are parent message receipts or yet to be computed state roots.
+// My best guess is that they are parent message receipts or yet to be computed state roots; magik
+// thinks the cause may be block validation.
 func (s *SplitStore) waitForMissingRefs() {
 	s.txnLk.Lock()
 	missing := s.txnMissing
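
Finally, a sketch of the batched purge described by the comments above: cold objects are deleted a small batch at a time under the transaction lock, and every candidate is re-checked against the references marked live by the concurrent transaction before deletion. purge, liveSet and deleteBatch below are hypothetical simplifications of the real code, and the batch size is illustrative (the real code uses a much larger batch).

    package main

    import (
    	"fmt"
    	"sync"
    )

    const batchSize = 4 // illustrative; the real code batches far more per lock acquisition

    // purge deletes cold objects in small batches, holding the lock only briefly
    // so that API traffic is not blocked for the whole purge.
    func purge(cold []string, txnLk *sync.Mutex, liveSet map[string]struct{},
    	deleteBatch func([]string) error) error {
    	batch := make([]string, 0, batchSize)

    	flush := func() error {
    		txnLk.Lock() // hold the lock only while this batch is checked and deleted
    		defer txnLk.Unlock()
    		dead := batch[:0] // filter in place: drop anything marked live meanwhile
    		for _, c := range batch {
    			if _, live := liveSet[c]; live {
    				continue // concurrently referenced during compaction; keep it
    			}
    			dead = append(dead, c)
    		}
    		err := deleteBatch(dead)
    		batch = batch[:0]
    		return err
    	}

    	for _, c := range cold {
    		batch = append(batch, c)
    		if len(batch) == batchSize {
    			if err := flush(); err != nil {
    				return err
    			}
    		}
    	}
    	if len(batch) > 0 {
    		return flush()
    	}
    	return nil
    }

    func main() {
    	var lk sync.Mutex
    	live := map[string]struct{}{"b": {}} // "b" was marked live by the transaction
    	err := purge([]string{"a", "b", "c"}, &lk, live, func(dead []string) error {
    		fmt.Println("deleting", dead) // deleting [a c]
    		return nil
    	})
    	fmt.Println(err) // <nil>
    }
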