diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 3b50713ed..0670ed92b 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -38,7 +38,7 @@ import ( var log = logging.Logger("messagepool") -const futureDebug = false +var futureDebug = false var rbfNumBig = types.NewInt(uint64((ReplaceByFeeRatioDefault - 1) * RbfDenom)) var rbfDenomBig = types.NewInt(RbfDenom) @@ -651,6 +651,10 @@ func (mp *MessagePool) Pending() ([]*types.SignedMessage, *types.TipSet) { mp.lk.Lock() defer mp.lk.Unlock() + return mp.allPending() +} + +func (mp *MessagePool) allPending() ([]*types.SignedMessage, *types.TipSet) { out := make([]*types.SignedMessage, 0) for a := range mp.pending { out = append(out, mp.pendingFor(a)...) @@ -658,6 +662,7 @@ func (mp *MessagePool) Pending() ([]*types.SignedMessage, *types.TipSet) { return out, mp.curTs } + func (mp *MessagePool) PendingFor(a address.Address) ([]*types.SignedMessage, *types.TipSet) { mp.curTsLk.Lock() defer mp.curTsLk.Unlock() @@ -790,7 +795,9 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) } if len(revert) > 0 && futureDebug { - msgs, ts := mp.Pending() + mp.lk.Lock() + msgs, ts := mp.allPending() + mp.lk.Unlock() buckets := map[address.Address]*statBucket{} @@ -901,8 +908,6 @@ func (mp *MessagePool) runHeadChange(from *types.TipSet, to *types.TipSet, rmsgs } for _, ts := range apply { - mp.curTs = ts - for _, b := range ts.Blocks() { bmsgs, smsgs, err := mp.api.MessagesForBlock(b) if err != nil { @@ -975,6 +980,7 @@ func (mp *MessagePool) Updates(ctx context.Context) (<-chan api.MpoolUpdate, err go func() { defer mp.changes.Unsub(sub, localUpdates) + defer close(out) for { select { @@ -983,9 +989,13 @@ func (mp *MessagePool) Updates(ctx context.Context) (<-chan api.MpoolUpdate, err case out <- u.(api.MpoolUpdate): case <-ctx.Done(): return + case <-mp.closer: + return } case <-ctx.Done(): return + case <-mp.closer: + return } } }() diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go index baf734a92..316c86d87 100644 --- a/chain/messagepool/messagepool_test.go +++ b/chain/messagepool/messagepool_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/messagepool/gasguess" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types/mock" "github.com/filecoin-project/lotus/chain/wallet" @@ -31,6 +32,8 @@ type testMpoolAPI struct { balance map[address.Address]types.BigInt tipsets []*types.TipSet + + published int } func newTestMpoolAPI() *testMpoolAPI { @@ -90,10 +93,16 @@ func (tma *testMpoolAPI) PutMessage(m types.ChainMsg) (cid.Cid, error) { } func (tma *testMpoolAPI) PubSubPublish(string, []byte) error { + tma.published++ return nil } func (tma *testMpoolAPI) GetActorAfter(addr address.Address, ts *types.TipSet) (*types.Actor, error) { + // regression check for load bug + if ts == nil { + panic("GetActorAfter called with nil tipset") + } + balance, ok := tma.balance[addr] if !ok { balance = types.NewInt(1000e6) @@ -281,6 +290,11 @@ func TestMessagePoolMessagesInEachBlock(t *testing.T) { } func TestRevertMessages(t *testing.T) { + futureDebug = true + defer func() { + futureDebug = false + }() + tma := newTestMpoolAPI() w, err := wallet.NewWallet(wallet.NewMemKeyStore()) @@ -385,3 +399,246 @@ func TestPruningSimple(t *testing.T) { t.Fatal("expected only 5 messages in pool, got: ", len(msgs)) } } + +func TestLoadLocal(t *testing.T) { + tma := newTestMpoolAPI() + ds := datastore.NewMapDatastore() + + mp, err := New(tma, ds, "mptest") + if err != nil { + t.Fatal(err) + } + + // the actors + w1, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + w2, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}] + msgs := make(map[cid.Cid]struct{}) + for i := 0; i < 10; i++ { + m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1)) + cid, err := mp.Push(m) + if err != nil { + t.Fatal(err) + } + msgs[cid] = struct{}{} + } + err = mp.Close() + if err != nil { + t.Fatal(err) + } + + mp, err = New(tma, ds, "mptest") + if err != nil { + t.Fatal(err) + } + + pmsgs, _ := mp.Pending() + if len(msgs) != len(pmsgs) { + t.Fatalf("expected %d messages, but got %d", len(msgs), len(pmsgs)) + } + + for _, m := range pmsgs { + cid := m.Cid() + _, ok := msgs[cid] + if !ok { + t.Fatal("unknown message") + } + + delete(msgs, cid) + } + + if len(msgs) > 0 { + t.Fatalf("not all messages were laoded; missing %d messages", len(msgs)) + } +} + +func TestClearAll(t *testing.T) { + tma := newTestMpoolAPI() + ds := datastore.NewMapDatastore() + + mp, err := New(tma, ds, "mptest") + if err != nil { + t.Fatal(err) + } + + // the actors + w1, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + w2, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}] + for i := 0; i < 10; i++ { + m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1)) + _, err := mp.Push(m) + if err != nil { + t.Fatal(err) + } + } + + for i := 0; i < 10; i++ { + m := makeTestMessage(w2, a2, a1, uint64(i), gasLimit, uint64(i+1)) + mustAdd(t, mp, m) + } + + mp.Clear(true) + + pending, _ := mp.Pending() + if len(pending) > 0 { + t.Fatalf("cleared the mpool, but got %d pending messages", len(pending)) + } +} + +func TestClearNonLocal(t *testing.T) { + tma := newTestMpoolAPI() + ds := datastore.NewMapDatastore() + + mp, err := New(tma, ds, "mptest") + if err != nil { + t.Fatal(err) + } + + // the actors + w1, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + w2, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}] + for i := 0; i < 10; i++ { + m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1)) + _, err := mp.Push(m) + if err != nil { + t.Fatal(err) + } + } + + for i := 0; i < 10; i++ { + m := makeTestMessage(w2, a2, a1, uint64(i), gasLimit, uint64(i+1)) + mustAdd(t, mp, m) + } + + mp.Clear(false) + + pending, _ := mp.Pending() + if len(pending) != 10 { + t.Fatalf("expected 10 pending messages, but got %d instead", len(pending)) + } + + for _, m := range pending { + if m.Message.From != a1 { + t.Fatalf("expected message from %s but got one from %s instead", a1, m.Message.From) + } + } +} + +func TestUpdates(t *testing.T) { + tma := newTestMpoolAPI() + ds := datastore.NewMapDatastore() + + mp, err := New(tma, ds, "mptest") + if err != nil { + t.Fatal(err) + } + + // the actors + w1, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + w2, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + ch, err := mp.Updates(ctx) + if err != nil { + t.Fatal(err) + } + + gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}] + for i := 0; i < 10; i++ { + m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1)) + _, err := mp.Push(m) + if err != nil { + t.Fatal(err) + } + + _, ok := <-ch + if !ok { + t.Fatal("expected update, but got a closed channel instead") + } + } + + err = mp.Close() + if err != nil { + t.Fatal(err) + } + + _, ok := <-ch + if ok { + t.Fatal("expected closed channel, but got an update instead") + } +} diff --git a/chain/messagepool/repub_test.go b/chain/messagepool/repub_test.go new file mode 100644 index 000000000..28a69c92a --- /dev/null +++ b/chain/messagepool/repub_test.go @@ -0,0 +1,66 @@ +package messagepool + +import ( + "testing" + "time" + + "github.com/filecoin-project/lotus/chain/messagepool/gasguess" + "github.com/filecoin-project/lotus/chain/wallet" + "github.com/filecoin-project/specs-actors/actors/builtin" + "github.com/filecoin-project/specs-actors/actors/crypto" + "github.com/ipfs/go-datastore" +) + +func TestRepubMessages(t *testing.T) { + tma := newTestMpoolAPI() + ds := datastore.NewMapDatastore() + + mp, err := New(tma, ds, "mptest") + if err != nil { + t.Fatal(err) + } + + // the actors + w1, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + w2, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}] + + tma.setBalance(a1, 1) // in FIL + + for i := 0; i < 10; i++ { + m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1)) + _, err := mp.Push(m) + if err != nil { + t.Fatal(err) + } + } + + if tma.published != 10 { + t.Fatalf("expected to have published 10 messages, but got %d instead", tma.published) + } + + mp.repubTrigger <- struct{}{} + time.Sleep(100 * time.Millisecond) + + if tma.published != 20 { + t.Fatalf("expected to have published 20 messages, but got %d instead", tma.published) + } +} diff --git a/chain/messagepool/selection_test.go b/chain/messagepool/selection_test.go index 2e7692213..2d17f7115 100644 --- a/chain/messagepool/selection_test.go +++ b/chain/messagepool/selection_test.go @@ -649,6 +649,73 @@ func TestPriorityMessageSelection(t *testing.T) { } } +func TestPriorityMessageSelection2(t *testing.T) { + mp, tma := makeTestMpool() + + // the actors + w1, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + w2, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + block := tma.nextBlock() + ts := mock.TipSet(block) + tma.applyBlock(t, block) + + gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}] + + tma.setBalance(a1, 1) // in FIL + tma.setBalance(a2, 1) // in FIL + + mp.cfg.PriorityAddrs = []address.Address{a1} + + nMessages := int(2 * build.BlockGasLimit / gasLimit) + for i := 0; i < nMessages; i++ { + bias := (nMessages - i) / 3 + m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(1+i%3+bias)) + mustAdd(t, mp, m) + m = makeTestMessage(w2, a2, a1, uint64(i), gasLimit, uint64(1+i%3+bias)) + mustAdd(t, mp, m) + } + + msgs, err := mp.SelectMessages(ts, 1.0) + if err != nil { + t.Fatal(err) + } + + expectedMsgs := int(build.BlockGasLimit / gasLimit) + if len(msgs) != expectedMsgs { + t.Fatalf("expected %d messages but got %d", expectedMsgs, len(msgs)) + } + + // all messages must be from a1 + nextNonce := uint64(0) + for _, m := range msgs { + if m.Message.From != a1 { + t.Fatal("expected messages from a1 before messages from a2") + } + if m.Message.Nonce != nextNonce { + t.Fatalf("expected nonce %d but got %d", nextNonce, m.Message.Nonce) + } + nextNonce++ + } +} + func TestOptimalMessageSelection1(t *testing.T) { // this test uses just a single actor sending messages with a low tq // the chain depenent merging algorithm should pick messages from the actor