From a01fa11a26133a82c13e9d9f5e0a5defcfe07e79 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 25 Aug 2020 13:03:50 +0300 Subject: [PATCH 1/8] exercise debug code, fix deadlock --- chain/messagepool/messagepool.go | 11 +++++++++-- chain/messagepool/messagepool_test.go | 5 +++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 3b50713ed..c7dd03740 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -38,7 +38,7 @@ import ( var log = logging.Logger("messagepool") -const futureDebug = false +var futureDebug = false var rbfNumBig = types.NewInt(uint64((ReplaceByFeeRatioDefault - 1) * RbfDenom)) var rbfDenomBig = types.NewInt(RbfDenom) @@ -651,6 +651,10 @@ func (mp *MessagePool) Pending() ([]*types.SignedMessage, *types.TipSet) { mp.lk.Lock() defer mp.lk.Unlock() + return mp.allPending() +} + +func (mp *MessagePool) allPending() ([]*types.SignedMessage, *types.TipSet) { out := make([]*types.SignedMessage, 0) for a := range mp.pending { out = append(out, mp.pendingFor(a)...) @@ -658,6 +662,7 @@ func (mp *MessagePool) Pending() ([]*types.SignedMessage, *types.TipSet) { return out, mp.curTs } + func (mp *MessagePool) PendingFor(a address.Address) ([]*types.SignedMessage, *types.TipSet) { mp.curTsLk.Lock() defer mp.curTsLk.Unlock() @@ -790,7 +795,9 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) } if len(revert) > 0 && futureDebug { - msgs, ts := mp.Pending() + mp.lk.Lock() + msgs, ts := mp.allPending() + mp.lk.Unlock() buckets := map[address.Address]*statBucket{} diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go index baf734a92..cee3304d9 100644 --- a/chain/messagepool/messagepool_test.go +++ b/chain/messagepool/messagepool_test.go @@ -281,6 +281,11 @@ func TestMessagePoolMessagesInEachBlock(t *testing.T) { } func TestRevertMessages(t *testing.T) { + futureDebug = true + defer func() { + futureDebug = false + }() + tma := newTestMpoolAPI() w, err := wallet.NewWallet(wallet.NewMemKeyStore()) From 0e6d34c0c98362e2c8b8a55b41d2983f96ea8ef3 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 25 Aug 2020 13:16:55 +0300 Subject: [PATCH 2/8] add test for loadLocal --- chain/messagepool/messagepool_test.go | 73 +++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go index cee3304d9..f4b4ce4db 100644 --- a/chain/messagepool/messagepool_test.go +++ b/chain/messagepool/messagepool_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/messagepool/gasguess" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types/mock" "github.com/filecoin-project/lotus/chain/wallet" @@ -94,6 +95,11 @@ func (tma *testMpoolAPI) PubSubPublish(string, []byte) error { } func (tma *testMpoolAPI) GetActorAfter(addr address.Address, ts *types.TipSet) (*types.Actor, error) { + // regression check for load bug + if ts == nil { + panic("GetActorAfter called with nil tipset") + } + balance, ok := tma.balance[addr] if !ok { balance = types.NewInt(1000e6) @@ -390,3 +396,70 @@ func TestPruningSimple(t *testing.T) { t.Fatal("expected only 5 messages in pool, got: ", len(msgs)) } } + +func TestLoadLocal(t *testing.T) { + tma := newTestMpoolAPI() + ds := datastore.NewMapDatastore() + + mp, err := New(tma, ds, "mptest") + if err != nil { + t.Fatal(err) + } + + // the actors + w1, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + w2, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}] + msgs := make(map[cid.Cid]struct{}) + for i := 0; i < 10; i++ { + m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1)) + cid, err := mp.Push(m) + if err != nil { + t.Fatal(err) + } + msgs[cid] = struct{}{} + } + mp.Close() + + mp, err = New(tma, ds, "mptest") + if err != nil { + t.Fatal(err) + } + + pmsgs, _ := mp.Pending() + if len(msgs) != len(pmsgs) { + t.Fatalf("expected %d messages, but got %d", len(msgs), len(pmsgs)) + } + + for _, m := range pmsgs { + cid := m.Cid() + _, ok := msgs[cid] + if !ok { + t.Fatal("unknown message") + } + + delete(msgs, cid) + } + + if len(msgs) > 0 { + t.Fatalf("not all messages were laoded; missing %d messages", len(msgs)) + } +} From 7049b6478c4f6f5dbd045599cea058671f151345 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 25 Aug 2020 13:24:11 +0300 Subject: [PATCH 3/8] add test for Clear --- chain/messagepool/messagepool_test.go | 110 ++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go index f4b4ce4db..aadb4cd8e 100644 --- a/chain/messagepool/messagepool_test.go +++ b/chain/messagepool/messagepool_test.go @@ -463,3 +463,113 @@ func TestLoadLocal(t *testing.T) { t.Fatalf("not all messages were laoded; missing %d messages", len(msgs)) } } + +func TestClearAll(t *testing.T) { + tma := newTestMpoolAPI() + ds := datastore.NewMapDatastore() + + mp, err := New(tma, ds, "mptest") + if err != nil { + t.Fatal(err) + } + + // the actors + w1, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + w2, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}] + for i := 0; i < 10; i++ { + m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1)) + _, err := mp.Push(m) + if err != nil { + t.Fatal(err) + } + } + + for i := 0; i < 10; i++ { + m := makeTestMessage(w2, a2, a1, uint64(i), gasLimit, uint64(i+1)) + mustAdd(t, mp, m) + } + + mp.Clear(true) + + pending, _ := mp.Pending() + if len(pending) > 0 { + t.Fatalf("cleared the mpool, but got %d pending messages", len(pending)) + } +} + +func TestClearNonLocal(t *testing.T) { + tma := newTestMpoolAPI() + ds := datastore.NewMapDatastore() + + mp, err := New(tma, ds, "mptest") + if err != nil { + t.Fatal(err) + } + + // the actors + w1, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + w2, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}] + for i := 0; i < 10; i++ { + m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1)) + _, err := mp.Push(m) + if err != nil { + t.Fatal(err) + } + } + + for i := 0; i < 10; i++ { + m := makeTestMessage(w2, a2, a1, uint64(i), gasLimit, uint64(i+1)) + mustAdd(t, mp, m) + } + + mp.Clear(false) + + pending, _ := mp.Pending() + if len(pending) != 10 { + t.Fatalf("expected 10 pending messages, but got %d instead", len(pending)) + } + + for _, m := range pending { + if m.Message.From != a1 { + t.Fatalf("expected message from %s but got one from %s instead", a1, m.Message.From) + } + } +} From 9c68123ede4a63aaf0ed66d7948ad480879c5c47 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 25 Aug 2020 13:29:09 +0300 Subject: [PATCH 4/8] add test for Updates, fix behaviour for closed mpool --- chain/messagepool/messagepool.go | 5 +++ chain/messagepool/messagepool_test.go | 59 +++++++++++++++++++++++++++ 2 files changed, 64 insertions(+) diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index c7dd03740..8988b96ff 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -982,6 +982,7 @@ func (mp *MessagePool) Updates(ctx context.Context) (<-chan api.MpoolUpdate, err go func() { defer mp.changes.Unsub(sub, localUpdates) + defer close(out) for { select { @@ -990,9 +991,13 @@ func (mp *MessagePool) Updates(ctx context.Context) (<-chan api.MpoolUpdate, err case out <- u.(api.MpoolUpdate): case <-ctx.Done(): return + case <-mp.closer: + return } case <-ctx.Done(): return + case <-mp.closer: + return } } }() diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go index aadb4cd8e..e92fed7bc 100644 --- a/chain/messagepool/messagepool_test.go +++ b/chain/messagepool/messagepool_test.go @@ -573,3 +573,62 @@ func TestClearNonLocal(t *testing.T) { } } } + +func TestUpdates(t *testing.T) { + tma := newTestMpoolAPI() + ds := datastore.NewMapDatastore() + + mp, err := New(tma, ds, "mptest") + if err != nil { + t.Fatal(err) + } + + // the actors + w1, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + w2, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + ch, err := mp.Updates(ctx) + if err != nil { + t.Fatal(err) + } + + gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}] + for i := 0; i < 10; i++ { + m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1)) + _, err := mp.Push(m) + if err != nil { + t.Fatal(err) + } + + _, ok := <-ch + if !ok { + t.Fatal("expected update, but got a closed channel instead") + } + } + + mp.Close() + _, ok := <-ch + if ok { + t.Fatal("expected closed channel, but got an update instead") + } +} From 6b9d392d0ef3927079a7924d30c06d37243eb5e9 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 25 Aug 2020 13:36:03 +0300 Subject: [PATCH 5/8] add another priority selection test --- chain/messagepool/selection_test.go | 67 +++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/chain/messagepool/selection_test.go b/chain/messagepool/selection_test.go index 2e7692213..2d17f7115 100644 --- a/chain/messagepool/selection_test.go +++ b/chain/messagepool/selection_test.go @@ -649,6 +649,73 @@ func TestPriorityMessageSelection(t *testing.T) { } } +func TestPriorityMessageSelection2(t *testing.T) { + mp, tma := makeTestMpool() + + // the actors + w1, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + w2, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + block := tma.nextBlock() + ts := mock.TipSet(block) + tma.applyBlock(t, block) + + gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}] + + tma.setBalance(a1, 1) // in FIL + tma.setBalance(a2, 1) // in FIL + + mp.cfg.PriorityAddrs = []address.Address{a1} + + nMessages := int(2 * build.BlockGasLimit / gasLimit) + for i := 0; i < nMessages; i++ { + bias := (nMessages - i) / 3 + m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(1+i%3+bias)) + mustAdd(t, mp, m) + m = makeTestMessage(w2, a2, a1, uint64(i), gasLimit, uint64(1+i%3+bias)) + mustAdd(t, mp, m) + } + + msgs, err := mp.SelectMessages(ts, 1.0) + if err != nil { + t.Fatal(err) + } + + expectedMsgs := int(build.BlockGasLimit / gasLimit) + if len(msgs) != expectedMsgs { + t.Fatalf("expected %d messages but got %d", expectedMsgs, len(msgs)) + } + + // all messages must be from a1 + nextNonce := uint64(0) + for _, m := range msgs { + if m.Message.From != a1 { + t.Fatal("expected messages from a1 before messages from a2") + } + if m.Message.Nonce != nextNonce { + t.Fatalf("expected nonce %d but got %d", nextNonce, m.Message.Nonce) + } + nextNonce++ + } +} + func TestOptimalMessageSelection1(t *testing.T) { // this test uses just a single actor sending messages with a low tq // the chain depenent merging algorithm should pick messages from the actor From 62b15ecc122a88ec5023ac5f4188e14f5b4a9750 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 25 Aug 2020 13:41:20 +0300 Subject: [PATCH 6/8] remove bug line --- chain/messagepool/messagepool.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 8988b96ff..0670ed92b 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -908,8 +908,6 @@ func (mp *MessagePool) runHeadChange(from *types.TipSet, to *types.TipSet, rmsgs } for _, ts := range apply { - mp.curTs = ts - for _, b := range ts.Blocks() { bmsgs, smsgs, err := mp.api.MessagesForBlock(b) if err != nil { From 58cfac9f67a98a6b3ef897d2380d70df4c101960 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 25 Aug 2020 13:57:02 +0300 Subject: [PATCH 7/8] add test for message republish --- chain/messagepool/messagepool_test.go | 3 ++ chain/messagepool/repub_test.go | 66 +++++++++++++++++++++++++++ 2 files changed, 69 insertions(+) create mode 100644 chain/messagepool/repub_test.go diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go index e92fed7bc..2571339b9 100644 --- a/chain/messagepool/messagepool_test.go +++ b/chain/messagepool/messagepool_test.go @@ -32,6 +32,8 @@ type testMpoolAPI struct { balance map[address.Address]types.BigInt tipsets []*types.TipSet + + published int } func newTestMpoolAPI() *testMpoolAPI { @@ -91,6 +93,7 @@ func (tma *testMpoolAPI) PutMessage(m types.ChainMsg) (cid.Cid, error) { } func (tma *testMpoolAPI) PubSubPublish(string, []byte) error { + tma.published++ return nil } diff --git a/chain/messagepool/repub_test.go b/chain/messagepool/repub_test.go new file mode 100644 index 000000000..28a69c92a --- /dev/null +++ b/chain/messagepool/repub_test.go @@ -0,0 +1,66 @@ +package messagepool + +import ( + "testing" + "time" + + "github.com/filecoin-project/lotus/chain/messagepool/gasguess" + "github.com/filecoin-project/lotus/chain/wallet" + "github.com/filecoin-project/specs-actors/actors/builtin" + "github.com/filecoin-project/specs-actors/actors/crypto" + "github.com/ipfs/go-datastore" +) + +func TestRepubMessages(t *testing.T) { + tma := newTestMpoolAPI() + ds := datastore.NewMapDatastore() + + mp, err := New(tma, ds, "mptest") + if err != nil { + t.Fatal(err) + } + + // the actors + w1, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + w2, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}] + + tma.setBalance(a1, 1) // in FIL + + for i := 0; i < 10; i++ { + m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1)) + _, err := mp.Push(m) + if err != nil { + t.Fatal(err) + } + } + + if tma.published != 10 { + t.Fatalf("expected to have published 10 messages, but got %d instead", tma.published) + } + + mp.repubTrigger <- struct{}{} + time.Sleep(100 * time.Millisecond) + + if tma.published != 20 { + t.Fatalf("expected to have published 20 messages, but got %d instead", tma.published) + } +} From 6b3282150bd7276e97fe70598f084fb182be0004 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 25 Aug 2020 14:03:49 +0300 Subject: [PATCH 8/8] appease linter --- chain/messagepool/messagepool_test.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go index 2571339b9..316c86d87 100644 --- a/chain/messagepool/messagepool_test.go +++ b/chain/messagepool/messagepool_test.go @@ -440,7 +440,10 @@ func TestLoadLocal(t *testing.T) { } msgs[cid] = struct{}{} } - mp.Close() + err = mp.Close() + if err != nil { + t.Fatal(err) + } mp, err = New(tma, ds, "mptest") if err != nil { @@ -629,7 +632,11 @@ func TestUpdates(t *testing.T) { } } - mp.Close() + err = mp.Close() + if err != nil { + t.Fatal(err) + } + _, ok := <-ch if ok { t.Fatal("expected closed channel, but got an update instead")