Merge pull request #3494 from filecoin-project/feat/faster-blocksync-serving
load fewer messages from disk when serving blocksync requests
This commit is contained in:
commit
d552803f56
@ -221,37 +221,36 @@ func collectChainSegment(
|
|||||||
func gatherMessages(cs *store.ChainStore, ts *types.TipSet) ([]*types.Message, [][]uint64, []*types.SignedMessage, [][]uint64, error) {
|
func gatherMessages(cs *store.ChainStore, ts *types.TipSet) ([]*types.Message, [][]uint64, []*types.SignedMessage, [][]uint64, error) {
|
||||||
blsmsgmap := make(map[cid.Cid]uint64)
|
blsmsgmap := make(map[cid.Cid]uint64)
|
||||||
secpkmsgmap := make(map[cid.Cid]uint64)
|
secpkmsgmap := make(map[cid.Cid]uint64)
|
||||||
var secpkmsgs []*types.SignedMessage
|
|
||||||
var blsmsgs []*types.Message
|
|
||||||
var secpkincl, blsincl [][]uint64
|
var secpkincl, blsincl [][]uint64
|
||||||
|
|
||||||
|
var blscids, secpkcids []cid.Cid
|
||||||
for _, block := range ts.Blocks() {
|
for _, block := range ts.Blocks() {
|
||||||
bmsgs, smsgs, err := cs.MessagesForBlock(block)
|
bc, sc, err := cs.ReadMsgMetaCids(block.Messages)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, nil, err
|
return nil, nil, nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME: DRY. Use `chain.Message` interface.
|
// FIXME: DRY. Use `chain.Message` interface.
|
||||||
bmi := make([]uint64, 0, len(bmsgs))
|
bmi := make([]uint64, 0, len(bc))
|
||||||
for _, m := range bmsgs {
|
for _, m := range bc {
|
||||||
i, ok := blsmsgmap[m.Cid()]
|
i, ok := blsmsgmap[m]
|
||||||
if !ok {
|
if !ok {
|
||||||
i = uint64(len(blsmsgs))
|
i = uint64(len(blscids))
|
||||||
blsmsgs = append(blsmsgs, m)
|
blscids = append(blscids, m)
|
||||||
blsmsgmap[m.Cid()] = i
|
blsmsgmap[m] = i
|
||||||
}
|
}
|
||||||
|
|
||||||
bmi = append(bmi, i)
|
bmi = append(bmi, i)
|
||||||
}
|
}
|
||||||
blsincl = append(blsincl, bmi)
|
blsincl = append(blsincl, bmi)
|
||||||
|
|
||||||
smi := make([]uint64, 0, len(smsgs))
|
smi := make([]uint64, 0, len(sc))
|
||||||
for _, m := range smsgs {
|
for _, m := range sc {
|
||||||
i, ok := secpkmsgmap[m.Cid()]
|
i, ok := secpkmsgmap[m]
|
||||||
if !ok {
|
if !ok {
|
||||||
i = uint64(len(secpkmsgs))
|
i = uint64(len(secpkcids))
|
||||||
secpkmsgs = append(secpkmsgs, m)
|
secpkcids = append(secpkcids, m)
|
||||||
secpkmsgmap[m.Cid()] = i
|
secpkmsgmap[m] = i
|
||||||
}
|
}
|
||||||
|
|
||||||
smi = append(smi, i)
|
smi = append(smi, i)
|
||||||
@ -259,5 +258,15 @@ func gatherMessages(cs *store.ChainStore, ts *types.TipSet) ([]*types.Message, [
|
|||||||
secpkincl = append(secpkincl, smi)
|
secpkincl = append(secpkincl, smi)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
blsmsgs, err := cs.LoadMessagesFromCids(blscids)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
secpkmsgs, err := cs.LoadSignedMessagesFromCids(secpkcids)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
return blsmsgs, blsincl, secpkmsgs, secpkincl, nil
|
return blsmsgs, blsincl, secpkmsgs, secpkincl, nil
|
||||||
}
|
}
|
||||||
|
@ -49,6 +49,7 @@ var chainHeadKey = dstore.NewKey("head")
|
|||||||
var blockValidationCacheKeyPrefix = dstore.NewKey("blockValidation")
|
var blockValidationCacheKeyPrefix = dstore.NewKey("blockValidation")
|
||||||
|
|
||||||
var DefaultTipSetCacheSize = 8192
|
var DefaultTipSetCacheSize = 8192
|
||||||
|
var DefaultMsgMetaCacheSize = 2048
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
if s := os.Getenv("LOTUS_CHAIN_TIPSET_CACHE"); s != "" {
|
if s := os.Getenv("LOTUS_CHAIN_TIPSET_CACHE"); s != "" {
|
||||||
@ -58,6 +59,14 @@ func init() {
|
|||||||
}
|
}
|
||||||
DefaultTipSetCacheSize = tscs
|
DefaultTipSetCacheSize = tscs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if s := os.Getenv("LOTUS_CHAIN_MSGMETA_CACHE"); s != "" {
|
||||||
|
mmcs, err := strconv.Atoi(s)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to parse 'LOTUS_CHAIN_MSGMETA_CACHE' env var: %s", err)
|
||||||
|
}
|
||||||
|
DefaultMsgMetaCacheSize = mmcs
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReorgNotifee represents a callback that gets called upon reorgs.
|
// ReorgNotifee represents a callback that gets called upon reorgs.
|
||||||
@ -97,7 +106,7 @@ type ChainStore struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder) *ChainStore {
|
func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder) *ChainStore {
|
||||||
c, _ := lru.NewARC(2048)
|
c, _ := lru.NewARC(DefaultMsgMetaCacheSize)
|
||||||
tsc, _ := lru.NewARC(DefaultTipSetCacheSize)
|
tsc, _ := lru.NewARC(DefaultTipSetCacheSize)
|
||||||
cs := &ChainStore{
|
cs := &ChainStore{
|
||||||
bs: bs,
|
bs: bs,
|
||||||
@ -834,7 +843,7 @@ type mmCids struct {
|
|||||||
secpk []cid.Cid
|
secpk []cid.Cid
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ChainStore) readMsgMetaCids(mmc cid.Cid) ([]cid.Cid, []cid.Cid, error) {
|
func (cs *ChainStore) ReadMsgMetaCids(mmc cid.Cid) ([]cid.Cid, []cid.Cid, error) {
|
||||||
o, ok := cs.mmCache.Get(mmc)
|
o, ok := cs.mmCache.Get(mmc)
|
||||||
if ok {
|
if ok {
|
||||||
mmcids := o.(*mmCids)
|
mmcids := o.(*mmCids)
|
||||||
@ -890,7 +899,7 @@ func (cs *ChainStore) GetPath(ctx context.Context, from types.TipSetKey, to type
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ChainStore) MessagesForBlock(b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
|
func (cs *ChainStore) MessagesForBlock(b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
|
||||||
blscids, secpkcids, err := cs.readMsgMetaCids(b.Messages)
|
blscids, secpkcids, err := cs.ReadMsgMetaCids(b.Messages)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user