Add background flushing

Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
This commit is contained in:
Jakub Sztandera 2020-11-11 20:42:00 +01:00
parent f9771c67fb
commit 9b64dba4a1
No known key found for this signature in database
GPG Key ID: 9A9AF56F8B3879BA

View File

@ -682,17 +682,43 @@ func Copy(ctx context.Context, from, to blockstore.Blockstore, root cid.Cid) err
var numBlocks int
var totalCopySize int
var batch []block.Block
const batchSize = 128
const bufCount = 3
freeBufs := make(chan []block.Block, bufCount)
toFlush := make(chan []block.Block, bufCount)
for i := 0; i < bufCount; i++ {
freeBufs <- make([]block.Block, 0, batchSize)
}
errFlushChan := make(chan error)
go func() {
for b := range toFlush {
if err := to.PutMany(b); err != nil {
close(freeBufs)
errFlushChan <- xerrors.Errorf("batch put in copy: %w", err)
return
}
freeBufs <- b[:0]
}
close(errFlushChan)
close(freeBufs)
}()
var batch = <-freeBufs
batchCp := func(blk block.Block) error {
numBlocks++
totalCopySize += len(blk.RawData())
batch = append(batch, blk)
if len(batch) > 100 {
if err := to.PutMany(batch); err != nil {
return xerrors.Errorf("batch put in copy: %w", err)
if len(batch) >= batchSize {
toFlush <- batch
var ok bool
batch, ok = <-freeBufs
if !ok {
return <-errFlushChan
}
batch = batch[:0]
}
return nil
}
@ -702,9 +728,12 @@ func Copy(ctx context.Context, from, to blockstore.Blockstore, root cid.Cid) err
}
if len(batch) > 0 {
if err := to.PutMany(batch); err != nil {
return xerrors.Errorf("batch put in copy: %w", err)
toFlush <- batch
}
close(toFlush) // close the toFlush triggering the loop to end
err := <-errFlushChan // get error out or get nil if it was closed
if err != nil {
return err
}
span.AddAttributes(