From 37c5dd5afc61199768b37dc521695f248bfab32f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 28 Jun 2021 11:39:01 +0200 Subject: [PATCH 1/2] Miner SimultaneousTransfers config --- node/builder.go | 6 ++++-- node/config/def.go | 5 +++++ node/modules/storageminer.go | 14 ++++++++------ 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/node/builder.go b/node/builder.go index c737e85b8..f5294b8a3 100644 --- a/node/builder.go +++ b/node/builder.go @@ -299,7 +299,7 @@ var ChainNode = Options( Override(new(*dtypes.MpoolLocker), new(dtypes.MpoolLocker)), // Shared graphsync (markets, serving chain) - Override(new(dtypes.Graphsync), modules.Graphsync(config.DefaultFullNode().Client.SimultaneousTransfers)), + Override(new(dtypes.Graphsync), modules.Graphsync(config.DefaultSimultaneousTransfers)), // Service: Wallet Override(new(*messagesigner.MessageSigner), messagesigner.NewMessageSigner), @@ -403,7 +403,7 @@ var MinerNode = Options( Override(new(dtypes.StagingMultiDstore), modules.StagingMultiDatastore), Override(new(dtypes.StagingBlockstore), modules.StagingBlockstore), Override(new(dtypes.StagingDAG), modules.StagingDAG), - Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync), + Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(config.DefaultSimultaneousTransfers)), Override(new(dtypes.ProviderPieceStore), modules.NewProviderPieceStore), Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks), @@ -606,6 +606,8 @@ func ConfigStorageMiner(c interface{}) Option { })), Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(&cfg.Fees, &cfg.Dealmaking)), + Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(cfg.Dealmaking.SimultaneousTransfers)), + Override(new(sectorstorage.SealerConfig), cfg.Storage), Override(new(*storage.AddressSelector), modules.AddressSelector(&cfg.Addresses)), Override(new(*storage.Miner), modules.StorageMiner(cfg.Fees)), diff --git a/node/config/def.go b/node/config/def.go index b2ed43ac6..b331b1f49 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -78,6 +78,9 @@ type DealmakingConfig struct { // as a multiplier of the minimum collateral bound MaxProviderCollateralMultiplier uint64 + // The maximum number of parallel online data transfers (storage+retrieval) + SimultaneousTransfers uint64 + Filter string RetrievalFilter string @@ -362,6 +365,8 @@ func DefaultStorageMiner() *StorageMiner { MaxDealsPerPublishMsg: 8, MaxProviderCollateralMultiplier: 2, + SimultaneousTransfers: DefaultSimultaneousTransfers, + RetrievalPricing: &RetrievalPricing{ Strategy: RetrievalPricingDefaultMode, Default: &RetrievalPricingDefault{ diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 5ed092d3c..09b1e2dfd 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -431,13 +431,15 @@ func StagingDAG(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBloc // StagingGraphsync creates a graphsync instance which reads and writes blocks // to the StagingBlockstore -func StagingGraphsync(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync { - graphsyncNetwork := gsnet.NewFromLibp2pHost(h) - loader := storeutil.LoaderForBlockstore(ibs) - storer := storeutil.StorerForBlockstore(ibs) - gs := graphsync.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, loader, storer, graphsync.RejectAllRequestsByDefault()) +func StagingGraphsync(parallelTransfers uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync { + graphsyncNetwork := gsnet.NewFromLibp2pHost(h) + loader := storeutil.LoaderForBlockstore(ibs) + storer := storeutil.StorerForBlockstore(ibs) + gs := graphsync.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, loader, storer, graphsync.RejectAllRequestsByDefault(), graphsync.MaxInProgressRequests(parallelTransfers)) - return gs + return gs + } } func SetupBlockProducer(lc fx.Lifecycle, ds dtypes.MetadataDS, api v1api.FullNode, epp gen.WinningPoStProver, sf *slashfilter.SlashFilter, j journal.Journal) (*lotusminer.Miner, error) { From e9dd3e86502d4337c905f5e20571d94e815b4aca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 28 Jun 2021 18:17:22 +0200 Subject: [PATCH 2/2] Test Miner SimultaneousTransfers --- itests/deals_concurrent_test.go | 78 +++++++++++++++++++++++++++++++++ node/builder.go | 8 ++-- 2 files changed, 82 insertions(+), 4 deletions(-) diff --git a/itests/deals_concurrent_test.go b/itests/deals_concurrent_test.go index 33a8218dd..44b25c7b3 100644 --- a/itests/deals_concurrent_test.go +++ b/itests/deals_concurrent_test.go @@ -1,12 +1,22 @@ package itests import ( + "context" "fmt" + "sync" "testing" "time" + "github.com/stretchr/testify/require" + + datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/itests/kit" + "github.com/filecoin-project/lotus/node" + "github.com/filecoin-project/lotus/node/modules" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/node/repo" ) func TestDealCyclesConcurrent(t *testing.T) { @@ -47,3 +57,71 @@ func TestDealCyclesConcurrent(t *testing.T) { t.Run(ns+"-stdretrieval-NoCAR", func(t *testing.T) { runTest(t, n, false, false) }) } } + +func TestSimultenousTransferLimit(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode") + } + + kit.QuietMiningLogs() + + blockTime := 10 * time.Millisecond + + // For these tests where the block time is artificially short, just use + // a deal start epoch that is guaranteed to be far enough in the future + // so that the deal starts sealing in time + startEpoch := abi.ChainEpoch(2 << 12) + + runTest := func(t *testing.T) { + client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ConstructorOpts( + node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(2))), + )) + ens.InterconnectAll().BeginMining(blockTime) + dh := kit.NewDealHarness(t, client, miner) + + ctx, cancel := context.WithCancel(context.Background()) + + du, err := miner.MarketDataTransferUpdates(ctx) + require.NoError(t, err) + + var maxOngoing int + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + + ongoing := map[datatransfer.TransferID]struct{}{} + + for { + select { + case u := <-du: + t.Logf("%d - %s", u.TransferID, datatransfer.Statuses[u.Status]) + if u.Status == datatransfer.Ongoing { + ongoing[u.TransferID] = struct{}{} + } else { + delete(ongoing, u.TransferID) + } + + if len(ongoing) > maxOngoing { + maxOngoing = len(ongoing) + } + case <-ctx.Done(): + return + } + } + }() + + dh.RunConcurrentDeals(kit.RunConcurrentDealsOpts{ + N: 1, // TODO: set to 20 after https://github.com/ipfs/go-graphsync/issues/175 is fixed + FastRetrieval: true, + StartEpoch: startEpoch, + }) + + cancel() + wg.Wait() + + require.LessOrEqual(t, maxOngoing, 2) + } + + runTest(t) +} diff --git a/node/builder.go b/node/builder.go index f5294b8a3..884261a89 100644 --- a/node/builder.go +++ b/node/builder.go @@ -238,7 +238,7 @@ var LibP2P = Options( Override(ConnGaterKey, lp2p.ConnGaterOption), ) -func isType(t repo.RepoType) func(s *Settings) bool { +func IsType(t repo.RepoType) func(s *Settings) bool { return func(s *Settings) bool { return s.nodeType == t } } @@ -468,7 +468,7 @@ func Online() Option { LibP2P, ApplyIf(isFullOrLiteNode, ChainNode), - ApplyIf(isType(repo.StorageMiner), MinerNode), + ApplyIf(IsType(repo.StorageMiner), MinerNode), ) } @@ -680,8 +680,8 @@ func Repo(r repo.Repo) Option { Override(new(*dtypes.APIAlg), modules.APISecret), - ApplyIf(isType(repo.FullNode), ConfigFullNode(c)), - ApplyIf(isType(repo.StorageMiner), ConfigStorageMiner(c)), + ApplyIf(IsType(repo.FullNode), ConfigFullNode(c)), + ApplyIf(IsType(repo.StorageMiner), ConfigStorageMiner(c)), )(settings) } }