// stm: #integration package itests import ( "context" "testing" "time" "github.com/stretchr/testify/require" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/itests/kit" ) func TestActorEventsMpool(t *testing.T) { ctx := context.Background() kit.QuietMiningLogs() client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs()) ens.InterconnectAll().BeginMining(10 * time.Millisecond) // create a new address where to send funds. addr, err := client.WalletNew(ctx, types.KTBLS) require.NoError(t, err) // get the existing balance from the default wallet to then split it. bal, err := client.WalletBalance(ctx, client.DefaultKey.Address) require.NoError(t, err) // install filter filterID, err := client.EthNewPendingTransactionFilter(ctx) require.NoError(t, err) const iterations = 100 // we'll send half our balance (saving the other half for gas), // in `iterations` increments. toSend := big.Div(bal, big.NewInt(2)) each := big.Div(toSend, big.NewInt(iterations)) waitAllCh := make(chan struct{}) go func() { headChangeCh, err := client.ChainNotify(ctx) require.NoError(t, err) <-headChangeCh // skip hccurrent count := 0 for { select { case headChanges := <-headChangeCh: for _, change := range headChanges { if change.Type == store.HCApply { msgs, err := client.ChainGetMessagesInTipset(ctx, change.Val.Key()) require.NoError(t, err) count += len(msgs) if count == iterations { waitAllCh <- struct{}{} } } } } } }() var sms []*types.SignedMessage for i := 0; i < iterations; i++ { msg := &types.Message{ From: client.DefaultKey.Address, To: addr, Value: each, } sm, err := client.MpoolPushMessage(ctx, msg, nil) require.NoError(t, err) require.EqualValues(t, i, sm.Message.Nonce) sms = append(sms, sm) } select { case <-waitAllCh: case <-time.After(time.Minute): t.Errorf("timeout to wait for pack messages") } // collect filter results res, err := client.EthGetFilterChanges(ctx, filterID) require.NoError(t, err) // expect to have seen iteration number of mpool messages require.Equal(t, iterations, len(res.NewTransactionHashes)) } func TestActorEventsTipsets(t *testing.T) { ctx := context.Background() kit.QuietMiningLogs() client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs()) ens.InterconnectAll().BeginMining(10 * time.Millisecond) // create a new address where to send funds. addr, err := client.WalletNew(ctx, types.KTBLS) require.NoError(t, err) // get the existing balance from the default wallet to then split it. bal, err := client.WalletBalance(ctx, client.DefaultKey.Address) require.NoError(t, err) // install filter filterID, err := client.EthNewBlockFilter(ctx) require.NoError(t, err) const iterations = 100 // we'll send half our balance (saving the other half for gas), // in `iterations` increments. toSend := big.Div(bal, big.NewInt(2)) each := big.Div(toSend, big.NewInt(iterations)) waitAllCh := make(chan struct{}) go func() { headChangeCh, err := client.ChainNotify(ctx) require.NoError(t, err) <-headChangeCh // skip hccurrent count := 0 for { select { case headChanges := <-headChangeCh: for _, change := range headChanges { if change.Type == store.HCApply { msgs, err := client.ChainGetMessagesInTipset(ctx, change.Val.Key()) require.NoError(t, err) count += len(msgs) if count == iterations { waitAllCh <- struct{}{} } } } } } }() var sms []*types.SignedMessage for i := 0; i < iterations; i++ { msg := &types.Message{ From: client.DefaultKey.Address, To: addr, Value: each, } sm, err := client.MpoolPushMessage(ctx, msg, nil) require.NoError(t, err) require.EqualValues(t, i, sm.Message.Nonce) sms = append(sms, sm) } select { case <-waitAllCh: case <-time.After(time.Minute): t.Errorf("timeout to wait for pack messages") } // collect filter results res, err := client.EthGetFilterChanges(ctx, filterID) require.NoError(t, err) // expect to have seen iteration number of tipsets require.Equal(t, iterations, len(res.NewBlockHashes)) }