feat(graphsync): allow setting of per-peer incoming requests for miners

This commit is contained in:
Rod Vagg 2021-10-28 22:39:57 +11:00
parent 690be5bf7f
commit 9e7d9affbe
No known key found for this signature in database
GPG Key ID: C273792F7D83545D
8 changed files with 39 additions and 8 deletions

View File

@ -207,6 +207,17 @@
# env var: LOTUS_DEALMAKING_SIMULTANEOUSTRANSFERSFORSTORAGE
#SimultaneousTransfersForStorage = 20
# The maximum number of simultaneous data transfers from any single client
# for storage deals.
# Unset by default (0), and values higher than SimultaneousTransfersForStorage
# will have no effect; i.e. the total number of simultaneous data transfers
# across all storage clients is bound by SimultaneousTransfersForStorage
# regardless of this number.
#
# type: uint64
# env var: LOTUS_DEALMAKING_SIMULTANEOUSTRANSFERSFORSTORAGEPERCLIENT
#SimultaneousTransfersForStoragePerClient = 0
# The maximum number of parallel online data transfers for retrieval deals
#
# type: uint64

View File

@ -139,7 +139,7 @@ func TestSimultanenousTransferLimit(t *testing.T) {
	)
	runTest := func(t *testing.T) {
		client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ConstructorOpts(
			node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(graphsyncThrottle, 0, graphsyncThrottle))),
			node.Override(new(dtypes.Graphsync), modules.Graphsync(graphsyncThrottle, graphsyncThrottle)),
		))
		ens.InterconnectAll().BeginMining(250 * time.Millisecond)

View File

@ -136,7 +136,7 @@ func ConfigStorageMiner(c interface{}) Option {
		If(cfg.Subsystems.EnableMarkets,
			// Markets
			Override(new(dtypes.StagingBlockstore), modules.StagingBlockstore),
			Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(cfg.Dealmaking.SimultaneousTransfersForStorage, cfg.Dealmaking.SimultaneousTransfersForStoragePerClient, cfg.Dealmaking.SimultaneousTransfersForRetrieval)),
			Override(new(dtypes.ProviderPieceStore), modules.NewProviderPieceStore),
			Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks),

View File

@ -160,8 +160,9 @@ func DefaultStorageMiner() *StorageMiner {
			MaxDealsPerPublishMsg:                    8,
			MaxProviderCollateralMultiplier:          2,
			SimultaneousTransfersForStorage:          DefaultSimultaneousTransfers,
			SimultaneousTransfersForStoragePerClient: 0,
			SimultaneousTransfersForRetrieval:        DefaultSimultaneousTransfers,
			StartEpochSealingBuffer: 480, // 480 epochs buffer == 4 hours from adding deal to sector to sector being sealed

View File

@ -272,6 +272,17 @@ passed to the sealing node by the markets service. 0 is unlimited.`,
			Comment: `The maximum number of parallel online data transfers for storage deals`,
		},
{
Name: "SimultaneousTransfersForStoragePerClient",
Type: "uint64",
Comment: `The maximum number of simultaneous data transfers from any single client
for storage deals.
Unset by default (0), and values higher than SimultaneousTransfersForStorage
will have no effect; i.e. the total number of simultaneous data transfers
across all storage clients is bound by SimultaneousTransfersForStorage
regardless of this number.`,
},
		{
			Name: "SimultaneousTransfersForRetrieval",
			Type: "uint64",

View File

@ -131,6 +131,13 @@ type DealmakingConfig struct {
	MaxStagingDealsBytes int64
	// The maximum number of parallel online data transfers for storage deals
	SimultaneousTransfersForStorage uint64
// The maximum number of simultaneous data transfers from any single client
// for storage deals.
// Unset by default (0), and values higher than SimultaneousTransfersForStorage
// will have no effect; i.e. the total number of simultaneous data transfers
// across all storage clients is bound by SimultaneousTransfersForStorage
// regardless of this number.
SimultaneousTransfersForStoragePerClient uint64
	// The maximum number of parallel online data transfers for retrieval deals
	SimultaneousTransfersForRetrieval uint64
	// Minimum start epoch buffer to give time for sealing of sector with deal.

View File

@ -39,7 +39,6 @@ import (
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace" "github.com/ipfs/go-datastore/namespace"
graphsync "github.com/ipfs/go-graphsync/impl" graphsync "github.com/ipfs/go-graphsync/impl"
graphsyncimpl "github.com/ipfs/go-graphsync/impl"
gsnet "github.com/ipfs/go-graphsync/network" gsnet "github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/storeutil" "github.com/ipfs/go-graphsync/storeutil"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
@ -396,7 +395,7 @@ func StagingBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRe
// StagingGraphsync creates a graphsync instance which reads and writes blocks
// to the StagingBlockstore
func StagingGraphsync(parallelTransfersForStorage uint64, parallelTransfersForStoragePerPeer uint64, parallelTransfersForRetrieval uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync {
	return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync {
		graphsyncNetwork := gsnet.NewFromLibp2pHost(h)
		lsys := storeutil.LinkSystemForBlockstore(ibs)
@ -405,9 +404,10 @@ func StagingGraphsync(parallelTransfersForStorage uint64, parallelTransfersForRe
			lsys,
			graphsync.RejectAllRequestsByDefault(),
			graphsync.MaxInProgressIncomingRequests(parallelTransfersForRetrieval),
			graphsync.MaxInProgressIncomingRequestsPerPeer(parallelTransfersForStoragePerPeer),
			graphsync.MaxInProgressOutgoingRequests(parallelTransfersForStorage),
			graphsync.MaxLinksPerIncomingRequests(config.MaxTraversalLinks),
			graphsync.MaxLinksPerOutgoingRequests(config.MaxTraversalLinks))
		graphsyncStats(mctx, lc, gs)

View File

@ -21,6 +21,7 @@ ListenAddresses = ["/ip4/0.0.0.0/tcp/5678", "/ip6/::/tcp/5678"]
# IpfsMAddr = "" # IpfsMAddr = ""
# IpfsUseForRetrieval = false # IpfsUseForRetrieval = false
# SimultaneousTransfersForStorage = 20 # SimultaneousTransfersForStorage = 20
# SimultaneousTransfersForStoragePerClient = 0
# SimultaneousTransfersForRetrieval = 20 # SimultaneousTransfersForRetrieval = 20
# #
[Metrics] [Metrics]