Merge pull request #3980 from filecoin-project/feat/batch-blockstore-copy
batch blockstore copies after block validation
This commit is contained in:
commit
bbcad52ce0
@ -17,6 +17,7 @@ import (
|
|||||||
|
|
||||||
"github.com/Gurpartap/async"
|
"github.com/Gurpartap/async"
|
||||||
"github.com/hashicorp/go-multierror"
|
"github.com/hashicorp/go-multierror"
|
||||||
|
blocks "github.com/ipfs/go-block-format"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
cbor "github.com/ipfs/go-ipld-cbor"
|
cbor "github.com/ipfs/go-ipld-cbor"
|
||||||
logging "github.com/ipfs/go-log/v2"
|
logging "github.com/ipfs/go-log/v2"
|
||||||
@ -374,21 +375,28 @@ func (syncer *Syncer) InformNewBlock(from peer.ID, blk *types.FullBlock) bool {
|
|||||||
return syncer.InformNewHead(from, fts)
|
return syncer.InformNewHead(from, fts)
|
||||||
}
|
}
|
||||||
|
|
||||||
func copyBlockstore(from, to bstore.Blockstore) error {
|
func copyBlockstore(ctx context.Context, from, to bstore.Blockstore) error {
|
||||||
cids, err := from.AllKeysChan(context.TODO())
|
ctx, span := trace.StartSpan(ctx, "copyBlockstore")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
cids, err := from.AllKeysChan(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: should probably expose better methods on the blockstore for this operation
|
||||||
|
var blks []blocks.Block
|
||||||
for c := range cids {
|
for c := range cids {
|
||||||
b, err := from.Get(c)
|
b, err := from.Get(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := to.Put(b); err != nil {
|
blks = append(blks, b)
|
||||||
return err
|
}
|
||||||
}
|
|
||||||
|
if err := to.PutMany(blks); err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -1515,11 +1523,11 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := persistMessages(bs, bstip); err != nil {
|
if err := persistMessages(ctx, bs, bstip); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := copyBlockstore(bs, syncer.store.Blockstore()); err != nil {
|
if err := copyBlockstore(ctx, bs, syncer.store.Blockstore()); err != nil {
|
||||||
return xerrors.Errorf("message processing failed: %w", err)
|
return xerrors.Errorf("message processing failed: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1596,7 +1604,10 @@ func (syncer *Syncer) fetchMessages(ctx context.Context, headers []*types.TipSet
|
|||||||
return batch, nil
|
return batch, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func persistMessages(bs bstore.Blockstore, bst *exchange.CompactedMessages) error {
|
func persistMessages(ctx context.Context, bs bstore.Blockstore, bst *exchange.CompactedMessages) error {
|
||||||
|
_, span := trace.StartSpan(ctx, "persistMessages")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
for _, m := range bst.Bls {
|
for _, m := range bst.Bls {
|
||||||
//log.Infof("putting BLS message: %s", m.Cid())
|
//log.Infof("putting BLS message: %s", m.Cid())
|
||||||
if _, err := store.PutMessage(bs, m); err != nil {
|
if _, err := store.PutMessage(bs, m); err != nil {
|
||||||
|
@ -546,7 +546,7 @@ func (vm *VM) Flush(ctx context.Context) (cid.Cid, error) {
|
|||||||
return cid.Undef, xerrors.Errorf("flushing vm: %w", err)
|
return cid.Undef, xerrors.Errorf("flushing vm: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := Copy(from, to, root); err != nil {
|
if err := Copy(ctx, from, to, root); err != nil {
|
||||||
return cid.Undef, xerrors.Errorf("copying tree: %w", err)
|
return cid.Undef, xerrors.Errorf("copying tree: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -600,9 +600,18 @@ func linksForObj(blk block.Block, cb func(cid.Cid)) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Copy(from, to blockstore.Blockstore, root cid.Cid) error {
|
func Copy(ctx context.Context, from, to blockstore.Blockstore, root cid.Cid) error {
|
||||||
|
ctx, span := trace.StartSpan(ctx, "vm.Copy") // nolint
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
var numBlocks int
|
||||||
|
var totalCopySize int
|
||||||
|
|
||||||
var batch []block.Block
|
var batch []block.Block
|
||||||
batchCp := func(blk block.Block) error {
|
batchCp := func(blk block.Block) error {
|
||||||
|
numBlocks++
|
||||||
|
totalCopySize += len(blk.RawData())
|
||||||
|
|
||||||
batch = append(batch, blk)
|
batch = append(batch, blk)
|
||||||
if len(batch) > 100 {
|
if len(batch) > 100 {
|
||||||
if err := to.PutMany(batch); err != nil {
|
if err := to.PutMany(batch); err != nil {
|
||||||
@ -623,6 +632,11 @@ func Copy(from, to blockstore.Blockstore, root cid.Cid) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
span.AddAttributes(
|
||||||
|
trace.Int64Attribute("numBlocks", int64(numBlocks)),
|
||||||
|
trace.Int64Attribute("copySize", int64(totalCopySize)),
|
||||||
|
)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user