test: fix flaky message pool integration tests

Using the same pattern described in my previous commit.
I also added the CircuitBreaker to the itests kit as it may be useful
for other integration tests when debugging flakiness caused by timeouts.
This commit is contained in:
Nikola Divic 2022-02-12 19:52:51 +01:00
parent aca2a0fd1b
commit 34387326d1
2 changed files with 175 additions and 105 deletions

34
itests/kit/circuit.go Normal file
View File

@ -0,0 +1,34 @@
package kit
import (
"fmt"
"testing"
"time"
)
// CircuitBreaker implements a simple time-based circuit breaker used for
// waiting for async operations to finish.
//
// This is how it works:
//   - It runs the `cb` function until it returns true,
//   - waiting for `throttle` duration between each iteration,
//   - or at most `timeout` duration until it breaks test execution.
//
// You can use it if t.Deadline() is not "granular" enough, and you want to
// know which specific piece of code timed out, or you need to set different
// deadlines in the same test.
//
// The timeout is only examined between iterations, after `cb` has returned
// false: a `cb` call that blocks is not interrupted, and `cb` always runs at
// least once. `label` is included in the progress and timeout messages.
func CircuitBreaker(t *testing.T, label string, throttle, timeout time.Duration, cb func() bool) {
	// Attribute a timeout failure to the calling test's line, not to this helper.
	t.Helper()

	// An explicit timer (rather than time.After) can be released as soon as cb
	// succeeds. The deferred Stop also runs when t.Fatal unwinds the goroutine.
	deadline := time.NewTimer(timeout)
	defer deadline.Stop()

	for !cb() {
		select {
		case <-deadline.C:
			t.Fatal("timeout: ", label)
		default:
			// Printed straight to stdout so progress is visible while the test runs.
			fmt.Printf("waiting: %s\n", label)
			time.Sleep(throttle)
		}
	}
}

View File

