2019-08-01 17:12:41 +00:00
|
|
|
package deals
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2019-11-06 06:26:50 +00:00
|
|
|
|
2019-08-01 17:12:41 +00:00
|
|
|
"github.com/ipfs/go-cid"
|
2020-01-08 19:10:57 +00:00
|
|
|
logging "github.com/ipfs/go-log/v2"
|
2019-08-02 14:09:54 +00:00
|
|
|
"github.com/libp2p/go-libp2p-core/host"
|
2019-08-07 19:48:53 +00:00
|
|
|
inet "github.com/libp2p/go-libp2p-core/network"
|
2019-08-01 17:12:41 +00:00
|
|
|
"github.com/libp2p/go-libp2p-core/peer"
|
2019-09-13 21:00:36 +00:00
|
|
|
"golang.org/x/xerrors"
|
2019-08-01 17:12:41 +00:00
|
|
|
|
2019-12-19 20:13:17 +00:00
|
|
|
"github.com/filecoin-project/go-address"
|
2020-01-07 14:00:10 +00:00
|
|
|
"github.com/filecoin-project/go-cbor-util"
|
2020-01-07 16:18:35 +00:00
|
|
|
"github.com/filecoin-project/go-statestore"
|
2019-10-18 04:47:41 +00:00
|
|
|
"github.com/filecoin-project/lotus/api"
|
|
|
|
"github.com/filecoin-project/lotus/chain/actors"
|
2019-11-07 07:57:10 +00:00
|
|
|
"github.com/filecoin-project/lotus/chain/events"
|
2019-11-08 17:15:38 +00:00
|
|
|
"github.com/filecoin-project/lotus/chain/market"
|
2019-10-18 04:47:41 +00:00
|
|
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
2019-10-22 11:29:41 +00:00
|
|
|
"github.com/filecoin-project/lotus/chain/store"
|
2019-10-18 04:47:41 +00:00
|
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
|
|
"github.com/filecoin-project/lotus/chain/wallet"
|
2019-11-08 17:15:38 +00:00
|
|
|
"github.com/filecoin-project/lotus/node/impl/full"
|
2019-10-18 04:47:41 +00:00
|
|
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
2019-12-10 04:19:59 +00:00
|
|
|
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
|
2019-10-18 04:47:41 +00:00
|
|
|
"github.com/filecoin-project/lotus/retrieval/discovery"
|
2019-08-01 17:12:41 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
var log = logging.Logger("deals")
|
|
|
|
|
2019-08-06 22:04:21 +00:00
|
|
|
type ClientDeal struct {
|
|
|
|
ProposalCid cid.Cid
|
2019-10-21 18:12:11 +00:00
|
|
|
Proposal actors.StorageDealProposal
|
2019-09-10 14:13:24 +00:00
|
|
|
State api.DealState
|
2019-08-06 22:04:21 +00:00
|
|
|
Miner peer.ID
|
2019-11-07 14:09:11 +00:00
|
|
|
MinerWorker address.Address
|
2019-11-07 07:57:10 +00:00
|
|
|
DealID uint64
|
feat(datatransfer): implement and extract
feat(datatransfer): setup implementation path
Sets up a path to implementation, offering both the dagservice implementation and a future graphsync
implement, establishes message interfaces and network layer, and isolates the datatransfer module
from the app
WIP using CBOR encoding for dataxfermsg
* Bring cbor-gen stuff into datatransfer package
* make transferRequest private struct
* add transferResponse + funcs
* Rename VoucherID to VoucherType
* more tests passing
WIP trying out some stuff
* Embed request/response in message so all the interfaces work AND the CBOR unmarshaling works: this is more like the spec anyway
* get rid of pb stuff
all message tests passing, some others in datatransfer
Some cleanup for PR
Cleanup for PR, clarifying and additional comments
mod tidy
Respond to PR comments:
* Make DataTransferRequest/Response be returned in from Net
* Regenerate cbor_gen and fix the generator caller so it works better
* Please the linters
Fix tests
Initiate push and pull requests (#536)
* add issue link for data TransferID generation
* comment out failing but not relevant tests
* finish voucher rename from Identifier --> Type
tests passing
cleanup for PR
remove unused fmt import in graphsync_test
a better reflection
send data transfer response
other tests passing
feat(datatransfer): milestone 2 infrastructure
Setup test path for all tickets for milestone 2
responses alert subscribers when request is not accepted (#607)
Graphsync response is scheduled when a valid push request is received (#625)
fix(datatransfer): fix tests
fix an error with read buffers in tests
fix(deps): fix go.sum
Feat/dt graphsync pullreqs (#627)
* graphsync responses to pull requests
Feat/dt initiator cleanup (#645)
* ChannelID.To --> ChannelID.Initiator
* We now store our peer ID (from host.ID()) so it can be used when creating ChannelIDs.
* InProgressChannels returns all of impl.channels, currently just for testing
* Implements go-data-transfer issue
* Some assertions were changed based on the above.
* Renamed some variables and added some assertions based on the new understanding
* Updated SHA for graphsync module
* Updated fakeGraphSync test structs to use new interfaces from new SHA above
Techdebt/dt split graphsync impl receiver (#651)
* Split up graphsyncImpl and graphsyncReceiver
* rename graphsync to utils
DTM sends data over graphsync for validated push requests (#665)
* create channels when a request is received. register push request hook with graphsync. fix tests.
* better NewReaders
* use mutex lock around impl.channels access
* fix(datatransfer): fix test uncertainty
* fix a data race and also don't use random bytes in basic block which can fail
* privatize 3 funcs
with @hannahhoward
Feat/dt gs pullrequests (#693)
* Implements DTM Sends Data Over Graphsync For Validated Pull Requests
* rename a field in a test struct
* refactor a couple of private functions (one was refactored out of existence)
Feat/dt subscribe, file Xfer round trip (#720)
Implements the rest of Subscriber Is Notified When Request Completed #24:
* send a graphsync message within a go func and consume responses until error or transfer is complete.
* notify subscribers of results.
* Rename datatransfer.Event to EventCode.
* datatransfer.Event is now a struct that includes a message and a timestamp as well as the Event.Code int, formerly Event, update all uses
* Add extension data to graphsync request hook, gsReq
* rename sendRequest to sendDtRequest, to distinguish it from sendGsRequest, where Dt = datatransfer, Gs = graphsync
* use a mutex lock for last transfer ID
* obey the linter
Don't respond with error in gsReqRcdHook when we can't find the datatransfer extension. (#754)
* update to correct graphsync version, update tests & code to call the new graphsync hooks
* getExtensionData returns an empty struct + nil if we can't find our extension
* Don't respond with error when we can't find the extension.
* Test for same
* mod tidy
minor fix to go.sum
feat(datatransfer): switch to graphsync implementation
Move over to real graphsync implementation of data transfer, add constructors for graphsync
instances on client and miner side
fix(datatransfer): Fix validators
Validators were checking payload cid against commP -- which are not the same any more. Added a
payloadCid to client deal to maintain the record, fixed validator logic
Feat/dt extraction use go-fil-components/datatransfer (#770)
* Initial commit after changing to go-fil-components/datatransfer
* blow away the datatransfer dir
* use go-fil-components master after its PR #1 was merged
* go mod tidy
use a package
updates after rebase with master
2019-10-30 02:42:16 +00:00
|
|
|
PayloadCid cid.Cid
|
2019-09-10 12:35:43 +00:00
|
|
|
|
2019-12-07 14:12:10 +00:00
|
|
|
PublishMessage *types.SignedMessage
|
2019-11-07 13:29:43 +00:00
|
|
|
|
2019-09-10 12:35:43 +00:00
|
|
|
s inet.Stream
|
2019-08-01 17:12:41 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
type Client struct {
|
2019-11-11 19:55:25 +00:00
|
|
|
sm *stmgr.StateManager
|
|
|
|
chain *store.ChainStore
|
|
|
|
h host.Host
|
|
|
|
w *wallet.Wallet
|
2019-10-29 00:57:12 +00:00
|
|
|
// dataTransfer
|
|
|
|
// TODO: once the data transfer module is complete, the
|
|
|
|
// client will listen to events on the data transfer module
|
|
|
|
// Because we are using only a fake DAGService
|
|
|
|
// implementation, there's no validation or events on the client side
|
2019-11-11 20:51:28 +00:00
|
|
|
dataTransfer dtypes.ClientDataTransfer
|
2019-10-28 23:51:50 +00:00
|
|
|
dag dtypes.ClientDAG
|
|
|
|
discovery *discovery.Local
|
|
|
|
events *events.Events
|
|
|
|
fm *market.FundMgr
|
2019-08-01 17:12:41 +00:00
|
|
|
|
2019-11-01 11:07:05 +00:00
|
|
|
deals *statestore.StateStore
|
2019-09-10 12:35:43 +00:00
|
|
|
conns map[cid.Cid]inet.Stream
|
2019-08-01 17:12:41 +00:00
|
|
|
|
2019-10-23 12:59:57 +00:00
|
|
|
incoming chan *ClientDeal
|
2019-09-10 12:35:43 +00:00
|
|
|
updated chan clientDealUpdate
|
2019-08-01 17:12:41 +00:00
|
|
|
|
|
|
|
stop chan struct{}
|
|
|
|
stopped chan struct{}
|
|
|
|
}
|
|
|
|
|
2019-09-10 12:35:43 +00:00
|
|
|
type clientDealUpdate struct {
|
2019-09-10 14:13:24 +00:00
|
|
|
newState api.DealState
|
2019-09-10 12:35:43 +00:00
|
|
|
id cid.Cid
|
|
|
|
err error
|
2019-11-07 13:29:43 +00:00
|
|
|
mut func(*ClientDeal)
|
2019-09-10 12:35:43 +00:00
|
|
|
}
|
|
|
|
|
2019-11-20 16:36:37 +00:00
|
|
|
type clientApi struct {
|
|
|
|
full.ChainAPI
|
|
|
|
full.StateAPI
|
|
|
|
}
|
|
|
|
|
2019-11-19 21:27:25 +00:00
|
|
|
func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w *wallet.Wallet, dag dtypes.ClientDAG, dataTransfer dtypes.ClientDataTransfer, discovery *discovery.Local, fm *market.FundMgr, deals dtypes.ClientDealStore, chainapi full.ChainAPI, stateapi full.StateAPI) *Client {
|
2019-08-01 17:12:41 +00:00
|
|
|
c := &Client{
|
2019-10-28 23:51:50 +00:00
|
|
|
sm: sm,
|
|
|
|
chain: chain,
|
|
|
|
h: h,
|
|
|
|
w: w,
|
|
|
|
dataTransfer: dataTransfer,
|
|
|
|
dag: dag,
|
|
|
|
discovery: discovery,
|
2019-11-11 19:55:25 +00:00
|
|
|
fm: fm,
|
2019-11-20 16:36:37 +00:00
|
|
|
events: events.NewEvents(context.TODO(), &clientApi{chainapi, stateapi}),
|
2019-08-01 17:12:41 +00:00
|
|
|
|
2019-11-11 20:51:28 +00:00
|
|
|
deals: deals,
|
2019-09-10 12:35:43 +00:00
|
|
|
conns: map[cid.Cid]inet.Stream{},
|
2019-08-01 17:12:41 +00:00
|
|
|
|
2019-10-23 12:59:57 +00:00
|
|
|
incoming: make(chan *ClientDeal, 16),
|
2019-09-10 12:35:43 +00:00
|
|
|
updated: make(chan clientDealUpdate, 16),
|
2019-08-01 17:12:41 +00:00
|
|
|
|
|
|
|
stop: make(chan struct{}),
|
|
|
|
stopped: make(chan struct{}),
|
|
|
|
}
|
|
|
|
|
|
|
|
return c
|
|
|
|
}
|
|
|
|
|
2019-09-10 12:35:43 +00:00
|
|
|
func (c *Client) Run(ctx context.Context) {
|
2019-08-01 17:12:41 +00:00
|
|
|
go func() {
|
|
|
|
defer close(c.stopped)
|
|
|
|
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case deal := <-c.incoming:
|
2019-09-10 12:35:43 +00:00
|
|
|
c.onIncoming(deal)
|
|
|
|
case update := <-c.updated:
|
|
|
|
c.onUpdated(ctx, update)
|
2019-08-01 17:12:41 +00:00
|
|
|
case <-c.stop:
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
}
|
|
|
|
|
2019-10-23 12:59:57 +00:00
|
|
|
func (c *Client) onIncoming(deal *ClientDeal) {
|
2019-09-10 12:35:43 +00:00
|
|
|
log.Info("incoming deal")
|
2019-08-07 19:48:53 +00:00
|
|
|
|
2019-09-10 12:35:43 +00:00
|
|
|
if _, ok := c.conns[deal.ProposalCid]; ok {
|
|
|
|
log.Errorf("tracking deal connection: already tracking connection for deal %s", deal.ProposalCid)
|
|
|
|
return
|
2019-08-02 14:09:54 +00:00
|
|
|
}
|
2019-09-10 12:35:43 +00:00
|
|
|
c.conns[deal.ProposalCid] = deal.s
|
2019-08-07 19:48:53 +00:00
|
|
|
|
2019-09-10 12:35:43 +00:00
|
|
|
if err := c.deals.Begin(deal.ProposalCid, deal); err != nil {
|
|
|
|
// We may have re-sent the proposal
|
|
|
|
log.Errorf("deal tracking failed: %s", err)
|
|
|
|
c.failDeal(deal.ProposalCid, err)
|
|
|
|
return
|
2019-08-02 14:09:54 +00:00
|
|
|
}
|
2019-08-07 19:48:53 +00:00
|
|
|
|
2019-09-10 12:35:43 +00:00
|
|
|
go func() {
|
|
|
|
c.updated <- clientDealUpdate{
|
2019-09-10 14:13:24 +00:00
|
|
|
newState: api.DealUnknown,
|
2019-09-10 12:35:43 +00:00
|
|
|
id: deal.ProposalCid,
|
|
|
|
err: nil,
|
|
|
|
}
|
|
|
|
}()
|
2019-08-07 19:48:53 +00:00
|
|
|
}
|
|
|
|
|
2019-09-10 12:35:43 +00:00
|
|
|
func (c *Client) onUpdated(ctx context.Context, update clientDealUpdate) {
|
2019-11-08 17:15:38 +00:00
|
|
|
log.Infof("Client deal %s updated state to %s", update.id, api.DealStates[update.newState])
|
2019-09-10 12:35:43 +00:00
|
|
|
var deal ClientDeal
|
2019-11-01 11:07:05 +00:00
|
|
|
err := c.deals.Mutate(update.id, func(d *ClientDeal) error {
|
2019-09-10 12:35:43 +00:00
|
|
|
d.State = update.newState
|
2019-11-07 13:29:43 +00:00
|
|
|
if update.mut != nil {
|
|
|
|
update.mut(d)
|
|
|
|
}
|
2019-09-10 12:35:43 +00:00
|
|
|
deal = *d
|
|
|
|
return nil
|
|
|
|
})
|
2019-09-13 19:43:33 +00:00
|
|
|
if update.err != nil {
|
|
|
|
log.Errorf("deal %s failed: %s", update.id, update.err)
|
|
|
|
c.failDeal(update.id, update.err)
|
|
|
|
return
|
|
|
|
}
|
2019-08-02 14:09:54 +00:00
|
|
|
if err != nil {
|
2019-09-10 12:35:43 +00:00
|
|
|
c.failDeal(update.id, err)
|
|
|
|
return
|
2019-08-07 19:48:53 +00:00
|
|
|
}
|
|
|
|
|
2019-09-10 12:35:43 +00:00
|
|
|
switch update.newState {
|
2019-09-10 14:13:24 +00:00
|
|
|
case api.DealUnknown: // new
|
|
|
|
c.handle(ctx, deal, c.new, api.DealAccepted)
|
|
|
|
case api.DealAccepted:
|
|
|
|
c.handle(ctx, deal, c.accepted, api.DealStaged)
|
|
|
|
case api.DealStaged:
|
|
|
|
c.handle(ctx, deal, c.staged, api.DealSealing)
|
|
|
|
case api.DealSealing:
|
2019-11-07 09:06:06 +00:00
|
|
|
c.handle(ctx, deal, c.sealing, api.DealNoUpdate)
|
|
|
|
// TODO: DealComplete -> watch for faults, expiration, etc.
|
2019-08-02 14:09:54 +00:00
|
|
|
}
|
2019-08-07 19:48:53 +00:00
|
|
|
}
|
|
|
|
|
2019-08-15 00:28:52 +00:00
|
|
|
type ClientDealProposal struct {
|
2019-10-23 18:04:07 +00:00
|
|
|
Data cid.Cid
|
2019-08-15 00:28:52 +00:00
|
|
|
|
2019-10-29 10:01:18 +00:00
|
|
|
PricePerEpoch types.BigInt
|
2019-10-21 18:12:11 +00:00
|
|
|
ProposalExpiration uint64
|
|
|
|
Duration uint64
|
2019-08-15 00:28:52 +00:00
|
|
|
|
2019-10-21 18:12:11 +00:00
|
|
|
ProviderAddress address.Address
|
|
|
|
Client address.Address
|
2019-11-07 14:09:11 +00:00
|
|
|
MinerWorker address.Address
|
2019-10-21 18:12:11 +00:00
|
|
|
MinerID peer.ID
|
2019-08-15 00:28:52 +00:00
|
|
|
}
|
|
|
|
|
2019-10-21 18:12:11 +00:00
|
|
|
func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, error) {
|
2019-11-08 17:15:38 +00:00
|
|
|
if err := c.fm.EnsureAvailable(ctx, p.Client, types.BigMul(p.PricePerEpoch, types.NewInt(p.Duration))); err != nil {
|
|
|
|
return cid.Undef, xerrors.Errorf("adding market funds failed: %w", err)
|
2019-10-23 17:39:14 +00:00
|
|
|
}
|
|
|
|
|
2019-11-06 17:38:42 +00:00
|
|
|
commP, pieceSize, err := c.commP(ctx, p.Data)
|
2019-12-05 05:14:19 +00:00
|
|
|
if err != nil {
|
|
|
|
return cid.Undef, xerrors.Errorf("computing commP failed: %w", err)
|
|
|
|
}
|
2019-10-23 18:04:07 +00:00
|
|
|
|
2019-11-06 17:38:42 +00:00
|
|
|
dealProposal := &actors.StorageDealProposal{
|
|
|
|
PieceRef: commP,
|
|
|
|
PieceSize: uint64(pieceSize),
|
2019-10-29 10:01:18 +00:00
|
|
|
Client: p.Client,
|
|
|
|
Provider: p.ProviderAddress,
|
|
|
|
ProposalExpiration: p.ProposalExpiration,
|
|
|
|
Duration: p.Duration,
|
|
|
|
StoragePricePerEpoch: p.PricePerEpoch,
|
2019-11-06 17:38:42 +00:00
|
|
|
StorageCollateral: types.NewInt(uint64(pieceSize)), // TODO: real calc
|
2019-08-01 17:12:41 +00:00
|
|
|
}
|
|
|
|
|
2019-11-06 17:38:42 +00:00
|
|
|
if err := api.SignWith(ctx, c.w.Sign, p.Client, dealProposal); err != nil {
|
2019-10-23 10:42:52 +00:00
|
|
|
return cid.Undef, xerrors.Errorf("signing deal proposal failed: %w", err)
|
2019-08-15 00:28:52 +00:00
|
|
|
}
|
|
|
|
|
2019-11-07 14:11:39 +00:00
|
|
|
proposalNd, err := cborutil.AsIpld(dealProposal)
|
2019-08-01 17:12:41 +00:00
|
|
|
if err != nil {
|
2019-10-23 10:42:52 +00:00
|
|
|
return cid.Undef, xerrors.Errorf("getting proposal node failed: %w", err)
|
2019-08-01 17:12:41 +00:00
|
|
|
}
|
2019-08-02 16:25:10 +00:00
|
|
|
|
2019-10-21 18:12:11 +00:00
|
|
|
s, err := c.h.NewStream(ctx, p.MinerID, DealProtocolID)
|
|
|
|
if err != nil {
|
2019-10-23 10:42:52 +00:00
|
|
|
return cid.Undef, xerrors.Errorf("connecting to storage provider failed: %w", err)
|
2019-08-06 22:04:21 +00:00
|
|
|
}
|
|
|
|
|
2019-11-06 17:38:42 +00:00
|
|
|
proposal := &Proposal{
|
|
|
|
DealProposal: dealProposal,
|
|
|
|
Piece: p.Data,
|
|
|
|
}
|
|
|
|
|
2019-11-07 14:11:39 +00:00
|
|
|
if err := cborutil.WriteCborRPC(s, proposal); err != nil {
|
2019-10-21 18:12:11 +00:00
|
|
|
s.Reset()
|
2019-10-23 10:42:52 +00:00
|
|
|
return cid.Undef, xerrors.Errorf("sending proposal to storage provider failed: %w", err)
|
2019-08-06 22:04:21 +00:00
|
|
|
}
|
|
|
|
|
2019-10-23 12:59:57 +00:00
|
|
|
deal := &ClientDeal{
|
2019-09-10 12:35:43 +00:00
|
|
|
ProposalCid: proposalNd.Cid(),
|
2019-11-06 17:38:42 +00:00
|
|
|
Proposal: *dealProposal,
|
2019-09-10 14:13:24 +00:00
|
|
|
State: api.DealUnknown,
|
2019-09-10 12:35:43 +00:00
|
|
|
Miner: p.MinerID,
|
2019-11-07 14:09:11 +00:00
|
|
|
MinerWorker: p.MinerWorker,
|
feat(datatransfer): implement and extract
feat(datatransfer): setup implementation path
Sets up a path to implementation, offering both the dagservice implementation and a future graphsync
implement, establishes message interfaces and network layer, and isolates the datatransfer module
from the app
WIP using CBOR encoding for dataxfermsg
* Bring cbor-gen stuff into datatransfer package
* make transferRequest private struct
* add transferResponse + funcs
* Rename VoucherID to VoucherType
* more tests passing
WIP trying out some stuff
* Embed request/response in message so all the interfaces work AND the CBOR unmarshaling works: this is more like the spec anyway
* get rid of pb stuff
all message tests passing, some others in datatransfer
Some cleanup for PR
Cleanup for PR, clarifying and additional comments
mod tidy
Respond to PR comments:
* Make DataTransferRequest/Response be returned in from Net
* Regenerate cbor_gen and fix the generator caller so it works better
* Please the linters
Fix tests
Initiate push and pull requests (#536)
* add issue link for data TransferID generation
* comment out failing but not relevant tests
* finish voucher rename from Identifier --> Type
tests passing
cleanup for PR
remove unused fmt import in graphsync_test
a better reflection
send data transfer response
other tests passing
feat(datatransfer): milestone 2 infrastructure
Setup test path for all tickets for milestone 2
responses alert subscribers when request is not accepted (#607)
Graphsync response is scheduled when a valid push request is received (#625)
fix(datatransfer): fix tests
fix an error with read buffers in tests
fix(deps): fix go.sum
Feat/dt graphsync pullreqs (#627)
* graphsync responses to pull requests
Feat/dt initiator cleanup (#645)
* ChannelID.To --> ChannelID.Initiator
* We now store our peer ID (from host.ID()) so it can be used when creating ChannelIDs.
* InProgressChannels returns all of impl.channels, currently just for testing
* Implements go-data-transfer issue
* Some assertions were changed based on the above.
* Renamed some variables and added some assertions based on the new understanding
* Updated SHA for graphsync module
* Updated fakeGraphSync test structs to use new interfaces from new SHA above
Techdebt/dt split graphsync impl receiver (#651)
* Split up graphsyncImpl and graphsyncReceiver
* rename graphsync to utils
DTM sends data over graphsync for validated push requests (#665)
* create channels when a request is received. register push request hook with graphsync. fix tests.
* better NewReaders
* use mutex lock around impl.channels access
* fix(datatransfer): fix test uncertainty
* fix a data race and also don't use random bytes in basic block which can fail
* privatize 3 funcs
with @hannahhoward
Feat/dt gs pullrequests (#693)
* Implements DTM Sends Data Over Graphsync For Validated Pull Requests
* rename a field in a test struct
* refactor a couple of private functions (one was refactored out of existence)
Feat/dt subscribe, file Xfer round trip (#720)
Implements the rest of Subscriber Is Notified When Request Completed #24:
* send a graphsync message within a go func and consume responses until error or transfer is complete.
* notify subscribers of results.
* Rename datatransfer.Event to EventCode.
* datatransfer.Event is now a struct that includes a message and a timestamp as well as the Event.Code int, formerly Event, update all uses
* Add extension data to graphsync request hook, gsReq
* rename sendRequest to sendDtRequest, to distinguish it from sendGsRequest, where Dt = datatransfer, Gs = graphsync
* use a mutex lock for last transfer ID
* obey the linter
Don't respond with error in gsReqRcdHook when we can't find the datatransfer extension. (#754)
* update to correct graphsync version, update tests & code to call the new graphsync hooks
* getExtensionData returns an empty struct + nil if we can't find our extension
* Don't respond with error when we can't find the extension.
* Test for same
* mod tidy
minor fix to go.sum
feat(datatransfer): switch to graphsync implementation
Move over to real graphsync implementation of data transfer, add constructors for graphsync
instances on client and miner side
fix(datatransfer): Fix validators
Validators were checking payload cid against commP -- which are not the same any more. Added a
payloadCid to client deal to maintain the record, fixed validator logic
Feat/dt extraction use go-fil-components/datatransfer (#770)
* Initial commit after changing to go-fil-components/datatransfer
* blow away the datatransfer dir
* use go-fil-components master after its PR #1 was merged
* go mod tidy
use a package
updates after rebase with master
2019-10-30 02:42:16 +00:00
|
|
|
PayloadCid: p.Data,
|
|
|
|
s: s,
|
2019-09-10 12:35:43 +00:00
|
|
|
}
|
2019-08-01 17:12:41 +00:00
|
|
|
|
2019-09-10 12:35:43 +00:00
|
|
|
c.incoming <- deal
|
2019-08-26 13:45:36 +00:00
|
|
|
|
2019-12-10 04:19:59 +00:00
|
|
|
return deal.ProposalCid, c.discovery.AddPeer(p.Data, retrievalmarket.RetrievalPeer{
|
2019-11-06 17:38:42 +00:00
|
|
|
Address: dealProposal.Provider,
|
2019-08-26 13:45:36 +00:00
|
|
|
ID: deal.Miner,
|
|
|
|
})
|
2019-08-01 17:12:41 +00:00
|
|
|
}
|
|
|
|
|
2019-09-13 21:00:36 +00:00
|
|
|
func (c *Client) QueryAsk(ctx context.Context, p peer.ID, a address.Address) (*types.SignedStorageAsk, error) {
|
|
|
|
s, err := c.h.NewStream(ctx, p, AskProtocolID)
|
|
|
|
if err != nil {
|
2019-12-16 17:14:21 +00:00
|
|
|
return nil, xerrors.Errorf("failed to open stream to miner: %w", err)
|
2019-09-13 21:00:36 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
req := &AskRequest{
|
|
|
|
Miner: a,
|
|
|
|
}
|
2019-11-07 14:11:39 +00:00
|
|
|
if err := cborutil.WriteCborRPC(s, req); err != nil {
|
2019-09-13 21:00:36 +00:00
|
|
|
return nil, xerrors.Errorf("failed to send ask request: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
var out AskResponse
|
2019-11-07 14:11:39 +00:00
|
|
|
if err := cborutil.ReadCborRPC(s, &out); err != nil {
|
2019-09-13 21:00:36 +00:00
|
|
|
return nil, xerrors.Errorf("failed to read ask response: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if out.Ask == nil {
|
|
|
|
return nil, xerrors.Errorf("got no ask back")
|
|
|
|
}
|
|
|
|
|
|
|
|
if out.Ask.Ask.Miner != a {
|
|
|
|
return nil, xerrors.Errorf("got back ask for wrong miner")
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := c.checkAskSignature(out.Ask); err != nil {
|
|
|
|
return nil, xerrors.Errorf("ask was not properly signed")
|
|
|
|
}
|
|
|
|
|
|
|
|
return out.Ask, nil
|
|
|
|
}
|
|
|
|
|
2019-09-10 14:13:24 +00:00
|
|
|
func (c *Client) List() ([]ClientDeal, error) {
|
2019-11-01 11:07:05 +00:00
|
|
|
var out []ClientDeal
|
|
|
|
if err := c.deals.List(&out); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return out, nil
|
2019-09-10 14:13:24 +00:00
|
|
|
}
|
|
|
|
|
2019-11-06 19:44:28 +00:00
|
|
|
func (c *Client) GetDeal(d cid.Cid) (*ClientDeal, error) {
|
|
|
|
var out ClientDeal
|
|
|
|
if err := c.deals.Get(d, &out); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return &out, nil
|
|
|
|
}
|
|
|
|
|
2019-08-01 17:12:41 +00:00
|
|
|
func (c *Client) Stop() {
|
|
|
|
close(c.stop)
|
|
|
|
<-c.stopped
|
|
|
|
}
|