Merge pull request #31 from filecoin-project/feat/wsrpc

JsonRPC over WebSockets
This commit is contained in:
Whyrusleeping 2019-07-16 09:36:32 -07:00 committed by GitHub
commit 35905cb3fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 516 additions and 221 deletions

View File

@ -6,8 +6,8 @@ import (
) )
// NewRPC creates a new http jsonrpc client. // NewRPC creates a new http jsonrpc client.
func NewRPC(addr string) api.API { func NewRPC(addr string) (api.API, error) {
var res api.Struct var res api.Struct
jsonrpc.NewClient(addr, "Filecoin", &res.Internal) _, err := jsonrpc.NewClient(addr, "Filecoin", &res.Internal)
return &res return &res, err
} }

View File

@ -35,7 +35,7 @@ func getAPI(ctx *cli.Context) (api.API, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return client.NewRPC("http://" + addr + "/rpc/v0"), nil return client.NewRPC("ws://" + addr + "/rpc/v0")
} }
// reqContext returns context for cli execution. Calling it for the first time // reqContext returns context for cli execution. Calling it for the first time

View File

@ -1,16 +1,25 @@
package cli package cli
import ( import (
"fmt"
"gopkg.in/urfave/cli.v2" "gopkg.in/urfave/cli.v2"
) )
var versionCmd = &cli.Command{ var versionCmd = &cli.Command{
Name: "version", Name: "version",
Usage: "Print version", Usage: "Print version",
Action: func(context *cli.Context) error { Action: func(cctx *cli.Context) error {
api, err := getAPI(cctx)
if err != nil {
return err
}
ctx := reqContext(cctx)
// TODO: print more useful things // TODO: print more useful things
cli.VersionPrinter(context) fmt.Println(api.Version(ctx))
cli.VersionPrinter(cctx)
return nil return nil
}, },
} }

1
go.mod
View File

