// File: lotus/lib/jsonrpc/client.go (205 lines, 4.2 KiB, Go)

package jsonrpc
import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
2019-07-03 14:32:31 +00:00
"sync/atomic"
2019-07-08 19:07:16 +00:00
2019-07-12 15:29:41 +00:00
"github.com/gorilla/websocket"
2019-07-08 19:07:16 +00:00
logging "github.com/ipfs/go-log"
)
2019-07-08 19:07:16 +00:00
// log is the package-level logger, created under the name "rpc".
var log = logging.Logger("rpc")
// Cached reflect.Type handles for the two interface types that NewClient
// inspects on handler function signatures: contextType is compared against a
// function's first parameter, errorType is used to build error result slots.
//
// The typed-nil pointer conversion yields the interface type itself once
// Elem() is applied.
var (
	contextType = reflect.TypeOf((*context.Context)(nil)).Elem()
	errorType   = reflect.TypeOf((*error)(nil)).Elem()
)
2019-07-02 19:08:30 +00:00
// ErrClient is an error which occurred on the client side of the library
// (for example while decoding a response), as opposed to an error object
// returned by the remote end.
type ErrClient struct {
	err error // underlying cause; exposed through Unwrap
}

// Error implements the error interface. The message is the underlying error
// prefixed with "RPC client error: ".
func (e *ErrClient) Error() string {
	return fmt.Sprintf("RPC client error: %s", e.err)
}

// Unwrap returns the underlying error.
//
// The method deliberately takes no arguments: errors.Is, errors.As and
// errors.Unwrap only recognise the exact signature `Unwrap() error`, so a
// variant with a parameter would never be used to walk the error chain.
func (e *ErrClient) Unwrap() error {
	return e.err
}
2019-07-12 15:29:41 +00:00
// result holds the raw, still-encoded JSON bytes of a response's "result"
// member so that decoding can be deferred until the target type is known.
type result []byte

