Merge pull request #6612 from filecoin-project/feat/miner-par-transfers-cfg
Miner SimultaneousTransfers config
This commit is contained in:
commit
49a709a123
@ -1,12 +1,22 @@
|
|||||||
package itests
|
package itests
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
datatransfer "github.com/filecoin-project/go-data-transfer"
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/itests/kit"
|
"github.com/filecoin-project/lotus/itests/kit"
|
||||||
|
"github.com/filecoin-project/lotus/node"
|
||||||
|
"github.com/filecoin-project/lotus/node/modules"
|
||||||
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestDealCyclesConcurrent(t *testing.T) {
|
func TestDealCyclesConcurrent(t *testing.T) {
|
||||||
@ -47,3 +57,71 @@ func TestDealCyclesConcurrent(t *testing.T) {
|
|||||||
t.Run(ns+"-stdretrieval-NoCAR", func(t *testing.T) { runTest(t, n, false, false) })
|
t.Run(ns+"-stdretrieval-NoCAR", func(t *testing.T) { runTest(t, n, false, false) })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSimultenousTransferLimit(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("skipping test in short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
kit.QuietMiningLogs()
|
||||||
|
|
||||||
|
blockTime := 10 * time.Millisecond
|
||||||
|
|
||||||
|
// For these tests where the block time is artificially short, just use
|
||||||
|
// a deal start epoch that is guaranteed to be far enough in the future
|
||||||
|
// so that the deal starts sealing in time
|
||||||
|
startEpoch := abi.ChainEpoch(2 << 12)
|
||||||
|
|
||||||
|
runTest := func(t *testing.T) {
|
||||||
|
client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ConstructorOpts(
|
||||||
|
node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(2))),
|
||||||
|
))
|
||||||
|
ens.InterconnectAll().BeginMining(blockTime)
|
||||||
|
dh := kit.NewDealHarness(t, client, miner)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
du, err := miner.MarketDataTransferUpdates(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var maxOngoing int
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
ongoing := map[datatransfer.TransferID]struct{}{}
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case u := <-du:
|
||||||
|
t.Logf("%d - %s", u.TransferID, datatransfer.Statuses[u.Status])
|
||||||
|
if u.Status == datatransfer.Ongoing {
|
||||||
|
ongoing[u.TransferID] = struct{}{}
|
||||||
|
} else {
|
||||||
|
delete(ongoing, u.TransferID)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(ongoing) > maxOngoing {
|
||||||
|
maxOngoing = len(ongoing)
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
dh.RunConcurrentDeals(kit.RunConcurrentDealsOpts{
|
||||||
|
N: 1, // TODO: set to 20 after https://github.com/ipfs/go-graphsync/issues/175 is fixed
|
||||||
|
FastRetrieval: true,
|
||||||
|
StartEpoch: startEpoch,
|
||||||
|
})
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
require.LessOrEqual(t, maxOngoing, 2)
|
||||||
|
}
|
||||||
|
|
||||||
|
runTest(t)
|
||||||
|
}
|
||||||
|
@ -238,7 +238,7 @@ var LibP2P = Options(
|
|||||||
Override(ConnGaterKey, lp2p.ConnGaterOption),
|
Override(ConnGaterKey, lp2p.ConnGaterOption),
|
||||||
)
|
)
|
||||||
|
|
||||||
func isType(t repo.RepoType) func(s *Settings) bool {
|
func IsType(t repo.RepoType) func(s *Settings) bool {
|
||||||
return func(s *Settings) bool { return s.nodeType == t }
|
return func(s *Settings) bool { return s.nodeType == t }
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -299,7 +299,7 @@ var ChainNode = Options(
|
|||||||
Override(new(*dtypes.MpoolLocker), new(dtypes.MpoolLocker)),
|
Override(new(*dtypes.MpoolLocker), new(dtypes.MpoolLocker)),
|
||||||
|
|
||||||
// Shared graphsync (markets, serving chain)
|
// Shared graphsync (markets, serving chain)
|
||||||
Override(new(dtypes.Graphsync), modules.Graphsync(config.DefaultFullNode().Client.SimultaneousTransfers)),
|
Override(new(dtypes.Graphsync), modules.Graphsync(config.DefaultSimultaneousTransfers)),
|
||||||
|
|
||||||
// Service: Wallet
|
// Service: Wallet
|
||||||
Override(new(*messagesigner.MessageSigner), messagesigner.NewMessageSigner),
|
Override(new(*messagesigner.MessageSigner), messagesigner.NewMessageSigner),
|
||||||
@ -403,7 +403,7 @@ var MinerNode = Options(
|
|||||||
Override(new(dtypes.StagingMultiDstore), modules.StagingMultiDatastore),
|
Override(new(dtypes.StagingMultiDstore), modules.StagingMultiDatastore),
|
||||||
Override(new(dtypes.StagingBlockstore), modules.StagingBlockstore),
|
Override(new(dtypes.StagingBlockstore), modules.StagingBlockstore),
|
||||||
Override(new(dtypes.StagingDAG), modules.StagingDAG),
|
Override(new(dtypes.StagingDAG), modules.StagingDAG),
|
||||||
Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync),
|
Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(config.DefaultSimultaneousTransfers)),
|
||||||
Override(new(dtypes.ProviderPieceStore), modules.NewProviderPieceStore),
|
Override(new(dtypes.ProviderPieceStore), modules.NewProviderPieceStore),
|
||||||
Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks),
|
Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks),
|
||||||
|
|
||||||
@ -468,7 +468,7 @@ func Online() Option {
|
|||||||
LibP2P,
|
LibP2P,
|
||||||
|
|
||||||
ApplyIf(isFullOrLiteNode, ChainNode),
|
ApplyIf(isFullOrLiteNode, ChainNode),
|
||||||
ApplyIf(isType(repo.StorageMiner), MinerNode),
|
ApplyIf(IsType(repo.StorageMiner), MinerNode),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -606,6 +606,8 @@ func ConfigStorageMiner(c interface{}) Option {
|
|||||||
})),
|
})),
|
||||||
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(&cfg.Fees, &cfg.Dealmaking)),
|
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(&cfg.Fees, &cfg.Dealmaking)),
|
||||||
|
|
||||||
|
Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(cfg.Dealmaking.SimultaneousTransfers)),
|
||||||
|
|
||||||
Override(new(sectorstorage.SealerConfig), cfg.Storage),
|
Override(new(sectorstorage.SealerConfig), cfg.Storage),
|
||||||
Override(new(*storage.AddressSelector), modules.AddressSelector(&cfg.Addresses)),
|
Override(new(*storage.AddressSelector), modules.AddressSelector(&cfg.Addresses)),
|
||||||
Override(new(*storage.Miner), modules.StorageMiner(cfg.Fees)),
|
Override(new(*storage.Miner), modules.StorageMiner(cfg.Fees)),
|
||||||
@ -678,8 +680,8 @@ func Repo(r repo.Repo) Option {
|
|||||||
|
|
||||||
Override(new(*dtypes.APIAlg), modules.APISecret),
|
Override(new(*dtypes.APIAlg), modules.APISecret),
|
||||||
|
|
||||||
ApplyIf(isType(repo.FullNode), ConfigFullNode(c)),
|
ApplyIf(IsType(repo.FullNode), ConfigFullNode(c)),
|
||||||
ApplyIf(isType(repo.StorageMiner), ConfigStorageMiner(c)),
|
ApplyIf(IsType(repo.StorageMiner), ConfigStorageMiner(c)),
|
||||||
)(settings)
|
)(settings)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -78,6 +78,9 @@ type DealmakingConfig struct {
|
|||||||
// as a multiplier of the minimum collateral bound
|
// as a multiplier of the minimum collateral bound
|
||||||
MaxProviderCollateralMultiplier uint64
|
MaxProviderCollateralMultiplier uint64
|
||||||
|
|
||||||
|
// The maximum number of parallel online data transfers (storage+retrieval)
|
||||||
|
SimultaneousTransfers uint64
|
||||||
|
|
||||||
Filter string
|
Filter string
|
||||||
RetrievalFilter string
|
RetrievalFilter string
|
||||||
|
|
||||||
@ -362,6 +365,8 @@ func DefaultStorageMiner() *StorageMiner {
|
|||||||
MaxDealsPerPublishMsg: 8,
|
MaxDealsPerPublishMsg: 8,
|
||||||
MaxProviderCollateralMultiplier: 2,
|
MaxProviderCollateralMultiplier: 2,
|
||||||
|
|
||||||
|
SimultaneousTransfers: DefaultSimultaneousTransfers,
|
||||||
|
|
||||||
RetrievalPricing: &RetrievalPricing{
|
RetrievalPricing: &RetrievalPricing{
|
||||||
Strategy: RetrievalPricingDefaultMode,
|
Strategy: RetrievalPricingDefaultMode,
|
||||||
Default: &RetrievalPricingDefault{
|
Default: &RetrievalPricingDefault{
|
||||||
|
@ -431,14 +431,16 @@ func StagingDAG(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBloc
|
|||||||
|
|
||||||
// StagingGraphsync creates a graphsync instance which reads and writes blocks
|
// StagingGraphsync creates a graphsync instance which reads and writes blocks
|
||||||
// to the StagingBlockstore
|
// to the StagingBlockstore
|
||||||
func StagingGraphsync(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync {
|
func StagingGraphsync(parallelTransfers uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync {
|
||||||
|
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync {
|
||||||
graphsyncNetwork := gsnet.NewFromLibp2pHost(h)
|
graphsyncNetwork := gsnet.NewFromLibp2pHost(h)
|
||||||
loader := storeutil.LoaderForBlockstore(ibs)
|
loader := storeutil.LoaderForBlockstore(ibs)
|
||||||
storer := storeutil.StorerForBlockstore(ibs)
|
storer := storeutil.StorerForBlockstore(ibs)
|
||||||
gs := graphsync.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, loader, storer, graphsync.RejectAllRequestsByDefault())
|
gs := graphsync.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, loader, storer, graphsync.RejectAllRequestsByDefault(), graphsync.MaxInProgressRequests(parallelTransfers))
|
||||||
|
|
||||||
return gs
|
return gs
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func SetupBlockProducer(lc fx.Lifecycle, ds dtypes.MetadataDS, api v1api.FullNode, epp gen.WinningPoStProver, sf *slashfilter.SlashFilter, j journal.Journal) (*lotusminer.Miner, error) {
|
func SetupBlockProducer(lc fx.Lifecycle, ds dtypes.MetadataDS, api v1api.FullNode, epp gen.WinningPoStProver, sf *slashfilter.SlashFilter, j journal.Journal) (*lotusminer.Miner, error) {
|
||||||
minerAddr, err := minerAddrFromDS(ds)
|
minerAddr, err := minerAddrFromDS(ds)
|
||||||
|
Loading…
Reference in New Issue
Block a user