diff --git a/chain/deals/client.go b/chain/deals/client.go index 1752f0aaf..7359d7c1b 100644 --- a/chain/deals/client.go +++ b/chain/deals/client.go @@ -2,6 +2,7 @@ package deals import ( "context" + "io" "io/ioutil" "math" "os" @@ -10,8 +11,10 @@ import ( "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" + files "github.com/ipfs/go-ipfs-files" cbor "github.com/ipfs/go-ipld-cbor" logging "github.com/ipfs/go-log" + unixfile "github.com/ipfs/go-unixfs/file" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" @@ -24,6 +27,10 @@ import ( "github.com/filecoin-project/go-lotus/node/modules/dtypes" ) +func init() { + cbor.RegisterCborType(ClientDeal{}) +} + var log = logging.Logger("deals") const ProtocolID = "/fil/storage/mk/1.0.0" @@ -44,6 +51,7 @@ type Client struct { cs *store.ChainStore h host.Host w *wallet.Wallet + dag dtypes.ClientDAG deals StateStore @@ -53,11 +61,12 @@ type Client struct { stopped chan struct{} } -func NewClient(cs *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.MetadataDS) *Client { +func NewClient(cs *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.MetadataDS, dag dtypes.ClientDAG) *Client { c := &Client{ cs: cs, h: h, w: w, + dag: dag, deals: StateStore{ds: namespace.Wrap(ds, datastore.NewKey("/deals/client"))}, @@ -94,12 +103,29 @@ func (c *Client) Run() { func (c *Client) Start(ctx context.Context, data cid.Cid, totalPrice types.BigInt, from address.Address, miner address.Address, minerID peer.ID, blocksDuration uint64) (cid.Cid, error) { // TODO: Eww + root, err := c.dag.Get(ctx, data) + if err != nil { + log.Errorf("failed to get file root for deal: %s", err) + return cid.Undef, err + } + + n, err := unixfile.NewUnixfsFile(ctx, c.dag, root) + if err != nil { + log.Errorf("cannot open unixfs file: %s", err) + return cid.Undef, err + } + + uf, ok := n.(files.File) + if !ok { + // TODO: we probably got directory, how should we handle this in unixfs mode? + return cid.Undef, xerrors.New("unsupported unixfs type") + } + f, err := ioutil.TempFile(os.TempDir(), "commP-temp-") if err != nil { return cid.Undef, err } - _, err = f.Write([]byte("hello\n")) - if err != nil { + if _, err := io.Copy(f, uf); err != nil { return cid.Undef, err } if err := f.Close(); err != nil { @@ -117,8 +143,8 @@ func (c *Client) Start(ctx context.Context, data cid.Cid, totalPrice types.BigIn // TODO: use data proposal := StorageDealProposal{ - PieceRef: "bafkqabtimvwgy3yk", // identity 'hello\n' - SerializationMode: SerializationRaw, + PieceRef: data.String(), + SerializationMode: SerializationUnixFs, CommP: commP[:], Size: 6, TotalPrice: totalPrice, diff --git a/chain/deals/handler.go b/chain/deals/handler.go index 82e51466c..9e94d527c 100644 --- a/chain/deals/handler.go +++ b/chain/deals/handler.go @@ -2,6 +2,7 @@ package deals import ( "context" + "github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/lib/sectorbuilder" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" @@ -20,6 +21,10 @@ import ( "github.com/libp2p/go-libp2p-core/peer" ) +func init() { + cbor.RegisterCborType(MinerDeal{}) +} + type MinerDeal struct { Client peer.ID Proposal StorageDealProposal @@ -41,21 +46,38 @@ type Handler struct { incoming chan MinerDeal + actor address.Address + stop chan struct{} stopped chan struct{} } -func NewHandler(w *wallet.Wallet, ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder, dag dtypes.StagingDAG) *Handler { +func NewHandler(w *wallet.Wallet, ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder, dag dtypes.StagingDAG) (*Handler, error) { + addr, err := ds.Get(datastore.NewKey("miner-address")) + if err != nil { + return nil, err + } + minerAddress, err := address.NewFromBytes(addr) + if err != nil { + return nil, err + } + return &Handler{ w: w, + sb: sb, dag: dag, + incoming: make(chan MinerDeal), + + actor: minerAddress, + deals: StateStore{ds: namespace.Wrap(ds, datastore.NewKey("/deals/client"))}, - } + }, nil } func (h *Handler) Run(ctx context.Context) { go func() { + defer log.Error("quitting deal handler loop") defer close(h.stopped) fetched := make(chan cid.Cid) @@ -67,7 +89,7 @@ func (h *Handler) Run(ctx context.Context) { if err := h.deals.Begin(deal.ProposalCid, deal); err != nil { // TODO: This can happen when client re-sends proposal log.Errorf("deal tracking failed: %s", err) - return + continue } go func(id cid.Cid) { @@ -104,7 +126,7 @@ func (h *Handler) Run(ctx context.Context) { if err != nil { // TODO: fail deal log.Errorf("failed to get file root for deal: %s", err) - return + continue } // TODO: abstract this away into ReadSizeCloser + implement different modes @@ -112,7 +134,7 @@ func (h *Handler) Run(ctx context.Context) { if err != nil { // TODO: fail deal log.Errorf("cannot open unixfs file: %s", err) - return + continue } f, ok := n.(files.File) @@ -127,7 +149,7 @@ func (h *Handler) Run(ctx context.Context) { if err != nil { log.Errorf("failed to get file size: %s", err) // TODO: fail deal - return + continue } // TODO: can we use pipes? @@ -135,7 +157,7 @@ func (h *Handler) Run(ctx context.Context) { if err != nil { // TODO: fail deal log.Errorf("AddPiece failed: %s", err) - return + continue } log.Warnf("New Sector: %d", sectorID) @@ -162,6 +184,12 @@ func (h *Handler) HandleStream(s inet.Stream) { // TODO: Validate proposal maybe // (and signature, obviously) + if proposal.Proposal.MinerAddress != h.actor { + log.Errorf("proposal with wrong MinerAddress: %s", proposal.Proposal.MinerAddress) + // TODO: send error + return + } + switch proposal.Proposal.SerializationMode { //case SerializationRaw: //case SerializationIPLD: @@ -175,6 +203,7 @@ func (h *Handler) HandleStream(s inet.Stream) { // TODO: Review: Not signed? proposalNd, err := cbor.WrapObject(proposal.Proposal, math.MaxUint64, -1) if err != nil { + log.Error(err) return } @@ -189,12 +218,27 @@ func (h *Handler) HandleStream(s inet.Stream) { log.Errorw("failed to serialize response message", "error", err) return } - sig, err := h.w.Sign(proposal.Proposal.MinerAddress, msg) + + def, err := h.w.ListAddrs() + if err != nil { + log.Error(err) + return + } + if len(def) != 1 { + // NOTE: If this ever happens for a good reason, implement this with GetWorker on the miner actor + // TODO: implement with GetWorker on the miner actor + log.Errorf("Expected only 1 address in wallet, got %d", len(def)) + return + } + + sig, err := h.w.Sign(def[0], msg) if err != nil { log.Errorw("failed to sign response message", "error", err) return } + log.Info("accepting deal") + signedResponse := &SignedStorageDealResponse{ Response: response, Signature: sig, @@ -206,9 +250,12 @@ func (h *Handler) HandleStream(s inet.Stream) { ref, err := cid.Parse(proposal.Proposal.PieceRef) if err != nil { + log.Error(err) return } + log.Info("processing deal") + h.incoming <- MinerDeal{ Client: s.Conn().RemotePeer(), Proposal: proposal.Proposal, @@ -218,3 +265,8 @@ func (h *Handler) HandleStream(s inet.Stream) { Ref: ref, } } + +func (h *Handler) Stop() { + close(h.stop) + <-h.stopped +} diff --git a/chain/deals/types.go b/chain/deals/types.go index be78670cb..589c5dd4f 100644 --- a/chain/deals/types.go +++ b/chain/deals/types.go @@ -89,7 +89,7 @@ type StorageDealResponse struct { PieceInclusionProof PieceInclusionProof // Complete - SectorCommitMessage cid.Cid + SectorCommitMessage *cid.Cid } type SignedStorageDealResponse struct { diff --git a/chain/wallet/wallet.go b/chain/wallet/wallet.go index a94ba69ef..0f6410960 100644 --- a/chain/wallet/wallet.go +++ b/chain/wallet/wallet.go @@ -38,6 +38,9 @@ func (w *Wallet) Sign(addr address.Address, msg []byte) (*types.Signature, error if err != nil { return nil, err } + if ki == nil { + return nil, xerrors.Errorf("signing using key '%s': %w", addr.String(), repo.ErrKeyNotFound) + } switch ki.Type { case types.KTSecp256k1: diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index e077cc22d..b1ac6a17a 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -103,8 +103,20 @@ func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h return sm, nil } -func HandleDeals(h host.Host, handler *deals.Handler) { - h.SetStreamHandler(deals.ProtocolID, handler.HandleStream) +func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h *deals.Handler) { + ctx := helpers.LifecycleCtx(mctx, lc) + + lc.Append(fx.Hook{ + OnStart: func(context.Context) error { + h.Run(ctx) + host.SetStreamHandler(deals.ProtocolID, h.HandleStream) + return nil + }, + OnStop: func(context.Context) error { + h.Stop() + return nil + }, + }) } func StagingDAG(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, rt routing.Routing, h host.Host) (dtypes.StagingDAG, error) {