feat(cli): data transfer command

add command to monitor data transfers
This commit is contained in:
hannahhoward 2020-08-18 16:26:21 -07:00
parent e47e51275a
commit f511cc188a
5 changed files with 163 additions and 1 deletions

View File

@ -256,7 +256,8 @@ type FullNode interface {
ClientGenCar(ctx context.Context, ref FileRef, outpath string) error
// ClientDealSize calculates real deal data size
ClientDealSize(ctx context.Context, root cid.Cid) (DataSize, error)
// ClientListTransfers returns the status of all ongoing transfers of data
ClientListDataTransfers(ctx context.Context) ([]DataTransferChannel, error)
// ClientUnimport removes references to the specified file from filestore
//ClientUnimport(path string)

View File

@ -142,6 +142,7 @@ type FullNodeStruct struct {
ClientCalcCommP func(ctx context.Context, inpath string) (*api.CommPRet, error) `perm:"read"`
ClientGenCar func(ctx context.Context, ref api.FileRef, outpath string) error `perm:"write"`
ClientDealSize func(ctx context.Context, root cid.Cid) (api.DataSize, error) `perm:"read"`
ClientListDataTransfers func(ctx context.Context) ([]api.DataTransferChannel, error) `perm:"write"`
StateNetworkName func(context.Context) (dtypes.NetworkName, error) `perm:"read"`
StateMinerSectors func(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error) `perm:"read"`
@ -449,6 +450,10 @@ func (c *FullNodeStruct) ClientDealSize(ctx context.Context, root cid.Cid) (api.
return c.Internal.ClientDealSize(ctx, root)
}
func (c *FullNodeStruct) ClientListDataTransfers(ctx context.Context) ([]api.DataTransferChannel, error) {
return c.Internal.ClientListDataTransfers(ctx)
}
func (c *FullNodeStruct) GasEstimateGasPremium(ctx context.Context, nblocksincl uint64,
sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error) {
return c.Internal.GasEstimateGasPremium(ctx, nblocksincl, sender, gaslimit, tsk)

View File

@ -2,10 +2,13 @@ package api
import (
"encoding/json"
"github.com/filecoin-project/go-address"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
@ -98,3 +101,15 @@ func (ms *MessageSendSpec) Get() MessageSendSpec {
return *ms
}
type DataTransferChannel struct {
TransferID datatransfer.TransferID
Status datatransfer.Status
BaseCID cid.Cid
IsInitiator bool
IsSender bool
VoucherJSON string
Message string
OtherPeer peer.ID
Transferred uint64
}

View File

@ -12,6 +12,7 @@ import (
"github.com/docker/go-units"
"github.com/fatih/color"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-cidutil/cidenc"
@ -76,6 +77,7 @@ var clientCmd = &cli.Command{
WithCategory("util", clientCommPCmd),
WithCategory("util", clientCarGenCmd),
WithCategory("util", clientInfoCmd),
WithCategory("util", clientListTransfers),
},
}
@ -1203,3 +1205,104 @@ var clientInfoCmd = &cli.Command{
return nil
},
}
var clientListTransfers = &cli.Command{
Name: "list-transfers",
Usage: "Monitor ongoing data transfers for deals",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "color",
Usage: "use color in display output",
Value: true,
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
channels, err := api.ClientListDataTransfers(ctx)
if err != nil {
return err
}
sort.Slice(channels, func(i, j int) bool {
return channels[i].TransferID < channels[j].TransferID
})
var receivingChannels, sendingChannels []lapi.DataTransferChannel
for _, channel := range channels {
if channel.IsSender {
sendingChannels = append(sendingChannels, channel)
} else {
receivingChannels = append(receivingChannels, channel)
}
}
color := cctx.Bool("color")
fmt.Fprintf(os.Stdout, "Sending Channels\n\n")
w := tablewriter.New(tablewriter.Col("ID"),
tablewriter.Col("Status"),
tablewriter.Col("Other Party"),
tablewriter.Col("Root Cid"),
tablewriter.Col("Initiated?"),
tablewriter.Col("Transferred"),
tablewriter.NewLineCol("Voucher"),
tablewriter.NewLineCol("Message"))
for _, channel := range sendingChannels {
w.Write(toChannelOutput(color, channel))
}
w.Flush(os.Stdout)
fmt.Fprintf(os.Stdout, "\nReceiving Channels\n\n")
for _, channel := range receivingChannels {
w.Write(toChannelOutput(color, channel))
}
return w.Flush(os.Stdout)
},
}
func channelStatusString(useColor bool, status datatransfer.Status) string {
s := datatransfer.Statuses[status]
if !useColor {
return s
}
switch status {
case datatransfer.Failed, datatransfer.Cancelled:
return color.RedString(s)
case datatransfer.Completed:
return color.GreenString(s)
default:
return s
}
}
func toChannelOutput(useColor bool, channel api.DataTransferChannel) map[string]interface{} {
rootCid := channel.BaseCID.String()
rootCid = "..." + rootCid[len(rootCid)-8:]
otherParty := channel.OtherPeer.String()
otherParty = "..." + otherParty[len(otherParty)-8:]
initiated := "N"
if channel.IsInitiator {
initiated = "Y"
}
return map[string]interface{}{
"ID": channel.TransferID,
"Status": channelStatusString(useColor, channel.Status),
"Other Party": otherParty,
"Root Cid": rootCid,
"Initiated?": initiated,
"Transferred": channel.Transferred,
"Voucher": channel.VoucherJSON,
"Message": channel.Message,
}
}

View File

@ -2,6 +2,7 @@ package client
import (
"context"
"encoding/json"
"fmt"
"io"
"os"
@ -24,6 +25,7 @@ import (
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
mh "github.com/multiformats/go-multihash"
"go.uber.org/fx"
@ -74,6 +76,8 @@ type API struct {
CombinedBstore dtypes.ClientBlockstore // TODO: try to remove
RetrievalStoreMgr dtypes.ClientRetrievalStoreManager
DataTransfer dtypes.ClientDataTransfer
Host host.Host
}
func calcDealExpiration(minDuration uint64, md *miner.DeadlineInfo, startEpoch abi.ChainEpoch) abi.ChainEpoch {
@ -755,3 +759,37 @@ func (a *API) clientImport(ctx context.Context, ref api.FileRef, store *multisto
return nd.Cid(), nil
}
func (a *API) ClientListDataTransfers(ctx context.Context) ([]api.DataTransferChannel, error) {
inProgressChannels, err := a.DataTransfer.InProgressChannels(ctx)
if err != nil {
return nil, err
}
apiChannels := make([]api.DataTransferChannel, 0, len(inProgressChannels))
for channelID, channelState := range inProgressChannels {
channel := api.DataTransferChannel{
TransferID: channelState.TransferID(),
Status: channelState.Status(),
BaseCID: channelState.BaseCID(),
IsInitiator: channelID.Initiator == a.Host.ID(),
IsSender: channelState.Sender() == a.Host.ID(),
Message: channelState.Message(),
}
voucherJSON, err := json.Marshal(channelState.Voucher())
if err != nil {
return nil, err
}
channel.VoucherJSON = string(voucherJSON)
if channel.IsSender {
channel.Transferred = channelState.Sent()
channel.OtherPeer = channelState.Recipient()
} else {
channel.Transferred = channelState.Received()
channel.OtherPeer = channelState.Sender()
}
apiChannels = append(apiChannels, channel)
}
return apiChannels, nil
}