transitively track dags from implicit writes in Has

This commit is contained in:
vyzo 2021-07-02 13:27:56 +03:00
parent 4de0cd9fcb
commit 982867317e

View File

@ -218,11 +218,11 @@ func (s *SplitStore) DeleteMany(_ []cid.Cid) error {
return errors.New("DeleteMany not implemented on SplitStore; don't do this Luke!") //nolint
}
func (s *SplitStore) Has(cid cid.Cid) (bool, error) {
func (s *SplitStore) Has(c cid.Cid) (bool, error) {
s.txnLk.RLock()
defer s.txnLk.RUnlock()
has, err := s.hot.Has(cid)
has, err := s.hot.Has(c)
if err != nil {
return has, err
@ -232,20 +232,38 @@ func (s *SplitStore) Has(cid cid.Cid) (bool, error) {
// treat it as an implicit Write, absence options -- the vm uses this check to avoid duplicate
// writes on Flush. When we have options in the API, the vm can explicitly signal that this is
// an implicit Write.
s.trackWrite(cid)
// we also walk dags for links so that the reference applies transitively to children.
if c.Prefix().Codec != cid.DagCBOR {
s.trackWrite(c)
} else {
err = s.walkLinks(c, cid.NewSet(), func(c cid.Cid) error {
s.trackWrite(c)
return nil
})
if err != nil {
log.Errorf("error transitively tracking cid %s: %s", c, err)
}
}
// also make sure the object is considered live during compaction
if s.txnProtect != nil {
err = s.txnProtect.Mark(cid)
if c.Prefix().Codec != cid.DagCBOR {
err = s.txnProtect.Mark(c)
} else {
err = s.walkLinks(c, cid.NewSet(), func(c cid.Cid) error {
return s.txnProtect.Mark(c)
})
}
if err != nil {
log.Errorf("error protecting object in compaction transaction: %s", err)
log.Errorf("error protecting object (cid: %s) in compaction transaction: %s", c, err)
}
}
return true, err
}
return s.cold.Has(cid)
return s.cold.Has(c)
}
func (s *SplitStore) Get(cid cid.Cid) (blocks.Block, error) {
@ -353,7 +371,7 @@ func (s *SplitStore) PutMany(blks []blocks.Block) error {
s.txnLk.RLock()
defer s.txnLk.RUnlock()
s.trackWrites(batch)
s.trackWriteMany(batch)
err := s.hot.PutMany(blks)
if err == nil && s.txnProtect != nil {
@ -613,7 +631,7 @@ func (s *SplitStore) trackWrite(c cid.Cid) {
}
// and also combine batch writes into them
func (s *SplitStore) trackWrites(cids []cid.Cid) {
func (s *SplitStore) trackWriteMany(cids []cid.Cid) {
s.mx.Lock()
defer s.mx.Unlock()