@ -3,7 +3,6 @@ package itests
import (
"context"
"fmt"
"testing"
"time"
@ -14,6 +13,9 @@ import (
"github.com/stretchr/testify/require"
)
// Polling parameters handed to kit.CircuitBreaker by the message pool tests.
const (
	// mPoolThrottle is the pause between two consecutive polls.
	mPoolThrottle = 100 * time.Millisecond
	// mPoolTimeout is the longest a single wait may last before the test fails.
	mPoolTimeout = 10 * time.Second
)
func TestMemPoolPushSingleNode(t *testing.T) {
//stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001
//stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001
@ -21,7 +23,7 @@ func TestMemPoolPushSingleNode(t *testing.T) {
ctx := context.Background()
const blockTime = 100 * time.Millisecond
firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
ens.InterconnectAll().BeginMining(blockTime)
ens.InterconnectAll()
kit.QuietMiningLogs()
sender := firstNode.DefaultKey.Address
@ -53,13 +55,18 @@ func TestMemPoolPushSingleNode(t *testing.T) {
}
// check pending messages for address
kit.CircuitBreaker(t, "push messages", mPoolThrottle, mPoolTimeout, func() bool {
msgStatuses, _ := firstNode.MpoolCheckPendingMessages(ctx, sender)
require.Equal(t, totalMessages, len(msgStatuses))
if len(msgStatuses) == totalMessages {
for _, msgStatusList := range msgStatuses {
for _, status := range msgStatusList {
require.True(t, status.OK)
}
}
return true
}
return false
})
// verify messages should be the ones included in the next block
selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0)
@ -74,19 +81,24 @@ func TestMemPoolPushSingleNode(t *testing.T) {
require.True(t, found)
}
time.Sleep(10 * blockTime)
ens.BeginMining(blockTime)
kit.CircuitBreaker(t, "mine messages", mPoolThrottle, mPoolTimeout, func() bool {
// pool pending list should be empty
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
require.Equal(t, 0, len(pending))
if len(pending) == 0 {
// all messages should be added to the chain
for _, lookMsg := range sms {
msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
require.NoError(t, err)
require.NotNil(t, msgLookup)
}
return true
}
return false
})
}
func TestMemPoolPushTwoNodes(t *testing.T) {
@ -96,7 +108,7 @@ func TestMemPoolPushTwoNodes(t *testing.T) {
ctx := context.Background()
const blockTime = 100 * time.Millisecond
firstNode, secondNode, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
ens.InterconnectAll().BeginMining(blockTime)
ens.InterconnectAll()
kit.QuietMiningLogs()
sender := firstNode.DefaultKey.Address
@ -141,16 +153,16 @@ func TestMemPoolPushTwoNodes(t *testing.T) {
sms = append(sms, sm2)
}
time.Sleep(10 * blockTime)
ens.BeginMining(blockTime)
kit.CircuitBreaker(t, "push & mine messages", mPoolThrottle, mPoolTimeout, func() bool {
pending1, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
require.Equal(t, 0, len(pending1))
pending2, err := secondNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
require.Equal(t, 0, len(pending2))
if len(pending1) == 0 && len(pending2) == 0 {
// Check messages on both nodes
for _, lookMsg := range sms {
msgLookup1, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
@ -161,6 +173,10 @@ func TestMemPoolPushTwoNodes(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, msgLookup2)
}
return true
}
return false
})
}
func TestMemPoolClearPending(t *testing.T) {
@ -169,7 +185,7 @@ func TestMemPoolClearPending(t *testing.T) {
ctx := context.Background()
const blockTime = 100 * time.Millisecond
firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
ens.InterconnectAll().BeginMining(blockTime)
ens.InterconnectAll()
kit.QuietMiningLogs()
sender := firstNode.DefaultKey.Address
@ -192,17 +208,30 @@ func TestMemPoolClearPending(t *testing.T) {
_, err = firstNode.MpoolPushMessage(ctx, msg, nil)
require.NoError(t, err)
// message should be in the mempool
kit.CircuitBreaker(t, "push message", mPoolThrottle, mPoolTimeout, func() bool {
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
return len(pending) == 1
})
err = firstNode.MpoolClear(ctx, true)
require.NoError(t, err)
// pool should be empty now
kit.CircuitBreaker(t, "clear mempool", mPoolThrottle, mPoolTimeout, func() bool {
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
require.Equal(t, 0, len(pending))
time.Sleep(2 * blockTime)
return len(pending) == 0
})
// waiting for the message should produce nothing
// mine a couple of blocks
ens.BeginMining(blockTime)
time.Sleep(5 * blockTime)
// make sure that the cleared message wasn't picked up and mined
_, err = firstNode.StateWaitMsg(ctx, msg.Cid(), 3, api.LookbackNoLimit, true)
require.Error(t, err)
}
@ -215,7 +244,7 @@ func TestMemPoolBatchPush(t *testing.T) {
ctx := context.Background()
const blockTime = 100 * time.Millisecond
firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
ens.InterconnectAll().BeginMining(blockTime)
ens.InterconnectAll()
kit.QuietMiningLogs()
sender := firstNode.DefaultKey.Address
@ -252,14 +281,20 @@ func TestMemPoolBatchPush(t *testing.T) {
require.NoError(t, err)
// check pending messages for address
kit.CircuitBreaker(t, "batch push", mPoolThrottle, mPoolTimeout, func() bool {
msgStatuses, err := firstNode.MpoolCheckPendingMessages(ctx, sender)
require.NoError(t, err)
require.Equal(t, totalMessages, len(msgStatuses))
if len(msgStatuses) == totalMessages {
for _, msgStatusList := range msgStatuses {
for _, status := range msgStatusList {
require.True(t, status.OK)
}
}
return true
}
return false
})
// verify messages should be the ones included in the next block
selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0)
@ -275,19 +310,24 @@ func TestMemPoolBatchPush(t *testing.T) {
require.True(t, found)
}
time.Sleep(10 * blockTime)
ens.BeginMining(blockTime)
kit.CircuitBreaker(t, "mine messages", mPoolThrottle, mPoolTimeout, func() bool {
// pool pending list should be empty
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
require.Equal(t, 0, len(pending))
if len(pending) == 0 {
// all messages should be added to the chain
for _, lookMsg := range sms {
msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
require.NoError(t, err)
require.NotNil(t, msgLookup)
}
return true
}
return false
})
}
func TestMemPoolPushSingleNodeUntrusted(t *testing.T) {
@ -298,7 +338,7 @@ func TestMemPoolPushSingleNodeUntrusted(t *testing.T) {
ctx := context.Background()
const blockTime = 100 * time.Millisecond
firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
ens.InterconnectAll().BeginMining(blockTime)
ens.InterconnectAll()
kit.QuietMiningLogs()
sender := firstNode.DefaultKey.Address
@ -336,14 +376,20 @@ func TestMemPoolPushSingleNodeUntrusted(t *testing.T) {
sms = append(sms, signedMessage)
}
kit.CircuitBreaker(t, "push untrusted messages", mPoolThrottle, mPoolTimeout, func() bool {
// check pending messages for address
msgStatuses, _ := firstNode.MpoolCheckPendingMessages(ctx, sender)
require.Equal(t, totalMessages, len(msgStatuses))
if len(msgStatuses) == totalMessages {
for _, msgStatusList := range msgStatuses {
for _, status := range msgStatusList {
require.True(t, status.OK)
}
}
return true
}
return false
})
// verify messages should be the ones included in the next block
selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0)
@ -358,19 +404,25 @@ func TestMemPoolPushSingleNodeUntrusted(t *testing.T) {
require.True(t, found)
}
time.Sleep(10 * blockTime)
ens.BeginMining(blockTime)
kit.CircuitBreaker(t, "mine untrusted messages", mPoolThrottle, mPoolTimeout, func() bool {
// pool pending list should be empty
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
require.Equal(t, 0, len(pending))
if len(pending) == 0 {
// all messages should be added to the chain
for _, lookMsg := range sms {
msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
require.NoError(t, err)
require.NotNil(t, msgLookup)
}
return true
}
return false
})
}
func TestMemPoolBatchPushUntrusted(t *testing.T) {
@ -418,8 +470,7 @@ func TestMemPoolBatchPushUntrusted(t *testing.T) {
require.NoError(t, err)
// check pending messages for address, wait until they are all pushed
timeout := time.After(time.Second * 10)
for {
kit.CircuitBreaker(t, "push untrusted messages", mPoolThrottle, mPoolTimeout, func() bool {
msgStatuses, err := firstNode.MpoolCheckPendingMessages(ctx, sender)
require.NoError(t, err)
@ -429,17 +480,10 @@ func TestMemPoolBatchPushUntrusted(t *testing.T) {
require.True(t, status.OK)
}
}
break
}
select {
case <-timeout:
t.Fatal("waiting for batch push timed out")
default:
fmt.Printf("waiting for %d more messages to be pushed\n", len(msgStatuses)-totalMessages)
time.Sleep(time.Millisecond * 100)
}
return true
}
return false
})
// verify messages should be the ones included in the next block
selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0)
@ -457,10 +501,8 @@ func TestMemPoolBatchPushUntrusted(t *testing.T) {
ens.BeginMining(blockTime)
// wait until pending messages are mined
timeout = time.After(time.Second * 10)
for {
// pool pending list should be empty
// wait until pending messages are mined, pool pending list should be empty
kit.CircuitBreaker(t, "mine untrusted messages", mPoolThrottle, mPoolTimeout, func() bool {
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
@ -471,15 +513,9 @@ func TestMemPoolBatchPushUntrusted(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, msgLookup)
}
break
return true
}
return false
})
select {
case <-timeout:
t.Fatal("waiting for pending messages to be mined timed out")
default:
fmt.Printf("waiting for %d more messages to be mined\n", len(pending))
time.Sleep(time.Millisecond * 100)
}
}
}