diff --git a/documentation/en/default-lotus-miner-config.toml b/documentation/en/default-lotus-miner-config.toml
index d402f65ed..b034698a2 100644
--- a/documentation/en/default-lotus-miner-config.toml
+++ b/documentation/en/default-lotus-miner-config.toml
@@ -207,6 +207,17 @@
   # env var: LOTUS_DEALMAKING_SIMULTANEOUSTRANSFERSFORSTORAGE
   #SimultaneousTransfersForStorage = 20
 
+  # The maximum number of simultaneous data transfers from any single client
+  # for storage deals.
+  # Unset by default (0), and values higher than SimultaneousTransfersForStorage
+  # will have no effect; i.e. the total number of simultaneous data transfers
+  # across all storage clients is bound by SimultaneousTransfersForStorage
+  # regardless of this number.
+  #
+  # type: uint64
+  # env var: LOTUS_DEALMAKING_SIMULTANEOUSTRANSFERSFORSTORAGEPERCLIENT
+  #SimultaneousTransfersForStoragePerClient = 0
+
   # The maximum number of parallel online data transfers for retrieval deals
   #
   # type: uint64
diff --git a/itests/deals_concurrent_test.go b/itests/deals_concurrent_test.go
index c0458e8d1..49a8bb008 100644
--- a/itests/deals_concurrent_test.go
+++ b/itests/deals_concurrent_test.go
@@ -139,7 +139,7 @@ func TestSimultanenousTransferLimit(t *testing.T) {
 	)
 	runTest := func(t *testing.T) {
 		client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ConstructorOpts(
-			node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(graphsyncThrottle, graphsyncThrottle))),
+			node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(graphsyncThrottle, 0, graphsyncThrottle))),
 			node.Override(new(dtypes.Graphsync), modules.Graphsync(graphsyncThrottle, graphsyncThrottle)),
 		))
 		ens.InterconnectAll().BeginMining(250 * time.Millisecond)
diff --git a/node/builder_miner.go b/node/builder_miner.go
index 74b0c5558..c8a04255f 100644
--- a/node/builder_miner.go
+++ b/node/builder_miner.go
@@ -136,7 +136,7 @@ func ConfigStorageMiner(c interface{}) Option {
 		If(cfg.Subsystems.EnableMarkets,
 			// Markets
 			Override(new(dtypes.StagingBlockstore), modules.StagingBlockstore),
-			Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(cfg.Dealmaking.SimultaneousTransfersForStorage, cfg.Dealmaking.SimultaneousTransfersForRetrieval)),
+			Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(cfg.Dealmaking.SimultaneousTransfersForStorage, cfg.Dealmaking.SimultaneousTransfersForStoragePerClient, cfg.Dealmaking.SimultaneousTransfersForRetrieval)),
 			Override(new(dtypes.ProviderPieceStore), modules.NewProviderPieceStore),
 			Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks),
 
diff --git a/node/config/def.go b/node/config/def.go
index 735107e29..4a525e697 100644
--- a/node/config/def.go
+++ b/node/config/def.go
@@ -160,8 +160,9 @@ func DefaultStorageMiner() *StorageMiner {
 			MaxDealsPerPublishMsg:           8,
 			MaxProviderCollateralMultiplier: 2,
 
-			SimultaneousTransfersForStorage:   DefaultSimultaneousTransfers,
-			SimultaneousTransfersForRetrieval: DefaultSimultaneousTransfers,
+			SimultaneousTransfersForStorage:          DefaultSimultaneousTransfers,
+			SimultaneousTransfersForStoragePerClient: 0,
+			SimultaneousTransfersForRetrieval:        DefaultSimultaneousTransfers,
 
 			StartEpochSealingBuffer: 480, // 480 epochs buffer == 4 hours from adding deal to sector to sector being sealed
 
diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go
index 296501edc..eded0b1fe 100644
--- a/node/config/doc_gen.go
+++ b/node/config/doc_gen.go
@@ -272,6 +272,17 @@ passed to the sealing node by the markets service. 0 is unlimited.`,
 
 			Comment: `The maximum number of parallel online data transfers for storage deals`,
 		},
+		{
+			Name: "SimultaneousTransfersForStoragePerClient",
+			Type: "uint64",
+
+			Comment: `The maximum number of simultaneous data transfers from any single client
+for storage deals.
+Unset by default (0), and values higher than SimultaneousTransfersForStorage
+will have no effect; i.e. the total number of simultaneous data transfers
+across all storage clients is bound by SimultaneousTransfersForStorage
+regardless of this number.`,
+		},
 		{
 			Name: "SimultaneousTransfersForRetrieval",
 			Type: "uint64",
diff --git a/node/config/types.go b/node/config/types.go
index 1be40029e..2ae2d8eee 100644
--- a/node/config/types.go
+++ b/node/config/types.go
@@ -131,6 +131,13 @@ type DealmakingConfig struct {
 	MaxStagingDealsBytes int64
 	// The maximum number of parallel online data transfers for storage deals
 	SimultaneousTransfersForStorage uint64
+	// The maximum number of simultaneous data transfers from any single client
+	// for storage deals.
+	// Unset by default (0), and values higher than SimultaneousTransfersForStorage
+	// will have no effect; i.e. the total number of simultaneous data transfers
+	// across all storage clients is bound by SimultaneousTransfersForStorage
+	// regardless of this number.
+	SimultaneousTransfersForStoragePerClient uint64
 	// The maximum number of parallel online data transfers for retrieval deals
 	SimultaneousTransfersForRetrieval uint64
 	// Minimum start epoch buffer to give time for sealing of sector with deal.
diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go
index 3eb6f3c3c..552f43660 100644
--- a/node/modules/storageminer.go
+++ b/node/modules/storageminer.go
@@ -39,7 +39,6 @@ import (
 	"github.com/ipfs/go-datastore"
 	"github.com/ipfs/go-datastore/namespace"
 	graphsync "github.com/ipfs/go-graphsync/impl"
-	graphsyncimpl "github.com/ipfs/go-graphsync/impl"
 	gsnet "github.com/ipfs/go-graphsync/network"
 	"github.com/ipfs/go-graphsync/storeutil"
 	"github.com/libp2p/go-libp2p-core/host"
@@ -396,7 +395,7 @@ func StagingBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRe
 
 // StagingGraphsync creates a graphsync instance which reads and writes blocks
 // to the StagingBlockstore
-func StagingGraphsync(parallelTransfersForStorage uint64, parallelTransfersForRetrieval uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync {
+func StagingGraphsync(parallelTransfersForStorage uint64, parallelTransfersForStoragePerPeer uint64, parallelTransfersForRetrieval uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync {
 	return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync {
 		graphsyncNetwork := gsnet.NewFromLibp2pHost(h)
 		lsys := storeutil.LinkSystemForBlockstore(ibs)
@@ -405,9 +404,10 @@ func StagingGraphsync(parallelTransfersForStorage uint64, parallelTransfersForRe
 			lsys,
 			graphsync.RejectAllRequestsByDefault(),
 			graphsync.MaxInProgressIncomingRequests(parallelTransfersForRetrieval),
+			graphsync.MaxInProgressIncomingRequestsPerPeer(parallelTransfersForStoragePerPeer),
 			graphsync.MaxInProgressOutgoingRequests(parallelTransfersForStorage),
-			graphsyncimpl.MaxLinksPerIncomingRequests(config.MaxTraversalLinks),
-			graphsyncimpl.MaxLinksPerOutgoingRequests(config.MaxTraversalLinks))
+			graphsync.MaxLinksPerIncomingRequests(config.MaxTraversalLinks),
+			graphsync.MaxLinksPerOutgoingRequests(config.MaxTraversalLinks))
 
 		graphsyncStats(mctx, lc, gs)
 
diff --git a/tools/packer/repo/config.toml b/tools/packer/repo/config.toml
index 900dad218..380d5a28f 100644
--- a/tools/packer/repo/config.toml
+++ b/tools/packer/repo/config.toml
@@ -21,6 +21,7 @@ ListenAddresses = ["/ip4/0.0.0.0/tcp/5678", "/ip6/::/tcp/5678"]
 # IpfsMAddr = ""
 # IpfsUseForRetrieval = false
 # SimultaneousTransfersForStorage = 20
+# SimultaneousTransfersForStoragePerClient = 0
 # SimultaneousTransfersForRetrieval = 20
 
 # [Metrics]
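
A minimal usage sketch (illustrative values, not part of the patch): with this change applied, an operator could cap any single client at 5 concurrent storage-deal transfers while keeping the global cap of 20, via the [Dealmaking] section of the miner's config.toml:

    [Dealmaking]
      # Global cap across all storage clients (default 20).
      SimultaneousTransfersForStorage = 20
      # Per-client cap; only effective when <= SimultaneousTransfersForStorage.
      SimultaneousTransfersForStoragePerClient = 5

or equivalently via the environment variable LOTUS_DEALMAKING_SIMULTANEOUSTRANSFERSFORSTORAGEPERCLIENT documented above. Leaving the option at its default of 0 preserves the previous behavior, where only the global limit applies.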