From 694834e8d5c1831bbd680d8874f5c3c4822915b2 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 24 Nov 2020 14:32:30 -0800 Subject: [PATCH] feat(graphsync): configure simultaneous requests allow configuration of the number of simultaneous requests graphsync will process at once --- go.mod | 2 +- go.sum | 2 ++ node/config/def.go | 13 +++++++++---- node/modules/graphsync.go | 19 ++++++++++++++++--- 4 files changed, 28 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index e0b970b3c..a32929168 100644 --- a/go.mod +++ b/go.mod @@ -69,7 +69,7 @@ require ( github.com/ipfs/go-ds-pebble v0.0.2-0.20200921225637-ce220f8ac459 github.com/ipfs/go-filestore v1.0.0 github.com/ipfs/go-fs-lock v0.0.6 - github.com/ipfs/go-graphsync v0.5.0 + github.com/ipfs/go-graphsync v0.5.1-0.20201124215250-4126e239ac50 github.com/ipfs/go-ipfs-blockstore v1.0.3 github.com/ipfs/go-ipfs-chunker v0.0.5 github.com/ipfs/go-ipfs-ds-help v1.0.0 diff --git a/go.sum b/go.sum index 21e99f85b..00d3e6a88 100644 --- a/go.sum +++ b/go.sum @@ -569,6 +569,8 @@ github.com/ipfs/go-graphsync v0.4.3 h1:2t+oCpufufs1oqChoWiIK7V5uC1XCtf06PK9nqMV6 github.com/ipfs/go-graphsync v0.4.3/go.mod h1:mPOwDYv128gf8gxPFgXnz4fNrSYPsWyqisJ7ych+XDY= github.com/ipfs/go-graphsync v0.5.0 h1:iaByvxq88Ys1KcaQzTS1wmRhNsNEo3SaUiSGqTSbGmM= github.com/ipfs/go-graphsync v0.5.0/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk= +github.com/ipfs/go-graphsync v0.5.1-0.20201124215250-4126e239ac50 h1:PexIl92Qi3/c+gOREQP/6bv6/5/+ZbmGND21a7ZX6Yc= +github.com/ipfs/go-graphsync v0.5.1-0.20201124215250-4126e239ac50/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk= github.com/ipfs/go-hamt-ipld v0.1.1 h1:0IQdvwnAAUKmDE+PMJa5y1QiwOPHpI9+eAbQEEEYthk= github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk= github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08= diff --git a/node/config/def.go b/node/config/def.go index 3e109e84a..7e45b88a3 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -105,10 +105,11 @@ type Metrics struct { } type Client struct { - UseIpfs bool - IpfsOnlineMode bool - IpfsMAddr string - IpfsUseForRetrieval bool + UseIpfs bool + IpfsOnlineMode bool + IpfsMAddr string + IpfsUseForRetrieval bool + SimultaneousTransfers uint64 } type Wallet struct { @@ -149,6 +150,7 @@ func defCommon() Common { } var DefaultDefaultMaxFee = types.MustParseFIL("0.007") +var DefaultSimultaneousTransfers = uint64(20) // DefaultFullNode returns the default config func DefaultFullNode() *FullNode { @@ -157,6 +159,9 @@ func DefaultFullNode() *FullNode { Fees: FeeConfig{ DefaultMaxFee: DefaultDefaultMaxFee, }, + Client: Client{ + SimultaneousTransfers: DefaultSimultaneousTransfers, + }, } } diff --git a/node/modules/graphsync.go b/node/modules/graphsync.go index 9bdc9bcca..8ba372cc8 100644 --- a/node/modules/graphsync.go +++ b/node/modules/graphsync.go @@ -1,8 +1,10 @@ package modules import ( + "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" + "github.com/filecoin-project/lotus/node/repo" "github.com/ipfs/go-graphsync" graphsyncimpl "github.com/ipfs/go-graphsync/impl" gsnet "github.com/ipfs/go-graphsync/network" @@ -10,17 +12,28 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "go.uber.org/fx" + "golang.org/x/xerrors" ) // Graphsync creates a graphsync instance from the given loader and storer -func Graphsync(mctx helpers.MetricsCtx, lc fx.Lifecycle, clientBs dtypes.ClientBlockstore, chainBs dtypes.ChainBlockstore, h host.Host) (dtypes.Graphsync, error) { +func Graphsync(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ChainBlockstore, h host.Host) (dtypes.Graphsync, error) { graphsyncNetwork := gsnet.NewFromLibp2pHost(h) loader := storeutil.LoaderForBlockstore(clientBs) storer := storeutil.StorerForBlockstore(clientBs) - gs := graphsyncimpl.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, loader, storer, graphsyncimpl.RejectAllRequestsByDefault()) + raw, err := r.Config() + if err != nil { + return nil, err + } + + cfg, ok := raw.(*config.FullNode) + if !ok { + return nil, xerrors.New("expected address of config.FullNode") + } + + gs := graphsyncimpl.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, loader, storer, graphsyncimpl.RejectAllRequestsByDefault(), graphsyncimpl.MaxInProgressRequests(cfg.Client.SimultaneousTransfers)) chainLoader := storeutil.LoaderForBlockstore(chainBs) chainStorer := storeutil.StorerForBlockstore(chainBs) - err := gs.RegisterPersistenceOption("chainstore", chainLoader, chainStorer) + err = gs.RegisterPersistenceOption("chainstore", chainLoader, chainStorer) if err != nil { return nil, err }