cache load tipset

This commit is contained in:
whyrusleeping 2019-12-16 11:22:56 -08:00
parent c25f616562
commit 6e94377469
13 changed files with 124 additions and 41 deletions

View File

@ -124,7 +124,7 @@ func (bss *BlockSyncService) processRequest(ctx context.Context, req *BlockSyncR
trace.BoolAttribute("messages", opts.IncludeMessages), trace.BoolAttribute("messages", opts.IncludeMessages),
) )
chain, err := bss.collectChainSegment(req.Start, req.RequestLength, opts) chain, err := bss.collectChainSegment(types.NewTipSetKey(req.Start...), req.RequestLength, opts)
if err != nil { if err != nil {
log.Warn("encountered error while responding to block sync request: ", err) log.Warn("encountered error while responding to block sync request: ", err)
return &BlockSyncResponse{ return &BlockSyncResponse{
@ -139,7 +139,7 @@ func (bss *BlockSyncService) processRequest(ctx context.Context, req *BlockSyncR
}, nil }, nil
} }
func (bss *BlockSyncService) collectChainSegment(start []cid.Cid, length uint64, opts *BSOptions) ([]*BSTipSet, error) { func (bss *BlockSyncService) collectChainSegment(start types.TipSetKey, length uint64, opts *BSOptions) ([]*BSTipSet, error) {
var bstips []*BSTipSet var bstips []*BSTipSet
cur := start cur := start
for { for {

View File

@ -59,18 +59,18 @@ func (bs *BlockSync) processStatus(req *BlockSyncRequest, res *BlockSyncResponse
} }
} }
func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int) ([]*types.TipSet, error) { func (bs *BlockSync) GetBlocks(ctx context.Context, tsk types.TipSetKey, count int) ([]*types.TipSet, error) {
ctx, span := trace.StartSpan(ctx, "bsync.GetBlocks") ctx, span := trace.StartSpan(ctx, "bsync.GetBlocks")
defer span.End() defer span.End()
if span.IsRecordingEvents() { if span.IsRecordingEvents() {
span.AddAttributes( span.AddAttributes(
trace.StringAttribute("tipset", fmt.Sprint(tipset)), trace.StringAttribute("tipset", fmt.Sprint(tsk.Cids())),
trace.Int64Attribute("count", int64(count)), trace.Int64Attribute("count", int64(count)),
) )
} }
req := &BlockSyncRequest{ req := &BlockSyncRequest{
Start: tipset, Start: tsk.Cids(),
RequestLength: uint64(count), RequestLength: uint64(count),
Options: BSOptBlocks, Options: BSOptBlocks,
} }
@ -110,11 +110,11 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int)
return nil, xerrors.Errorf("GetBlocks failed with all peers: %w", oerr) return nil, xerrors.Errorf("GetBlocks failed with all peers: %w", oerr)
} }
func (bs *BlockSync) GetFullTipSet(ctx context.Context, p peer.ID, h []cid.Cid) (*store.FullTipSet, error) { func (bs *BlockSync) GetFullTipSet(ctx context.Context, p peer.ID, tsk types.TipSetKey) (*store.FullTipSet, error) {
// TODO: round robin through these peers on error // TODO: round robin through these peers on error
req := &BlockSyncRequest{ req := &BlockSyncRequest{
Start: h, Start: tsk.Cids(),
RequestLength: 1, RequestLength: 1,
Options: BSOptBlocks | BSOptMessages, Options: BSOptBlocks | BSOptMessages,
} }
@ -275,7 +275,7 @@ func (bs *BlockSync) processBlocksResponse(req *BlockSyncRequest, res *BlockSync
return nil, err return nil, err
} }
if !types.CidArrsEqual(cur.Parents(), nts.Cids()) { if !types.CidArrsEqual(cur.Parents().Cids(), nts.Cids()) {
return nil, fmt.Errorf("parents of tipset[%d] were not tipset[%d]", bi-1, bi) return nil, fmt.Errorf("parents of tipset[%d] were not tipset[%d]", bi-1, bi)
} }

View File

@ -109,7 +109,7 @@ type Provider interface {
StateGetActor(address.Address, *types.TipSet) (*types.Actor, error) StateGetActor(address.Address, *types.TipSet) (*types.Actor, error)
MessagesForBlock(*types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) MessagesForBlock(*types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error)
MessagesForTipset(*types.TipSet) ([]store.ChainMsg, error) MessagesForTipset(*types.TipSet) ([]store.ChainMsg, error)
LoadTipSet(cids []cid.Cid) (*types.TipSet, error) LoadTipSet(tsk types.TipSetKey) (*types.TipSet, error)
} }
type mpoolProvider struct { type mpoolProvider struct {
@ -146,8 +146,8 @@ func (mpp *mpoolProvider) MessagesForTipset(ts *types.TipSet) ([]store.ChainMsg,
return mpp.sm.ChainStore().MessagesForTipset(ts) return mpp.sm.ChainStore().MessagesForTipset(ts)
} }
func (mpp *mpoolProvider) LoadTipSet(cids []cid.Cid) (*types.TipSet, error) { func (mpp *mpoolProvider) LoadTipSet(tsk types.TipSetKey) (*types.TipSet, error) {
return mpp.sm.ChainStore().LoadTipSet(cids) return mpp.sm.ChainStore().LoadTipSet(tsk)
} }
func New(api Provider, ds dtypes.MetadataDS) (*MessagePool, error) { func New(api Provider, ds dtypes.MetadataDS) (*MessagePool, error) {

View File

@ -98,9 +98,9 @@ func (tma *testMpoolApi) MessagesForTipset(ts *types.TipSet) ([]store.ChainMsg,
return out, nil return out, nil
} }
func (tma *testMpoolApi) LoadTipSet(cids []cid.Cid) (*types.TipSet, error) { func (tma *testMpoolApi) LoadTipSet(tsk types.TipSetKey) (*types.TipSet, error) {
for _, ts := range tma.tipsets { for _, ts := range tma.tipsets {
if types.CidArrsEqual(cids, ts.Cids()) { if types.CidArrsEqual(tsk.Cids(), ts.Cids()) {
return ts, nil return ts, nil
} }
} }

View File

@ -50,16 +50,19 @@ type ChainStore struct {
headChangeNotifs []func(rev, app []*types.TipSet) error headChangeNotifs []func(rev, app []*types.TipSet) error
mmCache *lru.ARCCache mmCache *lru.ARCCache
tsCache *lru.ARCCache
} }
func NewChainStore(bs bstore.Blockstore, ds dstore.Batching) *ChainStore { func NewChainStore(bs bstore.Blockstore, ds dstore.Batching) *ChainStore {
c, _ := lru.NewARC(2048) c, _ := lru.NewARC(2048)
tsc, _ := lru.NewARC(4096)
cs := &ChainStore{ cs := &ChainStore{
bs: bs, bs: bs,
ds: ds, ds: ds,
bestTips: pubsub.New(64), bestTips: pubsub.New(64),
tipsets: make(map[uint64][]cid.Cid), tipsets: make(map[uint64][]cid.Cid),
mmCache: c, mmCache: c,
tsCache: tsc,
} }
cs.reorgCh = cs.reorgWorker(context.TODO()) cs.reorgCh = cs.reorgWorker(context.TODO())
@ -107,7 +110,7 @@ func (cs *ChainStore) Load() error {
return xerrors.Errorf("failed to unmarshal stored chain head: %w", err) return xerrors.Errorf("failed to unmarshal stored chain head: %w", err)
} }
ts, err := cs.LoadTipSet(tscids) ts, err := cs.LoadTipSet(types.NewTipSetKey(tscids...))
if err != nil { if err != nil {
return xerrors.Errorf("loading tipset: %w", err) return xerrors.Errorf("loading tipset: %w", err)
} }
@ -336,9 +339,14 @@ func (cs *ChainStore) GetBlock(c cid.Cid) (*types.BlockHeader, error) {
return types.DecodeBlock(sb.RawData()) return types.DecodeBlock(sb.RawData())
} }
func (cs *ChainStore) LoadTipSet(cids []cid.Cid) (*types.TipSet, error) { func (cs *ChainStore) LoadTipSet(tsk types.TipSetKey) (*types.TipSet, error) {
v, ok := cs.tsCache.Get(tsk)
if ok {
return v.(*types.TipSet), nil
}
var blks []*types.BlockHeader var blks []*types.BlockHeader
for _, c := range cids { for _, c := range tsk.Cids() {
b, err := cs.GetBlock(c) b, err := cs.GetBlock(c)
if err != nil { if err != nil {
return nil, xerrors.Errorf("get block %s: %w", c, err) return nil, xerrors.Errorf("get block %s: %w", c, err)
@ -347,7 +355,14 @@ func (cs *ChainStore) LoadTipSet(cids []cid.Cid) (*types.TipSet, error) {
blks = append(blks, b) blks = append(blks, b)
} }
return types.NewTipSet(blks) ts, err := types.NewTipSet(blks)
if err != nil {
return nil, err
}
cs.tsCache.Add(tsk, ts)
return ts, nil
} }
// returns true if 'a' is an ancestor of 'b' // returns true if 'a' is an ancestor of 'b'
@ -817,7 +832,7 @@ func (cs *ChainStore) GetRandomness(ctx context.Context, blks []cid.Cid, round i
span.AddAttributes(trace.Int64Attribute("round", round)) span.AddAttributes(trace.Int64Attribute("round", round))
for { for {
nts, err := cs.LoadTipSet(blks) nts, err := cs.LoadTipSet(types.NewTipSetKey(blks...))
if err != nil { if err != nil {
return nil, err return nil, err
} }

68
chain/store/store_test.go Normal file
View File

@ -0,0 +1,68 @@
package store_test
import (
"context"
"testing"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/gen"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/repo"
blockstore "github.com/ipfs/go-ipfs-blockstore"
)
func init() {
build.SectorSizes = []uint64{1024}
build.MinimumMinerPower = 1024
}
func BenchmarkGetRandomness(b *testing.B) {
cg, err := gen.NewGenerator()
if err != nil {
b.Fatal(err)
}
var last *types.TipSet
for i := 0; i < 2000; i++ {
ts, err := cg.NextTipSet()
if err != nil {
b.Fatal(err)
}
last = ts.TipSet.TipSet()
}
r, err := cg.YieldRepo()
if err != nil {
b.Fatal(err)
}
lr, err := r.Lock(repo.FullNode)
if err != nil {
b.Fatal(err)
}
bds, err := lr.Datastore("/blocks")
if err != nil {
b.Fatal(err)
}
mds, err := lr.Datastore("/metadata")
if err != nil {
b.Fatal(err)
}
bs := blockstore.NewBlockstore(bds)
cs := store.NewChainStore(bs, mds)
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := cs.GetRandomness(context.TODO(), last.Cids(), 500)
if err != nil {
b.Fatal(err)
}
}
}

View File

@ -329,16 +329,16 @@ func computeMsgMeta(bs amt.Blocks, bmsgCids, smsgCids []cbg.CBORMarshaler) (cid.
return mrcid, nil return mrcid, nil
} }
func (syncer *Syncer) FetchTipSet(ctx context.Context, p peer.ID, cids []cid.Cid) (*store.FullTipSet, error) { func (syncer *Syncer) FetchTipSet(ctx context.Context, p peer.ID, tsk types.TipSetKey) (*store.FullTipSet, error) {
if fts, err := syncer.tryLoadFullTipSet(cids); err == nil { if fts, err := syncer.tryLoadFullTipSet(tsk); err == nil {
return fts, nil return fts, nil
} }
return syncer.Bsync.GetFullTipSet(ctx, p, cids) return syncer.Bsync.GetFullTipSet(ctx, p, tsk)
} }
func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*store.FullTipSet, error) { func (syncer *Syncer) tryLoadFullTipSet(tsk types.TipSetKey) (*store.FullTipSet, error) {
ts, err := syncer.store.LoadTipSet(cids) ts, err := syncer.store.LoadTipSet(tsk)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -469,7 +469,7 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err
h := b.Header h := b.Header
baseTs, err := syncer.store.LoadTipSet(h.Parents) baseTs, err := syncer.store.LoadTipSet(types.NewTipSetKey(h.Parents...))
if err != nil { if err != nil {
return xerrors.Errorf("load parent tipset failed (%s): %w", h.Parents, err) return xerrors.Errorf("load parent tipset failed (%s): %w", h.Parents, err)
} }
@ -828,7 +828,7 @@ func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to
trace.Int64Attribute("toHeight", int64(to.Height())), trace.Int64Attribute("toHeight", int64(to.Height())),
) )
for _, pcid := range from.Parents() { for _, pcid := range from.Parents().Cids() {
if syncer.bad.Has(pcid) { if syncer.bad.Has(pcid) {
for _, b := range from.Cids() { for _, b := range from.Cids() {
syncer.bad.Add(b) syncer.bad.Add(b)
@ -850,7 +850,7 @@ func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to
loop: loop:
for blockSet[len(blockSet)-1].Height() > untilHeight { for blockSet[len(blockSet)-1].Height() > untilHeight {
for _, bc := range at { for _, bc := range at.Cids() {
if syncer.bad.Has(bc) { if syncer.bad.Has(bc) {
for _, b := range acceptedBlocks { for _, b := range acceptedBlocks {
syncer.bad.Add(b) syncer.bad.Add(b)
@ -863,7 +863,7 @@ loop:
// If, for some reason, we have a suffix of the chain locally, handle that here // If, for some reason, we have a suffix of the chain locally, handle that here
ts, err := syncer.store.LoadTipSet(at) ts, err := syncer.store.LoadTipSet(at)
if err == nil { if err == nil {
acceptedBlocks = append(acceptedBlocks, at...) acceptedBlocks = append(acceptedBlocks, at.Cids()...)
blockSet = append(blockSet, ts) blockSet = append(blockSet, ts)
at = ts.Parents() at = ts.Parents()
@ -910,16 +910,16 @@ loop:
blockSet = append(blockSet, b) blockSet = append(blockSet, b)
} }
acceptedBlocks = append(acceptedBlocks, at...) acceptedBlocks = append(acceptedBlocks, at.Cids()...)
ss.SetHeight(blks[len(blks)-1].Height()) ss.SetHeight(blks[len(blks)-1].Height())
at = blks[len(blks)-1].Parents() at = blks[len(blks)-1].Parents()
} }
// We have now ascertained that this is *not* a 'fast forward' // We have now ascertained that this is *not* a 'fast forward'
if !types.CidArrsEqual(blockSet[len(blockSet)-1].Parents(), to.Cids()) { if !types.CidArrsEqual(blockSet[len(blockSet)-1].Parents().Cids(), to.Cids()) {
last := blockSet[len(blockSet)-1] last := blockSet[len(blockSet)-1]
if types.CidArrsEqual(last.Parents(), to.Parents()) { if last.Parents() == to.Parents() {
// common case: receiving a block thats potentially part of the same tipset as our best block // common case: receiving a block thats potentially part of the same tipset as our best block
return blockSet, nil return blockSet, nil
} }

View File

@ -197,10 +197,10 @@ func (stb *syncTargetBucket) sameChainAs(ts *types.TipSet) bool {
if ts.Equals(t) { if ts.Equals(t) {
return true return true
} }
if types.CidArrsEqual(ts.Cids(), t.Parents()) { if ts.Key() == t.Parents() {
return true return true
} }
if types.CidArrsEqual(ts.Parents(), t.Cids()) { if ts.Parents() == t.Key() {
return true return true
} }
} }
@ -293,7 +293,7 @@ func (sm *SyncManager) scheduleIncoming(ts *types.TipSet) {
break break
} }
if types.CidArrsEqual(ts.Parents(), acts.Cids()) { if ts.Parents() == acts.Key() {
// sync this next, after that sync process finishes // sync this next, after that sync process finishes
relatedToActiveSync = true relatedToActiveSync = true
} }

View File

@ -136,8 +136,8 @@ func (ts *TipSet) Height() uint64 {
return ts.height return ts.height
} }
func (ts *TipSet) Parents() []cid.Cid { func (ts *TipSet) Parents() TipSetKey {
return ts.blks[0].Parents return NewTipSetKey(ts.blks[0].Parents...)
} }
func (ts *TipSet) Blocks() []*BlockHeader { func (ts *TipSet) Blocks() []*BlockHeader {

View File

@ -319,7 +319,7 @@ var chainListCmd = &cli.Command{
break break
} }
head, err = api.ChainGetTipSet(ctx, types.NewTipSetKey(head.Parents()...)) head, err = api.ChainGetTipSet(ctx, head.Parents())
if err != nil { if err != nil {
return err return err
} }

View File

@ -92,7 +92,7 @@ func (hs *Service) HandleStream(s inet.Stream) {
} }
}() }()
ts, err := hs.syncer.FetchTipSet(context.Background(), s.Conn().RemotePeer(), hmsg.HeaviestTipSet) ts, err := hs.syncer.FetchTipSet(context.Background(), s.Conn().RemotePeer(), types.NewTipSetKey(hmsg.HeaviestTipSet...))
if err != nil { if err != nil {
log.Errorf("failed to fetch tipset from peer during hello: %s", err) log.Errorf("failed to fetch tipset from peer during hello: %s", err)
return return

View File

@ -37,7 +37,7 @@ func (a *ChainAPI) ChainGetBlock(ctx context.Context, msg cid.Cid) (*types.Block
} }
func (a *ChainAPI) ChainGetTipSet(ctx context.Context, key types.TipSetKey) (*types.TipSet, error) { func (a *ChainAPI) ChainGetTipSet(ctx context.Context, key types.TipSetKey) (*types.TipSet, error) {
return a.Chain.LoadTipSet(key.Cids()) return a.Chain.LoadTipSet(key)
} }
func (a *ChainAPI) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) (*api.BlockMessages, error) { func (a *ChainAPI) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) (*api.BlockMessages, error) {
@ -80,7 +80,7 @@ func (a *ChainAPI) ChainGetParentMessages(ctx context.Context, bcid cid.Cid) ([]
} }
// TODO: need to get the number of messages better than this // TODO: need to get the number of messages better than this
pts, err := a.Chain.LoadTipSet(b.Parents) pts, err := a.Chain.LoadTipSet(types.NewTipSetKey(b.Parents...))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -112,7 +112,7 @@ func (a *ChainAPI) ChainGetParentReceipts(ctx context.Context, bcid cid.Cid) ([]
} }
// TODO: need to get the number of messages better than this // TODO: need to get the number of messages better than this
pts, err := a.Chain.LoadTipSet(b.Parents) pts, err := a.Chain.LoadTipSet(types.NewTipSetKey(b.Parents...))
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -134,7 +134,7 @@ func loadTipsets(ctx context.Context, api api.FullNode, curr *types.TipSet, lowe
log.Printf("Walking back { height:%d }", curr.Height()) log.Printf("Walking back { height:%d }", curr.Height())
tipsets = append(tipsets, curr) tipsets = append(tipsets, curr)
tsk := types.NewTipSetKey(curr.Parents()...) tsk := curr.Parents()
prev, err := api.ChainGetTipSet(ctx, tsk) prev, err := api.ChainGetTipSet(ctx, tsk)
if err != nil { if err != nil {
return tipsets, err return tipsets, err