feat(graphsync): update to v0.10.0-rc1

also split the single SimultaneousTransfers option into separate SimultaneousTransfersForStorage and SimultaneousTransfersForRetrieval config options
hannahhoward 2021-09-29 18:49:59 -07:00 committed by Rod Vagg
parent fc10281d96
commit 368d72ebfe
11 changed files with 47 additions and 25 deletions
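
The config change replaces the single SimultaneousTransfers knob with independent storage and retrieval limits. A minimal sketch of reading the new fields, assuming only the names visible in this diff (the surrounding wiring is illustrative, not part of the commit):

    package main

    import (
        "fmt"

        "github.com/filecoin-project/lotus/node/config"
    )

    func main() {
        // Both new fields start from the same DefaultSimultaneousTransfers
        // value (20) that the old single field used.
        cfg := config.DefaultFullNode()
        fmt.Println(cfg.Client.SimultaneousTransfersForStorage)   // 20
        fmt.Println(cfg.Client.SimultaneousTransfersForRetrieval) // 20
    }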

go.mod

@@ -78,7 +78,7 @@ require (
github.com/ipfs/go-ds-pebble v0.0.2-0.20200921225637-ce220f8ac459
github.com/ipfs/go-filestore v1.0.0
github.com/ipfs/go-fs-lock v0.0.6
-github.com/ipfs/go-graphsync v0.9.3
+github.com/ipfs/go-graphsync v0.10.0-rc3
github.com/ipfs/go-ipfs-blockstore v1.0.4
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
@@ -100,7 +100,7 @@ require (
github.com/ipfs/interface-go-ipfs-core v0.4.0
github.com/ipld/go-car v0.3.1-0.20210601190600-f512dac51e8e
github.com/ipld/go-car/v2 v2.0.3-0.20210811121346-c514a30114d7
-github.com/ipld/go-ipld-prime v0.12.0
+github.com/ipld/go-ipld-prime v0.12.3-0.20210929125341-05d5528bd84e
github.com/kelseyhightower/envconfig v1.4.0
github.com/libp2p/go-buffer-pool v0.0.2
github.com/libp2p/go-eventbus v0.2.1

go.sum

@@ -689,8 +689,8 @@ github.com/ipfs/go-graphsync v0.4.2/go.mod h1:/VmbZTUdUMTbNkgzAiCEucIIAU3BkLE2cZ
github.com/ipfs/go-graphsync v0.4.3/go.mod h1:mPOwDYv128gf8gxPFgXnz4fNrSYPsWyqisJ7ych+XDY=
github.com/ipfs/go-graphsync v0.9.0/go.mod h1:J62ahWT9JbPsFL2UWsUM5rOu0lZJ0LOIH1chHdxGGcw=
github.com/ipfs/go-graphsync v0.9.1/go.mod h1:J62ahWT9JbPsFL2UWsUM5rOu0lZJ0LOIH1chHdxGGcw=
-github.com/ipfs/go-graphsync v0.9.3 h1:oWqUuN3OYqLwu669fxYbgymBrIodB0fD7vFZfF//X7Y=
-github.com/ipfs/go-graphsync v0.9.3/go.mod h1:J62ahWT9JbPsFL2UWsUM5rOu0lZJ0LOIH1chHdxGGcw=
+github.com/ipfs/go-graphsync v0.10.0-rc3 h1:rMu7cZ19JGLoqscBkLXJwkP2h1chHMLlJx450dFLCdQ=
+github.com/ipfs/go-graphsync v0.10.0-rc3/go.mod h1:pA673SIspz4lHNAPgYxyMKXdpWcdIe5aL1OJCsCNejE=
github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
@@ -816,8 +816,9 @@ github.com/ipld/go-ipld-prime v0.5.1-0.20200828233916-988837377a7f/go.mod h1:0xE
github.com/ipld/go-ipld-prime v0.5.1-0.20201021195245-109253e8a018/go.mod h1:0xEgdD6MKbZ1vF0GC+YcR/C4SQCAlRuOjIJ2i0HxqzM=
github.com/ipld/go-ipld-prime v0.9.0/go.mod h1:KvBLMr4PX1gWptgkzRjVZCrLmSGcZCb/jioOQwCqZN8=
github.com/ipld/go-ipld-prime v0.11.0/go.mod h1:+WIAkokurHmZ/KwzDOMUuoeJgaRQktHtEaLglS3ZeV8=
-github.com/ipld/go-ipld-prime v0.12.0 h1:JapyKWTsJgmhrPI7hfx4V798c/RClr85sXfBZnH1VIw=
-github.com/ipld/go-ipld-prime v0.12.0/go.mod h1:hy8b93WleDMRKumOJnTIrr0MbbFbx9GD6Kzxa53Xppc=
+github.com/ipld/go-ipld-prime v0.12.3-0.20210929125341-05d5528bd84e h1:HPLQ9V/OFHKjfbFio8vQV+EW7lpQPj+iPl93VcwSTYM=
+github.com/ipld/go-ipld-prime v0.12.3-0.20210929125341-05d5528bd84e/go.mod h1:PaeLYq8k6dJLmDUSLrzkEpoGV4PEfe/1OtFN/eALOc8=
github.com/ipld/go-ipld-prime-proto v0.0.0-20191113031812-e32bd156a1e5/go.mod h1:gcvzoEDBjwycpXt3LBE061wT9f46szXGHAmj9uoP6fU=
github.com/ipld/go-ipld-prime-proto v0.0.0-20200428191222-c1ffdadc01e1/go.mod h1:OAV6xBmuTLsPZ+epzKkPB1e25FHk/vCtyatkdHcArLs=
github.com/ipld/go-ipld-prime-proto v0.0.0-20200922192210-9a2bfd4440a6/go.mod h1:3pHYooM9Ea65jewRwrb2u5uHZCNkNTe9ABsVB+SrkH0=


@@ -139,8 +139,8 @@ func TestSimultanenousTransferLimit(t *testing.T) {
)
runTest := func(t *testing.T) {
client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ConstructorOpts(
-node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(graphsyncThrottle))),
-node.Override(new(dtypes.Graphsync), modules.Graphsync(graphsyncThrottle)),
+node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(graphsyncThrottle, graphsyncThrottle))),
+node.Override(new(dtypes.Graphsync), modules.Graphsync(graphsyncThrottle, graphsyncThrottle)),
))
ens.InterconnectAll().BeginMining(250 * time.Millisecond)
dh := kit.NewDealHarness(t, client, miner, miner)


@@ -100,7 +100,7 @@ var ChainNode = Options(
Override(new(*dtypes.MpoolLocker), new(dtypes.MpoolLocker)),
// Shared graphsync (markets, serving chain)
-Override(new(dtypes.Graphsync), modules.Graphsync(config.DefaultFullNode().Client.SimultaneousTransfers)),
+Override(new(dtypes.Graphsync), modules.Graphsync(config.DefaultFullNode().Client.SimultaneousTransfersForStorage, config.DefaultFullNode().Client.SimultaneousTransfersForRetrieval)),
// Service: Wallet
Override(new(*messagesigner.MessageSigner), messagesigner.NewMessageSigner),
@@ -219,7 +219,7 @@ func ConfigFullNode(c interface{}) Option {
Override(new(retrievalmarket.BlockstoreAccessor), modules.IpfsRetrievalBlockstoreAccessor),
),
),
-Override(new(dtypes.Graphsync), modules.Graphsync(cfg.Client.SimultaneousTransfers)),
+Override(new(dtypes.Graphsync), modules.Graphsync(cfg.Client.SimultaneousTransfersForStorage, cfg.Client.SimultaneousTransfersForRetrieval)),
If(cfg.Wallet.RemoteBackend != "",
Override(new(*remotewallet.RemoteWallet), remotewallet.SetupRemoteWallet(cfg.Wallet.RemoteBackend)),


@@ -136,7 +136,7 @@ func ConfigStorageMiner(c interface{}) Option {
If(cfg.Subsystems.EnableMarkets,
// Markets
Override(new(dtypes.StagingBlockstore), modules.StagingBlockstore),
-Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(cfg.Dealmaking.SimultaneousTransfers)),
+Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(cfg.Dealmaking.SimultaneousTransfersForStorage, cfg.Dealmaking.SimultaneousTransfersForRetrieval)),
Override(new(dtypes.ProviderPieceStore), modules.NewProviderPieceStore),
Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks),


@@ -65,7 +65,8 @@ func DefaultFullNode() *FullNode {
DefaultMaxFee: DefaultDefaultMaxFee,
},
Client: Client{
-SimultaneousTransfers: DefaultSimultaneousTransfers,
+SimultaneousTransfersForStorage: DefaultSimultaneousTransfers,
+SimultaneousTransfersForRetrieval: DefaultSimultaneousTransfers,
},
Chainstore: Chainstore{
EnableSplitstore: false,
@@ -146,7 +147,8 @@ func DefaultStorageMiner() *StorageMiner {
MaxDealsPerPublishMsg: 8,
MaxProviderCollateralMultiplier: 2,
-SimultaneousTransfers: DefaultSimultaneousTransfers,
+SimultaneousTransfersForStorage: DefaultSimultaneousTransfers,
+SimultaneousTransfersForRetrieval: DefaultSimultaneousTransfers,
StartEpochSealingBuffer: 480, // 480 epochs buffer == 4 hours from adding deal to sector to sector being sealed


@@ -92,11 +92,18 @@ your node if metadata log is disabled`,
Comment: ``,
},
{
Name: "SimultaneousTransfers",
Name: "SimultaneousTransfersForStorage",
Type: "uint64",
Comment: `The maximum number of simultaneous data transfers between the client
-and storage providers`,
+and storage providers for storage deals`,
},
+{
+Name: "SimultaneousTransfersForRetrieval",
+Type: "uint64",
+Comment: `The maximum number of simultaneous data transfers between the client
+and storage providers for retrieval deals`,
+},
},
"Common": []DocField{
@@ -260,10 +267,16 @@ as a multiplier of the minimum collateral bound`,
passed to the sealing node by the markets service. 0 is unlimited.`,
},
{
Name: "SimultaneousTransfers",
Name: "SimultaneousTransfersForStorage",
Type: "uint64",
-Comment: `The maximum number of parallel online data transfers (storage+retrieval)`,
+Comment: `The maximum number of parallel online data transfers for storage deals`,
},
+{
+Name: "SimultaneousTransfersForRetrieval",
+Type: "uint64",
+Comment: `The maximum number of parallel online data transfers for retrieval deals`,
+},
{
Name: "StartEpochSealingBuffer",


@@ -129,8 +129,10 @@ type DealmakingConfig struct {
// The maximum allowed disk usage size in bytes of staging deals not yet
// passed to the sealing node by the markets service. 0 is unlimited.
MaxStagingDealsBytes int64
-// The maximum number of parallel online data transfers (storage+retrieval)
-SimultaneousTransfers uint64
+// The maximum number of parallel online data transfers for storage deals
+SimultaneousTransfersForStorage uint64
+// The maximum number of parallel online data transfers for retrieval deals
+SimultaneousTransfersForRetrieval uint64
// Minimum start epoch buffer to give time for sealing of sector with deal.
StartEpochSealingBuffer uint64
@@ -360,8 +362,11 @@ type Client struct {
IpfsMAddr string
IpfsUseForRetrieval bool
// The maximum number of simultaneous data transfers between the client
-// and storage providers
-SimultaneousTransfers uint64
+// and storage providers for storage deals
+SimultaneousTransfersForStorage uint64
+// The maximum number of simultaneous data transfers between the client
+// and storage providers for retrieval deals
+SimultaneousTransfersForRetrieval uint64
}
type Wallet struct {


@@ -15,12 +15,12 @@ import (
)
// Graphsync creates a graphsync instance from the given loader and storer
-func Graphsync(parallelTransfers uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ExposedBlockstore, h host.Host) (dtypes.Graphsync, error) {
+func Graphsync(parallelTransfersForStorage uint64, parallelTransfersForRetrieval uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ExposedBlockstore, h host.Host) (dtypes.Graphsync, error) {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ExposedBlockstore, h host.Host) (dtypes.Graphsync, error) {
graphsyncNetwork := gsnet.NewFromLibp2pHost(h)
lsys := storeutil.LinkSystemForBlockstore(clientBs)
-gs := graphsyncimpl.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, lsys, graphsyncimpl.RejectAllRequestsByDefault(), graphsyncimpl.MaxInProgressRequests(parallelTransfers))
+gs := graphsyncimpl.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, lsys, graphsyncimpl.RejectAllRequestsByDefault(), graphsyncimpl.MaxInProgressIncomingRequests(parallelTransfersForStorage), graphsyncimpl.MaxInProgressOutgoingRequests(parallelTransfersForRetrieval))
chainLinkSystem := storeutil.LinkSystemForBlockstore(chainBs)
err := gs.RegisterPersistenceOption("chainstore", chainLinkSystem)
if err != nil {
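
go-graphsync v0.10.0 splits the old MaxInProgressRequests option into separate caps for incoming and outgoing requests, which is what the two new arguments above feed. A sketch of standalone construction against that API; the package name, helper name, and parameters are assumptions, while the option calls are the ones used in this diff:

    package gsutil

    import (
        "context"

        graphsync "github.com/ipfs/go-graphsync"
        graphsyncimpl "github.com/ipfs/go-graphsync/impl"
        gsnet "github.com/ipfs/go-graphsync/network"
        "github.com/ipfs/go-graphsync/storeutil"
        blockstore "github.com/ipfs/go-ipfs-blockstore"
        "github.com/libp2p/go-libp2p-core/host"
    )

    // newGraphsync wires a graphsync instance with independent limits for
    // requests peers send us (incoming) and requests we send out (outgoing).
    func newGraphsync(ctx context.Context, h host.Host, bs blockstore.Blockstore, incoming, outgoing uint64) graphsync.GraphSync {
        network := gsnet.NewFromLibp2pHost(h)
        lsys := storeutil.LinkSystemForBlockstore(bs)
        return graphsyncimpl.New(ctx, network, lsys,
            graphsyncimpl.RejectAllRequestsByDefault(),
            graphsyncimpl.MaxInProgressIncomingRequests(incoming),
            graphsyncimpl.MaxInProgressOutgoingRequests(outgoing),
        )
    }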


@@ -394,11 +394,11 @@ func StagingBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRe
// StagingGraphsync creates a graphsync instance which reads and writes blocks
// to the StagingBlockstore
-func StagingGraphsync(parallelTransfers uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync {
+func StagingGraphsync(parallelTransfersForStorage uint64, parallelTransfersForRetrieval uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync {
graphsyncNetwork := gsnet.NewFromLibp2pHost(h)
lsys := storeutil.LinkSystemForBlockstore(ibs)
-gs := graphsync.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, lsys, graphsync.RejectAllRequestsByDefault(), graphsync.MaxInProgressRequests(parallelTransfers))
+gs := graphsync.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, lsys, graphsync.RejectAllRequestsByDefault(), graphsync.MaxInProgressOutgoingRequests(parallelTransfersForStorage), graphsync.MaxInProgressIncomingRequests(parallelTransfersForRetrieval))
return gs
}
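
Note the mirrored direction between the two constructors: the full-node instance caps incoming requests with the storage limit and outgoing requests with the retrieval limit, while this staging (markets) instance does the opposite. Reading the diff, that matches who initiates each transfer: for storage deals the provider pulls data from the client, and for retrievals the client pulls from the provider. A test-style sketch of passing both limits, modeled on the updated integration test earlier in this diff (the test name and limit values are illustrative):

    package itests

    import (
        "testing"

        "github.com/filecoin-project/lotus/itests/kit"
        "github.com/filecoin-project/lotus/node"
        "github.com/filecoin-project/lotus/node/modules"
        "github.com/filecoin-project/lotus/node/modules/dtypes"
        "github.com/filecoin-project/lotus/node/repo"
    )

    // Hypothetical example: each constructor now takes a storage limit and a
    // retrieval limit instead of one shared cap.
    func TestSplitTransferLimits(t *testing.T) {
        client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ConstructorOpts(
            node.ApplyIf(node.IsType(repo.StorageMiner),
                node.Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(8, 4))),
            node.Override(new(dtypes.Graphsync), modules.Graphsync(8, 4)),
        ))
        ens.InterconnectAll()
        _, _ = client, miner
    }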


@@ -20,7 +20,8 @@ ListenAddresses = ["/ip4/0.0.0.0/tcp/5678", "/ip6/::/tcp/5678"]
# IpfsOnlineMode = false
# IpfsMAddr = ""
# IpfsUseForRetrieval = false
-# SimultaneousTransfers = 20
+# SimultaneousTransfersForStorage = 20
+# SimultaneousTransfersForRetrieval = 20
#
[Metrics]
# Nickname = ""