From cd4571f0b0d5a2a0a83f204f2f070bcb84d10c15 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov <anton.evangelatov@gmail.com> Date: Tue, 20 Jul 2021 11:49:59 +0200 Subject: [PATCH 1/2] wrap close(wait) with sync.Once to avoid panic --- lib/rpcenc/reader.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/lib/rpcenc/reader.go b/lib/rpcenc/reader.go index 8bd512705..f78e6123b 100644 --- a/lib/rpcenc/reader.go +++ b/lib/rpcenc/reader.go @@ -80,19 +80,24 @@ func ReaderParamEncoder(addr string) jsonrpc.Option { type waitReadCloser struct { io.ReadCloser - wait chan struct{} + wait chan struct{} + closeOnce sync.Once } func (w *waitReadCloser) Read(p []byte) (int, error) { n, err := w.ReadCloser.Read(p) if err != nil { - close(w.wait) + w.closeOnce.Do(func() { + close(w.wait) + }) } return n, err } func (w *waitReadCloser) Close() error { - close(w.wait) + w.closeOnce.Do(func() { + close(w.wait) + }) return w.ReadCloser.Close() } From 8413c080e3c9f3f5a64a3d33a6cba15972f8e4c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= <raul@protocol.ai> Date: Tue, 20 Jul 2021 14:21:09 +0100 Subject: [PATCH 2/2] add godocs. --- lib/rpcenc/reader.go | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/lib/rpcenc/reader.go b/lib/rpcenc/reader.go index f78e6123b..23944af6c 100644 --- a/lib/rpcenc/reader.go +++ b/lib/rpcenc/reader.go @@ -78,32 +78,38 @@ func ReaderParamEncoder(addr string) jsonrpc.Option { }) } -type waitReadCloser struct { +// watchReadCloser watches the ReadCloser and closes the watch channel when +// either: (1) the ReaderCloser fails on Read (including with a benign error +// like EOF), or (2) when Close is called. +// +// Use it be notified of terminal states, in situations where a Read failure (or +// EOF) is considered a terminal state too (besides Close). +type watchReadCloser struct { io.ReadCloser - wait chan struct{} + watch chan struct{} closeOnce sync.Once } -func (w *waitReadCloser) Read(p []byte) (int, error) { +func (w *watchReadCloser) Read(p []byte) (int, error) { n, err := w.ReadCloser.Read(p) if err != nil { w.closeOnce.Do(func() { - close(w.wait) + close(w.watch) }) } return n, err } -func (w *waitReadCloser) Close() error { +func (w *watchReadCloser) Close() error { w.closeOnce.Do(func() { - close(w.wait) + close(w.watch) }) return w.ReadCloser.Close() } func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) { var readersLk sync.Mutex - readers := map[uuid.UUID]chan *waitReadCloser{} + readers := map[uuid.UUID]chan *watchReadCloser{} hnd := func(resp http.ResponseWriter, req *http.Request) { strId := path.Base(req.URL.Path) @@ -116,14 +122,14 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) { readersLk.Lock() ch, found := readers[u] if !found { - ch = make(chan *waitReadCloser) + ch = make(chan *watchReadCloser) readers[u] = ch } readersLk.Unlock() - wr := &waitReadCloser{ + wr := &watchReadCloser{ ReadCloser: req.Body, - wait: make(chan struct{}), + watch: make(chan struct{}), } tctx, cancel := context.WithTimeout(req.Context(), Timeout) @@ -139,7 +145,9 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) { } select { - case <-wr.wait: + case <-wr.watch: + // TODO should we check if we failed the Read, and if so + // return an HTTP 500? i.e. turn watch into a chan error? case <-req.Context().Done(): log.Errorf("context error in reader stream handler (2): %v", req.Context().Err()) resp.WriteHeader(500) @@ -172,7 +180,7 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) { readersLk.Lock() ch, found := readers[u] if !found { - ch = make(chan *waitReadCloser) + ch = make(chan *watchReadCloser) readers[u] = ch } readersLk.Unlock()