use the badger streaming interface in doCopy

This commit is contained in:
vyzo 2021-07-27 11:46:35 +03:00
parent 59aebba0d9
commit b82f953fd5

View File

@ -12,6 +12,7 @@ import (
"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/options"
"github.com/dgraph-io/badger/v2/pb"
"github.com/multiformats/go-base32"
"go.uber.org/zap"
@ -36,8 +37,6 @@ var (
log = logger.Logger("badgerbs")
)
const moveBatchSize = 16384
// aliases to mask badger dependencies.
const (
// FileIO is equivalent to badger/options.FileIO.
@ -350,85 +349,31 @@ func (b *Blockstore) movingGC() error {
// doCopy copies a badger blockstore to another, with an optional filter; if the filter
// is not nil, then only cids that satisfy the filter will be copied.
func (b *Blockstore) doCopy(from, to *badger.DB) error {
count := 0
batch := to.NewWriteBatch()
defer batch.Cancel()
txn := from.NewTransaction(false)
defer txn.Discard()
opts := badger.IteratorOptions{PrefetchSize: moveBatchSize}
iter := txn.NewIterator(opts)
defer iter.Close()
// allocate a slab to improve performance; buffers larger than 1K will be allocated from the pool
var pooled [][]byte
const maxSlabSize = 1024
slab := make([]byte, moveBatchSize*maxSlabSize)
slabStart := 0
getSlab := func(size int) []byte {
if size > maxSlabSize {
buf := pool.Get(size)
pooled = append(pooled, buf)
return buf
}
slabEnd := slabStart + size
buf := slab[slabStart:slabEnd]
slabStart = slabEnd
return buf
workers := runtime.NumCPU() / 2
if workers < 2 {
workers = 2
}
resetSlab := func() {
slabStart = 0
for _, buf := range pooled {
pool.Put(buf)
}
pooled = pooled[:0]
}
defer resetSlab()
for iter.Rewind(); iter.Valid(); iter.Next() {
if !b.isOpen() {
return ErrBlockstoreClosed
}
stream := from.NewStream()
stream.NumGo = workers
stream.LogPrefix = "doCopy"
stream.Send = func(list *pb.KVList) error {
batch := to.NewWriteBatch()
defer batch.Cancel()
item := iter.Item()
kk := item.Key()
k := getSlab(len(kk))
copy(k, kk)
var v []byte
err := item.Value(func(vv []byte) error {
v = getSlab(len(vv))
copy(v, vv)
return nil
})
if err != nil {
return err
}
if err := batch.Set(k, v); err != nil {
return err
}
count++
if count == moveBatchSize {
if err := batch.Flush(); err != nil {
for _, kv := range list.Kv {
if kv.Key == nil || kv.Value == nil {
continue
}
if err := batch.Set(kv.Key, kv.Value); err != nil {
return err
}
// Flush discards the transaction, so we need a new batch
batch = to.NewWriteBatch()
count = 0
resetSlab()
}
}
if count > 0 {
return batch.Flush()
}
return nil
return stream.Orchestrate(context.Background())
}
func (b *Blockstore) deleteDB(path string) {