really optimize computing object weights

Sort is still taking a long time; this should be as fast as it gets.
This commit is contained in:
vyzo 2021-07-06 09:02:44 +03:00
parent 55a9e0ccd1
commit 169ab262f5

View File

@ -1298,48 +1298,16 @@ func (s *SplitStore) sortObjects(cids []cid.Cid) error {
} }
// compute sorting weights as the cumulative number of DAG links // compute sorting weights as the cumulative number of DAG links
weight := make(map[string]int) weights := make(map[string]int)
for _, c := range cids { for _, c := range cids {
switch c.Prefix().Codec { w := s.getObjectWeight(c, weights, key)
case cid.DagCBOR, cid.Raw: weights[key(c)] = w
default:
continue
}
w := 0
err := s.scanObject(c, cid.NewSet(),
func(c cid.Cid) error {
wc, ok := weight[key(c)]
if ok {
w += wc
return errStopWalk
}
w++
// short-circuit block headers or else we'll walk the entire chain
isBlock, err := s.isBlockHeader(c)
if isBlock || err == bstore.ErrNotFound {
return errStopWalk
}
return nil
},
func(_ cid.Cid, leaves int) {
w += leaves
})
if err != nil {
return xerrors.Errorf("error determining cold object weight: %w", err)
}
weight[key(c)] = w
} }
// sort! // sort!
sort.Slice(cids, func(i, j int) bool { sort.Slice(cids, func(i, j int) bool {
wi := weight[key(cids[i])] wi := weights[key(cids[i])]
wj := weight[key(cids[j])] wj := weights[key(cids[j])]
if wi == wj { if wi == wj {
return bytes.Compare(cids[i].Hash(), cids[j].Hash()) > 0 return bytes.Compare(cids[i].Hash(), cids[j].Hash()) > 0
} }
@ -1350,54 +1318,51 @@ func (s *SplitStore) sortObjects(cids []cid.Cid) error {
return nil return nil
} }
// specialized version of walkObject for computing object weights func (s *SplitStore) getObjectWeight(c cid.Cid, weights map[string]int, key func(cid.Cid) string) int {
// 1. root keys are raw w, ok := weights[key(c)]
// 2. some references may not exist if ok {
// 3. we don't care about visiting non-DAGs so short-circuit those return w
func (s *SplitStore) scanObject(c cid.Cid, walked *cid.Set, f func(cid.Cid) error, l func(cid.Cid, int)) error {
if !walked.Visit(c) {
return nil
} }
if err := f(c); err != nil { // we treat block headers specially to avoid walking the entire chain
if err == errStopWalk { var hdr types.BlockHeader
return nil err := s.view(c, func(data []byte) error {
} return hdr.UnmarshalCBOR(bytes.NewBuffer(data))
})
if err == nil {
w1 := s.getObjectWeight(hdr.ParentStateRoot, weights, key)
weights[key(hdr.ParentStateRoot)] = w1
return err w2 := s.getObjectWeight(hdr.Messages, weights, key)
weights[key(hdr.Messages)] = w2
return 1 + w1 + w2
} }
var links []cid.Cid var links []cid.Cid
err := s.view(c, func(data []byte) error { err = s.view(c, func(data []byte) error {
return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) { return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
links = append(links, c) links = append(links, c)
}) })
}) })
if err != nil { if err != nil {
// don't fail if the scan fails or if the object is absent return 1
return nil
} }
leaves := 0
for _, c := range links { for _, c := range links {
// these are internal refs, so dags will be dags // these are internal refs, so dags will be dags
if c.Prefix().Codec != cid.DagCBOR { if c.Prefix().Codec != cid.DagCBOR {
leaves++ w++
continue continue
} }
err := s.scanObject(c, walked, f, l) wc := s.getObjectWeight(c, weights, key)
if err != nil { weights[key(c)] = wc
return xerrors.Errorf("error walking link (cid: %s): %w", c, err)
} w += wc
} }
if leaves > 0 { return w
l(c, leaves)
}
return nil
} }
func (s *SplitStore) purgeBatch(cids []cid.Cid, deleteBatch func([]cid.Cid) error) error { func (s *SplitStore) purgeBatch(cids []cid.Cid, deleteBatch func([]cid.Cid) error) error {