Merge pull request #6800 from filecoin-project/nonsense/fix-double-close-panic

This commit is contained in:
raulk 2021-07-21 00:02:12 +01:00 committed by GitHub
commit c1f0ebbefa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -78,27 +78,38 @@ func ReaderParamEncoder(addr string) jsonrpc.Option {
}) })
} }
type waitReadCloser struct { // watchReadCloser watches the ReadCloser and closes the watch channel when
// either: (1) the ReaderCloser fails on Read (including with a benign error
// like EOF), or (2) when Close is called.
//
// Use it be notified of terminal states, in situations where a Read failure (or
// EOF) is considered a terminal state too (besides Close).
type watchReadCloser struct {
io.ReadCloser io.ReadCloser
wait chan struct{} watch chan struct{}
closeOnce sync.Once
} }
func (w *waitReadCloser) Read(p []byte) (int, error) { func (w *watchReadCloser) Read(p []byte) (int, error) {
n, err := w.ReadCloser.Read(p) n, err := w.ReadCloser.Read(p)
if err != nil { if err != nil {
close(w.wait) w.closeOnce.Do(func() {
close(w.watch)
})
} }
return n, err return n, err
} }
func (w *waitReadCloser) Close() error { func (w *watchReadCloser) Close() error {
close(w.wait) w.closeOnce.Do(func() {
close(w.watch)
})
return w.ReadCloser.Close() return w.ReadCloser.Close()
} }
func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) { func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) {
var readersLk sync.Mutex var readersLk sync.Mutex
readers := map[uuid.UUID]chan *waitReadCloser{} readers := map[uuid.UUID]chan *watchReadCloser{}
hnd := func(resp http.ResponseWriter, req *http.Request) { hnd := func(resp http.ResponseWriter, req *http.Request) {
strId := path.Base(req.URL.Path) strId := path.Base(req.URL.Path)
@ -111,14 +122,14 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) {
readersLk.Lock() readersLk.Lock()
ch, found := readers[u] ch, found := readers[u]
if !found { if !found {
ch = make(chan *waitReadCloser) ch = make(chan *watchReadCloser)
readers[u] = ch readers[u] = ch
} }
readersLk.Unlock() readersLk.Unlock()
wr := &waitReadCloser{ wr := &watchReadCloser{
ReadCloser: req.Body, ReadCloser: req.Body,
wait: make(chan struct{}), watch: make(chan struct{}),
} }
tctx, cancel := context.WithTimeout(req.Context(), Timeout) tctx, cancel := context.WithTimeout(req.Context(), Timeout)
@ -134,7 +145,9 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) {
} }
select { select {
case <-wr.wait: case <-wr.watch:
// TODO should we check if we failed the Read, and if so
// return an HTTP 500? i.e. turn watch into a chan error?
case <-req.Context().Done(): case <-req.Context().Done():
log.Errorf("context error in reader stream handler (2): %v", req.Context().Err()) log.Errorf("context error in reader stream handler (2): %v", req.Context().Err())
resp.WriteHeader(500) resp.WriteHeader(500)
@ -167,7 +180,7 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) {
readersLk.Lock() readersLk.Lock()
ch, found := readers[u] ch, found := readers[u]
if !found { if !found {
ch = make(chan *waitReadCloser) ch = make(chan *watchReadCloser)
readers[u] = ch readers[u] = ch
} }
readersLk.Unlock() readersLk.Unlock()