package jsonrpc

import (
	"container/list"
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"net/http"
	"reflect"
	"sync/atomic"
	"time"

	"github.com/gorilla/websocket"
	logging "github.com/ipfs/go-log/v2"
	"go.opencensus.io/trace"
	"go.opencensus.io/trace/propagation"
	"golang.org/x/xerrors"
)

const (
	// methodRetryFrequency is how long handleRpcCall sleeps between attempts
	// when a retryable method has to be re-sent.
	methodRetryFrequency = time.Second * 3
)

var (
	// errorType and contextType are the reflect.Types of the `error` and
	// `context.Context` interfaces, used when inspecting handler signatures.
	errorType   = reflect.TypeOf(new(error)).Elem()
	contextType = reflect.TypeOf(new(context.Context)).Elem()

	log = logging.Logger("rpc")
)

// ErrClient is an error which occurred on the client side of the library.
type ErrClient struct {
	err error
}

// Error implements the error interface.
func (e *ErrClient) Error() string {
	return fmt.Sprintf("RPC client error: %s", e.err)
}

// Unwrap returns the wrapped error.
//
// NOTE(review): this method takes an (unused) error parameter, so it does not
// match the `Unwrap() error` shape that errors.Unwrap, errors.Is and errors.As
// look for; those helpers will not see through an *ErrClient. The signature is
// left untouched here because changing it alters the exported API.
func (e *ErrClient) Unwrap(err error) error {
	return e.err
}

// clientResponse is the JSON shape of a response received by the client.
type clientResponse struct {
	Jsonrpc string          `json:"jsonrpc"`
	Result  json.RawMessage `json:"result"`
	ID      int64           `json:"id"`
	Error   *respError      `json:"error,omitempty"`
}

// makeChanSink builds the receiving side of a channel-returning call: it
// returns a context plus a sink function that is fed raw messages. The sink's
// bool argument is false when no further messages will arrive.
type makeChanSink func() (context.Context, func([]byte, bool))

// clientRequest is what the client hands to the websocket goroutine.
type clientRequest struct {
	req   request
	ready chan clientResponse

	// retCh provides a context and sink for handling incoming channel messages
	retCh makeChanSink
}

// ClientCloser is used to close Client from further use
type ClientCloser func()

// NewClient creates new jsonrpc 2.0 client
//
// handler must be pointer to a struct with function fields
// Returned value closes the client connection
// TODO: Example
func NewClient(addr string, namespace string, handler interface{}, requestHeader http.Header) (ClientCloser, error) {
	return NewMergeClient(addr, namespace, []interface{}{handler}, requestHeader)
}

// client holds the state shared by every generated RPC function.
type client struct {
	namespace string

	requests chan clientRequest // requests are pushed here for the websocket goroutine
	exiting  <-chan struct{}    // selected on by sendRequest to detect that the websocket goroutine is exiting
	idCtr    int64              // request id counter, incremented atomically
}

