Merge pull request #1807 from filecoin-project/feat/extract-jsonrpc
Extract lib/jsonrpc to go-jsonrpc
This commit is contained in:
@ -1,11 +1,12 @@
package client
import (
// NewCommonRPC creates a new http jsonrpc client.
@ -16,9 +16,10 @@ import (
@ -8,14 +8,16 @@ import (
cid ""
logging ""
lcli ""
cid ""
logging ""
type CidWindow [][]cid.Cid
@ -15,6 +15,7 @@ import (
paramfetch ""
@ -23,7 +24,6 @@ import (
lcli ""
@ -8,6 +8,7 @@ import (
mux ""
manet ""
@ -19,7 +20,6 @@ import (
lcli ""
@ -9,14 +9,6 @@ import (
logging ""
@ -24,6 +16,14 @@ import (
var log = logging.Logger("main")
@ -23,6 +23,7 @@ require (
|||| v0.3.0
|||| v0.0.0-20200208005934-2b8bd03caca5
|||| v0.2.3
|||| v0.1.0
|||| v0.0.0-20200210211231-548257017ca6
|||| v0.0.2-0.20200505180321-973f8949ea8e
|||| v0.1.0
@ -59,7 +60,7 @@ require (
|||| v0.0.5-0.20200428170625-a0bd04d3cbdf
|||| v0.2.0
|||| v1.0.4
|||| v2.0.5
|||| v2.0.8
|||| v0.3.1
|||| v0.0.7
|||| v0.2.4
@ -164,6 +164,8 @@ v0.0.0-20200208005934-2b8bd03caca5/go
|||| v0.0.0-20200114015428-74d100f305f8/go.mod h1:c8NTjvFVy1Ud02mmGDjOiMeawY2t6ALfrrdvAB01FQc=
|||| v0.2.3 h1:00exBcwysQVEx7wvzcdVz9ZT3HLMXKmbQNIz9ktyeO8=
|||| v0.2.3/go.mod h1:LI3VFHse33aU0djAmFQ8+Hg39i0J8ibAoppGu6TbgkA=
|||| v0.1.0 h1:NBHruefnWWfbizxFMnStXlXKEAxEno3DrM0iLd8SuCM=
|||| v0.1.0/go.mod h1:j6zV//WXIIY5kky873Q3iIKt/ViOE8rcijovmpxrXzM=
|||| v0.0.0-20200210211231-548257017ca6 h1:92PET+sx1Hb4W/8CgFwGuxaKbttwY+UNspYZTvXY0vs=
|||| v0.0.0-20200210211231-548257017ca6/go.mod h1:0HgYnrkeSU4lu1p+LEOeDpFsNBssa0OGGriWdA4hvaE=
|||| v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU=
@ -434,6 +436,8 @@ v2.0.2/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBW
|||| v2.0.3/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0=
|||| v2.0.5 h1:fL4YI+1g5V/b1Yxr1qAiXTMg1H8z9vx/VmJxBuQMHvU=
|||| v2.0.5/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscwkWG+dw=
|||| v2.0.8 h1:3b3YNopMHlj4AvyhWAx0pDxqSQWYi4/WuWO7yRV6/Qg=
|||| v2.0.8/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscwkWG+dw=
|||| v0.0.3/go.mod h1:Oc5kIXLHokkE1hWGMBHw+oxehkAaTOqtEb7Zbh6BhLA=
|||| v0.0.6/go.mod h1:QYPdnlvkOg7GnQRofu9XZimC5ZW5Wi3bKys/4GQQfto=
|||| v0.1.0/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk=
@ -1,435 +0,0 @@
package jsonrpc
import (
logging ""
const (
methodRetryFrequency = time.Second * 3
var (
errorType = reflect.TypeOf(new(error)).Elem()
contextType = reflect.TypeOf(new(context.Context)).Elem()
log = logging.Logger("rpc")
// ErrClient is an error which occurred on the client side the library
type ErrClient struct {
err error
func (e *ErrClient) Error() string {
return fmt.Sprintf("RPC client error: %s", e.err)
// Unwrap unwraps the actual error
func (e *ErrClient) Unwrap(err error) error {
return e.err
type clientResponse struct {
Jsonrpc string `json:"jsonrpc"`
Result json.RawMessage `json:"result"`
ID int64 `json:"id"`
Error *respError `json:"error,omitempty"`
type makeChanSink func() (context.Context, func([]byte, bool))
type clientRequest struct {
req request
ready chan clientResponse
// retCh provides a context and sink for handling incoming channel messages
retCh makeChanSink
// ClientCloser is used to close Client from further use
type ClientCloser func()
// NewClient creates new jsonrpc 2.0 client
// handler must be pointer to a struct with function fields
// Returned value closes the client connection
// TODO: Example
func NewClient(addr string, namespace string, handler interface{}, requestHeader http.Header) (ClientCloser, error) {
return NewMergeClient(addr, namespace, []interface{}{handler}, requestHeader)
type client struct {
namespace string
requests chan clientRequest
exiting <-chan struct{}
idCtr int64
// NewMergeClient is like NewClient, but allows to specify multiple structs
// to be filled in the same namespace, using one connection
func NewMergeClient(addr string, namespace string, outs []interface{}, requestHeader http.Header, opts ...Option) (ClientCloser, error) {
var config Config
for _, o := range opts {
connFactory := func() (*websocket.Conn, error) {
conn, _, err := websocket.DefaultDialer.Dial(addr, requestHeader)
return conn, err
if config.proxyConnFactory != nil {
connFactory = config.proxyConnFactory(connFactory)
conn, err := connFactory()
if err != nil {
return nil, err
c := client{
namespace: namespace,
stop := make(chan struct{})
exiting := make(chan struct{})
c.requests = make(chan clientRequest)
c.exiting = exiting
handlers := map[string]rpcHandler{}
go (&wsConn{
conn: conn,
connFactory: connFactory,
reconnectInterval: config.ReconnectInterval,
handler: handlers,
requests: c.requests,
stop: stop,
exiting: exiting,
for _, handler := range outs {
htyp := reflect.TypeOf(handler)
if htyp.Kind() != reflect.Ptr {
return nil, xerrors.New("expected handler to be a pointer")
typ := htyp.Elem()
if typ.Kind() != reflect.Struct {
return nil, xerrors.New("handler should be a struct")
val := reflect.ValueOf(handler)
for i := 0; i < typ.NumField(); i++ {
fn, err := c.makeRpcFunc(typ.Field(i))
if err != nil {
return nil, err
return func() {
}, nil
func (c *client) makeOutChan(ctx context.Context, ftyp reflect.Type, valOut int) (func() reflect.Value, makeChanSink) {
retVal := reflect.Zero(ftyp.Out(valOut))
chCtor := func() (context.Context, func([]byte, bool)) {
// unpack chan type to make sure it's reflect.BothDir
ctyp := reflect.ChanOf(reflect.BothDir, ftyp.Out(valOut).Elem())
ch := reflect.MakeChan(ctyp, 0) // todo: buffer?
chCtx, chCancel := context.WithCancel(ctx)
retVal = ch.Convert(ftyp.Out(valOut))
incoming := make(chan reflect.Value, 32)
// gorotuine to handle buffering of items
go func() {
buf := (&list.List{}).Init()
for {
front := buf.Front()
cases := []reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(chCtx.Done()),
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(incoming),
if front != nil {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectSend,
Chan: ch,
Send: front.Value.(reflect.Value).Elem(),
chosen, val, _ := reflect.Select(cases)
switch chosen {
case 0:
case 1:
vvval := val.Interface().(reflect.Value)
if buf.Len() > 1 {
if buf.Len() > 10 {
log.Warnw("rpc output message buffer", "n", buf.Len())
} else {
log.Infow("rpc output message buffer", "n", buf.Len())
case 2:
return ctx, func(result []byte, ok bool) {
if !ok {
val := reflect.New(ftyp.Out(valOut).Elem())
if err := json.Unmarshal(result, val.Interface()); err != nil {
log.Errorf("error unmarshaling chan response: %s", err)
if ctx.Err() != nil {
log.Errorf("got rpc message with cancelled context: %s", ctx.Err())
select {
case incoming <- val:
case <-chCtx.Done():
return func() reflect.Value { return retVal }, chCtor
func (c *client) sendRequest(ctx context.Context, req request, chCtor makeChanSink) (clientResponse, error) {
rchan := make(chan clientResponse, 1)
creq := clientRequest{
req: req,
ready: rchan,
retCh: chCtor,
select {
case c.requests <- creq:
case <-c.exiting:
return clientResponse{}, fmt.Errorf("websocket routine exiting")
var ctxDone <-chan struct{}
var resp clientResponse
if ctx != nil {
ctxDone = ctx.Done()
// wait for response, handle context cancellation
for {
select {
case resp = <-rchan:
break loop
case <-ctxDone: // send cancel request
ctxDone = nil
cancelReq := clientRequest{
req: request{
Jsonrpc: "2.0",
Method: wsCancel,
Params: []param{{v: reflect.ValueOf(*req.ID)}},
select {
case c.requests <- cancelReq:
case <-c.exiting:
log.Warn("failed to send request cancellation, websocket routing exited")
return resp, nil
type rpcFunc struct {
client *client
ftyp reflect.Type
name string
nout int
valOut int
errOut int
hasCtx int
returnValueIsChannel bool
retry bool
func (fn *rpcFunc) processResponse(resp clientResponse, rval reflect.Value) []reflect.Value {
out := make([]reflect.Value, fn.nout)
if fn.valOut != -1 {
out[fn.valOut] = rval
if fn.errOut != -1 {
out[fn.errOut] = reflect.New(errorType).Elem()
if resp.Error != nil {
return out
func (fn *rpcFunc) processError(err error) []reflect.Value {
out := make([]reflect.Value, fn.nout)
if fn.valOut != -1 {
out[fn.valOut] = reflect.New(fn.ftyp.Out(fn.valOut)).Elem()
if fn.errOut != -1 {
out[fn.errOut] = reflect.New(errorType).Elem()
return out
func (fn *rpcFunc) handleRpcCall(args []reflect.Value) (results []reflect.Value) {
id := atomic.AddInt64(&fn.client.idCtr, 1)
params := make([]param, len(args)-fn.hasCtx)
for i, arg := range args[fn.hasCtx:] {
params[i] = param{
v: arg,
var ctx context.Context
var span *trace.Span
if fn.hasCtx == 1 {
ctx = args[0].Interface().(context.Context)
ctx, span = trace.StartSpan(ctx, "")
defer span.End()
retVal := func() reflect.Value { return reflect.Value{} }
// if the function returns a channel, we need to provide a sink for the
// messages
var chCtor makeChanSink
if fn.returnValueIsChannel {
retVal, chCtor = fn.client.makeOutChan(ctx, fn.ftyp, fn.valOut)
req := request{
Jsonrpc: "2.0",
ID: &id,
Method: fn.client.namespace + "." +,
Params: params,
if span != nil {
span.AddAttributes(trace.StringAttribute("method", req.Method))
eSC := base64.StdEncoding.EncodeToString(
req.Meta = map[string]string{
"SpanContext": eSC,
var resp clientResponse
var err error
// keep retrying if got a forced closed websocket conn and calling method
// has retry annotation
for {
resp, err = fn.client.sendRequest(ctx, req, chCtor)
if err != nil {
return fn.processError(fmt.Errorf("sendRequest failed: %w", err))
if resp.ID != *req.ID {
return fn.processError(xerrors.New("request and response id didn't match"))
if fn.valOut != -1 && !fn.returnValueIsChannel {
val := reflect.New(fn.ftyp.Out(fn.valOut))
if resp.Result != nil {
log.Debugw("rpc result", "type", fn.ftyp.Out(fn.valOut))
if err := json.Unmarshal(resp.Result, val.Interface()); err != nil {
log.Warnw("unmarshaling failed", "message", string(resp.Result))
return fn.processError(xerrors.Errorf("unmarshaling result: %w", err))
retVal = func() reflect.Value { return val.Elem() }
retry := resp.Error != nil && resp.Error.Code == 2 && fn.retry
if !retry {
return fn.processResponse(resp, retVal())
func (c *client) makeRpcFunc(f reflect.StructField) (reflect.Value, error) {
ftyp := f.Type
if ftyp.Kind() != reflect.Func {
return reflect.Value{}, xerrors.New("handler field not a func")
fun := &rpcFunc{
client: c,
ftyp: ftyp,
name: f.Name,
retry: f.Tag.Get("retry") == "true",
fun.valOut, fun.errOut, fun.nout = processFuncOut(ftyp)
if ftyp.NumIn() > 0 && ftyp.In(0) == contextType {
fun.hasCtx = 1
fun.returnValueIsChannel = fun.valOut != -1 && ftyp.Out(fun.valOut).Kind() == reflect.Chan
return reflect.MakeFunc(ftyp, fun.handleRpcCall), nil
@ -1,275 +0,0 @@
package jsonrpc
import (
type rpcHandler struct {
paramReceivers []reflect.Type
nParams int
receiver reflect.Value
handlerFunc reflect.Value
hasCtx int
errOut int
valOut int
type handlers map[string]rpcHandler
// Request / response
type request struct {
Jsonrpc string `json:"jsonrpc"`
ID *int64 `json:"id,omitempty"`
Method string `json:"method"`
Params []param `json:"params"`
Meta map[string]string `json:"meta,omitempty"`
type respError struct {
Code int `json:"code"`
Message string `json:"message"`
func (e *respError) Error() string {
if e.Code >= -32768 && e.Code <= -32000 {
return fmt.Sprintf("RPC error (%d): %s", e.Code, e.Message)
return e.Message
type response struct {
Jsonrpc string `json:"jsonrpc"`
Result interface{} `json:"result,omitempty"`
ID int64 `json:"id"`
Error *respError `json:"error,omitempty"`
// Register
func (h handlers) register(namespace string, r interface{}) {
val := reflect.ValueOf(r)
//TODO: expect ptr
for i := 0; i < val.NumMethod(); i++ {
method := val.Type().Method(i)
funcType := method.Func.Type()
hasCtx := 0
if funcType.NumIn() >= 2 && funcType.In(1) == contextType {
hasCtx = 1
ins := funcType.NumIn() - 1 - hasCtx
recvs := make([]reflect.Type, ins)
for i := 0; i < ins; i++ {
recvs[i] = method.Type.In(i + 1 + hasCtx)
valOut, errOut, _ := processFuncOut(funcType)
h[namespace+"."+method.Name] = rpcHandler{
paramReceivers: recvs,
nParams: ins,
handlerFunc: method.Func,
receiver: val,
hasCtx: hasCtx,
errOut: errOut,
valOut: valOut,
// Handle
type rpcErrFunc func(w func(func(io.Writer)), req *request, code int, err error)
type chanOut func(reflect.Value, int64) error
func (h handlers) handleReader(ctx context.Context, r io.Reader, w io.Writer, rpcError rpcErrFunc) {
wf := func(cb func(io.Writer)) {
var req request
if err := json.NewDecoder(r).Decode(&req); err != nil {
rpcError(wf, &req, rpcParseError, xerrors.Errorf("unmarshaling request: %w", err))
h.handle(ctx, req, wf, rpcError, func(bool) {}, nil)
func doCall(methodName string, f reflect.Value, params []reflect.Value) (out []reflect.Value, err error) {
defer func() {
if i := recover(); i != nil {
err = xerrors.Errorf("panic in rpc method '%s': %s", methodName, i)
out = f.Call(params)
return out, nil
func (handlers) getSpan(ctx context.Context, req request) (context.Context, *trace.Span) {
if req.Meta == nil {
return ctx, nil
if eSC, ok := req.Meta["SpanContext"]; ok {
bSC := make([]byte, base64.StdEncoding.DecodedLen(len(eSC)))
_, err := base64.StdEncoding.Decode(bSC, []byte(eSC))
if err != nil {
log.Errorf("SpanContext: decode", "error", err)
return ctx, nil
sc, ok := propagation.FromBinary(bSC)
if !ok {
log.Errorf("SpanContext: could not create span", "data", bSC)
return ctx, nil
ctx, span := trace.StartSpanWithRemoteParent(ctx, "api.handle", sc)
span.AddAttributes(trace.StringAttribute("method", req.Method))
return ctx, span
return ctx, nil
func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer)), rpcError rpcErrFunc, done func(keepCtx bool), chOut chanOut) {
// Not sure if we need to sanitize the incoming req.Method or not.
ctx, span := h.getSpan(ctx, req)
ctx, _ = tag.New(ctx, tag.Insert(metrics.RPCMethod, req.Method))
defer span.End()
handler, ok := h[req.Method]
if !ok {
rpcError(w, &req, rpcMethodNotFound, fmt.Errorf("method '%s' not found", req.Method))
stats.Record(ctx, metrics.RPCInvalidMethod.M(1))
if len(req.Params) != handler.nParams {
rpcError(w, &req, rpcInvalidParams, fmt.Errorf("wrong param count"))
stats.Record(ctx, metrics.RPCRequestError.M(1))
outCh := handler.valOut != -1 && handler.handlerFunc.Type().Out(handler.valOut).Kind() == reflect.Chan
defer done(outCh)
if chOut == nil && outCh {
rpcError(w, &req, rpcMethodNotFound, fmt.Errorf("method '%s' not supported in this mode (no out channel support)", req.Method))
stats.Record(ctx, metrics.RPCRequestError.M(1))
callParams := make([]reflect.Value, 1+handler.hasCtx+handler.nParams)
callParams[0] = handler.receiver
if handler.hasCtx == 1 {
callParams[1] = reflect.ValueOf(ctx)
for i := 0; i < handler.nParams; i++ {
rp := reflect.New(handler.paramReceivers[i])
if err := json.NewDecoder(bytes.NewReader(req.Params[i].data)).Decode(rp.Interface()); err != nil {
rpcError(w, &req, rpcParseError, xerrors.Errorf("unmarshaling params for '%s' (param: %T): %w", req.Method, rp.Interface(), err))
stats.Record(ctx, metrics.RPCRequestError.M(1))
callParams[i+1+handler.hasCtx] = reflect.ValueOf(rp.Elem().Interface())
callResult, err := doCall(req.Method, handler.handlerFunc, callParams)
if err != nil {
rpcError(w, &req, 0, xerrors.Errorf("fatal error calling '%s': %w", req.Method, err))
stats.Record(ctx, metrics.RPCRequestError.M(1))
if req.ID == nil {
return // notification
resp := response{
Jsonrpc: "2.0",
ID: *req.ID,
if handler.errOut != -1 {
err := callResult[handler.errOut].Interface()
if err != nil {
log.Warnf("error in RPC call to '%s': %+v", req.Method, err)
stats.Record(ctx, metrics.RPCResponseError.M(1))
resp.Error = &respError{
Code: 1,
Message: err.(error).Error(),
var kind reflect.Kind
var res interface{}
var nonZero bool
if handler.valOut != -1 {
res = callResult[handler.valOut].Interface()
kind = callResult[handler.valOut].Kind()
nonZero = !callResult[handler.valOut].IsZero()
if res != nil && kind == reflect.Chan {
// Channel responses are sent from channel control goroutine.
// Sending responses here could cause deadlocks on writeLk, or allow
// sending channel messages before this rpc call returns
//noinspection GoNilness // already checked above
err = chOut(callResult[handler.valOut], *req.ID)
if err == nil {
return // channel goroutine handles responding
log.Warnf("failed to setup channel in RPC call to '%s': %+v", req.Method, err)
stats.Record(ctx, metrics.RPCResponseError.M(1))
resp.Error = &respError{
Code: 1,
Message: err.(error).Error(),
} else if resp.Error == nil {
// check error as JSON-RPC spec prohibits error and value at the same time
resp.Result = res
if resp.Error != nil && nonZero {
log.Errorw("error and res returned", "request", req, "r.err", resp.Error, "res", res)
w(func(w io.Writer) {
if err := json.NewEncoder(w).Encode(resp); err != nil {
stats.Record(ctx, metrics.RPCResponseError.M(1))
@ -1,25 +0,0 @@
package jsonrpc
import (
type Config struct {
ReconnectInterval time.Duration
proxyConnFactory func(func() (*websocket.Conn, error)) func() (*websocket.Conn, error) // for testing
var defaultConfig = Config{
ReconnectInterval: time.Second * 5,
type Option func(c *Config)
func WithReconnectInterval(d time.Duration) func(c *Config) {
return func(c *Config) {
c.ReconnectInterval = d
@ -1,623 +0,0 @@
package jsonrpc
import (
logging ""
func init() {
logging.SetLogLevel("rpc", "DEBUG")
type SimpleServerHandler struct {
n int
type TestType struct {
S string
I int
type TestOut struct {
Ok bool
func (h *SimpleServerHandler) Add(in int) error {
if in == -3546 {
return errors.New("test")
h.n += in
return nil
func (h *SimpleServerHandler) AddGet(in int) int {
h.n += in
return h.n
func (h *SimpleServerHandler) StringMatch(t TestType, i2 int64) (out TestOut, err error) {
if strconv.FormatInt(i2, 10) == t.S {
out.Ok = true
if i2 != int64(t.I) {
return TestOut{}, errors.New(":(")
out.I = t.I
out.S = t.S
func TestRPC(t *testing.T) {
// setup server
serverHandler := &SimpleServerHandler{}
rpcServer := NewServer()
rpcServer.Register("SimpleServerHandler", serverHandler)
// httptest stuff
testServ := httptest.NewServer(rpcServer)
defer testServ.Close()
// setup client
var client struct {
Add func(int) error
AddGet func(int) int
StringMatch func(t TestType, i2 int64) (out TestOut, err error)
closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &client, nil)
require.NoError(t, err)
defer closer()
// Add(int) error
require.NoError(t, client.Add(2))
require.Equal(t, 2, serverHandler.n)
err = client.Add(-3546)
require.EqualError(t, err, "test")
// AddGet(int) int
n := client.AddGet(3)
require.Equal(t, 5, n)
require.Equal(t, 5, serverHandler.n)
// StringMatch
o, err := client.StringMatch(TestType{S: "0"}, 0)
require.NoError(t, err)
require.Equal(t, "0", o.S)
require.Equal(t, 0, o.I)
_, err = client.StringMatch(TestType{S: "5"}, 5)
require.EqualError(t, err, ":(")
o, err = client.StringMatch(TestType{S: "8", I: 8}, 8)
require.NoError(t, err)
require.Equal(t, "8", o.S)
require.Equal(t, 8, o.I)
// Invalid client handlers
var noret struct {
Add func(int)
closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &noret, nil)
require.NoError(t, err)
// this one should actually work
require.Equal(t, 9, serverHandler.n)
var noparam struct {
Add func()
closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &noparam, nil)
require.NoError(t, err)
// shouldn't panic
var erronly struct {
AddGet func() (int, error)
closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &erronly, nil)
require.NoError(t, err)
_, err = erronly.AddGet()
if err == nil || err.Error() != "RPC error (-32602): wrong param count" {
t.Error("wrong error:", err)
var wrongtype struct {
Add func(string) error
closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &wrongtype, nil)
require.NoError(t, err)
err = wrongtype.Add("not an int")
if err == nil || !strings.Contains(err.Error(), "RPC error (-32700):") || !strings.Contains(err.Error(), "json: cannot unmarshal string into Go value of type int") {
t.Error("wrong error:", err)
var notfound struct {
NotThere func(string) error
closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", ¬found, nil)
require.NoError(t, err)
err = notfound.NotThere("hello?")
if err == nil || err.Error() != "RPC error (-32601): method 'SimpleServerHandler.NotThere' not found" {
t.Error("wrong error:", err)
type CtxHandler struct {
lk sync.Mutex
cancelled bool
i int
func (h *CtxHandler) Test(ctx context.Context) {
timeout := time.After(300 * time.Millisecond)
select {
case <-timeout:
case <-ctx.Done():
h.cancelled = true
func TestCtx(t *testing.T) {
// setup server
serverHandler := &CtxHandler{}
rpcServer := NewServer()
rpcServer.Register("CtxHandler", serverHandler)
// httptest stuff
testServ := httptest.NewServer(rpcServer)
defer testServ.Close()
// setup client
var client struct {
Test func(ctx context.Context)
closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "CtxHandler", &client, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
if !serverHandler.cancelled {
t.Error("expected cancellation on the server side")
serverHandler.cancelled = false
var noCtxClient struct {
Test func()
closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "CtxHandler", &noCtxClient, nil)
if err != nil {
if serverHandler.cancelled || serverHandler.i != 2 {
t.Error("wrong serverHandler state")
type UnUnmarshalable int
func (*UnUnmarshalable) UnmarshalJSON([]byte) error {
return errors.New("nope")
type UnUnmarshalableHandler struct{}
func (*UnUnmarshalableHandler) GetUnUnmarshalableStuff() (UnUnmarshalable, error) {
return UnUnmarshalable(5), nil
func TestUnmarshalableResult(t *testing.T) {
var client struct {
GetUnUnmarshalableStuff func() (UnUnmarshalable, error)
rpcServer := NewServer()
rpcServer.Register("Handler", &UnUnmarshalableHandler{})
testServ := httptest.NewServer(rpcServer)
defer testServ.Close()
closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "Handler", &client, nil)
require.NoError(t, err)
defer closer()
_, err = client.GetUnUnmarshalableStuff()
require.EqualError(t, err, "RPC client error: unmarshaling result: nope")
type ChanHandler struct {
wait chan struct{}
ctxdone <-chan struct{}
func (h *ChanHandler) Sub(ctx context.Context, i int, eq int) (<-chan int, error) {
out := make(chan int)
h.ctxdone = ctx.Done()
wait := h.wait
log.Warnf("SERVER SUB!")
go func() {
defer close(out)
var n int
for {
select {
case <-ctx.Done():
fmt.Println("ctxdone1", i, eq)
case <-wait:
fmt.Println("CONSUMED WAIT: ", i)
n += i
if n == eq {
select {
case <-ctx.Done():
case out <- n:
return out, nil
func TestChan(t *testing.T) {
var client struct {
Sub func(context.Context, int, int) (<-chan int, error)
serverHandler := &ChanHandler{
wait: make(chan struct{}, 5),
rpcServer := NewServer()
rpcServer.Register("ChanHandler", serverHandler)
testServ := httptest.NewServer(rpcServer)
defer testServ.Close()
closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "ChanHandler", &client, nil)
require.NoError(t, err)
defer closer()
serverHandler.wait <- struct{}{}
ctx, cancel := context.WithCancel(context.Background())
// sub
sub, err := client.Sub(ctx, 2, -1)
require.NoError(t, err)
// recv one
require.Equal(t, 2, <-sub)
// recv many (order)
serverHandler.wait <- struct{}{}
serverHandler.wait <- struct{}{}
serverHandler.wait <- struct{}{}
require.Equal(t, 4, <-sub)
require.Equal(t, 6, <-sub)
require.Equal(t, 8, <-sub)
// close (through ctx)
_, ok := <-sub
require.Equal(t, false, ok)
// sub (again)
serverHandler.wait = make(chan struct{}, 5)
serverHandler.wait <- struct{}{}
ctx, cancel = context.WithCancel(context.Background())
defer cancel()
log.Warnf("last sub")
sub, err = client.Sub(ctx, 3, 6)
require.NoError(t, err)
log.Warnf("waiting for value now")
require.Equal(t, 3, <-sub)
log.Warnf("not equal")
// close (remote)
serverHandler.wait <- struct{}{}
_, ok = <-sub
require.Equal(t, false, ok)
func TestChanClosing(t *testing.T) {
var client struct {
Sub func(context.Context, int, int) (<-chan int, error)
serverHandler := &ChanHandler{
wait: make(chan struct{}, 5),
rpcServer := NewServer()
rpcServer.Register("ChanHandler", serverHandler)
testServ := httptest.NewServer(rpcServer)
defer testServ.Close()
closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "ChanHandler", &client, nil)
require.NoError(t, err)
defer closer()
ctx1, cancel1 := context.WithCancel(context.Background())
ctx2, cancel2 := context.WithCancel(context.Background())
// sub
sub1, err := client.Sub(ctx1, 2, -1)
require.NoError(t, err)
sub2, err := client.Sub(ctx2, 3, -1)
require.NoError(t, err)
// recv one
serverHandler.wait <- struct{}{}
serverHandler.wait <- struct{}{}
require.Equal(t, 2, <-sub1)
require.Equal(t, 3, <-sub2)
require.Equal(t, 0, <-sub1)
time.Sleep(time.Millisecond * 50) // make sure the loop has exited (having a shared wait channel makes this annoying)
serverHandler.wait <- struct{}{}
require.Equal(t, 6, <-sub2)
require.Equal(t, 0, <-sub2)
func TestChanServerClose(t *testing.T) {
var client struct {
Sub func(context.Context, int, int) (<-chan int, error)
serverHandler := &ChanHandler{
wait: make(chan struct{}, 5),
rpcServer := NewServer()
rpcServer.Register("ChanHandler", serverHandler)
tctx, tcancel := context.WithCancel(context.Background())
testServ := httptest.NewServer(rpcServer)
testServ.Config.ConnContext = func(ctx context.Context, c net.Conn) context.Context {
return tctx
closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "ChanHandler", &client, nil)
require.NoError(t, err)
defer closer()
serverHandler.wait <- struct{}{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// sub
sub, err := client.Sub(ctx, 2, -1)
require.NoError(t, err)
// recv one
require.Equal(t, 2, <-sub)
// make sure we're blocked
select {
case <-time.After(200 * time.Millisecond):
case <-sub:
t.Fatal("didn't expect to get anything from sub")
// close server
_, ok := <-sub
require.Equal(t, false, ok)
func TestServerChanLockClose(t *testing.T) {
var client struct {
Sub func(context.Context, int, int) (<-chan int, error)
serverHandler := &ChanHandler{
wait: make(chan struct{}),
rpcServer := NewServer()
rpcServer.Register("ChanHandler", serverHandler)
testServ := httptest.NewServer(rpcServer)
var closeConn func() error
_, err := NewMergeClient("ws://"+testServ.Listener.Addr().String(),
[]interface{}{&client}, nil,
func(c *Config) {
c.proxyConnFactory = func(f func() (*websocket.Conn, error)) func() (*websocket.Conn, error) {
return func() (*websocket.Conn, error) {
c, err := f()
if err != nil {
return nil, err
closeConn = c.UnderlyingConn().Close
return c, nil
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// sub
sub, err := client.Sub(ctx, 2, -1)
require.NoError(t, err)
// recv one
go func() {
serverHandler.wait <- struct{}{}
require.Equal(t, 2, <-sub)
for i := 0; i < 100; i++ {
serverHandler.wait <- struct{}{}
if err := closeConn(); err != nil {
func TestControlChanDeadlock(t *testing.T) {
for r := 0; r < 20; r++ {
func testControlChanDeadlock(t *testing.T) {
var client struct {
Sub func(context.Context, int, int) (<-chan int, error)
n := 5000
serverHandler := &ChanHandler{
wait: make(chan struct{}, n),
rpcServer := NewServer()
rpcServer.Register("ChanHandler", serverHandler)
testServ := httptest.NewServer(rpcServer)
defer testServ.Close()
closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "ChanHandler", &client, nil)
require.NoError(t, err)
defer closer()
for i := 0; i < n; i++ {
serverHandler.wait <- struct{}{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sub, err := client.Sub(ctx, 1, -1)
require.NoError(t, err)
done := make(chan struct{})
go func() {
defer close(done)
for i := 0; i < n; i++ {
if <-sub != i+1 {
//require.Equal(t, i+1, <-sub)
// reset this channel so its not shared between the sub requests...
serverHandler.wait = make(chan struct{}, n)
for i := 0; i < n; i++ {
serverHandler.wait <- struct{}{}
_, err = client.Sub(ctx, 2, -1)
require.NoError(t, err)
@ -1,114 +0,0 @@
package jsonrpc
import (
const (
rpcParseError = -32700
rpcMethodNotFound = -32601
rpcInvalidParams = -32602
// RPCServer provides a jsonrpc 2.0 http server handler
type RPCServer struct {
methods handlers
// NewServer creates new RPCServer instance
func NewServer() *RPCServer {
return &RPCServer{
methods: map[string]rpcHandler{},
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
func (s *RPCServer) handleWS(ctx context.Context, w http.ResponseWriter, r *http.Request) {
// TODO: allow setting
// (note that we still are mostly covered by jwt tokens)
w.Header().Set("Access-Control-Allow-Origin", "*")
if r.Header.Get("Sec-WebSocket-Protocol") != "" {
w.Header().Set("Sec-WebSocket-Protocol", r.Header.Get("Sec-WebSocket-Protocol"))
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
conn: c,
handler: s.methods,
exiting: make(chan struct{}),
if err := c.Close(); err != nil {
// TODO: return errors to clients per spec
func (s *RPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
h := strings.ToLower(r.Header.Get("Connection"))
if strings.Contains(h, "upgrade") {
s.handleWS(ctx, w, r)
s.methods.handleReader(ctx, r.Body, w, rpcError)
func rpcError(wf func(func(io.Writer)), req *request, code int, err error) {
log.Errorf("RPC Error: %s", err)
wf(func(w io.Writer) {
if hw, ok := w.(http.ResponseWriter); ok {
log.Warnf("rpc error: %s", err)
if req.ID == nil { // notification
resp := response{
Jsonrpc: "2.0",
ID: *req.ID,
Error: &respError{
Code: code,
Message: err.Error(),
err = json.NewEncoder(w).Encode(resp)
if err != nil {
log.Warnf("failed to write rpc error: %s", err)
// Register registers new RPC handler
// Handler is any value with methods defined
func (s *RPCServer) Register(namespace string, handler interface{}) {
s.methods.register(namespace, handler)
var _ error = &respError{}
@ -1,55 +0,0 @@
package jsonrpc
import (
type param struct {
data []byte // from unmarshal
v reflect.Value // to marshal
func (p *param) UnmarshalJSON(raw []byte) error {
|||| = make([]byte, len(raw))
copy(, raw)
return nil
func (p *param) MarshalJSON() ([]byte, error) {
if p.v.Kind() == reflect.Invalid {
return, nil
return json.Marshal(p.v.Interface())
// processFuncOut finds value and error Outs in function
func processFuncOut(funcType reflect.Type) (valOut int, errOut int, n int) {
errOut = -1 // -1 if not found
valOut = -1
n = funcType.NumOut()
switch n {
case 0:
case 1:
if funcType.Out(0) == errorType {
errOut = 0
} else {
valOut = 0
case 2:
valOut = 0
errOut = 1
if funcType.Out(1) != errorType {
panic("expected error as second return value")
errstr := fmt.Sprintf("too many return values: %s", funcType)
@ -1,560 +0,0 @@
package jsonrpc
import (
const wsCancel = "xrpc.cancel"
const chValue = ""
const chClose = ""
type frame struct {
// common
Jsonrpc string `json:"jsonrpc"`
ID *int64 `json:"id,omitempty"`
Meta map[string]string `json:"meta,omitempty"`
// request
Method string `json:"method,omitempty"`
Params []param `json:"params,omitempty"`
// response
Result json.RawMessage `json:"result,omitempty"`
Error *respError `json:"error,omitempty"`
type outChanReg struct {
reqID int64
chID uint64
ch reflect.Value
type wsConn struct {
// outside params
conn *websocket.Conn
connFactory func() (*websocket.Conn, error)
reconnectInterval time.Duration
handler handlers
requests <-chan clientRequest
stop <-chan struct{}
exiting chan struct{}
// incoming messages
incoming chan io.Reader
incomingErr error
// outgoing messages
writeLk sync.Mutex
// ////
// Client related
// inflight are requests we've sent to the remote
inflight map[int64]clientRequest
// chanHandlers is a map of client-side channel handlers
chanHandlers map[uint64]func(m []byte, ok bool)
// ////
// Server related
// handling are the calls we handle
handling map[int64]context.CancelFunc
handlingLk sync.Mutex
spawnOutChanHandlerOnce sync.Once
// chanCtr is a counter used for identifying output channels on the server side
chanCtr uint64
registerCh chan outChanReg
// //
// WebSocket Message utils //
// //
// nextMessage wait for one message and puts it to the incoming channel
func (c *wsConn) nextMessage() {
msgType, r, err := c.conn.NextReader()
if err != nil {
c.incomingErr = err
if msgType != websocket.BinaryMessage && msgType != websocket.TextMessage {
c.incomingErr = errors.New("unsupported message type")
c.incoming <- r
// nextWriter waits for writeLk and invokes the cb callback with WS message
// writer when the lock is acquired
func (c *wsConn) nextWriter(cb func(io.Writer)) {
defer c.writeLk.Unlock()
wcl, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
log.Error("handle me:", err)
if err := wcl.Close(); err != nil {
log.Error("handle me:", err)
func (c *wsConn) sendRequest(req request) error {
defer c.writeLk.Unlock()
if err := c.conn.WriteJSON(req); err != nil {
return err
return nil
// //
// Output channels //
// //
// handleOutChans handles channel communication on the server side
// (forwards channel messages to client)
func (c *wsConn) handleOutChans() {
regV := reflect.ValueOf(c.registerCh)
exitV := reflect.ValueOf(c.exiting)
cases := []reflect.SelectCase{
{ // registration chan always 0
Dir: reflect.SelectRecv,
Chan: regV,
{ // exit chan always 1
Dir: reflect.SelectRecv,
Chan: exitV,
internal := len(cases)
var caseToID []uint64
for {
chosen, val, ok := reflect.Select(cases)
switch chosen {
case 0: // registration channel
if !ok {
// control channel closed - signals closed connection
// This shouldn't happen, instead the exiting channel should get closed
log.Warn("control channel closed")
registration := val.Interface().(outChanReg)
caseToID = append(caseToID, registration.chID)
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
c.nextWriter(func(w io.Writer) {
resp := &response{
Jsonrpc: "2.0",
ID: registration.reqID,
Result: registration.chID,
if err := json.NewEncoder(w).Encode(resp); err != nil {
case 1: // exiting channel
if !ok {
// exiting channel closed - signals closed connection
// We're not closing any channels as we're on receiving end.
// Also, context cancellation below should take care of any running
// requests
log.Warn("exiting channel received a message")
if !ok {
// Output channel closed, cleanup, and tell remote that this happened
id := caseToID[chosen-internal]
n := len(cases) - 1
if n > 0 {
cases[chosen] = cases[n]
caseToID[chosen-internal] = caseToID[n-internal]
cases = cases[:n]
caseToID = caseToID[:n-internal]
if err := c.sendRequest(request{
Jsonrpc: "2.0",
ID: nil, // notification
Method: chClose,
Params: []param{{v: reflect.ValueOf(id)}},
}); err != nil {
log.Warnf("closed out channel sendRequest failed: %s", err)
// forward message
if err := c.sendRequest(request{
Jsonrpc: "2.0",
ID: nil, // notification
Method: chValue,
Params: []param{{v: reflect.ValueOf(caseToID[chosen-internal])}, {v: val}},
}); err != nil {
log.Warnf("sendRequest failed: %s", err)
// handleChanOut registers output channel for forwarding to client
func (c *wsConn) handleChanOut(ch reflect.Value, req int64) error {
c.spawnOutChanHandlerOnce.Do(func() {
go c.handleOutChans()
id := atomic.AddUint64(&c.chanCtr, 1)
select {
case c.registerCh <- outChanReg{
reqID: req,
chID: id,
ch: ch,
return nil
case <-c.exiting:
return xerrors.New("connection closing")
// //
// Context.Done propagation //
// //
// handleCtxAsync handles context lifetimes for client
// TODO: this should be aware of events going through chanHandlers, and quit
// when the related channel is closed.
// This should also probably be a single goroutine,
// Note that not doing this should be fine for now as long as we are using
// contexts correctly (cancelling when async functions are no longer is use)
func (c *wsConn) handleCtxAsync(actx context.Context, id int64) {
Jsonrpc: "2.0",
Method: wsCancel,
Params: []param{{v: reflect.ValueOf(id)}},
// cancelCtx is a built-in rpc which handles context cancellation over rpc
func (c *wsConn) cancelCtx(req frame) {
if req.ID != nil {
log.Warnf("%s call with ID set, won't respond", wsCancel)
var id int64
if err := json.Unmarshal(req.Params[0].data, &id); err != nil {
log.Error("handle me:", err)
defer c.handlingLk.Unlock()
cf, ok := c.handling[id]
if ok {
// //
// Main Handling logic //
// //
func (c *wsConn) handleChanMessage(frame frame) {
var chid uint64
if err := json.Unmarshal(frame.Params[0].data, &chid); err != nil {
log.Error("failed to unmarshal channel id in %s", err)
hnd, ok := c.chanHandlers[chid]
if !ok {
log.Errorf(" handler %d not found", chid)
hnd(frame.Params[1].data, true)
func (c *wsConn) handleChanClose(frame frame) {
var chid uint64
if err := json.Unmarshal(frame.Params[0].data, &chid); err != nil {
log.Error("failed to unmarshal channel id in %s", err)
hnd, ok := c.chanHandlers[chid]
if !ok {
log.Errorf(" handler %d not found", chid)
delete(c.chanHandlers, chid)
hnd(nil, false)
func (c *wsConn) handleResponse(frame frame) {
req, ok := c.inflight[*frame.ID]
if !ok {
log.Error("client got unknown ID in response")
if req.retCh != nil && frame.Result != nil {
// output is channel
var chid uint64
if err := json.Unmarshal(frame.Result, &chid); err != nil {
log.Errorf("failed to unmarshal channel id response: %s, data '%s'", err, string(frame.Result))
var chanCtx context.Context
chanCtx, c.chanHandlers[chid] = req.retCh()
go c.handleCtxAsync(chanCtx, *frame.ID)
req.ready <- clientResponse{
Jsonrpc: frame.Jsonrpc,
Result: frame.Result,
ID: *frame.ID,
Error: frame.Error,
delete(c.inflight, *frame.ID)
func (c *wsConn) handleCall(ctx context.Context, frame frame) {
req := request{
Jsonrpc: frame.Jsonrpc,
ID: frame.ID,
Meta: frame.Meta,
Method: frame.Method,
Params: frame.Params,
ctx, cancel := context.WithCancel(ctx)
nextWriter := func(cb func(io.Writer)) {
done := func(keepCtx bool) {
if !keepCtx {
if frame.ID != nil {
nextWriter = c.nextWriter
c.handling[*frame.ID] = cancel
done = func(keepctx bool) {
defer c.handlingLk.Unlock()
if !keepctx {
delete(c.handling, *frame.ID)
go c.handler.handle(ctx, req, nextWriter, rpcError, done, c.handleChanOut)
// handleFrame handles all incoming messages (calls and responses)
func (c *wsConn) handleFrame(ctx context.Context, frame frame) {
// Get message type by method name:
// "" - response
// "xrpc.*" - builtin
// anything else - incoming remote call
switch frame.Method {
case "": // Response to our call
case wsCancel:
case chValue:
case chClose:
default: // Remote call
c.handleCall(ctx, frame)
func (c *wsConn) closeInFlight() {
for id, req := range c.inflight {
req.ready <- clientResponse{
Jsonrpc: "2.0",
ID: id,
Error: &respError{
Message: "handler: websocket connection closed",
Code: 2,
for _, cancel := range c.handling {
c.inflight = map[int64]clientRequest{}
c.handling = map[int64]context.CancelFunc{}
func (c *wsConn) closeChans() {
for chid := range c.chanHandlers {
hnd := c.chanHandlers[chid]
delete(c.chanHandlers, chid)
hnd(nil, false)
func (c *wsConn) handleWsConn(ctx context.Context) {
c.incoming = make(chan io.Reader)
c.inflight = map[int64]clientRequest{}
c.handling = map[int64]context.CancelFunc{}
c.chanHandlers = map[uint64]func(m []byte, ok bool){}
c.registerCh = make(chan outChanReg)
defer close(c.exiting)
// ////
// on close, make sure to return from all pending calls, and cancel context
// on all calls we handle
defer c.closeInFlight()
// wait for the first message
go c.nextMessage()
for {
select {
case r, ok := <-c.incoming:
if !ok {
if c.incomingErr != nil {
if !websocket.IsCloseError(c.incomingErr, websocket.CloseNormalClosure) {
log.Debugw("websocket error", "error", c.incomingErr)
// connection dropped unexpectedly, do our best to recover it
c.incoming = make(chan io.Reader) // listen again for responses
go func() {
if c.connFactory == nil { // likely the server side, don't try to reconnect
var conn *websocket.Conn
for conn == nil {
var err error
if conn, err = c.connFactory(); err != nil {
log.Debugw("websocket connection retry failed", "error", err)
c.conn = conn
c.incomingErr = nil
go c.nextMessage()
return // remote closed
// debug util - dump all messages to stderr
// r = io.TeeReader(r, os.Stderr)
var frame frame
if err := json.NewDecoder(r).Decode(&frame); err != nil {
log.Error("handle me:", err)
c.handleFrame(ctx, frame)
go c.nextMessage()
case req := <-c.requests:
if req.req.ID != nil {
if c.incomingErr != nil { // No conn?, immediate fail
req.ready <- clientResponse{
Jsonrpc: "2.0",
ID: *req.req.ID,
Error: &respError{
Message: "handler: websocket connection closed",
Code: 2,
c.inflight[*req.req.ID] = req
if err := c.sendRequest(req.req); err != nil {
log.Errorf("sendReqest failed (Handle me): %s", err)
case <-c.stop:
cmsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
if err := c.conn.WriteMessage(websocket.CloseMessage, cmsg); err != nil {
log.Warn("failed to write close message: ", err)
if err := c.conn.Close(); err != nil {
log.Warnw("websocket close error", "error", err)
@ -9,7 +9,8 @@ import (
@ -8,9 +8,9 @@ import (
const listenAddr = ""
@ -4,13 +4,14 @@ import (
rpcmetrics ""
// Global Tags
var (
Version, _ = tag.NewKey("version")
Commit, _ = tag.NewKey("commit")
RPCMethod, _ = tag.NewKey("method")
PeerID, _ = tag.NewKey("peer_id")
FailureType, _ = tag.NewKey("failure_type")
MessageFrom, _ = tag.NewKey("message_from")
@ -31,9 +32,6 @@ var (
BlockValidationFailure = stats.Int64("block/failure", "Counter for block validation failures", stats.UnitDimensionless)
BlockValidationSuccess = stats.Int64("block/success", "Counter for block validation successes", stats.UnitDimensionless)
PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless)
RPCInvalidMethod = stats.Int64("rpc/invalid_method", "Total number of invalid RPC methods called", stats.UnitDimensionless)
RPCRequestError = stats.Int64("rpc/request_error", "Total number of request errors handled", stats.UnitDimensionless)
RPCResponseError = stats.Int64("rpc/response_error", "Total number of responses errors handled", stats.UnitDimensionless)
var (
@ -82,26 +80,10 @@ var (
Measure: PeerCount,
Aggregation: view.LastValue(),
// All RPC related metrics should at the very least tag the RPCMethod
RPCInvalidMethodView = &view.View{
Measure: RPCInvalidMethod,
Aggregation: view.Count(),
TagKeys: []tag.Key{RPCMethod},
RPCRequestErrorView = &view.View{
Measure: RPCRequestError,
Aggregation: view.Count(),
TagKeys: []tag.Key{RPCMethod},
RPCResponseErrorView = &view.View{
Measure: RPCResponseError,
Aggregation: view.Count(),
TagKeys: []tag.Key{RPCMethod},
// DefaultViews is an array of OpenCensus views for metric gathering purposes
var DefaultViews = []*view.View{
var DefaultViews = append([]*view.View{
@ -111,8 +93,4 @@ var DefaultViews = []*view.View{
PeerCountView}, rpcmetrics.DefaultViews...)
@ -2,6 +2,7 @@ package impl
import (
@ -11,7 +12,6 @@ import (
@ -22,6 +22,7 @@ import (
@ -39,7 +40,6 @@ import (
genesis ""
@ -5,6 +5,7 @@ import (
manet ""
@ -16,7 +17,6 @@ import (
Reference in New Issue
Block a user