jsonrpc: Websocket server

This commit is contained in:
Łukasz Magiera 2019-07-12 19:12:38 +02:00
parent 0eb208e1d3
commit 6a20d0dafe
5 changed files with 304 additions and 193 deletions

View File

@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"reflect"
"sync/atomic"
@ -82,70 +81,10 @@ func NewClient(addr string, namespace string, handler interface{}) (ClientCloser
}
stop := make(chan struct{})
errs := make(chan error, 1)
requests := make(chan clientRequest)
responses := make(chan io.Reader)
nextMessage := func() {
mtype, r, err := conn.NextReader()
if err != nil {
r, _ := io.Pipe()
r.CloseWithError(err) // nolint
return
}
if mtype != websocket.BinaryMessage && mtype != websocket.TextMessage {
r, _ := io.Pipe()
r.CloseWithError(errors.New("unsupported message type")) // nolint
return
}
responses <- r
}
go func() {
var err error
defer func() {
close(requests)
cerr := conn.Close()
if err == nil {
err = cerr
}
errs <- cerr
// close requests somehow
}()
inflight := map[int64]clientRequest{}
go nextMessage()
for {
select {
case req := <-requests:
inflight[*req.req.ID] = req
if err = conn.WriteJSON(req.req); err != nil {
return
}
case r := <- responses:
var resp clientResponse
if err = json.NewDecoder(r).Decode(&resp); err != nil {
return
}
req, ok := inflight[resp.ID]
if !ok {
log.Error("client got unknown ID in response")
continue
}
req.ready <- resp
delete(inflight, resp.ID)
go nextMessage()
case <-stop:
return
}
}
}()
handlers := map[string]rpcHandler{}
go handleWsConn(context.TODO(), conn, handlers, requests, stop)
for i := 0; i < typ.NumField(); i++ {
f := typ.Field(i)
@ -235,6 +174,5 @@ func NewClient(addr string, namespace string, handler interface{}) (ClientCloser
return func() {
close(stop)
<-errs // TODO: return
}, nil
}

View File

@ -2,18 +2,13 @@ package jsonrpc
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"io"
"reflect"
)
const (
rpcParseError = -32700
rpcMethodNotFound = -32601
rpcInvalidParams = -32602
)
type rpcHandler struct {
paramReceivers []reflect.Type
nParams int
@ -27,33 +22,9 @@ type rpcHandler struct {
valOut int
}
// RPCServer provides a jsonrpc 2.0 http server handler
type RPCServer struct {
methods map[string]rpcHandler
}
type handlers map[string]rpcHandler
// NewServer creates new RPCServer instance
func NewServer() *RPCServer {
return &RPCServer{
methods: map[string]rpcHandler{},
}
}
type param struct {
data []byte // from unmarshal
v reflect.Value // to marshal
}
func (p *param) UnmarshalJSON(raw []byte) error {
p.data = make([]byte, len(raw))
copy(p.data, raw)
return nil
}
func (p *param) MarshalJSON() ([]byte, error) {
return json.Marshal(p.v.Interface())
}
// Request / response
type request struct {
Jsonrpc string `json:"jsonrpc"`
@ -81,46 +52,95 @@ type response struct {
Error *respError `json:"error,omitempty"`
}
// TODO: return errors to clients per spec
func (s *RPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Register
func (h handlers) register(namespace string, r interface{}) {
val := reflect.ValueOf(r)
//TODO: expect ptr
for i := 0; i < val.NumMethod(); i++ {
method := val.Type().Method(i)
funcType := method.Func.Type()
hasCtx := 0
if funcType.NumIn() >= 2 && funcType.In(1) == contextType {
hasCtx = 1
}
ins := funcType.NumIn() - 1 - hasCtx
recvs := make([]reflect.Type, ins)
for i := 0; i < ins; i++ {
recvs[i] = method.Type.In(i + 1 + hasCtx)
}
valOut, errOut, _ := processFuncOut(funcType)
h[namespace+"."+method.Name] = rpcHandler{
paramReceivers: recvs,
nParams: ins,
handlerFunc: method.Func,
receiver: val,
hasCtx: hasCtx,
errOut: errOut,
valOut: valOut,
}
}
}
// Handle
type rpcErrFunc func(w io.Writer, req *request, code int, err error)
func (h handlers) handleReader(ctx context.Context, r io.Reader, w io.Writer, rpcError rpcErrFunc) {
var req request
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
s.rpcError(w, &req, rpcParseError, err)
if err := json.NewDecoder(r).Decode(&req); err != nil {
rpcError(w, &req, rpcParseError, err)
return
}
handler, ok := s.methods[req.Method]
h.handle(ctx, req, w, rpcError)
}
func (h handlers) handle(ctx context.Context, req request, w io.Writer, rpcError rpcErrFunc) {
handler, ok := h[req.Method]
if !ok {
s.rpcError(w, &req, rpcMethodNotFound, fmt.Errorf("method '%s' not found", req.Method))
rpcError(w, &req, rpcMethodNotFound, fmt.Errorf("method '%s' not found", req.Method))
return
}
if len(req.Params) != handler.nParams {
s.rpcError(w, &req, rpcInvalidParams, fmt.Errorf("wrong param count"))
rpcError(w, &req, rpcInvalidParams, fmt.Errorf("wrong param count"))
return
}
callParams := make([]reflect.Value, 1+handler.hasCtx+handler.nParams)
callParams[0] = handler.receiver
if handler.hasCtx == 1 {
callParams[1] = reflect.ValueOf(r.Context())
callParams[1] = reflect.ValueOf(ctx)
}
for i := 0; i < handler.nParams; i++ {
rp := reflect.New(handler.paramReceivers[i])
if err := json.NewDecoder(bytes.NewReader(req.Params[i].data)).Decode(rp.Interface()); err != nil {
s.rpcError(w, &req, rpcParseError, err)
rpcError(w, &req, rpcParseError, err)
return
}
callParams[i+1+handler.hasCtx] = reflect.ValueOf(rp.Elem().Interface())
}
///////////////////
callResult := handler.handlerFunc.Call(callParams)
if req.ID == nil {
return // notification
}
///////////////////
resp := response{
Jsonrpc: "2.0",
ID: *req.ID,
@ -144,90 +164,3 @@ func (s *RPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
}
func (s *RPCServer) rpcError(w http.ResponseWriter, req *request, code int, err error) {
w.WriteHeader(500)
if req.ID == nil { // notification
return
}
resp := response{
Jsonrpc: "2.0",
ID: *req.ID,
Error: &respError{
Code: code,
Message: err.Error(),
},
}
_ = json.NewEncoder(w).Encode(resp)
}
// Register registers new RPC handler
//
// Handler is any value with methods defined
func (s *RPCServer) Register(namespace string, r interface{}) {
val := reflect.ValueOf(r)
//TODO: expect ptr
for i := 0; i < val.NumMethod(); i++ {
method := val.Type().Method(i)
funcType := method.Func.Type()
hasCtx := 0
if funcType.NumIn() >= 2 && funcType.In(1) == contextType {
hasCtx = 1
}
ins := funcType.NumIn() - 1 - hasCtx
recvs := make([]reflect.Type, ins)
for i := 0; i < ins; i++ {
recvs[i] = method.Type.In(i + 1 + hasCtx)
}
valOut, errOut, _ := processFuncOut(funcType)
fmt.Println(namespace + "." + method.Name)
s.methods[namespace+"."+method.Name] = rpcHandler{
paramReceivers: recvs,
nParams: ins,
handlerFunc: method.Func,
receiver: val,
hasCtx: hasCtx,
errOut: errOut,
valOut: valOut,
}
}
}
func processFuncOut(funcType reflect.Type) (valOut int, errOut int, n int) {
errOut = -1
valOut = -1
n = funcType.NumOut()
switch n {
case 0:
case 1:
if funcType.Out(0) == errorType {
errOut = 0
} else {
valOut = 0
}
case 2:
valOut = 0
errOut = 1
if funcType.Out(1) != errorType {
panic("expected error as second return value")
}
default:
panic("too many error values")
}
return
}
var _ error = &respError{}

78
lib/jsonrpc/server.go Normal file
View File

@ -0,0 +1,78 @@
package jsonrpc
import (
"encoding/json"
"io"
"net/http"
"github.com/gorilla/websocket"
)
const (
rpcParseError = -32700
rpcMethodNotFound = -32601
rpcInvalidParams = -32602
)
// RPCServer provides a jsonrpc 2.0 http server handler
type RPCServer struct {
methods handlers
}
// NewServer creates new RPCServer instance
func NewServer() *RPCServer {
return &RPCServer{
methods: map[string]rpcHandler{},
}
}
var upgrader = websocket.Upgrader{}
func (s *RPCServer) handleWS(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
defer c.Close()
handleWsConn(r.Context(), c, s.methods, nil, nil)
}
// TODO: return errors to clients per spec
func (s *RPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Connection") == "Upgrade" {
s.handleWS(w, r)
return
}
s.methods.handleReader(r.Context(), r.Body, w, s.rpcError)
}
func (s *RPCServer) rpcError(w io.Writer, req *request, code int, err error) {
w.(http.ResponseWriter).WriteHeader(500)
if req.ID == nil { // notification
return
}
resp := response{
Jsonrpc: "2.0",
ID: *req.ID,
Error: &respError{
Code: code,
Message: err.Error(),
},
}
_ = json.NewEncoder(w).Encode(resp)
}
// Register registers new RPC handler
//
// Handler is any value with methods defined
func (s *RPCServer) Register(namespace string, handler interface{}) {
s.methods.register(namespace, handler)
}
var _ error = &respError{}

49
lib/jsonrpc/util.go Normal file
View File

@ -0,0 +1,49 @@
package jsonrpc
import (
"encoding/json"
"reflect"
)
type param struct {
data []byte // from unmarshal
v reflect.Value // to marshal
}
func (p *param) UnmarshalJSON(raw []byte) error {
p.data = make([]byte, len(raw))
copy(p.data, raw)
return nil
}
func (p *param) MarshalJSON() ([]byte, error) {
return json.Marshal(p.v.Interface())
}
// processFuncOut finds value and error Outs in function
func processFuncOut(funcType reflect.Type) (valOut int, errOut int, n int) {
errOut = -1 // -1 if not found
valOut = -1
n = funcType.NumOut()
switch n {
case 0:
case 1:
if funcType.Out(0) == errorType {
errOut = 0
} else {
valOut = 0
}
case 2:
valOut = 0
errOut = 1
if funcType.Out(1) != errorType {
panic("expected error as second return value")
}
default:
panic("too many error values")
}
return
}

113
lib/jsonrpc/websocket.go Normal file
View File

@ -0,0 +1,113 @@
package jsonrpc
import (
"context"
"encoding/json"
"errors"
"io"
"github.com/gorilla/websocket"
)
type frame struct {
// common
Jsonrpc string `json:"jsonrpc"`
ID *int64 `json:"id,omitempty"`
// request
Method string `json:"method,omitempty"`
Params []param `json:"params,omitempty"`
// response
Result result `json:"result,omitempty"`
Error *respError `json:"error,omitempty"`
}
func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, requests <-chan clientRequest, stop <-chan struct{}) {
incoming := make(chan io.Reader)
nextMessage := func() {
mtype, r, err := conn.NextReader()
if err != nil {
r, _ := io.Pipe()
r.CloseWithError(err) // nolint
incoming <- r
return
}
if mtype != websocket.BinaryMessage && mtype != websocket.TextMessage {
r, _ := io.Pipe()
r.CloseWithError(errors.New("unsupported message type")) // nolint
incoming <- r
return
}
incoming <- r
}
go nextMessage()
inflight := map[int64]clientRequest{}
for {
select {
case r := <-incoming:
var frame frame
if err := json.NewDecoder(r).Decode(&frame); err != nil {
log.Error("handle me:", err)
return
}
if frame.Method != "" {
// call
req := request{
Jsonrpc: frame.Jsonrpc,
ID: frame.ID,
Method: frame.Method,
Params: frame.Params,
}
// TODO: ignore ID
wcl, err := conn.NextWriter(websocket.TextMessage)
if err != nil {
log.Error("handle me:", err)
return
}
handler.handle(ctx, req, wcl, func(w io.Writer, req *request, code int, err error) {
log.Error("handle me:", err) // TODO: seriously
return
})
if err := wcl.Close(); err != nil {
log.Error("handle me:", err)
return
}
} else {
// response
req, ok := inflight[*frame.ID]
if !ok {
log.Error("client got unknown ID in response")
continue
}
req.ready <- clientResponse{
Jsonrpc: frame.Jsonrpc,
Result: frame.Result,
ID: *frame.ID,
Error: frame.Error,
}
delete(inflight, *frame.ID)
}
go nextMessage()
case req := <-requests:
inflight[*req.req.ID] = req
if err := conn.WriteJSON(req.req); err != nil {
log.Error("handle me:", err)
return
}
case <-stop:
return
}
}
}