diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 8a3f94e35..764e4fb36 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -569,13 +569,38 @@ func (sm *StorageMinerAPI) MarketDataTransferDiagnostics(ctx context.Context, mp if !ok { return nil, errors.New("api only works for non-mock graphsync implementation") } + + inProgressChannels, err := sm.DataTransfer.InProgressChannels(ctx) + if err != nil { + return nil, err + } + + allReceivingChannels := make(map[datatransfer.ChannelID]datatransfer.ChannelState) + allSendingChannels := make(map[datatransfer.ChannelID]datatransfer.ChannelState) + for channelID, channel := range inProgressChannels { + if channel.OtherPeer() != mpid { + continue + } + if channel.Status() == datatransfer.Completed { + continue + } + if channel.Status() == datatransfer.Failed || channel.Status() == datatransfer.Cancelled { + continue + } + if channel.SelfPeer() == channel.Sender() { + allSendingChannels[channelID] = channel + } else { + allReceivingChannels[channelID] = channel + } + } + // gather information about active transport channels transportChannels := gsTransport.ChannelsForPeer(mpid) // gather information about graphsync state for peer gsPeerState := graphsyncConcrete.PeerState(mpid) - sendingTransfers := sm.generateTransfers(ctx, transportChannels.SendingChannels, gsPeerState.IncomingState) - receivingTransfers := sm.generateTransfers(ctx, transportChannels.ReceivingChannels, gsPeerState.OutgoingState) + sendingTransfers := sm.generateTransfers(ctx, transportChannels.SendingChannels, gsPeerState.IncomingState, allSendingChannels) + receivingTransfers := sm.generateTransfers(ctx, transportChannels.ReceivingChannels, gsPeerState.OutgoingState, allReceivingChannels) return &api.TransferDiagnostics{ SendingTransfers: sendingTransfers, @@ -587,11 +612,14 @@ func (sm *StorageMinerAPI) MarketDataTransferDiagnostics(ctx context.Context, mp // to produce detailed output on what's happening with a transfer func (sm *StorageMinerAPI) generateTransfers(ctx context.Context, transportChannels map[datatransfer.ChannelID]gst.ChannelGraphsyncRequests, - gsPeerState peerstate.PeerState) []*api.GraphSyncDataTransfer { + gsPeerState peerstate.PeerState, + allChannels map[datatransfer.ChannelID]datatransfer.ChannelState) []*api.GraphSyncDataTransfer { tc := &transferConverter{ - matchedRequests: make(map[graphsync.RequestID]*api.GraphSyncDataTransfer), - gsDiagnostics: gsPeerState.Diagnostics(), - requestStates: gsPeerState.RequestStates, + matchedChannelIds: make(map[datatransfer.ChannelID]struct{}), + matchedRequests: make(map[graphsync.RequestID]*api.GraphSyncDataTransfer), + gsDiagnostics: gsPeerState.Diagnostics(), + requestStates: gsPeerState.RequestStates, + allChannels: allChannels, } // iterate through all operating data transfer transport channels @@ -620,10 +648,12 @@ func (sm *StorageMinerAPI) generateTransfers(ctx context.Context, } type transferConverter struct { - matchedRequests map[graphsync.RequestID]*api.GraphSyncDataTransfer - transfers []*api.GraphSyncDataTransfer - gsDiagnostics map[graphsync.RequestID][]string - requestStates graphsync.RequestStates + matchedChannelIds map[datatransfer.ChannelID]struct{} + matchedRequests map[graphsync.RequestID]*api.GraphSyncDataTransfer + transfers []*api.GraphSyncDataTransfer + gsDiagnostics map[graphsync.RequestID][]string + requestStates graphsync.RequestStates + allChannels map[datatransfer.ChannelID]datatransfer.ChannelState } // convert transfer assembles transfer and diagnostic data for a given graphsync/data-transfer request @@ -657,6 +687,9 @@ func (tc *transferConverter) convertTransfer(channelID datatransfer.ChannelID, h } tc.transfers = append(tc.transfers, transfer) tc.matchedRequests[requestID] = transfer + if hasChannelID { + tc.matchedChannelIds[channelID] = struct{}{} + } } func (tc *transferConverter) collectRemainingTransfers() { @@ -670,6 +703,21 @@ func (tc *transferConverter) collectRemainingTransfers() { tc.convertTransfer(datatransfer.ChannelID{}, false, nil, nil, requestID, false) } } + for channelID, channelState := range tc.allChannels { + if _, ok := tc.matchedChannelIds[channelID]; !ok { + channelID := channelID + cs := api.NewDataTransferChannel(channelState.SelfPeer(), channelState) + transfer := &api.GraphSyncDataTransfer{ + RequestID: graphsync.RequestID(-1), + RequestState: "graphsync state unknown", + IsCurrentChannelRequest: false, + ChannelID: &channelID, + ChannelState: &cs, + Diagnostics: []string{"data transfer with no open transport channel, cannot determine linked graphsync request"}, + } + tc.transfers = append(tc.transfers, transfer) + } + } } func (sm *StorageMinerAPI) MarketPendingDeals(ctx context.Context) (api.PendingDealInfo, error) {