diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go
index f8d077760..8e1a3a1ff 100644
--- a/blockstore/badger/blockstore.go
+++ b/blockstore/badger/blockstore.go
@@ -8,9 +8,11 @@ import (
 	"path/filepath"
 	"runtime"
 	"sync"
+	"time"
 
 	"github.com/dgraph-io/badger/v2"
 	"github.com/dgraph-io/badger/v2/options"
+	"github.com/dgraph-io/badger/v2/pb"
 	"github.com/multiformats/go-base32"
 	"go.uber.org/zap"
 
@@ -74,20 +76,45 @@ func (b *badgerLogger) Warningf(format string, args ...interface{}) {
 	b.skip2.Warnf(format, args...)
 }
 
+// bsState is the current blockstore state
+type bsState int
+
 const (
-	stateOpen = iota
+	// stateOpen signifies an open blockstore
+	stateOpen bsState = iota
+	// stateClosing signifies a blockstore that is currently closing
 	stateClosing
+	// stateClosed signifies a blockstore that has been closed
 	stateClosed
 )
 
+type bsMoveState int
+
+const (
+	// moveStateNone signifies that there is no move in progress
+	moveStateNone bsMoveState = iota
+	// moveStateMoving signifies that there is a move in progress
+	moveStateMoving
+	// moveStateCleanup signifies that a move has completed or aborted and we are cleaning up
+	moveStateCleanup
+	// moveStateLock signifies that an exclusive lock has been acquired
+	moveStateLock
+)
+
 // Blockstore is a badger-backed IPLD blockstore.
 type Blockstore struct {
 	stateLk sync.RWMutex
-	state   int
+	state   bsState
 	viewers sync.WaitGroup
 
-	DB   *badger.DB
-	opts Options
+	moveMx    sync.Mutex
+	moveCond  sync.Cond
+	moveState bsMoveState
+	rlock     int
+
+	db     *badger.DB
+	dbNext *badger.DB // when moving
+	opts   Options
 
 	prefixing bool
 	prefix    []byte
@@ -113,13 +140,15 @@ func Open(opts Options) (*Blockstore, error) {
 		return nil, fmt.Errorf("failed to open badger blockstore: %w", err)
 	}
 
-	bs := &Blockstore{DB: db, opts: opts}
+	bs := &Blockstore{db: db, opts: opts}
 	if p := opts.Prefix; p != "" {
 		bs.prefixing = true
 		bs.prefix = []byte(p)
 		bs.prefixLen = len(bs.prefix)
 	}
 
+	bs.moveCond.L = &bs.moveMx
+
 	return bs, nil
 }
 
@@ -143,7 +172,7 @@ func (b *Blockstore) Close() error {
 	// wait for all accesses to complete
 	b.viewers.Wait()
 
-	return b.DB.Close()
+	return b.db.Close()
 }
 
 func (b *Blockstore) access() error {
@@ -165,12 +194,225 @@ func (b *Blockstore) isOpen() bool {
 	return b.state == stateOpen
 }
 
-// CollectGarbage runs garbage collection on the value log
-func (b *Blockstore) CollectGarbage() error {
-	if err := b.access(); err != nil {
-		return err
+// lockDB/unlockDB implement a recursive lock contingent on move state
+func (b *Blockstore) lockDB() {
+	b.moveMx.Lock()
+	defer b.moveMx.Unlock()
+
+	if b.rlock == 0 {
+		for b.moveState == moveStateLock {
+			b.moveCond.Wait()
+		}
 	}
-	defer b.viewers.Done()
+
+	b.rlock++
+}
+
+func (b *Blockstore) unlockDB() {
+	b.moveMx.Lock()
+	defer b.moveMx.Unlock()
+
+	b.rlock--
+	if b.rlock == 0 && b.moveState == moveStateLock {
+		b.moveCond.Broadcast()
+	}
+}
+
+// lockMove/unlockMove implement an exclusive lock of move state
+func (b *Blockstore) lockMove() {
+	b.moveMx.Lock()
+	b.moveState = moveStateLock
+	for b.rlock > 0 {
+		b.moveCond.Wait()
+	}
+}
+
+func (b *Blockstore) unlockMove(state bsMoveState) {
+	b.moveState = state
+	b.moveCond.Broadcast()
+	b.moveMx.Unlock()
+}
+
+// movingGC moves the blockstore to a new path, adjacent to the current path, and creates
+// a symlink from the current path to the new path; the old blockstore is deleted.
+//
+// The blockstore MUST accept new writes during the move and ensure that these
+// are persisted to the new blockstore; if a failure occurs aborting the move,
+// then they must be persisted to the old blockstore.
+// In short, the blockstore must not lose data from new writes during the move.
+func (b *Blockstore) movingGC() error {
+	// this inlines moveLock/moveUnlock for the initial state check to prevent a second move
+	// while one is in progress without clobbering state
+	b.moveMx.Lock()
+	if b.moveState != moveStateNone {
+		b.moveMx.Unlock()
+		return fmt.Errorf("move in progress")
+	}
+
+	b.moveState = moveStateLock
+	for b.rlock > 0 {
+		b.moveCond.Wait()
+	}
+
+	b.moveState = moveStateMoving
+	b.moveCond.Broadcast()
+	b.moveMx.Unlock()
+
+	var path string
+
+	defer func() {
+		b.lockMove()
+
+		db2 := b.dbNext
+		b.dbNext = nil
+
+		var state bsMoveState
+		if db2 != nil {
+			state = moveStateCleanup
+		} else {
+			state = moveStateNone
+		}
+
+		b.unlockMove(state)
+
+		if db2 != nil {
+			err := db2.Close()
+			if err != nil {
+				log.Warnf("error closing badger db: %s", err)
+			}
+			b.deleteDB(path)
+
+			b.lockMove()
+			b.unlockMove(moveStateNone)
+		}
+	}()
+
+	// we resolve symlinks to create the new path adjacent to the old path.
+	// this allows the user to symlink the db directory into a separate filesystem.
+	basePath := b.opts.Dir
+	linkPath, err := filepath.EvalSymlinks(basePath)
+	if err != nil {
+		return fmt.Errorf("error resolving symlink %s: %w", basePath, err)
+	}
+
+	if basePath == linkPath {
+		path = basePath
+	} else {
+		name := filepath.Base(basePath)
+		dir := filepath.Dir(linkPath)
+		path = filepath.Join(dir, name)
+	}
+	path = fmt.Sprintf("%s.%d", path, time.Now().UnixNano())
+
+	log.Infof("moving blockstore from %s to %s", b.opts.Dir, path)
+
+	opts := b.opts
+	opts.Dir = path
+	opts.ValueDir = path
+
+	db2, err := badger.Open(opts.Options)
+	if err != nil {
+		return fmt.Errorf("failed to open badger blockstore in %s: %w", path, err)
+	}
+
+	b.lockMove()
+	b.dbNext = db2
+	b.unlockMove(moveStateMoving)
+
+	log.Info("copying blockstore")
+	err = b.doCopy(b.db, b.dbNext)
+	if err != nil {
+		return fmt.Errorf("error moving badger blockstore to %s: %w", path, err)
+	}
+
+	b.lockMove()
+	db1 := b.db
+	b.db = b.dbNext
+	b.dbNext = nil
+	b.unlockMove(moveStateCleanup)
+
+	err = db1.Close()
+	if err != nil {
+		log.Warnf("error closing old badger db: %s", err)
+	}
+
+	dbpath := b.opts.Dir
+	oldpath := fmt.Sprintf("%s.old.%d", dbpath, time.Now().Unix())
+
+	if err = os.Rename(dbpath, oldpath); err != nil {
+		// this is not catastrophic in the sense that we have not lost any data.
+		// but it is pretty bad, as the db path points to the old db, while we are now using the new
+		// db; we can't continue and leave a ticking bomb for the next restart.
+		// so a panic is appropriate and the user can fix.
+		panic(fmt.Errorf("error renaming old badger db dir from %s to %s: %w; USER ACTION REQUIRED", dbpath, oldpath, err)) //nolint
+	}
+
+	if err = os.Symlink(path, dbpath); err != nil {
+		// same here; the db path is pointing to the void. panic and let the user fix.
+		panic(fmt.Errorf("error symlinking new badger db dir from %s to %s: %w; USER ACTION REQUIRED", path, dbpath, err)) //nolint
+	}
+
+	b.deleteDB(oldpath)
+
+	log.Info("moving blockstore done")
+	return nil
+}
+
+// doCopy copies the contents of one badger blockstore to another.
+func (b *Blockstore) doCopy(from, to *badger.DB) error {
+	workers := runtime.NumCPU() / 2
+	if workers < 2 {
+		workers = 2
+	}
+
+	stream := from.NewStream()
+	stream.NumGo = workers
+	stream.LogPrefix = "doCopy"
+	stream.Send = func(list *pb.KVList) error {
+		batch := to.NewWriteBatch()
+		defer batch.Cancel()
+
+		for _, kv := range list.Kv {
+			if kv.Key == nil || kv.Value == nil {
+				continue
+			}
+			if err := batch.Set(kv.Key, kv.Value); err != nil {
+				return err
+			}
+		}
+
+		return batch.Flush()
+	}
+
+	return stream.Orchestrate(context.Background())
+}
+
+func (b *Blockstore) deleteDB(path string) {
+	// follow symbolic links, otherwise the data will be left behind
+	lpath, err := filepath.EvalSymlinks(path)
+	if err != nil {
+		log.Warnf("error resolving symlinks in %s", path)
+		return
+	}
+
+	log.Infof("removing data directory %s", lpath)
+	if err := os.RemoveAll(lpath); err != nil {
+		log.Warnf("error deleting db at %s: %s", lpath, err)
+		return
+	}
+
+	if path != lpath {
+		log.Infof("removing link %s", path)
+		if err := os.Remove(path); err != nil {
+			log.Warnf("error removing symbolic link %s", err)
+		}
+	}
+}
+
+func (b *Blockstore) onlineGC() error {
+	b.lockDB()
+	defer b.unlockDB()
 
 	// compact first to gather the necessary statistics for GC
 	nworkers := runtime.NumCPU() / 2
@@ -178,13 +420,13 @@ func (b *Blockstore) CollectGarbage() error {
 		nworkers = 2
 	}
 
-	err := b.DB.Flatten(nworkers)
+	err := b.db.Flatten(nworkers)
 	if err != nil {
 		return err
 	}
 
 	for err == nil {
-		err = b.DB.RunValueLogGC(0.125)
+		err = b.db.RunValueLogGC(0.125)
 	}
 
 	if err == badger.ErrNoRewrite {
@@ -195,6 +437,29 @@ func (b *Blockstore) CollectGarbage() error {
 	return err
 }
 
+// CollectGarbage compacts and runs garbage collection on the value log;
+// implements the BlockstoreGC trait
+func (b *Blockstore) CollectGarbage(opts ...blockstore.BlockstoreGCOption) error {
+	if err := b.access(); err != nil {
+		return err
+	}
+	defer b.viewers.Done()
+
+	var options blockstore.BlockstoreGCOptions
+	for _, opt := range opts {
+		err := opt(&options)
+		if err != nil {
+			return err
+		}
+	}
+
+	if options.FullGC {
+		return b.movingGC()
+	}
+
+	return b.onlineGC()
+}
+
 // Size returns the aggregate size of the blockstore
 func (b *Blockstore) Size() (int64, error) {
 	if err := b.access(); err != nil {
@@ -202,7 +467,10 @@ func (b *Blockstore) Size() (int64, error) {
 	}
 	defer b.viewers.Done()
 
-	lsm, vlog := b.DB.Size()
+	b.lockDB()
+	defer b.unlockDB()
+
+	lsm, vlog := b.db.Size()
 	size := lsm + vlog
 
 	if size == 0 {
@@ -234,12 +502,15 @@ func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error {
 	}
 	defer b.viewers.Done()
 
+	b.lockDB()
+	defer b.unlockDB()
+
 	k, pooled := b.PooledStorageKey(cid)
 	if pooled {
 		defer KeyPool.Put(k)
 	}
 
-	return b.DB.View(func(txn *badger.Txn) error {
+	return b.db.View(func(txn *badger.Txn) error {
 		switch item, err := txn.Get(k); err {
 		case nil:
 			return item.Value(fn)
@@ -258,12 +529,15 @@ func (b *Blockstore) Has(cid cid.Cid) (bool, error) {
 	}
 	defer b.viewers.Done()
 
+	b.lockDB()
+	defer b.unlockDB()
+
 	k, pooled := b.PooledStorageKey(cid)
 	if pooled {
 		defer KeyPool.Put(k)
 	}
 
-	err := b.DB.View(func(txn *badger.Txn) error {
+	err := b.db.View(func(txn *badger.Txn) error {
 		_, err := txn.Get(k)
 		return err
 	})
@@ -289,13 +563,16 @@ func (b *Blockstore) Get(cid cid.Cid) (blocks.Block, error) {
 	}
 	defer b.viewers.Done()
 
+	b.lockDB()
+	defer b.unlockDB()
+
 	k, pooled := b.PooledStorageKey(cid)
 	if pooled {
 		defer KeyPool.Put(k)
 	}
 
 	var val []byte
-	err := b.DB.View(func(txn *badger.Txn) error {
+	err := b.db.View(func(txn *badger.Txn) error {
 		switch item, err := txn.Get(k); err {
 		case nil:
 			val, err = item.ValueCopy(nil)
@@ -319,13 +596,16 @@ func (b *Blockstore) GetSize(cid cid.Cid) (int, error) {
 	}
 	defer b.viewers.Done()
 
+	b.lockDB()
+	defer b.unlockDB()
+
 	k, pooled := b.PooledStorageKey(cid)
 	if pooled {
 		defer KeyPool.Put(k)
 	}
 
 	var size int
-	err := b.DB.View(func(txn *badger.Txn) error {
+	err := b.db.View(func(txn *badger.Txn) error {
 		switch item, err := txn.Get(k); err {
 		case nil:
 			size = int(item.ValueSize())
@@ -349,18 +629,36 @@ func (b *Blockstore) Put(block blocks.Block) error {
 	}
 	defer b.viewers.Done()
 
+	b.lockDB()
+	defer b.unlockDB()
+
 	k, pooled := b.PooledStorageKey(block.Cid())
 	if pooled {
 		defer KeyPool.Put(k)
 	}
 
-	err := b.DB.Update(func(txn *badger.Txn) error {
-		return txn.Set(k, block.RawData())
-	})
-	if err != nil {
-		err = fmt.Errorf("failed to put block in badger blockstore: %w", err)
+	put := func(db *badger.DB) error {
+		err := db.Update(func(txn *badger.Txn) error {
+			return txn.Set(k, block.RawData())
+		})
+		if err != nil {
+			return fmt.Errorf("failed to put block in badger blockstore: %w", err)
+		}
+
+		return nil
 	}
-	return err
+
+	if err := put(b.db); err != nil {
+		return err
+	}
+
+	if b.dbNext != nil {
+		if err := put(b.dbNext); err != nil {
+			return err
+		}
+	}
+
+	return nil
 }
 
 // PutMany implements Blockstore.PutMany.
@@ -370,6 +668,9 @@ func (b *Blockstore) PutMany(blocks []blocks.Block) error {
 	}
 	defer b.viewers.Done()
 
+	b.lockDB()
+	defer b.unlockDB()
+
 	// toReturn tracks the byte slices to return to the pool, if we're using key
 	// prefixing. we can't return each slice to the pool after each Set, because
 	// badger holds on to the slice.
@@ -383,24 +684,45 @@ func (b *Blockstore) PutMany(blocks []blocks.Block) error {
 		}()
 	}
 
-	batch := b.DB.NewWriteBatch()
-	defer batch.Cancel()
-
+	keys := make([][]byte, 0, len(blocks))
 	for _, block := range blocks {
 		k, pooled := b.PooledStorageKey(block.Cid())
 		if pooled {
 			toReturn = append(toReturn, k)
 		}
-		if err := batch.Set(k, block.RawData()); err != nil {
+		keys = append(keys, k)
+	}
+
+	put := func(db *badger.DB) error {
+		batch := db.NewWriteBatch()
+		defer batch.Cancel()
+
+		for i, block := range blocks {
+			k := keys[i]
+			if err := batch.Set(k, block.RawData()); err != nil {
+				return err
+			}
+		}
+
+		err := batch.Flush()
+		if err != nil {
+			return fmt.Errorf("failed to put blocks in badger blockstore: %w", err)
+		}
+
+		return nil
+	}
+
+	if err := put(b.db); err != nil {
+		return err
+	}
+
+	if b.dbNext != nil {
+		if err := put(b.dbNext); err != nil {
 			return err
 		}
 	}
 
-	err := batch.Flush()
-	if err != nil {
-		err = fmt.Errorf("failed to put blocks in badger blockstore: %w", err)
-	}
-	return err
+	return nil
 }
 
 // DeleteBlock implements Blockstore.DeleteBlock.
@@ -410,12 +732,15 @@ func (b *Blockstore) DeleteBlock(cid cid.Cid) error {
 	}
 	defer b.viewers.Done()
 
+	b.lockDB()
+	defer b.unlockDB()
+
 	k, pooled := b.PooledStorageKey(cid)
 	if pooled {
 		defer KeyPool.Put(k)
 	}
 
-	return b.DB.Update(func(txn *badger.Txn) error {
+	return b.db.Update(func(txn *badger.Txn) error {
 		return txn.Delete(k)
 	})
 }
@@ -426,6 +751,9 @@ func (b *Blockstore) DeleteMany(cids []cid.Cid) error {
 	}
 	defer b.viewers.Done()
 
+	b.lockDB()
+	defer b.unlockDB()
+
 	// toReturn tracks the byte slices to return to the pool, if we're using key
 	// prefixing. we can't return each slice to the pool after each Set, because
 	// badger holds on to the slice.
@@ -439,7 +767,7 @@ func (b *Blockstore) DeleteMany(cids []cid.Cid) error {
 		}()
 	}
 
-	batch := b.DB.NewWriteBatch()
+	batch := b.db.NewWriteBatch()
 	defer batch.Cancel()
 
 	for _, cid := range cids {
@@ -465,7 +793,10 @@ func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
 		return nil, err
 	}
 
-	txn := b.DB.NewTransaction(false)
+	b.lockDB()
+	defer b.unlockDB()
+
+	txn := b.db.NewTransaction(false)
 	opts := badger.IteratorOptions{PrefetchSize: 100}
 	if b.prefixing {
 		opts.Prefix = b.prefix
@@ -519,7 +850,10 @@ func (b *Blockstore) ForEachKey(f func(cid.Cid) error) error {
 	}
 	defer b.viewers.Done()
 
-	txn := b.DB.NewTransaction(false)
+	b.lockDB()
+	defer b.unlockDB()
+
+	txn := b.db.NewTransaction(false)
 	defer txn.Discard()
 
 	opts := badger.IteratorOptions{PrefetchSize: 100}
@@ -614,3 +948,9 @@ func (b *Blockstore) StorageKey(dst []byte, cid cid.Cid) []byte {
 	}
 	return dst[:reqsize]
 }
+
+// this method is added for lotus-shed needs
+// WARNING: THIS IS COMPLETELY UNSAFE; DON'T USE THIS IN PRODUCTION CODE
+func (b *Blockstore) DB() *badger.DB {
+	return b.db
+}
diff --git a/blockstore/badger/blockstore_test.go b/blockstore/badger/blockstore_test.go
index 3221458d2..ddfa6f28d 100644
--- a/blockstore/badger/blockstore_test.go
+++ b/blockstore/badger/blockstore_test.go
@@ -1,12 +1,19 @@
 package badgerbs
 
 import (
+	"bytes"
+	"fmt"
 	"io/ioutil"
 	"os"
+	"path/filepath"
+	"strings"
 	"testing"
 
-	blocks "github.com/ipfs/go-block-format"
 	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
+
+	blocks "github.com/ipfs/go-block-format"
+	cid "github.com/ipfs/go-cid"
 
 	"github.com/filecoin-project/lotus/blockstore"
 )
@@ -89,3 +96,165 @@ func openBlockstore(optsSupplier func(path string) Options) func(tb testing.TB,
 		return Open(optsSupplier(path))
 	}
 }
+
+func testMove(t *testing.T, optsF func(string) Options) {
+	basePath, err := ioutil.TempDir("", "")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	dbPath := filepath.Join(basePath, "db")
+
+	t.Cleanup(func() {
+		_ = os.RemoveAll(basePath)
+	})
+
+	db, err := Open(optsF(dbPath))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	defer db.Close() //nolint
+
+	var have []blocks.Block
+	var deleted []cid.Cid
+
+	// add some blocks
+	for i := 0; i < 10; i++ {
+		blk := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i)))
+		err := db.Put(blk)
+		if err != nil {
+			t.Fatal(err)
+		}
+		have = append(have, blk)
+	}
+
+	// delete some of them
+	for i := 5; i < 10; i++ {
+		c := have[i].Cid()
+		err := db.DeleteBlock(c)
+		if err != nil {
+			t.Fatal(err)
+		}
+		deleted = append(deleted, c)
+	}
+	have = have[:5]
+
+	// start a move concurrent with some more puts
+	g := new(errgroup.Group)
+	g.Go(func() error {
+		for i := 10; i < 1000; i++ {
+			blk := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i)))
+			err := db.Put(blk)
+			if err != nil {
+				return err
+			}
+			have = append(have, blk)
+		}
+		return nil
+	})
+	g.Go(func() error {
+		return db.CollectGarbage(blockstore.WithFullGC(true))
+	})
+
+	err = g.Wait()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// now check that we have all the blocks in have and none in the deleted lists
+	checkBlocks := func() {
+		for _, blk := range have {
+			has, err := db.Has(blk.Cid())
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			if !has {
+				t.Fatal("missing block")
+			}
+
+			blk2, err := db.Get(blk.Cid())
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			if !bytes.Equal(blk.RawData(), blk2.RawData()) {
+				t.Fatal("data mismatch")
+			}
+		}
+
+		for _, c := range deleted {
+			has, err := db.Has(c)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			if has {
t.Fatal("resurrected block") + } + } + } + + checkBlocks() + + // check the basePath -- it should contain a directory with name db.{timestamp}, soft-linked + // to db and nothing else + checkPath := func() { + entries, err := os.ReadDir(basePath) + if err != nil { + t.Fatal(err) + } + + if len(entries) != 2 { + t.Fatalf("too many entries; expected %d but got %d", 2, len(entries)) + } + + var haveDB, haveDBLink bool + for _, e := range entries { + if e.Name() == "db" { + if (e.Type() & os.ModeSymlink) == 0 { + t.Fatal("found db, but it's not a symlink") + } + haveDBLink = true + continue + } + if strings.HasPrefix(e.Name(), "db.") { + if !e.Type().IsDir() { + t.Fatal("found db prefix, but it's not a directory") + } + haveDB = true + continue + } + } + + if !haveDB { + t.Fatal("db directory is missing") + } + if !haveDBLink { + t.Fatal("db link is missing") + } + } + + checkPath() + + // now do another FullGC to test the double move and following of symlinks + if err := db.CollectGarbage(blockstore.WithFullGC(true)); err != nil { + t.Fatal(err) + } + + checkBlocks() + checkPath() +} + +func TestMoveNoPrefix(t *testing.T) { + testMove(t, DefaultOptions) +} + +func TestMoveWithPrefix(t *testing.T) { + testMove(t, func(path string) Options { + opts := DefaultOptions(path) + opts.Prefix = "/prefixed/" + return opts + }) +} diff --git a/blockstore/blockstore.go b/blockstore/blockstore.go index 43e6cd1a4..8ede31eb9 100644 --- a/blockstore/blockstore.go +++ b/blockstore/blockstore.go @@ -37,7 +37,22 @@ type BlockstoreIterator interface { // BlockstoreGC is a trait for blockstores that support online garbage collection type BlockstoreGC interface { - CollectGarbage() error + CollectGarbage(options ...BlockstoreGCOption) error +} + +// BlockstoreGCOption is a functional interface for controlling blockstore GC options +type BlockstoreGCOption = func(*BlockstoreGCOptions) error + +// BlockstoreGCOptions is a struct with GC options +type BlockstoreGCOptions struct { + FullGC bool +} + +func WithFullGC(fullgc bool) BlockstoreGCOption { + return func(opts *BlockstoreGCOptions) error { + opts.FullGC = fullgc + return nil + } } // BlockstoreSize is a trait for on-disk blockstores that can report their size diff --git a/blockstore/splitstore/README.md b/blockstore/splitstore/README.md index b6f30ef43..4efd6f61d 100644 --- a/blockstore/splitstore/README.md +++ b/blockstore/splitstore/README.md @@ -59,6 +59,15 @@ These are options in the `[Chainstore.Splitstore]` section of the configuration: nodes beyond 4 finalities, while running with the discard coldstore option. It is also useful for miners who accept deals and need to lookback messages beyond the 4 finalities, which would otherwise hit the coldstore. +- `HotStoreFullGCFrequency` -- specifies how frequenty to garbage collect the hotstore + using full (moving) GC. + The default value is 20, which uses full GC every 20 compactions (about once a week); + set to 0 to disable full GC altogether. + Rationale: badger supports online GC, and this is used by default. However it has proven to + be ineffective in practice with the hotstore size slowly creeping up. In order to address this, + we have added moving GC support in our badger wrapper, which can effectively reclaim all space. + The downside is that it takes a bit longer to perform a moving GC and you also need enough + space to house the new hotstore while the old one is still live. 
 
 ## Operation
 
diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go
index 7a2abf9a8..171b5a6e4 100644
--- a/blockstore/splitstore/splitstore.go
+++ b/blockstore/splitstore/splitstore.go
@@ -81,6 +81,13 @@ type Config struct {
 	// - a positive integer indicates the number of finalities, outside the compaction boundary,
 	//   for which messages will be retained in the hotstore.
 	HotStoreMessageRetention uint64
+
+	// HotStoreFullGCFrequency indicates how frequently (in terms of compactions) to garbage collect
+	// the hotstore using full (moving) GC if supported by the hotstore.
+	// A value of 0 disables full GC entirely.
+	// A positive value is the number of compactions before a full GC is performed;
+	// a value of 1 will perform full GC in every compaction.
+	HotStoreFullGCFrequency uint64
 }
 
 // ChainAccessor allows the Splitstore to access the chain. It will most likely
diff --git a/blockstore/splitstore/splitstore_gc.go b/blockstore/splitstore/splitstore_gc.go
index 46668167c..2e1ffd4ad 100644
--- a/blockstore/splitstore/splitstore_gc.go
+++ b/blockstore/splitstore/splitstore_gc.go
@@ -8,17 +8,22 @@ import (
 )
 
 func (s *SplitStore) gcHotstore() {
-	if err := s.gcBlockstoreOnline(s.hot); err != nil {
+	var opts []bstore.BlockstoreGCOption
+	if s.cfg.HotStoreFullGCFrequency > 0 && s.compactionIndex%int64(s.cfg.HotStoreFullGCFrequency) == 0 {
+		opts = append(opts, bstore.WithFullGC(true))
+	}
+
+	if err := s.gcBlockstore(s.hot, opts); err != nil {
 		log.Warnf("error garbage collecting hostore: %s", err)
 	}
 }
 
-func (s *SplitStore) gcBlockstoreOnline(b bstore.Blockstore) error {
+func (s *SplitStore) gcBlockstore(b bstore.Blockstore, opts []bstore.BlockstoreGCOption) error {
 	if gc, ok := b.(bstore.BlockstoreGC); ok {
 		log.Info("garbage collecting blockstore")
 		startGC := time.Now()
 
-		if err := gc.CollectGarbage(); err != nil {
+		if err := gc.CollectGarbage(opts...); err != nil {
 			return err
 		}
 
@@ -26,5 +31,5 @@ func (s *SplitStore) gcBlockstoreOnline(b bstore.Blockstore) error {
 		return nil
 	}
 
-	return fmt.Errorf("blockstore doesn't support online gc: %T", b)
+	return fmt.Errorf("blockstore doesn't support garbage collection: %T", b)
 }
diff --git a/cmd/lotus-shed/pruning.go b/cmd/lotus-shed/pruning.go
index 1afe76c4d..188f5b28f 100644
--- a/cmd/lotus-shed/pruning.go
+++ b/cmd/lotus-shed/pruning.go
@@ -161,7 +161,7 @@ var stateTreePruneCmd = &cli.Command{
 		if cctx.Bool("only-ds-gc") {
 			fmt.Println("running datastore gc....")
 			for i := 0; i < cctx.Int("gc-count"); i++ {
-				if err := badgbs.DB.RunValueLogGC(DiscardRatio); err != nil {
+				if err := badgbs.DB().RunValueLogGC(DiscardRatio); err != nil {
 					return xerrors.Errorf("datastore GC failed: %w", err)
 				}
 			}
@@ -208,7 +208,7 @@ var stateTreePruneCmd = &cli.Command{
 			return nil
 		}
 
-		b := badgbs.DB.NewWriteBatch()
+		b := badgbs.DB().NewWriteBatch()
 		defer b.Cancel()
 
 		markForRemoval := func(c cid.Cid) error {
@@ -249,7 +249,7 @@ var stateTreePruneCmd = &cli.Command{
 
 		fmt.Println("running datastore gc....")
 		for i := 0; i < cctx.Int("gc-count"); i++ {
-			if err := badgbs.DB.RunValueLogGC(DiscardRatio); err != nil {
+			if err := badgbs.DB().RunValueLogGC(DiscardRatio); err != nil {
 				return xerrors.Errorf("datastore GC failed: %w", err)
 			}
 		}
diff --git a/node/config/def.go b/node/config/def.go
index ef5dcd3ba..c5c455c68 100644
--- a/node/config/def.go
+++ b/node/config/def.go
@@ -72,6 +72,8 @@ func DefaultFullNode() *FullNode {
 				ColdStoreType: "universal",
 				HotStoreType:  "badger",
 				MarkSetType:   "map",
+
+				HotStoreFullGCFrequency: 20,
 			},
 		},
 	}
diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go
index ea68dc344..5d4a91d5f 100644
--- a/node/config/doc_gen.go
+++ b/node/config/doc_gen.go
@@ -685,25 +685,37 @@ submitting proofs to the chain individually`,
 		{
 			Name: "ColdStoreType",
 			Type: "string",
 
-			Comment: ``,
+			Comment: `ColdStoreType specifies the type of the coldstore.
+It can be "universal" (default) or "discard" for discarding cold blocks.`,
 		},
 		{
 			Name: "HotStoreType",
 			Type: "string",
 
-			Comment: ``,
+			Comment: `HotStoreType specifies the type of the hotstore.
+The only currently supported value is "badger".`,
 		},
 		{
 			Name: "MarkSetType",
 			Type: "string",
 
-			Comment: ``,
+			Comment: `MarkSetType specifies the type of the markset.
+It can be "map" (default) for in memory marking or "badger" for on-disk marking.`,
 		},
 		{
 			Name: "HotStoreMessageRetention",
 			Type: "uint64",
 
-			Comment: ``,
+			Comment: `HotStoreMessageRetention specifies the retention policy for messages, in finalities beyond
+the compaction boundary; default is 0.`,
+		},
+		{
+			Name: "HotStoreFullGCFrequency",
+			Type: "uint64",
+
+			Comment: `HotStoreFullGCFrequency specifies how often to perform a full (moving) GC on the hotstore.
+A value of 0 disables it, while a value of 1 will do full GC in every compaction.
+Default is 20 (about once a week).`,
 		},
 	},
 	"StorageMiner": []DocField{
diff --git a/node/config/types.go b/node/config/types.go
index 63a493f51..fe42aa27e 100644
--- a/node/config/types.go
+++ b/node/config/types.go
@@ -278,11 +278,23 @@ type Chainstore struct {
 }
 
 type Splitstore struct {
+	// ColdStoreType specifies the type of the coldstore.
+	// It can be "universal" (default) or "discard" for discarding cold blocks.
 	ColdStoreType string
-	HotStoreType  string
-	MarkSetType   string
+	// HotStoreType specifies the type of the hotstore.
+	// The only currently supported value is "badger".
+	HotStoreType string
+	// MarkSetType specifies the type of the markset.
+	// It can be "map" (default) for in memory marking or "badger" for on-disk marking.
+	MarkSetType string
+	// HotStoreMessageRetention specifies the retention policy for messages, in finalities beyond
+	// the compaction boundary; default is 0.
 	HotStoreMessageRetention uint64
+	// HotStoreFullGCFrequency specifies how often to perform a full (moving) GC on the hotstore.
+	// A value of 0 disables it, while a value of 1 will do full GC in every compaction.
+	// Default is 20 (about once a week).
+	HotStoreFullGCFrequency uint64
 }
 
 //
 // Full Node
diff --git a/node/modules/blockstore.go b/node/modules/blockstore.go
index 2588e3f98..2486b9744 100644
--- a/node/modules/blockstore.go
+++ b/node/modules/blockstore.go
@@ -81,6 +81,7 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.Locked
 			MarkSetType:              cfg.Splitstore.MarkSetType,
 			DiscardColdBlocks:        cfg.Splitstore.ColdStoreType == "discard",
 			HotStoreMessageRetention: cfg.Splitstore.HotStoreMessageRetention,
+			HotStoreFullGCFrequency:  cfg.Splitstore.HotStoreFullGCFrequency,
 		}
 		ss, err := splitstore.Open(path, ds, hot, cold, cfg)
 		if err != nil {
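
Taken together, the changes above expose full (moving) GC through the existing `BlockstoreGC` trait. For reference, a minimal, stand-alone usage sketch follows; the package alias, import paths, and on-disk path are illustrative assumptions rather than part of this diff, and `testMove` above remains the authoritative example:

```go
package main

import (
	"log"

	"github.com/filecoin-project/lotus/blockstore"
	badgerbs "github.com/filecoin-project/lotus/blockstore/badger"
)

func main() {
	// open a badger-backed blockstore; the path here is hypothetical
	bs, err := badgerbs.Open(badgerbs.DefaultOptions("/var/lib/lotus/chain"))
	if err != nil {
		log.Fatal(err)
	}
	defer bs.Close() //nolint:errcheck

	// online GC (the default): compact, then repeatedly run badger's value-log GC in place
	if err := bs.CollectGarbage(); err != nil {
		log.Printf("online GC failed: %s", err)
	}

	// full (moving) GC: copy live data into a fresh store adjacent to the current path,
	// repoint the path via a symlink, and delete the old store; writes issued concurrently
	// are mirrored to both stores while the move is in progress
	if err := bs.CollectGarbage(blockstore.WithFullGC(true)); err != nil {
		log.Printf("moving GC failed: %s", err)
	}
}
```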