// NewMergeClient is like NewClient, but allows to specify multiple structs
// to be filled in the same namespace, using one connection
//
// Every element of outs must be a non-nil pointer to a struct whose fields are
// all funcs; each field is replaced with a function that performs the RPC call
// named "<namespace>.<FieldName>". If any handler is rejected, the connection
// goroutine that was already started is shut down before the error is
// returned, so a failed call does not leak the goroutine or the connection.
func NewMergeClient(addr string, namespace string, outs []interface{}, requestHeader http.Header, opts ...Option) (ClientCloser, error) {
	var config Config
	for _, o := range opts {
		o(&config)
	}

	connFactory := func() (*websocket.Conn, error) {
		conn, _, err := websocket.DefaultDialer.Dial(addr, requestHeader)
		return conn, err
	}

	if config.proxyConnFactory != nil {
		connFactory = config.proxyConnFactory(connFactory)
	}

	conn, err := connFactory()
	if err != nil {
		return nil, err
	}

	c := client{
		namespace: namespace,
	}

	stop := make(chan struct{})
	exiting := make(chan struct{})
	c.requests = make(chan clientRequest)
	c.exiting = exiting

	handlers := map[string]rpcHandler{}
	go (&wsConn{
		conn:              conn,
		connFactory:       connFactory,
		reconnectInterval: config.ReconnectInterval,
		handler:           handlers,
		requests:          c.requests,
		stop:              stop,
		exiting:           exiting,
	}).handleWsConn(context.TODO())

	// closer signals the websocket goroutine to stop and waits for it to
	// acknowledge on the exiting channel.
	closer := func() {
		close(stop)
		<-exiting
	}

	// fail tears the connection goroutine down before reporting err, so that
	// an invalid handler does not leave it running forever.
	fail := func(err error) (ClientCloser, error) {
		closer()
		return nil, err
	}

	for _, handler := range outs {
		htyp := reflect.TypeOf(handler)
		// reflect.TypeOf returns nil for a nil interface value; calling Kind
		// on it would panic.
		if htyp == nil || htyp.Kind() != reflect.Ptr {
			return fail(xerrors.New("expected handler to be a pointer"))
		}
		typ := htyp.Elem()
		if typ.Kind() != reflect.Struct {
			return fail(xerrors.New("handler should be a struct"))
		}

		val := reflect.ValueOf(handler)
		// A typed nil pointer has no addressable struct to populate.
		if val.IsNil() {
			return fail(xerrors.New("handler must not be a nil pointer"))
		}

		for i := 0; i < typ.NumField(); i++ {
			fn, err := c.makeRpcFunc(typ.Field(i))
			if err != nil {
				return fail(err)
			}

			val.Elem().Field(i).Set(fn)
		}
	}

	return closer, nil
}

// makeOutChan prepares the plumbing for an RPC method whose value result is a
// channel (result index valOut of ftyp).
//
// It returns two functions:
//   - a getter for the channel value to hand back to the caller; until the
//     sink constructor has been invoked this is the zero value of the result
//     type;
//   - a makeChanSink which, when invoked, creates the channel, starts a
//     goroutine that buffers decoded messages and forwards them to the
//     channel, and returns ctx together with the sink fed with raw messages.
func (c *client) makeOutChan(ctx context.Context, ftyp reflect.Type, valOut int) (func() reflect.Value, makeChanSink) {
	retVal := reflect.Zero(ftyp.Out(valOut))

	chCtor := func() (context.Context, func([]byte, bool)) {
		// unpack chan type to make sure it's reflect.BothDir
		ctyp := reflect.ChanOf(reflect.BothDir, ftyp.Out(valOut).Elem())
		ch := reflect.MakeChan(ctyp, 0) // todo: buffer?
		chCtx, chCancel := context.WithCancel(ctx)
		retVal = ch.Convert(ftyp.Out(valOut))

		incoming := make(chan reflect.Value, 32)

		// goroutine to handle buffering of items
		go func() {
			buf := (&list.List{}).Init()

			for {
				front := buf.Front()

				cases := []reflect.SelectCase{
					{
						Dir:  reflect.SelectRecv,
						Chan: reflect.ValueOf(chCtx.Done()),
					},
					{
						Dir:  reflect.SelectRecv,
						Chan: reflect.ValueOf(incoming),
					},
				}

				// Only offer a send when there is something buffered.
				if front != nil {
					cases = append(cases, reflect.SelectCase{
						Dir:  reflect.SelectSend,
						Chan: ch,
						Send: front.Value.(reflect.Value).Elem(),
					})
				}

				chosen, val, _ := reflect.Select(cases)

				switch chosen {
				case 0:
					// chCtx is done: close the caller-facing channel and stop.
					ch.Close()
					return
				case 1:
					// New decoded item from the sink; queue it.
					vvval := val.Interface().(reflect.Value)
					buf.PushBack(vvval)
					if buf.Len() > 1 {
						if buf.Len() > 10 {
							log.Warnw("rpc output message buffer", "n", buf.Len())
						} else {
							log.Infow("rpc output message buffer", "n", buf.Len())
						}
					}
				case 2:
					// The front item was delivered to the caller.
					buf.Remove(front)
				}
			}
		}()

		return ctx, func(result []byte, ok bool) {
			if !ok {
				// No more messages: cancelling chCtx makes the buffering
				// goroutine close the channel.
				chCancel()
				return
			}

			val := reflect.New(ftyp.Out(valOut).Elem())
			if err := json.Unmarshal(result, val.Interface()); err != nil {
				log.Errorf("error unmarshaling chan response: %s", err)
				return
			}

			if ctx.Err() != nil {
				log.Errorf("got rpc message with cancelled context: %s", ctx.Err())
				return
			}

			select {
			case incoming <- val:
			case <-chCtx.Done():
			}
		}
	}

	return func() reflect.Value { return retVal }, chCtor
}

