Merge pull request #6474 from filecoin-project/feat/splitstore-redux

Splitstore Enhancements
Łukasz Magiera 2021-07-13 12:43:57 +02:00 committed by GitHub
commit c37401a1a3
20 changed files with 2034 additions and 1107 deletions


@ -5,7 +5,7 @@ import (
"fmt" "fmt"
"io" "io"
"runtime" "runtime"
"sync/atomic" "sync"
"github.com/dgraph-io/badger/v2" "github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/options" "github.com/dgraph-io/badger/v2/options"
@ -73,20 +73,16 @@ func (b *badgerLogger) Warningf(format string, args ...interface{}) {
} }
const ( const (
stateOpen int64 = iota stateOpen = iota
stateClosing stateClosing
stateClosed stateClosed
) )
// Blockstore is a badger-backed IPLD blockstore. // Blockstore is a badger-backed IPLD blockstore.
//
// NOTE: once Close() is called, methods will try their best to return
// ErrBlockstoreClosed. This is guaranteed to happen for all subsequent
// operation calls after Close() has returned, but it may not happen for
// operations in progress. Those are likely to fail with a different error.
type Blockstore struct { type Blockstore struct {
// state is accessed atomically stateLk sync.RWMutex
state int64 state int
viewers sync.WaitGroup
DB *badger.DB DB *badger.DB
@ -97,6 +93,7 @@ type Blockstore struct {
var _ blockstore.Blockstore = (*Blockstore)(nil) var _ blockstore.Blockstore = (*Blockstore)(nil)
var _ blockstore.Viewer = (*Blockstore)(nil) var _ blockstore.Viewer = (*Blockstore)(nil)
var _ blockstore.BlockstoreIterator = (*Blockstore)(nil)
var _ io.Closer = (*Blockstore)(nil) var _ io.Closer = (*Blockstore)(nil)
// Open creates a new badger-backed blockstore, with the supplied options. // Open creates a new badger-backed blockstore, with the supplied options.
@ -124,19 +121,51 @@ func Open(opts Options) (*Blockstore, error) {
// Close closes the store. If the store has already been closed, this noops and // Close closes the store. If the store has already been closed, this noops and
// returns an error, even if the first closure resulted in error. // returns an error, even if the first closure resulted in error.
func (b *Blockstore) Close() error { func (b *Blockstore) Close() error {
if !atomic.CompareAndSwapInt64(&b.state, stateOpen, stateClosing) { b.stateLk.Lock()
if b.state != stateOpen {
b.stateLk.Unlock()
return nil
}
b.state = stateClosing
b.stateLk.Unlock()
defer func() {
b.stateLk.Lock()
b.state = stateClosed
b.stateLk.Unlock()
}()
// wait for all accesses to complete
b.viewers.Wait()
return b.DB.Close()
}
func (b *Blockstore) access() error {
b.stateLk.RLock()
defer b.stateLk.RUnlock()
if b.state != stateOpen {
return ErrBlockstoreClosed
}
b.viewers.Add(1)
return nil return nil
} }
defer atomic.StoreInt64(&b.state, stateClosed) func (b *Blockstore) isOpen() bool {
return b.DB.Close() b.stateLk.RLock()
defer b.stateLk.RUnlock()
return b.state == stateOpen
} }
// CollectGarbage runs garbage collection on the value log // CollectGarbage runs garbage collection on the value log
func (b *Blockstore) CollectGarbage() error { func (b *Blockstore) CollectGarbage() error {
if atomic.LoadInt64(&b.state) != stateOpen { if err := b.access(); err != nil {
return ErrBlockstoreClosed return err
} }
defer b.viewers.Done()
var err error var err error
for err == nil { for err == nil {
@ -153,9 +182,10 @@ func (b *Blockstore) CollectGarbage() error {
// Compact runs a synchronous compaction // Compact runs a synchronous compaction
func (b *Blockstore) Compact() error { func (b *Blockstore) Compact() error {
if atomic.LoadInt64(&b.state) != stateOpen { if err := b.access(); err != nil {
return ErrBlockstoreClosed return err
} }
defer b.viewers.Done()
nworkers := runtime.NumCPU() / 2 nworkers := runtime.NumCPU() / 2
if nworkers < 2 { if nworkers < 2 {
@ -168,9 +198,10 @@ func (b *Blockstore) Compact() error {
// View implements blockstore.Viewer, which leverages zero-copy read-only // View implements blockstore.Viewer, which leverages zero-copy read-only
// access to values. // access to values.
func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error { func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error {
if atomic.LoadInt64(&b.state) != stateOpen { if err := b.access(); err != nil {
return ErrBlockstoreClosed return err
} }
defer b.viewers.Done()
k, pooled := b.PooledStorageKey(cid) k, pooled := b.PooledStorageKey(cid)
if pooled { if pooled {
@ -191,9 +222,10 @@ func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error {
// Has implements Blockstore.Has. // Has implements Blockstore.Has.
func (b *Blockstore) Has(cid cid.Cid) (bool, error) { func (b *Blockstore) Has(cid cid.Cid) (bool, error) {
if atomic.LoadInt64(&b.state) != stateOpen { if err := b.access(); err != nil {
return false, ErrBlockstoreClosed return false, err
} }
defer b.viewers.Done()
k, pooled := b.PooledStorageKey(cid) k, pooled := b.PooledStorageKey(cid)
if pooled { if pooled {
@ -221,9 +253,10 @@ func (b *Blockstore) Get(cid cid.Cid) (blocks.Block, error) {
return nil, blockstore.ErrNotFound return nil, blockstore.ErrNotFound
} }
if atomic.LoadInt64(&b.state) != stateOpen { if err := b.access(); err != nil {
return nil, ErrBlockstoreClosed return nil, err
} }
defer b.viewers.Done()
k, pooled := b.PooledStorageKey(cid) k, pooled := b.PooledStorageKey(cid)
if pooled { if pooled {
@ -250,9 +283,10 @@ func (b *Blockstore) Get(cid cid.Cid) (blocks.Block, error) {
// GetSize implements Blockstore.GetSize. // GetSize implements Blockstore.GetSize.
func (b *Blockstore) GetSize(cid cid.Cid) (int, error) { func (b *Blockstore) GetSize(cid cid.Cid) (int, error) {
if atomic.LoadInt64(&b.state) != stateOpen { if err := b.access(); err != nil {
return -1, ErrBlockstoreClosed return 0, err
} }
defer b.viewers.Done()
k, pooled := b.PooledStorageKey(cid) k, pooled := b.PooledStorageKey(cid)
if pooled { if pooled {
@ -279,9 +313,10 @@ func (b *Blockstore) GetSize(cid cid.Cid) (int, error) {
// Put implements Blockstore.Put. // Put implements Blockstore.Put.
func (b *Blockstore) Put(block blocks.Block) error { func (b *Blockstore) Put(block blocks.Block) error {
if atomic.LoadInt64(&b.state) != stateOpen { if err := b.access(); err != nil {
return ErrBlockstoreClosed return err
} }
defer b.viewers.Done()
k, pooled := b.PooledStorageKey(block.Cid()) k, pooled := b.PooledStorageKey(block.Cid())
if pooled { if pooled {
@ -299,9 +334,10 @@ func (b *Blockstore) Put(block blocks.Block) error {
// PutMany implements Blockstore.PutMany. // PutMany implements Blockstore.PutMany.
func (b *Blockstore) PutMany(blocks []blocks.Block) error { func (b *Blockstore) PutMany(blocks []blocks.Block) error {
if atomic.LoadInt64(&b.state) != stateOpen { if err := b.access(); err != nil {
return ErrBlockstoreClosed return err
} }
defer b.viewers.Done()
// toReturn tracks the byte slices to return to the pool, if we're using key // toReturn tracks the byte slices to return to the pool, if we're using key
// prefixing. we can't return each slice to the pool after each Set, because // prefixing. we can't return each slice to the pool after each Set, because
@ -338,9 +374,10 @@ func (b *Blockstore) PutMany(blocks []blocks.Block) error {
// DeleteBlock implements Blockstore.DeleteBlock. // DeleteBlock implements Blockstore.DeleteBlock.
func (b *Blockstore) DeleteBlock(cid cid.Cid) error { func (b *Blockstore) DeleteBlock(cid cid.Cid) error {
if atomic.LoadInt64(&b.state) != stateOpen { if err := b.access(); err != nil {
return ErrBlockstoreClosed return err
} }
defer b.viewers.Done()
k, pooled := b.PooledStorageKey(cid) k, pooled := b.PooledStorageKey(cid)
if pooled { if pooled {
@ -353,9 +390,10 @@ func (b *Blockstore) DeleteBlock(cid cid.Cid) error {
} }
func (b *Blockstore) DeleteMany(cids []cid.Cid) error { func (b *Blockstore) DeleteMany(cids []cid.Cid) error {
if atomic.LoadInt64(&b.state) != stateOpen { if err := b.access(); err != nil {
return ErrBlockstoreClosed return err
} }
defer b.viewers.Done()
// toReturn tracks the byte slices to return to the pool, if we're using key // toReturn tracks the byte slices to return to the pool, if we're using key
// prefixing. we can't return each slice to the pool after each Set, because // prefixing. we can't return each slice to the pool after each Set, because
@ -392,8 +430,8 @@ func (b *Blockstore) DeleteMany(cids []cid.Cid) error {
// AllKeysChan implements Blockstore.AllKeysChan. // AllKeysChan implements Blockstore.AllKeysChan.
func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
if atomic.LoadInt64(&b.state) != stateOpen { if err := b.access(); err != nil {
return nil, ErrBlockstoreClosed return nil, err
} }
txn := b.DB.NewTransaction(false) txn := b.DB.NewTransaction(false)
@ -405,6 +443,7 @@ func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
ch := make(chan cid.Cid) ch := make(chan cid.Cid)
go func() { go func() {
defer b.viewers.Done()
defer close(ch) defer close(ch)
defer iter.Close() defer iter.Close()
@ -415,7 +454,7 @@ func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
if ctx.Err() != nil { if ctx.Err() != nil {
return // context has fired. return // context has fired.
} }
if atomic.LoadInt64(&b.state) != stateOpen { if !b.isOpen() {
// open iterators will run even after the database is closed... // open iterators will run even after the database is closed...
return // closing, yield. return // closing, yield.
} }
@ -442,6 +481,56 @@ func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return ch, nil return ch, nil
} }
// Implementation of BlockstoreIterator interface
func (b *Blockstore) ForEachKey(f func(cid.Cid) error) error {
if err := b.access(); err != nil {
return err
}
defer b.viewers.Done()
txn := b.DB.NewTransaction(false)
defer txn.Discard()
opts := badger.IteratorOptions{PrefetchSize: 100}
if b.prefixing {
opts.Prefix = b.prefix
}
iter := txn.NewIterator(opts)
defer iter.Close()
var buf []byte
for iter.Rewind(); iter.Valid(); iter.Next() {
if !b.isOpen() {
return ErrBlockstoreClosed
}
k := iter.Item().Key()
if b.prefixing {
k = k[b.prefixLen:]
}
klen := base32.RawStdEncoding.DecodedLen(len(k))
if klen > len(buf) {
buf = make([]byte, klen)
}
n, err := base32.RawStdEncoding.Decode(buf, k)
if err != nil {
return err
}
c := cid.NewCidV1(cid.Raw, buf[:n])
err = f(c)
if err != nil {
return err
}
}
return nil
}
// HashOnRead implements Blockstore.HashOnRead. It is not supported by this // HashOnRead implements Blockstore.HashOnRead. It is not supported by this
// blockstore. // blockstore.
func (b *Blockstore) HashOnRead(_ bool) { func (b *Blockstore) HashOnRead(_ bool) {


@ -30,6 +30,11 @@ type BatchDeleter interface {
DeleteMany(cids []cid.Cid) error DeleteMany(cids []cid.Cid) error
} }
// BlockstoreIterator is a trait for efficient iteration
type BlockstoreIterator interface {
ForEachKey(func(cid.Cid) error) error
}
// WrapIDStore wraps the underlying blockstore in an "identity" blockstore. // WrapIDStore wraps the underlying blockstore in an "identity" blockstore.
// The ID store filters out all puts for blocks with CIDs using the "identity" // The ID store filters out all puts for blocks with CIDs using the "identity"
// hash function. It also extracts inlined blocks from CIDs using the identity // hash function. It also extracts inlined blocks from CIDs using the identity
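A store implementing this trait can be iterated without going through `AllKeysChan`; the splitstore tests in this PR count blocks exactly this way. A minimal sketch of a consumer (the `countKeys` helper is illustrative, not part of the PR):

```
package example

import (
	"fmt"

	"github.com/filecoin-project/lotus/blockstore"
	cid "github.com/ipfs/go-cid"
)

// countKeys iterates a blockstore via the BlockstoreIterator trait,
// mirroring how the splitstore tests count hot and cold objects.
func countKeys(bs blockstore.Blockstore) (int, error) {
	iter, ok := bs.(blockstore.BlockstoreIterator)
	if !ok {
		return 0, fmt.Errorf("blockstore does not support efficient iteration")
	}
	count := 0
	err := iter.ForEachKey(func(_ cid.Cid) error {
		count++
		return nil
	})
	return count, err
}
```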

blockstore/discard.go (new file, 66 lines)

@ -0,0 +1,66 @@
package blockstore
import (
"context"
"io"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
)
var _ Blockstore = (*discardstore)(nil)
type discardstore struct {
bs Blockstore
}
func NewDiscardStore(bs Blockstore) Blockstore {
return &discardstore{bs: bs}
}
func (b *discardstore) Has(cid cid.Cid) (bool, error) {
return b.bs.Has(cid)
}
func (b *discardstore) HashOnRead(hor bool) {
b.bs.HashOnRead(hor)
}
func (b *discardstore) Get(cid cid.Cid) (blocks.Block, error) {
return b.bs.Get(cid)
}
func (b *discardstore) GetSize(cid cid.Cid) (int, error) {
return b.bs.GetSize(cid)
}
func (b *discardstore) View(cid cid.Cid, f func([]byte) error) error {
return b.bs.View(cid, f)
}
func (b *discardstore) Put(blk blocks.Block) error {
return nil
}
func (b *discardstore) PutMany(blks []blocks.Block) error {
return nil
}
func (b *discardstore) DeleteBlock(cid cid.Cid) error {
return nil
}
func (b *discardstore) DeleteMany(cids []cid.Cid) error {
return nil
}
func (b *discardstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return b.bs.AllKeysChan(ctx)
}
func (b *discardstore) Close() error {
if c, ok := b.bs.(io.Closer); ok {
return c.Close()
}
return nil
}
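The discard store passes reads through to the wrapped blockstore while turning all writes and deletes into no-ops. A small usage sketch, built on the in-memory store that also appears in this PR's tests:

```
package main

import (
	"fmt"

	blocks "github.com/ipfs/go-block-format"

	"github.com/filecoin-project/lotus/blockstore"
)

func main() {
	backing := blockstore.NewMemorySync() // in-memory store, as used in the tests
	cold := blockstore.NewDiscardStore(backing)

	blk := blocks.NewBlock([]byte("hello"))
	_ = cold.Put(blk) // a no-op: puts are discarded

	ok, _ := cold.Has(blk.Cid()) // reads pass through to the backing store
	fmt.Println(ok)              // false, since the put never landed
}
```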


@ -0,0 +1,72 @@
# SplitStore: An actively scalable blockstore for the Filecoin chain
The SplitStore was first introduced in lotus v1.5.1, as an experiment
in reducing the performance impact of large blockstores.
With lotus v1.11.1, we introduce the next iteration in design and
implementation, which we call SplitStore v1.
The new design (see [#6474](https://github.com/filecoin-project/lotus/pull/6474))
evolves the splitstore to be a freestanding compacting blockstore that
allows us to keep a small (60-100GB) working set in a hot blockstore
and reliably archive out of scope objects in a coldstore. The
coldstore can be a regular badger blockstore (the default), which can
be periodically garbage collected according to configurable user
retention policies, or a discard store, whereby out of scope objects
are simply discarded.
To enable the splitstore, edit `.lotus/config.toml` and add the following:
```
[Chainstore]
EnableSplitstore = true
```
If you intend to use the discard coldstore, you also need to add the following:
```
[Chainstore.Splitstore]
ColdStoreType = "discard"
```
In general you _should not_ have to use the discard store, unless you
are running a network booster or have very constrained hardware
without enough disk space to maintain a coldstore, even with garbage
collection.
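For reference, a complete configuration using the discard coldstore would then look as follows; the `HotStoreType` and `MarkSetType` lines show the defaults introduced by this PR (see the config diff below) and can be omitted:

```
[Chainstore]
EnableSplitstore = true
[Chainstore.Splitstore]
ColdStoreType = "discard"
HotStoreType = "badger"
MarkSetType = "map"
```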
## Operation
When the splitstore is first enabled, the existing blockstore becomes
the coldstore and a fresh hotstore is initialized.
The hotstore is warmed up on first startup so as to load all chain
headers and state roots in the current head. This allows us to
immediately gain the performance benefits of a smaller blockstore,
which can be substantial for full archival nodes.
All new writes are directed to the hotstore, while reads first hit the
hotstore, with fallback to the coldstore.
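In code, this read path reduces to a hot-first lookup with a coldstore fallback. A minimal sketch, with an illustrative stand-in type rather than the actual implementation:

```
package example

import (
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"

	"github.com/filecoin-project/lotus/blockstore"
)

// splitStore is a reduced stand-in; the real type carries far more state.
type splitStore struct {
	hot  blockstore.Blockstore
	cold blockstore.Blockstore
}

// Get tries the hotstore first and falls back to the coldstore on a miss.
// New writes (not shown) go only to the hotstore.
func (s *splitStore) Get(c cid.Cid) (blocks.Block, error) {
	blk, err := s.hot.Get(c)
	switch err {
	case nil:
		return blk, nil
	case blockstore.ErrNotFound:
		return s.cold.Get(c)
	default:
		return nil, err
	}
}
```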
Once 5 finalities have elapsed, and every finality henceforth, the
blockstore _compacts_. Compaction is the process of moving all
objects unreachable within the last 4 finalities from the hotstore to
the coldstore. If the system is configured with a discard coldstore,
these objects are discarded. Note that chain headers, all the way to
genesis, are considered reachable. Stateroots and messages are
considered reachable only within the last 4 finalities, unless there
is a live reference to them.
## Compaction
Compaction works transactionally with the following algorithm:
- We prepare a transaction, whereby all objects referenced through API I/O are tracked.
- We walk the chain and mark reachable objects, keeping 4 finalities of state roots and messages and all headers all the way to genesis.
- Once the chain walk is complete, we begin full transaction protection with concurrent marking; we walk and mark all references created during the chain walk. At the same time, all I/O through the API concurrently marks objects as live references.
- We collect cold objects by iterating through the hotstore and checking the mark set; if an object is not marked, then it is a candidate for purging.
- When running with a coldstore, we next copy all cold objects to the coldstore.
- At this point we are ready to begin purging (a sketch follows this list):
  - We sort cold objects heaviest first, so as to never delete the constituents of a DAG before the DAG itself (which would leave dangling references).
  - We delete in small batches while holding a lock; each batch is checked again against the concurrent transactional mark set, so as to never delete anything live.
- We then end the transaction and compact/gc the hotstore.
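To make the purge step concrete, here is a reduced sketch of batched, mark-checked deletion. The type, field, and parameter names are illustrative stand-ins, not the PR's actual code:

```
package example

import (
	"sync"

	cid "github.com/ipfs/go-cid"

	"github.com/filecoin-project/lotus/blockstore"
)

// markSet is the subset of the MarkSet interface the purge needs.
type markSet interface {
	Has(cid.Cid) (bool, error)
}

// splitStore is a reduced stand-in; the real type carries far more state.
type splitStore struct {
	txnLk sync.Mutex
	hot   blockstore.Blockstore
}

// purge deletes cold objects from the hotstore in small batches. Each
// batch takes the lock and is re-checked against the concurrent
// transactional mark set, so anything that became live during
// compaction is never deleted.
func (s *splitStore) purge(cold []cid.Cid, txnProtect markSet) error {
	const batchSize = 16

	deleteBatch := func(batch []cid.Cid) error {
		s.txnLk.Lock()
		defer s.txnLk.Unlock()

		dead := make([]cid.Cid, 0, len(batch))
		for _, c := range batch {
			live, err := txnProtect.Has(c)
			if err != nil {
				return err
			}
			if !live {
				dead = append(dead, c)
			}
		}
		return s.hot.DeleteMany(dead)
	}

	for i := 0; i < len(cold); i += batchSize {
		end := i + batchSize
		if end > len(cold) {
			end = len(cold)
		}
		if err := deleteBatch(cold[i:end]); err != nil {
			return err
		}
	}
	return nil
}
```

The batch-level re-check is what lets purging run concurrently with live API traffic.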
## Coldstore Garbage Collection
TBD -- see [#6577](https://github.com/filecoin-project/lotus/issues/6577)


@ -0,0 +1,273 @@
package splitstore
import (
"crypto/sha256"
"encoding/hex"
"fmt"
"os"
"os/exec"
"path/filepath"
"runtime/debug"
"strings"
"sync"
"time"
"go.uber.org/multierr"
"golang.org/x/xerrors"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
)
type debugLog struct {
readLog, writeLog, deleteLog, stackLog *debugLogOp
stackMx sync.Mutex
stackMap map[string]string
}
type debugLogOp struct {
path string
mx sync.Mutex
log *os.File
count int
}
func openDebugLog(path string) (*debugLog, error) {
basePath := filepath.Join(path, "debug")
err := os.MkdirAll(basePath, 0755)
if err != nil {
return nil, err
}
readLog, err := openDebugLogOp(basePath, "read.log")
if err != nil {
return nil, err
}
writeLog, err := openDebugLogOp(basePath, "write.log")
if err != nil {
_ = readLog.Close()
return nil, err
}
deleteLog, err := openDebugLogOp(basePath, "delete.log")
if err != nil {
_ = readLog.Close()
_ = writeLog.Close()
return nil, err
}
stackLog, err := openDebugLogOp(basePath, "stack.log")
if err != nil {
_ = readLog.Close()
_ = writeLog.Close()
_ = deleteLog.Close()
return nil, xerrors.Errorf("error opening stack log: %w", err)
}
return &debugLog{
readLog: readLog,
writeLog: writeLog,
deleteLog: deleteLog,
stackLog: stackLog,
stackMap: make(map[string]string),
}, nil
}
func (d *debugLog) LogReadMiss(cid cid.Cid) {
if d == nil {
return
}
stack := d.getStack()
err := d.readLog.Log("%s %s %s\n", d.timestamp(), cid, stack)
if err != nil {
log.Warnf("error writing read log: %s", err)
}
}
func (d *debugLog) LogWrite(blk blocks.Block) {
if d == nil {
return
}
var stack string
if enableDebugLogWriteTraces {
stack = " " + d.getStack()
}
err := d.writeLog.Log("%s %s%s\n", d.timestamp(), blk.Cid(), stack)
if err != nil {
log.Warnf("error writing write log: %s", err)
}
}
func (d *debugLog) LogWriteMany(blks []blocks.Block) {
if d == nil {
return
}
var stack string
if enableDebugLogWriteTraces {
stack = " " + d.getStack()
}
now := d.timestamp()
for _, blk := range blks {
err := d.writeLog.Log("%s %s%s\n", now, blk.Cid(), stack)
if err != nil {
log.Warnf("error writing write log: %s", err)
break
}
}
}
func (d *debugLog) LogDelete(cids []cid.Cid) {
if d == nil {
return
}
now := d.timestamp()
for _, c := range cids {
err := d.deleteLog.Log("%s %s\n", now, c)
if err != nil {
log.Warnf("error writing delete log: %s", err)
break
}
}
}
func (d *debugLog) Flush() {
if d == nil {
return
}
// rotate non-empty logs
d.readLog.Rotate()
d.writeLog.Rotate()
d.deleteLog.Rotate()
d.stackLog.Rotate()
}
func (d *debugLog) Close() error {
if d == nil {
return nil
}
err1 := d.readLog.Close()
err2 := d.writeLog.Close()
err3 := d.deleteLog.Close()
err4 := d.stackLog.Close()
return multierr.Combine(err1, err2, err3, err4)
}
func (d *debugLog) getStack() string {
sk := d.getNormalizedStackTrace()
hash := sha256.Sum256([]byte(sk))
key := string(hash[:])
d.stackMx.Lock()
repr, ok := d.stackMap[key]
if !ok {
repr = hex.EncodeToString(hash[:])
d.stackMap[key] = repr
err := d.stackLog.Log("%s\n%s\n", repr, sk)
if err != nil {
log.Warnf("error writing stack trace for %s: %s", repr, err)
}
}
d.stackMx.Unlock()
return repr
}
func (d *debugLog) getNormalizedStackTrace() string {
sk := string(debug.Stack())
// Normalization for deduplication
// skip first line -- it's the goroutine
// for each line that ends in a ), remove the call args -- these are the registers
lines := strings.Split(sk, "\n")[1:]
for i, line := range lines {
if len(line) > 0 && line[len(line)-1] == ')' {
idx := strings.LastIndex(line, "(")
if idx < 0 {
continue
}
lines[i] = line[:idx]
}
}
return strings.Join(lines, "\n")
}
func (d *debugLog) timestamp() string {
ts, _ := time.Now().MarshalText()
return string(ts)
}
func openDebugLogOp(basePath, name string) (*debugLogOp, error) {
path := filepath.Join(basePath, name)
file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return nil, xerrors.Errorf("error opening %s: %w", name, err)
}
return &debugLogOp{path: path, log: file}, nil
}
func (d *debugLogOp) Close() error {
d.mx.Lock()
defer d.mx.Unlock()
return d.log.Close()
}
func (d *debugLogOp) Log(template string, arg ...interface{}) error {
d.mx.Lock()
defer d.mx.Unlock()
d.count++
_, err := fmt.Fprintf(d.log, template, arg...)
return err
}
func (d *debugLogOp) Rotate() {
d.mx.Lock()
defer d.mx.Unlock()
if d.count == 0 {
return
}
err := d.log.Close()
if err != nil {
log.Warnf("error closing log (file: %s): %s", d.path, err)
return
}
arxivPath := fmt.Sprintf("%s-%d", d.path, time.Now().Unix())
err = os.Rename(d.path, arxivPath)
if err != nil {
log.Warnf("error moving log (file: %s): %s", d.path, err)
return
}
go func() {
cmd := exec.Command("gzip", arxivPath)
err := cmd.Run()
if err != nil {
log.Warnf("error compressing log (file: %s): %s", arxivPath, err)
}
}()
d.count = 0
d.log, err = os.OpenFile(d.path, os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
log.Warnf("error opening log (file: %s): %s", d.path, err)
return
}
}


@ -1,26 +1,26 @@
package splitstore package splitstore
import ( import (
"path/filepath" "errors"
"golang.org/x/xerrors" "golang.org/x/xerrors"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
) )
var errMarkSetClosed = errors.New("markset closed")
// MarkSet is a utility to keep track of seen CIDs, and later query for them. // MarkSet is a utility to keep track of seen CIDs, and later query for them.
// //
// * If the expected dataset is large, it can be backed by a datastore (e.g. bbolt). // * If the expected dataset is large, it can be backed by a datastore (e.g. bbolt).
// * If a probabilistic result is acceptable, it can be backed by a bloom filter (default). // * If a probabilistic result is acceptable, it can be backed by a bloom filter
type MarkSet interface { type MarkSet interface {
Mark(cid.Cid) error Mark(cid.Cid) error
Has(cid.Cid) (bool, error) Has(cid.Cid) (bool, error)
Close() error Close() error
SetConcurrent()
} }
// markBytes is deliberately a non-nil empty byte slice for serialization.
var markBytes = []byte{}
type MarkSetEnv interface { type MarkSetEnv interface {
Create(name string, sizeHint int64) (MarkSet, error) Create(name string, sizeHint int64) (MarkSet, error)
Close() error Close() error
@ -28,10 +28,10 @@ type MarkSetEnv interface {
func OpenMarkSetEnv(path string, mtype string) (MarkSetEnv, error) { func OpenMarkSetEnv(path string, mtype string) (MarkSetEnv, error) {
switch mtype { switch mtype {
case "", "bloom": case "bloom":
return NewBloomMarkSetEnv() return NewBloomMarkSetEnv()
case "bolt": case "map":
return NewBoltMarkSetEnv(filepath.Join(path, "markset.bolt")) return NewMapMarkSetEnv()
default: default:
return nil, xerrors.Errorf("unknown mark set type %s", mtype) return nil, xerrors.Errorf("unknown mark set type %s", mtype)
} }
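As a quick illustration of the resulting API, this sketch creates a map-backed mark set and exercises it (error handling elided; the path argument is ignored by both the map and bloom environments):

```
package main

import (
	"fmt"

	blocks "github.com/ipfs/go-block-format"

	"github.com/filecoin-project/lotus/blockstore/splitstore"
)

func main() {
	env, _ := splitstore.OpenMarkSetEnv("", "map")
	defer env.Close()

	markSet, _ := env.Create("live", 1024) // name and a size hint
	defer markSet.Close()

	markSet.SetConcurrent() // opt in to lock-protected Mark/Has

	c := blocks.NewBlock([]byte("some object")).Cid()
	_ = markSet.Mark(c)

	ok, _ := markSet.Has(c)
	fmt.Println(ok) // true
}
```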


@ -3,6 +3,7 @@ package splitstore
import ( import (
"crypto/rand" "crypto/rand"
"crypto/sha256" "crypto/sha256"
"sync"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -21,7 +22,9 @@ var _ MarkSetEnv = (*BloomMarkSetEnv)(nil)
type BloomMarkSet struct { type BloomMarkSet struct {
salt []byte salt []byte
mx sync.RWMutex
bf *bbloom.Bloom bf *bbloom.Bloom
ts bool
} }
var _ MarkSet = (*BloomMarkSet)(nil) var _ MarkSet = (*BloomMarkSet)(nil)
@ -64,14 +67,41 @@ func (s *BloomMarkSet) saltedKey(cid cid.Cid) []byte {
} }
func (s *BloomMarkSet) Mark(cid cid.Cid) error { func (s *BloomMarkSet) Mark(cid cid.Cid) error {
if s.ts {
s.mx.Lock()
defer s.mx.Unlock()
}
if s.bf == nil {
return errMarkSetClosed
}
s.bf.Add(s.saltedKey(cid)) s.bf.Add(s.saltedKey(cid))
return nil return nil
} }
func (s *BloomMarkSet) Has(cid cid.Cid) (bool, error) { func (s *BloomMarkSet) Has(cid cid.Cid) (bool, error) {
if s.ts {
s.mx.RLock()
defer s.mx.RUnlock()
}
if s.bf == nil {
return false, errMarkSetClosed
}
return s.bf.Has(s.saltedKey(cid)), nil return s.bf.Has(s.saltedKey(cid)), nil
} }
func (s *BloomMarkSet) Close() error { func (s *BloomMarkSet) Close() error {
if s.ts {
s.mx.Lock()
defer s.mx.Unlock()
}
s.bf = nil
return nil return nil
} }
func (s *BloomMarkSet) SetConcurrent() {
s.ts = true
}


@ -1,81 +0,0 @@
package splitstore
import (
"time"
"golang.org/x/xerrors"
cid "github.com/ipfs/go-cid"
bolt "go.etcd.io/bbolt"
)
type BoltMarkSetEnv struct {
db *bolt.DB
}
var _ MarkSetEnv = (*BoltMarkSetEnv)(nil)
type BoltMarkSet struct {
db *bolt.DB
bucketId []byte
}
var _ MarkSet = (*BoltMarkSet)(nil)
func NewBoltMarkSetEnv(path string) (*BoltMarkSetEnv, error) {
db, err := bolt.Open(path, 0644,
&bolt.Options{
Timeout: 1 * time.Second,
NoSync: true,
})
if err != nil {
return nil, err
}
return &BoltMarkSetEnv{db: db}, nil
}
func (e *BoltMarkSetEnv) Create(name string, hint int64) (MarkSet, error) {
bucketId := []byte(name)
err := e.db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(bucketId)
if err != nil {
return xerrors.Errorf("error creating bolt db bucket %s: %w", name, err)
}
return nil
})
if err != nil {
return nil, err
}
return &BoltMarkSet{db: e.db, bucketId: bucketId}, nil
}
func (e *BoltMarkSetEnv) Close() error {
return e.db.Close()
}
func (s *BoltMarkSet) Mark(cid cid.Cid) error {
return s.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucketId)
return b.Put(cid.Hash(), markBytes)
})
}
func (s *BoltMarkSet) Has(cid cid.Cid) (result bool, err error) {
err = s.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucketId)
v := b.Get(cid.Hash())
result = v != nil
return nil
})
return result, err
}
func (s *BoltMarkSet) Close() error {
return s.db.Update(func(tx *bolt.Tx) error {
return tx.DeleteBucket(s.bucketId)
})
}


@ -0,0 +1,75 @@
package splitstore
import (
"sync"
cid "github.com/ipfs/go-cid"
)
type MapMarkSetEnv struct{}
var _ MarkSetEnv = (*MapMarkSetEnv)(nil)
type MapMarkSet struct {
mx sync.RWMutex
set map[string]struct{}
ts bool
}
var _ MarkSet = (*MapMarkSet)(nil)
func NewMapMarkSetEnv() (*MapMarkSetEnv, error) {
return &MapMarkSetEnv{}, nil
}
func (e *MapMarkSetEnv) Create(name string, sizeHint int64) (MarkSet, error) {
return &MapMarkSet{
set: make(map[string]struct{}, sizeHint),
}, nil
}
func (e *MapMarkSetEnv) Close() error {
return nil
}
func (s *MapMarkSet) Mark(cid cid.Cid) error {
if s.ts {
s.mx.Lock()
defer s.mx.Unlock()
}
if s.set == nil {
return errMarkSetClosed
}
s.set[string(cid.Hash())] = struct{}{}
return nil
}
func (s *MapMarkSet) Has(cid cid.Cid) (bool, error) {
if s.ts {
s.mx.RLock()
defer s.mx.RUnlock()
}
if s.set == nil {
return false, errMarkSetClosed
}
_, ok := s.set[string(cid.Hash())]
return ok, nil
}
func (s *MapMarkSet) Close() error {
if s.ts {
s.mx.Lock()
defer s.mx.Unlock()
}
s.set = nil
return nil
}
func (s *MapMarkSet) SetConcurrent() {
s.ts = true
}


@ -8,8 +8,8 @@ import (
"github.com/multiformats/go-multihash" "github.com/multiformats/go-multihash"
) )
func TestBoltMarkSet(t *testing.T) { func TestMapMarkSet(t *testing.T) {
testMarkSet(t, "bolt") testMarkSet(t, "map")
} }
func TestBloomMarkSet(t *testing.T) { func TestBloomMarkSet(t *testing.T) {

File diff suppressed because it is too large.


@ -2,6 +2,7 @@ package splitstore
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -13,6 +14,7 @@ import (
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/mock" "github.com/filecoin-project/lotus/chain/types/mock"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
datastore "github.com/ipfs/go-datastore" datastore "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync" dssync "github.com/ipfs/go-datastore/sync"
@ -21,22 +23,34 @@ import (
func init() { func init() {
CompactionThreshold = 5 CompactionThreshold = 5
CompactionCold = 1
CompactionBoundary = 2 CompactionBoundary = 2
logging.SetLogLevel("splitstore", "DEBUG") logging.SetLogLevel("splitstore", "DEBUG")
} }
func testSplitStore(t *testing.T, cfg *Config) { func testSplitStore(t *testing.T, cfg *Config) {
chain := &mockChain{t: t} chain := &mockChain{t: t}
// genesis
genBlock := mock.MkBlock(nil, 0, 0)
genTs := mock.TipSet(genBlock)
chain.push(genTs)
// the myriads of stores // the myriads of stores
ds := dssync.MutexWrap(datastore.NewMapDatastore()) ds := dssync.MutexWrap(datastore.NewMapDatastore())
hot := blockstore.NewMemorySync() hot := newMockStore()
cold := blockstore.NewMemorySync() cold := newMockStore()
// this is necessary to avoid the garbage that the mock puts in the blocks
garbage := blocks.NewBlock([]byte{1, 2, 3})
err := cold.Put(garbage)
if err != nil {
t.Fatal(err)
}
// genesis
genBlock := mock.MkBlock(nil, 0, 0)
genBlock.Messages = garbage.Cid()
genBlock.ParentMessageReceipts = garbage.Cid()
genBlock.ParentStateRoot = garbage.Cid()
genBlock.Timestamp = uint64(time.Now().Unix())
genTs := mock.TipSet(genBlock)
chain.push(genTs)
// put the genesis block to cold store // put the genesis block to cold store
blk, err := genBlock.ToStorageBlock() blk, err := genBlock.ToStorageBlock()
@ -62,12 +76,22 @@ func testSplitStore(t *testing.T, cfg *Config) {
} }
// make some tipsets, but not enough to cause compaction // make some tipsets, but not enough to cause compaction
mkBlock := func(curTs *types.TipSet, i int) *types.TipSet { mkBlock := func(curTs *types.TipSet, i int, stateRoot blocks.Block) *types.TipSet {
blk := mock.MkBlock(curTs, uint64(i), uint64(i)) blk := mock.MkBlock(curTs, uint64(i), uint64(i))
blk.Messages = garbage.Cid()
blk.ParentMessageReceipts = garbage.Cid()
blk.ParentStateRoot = stateRoot.Cid()
blk.Timestamp = uint64(time.Now().Unix())
sblk, err := blk.ToStorageBlock() sblk, err := blk.ToStorageBlock()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
err = ss.Put(stateRoot)
if err != nil {
t.Fatal(err)
}
err = ss.Put(sblk) err = ss.Put(sblk)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -78,18 +102,6 @@ func testSplitStore(t *testing.T, cfg *Config) {
return ts return ts
} }
mkGarbageBlock := func(curTs *types.TipSet, i int) {
blk := mock.MkBlock(curTs, uint64(i), uint64(i))
sblk, err := blk.ToStorageBlock()
if err != nil {
t.Fatal(err)
}
err = ss.Put(sblk)
if err != nil {
t.Fatal(err)
}
}
waitForCompaction := func() { waitForCompaction := func() {
for atomic.LoadInt32(&ss.compacting) == 1 { for atomic.LoadInt32(&ss.compacting) == 1 {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
@ -98,105 +110,63 @@ func testSplitStore(t *testing.T, cfg *Config) {
curTs := genTs curTs := genTs
for i := 1; i < 5; i++ { for i := 1; i < 5; i++ {
curTs = mkBlock(curTs, i) stateRoot := blocks.NewBlock([]byte{byte(i), 3, 3, 7})
curTs = mkBlock(curTs, i, stateRoot)
waitForCompaction() waitForCompaction()
} }
mkGarbageBlock(genTs, 1)
// count objects in the cold and hot stores // count objects in the cold and hot stores
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
countBlocks := func(bs blockstore.Blockstore) int { countBlocks := func(bs blockstore.Blockstore) int {
count := 0 count := 0
ch, err := bs.AllKeysChan(ctx) _ = bs.(blockstore.BlockstoreIterator).ForEachKey(func(_ cid.Cid) error {
if err != nil {
t.Fatal(err)
}
for range ch {
count++ count++
} return nil
})
return count return count
} }
coldCnt := countBlocks(cold) coldCnt := countBlocks(cold)
hotCnt := countBlocks(hot) hotCnt := countBlocks(hot)
if coldCnt != 1 { if coldCnt != 2 {
t.Errorf("expected %d blocks, but got %d", 1, coldCnt) t.Errorf("expected %d blocks, but got %d", 2, coldCnt)
} }
if hotCnt != 5 { if hotCnt != 10 {
t.Errorf("expected %d blocks, but got %d", 5, hotCnt) t.Errorf("expected %d blocks, but got %d", 10, hotCnt)
} }
// trigger a compaction // trigger a compaction
for i := 5; i < 10; i++ { for i := 5; i < 10; i++ {
curTs = mkBlock(curTs, i) stateRoot := blocks.NewBlock([]byte{byte(i), 3, 3, 7})
curTs = mkBlock(curTs, i, stateRoot)
waitForCompaction() waitForCompaction()
} }
coldCnt = countBlocks(cold) coldCnt = countBlocks(cold)
hotCnt = countBlocks(hot) hotCnt = countBlocks(hot)
if !cfg.EnableFullCompaction {
if coldCnt != 5 { if coldCnt != 5 {
t.Errorf("expected %d cold blocks, but got %d", 5, coldCnt) t.Errorf("expected %d cold blocks, but got %d", 5, coldCnt)
} }
if hotCnt != 5 { if hotCnt != 17 {
t.Errorf("expected %d hot blocks, but got %d", 5, hotCnt) t.Errorf("expected %d hot blocks, but got %d", 17, hotCnt)
}
}
if cfg.EnableFullCompaction && !cfg.EnableGC {
if coldCnt != 3 {
t.Errorf("expected %d cold blocks, but got %d", 3, coldCnt)
}
if hotCnt != 7 {
t.Errorf("expected %d hot blocks, but got %d", 7, hotCnt)
}
}
if cfg.EnableFullCompaction && cfg.EnableGC {
if coldCnt != 2 {
t.Errorf("expected %d cold blocks, but got %d", 2, coldCnt)
}
if hotCnt != 7 {
t.Errorf("expected %d hot blocks, but got %d", 7, hotCnt)
}
} }
// Make sure we can revert without panicking. // Make sure we can revert without panicking.
chain.revert(2) chain.revert(2)
} }
func TestSplitStoreSimpleCompaction(t *testing.T) { func TestSplitStoreCompaction(t *testing.T) {
testSplitStore(t, &Config{TrackingStoreType: "mem"}) testSplitStore(t, &Config{MarkSetType: "map"})
}
func TestSplitStoreFullCompactionWithoutGC(t *testing.T) {
testSplitStore(t, &Config{
TrackingStoreType: "mem",
EnableFullCompaction: true,
})
}
func TestSplitStoreFullCompactionWithGC(t *testing.T) {
testSplitStore(t, &Config{
TrackingStoreType: "mem",
EnableFullCompaction: true,
EnableGC: true,
})
} }
type mockChain struct { type mockChain struct {
t testing.TB t testing.TB
sync.Mutex sync.Mutex
genesis *types.BlockHeader
tipsets []*types.TipSet tipsets []*types.TipSet
listener func(revert []*types.TipSet, apply []*types.TipSet) error listener func(revert []*types.TipSet, apply []*types.TipSet) error
} }
@ -204,6 +174,9 @@ type mockChain struct {
func (c *mockChain) push(ts *types.TipSet) { func (c *mockChain) push(ts *types.TipSet) {
c.Lock() c.Lock()
c.tipsets = append(c.tipsets, ts) c.tipsets = append(c.tipsets, ts)
if c.genesis == nil {
c.genesis = ts.Blocks()[0]
}
c.Unlock() c.Unlock()
if c.listener != nil { if c.listener != nil {
@ -242,7 +215,7 @@ func (c *mockChain) GetTipsetByHeight(_ context.Context, epoch abi.ChainEpoch, _
return nil, fmt.Errorf("bad epoch %d", epoch) return nil, fmt.Errorf("bad epoch %d", epoch)
} }
return c.tipsets[iEpoch-1], nil return c.tipsets[iEpoch], nil
} }
func (c *mockChain) GetHeaviestTipSet() *types.TipSet { func (c *mockChain) GetHeaviestTipSet() *types.TipSet {
@ -256,24 +229,105 @@ func (c *mockChain) SubscribeHeadChanges(change func(revert []*types.TipSet, app
c.listener = change c.listener = change
} }
func (c *mockChain) WalkSnapshot(_ context.Context, ts *types.TipSet, epochs abi.ChainEpoch, _ bool, _ bool, f func(cid.Cid) error) error { type mockStore struct {
c.Lock() mx sync.Mutex
defer c.Unlock() set map[cid.Cid]blocks.Block
start := int(ts.Height()) - 1
end := start - int(epochs)
if end < 0 {
end = -1
} }
for i := start; i > end; i-- {
ts := c.tipsets[i] func newMockStore() *mockStore {
for _, cid := range ts.Cids() { return &mockStore{set: make(map[cid.Cid]blocks.Block)}
err := f(cid) }
func (b *mockStore) Has(cid cid.Cid) (bool, error) {
b.mx.Lock()
defer b.mx.Unlock()
_, ok := b.set[cid]
return ok, nil
}
func (b *mockStore) HashOnRead(hor bool) {}
func (b *mockStore) Get(cid cid.Cid) (blocks.Block, error) {
b.mx.Lock()
defer b.mx.Unlock()
blk, ok := b.set[cid]
if !ok {
return nil, blockstore.ErrNotFound
}
return blk, nil
}
func (b *mockStore) GetSize(cid cid.Cid) (int, error) {
blk, err := b.Get(cid)
if err != nil {
return 0, err
}
return len(blk.RawData()), nil
}
func (b *mockStore) View(cid cid.Cid, f func([]byte) error) error {
blk, err := b.Get(cid)
if err != nil {
return err
}
return f(blk.RawData())
}
func (b *mockStore) Put(blk blocks.Block) error {
b.mx.Lock()
defer b.mx.Unlock()
b.set[blk.Cid()] = blk
return nil
}
func (b *mockStore) PutMany(blks []blocks.Block) error {
b.mx.Lock()
defer b.mx.Unlock()
for _, blk := range blks {
b.set[blk.Cid()] = blk
}
return nil
}
func (b *mockStore) DeleteBlock(cid cid.Cid) error {
b.mx.Lock()
defer b.mx.Unlock()
delete(b.set, cid)
return nil
}
func (b *mockStore) DeleteMany(cids []cid.Cid) error {
b.mx.Lock()
defer b.mx.Unlock()
for _, c := range cids {
delete(b.set, c)
}
return nil
}
func (b *mockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, errors.New("not implemented")
}
func (b *mockStore) ForEachKey(f func(cid.Cid) error) error {
b.mx.Lock()
defer b.mx.Unlock()
for c := range b.set {
err := f(c)
if err != nil { if err != nil {
return err return err
} }
} }
} return nil
}
func (b *mockStore) Close() error {
return nil return nil
} }


@ -1,109 +0,0 @@
package splitstore
import (
"path/filepath"
"sync"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
cid "github.com/ipfs/go-cid"
)
// TrackingStore is a persistent store that tracks blocks that are added
// to the hotstore, tracking the epoch at which they are written.
type TrackingStore interface {
Put(cid.Cid, abi.ChainEpoch) error
PutBatch([]cid.Cid, abi.ChainEpoch) error
Get(cid.Cid) (abi.ChainEpoch, error)
Delete(cid.Cid) error
DeleteBatch([]cid.Cid) error
ForEach(func(cid.Cid, abi.ChainEpoch) error) error
Sync() error
Close() error
}
// OpenTrackingStore opens a tracking store of the specified type in the
// specified path.
func OpenTrackingStore(path string, ttype string) (TrackingStore, error) {
switch ttype {
case "", "bolt":
return OpenBoltTrackingStore(filepath.Join(path, "tracker.bolt"))
case "mem":
return NewMemTrackingStore(), nil
default:
return nil, xerrors.Errorf("unknown tracking store type %s", ttype)
}
}
// NewMemTrackingStore creates an in-memory tracking store.
// This is only useful for tests or situations where you don't want to open the
// real tracking store (e.g. concurrent read-only access on a node's datastore)
func NewMemTrackingStore() *MemTrackingStore {
return &MemTrackingStore{tab: make(map[cid.Cid]abi.ChainEpoch)}
}
// MemTrackingStore is a simple in-memory tracking store
type MemTrackingStore struct {
sync.Mutex
tab map[cid.Cid]abi.ChainEpoch
}
var _ TrackingStore = (*MemTrackingStore)(nil)
func (s *MemTrackingStore) Put(cid cid.Cid, epoch abi.ChainEpoch) error {
s.Lock()
defer s.Unlock()
s.tab[cid] = epoch
return nil
}
func (s *MemTrackingStore) PutBatch(cids []cid.Cid, epoch abi.ChainEpoch) error {
s.Lock()
defer s.Unlock()
for _, cid := range cids {
s.tab[cid] = epoch
}
return nil
}
func (s *MemTrackingStore) Get(cid cid.Cid) (abi.ChainEpoch, error) {
s.Lock()
defer s.Unlock()
epoch, ok := s.tab[cid]
if ok {
return epoch, nil
}
return 0, xerrors.Errorf("missing tracking epoch for %s", cid)
}
func (s *MemTrackingStore) Delete(cid cid.Cid) error {
s.Lock()
defer s.Unlock()
delete(s.tab, cid)
return nil
}
func (s *MemTrackingStore) DeleteBatch(cids []cid.Cid) error {
s.Lock()
defer s.Unlock()
for _, cid := range cids {
delete(s.tab, cid)
}
return nil
}
func (s *MemTrackingStore) ForEach(f func(cid.Cid, abi.ChainEpoch) error) error {
s.Lock()
defer s.Unlock()
for cid, epoch := range s.tab {
err := f(cid, epoch)
if err != nil {
return err
}
}
return nil
}
func (s *MemTrackingStore) Sync() error { return nil }
func (s *MemTrackingStore) Close() error { return nil }


@ -1,120 +0,0 @@
package splitstore
import (
"time"
"golang.org/x/xerrors"
cid "github.com/ipfs/go-cid"
bolt "go.etcd.io/bbolt"
"github.com/filecoin-project/go-state-types/abi"
)
type BoltTrackingStore struct {
db *bolt.DB
bucketId []byte
}
var _ TrackingStore = (*BoltTrackingStore)(nil)
func OpenBoltTrackingStore(path string) (*BoltTrackingStore, error) {
opts := &bolt.Options{
Timeout: 1 * time.Second,
NoSync: true,
}
db, err := bolt.Open(path, 0644, opts)
if err != nil {
return nil, err
}
bucketId := []byte("tracker")
err = db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(bucketId)
if err != nil {
return xerrors.Errorf("error creating bolt db bucket %s: %w", string(bucketId), err)
}
return nil
})
if err != nil {
_ = db.Close()
return nil, err
}
return &BoltTrackingStore{db: db, bucketId: bucketId}, nil
}
func (s *BoltTrackingStore) Put(cid cid.Cid, epoch abi.ChainEpoch) error {
val := epochToBytes(epoch)
return s.db.Batch(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucketId)
return b.Put(cid.Hash(), val)
})
}
func (s *BoltTrackingStore) PutBatch(cids []cid.Cid, epoch abi.ChainEpoch) error {
val := epochToBytes(epoch)
return s.db.Batch(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucketId)
for _, cid := range cids {
err := b.Put(cid.Hash(), val)
if err != nil {
return err
}
}
return nil
})
}
func (s *BoltTrackingStore) Get(cid cid.Cid) (epoch abi.ChainEpoch, err error) {
err = s.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucketId)
val := b.Get(cid.Hash())
if val == nil {
return xerrors.Errorf("missing tracking epoch for %s", cid)
}
epoch = bytesToEpoch(val)
return nil
})
return epoch, err
}
func (s *BoltTrackingStore) Delete(cid cid.Cid) error {
return s.db.Batch(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucketId)
return b.Delete(cid.Hash())
})
}
func (s *BoltTrackingStore) DeleteBatch(cids []cid.Cid) error {
return s.db.Batch(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucketId)
for _, cid := range cids {
err := b.Delete(cid.Hash())
if err != nil {
return xerrors.Errorf("error deleting %s", cid)
}
}
return nil
})
}
func (s *BoltTrackingStore) ForEach(f func(cid.Cid, abi.ChainEpoch) error) error {
return s.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucketId)
return b.ForEach(func(k, v []byte) error {
cid := cid.NewCidV1(cid.Raw, k)
epoch := bytesToEpoch(v)
return f(cid, epoch)
})
})
}
func (s *BoltTrackingStore) Sync() error {
return s.db.Sync()
}
func (s *BoltTrackingStore) Close() error {
return s.db.Close()
}


@ -1,130 +0,0 @@
package splitstore
import (
"io/ioutil"
"testing"
cid "github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
"github.com/filecoin-project/go-state-types/abi"
)
func TestBoltTrackingStore(t *testing.T) {
testTrackingStore(t, "bolt")
}
func testTrackingStore(t *testing.T, tsType string) {
t.Helper()
makeCid := func(key string) cid.Cid {
h, err := multihash.Sum([]byte(key), multihash.SHA2_256, -1)
if err != nil {
t.Fatal(err)
}
return cid.NewCidV1(cid.Raw, h)
}
mustHave := func(s TrackingStore, cid cid.Cid, epoch abi.ChainEpoch) {
val, err := s.Get(cid)
if err != nil {
t.Fatal(err)
}
if val != epoch {
t.Fatal("epoch mismatch")
}
}
mustNotHave := func(s TrackingStore, cid cid.Cid) {
_, err := s.Get(cid)
if err == nil {
t.Fatal("expected error")
}
}
path, err := ioutil.TempDir("", "snoop-test.*")
if err != nil {
t.Fatal(err)
}
s, err := OpenTrackingStore(path, tsType)
if err != nil {
t.Fatal(err)
}
k1 := makeCid("a")
k2 := makeCid("b")
k3 := makeCid("c")
k4 := makeCid("d")
s.Put(k1, 1) //nolint
s.Put(k2, 2) //nolint
s.Put(k3, 3) //nolint
s.Put(k4, 4) //nolint
mustHave(s, k1, 1)
mustHave(s, k2, 2)
mustHave(s, k3, 3)
mustHave(s, k4, 4)
s.Delete(k1) // nolint
s.Delete(k2) // nolint
mustNotHave(s, k1)
mustNotHave(s, k2)
mustHave(s, k3, 3)
mustHave(s, k4, 4)
s.PutBatch([]cid.Cid{k1}, 1) //nolint
s.PutBatch([]cid.Cid{k2}, 2) //nolint
mustHave(s, k1, 1)
mustHave(s, k2, 2)
mustHave(s, k3, 3)
mustHave(s, k4, 4)
allKeys := map[string]struct{}{
k1.String(): {},
k2.String(): {},
k3.String(): {},
k4.String(): {},
}
err = s.ForEach(func(k cid.Cid, _ abi.ChainEpoch) error {
_, ok := allKeys[k.String()]
if !ok {
t.Fatal("unexpected key")
}
delete(allKeys, k.String())
return nil
})
if err != nil {
t.Fatal(err)
}
if len(allKeys) != 0 {
t.Fatal("not all keys were returned")
}
// now close and reopen and ensure the keys still exist
err = s.Close()
if err != nil {
t.Fatal(err)
}
s, err = OpenTrackingStore(path, tsType)
if err != nil {
t.Fatal(err)
}
mustHave(s, k1, 1)
mustHave(s, k2, 2)
mustHave(s, k3, 3)
mustHave(s, k4, 4)
s.Close() //nolint:errcheck
}

go.mod (1 line changed)

@ -144,7 +144,6 @@ require (
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7
github.com/whyrusleeping/pubsub v0.0.0-20190708150250-92bcb0691325 github.com/whyrusleeping/pubsub v0.0.0-20190708150250-92bcb0691325
github.com/xorcare/golden v0.6.1-0.20191112154924-b87f686d7542 github.com/xorcare/golden v0.6.1-0.20191112154924-b87f686d7542
go.etcd.io/bbolt v1.3.4
go.opencensus.io v0.23.0 go.opencensus.io v0.23.0
go.uber.org/dig v1.10.0 // indirect go.uber.org/dig v1.10.0 // indirect
go.uber.org/fx v1.9.0 go.uber.org/fx v1.9.0


@ -643,6 +643,10 @@ func Repo(r repo.Repo) Option {
Override(new(dtypes.UniversalBlockstore), modules.UniversalBlockstore), Override(new(dtypes.UniversalBlockstore), modules.UniversalBlockstore),
If(cfg.EnableSplitstore, If(cfg.EnableSplitstore,
If(cfg.Splitstore.ColdStoreType == "universal",
Override(new(dtypes.ColdBlockstore), From(new(dtypes.UniversalBlockstore)))),
If(cfg.Splitstore.ColdStoreType == "discard",
Override(new(dtypes.ColdBlockstore), modules.DiscardColdBlockstore)),
If(cfg.Splitstore.HotStoreType == "badger", If(cfg.Splitstore.HotStoreType == "badger",
Override(new(dtypes.HotBlockstore), modules.BadgerHotBlockstore)), Override(new(dtypes.HotBlockstore), modules.BadgerHotBlockstore)),
Override(new(dtypes.SplitBlockstore), modules.SplitBlockstore(cfg)), Override(new(dtypes.SplitBlockstore), modules.SplitBlockstore(cfg)),


@ -230,12 +230,9 @@ type Chainstore struct {
} }
type Splitstore struct { type Splitstore struct {
ColdStoreType string
HotStoreType string HotStoreType string
TrackingStoreType string
MarkSetType string MarkSetType string
EnableFullCompaction bool
EnableGC bool // EXPERIMENTAL
Archival bool
} }
// // Full Node // // Full Node
@ -306,7 +303,9 @@ func DefaultFullNode() *FullNode {
Chainstore: Chainstore{ Chainstore: Chainstore{
EnableSplitstore: false, EnableSplitstore: false,
Splitstore: Splitstore{ Splitstore: Splitstore{
ColdStoreType: "universal",
HotStoreType: "badger", HotStoreType: "badger",
MarkSetType: "map",
}, },
}, },
} }


@ -37,6 +37,10 @@ func UniversalBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.Locked
return bs, err return bs, err
} }
func DiscardColdBlockstore(lc fx.Lifecycle, bs dtypes.UniversalBlockstore) (dtypes.ColdBlockstore, error) {
return blockstore.NewDiscardStore(bs), nil
}
func BadgerHotBlockstore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.HotBlockstore, error) { func BadgerHotBlockstore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.HotBlockstore, error) {
path, err := r.SplitstorePath() path, err := r.SplitstorePath()
if err != nil { if err != nil {
@ -66,19 +70,16 @@ func BadgerHotBlockstore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.HotBlocksto
return bs, nil return bs, nil
} }
func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.UniversalBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, error) { func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.ColdBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, error) {
return func(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.UniversalBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, error) { return func(lc fx.Lifecycle, r repo.LockedRepo, ds dtypes.MetadataDS, cold dtypes.ColdBlockstore, hot dtypes.HotBlockstore) (dtypes.SplitBlockstore, error) {
path, err := r.SplitstorePath() path, err := r.SplitstorePath()
if err != nil { if err != nil {
return nil, err return nil, err
} }
cfg := &splitstore.Config{ cfg := &splitstore.Config{
TrackingStoreType: cfg.Splitstore.TrackingStoreType,
MarkSetType: cfg.Splitstore.MarkSetType, MarkSetType: cfg.Splitstore.MarkSetType,
EnableFullCompaction: cfg.Splitstore.EnableFullCompaction, DiscardColdBlocks: cfg.Splitstore.ColdStoreType == "discard",
EnableGC: cfg.Splitstore.EnableGC,
Archival: cfg.Splitstore.Archival,
} }
ss, err := splitstore.Open(path, ds, hot, cold, cfg) ss, err := splitstore.Open(path, ds, hot, cold, cfg)
if err != nil { if err != nil {


@ -24,9 +24,12 @@ import (
type MetadataDS datastore.Batching type MetadataDS datastore.Batching
type ( type (
// UniversalBlockstore is the cold blockstore. // UniversalBlockstore is the universal blockstore backend.
UniversalBlockstore blockstore.Blockstore UniversalBlockstore blockstore.Blockstore
// ColdBlockstore is the Cold blockstore abstraction for the splitstore
ColdBlockstore blockstore.Blockstore
// HotBlockstore is the Hot blockstore abstraction for the splitstore // HotBlockstore is the Hot blockstore abstraction for the splitstore
HotBlockstore blockstore.Blockstore HotBlockstore blockstore.Blockstore