rename liveset => markset; rename snoop => tracking store; docs.
This commit is contained in:
parent
48f253328d
commit
4b1e1f4b52
@ -1,33 +0,0 @@
|
||||
package splitstore
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
)
|
||||
|
||||
type LiveSet interface {
|
||||
Mark(cid.Cid) error
|
||||
Has(cid.Cid) (bool, error)
|
||||
Close() error
|
||||
}
|
||||
|
||||
var markBytes []byte
|
||||
|
||||
type LiveSetEnv interface {
|
||||
NewLiveSet(name string, sizeHint int64) (LiveSet, error)
|
||||
Close() error
|
||||
}
|
||||
|
||||
func OpenLiveSetEnv(path string, liveSetType string) (LiveSetEnv, error) {
|
||||
switch liveSetType {
|
||||
case "", "bloom":
|
||||
return NewBloomLiveSetEnv()
|
||||
case "bolt":
|
||||
return NewBoltLiveSetEnv(filepath.Join(path, "sweep.bolt"))
|
||||
default:
|
||||
return nil, xerrors.Errorf("unknown live set type %s", liveSetType)
|
||||
}
|
||||
}
|
38
blockstore/splitstore/markset.go
Normal file
38
blockstore/splitstore/markset.go
Normal file
@ -0,0 +1,38 @@
|
||||
package splitstore
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
)
|
||||
|
||||
// MarkSet is a utility to keep track of seen CID, and later query for them.
|
||||
//
|
||||
// * If the expected dataset is large, it can be backed by a datastore (e.g. bbolt).
|
||||
// * If a probabilistic result is acceptable, it can be backed by a bloom filter (default).
|
||||
type MarkSet interface {
|
||||
Mark(cid.Cid) error
|
||||
Has(cid.Cid) (bool, error)
|
||||
Close() error
|
||||
}
|
||||
|
||||
// markBytes is deliberately a non-nil empty byte slice for serialization.
|
||||
var markBytes = []byte{}
|
||||
|
||||
type MarkSetEnv interface {
|
||||
Create(name string, sizeHint int64) (MarkSet, error)
|
||||
Close() error
|
||||
}
|
||||
|
||||
func OpenMarkSetEnv(path string, mtype string) (MarkSetEnv, error) {
|
||||
switch mtype {
|
||||
case "", "bloom":
|
||||
return NewBloomMarkSetEnv()
|
||||
case "bolt":
|
||||
return NewBoltMarkSetEnv(filepath.Join(path, "markset.bolt"))
|
||||
default:
|
||||
return nil, xerrors.Errorf("unknown mark set type %s", mtype)
|
||||
}
|
||||
}
|
@ -15,22 +15,22 @@ const (
|
||||
BloomFilterProbability = 0.01
|
||||
)
|
||||
|
||||
type BloomLiveSetEnv struct{}
|
||||
type BloomMarkSetEnv struct{}
|
||||
|
||||
var _ LiveSetEnv = (*BloomLiveSetEnv)(nil)
|
||||
var _ MarkSetEnv = (*BloomMarkSetEnv)(nil)
|
||||
|
||||
type BloomLiveSet struct {
|
||||
type BloomMarkSet struct {
|
||||
salt []byte
|
||||
bf *bbloom.Bloom
|
||||
}
|
||||
|
||||
var _ LiveSet = (*BloomLiveSet)(nil)
|
||||
var _ MarkSet = (*BloomMarkSet)(nil)
|
||||
|
||||
func NewBloomLiveSetEnv() (*BloomLiveSetEnv, error) {
|
||||
return &BloomLiveSetEnv{}, nil
|
||||
func NewBloomMarkSetEnv() (*BloomMarkSetEnv, error) {
|
||||
return &BloomMarkSetEnv{}, nil
|
||||
}
|
||||
|
||||
func (e *BloomLiveSetEnv) NewLiveSet(name string, sizeHint int64) (LiveSet, error) {
|
||||
func (e *BloomMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
|
||||
size := int64(BloomFilterMinSize)
|
||||
for size < sizeHint {
|
||||
size += BloomFilterMinSize
|
||||
@ -47,14 +47,14 @@ func (e *BloomLiveSetEnv) NewLiveSet(name string, sizeHint int64) (LiveSet, erro
|
||||
return nil, xerrors.Errorf("error creating bloom filter: %w", err)
|
||||
}
|
||||
|
||||
return &BloomLiveSet{salt: salt, bf: bf}, nil
|
||||
return &BloomMarkSet{salt: salt, bf: bf}, nil
|
||||
}
|
||||
|
||||
func (e *BloomLiveSetEnv) Close() error {
|
||||
func (e *BloomMarkSetEnv) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *BloomLiveSet) saltedKey(cid cid.Cid) []byte {
|
||||
func (s *BloomMarkSet) saltedKey(cid cid.Cid) []byte {
|
||||
hash := cid.Hash()
|
||||
key := make([]byte, len(s.salt)+len(hash))
|
||||
n := copy(key, s.salt)
|
||||
@ -63,15 +63,15 @@ func (s *BloomLiveSet) saltedKey(cid cid.Cid) []byte {
|
||||
return rehash[:]
|
||||
}
|
||||
|
||||
func (s *BloomLiveSet) Mark(cid cid.Cid) error {
|
||||
func (s *BloomMarkSet) Mark(cid cid.Cid) error {
|
||||
s.bf.Add(s.saltedKey(cid))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *BloomLiveSet) Has(cid cid.Cid) (bool, error) {
|
||||
func (s *BloomMarkSet) Has(cid cid.Cid) (bool, error) {
|
||||
return s.bf.Has(s.saltedKey(cid)), nil
|
||||
}
|
||||
|
||||
func (s *BloomLiveSet) Close() error {
|
||||
func (s *BloomMarkSet) Close() error {
|
||||
return nil
|
||||
}
|
@ -9,20 +9,20 @@ import (
|
||||
bolt "go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
type BoltLiveSetEnv struct {
|
||||
type BoltMarkSetEnv struct {
|
||||
db *bolt.DB
|
||||
}
|
||||
|
||||
var _ LiveSetEnv = (*BoltLiveSetEnv)(nil)
|
||||
var _ MarkSetEnv = (*BoltMarkSetEnv)(nil)
|
||||
|
||||
type BoltLiveSet struct {
|
||||
type BoltMarkSet struct {
|
||||
db *bolt.DB
|
||||
bucketId []byte
|
||||
}
|
||||
|
||||
var _ LiveSet = (*BoltLiveSet)(nil)
|
||||
var _ MarkSet = (*BoltMarkSet)(nil)
|
||||
|
||||
func NewBoltLiveSetEnv(path string) (*BoltLiveSetEnv, error) {
|
||||
func NewBoltMarkSetEnv(path string) (*BoltMarkSetEnv, error) {
|
||||
db, err := bolt.Open(path, 0644,
|
||||
&bolt.Options{
|
||||
Timeout: 1 * time.Second,
|
||||
@ -32,10 +32,10 @@ func NewBoltLiveSetEnv(path string) (*BoltLiveSetEnv, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &BoltLiveSetEnv{db: db}, nil
|
||||
return &BoltMarkSetEnv{db: db}, nil
|
||||
}
|
||||
|
||||
func (e *BoltLiveSetEnv) NewLiveSet(name string, hint int64) (LiveSet, error) {
|
||||
func (e *BoltMarkSetEnv) Create(name string, hint int64) (MarkSet, error) {
|
||||
bucketId := []byte(name)
|
||||
err := e.db.Update(func(tx *bolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists(bucketId)
|
||||
@ -49,21 +49,21 @@ func (e *BoltLiveSetEnv) NewLiveSet(name string, hint int64) (LiveSet, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &BoltLiveSet{db: e.db, bucketId: bucketId}, nil
|
||||
return &BoltMarkSet{db: e.db, bucketId: bucketId}, nil
|
||||
}
|
||||
|
||||
func (e *BoltLiveSetEnv) Close() error {
|
||||
func (e *BoltMarkSetEnv) Close() error {
|
||||
return e.db.Close()
|
||||
}
|
||||
|
||||
func (s *BoltLiveSet) Mark(cid cid.Cid) error {
|
||||
func (s *BoltMarkSet) Mark(cid cid.Cid) error {
|
||||
return s.db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(s.bucketId)
|
||||
return b.Put(cid.Hash(), markBytes)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *BoltLiveSet) Has(cid cid.Cid) (result bool, err error) {
|
||||
func (s *BoltMarkSet) Has(cid cid.Cid) (result bool, err error) {
|
||||
err = s.db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(s.bucketId)
|
||||
v := b.Get(cid.Hash())
|
||||
@ -74,7 +74,7 @@ func (s *BoltLiveSet) Has(cid cid.Cid) (result bool, err error) {
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (s *BoltLiveSet) Close() error {
|
||||
func (s *BoltMarkSet) Close() error {
|
||||
return s.db.Update(func(tx *bolt.Tx) error {
|
||||
return tx.DeleteBucket(s.bucketId)
|
||||
})
|
@ -8,36 +8,36 @@ import (
|
||||
"github.com/multiformats/go-multihash"
|
||||
)
|
||||
|
||||
func TestBoltLiveSet(t *testing.T) {
|
||||
testLiveSet(t, "bolt")
|
||||
func TestBoltMarkSet(t *testing.T) {
|
||||
testMarkSet(t, "bolt")
|
||||
}
|
||||
|
||||
func TestBloomLiveSet(t *testing.T) {
|
||||
testLiveSet(t, "bloom")
|
||||
func TestBloomMarkSet(t *testing.T) {
|
||||
testMarkSet(t, "bloom")
|
||||
}
|
||||
|
||||
func testLiveSet(t *testing.T, lsType string) {
|
||||
func testMarkSet(t *testing.T, lsType string) {
|
||||
t.Helper()
|
||||
|
||||
path := "/tmp/liveset-test"
|
||||
path := "/tmp/markset-test"
|
||||
|
||||
err := os.MkdirAll(path, 0777)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
env, err := OpenLiveSetEnv(path, lsType)
|
||||
env, err := OpenMarkSetEnv(path, lsType)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer env.Close() //nolint:errcheck
|
||||
|
||||
hotSet, err := env.NewLiveSet("hot", 0)
|
||||
hotSet, err := env.Create("hot", 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
coldSet, err := env.NewLiveSet("cold", 0)
|
||||
coldSet, err := env.Create("cold", 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -51,7 +51,7 @@ func testLiveSet(t *testing.T, lsType string) {
|
||||
return cid.NewCidV1(cid.Raw, h)
|
||||
}
|
||||
|
||||
mustHave := func(s LiveSet, cid cid.Cid) {
|
||||
mustHave := func(s MarkSet, cid cid.Cid) {
|
||||
has, err := s.Has(cid)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -62,7 +62,7 @@ func testLiveSet(t *testing.T, lsType string) {
|
||||
}
|
||||
}
|
||||
|
||||
mustNotHave := func(s LiveSet, cid cid.Cid) {
|
||||
mustNotHave := func(s MarkSet, cid cid.Cid) {
|
||||
has, err := s.Has(cid)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -104,12 +104,12 @@ func testLiveSet(t *testing.T, lsType string) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
hotSet, err = env.NewLiveSet("hot", 0)
|
||||
hotSet, err = env.Create("hot", 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
coldSet, err = env.NewLiveSet("cold", 0)
|
||||
coldSet, err = env.Create("cold", 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
@ -22,14 +22,36 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
)
|
||||
|
||||
// these are variable so that 1) lotus-soup builds 2) we can change them in tests
|
||||
var (
|
||||
// CompactionThreshold is the number of epochs that need to have elapsed
|
||||
// from the previously compacted epoch to trigger a new compaction.
|
||||
//
|
||||
// |················· CompactionThreshold ··················|
|
||||
// | |
|
||||
// =======‖≡≡≡≡≡≡≡‖-------------------------------------------------»
|
||||
// | | chain --> ↑__ current epoch
|
||||
// |·······|
|
||||
// ↑________ CompactionCold.
|
||||
//
|
||||
// === :: cold (already archived)
|
||||
// ≡≡≡ :: to be archived in this compaction
|
||||
// --- :: hot
|
||||
CompactionThreshold = 5 * build.Finality
|
||||
CompactionCold = build.Finality
|
||||
|
||||
// CompactionCold is the number of epochs that will be archived to the
|
||||
// cold store on compaction. See diagram on CompactionThreshold for a
|
||||
// better sense.
|
||||
CompactionCold = build.Finality
|
||||
)
|
||||
|
||||
var (
|
||||
baseEpochKey = dstore.NewKey("/splitstore/baseEpoch")
|
||||
// baseEpochKey stores the base epoch (last compaction epoch) in the
|
||||
// metadata store.
|
||||
baseEpochKey = dstore.NewKey("/splitstore/baseEpoch")
|
||||
|
||||
// warmupEpochKey stores whether a hot store warmup has been performed.
|
||||
// On first start, the splitstore will walk the state tree and will copy
|
||||
// all active blocks into the hotstore.
|
||||
warmupEpochKey = dstore.NewKey("/splitstore/warmupEpoch")
|
||||
log = logging.Logger("splitstore")
|
||||
)
|
||||
@ -42,10 +64,15 @@ func init() {
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
// TrackingStore type; bolt (default) or lmdb
|
||||
// TrackingStore is the type of tracking store to use.
|
||||
//
|
||||
// Supported values are: "bolt".
|
||||
TrackingStoreType string
|
||||
// LiveSet type; bloom (default), bolt, or lmdb
|
||||
LiveSetType string
|
||||
|
||||
// MarkSetType is the type of mark set to use.
|
||||
//
|
||||
// Supported values are: "bolt", "bloom".
|
||||
MarkSetType string
|
||||
// perform full reachability analysis (expensive) for compaction
|
||||
// You should enable this option if you plan to use the splitstore without a backing coldstore
|
||||
EnableFullCompaction bool
|
||||
@ -82,15 +109,15 @@ type SplitStore struct {
|
||||
mx sync.Mutex
|
||||
curTs *types.TipSet
|
||||
|
||||
chain ChainAccessor
|
||||
ds dstore.Datastore
|
||||
hot bstore.Blockstore
|
||||
cold bstore.Blockstore
|
||||
snoop TrackingStore
|
||||
chain ChainAccessor
|
||||
ds dstore.Datastore
|
||||
hot bstore.Blockstore
|
||||
cold bstore.Blockstore
|
||||
tracker TrackingStore
|
||||
|
||||
env LiveSetEnv
|
||||
env MarkSetEnv
|
||||
|
||||
liveSetSize int64
|
||||
markSetSize int64
|
||||
}
|
||||
|
||||
var _ bstore.Blockstore = (*SplitStore)(nil)
|
||||
@ -100,25 +127,25 @@ var _ bstore.Blockstore = (*SplitStore)(nil)
|
||||
// attached to the ChainStore with Start in order to trigger compaction.
|
||||
func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Config) (*SplitStore, error) {
|
||||
// the tracking store
|
||||
snoop, err := OpenTrackingStore(path, cfg.TrackingStoreType)
|
||||
tracker, err := OpenTrackingStore(path, cfg.TrackingStoreType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// the liveset env
|
||||
env, err := OpenLiveSetEnv(path, cfg.LiveSetType)
|
||||
// the markset env
|
||||
env, err := OpenMarkSetEnv(path, cfg.MarkSetType)
|
||||
if err != nil {
|
||||
_ = snoop.Close()
|
||||
_ = tracker.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// and now we can make a SplitStore
|
||||
ss := &SplitStore{
|
||||
ds: ds,
|
||||
hot: hot,
|
||||
cold: cold,
|
||||
snoop: snoop,
|
||||
env: env,
|
||||
ds: ds,
|
||||
hot: hot,
|
||||
cold: cold,
|
||||
tracker: tracker,
|
||||
env: env,
|
||||
|
||||
fullCompaction: cfg.EnableFullCompaction,
|
||||
enableGC: cfg.EnableGC,
|
||||
@ -185,7 +212,7 @@ func (s *SplitStore) Put(blk blocks.Block) error {
|
||||
epoch := s.curTs.Height()
|
||||
s.mx.Unlock()
|
||||
|
||||
err := s.snoop.Put(blk.Cid(), epoch)
|
||||
err := s.tracker.Put(blk.Cid(), epoch)
|
||||
if err != nil {
|
||||
log.Errorf("error tracking CID in hotstore: %s; falling back to coldstore", err)
|
||||
return s.cold.Put(blk)
|
||||
@ -209,7 +236,7 @@ func (s *SplitStore) PutMany(blks []blocks.Block) error {
|
||||
batch = append(batch, blk.Cid())
|
||||
}
|
||||
|
||||
err := s.snoop.PutBatch(batch, epoch)
|
||||
err := s.tracker.PutBatch(batch, epoch)
|
||||
if err != nil {
|
||||
log.Errorf("error tracking CIDs in hotstore: %s; falling back to coldstore", err)
|
||||
return s.cold.PutMany(blks)
|
||||
@ -403,7 +430,7 @@ func (s *SplitStore) warmup(curTs *types.TipSet) {
|
||||
batchSnoop = append(batchSnoop, cid)
|
||||
|
||||
if len(batchHot) == batchSize {
|
||||
err = s.snoop.PutBatch(batchSnoop, epoch)
|
||||
err = s.tracker.PutBatch(batchSnoop, epoch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -425,7 +452,7 @@ func (s *SplitStore) warmup(curTs *types.TipSet) {
|
||||
}
|
||||
|
||||
if len(batchHot) > 0 {
|
||||
err = s.snoop.PutBatch(batchSnoop, epoch)
|
||||
err = s.tracker.PutBatch(batchSnoop, epoch)
|
||||
if err != nil {
|
||||
log.Errorf("error warming up splitstore: %s", err)
|
||||
return
|
||||
@ -438,8 +465,8 @@ func (s *SplitStore) warmup(curTs *types.TipSet) {
|
||||
}
|
||||
}
|
||||
|
||||
if count > s.liveSetSize {
|
||||
s.liveSetSize = count
|
||||
if count > s.markSetSize {
|
||||
s.markSetSize = count
|
||||
}
|
||||
|
||||
// save the warmup epoch
|
||||
@ -452,13 +479,13 @@ func (s *SplitStore) warmup(curTs *types.TipSet) {
|
||||
|
||||
// Compaction/GC Algorithm
|
||||
func (s *SplitStore) compact(curTs *types.TipSet) {
|
||||
if s.liveSetSize == 0 {
|
||||
if s.markSetSize == 0 {
|
||||
start := time.Now()
|
||||
log.Info("estimating live set size")
|
||||
s.estimateLiveSetSize(curTs)
|
||||
log.Infow("estimating live set size done", "took", time.Since(start), "size", s.liveSetSize)
|
||||
log.Info("estimating mark set size")
|
||||
s.estimateMarkSetSize(curTs)
|
||||
log.Infow("estimating mark set size done", "took", time.Since(start), "size", s.markSetSize)
|
||||
} else {
|
||||
log.Infow("current live set size estimate", "size", s.liveSetSize)
|
||||
log.Infow("current mark set size estimate", "size", s.markSetSize)
|
||||
}
|
||||
|
||||
if s.fullCompaction {
|
||||
@ -468,11 +495,11 @@ func (s *SplitStore) compact(curTs *types.TipSet) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SplitStore) estimateLiveSetSize(curTs *types.TipSet) {
|
||||
s.liveSetSize = 0
|
||||
func (s *SplitStore) estimateMarkSetSize(curTs *types.TipSet) {
|
||||
s.markSetSize = 0
|
||||
err := s.chain.WalkSnapshot(context.Background(), curTs, 1, s.skipOldMsgs, s.skipMsgReceipts,
|
||||
func(cid cid.Cid) error {
|
||||
s.liveSetSize++
|
||||
s.markSetSize++
|
||||
return nil
|
||||
})
|
||||
|
||||
@ -487,7 +514,7 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) {
|
||||
|
||||
log.Infow("running simple compaction", "currentEpoch", curTs.Height(), "baseEpoch", s.baseEpoch, "coldEpoch", coldEpoch)
|
||||
|
||||
coldSet, err := s.env.NewLiveSet("cold", s.liveSetSize)
|
||||
coldSet, err := s.env.Create("cold", s.markSetSize)
|
||||
if err != nil {
|
||||
// TODO do something better here
|
||||
panic(err)
|
||||
@ -495,7 +522,7 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) {
|
||||
defer coldSet.Close() //nolint:errcheck
|
||||
|
||||
// 1. mark reachable cold objects by looking at the objects reachable only from the cold epoch
|
||||
log.Info("marking reachable cold objects")
|
||||
log.Infow("marking reachable cold objects", "cold_epoch", coldEpoch)
|
||||
startMark := time.Now()
|
||||
|
||||
coldTs, err := s.chain.GetTipsetByHeight(context.Background(), coldEpoch, curTs, true)
|
||||
@ -504,7 +531,7 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
count := int64(0)
|
||||
var count int64
|
||||
err = s.chain.WalkSnapshot(context.Background(), coldTs, 1, s.skipOldMsgs, s.skipMsgReceipts,
|
||||
func(cid cid.Cid) error {
|
||||
count++
|
||||
@ -516,8 +543,8 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if count > s.liveSetSize {
|
||||
s.liveSetSize = count
|
||||
if count > s.markSetSize {
|
||||
s.markSetSize = count
|
||||
}
|
||||
|
||||
log.Infow("marking done", "took", time.Since(startMark))
|
||||
@ -529,14 +556,14 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) {
|
||||
cold := make(map[cid.Cid]struct{})
|
||||
|
||||
// some stats for logging
|
||||
var stHot, stCold int
|
||||
var hotCnt, coldCnt int
|
||||
|
||||
// 2.1 iterate through the snoop and collect unreachable cold objects
|
||||
err = s.snoop.ForEach(func(cid cid.Cid, wrEpoch abi.ChainEpoch) error {
|
||||
// is the object stil hot?
|
||||
if wrEpoch > coldEpoch {
|
||||
// 2.1 iterate through the tracking store and collect unreachable cold objects
|
||||
err = s.tracker.ForEach(func(cid cid.Cid, writeEpoch abi.ChainEpoch) error {
|
||||
// is the object still hot?
|
||||
if writeEpoch > coldEpoch {
|
||||
// yes, stay in the hotstore
|
||||
stHot++
|
||||
hotCnt++
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -547,13 +574,13 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) {
|
||||
}
|
||||
|
||||
if mark {
|
||||
stHot++
|
||||
hotCnt++
|
||||
return nil
|
||||
}
|
||||
|
||||
// it's cold, mark it for move
|
||||
cold[cid] = struct{}{}
|
||||
stCold++
|
||||
coldCnt++
|
||||
return nil
|
||||
})
|
||||
|
||||
@ -563,7 +590,7 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) {
|
||||
}
|
||||
|
||||
log.Infow("collection done", "took", time.Since(startCollect))
|
||||
log.Infow("compaction stats", "hot", stHot, "cold", stCold)
|
||||
log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt)
|
||||
|
||||
// 2.2 copy the cold objects to the coldstore
|
||||
log.Info("moving cold objects to the coldstore")
|
||||
@ -576,10 +603,10 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) {
|
||||
if err != nil {
|
||||
if err == dstore.ErrNotFound {
|
||||
// this can happen if the node is killed after we have deleted the block from the hotstore
|
||||
// but before we have deleted it from the snoop; just delete the snoop.
|
||||
err = s.snoop.Delete(cid)
|
||||
// but before we have deleted it from the tracker; just delete the tracker.
|
||||
err = s.tracker.Delete(cid)
|
||||
if err != nil {
|
||||
log.Errorf("error deleting cid %s from snoop: %s", cid, err)
|
||||
log.Errorf("error deleting cid %s from tracker: %s", cid, err)
|
||||
// TODO do something better here -- just continue?
|
||||
panic(err)
|
||||
}
|
||||
@ -629,20 +656,20 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) {
|
||||
}
|
||||
log.Infow("purging cold from hotstore done", "took", time.Since(purgeStart))
|
||||
|
||||
// 2.4 remove the snoop tracking for cold objects
|
||||
// 2.4 remove the tracker tracking for cold objects
|
||||
purgeStart = time.Now()
|
||||
log.Info("purging cold objects from snoop")
|
||||
log.Info("purging cold objects from tracker")
|
||||
|
||||
err = s.snoop.DeleteBatch(cold)
|
||||
err = s.tracker.DeleteBatch(cold)
|
||||
if err != nil {
|
||||
log.Errorf("error purging cold objects from snoop: %s", err)
|
||||
log.Errorf("error purging cold objects from tracker: %s", err)
|
||||
// TODO do something better here -- just continue?
|
||||
panic(err)
|
||||
}
|
||||
log.Infow("purging cold from snoop done", "took", time.Since(purgeStart))
|
||||
log.Infow("purging cold from tracker done", "took", time.Since(purgeStart))
|
||||
|
||||
// we are done; do some housekeeping
|
||||
err = s.snoop.Sync()
|
||||
err = s.tracker.Sync()
|
||||
if err != nil {
|
||||
// TODO do something better here
|
||||
panic(err)
|
||||
@ -661,16 +688,16 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) {
|
||||
|
||||
log.Infow("running full compaction", "currentEpoch", curTs.Height(), "baseEpoch", s.baseEpoch, "coldEpoch", coldEpoch)
|
||||
|
||||
// create two live sets, one for marking the cold finality region
|
||||
// create two mark sets, one for marking the cold finality region
|
||||
// and one for marking the hot region
|
||||
hotSet, err := s.env.NewLiveSet("hot", s.liveSetSize)
|
||||
hotSet, err := s.env.Create("hot", s.markSetSize)
|
||||
if err != nil {
|
||||
// TODO do something better here
|
||||
panic(err)
|
||||
}
|
||||
defer hotSet.Close() //nolint:errcheck
|
||||
|
||||
coldSet, err := s.env.NewLiveSet("cold", s.liveSetSize)
|
||||
coldSet, err := s.env.Create("cold", s.markSetSize)
|
||||
if err != nil {
|
||||
// TODO do something better here
|
||||
panic(err)
|
||||
@ -694,8 +721,8 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if count > s.liveSetSize {
|
||||
s.liveSetSize = count
|
||||
if count > s.markSetSize {
|
||||
s.markSetSize = count
|
||||
}
|
||||
|
||||
// Phase 1b: mark all reachable CIDs in the cold range
|
||||
@ -717,8 +744,8 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if count > s.liveSetSize {
|
||||
s.liveSetSize = count
|
||||
if count > s.markSetSize {
|
||||
s.markSetSize = count
|
||||
}
|
||||
|
||||
log.Infow("marking done", "took", time.Since(startMark))
|
||||
@ -736,8 +763,8 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) {
|
||||
cold := make(map[cid.Cid]struct{})
|
||||
dead := make(map[cid.Cid]struct{})
|
||||
|
||||
// 2.1 iterate through the snoop and collect cold and dead objects
|
||||
err = s.snoop.ForEach(func(cid cid.Cid, wrEpoch abi.ChainEpoch) error {
|
||||
// 2.1 iterate through the tracker and collect cold and dead objects
|
||||
err = s.tracker.ForEach(func(cid cid.Cid, wrEpoch abi.ChainEpoch) error {
|
||||
// is the object stil hot?
|
||||
if wrEpoch > coldEpoch {
|
||||
// yes, stay in the hotstore
|
||||
@ -803,10 +830,10 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) {
|
||||
if err != nil {
|
||||
if err == dstore.ErrNotFound {
|
||||
// this can happen if the node is killed after we have deleted the block from the hotstore
|
||||
// but before we have deleted it from the snoop; just delete the snoop.
|
||||
err = s.snoop.Delete(cid)
|
||||
// but before we have deleted it from the tracker; just delete the tracker.
|
||||
err = s.tracker.Delete(cid)
|
||||
if err != nil {
|
||||
log.Errorf("error deleting cid %s from snoop: %s", cid, err)
|
||||
log.Errorf("error deleting cid %s from tracker: %s", cid, err)
|
||||
// TODO do something better here -- just continue?
|
||||
panic(err)
|
||||
}
|
||||
@ -856,17 +883,17 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) {
|
||||
}
|
||||
log.Infow("purging cold from hotstore done", "took", time.Since(purgeStart))
|
||||
|
||||
// 2.4 remove the snoop tracking for cold objects
|
||||
// 2.4 remove the tracker tracking for cold objects
|
||||
purgeStart = time.Now()
|
||||
log.Info("purging cold objects from snoop")
|
||||
log.Info("purging cold objects from tracker")
|
||||
|
||||
err = s.snoop.DeleteBatch(cold)
|
||||
err = s.tracker.DeleteBatch(cold)
|
||||
if err != nil {
|
||||
log.Errorf("error purging cold objects from snoop: %s", err)
|
||||
log.Errorf("error purging cold objects from tracker: %s", err)
|
||||
// TODO do something better here -- just continue?
|
||||
panic(err)
|
||||
}
|
||||
log.Infow("purging cold from snoop done", "took", time.Since(purgeStart))
|
||||
log.Infow("purging cold from tracker done", "took", time.Since(purgeStart))
|
||||
|
||||
// 3. if we have dead objects, delete them from the hotstore and remove the tracking
|
||||
if len(dead) > 0 {
|
||||
@ -886,24 +913,24 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) {
|
||||
}
|
||||
log.Infow("purging dead from hotstore done", "took", time.Since(purgeStart))
|
||||
|
||||
// remove the snoop tracking
|
||||
// remove the tracker tracking
|
||||
purgeStart := time.Now()
|
||||
log.Info("purging dead objects from snoop")
|
||||
log.Info("purging dead objects from tracker")
|
||||
|
||||
err = s.snoop.DeleteBatch(dead)
|
||||
err = s.tracker.DeleteBatch(dead)
|
||||
if err != nil {
|
||||
log.Errorf("error purging dead objects from snoop: %s", err)
|
||||
log.Errorf("error purging dead objects from tracker: %s", err)
|
||||
// TODO do something better here -- just continue?
|
||||
panic(err)
|
||||
}
|
||||
|
||||
log.Infow("purging dead from snoop done", "took", time.Since(purgeStart))
|
||||
log.Infow("purging dead from tracker done", "took", time.Since(purgeStart))
|
||||
}
|
||||
|
||||
log.Infow("sweeping done", "took", time.Since(startSweep))
|
||||
|
||||
// we are done; do some housekeeping
|
||||
err = s.snoop.Sync()
|
||||
err = s.tracker.Sync()
|
||||
if err != nil {
|
||||
// TODO do something better here
|
||||
panic(err)
|
||||
|
@ -9,6 +9,11 @@ import (
|
||||
cid "github.com/ipfs/go-cid"
|
||||
)
|
||||
|
||||
// TrackingStore is a persistent store that tracks blocks that are added
|
||||
// within the current compaction range, including the epoch at which they are
|
||||
// written.
|
||||
//
|
||||
// On every compaction, we iterate over
|
||||
type TrackingStore interface {
|
||||
Put(cid.Cid, abi.ChainEpoch) error
|
||||
PutBatch([]cid.Cid, abi.ChainEpoch) error
|
||||
@ -20,11 +25,13 @@ type TrackingStore interface {
|
||||
Close() error
|
||||
}
|
||||
|
||||
func OpenTrackingStore(path string, trackingStoreType string) (TrackingStore, error) {
|
||||
switch trackingStoreType {
|
||||
// OpenTrackingStore opens a tracking store of the specified type in the
|
||||
// specified path.
|
||||
func OpenTrackingStore(path string, ttype string) (TrackingStore, error) {
|
||||
switch ttype {
|
||||
case "", "bolt":
|
||||
return OpenBoltTrackingStore(filepath.Join(path, "snoop.bolt"))
|
||||
return OpenBoltTrackingStore(filepath.Join(path, "tracker.bolt"))
|
||||
default:
|
||||
return nil, xerrors.Errorf("unknown tracking store type %s", trackingStoreType)
|
||||
return nil, xerrors.Errorf("unknown tracking store type %s", ttype)
|
||||
}
|
||||
}
|
||||
|
@ -28,7 +28,7 @@ func OpenBoltTrackingStore(path string) (*BoltTrackingStore, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bucketId := []byte("snoop")
|
||||
bucketId := []byte("tracker")
|
||||
err = db.Update(func(tx *bolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists(bucketId)
|
||||
if err != nil {
|
||||
|
@ -44,7 +44,7 @@ func testTrackingStore(t *testing.T, tsType string) {
|
||||
}
|
||||
}
|
||||
|
||||
path := "/tmp/liveset-test"
|
||||
path := "/tmp/markset-test"
|
||||
|
||||
err := os.MkdirAll(path, 0777)
|
||||
if err != nil {
|
||||
|
@ -128,7 +128,7 @@ type Blockstore struct {
|
||||
type Splitstore struct {
|
||||
HotStoreType string
|
||||
TrackingStoreType string
|
||||
LiveSetType string
|
||||
MarkSetType string
|
||||
EnableFullCompaction bool
|
||||
EnableGC bool // EXPERIMENTAL
|
||||
Archival bool
|
||||
|
@ -76,7 +76,7 @@ func SplitBlockstore(cfg *config.Blockstore) func(lc fx.Lifecycle, r repo.Locked
|
||||
|
||||
cfg := &splitstore.Config{
|
||||
TrackingStoreType: cfg.Splitstore.TrackingStoreType,
|
||||
LiveSetType: cfg.Splitstore.LiveSetType,
|
||||
MarkSetType: cfg.Splitstore.MarkSetType,
|
||||
EnableFullCompaction: cfg.Splitstore.EnableFullCompaction,
|
||||
EnableGC: cfg.Splitstore.EnableGC,
|
||||
Archival: cfg.Splitstore.Archival,
|
||||
|
Loading…
Reference in New Issue
Block a user