package itests import ( "bytes" "context" "testing" "time" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/itests/kit" "github.com/filecoin-project/lotus/markets/storageadapter" "github.com/filecoin-project/lotus/node" market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" "github.com/stretchr/testify/require" ) func TestPublishDealsBatching(t *testing.T) { var ( ctx = context.Background() publishPeriod = 10 * time.Second maxDealsPerMsg = uint64(2) // Set max deals per publish deals message to 2 startEpoch = abi.ChainEpoch(2 << 12) ) kit.QuietMiningLogs() opts := node.Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{ Period: publishPeriod, MaxDealsPerMsg: maxDealsPerMsg, }), ) client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ConstructorOpts(opts)) ens.InterconnectAll().BeginMining(10 * time.Millisecond) dh := kit.NewDealHarness(t, client, miner) // Starts a deal and waits until it's published runDealTillPublish := func(rseed int) { res, _ := client.CreateImportFile(ctx, rseed, 0) upds, err := client.ClientGetDealUpdates(ctx) require.NoError(t, err) dh.StartDeal(ctx, res.Root, false, startEpoch) // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this time.Sleep(time.Second) done := make(chan struct{}) go func() { for upd := range upds { if upd.DataRef.Root == res.Root && upd.State == storagemarket.StorageDealAwaitingPreCommit { done <- struct{}{} } } }() <-done } // Run three deals in parallel done := make(chan struct{}, maxDealsPerMsg+1) for rseed := 1; rseed <= 3; rseed++ { rseed := rseed go func() { runDealTillPublish(rseed) done <- struct{}{} }() } // Wait for two of the deals to be published for i := 0; i < int(maxDealsPerMsg); i++ { <-done } // Expect a single PublishStorageDeals message that includes the first two deals msgCids, err := client.StateListMessages(ctx, &api.MessageMatch{To: market.Address}, types.EmptyTSK, 1) require.NoError(t, err) count := 0 for _, msgCid := range msgCids { msg, err := client.ChainGetMessage(ctx, msgCid) require.NoError(t, err) if msg.Method == market.Methods.PublishStorageDeals { count++ var pubDealsParams market2.PublishStorageDealsParams err = pubDealsParams.UnmarshalCBOR(bytes.NewReader(msg.Params)) require.NoError(t, err) require.Len(t, pubDealsParams.Deals, int(maxDealsPerMsg)) } } require.Equal(t, 1, count) // The third deal should be published once the publish period expires. // Allow a little padding as it takes a moment for the state change to // be noticed by the client. padding := 10 * time.Second select { case <-time.After(publishPeriod + padding): require.Fail(t, "Expected 3rd deal to be published once publish period elapsed") case <-done: // Success } }