Add unit and integration tests for mempool

This commit is contained in:
Darko Brdareski 2022-02-02 17:08:50 +01:00
parent 51643caf60
commit 008fbbd652
5 changed files with 1007 additions and 0 deletions

View File

@ -0,0 +1,224 @@
//stm: #unit
package messagepool
import (
"context"
"fmt"
"testing"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
"github.com/stretchr/testify/assert"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/consensus/filcns"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/mock"
"github.com/filecoin-project/lotus/chain/wallet"
_ "github.com/filecoin-project/lotus/lib/sigs/bls"
_ "github.com/filecoin-project/lotus/lib/sigs/secp"
)
func init() {
_ = logging.SetLogLevel("*", "INFO")
}
func getCheckMessageStatus(statusCode api.CheckStatusCode, msgStatuses []api.MessageCheckStatus) (*api.MessageCheckStatus, error) {
for i := 0; i < len(msgStatuses); i++ {
iMsgStatuses := msgStatuses[i]
if iMsgStatuses.CheckStatus.Code == statusCode {
return &iMsgStatuses, nil
}
}
return nil, fmt.Errorf("Could not find CheckStatusCode %s", statusCode)
}
func TestCheckMessages(t *testing.T) {
//stm: @CHAIN_MEMPOOL_CHECK_MESSAGES_001
tma := newTestMpoolAPI()
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
if err != nil {
t.Fatal(err)
}
ds := datastore.NewMapDatastore()
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
if err != nil {
t.Fatal(err)
}
sender, err := w.WalletNew(context.Background(), types.KTSecp256k1)
if err != nil {
t.Fatal(err)
}
tma.setBalance(sender, 1000e15)
target := mock.Address(1001)
var protos []*api.MessagePrototype
for i := 0; i < 5; i++ {
msg := &types.Message{
To: target,
From: sender,
Value: types.NewInt(1),
Nonce: uint64(i),
GasLimit: 50000000,
GasFeeCap: types.NewInt(minimumBaseFee.Uint64()),
GasPremium: types.NewInt(1),
Params: make([]byte, 2<<10),
}
proto := &api.MessagePrototype{
Message: *msg,
ValidNonce: true,
}
protos = append(protos, proto)
}
messageStatuses, err := mp.CheckMessages(context.TODO(), protos)
assert.NoError(t, err)
for i := 0; i < len(messageStatuses); i++ {
iMsgStatuses := messageStatuses[i]
for j := 0; j < len(iMsgStatuses); j++ {
jStatus := iMsgStatuses[i]
assert.True(t, jStatus.OK)
}
}
}
func TestCheckPendingMessages(t *testing.T) {
//stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001
tma := newTestMpoolAPI()
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
if err != nil {
t.Fatal(err)
}
ds := datastore.NewMapDatastore()
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
if err != nil {
t.Fatal(err)
}
sender, err := w.WalletNew(context.Background(), types.KTSecp256k1)
if err != nil {
t.Fatal(err)
}
tma.setBalance(sender, 1000e15)
target := mock.Address(1001)
// add a valid message to the pool
msg := &types.Message{
To: target,
From: sender,
Value: types.NewInt(1),
Nonce: 0,
GasLimit: 50000000,
GasFeeCap: types.NewInt(minimumBaseFee.Uint64()),
GasPremium: types.NewInt(1),
Params: make([]byte, 2<<10),
}
sig, err := w.WalletSign(context.TODO(), sender, msg.Cid().Bytes(), api.MsgMeta{})
if err != nil {
panic(err)
}
sm := &types.SignedMessage{
Message: *msg,
Signature: *sig,
}
mustAdd(t, mp, sm)
messageStatuses, err := mp.CheckPendingMessages(context.TODO(), sender)
assert.NoError(t, err)
for i := 0; i < len(messageStatuses); i++ {
iMsgStatuses := messageStatuses[i]
for j := 0; j < len(iMsgStatuses); j++ {
jStatus := iMsgStatuses[i]
assert.True(t, jStatus.OK)
}
}
}
func TestCheckReplaceMessages(t *testing.T) {
//stm: @CHAIN_MEMPOOL_CHECK_REPLACE_MESSAGES_001
tma := newTestMpoolAPI()
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
if err != nil {
t.Fatal(err)
}
ds := datastore.NewMapDatastore()
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
if err != nil {
t.Fatal(err)
}
sender, err := w.WalletNew(context.Background(), types.KTSecp256k1)
if err != nil {
t.Fatal(err)
}
tma.setBalance(sender, 1000e15)
target := mock.Address(1001)
// add a valid message to the pool
msg := &types.Message{
To: target,
From: sender,
Value: types.NewInt(1),
Nonce: 0,
GasLimit: 50000000,
GasFeeCap: types.NewInt(minimumBaseFee.Uint64()),
GasPremium: types.NewInt(1),
Params: make([]byte, 2<<10),
}
sig, err := w.WalletSign(context.TODO(), sender, msg.Cid().Bytes(), api.MsgMeta{})
if err != nil {
panic(err)
}
sm := &types.SignedMessage{
Message: *msg,
Signature: *sig,
}
mustAdd(t, mp, sm)
// create a new message with the same data, except that it is too big
var msgs []*types.Message
invalidmsg := &types.Message{
To: target,
From: sender,
Value: types.NewInt(1),
Nonce: 0,
GasLimit: 50000000,
GasFeeCap: types.NewInt(minimumBaseFee.Uint64()),
GasPremium: types.NewInt(1),
Params: make([]byte, 128<<10),
}
msgs = append(msgs, invalidmsg)
{
messageStatuses, err := mp.CheckReplaceMessages(context.TODO(), msgs)
if err != nil {
t.Fatal(err)
}
for i := 0; i < len(messageStatuses); i++ {
iMsgStatuses := messageStatuses[i]
status, err := getCheckMessageStatus(api.CheckStatusMessageSize, iMsgStatuses)
if err != nil {
t.Fatal(err)
}
// the replacement message should cause a status error
assert.False(t, status.OK)
}
}
}

