From d0b705705f6864bc8cbb0fc46c0321d5ec9ce804 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Mon, 28 Oct 2019 17:57:12 -0700 Subject: [PATCH] refactor(datatransfer): add comments, renames minor name changes and additional comments to clarify how data transfer works --- chain/deals/client.go | 9 +++-- chain/deals/provider.go | 10 +++++- chain/deals/provider_states.go | 4 +++ chain/deals/request_validation.go | 53 ++++++++++++++++++++++++---- datatransfer/dagservice_impl.go | 19 +++++++++-- datatransfer/types.go | 57 ++++++++++++++++++++++--------- go.sum | 2 -- 7 files changed, 122 insertions(+), 32 deletions(-) diff --git a/chain/deals/client.go b/chain/deals/client.go index b377c615d..2cc5dfd9e 100644 --- a/chain/deals/client.go +++ b/chain/deals/client.go @@ -46,12 +46,15 @@ type ClientDeal struct { } type Client struct { - sm *stmgr.StateManager + sm *stmgr.StateManager chain *store.ChainStore h host.Host w *wallet.Wallet - // dataTransfer -- not quite sure how this is referenced directly on client - // side + // dataTransfer + // TODO: once the data transfer module is complete, the + // client will listen to events on the data transfer module + // Because we are using only a fake DAGService + // implementation, there's no validation or events on the client side dataTransfer datatransfer.ClientDataTransfer dag dtypes.ClientDAG discovery *discovery.Local diff --git a/chain/deals/provider.go b/chain/deals/provider.go index 9ecc59a63..29ddacc3e 100644 --- a/chain/deals/provider.go +++ b/chain/deals/provider.go @@ -49,10 +49,11 @@ type Provider struct { sminer *storage.Miner full api.FullNode - // TODO: This will go away once the whole storage market module + CAR + // TODO: This will go away once storage market module + CAR // is implemented dag dtypes.StagingDAG + // dataTransfer is the manager of data transfers used by this storage provider dataTransfer datatransfer.ProviderDataTransfer deals *statestore.StateStore @@ -124,6 +125,8 @@ func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, secb *sectorblocks } } + // register a data transfer event handler -- this will move deals from + // accepted to staged h.dataTransfer.SubscribeToEvents(h.onDataTransferEvent) return h, nil @@ -203,6 +206,11 @@ func (p *Provider) onUpdated(ctx context.Context, update minerDealUpdate) { } } +// onDataTransferEvent is the function called when an event occurs in a data +// transfer -- it reads the voucher to verify this even occurred in a storage +// market deal, then, based on the data transfer event that occurred, it generates +// and update message for the deal -- either moving to staged for a completion +// event or moving to error if a data transfer error occurs func (p *Provider) onDataTransferEvent(event datatransfer.Event, channelState datatransfer.ChannelState) { voucher, ok := channelState.Voucher().(StorageDataTransferVoucher) // if this event is for a transfer not related to storage, ignore diff --git a/chain/deals/provider_states.go b/chain/deals/provider_states.go index 00f137c41..d9cb26339 100644 --- a/chain/deals/provider_states.go +++ b/chain/deals/provider_states.go @@ -156,9 +156,13 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal) ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder()) // this is the selector for "get the whole DAG" + // TODO: support storage deals with custom payload selectors allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node() + // initiate a pull data transfer. This will complete asynchronously and the + // completion of the data transfer will trigger a change in deal state + // (see onDataTransferEvent) _, err = p.dataTransfer.OpenPullDataChannel(deal.Client, StorageDataTransferVoucher{Proposal: deal.ProposalCid}, deal.Ref, diff --git a/chain/deals/request_validation.go b/chain/deals/request_validation.go index baffe4d44..ae7ef3361 100644 --- a/chain/deals/request_validation.go +++ b/chain/deals/request_validation.go @@ -38,7 +38,7 @@ var ( // ErrWrongPiece means that the pieceref for this data transfer request does not match // the one specified in the deal - ErrWrongPiece = errors.New("PieceRef for deal does not match piece ref for piece") + ErrWrongPiece = errors.New("Base CID for deal does not match CID for piece") // ErrInacceptableDealState means the deal for this transfer is not in a deal state // where transfer can be performed @@ -48,14 +48,18 @@ var ( AcceptableDealStates = []api.DealState{api.DealAccepted, api.DealUnknown} ) +// StorageDataTransferVoucher is the voucher type for data transfers +// used by the storage market type StorageDataTransferVoucher struct { Proposal cid.Cid } +// ToBytes converts the StorageDataTransferVoucher to raw bytes func (dv StorageDataTransferVoucher) ToBytes() []byte { return dv.Proposal.Bytes() } +// FromBytes converts the StorageDataTransferVoucher to raw bytes func (dv StorageDataTransferVoucher) FromBytes(raw []byte) (datatransfer.Voucher, error) { c, err := cid.Cast(raw) if err != nil { @@ -64,16 +68,22 @@ func (dv StorageDataTransferVoucher) FromBytes(raw []byte) (datatransfer.Voucher return StorageDataTransferVoucher{c}, nil } +// Identifier is the unique string identifier for a StorageDataTransferVoucher func (dv StorageDataTransferVoucher) Identifier() string { return "StorageDataTransferVoucher" } var _ datatransfer.RequestValidator = &ClientRequestValidator{} +// ClientRequestValidator validates data transfer requests for the client +// in a storage market type ClientRequestValidator struct { deals *statestore.StateStore } +// RegisterClientValidator is an initialization hook that registers the client +// request validator with the data transfer module as the validator for +// StorageDataTransferVoucher types func RegisterClientValidator(lc fx.Lifecycle, crv *ClientRequestValidator, dtm datatransfer.ClientDataTransfer) { lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { @@ -82,6 +92,8 @@ func RegisterClientValidator(lc fx.Lifecycle, crv *ClientRequestValidator, dtm d }) } +// NewClientRequestValidator returns a new client request validator for the +// given datastore func NewClientRequestValidator(ds dtypes.MetadataDS) *ClientRequestValidator { crv := &ClientRequestValidator{ deals: statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client"))), @@ -89,18 +101,28 @@ func NewClientRequestValidator(ds dtypes.MetadataDS) *ClientRequestValidator { return crv } +// ValidatePush validates a push request received from the peer that will send data +// Will always error because clients should not accept push requests from a provider +// in a storage deal (i.e. send data to client). func (c *ClientRequestValidator) ValidatePush( sender peer.ID, voucher datatransfer.Voucher, - PieceRef cid.Cid, + baseCid cid.Cid, Selector ipld.Node) error { return ErrNoPushAccepted } +// ValidatePull validates a pull request received from the peer that will receive data +// Will succeed only if: +// - voucher has correct type +// - voucher references an active deal +// - referenced deal matches the receiver (miner) +// - referenced deal matches the given base CID +// - referenced deal is in an acceptable state func (c *ClientRequestValidator) ValidatePull( receiver peer.ID, voucher datatransfer.Voucher, - PieceRef cid.Cid, + baseCid cid.Cid, Selector ipld.Node) error { dealVoucher, ok := voucher.(StorageDataTransferVoucher) if !ok { @@ -118,7 +140,7 @@ func (c *ClientRequestValidator) ValidatePull( if deal.Miner != receiver { return ErrWrongPeer } - if !bytes.Equal(deal.Proposal.PieceRef, PieceRef.Bytes()) { + if !bytes.Equal(deal.Proposal.PieceRef, baseCid.Bytes()) { return ErrWrongPiece } for _, state := range AcceptableDealStates { @@ -131,10 +153,15 @@ func (c *ClientRequestValidator) ValidatePull( var _ datatransfer.RequestValidator = &ProviderRequestValidator{} +// ProviderRequestValidator validates data transfer requests for the provider +// in a storage market type ProviderRequestValidator struct { deals *statestore.StateStore } +// RegisterProviderValidator is an initialization hook that registers the provider +// request validator with the data transfer module as the validator for +// StorageDataTransferVoucher types func RegisterProviderValidator(lc fx.Lifecycle, mrv *ProviderRequestValidator, dtm datatransfer.ProviderDataTransfer) { lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { @@ -143,16 +170,25 @@ func RegisterProviderValidator(lc fx.Lifecycle, mrv *ProviderRequestValidator, d }) } +// NewProviderRequestValidator returns a new client request validator for the +// given datastore func NewProviderRequestValidator(ds dtypes.MetadataDS) *ProviderRequestValidator { return &ProviderRequestValidator{ deals: statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client"))), } } +// ValidatePush validates a push request received from the peer that will send data +// Will succeed only if: +// - voucher has correct type +// - voucher references an active deal +// - referenced deal matches the client +// - referenced deal matches the given base CID +// - referenced deal is in an acceptable state func (m *ProviderRequestValidator) ValidatePush( sender peer.ID, voucher datatransfer.Voucher, - PieceRef cid.Cid, + baseCid cid.Cid, Selector ipld.Node) error { dealVoucher, ok := voucher.(StorageDataTransferVoucher) if !ok { @@ -171,7 +207,7 @@ func (m *ProviderRequestValidator) ValidatePush( return ErrWrongPeer } - if !bytes.Equal(deal.Proposal.PieceRef, PieceRef.Bytes()) { + if !bytes.Equal(deal.Proposal.PieceRef, baseCid.Bytes()) { return ErrWrongPiece } for _, state := range AcceptableDealStates { @@ -182,10 +218,13 @@ func (m *ProviderRequestValidator) ValidatePush( return ErrInacceptableDealState } +// ValidatePull validates a pull request received from the peer that will receive data. +// Will always error because providers should not accept pull requests from a client +// in a storage deal (i.e. send data to client). func (m *ProviderRequestValidator) ValidatePull( receiver peer.ID, voucher datatransfer.Voucher, - PieceRef cid.Cid, + baseCid cid.Cid, Selector ipld.Node) error { return ErrNoPullAccepted } diff --git a/datatransfer/dagservice_impl.go b/datatransfer/dagservice_impl.go index 2e85d6ba7..f4984f9c2 100644 --- a/datatransfer/dagservice_impl.go +++ b/datatransfer/dagservice_impl.go @@ -15,15 +15,28 @@ import ( "github.com/filecoin-project/lotus/node/modules/dtypes" ) +// This file implements a VERY simple, incomplete version of the data transfer +// module that allows us to make the neccesary insertions of data transfer +// functionality into the storage market +// It does not: +// -- actually validate requests +// -- support Push requests +// -- support multiple subscribers +// -- do any actual network coordination or use Graphsync + type dagserviceImpl struct { dag ipldformat.DAGService subscriber Subscriber } +// NewProviderDAGServiceDataTransfer returns a data transfer manager that just +// uses the provider's Staging DAG service for transfers func NewProviderDAGServiceDataTransfer(dag dtypes.StagingDAG) Manager { return &dagserviceImpl{dag, nil} } +// NewClientDAGServiceDataTransfer returns a data transfer manager that just +// uses the clients's Client DAG service for transfers func NewClientDAGServiceDataTransfer(dag dtypes.ClientDAG) Manager { return &dagserviceImpl{dag, nil} } @@ -38,14 +51,14 @@ func (impl *dagserviceImpl) RegisterVoucherType(voucherType reflect.Type, valida // open a data transfer that will send data to the recipient peer and // open a data transfer that will send data to the recipient peer and // transfer parts of the piece that match the selector -func (impl *dagserviceImpl) OpenPushDataChannel(to peer.ID, voucher Voucher, PieceRef cid.Cid, Selector ipld.Node) (ChannelID, error) { +func (impl *dagserviceImpl) OpenPushDataChannel(to peer.ID, voucher Voucher, baseCid cid.Cid, Selector ipld.Node) (ChannelID, error) { return ChannelID{}, fmt.Errorf("not implemented") } // open a data transfer that will request data from the sending peer and // transfer parts of the piece that match the selector -func (impl *dagserviceImpl) OpenPullDataChannel(to peer.ID, voucher Voucher, PieceRef cid.Cid, Selector ipld.Node) (ChannelID, error) { - err := merkledag.FetchGraph(context.TODO(), PieceRef, impl.dag) +func (impl *dagserviceImpl) OpenPullDataChannel(to peer.ID, voucher Voucher, baseCid cid.Cid, Selector ipld.Node) (ChannelID, error) { + err := merkledag.FetchGraph(context.TODO(), baseCid, impl.dag) var event Event if err != nil { event = Error diff --git a/datatransfer/types.go b/datatransfer/types.go index f39d37393..6dde5343c 100644 --- a/datatransfer/types.go +++ b/datatransfer/types.go @@ -11,10 +11,13 @@ import ( // Voucher is used to validate // a data transfer request against the underlying storage or retrieval deal // that precipitated it. The only requirement is a voucher can read and write -// from bytes +// from bytes, and has a string identifier type type Voucher interface { + // ToBytes converts the Voucher to raw bytes ToBytes() []byte + // FromBytes reads a Voucher from raw bytes FromBytes([]byte) (Voucher, error) + // Identifier is a unique string identifier for this voucher type Identifier() string } @@ -51,7 +54,7 @@ type Channel struct { // an identifier for this channel shared by request and responder, set by requestor through protocol transferID TransferID // base CID for the piece being transferred - pieceRef cid.Cid + baseCid cid.Cid // portion of Piece to return, spescified by an IPLD selector selector ipld.Node // used to verify this channel @@ -64,13 +67,27 @@ type Channel struct { totalSize uint64 } -func (c Channel) TranfersID() TransferID { return c.transferID } -func (c Channel) PieceRef() cid.Cid { return c.pieceRef } -func (c Channel) Selector() ipld.Node { return c.selector } -func (c Channel) Voucher() Voucher { return c.voucher } -func (c Channel) Sender() peer.ID { return c.sender } -func (c Channel) Recipient() peer.ID { return c.recipient } -func (c Channel) TotalSize() uint64 { return c.totalSize } +// TransferID returns the transfer id for this channel +func (c Channel) TransferID() TransferID { return c.transferID } + +// BaseCID returns the CID that is at the root of this data transfer +func (c Channel) BaseCID() cid.Cid { return c.baseCid } + +// Selector returns the IPLD selector for this data transfer (represented as +// an IPLD node) +func (c Channel) Selector() ipld.Node { return c.selector } + +// Voucher returns the voucher for this data transfer +func (c Channel) Voucher() Voucher { return c.voucher } + +// Sender returns the peer id for the node that is sending data +func (c Channel) Sender() peer.ID { return c.sender } + +// Recipient returns the peer id for the node that is receiving data +func (c Channel) Recipient() peer.ID { return c.recipient } + +// TotalSize returns the total size for the data being transferred +func (c Channel) TotalSize() uint64 { return c.totalSize } // ChannelState is immutable channel data plus mutable state type ChannelState struct { @@ -81,7 +98,10 @@ type ChannelState struct { received uint64 } -func (c ChannelState) Sent() uint64 { return c.sent } +// Sent returns the number of bytes sent +func (c ChannelState) Sent() uint64 { return c.sent } + +// Received returns the number of bytes received func (c ChannelState) Received() uint64 { return c.received } // Event is a name for an event that occurs on a data transfer channel @@ -107,16 +127,18 @@ type Subscriber func(event Event, channelState ChannelState) // RequestValidator is an interface implemented by the client of the // data transfer module to validate requests type RequestValidator interface { + // ValidatePush validates a push request received from the peer that will send data ValidatePush( sender peer.ID, voucher Voucher, - PieceRef cid.Cid, - Selector ipld.Node) error + baseCid cid.Cid, + selector ipld.Node) error + // ValidatePull validates a pull request received from the peer that will receive data ValidatePull( receiver peer.ID, voucher Voucher, - PieceRef cid.Cid, - Selector ipld.Node) error + baseCid cid.Cid, + selector ipld.Node) error } // Manager is the core interface presented by all implementations of @@ -130,11 +152,11 @@ type Manager interface { // open a data transfer that will send data to the recipient peer and // open a data transfer that will send data to the recipient peer and // transfer parts of the piece that match the selector - OpenPushDataChannel(to peer.ID, voucher Voucher, PieceRef cid.Cid, Selector ipld.Node) (ChannelID, error) + OpenPushDataChannel(to peer.ID, voucher Voucher, baseCid cid.Cid, selector ipld.Node) (ChannelID, error) // open a data transfer that will request data from the sending peer and // transfer parts of the piece that match the selector - OpenPullDataChannel(to peer.ID, voucher Voucher, PieceRef cid.Cid, Selector ipld.Node) (ChannelID, error) + OpenPullDataChannel(to peer.ID, voucher Voucher, baseCid cid.Cid, selector ipld.Node) (ChannelID, error) // close an open channel (effectively a cancel) CloseDataTransferChannel(x ChannelID) @@ -149,5 +171,8 @@ type Manager interface { InProgressChannels() map[ChannelID]ChannelState } +// ClientDataTransfer is a data transfer manager for the client type ClientDataTransfer Manager + +// ProviderDataTransfer is a data transfer manager for the provider type ProviderDataTransfer Manager diff --git a/go.sum b/go.sum index 12d1b1278..df1915208 100644 --- a/go.sum +++ b/go.sum @@ -231,8 +231,6 @@ github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb h1:tmWYgjltxwM7PD github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb/go.mod h1:IwAAgul1UQIcNZzKPYZWOCijryFBeCV79cNubPzol+k= github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2E= github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0= -github.com/ipld/go-ipld-prime v0.0.1 h1:ZTjkYODUmJAca7w5hfzP9emyJfUxTy/Xi4zsUoqHggQ= -github.com/ipld/go-ipld-prime v0.0.1/go.mod h1:bDDSvVz7vaK12FNvMeRYnpRFkSUPNQOiCYQezMD/P3w= github.com/ipld/go-ipld-prime v0.0.2-0.20191025154717-8dff1cbec43b h1:ACSEK4f1SDQC+FJ4B4pqHFW14d7kEW2ufwXA/c7eLP0= github.com/ipld/go-ipld-prime v0.0.2-0.20191025154717-8dff1cbec43b/go.mod h1:bDDSvVz7vaK12FNvMeRYnpRFkSUPNQOiCYQezMD/P3w= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c=