Merge branch 'master' into jen/proof
This commit is contained in:
commit
83f7dc8c96
@ -8,9 +8,11 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"runtime"
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/dgraph-io/badger/v2"
|
"github.com/dgraph-io/badger/v2"
|
||||||
"github.com/dgraph-io/badger/v2/options"
|
"github.com/dgraph-io/badger/v2/options"
|
||||||
|
"github.com/dgraph-io/badger/v2/pb"
|
||||||
"github.com/multiformats/go-base32"
|
"github.com/multiformats/go-base32"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
@ -74,20 +76,45 @@ func (b *badgerLogger) Warningf(format string, args ...interface{}) {
|
|||||||
b.skip2.Warnf(format, args...)
|
b.skip2.Warnf(format, args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// bsState is the current blockstore state
|
||||||
|
type bsState int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
stateOpen = iota
|
// stateOpen signifies an open blockstore
|
||||||
|
stateOpen bsState = iota
|
||||||
|
// stateClosing signifies a blockstore that is currently closing
|
||||||
stateClosing
|
stateClosing
|
||||||
|
// stateClosed signifies a blockstore that has been colosed
|
||||||
stateClosed
|
stateClosed
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type bsMoveState int
|
||||||
|
|
||||||
|
const (
|
||||||
|
// moveStateNone signifies that there is no move in progress
|
||||||
|
moveStateNone bsMoveState = iota
|
||||||
|
// moveStateMoving signifies that there is a move in a progress
|
||||||
|
moveStateMoving
|
||||||
|
// moveStateCleanup signifies that a move has completed or aborted and we are cleaning up
|
||||||
|
moveStateCleanup
|
||||||
|
// moveStateLock signifies that an exclusive lock has been acquired
|
||||||
|
moveStateLock
|
||||||
|
)
|
||||||
|
|
||||||
// Blockstore is a badger-backed IPLD blockstore.
|
// Blockstore is a badger-backed IPLD blockstore.
|
||||||
type Blockstore struct {
|
type Blockstore struct {
|
||||||
stateLk sync.RWMutex
|
stateLk sync.RWMutex
|
||||||
state int
|
state bsState
|
||||||
viewers sync.WaitGroup
|
viewers sync.WaitGroup
|
||||||
|
|
||||||
DB *badger.DB
|
moveMx sync.Mutex
|
||||||
opts Options
|
moveCond sync.Cond
|
||||||
|
moveState bsMoveState
|
||||||
|
rlock int
|
||||||
|
|
||||||
|
db *badger.DB
|
||||||
|
dbNext *badger.DB // when moving
|
||||||
|
opts Options
|
||||||
|
|
||||||
prefixing bool
|
prefixing bool
|
||||||
prefix []byte
|
prefix []byte
|
||||||
@ -113,13 +140,15 @@ func Open(opts Options) (*Blockstore, error) {
|
|||||||
return nil, fmt.Errorf("failed to open badger blockstore: %w", err)
|
return nil, fmt.Errorf("failed to open badger blockstore: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
bs := &Blockstore{DB: db, opts: opts}
|
bs := &Blockstore{db: db, opts: opts}
|
||||||
if p := opts.Prefix; p != "" {
|
if p := opts.Prefix; p != "" {
|
||||||
bs.prefixing = true
|
bs.prefixing = true
|
||||||
bs.prefix = []byte(p)
|
bs.prefix = []byte(p)
|
||||||
bs.prefixLen = len(bs.prefix)
|
bs.prefixLen = len(bs.prefix)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bs.moveCond.L = &bs.moveMx
|
||||||
|
|
||||||
return bs, nil
|
return bs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -143,7 +172,7 @@ func (b *Blockstore) Close() error {
|
|||||||
// wait for all accesses to complete
|
// wait for all accesses to complete
|
||||||
b.viewers.Wait()
|
b.viewers.Wait()
|
||||||
|
|
||||||
return b.DB.Close()
|
return b.db.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Blockstore) access() error {
|
func (b *Blockstore) access() error {
|
||||||
@ -165,12 +194,225 @@ func (b *Blockstore) isOpen() bool {
|
|||||||
return b.state == stateOpen
|
return b.state == stateOpen
|
||||||
}
|
}
|
||||||
|
|
||||||
// CollectGarbage runs garbage collection on the value log
|
// lockDB/unlockDB implement a recursive lock contingent on move state
|
||||||
func (b *Blockstore) CollectGarbage() error {
|
func (b *Blockstore) lockDB() {
|
||||||
if err := b.access(); err != nil {
|
b.moveMx.Lock()
|
||||||
return err
|
defer b.moveMx.Unlock()
|
||||||
|
|
||||||
|
if b.rlock == 0 {
|
||||||
|
for b.moveState == moveStateLock {
|
||||||
|
b.moveCond.Wait()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
defer b.viewers.Done()
|
|
||||||
|
b.rlock++
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Blockstore) unlockDB() {
|
||||||
|
b.moveMx.Lock()
|
||||||
|
defer b.moveMx.Unlock()
|
||||||
|
|
||||||
|
b.rlock--
|
||||||
|
if b.rlock == 0 && b.moveState == moveStateLock {
|
||||||
|
b.moveCond.Broadcast()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// lockMove/unlockMove implement an exclusive lock of move state
|
||||||
|
func (b *Blockstore) lockMove() {
|
||||||
|
b.moveMx.Lock()
|
||||||
|
b.moveState = moveStateLock
|
||||||
|
for b.rlock > 0 {
|
||||||
|
b.moveCond.Wait()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Blockstore) unlockMove(state bsMoveState) {
|
||||||
|
b.moveState = state
|
||||||
|
b.moveCond.Broadcast()
|
||||||
|
b.moveMx.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// movingGC moves the blockstore to a new path, adjacent to the current path, and creates
|
||||||
|
// a symlink from the current path to the new path; the old blockstore is deleted.
|
||||||
|
//
|
||||||
|
// The blockstore MUST accept new writes during the move and ensure that these
|
||||||
|
// are persisted to the new blockstore; if a failure occurs aboring the move,
|
||||||
|
// then they must be peristed to the old blockstore.
|
||||||
|
// In short, the blockstore must not lose data from new writes during the move.
|
||||||
|
func (b *Blockstore) movingGC() error {
|
||||||
|
// this inlines moveLock/moveUnlock for the initial state check to prevent a second move
|
||||||
|
// while one is in progress without clobbering state
|
||||||
|
b.moveMx.Lock()
|
||||||
|
if b.moveState != moveStateNone {
|
||||||
|
b.moveMx.Unlock()
|
||||||
|
return fmt.Errorf("move in progress")
|
||||||
|
}
|
||||||
|
|
||||||
|
b.moveState = moveStateLock
|
||||||
|
for b.rlock > 0 {
|
||||||
|
b.moveCond.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
b.moveState = moveStateMoving
|
||||||
|
b.moveCond.Broadcast()
|
||||||
|
b.moveMx.Unlock()
|
||||||
|
|
||||||
|
var path string
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
b.lockMove()
|
||||||
|
|
||||||
|
db2 := b.dbNext
|
||||||
|
b.dbNext = nil
|
||||||
|
|
||||||
|
var state bsMoveState
|
||||||
|
if db2 != nil {
|
||||||
|
state = moveStateCleanup
|
||||||
|
} else {
|
||||||
|
state = moveStateNone
|
||||||
|
}
|
||||||
|
|
||||||
|
b.unlockMove(state)
|
||||||
|
|
||||||
|
if db2 != nil {
|
||||||
|
err := db2.Close()
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("error closing badger db: %s", err)
|
||||||
|
}
|
||||||
|
b.deleteDB(path)
|
||||||
|
|
||||||
|
b.lockMove()
|
||||||
|
b.unlockMove(moveStateNone)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// we resolve symlinks to create the new path in the adjacent to the old path.
|
||||||
|
// this allows the user to symlink the db directory into a separate filesystem.
|
||||||
|
basePath := b.opts.Dir
|
||||||
|
linkPath, err := filepath.EvalSymlinks(basePath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error resolving symlink %s: %w", basePath, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if basePath == linkPath {
|
||||||
|
path = basePath
|
||||||
|
} else {
|
||||||
|
name := filepath.Base(basePath)
|
||||||
|
dir := filepath.Dir(linkPath)
|
||||||
|
path = filepath.Join(dir, name)
|
||||||
|
}
|
||||||
|
path = fmt.Sprintf("%s.%d", path, time.Now().UnixNano())
|
||||||
|
|
||||||
|
log.Infof("moving blockstore from %s to %s", b.opts.Dir, path)
|
||||||
|
|
||||||
|
opts := b.opts
|
||||||
|
opts.Dir = path
|
||||||
|
opts.ValueDir = path
|
||||||
|
|
||||||
|
db2, err := badger.Open(opts.Options)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to open badger blockstore in %s: %w", path, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
b.lockMove()
|
||||||
|
b.dbNext = db2
|
||||||
|
b.unlockMove(moveStateMoving)
|
||||||
|
|
||||||
|
log.Info("copying blockstore")
|
||||||
|
err = b.doCopy(b.db, b.dbNext)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error moving badger blockstore to %s: %w", path, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
b.lockMove()
|
||||||
|
db1 := b.db
|
||||||
|
b.db = b.dbNext
|
||||||
|
b.dbNext = nil
|
||||||
|
b.unlockMove(moveStateCleanup)
|
||||||
|
|
||||||
|
err = db1.Close()
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("error closing old badger db: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
dbpath := b.opts.Dir
|
||||||
|
oldpath := fmt.Sprintf("%s.old.%d", dbpath, time.Now().Unix())
|
||||||
|
|
||||||
|
if err = os.Rename(dbpath, oldpath); err != nil {
|
||||||
|
// this is not catastrophic in the sense that we have not lost any data.
|
||||||
|
// but it is pretty bad, as the db path points to the old db, while we are now using to the new
|
||||||
|
// db; we can't continue and leave a ticking bomb for the next restart.
|
||||||
|
// so a panic is appropriate and user can fix.
|
||||||
|
panic(fmt.Errorf("error renaming old badger db dir from %s to %s: %w; USER ACTION REQUIRED", dbpath, oldpath, err)) //nolint
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = os.Symlink(path, dbpath); err != nil {
|
||||||
|
// same here; the db path is pointing to the void. panic and let the user fix.
|
||||||
|
panic(fmt.Errorf("error symlinking new badger db dir from %s to %s: %w; USER ACTION REQUIRED", path, dbpath, err)) //nolint
|
||||||
|
}
|
||||||
|
|
||||||
|
b.deleteDB(oldpath)
|
||||||
|
|
||||||
|
log.Info("moving blockstore done")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// doCopy copies a badger blockstore to another, with an optional filter; if the filter
|
||||||
|
// is not nil, then only cids that satisfy the filter will be copied.
|
||||||
|
func (b *Blockstore) doCopy(from, to *badger.DB) error {
|
||||||
|
workers := runtime.NumCPU() / 2
|
||||||
|
if workers < 2 {
|
||||||
|
workers = 2
|
||||||
|
}
|
||||||
|
|
||||||
|
stream := from.NewStream()
|
||||||
|
stream.NumGo = workers
|
||||||
|
stream.LogPrefix = "doCopy"
|
||||||
|
stream.Send = func(list *pb.KVList) error {
|
||||||
|
batch := to.NewWriteBatch()
|
||||||
|
defer batch.Cancel()
|
||||||
|
|
||||||
|
for _, kv := range list.Kv {
|
||||||
|
if kv.Key == nil || kv.Value == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := batch.Set(kv.Key, kv.Value); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return batch.Flush()
|
||||||
|
}
|
||||||
|
|
||||||
|
return stream.Orchestrate(context.Background())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Blockstore) deleteDB(path string) {
|
||||||
|
// follow symbolic links, otherwise the data wil be left behind
|
||||||
|
lpath, err := filepath.EvalSymlinks(path)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("error resolving symlinks in %s", path)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("removing data directory %s", lpath)
|
||||||
|
if err := os.RemoveAll(lpath); err != nil {
|
||||||
|
log.Warnf("error deleting db at %s: %s", lpath, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if path != lpath {
|
||||||
|
log.Infof("removing link %s", path)
|
||||||
|
if err := os.Remove(path); err != nil {
|
||||||
|
log.Warnf("error removing symbolic link %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Blockstore) onlineGC() error {
|
||||||
|
b.lockDB()
|
||||||
|
defer b.unlockDB()
|
||||||
|
|
||||||
// compact first to gather the necessary statistics for GC
|
// compact first to gather the necessary statistics for GC
|
||||||
nworkers := runtime.NumCPU() / 2
|
nworkers := runtime.NumCPU() / 2
|
||||||
@ -178,13 +420,13 @@ func (b *Blockstore) CollectGarbage() error {
|
|||||||
nworkers = 2
|
nworkers = 2
|
||||||
}
|
}
|
||||||
|
|
||||||
err := b.DB.Flatten(nworkers)
|
err := b.db.Flatten(nworkers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for err == nil {
|
for err == nil {
|
||||||
err = b.DB.RunValueLogGC(0.125)
|
err = b.db.RunValueLogGC(0.125)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err == badger.ErrNoRewrite {
|
if err == badger.ErrNoRewrite {
|
||||||
@ -195,6 +437,29 @@ func (b *Blockstore) CollectGarbage() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CollectGarbage compacts and runs garbage collection on the value log;
|
||||||
|
// implements the BlockstoreGC trait
|
||||||
|
func (b *Blockstore) CollectGarbage(opts ...blockstore.BlockstoreGCOption) error {
|
||||||
|
if err := b.access(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer b.viewers.Done()
|
||||||
|
|
||||||
|
var options blockstore.BlockstoreGCOptions
|
||||||
|
for _, opt := range opts {
|
||||||
|
err := opt(&options)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if options.FullGC {
|
||||||
|
return b.movingGC()
|
||||||
|
}
|
||||||
|
|
||||||
|
return b.onlineGC()
|
||||||
|
}
|
||||||
|
|
||||||
// Size returns the aggregate size of the blockstore
|
// Size returns the aggregate size of the blockstore
|
||||||
func (b *Blockstore) Size() (int64, error) {
|
func (b *Blockstore) Size() (int64, error) {
|
||||||
if err := b.access(); err != nil {
|
if err := b.access(); err != nil {
|
||||||
@ -202,7 +467,10 @@ func (b *Blockstore) Size() (int64, error) {
|
|||||||
}
|
}
|
||||||
defer b.viewers.Done()
|
defer b.viewers.Done()
|
||||||
|
|
||||||
lsm, vlog := b.DB.Size()
|
b.lockDB()
|
||||||
|
defer b.unlockDB()
|
||||||
|
|
||||||
|
lsm, vlog := b.db.Size()
|
||||||
size := lsm + vlog
|
size := lsm + vlog
|
||||||
|
|
||||||
if size == 0 {
|
if size == 0 {
|
||||||
@ -234,12 +502,15 @@ func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error {
|
|||||||
}
|
}
|
||||||
defer b.viewers.Done()
|
defer b.viewers.Done()
|
||||||
|
|
||||||
|
b.lockDB()
|
||||||
|
defer b.unlockDB()
|
||||||
|
|
||||||
k, pooled := b.PooledStorageKey(cid)
|
k, pooled := b.PooledStorageKey(cid)
|
||||||
if pooled {
|
if pooled {
|
||||||
defer KeyPool.Put(k)
|
defer KeyPool.Put(k)
|
||||||
}
|
}
|
||||||
|
|
||||||
return b.DB.View(func(txn *badger.Txn) error {
|
return b.db.View(func(txn *badger.Txn) error {
|
||||||
switch item, err := txn.Get(k); err {
|
switch item, err := txn.Get(k); err {
|
||||||
case nil:
|
case nil:
|
||||||
return item.Value(fn)
|
return item.Value(fn)
|
||||||
@ -258,12 +529,15 @@ func (b *Blockstore) Has(cid cid.Cid) (bool, error) {
|
|||||||
}
|
}
|
||||||
defer b.viewers.Done()
|
defer b.viewers.Done()
|
||||||
|
|
||||||
|
b.lockDB()
|
||||||
|
defer b.unlockDB()
|
||||||
|
|
||||||
k, pooled := b.PooledStorageKey(cid)
|
k, pooled := b.PooledStorageKey(cid)
|
||||||
if pooled {
|
if pooled {
|
||||||
defer KeyPool.Put(k)
|
defer KeyPool.Put(k)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := b.DB.View(func(txn *badger.Txn) error {
|
err := b.db.View(func(txn *badger.Txn) error {
|
||||||
_, err := txn.Get(k)
|
_, err := txn.Get(k)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
@ -289,13 +563,16 @@ func (b *Blockstore) Get(cid cid.Cid) (blocks.Block, error) {
|
|||||||
}
|
}
|
||||||
defer b.viewers.Done()
|
defer b.viewers.Done()
|
||||||
|
|
||||||
|
b.lockDB()
|
||||||
|
defer b.unlockDB()
|
||||||
|
|
||||||
k, pooled := b.PooledStorageKey(cid)
|
k, pooled := b.PooledStorageKey(cid)
|
||||||
if pooled {
|
if pooled {
|
||||||
defer KeyPool.Put(k)
|
defer KeyPool.Put(k)
|
||||||
}
|
}
|
||||||
|
|
||||||
var val []byte
|
var val []byte
|
||||||
err := b.DB.View(func(txn *badger.Txn) error {
|
err := b.db.View(func(txn *badger.Txn) error {
|
||||||
switch item, err := txn.Get(k); err {
|
switch item, err := txn.Get(k); err {
|
||||||
case nil:
|
case nil:
|
||||||
val, err = item.ValueCopy(nil)
|
val, err = item.ValueCopy(nil)
|
||||||
@ -319,13 +596,16 @@ func (b *Blockstore) GetSize(cid cid.Cid) (int, error) {
|
|||||||
}
|
}
|
||||||
defer b.viewers.Done()
|
defer b.viewers.Done()
|
||||||
|
|
||||||
|
b.lockDB()
|
||||||
|
defer b.unlockDB()
|
||||||
|
|
||||||
k, pooled := b.PooledStorageKey(cid)
|
k, pooled := b.PooledStorageKey(cid)
|
||||||
if pooled {
|
if pooled {
|
||||||
defer KeyPool.Put(k)
|
defer KeyPool.Put(k)
|
||||||
}
|
}
|
||||||
|
|
||||||
var size int
|
var size int
|
||||||
err := b.DB.View(func(txn *badger.Txn) error {
|
err := b.db.View(func(txn *badger.Txn) error {
|
||||||
switch item, err := txn.Get(k); err {
|
switch item, err := txn.Get(k); err {
|
||||||
case nil:
|
case nil:
|
||||||
size = int(item.ValueSize())
|
size = int(item.ValueSize())
|
||||||
@ -349,18 +629,36 @@ func (b *Blockstore) Put(block blocks.Block) error {
|
|||||||
}
|
}
|
||||||
defer b.viewers.Done()
|
defer b.viewers.Done()
|
||||||
|
|
||||||
|
b.lockDB()
|
||||||
|
defer b.unlockDB()
|
||||||
|
|
||||||
k, pooled := b.PooledStorageKey(block.Cid())
|
k, pooled := b.PooledStorageKey(block.Cid())
|
||||||
if pooled {
|
if pooled {
|
||||||
defer KeyPool.Put(k)
|
defer KeyPool.Put(k)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := b.DB.Update(func(txn *badger.Txn) error {
|
put := func(db *badger.DB) error {
|
||||||
return txn.Set(k, block.RawData())
|
err := db.Update(func(txn *badger.Txn) error {
|
||||||
})
|
return txn.Set(k, block.RawData())
|
||||||
if err != nil {
|
})
|
||||||
err = fmt.Errorf("failed to put block in badger blockstore: %w", err)
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to put block in badger blockstore: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
return err
|
|
||||||
|
if err := put(b.db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if b.dbNext != nil {
|
||||||
|
if err := put(b.dbNext); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutMany implements Blockstore.PutMany.
|
// PutMany implements Blockstore.PutMany.
|
||||||
@ -370,6 +668,9 @@ func (b *Blockstore) PutMany(blocks []blocks.Block) error {
|
|||||||
}
|
}
|
||||||
defer b.viewers.Done()
|
defer b.viewers.Done()
|
||||||
|
|
||||||
|
b.lockDB()
|
||||||
|
defer b.unlockDB()
|
||||||
|
|
||||||
// toReturn tracks the byte slices to return to the pool, if we're using key
|
// toReturn tracks the byte slices to return to the pool, if we're using key
|
||||||
// prefixing. we can't return each slice to the pool after each Set, because
|
// prefixing. we can't return each slice to the pool after each Set, because
|
||||||
// badger holds on to the slice.
|
// badger holds on to the slice.
|
||||||
@ -383,24 +684,45 @@ func (b *Blockstore) PutMany(blocks []blocks.Block) error {
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
batch := b.DB.NewWriteBatch()
|
keys := make([][]byte, 0, len(blocks))
|
||||||
defer batch.Cancel()
|
|
||||||
|
|
||||||
for _, block := range blocks {
|
for _, block := range blocks {
|
||||||
k, pooled := b.PooledStorageKey(block.Cid())
|
k, pooled := b.PooledStorageKey(block.Cid())
|
||||||
if pooled {
|
if pooled {
|
||||||
toReturn = append(toReturn, k)
|
toReturn = append(toReturn, k)
|
||||||
}
|
}
|
||||||
if err := batch.Set(k, block.RawData()); err != nil {
|
keys = append(keys, k)
|
||||||
|
}
|
||||||
|
|
||||||
|
put := func(db *badger.DB) error {
|
||||||
|
batch := db.NewWriteBatch()
|
||||||
|
defer batch.Cancel()
|
||||||
|
|
||||||
|
for i, block := range blocks {
|
||||||
|
k := keys[i]
|
||||||
|
if err := batch.Set(k, block.RawData()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err := batch.Flush()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to put blocks in badger blockstore: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := put(b.db); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if b.dbNext != nil {
|
||||||
|
if err := put(b.dbNext); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err := batch.Flush()
|
return nil
|
||||||
if err != nil {
|
|
||||||
err = fmt.Errorf("failed to put blocks in badger blockstore: %w", err)
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteBlock implements Blockstore.DeleteBlock.
|
// DeleteBlock implements Blockstore.DeleteBlock.
|
||||||
@ -410,12 +732,15 @@ func (b *Blockstore) DeleteBlock(cid cid.Cid) error {
|
|||||||
}
|
}
|
||||||
defer b.viewers.Done()
|
defer b.viewers.Done()
|
||||||
|
|
||||||
|
b.lockDB()
|
||||||
|
defer b.unlockDB()
|
||||||
|
|
||||||
k, pooled := b.PooledStorageKey(cid)
|
k, pooled := b.PooledStorageKey(cid)
|
||||||
if pooled {
|
if pooled {
|
||||||
defer KeyPool.Put(k)
|
defer KeyPool.Put(k)
|
||||||
}
|
}
|
||||||
|
|
||||||
return b.DB.Update(func(txn *badger.Txn) error {
|
return b.db.Update(func(txn *badger.Txn) error {
|
||||||
return txn.Delete(k)
|
return txn.Delete(k)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -426,6 +751,9 @@ func (b *Blockstore) DeleteMany(cids []cid.Cid) error {
|
|||||||
}
|
}
|
||||||
defer b.viewers.Done()
|
defer b.viewers.Done()
|
||||||
|
|
||||||
|
b.lockDB()
|
||||||
|
defer b.unlockDB()
|
||||||
|
|
||||||
// toReturn tracks the byte slices to return to the pool, if we're using key
|
// toReturn tracks the byte slices to return to the pool, if we're using key
|
||||||
// prefixing. we can't return each slice to the pool after each Set, because
|
// prefixing. we can't return each slice to the pool after each Set, because
|
||||||
// badger holds on to the slice.
|
// badger holds on to the slice.
|
||||||
@ -439,7 +767,7 @@ func (b *Blockstore) DeleteMany(cids []cid.Cid) error {
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
batch := b.DB.NewWriteBatch()
|
batch := b.db.NewWriteBatch()
|
||||||
defer batch.Cancel()
|
defer batch.Cancel()
|
||||||
|
|
||||||
for _, cid := range cids {
|
for _, cid := range cids {
|
||||||
@ -465,7 +793,10 @@ func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
txn := b.DB.NewTransaction(false)
|
b.lockDB()
|
||||||
|
defer b.unlockDB()
|
||||||
|
|
||||||
|
txn := b.db.NewTransaction(false)
|
||||||
opts := badger.IteratorOptions{PrefetchSize: 100}
|
opts := badger.IteratorOptions{PrefetchSize: 100}
|
||||||
if b.prefixing {
|
if b.prefixing {
|
||||||
opts.Prefix = b.prefix
|
opts.Prefix = b.prefix
|
||||||
@ -519,7 +850,10 @@ func (b *Blockstore) ForEachKey(f func(cid.Cid) error) error {
|
|||||||
}
|
}
|
||||||
defer b.viewers.Done()
|
defer b.viewers.Done()
|
||||||
|
|
||||||
txn := b.DB.NewTransaction(false)
|
b.lockDB()
|
||||||
|
defer b.unlockDB()
|
||||||
|
|
||||||
|
txn := b.db.NewTransaction(false)
|
||||||
defer txn.Discard()
|
defer txn.Discard()
|
||||||
|
|
||||||
opts := badger.IteratorOptions{PrefetchSize: 100}
|
opts := badger.IteratorOptions{PrefetchSize: 100}
|
||||||
@ -614,3 +948,9 @@ func (b *Blockstore) StorageKey(dst []byte, cid cid.Cid) []byte {
|
|||||||
}
|
}
|
||||||
return dst[:reqsize]
|
return dst[:reqsize]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// this method is added for lotus-shed needs
|
||||||
|
// WARNING: THIS IS COMPLETELY UNSAFE; DONT USE THIS IN PRODUCTION CODE
|
||||||
|
func (b *Blockstore) DB() *badger.DB {
|
||||||
|
return b.db
|
||||||
|
}
|
||||||
|
@ -1,12 +1,19 @@
|
|||||||
package badgerbs
|
package badgerbs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
blocks "github.com/ipfs/go-block-format"
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
|
||||||
|
blocks "github.com/ipfs/go-block-format"
|
||||||
|
cid "github.com/ipfs/go-cid"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/blockstore"
|
"github.com/filecoin-project/lotus/blockstore"
|
||||||
)
|
)
|
||||||
@ -89,3 +96,165 @@ func openBlockstore(optsSupplier func(path string) Options) func(tb testing.TB,
|
|||||||
return Open(optsSupplier(path))
|
return Open(optsSupplier(path))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func testMove(t *testing.T, optsF func(string) Options) {
|
||||||
|
basePath, err := ioutil.TempDir("", "")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
dbPath := filepath.Join(basePath, "db")
|
||||||
|
|
||||||
|
t.Cleanup(func() {
|
||||||
|
_ = os.RemoveAll(basePath)
|
||||||
|
})
|
||||||
|
|
||||||
|
db, err := Open(optsF(dbPath))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer db.Close() //nolint
|
||||||
|
|
||||||
|
var have []blocks.Block
|
||||||
|
var deleted []cid.Cid
|
||||||
|
|
||||||
|
// add some blocks
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
blk := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i)))
|
||||||
|
err := db.Put(blk)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
have = append(have, blk)
|
||||||
|
}
|
||||||
|
|
||||||
|
// delete some of them
|
||||||
|
for i := 5; i < 10; i++ {
|
||||||
|
c := have[i].Cid()
|
||||||
|
err := db.DeleteBlock(c)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
deleted = append(deleted, c)
|
||||||
|
}
|
||||||
|
have = have[:5]
|
||||||
|
|
||||||
|
// start a move concurrent with some more puts
|
||||||
|
g := new(errgroup.Group)
|
||||||
|
g.Go(func() error {
|
||||||
|
for i := 10; i < 1000; i++ {
|
||||||
|
blk := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i)))
|
||||||
|
err := db.Put(blk)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
have = append(have, blk)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
g.Go(func() error {
|
||||||
|
return db.CollectGarbage(blockstore.WithFullGC(true))
|
||||||
|
})
|
||||||
|
|
||||||
|
err = g.Wait()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// now check that we have all the blocks in have and none in the deleted lists
|
||||||
|
checkBlocks := func() {
|
||||||
|
for _, blk := range have {
|
||||||
|
has, err := db.Has(blk.Cid())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !has {
|
||||||
|
t.Fatal("missing block")
|
||||||
|
}
|
||||||
|
|
||||||
|
blk2, err := db.Get(blk.Cid())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !bytes.Equal(blk.RawData(), blk2.RawData()) {
|
||||||
|
t.Fatal("data mismatch")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, c := range deleted {
|
||||||
|
has, err := db.Has(c)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if has {
|
||||||
|
t.Fatal("resurrected block")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
checkBlocks()
|
||||||
|
|
||||||
|
// check the basePath -- it should contain a directory with name db.{timestamp}, soft-linked
|
||||||
|
// to db and nothing else
|
||||||
|
checkPath := func() {
|
||||||
|
entries, err := os.ReadDir(basePath)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(entries) != 2 {
|
||||||
|
t.Fatalf("too many entries; expected %d but got %d", 2, len(entries))
|
||||||
|
}
|
||||||
|
|
||||||
|
var haveDB, haveDBLink bool
|
||||||
|
for _, e := range entries {
|
||||||
|
if e.Name() == "db" {
|
||||||
|
if (e.Type() & os.ModeSymlink) == 0 {
|
||||||
|
t.Fatal("found db, but it's not a symlink")
|
||||||
|
}
|
||||||
|
haveDBLink = true
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if strings.HasPrefix(e.Name(), "db.") {
|
||||||
|
if !e.Type().IsDir() {
|
||||||
|
t.Fatal("found db prefix, but it's not a directory")
|
||||||
|
}
|
||||||
|
haveDB = true
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !haveDB {
|
||||||
|
t.Fatal("db directory is missing")
|
||||||
|
}
|
||||||
|
if !haveDBLink {
|
||||||
|
t.Fatal("db link is missing")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
checkPath()
|
||||||
|
|
||||||
|
// now do another FullGC to test the double move and following of symlinks
|
||||||
|
if err := db.CollectGarbage(blockstore.WithFullGC(true)); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
checkBlocks()
|
||||||
|
checkPath()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMoveNoPrefix(t *testing.T) {
|
||||||
|
testMove(t, DefaultOptions)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMoveWithPrefix(t *testing.T) {
|
||||||
|
testMove(t, func(path string) Options {
|
||||||
|
opts := DefaultOptions(path)
|
||||||
|
opts.Prefix = "/prefixed/"
|
||||||
|
return opts
|
||||||
|
})
|
||||||
|
}
|
||||||
|
@ -37,7 +37,22 @@ type BlockstoreIterator interface {
|
|||||||
|
|
||||||
// BlockstoreGC is a trait for blockstores that support online garbage collection
|
// BlockstoreGC is a trait for blockstores that support online garbage collection
|
||||||
type BlockstoreGC interface {
|
type BlockstoreGC interface {
|
||||||
CollectGarbage() error
|
CollectGarbage(options ...BlockstoreGCOption) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// BlockstoreGCOption is a functional interface for controlling blockstore GC options
|
||||||
|
type BlockstoreGCOption = func(*BlockstoreGCOptions) error
|
||||||
|
|
||||||
|
// BlockstoreGCOptions is a struct with GC options
|
||||||
|
type BlockstoreGCOptions struct {
|
||||||
|
FullGC bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithFullGC(fullgc bool) BlockstoreGCOption {
|
||||||
|
return func(opts *BlockstoreGCOptions) error {
|
||||||
|
opts.FullGC = fullgc
|
||||||
|
return nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// BlockstoreSize is a trait for on-disk blockstores that can report their size
|
// BlockstoreSize is a trait for on-disk blockstores that can report their size
|
||||||
|
@ -59,6 +59,15 @@ These are options in the `[Chainstore.Splitstore]` section of the configuration:
|
|||||||
nodes beyond 4 finalities, while running with the discard coldstore option.
|
nodes beyond 4 finalities, while running with the discard coldstore option.
|
||||||
It is also useful for miners who accept deals and need to lookback messages beyond
|
It is also useful for miners who accept deals and need to lookback messages beyond
|
||||||
the 4 finalities, which would otherwise hit the coldstore.
|
the 4 finalities, which would otherwise hit the coldstore.
|
||||||
|
- `HotStoreFullGCFrequency` -- specifies how frequenty to garbage collect the hotstore
|
||||||
|
using full (moving) GC.
|
||||||
|
The default value is 20, which uses full GC every 20 compactions (about once a week);
|
||||||
|
set to 0 to disable full GC altogether.
|
||||||
|
Rationale: badger supports online GC, and this is used by default. However it has proven to
|
||||||
|
be ineffective in practice with the hotstore size slowly creeping up. In order to address this,
|
||||||
|
we have added moving GC support in our badger wrapper, which can effectively reclaim all space.
|
||||||
|
The downside is that it takes a bit longer to perform a moving GC and you also need enough
|
||||||
|
space to house the new hotstore while the old one is still live.
|
||||||
|
|
||||||
|
|
||||||
## Operation
|
## Operation
|
||||||
|
@ -81,6 +81,13 @@ type Config struct {
|
|||||||
// - a positive integer indicates the number of finalities, outside the compaction boundary,
|
// - a positive integer indicates the number of finalities, outside the compaction boundary,
|
||||||
// for which messages will be retained in the hotstore.
|
// for which messages will be retained in the hotstore.
|
||||||
HotStoreMessageRetention uint64
|
HotStoreMessageRetention uint64
|
||||||
|
|
||||||
|
// HotstoreFullGCFrequency indicates how frequently (in terms of compactions) to garbage collect
|
||||||
|
// the hotstore using full (moving) GC if supported by the hotstore.
|
||||||
|
// A value of 0 disables full GC entirely.
|
||||||
|
// A positive value is the number of compactions before a full GC is performed;
|
||||||
|
// a value of 1 will perform full GC in every compaction.
|
||||||
|
HotStoreFullGCFrequency uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// ChainAccessor allows the Splitstore to access the chain. It will most likely
|
// ChainAccessor allows the Splitstore to access the chain. It will most likely
|
||||||
|
@ -8,17 +8,22 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func (s *SplitStore) gcHotstore() {
|
func (s *SplitStore) gcHotstore() {
|
||||||
if err := s.gcBlockstoreOnline(s.hot); err != nil {
|
var opts []bstore.BlockstoreGCOption
|
||||||
|
if s.cfg.HotStoreFullGCFrequency > 0 && s.compactionIndex%int64(s.cfg.HotStoreFullGCFrequency) == 0 {
|
||||||
|
opts = append(opts, bstore.WithFullGC(true))
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.gcBlockstore(s.hot, opts); err != nil {
|
||||||
log.Warnf("error garbage collecting hostore: %s", err)
|
log.Warnf("error garbage collecting hostore: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SplitStore) gcBlockstoreOnline(b bstore.Blockstore) error {
|
func (s *SplitStore) gcBlockstore(b bstore.Blockstore, opts []bstore.BlockstoreGCOption) error {
|
||||||
if gc, ok := b.(bstore.BlockstoreGC); ok {
|
if gc, ok := b.(bstore.BlockstoreGC); ok {
|
||||||
log.Info("garbage collecting blockstore")
|
log.Info("garbage collecting blockstore")
|
||||||
startGC := time.Now()
|
startGC := time.Now()
|
||||||
|
|
||||||
if err := gc.CollectGarbage(); err != nil {
|
if err := gc.CollectGarbage(opts...); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -26,5 +31,5 @@ func (s *SplitStore) gcBlockstoreOnline(b bstore.Blockstore) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return fmt.Errorf("blockstore doesn't support online gc: %T", b)
|
return fmt.Errorf("blockstore doesn't support garbage collection: %T", b)
|
||||||
}
|
}
|
||||||
|
@ -161,7 +161,7 @@ var stateTreePruneCmd = &cli.Command{
|
|||||||
if cctx.Bool("only-ds-gc") {
|
if cctx.Bool("only-ds-gc") {
|
||||||
fmt.Println("running datastore gc....")
|
fmt.Println("running datastore gc....")
|
||||||
for i := 0; i < cctx.Int("gc-count"); i++ {
|
for i := 0; i < cctx.Int("gc-count"); i++ {
|
||||||
if err := badgbs.DB.RunValueLogGC(DiscardRatio); err != nil {
|
if err := badgbs.DB().RunValueLogGC(DiscardRatio); err != nil {
|
||||||
return xerrors.Errorf("datastore GC failed: %w", err)
|
return xerrors.Errorf("datastore GC failed: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -208,7 +208,7 @@ var stateTreePruneCmd = &cli.Command{
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
b := badgbs.DB.NewWriteBatch()
|
b := badgbs.DB().NewWriteBatch()
|
||||||
defer b.Cancel()
|
defer b.Cancel()
|
||||||
|
|
||||||
markForRemoval := func(c cid.Cid) error {
|
markForRemoval := func(c cid.Cid) error {
|
||||||
@ -249,7 +249,7 @@ var stateTreePruneCmd = &cli.Command{
|
|||||||
|
|
||||||
fmt.Println("running datastore gc....")
|
fmt.Println("running datastore gc....")
|
||||||
for i := 0; i < cctx.Int("gc-count"); i++ {
|
for i := 0; i < cctx.Int("gc-count"); i++ {
|
||||||
if err := badgbs.DB.RunValueLogGC(DiscardRatio); err != nil {
|
if err := badgbs.DB().RunValueLogGC(DiscardRatio); err != nil {
|
||||||
return xerrors.Errorf("datastore GC failed: %w", err)
|
return xerrors.Errorf("datastore GC failed: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -72,6 +72,8 @@ func DefaultFullNode() *FullNode {
|
|||||||
ColdStoreType: "universal",
|
ColdStoreType: "universal",
|
||||||
HotStoreType: "badger",
|
HotStoreType: "badger",
|
||||||
MarkSetType: "map",
|
MarkSetType: "map",
|
||||||
|
|
||||||
|
HotStoreFullGCFrequency: 20,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -685,25 +685,37 @@ submitting proofs to the chain individually`,
|
|||||||
Name: "ColdStoreType",
|
Name: "ColdStoreType",
|
||||||
Type: "string",
|
Type: "string",
|
||||||
|
|
||||||
Comment: ``,
|
Comment: `ColdStoreType specifies the type of the coldstore.
|
||||||
|
It can be "universal" (default) or "discard" for discarding cold blocks.`,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: "HotStoreType",
|
Name: "HotStoreType",
|
||||||
Type: "string",
|
Type: "string",
|
||||||
|
|
||||||
Comment: ``,
|
Comment: `HotStoreType specifies the type of the hotstore.
|
||||||
|
Only currently supported value is "badger".`,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: "MarkSetType",
|
Name: "MarkSetType",
|
||||||
Type: "string",
|
Type: "string",
|
||||||
|
|
||||||
Comment: ``,
|
Comment: `MarkSetType specifies the type of the markset.
|
||||||
|
It can be "map" (default) for in memory marking or "badger" for on-disk marking.`,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: "HotStoreMessageRetention",
|
Name: "HotStoreMessageRetention",
|
||||||
Type: "uint64",
|
Type: "uint64",
|
||||||
|
|
||||||
Comment: ``,
|
Comment: `HotStoreMessageRetention specifies the retention policy for messages, in finalities beyond
|
||||||
|
the compaction boundary; default is 0.`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "HotStoreFullGCFrequency",
|
||||||
|
Type: "uint64",
|
||||||
|
|
||||||
|
Comment: `HotStoreFullGCFrequency specifies how often to perform a full (moving) GC on the hotstore.
|
||||||
|
A value of 0 disables, while a value 1 will do full GC in every compaction.
|
||||||
|
Default is 20 (about once a week).`,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"StorageMiner": []DocField{
|
"StorageMiner": []DocField{
|
||||||
|
@ -278,11 +278,23 @@ type Chainstore struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Splitstore struct {
|
type Splitstore struct {
|
||||||
|
// ColdStoreType specifies the type of the coldstore.
|
||||||
|
// It can be "universal" (default) or "discard" for discarding cold blocks.
|
||||||
ColdStoreType string
|
ColdStoreType string
|
||||||
HotStoreType string
|
// HotStoreType specifies the type of the hotstore.
|
||||||
MarkSetType string
|
// Only currently supported value is "badger".
|
||||||
|
HotStoreType string
|
||||||
|
// MarkSetType specifies the type of the markset.
|
||||||
|
// It can be "map" (default) for in memory marking or "badger" for on-disk marking.
|
||||||
|
MarkSetType string
|
||||||
|
|
||||||
|
// HotStoreMessageRetention specifies the retention policy for messages, in finalities beyond
|
||||||
|
// the compaction boundary; default is 0.
|
||||||
HotStoreMessageRetention uint64
|
HotStoreMessageRetention uint64
|
||||||
|
// HotStoreFullGCFrequency specifies how often to perform a full (moving) GC on the hotstore.
|
||||||
|
// A value of 0 disables, while a value 1 will do full GC in every compaction.
|
||||||
|
// Default is 20 (about once a week).
|
||||||
|
HotStoreFullGCFrequency uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// // Full Node
|
// // Full Node
|
||||||
|
@ -81,6 +81,7 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.Locked
|
|||||||
MarkSetType: cfg.Splitstore.MarkSetType,
|
MarkSetType: cfg.Splitstore.MarkSetType,
|
||||||
DiscardColdBlocks: cfg.Splitstore.ColdStoreType == "discard",
|
DiscardColdBlocks: cfg.Splitstore.ColdStoreType == "discard",
|
||||||
HotStoreMessageRetention: cfg.Splitstore.HotStoreMessageRetention,
|
HotStoreMessageRetention: cfg.Splitstore.HotStoreMessageRetention,
|
||||||
|
HotStoreFullGCFrequency: cfg.Splitstore.HotStoreFullGCFrequency,
|
||||||
}
|
}
|
||||||
ss, err := splitstore.Open(path, ds, hot, cold, cfg)
|
ss, err := splitstore.Open(path, ds, hot, cold, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user