jsonrpc: Fix channel registration deadlock
This commit is contained in:
parent
d1f419c9d1
commit
fbc0330fa8
@ -99,7 +99,7 @@ func (h handlers) register(namespace string, r interface{}) {
|
|||||||
// Handle
|
// Handle
|
||||||
|
|
||||||
type rpcErrFunc func(w func(func(io.Writer)), req *request, code int, err error)
|
type rpcErrFunc func(w func(func(io.Writer)), req *request, code int, err error)
|
||||||
type chanOut func(reflect.Value) (interface{}, error)
|
type chanOut func(reflect.Value, int64) error
|
||||||
|
|
||||||
func (h handlers) handleReader(ctx context.Context, r io.Reader, w io.Writer, rpcError rpcErrFunc) {
|
func (h handlers) handleReader(ctx context.Context, r io.Reader, w io.Writer, rpcError rpcErrFunc) {
|
||||||
wf := func(cb func(io.Writer)) {
|
wf := func(cb func(io.Writer)) {
|
||||||
@ -222,23 +222,25 @@ func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer
|
|||||||
if handler.valOut != -1 {
|
if handler.valOut != -1 {
|
||||||
resp.Result = callResult[handler.valOut].Interface()
|
resp.Result = callResult[handler.valOut].Interface()
|
||||||
}
|
}
|
||||||
|
if resp.Result != nil && reflect.TypeOf(resp.Result).Kind() == reflect.Chan {
|
||||||
|
// Channel responses are sent from channel control goroutine.
|
||||||
|
// Sending responses here could cause deadlocks on writeLk, or allow
|
||||||
|
// sending channel messages before this rpc call returns
|
||||||
|
|
||||||
w(func(w io.Writer) {
|
//noinspection GoNilness // already checked above
|
||||||
if resp.Result != nil && reflect.TypeOf(resp.Result).Kind() == reflect.Chan {
|
err = chOut(callResult[handler.valOut], *req.ID)
|
||||||
// this must happen in the writer callback, otherwise we may start sending
|
if err == nil {
|
||||||
// channel messages before we send this response
|
return // channel goroutine handles responding
|
||||||
|
|
||||||
//noinspection GoNilness // already checked above
|
|
||||||
resp.Result, err = chOut(callResult[handler.valOut])
|
|
||||||
if err != nil {
|
|
||||||
log.Warnf("failed to setup channel in RPC call to '%s': %+v", req.Method, err)
|
|
||||||
resp.Error = &respError{
|
|
||||||
Code: 1,
|
|
||||||
Message: err.(error).Error(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Warnf("failed to setup channel in RPC call to '%s': %+v", req.Method, err)
|
||||||
|
resp.Error = &respError{
|
||||||
|
Code: 1,
|
||||||
|
Message: err.(error).Error(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
w(func(w io.Writer) {
|
||||||
if err := json.NewEncoder(w).Encode(resp); err != nil {
|
if err := json.NewEncoder(w).Encode(resp); err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
return
|
return
|
||||||
|
@ -375,5 +375,51 @@ func TestChan(t *testing.T) {
|
|||||||
serverHandler.wait <- struct{}{}
|
serverHandler.wait <- struct{}{}
|
||||||
_, ok = <-sub
|
_, ok = <-sub
|
||||||
require.Equal(t, false, ok)
|
require.Equal(t, false, ok)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestControlChanDeadlock(t *testing.T) {
|
||||||
|
for r := 0; r < 20; r++ {
|
||||||
|
testControlChanDeadlock(t)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testControlChanDeadlock(t *testing.T) {
|
||||||
|
var client struct {
|
||||||
|
Sub func(context.Context, int, int) (<-chan int, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
n := 5000
|
||||||
|
|
||||||
|
serverHandler := &ChanHandler{
|
||||||
|
wait: make(chan struct{}, n),
|
||||||
|
}
|
||||||
|
|
||||||
|
rpcServer := NewServer()
|
||||||
|
rpcServer.Register("ChanHandler", serverHandler)
|
||||||
|
|
||||||
|
testServ := httptest.NewServer(rpcServer)
|
||||||
|
defer testServ.Close()
|
||||||
|
|
||||||
|
closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "ChanHandler", &client, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
defer closer()
|
||||||
|
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
serverHandler.wait <- struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, _ := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
sub, err := client.Sub(ctx, 1, -1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
require.Equal(t, i+1, <-sub)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
_, err = client.Sub(ctx, 2, -1)
|
||||||
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
@ -34,8 +34,10 @@ type frame struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type outChanReg struct {
|
type outChanReg struct {
|
||||||
id uint64
|
reqID int64
|
||||||
ch reflect.Value
|
|
||||||
|
chID uint64
|
||||||
|
ch reflect.Value
|
||||||
}
|
}
|
||||||
|
|
||||||
type wsConn struct {
|
type wsConn struct {
|
||||||
@ -154,7 +156,7 @@ func (c *wsConn) handleOutChans() {
|
|||||||
chosen, val, ok := reflect.Select(cases)
|
chosen, val, ok := reflect.Select(cases)
|
||||||
|
|
||||||
switch chosen {
|
switch chosen {
|
||||||
case 0: // control channel
|
case 0: // registration channel
|
||||||
if !ok {
|
if !ok {
|
||||||
// control channel closed - signals closed connection
|
// control channel closed - signals closed connection
|
||||||
// This shouldn't happen, instead the exiting channel should get closed
|
// This shouldn't happen, instead the exiting channel should get closed
|
||||||
@ -164,12 +166,25 @@ func (c *wsConn) handleOutChans() {
|
|||||||
|
|
||||||
registration := val.Interface().(outChanReg)
|
registration := val.Interface().(outChanReg)
|
||||||
|
|
||||||
caseToID = append(caseToID, registration.id)
|
caseToID = append(caseToID, registration.chID)
|
||||||
cases = append(cases, reflect.SelectCase{
|
cases = append(cases, reflect.SelectCase{
|
||||||
Dir: reflect.SelectRecv,
|
Dir: reflect.SelectRecv,
|
||||||
Chan: registration.ch,
|
Chan: registration.ch,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
c.nextWriter(func(w io.Writer) {
|
||||||
|
resp := &response{
|
||||||
|
Jsonrpc: "2.0",
|
||||||
|
ID: registration.reqID,
|
||||||
|
Result: registration.chID,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := json.NewEncoder(w).Encode(resp); err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
continue
|
continue
|
||||||
case 1: // exiting channel
|
case 1: // exiting channel
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -217,7 +232,7 @@ func (c *wsConn) handleOutChans() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// handleChanOut registers output channel for forwarding to client
|
// handleChanOut registers output channel for forwarding to client
|
||||||
func (c *wsConn) handleChanOut(ch reflect.Value) (interface{}, error) {
|
func (c *wsConn) handleChanOut(ch reflect.Value, req int64) error {
|
||||||
c.spawnOutChanHandlerOnce.Do(func() {
|
c.spawnOutChanHandlerOnce.Do(func() {
|
||||||
go c.handleOutChans()
|
go c.handleOutChans()
|
||||||
})
|
})
|
||||||
@ -225,12 +240,14 @@ func (c *wsConn) handleChanOut(ch reflect.Value) (interface{}, error) {
|
|||||||
|
|
||||||
select {
|
select {
|
||||||
case c.registerCh <- outChanReg{
|
case c.registerCh <- outChanReg{
|
||||||
id: id,
|
reqID: req,
|
||||||
ch: ch,
|
|
||||||
|
chID: id,
|
||||||
|
ch: ch,
|
||||||
}:
|
}:
|
||||||
return id, nil
|
return nil
|
||||||
case <-c.exiting:
|
case <-c.exiting:
|
||||||
return nil, xerrors.New("connection closing")
|
return xerrors.New("connection closing")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user