From 1fc7a4875968a4fbf0a5bb70fc788cecf153f1a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 10 Sep 2019 16:13:24 +0200 Subject: [PATCH] deals: API to list client deals --- api/api.go | 7 +++++++ api/struct.go | 5 +++++ api/types.go | 13 +++++++++++++ chain/deals/client.go | 29 +++++++++++++++++------------ chain/deals/client_states.go | 11 ++++++----- chain/deals/handler.go | 26 +++++++++++++------------- chain/deals/handler_states.go | 9 +++++---- chain/deals/handler_utils.go | 3 ++- chain/deals/state_store.go | 28 ++++++++++++++++++++++++++++ chain/deals/types.go | 22 +++++----------------- node/impl/full/client.go | 18 ++++++++++++++++++ 11 files changed, 119 insertions(+), 52 deletions(-) diff --git a/api/api.go b/api/api.go index 12ca3935c..faf239fd9 100644 --- a/api/api.go +++ b/api/api.go @@ -86,6 +86,7 @@ type FullNode interface { // ClientImport imports file under the specified path into filestore ClientImport(ctx context.Context, path string) (cid.Cid, error) ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) + ClientListDeals(ctx context.Context) ([]DealInfo, error) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) ClientFindData(ctx context.Context, root cid.Cid) ([]QueryOffer, error) // TODO: specify serialization mode we want (defaults to unixfs for now) ClientRetrieve(ctx context.Context, order RetrievalOrder, path string) error @@ -161,6 +162,12 @@ type Import struct { Size uint64 } +type DealInfo struct { + ProposalCid cid.Cid + State DealState + Miner peer.ID +} + type MsgWait struct { InBlock cid.Cid Receipt types.MessageReceipt diff --git a/api/struct.go b/api/struct.go index 87898f853..df5fac094 100644 --- a/api/struct.go +++ b/api/struct.go @@ -70,6 +70,7 @@ type FullNodeStruct struct { ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"` ClientFindData func(ctx context.Context, root cid.Cid) ([]QueryOffer, error) `perm:"read"` ClientStartDeal func(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) `perm:"admin"` + ClientListDeals func(ctx context.Context) ([]DealInfo, error) `perm:"write"` ClientRetrieve func(ctx context.Context, order RetrievalOrder, path string) error `perm:"admin"` StateMinerSectors func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"` @@ -169,6 +170,10 @@ func (c *FullNodeStruct) ClientStartDeal(ctx context.Context, data cid.Cid, mine return c.Internal.ClientStartDeal(ctx, data, miner, price, blocksDuration) } +func (c *FullNodeStruct) ClientListDeals(ctx context.Context) ([]DealInfo, error) { + return c.Internal.ClientListDeals(ctx) +} + func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order RetrievalOrder, path string) error { return c.Internal.ClientRetrieve(ctx, order, path) } diff --git a/api/types.go b/api/types.go index 449a3ae79..a88dedb02 100644 --- a/api/types.go +++ b/api/types.go @@ -6,6 +6,19 @@ import ( ma "github.com/multiformats/go-multiaddr" ) +type DealState int + +const ( + DealUnknown = iota + DealRejected + DealAccepted + DealStarted + DealFailed + DealStaged + DealSealing + DealComplete +) + // TODO: check if this exists anywhere else type MultiaddrSlice []ma.Multiaddr diff --git a/chain/deals/client.go b/chain/deals/client.go index 7be56801e..25c5595bb 100644 --- a/chain/deals/client.go +++ b/chain/deals/client.go @@ -2,6 +2,7 @@ package deals import ( "context" + "github.com/filecoin-project/go-lotus/api" "math" "github.com/ipfs/go-cid" @@ -31,7 +32,7 @@ var log = logging.Logger("deals") type ClientDeal struct { ProposalCid cid.Cid Proposal StorageDealProposal - State DealState + State api.DealState Miner peer.ID s inet.Stream @@ -55,7 +56,7 @@ type Client struct { } type clientDealUpdate struct { - newState DealState + newState api.DealState id cid.Cid err error } @@ -116,7 +117,7 @@ func (c *Client) onIncoming(deal ClientDeal) { go func() { c.updated <- clientDealUpdate{ - newState: Unknown, + newState: api.DealUnknown, id: deal.ProposalCid, err: nil, } @@ -142,14 +143,14 @@ func (c *Client) onUpdated(ctx context.Context, update clientDealUpdate) { } switch update.newState { - case Unknown: // new - c.handle(ctx, deal, c.new, Accepted) - case Accepted: - c.handle(ctx, deal, c.accepted, Staged) - case Staged: - c.handle(ctx, deal, c.staged, Sealing) - case Sealing: - c.handle(ctx, deal, c.sealing, Complete) + case api.DealUnknown: // new + c.handle(ctx, deal, c.new, api.DealAccepted) + case api.DealAccepted: + c.handle(ctx, deal, c.accepted, api.DealStaged) + case api.DealStaged: + c.handle(ctx, deal, c.staged, api.DealSealing) + case api.DealSealing: + c.handle(ctx, deal, c.sealing, api.DealComplete) } } @@ -208,7 +209,7 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal, vd *actors.Pie deal := ClientDeal{ ProposalCid: proposalNd.Cid(), Proposal: proposal, - State: Unknown, + State: api.DealUnknown, Miner: p.MinerID, s: s, @@ -224,6 +225,10 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal, vd *actors.Pie }) } +func (c *Client) List() ([]ClientDeal, error) { + return c.deals.ListClient() +} + func (c *Client) Stop() { close(c.stop) <-c.stopped diff --git a/chain/deals/client_states.go b/chain/deals/client_states.go index 0139252e3..5196c4cbb 100644 --- a/chain/deals/client_states.go +++ b/chain/deals/client_states.go @@ -2,6 +2,7 @@ package deals import ( "context" + "github.com/filecoin-project/go-lotus/api" "golang.org/x/xerrors" @@ -11,7 +12,7 @@ import ( type clientHandlerFunc func(ctx context.Context, deal ClientDeal) error -func (c *Client) handle(ctx context.Context, deal ClientDeal, cb clientHandlerFunc, next DealState) { +func (c *Client) handle(ctx context.Context, deal ClientDeal, cb clientHandlerFunc, next api.DealState) { go func() { err := cb(ctx, deal) select { @@ -31,7 +32,7 @@ func (c *Client) new(ctx context.Context, deal ClientDeal) error { return err } - if resp.State != Accepted { + if resp.State != api.DealAccepted { return xerrors.Errorf("deal wasn't accepted (State=%d)", resp.State) } @@ -48,7 +49,7 @@ func (c *Client) accepted(ctx context.Context, deal ClientDeal) error { return err } - if resp.State != Staged { + if resp.State != api.DealStaged { return xerrors.Errorf("deal wasn't staged (State=%d)", resp.State) } @@ -65,7 +66,7 @@ func (c *Client) staged(ctx context.Context, deal ClientDeal) error { return err } - if resp.State != Sealing { + if resp.State != api.DealSealing { return xerrors.Errorf("deal wasn't sealed (State=%d)", resp.State) } @@ -88,7 +89,7 @@ func (c *Client) sealing(ctx context.Context, deal ClientDeal) error { return err } - if resp.State != Complete { + if resp.State != api.DealComplete { return xerrors.Errorf("deal wasn't complete (State=%d)", resp.State) } diff --git a/chain/deals/handler.go b/chain/deals/handler.go index c98cc7f4c..67fcc70de 100644 --- a/chain/deals/handler.go +++ b/chain/deals/handler.go @@ -26,11 +26,11 @@ type MinerDeal struct { Client peer.ID Proposal StorageDealProposal ProposalCid cid.Cid - State DealState + State api.DealState Ref cid.Cid - SectorID uint64 // Set when State >= Staged + SectorID uint64 // Set when State >= DealStaged s inet.Stream } @@ -57,7 +57,7 @@ type Handler struct { } type minerDealUpdate struct { - newState DealState + newState api.DealState id cid.Cid err error mut func(*MinerDeal) @@ -102,9 +102,9 @@ func (h *Handler) Run(ctx context.Context) { for { select { - case deal := <-h.incoming: // Accepted + case deal := <-h.incoming: // DealAccepted h.onIncoming(deal) - case update := <-h.updated: // Staged + case update := <-h.updated: // DealStaged h.onUpdated(ctx, update) case <-h.stop: return @@ -127,7 +127,7 @@ func (h *Handler) onIncoming(deal MinerDeal) { go func() { h.updated <- minerDealUpdate{ - newState: Accepted, + newState: api.DealAccepted, id: deal.ProposalCid, err: nil, } @@ -156,12 +156,12 @@ func (h *Handler) onUpdated(ctx context.Context, update minerDealUpdate) { } switch update.newState { - case Accepted: - h.handle(ctx, deal, h.accept, Staged) - case Staged: - h.handle(ctx, deal, h.staged, Sealing) - case Sealing: - h.handle(ctx, deal, h.sealing, Complete) + case api.DealAccepted: + h.handle(ctx, deal, h.accept, api.DealStaged) + case api.DealStaged: + h.handle(ctx, deal, h.staged, api.DealSealing) + case api.DealSealing: + h.handle(ctx, deal, h.sealing, api.DealComplete) } } @@ -181,7 +181,7 @@ func (h *Handler) newDeal(s inet.Stream, proposal StorageDealProposal) (MinerDea Client: s.Conn().RemotePeer(), Proposal: proposal, ProposalCid: proposalNd.Cid(), - State: Unknown, + State: api.DealUnknown, Ref: ref, diff --git a/chain/deals/handler_states.go b/chain/deals/handler_states.go index 503b503a2..53499bc3b 100644 --- a/chain/deals/handler_states.go +++ b/chain/deals/handler_states.go @@ -3,6 +3,7 @@ package deals import ( "bytes" "context" + "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/build" cbor "github.com/ipfs/go-ipld-cbor" @@ -18,7 +19,7 @@ import ( type minerHandlerFunc func(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) -func (h *Handler) handle(ctx context.Context, deal MinerDeal, cb minerHandlerFunc, next DealState) { +func (h *Handler) handle(ctx context.Context, deal MinerDeal, cb minerHandlerFunc, next api.DealState) { go func() { mut, err := cb(ctx, deal) select { @@ -118,7 +119,7 @@ func (h *Handler) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal), log.Info("fetching data for a deal") err := h.sendSignedResponse(StorageDealResponse{ - State: Accepted, + State: api.DealAccepted, Message: "", Proposal: deal.ProposalCid, }) @@ -133,7 +134,7 @@ func (h *Handler) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal), func (h *Handler) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) { err := h.sendSignedResponse(StorageDealResponse{ - State: Staged, + State: api.DealStaged, Proposal: deal.ProposalCid, }) if err != nil { @@ -234,7 +235,7 @@ func (h *Handler) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal) } err = h.sendSignedResponse(StorageDealResponse{ - State: Sealing, + State: api.DealSealing, Proposal: deal.ProposalCid, PieceInclusionProof: ip, CommD: status.CommD[:], diff --git a/chain/deals/handler_utils.go b/chain/deals/handler_utils.go index e5d652a5e..34e4093e7 100644 --- a/chain/deals/handler_utils.go +++ b/chain/deals/handler_utils.go @@ -2,6 +2,7 @@ package deals import ( "context" + "github.com/filecoin-project/go-lotus/api" "runtime" "github.com/filecoin-project/go-lotus/chain/actors" @@ -28,7 +29,7 @@ func (h *Handler) failDeal(id cid.Cid, cerr error) { log.Errorf("deal %s failed: %s", id, cerr) err := h.sendSignedResponse(StorageDealResponse{ - State: Failed, + State: api.DealFailed, Message: cerr.Error(), Proposal: id, }) diff --git a/chain/deals/state_store.go b/chain/deals/state_store.go index 9f4dfd5ff..385843d10 100644 --- a/chain/deals/state_store.go +++ b/chain/deals/state_store.go @@ -3,6 +3,7 @@ package deals import ( "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/query" cbor "github.com/ipfs/go-ipld-cbor" "golang.org/x/xerrors" ) @@ -106,3 +107,30 @@ func (st *StateStore) mutate(i cid.Cid, mutator func([]byte) ([]byte, error)) er return st.ds.Put(k, mutated) } + +func (st *StateStore) ListClient() ([]ClientDeal, error) { + var out []ClientDeal + + res, err := st.ds.Query(query.Query{}) + if err != nil { + return nil, err + } + defer res.Close() + + for { + res, ok := res.NextSync() + if !ok { + break + } + + var deal ClientDeal + err := cbor.DecodeInto(res.Value, &deal) + if err != nil { + return nil, err + } + + out = append(out, deal) + } + + return out, nil +} diff --git a/chain/deals/types.go b/chain/deals/types.go index 5197024a0..532c0ade3 100644 --- a/chain/deals/types.go +++ b/chain/deals/types.go @@ -1,6 +1,7 @@ package deals import ( + "github.com/filecoin-project/go-lotus/api" "github.com/ipfs/go-cid" cbor "github.com/ipfs/go-ipld-cbor" @@ -29,19 +30,6 @@ const ( SerializationIPLD = "IPLD" ) -type DealState int - -const ( - Unknown = iota - Rejected - Accepted - Started - Failed - Staged - Sealing - Complete -) - type StorageDealProposal struct { PieceRef cid.Cid // TODO: port to spec SerializationMode SerializationMode @@ -71,17 +59,17 @@ type PieceInclusionProof struct { } type StorageDealResponse struct { - State DealState + State api.DealState - // Rejected / Accepted / Failed / Staged + // DealRejected / DealAccepted / DealFailed / DealStaged Message string Proposal cid.Cid - // Sealing + // DealSealing PieceInclusionProof PieceInclusionProof CommD []byte // TODO: not in spec - // Complete + // DealComplete SectorCommitMessage *cid.Cid } diff --git a/node/impl/full/client.go b/node/impl/full/client.go index c4d397e13..aedc428a9 100644 --- a/node/impl/full/client.go +++ b/node/impl/full/client.go @@ -117,6 +117,24 @@ func (a *ClientAPI) ClientStartDeal(ctx context.Context, data cid.Cid, miner add return &c, err } +func (a *ClientAPI) ClientListDeals(ctx context.Context) ([]api.DealInfo, error) { + deals, err := a.DealClient.List() + if err != nil { + return nil, err + } + + out := make([]api.DealInfo, len(deals)) + for k, v := range deals { + out[k] = api.DealInfo{ + ProposalCid: v.ProposalCid, + State: v.State, + Miner: v.Miner, + } + } + + return out, nil +} + func (a *ClientAPI) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) { // TODO: check if we have the ENTIRE dag