diff --git a/api/api_storage.go b/api/api_storage.go index 14aa5ff97..48f6e9e45 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -73,7 +73,7 @@ type StorageMiner interface { MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error MarketListDeals(ctx context.Context) ([]storagemarket.StorageDeal, error) MarketListRetrievalDeals(ctx context.Context) ([]retrievalmarket.ProviderDealState, error) - MarketGetDealUpdates(ctx context.Context, d cid.Cid) (<-chan storagemarket.MinerDeal, error) + MarketGetDealUpdates(ctx context.Context) (<-chan storagemarket.MinerDeal, error) MarketListIncompleteDeals(ctx context.Context) ([]storagemarket.MinerDeal, error) MarketSetAsk(ctx context.Context, price types.BigInt, verifiedPrice types.BigInt, duration abi.ChainEpoch, minPieceSize abi.PaddedPieceSize, maxPieceSize abi.PaddedPieceSize) error MarketGetAsk(ctx context.Context) (*storagemarket.SignedStorageAsk, error) diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index ac7e1b4a3..2b173e038 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -240,7 +240,7 @@ type StorageMinerStruct struct { MarketImportDealData func(context.Context, cid.Cid, string) error `perm:"write"` MarketListDeals func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"` MarketListRetrievalDeals func(ctx context.Context) ([]retrievalmarket.ProviderDealState, error) `perm:"read"` - MarketGetDealUpdates func(ctx context.Context, d cid.Cid) (<-chan storagemarket.MinerDeal, error) `perm:"read"` + MarketGetDealUpdates func(ctx context.Context) (<-chan storagemarket.MinerDeal, error) `perm:"read"` MarketListIncompleteDeals func(ctx context.Context) ([]storagemarket.MinerDeal, error) `perm:"read"` MarketSetAsk func(ctx context.Context, price types.BigInt, verifiedPrice types.BigInt, duration abi.ChainEpoch, minPieceSize abi.PaddedPieceSize, maxPieceSize abi.PaddedPieceSize) error `perm:"admin"` MarketGetAsk func(ctx context.Context) (*storagemarket.SignedStorageAsk, error) `perm:"read"` @@ -1087,8 +1087,8 @@ func (c *StorageMinerStruct) MarketListRetrievalDeals(ctx context.Context) ([]re return c.Internal.MarketListRetrievalDeals(ctx) } -func (c *StorageMinerStruct) MarketGetDealUpdates(ctx context.Context, d cid.Cid) (<-chan storagemarket.MinerDeal, error) { - return c.Internal.MarketGetDealUpdates(ctx, d) +func (c *StorageMinerStruct) MarketGetDealUpdates(ctx context.Context) (<-chan storagemarket.MinerDeal, error) { + return c.Internal.MarketGetDealUpdates(ctx) } func (c *StorageMinerStruct) MarketListIncompleteDeals(ctx context.Context) ([]storagemarket.MinerDeal, error) { diff --git a/api/test/deals.go b/api/test/deals.go index 00786e785..1dcc1c8d7 100644 --- a/api/test/deals.go +++ b/api/test/deals.go @@ -334,7 +334,7 @@ loop: func waitDealPublished(t *testing.T, ctx context.Context, miner TestStorageNode, deal *cid.Cid) { subCtx, cancel := context.WithCancel(ctx) defer cancel() - updates, err := miner.MarketGetDealUpdates(subCtx, *deal) + updates, err := miner.MarketGetDealUpdates(subCtx) if err != nil { t.Fatal(err) } @@ -343,18 +343,20 @@ func waitDealPublished(t *testing.T, ctx context.Context, miner TestStorageNode, case <-ctx.Done(): t.Fatal("context timeout") case di := <-updates: - switch di.State { - case storagemarket.StorageDealProposalRejected: - t.Fatal("deal rejected") - case storagemarket.StorageDealFailing: - t.Fatal("deal failed") - case storagemarket.StorageDealError: - t.Fatal("deal errored", di.Message) - case storagemarket.StorageDealFinalizing, storagemarket.StorageDealSealing, storagemarket.StorageDealActive: - fmt.Println("COMPLETE", di) - return + if deal.Equals(di.ProposalCid) { + switch di.State { + case storagemarket.StorageDealProposalRejected: + t.Fatal("deal rejected") + case storagemarket.StorageDealFailing: + t.Fatal("deal failed") + case storagemarket.StorageDealError: + t.Fatal("deal errored", di.Message) + case storagemarket.StorageDealFinalizing, storagemarket.StorageDealSealing, storagemarket.StorageDealActive: + fmt.Println("COMPLETE", di) + return + } + fmt.Println("Deal state: ", storagemarket.DealStates[di.State]) } - fmt.Println("Deal state: ", storagemarket.DealStates[di.State]) } } } diff --git a/cmd/lotus-storage-miner/market.go b/cmd/lotus-storage-miner/market.go index 828fcd2c3..39671f74c 100644 --- a/cmd/lotus-storage-miner/market.go +++ b/cmd/lotus-storage-miner/market.go @@ -3,6 +3,7 @@ package main import ( "bufio" "fmt" + "io" "os" "path/filepath" "sort" @@ -345,6 +346,10 @@ var dealsListCmd = &cli.Command{ Name: "verbose", Aliases: []string{"v"}, }, + &cli.BoolFlag{ + Name: "watch", + Usage: "watch deal updates in real-time, rather than a one time list", + }, }, Action: func(cctx *cli.Context) error { api, closer, err := lcli.GetStorageMinerAPI(cctx) @@ -360,42 +365,83 @@ var dealsListCmd = &cli.Command{ return err } - sort.Slice(deals, func(i, j int) bool { - return deals[i].CreationTime.Time().Before(deals[j].CreationTime.Time()) - }) - - w := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0) - verbose := cctx.Bool("verbose") + watch := cctx.Bool("watch") + + if watch { + updates, err := api.MarketGetDealUpdates(ctx) + if err != nil { + return err + } + + for { + tm.Clear() + tm.MoveCursor(1, 1) + + err = outputStorageDeals(tm.Output, deals, verbose) + if err != nil { + return err + } + + tm.Flush() + + select { + case <-ctx.Done(): + return nil + case updated := <-updates: + var found bool + for i, existing := range deals { + if existing.ProposalCid.Equals(updated.ProposalCid) { + deals[i] = updated + found = true + break + } + } + if !found { + deals = append(deals, updated) + } + } + } + } + + return outputStorageDeals(os.Stdout, deals, verbose) + }, +} + +func outputStorageDeals(out io.Writer, deals []storagemarket.MinerDeal, verbose bool) error { + sort.Slice(deals, func(i, j int) bool { + return deals[i].CreationTime.Time().Before(deals[j].CreationTime.Time()) + }) + + w := tabwriter.NewWriter(out, 2, 4, 2, ' ', 0) + + if verbose { + _, _ = fmt.Fprintf(w, "Creation\tProposalCid\tDealId\tState\tClient\tSize\tPrice\tDuration\tMessage\n") + } else { + _, _ = fmt.Fprintf(w, "ProposalCid\tDealId\tState\tClient\tSize\tPrice\tDuration\n") + } + + for _, deal := range deals { + propcid := deal.ProposalCid.String() + if !verbose { + propcid = "..." + propcid[len(propcid)-8:] + } + + fil := types.FIL(types.BigMul(deal.Proposal.StoragePricePerEpoch, types.NewInt(uint64(deal.Proposal.Duration())))) if verbose { - _, _ = fmt.Fprintf(w, "Creation\tProposalCid\tDealId\tState\tClient\tSize\tPrice\tDuration\tMessage\n") - } else { - _, _ = fmt.Fprintf(w, "ProposalCid\tDealId\tState\tClient\tSize\tPrice\tDuration\n") + _, _ = fmt.Fprintf(w, "%s\t", deal.CreationTime.Time().Format(time.Stamp)) } - for _, deal := range deals { - propcid := deal.ProposalCid.String() - if !verbose { - propcid = "..." + propcid[len(propcid)-8:] - } - - fil := types.FIL(types.BigMul(deal.Proposal.StoragePricePerEpoch, types.NewInt(uint64(deal.Proposal.Duration())))) - - if verbose { - _, _ = fmt.Fprintf(w, "%s\t", deal.CreationTime.Time().Format(time.Stamp)) - } - - _, _ = fmt.Fprintf(w, "%s\t%d\t%s\t%s\t%s\t%s\t%s", propcid, deal.DealID, storagemarket.DealStates[deal.State], deal.Proposal.Client, units.BytesSize(float64(deal.Proposal.PieceSize)), fil, deal.Proposal.Duration()) - if verbose { - _, _ = fmt.Fprintf(w, "\t%s", deal.Message) - } - - _, _ = fmt.Fprintln(w) + _, _ = fmt.Fprintf(w, "%s\t%d\t%s\t%s\t%s\t%s\t%s", propcid, deal.DealID, storagemarket.DealStates[deal.State], deal.Proposal.Client, units.BytesSize(float64(deal.Proposal.PieceSize)), fil, deal.Proposal.Duration()) + if verbose { + _, _ = fmt.Fprintf(w, "\t%s", deal.Message) } - return w.Flush() - }, + _, _ = fmt.Fprintln(w) + } + + return w.Flush() } var getBlocklistCmd = &cli.Command{ diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 3507453dd..c688ff677 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -320,14 +320,12 @@ func (sm *StorageMinerAPI) MarketListRetrievalDeals(ctx context.Context) ([]retr return out, nil } -func (sm *StorageMinerAPI) MarketGetDealUpdates(ctx context.Context, d cid.Cid) (<-chan storagemarket.MinerDeal, error) { +func (sm *StorageMinerAPI) MarketGetDealUpdates(ctx context.Context) (<-chan storagemarket.MinerDeal, error) { results := make(chan storagemarket.MinerDeal) unsub := sm.StorageProvider.SubscribeToEvents(func(evt storagemarket.ProviderEvent, deal storagemarket.MinerDeal) { - if deal.ProposalCid.Equals(d) { - select { - case results <- deal: - case <-ctx.Done(): - } + select { + case results <- deal: + case <-ctx.Done(): } }) go func() {