Merge pull request #951 from filecoin-project/feat/storage-market-skeleton

Storage Market: Draw clean interfaces and node boundaries around storage market
This commit is contained in:
Łukasz Magiera 2020-01-10 03:58:39 +01:00 committed by GitHub
commit d6b0648610
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 1684 additions and 522 deletions

View File

@ -219,10 +219,18 @@ func (t *Response) MarshalCBOR(w io.Writer) error {
return xerrors.Errorf("failed to write cid field t.Proposal: %w", err)
}
// t.StorageDealSubmission (types.SignedMessage) (struct)
if err := t.StorageDealSubmission.MarshalCBOR(w); err != nil {
return err
// t.PublishMessage (cid.Cid) (struct)
if t.PublishMessage == nil {
if _, err := w.Write(cbg.CborNull); err != nil {
return err
}
} else {
if err := cbg.WriteCid(w, *t.PublishMessage); err != nil {
return xerrors.Errorf("failed to write cid field t.PublishMessage: %w", err)
}
}
return nil
}
@ -273,7 +281,7 @@ func (t *Response) UnmarshalCBOR(r io.Reader) error {
t.Proposal = c
}
// t.StorageDealSubmission (types.SignedMessage) (struct)
// t.PublishMessage (cid.Cid) (struct)
{
@ -287,10 +295,13 @@ func (t *Response) UnmarshalCBOR(r io.Reader) error {
return err
}
} else {
t.StorageDealSubmission = new(types.SignedMessage)
if err := t.StorageDealSubmission.UnmarshalCBOR(br); err != nil {
return err
c, err := cbg.ReadCid(br)
if err != nil {
return xerrors.Errorf("failed to read cid field t.PublishMessage: %w", err)
}
t.PublishMessage = &c
}
}
@ -526,56 +537,12 @@ func (t *ClientDeal) MarshalCBOR(w io.Writer) error {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{136}); err != nil {
if _, err := w.Write([]byte{129}); err != nil {
return err
}
// t.ProposalCid (cid.Cid) (struct)
if err := cbg.WriteCid(w, t.ProposalCid); err != nil {
return xerrors.Errorf("failed to write cid field t.ProposalCid: %w", err)
}
// t.Proposal (actors.StorageDealProposal) (struct)
if err := t.Proposal.MarshalCBOR(w); err != nil {
return err
}
// t.State (uint64) (uint64)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.State))); err != nil {
return err
}
// t.Miner (peer.ID) (string)
if len(t.Miner) > cbg.MaxLength {
return xerrors.Errorf("Value in field t.Miner was too long")
}
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Miner)))); err != nil {
return err
}
if _, err := w.Write([]byte(t.Miner)); err != nil {
return err
}
// t.MinerWorker (address.Address) (struct)
if err := t.MinerWorker.MarshalCBOR(w); err != nil {
return err
}
// t.DealID (uint64) (uint64)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.DealID))); err != nil {
return err
}
// t.PayloadCid (cid.Cid) (struct)
if err := cbg.WriteCid(w, t.PayloadCid); err != nil {
return xerrors.Errorf("failed to write cid field t.PayloadCid: %w", err)
}
// t.PublishMessage (types.SignedMessage) (struct)
if err := t.PublishMessage.MarshalCBOR(w); err != nil {
// t.ClientDeal (storagemarket.ClientDeal) (struct)
if err := t.ClientDeal.MarshalCBOR(w); err != nil {
return err
}
return nil
@ -592,102 +559,18 @@ func (t *ClientDeal) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("cbor input should be of type array")
}
if extra != 8 {
if extra != 1 {
return fmt.Errorf("cbor input had wrong number of fields")
}
// t.ProposalCid (cid.Cid) (struct)
// t.ClientDeal (storagemarket.ClientDeal) (struct)
{
c, err := cbg.ReadCid(br)
if err != nil {
return xerrors.Errorf("failed to read cid field t.ProposalCid: %w", err)
}
t.ProposalCid = c
}
// t.Proposal (actors.StorageDealProposal) (struct)
{
if err := t.Proposal.UnmarshalCBOR(br); err != nil {
if err := t.ClientDeal.UnmarshalCBOR(br); err != nil {
return err
}
}
// t.State (uint64) (uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.State = uint64(extra)
// t.Miner (peer.ID) (string)
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
t.Miner = peer.ID(sval)
}
// t.MinerWorker (address.Address) (struct)
{
if err := t.MinerWorker.UnmarshalCBOR(br); err != nil {
return err
}
}
// t.DealID (uint64) (uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.DealID = uint64(extra)
// t.PayloadCid (cid.Cid) (struct)
{
c, err := cbg.ReadCid(br)
if err != nil {
return xerrors.Errorf("failed to read cid field t.PayloadCid: %w", err)
}
t.PayloadCid = c
}
// t.PublishMessage (types.SignedMessage) (struct)
{
pb, err := br.PeekByte()
if err != nil {
return err
}
if pb == cbg.CborNull[0] {
var nbuf [1]byte
if _, err := br.Read(nbuf[:]); err != nil {
return err
}
} else {
t.PublishMessage = new(types.SignedMessage)
if err := t.PublishMessage.UnmarshalCBOR(br); err != nil {
return err
}
}
}
return nil
}

View File

