diff --git a/chain/vm/vm.go b/chain/vm/vm.go index cb6571a7b..0e3381eac 100644 --- a/chain/vm/vm.go +++ b/chain/vm/vm.go @@ -682,17 +682,43 @@ func Copy(ctx context.Context, from, to blockstore.Blockstore, root cid.Cid) err var numBlocks int var totalCopySize int - var batch []block.Block + const batchSize = 128 + const bufCount = 3 + freeBufs := make(chan []block.Block, bufCount) + toFlush := make(chan []block.Block, bufCount) + for i := 0; i < bufCount; i++ { + freeBufs <- make([]block.Block, 0, batchSize) + } + + errFlushChan := make(chan error) + + go func() { + for b := range toFlush { + if err := to.PutMany(b); err != nil { + close(freeBufs) + errFlushChan <- xerrors.Errorf("batch put in copy: %w", err) + return + } + freeBufs <- b[:0] + } + close(errFlushChan) + close(freeBufs) + }() + + var batch = <-freeBufs batchCp := func(blk block.Block) error { numBlocks++ totalCopySize += len(blk.RawData()) batch = append(batch, blk) - if len(batch) > 100 { - if err := to.PutMany(batch); err != nil { - return xerrors.Errorf("batch put in copy: %w", err) + + if len(batch) >= batchSize { + toFlush <- batch + var ok bool + batch, ok = <-freeBufs + if !ok { + return <-errFlushChan } - batch = batch[:0] } return nil } @@ -702,9 +728,12 @@ func Copy(ctx context.Context, from, to blockstore.Blockstore, root cid.Cid) err } if len(batch) > 0 { - if err := to.PutMany(batch); err != nil { - return xerrors.Errorf("batch put in copy: %w", err) - } + toFlush <- batch + } + close(toFlush) // close the toFlush triggering the loop to end + err := <-errFlushChan // get error out or get nil if it was closed + if err != nil { + return err } span.AddAttributes(