WS JsonRPC client

This commit is contained in:
Łukasz Magiera 2019-07-12 17:29:41 +02:00
parent ea2039ecb6
commit 0eb208e1d3
5 changed files with 117 additions and 67 deletions

View File

@ -6,8 +6,8 @@ import (
) )
// NewRPC creates a new http jsonrpc client. // NewRPC creates a new http jsonrpc client.
func NewRPC(addr string) api.API { func NewRPC(addr string) (api.API, error) {
var res api.Struct var res api.Struct
jsonrpc.NewClient(addr, "Filecoin", &res.Internal) _, err := jsonrpc.NewClient(addr, "Filecoin", &res.Internal)
return &res return &res, err
} }

View File

@ -35,7 +35,7 @@ func getAPI(ctx *cli.Context) (api.API, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return client.NewRPC("http://" + addr + "/rpc/v0"), nil return client.NewRPC("ws://" + addr + "/rpc/v0")
} }
// reqContext returns context for cli execution. Calling it for the first time // reqContext returns context for cli execution. Calling it for the first time

View File

@ -1,16 +1,25 @@
package cli package cli
import ( import (
"fmt"
"gopkg.in/urfave/cli.v2" "gopkg.in/urfave/cli.v2"
) )
var versionCmd = &cli.Command{ var versionCmd = &cli.Command{
Name: "version", Name: "version",
Usage: "Print version", Usage: "Print version",
Action: func(context *cli.Context) error { Action: func(cctx *cli.Context) error {
api, err := getAPI(cctx)
if err != nil {
return err
}
ctx := reqContext(cctx)
// TODO: print more useful things // TODO: print more useful things
cli.VersionPrinter(context) fmt.Println(api.Version(ctx))
cli.VersionPrinter(cctx)
return nil return nil
}, },
} }

1
go.mod
View File

