configure SimultaneousTransfers in node/builder

This commit is contained in:
Łukasz Magiera 2020-11-25 19:57:38 +01:00
parent 97a76a4b9a
commit 2e544e3e6a
2 changed files with 32 additions and 38 deletions

View File

@ -290,7 +290,7 @@ func Online() Option {
Override(new(exchange.Server), exchange.NewServer),
Override(new(*peermgr.PeerMgr), peermgr.NewPeerMgr),
Override(new(dtypes.Graphsync), modules.Graphsync),
Override(new(dtypes.Graphsync), modules.Graphsync(config.DefaultFullNode().Client.SimultaneousTransfers)),
Override(new(*dtypes.MpoolLocker), new(dtypes.MpoolLocker)),
Override(new(*discoveryimpl.Local), modules.NewLocalDiscovery),
Override(new(discovery.PeerResolver), modules.RetrievalResolver),
@ -465,12 +465,15 @@ func ConfigFullNode(c interface{}) Option {
ipfsMaddr := cfg.Client.IpfsMAddr
return Options(
ConfigCommon(&cfg.Common),
If(cfg.Client.UseIpfs,
Override(new(dtypes.ClientBlockstore), modules.IpfsClientBlockstore(ipfsMaddr, cfg.Client.IpfsOnlineMode)),
If(cfg.Client.IpfsUseForRetrieval,
Override(new(dtypes.ClientRetrievalStoreManager), modules.ClientBlockstoreRetrievalStoreManager),
),
),
Override(new(dtypes.Graphsync), modules.Graphsync(cfg.Client.SimultaneousTransfers)),
If(cfg.Metrics.HeadNotifs,
Override(HeadMetricsKey, metrics.SendHeadNotifs(cfg.Metrics.Nickname)),
),

View File

@ -1,7 +1,6 @@
package modules
import (
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo"
@ -12,45 +11,37 @@ import (
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"go.uber.org/fx"
"golang.org/x/xerrors"
)
// Graphsync creates a graphsync instance from the given loader and storer
func Graphsync(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ChainBlockstore, h host.Host) (dtypes.Graphsync, error) {
graphsyncNetwork := gsnet.NewFromLibp2pHost(h)
loader := storeutil.LoaderForBlockstore(clientBs)
storer := storeutil.StorerForBlockstore(clientBs)
raw, err := r.Config()
if err != nil {
return nil, err
}
func Graphsync(parallelTransfers uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ChainBlockstore, h host.Host) (dtypes.Graphsync, error) {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ChainBlockstore, h host.Host) (dtypes.Graphsync, error) {
graphsyncNetwork := gsnet.NewFromLibp2pHost(h)
loader := storeutil.LoaderForBlockstore(clientBs)
storer := storeutil.StorerForBlockstore(clientBs)
cfg, ok := raw.(*config.FullNode)
if !ok {
return nil, xerrors.New("expected address of config.FullNode")
}
gs := graphsyncimpl.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, loader, storer, graphsyncimpl.RejectAllRequestsByDefault(), graphsyncimpl.MaxInProgressRequests(cfg.Client.SimultaneousTransfers))
chainLoader := storeutil.LoaderForBlockstore(chainBs)
chainStorer := storeutil.StorerForBlockstore(chainBs)
err = gs.RegisterPersistenceOption("chainstore", chainLoader, chainStorer)
if err != nil {
return nil, err
}
gs.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
_, has := requestData.Extension("chainsync")
if has {
// TODO: we should confirm the selector is a reasonable one before we validate
// TODO: this code will get more complicated and should probably not live here eventually
hookActions.ValidateRequest()
hookActions.UsePersistenceOption("chainstore")
gs := graphsyncimpl.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, loader, storer, graphsyncimpl.RejectAllRequestsByDefault(), graphsyncimpl.MaxInProgressRequests(parallelTransfers))
chainLoader := storeutil.LoaderForBlockstore(chainBs)
chainStorer := storeutil.StorerForBlockstore(chainBs)
err := gs.RegisterPersistenceOption("chainstore", chainLoader, chainStorer)
if err != nil {
return nil, err
}
})
gs.RegisterOutgoingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.OutgoingRequestHookActions) {
_, has := requestData.Extension("chainsync")
if has {
hookActions.UsePersistenceOption("chainstore")
}
})
return gs, nil
gs.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
_, has := requestData.Extension("chainsync")
if has {
// TODO: we should confirm the selector is a reasonable one before we validate
// TODO: this code will get more complicated and should probably not live here eventually
hookActions.ValidateRequest()
hookActions.UsePersistenceOption("chainstore")
}
})
gs.RegisterOutgoingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.OutgoingRequestHookActions) {
_, has := requestData.Extension("chainsync")
if has {
hookActions.UsePersistenceOption("chainstore")
}
})
return gs, nil
}
}