From 0eb208e1d3448ff41f2e21e56f24ae3ca868703e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 12 Jul 2019 17:29:41 +0200 Subject: [PATCH 1/8] WS JsonRPC client --- api/client/client.go | 6 +- cli/cmd.go | 2 +- cli/version.go | 13 ++- go.mod | 1 + lib/jsonrpc/rpc_client.go | 162 ++++++++++++++++++++++++-------------- 5 files changed, 117 insertions(+), 67 deletions(-) diff --git a/api/client/client.go b/api/client/client.go index 2dbc98f21..8d23aea3b 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -6,8 +6,8 @@ import ( ) // NewRPC creates a new http jsonrpc client. -func NewRPC(addr string) api.API { +func NewRPC(addr string) (api.API, error) { var res api.Struct - jsonrpc.NewClient(addr, "Filecoin", &res.Internal) - return &res + _, err := jsonrpc.NewClient(addr, "Filecoin", &res.Internal) + return &res, err } diff --git a/cli/cmd.go b/cli/cmd.go index 414312e67..7cd4a5b99 100644 --- a/cli/cmd.go +++ b/cli/cmd.go @@ -35,7 +35,7 @@ func getAPI(ctx *cli.Context) (api.API, error) { if err != nil { return nil, err } - return client.NewRPC("http://" + addr + "/rpc/v0"), nil + return client.NewRPC("ws://" + addr + "/rpc/v0") } // reqContext returns context for cli execution. Calling it for the first time diff --git a/cli/version.go b/cli/version.go index 57d101d83..cb58c8ba9 100644 --- a/cli/version.go +++ b/cli/version.go @@ -1,16 +1,25 @@ package cli import ( + "fmt" + "gopkg.in/urfave/cli.v2" ) var versionCmd = &cli.Command{ Name: "version", Usage: "Print version", - Action: func(context *cli.Context) error { + Action: func(cctx *cli.Context) error { + api, err := getAPI(cctx) + if err != nil { + return err + } + + ctx := reqContext(cctx) // TODO: print more useful things - cli.VersionPrinter(context) + fmt.Println(api.Version(ctx)) + cli.VersionPrinter(cctx) return nil }, } diff --git a/go.mod b/go.mod index e61e84aa7..97228e0d9 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.12 require ( github.com/BurntSushi/toml v0.3.1 github.com/filecoin-project/go-leb128 v0.0.0-20190212224330-8d79a5489543 + github.com/gorilla/websocket v1.4.0 github.com/ipfs/go-bitswap v0.1.5 github.com/ipfs/go-block-format v0.0.2 github.com/ipfs/go-blockservice v0.0.2 diff --git a/lib/jsonrpc/rpc_client.go b/lib/jsonrpc/rpc_client.go index b923108c7..4426c26db 100644 --- a/lib/jsonrpc/rpc_client.go +++ b/lib/jsonrpc/rpc_client.go @@ -1,23 +1,20 @@ package jsonrpc import ( - "bytes" "context" "encoding/json" "errors" "fmt" - "io/ioutil" - "net/http" + "io" "reflect" "sync/atomic" + "github.com/gorilla/websocket" logging "github.com/ipfs/go-log" ) var log = logging.Logger("rpc") -const clientDebug = true - var ( errorType = reflect.TypeOf(new(error)).Elem() contextType = reflect.TypeOf(new(context.Context)).Elem() @@ -37,12 +34,12 @@ func (e *ErrClient) Unwrap(err error) error { return e.err } -type result reflect.Value +type result []byte -func (r *result) UnmarshalJSON(raw []byte) error { - err := json.Unmarshal(raw, reflect.Value(*r).Interface()) - log.Debugw("rpc unmarshal response", "raw", string(raw), "err", err) - return err +func (p *result) UnmarshalJSON(raw []byte) error { + *p = make([]byte, len(raw)) + copy(*p, raw) + return nil } type clientResponse struct { @@ -52,6 +49,11 @@ type clientResponse struct { Error *respError `json:"error,omitempty"` } +type clientRequest struct { + req request + ready chan clientResponse +} + // ClientCloser is used to close Client from further use type ClientCloser func() @@ -60,7 +62,7 @@ type ClientCloser func() // handler must be pointer to a struct with function fields // Returned value closes the client connection // TODO: Example -func NewClient(addr string, namespace string, handler interface{}) ClientCloser { +func NewClient(addr string, namespace string, handler interface{}) (ClientCloser, error) { htyp := reflect.TypeOf(handler) if htyp.Kind() != reflect.Ptr { panic("expected handler to be a pointer") @@ -74,6 +76,77 @@ func NewClient(addr string, namespace string, handler interface{}) ClientCloser var idCtr int64 + conn, _, err := websocket.DefaultDialer.Dial(addr, nil) + if err != nil { + return nil, err + } + + stop := make(chan struct{}) + errs := make(chan error, 1) + requests := make(chan clientRequest) + responses := make(chan io.Reader) + + nextMessage := func() { + mtype, r, err := conn.NextReader() + if err != nil { + r, _ := io.Pipe() + r.CloseWithError(err) // nolint + return + + } + if mtype != websocket.BinaryMessage && mtype != websocket.TextMessage { + r, _ := io.Pipe() + r.CloseWithError(errors.New("unsupported message type")) // nolint + return + } + responses <- r + } + + go func() { + var err error + defer func() { + close(requests) + cerr := conn.Close() + if err == nil { + err = cerr + } + errs <- cerr + + // close requests somehow + }() + + inflight := map[int64]clientRequest{} + + go nextMessage() + + for { + select { + case req := <-requests: + inflight[*req.req.ID] = req + if err = conn.WriteJSON(req.req); err != nil { + return + } + case r := <- responses: + var resp clientResponse + if err = json.NewDecoder(r).Decode(&resp); err != nil { + return + } + req, ok := inflight[resp.ID] + if !ok { + log.Error("client got unknown ID in response") + continue + } + + req.ready <- resp + delete(inflight, resp.ID) + + go nextMessage() + case <-stop: + return + } + } + }() + for i := 0; i < typ.NumField(); i++ { f := typ.Field(i) ftyp := f.Type @@ -83,11 +156,11 @@ func NewClient(addr string, namespace string, handler interface{}) ClientCloser valOut, errOut, nout := processFuncOut(ftyp) - processResponse := func(resp clientResponse, code int) []reflect.Value { + processResponse := func(resp clientResponse, rval reflect.Value) []reflect.Value { out := make([]reflect.Value, nout) if valOut != -1 { - out[valOut] = reflect.Value(resp.Result).Elem() + out[valOut] = rval.Elem() } if errOut != -1 { out[errOut] = reflect.New(errorType).Elem() @@ -134,67 +207,34 @@ func NewClient(addr string, namespace string, handler interface{}) ClientCloser Params: params, } - b, err := json.Marshal(&req) - if err != nil { - return processError(err) + rchan := make(chan clientResponse, 1) + requests <- clientRequest{ + req: req, + ready: rchan, } + resp := <- rchan + var rval reflect.Value - // prepare / execute http request - - hreq, err := http.NewRequest("POST", addr, bytes.NewReader(b)) - if err != nil { - return processError(err) - } - if hasCtx == 1 { - hreq = hreq.WithContext(args[0].Interface().(context.Context)) - } - hreq.Header.Set("Content-Type", "application/json") - - httpResp, err := http.DefaultClient.Do(hreq) - if err != nil { - return processError(err) - } - - // process response - - if clientDebug { - rsp, err := ioutil.ReadAll(httpResp.Body) - if err != nil { - return processError(err) - } - if err := httpResp.Body.Close(); err != nil { - return processError(err) - } - - log.Debugw("rpc response", "body", string(rsp)) - - httpResp.Body = ioutil.NopCloser(bytes.NewReader(rsp)) - } - - var resp clientResponse if valOut != -1 { log.Debugw("rpc result", "type", ftyp.Out(valOut)) - resp.Result = result(reflect.New(ftyp.Out(valOut))) - } - - if err := json.NewDecoder(httpResp.Body).Decode(&resp); err != nil { - return processError(err) - } - - if err := httpResp.Body.Close(); err != nil { - return processError(err) + rval = reflect.New(ftyp.Out(valOut)) + if err := json.Unmarshal(resp.Result, rval.Interface()); err != nil { + return processError(err) + } } if resp.ID != *req.ID { return processError(errors.New("request and response id didn't match")) } - return processResponse(resp, httpResp.StatusCode) + return processResponse(resp, rval) }) val.Elem().Field(i).Set(fn) } - // TODO: if this is still unused as of 2020, remove the closer stuff - return func() {} // noop for now, not for long though + return func() { + close(stop) + <-errs // TODO: return + }, nil } From 6a20d0dafec833cdf027922ecd2a743721f3f1c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 12 Jul 2019 19:12:38 +0200 Subject: [PATCH 2/8] jsonrpc: Websocket server --- lib/jsonrpc/{rpc_client.go => client.go} | 66 +------- lib/jsonrpc/{rpc_server.go => handler.go} | 191 +++++++--------------- lib/jsonrpc/server.go | 78 +++++++++ lib/jsonrpc/util.go | 49 ++++++ lib/jsonrpc/websocket.go | 113 +++++++++++++ 5 files changed, 304 insertions(+), 193 deletions(-) rename lib/jsonrpc/{rpc_client.go => client.go} (75%) rename lib/jsonrpc/{rpc_server.go => handler.go} (53%) create mode 100644 lib/jsonrpc/server.go create mode 100644 lib/jsonrpc/util.go create mode 100644 lib/jsonrpc/websocket.go diff --git a/lib/jsonrpc/rpc_client.go b/lib/jsonrpc/client.go similarity index 75% rename from lib/jsonrpc/rpc_client.go rename to lib/jsonrpc/client.go index 4426c26db..31a4d9ed2 100644 --- a/lib/jsonrpc/rpc_client.go +++ b/lib/jsonrpc/client.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "io" "reflect" "sync/atomic" @@ -82,70 +81,10 @@ func NewClient(addr string, namespace string, handler interface{}) (ClientCloser } stop := make(chan struct{}) - errs := make(chan error, 1) requests := make(chan clientRequest) - responses := make(chan io.Reader) - nextMessage := func() { - mtype, r, err := conn.NextReader() - if err != nil { - r, _ := io.Pipe() - r.CloseWithError(err) // nolint - return - - } - if mtype != websocket.BinaryMessage && mtype != websocket.TextMessage { - r, _ := io.Pipe() - r.CloseWithError(errors.New("unsupported message type")) // nolint - return - } - responses <- r - } - - go func() { - var err error - defer func() { - close(requests) - cerr := conn.Close() - if err == nil { - err = cerr - } - errs <- cerr - - // close requests somehow - }() - - inflight := map[int64]clientRequest{} - - go nextMessage() - - for { - select { - case req := <-requests: - inflight[*req.req.ID] = req - if err = conn.WriteJSON(req.req); err != nil { - return - } - case r := <- responses: - var resp clientResponse - if err = json.NewDecoder(r).Decode(&resp); err != nil { - return - } - req, ok := inflight[resp.ID] - if !ok { - log.Error("client got unknown ID in response") - continue - } - - req.ready <- resp - delete(inflight, resp.ID) - - go nextMessage() - case <-stop: - return - } - } - }() + handlers := map[string]rpcHandler{} + go handleWsConn(context.TODO(), conn, handlers, requests, stop) for i := 0; i < typ.NumField(); i++ { f := typ.Field(i) @@ -235,6 +174,5 @@ func NewClient(addr string, namespace string, handler interface{}) (ClientCloser return func() { close(stop) - <-errs // TODO: return }, nil } diff --git a/lib/jsonrpc/rpc_server.go b/lib/jsonrpc/handler.go similarity index 53% rename from lib/jsonrpc/rpc_server.go rename to lib/jsonrpc/handler.go index 104380861..787d3f954 100644 --- a/lib/jsonrpc/rpc_server.go +++ b/lib/jsonrpc/handler.go @@ -2,18 +2,13 @@ package jsonrpc import ( "bytes" + "context" "encoding/json" "fmt" - "net/http" + "io" "reflect" ) -const ( - rpcParseError = -32700 - rpcMethodNotFound = -32601 - rpcInvalidParams = -32602 -) - type rpcHandler struct { paramReceivers []reflect.Type nParams int @@ -27,33 +22,9 @@ type rpcHandler struct { valOut int } -// RPCServer provides a jsonrpc 2.0 http server handler -type RPCServer struct { - methods map[string]rpcHandler -} +type handlers map[string]rpcHandler -// NewServer creates new RPCServer instance -func NewServer() *RPCServer { - return &RPCServer{ - methods: map[string]rpcHandler{}, - } -} - -type param struct { - data []byte // from unmarshal - - v reflect.Value // to marshal -} - -func (p *param) UnmarshalJSON(raw []byte) error { - p.data = make([]byte, len(raw)) - copy(p.data, raw) - return nil -} - -func (p *param) MarshalJSON() ([]byte, error) { - return json.Marshal(p.v.Interface()) -} +// Request / response type request struct { Jsonrpc string `json:"jsonrpc"` @@ -81,46 +52,95 @@ type response struct { Error *respError `json:"error,omitempty"` } -// TODO: return errors to clients per spec -func (s *RPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { +// Register + +func (h handlers) register(namespace string, r interface{}) { + val := reflect.ValueOf(r) + //TODO: expect ptr + + for i := 0; i < val.NumMethod(); i++ { + method := val.Type().Method(i) + + funcType := method.Func.Type() + hasCtx := 0 + if funcType.NumIn() >= 2 && funcType.In(1) == contextType { + hasCtx = 1 + } + + ins := funcType.NumIn() - 1 - hasCtx + recvs := make([]reflect.Type, ins) + for i := 0; i < ins; i++ { + recvs[i] = method.Type.In(i + 1 + hasCtx) + } + + valOut, errOut, _ := processFuncOut(funcType) + + h[namespace+"."+method.Name] = rpcHandler{ + paramReceivers: recvs, + nParams: ins, + + handlerFunc: method.Func, + receiver: val, + + hasCtx: hasCtx, + + errOut: errOut, + valOut: valOut, + } + } +} + +// Handle + +type rpcErrFunc func(w io.Writer, req *request, code int, err error) + +func (h handlers) handleReader(ctx context.Context, r io.Reader, w io.Writer, rpcError rpcErrFunc) { var req request - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - s.rpcError(w, &req, rpcParseError, err) + if err := json.NewDecoder(r).Decode(&req); err != nil { + rpcError(w, &req, rpcParseError, err) return } - handler, ok := s.methods[req.Method] + h.handle(ctx, req, w, rpcError) +} + +func (h handlers) handle(ctx context.Context, req request, w io.Writer, rpcError rpcErrFunc) { + handler, ok := h[req.Method] if !ok { - s.rpcError(w, &req, rpcMethodNotFound, fmt.Errorf("method '%s' not found", req.Method)) + rpcError(w, &req, rpcMethodNotFound, fmt.Errorf("method '%s' not found", req.Method)) return } if len(req.Params) != handler.nParams { - s.rpcError(w, &req, rpcInvalidParams, fmt.Errorf("wrong param count")) + rpcError(w, &req, rpcInvalidParams, fmt.Errorf("wrong param count")) return } callParams := make([]reflect.Value, 1+handler.hasCtx+handler.nParams) callParams[0] = handler.receiver if handler.hasCtx == 1 { - callParams[1] = reflect.ValueOf(r.Context()) + callParams[1] = reflect.ValueOf(ctx) } for i := 0; i < handler.nParams; i++ { rp := reflect.New(handler.paramReceivers[i]) if err := json.NewDecoder(bytes.NewReader(req.Params[i].data)).Decode(rp.Interface()); err != nil { - s.rpcError(w, &req, rpcParseError, err) + rpcError(w, &req, rpcParseError, err) return } callParams[i+1+handler.hasCtx] = reflect.ValueOf(rp.Elem().Interface()) } + /////////////////// + callResult := handler.handlerFunc.Call(callParams) if req.ID == nil { return // notification } + /////////////////// + resp := response{ Jsonrpc: "2.0", ID: *req.ID, @@ -144,90 +164,3 @@ func (s *RPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } } - -func (s *RPCServer) rpcError(w http.ResponseWriter, req *request, code int, err error) { - w.WriteHeader(500) - if req.ID == nil { // notification - return - } - - resp := response{ - Jsonrpc: "2.0", - ID: *req.ID, - Error: &respError{ - Code: code, - Message: err.Error(), - }, - } - - _ = json.NewEncoder(w).Encode(resp) -} - -// Register registers new RPC handler -// -// Handler is any value with methods defined -func (s *RPCServer) Register(namespace string, r interface{}) { - val := reflect.ValueOf(r) - //TODO: expect ptr - - for i := 0; i < val.NumMethod(); i++ { - method := val.Type().Method(i) - - funcType := method.Func.Type() - hasCtx := 0 - if funcType.NumIn() >= 2 && funcType.In(1) == contextType { - hasCtx = 1 - } - - ins := funcType.NumIn() - 1 - hasCtx - recvs := make([]reflect.Type, ins) - for i := 0; i < ins; i++ { - recvs[i] = method.Type.In(i + 1 + hasCtx) - } - - valOut, errOut, _ := processFuncOut(funcType) - - fmt.Println(namespace + "." + method.Name) - - s.methods[namespace+"."+method.Name] = rpcHandler{ - paramReceivers: recvs, - nParams: ins, - - handlerFunc: method.Func, - receiver: val, - - hasCtx: hasCtx, - - errOut: errOut, - valOut: valOut, - } - } -} - -func processFuncOut(funcType reflect.Type) (valOut int, errOut int, n int) { - errOut = -1 - valOut = -1 - n = funcType.NumOut() - - switch n { - case 0: - case 1: - if funcType.Out(0) == errorType { - errOut = 0 - } else { - valOut = 0 - } - case 2: - valOut = 0 - errOut = 1 - if funcType.Out(1) != errorType { - panic("expected error as second return value") - } - default: - panic("too many error values") - } - - return -} - -var _ error = &respError{} diff --git a/lib/jsonrpc/server.go b/lib/jsonrpc/server.go new file mode 100644 index 000000000..73ce68349 --- /dev/null +++ b/lib/jsonrpc/server.go @@ -0,0 +1,78 @@ +package jsonrpc + +import ( + "encoding/json" + "io" + "net/http" + + "github.com/gorilla/websocket" +) + +const ( + rpcParseError = -32700 + rpcMethodNotFound = -32601 + rpcInvalidParams = -32602 +) + +// RPCServer provides a jsonrpc 2.0 http server handler +type RPCServer struct { + methods handlers +} + +// NewServer creates new RPCServer instance +func NewServer() *RPCServer { + return &RPCServer{ + methods: map[string]rpcHandler{}, + } +} + +var upgrader = websocket.Upgrader{} + +func (s *RPCServer) handleWS(w http.ResponseWriter, r *http.Request) { + c, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Error(err) + w.WriteHeader(500) + return + } + defer c.Close() + + handleWsConn(r.Context(), c, s.methods, nil, nil) +} + +// TODO: return errors to clients per spec +func (s *RPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Header.Get("Connection") == "Upgrade" { + s.handleWS(w, r) + return + } + + s.methods.handleReader(r.Context(), r.Body, w, s.rpcError) +} + +func (s *RPCServer) rpcError(w io.Writer, req *request, code int, err error) { + w.(http.ResponseWriter).WriteHeader(500) + if req.ID == nil { // notification + return + } + + resp := response{ + Jsonrpc: "2.0", + ID: *req.ID, + Error: &respError{ + Code: code, + Message: err.Error(), + }, + } + + _ = json.NewEncoder(w).Encode(resp) +} + +// Register registers new RPC handler +// +// Handler is any value with methods defined +func (s *RPCServer) Register(namespace string, handler interface{}) { + s.methods.register(namespace, handler) +} + +var _ error = &respError{} diff --git a/lib/jsonrpc/util.go b/lib/jsonrpc/util.go new file mode 100644 index 000000000..ece0af7de --- /dev/null +++ b/lib/jsonrpc/util.go @@ -0,0 +1,49 @@ +package jsonrpc + +import ( + "encoding/json" + "reflect" +) + +type param struct { + data []byte // from unmarshal + + v reflect.Value // to marshal +} + +func (p *param) UnmarshalJSON(raw []byte) error { + p.data = make([]byte, len(raw)) + copy(p.data, raw) + return nil +} + +func (p *param) MarshalJSON() ([]byte, error) { + return json.Marshal(p.v.Interface()) +} + +// processFuncOut finds value and error Outs in function +func processFuncOut(funcType reflect.Type) (valOut int, errOut int, n int) { + errOut = -1 // -1 if not found + valOut = -1 + n = funcType.NumOut() + + switch n { + case 0: + case 1: + if funcType.Out(0) == errorType { + errOut = 0 + } else { + valOut = 0 + } + case 2: + valOut = 0 + errOut = 1 + if funcType.Out(1) != errorType { + panic("expected error as second return value") + } + default: + panic("too many error values") + } + + return +} diff --git a/lib/jsonrpc/websocket.go b/lib/jsonrpc/websocket.go new file mode 100644 index 000000000..0570e2b08 --- /dev/null +++ b/lib/jsonrpc/websocket.go @@ -0,0 +1,113 @@ +package jsonrpc + +import ( + "context" + "encoding/json" + "errors" + "io" + + "github.com/gorilla/websocket" +) + +type frame struct { + // common + Jsonrpc string `json:"jsonrpc"` + ID *int64 `json:"id,omitempty"` + + // request + Method string `json:"method,omitempty"` + Params []param `json:"params,omitempty"` + + // response + Result result `json:"result,omitempty"` + Error *respError `json:"error,omitempty"` +} + +func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, requests <-chan clientRequest, stop <-chan struct{}) { + incoming := make(chan io.Reader) + + nextMessage := func() { + mtype, r, err := conn.NextReader() + if err != nil { + r, _ := io.Pipe() + r.CloseWithError(err) // nolint + incoming <- r + return + + } + if mtype != websocket.BinaryMessage && mtype != websocket.TextMessage { + r, _ := io.Pipe() + r.CloseWithError(errors.New("unsupported message type")) // nolint + incoming <- r + return + } + incoming <- r + } + + go nextMessage() + + inflight := map[int64]clientRequest{} + + for { + select { + case r := <-incoming: + var frame frame + if err := json.NewDecoder(r).Decode(&frame); err != nil { + log.Error("handle me:", err) + return + } + + if frame.Method != "" { + // call + req := request{ + Jsonrpc: frame.Jsonrpc, + ID: frame.ID, + Method: frame.Method, + Params: frame.Params, + } + + // TODO: ignore ID + wcl, err := conn.NextWriter(websocket.TextMessage) + if err != nil { + log.Error("handle me:", err) + return + } + + handler.handle(ctx, req, wcl, func(w io.Writer, req *request, code int, err error) { + log.Error("handle me:", err) // TODO: seriously + return + }) + + if err := wcl.Close(); err != nil { + log.Error("handle me:", err) + return + } + } else { + // response + req, ok := inflight[*frame.ID] + if !ok { + log.Error("client got unknown ID in response") + continue + } + + req.ready <- clientResponse{ + Jsonrpc: frame.Jsonrpc, + Result: frame.Result, + ID: *frame.ID, + Error: frame.Error, + } + delete(inflight, *frame.ID) + } + + go nextMessage() + case req := <-requests: + inflight[*req.req.ID] = req + if err := conn.WriteJSON(req.req); err != nil { + log.Error("handle me:", err) + return + } + case <-stop: + return + } + } +} From df90b725005e11e732f0e2dc2b5cceb8b23acd89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 12 Jul 2019 19:12:51 +0200 Subject: [PATCH 3/8] jsonrpc: update tests --- lib/jsonrpc/rpc_test.go | 42 ++++++++++++++++++++++++++++++++--------- 1 file changed, 33 insertions(+), 9 deletions(-) diff --git a/lib/jsonrpc/rpc_test.go b/lib/jsonrpc/rpc_test.go index d496e25c5..307050061 100644 --- a/lib/jsonrpc/rpc_test.go +++ b/lib/jsonrpc/rpc_test.go @@ -70,7 +70,10 @@ func TestRPC(t *testing.T) { AddGet func(int) int StringMatch func(t TestType, i2 int64) (out TestOut, err error) } - closer := NewClient(testServ.URL, "SimpleServerHandler", &client) + closer, err := NewClient(testServ.URL, "SimpleServerHandler", &client) + if err != nil { + t.Fatal(err) + } defer closer() // Add(int) error @@ -83,7 +86,7 @@ func TestRPC(t *testing.T) { t.Error("expected 2") } - err := client.Add(-3546) + err = client.Add(-3546) if err == nil { t.Fatal("expected error") } @@ -130,7 +133,10 @@ func TestRPC(t *testing.T) { var noret struct { Add func(int) } - closer = NewClient(testServ.URL, "SimpleServerHandler", &noret) + closer, err = NewClient(testServ.URL, "SimpleServerHandler", &noret) + if err != nil { + t.Fatal(err) + } // this one should actually work noret.Add(4) @@ -142,7 +148,10 @@ func TestRPC(t *testing.T) { var noparam struct { Add func() } - closer = NewClient(testServ.URL, "SimpleServerHandler", &noparam) + closer, err = NewClient(testServ.URL, "SimpleServerHandler", &noparam) + if err != nil { + t.Fatal(err) + } // shouldn't panic noparam.Add() @@ -151,7 +160,10 @@ func TestRPC(t *testing.T) { var erronly struct { AddGet func() (int, error) } - closer = NewClient(testServ.URL, "SimpleServerHandler", &erronly) + closer, err = NewClient(testServ.URL, "SimpleServerHandler", &erronly) + if err != nil { + t.Fatal(err) + } _, err = erronly.AddGet() if err == nil || err.Error() != "RPC error (-32602): wrong param count" { @@ -162,7 +174,10 @@ func TestRPC(t *testing.T) { var wrongtype struct { Add func(string) error } - closer = NewClient(testServ.URL, "SimpleServerHandler", &wrongtype) + closer, err = NewClient(testServ.URL, "SimpleServerHandler", &wrongtype) + if err != nil { + t.Fatal(err) + } err = wrongtype.Add("not an int") if err == nil || err.Error() != "RPC error (-32700): json: cannot unmarshal string into Go value of type int" { @@ -173,7 +188,10 @@ func TestRPC(t *testing.T) { var notfound struct { NotThere func(string) error } - closer = NewClient(testServ.URL, "SimpleServerHandler", ¬found) + closer, err = NewClient(testServ.URL, "SimpleServerHandler", ¬found) + if err != nil { + t.Fatal(err) + } err = notfound.NotThere("hello?") if err == nil || err.Error() != "RPC error (-32601): method 'SimpleServerHandler.NotThere' not found" { @@ -219,7 +237,10 @@ func TestCtx(t *testing.T) { var client struct { Test func(ctx context.Context) } - closer := NewClient(testServ.URL, "CtxHandler", &client) + closer, err := NewClient(testServ.URL, "CtxHandler", &client) + if err != nil { + t.Fatal(err) + } ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() @@ -239,7 +260,10 @@ func TestCtx(t *testing.T) { var noCtxClient struct { Test func() } - closer = NewClient(testServ.URL, "CtxHandler", &noCtxClient) + closer, err = NewClient(testServ.URL, "CtxHandler", &noCtxClient) + if err != nil { + t.Fatal(err) + } noCtxClient.Test() From 96ed4225fb70ba56fea1673bffcd9c714101e291 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 13 Jul 2019 14:44:20 +0200 Subject: [PATCH 4/8] improve error handling in handleWsConn --- lib/jsonrpc/websocket.go | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/lib/jsonrpc/websocket.go b/lib/jsonrpc/websocket.go index 0570e2b08..46219e7af 100644 --- a/lib/jsonrpc/websocket.go +++ b/lib/jsonrpc/websocket.go @@ -25,20 +25,18 @@ type frame struct { func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, requests <-chan clientRequest, stop <-chan struct{}) { incoming := make(chan io.Reader) + var incErr error nextMessage := func() { mtype, r, err := conn.NextReader() if err != nil { - r, _ := io.Pipe() - r.CloseWithError(err) // nolint - incoming <- r + incErr = err + close(incoming) return - } if mtype != websocket.BinaryMessage && mtype != websocket.TextMessage { - r, _ := io.Pipe() - r.CloseWithError(errors.New("unsupported message type")) // nolint - incoming <- r + incErr = errors.New("unsupported message type") + close(incoming) return } incoming <- r @@ -50,7 +48,14 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r for { select { - case r := <-incoming: + case r, ok := <-incoming: + if !ok { + if incErr != nil { + log.Debugf("websocket error", "error", incErr) + } + return // remote closed + } + var frame frame if err := json.NewDecoder(r).Decode(&frame); err != nil { log.Error("handle me:", err) @@ -107,6 +112,9 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r return } case <-stop: + if err := conn.Close(); err != nil { + log.Debugf("websocket close error", "error", err) + } return } } From b93d71e8cbd229bedd33747fd299199187142101 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 15 Jul 2019 17:05:45 +0200 Subject: [PATCH 5/8] jsonrpc: ws addresses in tests --- lib/jsonrpc/rpc_test.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/lib/jsonrpc/rpc_test.go b/lib/jsonrpc/rpc_test.go index 307050061..3366f6fa3 100644 --- a/lib/jsonrpc/rpc_test.go +++ b/lib/jsonrpc/rpc_test.go @@ -62,7 +62,6 @@ func TestRPC(t *testing.T) { // httptest stuff testServ := httptest.NewServer(rpcServer) defer testServ.Close() - // setup client var client struct { @@ -70,7 +69,7 @@ func TestRPC(t *testing.T) { AddGet func(int) int StringMatch func(t TestType, i2 int64) (out TestOut, err error) } - closer, err := NewClient(testServ.URL, "SimpleServerHandler", &client) + closer, err := NewClient("ws://" + testServ.Listener.Addr().String(), "SimpleServerHandler", &client) if err != nil { t.Fatal(err) } @@ -133,7 +132,7 @@ func TestRPC(t *testing.T) { var noret struct { Add func(int) } - closer, err = NewClient(testServ.URL, "SimpleServerHandler", &noret) + closer, err = NewClient("ws://" + testServ.Listener.Addr().String(), "SimpleServerHandler", &noret) if err != nil { t.Fatal(err) } @@ -148,7 +147,7 @@ func TestRPC(t *testing.T) { var noparam struct { Add func() } - closer, err = NewClient(testServ.URL, "SimpleServerHandler", &noparam) + closer, err = NewClient("ws://" + testServ.Listener.Addr().String(), "SimpleServerHandler", &noparam) if err != nil { t.Fatal(err) } @@ -160,7 +159,7 @@ func TestRPC(t *testing.T) { var erronly struct { AddGet func() (int, error) } - closer, err = NewClient(testServ.URL, "SimpleServerHandler", &erronly) + closer, err = NewClient("ws://" + testServ.Listener.Addr().String(), "SimpleServerHandler", &erronly) if err != nil { t.Fatal(err) } @@ -174,7 +173,7 @@ func TestRPC(t *testing.T) { var wrongtype struct { Add func(string) error } - closer, err = NewClient(testServ.URL, "SimpleServerHandler", &wrongtype) + closer, err = NewClient("ws://" + testServ.Listener.Addr().String(), "SimpleServerHandler", &wrongtype) if err != nil { t.Fatal(err) } @@ -188,7 +187,7 @@ func TestRPC(t *testing.T) { var notfound struct { NotThere func(string) error } - closer, err = NewClient(testServ.URL, "SimpleServerHandler", ¬found) + closer, err = NewClient("ws://" + testServ.Listener.Addr().String(), "SimpleServerHandler", ¬found) if err != nil { t.Fatal(err) } @@ -237,7 +236,7 @@ func TestCtx(t *testing.T) { var client struct { Test func(ctx context.Context) } - closer, err := NewClient(testServ.URL, "CtxHandler", &client) + closer, err := NewClient("ws://" + testServ.Listener.Addr().String(), "CtxHandler", &client) if err != nil { t.Fatal(err) } @@ -260,7 +259,7 @@ func TestCtx(t *testing.T) { var noCtxClient struct { Test func() } - closer, err = NewClient(testServ.URL, "CtxHandler", &noCtxClient) + closer, err = NewClient("ws://" + testServ.Listener.Addr().String(), "CtxHandler", &noCtxClient) if err != nil { t.Fatal(err) } From 1153f050bb35e7bbc76ff1d57af864b13400cc47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 15 Jul 2019 18:21:48 +0200 Subject: [PATCH 6/8] Context cancellation over websockets --- lib/jsonrpc/client.go | 34 ++++++++++-- lib/jsonrpc/handler.go | 24 ++++++--- lib/jsonrpc/server.go | 35 ++++++------ lib/jsonrpc/websocket.go | 113 +++++++++++++++++++++++++++++---------- 4 files changed, 151 insertions(+), 55 deletions(-) diff --git a/lib/jsonrpc/client.go b/lib/jsonrpc/client.go index 31a4d9ed2..145d54d62 100644 --- a/lib/jsonrpc/client.go +++ b/lib/jsonrpc/client.go @@ -151,14 +151,40 @@ func NewClient(addr string, namespace string, handler interface{}) (ClientCloser req: req, ready: rchan, } - resp := <- rchan + var ctxDone <-chan struct{} + var resp clientResponse + + if hasCtx == 1 { + ctxDone = args[0].Interface().(context.Context).Done() + } + + loop: + for { + select { + case resp = <-rchan: + break loop + case <-ctxDone: // send cancel request + ctxDone = nil + + requests <- clientRequest{ + req: request{ + Jsonrpc: "2.0", + Method: wsCancel, + Params: []param{{v: reflect.ValueOf(id)}}, + }, + } + } + } var rval reflect.Value if valOut != -1 { - log.Debugw("rpc result", "type", ftyp.Out(valOut)) rval = reflect.New(ftyp.Out(valOut)) - if err := json.Unmarshal(resp.Result, rval.Interface()); err != nil { - return processError(err) + + if resp.Result != nil { + log.Debugw("rpc result", "type", ftyp.Out(valOut)) + if err := json.Unmarshal(resp.Result, rval.Interface()); err != nil { + return processError(err) + } } } diff --git a/lib/jsonrpc/handler.go b/lib/jsonrpc/handler.go index 787d3f954..3c5e31324 100644 --- a/lib/jsonrpc/handler.go +++ b/lib/jsonrpc/handler.go @@ -92,19 +92,25 @@ func (h handlers) register(namespace string, r interface{}) { // Handle -type rpcErrFunc func(w io.Writer, req *request, code int, err error) +type rpcErrFunc func(w func(func(io.Writer)), req *request, code int, err error) func (h handlers) handleReader(ctx context.Context, r io.Reader, w io.Writer, rpcError rpcErrFunc) { + wf := func(cb func(io.Writer)) { + cb(w) + } + var req request if err := json.NewDecoder(r).Decode(&req); err != nil { - rpcError(w, &req, rpcParseError, err) + rpcError(wf, &req, rpcParseError, err) return } - h.handle(ctx, req, w, rpcError) + h.handle(ctx, req, wf, rpcError, func() {}) } -func (h handlers) handle(ctx context.Context, req request, w io.Writer, rpcError rpcErrFunc) { +func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer)), rpcError rpcErrFunc, done func()) { + defer done() + handler, ok := h[req.Method] if !ok { rpcError(w, &req, rpcMethodNotFound, fmt.Errorf("method '%s' not found", req.Method)) @@ -159,8 +165,10 @@ func (h handlers) handle(ctx context.Context, req request, w io.Writer, rpcError resp.Result = callResult[handler.valOut].Interface() } - if err := json.NewEncoder(w).Encode(resp); err != nil { - fmt.Println(err) - return - } + w(func(w io.Writer) { + if err := json.NewEncoder(w).Encode(resp); err != nil { + fmt.Println(err) + return + } + }) } diff --git a/lib/jsonrpc/server.go b/lib/jsonrpc/server.go index 73ce68349..457463846 100644 --- a/lib/jsonrpc/server.go +++ b/lib/jsonrpc/server.go @@ -47,25 +47,30 @@ func (s *RPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - s.methods.handleReader(r.Context(), r.Body, w, s.rpcError) + s.methods.handleReader(r.Context(), r.Body, w, rpcError) } -func (s *RPCServer) rpcError(w io.Writer, req *request, code int, err error) { - w.(http.ResponseWriter).WriteHeader(500) - if req.ID == nil { // notification - return - } +func rpcError(wf func(func(io.Writer)), req *request, code int, err error) { + wf(func(w io.Writer) { + if hw, ok := w.(http.ResponseWriter); ok { + hw.WriteHeader(500) + } - resp := response{ - Jsonrpc: "2.0", - ID: *req.ID, - Error: &respError{ - Code: code, - Message: err.Error(), - }, - } + if req.ID == nil { // notification + return + } - _ = json.NewEncoder(w).Encode(resp) + resp := response{ + Jsonrpc: "2.0", + ID: *req.ID, + Error: &respError{ + Code: code, + Message: err.Error(), + }, + } + + _ = json.NewEncoder(w).Encode(resp) + }) } // Register registers new RPC handler diff --git a/lib/jsonrpc/websocket.go b/lib/jsonrpc/websocket.go index 46219e7af..3fede6b12 100644 --- a/lib/jsonrpc/websocket.go +++ b/lib/jsonrpc/websocket.go @@ -5,10 +5,14 @@ import ( "encoding/json" "errors" "io" + "io/ioutil" + "sync" "github.com/gorilla/websocket" ) +const wsCancel = "xrpc.cancel" + type frame struct { // common Jsonrpc string `json:"jsonrpc"` @@ -42,9 +46,50 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r incoming <- r } + var writeLk sync.Mutex + nextWriter := func(cb func(io.Writer)) { + writeLk.Lock() + defer writeLk.Unlock() + + wcl, err := conn.NextWriter(websocket.TextMessage) + if err != nil { + log.Error("handle me:", err) + return + } + + cb(wcl) + + if err := wcl.Close(); err != nil { + log.Error("handle me:", err) + return + } + } + go nextMessage() inflight := map[int64]clientRequest{} + handling := map[int64]context.CancelFunc{} + var handlingLk sync.Mutex + + cancelCtx := func(req frame) { + if req.ID != nil { + log.Warnf("%s call with ID set, won't respond", wsCancel) + } + + var id int64 + if err := json.Unmarshal(req.Params[0].data, &id); err != nil { + log.Error("handle me:", err) + return + } + + handlingLk.Lock() + defer handlingLk.Unlock() + + cf, ok := handling[id] + if ok { + cf() + } + } for { select { @@ -62,33 +107,8 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r return } - if frame.Method != "" { - // call - req := request{ - Jsonrpc: frame.Jsonrpc, - ID: frame.ID, - Method: frame.Method, - Params: frame.Params, - } - - // TODO: ignore ID - wcl, err := conn.NextWriter(websocket.TextMessage) - if err != nil { - log.Error("handle me:", err) - return - } - - handler.handle(ctx, req, wcl, func(w io.Writer, req *request, code int, err error) { - log.Error("handle me:", err) // TODO: seriously - return - }) - - if err := wcl.Close(); err != nil { - log.Error("handle me:", err) - return - } - } else { - // response + switch frame.Method { + case "": // Response to our call req, ok := inflight[*frame.ID] if !ok { log.Error("client got unknown ID in response") @@ -102,11 +122,48 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r Error: frame.Error, } delete(inflight, *frame.ID) + case wsCancel: + cancelCtx(frame) + default: // Remote call + req := request{ + Jsonrpc: frame.Jsonrpc, + ID: frame.ID, + Method: frame.Method, + Params: frame.Params, + } + + ctx, cf := context.WithCancel(ctx) + + nw := func(cb func(io.Writer)) { + cb(ioutil.Discard) + } + done := func(){} + if frame.ID != nil { + nw = nextWriter + + + handlingLk.Lock() + handling[*frame.ID] = cf + handlingLk.Unlock() + + done = func() { + handlingLk.Lock() + defer handlingLk.Unlock() + + cf := handling[*frame.ID] + cf() + delete(handling, *frame.ID) + } + } + + go handler.handle(ctx, req, nw, rpcError, done) } go nextMessage() case req := <-requests: - inflight[*req.req.ID] = req + if req.req.ID != nil { + inflight[*req.req.ID] = req + } if err := conn.WriteJSON(req.req); err != nil { log.Error("handle me:", err) return From 3fca70f112d7c93c6b4ebab46f5fab3ad6127295 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 15 Jul 2019 18:28:00 +0200 Subject: [PATCH 7/8] Fix rpcBuilder in node_test --- node/node_test.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/node/node_test.go b/node/node_test.go index 6b36cf834..fb1d348c2 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -53,7 +53,12 @@ func rpcBuilder(t *testing.T, n int) []api.API { rpcServer := jsonrpc.NewServer() rpcServer.Register("Filecoin", a) testServ := httptest.NewServer(rpcServer) // todo: close - out[i] = client.NewRPC(testServ.URL) + + var err error + out[i], err = client.NewRPC("ws://" + testServ.Listener.Addr().String()) + if err != nil { + t.Fatal(err) + } } return out } From 661043f5c8c99db82a320c180dd8112644f5a497 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 15 Jul 2019 18:32:43 +0200 Subject: [PATCH 8/8] Lint fixes --- lib/jsonrpc/client.go | 4 ++-- lib/jsonrpc/rpc_test.go | 16 ++++++++-------- lib/jsonrpc/server.go | 6 +++++- lib/jsonrpc/websocket.go | 16 +++++++--------- 4 files changed, 22 insertions(+), 20 deletions(-) diff --git a/lib/jsonrpc/client.go b/lib/jsonrpc/client.go index 145d54d62..d03600f86 100644 --- a/lib/jsonrpc/client.go +++ b/lib/jsonrpc/client.go @@ -169,8 +169,8 @@ func NewClient(addr string, namespace string, handler interface{}) (ClientCloser requests <- clientRequest{ req: request{ Jsonrpc: "2.0", - Method: wsCancel, - Params: []param{{v: reflect.ValueOf(id)}}, + Method: wsCancel, + Params: []param{{v: reflect.ValueOf(id)}}, }, } } diff --git a/lib/jsonrpc/rpc_test.go b/lib/jsonrpc/rpc_test.go index 3366f6fa3..f52d71792 100644 --- a/lib/jsonrpc/rpc_test.go +++ b/lib/jsonrpc/rpc_test.go @@ -69,7 +69,7 @@ func TestRPC(t *testing.T) { AddGet func(int) int StringMatch func(t TestType, i2 int64) (out TestOut, err error) } - closer, err := NewClient("ws://" + testServ.Listener.Addr().String(), "SimpleServerHandler", &client) + closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &client) if err != nil { t.Fatal(err) } @@ -132,7 +132,7 @@ func TestRPC(t *testing.T) { var noret struct { Add func(int) } - closer, err = NewClient("ws://" + testServ.Listener.Addr().String(), "SimpleServerHandler", &noret) + closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &noret) if err != nil { t.Fatal(err) } @@ -147,7 +147,7 @@ func TestRPC(t *testing.T) { var noparam struct { Add func() } - closer, err = NewClient("ws://" + testServ.Listener.Addr().String(), "SimpleServerHandler", &noparam) + closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &noparam) if err != nil { t.Fatal(err) } @@ -159,7 +159,7 @@ func TestRPC(t *testing.T) { var erronly struct { AddGet func() (int, error) } - closer, err = NewClient("ws://" + testServ.Listener.Addr().String(), "SimpleServerHandler", &erronly) + closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &erronly) if err != nil { t.Fatal(err) } @@ -173,7 +173,7 @@ func TestRPC(t *testing.T) { var wrongtype struct { Add func(string) error } - closer, err = NewClient("ws://" + testServ.Listener.Addr().String(), "SimpleServerHandler", &wrongtype) + closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &wrongtype) if err != nil { t.Fatal(err) } @@ -187,7 +187,7 @@ func TestRPC(t *testing.T) { var notfound struct { NotThere func(string) error } - closer, err = NewClient("ws://" + testServ.Listener.Addr().String(), "SimpleServerHandler", ¬found) + closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", ¬found) if err != nil { t.Fatal(err) } @@ -236,7 +236,7 @@ func TestCtx(t *testing.T) { var client struct { Test func(ctx context.Context) } - closer, err := NewClient("ws://" + testServ.Listener.Addr().String(), "CtxHandler", &client) + closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "CtxHandler", &client) if err != nil { t.Fatal(err) } @@ -259,7 +259,7 @@ func TestCtx(t *testing.T) { var noCtxClient struct { Test func() } - closer, err = NewClient("ws://" + testServ.Listener.Addr().String(), "CtxHandler", &noCtxClient) + closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "CtxHandler", &noCtxClient) if err != nil { t.Fatal(err) } diff --git a/lib/jsonrpc/server.go b/lib/jsonrpc/server.go index 457463846..fdf3a5a67 100644 --- a/lib/jsonrpc/server.go +++ b/lib/jsonrpc/server.go @@ -35,9 +35,13 @@ func (s *RPCServer) handleWS(w http.ResponseWriter, r *http.Request) { w.WriteHeader(500) return } - defer c.Close() handleWsConn(r.Context(), c, s.methods, nil, nil) + + if err := c.Close(); err != nil { + log.Error(err) + return + } } // TODO: return errors to clients per spec diff --git a/lib/jsonrpc/websocket.go b/lib/jsonrpc/websocket.go index 3fede6b12..6d21be1e0 100644 --- a/lib/jsonrpc/websocket.go +++ b/lib/jsonrpc/websocket.go @@ -15,16 +15,16 @@ const wsCancel = "xrpc.cancel" type frame struct { // common - Jsonrpc string `json:"jsonrpc"` - ID *int64 `json:"id,omitempty"` + Jsonrpc string `json:"jsonrpc"` + ID *int64 `json:"id,omitempty"` // request - Method string `json:"method,omitempty"` - Params []param `json:"params,omitempty"` + Method string `json:"method,omitempty"` + Params []param `json:"params,omitempty"` // response - Result result `json:"result,omitempty"` - Error *respError `json:"error,omitempty"` + Result result `json:"result,omitempty"` + Error *respError `json:"error,omitempty"` } func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, requests <-chan clientRequest, stop <-chan struct{}) { @@ -137,11 +137,10 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r nw := func(cb func(io.Writer)) { cb(ioutil.Discard) } - done := func(){} + done := cf if frame.ID != nil { nw = nextWriter - handlingLk.Lock() handling[*frame.ID] = cf handlingLk.Unlock() @@ -150,7 +149,6 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r handlingLk.Lock() defer handlingLk.Unlock() - cf := handling[*frame.ID] cf() delete(handling, *frame.ID) }