Merge pull request #3527 from filecoin-project/feat/watch-option-miner-storage-deals-list

Add watch option to storage-deals list
This commit is contained in:
Łukasz Magiera 2020-09-04 04:13:01 +02:00 committed by GitHub
commit d81feb05d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 97 additions and 51 deletions

View File

@ -73,7 +73,7 @@ type StorageMiner interface {
MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error
MarketListDeals(ctx context.Context) ([]storagemarket.StorageDeal, error) MarketListDeals(ctx context.Context) ([]storagemarket.StorageDeal, error)
MarketListRetrievalDeals(ctx context.Context) ([]retrievalmarket.ProviderDealState, error) MarketListRetrievalDeals(ctx context.Context) ([]retrievalmarket.ProviderDealState, error)
MarketGetDealUpdates(ctx context.Context, d cid.Cid) (<-chan storagemarket.MinerDeal, error) MarketGetDealUpdates(ctx context.Context) (<-chan storagemarket.MinerDeal, error)
MarketListIncompleteDeals(ctx context.Context) ([]storagemarket.MinerDeal, error) MarketListIncompleteDeals(ctx context.Context) ([]storagemarket.MinerDeal, error)
MarketSetAsk(ctx context.Context, price types.BigInt, verifiedPrice types.BigInt, duration abi.ChainEpoch, minPieceSize abi.PaddedPieceSize, maxPieceSize abi.PaddedPieceSize) error MarketSetAsk(ctx context.Context, price types.BigInt, verifiedPrice types.BigInt, duration abi.ChainEpoch, minPieceSize abi.PaddedPieceSize, maxPieceSize abi.PaddedPieceSize) error
MarketGetAsk(ctx context.Context) (*storagemarket.SignedStorageAsk, error) MarketGetAsk(ctx context.Context) (*storagemarket.SignedStorageAsk, error)

View File

@ -242,7 +242,7 @@ type StorageMinerStruct struct {
MarketImportDealData func(context.Context, cid.Cid, string) error `perm:"write"` MarketImportDealData func(context.Context, cid.Cid, string) error `perm:"write"`
MarketListDeals func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"` MarketListDeals func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"`
MarketListRetrievalDeals func(ctx context.Context) ([]retrievalmarket.ProviderDealState, error) `perm:"read"` MarketListRetrievalDeals func(ctx context.Context) ([]retrievalmarket.ProviderDealState, error) `perm:"read"`
MarketGetDealUpdates func(ctx context.Context, d cid.Cid) (<-chan storagemarket.MinerDeal, error) `perm:"read"` MarketGetDealUpdates func(ctx context.Context) (<-chan storagemarket.MinerDeal, error) `perm:"read"`
MarketListIncompleteDeals func(ctx context.Context) ([]storagemarket.MinerDeal, error) `perm:"read"` MarketListIncompleteDeals func(ctx context.Context) ([]storagemarket.MinerDeal, error) `perm:"read"`
MarketSetAsk func(ctx context.Context, price types.BigInt, verifiedPrice types.BigInt, duration abi.ChainEpoch, minPieceSize abi.PaddedPieceSize, maxPieceSize abi.PaddedPieceSize) error `perm:"admin"` MarketSetAsk func(ctx context.Context, price types.BigInt, verifiedPrice types.BigInt, duration abi.ChainEpoch, minPieceSize abi.PaddedPieceSize, maxPieceSize abi.PaddedPieceSize) error `perm:"admin"`
MarketGetAsk func(ctx context.Context) (*storagemarket.SignedStorageAsk, error) `perm:"read"` MarketGetAsk func(ctx context.Context) (*storagemarket.SignedStorageAsk, error) `perm:"read"`
@ -1097,8 +1097,8 @@ func (c *StorageMinerStruct) MarketListRetrievalDeals(ctx context.Context) ([]re
return c.Internal.MarketListRetrievalDeals(ctx) return c.Internal.MarketListRetrievalDeals(ctx)
} }
func (c *StorageMinerStruct) MarketGetDealUpdates(ctx context.Context, d cid.Cid) (<-chan storagemarket.MinerDeal, error) { func (c *StorageMinerStruct) MarketGetDealUpdates(ctx context.Context) (<-chan storagemarket.MinerDeal, error) {
return c.Internal.MarketGetDealUpdates(ctx, d) return c.Internal.MarketGetDealUpdates(ctx)
} }
func (c *StorageMinerStruct) MarketListIncompleteDeals(ctx context.Context) ([]storagemarket.MinerDeal, error) { func (c *StorageMinerStruct) MarketListIncompleteDeals(ctx context.Context) ([]storagemarket.MinerDeal, error) {

View File

@ -334,7 +334,7 @@ loop:
func waitDealPublished(t *testing.T, ctx context.Context, miner TestStorageNode, deal *cid.Cid) { func waitDealPublished(t *testing.T, ctx context.Context, miner TestStorageNode, deal *cid.Cid) {
subCtx, cancel := context.WithCancel(ctx) subCtx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
updates, err := miner.MarketGetDealUpdates(subCtx, *deal) updates, err := miner.MarketGetDealUpdates(subCtx)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -343,6 +343,7 @@ func waitDealPublished(t *testing.T, ctx context.Context, miner TestStorageNode,
case <-ctx.Done(): case <-ctx.Done():
t.Fatal("context timeout") t.Fatal("context timeout")
case di := <-updates: case di := <-updates:
if deal.Equals(di.ProposalCid) {
switch di.State { switch di.State {
case storagemarket.StorageDealProposalRejected: case storagemarket.StorageDealProposalRejected:
t.Fatal("deal rejected") t.Fatal("deal rejected")
@ -358,6 +359,7 @@ func waitDealPublished(t *testing.T, ctx context.Context, miner TestStorageNode,
} }
} }
} }
}
func startSealingWaiting(t *testing.T, ctx context.Context, miner TestStorageNode) { func startSealingWaiting(t *testing.T, ctx context.Context, miner TestStorageNode) {
snums, err := miner.SectorsList(ctx) snums, err := miner.SectorsList(ctx)

View File

@ -3,6 +3,7 @@ package main
import ( import (
"bufio" "bufio"
"fmt" "fmt"
"io"
"os" "os"
"path/filepath" "path/filepath"
"sort" "sort"
@ -345,6 +346,10 @@ var dealsListCmd = &cli.Command{
Name: "verbose", Name: "verbose",
Aliases: []string{"v"}, Aliases: []string{"v"},
}, },
&cli.BoolFlag{
Name: "watch",
Usage: "watch deal updates in real-time, rather than a one time list",
},
}, },
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetStorageMinerAPI(cctx) api, closer, err := lcli.GetStorageMinerAPI(cctx)
@ -360,13 +365,55 @@ var dealsListCmd = &cli.Command{
return err return err
} }
verbose := cctx.Bool("verbose")
watch := cctx.Bool("watch")
if watch {
updates, err := api.MarketGetDealUpdates(ctx)
if err != nil {
return err
}
for {
tm.Clear()
tm.MoveCursor(1, 1)
err = outputStorageDeals(tm.Output, deals, verbose)
if err != nil {
return err
}
tm.Flush()
select {
case <-ctx.Done():
return nil
case updated := <-updates:
var found bool
for i, existing := range deals {
if existing.ProposalCid.Equals(updated.ProposalCid) {
deals[i] = updated
found = true
break
}
}
if !found {
deals = append(deals, updated)
}
}
}
}
return outputStorageDeals(os.Stdout, deals, verbose)
},
}
func outputStorageDeals(out io.Writer, deals []storagemarket.MinerDeal, verbose bool) error {
sort.Slice(deals, func(i, j int) bool { sort.Slice(deals, func(i, j int) bool {
return deals[i].CreationTime.Time().Before(deals[j].CreationTime.Time()) return deals[i].CreationTime.Time().Before(deals[j].CreationTime.Time())
}) })
w := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0) w := tabwriter.NewWriter(out, 2, 4, 2, ' ', 0)
verbose := cctx.Bool("verbose")
if verbose { if verbose {
_, _ = fmt.Fprintf(w, "Creation\tProposalCid\tDealId\tState\tClient\tSize\tPrice\tDuration\tMessage\n") _, _ = fmt.Fprintf(w, "Creation\tProposalCid\tDealId\tState\tClient\tSize\tPrice\tDuration\tMessage\n")
@ -395,7 +442,6 @@ var dealsListCmd = &cli.Command{
} }
return w.Flush() return w.Flush()
},
} }
var getBlocklistCmd = &cli.Command{ var getBlocklistCmd = &cli.Command{

View File

@ -320,15 +320,13 @@ func (sm *StorageMinerAPI) MarketListRetrievalDeals(ctx context.Context) ([]retr
return out, nil return out, nil
} }
func (sm *StorageMinerAPI) MarketGetDealUpdates(ctx context.Context, d cid.Cid) (<-chan storagemarket.MinerDeal, error) { func (sm *StorageMinerAPI) MarketGetDealUpdates(ctx context.Context) (<-chan storagemarket.MinerDeal, error) {
results := make(chan storagemarket.MinerDeal) results := make(chan storagemarket.MinerDeal)
unsub := sm.StorageProvider.SubscribeToEvents(func(evt storagemarket.ProviderEvent, deal storagemarket.MinerDeal) { unsub := sm.StorageProvider.SubscribeToEvents(func(evt storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
if deal.ProposalCid.Equals(d) {
select { select {
case results <- deal: case results <- deal:
case <-ctx.Done(): case <-ctx.Done():
} }
}
}) })
go func() { go func() {
<-ctx.Done() <-ctx.Done()