View File

@ -9,6 +9,7 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
@ -226,6 +227,8 @@ func mustAdd(t *testing.T, mp *MessagePool, msg *types.SignedMessage) {
}
func TestMessagePool(t *testing.T) {
//stm: @CHAIN_MEMPOOL_GET_NONCE_001
tma := newTestMpoolAPI()
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
@ -327,6 +330,7 @@ func TestCheckMessageBig(t *testing.T) {
Message: *msg,
Signature: *sig,
}
//stm: @CHAIN_MEMPOOL_PUSH_001
err = mp.Add(context.TODO(), sm)
assert.ErrorIs(t, err, ErrMessageTooBig)
}
@ -760,3 +764,298 @@ func TestUpdates(t *testing.T) {
t.Fatal("expected closed channel, but got an update instead")
}
}
func TestMessageBelowMinGasFee(t *testing.T) {
//stm: @CHAIN_MEMPOOL_PUSH_001
tma := newTestMpoolAPI()
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
assert.NoError(t, err)
from, err := w.WalletNew(context.Background(), types.KTBLS)
assert.NoError(t, err)
tma.setBalance(from, 1000e9)
ds := datastore.NewMapDatastore()
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
assert.NoError(t, err)
to := mock.Address(1001)
// fee is just below minimum gas fee
fee := minimumBaseFee.Uint64() - 1
{
msg := &types.Message{
To: to,
From: from,
Value: types.NewInt(1),
Nonce: 0,
GasLimit: 50000000,
GasFeeCap: types.NewInt(fee),
GasPremium: types.NewInt(1),
Params: make([]byte, 32<<10),
}
sig, err := w.WalletSign(context.TODO(), from, msg.Cid().Bytes(), api.MsgMeta{})
if err != nil {
panic(err)
}
sm := &types.SignedMessage{
Message: *msg,
Signature: *sig,
}
err = mp.Add(context.TODO(), sm)
assert.ErrorIs(t, err, ErrGasFeeCapTooLow)
}
}
func TestMessageValueTooHigh(t *testing.T) {
//stm: @CHAIN_MEMPOOL_PUSH_001
tma := newTestMpoolAPI()
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
assert.NoError(t, err)
from, err := w.WalletNew(context.Background(), types.KTBLS)
assert.NoError(t, err)
tma.setBalance(from, 1000e9)
ds := datastore.NewMapDatastore()
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
assert.NoError(t, err)
to := mock.Address(1001)
totalFil := types.TotalFilecoinInt
extra := types.NewInt(1)
value := types.BigAdd(totalFil, extra)
{
msg := &types.Message{
To: to,
From: from,
Value: value,
Nonce: 0,
GasLimit: 50000000,
GasFeeCap: types.NewInt(minimumBaseFee.Uint64()),
GasPremium: types.NewInt(1),
Params: make([]byte, 32<<10),
}
sig, err := w.WalletSign(context.TODO(), from, msg.Cid().Bytes(), api.MsgMeta{})
if err != nil {
panic(err)
}
sm := &types.SignedMessage{
Message: *msg,
Signature: *sig,
}
err = mp.Add(context.TODO(), sm)
assert.Error(t, err)
}
}
func TestMessageSignatureInvalid(t *testing.T) {
//stm: @CHAIN_MEMPOOL_PUSH_001
tma := newTestMpoolAPI()
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
assert.NoError(t, err)
from, err := w.WalletNew(context.Background(), types.KTBLS)
assert.NoError(t, err)
tma.setBalance(from, 1000e9)
ds := datastore.NewMapDatastore()
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
assert.NoError(t, err)
to := mock.Address(1001)
{
msg := &types.Message{
To: to,
From: from,
Value: types.NewInt(1),
Nonce: 0,
GasLimit: 50000000,
GasFeeCap: types.NewInt(minimumBaseFee.Uint64()),
GasPremium: types.NewInt(1),
Params: make([]byte, 32<<10),
}
badSig := &crypto.Signature{
Type: crypto.SigTypeSecp256k1,
Data: make([]byte, 0),
}
sm := &types.SignedMessage{
Message: *msg,
Signature: *badSig,
}
err = mp.Add(context.TODO(), sm)
assert.Error(t, err)
assert.Contains(t, err.Error(), "invalid signature length")
}
}
func TestAddMessageTwice(t *testing.T) {
//stm: @CHAIN_MEMPOOL_PUSH_001
tma := newTestMpoolAPI()
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
assert.NoError(t, err)
from, err := w.WalletNew(context.Background(), types.KTBLS)
assert.NoError(t, err)
tma.setBalance(from, 1000e9)
ds := datastore.NewMapDatastore()
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
assert.NoError(t, err)
to := mock.Address(1001)
{
// create a valid messages
sm := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64())
mustAdd(t, mp, sm)
// try to add it twice
err = mp.Add(context.TODO(), sm)
assert.Contains(t, err.Error(), "with nonce 0 already in mpool")
}
}
func TestAddMessageTwiceNonceGap(t *testing.T) {
//stm: @CHAIN_MEMPOOL_PUSH_001
tma := newTestMpoolAPI()
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
assert.NoError(t, err)
from, err := w.WalletNew(context.Background(), types.KTBLS)
assert.NoError(t, err)
tma.setBalance(from, 1000e9)
ds := datastore.NewMapDatastore()
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
assert.NoError(t, err)
to := mock.Address(1001)
{
// create message with invalid nonce (1)
sm := makeTestMessage(w, from, to, 1, 50_000_000, minimumBaseFee.Uint64())
mustAdd(t, mp, sm)
// then try to add message again
err = mp.Add(context.TODO(), sm)
assert.Contains(t, err.Error(), "unfulfilled nonce gap")
}
}
func TestAddMessageTwiceCidDiff(t *testing.T) {
tma := newTestMpoolAPI()
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
assert.NoError(t, err)
from, err := w.WalletNew(context.Background(), types.KTBLS)
assert.NoError(t, err)
tma.setBalance(from, 1000e9)
ds := datastore.NewMapDatastore()
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
assert.NoError(t, err)
to := mock.Address(1001)
{
sm := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64())
mustAdd(t, mp, sm)
// Create message with different data, so CID is different
sm2 := makeTestMessage(w, from, to, 0, 50_000_001, minimumBaseFee.Uint64())
//stm: @CHAIN_MEMPOOL_PUSH_001
// then try to add message again
err = mp.Add(context.TODO(), sm2)
assert.Contains(t, err.Error(), "replace by fee has too low GasPremium")
}
}
func TestAddMessageTwiceCidDiffReplaced(t *testing.T) {
//stm: @CHAIN_MEMPOOL_PUSH_001
tma := newTestMpoolAPI()
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
assert.NoError(t, err)
from, err := w.WalletNew(context.Background(), types.KTBLS)
assert.NoError(t, err)
tma.setBalance(from, 1000e9)
ds := datastore.NewMapDatastore()
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
assert.NoError(t, err)
to := mock.Address(1001)
{
sm := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64())
mustAdd(t, mp, sm)
// Create message with different data, so CID is different
sm2 := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64()*2)
mustAdd(t, mp, sm2)
}
}
func TestRemoveMessage(t *testing.T) {
//stm: @CHAIN_MEMPOOL_PUSH_001
tma := newTestMpoolAPI()
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
assert.NoError(t, err)
from, err := w.WalletNew(context.Background(), types.KTBLS)
assert.NoError(t, err)
tma.setBalance(from, 1000e9)
ds := datastore.NewMapDatastore()
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
assert.NoError(t, err)
to := mock.Address(1001)
{
sm := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64())
mustAdd(t, mp, sm)
//stm: @CHAIN_MEMPOOL_REMOVE_001
// remove message for sender
mp.Remove(context.TODO(), from, sm.Message.Nonce, true)
//stm: @CHAIN_MEMPOOL_PENDING_FOR_001
// check messages in pool: should be none present
msgs := mp.pendingFor(context.TODO(), from)
assert.Len(t, msgs, 0)
}
}

