Merge pull request #4996 from filecoin-project/feat/configure-graphsync

Configure simultaneous requests
This commit is contained in:
Łukasz Magiera 2020-11-26 11:24:39 +01:00 committed by GitHub
commit 91259c29d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 46 additions and 32 deletions

2
go.mod
View File

@ -69,7 +69,7 @@ require (
github.com/ipfs/go-ds-pebble v0.0.2-0.20200921225637-ce220f8ac459
github.com/ipfs/go-filestore v1.0.0
github.com/ipfs/go-fs-lock v0.0.6
github.com/ipfs/go-graphsync v0.5.0
github.com/ipfs/go-graphsync v0.5.1
github.com/ipfs/go-ipfs-blockstore v1.0.3
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-ds-help v1.0.0

2
go.sum
View File

@ -569,6 +569,8 @@ github.com/ipfs/go-graphsync v0.4.3 h1:2t+oCpufufs1oqChoWiIK7V5uC1XCtf06PK9nqMV6
github.com/ipfs/go-graphsync v0.4.3/go.mod h1:mPOwDYv128gf8gxPFgXnz4fNrSYPsWyqisJ7ych+XDY=
github.com/ipfs/go-graphsync v0.5.0 h1:iaByvxq88Ys1KcaQzTS1wmRhNsNEo3SaUiSGqTSbGmM=
github.com/ipfs/go-graphsync v0.5.0/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
github.com/ipfs/go-graphsync v0.5.1 h1:4fXBRvRKicTgTmCFMmEua/H5jvmAOLgU9Z7PCPWt2ec=
github.com/ipfs/go-graphsync v0.5.1/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
github.com/ipfs/go-hamt-ipld v0.1.1 h1:0IQdvwnAAUKmDE+PMJa5y1QiwOPHpI9+eAbQEEEYthk=
github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=

View File

@ -290,7 +290,7 @@ func Online() Option {
Override(new(exchange.Server), exchange.NewServer),
Override(new(*peermgr.PeerMgr), peermgr.NewPeerMgr),
Override(new(dtypes.Graphsync), modules.Graphsync),
Override(new(dtypes.Graphsync), modules.Graphsync(config.DefaultFullNode().Client.SimultaneousTransfers)),
Override(new(*dtypes.MpoolLocker), new(dtypes.MpoolLocker)),
Override(new(*discoveryimpl.Local), modules.NewLocalDiscovery),
Override(new(discovery.PeerResolver), modules.RetrievalResolver),
@ -465,12 +465,15 @@ func ConfigFullNode(c interface{}) Option {
ipfsMaddr := cfg.Client.IpfsMAddr
return Options(
ConfigCommon(&cfg.Common),
If(cfg.Client.UseIpfs,
Override(new(dtypes.ClientBlockstore), modules.IpfsClientBlockstore(ipfsMaddr, cfg.Client.IpfsOnlineMode)),
If(cfg.Client.IpfsUseForRetrieval,
Override(new(dtypes.ClientRetrievalStoreManager), modules.ClientBlockstoreRetrievalStoreManager),
),
),
Override(new(dtypes.Graphsync), modules.Graphsync(cfg.Client.SimultaneousTransfers)),
If(cfg.Metrics.HeadNotifs,
Override(HeadMetricsKey, metrics.SendHeadNotifs(cfg.Metrics.Nickname)),
),

View File

@ -105,10 +105,11 @@ type Metrics struct {
}
type Client struct {
UseIpfs bool
IpfsOnlineMode bool
IpfsMAddr string
IpfsUseForRetrieval bool
UseIpfs bool
IpfsOnlineMode bool
IpfsMAddr string
IpfsUseForRetrieval bool
SimultaneousTransfers uint64
}
type Wallet struct {
@ -149,6 +150,7 @@ func defCommon() Common {
}
var DefaultDefaultMaxFee = types.MustParseFIL("0.007")
var DefaultSimultaneousTransfers = uint64(20)
// DefaultFullNode returns the default config
func DefaultFullNode() *FullNode {
@ -157,6 +159,9 @@ func DefaultFullNode() *FullNode {
Fees: FeeConfig{
DefaultMaxFee: DefaultDefaultMaxFee,
},
Client: Client{
SimultaneousTransfers: DefaultSimultaneousTransfers,
},
}
}

View File

@ -3,6 +3,7 @@ package modules
import (
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo"
"github.com/ipfs/go-graphsync"
graphsyncimpl "github.com/ipfs/go-graphsync/impl"
gsnet "github.com/ipfs/go-graphsync/network"
@ -13,31 +14,34 @@ import (
)
// Graphsync creates a graphsync instance from the given loader and storer
func Graphsync(mctx helpers.MetricsCtx, lc fx.Lifecycle, clientBs dtypes.ClientBlockstore, chainBs dtypes.ChainBlockstore, h host.Host) (dtypes.Graphsync, error) {
graphsyncNetwork := gsnet.NewFromLibp2pHost(h)
loader := storeutil.LoaderForBlockstore(clientBs)
storer := storeutil.StorerForBlockstore(clientBs)
gs := graphsyncimpl.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, loader, storer, graphsyncimpl.RejectAllRequestsByDefault())
chainLoader := storeutil.LoaderForBlockstore(chainBs)
chainStorer := storeutil.StorerForBlockstore(chainBs)
err := gs.RegisterPersistenceOption("chainstore", chainLoader, chainStorer)
if err != nil {
return nil, err
func Graphsync(parallelTransfers uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ChainBlockstore, h host.Host) (dtypes.Graphsync, error) {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ChainBlockstore, h host.Host) (dtypes.Graphsync, error) {
graphsyncNetwork := gsnet.NewFromLibp2pHost(h)
loader := storeutil.LoaderForBlockstore(clientBs)
storer := storeutil.StorerForBlockstore(clientBs)
gs := graphsyncimpl.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, loader, storer, graphsyncimpl.RejectAllRequestsByDefault(), graphsyncimpl.MaxInProgressRequests(parallelTransfers))
chainLoader := storeutil.LoaderForBlockstore(chainBs)
chainStorer := storeutil.StorerForBlockstore(chainBs)
err := gs.RegisterPersistenceOption("chainstore", chainLoader, chainStorer)
if err != nil {
return nil, err
}
gs.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
_, has := requestData.Extension("chainsync")
if has {
// TODO: we should confirm the selector is a reasonable one before we validate
// TODO: this code will get more complicated and should probably not live here eventually
hookActions.ValidateRequest()
hookActions.UsePersistenceOption("chainstore")
}
})
gs.RegisterOutgoingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.OutgoingRequestHookActions) {
_, has := requestData.Extension("chainsync")
if has {
hookActions.UsePersistenceOption("chainstore")
}
})
return gs, nil
}
gs.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
_, has := requestData.Extension("chainsync")
if has {
// TODO: we should confirm the selector is a reasonable one before we validate
// TODO: this code will get more complicated and should probably not live here eventually
hookActions.ValidateRequest()
hookActions.UsePersistenceOption("chainstore")
}
})
gs.RegisterOutgoingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.OutgoingRequestHookActions) {
_, has := requestData.Extension("chainsync")
if has {
hookActions.UsePersistenceOption("chainstore")
}
})
return gs, nil
}