From 549e7db12f3a86a1ad88f4aa57c6b1a163374b57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 2 Aug 2019 16:09:54 +0200 Subject: [PATCH] deals: Implement basic handshake logic --- chain/deals/client.go | 87 ++++++++++++++++--------- chain/deals/handler.go | 41 ++++++++++++ chain/deals/types.go | 100 +++++++++++++++++++++++++++++ cli/client.go | 41 ++++++++++++ lib/sectorbuilder/sectorbuilder.go | 4 ++ node/impl/full.go | 24 ++++++- 6 files changed, 266 insertions(+), 31 deletions(-) create mode 100644 chain/deals/handler.go create mode 100644 chain/deals/types.go diff --git a/chain/deals/client.go b/chain/deals/client.go index 4e46c5770..528495a05 100644 --- a/chain/deals/client.go +++ b/chain/deals/client.go @@ -2,22 +2,26 @@ package deals import ( "context" + "io/ioutil" + "os" "sync/atomic" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" + "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" - "golang.org/x/xerrors" - "github.com/filecoin-project/go-lotus/chain/actors" "github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/types" - "github.com/filecoin-project/go-lotus/chain/vm" + "github.com/filecoin-project/go-lotus/lib/cborrpc" + "github.com/filecoin-project/go-lotus/lib/sectorbuilder" ) var log = logging.Logger("deals") +const ProtocolID = "/fil/storage/mk/1.0.0" + type DealStatus int const ( @@ -27,10 +31,13 @@ const ( type Deal struct { ID uint64 Status DealStatus + Miner peer.ID } type Client struct { cs *store.ChainStore + sb *sectorbuilder.SectorBuilder + h host.Host next uint64 deals map[uint64]Deal @@ -65,6 +72,7 @@ func (c *Client) Run() { case deal := <-c.incoming: log.Info("incoming deal") + // TODO: track in datastore c.deals[deal.ID] = deal case <-c.stop: @@ -74,49 +82,68 @@ func (c *Client) Run() { }() } -func (c *Client) Start(ctx context.Context, data cid.Cid, miner address.Address, blocksDuration uint64) (uint64, error) { - // Getting PeerID - // TODO: Is there a nicer way? - - ts := c.cs.GetHeaviestTipSet() - state, err := c.cs.TipSetState(ts.Cids()) +func (c *Client) Start(ctx context.Context, data cid.Cid, totalPrice types.BigInt, from address.Address, miner address.Address, minerID peer.ID, blocksDuration uint64) (uint64, error) { + // TODO: Eww + f, err := ioutil.TempFile(os.TempDir(), "commP-temp-") if err != nil { return 0, err } - - vmi, err := vm.NewVM(state, ts.Height(), ts.Blocks()[0].Miner, c.cs) - if err != nil { - return 0, xerrors.Errorf("failed to set up vm: %w", err) - } - - msg := &types.Message{ - To: miner, - Method: actors.MAMethods.GetPeerID, - - Value: types.NewInt(0), - GasPrice: types.NewInt(0), - GasLimit: types.NewInt(10000000000), - } - - // TODO: maybe just use the invoker directly? - r, err := vmi.ApplyMessage(ctx, msg) + _, err = f.Write([]byte("hello\n")) if err != nil { return 0, err } - if r.ExitCode != 0 { - panic("TODO: do we error here?") + if err := f.Close(); err != nil { + return 0, err } - pid, err := peer.IDFromBytes(r.Return) + commP, err := c.sb.GeneratePieceCommitment(f.Name(), 6) if err != nil { return 0, err } + if err := os.Remove(f.Name()); err != nil { + return 0, err + } - log.Warnf("miner pid:%s", pid) + // TODO: use data + proposal := StorageDealProposal{ + PieceRef: "bafkqabtimvwgy3yk", // identity 'hello\n' + SerializationMode: SerializationRaw, + CommP: commP[:], + Size: 6, + TotalPrice: totalPrice, + Duration: blocksDuration, + Payment: nil, // TODO + MinerAddress: miner, + ClientAddress: from, + } + + s, err := c.h.NewStream(ctx, minerID, ProtocolID) + if err != nil { + return 0, err + } + defer s.Close() // TODO: not too soon? + + log.Info("Sending deal proposal") + + signedProposal := &SignedStorageDealProposal{ + Proposal: proposal, + Signature: nil, // TODO: SIGN! + } + + if err := cborrpc.WriteCborRPC(s, signedProposal); err != nil { + return 0, err + } + + var resp SignedStorageDealResponse + if err := cborrpc.ReadCborRPC(s, &resp); err != nil { + log.Errorw("failed to read StorageDealResponse message", "error", err) + return 0, err + } id := atomic.AddUint64(&c.next, 1) deal := Deal{ ID: id, Status: DealResolvingMiner, + Miner: minerID, } c.incoming <- deal diff --git a/chain/deals/handler.go b/chain/deals/handler.go new file mode 100644 index 000000000..887578897 --- /dev/null +++ b/chain/deals/handler.go @@ -0,0 +1,41 @@ +package deals + +import ( + "github.com/filecoin-project/go-lotus/lib/cborrpc" + inet "github.com/libp2p/go-libp2p-core/network" +) + +type Handler struct { +} + +func NewHandler() *Handler { + return &Handler{} +} + +func (h *Handler) HandleStream(s inet.Stream) { + defer s.Close() + + var proposal SignedStorageDealProposal + if err := cborrpc.ReadCborRPC(s, &proposal); err != nil { + log.Errorw("failed to read proposal message", "error", err) + return + } + + // TODO: Validate proposal maybe + // (and signature, obviously) + + response := StorageDealResponse{ + State: Accepted, + Message: "", + Proposal: nil, // TODO + } + signedResponse := &SignedStorageDealResponse{ + Response: response, + Signature: nil, // TODO + } + if err := cborrpc.WriteCborRPC(s, signedResponse); err != nil { + log.Errorw("failed to write deal response", "error", err) + return + } + +} diff --git a/chain/deals/types.go b/chain/deals/types.go new file mode 100644 index 000000000..30091e785 --- /dev/null +++ b/chain/deals/types.go @@ -0,0 +1,100 @@ +package deals + +import ( + "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" + + "github.com/filecoin-project/go-lotus/chain/actors" + "github.com/filecoin-project/go-lotus/chain/address" + "github.com/filecoin-project/go-lotus/chain/types" +) + +func init() { + cbor.RegisterCborType(PaymentInfo{}) + cbor.RegisterCborType(StorageDealProposal{}) + cbor.RegisterCborType(SignedStorageDealProposal{}) + + cbor.RegisterCborType(PieceInclusionProof{}) + + cbor.RegisterCborType(StorageDealResponse{}) + cbor.RegisterCborType(SignedStorageDealResponse{}) +} + +type SerializationMode string + +const ( + SerializationUnixFs = "UnixFs" + SerializationRaw = "Raw" + SerializationIPLD = "IPLD" +) + +type DealState int + +const ( + Unknown = iota + Rejected + Accepted + Started + Failed + Staged + Sealing + Complete +) + +// TODO: this should probably be in a separate package with other paych utils +type PaymentInfo struct { + PayChActor address.Address + Payer address.Address + Channel uint64 // TODO: Not clear what that refers to, guessing something to do with multi-lane payments + ChannelMessage cid.Cid + + Vouchers []actors.SignedVoucher +} + +type StorageDealProposal struct { + PieceRef string // TODO: string per spec, but maybe should be a CID? + SerializationMode SerializationMode + CommP []byte + + Size uint64 // TODO: spec doesn't clearly specify the type + TotalPrice types.BigInt + Duration uint64 + + Payment PaymentInfo + + MinerAddress address.Address + ClientAddress address.Address +} + +type SignedStorageDealProposal struct { + Proposal StorageDealProposal + + Signature types.Signature +} + +// response +type PieceInclusionProof struct { + Position uint // todo: type? + ProofElements [32]byte +} + +// TODO: Spec says 'representation keyed', this is probably wrong +type StorageDealResponse struct { + State DealState + + // Rejected / Accepted / Failed / Staged + Message string + Proposal cid.Cid + + // Sealing + PieceInclusionProof PieceInclusionProof + + // Complete + SectorCommitMessage cid.Cid +} + +type SignedStorageDealResponse struct { + Response StorageDealResponse + + Signature types.Signature +} diff --git a/cli/client.go b/cli/client.go index b1b49e011..9ef4dbdda 100644 --- a/cli/client.go +++ b/cli/client.go @@ -2,8 +2,13 @@ package cli import ( "fmt" + "strconv" + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" "gopkg.in/urfave/cli.v2" + + "github.com/filecoin-project/go-lotus/chain/address" ) var clientCmd = &cli.Command{ @@ -12,6 +17,7 @@ var clientCmd = &cli.Command{ Subcommands: []*cli.Command{ clientImportCmd, clientLocalCmd, + clientDealCmd, }, } @@ -54,3 +60,38 @@ var clientLocalCmd = &cli.Command{ return nil }, } + +var clientDealCmd = &cli.Command{ + Name: "deal", + Usage: "Initialize storage deal with a miner", + Action: func(cctx *cli.Context) error { + api, err := GetAPI(cctx) + if err != nil { + return err + } + ctx := ReqContext(cctx) + + if cctx.NArg() != 3 { + return xerrors.New("expected 3 args: dataCid, miner, duration") + } + + // [data, miner, dur] + + data, err := cid.Parse(cctx.Args().Get(0)) + if err != nil { + return err + } + + miner, err := address.NewFromString(cctx.Args().Get(1)) + if err != nil { + return err + } + + dur, err := strconv.ParseInt(cctx.Args().Get(2), 10, 32) + if err != nil { + return err + } + + return api.ClientStartDeal(ctx, data, miner, uint64(dur)) + }, +} diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index 63cc10988..dbaa7b921 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -88,6 +88,10 @@ func (sb *SectorBuilder) GetAllStagedSectors() ([]StagedSectorMetadata, error) { return sectorbuilder.GetAllStagedSectors(sb.handle) } +func (sb *SectorBuilder) GeneratePieceCommitment(piecePath string, pieceSize uint64) ([CommLen]byte, error) { + return sectorbuilder.GeneratePieceCommitment(piecePath, pieceSize) +} + func (sb *SectorBuilder) GeneratePoSt(sortedCommRs [][CommLen]byte, challengeSeed [CommLen]byte) ([][]byte, []uint64, error) { // Wait, this is a blocking method with no way of interrupting it? // does it checkpoint itself? diff --git a/node/impl/full.go b/node/impl/full.go index 3a3c6b157..e5c619a5d 100644 --- a/node/impl/full.go +++ b/node/impl/full.go @@ -25,6 +25,7 @@ import ( hamt "github.com/ipfs/go-hamt-ipld" cbor "github.com/ipfs/go-ipld-cbor" logging "github.com/ipfs/go-log" + "github.com/libp2p/go-libp2p-core/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" ) @@ -43,7 +44,22 @@ type FullNodeAPI struct { } func (a *FullNodeAPI) ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, blocksDuration uint64) error { - _, err := a.DealClient.Start(ctx, data, miner, blocksDuration) + msg := &types.Message{ + To: miner, + From: miner, // TODO: we need /something/ here, but this smells + Method: actors.MAMethods.GetPeerID, + } + + r, err := a.ChainCall(ctx, msg, nil) + if err != nil { + return err + } + pid, err := peer.IDFromBytes(r.Return) + if err != nil { + return err + } + + _, err = a.DealClient.Start(ctx, data, miner, pid, blocksDuration) return err } @@ -155,6 +171,12 @@ func (a *FullNodeAPI) ChainCall(ctx context.Context, msg *types.Message, ts *typ if msg.Value == types.EmptyInt { msg.Value = types.NewInt(0) } + if msg.Params == nil { + msg.Params, err = actors.SerializeParams(struct{}{}) + if err != nil { + return nil, err + } + } // TODO: maybe just use the invoker directly? ret, err := vmi.ApplyMessage(ctx, msg)