feat(cli): add a list retrievals command

Currently, there is no way to inspect retrievals on a client. This adds said command, allow with
corresponding APIs
This commit is contained in:
hannahhoward 2021-05-26 19:50:34 -07:00
parent 6267832504
commit 19b6dc8d1e
11 changed files with 420 additions and 0 deletions

View File

@ -344,6 +344,10 @@ type FullNode interface {
// ClientRetrieveWithEvents initiates the retrieval of a file, as specified in the order, and provides a channel
// of status updates.
ClientRetrieveWithEvents(ctx context.Context, order RetrievalOrder, ref *FileRef) (<-chan marketevents.RetrievalEvent, error) //perm:admin
// ClientListRetrievals returns information about retrievals made by the local client
ClientListRetrievals(ctx context.Context) ([]RetrievalInfo, error) //perm:write
// ClientGetRetrievalUpdates returns status of updated retrieval deals
ClientGetRetrievalUpdates(ctx context.Context) (<-chan RetrievalInfo, error) //perm:write
// ClientQueryAsk returns a signed StorageAsk from the specified miner.
ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.StorageAsk, error) //perm:read
// ClientCalcCommP calculates the CommP and data size of the specified CID

View File

@ -580,6 +580,21 @@ func (mr *MockFullNodeMockRecorder) ClientGetDealUpdates(arg0 interface{}) *gomo
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientGetDealUpdates", reflect.TypeOf((*MockFullNode)(nil).ClientGetDealUpdates), arg0)
}
// ClientGetRetrievalUpdates mocks base method
func (m *MockFullNode) ClientGetRetrievalUpdates(arg0 context.Context) (<-chan api.RetrievalInfo, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ClientGetRetrievalUpdates", arg0)
ret0, _ := ret[0].(<-chan api.RetrievalInfo)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ClientGetRetrievalUpdates indicates an expected call of ClientGetRetrievalUpdates
func (mr *MockFullNodeMockRecorder) ClientGetRetrievalUpdates(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientGetRetrievalUpdates", reflect.TypeOf((*MockFullNode)(nil).ClientGetRetrievalUpdates), arg0)
}
// ClientHasLocal mocks base method
func (m *MockFullNode) ClientHasLocal(arg0 context.Context, arg1 cid.Cid) (bool, error) {
m.ctrl.T.Helper()
@ -655,6 +670,21 @@ func (mr *MockFullNodeMockRecorder) ClientListImports(arg0 interface{}) *gomock.
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientListImports", reflect.TypeOf((*MockFullNode)(nil).ClientListImports), arg0)
}
// ClientListRetrievals mocks base method
func (m *MockFullNode) ClientListRetrievals(arg0 context.Context) ([]api.RetrievalInfo, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ClientListRetrievals", arg0)
ret0, _ := ret[0].([]api.RetrievalInfo)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ClientListRetrievals indicates an expected call of ClientListRetrievals
func (mr *MockFullNodeMockRecorder) ClientListRetrievals(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientListRetrievals", reflect.TypeOf((*MockFullNode)(nil).ClientListRetrievals), arg0)
}
// ClientMinerQueryOffer mocks base method
func (m *MockFullNode) ClientMinerQueryOffer(arg0 context.Context, arg1 address.Address, arg2 cid.Cid, arg3 *cid.Cid) (api.QueryOffer, error) {
m.ctrl.T.Helper()

View File

@ -179,6 +179,8 @@ type FullNodeStruct struct {
ClientGetDealUpdates func(p0 context.Context) (<-chan DealInfo, error) `perm:"write"`
ClientGetRetrievalUpdates func(p0 context.Context) (<-chan RetrievalInfo, error) `perm:"write"`
ClientHasLocal func(p0 context.Context, p1 cid.Cid) (bool, error) `perm:"write"`
ClientImport func(p0 context.Context, p1 FileRef) (*ImportRes, error) `perm:"admin"`
@ -189,6 +191,8 @@ type FullNodeStruct struct {
ClientListImports func(p0 context.Context) ([]Import, error) `perm:"write"`
ClientListRetrievals func(p0 context.Context) ([]RetrievalInfo, error) `perm:"write"`
ClientMinerQueryOffer func(p0 context.Context, p1 address.Address, p2 cid.Cid, p3 *cid.Cid) (QueryOffer, error) `perm:"read"`
ClientQueryAsk func(p0 context.Context, p1 peer.ID, p2 address.Address) (*storagemarket.StorageAsk, error) `perm:"read"`
@ -1293,6 +1297,14 @@ func (s *FullNodeStub) ClientGetDealUpdates(p0 context.Context) (<-chan DealInfo
return nil, xerrors.New("method not supported")
}
func (s *FullNodeStruct) ClientGetRetrievalUpdates(p0 context.Context) (<-chan RetrievalInfo, error) {
return s.Internal.ClientGetRetrievalUpdates(p0)
}
func (s *FullNodeStub) ClientGetRetrievalUpdates(p0 context.Context) (<-chan RetrievalInfo, error) {
return nil, xerrors.New("method not supported")
}
func (s *FullNodeStruct) ClientHasLocal(p0 context.Context, p1 cid.Cid) (bool, error) {
return s.Internal.ClientHasLocal(p0, p1)
}
@ -1333,6 +1345,14 @@ func (s *FullNodeStub) ClientListImports(p0 context.Context) ([]Import, error) {
return *new([]Import), xerrors.New("method not supported")
}
func (s *FullNodeStruct) ClientListRetrievals(p0 context.Context) ([]RetrievalInfo, error) {
return s.Internal.ClientListRetrievals(p0)
}
func (s *FullNodeStub) ClientListRetrievals(p0 context.Context) ([]RetrievalInfo, error) {
return *new([]RetrievalInfo), xerrors.New("method not supported")
}
func (s *FullNodeStruct) ClientMinerQueryOffer(p0 context.Context, p1 address.Address, p2 cid.Cid, p3 *cid.Cid) (QueryOffer, error) {
return s.Internal.ClientMinerQueryOffer(p0, p1, p2, p3)
}

View File

@ -5,6 +5,7 @@ import (
"fmt"
"time"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/lotus/chain/types"
datatransfer "github.com/filecoin-project/go-data-transfer"
@ -176,3 +177,21 @@ type MessagePrototype struct {
Message types.Message
ValidNonce bool
}
type RetrievalInfo struct {
PayloadCID cid.Cid
ID retrievalmarket.DealID
PieceCID *cid.Cid
PricePerByte abi.TokenAmount
UnsealPrice abi.TokenAmount
Status retrievalmarket.DealStatus
Message string // more information about deal state, particularly errors
Provider peer.ID
BytesReceived uint64
BytesPaidFor uint64
TotalPaid abi.TokenAmount
TransferChannelID *datatransfer.ChannelID
DataTransfer *DataTransferChannel
}

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -92,6 +92,7 @@ var clientCmd = &cli.Command{
WithCategory("retrieval", clientFindCmd),
WithCategory("retrieval", clientRetrieveCmd),
WithCategory("retrieval", clientCancelRetrievalDealCmd),
WithCategory("retrieval", clientListRetrievalsCmd),
WithCategory("util", clientCommPCmd),
WithCategory("util", clientCarGenCmd),
WithCategory("util", clientBalancesCmd),
@ -1209,6 +1210,192 @@ var clientRetrieveCmd = &cli.Command{
},
}
var clientListRetrievalsCmd = &cli.Command{
Name: "list-retrievals",
Usage: "List retrieval market deals",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "verbose",
Aliases: []string{"v"},
Usage: "print verbose deal details",
},
&cli.BoolFlag{
Name: "color",
Usage: "use color in display output",
Value: true,
},
&cli.BoolFlag{
Name: "show-failed",
Usage: "show failed/failing deals",
},
&cli.BoolFlag{
Name: "completed",
Usage: "show completed retrievals",
},
&cli.BoolFlag{
Name: "watch",
Usage: "watch deal updates in real-time, rather than a one time list",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPIV1(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
verbose := cctx.Bool("verbose")
color := cctx.Bool("color")
watch := cctx.Bool("watch")
showFailed := cctx.Bool("show-failed")
completed := cctx.Bool("completed")
localDeals, err := api.ClientListRetrievals(ctx)
if err != nil {
return err
}
if watch {
updates, err := api.ClientGetRetrievalUpdates(ctx)
if err != nil {
return err
}
for {
tm.Clear()
tm.MoveCursor(1, 1)
err = outputRetrievalDeals(ctx, tm.Screen, localDeals, verbose, color, showFailed, completed)
if err != nil {
return err
}
tm.Flush()
select {
case <-ctx.Done():
return nil
case updated := <-updates:
var found bool
for i, existing := range localDeals {
if existing.ID == updated.ID {
localDeals[i] = updated
found = true
break
}
}
if !found {
localDeals = append(localDeals, updated)
}
}
}
}
return outputRetrievalDeals(ctx, cctx.App.Writer, localDeals, verbose, color, showFailed, completed)
},
}
func outputRetrievalDeals(ctx context.Context, out io.Writer, localDeals []lapi.RetrievalInfo, verbose bool, color bool, showFailed bool, completed bool) error {
var deals []api.RetrievalInfo
for _, deal := range localDeals {
if !showFailed && retrievalmarket.IsTerminalError(deal.Status) {
continue
}
if !completed && retrievalmarket.IsTerminalSuccess(deal.Status) {
continue
}
deals = append(deals, deal)
}
tableColumns := []tablewriter.Column{
tablewriter.Col("PayloadCID"),
tablewriter.Col("DealId"),
tablewriter.Col("Provider"),
tablewriter.Col("Status"),
tablewriter.Col("PricePerByte"),
tablewriter.Col("Received"),
tablewriter.Col("TotalPaid"),
}
if verbose {
tableColumns = append(tableColumns,
tablewriter.Col("PieceCID"),
tablewriter.Col("UnsealPrice"),
tablewriter.Col("BytesPaidFor"),
tablewriter.Col("TransferChannelID"),
tablewriter.Col("TransferStatus"),
)
}
tableColumns = append(tableColumns, tablewriter.NewLineCol("Message"))
w := tablewriter.New(tableColumns...)
for _, d := range deals {
w.Write(toRetrievalOutput(d, color, verbose))
}
return w.Flush(out)
}
func toRetrievalOutput(d api.RetrievalInfo, color bool, verbose bool) map[string]interface{} {
payloadCID := d.PayloadCID.String()
provider := d.Provider.String()
if !verbose {
payloadCID = ellipsis(payloadCID, 8)
provider = ellipsis(provider, 8)
}
retrievalOutput := map[string]interface{}{
"PayloadCID": payloadCID,
"DealId": d.ID,
"Provider": provider,
"Status": retrievalStatusString(color, d.Status),
"PricePerByte": types.FIL(d.PricePerByte),
"Received": units.BytesSize(float64(d.BytesReceived)),
"TotalPaid": types.FIL(d.TotalPaid),
"Message": d.Message,
}
if verbose {
transferChannelID := ""
if d.TransferChannelID != nil {
transferChannelID = d.TransferChannelID.String()
}
transferStatus := ""
if d.DataTransfer != nil {
transferStatus = datatransfer.Statuses[d.DataTransfer.Status]
}
pieceCID := ""
if d.PieceCID != nil {
pieceCID = d.PieceCID.String()
}
retrievalOutput["PieceCID"] = pieceCID
retrievalOutput["UnsealPrice"] = types.FIL(d.UnsealPrice)
retrievalOutput["BytesPaidFor"] = units.BytesSize(float64(d.BytesPaidFor))
retrievalOutput["TransferChannelID"] = transferChannelID
retrievalOutput["TransferStatus"] = transferStatus
}
return retrievalOutput
}
func retrievalStatusString(c bool, status retrievalmarket.DealStatus) string {
s := retrievalmarket.DealStatuses[status]
if !c {
return s
}
if retrievalmarket.IsTerminalError(status) {
return color.RedString(s)
}
if retrievalmarket.IsTerminalSuccess(status) {
return color.GreenString(s)
}
return s
}
var clientInspectDealCmd = &cli.Command{
Name: "inspect-deal",
Usage: "Inspect detailed information about deal's lifecycle and the various stages it goes through",

View File

@ -44,11 +44,13 @@
* [ClientGetDealInfo](#ClientGetDealInfo)
* [ClientGetDealStatus](#ClientGetDealStatus)
* [ClientGetDealUpdates](#ClientGetDealUpdates)
* [ClientGetRetrievalUpdates](#ClientGetRetrievalUpdates)
* [ClientHasLocal](#ClientHasLocal)
* [ClientImport](#ClientImport)
* [ClientListDataTransfers](#ClientListDataTransfers)
* [ClientListDeals](#ClientListDeals)
* [ClientListImports](#ClientListImports)
* [ClientListRetrievals](#ClientListRetrievals)
* [ClientMinerQueryOffer](#ClientMinerQueryOffer)
* [ClientQueryAsk](#ClientQueryAsk)
* [ClientRemoveImport](#ClientRemoveImport)
@ -1199,6 +1201,54 @@ Response:
}
```
### ClientGetRetrievalUpdates
ClientGetRetrievalUpdates returns status of updated retrieval deals
Perms: write
Inputs: `null`
Response:
```json
{
"PayloadCID": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
"ID": 5,
"PieceCID": null,
"PricePerByte": "0",
"UnsealPrice": "0",
"Status": 0,
"Message": "string value",
"Provider": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf",
"BytesReceived": 42,
"BytesPaidFor": 42,
"TotalPaid": "0",
"TransferChannelID": {
"Initiator": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf",
"Responder": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf",
"ID": 3
},
"DataTransfer": {
"TransferID": 3,
"Status": 1,
"BaseCID": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
"IsInitiator": true,
"IsSender": true,
"Voucher": "string value",
"Message": "string value",
"OtherPeer": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf",
"Transferred": 42,
"Stages": {
"Stages": null
}
}
}
```
### ClientHasLocal
ClientHasLocal indicates whether a certain CID is locally stored.
@ -1266,6 +1316,16 @@ Response: `null`
ClientListImports lists imported files and their root CIDs
Perms: write
Inputs: `null`
Response: `null`
### ClientListRetrievals
ClientListRetrievals returns information about retrievals made by the local client
Perms: write
Inputs: `null`

View File

@ -377,6 +377,7 @@ COMMANDS:
find Find data in the network
retrieve Retrieve data from network
cancel-retrieval Cancel a retrieval deal by deal ID; this also cancels the associated transfer
list-retrievals List retrieval market deals
STORAGE:
deal Initialize storage deal with a miner
query-ask Find a miners ask
@ -521,6 +522,27 @@ OPTIONS:
```
### lotus client list-retrievals
```
NAME:
lotus client list-retrievals - List retrieval market deals
USAGE:
lotus client list-retrievals [command options] [arguments...]
CATEGORY:
RETRIEVAL
OPTIONS:
--verbose, -v print verbose deal details (default: false)
--color use color in display output (default: true)
--show-failed show failed/failing deals (default: false)
--completed show completed retrievals (default: false)
--watch watch deal updates in real-time, rather than a one time list (default: false)
--help, -h show help (default: false)
```
### lotus client deal
```
NAME:

View File

@ -6,6 +6,7 @@ import (
"fmt"
"io"
"os"
"sort"
"time"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
@ -835,6 +836,83 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
return
}
func (a *API) ClientListRetrievals(ctx context.Context) ([]api.RetrievalInfo, error) {
deals, err := a.Retrieval.ListDeals()
if err != nil {
return nil, err
}
dataTransfersByID, err := a.transfersByID(ctx)
if err != nil {
return nil, err
}
out := make([]api.RetrievalInfo, 0, len(deals))
for _, v := range deals {
// Find the data transfer associated with this deal
var transferCh *api.DataTransferChannel
if v.ChannelID != nil {
if ch, ok := dataTransfersByID[*v.ChannelID]; ok {
transferCh = &ch
}
}
out = append(out, a.newRetrievalInfoWithTransfer(transferCh, v))
}
sort.Slice(out, func(a, b int) bool {
return out[a].ID < out[b].ID
})
return out, nil
}
func (a *API) ClientGetRetrievalUpdates(ctx context.Context) (<-chan api.RetrievalInfo, error) {
updates := make(chan api.RetrievalInfo)
unsub := a.Retrieval.SubscribeToEvents(func(_ rm.ClientEvent, deal rm.ClientDealState) {
updates <- a.newRetrievalInfo(ctx, deal)
})
go func() {
defer unsub()
<-ctx.Done()
}()
return updates, nil
}
func (a *API) newRetrievalInfoWithTransfer(ch *api.DataTransferChannel, deal rm.ClientDealState) api.RetrievalInfo {
return api.RetrievalInfo{
PayloadCID: deal.PayloadCID,
ID: deal.ID,
PieceCID: deal.PieceCID,
PricePerByte: deal.PricePerByte,
UnsealPrice: deal.UnsealPrice,
Status: deal.Status,
Message: deal.Message,
Provider: deal.Sender,
BytesReceived: deal.TotalReceived,
BytesPaidFor: deal.BytesPaidFor,
TotalPaid: deal.FundsSpent,
TransferChannelID: deal.ChannelID,
DataTransfer: ch,
}
}
func (a *API) newRetrievalInfo(ctx context.Context, v rm.ClientDealState) api.RetrievalInfo {
// Find the data transfer associated with this deal
var transferCh *api.DataTransferChannel
if v.ChannelID != nil {
state, err := a.DataTransfer.ChannelState(ctx, *v.ChannelID)
// Note: If there was an error just ignore it, as the data transfer may
// be not found if it's no longer active
if err == nil {
ch := api.NewDataTransferChannel(a.Host.ID(), state)
ch.Stages = state.Stages()
transferCh = &ch
}
}
return a.newRetrievalInfoWithTransfer(transferCh, v)
}
type multiStoreRetrievalStore struct {
storeID multistore.StoreID
store *multistore.Store