From 0eb208e1d3448ff41f2e21e56f24ae3ca868703e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 12 Jul 2019 17:29:41 +0200 Subject: [PATCH] WS JsonRPC client --- api/client/client.go | 6 +- cli/cmd.go | 2 +- cli/version.go | 13 ++- go.mod | 1 + lib/jsonrpc/rpc_client.go | 162 ++++++++++++++++++++++++-------------- 5 files changed, 117 insertions(+), 67 deletions(-) diff --git a/api/client/client.go b/api/client/client.go index 2dbc98f21..8d23aea3b 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -6,8 +6,8 @@ import ( ) // NewRPC creates a new http jsonrpc client. -func NewRPC(addr string) api.API { +func NewRPC(addr string) (api.API, error) { var res api.Struct - jsonrpc.NewClient(addr, "Filecoin", &res.Internal) - return &res + _, err := jsonrpc.NewClient(addr, "Filecoin", &res.Internal) + return &res, err } diff --git a/cli/cmd.go b/cli/cmd.go index 414312e67..7cd4a5b99 100644 --- a/cli/cmd.go +++ b/cli/cmd.go @@ -35,7 +35,7 @@ func getAPI(ctx *cli.Context) (api.API, error) { if err != nil { return nil, err } - return client.NewRPC("http://" + addr + "/rpc/v0"), nil + return client.NewRPC("ws://" + addr + "/rpc/v0") } // reqContext returns context for cli execution. Calling it for the first time diff --git a/cli/version.go b/cli/version.go index 57d101d83..cb58c8ba9 100644 --- a/cli/version.go +++ b/cli/version.go @@ -1,16 +1,25 @@ package cli import ( + "fmt" + "gopkg.in/urfave/cli.v2" ) var versionCmd = &cli.Command{ Name: "version", Usage: "Print version", - Action: func(context *cli.Context) error { + Action: func(cctx *cli.Context) error { + api, err := getAPI(cctx) + if err != nil { + return err + } + + ctx := reqContext(cctx) // TODO: print more useful things - cli.VersionPrinter(context) + fmt.Println(api.Version(ctx)) + cli.VersionPrinter(cctx) return nil }, } diff --git a/go.mod b/go.mod index e61e84aa7..97228e0d9 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.12 require ( github.com/BurntSushi/toml v0.3.1 github.com/filecoin-project/go-leb128 v0.0.0-20190212224330-8d79a5489543 + github.com/gorilla/websocket v1.4.0 github.com/ipfs/go-bitswap v0.1.5 github.com/ipfs/go-block-format v0.0.2 github.com/ipfs/go-blockservice v0.0.2 diff --git a/lib/jsonrpc/rpc_client.go b/lib/jsonrpc/rpc_client.go index b923108c7..4426c26db 100644 --- a/lib/jsonrpc/rpc_client.go +++ b/lib/jsonrpc/rpc_client.go @@ -1,23 +1,20 @@ package jsonrpc import ( - "bytes" "context" "encoding/json" "errors" "fmt" - "io/ioutil" - "net/http" + "io" "reflect" "sync/atomic" + "github.com/gorilla/websocket" logging "github.com/ipfs/go-log" ) var log = logging.Logger("rpc") -const clientDebug = true - var ( errorType = reflect.TypeOf(new(error)).Elem() contextType = reflect.TypeOf(new(context.Context)).Elem() @@ -37,12 +34,12 @@ func (e *ErrClient) Unwrap(err error) error { return e.err } -type result reflect.Value +type result []byte -func (r *result) UnmarshalJSON(raw []byte) error { - err := json.Unmarshal(raw, reflect.Value(*r).Interface()) - log.Debugw("rpc unmarshal response", "raw", string(raw), "err", err) - return err +func (p *result) UnmarshalJSON(raw []byte) error { + *p = make([]byte, len(raw)) + copy(*p, raw) + return nil } type clientResponse struct { @@ -52,6 +49,11 @@ type clientResponse struct { Error *respError `json:"error,omitempty"` } +type clientRequest struct { + req request + ready chan clientResponse +} + // ClientCloser is used to close Client from further use type ClientCloser func() @@ -60,7 +62,7 @@ type ClientCloser func() // handler must be pointer to a struct with function fields // Returned value closes the client connection // TODO: Example -func NewClient(addr string, namespace string, handler interface{}) ClientCloser { +func NewClient(addr string, namespace string, handler interface{}) (ClientCloser, error) { htyp := reflect.TypeOf(handler) if htyp.Kind() != reflect.Ptr { panic("expected handler to be a pointer") @@ -74,6 +76,77 @@ func NewClient(addr string, namespace string, handler interface{}) ClientCloser var idCtr int64 + conn, _, err := websocket.DefaultDialer.Dial(addr, nil) + if err != nil { + return nil, err + } + + stop := make(chan struct{}) + errs := make(chan error, 1) + requests := make(chan clientRequest) + responses := make(chan io.Reader) + + nextMessage := func() { + mtype, r, err := conn.NextReader() + if err != nil { + r, _ := io.Pipe() + r.CloseWithError(err) // nolint + return + + } + if mtype != websocket.BinaryMessage && mtype != websocket.TextMessage { + r, _ := io.Pipe() + r.CloseWithError(errors.New("unsupported message type")) // nolint + return + } + responses <- r + } + + go func() { + var err error + defer func() { + close(requests) + cerr := conn.Close() + if err == nil { + err = cerr + } + errs <- cerr + + // close requests somehow + }() + + inflight := map[int64]clientRequest{} + + go nextMessage() + + for { + select { + case req := <-requests: + inflight[*req.req.ID] = req + if err = conn.WriteJSON(req.req); err != nil { + return + } + case r := <- responses: + var resp clientResponse + if err = json.NewDecoder(r).Decode(&resp); err != nil { + return + } + req, ok := inflight[resp.ID] + if !ok { + log.Error("client got unknown ID in response") + continue + } + + req.ready <- resp + delete(inflight, resp.ID) + + go nextMessage() + case <-stop: + return + } + } + }() + for i := 0; i < typ.NumField(); i++ { f := typ.Field(i) ftyp := f.Type @@ -83,11 +156,11 @@ func NewClient(addr string, namespace string, handler interface{}) ClientCloser valOut, errOut, nout := processFuncOut(ftyp) - processResponse := func(resp clientResponse, code int) []reflect.Value { + processResponse := func(resp clientResponse, rval reflect.Value) []reflect.Value { out := make([]reflect.Value, nout) if valOut != -1 { - out[valOut] = reflect.Value(resp.Result).Elem() + out[valOut] = rval.Elem() } if errOut != -1 { out[errOut] = reflect.New(errorType).Elem() @@ -134,67 +207,34 @@ func NewClient(addr string, namespace string, handler interface{}) ClientCloser Params: params, } - b, err := json.Marshal(&req) - if err != nil { - return processError(err) + rchan := make(chan clientResponse, 1) + requests <- clientRequest{ + req: req, + ready: rchan, } + resp := <- rchan + var rval reflect.Value - // prepare / execute http request - - hreq, err := http.NewRequest("POST", addr, bytes.NewReader(b)) - if err != nil { - return processError(err) - } - if hasCtx == 1 { - hreq = hreq.WithContext(args[0].Interface().(context.Context)) - } - hreq.Header.Set("Content-Type", "application/json") - - httpResp, err := http.DefaultClient.Do(hreq) - if err != nil { - return processError(err) - } - - // process response - - if clientDebug { - rsp, err := ioutil.ReadAll(httpResp.Body) - if err != nil { - return processError(err) - } - if err := httpResp.Body.Close(); err != nil { - return processError(err) - } - - log.Debugw("rpc response", "body", string(rsp)) - - httpResp.Body = ioutil.NopCloser(bytes.NewReader(rsp)) - } - - var resp clientResponse if valOut != -1 { log.Debugw("rpc result", "type", ftyp.Out(valOut)) - resp.Result = result(reflect.New(ftyp.Out(valOut))) - } - - if err := json.NewDecoder(httpResp.Body).Decode(&resp); err != nil { - return processError(err) - } - - if err := httpResp.Body.Close(); err != nil { - return processError(err) + rval = reflect.New(ftyp.Out(valOut)) + if err := json.Unmarshal(resp.Result, rval.Interface()); err != nil { + return processError(err) + } } if resp.ID != *req.ID { return processError(errors.New("request and response id didn't match")) } - return processResponse(resp, httpResp.StatusCode) + return processResponse(resp, rval) }) val.Elem().Field(i).Set(fn) } - // TODO: if this is still unused as of 2020, remove the closer stuff - return func() {} // noop for now, not for long though + return func() { + close(stop) + <-errs // TODO: return + }, nil }