Merge pull request #3379 from filecoin-project/feat/add-list-deals-watch-option

Add watch option to list-deals
This commit is contained in:
Łukasz Magiera 2020-08-28 22:33:48 +02:00 committed by GitHub
commit 16f0daab90
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 201 additions and 75 deletions

View File

@ -243,6 +243,8 @@ type FullNode interface {
ClientGetDealInfo(context.Context, cid.Cid) (*DealInfo, error) ClientGetDealInfo(context.Context, cid.Cid) (*DealInfo, error)
// ClientListDeals returns information about the deals made by the local client. // ClientListDeals returns information about the deals made by the local client.
ClientListDeals(ctx context.Context) ([]DealInfo, error) ClientListDeals(ctx context.Context) ([]DealInfo, error)
// ClientGetDealUpdates returns the status of updated deals
ClientGetDealUpdates(ctx context.Context) (<-chan DealInfo, error)
// ClientHasLocal indicates whether a certain CID is locally stored. // ClientHasLocal indicates whether a certain CID is locally stored.
ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error)
// ClientFindData identifies peers that have a certain file, and returns QueryOffers (one per peer). // ClientFindData identifies peers that have a certain file, and returns QueryOffers (one per peer).

View File

@ -139,6 +139,7 @@ type FullNodeStruct struct {
ClientStartDeal func(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) `perm:"admin"` ClientStartDeal func(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) `perm:"admin"`
ClientGetDealInfo func(context.Context, cid.Cid) (*api.DealInfo, error) `perm:"read"` ClientGetDealInfo func(context.Context, cid.Cid) (*api.DealInfo, error) `perm:"read"`
ClientListDeals func(ctx context.Context) ([]api.DealInfo, error) `perm:"write"` ClientListDeals func(ctx context.Context) ([]api.DealInfo, error) `perm:"write"`
ClientGetDealUpdates func(ctx context.Context) (<-chan api.DealInfo, error) `perm:"read"`
ClientRetrieve func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error `perm:"admin"` ClientRetrieve func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error `perm:"admin"`
ClientRetrieveWithEvents func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"` ClientRetrieveWithEvents func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"`
ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) `perm:"read"` ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) `perm:"read"`
@ -433,6 +434,10 @@ func (c *FullNodeStruct) ClientListDeals(ctx context.Context) ([]api.DealInfo, e
return c.Internal.ClientListDeals(ctx) return c.Internal.ClientListDeals(ctx)
} }
func (c *FullNodeStruct) ClientGetDealUpdates(ctx context.Context) (<-chan api.DealInfo, error) {
return c.Internal.ClientGetDealUpdates(ctx)
}
func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error { func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error {
return c.Internal.ClientRetrieve(ctx, order, ref) return c.Internal.ClientRetrieve(ctx, order, ref)
} }

View File

@ -1,6 +1,7 @@
package cli package cli
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
@ -978,6 +979,10 @@ var clientListDeals = &cli.Command{
Usage: "use color in display output", Usage: "use color in display output",
Value: true, Value: true,
}, },
&cli.BoolFlag{
Name: "watch",
Usage: "watch deal updates in real-time, rather than a one time list",
},
}, },
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx) api, closer, err := GetFullNodeAPI(cctx)
@ -987,48 +992,95 @@ var clientListDeals = &cli.Command{
defer closer() defer closer()
ctx := ReqContext(cctx) ctx := ReqContext(cctx)
head, err := api.ChainHead(ctx) verbose := cctx.Bool("verbose")
if err != nil { color := cctx.Bool("color")
return err watch := cctx.Bool("watch")
}
localDeals, err := api.ClientListDeals(ctx) localDeals, err := api.ClientListDeals(ctx)
if err != nil { if err != nil {
return err return err
} }
sort.Slice(localDeals, func(i, j int) bool { if watch {
return localDeals[i].CreationTime.Before(localDeals[j].CreationTime) updates, err := api.ClientGetDealUpdates(ctx)
}) if err != nil {
return err
}
var deals []deal for {
for _, v := range localDeals { tm.Clear()
tm.MoveCursor(1, 1)
err = outputStorageDeals(ctx, tm.Screen, api, localDeals, verbose, color)
if err != nil {
return err
}
tm.Flush()
select {
case <-ctx.Done():
return nil
case updated := <-updates:
var found bool
for i, existing := range localDeals {
if existing.ProposalCid.Equals(updated.ProposalCid) {
localDeals[i] = updated
found = true
break
}
}
if !found {
localDeals = append(localDeals, updated)
}
}
}
}
return outputStorageDeals(ctx, os.Stdout, api, localDeals, cctx.Bool("verbose"), cctx.Bool("color"))
},
}
func dealFromDealInfo(ctx context.Context, full api.FullNode, head *types.TipSet, v api.DealInfo) deal {
if v.DealID == 0 { if v.DealID == 0 {
deals = append(deals, deal{ return deal{
LocalDeal: v, LocalDeal: v,
OnChainDealState: market.DealState{ OnChainDealState: market.DealState{
SectorStartEpoch: -1, SectorStartEpoch: -1,
LastUpdatedEpoch: -1, LastUpdatedEpoch: -1,
SlashEpoch: -1, SlashEpoch: -1,
}, },
}) }
} else { }
onChain, err := api.StateMarketStorageDeal(ctx, v.DealID, head.Key())
onChain, err := full.StateMarketStorageDeal(ctx, v.DealID, head.Key())
if err != nil { if err != nil {
deals = append(deals, deal{LocalDeal: v}) return deal{LocalDeal: v}
} else { }
deals = append(deals, deal{
return deal{
LocalDeal: v, LocalDeal: v,
OnChainDealState: onChain.State, OnChainDealState: onChain.State,
}
}
func outputStorageDeals(ctx context.Context, out io.Writer, full api.FullNode, localDeals []api.DealInfo, verbose bool, color bool) error {
sort.Slice(localDeals, func(i, j int) bool {
return localDeals[i].CreationTime.Before(localDeals[j].CreationTime)
}) })
}
} head, err := full.ChainHead(ctx)
if err != nil {
return err
} }
color := cctx.Bool("color") var deals []deal
for _, localDeal := range localDeals {
deals = append(deals, dealFromDealInfo(ctx, full, head, localDeal))
}
if cctx.Bool("verbose") { if verbose {
w := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0) w := tabwriter.NewWriter(out, 2, 4, 2, ' ', 0)
fmt.Fprintf(w, "Created\tDealCid\tDealId\tProvider\tState\tOn Chain?\tSlashed?\tPieceCID\tSize\tPrice\tDuration\tMessage\n") fmt.Fprintf(w, "Created\tDealCid\tDealId\tProvider\tState\tOn Chain?\tSlashed?\tPieceCID\tSize\tPrice\tDuration\tMessage\n")
for _, d := range deals { for _, d := range deals {
onChain := "N" onChain := "N"
@ -1091,8 +1143,7 @@ var clientListDeals = &cli.Command{
}) })
} }
return w.Flush(os.Stdout) return w.Flush(out)
},
} }
func dealStateString(c bool, state storagemarket.StorageDealStatus) string { func dealStateString(c bool, state storagemarket.StorageDealStatus) string {

View File

@ -36,6 +36,7 @@
* [ClientFindData](#ClientFindData) * [ClientFindData](#ClientFindData)
* [ClientGenCar](#ClientGenCar) * [ClientGenCar](#ClientGenCar)
* [ClientGetDealInfo](#ClientGetDealInfo) * [ClientGetDealInfo](#ClientGetDealInfo)
* [ClientGetDealUpdates](#ClientGetDealUpdates)
* [ClientHasLocal](#ClientHasLocal) * [ClientHasLocal](#ClientHasLocal)
* [ClientImport](#ClientImport) * [ClientImport](#ClientImport)
* [ClientListDataTransfers](#ClientListDataTransfers) * [ClientListDataTransfers](#ClientListDataTransfers)
@ -915,6 +916,42 @@ Response:
} }
``` ```
### ClientGetDealUpdates
ClientGetDealUpdates returns the status of updated deals
Perms: read
Inputs: `null`
Response:
```json
{
"ProposalCid": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
"State": 42,
"Message": "string value",
"Provider": "t01234",
"DataRef": {
"TransferType": "string value",
"Root": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
"PieceCid": null,
"PieceSize": 1024
},
"PieceCID": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
"Size": 42,
"PricePerEpoch": "0",
"Duration": 42,
"DealID": 5432,
"CreationTime": "0001-01-01T00:00:00Z"
}
```
### ClientHasLocal ### ClientHasLocal
ClientHasLocal indicates whether a certain CID is locally stored. ClientHasLocal indicates whether a certain CID is locally stored.

View File

@ -223,6 +223,21 @@ func (a *API) ClientGetDealInfo(ctx context.Context, d cid.Cid) (*api.DealInfo,
}, nil }, nil
} }
func (a *API) ClientGetDealUpdates(ctx context.Context) (<-chan api.DealInfo, error) {
updates := make(chan api.DealInfo)
unsub := a.SMDealClient.SubscribeToEvents(func(_ storagemarket.ClientEvent, deal storagemarket.ClientDeal) {
updates <- newDealInfo(deal)
})
go func() {
defer unsub()
<-ctx.Done()
}()
return updates, nil
}
func (a *API) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) { func (a *API) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) {
// TODO: check if we have the ENTIRE dag // TODO: check if we have the ENTIRE dag
@ -816,3 +831,19 @@ func (a *API) ClientDataTransferUpdates(ctx context.Context) (<-chan api.DataTra
return channels, nil return channels, nil
} }
func newDealInfo(v storagemarket.ClientDeal) api.DealInfo {
return api.DealInfo{
ProposalCid: v.ProposalCid,
DataRef: v.DataRef,
State: v.State,
Message: v.Message,
Provider: v.Proposal.Provider,
PieceCID: v.Proposal.PieceCID,
Size: uint64(v.Proposal.PieceSize.Unpadded()),
PricePerEpoch: v.Proposal.StoragePricePerEpoch,
Duration: uint64(v.Proposal.Duration()),
DealID: v.DealID,
CreationTime: v.CreationTime.Time(),
}
}