diff --git a/api/api_full.go b/api/api_full.go index 6feeb12cf..72799c846 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -256,6 +256,9 @@ type FullNode interface { ClientGenCar(ctx context.Context, ref FileRef, outpath string) error // ClientDealSize calculates real deal data size ClientDealSize(ctx context.Context, root cid.Cid) (DataSize, error) + // ClientListTransfers returns the status of all ongoing transfers of data + ClientListDataTransfers(ctx context.Context) ([]DataTransferChannel, error) + ClientDataTransferUpdates(ctx context.Context) (<-chan DataTransferChannel, error) // ClientUnimport removes references to the specified file from filestore //ClientUnimport(path string) diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 8fb8e68ed..282dd3116 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -127,21 +127,23 @@ type FullNodeStruct struct { WalletImport func(context.Context, *types.KeyInfo) (address.Address, error) `perm:"admin"` WalletDelete func(context.Context, address.Address) error `perm:"write"` - ClientImport func(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) `perm:"admin"` - ClientListImports func(ctx context.Context) ([]api.Import, error) `perm:"write"` - ClientRemoveImport func(ctx context.Context, importID multistore.StoreID) error `perm:"admin"` - ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"` - ClientFindData func(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) `perm:"read"` - ClientMinerQueryOffer func(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) `perm:"read"` - ClientStartDeal func(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) `perm:"admin"` - ClientGetDealInfo func(context.Context, cid.Cid) (*api.DealInfo, error) `perm:"read"` - ClientListDeals func(ctx context.Context) ([]api.DealInfo, error) `perm:"write"` - ClientRetrieve func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error `perm:"admin"` - ClientRetrieveWithEvents func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"` - ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) `perm:"read"` - ClientCalcCommP func(ctx context.Context, inpath string) (*api.CommPRet, error) `perm:"read"` - ClientGenCar func(ctx context.Context, ref api.FileRef, outpath string) error `perm:"write"` - ClientDealSize func(ctx context.Context, root cid.Cid) (api.DataSize, error) `perm:"read"` + ClientImport func(ctx context.Context, ref api.FileRef) (*api.ImportRes, error) `perm:"admin"` + ClientListImports func(ctx context.Context) ([]api.Import, error) `perm:"write"` + ClientRemoveImport func(ctx context.Context, importID multistore.StoreID) error `perm:"admin"` + ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"` + ClientFindData func(ctx context.Context, root cid.Cid, piece *cid.Cid) ([]api.QueryOffer, error) `perm:"read"` + ClientMinerQueryOffer func(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (api.QueryOffer, error) `perm:"read"` + ClientStartDeal func(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) `perm:"admin"` + ClientGetDealInfo func(context.Context, cid.Cid) (*api.DealInfo, error) `perm:"read"` + ClientListDeals func(ctx context.Context) ([]api.DealInfo, error) `perm:"write"` + ClientRetrieve func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error `perm:"admin"` + ClientRetrieveWithEvents func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"` + ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) `perm:"read"` + ClientCalcCommP func(ctx context.Context, inpath string) (*api.CommPRet, error) `perm:"read"` + ClientGenCar func(ctx context.Context, ref api.FileRef, outpath string) error `perm:"write"` + ClientDealSize func(ctx context.Context, root cid.Cid) (api.DataSize, error) `perm:"read"` + ClientListDataTransfers func(ctx context.Context) ([]api.DataTransferChannel, error) `perm:"write"` + ClientDataTransferUpdates func(ctx context.Context) (<-chan api.DataTransferChannel, error) `perm:"write"` StateNetworkName func(context.Context) (dtypes.NetworkName, error) `perm:"read"` StateMinerSectors func(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error) `perm:"read"` @@ -449,6 +451,14 @@ func (c *FullNodeStruct) ClientDealSize(ctx context.Context, root cid.Cid) (api. return c.Internal.ClientDealSize(ctx, root) } +func (c *FullNodeStruct) ClientListDataTransfers(ctx context.Context) ([]api.DataTransferChannel, error) { + return c.Internal.ClientListDataTransfers(ctx) +} + +func (c *FullNodeStruct) ClientDataTransferUpdates(ctx context.Context) (<-chan api.DataTransferChannel, error) { + return c.Internal.ClientDataTransferUpdates(ctx) +} + func (c *FullNodeStruct) GasEstimateGasPremium(ctx context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error) { return c.Internal.GasEstimateGasPremium(ctx, nblocksincl, sender, gaslimit, tsk) diff --git a/api/types.go b/api/types.go index 8a682db17..9a874d1c2 100644 --- a/api/types.go +++ b/api/types.go @@ -2,10 +2,13 @@ package api import ( "encoding/json" + "github.com/filecoin-project/go-address" + datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin/miner" + "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p-core/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -98,3 +101,15 @@ func (ms *MessageSendSpec) Get() MessageSendSpec { return *ms } + +type DataTransferChannel struct { + TransferID datatransfer.TransferID + Status datatransfer.Status + BaseCID cid.Cid + IsInitiator bool + IsSender bool + Voucher string + Message string + OtherPeer peer.ID + Transferred uint64 +} diff --git a/cli/client.go b/cli/client.go index 519d11e5a..b3ee6220b 100644 --- a/cli/client.go +++ b/cli/client.go @@ -3,6 +3,7 @@ package cli import ( "encoding/json" "fmt" + "io" "os" "path/filepath" "sort" @@ -10,8 +11,10 @@ import ( "text/tabwriter" "time" + tm "github.com/buger/goterm" "github.com/docker/go-units" "github.com/fatih/color" + datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/ipfs/go-cid" "github.com/ipfs/go-cidutil/cidenc" @@ -76,6 +79,7 @@ var clientCmd = &cli.Command{ WithCategory("util", clientCommPCmd), WithCategory("util", clientCarGenCmd), WithCategory("util", clientInfoCmd), + WithCategory("util", clientListTransfers), }, } @@ -1203,3 +1207,170 @@ var clientInfoCmd = &cli.Command{ return nil }, } + +var clientListTransfers = &cli.Command{ + Name: "list-transfers", + Usage: "List ongoing data transfers for deals", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "color", + Usage: "use color in display output", + Value: true, + }, + &cli.BoolFlag{ + Name: "completed", + Usage: "show completed data transfers", + }, + &cli.BoolFlag{ + Name: "watch", + Usage: "watch deal updates in real-time, rather than a one time list", + }, + }, + Action: func(cctx *cli.Context) error { + api, closer, err := GetFullNodeAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := ReqContext(cctx) + + channels, err := api.ClientListDataTransfers(ctx) + if err != nil { + return err + } + + completed := cctx.Bool("completed") + color := cctx.Bool("color") + watch := cctx.Bool("watch") + + if watch { + channelUpdates, err := api.ClientDataTransferUpdates(ctx) + if err != nil { + return err + } + + for { + tm.Clear() // Clear current screen + + tm.MoveCursor(1, 1) + + outputChannels(tm.Screen, channels, completed, color) + + tm.Flush() + + select { + case <-ctx.Done(): + return nil + case channelUpdate := <-channelUpdates: + var found bool + for i, existing := range channels { + if existing.TransferID == channelUpdate.TransferID && + existing.OtherPeer == channelUpdate.OtherPeer && + existing.IsSender == channelUpdate.IsSender && + existing.IsInitiator == channelUpdate.IsInitiator { + channels[i] = channelUpdate + found = true + break + } + } + if !found { + channels = append(channels, channelUpdate) + } + } + } + } + outputChannels(os.Stdout, channels, completed, color) + return nil + }, +} + +func outputChannels(out io.Writer, channels []api.DataTransferChannel, completed bool, color bool) { + sort.Slice(channels, func(i, j int) bool { + return channels[i].TransferID < channels[j].TransferID + }) + + var receivingChannels, sendingChannels []lapi.DataTransferChannel + for _, channel := range channels { + if !completed && channel.Status == datatransfer.Completed { + continue + } + if channel.IsSender { + sendingChannels = append(sendingChannels, channel) + } else { + receivingChannels = append(receivingChannels, channel) + } + } + + fmt.Fprintf(out, "Sending Channels\n\n") + w := tablewriter.New(tablewriter.Col("ID"), + tablewriter.Col("Status"), + tablewriter.Col("Sending To"), + tablewriter.Col("Root Cid"), + tablewriter.Col("Initiated?"), + tablewriter.Col("Transferred"), + tablewriter.Col("Voucher"), + tablewriter.NewLineCol("Message")) + for _, channel := range sendingChannels { + w.Write(toChannelOutput(color, "Sending To", channel)) + } + w.Flush(out) + + fmt.Fprintf(out, "\nReceiving Channels\n\n") + w = tablewriter.New(tablewriter.Col("ID"), + tablewriter.Col("Status"), + tablewriter.Col("Receiving From"), + tablewriter.Col("Root Cid"), + tablewriter.Col("Initiated?"), + tablewriter.Col("Transferred"), + tablewriter.Col("Voucher"), + tablewriter.NewLineCol("Message")) + for _, channel := range receivingChannels { + w.Write(toChannelOutput(color, "Receiving From", channel)) + } + w.Flush(out) +} + +func channelStatusString(useColor bool, status datatransfer.Status) string { + s := datatransfer.Statuses[status] + if !useColor { + return s + } + + switch status { + case datatransfer.Failed, datatransfer.Cancelled: + return color.RedString(s) + case datatransfer.Completed: + return color.GreenString(s) + default: + return s + } +} + +func toChannelOutput(useColor bool, otherPartyColumn string, channel api.DataTransferChannel) map[string]interface{} { + rootCid := channel.BaseCID.String() + rootCid = "..." + rootCid[len(rootCid)-8:] + + otherParty := channel.OtherPeer.String() + otherParty = "..." + otherParty[len(otherParty)-8:] + + initiated := "N" + if channel.IsInitiator { + initiated = "Y" + } + + voucher := channel.Voucher + if len(voucher) > 40 { + voucher = "..." + voucher[len(voucher)-37:] + } + + return map[string]interface{}{ + "ID": channel.TransferID, + "Status": channelStatusString(useColor, channel.Status), + otherPartyColumn: otherParty, + "Root Cid": rootCid, + "Initiated?": initiated, + "Transferred": channel.Transferred, + "Voucher": voucher, + "Message": channel.Message, + } +} diff --git a/go.mod b/go.mod index aeb96ffbd..d3c945c38 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/Gurpartap/async v0.0.0-20180927173644-4f7f499dd9ee github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d + github.com/buger/goterm v0.0.0-20200322175922-2f3e71b85129 github.com/coreos/go-systemd/v22 v22.0.0 github.com/detailyang/go-fallocate v0.0.0-20180908115635-432fa640bd2e github.com/dgraph-io/badger/v2 v2.0.3 diff --git a/go.sum b/go.sum index 26966c505..a916c6074 100644 --- a/go.sum +++ b/go.sum @@ -112,6 +112,8 @@ github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVa github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc= github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY= github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= +github.com/buger/goterm v0.0.0-20200322175922-2f3e71b85129 h1:gfAMKE626QEuKG3si0pdTRcr/YEbBoxY+3GOH3gWvl4= +github.com/buger/goterm v0.0.0-20200322175922-2f3e71b85129/go.mod h1:u9UyCz2eTrSGy6fbupqJ54eY5c4IC8gREQ1053dK12U= github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= diff --git a/node/impl/client/client.go b/node/impl/client/client.go index d9ece2d9e..248b9ec6a 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -2,10 +2,12 @@ package client import ( "context" + "encoding/json" "fmt" "io" "os" + datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/specs-actors/actors/abi/big" "golang.org/x/xerrors" @@ -24,6 +26,7 @@ import ( basicnode "github.com/ipld/go-ipld-prime/node/basic" "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/ipld/go-ipld-prime/traversal/selector/builder" + "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" mh "github.com/multiformats/go-multihash" "go.uber.org/fx" @@ -74,6 +77,8 @@ type API struct { CombinedBstore dtypes.ClientBlockstore // TODO: try to remove RetrievalStoreMgr dtypes.ClientRetrievalStoreManager + DataTransfer dtypes.ClientDataTransfer + Host host.Host } func calcDealExpiration(minDuration uint64, md *miner.DeadlineInfo, startEpoch abi.ChainEpoch) abi.ChainEpoch { @@ -466,11 +471,15 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) { if state.PayloadCID.Equals(order.Root) { - events <- marketevents.RetrievalEvent{ + select { + case <-ctx.Done(): + return + case events <- marketevents.RetrievalEvent{ Event: event, Status: state.Status, BytesReceived: state.TotalReceived, FundsSpent: state.FundsSpent, + }: } switch state.Status { @@ -755,3 +764,67 @@ func (a *API) clientImport(ctx context.Context, ref api.FileRef, store *multisto return nd.Cid(), nil } + +func (a *API) ClientListDataTransfers(ctx context.Context) ([]api.DataTransferChannel, error) { + inProgressChannels, err := a.DataTransfer.InProgressChannels(ctx) + if err != nil { + return nil, err + } + + apiChannels := make([]api.DataTransferChannel, 0, len(inProgressChannels)) + for _, channelState := range inProgressChannels { + apiChannels = append(apiChannels, toAPIChannel(a.Host.ID(), channelState)) + } + + return apiChannels, nil +} + +func (a *API) ClientDataTransferUpdates(ctx context.Context) (<-chan api.DataTransferChannel, error) { + channels := make(chan api.DataTransferChannel) + + unsub := a.DataTransfer.SubscribeToEvents(func(evt datatransfer.Event, channelState datatransfer.ChannelState) { + channel := toAPIChannel(a.Host.ID(), channelState) + select { + case <-ctx.Done(): + case channels <- channel: + } + }) + + go func() { + defer unsub() + <-ctx.Done() + }() + + return channels, nil +} + +func toAPIChannel(hostID peer.ID, channelState datatransfer.ChannelState) api.DataTransferChannel { + channel := api.DataTransferChannel{ + TransferID: channelState.TransferID(), + Status: channelState.Status(), + BaseCID: channelState.BaseCID(), + IsSender: channelState.Sender() == hostID, + Message: channelState.Message(), + } + stringer, ok := channelState.Voucher().(fmt.Stringer) + if ok { + channel.Voucher = stringer.String() + } else { + voucherJSON, err := json.Marshal(channelState.Voucher()) + if err != nil { + channel.Voucher = fmt.Errorf("Voucher Serialization: %w", err).Error() + } else { + channel.Voucher = string(voucherJSON) + } + } + if channel.IsSender { + channel.IsInitiator = !channelState.IsPull() + channel.Transferred = channelState.Sent() + channel.OtherPeer = channelState.Recipient() + } else { + channel.IsInitiator = channelState.IsPull() + channel.Transferred = channelState.Received() + channel.OtherPeer = channelState.Sender() + } + return channel +}