feat(storageminer): add api for transfer diagnostics
Add API + CLI for inspecting in depth diagnostics on graphsync transfers with a given peer
This commit is contained in:
parent
1bc987772c
commit
cddf63efe9
@ -165,6 +165,8 @@ type StorageMiner interface {
|
||||
MarketGetRetrievalAsk(ctx context.Context) (*retrievalmarket.Ask, error) //perm:read
|
||||
MarketListDataTransfers(ctx context.Context) ([]DataTransferChannel, error) //perm:write
|
||||
MarketDataTransferUpdates(ctx context.Context) (<-chan DataTransferChannel, error) //perm:write
|
||||
// MarketDataTransferDiagnostics generates debugging information about current data transfers over graphsync
|
||||
MarketDataTransferDiagnostics(ctx context.Context, p peer.ID) (*TransferDiagnostics, error) //perm:write
|
||||
// MarketRestartDataTransfer attempts to restart a data transfer with the given transfer ID and other peer
|
||||
MarketRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error //perm:write
|
||||
// MarketCancelDataTransfer cancels a data transfer with the given transfer ID and other peer
|
||||
|
@ -15,6 +15,7 @@ import (
|
||||
"github.com/filecoin-project/go-bitfield"
|
||||
"github.com/google/uuid"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-graphsync"
|
||||
"github.com/libp2p/go-libp2p-core/metrics"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
@ -120,6 +121,7 @@ func init() {
|
||||
addExample(api.FullAPIVersion1)
|
||||
addExample(api.PCHInbound)
|
||||
addExample(time.Minute)
|
||||
addExample(graphsync.RequestID(4))
|
||||
addExample(datatransfer.TransferID(3))
|
||||
addExample(datatransfer.Ongoing)
|
||||
addExample(storeIDExample)
|
||||
|
@ -668,6 +668,8 @@ type StorageMinerStruct struct {
|
||||
|
||||
MarketCancelDataTransfer func(p0 context.Context, p1 datatransfer.TransferID, p2 peer.ID, p3 bool) error `perm:"write"`
|
||||
|
||||
MarketDataTransferDiagnostics func(p0 context.Context, p1 peer.ID) (*TransferDiagnostics, error) `perm:"write"`
|
||||
|
||||
MarketDataTransferUpdates func(p0 context.Context) (<-chan DataTransferChannel, error) `perm:"write"`
|
||||
|
||||
MarketGetAsk func(p0 context.Context) (*storagemarket.SignedStorageAsk, error) `perm:"read"`
|
||||
@ -3972,6 +3974,17 @@ func (s *StorageMinerStub) MarketCancelDataTransfer(p0 context.Context, p1 datat
|
||||
return ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *StorageMinerStruct) MarketDataTransferDiagnostics(p0 context.Context, p1 peer.ID) (*TransferDiagnostics, error) {
|
||||
if s.Internal.MarketDataTransferDiagnostics == nil {
|
||||
return nil, ErrNotSupported
|
||||
}
|
||||
return s.Internal.MarketDataTransferDiagnostics(p0, p1)
|
||||
}
|
||||
|
||||
func (s *StorageMinerStub) MarketDataTransferDiagnostics(p0 context.Context, p1 peer.ID) (*TransferDiagnostics, error) {
|
||||
return nil, ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *StorageMinerStruct) MarketDataTransferUpdates(p0 context.Context) (<-chan DataTransferChannel, error) {
|
||||
if s.Internal.MarketDataTransferUpdates == nil {
|
||||
return nil, ErrNotSupported
|
||||
|
25
api/types.go
25
api/types.go
@ -10,6 +10,7 @@ import (
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-graphsync"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
@ -53,6 +54,30 @@ type MessageSendSpec struct {
|
||||
MaxFee abi.TokenAmount
|
||||
}
|
||||
|
||||
// GraphSyncDataTransfer provides diagnostics on a data transfer happening over graphsync
|
||||
type GraphSyncDataTransfer struct {
|
||||
// GraphSync request id for this transfer
|
||||
RequestID graphsync.RequestID
|
||||
// Graphsync state for this transfer
|
||||
RequestState string
|
||||
// If a channel ID is present, indicates whether this is the current graphsync request for this channel
|
||||
// (could have changed in a restart)
|
||||
IsCurrentChannelRequest bool
|
||||
// Data transfer channel ID for this transfer
|
||||
ChannelID *datatransfer.ChannelID
|
||||
// Data transfer state for this transfer
|
||||
ChannelState *DataTransferChannel
|
||||
// Diagnostic information about this request -- and unexpected inconsistencies in
|
||||
// request state
|
||||
Diagnostics []string
|
||||
}
|
||||
|
||||
// TransferDiagnostics give current information about transfers going over graphsync that may be helpful for debugging
|
||||
type TransferDiagnostics struct {
|
||||
ReceivingTransfers []*GraphSyncDataTransfer
|
||||
SendingTransfers []*GraphSyncDataTransfer
|
||||
}
|
||||
|
||||
type DataTransferChannel struct {
|
||||
TransferID datatransfer.TransferID
|
||||
Status datatransfer.Status
|
||||
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -638,6 +638,7 @@ var dataTransfersCmd = &cli.Command{
|
||||
transfersListCmd,
|
||||
marketRestartTransfer,
|
||||
marketCancelTransfer,
|
||||
transfersDiagnosticsCmd,
|
||||
},
|
||||
}
|
||||
|
||||
@ -857,6 +858,35 @@ var transfersListCmd = &cli.Command{
|
||||
},
|
||||
}
|
||||
|
||||
var transfersDiagnosticsCmd = &cli.Command{
|
||||
Name: "diagnostics",
|
||||
Usage: "Get detailed diagnostics on active transfers with a specific peer",
|
||||
Flags: []cli.Flag{},
|
||||
Action: func(cctx *cli.Context) error {
|
||||
if !cctx.Args().Present() {
|
||||
return cli.ShowCommandHelp(cctx, cctx.Command.Name)
|
||||
}
|
||||
api, closer, err := lcli.GetMarketsAPI(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer closer()
|
||||
ctx := lcli.ReqContext(cctx)
|
||||
|
||||
targetPeer := peer.ID(cctx.Args().First())
|
||||
diagnostics, err := api.MarketDataTransferDiagnostics(ctx, targetPeer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
out, err := json.MarshalIndent(diagnostics, "", "\t")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Println(string(out))
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
var dealsPendingPublish = &cli.Command{
|
||||
Name: "pending-publish",
|
||||
Usage: "list deals waiting in publish queue",
|
||||
|
@ -49,6 +49,7 @@
|
||||
* [LogSetLevel](#LogSetLevel)
|
||||
* [Market](#Market)
|
||||
* [MarketCancelDataTransfer](#MarketCancelDataTransfer)
|
||||
* [MarketDataTransferDiagnostics](#MarketDataTransferDiagnostics)
|
||||
* [MarketDataTransferUpdates](#MarketDataTransferUpdates)
|
||||
* [MarketGetAsk](#MarketGetAsk)
|
||||
* [MarketGetDealUpdates](#MarketGetDealUpdates)
|
||||
@ -724,6 +725,27 @@ Inputs:
|
||||
|
||||
Response: `{}`
|
||||
|
||||
### MarketDataTransferDiagnostics
|
||||
MarketDataTransferDiagnostics generates debugging information about current data transfers over graphsync
|
||||
|
||||
|
||||
Perms: write
|
||||
|
||||
Inputs:
|
||||
```json
|
||||
[
|
||||
"12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf"
|
||||
]
|
||||
```
|
||||
|
||||
Response:
|
||||
```json
|
||||
{
|
||||
"ReceivingTransfers": null,
|
||||
"SendingTransfers": null
|
||||
}
|
||||
```
|
||||
|
||||
### MarketDataTransferUpdates
|
||||
|
||||
|
||||
|
@ -977,6 +977,7 @@ COMMANDS:
|
||||
list List ongoing data transfers for this miner
|
||||
restart Force restart a stalled data transfer
|
||||
cancel Force cancel a data transfer
|
||||
diagnostics Get detailed diagnostics on active transfers with a specific peer
|
||||
help, h Shows a list of commands or help for one command
|
||||
|
||||
OPTIONS:
|
||||
@ -1034,6 +1035,19 @@ OPTIONS:
|
||||
|
||||
```
|
||||
|
||||
### lotus-miner data-transfers diagnostics
|
||||
```
|
||||
NAME:
|
||||
lotus-miner data-transfers diagnostics - Get detailed diagnostics on active transfers with a specific peer
|
||||
|
||||
USAGE:
|
||||
lotus-miner data-transfers diagnostics [command options] [arguments...]
|
||||
|
||||
OPTIONS:
|
||||
--help, -h show help (default: false)
|
||||
|
||||
```
|
||||
|
||||
## lotus-miner dagstore
|
||||
```
|
||||
NAME:
|
||||
|
6
go.mod
6
go.mod
@ -33,7 +33,7 @@ require (
|
||||
github.com/filecoin-project/go-cbor-util v0.0.1
|
||||
github.com/filecoin-project/go-commp-utils v0.1.3
|
||||
github.com/filecoin-project/go-crypto v0.0.1
|
||||
github.com/filecoin-project/go-data-transfer v1.12.0
|
||||
github.com/filecoin-project/go-data-transfer v1.12.1
|
||||
github.com/filecoin-project/go-fil-commcid v0.1.0
|
||||
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
|
||||
github.com/filecoin-project/go-fil-markets v1.14.1
|
||||
@ -78,7 +78,7 @@ require (
|
||||
github.com/ipfs/go-ds-leveldb v0.5.0
|
||||
github.com/ipfs/go-ds-measure v0.2.0
|
||||
github.com/ipfs/go-fs-lock v0.0.6
|
||||
github.com/ipfs/go-graphsync v0.11.0
|
||||
github.com/ipfs/go-graphsync v0.11.5
|
||||
github.com/ipfs/go-ipfs-blockstore v1.1.2
|
||||
github.com/ipfs/go-ipfs-blocksutil v0.0.1
|
||||
github.com/ipfs/go-ipfs-chunker v0.0.5
|
||||
@ -150,7 +150,7 @@ require (
|
||||
github.com/whyrusleeping/pubsub v0.0.0-20190708150250-92bcb0691325
|
||||
github.com/xorcare/golden v0.6.1-0.20191112154924-b87f686d7542
|
||||
go.opencensus.io v0.23.0
|
||||
go.opentelemetry.io/otel v1.2.0
|
||||
go.opentelemetry.io/otel v1.3.0
|
||||
go.opentelemetry.io/otel/bridge/opencensus v0.25.0
|
||||
go.opentelemetry.io/otel/exporters/jaeger v1.2.0
|
||||
go.opentelemetry.io/otel/sdk v1.2.0
|
||||
|
20
go.sum
20
go.sum
@ -312,8 +312,9 @@ github.com/filecoin-project/go-commp-utils v0.1.3/go.mod h1:3ENlD1pZySaUout0p9AN
|
||||
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
|
||||
github.com/filecoin-project/go-crypto v0.0.1 h1:AcvpSGGCgjaY8y1az6AMfKQWreF/pWO2JJGLl6gCq6o=
|
||||
github.com/filecoin-project/go-crypto v0.0.1/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
|
||||
github.com/filecoin-project/go-data-transfer v1.12.0 h1:y44x35JvB93kezahMURKizIa/aizGTPSHqi5cbAfTEo=
|
||||
github.com/filecoin-project/go-data-transfer v1.12.0/go.mod h1:tDrD2jLU2TpVhd+5B8iqBp0fQRV4lP80WZccKXugjYc=
|
||||
github.com/filecoin-project/go-data-transfer v1.12.1 h1:gAznAZKySVs2FS6T/vDq7R3f0DewLnxeROe0oOE6bZU=
|
||||
github.com/filecoin-project/go-data-transfer v1.12.1/go.mod h1:j3HL645YiQFxcM+q7uPlGApILSqeweDABNgZQP7pDYU=
|
||||
github.com/filecoin-project/go-ds-versioning v0.0.0-20211206185234-508abd7c2aff h1:2bG2ggVZ/rInd/YqUfRj4A5siGuYOPxxuD4I8nYLJF0=
|
||||
github.com/filecoin-project/go-ds-versioning v0.0.0-20211206185234-508abd7c2aff/go.mod h1:C9/l9PnB1+mwPa26BBVpCjG/XQCB0yj/q5CK2J8X1I4=
|
||||
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
|
||||
@ -423,6 +424,11 @@ github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG
|
||||
github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA=
|
||||
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
|
||||
github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
|
||||
github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
|
||||
github.com/go-logr/logr v1.2.1 h1:DX7uPQ4WgAWfoh+NGGlbJQswnYIVvz0SRlLS3rPZQDA=
|
||||
github.com/go-logr/logr v1.2.1/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
|
||||
github.com/go-logr/stdr v1.2.0 h1:j4LrlVXgrbIWO83mmQUnK0Hi+YnbD+vzrE1z/EphbFE=
|
||||
github.com/go-logr/stdr v1.2.0/go.mod h1:YkVgnZu1ZjjL7xTxrfm/LLZBfkhTqSR1ydtm6jTKKwI=
|
||||
github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY=
|
||||
github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
|
||||
github.com/go-openapi/jsonpointer v0.19.2/go.mod h1:3akKfEdA7DF1sugOqz1dVQHBcuDBPKZGEoHC/NkiQRg=
|
||||
@ -713,8 +719,9 @@ github.com/ipfs/go-filestore v1.1.0 h1:Pu4tLBi1bucu6/HU9llaOmb9yLFk/sgP+pW764zND
|
||||
github.com/ipfs/go-filestore v1.1.0/go.mod h1:6e1/5Y6NvLuCRdmda/KA4GUhXJQ3Uat6vcWm2DJfxc8=
|
||||
github.com/ipfs/go-fs-lock v0.0.6 h1:sn3TWwNVQqSeNjlWy6zQ1uUGAZrV3hPOyEA6y1/N2a0=
|
||||
github.com/ipfs/go-fs-lock v0.0.6/go.mod h1:OTR+Rj9sHiRubJh3dRhD15Juhd/+w6VPOY28L7zESmM=
|
||||
github.com/ipfs/go-graphsync v0.11.0 h1:PiiD5CnoC3xEHMW8d6uBGqGcoTwiMB5d9CORIEyF6iA=
|
||||
github.com/ipfs/go-graphsync v0.11.0/go.mod h1:wC+c8vGVjAHthsVIl8LKr37cUra2GOaMYcQNNmMxDqE=
|
||||
github.com/ipfs/go-graphsync v0.11.5 h1:WA5hVxGBtcal6L6nqubKiqRolaZxbexOK3GumGFJRR4=
|
||||
github.com/ipfs/go-graphsync v0.11.5/go.mod h1:+/sZqRwRCQRrV7NCzgBtufmr5QGpUE98XSa7NlsztmM=
|
||||
github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk=
|
||||
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
|
||||
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
|
||||
@ -815,8 +822,9 @@ github.com/ipfs/go-path v0.0.7 h1:H06hKMquQ0aYtHiHryOMLpQC1qC3QwXwkahcEVD51Ho=
|
||||
github.com/ipfs/go-path v0.0.7/go.mod h1:6KTKmeRnBXgqrTvzFrPV3CamxcgvXX/4z79tfAd2Sno=
|
||||
github.com/ipfs/go-peertaskqueue v0.0.4/go.mod h1:03H8fhyeMfKNFWqzYEVyMbcPUeYrqP1MX6Kd+aN+rMQ=
|
||||
github.com/ipfs/go-peertaskqueue v0.1.0/go.mod h1:Jmk3IyCcfl1W3jTW3YpghSwSEC6IJ3Vzz/jUmWw8Z0U=
|
||||
github.com/ipfs/go-peertaskqueue v0.7.0 h1:VyO6G4sbzX80K58N60cCaHsSsypbUNs1GjO5seGNsQ0=
|
||||
github.com/ipfs/go-peertaskqueue v0.7.0/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU=
|
||||
github.com/ipfs/go-peertaskqueue v0.7.1 h1:7PLjon3RZwRQMgOTvYccZ+mjzkmds/7YzSWKFlBAypE=
|
||||
github.com/ipfs/go-peertaskqueue v0.7.1/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU=
|
||||
github.com/ipfs/go-todocounter v0.0.1/go.mod h1:l5aErvQc8qKE2r7NDMjmq5UNAvuZy0rC8BHOplkWvZ4=
|
||||
github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb/go.mod h1:IwAAgul1UQIcNZzKPYZWOCijryFBeCV79cNubPzol+k=
|
||||
github.com/ipfs/go-unixfs v0.2.4/go.mod h1:SUdisfUjNoSDzzhGVxvCL9QO/nKdwXdr+gbMUdqcbYw=
|
||||
@ -1866,8 +1874,9 @@ go.opencensus.io v0.22.6-0.20201102222123-380f4078db9f/go.mod h1:5pWMHQbX5EPX2/6
|
||||
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
|
||||
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
|
||||
go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo=
|
||||
go.opentelemetry.io/otel v1.2.0 h1:YOQDvxO1FayUcT9MIhJhgMyNO1WqoduiyvQHzGN0kUQ=
|
||||
go.opentelemetry.io/otel v1.2.0/go.mod h1:aT17Fk0Z1Nor9e0uisf98LrntPGMnk4frBO9+dkf69I=
|
||||
go.opentelemetry.io/otel v1.3.0 h1:APxLf0eiBwLl+SOXiJJCVYzA1OOJNyAoV8C5RNRyy7Y=
|
||||
go.opentelemetry.io/otel v1.3.0/go.mod h1:PWIKzi6JCp7sM0k9yZ43VX+T345uNbAkDKwHVjb2PTs=
|
||||
go.opentelemetry.io/otel/bridge/opencensus v0.25.0 h1:18Ww8TpCEGes12HZJzB2nEbUglvMLzPxqgZypsrKiNc=
|
||||
go.opentelemetry.io/otel/bridge/opencensus v0.25.0/go.mod h1:dkZDdaNwLlIutxK2Kc2m3jwW2M1ISaNf8/rOYVwuVHs=
|
||||
go.opentelemetry.io/otel/exporters/jaeger v1.2.0 h1:C/5Egj3MJBXRJi22cSl07suqPqtZLnLFmH//OxETUEc=
|
||||
@ -1886,8 +1895,9 @@ go.opentelemetry.io/otel/sdk/export/metric v0.25.0/go.mod h1:Ej7NOa+WpN49EIcr1HM
|
||||
go.opentelemetry.io/otel/sdk/metric v0.25.0 h1:J+Ta+4IAA5W9AdWhGQLfciEpavBqqSkBzTDeYvJLFNU=
|
||||
go.opentelemetry.io/otel/sdk/metric v0.25.0/go.mod h1:G4xzj4LvC6xDDSsVXpvRVclQCbofGGg4ZU2VKKtDRfg=
|
||||
go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16gUEi0Nf1iBdgw=
|
||||
go.opentelemetry.io/otel/trace v1.2.0 h1:Ys3iqbqZhcf28hHzrm5WAquMkDHNZTUkw7KHbuNjej0=
|
||||
go.opentelemetry.io/otel/trace v1.2.0/go.mod h1:N5FLswTubnxKxOJHM7XZC074qpeEdLy3CgAVsdMucK0=
|
||||
go.opentelemetry.io/otel/trace v1.3.0 h1:doy8Hzb1RJ+I3yFhtDmwNc7tIyw1tNMOIsyPzp1NOGY=
|
||||
go.opentelemetry.io/otel/trace v1.3.0/go.mod h1:c/VDhno8888bvQYmbYLqe41/Ldmr/KKunbvWM4/fEjk=
|
||||
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
|
||||
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
|
||||
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
|
||||
|
@ -164,7 +164,9 @@ func ConfigStorageMiner(c interface{}) Option {
|
||||
Override(HandleRetrievalKey, modules.HandleRetrieval),
|
||||
|
||||
// Markets (storage)
|
||||
Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer),
|
||||
Override(new(dtypes.ProviderTransferNetwork), modules.NewProviderTransferNetwork),
|
||||
Override(new(dtypes.ProviderTransport), modules.NewProviderTransport),
|
||||
Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDataTransfer),
|
||||
Override(new(*storedask.StoredAsk), modules.NewStorageAsk),
|
||||
Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(cfg.Dealmaking, nil)),
|
||||
Override(new(storagemarket.StorageProvider), modules.StorageProvider),
|
||||
|
@ -19,6 +19,9 @@ import (
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-graphsync"
|
||||
gsimpl "github.com/ipfs/go-graphsync/impl"
|
||||
"github.com/ipfs/go-graphsync/peerstate"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"go.uber.org/fx"
|
||||
@ -28,6 +31,7 @@ import (
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
datatransfer "github.com/filecoin-project/go-data-transfer"
|
||||
gst "github.com/filecoin-project/go-data-transfer/transport/graphsync"
|
||||
"github.com/filecoin-project/go-fil-markets/piecestore"
|
||||
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
||||
@ -70,6 +74,8 @@ type StorageMinerAPI struct {
|
||||
RetrievalProvider retrievalmarket.RetrievalProvider `optional:"true"`
|
||||
SectorAccessor retrievalmarket.SectorAccessor `optional:"true"`
|
||||
DataTransfer dtypes.ProviderDataTransfer `optional:"true"`
|
||||
StagingGraphsync dtypes.StagingGraphsync `optional:"true"`
|
||||
Transport dtypes.ProviderTransport `optional:"true"`
|
||||
DealPublisher *storageadapter.DealPublisher `optional:"true"`
|
||||
SectorBlocks *sectorblocks.SectorBlocks `optional:"true"`
|
||||
Host host.Host `optional:"true"`
|
||||
@ -553,6 +559,107 @@ func (sm *StorageMinerAPI) MarketDataTransferUpdates(ctx context.Context) (<-cha
|
||||
return channels, nil
|
||||
}
|
||||
|
||||
func (sm *StorageMinerAPI) MarketDataTransferDiagnostics(ctx context.Context, mpid peer.ID) (*api.TransferDiagnostics, error) {
|
||||
|
||||
// gather information about active transport channels
|
||||
transportChannels := sm.Transport.(*gst.Transport).ChannelsForPeer(mpid)
|
||||
// gather information about graphsync state for peer
|
||||
gsPeerState := sm.StagingGraphsync.(*gsimpl.GraphSync).PeerState(mpid)
|
||||
|
||||
sendingTransfers := sm.generateTransfers(ctx, transportChannels.SendingChannels, gsPeerState.IncomingState)
|
||||
receivingTransfers := sm.generateTransfers(ctx, transportChannels.ReceivingChannels, gsPeerState.OutgoingState)
|
||||
|
||||
return &api.TransferDiagnostics{
|
||||
SendingTransfers: sendingTransfers,
|
||||
ReceivingTransfers: receivingTransfers,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// generate transfers matches graphsync state and data transfer state for a given peer
|
||||
// to produce detailed output on what's happening with a transfer
|
||||
func (sm *StorageMinerAPI) generateTransfers(ctx context.Context,
|
||||
transportChannels map[datatransfer.ChannelID]gst.ChannelGraphsyncRequests,
|
||||
gsPeerState peerstate.PeerState) []*api.GraphSyncDataTransfer {
|
||||
tc := &transferConverter{
|
||||
matchedRequests: make(map[graphsync.RequestID]*api.GraphSyncDataTransfer),
|
||||
gsDiagnostics: gsPeerState.Diagnostics(),
|
||||
requestStates: gsPeerState.RequestStates,
|
||||
}
|
||||
|
||||
// iterate through all operating data transfer transport channels
|
||||
for channelID, channelRequests := range transportChannels {
|
||||
originalState, err := sm.DataTransfer.ChannelState(ctx, channelID)
|
||||
var baseDiagnostics []string
|
||||
var channelState *api.DataTransferChannel
|
||||
if err != nil {
|
||||
baseDiagnostics = append(baseDiagnostics, fmt.Sprintf("Unable to lookup channel state: %s", err))
|
||||
} else {
|
||||
cs := api.NewDataTransferChannel(sm.Host.ID(), originalState)
|
||||
channelState = &cs
|
||||
}
|
||||
// add the current request for this channel
|
||||
tc.convertTransfer(&channelID, channelState, baseDiagnostics, channelRequests.Current, true)
|
||||
for _, requestID := range channelRequests.Previous {
|
||||
// add any previous requests that were cancelled for a restart
|
||||
tc.convertTransfer(&channelID, channelState, baseDiagnostics, requestID, false)
|
||||
}
|
||||
}
|
||||
|
||||
// collect any graphsync data for channels we don't have any data transfer data for
|
||||
tc.collectRemainingTransfers()
|
||||
|
||||
return tc.transfers
|
||||
}
|
||||
|
||||
type transferConverter struct {
|
||||
matchedRequests map[graphsync.RequestID]*api.GraphSyncDataTransfer
|
||||
transfers []*api.GraphSyncDataTransfer
|
||||
gsDiagnostics map[graphsync.RequestID][]string
|
||||
requestStates graphsync.RequestStates
|
||||
}
|
||||
|
||||
// convert transfer assembles transfer and diagnostic data for a given graphsync/data-transfer request
|
||||
func (tc *transferConverter) convertTransfer(channelID *datatransfer.ChannelID, channelState *api.DataTransferChannel, baseDiagnostics []string,
|
||||
requestID graphsync.RequestID, isCurrentChannelRequest bool) {
|
||||
diagnostics := baseDiagnostics
|
||||
state, hasState := tc.requestStates[requestID]
|
||||
stateString := state.String()
|
||||
if !hasState {
|
||||
stateString = "no graphsync state found"
|
||||
}
|
||||
if channelID == nil {
|
||||
diagnostics = append(diagnostics, fmt.Sprintf("No data transfer channel id for GraphSync request ID %d", requestID))
|
||||
} else if isCurrentChannelRequest && !hasState {
|
||||
diagnostics = append(diagnostics, fmt.Sprintf("No current request state for data transfer channel id %s", channelID))
|
||||
} else if !isCurrentChannelRequest && hasState {
|
||||
diagnostics = append(diagnostics, fmt.Sprintf("Graphsync request %d is a previous request on data transfer channel id %s that was restarted, but it is still running", requestID, channelID))
|
||||
}
|
||||
diagnostics = append(diagnostics, tc.gsDiagnostics[requestID]...)
|
||||
transfer := &api.GraphSyncDataTransfer{
|
||||
RequestID: requestID,
|
||||
RequestState: stateString,
|
||||
IsCurrentChannelRequest: isCurrentChannelRequest,
|
||||
ChannelID: channelID,
|
||||
ChannelState: channelState,
|
||||
Diagnostics: diagnostics,
|
||||
}
|
||||
tc.transfers = append(tc.transfers, transfer)
|
||||
tc.matchedRequests[requestID] = transfer
|
||||
}
|
||||
|
||||
func (tc *transferConverter) collectRemainingTransfers() {
|
||||
for requestID := range tc.requestStates {
|
||||
if _, ok := tc.matchedRequests[requestID]; !ok {
|
||||
tc.convertTransfer(nil, nil, nil, requestID, false)
|
||||
}
|
||||
}
|
||||
for requestID := range tc.gsDiagnostics {
|
||||
if _, ok := tc.matchedRequests[requestID]; !ok {
|
||||
tc.convertTransfer(nil, nil, nil, requestID, false)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (sm *StorageMinerAPI) MarketPendingDeals(ctx context.Context) (api.PendingDealInfo, error) {
|
||||
return sm.DealPublisher.PendingDeals(), nil
|
||||
}
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"github.com/ipfs/go-graphsync"
|
||||
exchange "github.com/ipfs/go-ipfs-exchange-interface"
|
||||
|
||||
dtnet "github.com/filecoin-project/go-data-transfer/network"
|
||||
"github.com/filecoin-project/go-fil-markets/piecestore"
|
||||
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation"
|
||||
|
||||
@ -85,6 +86,7 @@ type ProviderRequestValidator *requestvalidation.UnifiedRequestValidator
|
||||
|
||||
// ProviderDataTransfer is a data transfer manager for the provider
|
||||
type ProviderDataTransfer datatransfer.Manager
|
||||
|
||||
type ProviderTransferNetwork dtnet.DataTransferNetwork
|
||||
type ProviderTransport datatransfer.Transport
|
||||
type StagingBlockstore blockstore.BasicBlockstore
|
||||
type StagingGraphsync graphsync.GraphExchange
|
||||
|
@ -335,13 +335,19 @@ func HandleMigrateProviderFunds(lc fx.Lifecycle, ds dtypes.MetadataDS, node api.
|
||||
})
|
||||
}
|
||||
|
||||
// NewProviderDAGServiceDataTransfer returns a data transfer manager that just
|
||||
// uses the provider's Staging DAG service for transfers
|
||||
func NewProviderDAGServiceDataTransfer(lc fx.Lifecycle, h host.Host, gs dtypes.StagingGraphsync, ds dtypes.MetadataDS, r repo.LockedRepo) (dtypes.ProviderDataTransfer, error) {
|
||||
net := dtnet.NewFromLibp2pHost(h)
|
||||
// NewProviderTransferNetwork sets up the libp2p2 protocol networking for data transfer
|
||||
func NewProviderTransferNetwork(h host.Host) dtypes.ProviderTransferNetwork {
|
||||
return dtnet.NewFromLibp2pHost(h)
|
||||
}
|
||||
|
||||
// NewProviderTransport sets up a data transfer transport over graphsync
|
||||
func NewProviderTransport(h host.Host, gs dtypes.StagingGraphsync, net dtypes.ProviderTransferNetwork) dtypes.ProviderTransport {
|
||||
return dtgstransport.NewTransport(h.ID(), gs, net)
|
||||
}
|
||||
|
||||
// NewProviderDataTransfer returns a data transfer manager
|
||||
func NewProviderDataTransfer(lc fx.Lifecycle, net dtypes.ProviderTransferNetwork, transport dtypes.ProviderTransport, ds dtypes.MetadataDS, r repo.LockedRepo) (dtypes.ProviderDataTransfer, error) {
|
||||
dtDs := namespace.Wrap(ds, datastore.NewKey("/datatransfer/provider/transfers"))
|
||||
transport := dtgstransport.NewTransport(h.ID(), gs, net)
|
||||
err := os.MkdirAll(filepath.Join(r.Path(), "data-transfer"), 0755) //nolint: gosec
|
||||
if err != nil && !os.IsExist(err) {
|
||||
return nil, err
|
||||
|
Loading…
Reference in New Issue
Block a user