refactor(datatransfer): add comments, renames

minor name changes and additional comments to clarify how data transfer works
This commit is contained in:
hannahhoward 2019-10-28 17:57:12 -07:00
parent 905259e192
commit d0b705705f
7 changed files with 122 additions and 32 deletions

View File

@ -50,8 +50,11 @@ type Client struct {
chain *store.ChainStore
h host.Host
w *wallet.Wallet
// dataTransfer -- not quite sure how this is referenced directly on client
// side
// dataTransfer
// TODO: once the data transfer module is complete, the
// client will listen to events on the data transfer module
// Because we are using only a fake DAGService
// implementation, there's no validation or events on the client side
dataTransfer datatransfer.ClientDataTransfer
dag dtypes.ClientDAG
discovery *discovery.Local

View File

@ -49,10 +49,11 @@ type Provider struct {
sminer *storage.Miner
full api.FullNode
// TODO: This will go away once the whole storage market module + CAR
// TODO: This will go away once storage market module + CAR
// is implemented
dag dtypes.StagingDAG
// dataTransfer is the manager of data transfers used by this storage provider
dataTransfer datatransfer.ProviderDataTransfer
deals *statestore.StateStore
@ -124,6 +125,8 @@ func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, secb *sectorblocks
}
}
// register a data transfer event handler -- this will move deals from
// accepted to staged
h.dataTransfer.SubscribeToEvents(h.onDataTransferEvent)
return h, nil
@ -203,6 +206,11 @@ func (p *Provider) onUpdated(ctx context.Context, update minerDealUpdate) {
}
}
// onDataTransferEvent is the function called when an event occurs in a data
// transfer -- it reads the voucher to verify this even occurred in a storage
// market deal, then, based on the data transfer event that occurred, it generates
// and update message for the deal -- either moving to staged for a completion
// event or moving to error if a data transfer error occurs
func (p *Provider) onDataTransferEvent(event datatransfer.Event, channelState datatransfer.ChannelState) {
voucher, ok := channelState.Voucher().(StorageDataTransferVoucher)
// if this event is for a transfer not related to storage, ignore

View File

@ -156,9 +156,13 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())
// this is the selector for "get the whole DAG"
// TODO: support storage deals with custom payload selectors
allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()
// initiate a pull data transfer. This will complete asynchronously and the
// completion of the data transfer will trigger a change in deal state
// (see onDataTransferEvent)
_, err = p.dataTransfer.OpenPullDataChannel(deal.Client,
StorageDataTransferVoucher{Proposal: deal.ProposalCid},
deal.Ref,

View File

@ -38,7 +38,7 @@ var (
// ErrWrongPiece means that the pieceref for this data transfer request does not match
// the one specified in the deal
ErrWrongPiece = errors.New("PieceRef for deal does not match piece ref for piece")
ErrWrongPiece = errors.New("Base CID for deal does not match CID for piece")
// ErrInacceptableDealState means the deal for this transfer is not in a deal state
// where transfer can be performed
@ -48,14 +48,18 @@ var (
AcceptableDealStates = []api.DealState{api.DealAccepted, api.DealUnknown}
)
// StorageDataTransferVoucher is the voucher type for data transfers
// used by the storage market
type StorageDataTransferVoucher struct {
Proposal cid.Cid
}
// ToBytes converts the StorageDataTransferVoucher to raw bytes
func (dv StorageDataTransferVoucher) ToBytes() []byte {
return dv.Proposal.Bytes()
}
// FromBytes converts the StorageDataTransferVoucher to raw bytes
func (dv StorageDataTransferVoucher) FromBytes(raw []byte) (datatransfer.Voucher, error) {
c, err := cid.Cast(raw)
if err != nil {
@ -64,16 +68,22 @@ func (dv StorageDataTransferVoucher) FromBytes(raw []byte) (datatransfer.Voucher
return StorageDataTransferVoucher{c}, nil
}
// Identifier is the unique string identifier for a StorageDataTransferVoucher
func (dv StorageDataTransferVoucher) Identifier() string {
return "StorageDataTransferVoucher"
}
var _ datatransfer.RequestValidator = &ClientRequestValidator{}
// ClientRequestValidator validates data transfer requests for the client
// in a storage market
type ClientRequestValidator struct {
deals *statestore.StateStore
}
// RegisterClientValidator is an initialization hook that registers the client
// request validator with the data transfer module as the validator for
// StorageDataTransferVoucher types
func RegisterClientValidator(lc fx.Lifecycle, crv *ClientRequestValidator, dtm datatransfer.ClientDataTransfer) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
@ -82,6 +92,8 @@ func RegisterClientValidator(lc fx.Lifecycle, crv *ClientRequestValidator, dtm d
})
}
// NewClientRequestValidator returns a new client request validator for the
// given datastore
func NewClientRequestValidator(ds dtypes.MetadataDS) *ClientRequestValidator {
crv := &ClientRequestValidator{
deals: statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client"))),
@ -89,18 +101,28 @@ func NewClientRequestValidator(ds dtypes.MetadataDS) *ClientRequestValidator {
return crv
}
// ValidatePush validates a push request received from the peer that will send data
// Will always error because clients should not accept push requests from a provider
// in a storage deal (i.e. send data to client).
func (c *ClientRequestValidator) ValidatePush(
sender peer.ID,
voucher datatransfer.Voucher,
PieceRef cid.Cid,
baseCid cid.Cid,
Selector ipld.Node) error {
return ErrNoPushAccepted
}
// ValidatePull validates a pull request received from the peer that will receive data
// Will succeed only if:
// - voucher has correct type
// - voucher references an active deal
// - referenced deal matches the receiver (miner)
// - referenced deal matches the given base CID
// - referenced deal is in an acceptable state
func (c *ClientRequestValidator) ValidatePull(
receiver peer.ID,
voucher datatransfer.Voucher,
PieceRef cid.Cid,
baseCid cid.Cid,
Selector ipld.Node) error {
dealVoucher, ok := voucher.(StorageDataTransferVoucher)
if !ok {
@ -118,7 +140,7 @@ func (c *ClientRequestValidator) ValidatePull(
if deal.Miner != receiver {
return ErrWrongPeer
}
if !bytes.Equal(deal.Proposal.PieceRef, PieceRef.Bytes()) {
if !bytes.Equal(deal.Proposal.PieceRef, baseCid.Bytes()) {
return ErrWrongPiece
}
for _, state := range AcceptableDealStates {
@ -131,10 +153,15 @@ func (c *ClientRequestValidator) ValidatePull(
var _ datatransfer.RequestValidator = &ProviderRequestValidator{}
// ProviderRequestValidator validates data transfer requests for the provider
// in a storage market
type ProviderRequestValidator struct {
deals *statestore.StateStore
}
// RegisterProviderValidator is an initialization hook that registers the provider
// request validator with the data transfer module as the validator for
// StorageDataTransferVoucher types
func RegisterProviderValidator(lc fx.Lifecycle, mrv *ProviderRequestValidator, dtm datatransfer.ProviderDataTransfer) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
@ -143,16 +170,25 @@ func RegisterProviderValidator(lc fx.Lifecycle, mrv *ProviderRequestValidator, d
})
}
// NewProviderRequestValidator returns a new client request validator for the
// given datastore
func NewProviderRequestValidator(ds dtypes.MetadataDS) *ProviderRequestValidator {
return &ProviderRequestValidator{
deals: statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client"))),
}
}
// ValidatePush validates a push request received from the peer that will send data
// Will succeed only if:
// - voucher has correct type
// - voucher references an active deal
// - referenced deal matches the client
// - referenced deal matches the given base CID
// - referenced deal is in an acceptable state
func (m *ProviderRequestValidator) ValidatePush(
sender peer.ID,
voucher datatransfer.Voucher,
PieceRef cid.Cid,
baseCid cid.Cid,
Selector ipld.Node) error {
dealVoucher, ok := voucher.(StorageDataTransferVoucher)
if !ok {
@ -171,7 +207,7 @@ func (m *ProviderRequestValidator) ValidatePush(
return ErrWrongPeer
}
if !bytes.Equal(deal.Proposal.PieceRef, PieceRef.Bytes()) {
if !bytes.Equal(deal.Proposal.PieceRef, baseCid.Bytes()) {
return ErrWrongPiece
}
for _, state := range AcceptableDealStates {
@ -182,10 +218,13 @@ func (m *ProviderRequestValidator) ValidatePush(
return ErrInacceptableDealState
}
// ValidatePull validates a pull request received from the peer that will receive data.
// Will always error because providers should not accept pull requests from a client
// in a storage deal (i.e. send data to client).
func (m *ProviderRequestValidator) ValidatePull(
receiver peer.ID,
voucher datatransfer.Voucher,
PieceRef cid.Cid,
baseCid cid.Cid,
Selector ipld.Node) error {
return ErrNoPullAccepted
}

View File

@ -15,15 +15,28 @@ import (
"github.com/filecoin-project/lotus/node/modules/dtypes"
)
// This file implements a VERY simple, incomplete version of the data transfer
// module that allows us to make the neccesary insertions of data transfer
// functionality into the storage market
// It does not:
// -- actually validate requests
// -- support Push requests
// -- support multiple subscribers
// -- do any actual network coordination or use Graphsync
type dagserviceImpl struct {
dag ipldformat.DAGService
subscriber Subscriber
}
// NewProviderDAGServiceDataTransfer returns a data transfer manager that just
// uses the provider's Staging DAG service for transfers
func NewProviderDAGServiceDataTransfer(dag dtypes.StagingDAG) Manager {
return &dagserviceImpl{dag, nil}
}
// NewClientDAGServiceDataTransfer returns a data transfer manager that just
// uses the clients's Client DAG service for transfers
func NewClientDAGServiceDataTransfer(dag dtypes.ClientDAG) Manager {
return &dagserviceImpl{dag, nil}
}
@ -38,14 +51,14 @@ func (impl *dagserviceImpl) RegisterVoucherType(voucherType reflect.Type, valida
// open a data transfer that will send data to the recipient peer and
// open a data transfer that will send data to the recipient peer and
// transfer parts of the piece that match the selector
func (impl *dagserviceImpl) OpenPushDataChannel(to peer.ID, voucher Voucher, PieceRef cid.Cid, Selector ipld.Node) (ChannelID, error) {
func (impl *dagserviceImpl) OpenPushDataChannel(to peer.ID, voucher Voucher, baseCid cid.Cid, Selector ipld.Node) (ChannelID, error) {
return ChannelID{}, fmt.Errorf("not implemented")
}
// open a data transfer that will request data from the sending peer and
// transfer parts of the piece that match the selector
func (impl *dagserviceImpl) OpenPullDataChannel(to peer.ID, voucher Voucher, PieceRef cid.Cid, Selector ipld.Node) (ChannelID, error) {
err := merkledag.FetchGraph(context.TODO(), PieceRef, impl.dag)
func (impl *dagserviceImpl) OpenPullDataChannel(to peer.ID, voucher Voucher, baseCid cid.Cid, Selector ipld.Node) (ChannelID, error) {
err := merkledag.FetchGraph(context.TODO(), baseCid, impl.dag)
var event Event
if err != nil {
event = Error

View File

@ -11,10 +11,13 @@ import (
// Voucher is used to validate
// a data transfer request against the underlying storage or retrieval deal
// that precipitated it. The only requirement is a voucher can read and write
// from bytes
// from bytes, and has a string identifier type
type Voucher interface {
// ToBytes converts the Voucher to raw bytes
ToBytes() []byte
// FromBytes reads a Voucher from raw bytes
FromBytes([]byte) (Voucher, error)
// Identifier is a unique string identifier for this voucher type
Identifier() string
}
@ -51,7 +54,7 @@ type Channel struct {
// an identifier for this channel shared by request and responder, set by requestor through protocol
transferID TransferID
// base CID for the piece being transferred
pieceRef cid.Cid
baseCid cid.Cid
// portion of Piece to return, spescified by an IPLD selector
selector ipld.Node
// used to verify this channel
@ -64,12 +67,26 @@ type Channel struct {
totalSize uint64
}
func (c Channel) TranfersID() TransferID { return c.transferID }
func (c Channel) PieceRef() cid.Cid { return c.pieceRef }
// TransferID returns the transfer id for this channel
func (c Channel) TransferID() TransferID { return c.transferID }
// BaseCID returns the CID that is at the root of this data transfer
func (c Channel) BaseCID() cid.Cid { return c.baseCid }
// Selector returns the IPLD selector for this data transfer (represented as
// an IPLD node)
func (c Channel) Selector() ipld.Node { return c.selector }
// Voucher returns the voucher for this data transfer
func (c Channel) Voucher() Voucher { return c.voucher }
// Sender returns the peer id for the node that is sending data
func (c Channel) Sender() peer.ID { return c.sender }
// Recipient returns the peer id for the node that is receiving data
func (c Channel) Recipient() peer.ID { return c.recipient }
// TotalSize returns the total size for the data being transferred
func (c Channel) TotalSize() uint64 { return c.totalSize }
// ChannelState is immutable channel data plus mutable state
@ -81,7 +98,10 @@ type ChannelState struct {
received uint64
}
// Sent returns the number of bytes sent
func (c ChannelState) Sent() uint64 { return c.sent }
// Received returns the number of bytes received
func (c ChannelState) Received() uint64 { return c.received }
// Event is a name for an event that occurs on a data transfer channel
@ -107,16 +127,18 @@ type Subscriber func(event Event, channelState ChannelState)
// RequestValidator is an interface implemented by the client of the
// data transfer module to validate requests
type RequestValidator interface {
// ValidatePush validates a push request received from the peer that will send data
ValidatePush(
sender peer.ID,
voucher Voucher,
PieceRef cid.Cid,
Selector ipld.Node) error
baseCid cid.Cid,
selector ipld.Node) error
// ValidatePull validates a pull request received from the peer that will receive data
ValidatePull(
receiver peer.ID,
voucher Voucher,
PieceRef cid.Cid,
Selector ipld.Node) error
baseCid cid.Cid,
selector ipld.Node) error
}
// Manager is the core interface presented by all implementations of
@ -130,11 +152,11 @@ type Manager interface {
// open a data transfer that will send data to the recipient peer and
// open a data transfer that will send data to the recipient peer and
// transfer parts of the piece that match the selector
OpenPushDataChannel(to peer.ID, voucher Voucher, PieceRef cid.Cid, Selector ipld.Node) (ChannelID, error)
OpenPushDataChannel(to peer.ID, voucher Voucher, baseCid cid.Cid, selector ipld.Node) (ChannelID, error)
// open a data transfer that will request data from the sending peer and
// transfer parts of the piece that match the selector
OpenPullDataChannel(to peer.ID, voucher Voucher, PieceRef cid.Cid, Selector ipld.Node) (ChannelID, error)
OpenPullDataChannel(to peer.ID, voucher Voucher, baseCid cid.Cid, selector ipld.Node) (ChannelID, error)
// close an open channel (effectively a cancel)
CloseDataTransferChannel(x ChannelID)
@ -149,5 +171,8 @@ type Manager interface {
InProgressChannels() map[ChannelID]ChannelState
}
// ClientDataTransfer is a data transfer manager for the client
type ClientDataTransfer Manager
// ProviderDataTransfer is a data transfer manager for the provider
type ProviderDataTransfer Manager

2
go.sum
View File

@ -231,8 +231,6 @@ github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb h1:tmWYgjltxwM7PD
github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb/go.mod h1:IwAAgul1UQIcNZzKPYZWOCijryFBeCV79cNubPzol+k=
github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2E=
github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0=
github.com/ipld/go-ipld-prime v0.0.1 h1:ZTjkYODUmJAca7w5hfzP9emyJfUxTy/Xi4zsUoqHggQ=
github.com/ipld/go-ipld-prime v0.0.1/go.mod h1:bDDSvVz7vaK12FNvMeRYnpRFkSUPNQOiCYQezMD/P3w=
github.com/ipld/go-ipld-prime v0.0.2-0.20191025154717-8dff1cbec43b h1:ACSEK4f1SDQC+FJ4B4pqHFW14d7kEW2ufwXA/c7eLP0=
github.com/ipld/go-ipld-prime v0.0.2-0.20191025154717-8dff1cbec43b/go.mod h1:bDDSvVz7vaK12FNvMeRYnpRFkSUPNQOiCYQezMD/P3w=
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c=