Optimize chain and message sync

Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
This commit is contained in:
Jakub Sztandera 2020-10-08 22:49:36 +02:00
parent 60768f4863
commit 973f61bc10
No known key found for this signature in database
GPG Key ID: 9A9AF56F8B3879BA
3 changed files with 88 additions and 14 deletions

View File

@ -172,25 +172,24 @@ func fetchCids(
cids []cid.Cid, cids []cid.Cid,
cb func(int, blocks.Block) error, cb func(int, blocks.Block) error,
) error { ) error {
fetchedBlocks := bserv.GetBlocks(ctx, cids)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
cidIndex := make(map[cid.Cid]int) cidIndex := make(map[cid.Cid]int)
for i, c := range cids { for i, c := range cids {
cidIndex[c] = i cidIndex[c] = i
} }
if len(cids) != len(cidIndex) {
return fmt.Errorf("duplicate CIDs in fetchCids input")
}
fetchedBlocks := bserv.GetBlocks(ctx, cids)
for i := 0; i < len(cids); i++ { for i := 0; i < len(cids); i++ {
select { select {
case block, ok := <-fetchedBlocks: case block, ok := <-fetchedBlocks:
if !ok { if !ok {
// Closed channel, no more blocks fetched, check if we have all
// of the CIDs requested.
// FIXME: Review this check. We don't call the callback on the
// last index?
if i == len(cids)-1 {
break
}
return fmt.Errorf("failed to fetch all messages") return fmt.Errorf("failed to fetch all messages")
} }

View File

@ -0,0 +1,63 @@
package sub
import (
"context"
"testing"
address "github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/types"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
)
type getter struct {
msgs []*types.Message
}
func (g *getter) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { panic("NYI") }
func (g *getter) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
ch := make(chan blocks.Block, len(g.msgs))
for _, m := range g.msgs {
by, err := m.Serialize()
if err != nil {
panic(err)
}
b, err := blocks.NewBlockWithCid(by, m.Cid())
if err != nil {
panic(err)
}
ch <- b
}
close(ch)
return ch
}
func TestFetchCidsWithDedup(t *testing.T) {
msgs := []*types.Message{}
for i := 0; i < 10; i++ {
msgs = append(msgs, &types.Message{
From: address.TestAddress,
To: address.TestAddress,
Nonce: uint64(i),
})
}
cids := []cid.Cid{}
for _, m := range msgs {
cids = append(cids, m.Cid())
}
g := &getter{msgs}
// the cids have a duplicate
res, err := FetchMessagesByCids(context.TODO(), g, append(cids, cids[0]))
t.Logf("err: %+v", err)
t.Logf("res: %+v", res)
if err == nil {
t.Errorf("there should be an error")
}
if err == nil && (res[0] == nil || res[len(res)-1] == nil) {
t.Fatalf("there is a nil message: first %p, last %p", res[0], res[len(res)-1])
}
}

View File

@ -217,6 +217,12 @@ func (syncer *Syncer) Stop() {
// This should be called when connecting to new peers, and additionally // This should be called when connecting to new peers, and additionally
// when receiving new blocks from the network // when receiving new blocks from the network
func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) bool { func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) bool {
defer func() {
if err := recover(); err != nil {
log.Errorf("panic in InformNewHead: ", err)
}
}()
ctx := context.Background() ctx := context.Background()
if fts == nil { if fts == nil {
log.Errorf("got nil tipset in InformNewHead") log.Errorf("got nil tipset in InformNewHead")
@ -1281,9 +1287,11 @@ func (syncer *Syncer) collectHeaders(ctx context.Context, incoming *types.TipSet
blockSet := []*types.TipSet{incoming} blockSet := []*types.TipSet{incoming}
// Parent of the new (possibly better) tipset that we need to fetch next.
at := incoming.Parents() at := incoming.Parents()
// we want to sync all the blocks until the height above the block we have // we want to sync all the blocks until the height above our
// best tipset so far
untilHeight := known.Height() + 1 untilHeight := known.Height() + 1
ss.SetHeight(blockSet[len(blockSet)-1].Height()) ss.SetHeight(blockSet[len(blockSet)-1].Height())
@ -1377,13 +1385,17 @@ loop:
} }
base := blockSet[len(blockSet)-1] base := blockSet[len(blockSet)-1]
if base.Parents() == known.Parents() { if base.IsChildOf(known) {
// common case: receiving a block thats potentially part of the same tipset as our best block // common case: receiving blocks that are building on top of our best tipset
return blockSet, nil return blockSet, nil
} }
if types.CidArrsEqual(base.Parents().Cids(), known.Cids()) { knownParent, err := syncer.store.LoadTipSet(known.Parents())
// common case: receiving blocks that are building on top of our best tipset if err != nil {
return nil, xerrors.Errorf("failed to load next local tipset: %w", err)
}
if base.IsChildOf(knownParent) {
// common case: receiving a block thats potentially part of the same tipset as our best block
return blockSet, nil return blockSet, nil
} }