diff --git a/api/api.go b/api/api.go index 9017921bd..8e9bd6b29 100644 --- a/api/api.go +++ b/api/api.go @@ -116,6 +116,7 @@ type FullNode interface { // ClientImport imports file under the specified path into filestore ClientImport(ctx context.Context, path string) (cid.Cid, error) + ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) // ClientUnimport removes references to the specified file from filestore //ClientUnimport(path string) diff --git a/api/client/client.go b/api/client/client.go index 1bf04d6a2..e40e0c854 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -7,6 +7,17 @@ import ( "github.com/filecoin-project/go-lotus/lib/jsonrpc" ) +// NewCommonRPC creates a new http jsonrpc client. +func NewCommonRPC(addr string, requestHeader http.Header) (api.Common, error) { + var res api.CommonStruct + _, err := jsonrpc.NewMergeClient(addr, "Filecoin", + []interface{}{ + &res.Internal, + }, requestHeader) + + return &res, err +} + // NewFullNodeRPC creates a new http jsonrpc client. func NewFullNodeRPC(addr string, requestHeader http.Header) (api.FullNode, error) { var res api.FullNodeStruct diff --git a/api/struct.go b/api/struct.go index c125c7bdd..332a36db9 100644 --- a/api/struct.go +++ b/api/struct.go @@ -62,8 +62,9 @@ type FullNodeStruct struct { WalletDefaultAddress func(context.Context) (address.Address, error) `perm:"write"` MpoolGetNonce func(context.Context, address.Address) (uint64, error) `perm:"read"` - ClientImport func(ctx context.Context, path string) (cid.Cid, error) `perm:"write"` - ClientListImports func(ctx context.Context) ([]Import, error) `perm:"read"` + ClientImport func(ctx context.Context, path string) (cid.Cid, error) `perm:"write"` + ClientListImports func(ctx context.Context) ([]Import, error) `perm:"read"` + ClientStartDeal func(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) `perm:"admin"` StateMinerSectors func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"` StateMinerProvingSet func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"` @@ -128,6 +129,10 @@ func (c *FullNodeStruct) ClientImport(ctx context.Context, path string) (cid.Cid return c.Internal.ClientImport(ctx, path) } +func (c *FullNodeStruct) ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) { + return c.Internal.ClientStartDeal(ctx, data, miner, price, blocksDuration) +} + func (c *FullNodeStruct) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) { return c.Internal.MpoolPending(ctx, ts) } diff --git a/chain/actors/actor_paych.go b/chain/actors/actor_paych.go index defe072c1..b514088b2 100644 --- a/chain/actors/actor_paych.go +++ b/chain/actors/actor_paych.go @@ -3,11 +3,12 @@ package actors import ( "bytes" + "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" + "github.com/filecoin-project/go-lotus/chain/actors/aerrors" "github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/types" - - cbor "github.com/ipfs/go-ipld-cbor" ) const ChannelClosingDelay = 6 * 60 * 2 // six hours @@ -19,10 +20,19 @@ func init() { cbor.RegisterCborType(Merge{}) cbor.RegisterCborType(LaneState{}) cbor.RegisterCborType(UpdateChannelState{}) + cbor.RegisterCborType(PaymentInfo{}) } type PaymentChannelActor struct{} +type PaymentInfo struct { + PayChActor address.Address + Payer address.Address + ChannelMessage cid.Cid + + Vouchers []SignedVoucher +} + type LaneState struct { Closed bool Redeemed types.BigInt diff --git a/chain/deals/client.go b/chain/deals/client.go new file mode 100644 index 000000000..742ed0384 --- /dev/null +++ b/chain/deals/client.go @@ -0,0 +1,237 @@ +package deals + +import ( + "context" + "github.com/filecoin-project/go-lotus/chain/actors" + "math" + + sectorbuilder "github.com/filecoin-project/go-sectorbuilder" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" + files "github.com/ipfs/go-ipfs-files" + cbor "github.com/ipfs/go-ipld-cbor" + logging "github.com/ipfs/go-log" + unixfile "github.com/ipfs/go-unixfs/file" + "github.com/libp2p/go-libp2p-core/host" + inet "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-lotus/chain/address" + "github.com/filecoin-project/go-lotus/chain/store" + "github.com/filecoin-project/go-lotus/chain/types" + "github.com/filecoin-project/go-lotus/chain/wallet" + "github.com/filecoin-project/go-lotus/lib/cborrpc" + "github.com/filecoin-project/go-lotus/node/modules/dtypes" +) + +func init() { + cbor.RegisterCborType(ClientDeal{}) +} + +var log = logging.Logger("deals") + +const ProtocolID = "/fil/storage/mk/1.0.0" + +type DealStatus int + +const ( + DealResolvingMiner = DealStatus(iota) +) + +type ClientDeal struct { + ProposalCid cid.Cid + Status DealStatus + Miner peer.ID +} + +type Client struct { + cs *store.ChainStore + h host.Host + w *wallet.Wallet + dag dtypes.ClientDAG + + deals StateStore + + incoming chan ClientDeal + + stop chan struct{} + stopped chan struct{} +} + +func NewClient(cs *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.MetadataDS, dag dtypes.ClientDAG) *Client { + c := &Client{ + cs: cs, + h: h, + w: w, + dag: dag, + + deals: StateStore{ds: namespace.Wrap(ds, datastore.NewKey("/deals/client"))}, + + incoming: make(chan ClientDeal, 16), + + stop: make(chan struct{}), + stopped: make(chan struct{}), + } + + return c +} + +func (c *Client) Run() { + go func() { + defer close(c.stopped) + + for { + select { + case deal := <-c.incoming: + log.Info("incoming deal") + + // TODO: track in datastore + if err := c.deals.Begin(deal.ProposalCid, deal); err != nil { + log.Errorf("deal state begin failed: %s", err) + continue + } + + case <-c.stop: + return + } + } + }() +} + +func (c *Client) commP(ctx context.Context, data cid.Cid) ([]byte, int64, error) { + root, err := c.dag.Get(ctx, data) + if err != nil { + log.Errorf("failed to get file root for deal: %s", err) + return nil, 0, err + } + + n, err := unixfile.NewUnixfsFile(ctx, c.dag, root) + if err != nil { + log.Errorf("cannot open unixfs file: %s", err) + return nil, 0, err + } + + uf, ok := n.(files.File) + if !ok { + // TODO: we probably got directory, how should we handle this in unixfs mode? + return nil, 0, xerrors.New("unsupported unixfs type") + } + + size, err := uf.Size() + if err != nil { + return nil, 0, err + } + + var commP [sectorbuilder.CommitmentBytesLen]byte + err = withTemp(uf, func(f string) error { + commP, err = sectorbuilder.GeneratePieceCommitment(f, uint64(size)) + return err + }) + return commP[:], size, err +} + +func (c *Client) sendProposal(s inet.Stream, proposal StorageDealProposal, from address.Address) error { + log.Info("Sending deal proposal") + + msg, err := cbor.DumpObject(proposal) + if err != nil { + return err + } + sig, err := c.w.Sign(from, msg) + if err != nil { + return err + } + + signedProposal := &SignedStorageDealProposal{ + Proposal: proposal, + Signature: sig, + } + + return cborrpc.WriteCborRPC(s, signedProposal) +} + +func (c *Client) waitAccept(s inet.Stream, proposal StorageDealProposal, minerID peer.ID) (ClientDeal, error) { + log.Info("Waiting for response") + + var resp SignedStorageDealResponse + if err := cborrpc.ReadCborRPC(s, &resp); err != nil { + log.Errorw("failed to read StorageDealResponse message", "error", err) + return ClientDeal{}, err + } + + // TODO: verify signature + + if resp.Response.State != Accepted { + return ClientDeal{}, xerrors.Errorf("Deal wasn't accepted (State=%d)", resp.Response.State) + } + + proposalNd, err := cbor.WrapObject(proposal, math.MaxUint64, -1) + if err != nil { + return ClientDeal{}, err + } + + if resp.Response.Proposal != proposalNd.Cid() { + return ClientDeal{}, xerrors.New("miner responded to a wrong proposal") + } + + return ClientDeal{ + ProposalCid: proposalNd.Cid(), + Status: DealResolvingMiner, + Miner: minerID, + }, nil +} + +func (c *Client) Start(ctx context.Context, data cid.Cid, totalPrice types.BigInt, from address.Address, miner address.Address, minerID peer.ID, blocksDuration uint64) (cid.Cid, error) { + commP, size, err := c.commP(ctx, data) + if err != nil { + return cid.Undef, err + } + + dummyCid, _ := cid.Parse("bafkqaaa") + + // TODO: use data + proposal := StorageDealProposal{ + PieceRef: data.String(), + SerializationMode: SerializationUnixFs, + CommP: commP[:], + Size: uint64(size), + TotalPrice: totalPrice, + Duration: blocksDuration, + Payment: actors.PaymentInfo{ + PayChActor: address.Address{}, + Payer: address.Address{}, + ChannelMessage: dummyCid, + Vouchers: nil, + }, + MinerAddress: miner, + ClientAddress: from, + } + + s, err := c.h.NewStream(ctx, minerID, ProtocolID) + if err != nil { + return cid.Undef, err + } + defer s.Reset() // TODO: handle other updates + + if err := c.sendProposal(s, proposal, from); err != nil { + return cid.Undef, err + } + + deal, err := c.waitAccept(s, proposal, minerID) + if err != nil { + return cid.Undef, err + } + + log.Info("DEAL ACCEPTED!") + + // TODO: actually care about what happens with the deal after it was accepted + //c.incoming <- deal + return deal.ProposalCid, nil +} + +func (c *Client) Stop() { + close(c.stop) + <-c.stopped +} diff --git a/chain/deals/handler.go b/chain/deals/handler.go new file mode 100644 index 000000000..5734bea9c --- /dev/null +++ b/chain/deals/handler.go @@ -0,0 +1,204 @@ +package deals + +import ( + "context" + "math" + + "github.com/filecoin-project/go-lotus/chain/address" + "github.com/filecoin-project/go-lotus/chain/wallet" + "github.com/filecoin-project/go-lotus/lib/sectorbuilder" + "github.com/filecoin-project/go-lotus/node/modules/dtypes" + + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" + cbor "github.com/ipfs/go-ipld-cbor" + inet "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" +) + +func init() { + cbor.RegisterCborType(MinerDeal{}) +} + +type MinerDeal struct { + Client peer.ID + Proposal StorageDealProposal + ProposalCid cid.Cid + State DealState + + Ref cid.Cid + + s inet.Stream +} + +type Handler struct { + w *wallet.Wallet + sb *sectorbuilder.SectorBuilder + + // TODO: Use a custom protocol or graphsync in the future + // TODO: GC + dag dtypes.StagingDAG + + deals StateStore + conns map[cid.Cid]inet.Stream + + actor address.Address + + incoming chan MinerDeal + updated chan dealUpdate + stop chan struct{} + stopped chan struct{} +} + +type dealUpdate struct { + newState DealState + id cid.Cid + err error +} + +func NewHandler(w *wallet.Wallet, ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder, dag dtypes.StagingDAG) (*Handler, error) { + addr, err := ds.Get(datastore.NewKey("miner-address")) + if err != nil { + return nil, err + } + minerAddress, err := address.NewFromBytes(addr) + if err != nil { + return nil, err + } + + return &Handler{ + w: w, + sb: sb, + dag: dag, + + conns: map[cid.Cid]inet.Stream{}, + + incoming: make(chan MinerDeal), + updated: make(chan dealUpdate), + stop: make(chan struct{}), + stopped: make(chan struct{}), + + actor: minerAddress, + + deals: StateStore{ds: namespace.Wrap(ds, datastore.NewKey("/deals/client"))}, + }, nil +} + +func (h *Handler) Run(ctx context.Context) { + // TODO: restore state + + go func() { + defer log.Error("quitting deal handler loop") + defer close(h.stopped) + + for { + select { + case deal := <-h.incoming: // Accepted + h.onIncoming(deal) + case update := <-h.updated: // Staged + h.onUpdated(ctx, update) + case <-h.stop: + return + } + } + }() +} + +func (h *Handler) onIncoming(deal MinerDeal) { + log.Info("incoming deal") + + h.conns[deal.ProposalCid] = deal.s + + if err := h.deals.Begin(deal.ProposalCid, deal); err != nil { + // This can happen when client re-sends proposal + h.failDeal(deal.ProposalCid, err) + log.Errorf("deal tracking failed: %s", err) + return + } + + go func() { + h.updated <- dealUpdate{ + newState: Accepted, + id: deal.ProposalCid, + err: nil, + } + }() +} + +func (h *Handler) onUpdated(ctx context.Context, update dealUpdate) { + log.Infof("Deal %s updated state to %d", update.id, update.newState) + if update.err != nil { + log.Errorf("deal %s failed: %s", update.id, update.err) + h.failDeal(update.id, update.err) + return + } + var deal MinerDeal + err := h.deals.MutateMiner(update.id, func(d *MinerDeal) error { + d.State = update.newState + deal = *d + return nil + }) + if err != nil { + h.failDeal(update.id, err) + return + } + + switch update.newState { + case Accepted: + h.handle(ctx, deal, h.accept, Staged) + case Staged: + h.handle(ctx, deal, h.staged, Sealing) + case Sealing: + log.Error("TODO") + } +} + +func (h *Handler) newDeal(s inet.Stream, proposal StorageDealProposal) (MinerDeal, error) { + // TODO: Review: Not signed? + proposalNd, err := cbor.WrapObject(proposal, math.MaxUint64, -1) + if err != nil { + return MinerDeal{}, err + } + + ref, err := cid.Parse(proposal.PieceRef) + if err != nil { + return MinerDeal{}, err + } + + return MinerDeal{ + Client: s.Conn().RemotePeer(), + Proposal: proposal, + ProposalCid: proposalNd.Cid(), + State: Unknown, + + Ref: ref, + + s: s, + }, nil +} + +func (h *Handler) HandleStream(s inet.Stream) { + log.Info("Handling storage deal proposal!") + + proposal, err := h.readProposal(s) + if err != nil { + log.Error(err) + s.Close() + return + } + + deal, err := h.newDeal(s, proposal.Proposal) + if err != nil { + log.Error(err) + s.Close() + return + } + + h.incoming <- deal +} + +func (h *Handler) Stop() { + close(h.stop) + <-h.stopped +} diff --git a/chain/deals/handler_states.go b/chain/deals/handler_states.go new file mode 100644 index 000000000..e9b6546d8 --- /dev/null +++ b/chain/deals/handler_states.go @@ -0,0 +1,102 @@ +package deals + +import ( + "context" + + files "github.com/ipfs/go-ipfs-files" + "github.com/ipfs/go-merkledag" + unixfile "github.com/ipfs/go-unixfs/file" + "golang.org/x/xerrors" +) + +type handlerFunc func(ctx context.Context, deal MinerDeal) error + +func (h *Handler) handle(ctx context.Context, deal MinerDeal, cb handlerFunc, next DealState) { + go func() { + err := cb(ctx, deal) + select { + case h.updated <- dealUpdate{ + newState: next, + id: deal.ProposalCid, + err: err, + }: + case <-h.stop: + } + }() +} + +// ACCEPTED + +func (h *Handler) accept(ctx context.Context, deal MinerDeal) error { + log.Info("acc") + switch deal.Proposal.SerializationMode { + //case SerializationRaw: + //case SerializationIPLD: + case SerializationUnixFs: + default: + return xerrors.Errorf("deal proposal with unsupported serialization: %s", deal.Proposal.SerializationMode) + } + + // TODO: check payment + + log.Info("fetching data for a deal") + err := h.sendSignedResponse(StorageDealResponse{ + State: Accepted, + Message: "", + Proposal: deal.ProposalCid, + }) + if err != nil { + return err + } + + return merkledag.FetchGraph(ctx, deal.Ref, h.dag) +} + +// STAGED + +func (h *Handler) staged(ctx context.Context, deal MinerDeal) error { + err := h.sendSignedResponse(StorageDealResponse{ + State: Staged, + Message: "", + Proposal: deal.ProposalCid, + }) + if err != nil { + log.Warnf("Sending deal response failed: %s", err) + } + + root, err := h.dag.Get(ctx, deal.Ref) + if err != nil { + return xerrors.Errorf("failed to get file root for deal: %s", err) + } + + // TODO: abstract this away into ReadSizeCloser + implement different modes + n, err := unixfile.NewUnixfsFile(ctx, h.dag, root) + if err != nil { + return xerrors.Errorf("cannot open unixfs file: %s", err) + } + + uf, ok := n.(files.File) + if !ok { + // we probably got directory, unsupported for now + return xerrors.Errorf("unsupported unixfs type") + } + + size, err := uf.Size() + if err != nil { + return xerrors.Errorf("failed to get file size: %s", err) + } + + var sectorID uint64 + err = withTemp(uf, func(f string) (err error) { + sectorID, err = h.sb.AddPiece(deal.Proposal.PieceRef, uint64(size), f) + return err + }) + if err != nil { + return xerrors.Errorf("AddPiece failed: %s", err) + } + + log.Warnf("New Sector: %d", sectorID) + return nil +} + +// SEALING diff --git a/chain/deals/handler_utils.go b/chain/deals/handler_utils.go new file mode 100644 index 000000000..6ef7bc69e --- /dev/null +++ b/chain/deals/handler_utils.go @@ -0,0 +1,99 @@ +package deals + +import ( + "runtime" + + "github.com/filecoin-project/go-lotus/lib/cborrpc" + cbor "github.com/ipfs/go-ipld-cbor" + inet "github.com/libp2p/go-libp2p-core/network" + + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" +) + +func (h *Handler) failDeal(id cid.Cid, cerr error) { + if err := h.deals.End(id); err != nil { + log.Warnf("deals.End: %s", err) + } + + if cerr == nil { + _, f, l, _ := runtime.Caller(1) + cerr = xerrors.Errorf("unknown error (fail called at %s:%d)", f, l) + } + + log.Errorf("deal %s failed: %s", id, cerr) + + err := h.sendSignedResponse(StorageDealResponse{ + State: Failed, + Message: cerr.Error(), + Proposal: id, + }) + + s, ok := h.conns[id] + if ok { + _ = s.Close() + delete(h.conns, id) + } + + if err != nil { + log.Warnf("notifying client about deal failure: %s", err) + } +} + +func (h *Handler) readProposal(s inet.Stream) (proposal SignedStorageDealProposal, err error) { + if err := cborrpc.ReadCborRPC(s, &proposal); err != nil { + log.Errorw("failed to read proposal message", "error", err) + return SignedStorageDealProposal{}, err + } + + // TODO: Validate proposal maybe + // (and signature, obviously) + + if proposal.Proposal.MinerAddress != h.actor { + log.Errorf("proposal with wrong MinerAddress: %s", proposal.Proposal.MinerAddress) + return SignedStorageDealProposal{}, err + } + + return +} + +func (h *Handler) sendSignedResponse(resp StorageDealResponse) error { + s, ok := h.conns[resp.Proposal] + if !ok { + return xerrors.New("couldn't send response: not connected") + } + + msg, err := cbor.DumpObject(&resp) + if err != nil { + return xerrors.Errorf("serializing response: %w", err) + } + + def, err := h.w.ListAddrs() + if err != nil { + log.Error(err) + return xerrors.Errorf("listing wallet addresses: %w", err) + } + if len(def) != 1 { + // NOTE: If this ever happens for a good reason, implement this with GetWorker on the miner actor + // TODO: implement with GetWorker on the miner actor + return xerrors.Errorf("expected only 1 address in wallet, got %d", len(def)) + } + + sig, err := h.w.Sign(def[0], msg) + if err != nil { + return xerrors.Errorf("failed to sign response message: %w", err) + } + + signedResponse := SignedStorageDealResponse{ + Response: resp, + Signature: sig, + } + + err = cborrpc.WriteCborRPC(s, signedResponse) + if err != nil { + // Assume client disconnected + s.Close() + delete(h.conns, resp.Proposal) + } + return err +} diff --git a/chain/deals/state_store.go b/chain/deals/state_store.go new file mode 100644 index 000000000..9f4dfd5ff --- /dev/null +++ b/chain/deals/state_store.go @@ -0,0 +1,108 @@ +package deals + +import ( + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + cbor "github.com/ipfs/go-ipld-cbor" + "golang.org/x/xerrors" +) + +type StateStore struct { + ds datastore.Datastore +} + +func (st *StateStore) Begin(i cid.Cid, state interface{}) error { + k := datastore.NewKey(i.String()) + has, err := st.ds.Has(k) + if err != nil { + return err + } + if has { + // TODO: uncomment after deals work + //return xerrors.Errorf("Already tracking state for %s", i) + } + + b, err := cbor.DumpObject(state) + if err != nil { + return err + } + + return st.ds.Put(k, b) +} + +func (st *StateStore) End(i cid.Cid) error { + k := datastore.NewKey(i.String()) + has, err := st.ds.Has(k) + if err != nil { + return err + } + if !has { + return xerrors.Errorf("No state for %s", i) + } + return st.ds.Delete(k) +} + +// When this gets used anywhere else, migrate to reflect + +func (st *StateStore) MutateMiner(i cid.Cid, mutator func(*MinerDeal) error) error { + return st.mutate(i, minerMutator(mutator)) +} + +func minerMutator(m func(*MinerDeal) error) func([]byte) ([]byte, error) { + return func(in []byte) ([]byte, error) { + var deal MinerDeal + err := cbor.DecodeInto(in, &deal) + if err != nil { + return nil, err + } + + if err := m(&deal); err != nil { + return nil, err + } + + return cbor.DumpObject(deal) + } +} + +func (st *StateStore) MutateClient(i cid.Cid, mutator func(*ClientDeal) error) error { + return st.mutate(i, clientMutator(mutator)) +} + +func clientMutator(m func(*ClientDeal) error) func([]byte) ([]byte, error) { + return func(in []byte) ([]byte, error) { + var deal ClientDeal + err := cbor.DecodeInto(in, &deal) + if err != nil { + return nil, err + } + + if err := m(&deal); err != nil { + return nil, err + } + + return cbor.DumpObject(deal) + } +} + +func (st *StateStore) mutate(i cid.Cid, mutator func([]byte) ([]byte, error)) error { + k := datastore.NewKey(i.String()) + has, err := st.ds.Has(k) + if err != nil { + return err + } + if !has { + return xerrors.Errorf("No state for %s", i) + } + + cur, err := st.ds.Get(k) + if err != nil { + return err + } + + mutated, err := mutator(cur) + if err != nil { + return err + } + + return st.ds.Put(k, mutated) +} diff --git a/chain/deals/types.go b/chain/deals/types.go new file mode 100644 index 000000000..fcb8d23ac --- /dev/null +++ b/chain/deals/types.go @@ -0,0 +1,89 @@ +package deals + +import ( + "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" + + "github.com/filecoin-project/go-lotus/chain/actors" + "github.com/filecoin-project/go-lotus/chain/address" + "github.com/filecoin-project/go-lotus/chain/types" +) + +func init() { + cbor.RegisterCborType(StorageDealProposal{}) + cbor.RegisterCborType(SignedStorageDealProposal{}) + + cbor.RegisterCborType(PieceInclusionProof{}) + + cbor.RegisterCborType(StorageDealResponse{}) + cbor.RegisterCborType(SignedStorageDealResponse{}) +} + +type SerializationMode string + +const ( + SerializationUnixFs = "UnixFs" + SerializationRaw = "Raw" + SerializationIPLD = "IPLD" +) + +type DealState int + +const ( + Unknown = iota + Rejected + Accepted + Started + Failed + Staged + Sealing + Complete +) + +type StorageDealProposal struct { + PieceRef string + SerializationMode SerializationMode + CommP []byte + + Size uint64 + TotalPrice types.BigInt + Duration uint64 + + Payment actors.PaymentInfo + + MinerAddress address.Address + ClientAddress address.Address +} + +type SignedStorageDealProposal struct { + Proposal StorageDealProposal + + Signature *types.Signature +} + +// response + +type PieceInclusionProof struct { + Position uint64 + ProofElements [32]byte +} + +type StorageDealResponse struct { + State DealState + + // Rejected / Accepted / Failed / Staged + Message string + Proposal cid.Cid + + // Sealing + PieceInclusionProof PieceInclusionProof + + // Complete + SectorCommitMessage *cid.Cid +} + +type SignedStorageDealResponse struct { + Response StorageDealResponse + + Signature *types.Signature +} diff --git a/chain/deals/utils.go b/chain/deals/utils.go new file mode 100644 index 000000000..a23bafe05 --- /dev/null +++ b/chain/deals/utils.go @@ -0,0 +1,28 @@ +package deals + +import ( + "io" + "io/ioutil" + "os" +) + +func withTemp(r io.Reader, cb func(string) error) error { + f, err := ioutil.TempFile(os.TempDir(), "lotus-temp-") + if err != nil { + return err + } + if _, err := io.Copy(f, r); err != nil { + return err + } + if err := f.Close(); err != nil { + return err + } + + err = cb(f.Name()) + if err != nil { + os.Remove(f.Name()) + return err + } + + return os.Remove(f.Name()) +} diff --git a/chain/wallet/wallet.go b/chain/wallet/wallet.go index a94ba69ef..0f6410960 100644 --- a/chain/wallet/wallet.go +++ b/chain/wallet/wallet.go @@ -38,6 +38,9 @@ func (w *Wallet) Sign(addr address.Address, msg []byte) (*types.Signature, error if err != nil { return nil, err } + if ki == nil { + return nil, xerrors.Errorf("signing using key '%s': %w", addr.String(), repo.ErrKeyNotFound) + } switch ki.Type { case types.KTSecp256k1: diff --git a/cli/chain.go b/cli/chain.go index a3459a38b..485964758 100644 --- a/cli/chain.go +++ b/cli/chain.go @@ -23,7 +23,7 @@ var chainHeadCmd = &cli.Command{ Name: "head", Usage: "Print chain head", Action: func(cctx *cli.Context) error { - api, err := GetAPI(cctx) + api, err := GetFullNodeAPI(cctx) if err != nil { return err } @@ -51,7 +51,7 @@ var chainGetBlock = &cli.Command{ }, }, Action: func(cctx *cli.Context) error { - api, err := GetAPI(cctx) + api, err := GetFullNodeAPI(cctx) if err != nil { return err } diff --git a/cli/client.go b/cli/client.go index b1b49e011..61c58678c 100644 --- a/cli/client.go +++ b/cli/client.go @@ -2,8 +2,14 @@ package cli import ( "fmt" + "strconv" + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" "gopkg.in/urfave/cli.v2" + + "github.com/filecoin-project/go-lotus/chain/address" + "github.com/filecoin-project/go-lotus/chain/types" ) var clientCmd = &cli.Command{ @@ -12,6 +18,7 @@ var clientCmd = &cli.Command{ Subcommands: []*cli.Command{ clientImportCmd, clientLocalCmd, + clientDealCmd, }, } @@ -19,7 +26,7 @@ var clientImportCmd = &cli.Command{ Name: "import", Usage: "Import data", Action: func(cctx *cli.Context) error { - api, err := GetAPI(cctx) + api, err := GetFullNodeAPI(cctx) if err != nil { return err } @@ -38,7 +45,7 @@ var clientLocalCmd = &cli.Command{ Name: "local", Usage: "List locally imported data", Action: func(cctx *cli.Context) error { - api, err := GetAPI(cctx) + api, err := GetFullNodeAPI(cctx) if err != nil { return err } @@ -54,3 +61,50 @@ var clientLocalCmd = &cli.Command{ return nil }, } + +var clientDealCmd = &cli.Command{ + Name: "deal", + Usage: "Initialize storage deal with a miner", + Action: func(cctx *cli.Context) error { + api, err := GetFullNodeAPI(cctx) + if err != nil { + return err + } + ctx := ReqContext(cctx) + + if cctx.NArg() != 4 { + return xerrors.New("expected 4 args: dataCid, miner, price, duration") + } + + // [data, miner, dur] + + data, err := cid.Parse(cctx.Args().Get(0)) + if err != nil { + return err + } + + miner, err := address.NewFromString(cctx.Args().Get(1)) + if err != nil { + return err + } + + // TODO: parse bigint + price, err := strconv.ParseInt(cctx.Args().Get(2), 10, 32) + if err != nil { + return err + } + + dur, err := strconv.ParseInt(cctx.Args().Get(3), 10, 32) + if err != nil { + return err + } + + proposal, err := api.ClientStartDeal(ctx, data, miner, types.NewInt(uint64(price)), uint64(dur)) + if err != nil { + return err + } + + fmt.Println(proposal) + return nil + }, +} diff --git a/cli/cmd.go b/cli/cmd.go index c1faa92f1..a5c2abc95 100644 --- a/cli/cmd.go +++ b/cli/cmd.go @@ -53,7 +53,21 @@ func getAPI(ctx *cli.Context, repoFlag string) (string, http.Header, error) { return "ws://" + addr + "/rpc/v0", headers, nil } -func GetAPI(ctx *cli.Context) (api.FullNode, error) { +func GetAPI(ctx *cli.Context) (api.Common, error) { + f := "repo" + if ctx.String("storagerepo") != "" { + f = "storagerepo" + } + + addr, headers, err := getAPI(ctx, f) + if err != nil { + return nil, err + } + + return client.NewCommonRPC(addr, headers) +} + +func GetFullNodeAPI(ctx *cli.Context) (api.FullNode, error) { addr, headers, err := getAPI(ctx, "repo") if err != nil { return nil, err diff --git a/cli/createminer.go b/cli/createminer.go index 93d65298a..cbbe4e5af 100644 --- a/cli/createminer.go +++ b/cli/createminer.go @@ -22,7 +22,7 @@ var createMinerCmd = &cli.Command{ return fmt.Errorf("must pass four arguments: worker address, owner address, sector size, peer ID") } - api, err := GetAPI(cctx) + api, err := GetFullNodeAPI(cctx) if err != nil { return err } diff --git a/cli/miner.go b/cli/miner.go index a18bc9d50..0ca63fd5c 100644 --- a/cli/miner.go +++ b/cli/miner.go @@ -18,7 +18,7 @@ var minerStart = &cli.Command{ Name: "start", Usage: "start mining", Action: func(cctx *cli.Context) error { - api, err := GetAPI(cctx) + api, err := GetFullNodeAPI(cctx) if err != nil { return err } diff --git a/cli/mpool.go b/cli/mpool.go index 4ff97daa3..f37402e52 100644 --- a/cli/mpool.go +++ b/cli/mpool.go @@ -18,7 +18,7 @@ var mpoolPending = &cli.Command{ Name: "pending", Usage: "Get pending messages", Action: func(cctx *cli.Context) error { - api, err := GetAPI(cctx) + api, err := GetFullNodeAPI(cctx) if err != nil { return err } diff --git a/cli/send.go b/cli/send.go index 37d2d334d..47837c747 100644 --- a/cli/send.go +++ b/cli/send.go @@ -18,7 +18,7 @@ var sendCmd = &cli.Command{ }, }, Action: func(cctx *cli.Context) error { - api, err := GetAPI(cctx) + api, err := GetFullNodeAPI(cctx) if err != nil { return err } diff --git a/cli/state.go b/cli/state.go index e195a6e12..9f1f8851b 100644 --- a/cli/state.go +++ b/cli/state.go @@ -25,7 +25,7 @@ var statePowerCmd = &cli.Command{ Name: "power", Usage: "Query network or miner power", Action: func(cctx *cli.Context) error { - api, err := GetAPI(cctx) + api, err := GetFullNodeAPI(cctx) if err != nil { return err } @@ -77,7 +77,7 @@ var stateSectorsCmd = &cli.Command{ Name: "sectors", Usage: "Query the sector set of a miner", Action: func(cctx *cli.Context) error { - api, err := GetAPI(cctx) + api, err := GetFullNodeAPI(cctx) if err != nil { return err } @@ -110,7 +110,7 @@ var stateProvingSetCmd = &cli.Command{ Name: "proving", Usage: "Query the proving set of a miner", Action: func(cctx *cli.Context) error { - api, err := GetAPI(cctx) + api, err := GetFullNodeAPI(cctx) if err != nil { return err } diff --git a/cli/wallet.go b/cli/wallet.go index 34e2376b7..6d70ff948 100644 --- a/cli/wallet.go +++ b/cli/wallet.go @@ -21,7 +21,7 @@ var walletNew = &cli.Command{ Name: "new", Usage: "Generate a new key of the given type (bls or secp256k1)", Action: func(cctx *cli.Context) error { - api, err := GetAPI(cctx) + api, err := GetFullNodeAPI(cctx) if err != nil { return err } @@ -47,7 +47,7 @@ var walletList = &cli.Command{ Name: "list", Usage: "List wallet address", Action: func(cctx *cli.Context) error { - api, err := GetAPI(cctx) + api, err := GetFullNodeAPI(cctx) if err != nil { return err } @@ -69,7 +69,7 @@ var walletBalance = &cli.Command{ Name: "balance", Usage: "get account balance", Action: func(cctx *cli.Context) error { - api, err := GetAPI(cctx) + api, err := GetFullNodeAPI(cctx) if err != nil { return err } diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index 92005df13..fcc55d1bf 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -37,7 +37,7 @@ var initCmd = &cli.Command{ log.Info("Trying to connect to full node RPC") - api, err := lcli.GetAPI(cctx) // TODO: consider storing full node address in config + api, err := lcli.GetFullNodeAPI(cctx) // TODO: consider storing full node address in config if err != nil { return err } diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index 0e3d06932..8696c7721 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -28,7 +28,7 @@ var runCmd = &cli.Command{ }, }, Action: func(cctx *cli.Context) error { - nodeApi, err := lcli.GetAPI(cctx) + nodeApi, err := lcli.GetFullNodeAPI(cctx) if err != nil { return err } @@ -80,7 +80,15 @@ var runCmd = &cli.Command{ return err } - // TODO: libp2p node + // Bootstrap with full node + remoteAddrs, err := nodeApi.NetAddrsListen(ctx) + if err != nil { + return err + } + + if err := minerapi.NetConnect(ctx, remoteAddrs); err != nil { + return err + } log.Infof("Remote version %s", v) diff --git a/go.mod b/go.mod index 29d51b76a..01fa1e6c3 100644 --- a/go.mod +++ b/go.mod @@ -5,14 +5,12 @@ go 1.12 require ( contrib.go.opencensus.io/exporter/jaeger v0.1.0 github.com/BurntSushi/toml v0.3.1 - github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect - github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 // indirect github.com/filecoin-project/go-bls-sigs v0.0.0-20190718224239-4bc4b8a7bbf8 github.com/filecoin-project/go-leb128 v0.0.0-20190212224330-8d79a5489543 github.com/filecoin-project/go-sectorbuilder v0.0.0-00010101000000-000000000000 github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1 github.com/gorilla/websocket v1.4.0 - github.com/ipfs/go-bitswap v0.1.5 + github.com/ipfs/go-bitswap v0.1.6-0.20190808170517-167327fc3c5e github.com/ipfs/go-block-format v0.0.2 github.com/ipfs/go-blockservice v0.1.2 github.com/ipfs/go-car v0.0.1 @@ -62,7 +60,6 @@ require ( github.com/multiformats/go-multihash v0.0.6 github.com/pkg/errors v0.8.1 github.com/polydawn/refmt v0.0.0-20190731040541-eff0b363297a - github.com/prometheus/common v0.6.0 github.com/smartystreets/assertions v1.0.1 // indirect github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945 // indirect github.com/stretchr/testify v1.3.0 diff --git a/go.sum b/go.sum index 599dd9db3..81115e8b3 100644 --- a/go.sum +++ b/go.sum @@ -14,16 +14,11 @@ github.com/Stebalien/go-bitfield v0.0.1 h1:X3kbSSPUaJK60wV2hjOPZwmpljr6VGCqdq4cB github.com/Stebalien/go-bitfield v0.0.1/go.mod h1:GNjFpasyUVkHMsfEOk8EFLJ9syQ6SI+XWrX9Wf2XH0s= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= -github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= -github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= -github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1CELW+XaDYmOH4hkBN4/N9og/AsOv7E= -github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/apache/thrift v0.12.0 h1:pODnxUFNcjP9UTLZGTdeh+j16A8lJbRvD3rOtrk/7bs= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= -github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8= github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI= github.com/btcsuite/btcd v0.0.0-20190605094302-a0d1e3e36d50 h1:4i3KsuVA0o0KoBxAC5x+MY7RbteiMK1V7gf/G08NGIQ= @@ -77,7 +72,6 @@ github.com/go-check/check v0.0.0-20180628173108-788fd7840127 h1:0gkP6mzaMqkmpcJY github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= -github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= @@ -128,8 +122,8 @@ github.com/ipfs/go-bitswap v0.0.1/go.mod h1:z+tP3h+HTJ810n1R5yMy2ccKFffJ2F6Vqm/5 github.com/ipfs/go-bitswap v0.1.0/go.mod h1:FFJEf18E9izuCqUtHxbWEvq+reg7o4CW5wSAE1wsxj0= github.com/ipfs/go-bitswap v0.1.2/go.mod h1:qxSWS4NXGs7jQ6zQvoPY3+NmOfHHG47mhkiLzBpJQIs= github.com/ipfs/go-bitswap v0.1.3/go.mod h1:YEQlFy0kkxops5Vy+OxWdRSEZIoS7I7KDIwoa5Chkps= -github.com/ipfs/go-bitswap v0.1.5 h1:pgajlrTCFbbPgYJ234M1pssneLuIsVuxtfpx1I4cz3Y= -github.com/ipfs/go-bitswap v0.1.5/go.mod h1:TOWoxllhccevbWFUR2N7B1MTSVVge1s6XSMiCSA4MzM= +github.com/ipfs/go-bitswap v0.1.6-0.20190808170517-167327fc3c5e h1:LiCecZPwRrr6m91+HfyXiawmAT1/t7h9OXwoNjS6bjY= +github.com/ipfs/go-bitswap v0.1.6-0.20190808170517-167327fc3c5e/go.mod h1:TOWoxllhccevbWFUR2N7B1MTSVVge1s6XSMiCSA4MzM= github.com/ipfs/go-block-format v0.0.1/go.mod h1:DK/YYcsSUIVAFNwo/KZCdIIbpN0ROH/baNLgayt4pFc= github.com/ipfs/go-block-format v0.0.2 h1:qPDvcP19izTjU8rgo6p7gTXZlkMkF5bz5G3fqIsSCPE= github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJgCQF+KIgOPJY= @@ -157,8 +151,6 @@ github.com/ipfs/go-filestore v0.0.2 h1:pcYwpjtXXwirtbjBXKVJM9CTa9F7/8v1EkfnDaHTO github.com/ipfs/go-filestore v0.0.2/go.mod h1:KnZ41qJsCt2OX2mxZS0xsK3Psr0/oB93HMMssLujjVc= github.com/ipfs/go-fs-lock v0.0.1 h1:XHX8uW4jQBYWHj59XXcjg7BHlHxV9ZOYs6Y43yb7/l0= github.com/ipfs/go-fs-lock v0.0.1/go.mod h1:DNBekbboPKcxs1aukPSaOtFA3QfSdi5C855v0i9XJ8Y= -github.com/ipfs/go-hamt-ipld v0.0.10 h1:jmJGsV/8OPpBEmO+b1nAPpqX8SG2kLeYveKk8F7IxG4= -github.com/ipfs/go-hamt-ipld v0.0.10/go.mod h1:WrX60HHX2SeMb602Z1s9Ztnf/4fzNHzwH9gxNTVpEmk= github.com/ipfs/go-hamt-ipld v0.0.11 h1:iUHlbycdlheWf7QLU3FjHonK2lEnd+/85SeM5gvcUZE= github.com/ipfs/go-hamt-ipld v0.0.11/go.mod h1:WrX60HHX2SeMb602Z1s9Ztnf/4fzNHzwH9gxNTVpEmk= github.com/ipfs/go-ipfs-blockstore v0.0.1 h1:O9n3PbmTYZoNhkgkEyrXTznbmktIXif62xLX+8dPHzc= @@ -229,7 +221,6 @@ github.com/jbenet/goprocess v0.1.3 h1:YKyIEECS/XvcfHtBzxtjBBbWK+MbvA6dG8ASiqwvr1 github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= -github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= @@ -412,8 +403,6 @@ github.com/minio/sha256-simd v0.1.0/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= -github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= -github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= github.com/mr-tron/base58 v1.1.1/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= github.com/mr-tron/base58 v1.1.2 h1:ZEw4I2EgPKDJ2iEw0cNmLB3ROrEmkOtXIkaG7wZg+78= @@ -465,21 +454,13 @@ github.com/polydawn/refmt v0.0.0-20190408063855-01bf1e26dd14 h1:2m16U/rLwVaRdz7A github.com/polydawn/refmt v0.0.0-20190408063855-01bf1e26dd14/go.mod h1:uIp+gprXxxrWSjjklXD+mN4wed/tMfjMMmN/9+JsA9o= github.com/polydawn/refmt v0.0.0-20190731040541-eff0b363297a h1:TdavzKWkPcC2G+6rKJclm/JfrWC6WZFfLUR7EJJX8MA= github.com/polydawn/refmt v0.0.0-20190731040541-eff0b363297a/go.mod h1:uIp+gprXxxrWSjjklXD+mN4wed/tMfjMMmN/9+JsA9o= -github.com/polydawn/refmt v0.0.0-20190804001829-26ba426d088b h1:JWrXOvqGFU2mv58NZSqEinWkezjkcGam1jNKSIV5Meg= -github.com/polydawn/refmt v0.0.0-20190804001829-26ba426d088b/go.mod h1:uIp+gprXxxrWSjjklXD+mN4wed/tMfjMMmN/9+JsA9o= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs= -github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= -github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= -github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= -github.com/prometheus/common v0.6.0 h1:kRhiuYSXR3+uv2IbVbZhUxK5zVD/2pp3Gd2PpvPkpEo= -github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+Zk0j9GMYc= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= -github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= @@ -492,7 +473,6 @@ github.com/smartystreets/goconvey v0.0.0-20190222223459-a17d461953aa/go.mod h1:2 github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945 h1:N8Bg45zpk/UcpNGnfJt2y/3lRWASHNTUET8owPYCgYI= github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= -github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a h1:/eS3yfGjQKG+9kayBkj0ip1BGhq6zJ3eaVksphxAaek= github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a/go.mod h1:7AyxJNCJ7SBZ1MfVQCWD6Uqo2oubI2Eq2y2eqf+A5r0= github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 h1:RC6RW7j+1+HkWaX/Yh71Ee5ZHaHYt7ZP4sQgUrm6cDU= @@ -598,7 +578,6 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190611141213-3f473d35a33a/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 h1:Ao/3l156eZf2AW5wK8a7/smtodRU+gha3+BeqJ69lRk= @@ -631,8 +610,6 @@ golang.org/x/sys v0.0.0-20190610200419-93c9922d18ae/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190730183949-1393eb018365 h1:SaXEMXhWzMJThc05vu6uh61Q245r4KaWMrsTedk0FDc= golang.org/x/sys v0.0.0-20190730183949-1393eb018365/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa h1:KIDDMLT1O0Nr7TSxp8xM5tJcdn8tgyAONntO829og1M= -golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= @@ -644,7 +621,6 @@ golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3 golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= -golang.org/x/tools v0.0.0-20190806215303-88ddfcebc769/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index 63cc10988..28fd72d22 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -5,10 +5,11 @@ import ( "encoding/binary" "unsafe" - "github.com/filecoin-project/go-lotus/chain/address" sectorbuilder "github.com/filecoin-project/go-sectorbuilder" logging "github.com/ipfs/go-log" + + "github.com/filecoin-project/go-lotus/chain/address" ) var log = logging.Logger("sectorbuilder") diff --git a/node/builder.go b/node/builder.go index cf32623c4..dae50da06 100644 --- a/node/builder.go +++ b/node/builder.go @@ -19,6 +19,7 @@ import ( "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/chain" + "github.com/filecoin-project/go-lotus/chain/deals" "github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/chain/wallet" @@ -71,6 +72,9 @@ const ( HandleIncomingBlocksKey HandleIncomingMessagesKey + RunDealClientKey + HandleDealsKey + // daemon ExtractApiKey @@ -207,12 +211,20 @@ func Online() Option { Override(RunHelloKey, modules.RunHello), Override(RunBlockSyncKey, modules.RunBlockSync), Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks), + + Override(new(*deals.Client), deals.NewClient), + Override(RunDealClientKey, modules.RunDealClient), ), // Storage miner ApplyIf(func(s *Settings) bool { return s.nodeType == nodeStorageMiner }, Override(new(*sectorbuilder.SectorBuilder), modules.SectorBuilder), Override(new(*storage.Miner), modules.StorageMiner), + + Override(new(dtypes.StagingDAG), modules.StagingDAG), + + Override(new(*deals.Handler), deals.NewHandler), + Override(HandleDealsKey, modules.HandleDeals), ), ) } diff --git a/node/impl/full.go b/node/impl/full.go index d9467a403..3fb40eb84 100644 --- a/node/impl/full.go +++ b/node/impl/full.go @@ -5,12 +5,11 @@ import ( "fmt" "strconv" - "golang.org/x/xerrors" - "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/chain" "github.com/filecoin-project/go-lotus/chain/actors" "github.com/filecoin-project/go-lotus/chain/address" + "github.com/filecoin-project/go-lotus/chain/deals" "github.com/filecoin-project/go-lotus/chain/gen" "github.com/filecoin-project/go-lotus/chain/state" "github.com/filecoin-project/go-lotus/chain/store" @@ -24,7 +23,9 @@ import ( hamt "github.com/ipfs/go-hamt-ipld" cbor "github.com/ipfs/go-ipld-cbor" logging "github.com/ipfs/go-log" + "github.com/libp2p/go-libp2p-core/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" + "golang.org/x/xerrors" ) var log = logging.Logger("node") @@ -34,10 +35,37 @@ type FullNodeAPI struct { CommonAPI - Chain *store.ChainStore - PubSub *pubsub.PubSub - Mpool *chain.MessagePool - Wallet *wallet.Wallet + DealClient *deals.Client + Chain *store.ChainStore + PubSub *pubsub.PubSub + Mpool *chain.MessagePool + Wallet *wallet.Wallet +} + +func (a *FullNodeAPI) ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) { + self, err := a.WalletDefaultAddress(ctx) + if err != nil { + return nil, err + } + + msg := &types.Message{ + To: miner, + From: miner, + Method: actors.MAMethods.GetPeerID, + } + + r, err := a.ChainCall(ctx, msg, nil) + if err != nil { + return nil, err + } + pid, err := peer.IDFromBytes(r.Return) + if err != nil { + return nil, err + } + + total := types.BigMul(price, types.NewInt(blocksDuration)) + c, err := a.DealClient.Start(ctx, data, total, self, miner, pid, blocksDuration) + return &c, err } func (a *FullNodeAPI) ChainNotify(ctx context.Context) (<-chan *store.HeadChange, error) { @@ -148,6 +176,12 @@ func (a *FullNodeAPI) ChainCall(ctx context.Context, msg *types.Message, ts *typ if msg.Value == types.EmptyInt { msg.Value = types.NewInt(0) } + if msg.Params == nil { + msg.Params, err = actors.SerializeParams(struct{}{}) + if err != nil { + return nil, err + } + } // TODO: maybe just use the invoker directly? ret, err := vmi.ApplyMessage(ctx, msg) diff --git a/node/modules/chain.go b/node/modules/chain.go index 1de38d4fa..6ca34cfb3 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -23,7 +23,9 @@ import ( ) func ChainExchange(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.Routing, bs dtypes.ChainGCBlockstore) dtypes.ChainExchange { - bitswapNetwork := network.NewFromIpfsHost(host, rt) + // prefix protocol for chain bitswap + // (so bitswap uses /chain/ipfs/bitswap/1.0.0 internally for chain sync stuff) + bitswapNetwork := network.NewFromIpfsHost(host, rt, network.Prefix("/chain")) exch := bitswap.New(helpers.LifecycleCtx(mctx, lc), bitswapNetwork, bs) lc.Append(fx.Hook{ OnStop: func(ctx context.Context) error { diff --git a/node/modules/client.go b/node/modules/client.go index 401e19231..0ddd5b7d2 100644 --- a/node/modules/client.go +++ b/node/modules/client.go @@ -2,6 +2,11 @@ package modules import ( "context" + "github.com/filecoin-project/go-lotus/node/modules/helpers" + "github.com/ipfs/go-bitswap" + "github.com/ipfs/go-bitswap/network" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/routing" "path/filepath" "github.com/ipfs/go-blockservice" @@ -9,7 +14,6 @@ import ( "github.com/ipfs/go-datastore/namespace" "github.com/ipfs/go-filestore" blockstore "github.com/ipfs/go-ipfs-blockstore" - offline "github.com/ipfs/go-ipfs-exchange-offline" "github.com/ipfs/go-merkledag" "go.uber.org/fx" @@ -32,9 +36,13 @@ func ClientFstore(r repo.LockedRepo) (dtypes.ClientFilestore, error) { return filestore.NewFilestore(bs, fm), nil } -func ClientDAG(lc fx.Lifecycle, fstore dtypes.ClientFilestore) dtypes.ClientDAG { +func ClientDAG(mctx helpers.MetricsCtx, lc fx.Lifecycle, fstore dtypes.ClientFilestore, rt routing.Routing, h host.Host) dtypes.ClientDAG { ibs := blockstore.NewIdStore((*filestore.Filestore)(fstore)) - bsvc := blockservice.New(ibs, offline.Exchange(ibs)) + + bitswapNetwork := network.NewFromIpfsHost(h, rt) + exch := bitswap.New(helpers.LifecycleCtx(mctx, lc), bitswapNetwork, ibs) + + bsvc := blockservice.New(ibs, exch) dag := merkledag.NewDAGService(bsvc) lc.Append(fx.Hook{ diff --git a/node/modules/dtypes/storage.go b/node/modules/dtypes/storage.go index 46418b689..a33f68c89 100644 --- a/node/modules/dtypes/storage.go +++ b/node/modules/dtypes/storage.go @@ -22,3 +22,5 @@ type ChainBlockService bserv.BlockService type ClientFilestore *filestore.Filestore type ClientDAG ipld.DAGService + +type StagingDAG ipld.DAGService diff --git a/node/modules/services.go b/node/modules/services.go index 099b9e125..653ab91a7 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -1,12 +1,15 @@ package modules import ( + "context" + "github.com/libp2p/go-libp2p-core/host" inet "github.com/libp2p/go-libp2p-core/network" pubsub "github.com/libp2p/go-libp2p-pubsub" "go.uber.org/fx" "github.com/filecoin-project/go-lotus/chain" + "github.com/filecoin-project/go-lotus/chain/deals" "github.com/filecoin-project/go-lotus/chain/sub" "github.com/filecoin-project/go-lotus/node/hello" "github.com/filecoin-project/go-lotus/node/modules/helpers" @@ -53,3 +56,16 @@ func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, pubsub *pu go sub.HandleIncomingMessages(ctx, mpool, msgsub) } + +func RunDealClient(lc fx.Lifecycle, c *deals.Client) { + lc.Append(fx.Hook{ + OnStart: func(context.Context) error { + c.Run() + return nil + }, + OnStop: func(context.Context) error { + c.Stop() + return nil + }, + }) +} diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 1ff4ed221..b1ac6a17a 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -2,19 +2,27 @@ package modules import ( "context" + "github.com/ipfs/go-bitswap" + "github.com/ipfs/go-bitswap/network" + "github.com/libp2p/go-libp2p-core/routing" "path/filepath" + "github.com/ipfs/go-blockservice" "github.com/ipfs/go-datastore" + blockstore "github.com/ipfs/go-ipfs-blockstore" + "github.com/ipfs/go-merkledag" "github.com/libp2p/go-libp2p-core/host" "github.com/mitchellh/go-homedir" "go.uber.org/fx" "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/chain/address" + "github.com/filecoin-project/go-lotus/chain/deals" "github.com/filecoin-project/go-lotus/chain/wallet" "github.com/filecoin-project/go-lotus/lib/sectorbuilder" "github.com/filecoin-project/go-lotus/node/modules/dtypes" "github.com/filecoin-project/go-lotus/node/modules/helpers" + "github.com/filecoin-project/go-lotus/node/repo" "github.com/filecoin-project/go-lotus/storage" ) @@ -94,3 +102,43 @@ func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h return sm, nil } + +func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h *deals.Handler) { + ctx := helpers.LifecycleCtx(mctx, lc) + + lc.Append(fx.Hook{ + OnStart: func(context.Context) error { + h.Run(ctx) + host.SetStreamHandler(deals.ProtocolID, h.HandleStream) + return nil + }, + OnStop: func(context.Context) error { + h.Stop() + return nil + }, + }) +} + +func StagingDAG(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, rt routing.Routing, h host.Host) (dtypes.StagingDAG, error) { + stagingds, err := r.Datastore("/staging") + if err != nil { + return nil, err + } + + bs := blockstore.NewBlockstore(stagingds) + ibs := blockstore.NewIdStore(bs) + + bitswapNetwork := network.NewFromIpfsHost(h, rt) + exch := bitswap.New(helpers.LifecycleCtx(mctx, lc), bitswapNetwork, bs) + + bsvc := blockservice.New(ibs, exch) + dag := merkledag.NewDAGService(bsvc) + + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { + return bsvc.Close() + }, + }) + + return dag, nil +}