@ -5,6 +5,7 @@ go 1.12
require ( require (
github.com/BurntSushi/toml v0.3.1 github.com/BurntSushi/toml v0.3.1
github.com/filecoin-project/go-leb128 v0.0.0-20190212224330-8d79a5489543 github.com/filecoin-project/go-leb128 v0.0.0-20190212224330-8d79a5489543
github.com/gorilla/websocket v1.4.0
github.com/ipfs/go-bitswap v0.1.5 github.com/ipfs/go-bitswap v0.1.5
github.com/ipfs/go-block-format v0.0.2 github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.0 github.com/ipfs/go-blockservice v0.1.0

View File

@ -1,23 +1,19 @@
package jsonrpc package jsonrpc
import ( import (
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io/ioutil"
"net/http"
"reflect" "reflect"
"sync/atomic" "sync/atomic"
"github.com/gorilla/websocket"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
) )
var log = logging.Logger("rpc") var log = logging.Logger("rpc")
const clientDebug = true
var ( var (
errorType = reflect.TypeOf(new(error)).Elem() errorType = reflect.TypeOf(new(error)).Elem()
contextType = reflect.TypeOf(new(context.Context)).Elem() contextType = reflect.TypeOf(new(context.Context)).Elem()
@ -37,12 +33,12 @@ func (e *ErrClient) Unwrap(err error) error {
return e.err return e.err
} }
type result reflect.Value type result []byte
func (r *result) UnmarshalJSON(raw []byte) error { func (p *result) UnmarshalJSON(raw []byte) error {
err := json.Unmarshal(raw, reflect.Value(*r).Interface()) *p = make([]byte, len(raw))
log.Debugw("rpc unmarshal response", "raw", string(raw), "err", err) copy(*p, raw)
return err return nil
} }
type clientResponse struct { type clientResponse struct {
@ -52,6 +48,11 @@ type clientResponse struct {
Error *respError `json:"error,omitempty"` Error *respError `json:"error,omitempty"`
} }
type clientRequest struct {
req request
ready chan clientResponse
}
// ClientCloser is used to close Client from further use // ClientCloser is used to close Client from further use
type ClientCloser func() type ClientCloser func()
@ -60,7 +61,7 @@ type ClientCloser func()
// handler must be pointer to a struct with function fields // handler must be pointer to a struct with function fields
// Returned value closes the client connection // Returned value closes the client connection
// TODO: Example // TODO: Example
func NewClient(addr string, namespace string, handler interface{}) ClientCloser { func NewClient(addr string, namespace string, handler interface{}) (ClientCloser, error) {
htyp := reflect.TypeOf(handler) htyp := reflect.TypeOf(handler)
if htyp.Kind() != reflect.Ptr { if htyp.Kind() != reflect.Ptr {
panic("expected handler to be a pointer") panic("expected handler to be a pointer")
@ -74,6 +75,17 @@ func NewClient(addr string, namespace string, handler interface{}) ClientCloser
var idCtr int64 var idCtr int64
conn, _, err := websocket.DefaultDialer.Dial(addr, nil)
if err != nil {
return nil, err
}
stop := make(chan struct{})
requests := make(chan clientRequest)
handlers := map[string]rpcHandler{}
go handleWsConn(context.TODO(), conn, handlers, requests, stop)
for i := 0; i < typ.NumField(); i++ { for i := 0; i < typ.NumField(); i++ {
f := typ.Field(i) f := typ.Field(i)
ftyp := f.Type ftyp := f.Type
@ -83,11 +95,11 @@ func NewClient(addr string, namespace string, handler interface{}) ClientCloser
valOut, errOut, nout := processFuncOut(ftyp) valOut, errOut, nout := processFuncOut(ftyp)
processResponse := func(resp clientResponse, code int) []reflect.Value { processResponse := func(resp clientResponse, rval reflect.Value) []reflect.Value {
out := make([]reflect.Value, nout) out := make([]reflect.Value, nout)
if valOut != -1 { if valOut != -1 {
out[valOut] = reflect.Value(resp.Result).Elem() out[valOut] = rval.Elem()
} }
if errOut != -1 { if errOut != -1 {
out[errOut] = reflect.New(errorType).Elem() out[errOut] = reflect.New(errorType).Elem()
@ -134,67 +146,59 @@ func NewClient(addr string, namespace string, handler interface{}) ClientCloser
Params: params, Params: params,
} }
b, err := json.Marshal(&req) rchan := make(chan clientResponse, 1)
if err != nil { requests <- clientRequest{
return processError(err) req: req,
ready: rchan,
} }
var ctxDone <-chan struct{}
// prepare / execute http request
hreq, err := http.NewRequest("POST", addr, bytes.NewReader(b))
if err != nil {
return processError(err)
}
if hasCtx == 1 {
hreq = hreq.WithContext(args[0].Interface().(context.Context))
}
hreq.Header.Set("Content-Type", "application/json")
httpResp, err := http.DefaultClient.Do(hreq)
if err != nil {
return processError(err)
}
// process response
if clientDebug {
rsp, err := ioutil.ReadAll(httpResp.Body)
if err != nil {
return processError(err)
}
if err := httpResp.Body.Close(); err != nil {
return processError(err)
}
log.Debugw("rpc response", "body", string(rsp))
httpResp.Body = ioutil.NopCloser(bytes.NewReader(rsp))
}
var resp clientResponse var resp clientResponse
if hasCtx == 1 {
ctxDone = args[0].Interface().(context.Context).Done()
}
loop:
for {
select {
case resp = <-rchan:
break loop
case <-ctxDone: // send cancel request
ctxDone = nil
requests <- clientRequest{
req: request{
Jsonrpc: "2.0",
Method: wsCancel,
Params: []param{{v: reflect.ValueOf(id)}},
},
}
}
}
var rval reflect.Value
if valOut != -1 { if valOut != -1 {
rval = reflect.New(ftyp.Out(valOut))
if resp.Result != nil {
log.Debugw("rpc result", "type", ftyp.Out(valOut)) log.Debugw("rpc result", "type", ftyp.Out(valOut))
resp.Result = result(reflect.New(ftyp.Out(valOut))) if err := json.Unmarshal(resp.Result, rval.Interface()); err != nil {
}
if err := json.NewDecoder(httpResp.Body).Decode(&resp); err != nil {
return processError(err) return processError(err)
} }
}
if err := httpResp.Body.Close(); err != nil {
return processError(err)
} }
if resp.ID != *req.ID { if resp.ID != *req.ID {
return processError(errors.New("request and response id didn't match")) return processError(errors.New("request and response id didn't match"))
} }
return processResponse(resp, httpResp.StatusCode) return processResponse(resp, rval)
}) })
val.Elem().Field(i).Set(fn) val.Elem().Field(i).Set(fn)
} }
// TODO: if this is still unused as of 2020, remove the closer stuff return func() {
return func() {} // noop for now, not for long though close(stop)
}, nil
} }

View File

@ -2,18 +2,13 @@ package jsonrpc
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http" "io"
"reflect" "reflect"
) )
const (
rpcParseError = -32700
rpcMethodNotFound = -32601
rpcInvalidParams = -32602
)
type rpcHandler struct { type rpcHandler struct {
paramReceivers []reflect.Type paramReceivers []reflect.Type
nParams int nParams int
@ -27,33 +22,9 @@ type rpcHandler struct {
valOut int valOut int
} }
// RPCServer provides a jsonrpc 2.0 http server handler type handlers map[string]rpcHandler
type RPCServer struct {
methods map[string]rpcHandler
}
// NewServer creates new RPCServer instance // Request / response
func NewServer() *RPCServer {
return &RPCServer{
methods: map[string]rpcHandler{},
}
}
type param struct {
data []byte // from unmarshal
v reflect.Value // to marshal
}
func (p *param) UnmarshalJSON(raw []byte) error {
p.data = make([]byte, len(raw))
copy(p.data, raw)
return nil
}
func (p *param) MarshalJSON() ([]byte, error) {
return json.Marshal(p.v.Interface())
}
type request struct { type request struct {
Jsonrpc string `json:"jsonrpc"` Jsonrpc string `json:"jsonrpc"`
@ -81,92 +52,9 @@ type response struct {
Error *respError `json:"error,omitempty"` Error *respError `json:"error,omitempty"`
} }
// TODO: return errors to clients per spec // Register
func (s *RPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var req request
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
s.rpcError(w, &req, rpcParseError, err)
return
}
handler, ok := s.methods[req.Method] func (h handlers) register(namespace string, r interface{}) {
if !ok {
s.rpcError(w, &req, rpcMethodNotFound, fmt.Errorf("method '%s' not found", req.Method))
return
}
if len(req.Params) != handler.nParams {
s.rpcError(w, &req, rpcInvalidParams, fmt.Errorf("wrong param count"))
return
}
callParams := make([]reflect.Value, 1+handler.hasCtx+handler.nParams)
callParams[0] = handler.receiver
if handler.hasCtx == 1 {
callParams[1] = reflect.ValueOf(r.Context())
}
for i := 0; i < handler.nParams; i++ {
rp := reflect.New(handler.paramReceivers[i])
if err := json.NewDecoder(bytes.NewReader(req.Params[i].data)).Decode(rp.Interface()); err != nil {
s.rpcError(w, &req, rpcParseError, err)
return
}
callParams[i+1+handler.hasCtx] = reflect.ValueOf(rp.Elem().Interface())
}
callResult := handler.handlerFunc.Call(callParams)
if req.ID == nil {
return // notification
}
resp := response{
Jsonrpc: "2.0",
ID: *req.ID,
}
if handler.errOut != -1 {
err := callResult[handler.errOut].Interface()
if err != nil {
resp.Error = &respError{
Code: 1,
Message: err.(error).Error(),
}
}
}
if handler.valOut != -1 {
resp.Result = callResult[handler.valOut].Interface()
}
if err := json.NewEncoder(w).Encode(resp); err != nil {
fmt.Println(err)
return
}
}
func (s *RPCServer) rpcError(w http.ResponseWriter, req *request, code int, err error) {
w.WriteHeader(500)
if req.ID == nil { // notification
return
}
resp := response{
Jsonrpc: "2.0",
ID: *req.ID,
Error: &respError{
Code: code,
Message: err.Error(),
},
}
_ = json.NewEncoder(w).Encode(resp)
}
// Register registers new RPC handler
//
// Handler is any value with methods defined
func (s *RPCServer) Register(namespace string, r interface{}) {
val := reflect.ValueOf(r) val := reflect.ValueOf(r)
//TODO: expect ptr //TODO: expect ptr
@ -187,9 +75,7 @@ func (s *RPCServer) Register(namespace string, r interface{}) {
valOut, errOut, _ := processFuncOut(funcType) valOut, errOut, _ := processFuncOut(funcType)
fmt.Println(namespace + "." + method.Name) h[namespace+"."+method.Name] = rpcHandler{
s.methods[namespace+"."+method.Name] = rpcHandler{
paramReceivers: recvs, paramReceivers: recvs,
nParams: ins, nParams: ins,
@ -204,30 +90,85 @@ func (s *RPCServer) Register(namespace string, r interface{}) {
} }
} }
func processFuncOut(funcType reflect.Type) (valOut int, errOut int, n int) { // Handle
errOut = -1
valOut = -1
n = funcType.NumOut()
switch n { type rpcErrFunc func(w func(func(io.Writer)), req *request, code int, err error)
case 0:
case 1: func (h handlers) handleReader(ctx context.Context, r io.Reader, w io.Writer, rpcError rpcErrFunc) {
if funcType.Out(0) == errorType { wf := func(cb func(io.Writer)) {
errOut = 0 cb(w)
} else {
valOut = 0
}
case 2:
valOut = 0
errOut = 1
if funcType.Out(1) != errorType {
panic("expected error as second return value")
}
default:
panic("too many error values")
} }
var req request
if err := json.NewDecoder(r).Decode(&req); err != nil {
rpcError(wf, &req, rpcParseError, err)
return return
} }
var _ error = &respError{} h.handle(ctx, req, wf, rpcError, func() {})
}
func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer)), rpcError rpcErrFunc, done func()) {
defer done()
handler, ok := h[req.Method]
if !ok {
rpcError(w, &req, rpcMethodNotFound, fmt.Errorf("method '%s' not found", req.Method))
return
}
if len(req.Params) != handler.nParams {
rpcError(w, &req, rpcInvalidParams, fmt.Errorf("wrong param count"))
return
}
callParams := make([]reflect.Value, 1+handler.hasCtx+handler.nParams)
callParams[0] = handler.receiver
if handler.hasCtx == 1 {
callParams[1] = reflect.ValueOf(ctx)
}
for i := 0; i < handler.nParams; i++ {
rp := reflect.New(handler.paramReceivers[i])
if err := json.NewDecoder(bytes.NewReader(req.Params[i].data)).Decode(rp.Interface()); err != nil {
rpcError(w, &req, rpcParseError, err)
return
}
callParams[i+1+handler.hasCtx] = reflect.ValueOf(rp.Elem().Interface())
}
///////////////////
callResult := handler.handlerFunc.Call(callParams)
if req.ID == nil {
return // notification
}
///////////////////
resp := response{
Jsonrpc: "2.0",
ID: *req.ID,
}
if handler.errOut != -1 {
err := callResult[handler.errOut].Interface()
if err != nil {
resp.Error = &respError{
Code: 1,
Message: err.(error).Error(),
}
}
}
if handler.valOut != -1 {
resp.Result = callResult[handler.valOut].Interface()
}
w(func(w io.Writer) {
if err := json.NewEncoder(w).Encode(resp); err != nil {
fmt.Println(err)
return
}
})
}

View File

@ -62,7 +62,6 @@ func TestRPC(t *testing.T) {
// httptest stuff // httptest stuff
testServ := httptest.NewServer(rpcServer) testServ := httptest.NewServer(rpcServer)
defer testServ.Close() defer testServ.Close()
// setup client // setup client
var client struct { var client struct {
@ -70,7 +69,10 @@ func TestRPC(t *testing.T) {
AddGet func(int) int AddGet func(int) int
StringMatch func(t TestType, i2 int64) (out TestOut, err error) StringMatch func(t TestType, i2 int64) (out TestOut, err error)
} }
closer := NewClient(testServ.URL, "SimpleServerHandler", &client) closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &client)
if err != nil {
t.Fatal(err)
}
defer closer() defer closer()
// Add(int) error // Add(int) error
@ -83,7 +85,7 @@ func TestRPC(t *testing.T) {
t.Error("expected 2") t.Error("expected 2")
} }
err := client.Add(-3546) err = client.Add(-3546)
if err == nil { if err == nil {
t.Fatal("expected error") t.Fatal("expected error")
} }
@ -130,7 +132,10 @@ func TestRPC(t *testing.T) {
var noret struct { var noret struct {
Add func(int) Add func(int)
} }
closer = NewClient(testServ.URL, "SimpleServerHandler", &noret) closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &noret)
if err != nil {
t.Fatal(err)
}
// this one should actually work // this one should actually work
noret.Add(4) noret.Add(4)
@ -142,7 +147,10 @@ func TestRPC(t *testing.T) {
var noparam struct { var noparam struct {
Add func() Add func()
} }
closer = NewClient(testServ.URL, "SimpleServerHandler", &noparam) closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &noparam)
if err != nil {
t.Fatal(err)
}
// shouldn't panic // shouldn't panic
noparam.Add() noparam.Add()
@ -151,7 +159,10 @@ func TestRPC(t *testing.T) {
var erronly struct { var erronly struct {
AddGet func() (int, error) AddGet func() (int, error)
} }
closer = NewClient(testServ.URL, "SimpleServerHandler", &erronly) closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &erronly)
if err != nil {
t.Fatal(err)
}
_, err = erronly.AddGet() _, err = erronly.AddGet()
if err == nil || err.Error() != "RPC error (-32602): wrong param count" { if err == nil || err.Error() != "RPC error (-32602): wrong param count" {
@ -162,7 +173,10 @@ func TestRPC(t *testing.T) {
var wrongtype struct { var wrongtype struct {
Add func(string) error Add func(string) error
} }
closer = NewClient(testServ.URL, "SimpleServerHandler", &wrongtype) closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &wrongtype)
if err != nil {
t.Fatal(err)
}
err = wrongtype.Add("not an int") err = wrongtype.Add("not an int")
if err == nil || err.Error() != "RPC error (-32700): json: cannot unmarshal string into Go value of type int" { if err == nil || err.Error() != "RPC error (-32700): json: cannot unmarshal string into Go value of type int" {
@ -173,7 +187,10 @@ func TestRPC(t *testing.T) {
var notfound struct { var notfound struct {
NotThere func(string) error NotThere func(string) error
} }
closer = NewClient(testServ.URL, "SimpleServerHandler", &notfound) closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &notfound)
if err != nil {
t.Fatal(err)
}
err = notfound.NotThere("hello?") err = notfound.NotThere("hello?")
if err == nil || err.Error() != "RPC error (-32601): method 'SimpleServerHandler.NotThere' not found" { if err == nil || err.Error() != "RPC error (-32601): method 'SimpleServerHandler.NotThere' not found" {
@ -219,7 +236,10 @@ func TestCtx(t *testing.T) {
var client struct { var client struct {
Test func(ctx context.Context) Test func(ctx context.Context)
} }
closer := NewClient(testServ.URL, "CtxHandler", &client) closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "CtxHandler", &client)
if err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel() defer cancel()
@ -239,7 +259,10 @@ func TestCtx(t *testing.T) {
var noCtxClient struct { var noCtxClient struct {
Test func() Test func()
} }
closer = NewClient(testServ.URL, "CtxHandler", &noCtxClient) closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "CtxHandler", &noCtxClient)
if err != nil {
t.Fatal(err)
}
noCtxClient.Test() noCtxClient.Test()

87
lib/jsonrpc/server.go Normal file
View File

@ -0,0 +1,87 @@
package jsonrpc
import (
"encoding/json"
"io"
"net/http"
"github.com/gorilla/websocket"
)
const (
rpcParseError = -32700
rpcMethodNotFound = -32601
rpcInvalidParams = -32602
)
// RPCServer provides a jsonrpc 2.0 http server handler
type RPCServer struct {
methods handlers
}
// NewServer creates new RPCServer instance
func NewServer() *RPCServer {
return &RPCServer{
methods: map[string]rpcHandler{},
}
}
var upgrader = websocket.Upgrader{}
func (s *RPCServer) handleWS(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
handleWsConn(r.Context(), c, s.methods, nil, nil)
if err := c.Close(); err != nil {
log.Error(err)
return
}
}
// TODO: return errors to clients per spec
func (s *RPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Connection") == "Upgrade" {
s.handleWS(w, r)
return
}
s.methods.handleReader(r.Context(), r.Body, w, rpcError)
}
func rpcError(wf func(func(io.Writer)), req *request, code int, err error) {
wf(func(w io.Writer) {
if hw, ok := w.(http.ResponseWriter); ok {
hw.WriteHeader(500)
}
if req.ID == nil { // notification
return
}
resp := response{
Jsonrpc: "2.0",
ID: *req.ID,
Error: &respError{
Code: code,
Message: err.Error(),
},
}
_ = json.NewEncoder(w).Encode(resp)
})
}
// Register registers new RPC handler
//
// Handler is any value with methods defined
func (s *RPCServer) Register(namespace string, handler interface{}) {
s.methods.register(namespace, handler)
}
var _ error = &respError{}

49
lib/jsonrpc/util.go Normal file
View File

@ -0,0 +1,49 @@
package jsonrpc
import (
"encoding/json"
"reflect"
)
type param struct {
data []byte // from unmarshal
v reflect.Value // to marshal
}
func (p *param) UnmarshalJSON(raw []byte) error {
p.data = make([]byte, len(raw))
copy(p.data, raw)
return nil
}
func (p *param) MarshalJSON() ([]byte, error) {
return json.Marshal(p.v.Interface())
}
// processFuncOut finds value and error Outs in function
func processFuncOut(funcType reflect.Type) (valOut int, errOut int, n int) {
errOut = -1 // -1 if not found
valOut = -1
n = funcType.NumOut()
switch n {
case 0:
case 1:
if funcType.Out(0) == errorType {
errOut = 0
} else {
valOut = 0
}
case 2:
valOut = 0
errOut = 1
if funcType.Out(1) != errorType {
panic("expected error as second return value")
}
default:
panic("too many error values")
}
return
}

176
lib/jsonrpc/websocket.go Normal file
View File

@ -0,0 +1,176 @@
package jsonrpc
import (
"context"
"encoding/json"
"errors"
"io"
"io/ioutil"
"sync"
"github.com/gorilla/websocket"
)
const wsCancel = "xrpc.cancel"
type frame struct {
// common
Jsonrpc string `json:"jsonrpc"`
ID *int64 `json:"id,omitempty"`
// request
Method string `json:"method,omitempty"`
Params []param `json:"params,omitempty"`
// response
Result result `json:"result,omitempty"`
Error *respError `json:"error,omitempty"`
}
func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, requests <-chan clientRequest, stop <-chan struct{}) {
incoming := make(chan io.Reader)
var incErr error
nextMessage := func() {
mtype, r, err := conn.NextReader()
if err != nil {
incErr = err
close(incoming)
return
}
if mtype != websocket.BinaryMessage && mtype != websocket.TextMessage {
incErr = errors.New("unsupported message type")
close(incoming)
return
}
incoming <- r
}
var writeLk sync.Mutex
nextWriter := func(cb func(io.Writer)) {
writeLk.Lock()
defer writeLk.Unlock()
wcl, err := conn.NextWriter(websocket.TextMessage)
if err != nil {
log.Error("handle me:", err)
return
}
cb(wcl)
if err := wcl.Close(); err != nil {
log.Error("handle me:", err)
return
}
}
go nextMessage()
inflight := map[int64]clientRequest{}
handling := map[int64]context.CancelFunc{}
var handlingLk sync.Mutex
cancelCtx := func(req frame) {
if req.ID != nil {
log.Warnf("%s call with ID set, won't respond", wsCancel)
}
var id int64
if err := json.Unmarshal(req.Params[0].data, &id); err != nil {
log.Error("handle me:", err)
return
}
handlingLk.Lock()
defer handlingLk.Unlock()
cf, ok := handling[id]
if ok {
cf()
}
}
for {
select {
case r, ok := <-incoming:
if !ok {
if incErr != nil {
log.Debugf("websocket error", "error", incErr)
}
return // remote closed
}
var frame frame
if err := json.NewDecoder(r).Decode(&frame); err != nil {
log.Error("handle me:", err)
return
}
switch frame.Method {
case "": // Response to our call
req, ok := inflight[*frame.ID]
if !ok {
log.Error("client got unknown ID in response")
continue
}
req.ready <- clientResponse{
Jsonrpc: frame.Jsonrpc,
Result: frame.Result,
ID: *frame.ID,
Error: frame.Error,
}
delete(inflight, *frame.ID)
case wsCancel:
cancelCtx(frame)
default: // Remote call
req := request{
Jsonrpc: frame.Jsonrpc,
ID: frame.ID,
Method: frame.Method,
Params: frame.Params,
}
ctx, cf := context.WithCancel(ctx)
nw := func(cb func(io.Writer)) {
cb(ioutil.Discard)
}
done := cf
if frame.ID != nil {
nw = nextWriter
handlingLk.Lock()
handling[*frame.ID] = cf
handlingLk.Unlock()
done = func() {
handlingLk.Lock()
defer handlingLk.Unlock()
cf()
delete(handling, *frame.ID)
}
}
go handler.handle(ctx, req, nw, rpcError, done)
}
go nextMessage()
case req := <-requests:
if req.req.ID != nil {
inflight[*req.req.ID] = req
}
if err := conn.WriteJSON(req.req); err != nil {
log.Error("handle me:", err)
return
}
case <-stop:
if err := conn.Close(); err != nil {
log.Debugf("websocket close error", "error", err)
}
return
}
}
}

View File

@ -53,7 +53,12 @@ func rpcBuilder(t *testing.T, n int) []api.API {
rpcServer := jsonrpc.NewServer() rpcServer := jsonrpc.NewServer()
rpcServer.Register("Filecoin", a) rpcServer.Register("Filecoin", a)
testServ := httptest.NewServer(rpcServer) // todo: close testServ := httptest.NewServer(rpcServer) // todo: close
out[i] = client.NewRPC(testServ.URL)
var err error
out[i], err = client.NewRPC("ws://" + testServ.Listener.Addr().String())
if err != nil {
t.Fatal(err)
}
} }
return out return out
} }