Merge pull request #2112 from laser/bugs/2111-hanging-client-retrieve

return error if retrieval deal rejected
This commit is contained in:
Łukasz Magiera 2020-06-23 21:56:02 +02:00 committed by GitHub
commit 0354275b39
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -31,7 +31,7 @@ import (
"go.uber.org/fx" "go.uber.org/fx"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket" rm "github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/sector-storage/ffiwrapper" "github.com/filecoin-project/sector-storage/ffiwrapper"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
@ -59,8 +59,8 @@ type API struct {
paych.PaychAPI paych.PaychAPI
SMDealClient storagemarket.StorageClient SMDealClient storagemarket.StorageClient
RetDiscovery retrievalmarket.PeerResolver RetDiscovery rm.PeerResolver
Retrieval retrievalmarket.RetrievalClient Retrieval rm.RetrievalClient
Chain *store.ChainStore Chain *store.ChainStore
LocalDAG dtypes.ClientDAG LocalDAG dtypes.ClientDAG
@ -202,7 +202,7 @@ func (a *API) ClientFindData(ctx context.Context, root cid.Cid) ([]api.QueryOffe
out := make([]api.QueryOffer, len(peers)) out := make([]api.QueryOffer, len(peers))
for k, p := range peers { for k, p := range peers {
out[k] = a.makeRetrievalQuery(ctx, p, root, retrievalmarket.QueryParams{}) out[k] = a.makeRetrievalQuery(ctx, p, root, rm.QueryParams{})
} }
return out, nil return out, nil
@ -213,25 +213,25 @@ func (a *API) ClientMinerQueryOffer(ctx context.Context, payload cid.Cid, miner
if err != nil { if err != nil {
return api.QueryOffer{}, err return api.QueryOffer{}, err
} }
rp := retrievalmarket.RetrievalPeer{ rp := rm.RetrievalPeer{
Address: miner, Address: miner,
ID: mi.PeerId, ID: mi.PeerId,
} }
return a.makeRetrievalQuery(ctx, rp, payload, retrievalmarket.QueryParams{}), nil return a.makeRetrievalQuery(ctx, rp, payload, rm.QueryParams{}), nil
} }
func (a *API) makeRetrievalQuery(ctx context.Context, rp retrievalmarket.RetrievalPeer, payload cid.Cid, qp retrievalmarket.QueryParams) api.QueryOffer { func (a *API) makeRetrievalQuery(ctx context.Context, rp rm.RetrievalPeer, payload cid.Cid, qp rm.QueryParams) api.QueryOffer {
queryResponse, err := a.Retrieval.Query(ctx, rp, payload, qp) queryResponse, err := a.Retrieval.Query(ctx, rp, payload, qp)
if err != nil { if err != nil {
return api.QueryOffer{Err: err.Error(), Miner: rp.Address, MinerPeerID: rp.ID} return api.QueryOffer{Err: err.Error(), Miner: rp.Address, MinerPeerID: rp.ID}
} }
var errStr string var errStr string
switch queryResponse.Status { switch queryResponse.Status {
case retrievalmarket.QueryResponseAvailable: case rm.QueryResponseAvailable:
errStr = "" errStr = ""
case retrievalmarket.QueryResponseUnavailable: case rm.QueryResponseUnavailable:
errStr = fmt.Sprintf("retrieval query offer was unavailable: %s", queryResponse.Message) errStr = fmt.Sprintf("retrieval query offer was unavailable: %s", queryResponse.Message)
case retrievalmarket.QueryResponseError: case rm.QueryResponseError:
errStr = fmt.Sprintf("retrieval query offer errored: %s", queryResponse.Message) errStr = fmt.Sprintf("retrieval query offer errored: %s", queryResponse.Message)
} }
@ -345,13 +345,35 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
retrievalResult := make(chan error, 1) retrievalResult := make(chan error, 1)
unsubscribe := a.Retrieval.SubscribeToEvents(func(event retrievalmarket.ClientEvent, state retrievalmarket.ClientDealState) { unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) {
if state.PayloadCID.Equals(order.Root) { if state.PayloadCID.Equals(order.Root) {
switch state.Status { switch state.Status {
case retrievalmarket.DealStatusFailed, retrievalmarket.DealStatusErrored: case rm.DealStatusCompleted:
retrievalResult <- xerrors.Errorf("Retrieval Error: %s", state.Message)
case retrievalmarket.DealStatusCompleted:
retrievalResult <- nil retrievalResult <- nil
case rm.DealStatusRejected:
retrievalResult <- xerrors.Errorf("Retrieval Proposal Rejected: %s", state.Message)
case
rm.DealStatusDealNotFound,
rm.DealStatusErrored,
rm.DealStatusFailed:
retrievalResult <- xerrors.Errorf("Retrieval Error: %s", state.Message)
case
rm.DealStatusAccepted,
rm.DealStatusAwaitingAcceptance,
rm.DealStatusBlocksComplete,
rm.DealStatusFinalizing,
rm.DealStatusFundsNeeded,
rm.DealStatusFundsNeededLastPayment,
rm.DealStatusNew,
rm.DealStatusOngoing,
rm.DealStatusPaymentChannelAddingFunds,
rm.DealStatusPaymentChannelAllocatingLane,
rm.DealStatusPaymentChannelCreating,
rm.DealStatusPaymentChannelReady,
rm.DealStatusVerified:
return
default:
retrievalResult <- xerrors.Errorf("Unhandled Retrieval Status: %+v", state.Status)
} }
} }
}) })
@ -361,7 +383,7 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
_, err := a.Retrieval.Retrieve( _, err := a.Retrieval.Retrieve(
ctx, ctx,
order.Root, order.Root,
retrievalmarket.NewParamsV0(ppb, order.PaymentInterval, order.PaymentIntervalIncrease), rm.NewParamsV0(ppb, order.PaymentInterval, order.PaymentIntervalIncrease),
order.Total, order.Total,
order.MinerPeerID, order.MinerPeerID,
order.Client, order.Client,