// sendRequest hands req to the websocket goroutine and waits for its response.
//
// An error is returned only when the exiting channel fires before the request
// could be handed over. If ctx (which may be nil) is cancelled while waiting,
// a wsCancel request carrying the original request id is sent once and the
// function keeps waiting for the response. req.ID must be non-nil.
func (c *client) sendRequest(ctx context.Context, req request, chCtor makeChanSink) (clientResponse, error) {
	rchan := make(chan clientResponse, 1)
	creq := clientRequest{
		req:   req,
		ready: rchan,

		retCh: chCtor,
	}
	select {
	case c.requests <- creq:
	case <-c.exiting:
		return clientResponse{}, fmt.Errorf("websocket routine exiting")
	}

	var ctxDone <-chan struct{}
	var resp clientResponse

	if ctx != nil {
		ctxDone = ctx.Done()
	}

	// wait for response, handle context cancellation
loop:
	for {
		select {
		case resp = <-rchan:
			break loop
		case <-ctxDone: // send cancel request
			// A nil channel never becomes ready, so the cancel request is
			// sent at most once.
			ctxDone = nil

			cancelReq := clientRequest{
				req: request{
					Jsonrpc: "2.0",
					Method:  wsCancel,
					Params:  []param{{v: reflect.ValueOf(*req.ID)}},
				},
			}
			select {
			case c.requests <- cancelReq:
			case <-c.exiting:
				log.Warn("failed to send request cancellation, websocket routing exited")
			}
		}
	}

	return resp, nil
}

// rpcFunc describes one generated RPC function and how to map its Go
// signature to a request/response pair.
type rpcFunc struct {
	client *client

	ftyp reflect.Type
	name string

	nout   int // number of results of ftyp
	valOut int // index of the value result, or -1
	errOut int // index of the error result, or -1

	hasCtx               int // 1 when the first parameter is a context.Context, else 0
	returnValueIsChannel bool

	retry bool // set from the `retry:"true"` struct tag
}

// processResponse builds the reflect results for a completed call: rval goes
// into the value slot (if any) and resp.Error, when non-nil, into the error
// slot (if any).
func (fn *rpcFunc) processResponse(resp clientResponse, rval reflect.Value) []reflect.Value {
	out := make([]reflect.Value, fn.nout)

	if fn.valOut != -1 {
		out[fn.valOut] = rval
	}
	if fn.errOut != -1 {
		out[fn.errOut] = reflect.New(errorType).Elem()
		if resp.Error != nil {
			out[fn.errOut].Set(reflect.ValueOf(resp.Error))
		}
	}

	return out
}

// processError builds the reflect results for a call that failed on the
// client side: the value slot (if any) gets the zero value and the error slot
// (if any) gets err wrapped in *ErrClient.
func (fn *rpcFunc) processError(err error) []reflect.Value {
	out := make([]reflect.Value, fn.nout)

	if fn.valOut != -1 {
		out[fn.valOut] = reflect.New(fn.ftyp.Out(fn.valOut)).Elem()
	}
	if fn.errOut != -1 {
		out[fn.errOut] = reflect.New(errorType).Elem()
		out[fn.errOut].Set(reflect.ValueOf(&ErrClient{err: err}))
	}

	return out
}

