diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index c7dd03740..8988b96ff 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -982,6 +982,7 @@ func (mp *MessagePool) Updates(ctx context.Context) (<-chan api.MpoolUpdate, err go func() { defer mp.changes.Unsub(sub, localUpdates) + defer close(out) for { select { @@ -990,9 +991,13 @@ func (mp *MessagePool) Updates(ctx context.Context) (<-chan api.MpoolUpdate, err case out <- u.(api.MpoolUpdate): case <-ctx.Done(): return + case <-mp.closer: + return } case <-ctx.Done(): return + case <-mp.closer: + return } } }() diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go index aadb4cd8e..e92fed7bc 100644 --- a/chain/messagepool/messagepool_test.go +++ b/chain/messagepool/messagepool_test.go @@ -573,3 +573,62 @@ func TestClearNonLocal(t *testing.T) { } } } + +func TestUpdates(t *testing.T) { + tma := newTestMpoolAPI() + ds := datastore.NewMapDatastore() + + mp, err := New(tma, ds, "mptest") + if err != nil { + t.Fatal(err) + } + + // the actors + w1, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + w2, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + ch, err := mp.Updates(ctx) + if err != nil { + t.Fatal(err) + } + + gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}] + for i := 0; i < 10; i++ { + m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1)) + _, err := mp.Push(m) + if err != nil { + t.Fatal(err) + } + + _, ok := <-ch + if !ok { + t.Fatal("expected update, but got a closed channel instead") + } + } + + mp.Close() + _, ok := <-ch + if ok { + t.Fatal("expected closed channel, but got an update instead") + } +}