Merge pull request #3162 from filecoin-project/feat/data-transfer-status

Find out what's up with data transfers!
This commit is contained in:
Łukasz Magiera 2020-08-19 23:30:44 +02:00 committed by GitHub
commit 3ed3e4f5d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 291 additions and 16 deletions

View File

@ -256,6 +256,9 @@ type FullNode interface {
ClientGenCar(ctx context.Context, ref FileRef, outpath string) error
// ClientDealSize calculates real deal data size
ClientDealSize(ctx context.Context, root cid.Cid) (DataSize, error)
// ClientListTransfers returns the status of all ongoing transfers of data
ClientListDataTransfers(ctx context.Context) ([]DataTransferChannel, error)
ClientDataTransferUpdates(ctx context.Context) (<-chan DataTransferChannel, error)
// ClientUnimport removes references to the specified file from filestore
//ClientUnimport(path string)

View File

@ -142,6 +142,8 @@ type FullNodeStruct struct {
ClientCalcCommP func(ctx context.Context, inpath string) (*api.CommPRet, error) `perm:"read"`
ClientGenCar func(ctx context.Context, ref api.FileRef, outpath string) error `perm:"write"`
ClientDealSize func(ctx context.Context, root cid.Cid) (api.DataSize, error) `perm:"read"`
ClientListDataTransfers func(ctx context.Context) ([]api.DataTransferChannel, error) `perm:"write"`
ClientDataTransferUpdates func(ctx context.Context) (<-chan api.DataTransferChannel, error) `perm:"write"`
StateNetworkName func(context.Context) (dtypes.NetworkName, error) `perm:"read"`
StateMinerSectors func(context.Context, address.Address, *abi.BitField, bool, types.TipSetKey) ([]*api.ChainSectorInfo, error) `perm:"read"`
@ -449,6 +451,14 @@ func (c *FullNodeStruct) ClientDealSize(ctx context.Context, root cid.Cid) (api.
return c.Internal.ClientDealSize(ctx, root)
}
func (c *FullNodeStruct) ClientListDataTransfers(ctx context.Context) ([]api.DataTransferChannel, error) {
return c.Internal.ClientListDataTransfers(ctx)
}
func (c *FullNodeStruct) ClientDataTransferUpdates(ctx context.Context) (<-chan api.DataTransferChannel, error) {
return c.Internal.ClientDataTransferUpdates(ctx)
}
func (c *FullNodeStruct) GasEstimateGasPremium(ctx context.Context, nblocksincl uint64,
sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error) {
return c.Internal.GasEstimateGasPremium(ctx, nblocksincl, sender, gaslimit, tsk)

View File

@ -2,10 +2,13 @@ package api
import (
"encoding/json"
"github.com/filecoin-project/go-address"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
@ -98,3 +101,15 @@ func (ms *MessageSendSpec) Get() MessageSendSpec {
return *ms
}
type DataTransferChannel struct {
TransferID datatransfer.TransferID
Status datatransfer.Status
BaseCID cid.Cid
IsInitiator bool
IsSender bool
Voucher string
Message string
OtherPeer peer.ID
Transferred uint64
}

View File

@ -3,6 +3,7 @@ package cli
import (
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"sort"
@ -10,8 +11,10 @@ import (
"text/tabwriter"
"time"
tm "github.com/buger/goterm"
"github.com/docker/go-units"
"github.com/fatih/color"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-cidutil/cidenc"
@ -76,6 +79,7 @@ var clientCmd = &cli.Command{
WithCategory("util", clientCommPCmd),
WithCategory("util", clientCarGenCmd),
WithCategory("util", clientInfoCmd),
WithCategory("util", clientListTransfers),
},
}
@ -1203,3 +1207,170 @@ var clientInfoCmd = &cli.Command{
return nil
},
}
var clientListTransfers = &cli.Command{
Name: "list-transfers",
Usage: "List ongoing data transfers for deals",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "color",
Usage: "use color in display output",
Value: true,
},
&cli.BoolFlag{
Name: "completed",
Usage: "show completed data transfers",
},
&cli.BoolFlag{
Name: "watch",
Usage: "watch deal updates in real-time, rather than a one time list",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
channels, err := api.ClientListDataTransfers(ctx)
if err != nil {
return err
}
completed := cctx.Bool("completed")
color := cctx.Bool("color")
watch := cctx.Bool("watch")
if watch {
channelUpdates, err := api.ClientDataTransferUpdates(ctx)
if err != nil {
return err
}
for {
tm.Clear() // Clear current screen
tm.MoveCursor(1, 1)
outputChannels(tm.Screen, channels, completed, color)
tm.Flush()
select {
case <-ctx.Done():
return nil
case channelUpdate := <-channelUpdates:
var found bool
for i, existing := range channels {
if existing.TransferID == channelUpdate.TransferID &&
existing.OtherPeer == channelUpdate.OtherPeer &&
existing.IsSender == channelUpdate.IsSender &&
existing.IsInitiator == channelUpdate.IsInitiator {
channels[i] = channelUpdate
found = true
break
}
}
if !found {
channels = append(channels, channelUpdate)
}
}
}
}
outputChannels(os.Stdout, channels, completed, color)
return nil
},
}
func outputChannels(out io.Writer, channels []api.DataTransferChannel, completed bool, color bool) {
sort.Slice(channels, func(i, j int) bool {
return channels[i].TransferID < channels[j].TransferID
})
var receivingChannels, sendingChannels []lapi.DataTransferChannel
for _, channel := range channels {
if !completed && channel.Status == datatransfer.Completed {
continue
}
if channel.IsSender {
sendingChannels = append(sendingChannels, channel)
} else {
receivingChannels = append(receivingChannels, channel)
}
}
fmt.Fprintf(out, "Sending Channels\n\n")
w := tablewriter.New(tablewriter.Col("ID"),
tablewriter.Col("Status"),
tablewriter.Col("Sending To"),
tablewriter.Col("Root Cid"),
tablewriter.Col("Initiated?"),
tablewriter.Col("Transferred"),
tablewriter.Col("Voucher"),
tablewriter.NewLineCol("Message"))
for _, channel := range sendingChannels {
w.Write(toChannelOutput(color, "Sending To", channel))
}
w.Flush(out)
fmt.Fprintf(out, "\nReceiving Channels\n\n")
w = tablewriter.New(tablewriter.Col("ID"),
tablewriter.Col("Status"),
tablewriter.Col("Receiving From"),
tablewriter.Col("Root Cid"),
tablewriter.Col("Initiated?"),
tablewriter.Col("Transferred"),
tablewriter.Col("Voucher"),
tablewriter.NewLineCol("Message"))
for _, channel := range receivingChannels {
w.Write(toChannelOutput(color, "Receiving From", channel))
}
w.Flush(out)
}
func channelStatusString(useColor bool, status datatransfer.Status) string {
s := datatransfer.Statuses[status]
if !useColor {
return s
}
switch status {
case datatransfer.Failed, datatransfer.Cancelled:
return color.RedString(s)
case datatransfer.Completed:
return color.GreenString(s)
default:
return s
}
}
func toChannelOutput(useColor bool, otherPartyColumn string, channel api.DataTransferChannel) map[string]interface{} {
rootCid := channel.BaseCID.String()
rootCid = "..." + rootCid[len(rootCid)-8:]
otherParty := channel.OtherPeer.String()
otherParty = "..." + otherParty[len(otherParty)-8:]
initiated := "N"
if channel.IsInitiator {
initiated = "Y"
}
voucher := channel.Voucher
if len(voucher) > 40 {
voucher = "..." + voucher[len(voucher)-37:]
}
return map[string]interface{}{
"ID": channel.TransferID,
"Status": channelStatusString(useColor, channel.Status),
otherPartyColumn: otherParty,
"Root Cid": rootCid,
"Initiated?": initiated,
"Transferred": channel.Transferred,
"Voucher": voucher,
"Message": channel.Message,
}
}

1
go.mod
View File

@ -12,6 +12,7 @@ require (
github.com/Gurpartap/async v0.0.0-20180927173644-4f7f499dd9ee
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d
github.com/buger/goterm v0.0.0-20200322175922-2f3e71b85129
github.com/coreos/go-systemd/v22 v22.0.0
github.com/detailyang/go-fallocate v0.0.0-20180908115635-432fa640bd2e
github.com/dgraph-io/badger/v2 v2.0.3

2
go.sum
View File

@ -112,6 +112,8 @@ github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVa
github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc=
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
github.com/buger/goterm v0.0.0-20200322175922-2f3e71b85129 h1:gfAMKE626QEuKG3si0pdTRcr/YEbBoxY+3GOH3gWvl4=
github.com/buger/goterm v0.0.0-20200322175922-2f3e71b85129/go.mod h1:u9UyCz2eTrSGy6fbupqJ54eY5c4IC8gREQ1053dK12U=
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=

View File

@ -2,10 +2,12 @@ package client
import (
"context"
"encoding/json"
"fmt"
"io"
"os"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"golang.org/x/xerrors"
@ -24,6 +26,7 @@ import (
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
mh "github.com/multiformats/go-multihash"
"go.uber.org/fx"
@ -74,6 +77,8 @@ type API struct {
CombinedBstore dtypes.ClientBlockstore // TODO: try to remove
RetrievalStoreMgr dtypes.ClientRetrievalStoreManager
DataTransfer dtypes.ClientDataTransfer
Host host.Host
}
func calcDealExpiration(minDuration uint64, md *miner.DeadlineInfo, startEpoch abi.ChainEpoch) abi.ChainEpoch {
@ -466,11 +471,15 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) {
if state.PayloadCID.Equals(order.Root) {
events <- marketevents.RetrievalEvent{
select {
case <-ctx.Done():
return
case events <- marketevents.RetrievalEvent{
Event: event,
Status: state.Status,
BytesReceived: state.TotalReceived,
FundsSpent: state.FundsSpent,
}:
}
switch state.Status {
@ -755,3 +764,67 @@ func (a *API) clientImport(ctx context.Context, ref api.FileRef, store *multisto
return nd.Cid(), nil
}
func (a *API) ClientListDataTransfers(ctx context.Context) ([]api.DataTransferChannel, error) {
inProgressChannels, err := a.DataTransfer.InProgressChannels(ctx)
if err != nil {
return nil, err
}
apiChannels := make([]api.DataTransferChannel, 0, len(inProgressChannels))
for _, channelState := range inProgressChannels {
apiChannels = append(apiChannels, toAPIChannel(a.Host.ID(), channelState))
}
return apiChannels, nil
}
func (a *API) ClientDataTransferUpdates(ctx context.Context) (<-chan api.DataTransferChannel, error) {
channels := make(chan api.DataTransferChannel)
unsub := a.DataTransfer.SubscribeToEvents(func(evt datatransfer.Event, channelState datatransfer.ChannelState) {
channel := toAPIChannel(a.Host.ID(), channelState)
select {
case <-ctx.Done():
case channels <- channel:
}
})
go func() {
defer unsub()
<-ctx.Done()
}()
return channels, nil
}
func toAPIChannel(hostID peer.ID, channelState datatransfer.ChannelState) api.DataTransferChannel {
channel := api.DataTransferChannel{
TransferID: channelState.TransferID(),
Status: channelState.Status(),
BaseCID: channelState.BaseCID(),
IsSender: channelState.Sender() == hostID,
Message: channelState.Message(),
}
stringer, ok := channelState.Voucher().(fmt.Stringer)
if ok {
channel.Voucher = stringer.String()
} else {
voucherJSON, err := json.Marshal(channelState.Voucher())
if err != nil {
channel.Voucher = fmt.Errorf("Voucher Serialization: %w", err).Error()
} else {
channel.Voucher = string(voucherJSON)
}
}
if channel.IsSender {
channel.IsInitiator = !channelState.IsPull()
channel.Transferred = channelState.Sent()
channel.OtherPeer = channelState.Recipient()
} else {
channel.IsInitiator = channelState.IsPull()
channel.Transferred = channelState.Received()
channel.OtherPeer = channelState.Sender()
}
return channel
}