use dual live set marking algorithm to keep all hotly reachable objects in the hotstore
This commit is contained in:
parent
c2cc198316
commit
2bed6c94cd
@ -31,7 +31,6 @@ type SplitStore struct {
|
|||||||
cold bstore2.Blockstore
|
cold bstore2.Blockstore
|
||||||
|
|
||||||
snoop TrackingStore
|
snoop TrackingStore
|
||||||
sweep TrackingStore
|
|
||||||
|
|
||||||
compacting bool
|
compacting bool
|
||||||
}
|
}
|
||||||
@ -41,10 +40,15 @@ type TrackingStore interface {
|
|||||||
PutBatch([]cid.Cid, abi.ChainEpoch) error
|
PutBatch([]cid.Cid, abi.ChainEpoch) error
|
||||||
Get(cid.Cid) (abi.ChainEpoch, error)
|
Get(cid.Cid) (abi.ChainEpoch, error)
|
||||||
Delete(cid.Cid) error
|
Delete(cid.Cid) error
|
||||||
Has(cid.Cid) (bool, error)
|
|
||||||
Keys() (<-chan cid.Cid, error)
|
Keys() (<-chan cid.Cid, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type LiveSet interface {
|
||||||
|
Mark(cid.Cid) error
|
||||||
|
Has(cid.Cid) (bool, error)
|
||||||
|
Close() error
|
||||||
|
}
|
||||||
|
|
||||||
var _ bstore2.Blockstore = (*SplitStore)(nil)
|
var _ bstore2.Blockstore = (*SplitStore)(nil)
|
||||||
|
|
||||||
// Blockstore interface
|
// Blockstore interface
|
||||||
@ -219,12 +223,29 @@ func (s *SplitStore) HeadChange(revert, apply []*types.TipSet) error {
|
|||||||
|
|
||||||
// Compaction/GC Algorithm
|
// Compaction/GC Algorithm
|
||||||
func (s *SplitStore) compact() {
|
func (s *SplitStore) compact() {
|
||||||
// Phase 1: mark all reachable CIDs with the current epoch
|
// create two on disk live sets, one for marking the cold finality region
|
||||||
|
// and one for marking the hot region
|
||||||
|
hotSet, err := s.newLiveSet()
|
||||||
|
if err != nil {
|
||||||
|
// TODO do something better here
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
defer hotSet.Close() //nolint:errcheck
|
||||||
|
|
||||||
|
coldSet, err := s.newLiveSet()
|
||||||
|
if err != nil {
|
||||||
|
// TODO do something better here
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
defer coldSet.Close() //nolint:errcheck
|
||||||
|
|
||||||
|
// Phase 1a: mark all reachable CIDs in the hot range
|
||||||
curTs := s.curTs
|
curTs := s.curTs
|
||||||
epoch := curTs.Height()
|
epoch := curTs.Height()
|
||||||
err := s.cs.WalkSnapshot(context.Background(), curTs, epoch-s.baseEpoch+1, false, false,
|
coldEpoch := s.baseEpoch + build.Finality
|
||||||
|
err = s.cs.WalkSnapshot(context.Background(), curTs, epoch-coldEpoch+1, false, false,
|
||||||
func(cid cid.Cid) error {
|
func(cid cid.Cid) error {
|
||||||
return s.sweep.Put(cid, epoch)
|
return hotSet.Mark(cid)
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -232,9 +253,27 @@ func (s *SplitStore) compact() {
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Phase 2: sweep cold objects, moving reachable ones to the coldstore and deleting the others
|
// Phase 1b: mark all reachable CIDs in the cold range
|
||||||
coldEpoch := s.baseEpoch + build.Finality
|
coldTs, err := s.cs.GetTipsetByHeight(context.Background(), coldEpoch-1, curTs, true)
|
||||||
|
if err != nil {
|
||||||
|
// TODO do something better here
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = s.cs.WalkSnapshot(context.Background(), coldTs, build.Finality, false, false,
|
||||||
|
func(cid cid.Cid) error {
|
||||||
|
return coldSet.Mark(cid)
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
// TODO do something better here
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Phase 2: sweep cold objects:
|
||||||
|
// - If a cold object is reachable in the hot range, it stays in the hotstore.
|
||||||
|
// - If a cold object is reachable in the cold range, it is moved to the coldstore.
|
||||||
|
// - If a cold object is unreachable, it is deleted.
|
||||||
ch, err := s.snoop.Keys()
|
ch, err := s.snoop.Keys()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO do something better here
|
// TODO do something better here
|
||||||
@ -250,24 +289,31 @@ func (s *SplitStore) compact() {
|
|||||||
|
|
||||||
// is the object stil hot?
|
// is the object stil hot?
|
||||||
if wrEpoch >= coldEpoch {
|
if wrEpoch >= coldEpoch {
|
||||||
// yes, just clear the mark and continue
|
// yes, stay in the hotstore
|
||||||
err := s.sweep.Delete(cid)
|
|
||||||
if err != nil {
|
|
||||||
// TODO do something better here
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// the object is cold -- check whether it is reachable
|
// the object is cold -- check whether it is reachable in the hot range
|
||||||
mark, err := s.sweep.Has(cid)
|
mark, err := hotSet.Has(cid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO do something better here
|
// TODO do something better here
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if mark {
|
if mark {
|
||||||
// the object is reachable, move it to the cold store and delete the mark
|
// the object is reachable in the hot range, stay in the hotstore
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// check whether it is reachable in the cold range
|
||||||
|
mark, err = coldSet.Has(cid)
|
||||||
|
if err != nil {
|
||||||
|
// TODO do something better here
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if mark {
|
||||||
|
// the object is reachable in the cold range, move it to the cold store
|
||||||
blk, err := s.hot.Get(cid)
|
blk, err := s.hot.Get(cid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO do something better here
|
// TODO do something better here
|
||||||
@ -279,12 +325,6 @@ func (s *SplitStore) compact() {
|
|||||||
// TODO do something better here
|
// TODO do something better here
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s.sweep.Delete(cid)
|
|
||||||
if err != nil {
|
|
||||||
// TODO do something better here
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// delete the object from the hotstore
|
// delete the object from the hotstore
|
||||||
@ -302,21 +342,6 @@ func (s *SplitStore) compact() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// clear all remaining marks for cold objects that may have been reachable
|
|
||||||
ch, err = s.sweep.Keys()
|
|
||||||
if err != nil {
|
|
||||||
// TODO do something better here
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for cid := range ch {
|
|
||||||
err = s.sweep.Delete(cid)
|
|
||||||
if err != nil {
|
|
||||||
// TODO do something better here
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
err = s.setBaseEpoch(coldEpoch)
|
err = s.setBaseEpoch(coldEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO do something better here
|
// TODO do something better here
|
||||||
@ -332,3 +357,8 @@ func (s *SplitStore) setBaseEpoch(epoch abi.ChainEpoch) error {
|
|||||||
bs = bs[:n]
|
bs = bs[:n]
|
||||||
return s.ds.Put(baseEpochKey, bs)
|
return s.ds.Put(baseEpochKey, bs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *SplitStore) newLiveSet() (LiveSet, error) {
|
||||||
|
// TODO implementation
|
||||||
|
return nil, errors.New("newLiveSet: IMPLEMENT ME!!!")
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user