deals: API to list client deals

This commit is contained in:
Łukasz Magiera 2019-09-10 16:13:24 +02:00
parent 388e3ffa96
commit 1fc7a48759
11 changed files with 119 additions and 52 deletions

View File

@ -86,6 +86,7 @@ type FullNode interface {
// ClientImport imports file under the specified path into filestore // ClientImport imports file under the specified path into filestore
ClientImport(ctx context.Context, path string) (cid.Cid, error) ClientImport(ctx context.Context, path string) (cid.Cid, error)
ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error)
ClientListDeals(ctx context.Context) ([]DealInfo, error)
ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error)
ClientFindData(ctx context.Context, root cid.Cid) ([]QueryOffer, error) // TODO: specify serialization mode we want (defaults to unixfs for now) ClientFindData(ctx context.Context, root cid.Cid) ([]QueryOffer, error) // TODO: specify serialization mode we want (defaults to unixfs for now)
ClientRetrieve(ctx context.Context, order RetrievalOrder, path string) error ClientRetrieve(ctx context.Context, order RetrievalOrder, path string) error
@ -161,6 +162,12 @@ type Import struct {
Size uint64 Size uint64
} }
type DealInfo struct {
ProposalCid cid.Cid
State DealState
Miner peer.ID
}
type MsgWait struct { type MsgWait struct {
InBlock cid.Cid InBlock cid.Cid
Receipt types.MessageReceipt Receipt types.MessageReceipt

View File

@ -70,6 +70,7 @@ type FullNodeStruct struct {
ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"` ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"`
ClientFindData func(ctx context.Context, root cid.Cid) ([]QueryOffer, error) `perm:"read"` ClientFindData func(ctx context.Context, root cid.Cid) ([]QueryOffer, error) `perm:"read"`
ClientStartDeal func(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) `perm:"admin"` ClientStartDeal func(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) `perm:"admin"`
ClientListDeals func(ctx context.Context) ([]DealInfo, error) `perm:"write"`
ClientRetrieve func(ctx context.Context, order RetrievalOrder, path string) error `perm:"admin"` ClientRetrieve func(ctx context.Context, order RetrievalOrder, path string) error `perm:"admin"`
StateMinerSectors func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"` StateMinerSectors func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"`
@ -169,6 +170,10 @@ func (c *FullNodeStruct) ClientStartDeal(ctx context.Context, data cid.Cid, mine
return c.Internal.ClientStartDeal(ctx, data, miner, price, blocksDuration) return c.Internal.ClientStartDeal(ctx, data, miner, price, blocksDuration)
} }
func (c *FullNodeStruct) ClientListDeals(ctx context.Context) ([]DealInfo, error) {
return c.Internal.ClientListDeals(ctx)
}
func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order RetrievalOrder, path string) error { func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order RetrievalOrder, path string) error {
return c.Internal.ClientRetrieve(ctx, order, path) return c.Internal.ClientRetrieve(ctx, order, path)
} }

View File

@ -6,6 +6,19 @@ import (
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
) )
type DealState int
const (
DealUnknown = iota
DealRejected
DealAccepted
DealStarted
DealFailed
DealStaged
DealSealing
DealComplete
)
// TODO: check if this exists anywhere else // TODO: check if this exists anywhere else
type MultiaddrSlice []ma.Multiaddr type MultiaddrSlice []ma.Multiaddr

View File

