fix transactional race during compaction
It is possible for an object to be written or recreated (and checked with Has) after the mark completes and during the purge; if this happens we will purge a live block.
This commit is contained in:
parent
649b7dd162
commit
cb665d07e0
@ -123,7 +123,7 @@ func (d *debugLog) LogWriteMany(curTs *types.TipSet, blks []blocks.Block, writeE
|
||||
}
|
||||
}
|
||||
|
||||
func (d *debugLog) LogMove(curTs *types.TipSet, cid cid.Cid, writeEpoch abi.ChainEpoch) {
|
||||
func (d *debugLog) LogMove(curTs *types.TipSet, cid cid.Cid) {
|
||||
if d == nil {
|
||||
return
|
||||
}
|
||||
@ -133,7 +133,7 @@ func (d *debugLog) LogMove(curTs *types.TipSet, cid cid.Cid, writeEpoch abi.Chai
|
||||
|
||||
d.moveCnt++
|
||||
|
||||
_, err := fmt.Fprintf(d.moveLog, "%d %s %d\n", curTs.Height(), cid, writeEpoch)
|
||||
_, err := fmt.Fprintf(d.moveLog, "%d %s\n", curTs.Height(), cid)
|
||||
if err != nil {
|
||||
log.Warnf("error writing move log: %s", err)
|
||||
}
|
||||
|
@ -29,7 +29,9 @@ type MarkSetEnv interface {
|
||||
func OpenMarkSetEnv(path string, mtype string) (MarkSetEnv, error) {
|
||||
switch mtype {
|
||||
case "", "bloom":
|
||||
return NewBloomMarkSetEnv()
|
||||
return NewBloomMarkSetEnv(false)
|
||||
case "bloomts":
|
||||
return NewBloomMarkSetEnv(true)
|
||||
case "bolt":
|
||||
return NewBoltMarkSetEnv(filepath.Join(path, "markset.bolt"))
|
||||
default:
|
||||
|
@ -15,19 +15,22 @@ const (
|
||||
BloomFilterProbability = 0.01
|
||||
)
|
||||
|
||||
type BloomMarkSetEnv struct{}
|
||||
type BloomMarkSetEnv struct {
|
||||
ts bool
|
||||
}
|
||||
|
||||
var _ MarkSetEnv = (*BloomMarkSetEnv)(nil)
|
||||
|
||||
type BloomMarkSet struct {
|
||||
salt []byte
|
||||
bf *bbloom.Bloom
|
||||
ts bool
|
||||
}
|
||||
|
||||
var _ MarkSet = (*BloomMarkSet)(nil)
|
||||
|
||||
func NewBloomMarkSetEnv() (*BloomMarkSetEnv, error) {
|
||||
return &BloomMarkSetEnv{}, nil
|
||||
func NewBloomMarkSetEnv(ts bool) (*BloomMarkSetEnv, error) {
|
||||
return &BloomMarkSetEnv{ts: ts}, nil
|
||||
}
|
||||
|
||||
func (e *BloomMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
|
||||
@ -47,7 +50,7 @@ func (e *BloomMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
|
||||
return nil, xerrors.Errorf("error creating bloom filter: %w", err)
|
||||
}
|
||||
|
||||
return &BloomMarkSet{salt: salt, bf: bf}, nil
|
||||
return &BloomMarkSet{salt: salt, bf: bf, ts: e.ts}, nil
|
||||
}
|
||||
|
||||
func (e *BloomMarkSetEnv) Close() error {
|
||||
@ -64,12 +67,20 @@ func (s *BloomMarkSet) saltedKey(cid cid.Cid) []byte {
|
||||
}
|
||||
|
||||
func (s *BloomMarkSet) Mark(cid cid.Cid) error {
|
||||
s.bf.Add(s.saltedKey(cid))
|
||||
if s.ts {
|
||||
s.bf.AddTS(s.saltedKey(cid))
|
||||
} else {
|
||||
s.bf.Add(s.saltedKey(cid))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *BloomMarkSet) Has(cid cid.Cid) (bool, error) {
|
||||
return s.bf.Has(s.saltedKey(cid)), nil
|
||||
if s.ts {
|
||||
return s.bf.HasTS(s.saltedKey(cid)), nil
|
||||
}
|
||||
return s.bf.HasTS(s.saltedKey(cid)), nil
|
||||
}
|
||||
|
||||
func (s *BloomMarkSet) Close() error {
|
||||
|
@ -141,6 +141,11 @@ type SplitStore struct {
|
||||
cancel func()
|
||||
|
||||
debug *debugLog
|
||||
|
||||
// protection for concurrent read/writes during compaction
|
||||
txnLk sync.RWMutex
|
||||
txnEnv MarkSetEnv
|
||||
txnProtect MarkSet
|
||||
}
|
||||
|
||||
var _ bstore.Blockstore = (*SplitStore)(nil)
|
||||
@ -162,6 +167,14 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// the txn markset env
|
||||
txnEnv, err := OpenMarkSetEnv(path, "bloomts")
|
||||
if err != nil {
|
||||
_ = tracker.Close()
|
||||
_ = env.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// and now we can make a SplitStore
|
||||
ss := &SplitStore{
|
||||
cfg: cfg,
|
||||
@ -170,6 +183,7 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
|
||||
cold: cold,
|
||||
tracker: tracker,
|
||||
env: env,
|
||||
txnEnv: txnEnv,
|
||||
|
||||
coldPurgeSize: defaultColdPurgeSize,
|
||||
}
|
||||
@ -198,9 +212,16 @@ func (s *SplitStore) DeleteMany(_ []cid.Cid) error {
|
||||
}
|
||||
|
||||
func (s *SplitStore) Has(cid cid.Cid) (bool, error) {
|
||||
s.txnLk.RLock()
|
||||
defer s.txnLk.RUnlock()
|
||||
|
||||
has, err := s.hot.Has(cid)
|
||||
|
||||
if err != nil || has {
|
||||
if has && s.txnProtect != nil {
|
||||
err = s.txnProtect.Mark(cid)
|
||||
}
|
||||
|
||||
return has, err
|
||||
}
|
||||
|
||||
@ -208,11 +229,18 @@ func (s *SplitStore) Has(cid cid.Cid) (bool, error) {
|
||||
}
|
||||
|
||||
func (s *SplitStore) Get(cid cid.Cid) (blocks.Block, error) {
|
||||
s.txnLk.RLock()
|
||||
defer s.txnLk.RUnlock()
|
||||
|
||||
blk, err := s.hot.Get(cid)
|
||||
|
||||
switch err {
|
||||
case nil:
|
||||
return blk, nil
|
||||
if s.txnProtect != nil {
|
||||
err = s.txnProtect.Mark(cid)
|
||||
}
|
||||
|
||||
return blk, err
|
||||
|
||||
case bstore.ErrNotFound:
|
||||
s.mx.Lock()
|
||||
@ -236,11 +264,18 @@ func (s *SplitStore) Get(cid cid.Cid) (blocks.Block, error) {
|
||||
}
|
||||
|
||||
func (s *SplitStore) GetSize(cid cid.Cid) (int, error) {
|
||||
s.txnLk.RLock()
|
||||
defer s.txnLk.RUnlock()
|
||||
|
||||
size, err := s.hot.GetSize(cid)
|
||||
|
||||
switch err {
|
||||
case nil:
|
||||
return size, nil
|
||||
if s.txnProtect != nil {
|
||||
err = s.txnProtect.Mark(cid)
|
||||
}
|
||||
|
||||
return size, err
|
||||
|
||||
case bstore.ErrNotFound:
|
||||
s.mx.Lock()
|
||||
@ -273,6 +308,9 @@ func (s *SplitStore) Put(blk blocks.Block) error {
|
||||
epoch := s.writeEpoch
|
||||
s.mx.Unlock()
|
||||
|
||||
s.txnLk.RLock()
|
||||
defer s.txnLk.RUnlock()
|
||||
|
||||
err := s.tracker.Put(blk.Cid(), epoch)
|
||||
if err != nil {
|
||||
log.Errorf("error tracking CID in hotstore: %s; falling back to coldstore", err)
|
||||
@ -281,7 +319,12 @@ func (s *SplitStore) Put(blk blocks.Block) error {
|
||||
|
||||
s.debug.LogWrite(curTs, blk, epoch)
|
||||
|
||||
return s.hot.Put(blk)
|
||||
err = s.hot.Put(blk)
|
||||
if err == nil && s.txnProtect != nil {
|
||||
err = s.txnProtect.Mark(blk.Cid())
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *SplitStore) PutMany(blks []blocks.Block) error {
|
||||
@ -300,6 +343,9 @@ func (s *SplitStore) PutMany(blks []blocks.Block) error {
|
||||
batch = append(batch, blk.Cid())
|
||||
}
|
||||
|
||||
s.txnLk.RLock()
|
||||
defer s.txnLk.RUnlock()
|
||||
|
||||
err := s.tracker.PutBatch(batch, epoch)
|
||||
if err != nil {
|
||||
log.Errorf("error tracking CIDs in hotstore: %s; falling back to coldstore", err)
|
||||
@ -308,7 +354,17 @@ func (s *SplitStore) PutMany(blks []blocks.Block) error {
|
||||
|
||||
s.debug.LogWriteMany(curTs, blks, epoch)
|
||||
|
||||
return s.hot.PutMany(blks)
|
||||
err = s.hot.PutMany(blks)
|
||||
if err == nil && s.txnProtect != nil {
|
||||
for _, cid := range batch {
|
||||
err2 := s.txnProtect.Mark(cid)
|
||||
if err2 != nil {
|
||||
err = multierr.Combine(err, err2)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *SplitStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
|
||||
@ -351,8 +407,18 @@ func (s *SplitStore) HashOnRead(enabled bool) {
|
||||
}
|
||||
|
||||
func (s *SplitStore) View(cid cid.Cid, cb func([]byte) error) error {
|
||||
s.txnLk.RLock()
|
||||
defer s.txnLk.RUnlock()
|
||||
|
||||
err := s.hot.View(cid, cb)
|
||||
switch err {
|
||||
case nil:
|
||||
if s.txnProtect != nil {
|
||||
err = s.txnProtect.Mark(cid)
|
||||
}
|
||||
|
||||
return err
|
||||
|
||||
case bstore.ErrNotFound:
|
||||
s.mx.Lock()
|
||||
warmup := s.warmupEpoch > 0
|
||||
@ -774,6 +840,24 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
||||
}
|
||||
defer markSet.Close() //nolint:errcheck
|
||||
|
||||
// create the pruge protect filter
|
||||
s.txnLk.Lock()
|
||||
s.txnProtect, err = s.txnEnv.Create("protected", s.markSetSize)
|
||||
if err != nil {
|
||||
s.txnLk.Unlock()
|
||||
return xerrors.Errorf("error creating transactional mark set: %w", err)
|
||||
}
|
||||
s.txnLk.Unlock()
|
||||
|
||||
defer func() {
|
||||
s.txnLk.Lock()
|
||||
_ = s.txnProtect.Close()
|
||||
s.txnProtect = nil
|
||||
s.txnLk.Unlock()
|
||||
}()
|
||||
|
||||
defer s.debug.Flush()
|
||||
|
||||
// 1. mark reachable objects by walking the chain from the current epoch to the boundary epoch
|
||||
log.Infow("marking reachable blocks", "currentEpoch", currentEpoch, "boundaryEpoch", boundaryEpoch)
|
||||
startMark := time.Now()
|
||||
@ -828,13 +912,9 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
||||
cold = append(cold, cid)
|
||||
coldCnt++
|
||||
|
||||
s.debug.LogMove(curTs, cid, writeEpoch)
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
s.debug.Flush()
|
||||
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error collecting cold objects: %w", err)
|
||||
}
|
||||
@ -867,24 +947,15 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
||||
}
|
||||
log.Infow("moving done", "took", time.Since(startMove))
|
||||
|
||||
// 2.3 delete cold objects from the hotstore
|
||||
// 2.3 purge cold objects from the hotstore
|
||||
log.Info("purging cold objects from the hotstore")
|
||||
startPurge := time.Now()
|
||||
err = s.purgeBlocks(cold)
|
||||
err = s.purge(curTs, cold)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error purging cold blocks: %w", err)
|
||||
}
|
||||
log.Infow("purging cold from hotstore done", "took", time.Since(startPurge))
|
||||
|
||||
// 2.4 remove the tracker tracking for cold objects
|
||||
startPurge = time.Now()
|
||||
log.Info("purging cold objects from tracker")
|
||||
err = s.purgeTracking(cold)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error purging tracking for cold blocks: %w", err)
|
||||
}
|
||||
log.Infow("purging cold from tracker done", "took", time.Since(startPurge))
|
||||
|
||||
// we are done; do some housekeeping
|
||||
err = s.tracker.Sync()
|
||||
if err != nil {
|
||||
@ -1067,12 +1138,40 @@ func (s *SplitStore) purgeBatch(cids []cid.Cid, deleteBatch func([]cid.Cid) erro
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SplitStore) purgeBlocks(cids []cid.Cid) error {
|
||||
return s.purgeBatch(cids, s.hot.DeleteMany)
|
||||
}
|
||||
func (s *SplitStore) purge(curTs *types.TipSet, cids []cid.Cid) error {
|
||||
return s.purgeBatch(cids,
|
||||
func(cids []cid.Cid) error {
|
||||
deadCids := make([]cid.Cid, 0, len(cids))
|
||||
|
||||
func (s *SplitStore) purgeTracking(cids []cid.Cid) error {
|
||||
return s.purgeBatch(cids, s.tracker.DeleteBatch)
|
||||
s.txnLk.Lock()
|
||||
defer s.txnLk.Unlock()
|
||||
|
||||
for _, c := range cids {
|
||||
live, err := s.txnProtect.Has(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error checking for liveness: %w", err)
|
||||
}
|
||||
|
||||
if live {
|
||||
continue
|
||||
}
|
||||
|
||||
deadCids = append(deadCids, c)
|
||||
s.debug.LogMove(curTs, c)
|
||||
}
|
||||
|
||||
err := s.tracker.DeleteBatch(deadCids)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error purging tracking: %w", err)
|
||||
}
|
||||
|
||||
err = s.hot.DeleteMany(deadCids)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error purging cold objects: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (s *SplitStore) gcHotstore() {
|
||||
|
Loading…
Reference in New Issue
Block a user