From 1b1ec2b8123e850881171137ad7f8bbb657ac17f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 23 Jul 2019 02:23:19 +0200 Subject: [PATCH] jsonrpc: cleanup websocket handling logic a bit --- lib/jsonrpc/websocket.go | 101 ++++++++++++++++++++++++--------------- 1 file changed, 62 insertions(+), 39 deletions(-) diff --git a/lib/jsonrpc/websocket.go b/lib/jsonrpc/websocket.go index 2a305b624..b7c953983 100644 --- a/lib/jsonrpc/websocket.go +++ b/lib/jsonrpc/websocket.go @@ -32,27 +32,51 @@ type frame struct { } func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, requests <-chan clientRequest, stop <-chan struct{}) { - incoming := make(chan io.Reader) - var incErr error + var incoming = make(chan io.Reader) + var incomingErr error + + var writeLk sync.Mutex + + // inflight are requests we sent to the remote + var inflight = map[int64]clientRequest{} + + // handling are the calls we handle + var handling = map[int64]context.CancelFunc{} + var handlingLk sync.Mutex + + + type outChanReg struct { + id uint64 + ch reflect.Value + } + var chOnce sync.Once + + // chanCtr is a counter used for identifying output channels on the server side + var chanCtr uint64 + + var registerCh = make(chan outChanReg) + defer close(registerCh) + + // chanHandlers is a map of client-side channel handlers + chanHandlers := map[uint64]func(m []byte, ok bool){} + // nextMessage wait for one message and puts it to the incoming channel nextMessage := func() { - mtype, r, err := conn.NextReader() + msgType, r, err := conn.NextReader() if err != nil { - incErr = err + incomingErr = err close(incoming) return } - if mtype != websocket.BinaryMessage && mtype != websocket.TextMessage { - incErr = errors.New("unsupported message type") + if msgType != websocket.BinaryMessage && msgType != websocket.TextMessage { + incomingErr = errors.New("unsupported message type") close(incoming) return } incoming <- r } - var writeLk sync.Mutex - // nextWriter waits for writeLk and invokes the cb callback with WS message // writer when the lock is acquired nextWriter := func(cb func(io.Writer)) { @@ -83,28 +107,11 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r writeLk.Unlock() } - // wait for the first message - go nextMessage() - - // inflight are requests we sent to the remote - inflight := map[int64]clientRequest{} - - // handling are the calls we handle - handling := map[int64]context.CancelFunc{} - var handlingLk sync.Mutex - // //// // Subscriptions (func() <-chan Typ - like methods) - var chOnce sync.Once - var outId uint64 - type chReg struct { - id uint64 - ch reflect.Value - } - registerCh := make(chan chReg) - defer close(registerCh) - + // handleOutChans handles channel communication on the server side + // (forwards channel messages to client) handleOutChans := func() { regV := reflect.ValueOf(registerCh) @@ -121,13 +128,15 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r if chosen == 0 { // control channel if !ok { - // not closing any channels as we're on receiving end. + // control channel closed - signals closed connection + // + // We're not closing any channels as we're on receiving end. // Also, context cancellation below should take care of any running // requests return } - registration := val.Interface().(chReg) + registration := val.Interface().(outChanReg) caseToId = append(caseToId, registration.id) cases = append(cases, reflect.SelectCase{ @@ -139,6 +148,8 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r } if !ok { + // Output channel closed, cleanup, and tell remote that this happened + n := len(caseToId) if n > 0 { cases[chosen] = cases[n] @@ -158,6 +169,7 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r continue } + // forward message sendReq(request{ Jsonrpc: "2.0", ID: nil, // notification @@ -167,13 +179,14 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r } } + // handleChanOut registers output channel for forwarding to client handleChanOut := func(ch reflect.Value) interface{} { chOnce.Do(func() { go handleOutChans() }) - id := atomic.AddUint64(&outId, 1) + id := atomic.AddUint64(&chanCtr, 1) - registerCh <- chReg{ + registerCh <- outChanReg{ id: id, ch: ch, } @@ -181,10 +194,6 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r return id } - // client side subs - - chanHandlers := map[uint64]func(m []byte, ok bool){} - // //// // on close, make sure to return from all pending calls, and cancel context @@ -207,6 +216,12 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r } }() + // handleCtxAsync handles context lifetimes for client + // TODO: this should be aware of events going through chanHandlers, and quit + // when the related channel is closed. + // This should also probably be a single goroutine, + // Note that not doing this should be fine for now as long as we are using + // contexts correctly (cancelling when async functions are no longer is use) handleCtxAsync := func(actx context.Context, id int64) { <-actx.Done() @@ -238,15 +253,25 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r } } + // wait for the first message + go nextMessage() + var msgConsumed bool + for { + if msgConsumed { + msgConsumed = false + go nextMessage() + } + select { case r, ok := <-incoming: if !ok { - if incErr != nil { - log.Debugf("websocket error", "error", incErr) + if incomingErr != nil { + log.Debugf("websocket error", "error", incomingErr) } return // remote closed } + msgConsumed = true // debug util - dump all messages to stderr // r = io.TeeReader(r, os.Stderr) @@ -359,8 +384,6 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r go handler.handle(ctx, req, nw, rpcError, done, handleChanOut) } - - go nextMessage() // TODO: fix on errors case req := <-requests: if req.req.ID != nil { inflight[*req.req.ID] = req