Merge pull request #4246 from filecoin-project/feat/optim-search-msg
Optimize SearchForMessage and GetReceipt
This commit is contained in:
commit
efb13a0d9b
@ -2,6 +2,7 @@ package stmgr
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
@ -507,16 +508,7 @@ func (sm *StateManager) GetReceipt(ctx context.Context, msg cid.Cid, ts *types.T
|
|||||||
return nil, fmt.Errorf("failed to load message: %w", err)
|
return nil, fmt.Errorf("failed to load message: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
r, _, err := sm.tipsetExecutedMessage(ts, msg, m.VMMessage())
|
_, r, _, err := sm.searchBackForMsg(ctx, ts, m)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if r != nil {
|
|
||||||
return r, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
_, r, _, err = sm.searchBackForMsg(ctx, ts, m)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to look back through chain for message: %w", err)
|
return nil, fmt.Errorf("failed to look back through chain for message: %w", err)
|
||||||
}
|
}
|
||||||
@ -674,6 +666,18 @@ func (sm *StateManager) SearchForMessage(ctx context.Context, mcid cid.Cid) (*ty
|
|||||||
func (sm *StateManager) searchBackForMsg(ctx context.Context, from *types.TipSet, m types.ChainMsg) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) {
|
func (sm *StateManager) searchBackForMsg(ctx context.Context, from *types.TipSet, m types.ChainMsg) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) {
|
||||||
|
|
||||||
cur := from
|
cur := from
|
||||||
|
curActor, err := sm.LoadActor(ctx, m.VMMessage().From, cur)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, cid.Undef, xerrors.Errorf("failed to load initital tipset")
|
||||||
|
}
|
||||||
|
|
||||||
|
mFromId, err := sm.LookupID(ctx, m.VMMessage().From, from)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, cid.Undef, xerrors.Errorf("looking up From id address: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
mNonce := m.VMMessage().Nonce
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if cur.Height() == 0 {
|
if cur.Height() == 0 {
|
||||||
// it ain't here!
|
// it ain't here!
|
||||||
@ -686,32 +690,37 @@ func (sm *StateManager) searchBackForMsg(ctx context.Context, from *types.TipSet
|
|||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
act, err := sm.LoadActor(ctx, m.VMMessage().From, cur)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, cid.Cid{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// we either have no messages from the sender, or the latest message we found has a lower nonce than the one being searched for,
|
// we either have no messages from the sender, or the latest message we found has a lower nonce than the one being searched for,
|
||||||
// either way, no reason to lookback, it ain't there
|
// either way, no reason to lookback, it ain't there
|
||||||
if act.Nonce == 0 || act.Nonce < m.VMMessage().Nonce {
|
if curActor == nil || curActor.Nonce == 0 || curActor.Nonce < mNonce {
|
||||||
return nil, nil, cid.Undef, nil
|
return nil, nil, cid.Undef, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
ts, err := sm.cs.LoadTipSet(cur.Parents())
|
pts, err := sm.cs.LoadTipSet(cur.Parents())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, cid.Undef, fmt.Errorf("failed to load tipset during msg wait searchback: %w", err)
|
return nil, nil, cid.Undef, xerrors.Errorf("failed to load tipset during msg wait searchback: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
r, foundMsg, err := sm.tipsetExecutedMessage(ts, m.Cid(), m.VMMessage())
|
act, err := sm.LoadActor(ctx, mFromId, pts)
|
||||||
|
actorNoExist := errors.Is(err, types.ErrActorNotFound)
|
||||||
|
if err != nil && !actorNoExist {
|
||||||
|
return nil, nil, cid.Cid{}, xerrors.Errorf("failed to load the actor: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// check that between cur and parent tipset the nonce fell into range of our message
|
||||||
|
if actorNoExist || (curActor.Nonce > mNonce && act.Nonce <= mNonce) {
|
||||||
|
r, foundMsg, err := sm.tipsetExecutedMessage(cur, m.Cid(), m.VMMessage())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, cid.Undef, fmt.Errorf("checking for message execution during lookback: %w", err)
|
return nil, nil, cid.Undef, xerrors.Errorf("checking for message execution during lookback: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if r != nil {
|
if r != nil {
|
||||||
return ts, r, foundMsg, nil
|
return pts, r, foundMsg, nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cur = ts
|
cur = pts
|
||||||
|
curActor = act
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -172,25 +172,24 @@ func fetchCids(
|
|||||||
cids []cid.Cid,
|
cids []cid.Cid,
|
||||||
cb func(int, blocks.Block) error,
|
cb func(int, blocks.Block) error,
|
||||||
) error {
|
) error {
|
||||||
fetchedBlocks := bserv.GetBlocks(ctx, cids)
|
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
cidIndex := make(map[cid.Cid]int)
|
cidIndex := make(map[cid.Cid]int)
|
||||||
for i, c := range cids {
|
for i, c := range cids {
|
||||||
cidIndex[c] = i
|
cidIndex[c] = i
|
||||||
}
|
}
|
||||||
|
if len(cids) != len(cidIndex) {
|
||||||
|
return fmt.Errorf("duplicate CIDs in fetchCids input")
|
||||||
|
}
|
||||||
|
|
||||||
|
fetchedBlocks := bserv.GetBlocks(ctx, cids)
|
||||||
|
|
||||||
for i := 0; i < len(cids); i++ {
|
for i := 0; i < len(cids); i++ {
|
||||||
select {
|
select {
|
||||||
case block, ok := <-fetchedBlocks:
|
case block, ok := <-fetchedBlocks:
|
||||||
if !ok {
|
if !ok {
|
||||||
// Closed channel, no more blocks fetched, check if we have all
|
|
||||||
// of the CIDs requested.
|
|
||||||
// FIXME: Review this check. We don't call the callback on the
|
|
||||||
// last index?
|
|
||||||
if i == len(cids)-1 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
return fmt.Errorf("failed to fetch all messages")
|
return fmt.Errorf("failed to fetch all messages")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
63
chain/sub/incoming_test.go
Normal file
63
chain/sub/incoming_test.go
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
package sub
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
address "github.com/filecoin-project/go-address"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
blocks "github.com/ipfs/go-block-format"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
)
|
||||||
|
|
||||||
|
type getter struct {
|
||||||
|
msgs []*types.Message
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *getter) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { panic("NYI") }
|
||||||
|
|
||||||
|
func (g *getter) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
|
||||||
|
ch := make(chan blocks.Block, len(g.msgs))
|
||||||
|
for _, m := range g.msgs {
|
||||||
|
by, err := m.Serialize()
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
b, err := blocks.NewBlockWithCid(by, m.Cid())
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
ch <- b
|
||||||
|
}
|
||||||
|
close(ch)
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFetchCidsWithDedup(t *testing.T) {
|
||||||
|
msgs := []*types.Message{}
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
msgs = append(msgs, &types.Message{
|
||||||
|
From: address.TestAddress,
|
||||||
|
To: address.TestAddress,
|
||||||
|
|
||||||
|
Nonce: uint64(i),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
cids := []cid.Cid{}
|
||||||
|
for _, m := range msgs {
|
||||||
|
cids = append(cids, m.Cid())
|
||||||
|
}
|
||||||
|
g := &getter{msgs}
|
||||||
|
|
||||||
|
// the cids have a duplicate
|
||||||
|
res, err := FetchMessagesByCids(context.TODO(), g, append(cids, cids[0]))
|
||||||
|
|
||||||
|
t.Logf("err: %+v", err)
|
||||||
|
t.Logf("res: %+v", res)
|
||||||
|
if err == nil {
|
||||||
|
t.Errorf("there should be an error")
|
||||||
|
}
|
||||||
|
if err == nil && (res[0] == nil || res[len(res)-1] == nil) {
|
||||||
|
t.Fatalf("there is a nil message: first %p, last %p", res[0], res[len(res)-1])
|
||||||
|
}
|
||||||
|
}
|
@ -217,6 +217,12 @@ func (syncer *Syncer) Stop() {
|
|||||||
// This should be called when connecting to new peers, and additionally
|
// This should be called when connecting to new peers, and additionally
|
||||||
// when receiving new blocks from the network
|
// when receiving new blocks from the network
|
||||||
func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) bool {
|
func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) bool {
|
||||||
|
defer func() {
|
||||||
|
if err := recover(); err != nil {
|
||||||
|
log.Errorf("panic in InformNewHead: ", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
if fts == nil {
|
if fts == nil {
|
||||||
log.Errorf("got nil tipset in InformNewHead")
|
log.Errorf("got nil tipset in InformNewHead")
|
||||||
@ -1281,9 +1287,11 @@ func (syncer *Syncer) collectHeaders(ctx context.Context, incoming *types.TipSet
|
|||||||
|
|
||||||
blockSet := []*types.TipSet{incoming}
|
blockSet := []*types.TipSet{incoming}
|
||||||
|
|
||||||
|
// Parent of the new (possibly better) tipset that we need to fetch next.
|
||||||
at := incoming.Parents()
|
at := incoming.Parents()
|
||||||
|
|
||||||
// we want to sync all the blocks until the height above the block we have
|
// we want to sync all the blocks until the height above our
|
||||||
|
// best tipset so far
|
||||||
untilHeight := known.Height() + 1
|
untilHeight := known.Height() + 1
|
||||||
|
|
||||||
ss.SetHeight(blockSet[len(blockSet)-1].Height())
|
ss.SetHeight(blockSet[len(blockSet)-1].Height())
|
||||||
@ -1377,13 +1385,17 @@ loop:
|
|||||||
}
|
}
|
||||||
|
|
||||||
base := blockSet[len(blockSet)-1]
|
base := blockSet[len(blockSet)-1]
|
||||||
if base.Parents() == known.Parents() {
|
if base.IsChildOf(known) {
|
||||||
// common case: receiving a block thats potentially part of the same tipset as our best block
|
// common case: receiving blocks that are building on top of our best tipset
|
||||||
return blockSet, nil
|
return blockSet, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if types.CidArrsEqual(base.Parents().Cids(), known.Cids()) {
|
knownParent, err := syncer.store.LoadTipSet(known.Parents())
|
||||||
// common case: receiving blocks that are building on top of our best tipset
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("failed to load next local tipset: %w", err)
|
||||||
|
}
|
||||||
|
if base.IsChildOf(knownParent) {
|
||||||
|
// common case: receiving a block thats potentially part of the same tipset as our best block
|
||||||
return blockSet, nil
|
return blockSet, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user