Merge pull request #2315 from filecoin-project/feat/upgrade-markets-0.4.0

Upgrade markets with fast retrieval support
This commit is contained in:
Łukasz Magiera 2020-07-08 21:12:35 +02:00 committed by GitHub
commit 15f19adc6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 182 additions and 35 deletions

6
go.mod
View File

@ -21,9 +21,9 @@ require (
github.com/filecoin-project/go-bitfield v0.0.2-0.20200629135455-587b27927d38
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
github.com/filecoin-project/go-data-transfer v0.3.0
github.com/filecoin-project/go-data-transfer v0.4.0
github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5
github.com/filecoin-project/go-fil-markets v0.3.2
github.com/filecoin-project/go-fil-markets v0.4.0
github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261
github.com/filecoin-project/go-statestore v0.1.0
@ -52,7 +52,7 @@ require (
github.com/ipfs/go-ds-measure v0.1.0
github.com/ipfs/go-filestore v1.0.0
github.com/ipfs/go-fs-lock v0.0.1
github.com/ipfs/go-graphsync v0.0.6-0.20200504202014-9d5f2c26a103
github.com/ipfs/go-graphsync v0.0.6-0.20200708073926-caa872f68b2c
github.com/ipfs/go-hamt-ipld v0.1.1-0.20200605182717-0310ad2b0b1f
github.com/ipfs/go-ipfs-blockstore v1.0.0
github.com/ipfs/go-ipfs-chunker v0.0.5

18
go.sum
View File

@ -231,12 +231,12 @@ github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:a
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-data-transfer v0.3.0 h1:BwBrrXu9Unh9JjjX4GAc5FfzUNioor/aATIjfc7JTBg=
github.com/filecoin-project/go-data-transfer v0.3.0/go.mod h1:cONglGP4s/d+IUQw5mWZrQK+FQATQxr3AXzi4dRh0l4=
github.com/filecoin-project/go-data-transfer v0.4.0 h1:xiC0qVZten8VtqEs5rRjyz2n/nZ8prbZSWvAr1V+CBE=
github.com/filecoin-project/go-data-transfer v0.4.0/go.mod h1:5ksROBkSREsb2O4h5vBcGMr9lXTpfeyjHo8o0yxf6FQ=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5 h1:yvQJCW9mmi9zy+51xA01Ea2X7/dL7r8eKDPuGUjRmbo=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5/go.mod h1:JbkIgFF/Z9BDlvrJO1FuKkaWsH673/UdFaiVS6uIHlA=
github.com/filecoin-project/go-fil-markets v0.3.2 h1:fvNgdTTIVtckBu61wxbKYSMJzedoFFIKYJagiCDFCiM=
github.com/filecoin-project/go-fil-markets v0.3.2/go.mod h1:e/IofcotbwH7ftgeK+TjjdjFsrCDWrh5vvnr7k1OSH8=
github.com/filecoin-project/go-fil-markets v0.4.0 h1:toDPViYyQOHtUs6jl0KB9EzgdfCxXR11dZO/rqWbFtU=
github.com/filecoin-project/go-fil-markets v0.4.0/go.mod h1:VAH6h+sWuhPAsSwAS9Kecx8MI/dIjFkrLO8jJUmLWQc=
github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24 h1:Jc7vkplmZYVuaEcSXGHDwefvZIdoyyaoGDLqSr8Svms=
github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24/go.mod h1:j6zV//WXIIY5kky873Q3iIKt/ViOE8rcijovmpxrXzM=
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 h1:92PET+sx1Hb4W/8CgFwGuxaKbttwY+UNspYZTvXY0vs=
@ -247,8 +247,8 @@ github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261 h
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc=
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9 h1:k9qVR9ItcziSB2rxtlkN/MDWNlbsI6yzec+zjUatLW0=
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig=
github.com/filecoin-project/go-statemachine v0.0.0-20200619205156-c7bf525c06ef h1:qFXGHKLq49qFmvXjvhvQ2eU3jVk2Z0QaKYQpO5S3SF0=
github.com/filecoin-project/go-statemachine v0.0.0-20200619205156-c7bf525c06ef/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig=
github.com/filecoin-project/go-statemachine v0.0.0-20200703171610-a74a697973b9 h1:NagIOq5osclBprc95ILEnGCOpubuhalqwWvayYJmXLQ=
github.com/filecoin-project/go-statemachine v0.0.0-20200703171610-a74a697973b9/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig=
github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ=
github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI=
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b h1:fkRZSPrYpk42PV3/lIXiL0LHetxde7vyYYvSsttQtfg=
@ -262,6 +262,7 @@ github.com/filecoin-project/specs-actors v0.0.0-20200210130641-2d1fbd8672cf/go.m
github.com/filecoin-project/specs-actors v0.3.0/go.mod h1:nQYnFbQ7Y0bHZyq6HDEuVlCPR+U3z5Q3wMOQ+2aiV+Y=
github.com/filecoin-project/specs-actors v0.6.0/go.mod h1:dRdy3cURykh2R8O/DKqy8olScl70rmIS7GrB4hB1IDY=
github.com/filecoin-project/specs-actors v0.6.1/go.mod h1:dRdy3cURykh2R8O/DKqy8olScl70rmIS7GrB4hB1IDY=
github.com/filecoin-project/specs-actors v0.7.0/go.mod h1:+z0htZu/wLBDbOLcQTKKUEC2rkUTFzL2KJ/bRAVWkws=
github.com/filecoin-project/specs-actors v0.7.1 h1:/zW++MN4gGIPvG+s0zmSI97k0Z/aaeiREjLC10gQbco=
github.com/filecoin-project/specs-actors v0.7.1/go.mod h1:+z0htZu/wLBDbOLcQTKKUEC2rkUTFzL2KJ/bRAVWkws=
github.com/filecoin-project/specs-storage v0.1.0 h1:PkDgTOT5W5Ao7752onjDl4QSv+sgOVdJbvFjOnD5w94=
@ -515,8 +516,8 @@ github.com/ipfs/go-filestore v1.0.0 h1:QR7ekKH+q2AGiWDc7W2Q0qHuYSRZGUJqUn0GsegEP
github.com/ipfs/go-filestore v1.0.0/go.mod h1:/XOCuNtIe2f1YPbiXdYvD0BKLA0JR1MgPiFOdcuu9SM=
github.com/ipfs/go-fs-lock v0.0.1 h1:XHX8uW4jQBYWHj59XXcjg7BHlHxV9ZOYs6Y43yb7/l0=
github.com/ipfs/go-fs-lock v0.0.1/go.mod h1:DNBekbboPKcxs1aukPSaOtFA3QfSdi5C855v0i9XJ8Y=
github.com/ipfs/go-graphsync v0.0.6-0.20200504202014-9d5f2c26a103 h1:SD+bXod/pOWKJCGj0tG140ht8Us5k+3JBcHw0PVYTho=
github.com/ipfs/go-graphsync v0.0.6-0.20200504202014-9d5f2c26a103/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE=
github.com/ipfs/go-graphsync v0.0.6-0.20200708073926-caa872f68b2c h1:fCW8JzwvBMfODvdliK+s3ziYZPD/5FAzluahZYXVg3k=
github.com/ipfs/go-graphsync v0.0.6-0.20200708073926-caa872f68b2c/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE=
github.com/ipfs/go-hamt-ipld v0.0.15-0.20200131012125-dd88a59d3f2e/go.mod h1:9aQJu/i/TaRDW6jqB5U217dLIDopn50wxLdHXM2CTfE=
github.com/ipfs/go-hamt-ipld v0.0.15-0.20200204200533-99b8553ef242/go.mod h1:kq3Pi+UP3oHhAdKexE+kHHYRKMoFNuGero0R7q3hWGg=
github.com/ipfs/go-hamt-ipld v0.1.1-0.20200501020327-d53d20a7063e h1:Klv6s+kbuhh0JVpGFmFK2t6AtZxJfAnVneQHh1DlFOo=
@ -1478,6 +1479,7 @@ golang.org/x/exp v0.0.0-20191129062945-2f5052295587/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 h1:QE6XYQK6naiK1EPAe1g/ILLxN5RBoH5xkJk3CqlMI/Y=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=

View File

@ -6,7 +6,6 @@ import (
"bytes"
"context"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/events/state"
"golang.org/x/xerrors"
@ -341,16 +340,18 @@ func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider
}
func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID abi.DealID, onDealExpired storagemarket.DealExpiredCallback, onDealSlashed storagemarket.DealSlashedCallback) error {
var sd *api.MarketDeal
head, err := c.ChainHead(ctx)
if err != nil {
return xerrors.Errorf("client: failed to get chain head: %w", err)
}
sd, err := c.StateMarketStorageDeal(ctx, dealID, head.Key())
if err != nil {
return xerrors.Errorf("client: failed to look up deal %d on chain: %w", dealID, err)
}
// Called immediately to check if the deal has already expired or been slashed
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
sd, err = stmgr.GetStorageDeal(ctx, c.StateManager, dealID, ts)
if err != nil {
return false, false, xerrors.Errorf("client: failed to look up deal on chain: %w", err)
}
// Check if the deal has already expired
if sd.Proposal.EndEpoch <= ts.Height() {
onDealExpired(nil)
@ -371,17 +372,18 @@ func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID a
// Called when there was a match against the state change we're looking for
// and the chain has advanced to the confidence height
stateChanged := func(ts *types.TipSet, ts2 *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) {
if states == nil {
log.Error("timed out waiting for deal expiry")
return false, nil
}
// Check if the deal has already expired
if sd.Proposal.EndEpoch <= ts2.Height() {
onDealExpired(nil)
return false, nil
}
// Timeout waiting for state change
if states == nil {
log.Error("timed out waiting for deal expiry")
return false, nil
}
changedDeals, ok := states.(state.ChangedDeals)
if !ok {
panic("Expected state.ChangedDeals")
@ -417,7 +419,10 @@ func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID a
match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) {
return dealDiff(ctx, oldTs.Key(), newTs.Key())
}
if err := c.ev.StateChanged(checkFunc, stateChanged, revert, int(build.MessageConfidence)+1, build.SealRandomnessLookbackLimit, match); err != nil {
// Wait until after the end epoch for the deal and then timeout
timeout := (sd.Proposal.EndEpoch - head.Height()) + 1
if err := c.ev.StateChanged(checkFunc, stateChanged, revert, int(build.MessageConfidence)+1, timeout, match); err != nil {
return xerrors.Errorf("failed to set up state changed handler: %w", err)
}

View File

@ -18,6 +18,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/builtin/verifreg"
"github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/filecoin-project/specs-actors/actors/runtime/exitcode"
@ -25,6 +26,7 @@ import (
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/events/state"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/sigs"
"github.com/filecoin-project/lotus/markets/utils"
@ -33,7 +35,7 @@ import (
sealing "github.com/filecoin-project/storage-fsm"
)
var log = logging.Logger("provideradapter")
var log = logging.Logger("storageadapter")
type ProviderNodeAdapter struct {
api.FullNode
@ -345,4 +347,102 @@ func (n *ProviderNodeAdapter) WaitForMessage(ctx context.Context, mcid cid.Cid,
return cb(receipt.Receipt.ExitCode, receipt.Receipt.Return, nil)
}
func (n *ProviderNodeAdapter) GetDataCap(ctx context.Context, addr address.Address, encodedTs shared.TipSetToken) (*verifreg.DataCap, error) {
tsk, err := types.TipSetKeyFromBytes(encodedTs)
if err != nil {
return nil, err
}
return n.StateVerifiedClientStatus(ctx, addr, tsk)
}
func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID abi.DealID, onDealExpired storagemarket.DealExpiredCallback, onDealSlashed storagemarket.DealSlashedCallback) error {
head, err := n.ChainHead(ctx)
if err != nil {
return xerrors.Errorf("client: failed to get chain head: %w", err)
}
sd, err := n.StateMarketStorageDeal(ctx, dealID, head.Key())
if err != nil {
return xerrors.Errorf("client: failed to look up deal %d on chain: %w", dealID, err)
}
// Called immediately to check if the deal has already expired or been slashed
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
// Check if the deal has already expired
if sd.Proposal.EndEpoch <= ts.Height() {
onDealExpired(nil)
return true, false, nil
}
// If there is no deal assume it's already been slashed
if sd.State.SectorStartEpoch < 0 {
onDealSlashed(ts.Height(), nil)
return true, false, nil
}
// No events have occurred yet, so return
// done: false, more: true (keep listening for events)
return false, true, nil
}
// Called when there was a match against the state change we're looking for
// and the chain has advanced to the confidence height
stateChanged := func(ts *types.TipSet, ts2 *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) {
// Check if the deal has already expired
if sd.Proposal.EndEpoch <= ts2.Height() {
onDealExpired(nil)
return false, nil
}
// Timeout waiting for state change
if states == nil {
log.Error("timed out waiting for deal expiry")
return false, nil
}
changedDeals, ok := states.(state.ChangedDeals)
if !ok {
panic("Expected state.ChangedDeals")
}
deal, ok := changedDeals[dealID]
if !ok {
// No change to deal
return true, nil
}
// Deal was slashed
if deal.To == nil {
onDealSlashed(ts2.Height(), nil)
return false, nil
}
return true, nil
}
// Called when there was a chain reorg and the state change was reverted
revert := func(ctx context.Context, ts *types.TipSet) error {
// TODO: Is it ok to just ignore this?
log.Warn("deal state reverted; TODO: actually handle this!")
return nil
}
// Watch for state changes to the deal
preds := state.NewStatePredicates(n)
dealDiff := preds.OnStorageMarketActorChanged(
preds.OnDealStateChanged(
preds.DealStateChangedForIDs([]abi.DealID{dealID})))
match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) {
return dealDiff(ctx, oldTs.Key(), newTs.Key())
}
// Wait until after the end epoch for the deal and then timeout
timeout := (sd.Proposal.EndEpoch - head.Height()) + 1
if err := n.ev.StateChanged(checkFunc, stateChanged, revert, int(build.MessageConfidence)+1, timeout, match); err != nil {
return xerrors.Errorf("failed to set up state changed handler: %w", err)
}
return nil
}
var _ storagemarket.StorageProviderNode = &ProviderNodeAdapter{}

View File

@ -3,6 +3,7 @@ package modules
import (
"context"
"path/filepath"
"time"
blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-merkledag"
@ -10,7 +11,9 @@ import (
"github.com/libp2p/go-libp2p-core/routing"
"go.uber.org/fx"
graphsyncimpl "github.com/filecoin-project/go-data-transfer/impl/graphsync"
dtimpl "github.com/filecoin-project/go-data-transfer/impl"
dtnet "github.com/filecoin-project/go-data-transfer/network"
dtgstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/discovery"
retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl"
@ -72,9 +75,26 @@ func RegisterClientValidator(crv dtypes.ClientRequestValidator, dtm dtypes.Clien
// NewClientGraphsyncDataTransfer returns a data transfer manager that just
// uses the clients's Client DAG service for transfers
func NewClientGraphsyncDataTransfer(h host.Host, gs dtypes.Graphsync, ds dtypes.MetadataDS) dtypes.ClientDataTransfer {
func NewClientGraphsyncDataTransfer(lc fx.Lifecycle, h host.Host, gs dtypes.Graphsync, ds dtypes.MetadataDS) (dtypes.ClientDataTransfer, error) {
sc := storedcounter.New(ds, datastore.NewKey("/datatransfer/client/counter"))
return graphsyncimpl.NewGraphSyncDataTransfer(h, gs, sc)
net := dtnet.NewFromLibp2pHost(h)
dtDs := namespace.Wrap(ds, datastore.NewKey("/datatransfer/client/transfers"))
transport := dtgstransport.NewTransport(h.ID(), gs)
dt, err := dtimpl.NewDataTransfer(dtDs, net, transport, sc)
if err != nil {
return nil, err
}
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return dt.Start(ctx)
},
OnStop: func(context.Context) error {
return dt.Stop()
},
})
return dt, nil
}
// NewClientDealStore creates a statestore for the client to store its deals
@ -111,7 +131,7 @@ func NewClientRequestValidator(deals dtypes.ClientDealStore) dtypes.ClientReques
func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, r repo.LockedRepo, dataTransfer dtypes.ClientDataTransfer, discovery *discovery.Local, deals dtypes.ClientDatastore, scn storagemarket.StorageClientNode) (storagemarket.StorageClient, error) {
net := smnet.NewFromLibp2pHost(h)
c, err := storageimpl.NewClient(net, ibs, dataTransfer, discovery, deals, scn)
c, err := storageimpl.NewClient(net, ibs, dataTransfer, discovery, deals, scn, storageimpl.DealPollingInterval(time.Second))
if err != nil {
return nil, err
}

View File

@ -4,6 +4,9 @@ import (
"context"
"errors"
"fmt"
"net/http"
"time"
"github.com/ipfs/go-bitswap"
"github.com/ipfs/go-bitswap/network"
"github.com/ipfs/go-blockservice"
@ -20,11 +23,11 @@ import (
"go.uber.org/fx"
"go.uber.org/multierr"
"golang.org/x/xerrors"
"net/http"
"time"
"github.com/filecoin-project/go-address"
dtgraphsync "github.com/filecoin-project/go-data-transfer/impl/graphsync"
dtimpl "github.com/filecoin-project/go-data-transfer/impl"
dtnet "github.com/filecoin-project/go-data-transfer/network"
dtgstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync"
piecefilestore "github.com/filecoin-project/go-fil-markets/filestore"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
@ -208,9 +211,26 @@ func RegisterProviderValidator(mrv dtypes.ProviderRequestValidator, dtm dtypes.P
// NewProviderDAGServiceDataTransfer returns a data transfer manager that just
// uses the provider's Staging DAG service for transfers
func NewProviderDAGServiceDataTransfer(h host.Host, gs dtypes.StagingGraphsync, ds dtypes.MetadataDS) dtypes.ProviderDataTransfer {
func NewProviderDAGServiceDataTransfer(lc fx.Lifecycle, h host.Host, gs dtypes.StagingGraphsync, ds dtypes.MetadataDS) (dtypes.ProviderDataTransfer, error) {
sc := storedcounter.New(ds, datastore.NewKey("/datatransfer/provider/counter"))
return dtgraphsync.NewGraphSyncDataTransfer(h, gs, sc)
net := dtnet.NewFromLibp2pHost(h)
dtDs := namespace.Wrap(ds, datastore.NewKey("/datatransfer/provider/transfers"))
transport := dtgstransport.NewTransport(h.ID(), gs)
dt, err := dtimpl.NewDataTransfer(dtDs, net, transport, sc)
if err != nil {
return nil, err
}
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return dt.Start(ctx)
},
OnStop: func(context.Context) error {
return dt.Stop()
},
})
return dt, nil
}
// NewProviderDealStore creates a statestore for the client to store its deals