diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go
index e9fe92f2f..b710c4ee7 100644
--- a/blockstore/splitstore/splitstore.go
+++ b/blockstore/splitstore/splitstore.go
@@ -236,17 +236,19 @@ func (s *SplitStore) Has(c cid.Cid) (bool, error) {
 	}
 
 	if has {
-		// treat it as an implicit Write, absence options -- the vm uses this check to avoid duplicate
-		// writes on Flush. When we have options in the API, the vm can explicitly signal that this is
-		// an implicit Write.
+		// treat it as an implicit (recursive) Write, absence options -- the vm uses this check to avoid
+		// duplicate writes on Copy.
+		// When we have options in the API, the vm can explicitly signal that this is an implicit Write.
 		s.trackWrite(c, true)
 
 		// also make sure the object is considered live during compaction in case we have already
-		// flushed pending writes and started compaction
+		// flushed pending writes and started compaction.
+		// in case of a race with purge, this will return a track error, which we can use to
+		// signal to the vm that the object is not fully present.
 		trackErr := s.trackTxnRef(c, true)
 
 		// if we failed to track the object and all its dependencies, then return false so as
-		// to cause the vm to recompute
+		// to cause the vm to copy
 		return trackErr == nil, nil
 	}
 
@@ -673,6 +675,16 @@ func (s *SplitStore) trackTxnRef(c cid.Cid, implicit bool) error {
 		err = s.txnProtect.Mark(c)
 	} else {
 		err = s.walkLinks(c, cid.NewSet(), func(c cid.Cid) error {
+			// this check is necessary to avoid races because objects are purged in random order
+			has, err := s.hot.Has(c)
+			if err != nil {
+				return err
+			}
+
+			if !has {
+				return xerrors.Errorf("object (cid: %s) has been purged", c)
+			}
+
 			return s.txnProtect.Mark(c)
 		})
 	}
@@ -949,28 +961,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 		return xerrors.Errorf("error creating mark set: %w", err)
 	}
 	defer markSet.Close() //nolint:errcheck
-
-	// create the pruge protect filter
-	s.txnLk.Lock()
-	s.txnProtect, err = s.txnEnv.Create("protected", s.markSetSize)
-	if err != nil {
-		s.txnLk.Unlock()
-		return xerrors.Errorf("error creating transactional mark set: %w", err)
-	}
-	s.txnLk.Unlock()
-
-	defer func() {
-		s.txnLk.Lock()
-		_ = s.txnProtect.Close()
-		s.txnProtect = nil
-		s.txnLk.Unlock()
-	}()
-
 	defer s.debug.Flush()
 
-	// flush pending writes to update the tracker
-	s.flushPendingWrites(false)
-
 	// 1. mark reachable objects by walking the chain from the current epoch to the boundary epoch
 	log.Infow("marking reachable blocks", "currentEpoch", currentEpoch, "boundaryEpoch", boundaryEpoch)
 	startMark := time.Now()
@@ -992,6 +984,28 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 
 	log.Infow("marking done", "took", time.Since(startMark), "marked", count)
 
+	// create the transaction protect filter
+	s.txnLk.Lock()
+	s.txnProtect, err = s.txnEnv.Create("protected", s.markSetSize)
+	if err != nil {
+		s.txnLk.Unlock()
+		return xerrors.Errorf("error creating transactional mark set: %w", err)
+	}
+	s.txnLk.Unlock()
+
+	defer func() {
+		s.txnLk.Lock()
+		_ = s.txnProtect.Close()
+		s.txnProtect = nil
+		s.txnLk.Unlock()
+	}()
+
+	// flush pending writes to update the tracker
+	log.Info("flushing pending writes")
+	startFlush := time.Now()
+	s.flushPendingWrites(false)
+	log.Infow("flushing done", "took", time.Since(startFlush))
+
 	// 2. move cold unreachable objects to the coldstore
 	log.Info("collecting cold objects")
 	startCollect := time.Now()
@@ -1071,6 +1085,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 		}
 		log.Infow("moving done", "took", time.Since(startMove))
 	}
+
 	// 2.3 purge cold objects from the hotstore
 	log.Info("purging cold objects from the hotstore")
 	startPurge := time.Now()
@@ -1293,7 +1308,7 @@ func (s *SplitStore) purge(curTs *types.TipSet, cids []cid.Cid) error {
 	deadCids := make([]cid.Cid, 0, batchSize)
 	var purgeCnt, liveCnt int
 	defer func() {
-		log.Infow("purged objects", "purged", purgeCnt, "live", liveCnt)
+		log.Infow("purged cold objects", "purged", purgeCnt, "live", liveCnt)
 	}()
 
 	return s.purgeBatch(cids,
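
The trackTxnRef hunk above re-checks the hot store while walking links, so a concurrent purge is detected instead of silently protecting a half-present object graph; the caller (the implicit Write path in Has) then reports the object as absent so the vm falls back to a copy. Below is a minimal, self-contained sketch of that check-while-walking pattern, not the splitstore code: every name in it (hotStore, protect, errPurged) is invented for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// hotStore is a toy stand-in for the splitstore's hot blockstore.
type hotStore struct {
	mu   sync.Mutex
	objs map[string][]string // object id -> ids it links to
}

func (h *hotStore) has(id string) bool {
	h.mu.Lock()
	defer h.mu.Unlock()
	_, ok := h.objs[id]
	return ok
}

func (h *hotStore) links(id string) []string {
	h.mu.Lock()
	defer h.mu.Unlock()
	return h.objs[id]
}

var errPurged = errors.New("object has been purged")

// protect walks the graph rooted at id and marks every reachable object.
// If any object is missing from the hot store (a purge won the race),
// it fails so the caller can treat the root as not fully present and copy it.
func protect(h *hotStore, id string, marked map[string]bool) error {
	if !h.has(id) {
		return fmt.Errorf("%s: %w", id, errPurged)
	}
	marked[id] = true
	for _, l := range h.links(id) {
		if marked[l] {
			continue
		}
		if err := protect(h, l, marked); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	h := &hotStore{objs: map[string][]string{
		"root": {"a", "b"},
		"a":    {},
		// "b" is absent: pretend a concurrent purge already deleted it.
	}}
	err := protect(h, "root", map[string]bool{})
	fmt.Println("protect result:", err) // a real caller would fall back to copying
}
```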