Merge pull request #10800 from filecoin-project/asr/speedup-persist
feat: chainstore: batch writes of tipsets
This commit is contained in:
commit
76776e5cc1
@ -123,11 +123,9 @@ func (cs *ChainStore) Import(ctx context.Context, r io.Reader) (*types.TipSet, e
|
|||||||
}
|
}
|
||||||
|
|
||||||
ts := root
|
ts := root
|
||||||
|
tssToPersist := make([]*types.TipSet, 0, TipsetkeyBackfillRange)
|
||||||
for i := 0; i < int(TipsetkeyBackfillRange); i++ {
|
for i := 0; i < int(TipsetkeyBackfillRange); i++ {
|
||||||
err = cs.PersistTipset(ctx, ts)
|
tssToPersist = append(tssToPersist, ts)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
parentTsKey := ts.Parents()
|
parentTsKey := ts.Parents()
|
||||||
ts, err = cs.LoadTipSet(ctx, parentTsKey)
|
ts, err = cs.LoadTipSet(ctx, parentTsKey)
|
||||||
if ts == nil || err != nil {
|
if ts == nil || err != nil {
|
||||||
@ -136,6 +134,10 @@ func (cs *ChainStore) Import(ctx context.Context, r io.Reader) (*types.TipSet, e
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := cs.PersistTipsets(ctx, tssToPersist); err != nil {
|
||||||
|
return nil, xerrors.Errorf("failed to persist tipsets: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
return root, nil
|
return root, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -378,7 +378,7 @@ func (cs *ChainStore) SetGenesis(ctx context.Context, b *types.BlockHeader) erro
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ChainStore) PutTipSet(ctx context.Context, ts *types.TipSet) error {
|
func (cs *ChainStore) PutTipSet(ctx context.Context, ts *types.TipSet) error {
|
||||||
if err := cs.PersistTipset(ctx, ts); err != nil {
|
if err := cs.PersistTipsets(ctx, []*types.TipSet{ts}); err != nil {
|
||||||
return xerrors.Errorf("failed to persist tipset: %w", err)
|
return xerrors.Errorf("failed to persist tipset: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -970,18 +970,25 @@ func (cs *ChainStore) AddToTipSetTracker(ctx context.Context, b *types.BlockHead
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ChainStore) PersistTipset(ctx context.Context, ts *types.TipSet) error {
|
func (cs *ChainStore) PersistTipsets(ctx context.Context, tipsets []*types.TipSet) error {
|
||||||
if err := cs.persistBlockHeaders(ctx, ts.Blocks()...); err != nil {
|
toPersist := make([]*types.BlockHeader, 0, len(tipsets)*int(build.BlocksPerEpoch))
|
||||||
return xerrors.Errorf("failed to persist block headers: %w", err)
|
tsBlks := make([]block.Block, 0, len(tipsets))
|
||||||
}
|
for _, ts := range tipsets {
|
||||||
|
toPersist = append(toPersist, ts.Blocks()...)
|
||||||
tsBlk, err := ts.Key().ToStorageBlock()
|
tsBlk, err := ts.Key().ToStorageBlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("failed to get tipset key block: %w", err)
|
return xerrors.Errorf("failed to get tipset key block: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = cs.chainLocalBlockstore.Put(ctx, tsBlk); err != nil {
|
tsBlks = append(tsBlks, tsBlk)
|
||||||
return xerrors.Errorf("failed to put tipset key block: %w", err)
|
}
|
||||||
|
|
||||||
|
if err := cs.persistBlockHeaders(ctx, toPersist...); err != nil {
|
||||||
|
return xerrors.Errorf("failed to persist block headers: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := cs.chainLocalBlockstore.PutMany(ctx, tsBlks); err != nil {
|
||||||
|
return xerrors.Errorf("failed to put tipset key blocks: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -228,7 +228,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) bool {
|
|||||||
|
|
||||||
// TODO: IMPORTANT(GARBAGE) this needs to be put in the 'temporary' side of
|
// TODO: IMPORTANT(GARBAGE) this needs to be put in the 'temporary' side of
|
||||||
// the blockstore
|
// the blockstore
|
||||||
if err := syncer.store.PersistTipset(ctx, fts.TipSet()); err != nil {
|
if err := syncer.store.PersistTipsets(ctx, []*types.TipSet{fts.TipSet()}); err != nil {
|
||||||
log.Warn("failed to persist incoming block header: ", err)
|
log.Warn("failed to persist incoming block header: ", err)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@ -1199,14 +1199,11 @@ func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet, hts *t
|
|||||||
ss.SetStage(api.StagePersistHeaders)
|
ss.SetStage(api.StagePersistHeaders)
|
||||||
|
|
||||||
// Write tipsets from oldest to newest.
|
// Write tipsets from oldest to newest.
|
||||||
for i := len(headers) - 1; i >= 0; i-- {
|
if err := syncer.store.PersistTipsets(ctx, headers); err != nil {
|
||||||
ts := headers[i]
|
|
||||||
if err := syncer.store.PersistTipset(ctx, ts); err != nil {
|
|
||||||
err = xerrors.Errorf("failed to persist synced tipset to the chainstore: %w", err)
|
err = xerrors.Errorf("failed to persist synced tipset to the chainstore: %w", err)
|
||||||
ss.Error(err)
|
ss.Error(err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
ss.SetStage(api.StageMessages)
|
ss.SetStage(api.StageMessages)
|
||||||
|
|
||||||
|
@ -80,7 +80,7 @@ func (sim *Simulation) makeTipSet(ctx context.Context, messages []*types.Message
|
|||||||
return nil, xerrors.Errorf("failed to create new tipset: %w", err)
|
return nil, xerrors.Errorf("failed to create new tipset: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = sim.Node.Chainstore.PersistTipset(ctx, newTipSet)
|
err = sim.Node.Chainstore.PersistTipsets(ctx, []*types.TipSet{newTipSet})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("failed to persist block headers: %w", err)
|
return nil, xerrors.Errorf("failed to persist block headers: %w", err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user