@ -5,6 +5,7 @@ go 1.12
require ( require (
github.com/BurntSushi/toml v0.3.1 github.com/BurntSushi/toml v0.3.1
github.com/filecoin-project/go-leb128 v0.0.0-20190212224330-8d79a5489543 github.com/filecoin-project/go-leb128 v0.0.0-20190212224330-8d79a5489543
github.com/gorilla/websocket v1.4.0
github.com/ipfs/go-bitswap v0.1.5 github.com/ipfs/go-bitswap v0.1.5
github.com/ipfs/go-block-format v0.0.2 github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.0.2 github.com/ipfs/go-blockservice v0.0.2

View File

@ -1,23 +1,20 @@
package jsonrpc package jsonrpc
import ( import (
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io/ioutil" "io"
"net/http"
"reflect" "reflect"
"sync/atomic" "sync/atomic"
"github.com/gorilla/websocket"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
) )
var log = logging.Logger("rpc") var log = logging.Logger("rpc")
const clientDebug = true
var ( var (
errorType = reflect.TypeOf(new(error)).Elem() errorType = reflect.TypeOf(new(error)).Elem()
contextType = reflect.TypeOf(new(context.Context)).Elem() contextType = reflect.TypeOf(new(context.Context)).Elem()
@ -37,12 +34,12 @@ func (e *ErrClient) Unwrap(err error) error {
return e.err return e.err
} }
type result reflect.Value type result []byte
func (r *result) UnmarshalJSON(raw []byte) error { func (p *result) UnmarshalJSON(raw []byte) error {
err := json.Unmarshal(raw, reflect.Value(*r).Interface()) *p = make([]byte, len(raw))
log.Debugw("rpc unmarshal response", "raw", string(raw), "err", err) copy(*p, raw)
return err return nil
} }
type clientResponse struct { type clientResponse struct {
@ -52,6 +49,11 @@ type clientResponse struct {
Error *respError `json:"error,omitempty"` Error *respError `json:"error,omitempty"`
} }
type clientRequest struct {
req request
ready chan clientResponse
}
// ClientCloser is used to close Client from further use // ClientCloser is used to close Client from further use
type ClientCloser func() type ClientCloser func()
@ -60,7 +62,7 @@ type ClientCloser func()
// handler must be pointer to a struct with function fields // handler must be pointer to a struct with function fields
// Returned value closes the client connection // Returned value closes the client connection
// TODO: Example // TODO: Example
func NewClient(addr string, namespace string, handler interface{}) ClientCloser { func NewClient(addr string, namespace string, handler interface{}) (ClientCloser, error) {
htyp := reflect.TypeOf(handler) htyp := reflect.TypeOf(handler)
if htyp.Kind() != reflect.Ptr { if htyp.Kind() != reflect.Ptr {
panic("expected handler to be a pointer") panic("expected handler to be a pointer")
@ -74,6 +76,77 @@ func NewClient(addr string, namespace string, handler interface{}) ClientCloser
var idCtr int64 var idCtr int64
conn, _, err := websocket.DefaultDialer.Dial(addr, nil)
if err != nil {
return nil, err
}
stop := make(chan struct{})
errs := make(chan error, 1)
requests := make(chan clientRequest)
responses := make(chan io.Reader)
nextMessage := func() {
mtype, r, err := conn.NextReader()
if err != nil {
r, _ := io.Pipe()
r.CloseWithError(err) // nolint
return
}
if mtype != websocket.BinaryMessage && mtype != websocket.TextMessage {
r, _ := io.Pipe()
r.CloseWithError(errors.New("unsupported message type")) // nolint
return
}
responses <- r
}
go func() {
var err error
defer func() {
close(requests)
cerr := conn.Close()
if err == nil {
err = cerr
}
errs <- cerr
// close requests somehow
}()
inflight := map[int64]clientRequest{}
go nextMessage()
for {
select {
case req := <-requests:
inflight[*req.req.ID] = req
if err = conn.WriteJSON(req.req); err != nil {
return
}
case r := <- responses:
var resp clientResponse
if err = json.NewDecoder(r).Decode(&resp); err != nil {
return
}
req, ok := inflight[resp.ID]
if !ok {
log.Error("client got unknown ID in response")
continue
}
req.ready <- resp
delete(inflight, resp.ID)
go nextMessage()
case <-stop:
return
}
}
}()
for i := 0; i < typ.NumField(); i++ { for i := 0; i < typ.NumField(); i++ {
f := typ.Field(i) f := typ.Field(i)
ftyp := f.Type ftyp := f.Type
@ -83,11 +156,11 @@ func NewClient(addr string, namespace string, handler interface{}) ClientCloser
valOut, errOut, nout := processFuncOut(ftyp) valOut, errOut, nout := processFuncOut(ftyp)
processResponse := func(resp clientResponse, code int) []reflect.Value { processResponse := func(resp clientResponse, rval reflect.Value) []reflect.Value {
out := make([]reflect.Value, nout) out := make([]reflect.Value, nout)
if valOut != -1 { if valOut != -1 {
out[valOut] = reflect.Value(resp.Result).Elem() out[valOut] = rval.Elem()
} }
if errOut != -1 { if errOut != -1 {
out[errOut] = reflect.New(errorType).Elem() out[errOut] = reflect.New(errorType).Elem()
@ -134,67 +207,34 @@ func NewClient(addr string, namespace string, handler interface{}) ClientCloser
Params: params, Params: params,
} }
b, err := json.Marshal(&req) rchan := make(chan clientResponse, 1)
if err != nil { requests <- clientRequest{
return processError(err) req: req,
ready: rchan,
} }
resp := <- rchan
var rval reflect.Value
// prepare / execute http request
hreq, err := http.NewRequest("POST", addr, bytes.NewReader(b))
if err != nil {
return processError(err)
}
if hasCtx == 1 {
hreq = hreq.WithContext(args[0].Interface().(context.Context))
}
hreq.Header.Set("Content-Type", "application/json")
httpResp, err := http.DefaultClient.Do(hreq)
if err != nil {
return processError(err)
}
// process response
if clientDebug {
rsp, err := ioutil.ReadAll(httpResp.Body)
if err != nil {
return processError(err)
}
if err := httpResp.Body.Close(); err != nil {
return processError(err)
}
log.Debugw("rpc response", "body", string(rsp))
httpResp.Body = ioutil.NopCloser(bytes.NewReader(rsp))
}
var resp clientResponse
if valOut != -1 { if valOut != -1 {
log.Debugw("rpc result", "type", ftyp.Out(valOut)) log.Debugw("rpc result", "type", ftyp.Out(valOut))
resp.Result = result(reflect.New(ftyp.Out(valOut))) rval = reflect.New(ftyp.Out(valOut))
} if err := json.Unmarshal(resp.Result, rval.Interface()); err != nil {
if err := json.NewDecoder(httpResp.Body).Decode(&resp); err != nil {
return processError(err) return processError(err)
} }
if err := httpResp.Body.Close(); err != nil {
return processError(err)
} }
if resp.ID != *req.ID { if resp.ID != *req.ID {
return processError(errors.New("request and response id didn't match")) return processError(errors.New("request and response id didn't match"))
} }
return processResponse(resp, httpResp.StatusCode) return processResponse(resp, rval)
}) })
val.Elem().Field(i).Set(fn) val.Elem().Field(i).Set(fn)
} }
// TODO: if this is still unused as of 2020, remove the closer stuff return func() {
return func() {} // noop for now, not for long though close(stop)
<-errs // TODO: return
}, nil
} }