From 25dbdd761a03775f88ad8d9f3fd74033febe781d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 7 Aug 2019 20:57:48 +0200 Subject: [PATCH] deals: Rewrite handler with error handling --- chain/deals/handler.go | 291 +++++++++++----------------------- chain/deals/handler_states.go | 100 ++++++++++++ chain/deals/handler_utils.go | 99 ++++++++++++ chain/deals/state_store.go | 31 ++-- chain/deals/utils.go | 28 ++++ 5 files changed, 335 insertions(+), 214 deletions(-) create mode 100644 chain/deals/handler_states.go create mode 100644 chain/deals/handler_utils.go create mode 100644 chain/deals/utils.go diff --git a/chain/deals/handler.go b/chain/deals/handler.go index 741652a03..20f31967e 100644 --- a/chain/deals/handler.go +++ b/chain/deals/handler.go @@ -2,23 +2,16 @@ package deals import ( "context" + "math" + "github.com/filecoin-project/go-lotus/chain/address" + "github.com/filecoin-project/go-lotus/chain/wallet" "github.com/filecoin-project/go-lotus/lib/sectorbuilder" + "github.com/filecoin-project/go-lotus/node/modules/dtypes" + "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" - files "github.com/ipfs/go-ipfs-files" - "github.com/ipfs/go-merkledag" - unixfile "github.com/ipfs/go-unixfs/file" - "io" - "io/ioutil" - "math" - "os" - - "github.com/filecoin-project/go-lotus/chain/wallet" - "github.com/filecoin-project/go-lotus/lib/cborrpc" - "github.com/filecoin-project/go-lotus/node/modules/dtypes" - cbor "github.com/ipfs/go-ipld-cbor" inet "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -35,6 +28,8 @@ type MinerDeal struct { State DealState Ref cid.Cid + + s inet.Stream } type Handler struct { @@ -46,18 +41,20 @@ type Handler struct { dag dtypes.StagingDAG deals StateStore - - incoming chan MinerDeal + conns map[cid.Cid]inet.Stream actor address.Address - stop chan struct{} - stopped chan struct{} + incoming chan MinerDeal + updated chan dealUpdate + stop chan struct{} + stopped chan struct{} } -type fetchResult struct { - id cid.Cid - err error +type dealUpdate struct { + newState DealState + id cid.Cid + err error } func NewHandler(w *wallet.Wallet, ds dtypes.MetadataDS, sb *sectorbuilder.SectorBuilder, dag dtypes.StagingDAG) (*Handler, error) { @@ -75,7 +72,12 @@ func NewHandler(w *wallet.Wallet, ds dtypes.MetadataDS, sb *sectorbuilder.Sector sb: sb, dag: dag, + conns: map[cid.Cid]inet.Stream{}, + incoming: make(chan MinerDeal), + updated: make(chan dealUpdate), + stop: make(chan struct{}), + stopped: make(chan struct{}), actor: minerAddress, @@ -87,117 +89,13 @@ func (h *Handler) Run(ctx context.Context) { go func() { defer log.Error("quitting deal handler loop") defer close(h.stopped) - fetched := make(chan fetchResult) for { select { - case deal := <-h.incoming: - log.Info("incoming deal") - - if err := h.deals.Begin(deal.ProposalCid, deal); err != nil { - // TODO: This can happen when client re-sends proposal - log.Errorf("deal tracking failed: %s", err) - continue - } - - go func(id cid.Cid) { - log.Info("fetching data for a deal") - err := merkledag.FetchGraph(ctx, deal.Ref, h.dag) - select { - case fetched <- fetchResult{ - id: id, - err: err, - }: - case <-h.stop: - } - }(deal.ProposalCid) - case result := <-fetched: - if result.err != nil { - log.Errorf("failed to fetch data for deal: %s", result.err) - // TODO: fail deal - } - - // TODO: send response if client still there - // TODO: staging - - // TODO: async - log.Info("sealing deal") - - var deal MinerDeal - err := h.deals.MutateMiner(result.id, func(in MinerDeal) (MinerDeal, error) { - in.State = Sealing - deal = in - return in, nil - }) - if err != nil { - // TODO: fail deal - log.Errorf("deal tracking failed (set sealing): %s", err) - continue - } - - root, err := h.dag.Get(ctx, deal.Ref) - if err != nil { - // TODO: fail deal - log.Errorf("failed to get file root for deal: %s", err) - continue - } - - // TODO: abstract this away into ReadSizeCloser + implement different modes - n, err := unixfile.NewUnixfsFile(ctx, h.dag, root) - if err != nil { - // TODO: fail deal - log.Errorf("cannot open unixfs file: %s", err) - continue - } - - uf, ok := n.(files.File) - if !ok { - // TODO: we probably got directory, how should we handle this in unixfs mode? - log.Errorf("unsupported unixfs type") - // TODO: fail deal - continue - } - - size, err := uf.Size() - if err != nil { - log.Errorf("failed to get file size: %s", err) - // TODO: fail deal - continue - } - - ////////////// - - f, err := ioutil.TempFile(os.TempDir(), "piece-temp-") - if err != nil { - log.Error(err) - // TODO: fail deal - continue - } - if _, err := io.Copy(f, uf); err != nil { - log.Error(err) - // TODO: fail deal - continue - } - if err := f.Close(); err != nil { - log.Error(err) - // TODO: fail deal - continue - } - sectorID, err := h.sb.AddPiece(deal.Proposal.PieceRef, uint64(size), f.Name()) - if err != nil { - // TODO: fail deal - log.Errorf("AddPiece failed: %s", err) - continue - } - if err := os.Remove(f.Name()); err != nil { - log.Error(err) - // TODO: fail deal - continue - } - - log.Warnf("New Sector: %d", sectorID) - - // TODO: update state, tell client + case deal := <-h.incoming: // Accepted + h.onIncoming(deal) + case update := <-h.updated: // Staged + h.onUpdated(ctx, update) case <-h.stop: return } @@ -205,100 +103,97 @@ func (h *Handler) Run(ctx context.Context) { }() } -func (h *Handler) HandleStream(s inet.Stream) { - defer s.Close() +func (h *Handler) onIncoming(deal MinerDeal) { + log.Info("incoming deal") - log.Info("Handling storage deal proposal!") + h.conns[deal.ProposalCid] = deal.s - var proposal SignedStorageDealProposal - if err := cborrpc.ReadCborRPC(s, &proposal); err != nil { - log.Errorw("failed to read proposal message", "error", err) + if err := h.deals.Begin(deal.ProposalCid, deal); err != nil { + // This can happen when client re-sends proposal + h.failDeal(deal.ProposalCid, err) + log.Errorf("deal tracking failed: %s", err) return } - // TODO: Validate proposal maybe - // (and signature, obviously) + go func() { + h.updated <- dealUpdate{ + newState: Accepted, + id: deal.ProposalCid, + err: nil, + } + }() +} - if proposal.Proposal.MinerAddress != h.actor { - log.Errorf("proposal with wrong MinerAddress: %s", proposal.Proposal.MinerAddress) - // TODO: send error +func (h *Handler) onUpdated(ctx context.Context, update dealUpdate) { + log.Infof("Deal %s updated state to %d", update.id, update.newState) + if update.err != nil { + log.Errorf("deal %s failed: %s", update.id, update.err) + h.failDeal(update.id, update.err) + return + } + var deal MinerDeal + err := h.deals.MutateMiner(update.id, func(d *MinerDeal) error { + d.State = update.newState + deal = *d + return nil + }) + if err != nil { + h.failDeal(update.id, err) return } - switch proposal.Proposal.SerializationMode { - //case SerializationRaw: - //case SerializationIPLD: - case SerializationUnixFs: - default: - log.Errorf("deal proposal with unsupported serialization: %s", proposal.Proposal.SerializationMode) - // TODO: send error - return + switch update.newState { + case Accepted: + h.handle(ctx, deal, h.accept, Staged) + case Staged: + h.handle(ctx, deal, h.staged, Sealing) + case Sealing: + log.Error("TODO") } +} +func (h *Handler) newDeal(s inet.Stream, proposal StorageDealProposal) (MinerDeal, error) { // TODO: Review: Not signed? - proposalNd, err := cbor.WrapObject(proposal.Proposal, math.MaxUint64, -1) + proposalNd, err := cbor.WrapObject(proposal, math.MaxUint64, -1) if err != nil { - log.Error(err) - return + return MinerDeal{}, err } - response := StorageDealResponse{ - State: Accepted, - Message: "", - Proposal: proposalNd.Cid(), - } - - msg, err := cbor.DumpObject(response) + ref, err := cid.Parse(proposal.PieceRef) if err != nil { - log.Errorw("failed to serialize response message", "error", err) - return + return MinerDeal{}, err } - def, err := h.w.ListAddrs() - if err != nil { - log.Error(err) - return - } - if len(def) != 1 { - // NOTE: If this ever happens for a good reason, implement this with GetWorker on the miner actor - // TODO: implement with GetWorker on the miner actor - log.Errorf("Expected only 1 address in wallet, got %d", len(def)) - return - } - - sig, err := h.w.Sign(def[0], msg) - if err != nil { - log.Errorw("failed to sign response message", "error", err) - return - } - - log.Info("accepting deal") - - signedResponse := &SignedStorageDealResponse{ - Response: response, - Signature: sig, - } - if err := cborrpc.WriteCborRPC(s, signedResponse); err != nil { - log.Errorw("failed to write deal response", "error", err) - return - } - - ref, err := cid.Parse(proposal.Proposal.PieceRef) - if err != nil { - log.Error(err) - return - } - - log.Info("processing deal") - - h.incoming <- MinerDeal{ + return MinerDeal{ Client: s.Conn().RemotePeer(), - Proposal: proposal.Proposal, + Proposal: proposal, ProposalCid: proposalNd.Cid(), - State: Accepted, + State: Unknown, Ref: ref, + + s: s, + }, nil +} + +func (h *Handler) HandleStream(s inet.Stream) { + log.Info("Handling storage deal proposal!") + + proposal, err := h.readProposal(s) + if err != nil { + log.Error(err) + s.Close() + return } + + deal, err := h.newDeal(s, proposal.Proposal) + if err != nil { + log.Error(err) + s.Close() + return + } + + h.incoming <- deal } func (h *Handler) Stop() { diff --git a/chain/deals/handler_states.go b/chain/deals/handler_states.go new file mode 100644 index 000000000..620d43201 --- /dev/null +++ b/chain/deals/handler_states.go @@ -0,0 +1,100 @@ +package deals + +import ( + "context" + + files "github.com/ipfs/go-ipfs-files" + "github.com/ipfs/go-merkledag" + unixfile "github.com/ipfs/go-unixfs/file" + "golang.org/x/xerrors" +) + +type handlerFunc func(ctx context.Context, deal MinerDeal) error + +func (h *Handler) handle(ctx context.Context, deal MinerDeal, cb handlerFunc, next DealState) { + go func() { + err := cb(ctx, deal) + select { + case h.updated <- dealUpdate{ + newState: next, + id: deal.ProposalCid, + err: err, + }: + case <-h.stop: + } + }() +} + +// ACCEPTED + +func (h *Handler) accept(ctx context.Context, deal MinerDeal) error { + log.Info("acc") + switch deal.Proposal.SerializationMode { + //case SerializationRaw: + //case SerializationIPLD: + case SerializationUnixFs: + default: + return xerrors.Errorf("deal proposal with unsupported serialization: %s", deal.Proposal.SerializationMode) + } + + log.Info("fetching data for a deal") + err := h.sendSignedResponse(StorageDealResponse{ + State: Accepted, + Message: "", + Proposal: deal.ProposalCid, + }) + if err != nil { + return err + } + + return merkledag.FetchGraph(ctx, deal.Ref, h.dag) +} + +// STAGED + +func (h *Handler) staged(ctx context.Context, deal MinerDeal) error { + err := h.sendSignedResponse(StorageDealResponse{ + State: Staged, + Message: "", + Proposal: deal.ProposalCid, + }) + if err != nil { + return err + } + + root, err := h.dag.Get(ctx, deal.Ref) + if err != nil { + return xerrors.Errorf("failed to get file root for deal: %s", err) + } + + // TODO: abstract this away into ReadSizeCloser + implement different modes + n, err := unixfile.NewUnixfsFile(ctx, h.dag, root) + if err != nil { + return xerrors.Errorf("cannot open unixfs file: %s", err) + } + + uf, ok := n.(files.File) + if !ok { + // we probably got directory, unsupported for now + return xerrors.Errorf("unsupported unixfs type") + } + + size, err := uf.Size() + if err != nil { + return xerrors.Errorf("failed to get file size: %s", err) + } + + var sectorID uint64 + err = withTemp(uf, func(f string) (err error) { + sectorID, err = h.sb.AddPiece(deal.Proposal.PieceRef, uint64(size), f) + return err + }) + if err != nil { + return xerrors.Errorf("AddPiece failed: %s", err) + } + + log.Warnf("New Sector: %d", sectorID) + return nil +} + +// SEALING diff --git a/chain/deals/handler_utils.go b/chain/deals/handler_utils.go new file mode 100644 index 000000000..00e70fe69 --- /dev/null +++ b/chain/deals/handler_utils.go @@ -0,0 +1,99 @@ +package deals + +import ( + "runtime" + + "github.com/filecoin-project/go-lotus/lib/cborrpc" + cbor "github.com/ipfs/go-ipld-cbor" + inet "github.com/libp2p/go-libp2p-core/network" + + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" +) + +func (h *Handler) failDeal(id cid.Cid, cerr error) { + if err := h.deals.End(id); err != nil { + log.Warnf("deals.End: %s", err) + } + + if cerr == nil { + _, f, l, _ := runtime.Caller(1) + cerr = xerrors.Errorf("unknown error (fail called at %s:%d)", f, l) + } + + log.Error("Deal %s failed: %s", id, cerr) + + err := h.sendSignedResponse(StorageDealResponse{ + State: Failed, + Message: cerr.Error(), + Proposal: id, + }) + + s, ok := h.conns[id] + if ok { + _ = s.Close() + delete(h.conns, id) + } + + if err != nil { + log.Warnf("notifying client about deal failure: %s") + } +} + +func (h *Handler) readProposal(s inet.Stream) (proposal SignedStorageDealProposal, err error) { + if err := cborrpc.ReadCborRPC(s, &proposal); err != nil { + log.Errorw("failed to read proposal message", "error", err) + return SignedStorageDealProposal{}, err + } + + // TODO: Validate proposal maybe + // (and signature, obviously) + + if proposal.Proposal.MinerAddress != h.actor { + log.Errorf("proposal with wrong MinerAddress: %s", proposal.Proposal.MinerAddress) + return SignedStorageDealProposal{}, err + } + + return +} + +func (h *Handler) sendSignedResponse(resp StorageDealResponse) error { + s, ok := h.conns[resp.Proposal] + if !ok { + return xerrors.New("couldn't send response: not connected") + } + + msg, err := cbor.DumpObject(&resp) + if err != nil { + return xerrors.Errorf("serializing response: %w", err) + } + + def, err := h.w.ListAddrs() + if err != nil { + log.Error(err) + return xerrors.Errorf("listing wallet addresses: %w", err) + } + if len(def) != 1 { + // NOTE: If this ever happens for a good reason, implement this with GetWorker on the miner actor + // TODO: implement with GetWorker on the miner actor + return xerrors.Errorf("Expected only 1 address in wallet, got %d", len(def)) + } + + sig, err := h.w.Sign(def[0], msg) + if err != nil { + return xerrors.Errorf("failed to sign response message: %w", err) + } + + signedResponse := SignedStorageDealResponse{ + Response: resp, + Signature: sig, + } + + err = cborrpc.WriteCborRPC(s, signedResponse) + if err != nil { + // Assume client disconnected + s.Close() + delete(h.conns, resp.Proposal) + } + return err +} diff --git a/chain/deals/state_store.go b/chain/deals/state_store.go index 93e13b53b..9f4dfd5ff 100644 --- a/chain/deals/state_store.go +++ b/chain/deals/state_store.go @@ -11,7 +11,7 @@ type StateStore struct { ds datastore.Datastore } -func (st *StateStore) Begin(i cid.Cid, s interface{}) error { +func (st *StateStore) Begin(i cid.Cid, state interface{}) error { k := datastore.NewKey(i.String()) has, err := st.ds.Has(k) if err != nil { @@ -21,7 +21,8 @@ func (st *StateStore) Begin(i cid.Cid, s interface{}) error { // TODO: uncomment after deals work //return xerrors.Errorf("Already tracking state for %s", i) } - b, err := cbor.DumpObject(s) + + b, err := cbor.DumpObject(state) if err != nil { return err } @@ -43,45 +44,43 @@ func (st *StateStore) End(i cid.Cid) error { // When this gets used anywhere else, migrate to reflect -func (st *StateStore) MutateMiner(i cid.Cid, mutator func(MinerDeal) (MinerDeal, error)) error { +func (st *StateStore) MutateMiner(i cid.Cid, mutator func(*MinerDeal) error) error { return st.mutate(i, minerMutator(mutator)) } -func minerMutator(m func(MinerDeal) (MinerDeal, error)) func([]byte) ([]byte, error) { +func minerMutator(m func(*MinerDeal) error) func([]byte) ([]byte, error) { return func(in []byte) ([]byte, error) { - var cur MinerDeal - err := cbor.DecodeInto(in, &cur) + var deal MinerDeal + err := cbor.DecodeInto(in, &deal) if err != nil { return nil, err } - mutated, err := m(cur) - if err != nil { + if err := m(&deal); err != nil { return nil, err } - return cbor.DumpObject(mutated) + return cbor.DumpObject(deal) } } -func (st *StateStore) MutateClient(i cid.Cid, mutator func(ClientDeal) (ClientDeal, error)) error { +func (st *StateStore) MutateClient(i cid.Cid, mutator func(*ClientDeal) error) error { return st.mutate(i, clientMutator(mutator)) } -func clientMutator(m func(ClientDeal) (ClientDeal, error)) func([]byte) ([]byte, error) { +func clientMutator(m func(*ClientDeal) error) func([]byte) ([]byte, error) { return func(in []byte) ([]byte, error) { - var cur ClientDeal - err := cbor.DecodeInto(in, &cur) + var deal ClientDeal + err := cbor.DecodeInto(in, &deal) if err != nil { return nil, err } - mutated, err := m(cur) - if err != nil { + if err := m(&deal); err != nil { return nil, err } - return cbor.DumpObject(mutated) + return cbor.DumpObject(deal) } } diff --git a/chain/deals/utils.go b/chain/deals/utils.go new file mode 100644 index 000000000..a23bafe05 --- /dev/null +++ b/chain/deals/utils.go @@ -0,0 +1,28 @@ +package deals + +import ( + "io" + "io/ioutil" + "os" +) + +func withTemp(r io.Reader, cb func(string) error) error { + f, err := ioutil.TempFile(os.TempDir(), "lotus-temp-") + if err != nil { + return err + } + if _, err := io.Copy(f, r); err != nil { + return err + } + if err := f.Close(); err != nil { + return err + } + + err = cb(f.Name()) + if err != nil { + os.Remove(f.Name()) + return err + } + + return os.Remove(f.Name()) +}