@ -15,39 +15,24 @@ import (
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/market"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/wallet"
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/modules/dtypes"
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
"github.com/filecoin-project/lotus/retrieval/discovery"
"github.com/filecoin-project/lotus/storagemarket"
)
var log = logging.Logger("deals")
type ClientDeal struct {
ProposalCid cid.Cid
Proposal actors.StorageDealProposal
State api.DealState
Miner peer.ID
MinerWorker address.Address
DealID uint64
PayloadCid cid.Cid
PublishMessage *types.SignedMessage
storagemarket.ClientDeal
s inet.Stream
}
type Client struct {
sm *stmgr.StateManager
chain *store.ChainStore
h host.Host
w *wallet.Wallet
h host.Host
// dataTransfer
// TODO: once the data transfer module is complete, the
// client will listen to events on the data transfer module
@ -56,8 +41,8 @@ type Client struct {
dataTransfer dtypes.ClientDataTransfer
dag dtypes.ClientDAG
discovery *discovery.Local
events *events.Events
fm *market.FundMgr
node storagemarket.StorageClientNode
deals *statestore.StateStore
conns map[cid.Cid]inet.Stream
@ -76,22 +61,13 @@ type clientDealUpdate struct {
mut func(*ClientDeal)
}
type clientApi struct {
full.ChainAPI
full.StateAPI
}
func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w *wallet.Wallet, dag dtypes.ClientDAG, dataTransfer dtypes.ClientDataTransfer, discovery *discovery.Local, fm *market.FundMgr, deals dtypes.ClientDealStore, chainapi full.ChainAPI, stateapi full.StateAPI) *Client {
func NewClient(h host.Host, dag dtypes.ClientDAG, dataTransfer dtypes.ClientDataTransfer, discovery *discovery.Local, deals dtypes.ClientDealStore, scn storagemarket.StorageClientNode) *Client {
c := &Client{
sm: sm,
chain: chain,
h: h,
w: w,
dataTransfer: dataTransfer,
dag: dag,
discovery: discovery,
fm: fm,
events: events.NewEvents(context.TODO(), &clientApi{chainapi, stateapi}),
node: scn,
deals: deals,
conns: map[cid.Cid]inet.Stream{},
@ -196,7 +172,8 @@ type ClientDealProposal struct {
}
func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, error) {
if err := c.fm.EnsureAvailable(ctx, p.Client, types.BigMul(p.PricePerEpoch, types.NewInt(p.Duration))); err != nil {
amount := types.BigMul(p.PricePerEpoch, types.NewInt(p.Duration))
if err := c.node.EnsureFunds(ctx, p.Client, storagemarket.TokenAmount(amount)); err != nil {
return cid.Undef, xerrors.Errorf("adding market funds failed: %w", err)
}
@ -216,7 +193,7 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro
StorageCollateral: types.NewInt(uint64(pieceSize)), // TODO: real calc
}
if err := api.SignWith(ctx, c.w.Sign, p.Client, dealProposal); err != nil {
if err := c.node.SignProposal(ctx, p.Client, dealProposal); err != nil {
return cid.Undef, xerrors.Errorf("signing deal proposal failed: %w", err)
}
@ -225,7 +202,7 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro
return cid.Undef, xerrors.Errorf("getting proposal node failed: %w", err)
}
s, err := c.h.NewStream(ctx, p.MinerID, DealProtocolID)
s, err := c.h.NewStream(ctx, p.MinerID, storagemarket.DealProtocolID)
if err != nil {
return cid.Undef, xerrors.Errorf("connecting to storage provider failed: %w", err)
}
@ -241,13 +218,16 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro
}
deal := &ClientDeal{
ProposalCid: proposalNd.Cid(),
Proposal: *dealProposal,
State: api.DealUnknown,
Miner: p.MinerID,
MinerWorker: p.MinerWorker,
PayloadCid: p.Data,
s: s,
ClientDeal: storagemarket.ClientDeal{
ProposalCid: proposalNd.Cid(),
Proposal: *dealProposal,
State: api.DealUnknown,
Miner: p.MinerID,
MinerWorker: p.MinerWorker,
PayloadCid: p.Data,
},
s: s,
}
c.incoming <- deal
@ -259,7 +239,7 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro
}
func (c *Client) QueryAsk(ctx context.Context, p peer.ID, a address.Address) (*types.SignedStorageAsk, error) {
s, err := c.h.NewStream(ctx, p, AskProtocolID)
s, err := c.h.NewStream(ctx, p, storagemarket.AskProtocolID)
if err != nil {
return nil, xerrors.Errorf("failed to open stream to miner: %w", err)
}

View File

@ -1,16 +1,11 @@
package deals
import (
"bytes"
"context"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types"
)
@ -57,70 +52,20 @@ func (c *Client) new(ctx context.Context, deal ClientDeal) (func(*ClientDeal), e
}
return func(info *ClientDeal) {
info.PublishMessage = resp.StorageDealSubmission
info.PublishMessage = resp.PublishMessage
}, nil
}
func (c *Client) accepted(ctx context.Context, deal ClientDeal) (func(*ClientDeal), error) {
log.Infow("DEAL ACCEPTED!")
pubmsg := deal.PublishMessage.Message
pw, err := stmgr.GetMinerWorker(ctx, c.sm, nil, deal.Proposal.Provider)
dealId, err := c.node.ValidatePublishedDeal(ctx, deal.ClientDeal)
if err != nil {
return nil, xerrors.Errorf("getting miner worker failed: %w", err)
}
if pubmsg.From != pw {
return nil, xerrors.Errorf("deal wasn't published by storage provider: from=%s, provider=%s", pubmsg.From, deal.Proposal.Provider)
}
if pubmsg.To != actors.StorageMarketAddress {
return nil, xerrors.Errorf("deal publish message wasn't set to StorageMarket actor (to=%s)", pubmsg.To)
}
if pubmsg.Method != actors.SMAMethods.PublishStorageDeals {
return nil, xerrors.Errorf("deal publish message called incorrect method (method=%s)", pubmsg.Method)
}
var params actors.PublishStorageDealsParams
if err := params.UnmarshalCBOR(bytes.NewReader(pubmsg.Params)); err != nil {
return nil, err
}
dealIdx := -1
for i, storageDeal := range params.Deals {
// TODO: make it less hacky
sd := storageDeal
eq, err := cborutil.Equals(&deal.Proposal, &sd)
if err != nil {
return nil, err
}
if eq {
dealIdx = i
break
}
}
if dealIdx == -1 {
return nil, xerrors.Errorf("deal publish didn't contain our deal (message cid: %s)", deal.PublishMessage.Cid())
}
// TODO: timeout
_, ret, err := c.sm.WaitForMessage(ctx, deal.PublishMessage.Cid())
if err != nil {
return nil, xerrors.Errorf("waiting for deal publish message: %w", err)
}
if ret.ExitCode != 0 {
return nil, xerrors.Errorf("deal publish failed: exit=%d", ret.ExitCode)
}
var res actors.PublishStorageDealResponse
if err := res.UnmarshalCBOR(bytes.NewReader(ret.Return)); err != nil {
return nil, err
}
return func(info *ClientDeal) {
info.DealID = res.DealIDs[dealIdx]
info.DealID = dealId
}, nil
}
@ -131,103 +76,22 @@ func (c *Client) staged(ctx context.Context, deal ClientDeal) (func(*ClientDeal)
}
func (c *Client) sealing(ctx context.Context, deal ClientDeal) (func(*ClientDeal), error) {
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
sd, err := stmgr.GetStorageDeal(ctx, c.sm, deal.DealID, ts)
if err != nil {
// TODO: This may be fine for some errors
return false, false, xerrors.Errorf("failed to look up deal on chain: %w", err)
}
if sd.ActivationEpoch > 0 {
select {
case c.updated <- clientDealUpdate{
newState: api.DealComplete,
id: deal.ProposalCid,
}:
case <-c.stop:
}
return true, false, nil
}
return false, true, nil
}
called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (more bool, err error) {
defer func() {
if err != nil {
select {
case c.updated <- clientDealUpdate{
newState: api.DealComplete,
id: deal.ProposalCid,
err: xerrors.Errorf("handling applied event: %w", err),
}:
case <-c.stop:
}
}
}()
if msg == nil {
log.Error("timed out waiting for deal activation... what now?")
return false, nil
}
sd, err := stmgr.GetStorageDeal(ctx, c.sm, deal.DealID, ts)
if err != nil {
return false, xerrors.Errorf("failed to look up deal on chain: %w", err)
}
if sd.ActivationEpoch == 0 {
return false, xerrors.Errorf("deal wasn't active: deal=%d, parentState=%s, h=%d", deal.DealID, ts.ParentState(), ts.Height())
}
log.Infof("Storage deal %d activated at epoch %d", deal.DealID, sd.ActivationEpoch)
cb := func(err error) {
select {
case c.updated <- clientDealUpdate{
newState: api.DealComplete,
id: deal.ProposalCid,
err: err,
}:
case <-c.stop:
}
return false, nil
}
revert := func(ctx context.Context, ts *types.TipSet) error {
log.Warn("deal activation reverted; TODO: actually handle this!")
// TODO: Just go back to DealSealing?
return nil
}
err := c.node.OnDealSectorCommitted(ctx, deal.Proposal.Provider, deal.DealID, cb)
matchEvent := func(msg *types.Message) (bool, error) {
if msg.To != deal.Proposal.Provider {
return false, nil
}
if msg.Method != actors.MAMethods.ProveCommitSector {
return false, nil
}
var params actors.SectorProveCommitInfo
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return false, err
}
var found bool
for _, dealID := range params.DealIDs {
if dealID == deal.DealID {
found = true
break
}
}
return found, nil
}
if err := c.events.Called(checkFunc, called, revert, 3, build.SealRandomnessLookbackLimit, matchEvent); err != nil {
return nil, xerrors.Errorf("failed to set up called handler")
}
return nil, nil
return nil, err
}
func (c *Client) checkAskSignature(ask *types.SignedStorageAsk) error {
return c.node.ValidateAskSignature(ask)
}

View File

@ -0,0 +1,118 @@
package deals
// this file implements storagemarket.StorageClient
import (
"context"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/storagemarket"
)
func (c *Client) ListProviders(ctx context.Context) (<-chan storagemarket.StorageProviderInfo, error) {
providers, err := c.node.ListStorageProviders(ctx)
if err != nil {
return nil, err
}
out := make(chan storagemarket.StorageProviderInfo)
go func() {
for _, p := range providers {
select {
case out <- *p:
case <-ctx.Done():
return
}
}
}()
return out, nil
}
func (c *Client) ListDeals(ctx context.Context, addr address.Address) ([]actors.OnChainDeal, error) {
return c.node.ListClientDeals(ctx, addr)
}
func (c *Client) ListInProgressDeals(ctx context.Context) ([]storagemarket.ClientDeal, error) {
deals, err := c.List()
if err != nil {
return nil, err
}
out := make([]storagemarket.ClientDeal, len(deals))
for k, v := range deals {
out[k] = storagemarket.ClientDeal{
ProposalCid: v.ProposalCid,
Proposal: v.Proposal,
State: v.State,
Miner: v.Miner,
MinerWorker: v.MinerWorker,
DealID: v.DealID,
PublishMessage: v.PublishMessage,
}
}
return out, nil
}
func (c *Client) GetInProgressDeal(ctx context.Context, cid cid.Cid) (storagemarket.ClientDeal, error) {
deals, err := c.ListInProgressDeals(ctx)
if err != nil {
return storagemarket.ClientDeal{}, err
}
for _, deal := range deals {
if deal.ProposalCid == cid {
return deal, nil
}
}
return storagemarket.ClientDeal{}, xerrors.Errorf("couldn't find client deal")
}
func (c *Client) GetAsk(ctx context.Context, info storagemarket.StorageProviderInfo) (*storagemarket.StorageAsk, error) {
return c.QueryAsk(ctx, info.PeerID, info.Address)
}
func (c *Client) ProposeStorageDeal(ctx context.Context, addr address.Address, info *storagemarket.StorageProviderInfo, payloadCid cid.Cid, proposalExpiration storagemarket.Epoch, duration storagemarket.Epoch, price storagemarket.TokenAmount, collateral storagemarket.TokenAmount) (*storagemarket.ProposeStorageDealResult, error) {
proposal := ClientDealProposal{
Data: payloadCid,
PricePerEpoch: types.BigInt(price),
ProposalExpiration: uint64(proposalExpiration),
Duration: uint64(duration),
Client: addr,
ProviderAddress: info.Address,
MinerWorker: info.Worker,
MinerID: info.PeerID,
}
proposalCid, err := c.Start(ctx, proposal)
result := &storagemarket.ProposeStorageDealResult{
ProposalCid: proposalCid,
}
return result, err
}
func (c *Client) GetPaymentEscrow(ctx context.Context, addr address.Address) (storagemarket.Balance, error) {
balance, err := c.node.GetBalance(ctx, addr)
return balance, err
}
func (c *Client) AddPaymentEscrow(ctx context.Context, addr address.Address, amount storagemarket.TokenAmount) error {
return c.node.AddFunds(ctx, addr, amount)
}
var _ storagemarket.StorageClient = &Client{}

View File

@ -5,9 +5,10 @@ import (
"errors"
"sync"
cid "github.com/ipfs/go-cid"
datastore "github.com/ipfs/go-datastore"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"
@ -20,8 +21,7 @@ import (
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sectorblocks"
"github.com/filecoin-project/lotus/storagemarket"
)
var ProviderDsPrefix = "/deals/provider"
@ -47,9 +47,7 @@ type Provider struct {
ask *types.SignedStorageAsk
askLk sync.Mutex
secb *sectorblocks.SectorBlocks
sminer *storage.Miner
full api.FullNode
spn storagemarket.StorageProviderNode
// TODO: This will go away once storage market module + CAR
// is implemented
@ -83,7 +81,7 @@ var (
ErrDataTransferFailed = errors.New("deal data transfer failed")
)
func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, secb *sectorblocks.SectorBlocks, dag dtypes.StagingDAG, dataTransfer dtypes.ProviderDataTransfer, fullNode api.FullNode) (*Provider, error) {
func NewProvider(ds dtypes.MetadataDS, dag dtypes.StagingDAG, dataTransfer dtypes.ProviderDataTransfer, spn storagemarket.StorageProviderNode) (storagemarket.StorageProvider, error) {
addr, err := ds.Get(datastore.NewKey("miner-address"))
if err != nil {
return nil, err
@ -94,11 +92,9 @@ func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, secb *sectorblocks
}
h := &Provider{
sminer: sminer,
dag: dag,
dataTransfer: dataTransfer,
full: fullNode,
secb: secb,
spn: spn,
pricePerByteBlock: types.NewInt(3), // TODO: allow setting
minPieceSize: 256, // TODO: allow setting (BUT KEEP MIN 256! (because of how we fill sectors up))
@ -135,9 +131,12 @@ func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, secb *sectorblocks
return h, nil
}
func (p *Provider) Run(ctx context.Context) {
func (p *Provider) Run(ctx context.Context, host host.Host) {
// TODO: restore state
host.SetStreamHandler(storagemarket.DealProtocolID, p.HandleStream)
host.SetStreamHandler(storagemarket.AskProtocolID, p.HandleAskStream)
go func() {
defer log.Warn("quitting deal provider loop")
defer close(p.stopped)
@ -162,7 +161,7 @@ func (p *Provider) onIncoming(deal MinerDeal) {
if err := p.deals.Begin(deal.ProposalCid, &deal); err != nil {
// This can happen when client re-sends proposal
p.failDeal(deal.ProposalCid, err)
p.failDeal(context.TODO(), deal.ProposalCid, err)
log.Errorf("deal tracking failed: %s", err)
return
}
@ -180,7 +179,7 @@ func (p *Provider) onUpdated(ctx context.Context, update minerDealUpdate) {
log.Infof("Deal %s updated state to %s", update.id, api.DealStates[update.newState])
if update.err != nil {
log.Errorf("deal %s (newSt: %d) failed: %+v", update.id, update.newState, update.err)
p.failDeal(update.id, update.err)
p.failDeal(ctx, update.id, update.err)
return
}
var deal MinerDeal
@ -193,7 +192,7 @@ func (p *Provider) onUpdated(ctx context.Context, update minerDealUpdate) {
return nil
})
if err != nil {
p.failDeal(update.id, err)
p.failDeal(ctx, update.id, err)
return
}

View File

@ -5,13 +5,13 @@ import (
"context"
"time"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types"
datastore "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore"
inet "github.com/libp2p/go-libp2p-core/network"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/lotus/chain/types"
)
func (p *Provider) SetPrice(price types.BigInt, ttlsecs int64) error {
@ -41,7 +41,7 @@ func (p *Provider) SetPrice(price types.BigInt, ttlsecs int64) error {
return p.saveAsk(ssa)
}
func (p *Provider) getAsk(m address.Address) *types.SignedStorageAsk {
func (p *Provider) GetAsk(m address.Address) *types.SignedStorageAsk {
p.askLk.Lock()
defer p.askLk.Unlock()
if m != p.actor {
@ -69,7 +69,7 @@ func (p *Provider) HandleAskStream(s inet.Stream) {
func (p *Provider) processAskRequest(ar *AskRequest) *AskResponse {
return &AskResponse{
Ask: p.getAsk(ar.Miner),
Ask: p.GetAsk(ar.Miner),
}
}
@ -112,12 +112,12 @@ func (p *Provider) signAsk(a *types.StorageAsk) (*types.SignedStorageAsk, error)
return nil, err
}
worker, err := p.getWorker(p.actor)
worker, err := p.spn.GetMinerWorker(context.TODO(), p.actor)
if err != nil {
return nil, xerrors.Errorf("failed to get worker to sign ask: %w", err)
}
sig, err := p.full.WalletSign(context.TODO(), worker, b)
sig, err := p.spn.SignBytes(context.TODO(), worker, b)
if err != nil {
return nil, err
}
@ -141,20 +141,3 @@ func (p *Provider) saveAsk(a *types.SignedStorageAsk) error {
p.ask = a
return nil
}
func (c *Client) checkAskSignature(ask *types.SignedStorageAsk) error {
tss := c.sm.ChainStore().GetHeaviestTipSet().ParentState()
w, err := stmgr.GetMinerWorkerRaw(context.TODO(), c.sm, tss, ask.Ask.Miner)
if err != nil {
return xerrors.Errorf("failed to get worker for miner in ask", err)
}
sigb, err := cborutil.Dump(ask.Ask)
if err != nil {
return xerrors.Errorf("failed to re-serialize ask")
}
return ask.Signature.Verify(w, sigb)
}

View File

@ -1,21 +1,17 @@
package deals
import (
"bytes"
"context"
ipldfree "github.com/ipld/go-ipld-prime/impl/free"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
unixfile "github.com/ipfs/go-unixfs/file"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/padreader"
"github.com/filecoin-project/lotus/storage/sectorblocks"
"github.com/filecoin-project/lotus/storagemarket"
)
type providerHandlerFunc func(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error)
@ -43,7 +39,7 @@ func (p *Provider) handle(ctx context.Context, deal MinerDeal, cb providerHandle
// ACCEPTED
func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
head, err := p.full.ChainHead(ctx)
head, err := p.spn.MostRecentStateId(ctx)
if err != nil {
return nil, err
}
@ -63,7 +59,7 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
}
// check market funds
clientMarketBalance, err := p.full.StateMarketBalance(ctx, deal.Proposal.Client, nil)
clientMarketBalance, err := p.spn.GetBalance(ctx, deal.Proposal.Client)
if err != nil {
return nil, xerrors.Errorf("getting client market balance failed: %w", err)
}
@ -74,59 +70,36 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
return nil, xerrors.New("clientMarketBalance.Available too small")
}
waddr, err := p.full.StateMinerWorker(ctx, deal.Proposal.Provider, nil)
waddr, err := p.spn.GetMinerWorker(ctx, deal.Proposal.Provider)
if err != nil {
return nil, err
}
// TODO: check StorageCollateral (may be too large (or too small))
if err := p.full.MarketEnsureAvailable(ctx, waddr, deal.Proposal.StorageCollateral); err != nil {
if err := p.spn.EnsureFunds(ctx, waddr, storagemarket.TokenAmount(deal.Proposal.StorageCollateral)); err != nil {
return nil, err
}
log.Info("publishing deal")
params, err := actors.SerializeParams(&actors.PublishStorageDealsParams{
Deals: []actors.StorageDealProposal{deal.Proposal},
})
if err != nil {
return nil, xerrors.Errorf("serializing PublishStorageDeals params failed: ", err)
smDeal := storagemarket.MinerDeal{
Client: deal.Client,
Proposal: deal.Proposal,
ProposalCid: deal.ProposalCid,
State: deal.State,
Ref: deal.Ref,
SectorID: deal.SectorID,
}
// TODO: We may want this to happen after fetching data
smsg, err := p.full.MpoolPushMessage(ctx, &types.Message{
To: actors.StorageMarketAddress,
From: waddr,
Value: types.NewInt(0),
GasPrice: types.NewInt(0),
GasLimit: types.NewInt(1000000),
Method: actors.SMAMethods.PublishStorageDeals,
Params: params,
})
dealId, mcid, err := p.spn.PublishDeals(ctx, smDeal)
if err != nil {
return nil, err
}
r, err := p.full.StateWaitMsg(ctx, smsg.Cid())
if err != nil {
return nil, err
}
if r.Receipt.ExitCode != 0 {
return nil, xerrors.Errorf("publishing deal failed: exit %d", r.Receipt.ExitCode)
}
var resp actors.PublishStorageDealResponse
if err := resp.UnmarshalCBOR(bytes.NewReader(r.Receipt.Return)); err != nil {
return nil, err
}
if len(resp.DealIDs) != 1 {
return nil, xerrors.Errorf("got unexpected number of DealIDs from SMA")
}
log.Infof("fetching data for a deal %d", resp.DealIDs[0])
err = p.sendSignedResponse(&Response{
log.Infof("fetching data for a deal %d", dealId)
err = p.sendSignedResponse(ctx, &Response{
State: api.DealAccepted,
Proposal: deal.ProposalCid,
StorageDealSubmission: smsg,
Proposal: deal.ProposalCid,
PublishMessage: &mcid,
})
if err != nil {
return nil, err
@ -148,7 +121,7 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
// (see onDataTransferEvent)
_, err = p.dataTransfer.OpenPullDataChannel(ctx,
deal.Client,
&StorageDataTransferVoucher{Proposal: deal.ProposalCid, DealID: resp.DealIDs[0]},
&StorageDataTransferVoucher{Proposal: deal.ProposalCid, DealID: uint64(dealId)},
deal.Ref,
allSelector,
)
@ -162,39 +135,23 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
// STAGED
func (p *Provider) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
root, err := p.dag.Get(ctx, deal.Ref)
if err != nil {
return nil, xerrors.Errorf("failed to get file root for deal: %s", err)
}
sectorID, err := p.spn.OnDealComplete(
ctx,
storagemarket.MinerDeal{
Client: deal.Client,
Proposal: deal.Proposal,
ProposalCid: deal.ProposalCid,
State: deal.State,
Ref: deal.Ref,
DealID: deal.DealID,
},
"",
)
// TODO: abstract this away into ReadSizeCloser + implement different modes
n, err := unixfile.NewUnixfsFile(ctx, p.dag, root)
if err != nil {
return nil, xerrors.Errorf("cannot open unixfs file: %s", err)
return nil, err
}
uf, ok := n.(sectorblocks.UnixfsReader)
if !ok {
// we probably got directory, unsupported for now
return nil, xerrors.Errorf("unsupported unixfs file type")
}
// TODO: uf.Size() is user input, not trusted
// This won't be useful / here after we migrate to putting CARs into sectors
size, err := uf.Size()
if err != nil {
return nil, xerrors.Errorf("getting unixfs file size: %w", err)
}
if padreader.PaddedSize(uint64(size)) != deal.Proposal.PieceSize {
return nil, xerrors.Errorf("deal.Proposal.PieceSize didn't match padded unixfs file size")
}
sectorID, err := p.secb.AddUnixfsPiece(ctx, uf, deal.DealID)
if err != nil {
return nil, xerrors.Errorf("AddPiece failed: %s", err)
}
log.Warnf("New Sector: %d (deal %d)", sectorID, deal.DealID)
return func(deal *MinerDeal) {
deal.SectorID = sectorID
}, nil

View File

@ -0,0 +1,65 @@
package deals
// this file implements storagemarket.StorageClient
import (
"context"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/storagemarket"
)
func (p *Provider) AddAsk(price storagemarket.TokenAmount, ttlsecs int64) error {
return p.SetPrice(types.BigInt(price), ttlsecs)
}
func (p *Provider) ListAsks(addr address.Address) []*types.SignedStorageAsk {
ask := p.GetAsk(addr)
if ask != nil {
return []*types.SignedStorageAsk{ask}
}
return nil
}
func (p *Provider) ListDeals(ctx context.Context) ([]actors.OnChainDeal, error) {
return p.spn.ListProviderDeals(ctx, p.actor)
}
func (p *Provider) AddStorageCollateral(ctx context.Context, amount storagemarket.TokenAmount) error {
return p.spn.AddFunds(ctx, p.actor, amount)
}
func (p *Provider) GetStorageCollateral(ctx context.Context) (storagemarket.Balance, error) {
balance, err := p.spn.GetBalance(ctx, p.actor)
return balance, err
}
func (p *Provider) ListIncompleteDeals() ([]storagemarket.MinerDeal, error) {
var out []storagemarket.MinerDeal
var deals []MinerDeal
if err := p.deals.List(&deals); err != nil {
return nil, err
}
for _, deal := range deals {
out = append(out, storagemarket.MinerDeal{
Client: deal.Client,
Proposal: deal.Proposal,
ProposalCid: deal.ProposalCid,
State: deal.State,
Ref: deal.Ref,
DealID: deal.DealID,
SectorID: deal.SectorID,
})
}
return out, nil
}
var _ storagemarket.StorageProvider = &Provider{}

View File

@ -4,24 +4,21 @@ import (
"context"
"runtime"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/ipld/go-ipld-prime"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)
func (p *Provider) failDeal(id cid.Cid, cerr error) {
func (p *Provider) failDeal(ctx context.Context, id cid.Cid, cerr error) {
if err := p.deals.End(id); err != nil {
log.Warnf("deals.End: %s", err)
}
@ -33,7 +30,7 @@ func (p *Provider) failDeal(id cid.Cid, cerr error) {
log.Warnf("deal %s failed: %s", id, cerr)
err := p.sendSignedResponse(&Response{
err := p.sendSignedResponse(ctx, &Response{
State: api.DealFailed,
Message: cerr.Error(),
Proposal: id,
@ -72,7 +69,7 @@ func (p *Provider) readProposal(s inet.Stream) (proposal Proposal, err error) {
return
}
func (p *Provider) sendSignedResponse(resp *Response) error {
func (p *Provider) sendSignedResponse(ctx context.Context, resp *Response) error {
s, ok := p.conns[resp.Proposal]
if !ok {
return xerrors.New("couldn't send response: not connected")
@ -83,12 +80,12 @@ func (p *Provider) sendSignedResponse(resp *Response) error {
return xerrors.Errorf("serializing response: %w", err)
}
worker, err := p.getWorker(p.actor)
worker, err := p.spn.GetMinerWorker(ctx, p.actor)
if err != nil {
return err
}
sig, err := p.full.WalletSign(context.TODO(), worker, msg)
sig, err := p.spn.SignBytes(ctx, worker, msg)
if err != nil {
return xerrors.Errorf("failed to sign response message: %w", err)
}
@ -118,24 +115,6 @@ func (p *Provider) disconnect(deal MinerDeal) error {
return err
}
func (p *Provider) getWorker(miner address.Address) (address.Address, error) {
getworker := &types.Message{
To: miner,
From: miner,
Method: actors.MAMethods.GetWorkerAddr,
}
r, err := p.full.StateCall(context.TODO(), getworker, nil)
if err != nil {
return address.Undef, xerrors.Errorf("getting worker address: %w", err)
}
if r.ExitCode != 0 {
return address.Undef, xerrors.Errorf("getWorker call failed: %d", r.ExitCode)
}
return address.NewFromBytes(r.Return)
}
var _ datatransfer.RequestValidator = &ProviderRequestValidator{}
// ProviderRequestValidator validates data transfer requests for the provider

View File

@ -20,6 +20,7 @@ import (
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/deals"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/storagemarket"
)
var blockGenerator = blocksutil.NewBlockGenerator()
@ -74,12 +75,14 @@ func newClientDeal(minerID peer.ID, state api.DealState) (deals.ClientDeal, erro
}
return deals.ClientDeal{
Proposal: newProposal,
ProposalCid: proposalNd.Cid(),
PayloadCid: blockGenerator.Next().Cid(),
Miner: minerID,
MinerWorker: minerAddr,
State: state,
ClientDeal: storagemarket.ClientDeal{
Proposal: newProposal,
ProposalCid: proposalNd.Cid(),
PayloadCid: blockGenerator.Next().Cid(),
Miner: minerID,
MinerWorker: minerAddr,
State: state,
},
}, nil
}

View File

@ -4,12 +4,13 @@ import (
"bytes"
"errors"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
)
var (
@ -41,9 +42,6 @@ var (
DataTransferStates = []api.DealState{api.DealAccepted, api.DealUnknown}
)
const DealProtocolID = "/fil/storage/mk/1.0.1"
const AskProtocolID = "/fil/storage/ask/1.0.1"
type Proposal struct {
DealProposal *actors.StorageDealProposal
@ -58,7 +56,7 @@ type Response struct {
Proposal cid.Cid
// DealAccepted
StorageDealSubmission *types.SignedMessage
PublishMessage *cid.Cid
}
// TODO: Do we actually need this to be signed?

View File

@ -5,7 +5,7 @@ import (
"io"
"math"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
xerrors "golang.org/x/xerrors"
)

View File

@ -2,6 +2,7 @@ package main
import (
"fmt"
"github.com/filecoin-project/lotus/storagemarket"
"os"
gen "github.com/whyrusleeping/cbor-gen"
@ -122,6 +123,15 @@ func main() {
os.Exit(1)
}
err = gen.WriteTupleEncodersToFile("./storagemarket/cbor_gen.go", "storagemarket",
storagemarket.ClientDeal{},
storagemarket.MinerDeal{},
)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
err = gen.WriteTupleEncodersToFile("./chain/deals/cbor_gen.go", "deals",
deals.AskRequest{},
deals.AskResponse{},

View File

@ -47,6 +47,8 @@ import (
"github.com/filecoin-project/lotus/retrieval/discovery"
"github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sectorblocks"
"github.com/filecoin-project/lotus/storagemarket"
"github.com/filecoin-project/lotus/storagemarketadapter"
)
// special is a type used to give keys to modules which
@ -227,7 +229,8 @@ func Online() Option {
Override(new(dtypes.ClientDealStore), modules.NewClientDealStore),
Override(new(dtypes.ClientDataTransfer), modules.NewClientDAGServiceDataTransfer),
Override(new(*deals.ClientRequestValidator), deals.NewClientRequestValidator),
Override(new(*deals.Client), deals.NewClient),
Override(new(storagemarket.StorageClient), deals.NewClient),
Override(new(storagemarket.StorageClientNode), storagemarketadapter.NewClientNodeAdapter),
Override(RegisterClientValidatorKey, modules.RegisterClientValidator),
Override(RunDealClientKey, modules.RunDealClient),
@ -250,7 +253,8 @@ func Online() Option {
Override(new(dtypes.ProviderDealStore), modules.NewProviderDealStore),
Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer),
Override(new(*deals.ProviderRequestValidator), deals.NewProviderRequestValidator),
Override(new(*deals.Provider), deals.NewProvider),
Override(new(storagemarket.StorageProvider), deals.NewProvider),
Override(new(storagemarket.StorageProviderNode), storagemarketadapter.NewProviderNodeAdapter),
Override(RegisterProviderValidatorKey, modules.RegisterProviderValidator),
Override(HandleRetrievalKey, modules.HandleRetrieval),
Override(GetParamsKey, modules.GetParams),

View File

@ -27,13 +27,13 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/deals"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/impl/paych"
"github.com/filecoin-project/lotus/node/modules/dtypes"
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
"github.com/filecoin-project/lotus/storagemarket"
)
type API struct {
@ -44,7 +44,7 @@ type API struct {
full.WalletAPI
paych.PaychAPI
DealClient *deals.Client
SMDealClient storagemarket.StorageClient
RetDiscovery retrievalmarket.PeerResolver
Retrieval retrievalmarket.RetrievalClient
Chain *store.ChainStore
@ -72,28 +72,30 @@ func (a *API) ClientStartDeal(ctx context.Context, data cid.Cid, addr address.Ad
if err != nil {
return nil, xerrors.Errorf("failed getting miner worker: %w", err)
}
proposal := deals.ClientDealProposal{
Data: data,
PricePerEpoch: epochPrice,
ProposalExpiration: math.MaxUint64, // TODO: set something reasonable
Duration: blocksDuration,
Client: addr,
ProviderAddress: miner,
MinerWorker: mw,
MinerID: pid,
providerInfo := storagemarket.StorageProviderInfo{
Address: miner,
Worker: mw,
PeerID: pid,
}
result, err := a.SMDealClient.ProposeStorageDeal(
ctx,
addr,
&providerInfo,
data,
storagemarket.Epoch(math.MaxUint64),
storagemarket.Epoch(blocksDuration),
storagemarket.TokenAmount(epochPrice),
storagemarket.TokenAmount(storagemarket.EmptyInt))
c, err := a.DealClient.Start(ctx, proposal)
if err != nil {
return nil, xerrors.Errorf("failed to start deal: %w", err)
}
return &c, nil
return &result.ProposalCid, nil
}
func (a *API) ClientListDeals(ctx context.Context) ([]api.DealInfo, error) {
deals, err := a.DealClient.List()
deals, err := a.SMDealClient.ListInProgressDeals(ctx)
if err != nil {
return nil, err
}
@ -117,10 +119,11 @@ func (a *API) ClientListDeals(ctx context.Context) ([]api.DealInfo, error) {
}
func (a *API) ClientGetDealInfo(ctx context.Context, d cid.Cid) (*api.DealInfo, error) {
v, err := a.DealClient.GetDeal(d)
v, err := a.SMDealClient.GetInProgressDeal(ctx, d)
if err != nil {
return nil, err
}
return &api.DealInfo{
ProposalCid: v.ProposalCid,
State: v.State,
@ -315,5 +318,6 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, path
}
func (a *API) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*types.SignedStorageAsk, error) {
return a.DealClient.QueryAsk(ctx, p, miner)
info := storagemarket.StorageProviderInfo{Address: miner, PeerID: p}
return a.SMDealClient.GetAsk(ctx, info)
}

View File

@ -10,7 +10,6 @@ import (
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/blocksync"
"github.com/filecoin-project/lotus/chain/deals"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/sub"
"github.com/filecoin-project/lotus/node/hello"
@ -18,6 +17,7 @@ import (
"github.com/filecoin-project/lotus/peermgr"
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
"github.com/filecoin-project/lotus/retrieval/discovery"
"github.com/filecoin-project/lotus/storagemarket"
)
func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) {
@ -66,7 +66,7 @@ func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, pubsub *pu
go sub.HandleIncomingMessages(ctx, mpool, msgsub)
}
func RunDealClient(mctx helpers.MetricsCtx, lc fx.Lifecycle, c *deals.Client) {
func RunDealClient(mctx helpers.MetricsCtx, lc fx.Lifecycle, c storagemarket.StorageClient) {
ctx := helpers.LifecycleCtx(mctx, lc)
lc.Append(fx.Hook{

View File

@ -40,6 +40,7 @@ import (
"github.com/filecoin-project/lotus/retrievaladapter"
"github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sectorblocks"
"github.com/filecoin-project/lotus/storagemarket"
)
func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) {
@ -127,14 +128,12 @@ func HandleRetrieval(host host.Host, lc fx.Lifecycle, m retrievalmarket.Retrieva
})
}
func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h *deals.Provider) {
func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h storagemarket.StorageProvider) {
ctx := helpers.LifecycleCtx(mctx, lc)
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
h.Run(ctx)
host.SetStreamHandler(deals.DealProtocolID, h.HandleStream)
host.SetStreamHandler(deals.AskProtocolID, h.HandleAskStream)
h.Run(ctx, host)
return nil
},
OnStop: func(context.Context) error {

View File

@ -21,7 +21,7 @@ func (t *RetParams) MarshalCBOR(w io.Writer) error {
return err
}
// t.Unixfs0 (retrieval.Unixfs0Offer) (struct)
// t.Unixfs0 (retrievalimpl.Unixfs0Offer) (struct)
if err := t.Unixfs0.MarshalCBOR(w); err != nil {
return err
}
@ -43,7 +43,7 @@ func (t *RetParams) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("cbor input had wrong number of fields")
}
// t.Unixfs0 (retrieval.Unixfs0Offer) (struct)
// t.Unixfs0 (retrievalimpl.Unixfs0Offer) (struct)
{
@ -124,7 +124,7 @@ func (t *OldQueryResponse) MarshalCBOR(w io.Writer) error {
return err
}
// t.t.Status (retrieval.OldQueryResponseStatus) (uint64)
// t.Status (retrievalimpl.OldQueryResponseStatus) (uint64)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Status))); err != nil {
return err
}
@ -156,7 +156,7 @@ func (t *OldQueryResponse) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("cbor input had wrong number of fields")
}
// t.t.Status (retrieval.OldQueryResponseStatus) (uint64)
// t.Status (retrievalimpl.OldQueryResponseStatus) (uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
@ -166,7 +166,7 @@ func (t *OldQueryResponse) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("wrong type for uint64 field")
}
t.Status = OldQueryResponseStatus(extra)
// t.t.Size (uint64) (uint64)
// t.Size (uint64) (uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
@ -267,7 +267,7 @@ func (t *OldDealProposal) MarshalCBOR(w io.Writer) error {
return xerrors.Errorf("failed to write cid field t.Ref: %w", err)
}
// t.Params (retrieval.RetParams) (struct)
// t.Params (retrievalimpl.RetParams) (struct)
if err := t.Params.MarshalCBOR(w); err != nil {
return err
}
@ -310,7 +310,7 @@ func (t *OldDealProposal) UnmarshalCBOR(r io.Reader) error {
t.Ref = c
}
// t.Params (retrieval.RetParams) (struct)
// t.Params (retrievalimpl.RetParams) (struct)
{

View File

@ -3,8 +3,8 @@ package retrievaladapter
import (
"context"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
)

241
storagemarket/bigint.go Normal file
View File

@ -0,0 +1,241 @@
// Copied from lotus until this can be extracted into shared types
package storagemarket
import (
"encoding/json"
"fmt"
"io"
"math/big"
"github.com/filecoin-project/lotus/build"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/polydawn/refmt/obj/atlas"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
)
const BigIntMaxSerializedLen = 128 // is this big enough? or too big?
var TotalFilecoinInt = FromFil(build.TotalFilecoin)
func init() {
cbor.RegisterCborType(atlas.BuildEntry(BigInt{}).Transform().
TransformMarshal(atlas.MakeMarshalTransformFunc(
func(i BigInt) ([]byte, error) {
return i.cborBytes(), nil
})).
TransformUnmarshal(atlas.MakeUnmarshalTransformFunc(
func(x []byte) (BigInt, error) {
return fromCborBytes(x)
})).
Complete())
}
var EmptyInt = BigInt{}
type BigInt struct {
*big.Int
}
func NewInt(i uint64) BigInt {
return BigInt{big.NewInt(0).SetUint64(i)}
}
func FromFil(i uint64) BigInt {
return BigMul(NewInt(i), NewInt(build.FilecoinPrecision))
}
func BigFromBytes(b []byte) BigInt {
i := big.NewInt(0).SetBytes(b)
return BigInt{i}
}
func BigFromString(s string) (BigInt, error) {
v, ok := big.NewInt(0).SetString(s, 10)
if !ok {
return BigInt{}, fmt.Errorf("failed to parse string as a big int")
}
return BigInt{v}, nil
}
func BigMul(a, b BigInt) BigInt {
return BigInt{big.NewInt(0).Mul(a.Int, b.Int)}
}
func BigDiv(a, b BigInt) BigInt {
return BigInt{big.NewInt(0).Div(a.Int, b.Int)}
}
func BigMod(a, b BigInt) BigInt {
return BigInt{big.NewInt(0).Mod(a.Int, b.Int)}
}
func BigAdd(a, b BigInt) BigInt {
return BigInt{big.NewInt(0).Add(a.Int, b.Int)}
}
func BigSub(a, b BigInt) BigInt {
return BigInt{big.NewInt(0).Sub(a.Int, b.Int)}
}
func BigCmp(a, b BigInt) int {
return a.Int.Cmp(b.Int)
}
func (bi BigInt) Nil() bool {
return bi.Int == nil
}
// LessThan returns true if bi < o
func (bi BigInt) LessThan(o BigInt) bool {
return BigCmp(bi, o) < 0
}
// GreaterThan returns true if bi > o
func (bi BigInt) GreaterThan(o BigInt) bool {
return BigCmp(bi, o) > 0
}
// Equals returns true if bi == o
func (bi BigInt) Equals(o BigInt) bool {
return BigCmp(bi, o) == 0
}
func (bi *BigInt) MarshalJSON() ([]byte, error) {
return json.Marshal(bi.String())
}
func (bi *BigInt) UnmarshalJSON(b []byte) error {
var s string
if err := json.Unmarshal(b, &s); err != nil {
return err
}
i, ok := big.NewInt(0).SetString(s, 10)
if !ok {
if string(s) == "<nil>" {
return nil
}
return xerrors.Errorf("failed to parse bigint string: '%s'", string(b))
}
bi.Int = i
return nil
}
func (bi *BigInt) Scan(value interface{}) error {
switch value := value.(type) {
case string:
i, ok := big.NewInt(0).SetString(value, 10)
if !ok {
if value == "<nil>" {
return nil
}
return xerrors.Errorf("failed to parse bigint string: '%s'", value)
}
bi.Int = i
return nil
case int64:
bi.Int = big.NewInt(value)
return nil
default:
return xerrors.Errorf("non-string types unsupported: %T", value)
}
}
func (bi *BigInt) cborBytes() []byte {
if bi.Int == nil {
return []byte{}
}
switch {
case bi.Sign() > 0:
return append([]byte{0}, bi.Bytes()...)
case bi.Sign() < 0:
return append([]byte{1}, bi.Bytes()...)
default: // bi.Sign() == 0:
return []byte{}
}
}
func fromCborBytes(buf []byte) (BigInt, error) {
if len(buf) == 0 {
return NewInt(0), nil
}
var negative bool
switch buf[0] {
case 0:
negative = false
case 1:
negative = true
default:
return EmptyInt, fmt.Errorf("big int prefix should be either 0 or 1, got %d", buf[0])
}
i := big.NewInt(0).SetBytes(buf[1:])
if negative {
i.Neg(i)
}
return BigInt{i}, nil
}
func (bi *BigInt) MarshalCBOR(w io.Writer) error {
if bi.Int == nil {
zero := NewInt(0)
return zero.MarshalCBOR(w)
}
enc := bi.cborBytes()
header := cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(enc)))
if _, err := w.Write(header); err != nil {
return err
}
if _, err := w.Write(enc); err != nil {
return err
}
return nil
}
func (bi *BigInt) UnmarshalCBOR(br io.Reader) error {
maj, extra, err := cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajByteString {
return fmt.Errorf("cbor input for fil big int was not a byte string (%x)", maj)
}
if extra == 0 {
bi.Int = big.NewInt(0)
return nil
}
if extra > BigIntMaxSerializedLen {
return fmt.Errorf("big integer byte array too long")
}
buf := make([]byte, extra)
if _, err := io.ReadFull(br, buf); err != nil {
return err
}
i, err := fromCborBytes(buf)
if err != nil {
return err
}
*bi = i
return nil
}

352
storagemarket/cbor_gen.go Normal file
View File

@ -0,0 +1,352 @@
package storagemarket
import (
"fmt"
"io"
"github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"
xerrors "golang.org/x/xerrors"
)
// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT.
var _ = xerrors.Errorf
func (t *ClientDeal) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{136}); err != nil {
return err
}
// t.ProposalCid (cid.Cid) (struct)
if err := cbg.WriteCid(w, t.ProposalCid); err != nil {
return xerrors.Errorf("failed to write cid field t.ProposalCid: %w", err)
}
// t.Proposal (actors.StorageDealProposal) (struct)
if err := t.Proposal.MarshalCBOR(w); err != nil {
return err
}
// t.State (uint64) (uint64)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.State))); err != nil {
return err
}
// t.Miner (peer.ID) (string)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Miner)))); err != nil {
return err
}
if _, err := w.Write([]byte(t.Miner)); err != nil {
return err
}
// t.MinerWorker (address.Address) (struct)
if err := t.MinerWorker.MarshalCBOR(w); err != nil {
return err
}
// t.DealID (uint64) (uint64)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.DealID))); err != nil {
return err
}
// t.PayloadCid (cid.Cid) (struct)
if err := cbg.WriteCid(w, t.PayloadCid); err != nil {
return xerrors.Errorf("failed to write cid field t.PayloadCid: %w", err)
}
// t.PublishMessage (cid.Cid) (struct)
if t.PublishMessage == nil {
if _, err := w.Write(cbg.CborNull); err != nil {
return err
}
} else {
if err := cbg.WriteCid(w, *t.PublishMessage); err != nil {
return xerrors.Errorf("failed to write cid field t.PublishMessage: %w", err)
}
}
return nil
}
func (t *ClientDeal) UnmarshalCBOR(r io.Reader) error {
br := cbg.GetPeeker(r)
maj, extra, err := cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajArray {
return fmt.Errorf("cbor input should be of type array")
}
if extra != 8 {
return fmt.Errorf("cbor input had wrong number of fields")
}
// t.ProposalCid (cid.Cid) (struct)
{
c, err := cbg.ReadCid(br)
if err != nil {
return xerrors.Errorf("failed to read cid field t.ProposalCid: %w", err)
}
t.ProposalCid = c
}
// t.Proposal (actors.StorageDealProposal) (struct)
{
if err := t.Proposal.UnmarshalCBOR(br); err != nil {
return err
}
}
// t.State (uint64) (uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.State = uint64(extra)
// t.Miner (peer.ID) (string)
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
t.Miner = peer.ID(sval)
}
// t.MinerWorker (address.Address) (struct)
{
if err := t.MinerWorker.UnmarshalCBOR(br); err != nil {
return err
}
}
// t.DealID (uint64) (uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.DealID = uint64(extra)
// t.PayloadCid (cid.Cid) (struct)
{
c, err := cbg.ReadCid(br)
if err != nil {
return xerrors.Errorf("failed to read cid field t.PayloadCid: %w", err)
}
t.PayloadCid = c
}
// t.PublishMessage (cid.Cid) (struct)
{
pb, err := br.PeekByte()
if err != nil {
return err
}
if pb == cbg.CborNull[0] {
var nbuf [1]byte
if _, err := br.Read(nbuf[:]); err != nil {
return err
}
} else {
c, err := cbg.ReadCid(br)
if err != nil {
return xerrors.Errorf("failed to read cid field t.PublishMessage: %w", err)
}
t.PublishMessage = &c
}
}
return nil
}
func (t *MinerDeal) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{136}); err != nil {
return err
}
// t.ProposalCid (cid.Cid) (struct)
if err := cbg.WriteCid(w, t.ProposalCid); err != nil {
return xerrors.Errorf("failed to write cid field t.ProposalCid: %w", err)
}
// t.Proposal (actors.StorageDealProposal) (struct)
if err := t.Proposal.MarshalCBOR(w); err != nil {
return err
}
// t.Miner (peer.ID) (string)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Miner)))); err != nil {
return err
}
if _, err := w.Write([]byte(t.Miner)); err != nil {
return err
}
// t.Client (peer.ID) (string)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Client)))); err != nil {
return err
}
if _, err := w.Write([]byte(t.Client)); err != nil {
return err
}
// t.State (uint64) (uint64)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.State))); err != nil {
return err
}
// t.Ref (cid.Cid) (struct)
if err := cbg.WriteCid(w, t.Ref); err != nil {
return xerrors.Errorf("failed to write cid field t.Ref: %w", err)
}
// t.DealID (uint64) (uint64)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.DealID))); err != nil {
return err
}
// t.SectorID (uint64) (uint64)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.SectorID))); err != nil {
return err
}
return nil
}
func (t *MinerDeal) UnmarshalCBOR(r io.Reader) error {
br := cbg.GetPeeker(r)
maj, extra, err := cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajArray {
return fmt.Errorf("cbor input should be of type array")
}
if extra != 8 {
return fmt.Errorf("cbor input had wrong number of fields")
}
// t.ProposalCid (cid.Cid) (struct)
{
c, err := cbg.ReadCid(br)
if err != nil {
return xerrors.Errorf("failed to read cid field t.ProposalCid: %w", err)
}
t.ProposalCid = c
}
// t.Proposal (actors.StorageDealProposal) (struct)
{
if err := t.Proposal.UnmarshalCBOR(br); err != nil {
return err
}
}
// t.Miner (peer.ID) (string)
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
t.Miner = peer.ID(sval)
}
// t.Client (peer.ID) (string)
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
t.Client = peer.ID(sval)
}
// t.State (uint64) (uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.State = uint64(extra)
// t.Ref (cid.Cid) (struct)
{
c, err := cbg.ReadCid(br)
if err != nil {
return xerrors.Errorf("failed to read cid field t.Ref: %w", err)
}
t.Ref = c
}
// t.DealID (uint64) (uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.DealID = uint64(extra)
// t.SectorID (uint64) (uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.SectorID = uint64(extra)
return nil
}

199
storagemarket/types.go Normal file
View File

@ -0,0 +1,199 @@
package storagemarket
import (
"context"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types"
)
const DealProtocolID = "/fil/storage/mk/1.0.1"
const AskProtocolID = "/fil/storage/ask/1.0.1"
// type shims - used during migration into separate module
type Balance = actors.StorageParticipantBalance
type DealID uint64
type Signature = types.Signature
type StorageDeal = actors.OnChainDeal
type StorageAsk = types.SignedStorageAsk
type StateKey = *types.TipSet
type Epoch uint64
type TokenAmount BigInt
// Duplicated from deals package for now
type MinerDeal struct {
ProposalCid cid.Cid
Proposal actors.StorageDealProposal
Miner peer.ID
Client peer.ID
State api.DealState
Ref cid.Cid
DealID uint64
SectorID uint64 // Set when sm >= DealStaged
}
type ClientDeal struct {
ProposalCid cid.Cid
Proposal actors.StorageDealProposal
State api.DealState
Miner peer.ID
MinerWorker address.Address
DealID uint64
PayloadCid cid.Cid
PublishMessage *cid.Cid
}
// The interface provided for storage providers
type StorageProvider interface {
Run(ctx context.Context, host host.Host)
Stop()
AddAsk(price TokenAmount, ttlsecs int64) error
// ListAsks lists current asks
ListAsks(addr address.Address) []*StorageAsk
// ListDeals lists on-chain deals associated with this provider
ListDeals(ctx context.Context) ([]StorageDeal, error)
// ListIncompleteDeals lists deals that are in progress or rejected
ListIncompleteDeals() ([]MinerDeal, error)
// AddStorageCollateral adds storage collateral
AddStorageCollateral(ctx context.Context, amount TokenAmount) error
// GetStorageCollateral returns the current collateral balance
GetStorageCollateral(ctx context.Context) (Balance, error)
}
// Node dependencies for a StorageProvider
type StorageProviderNode interface {
MostRecentStateId(ctx context.Context) (StateKey, error)
// Adds funds with the StorageMinerActor for a storage participant. Used by both providers and clients.
AddFunds(ctx context.Context, addr address.Address, amount TokenAmount) error
// Ensures that a storage market participant has a certain amount of available funds
EnsureFunds(ctx context.Context, addr address.Address, amount TokenAmount) error
// GetBalance returns locked/unlocked for a storage participant. Used by both providers and clients.
GetBalance(ctx context.Context, addr address.Address) (Balance, error)
// Publishes deal on chain
PublishDeals(ctx context.Context, deal MinerDeal) (DealID, cid.Cid, error)
// ListProviderDeals lists all deals associated with a storage provider
ListProviderDeals(ctx context.Context, addr address.Address) ([]StorageDeal, error)
// Called when a deal is complete and on chain, and data has been transferred and is ready to be added to a sector
// returns sector id
OnDealComplete(ctx context.Context, deal MinerDeal, piecePath string) (uint64, error)
// returns the worker address associated with a miner
GetMinerWorker(ctx context.Context, miner address.Address) (address.Address, error)
// Signs bytes
SignBytes(ctx context.Context, signer address.Address, b []byte) (*types.Signature, error)
}
type DealSectorCommittedCallback func(error)
// Node dependencies for a StorageClient
type StorageClientNode interface {
MostRecentStateId(ctx context.Context) (StateKey, error)
// Adds funds with the StorageMinerActor for a storage participant. Used by both providers and clients.
AddFunds(ctx context.Context, addr address.Address, amount TokenAmount) error
EnsureFunds(ctx context.Context, addr address.Address, amount TokenAmount) error
// GetBalance returns locked/unlocked for a storage participant. Used by both providers and clients.
GetBalance(ctx context.Context, addr address.Address) (Balance, error)
//// ListClientDeals lists all on-chain deals associated with a storage client
ListClientDeals(ctx context.Context, addr address.Address) ([]StorageDeal, error)
// GetProviderInfo returns information about a single storage provider
//GetProviderInfo(stateId StateID, addr Address) *StorageProviderInfo
// GetStorageProviders returns information about known miners
ListStorageProviders(ctx context.Context) ([]*StorageProviderInfo, error)
// Subscribes to storage market actor state changes for a given address.
// TODO: Should there be a timeout option for this? In the case that we are waiting for funds to be deposited and it never happens?
//SubscribeStorageMarketEvents(addr Address, handler StorageMarketEventHandler) (SubID, error)
// Cancels a subscription
//UnsubscribeStorageMarketEvents(subId SubID)
ValidatePublishedDeal(ctx context.Context, deal ClientDeal) (uint64, error)
// SignProposal signs a proposal
SignProposal(ctx context.Context, signer address.Address, proposal *actors.StorageDealProposal) error
GetDefaultWalletAddress(ctx context.Context) (address.Address, error)
OnDealSectorCommitted(ctx context.Context, provider address.Address, dealId uint64, cb DealSectorCommittedCallback) error
ValidateAskSignature(ask *StorageAsk) error
}
type StorageClientProofs interface {
//GeneratePieceCommitment(piece io.Reader, pieceSize uint64) (CommP, error)
}
// Closely follows the MinerInfo struct in the spec
type StorageProviderInfo struct {
Address address.Address // actor address
Owner address.Address
Worker address.Address // signs messages
SectorSize uint64
PeerID peer.ID
// probably more like how much storage power, available collateral etc
}
type ProposeStorageDealResult struct {
ProposalCid cid.Cid
}
// The interface provided by the module to the outside world for storage clients.
type StorageClient interface {
Run(ctx context.Context)
Stop()
// ListProviders queries chain state and returns active storage providers
ListProviders(ctx context.Context) (<-chan StorageProviderInfo, error)
// ListDeals lists on-chain deals associated with this provider
ListDeals(ctx context.Context, addr address.Address) ([]StorageDeal, error)
// ListInProgressDeals lists deals that are in progress or rejected
ListInProgressDeals(ctx context.Context) ([]ClientDeal, error)
// ListInProgressDeals lists deals that are in progress or rejected
GetInProgressDeal(ctx context.Context, cid cid.Cid) (ClientDeal, error)
// GetAsk returns the current ask for a storage provider
GetAsk(ctx context.Context, info StorageProviderInfo) (*StorageAsk, error)
//// FindStorageOffers lists providers and queries them to find offers that satisfy some criteria based on price, duration, etc.
//FindStorageOffers(criteria AskCriteria, limit uint) []*StorageOffer
// ProposeStorageDeal initiates deal negotiation with a Storage Provider
ProposeStorageDeal(ctx context.Context, addr address.Address, info *StorageProviderInfo, payloadCid cid.Cid, proposalExpiration Epoch, duration Epoch, price TokenAmount, collateral TokenAmount) (*ProposeStorageDealResult, error)
// GetPaymentEscrow returns the current funds available for deal payment
GetPaymentEscrow(ctx context.Context, addr address.Address) (Balance, error)
// AddStorageCollateral adds storage collateral
AddPaymentEscrow(ctx context.Context, addr address.Address, amount TokenAmount) error
}

View File

@ -0,0 +1,328 @@
package storagemarketadapter
// this file implements storagemarket.StorageClientNode
import (
"bytes"
"context"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/market"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/storagemarket"
)
type ClientNodeAdapter struct {
full.StateAPI
full.ChainAPI
full.MpoolAPI
sm *stmgr.StateManager
cs *store.ChainStore
fm *market.FundMgr
ev *events.Events
}
type clientApi struct {
full.ChainAPI
full.StateAPI
}
func NewClientNodeAdapter(state full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, sm *stmgr.StateManager, cs *store.ChainStore, fm *market.FundMgr) storagemarket.StorageClientNode {
return &ClientNodeAdapter{
StateAPI: state,
ChainAPI: chain,
MpoolAPI: mpool,
sm: sm,
cs: cs,
fm: fm,
ev: events.NewEvents(context.TODO(), &clientApi{chain, state}),
}
}
func (n *ClientNodeAdapter) ListStorageProviders(ctx context.Context) ([]*storagemarket.StorageProviderInfo, error) {
ts, err := n.ChainHead(ctx)
if err != nil {
return nil, err
}
addresses, err := n.StateListMiners(ctx, ts)
if err != nil {
return nil, err
}
var out []*storagemarket.StorageProviderInfo
for _, addr := range addresses {
workerAddr, err := n.StateMinerWorker(ctx, addr, ts)
if err != nil {
return nil, err
}
sectorSize, err := n.StateMinerSectorSize(ctx, addr, ts)
if err != nil {
return nil, err
}
peerId, err := n.StateMinerPeerID(ctx, addr, ts)
if err != nil {
return nil, err
}
out = append(out, &storagemarket.StorageProviderInfo{
Address: addr,
Worker: workerAddr,
SectorSize: sectorSize,
PeerID: peerId,
})
}
return out, nil
}
func (n *ClientNodeAdapter) ListClientDeals(ctx context.Context, addr address.Address) ([]storagemarket.StorageDeal, error) {
allDeals, err := n.StateMarketDeals(ctx, nil)
if err != nil {
return nil, err
}
var out []actors.OnChainDeal
for _, deal := range allDeals {
if deal.Client == addr {
out = append(out, deal)
}
}
return out, nil
}
func (n *ClientNodeAdapter) MostRecentStateId(ctx context.Context) (storagemarket.StateKey, error) {
return n.ChainHead(ctx)
}
// Adds funds with the StorageMinerActor for a storage participant. Used by both providers and clients.
func (n *ClientNodeAdapter) AddFunds(ctx context.Context, addr address.Address, amount storagemarket.TokenAmount) error {
// (Provider Node API)
smsg, err := n.MpoolPushMessage(ctx, &types.Message{
To: actors.StorageMarketAddress,
From: addr,
Value: types.BigInt(amount),
GasPrice: types.NewInt(0),
GasLimit: types.NewInt(1000000),
Method: actors.SMAMethods.AddBalance,
})
if err != nil {
return err
}
r, err := n.StateWaitMsg(ctx, smsg.Cid())
if err != nil {
return err
}
if r.Receipt.ExitCode != 0 {
return xerrors.Errorf("adding funds to storage miner market actor failed: exit %d", r.Receipt.ExitCode)
}
return nil
}
func (n *ClientNodeAdapter) EnsureFunds(ctx context.Context, addr address.Address, amount storagemarket.TokenAmount) error {
return n.fm.EnsureAvailable(ctx, addr, types.BigInt(amount))
}
func (n *ClientNodeAdapter) GetBalance(ctx context.Context, addr address.Address) (storagemarket.Balance, error) {
bal, err := n.StateMarketBalance(ctx, addr, nil)
if err != nil {
return storagemarket.Balance{}, err
}
return bal, nil
}
// ValidatePublishedDeal validates that the provided deal has appeared on chain and references the same ClientDeal
// returns the Deal id if there is no error
func (c *ClientNodeAdapter) ValidatePublishedDeal(ctx context.Context, deal storagemarket.ClientDeal) (uint64, error) {
log.Infow("DEAL ACCEPTED!")
pubmsg, err := c.cs.GetMessage(*deal.PublishMessage)
if err != nil {
return 0, xerrors.Errorf("getting deal pubsish message: %w", err)
}
pw, err := stmgr.GetMinerWorker(ctx, c.sm, nil, deal.Proposal.Provider)
if err != nil {
return 0, xerrors.Errorf("getting miner worker failed: %w", err)
}
if pubmsg.From != pw {
return 0, xerrors.Errorf("deal wasn't published by storage provider: from=%s, provider=%s", pubmsg.From, deal.Proposal.Provider)
}
if pubmsg.To != actors.StorageMarketAddress {
return 0, xerrors.Errorf("deal publish message wasn't set to StorageMarket actor (to=%s)", pubmsg.To)
}
if pubmsg.Method != actors.SMAMethods.PublishStorageDeals {
return 0, xerrors.Errorf("deal publish message called incorrect method (method=%s)", pubmsg.Method)
}
var params actors.PublishStorageDealsParams
if err := params.UnmarshalCBOR(bytes.NewReader(pubmsg.Params)); err != nil {
return 0, err
}
dealIdx := -1
for i, storageDeal := range params.Deals {
// TODO: make it less hacky
sd := storageDeal
eq, err := cborutil.Equals(&deal.Proposal, &sd)
if err != nil {
return 0, err
}
if eq {
dealIdx = i
break
}
}
if dealIdx == -1 {
return 0, xerrors.Errorf("deal publish didn't contain our deal (message cid: %s)", deal.PublishMessage)
}
// TODO: timeout
_, ret, err := c.sm.WaitForMessage(ctx, *deal.PublishMessage)
if err != nil {
return 0, xerrors.Errorf("waiting for deal publish message: %w", err)
}
if ret.ExitCode != 0 {
return 0, xerrors.Errorf("deal publish failed: exit=%d", ret.ExitCode)
}
var res actors.PublishStorageDealResponse
if err := res.UnmarshalCBOR(bytes.NewReader(ret.Return)); err != nil {
return 0, err
}
return res.DealIDs[dealIdx], nil
}
func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealId uint64, cb storagemarket.DealSectorCommittedCallback) error {
checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) {
sd, err := stmgr.GetStorageDeal(ctx, c.StateManager, dealId, ts)
if err != nil {
// TODO: This may be fine for some errors
return false, false, xerrors.Errorf("failed to look up deal on chain: %w", err)
}
if sd.ActivationEpoch > 0 {
cb(nil)
return true, false, nil
}
return false, true, nil
}
called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (more bool, err error) {
defer func() {
if err != nil {
cb(xerrors.Errorf("handling applied event: %w", err))
}
}()
if msg == nil {
log.Error("timed out waiting for deal activation... what now?")
return false, nil
}
sd, err := stmgr.GetStorageDeal(ctx, c.StateManager, dealId, ts)
if err != nil {
return false, xerrors.Errorf("failed to look up deal on chain: %w", err)
}
if sd.ActivationEpoch == 0 {
return false, xerrors.Errorf("deal wasn't active: deal=%d, parentState=%s, h=%d", dealId, ts.ParentState(), ts.Height())
}
log.Infof("Storage deal %d activated at epoch %d", dealId, sd.ActivationEpoch)
cb(nil)
return false, nil
}
revert := func(ctx context.Context, ts *types.TipSet) error {
log.Warn("deal activation reverted; TODO: actually handle this!")
// TODO: Just go back to DealSealing?
return nil
}
matchEvent := func(msg *types.Message) (bool, error) {
if msg.To != provider {
return false, nil
}
if msg.Method != actors.MAMethods.ProveCommitSector {
return false, nil
}
var params actors.SectorProveCommitInfo
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return false, err
}
var found bool
for _, dealID := range params.DealIDs {
if dealID == dealId {
found = true
break
}
}
return found, nil
}
if err := c.ev.Called(checkFunc, called, revert, 3, build.SealRandomnessLookbackLimit, matchEvent); err != nil {
return xerrors.Errorf("failed to set up called handler")
}
return nil
}
func (n *ClientNodeAdapter) SignProposal(ctx context.Context, signer address.Address, proposal *actors.StorageDealProposal) error {
return api.SignWith(ctx, n.Wallet.Sign, signer, proposal)
}
func (n *ClientNodeAdapter) GetDefaultWalletAddress(ctx context.Context) (address.Address, error) {
return n.Wallet.GetDefault()
}
func (n *ClientNodeAdapter) ValidateAskSignature(ask *types.SignedStorageAsk) error {
tss := n.cs.GetHeaviestTipSet().ParentState()
w, err := stmgr.GetMinerWorkerRaw(context.TODO(), n.StateManager, tss, ask.Ask.Miner)
if err != nil {
return xerrors.Errorf("failed to get worker for miner in ask", err)
}
sigb, err := cborutil.Dump(ask.Ask)
if err != nil {
return xerrors.Errorf("failed to re-serialize ask")
}
return ask.Signature.Verify(w, sigb)
}
var _ storagemarket.StorageClientNode = &ClientNodeAdapter{}

View File

@ -0,0 +1,196 @@
package storagemarketadapter
// this file implements storagemarket.StorageProviderNode
import (
"bytes"
"context"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
unixfile "github.com/ipfs/go-unixfs/file"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/padreader"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/storage/sectorblocks"
"github.com/filecoin-project/lotus/storagemarket"
)
var log = logging.Logger("provideradapter")
type ProviderNodeAdapter struct {
api.FullNode
// this goes away with the data transfer module
dag dtypes.StagingDAG
secb *sectorblocks.SectorBlocks
}
func NewProviderNodeAdapter(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode {
return &ProviderNodeAdapter{
FullNode: full,
dag: dag,
secb: secb,
}
}
func (n *ProviderNodeAdapter) PublishDeals(ctx context.Context, deal storagemarket.MinerDeal) (storagemarket.DealID, cid.Cid, error) {
log.Info("publishing deal")
worker, err := n.StateMinerWorker(ctx, deal.Proposal.Provider, nil)
if err != nil {
return 0, cid.Undef, err
}
params, err := actors.SerializeParams(&actors.PublishStorageDealsParams{
Deals: []actors.StorageDealProposal{deal.Proposal},
})
if err != nil {
return 0, cid.Undef, xerrors.Errorf("serializing PublishStorageDeals params failed: ", err)
}
// TODO: We may want this to happen after fetching data
smsg, err := n.MpoolPushMessage(ctx, &types.Message{
To: actors.StorageMarketAddress,
From: worker,
Value: types.NewInt(0),
GasPrice: types.NewInt(0),
GasLimit: types.NewInt(1000000),
Method: actors.SMAMethods.PublishStorageDeals,
Params: params,
})
if err != nil {
return 0, cid.Undef, err
}
r, err := n.StateWaitMsg(ctx, smsg.Cid())
if err != nil {
return 0, cid.Undef, err
}
if r.Receipt.ExitCode != 0 {
return 0, cid.Undef, xerrors.Errorf("publishing deal failed: exit %d", r.Receipt.ExitCode)
}
var resp actors.PublishStorageDealResponse
if err := resp.UnmarshalCBOR(bytes.NewReader(r.Receipt.Return)); err != nil {
return 0, cid.Undef, err
}
if len(resp.DealIDs) != 1 {
return 0, cid.Undef, xerrors.Errorf("got unexpected number of DealIDs from")
}
return storagemarket.DealID(resp.DealIDs[0]), smsg.Cid(), nil
}
func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagemarket.MinerDeal, piecePath string) (uint64, error) {
root, err := n.dag.Get(ctx, deal.Ref)
if err != nil {
return 0, xerrors.Errorf("failed to get file root for deal: %s", err)
}
// TODO: abstract this away into ReadSizeCloser + implement different modes
node, err := unixfile.NewUnixfsFile(ctx, n.dag, root)
if err != nil {
return 0, xerrors.Errorf("cannot open unixfs file: %s", err)
}
uf, ok := node.(sectorblocks.UnixfsReader)
if !ok {
// we probably got directory, unsupported for now
return 0, xerrors.Errorf("unsupported unixfs file type")
}
// TODO: uf.Size() is user input, not trusted
// This won't be useful / here after we migrate to putting CARs into sectors
size, err := uf.Size()
if err != nil {
return 0, xerrors.Errorf("getting unixfs file size: %w", err)
}
if padreader.PaddedSize(uint64(size)) != deal.Proposal.PieceSize {
return 0, xerrors.Errorf("deal.Proposal.PieceSize didn't match padded unixfs file size")
}
sectorID, err := n.secb.AddUnixfsPiece(ctx, uf, deal.DealID)
if err != nil {
return 0, xerrors.Errorf("AddPiece failed: %s", err)
}
log.Warnf("New Sector: %d (deal %d)", sectorID, deal.DealID)
return sectorID, nil
}
func (n *ProviderNodeAdapter) ListProviderDeals(ctx context.Context, addr address.Address) ([]actors.OnChainDeal, error) {
allDeals, err := n.StateMarketDeals(ctx, nil)
if err != nil {
return nil, err
}
var out []actors.OnChainDeal
for _, deal := range allDeals {
if deal.Provider == addr {
out = append(out, deal)
}
}
return out, nil
}
func (n *ProviderNodeAdapter) GetMinerWorker(ctx context.Context, miner address.Address) (address.Address, error) {
return n.StateMinerWorker(ctx, miner, nil)
}
func (n *ProviderNodeAdapter) SignBytes(ctx context.Context, signer address.Address, b []byte) (*types.Signature, error) {
return n.WalletSign(ctx, signer, b)
}
func (n *ProviderNodeAdapter) EnsureFunds(ctx context.Context, addr address.Address, amt storagemarket.TokenAmount) error {
return n.MarketEnsureAvailable(ctx, addr, types.BigInt(amt))
}
func (n *ProviderNodeAdapter) MostRecentStateId(ctx context.Context) (storagemarket.StateKey, error) {
return n.ChainHead(ctx)
}
// Adds funds with the StorageMinerActor for a storage participant. Used by both providers and clients.
func (n *ProviderNodeAdapter) AddFunds(ctx context.Context, addr address.Address, amount storagemarket.TokenAmount) error {
// (Provider Node API)
smsg, err := n.MpoolPushMessage(ctx, &types.Message{
To: actors.StorageMarketAddress,
From: addr,
Value: types.BigInt(amount),
GasPrice: types.NewInt(0),
GasLimit: types.NewInt(1000000),
Method: actors.SMAMethods.AddBalance,
})
if err != nil {
return err
}
r, err := n.StateWaitMsg(ctx, smsg.Cid())
if err != nil {
return err
}
if r.Receipt.ExitCode != 0 {
return xerrors.Errorf("adding funds to storage miner market actor failed: exit %d", r.Receipt.ExitCode)
}
return nil
}
func (n *ProviderNodeAdapter) GetBalance(ctx context.Context, addr address.Address) (storagemarket.Balance, error) {
bal, err := n.StateMarketBalance(ctx, addr, nil)
if err != nil {
return storagemarket.Balance{}, err
}
return bal, nil
}
var _ storagemarket.StorageProviderNode = &ProviderNodeAdapter{}