diff --git a/chain/deals/cbor_gen.go b/chain/deals/cbor_gen.go index bd56e0551..5abb92825 100644 --- a/chain/deals/cbor_gen.go +++ b/chain/deals/cbor_gen.go @@ -219,10 +219,18 @@ func (t *Response) MarshalCBOR(w io.Writer) error { return xerrors.Errorf("failed to write cid field t.Proposal: %w", err) } - // t.StorageDealSubmission (types.SignedMessage) (struct) - if err := t.StorageDealSubmission.MarshalCBOR(w); err != nil { - return err + // t.PublishMessage (cid.Cid) (struct) + + if t.PublishMessage == nil { + if _, err := w.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCid(w, *t.PublishMessage); err != nil { + return xerrors.Errorf("failed to write cid field t.PublishMessage: %w", err) + } } + return nil } @@ -273,7 +281,7 @@ func (t *Response) UnmarshalCBOR(r io.Reader) error { t.Proposal = c } - // t.StorageDealSubmission (types.SignedMessage) (struct) + // t.PublishMessage (cid.Cid) (struct) { @@ -287,10 +295,13 @@ func (t *Response) UnmarshalCBOR(r io.Reader) error { return err } } else { - t.StorageDealSubmission = new(types.SignedMessage) - if err := t.StorageDealSubmission.UnmarshalCBOR(br); err != nil { - return err + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.PublishMessage: %w", err) } + + t.PublishMessage = &c } } @@ -526,56 +537,12 @@ func (t *ClientDeal) MarshalCBOR(w io.Writer) error { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{136}); err != nil { + if _, err := w.Write([]byte{129}); err != nil { return err } - // t.ProposalCid (cid.Cid) (struct) - - if err := cbg.WriteCid(w, t.ProposalCid); err != nil { - return xerrors.Errorf("failed to write cid field t.ProposalCid: %w", err) - } - - // t.Proposal (actors.StorageDealProposal) (struct) - if err := t.Proposal.MarshalCBOR(w); err != nil { - return err - } - - // t.State (uint64) (uint64) - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.State))); err != nil { - return err - } - - // t.Miner (peer.ID) (string) - if len(t.Miner) > cbg.MaxLength { - return xerrors.Errorf("Value in field t.Miner was too long") - } - - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Miner)))); err != nil { - return err - } - if _, err := w.Write([]byte(t.Miner)); err != nil { - return err - } - - // t.MinerWorker (address.Address) (struct) - if err := t.MinerWorker.MarshalCBOR(w); err != nil { - return err - } - - // t.DealID (uint64) (uint64) - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.DealID))); err != nil { - return err - } - - // t.PayloadCid (cid.Cid) (struct) - - if err := cbg.WriteCid(w, t.PayloadCid); err != nil { - return xerrors.Errorf("failed to write cid field t.PayloadCid: %w", err) - } - - // t.PublishMessage (types.SignedMessage) (struct) - if err := t.PublishMessage.MarshalCBOR(w); err != nil { + // t.ClientDeal (storagemarket.ClientDeal) (struct) + if err := t.ClientDeal.MarshalCBOR(w); err != nil { return err } return nil @@ -592,102 +559,18 @@ func (t *ClientDeal) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("cbor input should be of type array") } - if extra != 8 { + if extra != 1 { return fmt.Errorf("cbor input had wrong number of fields") } - // t.ProposalCid (cid.Cid) (struct) + // t.ClientDeal (storagemarket.ClientDeal) (struct) { - c, err := cbg.ReadCid(br) - if err != nil { - return xerrors.Errorf("failed to read cid field t.ProposalCid: %w", err) - } - - t.ProposalCid = c - - } - // t.Proposal (actors.StorageDealProposal) (struct) - - { - - if err := t.Proposal.UnmarshalCBOR(br); err != nil { + if err := t.ClientDeal.UnmarshalCBOR(br); err != nil { return err } - } - // t.State (uint64) (uint64) - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.State = uint64(extra) - // t.Miner (peer.ID) (string) - - { - sval, err := cbg.ReadString(br) - if err != nil { - return err - } - - t.Miner = peer.ID(sval) - } - // t.MinerWorker (address.Address) (struct) - - { - - if err := t.MinerWorker.UnmarshalCBOR(br); err != nil { - return err - } - - } - // t.DealID (uint64) (uint64) - - maj, extra, err = cbg.CborReadHeader(br) - if err != nil { - return err - } - if maj != cbg.MajUnsignedInt { - return fmt.Errorf("wrong type for uint64 field") - } - t.DealID = uint64(extra) - // t.PayloadCid (cid.Cid) (struct) - - { - - c, err := cbg.ReadCid(br) - if err != nil { - return xerrors.Errorf("failed to read cid field t.PayloadCid: %w", err) - } - - t.PayloadCid = c - - } - // t.PublishMessage (types.SignedMessage) (struct) - - { - - pb, err := br.PeekByte() - if err != nil { - return err - } - if pb == cbg.CborNull[0] { - var nbuf [1]byte - if _, err := br.Read(nbuf[:]); err != nil { - return err - } - } else { - t.PublishMessage = new(types.SignedMessage) - if err := t.PublishMessage.UnmarshalCBOR(br); err != nil { - return err - } - } - } return nil } diff --git a/chain/deals/client.go b/chain/deals/client.go index 4e1d3df4a..879e408e2 100644 --- a/chain/deals/client.go +++ b/chain/deals/client.go @@ -15,39 +15,24 @@ import ( "github.com/filecoin-project/go-statestore" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors" - "github.com/filecoin-project/lotus/chain/events" - "github.com/filecoin-project/lotus/chain/market" - "github.com/filecoin-project/lotus/chain/stmgr" - "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/chain/wallet" - "github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/modules/dtypes" retrievalmarket "github.com/filecoin-project/lotus/retrieval" "github.com/filecoin-project/lotus/retrieval/discovery" + "github.com/filecoin-project/lotus/storagemarket" ) var log = logging.Logger("deals") type ClientDeal struct { - ProposalCid cid.Cid - Proposal actors.StorageDealProposal - State api.DealState - Miner peer.ID - MinerWorker address.Address - DealID uint64 - PayloadCid cid.Cid - - PublishMessage *types.SignedMessage + storagemarket.ClientDeal s inet.Stream } type Client struct { - sm *stmgr.StateManager - chain *store.ChainStore - h host.Host - w *wallet.Wallet + h host.Host + // dataTransfer // TODO: once the data transfer module is complete, the // client will listen to events on the data transfer module @@ -56,8 +41,8 @@ type Client struct { dataTransfer dtypes.ClientDataTransfer dag dtypes.ClientDAG discovery *discovery.Local - events *events.Events - fm *market.FundMgr + + node storagemarket.StorageClientNode deals *statestore.StateStore conns map[cid.Cid]inet.Stream @@ -76,22 +61,13 @@ type clientDealUpdate struct { mut func(*ClientDeal) } -type clientApi struct { - full.ChainAPI - full.StateAPI -} - -func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w *wallet.Wallet, dag dtypes.ClientDAG, dataTransfer dtypes.ClientDataTransfer, discovery *discovery.Local, fm *market.FundMgr, deals dtypes.ClientDealStore, chainapi full.ChainAPI, stateapi full.StateAPI) *Client { +func NewClient(h host.Host, dag dtypes.ClientDAG, dataTransfer dtypes.ClientDataTransfer, discovery *discovery.Local, deals dtypes.ClientDealStore, scn storagemarket.StorageClientNode) *Client { c := &Client{ - sm: sm, - chain: chain, h: h, - w: w, dataTransfer: dataTransfer, dag: dag, discovery: discovery, - fm: fm, - events: events.NewEvents(context.TODO(), &clientApi{chainapi, stateapi}), + node: scn, deals: deals, conns: map[cid.Cid]inet.Stream{}, @@ -196,7 +172,8 @@ type ClientDealProposal struct { } func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, error) { - if err := c.fm.EnsureAvailable(ctx, p.Client, types.BigMul(p.PricePerEpoch, types.NewInt(p.Duration))); err != nil { + amount := types.BigMul(p.PricePerEpoch, types.NewInt(p.Duration)) + if err := c.node.EnsureFunds(ctx, p.Client, storagemarket.TokenAmount(amount)); err != nil { return cid.Undef, xerrors.Errorf("adding market funds failed: %w", err) } @@ -216,7 +193,7 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro StorageCollateral: types.NewInt(uint64(pieceSize)), // TODO: real calc } - if err := api.SignWith(ctx, c.w.Sign, p.Client, dealProposal); err != nil { + if err := c.node.SignProposal(ctx, p.Client, dealProposal); err != nil { return cid.Undef, xerrors.Errorf("signing deal proposal failed: %w", err) } @@ -225,7 +202,7 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro return cid.Undef, xerrors.Errorf("getting proposal node failed: %w", err) } - s, err := c.h.NewStream(ctx, p.MinerID, DealProtocolID) + s, err := c.h.NewStream(ctx, p.MinerID, storagemarket.DealProtocolID) if err != nil { return cid.Undef, xerrors.Errorf("connecting to storage provider failed: %w", err) } @@ -241,13 +218,16 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro } deal := &ClientDeal{ - ProposalCid: proposalNd.Cid(), - Proposal: *dealProposal, - State: api.DealUnknown, - Miner: p.MinerID, - MinerWorker: p.MinerWorker, - PayloadCid: p.Data, - s: s, + ClientDeal: storagemarket.ClientDeal{ + ProposalCid: proposalNd.Cid(), + Proposal: *dealProposal, + State: api.DealUnknown, + Miner: p.MinerID, + MinerWorker: p.MinerWorker, + PayloadCid: p.Data, + }, + + s: s, } c.incoming <- deal @@ -259,7 +239,7 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro } func (c *Client) QueryAsk(ctx context.Context, p peer.ID, a address.Address) (*types.SignedStorageAsk, error) { - s, err := c.h.NewStream(ctx, p, AskProtocolID) + s, err := c.h.NewStream(ctx, p, storagemarket.AskProtocolID) if err != nil { return nil, xerrors.Errorf("failed to open stream to miner: %w", err) } diff --git a/chain/deals/client_states.go b/chain/deals/client_states.go index d19765f60..e82beb126 100644 --- a/chain/deals/client_states.go +++ b/chain/deals/client_states.go @@ -1,16 +1,11 @@ package deals import ( - "bytes" "context" "golang.org/x/xerrors" - "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/actors" - "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/types" ) @@ -57,70 +52,20 @@ func (c *Client) new(ctx context.Context, deal ClientDeal) (func(*ClientDeal), e } return func(info *ClientDeal) { - info.PublishMessage = resp.StorageDealSubmission + info.PublishMessage = resp.PublishMessage }, nil } func (c *Client) accepted(ctx context.Context, deal ClientDeal) (func(*ClientDeal), error) { log.Infow("DEAL ACCEPTED!") - pubmsg := deal.PublishMessage.Message - pw, err := stmgr.GetMinerWorker(ctx, c.sm, nil, deal.Proposal.Provider) + dealId, err := c.node.ValidatePublishedDeal(ctx, deal.ClientDeal) if err != nil { - return nil, xerrors.Errorf("getting miner worker failed: %w", err) - } - - if pubmsg.From != pw { - return nil, xerrors.Errorf("deal wasn't published by storage provider: from=%s, provider=%s", pubmsg.From, deal.Proposal.Provider) - } - - if pubmsg.To != actors.StorageMarketAddress { - return nil, xerrors.Errorf("deal publish message wasn't set to StorageMarket actor (to=%s)", pubmsg.To) - } - - if pubmsg.Method != actors.SMAMethods.PublishStorageDeals { - return nil, xerrors.Errorf("deal publish message called incorrect method (method=%s)", pubmsg.Method) - } - - var params actors.PublishStorageDealsParams - if err := params.UnmarshalCBOR(bytes.NewReader(pubmsg.Params)); err != nil { - return nil, err - } - - dealIdx := -1 - for i, storageDeal := range params.Deals { - // TODO: make it less hacky - sd := storageDeal - eq, err := cborutil.Equals(&deal.Proposal, &sd) - if err != nil { - return nil, err - } - if eq { - dealIdx = i - break - } - } - - if dealIdx == -1 { - return nil, xerrors.Errorf("deal publish didn't contain our deal (message cid: %s)", deal.PublishMessage.Cid()) - } - - // TODO: timeout - _, ret, err := c.sm.WaitForMessage(ctx, deal.PublishMessage.Cid()) - if err != nil { - return nil, xerrors.Errorf("waiting for deal publish message: %w", err) - } - if ret.ExitCode != 0 { - return nil, xerrors.Errorf("deal publish failed: exit=%d", ret.ExitCode) - } - - var res actors.PublishStorageDealResponse - if err := res.UnmarshalCBOR(bytes.NewReader(ret.Return)); err != nil { return nil, err } return func(info *ClientDeal) { - info.DealID = res.DealIDs[dealIdx] + info.DealID = dealId }, nil } @@ -131,103 +76,22 @@ func (c *Client) staged(ctx context.Context, deal ClientDeal) (func(*ClientDeal) } func (c *Client) sealing(ctx context.Context, deal ClientDeal) (func(*ClientDeal), error) { - checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { - sd, err := stmgr.GetStorageDeal(ctx, c.sm, deal.DealID, ts) - if err != nil { - // TODO: This may be fine for some errors - return false, false, xerrors.Errorf("failed to look up deal on chain: %w", err) - } - - if sd.ActivationEpoch > 0 { - select { - case c.updated <- clientDealUpdate{ - newState: api.DealComplete, - id: deal.ProposalCid, - }: - case <-c.stop: - } - - return true, false, nil - } - - return false, true, nil - } - - called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (more bool, err error) { - defer func() { - if err != nil { - select { - case c.updated <- clientDealUpdate{ - newState: api.DealComplete, - id: deal.ProposalCid, - err: xerrors.Errorf("handling applied event: %w", err), - }: - case <-c.stop: - } - } - }() - - if msg == nil { - log.Error("timed out waiting for deal activation... what now?") - return false, nil - } - - sd, err := stmgr.GetStorageDeal(ctx, c.sm, deal.DealID, ts) - if err != nil { - return false, xerrors.Errorf("failed to look up deal on chain: %w", err) - } - - if sd.ActivationEpoch == 0 { - return false, xerrors.Errorf("deal wasn't active: deal=%d, parentState=%s, h=%d", deal.DealID, ts.ParentState(), ts.Height()) - } - - log.Infof("Storage deal %d activated at epoch %d", deal.DealID, sd.ActivationEpoch) - + cb := func(err error) { select { case c.updated <- clientDealUpdate{ newState: api.DealComplete, id: deal.ProposalCid, + err: err, }: case <-c.stop: } - - return false, nil } - revert := func(ctx context.Context, ts *types.TipSet) error { - log.Warn("deal activation reverted; TODO: actually handle this!") - // TODO: Just go back to DealSealing? - return nil - } + err := c.node.OnDealSectorCommitted(ctx, deal.Proposal.Provider, deal.DealID, cb) - matchEvent := func(msg *types.Message) (bool, error) { - if msg.To != deal.Proposal.Provider { - return false, nil - } - - if msg.Method != actors.MAMethods.ProveCommitSector { - return false, nil - } - - var params actors.SectorProveCommitInfo - if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil { - return false, err - } - - var found bool - for _, dealID := range params.DealIDs { - if dealID == deal.DealID { - found = true - break - } - } - - return found, nil - } - - if err := c.events.Called(checkFunc, called, revert, 3, build.SealRandomnessLookbackLimit, matchEvent); err != nil { - return nil, xerrors.Errorf("failed to set up called handler") - } - - return nil, nil + return nil, err +} + +func (c *Client) checkAskSignature(ask *types.SignedStorageAsk) error { + return c.node.ValidateAskSignature(ask) } diff --git a/chain/deals/client_storagemarket.go b/chain/deals/client_storagemarket.go new file mode 100644 index 000000000..08dacc7a3 --- /dev/null +++ b/chain/deals/client_storagemarket.go @@ -0,0 +1,118 @@ +package deals + +// this file implements storagemarket.StorageClient + +import ( + "context" + + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/storagemarket" +) + +func (c *Client) ListProviders(ctx context.Context) (<-chan storagemarket.StorageProviderInfo, error) { + providers, err := c.node.ListStorageProviders(ctx) + if err != nil { + return nil, err + } + + out := make(chan storagemarket.StorageProviderInfo) + + go func() { + for _, p := range providers { + select { + case out <- *p: + case <-ctx.Done(): + return + } + + } + }() + + return out, nil +} + +func (c *Client) ListDeals(ctx context.Context, addr address.Address) ([]actors.OnChainDeal, error) { + return c.node.ListClientDeals(ctx, addr) +} + +func (c *Client) ListInProgressDeals(ctx context.Context) ([]storagemarket.ClientDeal, error) { + deals, err := c.List() + if err != nil { + return nil, err + } + + out := make([]storagemarket.ClientDeal, len(deals)) + for k, v := range deals { + out[k] = storagemarket.ClientDeal{ + ProposalCid: v.ProposalCid, + Proposal: v.Proposal, + State: v.State, + Miner: v.Miner, + MinerWorker: v.MinerWorker, + DealID: v.DealID, + PublishMessage: v.PublishMessage, + } + } + + return out, nil +} + +func (c *Client) GetInProgressDeal(ctx context.Context, cid cid.Cid) (storagemarket.ClientDeal, error) { + deals, err := c.ListInProgressDeals(ctx) + if err != nil { + return storagemarket.ClientDeal{}, err + } + + for _, deal := range deals { + if deal.ProposalCid == cid { + return deal, nil + } + } + + return storagemarket.ClientDeal{}, xerrors.Errorf("couldn't find client deal") +} + +func (c *Client) GetAsk(ctx context.Context, info storagemarket.StorageProviderInfo) (*storagemarket.StorageAsk, error) { + return c.QueryAsk(ctx, info.PeerID, info.Address) +} + +func (c *Client) ProposeStorageDeal(ctx context.Context, addr address.Address, info *storagemarket.StorageProviderInfo, payloadCid cid.Cid, proposalExpiration storagemarket.Epoch, duration storagemarket.Epoch, price storagemarket.TokenAmount, collateral storagemarket.TokenAmount) (*storagemarket.ProposeStorageDealResult, error) { + + proposal := ClientDealProposal{ + Data: payloadCid, + PricePerEpoch: types.BigInt(price), + ProposalExpiration: uint64(proposalExpiration), + Duration: uint64(duration), + Client: addr, + ProviderAddress: info.Address, + MinerWorker: info.Worker, + MinerID: info.PeerID, + } + + proposalCid, err := c.Start(ctx, proposal) + + result := &storagemarket.ProposeStorageDealResult{ + ProposalCid: proposalCid, + } + + return result, err +} + +func (c *Client) GetPaymentEscrow(ctx context.Context, addr address.Address) (storagemarket.Balance, error) { + + balance, err := c.node.GetBalance(ctx, addr) + + return balance, err +} + +func (c *Client) AddPaymentEscrow(ctx context.Context, addr address.Address, amount storagemarket.TokenAmount) error { + + return c.node.AddFunds(ctx, addr, amount) +} + +var _ storagemarket.StorageClient = &Client{} diff --git a/chain/deals/provider.go b/chain/deals/provider.go index 0b1fa3900..1a020de71 100644 --- a/chain/deals/provider.go +++ b/chain/deals/provider.go @@ -5,9 +5,10 @@ import ( "errors" "sync" - cid "github.com/ipfs/go-cid" - datastore "github.com/ipfs/go-datastore" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" + "github.com/libp2p/go-libp2p-core/host" inet "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" @@ -20,8 +21,7 @@ import ( "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/node/modules/dtypes" - "github.com/filecoin-project/lotus/storage" - "github.com/filecoin-project/lotus/storage/sectorblocks" + "github.com/filecoin-project/lotus/storagemarket" ) var ProviderDsPrefix = "/deals/provider" @@ -47,9 +47,7 @@ type Provider struct { ask *types.SignedStorageAsk askLk sync.Mutex - secb *sectorblocks.SectorBlocks - sminer *storage.Miner - full api.FullNode + spn storagemarket.StorageProviderNode // TODO: This will go away once storage market module + CAR // is implemented @@ -83,7 +81,7 @@ var ( ErrDataTransferFailed = errors.New("deal data transfer failed") ) -func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, secb *sectorblocks.SectorBlocks, dag dtypes.StagingDAG, dataTransfer dtypes.ProviderDataTransfer, fullNode api.FullNode) (*Provider, error) { +func NewProvider(ds dtypes.MetadataDS, dag dtypes.StagingDAG, dataTransfer dtypes.ProviderDataTransfer, spn storagemarket.StorageProviderNode) (storagemarket.StorageProvider, error) { addr, err := ds.Get(datastore.NewKey("miner-address")) if err != nil { return nil, err @@ -94,11 +92,9 @@ func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, secb *sectorblocks } h := &Provider{ - sminer: sminer, dag: dag, dataTransfer: dataTransfer, - full: fullNode, - secb: secb, + spn: spn, pricePerByteBlock: types.NewInt(3), // TODO: allow setting minPieceSize: 256, // TODO: allow setting (BUT KEEP MIN 256! (because of how we fill sectors up)) @@ -135,9 +131,12 @@ func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, secb *sectorblocks return h, nil } -func (p *Provider) Run(ctx context.Context) { +func (p *Provider) Run(ctx context.Context, host host.Host) { // TODO: restore state + host.SetStreamHandler(storagemarket.DealProtocolID, p.HandleStream) + host.SetStreamHandler(storagemarket.AskProtocolID, p.HandleAskStream) + go func() { defer log.Warn("quitting deal provider loop") defer close(p.stopped) @@ -162,7 +161,7 @@ func (p *Provider) onIncoming(deal MinerDeal) { if err := p.deals.Begin(deal.ProposalCid, &deal); err != nil { // This can happen when client re-sends proposal - p.failDeal(deal.ProposalCid, err) + p.failDeal(context.TODO(), deal.ProposalCid, err) log.Errorf("deal tracking failed: %s", err) return } @@ -180,7 +179,7 @@ func (p *Provider) onUpdated(ctx context.Context, update minerDealUpdate) { log.Infof("Deal %s updated state to %s", update.id, api.DealStates[update.newState]) if update.err != nil { log.Errorf("deal %s (newSt: %d) failed: %+v", update.id, update.newState, update.err) - p.failDeal(update.id, update.err) + p.failDeal(ctx, update.id, update.err) return } var deal MinerDeal @@ -193,7 +192,7 @@ func (p *Provider) onUpdated(ctx context.Context, update minerDealUpdate) { return nil }) if err != nil { - p.failDeal(update.id, err) + p.failDeal(ctx, update.id, err) return } diff --git a/chain/deals/provider_asks.go b/chain/deals/provider_asks.go index 1170eb28b..a0cdd9c71 100644 --- a/chain/deals/provider_asks.go +++ b/chain/deals/provider_asks.go @@ -5,13 +5,13 @@ import ( "context" "time" - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-cbor-util" - "github.com/filecoin-project/lotus/chain/stmgr" - "github.com/filecoin-project/lotus/chain/types" - datastore "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore" inet "github.com/libp2p/go-libp2p-core/network" "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/lotus/chain/types" ) func (p *Provider) SetPrice(price types.BigInt, ttlsecs int64) error { @@ -41,7 +41,7 @@ func (p *Provider) SetPrice(price types.BigInt, ttlsecs int64) error { return p.saveAsk(ssa) } -func (p *Provider) getAsk(m address.Address) *types.SignedStorageAsk { +func (p *Provider) GetAsk(m address.Address) *types.SignedStorageAsk { p.askLk.Lock() defer p.askLk.Unlock() if m != p.actor { @@ -69,7 +69,7 @@ func (p *Provider) HandleAskStream(s inet.Stream) { func (p *Provider) processAskRequest(ar *AskRequest) *AskResponse { return &AskResponse{ - Ask: p.getAsk(ar.Miner), + Ask: p.GetAsk(ar.Miner), } } @@ -112,12 +112,12 @@ func (p *Provider) signAsk(a *types.StorageAsk) (*types.SignedStorageAsk, error) return nil, err } - worker, err := p.getWorker(p.actor) + worker, err := p.spn.GetMinerWorker(context.TODO(), p.actor) if err != nil { return nil, xerrors.Errorf("failed to get worker to sign ask: %w", err) } - sig, err := p.full.WalletSign(context.TODO(), worker, b) + sig, err := p.spn.SignBytes(context.TODO(), worker, b) if err != nil { return nil, err } @@ -141,20 +141,3 @@ func (p *Provider) saveAsk(a *types.SignedStorageAsk) error { p.ask = a return nil } - -func (c *Client) checkAskSignature(ask *types.SignedStorageAsk) error { - tss := c.sm.ChainStore().GetHeaviestTipSet().ParentState() - - w, err := stmgr.GetMinerWorkerRaw(context.TODO(), c.sm, tss, ask.Ask.Miner) - if err != nil { - return xerrors.Errorf("failed to get worker for miner in ask", err) - } - - sigb, err := cborutil.Dump(ask.Ask) - if err != nil { - return xerrors.Errorf("failed to re-serialize ask") - } - - return ask.Signature.Verify(w, sigb) - -} diff --git a/chain/deals/provider_states.go b/chain/deals/provider_states.go index 4faa44973..76fa9a99b 100644 --- a/chain/deals/provider_states.go +++ b/chain/deals/provider_states.go @@ -1,21 +1,17 @@ package deals import ( - "bytes" "context" ipldfree "github.com/ipld/go-ipld-prime/impl/free" "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/ipld/go-ipld-prime/traversal/selector/builder" - unixfile "github.com/ipfs/go-unixfs/file" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/lib/padreader" - "github.com/filecoin-project/lotus/storage/sectorblocks" + "github.com/filecoin-project/lotus/storagemarket" ) type providerHandlerFunc func(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) @@ -43,7 +39,7 @@ func (p *Provider) handle(ctx context.Context, deal MinerDeal, cb providerHandle // ACCEPTED func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) { - head, err := p.full.ChainHead(ctx) + head, err := p.spn.MostRecentStateId(ctx) if err != nil { return nil, err } @@ -63,7 +59,7 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal) } // check market funds - clientMarketBalance, err := p.full.StateMarketBalance(ctx, deal.Proposal.Client, nil) + clientMarketBalance, err := p.spn.GetBalance(ctx, deal.Proposal.Client) if err != nil { return nil, xerrors.Errorf("getting client market balance failed: %w", err) } @@ -74,59 +70,36 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal) return nil, xerrors.New("clientMarketBalance.Available too small") } - waddr, err := p.full.StateMinerWorker(ctx, deal.Proposal.Provider, nil) + waddr, err := p.spn.GetMinerWorker(ctx, deal.Proposal.Provider) if err != nil { return nil, err } // TODO: check StorageCollateral (may be too large (or too small)) - if err := p.full.MarketEnsureAvailable(ctx, waddr, deal.Proposal.StorageCollateral); err != nil { + if err := p.spn.EnsureFunds(ctx, waddr, storagemarket.TokenAmount(deal.Proposal.StorageCollateral)); err != nil { return nil, err } - log.Info("publishing deal") - - params, err := actors.SerializeParams(&actors.PublishStorageDealsParams{ - Deals: []actors.StorageDealProposal{deal.Proposal}, - }) - if err != nil { - return nil, xerrors.Errorf("serializing PublishStorageDeals params failed: ", err) + smDeal := storagemarket.MinerDeal{ + Client: deal.Client, + Proposal: deal.Proposal, + ProposalCid: deal.ProposalCid, + State: deal.State, + Ref: deal.Ref, + SectorID: deal.SectorID, } - // TODO: We may want this to happen after fetching data - smsg, err := p.full.MpoolPushMessage(ctx, &types.Message{ - To: actors.StorageMarketAddress, - From: waddr, - Value: types.NewInt(0), - GasPrice: types.NewInt(0), - GasLimit: types.NewInt(1000000), - Method: actors.SMAMethods.PublishStorageDeals, - Params: params, - }) + dealId, mcid, err := p.spn.PublishDeals(ctx, smDeal) if err != nil { return nil, err } - r, err := p.full.StateWaitMsg(ctx, smsg.Cid()) - if err != nil { - return nil, err - } - if r.Receipt.ExitCode != 0 { - return nil, xerrors.Errorf("publishing deal failed: exit %d", r.Receipt.ExitCode) - } - var resp actors.PublishStorageDealResponse - if err := resp.UnmarshalCBOR(bytes.NewReader(r.Receipt.Return)); err != nil { - return nil, err - } - if len(resp.DealIDs) != 1 { - return nil, xerrors.Errorf("got unexpected number of DealIDs from SMA") - } - log.Infof("fetching data for a deal %d", resp.DealIDs[0]) - err = p.sendSignedResponse(&Response{ + log.Infof("fetching data for a deal %d", dealId) + err = p.sendSignedResponse(ctx, &Response{ State: api.DealAccepted, - Proposal: deal.ProposalCid, - StorageDealSubmission: smsg, + Proposal: deal.ProposalCid, + PublishMessage: &mcid, }) if err != nil { return nil, err @@ -148,7 +121,7 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal) // (see onDataTransferEvent) _, err = p.dataTransfer.OpenPullDataChannel(ctx, deal.Client, - &StorageDataTransferVoucher{Proposal: deal.ProposalCid, DealID: resp.DealIDs[0]}, + &StorageDataTransferVoucher{Proposal: deal.ProposalCid, DealID: uint64(dealId)}, deal.Ref, allSelector, ) @@ -162,39 +135,23 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal) // STAGED func (p *Provider) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) { - root, err := p.dag.Get(ctx, deal.Ref) - if err != nil { - return nil, xerrors.Errorf("failed to get file root for deal: %s", err) - } + sectorID, err := p.spn.OnDealComplete( + ctx, + storagemarket.MinerDeal{ + Client: deal.Client, + Proposal: deal.Proposal, + ProposalCid: deal.ProposalCid, + State: deal.State, + Ref: deal.Ref, + DealID: deal.DealID, + }, + "", + ) - // TODO: abstract this away into ReadSizeCloser + implement different modes - n, err := unixfile.NewUnixfsFile(ctx, p.dag, root) if err != nil { - return nil, xerrors.Errorf("cannot open unixfs file: %s", err) + return nil, err } - uf, ok := n.(sectorblocks.UnixfsReader) - if !ok { - // we probably got directory, unsupported for now - return nil, xerrors.Errorf("unsupported unixfs file type") - } - - // TODO: uf.Size() is user input, not trusted - // This won't be useful / here after we migrate to putting CARs into sectors - size, err := uf.Size() - if err != nil { - return nil, xerrors.Errorf("getting unixfs file size: %w", err) - } - if padreader.PaddedSize(uint64(size)) != deal.Proposal.PieceSize { - return nil, xerrors.Errorf("deal.Proposal.PieceSize didn't match padded unixfs file size") - } - - sectorID, err := p.secb.AddUnixfsPiece(ctx, uf, deal.DealID) - if err != nil { - return nil, xerrors.Errorf("AddPiece failed: %s", err) - } - log.Warnf("New Sector: %d (deal %d)", sectorID, deal.DealID) - return func(deal *MinerDeal) { deal.SectorID = sectorID }, nil diff --git a/chain/deals/provider_storagemarket.go b/chain/deals/provider_storagemarket.go new file mode 100644 index 000000000..4d9863bd9 --- /dev/null +++ b/chain/deals/provider_storagemarket.go @@ -0,0 +1,65 @@ +package deals + +// this file implements storagemarket.StorageClient + +import ( + "context" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/storagemarket" +) + +func (p *Provider) AddAsk(price storagemarket.TokenAmount, ttlsecs int64) error { + return p.SetPrice(types.BigInt(price), ttlsecs) +} + +func (p *Provider) ListAsks(addr address.Address) []*types.SignedStorageAsk { + ask := p.GetAsk(addr) + + if ask != nil { + return []*types.SignedStorageAsk{ask} + } + + return nil +} + +func (p *Provider) ListDeals(ctx context.Context) ([]actors.OnChainDeal, error) { + return p.spn.ListProviderDeals(ctx, p.actor) +} + +func (p *Provider) AddStorageCollateral(ctx context.Context, amount storagemarket.TokenAmount) error { + return p.spn.AddFunds(ctx, p.actor, amount) +} + +func (p *Provider) GetStorageCollateral(ctx context.Context) (storagemarket.Balance, error) { + balance, err := p.spn.GetBalance(ctx, p.actor) + + return balance, err +} + +func (p *Provider) ListIncompleteDeals() ([]storagemarket.MinerDeal, error) { + var out []storagemarket.MinerDeal + + var deals []MinerDeal + if err := p.deals.List(&deals); err != nil { + return nil, err + } + + for _, deal := range deals { + out = append(out, storagemarket.MinerDeal{ + Client: deal.Client, + Proposal: deal.Proposal, + ProposalCid: deal.ProposalCid, + State: deal.State, + Ref: deal.Ref, + DealID: deal.DealID, + SectorID: deal.SectorID, + }) + } + + return out, nil +} + +var _ storagemarket.StorageProvider = &Provider{} diff --git a/chain/deals/provider_utils.go b/chain/deals/provider_utils.go index e52268dde..54b838a43 100644 --- a/chain/deals/provider_utils.go +++ b/chain/deals/provider_utils.go @@ -4,24 +4,21 @@ import ( "context" "runtime" - datatransfer "github.com/filecoin-project/go-data-transfer" - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/node/modules/dtypes" - "github.com/ipld/go-ipld-prime" - "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-statestore" - "github.com/filecoin-project/lotus/chain/actors" - "github.com/filecoin-project/lotus/chain/types" - "github.com/ipfs/go-cid" + "github.com/ipld/go-ipld-prime" inet "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/node/modules/dtypes" ) -func (p *Provider) failDeal(id cid.Cid, cerr error) { +func (p *Provider) failDeal(ctx context.Context, id cid.Cid, cerr error) { if err := p.deals.End(id); err != nil { log.Warnf("deals.End: %s", err) } @@ -33,7 +30,7 @@ func (p *Provider) failDeal(id cid.Cid, cerr error) { log.Warnf("deal %s failed: %s", id, cerr) - err := p.sendSignedResponse(&Response{ + err := p.sendSignedResponse(ctx, &Response{ State: api.DealFailed, Message: cerr.Error(), Proposal: id, @@ -72,7 +69,7 @@ func (p *Provider) readProposal(s inet.Stream) (proposal Proposal, err error) { return } -func (p *Provider) sendSignedResponse(resp *Response) error { +func (p *Provider) sendSignedResponse(ctx context.Context, resp *Response) error { s, ok := p.conns[resp.Proposal] if !ok { return xerrors.New("couldn't send response: not connected") @@ -83,12 +80,12 @@ func (p *Provider) sendSignedResponse(resp *Response) error { return xerrors.Errorf("serializing response: %w", err) } - worker, err := p.getWorker(p.actor) + worker, err := p.spn.GetMinerWorker(ctx, p.actor) if err != nil { return err } - sig, err := p.full.WalletSign(context.TODO(), worker, msg) + sig, err := p.spn.SignBytes(ctx, worker, msg) if err != nil { return xerrors.Errorf("failed to sign response message: %w", err) } @@ -118,24 +115,6 @@ func (p *Provider) disconnect(deal MinerDeal) error { return err } -func (p *Provider) getWorker(miner address.Address) (address.Address, error) { - getworker := &types.Message{ - To: miner, - From: miner, - Method: actors.MAMethods.GetWorkerAddr, - } - r, err := p.full.StateCall(context.TODO(), getworker, nil) - if err != nil { - return address.Undef, xerrors.Errorf("getting worker address: %w", err) - } - - if r.ExitCode != 0 { - return address.Undef, xerrors.Errorf("getWorker call failed: %d", r.ExitCode) - } - - return address.NewFromBytes(r.Return) -} - var _ datatransfer.RequestValidator = &ProviderRequestValidator{} // ProviderRequestValidator validates data transfer requests for the provider diff --git a/chain/deals/request_validation_test.go b/chain/deals/request_validation_test.go index f6e6a0e33..7607eacb5 100644 --- a/chain/deals/request_validation_test.go +++ b/chain/deals/request_validation_test.go @@ -20,6 +20,7 @@ import ( "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/deals" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/storagemarket" ) var blockGenerator = blocksutil.NewBlockGenerator() @@ -74,12 +75,14 @@ func newClientDeal(minerID peer.ID, state api.DealState) (deals.ClientDeal, erro } return deals.ClientDeal{ - Proposal: newProposal, - ProposalCid: proposalNd.Cid(), - PayloadCid: blockGenerator.Next().Cid(), - Miner: minerID, - MinerWorker: minerAddr, - State: state, + ClientDeal: storagemarket.ClientDeal{ + Proposal: newProposal, + ProposalCid: proposalNd.Cid(), + PayloadCid: blockGenerator.Next().Cid(), + Miner: minerID, + MinerWorker: minerAddr, + State: state, + }, }, nil } diff --git a/chain/deals/types.go b/chain/deals/types.go index 4ad60a4bc..a35f537ab 100644 --- a/chain/deals/types.go +++ b/chain/deals/types.go @@ -4,12 +4,13 @@ import ( "bytes" "errors" + "github.com/ipfs/go-cid" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/types" - "github.com/ipfs/go-cid" ) var ( @@ -41,9 +42,6 @@ var ( DataTransferStates = []api.DealState{api.DealAccepted, api.DealUnknown} ) -const DealProtocolID = "/fil/storage/mk/1.0.1" -const AskProtocolID = "/fil/storage/ask/1.0.1" - type Proposal struct { DealProposal *actors.StorageDealProposal @@ -58,7 +56,7 @@ type Response struct { Proposal cid.Cid // DealAccepted - StorageDealSubmission *types.SignedMessage + PublishMessage *cid.Cid } // TODO: Do we actually need this to be signed? diff --git a/chain/types/cbor_gen.go b/chain/types/cbor_gen.go index 407f89809..73352df61 100644 --- a/chain/types/cbor_gen.go +++ b/chain/types/cbor_gen.go @@ -5,7 +5,7 @@ import ( "io" "math" - cid "github.com/ipfs/go-cid" + "github.com/ipfs/go-cid" cbg "github.com/whyrusleeping/cbor-gen" xerrors "golang.org/x/xerrors" ) diff --git a/gen/main.go b/gen/main.go index 4f90dc236..0cb569148 100644 --- a/gen/main.go +++ b/gen/main.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "github.com/filecoin-project/lotus/storagemarket" "os" gen "github.com/whyrusleeping/cbor-gen" @@ -122,6 +123,15 @@ func main() { os.Exit(1) } + err = gen.WriteTupleEncodersToFile("./storagemarket/cbor_gen.go", "storagemarket", + storagemarket.ClientDeal{}, + storagemarket.MinerDeal{}, + ) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + err = gen.WriteTupleEncodersToFile("./chain/deals/cbor_gen.go", "deals", deals.AskRequest{}, deals.AskResponse{}, diff --git a/node/builder.go b/node/builder.go index 8169d3829..dcce895b7 100644 --- a/node/builder.go +++ b/node/builder.go @@ -47,6 +47,8 @@ import ( "github.com/filecoin-project/lotus/retrieval/discovery" "github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage/sectorblocks" + "github.com/filecoin-project/lotus/storagemarket" + "github.com/filecoin-project/lotus/storagemarketadapter" ) // special is a type used to give keys to modules which @@ -227,7 +229,8 @@ func Online() Option { Override(new(dtypes.ClientDealStore), modules.NewClientDealStore), Override(new(dtypes.ClientDataTransfer), modules.NewClientDAGServiceDataTransfer), Override(new(*deals.ClientRequestValidator), deals.NewClientRequestValidator), - Override(new(*deals.Client), deals.NewClient), + Override(new(storagemarket.StorageClient), deals.NewClient), + Override(new(storagemarket.StorageClientNode), storagemarketadapter.NewClientNodeAdapter), Override(RegisterClientValidatorKey, modules.RegisterClientValidator), Override(RunDealClientKey, modules.RunDealClient), @@ -250,7 +253,8 @@ func Online() Option { Override(new(dtypes.ProviderDealStore), modules.NewProviderDealStore), Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer), Override(new(*deals.ProviderRequestValidator), deals.NewProviderRequestValidator), - Override(new(*deals.Provider), deals.NewProvider), + Override(new(storagemarket.StorageProvider), deals.NewProvider), + Override(new(storagemarket.StorageProviderNode), storagemarketadapter.NewProviderNodeAdapter), Override(RegisterProviderValidatorKey, modules.RegisterProviderValidator), Override(HandleRetrievalKey, modules.HandleRetrieval), Override(GetParamsKey, modules.GetParams), diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 57df600bd..58f250f67 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -27,13 +27,13 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/deals" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/impl/paych" "github.com/filecoin-project/lotus/node/modules/dtypes" retrievalmarket "github.com/filecoin-project/lotus/retrieval" + "github.com/filecoin-project/lotus/storagemarket" ) type API struct { @@ -44,7 +44,7 @@ type API struct { full.WalletAPI paych.PaychAPI - DealClient *deals.Client + SMDealClient storagemarket.StorageClient RetDiscovery retrievalmarket.PeerResolver Retrieval retrievalmarket.RetrievalClient Chain *store.ChainStore @@ -72,28 +72,30 @@ func (a *API) ClientStartDeal(ctx context.Context, data cid.Cid, addr address.Ad if err != nil { return nil, xerrors.Errorf("failed getting miner worker: %w", err) } - - proposal := deals.ClientDealProposal{ - Data: data, - PricePerEpoch: epochPrice, - ProposalExpiration: math.MaxUint64, // TODO: set something reasonable - Duration: blocksDuration, - Client: addr, - ProviderAddress: miner, - MinerWorker: mw, - MinerID: pid, + providerInfo := storagemarket.StorageProviderInfo{ + Address: miner, + Worker: mw, + PeerID: pid, } + result, err := a.SMDealClient.ProposeStorageDeal( + ctx, + addr, + &providerInfo, + data, + storagemarket.Epoch(math.MaxUint64), + storagemarket.Epoch(blocksDuration), + storagemarket.TokenAmount(epochPrice), + storagemarket.TokenAmount(storagemarket.EmptyInt)) - c, err := a.DealClient.Start(ctx, proposal) if err != nil { return nil, xerrors.Errorf("failed to start deal: %w", err) } - return &c, nil + return &result.ProposalCid, nil } func (a *API) ClientListDeals(ctx context.Context) ([]api.DealInfo, error) { - deals, err := a.DealClient.List() + deals, err := a.SMDealClient.ListInProgressDeals(ctx) if err != nil { return nil, err } @@ -117,10 +119,11 @@ func (a *API) ClientListDeals(ctx context.Context) ([]api.DealInfo, error) { } func (a *API) ClientGetDealInfo(ctx context.Context, d cid.Cid) (*api.DealInfo, error) { - v, err := a.DealClient.GetDeal(d) + v, err := a.SMDealClient.GetInProgressDeal(ctx, d) if err != nil { return nil, err } + return &api.DealInfo{ ProposalCid: v.ProposalCid, State: v.State, @@ -315,5 +318,6 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, path } func (a *API) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*types.SignedStorageAsk, error) { - return a.DealClient.QueryAsk(ctx, p, miner) + info := storagemarket.StorageProviderInfo{Address: miner, PeerID: p} + return a.SMDealClient.GetAsk(ctx, info) } diff --git a/node/modules/services.go b/node/modules/services.go index 9949d0d69..bd0978085 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -10,7 +10,6 @@ import ( "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/blocksync" - "github.com/filecoin-project/lotus/chain/deals" "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/sub" "github.com/filecoin-project/lotus/node/hello" @@ -18,6 +17,7 @@ import ( "github.com/filecoin-project/lotus/peermgr" retrievalmarket "github.com/filecoin-project/lotus/retrieval" "github.com/filecoin-project/lotus/retrieval/discovery" + "github.com/filecoin-project/lotus/storagemarket" ) func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) { @@ -66,7 +66,7 @@ func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, pubsub *pu go sub.HandleIncomingMessages(ctx, mpool, msgsub) } -func RunDealClient(mctx helpers.MetricsCtx, lc fx.Lifecycle, c *deals.Client) { +func RunDealClient(mctx helpers.MetricsCtx, lc fx.Lifecycle, c storagemarket.StorageClient) { ctx := helpers.LifecycleCtx(mctx, lc) lc.Append(fx.Hook{ diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 2a897a9a3..d868414d9 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -40,6 +40,7 @@ import ( "github.com/filecoin-project/lotus/retrievaladapter" "github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage/sectorblocks" + "github.com/filecoin-project/lotus/storagemarket" ) func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) { @@ -127,14 +128,12 @@ func HandleRetrieval(host host.Host, lc fx.Lifecycle, m retrievalmarket.Retrieva }) } -func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h *deals.Provider) { +func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h storagemarket.StorageProvider) { ctx := helpers.LifecycleCtx(mctx, lc) lc.Append(fx.Hook{ OnStart: func(context.Context) error { - h.Run(ctx) - host.SetStreamHandler(deals.DealProtocolID, h.HandleStream) - host.SetStreamHandler(deals.AskProtocolID, h.HandleAskStream) + h.Run(ctx, host) return nil }, OnStop: func(context.Context) error { diff --git a/retrieval/impl/cbor_gen.go b/retrieval/impl/cbor_gen.go index f2778544c..220577089 100644 --- a/retrieval/impl/cbor_gen.go +++ b/retrieval/impl/cbor_gen.go @@ -21,7 +21,7 @@ func (t *RetParams) MarshalCBOR(w io.Writer) error { return err } - // t.Unixfs0 (retrieval.Unixfs0Offer) (struct) + // t.Unixfs0 (retrievalimpl.Unixfs0Offer) (struct) if err := t.Unixfs0.MarshalCBOR(w); err != nil { return err } @@ -43,7 +43,7 @@ func (t *RetParams) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("cbor input had wrong number of fields") } - // t.Unixfs0 (retrieval.Unixfs0Offer) (struct) + // t.Unixfs0 (retrievalimpl.Unixfs0Offer) (struct) { @@ -124,7 +124,7 @@ func (t *OldQueryResponse) MarshalCBOR(w io.Writer) error { return err } - // t.t.Status (retrieval.OldQueryResponseStatus) (uint64) + // t.Status (retrievalimpl.OldQueryResponseStatus) (uint64) if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Status))); err != nil { return err } @@ -156,7 +156,7 @@ func (t *OldQueryResponse) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("cbor input had wrong number of fields") } - // t.t.Status (retrieval.OldQueryResponseStatus) (uint64) + // t.Status (retrievalimpl.OldQueryResponseStatus) (uint64) maj, extra, err = cbg.CborReadHeader(br) if err != nil { @@ -166,7 +166,7 @@ func (t *OldQueryResponse) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("wrong type for uint64 field") } t.Status = OldQueryResponseStatus(extra) - // t.t.Size (uint64) (uint64) + // t.Size (uint64) (uint64) maj, extra, err = cbg.CborReadHeader(br) if err != nil { @@ -267,7 +267,7 @@ func (t *OldDealProposal) MarshalCBOR(w io.Writer) error { return xerrors.Errorf("failed to write cid field t.Ref: %w", err) } - // t.Params (retrieval.RetParams) (struct) + // t.Params (retrievalimpl.RetParams) (struct) if err := t.Params.MarshalCBOR(w); err != nil { return err } @@ -310,7 +310,7 @@ func (t *OldDealProposal) UnmarshalCBOR(r io.Reader) error { t.Ref = c } - // t.Params (retrieval.RetParams) (struct) + // t.Params (retrievalimpl.RetParams) (struct) { diff --git a/retrievaladapter/provider.go b/retrievaladapter/provider.go index f8d188a95..06757593b 100644 --- a/retrievaladapter/provider.go +++ b/retrievaladapter/provider.go @@ -3,8 +3,8 @@ package retrievaladapter import ( "context" - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/api" retrievalmarket "github.com/filecoin-project/lotus/retrieval" ) diff --git a/storagemarket/bigint.go b/storagemarket/bigint.go new file mode 100644 index 000000000..f36e242b0 --- /dev/null +++ b/storagemarket/bigint.go @@ -0,0 +1,241 @@ +// Copied from lotus until this can be extracted into shared types + +package storagemarket + +import ( + "encoding/json" + "fmt" + "io" + "math/big" + + "github.com/filecoin-project/lotus/build" + cbor "github.com/ipfs/go-ipld-cbor" + "github.com/polydawn/refmt/obj/atlas" + + cbg "github.com/whyrusleeping/cbor-gen" + "golang.org/x/xerrors" +) + +const BigIntMaxSerializedLen = 128 // is this big enough? or too big? + +var TotalFilecoinInt = FromFil(build.TotalFilecoin) + +func init() { + cbor.RegisterCborType(atlas.BuildEntry(BigInt{}).Transform(). + TransformMarshal(atlas.MakeMarshalTransformFunc( + func(i BigInt) ([]byte, error) { + return i.cborBytes(), nil + })). + TransformUnmarshal(atlas.MakeUnmarshalTransformFunc( + func(x []byte) (BigInt, error) { + return fromCborBytes(x) + })). + Complete()) +} + +var EmptyInt = BigInt{} + +type BigInt struct { + *big.Int +} + +func NewInt(i uint64) BigInt { + return BigInt{big.NewInt(0).SetUint64(i)} +} + +func FromFil(i uint64) BigInt { + return BigMul(NewInt(i), NewInt(build.FilecoinPrecision)) +} + +func BigFromBytes(b []byte) BigInt { + i := big.NewInt(0).SetBytes(b) + return BigInt{i} +} + +func BigFromString(s string) (BigInt, error) { + v, ok := big.NewInt(0).SetString(s, 10) + if !ok { + return BigInt{}, fmt.Errorf("failed to parse string as a big int") + } + + return BigInt{v}, nil +} + +func BigMul(a, b BigInt) BigInt { + return BigInt{big.NewInt(0).Mul(a.Int, b.Int)} +} + +func BigDiv(a, b BigInt) BigInt { + return BigInt{big.NewInt(0).Div(a.Int, b.Int)} +} + +func BigMod(a, b BigInt) BigInt { + return BigInt{big.NewInt(0).Mod(a.Int, b.Int)} +} + +func BigAdd(a, b BigInt) BigInt { + return BigInt{big.NewInt(0).Add(a.Int, b.Int)} +} + +func BigSub(a, b BigInt) BigInt { + return BigInt{big.NewInt(0).Sub(a.Int, b.Int)} +} + +func BigCmp(a, b BigInt) int { + return a.Int.Cmp(b.Int) +} + +func (bi BigInt) Nil() bool { + return bi.Int == nil +} + +// LessThan returns true if bi < o +func (bi BigInt) LessThan(o BigInt) bool { + return BigCmp(bi, o) < 0 +} + +// GreaterThan returns true if bi > o +func (bi BigInt) GreaterThan(o BigInt) bool { + return BigCmp(bi, o) > 0 +} + +// Equals returns true if bi == o +func (bi BigInt) Equals(o BigInt) bool { + return BigCmp(bi, o) == 0 +} + +func (bi *BigInt) MarshalJSON() ([]byte, error) { + return json.Marshal(bi.String()) +} + +func (bi *BigInt) UnmarshalJSON(b []byte) error { + var s string + if err := json.Unmarshal(b, &s); err != nil { + return err + } + + i, ok := big.NewInt(0).SetString(s, 10) + if !ok { + if string(s) == "" { + return nil + } + return xerrors.Errorf("failed to parse bigint string: '%s'", string(b)) + } + + bi.Int = i + return nil +} + +func (bi *BigInt) Scan(value interface{}) error { + switch value := value.(type) { + case string: + i, ok := big.NewInt(0).SetString(value, 10) + if !ok { + if value == "" { + return nil + } + return xerrors.Errorf("failed to parse bigint string: '%s'", value) + } + + bi.Int = i + + return nil + case int64: + bi.Int = big.NewInt(value) + return nil + default: + return xerrors.Errorf("non-string types unsupported: %T", value) + } +} + +func (bi *BigInt) cborBytes() []byte { + if bi.Int == nil { + return []byte{} + } + + switch { + case bi.Sign() > 0: + return append([]byte{0}, bi.Bytes()...) + case bi.Sign() < 0: + return append([]byte{1}, bi.Bytes()...) + default: // bi.Sign() == 0: + return []byte{} + } +} + +func fromCborBytes(buf []byte) (BigInt, error) { + if len(buf) == 0 { + return NewInt(0), nil + } + + var negative bool + switch buf[0] { + case 0: + negative = false + case 1: + negative = true + default: + return EmptyInt, fmt.Errorf("big int prefix should be either 0 or 1, got %d", buf[0]) + } + + i := big.NewInt(0).SetBytes(buf[1:]) + if negative { + i.Neg(i) + } + + return BigInt{i}, nil +} + +func (bi *BigInt) MarshalCBOR(w io.Writer) error { + if bi.Int == nil { + zero := NewInt(0) + return zero.MarshalCBOR(w) + } + + enc := bi.cborBytes() + + header := cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(enc))) + if _, err := w.Write(header); err != nil { + return err + } + + if _, err := w.Write(enc); err != nil { + return err + } + + return nil +} + +func (bi *BigInt) UnmarshalCBOR(br io.Reader) error { + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + + if maj != cbg.MajByteString { + return fmt.Errorf("cbor input for fil big int was not a byte string (%x)", maj) + } + + if extra == 0 { + bi.Int = big.NewInt(0) + return nil + } + + if extra > BigIntMaxSerializedLen { + return fmt.Errorf("big integer byte array too long") + } + + buf := make([]byte, extra) + if _, err := io.ReadFull(br, buf); err != nil { + return err + } + + i, err := fromCborBytes(buf) + if err != nil { + return err + } + + *bi = i + + return nil +} diff --git a/storagemarket/cbor_gen.go b/storagemarket/cbor_gen.go new file mode 100644 index 000000000..e4a9efcfa --- /dev/null +++ b/storagemarket/cbor_gen.go @@ -0,0 +1,352 @@ +package storagemarket + +import ( + "fmt" + "io" + + "github.com/libp2p/go-libp2p-core/peer" + cbg "github.com/whyrusleeping/cbor-gen" + xerrors "golang.org/x/xerrors" +) + +// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT. + +var _ = xerrors.Errorf + +func (t *ClientDeal) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{136}); err != nil { + return err + } + + // t.ProposalCid (cid.Cid) (struct) + + if err := cbg.WriteCid(w, t.ProposalCid); err != nil { + return xerrors.Errorf("failed to write cid field t.ProposalCid: %w", err) + } + + // t.Proposal (actors.StorageDealProposal) (struct) + if err := t.Proposal.MarshalCBOR(w); err != nil { + return err + } + + // t.State (uint64) (uint64) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.State))); err != nil { + return err + } + + // t.Miner (peer.ID) (string) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Miner)))); err != nil { + return err + } + if _, err := w.Write([]byte(t.Miner)); err != nil { + return err + } + + // t.MinerWorker (address.Address) (struct) + if err := t.MinerWorker.MarshalCBOR(w); err != nil { + return err + } + + // t.DealID (uint64) (uint64) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.DealID))); err != nil { + return err + } + + // t.PayloadCid (cid.Cid) (struct) + + if err := cbg.WriteCid(w, t.PayloadCid); err != nil { + return xerrors.Errorf("failed to write cid field t.PayloadCid: %w", err) + } + + // t.PublishMessage (cid.Cid) (struct) + + if t.PublishMessage == nil { + if _, err := w.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCid(w, *t.PublishMessage); err != nil { + return xerrors.Errorf("failed to write cid field t.PublishMessage: %w", err) + } + } + + return nil +} + +func (t *ClientDeal) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 8 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.ProposalCid (cid.Cid) (struct) + + { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.ProposalCid: %w", err) + } + + t.ProposalCid = c + + } + // t.Proposal (actors.StorageDealProposal) (struct) + + { + + if err := t.Proposal.UnmarshalCBOR(br); err != nil { + return err + } + + } + // t.State (uint64) (uint64) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.State = uint64(extra) + // t.Miner (peer.ID) (string) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + t.Miner = peer.ID(sval) + } + // t.MinerWorker (address.Address) (struct) + + { + + if err := t.MinerWorker.UnmarshalCBOR(br); err != nil { + return err + } + + } + // t.DealID (uint64) (uint64) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.DealID = uint64(extra) + // t.PayloadCid (cid.Cid) (struct) + + { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.PayloadCid: %w", err) + } + + t.PayloadCid = c + + } + // t.PublishMessage (cid.Cid) (struct) + + { + + pb, err := br.PeekByte() + if err != nil { + return err + } + if pb == cbg.CborNull[0] { + var nbuf [1]byte + if _, err := br.Read(nbuf[:]); err != nil { + return err + } + } else { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.PublishMessage: %w", err) + } + + t.PublishMessage = &c + } + + } + return nil +} + +func (t *MinerDeal) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{136}); err != nil { + return err + } + + // t.ProposalCid (cid.Cid) (struct) + + if err := cbg.WriteCid(w, t.ProposalCid); err != nil { + return xerrors.Errorf("failed to write cid field t.ProposalCid: %w", err) + } + + // t.Proposal (actors.StorageDealProposal) (struct) + if err := t.Proposal.MarshalCBOR(w); err != nil { + return err + } + + // t.Miner (peer.ID) (string) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Miner)))); err != nil { + return err + } + if _, err := w.Write([]byte(t.Miner)); err != nil { + return err + } + + // t.Client (peer.ID) (string) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Client)))); err != nil { + return err + } + if _, err := w.Write([]byte(t.Client)); err != nil { + return err + } + + // t.State (uint64) (uint64) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.State))); err != nil { + return err + } + + // t.Ref (cid.Cid) (struct) + + if err := cbg.WriteCid(w, t.Ref); err != nil { + return xerrors.Errorf("failed to write cid field t.Ref: %w", err) + } + + // t.DealID (uint64) (uint64) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.DealID))); err != nil { + return err + } + + // t.SectorID (uint64) (uint64) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.SectorID))); err != nil { + return err + } + return nil +} + +func (t *MinerDeal) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 8 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.ProposalCid (cid.Cid) (struct) + + { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.ProposalCid: %w", err) + } + + t.ProposalCid = c + + } + // t.Proposal (actors.StorageDealProposal) (struct) + + { + + if err := t.Proposal.UnmarshalCBOR(br); err != nil { + return err + } + + } + // t.Miner (peer.ID) (string) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + t.Miner = peer.ID(sval) + } + // t.Client (peer.ID) (string) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + t.Client = peer.ID(sval) + } + // t.State (uint64) (uint64) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.State = uint64(extra) + // t.Ref (cid.Cid) (struct) + + { + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.Ref: %w", err) + } + + t.Ref = c + + } + // t.DealID (uint64) (uint64) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.DealID = uint64(extra) + // t.SectorID (uint64) (uint64) + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.SectorID = uint64(extra) + return nil +} diff --git a/storagemarket/types.go b/storagemarket/types.go new file mode 100644 index 000000000..1497063c4 --- /dev/null +++ b/storagemarket/types.go @@ -0,0 +1,199 @@ +package storagemarket + +import ( + "context" + + "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/types" +) + +const DealProtocolID = "/fil/storage/mk/1.0.1" +const AskProtocolID = "/fil/storage/ask/1.0.1" + +// type shims - used during migration into separate module +type Balance = actors.StorageParticipantBalance +type DealID uint64 +type Signature = types.Signature +type StorageDeal = actors.OnChainDeal +type StorageAsk = types.SignedStorageAsk +type StateKey = *types.TipSet +type Epoch uint64 +type TokenAmount BigInt + +// Duplicated from deals package for now +type MinerDeal struct { + ProposalCid cid.Cid + Proposal actors.StorageDealProposal + Miner peer.ID + Client peer.ID + State api.DealState + + Ref cid.Cid + + DealID uint64 + SectorID uint64 // Set when sm >= DealStaged +} + +type ClientDeal struct { + ProposalCid cid.Cid + Proposal actors.StorageDealProposal + State api.DealState + Miner peer.ID + MinerWorker address.Address + DealID uint64 + PayloadCid cid.Cid + PublishMessage *cid.Cid +} + +// The interface provided for storage providers +type StorageProvider interface { + Run(ctx context.Context, host host.Host) + + Stop() + + AddAsk(price TokenAmount, ttlsecs int64) error + + // ListAsks lists current asks + ListAsks(addr address.Address) []*StorageAsk + + // ListDeals lists on-chain deals associated with this provider + ListDeals(ctx context.Context) ([]StorageDeal, error) + + // ListIncompleteDeals lists deals that are in progress or rejected + ListIncompleteDeals() ([]MinerDeal, error) + + // AddStorageCollateral adds storage collateral + AddStorageCollateral(ctx context.Context, amount TokenAmount) error + + // GetStorageCollateral returns the current collateral balance + GetStorageCollateral(ctx context.Context) (Balance, error) +} + +// Node dependencies for a StorageProvider +type StorageProviderNode interface { + MostRecentStateId(ctx context.Context) (StateKey, error) + + // Adds funds with the StorageMinerActor for a storage participant. Used by both providers and clients. + AddFunds(ctx context.Context, addr address.Address, amount TokenAmount) error + + // Ensures that a storage market participant has a certain amount of available funds + EnsureFunds(ctx context.Context, addr address.Address, amount TokenAmount) error + + // GetBalance returns locked/unlocked for a storage participant. Used by both providers and clients. + GetBalance(ctx context.Context, addr address.Address) (Balance, error) + + // Publishes deal on chain + PublishDeals(ctx context.Context, deal MinerDeal) (DealID, cid.Cid, error) + + // ListProviderDeals lists all deals associated with a storage provider + ListProviderDeals(ctx context.Context, addr address.Address) ([]StorageDeal, error) + + // Called when a deal is complete and on chain, and data has been transferred and is ready to be added to a sector + // returns sector id + OnDealComplete(ctx context.Context, deal MinerDeal, piecePath string) (uint64, error) + + // returns the worker address associated with a miner + GetMinerWorker(ctx context.Context, miner address.Address) (address.Address, error) + + // Signs bytes + SignBytes(ctx context.Context, signer address.Address, b []byte) (*types.Signature, error) +} + +type DealSectorCommittedCallback func(error) + +// Node dependencies for a StorageClient +type StorageClientNode interface { + MostRecentStateId(ctx context.Context) (StateKey, error) + + // Adds funds with the StorageMinerActor for a storage participant. Used by both providers and clients. + AddFunds(ctx context.Context, addr address.Address, amount TokenAmount) error + + EnsureFunds(ctx context.Context, addr address.Address, amount TokenAmount) error + + // GetBalance returns locked/unlocked for a storage participant. Used by both providers and clients. + GetBalance(ctx context.Context, addr address.Address) (Balance, error) + + //// ListClientDeals lists all on-chain deals associated with a storage client + ListClientDeals(ctx context.Context, addr address.Address) ([]StorageDeal, error) + + // GetProviderInfo returns information about a single storage provider + //GetProviderInfo(stateId StateID, addr Address) *StorageProviderInfo + + // GetStorageProviders returns information about known miners + ListStorageProviders(ctx context.Context) ([]*StorageProviderInfo, error) + + // Subscribes to storage market actor state changes for a given address. + // TODO: Should there be a timeout option for this? In the case that we are waiting for funds to be deposited and it never happens? + //SubscribeStorageMarketEvents(addr Address, handler StorageMarketEventHandler) (SubID, error) + + // Cancels a subscription + //UnsubscribeStorageMarketEvents(subId SubID) + ValidatePublishedDeal(ctx context.Context, deal ClientDeal) (uint64, error) + + // SignProposal signs a proposal + SignProposal(ctx context.Context, signer address.Address, proposal *actors.StorageDealProposal) error + + GetDefaultWalletAddress(ctx context.Context) (address.Address, error) + + OnDealSectorCommitted(ctx context.Context, provider address.Address, dealId uint64, cb DealSectorCommittedCallback) error + + ValidateAskSignature(ask *StorageAsk) error +} + +type StorageClientProofs interface { + //GeneratePieceCommitment(piece io.Reader, pieceSize uint64) (CommP, error) +} + +// Closely follows the MinerInfo struct in the spec +type StorageProviderInfo struct { + Address address.Address // actor address + Owner address.Address + Worker address.Address // signs messages + SectorSize uint64 + PeerID peer.ID + // probably more like how much storage power, available collateral etc +} + +type ProposeStorageDealResult struct { + ProposalCid cid.Cid +} + +// The interface provided by the module to the outside world for storage clients. +type StorageClient interface { + Run(ctx context.Context) + + Stop() + + // ListProviders queries chain state and returns active storage providers + ListProviders(ctx context.Context) (<-chan StorageProviderInfo, error) + + // ListDeals lists on-chain deals associated with this provider + ListDeals(ctx context.Context, addr address.Address) ([]StorageDeal, error) + + // ListInProgressDeals lists deals that are in progress or rejected + ListInProgressDeals(ctx context.Context) ([]ClientDeal, error) + + // ListInProgressDeals lists deals that are in progress or rejected + GetInProgressDeal(ctx context.Context, cid cid.Cid) (ClientDeal, error) + + // GetAsk returns the current ask for a storage provider + GetAsk(ctx context.Context, info StorageProviderInfo) (*StorageAsk, error) + + //// FindStorageOffers lists providers and queries them to find offers that satisfy some criteria based on price, duration, etc. + //FindStorageOffers(criteria AskCriteria, limit uint) []*StorageOffer + + // ProposeStorageDeal initiates deal negotiation with a Storage Provider + ProposeStorageDeal(ctx context.Context, addr address.Address, info *StorageProviderInfo, payloadCid cid.Cid, proposalExpiration Epoch, duration Epoch, price TokenAmount, collateral TokenAmount) (*ProposeStorageDealResult, error) + + // GetPaymentEscrow returns the current funds available for deal payment + GetPaymentEscrow(ctx context.Context, addr address.Address) (Balance, error) + + // AddStorageCollateral adds storage collateral + AddPaymentEscrow(ctx context.Context, addr address.Address, amount TokenAmount) error +} diff --git a/storagemarketadapter/client_adapter.go b/storagemarketadapter/client_adapter.go new file mode 100644 index 000000000..9e23771f5 --- /dev/null +++ b/storagemarketadapter/client_adapter.go @@ -0,0 +1,328 @@ +package storagemarketadapter + +// this file implements storagemarket.StorageClientNode + +import ( + "bytes" + "context" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/events" + "github.com/filecoin-project/lotus/chain/market" + "github.com/filecoin-project/lotus/chain/stmgr" + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/node/impl/full" + "github.com/filecoin-project/lotus/storagemarket" +) + +type ClientNodeAdapter struct { + full.StateAPI + full.ChainAPI + full.MpoolAPI + + sm *stmgr.StateManager + cs *store.ChainStore + fm *market.FundMgr + ev *events.Events +} + +type clientApi struct { + full.ChainAPI + full.StateAPI +} + +func NewClientNodeAdapter(state full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, sm *stmgr.StateManager, cs *store.ChainStore, fm *market.FundMgr) storagemarket.StorageClientNode { + return &ClientNodeAdapter{ + StateAPI: state, + ChainAPI: chain, + MpoolAPI: mpool, + + sm: sm, + cs: cs, + fm: fm, + ev: events.NewEvents(context.TODO(), &clientApi{chain, state}), + } +} + +func (n *ClientNodeAdapter) ListStorageProviders(ctx context.Context) ([]*storagemarket.StorageProviderInfo, error) { + ts, err := n.ChainHead(ctx) + if err != nil { + return nil, err + } + + addresses, err := n.StateListMiners(ctx, ts) + if err != nil { + return nil, err + } + + var out []*storagemarket.StorageProviderInfo + + for _, addr := range addresses { + workerAddr, err := n.StateMinerWorker(ctx, addr, ts) + if err != nil { + return nil, err + } + + sectorSize, err := n.StateMinerSectorSize(ctx, addr, ts) + if err != nil { + return nil, err + } + + peerId, err := n.StateMinerPeerID(ctx, addr, ts) + if err != nil { + return nil, err + } + + out = append(out, &storagemarket.StorageProviderInfo{ + Address: addr, + Worker: workerAddr, + SectorSize: sectorSize, + PeerID: peerId, + }) + } + + return out, nil +} + +func (n *ClientNodeAdapter) ListClientDeals(ctx context.Context, addr address.Address) ([]storagemarket.StorageDeal, error) { + allDeals, err := n.StateMarketDeals(ctx, nil) + if err != nil { + return nil, err + } + + var out []actors.OnChainDeal + + for _, deal := range allDeals { + if deal.Client == addr { + out = append(out, deal) + } + } + + return out, nil +} + +func (n *ClientNodeAdapter) MostRecentStateId(ctx context.Context) (storagemarket.StateKey, error) { + return n.ChainHead(ctx) +} + +// Adds funds with the StorageMinerActor for a storage participant. Used by both providers and clients. +func (n *ClientNodeAdapter) AddFunds(ctx context.Context, addr address.Address, amount storagemarket.TokenAmount) error { + // (Provider Node API) + smsg, err := n.MpoolPushMessage(ctx, &types.Message{ + To: actors.StorageMarketAddress, + From: addr, + Value: types.BigInt(amount), + GasPrice: types.NewInt(0), + GasLimit: types.NewInt(1000000), + Method: actors.SMAMethods.AddBalance, + }) + if err != nil { + return err + } + + r, err := n.StateWaitMsg(ctx, smsg.Cid()) + if err != nil { + return err + } + + if r.Receipt.ExitCode != 0 { + return xerrors.Errorf("adding funds to storage miner market actor failed: exit %d", r.Receipt.ExitCode) + } + + return nil +} + +func (n *ClientNodeAdapter) EnsureFunds(ctx context.Context, addr address.Address, amount storagemarket.TokenAmount) error { + return n.fm.EnsureAvailable(ctx, addr, types.BigInt(amount)) +} + +func (n *ClientNodeAdapter) GetBalance(ctx context.Context, addr address.Address) (storagemarket.Balance, error) { + bal, err := n.StateMarketBalance(ctx, addr, nil) + if err != nil { + return storagemarket.Balance{}, err + } + + return bal, nil +} + +// ValidatePublishedDeal validates that the provided deal has appeared on chain and references the same ClientDeal +// returns the Deal id if there is no error +func (c *ClientNodeAdapter) ValidatePublishedDeal(ctx context.Context, deal storagemarket.ClientDeal) (uint64, error) { + log.Infow("DEAL ACCEPTED!") + + pubmsg, err := c.cs.GetMessage(*deal.PublishMessage) + if err != nil { + return 0, xerrors.Errorf("getting deal pubsish message: %w", err) + } + + pw, err := stmgr.GetMinerWorker(ctx, c.sm, nil, deal.Proposal.Provider) + if err != nil { + return 0, xerrors.Errorf("getting miner worker failed: %w", err) + } + + if pubmsg.From != pw { + return 0, xerrors.Errorf("deal wasn't published by storage provider: from=%s, provider=%s", pubmsg.From, deal.Proposal.Provider) + } + + if pubmsg.To != actors.StorageMarketAddress { + return 0, xerrors.Errorf("deal publish message wasn't set to StorageMarket actor (to=%s)", pubmsg.To) + } + + if pubmsg.Method != actors.SMAMethods.PublishStorageDeals { + return 0, xerrors.Errorf("deal publish message called incorrect method (method=%s)", pubmsg.Method) + } + + var params actors.PublishStorageDealsParams + if err := params.UnmarshalCBOR(bytes.NewReader(pubmsg.Params)); err != nil { + return 0, err + } + + dealIdx := -1 + for i, storageDeal := range params.Deals { + // TODO: make it less hacky + sd := storageDeal + eq, err := cborutil.Equals(&deal.Proposal, &sd) + if err != nil { + return 0, err + } + if eq { + dealIdx = i + break + } + } + + if dealIdx == -1 { + return 0, xerrors.Errorf("deal publish didn't contain our deal (message cid: %s)", deal.PublishMessage) + } + + // TODO: timeout + _, ret, err := c.sm.WaitForMessage(ctx, *deal.PublishMessage) + if err != nil { + return 0, xerrors.Errorf("waiting for deal publish message: %w", err) + } + if ret.ExitCode != 0 { + return 0, xerrors.Errorf("deal publish failed: exit=%d", ret.ExitCode) + } + + var res actors.PublishStorageDealResponse + if err := res.UnmarshalCBOR(bytes.NewReader(ret.Return)); err != nil { + return 0, err + } + + return res.DealIDs[dealIdx], nil +} + +func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealId uint64, cb storagemarket.DealSectorCommittedCallback) error { + checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { + sd, err := stmgr.GetStorageDeal(ctx, c.StateManager, dealId, ts) + if err != nil { + // TODO: This may be fine for some errors + return false, false, xerrors.Errorf("failed to look up deal on chain: %w", err) + } + + if sd.ActivationEpoch > 0 { + cb(nil) + return true, false, nil + } + + return false, true, nil + } + + called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (more bool, err error) { + defer func() { + if err != nil { + cb(xerrors.Errorf("handling applied event: %w", err)) + } + }() + + if msg == nil { + log.Error("timed out waiting for deal activation... what now?") + return false, nil + } + + sd, err := stmgr.GetStorageDeal(ctx, c.StateManager, dealId, ts) + if err != nil { + return false, xerrors.Errorf("failed to look up deal on chain: %w", err) + } + + if sd.ActivationEpoch == 0 { + return false, xerrors.Errorf("deal wasn't active: deal=%d, parentState=%s, h=%d", dealId, ts.ParentState(), ts.Height()) + } + + log.Infof("Storage deal %d activated at epoch %d", dealId, sd.ActivationEpoch) + + cb(nil) + + return false, nil + } + + revert := func(ctx context.Context, ts *types.TipSet) error { + log.Warn("deal activation reverted; TODO: actually handle this!") + // TODO: Just go back to DealSealing? + return nil + } + + matchEvent := func(msg *types.Message) (bool, error) { + if msg.To != provider { + return false, nil + } + + if msg.Method != actors.MAMethods.ProveCommitSector { + return false, nil + } + + var params actors.SectorProveCommitInfo + if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil { + return false, err + } + + var found bool + for _, dealID := range params.DealIDs { + if dealID == dealId { + found = true + break + } + } + + return found, nil + } + + if err := c.ev.Called(checkFunc, called, revert, 3, build.SealRandomnessLookbackLimit, matchEvent); err != nil { + return xerrors.Errorf("failed to set up called handler") + } + + return nil +} + +func (n *ClientNodeAdapter) SignProposal(ctx context.Context, signer address.Address, proposal *actors.StorageDealProposal) error { + return api.SignWith(ctx, n.Wallet.Sign, signer, proposal) +} + +func (n *ClientNodeAdapter) GetDefaultWalletAddress(ctx context.Context) (address.Address, error) { + return n.Wallet.GetDefault() +} + +func (n *ClientNodeAdapter) ValidateAskSignature(ask *types.SignedStorageAsk) error { + tss := n.cs.GetHeaviestTipSet().ParentState() + + w, err := stmgr.GetMinerWorkerRaw(context.TODO(), n.StateManager, tss, ask.Ask.Miner) + if err != nil { + return xerrors.Errorf("failed to get worker for miner in ask", err) + } + + sigb, err := cborutil.Dump(ask.Ask) + if err != nil { + return xerrors.Errorf("failed to re-serialize ask") + } + + return ask.Signature.Verify(w, sigb) +} + +var _ storagemarket.StorageClientNode = &ClientNodeAdapter{} diff --git a/storagemarketadapter/provider_adapter.go b/storagemarketadapter/provider_adapter.go new file mode 100644 index 000000000..0691bee36 --- /dev/null +++ b/storagemarketadapter/provider_adapter.go @@ -0,0 +1,196 @@ +package storagemarketadapter + +// this file implements storagemarket.StorageProviderNode + +import ( + "bytes" + "context" + + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log" + unixfile "github.com/ipfs/go-unixfs/file" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/padreader" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/storage/sectorblocks" + "github.com/filecoin-project/lotus/storagemarket" +) + +var log = logging.Logger("provideradapter") + +type ProviderNodeAdapter struct { + api.FullNode + + // this goes away with the data transfer module + dag dtypes.StagingDAG + + secb *sectorblocks.SectorBlocks +} + +func NewProviderNodeAdapter(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode { + return &ProviderNodeAdapter{ + FullNode: full, + dag: dag, + secb: secb, + } +} + +func (n *ProviderNodeAdapter) PublishDeals(ctx context.Context, deal storagemarket.MinerDeal) (storagemarket.DealID, cid.Cid, error) { + log.Info("publishing deal") + + worker, err := n.StateMinerWorker(ctx, deal.Proposal.Provider, nil) + if err != nil { + return 0, cid.Undef, err + } + + params, err := actors.SerializeParams(&actors.PublishStorageDealsParams{ + Deals: []actors.StorageDealProposal{deal.Proposal}, + }) + + if err != nil { + return 0, cid.Undef, xerrors.Errorf("serializing PublishStorageDeals params failed: ", err) + } + + // TODO: We may want this to happen after fetching data + smsg, err := n.MpoolPushMessage(ctx, &types.Message{ + To: actors.StorageMarketAddress, + From: worker, + Value: types.NewInt(0), + GasPrice: types.NewInt(0), + GasLimit: types.NewInt(1000000), + Method: actors.SMAMethods.PublishStorageDeals, + Params: params, + }) + if err != nil { + return 0, cid.Undef, err + } + r, err := n.StateWaitMsg(ctx, smsg.Cid()) + if err != nil { + return 0, cid.Undef, err + } + if r.Receipt.ExitCode != 0 { + return 0, cid.Undef, xerrors.Errorf("publishing deal failed: exit %d", r.Receipt.ExitCode) + } + var resp actors.PublishStorageDealResponse + if err := resp.UnmarshalCBOR(bytes.NewReader(r.Receipt.Return)); err != nil { + return 0, cid.Undef, err + } + if len(resp.DealIDs) != 1 { + return 0, cid.Undef, xerrors.Errorf("got unexpected number of DealIDs from") + } + + return storagemarket.DealID(resp.DealIDs[0]), smsg.Cid(), nil +} + +func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagemarket.MinerDeal, piecePath string) (uint64, error) { + root, err := n.dag.Get(ctx, deal.Ref) + if err != nil { + return 0, xerrors.Errorf("failed to get file root for deal: %s", err) + } + + // TODO: abstract this away into ReadSizeCloser + implement different modes + node, err := unixfile.NewUnixfsFile(ctx, n.dag, root) + if err != nil { + return 0, xerrors.Errorf("cannot open unixfs file: %s", err) + } + + uf, ok := node.(sectorblocks.UnixfsReader) + if !ok { + // we probably got directory, unsupported for now + return 0, xerrors.Errorf("unsupported unixfs file type") + } + + // TODO: uf.Size() is user input, not trusted + // This won't be useful / here after we migrate to putting CARs into sectors + size, err := uf.Size() + if err != nil { + return 0, xerrors.Errorf("getting unixfs file size: %w", err) + } + if padreader.PaddedSize(uint64(size)) != deal.Proposal.PieceSize { + return 0, xerrors.Errorf("deal.Proposal.PieceSize didn't match padded unixfs file size") + } + + sectorID, err := n.secb.AddUnixfsPiece(ctx, uf, deal.DealID) + if err != nil { + return 0, xerrors.Errorf("AddPiece failed: %s", err) + } + log.Warnf("New Sector: %d (deal %d)", sectorID, deal.DealID) + + return sectorID, nil +} + +func (n *ProviderNodeAdapter) ListProviderDeals(ctx context.Context, addr address.Address) ([]actors.OnChainDeal, error) { + allDeals, err := n.StateMarketDeals(ctx, nil) + if err != nil { + return nil, err + } + + var out []actors.OnChainDeal + + for _, deal := range allDeals { + if deal.Provider == addr { + out = append(out, deal) + } + } + + return out, nil +} + +func (n *ProviderNodeAdapter) GetMinerWorker(ctx context.Context, miner address.Address) (address.Address, error) { + return n.StateMinerWorker(ctx, miner, nil) +} + +func (n *ProviderNodeAdapter) SignBytes(ctx context.Context, signer address.Address, b []byte) (*types.Signature, error) { + return n.WalletSign(ctx, signer, b) +} + +func (n *ProviderNodeAdapter) EnsureFunds(ctx context.Context, addr address.Address, amt storagemarket.TokenAmount) error { + return n.MarketEnsureAvailable(ctx, addr, types.BigInt(amt)) +} + +func (n *ProviderNodeAdapter) MostRecentStateId(ctx context.Context) (storagemarket.StateKey, error) { + return n.ChainHead(ctx) +} + +// Adds funds with the StorageMinerActor for a storage participant. Used by both providers and clients. +func (n *ProviderNodeAdapter) AddFunds(ctx context.Context, addr address.Address, amount storagemarket.TokenAmount) error { + // (Provider Node API) + smsg, err := n.MpoolPushMessage(ctx, &types.Message{ + To: actors.StorageMarketAddress, + From: addr, + Value: types.BigInt(amount), + GasPrice: types.NewInt(0), + GasLimit: types.NewInt(1000000), + Method: actors.SMAMethods.AddBalance, + }) + if err != nil { + return err + } + + r, err := n.StateWaitMsg(ctx, smsg.Cid()) + if err != nil { + return err + } + + if r.Receipt.ExitCode != 0 { + return xerrors.Errorf("adding funds to storage miner market actor failed: exit %d", r.Receipt.ExitCode) + } + + return nil +} + +func (n *ProviderNodeAdapter) GetBalance(ctx context.Context, addr address.Address) (storagemarket.Balance, error) { + bal, err := n.StateMarketBalance(ctx, addr, nil) + if err != nil { + return storagemarket.Balance{}, err + } + + return bal, nil +} + +var _ storagemarket.StorageProviderNode = &ProviderNodeAdapter{}