Merge branch 'master' into fix/6786-rest
This commit is contained in:
commit
2c5a867eab
4
extern/storage-sealing/currentdealinfo.go
vendored
4
extern/storage-sealing/currentdealinfo.go
vendored
@ -69,6 +69,10 @@ func (mgr *CurrentDealInfoManager) dealIDFromPublishDealsMsg(ctx context.Context
|
|||||||
return dealID, nil, xerrors.Errorf("looking for publish deal message %s: search msg failed: %w", publishCid, err)
|
return dealID, nil, xerrors.Errorf("looking for publish deal message %s: search msg failed: %w", publishCid, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if lookup == nil {
|
||||||
|
return dealID, nil, xerrors.Errorf("looking for publish deal message %s: not found", publishCid)
|
||||||
|
}
|
||||||
|
|
||||||
if lookup.Receipt.ExitCode != exitcode.Ok {
|
if lookup.Receipt.ExitCode != exitcode.Ok {
|
||||||
return dealID, nil, xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", publishCid, lookup.Receipt.ExitCode)
|
return dealID, nil, xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", publishCid, lookup.Receipt.ExitCode)
|
||||||
}
|
}
|
||||||
|
@ -25,7 +25,7 @@ import (
|
|||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
var errNotFound = errors.New("Could not find")
|
var errNotFound = errors.New("could not find")
|
||||||
|
|
||||||
func TestGetCurrentDealInfo(t *testing.T) {
|
func TestGetCurrentDealInfo(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
@ -180,6 +180,12 @@ func TestGetCurrentDealInfo(t *testing.T) {
|
|||||||
expectedDealID: zeroDealID,
|
expectedDealID: zeroDealID,
|
||||||
expectedError: xerrors.Errorf("looking for publish deal message %s: search msg failed: something went wrong", dummyCid),
|
expectedError: xerrors.Errorf("looking for publish deal message %s: search msg failed: something went wrong", dummyCid),
|
||||||
},
|
},
|
||||||
|
"search message not found": {
|
||||||
|
publishCid: dummyCid,
|
||||||
|
targetProposal: &proposal,
|
||||||
|
expectedDealID: zeroDealID,
|
||||||
|
expectedError: xerrors.Errorf("looking for publish deal message %s: not found", dummyCid),
|
||||||
|
},
|
||||||
"return code not ok": {
|
"return code not ok": {
|
||||||
publishCid: dummyCid,
|
publishCid: dummyCid,
|
||||||
searchMessageLookup: &MsgLookup{
|
searchMessageLookup: &MsgLookup{
|
||||||
|
@ -78,27 +78,38 @@ func ReaderParamEncoder(addr string) jsonrpc.Option {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
type waitReadCloser struct {
|
// watchReadCloser watches the ReadCloser and closes the watch channel when
|
||||||
|
// either: (1) the ReaderCloser fails on Read (including with a benign error
|
||||||
|
// like EOF), or (2) when Close is called.
|
||||||
|
//
|
||||||
|
// Use it be notified of terminal states, in situations where a Read failure (or
|
||||||
|
// EOF) is considered a terminal state too (besides Close).
|
||||||
|
type watchReadCloser struct {
|
||||||
io.ReadCloser
|
io.ReadCloser
|
||||||
wait chan struct{}
|
watch chan struct{}
|
||||||
|
closeOnce sync.Once
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *waitReadCloser) Read(p []byte) (int, error) {
|
func (w *watchReadCloser) Read(p []byte) (int, error) {
|
||||||
n, err := w.ReadCloser.Read(p)
|
n, err := w.ReadCloser.Read(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
close(w.wait)
|
w.closeOnce.Do(func() {
|
||||||
|
close(w.watch)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *waitReadCloser) Close() error {
|
func (w *watchReadCloser) Close() error {
|
||||||
close(w.wait)
|
w.closeOnce.Do(func() {
|
||||||
|
close(w.watch)
|
||||||
|
})
|
||||||
return w.ReadCloser.Close()
|
return w.ReadCloser.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) {
|
func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) {
|
||||||
var readersLk sync.Mutex
|
var readersLk sync.Mutex
|
||||||
readers := map[uuid.UUID]chan *waitReadCloser{}
|
readers := map[uuid.UUID]chan *watchReadCloser{}
|
||||||
|
|
||||||
hnd := func(resp http.ResponseWriter, req *http.Request) {
|
hnd := func(resp http.ResponseWriter, req *http.Request) {
|
||||||
strId := path.Base(req.URL.Path)
|
strId := path.Base(req.URL.Path)
|
||||||
@ -111,14 +122,14 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) {
|
|||||||
readersLk.Lock()
|
readersLk.Lock()
|
||||||
ch, found := readers[u]
|
ch, found := readers[u]
|
||||||
if !found {
|
if !found {
|
||||||
ch = make(chan *waitReadCloser)
|
ch = make(chan *watchReadCloser)
|
||||||
readers[u] = ch
|
readers[u] = ch
|
||||||
}
|
}
|
||||||
readersLk.Unlock()
|
readersLk.Unlock()
|
||||||
|
|
||||||
wr := &waitReadCloser{
|
wr := &watchReadCloser{
|
||||||
ReadCloser: req.Body,
|
ReadCloser: req.Body,
|
||||||
wait: make(chan struct{}),
|
watch: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
tctx, cancel := context.WithTimeout(req.Context(), Timeout)
|
tctx, cancel := context.WithTimeout(req.Context(), Timeout)
|
||||||
@ -134,7 +145,9 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-wr.wait:
|
case <-wr.watch:
|
||||||
|
// TODO should we check if we failed the Read, and if so
|
||||||
|
// return an HTTP 500? i.e. turn watch into a chan error?
|
||||||
case <-req.Context().Done():
|
case <-req.Context().Done():
|
||||||
log.Errorf("context error in reader stream handler (2): %v", req.Context().Err())
|
log.Errorf("context error in reader stream handler (2): %v", req.Context().Err())
|
||||||
resp.WriteHeader(500)
|
resp.WriteHeader(500)
|
||||||
@ -167,7 +180,7 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) {
|
|||||||
readersLk.Lock()
|
readersLk.Lock()
|
||||||
ch, found := readers[u]
|
ch, found := readers[u]
|
||||||
if !found {
|
if !found {
|
||||||
ch = make(chan *waitReadCloser)
|
ch = make(chan *watchReadCloser)
|
||||||
readers[u] = ch
|
readers[u] = ch
|
||||||
}
|
}
|
||||||
readersLk.Unlock()
|
readersLk.Unlock()
|
||||||
|
Loading…
Reference in New Issue
Block a user