feat: chain: Faster snapshot imports, zstd imports
This commit is contained in:
parent
8ba4355cab
commit
ac8ab3ef9e
@ -5,9 +5,11 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
|
blocks "github.com/ipfs/go-block-format"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
"github.com/ipld/go-car"
|
"github.com/ipld/go-car"
|
||||||
carutil "github.com/ipld/go-car/util"
|
carutil "github.com/ipld/go-car/util"
|
||||||
|
carv2 "github.com/ipld/go-car/v2"
|
||||||
mh "github.com/multiformats/go-multihash"
|
mh "github.com/multiformats/go-multihash"
|
||||||
cbg "github.com/whyrusleeping/cbor-gen"
|
cbg "github.com/whyrusleeping/cbor-gen"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
@ -55,12 +57,58 @@ func (cs *ChainStore) Import(ctx context.Context, r io.Reader) (*types.TipSet, e
|
|||||||
// universal store. When we physically segregate the stores, we will need
|
// universal store. When we physically segregate the stores, we will need
|
||||||
// to route state objects to the state blockstore, and chain objects to
|
// to route state objects to the state blockstore, and chain objects to
|
||||||
// the chain blockstore.
|
// the chain blockstore.
|
||||||
header, err := car.LoadCar(ctx, cs.StateBlockstore(), r)
|
|
||||||
|
br, err := carv2.NewBlockReader(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("loadcar failed: %w", err)
|
return nil, xerrors.Errorf("loadcar failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
root, err := cs.LoadTipSet(ctx, types.NewTipSetKey(header.Roots...))
|
s := cs.StateBlockstore()
|
||||||
|
|
||||||
|
parallelPuts := 5
|
||||||
|
putThrottle := make(chan error, parallelPuts)
|
||||||
|
for i := 0; i < parallelPuts; i++ {
|
||||||
|
putThrottle <- nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var buf []blocks.Block
|
||||||
|
for {
|
||||||
|
blk, err := br.Next()
|
||||||
|
if err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
if len(buf) > 0 {
|
||||||
|
if err := s.PutMany(ctx, buf); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
break
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
buf = append(buf, blk)
|
||||||
|
|
||||||
|
if len(buf) > 1000 {
|
||||||
|
if lastErr := <-putThrottle; lastErr != nil { // consume one error to have the right to add one
|
||||||
|
return nil, lastErr
|
||||||
|
}
|
||||||
|
|
||||||
|
go func(buf []blocks.Block) {
|
||||||
|
putThrottle <- s.PutMany(ctx, buf)
|
||||||
|
}(buf)
|
||||||
|
buf = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// check errors
|
||||||
|
for i := 0; i < parallelPuts; i++ {
|
||||||
|
if lastErr := <-putThrottle; lastErr != nil {
|
||||||
|
return nil, lastErr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
root, err := cs.LoadTipSet(ctx, types.NewTipSetKey(br.Roots...))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("failed to load root tipset from chainfile: %w", err)
|
return nil, xerrors.Errorf("failed to load root tipset from chainfile: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,7 @@ import (
|
|||||||
"runtime/pprof"
|
"runtime/pprof"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/DataDog/zstd"
|
||||||
metricsprom "github.com/ipfs/go-metrics-prometheus"
|
metricsprom "github.com/ipfs/go-metrics-prometheus"
|
||||||
"github.com/mitchellh/go-homedir"
|
"github.com/mitchellh/go-homedir"
|
||||||
"github.com/multiformats/go-multiaddr"
|
"github.com/multiformats/go-multiaddr"
|
||||||
@ -491,6 +492,11 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool)
|
|||||||
|
|
||||||
bufr := bufio.NewReaderSize(rd, 1<<20)
|
bufr := bufio.NewReaderSize(rd, 1<<20)
|
||||||
|
|
||||||
|
header, err := bufr.Peek(4)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("peek header: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
bar := pb.New64(l)
|
bar := pb.New64(l)
|
||||||
br := bar.NewProxyReader(bufr)
|
br := bar.NewProxyReader(bufr)
|
||||||
bar.ShowTimeLeft = true
|
bar.ShowTimeLeft = true
|
||||||
@ -498,8 +504,20 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool)
|
|||||||
bar.ShowSpeed = true
|
bar.ShowSpeed = true
|
||||||
bar.Units = pb.U_BYTES
|
bar.Units = pb.U_BYTES
|
||||||
|
|
||||||
|
var ir io.Reader = br
|
||||||
|
|
||||||
|
if string(header[1:]) == "\xB5\x2F\xFD" { // zstd
|
||||||
|
zr := zstd.NewReader(br)
|
||||||
|
defer func() {
|
||||||
|
if err := zr.Close(); err != nil {
|
||||||
|
log.Errorw("closing zstd reader", "error", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
ir = zr
|
||||||
|
}
|
||||||
|
|
||||||
bar.Start()
|
bar.Start()
|
||||||
ts, err := cst.Import(ctx, br)
|
ts, err := cst.Import(ctx, ir)
|
||||||
bar.Finish()
|
bar.Finish()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user