extract lib/jsonrpc to go-jsonrpc
This commit is contained in:
parent
75e47db520
commit
eeca031525
@ -1,11 +1,12 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"github.com/filecoin-project/lotus/api/apistruct"
|
||||
"net/http"
|
||||
|
||||
"github.com/filecoin-project/go-jsonrpc"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/lib/jsonrpc"
|
||||
"github.com/filecoin-project/lotus/api/apistruct"
|
||||
)
|
||||
|
||||
// NewCommonRPC creates a new http jsonrpc client.
|
||||
|
@ -16,9 +16,10 @@ import (
|
||||
"golang.org/x/xerrors"
|
||||
"gopkg.in/urfave/cli.v2"
|
||||
|
||||
"github.com/filecoin-project/go-jsonrpc"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/api/client"
|
||||
"github.com/filecoin-project/lotus/lib/jsonrpc"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
)
|
||||
|
||||
|
@ -8,14 +8,16 @@ import (
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
logging "github.com/ipfs/go-log"
|
||||
"gopkg.in/urfave/cli.v2"
|
||||
|
||||
"github.com/filecoin-project/go-jsonrpc"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
lcli "github.com/filecoin-project/lotus/cli"
|
||||
"github.com/filecoin-project/lotus/lib/jsonrpc"
|
||||
cid "github.com/ipfs/go-cid"
|
||||
logging "github.com/ipfs/go-log"
|
||||
"gopkg.in/urfave/cli.v2"
|
||||
)
|
||||
|
||||
type CidWindow [][]cid.Cid
|
||||
|
@ -15,6 +15,7 @@ import (
|
||||
"golang.org/x/xerrors"
|
||||
"gopkg.in/urfave/cli.v2"
|
||||
|
||||
"github.com/filecoin-project/go-jsonrpc"
|
||||
paramfetch "github.com/filecoin-project/go-paramfetch"
|
||||
"github.com/filecoin-project/sector-storage/ffiwrapper"
|
||||
|
||||
@ -23,7 +24,6 @@ import (
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
lcli "github.com/filecoin-project/lotus/cli"
|
||||
"github.com/filecoin-project/lotus/lib/auth"
|
||||
"github.com/filecoin-project/lotus/lib/jsonrpc"
|
||||
"github.com/filecoin-project/lotus/lib/lotuslog"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
"github.com/filecoin-project/sector-storage"
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"github.com/filecoin-project/go-jsonrpc"
|
||||
mux "github.com/gorilla/mux"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
manet "github.com/multiformats/go-multiaddr-net"
|
||||
@ -19,7 +20,6 @@ import (
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
lcli "github.com/filecoin-project/lotus/cli"
|
||||
"github.com/filecoin-project/lotus/lib/auth"
|
||||
"github.com/filecoin-project/lotus/lib/jsonrpc"
|
||||
"github.com/filecoin-project/lotus/lib/ulimit"
|
||||
"github.com/filecoin-project/lotus/node"
|
||||
"github.com/filecoin-project/lotus/node/impl"
|
||||
|
@ -9,14 +9,6 @@ import (
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"github.com/filecoin-project/lotus/api/apistruct"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/lib/auth"
|
||||
"github.com/filecoin-project/lotus/lib/jsonrpc"
|
||||
"github.com/filecoin-project/lotus/node"
|
||||
"github.com/filecoin-project/lotus/node/impl"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
@ -24,6 +16,14 @@ import (
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"contrib.go.opencensus.io/exporter/prometheus"
|
||||
|
||||
"github.com/filecoin-project/go-jsonrpc"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/api/apistruct"
|
||||
"github.com/filecoin-project/lotus/lib/auth"
|
||||
"github.com/filecoin-project/lotus/node"
|
||||
"github.com/filecoin-project/lotus/node/impl"
|
||||
)
|
||||
|
||||
var log = logging.Logger("main")
|
||||
|
3
go.mod
3
go.mod
@ -23,6 +23,7 @@ require (
|
||||
github.com/filecoin-project/go-data-transfer v0.3.0
|
||||
github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5
|
||||
github.com/filecoin-project/go-fil-markets v0.2.3
|
||||
github.com/filecoin-project/go-jsonrpc v0.1.0
|
||||
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6
|
||||
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200505180321-973f8949ea8e
|
||||
github.com/filecoin-project/go-statestore v0.1.0
|
||||
@ -59,7 +60,7 @@ require (
|
||||
github.com/ipfs/go-ipld-cbor v0.0.5-0.20200428170625-a0bd04d3cbdf
|
||||
github.com/ipfs/go-ipld-format v0.2.0
|
||||
github.com/ipfs/go-log v1.0.4
|
||||
github.com/ipfs/go-log/v2 v2.0.5
|
||||
github.com/ipfs/go-log/v2 v2.0.8
|
||||
github.com/ipfs/go-merkledag v0.3.1
|
||||
github.com/ipfs/go-path v0.0.7
|
||||
github.com/ipfs/go-unixfs v0.2.4
|
||||
|
4
go.sum
4
go.sum
@ -164,6 +164,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5/go
|
||||
github.com/filecoin-project/go-fil-markets v0.0.0-20200114015428-74d100f305f8/go.mod h1:c8NTjvFVy1Ud02mmGDjOiMeawY2t6ALfrrdvAB01FQc=
|
||||
github.com/filecoin-project/go-fil-markets v0.2.3 h1:00exBcwysQVEx7wvzcdVz9ZT3HLMXKmbQNIz9ktyeO8=
|
||||
github.com/filecoin-project/go-fil-markets v0.2.3/go.mod h1:LI3VFHse33aU0djAmFQ8+Hg39i0J8ibAoppGu6TbgkA=
|
||||
github.com/filecoin-project/go-jsonrpc v0.1.0 h1:NBHruefnWWfbizxFMnStXlXKEAxEno3DrM0iLd8SuCM=
|
||||
github.com/filecoin-project/go-jsonrpc v0.1.0/go.mod h1:j6zV//WXIIY5kky873Q3iIKt/ViOE8rcijovmpxrXzM=
|
||||
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 h1:92PET+sx1Hb4W/8CgFwGuxaKbttwY+UNspYZTvXY0vs=
|
||||
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6/go.mod h1:0HgYnrkeSU4lu1p+LEOeDpFsNBssa0OGGriWdA4hvaE=
|
||||
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU=
|
||||
@ -434,6 +436,8 @@ github.com/ipfs/go-log/v2 v2.0.2/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBW
|
||||
github.com/ipfs/go-log/v2 v2.0.3/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0=
|
||||
github.com/ipfs/go-log/v2 v2.0.5 h1:fL4YI+1g5V/b1Yxr1qAiXTMg1H8z9vx/VmJxBuQMHvU=
|
||||
github.com/ipfs/go-log/v2 v2.0.5/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscwkWG+dw=
|
||||
github.com/ipfs/go-log/v2 v2.0.8 h1:3b3YNopMHlj4AvyhWAx0pDxqSQWYi4/WuWO7yRV6/Qg=
|
||||
github.com/ipfs/go-log/v2 v2.0.8/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscwkWG+dw=
|
||||
github.com/ipfs/go-merkledag v0.0.3/go.mod h1:Oc5kIXLHokkE1hWGMBHw+oxehkAaTOqtEb7Zbh6BhLA=
|
||||
github.com/ipfs/go-merkledag v0.0.6/go.mod h1:QYPdnlvkOg7GnQRofu9XZimC5ZW5Wi3bKys/4GQQfto=
|
||||
github.com/ipfs/go-merkledag v0.1.0/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk=
|
||||
|
@ -1,435 +0,0 @@
|
||||
package jsonrpc
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
"go.opencensus.io/trace"
|
||||
"go.opencensus.io/trace/propagation"
|
||||
"golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
const (
|
||||
methodRetryFrequency = time.Second * 3
|
||||
)
|
||||
|
||||
var (
|
||||
errorType = reflect.TypeOf(new(error)).Elem()
|
||||
contextType = reflect.TypeOf(new(context.Context)).Elem()
|
||||
|
||||
log = logging.Logger("rpc")
|
||||
)
|
||||
|
||||
// ErrClient is an error which occurred on the client side the library
|
||||
type ErrClient struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (e *ErrClient) Error() string {
|
||||
return fmt.Sprintf("RPC client error: %s", e.err)
|
||||
}
|
||||
|
||||
// Unwrap unwraps the actual error
|
||||
func (e *ErrClient) Unwrap(err error) error {
|
||||
return e.err
|
||||
}
|
||||
|
||||
type clientResponse struct {
|
||||
Jsonrpc string `json:"jsonrpc"`
|
||||
Result json.RawMessage `json:"result"`
|
||||
ID int64 `json:"id"`
|
||||
Error *respError `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
type makeChanSink func() (context.Context, func([]byte, bool))
|
||||
|
||||
type clientRequest struct {
|
||||
req request
|
||||
ready chan clientResponse
|
||||
|
||||
// retCh provides a context and sink for handling incoming channel messages
|
||||
retCh makeChanSink
|
||||
}
|
||||
|
||||
// ClientCloser is used to close Client from further use
|
||||
type ClientCloser func()
|
||||
|
||||
// NewClient creates new jsonrpc 2.0 client
|
||||
//
|
||||
// handler must be pointer to a struct with function fields
|
||||
// Returned value closes the client connection
|
||||
// TODO: Example
|
||||
func NewClient(addr string, namespace string, handler interface{}, requestHeader http.Header) (ClientCloser, error) {
|
||||
return NewMergeClient(addr, namespace, []interface{}{handler}, requestHeader)
|
||||
}
|
||||
|
||||
type client struct {
|
||||
namespace string
|
||||
|
||||
requests chan clientRequest
|
||||
exiting <-chan struct{}
|
||||
idCtr int64
|
||||
}
|
||||
|
||||
// NewMergeClient is like NewClient, but allows to specify multiple structs
|
||||
// to be filled in the same namespace, using one connection
|
||||
func NewMergeClient(addr string, namespace string, outs []interface{}, requestHeader http.Header, opts ...Option) (ClientCloser, error) {
|
||||
var config Config
|
||||
for _, o := range opts {
|
||||
o(&config)
|
||||
}
|
||||
|
||||
connFactory := func() (*websocket.Conn, error) {
|
||||
conn, _, err := websocket.DefaultDialer.Dial(addr, requestHeader)
|
||||
return conn, err
|
||||
}
|
||||
|
||||
if config.proxyConnFactory != nil {
|
||||
connFactory = config.proxyConnFactory(connFactory)
|
||||
}
|
||||
|
||||
conn, err := connFactory()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c := client{
|
||||
namespace: namespace,
|
||||
}
|
||||
|
||||
stop := make(chan struct{})
|
||||
exiting := make(chan struct{})
|
||||
c.requests = make(chan clientRequest)
|
||||
c.exiting = exiting
|
||||
|
||||
handlers := map[string]rpcHandler{}
|
||||
go (&wsConn{
|
||||
conn: conn,
|
||||
connFactory: connFactory,
|
||||
reconnectInterval: config.ReconnectInterval,
|
||||
handler: handlers,
|
||||
requests: c.requests,
|
||||
stop: stop,
|
||||
exiting: exiting,
|
||||
}).handleWsConn(context.TODO())
|
||||
|
||||
for _, handler := range outs {
|
||||
htyp := reflect.TypeOf(handler)
|
||||
if htyp.Kind() != reflect.Ptr {
|
||||
return nil, xerrors.New("expected handler to be a pointer")
|
||||
}
|
||||
typ := htyp.Elem()
|
||||
if typ.Kind() != reflect.Struct {
|
||||
return nil, xerrors.New("handler should be a struct")
|
||||
}
|
||||
|
||||
val := reflect.ValueOf(handler)
|
||||
|
||||
for i := 0; i < typ.NumField(); i++ {
|
||||
fn, err := c.makeRpcFunc(typ.Field(i))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
val.Elem().Field(i).Set(fn)
|
||||
}
|
||||
}
|
||||
|
||||
return func() {
|
||||
close(stop)
|
||||
<-exiting
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *client) makeOutChan(ctx context.Context, ftyp reflect.Type, valOut int) (func() reflect.Value, makeChanSink) {
|
||||
retVal := reflect.Zero(ftyp.Out(valOut))
|
||||
|
||||
chCtor := func() (context.Context, func([]byte, bool)) {
|
||||
// unpack chan type to make sure it's reflect.BothDir
|
||||
ctyp := reflect.ChanOf(reflect.BothDir, ftyp.Out(valOut).Elem())
|
||||
ch := reflect.MakeChan(ctyp, 0) // todo: buffer?
|
||||
chCtx, chCancel := context.WithCancel(ctx)
|
||||
retVal = ch.Convert(ftyp.Out(valOut))
|
||||
|
||||
incoming := make(chan reflect.Value, 32)
|
||||
|
||||
// gorotuine to handle buffering of items
|
||||
go func() {
|
||||
buf := (&list.List{}).Init()
|
||||
|
||||
for {
|
||||
front := buf.Front()
|
||||
|
||||
cases := []reflect.SelectCase{
|
||||
{
|
||||
Dir: reflect.SelectRecv,
|
||||
Chan: reflect.ValueOf(chCtx.Done()),
|
||||
},
|
||||
{
|
||||
Dir: reflect.SelectRecv,
|
||||
Chan: reflect.ValueOf(incoming),
|
||||
},
|
||||
}
|
||||
|
||||
if front != nil {
|
||||
cases = append(cases, reflect.SelectCase{
|
||||
Dir: reflect.SelectSend,
|
||||
Chan: ch,
|
||||
Send: front.Value.(reflect.Value).Elem(),
|
||||
})
|
||||
}
|
||||
|
||||
chosen, val, _ := reflect.Select(cases)
|
||||
|
||||
switch chosen {
|
||||
case 0:
|
||||
ch.Close()
|
||||
return
|
||||
case 1:
|
||||
vvval := val.Interface().(reflect.Value)
|
||||
buf.PushBack(vvval)
|
||||
if buf.Len() > 1 {
|
||||
if buf.Len() > 10 {
|
||||
log.Warnw("rpc output message buffer", "n", buf.Len())
|
||||
} else {
|
||||
log.Infow("rpc output message buffer", "n", buf.Len())
|
||||
}
|
||||
}
|
||||
|
||||
case 2:
|
||||
buf.Remove(front)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return ctx, func(result []byte, ok bool) {
|
||||
if !ok {
|
||||
chCancel()
|
||||
return
|
||||
}
|
||||
|
||||
val := reflect.New(ftyp.Out(valOut).Elem())
|
||||
if err := json.Unmarshal(result, val.Interface()); err != nil {
|
||||
log.Errorf("error unmarshaling chan response: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
if ctx.Err() != nil {
|
||||
log.Errorf("got rpc message with cancelled context: %s", ctx.Err())
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case incoming <- val:
|
||||
case <-chCtx.Done():
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return func() reflect.Value { return retVal }, chCtor
|
||||
}
|
||||
|
||||
func (c *client) sendRequest(ctx context.Context, req request, chCtor makeChanSink) (clientResponse, error) {
|
||||
rchan := make(chan clientResponse, 1)
|
||||
creq := clientRequest{
|
||||
req: req,
|
||||
ready: rchan,
|
||||
|
||||
retCh: chCtor,
|
||||
}
|
||||
select {
|
||||
case c.requests <- creq:
|
||||
case <-c.exiting:
|
||||
return clientResponse{}, fmt.Errorf("websocket routine exiting")
|
||||
}
|
||||
|
||||
var ctxDone <-chan struct{}
|
||||
var resp clientResponse
|
||||
|
||||
if ctx != nil {
|
||||
ctxDone = ctx.Done()
|
||||
}
|
||||
|
||||
// wait for response, handle context cancellation
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case resp = <-rchan:
|
||||
break loop
|
||||
case <-ctxDone: // send cancel request
|
||||
ctxDone = nil
|
||||
|
||||
cancelReq := clientRequest{
|
||||
req: request{
|
||||
Jsonrpc: "2.0",
|
||||
Method: wsCancel,
|
||||
Params: []param{{v: reflect.ValueOf(*req.ID)}},
|
||||
},
|
||||
}
|
||||
select {
|
||||
case c.requests <- cancelReq:
|
||||
case <-c.exiting:
|
||||
log.Warn("failed to send request cancellation, websocket routing exited")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
type rpcFunc struct {
|
||||
client *client
|
||||
|
||||
ftyp reflect.Type
|
||||
name string
|
||||
|
||||
nout int
|
||||
valOut int
|
||||
errOut int
|
||||
|
||||
hasCtx int
|
||||
returnValueIsChannel bool
|
||||
|
||||
retry bool
|
||||
}
|
||||
|
||||
func (fn *rpcFunc) processResponse(resp clientResponse, rval reflect.Value) []reflect.Value {
|
||||
out := make([]reflect.Value, fn.nout)
|
||||
|
||||
if fn.valOut != -1 {
|
||||
out[fn.valOut] = rval
|
||||
}
|
||||
if fn.errOut != -1 {
|
||||
out[fn.errOut] = reflect.New(errorType).Elem()
|
||||
if resp.Error != nil {
|
||||
out[fn.errOut].Set(reflect.ValueOf(resp.Error))
|
||||
}
|
||||
}
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
func (fn *rpcFunc) processError(err error) []reflect.Value {
|
||||
out := make([]reflect.Value, fn.nout)
|
||||
|
||||
if fn.valOut != -1 {
|
||||
out[fn.valOut] = reflect.New(fn.ftyp.Out(fn.valOut)).Elem()
|
||||
}
|
||||
if fn.errOut != -1 {
|
||||
out[fn.errOut] = reflect.New(errorType).Elem()
|
||||
out[fn.errOut].Set(reflect.ValueOf(&ErrClient{err}))
|
||||
}
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
func (fn *rpcFunc) handleRpcCall(args []reflect.Value) (results []reflect.Value) {
|
||||
id := atomic.AddInt64(&fn.client.idCtr, 1)
|
||||
params := make([]param, len(args)-fn.hasCtx)
|
||||
for i, arg := range args[fn.hasCtx:] {
|
||||
params[i] = param{
|
||||
v: arg,
|
||||
}
|
||||
}
|
||||
|
||||
var ctx context.Context
|
||||
var span *trace.Span
|
||||
if fn.hasCtx == 1 {
|
||||
ctx = args[0].Interface().(context.Context)
|
||||
ctx, span = trace.StartSpan(ctx, "api.call")
|
||||
defer span.End()
|
||||
}
|
||||
|
||||
retVal := func() reflect.Value { return reflect.Value{} }
|
||||
|
||||
// if the function returns a channel, we need to provide a sink for the
|
||||
// messages
|
||||
var chCtor makeChanSink
|
||||
if fn.returnValueIsChannel {
|
||||
retVal, chCtor = fn.client.makeOutChan(ctx, fn.ftyp, fn.valOut)
|
||||
}
|
||||
|
||||
req := request{
|
||||
Jsonrpc: "2.0",
|
||||
ID: &id,
|
||||
Method: fn.client.namespace + "." + fn.name,
|
||||
Params: params,
|
||||
}
|
||||
|
||||
if span != nil {
|
||||
span.AddAttributes(trace.StringAttribute("method", req.Method))
|
||||
|
||||
eSC := base64.StdEncoding.EncodeToString(
|
||||
propagation.Binary(span.SpanContext()))
|
||||
req.Meta = map[string]string{
|
||||
"SpanContext": eSC,
|
||||
}
|
||||
}
|
||||
|
||||
var resp clientResponse
|
||||
var err error
|
||||
// keep retrying if got a forced closed websocket conn and calling method
|
||||
// has retry annotation
|
||||
for {
|
||||
resp, err = fn.client.sendRequest(ctx, req, chCtor)
|
||||
if err != nil {
|
||||
return fn.processError(fmt.Errorf("sendRequest failed: %w", err))
|
||||
}
|
||||
|
||||
if resp.ID != *req.ID {
|
||||
return fn.processError(xerrors.New("request and response id didn't match"))
|
||||
}
|
||||
|
||||
if fn.valOut != -1 && !fn.returnValueIsChannel {
|
||||
val := reflect.New(fn.ftyp.Out(fn.valOut))
|
||||
|
||||
if resp.Result != nil {
|
||||
log.Debugw("rpc result", "type", fn.ftyp.Out(fn.valOut))
|
||||
if err := json.Unmarshal(resp.Result, val.Interface()); err != nil {
|
||||
log.Warnw("unmarshaling failed", "message", string(resp.Result))
|
||||
return fn.processError(xerrors.Errorf("unmarshaling result: %w", err))
|
||||
}
|
||||
}
|
||||
|
||||
retVal = func() reflect.Value { return val.Elem() }
|
||||
}
|
||||
retry := resp.Error != nil && resp.Error.Code == 2 && fn.retry
|
||||
if !retry {
|
||||
break
|
||||
}
|
||||
time.Sleep(methodRetryFrequency)
|
||||
}
|
||||
|
||||
return fn.processResponse(resp, retVal())
|
||||
}
|
||||
|
||||
func (c *client) makeRpcFunc(f reflect.StructField) (reflect.Value, error) {
|
||||
ftyp := f.Type
|
||||
if ftyp.Kind() != reflect.Func {
|
||||
return reflect.Value{}, xerrors.New("handler field not a func")
|
||||
}
|
||||
|
||||
fun := &rpcFunc{
|
||||
client: c,
|
||||
ftyp: ftyp,
|
||||
name: f.Name,
|
||||
retry: f.Tag.Get("retry") == "true",
|
||||
}
|
||||
fun.valOut, fun.errOut, fun.nout = processFuncOut(ftyp)
|
||||
|
||||
if ftyp.NumIn() > 0 && ftyp.In(0) == contextType {
|
||||
fun.hasCtx = 1
|
||||
}
|
||||
fun.returnValueIsChannel = fun.valOut != -1 && ftyp.Out(fun.valOut).Kind() == reflect.Chan
|
||||
|
||||
return reflect.MakeFunc(ftyp, fun.handleRpcCall), nil
|
||||
}
|
@ -1,275 +0,0 @@
|
||||
package jsonrpc
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"reflect"
|
||||
|
||||
"github.com/filecoin-project/lotus/metrics"
|
||||
"go.opencensus.io/stats"
|
||||
"go.opencensus.io/tag"
|
||||
"go.opencensus.io/trace"
|
||||
"go.opencensus.io/trace/propagation"
|
||||
"golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
type rpcHandler struct {
|
||||
paramReceivers []reflect.Type
|
||||
nParams int
|
||||
|
||||
receiver reflect.Value
|
||||
handlerFunc reflect.Value
|
||||
|
||||
hasCtx int
|
||||
|
||||
errOut int
|
||||
valOut int
|
||||
}
|
||||
|
||||
type handlers map[string]rpcHandler
|
||||
|
||||
// Request / response
|
||||
|
||||
type request struct {
|
||||
Jsonrpc string `json:"jsonrpc"`
|
||||
ID *int64 `json:"id,omitempty"`
|
||||
Method string `json:"method"`
|
||||
Params []param `json:"params"`
|
||||
Meta map[string]string `json:"meta,omitempty"`
|
||||
}
|
||||
|
||||
type respError struct {
|
||||
Code int `json:"code"`
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
func (e *respError) Error() string {
|
||||
if e.Code >= -32768 && e.Code <= -32000 {
|
||||
return fmt.Sprintf("RPC error (%d): %s", e.Code, e.Message)
|
||||
}
|
||||
return e.Message
|
||||
}
|
||||
|
||||
type response struct {
|
||||
Jsonrpc string `json:"jsonrpc"`
|
||||
Result interface{} `json:"result,omitempty"`
|
||||
ID int64 `json:"id"`
|
||||
Error *respError `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
// Register
|
||||
|
||||
func (h handlers) register(namespace string, r interface{}) {
|
||||
val := reflect.ValueOf(r)
|
||||
//TODO: expect ptr
|
||||
|
||||
for i := 0; i < val.NumMethod(); i++ {
|
||||
method := val.Type().Method(i)
|
||||
|
||||
funcType := method.Func.Type()
|
||||
hasCtx := 0
|
||||
if funcType.NumIn() >= 2 && funcType.In(1) == contextType {
|
||||
hasCtx = 1
|
||||
}
|
||||
|
||||
ins := funcType.NumIn() - 1 - hasCtx
|
||||
recvs := make([]reflect.Type, ins)
|
||||
for i := 0; i < ins; i++ {
|
||||
recvs[i] = method.Type.In(i + 1 + hasCtx)
|
||||
}
|
||||
|
||||
valOut, errOut, _ := processFuncOut(funcType)
|
||||
|
||||
h[namespace+"."+method.Name] = rpcHandler{
|
||||
paramReceivers: recvs,
|
||||
nParams: ins,
|
||||
|
||||
handlerFunc: method.Func,
|
||||
receiver: val,
|
||||
|
||||
hasCtx: hasCtx,
|
||||
|
||||
errOut: errOut,
|
||||
valOut: valOut,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Handle
|
||||
|
||||
type rpcErrFunc func(w func(func(io.Writer)), req *request, code int, err error)
|
||||
type chanOut func(reflect.Value, int64) error
|
||||
|
||||
func (h handlers) handleReader(ctx context.Context, r io.Reader, w io.Writer, rpcError rpcErrFunc) {
|
||||
wf := func(cb func(io.Writer)) {
|
||||
cb(w)
|
||||
}
|
||||
|
||||
var req request
|
||||
if err := json.NewDecoder(r).Decode(&req); err != nil {
|
||||
rpcError(wf, &req, rpcParseError, xerrors.Errorf("unmarshaling request: %w", err))
|
||||
return
|
||||
}
|
||||
|
||||
h.handle(ctx, req, wf, rpcError, func(bool) {}, nil)
|
||||
}
|
||||
|
||||
func doCall(methodName string, f reflect.Value, params []reflect.Value) (out []reflect.Value, err error) {
|
||||
defer func() {
|
||||
if i := recover(); i != nil {
|
||||
err = xerrors.Errorf("panic in rpc method '%s': %s", methodName, i)
|
||||
log.Error(err)
|
||||
}
|
||||
}()
|
||||
|
||||
out = f.Call(params)
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (handlers) getSpan(ctx context.Context, req request) (context.Context, *trace.Span) {
|
||||
if req.Meta == nil {
|
||||
return ctx, nil
|
||||
}
|
||||
if eSC, ok := req.Meta["SpanContext"]; ok {
|
||||
bSC := make([]byte, base64.StdEncoding.DecodedLen(len(eSC)))
|
||||
_, err := base64.StdEncoding.Decode(bSC, []byte(eSC))
|
||||
if err != nil {
|
||||
log.Errorf("SpanContext: decode", "error", err)
|
||||
return ctx, nil
|
||||
}
|
||||
sc, ok := propagation.FromBinary(bSC)
|
||||
if !ok {
|
||||
log.Errorf("SpanContext: could not create span", "data", bSC)
|
||||
return ctx, nil
|
||||
}
|
||||
ctx, span := trace.StartSpanWithRemoteParent(ctx, "api.handle", sc)
|
||||
span.AddAttributes(trace.StringAttribute("method", req.Method))
|
||||
return ctx, span
|
||||
}
|
||||
return ctx, nil
|
||||
}
|
||||
|
||||
func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer)), rpcError rpcErrFunc, done func(keepCtx bool), chOut chanOut) {
|
||||
// Not sure if we need to sanitize the incoming req.Method or not.
|
||||
ctx, span := h.getSpan(ctx, req)
|
||||
ctx, _ = tag.New(ctx, tag.Insert(metrics.RPCMethod, req.Method))
|
||||
defer span.End()
|
||||
|
||||
handler, ok := h[req.Method]
|
||||
if !ok {
|
||||
rpcError(w, &req, rpcMethodNotFound, fmt.Errorf("method '%s' not found", req.Method))
|
||||
stats.Record(ctx, metrics.RPCInvalidMethod.M(1))
|
||||
done(false)
|
||||
return
|
||||
}
|
||||
|
||||
if len(req.Params) != handler.nParams {
|
||||
rpcError(w, &req, rpcInvalidParams, fmt.Errorf("wrong param count"))
|
||||
stats.Record(ctx, metrics.RPCRequestError.M(1))
|
||||
done(false)
|
||||
return
|
||||
}
|
||||
|
||||
outCh := handler.valOut != -1 && handler.handlerFunc.Type().Out(handler.valOut).Kind() == reflect.Chan
|
||||
defer done(outCh)
|
||||
|
||||
if chOut == nil && outCh {
|
||||
rpcError(w, &req, rpcMethodNotFound, fmt.Errorf("method '%s' not supported in this mode (no out channel support)", req.Method))
|
||||
stats.Record(ctx, metrics.RPCRequestError.M(1))
|
||||
return
|
||||
}
|
||||
|
||||
callParams := make([]reflect.Value, 1+handler.hasCtx+handler.nParams)
|
||||
callParams[0] = handler.receiver
|
||||
if handler.hasCtx == 1 {
|
||||
callParams[1] = reflect.ValueOf(ctx)
|
||||
}
|
||||
|
||||
for i := 0; i < handler.nParams; i++ {
|
||||
rp := reflect.New(handler.paramReceivers[i])
|
||||
if err := json.NewDecoder(bytes.NewReader(req.Params[i].data)).Decode(rp.Interface()); err != nil {
|
||||
rpcError(w, &req, rpcParseError, xerrors.Errorf("unmarshaling params for '%s' (param: %T): %w", req.Method, rp.Interface(), err))
|
||||
stats.Record(ctx, metrics.RPCRequestError.M(1))
|
||||
return
|
||||
}
|
||||
|
||||
callParams[i+1+handler.hasCtx] = reflect.ValueOf(rp.Elem().Interface())
|
||||
}
|
||||
|
||||
///////////////////
|
||||
|
||||
callResult, err := doCall(req.Method, handler.handlerFunc, callParams)
|
||||
if err != nil {
|
||||
rpcError(w, &req, 0, xerrors.Errorf("fatal error calling '%s': %w", req.Method, err))
|
||||
stats.Record(ctx, metrics.RPCRequestError.M(1))
|
||||
return
|
||||
}
|
||||
if req.ID == nil {
|
||||
return // notification
|
||||
}
|
||||
|
||||
///////////////////
|
||||
|
||||
resp := response{
|
||||
Jsonrpc: "2.0",
|
||||
ID: *req.ID,
|
||||
}
|
||||
|
||||
if handler.errOut != -1 {
|
||||
err := callResult[handler.errOut].Interface()
|
||||
if err != nil {
|
||||
log.Warnf("error in RPC call to '%s': %+v", req.Method, err)
|
||||
stats.Record(ctx, metrics.RPCResponseError.M(1))
|
||||
resp.Error = &respError{
|
||||
Code: 1,
|
||||
Message: err.(error).Error(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var kind reflect.Kind
|
||||
var res interface{}
|
||||
var nonZero bool
|
||||
if handler.valOut != -1 {
|
||||
res = callResult[handler.valOut].Interface()
|
||||
kind = callResult[handler.valOut].Kind()
|
||||
nonZero = !callResult[handler.valOut].IsZero()
|
||||
}
|
||||
|
||||
if res != nil && kind == reflect.Chan {
|
||||
// Channel responses are sent from channel control goroutine.
|
||||
// Sending responses here could cause deadlocks on writeLk, or allow
|
||||
// sending channel messages before this rpc call returns
|
||||
|
||||
//noinspection GoNilness // already checked above
|
||||
err = chOut(callResult[handler.valOut], *req.ID)
|
||||
if err == nil {
|
||||
return // channel goroutine handles responding
|
||||
}
|
||||
|
||||
log.Warnf("failed to setup channel in RPC call to '%s': %+v", req.Method, err)
|
||||
stats.Record(ctx, metrics.RPCResponseError.M(1))
|
||||
resp.Error = &respError{
|
||||
Code: 1,
|
||||
Message: err.(error).Error(),
|
||||
}
|
||||
} else if resp.Error == nil {
|
||||
// check error as JSON-RPC spec prohibits error and value at the same time
|
||||
resp.Result = res
|
||||
}
|
||||
if resp.Error != nil && nonZero {
|
||||
log.Errorw("error and res returned", "request", req, "r.err", resp.Error, "res", res)
|
||||
}
|
||||
|
||||
w(func(w io.Writer) {
|
||||
if err := json.NewEncoder(w).Encode(resp); err != nil {
|
||||
log.Error(err)
|
||||
stats.Record(ctx, metrics.RPCResponseError.M(1))
|
||||
return
|
||||
}
|
||||
})
|
||||
}
|
@ -1,25 +0,0 @@
|
||||
package jsonrpc
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
ReconnectInterval time.Duration
|
||||
|
||||
proxyConnFactory func(func() (*websocket.Conn, error)) func() (*websocket.Conn, error) // for testing
|
||||
}
|
||||
|
||||
var defaultConfig = Config{
|
||||
ReconnectInterval: time.Second * 5,
|
||||
}
|
||||
|
||||
type Option func(c *Config)
|
||||
|
||||
func WithReconnectInterval(d time.Duration) func(c *Config) {
|
||||
return func(c *Config) {
|
||||
c.ReconnectInterval = d
|
||||
}
|
||||
}
|
@ -1,623 +0,0 @@
|
||||
package jsonrpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http/httptest"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func init() {
|
||||
logging.SetLogLevel("rpc", "DEBUG")
|
||||
}
|
||||
|
||||
type SimpleServerHandler struct {
|
||||
n int
|
||||
}
|
||||
|
||||
type TestType struct {
|
||||
S string
|
||||
I int
|
||||
}
|
||||
|
||||
type TestOut struct {
|
||||
TestType
|
||||
Ok bool
|
||||
}
|
||||
|
||||
func (h *SimpleServerHandler) Add(in int) error {
|
||||
if in == -3546 {
|
||||
return errors.New("test")
|
||||
}
|
||||
|
||||
h.n += in
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *SimpleServerHandler) AddGet(in int) int {
|
||||
h.n += in
|
||||
return h.n
|
||||
}
|
||||
|
||||
func (h *SimpleServerHandler) StringMatch(t TestType, i2 int64) (out TestOut, err error) {
|
||||
if strconv.FormatInt(i2, 10) == t.S {
|
||||
out.Ok = true
|
||||
}
|
||||
if i2 != int64(t.I) {
|
||||
return TestOut{}, errors.New(":(")
|
||||
}
|
||||
out.I = t.I
|
||||
out.S = t.S
|
||||
return
|
||||
}
|
||||
|
||||
func TestRPC(t *testing.T) {
|
||||
// setup server
|
||||
|
||||
serverHandler := &SimpleServerHandler{}
|
||||
|
||||
rpcServer := NewServer()
|
||||
rpcServer.Register("SimpleServerHandler", serverHandler)
|
||||
|
||||
// httptest stuff
|
||||
testServ := httptest.NewServer(rpcServer)
|
||||
defer testServ.Close()
|
||||
// setup client
|
||||
|
||||
var client struct {
|
||||
Add func(int) error
|
||||
AddGet func(int) int
|
||||
StringMatch func(t TestType, i2 int64) (out TestOut, err error)
|
||||
}
|
||||
closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &client, nil)
|
||||
require.NoError(t, err)
|
||||
defer closer()
|
||||
|
||||
// Add(int) error
|
||||
|
||||
require.NoError(t, client.Add(2))
|
||||
require.Equal(t, 2, serverHandler.n)
|
||||
|
||||
err = client.Add(-3546)
|
||||
require.EqualError(t, err, "test")
|
||||
|
||||
// AddGet(int) int
|
||||
|
||||
n := client.AddGet(3)
|
||||
require.Equal(t, 5, n)
|
||||
require.Equal(t, 5, serverHandler.n)
|
||||
|
||||
// StringMatch
|
||||
|
||||
o, err := client.StringMatch(TestType{S: "0"}, 0)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "0", o.S)
|
||||
require.Equal(t, 0, o.I)
|
||||
|
||||
_, err = client.StringMatch(TestType{S: "5"}, 5)
|
||||
require.EqualError(t, err, ":(")
|
||||
|
||||
o, err = client.StringMatch(TestType{S: "8", I: 8}, 8)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "8", o.S)
|
||||
require.Equal(t, 8, o.I)
|
||||
|
||||
// Invalid client handlers
|
||||
|
||||
var noret struct {
|
||||
Add func(int)
|
||||
}
|
||||
closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &noret, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
// this one should actually work
|
||||
noret.Add(4)
|
||||
require.Equal(t, 9, serverHandler.n)
|
||||
closer()
|
||||
|
||||
var noparam struct {
|
||||
Add func()
|
||||
}
|
||||
closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &noparam, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
// shouldn't panic
|
||||
noparam.Add()
|
||||
closer()
|
||||
|
||||
var erronly struct {
|
||||
AddGet func() (int, error)
|
||||
}
|
||||
closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &erronly, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = erronly.AddGet()
|
||||
if err == nil || err.Error() != "RPC error (-32602): wrong param count" {
|
||||
t.Error("wrong error:", err)
|
||||
}
|
||||
closer()
|
||||
|
||||
var wrongtype struct {
|
||||
Add func(string) error
|
||||
}
|
||||
closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &wrongtype, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = wrongtype.Add("not an int")
|
||||
if err == nil || !strings.Contains(err.Error(), "RPC error (-32700):") || !strings.Contains(err.Error(), "json: cannot unmarshal string into Go value of type int") {
|
||||
t.Error("wrong error:", err)
|
||||
}
|
||||
closer()
|
||||
|
||||
var notfound struct {
|
||||
NotThere func(string) error
|
||||
}
|
||||
closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", ¬found, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = notfound.NotThere("hello?")
|
||||
if err == nil || err.Error() != "RPC error (-32601): method 'SimpleServerHandler.NotThere' not found" {
|
||||
t.Error("wrong error:", err)
|
||||
}
|
||||
closer()
|
||||
}
|
||||
|
||||
type CtxHandler struct {
|
||||
lk sync.Mutex
|
||||
|
||||
cancelled bool
|
||||
i int
|
||||
}
|
||||
|
||||
func (h *CtxHandler) Test(ctx context.Context) {
|
||||
h.lk.Lock()
|
||||
defer h.lk.Unlock()
|
||||
timeout := time.After(300 * time.Millisecond)
|
||||
h.i++
|
||||
|
||||
select {
|
||||
case <-timeout:
|
||||
case <-ctx.Done():
|
||||
h.cancelled = true
|
||||
}
|
||||
}
|
||||
|
||||
func TestCtx(t *testing.T) {
|
||||
// setup server
|
||||
|
||||
serverHandler := &CtxHandler{}
|
||||
|
||||
rpcServer := NewServer()
|
||||
rpcServer.Register("CtxHandler", serverHandler)
|
||||
|
||||
// httptest stuff
|
||||
testServ := httptest.NewServer(rpcServer)
|
||||
defer testServ.Close()
|
||||
|
||||
// setup client
|
||||
|
||||
var client struct {
|
||||
Test func(ctx context.Context)
|
||||
}
|
||||
closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "CtxHandler", &client, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
||||
defer cancel()
|
||||
|
||||
client.Test(ctx)
|
||||
serverHandler.lk.Lock()
|
||||
|
||||
if !serverHandler.cancelled {
|
||||
t.Error("expected cancellation on the server side")
|
||||
}
|
||||
|
||||
serverHandler.cancelled = false
|
||||
|
||||
serverHandler.lk.Unlock()
|
||||
closer()
|
||||
|
||||
var noCtxClient struct {
|
||||
Test func()
|
||||
}
|
||||
closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "CtxHandler", &noCtxClient, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
noCtxClient.Test()
|
||||
|
||||
serverHandler.lk.Lock()
|
||||
|
||||
if serverHandler.cancelled || serverHandler.i != 2 {
|
||||
t.Error("wrong serverHandler state")
|
||||
}
|
||||
|
||||
serverHandler.lk.Unlock()
|
||||
closer()
|
||||
}
|
||||
|
||||
type UnUnmarshalable int
|
||||
|
||||
func (*UnUnmarshalable) UnmarshalJSON([]byte) error {
|
||||
return errors.New("nope")
|
||||
}
|
||||
|
||||
type UnUnmarshalableHandler struct{}
|
||||
|
||||
func (*UnUnmarshalableHandler) GetUnUnmarshalableStuff() (UnUnmarshalable, error) {
|
||||
return UnUnmarshalable(5), nil
|
||||
}
|
||||
|
||||
func TestUnmarshalableResult(t *testing.T) {
|
||||
var client struct {
|
||||
GetUnUnmarshalableStuff func() (UnUnmarshalable, error)
|
||||
}
|
||||
|
||||
rpcServer := NewServer()
|
||||
rpcServer.Register("Handler", &UnUnmarshalableHandler{})
|
||||
|
||||
testServ := httptest.NewServer(rpcServer)
|
||||
defer testServ.Close()
|
||||
|
||||
closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "Handler", &client, nil)
|
||||
require.NoError(t, err)
|
||||
defer closer()
|
||||
|
||||
_, err = client.GetUnUnmarshalableStuff()
|
||||
require.EqualError(t, err, "RPC client error: unmarshaling result: nope")
|
||||
}
|
||||
|
||||
type ChanHandler struct {
|
||||
wait chan struct{}
|
||||
ctxdone <-chan struct{}
|
||||
}
|
||||
|
||||
func (h *ChanHandler) Sub(ctx context.Context, i int, eq int) (<-chan int, error) {
|
||||
out := make(chan int)
|
||||
h.ctxdone = ctx.Done()
|
||||
|
||||
wait := h.wait
|
||||
|
||||
log.Warnf("SERVER SUB!")
|
||||
go func() {
|
||||
defer close(out)
|
||||
var n int
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
fmt.Println("ctxdone1", i, eq)
|
||||
return
|
||||
case <-wait:
|
||||
fmt.Println("CONSUMED WAIT: ", i)
|
||||
}
|
||||
|
||||
n += i
|
||||
|
||||
if n == eq {
|
||||
fmt.Println("eq")
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
fmt.Println("ctxdone2")
|
||||
return
|
||||
case out <- n:
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func TestChan(t *testing.T) {
|
||||
var client struct {
|
||||
Sub func(context.Context, int, int) (<-chan int, error)
|
||||
}
|
||||
|
||||
serverHandler := &ChanHandler{
|
||||
wait: make(chan struct{}, 5),
|
||||
}
|
||||
|
||||
rpcServer := NewServer()
|
||||
rpcServer.Register("ChanHandler", serverHandler)
|
||||
|
||||
testServ := httptest.NewServer(rpcServer)
|
||||
defer testServ.Close()
|
||||
|
||||
closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "ChanHandler", &client, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
defer closer()
|
||||
|
||||
serverHandler.wait <- struct{}{}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
// sub
|
||||
|
||||
sub, err := client.Sub(ctx, 2, -1)
|
||||
require.NoError(t, err)
|
||||
|
||||
// recv one
|
||||
|
||||
require.Equal(t, 2, <-sub)
|
||||
|
||||
// recv many (order)
|
||||
|
||||
serverHandler.wait <- struct{}{}
|
||||
serverHandler.wait <- struct{}{}
|
||||
serverHandler.wait <- struct{}{}
|
||||
|
||||
require.Equal(t, 4, <-sub)
|
||||
require.Equal(t, 6, <-sub)
|
||||
require.Equal(t, 8, <-sub)
|
||||
|
||||
// close (through ctx)
|
||||
cancel()
|
||||
|
||||
_, ok := <-sub
|
||||
require.Equal(t, false, ok)
|
||||
|
||||
// sub (again)
|
||||
|
||||
serverHandler.wait = make(chan struct{}, 5)
|
||||
serverHandler.wait <- struct{}{}
|
||||
|
||||
ctx, cancel = context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
log.Warnf("last sub")
|
||||
sub, err = client.Sub(ctx, 3, 6)
|
||||
require.NoError(t, err)
|
||||
|
||||
log.Warnf("waiting for value now")
|
||||
require.Equal(t, 3, <-sub)
|
||||
log.Warnf("not equal")
|
||||
|
||||
// close (remote)
|
||||
serverHandler.wait <- struct{}{}
|
||||
_, ok = <-sub
|
||||
require.Equal(t, false, ok)
|
||||
}
|
||||
|
||||
func TestChanClosing(t *testing.T) {
|
||||
var client struct {
|
||||
Sub func(context.Context, int, int) (<-chan int, error)
|
||||
}
|
||||
|
||||
serverHandler := &ChanHandler{
|
||||
wait: make(chan struct{}, 5),
|
||||
}
|
||||
|
||||
rpcServer := NewServer()
|
||||
rpcServer.Register("ChanHandler", serverHandler)
|
||||
|
||||
testServ := httptest.NewServer(rpcServer)
|
||||
defer testServ.Close()
|
||||
|
||||
closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "ChanHandler", &client, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
defer closer()
|
||||
|
||||
ctx1, cancel1 := context.WithCancel(context.Background())
|
||||
ctx2, cancel2 := context.WithCancel(context.Background())
|
||||
|
||||
// sub
|
||||
|
||||
sub1, err := client.Sub(ctx1, 2, -1)
|
||||
require.NoError(t, err)
|
||||
|
||||
sub2, err := client.Sub(ctx2, 3, -1)
|
||||
require.NoError(t, err)
|
||||
|
||||
// recv one
|
||||
|
||||
serverHandler.wait <- struct{}{}
|
||||
serverHandler.wait <- struct{}{}
|
||||
|
||||
require.Equal(t, 2, <-sub1)
|
||||
require.Equal(t, 3, <-sub2)
|
||||
|
||||
cancel1()
|
||||
|
||||
require.Equal(t, 0, <-sub1)
|
||||
time.Sleep(time.Millisecond * 50) // make sure the loop has exited (having a shared wait channel makes this annoying)
|
||||
|
||||
serverHandler.wait <- struct{}{}
|
||||
require.Equal(t, 6, <-sub2)
|
||||
|
||||
cancel2()
|
||||
require.Equal(t, 0, <-sub2)
|
||||
}
|
||||
|
||||
func TestChanServerClose(t *testing.T) {
|
||||
var client struct {
|
||||
Sub func(context.Context, int, int) (<-chan int, error)
|
||||
}
|
||||
|
||||
serverHandler := &ChanHandler{
|
||||
wait: make(chan struct{}, 5),
|
||||
}
|
||||
|
||||
rpcServer := NewServer()
|
||||
rpcServer.Register("ChanHandler", serverHandler)
|
||||
|
||||
tctx, tcancel := context.WithCancel(context.Background())
|
||||
|
||||
testServ := httptest.NewServer(rpcServer)
|
||||
testServ.Config.ConnContext = func(ctx context.Context, c net.Conn) context.Context {
|
||||
return tctx
|
||||
}
|
||||
|
||||
closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "ChanHandler", &client, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
defer closer()
|
||||
|
||||
serverHandler.wait <- struct{}{}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// sub
|
||||
|
||||
sub, err := client.Sub(ctx, 2, -1)
|
||||
require.NoError(t, err)
|
||||
|
||||
// recv one
|
||||
|
||||
require.Equal(t, 2, <-sub)
|
||||
|
||||
// make sure we're blocked
|
||||
|
||||
select {
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
case <-sub:
|
||||
t.Fatal("didn't expect to get anything from sub")
|
||||
}
|
||||
|
||||
// close server
|
||||
|
||||
tcancel()
|
||||
testServ.Close()
|
||||
|
||||
_, ok := <-sub
|
||||
require.Equal(t, false, ok)
|
||||
}
|
||||
|
||||
func TestServerChanLockClose(t *testing.T) {
|
||||
var client struct {
|
||||
Sub func(context.Context, int, int) (<-chan int, error)
|
||||
}
|
||||
|
||||
serverHandler := &ChanHandler{
|
||||
wait: make(chan struct{}),
|
||||
}
|
||||
|
||||
rpcServer := NewServer()
|
||||
rpcServer.Register("ChanHandler", serverHandler)
|
||||
|
||||
testServ := httptest.NewServer(rpcServer)
|
||||
|
||||
var closeConn func() error
|
||||
|
||||
_, err := NewMergeClient("ws://"+testServ.Listener.Addr().String(),
|
||||
"ChanHandler",
|
||||
[]interface{}{&client}, nil,
|
||||
func(c *Config) {
|
||||
c.proxyConnFactory = func(f func() (*websocket.Conn, error)) func() (*websocket.Conn, error) {
|
||||
return func() (*websocket.Conn, error) {
|
||||
c, err := f()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
closeConn = c.UnderlyingConn().Close
|
||||
|
||||
return c, nil
|
||||
}
|
||||
}
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// sub
|
||||
|
||||
sub, err := client.Sub(ctx, 2, -1)
|
||||
require.NoError(t, err)
|
||||
|
||||
// recv one
|
||||
|
||||
go func() {
|
||||
serverHandler.wait <- struct{}{}
|
||||
}()
|
||||
require.Equal(t, 2, <-sub)
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
serverHandler.wait <- struct{}{}
|
||||
}
|
||||
|
||||
if err := closeConn(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
<-serverHandler.ctxdone
|
||||
}
|
||||
|
||||
func TestControlChanDeadlock(t *testing.T) {
|
||||
for r := 0; r < 20; r++ {
|
||||
testControlChanDeadlock(t)
|
||||
}
|
||||
}
|
||||
|
||||
func testControlChanDeadlock(t *testing.T) {
|
||||
var client struct {
|
||||
Sub func(context.Context, int, int) (<-chan int, error)
|
||||
}
|
||||
|
||||
n := 5000
|
||||
|
||||
serverHandler := &ChanHandler{
|
||||
wait: make(chan struct{}, n),
|
||||
}
|
||||
|
||||
rpcServer := NewServer()
|
||||
rpcServer.Register("ChanHandler", serverHandler)
|
||||
|
||||
testServ := httptest.NewServer(rpcServer)
|
||||
defer testServ.Close()
|
||||
|
||||
closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "ChanHandler", &client, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
defer closer()
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
serverHandler.wait <- struct{}{}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
sub, err := client.Sub(ctx, 1, -1)
|
||||
require.NoError(t, err)
|
||||
|
||||
done := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
defer close(done)
|
||||
for i := 0; i < n; i++ {
|
||||
if <-sub != i+1 {
|
||||
panic("bad!")
|
||||
//require.Equal(t, i+1, <-sub)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// reset this channel so its not shared between the sub requests...
|
||||
serverHandler.wait = make(chan struct{}, n)
|
||||
for i := 0; i < n; i++ {
|
||||
serverHandler.wait <- struct{}{}
|
||||
}
|
||||
|
||||
_, err = client.Sub(ctx, 2, -1)
|
||||
require.NoError(t, err)
|
||||
<-done
|
||||
}
|
@ -1,114 +0,0 @@
|
||||
package jsonrpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
const (
|
||||
rpcParseError = -32700
|
||||
rpcMethodNotFound = -32601
|
||||
rpcInvalidParams = -32602
|
||||
)
|
||||
|
||||
// RPCServer provides a jsonrpc 2.0 http server handler
|
||||
type RPCServer struct {
|
||||
methods handlers
|
||||
}
|
||||
|
||||
// NewServer creates new RPCServer instance
|
||||
func NewServer() *RPCServer {
|
||||
return &RPCServer{
|
||||
methods: map[string]rpcHandler{},
|
||||
}
|
||||
}
|
||||
|
||||
var upgrader = websocket.Upgrader{
|
||||
CheckOrigin: func(r *http.Request) bool {
|
||||
return true
|
||||
},
|
||||
}
|
||||
|
||||
func (s *RPCServer) handleWS(ctx context.Context, w http.ResponseWriter, r *http.Request) {
|
||||
// TODO: allow setting
|
||||
// (note that we still are mostly covered by jwt tokens)
|
||||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||
if r.Header.Get("Sec-WebSocket-Protocol") != "" {
|
||||
w.Header().Set("Sec-WebSocket-Protocol", r.Header.Get("Sec-WebSocket-Protocol"))
|
||||
}
|
||||
|
||||
c, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
|
||||
(&wsConn{
|
||||
conn: c,
|
||||
handler: s.methods,
|
||||
exiting: make(chan struct{}),
|
||||
}).handleWsConn(ctx)
|
||||
|
||||
if err := c.Close(); err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: return errors to clients per spec
|
||||
func (s *RPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
||||
h := strings.ToLower(r.Header.Get("Connection"))
|
||||
if strings.Contains(h, "upgrade") {
|
||||
s.handleWS(ctx, w, r)
|
||||
return
|
||||
}
|
||||
|
||||
s.methods.handleReader(ctx, r.Body, w, rpcError)
|
||||
}
|
||||
|
||||
func rpcError(wf func(func(io.Writer)), req *request, code int, err error) {
|
||||
log.Errorf("RPC Error: %s", err)
|
||||
wf(func(w io.Writer) {
|
||||
if hw, ok := w.(http.ResponseWriter); ok {
|
||||
hw.WriteHeader(500)
|
||||
}
|
||||
|
||||
log.Warnf("rpc error: %s", err)
|
||||
|
||||
if req.ID == nil { // notification
|
||||
return
|
||||
}
|
||||
|
||||
resp := response{
|
||||
Jsonrpc: "2.0",
|
||||
ID: *req.ID,
|
||||
Error: &respError{
|
||||
Code: code,
|
||||
Message: err.Error(),
|
||||
},
|
||||
}
|
||||
|
||||
err = json.NewEncoder(w).Encode(resp)
|
||||
if err != nil {
|
||||
log.Warnf("failed to write rpc error: %s", err)
|
||||
return
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Register registers new RPC handler
|
||||
//
|
||||
// Handler is any value with methods defined
|
||||
func (s *RPCServer) Register(namespace string, handler interface{}) {
|
||||
s.methods.register(namespace, handler)
|
||||
}
|
||||
|
||||
var _ error = &respError{}
|
@ -1,55 +0,0 @@
|
||||
package jsonrpc
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"reflect"
|
||||
)
|
||||
|
||||
type param struct {
|
||||
data []byte // from unmarshal
|
||||
|
||||
v reflect.Value // to marshal
|
||||
}
|
||||
|
||||
func (p *param) UnmarshalJSON(raw []byte) error {
|
||||
p.data = make([]byte, len(raw))
|
||||
copy(p.data, raw)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *param) MarshalJSON() ([]byte, error) {
|
||||
if p.v.Kind() == reflect.Invalid {
|
||||
return p.data, nil
|
||||
}
|
||||
|
||||
return json.Marshal(p.v.Interface())
|
||||
}
|
||||
|
||||
// processFuncOut finds value and error Outs in function
|
||||
func processFuncOut(funcType reflect.Type) (valOut int, errOut int, n int) {
|
||||
errOut = -1 // -1 if not found
|
||||
valOut = -1
|
||||
n = funcType.NumOut()
|
||||
|
||||
switch n {
|
||||
case 0:
|
||||
case 1:
|
||||
if funcType.Out(0) == errorType {
|
||||
errOut = 0
|
||||
} else {
|
||||
valOut = 0
|
||||
}
|
||||
case 2:
|
||||
valOut = 0
|
||||
errOut = 1
|
||||
if funcType.Out(1) != errorType {
|
||||
panic("expected error as second return value")
|
||||
}
|
||||
default:
|
||||
errstr := fmt.Sprintf("too many return values: %s", funcType)
|
||||
panic(errstr)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
@ -1,560 +0,0 @@
|
||||
package jsonrpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"reflect"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
const wsCancel = "xrpc.cancel"
|
||||
const chValue = "xrpc.ch.val"
|
||||
const chClose = "xrpc.ch.close"
|
||||
|
||||
type frame struct {
|
||||
// common
|
||||
Jsonrpc string `json:"jsonrpc"`
|
||||
ID *int64 `json:"id,omitempty"`
|
||||
Meta map[string]string `json:"meta,omitempty"`
|
||||
|
||||
// request
|
||||
Method string `json:"method,omitempty"`
|
||||
Params []param `json:"params,omitempty"`
|
||||
|
||||
// response
|
||||
Result json.RawMessage `json:"result,omitempty"`
|
||||
Error *respError `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
type outChanReg struct {
|
||||
reqID int64
|
||||
|
||||
chID uint64
|
||||
ch reflect.Value
|
||||
}
|
||||
|
||||
type wsConn struct {
|
||||
// outside params
|
||||
conn *websocket.Conn
|
||||
connFactory func() (*websocket.Conn, error)
|
||||
reconnectInterval time.Duration
|
||||
handler handlers
|
||||
requests <-chan clientRequest
|
||||
stop <-chan struct{}
|
||||
exiting chan struct{}
|
||||
|
||||
// incoming messages
|
||||
incoming chan io.Reader
|
||||
incomingErr error
|
||||
|
||||
// outgoing messages
|
||||
writeLk sync.Mutex
|
||||
|
||||
// ////
|
||||
// Client related
|
||||
|
||||
// inflight are requests we've sent to the remote
|
||||
inflight map[int64]clientRequest
|
||||
|
||||
// chanHandlers is a map of client-side channel handlers
|
||||
chanHandlers map[uint64]func(m []byte, ok bool)
|
||||
|
||||
// ////
|
||||
// Server related
|
||||
|
||||
// handling are the calls we handle
|
||||
handling map[int64]context.CancelFunc
|
||||
handlingLk sync.Mutex
|
||||
|
||||
spawnOutChanHandlerOnce sync.Once
|
||||
|
||||
// chanCtr is a counter used for identifying output channels on the server side
|
||||
chanCtr uint64
|
||||
|
||||
registerCh chan outChanReg
|
||||
}
|
||||
|
||||
// //
|
||||
// WebSocket Message utils //
|
||||
// //
|
||||
|
||||
// nextMessage wait for one message and puts it to the incoming channel
|
||||
func (c *wsConn) nextMessage() {
|
||||
msgType, r, err := c.conn.NextReader()
|
||||
if err != nil {
|
||||
c.incomingErr = err
|
||||
close(c.incoming)
|
||||
return
|
||||
}
|
||||
if msgType != websocket.BinaryMessage && msgType != websocket.TextMessage {
|
||||
c.incomingErr = errors.New("unsupported message type")
|
||||
close(c.incoming)
|
||||
return
|
||||
}
|
||||
c.incoming <- r
|
||||
}
|
||||
|
||||
// nextWriter waits for writeLk and invokes the cb callback with WS message
|
||||
// writer when the lock is acquired
|
||||
func (c *wsConn) nextWriter(cb func(io.Writer)) {
|
||||
c.writeLk.Lock()
|
||||
defer c.writeLk.Unlock()
|
||||
|
||||
wcl, err := c.conn.NextWriter(websocket.TextMessage)
|
||||
if err != nil {
|
||||
log.Error("handle me:", err)
|
||||
return
|
||||
}
|
||||
|
||||
cb(wcl)
|
||||
|
||||
if err := wcl.Close(); err != nil {
|
||||
log.Error("handle me:", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (c *wsConn) sendRequest(req request) error {
|
||||
c.writeLk.Lock()
|
||||
defer c.writeLk.Unlock()
|
||||
if err := c.conn.WriteJSON(req); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// //
|
||||
// Output channels //
|
||||
// //
|
||||
|
||||
// handleOutChans handles channel communication on the server side
|
||||
// (forwards channel messages to client)
|
||||
func (c *wsConn) handleOutChans() {
|
||||
regV := reflect.ValueOf(c.registerCh)
|
||||
exitV := reflect.ValueOf(c.exiting)
|
||||
|
||||
cases := []reflect.SelectCase{
|
||||
{ // registration chan always 0
|
||||
Dir: reflect.SelectRecv,
|
||||
Chan: regV,
|
||||
},
|
||||
{ // exit chan always 1
|
||||
Dir: reflect.SelectRecv,
|
||||
Chan: exitV,
|
||||
},
|
||||
}
|
||||
internal := len(cases)
|
||||
var caseToID []uint64
|
||||
|
||||
for {
|
||||
chosen, val, ok := reflect.Select(cases)
|
||||
|
||||
switch chosen {
|
||||
case 0: // registration channel
|
||||
if !ok {
|
||||
// control channel closed - signals closed connection
|
||||
// This shouldn't happen, instead the exiting channel should get closed
|
||||
log.Warn("control channel closed")
|
||||
return
|
||||
}
|
||||
|
||||
registration := val.Interface().(outChanReg)
|
||||
|
||||
caseToID = append(caseToID, registration.chID)
|
||||
cases = append(cases, reflect.SelectCase{
|
||||
Dir: reflect.SelectRecv,
|
||||
Chan: registration.ch,
|
||||
})
|
||||
|
||||
c.nextWriter(func(w io.Writer) {
|
||||
resp := &response{
|
||||
Jsonrpc: "2.0",
|
||||
ID: registration.reqID,
|
||||
Result: registration.chID,
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(w).Encode(resp); err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
})
|
||||
|
||||
continue
|
||||
case 1: // exiting channel
|
||||
if !ok {
|
||||
// exiting channel closed - signals closed connection
|
||||
//
|
||||
// We're not closing any channels as we're on receiving end.
|
||||
// Also, context cancellation below should take care of any running
|
||||
// requests
|
||||
return
|
||||
}
|
||||
log.Warn("exiting channel received a message")
|
||||
continue
|
||||
}
|
||||
|
||||
if !ok {
|
||||
// Output channel closed, cleanup, and tell remote that this happened
|
||||
|
||||
id := caseToID[chosen-internal]
|
||||
|
||||
n := len(cases) - 1
|
||||
if n > 0 {
|
||||
cases[chosen] = cases[n]
|
||||
caseToID[chosen-internal] = caseToID[n-internal]
|
||||
}
|
||||
|
||||
cases = cases[:n]
|
||||
caseToID = caseToID[:n-internal]
|
||||
|
||||
if err := c.sendRequest(request{
|
||||
Jsonrpc: "2.0",
|
||||
ID: nil, // notification
|
||||
Method: chClose,
|
||||
Params: []param{{v: reflect.ValueOf(id)}},
|
||||
}); err != nil {
|
||||
log.Warnf("closed out channel sendRequest failed: %s", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// forward message
|
||||
if err := c.sendRequest(request{
|
||||
Jsonrpc: "2.0",
|
||||
ID: nil, // notification
|
||||
Method: chValue,
|
||||
Params: []param{{v: reflect.ValueOf(caseToID[chosen-internal])}, {v: val}},
|
||||
}); err != nil {
|
||||
log.Warnf("sendRequest failed: %s", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleChanOut registers output channel for forwarding to client
|
||||
func (c *wsConn) handleChanOut(ch reflect.Value, req int64) error {
|
||||
c.spawnOutChanHandlerOnce.Do(func() {
|
||||
go c.handleOutChans()
|
||||
})
|
||||
id := atomic.AddUint64(&c.chanCtr, 1)
|
||||
|
||||
select {
|
||||
case c.registerCh <- outChanReg{
|
||||
reqID: req,
|
||||
|
||||
chID: id,
|
||||
ch: ch,
|
||||
}:
|
||||
return nil
|
||||
case <-c.exiting:
|
||||
return xerrors.New("connection closing")
|
||||
}
|
||||
}
|
||||
|
||||
// //
|
||||
// Context.Done propagation //
|
||||
// //
|
||||
|
||||
// handleCtxAsync handles context lifetimes for client
|
||||
// TODO: this should be aware of events going through chanHandlers, and quit
|
||||
// when the related channel is closed.
|
||||
// This should also probably be a single goroutine,
|
||||
// Note that not doing this should be fine for now as long as we are using
|
||||
// contexts correctly (cancelling when async functions are no longer is use)
|
||||
func (c *wsConn) handleCtxAsync(actx context.Context, id int64) {
|
||||
<-actx.Done()
|
||||
|
||||
c.sendRequest(request{
|
||||
Jsonrpc: "2.0",
|
||||
Method: wsCancel,
|
||||
Params: []param{{v: reflect.ValueOf(id)}},
|
||||
})
|
||||
}
|
||||
|
||||
// cancelCtx is a built-in rpc which handles context cancellation over rpc
|
||||
func (c *wsConn) cancelCtx(req frame) {
|
||||
if req.ID != nil {
|
||||
log.Warnf("%s call with ID set, won't respond", wsCancel)
|
||||
}
|
||||
|
||||
var id int64
|
||||
if err := json.Unmarshal(req.Params[0].data, &id); err != nil {
|
||||
log.Error("handle me:", err)
|
||||
return
|
||||
}
|
||||
|
||||
c.handlingLk.Lock()
|
||||
defer c.handlingLk.Unlock()
|
||||
|
||||
cf, ok := c.handling[id]
|
||||
if ok {
|
||||
cf()
|
||||
}
|
||||
}
|
||||
|
||||
// //
|
||||
// Main Handling logic //
|
||||
// //
|
||||
|
||||
func (c *wsConn) handleChanMessage(frame frame) {
|
||||
var chid uint64
|
||||
if err := json.Unmarshal(frame.Params[0].data, &chid); err != nil {
|
||||
log.Error("failed to unmarshal channel id in xrpc.ch.val: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
hnd, ok := c.chanHandlers[chid]
|
||||
if !ok {
|
||||
log.Errorf("xrpc.ch.val: handler %d not found", chid)
|
||||
return
|
||||
}
|
||||
|
||||
hnd(frame.Params[1].data, true)
|
||||
}
|
||||
|
||||
func (c *wsConn) handleChanClose(frame frame) {
|
||||
var chid uint64
|
||||
if err := json.Unmarshal(frame.Params[0].data, &chid); err != nil {
|
||||
log.Error("failed to unmarshal channel id in xrpc.ch.val: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
hnd, ok := c.chanHandlers[chid]
|
||||
if !ok {
|
||||
log.Errorf("xrpc.ch.val: handler %d not found", chid)
|
||||
return
|
||||
}
|
||||
|
||||
delete(c.chanHandlers, chid)
|
||||
|
||||
hnd(nil, false)
|
||||
}
|
||||
|
||||
func (c *wsConn) handleResponse(frame frame) {
|
||||
req, ok := c.inflight[*frame.ID]
|
||||
if !ok {
|
||||
log.Error("client got unknown ID in response")
|
||||
return
|
||||
}
|
||||
|
||||
if req.retCh != nil && frame.Result != nil {
|
||||
// output is channel
|
||||
var chid uint64
|
||||
if err := json.Unmarshal(frame.Result, &chid); err != nil {
|
||||
log.Errorf("failed to unmarshal channel id response: %s, data '%s'", err, string(frame.Result))
|
||||
return
|
||||
}
|
||||
|
||||
var chanCtx context.Context
|
||||
chanCtx, c.chanHandlers[chid] = req.retCh()
|
||||
go c.handleCtxAsync(chanCtx, *frame.ID)
|
||||
}
|
||||
|
||||
req.ready <- clientResponse{
|
||||
Jsonrpc: frame.Jsonrpc,
|
||||
Result: frame.Result,
|
||||
ID: *frame.ID,
|
||||
Error: frame.Error,
|
||||
}
|
||||
delete(c.inflight, *frame.ID)
|
||||
}
|
||||
|
||||
func (c *wsConn) handleCall(ctx context.Context, frame frame) {
|
||||
req := request{
|
||||
Jsonrpc: frame.Jsonrpc,
|
||||
ID: frame.ID,
|
||||
Meta: frame.Meta,
|
||||
Method: frame.Method,
|
||||
Params: frame.Params,
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
nextWriter := func(cb func(io.Writer)) {
|
||||
cb(ioutil.Discard)
|
||||
}
|
||||
done := func(keepCtx bool) {
|
||||
if !keepCtx {
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
if frame.ID != nil {
|
||||
nextWriter = c.nextWriter
|
||||
|
||||
c.handlingLk.Lock()
|
||||
c.handling[*frame.ID] = cancel
|
||||
c.handlingLk.Unlock()
|
||||
|
||||
done = func(keepctx bool) {
|
||||
c.handlingLk.Lock()
|
||||
defer c.handlingLk.Unlock()
|
||||
|
||||
if !keepctx {
|
||||
cancel()
|
||||
delete(c.handling, *frame.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
go c.handler.handle(ctx, req, nextWriter, rpcError, done, c.handleChanOut)
|
||||
}
|
||||
|
||||
// handleFrame handles all incoming messages (calls and responses)
|
||||
func (c *wsConn) handleFrame(ctx context.Context, frame frame) {
|
||||
// Get message type by method name:
|
||||
// "" - response
|
||||
// "xrpc.*" - builtin
|
||||
// anything else - incoming remote call
|
||||
switch frame.Method {
|
||||
case "": // Response to our call
|
||||
c.handleResponse(frame)
|
||||
case wsCancel:
|
||||
c.cancelCtx(frame)
|
||||
case chValue:
|
||||
c.handleChanMessage(frame)
|
||||
case chClose:
|
||||
c.handleChanClose(frame)
|
||||
default: // Remote call
|
||||
c.handleCall(ctx, frame)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *wsConn) closeInFlight() {
|
||||
for id, req := range c.inflight {
|
||||
req.ready <- clientResponse{
|
||||
Jsonrpc: "2.0",
|
||||
ID: id,
|
||||
Error: &respError{
|
||||
Message: "handler: websocket connection closed",
|
||||
Code: 2,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
c.handlingLk.Lock()
|
||||
for _, cancel := range c.handling {
|
||||
cancel()
|
||||
}
|
||||
c.handlingLk.Unlock()
|
||||
|
||||
c.inflight = map[int64]clientRequest{}
|
||||
c.handling = map[int64]context.CancelFunc{}
|
||||
}
|
||||
|
||||
func (c *wsConn) closeChans() {
|
||||
for chid := range c.chanHandlers {
|
||||
hnd := c.chanHandlers[chid]
|
||||
delete(c.chanHandlers, chid)
|
||||
hnd(nil, false)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *wsConn) handleWsConn(ctx context.Context) {
|
||||
c.incoming = make(chan io.Reader)
|
||||
c.inflight = map[int64]clientRequest{}
|
||||
c.handling = map[int64]context.CancelFunc{}
|
||||
c.chanHandlers = map[uint64]func(m []byte, ok bool){}
|
||||
|
||||
c.registerCh = make(chan outChanReg)
|
||||
defer close(c.exiting)
|
||||
|
||||
// ////
|
||||
|
||||
// on close, make sure to return from all pending calls, and cancel context
|
||||
// on all calls we handle
|
||||
defer c.closeInFlight()
|
||||
|
||||
// wait for the first message
|
||||
go c.nextMessage()
|
||||
for {
|
||||
select {
|
||||
case r, ok := <-c.incoming:
|
||||
if !ok {
|
||||
if c.incomingErr != nil {
|
||||
if !websocket.IsCloseError(c.incomingErr, websocket.CloseNormalClosure) {
|
||||
log.Debugw("websocket error", "error", c.incomingErr)
|
||||
// connection dropped unexpectedly, do our best to recover it
|
||||
c.closeInFlight()
|
||||
c.closeChans()
|
||||
c.incoming = make(chan io.Reader) // listen again for responses
|
||||
go func() {
|
||||
if c.connFactory == nil { // likely the server side, don't try to reconnect
|
||||
return
|
||||
}
|
||||
|
||||
var conn *websocket.Conn
|
||||
for conn == nil {
|
||||
time.Sleep(c.reconnectInterval)
|
||||
var err error
|
||||
if conn, err = c.connFactory(); err != nil {
|
||||
log.Debugw("websocket connection retry failed", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
c.writeLk.Lock()
|
||||
c.conn = conn
|
||||
c.incomingErr = nil
|
||||
c.writeLk.Unlock()
|
||||
|
||||
go c.nextMessage()
|
||||
}()
|
||||
continue
|
||||
}
|
||||
}
|
||||
return // remote closed
|
||||
}
|
||||
|
||||
// debug util - dump all messages to stderr
|
||||
// r = io.TeeReader(r, os.Stderr)
|
||||
|
||||
var frame frame
|
||||
if err := json.NewDecoder(r).Decode(&frame); err != nil {
|
||||
log.Error("handle me:", err)
|
||||
return
|
||||
}
|
||||
|
||||
c.handleFrame(ctx, frame)
|
||||
go c.nextMessage()
|
||||
case req := <-c.requests:
|
||||
c.writeLk.Lock()
|
||||
if req.req.ID != nil {
|
||||
if c.incomingErr != nil { // No conn?, immediate fail
|
||||
req.ready <- clientResponse{
|
||||
Jsonrpc: "2.0",
|
||||
ID: *req.req.ID,
|
||||
Error: &respError{
|
||||
Message: "handler: websocket connection closed",
|
||||
Code: 2,
|
||||
},
|
||||
}
|
||||
c.writeLk.Unlock()
|
||||
break
|
||||
}
|
||||
c.inflight[*req.req.ID] = req
|
||||
}
|
||||
c.writeLk.Unlock()
|
||||
if err := c.sendRequest(req.req); err != nil {
|
||||
log.Errorf("sendReqest failed (Handle me): %s", err)
|
||||
}
|
||||
case <-c.stop:
|
||||
c.writeLk.Lock()
|
||||
cmsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
|
||||
if err := c.conn.WriteMessage(websocket.CloseMessage, cmsg); err != nil {
|
||||
log.Warn("failed to write close message: ", err)
|
||||
}
|
||||
if err := c.conn.Close(); err != nil {
|
||||
log.Warnw("websocket close error", "error", err)
|
||||
}
|
||||
c.writeLk.Unlock()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
@ -9,7 +9,8 @@ import (
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/lib/jsonrpc"
|
||||
"github.com/filecoin-project/go-jsonrpc"
|
||||
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
)
|
||||
|
||||
|
@ -8,9 +8,9 @@ import (
|
||||
"path"
|
||||
"strconv"
|
||||
|
||||
"github.com/filecoin-project/lotus/lib/jsonrpc"
|
||||
|
||||
"gopkg.in/urfave/cli.v2"
|
||||
|
||||
"github.com/filecoin-project/go-jsonrpc"
|
||||
)
|
||||
|
||||
const listenAddr = "127.0.0.1:2222"
|
||||
|
@ -4,13 +4,14 @@ import (
|
||||
"go.opencensus.io/stats"
|
||||
"go.opencensus.io/stats/view"
|
||||
"go.opencensus.io/tag"
|
||||
|
||||
rpcmetrics "github.com/filecoin-project/go-jsonrpc/metrics"
|
||||
)
|
||||
|
||||
// Global Tags
|
||||
var (
|
||||
Version, _ = tag.NewKey("version")
|
||||
Commit, _ = tag.NewKey("commit")
|
||||
RPCMethod, _ = tag.NewKey("method")
|
||||
PeerID, _ = tag.NewKey("peer_id")
|
||||
FailureType, _ = tag.NewKey("failure_type")
|
||||
MessageFrom, _ = tag.NewKey("message_from")
|
||||
@ -31,9 +32,6 @@ var (
|
||||
BlockValidationFailure = stats.Int64("block/failure", "Counter for block validation failures", stats.UnitDimensionless)
|
||||
BlockValidationSuccess = stats.Int64("block/success", "Counter for block validation successes", stats.UnitDimensionless)
|
||||
PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless)
|
||||
RPCInvalidMethod = stats.Int64("rpc/invalid_method", "Total number of invalid RPC methods called", stats.UnitDimensionless)
|
||||
RPCRequestError = stats.Int64("rpc/request_error", "Total number of request errors handled", stats.UnitDimensionless)
|
||||
RPCResponseError = stats.Int64("rpc/response_error", "Total number of responses errors handled", stats.UnitDimensionless)
|
||||
)
|
||||
|
||||
var (
|
||||
@ -82,26 +80,10 @@ var (
|
||||
Measure: PeerCount,
|
||||
Aggregation: view.LastValue(),
|
||||
}
|
||||
// All RPC related metrics should at the very least tag the RPCMethod
|
||||
RPCInvalidMethodView = &view.View{
|
||||
Measure: RPCInvalidMethod,
|
||||
Aggregation: view.Count(),
|
||||
TagKeys: []tag.Key{RPCMethod},
|
||||
}
|
||||
RPCRequestErrorView = &view.View{
|
||||
Measure: RPCRequestError,
|
||||
Aggregation: view.Count(),
|
||||
TagKeys: []tag.Key{RPCMethod},
|
||||
}
|
||||
RPCResponseErrorView = &view.View{
|
||||
Measure: RPCResponseError,
|
||||
Aggregation: view.Count(),
|
||||
TagKeys: []tag.Key{RPCMethod},
|
||||
}
|
||||
)
|
||||
|
||||
// DefaultViews is an array of OpenCensus views for metric gathering purposes
|
||||
var DefaultViews = []*view.View{
|
||||
var DefaultViews = append([]*view.View{
|
||||
InfoView,
|
||||
ChainNodeHeightView,
|
||||
ChainNodeWorkerHeightView,
|
||||
@ -111,8 +93,4 @@ var DefaultViews = []*view.View{
|
||||
MessageReceivedView,
|
||||
MessageValidationFailureView,
|
||||
MessageValidationSuccessView,
|
||||
PeerCountView,
|
||||
RPCInvalidMethodView,
|
||||
RPCRequestErrorView,
|
||||
RPCResponseErrorView,
|
||||
}
|
||||
PeerCountView}, rpcmetrics.DefaultViews...)
|
||||
|
@ -2,6 +2,7 @@ package impl
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/filecoin-project/go-jsonrpc"
|
||||
"net/http"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
@ -11,7 +12,6 @@ import (
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/api/client"
|
||||
"github.com/filecoin-project/lotus/lib/jsonrpc"
|
||||
"github.com/filecoin-project/sector-storage"
|
||||
)
|
||||
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-jsonrpc"
|
||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||
"github.com/filecoin-project/specs-actors/actors/abi/big"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin"
|
||||
@ -39,7 +40,6 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/wallet"
|
||||
"github.com/filecoin-project/lotus/cmd/lotus-seed/seed"
|
||||
genesis "github.com/filecoin-project/lotus/genesis"
|
||||
"github.com/filecoin-project/lotus/lib/jsonrpc"
|
||||
"github.com/filecoin-project/lotus/miner"
|
||||
"github.com/filecoin-project/lotus/node"
|
||||
"github.com/filecoin-project/lotus/node/modules"
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/go-jsonrpc"
|
||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||
manet "github.com/multiformats/go-multiaddr-net"
|
||||
|
||||
@ -16,7 +17,6 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain"
|
||||
"github.com/filecoin-project/lotus/chain/store"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/lib/jsonrpc"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user