diff --git a/api/api_storage.go b/api/api_storage.go index c0d9c9f9c..3f0ef50b7 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -165,6 +165,8 @@ type StorageMiner interface { MarketGetRetrievalAsk(ctx context.Context) (*retrievalmarket.Ask, error) //perm:read MarketListDataTransfers(ctx context.Context) ([]DataTransferChannel, error) //perm:write MarketDataTransferUpdates(ctx context.Context) (<-chan DataTransferChannel, error) //perm:write + // MarketDataTransferDiagnostics generates debugging information about current data transfers over graphsync + MarketDataTransferDiagnostics(ctx context.Context, p peer.ID) (*TransferDiagnostics, error) //perm:write // MarketRestartDataTransfer attempts to restart a data transfer with the given transfer ID and other peer MarketRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error //perm:write // MarketCancelDataTransfer cancels a data transfer with the given transfer ID and other peer diff --git a/api/docgen/docgen.go b/api/docgen/docgen.go index 5d87e9d20..03ce9109d 100644 --- a/api/docgen/docgen.go +++ b/api/docgen/docgen.go @@ -15,6 +15,7 @@ import ( "github.com/filecoin-project/go-bitfield" "github.com/google/uuid" "github.com/ipfs/go-cid" + "github.com/ipfs/go-graphsync" "github.com/libp2p/go-libp2p-core/metrics" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -120,6 +121,7 @@ func init() { addExample(api.FullAPIVersion1) addExample(api.PCHInbound) addExample(time.Minute) + addExample(graphsync.RequestID(4)) addExample(datatransfer.TransferID(3)) addExample(datatransfer.Ongoing) addExample(storeIDExample) diff --git a/api/proxy_gen.go b/api/proxy_gen.go index 10a21b8fa..09c71a167 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -668,6 +668,8 @@ type StorageMinerStruct struct { MarketCancelDataTransfer func(p0 context.Context, p1 datatransfer.TransferID, p2 peer.ID, p3 bool) error `perm:"write"` + MarketDataTransferDiagnostics func(p0 context.Context, p1 peer.ID) (*TransferDiagnostics, error) `perm:"write"` + MarketDataTransferUpdates func(p0 context.Context) (<-chan DataTransferChannel, error) `perm:"write"` MarketGetAsk func(p0 context.Context) (*storagemarket.SignedStorageAsk, error) `perm:"read"` @@ -3972,6 +3974,17 @@ func (s *StorageMinerStub) MarketCancelDataTransfer(p0 context.Context, p1 datat return ErrNotSupported } +func (s *StorageMinerStruct) MarketDataTransferDiagnostics(p0 context.Context, p1 peer.ID) (*TransferDiagnostics, error) { + if s.Internal.MarketDataTransferDiagnostics == nil { + return nil, ErrNotSupported + } + return s.Internal.MarketDataTransferDiagnostics(p0, p1) +} + +func (s *StorageMinerStub) MarketDataTransferDiagnostics(p0 context.Context, p1 peer.ID) (*TransferDiagnostics, error) { + return nil, ErrNotSupported +} + func (s *StorageMinerStruct) MarketDataTransferUpdates(p0 context.Context) (<-chan DataTransferChannel, error) { if s.Internal.MarketDataTransferUpdates == nil { return nil, ErrNotSupported diff --git a/api/types.go b/api/types.go index 0ecda0405..81345306d 100644 --- a/api/types.go +++ b/api/types.go @@ -10,6 +10,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/chain/types" "github.com/ipfs/go-cid" + "github.com/ipfs/go-graphsync" "github.com/libp2p/go-libp2p-core/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -53,6 +54,30 @@ type MessageSendSpec struct { MaxFee abi.TokenAmount } +// GraphSyncDataTransfer provides diagnostics on a data transfer happening over graphsync +type GraphSyncDataTransfer struct { + // GraphSync request id for this transfer + RequestID graphsync.RequestID + // Graphsync state for this transfer + RequestState string + // If a channel ID is present, indicates whether this is the current graphsync request for this channel + // (could have changed in a restart) + IsCurrentChannelRequest bool + // Data transfer channel ID for this transfer + ChannelID *datatransfer.ChannelID + // Data transfer state for this transfer + ChannelState *DataTransferChannel + // Diagnostic information about this request -- and unexpected inconsistencies in + // request state + Diagnostics []string +} + +// TransferDiagnostics give current information about transfers going over graphsync that may be helpful for debugging +type TransferDiagnostics struct { + ReceivingTransfers []*GraphSyncDataTransfer + SendingTransfers []*GraphSyncDataTransfer +} + type DataTransferChannel struct { TransferID datatransfer.TransferID Status datatransfer.Status diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz index 33a869e4a..c5b981404 100644 Binary files a/build/openrpc/full.json.gz and b/build/openrpc/full.json.gz differ diff --git a/build/openrpc/miner.json.gz b/build/openrpc/miner.json.gz index 792ad55b9..b3228b6a6 100644 Binary files a/build/openrpc/miner.json.gz and b/build/openrpc/miner.json.gz differ diff --git a/build/openrpc/worker.json.gz b/build/openrpc/worker.json.gz index 6bd89b2b4..1a0cb26b5 100644 Binary files a/build/openrpc/worker.json.gz and b/build/openrpc/worker.json.gz differ diff --git a/cmd/lotus-miner/market.go b/cmd/lotus-miner/market.go index d3da6f9b3..c7089e74e 100644 --- a/cmd/lotus-miner/market.go +++ b/cmd/lotus-miner/market.go @@ -638,6 +638,7 @@ var dataTransfersCmd = &cli.Command{ transfersListCmd, marketRestartTransfer, marketCancelTransfer, + transfersDiagnosticsCmd, }, } @@ -857,6 +858,38 @@ var transfersListCmd = &cli.Command{ }, } +var transfersDiagnosticsCmd = &cli.Command{ + Name: "diagnostics", + Usage: "Get detailed diagnostics on active transfers with a specific peer", + Flags: []cli.Flag{}, + Action: func(cctx *cli.Context) error { + if !cctx.Args().Present() { + return cli.ShowCommandHelp(cctx, cctx.Command.Name) + } + api, closer, err := lcli.GetMarketsAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := lcli.ReqContext(cctx) + + targetPeer, err := peer.Decode(cctx.Args().First()) + if err != nil { + return err + } + diagnostics, err := api.MarketDataTransferDiagnostics(ctx, targetPeer) + if err != nil { + return err + } + out, err := json.MarshalIndent(diagnostics, "", "\t") + if err != nil { + return err + } + fmt.Println(string(out)) + return nil + }, +} + var dealsPendingPublish = &cli.Command{ Name: "pending-publish", Usage: "list deals waiting in publish queue", diff --git a/documentation/en/api-v0-methods-miner.md b/documentation/en/api-v0-methods-miner.md index fbe8b6eb5..49e8c7a1e 100644 --- a/documentation/en/api-v0-methods-miner.md +++ b/documentation/en/api-v0-methods-miner.md @@ -49,6 +49,7 @@ * [LogSetLevel](#LogSetLevel) * [Market](#Market) * [MarketCancelDataTransfer](#MarketCancelDataTransfer) + * [MarketDataTransferDiagnostics](#MarketDataTransferDiagnostics) * [MarketDataTransferUpdates](#MarketDataTransferUpdates) * [MarketGetAsk](#MarketGetAsk) * [MarketGetDealUpdates](#MarketGetDealUpdates) @@ -724,6 +725,27 @@ Inputs: Response: `{}` +### MarketDataTransferDiagnostics +MarketDataTransferDiagnostics generates debugging information about current data transfers over graphsync + + +Perms: write + +Inputs: +```json +[ + "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf" +] +``` + +Response: +```json +{ + "ReceivingTransfers": null, + "SendingTransfers": null +} +``` + ### MarketDataTransferUpdates diff --git a/documentation/en/cli-lotus-miner.md b/documentation/en/cli-lotus-miner.md index fa4f08b4b..d50161957 100644 --- a/documentation/en/cli-lotus-miner.md +++ b/documentation/en/cli-lotus-miner.md @@ -974,10 +974,11 @@ USAGE: lotus-miner data-transfers command [command options] [arguments...] COMMANDS: - list List ongoing data transfers for this miner - restart Force restart a stalled data transfer - cancel Force cancel a data transfer - help, h Shows a list of commands or help for one command + list List ongoing data transfers for this miner + restart Force restart a stalled data transfer + cancel Force cancel a data transfer + diagnostics Get detailed diagnostics on active transfers with a specific peer + help, h Shows a list of commands or help for one command OPTIONS: --help, -h show help (default: false) @@ -1034,6 +1035,19 @@ OPTIONS: ``` +### lotus-miner data-transfers diagnostics +``` +NAME: + lotus-miner data-transfers diagnostics - Get detailed diagnostics on active transfers with a specific peer + +USAGE: + lotus-miner data-transfers diagnostics [command options] [arguments...] + +OPTIONS: + --help, -h show help (default: false) + +``` + ## lotus-miner dagstore ``` NAME: diff --git a/go.mod b/go.mod index c6ee3e562..551af628d 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,7 @@ require ( github.com/filecoin-project/go-cbor-util v0.0.1 github.com/filecoin-project/go-commp-utils v0.1.3 github.com/filecoin-project/go-crypto v0.0.1 - github.com/filecoin-project/go-data-transfer v1.12.0 + github.com/filecoin-project/go-data-transfer v1.12.1 github.com/filecoin-project/go-fil-commcid v0.1.0 github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 github.com/filecoin-project/go-fil-markets v1.14.1 @@ -78,7 +78,7 @@ require ( github.com/ipfs/go-ds-leveldb v0.5.0 github.com/ipfs/go-ds-measure v0.2.0 github.com/ipfs/go-fs-lock v0.0.6 - github.com/ipfs/go-graphsync v0.11.0 + github.com/ipfs/go-graphsync v0.11.5 github.com/ipfs/go-ipfs-blockstore v1.1.2 github.com/ipfs/go-ipfs-blocksutil v0.0.1 github.com/ipfs/go-ipfs-chunker v0.0.5 @@ -150,7 +150,7 @@ require ( github.com/whyrusleeping/pubsub v0.0.0-20190708150250-92bcb0691325 github.com/xorcare/golden v0.6.1-0.20191112154924-b87f686d7542 go.opencensus.io v0.23.0 - go.opentelemetry.io/otel v1.2.0 + go.opentelemetry.io/otel v1.3.0 go.opentelemetry.io/otel/bridge/opencensus v0.25.0 go.opentelemetry.io/otel/exporters/jaeger v1.2.0 go.opentelemetry.io/otel/sdk v1.2.0 diff --git a/go.sum b/go.sum index 54b1c414d..45625140b 100644 --- a/go.sum +++ b/go.sum @@ -312,8 +312,9 @@ github.com/filecoin-project/go-commp-utils v0.1.3/go.mod h1:3ENlD1pZySaUout0p9AN github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= github.com/filecoin-project/go-crypto v0.0.1 h1:AcvpSGGCgjaY8y1az6AMfKQWreF/pWO2JJGLl6gCq6o= github.com/filecoin-project/go-crypto v0.0.1/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= -github.com/filecoin-project/go-data-transfer v1.12.0 h1:y44x35JvB93kezahMURKizIa/aizGTPSHqi5cbAfTEo= github.com/filecoin-project/go-data-transfer v1.12.0/go.mod h1:tDrD2jLU2TpVhd+5B8iqBp0fQRV4lP80WZccKXugjYc= +github.com/filecoin-project/go-data-transfer v1.12.1 h1:gAznAZKySVs2FS6T/vDq7R3f0DewLnxeROe0oOE6bZU= +github.com/filecoin-project/go-data-transfer v1.12.1/go.mod h1:j3HL645YiQFxcM+q7uPlGApILSqeweDABNgZQP7pDYU= github.com/filecoin-project/go-ds-versioning v0.0.0-20211206185234-508abd7c2aff h1:2bG2ggVZ/rInd/YqUfRj4A5siGuYOPxxuD4I8nYLJF0= github.com/filecoin-project/go-ds-versioning v0.0.0-20211206185234-508abd7c2aff/go.mod h1:C9/l9PnB1+mwPa26BBVpCjG/XQCB0yj/q5CK2J8X1I4= github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= @@ -423,6 +424,11 @@ github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA= github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= +github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.1 h1:DX7uPQ4WgAWfoh+NGGlbJQswnYIVvz0SRlLS3rPZQDA= +github.com/go-logr/logr v1.2.1/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.0 h1:j4LrlVXgrbIWO83mmQUnK0Hi+YnbD+vzrE1z/EphbFE= +github.com/go-logr/stdr v1.2.0/go.mod h1:YkVgnZu1ZjjL7xTxrfm/LLZBfkhTqSR1ydtm6jTKKwI= github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY= github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-openapi/jsonpointer v0.19.2/go.mod h1:3akKfEdA7DF1sugOqz1dVQHBcuDBPKZGEoHC/NkiQRg= @@ -713,8 +719,9 @@ github.com/ipfs/go-filestore v1.1.0 h1:Pu4tLBi1bucu6/HU9llaOmb9yLFk/sgP+pW764zND github.com/ipfs/go-filestore v1.1.0/go.mod h1:6e1/5Y6NvLuCRdmda/KA4GUhXJQ3Uat6vcWm2DJfxc8= github.com/ipfs/go-fs-lock v0.0.6 h1:sn3TWwNVQqSeNjlWy6zQ1uUGAZrV3hPOyEA6y1/N2a0= github.com/ipfs/go-fs-lock v0.0.6/go.mod h1:OTR+Rj9sHiRubJh3dRhD15Juhd/+w6VPOY28L7zESmM= -github.com/ipfs/go-graphsync v0.11.0 h1:PiiD5CnoC3xEHMW8d6uBGqGcoTwiMB5d9CORIEyF6iA= github.com/ipfs/go-graphsync v0.11.0/go.mod h1:wC+c8vGVjAHthsVIl8LKr37cUra2GOaMYcQNNmMxDqE= +github.com/ipfs/go-graphsync v0.11.5 h1:WA5hVxGBtcal6L6nqubKiqRolaZxbexOK3GumGFJRR4= +github.com/ipfs/go-graphsync v0.11.5/go.mod h1:+/sZqRwRCQRrV7NCzgBtufmr5QGpUE98XSa7NlsztmM= github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk= github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08= github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw= @@ -815,8 +822,9 @@ github.com/ipfs/go-path v0.0.7 h1:H06hKMquQ0aYtHiHryOMLpQC1qC3QwXwkahcEVD51Ho= github.com/ipfs/go-path v0.0.7/go.mod h1:6KTKmeRnBXgqrTvzFrPV3CamxcgvXX/4z79tfAd2Sno= github.com/ipfs/go-peertaskqueue v0.0.4/go.mod h1:03H8fhyeMfKNFWqzYEVyMbcPUeYrqP1MX6Kd+aN+rMQ= github.com/ipfs/go-peertaskqueue v0.1.0/go.mod h1:Jmk3IyCcfl1W3jTW3YpghSwSEC6IJ3Vzz/jUmWw8Z0U= -github.com/ipfs/go-peertaskqueue v0.7.0 h1:VyO6G4sbzX80K58N60cCaHsSsypbUNs1GjO5seGNsQ0= github.com/ipfs/go-peertaskqueue v0.7.0/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU= +github.com/ipfs/go-peertaskqueue v0.7.1 h1:7PLjon3RZwRQMgOTvYccZ+mjzkmds/7YzSWKFlBAypE= +github.com/ipfs/go-peertaskqueue v0.7.1/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU= github.com/ipfs/go-todocounter v0.0.1/go.mod h1:l5aErvQc8qKE2r7NDMjmq5UNAvuZy0rC8BHOplkWvZ4= github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb/go.mod h1:IwAAgul1UQIcNZzKPYZWOCijryFBeCV79cNubPzol+k= github.com/ipfs/go-unixfs v0.2.4/go.mod h1:SUdisfUjNoSDzzhGVxvCL9QO/nKdwXdr+gbMUdqcbYw= @@ -1866,8 +1874,9 @@ go.opencensus.io v0.22.6-0.20201102222123-380f4078db9f/go.mod h1:5pWMHQbX5EPX2/6 go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo= -go.opentelemetry.io/otel v1.2.0 h1:YOQDvxO1FayUcT9MIhJhgMyNO1WqoduiyvQHzGN0kUQ= go.opentelemetry.io/otel v1.2.0/go.mod h1:aT17Fk0Z1Nor9e0uisf98LrntPGMnk4frBO9+dkf69I= +go.opentelemetry.io/otel v1.3.0 h1:APxLf0eiBwLl+SOXiJJCVYzA1OOJNyAoV8C5RNRyy7Y= +go.opentelemetry.io/otel v1.3.0/go.mod h1:PWIKzi6JCp7sM0k9yZ43VX+T345uNbAkDKwHVjb2PTs= go.opentelemetry.io/otel/bridge/opencensus v0.25.0 h1:18Ww8TpCEGes12HZJzB2nEbUglvMLzPxqgZypsrKiNc= go.opentelemetry.io/otel/bridge/opencensus v0.25.0/go.mod h1:dkZDdaNwLlIutxK2Kc2m3jwW2M1ISaNf8/rOYVwuVHs= go.opentelemetry.io/otel/exporters/jaeger v1.2.0 h1:C/5Egj3MJBXRJi22cSl07suqPqtZLnLFmH//OxETUEc= @@ -1886,8 +1895,9 @@ go.opentelemetry.io/otel/sdk/export/metric v0.25.0/go.mod h1:Ej7NOa+WpN49EIcr1HM go.opentelemetry.io/otel/sdk/metric v0.25.0 h1:J+Ta+4IAA5W9AdWhGQLfciEpavBqqSkBzTDeYvJLFNU= go.opentelemetry.io/otel/sdk/metric v0.25.0/go.mod h1:G4xzj4LvC6xDDSsVXpvRVclQCbofGGg4ZU2VKKtDRfg= go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16gUEi0Nf1iBdgw= -go.opentelemetry.io/otel/trace v1.2.0 h1:Ys3iqbqZhcf28hHzrm5WAquMkDHNZTUkw7KHbuNjej0= go.opentelemetry.io/otel/trace v1.2.0/go.mod h1:N5FLswTubnxKxOJHM7XZC074qpeEdLy3CgAVsdMucK0= +go.opentelemetry.io/otel/trace v1.3.0 h1:doy8Hzb1RJ+I3yFhtDmwNc7tIyw1tNMOIsyPzp1NOGY= +go.opentelemetry.io/otel/trace v1.3.0/go.mod h1:c/VDhno8888bvQYmbYLqe41/Ldmr/KKunbvWM4/fEjk= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= diff --git a/node/builder_miner.go b/node/builder_miner.go index c8a04255f..e813a2d24 100644 --- a/node/builder_miner.go +++ b/node/builder_miner.go @@ -164,7 +164,9 @@ func ConfigStorageMiner(c interface{}) Option { Override(HandleRetrievalKey, modules.HandleRetrieval), // Markets (storage) - Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer), + Override(new(dtypes.ProviderTransferNetwork), modules.NewProviderTransferNetwork), + Override(new(dtypes.ProviderTransport), modules.NewProviderTransport), + Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDataTransfer), Override(new(*storedask.StoredAsk), modules.NewStorageAsk), Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(cfg.Dealmaking, nil)), Override(new(storagemarket.StorageProvider), modules.StorageProvider), diff --git a/node/impl/storminer.go b/node/impl/storminer.go index b94097d39..764e4fb36 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -3,6 +3,7 @@ package impl import ( "context" "encoding/json" + "errors" "fmt" "net/http" "os" @@ -19,6 +20,9 @@ import ( "github.com/google/uuid" "github.com/ipfs/go-cid" + "github.com/ipfs/go-graphsync" + gsimpl "github.com/ipfs/go-graphsync/impl" + "github.com/ipfs/go-graphsync/peerstate" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "go.uber.org/fx" @@ -28,6 +32,7 @@ import ( "github.com/filecoin-project/go-address" datatransfer "github.com/filecoin-project/go-data-transfer" + gst "github.com/filecoin-project/go-data-transfer/transport/graphsync" "github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/storagemarket" @@ -70,6 +75,8 @@ type StorageMinerAPI struct { RetrievalProvider retrievalmarket.RetrievalProvider `optional:"true"` SectorAccessor retrievalmarket.SectorAccessor `optional:"true"` DataTransfer dtypes.ProviderDataTransfer `optional:"true"` + StagingGraphsync dtypes.StagingGraphsync `optional:"true"` + Transport dtypes.ProviderTransport `optional:"true"` DealPublisher *storageadapter.DealPublisher `optional:"true"` SectorBlocks *sectorblocks.SectorBlocks `optional:"true"` Host host.Host `optional:"true"` @@ -553,6 +560,166 @@ func (sm *StorageMinerAPI) MarketDataTransferUpdates(ctx context.Context) (<-cha return channels, nil } +func (sm *StorageMinerAPI) MarketDataTransferDiagnostics(ctx context.Context, mpid peer.ID) (*api.TransferDiagnostics, error) { + gsTransport, ok := sm.Transport.(*gst.Transport) + if !ok { + return nil, errors.New("api only works for graphsync as transport") + } + graphsyncConcrete, ok := sm.StagingGraphsync.(*gsimpl.GraphSync) + if !ok { + return nil, errors.New("api only works for non-mock graphsync implementation") + } + + inProgressChannels, err := sm.DataTransfer.InProgressChannels(ctx) + if err != nil { + return nil, err + } + + allReceivingChannels := make(map[datatransfer.ChannelID]datatransfer.ChannelState) + allSendingChannels := make(map[datatransfer.ChannelID]datatransfer.ChannelState) + for channelID, channel := range inProgressChannels { + if channel.OtherPeer() != mpid { + continue + } + if channel.Status() == datatransfer.Completed { + continue + } + if channel.Status() == datatransfer.Failed || channel.Status() == datatransfer.Cancelled { + continue + } + if channel.SelfPeer() == channel.Sender() { + allSendingChannels[channelID] = channel + } else { + allReceivingChannels[channelID] = channel + } + } + + // gather information about active transport channels + transportChannels := gsTransport.ChannelsForPeer(mpid) + // gather information about graphsync state for peer + gsPeerState := graphsyncConcrete.PeerState(mpid) + + sendingTransfers := sm.generateTransfers(ctx, transportChannels.SendingChannels, gsPeerState.IncomingState, allSendingChannels) + receivingTransfers := sm.generateTransfers(ctx, transportChannels.ReceivingChannels, gsPeerState.OutgoingState, allReceivingChannels) + + return &api.TransferDiagnostics{ + SendingTransfers: sendingTransfers, + ReceivingTransfers: receivingTransfers, + }, nil +} + +// generate transfers matches graphsync state and data transfer state for a given peer +// to produce detailed output on what's happening with a transfer +func (sm *StorageMinerAPI) generateTransfers(ctx context.Context, + transportChannels map[datatransfer.ChannelID]gst.ChannelGraphsyncRequests, + gsPeerState peerstate.PeerState, + allChannels map[datatransfer.ChannelID]datatransfer.ChannelState) []*api.GraphSyncDataTransfer { + tc := &transferConverter{ + matchedChannelIds: make(map[datatransfer.ChannelID]struct{}), + matchedRequests: make(map[graphsync.RequestID]*api.GraphSyncDataTransfer), + gsDiagnostics: gsPeerState.Diagnostics(), + requestStates: gsPeerState.RequestStates, + allChannels: allChannels, + } + + // iterate through all operating data transfer transport channels + for channelID, channelRequests := range transportChannels { + originalState, err := sm.DataTransfer.ChannelState(ctx, channelID) + var baseDiagnostics []string + var channelState *api.DataTransferChannel + if err != nil { + baseDiagnostics = append(baseDiagnostics, fmt.Sprintf("Unable to lookup channel state: %s", err)) + } else { + cs := api.NewDataTransferChannel(sm.Host.ID(), originalState) + channelState = &cs + } + // add the current request for this channel + tc.convertTransfer(channelID, true, channelState, baseDiagnostics, channelRequests.Current, true) + for _, requestID := range channelRequests.Previous { + // add any previous requests that were cancelled for a restart + tc.convertTransfer(channelID, true, channelState, baseDiagnostics, requestID, false) + } + } + + // collect any graphsync data for channels we don't have any data transfer data for + tc.collectRemainingTransfers() + + return tc.transfers +} + +type transferConverter struct { + matchedChannelIds map[datatransfer.ChannelID]struct{} + matchedRequests map[graphsync.RequestID]*api.GraphSyncDataTransfer + transfers []*api.GraphSyncDataTransfer + gsDiagnostics map[graphsync.RequestID][]string + requestStates graphsync.RequestStates + allChannels map[datatransfer.ChannelID]datatransfer.ChannelState +} + +// convert transfer assembles transfer and diagnostic data for a given graphsync/data-transfer request +func (tc *transferConverter) convertTransfer(channelID datatransfer.ChannelID, hasChannelID bool, channelState *api.DataTransferChannel, baseDiagnostics []string, + requestID graphsync.RequestID, isCurrentChannelRequest bool) { + diagnostics := baseDiagnostics + state, hasState := tc.requestStates[requestID] + stateString := state.String() + if !hasState { + stateString = "no graphsync state found" + } + var channelIDPtr *datatransfer.ChannelID + if !hasChannelID { + diagnostics = append(diagnostics, fmt.Sprintf("No data transfer channel id for GraphSync request ID %d", requestID)) + } else { + channelIDPtr = &channelID + if isCurrentChannelRequest && !hasState { + diagnostics = append(diagnostics, fmt.Sprintf("No current request state for data transfer channel id %s", channelID)) + } else if !isCurrentChannelRequest && hasState { + diagnostics = append(diagnostics, fmt.Sprintf("Graphsync request %d is a previous request on data transfer channel id %s that was restarted, but it is still running", requestID, channelID)) + } + } + diagnostics = append(diagnostics, tc.gsDiagnostics[requestID]...) + transfer := &api.GraphSyncDataTransfer{ + RequestID: requestID, + RequestState: stateString, + IsCurrentChannelRequest: isCurrentChannelRequest, + ChannelID: channelIDPtr, + ChannelState: channelState, + Diagnostics: diagnostics, + } + tc.transfers = append(tc.transfers, transfer) + tc.matchedRequests[requestID] = transfer + if hasChannelID { + tc.matchedChannelIds[channelID] = struct{}{} + } +} + +func (tc *transferConverter) collectRemainingTransfers() { + for requestID := range tc.requestStates { + if _, ok := tc.matchedRequests[requestID]; !ok { + tc.convertTransfer(datatransfer.ChannelID{}, false, nil, nil, requestID, false) + } + } + for requestID := range tc.gsDiagnostics { + if _, ok := tc.matchedRequests[requestID]; !ok { + tc.convertTransfer(datatransfer.ChannelID{}, false, nil, nil, requestID, false) + } + } + for channelID, channelState := range tc.allChannels { + if _, ok := tc.matchedChannelIds[channelID]; !ok { + channelID := channelID + cs := api.NewDataTransferChannel(channelState.SelfPeer(), channelState) + transfer := &api.GraphSyncDataTransfer{ + RequestID: graphsync.RequestID(-1), + RequestState: "graphsync state unknown", + IsCurrentChannelRequest: false, + ChannelID: &channelID, + ChannelState: &cs, + Diagnostics: []string{"data transfer with no open transport channel, cannot determine linked graphsync request"}, + } + tc.transfers = append(tc.transfers, transfer) + } + } +} + func (sm *StorageMinerAPI) MarketPendingDeals(ctx context.Context) (api.PendingDealInfo, error) { return sm.DealPublisher.PendingDeals(), nil } diff --git a/node/modules/dtypes/storage.go b/node/modules/dtypes/storage.go index 6893908f7..542445b1e 100644 --- a/node/modules/dtypes/storage.go +++ b/node/modules/dtypes/storage.go @@ -8,6 +8,7 @@ import ( "github.com/ipfs/go-graphsync" exchange "github.com/ipfs/go-ipfs-exchange-interface" + dtnet "github.com/filecoin-project/go-data-transfer/network" "github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" @@ -85,6 +86,7 @@ type ProviderRequestValidator *requestvalidation.UnifiedRequestValidator // ProviderDataTransfer is a data transfer manager for the provider type ProviderDataTransfer datatransfer.Manager - +type ProviderTransferNetwork dtnet.DataTransferNetwork +type ProviderTransport datatransfer.Transport type StagingBlockstore blockstore.BasicBlockstore type StagingGraphsync graphsync.GraphExchange diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 552f43660..3b8687525 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -335,13 +335,19 @@ func HandleMigrateProviderFunds(lc fx.Lifecycle, ds dtypes.MetadataDS, node api. }) } -// NewProviderDAGServiceDataTransfer returns a data transfer manager that just -// uses the provider's Staging DAG service for transfers -func NewProviderDAGServiceDataTransfer(lc fx.Lifecycle, h host.Host, gs dtypes.StagingGraphsync, ds dtypes.MetadataDS, r repo.LockedRepo) (dtypes.ProviderDataTransfer, error) { - net := dtnet.NewFromLibp2pHost(h) +// NewProviderTransferNetwork sets up the libp2p2 protocol networking for data transfer +func NewProviderTransferNetwork(h host.Host) dtypes.ProviderTransferNetwork { + return dtnet.NewFromLibp2pHost(h) +} +// NewProviderTransport sets up a data transfer transport over graphsync +func NewProviderTransport(h host.Host, gs dtypes.StagingGraphsync, net dtypes.ProviderTransferNetwork) dtypes.ProviderTransport { + return dtgstransport.NewTransport(h.ID(), gs, net) +} + +// NewProviderDataTransfer returns a data transfer manager +func NewProviderDataTransfer(lc fx.Lifecycle, net dtypes.ProviderTransferNetwork, transport dtypes.ProviderTransport, ds dtypes.MetadataDS, r repo.LockedRepo) (dtypes.ProviderDataTransfer, error) { dtDs := namespace.Wrap(ds, datastore.NewKey("/datatransfer/provider/transfers")) - transport := dtgstransport.NewTransport(h.ID(), gs, net) err := os.MkdirAll(filepath.Join(r.Path(), "data-transfer"), 0755) //nolint: gosec if err != nil && !os.IsExist(err) { return nil, err