Merge pull request #7759 from filecoin-project/feat/transfer-diagnostics

Add api for transfer diagnostics
This commit is contained in:
Jiaying Wang 2021-12-22 17:07:54 -05:00 committed by GitHub
commit 921fda94c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 317 additions and 19 deletions

View File

@ -165,6 +165,8 @@ type StorageMiner interface {
MarketGetRetrievalAsk(ctx context.Context) (*retrievalmarket.Ask, error) //perm:read MarketGetRetrievalAsk(ctx context.Context) (*retrievalmarket.Ask, error) //perm:read
MarketListDataTransfers(ctx context.Context) ([]DataTransferChannel, error) //perm:write MarketListDataTransfers(ctx context.Context) ([]DataTransferChannel, error) //perm:write
MarketDataTransferUpdates(ctx context.Context) (<-chan DataTransferChannel, error) //perm:write MarketDataTransferUpdates(ctx context.Context) (<-chan DataTransferChannel, error) //perm:write
// MarketDataTransferDiagnostics generates debugging information about current data transfers over graphsync
MarketDataTransferDiagnostics(ctx context.Context, p peer.ID) (*TransferDiagnostics, error) //perm:write
// MarketRestartDataTransfer attempts to restart a data transfer with the given transfer ID and other peer // MarketRestartDataTransfer attempts to restart a data transfer with the given transfer ID and other peer
MarketRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error //perm:write MarketRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error //perm:write
// MarketCancelDataTransfer cancels a data transfer with the given transfer ID and other peer // MarketCancelDataTransfer cancels a data transfer with the given transfer ID and other peer

View File

@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-bitfield"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync"
"github.com/libp2p/go-libp2p-core/metrics" "github.com/libp2p/go-libp2p-core/metrics"
"github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
@ -120,6 +121,7 @@ func init() {
addExample(api.FullAPIVersion1) addExample(api.FullAPIVersion1)
addExample(api.PCHInbound) addExample(api.PCHInbound)
addExample(time.Minute) addExample(time.Minute)
addExample(graphsync.RequestID(4))
addExample(datatransfer.TransferID(3)) addExample(datatransfer.TransferID(3))
addExample(datatransfer.Ongoing) addExample(datatransfer.Ongoing)
addExample(storeIDExample) addExample(storeIDExample)

View File

@ -668,6 +668,8 @@ type StorageMinerStruct struct {
MarketCancelDataTransfer func(p0 context.Context, p1 datatransfer.TransferID, p2 peer.ID, p3 bool) error `perm:"write"` MarketCancelDataTransfer func(p0 context.Context, p1 datatransfer.TransferID, p2 peer.ID, p3 bool) error `perm:"write"`
MarketDataTransferDiagnostics func(p0 context.Context, p1 peer.ID) (*TransferDiagnostics, error) `perm:"write"`
MarketDataTransferUpdates func(p0 context.Context) (<-chan DataTransferChannel, error) `perm:"write"` MarketDataTransferUpdates func(p0 context.Context) (<-chan DataTransferChannel, error) `perm:"write"`
MarketGetAsk func(p0 context.Context) (*storagemarket.SignedStorageAsk, error) `perm:"read"` MarketGetAsk func(p0 context.Context) (*storagemarket.SignedStorageAsk, error) `perm:"read"`
@ -3972,6 +3974,17 @@ func (s *StorageMinerStub) MarketCancelDataTransfer(p0 context.Context, p1 datat
return ErrNotSupported return ErrNotSupported
} }
func (s *StorageMinerStruct) MarketDataTransferDiagnostics(p0 context.Context, p1 peer.ID) (*TransferDiagnostics, error) {
if s.Internal.MarketDataTransferDiagnostics == nil {
return nil, ErrNotSupported
}
return s.Internal.MarketDataTransferDiagnostics(p0, p1)
}
func (s *StorageMinerStub) MarketDataTransferDiagnostics(p0 context.Context, p1 peer.ID) (*TransferDiagnostics, error) {
return nil, ErrNotSupported
}
func (s *StorageMinerStruct) MarketDataTransferUpdates(p0 context.Context) (<-chan DataTransferChannel, error) { func (s *StorageMinerStruct) MarketDataTransferUpdates(p0 context.Context) (<-chan DataTransferChannel, error) {
if s.Internal.MarketDataTransferUpdates == nil { if s.Internal.MarketDataTransferUpdates == nil {
return nil, ErrNotSupported return nil, ErrNotSupported

View File

@ -10,6 +10,7 @@ import (
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
@ -53,6 +54,30 @@ type MessageSendSpec struct {
MaxFee abi.TokenAmount MaxFee abi.TokenAmount
} }
// GraphSyncDataTransfer provides diagnostics on a data transfer happening over graphsync
type GraphSyncDataTransfer struct {
// GraphSync request id for this transfer
RequestID graphsync.RequestID
// Graphsync state for this transfer
RequestState string
// If a channel ID is present, indicates whether this is the current graphsync request for this channel
// (could have changed in a restart)
IsCurrentChannelRequest bool
// Data transfer channel ID for this transfer
ChannelID *datatransfer.ChannelID
// Data transfer state for this transfer
ChannelState *DataTransferChannel
// Diagnostic information about this request -- and unexpected inconsistencies in
// request state
Diagnostics []string
}
// TransferDiagnostics give current information about transfers going over graphsync that may be helpful for debugging
type TransferDiagnostics struct {
ReceivingTransfers []*GraphSyncDataTransfer
SendingTransfers []*GraphSyncDataTransfer
}
type DataTransferChannel struct { type DataTransferChannel struct {
TransferID datatransfer.TransferID TransferID datatransfer.TransferID
Status datatransfer.Status Status datatransfer.Status

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -638,6 +638,7 @@ var dataTransfersCmd = &cli.Command{
transfersListCmd, transfersListCmd,
marketRestartTransfer, marketRestartTransfer,
marketCancelTransfer, marketCancelTransfer,
transfersDiagnosticsCmd,
}, },
} }
@ -857,6 +858,38 @@ var transfersListCmd = &cli.Command{
}, },
} }
var transfersDiagnosticsCmd = &cli.Command{
Name: "diagnostics",
Usage: "Get detailed diagnostics on active transfers with a specific peer",
Flags: []cli.Flag{},
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
return cli.ShowCommandHelp(cctx, cctx.Command.Name)
}
api, closer, err := lcli.GetMarketsAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
targetPeer, err := peer.Decode(cctx.Args().First())
if err != nil {
return err
}
diagnostics, err := api.MarketDataTransferDiagnostics(ctx, targetPeer)
if err != nil {
return err
}
out, err := json.MarshalIndent(diagnostics, "", "\t")
if err != nil {
return err
}
fmt.Println(string(out))
return nil
},
}
var dealsPendingPublish = &cli.Command{ var dealsPendingPublish = &cli.Command{
Name: "pending-publish", Name: "pending-publish",
Usage: "list deals waiting in publish queue", Usage: "list deals waiting in publish queue",

View File

@ -49,6 +49,7 @@
* [LogSetLevel](#LogSetLevel) * [LogSetLevel](#LogSetLevel)
* [Market](#Market) * [Market](#Market)
* [MarketCancelDataTransfer](#MarketCancelDataTransfer) * [MarketCancelDataTransfer](#MarketCancelDataTransfer)
* [MarketDataTransferDiagnostics](#MarketDataTransferDiagnostics)
* [MarketDataTransferUpdates](#MarketDataTransferUpdates) * [MarketDataTransferUpdates](#MarketDataTransferUpdates)
* [MarketGetAsk](#MarketGetAsk) * [MarketGetAsk](#MarketGetAsk)
* [MarketGetDealUpdates](#MarketGetDealUpdates) * [MarketGetDealUpdates](#MarketGetDealUpdates)
@ -724,6 +725,27 @@ Inputs:
Response: `{}` Response: `{}`
### MarketDataTransferDiagnostics
MarketDataTransferDiagnostics generates debugging information about current data transfers over graphsync
Perms: write
Inputs:
```json
[
"12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf"
]
```
Response:
```json
{
"ReceivingTransfers": null,
"SendingTransfers": null
}
```
### MarketDataTransferUpdates ### MarketDataTransferUpdates

View File

@ -974,10 +974,11 @@ USAGE:
lotus-miner data-transfers command [command options] [arguments...] lotus-miner data-transfers command [command options] [arguments...]
COMMANDS: COMMANDS:
list List ongoing data transfers for this miner list List ongoing data transfers for this miner
restart Force restart a stalled data transfer restart Force restart a stalled data transfer
cancel Force cancel a data transfer cancel Force cancel a data transfer
help, h Shows a list of commands or help for one command diagnostics Get detailed diagnostics on active transfers with a specific peer
help, h Shows a list of commands or help for one command
OPTIONS: OPTIONS:
--help, -h show help (default: false) --help, -h show help (default: false)
@ -1034,6 +1035,19 @@ OPTIONS:
``` ```
### lotus-miner data-transfers diagnostics
```
NAME:
lotus-miner data-transfers diagnostics - Get detailed diagnostics on active transfers with a specific peer
USAGE:
lotus-miner data-transfers diagnostics [command options] [arguments...]
OPTIONS:
--help, -h show help (default: false)
```
## lotus-miner dagstore ## lotus-miner dagstore
``` ```
NAME: NAME:

6
go.mod
View File

@ -33,7 +33,7 @@ require (
github.com/filecoin-project/go-cbor-util v0.0.1 github.com/filecoin-project/go-cbor-util v0.0.1
github.com/filecoin-project/go-commp-utils v0.1.3 github.com/filecoin-project/go-commp-utils v0.1.3
github.com/filecoin-project/go-crypto v0.0.1 github.com/filecoin-project/go-crypto v0.0.1
github.com/filecoin-project/go-data-transfer v1.12.0 github.com/filecoin-project/go-data-transfer v1.12.1
github.com/filecoin-project/go-fil-commcid v0.1.0 github.com/filecoin-project/go-fil-commcid v0.1.0
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
github.com/filecoin-project/go-fil-markets v1.14.1 github.com/filecoin-project/go-fil-markets v1.14.1
@ -78,7 +78,7 @@ require (
github.com/ipfs/go-ds-leveldb v0.5.0 github.com/ipfs/go-ds-leveldb v0.5.0
github.com/ipfs/go-ds-measure v0.2.0 github.com/ipfs/go-ds-measure v0.2.0
github.com/ipfs/go-fs-lock v0.0.6 github.com/ipfs/go-fs-lock v0.0.6
github.com/ipfs/go-graphsync v0.11.0 github.com/ipfs/go-graphsync v0.11.5
github.com/ipfs/go-ipfs-blockstore v1.1.2 github.com/ipfs/go-ipfs-blockstore v1.1.2
github.com/ipfs/go-ipfs-blocksutil v0.0.1 github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5 github.com/ipfs/go-ipfs-chunker v0.0.5
@ -150,7 +150,7 @@ require (
github.com/whyrusleeping/pubsub v0.0.0-20190708150250-92bcb0691325 github.com/whyrusleeping/pubsub v0.0.0-20190708150250-92bcb0691325
github.com/xorcare/golden v0.6.1-0.20191112154924-b87f686d7542 github.com/xorcare/golden v0.6.1-0.20191112154924-b87f686d7542
go.opencensus.io v0.23.0 go.opencensus.io v0.23.0
go.opentelemetry.io/otel v1.2.0 go.opentelemetry.io/otel v1.3.0
go.opentelemetry.io/otel/bridge/opencensus v0.25.0 go.opentelemetry.io/otel/bridge/opencensus v0.25.0
go.opentelemetry.io/otel/exporters/jaeger v1.2.0 go.opentelemetry.io/otel/exporters/jaeger v1.2.0
go.opentelemetry.io/otel/sdk v1.2.0 go.opentelemetry.io/otel/sdk v1.2.0

20
go.sum
View File

@ -312,8 +312,9 @@ github.com/filecoin-project/go-commp-utils v0.1.3/go.mod h1:3ENlD1pZySaUout0p9AN
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-crypto v0.0.1 h1:AcvpSGGCgjaY8y1az6AMfKQWreF/pWO2JJGLl6gCq6o= github.com/filecoin-project/go-crypto v0.0.1 h1:AcvpSGGCgjaY8y1az6AMfKQWreF/pWO2JJGLl6gCq6o=
github.com/filecoin-project/go-crypto v0.0.1/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= github.com/filecoin-project/go-crypto v0.0.1/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-data-transfer v1.12.0 h1:y44x35JvB93kezahMURKizIa/aizGTPSHqi5cbAfTEo=
github.com/filecoin-project/go-data-transfer v1.12.0/go.mod h1:tDrD2jLU2TpVhd+5B8iqBp0fQRV4lP80WZccKXugjYc= github.com/filecoin-project/go-data-transfer v1.12.0/go.mod h1:tDrD2jLU2TpVhd+5B8iqBp0fQRV4lP80WZccKXugjYc=
github.com/filecoin-project/go-data-transfer v1.12.1 h1:gAznAZKySVs2FS6T/vDq7R3f0DewLnxeROe0oOE6bZU=
github.com/filecoin-project/go-data-transfer v1.12.1/go.mod h1:j3HL645YiQFxcM+q7uPlGApILSqeweDABNgZQP7pDYU=
github.com/filecoin-project/go-ds-versioning v0.0.0-20211206185234-508abd7c2aff h1:2bG2ggVZ/rInd/YqUfRj4A5siGuYOPxxuD4I8nYLJF0= github.com/filecoin-project/go-ds-versioning v0.0.0-20211206185234-508abd7c2aff h1:2bG2ggVZ/rInd/YqUfRj4A5siGuYOPxxuD4I8nYLJF0=
github.com/filecoin-project/go-ds-versioning v0.0.0-20211206185234-508abd7c2aff/go.mod h1:C9/l9PnB1+mwPa26BBVpCjG/XQCB0yj/q5CK2J8X1I4= github.com/filecoin-project/go-ds-versioning v0.0.0-20211206185234-508abd7c2aff/go.mod h1:C9/l9PnB1+mwPa26BBVpCjG/XQCB0yj/q5CK2J8X1I4=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
@ -423,6 +424,11 @@ github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG
github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA= github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.1 h1:DX7uPQ4WgAWfoh+NGGlbJQswnYIVvz0SRlLS3rPZQDA=
github.com/go-logr/logr v1.2.1/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.0 h1:j4LrlVXgrbIWO83mmQUnK0Hi+YnbD+vzrE1z/EphbFE=
github.com/go-logr/stdr v1.2.0/go.mod h1:YkVgnZu1ZjjL7xTxrfm/LLZBfkhTqSR1ydtm6jTKKwI=
github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY= github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY=
github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-openapi/jsonpointer v0.19.2/go.mod h1:3akKfEdA7DF1sugOqz1dVQHBcuDBPKZGEoHC/NkiQRg= github.com/go-openapi/jsonpointer v0.19.2/go.mod h1:3akKfEdA7DF1sugOqz1dVQHBcuDBPKZGEoHC/NkiQRg=
@ -713,8 +719,9 @@ github.com/ipfs/go-filestore v1.1.0 h1:Pu4tLBi1bucu6/HU9llaOmb9yLFk/sgP+pW764zND
github.com/ipfs/go-filestore v1.1.0/go.mod h1:6e1/5Y6NvLuCRdmda/KA4GUhXJQ3Uat6vcWm2DJfxc8= github.com/ipfs/go-filestore v1.1.0/go.mod h1:6e1/5Y6NvLuCRdmda/KA4GUhXJQ3Uat6vcWm2DJfxc8=
github.com/ipfs/go-fs-lock v0.0.6 h1:sn3TWwNVQqSeNjlWy6zQ1uUGAZrV3hPOyEA6y1/N2a0= github.com/ipfs/go-fs-lock v0.0.6 h1:sn3TWwNVQqSeNjlWy6zQ1uUGAZrV3hPOyEA6y1/N2a0=
github.com/ipfs/go-fs-lock v0.0.6/go.mod h1:OTR+Rj9sHiRubJh3dRhD15Juhd/+w6VPOY28L7zESmM= github.com/ipfs/go-fs-lock v0.0.6/go.mod h1:OTR+Rj9sHiRubJh3dRhD15Juhd/+w6VPOY28L7zESmM=
github.com/ipfs/go-graphsync v0.11.0 h1:PiiD5CnoC3xEHMW8d6uBGqGcoTwiMB5d9CORIEyF6iA=
github.com/ipfs/go-graphsync v0.11.0/go.mod h1:wC+c8vGVjAHthsVIl8LKr37cUra2GOaMYcQNNmMxDqE= github.com/ipfs/go-graphsync v0.11.0/go.mod h1:wC+c8vGVjAHthsVIl8LKr37cUra2GOaMYcQNNmMxDqE=
github.com/ipfs/go-graphsync v0.11.5 h1:WA5hVxGBtcal6L6nqubKiqRolaZxbexOK3GumGFJRR4=
github.com/ipfs/go-graphsync v0.11.5/go.mod h1:+/sZqRwRCQRrV7NCzgBtufmr5QGpUE98XSa7NlsztmM=
github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk= github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08= github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw= github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
@ -815,8 +822,9 @@ github.com/ipfs/go-path v0.0.7 h1:H06hKMquQ0aYtHiHryOMLpQC1qC3QwXwkahcEVD51Ho=
github.com/ipfs/go-path v0.0.7/go.mod h1:6KTKmeRnBXgqrTvzFrPV3CamxcgvXX/4z79tfAd2Sno= github.com/ipfs/go-path v0.0.7/go.mod h1:6KTKmeRnBXgqrTvzFrPV3CamxcgvXX/4z79tfAd2Sno=
github.com/ipfs/go-peertaskqueue v0.0.4/go.mod h1:03H8fhyeMfKNFWqzYEVyMbcPUeYrqP1MX6Kd+aN+rMQ= github.com/ipfs/go-peertaskqueue v0.0.4/go.mod h1:03H8fhyeMfKNFWqzYEVyMbcPUeYrqP1MX6Kd+aN+rMQ=
github.com/ipfs/go-peertaskqueue v0.1.0/go.mod h1:Jmk3IyCcfl1W3jTW3YpghSwSEC6IJ3Vzz/jUmWw8Z0U= github.com/ipfs/go-peertaskqueue v0.1.0/go.mod h1:Jmk3IyCcfl1W3jTW3YpghSwSEC6IJ3Vzz/jUmWw8Z0U=
github.com/ipfs/go-peertaskqueue v0.7.0 h1:VyO6G4sbzX80K58N60cCaHsSsypbUNs1GjO5seGNsQ0=
github.com/ipfs/go-peertaskqueue v0.7.0/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU= github.com/ipfs/go-peertaskqueue v0.7.0/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU=
github.com/ipfs/go-peertaskqueue v0.7.1 h1:7PLjon3RZwRQMgOTvYccZ+mjzkmds/7YzSWKFlBAypE=
github.com/ipfs/go-peertaskqueue v0.7.1/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU=
github.com/ipfs/go-todocounter v0.0.1/go.mod h1:l5aErvQc8qKE2r7NDMjmq5UNAvuZy0rC8BHOplkWvZ4= github.com/ipfs/go-todocounter v0.0.1/go.mod h1:l5aErvQc8qKE2r7NDMjmq5UNAvuZy0rC8BHOplkWvZ4=
github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb/go.mod h1:IwAAgul1UQIcNZzKPYZWOCijryFBeCV79cNubPzol+k= github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb/go.mod h1:IwAAgul1UQIcNZzKPYZWOCijryFBeCV79cNubPzol+k=
github.com/ipfs/go-unixfs v0.2.4/go.mod h1:SUdisfUjNoSDzzhGVxvCL9QO/nKdwXdr+gbMUdqcbYw= github.com/ipfs/go-unixfs v0.2.4/go.mod h1:SUdisfUjNoSDzzhGVxvCL9QO/nKdwXdr+gbMUdqcbYw=
@ -1866,8 +1874,9 @@ go.opencensus.io v0.22.6-0.20201102222123-380f4078db9f/go.mod h1:5pWMHQbX5EPX2/6
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo= go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo=
go.opentelemetry.io/otel v1.2.0 h1:YOQDvxO1FayUcT9MIhJhgMyNO1WqoduiyvQHzGN0kUQ=
go.opentelemetry.io/otel v1.2.0/go.mod h1:aT17Fk0Z1Nor9e0uisf98LrntPGMnk4frBO9+dkf69I= go.opentelemetry.io/otel v1.2.0/go.mod h1:aT17Fk0Z1Nor9e0uisf98LrntPGMnk4frBO9+dkf69I=
go.opentelemetry.io/otel v1.3.0 h1:APxLf0eiBwLl+SOXiJJCVYzA1OOJNyAoV8C5RNRyy7Y=
go.opentelemetry.io/otel v1.3.0/go.mod h1:PWIKzi6JCp7sM0k9yZ43VX+T345uNbAkDKwHVjb2PTs=
go.opentelemetry.io/otel/bridge/opencensus v0.25.0 h1:18Ww8TpCEGes12HZJzB2nEbUglvMLzPxqgZypsrKiNc= go.opentelemetry.io/otel/bridge/opencensus v0.25.0 h1:18Ww8TpCEGes12HZJzB2nEbUglvMLzPxqgZypsrKiNc=
go.opentelemetry.io/otel/bridge/opencensus v0.25.0/go.mod h1:dkZDdaNwLlIutxK2Kc2m3jwW2M1ISaNf8/rOYVwuVHs= go.opentelemetry.io/otel/bridge/opencensus v0.25.0/go.mod h1:dkZDdaNwLlIutxK2Kc2m3jwW2M1ISaNf8/rOYVwuVHs=
go.opentelemetry.io/otel/exporters/jaeger v1.2.0 h1:C/5Egj3MJBXRJi22cSl07suqPqtZLnLFmH//OxETUEc= go.opentelemetry.io/otel/exporters/jaeger v1.2.0 h1:C/5Egj3MJBXRJi22cSl07suqPqtZLnLFmH//OxETUEc=
@ -1886,8 +1895,9 @@ go.opentelemetry.io/otel/sdk/export/metric v0.25.0/go.mod h1:Ej7NOa+WpN49EIcr1HM
go.opentelemetry.io/otel/sdk/metric v0.25.0 h1:J+Ta+4IAA5W9AdWhGQLfciEpavBqqSkBzTDeYvJLFNU= go.opentelemetry.io/otel/sdk/metric v0.25.0 h1:J+Ta+4IAA5W9AdWhGQLfciEpavBqqSkBzTDeYvJLFNU=
go.opentelemetry.io/otel/sdk/metric v0.25.0/go.mod h1:G4xzj4LvC6xDDSsVXpvRVclQCbofGGg4ZU2VKKtDRfg= go.opentelemetry.io/otel/sdk/metric v0.25.0/go.mod h1:G4xzj4LvC6xDDSsVXpvRVclQCbofGGg4ZU2VKKtDRfg=
go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16gUEi0Nf1iBdgw= go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16gUEi0Nf1iBdgw=
go.opentelemetry.io/otel/trace v1.2.0 h1:Ys3iqbqZhcf28hHzrm5WAquMkDHNZTUkw7KHbuNjej0=
go.opentelemetry.io/otel/trace v1.2.0/go.mod h1:N5FLswTubnxKxOJHM7XZC074qpeEdLy3CgAVsdMucK0= go.opentelemetry.io/otel/trace v1.2.0/go.mod h1:N5FLswTubnxKxOJHM7XZC074qpeEdLy3CgAVsdMucK0=
go.opentelemetry.io/otel/trace v1.3.0 h1:doy8Hzb1RJ+I3yFhtDmwNc7tIyw1tNMOIsyPzp1NOGY=
go.opentelemetry.io/otel/trace v1.3.0/go.mod h1:c/VDhno8888bvQYmbYLqe41/Ldmr/KKunbvWM4/fEjk=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=

View File

@ -164,7 +164,9 @@ func ConfigStorageMiner(c interface{}) Option {
Override(HandleRetrievalKey, modules.HandleRetrieval), Override(HandleRetrievalKey, modules.HandleRetrieval),
// Markets (storage) // Markets (storage)
Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer), Override(new(dtypes.ProviderTransferNetwork), modules.NewProviderTransferNetwork),
Override(new(dtypes.ProviderTransport), modules.NewProviderTransport),
Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDataTransfer),
Override(new(*storedask.StoredAsk), modules.NewStorageAsk), Override(new(*storedask.StoredAsk), modules.NewStorageAsk),
Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(cfg.Dealmaking, nil)), Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(cfg.Dealmaking, nil)),
Override(new(storagemarket.StorageProvider), modules.StorageProvider), Override(new(storagemarket.StorageProvider), modules.StorageProvider),

View File

@ -3,6 +3,7 @@ package impl
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"net/http" "net/http"
"os" "os"
@ -19,6 +20,9 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync"
gsimpl "github.com/ipfs/go-graphsync/impl"
"github.com/ipfs/go-graphsync/peerstate"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"go.uber.org/fx" "go.uber.org/fx"
@ -28,6 +32,7 @@ import (
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
datatransfer "github.com/filecoin-project/go-data-transfer" datatransfer "github.com/filecoin-project/go-data-transfer"
gst "github.com/filecoin-project/go-data-transfer/transport/graphsync"
"github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket"
@ -70,6 +75,8 @@ type StorageMinerAPI struct {
RetrievalProvider retrievalmarket.RetrievalProvider `optional:"true"` RetrievalProvider retrievalmarket.RetrievalProvider `optional:"true"`
SectorAccessor retrievalmarket.SectorAccessor `optional:"true"` SectorAccessor retrievalmarket.SectorAccessor `optional:"true"`
DataTransfer dtypes.ProviderDataTransfer `optional:"true"` DataTransfer dtypes.ProviderDataTransfer `optional:"true"`
StagingGraphsync dtypes.StagingGraphsync `optional:"true"`
Transport dtypes.ProviderTransport `optional:"true"`
DealPublisher *storageadapter.DealPublisher `optional:"true"` DealPublisher *storageadapter.DealPublisher `optional:"true"`
SectorBlocks *sectorblocks.SectorBlocks `optional:"true"` SectorBlocks *sectorblocks.SectorBlocks `optional:"true"`
Host host.Host `optional:"true"` Host host.Host `optional:"true"`
@ -553,6 +560,166 @@ func (sm *StorageMinerAPI) MarketDataTransferUpdates(ctx context.Context) (<-cha
return channels, nil return channels, nil
} }
func (sm *StorageMinerAPI) MarketDataTransferDiagnostics(ctx context.Context, mpid peer.ID) (*api.TransferDiagnostics, error) {
gsTransport, ok := sm.Transport.(*gst.Transport)
if !ok {
return nil, errors.New("api only works for graphsync as transport")
}
graphsyncConcrete, ok := sm.StagingGraphsync.(*gsimpl.GraphSync)
if !ok {
return nil, errors.New("api only works for non-mock graphsync implementation")
}
inProgressChannels, err := sm.DataTransfer.InProgressChannels(ctx)
if err != nil {
return nil, err
}
allReceivingChannels := make(map[datatransfer.ChannelID]datatransfer.ChannelState)
allSendingChannels := make(map[datatransfer.ChannelID]datatransfer.ChannelState)
for channelID, channel := range inProgressChannels {
if channel.OtherPeer() != mpid {
continue
}
if channel.Status() == datatransfer.Completed {
continue
}
if channel.Status() == datatransfer.Failed || channel.Status() == datatransfer.Cancelled {
continue
}
if channel.SelfPeer() == channel.Sender() {
allSendingChannels[channelID] = channel
} else {
allReceivingChannels[channelID] = channel
}
}
// gather information about active transport channels
transportChannels := gsTransport.ChannelsForPeer(mpid)
// gather information about graphsync state for peer
gsPeerState := graphsyncConcrete.PeerState(mpid)
sendingTransfers := sm.generateTransfers(ctx, transportChannels.SendingChannels, gsPeerState.IncomingState, allSendingChannels)
receivingTransfers := sm.generateTransfers(ctx, transportChannels.ReceivingChannels, gsPeerState.OutgoingState, allReceivingChannels)
return &api.TransferDiagnostics{
SendingTransfers: sendingTransfers,
ReceivingTransfers: receivingTransfers,
}, nil
}
// generate transfers matches graphsync state and data transfer state for a given peer
// to produce detailed output on what's happening with a transfer
func (sm *StorageMinerAPI) generateTransfers(ctx context.Context,
transportChannels map[datatransfer.ChannelID]gst.ChannelGraphsyncRequests,
gsPeerState peerstate.PeerState,
allChannels map[datatransfer.ChannelID]datatransfer.ChannelState) []*api.GraphSyncDataTransfer {
tc := &transferConverter{
matchedChannelIds: make(map[datatransfer.ChannelID]struct{}),
matchedRequests: make(map[graphsync.RequestID]*api.GraphSyncDataTransfer),
gsDiagnostics: gsPeerState.Diagnostics(),
requestStates: gsPeerState.RequestStates,
allChannels: allChannels,
}
// iterate through all operating data transfer transport channels
for channelID, channelRequests := range transportChannels {
originalState, err := sm.DataTransfer.ChannelState(ctx, channelID)
var baseDiagnostics []string
var channelState *api.DataTransferChannel
if err != nil {
baseDiagnostics = append(baseDiagnostics, fmt.Sprintf("Unable to lookup channel state: %s", err))
} else {
cs := api.NewDataTransferChannel(sm.Host.ID(), originalState)
channelState = &cs
}
// add the current request for this channel
tc.convertTransfer(channelID, true, channelState, baseDiagnostics, channelRequests.Current, true)
for _, requestID := range channelRequests.Previous {
// add any previous requests that were cancelled for a restart
tc.convertTransfer(channelID, true, channelState, baseDiagnostics, requestID, false)
}
}
// collect any graphsync data for channels we don't have any data transfer data for
tc.collectRemainingTransfers()
return tc.transfers
}
type transferConverter struct {
matchedChannelIds map[datatransfer.ChannelID]struct{}
matchedRequests map[graphsync.RequestID]*api.GraphSyncDataTransfer
transfers []*api.GraphSyncDataTransfer
gsDiagnostics map[graphsync.RequestID][]string
requestStates graphsync.RequestStates
allChannels map[datatransfer.ChannelID]datatransfer.ChannelState
}
// convert transfer assembles transfer and diagnostic data for a given graphsync/data-transfer request
func (tc *transferConverter) convertTransfer(channelID datatransfer.ChannelID, hasChannelID bool, channelState *api.DataTransferChannel, baseDiagnostics []string,
requestID graphsync.RequestID, isCurrentChannelRequest bool) {
diagnostics := baseDiagnostics
state, hasState := tc.requestStates[requestID]
stateString := state.String()
if !hasState {
stateString = "no graphsync state found"
}
var channelIDPtr *datatransfer.ChannelID
if !hasChannelID {
diagnostics = append(diagnostics, fmt.Sprintf("No data transfer channel id for GraphSync request ID %d", requestID))
} else {
channelIDPtr = &channelID
if isCurrentChannelRequest && !hasState {
diagnostics = append(diagnostics, fmt.Sprintf("No current request state for data transfer channel id %s", channelID))
} else if !isCurrentChannelRequest && hasState {
diagnostics = append(diagnostics, fmt.Sprintf("Graphsync request %d is a previous request on data transfer channel id %s that was restarted, but it is still running", requestID, channelID))
}
}
diagnostics = append(diagnostics, tc.gsDiagnostics[requestID]...)
transfer := &api.GraphSyncDataTransfer{
RequestID: requestID,
RequestState: stateString,
IsCurrentChannelRequest: isCurrentChannelRequest,
ChannelID: channelIDPtr,
ChannelState: channelState,
Diagnostics: diagnostics,
}
tc.transfers = append(tc.transfers, transfer)
tc.matchedRequests[requestID] = transfer
if hasChannelID {
tc.matchedChannelIds[channelID] = struct{}{}
}
}
func (tc *transferConverter) collectRemainingTransfers() {
for requestID := range tc.requestStates {
if _, ok := tc.matchedRequests[requestID]; !ok {
tc.convertTransfer(datatransfer.ChannelID{}, false, nil, nil, requestID, false)
}
}
for requestID := range tc.gsDiagnostics {
if _, ok := tc.matchedRequests[requestID]; !ok {
tc.convertTransfer(datatransfer.ChannelID{}, false, nil, nil, requestID, false)
}
}
for channelID, channelState := range tc.allChannels {
if _, ok := tc.matchedChannelIds[channelID]; !ok {
channelID := channelID
cs := api.NewDataTransferChannel(channelState.SelfPeer(), channelState)
transfer := &api.GraphSyncDataTransfer{
RequestID: graphsync.RequestID(-1),
RequestState: "graphsync state unknown",
IsCurrentChannelRequest: false,
ChannelID: &channelID,
ChannelState: &cs,
Diagnostics: []string{"data transfer with no open transport channel, cannot determine linked graphsync request"},
}
tc.transfers = append(tc.transfers, transfer)
}
}
}
func (sm *StorageMinerAPI) MarketPendingDeals(ctx context.Context) (api.PendingDealInfo, error) { func (sm *StorageMinerAPI) MarketPendingDeals(ctx context.Context) (api.PendingDealInfo, error) {
return sm.DealPublisher.PendingDeals(), nil return sm.DealPublisher.PendingDeals(), nil
} }

View File

@ -8,6 +8,7 @@ import (
"github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync"
exchange "github.com/ipfs/go-ipfs-exchange-interface" exchange "github.com/ipfs/go-ipfs-exchange-interface"
dtnet "github.com/filecoin-project/go-data-transfer/network"
"github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation"
@ -85,6 +86,7 @@ type ProviderRequestValidator *requestvalidation.UnifiedRequestValidator
// ProviderDataTransfer is a data transfer manager for the provider // ProviderDataTransfer is a data transfer manager for the provider
type ProviderDataTransfer datatransfer.Manager type ProviderDataTransfer datatransfer.Manager
type ProviderTransferNetwork dtnet.DataTransferNetwork
type ProviderTransport datatransfer.Transport
type StagingBlockstore blockstore.BasicBlockstore type StagingBlockstore blockstore.BasicBlockstore
type StagingGraphsync graphsync.GraphExchange type StagingGraphsync graphsync.GraphExchange

View File

@ -335,13 +335,19 @@ func HandleMigrateProviderFunds(lc fx.Lifecycle, ds dtypes.MetadataDS, node api.
}) })
} }
// NewProviderDAGServiceDataTransfer returns a data transfer manager that just // NewProviderTransferNetwork sets up the libp2p2 protocol networking for data transfer
// uses the provider's Staging DAG service for transfers func NewProviderTransferNetwork(h host.Host) dtypes.ProviderTransferNetwork {
func NewProviderDAGServiceDataTransfer(lc fx.Lifecycle, h host.Host, gs dtypes.StagingGraphsync, ds dtypes.MetadataDS, r repo.LockedRepo) (dtypes.ProviderDataTransfer, error) { return dtnet.NewFromLibp2pHost(h)
net := dtnet.NewFromLibp2pHost(h) }
// NewProviderTransport sets up a data transfer transport over graphsync
func NewProviderTransport(h host.Host, gs dtypes.StagingGraphsync, net dtypes.ProviderTransferNetwork) dtypes.ProviderTransport {
return dtgstransport.NewTransport(h.ID(), gs, net)
}
// NewProviderDataTransfer returns a data transfer manager
func NewProviderDataTransfer(lc fx.Lifecycle, net dtypes.ProviderTransferNetwork, transport dtypes.ProviderTransport, ds dtypes.MetadataDS, r repo.LockedRepo) (dtypes.ProviderDataTransfer, error) {
dtDs := namespace.Wrap(ds, datastore.NewKey("/datatransfer/provider/transfers")) dtDs := namespace.Wrap(ds, datastore.NewKey("/datatransfer/provider/transfers"))
transport := dtgstransport.NewTransport(h.ID(), gs, net)
err := os.MkdirAll(filepath.Join(r.Path(), "data-transfer"), 0755) //nolint: gosec err := os.MkdirAll(filepath.Join(r.Path(), "data-transfer"), 0755) //nolint: gosec
if err != nil && !os.IsExist(err) { if err != nil && !os.IsExist(err) {
return nil, err return nil, err