feat(data-transfer): fill in utils

This commit is contained in:
hannahhoward 2020-10-22 13:40:26 -07:00
parent 0289c39850
commit 98297cef4d
7 changed files with 252 additions and 7 deletions

View File

@ -312,6 +312,8 @@ type FullNode interface {
ClientDataTransferUpdates(ctx context.Context) (<-chan DataTransferChannel, error) ClientDataTransferUpdates(ctx context.Context) (<-chan DataTransferChannel, error)
// ClientRestartDataTransfer attempts to restart a data transfer with the given transfer ID and other peer // ClientRestartDataTransfer attempts to restart a data transfer with the given transfer ID and other peer
ClientRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error ClientRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error
// ClientCancelDataTransfer cancels a data transfer with the given transfer ID and other peer
ClientCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error
// ClientRetrieveTryRestartInsufficientFunds attempts to restart stalled retrievals on a given payment channel // ClientRetrieveTryRestartInsufficientFunds attempts to restart stalled retrievals on a given payment channel
// which are stuck due to insufficient funds // which are stuck due to insufficient funds
ClientRetrieveTryRestartInsufficientFunds(ctx context.Context, paymentChannel address.Address) error ClientRetrieveTryRestartInsufficientFunds(ctx context.Context, paymentChannel address.Address) error

View File

@ -5,7 +5,9 @@ import (
"context" "context"
"time" "time"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/piecestore"
@ -81,6 +83,10 @@ type StorageMiner interface {
MarketGetRetrievalAsk(ctx context.Context) (*retrievalmarket.Ask, error) MarketGetRetrievalAsk(ctx context.Context) (*retrievalmarket.Ask, error)
MarketListDataTransfers(ctx context.Context) ([]DataTransferChannel, error) MarketListDataTransfers(ctx context.Context) ([]DataTransferChannel, error)
MarketDataTransferUpdates(ctx context.Context) (<-chan DataTransferChannel, error) MarketDataTransferUpdates(ctx context.Context) (<-chan DataTransferChannel, error)
// MinerRestartDataTransfer attempts to restart a data transfer with the given transfer ID and other peer
MarketRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error
// ClientCancelDataTransfer cancels a data transfer with the given transfer ID and other peer
MarketCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error
DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error
DealsList(ctx context.Context) ([]MarketDeal, error) DealsList(ctx context.Context) ([]MarketDeal, error)

View File

@ -172,6 +172,7 @@ type FullNodeStruct struct {
ClientListDataTransfers func(ctx context.Context) ([]api.DataTransferChannel, error) `perm:"write"` ClientListDataTransfers func(ctx context.Context) ([]api.DataTransferChannel, error) `perm:"write"`
ClientDataTransferUpdates func(ctx context.Context) (<-chan api.DataTransferChannel, error) `perm:"write"` ClientDataTransferUpdates func(ctx context.Context) (<-chan api.DataTransferChannel, error) `perm:"write"`
ClientRestartDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"write"` ClientRestartDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"write"`
ClientCancelDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"write"`
ClientRetrieveTryRestartInsufficientFunds func(ctx context.Context, paymentChannel address.Address) error `perm:"write"` ClientRetrieveTryRestartInsufficientFunds func(ctx context.Context, paymentChannel address.Address) error `perm:"write"`
StateNetworkName func(context.Context) (dtypes.NetworkName, error) `perm:"read"` StateNetworkName func(context.Context) (dtypes.NetworkName, error) `perm:"read"`
@ -284,6 +285,8 @@ type StorageMinerStruct struct {
MarketGetRetrievalAsk func(ctx context.Context) (*retrievalmarket.Ask, error) `perm:"read"` MarketGetRetrievalAsk func(ctx context.Context) (*retrievalmarket.Ask, error) `perm:"read"`
MarketListDataTransfers func(ctx context.Context) ([]api.DataTransferChannel, error) `perm:"write"` MarketListDataTransfers func(ctx context.Context) ([]api.DataTransferChannel, error) `perm:"write"`
MarketDataTransferUpdates func(ctx context.Context) (<-chan api.DataTransferChannel, error) `perm:"write"` MarketDataTransferUpdates func(ctx context.Context) (<-chan api.DataTransferChannel, error) `perm:"write"`
MarketRestartDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"read"`
MarketCancelDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"read"`
PledgeSector func(context.Context) error `perm:"write"` PledgeSector func(context.Context) error `perm:"write"`
@ -568,6 +571,10 @@ func (c *FullNodeStruct) ClientRestartDataTransfer(ctx context.Context, transfer
return c.Internal.ClientRestartDataTransfer(ctx, transferID, otherPeer, isInitiator) return c.Internal.ClientRestartDataTransfer(ctx, transferID, otherPeer, isInitiator)
} }
func (c *FullNodeStruct) ClientCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error {
return c.Internal.ClientCancelDataTransfer(ctx, transferID, otherPeer, isInitiator)
}
func (c *FullNodeStruct) ClientRetrieveTryRestartInsufficientFunds(ctx context.Context, paymentChannel address.Address) error { func (c *FullNodeStruct) ClientRetrieveTryRestartInsufficientFunds(ctx context.Context, paymentChannel address.Address) error {
return c.Internal.ClientRetrieveTryRestartInsufficientFunds(ctx, paymentChannel) return c.Internal.ClientRetrieveTryRestartInsufficientFunds(ctx, paymentChannel)
} }
@ -1304,6 +1311,14 @@ func (c *StorageMinerStruct) MarketDataTransferUpdates(ctx context.Context) (<-c
return c.Internal.MarketDataTransferUpdates(ctx) return c.Internal.MarketDataTransferUpdates(ctx)
} }
func (c *StorageMinerStruct) MarketRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error {
return c.Internal.MarketRestartDataTransfer(ctx, transferID, otherPeer, isInitiator)
}
func (c *StorageMinerStruct) MarketCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error {
return c.Internal.MarketCancelDataTransfer(ctx, transferID, otherPeer, isInitiator)
}
func (c *StorageMinerStruct) DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error { func (c *StorageMinerStruct) DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error {
return c.Internal.DealsImportData(ctx, dealPropCid, file) return c.Internal.DealsImportData(ctx, dealPropCid, file)
} }

View File

@ -91,6 +91,7 @@ var clientCmd = &cli.Command{
WithCategory("util", clientInfoCmd), WithCategory("util", clientInfoCmd),
WithCategory("util", clientListTransfers), WithCategory("util", clientListTransfers),
WithCategory("util", clientRestartTransfer), WithCategory("util", clientRestartTransfer),
WithCategory("util", clientCancelTransfer),
}, },
} }
@ -1694,6 +1695,66 @@ var clientRestartTransfer = &cli.Command{
}, },
} }
var clientCancelTransfer = &cli.Command{
Name: "cancel-transfer",
Usage: "Force cancel a data transfer",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "peerid",
Usage: "narrow to transfer with specific peer",
},
&cli.BoolFlag{
Name: "initiator",
Usage: "specify only transfers where peer is/is not initiator",
Value: true,
},
},
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
return cli.ShowCommandHelp(cctx, cctx.Command.Name)
}
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
transferUint, err := strconv.ParseUint(cctx.Args().First(), 10, 64)
if err != nil {
return fmt.Errorf("Error reading transfer ID: %w", err)
}
transferID := datatransfer.TransferID(transferUint)
initiator := cctx.Bool("initiator")
var other peer.ID
if pidstr := cctx.String("peerid"); pidstr != "" {
p, err := peer.Decode(pidstr)
if err != nil {
return err
}
other = p
} else {
channels, err := api.ClientListDataTransfers(ctx)
if err != nil {
return err
}
found := false
for _, channel := range channels {
if channel.IsInitiator == initiator && channel.TransferID == transferID {
other = channel.OtherPeer
found = true
break
}
}
if !found {
return errors.New("unable to find matching data transfer")
}
}
return api.ClientCancelDataTransfer(ctx, transferID, other, initiator)
},
}
var clientListTransfers = &cli.Command{ var clientListTransfers = &cli.Command{
Name: "list-transfers", Name: "list-transfers",
Usage: "List ongoing data transfers for deals", Usage: "List ongoing data transfers for deals",
@ -1711,6 +1772,10 @@ var clientListTransfers = &cli.Command{
Name: "watch", Name: "watch",
Usage: "watch deal updates in real-time, rather than a one time list", Usage: "watch deal updates in real-time, rather than a one time list",
}, },
&cli.BoolFlag{
Name: "show-failed",
Usage: "show failed/cancelled transfers",
},
}, },
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx) api, closer, err := GetFullNodeAPI(cctx)
@ -1728,7 +1793,7 @@ var clientListTransfers = &cli.Command{
completed := cctx.Bool("completed") completed := cctx.Bool("completed")
color := cctx.Bool("color") color := cctx.Bool("color")
watch := cctx.Bool("watch") watch := cctx.Bool("watch")
showFailed := cctx.Bool("show-failed")
if watch { if watch {
channelUpdates, err := api.ClientDataTransferUpdates(ctx) channelUpdates, err := api.ClientDataTransferUpdates(ctx)
if err != nil { if err != nil {
@ -1740,7 +1805,7 @@ var clientListTransfers = &cli.Command{
tm.MoveCursor(1, 1) tm.MoveCursor(1, 1)
OutputDataTransferChannels(tm.Screen, channels, completed, color) OutputDataTransferChannels(tm.Screen, channels, completed, color, showFailed)
tm.Flush() tm.Flush()
@ -1765,13 +1830,13 @@ var clientListTransfers = &cli.Command{
} }
} }
} }
OutputDataTransferChannels(os.Stdout, channels, completed, color) OutputDataTransferChannels(os.Stdout, channels, completed, color, showFailed)
return nil return nil
}, },
} }
// OutputDataTransferChannels generates table output for a list of channels // OutputDataTransferChannels generates table output for a list of channels
func OutputDataTransferChannels(out io.Writer, channels []lapi.DataTransferChannel, completed bool, color bool) { func OutputDataTransferChannels(out io.Writer, channels []lapi.DataTransferChannel, completed bool, color bool, showFailed bool) {
sort.Slice(channels, func(i, j int) bool { sort.Slice(channels, func(i, j int) bool {
return channels[i].TransferID < channels[j].TransferID return channels[i].TransferID < channels[j].TransferID
}) })
@ -1781,6 +1846,9 @@ func OutputDataTransferChannels(out io.Writer, channels []lapi.DataTransferChann
if !completed && channel.Status == datatransfer.Completed { if !completed && channel.Status == datatransfer.Completed {
continue continue
} }
if !showFailed && (channel.Status == datatransfer.Failed || channel.Status == datatransfer.Cancelled) {
continue
}
if channel.IsSender { if channel.IsSender {
sendingChannels = append(sendingChannels, channel) sendingChannels = append(sendingChannels, channel)
} else { } else {

View File

@ -2,6 +2,7 @@ package main
import ( import (
"bufio" "bufio"
"errors"
"fmt" "fmt"
"io" "io"
"os" "os"
@ -13,8 +14,10 @@ import (
tm "github.com/buger/goterm" tm "github.com/buger/goterm"
"github.com/docker/go-units" "github.com/docker/go-units"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-cidutil/cidenc" "github.com/ipfs/go-cidutil/cidenc"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multibase" "github.com/multiformats/go-multibase"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -569,6 +572,128 @@ var dataTransfersCmd = &cli.Command{
Usage: "Manage data transfers", Usage: "Manage data transfers",
Subcommands: []*cli.Command{ Subcommands: []*cli.Command{
transfersListCmd, transfersListCmd,
marketRestartTransfer,
marketCancelTransfer,
},
}
var marketRestartTransfer = &cli.Command{
Name: "restart",
Usage: "Force restart a stalled data transfer",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "peerid",
Usage: "narrow to transfer with specific peer",
},
&cli.BoolFlag{
Name: "initiator",
Usage: "specify only transfers where peer is/is not initiator",
Value: false,
},
},
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
return cli.ShowCommandHelp(cctx, cctx.Command.Name)
}
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
transferUint, err := strconv.ParseUint(cctx.Args().First(), 10, 64)
if err != nil {
return fmt.Errorf("Error reading transfer ID: %w", err)
}
transferID := datatransfer.TransferID(transferUint)
initiator := cctx.Bool("initiator")
var other peer.ID
if pidstr := cctx.String("peerid"); pidstr != "" {
p, err := peer.Decode(pidstr)
if err != nil {
return err
}
other = p
} else {
channels, err := nodeApi.MarketListDataTransfers(ctx)
if err != nil {
return err
}
found := false
for _, channel := range channels {
if channel.IsInitiator == initiator && channel.TransferID == transferID {
other = channel.OtherPeer
found = true
break
}
}
if !found {
return errors.New("unable to find matching data transfer")
}
}
return nodeApi.MarketRestartDataTransfer(ctx, transferID, other, initiator)
},
}
var marketCancelTransfer = &cli.Command{
Name: "cancel",
Usage: "Force cancel a data transfer",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "peerid",
Usage: "narrow to transfer with specific peer",
},
&cli.BoolFlag{
Name: "initiator",
Usage: "specify only transfers where peer is/is not initiator",
Value: false,
},
},
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
return cli.ShowCommandHelp(cctx, cctx.Command.Name)
}
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
transferUint, err := strconv.ParseUint(cctx.Args().First(), 10, 64)
if err != nil {
return fmt.Errorf("Error reading transfer ID: %w", err)
}
transferID := datatransfer.TransferID(transferUint)
initiator := cctx.Bool("initiator")
var other peer.ID
if pidstr := cctx.String("peerid"); pidstr != "" {
p, err := peer.Decode(pidstr)
if err != nil {
return err
}
other = p
} else {
channels, err := nodeApi.MarketListDataTransfers(ctx)
if err != nil {
return err
}
found := false
for _, channel := range channels {
if channel.IsInitiator == initiator && channel.TransferID == transferID {
other = channel.OtherPeer
found = true
break
}
}
if !found {
return errors.New("unable to find matching data transfer")
}
}
return nodeApi.MarketCancelDataTransfer(ctx, transferID, other, initiator)
}, },
} }
@ -589,6 +714,10 @@ var transfersListCmd = &cli.Command{
Name: "watch", Name: "watch",
Usage: "watch deal updates in real-time, rather than a one time list", Usage: "watch deal updates in real-time, rather than a one time list",
}, },
&cli.BoolFlag{
Name: "show-failed",
Usage: "show failed/cancelled transfers",
},
}, },
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetStorageMinerAPI(cctx) api, closer, err := lcli.GetStorageMinerAPI(cctx)
@ -606,7 +735,7 @@ var transfersListCmd = &cli.Command{
completed := cctx.Bool("completed") completed := cctx.Bool("completed")
color := cctx.Bool("color") color := cctx.Bool("color")
watch := cctx.Bool("watch") watch := cctx.Bool("watch")
showFailed := cctx.Bool("show-failed")
if watch { if watch {
channelUpdates, err := api.MarketDataTransferUpdates(ctx) channelUpdates, err := api.MarketDataTransferUpdates(ctx)
if err != nil { if err != nil {
@ -618,7 +747,7 @@ var transfersListCmd = &cli.Command{
tm.MoveCursor(1, 1) tm.MoveCursor(1, 1)
lcli.OutputDataTransferChannels(tm.Screen, channels, completed, color) lcli.OutputDataTransferChannels(tm.Screen, channels, completed, color, showFailed)
tm.Flush() tm.Flush()
@ -643,7 +772,7 @@ var transfersListCmd = &cli.Command{
} }
} }
} }
lcli.OutputDataTransferChannels(os.Stdout, channels, completed, color) lcli.OutputDataTransferChannels(os.Stdout, channels, completed, color, showFailed)
return nil return nil
}, },
} }

View File

@ -858,6 +858,14 @@ func (a *API) ClientRestartDataTransfer(ctx context.Context, transferID datatran
return a.DataTransfer.RestartDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: otherPeer, Responder: selfPeer, ID: transferID}) return a.DataTransfer.RestartDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: otherPeer, Responder: selfPeer, ID: transferID})
} }
func (a *API) ClientCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error {
selfPeer := a.Host.ID()
if isInitiator {
return a.DataTransfer.CloseDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: selfPeer, Responder: otherPeer, ID: transferID})
}
return a.DataTransfer.CloseDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: otherPeer, Responder: selfPeer, ID: transferID})
}
func newDealInfo(v storagemarket.ClientDeal) api.DealInfo { func newDealInfo(v storagemarket.ClientDeal) api.DealInfo {
return api.DealInfo{ return api.DealInfo{
ProposalCid: v.ProposalCid, ProposalCid: v.ProposalCid,

View File

@ -10,6 +10,7 @@ import (
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
@ -400,6 +401,22 @@ func (sm *StorageMinerAPI) MarketListDataTransfers(ctx context.Context) ([]api.D
return apiChannels, nil return apiChannels, nil
} }
func (sm *StorageMinerAPI) MarketRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error {
selfPeer := sm.Host.ID()
if isInitiator {
return sm.DataTransfer.RestartDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: selfPeer, Responder: otherPeer, ID: transferID})
}
return sm.DataTransfer.RestartDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: otherPeer, Responder: selfPeer, ID: transferID})
}
func (sm *StorageMinerAPI) MarketCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error {
selfPeer := sm.Host.ID()
if isInitiator {
return sm.DataTransfer.CloseDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: selfPeer, Responder: otherPeer, ID: transferID})
}
return sm.DataTransfer.CloseDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: otherPeer, Responder: selfPeer, ID: transferID})
}
func (sm *StorageMinerAPI) MarketDataTransferUpdates(ctx context.Context) (<-chan api.DataTransferChannel, error) { func (sm *StorageMinerAPI) MarketDataTransferUpdates(ctx context.Context) (<-chan api.DataTransferChannel, error) {
channels := make(chan api.DataTransferChannel) channels := make(chan api.DataTransferChannel)