// UnmarshalJSON stores a private copy of raw in *p without interpreting it.
//
// The copy is always a freshly allocated, non-nil slice, so the stored bytes
// never alias the decoder's buffer. Like json.RawMessage, it returns an error
// instead of panicking when called on a nil receiver.
func (p *result) UnmarshalJSON(raw []byte) error {
	if p == nil {
		return errors.New("jsonrpc.result: UnmarshalJSON on nil pointer")
	}
	buf := make([]byte, len(raw))
	copy(buf, raw)
	*p = buf
	return nil
}
type clientResponse struct {
2019-07-08 12:46:30 +00:00
Jsonrpc string `json:"jsonrpc"`
Result result `json:"result"`
ID int64 `json:"id"`
Error *respError `json:"error,omitempty"`
}
2019-07-12 15:29:41 +00:00
// clientRequest is what a generated client function pushes onto the requests
// channel that NewClient hands to handleWsConn.
type clientRequest struct {
// req is the request to be sent.
req request
// ready is where the caller waits for the matching response. NewClient
// creates it with a buffer of one; it is left nil for cancel requests.
ready chan clientResponse
}
// ClientCloser is the function returned by NewClient. Calling it closes the
// stop channel that was passed to the connection handler, taking the client
// out of further use.
type ClientCloser func()
2019-07-02 17:45:03 +00:00
// NewClient creates new josnrpc 2.0 client
//
// handler must be pointer to a struct with function fields
2019-07-03 14:32:31 +00:00
// Returned value closes the client connection
2019-07-02 17:45:03 +00:00
// TODO: Example
2019-07-12 15:29:41 +00:00
func NewClient(addr string, namespace string, handler interface{}) (ClientCloser, error) {
htyp := reflect.TypeOf(handler)
if htyp.Kind() != reflect.Ptr {
panic("expected handler to be a pointer")
}
typ := htyp.Elem()
if typ.Kind() != reflect.Struct {
panic("handler should be a struct")
}
val := reflect.ValueOf(handler)
2019-07-03 14:32:31 +00:00
var idCtr int64
2019-07-12 15:29:41 +00:00
conn, _, err := websocket.DefaultDialer.Dial(addr, nil)
if err != nil {
return nil, err
}
stop := make(chan struct{})
requests := make(chan clientRequest)
2019-07-12 17:12:38 +00:00
handlers := map[string]rpcHandler{}
go handleWsConn(context.TODO(), conn, handlers, requests, stop)
2019-07-12 15:29:41 +00:00
for i := 0; i < typ.NumField(); i++ {
f := typ.Field(i)
ftyp := f.Type
if ftyp.Kind() != reflect.Func {
panic("handler field not a func")
}
2019-07-08 12:46:30 +00:00
valOut, errOut, nout := processFuncOut(ftyp)
2019-07-12 15:29:41 +00:00
processResponse := func(resp clientResponse, rval reflect.Value) []reflect.Value {
out := make([]reflect.Value, nout)
if valOut != -1 {
2019-07-12 15:29:41 +00:00
out[valOut] = rval.Elem()
}
if errOut != -1 {
out[errOut] = reflect.New(errorType).Elem()
if resp.Error != nil {
2019-07-02 13:49:10 +00:00
out[errOut].Set(reflect.ValueOf(resp.Error))
}
}
return out
}
processError := func(err error) []reflect.Value {
out := make([]reflect.Value, nout)
if valOut != -1 {
out[valOut] = reflect.New(ftyp.Out(valOut)).Elem()
}
if errOut != -1 {
out[errOut] = reflect.New(errorType).Elem()
out[errOut].Set(reflect.ValueOf(&ErrClient{err}))
}
return out
}
hasCtx := 0
if ftyp.NumIn() > 0 && ftyp.In(0) == contextType {
hasCtx = 1
}
fn := reflect.MakeFunc(ftyp, func(args []reflect.Value) (results []reflect.Value) {
2019-07-03 14:32:31 +00:00
id := atomic.AddInt64(&idCtr, 1)
2019-07-08 12:46:30 +00:00
params := make([]param, len(args)-hasCtx)
for i, arg := range args[hasCtx:] {
2019-07-08 12:46:30 +00:00
params[i] = param{
v: arg,
}
}
2019-07-08 12:46:30 +00:00
req := request{
Jsonrpc: "2.0",
2019-07-02 19:08:30 +00:00
ID: &id,
Method: namespace + "." + f.Name,
Params: params,
}
2019-07-12 15:29:41 +00:00
rchan := make(chan clientResponse, 1)
requests <- clientRequest{
req: req,
ready: rchan,
2019-07-08 19:07:16 +00:00
}
2019-07-15 16:21:48 +00:00
var ctxDone <-chan struct{}
var resp clientResponse
if hasCtx == 1 {
ctxDone = args[0].Interface().(context.Context).Done()
}
loop:
for {
select {
case resp = <-rchan:
break loop
case <-ctxDone: // send cancel request
ctxDone = nil
requests <- clientRequest{
req: request{
Jsonrpc: "2.0",
2019-07-15 16:32:43 +00:00
Method: wsCancel,
Params: []param{{v: reflect.ValueOf(id)}},
2019-07-15 16:21:48 +00:00
},
}
}
}
2019-07-12 15:29:41 +00:00
var rval reflect.Value
2019-07-08 19:07:16 +00:00
if valOut != -1 {
2019-07-12 15:29:41 +00:00
rval = reflect.New(ftyp.Out(valOut))
2019-07-15 16:21:48 +00:00
if resp.Result != nil {
log.Debugw("rpc result", "type", ftyp.Out(valOut))
if err := json.Unmarshal(resp.Result, rval.Interface()); err != nil {
return processError(err)
}
2019-07-12 15:29:41 +00:00
}
2019-07-02 19:08:30 +00:00
}
if resp.ID != *req.ID {
return processError(errors.New("request and response id didn't match"))
}
2019-07-12 15:29:41 +00:00
return processResponse(resp, rval)
})
val.Elem().Field(i).Set(fn)
}
2019-07-03 14:32:31 +00:00
2019-07-12 15:29:41 +00:00
return func() {
close(stop)
}, nil
}