lotus/itests/mempool_test.go

524 lines
14 KiB
Go
Raw Normal View History

//stm: #integration
package itests
import (
"context"
"testing"
"time"
2022-06-14 15:00:51 +00:00
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-state-types/big"
2022-06-14 15:00:51 +00:00
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/itests/kit"
)
const mPoolThrottle = time.Millisecond * 100
const mPoolTimeout = time.Second * 10
func TestMemPoolPushSingleNode(t *testing.T) {
//stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001
//stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001
//stm: @CHAIN_MEMPOOL_PUSH_002
ctx := context.Background()
const blockTime = 100 * time.Millisecond
firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
ens.InterconnectAll()
kit.QuietMiningLogs()
sender := firstNode.DefaultKey.Address
addr, err := firstNode.WalletNew(ctx, types.KTBLS)
require.NoError(t, err)
const totalMessages = 10
bal, err := firstNode.WalletBalance(ctx, sender)
require.NoError(t, err)
toSend := big.Div(bal, big.NewInt(10))
each := big.Div(toSend, big.NewInt(totalMessages))
// add messages to be mined/published
var sms []*types.SignedMessage
for i := 0; i < totalMessages; i++ {
msg := &types.Message{
From: sender,
To: addr,
Value: each,
}
sm, err := firstNode.MpoolPushMessage(ctx, msg, nil)
require.NoError(t, err)
require.EqualValues(t, i, sm.Message.Nonce)
sms = append(sms, sm)
}
// check pending messages for address
kit.CircuitBreaker(t, "push messages", mPoolThrottle, mPoolTimeout, func() bool {
msgStatuses, _ := firstNode.MpoolCheckPendingMessages(ctx, sender)
if len(msgStatuses) == totalMessages {
for _, msgStatusList := range msgStatuses {
for _, status := range msgStatusList {
require.True(t, status.OK)
}
}
return true
}
return false
})
// verify messages should be the ones included in the next block
selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0)
for _, msg := range sms {
found := false
for _, selectedMsg := range selected {
if selectedMsg.Cid() == msg.Cid() {
found = true
break
}
}
require.True(t, found)
}
ens.BeginMining(blockTime)
kit.CircuitBreaker(t, "mine messages", mPoolThrottle, mPoolTimeout, func() bool {
// pool pending list should be empty
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
if len(pending) == 0 {
// all messages should be added to the chain
for _, lookMsg := range sms {
msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
require.NoError(t, err)
require.NotNil(t, msgLookup)
}
return true
}
return false
})
}
func TestMemPoolPushTwoNodes(t *testing.T) {
//stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001
//stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001
//stm: @CHAIN_MEMPOOL_PUSH_002
ctx := context.Background()
const blockTime = 100 * time.Millisecond
firstNode, secondNode, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
ens.InterconnectAll()
kit.QuietMiningLogs()
sender := firstNode.DefaultKey.Address
sender2 := secondNode.DefaultKey.Address
addr, _ := firstNode.WalletNew(ctx, types.KTBLS)
addr2, _ := secondNode.WalletNew(ctx, types.KTBLS)
bal, err := firstNode.WalletBalance(ctx, sender)
require.NoError(t, err)
const totalMessages = 10
toSend := big.Div(bal, big.NewInt(10))
each := big.Div(toSend, big.NewInt(totalMessages))
var sms []*types.SignedMessage
// push messages to message pools of both nodes
for i := 0; i < totalMessages; i++ {
// first
msg1 := &types.Message{
From: sender,
To: addr,
Value: each,
}
sm1, err := firstNode.MpoolPushMessage(ctx, msg1, nil)
require.NoError(t, err)
require.EqualValues(t, i, sm1.Message.Nonce)
sms = append(sms, sm1)
// second
msg2 := &types.Message{
From: sender2,
To: addr2,
Value: each,
}
sm2, err := secondNode.MpoolPushMessage(ctx, msg2, nil)
require.NoError(t, err)
require.EqualValues(t, i, sm2.Message.Nonce)
sms = append(sms, sm2)
}
ens.BeginMining(blockTime)
kit.CircuitBreaker(t, "push & mine messages", mPoolThrottle, mPoolTimeout, func() bool {
pending1, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
pending2, err := secondNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
if len(pending1) == 0 && len(pending2) == 0 {
// Check messages on both nodes
for _, lookMsg := range sms {
msgLookup1, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
require.NoError(t, err)
require.NotNil(t, msgLookup1)
msgLookup2, err := secondNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
require.NoError(t, err)
require.NotNil(t, msgLookup2)
}
return true
}
return false
})
}
func TestMemPoolClearPending(t *testing.T) {
//stm: @CHAIN_MEMPOOL_PUSH_001, @CHAIN_MEMPOOL_PENDING_001
//stm: @CHAIN_STATE_WAIT_MSG_001, @CHAIN_MEMPOOL_CLEAR_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001
ctx := context.Background()
const blockTime = 100 * time.Millisecond
firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
ens.InterconnectAll()
kit.QuietMiningLogs()
sender := firstNode.DefaultKey.Address
addr, _ := firstNode.WalletNew(ctx, types.KTBLS)
const totalMessages = 10
bal, err := firstNode.WalletBalance(ctx, sender)
require.NoError(t, err)
toSend := big.Div(bal, big.NewInt(10))
each := big.Div(toSend, big.NewInt(totalMessages))
// Add single message, then clear the pool
msg := &types.Message{
From: sender,
To: addr,
Value: each,
}
_, err = firstNode.MpoolPushMessage(ctx, msg, nil)
require.NoError(t, err)
// message should be in the mempool
kit.CircuitBreaker(t, "push message", mPoolThrottle, mPoolTimeout, func() bool {
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
return len(pending) == 1
})
err = firstNode.MpoolClear(ctx, true)
require.NoError(t, err)
// pool should be empty now
kit.CircuitBreaker(t, "clear mempool", mPoolThrottle, mPoolTimeout, func() bool {
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
return len(pending) == 0
})
// mine a couple of blocks
ens.BeginMining(blockTime)
time.Sleep(5 * blockTime)
// make sure that the cleared message wasn't picked up and mined
_, err = firstNode.StateWaitMsg(ctx, msg.Cid(), 3, api.LookbackNoLimit, true)
require.Error(t, err)
}
func TestMemPoolBatchPush(t *testing.T) {
//stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001
//stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001, @CHAIN_MEMPOOL_SELECT_001
//stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001
//stm: @CHAIN_MEMPOOL_BATCH_PUSH_001
ctx := context.Background()
const blockTime = 100 * time.Millisecond
firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
ens.InterconnectAll()
kit.QuietMiningLogs()
sender := firstNode.DefaultKey.Address
addr, _ := firstNode.WalletNew(ctx, types.KTBLS)
const totalMessages = 10
bal, err := firstNode.WalletBalance(ctx, sender)
require.NoError(t, err)
toSend := big.Div(bal, big.NewInt(10))
each := big.Div(toSend, big.NewInt(totalMessages))
// add messages to be mined/published
var sms []*types.SignedMessage
for i := 0; i < totalMessages; i++ {
msg := &types.Message{
From: sender,
To: addr,
Value: each,
Nonce: uint64(i),
GasLimit: 50_000_000,
GasFeeCap: types.NewInt(100_000_000),
GasPremium: types.NewInt(1),
}
signedMessage, err := firstNode.WalletSignMessage(ctx, sender, msg)
require.NoError(t, err)
sms = append(sms, signedMessage)
}
_, err = firstNode.MpoolBatchPush(ctx, sms)
require.NoError(t, err)
// check pending messages for address
kit.CircuitBreaker(t, "batch push", mPoolThrottle, mPoolTimeout, func() bool {
msgStatuses, err := firstNode.MpoolCheckPendingMessages(ctx, sender)
require.NoError(t, err)
if len(msgStatuses) == totalMessages {
for _, msgStatusList := range msgStatuses {
for _, status := range msgStatusList {
require.True(t, status.OK)
}
}
return true
}
return false
})
// verify messages should be the ones included in the next block
selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0)
require.NoError(t, err)
for _, msg := range sms {
found := false
for _, selectedMsg := range selected {
if selectedMsg.Cid() == msg.Cid() {
found = true
break
}
}
require.True(t, found)
}
ens.BeginMining(blockTime)
kit.CircuitBreaker(t, "mine messages", mPoolThrottle, mPoolTimeout, func() bool {
// pool pending list should be empty
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
if len(pending) == 0 {
// all messages should be added to the chain
for _, lookMsg := range sms {
msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
require.NoError(t, err)
require.NotNil(t, msgLookup)
}
return true
}
return false
})
}
func TestMemPoolPushSingleNodeUntrusted(t *testing.T) {
//stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001
//stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001, @CHAIN_MEMPOOL_SELECT_001
//stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001
//stm: @CHAIN_MEMPOOL_PUSH_003
ctx := context.Background()
const blockTime = 100 * time.Millisecond
firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
ens.InterconnectAll()
kit.QuietMiningLogs()
sender := firstNode.DefaultKey.Address
addr, _ := firstNode.WalletNew(ctx, types.KTBLS)
const totalMessages = 10
bal, err := firstNode.WalletBalance(ctx, sender)
require.NoError(t, err)
toSend := big.Div(bal, big.NewInt(10))
each := big.Div(toSend, big.NewInt(totalMessages))
// add messages to be mined/published
var sms []*types.SignedMessage
for i := 0; i < totalMessages; i++ {
msg := &types.Message{
From: sender,
To: addr,
Value: each,
Nonce: uint64(i),
GasLimit: 50_000_000,
GasFeeCap: types.NewInt(100_000_000),
GasPremium: types.NewInt(1),
}
signedMessage, err := firstNode.WalletSignMessage(ctx, sender, msg)
require.NoError(t, err)
// push untrusted messages
pushedCid, err := firstNode.MpoolPushUntrusted(ctx, signedMessage)
require.NoError(t, err)
require.Equal(t, msg.Cid(), pushedCid)
sms = append(sms, signedMessage)
}
kit.CircuitBreaker(t, "push untrusted messages", mPoolThrottle, mPoolTimeout, func() bool {
// check pending messages for address
msgStatuses, _ := firstNode.MpoolCheckPendingMessages(ctx, sender)
if len(msgStatuses) == totalMessages {
for _, msgStatusList := range msgStatuses {
for _, status := range msgStatusList {
require.True(t, status.OK)
}
}
return true
}
return false
})
// verify messages should be the ones included in the next block
selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0)
for _, msg := range sms {
found := false
for _, selectedMsg := range selected {
if selectedMsg.Cid() == msg.Cid() {
found = true
break
}
}
require.True(t, found)
}
ens.BeginMining(blockTime)
kit.CircuitBreaker(t, "mine untrusted messages", mPoolThrottle, mPoolTimeout, func() bool {
// pool pending list should be empty
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
if len(pending) == 0 {
// all messages should be added to the chain
for _, lookMsg := range sms {
msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
require.NoError(t, err)
require.NotNil(t, msgLookup)
}
return true
}
return false
})
}
func TestMemPoolBatchPushUntrusted(t *testing.T) {
//stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001
//stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001, @CHAIN_MEMPOOL_SELECT_001
//stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001
//stm: @CHAIN_MEMPOOL_BATCH_PUSH_002
ctx := context.Background()
const blockTime = 100 * time.Millisecond
firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
ens.InterconnectAll()
kit.QuietMiningLogs()
sender := firstNode.DefaultKey.Address
addr, _ := firstNode.WalletNew(ctx, types.KTBLS)
const totalMessages = 10
bal, err := firstNode.WalletBalance(ctx, sender)
require.NoError(t, err)
toSend := big.Div(bal, big.NewInt(10))
each := big.Div(toSend, big.NewInt(totalMessages))
// add messages to be mined/published
var sms []*types.SignedMessage
for i := 0; i < totalMessages; i++ {
msg := &types.Message{
From: sender,
To: addr,
Value: each,
Nonce: uint64(i),
GasLimit: 50_000_000,
GasFeeCap: types.NewInt(100_000_000),
GasPremium: types.NewInt(1),
}
signedMessage, err := firstNode.WalletSignMessage(ctx, sender, msg)
require.NoError(t, err)
sms = append(sms, signedMessage)
}
_, err = firstNode.MpoolBatchPushUntrusted(ctx, sms)
require.NoError(t, err)
// check pending messages for address, wait until they are all pushed
kit.CircuitBreaker(t, "push untrusted messages", mPoolThrottle, mPoolTimeout, func() bool {
msgStatuses, err := firstNode.MpoolCheckPendingMessages(ctx, sender)
require.NoError(t, err)
if len(msgStatuses) == totalMessages {
for _, msgStatusList := range msgStatuses {
for _, status := range msgStatusList {
require.True(t, status.OK)
}
}
return true
}
return false
})
// verify messages should be the ones included in the next block
selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0)
for _, msg := range sms {
found := false
for _, selectedMsg := range selected {
if selectedMsg.Cid() == msg.Cid() {
found = true
break
}
}
require.True(t, found)
}
ens.BeginMining(blockTime)
// wait until pending messages are mined, pool pending list should be empty
kit.CircuitBreaker(t, "mine untrusted messages", mPoolThrottle, mPoolTimeout, func() bool {
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
if len(pending) == 0 {
// all messages should be added to the chain
for _, lookMsg := range sms {
msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
require.NoError(t, err)
require.NotNil(t, msgLookup)
}
return true
}
return false
})
}