bubble up dependent txn ref errors

This cause Has to return false if it fails to traverse/protect all links, which would cause
the vm to recompute.
This commit is contained in:
vyzo 2021-07-02 20:57:46 +03:00
parent 637fbf6c5b
commit b87295db93

View File

@ -237,9 +237,11 @@ func (s *SplitStore) Has(c cid.Cid) (bool, error) {
// also make sure the object is considered live during compaction in case we have already
// flushed pending writes and started compaction
s.trackTxnRef(c, true)
trackErr := s.trackTxnRef(c, true)
return true, err
// if we failed to track the object and all its dependencies, then return false so as
// to cause the vm to recompute
return trackErr == nil, nil
}
return s.cold.Has(c)
@ -253,7 +255,7 @@ func (s *SplitStore) Get(cid cid.Cid) (blocks.Block, error) {
switch err {
case nil:
s.trackTxnRef(cid, false)
err = s.trackTxnRef(cid, false)
return blk, err
case bstore.ErrNotFound:
@ -285,7 +287,7 @@ func (s *SplitStore) GetSize(cid cid.Cid) (int, error) {
switch err {
case nil:
s.trackTxnRef(cid, false)
err = s.trackTxnRef(cid, false)
return size, err
case bstore.ErrNotFound:
@ -315,7 +317,7 @@ func (s *SplitStore) Put(blk blocks.Block) error {
err := s.hot.Put(blk)
if err == nil {
s.trackWrite(blk.Cid(), false)
s.trackTxnRef(blk.Cid(), false)
err = s.trackTxnRef(blk.Cid(), false)
}
return err
@ -333,7 +335,7 @@ func (s *SplitStore) PutMany(blks []blocks.Block) error {
err := s.hot.PutMany(blks)
if err == nil {
s.trackWriteMany(batch)
s.trackTxnRefMany(batch)
err = s.trackTxnRefMany(batch)
}
return err
@ -385,8 +387,8 @@ func (s *SplitStore) View(cid cid.Cid, cb func([]byte) error) error {
err := s.hot.View(cid, cb)
switch err {
case nil:
s.trackTxnRef(cid, false)
return nil
err = s.trackTxnRef(cid, false)
return err
case bstore.ErrNotFound:
s.mx.Lock()
@ -643,10 +645,10 @@ func (s *SplitStore) flushPendingWrites(locked bool) {
s.debug.LogWriteMany(s.curTs, cids, epoch)
}
func (s *SplitStore) trackTxnRef(c cid.Cid, implicit bool) {
func (s *SplitStore) trackTxnRef(c cid.Cid, implicit bool) error {
if s.txnProtect == nil {
// not compacting
return
return nil
}
// NOTE: this occurs check assumes a markset without false positives, which is currently the case
@ -654,11 +656,11 @@ func (s *SplitStore) trackTxnRef(c cid.Cid, implicit bool) {
has, err := s.txnProtect.Has(c)
if err != nil {
log.Errorf("error occur checking object (cid: %s) for compaction transaction: %s", c, err)
return
return err
}
if has {
return
return nil
}
if c.Prefix().Codec != cid.DagCBOR || !implicit {
@ -671,19 +673,26 @@ func (s *SplitStore) trackTxnRef(c cid.Cid, implicit bool) {
if err != nil {
log.Warnf("error protecting object (cid: %s) from compaction transaction: %s", c, err)
return
}
return err
}
func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) {
func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) error {
if s.txnProtect == nil {
// not compacting
return
return nil
}
var err error
for _, c := range cids {
s.trackTxnRef(c, false)
err2 := s.trackTxnRef(c, false)
if err2 != nil {
err = multierr.Combine(err, err2)
}
}
return err
}
func (s *SplitStore) background() {