diff --git a/lib/rpcenc/reader.go b/lib/rpcenc/reader.go index 8bd512705..23944af6c 100644 --- a/lib/rpcenc/reader.go +++ b/lib/rpcenc/reader.go @@ -78,27 +78,38 @@ func ReaderParamEncoder(addr string) jsonrpc.Option { }) } -type waitReadCloser struct { +// watchReadCloser watches the ReadCloser and closes the watch channel when +// either: (1) the ReaderCloser fails on Read (including with a benign error +// like EOF), or (2) when Close is called. +// +// Use it be notified of terminal states, in situations where a Read failure (or +// EOF) is considered a terminal state too (besides Close). +type watchReadCloser struct { io.ReadCloser - wait chan struct{} + watch chan struct{} + closeOnce sync.Once } -func (w *waitReadCloser) Read(p []byte) (int, error) { +func (w *watchReadCloser) Read(p []byte) (int, error) { n, err := w.ReadCloser.Read(p) if err != nil { - close(w.wait) + w.closeOnce.Do(func() { + close(w.watch) + }) } return n, err } -func (w *waitReadCloser) Close() error { - close(w.wait) +func (w *watchReadCloser) Close() error { + w.closeOnce.Do(func() { + close(w.watch) + }) return w.ReadCloser.Close() } func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) { var readersLk sync.Mutex - readers := map[uuid.UUID]chan *waitReadCloser{} + readers := map[uuid.UUID]chan *watchReadCloser{} hnd := func(resp http.ResponseWriter, req *http.Request) { strId := path.Base(req.URL.Path) @@ -111,14 +122,14 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) { readersLk.Lock() ch, found := readers[u] if !found { - ch = make(chan *waitReadCloser) + ch = make(chan *watchReadCloser) readers[u] = ch } readersLk.Unlock() - wr := &waitReadCloser{ + wr := &watchReadCloser{ ReadCloser: req.Body, - wait: make(chan struct{}), + watch: make(chan struct{}), } tctx, cancel := context.WithTimeout(req.Context(), Timeout) @@ -134,7 +145,9 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) { } select { - case <-wr.wait: + case <-wr.watch: + // TODO should we check if we failed the Read, and if so + // return an HTTP 500? i.e. turn watch into a chan error? case <-req.Context().Done(): log.Errorf("context error in reader stream handler (2): %v", req.Context().Err()) resp.WriteHeader(500) @@ -167,7 +180,7 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) { readersLk.Lock() ch, found := readers[u] if !found { - ch = make(chan *waitReadCloser) + ch = make(chan *watchReadCloser) readers[u] = ch } readersLk.Unlock()