Merge pull request #4992 from filecoin-project/feat/splitstore
hot/cold blockstore segregation (aka. splitstore)
This commit is contained in:
commit
6591af960f
@ -131,6 +131,25 @@ func (b *Blockstore) Close() error {
|
||||
return b.DB.Close()
|
||||
}
|
||||
|
||||
// CollectGarbage runs garbage collection on the value log
|
||||
func (b *Blockstore) CollectGarbage() error {
|
||||
if atomic.LoadInt64(&b.state) != stateOpen {
|
||||
return ErrBlockstoreClosed
|
||||
}
|
||||
|
||||
var err error
|
||||
for err == nil {
|
||||
err = b.DB.RunValueLogGC(0.125)
|
||||
}
|
||||
|
||||
if err == badger.ErrNoRewrite {
|
||||
// not really an error in this case
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// View implements blockstore.Viewer, which leverages zero-copy read-only
|
||||
// access to values.
|
||||
func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error {
|
||||
@ -318,6 +337,44 @@ func (b *Blockstore) DeleteBlock(cid cid.Cid) error {
|
||||
})
|
||||
}
|
||||
|
||||
func (b *Blockstore) DeleteMany(cids []cid.Cid) error {
|
||||
if atomic.LoadInt64(&b.state) != stateOpen {
|
||||
return ErrBlockstoreClosed
|
||||
}
|
||||
|
||||
batch := b.DB.NewWriteBatch()
|
||||
defer batch.Cancel()
|
||||
|
||||
// toReturn tracks the byte slices to return to the pool, if we're using key
|
||||
// prefixing. we can't return each slice to the pool after each Set, because
|
||||
// badger holds on to the slice.
|
||||
var toReturn [][]byte
|
||||
if b.prefixing {
|
||||
toReturn = make([][]byte, 0, len(cids))
|
||||
defer func() {
|
||||
for _, b := range toReturn {
|
||||
KeyPool.Put(b)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
for _, cid := range cids {
|
||||
k, pooled := b.PooledStorageKey(cid)
|
||||
if pooled {
|
||||
toReturn = append(toReturn, k)
|
||||
}
|
||||
if err := batch.Delete(k); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
err := batch.Flush()
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to delete blocks from badger blockstore: %w", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// AllKeysChan implements Blockstore.AllKeysChan.
|
||||
func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
|
||||
if atomic.LoadInt64(&b.state) != stateOpen {
|
||||
|
@ -1,7 +1,7 @@
|
||||
package blockstore
|
||||
|
||||
import (
|
||||
"github.com/ipfs/go-cid"
|
||||
cid "github.com/ipfs/go-cid"
|
||||
ds "github.com/ipfs/go-datastore"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
|
||||
@ -18,6 +18,7 @@ var ErrNotFound = blockstore.ErrNotFound
|
||||
type Blockstore interface {
|
||||
blockstore.Blockstore
|
||||
blockstore.Viewer
|
||||
BatchDeleter
|
||||
}
|
||||
|
||||
// BasicBlockstore is an alias to the original IPFS Blockstore.
|
||||
@ -25,13 +26,30 @@ type BasicBlockstore = blockstore.Blockstore
|
||||
|
||||
type Viewer = blockstore.Viewer
|
||||
|
||||
type BatchDeleter interface {
|
||||
DeleteMany(cids []cid.Cid) error
|
||||
}
|
||||
|
||||
// WrapIDStore wraps the underlying blockstore in an "identity" blockstore.
|
||||
// The ID store filters out all puts for blocks with CIDs using the "identity"
|
||||
// hash function. It also extracts inlined blocks from CIDs using the identity
|
||||
// hash function and returns them on get/has, ignoring the contents of the
|
||||
// blockstore.
|
||||
func WrapIDStore(bstore blockstore.Blockstore) Blockstore {
|
||||
return blockstore.NewIdStore(bstore).(Blockstore)
|
||||
if is, ok := bstore.(*idstore); ok {
|
||||
// already wrapped
|
||||
return is
|
||||
}
|
||||
|
||||
if bs, ok := bstore.(Blockstore); ok {
|
||||
// we need to wrap our own because we don't want to neuter the DeleteMany method
|
||||
// the underlying blockstore has implemented an (efficient) DeleteMany
|
||||
return NewIDStore(bs)
|
||||
}
|
||||
|
||||
// The underlying blockstore does not implement DeleteMany, so we need to shim it.
|
||||
// This is less efficient as it'll iterate and perform single deletes.
|
||||
return NewIDStore(Adapt(bstore))
|
||||
}
|
||||
|
||||
// FromDatastore creates a new blockstore backed by the given datastore.
|
||||
@ -53,6 +71,17 @@ func (a *adaptedBlockstore) View(cid cid.Cid, callback func([]byte) error) error
|
||||
return callback(blk.RawData())
|
||||
}
|
||||
|
||||
func (a *adaptedBlockstore) DeleteMany(cids []cid.Cid) error {
|
||||
for _, cid := range cids {
|
||||
err := a.DeleteBlock(cid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Adapt adapts a standard blockstore to a Lotus blockstore by
|
||||
// enriching it with the extra methods that Lotus requires (e.g. View, Sync).
|
||||
//
|
||||
|
@ -96,6 +96,14 @@ func (bs *BufferedBlockstore) DeleteBlock(c cid.Cid) error {
|
||||
return bs.write.DeleteBlock(c)
|
||||
}
|
||||
|
||||
func (bs *BufferedBlockstore) DeleteMany(cids []cid.Cid) error {
|
||||
if err := bs.read.DeleteMany(cids); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return bs.write.DeleteMany(cids)
|
||||
}
|
||||
|
||||
func (bs *BufferedBlockstore) View(c cid.Cid, callback func([]byte) error) error {
|
||||
// both stores are viewable.
|
||||
if err := bs.write.View(c, callback); err == ErrNotFound {
|
||||
|
174
blockstore/idstore.go
Normal file
174
blockstore/idstore.go
Normal file
@ -0,0 +1,174 @@
|
||||
package blockstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
blocks "github.com/ipfs/go-block-format"
|
||||
cid "github.com/ipfs/go-cid"
|
||||
mh "github.com/multiformats/go-multihash"
|
||||
)
|
||||
|
||||
var _ Blockstore = (*idstore)(nil)
|
||||
|
||||
type idstore struct {
|
||||
bs Blockstore
|
||||
}
|
||||
|
||||
func NewIDStore(bs Blockstore) Blockstore {
|
||||
return &idstore{bs: bs}
|
||||
}
|
||||
|
||||
func decodeCid(cid cid.Cid) (inline bool, data []byte, err error) {
|
||||
if cid.Prefix().MhType != mh.IDENTITY {
|
||||
return false, nil, nil
|
||||
}
|
||||
|
||||
dmh, err := mh.Decode(cid.Hash())
|
||||
if err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
|
||||
if dmh.Code == mh.IDENTITY {
|
||||
return true, dmh.Digest, nil
|
||||
}
|
||||
|
||||
return false, nil, err
|
||||
}
|
||||
|
||||
func (b *idstore) Has(cid cid.Cid) (bool, error) {
|
||||
inline, _, err := decodeCid(cid)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("error decoding Cid: %w", err)
|
||||
}
|
||||
|
||||
if inline {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return b.bs.Has(cid)
|
||||
}
|
||||
|
||||
func (b *idstore) Get(cid cid.Cid) (blocks.Block, error) {
|
||||
inline, data, err := decodeCid(cid)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("error decoding Cid: %w", err)
|
||||
}
|
||||
|
||||
if inline {
|
||||
return blocks.NewBlockWithCid(data, cid)
|
||||
}
|
||||
|
||||
return b.bs.Get(cid)
|
||||
}
|
||||
|
||||
func (b *idstore) GetSize(cid cid.Cid) (int, error) {
|
||||
inline, data, err := decodeCid(cid)
|
||||
if err != nil {
|
||||
return 0, xerrors.Errorf("error decoding Cid: %w", err)
|
||||
}
|
||||
|
||||
if inline {
|
||||
return len(data), err
|
||||
}
|
||||
|
||||
return b.bs.GetSize(cid)
|
||||
}
|
||||
|
||||
func (b *idstore) View(cid cid.Cid, cb func([]byte) error) error {
|
||||
inline, data, err := decodeCid(cid)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error decoding Cid: %w", err)
|
||||
}
|
||||
|
||||
if inline {
|
||||
return cb(data)
|
||||
}
|
||||
|
||||
return b.bs.View(cid, cb)
|
||||
}
|
||||
|
||||
func (b *idstore) Put(blk blocks.Block) error {
|
||||
inline, _, err := decodeCid(blk.Cid())
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error decoding Cid: %w", err)
|
||||
}
|
||||
|
||||
if inline {
|
||||
return nil
|
||||
}
|
||||
|
||||
return b.bs.Put(blk)
|
||||
}
|
||||
|
||||
func (b *idstore) PutMany(blks []blocks.Block) error {
|
||||
toPut := make([]blocks.Block, 0, len(blks))
|
||||
for _, blk := range blks {
|
||||
inline, _, err := decodeCid(blk.Cid())
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error decoding Cid: %w", err)
|
||||
}
|
||||
|
||||
if inline {
|
||||
continue
|
||||
}
|
||||
toPut = append(toPut, blk)
|
||||
}
|
||||
|
||||
if len(toPut) > 0 {
|
||||
return b.bs.PutMany(toPut)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *idstore) DeleteBlock(cid cid.Cid) error {
|
||||
inline, _, err := decodeCid(cid)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error decoding Cid: %w", err)
|
||||
}
|
||||
|
||||
if inline {
|
||||
return nil
|
||||
}
|
||||
|
||||
return b.bs.DeleteBlock(cid)
|
||||
}
|
||||
|
||||
func (b *idstore) DeleteMany(cids []cid.Cid) error {
|
||||
toDelete := make([]cid.Cid, 0, len(cids))
|
||||
for _, cid := range cids {
|
||||
inline, _, err := decodeCid(cid)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error decoding Cid: %w", err)
|
||||
}
|
||||
|
||||
if inline {
|
||||
continue
|
||||
}
|
||||
toDelete = append(toDelete, cid)
|
||||
}
|
||||
|
||||
if len(toDelete) > 0 {
|
||||
return b.bs.DeleteMany(toDelete)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *idstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
|
||||
return b.bs.AllKeysChan(ctx)
|
||||
}
|
||||
|
||||
func (b *idstore) HashOnRead(enabled bool) {
|
||||
b.bs.HashOnRead(enabled)
|
||||
}
|
||||
|
||||
func (b *idstore) Close() error {
|
||||
if c, ok := b.bs.(io.Closer); ok {
|
||||
return c.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
@ -20,6 +20,13 @@ func (m MemBlockstore) DeleteBlock(k cid.Cid) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m MemBlockstore) DeleteMany(ks []cid.Cid) error {
|
||||
for _, k := range ks {
|
||||
delete(m, k)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m MemBlockstore) Has(k cid.Cid) (bool, error) {
|
||||
_, ok := m[k]
|
||||
return ok, nil
|
||||
|
38
blockstore/splitstore/markset.go
Normal file
38
blockstore/splitstore/markset.go
Normal file
@ -0,0 +1,38 @@
|
||||
package splitstore
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
)
|
||||
|
||||
// MarkSet is a utility to keep track of seen CID, and later query for them.
|
||||
//
|
||||
// * If the expected dataset is large, it can be backed by a datastore (e.g. bbolt).
|
||||
// * If a probabilistic result is acceptable, it can be backed by a bloom filter (default).
|
||||
type MarkSet interface {
|
||||
Mark(cid.Cid) error
|
||||
Has(cid.Cid) (bool, error)
|
||||
Close() error
|
||||
}
|
||||
|
||||
// markBytes is deliberately a non-nil empty byte slice for serialization.
|
||||
var markBytes = []byte{}
|
||||
|
||||
type MarkSetEnv interface {
|
||||
Create(name string, sizeHint int64) (MarkSet, error)
|
||||
Close() error
|
||||
}
|
||||
|
||||
func OpenMarkSetEnv(path string, mtype string) (MarkSetEnv, error) {
|
||||
switch mtype {
|
||||
case "", "bloom":
|
||||
return NewBloomMarkSetEnv()
|
||||
case "bolt":
|
||||
return NewBoltMarkSetEnv(filepath.Join(path, "markset.bolt"))
|
||||
default:
|
||||
return nil, xerrors.Errorf("unknown mark set type %s", mtype)
|
||||
}
|
||||
}
|
77
blockstore/splitstore/markset_bloom.go
Normal file
77
blockstore/splitstore/markset_bloom.go
Normal file
@ -0,0 +1,77 @@
|
||||
package splitstore
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
bbloom "github.com/ipfs/bbloom"
|
||||
cid "github.com/ipfs/go-cid"
|
||||
)
|
||||
|
||||
const (
|
||||
BloomFilterMinSize = 10_000_000
|
||||
BloomFilterProbability = 0.01
|
||||
)
|
||||
|
||||
type BloomMarkSetEnv struct{}
|
||||
|
||||
var _ MarkSetEnv = (*BloomMarkSetEnv)(nil)
|
||||
|
||||
type BloomMarkSet struct {
|
||||
salt []byte
|
||||
bf *bbloom.Bloom
|
||||
}
|
||||
|
||||
var _ MarkSet = (*BloomMarkSet)(nil)
|
||||
|
||||
func NewBloomMarkSetEnv() (*BloomMarkSetEnv, error) {
|
||||
return &BloomMarkSetEnv{}, nil
|
||||
}
|
||||
|
||||
func (e *BloomMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
|
||||
size := int64(BloomFilterMinSize)
|
||||
for size < sizeHint {
|
||||
size += BloomFilterMinSize
|
||||
}
|
||||
|
||||
salt := make([]byte, 4)
|
||||
_, err := rand.Read(salt)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("error reading salt: %w", err)
|
||||
}
|
||||
|
||||
bf, err := bbloom.New(float64(size), BloomFilterProbability)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("error creating bloom filter: %w", err)
|
||||
}
|
||||
|
||||
return &BloomMarkSet{salt: salt, bf: bf}, nil
|
||||
}
|
||||
|
||||
func (e *BloomMarkSetEnv) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *BloomMarkSet) saltedKey(cid cid.Cid) []byte {
|
||||
hash := cid.Hash()
|
||||
key := make([]byte, len(s.salt)+len(hash))
|
||||
n := copy(key, s.salt)
|
||||
copy(key[n:], hash)
|
||||
rehash := sha256.Sum256(key)
|
||||
return rehash[:]
|
||||
}
|
||||
|
||||
func (s *BloomMarkSet) Mark(cid cid.Cid) error {
|
||||
s.bf.Add(s.saltedKey(cid))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *BloomMarkSet) Has(cid cid.Cid) (bool, error) {
|
||||
return s.bf.Has(s.saltedKey(cid)), nil
|
||||
}
|
||||
|
||||
func (s *BloomMarkSet) Close() error {
|
||||
return nil
|
||||
}
|
81
blockstore/splitstore/markset_bolt.go
Normal file
81
blockstore/splitstore/markset_bolt.go
Normal file
@ -0,0 +1,81 @@
|
||||
package splitstore
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
bolt "go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
type BoltMarkSetEnv struct {
|
||||
db *bolt.DB
|
||||
}
|
||||
|
||||
var _ MarkSetEnv = (*BoltMarkSetEnv)(nil)
|
||||
|
||||
type BoltMarkSet struct {
|
||||
db *bolt.DB
|
||||
bucketId []byte
|
||||
}
|
||||
|
||||
var _ MarkSet = (*BoltMarkSet)(nil)
|
||||
|
||||
func NewBoltMarkSetEnv(path string) (*BoltMarkSetEnv, error) {
|
||||
db, err := bolt.Open(path, 0644,
|
||||
&bolt.Options{
|
||||
Timeout: 1 * time.Second,
|
||||
NoSync: true,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &BoltMarkSetEnv{db: db}, nil
|
||||
}
|
||||
|
||||
func (e *BoltMarkSetEnv) Create(name string, hint int64) (MarkSet, error) {
|
||||
bucketId := []byte(name)
|
||||
err := e.db.Update(func(tx *bolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists(bucketId)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error creating bolt db bucket %s: %w", name, err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &BoltMarkSet{db: e.db, bucketId: bucketId}, nil
|
||||
}
|
||||
|
||||
func (e *BoltMarkSetEnv) Close() error {
|
||||
return e.db.Close()
|
||||
}
|
||||
|
||||
func (s *BoltMarkSet) Mark(cid cid.Cid) error {
|
||||
return s.db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(s.bucketId)
|
||||
return b.Put(cid.Hash(), markBytes)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *BoltMarkSet) Has(cid cid.Cid) (result bool, err error) {
|
||||
err = s.db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(s.bucketId)
|
||||
v := b.Get(cid.Hash())
|
||||
result = v != nil
|
||||
return nil
|
||||
})
|
||||
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (s *BoltMarkSet) Close() error {
|
||||
return s.db.Update(func(tx *bolt.Tx) error {
|
||||
return tx.DeleteBucket(s.bucketId)
|
||||
})
|
||||
}
|
138
blockstore/splitstore/markset_test.go
Normal file
138
blockstore/splitstore/markset_test.go
Normal file
@ -0,0 +1,138 @@
|
||||
package splitstore
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
"github.com/multiformats/go-multihash"
|
||||
)
|
||||
|
||||
func TestBoltMarkSet(t *testing.T) {
|
||||
testMarkSet(t, "bolt")
|
||||
}
|
||||
|
||||
func TestBloomMarkSet(t *testing.T) {
|
||||
testMarkSet(t, "bloom")
|
||||
}
|
||||
|
||||
func testMarkSet(t *testing.T, lsType string) {
|
||||
t.Helper()
|
||||
|
||||
path, err := ioutil.TempDir("", "sweep-test.*")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
env, err := OpenMarkSetEnv(path, lsType)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer env.Close() //nolint:errcheck
|
||||
|
||||
hotSet, err := env.Create("hot", 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
coldSet, err := env.Create("cold", 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
makeCid := func(key string) cid.Cid {
|
||||
h, err := multihash.Sum([]byte(key), multihash.SHA2_256, -1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
return cid.NewCidV1(cid.Raw, h)
|
||||
}
|
||||
|
||||
mustHave := func(s MarkSet, cid cid.Cid) {
|
||||
has, err := s.Has(cid)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if !has {
|
||||
t.Fatal("mark not found")
|
||||
}
|
||||
}
|
||||
|
||||
mustNotHave := func(s MarkSet, cid cid.Cid) {
|
||||
has, err := s.Has(cid)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if has {
|
||||
t.Fatal("unexpected mark")
|
||||
}
|
||||
}
|
||||
|
||||
k1 := makeCid("a")
|
||||
k2 := makeCid("b")
|
||||
k3 := makeCid("c")
|
||||
k4 := makeCid("d")
|
||||
|
||||
hotSet.Mark(k1) //nolint
|
||||
hotSet.Mark(k2) //nolint
|
||||
coldSet.Mark(k3) //nolint
|
||||
|
||||
mustHave(hotSet, k1)
|
||||
mustHave(hotSet, k2)
|
||||
mustNotHave(hotSet, k3)
|
||||
mustNotHave(hotSet, k4)
|
||||
|
||||
mustNotHave(coldSet, k1)
|
||||
mustNotHave(coldSet, k2)
|
||||
mustHave(coldSet, k3)
|
||||
mustNotHave(coldSet, k4)
|
||||
|
||||
// close them and reopen to redo the dance
|
||||
|
||||
err = hotSet.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = coldSet.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
hotSet, err = env.Create("hot", 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
coldSet, err = env.Create("cold", 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
hotSet.Mark(k3) //nolint
|
||||
hotSet.Mark(k4) //nolint
|
||||
coldSet.Mark(k1) //nolint
|
||||
|
||||
mustNotHave(hotSet, k1)
|
||||
mustNotHave(hotSet, k2)
|
||||
mustHave(hotSet, k3)
|
||||
mustHave(hotSet, k4)
|
||||
|
||||
mustHave(coldSet, k1)
|
||||
mustNotHave(coldSet, k2)
|
||||
mustNotHave(coldSet, k3)
|
||||
mustNotHave(coldSet, k4)
|
||||
|
||||
err = hotSet.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = coldSet.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
1069
blockstore/splitstore/splitstore.go
Normal file
1069
blockstore/splitstore/splitstore.go
Normal file
File diff suppressed because it is too large
Load Diff
248
blockstore/splitstore/splitstore_test.go
Normal file
248
blockstore/splitstore/splitstore_test.go
Normal file
@ -0,0 +1,248 @@
|
||||
package splitstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/lotus/blockstore"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/chain/types/mock"
|
||||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
datastore "github.com/ipfs/go-datastore"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
)
|
||||
|
||||
func init() {
|
||||
CompactionThreshold = 5
|
||||
CompactionCold = 1
|
||||
CompactionBoundary = 2
|
||||
logging.SetLogLevel("splitstore", "DEBUG")
|
||||
}
|
||||
|
||||
func testSplitStore(t *testing.T, cfg *Config) {
|
||||
t.Helper()
|
||||
|
||||
chain := &mockChain{}
|
||||
// genesis
|
||||
genBlock := mock.MkBlock(nil, 0, 0)
|
||||
genTs := mock.TipSet(genBlock)
|
||||
chain.push(genTs)
|
||||
|
||||
// the myriads of stores
|
||||
ds := datastore.NewMapDatastore()
|
||||
hot := blockstore.NewMemorySync()
|
||||
cold := blockstore.NewMemorySync()
|
||||
|
||||
// put the genesis block to cold store
|
||||
blk, err := genBlock.ToStorageBlock()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = cold.Put(blk)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// open the splitstore
|
||||
ss, err := Open("", ds, hot, cold, cfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer ss.Close() //nolint
|
||||
|
||||
err = ss.Start(chain)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// make some tipsets, but not enough to cause compaction
|
||||
mkBlock := func(curTs *types.TipSet, i int) *types.TipSet {
|
||||
blk := mock.MkBlock(curTs, uint64(i), uint64(i))
|
||||
sblk, err := blk.ToStorageBlock()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = ss.Put(sblk)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
ts := mock.TipSet(blk)
|
||||
chain.push(ts)
|
||||
|
||||
return ts
|
||||
}
|
||||
|
||||
mkGarbageBlock := func(curTs *types.TipSet, i int) {
|
||||
blk := mock.MkBlock(curTs, uint64(i), uint64(i))
|
||||
sblk, err := blk.ToStorageBlock()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = ss.Put(sblk)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
curTs := genTs
|
||||
for i := 1; i < 5; i++ {
|
||||
curTs = mkBlock(curTs, i)
|
||||
}
|
||||
|
||||
mkGarbageBlock(genTs, 1)
|
||||
|
||||
// count objects in the cold and hot stores
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
countBlocks := func(bs blockstore.Blockstore) int {
|
||||
count := 0
|
||||
ch, err := bs.AllKeysChan(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for range ch {
|
||||
count++
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
coldCnt := countBlocks(cold)
|
||||
hotCnt := countBlocks(hot)
|
||||
|
||||
if coldCnt != 1 {
|
||||
t.Fatalf("expected %d blocks, but got %d", 1, coldCnt)
|
||||
}
|
||||
|
||||
if hotCnt != 4 {
|
||||
t.Fatalf("expected %d blocks, but got %d", 4, hotCnt)
|
||||
}
|
||||
|
||||
// trigger a compaction
|
||||
for i := 5; i < 10; i++ {
|
||||
curTs = mkBlock(curTs, i)
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
coldCnt = countBlocks(cold)
|
||||
hotCnt = countBlocks(hot)
|
||||
|
||||
if !cfg.EnableFullCompaction {
|
||||
if coldCnt != 5 {
|
||||
t.Fatalf("expected %d cold blocks, but got %d", 5, coldCnt)
|
||||
}
|
||||
|
||||
if hotCnt != 5 {
|
||||
t.Fatalf("expected %d hot blocks, but got %d", 5, hotCnt)
|
||||
}
|
||||
}
|
||||
|
||||
if cfg.EnableFullCompaction && !cfg.EnableGC {
|
||||
if coldCnt != 3 {
|
||||
t.Fatalf("expected %d cold blocks, but got %d", 3, coldCnt)
|
||||
}
|
||||
|
||||
if hotCnt != 7 {
|
||||
t.Fatalf("expected %d hot blocks, but got %d", 7, hotCnt)
|
||||
}
|
||||
}
|
||||
|
||||
if cfg.EnableFullCompaction && cfg.EnableGC {
|
||||
if coldCnt != 2 {
|
||||
t.Fatalf("expected %d cold blocks, but got %d", 2, coldCnt)
|
||||
}
|
||||
|
||||
if hotCnt != 7 {
|
||||
t.Fatalf("expected %d hot blocks, but got %d", 7, hotCnt)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSplitStoreSimpleCompaction(t *testing.T) {
|
||||
testSplitStore(t, &Config{TrackingStoreType: "mem"})
|
||||
}
|
||||
|
||||
func TestSplitStoreFullCompactionWithoutGC(t *testing.T) {
|
||||
testSplitStore(t, &Config{
|
||||
TrackingStoreType: "mem",
|
||||
EnableFullCompaction: true,
|
||||
})
|
||||
}
|
||||
|
||||
func TestSplitStoreFullCompactionWithGC(t *testing.T) {
|
||||
testSplitStore(t, &Config{
|
||||
TrackingStoreType: "mem",
|
||||
EnableFullCompaction: true,
|
||||
EnableGC: true,
|
||||
})
|
||||
}
|
||||
|
||||
type mockChain struct {
|
||||
sync.Mutex
|
||||
tipsets []*types.TipSet
|
||||
listener func(revert []*types.TipSet, apply []*types.TipSet) error
|
||||
}
|
||||
|
||||
func (c *mockChain) push(ts *types.TipSet) {
|
||||
c.Lock()
|
||||
c.tipsets = append(c.tipsets, ts)
|
||||
c.Unlock()
|
||||
|
||||
if c.listener != nil {
|
||||
err := c.listener(nil, []*types.TipSet{ts})
|
||||
if err != nil {
|
||||
log.Errorf("mockchain: error dispatching listener: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *mockChain) GetTipsetByHeight(_ context.Context, epoch abi.ChainEpoch, _ *types.TipSet, _ bool) (*types.TipSet, error) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
iEpoch := int(epoch)
|
||||
if iEpoch > len(c.tipsets) {
|
||||
return nil, fmt.Errorf("bad epoch %d", epoch)
|
||||
}
|
||||
|
||||
return c.tipsets[iEpoch-1], nil
|
||||
}
|
||||
|
||||
func (c *mockChain) GetHeaviestTipSet() *types.TipSet {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
return c.tipsets[len(c.tipsets)-1]
|
||||
}
|
||||
|
||||
func (c *mockChain) SubscribeHeadChanges(change func(revert []*types.TipSet, apply []*types.TipSet) error) {
|
||||
c.listener = change
|
||||
}
|
||||
|
||||
func (c *mockChain) WalkSnapshot(_ context.Context, ts *types.TipSet, epochs abi.ChainEpoch, _ bool, _ bool, f func(cid.Cid) error) error {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
start := int(ts.Height()) - 1
|
||||
end := start - int(epochs)
|
||||
if end < 0 {
|
||||
end = -1
|
||||
}
|
||||
for i := start; i > end; i-- {
|
||||
ts := c.tipsets[i]
|
||||
for _, cid := range ts.Cids() {
|
||||
err := f(cid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
109
blockstore/splitstore/tracking.go
Normal file
109
blockstore/splitstore/tracking.go
Normal file
@ -0,0 +1,109 @@
|
||||
package splitstore
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
cid "github.com/ipfs/go-cid"
|
||||
)
|
||||
|
||||
// TrackingStore is a persistent store that tracks blocks that are added
|
||||
// to the hotstore, tracking the epoch at which they are written.
|
||||
type TrackingStore interface {
|
||||
Put(cid.Cid, abi.ChainEpoch) error
|
||||
PutBatch([]cid.Cid, abi.ChainEpoch) error
|
||||
Get(cid.Cid) (abi.ChainEpoch, error)
|
||||
Delete(cid.Cid) error
|
||||
DeleteBatch([]cid.Cid) error
|
||||
ForEach(func(cid.Cid, abi.ChainEpoch) error) error
|
||||
Sync() error
|
||||
Close() error
|
||||
}
|
||||
|
||||
// OpenTrackingStore opens a tracking store of the specified type in the
|
||||
// specified path.
|
||||
func OpenTrackingStore(path string, ttype string) (TrackingStore, error) {
|
||||
switch ttype {
|
||||
case "", "bolt":
|
||||
return OpenBoltTrackingStore(filepath.Join(path, "tracker.bolt"))
|
||||
case "mem":
|
||||
return NewMemTrackingStore(), nil
|
||||
default:
|
||||
return nil, xerrors.Errorf("unknown tracking store type %s", ttype)
|
||||
}
|
||||
}
|
||||
|
||||
// NewMemTrackingStore creates an in-memory tracking store.
|
||||
// This is only useful for test or situations where you don't want to open the
|
||||
// real tracking store (eg concurrent read only access on a node's datastore)
|
||||
func NewMemTrackingStore() *MemTrackingStore {
|
||||
return &MemTrackingStore{tab: make(map[cid.Cid]abi.ChainEpoch)}
|
||||
}
|
||||
|
||||
// MemTrackingStore is a simple in-memory tracking store
|
||||
type MemTrackingStore struct {
|
||||
sync.Mutex
|
||||
tab map[cid.Cid]abi.ChainEpoch
|
||||
}
|
||||
|
||||
var _ TrackingStore = (*MemTrackingStore)(nil)
|
||||
|
||||
func (s *MemTrackingStore) Put(cid cid.Cid, epoch abi.ChainEpoch) error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
s.tab[cid] = epoch
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *MemTrackingStore) PutBatch(cids []cid.Cid, epoch abi.ChainEpoch) error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
for _, cid := range cids {
|
||||
s.tab[cid] = epoch
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *MemTrackingStore) Get(cid cid.Cid) (abi.ChainEpoch, error) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
epoch, ok := s.tab[cid]
|
||||
if ok {
|
||||
return epoch, nil
|
||||
}
|
||||
return 0, xerrors.Errorf("missing tracking epoch for %s", cid)
|
||||
}
|
||||
|
||||
func (s *MemTrackingStore) Delete(cid cid.Cid) error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
delete(s.tab, cid)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *MemTrackingStore) DeleteBatch(cids []cid.Cid) error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
for _, cid := range cids {
|
||||
delete(s.tab, cid)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *MemTrackingStore) ForEach(f func(cid.Cid, abi.ChainEpoch) error) error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
for cid, epoch := range s.tab {
|
||||
err := f(cid, epoch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *MemTrackingStore) Sync() error { return nil }
|
||||
func (s *MemTrackingStore) Close() error { return nil }
|
120
blockstore/splitstore/tracking_bolt.go
Normal file
120
blockstore/splitstore/tracking_bolt.go
Normal file
@ -0,0 +1,120 @@
|
||||
package splitstore
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
bolt "go.etcd.io/bbolt"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
)
|
||||
|
||||
type BoltTrackingStore struct {
|
||||
db *bolt.DB
|
||||
bucketId []byte
|
||||
}
|
||||
|
||||
var _ TrackingStore = (*BoltTrackingStore)(nil)
|
||||
|
||||
func OpenBoltTrackingStore(path string) (*BoltTrackingStore, error) {
|
||||
opts := &bolt.Options{
|
||||
Timeout: 1 * time.Second,
|
||||
NoSync: true,
|
||||
}
|
||||
db, err := bolt.Open(path, 0644, opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bucketId := []byte("tracker")
|
||||
err = db.Update(func(tx *bolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists(bucketId)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error creating bolt db bucket %s: %w", string(bucketId), err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
_ = db.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &BoltTrackingStore{db: db, bucketId: bucketId}, nil
|
||||
}
|
||||
|
||||
func (s *BoltTrackingStore) Put(cid cid.Cid, epoch abi.ChainEpoch) error {
|
||||
val := epochToBytes(epoch)
|
||||
return s.db.Batch(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(s.bucketId)
|
||||
return b.Put(cid.Hash(), val)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *BoltTrackingStore) PutBatch(cids []cid.Cid, epoch abi.ChainEpoch) error {
|
||||
val := epochToBytes(epoch)
|
||||
return s.db.Batch(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(s.bucketId)
|
||||
for _, cid := range cids {
|
||||
err := b.Put(cid.Hash(), val)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (s *BoltTrackingStore) Get(cid cid.Cid) (epoch abi.ChainEpoch, err error) {
|
||||
err = s.db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(s.bucketId)
|
||||
val := b.Get(cid.Hash())
|
||||
if val == nil {
|
||||
return xerrors.Errorf("missing tracking epoch for %s", cid)
|
||||
}
|
||||
epoch = bytesToEpoch(val)
|
||||
return nil
|
||||
})
|
||||
return epoch, err
|
||||
}
|
||||
|
||||
func (s *BoltTrackingStore) Delete(cid cid.Cid) error {
|
||||
return s.db.Batch(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(s.bucketId)
|
||||
return b.Delete(cid.Hash())
|
||||
})
|
||||
}
|
||||
|
||||
func (s *BoltTrackingStore) DeleteBatch(cids []cid.Cid) error {
|
||||
return s.db.Batch(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(s.bucketId)
|
||||
for _, cid := range cids {
|
||||
err := b.Delete(cid.Hash())
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error deleting %s", cid)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (s *BoltTrackingStore) ForEach(f func(cid.Cid, abi.ChainEpoch) error) error {
|
||||
return s.db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(s.bucketId)
|
||||
return b.ForEach(func(k, v []byte) error {
|
||||
cid := cid.NewCidV1(cid.Raw, k)
|
||||
epoch := bytesToEpoch(v)
|
||||
return f(cid, epoch)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func (s *BoltTrackingStore) Sync() error {
|
||||
return s.db.Sync()
|
||||
}
|
||||
|
||||
func (s *BoltTrackingStore) Close() error {
|
||||
return s.db.Close()
|
||||
}
|
130
blockstore/splitstore/tracking_test.go
Normal file
130
blockstore/splitstore/tracking_test.go
Normal file
@ -0,0 +1,130 @@
|
||||
package splitstore
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
"github.com/multiformats/go-multihash"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
)
|
||||
|
||||
func TestBoltTrackingStore(t *testing.T) {
|
||||
testTrackingStore(t, "bolt")
|
||||
}
|
||||
|
||||
func testTrackingStore(t *testing.T, tsType string) {
|
||||
t.Helper()
|
||||
|
||||
makeCid := func(key string) cid.Cid {
|
||||
h, err := multihash.Sum([]byte(key), multihash.SHA2_256, -1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
return cid.NewCidV1(cid.Raw, h)
|
||||
}
|
||||
|
||||
mustHave := func(s TrackingStore, cid cid.Cid, epoch abi.ChainEpoch) {
|
||||
val, err := s.Get(cid)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if val != epoch {
|
||||
t.Fatal("epoch mismatch")
|
||||
}
|
||||
}
|
||||
|
||||
mustNotHave := func(s TrackingStore, cid cid.Cid) {
|
||||
_, err := s.Get(cid)
|
||||
if err == nil {
|
||||
t.Fatal("expected error")
|
||||
}
|
||||
}
|
||||
|
||||
path, err := ioutil.TempDir("", "snoop-test.*")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s, err := OpenTrackingStore(path, tsType)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
k1 := makeCid("a")
|
||||
k2 := makeCid("b")
|
||||
k3 := makeCid("c")
|
||||
k4 := makeCid("d")
|
||||
|
||||
s.Put(k1, 1) //nolint
|
||||
s.Put(k2, 2) //nolint
|
||||
s.Put(k3, 3) //nolint
|
||||
s.Put(k4, 4) //nolint
|
||||
|
||||
mustHave(s, k1, 1)
|
||||
mustHave(s, k2, 2)
|
||||
mustHave(s, k3, 3)
|
||||
mustHave(s, k4, 4)
|
||||
|
||||
s.Delete(k1) // nolint
|
||||
s.Delete(k2) // nolint
|
||||
|
||||
mustNotHave(s, k1)
|
||||
mustNotHave(s, k2)
|
||||
mustHave(s, k3, 3)
|
||||
mustHave(s, k4, 4)
|
||||
|
||||
s.PutBatch([]cid.Cid{k1}, 1) //nolint
|
||||
s.PutBatch([]cid.Cid{k2}, 2) //nolint
|
||||
|
||||
mustHave(s, k1, 1)
|
||||
mustHave(s, k2, 2)
|
||||
mustHave(s, k3, 3)
|
||||
mustHave(s, k4, 4)
|
||||
|
||||
allKeys := map[string]struct{}{
|
||||
k1.String(): {},
|
||||
k2.String(): {},
|
||||
k3.String(): {},
|
||||
k4.String(): {},
|
||||
}
|
||||
|
||||
err = s.ForEach(func(k cid.Cid, _ abi.ChainEpoch) error {
|
||||
_, ok := allKeys[k.String()]
|
||||
if !ok {
|
||||
t.Fatal("unexpected key")
|
||||
}
|
||||
|
||||
delete(allKeys, k.String())
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(allKeys) != 0 {
|
||||
t.Fatal("not all keys were returned")
|
||||
}
|
||||
|
||||
// no close and reopen and ensure the keys still exist
|
||||
err = s.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s, err = OpenTrackingStore(path, tsType)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
mustHave(s, k1, 1)
|
||||
mustHave(s, k2, 2)
|
||||
mustHave(s, k3, 3)
|
||||
mustHave(s, k4, 4)
|
||||
|
||||
s.Close() //nolint:errcheck
|
||||
}
|
@ -26,6 +26,12 @@ func (m *SyncBlockstore) DeleteBlock(k cid.Cid) error {
|
||||
return m.bs.DeleteBlock(k)
|
||||
}
|
||||
|
||||
func (m *SyncBlockstore) DeleteMany(ks []cid.Cid) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
return m.bs.DeleteMany(ks)
|
||||
}
|
||||
|
||||
func (m *SyncBlockstore) Has(k cid.Cid) (bool, error) {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
@ -153,6 +153,12 @@ func (t *TimedCacheBlockstore) DeleteBlock(k cid.Cid) error {
|
||||
return multierr.Combine(t.active.DeleteBlock(k), t.inactive.DeleteBlock(k))
|
||||
}
|
||||
|
||||
func (t *TimedCacheBlockstore) DeleteMany(ks []cid.Cid) error {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
return multierr.Combine(t.active.DeleteMany(ks), t.inactive.DeleteMany(ks))
|
||||
}
|
||||
|
||||
func (t *TimedCacheBlockstore) AllKeysChan(_ context.Context) (<-chan cid.Cid, error) {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
|
@ -82,6 +82,15 @@ func (m unionBlockstore) DeleteBlock(cid cid.Cid) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
func (m unionBlockstore) DeleteMany(cids []cid.Cid) (err error) {
|
||||
for _, bs := range m {
|
||||
if err = bs.DeleteMany(cids); err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (m unionBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
|
||||
// this does not deduplicate; this interface needs to be revisited.
|
||||
outCh := make(chan cid.Cid)
|
||||
|
@ -81,7 +81,7 @@ func init() {
|
||||
}
|
||||
|
||||
// ReorgNotifee represents a callback that gets called upon reorgs.
|
||||
type ReorgNotifee func(rev, app []*types.TipSet) error
|
||||
type ReorgNotifee = func(rev, app []*types.TipSet) error
|
||||
|
||||
// Journal event types.
|
||||
const (
|
||||
|
1
go.mod
1
go.mod
@ -137,6 +137,7 @@ require (
|
||||
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7
|
||||
github.com/whyrusleeping/pubsub v0.0.0-20190708150250-92bcb0691325
|
||||
github.com/xorcare/golden v0.6.1-0.20191112154924-b87f686d7542
|
||||
go.etcd.io/bbolt v1.3.4
|
||||
go.opencensus.io v0.22.5
|
||||
go.uber.org/dig v1.10.0 // indirect
|
||||
go.uber.org/fx v1.9.0
|
||||
|
4
go.sum
4
go.sum
@ -241,6 +241,8 @@ github.com/filecoin-project/go-amt-ipld/v2 v2.1.1-0.20201006184820-924ee87a1349/
|
||||
github.com/filecoin-project/go-amt-ipld/v3 v3.0.0 h1:Ou/q82QeHGOhpkedvaxxzpBYuqTxLCcj5OChkDNx4qc=
|
||||
github.com/filecoin-project/go-amt-ipld/v3 v3.0.0/go.mod h1:Qa95YNAbtoVCTSVtX38aAC1ptBnJfPma1R/zZsKmx4o=
|
||||
github.com/filecoin-project/go-bitfield v0.2.0/go.mod h1:CNl9WG8hgR5mttCnUErjcQjGvuiZjRqK9rHVBsQF4oM=
|
||||
github.com/filecoin-project/go-bitfield v0.2.3 h1:pedK/7maYF06Z+BYJf2OeFFqIDEh6SP6mIOlLFpYXGs=
|
||||
github.com/filecoin-project/go-bitfield v0.2.3/go.mod h1:CNl9WG8hgR5mttCnUErjcQjGvuiZjRqK9rHVBsQF4oM=
|
||||
github.com/filecoin-project/go-bitfield v0.2.3/go.mod h1:CNl9WG8hgR5mttCnUErjcQjGvuiZjRqK9rHVBsQF4oM=
|
||||
github.com/filecoin-project/go-bitfield v0.2.4 h1:uZ7MeE+XfM5lqrHJZ93OnhQKc/rveW8p9au0C68JPgk=
|
||||
github.com/filecoin-project/go-bitfield v0.2.4/go.mod h1:CNl9WG8hgR5mttCnUErjcQjGvuiZjRqK9rHVBsQF4oM=
|
||||
@ -1934,6 +1936,8 @@ howett.net/plist v0.0.0-20181124034731-591f970eefbb/go.mod h1:vMygbs4qMhSZSc4lCU
|
||||
modernc.org/cc v1.0.0 h1:nPibNuDEx6tvYrUAtvDTTw98rx5juGsa5zuDnKwEEQQ=
|
||||
modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw=
|
||||
modernc.org/fileutil v1.0.0/go.mod h1:JHsWpkrk/CnVV1H/eGlFf85BEpfkrp56ro8nojIq9Q8=
|
||||
modernc.org/golex v1.0.0 h1:wWpDlbK8ejRfSyi0frMyhilD3JBvtcx2AdGDnU+JtsE=
|
||||
modernc.org/golex v1.0.0/go.mod h1:b/QX9oBD/LhixY6NDh+IdGv17hgB+51fET1i2kPSmvk=
|
||||
modernc.org/golex v1.0.0/go.mod h1:b/QX9oBD/LhixY6NDh+IdGv17hgB+51fET1i2kPSmvk=
|
||||
modernc.org/golex v1.0.1 h1:EYKY1a3wStt0RzHaH8mdSRNg78Ub0OHxYfCRWw35YtM=
|
||||
modernc.org/golex v1.0.1/go.mod h1:QCA53QtsT1NdGkaZZkF5ezFwk4IXh4BGNafAARTC254=
|
||||
|
@ -82,6 +82,13 @@ var (
|
||||
WorkerCallsReturnedCount = stats.Int64("sealing/worker_calls_returned_count", "Counter of returned worker tasks", stats.UnitDimensionless)
|
||||
WorkerCallsReturnedDuration = stats.Float64("sealing/worker_calls_returned_ms", "Counter of returned worker tasks", stats.UnitMilliseconds)
|
||||
WorkerUntrackedCallsReturned = stats.Int64("sealing/worker_untracked_calls_returned", "Counter of returned untracked worker tasks", stats.UnitDimensionless)
|
||||
|
||||
// splitstore
|
||||
SplitstoreMiss = stats.Int64("splitstore/miss", "Number of misses in hotstre access", stats.UnitDimensionless)
|
||||
SplitstoreCompactionTimeSeconds = stats.Float64("splitstore/compaction_time", "Compaction time in seconds", stats.UnitSeconds)
|
||||
SplitstoreCompactionHot = stats.Int64("splitstore/hot", "Number of hot blocks in last compaction", stats.UnitDimensionless)
|
||||
SplitstoreCompactionCold = stats.Int64("splitstore/cold", "Number of cold blocks in last compaction", stats.UnitDimensionless)
|
||||
SplitstoreCompactionDead = stats.Int64("splitstore/dead", "Number of dead blocks in last compaction", stats.UnitDimensionless)
|
||||
)
|
||||
|
||||
var (
|
||||
@ -222,6 +229,28 @@ var (
|
||||
Aggregation: workMillisecondsDistribution,
|
||||
TagKeys: []tag.Key{TaskType, WorkerHostname},
|
||||
}
|
||||
|
||||
// splitstore
|
||||
SplitstoreMissView = &view.View{
|
||||
Measure: SplitstoreMiss,
|
||||
Aggregation: view.Count(),
|
||||
}
|
||||
SplitstoreCompactionTimeSecondsView = &view.View{
|
||||
Measure: SplitstoreCompactionTimeSeconds,
|
||||
Aggregation: view.LastValue(),
|
||||
}
|
||||
SplitstoreCompactionHotView = &view.View{
|
||||
Measure: SplitstoreCompactionHot,
|
||||
Aggregation: view.LastValue(),
|
||||
}
|
||||
SplitstoreCompactionColdView = &view.View{
|
||||
Measure: SplitstoreCompactionCold,
|
||||
Aggregation: view.Sum(),
|
||||
}
|
||||
SplitstoreCompactionDeadView = &view.View{
|
||||
Measure: SplitstoreCompactionDead,
|
||||
Aggregation: view.Sum(),
|
||||
}
|
||||
)
|
||||
|
||||
// DefaultViews is an array of OpenCensus views for metric gathering purposes
|
||||
@ -258,6 +287,11 @@ var ChainNodeViews = append([]*view.View{
|
||||
PubsubDropRPCView,
|
||||
VMFlushCopyCountView,
|
||||
VMFlushCopyDurationView,
|
||||
SplitstoreMissView,
|
||||
SplitstoreCompactionTimeSecondsView,
|
||||
SplitstoreCompactionHotView,
|
||||
SplitstoreCompactionColdView,
|
||||
SplitstoreCompactionDeadView,
|
||||
}, DefaultViews...)
|
||||
|
||||
var MinerNodeViews = append([]*view.View{
|
||||
|
@ -586,14 +586,39 @@ func Repo(r repo.Repo) Option {
|
||||
return err
|
||||
}
|
||||
|
||||
var cfg *config.Chainstore
|
||||
switch settings.nodeType {
|
||||
case repo.FullNode:
|
||||
cfgp, ok := c.(*config.FullNode)
|
||||
if !ok {
|
||||
return xerrors.Errorf("invalid config from repo, got: %T", c)
|
||||
}
|
||||
cfg = &cfgp.Chainstore
|
||||
default:
|
||||
cfg = &config.Chainstore{}
|
||||
}
|
||||
|
||||
return Options(
|
||||
Override(new(repo.LockedRepo), modules.LockedRepo(lr)), // module handles closing
|
||||
|
||||
Override(new(dtypes.MetadataDS), modules.Datastore),
|
||||
Override(new(dtypes.UniversalBlockstore), modules.UniversalBlockstore),
|
||||
Override(new(dtypes.ChainBlockstore), modules.ChainBlockstore),
|
||||
Override(new(dtypes.StateBlockstore), modules.StateBlockstore),
|
||||
|
||||
If(cfg.EnableSplitstore,
|
||||
If(cfg.Splitstore.HotStoreType == "badger",
|
||||
Override(new(dtypes.HotBlockstore), modules.BadgerHotBlockstore)),
|
||||
Override(new(dtypes.SplitBlockstore), modules.SplitBlockstore(cfg)),
|
||||
Override(new(dtypes.ChainBlockstore), modules.ChainSplitBlockstore),
|
||||
Override(new(dtypes.StateBlockstore), modules.StateSplitBlockstore),
|
||||
Override(new(dtypes.BaseBlockstore), From(new(dtypes.SplitBlockstore))),
|
||||
Override(new(dtypes.ExposedBlockstore), From(new(dtypes.SplitBlockstore))),
|
||||
),
|
||||
If(!cfg.EnableSplitstore,
|
||||
Override(new(dtypes.ChainBlockstore), modules.ChainFlatBlockstore),
|
||||
Override(new(dtypes.StateBlockstore), modules.StateFlatBlockstore),
|
||||
Override(new(dtypes.BaseBlockstore), From(new(dtypes.UniversalBlockstore))),
|
||||
Override(new(dtypes.ExposedBlockstore), From(new(dtypes.UniversalBlockstore))),
|
||||
),
|
||||
|
||||
If(os.Getenv("LOTUS_ENABLE_CHAINSTORE_FALLBACK") == "1",
|
||||
Override(new(dtypes.ChainBlockstore), modules.FallbackChainBlockstore),
|
||||
|
@ -24,6 +24,7 @@ type FullNode struct {
|
||||
Metrics Metrics
|
||||
Wallet Wallet
|
||||
Fees FeeConfig
|
||||
Chainstore Chainstore
|
||||
}
|
||||
|
||||
// // Common
|
||||
@ -120,6 +121,20 @@ type Pubsub struct {
|
||||
RemoteTracer string
|
||||
}
|
||||
|
||||
type Chainstore struct {
|
||||
EnableSplitstore bool
|
||||
Splitstore Splitstore
|
||||
}
|
||||
|
||||
type Splitstore struct {
|
||||
HotStoreType string
|
||||
TrackingStoreType string
|
||||
MarkSetType string
|
||||
EnableFullCompaction bool
|
||||
EnableGC bool // EXPERIMENTAL
|
||||
Archival bool
|
||||
}
|
||||
|
||||
// // Full Node
|
||||
|
||||
type Metrics struct {
|
||||
@ -185,6 +200,12 @@ func DefaultFullNode() *FullNode {
|
||||
Client: Client{
|
||||
SimultaneousTransfers: DefaultSimultaneousTransfers,
|
||||
},
|
||||
Chainstore: Chainstore{
|
||||
EnableSplitstore: false,
|
||||
Splitstore: Splitstore{
|
||||
HotStoreType: "badger",
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3,19 +3,25 @@ package modules
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
bstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
"go.uber.org/fx"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/blockstore"
|
||||
badgerbs "github.com/filecoin-project/lotus/blockstore/badger"
|
||||
"github.com/filecoin-project/lotus/blockstore/splitstore"
|
||||
"github.com/filecoin-project/lotus/node/config"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
)
|
||||
|
||||
// UniversalBlockstore returns a single universal blockstore that stores both
|
||||
// chain data and state data.
|
||||
// chain data and state data. It can be backed by a blockstore directly
|
||||
// (e.g. Badger), or by a Splitstore.
|
||||
func UniversalBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.UniversalBlockstore, error) {
|
||||
bs, err := r.Blockstore(helpers.LifecycleCtx(mctx, lc), repo.UniversalBlockstore)
|
||||
if err != nil {
|
||||
@ -31,17 +37,76 @@ func UniversalBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.Locked
|
||||
return bs, err
|
||||
}
|
||||
|
||||
// StateBlockstore is a hook to overlay caches for state objects, or in the
|
||||
// future, to segregate the universal blockstore into different physical state
|
||||
// and chain stores.
|
||||
func StateBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.UniversalBlockstore) (dtypes.StateBlockstore, error) {
|
||||
func BadgerHotBlockstore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.HotBlockstore, error) {
|
||||
path, err := r.SplitstorePath()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
path = filepath.Join(path, "hot.badger")
|
||||
if err := os.MkdirAll(path, 0755); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
opts, err := repo.BadgerBlockstoreOptions(repo.HotBlockstore, path, r.Readonly())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bs, err := badgerbs.Open(opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
lc.Append(fx.Hook{
|
||||
OnStop: func(_ context.Context) error {
|
||||
return bs.Close()
|
||||
}})
|
||||
|
||||
return bs, nil
|
||||
}
|
||||
|
||||
// ChainBlockstore is a hook to overlay caches for state objects, or in the
|
||||
// future, to segregate the universal blockstore into different physical state
|
||||
// and chain stores.
|
||||
func ChainBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.UniversalBlockstore) (dtypes.ChainBlockstore, error) {
|
||||
func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.UniversalBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, error) {
|
||||
return func(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.UniversalBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, error) {
|
||||
path, err := r.SplitstorePath()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cfg := &splitstore.Config{
|
||||
TrackingStoreType: cfg.Splitstore.TrackingStoreType,
|
||||
MarkSetType: cfg.Splitstore.MarkSetType,
|
||||
EnableFullCompaction: cfg.Splitstore.EnableFullCompaction,
|
||||
EnableGC: cfg.Splitstore.EnableGC,
|
||||
Archival: cfg.Splitstore.Archival,
|
||||
}
|
||||
ss, err := splitstore.Open(path, ds, hot, cold, cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lc.Append(fx.Hook{
|
||||
OnStop: func(context.Context) error {
|
||||
return ss.Close()
|
||||
},
|
||||
})
|
||||
|
||||
return ss, err
|
||||
}
|
||||
}
|
||||
|
||||
func StateFlatBlockstore(_ fx.Lifecycle, _ helpers.MetricsCtx, bs dtypes.UniversalBlockstore) (dtypes.StateBlockstore, error) {
|
||||
return bs, nil
|
||||
}
|
||||
|
||||
func StateSplitBlockstore(_ fx.Lifecycle, _ helpers.MetricsCtx, bs dtypes.SplitBlockstore) (dtypes.StateBlockstore, error) {
|
||||
return bs, nil
|
||||
}
|
||||
|
||||
func ChainFlatBlockstore(_ fx.Lifecycle, _ helpers.MetricsCtx, bs dtypes.UniversalBlockstore) (dtypes.ChainBlockstore, error) {
|
||||
return bs, nil
|
||||
}
|
||||
|
||||
func ChainSplitBlockstore(_ fx.Lifecycle, _ helpers.MetricsCtx, bs dtypes.SplitBlockstore) (dtypes.ChainBlockstore, error) {
|
||||
return bs, nil
|
||||
}
|
||||
|
||||
|
@ -14,6 +14,7 @@ import (
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/blockstore"
|
||||
"github.com/filecoin-project/lotus/blockstore/splitstore"
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain"
|
||||
"github.com/filecoin-project/lotus/chain/beacon"
|
||||
@ -72,14 +73,26 @@ func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds
|
||||
return mp, nil
|
||||
}
|
||||
|
||||
func ChainStore(lc fx.Lifecycle, cbs dtypes.ChainBlockstore, sbs dtypes.StateBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
|
||||
func ChainStore(lc fx.Lifecycle, cbs dtypes.ChainBlockstore, sbs dtypes.StateBlockstore, ds dtypes.MetadataDS, basebs dtypes.BaseBlockstore, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
|
||||
chain := store.NewChainStore(cbs, sbs, ds, syscalls, j)
|
||||
|
||||
if err := chain.Load(); err != nil {
|
||||
log.Warnf("loading chain state from disk: %s", err)
|
||||
}
|
||||
|
||||
var startHook func(context.Context) error
|
||||
if ss, ok := basebs.(*splitstore.SplitStore); ok {
|
||||
startHook = func(_ context.Context) error {
|
||||
err := ss.Start(chain)
|
||||
if err != nil {
|
||||
err = xerrors.Errorf("error starting splitstore: %w", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: startHook,
|
||||
OnStop: func(_ context.Context) error {
|
||||
return chain.Close()
|
||||
},
|
||||
|
@ -27,6 +27,15 @@ type (
|
||||
// UniversalBlockstore is the cold blockstore.
|
||||
UniversalBlockstore blockstore.Blockstore
|
||||
|
||||
// HotBlockstore is the Hot blockstore abstraction for the splitstore
|
||||
HotBlockstore blockstore.Blockstore
|
||||
|
||||
// SplitBlockstore is the hot/cold blockstore that sits on top of the ColdBlockstore.
|
||||
SplitBlockstore blockstore.Blockstore
|
||||
|
||||
// BaseBlockstore is something, coz DI
|
||||
BaseBlockstore blockstore.Blockstore
|
||||
|
||||
// ChainBlockstore is a blockstore to store chain data (tipsets, blocks,
|
||||
// messages). It is physically backed by the BareMonolithBlockstore, but it
|
||||
// has a cache on top that is specially tuned for chain data access
|
||||
|
@ -23,6 +23,7 @@ const (
|
||||
// well as state. In the future, they may get segregated into different
|
||||
// domains.
|
||||
UniversalBlockstore = BlockstoreDomain("universal")
|
||||
HotBlockstore = BlockstoreDomain("hot")
|
||||
)
|
||||
|
||||
var (
|
||||
|
Loading…
Reference in New Issue
Block a user