Merge pull request #488 from filecoin-project/feat/dt-skeleton
Data Transfer Integration For Storage Deals (w/ Skeleton Module)
This commit is contained in:
commit
aafccaf021
@ -851,3 +851,51 @@ func (t *MinerDeal) UnmarshalCBOR(r io.Reader) error {
|
|||||||
t.SectorID = uint64(extra)
|
t.SectorID = uint64(extra)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *StorageDataTransferVoucher) MarshalCBOR(w io.Writer) error {
|
||||||
|
if t == nil {
|
||||||
|
_, err := w.Write(cbg.CborNull)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := w.Write([]byte{129}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// t.t.Proposal (cid.Cid)
|
||||||
|
|
||||||
|
if err := cbg.WriteCid(w, t.Proposal); err != nil {
|
||||||
|
return xerrors.Errorf("failed to write cid field t.Proposal: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *StorageDataTransferVoucher) UnmarshalCBOR(r io.Reader) error {
|
||||||
|
br := cbg.GetPeeker(r)
|
||||||
|
|
||||||
|
maj, extra, err := cbg.CborReadHeader(br)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if maj != cbg.MajArray {
|
||||||
|
return fmt.Errorf("cbor input should be of type array")
|
||||||
|
}
|
||||||
|
|
||||||
|
if extra != 1 {
|
||||||
|
return fmt.Errorf("cbor input had wrong number of fields")
|
||||||
|
}
|
||||||
|
|
||||||
|
// t.t.Proposal (cid.Cid)
|
||||||
|
|
||||||
|
{
|
||||||
|
|
||||||
|
c, err := cbg.ReadCid(br)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("failed to read cid field t.Proposal: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Proposal = c
|
||||||
|
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -4,8 +4,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
"github.com/ipfs/go-datastore"
|
|
||||||
"github.com/ipfs/go-datastore/namespace"
|
|
||||||
logging "github.com/ipfs/go-log"
|
logging "github.com/ipfs/go-log"
|
||||||
"github.com/libp2p/go-libp2p-core/host"
|
"github.com/libp2p/go-libp2p-core/host"
|
||||||
inet "github.com/libp2p/go-libp2p-core/network"
|
inet "github.com/libp2p/go-libp2p-core/network"
|
||||||
@ -44,14 +42,20 @@ type ClientDeal struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Client struct {
|
type Client struct {
|
||||||
sm *stmgr.StateManager
|
sm *stmgr.StateManager
|
||||||
chain *store.ChainStore
|
chain *store.ChainStore
|
||||||
h host.Host
|
h host.Host
|
||||||
w *wallet.Wallet
|
w *wallet.Wallet
|
||||||
dag dtypes.ClientDAG
|
// dataTransfer
|
||||||
discovery *discovery.Local
|
// TODO: once the data transfer module is complete, the
|
||||||
events *events.Events
|
// client will listen to events on the data transfer module
|
||||||
fm *market.FundMgr
|
// Because we are using only a fake DAGService
|
||||||
|
// implementation, there's no validation or events on the client side
|
||||||
|
dataTransfer dtypes.ClientDataTransfer
|
||||||
|
dag dtypes.ClientDAG
|
||||||
|
discovery *discovery.Local
|
||||||
|
events *events.Events
|
||||||
|
fm *market.FundMgr
|
||||||
|
|
||||||
deals *statestore.StateStore
|
deals *statestore.StateStore
|
||||||
conns map[cid.Cid]inet.Stream
|
conns map[cid.Cid]inet.Stream
|
||||||
@ -70,18 +74,19 @@ type clientDealUpdate struct {
|
|||||||
mut func(*ClientDeal)
|
mut func(*ClientDeal)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.MetadataDS, dag dtypes.ClientDAG, discovery *discovery.Local, fm *market.FundMgr, chainapi full.ChainAPI) *Client {
|
func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w *wallet.Wallet, dag dtypes.ClientDAG, dataTransfer dtypes.ClientDataTransfer, discovery *discovery.Local, fm *market.FundMgr, deals dtypes.ClientDealStore, chainapi full.ChainAPI) *Client {
|
||||||
c := &Client{
|
c := &Client{
|
||||||
sm: sm,
|
sm: sm,
|
||||||
chain: chain,
|
chain: chain,
|
||||||
h: h,
|
h: h,
|
||||||
w: w,
|
w: w,
|
||||||
dag: dag,
|
dataTransfer: dataTransfer,
|
||||||
discovery: discovery,
|
dag: dag,
|
||||||
fm: fm,
|
discovery: discovery,
|
||||||
events: events.NewEvents(context.TODO(), &chainapi),
|
fm: fm,
|
||||||
|
events: events.NewEvents(context.TODO(), &chainapi),
|
||||||
|
|
||||||
deals: statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client"))),
|
deals: deals,
|
||||||
conns: map[cid.Cid]inet.Stream{},
|
conns: map[cid.Cid]inet.Stream{},
|
||||||
|
|
||||||
incoming: make(chan *ClientDeal, 16),
|
incoming: make(chan *ClientDeal, 16),
|
||||||
|
@ -1,17 +1,23 @@
|
|||||||
package deals
|
package deals
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
files "github.com/ipfs/go-ipfs-files"
|
files "github.com/ipfs/go-ipfs-files"
|
||||||
unixfile "github.com/ipfs/go-unixfs/file"
|
unixfile "github.com/ipfs/go-unixfs/file"
|
||||||
|
"github.com/ipld/go-ipld-prime"
|
||||||
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/datatransfer"
|
||||||
"github.com/filecoin-project/lotus/lib/cborutil"
|
"github.com/filecoin-project/lotus/lib/cborutil"
|
||||||
"github.com/filecoin-project/lotus/lib/padreader"
|
"github.com/filecoin-project/lotus/lib/padreader"
|
||||||
"github.com/filecoin-project/lotus/lib/sectorbuilder"
|
"github.com/filecoin-project/lotus/lib/sectorbuilder"
|
||||||
|
"github.com/filecoin-project/lotus/lib/statestore"
|
||||||
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (c *Client) failDeal(id cid.Cid, cerr error) {
|
func (c *Client) failDeal(id cid.Cid, cerr error) {
|
||||||
@ -98,3 +104,67 @@ func (c *Client) disconnect(deal ClientDeal) error {
|
|||||||
delete(c.conns, deal.ProposalCid)
|
delete(c.conns, deal.ProposalCid)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var _ datatransfer.RequestValidator = &ClientRequestValidator{}
|
||||||
|
|
||||||
|
// ClientRequestValidator validates data transfer requests for the client
|
||||||
|
// in a storage market
|
||||||
|
type ClientRequestValidator struct {
|
||||||
|
deals *statestore.StateStore
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewClientRequestValidator returns a new client request validator for the
|
||||||
|
// given datastore
|
||||||
|
func NewClientRequestValidator(deals dtypes.ClientDealStore) *ClientRequestValidator {
|
||||||
|
crv := &ClientRequestValidator{
|
||||||
|
deals: deals,
|
||||||
|
}
|
||||||
|
return crv
|
||||||
|
}
|
||||||
|
|
||||||
|
// ValidatePush validates a push request received from the peer that will send data
|
||||||
|
// Will always error because clients should not accept push requests from a provider
|
||||||
|
// in a storage deal (i.e. send data to client).
|
||||||
|
func (c *ClientRequestValidator) ValidatePush(
|
||||||
|
sender peer.ID,
|
||||||
|
voucher datatransfer.Voucher,
|
||||||
|
baseCid cid.Cid,
|
||||||
|
Selector ipld.Node) error {
|
||||||
|
return ErrNoPushAccepted
|
||||||
|
}
|
||||||
|
|
||||||
|
// ValidatePull validates a pull request received from the peer that will receive data
|
||||||
|
// Will succeed only if:
|
||||||
|
// - voucher has correct type
|
||||||
|
// - voucher references an active deal
|
||||||
|
// - referenced deal matches the receiver (miner)
|
||||||
|
// - referenced deal matches the given base CID
|
||||||
|
// - referenced deal is in an acceptable state
|
||||||
|
func (c *ClientRequestValidator) ValidatePull(
|
||||||
|
receiver peer.ID,
|
||||||
|
voucher datatransfer.Voucher,
|
||||||
|
baseCid cid.Cid,
|
||||||
|
Selector ipld.Node) error {
|
||||||
|
dealVoucher, ok := voucher.(*StorageDataTransferVoucher)
|
||||||
|
if !ok {
|
||||||
|
return xerrors.Errorf("voucher type %s: %w", voucher.Identifier(), ErrWrongVoucherType)
|
||||||
|
}
|
||||||
|
|
||||||
|
var deal ClientDeal
|
||||||
|
err := c.deals.Get(dealVoucher.Proposal, &deal)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("Proposal CID %s: %w", dealVoucher.Proposal.String(), ErrNoDeal)
|
||||||
|
}
|
||||||
|
if deal.Miner != receiver {
|
||||||
|
return xerrors.Errorf("Deal Peer %s, Data Transfer Peer %s: %w", deal.Miner.String(), receiver.String(), ErrWrongPeer)
|
||||||
|
}
|
||||||
|
if !bytes.Equal(deal.Proposal.PieceRef, baseCid.Bytes()) {
|
||||||
|
return xerrors.Errorf("Deal Payload CID %s, Data Transfer CID %s: %w", string(deal.Proposal.PieceRef), baseCid.String(), ErrWrongPiece)
|
||||||
|
}
|
||||||
|
for _, state := range DataTransferStates {
|
||||||
|
if deal.State == state {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return xerrors.Errorf("Deal State %s: %w", deal.State, ErrInacceptableDealState)
|
||||||
|
}
|
||||||
|
@ -2,6 +2,7 @@ package deals
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
cid "github.com/ipfs/go-cid"
|
cid "github.com/ipfs/go-cid"
|
||||||
@ -15,6 +16,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/actors"
|
"github.com/filecoin-project/lotus/chain/actors"
|
||||||
"github.com/filecoin-project/lotus/chain/address"
|
"github.com/filecoin-project/lotus/chain/address"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/datatransfer"
|
||||||
"github.com/filecoin-project/lotus/lib/cborutil"
|
"github.com/filecoin-project/lotus/lib/cborutil"
|
||||||
"github.com/filecoin-project/lotus/lib/statestore"
|
"github.com/filecoin-project/lotus/lib/statestore"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
@ -47,10 +49,13 @@ type Provider struct {
|
|||||||
sminer *storage.Miner
|
sminer *storage.Miner
|
||||||
full api.FullNode
|
full api.FullNode
|
||||||
|
|
||||||
// TODO: Use a custom protocol or graphsync in the future
|
// TODO: This will go away once storage market module + CAR
|
||||||
// TODO: GC
|
// is implemented
|
||||||
dag dtypes.StagingDAG
|
dag dtypes.StagingDAG
|
||||||
|
|
||||||
|
// dataTransfer is the manager of data transfers used by this storage provider
|
||||||
|
dataTransfer dtypes.ProviderDataTransfer
|
||||||
|
|
||||||
deals *statestore.StateStore
|
deals *statestore.StateStore
|
||||||
ds dtypes.MetadataDS
|
ds dtypes.MetadataDS
|
||||||
|
|
||||||
@ -71,7 +76,12 @@ type minerDealUpdate struct {
|
|||||||
mut func(*MinerDeal)
|
mut func(*MinerDeal)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, secb *sectorblocks.SectorBlocks, dag dtypes.StagingDAG, fullNode api.FullNode) (*Provider, error) {
|
var (
|
||||||
|
// ErrDataTransferFailed means a data transfer for a deal failed
|
||||||
|
ErrDataTransferFailed = errors.New("Deal data transfer failed")
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, secb *sectorblocks.SectorBlocks, dag dtypes.StagingDAG, dataTransfer dtypes.ProviderDataTransfer, fullNode api.FullNode) (*Provider, error) {
|
||||||
addr, err := ds.Get(datastore.NewKey("miner-address"))
|
addr, err := ds.Get(datastore.NewKey("miner-address"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -82,10 +92,11 @@ func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, secb *sectorblocks
|
|||||||
}
|
}
|
||||||
|
|
||||||
h := &Provider{
|
h := &Provider{
|
||||||
sminer: sminer,
|
sminer: sminer,
|
||||||
dag: dag,
|
dag: dag,
|
||||||
full: fullNode,
|
dataTransfer: dataTransfer,
|
||||||
secb: secb,
|
full: fullNode,
|
||||||
|
secb: secb,
|
||||||
|
|
||||||
pricePerByteBlock: types.NewInt(3), // TODO: allow setting
|
pricePerByteBlock: types.NewInt(3), // TODO: allow setting
|
||||||
minPieceSize: 256, // TODO: allow setting (BUT KEEP MIN 256! (because of how we fill sectors up))
|
minPieceSize: 256, // TODO: allow setting (BUT KEEP MIN 256! (because of how we fill sectors up))
|
||||||
@ -115,6 +126,10 @@ func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, secb *sectorblocks
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// register a data transfer event handler -- this will move deals from
|
||||||
|
// accepted to staged
|
||||||
|
h.dataTransfer.SubscribeToEvents(h.onDataTransferEvent)
|
||||||
|
|
||||||
return h, nil
|
return h, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -182,7 +197,7 @@ func (p *Provider) onUpdated(ctx context.Context, update minerDealUpdate) {
|
|||||||
|
|
||||||
switch update.newState {
|
switch update.newState {
|
||||||
case api.DealAccepted:
|
case api.DealAccepted:
|
||||||
p.handle(ctx, deal, p.accept, api.DealStaged)
|
p.handle(ctx, deal, p.accept, api.DealNoUpdate)
|
||||||
case api.DealStaged:
|
case api.DealStaged:
|
||||||
p.handle(ctx, deal, p.staged, api.DealSealing)
|
p.handle(ctx, deal, p.staged, api.DealSealing)
|
||||||
case api.DealSealing:
|
case api.DealSealing:
|
||||||
@ -192,6 +207,44 @@ func (p *Provider) onUpdated(ctx context.Context, update minerDealUpdate) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// onDataTransferEvent is the function called when an event occurs in a data
|
||||||
|
// transfer -- it reads the voucher to verify this even occurred in a storage
|
||||||
|
// market deal, then, based on the data transfer event that occurred, it generates
|
||||||
|
// and update message for the deal -- either moving to staged for a completion
|
||||||
|
// event or moving to error if a data transfer error occurs
|
||||||
|
func (p *Provider) onDataTransferEvent(event datatransfer.Event, channelState datatransfer.ChannelState) {
|
||||||
|
voucher, ok := channelState.Voucher().(*StorageDataTransferVoucher)
|
||||||
|
// if this event is for a transfer not related to storage, ignore
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// data transfer events for opening and progress do not affect deal state
|
||||||
|
var next api.DealState
|
||||||
|
var err error
|
||||||
|
switch event {
|
||||||
|
case datatransfer.Complete:
|
||||||
|
next = api.DealStaged
|
||||||
|
err = nil
|
||||||
|
case datatransfer.Error:
|
||||||
|
next = api.DealFailed
|
||||||
|
err = ErrDataTransferFailed
|
||||||
|
default:
|
||||||
|
// the only events we care about are complete and error
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case p.updated <- minerDealUpdate{
|
||||||
|
newState: next,
|
||||||
|
id: voucher.Proposal,
|
||||||
|
err: err,
|
||||||
|
mut: nil,
|
||||||
|
}:
|
||||||
|
case <-p.stop:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (p *Provider) newDeal(s inet.Stream, proposal Proposal) (MinerDeal, error) {
|
func (p *Provider) newDeal(s inet.Stream, proposal Proposal) (MinerDeal, error) {
|
||||||
proposalNd, err := cborutil.AsIpld(proposal.DealProposal)
|
proposalNd, err := cborutil.AsIpld(proposal.DealProposal)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -4,7 +4,10 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/ipfs/go-merkledag"
|
ipldfree "github.com/ipld/go-ipld-prime/impl/free"
|
||||||
|
"github.com/ipld/go-ipld-prime/traversal/selector"
|
||||||
|
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
|
||||||
|
|
||||||
unixfile "github.com/ipfs/go-unixfs/file"
|
unixfile "github.com/ipfs/go-unixfs/file"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
@ -149,9 +152,26 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
|
|||||||
log.Warnf("closing client connection: %+v", err)
|
log.Warnf("closing client connection: %+v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())
|
||||||
|
|
||||||
|
// this is the selector for "get the whole DAG"
|
||||||
|
// TODO: support storage deals with custom payload selectors
|
||||||
|
allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(),
|
||||||
|
ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()
|
||||||
|
|
||||||
|
// initiate a pull data transfer. This will complete asynchronously and the
|
||||||
|
// completion of the data transfer will trigger a change in deal state
|
||||||
|
// (see onDataTransferEvent)
|
||||||
|
_, err = p.dataTransfer.OpenPullDataChannel(ctx,
|
||||||
|
deal.Client,
|
||||||
|
&StorageDataTransferVoucher{Proposal: deal.ProposalCid},
|
||||||
|
deal.Ref,
|
||||||
|
allSelector,
|
||||||
|
)
|
||||||
|
|
||||||
return func(deal *MinerDeal) {
|
return func(deal *MinerDeal) {
|
||||||
deal.DealID = resp.DealIDs[0]
|
deal.DealID = resp.DealIDs[0]
|
||||||
}, merkledag.FetchGraph(ctx, deal.Ref, p.dag)
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// STAGED
|
// STAGED
|
||||||
|
@ -1,17 +1,24 @@
|
|||||||
package deals
|
package deals
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/filecoin-project/lotus/datatransfer"
|
||||||
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
|
"github.com/ipld/go-ipld-prime"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/chain/actors"
|
"github.com/filecoin-project/lotus/chain/actors"
|
||||||
"github.com/filecoin-project/lotus/chain/address"
|
"github.com/filecoin-project/lotus/chain/address"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/lib/cborutil"
|
"github.com/filecoin-project/lotus/lib/cborutil"
|
||||||
|
"github.com/filecoin-project/lotus/lib/statestore"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
inet "github.com/libp2p/go-libp2p-core/network"
|
inet "github.com/libp2p/go-libp2p-core/network"
|
||||||
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -125,3 +132,67 @@ func (p *Provider) getWorker(miner address.Address) (address.Address, error) {
|
|||||||
|
|
||||||
return address.NewFromBytes(r.Return)
|
return address.NewFromBytes(r.Return)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var _ datatransfer.RequestValidator = &ProviderRequestValidator{}
|
||||||
|
|
||||||
|
// ProviderRequestValidator validates data transfer requests for the provider
|
||||||
|
// in a storage market
|
||||||
|
type ProviderRequestValidator struct {
|
||||||
|
deals *statestore.StateStore
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewProviderRequestValidator returns a new client request validator for the
|
||||||
|
// given datastore
|
||||||
|
func NewProviderRequestValidator(deals dtypes.ProviderDealStore) *ProviderRequestValidator {
|
||||||
|
return &ProviderRequestValidator{
|
||||||
|
deals: deals,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ValidatePush validates a push request received from the peer that will send data
|
||||||
|
// Will succeed only if:
|
||||||
|
// - voucher has correct type
|
||||||
|
// - voucher references an active deal
|
||||||
|
// - referenced deal matches the client
|
||||||
|
// - referenced deal matches the given base CID
|
||||||
|
// - referenced deal is in an acceptable state
|
||||||
|
func (m *ProviderRequestValidator) ValidatePush(
|
||||||
|
sender peer.ID,
|
||||||
|
voucher datatransfer.Voucher,
|
||||||
|
baseCid cid.Cid,
|
||||||
|
Selector ipld.Node) error {
|
||||||
|
dealVoucher, ok := voucher.(*StorageDataTransferVoucher)
|
||||||
|
if !ok {
|
||||||
|
return xerrors.Errorf("voucher type %s: %w", voucher.Identifier(), ErrWrongVoucherType)
|
||||||
|
}
|
||||||
|
|
||||||
|
var deal MinerDeal
|
||||||
|
err := m.deals.Get(dealVoucher.Proposal, &deal)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("Proposal CID %s: %w", dealVoucher.Proposal.String(), ErrNoDeal)
|
||||||
|
}
|
||||||
|
if deal.Client != sender {
|
||||||
|
return xerrors.Errorf("Deal Peer %s, Data Transfer Peer %s: %w", deal.Client.String(), sender.String(), ErrWrongPeer)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !bytes.Equal(deal.Proposal.PieceRef, baseCid.Bytes()) {
|
||||||
|
return xerrors.Errorf("Deal Payload CID %s, Data Transfer CID %s: %w", string(deal.Proposal.PieceRef), baseCid.String(), ErrWrongPiece)
|
||||||
|
}
|
||||||
|
for _, state := range DataTransferStates {
|
||||||
|
if deal.State == state {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return xerrors.Errorf("Deal State %s: %w", deal.State, ErrInacceptableDealState)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ValidatePull validates a pull request received from the peer that will receive data.
|
||||||
|
// Will always error because providers should not accept pull requests from a client
|
||||||
|
// in a storage deal (i.e. send data to client).
|
||||||
|
func (m *ProviderRequestValidator) ValidatePull(
|
||||||
|
receiver peer.ID,
|
||||||
|
voucher datatransfer.Voucher,
|
||||||
|
baseCid cid.Cid,
|
||||||
|
Selector ipld.Node) error {
|
||||||
|
return ErrNoPullAccepted
|
||||||
|
}
|
||||||
|
288
chain/deals/request_validation_test.go
Normal file
288
chain/deals/request_validation_test.go
Normal file
@ -0,0 +1,288 @@
|
|||||||
|
package deals_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
"github.com/ipfs/go-datastore"
|
||||||
|
"github.com/ipfs/go-datastore/namespace"
|
||||||
|
dss "github.com/ipfs/go-datastore/sync"
|
||||||
|
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
|
||||||
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
|
xerrors "golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/filecoin-project/lotus/chain/actors"
|
||||||
|
"github.com/filecoin-project/lotus/chain/address"
|
||||||
|
"github.com/filecoin-project/lotus/chain/deals"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/lib/cborutil"
|
||||||
|
"github.com/filecoin-project/lotus/lib/statestore"
|
||||||
|
)
|
||||||
|
|
||||||
|
var blockGenerator = blocksutil.NewBlockGenerator()
|
||||||
|
|
||||||
|
type wrongDTType struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wrongDTType) ToBytes() ([]byte, error) {
|
||||||
|
return []byte{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wrongDTType) FromBytes([]byte) error {
|
||||||
|
return fmt.Errorf("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wrongDTType) Identifier() string {
|
||||||
|
return "WrongDTTYPE"
|
||||||
|
}
|
||||||
|
|
||||||
|
func uniqueStorageDealProposal() (actors.StorageDealProposal, error) {
|
||||||
|
clientAddr, err := address.NewIDAddress(uint64(rand.Int()))
|
||||||
|
if err != nil {
|
||||||
|
return actors.StorageDealProposal{}, err
|
||||||
|
}
|
||||||
|
providerAddr, err := address.NewIDAddress(uint64(rand.Int()))
|
||||||
|
if err != nil {
|
||||||
|
return actors.StorageDealProposal{}, err
|
||||||
|
}
|
||||||
|
return actors.StorageDealProposal{
|
||||||
|
PieceRef: blockGenerator.Next().Cid().Bytes(),
|
||||||
|
Client: clientAddr,
|
||||||
|
Provider: providerAddr,
|
||||||
|
ProposerSignature: &types.Signature{
|
||||||
|
Data: []byte("foo bar cat dog"),
|
||||||
|
Type: types.KTBLS,
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newClientDeal(minerID peer.ID, state api.DealState) (deals.ClientDeal, error) {
|
||||||
|
newProposal, err := uniqueStorageDealProposal()
|
||||||
|
if err != nil {
|
||||||
|
return deals.ClientDeal{}, err
|
||||||
|
}
|
||||||
|
proposalNd, err := cborutil.AsIpld(&newProposal)
|
||||||
|
if err != nil {
|
||||||
|
return deals.ClientDeal{}, err
|
||||||
|
}
|
||||||
|
minerAddr, err := address.NewIDAddress(uint64(rand.Int()))
|
||||||
|
|
||||||
|
return deals.ClientDeal{
|
||||||
|
Proposal: newProposal,
|
||||||
|
ProposalCid: proposalNd.Cid(),
|
||||||
|
Miner: minerID,
|
||||||
|
MinerWorker: minerAddr,
|
||||||
|
State: state,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMinerDeal(clientID peer.ID, state api.DealState) (deals.MinerDeal, error) {
|
||||||
|
newProposal, err := uniqueStorageDealProposal()
|
||||||
|
if err != nil {
|
||||||
|
return deals.MinerDeal{}, err
|
||||||
|
}
|
||||||
|
proposalNd, err := cborutil.AsIpld(&newProposal)
|
||||||
|
if err != nil {
|
||||||
|
return deals.MinerDeal{}, err
|
||||||
|
}
|
||||||
|
ref, err := cid.Cast(newProposal.PieceRef)
|
||||||
|
if err != nil {
|
||||||
|
return deals.MinerDeal{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return deals.MinerDeal{
|
||||||
|
Proposal: newProposal,
|
||||||
|
ProposalCid: proposalNd.Cid(),
|
||||||
|
Client: clientID,
|
||||||
|
State: state,
|
||||||
|
Ref: ref,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClientRequestValidation(t *testing.T) {
|
||||||
|
ds := dss.MutexWrap(datastore.NewMapDatastore())
|
||||||
|
state := statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client")))
|
||||||
|
|
||||||
|
crv := deals.NewClientRequestValidator(state)
|
||||||
|
minerID := peer.ID("fakepeerid")
|
||||||
|
block := blockGenerator.Next()
|
||||||
|
t.Run("ValidatePush fails", func(t *testing.T) {
|
||||||
|
if !xerrors.Is(crv.ValidatePush(minerID, wrongDTType{}, block.Cid(), nil), deals.ErrNoPushAccepted) {
|
||||||
|
t.Fatal("Push should fail for the client request validator for storage deals")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
t.Run("ValidatePull fails deal not found", func(t *testing.T) {
|
||||||
|
proposal, err := uniqueStorageDealProposal()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("error creating proposal")
|
||||||
|
}
|
||||||
|
proposalNd, err := cborutil.AsIpld(&proposal)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("error serializing proposal")
|
||||||
|
}
|
||||||
|
pieceRef, err := cid.Cast(proposal.PieceRef)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("unable to construct piece cid")
|
||||||
|
}
|
||||||
|
if !xerrors.Is(crv.ValidatePull(minerID, &deals.StorageDataTransferVoucher{proposalNd.Cid()}, pieceRef, nil), deals.ErrNoDeal) {
|
||||||
|
t.Fatal("Pull should fail if there is no deal stored")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
t.Run("ValidatePull fails wrong client", func(t *testing.T) {
|
||||||
|
otherMiner := peer.ID("otherminer")
|
||||||
|
clientDeal, err := newClientDeal(otherMiner, api.DealAccepted)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("error creating client deal")
|
||||||
|
}
|
||||||
|
if err := state.Begin(clientDeal.ProposalCid, &clientDeal); err != nil {
|
||||||
|
t.Fatal("deal tracking failed")
|
||||||
|
}
|
||||||
|
pieceRef, err := cid.Cast(clientDeal.Proposal.PieceRef)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("unable to construct piece cid")
|
||||||
|
}
|
||||||
|
if !xerrors.Is(crv.ValidatePull(minerID, &deals.StorageDataTransferVoucher{clientDeal.ProposalCid}, pieceRef, nil), deals.ErrWrongPeer) {
|
||||||
|
t.Fatal("Pull should fail if miner address is incorrect")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
t.Run("ValidatePull fails wrong piece ref", func(t *testing.T) {
|
||||||
|
clientDeal, err := newClientDeal(minerID, api.DealAccepted)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("error creating client deal")
|
||||||
|
}
|
||||||
|
if err := state.Begin(clientDeal.ProposalCid, &clientDeal); err != nil {
|
||||||
|
t.Fatal("deal tracking failed")
|
||||||
|
}
|
||||||
|
if !xerrors.Is(crv.ValidatePull(minerID, &deals.StorageDataTransferVoucher{clientDeal.ProposalCid}, blockGenerator.Next().Cid(), nil), deals.ErrWrongPiece) {
|
||||||
|
t.Fatal("Pull should fail if piece ref is incorrect")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
t.Run("ValidatePull fails wrong deal state", func(t *testing.T) {
|
||||||
|
clientDeal, err := newClientDeal(minerID, api.DealComplete)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("error creating client deal")
|
||||||
|
}
|
||||||
|
if err := state.Begin(clientDeal.ProposalCid, &clientDeal); err != nil {
|
||||||
|
t.Fatal("deal tracking failed")
|
||||||
|
}
|
||||||
|
pieceRef, err := cid.Cast(clientDeal.Proposal.PieceRef)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("unable to construct piece cid")
|
||||||
|
}
|
||||||
|
if !xerrors.Is(crv.ValidatePull(minerID, &deals.StorageDataTransferVoucher{clientDeal.ProposalCid}, pieceRef, nil), deals.ErrInacceptableDealState) {
|
||||||
|
t.Fatal("Pull should fail if deal is in a state that cannot be data transferred")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
t.Run("ValidatePull succeeds", func(t *testing.T) {
|
||||||
|
clientDeal, err := newClientDeal(minerID, api.DealAccepted)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("error creating client deal")
|
||||||
|
}
|
||||||
|
if err := state.Begin(clientDeal.ProposalCid, &clientDeal); err != nil {
|
||||||
|
t.Fatal("deal tracking failed")
|
||||||
|
}
|
||||||
|
pieceRef, err := cid.Cast(clientDeal.Proposal.PieceRef)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("unable to construct piece cid")
|
||||||
|
}
|
||||||
|
if crv.ValidatePull(minerID, &deals.StorageDataTransferVoucher{clientDeal.ProposalCid}, pieceRef, nil) != nil {
|
||||||
|
t.Fatal("Pull should should succeed when all parameters are correct")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProviderRequestValidation(t *testing.T) {
|
||||||
|
ds := dss.MutexWrap(datastore.NewMapDatastore())
|
||||||
|
state := statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client")))
|
||||||
|
|
||||||
|
mrv := deals.NewProviderRequestValidator(state)
|
||||||
|
clientID := peer.ID("fakepeerid")
|
||||||
|
block := blockGenerator.Next()
|
||||||
|
t.Run("ValidatePull fails", func(t *testing.T) {
|
||||||
|
if !xerrors.Is(mrv.ValidatePull(clientID, wrongDTType{}, block.Cid(), nil), deals.ErrNoPullAccepted) {
|
||||||
|
t.Fatal("Pull should fail for the provider request validator for storage deals")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("ValidatePush fails deal not found", func(t *testing.T) {
|
||||||
|
proposal, err := uniqueStorageDealProposal()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("error creating proposal")
|
||||||
|
}
|
||||||
|
proposalNd, err := cborutil.AsIpld(&proposal)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("error serializing proposal")
|
||||||
|
}
|
||||||
|
pieceRef, err := cid.Cast(proposal.PieceRef)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("unable to construct piece cid")
|
||||||
|
}
|
||||||
|
if !xerrors.Is(mrv.ValidatePush(clientID, &deals.StorageDataTransferVoucher{proposalNd.Cid()}, pieceRef, nil), deals.ErrNoDeal) {
|
||||||
|
t.Fatal("Push should fail if there is no deal stored")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
t.Run("ValidatePush fails wrong miner", func(t *testing.T) {
|
||||||
|
otherClient := peer.ID("otherclient")
|
||||||
|
minerDeal, err := newMinerDeal(otherClient, api.DealAccepted)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("error creating client deal")
|
||||||
|
}
|
||||||
|
if err := state.Begin(minerDeal.ProposalCid, &minerDeal); err != nil {
|
||||||
|
t.Fatal("deal tracking failed")
|
||||||
|
}
|
||||||
|
pieceRef, err := cid.Cast(minerDeal.Proposal.PieceRef)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("unable to construct piece cid")
|
||||||
|
}
|
||||||
|
if !xerrors.Is(mrv.ValidatePush(clientID, &deals.StorageDataTransferVoucher{minerDeal.ProposalCid}, pieceRef, nil), deals.ErrWrongPeer) {
|
||||||
|
t.Fatal("Push should fail if miner address is incorrect")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
t.Run("ValidatePush fails wrong piece ref", func(t *testing.T) {
|
||||||
|
minerDeal, err := newMinerDeal(clientID, api.DealAccepted)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("error creating client deal")
|
||||||
|
}
|
||||||
|
if err := state.Begin(minerDeal.ProposalCid, &minerDeal); err != nil {
|
||||||
|
t.Fatal("deal tracking failed")
|
||||||
|
}
|
||||||
|
if !xerrors.Is(mrv.ValidatePush(clientID, &deals.StorageDataTransferVoucher{minerDeal.ProposalCid}, blockGenerator.Next().Cid(), nil), deals.ErrWrongPiece) {
|
||||||
|
t.Fatal("Push should fail if piece ref is incorrect")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
t.Run("ValidatePush fails wrong deal state", func(t *testing.T) {
|
||||||
|
minerDeal, err := newMinerDeal(clientID, api.DealComplete)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("error creating client deal")
|
||||||
|
}
|
||||||
|
if err := state.Begin(minerDeal.ProposalCid, &minerDeal); err != nil {
|
||||||
|
t.Fatal("deal tracking failed")
|
||||||
|
}
|
||||||
|
pieceRef, err := cid.Cast(minerDeal.Proposal.PieceRef)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("unable to construct piece cid")
|
||||||
|
}
|
||||||
|
if !xerrors.Is(mrv.ValidatePush(clientID, &deals.StorageDataTransferVoucher{minerDeal.ProposalCid}, pieceRef, nil), deals.ErrInacceptableDealState) {
|
||||||
|
t.Fatal("Push should fail if deal is in a state that cannot be data transferred")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
t.Run("ValidatePush succeeds", func(t *testing.T) {
|
||||||
|
minerDeal, err := newMinerDeal(clientID, api.DealAccepted)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("error creating client deal")
|
||||||
|
}
|
||||||
|
if err := state.Begin(minerDeal.ProposalCid, &minerDeal); err != nil {
|
||||||
|
t.Fatal("deal tracking failed")
|
||||||
|
}
|
||||||
|
pieceRef, err := cid.Cast(minerDeal.Proposal.PieceRef)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("unable to construct piece cid")
|
||||||
|
}
|
||||||
|
if mrv.ValidatePush(clientID, &deals.StorageDataTransferVoucher{minerDeal.ProposalCid}, pieceRef, nil) != nil {
|
||||||
|
t.Fatal("Push should should succeed when all parameters are correct")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
@ -1,6 +1,9 @@
|
|||||||
package deals
|
package deals
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"errors"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/chain/actors"
|
"github.com/filecoin-project/lotus/chain/actors"
|
||||||
"github.com/filecoin-project/lotus/chain/address"
|
"github.com/filecoin-project/lotus/chain/address"
|
||||||
@ -9,6 +12,35 @@ import (
|
|||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// ErrWrongVoucherType means the voucher was not the correct type can validate against
|
||||||
|
ErrWrongVoucherType = errors.New("cannot validate voucher type.")
|
||||||
|
|
||||||
|
// ErrNoPushAccepted just means clients do not accept pushes for storage deals
|
||||||
|
ErrNoPushAccepted = errors.New("client should not receive data for a storage deal.")
|
||||||
|
|
||||||
|
// ErrNoPullAccepted just means providers do not accept pulls for storage deals
|
||||||
|
ErrNoPullAccepted = errors.New("provider should not send data for a storage deal.")
|
||||||
|
|
||||||
|
// ErrNoDeal means no active deal was found for this vouchers proposal cid
|
||||||
|
ErrNoDeal = errors.New("no deal found for this proposal.")
|
||||||
|
|
||||||
|
// ErrWrongPeer means that the other peer for this data transfer request does not match
|
||||||
|
// the other peer for the deal
|
||||||
|
ErrWrongPeer = errors.New("data Transfer peer id and Deal peer id do not match.")
|
||||||
|
|
||||||
|
// ErrWrongPiece means that the pieceref for this data transfer request does not match
|
||||||
|
// the one specified in the deal
|
||||||
|
ErrWrongPiece = errors.New("base CID for deal does not match CID for piece.")
|
||||||
|
|
||||||
|
// ErrInacceptableDealState means the deal for this transfer is not in a deal state
|
||||||
|
// where transfer can be performed
|
||||||
|
ErrInacceptableDealState = errors.New("deal is not a in a state where deals are accepted.")
|
||||||
|
|
||||||
|
// DataTransferStates are the states in which it would make sense to actually start a data transfer
|
||||||
|
DataTransferStates = []api.DealState{api.DealAccepted, api.DealUnknown}
|
||||||
|
)
|
||||||
|
|
||||||
const DealProtocolID = "/fil/storage/mk/1.0.1"
|
const DealProtocolID = "/fil/storage/mk/1.0.1"
|
||||||
const AskProtocolID = "/fil/storage/ask/1.0.1"
|
const AskProtocolID = "/fil/storage/ask/1.0.1"
|
||||||
|
|
||||||
@ -53,3 +85,30 @@ type AskRequest struct {
|
|||||||
type AskResponse struct {
|
type AskResponse struct {
|
||||||
Ask *types.SignedStorageAsk
|
Ask *types.SignedStorageAsk
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StorageDataTransferVoucher is the voucher type for data transfers
|
||||||
|
// used by the storage market
|
||||||
|
type StorageDataTransferVoucher struct {
|
||||||
|
Proposal cid.Cid
|
||||||
|
}
|
||||||
|
|
||||||
|
// ToBytes converts the StorageDataTransferVoucher to raw bytes
|
||||||
|
func (dv *StorageDataTransferVoucher) ToBytes() ([]byte, error) {
|
||||||
|
var buf bytes.Buffer
|
||||||
|
err := dv.MarshalCBOR(&buf)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return buf.Bytes(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FromBytes converts the StorageDataTransferVoucher to raw bytes
|
||||||
|
func (dv *StorageDataTransferVoucher) FromBytes(raw []byte) error {
|
||||||
|
r := bytes.NewReader(raw)
|
||||||
|
return dv.UnmarshalCBOR(r)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Identifier is the unique string identifier for a StorageDataTransferVoucher
|
||||||
|
func (dv *StorageDataTransferVoucher) Identifier() string {
|
||||||
|
return "StorageDataTransferVoucher"
|
||||||
|
}
|
||||||
|
79
datatransfer/dagservice_impl.go
Normal file
79
datatransfer/dagservice_impl.go
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
package datatransfer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"reflect"
|
||||||
|
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
ipldformat "github.com/ipfs/go-ipld-format"
|
||||||
|
"github.com/ipfs/go-merkledag"
|
||||||
|
ipld "github.com/ipld/go-ipld-prime"
|
||||||
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
)
|
||||||
|
|
||||||
|
// This file implements a VERY simple, incomplete version of the data transfer
|
||||||
|
// module that allows us to make the neccesary insertions of data transfer
|
||||||
|
// functionality into the storage market
|
||||||
|
// It does not:
|
||||||
|
// -- actually validate requests
|
||||||
|
// -- support Push requests
|
||||||
|
// -- support multiple subscribers
|
||||||
|
// -- do any actual network coordination or use Graphsync
|
||||||
|
|
||||||
|
type dagserviceImpl struct {
|
||||||
|
dag ipldformat.DAGService
|
||||||
|
subscriber Subscriber
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewDAGServiceDataTransfer returns a data transfer manager based on
|
||||||
|
// an IPLD DAGService
|
||||||
|
func NewDAGServiceDataTransfer(dag ipldformat.DAGService) Manager {
|
||||||
|
return &dagserviceImpl{dag, nil}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterVoucherType registers a validator for the given voucher type
|
||||||
|
// will error if voucher type does not implement voucher
|
||||||
|
// or if there is a voucher type registered with an identical identifier
|
||||||
|
func (impl *dagserviceImpl) RegisterVoucherType(voucherType reflect.Type, validator RequestValidator) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// open a data transfer that will send data to the recipient peer and
|
||||||
|
// open a data transfer that will send data to the recipient peer and
|
||||||
|
// transfer parts of the piece that match the selector
|
||||||
|
func (impl *dagserviceImpl) OpenPushDataChannel(ctx context.Context, to peer.ID, voucher Voucher, baseCid cid.Cid, Selector ipld.Node) (ChannelID, error) {
|
||||||
|
return ChannelID{}, xerrors.Errorf("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
// open a data transfer that will request data from the sending peer and
|
||||||
|
// transfer parts of the piece that match the selector
|
||||||
|
func (impl *dagserviceImpl) OpenPullDataChannel(ctx context.Context, to peer.ID, voucher Voucher, baseCid cid.Cid, Selector ipld.Node) (ChannelID, error) {
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
go func() {
|
||||||
|
defer cancel()
|
||||||
|
err := merkledag.FetchGraph(ctx, baseCid, impl.dag)
|
||||||
|
var event Event
|
||||||
|
if err != nil {
|
||||||
|
event = Error
|
||||||
|
} else {
|
||||||
|
event = Complete
|
||||||
|
}
|
||||||
|
impl.subscriber(event, ChannelState{Channel: Channel{voucher: voucher}})
|
||||||
|
}()
|
||||||
|
return ChannelID{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// close an open channel (effectively a cancel)
|
||||||
|
func (impl *dagserviceImpl) CloseDataTransferChannel(x ChannelID) {}
|
||||||
|
|
||||||
|
// get status of a transfer
|
||||||
|
func (impl *dagserviceImpl) TransferChannelStatus(x ChannelID) Status { return ChannelNotFoundError }
|
||||||
|
|
||||||
|
// get notified when certain types of events happen
|
||||||
|
func (impl *dagserviceImpl) SubscribeToEvents(subscriber Subscriber) {
|
||||||
|
impl.subscriber = subscriber
|
||||||
|
}
|
||||||
|
|
||||||
|
// get all in progress transfers
|
||||||
|
func (impl *dagserviceImpl) InProgressChannels() map[ChannelID]ChannelState { return nil }
|
173
datatransfer/types.go
Normal file
173
datatransfer/types.go
Normal file
@ -0,0 +1,173 @@
|
|||||||
|
package datatransfer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"reflect"
|
||||||
|
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
ipld "github.com/ipld/go-ipld-prime"
|
||||||
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Voucher is used to validate
|
||||||
|
// a data transfer request against the underlying storage or retrieval deal
|
||||||
|
// that precipitated it. The only requirement is a voucher can read and write
|
||||||
|
// from bytes, and has a string identifier type
|
||||||
|
type Voucher interface {
|
||||||
|
// ToBytes converts the Voucher to raw bytes
|
||||||
|
ToBytes() ([]byte, error)
|
||||||
|
// FromBytes reads a Voucher from raw bytes
|
||||||
|
FromBytes([]byte) error
|
||||||
|
// Identifier is a unique string identifier for this voucher type
|
||||||
|
Identifier() string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Status is the status of transfer for a given channel
|
||||||
|
type Status int
|
||||||
|
|
||||||
|
const (
|
||||||
|
// Ongoing means the data transfer is in progress
|
||||||
|
Ongoing Status = iota
|
||||||
|
|
||||||
|
// Completed means the data transfer is completed successfully
|
||||||
|
Completed
|
||||||
|
|
||||||
|
// Failed means the data transfer failed
|
||||||
|
Failed
|
||||||
|
|
||||||
|
// ChannelNotFoundError means the searched for data transfer does not exist
|
||||||
|
ChannelNotFoundError
|
||||||
|
)
|
||||||
|
|
||||||
|
// TransferID is an identifier for a data transfer, shared between
|
||||||
|
// request/responder and unique to the requestor
|
||||||
|
type TransferID uint64
|
||||||
|
|
||||||
|
// ChannelID is a unique identifier for a channel, distinct by both the other
|
||||||
|
// party's peer ID + the transfer ID
|
||||||
|
type ChannelID struct {
|
||||||
|
to peer.ID
|
||||||
|
id TransferID
|
||||||
|
}
|
||||||
|
|
||||||
|
// Channel represents all the parameters for a single data transfer
|
||||||
|
type Channel struct {
|
||||||
|
// an identifier for this channel shared by request and responder, set by requestor through protocol
|
||||||
|
transferID TransferID
|
||||||
|
// base CID for the piece being transferred
|
||||||
|
baseCid cid.Cid
|
||||||
|
// portion of Piece to return, spescified by an IPLD selector
|
||||||
|
selector ipld.Node
|
||||||
|
// used to verify this channel
|
||||||
|
voucher Voucher
|
||||||
|
// the party that is sending the data (not who initiated the request)
|
||||||
|
sender peer.ID
|
||||||
|
// the party that is receiving the data (not who initiated the request)
|
||||||
|
recipient peer.ID
|
||||||
|
// expected amount of data to be transferred
|
||||||
|
totalSize uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
// TransferID returns the transfer id for this channel
|
||||||
|
func (c Channel) TransferID() TransferID { return c.transferID }
|
||||||
|
|
||||||
|
// BaseCID returns the CID that is at the root of this data transfer
|
||||||
|
func (c Channel) BaseCID() cid.Cid { return c.baseCid }
|
||||||
|
|
||||||
|
// Selector returns the IPLD selector for this data transfer (represented as
|
||||||
|
// an IPLD node)
|
||||||
|
func (c Channel) Selector() ipld.Node { return c.selector }
|
||||||
|
|
||||||
|
// Voucher returns the voucher for this data transfer
|
||||||
|
func (c Channel) Voucher() Voucher { return c.voucher }
|
||||||
|
|
||||||
|
// Sender returns the peer id for the node that is sending data
|
||||||
|
func (c Channel) Sender() peer.ID { return c.sender }
|
||||||
|
|
||||||
|
// Recipient returns the peer id for the node that is receiving data
|
||||||
|
func (c Channel) Recipient() peer.ID { return c.recipient }
|
||||||
|
|
||||||
|
// TotalSize returns the total size for the data being transferred
|
||||||
|
func (c Channel) TotalSize() uint64 { return c.totalSize }
|
||||||
|
|
||||||
|
// ChannelState is immutable channel data plus mutable state
|
||||||
|
type ChannelState struct {
|
||||||
|
Channel
|
||||||
|
// total bytes sent from this node (0 if receiver)
|
||||||
|
sent uint64
|
||||||
|
// total bytes received by this node (0 if sender)
|
||||||
|
received uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sent returns the number of bytes sent
|
||||||
|
func (c ChannelState) Sent() uint64 { return c.sent }
|
||||||
|
|
||||||
|
// Received returns the number of bytes received
|
||||||
|
func (c ChannelState) Received() uint64 { return c.received }
|
||||||
|
|
||||||
|
// Event is a name for an event that occurs on a data transfer channel
|
||||||
|
type Event int
|
||||||
|
|
||||||
|
const (
|
||||||
|
// Open is an event occurs when a channel is first opened
|
||||||
|
Open Event = iota
|
||||||
|
|
||||||
|
// Progress is an event that gets emitted every time more data is transferred
|
||||||
|
Progress
|
||||||
|
|
||||||
|
// Error is an event that emits when an error occurs in a data transfer
|
||||||
|
Error
|
||||||
|
|
||||||
|
// Complete is emitted when a data transfer is complete
|
||||||
|
Complete
|
||||||
|
)
|
||||||
|
|
||||||
|
// Subscriber is a callback that is called when events are emitted
|
||||||
|
type Subscriber func(event Event, channelState ChannelState)
|
||||||
|
|
||||||
|
// RequestValidator is an interface implemented by the client of the
|
||||||
|
// data transfer module to validate requests
|
||||||
|
type RequestValidator interface {
|
||||||
|
// ValidatePush validates a push request received from the peer that will send data
|
||||||
|
ValidatePush(
|
||||||
|
sender peer.ID,
|
||||||
|
voucher Voucher,
|
||||||
|
baseCid cid.Cid,
|
||||||
|
selector ipld.Node) error
|
||||||
|
// ValidatePull validates a pull request received from the peer that will receive data
|
||||||
|
ValidatePull(
|
||||||
|
receiver peer.ID,
|
||||||
|
voucher Voucher,
|
||||||
|
baseCid cid.Cid,
|
||||||
|
selector ipld.Node) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// Manager is the core interface presented by all implementations of
|
||||||
|
// of the data transfer sub system
|
||||||
|
type Manager interface {
|
||||||
|
// RegisterVoucherType registers a validator for the given voucher type
|
||||||
|
// will error if voucher type does not implement voucher
|
||||||
|
// or if there is a voucher type registered with an identical identifier
|
||||||
|
RegisterVoucherType(voucherType reflect.Type, validator RequestValidator) error
|
||||||
|
|
||||||
|
// open a data transfer that will send data to the recipient peer and
|
||||||
|
// open a data transfer that will send data to the recipient peer and
|
||||||
|
// transfer parts of the piece that match the selector
|
||||||
|
OpenPushDataChannel(ctx context.Context, to peer.ID, voucher Voucher, baseCid cid.Cid, selector ipld.Node) (ChannelID, error)
|
||||||
|
|
||||||
|
// open a data transfer that will request data from the sending peer and
|
||||||
|
// transfer parts of the piece that match the selector
|
||||||
|
OpenPullDataChannel(ctx context.Context, to peer.ID, voucher Voucher, baseCid cid.Cid, selector ipld.Node) (ChannelID, error)
|
||||||
|
|
||||||
|
// close an open channel (effectively a cancel)
|
||||||
|
CloseDataTransferChannel(x ChannelID)
|
||||||
|
|
||||||
|
// get status of a transfer
|
||||||
|
TransferChannelStatus(x ChannelID) Status
|
||||||
|
|
||||||
|
// get notified when certain types of events happen
|
||||||
|
SubscribeToEvents(subscriber Subscriber)
|
||||||
|
|
||||||
|
// get all in progress transfers
|
||||||
|
InProgressChannels() map[ChannelID]ChannelState
|
||||||
|
}
|
@ -143,6 +143,7 @@ func main() {
|
|||||||
deals.ClientDealProposal{},
|
deals.ClientDealProposal{},
|
||||||
deals.ClientDeal{},
|
deals.ClientDeal{},
|
||||||
deals.MinerDeal{},
|
deals.MinerDeal{},
|
||||||
|
deals.StorageDataTransferVoucher{},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
|
2
go.mod
2
go.mod
@ -30,6 +30,7 @@ require (
|
|||||||
github.com/ipfs/go-fs-lock v0.0.1
|
github.com/ipfs/go-fs-lock v0.0.1
|
||||||
github.com/ipfs/go-hamt-ipld v0.0.12-0.20190910032255-ee6e898f0456
|
github.com/ipfs/go-hamt-ipld v0.0.12-0.20190910032255-ee6e898f0456
|
||||||
github.com/ipfs/go-ipfs-blockstore v0.1.0
|
github.com/ipfs/go-ipfs-blockstore v0.1.0
|
||||||
|
github.com/ipfs/go-ipfs-blocksutil v0.0.1
|
||||||
github.com/ipfs/go-ipfs-chunker v0.0.1
|
github.com/ipfs/go-ipfs-chunker v0.0.1
|
||||||
github.com/ipfs/go-ipfs-ds-help v0.0.1
|
github.com/ipfs/go-ipfs-ds-help v0.0.1
|
||||||
github.com/ipfs/go-ipfs-exchange-interface v0.0.1
|
github.com/ipfs/go-ipfs-exchange-interface v0.0.1
|
||||||
@ -41,6 +42,7 @@ require (
|
|||||||
github.com/ipfs/go-log v0.0.2-0.20190920042044-a609c1ae5144
|
github.com/ipfs/go-log v0.0.2-0.20190920042044-a609c1ae5144
|
||||||
github.com/ipfs/go-merkledag v0.2.3
|
github.com/ipfs/go-merkledag v0.2.3
|
||||||
github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb
|
github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb
|
||||||
|
github.com/ipld/go-ipld-prime v0.0.2-0.20191025154717-8dff1cbec43b
|
||||||
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52
|
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52
|
||||||
github.com/libp2p/go-libp2p v0.3.0
|
github.com/libp2p/go-libp2p v0.3.0
|
||||||
github.com/libp2p/go-libp2p-circuit v0.1.1
|
github.com/libp2p/go-libp2p-circuit v0.1.1
|
||||||
|
2
go.sum
2
go.sum
@ -233,6 +233,8 @@ github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb h1:tmWYgjltxwM7PD
|
|||||||
github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb/go.mod h1:IwAAgul1UQIcNZzKPYZWOCijryFBeCV79cNubPzol+k=
|
github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb/go.mod h1:IwAAgul1UQIcNZzKPYZWOCijryFBeCV79cNubPzol+k=
|
||||||
github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2E=
|
github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2E=
|
||||||
github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0=
|
github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0=
|
||||||
|
github.com/ipld/go-ipld-prime v0.0.2-0.20191025154717-8dff1cbec43b h1:ACSEK4f1SDQC+FJ4B4pqHFW14d7kEW2ufwXA/c7eLP0=
|
||||||
|
github.com/ipld/go-ipld-prime v0.0.2-0.20191025154717-8dff1cbec43b/go.mod h1:bDDSvVz7vaK12FNvMeRYnpRFkSUPNQOiCYQezMD/P3w=
|
||||||
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c=
|
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c=
|
||||||
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4=
|
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4=
|
||||||
github.com/jackpal/gateway v1.0.5 h1:qzXWUJfuMdlLMtt0a3Dgt+xkWQiA5itDEITVJtuSwMc=
|
github.com/jackpal/gateway v1.0.5 h1:qzXWUJfuMdlLMtt0a3Dgt+xkWQiA5itDEITVJtuSwMc=
|
||||||
|
@ -85,12 +85,14 @@ const (
|
|||||||
HandleIncomingMessagesKey
|
HandleIncomingMessagesKey
|
||||||
|
|
||||||
RunDealClientKey
|
RunDealClientKey
|
||||||
|
RegisterClientValidatorKey
|
||||||
|
|
||||||
// storage miner
|
// storage miner
|
||||||
HandleDealsKey
|
HandleDealsKey
|
||||||
HandleRetrievalKey
|
HandleRetrievalKey
|
||||||
RunSectorServiceKey
|
RunSectorServiceKey
|
||||||
RegisterMinerKey
|
RegisterMinerKey
|
||||||
|
RegisterProviderValidatorKey
|
||||||
|
|
||||||
// daemon
|
// daemon
|
||||||
ExtractApiKey
|
ExtractApiKey
|
||||||
@ -218,7 +220,11 @@ func Online() Option {
|
|||||||
Override(new(discovery.PeerResolver), modules.RetrievalResolver),
|
Override(new(discovery.PeerResolver), modules.RetrievalResolver),
|
||||||
|
|
||||||
Override(new(*retrieval.Client), retrieval.NewClient),
|
Override(new(*retrieval.Client), retrieval.NewClient),
|
||||||
|
Override(new(dtypes.ClientDealStore), modules.NewClientDealStore),
|
||||||
|
Override(new(dtypes.ClientDataTransfer), modules.NewClientDAGServiceDataTransfer),
|
||||||
|
Override(new(*deals.ClientRequestValidator), deals.NewClientRequestValidator),
|
||||||
Override(new(*deals.Client), deals.NewClient),
|
Override(new(*deals.Client), deals.NewClient),
|
||||||
|
Override(RegisterClientValidatorKey, modules.RegisterClientValidator),
|
||||||
Override(RunDealClientKey, modules.RunDealClient),
|
Override(RunDealClientKey, modules.RunDealClient),
|
||||||
|
|
||||||
Override(new(*paych.Store), paych.NewStore),
|
Override(new(*paych.Store), paych.NewStore),
|
||||||
@ -238,7 +244,11 @@ func Online() Option {
|
|||||||
Override(new(dtypes.StagingDAG), modules.StagingDAG),
|
Override(new(dtypes.StagingDAG), modules.StagingDAG),
|
||||||
|
|
||||||
Override(new(*retrieval.Miner), retrieval.NewMiner),
|
Override(new(*retrieval.Miner), retrieval.NewMiner),
|
||||||
|
Override(new(dtypes.ProviderDealStore), modules.NewProviderDealStore),
|
||||||
|
Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer),
|
||||||
|
Override(new(*deals.ProviderRequestValidator), deals.NewProviderRequestValidator),
|
||||||
Override(new(*deals.Provider), deals.NewProvider),
|
Override(new(*deals.Provider), deals.NewProvider),
|
||||||
|
Override(RegisterProviderValidatorKey, modules.RegisterProviderValidator),
|
||||||
Override(HandleRetrievalKey, modules.HandleRetrieval),
|
Override(HandleRetrievalKey, modules.HandleRetrieval),
|
||||||
Override(HandleDealsKey, modules.HandleDeals),
|
Override(HandleDealsKey, modules.HandleDeals),
|
||||||
Override(RegisterMinerKey, modules.RegisterMiner),
|
Override(RegisterMinerKey, modules.RegisterMiner),
|
||||||
|
@ -2,12 +2,15 @@ package modules
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"path/filepath"
|
||||||
|
"reflect"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/lib/statestore"
|
||||||
"github.com/filecoin-project/lotus/node/modules/helpers"
|
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||||
"github.com/ipfs/go-bitswap"
|
"github.com/ipfs/go-bitswap"
|
||||||
"github.com/ipfs/go-bitswap/network"
|
"github.com/ipfs/go-bitswap/network"
|
||||||
"github.com/libp2p/go-libp2p-core/host"
|
"github.com/libp2p/go-libp2p-core/host"
|
||||||
"github.com/libp2p/go-libp2p-core/routing"
|
"github.com/libp2p/go-libp2p-core/routing"
|
||||||
"path/filepath"
|
|
||||||
|
|
||||||
"github.com/ipfs/go-blockservice"
|
"github.com/ipfs/go-blockservice"
|
||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
@ -17,6 +20,8 @@ import (
|
|||||||
"github.com/ipfs/go-merkledag"
|
"github.com/ipfs/go-merkledag"
|
||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/chain/deals"
|
||||||
|
"github.com/filecoin-project/lotus/datatransfer"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
"github.com/filecoin-project/lotus/node/repo"
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
)
|
)
|
||||||
@ -40,6 +45,24 @@ func ClientBlockstore(fstore dtypes.ClientFilestore) dtypes.ClientBlockstore {
|
|||||||
return blockstore.NewIdStore((*filestore.Filestore)(fstore))
|
return blockstore.NewIdStore((*filestore.Filestore)(fstore))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RegisterClientValidator is an initialization hook that registers the client
|
||||||
|
// request validator with the data transfer module as the validator for
|
||||||
|
// StorageDataTransferVoucher types
|
||||||
|
func RegisterClientValidator(crv *deals.ClientRequestValidator, dtm dtypes.ClientDataTransfer) {
|
||||||
|
dtm.RegisterVoucherType(reflect.TypeOf(deals.StorageDataTransferVoucher{}), crv)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewClientDAGServiceDataTransfer returns a data transfer manager that just
|
||||||
|
// uses the clients's Client DAG service for transfers
|
||||||
|
func NewClientDAGServiceDataTransfer(dag dtypes.ClientDAG) dtypes.ClientDataTransfer {
|
||||||
|
return datatransfer.NewDAGServiceDataTransfer(dag)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewClientDealStore creates a statestore for the client to store its deals
|
||||||
|
func NewClientDealStore(ds dtypes.MetadataDS) dtypes.ClientDealStore {
|
||||||
|
return statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client")))
|
||||||
|
}
|
||||||
|
|
||||||
func ClientDAG(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.ClientBlockstore, rt routing.Routing, h host.Host) dtypes.ClientDAG {
|
func ClientDAG(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.ClientBlockstore, rt routing.Routing, h host.Host) dtypes.ClientDAG {
|
||||||
bitswapNetwork := network.NewFromIpfsHost(h, rt)
|
bitswapNetwork := network.NewFromIpfsHost(h, rt)
|
||||||
exch := bitswap.New(helpers.LifecycleCtx(mctx, lc), bitswapNetwork, ibs)
|
exch := bitswap.New(helpers.LifecycleCtx(mctx, lc), bitswapNetwork, ibs)
|
||||||
|
@ -7,6 +7,9 @@ import (
|
|||||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||||
exchange "github.com/ipfs/go-ipfs-exchange-interface"
|
exchange "github.com/ipfs/go-ipfs-exchange-interface"
|
||||||
ipld "github.com/ipfs/go-ipld-format"
|
ipld "github.com/ipfs/go-ipld-format"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/datatransfer"
|
||||||
|
"github.com/filecoin-project/lotus/lib/statestore"
|
||||||
)
|
)
|
||||||
|
|
||||||
// MetadataDS stores metadata
|
// MetadataDS stores metadata
|
||||||
@ -23,5 +26,14 @@ type ChainBlockService bserv.BlockService
|
|||||||
type ClientFilestore *filestore.Filestore
|
type ClientFilestore *filestore.Filestore
|
||||||
type ClientBlockstore blockstore.Blockstore
|
type ClientBlockstore blockstore.Blockstore
|
||||||
type ClientDAG ipld.DAGService
|
type ClientDAG ipld.DAGService
|
||||||
|
type ClientDealStore *statestore.StateStore
|
||||||
|
|
||||||
|
// ClientDataTransfer is a data transfer manager for the client
|
||||||
|
type ClientDataTransfer datatransfer.Manager
|
||||||
|
|
||||||
|
type ProviderDealStore *statestore.StateStore
|
||||||
|
|
||||||
|
// ProviderDataTransfer is a data transfer manager for the provider
|
||||||
|
type ProviderDataTransfer datatransfer.Manager
|
||||||
|
|
||||||
type StagingDAG ipld.DAGService
|
type StagingDAG ipld.DAGService
|
||||||
|
@ -5,11 +5,13 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"reflect"
|
||||||
|
|
||||||
"github.com/ipfs/go-bitswap"
|
"github.com/ipfs/go-bitswap"
|
||||||
"github.com/ipfs/go-bitswap/network"
|
"github.com/ipfs/go-bitswap/network"
|
||||||
"github.com/ipfs/go-blockservice"
|
"github.com/ipfs/go-blockservice"
|
||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
|
"github.com/ipfs/go-datastore/namespace"
|
||||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||||
"github.com/ipfs/go-merkledag"
|
"github.com/ipfs/go-merkledag"
|
||||||
"github.com/libp2p/go-libp2p-core/host"
|
"github.com/libp2p/go-libp2p-core/host"
|
||||||
@ -22,7 +24,9 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/address"
|
"github.com/filecoin-project/lotus/chain/address"
|
||||||
"github.com/filecoin-project/lotus/chain/deals"
|
"github.com/filecoin-project/lotus/chain/deals"
|
||||||
|
"github.com/filecoin-project/lotus/datatransfer"
|
||||||
"github.com/filecoin-project/lotus/lib/sectorbuilder"
|
"github.com/filecoin-project/lotus/lib/sectorbuilder"
|
||||||
|
"github.com/filecoin-project/lotus/lib/statestore"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
"github.com/filecoin-project/lotus/node/modules/helpers"
|
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||||
"github.com/filecoin-project/lotus/node/repo"
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
@ -130,6 +134,24 @@ func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h *de
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RegisterProviderValidator is an initialization hook that registers the provider
|
||||||
|
// request validator with the data transfer module as the validator for
|
||||||
|
// StorageDataTransferVoucher types
|
||||||
|
func RegisterProviderValidator(mrv *deals.ProviderRequestValidator, dtm dtypes.ProviderDataTransfer) {
|
||||||
|
dtm.RegisterVoucherType(reflect.TypeOf(deals.StorageDataTransferVoucher{}), mrv)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewProviderDAGServiceDataTransfer returns a data transfer manager that just
|
||||||
|
// uses the provider's Staging DAG service for transfers
|
||||||
|
func NewProviderDAGServiceDataTransfer(dag dtypes.StagingDAG) dtypes.ProviderDataTransfer {
|
||||||
|
return datatransfer.NewDAGServiceDataTransfer(dag)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewProviderDealStore creates a statestore for the client to store its deals
|
||||||
|
func NewProviderDealStore(ds dtypes.MetadataDS) dtypes.ProviderDealStore {
|
||||||
|
return statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client")))
|
||||||
|
}
|
||||||
|
|
||||||
func StagingDAG(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, rt routing.Routing, h host.Host) (dtypes.StagingDAG, error) {
|
func StagingDAG(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, rt routing.Routing, h host.Host) (dtypes.StagingDAG, error) {
|
||||||
stagingds, err := r.Datastore("/staging")
|
stagingds, err := r.Datastore("/staging")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user