diff --git a/api/client/client.go b/api/client/client.go index d0facc55d..d3eceb24a 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -1,11 +1,12 @@ package client import ( - "github.com/filecoin-project/lotus/api/apistruct" "net/http" + "github.com/filecoin-project/go-jsonrpc" + "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/lib/jsonrpc" + "github.com/filecoin-project/lotus/api/apistruct" ) // NewCommonRPC creates a new http jsonrpc client. diff --git a/cli/cmd.go b/cli/cmd.go index c27f31507..fd0ab1e78 100644 --- a/cli/cmd.go +++ b/cli/cmd.go @@ -16,9 +16,10 @@ import ( "golang.org/x/xerrors" "gopkg.in/urfave/cli.v2" + "github.com/filecoin-project/go-jsonrpc" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api/client" - "github.com/filecoin-project/lotus/lib/jsonrpc" "github.com/filecoin-project/lotus/node/repo" ) diff --git a/cmd/lotus-health/main.go b/cmd/lotus-health/main.go index b37aee29f..99b5b0129 100644 --- a/cmd/lotus-health/main.go +++ b/cmd/lotus-health/main.go @@ -8,14 +8,16 @@ import ( "syscall" "time" + cid "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log" + "gopkg.in/urfave/cli.v2" + + "github.com/filecoin-project/go-jsonrpc" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" lcli "github.com/filecoin-project/lotus/cli" - "github.com/filecoin-project/lotus/lib/jsonrpc" - cid "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log" - "gopkg.in/urfave/cli.v2" ) type CidWindow [][]cid.Cid diff --git a/cmd/lotus-seal-worker/main.go b/cmd/lotus-seal-worker/main.go index ff7b0c5df..c17f41456 100644 --- a/cmd/lotus-seal-worker/main.go +++ b/cmd/lotus-seal-worker/main.go @@ -15,6 +15,7 @@ import ( "golang.org/x/xerrors" "gopkg.in/urfave/cli.v2" + "github.com/filecoin-project/go-jsonrpc" paramfetch "github.com/filecoin-project/go-paramfetch" "github.com/filecoin-project/sector-storage/ffiwrapper" @@ -23,7 +24,6 @@ import ( "github.com/filecoin-project/lotus/build" lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/lib/auth" - "github.com/filecoin-project/lotus/lib/jsonrpc" "github.com/filecoin-project/lotus/lib/lotuslog" "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/sector-storage" diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index 477fb9a52..5ae4373e1 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -8,6 +8,7 @@ import ( "os/signal" "syscall" + "github.com/filecoin-project/go-jsonrpc" mux "github.com/gorilla/mux" "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr-net" @@ -19,7 +20,6 @@ import ( "github.com/filecoin-project/lotus/build" lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/lib/auth" - "github.com/filecoin-project/lotus/lib/jsonrpc" "github.com/filecoin-project/lotus/lib/ulimit" "github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node/impl" diff --git a/cmd/lotus/rpc.go b/cmd/lotus/rpc.go index 5b264421a..442f6fd61 100644 --- a/cmd/lotus/rpc.go +++ b/cmd/lotus/rpc.go @@ -9,14 +9,6 @@ import ( "os/signal" "syscall" - "github.com/filecoin-project/lotus/api/apistruct" - - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/lib/auth" - "github.com/filecoin-project/lotus/lib/jsonrpc" - "github.com/filecoin-project/lotus/node" - "github.com/filecoin-project/lotus/node/impl" - "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" "github.com/multiformats/go-multiaddr" @@ -24,6 +16,14 @@ import ( "golang.org/x/xerrors" "contrib.go.opencensus.io/exporter/prometheus" + + "github.com/filecoin-project/go-jsonrpc" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/api/apistruct" + "github.com/filecoin-project/lotus/lib/auth" + "github.com/filecoin-project/lotus/node" + "github.com/filecoin-project/lotus/node/impl" ) var log = logging.Logger("main") diff --git a/go.mod b/go.mod index b80f7d760..29b72cdd0 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/filecoin-project/go-data-transfer v0.3.0 github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5 github.com/filecoin-project/go-fil-markets v0.2.3 + github.com/filecoin-project/go-jsonrpc v0.1.0 github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 github.com/filecoin-project/go-paramfetch v0.0.2-0.20200505180321-973f8949ea8e github.com/filecoin-project/go-statestore v0.1.0 @@ -59,7 +60,7 @@ require ( github.com/ipfs/go-ipld-cbor v0.0.5-0.20200428170625-a0bd04d3cbdf github.com/ipfs/go-ipld-format v0.2.0 github.com/ipfs/go-log v1.0.4 - github.com/ipfs/go-log/v2 v2.0.5 + github.com/ipfs/go-log/v2 v2.0.8 github.com/ipfs/go-merkledag v0.3.1 github.com/ipfs/go-path v0.0.7 github.com/ipfs/go-unixfs v0.2.4 diff --git a/go.sum b/go.sum index 595cf49d2..fa44c7c2e 100644 --- a/go.sum +++ b/go.sum @@ -164,6 +164,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5/go github.com/filecoin-project/go-fil-markets v0.0.0-20200114015428-74d100f305f8/go.mod h1:c8NTjvFVy1Ud02mmGDjOiMeawY2t6ALfrrdvAB01FQc= github.com/filecoin-project/go-fil-markets v0.2.3 h1:00exBcwysQVEx7wvzcdVz9ZT3HLMXKmbQNIz9ktyeO8= github.com/filecoin-project/go-fil-markets v0.2.3/go.mod h1:LI3VFHse33aU0djAmFQ8+Hg39i0J8ibAoppGu6TbgkA= +github.com/filecoin-project/go-jsonrpc v0.1.0 h1:NBHruefnWWfbizxFMnStXlXKEAxEno3DrM0iLd8SuCM= +github.com/filecoin-project/go-jsonrpc v0.1.0/go.mod h1:j6zV//WXIIY5kky873Q3iIKt/ViOE8rcijovmpxrXzM= github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 h1:92PET+sx1Hb4W/8CgFwGuxaKbttwY+UNspYZTvXY0vs= github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6/go.mod h1:0HgYnrkeSU4lu1p+LEOeDpFsNBssa0OGGriWdA4hvaE= github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU= @@ -434,6 +436,8 @@ github.com/ipfs/go-log/v2 v2.0.2/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBW github.com/ipfs/go-log/v2 v2.0.3/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0= github.com/ipfs/go-log/v2 v2.0.5 h1:fL4YI+1g5V/b1Yxr1qAiXTMg1H8z9vx/VmJxBuQMHvU= github.com/ipfs/go-log/v2 v2.0.5/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscwkWG+dw= +github.com/ipfs/go-log/v2 v2.0.8 h1:3b3YNopMHlj4AvyhWAx0pDxqSQWYi4/WuWO7yRV6/Qg= +github.com/ipfs/go-log/v2 v2.0.8/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscwkWG+dw= github.com/ipfs/go-merkledag v0.0.3/go.mod h1:Oc5kIXLHokkE1hWGMBHw+oxehkAaTOqtEb7Zbh6BhLA= github.com/ipfs/go-merkledag v0.0.6/go.mod h1:QYPdnlvkOg7GnQRofu9XZimC5ZW5Wi3bKys/4GQQfto= github.com/ipfs/go-merkledag v0.1.0/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk= diff --git a/lib/jsonrpc/client.go b/lib/jsonrpc/client.go deleted file mode 100644 index 9953bace9..000000000 --- a/lib/jsonrpc/client.go +++ /dev/null @@ -1,435 +0,0 @@ -package jsonrpc - -import ( - "container/list" - "context" - "encoding/base64" - "encoding/json" - "fmt" - "net/http" - "reflect" - "sync/atomic" - "time" - - "github.com/gorilla/websocket" - logging "github.com/ipfs/go-log/v2" - "go.opencensus.io/trace" - "go.opencensus.io/trace/propagation" - "golang.org/x/xerrors" -) - -const ( - methodRetryFrequency = time.Second * 3 -) - -var ( - errorType = reflect.TypeOf(new(error)).Elem() - contextType = reflect.TypeOf(new(context.Context)).Elem() - - log = logging.Logger("rpc") -) - -// ErrClient is an error which occurred on the client side the library -type ErrClient struct { - err error -} - -func (e *ErrClient) Error() string { - return fmt.Sprintf("RPC client error: %s", e.err) -} - -// Unwrap unwraps the actual error -func (e *ErrClient) Unwrap(err error) error { - return e.err -} - -type clientResponse struct { - Jsonrpc string `json:"jsonrpc"` - Result json.RawMessage `json:"result"` - ID int64 `json:"id"` - Error *respError `json:"error,omitempty"` -} - -type makeChanSink func() (context.Context, func([]byte, bool)) - -type clientRequest struct { - req request - ready chan clientResponse - - // retCh provides a context and sink for handling incoming channel messages - retCh makeChanSink -} - -// ClientCloser is used to close Client from further use -type ClientCloser func() - -// NewClient creates new jsonrpc 2.0 client -// -// handler must be pointer to a struct with function fields -// Returned value closes the client connection -// TODO: Example -func NewClient(addr string, namespace string, handler interface{}, requestHeader http.Header) (ClientCloser, error) { - return NewMergeClient(addr, namespace, []interface{}{handler}, requestHeader) -} - -type client struct { - namespace string - - requests chan clientRequest - exiting <-chan struct{} - idCtr int64 -} - -// NewMergeClient is like NewClient, but allows to specify multiple structs -// to be filled in the same namespace, using one connection -func NewMergeClient(addr string, namespace string, outs []interface{}, requestHeader http.Header, opts ...Option) (ClientCloser, error) { - var config Config - for _, o := range opts { - o(&config) - } - - connFactory := func() (*websocket.Conn, error) { - conn, _, err := websocket.DefaultDialer.Dial(addr, requestHeader) - return conn, err - } - - if config.proxyConnFactory != nil { - connFactory = config.proxyConnFactory(connFactory) - } - - conn, err := connFactory() - if err != nil { - return nil, err - } - - c := client{ - namespace: namespace, - } - - stop := make(chan struct{}) - exiting := make(chan struct{}) - c.requests = make(chan clientRequest) - c.exiting = exiting - - handlers := map[string]rpcHandler{} - go (&wsConn{ - conn: conn, - connFactory: connFactory, - reconnectInterval: config.ReconnectInterval, - handler: handlers, - requests: c.requests, - stop: stop, - exiting: exiting, - }).handleWsConn(context.TODO()) - - for _, handler := range outs { - htyp := reflect.TypeOf(handler) - if htyp.Kind() != reflect.Ptr { - return nil, xerrors.New("expected handler to be a pointer") - } - typ := htyp.Elem() - if typ.Kind() != reflect.Struct { - return nil, xerrors.New("handler should be a struct") - } - - val := reflect.ValueOf(handler) - - for i := 0; i < typ.NumField(); i++ { - fn, err := c.makeRpcFunc(typ.Field(i)) - if err != nil { - return nil, err - } - - val.Elem().Field(i).Set(fn) - } - } - - return func() { - close(stop) - <-exiting - }, nil -} - -func (c *client) makeOutChan(ctx context.Context, ftyp reflect.Type, valOut int) (func() reflect.Value, makeChanSink) { - retVal := reflect.Zero(ftyp.Out(valOut)) - - chCtor := func() (context.Context, func([]byte, bool)) { - // unpack chan type to make sure it's reflect.BothDir - ctyp := reflect.ChanOf(reflect.BothDir, ftyp.Out(valOut).Elem()) - ch := reflect.MakeChan(ctyp, 0) // todo: buffer? - chCtx, chCancel := context.WithCancel(ctx) - retVal = ch.Convert(ftyp.Out(valOut)) - - incoming := make(chan reflect.Value, 32) - - // gorotuine to handle buffering of items - go func() { - buf := (&list.List{}).Init() - - for { - front := buf.Front() - - cases := []reflect.SelectCase{ - { - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(chCtx.Done()), - }, - { - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(incoming), - }, - } - - if front != nil { - cases = append(cases, reflect.SelectCase{ - Dir: reflect.SelectSend, - Chan: ch, - Send: front.Value.(reflect.Value).Elem(), - }) - } - - chosen, val, _ := reflect.Select(cases) - - switch chosen { - case 0: - ch.Close() - return - case 1: - vvval := val.Interface().(reflect.Value) - buf.PushBack(vvval) - if buf.Len() > 1 { - if buf.Len() > 10 { - log.Warnw("rpc output message buffer", "n", buf.Len()) - } else { - log.Infow("rpc output message buffer", "n", buf.Len()) - } - } - - case 2: - buf.Remove(front) - } - } - }() - - return ctx, func(result []byte, ok bool) { - if !ok { - chCancel() - return - } - - val := reflect.New(ftyp.Out(valOut).Elem()) - if err := json.Unmarshal(result, val.Interface()); err != nil { - log.Errorf("error unmarshaling chan response: %s", err) - return - } - - if ctx.Err() != nil { - log.Errorf("got rpc message with cancelled context: %s", ctx.Err()) - return - } - - select { - case incoming <- val: - case <-chCtx.Done(): - } - } - } - - return func() reflect.Value { return retVal }, chCtor -} - -func (c *client) sendRequest(ctx context.Context, req request, chCtor makeChanSink) (clientResponse, error) { - rchan := make(chan clientResponse, 1) - creq := clientRequest{ - req: req, - ready: rchan, - - retCh: chCtor, - } - select { - case c.requests <- creq: - case <-c.exiting: - return clientResponse{}, fmt.Errorf("websocket routine exiting") - } - - var ctxDone <-chan struct{} - var resp clientResponse - - if ctx != nil { - ctxDone = ctx.Done() - } - - // wait for response, handle context cancellation -loop: - for { - select { - case resp = <-rchan: - break loop - case <-ctxDone: // send cancel request - ctxDone = nil - - cancelReq := clientRequest{ - req: request{ - Jsonrpc: "2.0", - Method: wsCancel, - Params: []param{{v: reflect.ValueOf(*req.ID)}}, - }, - } - select { - case c.requests <- cancelReq: - case <-c.exiting: - log.Warn("failed to send request cancellation, websocket routing exited") - } - } - } - - return resp, nil -} - -type rpcFunc struct { - client *client - - ftyp reflect.Type - name string - - nout int - valOut int - errOut int - - hasCtx int - returnValueIsChannel bool - - retry bool -} - -func (fn *rpcFunc) processResponse(resp clientResponse, rval reflect.Value) []reflect.Value { - out := make([]reflect.Value, fn.nout) - - if fn.valOut != -1 { - out[fn.valOut] = rval - } - if fn.errOut != -1 { - out[fn.errOut] = reflect.New(errorType).Elem() - if resp.Error != nil { - out[fn.errOut].Set(reflect.ValueOf(resp.Error)) - } - } - - return out -} - -func (fn *rpcFunc) processError(err error) []reflect.Value { - out := make([]reflect.Value, fn.nout) - - if fn.valOut != -1 { - out[fn.valOut] = reflect.New(fn.ftyp.Out(fn.valOut)).Elem() - } - if fn.errOut != -1 { - out[fn.errOut] = reflect.New(errorType).Elem() - out[fn.errOut].Set(reflect.ValueOf(&ErrClient{err})) - } - - return out -} - -func (fn *rpcFunc) handleRpcCall(args []reflect.Value) (results []reflect.Value) { - id := atomic.AddInt64(&fn.client.idCtr, 1) - params := make([]param, len(args)-fn.hasCtx) - for i, arg := range args[fn.hasCtx:] { - params[i] = param{ - v: arg, - } - } - - var ctx context.Context - var span *trace.Span - if fn.hasCtx == 1 { - ctx = args[0].Interface().(context.Context) - ctx, span = trace.StartSpan(ctx, "api.call") - defer span.End() - } - - retVal := func() reflect.Value { return reflect.Value{} } - - // if the function returns a channel, we need to provide a sink for the - // messages - var chCtor makeChanSink - if fn.returnValueIsChannel { - retVal, chCtor = fn.client.makeOutChan(ctx, fn.ftyp, fn.valOut) - } - - req := request{ - Jsonrpc: "2.0", - ID: &id, - Method: fn.client.namespace + "." + fn.name, - Params: params, - } - - if span != nil { - span.AddAttributes(trace.StringAttribute("method", req.Method)) - - eSC := base64.StdEncoding.EncodeToString( - propagation.Binary(span.SpanContext())) - req.Meta = map[string]string{ - "SpanContext": eSC, - } - } - - var resp clientResponse - var err error - // keep retrying if got a forced closed websocket conn and calling method - // has retry annotation - for { - resp, err = fn.client.sendRequest(ctx, req, chCtor) - if err != nil { - return fn.processError(fmt.Errorf("sendRequest failed: %w", err)) - } - - if resp.ID != *req.ID { - return fn.processError(xerrors.New("request and response id didn't match")) - } - - if fn.valOut != -1 && !fn.returnValueIsChannel { - val := reflect.New(fn.ftyp.Out(fn.valOut)) - - if resp.Result != nil { - log.Debugw("rpc result", "type", fn.ftyp.Out(fn.valOut)) - if err := json.Unmarshal(resp.Result, val.Interface()); err != nil { - log.Warnw("unmarshaling failed", "message", string(resp.Result)) - return fn.processError(xerrors.Errorf("unmarshaling result: %w", err)) - } - } - - retVal = func() reflect.Value { return val.Elem() } - } - retry := resp.Error != nil && resp.Error.Code == 2 && fn.retry - if !retry { - break - } - time.Sleep(methodRetryFrequency) - } - - return fn.processResponse(resp, retVal()) -} - -func (c *client) makeRpcFunc(f reflect.StructField) (reflect.Value, error) { - ftyp := f.Type - if ftyp.Kind() != reflect.Func { - return reflect.Value{}, xerrors.New("handler field not a func") - } - - fun := &rpcFunc{ - client: c, - ftyp: ftyp, - name: f.Name, - retry: f.Tag.Get("retry") == "true", - } - fun.valOut, fun.errOut, fun.nout = processFuncOut(ftyp) - - if ftyp.NumIn() > 0 && ftyp.In(0) == contextType { - fun.hasCtx = 1 - } - fun.returnValueIsChannel = fun.valOut != -1 && ftyp.Out(fun.valOut).Kind() == reflect.Chan - - return reflect.MakeFunc(ftyp, fun.handleRpcCall), nil -} diff --git a/lib/jsonrpc/handler.go b/lib/jsonrpc/handler.go deleted file mode 100644 index 5d4380046..000000000 --- a/lib/jsonrpc/handler.go +++ /dev/null @@ -1,275 +0,0 @@ -package jsonrpc - -import ( - "bytes" - "context" - "encoding/base64" - "encoding/json" - "fmt" - "io" - "reflect" - - "github.com/filecoin-project/lotus/metrics" - "go.opencensus.io/stats" - "go.opencensus.io/tag" - "go.opencensus.io/trace" - "go.opencensus.io/trace/propagation" - "golang.org/x/xerrors" -) - -type rpcHandler struct { - paramReceivers []reflect.Type - nParams int - - receiver reflect.Value - handlerFunc reflect.Value - - hasCtx int - - errOut int - valOut int -} - -type handlers map[string]rpcHandler - -// Request / response - -type request struct { - Jsonrpc string `json:"jsonrpc"` - ID *int64 `json:"id,omitempty"` - Method string `json:"method"` - Params []param `json:"params"` - Meta map[string]string `json:"meta,omitempty"` -} - -type respError struct { - Code int `json:"code"` - Message string `json:"message"` -} - -func (e *respError) Error() string { - if e.Code >= -32768 && e.Code <= -32000 { - return fmt.Sprintf("RPC error (%d): %s", e.Code, e.Message) - } - return e.Message -} - -type response struct { - Jsonrpc string `json:"jsonrpc"` - Result interface{} `json:"result,omitempty"` - ID int64 `json:"id"` - Error *respError `json:"error,omitempty"` -} - -// Register - -func (h handlers) register(namespace string, r interface{}) { - val := reflect.ValueOf(r) - //TODO: expect ptr - - for i := 0; i < val.NumMethod(); i++ { - method := val.Type().Method(i) - - funcType := method.Func.Type() - hasCtx := 0 - if funcType.NumIn() >= 2 && funcType.In(1) == contextType { - hasCtx = 1 - } - - ins := funcType.NumIn() - 1 - hasCtx - recvs := make([]reflect.Type, ins) - for i := 0; i < ins; i++ { - recvs[i] = method.Type.In(i + 1 + hasCtx) - } - - valOut, errOut, _ := processFuncOut(funcType) - - h[namespace+"."+method.Name] = rpcHandler{ - paramReceivers: recvs, - nParams: ins, - - handlerFunc: method.Func, - receiver: val, - - hasCtx: hasCtx, - - errOut: errOut, - valOut: valOut, - } - } -} - -// Handle - -type rpcErrFunc func(w func(func(io.Writer)), req *request, code int, err error) -type chanOut func(reflect.Value, int64) error - -func (h handlers) handleReader(ctx context.Context, r io.Reader, w io.Writer, rpcError rpcErrFunc) { - wf := func(cb func(io.Writer)) { - cb(w) - } - - var req request - if err := json.NewDecoder(r).Decode(&req); err != nil { - rpcError(wf, &req, rpcParseError, xerrors.Errorf("unmarshaling request: %w", err)) - return - } - - h.handle(ctx, req, wf, rpcError, func(bool) {}, nil) -} - -func doCall(methodName string, f reflect.Value, params []reflect.Value) (out []reflect.Value, err error) { - defer func() { - if i := recover(); i != nil { - err = xerrors.Errorf("panic in rpc method '%s': %s", methodName, i) - log.Error(err) - } - }() - - out = f.Call(params) - return out, nil -} - -func (handlers) getSpan(ctx context.Context, req request) (context.Context, *trace.Span) { - if req.Meta == nil { - return ctx, nil - } - if eSC, ok := req.Meta["SpanContext"]; ok { - bSC := make([]byte, base64.StdEncoding.DecodedLen(len(eSC))) - _, err := base64.StdEncoding.Decode(bSC, []byte(eSC)) - if err != nil { - log.Errorf("SpanContext: decode", "error", err) - return ctx, nil - } - sc, ok := propagation.FromBinary(bSC) - if !ok { - log.Errorf("SpanContext: could not create span", "data", bSC) - return ctx, nil - } - ctx, span := trace.StartSpanWithRemoteParent(ctx, "api.handle", sc) - span.AddAttributes(trace.StringAttribute("method", req.Method)) - return ctx, span - } - return ctx, nil -} - -func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer)), rpcError rpcErrFunc, done func(keepCtx bool), chOut chanOut) { - // Not sure if we need to sanitize the incoming req.Method or not. - ctx, span := h.getSpan(ctx, req) - ctx, _ = tag.New(ctx, tag.Insert(metrics.RPCMethod, req.Method)) - defer span.End() - - handler, ok := h[req.Method] - if !ok { - rpcError(w, &req, rpcMethodNotFound, fmt.Errorf("method '%s' not found", req.Method)) - stats.Record(ctx, metrics.RPCInvalidMethod.M(1)) - done(false) - return - } - - if len(req.Params) != handler.nParams { - rpcError(w, &req, rpcInvalidParams, fmt.Errorf("wrong param count")) - stats.Record(ctx, metrics.RPCRequestError.M(1)) - done(false) - return - } - - outCh := handler.valOut != -1 && handler.handlerFunc.Type().Out(handler.valOut).Kind() == reflect.Chan - defer done(outCh) - - if chOut == nil && outCh { - rpcError(w, &req, rpcMethodNotFound, fmt.Errorf("method '%s' not supported in this mode (no out channel support)", req.Method)) - stats.Record(ctx, metrics.RPCRequestError.M(1)) - return - } - - callParams := make([]reflect.Value, 1+handler.hasCtx+handler.nParams) - callParams[0] = handler.receiver - if handler.hasCtx == 1 { - callParams[1] = reflect.ValueOf(ctx) - } - - for i := 0; i < handler.nParams; i++ { - rp := reflect.New(handler.paramReceivers[i]) - if err := json.NewDecoder(bytes.NewReader(req.Params[i].data)).Decode(rp.Interface()); err != nil { - rpcError(w, &req, rpcParseError, xerrors.Errorf("unmarshaling params for '%s' (param: %T): %w", req.Method, rp.Interface(), err)) - stats.Record(ctx, metrics.RPCRequestError.M(1)) - return - } - - callParams[i+1+handler.hasCtx] = reflect.ValueOf(rp.Elem().Interface()) - } - - /////////////////// - - callResult, err := doCall(req.Method, handler.handlerFunc, callParams) - if err != nil { - rpcError(w, &req, 0, xerrors.Errorf("fatal error calling '%s': %w", req.Method, err)) - stats.Record(ctx, metrics.RPCRequestError.M(1)) - return - } - if req.ID == nil { - return // notification - } - - /////////////////// - - resp := response{ - Jsonrpc: "2.0", - ID: *req.ID, - } - - if handler.errOut != -1 { - err := callResult[handler.errOut].Interface() - if err != nil { - log.Warnf("error in RPC call to '%s': %+v", req.Method, err) - stats.Record(ctx, metrics.RPCResponseError.M(1)) - resp.Error = &respError{ - Code: 1, - Message: err.(error).Error(), - } - } - } - - var kind reflect.Kind - var res interface{} - var nonZero bool - if handler.valOut != -1 { - res = callResult[handler.valOut].Interface() - kind = callResult[handler.valOut].Kind() - nonZero = !callResult[handler.valOut].IsZero() - } - - if res != nil && kind == reflect.Chan { - // Channel responses are sent from channel control goroutine. - // Sending responses here could cause deadlocks on writeLk, or allow - // sending channel messages before this rpc call returns - - //noinspection GoNilness // already checked above - err = chOut(callResult[handler.valOut], *req.ID) - if err == nil { - return // channel goroutine handles responding - } - - log.Warnf("failed to setup channel in RPC call to '%s': %+v", req.Method, err) - stats.Record(ctx, metrics.RPCResponseError.M(1)) - resp.Error = &respError{ - Code: 1, - Message: err.(error).Error(), - } - } else if resp.Error == nil { - // check error as JSON-RPC spec prohibits error and value at the same time - resp.Result = res - } - if resp.Error != nil && nonZero { - log.Errorw("error and res returned", "request", req, "r.err", resp.Error, "res", res) - } - - w(func(w io.Writer) { - if err := json.NewEncoder(w).Encode(resp); err != nil { - log.Error(err) - stats.Record(ctx, metrics.RPCResponseError.M(1)) - return - } - }) -} diff --git a/lib/jsonrpc/options.go b/lib/jsonrpc/options.go deleted file mode 100644 index ce43742b3..000000000 --- a/lib/jsonrpc/options.go +++ /dev/null @@ -1,25 +0,0 @@ -package jsonrpc - -import ( - "time" - - "github.com/gorilla/websocket" -) - -type Config struct { - ReconnectInterval time.Duration - - proxyConnFactory func(func() (*websocket.Conn, error)) func() (*websocket.Conn, error) // for testing -} - -var defaultConfig = Config{ - ReconnectInterval: time.Second * 5, -} - -type Option func(c *Config) - -func WithReconnectInterval(d time.Duration) func(c *Config) { - return func(c *Config) { - c.ReconnectInterval = d - } -} diff --git a/lib/jsonrpc/rpc_test.go b/lib/jsonrpc/rpc_test.go deleted file mode 100644 index e41e96c3d..000000000 --- a/lib/jsonrpc/rpc_test.go +++ /dev/null @@ -1,623 +0,0 @@ -package jsonrpc - -import ( - "context" - "errors" - "fmt" - "net" - "net/http/httptest" - "strconv" - "strings" - "sync" - "testing" - "time" - - "github.com/gorilla/websocket" - logging "github.com/ipfs/go-log/v2" - "github.com/stretchr/testify/require" -) - -func init() { - logging.SetLogLevel("rpc", "DEBUG") -} - -type SimpleServerHandler struct { - n int -} - -type TestType struct { - S string - I int -} - -type TestOut struct { - TestType - Ok bool -} - -func (h *SimpleServerHandler) Add(in int) error { - if in == -3546 { - return errors.New("test") - } - - h.n += in - - return nil -} - -func (h *SimpleServerHandler) AddGet(in int) int { - h.n += in - return h.n -} - -func (h *SimpleServerHandler) StringMatch(t TestType, i2 int64) (out TestOut, err error) { - if strconv.FormatInt(i2, 10) == t.S { - out.Ok = true - } - if i2 != int64(t.I) { - return TestOut{}, errors.New(":(") - } - out.I = t.I - out.S = t.S - return -} - -func TestRPC(t *testing.T) { - // setup server - - serverHandler := &SimpleServerHandler{} - - rpcServer := NewServer() - rpcServer.Register("SimpleServerHandler", serverHandler) - - // httptest stuff - testServ := httptest.NewServer(rpcServer) - defer testServ.Close() - // setup client - - var client struct { - Add func(int) error - AddGet func(int) int - StringMatch func(t TestType, i2 int64) (out TestOut, err error) - } - closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &client, nil) - require.NoError(t, err) - defer closer() - - // Add(int) error - - require.NoError(t, client.Add(2)) - require.Equal(t, 2, serverHandler.n) - - err = client.Add(-3546) - require.EqualError(t, err, "test") - - // AddGet(int) int - - n := client.AddGet(3) - require.Equal(t, 5, n) - require.Equal(t, 5, serverHandler.n) - - // StringMatch - - o, err := client.StringMatch(TestType{S: "0"}, 0) - require.NoError(t, err) - require.Equal(t, "0", o.S) - require.Equal(t, 0, o.I) - - _, err = client.StringMatch(TestType{S: "5"}, 5) - require.EqualError(t, err, ":(") - - o, err = client.StringMatch(TestType{S: "8", I: 8}, 8) - require.NoError(t, err) - require.Equal(t, "8", o.S) - require.Equal(t, 8, o.I) - - // Invalid client handlers - - var noret struct { - Add func(int) - } - closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &noret, nil) - require.NoError(t, err) - - // this one should actually work - noret.Add(4) - require.Equal(t, 9, serverHandler.n) - closer() - - var noparam struct { - Add func() - } - closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &noparam, nil) - require.NoError(t, err) - - // shouldn't panic - noparam.Add() - closer() - - var erronly struct { - AddGet func() (int, error) - } - closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &erronly, nil) - require.NoError(t, err) - - _, err = erronly.AddGet() - if err == nil || err.Error() != "RPC error (-32602): wrong param count" { - t.Error("wrong error:", err) - } - closer() - - var wrongtype struct { - Add func(string) error - } - closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &wrongtype, nil) - require.NoError(t, err) - - err = wrongtype.Add("not an int") - if err == nil || !strings.Contains(err.Error(), "RPC error (-32700):") || !strings.Contains(err.Error(), "json: cannot unmarshal string into Go value of type int") { - t.Error("wrong error:", err) - } - closer() - - var notfound struct { - NotThere func(string) error - } - closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", ¬found, nil) - require.NoError(t, err) - - err = notfound.NotThere("hello?") - if err == nil || err.Error() != "RPC error (-32601): method 'SimpleServerHandler.NotThere' not found" { - t.Error("wrong error:", err) - } - closer() -} - -type CtxHandler struct { - lk sync.Mutex - - cancelled bool - i int -} - -func (h *CtxHandler) Test(ctx context.Context) { - h.lk.Lock() - defer h.lk.Unlock() - timeout := time.After(300 * time.Millisecond) - h.i++ - - select { - case <-timeout: - case <-ctx.Done(): - h.cancelled = true - } -} - -func TestCtx(t *testing.T) { - // setup server - - serverHandler := &CtxHandler{} - - rpcServer := NewServer() - rpcServer.Register("CtxHandler", serverHandler) - - // httptest stuff - testServ := httptest.NewServer(rpcServer) - defer testServ.Close() - - // setup client - - var client struct { - Test func(ctx context.Context) - } - closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "CtxHandler", &client, nil) - require.NoError(t, err) - - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) - defer cancel() - - client.Test(ctx) - serverHandler.lk.Lock() - - if !serverHandler.cancelled { - t.Error("expected cancellation on the server side") - } - - serverHandler.cancelled = false - - serverHandler.lk.Unlock() - closer() - - var noCtxClient struct { - Test func() - } - closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "CtxHandler", &noCtxClient, nil) - if err != nil { - t.Fatal(err) - } - - noCtxClient.Test() - - serverHandler.lk.Lock() - - if serverHandler.cancelled || serverHandler.i != 2 { - t.Error("wrong serverHandler state") - } - - serverHandler.lk.Unlock() - closer() -} - -type UnUnmarshalable int - -func (*UnUnmarshalable) UnmarshalJSON([]byte) error { - return errors.New("nope") -} - -type UnUnmarshalableHandler struct{} - -func (*UnUnmarshalableHandler) GetUnUnmarshalableStuff() (UnUnmarshalable, error) { - return UnUnmarshalable(5), nil -} - -func TestUnmarshalableResult(t *testing.T) { - var client struct { - GetUnUnmarshalableStuff func() (UnUnmarshalable, error) - } - - rpcServer := NewServer() - rpcServer.Register("Handler", &UnUnmarshalableHandler{}) - - testServ := httptest.NewServer(rpcServer) - defer testServ.Close() - - closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "Handler", &client, nil) - require.NoError(t, err) - defer closer() - - _, err = client.GetUnUnmarshalableStuff() - require.EqualError(t, err, "RPC client error: unmarshaling result: nope") -} - -type ChanHandler struct { - wait chan struct{} - ctxdone <-chan struct{} -} - -func (h *ChanHandler) Sub(ctx context.Context, i int, eq int) (<-chan int, error) { - out := make(chan int) - h.ctxdone = ctx.Done() - - wait := h.wait - - log.Warnf("SERVER SUB!") - go func() { - defer close(out) - var n int - - for { - select { - case <-ctx.Done(): - fmt.Println("ctxdone1", i, eq) - return - case <-wait: - fmt.Println("CONSUMED WAIT: ", i) - } - - n += i - - if n == eq { - fmt.Println("eq") - return - } - - select { - case <-ctx.Done(): - fmt.Println("ctxdone2") - return - case out <- n: - } - } - }() - - return out, nil -} - -func TestChan(t *testing.T) { - var client struct { - Sub func(context.Context, int, int) (<-chan int, error) - } - - serverHandler := &ChanHandler{ - wait: make(chan struct{}, 5), - } - - rpcServer := NewServer() - rpcServer.Register("ChanHandler", serverHandler) - - testServ := httptest.NewServer(rpcServer) - defer testServ.Close() - - closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "ChanHandler", &client, nil) - require.NoError(t, err) - - defer closer() - - serverHandler.wait <- struct{}{} - - ctx, cancel := context.WithCancel(context.Background()) - - // sub - - sub, err := client.Sub(ctx, 2, -1) - require.NoError(t, err) - - // recv one - - require.Equal(t, 2, <-sub) - - // recv many (order) - - serverHandler.wait <- struct{}{} - serverHandler.wait <- struct{}{} - serverHandler.wait <- struct{}{} - - require.Equal(t, 4, <-sub) - require.Equal(t, 6, <-sub) - require.Equal(t, 8, <-sub) - - // close (through ctx) - cancel() - - _, ok := <-sub - require.Equal(t, false, ok) - - // sub (again) - - serverHandler.wait = make(chan struct{}, 5) - serverHandler.wait <- struct{}{} - - ctx, cancel = context.WithCancel(context.Background()) - defer cancel() - - log.Warnf("last sub") - sub, err = client.Sub(ctx, 3, 6) - require.NoError(t, err) - - log.Warnf("waiting for value now") - require.Equal(t, 3, <-sub) - log.Warnf("not equal") - - // close (remote) - serverHandler.wait <- struct{}{} - _, ok = <-sub - require.Equal(t, false, ok) -} - -func TestChanClosing(t *testing.T) { - var client struct { - Sub func(context.Context, int, int) (<-chan int, error) - } - - serverHandler := &ChanHandler{ - wait: make(chan struct{}, 5), - } - - rpcServer := NewServer() - rpcServer.Register("ChanHandler", serverHandler) - - testServ := httptest.NewServer(rpcServer) - defer testServ.Close() - - closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "ChanHandler", &client, nil) - require.NoError(t, err) - - defer closer() - - ctx1, cancel1 := context.WithCancel(context.Background()) - ctx2, cancel2 := context.WithCancel(context.Background()) - - // sub - - sub1, err := client.Sub(ctx1, 2, -1) - require.NoError(t, err) - - sub2, err := client.Sub(ctx2, 3, -1) - require.NoError(t, err) - - // recv one - - serverHandler.wait <- struct{}{} - serverHandler.wait <- struct{}{} - - require.Equal(t, 2, <-sub1) - require.Equal(t, 3, <-sub2) - - cancel1() - - require.Equal(t, 0, <-sub1) - time.Sleep(time.Millisecond * 50) // make sure the loop has exited (having a shared wait channel makes this annoying) - - serverHandler.wait <- struct{}{} - require.Equal(t, 6, <-sub2) - - cancel2() - require.Equal(t, 0, <-sub2) -} - -func TestChanServerClose(t *testing.T) { - var client struct { - Sub func(context.Context, int, int) (<-chan int, error) - } - - serverHandler := &ChanHandler{ - wait: make(chan struct{}, 5), - } - - rpcServer := NewServer() - rpcServer.Register("ChanHandler", serverHandler) - - tctx, tcancel := context.WithCancel(context.Background()) - - testServ := httptest.NewServer(rpcServer) - testServ.Config.ConnContext = func(ctx context.Context, c net.Conn) context.Context { - return tctx - } - - closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "ChanHandler", &client, nil) - require.NoError(t, err) - - defer closer() - - serverHandler.wait <- struct{}{} - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // sub - - sub, err := client.Sub(ctx, 2, -1) - require.NoError(t, err) - - // recv one - - require.Equal(t, 2, <-sub) - - // make sure we're blocked - - select { - case <-time.After(200 * time.Millisecond): - case <-sub: - t.Fatal("didn't expect to get anything from sub") - } - - // close server - - tcancel() - testServ.Close() - - _, ok := <-sub - require.Equal(t, false, ok) -} - -func TestServerChanLockClose(t *testing.T) { - var client struct { - Sub func(context.Context, int, int) (<-chan int, error) - } - - serverHandler := &ChanHandler{ - wait: make(chan struct{}), - } - - rpcServer := NewServer() - rpcServer.Register("ChanHandler", serverHandler) - - testServ := httptest.NewServer(rpcServer) - - var closeConn func() error - - _, err := NewMergeClient("ws://"+testServ.Listener.Addr().String(), - "ChanHandler", - []interface{}{&client}, nil, - func(c *Config) { - c.proxyConnFactory = func(f func() (*websocket.Conn, error)) func() (*websocket.Conn, error) { - return func() (*websocket.Conn, error) { - c, err := f() - if err != nil { - return nil, err - } - - closeConn = c.UnderlyingConn().Close - - return c, nil - } - } - }) - require.NoError(t, err) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // sub - - sub, err := client.Sub(ctx, 2, -1) - require.NoError(t, err) - - // recv one - - go func() { - serverHandler.wait <- struct{}{} - }() - require.Equal(t, 2, <-sub) - - for i := 0; i < 100; i++ { - serverHandler.wait <- struct{}{} - } - - if err := closeConn(); err != nil { - t.Fatal(err) - } - - <-serverHandler.ctxdone -} - -func TestControlChanDeadlock(t *testing.T) { - for r := 0; r < 20; r++ { - testControlChanDeadlock(t) - } -} - -func testControlChanDeadlock(t *testing.T) { - var client struct { - Sub func(context.Context, int, int) (<-chan int, error) - } - - n := 5000 - - serverHandler := &ChanHandler{ - wait: make(chan struct{}, n), - } - - rpcServer := NewServer() - rpcServer.Register("ChanHandler", serverHandler) - - testServ := httptest.NewServer(rpcServer) - defer testServ.Close() - - closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "ChanHandler", &client, nil) - require.NoError(t, err) - - defer closer() - - for i := 0; i < n; i++ { - serverHandler.wait <- struct{}{} - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - sub, err := client.Sub(ctx, 1, -1) - require.NoError(t, err) - - done := make(chan struct{}) - - go func() { - defer close(done) - for i := 0; i < n; i++ { - if <-sub != i+1 { - panic("bad!") - //require.Equal(t, i+1, <-sub) - } - } - }() - - // reset this channel so its not shared between the sub requests... - serverHandler.wait = make(chan struct{}, n) - for i := 0; i < n; i++ { - serverHandler.wait <- struct{}{} - } - - _, err = client.Sub(ctx, 2, -1) - require.NoError(t, err) - <-done -} diff --git a/lib/jsonrpc/server.go b/lib/jsonrpc/server.go deleted file mode 100644 index 860cd6414..000000000 --- a/lib/jsonrpc/server.go +++ /dev/null @@ -1,114 +0,0 @@ -package jsonrpc - -import ( - "context" - "encoding/json" - "io" - "net/http" - "strings" - - "github.com/gorilla/websocket" -) - -const ( - rpcParseError = -32700 - rpcMethodNotFound = -32601 - rpcInvalidParams = -32602 -) - -// RPCServer provides a jsonrpc 2.0 http server handler -type RPCServer struct { - methods handlers -} - -// NewServer creates new RPCServer instance -func NewServer() *RPCServer { - return &RPCServer{ - methods: map[string]rpcHandler{}, - } -} - -var upgrader = websocket.Upgrader{ - CheckOrigin: func(r *http.Request) bool { - return true - }, -} - -func (s *RPCServer) handleWS(ctx context.Context, w http.ResponseWriter, r *http.Request) { - // TODO: allow setting - // (note that we still are mostly covered by jwt tokens) - w.Header().Set("Access-Control-Allow-Origin", "*") - if r.Header.Get("Sec-WebSocket-Protocol") != "" { - w.Header().Set("Sec-WebSocket-Protocol", r.Header.Get("Sec-WebSocket-Protocol")) - } - - c, err := upgrader.Upgrade(w, r, nil) - if err != nil { - log.Error(err) - w.WriteHeader(500) - return - } - - (&wsConn{ - conn: c, - handler: s.methods, - exiting: make(chan struct{}), - }).handleWsConn(ctx) - - if err := c.Close(); err != nil { - log.Error(err) - return - } -} - -// TODO: return errors to clients per spec -func (s *RPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - - h := strings.ToLower(r.Header.Get("Connection")) - if strings.Contains(h, "upgrade") { - s.handleWS(ctx, w, r) - return - } - - s.methods.handleReader(ctx, r.Body, w, rpcError) -} - -func rpcError(wf func(func(io.Writer)), req *request, code int, err error) { - log.Errorf("RPC Error: %s", err) - wf(func(w io.Writer) { - if hw, ok := w.(http.ResponseWriter); ok { - hw.WriteHeader(500) - } - - log.Warnf("rpc error: %s", err) - - if req.ID == nil { // notification - return - } - - resp := response{ - Jsonrpc: "2.0", - ID: *req.ID, - Error: &respError{ - Code: code, - Message: err.Error(), - }, - } - - err = json.NewEncoder(w).Encode(resp) - if err != nil { - log.Warnf("failed to write rpc error: %s", err) - return - } - }) -} - -// Register registers new RPC handler -// -// Handler is any value with methods defined -func (s *RPCServer) Register(namespace string, handler interface{}) { - s.methods.register(namespace, handler) -} - -var _ error = &respError{} diff --git a/lib/jsonrpc/util.go b/lib/jsonrpc/util.go deleted file mode 100644 index aab88ece8..000000000 --- a/lib/jsonrpc/util.go +++ /dev/null @@ -1,55 +0,0 @@ -package jsonrpc - -import ( - "encoding/json" - "fmt" - "reflect" -) - -type param struct { - data []byte // from unmarshal - - v reflect.Value // to marshal -} - -func (p *param) UnmarshalJSON(raw []byte) error { - p.data = make([]byte, len(raw)) - copy(p.data, raw) - return nil -} - -func (p *param) MarshalJSON() ([]byte, error) { - if p.v.Kind() == reflect.Invalid { - return p.data, nil - } - - return json.Marshal(p.v.Interface()) -} - -// processFuncOut finds value and error Outs in function -func processFuncOut(funcType reflect.Type) (valOut int, errOut int, n int) { - errOut = -1 // -1 if not found - valOut = -1 - n = funcType.NumOut() - - switch n { - case 0: - case 1: - if funcType.Out(0) == errorType { - errOut = 0 - } else { - valOut = 0 - } - case 2: - valOut = 0 - errOut = 1 - if funcType.Out(1) != errorType { - panic("expected error as second return value") - } - default: - errstr := fmt.Sprintf("too many return values: %s", funcType) - panic(errstr) - } - - return -} diff --git a/lib/jsonrpc/websocket.go b/lib/jsonrpc/websocket.go deleted file mode 100644 index 568eaa37f..000000000 --- a/lib/jsonrpc/websocket.go +++ /dev/null @@ -1,560 +0,0 @@ -package jsonrpc - -import ( - "context" - "encoding/json" - "errors" - "io" - "io/ioutil" - "reflect" - "sync" - "sync/atomic" - "time" - - "github.com/gorilla/websocket" - "golang.org/x/xerrors" -) - -const wsCancel = "xrpc.cancel" -const chValue = "xrpc.ch.val" -const chClose = "xrpc.ch.close" - -type frame struct { - // common - Jsonrpc string `json:"jsonrpc"` - ID *int64 `json:"id,omitempty"` - Meta map[string]string `json:"meta,omitempty"` - - // request - Method string `json:"method,omitempty"` - Params []param `json:"params,omitempty"` - - // response - Result json.RawMessage `json:"result,omitempty"` - Error *respError `json:"error,omitempty"` -} - -type outChanReg struct { - reqID int64 - - chID uint64 - ch reflect.Value -} - -type wsConn struct { - // outside params - conn *websocket.Conn - connFactory func() (*websocket.Conn, error) - reconnectInterval time.Duration - handler handlers - requests <-chan clientRequest - stop <-chan struct{} - exiting chan struct{} - - // incoming messages - incoming chan io.Reader - incomingErr error - - // outgoing messages - writeLk sync.Mutex - - // //// - // Client related - - // inflight are requests we've sent to the remote - inflight map[int64]clientRequest - - // chanHandlers is a map of client-side channel handlers - chanHandlers map[uint64]func(m []byte, ok bool) - - // //// - // Server related - - // handling are the calls we handle - handling map[int64]context.CancelFunc - handlingLk sync.Mutex - - spawnOutChanHandlerOnce sync.Once - - // chanCtr is a counter used for identifying output channels on the server side - chanCtr uint64 - - registerCh chan outChanReg -} - -// // -// WebSocket Message utils // -// // - -// nextMessage wait for one message and puts it to the incoming channel -func (c *wsConn) nextMessage() { - msgType, r, err := c.conn.NextReader() - if err != nil { - c.incomingErr = err - close(c.incoming) - return - } - if msgType != websocket.BinaryMessage && msgType != websocket.TextMessage { - c.incomingErr = errors.New("unsupported message type") - close(c.incoming) - return - } - c.incoming <- r -} - -// nextWriter waits for writeLk and invokes the cb callback with WS message -// writer when the lock is acquired -func (c *wsConn) nextWriter(cb func(io.Writer)) { - c.writeLk.Lock() - defer c.writeLk.Unlock() - - wcl, err := c.conn.NextWriter(websocket.TextMessage) - if err != nil { - log.Error("handle me:", err) - return - } - - cb(wcl) - - if err := wcl.Close(); err != nil { - log.Error("handle me:", err) - return - } -} - -func (c *wsConn) sendRequest(req request) error { - c.writeLk.Lock() - defer c.writeLk.Unlock() - if err := c.conn.WriteJSON(req); err != nil { - return err - } - return nil -} - -// // -// Output channels // -// // - -// handleOutChans handles channel communication on the server side -// (forwards channel messages to client) -func (c *wsConn) handleOutChans() { - regV := reflect.ValueOf(c.registerCh) - exitV := reflect.ValueOf(c.exiting) - - cases := []reflect.SelectCase{ - { // registration chan always 0 - Dir: reflect.SelectRecv, - Chan: regV, - }, - { // exit chan always 1 - Dir: reflect.SelectRecv, - Chan: exitV, - }, - } - internal := len(cases) - var caseToID []uint64 - - for { - chosen, val, ok := reflect.Select(cases) - - switch chosen { - case 0: // registration channel - if !ok { - // control channel closed - signals closed connection - // This shouldn't happen, instead the exiting channel should get closed - log.Warn("control channel closed") - return - } - - registration := val.Interface().(outChanReg) - - caseToID = append(caseToID, registration.chID) - cases = append(cases, reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: registration.ch, - }) - - c.nextWriter(func(w io.Writer) { - resp := &response{ - Jsonrpc: "2.0", - ID: registration.reqID, - Result: registration.chID, - } - - if err := json.NewEncoder(w).Encode(resp); err != nil { - log.Error(err) - return - } - }) - - continue - case 1: // exiting channel - if !ok { - // exiting channel closed - signals closed connection - // - // We're not closing any channels as we're on receiving end. - // Also, context cancellation below should take care of any running - // requests - return - } - log.Warn("exiting channel received a message") - continue - } - - if !ok { - // Output channel closed, cleanup, and tell remote that this happened - - id := caseToID[chosen-internal] - - n := len(cases) - 1 - if n > 0 { - cases[chosen] = cases[n] - caseToID[chosen-internal] = caseToID[n-internal] - } - - cases = cases[:n] - caseToID = caseToID[:n-internal] - - if err := c.sendRequest(request{ - Jsonrpc: "2.0", - ID: nil, // notification - Method: chClose, - Params: []param{{v: reflect.ValueOf(id)}}, - }); err != nil { - log.Warnf("closed out channel sendRequest failed: %s", err) - } - continue - } - - // forward message - if err := c.sendRequest(request{ - Jsonrpc: "2.0", - ID: nil, // notification - Method: chValue, - Params: []param{{v: reflect.ValueOf(caseToID[chosen-internal])}, {v: val}}, - }); err != nil { - log.Warnf("sendRequest failed: %s", err) - return - } - } -} - -// handleChanOut registers output channel for forwarding to client -func (c *wsConn) handleChanOut(ch reflect.Value, req int64) error { - c.spawnOutChanHandlerOnce.Do(func() { - go c.handleOutChans() - }) - id := atomic.AddUint64(&c.chanCtr, 1) - - select { - case c.registerCh <- outChanReg{ - reqID: req, - - chID: id, - ch: ch, - }: - return nil - case <-c.exiting: - return xerrors.New("connection closing") - } -} - -// // -// Context.Done propagation // -// // - -// handleCtxAsync handles context lifetimes for client -// TODO: this should be aware of events going through chanHandlers, and quit -// when the related channel is closed. -// This should also probably be a single goroutine, -// Note that not doing this should be fine for now as long as we are using -// contexts correctly (cancelling when async functions are no longer is use) -func (c *wsConn) handleCtxAsync(actx context.Context, id int64) { - <-actx.Done() - - c.sendRequest(request{ - Jsonrpc: "2.0", - Method: wsCancel, - Params: []param{{v: reflect.ValueOf(id)}}, - }) -} - -// cancelCtx is a built-in rpc which handles context cancellation over rpc -func (c *wsConn) cancelCtx(req frame) { - if req.ID != nil { - log.Warnf("%s call with ID set, won't respond", wsCancel) - } - - var id int64 - if err := json.Unmarshal(req.Params[0].data, &id); err != nil { - log.Error("handle me:", err) - return - } - - c.handlingLk.Lock() - defer c.handlingLk.Unlock() - - cf, ok := c.handling[id] - if ok { - cf() - } -} - -// // -// Main Handling logic // -// // - -func (c *wsConn) handleChanMessage(frame frame) { - var chid uint64 - if err := json.Unmarshal(frame.Params[0].data, &chid); err != nil { - log.Error("failed to unmarshal channel id in xrpc.ch.val: %s", err) - return - } - - hnd, ok := c.chanHandlers[chid] - if !ok { - log.Errorf("xrpc.ch.val: handler %d not found", chid) - return - } - - hnd(frame.Params[1].data, true) -} - -func (c *wsConn) handleChanClose(frame frame) { - var chid uint64 - if err := json.Unmarshal(frame.Params[0].data, &chid); err != nil { - log.Error("failed to unmarshal channel id in xrpc.ch.val: %s", err) - return - } - - hnd, ok := c.chanHandlers[chid] - if !ok { - log.Errorf("xrpc.ch.val: handler %d not found", chid) - return - } - - delete(c.chanHandlers, chid) - - hnd(nil, false) -} - -func (c *wsConn) handleResponse(frame frame) { - req, ok := c.inflight[*frame.ID] - if !ok { - log.Error("client got unknown ID in response") - return - } - - if req.retCh != nil && frame.Result != nil { - // output is channel - var chid uint64 - if err := json.Unmarshal(frame.Result, &chid); err != nil { - log.Errorf("failed to unmarshal channel id response: %s, data '%s'", err, string(frame.Result)) - return - } - - var chanCtx context.Context - chanCtx, c.chanHandlers[chid] = req.retCh() - go c.handleCtxAsync(chanCtx, *frame.ID) - } - - req.ready <- clientResponse{ - Jsonrpc: frame.Jsonrpc, - Result: frame.Result, - ID: *frame.ID, - Error: frame.Error, - } - delete(c.inflight, *frame.ID) -} - -func (c *wsConn) handleCall(ctx context.Context, frame frame) { - req := request{ - Jsonrpc: frame.Jsonrpc, - ID: frame.ID, - Meta: frame.Meta, - Method: frame.Method, - Params: frame.Params, - } - - ctx, cancel := context.WithCancel(ctx) - - nextWriter := func(cb func(io.Writer)) { - cb(ioutil.Discard) - } - done := func(keepCtx bool) { - if !keepCtx { - cancel() - } - } - if frame.ID != nil { - nextWriter = c.nextWriter - - c.handlingLk.Lock() - c.handling[*frame.ID] = cancel - c.handlingLk.Unlock() - - done = func(keepctx bool) { - c.handlingLk.Lock() - defer c.handlingLk.Unlock() - - if !keepctx { - cancel() - delete(c.handling, *frame.ID) - } - } - } - - go c.handler.handle(ctx, req, nextWriter, rpcError, done, c.handleChanOut) -} - -// handleFrame handles all incoming messages (calls and responses) -func (c *wsConn) handleFrame(ctx context.Context, frame frame) { - // Get message type by method name: - // "" - response - // "xrpc.*" - builtin - // anything else - incoming remote call - switch frame.Method { - case "": // Response to our call - c.handleResponse(frame) - case wsCancel: - c.cancelCtx(frame) - case chValue: - c.handleChanMessage(frame) - case chClose: - c.handleChanClose(frame) - default: // Remote call - c.handleCall(ctx, frame) - } -} - -func (c *wsConn) closeInFlight() { - for id, req := range c.inflight { - req.ready <- clientResponse{ - Jsonrpc: "2.0", - ID: id, - Error: &respError{ - Message: "handler: websocket connection closed", - Code: 2, - }, - } - } - - c.handlingLk.Lock() - for _, cancel := range c.handling { - cancel() - } - c.handlingLk.Unlock() - - c.inflight = map[int64]clientRequest{} - c.handling = map[int64]context.CancelFunc{} -} - -func (c *wsConn) closeChans() { - for chid := range c.chanHandlers { - hnd := c.chanHandlers[chid] - delete(c.chanHandlers, chid) - hnd(nil, false) - } -} - -func (c *wsConn) handleWsConn(ctx context.Context) { - c.incoming = make(chan io.Reader) - c.inflight = map[int64]clientRequest{} - c.handling = map[int64]context.CancelFunc{} - c.chanHandlers = map[uint64]func(m []byte, ok bool){} - - c.registerCh = make(chan outChanReg) - defer close(c.exiting) - - // //// - - // on close, make sure to return from all pending calls, and cancel context - // on all calls we handle - defer c.closeInFlight() - - // wait for the first message - go c.nextMessage() - for { - select { - case r, ok := <-c.incoming: - if !ok { - if c.incomingErr != nil { - if !websocket.IsCloseError(c.incomingErr, websocket.CloseNormalClosure) { - log.Debugw("websocket error", "error", c.incomingErr) - // connection dropped unexpectedly, do our best to recover it - c.closeInFlight() - c.closeChans() - c.incoming = make(chan io.Reader) // listen again for responses - go func() { - if c.connFactory == nil { // likely the server side, don't try to reconnect - return - } - - var conn *websocket.Conn - for conn == nil { - time.Sleep(c.reconnectInterval) - var err error - if conn, err = c.connFactory(); err != nil { - log.Debugw("websocket connection retry failed", "error", err) - } - } - - c.writeLk.Lock() - c.conn = conn - c.incomingErr = nil - c.writeLk.Unlock() - - go c.nextMessage() - }() - continue - } - } - return // remote closed - } - - // debug util - dump all messages to stderr - // r = io.TeeReader(r, os.Stderr) - - var frame frame - if err := json.NewDecoder(r).Decode(&frame); err != nil { - log.Error("handle me:", err) - return - } - - c.handleFrame(ctx, frame) - go c.nextMessage() - case req := <-c.requests: - c.writeLk.Lock() - if req.req.ID != nil { - if c.incomingErr != nil { // No conn?, immediate fail - req.ready <- clientResponse{ - Jsonrpc: "2.0", - ID: *req.req.ID, - Error: &respError{ - Message: "handler: websocket connection closed", - Code: 2, - }, - } - c.writeLk.Unlock() - break - } - c.inflight[*req.req.ID] = req - } - c.writeLk.Unlock() - if err := c.sendRequest(req.req); err != nil { - log.Errorf("sendReqest failed (Handle me): %s", err) - } - case <-c.stop: - c.writeLk.Lock() - cmsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") - if err := c.conn.WriteMessage(websocket.CloseMessage, cmsg); err != nil { - log.Warn("failed to write close message: ", err) - } - if err := c.conn.Close(); err != nil { - log.Warnw("websocket close error", "error", err) - } - c.writeLk.Unlock() - return - } - } -} diff --git a/lotuspond/api.go b/lotuspond/api.go index ef742b1a2..1f1432ca1 100644 --- a/lotuspond/api.go +++ b/lotuspond/api.go @@ -9,7 +9,8 @@ import ( "golang.org/x/xerrors" - "github.com/filecoin-project/lotus/lib/jsonrpc" + "github.com/filecoin-project/go-jsonrpc" + "github.com/filecoin-project/lotus/node/repo" ) diff --git a/lotuspond/main.go b/lotuspond/main.go index 1ae49dad6..3abcda635 100644 --- a/lotuspond/main.go +++ b/lotuspond/main.go @@ -8,9 +8,9 @@ import ( "path" "strconv" - "github.com/filecoin-project/lotus/lib/jsonrpc" - "gopkg.in/urfave/cli.v2" + + "github.com/filecoin-project/go-jsonrpc" ) const listenAddr = "127.0.0.1:2222" diff --git a/metrics/metrics.go b/metrics/metrics.go index e47caf390..340d57536 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -4,13 +4,14 @@ import ( "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" + + rpcmetrics "github.com/filecoin-project/go-jsonrpc/metrics" ) // Global Tags var ( Version, _ = tag.NewKey("version") Commit, _ = tag.NewKey("commit") - RPCMethod, _ = tag.NewKey("method") PeerID, _ = tag.NewKey("peer_id") FailureType, _ = tag.NewKey("failure_type") MessageFrom, _ = tag.NewKey("message_from") @@ -31,9 +32,6 @@ var ( BlockValidationFailure = stats.Int64("block/failure", "Counter for block validation failures", stats.UnitDimensionless) BlockValidationSuccess = stats.Int64("block/success", "Counter for block validation successes", stats.UnitDimensionless) PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless) - RPCInvalidMethod = stats.Int64("rpc/invalid_method", "Total number of invalid RPC methods called", stats.UnitDimensionless) - RPCRequestError = stats.Int64("rpc/request_error", "Total number of request errors handled", stats.UnitDimensionless) - RPCResponseError = stats.Int64("rpc/response_error", "Total number of responses errors handled", stats.UnitDimensionless) ) var ( @@ -82,26 +80,10 @@ var ( Measure: PeerCount, Aggregation: view.LastValue(), } - // All RPC related metrics should at the very least tag the RPCMethod - RPCInvalidMethodView = &view.View{ - Measure: RPCInvalidMethod, - Aggregation: view.Count(), - TagKeys: []tag.Key{RPCMethod}, - } - RPCRequestErrorView = &view.View{ - Measure: RPCRequestError, - Aggregation: view.Count(), - TagKeys: []tag.Key{RPCMethod}, - } - RPCResponseErrorView = &view.View{ - Measure: RPCResponseError, - Aggregation: view.Count(), - TagKeys: []tag.Key{RPCMethod}, - } ) // DefaultViews is an array of OpenCensus views for metric gathering purposes -var DefaultViews = []*view.View{ +var DefaultViews = append([]*view.View{ InfoView, ChainNodeHeightView, ChainNodeWorkerHeightView, @@ -111,8 +93,4 @@ var DefaultViews = []*view.View{ MessageReceivedView, MessageValidationFailureView, MessageValidationSuccessView, - PeerCountView, - RPCInvalidMethodView, - RPCRequestErrorView, - RPCResponseErrorView, -} + PeerCountView}, rpcmetrics.DefaultViews...) diff --git a/node/impl/remoteworker.go b/node/impl/remoteworker.go index ba5016d14..0645acbb7 100644 --- a/node/impl/remoteworker.go +++ b/node/impl/remoteworker.go @@ -2,6 +2,7 @@ package impl import ( "context" + "github.com/filecoin-project/go-jsonrpc" "net/http" "golang.org/x/xerrors" @@ -11,7 +12,6 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api/client" - "github.com/filecoin-project/lotus/lib/jsonrpc" "github.com/filecoin-project/sector-storage" ) diff --git a/node/node_test.go b/node/node_test.go index a2c7d2c1b..bb66b7a39 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/require" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-jsonrpc" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin" @@ -39,7 +40,6 @@ import ( "github.com/filecoin-project/lotus/chain/wallet" "github.com/filecoin-project/lotus/cmd/lotus-seed/seed" genesis "github.com/filecoin-project/lotus/genesis" - "github.com/filecoin-project/lotus/lib/jsonrpc" "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node/modules" diff --git a/tools/stats/rpc.go b/tools/stats/rpc.go index 7f90ea169..6e00ff910 100644 --- a/tools/stats/rpc.go +++ b/tools/stats/rpc.go @@ -5,6 +5,7 @@ import ( "net/http" "time" + "github.com/filecoin-project/go-jsonrpc" "github.com/filecoin-project/specs-actors/actors/abi" manet "github.com/multiformats/go-multiaddr-net" @@ -16,7 +17,6 @@ import ( "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/lib/jsonrpc" "github.com/filecoin-project/lotus/node/repo" )