Miner SimultaneousTransfers config

This commit is contained in:
Łukasz Magiera 2021-06-28 11:39:01 +02:00
parent cefd140e45
commit 37c5dd5afc
3 changed files with 17 additions and 8 deletions

View File

@ -299,7 +299,7 @@ var ChainNode = Options(
Override(new(*dtypes.MpoolLocker), new(dtypes.MpoolLocker)), Override(new(*dtypes.MpoolLocker), new(dtypes.MpoolLocker)),
// Shared graphsync (markets, serving chain) // Shared graphsync (markets, serving chain)
Override(new(dtypes.Graphsync), modules.Graphsync(config.DefaultFullNode().Client.SimultaneousTransfers)), Override(new(dtypes.Graphsync), modules.Graphsync(config.DefaultSimultaneousTransfers)),
// Service: Wallet // Service: Wallet
Override(new(*messagesigner.MessageSigner), messagesigner.NewMessageSigner), Override(new(*messagesigner.MessageSigner), messagesigner.NewMessageSigner),
@ -403,7 +403,7 @@ var MinerNode = Options(
Override(new(dtypes.StagingMultiDstore), modules.StagingMultiDatastore), Override(new(dtypes.StagingMultiDstore), modules.StagingMultiDatastore),
Override(new(dtypes.StagingBlockstore), modules.StagingBlockstore), Override(new(dtypes.StagingBlockstore), modules.StagingBlockstore),
Override(new(dtypes.StagingDAG), modules.StagingDAG), Override(new(dtypes.StagingDAG), modules.StagingDAG),
Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync), Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(config.DefaultSimultaneousTransfers)),
Override(new(dtypes.ProviderPieceStore), modules.NewProviderPieceStore), Override(new(dtypes.ProviderPieceStore), modules.NewProviderPieceStore),
Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks), Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks),
@ -606,6 +606,8 @@ func ConfigStorageMiner(c interface{}) Option {
})), })),
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(&cfg.Fees, &cfg.Dealmaking)), Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(&cfg.Fees, &cfg.Dealmaking)),
Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(cfg.Dealmaking.SimultaneousTransfers)),
Override(new(sectorstorage.SealerConfig), cfg.Storage), Override(new(sectorstorage.SealerConfig), cfg.Storage),
Override(new(*storage.AddressSelector), modules.AddressSelector(&cfg.Addresses)), Override(new(*storage.AddressSelector), modules.AddressSelector(&cfg.Addresses)),
Override(new(*storage.Miner), modules.StorageMiner(cfg.Fees)), Override(new(*storage.Miner), modules.StorageMiner(cfg.Fees)),

View File

@ -78,6 +78,9 @@ type DealmakingConfig struct {
// as a multiplier of the minimum collateral bound // as a multiplier of the minimum collateral bound
MaxProviderCollateralMultiplier uint64 MaxProviderCollateralMultiplier uint64
// The maximum number of parallel online data transfers (storage+retrieval)
SimultaneousTransfers uint64
Filter string Filter string
RetrievalFilter string RetrievalFilter string
@ -362,6 +365,8 @@ func DefaultStorageMiner() *StorageMiner {
MaxDealsPerPublishMsg: 8, MaxDealsPerPublishMsg: 8,
MaxProviderCollateralMultiplier: 2, MaxProviderCollateralMultiplier: 2,
SimultaneousTransfers: DefaultSimultaneousTransfers,
RetrievalPricing: &RetrievalPricing{ RetrievalPricing: &RetrievalPricing{
Strategy: RetrievalPricingDefaultMode, Strategy: RetrievalPricingDefaultMode,
Default: &RetrievalPricingDefault{ Default: &RetrievalPricingDefault{

View File

@ -431,13 +431,15 @@ func StagingDAG(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBloc
// StagingGraphsync creates a graphsync instance which reads and writes blocks // StagingGraphsync creates a graphsync instance which reads and writes blocks
// to the StagingBlockstore // to the StagingBlockstore
func StagingGraphsync(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync { func StagingGraphsync(parallelTransfers uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync {
graphsyncNetwork := gsnet.NewFromLibp2pHost(h) return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync {
loader := storeutil.LoaderForBlockstore(ibs) graphsyncNetwork := gsnet.NewFromLibp2pHost(h)
storer := storeutil.StorerForBlockstore(ibs) loader := storeutil.LoaderForBlockstore(ibs)
gs := graphsync.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, loader, storer, graphsync.RejectAllRequestsByDefault()) storer := storeutil.StorerForBlockstore(ibs)
gs := graphsync.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, loader, storer, graphsync.RejectAllRequestsByDefault(), graphsync.MaxInProgressRequests(parallelTransfers))
return gs return gs
}
} }
func SetupBlockProducer(lc fx.Lifecycle, ds dtypes.MetadataDS, api v1api.FullNode, epp gen.WinningPoStProver, sf *slashfilter.SlashFilter, j journal.Journal) (*lotusminer.Miner, error) { func SetupBlockProducer(lc fx.Lifecycle, ds dtypes.MetadataDS, api v1api.FullNode, epp gen.WinningPoStProver, sf *slashfilter.SlashFilter, j journal.Journal) (*lotusminer.Miner, error) {