@ -2,6 +2,7 @@ package deals
import ( import (
"context" "context"
"github.com/filecoin-project/go-lotus/api"
"math" "math"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
@ -31,7 +32,7 @@ var log = logging.Logger("deals")
type ClientDeal struct { type ClientDeal struct {
ProposalCid cid.Cid ProposalCid cid.Cid
Proposal StorageDealProposal Proposal StorageDealProposal
State DealState State api.DealState
Miner peer.ID Miner peer.ID
s inet.Stream s inet.Stream
@ -55,7 +56,7 @@ type Client struct {
} }
type clientDealUpdate struct { type clientDealUpdate struct {
newState DealState newState api.DealState
id cid.Cid id cid.Cid
err error err error
} }
@ -116,7 +117,7 @@ func (c *Client) onIncoming(deal ClientDeal) {
go func() { go func() {
c.updated <- clientDealUpdate{ c.updated <- clientDealUpdate{
newState: Unknown, newState: api.DealUnknown,
id: deal.ProposalCid, id: deal.ProposalCid,
err: nil, err: nil,
} }
@ -142,14 +143,14 @@ func (c *Client) onUpdated(ctx context.Context, update clientDealUpdate) {
} }
switch update.newState { switch update.newState {
case Unknown: // new case api.DealUnknown: // new
c.handle(ctx, deal, c.new, Accepted) c.handle(ctx, deal, c.new, api.DealAccepted)
case Accepted: case api.DealAccepted:
c.handle(ctx, deal, c.accepted, Staged) c.handle(ctx, deal, c.accepted, api.DealStaged)
case Staged: case api.DealStaged:
c.handle(ctx, deal, c.staged, Sealing) c.handle(ctx, deal, c.staged, api.DealSealing)
case Sealing: case api.DealSealing:
c.handle(ctx, deal, c.sealing, Complete) c.handle(ctx, deal, c.sealing, api.DealComplete)
} }
} }
@ -208,7 +209,7 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal, vd *actors.Pie
deal := ClientDeal{ deal := ClientDeal{
ProposalCid: proposalNd.Cid(), ProposalCid: proposalNd.Cid(),
Proposal: proposal, Proposal: proposal,
State: Unknown, State: api.DealUnknown,
Miner: p.MinerID, Miner: p.MinerID,
s: s, s: s,
@ -224,6 +225,10 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal, vd *actors.Pie
}) })
} }
func (c *Client) List() ([]ClientDeal, error) {
return c.deals.ListClient()
}
func (c *Client) Stop() { func (c *Client) Stop() {
close(c.stop) close(c.stop)
<-c.stopped <-c.stopped

View File

@ -2,6 +2,7 @@ package deals
import ( import (
"context" "context"
"github.com/filecoin-project/go-lotus/api"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -11,7 +12,7 @@ import (
type clientHandlerFunc func(ctx context.Context, deal ClientDeal) error type clientHandlerFunc func(ctx context.Context, deal ClientDeal) error
func (c *Client) handle(ctx context.Context, deal ClientDeal, cb clientHandlerFunc, next DealState) { func (c *Client) handle(ctx context.Context, deal ClientDeal, cb clientHandlerFunc, next api.DealState) {
go func() { go func() {
err := cb(ctx, deal) err := cb(ctx, deal)
select { select {
@ -31,7 +32,7 @@ func (c *Client) new(ctx context.Context, deal ClientDeal) error {
return err return err
} }
if resp.State != Accepted { if resp.State != api.DealAccepted {
return xerrors.Errorf("deal wasn't accepted (State=%d)", resp.State) return xerrors.Errorf("deal wasn't accepted (State=%d)", resp.State)
} }
@ -48,7 +49,7 @@ func (c *Client) accepted(ctx context.Context, deal ClientDeal) error {
return err return err
} }
if resp.State != Staged { if resp.State != api.DealStaged {
return xerrors.Errorf("deal wasn't staged (State=%d)", resp.State) return xerrors.Errorf("deal wasn't staged (State=%d)", resp.State)
} }
@ -65,7 +66,7 @@ func (c *Client) staged(ctx context.Context, deal ClientDeal) error {
return err return err
} }
if resp.State != Sealing { if resp.State != api.DealSealing {
return xerrors.Errorf("deal wasn't sealed (State=%d)", resp.State) return xerrors.Errorf("deal wasn't sealed (State=%d)", resp.State)
} }
@ -88,7 +89,7 @@ func (c *Client) sealing(ctx context.Context, deal ClientDeal) error {
return err return err
} }
if resp.State != Complete { if resp.State != api.DealComplete {
return xerrors.Errorf("deal wasn't complete (State=%d)", resp.State) return xerrors.Errorf("deal wasn't complete (State=%d)", resp.State)
} }

View File

@ -26,11 +26,11 @@ type MinerDeal struct {
Client peer.ID Client peer.ID
Proposal StorageDealProposal Proposal StorageDealProposal
ProposalCid cid.Cid ProposalCid cid.Cid
State DealState State api.DealState
Ref cid.Cid Ref cid.Cid
SectorID uint64 // Set when State >= Staged SectorID uint64 // Set when State >= DealStaged
s inet.Stream s inet.Stream
} }
@ -57,7 +57,7 @@ type Handler struct {
} }
type minerDealUpdate struct { type minerDealUpdate struct {
newState DealState newState api.DealState
id cid.Cid id cid.Cid
err error err error
mut func(*MinerDeal) mut func(*MinerDeal)
@ -102,9 +102,9 @@ func (h *Handler) Run(ctx context.Context) {
for { for {
select { select {
case deal := <-h.incoming: // Accepted case deal := <-h.incoming: // DealAccepted
h.onIncoming(deal) h.onIncoming(deal)
case update := <-h.updated: // Staged case update := <-h.updated: // DealStaged
h.onUpdated(ctx, update) h.onUpdated(ctx, update)
case <-h.stop: case <-h.stop:
return return
@ -127,7 +127,7 @@ func (h *Handler) onIncoming(deal MinerDeal) {
go func() { go func() {
h.updated <- minerDealUpdate{ h.updated <- minerDealUpdate{
newState: Accepted, newState: api.DealAccepted,
id: deal.ProposalCid, id: deal.ProposalCid,
err: nil, err: nil,
} }
@ -156,12 +156,12 @@ func (h *Handler) onUpdated(ctx context.Context, update minerDealUpdate) {
} }
switch update.newState { switch update.newState {
case Accepted: case api.DealAccepted:
h.handle(ctx, deal, h.accept, Staged) h.handle(ctx, deal, h.accept, api.DealStaged)
case Staged: case api.DealStaged:
h.handle(ctx, deal, h.staged, Sealing) h.handle(ctx, deal, h.staged, api.DealSealing)
case Sealing: case api.DealSealing:
h.handle(ctx, deal, h.sealing, Complete) h.handle(ctx, deal, h.sealing, api.DealComplete)
} }
} }
@ -181,7 +181,7 @@ func (h *Handler) newDeal(s inet.Stream, proposal StorageDealProposal) (MinerDea
Client: s.Conn().RemotePeer(), Client: s.Conn().RemotePeer(),
Proposal: proposal, Proposal: proposal,
ProposalCid: proposalNd.Cid(), ProposalCid: proposalNd.Cid(),
State: Unknown, State: api.DealUnknown,
Ref: ref, Ref: ref,

View File

@ -3,6 +3,7 @@ package deals
import ( import (
"bytes" "bytes"
"context" "context"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/build" "github.com/filecoin-project/go-lotus/build"
cbor "github.com/ipfs/go-ipld-cbor" cbor "github.com/ipfs/go-ipld-cbor"
@ -18,7 +19,7 @@ import (
type minerHandlerFunc func(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) type minerHandlerFunc func(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error)
func (h *Handler) handle(ctx context.Context, deal MinerDeal, cb minerHandlerFunc, next DealState) { func (h *Handler) handle(ctx context.Context, deal MinerDeal, cb minerHandlerFunc, next api.DealState) {
go func() { go func() {
mut, err := cb(ctx, deal) mut, err := cb(ctx, deal)
select { select {
@ -118,7 +119,7 @@ func (h *Handler) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal),
log.Info("fetching data for a deal") log.Info("fetching data for a deal")
err := h.sendSignedResponse(StorageDealResponse{ err := h.sendSignedResponse(StorageDealResponse{
State: Accepted, State: api.DealAccepted,
Message: "", Message: "",
Proposal: deal.ProposalCid, Proposal: deal.ProposalCid,
}) })
@ -133,7 +134,7 @@ func (h *Handler) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal),
func (h *Handler) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) { func (h *Handler) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
err := h.sendSignedResponse(StorageDealResponse{ err := h.sendSignedResponse(StorageDealResponse{
State: Staged, State: api.DealStaged,
Proposal: deal.ProposalCid, Proposal: deal.ProposalCid,
}) })
if err != nil { if err != nil {
@ -234,7 +235,7 @@ func (h *Handler) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
} }
err = h.sendSignedResponse(StorageDealResponse{ err = h.sendSignedResponse(StorageDealResponse{
State: Sealing, State: api.DealSealing,
Proposal: deal.ProposalCid, Proposal: deal.ProposalCid,
PieceInclusionProof: ip, PieceInclusionProof: ip,
CommD: status.CommD[:], CommD: status.CommD[:],

View File

@ -2,6 +2,7 @@ package deals
import ( import (
"context" "context"
"github.com/filecoin-project/go-lotus/api"
"runtime" "runtime"
"github.com/filecoin-project/go-lotus/chain/actors" "github.com/filecoin-project/go-lotus/chain/actors"
@ -28,7 +29,7 @@ func (h *Handler) failDeal(id cid.Cid, cerr error) {
log.Errorf("deal %s failed: %s", id, cerr) log.Errorf("deal %s failed: %s", id, cerr)
err := h.sendSignedResponse(StorageDealResponse{ err := h.sendSignedResponse(StorageDealResponse{
State: Failed, State: api.DealFailed,
Message: cerr.Error(), Message: cerr.Error(),
Proposal: id, Proposal: id,
}) })

View File

@ -3,6 +3,7 @@ package deals
import ( import (
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
cbor "github.com/ipfs/go-ipld-cbor" cbor "github.com/ipfs/go-ipld-cbor"
"golang.org/x/xerrors" "golang.org/x/xerrors"
) )
@ -106,3 +107,30 @@ func (st *StateStore) mutate(i cid.Cid, mutator func([]byte) ([]byte, error)) er
return st.ds.Put(k, mutated) return st.ds.Put(k, mutated)
} }
func (st *StateStore) ListClient() ([]ClientDeal, error) {
var out []ClientDeal
res, err := st.ds.Query(query.Query{})
if err != nil {
return nil, err
}
defer res.Close()
for {
res, ok := res.NextSync()
if !ok {
break
}
var deal ClientDeal
err := cbor.DecodeInto(res.Value, &deal)
if err != nil {
return nil, err
}
out = append(out, deal)
}
return out, nil
}

View File

@ -1,6 +1,7 @@
package deals package deals
import ( import (
"github.com/filecoin-project/go-lotus/api"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor" cbor "github.com/ipfs/go-ipld-cbor"
@ -29,19 +30,6 @@ const (
SerializationIPLD = "IPLD" SerializationIPLD = "IPLD"
) )
type DealState int
const (
Unknown = iota
Rejected
Accepted
Started
Failed
Staged
Sealing
Complete
)
type StorageDealProposal struct { type StorageDealProposal struct {
PieceRef cid.Cid // TODO: port to spec PieceRef cid.Cid // TODO: port to spec
SerializationMode SerializationMode SerializationMode SerializationMode
@ -71,17 +59,17 @@ type PieceInclusionProof struct {
} }
type StorageDealResponse struct { type StorageDealResponse struct {
State DealState State api.DealState
// Rejected / Accepted / Failed / Staged // DealRejected / DealAccepted / DealFailed / DealStaged
Message string Message string
Proposal cid.Cid Proposal cid.Cid
// Sealing // DealSealing
PieceInclusionProof PieceInclusionProof PieceInclusionProof PieceInclusionProof
CommD []byte // TODO: not in spec CommD []byte // TODO: not in spec
// Complete // DealComplete
SectorCommitMessage *cid.Cid SectorCommitMessage *cid.Cid
} }

View File

@ -117,6 +117,24 @@ func (a *ClientAPI) ClientStartDeal(ctx context.Context, data cid.Cid, miner add
return &c, err return &c, err
} }
func (a *ClientAPI) ClientListDeals(ctx context.Context) ([]api.DealInfo, error) {
deals, err := a.DealClient.List()
if err != nil {
return nil, err
}
out := make([]api.DealInfo, len(deals))
for k, v := range deals {
out[k] = api.DealInfo{
ProposalCid: v.ProposalCid,
State: v.State,
Miner: v.Miner,
}
}
return out, nil
}
func (a *ClientAPI) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) { func (a *ClientAPI) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) {
// TODO: check if we have the ENTIRE dag // TODO: check if we have the ENTIRE dag