add test for Updates, fix behaviour for closed mpool
This commit is contained in:
parent
7049b6478c
commit
9c68123ede
@ -982,6 +982,7 @@ func (mp *MessagePool) Updates(ctx context.Context) (<-chan api.MpoolUpdate, err
|
|||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer mp.changes.Unsub(sub, localUpdates)
|
defer mp.changes.Unsub(sub, localUpdates)
|
||||||
|
defer close(out)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@ -990,9 +991,13 @@ func (mp *MessagePool) Updates(ctx context.Context) (<-chan api.MpoolUpdate, err
|
|||||||
case out <- u.(api.MpoolUpdate):
|
case out <- u.(api.MpoolUpdate):
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
|
case <-mp.closer:
|
||||||
|
return
|
||||||
}
|
}
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
|
case <-mp.closer:
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -573,3 +573,62 @@ func TestClearNonLocal(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestUpdates(t *testing.T) {
|
||||||
|
tma := newTestMpoolAPI()
|
||||||
|
ds := datastore.NewMapDatastore()
|
||||||
|
|
||||||
|
mp, err := New(tma, ds, "mptest")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// the actors
|
||||||
|
w1, err := wallet.NewWallet(wallet.NewMemKeyStore())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
a1, err := w1.GenerateKey(crypto.SigTypeSecp256k1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
w2, err := wallet.NewWallet(wallet.NewMemKeyStore())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
a2, err := w2.GenerateKey(crypto.SigTypeSecp256k1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.TODO())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
ch, err := mp.Updates(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}]
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
|
||||||
|
_, err := mp.Push(m)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, ok := <-ch
|
||||||
|
if !ok {
|
||||||
|
t.Fatal("expected update, but got a closed channel instead")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
mp.Close()
|
||||||
|
_, ok := <-ch
|
||||||
|
if ok {
|
||||||
|
t.Fatal("expected closed channel, but got an update instead")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user