feat(datatransfer): integration w/ simple merkledag
Integrates the data transfer module with a mock version of the module that just calls the dag service
This commit is contained in:
parent
98cf0769ff
commit
905259e192
@ -21,11 +21,13 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/store"
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/chain/wallet"
|
"github.com/filecoin-project/lotus/chain/wallet"
|
||||||
|
"github.com/filecoin-project/lotus/datatransfer"
|
||||||
"github.com/filecoin-project/lotus/lib/cborutil"
|
"github.com/filecoin-project/lotus/lib/cborutil"
|
||||||
"github.com/filecoin-project/lotus/lib/statestore"
|
"github.com/filecoin-project/lotus/lib/statestore"
|
||||||
"github.com/filecoin-project/lotus/node/impl/full"
|
"github.com/filecoin-project/lotus/node/impl/full"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
"github.com/filecoin-project/lotus/retrieval/discovery"
|
"github.com/filecoin-project/lotus/retrieval/discovery"
|
||||||
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var log = logging.Logger("deals")
|
var log = logging.Logger("deals")
|
||||||
@ -48,6 +50,9 @@ type Client struct {
|
|||||||
chain *store.ChainStore
|
chain *store.ChainStore
|
||||||
h host.Host
|
h host.Host
|
||||||
w *wallet.Wallet
|
w *wallet.Wallet
|
||||||
|
// dataTransfer -- not quite sure how this is referenced directly on client
|
||||||
|
// side
|
||||||
|
dataTransfer datatransfer.ClientDataTransfer
|
||||||
dag dtypes.ClientDAG
|
dag dtypes.ClientDAG
|
||||||
discovery *discovery.Local
|
discovery *discovery.Local
|
||||||
events *events.Events
|
events *events.Events
|
||||||
@ -70,12 +75,13 @@ type clientDealUpdate struct {
|
|||||||
mut func(*ClientDeal)
|
mut func(*ClientDeal)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.MetadataDS, dag dtypes.ClientDAG, discovery *discovery.Local, fm *market.FundMgr, chainapi full.ChainAPI) *Client {
|
func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.MetadataDS, dag dtypes.ClientDAG, dataTransfer datatransfer.ClientDataTransfer, discovery *discovery.Local, fm *market.FundMgr, chainapi full.ChainAPI) *Client {
|
||||||
c := &Client{
|
c := &Client{
|
||||||
sm: sm,
|
sm: sm,
|
||||||
chain: chain,
|
chain: chain,
|
||||||
h: h,
|
h: h,
|
||||||
w: w,
|
w: w,
|
||||||
|
dataTransfer: dataTransfer,
|
||||||
dag: dag,
|
dag: dag,
|
||||||
discovery: discovery,
|
discovery: discovery,
|
||||||
fm: fm,
|
fm: fm,
|
||||||
|
@ -2,6 +2,7 @@ package deals
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
cid "github.com/ipfs/go-cid"
|
cid "github.com/ipfs/go-cid"
|
||||||
@ -15,6 +16,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/actors"
|
"github.com/filecoin-project/lotus/chain/actors"
|
||||||
"github.com/filecoin-project/lotus/chain/address"
|
"github.com/filecoin-project/lotus/chain/address"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/datatransfer"
|
||||||
"github.com/filecoin-project/lotus/lib/cborutil"
|
"github.com/filecoin-project/lotus/lib/cborutil"
|
||||||
"github.com/filecoin-project/lotus/lib/statestore"
|
"github.com/filecoin-project/lotus/lib/statestore"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
@ -47,10 +49,12 @@ type Provider struct {
|
|||||||
sminer *storage.Miner
|
sminer *storage.Miner
|
||||||
full api.FullNode
|
full api.FullNode
|
||||||
|
|
||||||
// TODO: Use a custom protocol or graphsync in the future
|
// TODO: This will go away once the whole storage market module + CAR
|
||||||
// TODO: GC
|
// is implemented
|
||||||
dag dtypes.StagingDAG
|
dag dtypes.StagingDAG
|
||||||
|
|
||||||
|
dataTransfer datatransfer.ProviderDataTransfer
|
||||||
|
|
||||||
deals *statestore.StateStore
|
deals *statestore.StateStore
|
||||||
ds dtypes.MetadataDS
|
ds dtypes.MetadataDS
|
||||||
|
|
||||||
@ -71,7 +75,11 @@ type minerDealUpdate struct {
|
|||||||
mut func(*MinerDeal)
|
mut func(*MinerDeal)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, secb *sectorblocks.SectorBlocks, dag dtypes.StagingDAG, fullNode api.FullNode) (*Provider, error) {
|
var (
|
||||||
|
// ErrDataTransferFailed means a data transfer for a deal failed
|
||||||
|
ErrDataTransferFailed = errors.New("Deal data transfer failed")
|
||||||
|
)
|
||||||
|
func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, secb *sectorblocks.SectorBlocks, dag dtypes.StagingDAG, dataTransfer datatransfer.ProviderDataTransfer, fullNode api.FullNode) (*Provider, error) {
|
||||||
addr, err := ds.Get(datastore.NewKey("miner-address"))
|
addr, err := ds.Get(datastore.NewKey("miner-address"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -84,6 +92,7 @@ func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, secb *sectorblocks
|
|||||||
h := &Provider{
|
h := &Provider{
|
||||||
sminer: sminer,
|
sminer: sminer,
|
||||||
dag: dag,
|
dag: dag,
|
||||||
|
dataTransfer: dataTransfer,
|
||||||
full: fullNode,
|
full: fullNode,
|
||||||
secb: secb,
|
secb: secb,
|
||||||
|
|
||||||
@ -115,6 +124,8 @@ func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, secb *sectorblocks
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
h.dataTransfer.SubscribeToEvents(h.onDataTransferEvent)
|
||||||
|
|
||||||
return h, nil
|
return h, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -182,7 +193,7 @@ func (p *Provider) onUpdated(ctx context.Context, update minerDealUpdate) {
|
|||||||
|
|
||||||
switch update.newState {
|
switch update.newState {
|
||||||
case api.DealAccepted:
|
case api.DealAccepted:
|
||||||
p.handle(ctx, deal, p.accept, api.DealStaged)
|
p.handle(ctx, deal, p.accept, api.DealNoUpdate)
|
||||||
case api.DealStaged:
|
case api.DealStaged:
|
||||||
p.handle(ctx, deal, p.staged, api.DealSealing)
|
p.handle(ctx, deal, p.staged, api.DealSealing)
|
||||||
case api.DealSealing:
|
case api.DealSealing:
|
||||||
@ -192,6 +203,39 @@ func (p *Provider) onUpdated(ctx context.Context, update minerDealUpdate) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *Provider) onDataTransferEvent(event datatransfer.Event, channelState datatransfer.ChannelState) {
|
||||||
|
voucher, ok := channelState.Voucher().(StorageDataTransferVoucher)
|
||||||
|
// if this event is for a transfer not related to storage, ignore
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// data transfer events for opening and progress do not affect deal state
|
||||||
|
var next api.DealState
|
||||||
|
var err error
|
||||||
|
switch event {
|
||||||
|
case datatransfer.Complete:
|
||||||
|
next = api.DealStaged
|
||||||
|
err = nil
|
||||||
|
case datatransfer.Error:
|
||||||
|
next = api.DealFailed
|
||||||
|
err = ErrDataTransferFailed
|
||||||
|
default:
|
||||||
|
// the only events we care about are complete and error
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case p.updated <- minerDealUpdate{
|
||||||
|
newState: next,
|
||||||
|
id: voucher.Proposal,
|
||||||
|
err: err,
|
||||||
|
mut: nil,
|
||||||
|
}:
|
||||||
|
case <-p.stop:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (p *Provider) newDeal(s inet.Stream, proposal Proposal) (MinerDeal, error) {
|
func (p *Provider) newDeal(s inet.Stream, proposal Proposal) (MinerDeal, error) {
|
||||||
proposalNd, err := cborutil.AsIpld(proposal.DealProposal)
|
proposalNd, err := cborutil.AsIpld(proposal.DealProposal)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -4,7 +4,11 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/ipfs/go-merkledag"
|
|
||||||
|
ipldfree "github.com/ipld/go-ipld-prime/impl/free"
|
||||||
|
"github.com/ipld/go-ipld-prime/traversal/selector"
|
||||||
|
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
|
||||||
|
|
||||||
unixfile "github.com/ipfs/go-unixfs/file"
|
unixfile "github.com/ipfs/go-unixfs/file"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
@ -149,9 +153,21 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
|
|||||||
log.Warnf("closing client connection: %+v", err)
|
log.Warnf("closing client connection: %+v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())
|
||||||
|
|
||||||
|
// this is the selector for "get the whole DAG"
|
||||||
|
allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(),
|
||||||
|
ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()
|
||||||
|
|
||||||
|
_, err = p.dataTransfer.OpenPullDataChannel(deal.Client,
|
||||||
|
StorageDataTransferVoucher{Proposal: deal.ProposalCid},
|
||||||
|
deal.Ref,
|
||||||
|
allSelector,
|
||||||
|
)
|
||||||
|
|
||||||
return func(deal *MinerDeal) {
|
return func(deal *MinerDeal) {
|
||||||
deal.DealID = resp.DealIDs[0]
|
deal.DealID = resp.DealIDs[0]
|
||||||
}, merkledag.FetchGraph(ctx, deal.Ref, p.dag)
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// STAGED
|
// STAGED
|
||||||
|
@ -9,7 +9,7 @@ import (
|
|||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
datastore "github.com/ipfs/go-datastore"
|
datastore "github.com/ipfs/go-datastore"
|
||||||
"github.com/ipfs/go-datastore/namespace"
|
"github.com/ipfs/go-datastore/namespace"
|
||||||
"github.com/ipld/go-ipld-prime/traversal/selector"
|
"github.com/ipld/go-ipld-prime"
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
|
|
||||||
@ -74,7 +74,7 @@ type ClientRequestValidator struct {
|
|||||||
deals *statestore.StateStore
|
deals *statestore.StateStore
|
||||||
}
|
}
|
||||||
|
|
||||||
func RegisterClientValidator(lc fx.Lifecycle, crv *ClientRequestValidator, dtm datatransfer.Manager) {
|
func RegisterClientValidator(lc fx.Lifecycle, crv *ClientRequestValidator, dtm datatransfer.ClientDataTransfer) {
|
||||||
lc.Append(fx.Hook{
|
lc.Append(fx.Hook{
|
||||||
OnStart: func(ctx context.Context) error {
|
OnStart: func(ctx context.Context) error {
|
||||||
return dtm.RegisterVoucherType(reflect.TypeOf(StorageDataTransferVoucher{}), crv)
|
return dtm.RegisterVoucherType(reflect.TypeOf(StorageDataTransferVoucher{}), crv)
|
||||||
@ -93,7 +93,7 @@ func (c *ClientRequestValidator) ValidatePush(
|
|||||||
sender peer.ID,
|
sender peer.ID,
|
||||||
voucher datatransfer.Voucher,
|
voucher datatransfer.Voucher,
|
||||||
PieceRef cid.Cid,
|
PieceRef cid.Cid,
|
||||||
Selector selector.Selector) error {
|
Selector ipld.Node) error {
|
||||||
return ErrNoPushAccepted
|
return ErrNoPushAccepted
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -101,7 +101,7 @@ func (c *ClientRequestValidator) ValidatePull(
|
|||||||
receiver peer.ID,
|
receiver peer.ID,
|
||||||
voucher datatransfer.Voucher,
|
voucher datatransfer.Voucher,
|
||||||
PieceRef cid.Cid,
|
PieceRef cid.Cid,
|
||||||
Selector selector.Selector) error {
|
Selector ipld.Node) error {
|
||||||
dealVoucher, ok := voucher.(StorageDataTransferVoucher)
|
dealVoucher, ok := voucher.(StorageDataTransferVoucher)
|
||||||
if !ok {
|
if !ok {
|
||||||
return ErrWrongVoucherType
|
return ErrWrongVoucherType
|
||||||
@ -135,7 +135,7 @@ type ProviderRequestValidator struct {
|
|||||||
deals *statestore.StateStore
|
deals *statestore.StateStore
|
||||||
}
|
}
|
||||||
|
|
||||||
func RegisterProviderValidator(lc fx.Lifecycle, mrv *ProviderRequestValidator, dtm datatransfer.Manager) {
|
func RegisterProviderValidator(lc fx.Lifecycle, mrv *ProviderRequestValidator, dtm datatransfer.ProviderDataTransfer) {
|
||||||
lc.Append(fx.Hook{
|
lc.Append(fx.Hook{
|
||||||
OnStart: func(ctx context.Context) error {
|
OnStart: func(ctx context.Context) error {
|
||||||
return dtm.RegisterVoucherType(reflect.TypeOf(StorageDataTransferVoucher{}), mrv)
|
return dtm.RegisterVoucherType(reflect.TypeOf(StorageDataTransferVoucher{}), mrv)
|
||||||
@ -153,7 +153,7 @@ func (m *ProviderRequestValidator) ValidatePush(
|
|||||||
sender peer.ID,
|
sender peer.ID,
|
||||||
voucher datatransfer.Voucher,
|
voucher datatransfer.Voucher,
|
||||||
PieceRef cid.Cid,
|
PieceRef cid.Cid,
|
||||||
Selector selector.Selector) error {
|
Selector ipld.Node) error {
|
||||||
dealVoucher, ok := voucher.(StorageDataTransferVoucher)
|
dealVoucher, ok := voucher.(StorageDataTransferVoucher)
|
||||||
if !ok {
|
if !ok {
|
||||||
return ErrWrongVoucherType
|
return ErrWrongVoucherType
|
||||||
@ -186,6 +186,6 @@ func (m *ProviderRequestValidator) ValidatePull(
|
|||||||
receiver peer.ID,
|
receiver peer.ID,
|
||||||
voucher datatransfer.Voucher,
|
voucher datatransfer.Voucher,
|
||||||
PieceRef cid.Cid,
|
PieceRef cid.Cid,
|
||||||
Selector selector.Selector) error {
|
Selector ipld.Node) error {
|
||||||
return ErrNoPullAccepted
|
return ErrNoPullAccepted
|
||||||
}
|
}
|
||||||
|
71
datatransfer/dagservice_impl.go
Normal file
71
datatransfer/dagservice_impl.go
Normal file
@ -0,0 +1,71 @@
|
|||||||
|
package datatransfer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
ipldformat "github.com/ipfs/go-ipld-format"
|
||||||
|
"github.com/ipfs/go-merkledag"
|
||||||
|
ipld "github.com/ipld/go-ipld-prime"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
|
)
|
||||||
|
|
||||||
|
type dagserviceImpl struct {
|
||||||
|
dag ipldformat.DAGService
|
||||||
|
subscriber Subscriber
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewProviderDAGServiceDataTransfer(dag dtypes.StagingDAG) Manager {
|
||||||
|
return &dagserviceImpl{dag, nil}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewClientDAGServiceDataTransfer(dag dtypes.ClientDAG) Manager {
|
||||||
|
return &dagserviceImpl{dag, nil}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterVoucherType registers a validator for the given voucher type
|
||||||
|
// will error if voucher type does not implement voucher
|
||||||
|
// or if there is a voucher type registered with an identical identifier
|
||||||
|
func (impl *dagserviceImpl) RegisterVoucherType(voucherType reflect.Type, validator RequestValidator) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// open a data transfer that will send data to the recipient peer and
|
||||||
|
// open a data transfer that will send data to the recipient peer and
|
||||||
|
// transfer parts of the piece that match the selector
|
||||||
|
func (impl *dagserviceImpl) OpenPushDataChannel(to peer.ID, voucher Voucher, PieceRef cid.Cid, Selector ipld.Node) (ChannelID, error) {
|
||||||
|
return ChannelID{}, fmt.Errorf("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
// open a data transfer that will request data from the sending peer and
|
||||||
|
// transfer parts of the piece that match the selector
|
||||||
|
func (impl *dagserviceImpl) OpenPullDataChannel(to peer.ID, voucher Voucher, PieceRef cid.Cid, Selector ipld.Node) (ChannelID, error) {
|
||||||
|
err := merkledag.FetchGraph(context.TODO(), PieceRef, impl.dag)
|
||||||
|
var event Event
|
||||||
|
if err != nil {
|
||||||
|
event = Error
|
||||||
|
} else {
|
||||||
|
event = Complete
|
||||||
|
}
|
||||||
|
impl.subscriber(event, ChannelState{Channel: Channel{voucher: voucher}})
|
||||||
|
return ChannelID{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// close an open channel (effectively a cancel)
|
||||||
|
func (impl *dagserviceImpl) CloseDataTransferChannel(x ChannelID) {}
|
||||||
|
|
||||||
|
// get status of a transfer
|
||||||
|
func (impl *dagserviceImpl) TransferChannelStatus(x ChannelID) Status { return ChannelNotFoundError }
|
||||||
|
|
||||||
|
// get notified when certain types of events happen
|
||||||
|
func (impl *dagserviceImpl) SubscribeToEvents(subscriber Subscriber) {
|
||||||
|
impl.subscriber = subscriber
|
||||||
|
}
|
||||||
|
|
||||||
|
// get all in progress transfers
|
||||||
|
func (impl *dagserviceImpl) InProgressChannels() map[ChannelID]ChannelState { return nil }
|
@ -4,7 +4,7 @@ import (
|
|||||||
"reflect"
|
"reflect"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
selector "github.com/ipld/go-ipld-prime/traversal/selector"
|
ipld "github.com/ipld/go-ipld-prime"
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -51,9 +51,9 @@ type Channel struct {
|
|||||||
// an identifier for this channel shared by request and responder, set by requestor through protocol
|
// an identifier for this channel shared by request and responder, set by requestor through protocol
|
||||||
transferID TransferID
|
transferID TransferID
|
||||||
// base CID for the piece being transferred
|
// base CID for the piece being transferred
|
||||||
PieceRef cid.Cid
|
pieceRef cid.Cid
|
||||||
// portion of Piece to return, spescified by an IPLD selector
|
// portion of Piece to return, spescified by an IPLD selector
|
||||||
Selector selector.Selector
|
selector ipld.Node
|
||||||
// used to verify this channel
|
// used to verify this channel
|
||||||
voucher Voucher
|
voucher Voucher
|
||||||
// the party that is sending the data (not who initiated the request)
|
// the party that is sending the data (not who initiated the request)
|
||||||
@ -64,6 +64,14 @@ type Channel struct {
|
|||||||
totalSize uint64
|
totalSize uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c Channel) TranfersID() TransferID { return c.transferID }
|
||||||
|
func (c Channel) PieceRef() cid.Cid { return c.pieceRef }
|
||||||
|
func (c Channel) Selector() ipld.Node { return c.selector }
|
||||||
|
func (c Channel) Voucher() Voucher { return c.voucher }
|
||||||
|
func (c Channel) Sender() peer.ID { return c.sender }
|
||||||
|
func (c Channel) Recipient() peer.ID { return c.recipient }
|
||||||
|
func (c Channel) TotalSize() uint64 { return c.totalSize }
|
||||||
|
|
||||||
// ChannelState is immutable channel data plus mutable state
|
// ChannelState is immutable channel data plus mutable state
|
||||||
type ChannelState struct {
|
type ChannelState struct {
|
||||||
Channel
|
Channel
|
||||||
@ -73,6 +81,9 @@ type ChannelState struct {
|
|||||||
received uint64
|
received uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c ChannelState) Sent() uint64 { return c.sent }
|
||||||
|
func (c ChannelState) Received() uint64 { return c.received }
|
||||||
|
|
||||||
// Event is a name for an event that occurs on a data transfer channel
|
// Event is a name for an event that occurs on a data transfer channel
|
||||||
type Event string
|
type Event string
|
||||||
|
|
||||||
@ -100,12 +111,12 @@ type RequestValidator interface {
|
|||||||
sender peer.ID,
|
sender peer.ID,
|
||||||
voucher Voucher,
|
voucher Voucher,
|
||||||
PieceRef cid.Cid,
|
PieceRef cid.Cid,
|
||||||
Selector selector.Selector) error
|
Selector ipld.Node) error
|
||||||
ValidatePull(
|
ValidatePull(
|
||||||
receiver peer.ID,
|
receiver peer.ID,
|
||||||
voucher Voucher,
|
voucher Voucher,
|
||||||
PieceRef cid.Cid,
|
PieceRef cid.Cid,
|
||||||
Selector selector.Selector) error
|
Selector ipld.Node) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Manager is the core interface presented by all implementations of
|
// Manager is the core interface presented by all implementations of
|
||||||
@ -119,11 +130,11 @@ type Manager interface {
|
|||||||
// open a data transfer that will send data to the recipient peer and
|
// open a data transfer that will send data to the recipient peer and
|
||||||
// open a data transfer that will send data to the recipient peer and
|
// open a data transfer that will send data to the recipient peer and
|
||||||
// transfer parts of the piece that match the selector
|
// transfer parts of the piece that match the selector
|
||||||
OpenPushDataChannel(to peer.ID, voucher Voucher, PieceRef cid.Cid, Selector selector.Selector) ChannelID
|
OpenPushDataChannel(to peer.ID, voucher Voucher, PieceRef cid.Cid, Selector ipld.Node) (ChannelID, error)
|
||||||
|
|
||||||
// open a data transfer that will request data from the sending peer and
|
// open a data transfer that will request data from the sending peer and
|
||||||
// transfer parts of the piece that match the selector
|
// transfer parts of the piece that match the selector
|
||||||
OpenPullDataChannel(to peer.ID, voucher Voucher, PieceRef cid.Cid, Selector selector.Selector) ChannelID
|
OpenPullDataChannel(to peer.ID, voucher Voucher, PieceRef cid.Cid, Selector ipld.Node) (ChannelID, error)
|
||||||
|
|
||||||
// close an open channel (effectively a cancel)
|
// close an open channel (effectively a cancel)
|
||||||
CloseDataTransferChannel(x ChannelID)
|
CloseDataTransferChannel(x ChannelID)
|
||||||
@ -137,3 +148,6 @@ type Manager interface {
|
|||||||
// get all in progress transfers
|
// get all in progress transfers
|
||||||
InProgressChannels() map[ChannelID]ChannelState
|
InProgressChannels() map[ChannelID]ChannelState
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ClientDataTransfer Manager
|
||||||
|
type ProviderDataTransfer Manager
|
||||||
|
2
go.mod
2
go.mod
@ -40,7 +40,7 @@ require (
|
|||||||
github.com/ipfs/go-log v0.0.2-0.20190920042044-a609c1ae5144
|
github.com/ipfs/go-log v0.0.2-0.20190920042044-a609c1ae5144
|
||||||
github.com/ipfs/go-merkledag v0.2.3
|
github.com/ipfs/go-merkledag v0.2.3
|
||||||
github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb
|
github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb
|
||||||
github.com/ipld/go-ipld-prime v0.0.1
|
github.com/ipld/go-ipld-prime v0.0.2-0.20191025154717-8dff1cbec43b
|
||||||
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52
|
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52
|
||||||
github.com/libp2p/go-libp2p v0.3.0
|
github.com/libp2p/go-libp2p v0.3.0
|
||||||
github.com/libp2p/go-libp2p-circuit v0.1.1
|
github.com/libp2p/go-libp2p-circuit v0.1.1
|
||||||
|
2
go.sum
2
go.sum
@ -233,6 +233,8 @@ github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2
|
|||||||
github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0=
|
github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0=
|
||||||
github.com/ipld/go-ipld-prime v0.0.1 h1:ZTjkYODUmJAca7w5hfzP9emyJfUxTy/Xi4zsUoqHggQ=
|
github.com/ipld/go-ipld-prime v0.0.1 h1:ZTjkYODUmJAca7w5hfzP9emyJfUxTy/Xi4zsUoqHggQ=
|
||||||
github.com/ipld/go-ipld-prime v0.0.1/go.mod h1:bDDSvVz7vaK12FNvMeRYnpRFkSUPNQOiCYQezMD/P3w=
|
github.com/ipld/go-ipld-prime v0.0.1/go.mod h1:bDDSvVz7vaK12FNvMeRYnpRFkSUPNQOiCYQezMD/P3w=
|
||||||
|
github.com/ipld/go-ipld-prime v0.0.2-0.20191025154717-8dff1cbec43b h1:ACSEK4f1SDQC+FJ4B4pqHFW14d7kEW2ufwXA/c7eLP0=
|
||||||
|
github.com/ipld/go-ipld-prime v0.0.2-0.20191025154717-8dff1cbec43b/go.mod h1:bDDSvVz7vaK12FNvMeRYnpRFkSUPNQOiCYQezMD/P3w=
|
||||||
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c=
|
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c=
|
||||||
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4=
|
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4=
|
||||||
github.com/jackpal/gateway v1.0.5 h1:qzXWUJfuMdlLMtt0a3Dgt+xkWQiA5itDEITVJtuSwMc=
|
github.com/jackpal/gateway v1.0.5 h1:qzXWUJfuMdlLMtt0a3Dgt+xkWQiA5itDEITVJtuSwMc=
|
||||||
|
@ -27,6 +27,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/store"
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/chain/wallet"
|
"github.com/filecoin-project/lotus/chain/wallet"
|
||||||
|
"github.com/filecoin-project/lotus/datatransfer"
|
||||||
"github.com/filecoin-project/lotus/lib/sectorbuilder"
|
"github.com/filecoin-project/lotus/lib/sectorbuilder"
|
||||||
"github.com/filecoin-project/lotus/miner"
|
"github.com/filecoin-project/lotus/miner"
|
||||||
"github.com/filecoin-project/lotus/node/config"
|
"github.com/filecoin-project/lotus/node/config"
|
||||||
@ -85,12 +86,14 @@ const (
|
|||||||
HandleIncomingMessagesKey
|
HandleIncomingMessagesKey
|
||||||
|
|
||||||
RunDealClientKey
|
RunDealClientKey
|
||||||
|
RegisterClientValidatorKey
|
||||||
|
|
||||||
// storage miner
|
// storage miner
|
||||||
HandleDealsKey
|
HandleDealsKey
|
||||||
HandleRetrievalKey
|
HandleRetrievalKey
|
||||||
RunSectorServiceKey
|
RunSectorServiceKey
|
||||||
RegisterMinerKey
|
RegisterMinerKey
|
||||||
|
RegisterProviderValidatorKey
|
||||||
|
|
||||||
// daemon
|
// daemon
|
||||||
ExtractApiKey
|
ExtractApiKey
|
||||||
@ -218,7 +221,10 @@ func Online() Option {
|
|||||||
Override(new(discovery.PeerResolver), modules.RetrievalResolver),
|
Override(new(discovery.PeerResolver), modules.RetrievalResolver),
|
||||||
|
|
||||||
Override(new(*retrieval.Client), retrieval.NewClient),
|
Override(new(*retrieval.Client), retrieval.NewClient),
|
||||||
|
Override(new(datatransfer.ClientDataTransfer), datatransfer.NewClientDAGServiceDataTransfer),
|
||||||
|
Override(new(*deals.ClientRequestValidator), deals.NewClientRequestValidator),
|
||||||
Override(new(*deals.Client), deals.NewClient),
|
Override(new(*deals.Client), deals.NewClient),
|
||||||
|
Override(RegisterClientValidatorKey, deals.RegisterClientValidator),
|
||||||
Override(RunDealClientKey, modules.RunDealClient),
|
Override(RunDealClientKey, modules.RunDealClient),
|
||||||
|
|
||||||
Override(new(*paych.Store), paych.NewStore),
|
Override(new(*paych.Store), paych.NewStore),
|
||||||
@ -238,7 +244,10 @@ func Online() Option {
|
|||||||
Override(new(dtypes.StagingDAG), modules.StagingDAG),
|
Override(new(dtypes.StagingDAG), modules.StagingDAG),
|
||||||
|
|
||||||
Override(new(*retrieval.Miner), retrieval.NewMiner),
|
Override(new(*retrieval.Miner), retrieval.NewMiner),
|
||||||
|
Override(new(datatransfer.ProviderDataTransfer), datatransfer.NewProviderDAGServiceDataTransfer),
|
||||||
|
Override(new(*deals.ProviderRequestValidator), deals.NewProviderRequestValidator),
|
||||||
Override(new(*deals.Provider), deals.NewProvider),
|
Override(new(*deals.Provider), deals.NewProvider),
|
||||||
|
Override(RegisterProviderValidatorKey, deals.RegisterProviderValidator),
|
||||||
Override(HandleRetrievalKey, modules.HandleRetrieval),
|
Override(HandleRetrievalKey, modules.HandleRetrieval),
|
||||||
Override(HandleDealsKey, modules.HandleDeals),
|
Override(HandleDealsKey, modules.HandleDeals),
|
||||||
Override(RegisterMinerKey, modules.RegisterMiner),
|
Override(RegisterMinerKey, modules.RegisterMiner),
|
||||||
|
Loading…
Reference in New Issue
Block a user