diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go
index 5a451169e..e8b5247ce 100644
--- a/blockstore/badger/blockstore.go
+++ b/blockstore/badger/blockstore.go
@@ -12,6 +12,7 @@ import (
 
 	"github.com/dgraph-io/badger/v2"
 	"github.com/dgraph-io/badger/v2/options"
+	"github.com/dgraph-io/badger/v2/pb"
 	"github.com/multiformats/go-base32"
 	"go.uber.org/zap"
 
@@ -36,8 +37,6 @@ var (
 	log = logger.Logger("badgerbs")
 )
 
-const moveBatchSize = 16384
-
 // aliases to mask badger dependencies.
 const (
 	// FileIO is equivalent to badger/options.FileIO.
@@ -350,85 +349,40 @@ func (b *Blockstore) movingGC() error {
-// doCopy copies a badger blockstore to another, with an optional filter; if the filter
-// is not nil, then only cids that satisfy the filter will be copied.
+// doCopy copies the key/value pairs of the from database into the to database
+// using a badger Stream. It returns ErrBlockstoreClosed if the blockstore is
+// closed while the copy is still in progress.
 func (b *Blockstore) doCopy(from, to *badger.DB) error {
-	count := 0
-	batch := to.NewWriteBatch()
-	defer batch.Cancel()
-
-	txn := from.NewTransaction(false)
-	defer txn.Discard()
-
-	opts := badger.IteratorOptions{PrefetchSize: moveBatchSize}
-	iter := txn.NewIterator(opts)
-	defer iter.Close()
-
-	// allocate a slab to improve performance; buffers larger than 1K will be allocated from the pool
-	var pooled [][]byte
-	const maxSlabSize = 1024
-	slab := make([]byte, moveBatchSize*maxSlabSize)
-	slabStart := 0
-	getSlab := func(size int) []byte {
-		if size > maxSlabSize {
-			buf := pool.Get(size)
-			pooled = append(pooled, buf)
-			return buf
-		}
-
-		slabEnd := slabStart + size
-		buf := slab[slabStart:slabEnd]
-		slabStart = slabEnd
-		return buf
+	workers := runtime.NumCPU() / 2
+	if workers < 2 {
+		workers = 2
 	}
-	resetSlab := func() {
-		slabStart = 0
-		for _, buf := range pooled {
-			pool.Put(buf)
-		}
-		pooled = pooled[:0]
-	}
-	defer resetSlab()
 
-	for iter.Rewind(); iter.Valid(); iter.Next() {
-		if !b.isOpen() {
-			return ErrBlockstoreClosed
-		}
+	stream := from.NewStream()
+	stream.NumGo = workers
+	stream.LogPrefix = "doCopy"
+	stream.Send = func(list *pb.KVList) error {
+		// Abort if the blockstore was closed mid-copy, as the previous
+		// iterator-based implementation did on every item.
+		if !b.isOpen() {
+			return ErrBlockstoreClosed
+		}
+
+		batch := to.NewWriteBatch()
+		defer batch.Cancel()
 
-		item := iter.Item()
-
-		kk := item.Key()
-		k := getSlab(len(kk))
-		copy(k, kk)
-
-		var v []byte
-		err := item.Value(func(vv []byte) error {
-			v = getSlab(len(vv))
-			copy(v, vv)
-			return nil
-		})
-		if err != nil {
-			return err
-		}
-
-		if err := batch.Set(k, v); err != nil {
-			return err
-		}
-
-		count++
-		if count == moveBatchSize {
-			if err := batch.Flush(); err != nil {
+		for _, kv := range list.Kv {
+			// NOTE(review): this also skips entries whose value is nil; confirm
+			// that zero-length blocks never need to be carried over.
+			if kv.Key == nil || kv.Value == nil {
+				continue
+			}
+			if err := batch.Set(kv.Key, kv.Value); err != nil {
 				return err
 			}
-			// Flush discards the transaction, so we need a new batch
-			batch = to.NewWriteBatch()
-			count = 0
-			resetSlab()
 		}
-	}
 
-	if count > 0 {
 		return batch.Flush()
 	}
 
-	return nil
+	return stream.Orchestrate(context.Background())
 }
 
 func (b *Blockstore) deleteDB(path string) {