feat(cli): add updates to data transfer

Add an API to get data transfer updates and add modify the CLI to be an
ongoing monitoring plan
This commit is contained in:
hannahhoward 2020-08-18 17:36:22 -07:00
parent ef7f7375e7
commit 66ac7c195c
7 changed files with 168 additions and 81 deletions

View File

@ -258,6 +258,8 @@ type FullNode interface {
ClientDealSize(ctx context.Context, root cid.Cid) (DataSize, error)
// ClientListTransfers returns the status of all ongoing transfers of data
ClientListDataTransfers(ctx context.Context) ([]DataTransferChannel, error)
ClientDataTransferUpdates(ctx context.Context) (<-chan DataTransferChannel, error)
// ClientUnimport removes references to the specified file from filestore
//ClientUnimport(path string)

View File

@ -143,6 +143,7 @@ type FullNodeStruct struct {
ClientGenCar func(ctx context.Context, ref api.FileRef, outpath string) error `perm:"write"`
ClientDealSize func(ctx context.Context, root cid.Cid) (api.DataSize, error) `perm:"read"`
ClientListDataTransfers func(ctx context.Context) ([]api.DataTransferChannel, error) `perm:"write"`
ClientDataTransferUpdates func(ctx context.Context) (<-chan api.DataTransferChannel, error) `perm:"write"`
StateNetworkName func(context.Context) (dtypes.NetworkName, error) `perm:"read"`
StateMinerSectors func(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error) `perm:"read"`
@ -454,6 +455,10 @@ func (c *FullNodeStruct) ClientListDataTransfers(ctx context.Context) ([]api.Dat
return c.Internal.ClientListDataTransfers(ctx)
}
func (c *FullNodeStruct) ClientDataTransferUpdates(ctx context.Context) (<-chan api.DataTransferChannel, error) {
return c.Internal.ClientDataTransferUpdates(ctx)
}
func (c *FullNodeStruct) GasEstimateGasPremium(ctx context.Context, nblocksincl uint64,
sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error) {
return c.Internal.GasEstimateGasPremium(ctx, nblocksincl, sender, gaslimit, tsk)

View File

@ -108,7 +108,7 @@ type DataTransferChannel struct {
BaseCID cid.Cid
IsInitiator bool
IsSender bool
VoucherJSON string
Voucher string
Message string
OtherPeer peer.ID
Transferred uint64

View File

@ -10,6 +10,7 @@ import (
"text/tabwriter"
"time"
tm "github.com/buger/goterm"
"github.com/docker/go-units"
"github.com/fatih/color"
datatransfer "github.com/filecoin-project/go-data-transfer"
@ -1215,6 +1216,10 @@ var clientListTransfers = &cli.Command{
Usage: "use color in display output",
Value: true,
},
&cli.BoolFlag{
Name: "completed",
Usage: "show completed data transfers",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
@ -1229,12 +1234,26 @@ var clientListTransfers = &cli.Command{
return err
}
channelUpdates, err := api.ClientDataTransferUpdates(ctx)
if err != nil {
return err
}
for {
tm.Clear() // Clear current screen
tm.MoveCursor(1, 1)
sort.Slice(channels, func(i, j int) bool {
return channels[i].TransferID < channels[j].TransferID
})
completed := cctx.Bool("completed")
var receivingChannels, sendingChannels []lapi.DataTransferChannel
for _, channel := range channels {
if !completed && channel.Status == datatransfer.Completed {
continue
}
if channel.IsSender {
sendingChannels = append(sendingChannels, channel)
} else {
@ -1244,7 +1263,7 @@ var clientListTransfers = &cli.Command{
color := cctx.Bool("color")
fmt.Fprintf(os.Stdout, "Sending Channels\n\n")
tm.Printf("Sending Channels\n\n")
w := tablewriter.New(tablewriter.Col("ID"),
tablewriter.Col("Status"),
tablewriter.Col("Sending To"),
@ -1256,7 +1275,7 @@ var clientListTransfers = &cli.Command{
for _, channel := range sendingChannels {
w.Write(toChannelOutput(color, "Sending To", channel))
}
w.Flush(os.Stdout)
w.Flush(tm.Screen)
fmt.Fprintf(os.Stdout, "\nReceiving Channels\n\n")
w = tablewriter.New(tablewriter.Col("ID"),
@ -1271,7 +1290,30 @@ var clientListTransfers = &cli.Command{
w.Write(toChannelOutput(color, "Receiving From", channel))
}
return w.Flush(os.Stdout)
w.Flush(tm.Screen)
tm.Flush()
select {
case <-ctx.Done():
return nil
case channelUpdate := <-channelUpdates:
var found bool
for i, existing := range channels {
if existing.TransferID == channelUpdate.TransferID &&
existing.OtherPeer == channelUpdate.OtherPeer &&
existing.IsSender == channelUpdate.IsSender &&
existing.IsInitiator == channelUpdate.IsInitiator {
channels[i] = channelUpdate
found = true
break
}
}
if !found {
channels = append(channels, channelUpdate)
}
}
}
},
}
@ -1303,7 +1345,7 @@ func toChannelOutput(useColor bool, otherPartyColumn string, channel api.DataTra
initiated = "Y"
}
voucher := channel.VoucherJSON
voucher := channel.Voucher
if len(voucher) > 40 {
voucher = "..." + voucher[len(voucher)-37:]
}

1
go.mod
View File

@ -12,6 +12,7 @@ require (
github.com/Gurpartap/async v0.0.0-20180927173644-4f7f499dd9ee
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d
github.com/buger/goterm v0.0.0-20200322175922-2f3e71b85129
github.com/coreos/go-systemd/v22 v22.0.0
github.com/detailyang/go-fallocate v0.0.0-20180908115635-432fa640bd2e
github.com/dgraph-io/badger/v2 v2.0.3

2
go.sum
View File

@ -112,6 +112,8 @@ github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVa
github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc=
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
github.com/buger/goterm v0.0.0-20200322175922-2f3e71b85129 h1:gfAMKE626QEuKG3si0pdTRcr/YEbBoxY+3GOH3gWvl4=
github.com/buger/goterm v0.0.0-20200322175922-2f3e71b85129/go.mod h1:u9UyCz2eTrSGy6fbupqJ54eY5c4IC8gREQ1053dK12U=
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=

View File

@ -7,6 +7,7 @@ import (
"io"
"os"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"golang.org/x/xerrors"
@ -470,11 +471,15 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) {
if state.PayloadCID.Equals(order.Root) {
events <- marketevents.RetrievalEvent{
select {
case <-ctx.Done():
return
case events <- marketevents.RetrievalEvent{
Event: event,
Status: state.Status,
BytesReceived: state.TotalReceived,
FundsSpent: state.FundsSpent,
}:
}
switch state.Status {
@ -767,29 +772,59 @@ func (a *API) ClientListDataTransfers(ctx context.Context) ([]api.DataTransferCh
}
apiChannels := make([]api.DataTransferChannel, 0, len(inProgressChannels))
for channelID, channelState := range inProgressChannels {
channel := api.DataTransferChannel{
TransferID: channelState.TransferID(),
Status: channelState.Status(),
BaseCID: channelState.BaseCID(),
IsInitiator: channelID.Initiator == a.Host.ID(),
IsSender: channelState.Sender() == a.Host.ID(),
Message: channelState.Message(),
}
voucherJSON, err := json.Marshal(channelState.Voucher())
if err != nil {
return nil, err
}
channel.VoucherJSON = string(voucherJSON)
if channel.IsSender {
channel.Transferred = channelState.Sent()
channel.OtherPeer = channelState.Recipient()
} else {
channel.Transferred = channelState.Received()
channel.OtherPeer = channelState.Sender()
}
apiChannels = append(apiChannels, channel)
for _, channelState := range inProgressChannels {
apiChannels = append(apiChannels, toAPIChannel(a.Host.ID(), channelState))
}
return apiChannels, nil
}
func (a *API) ClientDataTransferUpdates(ctx context.Context) (<-chan api.DataTransferChannel, error) {
channels := make(chan api.DataTransferChannel)
unsub := a.DataTransfer.SubscribeToEvents(func(evt datatransfer.Event, channelState datatransfer.ChannelState) {
channel := toAPIChannel(a.Host.ID(), channelState)
select {
case <-ctx.Done():
case channels <- channel:
}
})
go func() {
defer unsub()
<-ctx.Done()
}()
return channels, nil
}
func toAPIChannel(hostID peer.ID, channelState datatransfer.ChannelState) api.DataTransferChannel {
channel := api.DataTransferChannel{
TransferID: channelState.TransferID(),
Status: channelState.Status(),
BaseCID: channelState.BaseCID(),
IsSender: channelState.Sender() == hostID,
Message: channelState.Message(),
}
stringer, ok := channelState.Voucher().(fmt.Stringer)
if ok {
channel.Voucher = stringer.String()
} else {
voucherJSON, err := json.Marshal(channelState.Voucher())
if err != nil {
channel.Voucher = fmt.Errorf("Voucher Serialization: %w", err).Error()
} else {
channel.Voucher = string(voucherJSON)
}
}
if channel.IsSender {
channel.IsInitiator = !channelState.IsPull()
channel.Transferred = channelState.Sent()
channel.OtherPeer = channelState.Recipient()
} else {
channel.IsInitiator = channelState.IsPull()
channel.Transferred = channelState.Received()
channel.OtherPeer = channelState.Sender()
}
return channel
}