lotus/datatransfer/dagservice_impl.go

79 lines
2.7 KiB
Go
Raw Normal View History

package datatransfer
import (
"context"
"reflect"
"github.com/ipfs/go-cid"
ipldformat "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
ipld "github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"
)
// This file implements a VERY simple, incomplete version of the data transfer
// module that allows us to make the necessary insertions of data transfer
// functionality into the storage market.
// It does not:
// -- actually validate requests
// -- support Push requests
// -- support multiple subscribers
// -- do any actual network coordination or use Graphsync
//
// dagserviceImpl is the data transfer Manager implementation returned by
// NewDAGServiceDataTransfer.
type dagserviceImpl struct {
// dag is the DAG service handed to merkledag.FetchGraph by OpenPullDataChannel.
dag ipldformat.DAGService
// subscriber is the single event callback. It is nil until SubscribeToEvents
// is called, and each later call overwrites the previous value.
subscriber Subscriber
}
// NewDAGServiceDataTransfer builds a data transfer Manager backed by the
// given IPLD DAGService. The returned manager starts without an event
// subscriber; one can be attached later with SubscribeToEvents.
func NewDAGServiceDataTransfer(dag ipldformat.DAGService) Manager {
	impl := &dagserviceImpl{dag: dag}
	return impl
}
// RegisterVoucherType is the hook for associating a request validator with a
// voucher type. The Manager contract calls for an error when the type does
// not implement Voucher or when its identifier is already registered; this
// simplified implementation performs no registration and no checks, ignores
// both arguments, and always returns nil.
func (impl *dagserviceImpl) RegisterVoucherType(voucherType reflect.Type, validator RequestValidator) error {
	return nil
}
// OpenPushDataChannel would open a transfer that sends data to the recipient
// peer, limited to the parts of the piece matched by the selector. Push
// transfers are not supported by this implementation: every call returns a
// zero ChannelID together with a "not implemented" error.
func (impl *dagserviceImpl) OpenPushDataChannel(ctx context.Context, to peer.ID, voucher Voucher, baseCid cid.Cid, Selector ipld.Node) (ChannelID, error) {
	var none ChannelID
	return none, xerrors.Errorf("not implemented")
}
// OpenPullDataChannel starts a pull of the DAG rooted at baseCid and reports
// the outcome to the registered subscriber.
//
// In this simplified implementation the transfer is a background
// merkledag.FetchGraph call against the manager's DAGService. Neither the to
// peer nor the Selector is consulted; FetchGraph is given only baseCid.
//
// The call returns immediately with a zero ChannelID and a nil error; it does
// not wait for the fetch to finish. When the fetch ends, the subscriber (if
// one has been registered via SubscribeToEvents) is invoked with Complete on
// success or Error on failure, plus a ChannelState carrying only the voucher.
// The underlying fetch error itself is not forwarded.
func (impl *dagserviceImpl) OpenPullDataChannel(ctx context.Context, to peer.ID, voucher Voucher, baseCid cid.Cid, Selector ipld.Node) (ChannelID, error) {
	// The derived context is cancelled when the goroutine exits so that
	// anything tied to it is released once the fetch is over.
	ctx, cancel := context.WithCancel(ctx)
	go func() {
		defer cancel()

		err := merkledag.FetchGraph(ctx, baseCid, impl.dag)
		var event Event
		if err != nil {
			event = Error
		} else {
			event = Complete
		}

		// subscriber stays nil until SubscribeToEvents is called. Calling a
		// nil function value panics, and a panic in this goroutine would take
		// down the whole process, so skip the notification when nobody is
		// listening.
		if notify := impl.subscriber; notify != nil {
			notify(event, ChannelState{Channel: Channel{voucher: voucher}})
		}
	}()
	return ChannelID{}, nil
}
// CloseDataTransferChannel is meant to close (effectively cancel) an open
// channel. This implementation does not track channels, so the call is a
// no-op.
func (impl *dagserviceImpl) CloseDataTransferChannel(x ChannelID) {
}
// TransferChannelStatus reports the status of a transfer. The channel ID is
// not looked up here; the result is always ChannelNotFoundError.
func (impl *dagserviceImpl) TransferChannelStatus(x ChannelID) Status {
	return ChannelNotFoundError
}
// SubscribeToEvents installs subscriber as the callback that is notified of
// transfer events. Only one subscriber is kept: each call replaces whatever
// was registered before.
func (impl *dagserviceImpl) SubscribeToEvents(subscriber Subscriber) {
	impl.subscriber = subscriber
}
// InProgressChannels is meant to list all in-progress transfers. This
// implementation keeps no channel state and always returns a nil map.
func (impl *dagserviceImpl) InProgressChannels() map[ChannelID]ChannelState {
	return nil
}