do the dag walk for deep write tracking during flush

avoid crawling everything to a halt
This commit is contained in:
vyzo 2021-07-02 14:01:10 +03:00
parent 13a674330f
commit a98a062347

View File

@ -232,34 +232,16 @@ func (s *SplitStore) Has(c cid.Cid) (bool, error) {
// treat it as an implicit Write, absence options -- the vm uses this check to avoid duplicate // treat it as an implicit Write, absence options -- the vm uses this check to avoid duplicate
// writes on Flush. When we have options in the API, the vm can explicitly signal that this is // writes on Flush. When we have options in the API, the vm can explicitly signal that this is
// an implicit Write. // an implicit Write.
// we also walk dags for links so that the reference applies transitively to children.
// but first check if it is already a pending write to avoid unnecessary work
if s.isPendingWrite(c) { if s.isPendingWrite(c) {
return true, nil return true, nil
} }
if c.Prefix().Codec != cid.DagCBOR { s.trackWrite(c)
s.trackWrite(c)
} else {
err = s.walkLinks(c, cid.NewSet(), func(c cid.Cid) error {
s.trackWrite(c)
return nil
})
if err != nil {
log.Errorf("error transitively tracking cid %s: %s", c, err)
}
}
// also make sure the object is considered live during compaction in case we have already // also make sure the object is considered live during compaction in case we have already
// flushed pending writes and started compaction // flushed pending writes and started compaction
if s.txnProtect != nil { if s.txnProtect != nil {
if c.Prefix().Codec != cid.DagCBOR { err = s.txnProtect.Mark(c)
err = s.txnProtect.Mark(c)
} else {
err = s.walkLinks(c, cid.NewSet(), func(c cid.Cid) error {
return s.txnProtect.Mark(c)
})
}
if err != nil { if err != nil {
log.Errorf("error protecting object (cid: %s) in compaction transaction: %s", c, err) log.Errorf("error protecting object (cid: %s) in compaction transaction: %s", c, err)
@ -667,6 +649,24 @@ func (s *SplitStore) flushPendingWrites(locked bool) {
cids := make([]cid.Cid, 0, len(s.pendingWrites)) cids := make([]cid.Cid, 0, len(s.pendingWrites))
for c := range s.pendingWrites { for c := range s.pendingWrites {
cids = append(cids, c) cids = append(cids, c)
// recursively walk dags to propagate dependent references
if c.Prefix().Codec != cid.DagCBOR {
continue
}
err := s.walkLinks(c, cid.NewSet(), func(c cid.Cid) error {
_, has := s.pendingWrites[c]
if !has {
cids = append(cids, c)
}
return nil
})
if err != nil {
log.Errorf("error tracking dependent writes for cid %s: %s", c, err)
}
} }
s.pendingWrites = make(map[cid.Cid]struct{}) s.pendingWrites = make(map[cid.Cid]struct{})