diff --git a/lib/jsonrpc/client.go b/lib/jsonrpc/client.go index c0efa7337..448418c67 100644 --- a/lib/jsonrpc/client.go +++ b/lib/jsonrpc/client.go @@ -3,7 +3,6 @@ package jsonrpc import ( "context" "encoding/json" - "errors" "fmt" "net/http" "reflect" @@ -42,12 +41,14 @@ type clientResponse struct { Error *respError `json:"error,omitempty"` } +type makeChanSink func() (context.Context, func([]byte, bool)) + type clientRequest struct { req request ready chan clientResponse // retCh provides a context and sink for handling incoming channel messages - retCh func() (context.Context, func([]byte, bool)) + retCh makeChanSink } // ClientCloser is used to close Client from further use @@ -62,24 +63,33 @@ func NewClient(addr string, namespace string, handler interface{}, requestHeader return NewMergeClient(addr, namespace, []interface{}{handler}, requestHeader) } +type client struct { + namespace string + + requests chan clientRequest + idCtr int64 +} + // NewMergeClient is like NewClient, but allows to specify multiple structs // to be filled in the same namespace, using one connection func NewMergeClient(addr string, namespace string, outs []interface{}, requestHeader http.Header) (ClientCloser, error) { - var idCtr int64 - conn, _, err := websocket.DefaultDialer.Dial(addr, requestHeader) if err != nil { return nil, err } + c := client{ + namespace: namespace, + } + stop := make(chan struct{}) - requests := make(chan clientRequest) + c.requests = make(chan clientRequest) handlers := map[string]rpcHandler{} go (&wsConn{ conn: conn, handler: handlers, - requests: requests, + requests: c.requests, stop: stop, }).handleWsConn(context.TODO()) @@ -96,157 +106,11 @@ func NewMergeClient(addr string, namespace string, outs []interface{}, requestHe val := reflect.ValueOf(handler) for i := 0; i < typ.NumField(); i++ { - f := typ.Field(i) - ftyp := f.Type - if ftyp.Kind() != reflect.Func { - return nil, xerrors.New("handler field not a func") + fn, err := c.makeRpcFunc(typ.Field(i)) + if err != nil { + return nil, err } - valOut, errOut, nout := processFuncOut(ftyp) - - processResponse := func(resp clientResponse, rval reflect.Value) []reflect.Value { - out := make([]reflect.Value, nout) - - if valOut != -1 { - out[valOut] = rval - } - if errOut != -1 { - out[errOut] = reflect.New(errorType).Elem() - if resp.Error != nil { - out[errOut].Set(reflect.ValueOf(resp.Error)) - } - } - - return out - } - - processError := func(err error) []reflect.Value { - out := make([]reflect.Value, nout) - - if valOut != -1 { - out[valOut] = reflect.New(ftyp.Out(valOut)).Elem() - } - if errOut != -1 { - out[errOut] = reflect.New(errorType).Elem() - out[errOut].Set(reflect.ValueOf(&ErrClient{err})) - } - - return out - } - - hasCtx := 0 - if ftyp.NumIn() > 0 && ftyp.In(0) == contextType { - hasCtx = 1 - } - retCh := valOut != -1 && ftyp.Out(valOut).Kind() == reflect.Chan - - fn := reflect.MakeFunc(ftyp, func(args []reflect.Value) (results []reflect.Value) { - id := atomic.AddInt64(&idCtr, 1) - params := make([]param, len(args)-hasCtx) - for i, arg := range args[hasCtx:] { - params[i] = param{ - v: arg, - } - } - - var ctx context.Context - if hasCtx == 1 { - ctx = args[0].Interface().(context.Context) - } - - var retVal reflect.Value - - // if the function returns a channel, we need to provide a sink for the - // messages - var chCtor func() (context.Context, func([]byte, bool)) - - if retCh { - retVal = reflect.Zero(ftyp.Out(valOut)) - - chCtor = func() (context.Context, func([]byte, bool)) { - // unpack chan type to make sure it's reflect.BothDir - ctyp := reflect.ChanOf(reflect.BothDir, ftyp.Out(valOut).Elem()) - ch := reflect.MakeChan(ctyp, 0) // todo: buffer? - retVal = ch.Convert(ftyp.Out(valOut)) - - return ctx, func(result []byte, ok bool) { - if !ok { - // remote channel closed, close ours too - ch.Close() - return - } - - val := reflect.New(ftyp.Out(valOut).Elem()) - if err := json.Unmarshal(result, val.Interface()); err != nil { - log.Errorf("error unmarshaling chan response: %s", err) - return - } - - ch.Send(val.Elem()) // todo: select on ctx is probably a good idea - } - } - } - - req := request{ - Jsonrpc: "2.0", - ID: &id, - Method: namespace + "." + f.Name, - Params: params, - } - - rchan := make(chan clientResponse, 1) - requests <- clientRequest{ - req: req, - ready: rchan, - - retCh: chCtor, - } - var ctxDone <-chan struct{} - var resp clientResponse - - if ctx != nil { - ctxDone = ctx.Done() - } - - // wait for response, handle context cancellation - loop: - for { - select { - case resp = <-rchan: - break loop - case <-ctxDone: // send cancel request - ctxDone = nil - - requests <- clientRequest{ - req: request{ - Jsonrpc: "2.0", - Method: wsCancel, - Params: []param{{v: reflect.ValueOf(id)}}, - }, - } - } - } - - if valOut != -1 && !retCh { - retVal = reflect.New(ftyp.Out(valOut)) - - if resp.Result != nil { - log.Debugw("rpc result", "type", ftyp.Out(valOut)) - if err := json.Unmarshal(resp.Result, retVal.Interface()); err != nil { - return processError(xerrors.Errorf("unmarshaling result: %w", err)) - } - } - - retVal = retVal.Elem() - } - - if resp.ID != *req.ID { - return processError(errors.New("request and response id didn't match")) - } - - return processResponse(resp, retVal) - }) - val.Elem().Field(i).Set(fn) } } @@ -255,3 +119,186 @@ func NewMergeClient(addr string, namespace string, outs []interface{}, requestHe close(stop) }, nil } + +func (c *client) makeOutChan(ctx context.Context, ftyp reflect.Type, valOut int) (func() reflect.Value, makeChanSink) { + retVal := reflect.Zero(ftyp.Out(valOut)) + + chCtor := func() (context.Context, func([]byte, bool)) { + // unpack chan type to make sure it's reflect.BothDir + ctyp := reflect.ChanOf(reflect.BothDir, ftyp.Out(valOut).Elem()) + ch := reflect.MakeChan(ctyp, 0) // todo: buffer? + retVal = ch.Convert(ftyp.Out(valOut)) + + return ctx, func(result []byte, ok bool) { + if !ok { + // remote channel closed, close ours too + ch.Close() + return + } + + val := reflect.New(ftyp.Out(valOut).Elem()) + if err := json.Unmarshal(result, val.Interface()); err != nil { + log.Errorf("error unmarshaling chan response: %s", err) + return + } + + ch.Send(val.Elem()) // todo: select on ctx is probably a good idea + } + } + + return func() reflect.Value { return retVal }, chCtor +} + +func (c *client) sendRequest(ctx context.Context, req request, chCtor makeChanSink) clientResponse { + rchan := make(chan clientResponse, 1) + c.requests <- clientRequest{ + req: req, + ready: rchan, + + retCh: chCtor, + } + var ctxDone <-chan struct{} + var resp clientResponse + + if ctx != nil { + ctxDone = ctx.Done() + } + + // wait for response, handle context cancellation +loop: + for { + select { + case resp = <-rchan: + break loop + case <-ctxDone: // send cancel request + ctxDone = nil + + c.requests <- clientRequest{ + req: request{ + Jsonrpc: "2.0", + Method: wsCancel, + Params: []param{{v: reflect.ValueOf(*req.ID)}}, + }, + } + } + } + + return resp +} + +type rpcFunc struct { + client *client + + ftyp reflect.Type + name string + + nout int + valOut int + errOut int + + hasCtx int + retCh bool +} + +func (fn *rpcFunc) processResponse(resp clientResponse, rval reflect.Value) []reflect.Value { + out := make([]reflect.Value, fn.nout) + + if fn.valOut != -1 { + out[fn.valOut] = rval + } + if fn.errOut != -1 { + out[fn.errOut] = reflect.New(errorType).Elem() + if resp.Error != nil { + out[fn.errOut].Set(reflect.ValueOf(resp.Error)) + } + } + + return out +} + +func (fn *rpcFunc) processError(err error) []reflect.Value { + out := make([]reflect.Value, fn.nout) + + if fn.valOut != -1 { + out[fn.valOut] = reflect.New(fn.ftyp.Out(fn.valOut)).Elem() + } + if fn.errOut != -1 { + out[fn.errOut] = reflect.New(errorType).Elem() + out[fn.errOut].Set(reflect.ValueOf(&ErrClient{err})) + } + + return out +} + +func (fn *rpcFunc) handleRpcCall(args []reflect.Value) (results []reflect.Value) { + id := atomic.AddInt64(&fn.client.idCtr, 1) + params := make([]param, len(args)-fn.hasCtx) + for i, arg := range args[fn.hasCtx:] { + params[i] = param{ + v: arg, + } + } + + var ctx context.Context + if fn.hasCtx == 1 { + ctx = args[0].Interface().(context.Context) + } + + retVal := func() reflect.Value { return reflect.Value{} } + + // if the function returns a channel, we need to provide a sink for the + // messages + var chCtor makeChanSink + if fn.retCh { + retVal, chCtor = fn.client.makeOutChan(ctx, fn.ftyp, fn.valOut) + } + + req := request{ + Jsonrpc: "2.0", + ID: &id, + Method: fn.client.namespace + "." + fn.name, + Params: params, + } + + resp := fn.client.sendRequest(ctx, req, chCtor) + + if resp.ID != *req.ID { + return fn.processError(xerrors.New("request and response id didn't match")) + } + + if fn.valOut != -1 && !fn.retCh { + val := reflect.New(fn.ftyp.Out(fn.valOut)) + + if resp.Result != nil { + log.Debugw("rpc result", "type", fn.ftyp.Out(fn.valOut)) + if err := json.Unmarshal(resp.Result, val.Interface()); err != nil { + return fn.processError(xerrors.Errorf("unmarshaling result: %w", err)) + } + } + + retVal = func() reflect.Value { return val.Elem() } + } + + return fn.processResponse(resp, retVal()) +} + +func (c *client) makeRpcFunc(f reflect.StructField) (reflect.Value, error) { + ftyp := f.Type + if ftyp.Kind() != reflect.Func { + return reflect.Value{}, xerrors.New("handler field not a func") + } + + fun := &rpcFunc{ + client: c, + ftyp: ftyp, + name: f.Name, + } + fun.valOut, fun.errOut, fun.nout = processFuncOut(ftyp) + + if ftyp.NumIn() > 0 && ftyp.In(0) == contextType { + fun.hasCtx = 1 + } + fun.retCh = fun.valOut != -1 && ftyp.Out(fun.valOut).Kind() == reflect.Chan + + return reflect.MakeFunc(ftyp, fun.handleRpcCall), nil +}