Merge pull request #3276 from filecoin-project/fix/mpool-select-head-change
more correctly handle discrepancies between mempools head and chainhead
This commit is contained in:
		
						commit
						09bcc7aa52
					
				| @ -27,6 +27,7 @@ import ( | |||||||
| 
 | 
 | ||||||
| 	"github.com/filecoin-project/lotus/api" | 	"github.com/filecoin-project/lotus/api" | ||||||
| 	"github.com/filecoin-project/lotus/build" | 	"github.com/filecoin-project/lotus/build" | ||||||
|  | 	"github.com/filecoin-project/lotus/chain/store" | ||||||
| 	"github.com/filecoin-project/lotus/chain/types" | 	"github.com/filecoin-project/lotus/chain/types" | ||||||
| 	"github.com/filecoin-project/lotus/chain/vm" | 	"github.com/filecoin-project/lotus/chain/vm" | ||||||
| 	"github.com/filecoin-project/lotus/lib/sigs" | 	"github.com/filecoin-project/lotus/lib/sigs" | ||||||
| @ -317,8 +318,9 @@ func (mp *MessagePool) checkMessage(m *types.SignedMessage) error { | |||||||
| 		return xerrors.Errorf("mpool message too large (%dB): %w", m.Size(), ErrMessageTooBig) | 		return xerrors.Errorf("mpool message too large (%dB): %w", m.Size(), ErrMessageTooBig) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	// Perform syntaxtic validation, minGas=0 as we check if correctly in select messages
 | ||||||
| 	if err := m.Message.ValidForBlockInclusion(0); err != nil { | 	if err := m.Message.ValidForBlockInclusion(0); err != nil { | ||||||
| 		return xerrors.Errorf("message not valid for block inclusion: %d", err) | 		return xerrors.Errorf("message not valid for block inclusion: %w", err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if m.Message.To == address.Undef { | 	if m.Message.To == address.Undef { | ||||||
| @ -503,42 +505,16 @@ func (mp *MessagePool) getNonceLocked(addr address.Address, curTs *types.TipSet) | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (mp *MessagePool) getStateNonce(addr address.Address, curTs *types.TipSet) (uint64, error) { | func (mp *MessagePool) getStateNonce(addr address.Address, curTs *types.TipSet) (uint64, error) { | ||||||
| 	// TODO: this method probably should be cached
 | 	act, err := mp.api.GetActorAfter(addr, curTs) | ||||||
| 
 |  | ||||||
| 	act, err := mp.api.StateGetActor(addr, curTs) |  | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return 0, err | 		return 0, err | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	baseNonce := act.Nonce | 	return act.Nonce, nil | ||||||
| 
 |  | ||||||
| 	// TODO: the correct thing to do here is probably to set curTs to chain.head
 |  | ||||||
| 	// but since we have an accurate view of the world until a head change occurs,
 |  | ||||||
| 	// this should be fine
 |  | ||||||
| 	if curTs == nil { |  | ||||||
| 		return baseNonce, nil |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	msgs, err := mp.api.MessagesForTipset(curTs) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return 0, xerrors.Errorf("failed to check messages for tipset: %w", err) |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	for _, m := range msgs { |  | ||||||
| 		msg := m.VMMessage() |  | ||||||
| 		if msg.From == addr { |  | ||||||
| 			if msg.Nonce != baseNonce { |  | ||||||
| 				return 0, xerrors.Errorf("tipset %s has bad nonce ordering (%d != %d)", curTs.Cids(), msg.Nonce, baseNonce) |  | ||||||
| 			} |  | ||||||
| 			baseNonce++ |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return baseNonce, nil |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (mp *MessagePool) getStateBalance(addr address.Address, ts *types.TipSet) (types.BigInt, error) { | func (mp *MessagePool) getStateBalance(addr address.Address, ts *types.TipSet) (types.BigInt, error) { | ||||||
| 	act, err := mp.api.StateGetActor(addr, ts) | 	act, err := mp.api.GetActorAfter(addr, ts) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return types.EmptyInt, err | 		return types.EmptyInt, err | ||||||
| 	} | 	} | ||||||
| @ -830,7 +806,8 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) | |||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		for a, bkt := range buckets { | 		for a, bkt := range buckets { | ||||||
| 			act, err := mp.api.StateGetActor(a, ts) | 			// TODO that might not be correct with GatActorAfter but it is only debug code
 | ||||||
|  | 			act, err := mp.api.GetActorAfter(a, ts) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				log.Debugf("%s, err: %s\n", a, err) | 				log.Debugf("%s, err: %s\n", a, err) | ||||||
| 				continue | 				continue | ||||||
| @ -880,6 +857,73 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) | |||||||
| 	return merr | 	return merr | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func (mp *MessagePool) runHeadChange(from *types.TipSet, to *types.TipSet, rmsgs map[address.Address]map[uint64]*types.SignedMessage) error { | ||||||
|  | 	add := func(m *types.SignedMessage) { | ||||||
|  | 		s, ok := rmsgs[m.Message.From] | ||||||
|  | 		if !ok { | ||||||
|  | 			s = make(map[uint64]*types.SignedMessage) | ||||||
|  | 			rmsgs[m.Message.From] = s | ||||||
|  | 		} | ||||||
|  | 		s[m.Message.Nonce] = m | ||||||
|  | 	} | ||||||
|  | 	rm := func(from address.Address, nonce uint64) { | ||||||
|  | 		s, ok := rmsgs[from] | ||||||
|  | 		if !ok { | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		if _, ok := s[nonce]; ok { | ||||||
|  | 			delete(s, nonce) | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	revert, apply, err := store.ReorgOps(mp.api.LoadTipSet, from, to) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return xerrors.Errorf("failed to compute reorg ops for mpool pending messages: %w", err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	var merr error | ||||||
|  | 
 | ||||||
|  | 	for _, ts := range revert { | ||||||
|  | 		msgs, err := mp.MessagesForBlocks(ts.Blocks()) | ||||||
|  | 		if err != nil { | ||||||
|  | 			log.Errorf("error retrieving messages for reverted block: %s", err) | ||||||
|  | 			merr = multierror.Append(merr, err) | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		for _, msg := range msgs { | ||||||
|  | 			add(msg) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	for _, ts := range apply { | ||||||
|  | 		mp.curTs = ts | ||||||
|  | 
 | ||||||
|  | 		for _, b := range ts.Blocks() { | ||||||
|  | 			bmsgs, smsgs, err := mp.api.MessagesForBlock(b) | ||||||
|  | 			if err != nil { | ||||||
|  | 				xerr := xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err) | ||||||
|  | 				log.Errorf("error retrieving messages for block: %s", xerr) | ||||||
|  | 				merr = multierror.Append(merr, xerr) | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			for _, msg := range smsgs { | ||||||
|  | 				rm(msg.Message.From, msg.Message.Nonce) | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			for _, msg := range bmsgs { | ||||||
|  | 				rm(msg.From, msg.Nonce) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return merr | ||||||
|  | } | ||||||
|  | 
 | ||||||
| type statBucket struct { | type statBucket struct { | ||||||
| 	msgs map[uint64]*types.SignedMessage | 	msgs map[uint64]*types.SignedMessage | ||||||
| } | } | ||||||
|  | |||||||
| @ -3,6 +3,7 @@ package messagepool | |||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
| 	"fmt" | 	"fmt" | ||||||
|  | 	"sort" | ||||||
| 	"testing" | 	"testing" | ||||||
| 
 | 
 | ||||||
| 	"github.com/filecoin-project/go-address" | 	"github.com/filecoin-project/go-address" | ||||||
| @ -33,11 +34,20 @@ type testMpoolAPI struct { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func newTestMpoolAPI() *testMpoolAPI { | func newTestMpoolAPI() *testMpoolAPI { | ||||||
| 	return &testMpoolAPI{ | 	tma := &testMpoolAPI{ | ||||||
| 		bmsgs:      make(map[cid.Cid][]*types.SignedMessage), | 		bmsgs:      make(map[cid.Cid][]*types.SignedMessage), | ||||||
| 		statenonce: make(map[address.Address]uint64), | 		statenonce: make(map[address.Address]uint64), | ||||||
| 		balance:    make(map[address.Address]types.BigInt), | 		balance:    make(map[address.Address]types.BigInt), | ||||||
| 	} | 	} | ||||||
|  | 	genesis := mock.MkBlock(nil, 1, 1) | ||||||
|  | 	tma.tipsets = append(tma.tipsets, mock.TipSet(genesis)) | ||||||
|  | 	return tma | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (tma *testMpoolAPI) nextBlock() *types.BlockHeader { | ||||||
|  | 	newBlk := mock.MkBlock(tma.tipsets[len(tma.tipsets)-1], 1, 1) | ||||||
|  | 	tma.tipsets = append(tma.tipsets, mock.TipSet(newBlk)) | ||||||
|  | 	return newBlk | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (tma *testMpoolAPI) applyBlock(t *testing.T, b *types.BlockHeader) { | func (tma *testMpoolAPI) applyBlock(t *testing.T, b *types.BlockHeader) { | ||||||
| @ -68,12 +78,11 @@ func (tma *testMpoolAPI) setBalanceRaw(addr address.Address, v types.BigInt) { | |||||||
| 
 | 
 | ||||||
| func (tma *testMpoolAPI) setBlockMessages(h *types.BlockHeader, msgs ...*types.SignedMessage) { | func (tma *testMpoolAPI) setBlockMessages(h *types.BlockHeader, msgs ...*types.SignedMessage) { | ||||||
| 	tma.bmsgs[h.Cid()] = msgs | 	tma.bmsgs[h.Cid()] = msgs | ||||||
| 	tma.tipsets = append(tma.tipsets, mock.TipSet(h)) |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (tma *testMpoolAPI) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet { | func (tma *testMpoolAPI) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet { | ||||||
| 	tma.cb = cb | 	tma.cb = cb | ||||||
| 	return nil | 	return tma.tipsets[0] | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (tma *testMpoolAPI) PutMessage(m types.ChainMsg) (cid.Cid, error) { | func (tma *testMpoolAPI) PutMessage(m types.ChainMsg) (cid.Cid, error) { | ||||||
| @ -84,15 +93,38 @@ func (tma *testMpoolAPI) PubSubPublish(string, []byte) error { | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (tma *testMpoolAPI) StateGetActor(addr address.Address, ts *types.TipSet) (*types.Actor, error) { | func (tma *testMpoolAPI) GetActorAfter(addr address.Address, ts *types.TipSet) (*types.Actor, error) { | ||||||
| 	balance, ok := tma.balance[addr] | 	balance, ok := tma.balance[addr] | ||||||
| 	if !ok { | 	if !ok { | ||||||
| 		balance = types.NewInt(1000e6) | 		balance = types.NewInt(1000e6) | ||||||
| 		tma.balance[addr] = balance | 		tma.balance[addr] = balance | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
|  | 	msgs := make([]*types.SignedMessage, 0) | ||||||
|  | 	for _, b := range ts.Blocks() { | ||||||
|  | 		for _, m := range tma.bmsgs[b.Cid()] { | ||||||
|  | 			if m.Message.From == addr { | ||||||
|  | 				msgs = append(msgs, m) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	sort.Slice(msgs, func(i, j int) bool { | ||||||
|  | 		return msgs[i].Message.Nonce < msgs[j].Message.Nonce | ||||||
|  | 	}) | ||||||
|  | 
 | ||||||
|  | 	nonce := tma.statenonce[addr] | ||||||
|  | 
 | ||||||
|  | 	for _, m := range msgs { | ||||||
|  | 		if m.Message.Nonce != nonce { | ||||||
|  | 			break | ||||||
|  | 		} | ||||||
|  | 		nonce++ | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	return &types.Actor{ | 	return &types.Actor{ | ||||||
| 		Code:    builtin.StorageMarketActorCodeID, | 		Code:    builtin.StorageMarketActorCodeID, | ||||||
| 		Nonce:   tma.statenonce[addr], | 		Nonce:   nonce, | ||||||
| 		Balance: balance, | 		Balance: balance, | ||||||
| 	}, nil | 	}, nil | ||||||
| } | } | ||||||
| @ -178,7 +210,7 @@ func TestMessagePool(t *testing.T) { | |||||||
| 		t.Fatal(err) | 		t.Fatal(err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	a := mock.MkBlock(nil, 1, 1) | 	a := tma.nextBlock() | ||||||
| 
 | 
 | ||||||
| 	sender, err := w.GenerateKey(crypto.SigTypeBLS) | 	sender, err := w.GenerateKey(crypto.SigTypeBLS) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| @ -204,6 +236,50 @@ func TestMessagePool(t *testing.T) { | |||||||
| 	assertNonce(t, mp, sender, 2) | 	assertNonce(t, mp, sender, 2) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func TestMessagePoolMessagesInEachBlock(t *testing.T) { | ||||||
|  | 	tma := newTestMpoolAPI() | ||||||
|  | 
 | ||||||
|  | 	w, err := wallet.NewWallet(wallet.NewMemKeyStore()) | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Fatal(err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	ds := datastore.NewMapDatastore() | ||||||
|  | 
 | ||||||
|  | 	mp, err := New(tma, ds, "mptest") | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Fatal(err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	a := tma.nextBlock() | ||||||
|  | 
 | ||||||
|  | 	sender, err := w.GenerateKey(crypto.SigTypeBLS) | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Fatal(err) | ||||||
|  | 	} | ||||||
|  | 	target := mock.Address(1001) | ||||||
|  | 
 | ||||||
|  | 	var msgs []*types.SignedMessage | ||||||
|  | 	for i := 0; i < 5; i++ { | ||||||
|  | 		m := mock.MkMessage(sender, target, uint64(i), w) | ||||||
|  | 		msgs = append(msgs, m) | ||||||
|  | 		mustAdd(t, mp, m) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	tma.setStateNonce(sender, 0) | ||||||
|  | 
 | ||||||
|  | 	tma.setBlockMessages(a, msgs[0], msgs[1]) | ||||||
|  | 	tma.applyBlock(t, a) | ||||||
|  | 	tsa := mock.TipSet(a) | ||||||
|  | 
 | ||||||
|  | 	_, _ = mp.Pending() | ||||||
|  | 
 | ||||||
|  | 	selm, _ := mp.SelectMessages(tsa, 1) | ||||||
|  | 	if len(selm) == 0 { | ||||||
|  | 		t.Fatal("should have returned the rest of the messages") | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func TestRevertMessages(t *testing.T) { | func TestRevertMessages(t *testing.T) { | ||||||
| 	tma := newTestMpoolAPI() | 	tma := newTestMpoolAPI() | ||||||
| 
 | 
 | ||||||
| @ -219,8 +295,8 @@ func TestRevertMessages(t *testing.T) { | |||||||
| 		t.Fatal(err) | 		t.Fatal(err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	a := mock.MkBlock(nil, 1, 1) | 	a := tma.nextBlock() | ||||||
| 	b := mock.MkBlock(mock.TipSet(a), 1, 1) | 	b := tma.nextBlock() | ||||||
| 
 | 
 | ||||||
| 	sender, err := w.GenerateKey(crypto.SigTypeBLS) | 	sender, err := w.GenerateKey(crypto.SigTypeBLS) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| @ -254,6 +330,7 @@ func TestRevertMessages(t *testing.T) { | |||||||
| 	assertNonce(t, mp, sender, 4) | 	assertNonce(t, mp, sender, 4) | ||||||
| 
 | 
 | ||||||
| 	p, _ := mp.Pending() | 	p, _ := mp.Pending() | ||||||
|  | 	fmt.Printf("%+v\n", p) | ||||||
| 	if len(p) != 3 { | 	if len(p) != 3 { | ||||||
| 		t.Fatal("expected three messages in mempool") | 		t.Fatal("expected three messages in mempool") | ||||||
| 	} | 	} | ||||||
| @ -275,7 +352,7 @@ func TestPruningSimple(t *testing.T) { | |||||||
| 		t.Fatal(err) | 		t.Fatal(err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	a := mock.MkBlock(nil, 1, 1) | 	a := tma.nextBlock() | ||||||
| 	tma.applyBlock(t, a) | 	tma.applyBlock(t, a) | ||||||
| 
 | 
 | ||||||
| 	sender, err := w.GenerateKey(crypto.SigTypeBLS) | 	sender, err := w.GenerateKey(crypto.SigTypeBLS) | ||||||
|  | |||||||
| @ -16,7 +16,7 @@ type Provider interface { | |||||||
| 	SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet | 	SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet | ||||||
| 	PutMessage(m types.ChainMsg) (cid.Cid, error) | 	PutMessage(m types.ChainMsg) (cid.Cid, error) | ||||||
| 	PubSubPublish(string, []byte) error | 	PubSubPublish(string, []byte) error | ||||||
| 	StateGetActor(address.Address, *types.TipSet) (*types.Actor, error) | 	GetActorAfter(address.Address, *types.TipSet) (*types.Actor, error) | ||||||
| 	StateAccountKey(context.Context, address.Address, *types.TipSet) (address.Address, error) | 	StateAccountKey(context.Context, address.Address, *types.TipSet) (address.Address, error) | ||||||
| 	MessagesForBlock(*types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) | 	MessagesForBlock(*types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) | ||||||
| 	MessagesForTipset(*types.TipSet) ([]types.ChainMsg, error) | 	MessagesForTipset(*types.TipSet) ([]types.ChainMsg, error) | ||||||
| @ -46,9 +46,14 @@ func (mpp *mpoolProvider) PubSubPublish(k string, v []byte) error { | |||||||
| 	return mpp.ps.Publish(k, v) //nolint
 | 	return mpp.ps.Publish(k, v) //nolint
 | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (mpp *mpoolProvider) StateGetActor(addr address.Address, ts *types.TipSet) (*types.Actor, error) { | func (mpp *mpoolProvider) GetActorAfter(addr address.Address, ts *types.TipSet) (*types.Actor, error) { | ||||||
| 	var act types.Actor | 	var act types.Actor | ||||||
| 	return &act, mpp.sm.WithParentState(ts, mpp.sm.WithActor(addr, stmgr.GetActor(&act))) | 	stcid, _, err := mpp.sm.TipSetState(context.TODO(), ts) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, xerrors.Errorf("computing tipset state for GetActor: %w", err) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return &act, mpp.sm.WithStateTree(stcid, mpp.sm.WithActor(addr, stmgr.GetActor(&act))) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (mpp *mpoolProvider) StateAccountKey(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) { | func (mpp *mpoolProvider) StateAccountKey(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) { | ||||||
|  | |||||||
| @ -14,7 +14,6 @@ import ( | |||||||
| 	"github.com/filecoin-project/lotus/chain/types" | 	"github.com/filecoin-project/lotus/chain/types" | ||||||
| 	"github.com/filecoin-project/lotus/chain/vm" | 	"github.com/filecoin-project/lotus/chain/vm" | ||||||
| 	abig "github.com/filecoin-project/specs-actors/actors/abi/big" | 	abig "github.com/filecoin-project/specs-actors/actors/abi/big" | ||||||
| 	"github.com/ipfs/go-cid" |  | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| var bigBlockGasLimit = big.NewInt(build.BlockGasLimit) | var bigBlockGasLimit = big.NewInt(build.BlockGasLimit) | ||||||
| @ -528,7 +527,6 @@ func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address. | |||||||
| 	start := time.Now() | 	start := time.Now() | ||||||
| 
 | 
 | ||||||
| 	result := make(map[address.Address]map[uint64]*types.SignedMessage) | 	result := make(map[address.Address]map[uint64]*types.SignedMessage) | ||||||
| 	haveCids := make(map[cid.Cid]struct{}) |  | ||||||
| 	defer func() { | 	defer func() { | ||||||
| 		if dt := time.Since(start); dt > time.Millisecond { | 		if dt := time.Since(start); dt > time.Millisecond { | ||||||
| 			log.Infow("get pending messages done", "took", dt) | 			log.Infow("get pending messages done", "took", dt) | ||||||
| @ -554,10 +552,6 @@ func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address. | |||||||
| 			} | 			} | ||||||
| 			result[a] = msetCopy | 			result[a] = msetCopy | ||||||
| 
 | 
 | ||||||
| 			// mark the messages as seen
 |  | ||||||
| 			for _, m := range mset.msgs { |  | ||||||
| 				haveCids[m.Cid()] = struct{}{} |  | ||||||
| 			} |  | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| @ -566,72 +560,11 @@ func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address. | |||||||
| 		return result, nil | 		return result, nil | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// nope, we need to sync the tipsets
 | 	if err := mp.runHeadChange(curTs, ts, result); err != nil { | ||||||
| 	for { | 		return nil, xerrors.Errorf("failed to process difference between mpool head and given head: %w", err) | ||||||
| 		if curTs.Height() == ts.Height() { | 	} | ||||||
| 			if curTs.Equals(ts) { | 
 | ||||||
| 	return result, nil | 	return result, nil | ||||||
| 			} |  | ||||||
| 
 |  | ||||||
| 			// different blocks in tipsets -- we mark them as seen so that they are not included in
 |  | ||||||
| 			// in the message set we return, but *neither me (vyzo) nor why understand why*
 |  | ||||||
| 			// this code is also probably completely untested in production, so I am adding a big fat
 |  | ||||||
| 			// warning to revisit this case and sanity check this decision.
 |  | ||||||
| 			log.Warnf("mpool tipset has same height as target tipset but it's not equal; beware of dragons!") |  | ||||||
| 
 |  | ||||||
| 			have, err := mp.MessagesForBlocks(ts.Blocks()) |  | ||||||
| 			if err != nil { |  | ||||||
| 				return nil, xerrors.Errorf("error retrieving messages for tipset: %w", err) |  | ||||||
| 			} |  | ||||||
| 
 |  | ||||||
| 			for _, m := range have { |  | ||||||
| 				haveCids[m.Cid()] = struct{}{} |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		msgs, err := mp.MessagesForBlocks(ts.Blocks()) |  | ||||||
| 		if err != nil { |  | ||||||
| 			return nil, xerrors.Errorf("error retrieving messages for tipset: %w", err) |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		for _, m := range msgs { |  | ||||||
| 			if _, have := haveCids[m.Cid()]; have { |  | ||||||
| 				continue |  | ||||||
| 			} |  | ||||||
| 
 |  | ||||||
| 			haveCids[m.Cid()] = struct{}{} |  | ||||||
| 			mset, ok := result[m.Message.From] |  | ||||||
| 			if !ok { |  | ||||||
| 				mset = make(map[uint64]*types.SignedMessage) |  | ||||||
| 				result[m.Message.From] = mset |  | ||||||
| 			} |  | ||||||
| 
 |  | ||||||
| 			other, dupNonce := mset[m.Message.Nonce] |  | ||||||
| 			if dupNonce { |  | ||||||
| 				// duplicate nonce, selfishly keep the message with the highest GasPrice
 |  | ||||||
| 				// if the gas prices are the same, keep the one with the highest GasLimit
 |  | ||||||
| 				switch m.Message.GasPremium.Int.Cmp(other.Message.GasPremium.Int) { |  | ||||||
| 				case 0: |  | ||||||
| 					if m.Message.GasLimit > other.Message.GasLimit { |  | ||||||
| 						mset[m.Message.Nonce] = m |  | ||||||
| 					} |  | ||||||
| 				case 1: |  | ||||||
| 					mset[m.Message.Nonce] = m |  | ||||||
| 				} |  | ||||||
| 			} else { |  | ||||||
| 				mset[m.Message.Nonce] = m |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		if curTs.Height() >= ts.Height() { |  | ||||||
| 			return result, nil |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		ts, err = mp.api.LoadTipSet(ts.Parents()) |  | ||||||
| 		if err != nil { |  | ||||||
| 			return nil, xerrors.Errorf("error loading parent tipset: %w", err) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (mp *MessagePool) getGasReward(msg *types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) *big.Int { | func (mp *MessagePool) getGasReward(msg *types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) *big.Int { | ||||||
| @ -671,7 +604,12 @@ func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint6 | |||||||
| 	//   cannot exceed the block limit; drop all messages that exceed the limit
 | 	//   cannot exceed the block limit; drop all messages that exceed the limit
 | ||||||
| 	// - the total gasReward cannot exceed the actor's balance; drop all messages that exceed
 | 	// - the total gasReward cannot exceed the actor's balance; drop all messages that exceed
 | ||||||
| 	//   the balance
 | 	//   the balance
 | ||||||
| 	a, _ := mp.api.StateGetActor(actor, ts) | 	a, err := mp.api.GetActorAfter(actor, ts) | ||||||
|  | 	if err != nil { | ||||||
|  | 		log.Errorf("failed to load actor state, not building chain for %s: %w", actor, err) | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	curNonce := a.Nonce | 	curNonce := a.Nonce | ||||||
| 	balance := a.Balance.Int | 	balance := a.Balance.Int | ||||||
| 	gasLimit := int64(0) | 	gasLimit := int64(0) | ||||||
|  | |||||||
| @ -79,7 +79,7 @@ func TestMessageChains(t *testing.T) { | |||||||
| 		t.Fatal(err) | 		t.Fatal(err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	block := mock.MkBlock(nil, 1, 1) | 	block := tma.nextBlock() | ||||||
| 	ts := mock.TipSet(block) | 	ts := mock.TipSet(block) | ||||||
| 
 | 
 | ||||||
| 	gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}] | 	gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}] | ||||||
| @ -317,7 +317,7 @@ func TestMessageChainSkipping(t *testing.T) { | |||||||
| 		t.Fatal(err) | 		t.Fatal(err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	block := mock.MkBlock(nil, 1, 1) | 	block := tma.nextBlock() | ||||||
| 	ts := mock.TipSet(block) | 	ts := mock.TipSet(block) | ||||||
| 
 | 
 | ||||||
| 	gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}] | 	gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}] | ||||||
| @ -387,7 +387,7 @@ func TestBasicMessageSelection(t *testing.T) { | |||||||
| 		t.Fatal(err) | 		t.Fatal(err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	block := mock.MkBlock(nil, 1, 1) | 	block := tma.nextBlock() | ||||||
| 	ts := mock.TipSet(block) | 	ts := mock.TipSet(block) | ||||||
| 	tma.applyBlock(t, block) | 	tma.applyBlock(t, block) | ||||||
| 
 | 
 | ||||||
| @ -440,12 +440,12 @@ func TestBasicMessageSelection(t *testing.T) { | |||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// now we make a block with all the messages and advance the chain
 | 	// now we make a block with all the messages and advance the chain
 | ||||||
| 	block2 := mock.MkBlock(ts, 2, 2) | 	block2 := tma.nextBlock() | ||||||
| 	tma.setBlockMessages(block2, msgs...) | 	tma.setBlockMessages(block2, msgs...) | ||||||
| 	tma.applyBlock(t, block2) | 	tma.applyBlock(t, block2) | ||||||
| 
 | 
 | ||||||
| 	// we should have no pending messages in the mpool
 | 	// we should have no pending messages in the mpool
 | ||||||
| 	pend, ts2 := mp.Pending() | 	pend, _ := mp.Pending() | ||||||
| 	if len(pend) != 0 { | 	if len(pend) != 0 { | ||||||
| 		t.Fatalf("expected no pending messages, but got %d", len(pend)) | 		t.Fatalf("expected no pending messages, but got %d", len(pend)) | ||||||
| 	} | 	} | ||||||
| @ -458,13 +458,13 @@ func TestBasicMessageSelection(t *testing.T) { | |||||||
| 		m = makeTestMessage(w2, a2, a1, uint64(i), gasLimit, uint64(i+1)) | 		m = makeTestMessage(w2, a2, a1, uint64(i), gasLimit, uint64(i+1)) | ||||||
| 		msgs = append(msgs, m) | 		msgs = append(msgs, m) | ||||||
| 	} | 	} | ||||||
| 	block3 := mock.MkBlock(ts2, 3, 3) | 	block3 := tma.nextBlock() | ||||||
| 	tma.setBlockMessages(block3, msgs...) | 	tma.setBlockMessages(block3, msgs...) | ||||||
| 	ts3 := mock.TipSet(block3) | 	ts3 := mock.TipSet(block3) | ||||||
| 
 | 
 | ||||||
| 	// now create another set of messages and add them to the mpool
 | 	// now create another set of messages and add them to the mpool
 | ||||||
| 	for i := 20; i < 30; i++ { | 	for i := 20; i < 30; i++ { | ||||||
| 		m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(2*i+1)) | 		m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(2*i+200)) | ||||||
| 		mustAdd(t, mp, m) | 		mustAdd(t, mp, m) | ||||||
| 		m = makeTestMessage(w2, a2, a1, uint64(i), gasLimit, uint64(i+1)) | 		m = makeTestMessage(w2, a2, a1, uint64(i), gasLimit, uint64(i+1)) | ||||||
| 		mustAdd(t, mp, m) | 		mustAdd(t, mp, m) | ||||||
| @ -480,12 +480,12 @@ func TestBasicMessageSelection(t *testing.T) { | |||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatal(err) | 		t.Fatal(err) | ||||||
| 	} | 	} | ||||||
| 	if len(msgs) != 40 { | 	if len(msgs) != 20 { | ||||||
| 		t.Fatalf("expected 40 messages, got %d", len(msgs)) | 		t.Fatalf("expected 20 messages, got %d", len(msgs)) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	nextNonce = 10 | 	nextNonce = 20 | ||||||
| 	for i := 0; i < 20; i++ { | 	for i := 0; i < 10; i++ { | ||||||
| 		if msgs[i].Message.From != a1 { | 		if msgs[i].Message.From != a1 { | ||||||
| 			t.Fatalf("expected message from actor a1") | 			t.Fatalf("expected message from actor a1") | ||||||
| 		} | 		} | ||||||
| @ -495,8 +495,8 @@ func TestBasicMessageSelection(t *testing.T) { | |||||||
| 		nextNonce++ | 		nextNonce++ | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	nextNonce = 10 | 	nextNonce = 20 | ||||||
| 	for i := 20; i < 40; i++ { | 	for i := 10; i < 20; i++ { | ||||||
| 		if msgs[i].Message.From != a2 { | 		if msgs[i].Message.From != a2 { | ||||||
| 			t.Fatalf("expected message from actor a2") | 			t.Fatalf("expected message from actor a2") | ||||||
| 		} | 		} | ||||||
| @ -531,7 +531,7 @@ func TestMessageSelectionTrimming(t *testing.T) { | |||||||
| 		t.Fatal(err) | 		t.Fatal(err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	block := mock.MkBlock(nil, 1, 1) | 	block := tma.nextBlock() | ||||||
| 	ts := mock.TipSet(block) | 	ts := mock.TipSet(block) | ||||||
| 	tma.applyBlock(t, block) | 	tma.applyBlock(t, block) | ||||||
| 
 | 
 | ||||||
| @ -594,7 +594,7 @@ func TestPriorityMessageSelection(t *testing.T) { | |||||||
| 		t.Fatal(err) | 		t.Fatal(err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	block := mock.MkBlock(nil, 1, 1) | 	block := tma.nextBlock() | ||||||
| 	ts := mock.TipSet(block) | 	ts := mock.TipSet(block) | ||||||
| 	tma.applyBlock(t, block) | 	tma.applyBlock(t, block) | ||||||
| 
 | 
 | ||||||
| @ -676,7 +676,7 @@ func TestOptimalMessageSelection1(t *testing.T) { | |||||||
| 		t.Fatal(err) | 		t.Fatal(err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	block := mock.MkBlock(nil, 1, 1) | 	block := tma.nextBlock() | ||||||
| 	ts := mock.TipSet(block) | 	ts := mock.TipSet(block) | ||||||
| 	tma.applyBlock(t, block) | 	tma.applyBlock(t, block) | ||||||
| 
 | 
 | ||||||
| @ -743,7 +743,7 @@ func TestOptimalMessageSelection2(t *testing.T) { | |||||||
| 		t.Fatal(err) | 		t.Fatal(err) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	block := mock.MkBlock(nil, 1, 1) | 	block := tma.nextBlock() | ||||||
| 	ts := mock.TipSet(block) | 	ts := mock.TipSet(block) | ||||||
| 	tma.applyBlock(t, block) | 	tma.applyBlock(t, block) | ||||||
| 
 | 
 | ||||||
| @ -821,7 +821,7 @@ func TestOptimalMessageSelection3(t *testing.T) { | |||||||
| 		wallets = append(wallets, w) | 		wallets = append(wallets, w) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	block := mock.MkBlock(nil, 1, 1) | 	block := tma.nextBlock() | ||||||
| 	ts := mock.TipSet(block) | 	ts := mock.TipSet(block) | ||||||
| 	tma.applyBlock(t, block) | 	tma.applyBlock(t, block) | ||||||
| 
 | 
 | ||||||
| @ -879,7 +879,6 @@ func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand, getPremium fu | |||||||
| 	// actors send with an randomly distributed premium dictated by the getPremium function.
 | 	// actors send with an randomly distributed premium dictated by the getPremium function.
 | ||||||
| 	// a number of miners select with varying ticket quality and we compare the
 | 	// a number of miners select with varying ticket quality and we compare the
 | ||||||
| 	// capacity and rewards of greedy selection -vs- optimal selection
 | 	// capacity and rewards of greedy selection -vs- optimal selection
 | ||||||
| 
 |  | ||||||
| 	mp, tma := makeTestMpool() | 	mp, tma := makeTestMpool() | ||||||
| 
 | 
 | ||||||
| 	nActors := 300 | 	nActors := 300 | ||||||
| @ -902,7 +901,7 @@ func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand, getPremium fu | |||||||
| 		wallets = append(wallets, w) | 		wallets = append(wallets, w) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	block := mock.MkBlock(nil, 1, 1) | 	block := tma.nextBlock() | ||||||
| 	ts := mock.TipSet(block) | 	ts := mock.TipSet(block) | ||||||
| 	tma.applyBlock(t, block) | 	tma.applyBlock(t, block) | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -483,6 +483,10 @@ func (cs *ChainStore) NearestCommonAncestor(a, b *types.TipSet) (*types.TipSet, | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.TipSet, error) { | func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.TipSet, error) { | ||||||
|  | 	return ReorgOps(cs.LoadTipSet, a, b) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func ReorgOps(lts func(types.TipSetKey) (*types.TipSet, error), a, b *types.TipSet) ([]*types.TipSet, []*types.TipSet, error) { | ||||||
| 	left := a | 	left := a | ||||||
| 	right := b | 	right := b | ||||||
| 
 | 
 | ||||||
| @ -490,7 +494,7 @@ func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.Ti | |||||||
| 	for !left.Equals(right) { | 	for !left.Equals(right) { | ||||||
| 		if left.Height() > right.Height() { | 		if left.Height() > right.Height() { | ||||||
| 			leftChain = append(leftChain, left) | 			leftChain = append(leftChain, left) | ||||||
| 			par, err := cs.LoadTipSet(left.Parents()) | 			par, err := lts(left.Parents()) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				return nil, nil, err | 				return nil, nil, err | ||||||
| 			} | 			} | ||||||
| @ -498,7 +502,7 @@ func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.Ti | |||||||
| 			left = par | 			left = par | ||||||
| 		} else { | 		} else { | ||||||
| 			rightChain = append(rightChain, right) | 			rightChain = append(rightChain, right) | ||||||
| 			par, err := cs.LoadTipSet(right.Parents()) | 			par, err := lts(right.Parents()) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				log.Infof("failed to fetch right.Parents: %s", err) | 				log.Infof("failed to fetch right.Parents: %s", err) | ||||||
| 				return nil, nil, err | 				return nil, nil, err | ||||||
| @ -509,6 +513,7 @@ func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.Ti | |||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return leftChain, rightChain, nil | 	return leftChain, rightChain, nil | ||||||
|  | 
 | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // GetHeaviestTipSet returns the current heaviest tipset known (i.e. our head).
 | // GetHeaviestTipSet returns the current heaviest tipset known (i.e. our head).
 | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user