From ff4d1b581964ecea53d29c102de44fdd1f8db7b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 22 Jul 2019 20:13:41 +0200 Subject: [PATCH] jsonrpc: Channel support --- api/api.go | 2 + api/struct.go | 6 ++ cli/cmd.go | 26 +++++++ lib/jsonrpc/client.go | 53 +++++++++---- lib/jsonrpc/handler.go | 22 +++++- lib/jsonrpc/server.go | 8 +- lib/jsonrpc/websocket.go | 163 ++++++++++++++++++++++++++++++++++++--- node/api.go | 22 ++++++ 8 files changed, 273 insertions(+), 29 deletions(-) diff --git a/api/api.go b/api/api.go index d2c2ad424..11f55d5f4 100644 --- a/api/api.go +++ b/api/api.go @@ -121,4 +121,6 @@ type API interface { // Version provides information about API provider Version(context.Context) (Version, error) + + TestCh(context.Context) (<-chan int, error) } diff --git a/api/struct.go b/api/struct.go index 7783bcabc..fb4f0991c 100644 --- a/api/struct.go +++ b/api/struct.go @@ -41,9 +41,15 @@ type Struct struct { NetPeers func(context.Context) ([]peer.AddrInfo, error) NetConnect func(context.Context, peer.AddrInfo) error NetAddrsListen func(context.Context) (peer.AddrInfo, error) + + TestCh func(context.Context) (<-chan int, error) } } +func (c *Struct) TestCh(ctx context.Context) (<-chan int, error) { + return c.Internal.TestCh(ctx) +} + func (c *Struct) ClientListImports(ctx context.Context) ([]Import, error) { return c.Internal.ClientListImports(ctx) } diff --git a/cli/cmd.go b/cli/cmd.go index ee964efbe..7bfdbc826 100644 --- a/cli/cmd.go +++ b/cli/cmd.go @@ -2,6 +2,7 @@ package cli import ( "context" + "fmt" "os" "os/signal" "syscall" @@ -67,4 +68,29 @@ var Commands = []*cli.Command{ versionCmd, walletCmd, createMinerCmd, + + { + Name:"testch", + Action: func(cctx *cli.Context) error { + api, err := getAPI(cctx) + if err != nil { + return err + } + ctx := reqContext(cctx) + + c, err := api.TestCh(ctx) + if err != nil { + return err + } + + for { + select { + case n := <-c: + fmt.Println(n) + case <- ctx.Done(): + return nil + } + } + }, + }, } diff --git a/lib/jsonrpc/client.go b/lib/jsonrpc/client.go index d065c6a2c..97b9d943f 100644 --- a/lib/jsonrpc/client.go +++ b/lib/jsonrpc/client.go @@ -34,17 +34,10 @@ func (e *ErrClient) Unwrap(err error) error { return e.err } -type result []byte - -func (p *result) UnmarshalJSON(raw []byte) error { - *p = make([]byte, len(raw)) - copy(*p, raw) - return nil -} type clientResponse struct { Jsonrpc string `json:"jsonrpc"` - Result result `json:"result"` + Result json.RawMessage `json:"result"` ID int64 `json:"id"` Error *respError `json:"error,omitempty"` } @@ -52,6 +45,8 @@ type clientResponse struct { type clientRequest struct { req request ready chan clientResponse + + retCh func() func([]byte, bool) } // ClientCloser is used to close Client from further use @@ -100,7 +95,7 @@ func NewClient(addr string, namespace string, handler interface{}) (ClientCloser out := make([]reflect.Value, nout) if valOut != -1 { - out[valOut] = rval.Elem() + out[valOut] = rval } if errOut != -1 { out[errOut] = reflect.New(errorType).Elem() @@ -130,6 +125,7 @@ func NewClient(addr string, namespace string, handler interface{}) (ClientCloser if ftyp.NumIn() > 0 && ftyp.In(0) == contextType { hasCtx = 1 } + retCh := valOut != -1 && ftyp.Out(valOut).Kind() == reflect.Chan fn := reflect.MakeFunc(ftyp, func(args []reflect.Value) (results []reflect.Value) { id := atomic.AddInt64(&idCtr, 1) @@ -140,6 +136,31 @@ func NewClient(addr string, namespace string, handler interface{}) (ClientCloser } } + var retVal reflect.Value + var chCtor func() func([]byte, bool) + if retCh { + chCtor = func() func([]byte, bool) { + // unpack chan type to make sure it's reflect.BothDir + ctyp := reflect.ChanOf(reflect.BothDir, ftyp.Out(valOut).Elem()) + ch := reflect.MakeChan(ctyp, 0) // todo: buffer? + retVal = ch.Convert(ftyp.Out(valOut)) + + return func(result []byte, ok bool) { + if !ok { + ch.Close() + return + } + val := reflect.New(ftyp.Out(valOut).Elem()) + if err := json.Unmarshal(result, val.Interface()); err != nil { + log.Errorf("error unmarshaling chan response: %s", err) + return + } + + ch.Send(val.Elem()) // todo: select on ctx + } + } + } + req := request{ Jsonrpc: "2.0", ID: &id, @@ -151,6 +172,8 @@ func NewClient(addr string, namespace string, handler interface{}) (ClientCloser requests <- clientRequest{ req: req, ready: rchan, + + retCh: chCtor, } var ctxDone <-chan struct{} var resp clientResponse @@ -159,6 +182,7 @@ func NewClient(addr string, namespace string, handler interface{}) (ClientCloser ctxDone = args[0].Interface().(context.Context).Done() } + // wait for response, handle context cancellation loop: for { select { @@ -176,24 +200,25 @@ func NewClient(addr string, namespace string, handler interface{}) (ClientCloser } } } - var rval reflect.Value - if valOut != -1 { - rval = reflect.New(ftyp.Out(valOut)) + if valOut != -1 && !retCh { + retVal = reflect.New(ftyp.Out(valOut)) if resp.Result != nil { log.Debugw("rpc result", "type", ftyp.Out(valOut)) - if err := json.Unmarshal(resp.Result, rval.Interface()); err != nil { + if err := json.Unmarshal(resp.Result, retVal.Interface()); err != nil { return processError(xerrors.Errorf("unmarshaling result: %w", err)) } } + + retVal = retVal.Elem() } if resp.ID != *req.ID { return processError(errors.New("request and response id didn't match")) } - return processResponse(resp, rval) + return processResponse(resp, retVal) }) val.Elem().Field(i).Set(fn) diff --git a/lib/jsonrpc/handler.go b/lib/jsonrpc/handler.go index 97607dc50..80d2471f7 100644 --- a/lib/jsonrpc/handler.go +++ b/lib/jsonrpc/handler.go @@ -95,6 +95,7 @@ func (h handlers) register(namespace string, r interface{}) { // Handle type rpcErrFunc func(w func(func(io.Writer)), req *request, code int, err error) +type chanOut func(reflect.Value) interface{} func (h handlers) handleReader(ctx context.Context, r io.Reader, w io.Writer, rpcError rpcErrFunc) { wf := func(cb func(io.Writer)) { @@ -107,20 +108,28 @@ func (h handlers) handleReader(ctx context.Context, r io.Reader, w io.Writer, rp return } - h.handle(ctx, req, wf, rpcError, func() {}) + h.handle(ctx, req, wf, rpcError, func(bool) {}, nil) } -func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer)), rpcError rpcErrFunc, done func()) { - defer done() - +func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer)), rpcError rpcErrFunc, done func(keepCtx bool), chOut chanOut) { handler, ok := h[req.Method] if !ok { rpcError(w, &req, rpcMethodNotFound, fmt.Errorf("method '%s' not found", req.Method)) + done(false) return } if len(req.Params) != handler.nParams { rpcError(w, &req, rpcInvalidParams, fmt.Errorf("wrong param count")) + done(false) + return + } + + outCh := handler.valOut != -1 && handler.handlerFunc.Type().Out(handler.valOut).Kind() == reflect.Chan + defer done(outCh) + + if chOut == nil && outCh { + rpcError(w, &req, rpcMethodNotFound, fmt.Errorf("method '%s' not supported in this mode (no out channel support)", req.Method)) return } @@ -165,6 +174,11 @@ func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer } if handler.valOut != -1 { resp.Result = callResult[handler.valOut].Interface() + + if reflect.TypeOf(resp.Result).Kind() == reflect.Chan { + //noinspection GoNilness // already checked above + resp.Result = chOut(callResult[handler.valOut]) + } } w(func(w io.Writer) { diff --git a/lib/jsonrpc/server.go b/lib/jsonrpc/server.go index fdf3a5a67..ce6b3ca84 100644 --- a/lib/jsonrpc/server.go +++ b/lib/jsonrpc/server.go @@ -60,6 +60,8 @@ func rpcError(wf func(func(io.Writer)), req *request, code int, err error) { hw.WriteHeader(500) } + log.Warnf("rpc error: %s", err) + if req.ID == nil { // notification return } @@ -73,7 +75,11 @@ func rpcError(wf func(func(io.Writer)), req *request, code int, err error) { }, } - _ = json.NewEncoder(w).Encode(resp) + err = json.NewEncoder(w).Encode(resp) + if err != nil { + log.Warnf("failed to write rpc error: %s", err) + return + } }) } diff --git a/lib/jsonrpc/websocket.go b/lib/jsonrpc/websocket.go index f52c0591c..26d8cba6b 100644 --- a/lib/jsonrpc/websocket.go +++ b/lib/jsonrpc/websocket.go @@ -6,12 +6,16 @@ import ( "errors" "io" "io/ioutil" + "reflect" "sync" + "sync/atomic" "github.com/gorilla/websocket" ) const wsCancel = "xrpc.cancel" +const chValue = "xrpc.ch.val" +const chClose = "xrpc.ch.close" type frame struct { // common @@ -23,7 +27,7 @@ type frame struct { Params []param `json:"params,omitempty"` // response - Result result `json:"result,omitempty"` + Result json.RawMessage `json:"result,omitempty"` Error *respError `json:"error,omitempty"` } @@ -31,6 +35,7 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r incoming := make(chan io.Reader) var incErr error + // nextMessage wait for one message and puts it to the incoming channel nextMessage := func() { mtype, r, err := conn.NextReader() if err != nil { @@ -47,6 +52,9 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r } var writeLk sync.Mutex + + // nextWriter waits for writeLk and invokes the cb callback with WS message + // writer when the lock is acquired nextWriter := func(cb func(io.Writer)) { writeLk.Lock() defer writeLk.Unlock() @@ -65,12 +73,113 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r } } + sendReq := func(req request) { + writeLk.Lock() + if err := conn.WriteJSON(req); err != nil { + log.Error("handle me:", err) + writeLk.Unlock() + return + } + writeLk.Unlock() + } + + // wait for the first message go nextMessage() + // inflight are requests we sent to the remote inflight := map[int64]clientRequest{} + + // handling are the calls we handle handling := map[int64]context.CancelFunc{} var handlingLk sync.Mutex + // //// + // Subscriptions (func() <-chan Typ - like methods) + + var chOnce sync.Once + var outId uint64 + type chReg struct { id uint64; ch reflect.Value } + registerCh := make(chan chReg) + defer close(registerCh) + + handleOutChans := func() { + regV := reflect.ValueOf(registerCh) + + cases := []reflect.SelectCase{ + { // registration chan always 0 + Dir: reflect.SelectRecv, + Chan: regV, + }, + } + var caseToId []uint64 + + + for { + chosen, val, ok := reflect.Select(cases) + + if chosen == 0 { // control channel + if !ok { + // not closing any channels as we're on receiving end. + // Also, context cancellation below should take care of any running + // requests + return + } + + registration := val.Interface().(chReg) + + caseToId = append(caseToId, registration.id) + cases = append(cases, reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: registration.ch, + }) + + continue + } + + + if !ok { + n := len(caseToId) + if n > 0 { + cases[chosen] = cases[n] + caseToId[chosen - 1] = caseToId[n - 1] + } + + cases = cases[:n] + caseToId = caseToId[:n-1] + continue + } + + sendReq(request{ + Jsonrpc: "2.0", + ID: nil, // notification + Method: chValue, + Params: []param{{v: reflect.ValueOf(caseToId[chosen - 1])}, {v: val}}, + }) + } + } + + handleChanOut := func(ch reflect.Value) interface{} { + chOnce.Do(func() { + go handleOutChans() + }) + id := atomic.AddUint64(&outId, 1) + + registerCh <- chReg{ + id: id, + ch: ch, + } + + return id + } + + // client side subs + + chanHandlers := map[uint64]func(m []byte, ok bool){} + + // //// + + // on close, make sure to return from all pending calls, and cancel context + // on all calls we handle defer func() { for id, req := range inflight { req.ready <- clientResponse{ @@ -89,6 +198,7 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r } }() + // cancelCtx is a built-in rpc which handles context cancellation over rpc cancelCtx := func(req frame) { if req.ID != nil { log.Warnf("%s call with ID set, won't respond", wsCancel) @@ -125,6 +235,10 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r return } + // Get message type by method name: + // "" - response + // "xrpc.*" - builtin + // anything else - incoming remote call switch frame.Method { case "": // Response to our call req, ok := inflight[*frame.ID] @@ -133,6 +247,17 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r continue } + if req.retCh != nil { + // output is channel + var chid uint64 + if err := json.Unmarshal(frame.Result, &chid); err != nil { + log.Error("failed to unmarshal channel id response: %s", err) + continue + } + + chanHandlers[chid] = req.retCh() + } + req.ready <- clientResponse{ Jsonrpc: frame.Jsonrpc, Result: frame.Result, @@ -142,6 +267,20 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r delete(inflight, *frame.ID) case wsCancel: cancelCtx(frame) + case chValue: + var chid uint64 + if err := json.Unmarshal(frame.Params[0].data, &chid); err != nil { + log.Error("failed to unmarshal channel id in xrpc.ch.val: %s", err) + continue + } + + hnd, ok := chanHandlers[chid] + if !ok { + log.Error("xrpc.ch.val: handler %d not found", chid) + continue + } + + hnd(frame.Params[1].data, true) default: // Remote call req := request{ Jsonrpc: frame.Jsonrpc, @@ -155,7 +294,11 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r nw := func(cb func(io.Writer)) { cb(ioutil.Discard) } - done := cf + done := func(keepctx bool) { + if !keepctx { + cf() + } + } if frame.ID != nil { nw = nextWriter @@ -163,27 +306,27 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r handling[*frame.ID] = cf handlingLk.Unlock() - done = func() { + done = func(keepctx bool) { handlingLk.Lock() defer handlingLk.Unlock() - cf() + if !keepctx { + cf() + } + delete(handling, *frame.ID) } } - go handler.handle(ctx, req, nw, rpcError, done) + go handler.handle(ctx, req, nw, rpcError, done, handleChanOut) } - go nextMessage() + go nextMessage() // TODO: fix on errors case req := <-requests: if req.req.ID != nil { inflight[*req.req.ID] = req } - if err := conn.WriteJSON(req.req); err != nil { - log.Error("handle me:", err) - return - } + sendReq(req.req) case <-stop: if err := conn.Close(); err != nil { log.Debugf("websocket close error", "error", err) diff --git a/node/api.go b/node/api.go index ee0954ee6..a1e6dfe8a 100644 --- a/node/api.go +++ b/node/api.go @@ -2,6 +2,8 @@ package node import ( "context" + "fmt" + "time" "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/build" @@ -28,6 +30,26 @@ type API struct { Wallet *chain.Wallet } +func (a *API) TestCh(ctx context.Context) (<-chan int, error) { + out := make(chan int) + go func() { + defer close(out) + var n int + for { + time.Sleep(time.Millisecond * 100) + n++ + select { + case <-ctx.Done(): + fmt.Println("CTXCANCEL!") + return + case out <- n: + } + } + }() + + return out, nil +} + func (a *API) ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error { if err := a.Chain.AddBlock(blk.Header); err != nil { return err