From 694834e8d5c1831bbd680d8874f5c3c4822915b2 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 24 Nov 2020 14:32:30 -0800 Subject: [PATCH 1/3] feat(graphsync): configure simultaneous requests allow configuration of the number of simultaneous requests graphsync will process at once --- go.mod | 2 +- go.sum | 2 ++ node/config/def.go | 13 +++++++++---- node/modules/graphsync.go | 19 ++++++++++++++++--- 4 files changed, 28 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index e0b970b3c..a32929168 100644 --- a/go.mod +++ b/go.mod @@ -69,7 +69,7 @@ require ( github.com/ipfs/go-ds-pebble v0.0.2-0.20200921225637-ce220f8ac459 github.com/ipfs/go-filestore v1.0.0 github.com/ipfs/go-fs-lock v0.0.6 - github.com/ipfs/go-graphsync v0.5.0 + github.com/ipfs/go-graphsync v0.5.1-0.20201124215250-4126e239ac50 github.com/ipfs/go-ipfs-blockstore v1.0.3 github.com/ipfs/go-ipfs-chunker v0.0.5 github.com/ipfs/go-ipfs-ds-help v1.0.0 diff --git a/go.sum b/go.sum index 21e99f85b..00d3e6a88 100644 --- a/go.sum +++ b/go.sum @@ -569,6 +569,8 @@ github.com/ipfs/go-graphsync v0.4.3 h1:2t+oCpufufs1oqChoWiIK7V5uC1XCtf06PK9nqMV6 github.com/ipfs/go-graphsync v0.4.3/go.mod h1:mPOwDYv128gf8gxPFgXnz4fNrSYPsWyqisJ7ych+XDY= github.com/ipfs/go-graphsync v0.5.0 h1:iaByvxq88Ys1KcaQzTS1wmRhNsNEo3SaUiSGqTSbGmM= github.com/ipfs/go-graphsync v0.5.0/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk= +github.com/ipfs/go-graphsync v0.5.1-0.20201124215250-4126e239ac50 h1:PexIl92Qi3/c+gOREQP/6bv6/5/+ZbmGND21a7ZX6Yc= +github.com/ipfs/go-graphsync v0.5.1-0.20201124215250-4126e239ac50/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk= github.com/ipfs/go-hamt-ipld v0.1.1 h1:0IQdvwnAAUKmDE+PMJa5y1QiwOPHpI9+eAbQEEEYthk= github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk= github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08= diff --git a/node/config/def.go b/node/config/def.go index 3e109e84a..7e45b88a3 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -105,10 +105,11 @@ type Metrics struct { } type Client struct { - UseIpfs bool - IpfsOnlineMode bool - IpfsMAddr string - IpfsUseForRetrieval bool + UseIpfs bool + IpfsOnlineMode bool + IpfsMAddr string + IpfsUseForRetrieval bool + SimultaneousTransfers uint64 } type Wallet struct { @@ -149,6 +150,7 @@ func defCommon() Common { } var DefaultDefaultMaxFee = types.MustParseFIL("0.007") +var DefaultSimultaneousTransfers = uint64(20) // DefaultFullNode returns the default config func DefaultFullNode() *FullNode { @@ -157,6 +159,9 @@ func DefaultFullNode() *FullNode { Fees: FeeConfig{ DefaultMaxFee: DefaultDefaultMaxFee, }, + Client: Client{ + SimultaneousTransfers: DefaultSimultaneousTransfers, + }, } } diff --git a/node/modules/graphsync.go b/node/modules/graphsync.go index 9bdc9bcca..8ba372cc8 100644 --- a/node/modules/graphsync.go +++ b/node/modules/graphsync.go @@ -1,8 +1,10 @@ package modules import ( + "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" + "github.com/filecoin-project/lotus/node/repo" "github.com/ipfs/go-graphsync" graphsyncimpl "github.com/ipfs/go-graphsync/impl" gsnet "github.com/ipfs/go-graphsync/network" @@ -10,17 +12,28 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "go.uber.org/fx" + "golang.org/x/xerrors" ) // Graphsync creates a graphsync instance from the given loader and storer -func Graphsync(mctx helpers.MetricsCtx, lc fx.Lifecycle, clientBs dtypes.ClientBlockstore, chainBs dtypes.ChainBlockstore, h host.Host) (dtypes.Graphsync, error) { +func Graphsync(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ChainBlockstore, h host.Host) (dtypes.Graphsync, error) { graphsyncNetwork := gsnet.NewFromLibp2pHost(h) loader := storeutil.LoaderForBlockstore(clientBs) storer := storeutil.StorerForBlockstore(clientBs) - gs := graphsyncimpl.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, loader, storer, graphsyncimpl.RejectAllRequestsByDefault()) + raw, err := r.Config() + if err != nil { + return nil, err + } + + cfg, ok := raw.(*config.FullNode) + if !ok { + return nil, xerrors.New("expected address of config.FullNode") + } + + gs := graphsyncimpl.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, loader, storer, graphsyncimpl.RejectAllRequestsByDefault(), graphsyncimpl.MaxInProgressRequests(cfg.Client.SimultaneousTransfers)) chainLoader := storeutil.LoaderForBlockstore(chainBs) chainStorer := storeutil.StorerForBlockstore(chainBs) - err := gs.RegisterPersistenceOption("chainstore", chainLoader, chainStorer) + err = gs.RegisterPersistenceOption("chainstore", chainLoader, chainStorer) if err != nil { return nil, err } From 97a76a4b9a19228eabbf7241b93b3642563a0eb5 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Wed, 25 Nov 2020 13:18:41 +0100 Subject: [PATCH 2/3] feat: update to go-graphsync v0.5.1 --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index a32929168..259eee673 100644 --- a/go.mod +++ b/go.mod @@ -69,7 +69,7 @@ require ( github.com/ipfs/go-ds-pebble v0.0.2-0.20200921225637-ce220f8ac459 github.com/ipfs/go-filestore v1.0.0 github.com/ipfs/go-fs-lock v0.0.6 - github.com/ipfs/go-graphsync v0.5.1-0.20201124215250-4126e239ac50 + github.com/ipfs/go-graphsync v0.5.1 github.com/ipfs/go-ipfs-blockstore v1.0.3 github.com/ipfs/go-ipfs-chunker v0.0.5 github.com/ipfs/go-ipfs-ds-help v1.0.0 diff --git a/go.sum b/go.sum index 00d3e6a88..9b0cc0fe0 100644 --- a/go.sum +++ b/go.sum @@ -569,8 +569,8 @@ github.com/ipfs/go-graphsync v0.4.3 h1:2t+oCpufufs1oqChoWiIK7V5uC1XCtf06PK9nqMV6 github.com/ipfs/go-graphsync v0.4.3/go.mod h1:mPOwDYv128gf8gxPFgXnz4fNrSYPsWyqisJ7ych+XDY= github.com/ipfs/go-graphsync v0.5.0 h1:iaByvxq88Ys1KcaQzTS1wmRhNsNEo3SaUiSGqTSbGmM= github.com/ipfs/go-graphsync v0.5.0/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk= -github.com/ipfs/go-graphsync v0.5.1-0.20201124215250-4126e239ac50 h1:PexIl92Qi3/c+gOREQP/6bv6/5/+ZbmGND21a7ZX6Yc= -github.com/ipfs/go-graphsync v0.5.1-0.20201124215250-4126e239ac50/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk= +github.com/ipfs/go-graphsync v0.5.1 h1:4fXBRvRKicTgTmCFMmEua/H5jvmAOLgU9Z7PCPWt2ec= +github.com/ipfs/go-graphsync v0.5.1/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk= github.com/ipfs/go-hamt-ipld v0.1.1 h1:0IQdvwnAAUKmDE+PMJa5y1QiwOPHpI9+eAbQEEEYthk= github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk= github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08= From 2e544e3e6a2b226618b58397f8e3f21fdf08a8f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 25 Nov 2020 19:57:38 +0100 Subject: [PATCH 3/3] configure SimultaneousTransfers in node/builder --- node/builder.go | 5 ++- node/modules/graphsync.go | 65 +++++++++++++++++---------------------- 2 files changed, 32 insertions(+), 38 deletions(-) diff --git a/node/builder.go b/node/builder.go index 0edfb2de6..04400e1e1 100644 --- a/node/builder.go +++ b/node/builder.go @@ -290,7 +290,7 @@ func Online() Option { Override(new(exchange.Server), exchange.NewServer), Override(new(*peermgr.PeerMgr), peermgr.NewPeerMgr), - Override(new(dtypes.Graphsync), modules.Graphsync), + Override(new(dtypes.Graphsync), modules.Graphsync(config.DefaultFullNode().Client.SimultaneousTransfers)), Override(new(*dtypes.MpoolLocker), new(dtypes.MpoolLocker)), Override(new(*discoveryimpl.Local), modules.NewLocalDiscovery), Override(new(discovery.PeerResolver), modules.RetrievalResolver), @@ -465,12 +465,15 @@ func ConfigFullNode(c interface{}) Option { ipfsMaddr := cfg.Client.IpfsMAddr return Options( ConfigCommon(&cfg.Common), + If(cfg.Client.UseIpfs, Override(new(dtypes.ClientBlockstore), modules.IpfsClientBlockstore(ipfsMaddr, cfg.Client.IpfsOnlineMode)), If(cfg.Client.IpfsUseForRetrieval, Override(new(dtypes.ClientRetrievalStoreManager), modules.ClientBlockstoreRetrievalStoreManager), ), ), + Override(new(dtypes.Graphsync), modules.Graphsync(cfg.Client.SimultaneousTransfers)), + If(cfg.Metrics.HeadNotifs, Override(HeadMetricsKey, metrics.SendHeadNotifs(cfg.Metrics.Nickname)), ), diff --git a/node/modules/graphsync.go b/node/modules/graphsync.go index 8ba372cc8..bbb039957 100644 --- a/node/modules/graphsync.go +++ b/node/modules/graphsync.go @@ -1,7 +1,6 @@ package modules import ( - "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/repo" @@ -12,45 +11,37 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "go.uber.org/fx" - "golang.org/x/xerrors" ) // Graphsync creates a graphsync instance from the given loader and storer -func Graphsync(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ChainBlockstore, h host.Host) (dtypes.Graphsync, error) { - graphsyncNetwork := gsnet.NewFromLibp2pHost(h) - loader := storeutil.LoaderForBlockstore(clientBs) - storer := storeutil.StorerForBlockstore(clientBs) - raw, err := r.Config() - if err != nil { - return nil, err - } +func Graphsync(parallelTransfers uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ChainBlockstore, h host.Host) (dtypes.Graphsync, error) { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ChainBlockstore, h host.Host) (dtypes.Graphsync, error) { + graphsyncNetwork := gsnet.NewFromLibp2pHost(h) + loader := storeutil.LoaderForBlockstore(clientBs) + storer := storeutil.StorerForBlockstore(clientBs) - cfg, ok := raw.(*config.FullNode) - if !ok { - return nil, xerrors.New("expected address of config.FullNode") - } - - gs := graphsyncimpl.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, loader, storer, graphsyncimpl.RejectAllRequestsByDefault(), graphsyncimpl.MaxInProgressRequests(cfg.Client.SimultaneousTransfers)) - chainLoader := storeutil.LoaderForBlockstore(chainBs) - chainStorer := storeutil.StorerForBlockstore(chainBs) - err = gs.RegisterPersistenceOption("chainstore", chainLoader, chainStorer) - if err != nil { - return nil, err - } - gs.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { - _, has := requestData.Extension("chainsync") - if has { - // TODO: we should confirm the selector is a reasonable one before we validate - // TODO: this code will get more complicated and should probably not live here eventually - hookActions.ValidateRequest() - hookActions.UsePersistenceOption("chainstore") + gs := graphsyncimpl.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, loader, storer, graphsyncimpl.RejectAllRequestsByDefault(), graphsyncimpl.MaxInProgressRequests(parallelTransfers)) + chainLoader := storeutil.LoaderForBlockstore(chainBs) + chainStorer := storeutil.StorerForBlockstore(chainBs) + err := gs.RegisterPersistenceOption("chainstore", chainLoader, chainStorer) + if err != nil { + return nil, err } - }) - gs.RegisterOutgoingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.OutgoingRequestHookActions) { - _, has := requestData.Extension("chainsync") - if has { - hookActions.UsePersistenceOption("chainstore") - } - }) - return gs, nil + gs.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { + _, has := requestData.Extension("chainsync") + if has { + // TODO: we should confirm the selector is a reasonable one before we validate + // TODO: this code will get more complicated and should probably not live here eventually + hookActions.ValidateRequest() + hookActions.UsePersistenceOption("chainstore") + } + }) + gs.RegisterOutgoingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.OutgoingRequestHookActions) { + _, has := requestData.Extension("chainsync") + if has { + hookActions.UsePersistenceOption("chainstore") + } + }) + return gs, nil + } }