diff --git a/blockstore/splitstore/debug.go b/blockstore/splitstore/debug.go index 9fea63a5f..2310612d5 100644 --- a/blockstore/splitstore/debug.go +++ b/blockstore/splitstore/debug.go @@ -123,7 +123,7 @@ func (d *debugLog) LogWriteMany(curTs *types.TipSet, blks []blocks.Block, writeE } } -func (d *debugLog) LogMove(curTs *types.TipSet, cid cid.Cid, writeEpoch abi.ChainEpoch) { +func (d *debugLog) LogMove(curTs *types.TipSet, cid cid.Cid) { if d == nil { return } @@ -133,7 +133,7 @@ func (d *debugLog) LogMove(curTs *types.TipSet, cid cid.Cid, writeEpoch abi.Chai d.moveCnt++ - _, err := fmt.Fprintf(d.moveLog, "%d %s %d\n", curTs.Height(), cid, writeEpoch) + _, err := fmt.Fprintf(d.moveLog, "%d %s\n", curTs.Height(), cid) if err != nil { log.Warnf("error writing move log: %s", err) } diff --git a/blockstore/splitstore/markset.go b/blockstore/splitstore/markset.go index ef14a2fc6..0c057a1df 100644 --- a/blockstore/splitstore/markset.go +++ b/blockstore/splitstore/markset.go @@ -29,7 +29,9 @@ type MarkSetEnv interface { func OpenMarkSetEnv(path string, mtype string) (MarkSetEnv, error) { switch mtype { case "", "bloom": - return NewBloomMarkSetEnv() + return NewBloomMarkSetEnv(false) + case "bloomts": + return NewBloomMarkSetEnv(true) case "bolt": return NewBoltMarkSetEnv(filepath.Join(path, "markset.bolt")) default: diff --git a/blockstore/splitstore/markset_bloom.go b/blockstore/splitstore/markset_bloom.go index c213436c8..cffd4f23a 100644 --- a/blockstore/splitstore/markset_bloom.go +++ b/blockstore/splitstore/markset_bloom.go @@ -15,19 +15,22 @@ const ( BloomFilterProbability = 0.01 ) -type BloomMarkSetEnv struct{} +type BloomMarkSetEnv struct { + ts bool +} var _ MarkSetEnv = (*BloomMarkSetEnv)(nil) type BloomMarkSet struct { salt []byte bf *bbloom.Bloom + ts bool } var _ MarkSet = (*BloomMarkSet)(nil) -func NewBloomMarkSetEnv() (*BloomMarkSetEnv, error) { - return &BloomMarkSetEnv{}, nil +func NewBloomMarkSetEnv(ts bool) (*BloomMarkSetEnv, error) { + return &BloomMarkSetEnv{ts: ts}, nil } func (e *BloomMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) { @@ -47,7 +50,7 @@ func (e *BloomMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) { return nil, xerrors.Errorf("error creating bloom filter: %w", err) } - return &BloomMarkSet{salt: salt, bf: bf}, nil + return &BloomMarkSet{salt: salt, bf: bf, ts: e.ts}, nil } func (e *BloomMarkSetEnv) Close() error { @@ -64,12 +67,20 @@ func (s *BloomMarkSet) saltedKey(cid cid.Cid) []byte { } func (s *BloomMarkSet) Mark(cid cid.Cid) error { - s.bf.Add(s.saltedKey(cid)) + if s.ts { + s.bf.AddTS(s.saltedKey(cid)) + } else { + s.bf.Add(s.saltedKey(cid)) + } + return nil } func (s *BloomMarkSet) Has(cid cid.Cid) (bool, error) { - return s.bf.Has(s.saltedKey(cid)), nil + if s.ts { + return s.bf.HasTS(s.saltedKey(cid)), nil + } + return s.bf.HasTS(s.saltedKey(cid)), nil } func (s *BloomMarkSet) Close() error { diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index 17ece419f..7f84d7a7d 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -141,6 +141,11 @@ type SplitStore struct { cancel func() debug *debugLog + + // protection for concurrent read/writes during compaction + txnLk sync.RWMutex + txnEnv MarkSetEnv + txnProtect MarkSet } var _ bstore.Blockstore = (*SplitStore)(nil) @@ -162,6 +167,14 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co return nil, err } + // the txn markset env + txnEnv, err := OpenMarkSetEnv(path, "bloomts") + if err != nil { + _ = tracker.Close() + _ = env.Close() + return nil, err + } + // and now we can make a SplitStore ss := &SplitStore{ cfg: cfg, @@ -170,6 +183,7 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co cold: cold, tracker: tracker, env: env, + txnEnv: txnEnv, coldPurgeSize: defaultColdPurgeSize, } @@ -198,9 +212,16 @@ func (s *SplitStore) DeleteMany(_ []cid.Cid) error { } func (s *SplitStore) Has(cid cid.Cid) (bool, error) { + s.txnLk.RLock() + defer s.txnLk.RUnlock() + has, err := s.hot.Has(cid) if err != nil || has { + if has && s.txnProtect != nil { + err = s.txnProtect.Mark(cid) + } + return has, err } @@ -208,11 +229,18 @@ func (s *SplitStore) Has(cid cid.Cid) (bool, error) { } func (s *SplitStore) Get(cid cid.Cid) (blocks.Block, error) { + s.txnLk.RLock() + defer s.txnLk.RUnlock() + blk, err := s.hot.Get(cid) switch err { case nil: - return blk, nil + if s.txnProtect != nil { + err = s.txnProtect.Mark(cid) + } + + return blk, err case bstore.ErrNotFound: s.mx.Lock() @@ -236,11 +264,18 @@ func (s *SplitStore) Get(cid cid.Cid) (blocks.Block, error) { } func (s *SplitStore) GetSize(cid cid.Cid) (int, error) { + s.txnLk.RLock() + defer s.txnLk.RUnlock() + size, err := s.hot.GetSize(cid) switch err { case nil: - return size, nil + if s.txnProtect != nil { + err = s.txnProtect.Mark(cid) + } + + return size, err case bstore.ErrNotFound: s.mx.Lock() @@ -273,6 +308,9 @@ func (s *SplitStore) Put(blk blocks.Block) error { epoch := s.writeEpoch s.mx.Unlock() + s.txnLk.RLock() + defer s.txnLk.RUnlock() + err := s.tracker.Put(blk.Cid(), epoch) if err != nil { log.Errorf("error tracking CID in hotstore: %s; falling back to coldstore", err) @@ -281,7 +319,12 @@ func (s *SplitStore) Put(blk blocks.Block) error { s.debug.LogWrite(curTs, blk, epoch) - return s.hot.Put(blk) + err = s.hot.Put(blk) + if err == nil && s.txnProtect != nil { + err = s.txnProtect.Mark(blk.Cid()) + } + + return err } func (s *SplitStore) PutMany(blks []blocks.Block) error { @@ -300,6 +343,9 @@ func (s *SplitStore) PutMany(blks []blocks.Block) error { batch = append(batch, blk.Cid()) } + s.txnLk.RLock() + defer s.txnLk.RUnlock() + err := s.tracker.PutBatch(batch, epoch) if err != nil { log.Errorf("error tracking CIDs in hotstore: %s; falling back to coldstore", err) @@ -308,7 +354,17 @@ func (s *SplitStore) PutMany(blks []blocks.Block) error { s.debug.LogWriteMany(curTs, blks, epoch) - return s.hot.PutMany(blks) + err = s.hot.PutMany(blks) + if err == nil && s.txnProtect != nil { + for _, cid := range batch { + err2 := s.txnProtect.Mark(cid) + if err2 != nil { + err = multierr.Combine(err, err2) + } + } + } + + return err } func (s *SplitStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { @@ -351,8 +407,18 @@ func (s *SplitStore) HashOnRead(enabled bool) { } func (s *SplitStore) View(cid cid.Cid, cb func([]byte) error) error { + s.txnLk.RLock() + defer s.txnLk.RUnlock() + err := s.hot.View(cid, cb) switch err { + case nil: + if s.txnProtect != nil { + err = s.txnProtect.Mark(cid) + } + + return err + case bstore.ErrNotFound: s.mx.Lock() warmup := s.warmupEpoch > 0 @@ -774,6 +840,24 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { } defer markSet.Close() //nolint:errcheck + // create the pruge protect filter + s.txnLk.Lock() + s.txnProtect, err = s.txnEnv.Create("protected", s.markSetSize) + if err != nil { + s.txnLk.Unlock() + return xerrors.Errorf("error creating transactional mark set: %w", err) + } + s.txnLk.Unlock() + + defer func() { + s.txnLk.Lock() + _ = s.txnProtect.Close() + s.txnProtect = nil + s.txnLk.Unlock() + }() + + defer s.debug.Flush() + // 1. mark reachable objects by walking the chain from the current epoch to the boundary epoch log.Infow("marking reachable blocks", "currentEpoch", currentEpoch, "boundaryEpoch", boundaryEpoch) startMark := time.Now() @@ -828,13 +912,9 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { cold = append(cold, cid) coldCnt++ - s.debug.LogMove(curTs, cid, writeEpoch) - return nil }) - s.debug.Flush() - if err != nil { return xerrors.Errorf("error collecting cold objects: %w", err) } @@ -867,24 +947,15 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { } log.Infow("moving done", "took", time.Since(startMove)) - // 2.3 delete cold objects from the hotstore + // 2.3 purge cold objects from the hotstore log.Info("purging cold objects from the hotstore") startPurge := time.Now() - err = s.purgeBlocks(cold) + err = s.purge(curTs, cold) if err != nil { return xerrors.Errorf("error purging cold blocks: %w", err) } log.Infow("purging cold from hotstore done", "took", time.Since(startPurge)) - // 2.4 remove the tracker tracking for cold objects - startPurge = time.Now() - log.Info("purging cold objects from tracker") - err = s.purgeTracking(cold) - if err != nil { - return xerrors.Errorf("error purging tracking for cold blocks: %w", err) - } - log.Infow("purging cold from tracker done", "took", time.Since(startPurge)) - // we are done; do some housekeeping err = s.tracker.Sync() if err != nil { @@ -1067,12 +1138,40 @@ func (s *SplitStore) purgeBatch(cids []cid.Cid, deleteBatch func([]cid.Cid) erro return nil } -func (s *SplitStore) purgeBlocks(cids []cid.Cid) error { - return s.purgeBatch(cids, s.hot.DeleteMany) -} +func (s *SplitStore) purge(curTs *types.TipSet, cids []cid.Cid) error { + return s.purgeBatch(cids, + func(cids []cid.Cid) error { + deadCids := make([]cid.Cid, 0, len(cids)) -func (s *SplitStore) purgeTracking(cids []cid.Cid) error { - return s.purgeBatch(cids, s.tracker.DeleteBatch) + s.txnLk.Lock() + defer s.txnLk.Unlock() + + for _, c := range cids { + live, err := s.txnProtect.Has(c) + if err != nil { + return xerrors.Errorf("error checking for liveness: %w", err) + } + + if live { + continue + } + + deadCids = append(deadCids, c) + s.debug.LogMove(curTs, c) + } + + err := s.tracker.DeleteBatch(deadCids) + if err != nil { + return xerrors.Errorf("error purging tracking: %w", err) + } + + err = s.hot.DeleteMany(deadCids) + if err != nil { + return xerrors.Errorf("error purging cold objects: %w", err) + } + + return nil + }) } func (s *SplitStore) gcHotstore() {