Merge pull request #7578 from filecoin-project/rvagg/SimultaneousTransfersForStoragePerClient

feat(graphsync): allow setting of per-peer incoming requests for miners
Łukasz Magiera 2021-12-17 14:27:15 +01:00 committed by GitHub
commit a4728d3c72
8 changed files with 39 additions and 8 deletions


@@ -207,6 +207,17 @@
# env var: LOTUS_DEALMAKING_SIMULTANEOUSTRANSFERSFORSTORAGE
#SimultaneousTransfersForStorage = 20
+
+# The maximum number of simultaneous data transfers from any single client
+# for storage deals.
+# Unset by default (0), and values higher than SimultaneousTransfersForStorage
+# will have no effect; i.e. the total number of simultaneous data transfers
+# across all storage clients is bound by SimultaneousTransfersForStorage
+# regardless of this number.
+#
+# type: uint64
+# env var: LOTUS_DEALMAKING_SIMULTANEOUSTRANSFERSFORSTORAGEPERCLIENT
+#SimultaneousTransfersForStoragePerClient = 0
# The maximum number of parallel online data transfers for retrieval deals
#
# type: uint64
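Combined with the existing global limit, an operator who keeps the default of 20 concurrent storage transfers but wants to stop a single client from occupying every slot might set the following in the [Dealmaking] section of the miner's config.toml (values illustrative, not defaults; the env vars above work equivalently):

[Dealmaking]
  SimultaneousTransfersForStorage = 20
  # allow any one client at most 5 of the 20 transfer slots
  SimultaneousTransfersForStoragePerClient = 5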


@@ -139,7 +139,7 @@ func TestSimultanenousTransferLimit(t *testing.T) {
)
runTest := func(t *testing.T) {
client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ConstructorOpts(
-node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(graphsyncThrottle, graphsyncThrottle))),
+node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(graphsyncThrottle, 0, graphsyncThrottle))),
node.Override(new(dtypes.Graphsync), modules.Graphsync(graphsyncThrottle, graphsyncThrottle)),
))
ens.InterconnectAll().BeginMining(250 * time.Millisecond)
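The only change above is the new middle argument to modules.StagingGraphsync: 0 leaves the per-client cap disabled, so the test's existing throttle behaviour is unchanged. A test exercising the cap itself would only need a nonzero middle value, e.g. (hypothetical variant, not part of this change):

node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(graphsyncThrottle, 1, graphsyncThrottle))),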


@@ -136,7 +136,7 @@ func ConfigStorageMiner(c interface{}) Option {
If(cfg.Subsystems.EnableMarkets,
// Markets
Override(new(dtypes.StagingBlockstore), modules.StagingBlockstore),
-Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(cfg.Dealmaking.SimultaneousTransfersForStorage, cfg.Dealmaking.SimultaneousTransfersForRetrieval)),
+Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(cfg.Dealmaking.SimultaneousTransfersForStorage, cfg.Dealmaking.SimultaneousTransfersForStoragePerClient, cfg.Dealmaking.SimultaneousTransfersForRetrieval)),
Override(new(dtypes.ProviderPieceStore), modules.NewProviderPieceStore),
Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks),


@@ -160,8 +160,9 @@ func DefaultStorageMiner() *StorageMiner {
MaxDealsPerPublishMsg: 8,
MaxProviderCollateralMultiplier: 2,
-SimultaneousTransfersForStorage: DefaultSimultaneousTransfers,
-SimultaneousTransfersForRetrieval: DefaultSimultaneousTransfers,
+SimultaneousTransfersForStorage: DefaultSimultaneousTransfers,
+SimultaneousTransfersForStoragePerClient: 0,
+SimultaneousTransfersForRetrieval: DefaultSimultaneousTransfers,
StartEpochSealingBuffer: 480, // 480 epochs buffer == 4 hours from adding deal to sector to sector being sealed


@@ -272,6 +272,17 @@ passed to the sealing node by the markets service. 0 is unlimited.`,
Comment: `The maximum number of parallel online data transfers for storage deals`,
},
+{
+Name: "SimultaneousTransfersForStoragePerClient",
+Type: "uint64",
+
+Comment: `The maximum number of simultaneous data transfers from any single client
+for storage deals.
+Unset by default (0), and values higher than SimultaneousTransfersForStorage
+will have no effect; i.e. the total number of simultaneous data transfers
+across all storage clients is bound by SimultaneousTransfersForStorage
+regardless of this number.`,
+},
{
Name: "SimultaneousTransfersForRetrieval",
Type: "uint64",


@@ -131,6 +131,13 @@ type DealmakingConfig struct {
MaxStagingDealsBytes int64
// The maximum number of parallel online data transfers for storage deals
SimultaneousTransfersForStorage uint64
+// The maximum number of simultaneous data transfers from any single client
+// for storage deals.
+// Unset by default (0), and values higher than SimultaneousTransfersForStorage
+// will have no effect; i.e. the total number of simultaneous data transfers
+// across all storage clients is bound by SimultaneousTransfersForStorage
+// regardless of this number.
+SimultaneousTransfersForStoragePerClient uint64
// The maximum number of parallel online data transfers for retrieval deals
SimultaneousTransfersForRetrieval uint64
// Minimum start epoch buffer to give time for sealing of sector with deal.
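As a usage sketch, code that wires the markets subsystem by hand would thread all three throttles through in the same order as the ConfigStorageMiner change above (field values illustrative; other DealmakingConfig fields omitted):

cfg := config.DealmakingConfig{
	SimultaneousTransfersForStorage:          20,
	SimultaneousTransfersForStoragePerClient: 5,
	SimultaneousTransfersForRetrieval:        20,
}
// modules.StagingGraphsync returns a constructor consumed by the fx dependency graph
gsCtor := modules.StagingGraphsync(
	cfg.SimultaneousTransfersForStorage,
	cfg.SimultaneousTransfersForStoragePerClient,
	cfg.SimultaneousTransfersForRetrieval,
)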


@@ -39,7 +39,6 @@ import (
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
graphsync "github.com/ipfs/go-graphsync/impl"
-graphsyncimpl "github.com/ipfs/go-graphsync/impl"
gsnet "github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/storeutil"
"github.com/libp2p/go-libp2p-core/host"
@@ -396,7 +395,7 @@ func StagingBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRe
// StagingGraphsync creates a graphsync instance which reads and writes blocks
// to the StagingBlockstore
-func StagingGraphsync(parallelTransfersForStorage uint64, parallelTransfersForRetrieval uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync {
+func StagingGraphsync(parallelTransfersForStorage uint64, parallelTransfersForStoragePerPeer uint64, parallelTransfersForRetrieval uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync {
graphsyncNetwork := gsnet.NewFromLibp2pHost(h)
lsys := storeutil.LinkSystemForBlockstore(ibs)
@@ -405,9 +404,10 @@ func StagingGraphsync(parallelTransfersForStorage uint64, parallelTransfersForRe
lsys,
graphsync.RejectAllRequestsByDefault(),
graphsync.MaxInProgressIncomingRequests(parallelTransfersForRetrieval),
+graphsync.MaxInProgressIncomingRequestsPerPeer(parallelTransfersForStoragePerPeer),
graphsync.MaxInProgressOutgoingRequests(parallelTransfersForStorage),
-graphsyncimpl.MaxLinksPerIncomingRequests(config.MaxTraversalLinks),
-graphsyncimpl.MaxLinksPerOutgoingRequests(config.MaxTraversalLinks))
+graphsync.MaxLinksPerIncomingRequests(config.MaxTraversalLinks),
+graphsync.MaxLinksPerOutgoingRequests(config.MaxTraversalLinks))
graphsyncStats(mctx, lc, gs)
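Note that the per-peer option is passed unconditionally, which assumes go-graphsync treats a value of 0 as "no per-peer limit" (consistent with the "Unset by default (0)" wording in the config docs). Wiring that wants to make that assumption explicit could build the option list conditionally; a minimal sketch using the same graphsync (impl) import as above (helper name hypothetical):

// stagingGraphsyncOptions mirrors the throttle options above, adding the
// per-peer incoming cap only when the operator has actually configured one.
func stagingGraphsyncOptions(storageTotal, storagePerPeer, retrievalTotal uint64) []graphsync.Option {
	opts := []graphsync.Option{
		graphsync.RejectAllRequestsByDefault(),
		graphsync.MaxInProgressIncomingRequests(retrievalTotal),
		graphsync.MaxInProgressOutgoingRequests(storageTotal),
	}
	if storagePerPeer > 0 {
		opts = append(opts, graphsync.MaxInProgressIncomingRequestsPerPeer(storagePerPeer))
	}
	return opts
}

The resulting slice would then be spread into graphsync.New(ctx, graphsyncNetwork, lsys, opts...).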


@@ -21,6 +21,7 @@ ListenAddresses = ["/ip4/0.0.0.0/tcp/5678", "/ip6/::/tcp/5678"]
# IpfsMAddr = ""
# IpfsUseForRetrieval = false
# SimultaneousTransfersForStorage = 20
+# SimultaneousTransfersForStoragePerClient = 0
# SimultaneousTransfersForRetrieval = 20
#
[Metrics]