From b741d61b201d9d9b9596c9eec10503ad93fe19c5 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sun, 11 Jul 2021 14:33:15 +0300 Subject: [PATCH 01/33] implement BlockstoreMover in badger --- blockstore/badger/blockstore.go | 373 +++++++++++++++++++++++++++++--- 1 file changed, 343 insertions(+), 30 deletions(-) diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index f8d077760..7e1b1769f 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -8,6 +8,7 @@ import ( "path/filepath" "runtime" "sync" + "time" "github.com/dgraph-io/badger/v2" "github.com/dgraph-io/badger/v2/options" @@ -35,6 +36,8 @@ var ( log = logger.Logger("badgerbs") ) +const moveBatchSize = 16384 + // aliases to mask badger dependencies. const ( // FileIO is equivalent to badger/options.FileIO. @@ -80,13 +83,26 @@ const ( stateClosed ) +const ( + moveStateNone = iota + moveStateMoving + moveStateCleanup + moveStateLock +) + // Blockstore is a badger-backed IPLD blockstore. type Blockstore struct { stateLk sync.RWMutex state int viewers sync.WaitGroup - DB *badger.DB + moveMx sync.RWMutex + moveCond *sync.Cond + moveState int + rlock int + + db *badger.DB + db2 *badger.DB // when moving opts Options prefixing bool @@ -113,13 +129,15 @@ func Open(opts Options) (*Blockstore, error) { return nil, fmt.Errorf("failed to open badger blockstore: %w", err) } - bs := &Blockstore{DB: db, opts: opts} + bs := &Blockstore{db: db, opts: opts} if p := opts.Prefix; p != "" { bs.prefixing = true bs.prefix = []byte(p) bs.prefixLen = len(bs.prefix) } + bs.moveCond = sync.NewCond(&bs.moveMx) + return bs, nil } @@ -143,7 +161,7 @@ func (b *Blockstore) Close() error { // wait for all accesses to complete b.viewers.Wait() - return b.DB.Close() + return b.db.Close() } func (b *Blockstore) access() error { @@ -165,26 +183,252 @@ func (b *Blockstore) isOpen() bool { return b.state == stateOpen } -// CollectGarbage runs garbage collection on the value log +// lockDB/unlockDB implement a recursive lock contigent on move state +func (b *Blockstore) lockDB() { + b.moveMx.Lock() + defer b.moveMx.Unlock() + + if b.rlock == 0 { + for b.moveState == moveStateLock { + b.moveCond.Wait() + } + } + + b.rlock++ +} + +func (b *Blockstore) unlockDB() { + b.moveMx.Lock() + defer b.moveMx.Unlock() + + b.rlock-- + if b.rlock == 0 && b.moveState == moveStateLock { + b.moveCond.Broadcast() + } +} + +// lockMove/unlockMove implement an exclusive lock of move state +func (b *Blockstore) lockMove() { + b.moveMx.Lock() + b.moveState = moveStateLock + for b.rlock > 0 { + b.moveCond.Wait() + } +} + +func (b *Blockstore) unlockMove(state int) { + b.moveState = state + b.moveCond.Broadcast() + b.moveMx.Unlock() +} + +// MoveTo implements the BlockstoreMover trait +func (b *Blockstore) MoveTo(path string, filter func(cid.Cid) bool) error { + if err := b.access(); err != nil { + return err + } + defer b.viewers.Done() + + // this inlines moveLock/moveUnlock for the initial state check to prevent a second move + // while one is in progress without clobbering state + b.moveMx.Lock() + if b.moveState != moveStateNone { + b.moveMx.Unlock() + return fmt.Errorf("move in progress") + } + + b.moveState = moveStateLock + for b.rlock > 0 { + b.moveCond.Wait() + } + + b.moveState = moveStateMoving + b.moveCond.Broadcast() + b.moveMx.Unlock() + + if path == "" { + path = fmt.Sprintf("%s.%d", b.opts.Dir, time.Now().Unix()) + } + + defer func() { + b.lockMove() + + db2 := b.db2 + b.db2 = nil + + var state int + if db2 != nil { + state = moveStateCleanup + } else { + state = moveStateNone + } + + b.unlockMove(state) + + if db2 != nil { + err := db2.Close() + if err != nil { + log.Warnf("error closing badger db: %s", err) + } + b.deleteDB(path) + + b.lockMove() + b.unlockMove(moveStateNone) + } + }() + + opts := b.opts + opts.Dir = path + opts.ValueDir = path + + db2, err := badger.Open(opts.Options) + if err != nil { + return fmt.Errorf("failed to open badger blockstore in %s: %w", path, err) + } + + b.lockMove() + b.db2 = db2 + b.unlockMove(moveStateMoving) + + err = b.doCopy(b.db, b.db2, filter) + if err != nil { + return fmt.Errorf("error moving badger blockstore to %s: %w", path, err) + } + + b.lockMove() + db1 := b.db + b.db = b.db2 + b.db2 = nil + b.unlockMove(moveStateCleanup) + + err = db1.Close() + if err != nil { + log.Warnf("error closing badger db: %s", err) + } + + dbpath := b.opts.Dir + oldpath := fmt.Sprintf("%s.old.%d", dbpath, time.Now().Unix()) + + ok := true + err = os.Rename(dbpath, oldpath) + if err != nil { + // this is bad, but not catastrophic; new data will be written in db2 and user can fix + log.Errorf("error renaming badger db dir from %s to %s; USER ACTION REQUIRED", dbpath, oldpath) + ok = false + } + + if ok { + err = os.Symlink(path, dbpath) + if err != nil { + // ditto, this is bad, but not catastrophic; user can fix + log.Errorf("error symlinking badger db dir from %s to %s; USER ACTION REQUIRED", path, dbpath) + } + + b.deleteDB(oldpath) + } + + return nil +} + +func (b *Blockstore) doCopy(from, to *badger.DB, filter func(cid.Cid) bool) error { + count := 0 + batch := to.NewWriteBatch() + defer batch.Cancel() + + txn := from.NewTransaction(false) + defer txn.Discard() + + opts := badger.IteratorOptions{PrefetchSize: moveBatchSize} + iter := txn.NewIterator(opts) + defer iter.Close() + + var buf []byte + for iter.Rewind(); iter.Valid(); iter.Next() { + if !b.isOpen() { + return ErrBlockstoreClosed + } + + item := iter.Item() + + if filter != nil { + k := item.Key() + if b.prefixing { + k = k[b.prefixLen:] + } + + klen := base32.RawStdEncoding.DecodedLen(len(k)) + if klen > len(buf) { + buf = make([]byte, klen) + } + + n, err := base32.RawStdEncoding.Decode(buf, k) + if err != nil { + return err + } + + c := cid.NewCidV1(cid.Raw, buf[:n]) + if !filter(c) { + continue + } + } + + k := item.KeyCopy(nil) + v, err := item.ValueCopy(nil) + if err != nil { + return err + } + + if err := batch.Set(k, v); err != nil { + return err + } + + count++ + if count == moveBatchSize { + if err := batch.Flush(); err != nil { + return err + } + count = 0 + } + } + + if count > 0 { + return batch.Flush() + } + + return nil +} + +func (b *Blockstore) deleteDB(path string) { + err := os.RemoveAll(path) + if err != nil { + log.Warnf("error deleting db at %s: %s", path, err) + } +} + +// CollectGarbage compacts and runs garbage collection on the value log; +// implements the BlockstoreGC trait func (b *Blockstore) CollectGarbage() error { if err := b.access(); err != nil { return err } defer b.viewers.Done() + b.lockDB() + defer b.unlockDB() + // compact first to gather the necessary statistics for GC nworkers := runtime.NumCPU() / 2 if nworkers < 2 { nworkers = 2 } - err := b.DB.Flatten(nworkers) + err := b.db.Flatten(nworkers) if err != nil { return err } for err == nil { - err = b.DB.RunValueLogGC(0.125) + err = b.db.RunValueLogGC(0.125) } if err == badger.ErrNoRewrite { @@ -202,7 +446,10 @@ func (b *Blockstore) Size() (int64, error) { } defer b.viewers.Done() - lsm, vlog := b.DB.Size() + b.lockDB() + defer b.unlockDB() + + lsm, vlog := b.db.Size() size := lsm + vlog if size == 0 { @@ -234,12 +481,15 @@ func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error { } defer b.viewers.Done() + b.lockDB() + defer b.unlockDB() + k, pooled := b.PooledStorageKey(cid) if pooled { defer KeyPool.Put(k) } - return b.DB.View(func(txn *badger.Txn) error { + return b.db.View(func(txn *badger.Txn) error { switch item, err := txn.Get(k); err { case nil: return item.Value(fn) @@ -258,12 +508,15 @@ func (b *Blockstore) Has(cid cid.Cid) (bool, error) { } defer b.viewers.Done() + b.lockDB() + defer b.unlockDB() + k, pooled := b.PooledStorageKey(cid) if pooled { defer KeyPool.Put(k) } - err := b.DB.View(func(txn *badger.Txn) error { + err := b.db.View(func(txn *badger.Txn) error { _, err := txn.Get(k) return err }) @@ -289,13 +542,16 @@ func (b *Blockstore) Get(cid cid.Cid) (blocks.Block, error) { } defer b.viewers.Done() + b.lockDB() + defer b.unlockDB() + k, pooled := b.PooledStorageKey(cid) if pooled { defer KeyPool.Put(k) } var val []byte - err := b.DB.View(func(txn *badger.Txn) error { + err := b.db.View(func(txn *badger.Txn) error { switch item, err := txn.Get(k); err { case nil: val, err = item.ValueCopy(nil) @@ -319,13 +575,16 @@ func (b *Blockstore) GetSize(cid cid.Cid) (int, error) { } defer b.viewers.Done() + b.lockDB() + defer b.unlockDB() + k, pooled := b.PooledStorageKey(cid) if pooled { defer KeyPool.Put(k) } var size int - err := b.DB.View(func(txn *badger.Txn) error { + err := b.db.View(func(txn *badger.Txn) error { switch item, err := txn.Get(k); err { case nil: size = int(item.ValueSize()) @@ -349,18 +608,36 @@ func (b *Blockstore) Put(block blocks.Block) error { } defer b.viewers.Done() + b.lockDB() + defer b.unlockDB() + k, pooled := b.PooledStorageKey(block.Cid()) if pooled { defer KeyPool.Put(k) } - err := b.DB.Update(func(txn *badger.Txn) error { - return txn.Set(k, block.RawData()) - }) - if err != nil { - err = fmt.Errorf("failed to put block in badger blockstore: %w", err) + put := func(db *badger.DB) error { + err := db.Update(func(txn *badger.Txn) error { + return txn.Set(k, block.RawData()) + }) + if err != nil { + return fmt.Errorf("failed to put block in badger blockstore: %w", err) + } + + return nil } - return err + + if err := put(b.db); err != nil { + return err + } + + if b.db2 != nil { + if err := put(b.db2); err != nil { + return err + } + } + + return nil } // PutMany implements Blockstore.PutMany. @@ -370,6 +647,9 @@ func (b *Blockstore) PutMany(blocks []blocks.Block) error { } defer b.viewers.Done() + b.lockDB() + defer b.unlockDB() + // toReturn tracks the byte slices to return to the pool, if we're using key // prefixing. we can't return each slice to the pool after each Set, because // badger holds on to the slice. @@ -383,24 +663,45 @@ func (b *Blockstore) PutMany(blocks []blocks.Block) error { }() } - batch := b.DB.NewWriteBatch() - defer batch.Cancel() - + keys := make([][]byte, 0, len(blocks)) for _, block := range blocks { k, pooled := b.PooledStorageKey(block.Cid()) if pooled { toReturn = append(toReturn, k) } - if err := batch.Set(k, block.RawData()); err != nil { + keys = append(keys, k) + } + + put := func(db *badger.DB) error { + batch := db.NewWriteBatch() + defer batch.Cancel() + + for i, block := range blocks { + k := keys[i] + if err := batch.Set(k, block.RawData()); err != nil { + return err + } + } + + err := batch.Flush() + if err != nil { + return fmt.Errorf("failed to put blocks in badger blockstore: %w", err) + } + + return nil + } + + if err := put(b.db); err != nil { + return err + } + + if b.db2 != nil { + if err := put(b.db2); err != nil { return err } } - err := batch.Flush() - if err != nil { - err = fmt.Errorf("failed to put blocks in badger blockstore: %w", err) - } - return err + return nil } // DeleteBlock implements Blockstore.DeleteBlock. @@ -410,12 +711,15 @@ func (b *Blockstore) DeleteBlock(cid cid.Cid) error { } defer b.viewers.Done() + b.lockDB() + defer b.unlockDB() + k, pooled := b.PooledStorageKey(cid) if pooled { defer KeyPool.Put(k) } - return b.DB.Update(func(txn *badger.Txn) error { + return b.db.Update(func(txn *badger.Txn) error { return txn.Delete(k) }) } @@ -426,6 +730,9 @@ func (b *Blockstore) DeleteMany(cids []cid.Cid) error { } defer b.viewers.Done() + b.lockDB() + defer b.unlockDB() + // toReturn tracks the byte slices to return to the pool, if we're using key // prefixing. we can't return each slice to the pool after each Set, because // badger holds on to the slice. @@ -439,7 +746,7 @@ func (b *Blockstore) DeleteMany(cids []cid.Cid) error { }() } - batch := b.DB.NewWriteBatch() + batch := b.db.NewWriteBatch() defer batch.Cancel() for _, cid := range cids { @@ -465,7 +772,10 @@ func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { return nil, err } - txn := b.DB.NewTransaction(false) + b.lockDB() + defer b.unlockDB() + + txn := b.db.NewTransaction(false) opts := badger.IteratorOptions{PrefetchSize: 100} if b.prefixing { opts.Prefix = b.prefix @@ -519,7 +829,10 @@ func (b *Blockstore) ForEachKey(f func(cid.Cid) error) error { } defer b.viewers.Done() - txn := b.DB.NewTransaction(false) + b.lockDB() + defer b.unlockDB() + + txn := b.db.NewTransaction(false) defer txn.Discard() opts := badger.IteratorOptions{PrefetchSize: 100} From 001c04f2dd2f10228b304cedfd4db73424bf44a3 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sun, 11 Jul 2021 14:43:52 +0300 Subject: [PATCH 02/33] use pooled slices for the copy --- blockstore/badger/blockstore.go | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index 7e1b1769f..d5f30d9d7 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -342,6 +342,20 @@ func (b *Blockstore) doCopy(from, to *badger.DB, filter func(cid.Cid) bool) erro iter := txn.NewIterator(opts) defer iter.Close() + pooled := make([][]byte, 0, 2*moveBatchSize) + getPooled := func(size int) []byte { + buf := pool.Get(size) + pooled = append(pooled, buf) + return buf + } + putPooled := func() { + for _, buf := range pooled { + pool.Put(buf) + } + pooled = pooled[:0] + } + defer putPooled() + var buf []byte for iter.Rewind(); iter.Valid(); iter.Next() { if !b.isOpen() { @@ -350,8 +364,9 @@ func (b *Blockstore) doCopy(from, to *badger.DB, filter func(cid.Cid) bool) erro item := iter.Item() + kk := item.Key() if filter != nil { - k := item.Key() + k := kk if b.prefixing { k = k[b.prefixLen:] } @@ -372,8 +387,15 @@ func (b *Blockstore) doCopy(from, to *badger.DB, filter func(cid.Cid) bool) erro } } - k := item.KeyCopy(nil) - v, err := item.ValueCopy(nil) + k := getPooled(len(kk)) + copy(k, kk) + + var v []byte + err := item.Value(func(vv []byte) error { + v = getPooled(len(vv)) + copy(v, vv) + return nil + }) if err != nil { return err } @@ -388,6 +410,7 @@ func (b *Blockstore) doCopy(from, to *badger.DB, filter func(cid.Cid) bool) erro return err } count = 0 + putPooled() } } From aec126879e3a27a0c2fb8ad7caaed6e6b709789d Mon Sep 17 00:00:00 2001 From: vyzo Date: Sun, 11 Jul 2021 17:51:20 +0300 Subject: [PATCH 03/33] add MoveTo test --- blockstore/badger/blockstore_test.go | 155 ++++++++++++++++++++++++++- 1 file changed, 154 insertions(+), 1 deletion(-) diff --git a/blockstore/badger/blockstore_test.go b/blockstore/badger/blockstore_test.go index 3221458d2..dab1fdc3a 100644 --- a/blockstore/badger/blockstore_test.go +++ b/blockstore/badger/blockstore_test.go @@ -1,12 +1,19 @@ package badgerbs import ( + "bytes" + "fmt" "io/ioutil" "os" + "path/filepath" + "strings" "testing" - blocks "github.com/ipfs/go-block-format" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + + blocks "github.com/ipfs/go-block-format" + cid "github.com/ipfs/go-cid" "github.com/filecoin-project/lotus/blockstore" ) @@ -89,3 +96,149 @@ func openBlockstore(optsSupplier func(path string) Options) func(tb testing.TB, return Open(optsSupplier(path)) } } + +func testMove(t *testing.T, optsF func(string) Options) { + basePath, err := ioutil.TempDir("", "") + if err != nil { + t.Fatal(err) + } + + dbPath := filepath.Join(basePath, "db") + + t.Cleanup(func() { + _ = os.RemoveAll(basePath) + }) + + db, err := Open(optsF(dbPath)) + if err != nil { + t.Fatal(err) + } + + defer db.Close() //nolint + + var have []blocks.Block + var deleted []cid.Cid + + // add some blocks + for i := 0; i < 10; i++ { + blk := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i))) + err := db.Put(blk) + if err != nil { + t.Fatal(err) + } + have = append(have, blk) + } + + // delete some of them + for i := 5; i < 10; i++ { + c := have[i].Cid() + err := db.DeleteBlock(c) + if err != nil { + t.Fatal(err) + } + deleted = append(deleted, c) + } + have = have[:5] + + // start a move concurrent with some more puts + g := new(errgroup.Group) + g.Go(func() error { + for i := 10; i < 1000; i++ { + blk := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i))) + err := db.Put(blk) + if err != nil { + return err + } + have = append(have, blk) + } + return nil + }) + g.Go(func() error { + return db.MoveTo("", nil) + }) + + err = g.Wait() + if err != nil { + t.Fatal(err) + } + + // now check that we have all the blocks in have and none in the deleted lists + for _, blk := range have { + has, err := db.Has(blk.Cid()) + if err != nil { + t.Fatal(err) + } + + if !has { + t.Fatal("missing block") + } + + blk2, err := db.Get(blk.Cid()) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(blk.RawData(), blk2.RawData()) { + t.Fatal("data mismatch") + } + } + + for _, c := range deleted { + has, err := db.Has(c) + if err != nil { + t.Fatal(err) + } + + if has { + t.Fatal("resurrected block") + } + } + + // check the basePath -- it should contain a directory with name db.{timestamp}, soft-linked + // to db and nothing else + entries, err := os.ReadDir(basePath) + if err != nil { + t.Fatal(err) + } + + if len(entries) != 2 { + t.Fatalf("too many entries; expected %d but got %d", 2, len(entries)) + } + + var haveDB, haveDBLink bool + for _, e := range entries { + if e.Name() == "db" { + if (e.Type() & os.ModeSymlink) == 0 { + t.Fatal("found db, but it's not a symlink") + } + haveDBLink = true + continue + } + if strings.HasPrefix(e.Name(), "db.") { + if !e.Type().IsDir() { + t.Fatal("found db prefix, but it's not a directory") + } + haveDB = true + continue + } + } + + if !haveDB { + t.Fatal("db directory is missing") + } + if !haveDBLink { + t.Fatal("db link is missing") + } +} + +func TestMoveNoPrefix(t *testing.T) { + testMove(t, DefaultOptions) +} + +func TestMoveWithPrefix(t *testing.T) { + testMove(t, func(path string) Options { + opts := DefaultOptions(path) + opts.Prefix = "/prefixed/" + return opts + }) +} From 4715b1f4369114320f7dd1310965149ea36f36a1 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sun, 11 Jul 2021 18:00:54 +0300 Subject: [PATCH 04/33] fix lotus-shed --- blockstore/badger/blockstore.go | 6 ++++++ cmd/lotus-shed/pruning.go | 6 +++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index d5f30d9d7..f0f602b20 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -950,3 +950,9 @@ func (b *Blockstore) StorageKey(dst []byte, cid cid.Cid) []byte { } return dst[:reqsize] } + +// this method is added for lotus-shed needs +// WARNING: THIS IS COMPLETELY UNSAFE; DONT USE THIS IN PRODUCTION CODE +func (b *Blockstore) DB() *badger.DB { + return b.db +} diff --git a/cmd/lotus-shed/pruning.go b/cmd/lotus-shed/pruning.go index 1afe76c4d..188f5b28f 100644 --- a/cmd/lotus-shed/pruning.go +++ b/cmd/lotus-shed/pruning.go @@ -161,7 +161,7 @@ var stateTreePruneCmd = &cli.Command{ if cctx.Bool("only-ds-gc") { fmt.Println("running datastore gc....") for i := 0; i < cctx.Int("gc-count"); i++ { - if err := badgbs.DB.RunValueLogGC(DiscardRatio); err != nil { + if err := badgbs.DB().RunValueLogGC(DiscardRatio); err != nil { return xerrors.Errorf("datastore GC failed: %w", err) } } @@ -208,7 +208,7 @@ var stateTreePruneCmd = &cli.Command{ return nil } - b := badgbs.DB.NewWriteBatch() + b := badgbs.DB().NewWriteBatch() defer b.Cancel() markForRemoval := func(c cid.Cid) error { @@ -249,7 +249,7 @@ var stateTreePruneCmd = &cli.Command{ fmt.Println("running datastore gc....") for i := 0; i < cctx.Int("gc-count"); i++ { - if err := badgbs.DB.RunValueLogGC(DiscardRatio); err != nil { + if err := badgbs.DB().RunValueLogGC(DiscardRatio); err != nil { return xerrors.Errorf("datastore GC failed: %w", err) } } From 4b0b37a4efbe03938cf90194d890877a262a87fe Mon Sep 17 00:00:00 2001 From: vyzo Date: Sun, 11 Jul 2021 18:03:32 +0300 Subject: [PATCH 05/33] fix lint the great spellchecker strikes again --- blockstore/badger/blockstore.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index f0f602b20..b78da9965 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -183,7 +183,7 @@ func (b *Blockstore) isOpen() bool { return b.state == stateOpen } -// lockDB/unlockDB implement a recursive lock contigent on move state +// lockDB/unlockDB implement a recursive lock contingent on move state func (b *Blockstore) lockDB() { b.moveMx.Lock() defer b.moveMx.Unlock() From 608a9f84d25a66536c384c91b06c392380851136 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sun, 11 Jul 2021 19:25:48 +0300 Subject: [PATCH 06/33] fix copy: flush discards the transaction --- blockstore/badger/blockstore.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index b78da9965..f1d87f597 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -409,6 +409,8 @@ func (b *Blockstore) doCopy(from, to *badger.DB, filter func(cid.Cid) bool) erro if err := batch.Flush(); err != nil { return err } + // Flush discards the transaction, so we need a new batch + batch = to.NewWriteBatch() count = 0 putPooled() } From f2c7b08be5b2bc7985910d6357ff7cc90bce4630 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 12 Jul 2021 08:25:09 +0300 Subject: [PATCH 07/33] follow symbolic links when deleting old dbs --- blockstore/badger/blockstore.go | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index f1d87f597..4df25a696 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -424,9 +424,34 @@ func (b *Blockstore) doCopy(from, to *badger.DB, filter func(cid.Cid) bool) erro } func (b *Blockstore) deleteDB(path string) { - err := os.RemoveAll(path) - if err != nil { - log.Warnf("error deleting db at %s: %s", path, err) + // follow symbolic links, otherwise the data wil be left behind + lpath := path + for { + fi, err := os.Lstat(lpath) + if err != nil { + log.Warnf("error stating %s: %s", path, err) + return + } + + if fi.Mode()&os.ModeSymlink == 0 { + break + } + + lpath, err := os.Readlink(lpath) + if err != nil { + log.Warnf("error resolving symbolic link %s: %s", lpath, err) + } + } + + if err := os.RemoveAll(lpath); err != nil { + log.Warnf("error deleting db at %s: %s", lpath, err) + return + } + + if path != lpath { + if err := os.Remove(path); err != nil { + log.Warnf("error removing symbolic link %s", err) + } } } From 524564e2cfbad5d5d728721a04ada04ca4c212f6 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 12 Jul 2021 12:06:44 +0300 Subject: [PATCH 08/33] add some more logging around move --- blockstore/badger/blockstore.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index 4df25a696..db558d972 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -277,6 +277,8 @@ func (b *Blockstore) MoveTo(path string, filter func(cid.Cid) bool) error { } }() + log.Infof("moving blockstore from %s to %s", b.opts.Dir, path) + opts := b.opts opts.Dir = path opts.ValueDir = path @@ -290,6 +292,7 @@ func (b *Blockstore) MoveTo(path string, filter func(cid.Cid) bool) error { b.db2 = db2 b.unlockMove(moveStateMoving) + log.Info("copying blockstore") err = b.doCopy(b.db, b.db2, filter) if err != nil { return fmt.Errorf("error moving badger blockstore to %s: %w", path, err) @@ -327,6 +330,7 @@ func (b *Blockstore) MoveTo(path string, filter func(cid.Cid) bool) error { b.deleteDB(oldpath) } + log.Info("moving blockstore done") return nil } @@ -429,7 +433,7 @@ func (b *Blockstore) deleteDB(path string) { for { fi, err := os.Lstat(lpath) if err != nil { - log.Warnf("error stating %s: %s", path, err) + log.Warnf("error stating %s: %s", lpath, err) return } @@ -437,18 +441,24 @@ func (b *Blockstore) deleteDB(path string) { break } - lpath, err := os.Readlink(lpath) + log.Infof("resolving symbolic link %s", lpath) + newpath, err := os.Readlink(lpath) if err != nil { log.Warnf("error resolving symbolic link %s: %s", lpath, err) + return } + log.Infof("resolved symbolic link %s -> %s", lpath, newpath) + lpath = newpath } + log.Infof("removing data directory %s", lpath) if err := os.RemoveAll(lpath); err != nil { log.Warnf("error deleting db at %s: %s", lpath, err) return } if path != lpath { + log.Infof("removing link %s", path) if err := os.Remove(path); err != nil { log.Warnf("error removing symbolic link %s", err) } From 5cf6fdf81d2f7592757c9007de10bd497fd52373 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 13 Jul 2021 12:10:32 +0300 Subject: [PATCH 09/33] don't heap allocate the cond, just set L --- blockstore/badger/blockstore.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index db558d972..ed0368efc 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -97,7 +97,7 @@ type Blockstore struct { viewers sync.WaitGroup moveMx sync.RWMutex - moveCond *sync.Cond + moveCond sync.Cond moveState int rlock int @@ -136,7 +136,7 @@ func Open(opts Options) (*Blockstore, error) { bs.prefixLen = len(bs.prefix) } - bs.moveCond = sync.NewCond(&bs.moveMx) + bs.moveCond.L = &bs.moveMx return bs, nil } From 94509968a0109606404470ff8c067db90248a6d3 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 23 Jul 2021 22:25:32 +0300 Subject: [PATCH 10/33] make moveTo a private method --- blockstore/badger/blockstore.go | 20 ++++++++++++-------- blockstore/badger/blockstore_test.go | 2 +- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index ed0368efc..b0da429f0 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -222,13 +222,15 @@ func (b *Blockstore) unlockMove(state int) { b.moveMx.Unlock() } -// MoveTo implements the BlockstoreMover trait -func (b *Blockstore) MoveTo(path string, filter func(cid.Cid) bool) error { - if err := b.access(); err != nil { - return err - } - defer b.viewers.Done() - +// moveTo moves the blockstore to path, and creates a symlink from the current path +// to the new path; the old blockstore is deleted. +// If path is empty, then a new path adjacent to the current path is created +// automatically. +// The blockstore must accept new writes during the move and ensure that these +// are persisted to the new blockstore; if a failure occurs aboring the move, +// then they must be peristed to the old blockstore. +// In short, the blockstore must not lose data from new writes during the move. +func (b *Blockstore) moveTo(path string) error { // this inlines moveLock/moveUnlock for the initial state check to prevent a second move // while one is in progress without clobbering state b.moveMx.Lock() @@ -293,7 +295,7 @@ func (b *Blockstore) MoveTo(path string, filter func(cid.Cid) bool) error { b.unlockMove(moveStateMoving) log.Info("copying blockstore") - err = b.doCopy(b.db, b.db2, filter) + err = b.doCopy(b.db, b.db2, nil) if err != nil { return fmt.Errorf("error moving badger blockstore to %s: %w", path, err) } @@ -334,6 +336,8 @@ func (b *Blockstore) MoveTo(path string, filter func(cid.Cid) bool) error { return nil } +// doCopy copies a badger blockstore to another, with an optional filter; if the filter +// is not nil, then only cids that satisfy the filter will be copied. func (b *Blockstore) doCopy(from, to *badger.DB, filter func(cid.Cid) bool) error { count := 0 batch := to.NewWriteBatch() diff --git a/blockstore/badger/blockstore_test.go b/blockstore/badger/blockstore_test.go index dab1fdc3a..a4cfae652 100644 --- a/blockstore/badger/blockstore_test.go +++ b/blockstore/badger/blockstore_test.go @@ -154,7 +154,7 @@ func testMove(t *testing.T, optsF func(string) Options) { return nil }) g.Go(func() error { - return db.MoveTo("", nil) + return db.moveTo("") }) err = g.Wait() From a84366513228ca33507a45c952521ff336a4ddfb Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 23 Jul 2021 22:30:40 +0300 Subject: [PATCH 11/33] add options to BlockstoreGC trait --- blockstore/badger/blockstore.go | 2 +- blockstore/blockstore.go | 14 +++++++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index b0da429f0..dd3a710f4 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -471,7 +471,7 @@ func (b *Blockstore) deleteDB(path string) { // CollectGarbage compacts and runs garbage collection on the value log; // implements the BlockstoreGC trait -func (b *Blockstore) CollectGarbage() error { +func (b *Blockstore) CollectGarbage(options map[interface{}]interface{}) error { if err := b.access(); err != nil { return err } diff --git a/blockstore/blockstore.go b/blockstore/blockstore.go index 43e6cd1a4..a9482b323 100644 --- a/blockstore/blockstore.go +++ b/blockstore/blockstore.go @@ -37,9 +37,21 @@ type BlockstoreIterator interface { // BlockstoreGC is a trait for blockstores that support online garbage collection type BlockstoreGC interface { - CollectGarbage() error + CollectGarbage(options map[interface{}]interface{}) error } +// garbage collection options +type blockstoreMovingGCKey struct{} +type blockstoreMovingGCPath struct{} + +// BlockstoreMovingGC is a garbage collection option that instructs the blockstore +// to use moving GC if supported. +var BlockstoreMovingGC = blockstoreMovingGCKey{} + +// BlockstoreMovingGCPath is a garbage collection option that specifies an optional +// target path for moving GC. +var BlockstoreMovingGCPath = blockstoreMovingGCPath{} + // BlockstoreSize is a trait for on-disk blockstores that can report their size type BlockstoreSize interface { Size() (int64, error) From c747f2f1e229a8b1bcd6cd7c9c5a01cd7b00882d Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 23 Jul 2021 22:44:54 +0300 Subject: [PATCH 12/33] do moving GC if the user asks for it --- blockstore/badger/blockstore.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index dd3a710f4..1979ddebe 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -477,6 +477,32 @@ func (b *Blockstore) CollectGarbage(options map[interface{}]interface{}) error { } defer b.viewers.Done() + var movingGC bool + movingGCOpt, ok := options[blockstore.BlockstoreMovingGC] + if ok { + movingGC, ok = movingGCOpt.(bool) + if !ok { + return fmt.Errorf("incorrect type for moving gc option; expected bool but got %T", movingGCOpt) + } + } + + if !movingGC { + return b.onlineGC() + } + + var movingGCPath string + movingGCPathOpt, ok := options[blockstore.BlockstoreMovingGCPath] + if ok { + movingGCPath, ok = movingGCPathOpt.(string) + if !ok { + return fmt.Errorf("incorrect type for moving gc path option; expected string but got %T", movingGCPathOpt) + } + } + + return b.moveTo(movingGCPath) +} + +func (b *Blockstore) onlineGC() error { b.lockDB() defer b.unlockDB() From fb3986226f9f57b09565acc1e51d2f89e423aae7 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 23 Jul 2021 22:55:03 +0300 Subject: [PATCH 13/33] do hotstore moving GC in splitstore with a user-specified frequency --- blockstore/splitstore/splitstore.go | 7 +++++++ blockstore/splitstore/splitstore_gc.go | 15 +++++++++++---- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index 7a2abf9a8..07f211be2 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -81,6 +81,13 @@ type Config struct { // - a positive integer indicates the number of finalities, outside the compaction boundary, // for which messages will be retained in the hotstore. HotStoreMessageRetention uint64 + + // HotstoreMovingGCFrequency indicates how frequently to garbage collect the hotstore using + // moving GC (if supported by the hotstore). + // A value of 0 disables moving GC entirely. + // A positive value is the number of compactions before a moving GC is performed; + // a value of 1 will perform moving GC in every compaction. + HotStoreMovingGCFrequency uint } // ChainAccessor allows the Splitstore to access the chain. It will most likely diff --git a/blockstore/splitstore/splitstore_gc.go b/blockstore/splitstore/splitstore_gc.go index 46668167c..cd7703963 100644 --- a/blockstore/splitstore/splitstore_gc.go +++ b/blockstore/splitstore/splitstore_gc.go @@ -8,17 +8,24 @@ import ( ) func (s *SplitStore) gcHotstore() { - if err := s.gcBlockstoreOnline(s.hot); err != nil { + var opts map[interface{}]interface{} + if s.cfg.HotStoreMovingGCFrequency > 0 && s.compactionIndex%int64(s.cfg.HotStoreMovingGCFrequency) == 0 { + opts = map[interface{}]interface{}{ + bstore.BlockstoreMovingGC: true, + } + } + + if err := s.gcBlockstore(s.hot, opts); err != nil { log.Warnf("error garbage collecting hostore: %s", err) } } -func (s *SplitStore) gcBlockstoreOnline(b bstore.Blockstore) error { +func (s *SplitStore) gcBlockstore(b bstore.Blockstore, opts map[interface{}]interface{}) error { if gc, ok := b.(bstore.BlockstoreGC); ok { log.Info("garbage collecting blockstore") startGC := time.Now() - if err := gc.CollectGarbage(); err != nil { + if err := gc.CollectGarbage(opts); err != nil { return err } @@ -26,5 +33,5 @@ func (s *SplitStore) gcBlockstoreOnline(b bstore.Blockstore) error { return nil } - return fmt.Errorf("blockstore doesn't support online gc: %T", b) + return fmt.Errorf("blockstore doesn't support garbage collection: %T", b) } From 5acae50e07939e063dfe0d16a388e05818474525 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 23 Jul 2021 22:58:34 +0300 Subject: [PATCH 14/33] add config option for splitstore moving gc frequency --- blockstore/splitstore/splitstore.go | 2 +- node/config/def.go | 2 ++ node/config/types.go | 3 ++- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index 07f211be2..d463859b9 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -87,7 +87,7 @@ type Config struct { // A value of 0 disables moving GC entirely. // A positive value is the number of compactions before a moving GC is performed; // a value of 1 will perform moving GC in every compaction. - HotStoreMovingGCFrequency uint + HotStoreMovingGCFrequency uint64 } // ChainAccessor allows the Splitstore to access the chain. It will most likely diff --git a/node/config/def.go b/node/config/def.go index ef5dcd3ba..554ae66ad 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -72,6 +72,8 @@ func DefaultFullNode() *FullNode { ColdStoreType: "universal", HotStoreType: "badger", MarkSetType: "map", + + HotStoreMovingGCFrequency: 20, }, }, } diff --git a/node/config/types.go b/node/config/types.go index 63a493f51..35c4962c7 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -282,7 +282,8 @@ type Splitstore struct { HotStoreType string MarkSetType string - HotStoreMessageRetention uint64 + HotStoreMessageRetention uint64 + HotStoreMovingGCFrequency uint64 } // // Full Node From b1f60e85e96e5157db7149b6346d23ce76193480 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 23 Jul 2021 23:05:59 +0300 Subject: [PATCH 15/33] document moving GC frequency option in README --- blockstore/splitstore/README.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/blockstore/splitstore/README.md b/blockstore/splitstore/README.md index b6f30ef43..079b50cc7 100644 --- a/blockstore/splitstore/README.md +++ b/blockstore/splitstore/README.md @@ -59,6 +59,17 @@ These are options in the `[Chainstore.Splitstore]` section of the configuration: nodes beyond 4 finalities, while running with the discard coldstore option. It is also useful for miners who accept deals and need to lookback messages beyond the 4 finalities, which would otherwise hit the coldstore. +- `HotStoreMovingGCFrequency` -- specifies how frequenty to garbage collect the hotstore + using moving GC. + The default value is 20, which uses moving GC every 20 compactions; set to 0 to disable moving + GC altogether. + Rationale: badger supports online GC, and this is used by default. However it has proven to + be ineffective in practice with the hotstore size slowly creeping up. In order to address this, + we have added moving GC support in our badger wrapper, which can effectively reclaim all space. + The downside is that it takes a bit of time to perform a moving GC (about 40min) and you also + need enough space to house the new hotstore while the old one is still live. + This option controls how frequently to perform moving GC, with the default of 20 corresponding + to about once a week. ## Operation From 21e7c188da7ff27e9a0b27855ae09a7ad15e3764 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sat, 24 Jul 2021 08:43:15 +0300 Subject: [PATCH 16/33] use CollectGarbage in blockstore move test, as it is the real interface --- blockstore/badger/blockstore_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/blockstore/badger/blockstore_test.go b/blockstore/badger/blockstore_test.go index a4cfae652..f40ccbaf8 100644 --- a/blockstore/badger/blockstore_test.go +++ b/blockstore/badger/blockstore_test.go @@ -154,7 +154,9 @@ func testMove(t *testing.T, optsF func(string) Options) { return nil }) g.Go(func() error { - return db.moveTo("") + return db.CollectGarbage(map[interface{}]interface{}{ + blockstore.BlockstoreMovingGC: true, + }) }) err = g.Wait() From 4cdb34e44888c2c568250a59decf9c28cbc3bd46 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sat, 24 Jul 2021 08:59:15 +0300 Subject: [PATCH 17/33] add docstrings for splitstore config --- node/config/types.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/node/config/types.go b/node/config/types.go index 35c4962c7..66a179bd5 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -278,11 +278,22 @@ type Chainstore struct { } type Splitstore struct { + // ColdStoreType specifies the type of the coldstore. + // It can be "universal" (default) or "discard" for discarding cold blocks. ColdStoreType string - HotStoreType string - MarkSetType string + // HotStoreType specifies the type of the hotstore. + // Only currently supported value is "badger". + HotStoreType string + // MarkSetType specifies the type of the markset. + // It can be "map" (default) for in memory marking or "badger" for on-disk marking. + MarkSetType string - HotStoreMessageRetention uint64 + // HotStoreMessageRetention specifies the retention policy for messages, in finalities beyond + // the compaction boundary; default is 0. + HotStoreMessageRetention uint64 + // HotStoreMovingGCFrequency specifies how often to perform moving GC on the hotstore. + // A value of 0 disables, while a value 1 will do moving GC in every compaction. + // Default is 20 (about once a week). HotStoreMovingGCFrequency uint64 } From 20f93a520fb848407b0a93b2d26b228ac5aadc58 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sat, 24 Jul 2021 09:00:00 +0300 Subject: [PATCH 18/33] make cfgdoc-gen --- node/config/doc_gen.go | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index ea68dc344..7ebecc5bc 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -685,25 +685,37 @@ submitting proofs to the chain individually`, Name: "ColdStoreType", Type: "string", - Comment: ``, + Comment: `ColdStoreType specifies the type of the coldstore. +It can be "universal" (default) or "discard" for discarding cold blocks.`, }, { Name: "HotStoreType", Type: "string", - Comment: ``, + Comment: `HotStoreType specifies the type of the hotstore. +Only currently supported value is "badger".`, }, { Name: "MarkSetType", Type: "string", - Comment: ``, + Comment: `MarkSetType specifies the type of the markset. +It can be "map" (default) for in memory marking or "badger" for on-disk marking.`, }, { Name: "HotStoreMessageRetention", Type: "uint64", - Comment: ``, + Comment: `HotStoreMessageRetention specifies the retention policy for messages, in finalities beyond +the compaction boundary; default is 0.`, + }, + { + Name: "HotStoreMovingGCFrequency", + Type: "uint64", + + Comment: `HotStoreMovingGCFrequency specifies how often to perform moving GC on the hotstore. +A value of 0 disables, while a value 1 will do moving GC in every compaction. +Default is 20 (about once a week).`, }, }, "StorageMiner": []DocField{ From aa0bd51b2c39eec1d9cea05eabbe584115b03ee4 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sat, 24 Jul 2021 11:17:47 +0300 Subject: [PATCH 19/33] thread GCFrequency option into the splitstore config --- node/modules/blockstore.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/node/modules/blockstore.go b/node/modules/blockstore.go index 2588e3f98..32e67cf62 100644 --- a/node/modules/blockstore.go +++ b/node/modules/blockstore.go @@ -78,9 +78,10 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.Locked } cfg := &splitstore.Config{ - MarkSetType: cfg.Splitstore.MarkSetType, - DiscardColdBlocks: cfg.Splitstore.ColdStoreType == "discard", - HotStoreMessageRetention: cfg.Splitstore.HotStoreMessageRetention, + MarkSetType: cfg.Splitstore.MarkSetType, + DiscardColdBlocks: cfg.Splitstore.ColdStoreType == "discard", + HotStoreMessageRetention: cfg.Splitstore.HotStoreMessageRetention, + HotStoreMovingGCFrequency: cfg.Splitstore.HotStoreMovingGCFrequency, } ss, err := splitstore.Open(path, ds, hot, cold, cfg) if err != nil { From 938330e6c3cc490ee2c4f06ab8293e3b91d78ef3 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 26 Jul 2021 15:09:31 +0300 Subject: [PATCH 20/33] moveMx is not an RWMutex, just a regular mutex --- blockstore/badger/blockstore.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index 1979ddebe..50a092d12 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -96,7 +96,7 @@ type Blockstore struct { state int viewers sync.WaitGroup - moveMx sync.RWMutex + moveMx sync.Mutex moveCond sync.Cond moveState int rlock int From 96c1123c33756618e419d1255cc1bcddcfc794be Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 27 Jul 2021 09:50:44 +0300 Subject: [PATCH 21/33] use functional options in the BlockstoreGC interface --- blockstore/badger/blockstore.go | 71 ++++++++++++---------------- blockstore/badger/blockstore_test.go | 4 +- blockstore/blockstore.go | 23 +++++---- 3 files changed, 43 insertions(+), 55 deletions(-) diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index 50a092d12..1ef622f39 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -222,15 +222,14 @@ func (b *Blockstore) unlockMove(state int) { b.moveMx.Unlock() } -// moveTo moves the blockstore to path, and creates a symlink from the current path -// to the new path; the old blockstore is deleted. -// If path is empty, then a new path adjacent to the current path is created -// automatically. -// The blockstore must accept new writes during the move and ensure that these +// movingGC moves the blockstore to a new path, adjacent to the current path, and creates +// a symlink from the current path to the new path; the old blockstore is deleted. +// +// The blockstore MUST accept new writes during the move and ensure that these // are persisted to the new blockstore; if a failure occurs aboring the move, // then they must be peristed to the old blockstore. // In short, the blockstore must not lose data from new writes during the move. -func (b *Blockstore) moveTo(path string) error { +func (b *Blockstore) movingGC() error { // this inlines moveLock/moveUnlock for the initial state check to prevent a second move // while one is in progress without clobbering state b.moveMx.Lock() @@ -248,9 +247,7 @@ func (b *Blockstore) moveTo(path string) error { b.moveCond.Broadcast() b.moveMx.Unlock() - if path == "" { - path = fmt.Sprintf("%s.%d", b.opts.Dir, time.Now().Unix()) - } + path := fmt.Sprintf("%s.%d", b.opts.Dir, time.Now().Unix()) defer func() { b.lockMove() @@ -469,39 +466,6 @@ func (b *Blockstore) deleteDB(path string) { } } -// CollectGarbage compacts and runs garbage collection on the value log; -// implements the BlockstoreGC trait -func (b *Blockstore) CollectGarbage(options map[interface{}]interface{}) error { - if err := b.access(); err != nil { - return err - } - defer b.viewers.Done() - - var movingGC bool - movingGCOpt, ok := options[blockstore.BlockstoreMovingGC] - if ok { - movingGC, ok = movingGCOpt.(bool) - if !ok { - return fmt.Errorf("incorrect type for moving gc option; expected bool but got %T", movingGCOpt) - } - } - - if !movingGC { - return b.onlineGC() - } - - var movingGCPath string - movingGCPathOpt, ok := options[blockstore.BlockstoreMovingGCPath] - if ok { - movingGCPath, ok = movingGCPathOpt.(string) - if !ok { - return fmt.Errorf("incorrect type for moving gc path option; expected string but got %T", movingGCPathOpt) - } - } - - return b.moveTo(movingGCPath) -} - func (b *Blockstore) onlineGC() error { b.lockDB() defer b.unlockDB() @@ -529,6 +493,29 @@ func (b *Blockstore) onlineGC() error { return err } +// CollectGarbage compacts and runs garbage collection on the value log; +// implements the BlockstoreGC trait +func (b *Blockstore) CollectGarbage(opts ...blockstore.BlockstoreGCOption) error { + if err := b.access(); err != nil { + return err + } + defer b.viewers.Done() + + var options blockstore.BlockstoreGCOptions + for _, opt := range opts { + err := opt(&options) + if err != nil { + return err + } + } + + if options.FullGC { + return b.movingGC() + } + + return b.onlineGC() +} + // Size returns the aggregate size of the blockstore func (b *Blockstore) Size() (int64, error) { if err := b.access(); err != nil { diff --git a/blockstore/badger/blockstore_test.go b/blockstore/badger/blockstore_test.go index f40ccbaf8..d1312eccc 100644 --- a/blockstore/badger/blockstore_test.go +++ b/blockstore/badger/blockstore_test.go @@ -154,9 +154,7 @@ func testMove(t *testing.T, optsF func(string) Options) { return nil }) g.Go(func() error { - return db.CollectGarbage(map[interface{}]interface{}{ - blockstore.BlockstoreMovingGC: true, - }) + return db.CollectGarbage(blockstore.WithFullGC(true)) }) err = g.Wait() diff --git a/blockstore/blockstore.go b/blockstore/blockstore.go index a9482b323..8ede31eb9 100644 --- a/blockstore/blockstore.go +++ b/blockstore/blockstore.go @@ -37,20 +37,23 @@ type BlockstoreIterator interface { // BlockstoreGC is a trait for blockstores that support online garbage collection type BlockstoreGC interface { - CollectGarbage(options map[interface{}]interface{}) error + CollectGarbage(options ...BlockstoreGCOption) error } -// garbage collection options -type blockstoreMovingGCKey struct{} -type blockstoreMovingGCPath struct{} +// BlockstoreGCOption is a functional interface for controlling blockstore GC options +type BlockstoreGCOption = func(*BlockstoreGCOptions) error -// BlockstoreMovingGC is a garbage collection option that instructs the blockstore -// to use moving GC if supported. -var BlockstoreMovingGC = blockstoreMovingGCKey{} +// BlockstoreGCOptions is a struct with GC options +type BlockstoreGCOptions struct { + FullGC bool +} -// BlockstoreMovingGCPath is a garbage collection option that specifies an optional -// target path for moving GC. -var BlockstoreMovingGCPath = blockstoreMovingGCPath{} +func WithFullGC(fullgc bool) BlockstoreGCOption { + return func(opts *BlockstoreGCOptions) error { + opts.FullGC = fullgc + return nil + } +} // BlockstoreSize is a trait for on-disk blockstores that can report their size type BlockstoreSize interface { From 9d254647039cf562db5d5c4a6041bb967e6cf735 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 27 Jul 2021 09:53:22 +0300 Subject: [PATCH 22/33] use functional options for hotstore gc, rename MovingGC to FullGC --- blockstore/splitstore/splitstore.go | 12 ++++++------ blockstore/splitstore/splitstore_gc.go | 12 +++++------- node/config/def.go | 2 +- node/config/types.go | 6 +++--- node/modules/blockstore.go | 8 ++++---- 5 files changed, 19 insertions(+), 21 deletions(-) diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index d463859b9..171b5a6e4 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -82,12 +82,12 @@ type Config struct { // for which messages will be retained in the hotstore. HotStoreMessageRetention uint64 - // HotstoreMovingGCFrequency indicates how frequently to garbage collect the hotstore using - // moving GC (if supported by the hotstore). - // A value of 0 disables moving GC entirely. - // A positive value is the number of compactions before a moving GC is performed; - // a value of 1 will perform moving GC in every compaction. - HotStoreMovingGCFrequency uint64 + // HotstoreFullGCFrequency indicates how frequently (in terms of compactions) to garbage collect + // the hotstore using full (moving) GC if supported by the hotstore. + // A value of 0 disables full GC entirely. + // A positive value is the number of compactions before a full GC is performed; + // a value of 1 will perform full GC in every compaction. + HotStoreFullGCFrequency uint64 } // ChainAccessor allows the Splitstore to access the chain. It will most likely diff --git a/blockstore/splitstore/splitstore_gc.go b/blockstore/splitstore/splitstore_gc.go index cd7703963..2e1ffd4ad 100644 --- a/blockstore/splitstore/splitstore_gc.go +++ b/blockstore/splitstore/splitstore_gc.go @@ -8,11 +8,9 @@ import ( ) func (s *SplitStore) gcHotstore() { - var opts map[interface{}]interface{} - if s.cfg.HotStoreMovingGCFrequency > 0 && s.compactionIndex%int64(s.cfg.HotStoreMovingGCFrequency) == 0 { - opts = map[interface{}]interface{}{ - bstore.BlockstoreMovingGC: true, - } + var opts []bstore.BlockstoreGCOption + if s.cfg.HotStoreFullGCFrequency > 0 && s.compactionIndex%int64(s.cfg.HotStoreFullGCFrequency) == 0 { + opts = append(opts, bstore.WithFullGC(true)) } if err := s.gcBlockstore(s.hot, opts); err != nil { @@ -20,12 +18,12 @@ func (s *SplitStore) gcHotstore() { } } -func (s *SplitStore) gcBlockstore(b bstore.Blockstore, opts map[interface{}]interface{}) error { +func (s *SplitStore) gcBlockstore(b bstore.Blockstore, opts []bstore.BlockstoreGCOption) error { if gc, ok := b.(bstore.BlockstoreGC); ok { log.Info("garbage collecting blockstore") startGC := time.Now() - if err := gc.CollectGarbage(opts); err != nil { + if err := gc.CollectGarbage(opts...); err != nil { return err } diff --git a/node/config/def.go b/node/config/def.go index 554ae66ad..c5c455c68 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -73,7 +73,7 @@ func DefaultFullNode() *FullNode { HotStoreType: "badger", MarkSetType: "map", - HotStoreMovingGCFrequency: 20, + HotStoreFullGCFrequency: 20, }, }, } diff --git a/node/config/types.go b/node/config/types.go index 66a179bd5..fe42aa27e 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -291,10 +291,10 @@ type Splitstore struct { // HotStoreMessageRetention specifies the retention policy for messages, in finalities beyond // the compaction boundary; default is 0. HotStoreMessageRetention uint64 - // HotStoreMovingGCFrequency specifies how often to perform moving GC on the hotstore. - // A value of 0 disables, while a value 1 will do moving GC in every compaction. + // HotStoreFullGCFrequency specifies how often to perform a full (moving) GC on the hotstore. + // A value of 0 disables, while a value 1 will do full GC in every compaction. // Default is 20 (about once a week). - HotStoreMovingGCFrequency uint64 + HotStoreFullGCFrequency uint64 } // // Full Node diff --git a/node/modules/blockstore.go b/node/modules/blockstore.go index 32e67cf62..2486b9744 100644 --- a/node/modules/blockstore.go +++ b/node/modules/blockstore.go @@ -78,10 +78,10 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.Locked } cfg := &splitstore.Config{ - MarkSetType: cfg.Splitstore.MarkSetType, - DiscardColdBlocks: cfg.Splitstore.ColdStoreType == "discard", - HotStoreMessageRetention: cfg.Splitstore.HotStoreMessageRetention, - HotStoreMovingGCFrequency: cfg.Splitstore.HotStoreMovingGCFrequency, + MarkSetType: cfg.Splitstore.MarkSetType, + DiscardColdBlocks: cfg.Splitstore.ColdStoreType == "discard", + HotStoreMessageRetention: cfg.Splitstore.HotStoreMessageRetention, + HotStoreFullGCFrequency: cfg.Splitstore.HotStoreFullGCFrequency, } ss, err := splitstore.Open(path, ds, hot, cold, cfg) if err != nil { From c03859c1b53645257028fa71340656c9bf3dbb8b Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 27 Jul 2021 10:05:35 +0300 Subject: [PATCH 23/33] resolve symlinks when constructing the new db path so that the new path is adjacent to the old path, allowing the user to symlink the db in a different file system. --- blockstore/badger/blockstore.go | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index 1ef622f39..83c5012c1 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -247,7 +247,7 @@ func (b *Blockstore) movingGC() error { b.moveCond.Broadcast() b.moveMx.Unlock() - path := fmt.Sprintf("%s.%d", b.opts.Dir, time.Now().Unix()) + var path string defer func() { b.lockMove() @@ -276,6 +276,23 @@ func (b *Blockstore) movingGC() error { } }() + // we resolve symlinks to create the new path in the adjacent to the old path. + // this allows the user to symlink the db directory into a separate filesystem. + basePath := b.opts.Dir + linkPath, err := filepath.EvalSymlinks(basePath) + if err != nil { + return fmt.Errorf("error resolving symlink %s: %w", basePath, err) + } + + if basePath == linkPath { + path = basePath + } else { + name := filepath.Base(basePath) + dir := filepath.Dir(linkPath) + path = filepath.Join(dir, name) + } + path = fmt.Sprintf("%s.%d", path, time.Now().Unix()) + log.Infof("moving blockstore from %s to %s", b.opts.Dir, path) opts := b.opts From cbaffab9dd3029bfd18bc22a8632075949240dc2 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 27 Jul 2021 10:08:07 +0300 Subject: [PATCH 24/33] use EvalSymlinks in deleteDB --- blockstore/badger/blockstore.go | 24 ++++-------------------- 1 file changed, 4 insertions(+), 20 deletions(-) diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index 83c5012c1..6bab69915 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -447,26 +447,10 @@ func (b *Blockstore) doCopy(from, to *badger.DB, filter func(cid.Cid) bool) erro func (b *Blockstore) deleteDB(path string) { // follow symbolic links, otherwise the data wil be left behind - lpath := path - for { - fi, err := os.Lstat(lpath) - if err != nil { - log.Warnf("error stating %s: %s", lpath, err) - return - } - - if fi.Mode()&os.ModeSymlink == 0 { - break - } - - log.Infof("resolving symbolic link %s", lpath) - newpath, err := os.Readlink(lpath) - if err != nil { - log.Warnf("error resolving symbolic link %s: %s", lpath, err) - return - } - log.Infof("resolved symbolic link %s -> %s", lpath, newpath) - lpath = newpath + lpath, err := filepath.EvalSymlinks(path) + if err != nil { + log.Warnf("error resolving symlinks in %s", path) + return } log.Infof("removing data directory %s", lpath) From d6ace68540826625888cb36a201d8bb440826509 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 27 Jul 2021 10:16:50 +0300 Subject: [PATCH 25/33] extend test to do a double move and check symlink following --- blockstore/badger/blockstore.go | 2 +- blockstore/badger/blockstore_test.go | 124 +++++++++++++++------------ 2 files changed, 71 insertions(+), 55 deletions(-) diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index 6bab69915..8730c8134 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -291,7 +291,7 @@ func (b *Blockstore) movingGC() error { dir := filepath.Dir(linkPath) path = filepath.Join(dir, name) } - path = fmt.Sprintf("%s.%d", path, time.Now().Unix()) + path = fmt.Sprintf("%s.%d", path, time.Now().UnixNano()) log.Infof("moving blockstore from %s to %s", b.opts.Dir, path) diff --git a/blockstore/badger/blockstore_test.go b/blockstore/badger/blockstore_test.go index d1312eccc..ddfa6f28d 100644 --- a/blockstore/badger/blockstore_test.go +++ b/blockstore/badger/blockstore_test.go @@ -163,72 +163,88 @@ func testMove(t *testing.T, optsF func(string) Options) { } // now check that we have all the blocks in have and none in the deleted lists - for _, blk := range have { - has, err := db.Has(blk.Cid()) - if err != nil { - t.Fatal(err) + checkBlocks := func() { + for _, blk := range have { + has, err := db.Has(blk.Cid()) + if err != nil { + t.Fatal(err) + } + + if !has { + t.Fatal("missing block") + } + + blk2, err := db.Get(blk.Cid()) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(blk.RawData(), blk2.RawData()) { + t.Fatal("data mismatch") + } } - if !has { - t.Fatal("missing block") - } + for _, c := range deleted { + has, err := db.Has(c) + if err != nil { + t.Fatal(err) + } - blk2, err := db.Get(blk.Cid()) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(blk.RawData(), blk2.RawData()) { - t.Fatal("data mismatch") + if has { + t.Fatal("resurrected block") + } } } - for _, c := range deleted { - has, err := db.Has(c) - if err != nil { - t.Fatal(err) - } - - if has { - t.Fatal("resurrected block") - } - } + checkBlocks() // check the basePath -- it should contain a directory with name db.{timestamp}, soft-linked // to db and nothing else - entries, err := os.ReadDir(basePath) - if err != nil { + checkPath := func() { + entries, err := os.ReadDir(basePath) + if err != nil { + t.Fatal(err) + } + + if len(entries) != 2 { + t.Fatalf("too many entries; expected %d but got %d", 2, len(entries)) + } + + var haveDB, haveDBLink bool + for _, e := range entries { + if e.Name() == "db" { + if (e.Type() & os.ModeSymlink) == 0 { + t.Fatal("found db, but it's not a symlink") + } + haveDBLink = true + continue + } + if strings.HasPrefix(e.Name(), "db.") { + if !e.Type().IsDir() { + t.Fatal("found db prefix, but it's not a directory") + } + haveDB = true + continue + } + } + + if !haveDB { + t.Fatal("db directory is missing") + } + if !haveDBLink { + t.Fatal("db link is missing") + } + } + + checkPath() + + // now do another FullGC to test the double move and following of symlinks + if err := db.CollectGarbage(blockstore.WithFullGC(true)); err != nil { t.Fatal(err) } - if len(entries) != 2 { - t.Fatalf("too many entries; expected %d but got %d", 2, len(entries)) - } - - var haveDB, haveDBLink bool - for _, e := range entries { - if e.Name() == "db" { - if (e.Type() & os.ModeSymlink) == 0 { - t.Fatal("found db, but it's not a symlink") - } - haveDBLink = true - continue - } - if strings.HasPrefix(e.Name(), "db.") { - if !e.Type().IsDir() { - t.Fatal("found db prefix, but it's not a directory") - } - haveDB = true - continue - } - } - - if !haveDB { - t.Fatal("db directory is missing") - } - if !haveDBLink { - t.Fatal("db link is missing") - } + checkBlocks() + checkPath() } func TestMoveNoPrefix(t *testing.T) { From 649fc6286387bf11c016297101f93f4b595bdd0d Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 27 Jul 2021 10:27:16 +0300 Subject: [PATCH 26/33] panic if we fail to correctly setup the db paths. we can't really continue and leave a ticking bomb for the next restart; the user might not see it. --- blockstore/badger/blockstore.go | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index 8730c8134..f13bac746 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -322,30 +322,27 @@ func (b *Blockstore) movingGC() error { err = db1.Close() if err != nil { - log.Warnf("error closing badger db: %s", err) + log.Warnf("error closing old badger db: %s", err) } dbpath := b.opts.Dir oldpath := fmt.Sprintf("%s.old.%d", dbpath, time.Now().Unix()) - ok := true - err = os.Rename(dbpath, oldpath) - if err != nil { - // this is bad, but not catastrophic; new data will be written in db2 and user can fix - log.Errorf("error renaming badger db dir from %s to %s; USER ACTION REQUIRED", dbpath, oldpath) - ok = false + if err = os.Rename(dbpath, oldpath); err != nil { + // this is not catastrophic in the sense that we have not lost any data. + // but it is pretty bad, as the db path points to the old db, while we are now using to the new + // db; we can't continue and leave a ticking bomb for the next restart. + // so a panic is appropriate and user can fix. + panic(fmt.Errorf("error renaming old badger db dir from %s to %s: %w; USER ACTION REQUIRED", dbpath, oldpath, err)) //nolint } - if ok { - err = os.Symlink(path, dbpath) - if err != nil { - // ditto, this is bad, but not catastrophic; user can fix - log.Errorf("error symlinking badger db dir from %s to %s; USER ACTION REQUIRED", path, dbpath) - } - - b.deleteDB(oldpath) + if err = os.Symlink(path, dbpath); err != nil { + // same here; the db path is pointing to the void. panic and let the user fix. + panic(fmt.Errorf("error symlinking new badger db dir from %s to %s: %w; USER ACTION REQUIRED", path, dbpath, err)) //nolint } + b.deleteDB(oldpath) + log.Info("moving blockstore done") return nil } From 0baeec068618312b7de3990f47070018908c606f Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 27 Jul 2021 11:18:24 +0300 Subject: [PATCH 27/33] remove filter from doCopy; it's not used it was there to support a potential CopyTo interface; but we'll cross that bridge when we get there. --- blockstore/badger/blockstore.go | 27 ++------------------------- 1 file changed, 2 insertions(+), 25 deletions(-) diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index f13bac746..f06f17c58 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -309,7 +309,7 @@ func (b *Blockstore) movingGC() error { b.unlockMove(moveStateMoving) log.Info("copying blockstore") - err = b.doCopy(b.db, b.db2, nil) + err = b.doCopy(b.db, b.db2) if err != nil { return fmt.Errorf("error moving badger blockstore to %s: %w", path, err) } @@ -349,7 +349,7 @@ func (b *Blockstore) movingGC() error { // doCopy copies a badger blockstore to another, with an optional filter; if the filter // is not nil, then only cids that satisfy the filter will be copied. -func (b *Blockstore) doCopy(from, to *badger.DB, filter func(cid.Cid) bool) error { +func (b *Blockstore) doCopy(from, to *badger.DB) error { count := 0 batch := to.NewWriteBatch() defer batch.Cancel() @@ -375,7 +375,6 @@ func (b *Blockstore) doCopy(from, to *badger.DB, filter func(cid.Cid) bool) erro } defer putPooled() - var buf []byte for iter.Rewind(); iter.Valid(); iter.Next() { if !b.isOpen() { return ErrBlockstoreClosed @@ -384,28 +383,6 @@ func (b *Blockstore) doCopy(from, to *badger.DB, filter func(cid.Cid) bool) erro item := iter.Item() kk := item.Key() - if filter != nil { - k := kk - if b.prefixing { - k = k[b.prefixLen:] - } - - klen := base32.RawStdEncoding.DecodedLen(len(k)) - if klen > len(buf) { - buf = make([]byte, klen) - } - - n, err := base32.RawStdEncoding.Decode(buf, k) - if err != nil { - return err - } - - c := cid.NewCidV1(cid.Raw, buf[:n]) - if !filter(c) { - continue - } - } - k := getPooled(len(kk)) copy(k, kk) From 59aebba0d9234661d6a1bd03267e494f54278f1a Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 27 Jul 2021 11:35:21 +0300 Subject: [PATCH 28/33] use a slab allocator for the copy --- blockstore/badger/blockstore.go | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index f06f17c58..5a451169e 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -361,19 +361,31 @@ func (b *Blockstore) doCopy(from, to *badger.DB) error { iter := txn.NewIterator(opts) defer iter.Close() - pooled := make([][]byte, 0, 2*moveBatchSize) - getPooled := func(size int) []byte { - buf := pool.Get(size) - pooled = append(pooled, buf) + // allocate a slab to improve performance; buffers larger than 1K will be allocated from the pool + var pooled [][]byte + const maxSlabSize = 1024 + slab := make([]byte, moveBatchSize*maxSlabSize) + slabStart := 0 + getSlab := func(size int) []byte { + if size > maxSlabSize { + buf := pool.Get(size) + pooled = append(pooled, buf) + return buf + } + + slabEnd := slabStart + size + buf := slab[slabStart:slabEnd] + slabStart = slabEnd return buf } - putPooled := func() { + resetSlab := func() { + slabStart = 0 for _, buf := range pooled { pool.Put(buf) } pooled = pooled[:0] } - defer putPooled() + defer resetSlab() for iter.Rewind(); iter.Valid(); iter.Next() { if !b.isOpen() { @@ -383,12 +395,12 @@ func (b *Blockstore) doCopy(from, to *badger.DB) error { item := iter.Item() kk := item.Key() - k := getPooled(len(kk)) + k := getSlab(len(kk)) copy(k, kk) var v []byte err := item.Value(func(vv []byte) error { - v = getPooled(len(vv)) + v = getSlab(len(vv)) copy(v, vv) return nil }) @@ -408,7 +420,7 @@ func (b *Blockstore) doCopy(from, to *badger.DB) error { // Flush discards the transaction, so we need a new batch batch = to.NewWriteBatch() count = 0 - putPooled() + resetSlab() } } From b82f953fd58e6c2d3931ce8a81606380edccfee2 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 27 Jul 2021 11:46:35 +0300 Subject: [PATCH 29/33] use the badger streaming interface in doCopy --- blockstore/badger/blockstore.go | 87 ++++++--------------------------- 1 file changed, 16 insertions(+), 71 deletions(-) diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index 5a451169e..e8b5247ce 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -12,6 +12,7 @@ import ( "github.com/dgraph-io/badger/v2" "github.com/dgraph-io/badger/v2/options" + "github.com/dgraph-io/badger/v2/pb" "github.com/multiformats/go-base32" "go.uber.org/zap" @@ -36,8 +37,6 @@ var ( log = logger.Logger("badgerbs") ) -const moveBatchSize = 16384 - // aliases to mask badger dependencies. const ( // FileIO is equivalent to badger/options.FileIO. @@ -350,85 +349,31 @@ func (b *Blockstore) movingGC() error { // doCopy copies a badger blockstore to another, with an optional filter; if the filter // is not nil, then only cids that satisfy the filter will be copied. func (b *Blockstore) doCopy(from, to *badger.DB) error { - count := 0 - batch := to.NewWriteBatch() - defer batch.Cancel() - - txn := from.NewTransaction(false) - defer txn.Discard() - - opts := badger.IteratorOptions{PrefetchSize: moveBatchSize} - iter := txn.NewIterator(opts) - defer iter.Close() - - // allocate a slab to improve performance; buffers larger than 1K will be allocated from the pool - var pooled [][]byte - const maxSlabSize = 1024 - slab := make([]byte, moveBatchSize*maxSlabSize) - slabStart := 0 - getSlab := func(size int) []byte { - if size > maxSlabSize { - buf := pool.Get(size) - pooled = append(pooled, buf) - return buf - } - - slabEnd := slabStart + size - buf := slab[slabStart:slabEnd] - slabStart = slabEnd - return buf + workers := runtime.NumCPU() / 2 + if workers < 2 { + workers = 2 } - resetSlab := func() { - slabStart = 0 - for _, buf := range pooled { - pool.Put(buf) - } - pooled = pooled[:0] - } - defer resetSlab() - for iter.Rewind(); iter.Valid(); iter.Next() { - if !b.isOpen() { - return ErrBlockstoreClosed - } + stream := from.NewStream() + stream.NumGo = workers + stream.LogPrefix = "doCopy" + stream.Send = func(list *pb.KVList) error { + batch := to.NewWriteBatch() + defer batch.Cancel() - item := iter.Item() - - kk := item.Key() - k := getSlab(len(kk)) - copy(k, kk) - - var v []byte - err := item.Value(func(vv []byte) error { - v = getSlab(len(vv)) - copy(v, vv) - return nil - }) - if err != nil { - return err - } - - if err := batch.Set(k, v); err != nil { - return err - } - - count++ - if count == moveBatchSize { - if err := batch.Flush(); err != nil { + for _, kv := range list.Kv { + if kv.Key == nil || kv.Value == nil { + continue + } + if err := batch.Set(kv.Key, kv.Value); err != nil { return err } - // Flush discards the transaction, so we need a new batch - batch = to.NewWriteBatch() - count = 0 - resetSlab() } - } - if count > 0 { return batch.Flush() } - return nil + return stream.Orchestrate(context.Background()) } func (b *Blockstore) deleteDB(path string) { From bb2d99908c0d2e3298d7a99eb7c4ad03065471c0 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 27 Jul 2021 12:03:26 +0300 Subject: [PATCH 30/33] make state constants typed --- blockstore/badger/blockstore.go | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index e8b5247ce..f577d81f8 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -76,28 +76,40 @@ func (b *badgerLogger) Warningf(format string, args ...interface{}) { b.skip2.Warnf(format, args...) } +// bsState is the current blockstore state +type bsState int + const ( - stateOpen = iota + // stateOpen signifies an open blockstore + stateOpen bsState = iota + // stateClosing signifies a blockstore that is currently closing stateClosing + // stateClosed signifies a blockstore that has been colosed stateClosed ) +type bsMoveState int + const ( - moveStateNone = iota + // moveStateNone signifies that there is no move in progress + moveStateNone bsMoveState = iota + // moveStateMoving signifies that there is a move in a progress moveStateMoving + // moveStateCleanup signifies that a move has completed or aborted and we are cleaning up moveStateCleanup + // moveStateLock signifies that an exclusive lock has been acquired moveStateLock ) // Blockstore is a badger-backed IPLD blockstore. type Blockstore struct { stateLk sync.RWMutex - state int + state bsState viewers sync.WaitGroup moveMx sync.Mutex moveCond sync.Cond - moveState int + moveState bsMoveState rlock int db *badger.DB @@ -215,7 +227,7 @@ func (b *Blockstore) lockMove() { } } -func (b *Blockstore) unlockMove(state int) { +func (b *Blockstore) unlockMove(state bsMoveState) { b.moveState = state b.moveCond.Broadcast() b.moveMx.Unlock() @@ -254,7 +266,7 @@ func (b *Blockstore) movingGC() error { db2 := b.db2 b.db2 = nil - var state int + var state bsMoveState if db2 != nil { state = moveStateCleanup } else { From c21c4136954733af5ddb6487551652b65751f2ae Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 27 Jul 2021 12:06:40 +0300 Subject: [PATCH 31/33] remove db2 to dbNext --- blockstore/badger/blockstore.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index f577d81f8..8e1a3a1ff 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -112,9 +112,9 @@ type Blockstore struct { moveState bsMoveState rlock int - db *badger.DB - db2 *badger.DB // when moving - opts Options + db *badger.DB + dbNext *badger.DB // when moving + opts Options prefixing bool prefix []byte @@ -263,8 +263,8 @@ func (b *Blockstore) movingGC() error { defer func() { b.lockMove() - db2 := b.db2 - b.db2 = nil + db2 := b.dbNext + b.dbNext = nil var state bsMoveState if db2 != nil { @@ -316,19 +316,19 @@ func (b *Blockstore) movingGC() error { } b.lockMove() - b.db2 = db2 + b.dbNext = db2 b.unlockMove(moveStateMoving) log.Info("copying blockstore") - err = b.doCopy(b.db, b.db2) + err = b.doCopy(b.db, b.dbNext) if err != nil { return fmt.Errorf("error moving badger blockstore to %s: %w", path, err) } b.lockMove() db1 := b.db - b.db = b.db2 - b.db2 = nil + b.db = b.dbNext + b.dbNext = nil b.unlockMove(moveStateCleanup) err = db1.Close() @@ -652,8 +652,8 @@ func (b *Blockstore) Put(block blocks.Block) error { return err } - if b.db2 != nil { - if err := put(b.db2); err != nil { + if b.dbNext != nil { + if err := put(b.dbNext); err != nil { return err } } @@ -716,8 +716,8 @@ func (b *Blockstore) PutMany(blocks []blocks.Block) error { return err } - if b.db2 != nil { - if err := put(b.db2); err != nil { + if b.dbNext != nil { + if err := put(b.dbNext); err != nil { return err } } From 88097071582528d4fed28b7ddbc6a77fb430d246 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 27 Jul 2021 12:08:39 +0300 Subject: [PATCH 32/33] update README --- blockstore/splitstore/README.md | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/blockstore/splitstore/README.md b/blockstore/splitstore/README.md index 079b50cc7..4efd6f61d 100644 --- a/blockstore/splitstore/README.md +++ b/blockstore/splitstore/README.md @@ -59,17 +59,15 @@ These are options in the `[Chainstore.Splitstore]` section of the configuration: nodes beyond 4 finalities, while running with the discard coldstore option. It is also useful for miners who accept deals and need to lookback messages beyond the 4 finalities, which would otherwise hit the coldstore. -- `HotStoreMovingGCFrequency` -- specifies how frequenty to garbage collect the hotstore - using moving GC. - The default value is 20, which uses moving GC every 20 compactions; set to 0 to disable moving - GC altogether. +- `HotStoreFullGCFrequency` -- specifies how frequenty to garbage collect the hotstore + using full (moving) GC. + The default value is 20, which uses full GC every 20 compactions (about once a week); + set to 0 to disable full GC altogether. Rationale: badger supports online GC, and this is used by default. However it has proven to be ineffective in practice with the hotstore size slowly creeping up. In order to address this, we have added moving GC support in our badger wrapper, which can effectively reclaim all space. - The downside is that it takes a bit of time to perform a moving GC (about 40min) and you also - need enough space to house the new hotstore while the old one is still live. - This option controls how frequently to perform moving GC, with the default of 20 corresponding - to about once a week. + The downside is that it takes a bit longer to perform a moving GC and you also need enough + space to house the new hotstore while the old one is still live. ## Operation From 7c195245a10100951c711581c5cda2cd9ed158e7 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 27 Jul 2021 12:13:26 +0300 Subject: [PATCH 33/33] make cfgdoc-gen --- node/config/doc_gen.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index 7ebecc5bc..5d4a91d5f 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -710,11 +710,11 @@ It can be "map" (default) for in memory marking or "badger" for on-disk marking. the compaction boundary; default is 0.`, }, { - Name: "HotStoreMovingGCFrequency", + Name: "HotStoreFullGCFrequency", Type: "uint64", - Comment: `HotStoreMovingGCFrequency specifies how often to perform moving GC on the hotstore. -A value of 0 disables, while a value 1 will do moving GC in every compaction. + Comment: `HotStoreFullGCFrequency specifies how often to perform a full (moving) GC on the hotstore. +A value of 0 disables, while a value 1 will do full GC in every compaction. Default is 20 (about once a week).`, }, },