avoid clown shoes: only walk links for tracking in implicit writes/refs
This commit is contained in:
parent
484dfaebce
commit
9d6bcd7705
@ -150,7 +150,8 @@ type SplitStore struct {
|
|||||||
txnProtect MarkSet
|
txnProtect MarkSet
|
||||||
|
|
||||||
// pending write set
|
// pending write set
|
||||||
pendingWrites map[cid.Cid]struct{}
|
pendingWrites map[cid.Cid]struct{}
|
||||||
|
pendingWritesImplicit map[cid.Cid]struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ bstore.Blockstore = (*SplitStore)(nil)
|
var _ bstore.Blockstore = (*SplitStore)(nil)
|
||||||
@ -232,11 +233,11 @@ func (s *SplitStore) Has(c cid.Cid) (bool, error) {
|
|||||||
// treat it as an implicit Write, absence options -- the vm uses this check to avoid duplicate
|
// treat it as an implicit Write, absence options -- the vm uses this check to avoid duplicate
|
||||||
// writes on Flush. When we have options in the API, the vm can explicitly signal that this is
|
// writes on Flush. When we have options in the API, the vm can explicitly signal that this is
|
||||||
// an implicit Write.
|
// an implicit Write.
|
||||||
s.trackWrite(c)
|
s.trackWrite(c, true)
|
||||||
|
|
||||||
// also make sure the object is considered live during compaction in case we have already
|
// also make sure the object is considered live during compaction in case we have already
|
||||||
// flushed pending writes and started compaction
|
// flushed pending writes and started compaction
|
||||||
s.trackTxnRef(c)
|
s.trackTxnRef(c, true)
|
||||||
|
|
||||||
return true, err
|
return true, err
|
||||||
}
|
}
|
||||||
@ -252,7 +253,7 @@ func (s *SplitStore) Get(cid cid.Cid) (blocks.Block, error) {
|
|||||||
|
|
||||||
switch err {
|
switch err {
|
||||||
case nil:
|
case nil:
|
||||||
s.trackTxnRef(cid)
|
s.trackTxnRef(cid, false)
|
||||||
return blk, err
|
return blk, err
|
||||||
|
|
||||||
case bstore.ErrNotFound:
|
case bstore.ErrNotFound:
|
||||||
@ -284,7 +285,7 @@ func (s *SplitStore) GetSize(cid cid.Cid) (int, error) {
|
|||||||
|
|
||||||
switch err {
|
switch err {
|
||||||
case nil:
|
case nil:
|
||||||
s.trackTxnRef(cid)
|
s.trackTxnRef(cid, false)
|
||||||
return size, err
|
return size, err
|
||||||
|
|
||||||
case bstore.ErrNotFound:
|
case bstore.ErrNotFound:
|
||||||
@ -313,8 +314,8 @@ func (s *SplitStore) Put(blk blocks.Block) error {
|
|||||||
|
|
||||||
err := s.hot.Put(blk)
|
err := s.hot.Put(blk)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
s.trackWrite(blk.Cid())
|
s.trackWrite(blk.Cid(), false)
|
||||||
s.trackTxnRef(blk.Cid())
|
s.trackTxnRef(blk.Cid(), false)
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
@ -384,7 +385,7 @@ func (s *SplitStore) View(cid cid.Cid, cb func([]byte) error) error {
|
|||||||
err := s.hot.View(cid, cb)
|
err := s.hot.View(cid, cb)
|
||||||
switch err {
|
switch err {
|
||||||
case nil:
|
case nil:
|
||||||
s.trackTxnRef(cid)
|
s.trackTxnRef(cid, false)
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
case bstore.ErrNotFound:
|
case bstore.ErrNotFound:
|
||||||
@ -564,7 +565,7 @@ func (s *SplitStore) updateWriteEpoch() {
|
|||||||
|
|
||||||
// Unfortunately we can't just directly tracker.Put one by one, as it is ridiculously slow with
|
// Unfortunately we can't just directly tracker.Put one by one, as it is ridiculously slow with
|
||||||
// bbolt because of syncing (order of 10ms), so we batch them.
|
// bbolt because of syncing (order of 10ms), so we batch them.
|
||||||
func (s *SplitStore) trackWrite(c cid.Cid) {
|
func (s *SplitStore) trackWrite(c cid.Cid, implicit bool) {
|
||||||
s.mx.Lock()
|
s.mx.Lock()
|
||||||
defer s.mx.Unlock()
|
defer s.mx.Unlock()
|
||||||
|
|
||||||
@ -603,6 +604,11 @@ func (s *SplitStore) flushPendingWrites(locked bool) {
|
|||||||
cids = append(cids, c)
|
cids = append(cids, c)
|
||||||
seen[c] = struct{}{}
|
seen[c] = struct{}{}
|
||||||
|
|
||||||
|
_, implicit := s.pendingWritesImplicit[c]
|
||||||
|
if !implicit {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// recursively walk dags to propagate dependent references
|
// recursively walk dags to propagate dependent references
|
||||||
if c.Prefix().Codec != cid.DagCBOR {
|
if c.Prefix().Codec != cid.DagCBOR {
|
||||||
continue
|
continue
|
||||||
@ -622,7 +628,11 @@ func (s *SplitStore) flushPendingWrites(locked bool) {
|
|||||||
log.Warnf("error tracking dependent writes for cid %s: %s", c, err)
|
log.Warnf("error tracking dependent writes for cid %s: %s", c, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
s.pendingWrites = make(map[cid.Cid]struct{})
|
s.pendingWrites = make(map[cid.Cid]struct{})
|
||||||
|
if len(s.pendingWritesImplicit) > 0 {
|
||||||
|
s.pendingWritesImplicit = make(map[cid.Cid]struct{})
|
||||||
|
}
|
||||||
|
|
||||||
epoch := s.writeEpoch
|
epoch := s.writeEpoch
|
||||||
err := s.tracker.PutBatch(cids, epoch)
|
err := s.tracker.PutBatch(cids, epoch)
|
||||||
@ -633,7 +643,7 @@ func (s *SplitStore) flushPendingWrites(locked bool) {
|
|||||||
s.debug.LogWriteMany(s.curTs, cids, epoch)
|
s.debug.LogWriteMany(s.curTs, cids, epoch)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SplitStore) trackTxnRef(c cid.Cid) {
|
func (s *SplitStore) trackTxnRef(c cid.Cid, implicit bool) {
|
||||||
if s.txnProtect == nil {
|
if s.txnProtect == nil {
|
||||||
// not compacting
|
// not compacting
|
||||||
return
|
return
|
||||||
@ -653,7 +663,7 @@ func (s *SplitStore) trackTxnRef(c cid.Cid) {
|
|||||||
|
|
||||||
if c.Prefix().Codec != cid.DagCBOR {
|
if c.Prefix().Codec != cid.DagCBOR {
|
||||||
err = s.txnProtect.Mark(c)
|
err = s.txnProtect.Mark(c)
|
||||||
} else {
|
} else if implicit {
|
||||||
err = s.walkLinks(c, cid.NewSet(), func(c cid.Cid) error {
|
err = s.walkLinks(c, cid.NewSet(), func(c cid.Cid) error {
|
||||||
return s.txnProtect.Mark(c)
|
return s.txnProtect.Mark(c)
|
||||||
})
|
})
|
||||||
@ -672,7 +682,7 @@ func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, c := range cids {
|
for _, c := range cids {
|
||||||
s.trackTxnRef(c)
|
s.trackTxnRef(c, false)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user