From d1f419c9d1734582f23b41f1cc670ace4aec0633 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 20 Jan 2020 19:21:16 +0100 Subject: [PATCH 1/2] jsonrpc: Fix channel closing race --- lib/jsonrpc/handler.go | 11 +++++++-- lib/jsonrpc/websocket.go | 49 ++++++++++++++++++++++++++++------------ 2 files changed, 43 insertions(+), 17 deletions(-) diff --git a/lib/jsonrpc/handler.go b/lib/jsonrpc/handler.go index e771b172e..11eb5efeb 100644 --- a/lib/jsonrpc/handler.go +++ b/lib/jsonrpc/handler.go @@ -99,7 +99,7 @@ func (h handlers) register(namespace string, r interface{}) { // Handle type rpcErrFunc func(w func(func(io.Writer)), req *request, code int, err error) -type chanOut func(reflect.Value) interface{} +type chanOut func(reflect.Value) (interface{}, error) func (h handlers) handleReader(ctx context.Context, r io.Reader, w io.Writer, rpcError rpcErrFunc) { wf := func(cb func(io.Writer)) { @@ -229,7 +229,14 @@ func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer // channel messages before we send this response //noinspection GoNilness // already checked above - resp.Result = chOut(callResult[handler.valOut]) + resp.Result, err = chOut(callResult[handler.valOut]) + if err != nil { + log.Warnf("failed to setup channel in RPC call to '%s': %+v", req.Method, err) + resp.Error = &respError{ + Code: 1, + Message: err.(error).Error(), + } + } } if err := json.NewEncoder(w).Encode(resp); err != nil { diff --git a/lib/jsonrpc/websocket.go b/lib/jsonrpc/websocket.go index 23da51598..e56781e14 100644 --- a/lib/jsonrpc/websocket.go +++ b/lib/jsonrpc/websocket.go @@ -11,6 +11,7 @@ import ( "sync/atomic" "github.com/gorilla/websocket" + "golang.org/x/xerrors" ) const wsCancel = "xrpc.cancel" @@ -134,25 +135,30 @@ func (c *wsConn) sendRequest(req request) { // (forwards channel messages to client) func (c *wsConn) handleOutChans() { regV := reflect.ValueOf(c.registerCh) + exitV := reflect.ValueOf(c.exiting) cases := []reflect.SelectCase{ { // registration chan always 0 Dir: reflect.SelectRecv, Chan: regV, }, + { // exit chan always 1 + Dir: reflect.SelectRecv, + Chan: exitV, + }, } + internal := len(cases) var caseToID []uint64 for { chosen, val, ok := reflect.Select(cases) - if chosen == 0 { // control channel + switch chosen { + case 0: // control channel if !ok { // control channel closed - signals closed connection - // - // We're not closing any channels as we're on receiving end. - // Also, context cancellation below should take care of any running - // requests + // This shouldn't happen, instead the exiting channel should get closed + log.Warn("control channel closed") return } @@ -164,21 +170,32 @@ func (c *wsConn) handleOutChans() { Chan: registration.ch, }) + continue + case 1: // exiting channel + if !ok { + // exiting channel closed - signals closed connection + // + // We're not closing any channels as we're on receiving end. + // Also, context cancellation below should take care of any running + // requests + return + } + log.Warn("exiting channel received a message") continue } if !ok { // Output channel closed, cleanup, and tell remote that this happened - n := len(caseToID) + n := len(cases) - 1 if n > 0 { cases[chosen] = cases[n] - caseToID[chosen-1] = caseToID[n-1] + caseToID[chosen-internal] = caseToID[n-internal] } - id := caseToID[chosen-1] + id := caseToID[chosen-internal] cases = cases[:n] - caseToID = caseToID[:n-1] + caseToID = caseToID[:n-internal] c.sendRequest(request{ Jsonrpc: "2.0", @@ -194,24 +211,27 @@ func (c *wsConn) handleOutChans() { Jsonrpc: "2.0", ID: nil, // notification Method: chValue, - Params: []param{{v: reflect.ValueOf(caseToID[chosen-1])}, {v: val}}, + Params: []param{{v: reflect.ValueOf(caseToID[chosen-internal])}, {v: val}}, }) } } // handleChanOut registers output channel for forwarding to client -func (c *wsConn) handleChanOut(ch reflect.Value) interface{} { +func (c *wsConn) handleChanOut(ch reflect.Value) (interface{}, error) { c.spawnOutChanHandlerOnce.Do(func() { go c.handleOutChans() }) id := atomic.AddUint64(&c.chanCtr, 1) - c.registerCh <- outChanReg{ + select { + case c.registerCh <- outChanReg{ id: id, ch: ch, + }: + return id, nil + case <-c.exiting: + return nil, xerrors.New("connection closing") } - - return id } // // @@ -389,7 +409,6 @@ func (c *wsConn) handleWsConn(ctx context.Context) { c.chanHandlers = map[uint64]func(m []byte, ok bool){} c.registerCh = make(chan outChanReg) - defer close(c.registerCh) defer close(c.exiting) // //// From fbc0330fa817a36d14e7063531538289e136d545 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 21 Jan 2020 14:48:17 +0100 Subject: [PATCH 2/2] jsonrpc: Fix channel registration deadlock --- lib/jsonrpc/handler.go | 32 ++++++++++++++------------- lib/jsonrpc/rpc_test.go | 48 +++++++++++++++++++++++++++++++++++++++- lib/jsonrpc/websocket.go | 35 +++++++++++++++++++++-------- 3 files changed, 90 insertions(+), 25 deletions(-) diff --git a/lib/jsonrpc/handler.go b/lib/jsonrpc/handler.go index 11eb5efeb..0edc25fa9 100644 --- a/lib/jsonrpc/handler.go +++ b/lib/jsonrpc/handler.go @@ -99,7 +99,7 @@ func (h handlers) register(namespace string, r interface{}) { // Handle type rpcErrFunc func(w func(func(io.Writer)), req *request, code int, err error) -type chanOut func(reflect.Value) (interface{}, error) +type chanOut func(reflect.Value, int64) error func (h handlers) handleReader(ctx context.Context, r io.Reader, w io.Writer, rpcError rpcErrFunc) { wf := func(cb func(io.Writer)) { @@ -222,23 +222,25 @@ func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer if handler.valOut != -1 { resp.Result = callResult[handler.valOut].Interface() } + if resp.Result != nil && reflect.TypeOf(resp.Result).Kind() == reflect.Chan { + // Channel responses are sent from channel control goroutine. + // Sending responses here could cause deadlocks on writeLk, or allow + // sending channel messages before this rpc call returns - w(func(w io.Writer) { - if resp.Result != nil && reflect.TypeOf(resp.Result).Kind() == reflect.Chan { - // this must happen in the writer callback, otherwise we may start sending - // channel messages before we send this response - - //noinspection GoNilness // already checked above - resp.Result, err = chOut(callResult[handler.valOut]) - if err != nil { - log.Warnf("failed to setup channel in RPC call to '%s': %+v", req.Method, err) - resp.Error = &respError{ - Code: 1, - Message: err.(error).Error(), - } - } + //noinspection GoNilness // already checked above + err = chOut(callResult[handler.valOut], *req.ID) + if err == nil { + return // channel goroutine handles responding } + log.Warnf("failed to setup channel in RPC call to '%s': %+v", req.Method, err) + resp.Error = &respError{ + Code: 1, + Message: err.(error).Error(), + } + } + + w(func(w io.Writer) { if err := json.NewEncoder(w).Encode(resp); err != nil { log.Error(err) return diff --git a/lib/jsonrpc/rpc_test.go b/lib/jsonrpc/rpc_test.go index 3073816a6..45601f6e0 100644 --- a/lib/jsonrpc/rpc_test.go +++ b/lib/jsonrpc/rpc_test.go @@ -375,5 +375,51 @@ func TestChan(t *testing.T) { serverHandler.wait <- struct{}{} _, ok = <-sub require.Equal(t, false, ok) - +} + +func TestControlChanDeadlock(t *testing.T) { + for r := 0; r < 20; r++ { + testControlChanDeadlock(t) + } +} + +func testControlChanDeadlock(t *testing.T) { + var client struct { + Sub func(context.Context, int, int) (<-chan int, error) + } + + n := 5000 + + serverHandler := &ChanHandler{ + wait: make(chan struct{}, n), + } + + rpcServer := NewServer() + rpcServer.Register("ChanHandler", serverHandler) + + testServ := httptest.NewServer(rpcServer) + defer testServ.Close() + + closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "ChanHandler", &client, nil) + require.NoError(t, err) + + defer closer() + + for i := 0; i < n; i++ { + serverHandler.wait <- struct{}{} + } + + ctx, _ := context.WithCancel(context.Background()) + + sub, err := client.Sub(ctx, 1, -1) + require.NoError(t, err) + + go func() { + for i := 0; i < n; i++ { + require.Equal(t, i+1, <-sub) + } + }() + + _, err = client.Sub(ctx, 2, -1) + require.NoError(t, err) } diff --git a/lib/jsonrpc/websocket.go b/lib/jsonrpc/websocket.go index e56781e14..da4f59d6e 100644 --- a/lib/jsonrpc/websocket.go +++ b/lib/jsonrpc/websocket.go @@ -34,8 +34,10 @@ type frame struct { } type outChanReg struct { - id uint64 - ch reflect.Value + reqID int64 + + chID uint64 + ch reflect.Value } type wsConn struct { @@ -154,7 +156,7 @@ func (c *wsConn) handleOutChans() { chosen, val, ok := reflect.Select(cases) switch chosen { - case 0: // control channel + case 0: // registration channel if !ok { // control channel closed - signals closed connection // This shouldn't happen, instead the exiting channel should get closed @@ -164,12 +166,25 @@ func (c *wsConn) handleOutChans() { registration := val.Interface().(outChanReg) - caseToID = append(caseToID, registration.id) + caseToID = append(caseToID, registration.chID) cases = append(cases, reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: registration.ch, }) + c.nextWriter(func(w io.Writer) { + resp := &response{ + Jsonrpc: "2.0", + ID: registration.reqID, + Result: registration.chID, + } + + if err := json.NewEncoder(w).Encode(resp); err != nil { + log.Error(err) + return + } + }) + continue case 1: // exiting channel if !ok { @@ -217,7 +232,7 @@ func (c *wsConn) handleOutChans() { } // handleChanOut registers output channel for forwarding to client -func (c *wsConn) handleChanOut(ch reflect.Value) (interface{}, error) { +func (c *wsConn) handleChanOut(ch reflect.Value, req int64) error { c.spawnOutChanHandlerOnce.Do(func() { go c.handleOutChans() }) @@ -225,12 +240,14 @@ func (c *wsConn) handleChanOut(ch reflect.Value) (interface{}, error) { select { case c.registerCh <- outChanReg{ - id: id, - ch: ch, + reqID: req, + + chID: id, + ch: ch, }: - return id, nil + return nil case <-c.exiting: - return nil, xerrors.New("connection closing") + return xerrors.New("connection closing") } }