diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 5816bae28..b421f7739 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -27,6 +27,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/vm" "github.com/filecoin-project/lotus/lib/sigs" @@ -317,8 +318,9 @@ func (mp *MessagePool) checkMessage(m *types.SignedMessage) error { return xerrors.Errorf("mpool message too large (%dB): %w", m.Size(), ErrMessageTooBig) } + // Perform syntaxtic validation, minGas=0 as we check if correctly in select messages if err := m.Message.ValidForBlockInclusion(0); err != nil { - return xerrors.Errorf("message not valid for block inclusion: %d", err) + return xerrors.Errorf("message not valid for block inclusion: %w", err) } if m.Message.To == address.Undef { @@ -503,42 +505,16 @@ func (mp *MessagePool) getNonceLocked(addr address.Address, curTs *types.TipSet) } func (mp *MessagePool) getStateNonce(addr address.Address, curTs *types.TipSet) (uint64, error) { - // TODO: this method probably should be cached - - act, err := mp.api.StateGetActor(addr, curTs) + act, err := mp.api.GetActorAfter(addr, curTs) if err != nil { return 0, err } - baseNonce := act.Nonce - - // TODO: the correct thing to do here is probably to set curTs to chain.head - // but since we have an accurate view of the world until a head change occurs, - // this should be fine - if curTs == nil { - return baseNonce, nil - } - - msgs, err := mp.api.MessagesForTipset(curTs) - if err != nil { - return 0, xerrors.Errorf("failed to check messages for tipset: %w", err) - } - - for _, m := range msgs { - msg := m.VMMessage() - if msg.From == addr { - if msg.Nonce != baseNonce { - return 0, xerrors.Errorf("tipset %s has bad nonce ordering (%d != %d)", curTs.Cids(), msg.Nonce, baseNonce) - } - baseNonce++ - } - } - - return baseNonce, nil + return act.Nonce, nil } func (mp *MessagePool) getStateBalance(addr address.Address, ts *types.TipSet) (types.BigInt, error) { - act, err := mp.api.StateGetActor(addr, ts) + act, err := mp.api.GetActorAfter(addr, ts) if err != nil { return types.EmptyInt, err } @@ -830,7 +806,8 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) } for a, bkt := range buckets { - act, err := mp.api.StateGetActor(a, ts) + // TODO that might not be correct with GatActorAfter but it is only debug code + act, err := mp.api.GetActorAfter(a, ts) if err != nil { log.Debugf("%s, err: %s\n", a, err) continue @@ -880,6 +857,73 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) return merr } +func (mp *MessagePool) runHeadChange(from *types.TipSet, to *types.TipSet, rmsgs map[address.Address]map[uint64]*types.SignedMessage) error { + add := func(m *types.SignedMessage) { + s, ok := rmsgs[m.Message.From] + if !ok { + s = make(map[uint64]*types.SignedMessage) + rmsgs[m.Message.From] = s + } + s[m.Message.Nonce] = m + } + rm := func(from address.Address, nonce uint64) { + s, ok := rmsgs[from] + if !ok { + return + } + + if _, ok := s[nonce]; ok { + delete(s, nonce) + return + } + + } + + revert, apply, err := store.ReorgOps(mp.api.LoadTipSet, from, to) + if err != nil { + return xerrors.Errorf("failed to compute reorg ops for mpool pending messages: %w", err) + } + + var merr error + + for _, ts := range revert { + msgs, err := mp.MessagesForBlocks(ts.Blocks()) + if err != nil { + log.Errorf("error retrieving messages for reverted block: %s", err) + merr = multierror.Append(merr, err) + continue + } + + for _, msg := range msgs { + add(msg) + } + } + + for _, ts := range apply { + mp.curTs = ts + + for _, b := range ts.Blocks() { + bmsgs, smsgs, err := mp.api.MessagesForBlock(b) + if err != nil { + xerr := xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err) + log.Errorf("error retrieving messages for block: %s", xerr) + merr = multierror.Append(merr, xerr) + continue + } + + for _, msg := range smsgs { + rm(msg.Message.From, msg.Message.Nonce) + } + + for _, msg := range bmsgs { + rm(msg.From, msg.Nonce) + } + } + } + + return merr +} + type statBucket struct { msgs map[uint64]*types.SignedMessage } diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go index 35e21f817..baf734a92 100644 --- a/chain/messagepool/messagepool_test.go +++ b/chain/messagepool/messagepool_test.go @@ -3,6 +3,7 @@ package messagepool import ( "context" "fmt" + "sort" "testing" "github.com/filecoin-project/go-address" @@ -33,11 +34,20 @@ type testMpoolAPI struct { } func newTestMpoolAPI() *testMpoolAPI { - return &testMpoolAPI{ + tma := &testMpoolAPI{ bmsgs: make(map[cid.Cid][]*types.SignedMessage), statenonce: make(map[address.Address]uint64), balance: make(map[address.Address]types.BigInt), } + genesis := mock.MkBlock(nil, 1, 1) + tma.tipsets = append(tma.tipsets, mock.TipSet(genesis)) + return tma +} + +func (tma *testMpoolAPI) nextBlock() *types.BlockHeader { + newBlk := mock.MkBlock(tma.tipsets[len(tma.tipsets)-1], 1, 1) + tma.tipsets = append(tma.tipsets, mock.TipSet(newBlk)) + return newBlk } func (tma *testMpoolAPI) applyBlock(t *testing.T, b *types.BlockHeader) { @@ -68,12 +78,11 @@ func (tma *testMpoolAPI) setBalanceRaw(addr address.Address, v types.BigInt) { func (tma *testMpoolAPI) setBlockMessages(h *types.BlockHeader, msgs ...*types.SignedMessage) { tma.bmsgs[h.Cid()] = msgs - tma.tipsets = append(tma.tipsets, mock.TipSet(h)) } func (tma *testMpoolAPI) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet { tma.cb = cb - return nil + return tma.tipsets[0] } func (tma *testMpoolAPI) PutMessage(m types.ChainMsg) (cid.Cid, error) { @@ -84,15 +93,38 @@ func (tma *testMpoolAPI) PubSubPublish(string, []byte) error { return nil } -func (tma *testMpoolAPI) StateGetActor(addr address.Address, ts *types.TipSet) (*types.Actor, error) { +func (tma *testMpoolAPI) GetActorAfter(addr address.Address, ts *types.TipSet) (*types.Actor, error) { balance, ok := tma.balance[addr] if !ok { balance = types.NewInt(1000e6) tma.balance[addr] = balance } + + msgs := make([]*types.SignedMessage, 0) + for _, b := range ts.Blocks() { + for _, m := range tma.bmsgs[b.Cid()] { + if m.Message.From == addr { + msgs = append(msgs, m) + } + } + } + + sort.Slice(msgs, func(i, j int) bool { + return msgs[i].Message.Nonce < msgs[j].Message.Nonce + }) + + nonce := tma.statenonce[addr] + + for _, m := range msgs { + if m.Message.Nonce != nonce { + break + } + nonce++ + } + return &types.Actor{ Code: builtin.StorageMarketActorCodeID, - Nonce: tma.statenonce[addr], + Nonce: nonce, Balance: balance, }, nil } @@ -178,7 +210,7 @@ func TestMessagePool(t *testing.T) { t.Fatal(err) } - a := mock.MkBlock(nil, 1, 1) + a := tma.nextBlock() sender, err := w.GenerateKey(crypto.SigTypeBLS) if err != nil { @@ -204,6 +236,50 @@ func TestMessagePool(t *testing.T) { assertNonce(t, mp, sender, 2) } +func TestMessagePoolMessagesInEachBlock(t *testing.T) { + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + ds := datastore.NewMapDatastore() + + mp, err := New(tma, ds, "mptest") + if err != nil { + t.Fatal(err) + } + + a := tma.nextBlock() + + sender, err := w.GenerateKey(crypto.SigTypeBLS) + if err != nil { + t.Fatal(err) + } + target := mock.Address(1001) + + var msgs []*types.SignedMessage + for i := 0; i < 5; i++ { + m := mock.MkMessage(sender, target, uint64(i), w) + msgs = append(msgs, m) + mustAdd(t, mp, m) + } + + tma.setStateNonce(sender, 0) + + tma.setBlockMessages(a, msgs[0], msgs[1]) + tma.applyBlock(t, a) + tsa := mock.TipSet(a) + + _, _ = mp.Pending() + + selm, _ := mp.SelectMessages(tsa, 1) + if len(selm) == 0 { + t.Fatal("should have returned the rest of the messages") + } +} + func TestRevertMessages(t *testing.T) { tma := newTestMpoolAPI() @@ -219,8 +295,8 @@ func TestRevertMessages(t *testing.T) { t.Fatal(err) } - a := mock.MkBlock(nil, 1, 1) - b := mock.MkBlock(mock.TipSet(a), 1, 1) + a := tma.nextBlock() + b := tma.nextBlock() sender, err := w.GenerateKey(crypto.SigTypeBLS) if err != nil { @@ -254,6 +330,7 @@ func TestRevertMessages(t *testing.T) { assertNonce(t, mp, sender, 4) p, _ := mp.Pending() + fmt.Printf("%+v\n", p) if len(p) != 3 { t.Fatal("expected three messages in mempool") } @@ -275,7 +352,7 @@ func TestPruningSimple(t *testing.T) { t.Fatal(err) } - a := mock.MkBlock(nil, 1, 1) + a := tma.nextBlock() tma.applyBlock(t, a) sender, err := w.GenerateKey(crypto.SigTypeBLS) diff --git a/chain/messagepool/provider.go b/chain/messagepool/provider.go index fa8b8ea83..80b9a4297 100644 --- a/chain/messagepool/provider.go +++ b/chain/messagepool/provider.go @@ -16,7 +16,7 @@ type Provider interface { SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet PutMessage(m types.ChainMsg) (cid.Cid, error) PubSubPublish(string, []byte) error - StateGetActor(address.Address, *types.TipSet) (*types.Actor, error) + GetActorAfter(address.Address, *types.TipSet) (*types.Actor, error) StateAccountKey(context.Context, address.Address, *types.TipSet) (address.Address, error) MessagesForBlock(*types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) MessagesForTipset(*types.TipSet) ([]types.ChainMsg, error) @@ -46,9 +46,14 @@ func (mpp *mpoolProvider) PubSubPublish(k string, v []byte) error { return mpp.ps.Publish(k, v) //nolint } -func (mpp *mpoolProvider) StateGetActor(addr address.Address, ts *types.TipSet) (*types.Actor, error) { +func (mpp *mpoolProvider) GetActorAfter(addr address.Address, ts *types.TipSet) (*types.Actor, error) { var act types.Actor - return &act, mpp.sm.WithParentState(ts, mpp.sm.WithActor(addr, stmgr.GetActor(&act))) + stcid, _, err := mpp.sm.TipSetState(context.TODO(), ts) + if err != nil { + return nil, xerrors.Errorf("computing tipset state for GetActor: %w", err) + } + + return &act, mpp.sm.WithStateTree(stcid, mpp.sm.WithActor(addr, stmgr.GetActor(&act))) } func (mpp *mpoolProvider) StateAccountKey(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) { diff --git a/chain/messagepool/selection.go b/chain/messagepool/selection.go index aeb80d53d..b39eb01cb 100644 --- a/chain/messagepool/selection.go +++ b/chain/messagepool/selection.go @@ -14,7 +14,6 @@ import ( "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/vm" abig "github.com/filecoin-project/specs-actors/actors/abi/big" - "github.com/ipfs/go-cid" ) var bigBlockGasLimit = big.NewInt(build.BlockGasLimit) @@ -528,7 +527,6 @@ func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address. start := time.Now() result := make(map[address.Address]map[uint64]*types.SignedMessage) - haveCids := make(map[cid.Cid]struct{}) defer func() { if dt := time.Since(start); dt > time.Millisecond { log.Infow("get pending messages done", "took", dt) @@ -554,10 +552,6 @@ func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address. } result[a] = msetCopy - // mark the messages as seen - for _, m := range mset.msgs { - haveCids[m.Cid()] = struct{}{} - } } } @@ -566,72 +560,11 @@ func (mp *MessagePool) getPendingMessages(curTs, ts *types.TipSet) (map[address. return result, nil } - // nope, we need to sync the tipsets - for { - if curTs.Height() == ts.Height() { - if curTs.Equals(ts) { - return result, nil - } - - // different blocks in tipsets -- we mark them as seen so that they are not included in - // in the message set we return, but *neither me (vyzo) nor why understand why* - // this code is also probably completely untested in production, so I am adding a big fat - // warning to revisit this case and sanity check this decision. - log.Warnf("mpool tipset has same height as target tipset but it's not equal; beware of dragons!") - - have, err := mp.MessagesForBlocks(ts.Blocks()) - if err != nil { - return nil, xerrors.Errorf("error retrieving messages for tipset: %w", err) - } - - for _, m := range have { - haveCids[m.Cid()] = struct{}{} - } - } - - msgs, err := mp.MessagesForBlocks(ts.Blocks()) - if err != nil { - return nil, xerrors.Errorf("error retrieving messages for tipset: %w", err) - } - - for _, m := range msgs { - if _, have := haveCids[m.Cid()]; have { - continue - } - - haveCids[m.Cid()] = struct{}{} - mset, ok := result[m.Message.From] - if !ok { - mset = make(map[uint64]*types.SignedMessage) - result[m.Message.From] = mset - } - - other, dupNonce := mset[m.Message.Nonce] - if dupNonce { - // duplicate nonce, selfishly keep the message with the highest GasPrice - // if the gas prices are the same, keep the one with the highest GasLimit - switch m.Message.GasPremium.Int.Cmp(other.Message.GasPremium.Int) { - case 0: - if m.Message.GasLimit > other.Message.GasLimit { - mset[m.Message.Nonce] = m - } - case 1: - mset[m.Message.Nonce] = m - } - } else { - mset[m.Message.Nonce] = m - } - } - - if curTs.Height() >= ts.Height() { - return result, nil - } - - ts, err = mp.api.LoadTipSet(ts.Parents()) - if err != nil { - return nil, xerrors.Errorf("error loading parent tipset: %w", err) - } + if err := mp.runHeadChange(curTs, ts, result); err != nil { + return nil, xerrors.Errorf("failed to process difference between mpool head and given head: %w", err) } + + return result, nil } func (mp *MessagePool) getGasReward(msg *types.SignedMessage, baseFee types.BigInt, ts *types.TipSet) *big.Int { @@ -671,7 +604,12 @@ func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint6 // cannot exceed the block limit; drop all messages that exceed the limit // - the total gasReward cannot exceed the actor's balance; drop all messages that exceed // the balance - a, _ := mp.api.StateGetActor(actor, ts) + a, err := mp.api.GetActorAfter(actor, ts) + if err != nil { + log.Errorf("failed to load actor state, not building chain for %s: %w", actor, err) + return nil + } + curNonce := a.Nonce balance := a.Balance.Int gasLimit := int64(0) diff --git a/chain/messagepool/selection_test.go b/chain/messagepool/selection_test.go index 0032db23c..2e7692213 100644 --- a/chain/messagepool/selection_test.go +++ b/chain/messagepool/selection_test.go @@ -79,7 +79,7 @@ func TestMessageChains(t *testing.T) { t.Fatal(err) } - block := mock.MkBlock(nil, 1, 1) + block := tma.nextBlock() ts := mock.TipSet(block) gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}] @@ -317,7 +317,7 @@ func TestMessageChainSkipping(t *testing.T) { t.Fatal(err) } - block := mock.MkBlock(nil, 1, 1) + block := tma.nextBlock() ts := mock.TipSet(block) gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}] @@ -387,7 +387,7 @@ func TestBasicMessageSelection(t *testing.T) { t.Fatal(err) } - block := mock.MkBlock(nil, 1, 1) + block := tma.nextBlock() ts := mock.TipSet(block) tma.applyBlock(t, block) @@ -440,12 +440,12 @@ func TestBasicMessageSelection(t *testing.T) { } // now we make a block with all the messages and advance the chain - block2 := mock.MkBlock(ts, 2, 2) + block2 := tma.nextBlock() tma.setBlockMessages(block2, msgs...) tma.applyBlock(t, block2) // we should have no pending messages in the mpool - pend, ts2 := mp.Pending() + pend, _ := mp.Pending() if len(pend) != 0 { t.Fatalf("expected no pending messages, but got %d", len(pend)) } @@ -458,13 +458,13 @@ func TestBasicMessageSelection(t *testing.T) { m = makeTestMessage(w2, a2, a1, uint64(i), gasLimit, uint64(i+1)) msgs = append(msgs, m) } - block3 := mock.MkBlock(ts2, 3, 3) + block3 := tma.nextBlock() tma.setBlockMessages(block3, msgs...) ts3 := mock.TipSet(block3) // now create another set of messages and add them to the mpool for i := 20; i < 30; i++ { - m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(2*i+1)) + m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(2*i+200)) mustAdd(t, mp, m) m = makeTestMessage(w2, a2, a1, uint64(i), gasLimit, uint64(i+1)) mustAdd(t, mp, m) @@ -480,12 +480,12 @@ func TestBasicMessageSelection(t *testing.T) { if err != nil { t.Fatal(err) } - if len(msgs) != 40 { - t.Fatalf("expected 40 messages, got %d", len(msgs)) + if len(msgs) != 20 { + t.Fatalf("expected 20 messages, got %d", len(msgs)) } - nextNonce = 10 - for i := 0; i < 20; i++ { + nextNonce = 20 + for i := 0; i < 10; i++ { if msgs[i].Message.From != a1 { t.Fatalf("expected message from actor a1") } @@ -495,8 +495,8 @@ func TestBasicMessageSelection(t *testing.T) { nextNonce++ } - nextNonce = 10 - for i := 20; i < 40; i++ { + nextNonce = 20 + for i := 10; i < 20; i++ { if msgs[i].Message.From != a2 { t.Fatalf("expected message from actor a2") } @@ -531,7 +531,7 @@ func TestMessageSelectionTrimming(t *testing.T) { t.Fatal(err) } - block := mock.MkBlock(nil, 1, 1) + block := tma.nextBlock() ts := mock.TipSet(block) tma.applyBlock(t, block) @@ -594,7 +594,7 @@ func TestPriorityMessageSelection(t *testing.T) { t.Fatal(err) } - block := mock.MkBlock(nil, 1, 1) + block := tma.nextBlock() ts := mock.TipSet(block) tma.applyBlock(t, block) @@ -676,7 +676,7 @@ func TestOptimalMessageSelection1(t *testing.T) { t.Fatal(err) } - block := mock.MkBlock(nil, 1, 1) + block := tma.nextBlock() ts := mock.TipSet(block) tma.applyBlock(t, block) @@ -743,7 +743,7 @@ func TestOptimalMessageSelection2(t *testing.T) { t.Fatal(err) } - block := mock.MkBlock(nil, 1, 1) + block := tma.nextBlock() ts := mock.TipSet(block) tma.applyBlock(t, block) @@ -821,7 +821,7 @@ func TestOptimalMessageSelection3(t *testing.T) { wallets = append(wallets, w) } - block := mock.MkBlock(nil, 1, 1) + block := tma.nextBlock() ts := mock.TipSet(block) tma.applyBlock(t, block) @@ -879,7 +879,6 @@ func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand, getPremium fu // actors send with an randomly distributed premium dictated by the getPremium function. // a number of miners select with varying ticket quality and we compare the // capacity and rewards of greedy selection -vs- optimal selection - mp, tma := makeTestMpool() nActors := 300 @@ -902,7 +901,7 @@ func testCompetitiveMessageSelection(t *testing.T, rng *rand.Rand, getPremium fu wallets = append(wallets, w) } - block := mock.MkBlock(nil, 1, 1) + block := tma.nextBlock() ts := mock.TipSet(block) tma.applyBlock(t, block) diff --git a/chain/store/store.go b/chain/store/store.go index 139444f67..b71e7d4df 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -483,6 +483,10 @@ func (cs *ChainStore) NearestCommonAncestor(a, b *types.TipSet) (*types.TipSet, } func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.TipSet, error) { + return ReorgOps(cs.LoadTipSet, a, b) +} + +func ReorgOps(lts func(types.TipSetKey) (*types.TipSet, error), a, b *types.TipSet) ([]*types.TipSet, []*types.TipSet, error) { left := a right := b @@ -490,7 +494,7 @@ func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.Ti for !left.Equals(right) { if left.Height() > right.Height() { leftChain = append(leftChain, left) - par, err := cs.LoadTipSet(left.Parents()) + par, err := lts(left.Parents()) if err != nil { return nil, nil, err } @@ -498,7 +502,7 @@ func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.Ti left = par } else { rightChain = append(rightChain, right) - par, err := cs.LoadTipSet(right.Parents()) + par, err := lts(right.Parents()) if err != nil { log.Infof("failed to fetch right.Parents: %s", err) return nil, nil, err @@ -509,6 +513,7 @@ func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.Ti } return leftChain, rightChain, nil + } // GetHeaviestTipSet returns the current heaviest tipset known (i.e. our head).