Merge branch 'master' into feat/replace-multistore-carv2

This commit is contained in:
Raúl Kripalani 2021-07-27 22:06:07 +01:00
commit 19182999dd
11 changed files with 624 additions and 52 deletions

View File

@ -8,9 +8,11 @@ import (
"path/filepath"
"runtime"
"sync"
"time"
"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/options"
"github.com/dgraph-io/badger/v2/pb"
"github.com/multiformats/go-base32"
"go.uber.org/zap"
@ -74,19 +76,44 @@ func (b *badgerLogger) Warningf(format string, args ...interface{}) {
b.skip2.Warnf(format, args...)
}
// bsState is the current blockstore state
type bsState int
const (
stateOpen = iota
// stateOpen signifies an open blockstore
stateOpen bsState = iota
// stateClosing signifies a blockstore that is currently closing
stateClosing
// stateClosed signifies a blockstore that has been colosed
stateClosed
)
type bsMoveState int
const (
// moveStateNone signifies that there is no move in progress
moveStateNone bsMoveState = iota
// moveStateMoving signifies that there is a move in a progress
moveStateMoving
// moveStateCleanup signifies that a move has completed or aborted and we are cleaning up
moveStateCleanup
// moveStateLock signifies that an exclusive lock has been acquired
moveStateLock
)
// Blockstore is a badger-backed IPLD blockstore.
type Blockstore struct {
stateLk sync.RWMutex
state int
state bsState
viewers sync.WaitGroup
DB *badger.DB
moveMx sync.Mutex
moveCond sync.Cond
moveState bsMoveState
rlock int
db *badger.DB
dbNext *badger.DB // when moving
opts Options
prefixing bool
@ -113,13 +140,15 @@ func Open(opts Options) (*Blockstore, error) {
return nil, fmt.Errorf("failed to open badger blockstore: %w", err)
}
bs := &Blockstore{DB: db, opts: opts}
bs := &Blockstore{db: db, opts: opts}
if p := opts.Prefix; p != "" {
bs.prefixing = true
bs.prefix = []byte(p)
bs.prefixLen = len(bs.prefix)
}
bs.moveCond.L = &bs.moveMx
return bs, nil
}
@ -143,7 +172,7 @@ func (b *Blockstore) Close() error {
// wait for all accesses to complete
b.viewers.Wait()
return b.DB.Close()
return b.db.Close()
}
func (b *Blockstore) access() error {
@ -165,12 +194,225 @@ func (b *Blockstore) isOpen() bool {
return b.state == stateOpen
}
// CollectGarbage runs garbage collection on the value log
func (b *Blockstore) CollectGarbage() error {
if err := b.access(); err != nil {
// lockDB/unlockDB implement a recursive lock contingent on move state
func (b *Blockstore) lockDB() {
b.moveMx.Lock()
defer b.moveMx.Unlock()
if b.rlock == 0 {
for b.moveState == moveStateLock {
b.moveCond.Wait()
}
}
b.rlock++
}
func (b *Blockstore) unlockDB() {
b.moveMx.Lock()
defer b.moveMx.Unlock()
b.rlock--
if b.rlock == 0 && b.moveState == moveStateLock {
b.moveCond.Broadcast()
}
}
// lockMove/unlockMove implement an exclusive lock of move state
func (b *Blockstore) lockMove() {
b.moveMx.Lock()
b.moveState = moveStateLock
for b.rlock > 0 {
b.moveCond.Wait()
}
}
func (b *Blockstore) unlockMove(state bsMoveState) {
b.moveState = state
b.moveCond.Broadcast()
b.moveMx.Unlock()
}
// movingGC moves the blockstore to a new path, adjacent to the current path, and creates
// a symlink from the current path to the new path; the old blockstore is deleted.
//
// The blockstore MUST accept new writes during the move and ensure that these
// are persisted to the new blockstore; if a failure occurs aboring the move,
// then they must be peristed to the old blockstore.
// In short, the blockstore must not lose data from new writes during the move.
func (b *Blockstore) movingGC() error {
// this inlines moveLock/moveUnlock for the initial state check to prevent a second move
// while one is in progress without clobbering state
b.moveMx.Lock()
if b.moveState != moveStateNone {
b.moveMx.Unlock()
return fmt.Errorf("move in progress")
}
b.moveState = moveStateLock
for b.rlock > 0 {
b.moveCond.Wait()
}
b.moveState = moveStateMoving
b.moveCond.Broadcast()
b.moveMx.Unlock()
var path string
defer func() {
b.lockMove()
db2 := b.dbNext
b.dbNext = nil
var state bsMoveState
if db2 != nil {
state = moveStateCleanup
} else {
state = moveStateNone
}
b.unlockMove(state)
if db2 != nil {
err := db2.Close()
if err != nil {
log.Warnf("error closing badger db: %s", err)
}
b.deleteDB(path)
b.lockMove()
b.unlockMove(moveStateNone)
}
}()
// we resolve symlinks to create the new path in the adjacent to the old path.
// this allows the user to symlink the db directory into a separate filesystem.
basePath := b.opts.Dir
linkPath, err := filepath.EvalSymlinks(basePath)
if err != nil {
return fmt.Errorf("error resolving symlink %s: %w", basePath, err)
}
if basePath == linkPath {
path = basePath
} else {
name := filepath.Base(basePath)
dir := filepath.Dir(linkPath)
path = filepath.Join(dir, name)
}
path = fmt.Sprintf("%s.%d", path, time.Now().UnixNano())
log.Infof("moving blockstore from %s to %s", b.opts.Dir, path)
opts := b.opts
opts.Dir = path
opts.ValueDir = path
db2, err := badger.Open(opts.Options)
if err != nil {
return fmt.Errorf("failed to open badger blockstore in %s: %w", path, err)
}
b.lockMove()
b.dbNext = db2
b.unlockMove(moveStateMoving)
log.Info("copying blockstore")
err = b.doCopy(b.db, b.dbNext)
if err != nil {
return fmt.Errorf("error moving badger blockstore to %s: %w", path, err)
}
b.lockMove()
db1 := b.db
b.db = b.dbNext
b.dbNext = nil
b.unlockMove(moveStateCleanup)
err = db1.Close()
if err != nil {
log.Warnf("error closing old badger db: %s", err)
}
dbpath := b.opts.Dir
oldpath := fmt.Sprintf("%s.old.%d", dbpath, time.Now().Unix())
if err = os.Rename(dbpath, oldpath); err != nil {
// this is not catastrophic in the sense that we have not lost any data.
// but it is pretty bad, as the db path points to the old db, while we are now using to the new
// db; we can't continue and leave a ticking bomb for the next restart.
// so a panic is appropriate and user can fix.
panic(fmt.Errorf("error renaming old badger db dir from %s to %s: %w; USER ACTION REQUIRED", dbpath, oldpath, err)) //nolint
}
if err = os.Symlink(path, dbpath); err != nil {
// same here; the db path is pointing to the void. panic and let the user fix.
panic(fmt.Errorf("error symlinking new badger db dir from %s to %s: %w; USER ACTION REQUIRED", path, dbpath, err)) //nolint
}
b.deleteDB(oldpath)
log.Info("moving blockstore done")
return nil
}
// doCopy copies a badger blockstore to another, with an optional filter; if the filter
// is not nil, then only cids that satisfy the filter will be copied.
func (b *Blockstore) doCopy(from, to *badger.DB) error {
workers := runtime.NumCPU() / 2
if workers < 2 {
workers = 2
}
stream := from.NewStream()
stream.NumGo = workers
stream.LogPrefix = "doCopy"
stream.Send = func(list *pb.KVList) error {
batch := to.NewWriteBatch()
defer batch.Cancel()
for _, kv := range list.Kv {
if kv.Key == nil || kv.Value == nil {
continue
}
if err := batch.Set(kv.Key, kv.Value); err != nil {
return err
}
defer b.viewers.Done()
}
return batch.Flush()
}
return stream.Orchestrate(context.Background())
}
func (b *Blockstore) deleteDB(path string) {
// follow symbolic links, otherwise the data wil be left behind
lpath, err := filepath.EvalSymlinks(path)
if err != nil {
log.Warnf("error resolving symlinks in %s", path)
return
}
log.Infof("removing data directory %s", lpath)
if err := os.RemoveAll(lpath); err != nil {
log.Warnf("error deleting db at %s: %s", lpath, err)
return
}
if path != lpath {
log.Infof("removing link %s", path)
if err := os.Remove(path); err != nil {
log.Warnf("error removing symbolic link %s", err)
}
}
}
func (b *Blockstore) onlineGC() error {
b.lockDB()
defer b.unlockDB()
// compact first to gather the necessary statistics for GC
nworkers := runtime.NumCPU() / 2
@ -178,13 +420,13 @@ func (b *Blockstore) CollectGarbage() error {
nworkers = 2
}
err := b.DB.Flatten(nworkers)
err := b.db.Flatten(nworkers)
if err != nil {
return err
}
for err == nil {
err = b.DB.RunValueLogGC(0.125)
err = b.db.RunValueLogGC(0.125)
}
if err == badger.ErrNoRewrite {
@ -195,6 +437,29 @@ func (b *Blockstore) CollectGarbage() error {
return err
}
// CollectGarbage compacts and runs garbage collection on the value log;
// implements the BlockstoreGC trait
func (b *Blockstore) CollectGarbage(opts ...blockstore.BlockstoreGCOption) error {
if err := b.access(); err != nil {
return err
}
defer b.viewers.Done()
var options blockstore.BlockstoreGCOptions
for _, opt := range opts {
err := opt(&options)
if err != nil {
return err
}
}
if options.FullGC {
return b.movingGC()
}
return b.onlineGC()
}
// Size returns the aggregate size of the blockstore
func (b *Blockstore) Size() (int64, error) {
if err := b.access(); err != nil {
@ -202,7 +467,10 @@ func (b *Blockstore) Size() (int64, error) {
}
defer b.viewers.Done()
lsm, vlog := b.DB.Size()
b.lockDB()
defer b.unlockDB()
lsm, vlog := b.db.Size()
size := lsm + vlog
if size == 0 {
@ -234,12 +502,15 @@ func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error {
}
defer b.viewers.Done()
b.lockDB()
defer b.unlockDB()
k, pooled := b.PooledStorageKey(cid)
if pooled {
defer KeyPool.Put(k)
}
return b.DB.View(func(txn *badger.Txn) error {
return b.db.View(func(txn *badger.Txn) error {
switch item, err := txn.Get(k); err {
case nil:
return item.Value(fn)
@ -258,12 +529,15 @@ func (b *Blockstore) Has(cid cid.Cid) (bool, error) {
}
defer b.viewers.Done()
b.lockDB()
defer b.unlockDB()
k, pooled := b.PooledStorageKey(cid)
if pooled {
defer KeyPool.Put(k)
}
err := b.DB.View(func(txn *badger.Txn) error {
err := b.db.View(func(txn *badger.Txn) error {
_, err := txn.Get(k)
return err
})
@ -289,13 +563,16 @@ func (b *Blockstore) Get(cid cid.Cid) (blocks.Block, error) {
}
defer b.viewers.Done()
b.lockDB()
defer b.unlockDB()
k, pooled := b.PooledStorageKey(cid)
if pooled {
defer KeyPool.Put(k)
}
var val []byte
err := b.DB.View(func(txn *badger.Txn) error {
err := b.db.View(func(txn *badger.Txn) error {
switch item, err := txn.Get(k); err {
case nil:
val, err = item.ValueCopy(nil)
@ -319,13 +596,16 @@ func (b *Blockstore) GetSize(cid cid.Cid) (int, error) {
}
defer b.viewers.Done()
b.lockDB()
defer b.unlockDB()
k, pooled := b.PooledStorageKey(cid)
if pooled {
defer KeyPool.Put(k)
}
var size int
err := b.DB.View(func(txn *badger.Txn) error {
err := b.db.View(func(txn *badger.Txn) error {
switch item, err := txn.Get(k); err {
case nil:
size = int(item.ValueSize())
@ -349,20 +629,38 @@ func (b *Blockstore) Put(block blocks.Block) error {
}
defer b.viewers.Done()
b.lockDB()
defer b.unlockDB()
k, pooled := b.PooledStorageKey(block.Cid())
if pooled {
defer KeyPool.Put(k)
}
err := b.DB.Update(func(txn *badger.Txn) error {
put := func(db *badger.DB) error {
err := db.Update(func(txn *badger.Txn) error {
return txn.Set(k, block.RawData())
})
if err != nil {
err = fmt.Errorf("failed to put block in badger blockstore: %w", err)
return fmt.Errorf("failed to put block in badger blockstore: %w", err)
}
return nil
}
if err := put(b.db); err != nil {
return err
}
if b.dbNext != nil {
if err := put(b.dbNext); err != nil {
return err
}
}
return nil
}
// PutMany implements Blockstore.PutMany.
func (b *Blockstore) PutMany(blocks []blocks.Block) error {
if err := b.access(); err != nil {
@ -370,6 +668,9 @@ func (b *Blockstore) PutMany(blocks []blocks.Block) error {
}
defer b.viewers.Done()
b.lockDB()
defer b.unlockDB()
// toReturn tracks the byte slices to return to the pool, if we're using key
// prefixing. we can't return each slice to the pool after each Set, because
// badger holds on to the slice.
@ -383,14 +684,21 @@ func (b *Blockstore) PutMany(blocks []blocks.Block) error {
}()
}
batch := b.DB.NewWriteBatch()
defer batch.Cancel()
keys := make([][]byte, 0, len(blocks))
for _, block := range blocks {
k, pooled := b.PooledStorageKey(block.Cid())
if pooled {
toReturn = append(toReturn, k)
}
keys = append(keys, k)
}
put := func(db *badger.DB) error {
batch := db.NewWriteBatch()
defer batch.Cancel()
for i, block := range blocks {
k := keys[i]
if err := batch.Set(k, block.RawData()); err != nil {
return err
}
@ -398,11 +706,25 @@ func (b *Blockstore) PutMany(blocks []blocks.Block) error {
err := batch.Flush()
if err != nil {
err = fmt.Errorf("failed to put blocks in badger blockstore: %w", err)
return fmt.Errorf("failed to put blocks in badger blockstore: %w", err)
}
return nil
}
if err := put(b.db); err != nil {
return err
}
if b.dbNext != nil {
if err := put(b.dbNext); err != nil {
return err
}
}
return nil
}
// DeleteBlock implements Blockstore.DeleteBlock.
func (b *Blockstore) DeleteBlock(cid cid.Cid) error {
if err := b.access(); err != nil {
@ -410,12 +732,15 @@ func (b *Blockstore) DeleteBlock(cid cid.Cid) error {
}
defer b.viewers.Done()
b.lockDB()
defer b.unlockDB()
k, pooled := b.PooledStorageKey(cid)
if pooled {
defer KeyPool.Put(k)
}
return b.DB.Update(func(txn *badger.Txn) error {
return b.db.Update(func(txn *badger.Txn) error {
return txn.Delete(k)
})
}
@ -426,6 +751,9 @@ func (b *Blockstore) DeleteMany(cids []cid.Cid) error {
}
defer b.viewers.Done()
b.lockDB()
defer b.unlockDB()
// toReturn tracks the byte slices to return to the pool, if we're using key
// prefixing. we can't return each slice to the pool after each Set, because
// badger holds on to the slice.
@ -439,7 +767,7 @@ func (b *Blockstore) DeleteMany(cids []cid.Cid) error {
}()
}
batch := b.DB.NewWriteBatch()
batch := b.db.NewWriteBatch()
defer batch.Cancel()
for _, cid := range cids {
@ -465,7 +793,10 @@ func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, err
}
txn := b.DB.NewTransaction(false)
b.lockDB()
defer b.unlockDB()
txn := b.db.NewTransaction(false)
opts := badger.IteratorOptions{PrefetchSize: 100}
if b.prefixing {
opts.Prefix = b.prefix
@ -519,7 +850,10 @@ func (b *Blockstore) ForEachKey(f func(cid.Cid) error) error {
}
defer b.viewers.Done()
txn := b.DB.NewTransaction(false)
b.lockDB()
defer b.unlockDB()
txn := b.db.NewTransaction(false)
defer txn.Discard()
opts := badger.IteratorOptions{PrefetchSize: 100}
@ -614,3 +948,9 @@ func (b *Blockstore) StorageKey(dst []byte, cid cid.Cid) []byte {
}
return dst[:reqsize]
}
// this method is added for lotus-shed needs
// WARNING: THIS IS COMPLETELY UNSAFE; DONT USE THIS IN PRODUCTION CODE
func (b *Blockstore) DB() *badger.DB {
return b.db
}

View File

@ -1,12 +1,19 @@
package badgerbs
import (
"bytes"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
blocks "github.com/ipfs/go-block-format"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
"github.com/filecoin-project/lotus/blockstore"
)
@ -89,3 +96,165 @@ func openBlockstore(optsSupplier func(path string) Options) func(tb testing.TB,
return Open(optsSupplier(path))
}
}
func testMove(t *testing.T, optsF func(string) Options) {
basePath, err := ioutil.TempDir("", "")
if err != nil {
t.Fatal(err)
}
dbPath := filepath.Join(basePath, "db")
t.Cleanup(func() {
_ = os.RemoveAll(basePath)
})
db, err := Open(optsF(dbPath))
if err != nil {
t.Fatal(err)
}
defer db.Close() //nolint
var have []blocks.Block
var deleted []cid.Cid
// add some blocks
for i := 0; i < 10; i++ {
blk := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i)))
err := db.Put(blk)
if err != nil {
t.Fatal(err)
}
have = append(have, blk)
}
// delete some of them
for i := 5; i < 10; i++ {
c := have[i].Cid()
err := db.DeleteBlock(c)
if err != nil {
t.Fatal(err)
}
deleted = append(deleted, c)
}
have = have[:5]
// start a move concurrent with some more puts
g := new(errgroup.Group)
g.Go(func() error {
for i := 10; i < 1000; i++ {
blk := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i)))
err := db.Put(blk)
if err != nil {
return err
}
have = append(have, blk)
}
return nil
})
g.Go(func() error {
return db.CollectGarbage(blockstore.WithFullGC(true))
})
err = g.Wait()
if err != nil {
t.Fatal(err)
}
// now check that we have all the blocks in have and none in the deleted lists
checkBlocks := func() {
for _, blk := range have {
has, err := db.Has(blk.Cid())
if err != nil {
t.Fatal(err)
}
if !has {
t.Fatal("missing block")
}
blk2, err := db.Get(blk.Cid())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(blk.RawData(), blk2.RawData()) {
t.Fatal("data mismatch")
}
}
for _, c := range deleted {
has, err := db.Has(c)
if err != nil {
t.Fatal(err)
}
if has {
t.Fatal("resurrected block")
}
}
}
checkBlocks()
// check the basePath -- it should contain a directory with name db.{timestamp}, soft-linked
// to db and nothing else
checkPath := func() {
entries, err := os.ReadDir(basePath)
if err != nil {
t.Fatal(err)
}
if len(entries) != 2 {
t.Fatalf("too many entries; expected %d but got %d", 2, len(entries))
}
var haveDB, haveDBLink bool
for _, e := range entries {
if e.Name() == "db" {
if (e.Type() & os.ModeSymlink) == 0 {
t.Fatal("found db, but it's not a symlink")
}
haveDBLink = true
continue
}
if strings.HasPrefix(e.Name(), "db.") {
if !e.Type().IsDir() {
t.Fatal("found db prefix, but it's not a directory")
}
haveDB = true
continue
}
}
if !haveDB {
t.Fatal("db directory is missing")
}
if !haveDBLink {
t.Fatal("db link is missing")
}
}
checkPath()
// now do another FullGC to test the double move and following of symlinks
if err := db.CollectGarbage(blockstore.WithFullGC(true)); err != nil {
t.Fatal(err)
}
checkBlocks()
checkPath()
}
func TestMoveNoPrefix(t *testing.T) {
testMove(t, DefaultOptions)
}
func TestMoveWithPrefix(t *testing.T) {
testMove(t, func(path string) Options {
opts := DefaultOptions(path)
opts.Prefix = "/prefixed/"
return opts
})
}

View File

@ -37,7 +37,22 @@ type BlockstoreIterator interface {
// BlockstoreGC is a trait for blockstores that support online garbage collection
type BlockstoreGC interface {
CollectGarbage() error
CollectGarbage(options ...BlockstoreGCOption) error
}
// BlockstoreGCOption is a functional interface for controlling blockstore GC options
type BlockstoreGCOption = func(*BlockstoreGCOptions) error
// BlockstoreGCOptions is a struct with GC options
type BlockstoreGCOptions struct {
FullGC bool
}
func WithFullGC(fullgc bool) BlockstoreGCOption {
return func(opts *BlockstoreGCOptions) error {
opts.FullGC = fullgc
return nil
}
}
// BlockstoreSize is a trait for on-disk blockstores that can report their size

View File

@ -59,6 +59,15 @@ These are options in the `[Chainstore.Splitstore]` section of the configuration:
nodes beyond 4 finalities, while running with the discard coldstore option.
It is also useful for miners who accept deals and need to lookback messages beyond
the 4 finalities, which would otherwise hit the coldstore.
- `HotStoreFullGCFrequency` -- specifies how frequenty to garbage collect the hotstore
using full (moving) GC.
The default value is 20, which uses full GC every 20 compactions (about once a week);
set to 0 to disable full GC altogether.
Rationale: badger supports online GC, and this is used by default. However it has proven to
be ineffective in practice with the hotstore size slowly creeping up. In order to address this,
we have added moving GC support in our badger wrapper, which can effectively reclaim all space.
The downside is that it takes a bit longer to perform a moving GC and you also need enough
space to house the new hotstore while the old one is still live.
## Operation

View File

@ -81,6 +81,13 @@ type Config struct {
// - a positive integer indicates the number of finalities, outside the compaction boundary,
// for which messages will be retained in the hotstore.
HotStoreMessageRetention uint64
// HotstoreFullGCFrequency indicates how frequently (in terms of compactions) to garbage collect
// the hotstore using full (moving) GC if supported by the hotstore.
// A value of 0 disables full GC entirely.
// A positive value is the number of compactions before a full GC is performed;
// a value of 1 will perform full GC in every compaction.
HotStoreFullGCFrequency uint64
}
// ChainAccessor allows the Splitstore to access the chain. It will most likely

View File

@ -8,17 +8,22 @@ import (
)
func (s *SplitStore) gcHotstore() {
if err := s.gcBlockstoreOnline(s.hot); err != nil {
var opts []bstore.BlockstoreGCOption
if s.cfg.HotStoreFullGCFrequency > 0 && s.compactionIndex%int64(s.cfg.HotStoreFullGCFrequency) == 0 {
opts = append(opts, bstore.WithFullGC(true))
}
if err := s.gcBlockstore(s.hot, opts); err != nil {
log.Warnf("error garbage collecting hostore: %s", err)
}
}
func (s *SplitStore) gcBlockstoreOnline(b bstore.Blockstore) error {
func (s *SplitStore) gcBlockstore(b bstore.Blockstore, opts []bstore.BlockstoreGCOption) error {
if gc, ok := b.(bstore.BlockstoreGC); ok {
log.Info("garbage collecting blockstore")
startGC := time.Now()
if err := gc.CollectGarbage(); err != nil {
if err := gc.CollectGarbage(opts...); err != nil {
return err
}
@ -26,5 +31,5 @@ func (s *SplitStore) gcBlockstoreOnline(b bstore.Blockstore) error {
return nil
}
return fmt.Errorf("blockstore doesn't support online gc: %T", b)
return fmt.Errorf("blockstore doesn't support garbage collection: %T", b)
}

View File

@ -161,7 +161,7 @@ var stateTreePruneCmd = &cli.Command{
if cctx.Bool("only-ds-gc") {
fmt.Println("running datastore gc....")
for i := 0; i < cctx.Int("gc-count"); i++ {
if err := badgbs.DB.RunValueLogGC(DiscardRatio); err != nil {
if err := badgbs.DB().RunValueLogGC(DiscardRatio); err != nil {
return xerrors.Errorf("datastore GC failed: %w", err)
}
}
@ -208,7 +208,7 @@ var stateTreePruneCmd = &cli.Command{
return nil
}
b := badgbs.DB.NewWriteBatch()
b := badgbs.DB().NewWriteBatch()
defer b.Cancel()
markForRemoval := func(c cid.Cid) error {
@ -249,7 +249,7 @@ var stateTreePruneCmd = &cli.Command{
fmt.Println("running datastore gc....")
for i := 0; i < cctx.Int("gc-count"); i++ {
if err := badgbs.DB.RunValueLogGC(DiscardRatio); err != nil {
if err := badgbs.DB().RunValueLogGC(DiscardRatio); err != nil {
return xerrors.Errorf("datastore GC failed: %w", err)
}
}

View File

@ -72,6 +72,8 @@ func DefaultFullNode() *FullNode {
ColdStoreType: "universal",
HotStoreType: "badger",
MarkSetType: "map",
HotStoreFullGCFrequency: 20,
},
},
}

View File

@ -685,25 +685,37 @@ submitting proofs to the chain individually`,
Name: "ColdStoreType",
Type: "string",
Comment: ``,
Comment: `ColdStoreType specifies the type of the coldstore.
It can be "universal" (default) or "discard" for discarding cold blocks.`,
},
{
Name: "HotStoreType",
Type: "string",
Comment: ``,
Comment: `HotStoreType specifies the type of the hotstore.
Only currently supported value is "badger".`,
},
{
Name: "MarkSetType",
Type: "string",
Comment: ``,
Comment: `MarkSetType specifies the type of the markset.
It can be "map" (default) for in memory marking or "badger" for on-disk marking.`,
},
{
Name: "HotStoreMessageRetention",
Type: "uint64",
Comment: ``,
Comment: `HotStoreMessageRetention specifies the retention policy for messages, in finalities beyond
the compaction boundary; default is 0.`,
},
{
Name: "HotStoreFullGCFrequency",
Type: "uint64",
Comment: `HotStoreFullGCFrequency specifies how often to perform a full (moving) GC on the hotstore.
A value of 0 disables, while a value 1 will do full GC in every compaction.
Default is 20 (about once a week).`,
},
},
"StorageMiner": []DocField{

View File

@ -278,11 +278,23 @@ type Chainstore struct {
}
type Splitstore struct {
// ColdStoreType specifies the type of the coldstore.
// It can be "universal" (default) or "discard" for discarding cold blocks.
ColdStoreType string
// HotStoreType specifies the type of the hotstore.
// Only currently supported value is "badger".
HotStoreType string
// MarkSetType specifies the type of the markset.
// It can be "map" (default) for in memory marking or "badger" for on-disk marking.
MarkSetType string
// HotStoreMessageRetention specifies the retention policy for messages, in finalities beyond
// the compaction boundary; default is 0.
HotStoreMessageRetention uint64
// HotStoreFullGCFrequency specifies how often to perform a full (moving) GC on the hotstore.
// A value of 0 disables, while a value 1 will do full GC in every compaction.
// Default is 20 (about once a week).
HotStoreFullGCFrequency uint64
}
// // Full Node

View File

@ -81,6 +81,7 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.Locked
MarkSetType: cfg.Splitstore.MarkSetType,
DiscardColdBlocks: cfg.Splitstore.ColdStoreType == "discard",
HotStoreMessageRetention: cfg.Splitstore.HotStoreMessageRetention,
HotStoreFullGCFrequency: cfg.Splitstore.HotStoreFullGCFrequency,
}
ss, err := splitstore.Open(path, ds, hot, cold, cfg)
if err != nil {