sort cold objects before deleting
so that we can't shoot ourselves in the foot by deleting the constituents of a DAG while it is still in the hotstore.
This commit is contained in:
parent
13d612f72f
commit
40c271cda1
@ -5,6 +5,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
@ -1114,7 +1115,17 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
|||||||
log.Infow("moving done", "took", time.Since(startMove))
|
log.Infow("moving done", "took", time.Since(startMove))
|
||||||
}
|
}
|
||||||
|
|
||||||
// 4. purge cold objects from the hotstore, taking protected references into account
|
// 4. sort cold objects so that the dags with most references are deleted first
|
||||||
|
// this ensures that we can't refer to a dag with its consituents already deleted
|
||||||
|
log.Info("sorting cold objects")
|
||||||
|
startSort := time.Now()
|
||||||
|
err = s.sortObjects(cold)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error sorting objects: %w", err)
|
||||||
|
}
|
||||||
|
log.Infow("sorting done", "took", time.Since(startSort))
|
||||||
|
|
||||||
|
// 5. purge cold objects from the hotstore, taking protected references into account
|
||||||
log.Info("purging cold objects from the hotstore")
|
log.Info("purging cold objects from the hotstore")
|
||||||
startPurge := time.Now()
|
startPurge := time.Now()
|
||||||
err = s.purge(curTs, cold)
|
err = s.purge(curTs, cold)
|
||||||
@ -1348,6 +1359,46 @@ func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *SplitStore) sortObjects(cids []cid.Cid) error {
|
||||||
|
weight := make(map[cid.Cid]int)
|
||||||
|
for _, c := range cids {
|
||||||
|
if c.Prefix().Codec != cid.DagCBOR {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
w := 0
|
||||||
|
err := s.walkObject(c, cid.NewSet(),
|
||||||
|
func(c cid.Cid) error {
|
||||||
|
wc, ok := weight[c]
|
||||||
|
if ok {
|
||||||
|
w += wc
|
||||||
|
return errStopWalk
|
||||||
|
}
|
||||||
|
|
||||||
|
w++
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error determining cold object weight: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
weight[c] = w
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Slice(cids, func(i, j int) bool {
|
||||||
|
wi := weight[cids[i]]
|
||||||
|
wj := weight[cids[j]]
|
||||||
|
if wi == wj {
|
||||||
|
return bytes.Compare(cids[i].Hash(), cids[j].Hash()) > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
return wi > wj
|
||||||
|
})
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *SplitStore) purgeBatch(cids []cid.Cid, deleteBatch func([]cid.Cid) error) error {
|
func (s *SplitStore) purgeBatch(cids []cid.Cid, deleteBatch func([]cid.Cid) error) error {
|
||||||
if len(cids) == 0 {
|
if len(cids) == 0 {
|
||||||
return nil
|
return nil
|
||||||
|
Loading…
Reference in New Issue
Block a user