jsonrpc: Fix channel closing race
This commit is contained in:
parent
4436148fa1
commit
d1f419c9d1
@ -99,7 +99,7 @@ func (h handlers) register(namespace string, r interface{}) {
|
|||||||
// Handle
|
// Handle
|
||||||
|
|
||||||
type rpcErrFunc func(w func(func(io.Writer)), req *request, code int, err error)
|
type rpcErrFunc func(w func(func(io.Writer)), req *request, code int, err error)
|
||||||
type chanOut func(reflect.Value) interface{}
|
type chanOut func(reflect.Value) (interface{}, error)
|
||||||
|
|
||||||
func (h handlers) handleReader(ctx context.Context, r io.Reader, w io.Writer, rpcError rpcErrFunc) {
|
func (h handlers) handleReader(ctx context.Context, r io.Reader, w io.Writer, rpcError rpcErrFunc) {
|
||||||
wf := func(cb func(io.Writer)) {
|
wf := func(cb func(io.Writer)) {
|
||||||
@ -229,7 +229,14 @@ func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer
|
|||||||
// channel messages before we send this response
|
// channel messages before we send this response
|
||||||
|
|
||||||
//noinspection GoNilness // already checked above
|
//noinspection GoNilness // already checked above
|
||||||
resp.Result = chOut(callResult[handler.valOut])
|
resp.Result, err = chOut(callResult[handler.valOut])
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("failed to setup channel in RPC call to '%s': %+v", req.Method, err)
|
||||||
|
resp.Error = &respError{
|
||||||
|
Code: 1,
|
||||||
|
Message: err.(error).Error(),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := json.NewEncoder(w).Encode(resp); err != nil {
|
if err := json.NewEncoder(w).Encode(resp); err != nil {
|
||||||
|
@ -11,6 +11,7 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
)
|
)
|
||||||
|
|
||||||
const wsCancel = "xrpc.cancel"
|
const wsCancel = "xrpc.cancel"
|
||||||
@ -134,25 +135,30 @@ func (c *wsConn) sendRequest(req request) {
|
|||||||
// (forwards channel messages to client)
|
// (forwards channel messages to client)
|
||||||
func (c *wsConn) handleOutChans() {
|
func (c *wsConn) handleOutChans() {
|
||||||
regV := reflect.ValueOf(c.registerCh)
|
regV := reflect.ValueOf(c.registerCh)
|
||||||
|
exitV := reflect.ValueOf(c.exiting)
|
||||||
|
|
||||||
cases := []reflect.SelectCase{
|
cases := []reflect.SelectCase{
|
||||||
{ // registration chan always 0
|
{ // registration chan always 0
|
||||||
Dir: reflect.SelectRecv,
|
Dir: reflect.SelectRecv,
|
||||||
Chan: regV,
|
Chan: regV,
|
||||||
},
|
},
|
||||||
|
{ // exit chan always 1
|
||||||
|
Dir: reflect.SelectRecv,
|
||||||
|
Chan: exitV,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
internal := len(cases)
|
||||||
var caseToID []uint64
|
var caseToID []uint64
|
||||||
|
|
||||||
for {
|
for {
|
||||||
chosen, val, ok := reflect.Select(cases)
|
chosen, val, ok := reflect.Select(cases)
|
||||||
|
|
||||||
if chosen == 0 { // control channel
|
switch chosen {
|
||||||
|
case 0: // control channel
|
||||||
if !ok {
|
if !ok {
|
||||||
// control channel closed - signals closed connection
|
// control channel closed - signals closed connection
|
||||||
//
|
// This shouldn't happen, instead the exiting channel should get closed
|
||||||
// We're not closing any channels as we're on receiving end.
|
log.Warn("control channel closed")
|
||||||
// Also, context cancellation below should take care of any running
|
|
||||||
// requests
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -164,21 +170,32 @@ func (c *wsConn) handleOutChans() {
|
|||||||
Chan: registration.ch,
|
Chan: registration.ch,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
continue
|
||||||
|
case 1: // exiting channel
|
||||||
|
if !ok {
|
||||||
|
// exiting channel closed - signals closed connection
|
||||||
|
//
|
||||||
|
// We're not closing any channels as we're on receiving end.
|
||||||
|
// Also, context cancellation below should take care of any running
|
||||||
|
// requests
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Warn("exiting channel received a message")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
// Output channel closed, cleanup, and tell remote that this happened
|
// Output channel closed, cleanup, and tell remote that this happened
|
||||||
|
|
||||||
n := len(caseToID)
|
n := len(cases) - 1
|
||||||
if n > 0 {
|
if n > 0 {
|
||||||
cases[chosen] = cases[n]
|
cases[chosen] = cases[n]
|
||||||
caseToID[chosen-1] = caseToID[n-1]
|
caseToID[chosen-internal] = caseToID[n-internal]
|
||||||
}
|
}
|
||||||
|
|
||||||
id := caseToID[chosen-1]
|
id := caseToID[chosen-internal]
|
||||||
cases = cases[:n]
|
cases = cases[:n]
|
||||||
caseToID = caseToID[:n-1]
|
caseToID = caseToID[:n-internal]
|
||||||
|
|
||||||
c.sendRequest(request{
|
c.sendRequest(request{
|
||||||
Jsonrpc: "2.0",
|
Jsonrpc: "2.0",
|
||||||
@ -194,24 +211,27 @@ func (c *wsConn) handleOutChans() {
|
|||||||
Jsonrpc: "2.0",
|
Jsonrpc: "2.0",
|
||||||
ID: nil, // notification
|
ID: nil, // notification
|
||||||
Method: chValue,
|
Method: chValue,
|
||||||
Params: []param{{v: reflect.ValueOf(caseToID[chosen-1])}, {v: val}},
|
Params: []param{{v: reflect.ValueOf(caseToID[chosen-internal])}, {v: val}},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleChanOut registers output channel for forwarding to client
|
// handleChanOut registers output channel for forwarding to client
|
||||||
func (c *wsConn) handleChanOut(ch reflect.Value) interface{} {
|
func (c *wsConn) handleChanOut(ch reflect.Value) (interface{}, error) {
|
||||||
c.spawnOutChanHandlerOnce.Do(func() {
|
c.spawnOutChanHandlerOnce.Do(func() {
|
||||||
go c.handleOutChans()
|
go c.handleOutChans()
|
||||||
})
|
})
|
||||||
id := atomic.AddUint64(&c.chanCtr, 1)
|
id := atomic.AddUint64(&c.chanCtr, 1)
|
||||||
|
|
||||||
c.registerCh <- outChanReg{
|
select {
|
||||||
|
case c.registerCh <- outChanReg{
|
||||||
id: id,
|
id: id,
|
||||||
ch: ch,
|
ch: ch,
|
||||||
|
}:
|
||||||
|
return id, nil
|
||||||
|
case <-c.exiting:
|
||||||
|
return nil, xerrors.New("connection closing")
|
||||||
}
|
}
|
||||||
|
|
||||||
return id
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// //
|
// //
|
||||||
@ -389,7 +409,6 @@ func (c *wsConn) handleWsConn(ctx context.Context) {
|
|||||||
c.chanHandlers = map[uint64]func(m []byte, ok bool){}
|
c.chanHandlers = map[uint64]func(m []byte, ok bool){}
|
||||||
|
|
||||||
c.registerCh = make(chan outChanReg)
|
c.registerCh = make(chan outChanReg)
|
||||||
defer close(c.registerCh)
|
|
||||||
defer close(c.exiting)
|
defer close(c.exiting)
|
||||||
|
|
||||||
// ////
|
// ////
|
||||||
|
Loading…
Reference in New Issue
Block a user