// stm: #integration package itests import ( "context" "fmt" "sync" "testing" "time" provider "github.com/ipni/index-provider" "github.com/stretchr/testify/require" datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-fil-markets/shared_testutil" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/itests/kit" "github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node/modules" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo" ) // TestDealWithMarketAndMinerNode is running concurrently a number of storage and retrieval deals towards a miner // architecture where the `mining/sealing/proving` node is a separate process from the `markets` node func TestDealWithMarketAndMinerNode(t *testing.T) { //stm: @CHAIN_SYNCER_LOAD_GENESIS_001, @CHAIN_SYNCER_FETCH_TIPSET_001, //stm: @CHAIN_SYNCER_START_001, @CHAIN_SYNCER_SYNC_001, @BLOCKCHAIN_BEACON_VALIDATE_BLOCK_VALUES_01 //stm: @CHAIN_SYNCER_COLLECT_CHAIN_001, @CHAIN_SYNCER_COLLECT_HEADERS_001, @CHAIN_SYNCER_VALIDATE_TIPSET_001 //stm: @CHAIN_SYNCER_NEW_PEER_HEAD_001, @CHAIN_SYNCER_VALIDATE_MESSAGE_META_001, @CHAIN_SYNCER_STOP_001 if testing.Short() { t.Skip("skipping test in short mode") } t.Skip("skipping due to flakiness: see #6956") kit.QuietMiningLogs() // For these tests where the block time is artificially short, just use // a deal start epoch that is guaranteed to be far enough in the future // so that the deal starts sealing in time startEpoch := abi.ChainEpoch(8 << 10) runTest := func(t *testing.T, n int, fastRetrieval bool, carExport bool) { api.RunningNodeType = api.NodeMiner // TODO(anteva): fix me idxProv := shared_testutil.NewMockIndexProvider() idxProvOpt := kit.ConstructorOpts(node.Override(new(provider.Interface), idxProv)) client, main, market, _ := kit.EnsembleWithMinerAndMarketNodes(t, kit.ThroughRPC(), idxProvOpt) dh := kit.NewDealHarness(t, client, main, market) dh.RunConcurrentDeals(kit.RunConcurrentDealsOpts{ N: n, FastRetrieval: fastRetrieval, CarExport: carExport, StartEpoch: startEpoch, IndexProvider: idxProv, }) } // this test is expensive because we don't use mock proofs; do a single cycle. cycles := []int{4} for _, n := range cycles { n := n ns := fmt.Sprintf("%d", n) t.Run(ns+"-fastretrieval-CAR", func(t *testing.T) { runTest(t, n, true, true) }) t.Run(ns+"-fastretrieval-NoCAR", func(t *testing.T) { runTest(t, n, true, false) }) t.Run(ns+"-stdretrieval-CAR", func(t *testing.T) { runTest(t, n, false, true) }) t.Run(ns+"-stdretrieval-NoCAR", func(t *testing.T) { runTest(t, n, false, false) }) } } func TestDealCyclesConcurrent(t *testing.T) { //stm: @CHAIN_SYNCER_LOAD_GENESIS_001, @CHAIN_SYNCER_FETCH_TIPSET_001, //stm: @CHAIN_SYNCER_START_001, @CHAIN_SYNCER_SYNC_001, @BLOCKCHAIN_BEACON_VALIDATE_BLOCK_VALUES_01 //stm: @CHAIN_SYNCER_COLLECT_CHAIN_001, @CHAIN_SYNCER_COLLECT_HEADERS_001, @CHAIN_SYNCER_VALIDATE_TIPSET_001 //stm: @CHAIN_SYNCER_NEW_PEER_HEAD_001, @CHAIN_SYNCER_VALIDATE_MESSAGE_META_001, @CHAIN_SYNCER_STOP_001 //stm: @CHAIN_INCOMING_HANDLE_INCOMING_BLOCKS_001, @CHAIN_INCOMING_VALIDATE_BLOCK_PUBSUB_001, @CHAIN_INCOMING_VALIDATE_MESSAGE_PUBSUB_001 if testing.Short() { t.Skip("skipping test in short mode") } kit.QuietMiningLogs() // For these tests where the block time is artificially short, just use // a deal start epoch that is guaranteed to be far enough in the future // so that the deal starts sealing in time startEpoch := abi.ChainEpoch(2 << 12) runTest := func(t *testing.T, n int, fastRetrieval bool, carExport bool) { client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs()) ens.InterconnectAll().BeginMining(250 * time.Millisecond) dh := kit.NewDealHarness(t, client, miner, miner) dh.RunConcurrentDeals(kit.RunConcurrentDealsOpts{ N: n, FastRetrieval: fastRetrieval, CarExport: carExport, StartEpoch: startEpoch, }) } // this test is cheap because we use mock proofs, do various cycles cycles := []int{2, 4, 8, 16} for _, n := range cycles { n := n ns := fmt.Sprintf("%d", n) t.Run(ns+"-fastretrieval-CAR", func(t *testing.T) { runTest(t, n, true, true) }) t.Run(ns+"-fastretrieval-NoCAR", func(t *testing.T) { runTest(t, n, true, false) }) t.Run(ns+"-stdretrieval-CAR", func(t *testing.T) { runTest(t, n, false, true) }) t.Run(ns+"-stdretrieval-NoCAR", func(t *testing.T) { runTest(t, n, false, false) }) } } func TestSimultanenousTransferLimit(t *testing.T) { //stm: @CHAIN_SYNCER_LOAD_GENESIS_001, @CHAIN_SYNCER_FETCH_TIPSET_001, //stm: @CHAIN_SYNCER_START_001, @CHAIN_SYNCER_SYNC_001, @BLOCKCHAIN_BEACON_VALIDATE_BLOCK_VALUES_01 //stm: @CHAIN_SYNCER_COLLECT_CHAIN_001, @CHAIN_SYNCER_COLLECT_HEADERS_001, @CHAIN_SYNCER_VALIDATE_TIPSET_001 //stm: @CHAIN_SYNCER_NEW_PEER_HEAD_001, @CHAIN_SYNCER_VALIDATE_MESSAGE_META_001, @CHAIN_SYNCER_STOP_001 t.Skip("skipping as flaky #7152") if testing.Short() { t.Skip("skipping test in short mode") } kit.QuietMiningLogs() // For these tests where the block time is artificially short, just use // a deal start epoch that is guaranteed to be far enough in the future // so that the deal starts sealing in time startEpoch := abi.ChainEpoch(2 << 12) const ( graphsyncThrottle = 2 concurrency = 20 ) runTest := func(t *testing.T) { client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ConstructorOpts( node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(graphsyncThrottle, 0, graphsyncThrottle))), node.Override(new(dtypes.Graphsync), modules.Graphsync(graphsyncThrottle, graphsyncThrottle)), )) ens.InterconnectAll().BeginMining(250 * time.Millisecond) dh := kit.NewDealHarness(t, client, miner, miner) ctx, cancel := context.WithCancel(context.Background()) du, err := miner.MarketDataTransferUpdates(ctx) require.NoError(t, err) var maxOngoing int var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() ongoing := map[datatransfer.TransferID]struct{}{} for { select { case u := <-du: t.Logf("%d - %s", u.TransferID, datatransfer.Statuses[u.Status]) if u.Status == datatransfer.Ongoing && u.Transferred > 0 { ongoing[u.TransferID] = struct{}{} } else { delete(ongoing, u.TransferID) } if len(ongoing) > maxOngoing { maxOngoing = len(ongoing) } case <-ctx.Done(): return } } }() t.Logf("running concurrent deals: %d", concurrency) dh.RunConcurrentDeals(kit.RunConcurrentDealsOpts{ N: concurrency, FastRetrieval: true, StartEpoch: startEpoch, }) t.Logf("all deals finished") cancel() wg.Wait() // The eventing systems across go-data-transfer and go-graphsync // are racy, and that's why we can't enforce graphsyncThrottle exactly, // without making this test racy. // // Essentially what could happen is that the graphsync layer starts the // next transfer before the go-data-transfer FSM has the opportunity to // move the previously completed transfer to the next stage, thus giving // the appearance that more than graphsyncThrottle transfers are // in progress. // // Concurrency (20) is x10 higher than graphsyncThrottle (2), so if all // 20 transfers are not happening at once, we know the throttle is // in effect. Thus we are a little bit lenient here to account for the // above races and allow up to graphsyncThrottle*2. require.LessOrEqual(t, maxOngoing, graphsyncThrottle*2) } runTest(t) }