// handleRpcCall is the body of every generated RPC function (it is passed to
// reflect.MakeFunc). It turns args into a request, sends it, and converts the
// response back into the function's results.
func (fn *rpcFunc) handleRpcCall(args []reflect.Value) (results []reflect.Value) {
	id := atomic.AddInt64(&fn.client.idCtr, 1)
	params := make([]param, len(args)-fn.hasCtx)
	for i, arg := range args[fn.hasCtx:] {
		params[i] = param{
			v: arg,
		}
	}

	// Methods without a leading context.Context still need a usable context:
	// makeOutChan derives a cancellable context from it, and
	// context.WithCancel panics on a nil parent.
	ctx := context.Background()
	var span *trace.Span
	if fn.hasCtx == 1 {
		ctx = args[0].Interface().(context.Context)
		ctx, span = trace.StartSpan(ctx, "api.call")
		defer span.End()
	}

	retVal := func() reflect.Value { return reflect.Value{} }

	// if the function returns a channel, we need to provide a sink for the
	// messages
	var chCtor makeChanSink
	if fn.returnValueIsChannel {
		retVal, chCtor = fn.client.makeOutChan(ctx, fn.ftyp, fn.valOut)
	}

	req := request{
		Jsonrpc: "2.0",
		ID:      &id,
		Method:  fn.client.namespace + "." + fn.name,
		Params:  params,
	}

	if span != nil {
		span.AddAttributes(trace.StringAttribute("method", req.Method))

		eSC := base64.StdEncoding.EncodeToString(
			propagation.Binary(span.SpanContext()))
		req.Meta = map[string]string{
			"SpanContext": eSC,
		}
	}

	var resp clientResponse
	var err error
	// keep retrying if got a forced closed websocket conn and calling method
	// has retry annotation
	for {
		resp, err = fn.client.sendRequest(ctx, req, chCtor)
		if err != nil {
			return fn.processError(fmt.Errorf("sendRequest failed: %w", err))
		}

		if resp.ID != *req.ID {
			return fn.processError(xerrors.New("request and response id didn't match"))
		}

		if fn.valOut != -1 && !fn.returnValueIsChannel {
			val := reflect.New(fn.ftyp.Out(fn.valOut))

			if resp.Result != nil {
				log.Debugw("rpc result", "type", fn.ftyp.Out(fn.valOut))
				if err := json.Unmarshal(resp.Result, val.Interface()); err != nil {
					log.Warnw("unmarshaling failed", "message", string(resp.Result))
					return fn.processError(xerrors.Errorf("unmarshaling result: %w", err))
				}
			}

			retVal = func() reflect.Value { return val.Elem() }
		}

		retry := resp.Error != nil && resp.Error.Code == 2 && fn.retry
		if !retry {
			break
		}

		// Note: this sleep does not observe ctx cancellation.
		time.Sleep(methodRetryFrequency)
	}

	return fn.processResponse(resp, retVal())
}

// makeRpcFunc builds the function value to store into handler field f.
// It returns an error when the field is not a func. The field name becomes
// the method name, and a `retry:"true"` struct tag enables retrying.
func (c *client) makeRpcFunc(f reflect.StructField) (reflect.Value, error) {
	ftyp := f.Type
	if ftyp.Kind() != reflect.Func {
		return reflect.Value{}, xerrors.New("handler field not a func")
	}

	fun := &rpcFunc{
		client: c,
		ftyp:   ftyp,
		name:   f.Name,
		retry:  f.Tag.Get("retry") == "true",
	}
	fun.valOut, fun.errOut, fun.nout = processFuncOut(ftyp)

	if ftyp.NumIn() > 0 && ftyp.In(0) == contextType {
		fun.hasCtx = 1
	}
	fun.returnValueIsChannel = fun.valOut != -1 && ftyp.Out(fun.valOut).Kind() == reflect.Chan

	return reflect.MakeFunc(ftyp, fun.handleRpcCall), nil
}