Merge pull request #3324 from filecoin-project/fix/retrieval-closing-panic

Resolve retrieval close event panic
This commit is contained in:
Łukasz Magiera 2020-08-26 22:15:21 +02:00 committed by GitHub
commit c4e32fac1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -432,6 +432,45 @@ func (a *API) ClientRetrieveWithEvents(ctx context.Context, order api.RetrievalO
return events, nil
}
type retrievalSubscribeEvent struct {
event rm.ClientEvent
state rm.ClientDealState
}
func readSubscribeEvents(ctx context.Context, subscribeEvents chan retrievalSubscribeEvent, events chan marketevents.RetrievalEvent) error {
for {
var subscribeEvent retrievalSubscribeEvent
select {
case <-ctx.Done():
return xerrors.New("Retrieval Timed Out")
case subscribeEvent = <-subscribeEvents:
}
select {
case <-ctx.Done():
return xerrors.New("Retrieval Timed Out")
case events <- marketevents.RetrievalEvent{
Event: subscribeEvent.event,
Status: subscribeEvent.state.Status,
BytesReceived: subscribeEvent.state.TotalReceived,
FundsSpent: subscribeEvent.state.FundsSpent,
}:
}
state := subscribeEvent.state
switch state.Status {
case rm.DealStatusCompleted:
return nil
case rm.DealStatusRejected:
return xerrors.Errorf("Retrieval Proposal Rejected: %s", state.Message)
case
rm.DealStatusDealNotFound,
rm.DealStatusErrored:
return xerrors.Errorf("Retrieval Error: %s", state.Message)
}
}
}
func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef, events chan marketevents.RetrievalEvent) {
defer close(events)
@ -467,33 +506,15 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
return err
}*/
retrievalResult := make(chan error, 1)
var dealId retrievalmarket.DealID
var dealID retrievalmarket.DealID
subscribeEvents := make(chan retrievalSubscribeEvent, 1)
subscribeCtx, cancel := context.WithCancel(ctx)
defer cancel()
unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) {
if state.PayloadCID.Equals(order.Root) && state.ID == dealId {
if state.PayloadCID.Equals(order.Root) && state.ID == dealID {
select {
case <-ctx.Done():
return
case events <- marketevents.RetrievalEvent{
Event: event,
Status: state.Status,
BytesReceived: state.TotalReceived,
FundsSpent: state.FundsSpent,
}:
}
switch state.Status {
case rm.DealStatusCompleted:
retrievalResult <- nil
case rm.DealStatusRejected:
retrievalResult <- xerrors.Errorf("Retrieval Proposal Rejected: %s", state.Message)
case
rm.DealStatusDealNotFound,
rm.DealStatusErrored:
retrievalResult <- xerrors.Errorf("Retrieval Error: %s", state.Message)
case <-subscribeCtx.Done():
case subscribeEvents <- retrievalSubscribeEvent{event, state}:
}
}
})
@ -516,7 +537,7 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
_ = a.RetrievalStoreMgr.ReleaseStore(store)
}()
dealId, err = a.Retrieval.Retrieve(
dealID, err = a.Retrieval.Retrieve(
ctx,
order.Root,
params,
@ -531,18 +552,13 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
return
}
select {
case <-ctx.Done():
unsubscribe()
finish(xerrors.New("Retrieval Timed Out"))
return
case err := <-retrievalResult:
err = readSubscribeEvents(ctx, subscribeEvents, events)
unsubscribe()
if err != nil {
finish(xerrors.Errorf("Retrieve: %w", err))
return
}
}
// If ref is nil, it only fetches the data into the configured blockstore.
if ref == nil {