View File

@ -1,3 +1,4 @@
//stm: #unit
package messagepool
import (
@ -16,6 +17,7 @@ import (
)
func TestRepubMessages(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001
oldRepublishBatchDelay := RepublishBatchDelay
RepublishBatchDelay = time.Microsecond
defer func() {
@ -57,6 +59,7 @@ func TestRepubMessages(t *testing.T) {
for i := 0; i < 10; i++ {
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
//stm: @CHAIN_MEMPOOL_PUSH_001
_, err := mp.Push(context.TODO(), m)
if err != nil {
t.Fatal(err)

View File

@ -1,3 +1,4 @@
//stm: #unit
package messagepool
import (
@ -74,6 +75,8 @@ func makeTestMpool() (*MessagePool, *testMpoolAPI) {
}
func TestMessageChains(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001
//stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001
mp, tma := makeTestMpool()
// the actors
@ -310,6 +313,8 @@ func TestMessageChains(t *testing.T) {
}
func TestMessageChainSkipping(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001
// regression test for chain skip bug
mp, tma := makeTestMpool()
@ -382,6 +387,7 @@ func TestMessageChainSkipping(t *testing.T) {
}
func TestBasicMessageSelection(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
oldMaxNonceGap := MaxNonceGap
MaxNonceGap = 1000
defer func() {
@ -532,6 +538,7 @@ func TestBasicMessageSelection(t *testing.T) {
}
func TestMessageSelectionTrimmingGas(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
mp, tma := makeTestMpool()
// the actors
@ -595,6 +602,7 @@ func TestMessageSelectionTrimmingGas(t *testing.T) {
}
func TestMessageSelectionTrimmingMsgsBasic(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
mp, tma := makeTestMpool()
// the actors
@ -641,6 +649,7 @@ func TestMessageSelectionTrimmingMsgsBasic(t *testing.T) {
}
func TestMessageSelectionTrimmingMsgsTwoSendersBasic(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
mp, tma := makeTestMpool()
// the actors
@ -707,6 +716,7 @@ func TestMessageSelectionTrimmingMsgsTwoSendersBasic(t *testing.T) {
}
func TestMessageSelectionTrimmingMsgsTwoSendersAdvanced(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
mp, tma := makeTestMpool()
// the actors
@ -788,6 +798,7 @@ func TestMessageSelectionTrimmingMsgsTwoSendersAdvanced(t *testing.T) {
}
func TestPriorityMessageSelection(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
mp, tma := makeTestMpool()
// the actors
@ -867,6 +878,7 @@ func TestPriorityMessageSelection(t *testing.T) {
}
func TestPriorityMessageSelection2(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
mp, tma := makeTestMpool()
// the actors
@ -934,6 +946,7 @@ func TestPriorityMessageSelection2(t *testing.T) {
}
func TestPriorityMessageSelection3(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
mp, tma := makeTestMpool()
// the actors
@ -1028,6 +1041,8 @@ func TestPriorityMessageSelection3(t *testing.T) {
}
func TestOptimalMessageSelection1(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
// this test uses just a single actor sending messages with a low tq
// the chain depenent merging algorithm should pick messages from the actor
// from the start
@ -1094,6 +1109,8 @@ func TestOptimalMessageSelection1(t *testing.T) {
}
func TestOptimalMessageSelection2(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
// this test uses two actors sending messages to each other, with the first
// actor paying (much) higher gas premium than the second.
// We select with a low ticket quality; the chain depenent merging algorithm should pick
@ -1173,6 +1190,8 @@ func TestOptimalMessageSelection2(t *testing.T) {
}
func TestOptimalMessageSelection3(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
// this test uses 10 actors sending a block of messages to each other, with the the first
// actors paying higher gas premium than the subsequent actors.
// We select with a low ticket quality; the chain dependent merging algorithm should pick
@ -1416,6 +1435,8 @@ func makeZipfPremiumDistribution(rng *rand.Rand) func() uint64 {
}
func TestCompetitiveMessageSelectionExp(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
if testing.Short() {
t.Skip("skipping in short mode")
}
@ -1439,6 +1460,8 @@ func TestCompetitiveMessageSelectionExp(t *testing.T) {
}
func TestCompetitiveMessageSelectionZipf(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
if testing.Short() {
t.Skip("skipping in short mode")
}
@ -1462,6 +1485,7 @@ func TestCompetitiveMessageSelectionZipf(t *testing.T) {
}
func TestGasReward(t *testing.T) {
//stm: @CHAIN_MEMPOOL_GET_GAS_REWARD_001
tests := []struct {
Premium uint64
FeeCap uint64
@ -1494,6 +1518,8 @@ func TestGasReward(t *testing.T) {
}
func TestRealWorldSelection(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @TOKEN_WALLET_SIGN_001, @CHAIN_MEMPOOL_SELECT_001
// load test-messages.json.gz and rewrite the messages so that
// 1) we map each real actor to a test actor so that we can sign the messages
// 2) adjust the nonces so that they start from 0

455
itests/mempool_test.go Normal file
View File

@ -0,0 +1,455 @@
//stm: #integration
package itests
import (
"context"
"testing"
"time"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/itests/kit"
"github.com/stretchr/testify/require"
)
func TestMemPoolPushSingleNode(t *testing.T) {
//stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001
//stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001
//stm: @CHAIN_MEMPOOL_PUSH_002
ctx := context.Background()
const blockTime = 100 * time.Millisecond
firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
ens.InterconnectAll().BeginMining(blockTime)
kit.QuietMiningLogs()
sender := firstNode.DefaultKey.Address
addr, err := firstNode.WalletNew(ctx, types.KTBLS)
require.NoError(t, err)
const totalMessages = 10
bal, err := firstNode.WalletBalance(ctx, sender)
require.NoError(t, err)
toSend := big.Div(bal, big.NewInt(10))
each := big.Div(toSend, big.NewInt(totalMessages))
// add messages to be mined/published
var sms []*types.SignedMessage
for i := 0; i < totalMessages; i++ {
msg := &types.Message{
From: sender,
To: addr,
Value: each,
}
sm, err := firstNode.MpoolPushMessage(ctx, msg, nil)
require.NoError(t, err)
require.EqualValues(t, i, sm.Message.Nonce)
sms = append(sms, sm)
}
// check pending messages for address
msgStatuses, _ := firstNode.MpoolCheckPendingMessages(ctx, sender)
require.Equal(t, totalMessages, len(msgStatuses))
for _, msgStatusList := range msgStatuses {
for _, status := range msgStatusList {
require.True(t, status.OK)
}
}
// verify messages should be the ones included in the next block
selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0)
for _, msg := range sms {
found := false
for _, selectedMsg := range selected {
if selectedMsg.Cid() == msg.Cid() {
found = true
break
}
}
require.True(t, found)
}
time.Sleep(10 * blockTime)
// pool pending list should be empty
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
require.Equal(t, 0, len(pending))
// all messages should be added to the chain
for _, lookMsg := range sms {
msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
require.NoError(t, err)
require.NotNil(t, msgLookup)
}
}
func TestMemPoolPushTwoNodes(t *testing.T) {
//stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001
//stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001
//stm: @CHAIN_MEMPOOL_PUSH_002
ctx := context.Background()
const blockTime = 100 * time.Millisecond
firstNode, secondNode, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
ens.InterconnectAll().BeginMining(blockTime)
kit.QuietMiningLogs()
sender := firstNode.DefaultKey.Address
sender2 := secondNode.DefaultKey.Address
addr, _ := firstNode.WalletNew(ctx, types.KTBLS)
addr2, _ := secondNode.WalletNew(ctx, types.KTBLS)
bal, err := firstNode.WalletBalance(ctx, sender)
require.NoError(t, err)
const totalMessages = 10
toSend := big.Div(bal, big.NewInt(10))
each := big.Div(toSend, big.NewInt(totalMessages))
var sms []*types.SignedMessage
// push messages to message pools of both nodes
for i := 0; i < totalMessages; i++ {
// first
msg1 := &types.Message{
From: sender,
To: addr,
Value: each,
}
sm1, err := firstNode.MpoolPushMessage(ctx, msg1, nil)
require.NoError(t, err)
require.EqualValues(t, i, sm1.Message.Nonce)
sms = append(sms, sm1)
// second
msg2 := &types.Message{
From: sender2,
To: addr2,
Value: each,
}
sm2, err := secondNode.MpoolPushMessage(ctx, msg2, nil)
require.NoError(t, err)
require.EqualValues(t, i, sm2.Message.Nonce)
sms = append(sms, sm2)
}
time.Sleep(10 * blockTime)
pending1, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
require.Equal(t, 0, len(pending1))
pending2, err := secondNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
require.Equal(t, 0, len(pending2))
// Check messages on both nodes
for _, lookMsg := range sms {
msgLookup1, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
require.NoError(t, err)
require.NotNil(t, msgLookup1)
msgLookup2, err := secondNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
require.NoError(t, err)
require.NotNil(t, msgLookup2)
}
}
func TestMemPoolClearPending(t *testing.T) {
//stm: @CHAIN_MEMPOOL_PUSH_001, @CHAIN_MEMPOOL_PENDING_001
//stm: @CHAIN_STATE_WAIT_MSG_001, @CHAIN_MEMPOOL_CLEAR_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001
ctx := context.Background()
const blockTime = 100 * time.Millisecond
firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
ens.InterconnectAll().BeginMining(blockTime)
kit.QuietMiningLogs()
sender := firstNode.DefaultKey.Address
addr, _ := firstNode.WalletNew(ctx, types.KTBLS)
const totalMessages = 10
bal, err := firstNode.WalletBalance(ctx, sender)
require.NoError(t, err)
toSend := big.Div(bal, big.NewInt(10))
each := big.Div(toSend, big.NewInt(totalMessages))
// Add single message, then clear the pool
msg := &types.Message{
From: sender,
To: addr,
Value: each,
}
_, err = firstNode.MpoolPushMessage(ctx, msg, nil)
require.NoError(t, err)
err = firstNode.MpoolClear(ctx, true)
require.NoError(t, err)
// pool should be empty now
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
require.Equal(t, 0, len(pending))
time.Sleep(2 * blockTime)
// waiting for the message should produce nothing
_, err = firstNode.StateWaitMsg(ctx, msg.Cid(), 3, api.LookbackNoLimit, true)
require.Error(t, err)
}
func TestMemPoolBatchPush(t *testing.T) {
//stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001
//stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001, @CHAIN_MEMPOOL_SELECT_001
//stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001
//stm: @CHAIN_MEMPOOL_BATCH_PUSH_001
ctx := context.Background()
const blockTime = 100 * time.Millisecond
firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
ens.InterconnectAll().BeginMining(blockTime)
kit.QuietMiningLogs()
sender := firstNode.DefaultKey.Address
addr, _ := firstNode.WalletNew(ctx, types.KTBLS)
const totalMessages = 10
bal, err := firstNode.WalletBalance(ctx, sender)
require.NoError(t, err)
toSend := big.Div(bal, big.NewInt(10))
each := big.Div(toSend, big.NewInt(totalMessages))
// add messages to be mined/published
var sms []*types.SignedMessage
for i := 0; i < totalMessages; i++ {
msg := &types.Message{
From: sender,
To: addr,
Value: each,
Nonce: uint64(i),
GasLimit: 50_000_000,
GasFeeCap: types.NewInt(100_000_000),
GasPremium: types.NewInt(1),
}
signedMessage, err := firstNode.WalletSignMessage(ctx, sender, msg)
require.NoError(t, err)
sms = append(sms, signedMessage)
}
_, err = firstNode.MpoolBatchPush(ctx, sms)
require.NoError(t, err)
// check pending messages for address
msgStatuses, err := firstNode.MpoolCheckPendingMessages(ctx, sender)
require.NoError(t, err)
require.Equal(t, totalMessages, len(msgStatuses))
for _, msgStatusList := range msgStatuses {
for _, status := range msgStatusList {
require.True(t, status.OK)
}
}
// verify messages should be the ones included in the next block
selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0)
require.NoError(t, err)
for _, msg := range sms {
found := false
for _, selectedMsg := range selected {
if selectedMsg.Cid() == msg.Cid() {
found = true
break
}
}
require.True(t, found)
}
time.Sleep(10 * blockTime)
// pool pending list should be empty
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
require.Equal(t, 0, len(pending))
// all messages should be added to the chain
for _, lookMsg := range sms {
msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
require.NoError(t, err)
require.NotNil(t, msgLookup)
}
}
func TestMemPoolPushSingleNodeUntrusted(t *testing.T) {
//stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001
//stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001, @CHAIN_MEMPOOL_SELECT_001
//stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001
//stm: @CHAIN_MEMPOOL_PUSH_003
ctx := context.Background()
const blockTime = 100 * time.Millisecond
firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
ens.InterconnectAll().BeginMining(blockTime)
kit.QuietMiningLogs()
sender := firstNode.DefaultKey.Address
addr, _ := firstNode.WalletNew(ctx, types.KTBLS)
const totalMessages = 10
bal, err := firstNode.WalletBalance(ctx, sender)
require.NoError(t, err)
toSend := big.Div(bal, big.NewInt(10))
each := big.Div(toSend, big.NewInt(totalMessages))
// add messages to be mined/published
var sms []*types.SignedMessage
for i := 0; i < totalMessages; i++ {
msg := &types.Message{
From: sender,
To: addr,
Value: each,
Nonce: uint64(i),
GasLimit: 50_000_000,
GasFeeCap: types.NewInt(100_000_000),
GasPremium: types.NewInt(1),
}
signedMessage, err := firstNode.WalletSignMessage(ctx, sender, msg)
require.NoError(t, err)
// push untrusted messages
pushedCid, err := firstNode.MpoolPushUntrusted(ctx, signedMessage)
require.NoError(t, err)
require.Equal(t, msg.Cid(), pushedCid)
sms = append(sms, signedMessage)
}
// check pending messages for address
msgStatuses, _ := firstNode.MpoolCheckPendingMessages(ctx, sender)
require.Equal(t, totalMessages, len(msgStatuses))
for _, msgStatusList := range msgStatuses {
for _, status := range msgStatusList {
require.True(t, status.OK)
}
}
// verify messages should be the ones included in the next block
selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0)
for _, msg := range sms {
found := false
for _, selectedMsg := range selected {
if selectedMsg.Cid() == msg.Cid() {
found = true
break
}
}
require.True(t, found)
}
time.Sleep(10 * blockTime)
// pool pending list should be empty
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
require.Equal(t, 0, len(pending))
// all messages should be added to the chain
for _, lookMsg := range sms {
msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
require.NoError(t, err)
require.NotNil(t, msgLookup)
}
}
func TestMemPoolBatchPushUntrusted(t *testing.T) {
//stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001
//stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001, @CHAIN_MEMPOOL_SELECT_001
//stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001
//stm: @CHAIN_MEMPOOL_BATCH_PUSH_002
ctx := context.Background()
const blockTime = 100 * time.Millisecond
firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
ens.InterconnectAll().BeginMining(blockTime)
kit.QuietMiningLogs()
sender := firstNode.DefaultKey.Address
addr, _ := firstNode.WalletNew(ctx, types.KTBLS)
const totalMessages = 10
bal, err := firstNode.WalletBalance(ctx, sender)
require.NoError(t, err)
toSend := big.Div(bal, big.NewInt(10))
each := big.Div(toSend, big.NewInt(totalMessages))
// add messages to be mined/published
var sms []*types.SignedMessage
for i := 0; i < totalMessages; i++ {
msg := &types.Message{
From: sender,
To: addr,
Value: each,
Nonce: uint64(i),
GasLimit: 50_000_000,
GasFeeCap: types.NewInt(100_000_000),
GasPremium: types.NewInt(1),
}
signedMessage, err := firstNode.WalletSignMessage(ctx, sender, msg)
require.NoError(t, err)
sms = append(sms, signedMessage)
}
_, err = firstNode.MpoolBatchPushUntrusted(ctx, sms)
require.NoError(t, err)
// check pending messages for address
msgStatuses, err := firstNode.MpoolCheckPendingMessages(ctx, sender)
require.NoError(t, err)
require.Equal(t, totalMessages, len(msgStatuses))
for _, msgStatusList := range msgStatuses {
for _, status := range msgStatusList {
require.True(t, status.OK)
}
}
// verify messages should be the ones included in the next block
selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0)
for _, msg := range sms {
found := false
for _, selectedMsg := range selected {
if selectedMsg.Cid() == msg.Cid() {
found = true
break
}
}
require.True(t, found)
}
time.Sleep(10 * blockTime)
// pool pending list should be empty
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
require.Equal(t, 0, len(pending))
// all messages should be added to the chain
for _, lookMsg := range sms {
msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
require.NoError(t, err)
require.NotNil(t, msgLookup)
}
}