lotus/chain/deals/client.go

312 lines
7.9 KiB
Go
Raw Normal View History

2019-08-01 17:12:41 +00:00
package deals
import (
"context"
2019-08-01 17:12:41 +00:00
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
2019-08-02 14:09:54 +00:00
"github.com/libp2p/go-libp2p-core/host"
2019-08-07 19:48:53 +00:00
inet "github.com/libp2p/go-libp2p-core/network"
2019-08-01 17:12:41 +00:00
"github.com/libp2p/go-libp2p-core/peer"
2019-09-13 21:00:36 +00:00
"golang.org/x/xerrors"
2019-08-01 17:12:41 +00:00
"github.com/filecoin-project/go-address"
2020-01-07 14:00:10 +00:00
"github.com/filecoin-project/go-cbor-util"
2020-01-07 16:18:35 +00:00
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/events"
2019-11-08 17:15:38 +00:00
"github.com/filecoin-project/lotus/chain/market"
"github.com/filecoin-project/lotus/chain/stmgr"
2019-10-22 11:29:41 +00:00
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/wallet"
2019-11-08 17:15:38 +00:00
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/retrieval/discovery"
2019-08-01 17:12:41 +00:00
)
var log = logging.Logger("deals")
2019-08-06 22:04:21 +00:00
type ClientDeal struct {
ProposalCid cid.Cid
Proposal actors.StorageDealProposal
2019-09-10 14:13:24 +00:00
State api.DealState
2019-08-06 22:04:21 +00:00
Miner peer.ID
MinerWorker address.Address
DealID uint64
2019-09-10 12:35:43 +00:00
2019-12-07 14:12:10 +00:00
PublishMessage *types.SignedMessage
2019-11-07 13:29:43 +00:00
2019-09-10 12:35:43 +00:00
s inet.Stream
2019-08-01 17:12:41 +00:00
}
type Client struct {
2019-11-11 19:55:25 +00:00
sm *stmgr.StateManager
chain *store.ChainStore
h host.Host
w *wallet.Wallet
// dataTransfer
// TODO: once the data transfer module is complete, the
// client will listen to events on the data transfer module
// Because we are using only a fake DAGService
// implementation, there's no validation or events on the client side
dataTransfer dtypes.ClientDataTransfer
dag dtypes.ClientDAG
discovery *discovery.Local
events *events.Events
fm *market.FundMgr
2019-08-01 17:12:41 +00:00
2019-11-01 11:07:05 +00:00
deals *statestore.StateStore
2019-09-10 12:35:43 +00:00
conns map[cid.Cid]inet.Stream
2019-08-01 17:12:41 +00:00
incoming chan *ClientDeal
2019-09-10 12:35:43 +00:00
updated chan clientDealUpdate
2019-08-01 17:12:41 +00:00
stop chan struct{}
stopped chan struct{}
}
2019-09-10 12:35:43 +00:00
type clientDealUpdate struct {
2019-09-10 14:13:24 +00:00
newState api.DealState
2019-09-10 12:35:43 +00:00
id cid.Cid
err error
2019-11-07 13:29:43 +00:00
mut func(*ClientDeal)
2019-09-10 12:35:43 +00:00
}
2019-11-20 16:36:37 +00:00
type clientApi struct {
full.ChainAPI
full.StateAPI
}
2019-11-19 21:27:25 +00:00
func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w *wallet.Wallet, dag dtypes.ClientDAG, dataTransfer dtypes.ClientDataTransfer, discovery *discovery.Local, fm *market.FundMgr, deals dtypes.ClientDealStore, chainapi full.ChainAPI, stateapi full.StateAPI) *Client {
2019-08-01 17:12:41 +00:00
c := &Client{
sm: sm,
chain: chain,
h: h,
w: w,
dataTransfer: dataTransfer,
dag: dag,
discovery: discovery,
2019-11-11 19:55:25 +00:00
fm: fm,
2019-11-20 16:36:37 +00:00
events: events.NewEvents(context.TODO(), &clientApi{chainapi, stateapi}),
2019-08-01 17:12:41 +00:00
deals: deals,
2019-09-10 12:35:43 +00:00
conns: map[cid.Cid]inet.Stream{},
2019-08-01 17:12:41 +00:00
incoming: make(chan *ClientDeal, 16),
2019-09-10 12:35:43 +00:00
updated: make(chan clientDealUpdate, 16),
2019-08-01 17:12:41 +00:00
stop: make(chan struct{}),
stopped: make(chan struct{}),
}
return c
}
2019-09-10 12:35:43 +00:00
func (c *Client) Run(ctx context.Context) {
2019-08-01 17:12:41 +00:00
go func() {
defer close(c.stopped)
for {
select {
case deal := <-c.incoming:
2019-09-10 12:35:43 +00:00
c.onIncoming(deal)
case update := <-c.updated:
c.onUpdated(ctx, update)
2019-08-01 17:12:41 +00:00
case <-c.stop:
return
}
}
}()
}
func (c *Client) onIncoming(deal *ClientDeal) {
2019-09-10 12:35:43 +00:00
log.Info("incoming deal")
2019-08-07 19:48:53 +00:00
2019-09-10 12:35:43 +00:00
if _, ok := c.conns[deal.ProposalCid]; ok {
log.Errorf("tracking deal connection: already tracking connection for deal %s", deal.ProposalCid)
return
2019-08-02 14:09:54 +00:00
}
2019-09-10 12:35:43 +00:00
c.conns[deal.ProposalCid] = deal.s
2019-08-07 19:48:53 +00:00
2019-09-10 12:35:43 +00:00
if err := c.deals.Begin(deal.ProposalCid, deal); err != nil {
// We may have re-sent the proposal
log.Errorf("deal tracking failed: %s", err)
c.failDeal(deal.ProposalCid, err)
return
2019-08-02 14:09:54 +00:00
}
2019-08-07 19:48:53 +00:00
2019-09-10 12:35:43 +00:00
go func() {
c.updated <- clientDealUpdate{
2019-09-10 14:13:24 +00:00
newState: api.DealUnknown,
2019-09-10 12:35:43 +00:00
id: deal.ProposalCid,
err: nil,
}
}()
2019-08-07 19:48:53 +00:00
}
2019-09-10 12:35:43 +00:00
func (c *Client) onUpdated(ctx context.Context, update clientDealUpdate) {
2019-11-08 17:15:38 +00:00
log.Infof("Client deal %s updated state to %s", update.id, api.DealStates[update.newState])
2019-09-10 12:35:43 +00:00
var deal ClientDeal
2019-11-01 11:07:05 +00:00
err := c.deals.Mutate(update.id, func(d *ClientDeal) error {
2019-09-10 12:35:43 +00:00
d.State = update.newState
2019-11-07 13:29:43 +00:00
if update.mut != nil {
update.mut(d)
}
2019-09-10 12:35:43 +00:00
deal = *d
return nil
})
2019-09-13 19:43:33 +00:00
if update.err != nil {
log.Errorf("deal %s failed: %s", update.id, update.err)
c.failDeal(update.id, update.err)
return
}
2019-08-02 14:09:54 +00:00
if err != nil {
2019-09-10 12:35:43 +00:00
c.failDeal(update.id, err)
return
2019-08-07 19:48:53 +00:00
}
2019-09-10 12:35:43 +00:00
switch update.newState {
2019-09-10 14:13:24 +00:00
case api.DealUnknown: // new
c.handle(ctx, deal, c.new, api.DealAccepted)
case api.DealAccepted:
c.handle(ctx, deal, c.accepted, api.DealStaged)
case api.DealStaged:
c.handle(ctx, deal, c.staged, api.DealSealing)
case api.DealSealing:
c.handle(ctx, deal, c.sealing, api.DealNoUpdate)
// TODO: DealComplete -> watch for faults, expiration, etc.
2019-08-02 14:09:54 +00:00
}
2019-08-07 19:48:53 +00:00
}
2019-08-15 00:28:52 +00:00
type ClientDealProposal struct {
2019-10-23 18:04:07 +00:00
Data cid.Cid
2019-08-15 00:28:52 +00:00
2019-10-29 10:01:18 +00:00
PricePerEpoch types.BigInt
ProposalExpiration uint64
Duration uint64
2019-08-15 00:28:52 +00:00
ProviderAddress address.Address
Client address.Address
MinerWorker address.Address
MinerID peer.ID
2019-08-15 00:28:52 +00:00
}
func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, error) {
2019-11-08 17:15:38 +00:00
if err := c.fm.EnsureAvailable(ctx, p.Client, types.BigMul(p.PricePerEpoch, types.NewInt(p.Duration))); err != nil {
return cid.Undef, xerrors.Errorf("adding market funds failed: %w", err)
}
2019-11-06 17:38:42 +00:00
commP, pieceSize, err := c.commP(ctx, p.Data)
2019-12-05 05:14:19 +00:00
if err != nil {
return cid.Undef, xerrors.Errorf("computing commP failed: %w", err)
}
2019-10-23 18:04:07 +00:00
2019-11-06 17:38:42 +00:00
dealProposal := &actors.StorageDealProposal{
PieceRef: commP,
PieceSize: uint64(pieceSize),
2019-10-29 10:01:18 +00:00
Client: p.Client,
Provider: p.ProviderAddress,
ProposalExpiration: p.ProposalExpiration,
Duration: p.Duration,
StoragePricePerEpoch: p.PricePerEpoch,
2019-11-06 17:38:42 +00:00
StorageCollateral: types.NewInt(uint64(pieceSize)), // TODO: real calc
2019-08-01 17:12:41 +00:00
}
2019-11-06 17:38:42 +00:00
if err := api.SignWith(ctx, c.w.Sign, p.Client, dealProposal); err != nil {
2019-10-23 10:42:52 +00:00
return cid.Undef, xerrors.Errorf("signing deal proposal failed: %w", err)
2019-08-15 00:28:52 +00:00
}
2019-11-07 14:11:39 +00:00
proposalNd, err := cborutil.AsIpld(dealProposal)
2019-08-01 17:12:41 +00:00
if err != nil {
2019-10-23 10:42:52 +00:00
return cid.Undef, xerrors.Errorf("getting proposal node failed: %w", err)
2019-08-01 17:12:41 +00:00
}
2019-08-02 16:25:10 +00:00
s, err := c.h.NewStream(ctx, p.MinerID, DealProtocolID)
if err != nil {
2019-10-23 10:42:52 +00:00
return cid.Undef, xerrors.Errorf("connecting to storage provider failed: %w", err)
2019-08-06 22:04:21 +00:00
}
2019-11-06 17:38:42 +00:00
proposal := &Proposal{
DealProposal: dealProposal,
Piece: p.Data,
}
2019-11-07 14:11:39 +00:00
if err := cborutil.WriteCborRPC(s, proposal); err != nil {
s.Reset()
2019-10-23 10:42:52 +00:00
return cid.Undef, xerrors.Errorf("sending proposal to storage provider failed: %w", err)
2019-08-06 22:04:21 +00:00
}
deal := &ClientDeal{
2019-09-10 12:35:43 +00:00
ProposalCid: proposalNd.Cid(),
2019-11-06 17:38:42 +00:00
Proposal: *dealProposal,
2019-09-10 14:13:24 +00:00
State: api.DealUnknown,
2019-09-10 12:35:43 +00:00
Miner: p.MinerID,
MinerWorker: p.MinerWorker,
2019-09-10 12:35:43 +00:00
s: s,
}
2019-08-01 17:12:41 +00:00
2019-09-10 12:35:43 +00:00
c.incoming <- deal
2019-08-26 13:45:36 +00:00
return deal.ProposalCid, c.discovery.AddPeer(p.Data, discovery.RetrievalPeer{
2019-11-06 17:38:42 +00:00
Address: dealProposal.Provider,
2019-08-26 13:45:36 +00:00
ID: deal.Miner,
})
2019-08-01 17:12:41 +00:00
}
2019-09-13 21:00:36 +00:00
func (c *Client) QueryAsk(ctx context.Context, p peer.ID, a address.Address) (*types.SignedStorageAsk, error) {
s, err := c.h.NewStream(ctx, p, AskProtocolID)
if err != nil {
2019-12-16 17:14:21 +00:00
return nil, xerrors.Errorf("failed to open stream to miner: %w", err)
2019-09-13 21:00:36 +00:00
}
req := &AskRequest{
Miner: a,
}
2019-11-07 14:11:39 +00:00
if err := cborutil.WriteCborRPC(s, req); err != nil {
2019-09-13 21:00:36 +00:00
return nil, xerrors.Errorf("failed to send ask request: %w", err)
}
var out AskResponse
2019-11-07 14:11:39 +00:00
if err := cborutil.ReadCborRPC(s, &out); err != nil {
2019-09-13 21:00:36 +00:00
return nil, xerrors.Errorf("failed to read ask response: %w", err)
}
if out.Ask == nil {
return nil, xerrors.Errorf("got no ask back")
}
if out.Ask.Ask.Miner != a {
return nil, xerrors.Errorf("got back ask for wrong miner")
}
if err := c.checkAskSignature(out.Ask); err != nil {
return nil, xerrors.Errorf("ask was not properly signed")
}
return out.Ask, nil
}
2019-09-10 14:13:24 +00:00
func (c *Client) List() ([]ClientDeal, error) {
2019-11-01 11:07:05 +00:00
var out []ClientDeal
if err := c.deals.List(&out); err != nil {
return nil, err
}
return out, nil
2019-09-10 14:13:24 +00:00
}
2019-11-06 19:44:28 +00:00
func (c *Client) GetDeal(d cid.Cid) (*ClientDeal, error) {
var out ClientDeal
if err := c.deals.Get(d, &out); err != nil {
return nil, err
}
return &out, nil
}
2019-08-01 17:12:41 +00:00
func (c *Client) Stop() {
close(c.stop)
<-c.stopped
}