diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go
index 22f9036e3..2c00f4240 100644
--- a/blockstore/badger/blockstore.go
+++ b/blockstore/badger/blockstore.go
@@ -131,6 +131,25 @@ func (b *Blockstore) Close() error {
 	return b.DB.Close()
 }
 
+// CollectGarbage runs garbage collection on the value log
+func (b *Blockstore) CollectGarbage() error {
+	if atomic.LoadInt64(&b.state) != stateOpen {
+		return ErrBlockstoreClosed
+	}
+
+	var err error
+	for err == nil {
+		err = b.DB.RunValueLogGC(0.125)
+	}
+
+	if err == badger.ErrNoRewrite {
+		// not really an error in this case
+		return nil
+	}
+
+	return err
+}
+
 // View implements blockstore.Viewer, which leverages zero-copy read-only
 // access to values.
 func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error {
@@ -318,6 +337,44 @@ func (b *Blockstore) DeleteBlock(cid cid.Cid) error {
 	})
 }
 
+func (b *Blockstore) DeleteMany(cids []cid.Cid) error {
+	if atomic.LoadInt64(&b.state) != stateOpen {
+		return ErrBlockstoreClosed
+	}
+
+	batch := b.DB.NewWriteBatch()
+	defer batch.Cancel()
+
+	// toReturn tracks the byte slices to return to the pool, if we're using key
+	// prefixing. we can't return each slice to the pool after each Delete, because
+	// badger holds on to the slice.
+	var toReturn [][]byte
+	if b.prefixing {
+		toReturn = make([][]byte, 0, len(cids))
+		defer func() {
+			for _, b := range toReturn {
+				KeyPool.Put(b)
+			}
+		}()
+	}
+
+	for _, cid := range cids {
+		k, pooled := b.PooledStorageKey(cid)
+		if pooled {
+			toReturn = append(toReturn, k)
+		}
+		if err := batch.Delete(k); err != nil {
+			return err
+		}
+	}
+
+	err := batch.Flush()
+	if err != nil {
+		err = fmt.Errorf("failed to delete blocks from badger blockstore: %w", err)
+	}
+	return err
+}
+
 // AllKeysChan implements Blockstore.AllKeysChan.
 func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
 	if atomic.LoadInt64(&b.state) != stateOpen {
diff --git a/blockstore/blockstore.go b/blockstore/blockstore.go
index 5d4578777..23f0bd754 100644
--- a/blockstore/blockstore.go
+++ b/blockstore/blockstore.go
@@ -1,7 +1,7 @@
 package blockstore
 
 import (
-	"github.com/ipfs/go-cid"
+	cid "github.com/ipfs/go-cid"
 	ds "github.com/ipfs/go-datastore"
 	logging "github.com/ipfs/go-log/v2"
 
@@ -18,6 +18,7 @@ var ErrNotFound = blockstore.ErrNotFound
 type Blockstore interface {
 	blockstore.Blockstore
 	blockstore.Viewer
+	BatchDeleter
 }
 
 // BasicBlockstore is an alias to the original IPFS Blockstore.
@@ -25,13 +26,30 @@ type BasicBlockstore = blockstore.Blockstore
 
 type Viewer = blockstore.Viewer
 
+type BatchDeleter interface {
+	DeleteMany(cids []cid.Cid) error
+}
+
 // WrapIDStore wraps the underlying blockstore in an "identity" blockstore.
 // The ID store filters out all puts for blocks with CIDs using the "identity"
 // hash function. It also extracts inlined blocks from CIDs using the identity
 // hash function and returns them on get/has, ignoring the contents of the
 // blockstore.
 func WrapIDStore(bstore blockstore.Blockstore) Blockstore {
-	return blockstore.NewIdStore(bstore).(Blockstore)
+	if is, ok := bstore.(*idstore); ok {
+		// already wrapped
+		return is
+	}
+
+	if bs, ok := bstore.(Blockstore); ok {
+		// we wrap it ourselves so that we don't neuter an (efficient) DeleteMany
+		// that the underlying blockstore has implemented
+		return NewIDStore(bs)
+	}
+
+	// The underlying blockstore does not implement DeleteMany, so we need to shim it.
+	// This is less efficient, as it'll iterate and perform single deletes.
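+	// (Adapt wraps it in adaptedBlockstore, whose DeleteMany simply loops
+	// over DeleteBlock; see below.)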
+ return NewIDStore(Adapt(bstore)) } // FromDatastore creates a new blockstore backed by the given datastore. @@ -53,6 +71,17 @@ func (a *adaptedBlockstore) View(cid cid.Cid, callback func([]byte) error) error return callback(blk.RawData()) } +func (a *adaptedBlockstore) DeleteMany(cids []cid.Cid) error { + for _, cid := range cids { + err := a.DeleteBlock(cid) + if err != nil { + return err + } + } + + return nil +} + // Adapt adapts a standard blockstore to a Lotus blockstore by // enriching it with the extra methods that Lotus requires (e.g. View, Sync). // diff --git a/blockstore/buffered.go b/blockstore/buffered.go index 200e9b995..5d3d38f78 100644 --- a/blockstore/buffered.go +++ b/blockstore/buffered.go @@ -96,6 +96,14 @@ func (bs *BufferedBlockstore) DeleteBlock(c cid.Cid) error { return bs.write.DeleteBlock(c) } +func (bs *BufferedBlockstore) DeleteMany(cids []cid.Cid) error { + if err := bs.read.DeleteMany(cids); err != nil { + return err + } + + return bs.write.DeleteMany(cids) +} + func (bs *BufferedBlockstore) View(c cid.Cid, callback func([]byte) error) error { // both stores are viewable. if err := bs.write.View(c, callback); err == ErrNotFound { diff --git a/blockstore/idstore.go b/blockstore/idstore.go new file mode 100644 index 000000000..e6148ff04 --- /dev/null +++ b/blockstore/idstore.go @@ -0,0 +1,174 @@ +package blockstore + +import ( + "context" + "io" + + "golang.org/x/xerrors" + + blocks "github.com/ipfs/go-block-format" + cid "github.com/ipfs/go-cid" + mh "github.com/multiformats/go-multihash" +) + +var _ Blockstore = (*idstore)(nil) + +type idstore struct { + bs Blockstore +} + +func NewIDStore(bs Blockstore) Blockstore { + return &idstore{bs: bs} +} + +func decodeCid(cid cid.Cid) (inline bool, data []byte, err error) { + if cid.Prefix().MhType != mh.IDENTITY { + return false, nil, nil + } + + dmh, err := mh.Decode(cid.Hash()) + if err != nil { + return false, nil, err + } + + if dmh.Code == mh.IDENTITY { + return true, dmh.Digest, nil + } + + return false, nil, err +} + +func (b *idstore) Has(cid cid.Cid) (bool, error) { + inline, _, err := decodeCid(cid) + if err != nil { + return false, xerrors.Errorf("error decoding Cid: %w", err) + } + + if inline { + return true, nil + } + + return b.bs.Has(cid) +} + +func (b *idstore) Get(cid cid.Cid) (blocks.Block, error) { + inline, data, err := decodeCid(cid) + if err != nil { + return nil, xerrors.Errorf("error decoding Cid: %w", err) + } + + if inline { + return blocks.NewBlockWithCid(data, cid) + } + + return b.bs.Get(cid) +} + +func (b *idstore) GetSize(cid cid.Cid) (int, error) { + inline, data, err := decodeCid(cid) + if err != nil { + return 0, xerrors.Errorf("error decoding Cid: %w", err) + } + + if inline { + return len(data), err + } + + return b.bs.GetSize(cid) +} + +func (b *idstore) View(cid cid.Cid, cb func([]byte) error) error { + inline, data, err := decodeCid(cid) + if err != nil { + return xerrors.Errorf("error decoding Cid: %w", err) + } + + if inline { + return cb(data) + } + + return b.bs.View(cid, cb) +} + +func (b *idstore) Put(blk blocks.Block) error { + inline, _, err := decodeCid(blk.Cid()) + if err != nil { + return xerrors.Errorf("error decoding Cid: %w", err) + } + + if inline { + return nil + } + + return b.bs.Put(blk) +} + +func (b *idstore) PutMany(blks []blocks.Block) error { + toPut := make([]blocks.Block, 0, len(blks)) + for _, blk := range blks { + inline, _, err := decodeCid(blk.Cid()) + if err != nil { + return xerrors.Errorf("error decoding Cid: %w", err) + } + + if inline { + 
continue + } + toPut = append(toPut, blk) + } + + if len(toPut) > 0 { + return b.bs.PutMany(toPut) + } + + return nil +} + +func (b *idstore) DeleteBlock(cid cid.Cid) error { + inline, _, err := decodeCid(cid) + if err != nil { + return xerrors.Errorf("error decoding Cid: %w", err) + } + + if inline { + return nil + } + + return b.bs.DeleteBlock(cid) +} + +func (b *idstore) DeleteMany(cids []cid.Cid) error { + toDelete := make([]cid.Cid, 0, len(cids)) + for _, cid := range cids { + inline, _, err := decodeCid(cid) + if err != nil { + return xerrors.Errorf("error decoding Cid: %w", err) + } + + if inline { + continue + } + toDelete = append(toDelete, cid) + } + + if len(toDelete) > 0 { + return b.bs.DeleteMany(toDelete) + } + + return nil +} + +func (b *idstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + return b.bs.AllKeysChan(ctx) +} + +func (b *idstore) HashOnRead(enabled bool) { + b.bs.HashOnRead(enabled) +} + +func (b *idstore) Close() error { + if c, ok := b.bs.(io.Closer); ok { + return c.Close() + } + return nil +} diff --git a/blockstore/mem.go b/blockstore/mem.go index c8de3e3e8..8ea69d46a 100644 --- a/blockstore/mem.go +++ b/blockstore/mem.go @@ -20,6 +20,13 @@ func (m MemBlockstore) DeleteBlock(k cid.Cid) error { return nil } +func (m MemBlockstore) DeleteMany(ks []cid.Cid) error { + for _, k := range ks { + delete(m, k) + } + return nil +} + func (m MemBlockstore) Has(k cid.Cid) (bool, error) { _, ok := m[k] return ok, nil diff --git a/blockstore/splitstore/markset.go b/blockstore/splitstore/markset.go new file mode 100644 index 000000000..ef14a2fc6 --- /dev/null +++ b/blockstore/splitstore/markset.go @@ -0,0 +1,38 @@ +package splitstore + +import ( + "path/filepath" + + "golang.org/x/xerrors" + + cid "github.com/ipfs/go-cid" +) + +// MarkSet is a utility to keep track of seen CID, and later query for them. +// +// * If the expected dataset is large, it can be backed by a datastore (e.g. bbolt). +// * If a probabilistic result is acceptable, it can be backed by a bloom filter (default). +type MarkSet interface { + Mark(cid.Cid) error + Has(cid.Cid) (bool, error) + Close() error +} + +// markBytes is deliberately a non-nil empty byte slice for serialization. 
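+// Bolt returns nil from Get for a missing key, so BoltMarkSet.Has reports
+// presence by checking v != nil; a nil mark value would be indistinguishable
+// from an absent key.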
+var markBytes = []byte{} + +type MarkSetEnv interface { + Create(name string, sizeHint int64) (MarkSet, error) + Close() error +} + +func OpenMarkSetEnv(path string, mtype string) (MarkSetEnv, error) { + switch mtype { + case "", "bloom": + return NewBloomMarkSetEnv() + case "bolt": + return NewBoltMarkSetEnv(filepath.Join(path, "markset.bolt")) + default: + return nil, xerrors.Errorf("unknown mark set type %s", mtype) + } +} diff --git a/blockstore/splitstore/markset_bloom.go b/blockstore/splitstore/markset_bloom.go new file mode 100644 index 000000000..c213436c8 --- /dev/null +++ b/blockstore/splitstore/markset_bloom.go @@ -0,0 +1,77 @@ +package splitstore + +import ( + "crypto/rand" + "crypto/sha256" + + "golang.org/x/xerrors" + + bbloom "github.com/ipfs/bbloom" + cid "github.com/ipfs/go-cid" +) + +const ( + BloomFilterMinSize = 10_000_000 + BloomFilterProbability = 0.01 +) + +type BloomMarkSetEnv struct{} + +var _ MarkSetEnv = (*BloomMarkSetEnv)(nil) + +type BloomMarkSet struct { + salt []byte + bf *bbloom.Bloom +} + +var _ MarkSet = (*BloomMarkSet)(nil) + +func NewBloomMarkSetEnv() (*BloomMarkSetEnv, error) { + return &BloomMarkSetEnv{}, nil +} + +func (e *BloomMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) { + size := int64(BloomFilterMinSize) + for size < sizeHint { + size += BloomFilterMinSize + } + + salt := make([]byte, 4) + _, err := rand.Read(salt) + if err != nil { + return nil, xerrors.Errorf("error reading salt: %w", err) + } + + bf, err := bbloom.New(float64(size), BloomFilterProbability) + if err != nil { + return nil, xerrors.Errorf("error creating bloom filter: %w", err) + } + + return &BloomMarkSet{salt: salt, bf: bf}, nil +} + +func (e *BloomMarkSetEnv) Close() error { + return nil +} + +func (s *BloomMarkSet) saltedKey(cid cid.Cid) []byte { + hash := cid.Hash() + key := make([]byte, len(s.salt)+len(hash)) + n := copy(key, s.salt) + copy(key[n:], hash) + rehash := sha256.Sum256(key) + return rehash[:] +} + +func (s *BloomMarkSet) Mark(cid cid.Cid) error { + s.bf.Add(s.saltedKey(cid)) + return nil +} + +func (s *BloomMarkSet) Has(cid cid.Cid) (bool, error) { + return s.bf.Has(s.saltedKey(cid)), nil +} + +func (s *BloomMarkSet) Close() error { + return nil +} diff --git a/blockstore/splitstore/markset_bolt.go b/blockstore/splitstore/markset_bolt.go new file mode 100644 index 000000000..cab0dd74a --- /dev/null +++ b/blockstore/splitstore/markset_bolt.go @@ -0,0 +1,81 @@ +package splitstore + +import ( + "time" + + "golang.org/x/xerrors" + + cid "github.com/ipfs/go-cid" + bolt "go.etcd.io/bbolt" +) + +type BoltMarkSetEnv struct { + db *bolt.DB +} + +var _ MarkSetEnv = (*BoltMarkSetEnv)(nil) + +type BoltMarkSet struct { + db *bolt.DB + bucketId []byte +} + +var _ MarkSet = (*BoltMarkSet)(nil) + +func NewBoltMarkSetEnv(path string) (*BoltMarkSetEnv, error) { + db, err := bolt.Open(path, 0644, + &bolt.Options{ + Timeout: 1 * time.Second, + NoSync: true, + }) + if err != nil { + return nil, err + } + + return &BoltMarkSetEnv{db: db}, nil +} + +func (e *BoltMarkSetEnv) Create(name string, hint int64) (MarkSet, error) { + bucketId := []byte(name) + err := e.db.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(bucketId) + if err != nil { + return xerrors.Errorf("error creating bolt db bucket %s: %w", name, err) + } + return nil + }) + + if err != nil { + return nil, err + } + + return &BoltMarkSet{db: e.db, bucketId: bucketId}, nil +} + +func (e *BoltMarkSetEnv) Close() error { + return e.db.Close() +} + +func (s *BoltMarkSet) Mark(cid 
cid.Cid) error { + return s.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(s.bucketId) + return b.Put(cid.Hash(), markBytes) + }) +} + +func (s *BoltMarkSet) Has(cid cid.Cid) (result bool, err error) { + err = s.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(s.bucketId) + v := b.Get(cid.Hash()) + result = v != nil + return nil + }) + + return result, err +} + +func (s *BoltMarkSet) Close() error { + return s.db.Update(func(tx *bolt.Tx) error { + return tx.DeleteBucket(s.bucketId) + }) +} diff --git a/blockstore/splitstore/markset_test.go b/blockstore/splitstore/markset_test.go new file mode 100644 index 000000000..367ab8d06 --- /dev/null +++ b/blockstore/splitstore/markset_test.go @@ -0,0 +1,138 @@ +package splitstore + +import ( + "io/ioutil" + "testing" + + cid "github.com/ipfs/go-cid" + "github.com/multiformats/go-multihash" +) + +func TestBoltMarkSet(t *testing.T) { + testMarkSet(t, "bolt") +} + +func TestBloomMarkSet(t *testing.T) { + testMarkSet(t, "bloom") +} + +func testMarkSet(t *testing.T, lsType string) { + t.Helper() + + path, err := ioutil.TempDir("", "sweep-test.*") + if err != nil { + t.Fatal(err) + } + + env, err := OpenMarkSetEnv(path, lsType) + if err != nil { + t.Fatal(err) + } + defer env.Close() //nolint:errcheck + + hotSet, err := env.Create("hot", 0) + if err != nil { + t.Fatal(err) + } + + coldSet, err := env.Create("cold", 0) + if err != nil { + t.Fatal(err) + } + + makeCid := func(key string) cid.Cid { + h, err := multihash.Sum([]byte(key), multihash.SHA2_256, -1) + if err != nil { + t.Fatal(err) + } + + return cid.NewCidV1(cid.Raw, h) + } + + mustHave := func(s MarkSet, cid cid.Cid) { + has, err := s.Has(cid) + if err != nil { + t.Fatal(err) + } + + if !has { + t.Fatal("mark not found") + } + } + + mustNotHave := func(s MarkSet, cid cid.Cid) { + has, err := s.Has(cid) + if err != nil { + t.Fatal(err) + } + + if has { + t.Fatal("unexpected mark") + } + } + + k1 := makeCid("a") + k2 := makeCid("b") + k3 := makeCid("c") + k4 := makeCid("d") + + hotSet.Mark(k1) //nolint + hotSet.Mark(k2) //nolint + coldSet.Mark(k3) //nolint + + mustHave(hotSet, k1) + mustHave(hotSet, k2) + mustNotHave(hotSet, k3) + mustNotHave(hotSet, k4) + + mustNotHave(coldSet, k1) + mustNotHave(coldSet, k2) + mustHave(coldSet, k3) + mustNotHave(coldSet, k4) + + // close them and reopen to redo the dance + + err = hotSet.Close() + if err != nil { + t.Fatal(err) + } + + err = coldSet.Close() + if err != nil { + t.Fatal(err) + } + + hotSet, err = env.Create("hot", 0) + if err != nil { + t.Fatal(err) + } + + coldSet, err = env.Create("cold", 0) + if err != nil { + t.Fatal(err) + } + + hotSet.Mark(k3) //nolint + hotSet.Mark(k4) //nolint + coldSet.Mark(k1) //nolint + + mustNotHave(hotSet, k1) + mustNotHave(hotSet, k2) + mustHave(hotSet, k3) + mustHave(hotSet, k4) + + mustHave(coldSet, k1) + mustNotHave(coldSet, k2) + mustNotHave(coldSet, k3) + mustNotHave(coldSet, k4) + + err = hotSet.Close() + if err != nil { + t.Fatal(err) + } + + err = coldSet.Close() + if err != nil { + t.Fatal(err) + } +} diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go new file mode 100644 index 000000000..fb3e28803 --- /dev/null +++ b/blockstore/splitstore/splitstore.go @@ -0,0 +1,1069 @@ +package splitstore + +import ( + "context" + "encoding/binary" + "errors" + "sync" + "sync/atomic" + "time" + + "go.uber.org/multierr" + "golang.org/x/xerrors" + + blocks "github.com/ipfs/go-block-format" + cid "github.com/ipfs/go-cid" + dstore "github.com/ipfs/go-datastore" + logging 
"github.com/ipfs/go-log/v2" + + "github.com/filecoin-project/go-state-types/abi" + + bstore "github.com/filecoin-project/lotus/blockstore" + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/metrics" + + "go.opencensus.io/stats" +) + +var ( + // CompactionThreshold is the number of epochs that need to have elapsed + // from the previously compacted epoch to trigger a new compaction. + // + // |················· CompactionThreshold ··················| + // | | + // =======‖≡≡≡≡≡≡≡‖-----------------------|------------------------» + // | | | chain --> ↑__ current epoch + // |·······| | + // ↑________ CompactionCold ↑________ CompactionBoundary + // + // === :: cold (already archived) + // ≡≡≡ :: to be archived in this compaction + // --- :: hot + CompactionThreshold = 5 * build.Finality + + // CompactionCold is the number of epochs that will be archived to the + // cold store on compaction. See diagram on CompactionThreshold for a + // better sense. + CompactionCold = build.Finality + + // CompactionBoundary is the number of epochs from the current epoch at which + // we will walk the chain for live objects + CompactionBoundary = 2 * build.Finality +) + +var ( + // baseEpochKey stores the base epoch (last compaction epoch) in the + // metadata store. + baseEpochKey = dstore.NewKey("/splitstore/baseEpoch") + + // warmupEpochKey stores whether a hot store warmup has been performed. + // On first start, the splitstore will walk the state tree and will copy + // all active blocks into the hotstore. + warmupEpochKey = dstore.NewKey("/splitstore/warmupEpoch") + + // markSetSizeKey stores the current estimate for the mark set size. + // this is first computed at warmup and updated in every compaction + markSetSizeKey = dstore.NewKey("/splitstore/markSetSize") + + log = logging.Logger("splitstore") +) + +const ( + batchSize = 16384 + + defaultColdPurgeSize = 7_000_000 + defaultDeadPurgeSize = 1_000_000 +) + +type Config struct { + // TrackingStore is the type of tracking store to use. + // + // Supported values are: "bolt" (default if omitted), "mem" (for tests and readonly access). + TrackingStoreType string + + // MarkSetType is the type of mark set to use. + // + // Supported values are: "bloom" (default if omitted), "bolt". + MarkSetType string + // perform full reachability analysis (expensive) for compaction + // You should enable this option if you plan to use the splitstore without a backing coldstore + EnableFullCompaction bool + // EXPERIMENTAL enable pruning of unreachable objects. + // This has not been sufficiently tested yet; only enable if you know what you are doing. + // Only applies if you enable full compaction. + EnableGC bool + // full archival nodes should enable this if EnableFullCompaction is enabled + // do NOT enable this if you synced from a snapshot. + // Only applies if you enabled full compaction + Archival bool +} + +// ChainAccessor allows the Splitstore to access the chain. It will most likely +// be a ChainStore at runtime. 
+type ChainAccessor interface {
+	GetTipsetByHeight(context.Context, abi.ChainEpoch, *types.TipSet, bool) (*types.TipSet, error)
+	GetHeaviestTipSet() *types.TipSet
+	SubscribeHeadChanges(change func(revert []*types.TipSet, apply []*types.TipSet) error)
+	WalkSnapshot(context.Context, *types.TipSet, abi.ChainEpoch, bool, bool, func(cid.Cid) error) error
+}
+
+type SplitStore struct {
+	compacting  int32 // compaction (or warm up) in progress
+	critsection int32 // compaction critical section
+	closing     int32 // the split store is closing
+
+	fullCompaction  bool
+	enableGC        bool
+	skipOldMsgs     bool
+	skipMsgReceipts bool
+
+	baseEpoch   abi.ChainEpoch
+	warmupEpoch abi.ChainEpoch
+
+	coldPurgeSize int
+	deadPurgeSize int
+
+	mx    sync.Mutex
+	curTs *types.TipSet
+
+	chain   ChainAccessor
+	ds      dstore.Datastore
+	hot     bstore.Blockstore
+	cold    bstore.Blockstore
+	tracker TrackingStore
+
+	env MarkSetEnv
+
+	markSetSize int64
+}
+
+var _ bstore.Blockstore = (*SplitStore)(nil)
+
+// Open opens an existing splitstore, or creates a new splitstore. The splitstore
+// is backed by the provided hot and cold stores. The returned SplitStore MUST be
+// attached to the ChainStore with Start in order to trigger compaction.
+func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Config) (*SplitStore, error) {
+	// the tracking store
+	tracker, err := OpenTrackingStore(path, cfg.TrackingStoreType)
+	if err != nil {
+		return nil, err
+	}
+
+	// the markset env
+	env, err := OpenMarkSetEnv(path, cfg.MarkSetType)
+	if err != nil {
+		_ = tracker.Close()
+		return nil, err
+	}
+
+	// and now we can make a SplitStore
+	ss := &SplitStore{
+		ds:      ds,
+		hot:     hot,
+		cold:    cold,
+		tracker: tracker,
+		env:     env,
+
+		fullCompaction:  cfg.EnableFullCompaction,
+		enableGC:        cfg.EnableGC,
+		skipOldMsgs:     !(cfg.EnableFullCompaction && cfg.Archival),
+		skipMsgReceipts: !(cfg.EnableFullCompaction && cfg.Archival),
+
+		coldPurgeSize: defaultColdPurgeSize,
+	}
+
+	if cfg.EnableGC {
+		ss.deadPurgeSize = defaultDeadPurgeSize
+	}
+
+	return ss, nil
+}
+
+// Blockstore interface
+func (s *SplitStore) DeleteBlock(_ cid.Cid) error {
+	// afaict we don't seem to be using this method, so it's not implemented
+	return errors.New("DeleteBlock not implemented on SplitStore; don't do this Luke!") //nolint
+}
+
+func (s *SplitStore) DeleteMany(_ []cid.Cid) error {
+	// afaict we don't seem to be using this method, so it's not implemented
+	return errors.New("DeleteMany not implemented on SplitStore; don't do this Luke!") //nolint
+}
+
+func (s *SplitStore) Has(cid cid.Cid) (bool, error) {
+	has, err := s.hot.Has(cid)
+
+	if err != nil || has {
+		return has, err
+	}
+
+	return s.cold.Has(cid)
+}
+
+func (s *SplitStore) Get(cid cid.Cid) (blocks.Block, error) {
+	blk, err := s.hot.Get(cid)
+
+	switch err {
+	case nil:
+		return blk, nil
+
+	case bstore.ErrNotFound:
+		blk, err = s.cold.Get(cid)
+		if err == nil {
+			stats.Record(context.Background(), metrics.SplitstoreMiss.M(1))
+		}
+		return blk, err
+
+	default:
+		return nil, err
+	}
+}
+
+func (s *SplitStore) GetSize(cid cid.Cid) (int, error) {
+	size, err := s.hot.GetSize(cid)
+
+	switch err {
+	case nil:
+		return size, nil
+
+	case bstore.ErrNotFound:
+		size, err = s.cold.GetSize(cid)
+		if err == nil {
+			stats.Record(context.Background(), metrics.SplitstoreMiss.M(1))
+		}
+		return size, err
+
+	default:
+		return 0, err
+	}
+}
+
+func (s *SplitStore) Put(blk blocks.Block) error {
+	s.mx.Lock()
+	if s.curTs == nil {
+		s.mx.Unlock()
+		return s.cold.Put(blk)
+	}
+
+	epoch := s.curTs.Height()
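+	// the write epoch is captured under the lock; the tracker and hotstore
+	// writes below happen after we release it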
+ s.mx.Unlock() + + err := s.tracker.Put(blk.Cid(), epoch) + if err != nil { + log.Errorf("error tracking CID in hotstore: %s; falling back to coldstore", err) + return s.cold.Put(blk) + } + + return s.hot.Put(blk) +} + +func (s *SplitStore) PutMany(blks []blocks.Block) error { + s.mx.Lock() + if s.curTs == nil { + s.mx.Unlock() + return s.cold.PutMany(blks) + } + + epoch := s.curTs.Height() + s.mx.Unlock() + + batch := make([]cid.Cid, 0, len(blks)) + for _, blk := range blks { + batch = append(batch, blk.Cid()) + } + + err := s.tracker.PutBatch(batch, epoch) + if err != nil { + log.Errorf("error tracking CIDs in hotstore: %s; falling back to coldstore", err) + return s.cold.PutMany(blks) + } + + return s.hot.PutMany(blks) +} + +func (s *SplitStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + ctx, cancel := context.WithCancel(ctx) + + chHot, err := s.hot.AllKeysChan(ctx) + if err != nil { + cancel() + return nil, err + } + + chCold, err := s.cold.AllKeysChan(ctx) + if err != nil { + cancel() + return nil, err + } + + ch := make(chan cid.Cid) + go func() { + defer cancel() + defer close(ch) + + for _, in := range []<-chan cid.Cid{chHot, chCold} { + for cid := range in { + select { + case ch <- cid: + case <-ctx.Done(): + return + } + } + } + }() + + return ch, nil +} + +func (s *SplitStore) HashOnRead(enabled bool) { + s.hot.HashOnRead(enabled) + s.cold.HashOnRead(enabled) +} + +func (s *SplitStore) View(cid cid.Cid, cb func([]byte) error) error { + err := s.hot.View(cid, cb) + switch err { + case bstore.ErrNotFound: + return s.cold.View(cid, cb) + + default: + return err + } +} + +// State tracking +func (s *SplitStore) Start(chain ChainAccessor) error { + s.chain = chain + s.curTs = chain.GetHeaviestTipSet() + + // load base epoch from metadata ds + // if none, then use current epoch because it's a fresh start + bs, err := s.ds.Get(baseEpochKey) + switch err { + case nil: + s.baseEpoch = bytesToEpoch(bs) + + case dstore.ErrNotFound: + if s.curTs == nil { + // this can happen in some tests + break + } + + err = s.setBaseEpoch(s.curTs.Height()) + if err != nil { + return xerrors.Errorf("error saving base epoch: %w", err) + } + + default: + return xerrors.Errorf("error loading base epoch: %w", err) + } + + // load warmup epoch from metadata ds + // if none, then the splitstore will warm up the hotstore at first head change notif + // by walking the current tipset + bs, err = s.ds.Get(warmupEpochKey) + switch err { + case nil: + s.warmupEpoch = bytesToEpoch(bs) + + case dstore.ErrNotFound: + default: + return xerrors.Errorf("error loading warmup epoch: %w", err) + } + + // load markSetSize from metadata ds + // if none, the splitstore will compute it during warmup and update in every compaction + bs, err = s.ds.Get(markSetSizeKey) + switch err { + case nil: + s.markSetSize = bytesToInt64(bs) + + case dstore.ErrNotFound: + default: + return xerrors.Errorf("error loading mark set size: %w", err) + } + + log.Infow("starting splitstore", "baseEpoch", s.baseEpoch, "warmupEpoch", s.warmupEpoch) + + // watch the chain + chain.SubscribeHeadChanges(s.HeadChange) + + return nil +} + +func (s *SplitStore) Close() error { + atomic.StoreInt32(&s.closing, 1) + + if atomic.LoadInt32(&s.critsection) == 1 { + log.Warn("ongoing compaction in critical section; waiting for it to finish...") + for atomic.LoadInt32(&s.critsection) == 1 { + time.Sleep(time.Second) + } + } + + return multierr.Combine(s.tracker.Close(), s.env.Close()) +} + +func (s *SplitStore) HeadChange(_, apply []*types.TipSet) 
error { + s.mx.Lock() + curTs := apply[len(apply)-1] + epoch := curTs.Height() + s.curTs = curTs + s.mx.Unlock() + + if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) { + // we are currently compacting, do nothing and wait for the next head change + return nil + } + + if s.warmupEpoch == 0 { + // splitstore needs to warm up + go func() { + defer atomic.StoreInt32(&s.compacting, 0) + + log.Info("warming up hotstore") + start := time.Now() + + s.warmup(curTs) + + log.Infow("warm up done", "took", time.Since(start)) + }() + + return nil + } + + if epoch-s.baseEpoch > CompactionThreshold { + // it's time to compact + go func() { + defer atomic.StoreInt32(&s.compacting, 0) + + log.Info("compacting splitstore") + start := time.Now() + + s.compact(curTs) + + log.Infow("compaction done", "took", time.Since(start)) + }() + } else { + // no compaction necessary + atomic.StoreInt32(&s.compacting, 0) + } + + return nil +} + +func (s *SplitStore) warmup(curTs *types.TipSet) { + epoch := curTs.Height() + + batchHot := make([]blocks.Block, 0, batchSize) + batchSnoop := make([]cid.Cid, 0, batchSize) + + count := int64(0) + err := s.chain.WalkSnapshot(context.Background(), curTs, 1, s.skipOldMsgs, s.skipMsgReceipts, + func(cid cid.Cid) error { + count++ + + has, err := s.hot.Has(cid) + if err != nil { + return err + } + + if has { + return nil + } + + blk, err := s.cold.Get(cid) + if err != nil { + return err + } + + batchHot = append(batchHot, blk) + batchSnoop = append(batchSnoop, cid) + + if len(batchHot) == batchSize { + err = s.tracker.PutBatch(batchSnoop, epoch) + if err != nil { + return err + } + batchSnoop = batchSnoop[:0] + + err = s.hot.PutMany(batchHot) + if err != nil { + return err + } + batchHot = batchHot[:0] + } + + return nil + }) + + if err != nil { + log.Errorf("error warming up splitstore: %s", err) + return + } + + if len(batchHot) > 0 { + err = s.tracker.PutBatch(batchSnoop, epoch) + if err != nil { + log.Errorf("error warming up splitstore: %s", err) + return + } + + err = s.hot.PutMany(batchHot) + if err != nil { + log.Errorf("error warming up splitstore: %s", err) + return + } + } + + if count > s.markSetSize { + s.markSetSize = count + count>>2 // overestimate a bit + } + + // save the warmup epoch + s.warmupEpoch = epoch + err = s.ds.Put(warmupEpochKey, epochToBytes(epoch)) + if err != nil { + log.Errorf("error saving warmup epoch: %s", err) + } + + err = s.ds.Put(markSetSizeKey, int64ToBytes(s.markSetSize)) + if err != nil { + log.Errorf("error saving mark set size: %s", err) + } +} + +// Compaction/GC Algorithm +func (s *SplitStore) compact(curTs *types.TipSet) { + var err error + if s.markSetSize == 0 { + start := time.Now() + log.Info("estimating mark set size") + err = s.estimateMarkSetSize(curTs) + if err != nil { + log.Errorf("error estimating mark set size: %s; aborting compaction", err) + return + } + log.Infow("estimating mark set size done", "took", time.Since(start), "size", s.markSetSize) + } else { + log.Infow("current mark set size estimate", "size", s.markSetSize) + } + + start := time.Now() + if s.fullCompaction { + err = s.compactFull(curTs) + } else { + err = s.compactSimple(curTs) + } + took := time.Since(start).Milliseconds() + stats.Record(context.Background(), metrics.SplitstoreCompactionTimeSeconds.M(float64(took)/1e3)) + + if err != nil { + log.Errorf("COMPACTION ERROR: %s", err) + } +} + +func (s *SplitStore) estimateMarkSetSize(curTs *types.TipSet) error { + var count int64 + err := s.chain.WalkSnapshot(context.Background(), curTs, 1, s.skipOldMsgs, 
s.skipMsgReceipts, + func(cid cid.Cid) error { + count++ + return nil + }) + + if err != nil { + return err + } + + s.markSetSize = count + count>>2 // overestimate a bit + return nil +} + +func (s *SplitStore) compactSimple(curTs *types.TipSet) error { + coldEpoch := s.baseEpoch + CompactionCold + currentEpoch := curTs.Height() + boundaryEpoch := currentEpoch - CompactionBoundary + + log.Infow("running simple compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "coldEpoch", coldEpoch, "boundaryEpoch", boundaryEpoch) + + coldSet, err := s.env.Create("cold", s.markSetSize) + if err != nil { + return xerrors.Errorf("error creating mark set: %w", err) + } + defer coldSet.Close() //nolint:errcheck + + // 1. mark reachable cold objects by looking at the objects reachable only from the cold epoch + log.Infow("marking reachable cold blocks", "boundaryEpoch", boundaryEpoch) + startMark := time.Now() + + boundaryTs, err := s.chain.GetTipsetByHeight(context.Background(), boundaryEpoch, curTs, true) + if err != nil { + return xerrors.Errorf("error getting tipset at boundary epoch: %w", err) + } + + var count int64 + err = s.chain.WalkSnapshot(context.Background(), boundaryTs, 1, s.skipOldMsgs, s.skipMsgReceipts, + func(cid cid.Cid) error { + count++ + return coldSet.Mark(cid) + }) + + if err != nil { + return xerrors.Errorf("error marking cold blocks: %w", err) + } + + if count > s.markSetSize { + s.markSetSize = count + count>>2 // overestimate a bit + } + + log.Infow("marking done", "took", time.Since(startMark)) + + // 2. move cold unreachable objects to the coldstore + log.Info("collecting cold objects") + startCollect := time.Now() + + cold := make([]cid.Cid, 0, s.coldPurgeSize) + + // some stats for logging + var hotCnt, coldCnt int + + // 2.1 iterate through the tracking store and collect unreachable cold objects + err = s.tracker.ForEach(func(cid cid.Cid, writeEpoch abi.ChainEpoch) error { + // is the object still hot? 
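+		// anything written after the cold epoch counts as hot regardless of
+		// reachability, so it is never considered for the move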
+		if writeEpoch > coldEpoch {
+			// yes, stay in the hotstore
+			hotCnt++
+			return nil
+		}
+
+		// check whether it is reachable within the cold boundary
+		mark, err := coldSet.Has(cid)
+		if err != nil {
+			return xerrors.Errorf("error checking cold set for %s: %w", cid, err)
+		}
+
+		if mark {
+			hotCnt++
+			return nil
+		}
+
+		// it's cold, mark it for the move
+		cold = append(cold, cid)
+		coldCnt++
+		return nil
+	})
+
+	if err != nil {
+		return xerrors.Errorf("error collecting cold objects: %w", err)
+	}
+
+	if coldCnt > 0 {
+		s.coldPurgeSize = coldCnt + coldCnt>>2 // overestimate a bit
+	}
+
+	log.Infow("collection done", "took", time.Since(startCollect))
+	log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt)
+	stats.Record(context.Background(), metrics.SplitstoreCompactionHot.M(int64(hotCnt)))
+	stats.Record(context.Background(), metrics.SplitstoreCompactionCold.M(int64(coldCnt)))
+
+	// Enter critical section
+	atomic.StoreInt32(&s.critsection, 1)
+	defer atomic.StoreInt32(&s.critsection, 0)
+
+	// check to see if we are closing first; if that's the case just return
+	if atomic.LoadInt32(&s.closing) == 1 {
+		log.Info("splitstore is closing; aborting compaction")
+		return xerrors.Errorf("compaction aborted")
+	}
+
+	// 2.2 copy the cold objects to the coldstore
+	log.Info("moving cold blocks to the coldstore")
+	startMove := time.Now()
+	err = s.moveColdBlocks(cold)
+	if err != nil {
+		return xerrors.Errorf("error moving cold blocks: %w", err)
+	}
+	log.Infow("moving done", "took", time.Since(startMove))
+
+	// 2.3 delete cold objects from the hotstore
+	log.Info("purging cold objects from the hotstore")
+	startPurge := time.Now()
+	err = s.purgeBlocks(cold)
+	if err != nil {
+		return xerrors.Errorf("error purging cold blocks: %w", err)
+	}
+	log.Infow("purging cold from hotstore done", "took", time.Since(startPurge))
+
+	// 2.4 remove the tracker entries for cold objects
+	startPurge = time.Now()
+	log.Info("purging cold objects from tracker")
+	err = s.purgeTracking(cold)
+	if err != nil {
+		return xerrors.Errorf("error purging tracking for cold blocks: %w", err)
+	}
+	log.Infow("purging cold from tracker done", "took", time.Since(startPurge))
+
+	// we are done; do some housekeeping
+	err = s.tracker.Sync()
+	if err != nil {
+		return xerrors.Errorf("error syncing tracker: %w", err)
+	}
+
+	s.gcHotstore()
+
+	err = s.setBaseEpoch(coldEpoch)
+	if err != nil {
+		return xerrors.Errorf("error saving base epoch: %w", err)
+	}
+
+	err = s.ds.Put(markSetSizeKey, int64ToBytes(s.markSetSize))
+	if err != nil {
+		return xerrors.Errorf("error saving mark set size: %w", err)
+	}
+
+	return nil
+}
+
+func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error {
+	batch := make([]blocks.Block, 0, batchSize)
+
+	for _, cid := range cold {
+		blk, err := s.hot.Get(cid)
+		if err != nil {
+			if err == dstore.ErrNotFound {
+				// this can happen if the node is killed after we have deleted the block from the hotstore
+				// but before we have deleted it from the tracker; just delete the tracker entry.
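+				// deleting the stale entry here also makes the move idempotent
+				// if we crash and compact again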
+ err = s.tracker.Delete(cid) + if err != nil { + return xerrors.Errorf("error deleting unreachable cid %s from tracker: %w", cid, err) + } + } else { + return xerrors.Errorf("error retrieving tracked block %s from hotstore: %w", cid, err) + } + + continue + } + + batch = append(batch, blk) + if len(batch) == batchSize { + err = s.cold.PutMany(batch) + if err != nil { + return xerrors.Errorf("error putting batch to coldstore: %w", err) + } + batch = batch[:0] + } + } + + if len(batch) > 0 { + err := s.cold.PutMany(batch) + if err != nil { + return xerrors.Errorf("error putting cold to coldstore: %w", err) + } + } + + return nil +} + +func (s *SplitStore) purgeBatch(cids []cid.Cid, deleteBatch func([]cid.Cid) error) error { + if len(cids) == 0 { + return nil + } + + // don't delete one giant batch of 7M objects, but rather do smaller batches + done := false + for i := 0; !done; i++ { + start := i * batchSize + end := start + batchSize + if end >= len(cids) { + end = len(cids) + done = true + } + + err := deleteBatch(cids[start:end]) + if err != nil { + return xerrors.Errorf("error deleting batch: %w", err) + } + } + + return nil +} + +func (s *SplitStore) purgeBlocks(cids []cid.Cid) error { + return s.purgeBatch(cids, s.hot.DeleteMany) +} + +func (s *SplitStore) purgeTracking(cids []cid.Cid) error { + return s.purgeBatch(cids, s.tracker.DeleteBatch) +} + +func (s *SplitStore) gcHotstore() { + if gc, ok := s.hot.(interface{ CollectGarbage() error }); ok { + log.Infof("garbage collecting hotstore") + startGC := time.Now() + err := gc.CollectGarbage() + if err != nil { + log.Warnf("error garbage collecting hotstore: %s", err) + } else { + log.Infow("garbage collection done", "took", time.Since(startGC)) + } + } +} + +func (s *SplitStore) compactFull(curTs *types.TipSet) error { + currentEpoch := curTs.Height() + coldEpoch := s.baseEpoch + CompactionCold + boundaryEpoch := currentEpoch - CompactionBoundary + + log.Infow("running full compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "coldEpoch", coldEpoch, "boundaryEpoch", boundaryEpoch) + + // create two mark sets, one for marking the cold finality region + // and one for marking the hot region + hotSet, err := s.env.Create("hot", s.markSetSize) + if err != nil { + return xerrors.Errorf("error creating hot mark set: %w", err) + } + defer hotSet.Close() //nolint:errcheck + + coldSet, err := s.env.Create("cold", s.markSetSize) + if err != nil { + return xerrors.Errorf("error creating cold mark set: %w", err) + } + defer coldSet.Close() //nolint:errcheck + + // Phase 1: marking + log.Info("marking live blocks") + startMark := time.Now() + + // Phase 1a: mark all reachable CIDs in the hot range + boundaryTs, err := s.chain.GetTipsetByHeight(context.Background(), boundaryEpoch, curTs, true) + if err != nil { + return xerrors.Errorf("error getting tipset at boundary epoch: %w", err) + } + + count := int64(0) + err = s.chain.WalkSnapshot(context.Background(), boundaryTs, boundaryEpoch-coldEpoch, s.skipOldMsgs, s.skipMsgReceipts, + func(cid cid.Cid) error { + count++ + return hotSet.Mark(cid) + }) + + if err != nil { + return xerrors.Errorf("error marking hot blocks: %w", err) + } + + if count > s.markSetSize { + s.markSetSize = count + count>>2 // overestimate a bit + } + + // Phase 1b: mark all reachable CIDs in the cold range + coldTs, err := s.chain.GetTipsetByHeight(context.Background(), coldEpoch, curTs, true) + if err != nil { + return xerrors.Errorf("error getting tipset at cold epoch: %w", err) + } + + count = 0 + err = 
s.chain.WalkSnapshot(context.Background(), coldTs, CompactionCold, s.skipOldMsgs, s.skipMsgReceipts,
+		func(cid cid.Cid) error {
+			count++
+			return coldSet.Mark(cid)
+		})
+
+	if err != nil {
+		return xerrors.Errorf("error marking cold blocks: %w", err)
+	}
+
+	if count > s.markSetSize {
+		s.markSetSize = count + count>>2 // overestimate a bit
+	}
+
+	log.Infow("marking done", "took", time.Since(startMark))
+
+	// Phase 2: sweep cold objects:
+	// - If a cold object is reachable in the hot range, it stays in the hotstore.
+	// - If a cold object is reachable in the cold range, it is moved to the coldstore.
+	// - If a cold object is unreachable, it is deleted if GC is enabled, otherwise moved to the coldstore.
+	log.Info("collecting cold objects")
+	startCollect := time.Now()
+
+	// some stats for logging
+	var hotCnt, coldCnt, deadCnt int
+
+	cold := make([]cid.Cid, 0, s.coldPurgeSize)
+	dead := make([]cid.Cid, 0, s.deadPurgeSize)
+
+	// 2.1 iterate through the tracker and collect cold and dead objects
+	err = s.tracker.ForEach(func(cid cid.Cid, wrEpoch abi.ChainEpoch) error {
+		// is the object still hot?
+		if wrEpoch > coldEpoch {
+			// yes, stay in the hotstore
+			hotCnt++
+			return nil
+		}
+
+		// the object is cold -- check whether it is reachable in the hot range
+		mark, err := hotSet.Has(cid)
+		if err != nil {
+			return xerrors.Errorf("error checking live mark for %s: %w", cid, err)
+		}
+
+		if mark {
+			// the object is reachable in the hot range, stay in the hotstore
+			hotCnt++
+			return nil
+		}
+
+		// check whether it is reachable in the cold range
+		mark, err = coldSet.Has(cid)
+		if err != nil {
+			return xerrors.Errorf("error checking cold set for %s: %w", cid, err)
+		}
+
+		if s.enableGC {
+			if mark {
+				// the object is reachable in the cold range, move it to the coldstore
+				cold = append(cold, cid)
+				coldCnt++
+			} else {
+				// the object is dead and will be deleted
+				dead = append(dead, cid)
+				deadCnt++
+			}
+		} else {
+			// if GC is disabled, we move both cold and dead objects to the coldstore
+			cold = append(cold, cid)
+			if mark {
+				coldCnt++
+			} else {
+				deadCnt++
+			}
+		}
+
+		return nil
+	})
+
+	if err != nil {
+		return xerrors.Errorf("error collecting cold objects: %w", err)
+	}
+
+	if coldCnt > 0 {
+		s.coldPurgeSize = coldCnt + coldCnt>>2 // overestimate a bit
+	}
+	if deadCnt > 0 {
+		s.deadPurgeSize = deadCnt + deadCnt>>2 // overestimate a bit
+	}
+
+	log.Infow("collection done", "took", time.Since(startCollect))
+	log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt, "dead", deadCnt)
+	stats.Record(context.Background(), metrics.SplitstoreCompactionHot.M(int64(hotCnt)))
+	stats.Record(context.Background(), metrics.SplitstoreCompactionCold.M(int64(coldCnt)))
+	stats.Record(context.Background(), metrics.SplitstoreCompactionDead.M(int64(deadCnt)))
+
+	// Enter critical section
+	atomic.StoreInt32(&s.critsection, 1)
+	defer atomic.StoreInt32(&s.critsection, 0)
+
+	// check to see if we are closing first; if that's the case just return
+	if atomic.LoadInt32(&s.closing) == 1 {
+		log.Info("splitstore is closing; aborting compaction")
+		return xerrors.Errorf("compaction aborted")
+	}
+
+	// 2.2 copy the cold objects to the coldstore
+	log.Info("moving cold objects to the coldstore")
+	startMove := time.Now()
+	err = s.moveColdBlocks(cold)
+	if err != nil {
+		return xerrors.Errorf("error moving cold blocks: %w", err)
+	}
+	log.Infow("moving done", "took", time.Since(startMove))
+
+	// 2.3 delete cold objects from the hotstore
+	log.Info("purging cold objects from the hotstore")
+	
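// purgeBlocks drives DeleteMany in batchSize chunks via purgeBatch, so a
+	// purge of millions of entries is not issued as one giant delete
+	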
startPurge := time.Now() + err = s.purgeBlocks(cold) + if err != nil { + return xerrors.Errorf("error purging cold blocks: %w", err) + } + log.Infow("purging cold from hotstore done", "took", time.Since(startPurge)) + + // 2.4 remove the tracker tracking for cold objects + startPurge = time.Now() + log.Info("purging cold objects from tracker") + err = s.purgeTracking(cold) + if err != nil { + return xerrors.Errorf("error purging tracking for cold blocks: %w", err) + } + log.Infow("purging cold from tracker done", "took", time.Since(startPurge)) + + // 3. if we have dead objects, delete them from the hotstore and remove the tracking + if len(dead) > 0 { + log.Info("deleting dead objects") + err = s.purgeBlocks(dead) + if err != nil { + return xerrors.Errorf("error purging dead blocks: %w", err) + } + + // remove the tracker tracking + startPurge := time.Now() + log.Info("purging dead objects from tracker") + err = s.purgeTracking(dead) + if err != nil { + return xerrors.Errorf("error purging tracking for dead blocks: %w", err) + } + log.Infow("purging dead from tracker done", "took", time.Since(startPurge)) + } + + // we are done; do some housekeeping + err = s.tracker.Sync() + if err != nil { + return xerrors.Errorf("error syncing tracker: %w", err) + } + + s.gcHotstore() + + err = s.setBaseEpoch(coldEpoch) + if err != nil { + return xerrors.Errorf("error saving base epoch: %w", err) + } + + err = s.ds.Put(markSetSizeKey, int64ToBytes(s.markSetSize)) + if err != nil { + return xerrors.Errorf("error saving mark set size: %w", err) + } + + return nil +} + +func (s *SplitStore) setBaseEpoch(epoch abi.ChainEpoch) error { + s.baseEpoch = epoch + // write to datastore + return s.ds.Put(baseEpochKey, epochToBytes(epoch)) +} + +func epochToBytes(epoch abi.ChainEpoch) []byte { + return uint64ToBytes(uint64(epoch)) +} + +func bytesToEpoch(buf []byte) abi.ChainEpoch { + return abi.ChainEpoch(bytesToUint64(buf)) +} + +func int64ToBytes(i int64) []byte { + return uint64ToBytes(uint64(i)) +} + +func bytesToInt64(buf []byte) int64 { + return int64(bytesToUint64(buf)) +} + +func uint64ToBytes(i uint64) []byte { + buf := make([]byte, 16) + n := binary.PutUvarint(buf, i) + return buf[:n] +} + +func bytesToUint64(buf []byte) uint64 { + i, _ := binary.Uvarint(buf) + return i +} diff --git a/blockstore/splitstore/splitstore_test.go b/blockstore/splitstore/splitstore_test.go new file mode 100644 index 000000000..db5144039 --- /dev/null +++ b/blockstore/splitstore/splitstore_test.go @@ -0,0 +1,248 @@ +package splitstore + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/blockstore" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/chain/types/mock" + + cid "github.com/ipfs/go-cid" + datastore "github.com/ipfs/go-datastore" + logging "github.com/ipfs/go-log/v2" +) + +func init() { + CompactionThreshold = 5 + CompactionCold = 1 + CompactionBoundary = 2 + logging.SetLogLevel("splitstore", "DEBUG") +} + +func testSplitStore(t *testing.T, cfg *Config) { + t.Helper() + + chain := &mockChain{} + // genesis + genBlock := mock.MkBlock(nil, 0, 0) + genTs := mock.TipSet(genBlock) + chain.push(genTs) + + // the myriads of stores + ds := datastore.NewMapDatastore() + hot := blockstore.NewMemorySync() + cold := blockstore.NewMemorySync() + + // put the genesis block to cold store + blk, err := genBlock.ToStorageBlock() + if err != nil { + t.Fatal(err) + } + + err = cold.Put(blk) + if err != 
nil { + t.Fatal(err) + } + + // open the splitstore + ss, err := Open("", ds, hot, cold, cfg) + if err != nil { + t.Fatal(err) + } + defer ss.Close() //nolint + + err = ss.Start(chain) + if err != nil { + t.Fatal(err) + } + + // make some tipsets, but not enough to cause compaction + mkBlock := func(curTs *types.TipSet, i int) *types.TipSet { + blk := mock.MkBlock(curTs, uint64(i), uint64(i)) + sblk, err := blk.ToStorageBlock() + if err != nil { + t.Fatal(err) + } + err = ss.Put(sblk) + if err != nil { + t.Fatal(err) + } + ts := mock.TipSet(blk) + chain.push(ts) + + return ts + } + + mkGarbageBlock := func(curTs *types.TipSet, i int) { + blk := mock.MkBlock(curTs, uint64(i), uint64(i)) + sblk, err := blk.ToStorageBlock() + if err != nil { + t.Fatal(err) + } + err = ss.Put(sblk) + if err != nil { + t.Fatal(err) + } + } + + curTs := genTs + for i := 1; i < 5; i++ { + curTs = mkBlock(curTs, i) + } + + mkGarbageBlock(genTs, 1) + + // count objects in the cold and hot stores + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + countBlocks := func(bs blockstore.Blockstore) int { + count := 0 + ch, err := bs.AllKeysChan(ctx) + if err != nil { + t.Fatal(err) + } + for range ch { + count++ + } + return count + } + + coldCnt := countBlocks(cold) + hotCnt := countBlocks(hot) + + if coldCnt != 1 { + t.Fatalf("expected %d blocks, but got %d", 1, coldCnt) + } + + if hotCnt != 4 { + t.Fatalf("expected %d blocks, but got %d", 4, hotCnt) + } + + // trigger a compaction + for i := 5; i < 10; i++ { + curTs = mkBlock(curTs, i) + time.Sleep(time.Second) + } + + coldCnt = countBlocks(cold) + hotCnt = countBlocks(hot) + + if !cfg.EnableFullCompaction { + if coldCnt != 5 { + t.Fatalf("expected %d cold blocks, but got %d", 5, coldCnt) + } + + if hotCnt != 5 { + t.Fatalf("expected %d hot blocks, but got %d", 5, hotCnt) + } + } + + if cfg.EnableFullCompaction && !cfg.EnableGC { + if coldCnt != 3 { + t.Fatalf("expected %d cold blocks, but got %d", 3, coldCnt) + } + + if hotCnt != 7 { + t.Fatalf("expected %d hot blocks, but got %d", 7, hotCnt) + } + } + + if cfg.EnableFullCompaction && cfg.EnableGC { + if coldCnt != 2 { + t.Fatalf("expected %d cold blocks, but got %d", 2, coldCnt) + } + + if hotCnt != 7 { + t.Fatalf("expected %d hot blocks, but got %d", 7, hotCnt) + } + } +} + +func TestSplitStoreSimpleCompaction(t *testing.T) { + testSplitStore(t, &Config{TrackingStoreType: "mem"}) +} + +func TestSplitStoreFullCompactionWithoutGC(t *testing.T) { + testSplitStore(t, &Config{ + TrackingStoreType: "mem", + EnableFullCompaction: true, + }) +} + +func TestSplitStoreFullCompactionWithGC(t *testing.T) { + testSplitStore(t, &Config{ + TrackingStoreType: "mem", + EnableFullCompaction: true, + EnableGC: true, + }) +} + +type mockChain struct { + sync.Mutex + tipsets []*types.TipSet + listener func(revert []*types.TipSet, apply []*types.TipSet) error +} + +func (c *mockChain) push(ts *types.TipSet) { + c.Lock() + c.tipsets = append(c.tipsets, ts) + c.Unlock() + + if c.listener != nil { + err := c.listener(nil, []*types.TipSet{ts}) + if err != nil { + log.Errorf("mockchain: error dispatching listener: %s", err) + } + } +} + +func (c *mockChain) GetTipsetByHeight(_ context.Context, epoch abi.ChainEpoch, _ *types.TipSet, _ bool) (*types.TipSet, error) { + c.Lock() + defer c.Unlock() + + iEpoch := int(epoch) + if iEpoch > len(c.tipsets) { + return nil, fmt.Errorf("bad epoch %d", epoch) + } + + return c.tipsets[iEpoch-1], nil +} + +func (c *mockChain) GetHeaviestTipSet() *types.TipSet { + c.Lock() + defer 
c.Unlock() + + return c.tipsets[len(c.tipsets)-1] +} + +func (c *mockChain) SubscribeHeadChanges(change func(revert []*types.TipSet, apply []*types.TipSet) error) { + c.listener = change +} + +func (c *mockChain) WalkSnapshot(_ context.Context, ts *types.TipSet, epochs abi.ChainEpoch, _ bool, _ bool, f func(cid.Cid) error) error { + c.Lock() + defer c.Unlock() + + start := int(ts.Height()) - 1 + end := start - int(epochs) + if end < 0 { + end = -1 + } + for i := start; i > end; i-- { + ts := c.tipsets[i] + for _, cid := range ts.Cids() { + err := f(cid) + if err != nil { + return err + } + } + } + + return nil +} diff --git a/blockstore/splitstore/tracking.go b/blockstore/splitstore/tracking.go new file mode 100644 index 000000000..d57fd45ef --- /dev/null +++ b/blockstore/splitstore/tracking.go @@ -0,0 +1,109 @@ +package splitstore + +import ( + "path/filepath" + "sync" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-state-types/abi" + cid "github.com/ipfs/go-cid" +) + +// TrackingStore is a persistent store that tracks blocks that are added +// to the hotstore, tracking the epoch at which they are written. +type TrackingStore interface { + Put(cid.Cid, abi.ChainEpoch) error + PutBatch([]cid.Cid, abi.ChainEpoch) error + Get(cid.Cid) (abi.ChainEpoch, error) + Delete(cid.Cid) error + DeleteBatch([]cid.Cid) error + ForEach(func(cid.Cid, abi.ChainEpoch) error) error + Sync() error + Close() error +} + +// OpenTrackingStore opens a tracking store of the specified type in the +// specified path. +func OpenTrackingStore(path string, ttype string) (TrackingStore, error) { + switch ttype { + case "", "bolt": + return OpenBoltTrackingStore(filepath.Join(path, "tracker.bolt")) + case "mem": + return NewMemTrackingStore(), nil + default: + return nil, xerrors.Errorf("unknown tracking store type %s", ttype) + } +} + +// NewMemTrackingStore creates an in-memory tracking store. 
+// This is only useful for test or situations where you don't want to open the +// real tracking store (eg concurrent read only access on a node's datastore) +func NewMemTrackingStore() *MemTrackingStore { + return &MemTrackingStore{tab: make(map[cid.Cid]abi.ChainEpoch)} +} + +// MemTrackingStore is a simple in-memory tracking store +type MemTrackingStore struct { + sync.Mutex + tab map[cid.Cid]abi.ChainEpoch +} + +var _ TrackingStore = (*MemTrackingStore)(nil) + +func (s *MemTrackingStore) Put(cid cid.Cid, epoch abi.ChainEpoch) error { + s.Lock() + defer s.Unlock() + s.tab[cid] = epoch + return nil +} + +func (s *MemTrackingStore) PutBatch(cids []cid.Cid, epoch abi.ChainEpoch) error { + s.Lock() + defer s.Unlock() + for _, cid := range cids { + s.tab[cid] = epoch + } + return nil +} + +func (s *MemTrackingStore) Get(cid cid.Cid) (abi.ChainEpoch, error) { + s.Lock() + defer s.Unlock() + epoch, ok := s.tab[cid] + if ok { + return epoch, nil + } + return 0, xerrors.Errorf("missing tracking epoch for %s", cid) +} + +func (s *MemTrackingStore) Delete(cid cid.Cid) error { + s.Lock() + defer s.Unlock() + delete(s.tab, cid) + return nil +} + +func (s *MemTrackingStore) DeleteBatch(cids []cid.Cid) error { + s.Lock() + defer s.Unlock() + for _, cid := range cids { + delete(s.tab, cid) + } + return nil +} + +func (s *MemTrackingStore) ForEach(f func(cid.Cid, abi.ChainEpoch) error) error { + s.Lock() + defer s.Unlock() + for cid, epoch := range s.tab { + err := f(cid, epoch) + if err != nil { + return err + } + } + return nil +} + +func (s *MemTrackingStore) Sync() error { return nil } +func (s *MemTrackingStore) Close() error { return nil } diff --git a/blockstore/splitstore/tracking_bolt.go b/blockstore/splitstore/tracking_bolt.go new file mode 100644 index 000000000..c5c451e15 --- /dev/null +++ b/blockstore/splitstore/tracking_bolt.go @@ -0,0 +1,120 @@ +package splitstore + +import ( + "time" + + "golang.org/x/xerrors" + + cid "github.com/ipfs/go-cid" + bolt "go.etcd.io/bbolt" + + "github.com/filecoin-project/go-state-types/abi" +) + +type BoltTrackingStore struct { + db *bolt.DB + bucketId []byte +} + +var _ TrackingStore = (*BoltTrackingStore)(nil) + +func OpenBoltTrackingStore(path string) (*BoltTrackingStore, error) { + opts := &bolt.Options{ + Timeout: 1 * time.Second, + NoSync: true, + } + db, err := bolt.Open(path, 0644, opts) + if err != nil { + return nil, err + } + + bucketId := []byte("tracker") + err = db.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(bucketId) + if err != nil { + return xerrors.Errorf("error creating bolt db bucket %s: %w", string(bucketId), err) + } + return nil + }) + + if err != nil { + _ = db.Close() + return nil, err + } + + return &BoltTrackingStore{db: db, bucketId: bucketId}, nil +} + +func (s *BoltTrackingStore) Put(cid cid.Cid, epoch abi.ChainEpoch) error { + val := epochToBytes(epoch) + return s.db.Batch(func(tx *bolt.Tx) error { + b := tx.Bucket(s.bucketId) + return b.Put(cid.Hash(), val) + }) +} + +func (s *BoltTrackingStore) PutBatch(cids []cid.Cid, epoch abi.ChainEpoch) error { + val := epochToBytes(epoch) + return s.db.Batch(func(tx *bolt.Tx) error { + b := tx.Bucket(s.bucketId) + for _, cid := range cids { + err := b.Put(cid.Hash(), val) + if err != nil { + return err + } + } + return nil + }) +} + +func (s *BoltTrackingStore) Get(cid cid.Cid) (epoch abi.ChainEpoch, err error) { + err = s.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(s.bucketId) + val := b.Get(cid.Hash()) + if val == nil { + return 
xerrors.Errorf("missing tracking epoch for %s", cid)
+		}
+		epoch = bytesToEpoch(val)
+		return nil
+	})
+	return epoch, err
+}
+
+func (s *BoltTrackingStore) Delete(cid cid.Cid) error {
+	return s.db.Batch(func(tx *bolt.Tx) error {
+		b := tx.Bucket(s.bucketId)
+		return b.Delete(cid.Hash())
+	})
+}
+
+func (s *BoltTrackingStore) DeleteBatch(cids []cid.Cid) error {
+	return s.db.Batch(func(tx *bolt.Tx) error {
+		b := tx.Bucket(s.bucketId)
+		for _, cid := range cids {
+			err := b.Delete(cid.Hash())
+			if err != nil {
+				return xerrors.Errorf("error deleting %s", cid)
+			}
+		}
+		return nil
+	})
+}
+
+func (s *BoltTrackingStore) ForEach(f func(cid.Cid, abi.ChainEpoch) error) error {
+	return s.db.View(func(tx *bolt.Tx) error {
+		b := tx.Bucket(s.bucketId)
+		return b.ForEach(func(k, v []byte) error {
+			cid := cid.NewCidV1(cid.Raw, k)
+			epoch := bytesToEpoch(v)
+			return f(cid, epoch)
+		})
+	})
+}
+
+func (s *BoltTrackingStore) Sync() error {
+	return s.db.Sync()
+}
+
+func (s *BoltTrackingStore) Close() error {
+	return s.db.Close()
+}
diff --git a/blockstore/splitstore/tracking_test.go b/blockstore/splitstore/tracking_test.go
new file mode 100644
index 000000000..afd475da5
--- /dev/null
+++ b/blockstore/splitstore/tracking_test.go
@@ -0,0 +1,130 @@
+package splitstore
+
+import (
+	"io/ioutil"
+	"testing"
+
+	cid "github.com/ipfs/go-cid"
+	"github.com/multiformats/go-multihash"
+
+	"github.com/filecoin-project/go-state-types/abi"
+)
+
+func TestBoltTrackingStore(t *testing.T) {
+	testTrackingStore(t, "bolt")
+}
+
+func testTrackingStore(t *testing.T, tsType string) {
+	t.Helper()
+
+	makeCid := func(key string) cid.Cid {
+		h, err := multihash.Sum([]byte(key), multihash.SHA2_256, -1)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		return cid.NewCidV1(cid.Raw, h)
+	}
+
+	mustHave := func(s TrackingStore, cid cid.Cid, epoch abi.ChainEpoch) {
+		val, err := s.Get(cid)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		if val != epoch {
+			t.Fatal("epoch mismatch")
+		}
+	}
+
+	mustNotHave := func(s TrackingStore, cid cid.Cid) {
+		_, err := s.Get(cid)
+		if err == nil {
+			t.Fatal("expected error")
+		}
+	}
+
+	path, err := ioutil.TempDir("", "snoop-test.*")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	s, err := OpenTrackingStore(path, tsType)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	k1 := makeCid("a")
+	k2 := makeCid("b")
+	k3 := makeCid("c")
+	k4 := makeCid("d")
+
+	s.Put(k1, 1) //nolint
+	s.Put(k2, 2) //nolint
+	s.Put(k3, 3) //nolint
+	s.Put(k4, 4) //nolint
+
+	mustHave(s, k1, 1)
+	mustHave(s, k2, 2)
+	mustHave(s, k3, 3)
+	mustHave(s, k4, 4)
+
+	s.Delete(k1) // nolint
+	s.Delete(k2) // nolint
+
+	mustNotHave(s, k1)
+	mustNotHave(s, k2)
+	mustHave(s, k3, 3)
+	mustHave(s, k4, 4)
+
+	s.PutBatch([]cid.Cid{k1}, 1) //nolint
+	s.PutBatch([]cid.Cid{k2}, 2) //nolint
+
+	mustHave(s, k1, 1)
+	mustHave(s, k2, 2)
+	mustHave(s, k3, 3)
+	mustHave(s, k4, 4)
+
+	allKeys := map[string]struct{}{
+		k1.String(): {},
+		k2.String(): {},
+		k3.String(): {},
+		k4.String(): {},
+	}
+
+	err = s.ForEach(func(k cid.Cid, _ abi.ChainEpoch) error {
+		_, ok := allKeys[k.String()]
+		if !ok {
+			t.Fatal("unexpected key")
+		}
+
+		delete(allKeys, k.String())
+		return nil
+	})
+
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(allKeys) != 0 {
+		t.Fatal("not all keys were returned")
+	}
+
+	// now close and reopen and ensure the keys still exist
+	err = s.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	s, err = OpenTrackingStore(path, tsType)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	mustHave(s, k1, 1)
+	mustHave(s, k2, 2)
+	mustHave(s, k3, 3)
+	mustHave(s, k4, 4)
+
+	
+	s.Close() //nolint:errcheck
+}
diff --git a/blockstore/sync.go b/blockstore/sync.go
index 2da71a898..848ccd19d 100644
--- a/blockstore/sync.go
+++ b/blockstore/sync.go
@@ -26,6 +26,12 @@ func (m *SyncBlockstore) DeleteBlock(k cid.Cid) error {
 	return m.bs.DeleteBlock(k)
 }
 
+func (m *SyncBlockstore) DeleteMany(ks []cid.Cid) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	return m.bs.DeleteMany(ks)
+}
+
 func (m *SyncBlockstore) Has(k cid.Cid) (bool, error) {
 	m.mu.RLock()
 	defer m.mu.RUnlock()
diff --git a/blockstore/timed.go b/blockstore/timed.go
index 138375028..ce25bb5bc 100644
--- a/blockstore/timed.go
+++ b/blockstore/timed.go
@@ -153,6 +153,12 @@ func (t *TimedCacheBlockstore) DeleteBlock(k cid.Cid) error {
 	return multierr.Combine(t.active.DeleteBlock(k), t.inactive.DeleteBlock(k))
 }
 
+func (t *TimedCacheBlockstore) DeleteMany(ks []cid.Cid) error {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	return multierr.Combine(t.active.DeleteMany(ks), t.inactive.DeleteMany(ks))
+}
+
 func (t *TimedCacheBlockstore) AllKeysChan(_ context.Context) (<-chan cid.Cid, error) {
 	t.mu.RLock()
 	defer t.mu.RUnlock()
diff --git a/blockstore/union.go b/blockstore/union.go
index dfe5ea70c..a99ba2591 100644
--- a/blockstore/union.go
+++ b/blockstore/union.go
@@ -82,6 +82,15 @@ func (m unionBlockstore) DeleteBlock(cid cid.Cid) (err error) {
 	return err
 }
 
+func (m unionBlockstore) DeleteMany(cids []cid.Cid) (err error) {
+	for _, bs := range m {
+		if err = bs.DeleteMany(cids); err != nil {
+			break
+		}
+	}
+	return err
+}
+
 func (m unionBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
 	// this does not deduplicate; this interface needs to be revisited.
 	outCh := make(chan cid.Cid)
diff --git a/chain/store/store.go b/chain/store/store.go
index e0660495e..e0d71f030 100644
--- a/chain/store/store.go
+++ b/chain/store/store.go
@@ -81,7 +81,7 @@ func init() {
 }
 
 // ReorgNotifee represents a callback that gets called upon reorgs.
-type ReorgNotifee func(rev, app []*types.TipSet) error
+type ReorgNotifee = func(rev, app []*types.TipSet) error
 
 // Journal event types.
 const (
diff --git a/go.mod b/go.mod
index 8ed943bd8..b585a8892 100644
--- a/go.mod
+++ b/go.mod
@@ -137,6 +137,7 @@ require (
 	github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7
 	github.com/whyrusleeping/pubsub v0.0.0-20190708150250-92bcb0691325
 	github.com/xorcare/golden v0.6.1-0.20191112154924-b87f686d7542
+	go.etcd.io/bbolt v1.3.4
 	go.opencensus.io v0.22.5
 	go.uber.org/dig v1.10.0 // indirect
 	go.uber.org/fx v1.9.0
diff --git a/go.sum b/go.sum
index 202fcef65..9aa8f5682 100644
--- a/go.sum
+++ b/go.sum
@@ -241,6 +241,8 @@ github.com/filecoin-project/go-amt-ipld/v2 v2.1.1-0.20201006184820-924ee87a1349/
 github.com/filecoin-project/go-amt-ipld/v3 v3.0.0 h1:Ou/q82QeHGOhpkedvaxxzpBYuqTxLCcj5OChkDNx4qc=
 github.com/filecoin-project/go-amt-ipld/v3 v3.0.0/go.mod h1:Qa95YNAbtoVCTSVtX38aAC1ptBnJfPma1R/zZsKmx4o=
 github.com/filecoin-project/go-bitfield v0.2.0/go.mod h1:CNl9WG8hgR5mttCnUErjcQjGvuiZjRqK9rHVBsQF4oM=
+github.com/filecoin-project/go-bitfield v0.2.3 h1:pedK/7maYF06Z+BYJf2OeFFqIDEh6SP6mIOlLFpYXGs=
+github.com/filecoin-project/go-bitfield v0.2.3/go.mod h1:CNl9WG8hgR5mttCnUErjcQjGvuiZjRqK9rHVBsQF4oM=
 github.com/filecoin-project/go-bitfield v0.2.3/go.mod h1:CNl9WG8hgR5mttCnUErjcQjGvuiZjRqK9rHVBsQF4oM=
 github.com/filecoin-project/go-bitfield v0.2.4 h1:uZ7MeE+XfM5lqrHJZ93OnhQKc/rveW8p9au0C68JPgk=
 github.com/filecoin-project/go-bitfield v0.2.4/go.mod h1:CNl9WG8hgR5mttCnUErjcQjGvuiZjRqK9rHVBsQF4oM=
@@ -1934,6 +1936,8 @@ howett.net/plist v0.0.0-20181124034731-591f970eefbb/go.mod h1:vMygbs4qMhSZSc4lCU
 modernc.org/cc v1.0.0 h1:nPibNuDEx6tvYrUAtvDTTw98rx5juGsa5zuDnKwEEQQ=
 modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw=
 modernc.org/fileutil v1.0.0/go.mod h1:JHsWpkrk/CnVV1H/eGlFf85BEpfkrp56ro8nojIq9Q8=
+modernc.org/golex v1.0.0 h1:wWpDlbK8ejRfSyi0frMyhilD3JBvtcx2AdGDnU+JtsE=
+modernc.org/golex v1.0.0/go.mod h1:b/QX9oBD/LhixY6NDh+IdGv17hgB+51fET1i2kPSmvk=
 modernc.org/golex v1.0.0/go.mod h1:b/QX9oBD/LhixY6NDh+IdGv17hgB+51fET1i2kPSmvk=
 modernc.org/golex v1.0.1 h1:EYKY1a3wStt0RzHaH8mdSRNg78Ub0OHxYfCRWw35YtM=
 modernc.org/golex v1.0.1/go.mod h1:QCA53QtsT1NdGkaZZkF5ezFwk4IXh4BGNafAARTC254=
diff --git a/metrics/metrics.go b/metrics/metrics.go
index 45869ea91..5428a81bc 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -82,6 +82,13 @@ var (
 	WorkerCallsReturnedCount     = stats.Int64("sealing/worker_calls_returned_count", "Counter of returned worker tasks", stats.UnitDimensionless)
 	WorkerCallsReturnedDuration  = stats.Float64("sealing/worker_calls_returned_ms", "Counter of returned worker tasks", stats.UnitMilliseconds)
 	WorkerUntrackedCallsReturned = stats.Int64("sealing/worker_untracked_calls_returned", "Counter of returned untracked worker tasks", stats.UnitDimensionless)
+
+	// splitstore
+	SplitstoreMiss                  = stats.Int64("splitstore/miss", "Number of misses in hotstore access", stats.UnitDimensionless)
+	SplitstoreCompactionTimeSeconds = stats.Float64("splitstore/compaction_time", "Compaction time in seconds", stats.UnitSeconds)
+	SplitstoreCompactionHot         = stats.Int64("splitstore/hot", "Number of hot blocks in last compaction", stats.UnitDimensionless)
+	SplitstoreCompactionCold        = stats.Int64("splitstore/cold", "Number of cold blocks in last compaction", stats.UnitDimensionless)
+	SplitstoreCompactionDead        = stats.Int64("splitstore/dead", "Number of dead blocks in last compaction", stats.UnitDimensionless)
 )
 
 var (
@@ -222,6 +229,28 @@ var (
 		Aggregation: workMillisecondsDistribution,
 		TagKeys:     []tag.Key{TaskType, WorkerHostname},
 	}
+
+	// splitstore
+	SplitstoreMissView = &view.View{
+		Measure:     SplitstoreMiss,
+		Aggregation: view.Count(),
+	}
+	SplitstoreCompactionTimeSecondsView = &view.View{
+		Measure:     SplitstoreCompactionTimeSeconds,
+		Aggregation: view.LastValue(),
+	}
+	SplitstoreCompactionHotView = &view.View{
+		Measure:     SplitstoreCompactionHot,
+		Aggregation: view.LastValue(),
+	}
+	SplitstoreCompactionColdView = &view.View{
+		Measure:     SplitstoreCompactionCold,
+		Aggregation: view.Sum(),
+	}
+	SplitstoreCompactionDeadView = &view.View{
+		Measure:     SplitstoreCompactionDead,
+		Aggregation: view.Sum(),
+	}
 )
 
 // DefaultViews is an array of OpenCensus views for metric gathering purposes
@@ -258,6 +287,11 @@ var ChainNodeViews = append([]*view.View{
 	PubsubDropRPCView,
 	VMFlushCopyCountView,
 	VMFlushCopyDurationView,
+	SplitstoreMissView,
+	SplitstoreCompactionTimeSecondsView,
+	SplitstoreCompactionHotView,
+	SplitstoreCompactionColdView,
+	SplitstoreCompactionDeadView,
 }, DefaultViews...)
 
 var MinerNodeViews = append([]*view.View{
diff --git a/node/builder.go b/node/builder.go
index b9f2e85bf..47e685543 100644
--- a/node/builder.go
+++ b/node/builder.go
@@ -586,14 +586,39 @@ func Repo(r repo.Repo) Option {
 			return err
 		}
 
+		var cfg *config.Chainstore
+		switch settings.nodeType {
+		case repo.FullNode:
+			cfgp, ok := c.(*config.FullNode)
+			if !ok {
+				return xerrors.Errorf("invalid config from repo, got: %T", c)
+			}
+			cfg = &cfgp.Chainstore
+		default:
+			cfg = &config.Chainstore{}
+		}
+
 		return Options(
 			Override(new(repo.LockedRepo), modules.LockedRepo(lr)), // module handles closing
 
 			Override(new(dtypes.MetadataDS), modules.Datastore),
 			Override(new(dtypes.UniversalBlockstore), modules.UniversalBlockstore),
-			Override(new(dtypes.ChainBlockstore), modules.ChainBlockstore),
-			Override(new(dtypes.StateBlockstore), modules.StateBlockstore),
-			Override(new(dtypes.ExposedBlockstore), From(new(dtypes.UniversalBlockstore))),
+
+			If(cfg.EnableSplitstore,
+				If(cfg.Splitstore.HotStoreType == "badger",
+					Override(new(dtypes.HotBlockstore), modules.BadgerHotBlockstore)),
+				Override(new(dtypes.SplitBlockstore), modules.SplitBlockstore(cfg)),
+				Override(new(dtypes.ChainBlockstore), modules.ChainSplitBlockstore),
+				Override(new(dtypes.StateBlockstore), modules.StateSplitBlockstore),
+				Override(new(dtypes.BaseBlockstore), From(new(dtypes.SplitBlockstore))),
+				Override(new(dtypes.ExposedBlockstore), From(new(dtypes.SplitBlockstore))),
+			),
+			If(!cfg.EnableSplitstore,
+				Override(new(dtypes.ChainBlockstore), modules.ChainFlatBlockstore),
+				Override(new(dtypes.StateBlockstore), modules.StateFlatBlockstore),
+				Override(new(dtypes.BaseBlockstore), From(new(dtypes.UniversalBlockstore))),
+				Override(new(dtypes.ExposedBlockstore), From(new(dtypes.UniversalBlockstore))),
+			),
 
 			If(os.Getenv("LOTUS_ENABLE_CHAINSTORE_FALLBACK") == "1",
 				Override(new(dtypes.ChainBlockstore), modules.FallbackChainBlockstore),
diff --git a/node/config/def.go b/node/config/def.go
index fa4ed412e..f5f293cbd 100644
--- a/node/config/def.go
+++ b/node/config/def.go
@@ -20,10 +20,11 @@ type Common struct {
 // FullNode is a full node config
 type FullNode struct {
 	Common
-	Client  Client
-	Metrics Metrics
-	Wallet  Wallet
-	Fees    FeeConfig
+	Client     Client
+	Metrics    Metrics
+	Wallet     Wallet
+	Fees       FeeConfig
+	Chainstore Chainstore
 }
 
 // // Common
@@ -120,6 +121,20 @@ type Pubsub struct {
 	RemoteTracer string
 }
 
+type Chainstore struct {
+	EnableSplitstore bool
+	Splitstore       Splitstore
+}
+
+type Splitstore struct {
+	HotStoreType         string
+	TrackingStoreType    string
+	MarkSetType          string
+	EnableFullCompaction bool
+	EnableGC             bool // EXPERIMENTAL
+	Archival             bool
+}
+
 // // Full Node
 
 type Metrics struct {
@@ -185,6 +200,12 @@ func DefaultFullNode() *FullNode {
 		Client: Client{
 			SimultaneousTransfers: DefaultSimultaneousTransfers,
 		},
+		Chainstore: Chainstore{
+			EnableSplitstore: false,
+			Splitstore: Splitstore{
+				HotStoreType: "badger",
+			},
+		},
 	}
 }
diff --git a/node/modules/blockstore.go b/node/modules/blockstore.go
index 5b1d2ee63..c1c52fafe 100644
--- a/node/modules/blockstore.go
+++ b/node/modules/blockstore.go
@@ -3,19 +3,25 @@ package modules
 import (
 	"context"
 	"io"
+	"os"
+	"path/filepath"
 
 	bstore "github.com/ipfs/go-ipfs-blockstore"
 	"go.uber.org/fx"
 	"golang.org/x/xerrors"
 
 	"github.com/filecoin-project/lotus/blockstore"
+	badgerbs "github.com/filecoin-project/lotus/blockstore/badger"
+	"github.com/filecoin-project/lotus/blockstore/splitstore"
+	"github.com/filecoin-project/lotus/node/config"
 	"github.com/filecoin-project/lotus/node/modules/dtypes"
 	"github.com/filecoin-project/lotus/node/modules/helpers"
 	"github.com/filecoin-project/lotus/node/repo"
 )
 
 // UniversalBlockstore returns a single universal blockstore that stores both
-// chain data and state data.
+// chain data and state data. It can be backed by a blockstore directly
+// (e.g. Badger), or by a Splitstore.
 func UniversalBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.UniversalBlockstore, error) {
 	bs, err := r.Blockstore(helpers.LifecycleCtx(mctx, lc), repo.UniversalBlockstore)
 	if err != nil {
@@ -31,17 +37,76 @@ func UniversalBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.Locked
 	return bs, err
 }
 
-// StateBlockstore is a hook to overlay caches for state objects, or in the
-// future, to segregate the universal blockstore into different physical state
-// and chain stores.
-func StateBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.UniversalBlockstore) (dtypes.StateBlockstore, error) {
+func BadgerHotBlockstore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.HotBlockstore, error) {
+	path, err := r.SplitstorePath()
+	if err != nil {
+		return nil, err
+	}
+
+	path = filepath.Join(path, "hot.badger")
+	if err := os.MkdirAll(path, 0755); err != nil {
+		return nil, err
+	}
+
+	opts, err := repo.BadgerBlockstoreOptions(repo.HotBlockstore, path, r.Readonly())
+	if err != nil {
+		return nil, err
+	}
+
+	bs, err := badgerbs.Open(opts)
+	if err != nil {
+		return nil, err
+	}
+
+	lc.Append(fx.Hook{
+		OnStop: func(_ context.Context) error {
+			return bs.Close()
+		}})
+
 	return bs, nil
 }
 
-// ChainBlockstore is a hook to overlay caches for state objects, or in the
-// future, to segregate the universal blockstore into different physical state
-// and chain stores.
-func ChainBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.UniversalBlockstore) (dtypes.ChainBlockstore, error) {
+func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.UniversalBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, error) {
+	return func(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.UniversalBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, error) {
+		path, err := r.SplitstorePath()
+		if err != nil {
+			return nil, err
+		}
+
+		cfg := &splitstore.Config{
+			TrackingStoreType:    cfg.Splitstore.TrackingStoreType,
+			MarkSetType:          cfg.Splitstore.MarkSetType,
+			EnableFullCompaction: cfg.Splitstore.EnableFullCompaction,
+			EnableGC:             cfg.Splitstore.EnableGC,
+			Archival:             cfg.Splitstore.Archival,
+		}
+		ss, err := splitstore.Open(path, ds, hot, cold, cfg)
+		if err != nil {
+			return nil, err
+		}
+		lc.Append(fx.Hook{
+			OnStop: func(context.Context) error {
+				return ss.Close()
+			},
+		})
+
+		return ss, err
+	}
+}
+
+func StateFlatBlockstore(_ fx.Lifecycle, _ helpers.MetricsCtx, bs dtypes.UniversalBlockstore) (dtypes.StateBlockstore, error) {
+	return bs, nil
+}
+
+func StateSplitBlockstore(_ fx.Lifecycle, _ helpers.MetricsCtx, bs dtypes.SplitBlockstore) (dtypes.StateBlockstore, error) {
+	return bs, nil
+}
+
+func ChainFlatBlockstore(_ fx.Lifecycle, _ helpers.MetricsCtx, bs dtypes.UniversalBlockstore) (dtypes.ChainBlockstore, error) {
+	return bs, nil
+}
+
+func ChainSplitBlockstore(_ fx.Lifecycle, _ helpers.MetricsCtx, bs dtypes.SplitBlockstore) (dtypes.ChainBlockstore, error) {
 	return bs, nil
 }
diff --git a/node/modules/chain.go b/node/modules/chain.go
index 029064b97..ffdf3aa3a 100644
--- a/node/modules/chain.go
+++ b/node/modules/chain.go
@@ -14,6 +14,7 @@ import (
 	"golang.org/x/xerrors"
 
 	"github.com/filecoin-project/lotus/blockstore"
+	"github.com/filecoin-project/lotus/blockstore/splitstore"
 	"github.com/filecoin-project/lotus/build"
 	"github.com/filecoin-project/lotus/chain"
 	"github.com/filecoin-project/lotus/chain/beacon"
@@ -72,14 +73,26 @@ func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds
 	return mp, nil
 }
 
-func ChainStore(lc fx.Lifecycle, cbs dtypes.ChainBlockstore, sbs dtypes.StateBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
+func ChainStore(lc fx.Lifecycle, cbs dtypes.ChainBlockstore, sbs dtypes.StateBlockstore, ds dtypes.MetadataDS, basebs dtypes.BaseBlockstore, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
 	chain := store.NewChainStore(cbs, sbs, ds, syscalls, j)
 
 	if err := chain.Load(); err != nil {
 		log.Warnf("loading chain state from disk: %s", err)
 	}
 
+	var startHook func(context.Context) error
+	if ss, ok := basebs.(*splitstore.SplitStore); ok {
+		startHook = func(_ context.Context) error {
+			err := ss.Start(chain)
+			if err != nil {
+				err = xerrors.Errorf("error starting splitstore: %w", err)
+			}
+			return err
+		}
+	}
+
 	lc.Append(fx.Hook{
+		OnStart: startHook,
 		OnStop: func(_ context.Context) error {
 			return chain.Close()
 		},
diff --git a/node/modules/dtypes/storage.go b/node/modules/dtypes/storage.go
index c6963e1e2..216ccc1b1 100644
--- a/node/modules/dtypes/storage.go
+++ b/node/modules/dtypes/storage.go
@@ -27,6 +27,15 @@ type (
 	// UniversalBlockstore is the cold blockstore.
 	UniversalBlockstore blockstore.Blockstore
 
+	// HotBlockstore is the hot blockstore abstraction for the splitstore
+	HotBlockstore blockstore.Blockstore
+
+	// SplitBlockstore is the hot/cold blockstore that sits on top of the ColdBlockstore.
+	SplitBlockstore blockstore.Blockstore
+
+	// BaseBlockstore is the underlying store (split or universal); it needs a distinct type for the DI container
+	BaseBlockstore blockstore.Blockstore
+
 	// ChainBlockstore is a blockstore to store chain data (tipsets, blocks,
 	// messages). It is physically backed by the BareMonolithBlockstore, but it
 	// has a cache on top that is specially tuned for chain data access
diff --git a/node/repo/interface.go b/node/repo/interface.go
index 33979c8de..b169ee5cc 100644
--- a/node/repo/interface.go
+++ b/node/repo/interface.go
@@ -23,6 +23,7 @@ const (
 	// well as state. In the future, they may get segregated into different
 	// domains.
 	UniversalBlockstore = BlockstoreDomain("universal")
+	HotBlockstore       = BlockstoreDomain("hot")
 )
 
 var (
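
Note on tracking_bolt.go: it calls epochToBytes/bytesToEpoch, which are not defined in this patch. A minimal sketch of what those helpers could look like, assuming a fixed-width big-endian encoding (abi.ChainEpoch is an int64 underneath); the actual helpers presumably live elsewhere in the splitstore package:

package splitstore

import (
	"encoding/binary"

	"github.com/filecoin-project/go-state-types/abi"
)

// epochToBytes encodes a chain epoch as an 8-byte big-endian value, giving
// bolt values a fixed width. (Sketch; encoding choice is an assumption.)
func epochToBytes(epoch abi.ChainEpoch) []byte {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, uint64(epoch))
	return buf
}

// bytesToEpoch is the inverse of epochToBytes.
func bytesToEpoch(b []byte) abi.ChainEpoch {
	return abi.ChainEpoch(binary.BigEndian.Uint64(b))
}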
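Similarly, tracking_test.go calls OpenTrackingStore(path, tsType), which this patch does not include. A sketch of the expected dispatcher, assuming it switches on the configured TrackingStoreType and defaults to bolt; the "tracking.db" filename is illustrative only:

package splitstore

import (
	"path/filepath"

	"golang.org/x/xerrors"
)

// OpenTrackingStore opens a tracking store of the given type rooted at path.
func OpenTrackingStore(path string, ttype string) (TrackingStore, error) {
	switch ttype {
	case "", "bolt":
		return OpenBoltTrackingStore(filepath.Join(path, "tracking.db"))
	case "mem":
		return NewMemTrackingStore(), nil
	default:
		return nil, xerrors.Errorf("unknown tracking store type %s", ttype)
	}
}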
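On the node/builder.go side, the new wiring keys off Chainstore.EnableSplitstore from node/config/def.go. A sketch of how a node (or test) would opt in, using only the config fields introduced in this patch; the surrounding main function is illustrative:

package main

import (
	"fmt"

	"github.com/filecoin-project/lotus/node/config"
)

func main() {
	cfg := config.DefaultFullNode()

	// Flips the builder from the flat Chain/State blockstores to the
	// hot/cold split ones; "badger" selects the BadgerHotBlockstore override.
	cfg.Chainstore.EnableSplitstore = true
	cfg.Chainstore.Splitstore.HotStoreType = "badger"

	fmt.Printf("splitstore enabled: %v\n", cfg.Chainstore.EnableSplitstore)
}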
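Finally, the new splitstore measures in metrics/metrics.go are registered in ChainNodeViews, but no recording call sites appear in this patch. A sketch of how the splitstore would presumably report a compaction round via OpenCensus; recordCompaction and its parameters are hypothetical names:

package splitstore

import (
	"context"
	"time"

	"go.opencensus.io/stats"

	"github.com/filecoin-project/lotus/metrics"
)

// recordCompaction reports one compaction round (hypothetical call site);
// hot/cold/dead are block counts from the round that just finished.
func recordCompaction(ctx context.Context, start time.Time, hot, cold, dead int64) {
	stats.Record(ctx, metrics.SplitstoreCompactionTimeSeconds.M(time.Since(start).Seconds()))
	stats.Record(ctx, metrics.SplitstoreCompactionHot.M(hot))
	stats.Record(ctx, metrics.SplitstoreCompactionCold.M(cold))
	stats.Record(ctx, metrics.SplitstoreCompactionDead.M(dead))
}

A hot store miss would likewise be counted with stats.Record(ctx, metrics.SplitstoreMiss.M(1)).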