diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index cf3c07a62..bfd5dc343 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -218,11 +218,11 @@ func (s *SplitStore) DeleteMany(_ []cid.Cid) error { return errors.New("DeleteMany not implemented on SplitStore; don't do this Luke!") //nolint } -func (s *SplitStore) Has(cid cid.Cid) (bool, error) { +func (s *SplitStore) Has(c cid.Cid) (bool, error) { s.txnLk.RLock() defer s.txnLk.RUnlock() - has, err := s.hot.Has(cid) + has, err := s.hot.Has(c) if err != nil { return has, err @@ -232,20 +232,38 @@ func (s *SplitStore) Has(cid cid.Cid) (bool, error) { // treat it as an implicit Write, absence options -- the vm uses this check to avoid duplicate // writes on Flush. When we have options in the API, the vm can explicitly signal that this is // an implicit Write. - s.trackWrite(cid) + // we also walk dags for links so that the reference applies transitively to children. + if c.Prefix().Codec != cid.DagCBOR { + s.trackWrite(c) + } else { + err = s.walkLinks(c, cid.NewSet(), func(c cid.Cid) error { + s.trackWrite(c) + return nil + }) + if err != nil { + log.Errorf("error transitively tracking cid %s: %s", c, err) + } + } // also make sure the object is considered live during compaction if s.txnProtect != nil { - err = s.txnProtect.Mark(cid) + if c.Prefix().Codec != cid.DagCBOR { + err = s.txnProtect.Mark(c) + } else { + err = s.walkLinks(c, cid.NewSet(), func(c cid.Cid) error { + return s.txnProtect.Mark(c) + }) + } + if err != nil { - log.Errorf("error protecting object in compaction transaction: %s", err) + log.Errorf("error protecting object (cid: %s) in compaction transaction: %s", c, err) } } return true, err } - return s.cold.Has(cid) + return s.cold.Has(c) } func (s *SplitStore) Get(cid cid.Cid) (blocks.Block, error) { @@ -353,7 +371,7 @@ func (s *SplitStore) PutMany(blks []blocks.Block) error { s.txnLk.RLock() defer s.txnLk.RUnlock() - s.trackWrites(batch) + s.trackWriteMany(batch) err := s.hot.PutMany(blks) if err == nil && s.txnProtect != nil { @@ -613,7 +631,7 @@ func (s *SplitStore) trackWrite(c cid.Cid) { } // and also combine batch writes into them -func (s *SplitStore) trackWrites(cids []cid.Cid) { +func (s *SplitStore) trackWriteMany(cids []cid.Cid) { s.mx.Lock() defer s.mx.Unlock()