feat: chain import: don't walk to genesis - 2-3x faster snapshot import (#11446)
* chain import: don't walk to genesis * fix daemon build * fast snapshot genesis: address review
This commit is contained in:
parent
1ca071ef77
commit
231b2070d4
@ -42,7 +42,7 @@ func TestIndexSeeks(t *testing.T) {
|
||||
cs := store.NewChainStore(nbs, nbs, syncds.MutexWrap(datastore.NewMapDatastore()), filcns.Weight, nil)
|
||||
defer cs.Close() //nolint:errcheck
|
||||
|
||||
_, err = cs.Import(ctx, bytes.NewReader(gencar))
|
||||
_, _, err = cs.Import(ctx, bytes.NewReader(gencar))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -60,7 +60,7 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo
|
||||
})
|
||||
}
|
||||
|
||||
func (cs *ChainStore) Import(ctx context.Context, r io.Reader) (*types.TipSet, error) {
|
||||
func (cs *ChainStore) Import(ctx context.Context, r io.Reader) (head *types.TipSet, genesis *types.BlockHeader, err error) {
|
||||
// TODO: writing only to the state blockstore is incorrect.
|
||||
// At this time, both the state and chain blockstores are backed by the
|
||||
// universal store. When we physically segregate the stores, we will need
|
||||
@ -69,7 +69,7 @@ func (cs *ChainStore) Import(ctx context.Context, r io.Reader) (*types.TipSet, e
|
||||
|
||||
br, err := carv2.NewBlockReader(r)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("loadcar failed: %w", err)
|
||||
return nil, nil, xerrors.Errorf("loadcar failed: %w", err)
|
||||
}
|
||||
|
||||
s := cs.StateBlockstore()
|
||||
@ -80,27 +80,51 @@ func (cs *ChainStore) Import(ctx context.Context, r io.Reader) (*types.TipSet, e
|
||||
putThrottle <- nil
|
||||
}
|
||||
|
||||
if len(br.Roots) == 0 {
|
||||
return nil, nil, xerrors.Errorf("no roots in snapshot car file")
|
||||
}
|
||||
nextTailCid := br.Roots[0]
|
||||
|
||||
var tailBlock types.BlockHeader
|
||||
tailBlock.Height = abi.ChainEpoch(-1)
|
||||
|
||||
var buf []blocks.Block
|
||||
for {
|
||||
blk, err := br.Next()
|
||||
if err != nil {
|
||||
|
||||
// we're at the end
|
||||
if err == io.EOF {
|
||||
if len(buf) > 0 {
|
||||
if err := s.PutMany(ctx, buf); err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// check for header block, looking for genesis
|
||||
if blk.Cid() == nextTailCid && tailBlock.Height != 0 {
|
||||
if err := tailBlock.UnmarshalCBOR(bytes.NewReader(blk.RawData())); err != nil {
|
||||
return nil, nil, xerrors.Errorf("failed to unmarshal genesis block: %w", err)
|
||||
}
|
||||
if len(tailBlock.Parents) > 0 {
|
||||
nextTailCid = tailBlock.Parents[0]
|
||||
} else {
|
||||
// note: even the 0th block has a parent linking to the cbor genesis block
|
||||
return nil, nil, xerrors.Errorf("current block (epoch %d cid %s) has no parents", tailBlock.Height, tailBlock.Cid())
|
||||
}
|
||||
}
|
||||
|
||||
// append to batch
|
||||
buf = append(buf, blk)
|
||||
|
||||
if len(buf) > 1000 {
|
||||
if lastErr := <-putThrottle; lastErr != nil { // consume one error to have the right to add one
|
||||
return nil, lastErr
|
||||
return nil, nil, lastErr
|
||||
}
|
||||
|
||||
go func(buf []blocks.Block) {
|
||||
@ -113,13 +137,17 @@ func (cs *ChainStore) Import(ctx context.Context, r io.Reader) (*types.TipSet, e
|
||||
// check errors
|
||||
for i := 0; i < parallelPuts; i++ {
|
||||
if lastErr := <-putThrottle; lastErr != nil {
|
||||
return nil, lastErr
|
||||
return nil, nil, lastErr
|
||||
}
|
||||
}
|
||||
|
||||
if tailBlock.Height != 0 {
|
||||
return nil, nil, xerrors.Errorf("expected genesis block to have height 0 (genesis), got %d: %s", tailBlock.Height, tailBlock.Cid())
|
||||
}
|
||||
|
||||
root, err := cs.LoadTipSet(ctx, types.NewTipSetKey(br.Roots...))
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to load root tipset from chainfile: %w", err)
|
||||
return nil, nil, xerrors.Errorf("failed to load root tipset from chainfile: %w", err)
|
||||
}
|
||||
|
||||
ts := root
|
||||
@ -135,10 +163,10 @@ func (cs *ChainStore) Import(ctx context.Context, r io.Reader) (*types.TipSet, e
|
||||
}
|
||||
|
||||
if err := cs.PersistTipsets(ctx, tssToPersist); err != nil {
|
||||
return nil, xerrors.Errorf("failed to persist tipsets: %w", err)
|
||||
return nil, nil, xerrors.Errorf("failed to persist tipsets: %w", err)
|
||||
}
|
||||
|
||||
return root, nil
|
||||
return root, &tailBlock, nil
|
||||
}
|
||||
|
||||
type walkSchedTaskType int
|
||||
|
@ -118,7 +118,7 @@ func TestChainExportImport(t *testing.T) {
|
||||
cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), filcns.Weight, nil)
|
||||
defer cs.Close() //nolint:errcheck
|
||||
|
||||
root, err := cs.Import(context.TODO(), buf)
|
||||
root, _, err := cs.Import(context.TODO(), buf)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -153,7 +153,7 @@ func TestChainImportTipsetKeyCid(t *testing.T) {
|
||||
cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), filcns.Weight, nil)
|
||||
defer cs.Close() //nolint:errcheck
|
||||
|
||||
root, err := cs.Import(ctx, buf)
|
||||
root, _, err := cs.Import(ctx, buf)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Truef(t, root.Equals(last), "imported chain differed from exported chain")
|
||||
@ -202,7 +202,7 @@ func TestChainExportImportFull(t *testing.T) {
|
||||
cs := store.NewChainStore(nbs, nbs, ds, filcns.Weight, nil)
|
||||
defer cs.Close() //nolint:errcheck
|
||||
|
||||
root, err := cs.Import(context.TODO(), buf)
|
||||
root, _, err := cs.Import(context.TODO(), buf)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -304,7 +304,7 @@ var importBenchCmd = &cli.Command{
|
||||
return fmt.Errorf("no CAR file provided for import")
|
||||
}
|
||||
|
||||
head, err = cs.Import(cctx.Context, carFile)
|
||||
head, _, err = cs.Import(cctx.Context, carFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -62,7 +62,7 @@ var genesisVerifyCmd = &cli.Command{
|
||||
return xerrors.Errorf("opening the car file: %w", err)
|
||||
}
|
||||
|
||||
ts, err := cs.Import(cctx.Context, f)
|
||||
ts, _, err := cs.Import(cctx.Context, f)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -565,7 +565,7 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool)
|
||||
}
|
||||
|
||||
bar.Start()
|
||||
ts, err := cst.Import(ctx, ir)
|
||||
ts, gen, err := cst.Import(ctx, ir)
|
||||
bar.Finish()
|
||||
|
||||
if err != nil {
|
||||
@ -576,18 +576,14 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool)
|
||||
return xerrors.Errorf("flushing validation cache failed: %w", err)
|
||||
}
|
||||
|
||||
gb, err := cst.GetTipsetByHeight(ctx, 0, ts, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = cst.SetGenesis(ctx, gb.Blocks()[0])
|
||||
log.Infof("setting genesis")
|
||||
err = cst.SetGenesis(ctx, gen)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !snapshot {
|
||||
shd, err := drand.BeaconScheduleFromDrandSchedule(build.DrandConfigSchedule(), gb.MinTimestamp(), nil)
|
||||
shd, err := drand.BeaconScheduleFromDrandSchedule(build.DrandConfigSchedule(), gen.Timestamp, nil)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to construct beacon schedule: %w", err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user