diff --git a/api/api_full.go b/api/api_full.go index 6feeb12cf..3e2683241 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -256,7 +256,8 @@ type FullNode interface { ClientGenCar(ctx context.Context, ref FileRef, outpath string) error // ClientDealSize calculates real deal data size ClientDealSize(ctx context.Context, root cid.Cid) (DataSize, error) - + // ClientListTransfers returns the status of all ongoing transfers of data + ClientListDataTransfers(ctx context.Context) ([]DataTransferChannel, error) // ClientUnimport removes references to the specified file from filestore //ClientUnimport(path string) diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 8fb8e68ed..ffa28be90 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -142,6 +142,7 @@ type FullNodeStruct struct { ClientCalcCommP func(ctx context.Context, inpath string) (*api.CommPRet, error) `perm:"read"` ClientGenCar func(ctx context.Context, ref api.FileRef, outpath string) error `perm:"write"` ClientDealSize func(ctx context.Context, root cid.Cid) (api.DataSize, error) `perm:"read"` + ClientListDataTransfers func(ctx context.Context) ([]api.DataTransferChannel, error) `perm:"write"` StateNetworkName func(context.Context) (dtypes.NetworkName, error) `perm:"read"` StateMinerSectors func(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error) `perm:"read"` @@ -449,6 +450,10 @@ func (c *FullNodeStruct) ClientDealSize(ctx context.Context, root cid.Cid) (api. return c.Internal.ClientDealSize(ctx, root) } +func (c *FullNodeStruct) ClientListDataTransfers(ctx context.Context) ([]api.DataTransferChannel, error) { + return c.Internal.ClientListDataTransfers(ctx) +} + func (c *FullNodeStruct) GasEstimateGasPremium(ctx context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error) { return c.Internal.GasEstimateGasPremium(ctx, nblocksincl, sender, gaslimit, tsk) diff --git a/api/types.go b/api/types.go index 8a682db17..4b707dc9f 100644 --- a/api/types.go +++ b/api/types.go @@ -2,10 +2,13 @@ package api import ( "encoding/json" + "github.com/filecoin-project/go-address" + datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin/miner" + "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p-core/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -98,3 +101,15 @@ func (ms *MessageSendSpec) Get() MessageSendSpec { return *ms } + +type DataTransferChannel struct { + TransferID datatransfer.TransferID + Status datatransfer.Status + BaseCID cid.Cid + IsInitiator bool + IsSender bool + VoucherJSON string + Message string + OtherPeer peer.ID + Transferred uint64 +} diff --git a/cli/client.go b/cli/client.go index 519d11e5a..fe4ed8c1f 100644 --- a/cli/client.go +++ b/cli/client.go @@ -12,6 +12,7 @@ import ( "github.com/docker/go-units" "github.com/fatih/color" + datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/ipfs/go-cid" "github.com/ipfs/go-cidutil/cidenc" @@ -76,6 +77,7 @@ var clientCmd = &cli.Command{ WithCategory("util", clientCommPCmd), WithCategory("util", clientCarGenCmd), WithCategory("util", clientInfoCmd), + WithCategory("util", clientListTransfers), }, } @@ -1203,3 +1205,104 @@ var clientInfoCmd = &cli.Command{ return nil }, } + +var clientListTransfers = &cli.Command{ + Name: "list-transfers", + Usage: "Monitor ongoing data transfers for deals", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "color", + Usage: "use color in display output", + Value: true, + }, + }, + Action: func(cctx *cli.Context) error { + api, closer, err := GetFullNodeAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := ReqContext(cctx) + + channels, err := api.ClientListDataTransfers(ctx) + if err != nil { + return err + } + + sort.Slice(channels, func(i, j int) bool { + return channels[i].TransferID < channels[j].TransferID + }) + + var receivingChannels, sendingChannels []lapi.DataTransferChannel + for _, channel := range channels { + if channel.IsSender { + sendingChannels = append(sendingChannels, channel) + } else { + receivingChannels = append(receivingChannels, channel) + } + } + + color := cctx.Bool("color") + + fmt.Fprintf(os.Stdout, "Sending Channels\n\n") + w := tablewriter.New(tablewriter.Col("ID"), + tablewriter.Col("Status"), + tablewriter.Col("Other Party"), + tablewriter.Col("Root Cid"), + tablewriter.Col("Initiated?"), + tablewriter.Col("Transferred"), + tablewriter.NewLineCol("Voucher"), + tablewriter.NewLineCol("Message")) + for _, channel := range sendingChannels { + w.Write(toChannelOutput(color, channel)) + } + w.Flush(os.Stdout) + + fmt.Fprintf(os.Stdout, "\nReceiving Channels\n\n") + for _, channel := range receivingChannels { + + w.Write(toChannelOutput(color, channel)) + } + return w.Flush(os.Stdout) + }, +} + +func channelStatusString(useColor bool, status datatransfer.Status) string { + s := datatransfer.Statuses[status] + if !useColor { + return s + } + + switch status { + case datatransfer.Failed, datatransfer.Cancelled: + return color.RedString(s) + case datatransfer.Completed: + return color.GreenString(s) + default: + return s + } +} + +func toChannelOutput(useColor bool, channel api.DataTransferChannel) map[string]interface{} { + rootCid := channel.BaseCID.String() + rootCid = "..." + rootCid[len(rootCid)-8:] + + otherParty := channel.OtherPeer.String() + otherParty = "..." + otherParty[len(otherParty)-8:] + + initiated := "N" + if channel.IsInitiator { + initiated = "Y" + } + + return map[string]interface{}{ + "ID": channel.TransferID, + "Status": channelStatusString(useColor, channel.Status), + "Other Party": otherParty, + "Root Cid": rootCid, + "Initiated?": initiated, + "Transferred": channel.Transferred, + "Voucher": channel.VoucherJSON, + "Message": channel.Message, + } +} diff --git a/node/impl/client/client.go b/node/impl/client/client.go index d9ece2d9e..38696431b 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -2,6 +2,7 @@ package client import ( "context" + "encoding/json" "fmt" "io" "os" @@ -24,6 +25,7 @@ import ( basicnode "github.com/ipld/go-ipld-prime/node/basic" "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/ipld/go-ipld-prime/traversal/selector/builder" + "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" mh "github.com/multiformats/go-multihash" "go.uber.org/fx" @@ -74,6 +76,8 @@ type API struct { CombinedBstore dtypes.ClientBlockstore // TODO: try to remove RetrievalStoreMgr dtypes.ClientRetrievalStoreManager + DataTransfer dtypes.ClientDataTransfer + Host host.Host } func calcDealExpiration(minDuration uint64, md *miner.DeadlineInfo, startEpoch abi.ChainEpoch) abi.ChainEpoch { @@ -755,3 +759,37 @@ func (a *API) clientImport(ctx context.Context, ref api.FileRef, store *multisto return nd.Cid(), nil } + +func (a *API) ClientListDataTransfers(ctx context.Context) ([]api.DataTransferChannel, error) { + inProgressChannels, err := a.DataTransfer.InProgressChannels(ctx) + if err != nil { + return nil, err + } + + apiChannels := make([]api.DataTransferChannel, 0, len(inProgressChannels)) + for channelID, channelState := range inProgressChannels { + channel := api.DataTransferChannel{ + TransferID: channelState.TransferID(), + Status: channelState.Status(), + BaseCID: channelState.BaseCID(), + IsInitiator: channelID.Initiator == a.Host.ID(), + IsSender: channelState.Sender() == a.Host.ID(), + Message: channelState.Message(), + } + voucherJSON, err := json.Marshal(channelState.Voucher()) + if err != nil { + return nil, err + } + channel.VoucherJSON = string(voucherJSON) + if channel.IsSender { + channel.Transferred = channelState.Sent() + channel.OtherPeer = channelState.Recipient() + } else { + channel.Transferred = channelState.Received() + channel.OtherPeer = channelState.Sender() + } + apiChannels = append(apiChannels, channel) + } + + return apiChannels, nil +}