From da4528932ad31d8afe6947ef0cda3c6d30065e23 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Mon, 4 Nov 2019 11:57:54 -0800 Subject: [PATCH] feat(storagemarket): initial extraction Types for storage market Modify deals.Provider to implement storagemarket.StorageProvider Inject storagemarket.StorageProvider Storage Provider interfaces Storage Client interfaces Add ValidatePublishedDeal to ClientNodeAdapter Remove FundManager from client Remove Wallet from client Remove StateManager, Events, Wallet from client Rebasing - Copy types.BigInt, use TokenAmount/BigInt for token amounts - Remove auto-imported log package - Move `checkAskSignature` to a client file. - Plumb contexts through fix(storagemarket): use publish cids Switch back to publish message cids to reduce the dependency surface area --- chain/deals/cbor_gen.go | 165 ++--------- chain/deals/client.go | 66 ++--- chain/deals/client_states.go | 158 +--------- chain/deals/client_storagemarket.go | 118 ++++++++ chain/deals/provider.go | 29 +- chain/deals/provider_asks.go | 35 +-- chain/deals/provider_states.go | 105 ++----- chain/deals/provider_storagemarket.go | 65 +++++ chain/deals/provider_utils.go | 41 +-- chain/deals/request_validation_test.go | 15 +- chain/deals/types.go | 8 +- chain/types/cbor_gen.go | 2 +- gen/main.go | 10 + node/builder.go | 8 +- node/impl/client/client.go | 38 +-- node/modules/services.go | 4 +- node/modules/storageminer.go | 7 +- retrieval/impl/cbor_gen.go | 14 +- retrievaladapter/provider.go | 2 +- storagemarket/bigint.go | 241 ++++++++++++++++ storagemarket/cbor_gen.go | 352 +++++++++++++++++++++++ storagemarket/types.go | 199 +++++++++++++ storagemarketadapter/client_adapter.go | 328 +++++++++++++++++++++ storagemarketadapter/provider_adapter.go | 196 +++++++++++++ 24 files changed, 1684 insertions(+), 522 deletions(-) create mode 100644 chain/deals/client_storagemarket.go create mode 100644 chain/deals/provider_storagemarket.go create mode 100644 storagemarket/bigint.go create mode 100644 storagemarket/cbor_gen.go create mode 100644 storagemarket/types.go create mode 100644 storagemarketadapter/client_adapter.go create mode 100644 storagemarketadapter/provider_adapter.go diff --git a/chain/deals/cbor_gen.go b/chain/deals/cbor_gen.go index bd56e0551..5abb92825 100644 --- a/chain/deals/cbor_gen.go +++ b/chain/deals/cbor_gen.go @@ -219,10 +219,18 @@ func (t *Response) MarshalCBOR(w io.Writer) error { return xerrors.Errorf("failed to write cid field t.Proposal: %w", err) } - // t.StorageDealSubmission (types.SignedMessage) (struct) - if err := t.StorageDealSubmission.MarshalCBOR(w); err != nil { - return err + // t.PublishMessage (cid.Cid) (struct) + + if t.PublishMessage == nil { + if _, err := w.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCid(w, *t.PublishMessage); err != nil { + return xerrors.Errorf("failed to write cid field t.PublishMessage: %w", err) + } } + return nil } @@ -273,7 +281,7 @@ func (t *Response) UnmarshalCBOR(r io.Reader) error { t.Proposal = c } - // t.StorageDealSubmission (types.SignedMessage) (struct) + // t.PublishMessage (cid.Cid) (struct) { @@ -287,10 +295,13 @@ func (t *Response) UnmarshalCBOR(r io.Reader) error { return err } } else { - t.StorageDealSubmission = new(types.SignedMessage) - if err := t.StorageDealSubmission.UnmarshalCBOR(br); err != nil { - return err + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.PublishMessage: %w", err) } + + t.PublishMessage = &c } } @@ -526,56 +537,12 @@ func (t *ClientDeal) MarshalCBOR(w io.Writer) error { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{136}); err != nil { + if _, err := w.Write([]byte{129}); err != nil { return err } - // t.ProposalCid (cid.Cid) (struct) - - if err := cbg.WriteCid(w, t.ProposalCid); err != nil { - return xerrors.Errorf("failed to write cid field t.ProposalCid: %w", err) - } - - // t.Proposal (actors.StorageDealProposal) (struct) - if err := t.Proposal.MarshalCBOR(w); err != nil { - return err - } - - // t.State (uint64) (uint64) - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.State))); err != nil { - return err - } - - // t.Miner (peer.ID) (string) - if len(t.Miner) > cbg.MaxLength { - return xerrors.Errorf("Value in field t.Miner was too long") - } - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Miner)))); err != nil { - return err - } - if _, err := w.Write([]byte(t.Miner)); err != nil { - return err - } - - // t.MinerWorker (address.Address) (struct) - if err := t.MinerWorker.MarshalCBOR(w); err != nil { - return err - } - - // t.DealID (uint64) (uint64) - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.DealID))); err != nil { - return err - } - - // t.PayloadCid (cid.Cid) (struct) - - if err := cbg.WriteCid(w, t.PayloadCid); err != nil { - return xerrors.Errorf("failed to write cid field t.PayloadCid: %w", err) - } - - // t.PublishMessage (types.SignedMessage) (struct) - if err := t.PublishMessage.MarshalCBOR(w); err != nil { + // t.ClientDeal (storagemarket.ClientDeal) (struct) + if err := t.ClientDeal.MarshalCBOR(w); err != nil { return err } return nil @@ -592,102 +559,18 @@ func (t *ClientDeal) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("cbor input should be of type array") } - if extra != 8 { + if extra != 1 { return fmt.Errorf("cbor input had wrong number of fields") } - // t.ProposalCid (cid.Cid) (struct) + // t.ClientDeal (storagemarket.ClientDeal) (struct) { - c, err := cbg.ReadCid(br) - if err != nil { - return xerrors.Errorf("failed to read cid field t.ProposalCid: %w", err) - } - - t.ProposalCid = c - - } - // t.Proposal (actors.StorageDealProposal) (struct) - - { - - if err := t.Proposal.UnmarshalCBOR(br); err != nil { + if err := t.ClientDeal.UnmarshalCBOR(br); err != nil { return err } - } - // t.State (uint64) (uint64) - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.State = uint64(extra) - // t.Miner (peer.ID) (string) - - { - sval, err := cbg.ReadString(br) - if err != nil { - return err - } - - t.Miner = peer.ID(sval) - } - // t.MinerWorker (address.Address) (struct) - - { - - if err := t.MinerWorker.UnmarshalCBOR(br); err != nil { - return err - } - - } - // t.DealID (uint64) (uint64) - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.DealID = uint64(extra) - // t.PayloadCid (cid.Cid) (struct) - - { - - c, err := cbg.ReadCid(br) - if err != nil { - return xerrors.Errorf("failed to read cid field t.PayloadCid: %w", err) - } - - t.PayloadCid = c - - } - // t.PublishMessage (types.SignedMessage) (struct) - - { - - pb, err := br.PeekByte() - if err != nil { - return err - } - if pb == cbg.CborNull[0] { - var nbuf [1]byte - if _, err := br.Read(nbuf[:]); err != nil { - return err - } - } else { - t.PublishMessage = new(types.SignedMessage) - if err := t.PublishMessage.UnmarshalCBOR(br); err != nil { - return err - } - } - } return nil } diff --git a/chain/deals/client.go b/chain/deals/client.go index 4e1d3df4a..879e408e2 100644 --- a/chain/deals/client.go +++ b/chain/deals/client.go @@ -15,39 +15,24 @@ import ( "github.com/filecoin-project/go-statestore" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors" - "github.com/filecoin-project/lotus/chain/events" - "github.com/filecoin-project/lotus/chain/market" - "github.com/filecoin-project/lotus/chain/stmgr" - "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/chain/wallet" - "github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/modules/dtypes" retrievalmarket "github.com/filecoin-project/lotus/retrieval" "github.com/filecoin-project/lotus/retrieval/discovery" + "github.com/filecoin-project/lotus/storagemarket" ) var log = logging.Logger("deals") type ClientDeal struct { - ProposalCid cid.Cid - Proposal actors.StorageDealProposal - State api.DealState - Miner peer.ID - MinerWorker address.Address - DealID uint64 - PayloadCid cid.Cid - - PublishMessage *types.SignedMessage + storagemarket.ClientDeal s inet.Stream } type Client struct { - sm *stmgr.StateManager - chain *store.ChainStore - h host.Host - w *wallet.Wallet + h host.Host + // dataTransfer // TODO: once the data transfer module is complete, the // client will listen to events on the data transfer module @@ -56,8 +41,8 @@ type Client struct { dataTransfer dtypes.ClientDataTransfer dag dtypes.ClientDAG discovery *discovery.Local - events *events.Events - fm *market.FundMgr + + node storagemarket.StorageClientNode deals *statestore.StateStore conns map[cid.Cid]inet.Stream @@ -76,22 +61,13 @@ type clientDealUpdate struct { mut func(*ClientDeal) } -type clientApi struct { - full.ChainAPI - full.StateAPI -} - -func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w *wallet.Wallet, dag dtypes.ClientDAG, dataTransfer dtypes.ClientDataTransfer, discovery *discovery.Local, fm *market.FundMgr, deals dtypes.ClientDealStore, chainapi full.ChainAPI, stateapi full.StateAPI) *Client { +func NewClient(h host.Host, dag dtypes.ClientDAG, dataTransfer dtypes.ClientDataTransfer, discovery *discovery.Local, deals dtypes.ClientDealStore, scn storagemarket.StorageClientNode) *Client { c := &Client{ - sm: sm, - chain: chain, h: h, - w: w, dataTransfer: dataTransfer, dag: dag, discovery: discovery, - fm: fm, - events: events.NewEvents(context.TODO(), &clientApi{chainapi, stateapi}), + node: scn, deals: deals, conns: map[cid.Cid]inet.Stream{}, @@ -196,7 +172,8 @@ type ClientDealProposal struct { } func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, error) { - if err := c.fm.EnsureAvailable(ctx, p.Client, types.BigMul(p.PricePerEpoch, types.NewInt(p.Duration))); err != nil { + amount := types.BigMul(p.PricePerEpoch, types.NewInt(p.Duration)) + if err := c.node.EnsureFunds(ctx, p.Client, storagemarket.TokenAmount(amount)); err != nil { return cid.Undef, xerrors.Errorf("adding market funds failed: %w", err) } @@ -216,7 +193,7 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro StorageCollateral: types.NewInt(uint64(pieceSize)), // TODO: real calc } - if err := api.SignWith(ctx, c.w.Sign, p.Client, dealProposal); err != nil { + if err := c.node.SignProposal(ctx, p.Client, dealProposal); err != nil { return cid.Undef, xerrors.Errorf("signing deal proposal failed: %w", err) } @@ -225,7 +202,7 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro return cid.Undef, xerrors.Errorf("getting proposal node failed: %w", err) } - s, err := c.h.NewStream(ctx, p.MinerID, DealProtocolID) + s, err := c.h.NewStream(ctx, p.MinerID, storagemarket.DealProtocolID) if err != nil { return cid.Undef, xerrors.Errorf("connecting to storage provider failed: %w", err) } @@ -241,13 +218,16 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro } deal := &ClientDeal{ - ProposalCid: proposalNd.Cid(), - Proposal: *dealProposal, - State: api.DealUnknown, - Miner: p.MinerID, - MinerWorker: p.MinerWorker, - PayloadCid: p.Data, - s: s, + ClientDeal: storagemarket.ClientDeal{ + ProposalCid: proposalNd.Cid(), + Proposal: *dealProposal, + State: api.DealUnknown, + Miner: p.MinerID, + MinerWorker: p.MinerWorker, + PayloadCid: p.Data, + }, + + s: s, } c.incoming <- deal @@ -259,7 +239,7 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro } func (c *Client) QueryAsk(ctx context.Context, p peer.ID, a address.Address) (*types.SignedStorageAsk, error) { - s, err := c.h.NewStream(ctx, p, AskProtocolID) + s, err := c.h.NewStream(ctx, p, storagemarket.AskProtocolID) if err != nil { return nil, xerrors.Errorf("failed to open stream to miner: %w", err) } diff --git a/chain/deals/client_states.go b/chain/deals/client_states.go index d19765f60..e82beb126 100644 --- a/chain/deals/client_states.go +++ b/chain/deals/client_states.go @@ -1,16 +1,11 @@ package deals import ( - "bytes" "context" "golang.org/x/xerrors" - "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/actors" - "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/types" ) @@ -57,70 +52,20 @@ func (c *Client) new(ctx context.Context, deal ClientDeal) (func(*ClientDeal), e } return func(info *ClientDeal) { - info.PublishMessage = resp.StorageDealSubmission + info.PublishMessage = resp.PublishMessage }, nil } func (c *Client) accepted(ctx context.Context, deal ClientDeal) (func(*ClientDeal), error) { log.Infow("DEAL ACCEPTED!") - pubmsg := deal.PublishMessage.Message - pw, err := stmgr.GetMinerWorker(ctx, c.sm, nil, deal.Proposal.Provider) + dealId, err := c.node.ValidatePublishedDeal(ctx, deal.ClientDeal) if err != nil { - return nil, xerrors.Errorf("getting miner worker failed: %w", err) - } - - if pubmsg.From != pw { - return nil, xerrors.Errorf("deal wasn't published by storage provider: from=%s, provider=%s", pubmsg.From, deal.Proposal.Provider) - } - - if pubmsg.To != actors.StorageMarketAddress { - return nil, xerrors.Errorf("deal publish message wasn't set to StorageMarket actor (to=%s)", pubmsg.To) - } - - if pubmsg.Method != actors.SMAMethods.PublishStorageDeals { - return nil, xerrors.Errorf("deal publish message called incorrect method (method=%s)", pubmsg.Method) - } - - var params actors.PublishStorageDealsParams - if err := params.UnmarshalCBOR(bytes.NewReader(pubmsg.Params)); err != nil { - return nil, err - } - - dealIdx := -1 - for i, storageDeal := range params.Deals { - // TODO: make it less hacky - sd := storageDeal - eq, err := cborutil.Equals(&deal.Proposal, &sd) - if err != nil { - return nil, err - } - if eq { - dealIdx = i - break - } - } - - if dealIdx == -1 { - return nil, xerrors.Errorf("deal publish didn't contain our deal (message cid: %s)", deal.PublishMessage.Cid()) - } - - // TODO: timeout - _, ret, err := c.sm.WaitForMessage(ctx, deal.PublishMessage.Cid()) - if err != nil { - return nil, xerrors.Errorf("waiting for deal publish message: %w", err) - } - if ret.ExitCode != 0 { - return nil, xerrors.Errorf("deal publish failed: exit=%d", ret.ExitCode) - } - - var res actors.PublishStorageDealResponse - if err := res.UnmarshalCBOR(bytes.NewReader(ret.Return)); err != nil { return nil, err } return func(info *ClientDeal) { - info.DealID = res.DealIDs[dealIdx] + info.DealID = dealId }, nil } @@ -131,103 +76,22 @@ func (c *Client) staged(ctx context.Context, deal ClientDeal) (func(*ClientDeal) } func (c *Client) sealing(ctx context.Context, deal ClientDeal) (func(*ClientDeal), error) { - checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { - sd, err := stmgr.GetStorageDeal(ctx, c.sm, deal.DealID, ts) - if err != nil { - // TODO: This may be fine for some errors - return false, false, xerrors.Errorf("failed to look up deal on chain: %w", err) - } - - if sd.ActivationEpoch > 0 { - select { - case c.updated <- clientDealUpdate{ - newState: api.DealComplete, - id: deal.ProposalCid, - }: - case <-c.stop: - } - - return true, false, nil - } - - return false, true, nil - } - - called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (more bool, err error) { - defer func() { - if err != nil { - select { - case c.updated <- clientDealUpdate{ - newState: api.DealComplete, - id: deal.ProposalCid, - err: xerrors.Errorf("handling applied event: %w", err), - }: - case <-c.stop: - } - } - }() - - if msg == nil { - log.Error("timed out waiting for deal activation... what now?") - return false, nil - } - - sd, err := stmgr.GetStorageDeal(ctx, c.sm, deal.DealID, ts) - if err != nil { - return false, xerrors.Errorf("failed to look up deal on chain: %w", err) - } - - if sd.ActivationEpoch == 0 { - return false, xerrors.Errorf("deal wasn't active: deal=%d, parentState=%s, h=%d", deal.DealID, ts.ParentState(), ts.Height()) - } - - log.Infof("Storage deal %d activated at epoch %d", deal.DealID, sd.ActivationEpoch) - + cb := func(err error) { select { case c.updated <- clientDealUpdate{ newState: api.DealComplete, id: deal.ProposalCid, + err: err, }: case <-c.stop: } - - return false, nil } - revert := func(ctx context.Context, ts *types.TipSet) error { - log.Warn("deal activation reverted; TODO: actually handle this!") - // TODO: Just go back to DealSealing? - return nil - } + err := c.node.OnDealSectorCommitted(ctx, deal.Proposal.Provider, deal.DealID, cb) - matchEvent := func(msg *types.Message) (bool, error) { - if msg.To != deal.Proposal.Provider { - return false, nil - } - - if msg.Method != actors.MAMethods.ProveCommitSector { - return false, nil - } - - var params actors.SectorProveCommitInfo - if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil { - return false, err - } - - var found bool - for _, dealID := range params.DealIDs { - if dealID == deal.DealID { - found = true - break - } - } - - return found, nil - } - - if err := c.events.Called(checkFunc, called, revert, 3, build.SealRandomnessLookbackLimit, matchEvent); err != nil { - return nil, xerrors.Errorf("failed to set up called handler") - } - - return nil, nil + return nil, err +} + +func (c *Client) checkAskSignature(ask *types.SignedStorageAsk) error { + return c.node.ValidateAskSignature(ask) } diff --git a/chain/deals/client_storagemarket.go b/chain/deals/client_storagemarket.go new file mode 100644 index 000000000..08dacc7a3 --- /dev/null +++ b/chain/deals/client_storagemarket.go @@ -0,0 +1,118 @@ +package deals + +// this file implements storagemarket.StorageClient + +import ( + "context" + + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/storagemarket" +) + +func (c *Client) ListProviders(ctx context.Context) (<-chan storagemarket.StorageProviderInfo, error) { + providers, err := c.node.ListStorageProviders(ctx) + if err != nil { + return nil, err + } + + out := make(chan storagemarket.StorageProviderInfo) + + go func() { + for _, p := range providers { + select { + case out <- *p: + case <-ctx.Done(): + return + } + + } + }() + + return out, nil +} + +func (c *Client) ListDeals(ctx context.Context, addr address.Address) ([]actors.OnChainDeal, error) { + return c.node.ListClientDeals(ctx, addr) +} + +func (c *Client) ListInProgressDeals(ctx context.Context) ([]storagemarket.ClientDeal, error) { + deals, err := c.List() + if err != nil { + return nil, err + } + + out := make([]storagemarket.ClientDeal, len(deals)) + for k, v := range deals { + out[k] = storagemarket.ClientDeal{ + ProposalCid: v.ProposalCid, + Proposal: v.Proposal, + State: v.State, + Miner: v.Miner, + MinerWorker: v.MinerWorker, + DealID: v.DealID, + PublishMessage: v.PublishMessage, + } + } + + return out, nil +} + +func (c *Client) GetInProgressDeal(ctx context.Context, cid cid.Cid) (storagemarket.ClientDeal, error) { + deals, err := c.ListInProgressDeals(ctx) + if err != nil { + return storagemarket.ClientDeal{}, err + } + + for _, deal := range deals { + if deal.ProposalCid == cid { + return deal, nil + } + } + + return storagemarket.ClientDeal{}, xerrors.Errorf("couldn't find client deal") +} + +func (c *Client) GetAsk(ctx context.Context, info storagemarket.StorageProviderInfo) (*storagemarket.StorageAsk, error) { + return c.QueryAsk(ctx, info.PeerID, info.Address) +} + +func (c *Client) ProposeStorageDeal(ctx context.Context, addr address.Address, info *storagemarket.StorageProviderInfo, payloadCid cid.Cid, proposalExpiration storagemarket.Epoch, duration storagemarket.Epoch, price storagemarket.TokenAmount, collateral storagemarket.TokenAmount) (*storagemarket.ProposeStorageDealResult, error) { + + proposal := ClientDealProposal{ + Data: payloadCid, + PricePerEpoch: types.BigInt(price), + ProposalExpiration: uint64(proposalExpiration), + Duration: uint64(duration), + Client: addr, + ProviderAddress: info.Address, + MinerWorker: info.Worker, + MinerID: info.PeerID, + } + + proposalCid, err := c.Start(ctx, proposal) + + result := &storagemarket.ProposeStorageDealResult{ + ProposalCid: proposalCid, + } + + return result, err +} + +func (c *Client) GetPaymentEscrow(ctx context.Context, addr address.Address) (storagemarket.Balance, error) { + + balance, err := c.node.GetBalance(ctx, addr) + + return balance, err +} + +func (c *Client) AddPaymentEscrow(ctx context.Context, addr address.Address, amount storagemarket.TokenAmount) error { + + return c.node.AddFunds(ctx, addr, amount) +} + +var _ storagemarket.StorageClient = &Client{} diff --git a/chain/deals/provider.go b/chain/deals/provider.go index 0b1fa3900..1a020de71 100644 --- a/chain/deals/provider.go +++ b/chain/deals/provider.go @@ -5,9 +5,10 @@ import ( "errors" "sync" - cid "github.com/ipfs/go-cid" - datastore "github.com/ipfs/go-datastore" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" + "github.com/libp2p/go-libp2p-core/host" inet "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" @@ -20,8 +21,7 @@ import ( "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/node/modules/dtypes" - "github.com/filecoin-project/lotus/storage" - "github.com/filecoin-project/lotus/storage/sectorblocks" + "github.com/filecoin-project/lotus/storagemarket" ) var ProviderDsPrefix = "/deals/provider" @@ -47,9 +47,7 @@ type Provider struct { ask *types.SignedStorageAsk askLk sync.Mutex - secb *sectorblocks.SectorBlocks - sminer *storage.Miner - full api.FullNode + spn storagemarket.StorageProviderNode // TODO: This will go away once storage market module + CAR // is implemented @@ -83,7 +81,7 @@ var ( ErrDataTransferFailed = errors.New("deal data transfer failed") ) -func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, secb *sectorblocks.SectorBlocks, dag dtypes.StagingDAG, dataTransfer dtypes.ProviderDataTransfer, fullNode api.FullNode) (*Provider, error) { +func NewProvider(ds dtypes.MetadataDS, dag dtypes.StagingDAG, dataTransfer dtypes.ProviderDataTransfer, spn storagemarket.StorageProviderNode) (storagemarket.StorageProvider, error) { addr, err := ds.Get(datastore.NewKey("miner-address")) if err != nil { return nil, err @@ -94,11 +92,9 @@ func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, secb *sectorblocks } h := &Provider{ - sminer: sminer, dag: dag, dataTransfer: dataTransfer, - full: fullNode, - secb: secb, + spn: spn, pricePerByteBlock: types.NewInt(3), // TODO: allow setting minPieceSize: 256, // TODO: allow setting (BUT KEEP MIN 256! (because of how we fill sectors up)) @@ -135,9 +131,12 @@ func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, secb *sectorblocks return h, nil } -func (p *Provider) Run(ctx context.Context) { +func (p *Provider) Run(ctx context.Context, host host.Host) { // TODO: restore state + host.SetStreamHandler(storagemarket.DealProtocolID, p.HandleStream) + host.SetStreamHandler(storagemarket.AskProtocolID, p.HandleAskStream) + go func() { defer log.Warn("quitting deal provider loop") defer close(p.stopped) @@ -162,7 +161,7 @@ func (p *Provider) onIncoming(deal MinerDeal) { if err := p.deals.Begin(deal.ProposalCid, &deal); err != nil { // This can happen when client re-sends proposal - p.failDeal(deal.ProposalCid, err) + p.failDeal(context.TODO(), deal.ProposalCid, err) log.Errorf("deal tracking failed: %s", err) return } @@ -180,7 +179,7 @@ func (p *Provider) onUpdated(ctx context.Context, update minerDealUpdate) { log.Infof("Deal %s updated state to %s", update.id, api.DealStates[update.newState]) if update.err != nil { log.Errorf("deal %s (newSt: %d) failed: %+v", update.id, update.newState, update.err) - p.failDeal(update.id, update.err) + p.failDeal(ctx, update.id, update.err) return } var deal MinerDeal @@ -193,7 +192,7 @@ func (p *Provider) onUpdated(ctx context.Context, update minerDealUpdate) { return nil }) if err != nil { - p.failDeal(update.id, err) + p.failDeal(ctx, update.id, err) return } diff --git a/chain/deals/provider_asks.go b/chain/deals/provider_asks.go index 1170eb28b..a0cdd9c71 100644 --- a/chain/deals/provider_asks.go +++ b/chain/deals/provider_asks.go @@ -5,13 +5,13 @@ import ( "context" "time" - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-cbor-util" - "github.com/filecoin-project/lotus/chain/stmgr" - "github.com/filecoin-project/lotus/chain/types" - datastore "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore" inet "github.com/libp2p/go-libp2p-core/network" "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/lotus/chain/types" ) func (p *Provider) SetPrice(price types.BigInt, ttlsecs int64) error { @@ -41,7 +41,7 @@ func (p *Provider) SetPrice(price types.BigInt, ttlsecs int64) error { return p.saveAsk(ssa) } -func (p *Provider) getAsk(m address.Address) *types.SignedStorageAsk { +func (p *Provider) GetAsk(m address.Address) *types.SignedStorageAsk { p.askLk.Lock() defer p.askLk.Unlock() if m != p.actor { @@ -69,7 +69,7 @@ func (p *Provider) HandleAskStream(s inet.Stream) { func (p *Provider) processAskRequest(ar *AskRequest) *AskResponse { return &AskResponse{ - Ask: p.getAsk(ar.Miner), + Ask: p.GetAsk(ar.Miner), } } @@ -112,12 +112,12 @@ func (p *Provider) signAsk(a *types.StorageAsk) (*types.SignedStorageAsk, error) return nil, err } - worker, err := p.getWorker(p.actor) + worker, err := p.spn.GetMinerWorker(context.TODO(), p.actor) if err != nil { return nil, xerrors.Errorf("failed to get worker to sign ask: %w", err) } - sig, err := p.full.WalletSign(context.TODO(), worker, b) + sig, err := p.spn.SignBytes(context.TODO(), worker, b) if err != nil { return nil, err } @@ -141,20 +141,3 @@ func (p *Provider) saveAsk(a *types.SignedStorageAsk) error { p.ask = a return nil } - -func (c *Client) checkAskSignature(ask *types.SignedStorageAsk) error { - tss := c.sm.ChainStore().GetHeaviestTipSet().ParentState() - - w, err := stmgr.GetMinerWorkerRaw(context.TODO(), c.sm, tss, ask.Ask.Miner) - if err != nil { - return xerrors.Errorf("failed to get worker for miner in ask", err) - } - - sigb, err := cborutil.Dump(ask.Ask) - if err != nil { - return xerrors.Errorf("failed to re-serialize ask") - } - - return ask.Signature.Verify(w, sigb) - -} diff --git a/chain/deals/provider_states.go b/chain/deals/provider_states.go index 4faa44973..76fa9a99b 100644 --- a/chain/deals/provider_states.go +++ b/chain/deals/provider_states.go @@ -1,21 +1,17 @@ package deals import ( - "bytes" "context" ipldfree "github.com/ipld/go-ipld-prime/impl/free" "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/ipld/go-ipld-prime/traversal/selector/builder" - unixfile "github.com/ipfs/go-unixfs/file" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/lib/padreader" - "github.com/filecoin-project/lotus/storage/sectorblocks" + "github.com/filecoin-project/lotus/storagemarket" ) type providerHandlerFunc func(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) @@ -43,7 +39,7 @@ func (p *Provider) handle(ctx context.Context, deal MinerDeal, cb providerHandle // ACCEPTED func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) { - head, err := p.full.ChainHead(ctx) + head, err := p.spn.MostRecentStateId(ctx) if err != nil { return nil, err } @@ -63,7 +59,7 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal) } // check market funds - clientMarketBalance, err := p.full.StateMarketBalance(ctx, deal.Proposal.Client, nil) + clientMarketBalance, err := p.spn.GetBalance(ctx, deal.Proposal.Client) if err != nil { return nil, xerrors.Errorf("getting client market balance failed: %w", err) } @@ -74,59 +70,36 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal) return nil, xerrors.New("clientMarketBalance.Available too small") } - waddr, err := p.full.StateMinerWorker(ctx, deal.Proposal.Provider, nil) + waddr, err := p.spn.GetMinerWorker(ctx, deal.Proposal.Provider) if err != nil { return nil, err } // TODO: check StorageCollateral (may be too large (or too small)) - if err := p.full.MarketEnsureAvailable(ctx, waddr, deal.Proposal.StorageCollateral); err != nil { + if err := p.spn.EnsureFunds(ctx, waddr, storagemarket.TokenAmount(deal.Proposal.StorageCollateral)); err != nil { return nil, err } - log.Info("publishing deal") - - params, err := actors.SerializeParams(&actors.PublishStorageDealsParams{ - Deals: []actors.StorageDealProposal{deal.Proposal}, - }) - if err != nil { - return nil, xerrors.Errorf("serializing PublishStorageDeals params failed: ", err) + smDeal := storagemarket.MinerDeal{ + Client: deal.Client, + Proposal: deal.Proposal, + ProposalCid: deal.ProposalCid, + State: deal.State, + Ref: deal.Ref, + SectorID: deal.SectorID, } - // TODO: We may want this to happen after fetching data - smsg, err := p.full.MpoolPushMessage(ctx, &types.Message{ - To: actors.StorageMarketAddress, - From: waddr, - Value: types.NewInt(0), - GasPrice: types.NewInt(0), - GasLimit: types.NewInt(1000000), - Method: actors.SMAMethods.PublishStorageDeals, - Params: params, - }) + dealId, mcid, err := p.spn.PublishDeals(ctx, smDeal) if err != nil { return nil, err } - r, err := p.full.StateWaitMsg(ctx, smsg.Cid()) - if err != nil { - return nil, err - } - if r.Receipt.ExitCode != 0 { - return nil, xerrors.Errorf("publishing deal failed: exit %d", r.Receipt.ExitCode) - } - var resp actors.PublishStorageDealResponse - if err := resp.UnmarshalCBOR(bytes.NewReader(r.Receipt.Return)); err != nil { - return nil, err - } - if len(resp.DealIDs) != 1 { - return nil, xerrors.Errorf("got unexpected number of DealIDs from SMA") - } - log.Infof("fetching data for a deal %d", resp.DealIDs[0]) - err = p.sendSignedResponse(&Response{ + log.Infof("fetching data for a deal %d", dealId) + err = p.sendSignedResponse(ctx, &Response{ State: api.DealAccepted, - Proposal: deal.ProposalCid, - StorageDealSubmission: smsg, + Proposal: deal.ProposalCid, + PublishMessage: &mcid, }) if err != nil { return nil, err @@ -148,7 +121,7 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal) // (see onDataTransferEvent) _, err = p.dataTransfer.OpenPullDataChannel(ctx, deal.Client, - &StorageDataTransferVoucher{Proposal: deal.ProposalCid, DealID: resp.DealIDs[0]}, + &StorageDataTransferVoucher{Proposal: deal.ProposalCid, DealID: uint64(dealId)}, deal.Ref, allSelector, ) @@ -162,39 +135,23 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal) // STAGED func (p *Provider) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) { - root, err := p.dag.Get(ctx, deal.Ref) - if err != nil { - return nil, xerrors.Errorf("failed to get file root for deal: %s", err) - } + sectorID, err := p.spn.OnDealComplete( + ctx, + storagemarket.MinerDeal{ + Client: deal.Client, + Proposal: deal.Proposal, + ProposalCid: deal.ProposalCid, + State: deal.State, + Ref: deal.Ref, + DealID: deal.DealID, + }, + "", + ) - // TODO: abstract this away into ReadSizeCloser + implement different modes - n, err := unixfile.NewUnixfsFile(ctx, p.dag, root) if err != nil { - return nil, xerrors.Errorf("cannot open unixfs file: %s", err) + return nil, err } - uf, ok := n.(sectorblocks.UnixfsReader) - if !ok { - // we probably got directory, unsupported for now - return nil, xerrors.Errorf("unsupported unixfs file type") - } - - // TODO: uf.Size() is user input, not trusted - // This won't be useful / here after we migrate to putting CARs into sectors - size, err := uf.Size() - if err != nil { - return nil, xerrors.Errorf("getting unixfs file size: %w", err) - } - if padreader.PaddedSize(uint64(size)) != deal.Proposal.PieceSize { - return nil, xerrors.Errorf("deal.Proposal.PieceSize didn't match padded unixfs file size") - } - - sectorID, err := p.secb.AddUnixfsPiece(ctx, uf, deal.DealID) - if err != nil { - return nil, xerrors.Errorf("AddPiece failed: %s", err) - } - log.Warnf("New Sector: %d (deal %d)", sectorID, deal.DealID) - return func(deal *MinerDeal) { deal.SectorID = sectorID }, nil diff --git a/chain/deals/provider_storagemarket.go b/chain/deals/provider_storagemarket.go new file mode 100644 index 000000000..4d9863bd9 --- /dev/null +++ b/chain/deals/provider_storagemarket.go @@ -0,0 +1,65 @@ +package deals + +// this file implements storagemarket.StorageClient + +import ( + "context" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/storagemarket" +) + +func (p *Provider) AddAsk(price storagemarket.TokenAmount, ttlsecs int64) error { + return p.SetPrice(types.BigInt(price), ttlsecs) +} + +func (p *Provider) ListAsks(addr address.Address) []*types.SignedStorageAsk { + ask := p.GetAsk(addr) + + if ask != nil { + return []*types.SignedStorageAsk{ask} + } + + return nil +} + +func (p *Provider) ListDeals(ctx context.Context) ([]actors.OnChainDeal, error) { + return p.spn.ListProviderDeals(ctx, p.actor) +} + +func (p *Provider) AddStorageCollateral(ctx context.Context, amount storagemarket.TokenAmount) error { + return p.spn.AddFunds(ctx, p.actor, amount) +} + +func (p *Provider) GetStorageCollateral(ctx context.Context) (storagemarket.Balance, error) { + balance, err := p.spn.GetBalance(ctx, p.actor) + + return balance, err +} + +func (p *Provider) ListIncompleteDeals() ([]storagemarket.MinerDeal, error) { + var out []storagemarket.MinerDeal + + var deals []MinerDeal + if err := p.deals.List(&deals); err != nil { + return nil, err + } + + for _, deal := range deals { + out = append(out, storagemarket.MinerDeal{ + Client: deal.Client, + Proposal: deal.Proposal, + ProposalCid: deal.ProposalCid, + State: deal.State, + Ref: deal.Ref, + DealID: deal.DealID, + SectorID: deal.SectorID, + }) + } + + return out, nil +} + +var _ storagemarket.StorageProvider = &Provider{} diff --git a/chain/deals/provider_utils.go b/chain/deals/provider_utils.go index e52268dde..54b838a43 100644 --- a/chain/deals/provider_utils.go +++ b/chain/deals/provider_utils.go @@ -4,24 +4,21 @@ import ( "context" "runtime" - datatransfer "github.com/filecoin-project/go-data-transfer" - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/node/modules/dtypes" - "github.com/ipld/go-ipld-prime" - "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-statestore" - "github.com/filecoin-project/lotus/chain/actors" - "github.com/filecoin-project/lotus/chain/types" - "github.com/ipfs/go-cid" + "github.com/ipld/go-ipld-prime" inet "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/node/modules/dtypes" ) -func (p *Provider) failDeal(id cid.Cid, cerr error) { +func (p *Provider) failDeal(ctx context.Context, id cid.Cid, cerr error) { if err := p.deals.End(id); err != nil { log.Warnf("deals.End: %s", err) } @@ -33,7 +30,7 @@ func (p *Provider) failDeal(id cid.Cid, cerr error) { log.Warnf("deal %s failed: %s", id, cerr) - err := p.sendSignedResponse(&Response{ + err := p.sendSignedResponse(ctx, &Response{ State: api.DealFailed, Message: cerr.Error(), Proposal: id, @@ -72,7 +69,7 @@ func (p *Provider) readProposal(s inet.Stream) (proposal Proposal, err error) { return } -func (p *Provider) sendSignedResponse(resp *Response) error { +func (p *Provider) sendSignedResponse(ctx context.Context, resp *Response) error { s, ok := p.conns[resp.Proposal] if !ok { return xerrors.New("couldn't send response: not connected") @@ -83,12 +80,12 @@ func (p *Provider) sendSignedResponse(resp *Response) error { return xerrors.Errorf("serializing response: %w", err) } - worker, err := p.getWorker(p.actor) + worker, err := p.spn.GetMinerWorker(ctx, p.actor) if err != nil { return err } - sig, err := p.full.WalletSign(context.TODO(), worker, msg) + sig, err := p.spn.SignBytes(ctx, worker, msg) if err != nil { return xerrors.Errorf("failed to sign response message: %w", err) } @@ -118,24 +115,6 @@ func (p *Provider) disconnect(deal MinerDeal) error { return err } -func (p *Provider) getWorker(miner address.Address) (address.Address, error) { - getworker := &types.Message{ - To: miner, - From: miner, - Method: actors.MAMethods.GetWorkerAddr, - } - r, err := p.full.StateCall(context.TODO(), getworker, nil) - if err != nil { - return address.Undef, xerrors.Errorf("getting worker address: %w", err) - } - - if r.ExitCode != 0 { - return address.Undef, xerrors.Errorf("getWorker call failed: %d", r.ExitCode) - } - - return address.NewFromBytes(r.Return) -} - var _ datatransfer.RequestValidator = &ProviderRequestValidator{} // ProviderRequestValidator validates data transfer requests for the provider diff --git a/chain/deals/request_validation_test.go b/chain/deals/request_validation_test.go index f6e6a0e33..7607eacb5 100644 --- a/chain/deals/request_validation_test.go +++ b/chain/deals/request_validation_test.go @@ -20,6 +20,7 @@ import ( "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/deals" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/storagemarket" ) var blockGenerator = blocksutil.NewBlockGenerator() @@ -74,12 +75,14 @@ func newClientDeal(minerID peer.ID, state api.DealState) (deals.ClientDeal, erro } return deals.ClientDeal{ - Proposal: newProposal, - ProposalCid: proposalNd.Cid(), - PayloadCid: blockGenerator.Next().Cid(), - Miner: minerID, - MinerWorker: minerAddr, - State: state, + ClientDeal: storagemarket.ClientDeal{ + Proposal: newProposal, + ProposalCid: proposalNd.Cid(), + PayloadCid: blockGenerator.Next().Cid(), + Miner: minerID, + MinerWorker: minerAddr, + State: state, + }, }, nil } diff --git a/chain/deals/types.go b/chain/deals/types.go index 4ad60a4bc..a35f537ab 100644 --- a/chain/deals/types.go +++ b/chain/deals/types.go @@ -4,12 +4,13 @@ import ( "bytes" "errors" + "github.com/ipfs/go-cid" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/types" - "github.com/ipfs/go-cid" ) var ( @@ -41,9 +42,6 @@ var ( DataTransferStates = []api.DealState{api.DealAccepted, api.DealUnknown} ) -const DealProtocolID = "/fil/storage/mk/1.0.1" -const AskProtocolID = "/fil/storage/ask/1.0.1" - type Proposal struct { DealProposal *actors.StorageDealProposal @@ -58,7 +56,7 @@ type Response struct { Proposal cid.Cid // DealAccepted - StorageDealSubmission *types.SignedMessage + PublishMessage *cid.Cid } // TODO: Do we actually need this to be signed? diff --git a/chain/types/cbor_gen.go b/chain/types/cbor_gen.go index 407f89809..73352df61 100644 --- a/chain/types/cbor_gen.go +++ b/chain/types/cbor_gen.go @@ -5,7 +5,7 @@ import ( "io" "math" - cid "github.com/ipfs/go-cid" + "github.com/ipfs/go-cid" cbg "github.com/whyrusleeping/cbor-gen" xerrors "golang.org/x/xerrors" ) diff --git a/gen/main.go b/gen/main.go index 4f90dc236..0cb569148 100644 --- a/gen/main.go +++ b/gen/main.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "github.com/filecoin-project/lotus/storagemarket" "os" gen "github.com/whyrusleeping/cbor-gen" @@ -122,6 +123,15 @@ func main() { os.Exit(1) } + err = gen.WriteTupleEncodersToFile("./storagemarket/cbor_gen.go", "storagemarket", + storagemarket.ClientDeal{}, + storagemarket.MinerDeal{}, + ) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + err = gen.WriteTupleEncodersToFile("./chain/deals/cbor_gen.go", "deals", deals.AskRequest{}, deals.AskResponse{}, diff --git a/node/builder.go b/node/builder.go index 8169d3829..dcce895b7 100644 --- a/node/builder.go +++ b/node/builder.go @@ -47,6 +47,8 @@ import ( "github.com/filecoin-project/lotus/retrieval/discovery" "github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage/sectorblocks" + "github.com/filecoin-project/lotus/storagemarket" + "github.com/filecoin-project/lotus/storagemarketadapter" ) // special is a type used to give keys to modules which @@ -227,7 +229,8 @@ func Online() Option { Override(new(dtypes.ClientDealStore), modules.NewClientDealStore), Override(new(dtypes.ClientDataTransfer), modules.NewClientDAGServiceDataTransfer), Override(new(*deals.ClientRequestValidator), deals.NewClientRequestValidator), - Override(new(*deals.Client), deals.NewClient), + Override(new(storagemarket.StorageClient), deals.NewClient), + Override(new(storagemarket.StorageClientNode), storagemarketadapter.NewClientNodeAdapter), Override(RegisterClientValidatorKey, modules.RegisterClientValidator), Override(RunDealClientKey, modules.RunDealClient), @@ -250,7 +253,8 @@ func Online() Option { Override(new(dtypes.ProviderDealStore), modules.NewProviderDealStore), Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer), Override(new(*deals.ProviderRequestValidator), deals.NewProviderRequestValidator), - Override(new(*deals.Provider), deals.NewProvider), + Override(new(storagemarket.StorageProvider), deals.NewProvider), + Override(new(storagemarket.StorageProviderNode), storagemarketadapter.NewProviderNodeAdapter), Override(RegisterProviderValidatorKey, modules.RegisterProviderValidator), Override(HandleRetrievalKey, modules.HandleRetrieval), Override(GetParamsKey, modules.GetParams), diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 57df600bd..58f250f67 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -27,13 +27,13 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/deals" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/impl/paych" "github.com/filecoin-project/lotus/node/modules/dtypes" retrievalmarket "github.com/filecoin-project/lotus/retrieval" + "github.com/filecoin-project/lotus/storagemarket" ) type API struct { @@ -44,7 +44,7 @@ type API struct { full.WalletAPI paych.PaychAPI - DealClient *deals.Client + SMDealClient storagemarket.StorageClient RetDiscovery retrievalmarket.PeerResolver Retrieval retrievalmarket.RetrievalClient Chain *store.ChainStore @@ -72,28 +72,30 @@ func (a *API) ClientStartDeal(ctx context.Context, data cid.Cid, addr address.Ad if err != nil { return nil, xerrors.Errorf("failed getting miner worker: %w", err) } - - proposal := deals.ClientDealProposal{ - Data: data, - PricePerEpoch: epochPrice, - ProposalExpiration: math.MaxUint64, // TODO: set something reasonable - Duration: blocksDuration, - Client: addr, - ProviderAddress: miner, - MinerWorker: mw, - MinerID: pid, + providerInfo := storagemarket.StorageProviderInfo{ + Address: miner, + Worker: mw, + PeerID: pid, } + result, err := a.SMDealClient.ProposeStorageDeal( + ctx, + addr, + &providerInfo, + data, + storagemarket.Epoch(math.MaxUint64), + storagemarket.Epoch(blocksDuration), + storagemarket.TokenAmount(epochPrice), + storagemarket.TokenAmount(storagemarket.EmptyInt)) - c, err := a.DealClient.Start(ctx, proposal) if err != nil { return nil, xerrors.Errorf("failed to start deal: %w", err) } - return &c, nil + return &result.ProposalCid, nil } func (a *API) ClientListDeals(ctx context.Context) ([]api.DealInfo, error) { - deals, err := a.DealClient.List() + deals, err := a.SMDealClient.ListInProgressDeals(ctx) if err != nil { return nil, err } @@ -117,10 +119,11 @@ func (a *API) ClientListDeals(ctx context.Context) ([]api.DealInfo, error) { } func (a *API) ClientGetDealInfo(ctx context.Context, d cid.Cid) (*api.DealInfo, error) { - v, err := a.DealClient.GetDeal(d) + v, err := a.SMDealClient.GetInProgressDeal(ctx, d) if err != nil { return nil, err } + return &api.DealInfo{ ProposalCid: v.ProposalCid, State: v.State, @@ -315,5 +318,6 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, path } func (a *API) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*types.SignedStorageAsk, error) { - return a.DealClient.QueryAsk(ctx, p, miner) + info := storagemarket.StorageProviderInfo{Address: miner, PeerID: p} + return a.SMDealClient.GetAsk(ctx, info) } diff --git a/node/modules/services.go b/node/modules/services.go index 9949d0d69..bd0978085 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -10,7 +10,6 @@ import ( "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/blocksync" - "github.com/filecoin-project/lotus/chain/deals" "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/sub" "github.com/filecoin-project/lotus/node/hello" @@ -18,6 +17,7 @@ import ( "github.com/filecoin-project/lotus/peermgr" retrievalmarket "github.com/filecoin-project/lotus/retrieval" "github.com/filecoin-project/lotus/retrieval/discovery" + "github.com/filecoin-project/lotus/storagemarket" ) func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) { @@ -66,7 +66,7 @@ func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, pubsub *pu go sub.HandleIncomingMessages(ctx, mpool, msgsub) } -func RunDealClient(mctx helpers.MetricsCtx, lc fx.Lifecycle, c *deals.Client) { +func RunDealClient(mctx helpers.MetricsCtx, lc fx.Lifecycle, c storagemarket.StorageClient) { ctx := helpers.LifecycleCtx(mctx, lc) lc.Append(fx.Hook{ diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 2a897a9a3..d868414d9 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -40,6 +40,7 @@ import ( "github.com/filecoin-project/lotus/retrievaladapter" "github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage/sectorblocks" + "github.com/filecoin-project/lotus/storagemarket" ) func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) { @@ -127,14 +128,12 @@ func HandleRetrieval(host host.Host, lc fx.Lifecycle, m retrievalmarket.Retrieva }) } -func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h *deals.Provider) { +func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h storagemarket.StorageProvider) { ctx := helpers.LifecycleCtx(mctx, lc) lc.Append(fx.Hook{ OnStart: func(context.Context) error { - h.Run(ctx) - host.SetStreamHandler(deals.DealProtocolID, h.HandleStream) - host.SetStreamHandler(deals.AskProtocolID, h.HandleAskStream) + h.Run(ctx, host) return nil }, OnStop: func(context.Context) error { diff --git a/retrieval/impl/cbor_gen.go b/retrieval/impl/cbor_gen.go index f2778544c..220577089 100644 --- a/retrieval/impl/cbor_gen.go +++ b/retrieval/impl/cbor_gen.go @@ -21,7 +21,7 @@ func (t *RetParams) MarshalCBOR(w io.Writer) error { return err } - // t.Unixfs0 (retrieval.Unixfs0Offer) (struct) + // t.Unixfs0 (retrievalimpl.Unixfs0Offer) (struct) if err := t.Unixfs0.MarshalCBOR(w); err != nil { return err } @@ -43,7 +43,7 @@ func (t *RetParams) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("cbor input had wrong number of fields") } - // t.Unixfs0 (retrieval.Unixfs0Offer) (struct) + // t.Unixfs0 (retrievalimpl.Unixfs0Offer) (struct) { @@ -124,7 +124,7 @@ func (t *OldQueryResponse) MarshalCBOR(w io.Writer) error { return err } - // t.t.Status (retrieval.OldQueryResponseStatus) (uint64) + // t.Status (retrievalimpl.OldQueryResponseStatus) (uint64) if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Status))); err != nil { return err } @@ -156,7 +156,7 @@ func (t *OldQueryResponse) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("cbor input had wrong number of fields") } - // t.t.Status (retrieval.OldQueryResponseStatus) (uint64) + // t.Status (retrievalimpl.OldQueryResponseStatus) (uint64) maj, extra, err = cbg.CborReadHeader(br) if err != nil { @@ -166,7 +166,7 @@ func (t *OldQueryResponse) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("wrong type for uint64 field") } t.Status = OldQueryResponseStatus(extra) - // t.t.Size (uint64) (uint64) + // t.Size (uint64) (uint64) maj, extra, err = cbg.CborReadHeader(br) if err != nil { @@ -267,7 +267,7 @@ func (t *OldDealProposal) MarshalCBOR(w io.Writer) error { return xerrors.Errorf("failed to write cid field t.Ref: %w", err) } - // t.Params (retrieval.RetParams) (struct) + // t.Params (retrievalimpl.RetParams) (struct) if err := t.Params.MarshalCBOR(w); err != nil { return err } @@ -310,7 +310,7 @@ func (t *OldDealProposal) UnmarshalCBOR(r io.Reader) error { t.Ref = c } - // t.Params (retrieval.RetParams) (struct) + // t.Params (retrievalimpl.RetParams) (struct) { diff --git a/retrievaladapter/provider.go b/retrievaladapter/provider.go index f8d188a95..06757593b 100644 --- a/retrievaladapter/provider.go +++ b/retrievaladapter/provider.go @@ -3,8 +3,8 @@ package retrievaladapter import ( "context" - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/api" retrievalmarket "github.com/filecoin-project/lotus/retrieval" ) diff --git a/storagemarket/bigint.go b/storagemarket/bigint.go new file mode 100644 index 000000000..f36e242b0 --- /dev/null +++ b/storagemarket/bigint.go @@ -0,0 +1,241 @@ +// Copied from lotus until this can be extracted into shared types + +package storagemarket + +import ( + "encoding/json" + "fmt" + "io" + "math/big" + + "github.com/filecoin-project/lotus/build" + cbor "github.com/ipfs/go-ipld-cbor" + "github.com/polydawn/refmt/obj/atlas" + + cbg "github.com/whyrusleeping/cbor-gen" + "golang.org/x/xerrors" +) + +const BigIntMaxSerializedLen = 128 // is this big enough? or too big? + +var TotalFilecoinInt = FromFil(build.TotalFilecoin) + +func init() { + cbor.RegisterCborType(atlas.BuildEntry(BigInt{}).Transform(). + TransformMarshal(atlas.MakeMarshalTransformFunc( + func(i BigInt) ([]byte, error) { + return i.cborBytes(), nil + })). + TransformUnmarshal(atlas.MakeUnmarshalTransformFunc( + func(x []byte) (BigInt, error) { + return fromCborBytes(x) + })). + Complete()) +} + +var EmptyInt = BigInt{} + +type BigInt struct { + *big.Int +} + +func NewInt(i uint64) BigInt { + return BigInt{big.NewInt(0).SetUint64(i)} +} + +func FromFil(i uint64) BigInt { + return BigMul(NewInt(i), NewInt(build.FilecoinPrecision)) +} + +func BigFromBytes(b []byte) BigInt { + i := big.NewInt(0).SetBytes(b) + return BigInt{i} +} + +func BigFromString(s string) (BigInt, error) { + v, ok := big.NewInt(0).SetString(s, 10) + if !ok { + return BigInt{}, fmt.Errorf("failed to parse string as a big int") + } + + return BigInt{v}, nil +} + +func BigMul(a, b BigInt) BigInt { + return BigInt{big.NewInt(0).Mul(a.Int, b.Int)} +} + +func BigDiv(a, b BigInt) BigInt { + return BigInt{big.NewInt(0).Div(a.Int, b.Int)} +} + +func BigMod(a, b BigInt) BigInt { + return BigInt{big.NewInt(0).Mod(a.Int, b.Int)} +} + +func BigAdd(a, b BigInt) BigInt { + return BigInt{big.NewInt(0).Add(a.Int, b.Int)} +} + +func BigSub(a, b BigInt) BigInt { + return BigInt{big.NewInt(0).Sub(a.Int, b.Int)} +} + +func BigCmp(a, b BigInt) int { + return a.Int.Cmp(b.Int) +} + +func (bi BigInt) Nil() bool { + return bi.Int == nil +} + +// LessThan returns true if bi < o +func (bi BigInt) LessThan(o BigInt) bool { + return BigCmp(bi, o) < 0 +} + +// GreaterThan returns true if bi > o +func (bi BigInt) GreaterThan(o BigInt) bool { + return BigCmp(bi, o) > 0 +} + +// Equals returns true if bi == o +func (bi BigInt) Equals(o BigInt) bool { + return BigCmp(bi, o) == 0 +} + +func (bi *BigInt) MarshalJSON() ([]byte, error) { + return json.Marshal(bi.String()) +} + +func (bi *BigInt) UnmarshalJSON(b []byte) error { + var s string + if err := json.Unmarshal(b, &s); err != nil { + return err + } + + i, ok := big.NewInt(0).SetString(s, 10) + if !ok { + if string(s) == "" { + return nil + } + return xerrors.Errorf("failed to parse bigint string: '%s'", string(b)) + } + + bi.Int = i + return nil +} + +func (bi *BigInt) Scan(value interface{}) error { + switch value := value.(type) { + case string: + i, ok := big.NewInt(0).SetString(value, 10) + if !ok { + if value == "" { + return nil + } + return xerrors.Errorf("failed to parse bigint string: '%s'", value) + } + + bi.Int = i + + return nil + case int64: + bi.Int = big.NewInt(value) + return nil + default: + return xerrors.Errorf("non-string types unsupported: %T", value) + } +} + +func (bi *BigInt) cborBytes() []byte { + if bi.Int == nil { + return []byte{} + } + + switch { + case bi.Sign() > 0: + return append([]byte{0}, bi.Bytes()...) + case bi.Sign() < 0: + return append([]byte{1}, bi.Bytes()...) + default: // bi.Sign() == 0: + return []byte{} + } +} + +func fromCborBytes(buf []byte) (BigInt, error) { + if len(buf) == 0 { + return NewInt(0), nil + } + + var negative bool + switch buf[0] { + case 0: + negative = false + case 1: + negative = true + default: + return EmptyInt, fmt.Errorf("big int prefix should be either 0 or 1, got %d", buf[0]) + } + + i := big.NewInt(0).SetBytes(buf[1:]) + if negative { + i.Neg(i) + } + + return BigInt{i}, nil +} + +func (bi *BigInt) MarshalCBOR(w io.Writer) error { + if bi.Int == nil { + zero := NewInt(0) + return zero.MarshalCBOR(w) + } + + enc := bi.cborBytes() + + header := cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(enc))) + if _, err := w.Write(header); err != nil { + return err + } + + if _, err := w.Write(enc); err != nil { + return err + } + + return nil +} + +func (bi *BigInt) UnmarshalCBOR(br io.Reader) error { + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + + if maj != cbg.MajByteString { + return fmt.Errorf("cbor input for fil big int was not a byte string (%x)", maj) + } + + if extra == 0 { + bi.Int = big.NewInt(0) + return nil + } + + if extra > BigIntMaxSerializedLen { + return fmt.Errorf("big integer byte array too long") + } + + buf := make([]byte, extra) + if _, err := io.ReadFull(br, buf); err != nil { + return err + } + + i, err := fromCborBytes(buf) + if err != nil { + return err + } + + *bi = i + + return nil +} diff --git a/storagemarket/cbor_gen.go b/storagemarket/cbor_gen.go new file mode 100644 index 000000000..e4a9efcfa --- /dev/null +++ b/storagemarket/cbor_gen.go @@ -0,0 +1,352 @@ +package storagemarket + +import ( + "fmt" + "io" + + "github.com/libp2p/go-libp2p-core/peer" + cbg "github.com/whyrusleeping/cbor-gen" + xerrors "golang.org/x/xerrors" +) + +// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT. + +var _ = xerrors.Errorf + +func (t *ClientDeal) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{136}); err != nil { + return err + } + + // t.ProposalCid (cid.Cid) (struct) + + if err := cbg.WriteCid(w, t.ProposalCid); err != nil { + return xerrors.Errorf("failed to write cid field t.ProposalCid: %w", err) + } + + // t.Proposal (actors.StorageDealProposal) (struct) + if err := t.Proposal.MarshalCBOR(w); err != nil { + return err + } + + // t.State (uint64) (uint64) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.State))); err != nil { + return err + } + + // t.Miner (peer.ID) (string) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Miner)))); err != nil { + return err + } + if _, err := w.Write([]byte(t.Miner)); err != nil { + return err + } + + // t.MinerWorker (address.Address) (struct) + if err := t.MinerWorker.MarshalCBOR(w); err != nil { + return err + } + + // t.DealID (uint64) (uint64) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.DealID))); err != nil { + return err + } + + // t.PayloadCid (cid.Cid) (struct) + + if err := cbg.WriteCid(w, t.PayloadCid); err != nil { + return xerrors.Errorf("failed to write cid field t.PayloadCid: %w", err) + } + + // t.PublishMessage (cid.Cid) (struct) + + if t.PublishMessage == nil { + if _, err := w.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCid(w, *t.PublishMessage); err != nil { + return xerrors.Errorf("failed to write cid field t.PublishMessage: %w", err) + } + } + + return nil +} + +func (t *ClientDeal) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 8 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.ProposalCid (cid.Cid) (struct) + + { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.ProposalCid: %w", err) + } + + t.ProposalCid = c + + } + // t.Proposal (actors.StorageDealProposal) (struct) + + { + + if err := t.Proposal.UnmarshalCBOR(br); err != nil { + return err + } + + } + // t.State (uint64) (uint64) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.State = uint64(extra) + // t.Miner (peer.ID) (string) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + t.Miner = peer.ID(sval) + } + // t.MinerWorker (address.Address) (struct) + + { + + if err := t.MinerWorker.UnmarshalCBOR(br); err != nil { + return err + } + + } + // t.DealID (uint64) (uint64) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.DealID = uint64(extra) + // t.PayloadCid (cid.Cid) (struct) + + { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.PayloadCid: %w", err) + } + + t.PayloadCid = c + + } + // t.PublishMessage (cid.Cid) (struct) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.PublishMessage: %w", err) + } + + t.PublishMessage = &c + } + + } + return nil +} + +func (t *MinerDeal) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{136}); err != nil { + return err + } + + // t.ProposalCid (cid.Cid) (struct) + + if err := cbg.WriteCid(w, t.ProposalCid); err != nil { + return xerrors.Errorf("failed to write cid field t.ProposalCid: %w", err) + } + + // t.Proposal (actors.StorageDealProposal) (struct) + if err := t.Proposal.MarshalCBOR(w); err != nil { + return err + } + + // t.Miner (peer.ID) (string) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Miner)))); err != nil { + return err + } + if _, err := w.Write([]byte(t.Miner)); err != nil { + return err + } + + // t.Client (peer.ID) (string) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Client)))); err != nil { + return err + } + if _, err := w.Write([]byte(t.Client)); err != nil { + return err + } + + // t.State (uint64) (uint64) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.State))); err != nil { + return err + } + + // t.Ref (cid.Cid) (struct) + + if err := cbg.WriteCid(w, t.Ref); err != nil { + return xerrors.Errorf("failed to write cid field t.Ref: %w", err) + } + + // t.DealID (uint64) (uint64) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.DealID))); err != nil { + return err + } + + // t.SectorID (uint64) (uint64) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.SectorID))); err != nil { + return err + } + return nil +} + +func (t *MinerDeal) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 8 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.ProposalCid (cid.Cid) (struct) + + { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.ProposalCid: %w", err) + } + + t.ProposalCid = c + + } + // t.Proposal (actors.StorageDealProposal) (struct) + + { + + if err := t.Proposal.UnmarshalCBOR(br); err != nil { + return err + } + + } + // t.Miner (peer.ID) (string) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + t.Miner = peer.ID(sval) + } + // t.Client (peer.ID) (string) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + t.Client = peer.ID(sval) + } + // t.State (uint64) (uint64) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.State = uint64(extra) + // t.Ref (cid.Cid) (struct) + + { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.Ref: %w", err) + } + + t.Ref = c + + } + // t.DealID (uint64) (uint64) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.DealID = uint64(extra) + // t.SectorID (uint64) (uint64) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.SectorID = uint64(extra) + return nil +} diff --git a/storagemarket/types.go b/storagemarket/types.go new file mode 100644 index 000000000..1497063c4 --- /dev/null +++ b/storagemarket/types.go @@ -0,0 +1,199 @@ +package storagemarket + +import ( + "context" + + "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/types" +) + +const DealProtocolID = "/fil/storage/mk/1.0.1" +const AskProtocolID = "/fil/storage/ask/1.0.1" + +// type shims - used during migration into separate module +type Balance = actors.StorageParticipantBalance +type DealID uint64 +type Signature = types.Signature +type StorageDeal = actors.OnChainDeal +type StorageAsk = types.SignedStorageAsk +type StateKey = *types.TipSet +type Epoch uint64 +type TokenAmount BigInt + +// Duplicated from deals package for now +type MinerDeal struct { + ProposalCid cid.Cid + Proposal actors.StorageDealProposal + Miner peer.ID + Client peer.ID + State api.DealState + + Ref cid.Cid + + DealID uint64 + SectorID uint64 // Set when sm >= DealStaged +} + +type ClientDeal struct { + ProposalCid cid.Cid + Proposal actors.StorageDealProposal + State api.DealState + Miner peer.ID + MinerWorker address.Address + DealID uint64 + PayloadCid cid.Cid + PublishMessage *cid.Cid +} + +// The interface provided for storage providers +type StorageProvider interface { + Run(ctx context.Context, host host.Host) + + Stop() + + AddAsk(price TokenAmount, ttlsecs int64) error + + // ListAsks lists current asks + ListAsks(addr address.Address) []*StorageAsk + + // ListDeals lists on-chain deals associated with this provider + ListDeals(ctx context.Context) ([]StorageDeal, error) + + // ListIncompleteDeals lists deals that are in progress or rejected + ListIncompleteDeals() ([]MinerDeal, error) + + // AddStorageCollateral adds storage collateral + AddStorageCollateral(ctx context.Context, amount TokenAmount) error + + // GetStorageCollateral returns the current collateral balance + GetStorageCollateral(ctx context.Context) (Balance, error) +} + +// Node dependencies for a StorageProvider +type StorageProviderNode interface { + MostRecentStateId(ctx context.Context) (StateKey, error) + + // Adds funds with the StorageMinerActor for a storage participant. Used by both providers and clients. + AddFunds(ctx context.Context, addr address.Address, amount TokenAmount) error + + // Ensures that a storage market participant has a certain amount of available funds + EnsureFunds(ctx context.Context, addr address.Address, amount TokenAmount) error + + // GetBalance returns locked/unlocked for a storage participant. Used by both providers and clients. + GetBalance(ctx context.Context, addr address.Address) (Balance, error) + + // Publishes deal on chain + PublishDeals(ctx context.Context, deal MinerDeal) (DealID, cid.Cid, error) + + // ListProviderDeals lists all deals associated with a storage provider + ListProviderDeals(ctx context.Context, addr address.Address) ([]StorageDeal, error) + + // Called when a deal is complete and on chain, and data has been transferred and is ready to be added to a sector + // returns sector id + OnDealComplete(ctx context.Context, deal MinerDeal, piecePath string) (uint64, error) + + // returns the worker address associated with a miner + GetMinerWorker(ctx context.Context, miner address.Address) (address.Address, error) + + // Signs bytes + SignBytes(ctx context.Context, signer address.Address, b []byte) (*types.Signature, error) +} + +type DealSectorCommittedCallback func(error) + +// Node dependencies for a StorageClient +type StorageClientNode interface { + MostRecentStateId(ctx context.Context) (StateKey, error) + + // Adds funds with the StorageMinerActor for a storage participant. Used by both providers and clients. + AddFunds(ctx context.Context, addr address.Address, amount TokenAmount) error + + EnsureFunds(ctx context.Context, addr address.Address, amount TokenAmount) error + + // GetBalance returns locked/unlocked for a storage participant. Used by both providers and clients. + GetBalance(ctx context.Context, addr address.Address) (Balance, error) + + //// ListClientDeals lists all on-chain deals associated with a storage client + ListClientDeals(ctx context.Context, addr address.Address) ([]StorageDeal, error) + + // GetProviderInfo returns information about a single storage provider + //GetProviderInfo(stateId StateID, addr Address) *StorageProviderInfo + + // GetStorageProviders returns information about known miners + ListStorageProviders(ctx context.Context) ([]*StorageProviderInfo, error) + + // Subscribes to storage market actor state changes for a given address. + // TODO: Should there be a timeout option for this? In the case that we are waiting for funds to be deposited and it never happens? + //SubscribeStorageMarketEvents(addr Address, handler StorageMarketEventHandler) (SubID, error) + + // Cancels a subscription + //UnsubscribeStorageMarketEvents(subId SubID) + ValidatePublishedDeal(ctx context.Context, deal ClientDeal) (uint64, error) + + // SignProposal signs a proposal + SignProposal(ctx context.Context, signer address.Address, proposal *actors.StorageDealProposal) error + + GetDefaultWalletAddress(ctx context.Context) (address.Address, error) + + OnDealSectorCommitted(ctx context.Context, provider address.Address, dealId uint64, cb DealSectorCommittedCallback) error + + ValidateAskSignature(ask *StorageAsk) error +} + +type StorageClientProofs interface { + //GeneratePieceCommitment(piece io.Reader, pieceSize uint64) (CommP, error) +} + +// Closely follows the MinerInfo struct in the spec +type StorageProviderInfo struct { + Address address.Address // actor address + Owner address.Address + Worker address.Address // signs messages + SectorSize uint64 + PeerID peer.ID + // probably more like how much storage power, available collateral etc +} + +type ProposeStorageDealResult struct { + ProposalCid cid.Cid +} + +// The interface provided by the module to the outside world for storage clients. +type StorageClient interface { + Run(ctx context.Context) + + Stop() + + // ListProviders queries chain state and returns active storage providers + ListProviders(ctx context.Context) (<-chan StorageProviderInfo, error) + + // ListDeals lists on-chain deals associated with this provider + ListDeals(ctx context.Context, addr address.Address) ([]StorageDeal, error) + + // ListInProgressDeals lists deals that are in progress or rejected + ListInProgressDeals(ctx context.Context) ([]ClientDeal, error) + + // ListInProgressDeals lists deals that are in progress or rejected + GetInProgressDeal(ctx context.Context, cid cid.Cid) (ClientDeal, error) + + // GetAsk returns the current ask for a storage provider + GetAsk(ctx context.Context, info StorageProviderInfo) (*StorageAsk, error) + + //// FindStorageOffers lists providers and queries them to find offers that satisfy some criteria based on price, duration, etc. + //FindStorageOffers(criteria AskCriteria, limit uint) []*StorageOffer + + // ProposeStorageDeal initiates deal negotiation with a Storage Provider + ProposeStorageDeal(ctx context.Context, addr address.Address, info *StorageProviderInfo, payloadCid cid.Cid, proposalExpiration Epoch, duration Epoch, price TokenAmount, collateral TokenAmount) (*ProposeStorageDealResult, error) + + // GetPaymentEscrow returns the current funds available for deal payment + GetPaymentEscrow(ctx context.Context, addr address.Address) (Balance, error) + + // AddStorageCollateral adds storage collateral + AddPaymentEscrow(ctx context.Context, addr address.Address, amount TokenAmount) error +} diff --git a/storagemarketadapter/client_adapter.go b/storagemarketadapter/client_adapter.go new file mode 100644 index 000000000..9e23771f5 --- /dev/null +++ b/storagemarketadapter/client_adapter.go @@ -0,0 +1,328 @@ +package storagemarketadapter + +// this file implements storagemarket.StorageClientNode + +import ( + "bytes" + "context" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/events" + "github.com/filecoin-project/lotus/chain/market" + "github.com/filecoin-project/lotus/chain/stmgr" + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/node/impl/full" + "github.com/filecoin-project/lotus/storagemarket" +) + +type ClientNodeAdapter struct { + full.StateAPI + full.ChainAPI + full.MpoolAPI + + sm *stmgr.StateManager + cs *store.ChainStore + fm *market.FundMgr + ev *events.Events +} + +type clientApi struct { + full.ChainAPI + full.StateAPI +} + +func NewClientNodeAdapter(state full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, sm *stmgr.StateManager, cs *store.ChainStore, fm *market.FundMgr) storagemarket.StorageClientNode { + return &ClientNodeAdapter{ + StateAPI: state, + ChainAPI: chain, + MpoolAPI: mpool, + + sm: sm, + cs: cs, + fm: fm, + ev: events.NewEvents(context.TODO(), &clientApi{chain, state}), + } +} + +func (n *ClientNodeAdapter) ListStorageProviders(ctx context.Context) ([]*storagemarket.StorageProviderInfo, error) { + ts, err := n.ChainHead(ctx) + if err != nil { + return nil, err + } + + addresses, err := n.StateListMiners(ctx, ts) + if err != nil { + return nil, err + } + + var out []*storagemarket.StorageProviderInfo + + for _, addr := range addresses { + workerAddr, err := n.StateMinerWorker(ctx, addr, ts) + if err != nil { + return nil, err + } + + sectorSize, err := n.StateMinerSectorSize(ctx, addr, ts) + if err != nil { + return nil, err + } + + peerId, err := n.StateMinerPeerID(ctx, addr, ts) + if err != nil { + return nil, err + } + + out = append(out, &storagemarket.StorageProviderInfo{ + Address: addr, + Worker: workerAddr, + SectorSize: sectorSize, + PeerID: peerId, + }) + } + + return out, nil +} + +func (n *ClientNodeAdapter) ListClientDeals(ctx context.Context, addr address.Address) ([]storagemarket.StorageDeal, error) { + allDeals, err := n.StateMarketDeals(ctx, nil) + if err != nil { + return nil, err + } + + var out []actors.OnChainDeal + + for _, deal := range allDeals { + if deal.Client == addr { + out = append(out, deal) + } + } + + return out, nil +} + +func (n *ClientNodeAdapter) MostRecentStateId(ctx context.Context) (storagemarket.StateKey, error) { + return n.ChainHead(ctx) +} + +// Adds funds with the StorageMinerActor for a storage participant. Used by both providers and clients. +func (n *ClientNodeAdapter) AddFunds(ctx context.Context, addr address.Address, amount storagemarket.TokenAmount) error { + // (Provider Node API) + smsg, err := n.MpoolPushMessage(ctx, &types.Message{ + To: actors.StorageMarketAddress, + From: addr, + Value: types.BigInt(amount), + GasPrice: types.NewInt(0), + GasLimit: types.NewInt(1000000), + Method: actors.SMAMethods.AddBalance, + }) + if err != nil { + return err + } + + r, err := n.StateWaitMsg(ctx, smsg.Cid()) + if err != nil { + return err + } + + if r.Receipt.ExitCode != 0 { + return xerrors.Errorf("adding funds to storage miner market actor failed: exit %d", r.Receipt.ExitCode) + } + + return nil +} + +func (n *ClientNodeAdapter) EnsureFunds(ctx context.Context, addr address.Address, amount storagemarket.TokenAmount) error { + return n.fm.EnsureAvailable(ctx, addr, types.BigInt(amount)) +} + +func (n *ClientNodeAdapter) GetBalance(ctx context.Context, addr address.Address) (storagemarket.Balance, error) { + bal, err := n.StateMarketBalance(ctx, addr, nil) + if err != nil { + return storagemarket.Balance{}, err + } + + return bal, nil +} + +// ValidatePublishedDeal validates that the provided deal has appeared on chain and references the same ClientDeal +// returns the Deal id if there is no error +func (c *ClientNodeAdapter) ValidatePublishedDeal(ctx context.Context, deal storagemarket.ClientDeal) (uint64, error) { + log.Infow("DEAL ACCEPTED!") + + pubmsg, err := c.cs.GetMessage(*deal.PublishMessage) + if err != nil { + return 0, xerrors.Errorf("getting deal pubsish message: %w", err) + } + + pw, err := stmgr.GetMinerWorker(ctx, c.sm, nil, deal.Proposal.Provider) + if err != nil { + return 0, xerrors.Errorf("getting miner worker failed: %w", err) + } + + if pubmsg.From != pw { + return 0, xerrors.Errorf("deal wasn't published by storage provider: from=%s, provider=%s", pubmsg.From, deal.Proposal.Provider) + } + + if pubmsg.To != actors.StorageMarketAddress { + return 0, xerrors.Errorf("deal publish message wasn't set to StorageMarket actor (to=%s)", pubmsg.To) + } + + if pubmsg.Method != actors.SMAMethods.PublishStorageDeals { + return 0, xerrors.Errorf("deal publish message called incorrect method (method=%s)", pubmsg.Method) + } + + var params actors.PublishStorageDealsParams + if err := params.UnmarshalCBOR(bytes.NewReader(pubmsg.Params)); err != nil { + return 0, err + } + + dealIdx := -1 + for i, storageDeal := range params.Deals { + // TODO: make it less hacky + sd := storageDeal + eq, err := cborutil.Equals(&deal.Proposal, &sd) + if err != nil { + return 0, err + } + if eq { + dealIdx = i + break + } + } + + if dealIdx == -1 { + return 0, xerrors.Errorf("deal publish didn't contain our deal (message cid: %s)", deal.PublishMessage) + } + + // TODO: timeout + _, ret, err := c.sm.WaitForMessage(ctx, *deal.PublishMessage) + if err != nil { + return 0, xerrors.Errorf("waiting for deal publish message: %w", err) + } + if ret.ExitCode != 0 { + return 0, xerrors.Errorf("deal publish failed: exit=%d", ret.ExitCode) + } + + var res actors.PublishStorageDealResponse + if err := res.UnmarshalCBOR(bytes.NewReader(ret.Return)); err != nil { + return 0, err + } + + return res.DealIDs[dealIdx], nil +} + +func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealId uint64, cb storagemarket.DealSectorCommittedCallback) error { + checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { + sd, err := stmgr.GetStorageDeal(ctx, c.StateManager, dealId, ts) + if err != nil { + // TODO: This may be fine for some errors + return false, false, xerrors.Errorf("failed to look up deal on chain: %w", err) + } + + if sd.ActivationEpoch > 0 { + cb(nil) + return true, false, nil + } + + return false, true, nil + } + + called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (more bool, err error) { + defer func() { + if err != nil { + cb(xerrors.Errorf("handling applied event: %w", err)) + } + }() + + if msg == nil { + log.Error("timed out waiting for deal activation... what now?") + return false, nil + } + + sd, err := stmgr.GetStorageDeal(ctx, c.StateManager, dealId, ts) + if err != nil { + return false, xerrors.Errorf("failed to look up deal on chain: %w", err) + } + + if sd.ActivationEpoch == 0 { + return false, xerrors.Errorf("deal wasn't active: deal=%d, parentState=%s, h=%d", dealId, ts.ParentState(), ts.Height()) + } + + log.Infof("Storage deal %d activated at epoch %d", dealId, sd.ActivationEpoch) + + cb(nil) + + return false, nil + } + + revert := func(ctx context.Context, ts *types.TipSet) error { + log.Warn("deal activation reverted; TODO: actually handle this!") + // TODO: Just go back to DealSealing? + return nil + } + + matchEvent := func(msg *types.Message) (bool, error) { + if msg.To != provider { + return false, nil + } + + if msg.Method != actors.MAMethods.ProveCommitSector { + return false, nil + } + + var params actors.SectorProveCommitInfo + if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil { + return false, err + } + + var found bool + for _, dealID := range params.DealIDs { + if dealID == dealId { + found = true + break + } + } + + return found, nil + } + + if err := c.ev.Called(checkFunc, called, revert, 3, build.SealRandomnessLookbackLimit, matchEvent); err != nil { + return xerrors.Errorf("failed to set up called handler") + } + + return nil +} + +func (n *ClientNodeAdapter) SignProposal(ctx context.Context, signer address.Address, proposal *actors.StorageDealProposal) error { + return api.SignWith(ctx, n.Wallet.Sign, signer, proposal) +} + +func (n *ClientNodeAdapter) GetDefaultWalletAddress(ctx context.Context) (address.Address, error) { + return n.Wallet.GetDefault() +} + +func (n *ClientNodeAdapter) ValidateAskSignature(ask *types.SignedStorageAsk) error { + tss := n.cs.GetHeaviestTipSet().ParentState() + + w, err := stmgr.GetMinerWorkerRaw(context.TODO(), n.StateManager, tss, ask.Ask.Miner) + if err != nil { + return xerrors.Errorf("failed to get worker for miner in ask", err) + } + + sigb, err := cborutil.Dump(ask.Ask) + if err != nil { + return xerrors.Errorf("failed to re-serialize ask") + } + + return ask.Signature.Verify(w, sigb) +} + +var _ storagemarket.StorageClientNode = &ClientNodeAdapter{} diff --git a/storagemarketadapter/provider_adapter.go b/storagemarketadapter/provider_adapter.go new file mode 100644 index 000000000..0691bee36 --- /dev/null +++ b/storagemarketadapter/provider_adapter.go @@ -0,0 +1,196 @@ +package storagemarketadapter + +// this file implements storagemarket.StorageProviderNode + +import ( + "bytes" + "context" + + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log" + unixfile "github.com/ipfs/go-unixfs/file" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/padreader" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/storage/sectorblocks" + "github.com/filecoin-project/lotus/storagemarket" +) + +var log = logging.Logger("provideradapter") + +type ProviderNodeAdapter struct { + api.FullNode + + // this goes away with the data transfer module + dag dtypes.StagingDAG + + secb *sectorblocks.SectorBlocks +} + +func NewProviderNodeAdapter(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode { + return &ProviderNodeAdapter{ + FullNode: full, + dag: dag, + secb: secb, + } +} + +func (n *ProviderNodeAdapter) PublishDeals(ctx context.Context, deal storagemarket.MinerDeal) (storagemarket.DealID, cid.Cid, error) { + log.Info("publishing deal") + + worker, err := n.StateMinerWorker(ctx, deal.Proposal.Provider, nil) + if err != nil { + return 0, cid.Undef, err + } + + params, err := actors.SerializeParams(&actors.PublishStorageDealsParams{ + Deals: []actors.StorageDealProposal{deal.Proposal}, + }) + + if err != nil { + return 0, cid.Undef, xerrors.Errorf("serializing PublishStorageDeals params failed: ", err) + } + + // TODO: We may want this to happen after fetching data + smsg, err := n.MpoolPushMessage(ctx, &types.Message{ + To: actors.StorageMarketAddress, + From: worker, + Value: types.NewInt(0), + GasPrice: types.NewInt(0), + GasLimit: types.NewInt(1000000), + Method: actors.SMAMethods.PublishStorageDeals, + Params: params, + }) + if err != nil { + return 0, cid.Undef, err + } + r, err := n.StateWaitMsg(ctx, smsg.Cid()) + if err != nil { + return 0, cid.Undef, err + } + if r.Receipt.ExitCode != 0 { + return 0, cid.Undef, xerrors.Errorf("publishing deal failed: exit %d", r.Receipt.ExitCode) + } + var resp actors.PublishStorageDealResponse + if err := resp.UnmarshalCBOR(bytes.NewReader(r.Receipt.Return)); err != nil { + return 0, cid.Undef, err + } + if len(resp.DealIDs) != 1 { + return 0, cid.Undef, xerrors.Errorf("got unexpected number of DealIDs from") + } + + return storagemarket.DealID(resp.DealIDs[0]), smsg.Cid(), nil +} + +func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagemarket.MinerDeal, piecePath string) (uint64, error) { + root, err := n.dag.Get(ctx, deal.Ref) + if err != nil { + return 0, xerrors.Errorf("failed to get file root for deal: %s", err) + } + + // TODO: abstract this away into ReadSizeCloser + implement different modes + node, err := unixfile.NewUnixfsFile(ctx, n.dag, root) + if err != nil { + return 0, xerrors.Errorf("cannot open unixfs file: %s", err) + } + + uf, ok := node.(sectorblocks.UnixfsReader) + if !ok { + // we probably got directory, unsupported for now + return 0, xerrors.Errorf("unsupported unixfs file type") + } + + // TODO: uf.Size() is user input, not trusted + // This won't be useful / here after we migrate to putting CARs into sectors + size, err := uf.Size() + if err != nil { + return 0, xerrors.Errorf("getting unixfs file size: %w", err) + } + if padreader.PaddedSize(uint64(size)) != deal.Proposal.PieceSize { + return 0, xerrors.Errorf("deal.Proposal.PieceSize didn't match padded unixfs file size") + } + + sectorID, err := n.secb.AddUnixfsPiece(ctx, uf, deal.DealID) + if err != nil { + return 0, xerrors.Errorf("AddPiece failed: %s", err) + } + log.Warnf("New Sector: %d (deal %d)", sectorID, deal.DealID) + + return sectorID, nil +} + +func (n *ProviderNodeAdapter) ListProviderDeals(ctx context.Context, addr address.Address) ([]actors.OnChainDeal, error) { + allDeals, err := n.StateMarketDeals(ctx, nil) + if err != nil { + return nil, err + } + + var out []actors.OnChainDeal + + for _, deal := range allDeals { + if deal.Provider == addr { + out = append(out, deal) + } + } + + return out, nil +} + +func (n *ProviderNodeAdapter) GetMinerWorker(ctx context.Context, miner address.Address) (address.Address, error) { + return n.StateMinerWorker(ctx, miner, nil) +} + +func (n *ProviderNodeAdapter) SignBytes(ctx context.Context, signer address.Address, b []byte) (*types.Signature, error) { + return n.WalletSign(ctx, signer, b) +} + +func (n *ProviderNodeAdapter) EnsureFunds(ctx context.Context, addr address.Address, amt storagemarket.TokenAmount) error { + return n.MarketEnsureAvailable(ctx, addr, types.BigInt(amt)) +} + +func (n *ProviderNodeAdapter) MostRecentStateId(ctx context.Context) (storagemarket.StateKey, error) { + return n.ChainHead(ctx) +} + +// Adds funds with the StorageMinerActor for a storage participant. Used by both providers and clients. +func (n *ProviderNodeAdapter) AddFunds(ctx context.Context, addr address.Address, amount storagemarket.TokenAmount) error { + // (Provider Node API) + smsg, err := n.MpoolPushMessage(ctx, &types.Message{ + To: actors.StorageMarketAddress, + From: addr, + Value: types.BigInt(amount), + GasPrice: types.NewInt(0), + GasLimit: types.NewInt(1000000), + Method: actors.SMAMethods.AddBalance, + }) + if err != nil { + return err + } + + r, err := n.StateWaitMsg(ctx, smsg.Cid()) + if err != nil { + return err + } + + if r.Receipt.ExitCode != 0 { + return xerrors.Errorf("adding funds to storage miner market actor failed: exit %d", r.Receipt.ExitCode) + } + + return nil +} + +func (n *ProviderNodeAdapter) GetBalance(ctx context.Context, addr address.Address) (storagemarket.Balance, error) { + bal, err := n.StateMarketBalance(ctx, addr, nil) + if err != nil { + return storagemarket.Balance{}, err + } + + return bal, nil +} + +var _ storagemarket.StorageProviderNode = &ProviderNodeAdapter{}