From dbd51ff8e729284779ce0d9827d1c5f3d1d0a6df Mon Sep 17 00:00:00 2001 From: jsign Date: Fri, 24 Jan 2020 21:13:14 -0300 Subject: [PATCH] fix issue #127 Signed-off-by: jsign --- lib/jsonrpc/client.go | 78 ++++++++++++++++++++++----------- lib/jsonrpc/options.go | 19 ++++++++ lib/jsonrpc/websocket.go | 93 ++++++++++++++++++++++++++++++---------- 3 files changed, 141 insertions(+), 49 deletions(-) create mode 100644 lib/jsonrpc/options.go diff --git a/lib/jsonrpc/client.go b/lib/jsonrpc/client.go index 018a9f681..24e728736 100644 --- a/lib/jsonrpc/client.go +++ b/lib/jsonrpc/client.go @@ -10,6 +10,7 @@ import ( "reflect" "sync" "sync/atomic" + "time" "github.com/gorilla/websocket" logging "github.com/ipfs/go-log/v2" @@ -78,12 +79,21 @@ type client struct { // NewMergeClient is like NewClient, but allows to specify multiple structs // to be filled in the same namespace, using one connection -func NewMergeClient(addr string, namespace string, outs []interface{}, requestHeader http.Header) (ClientCloser, error) { - conn, _, err := websocket.DefaultDialer.Dial(addr, requestHeader) +func NewMergeClient(addr string, namespace string, outs []interface{}, requestHeader http.Header, opts ...Option) (ClientCloser, error) { + connFactory := func() (*websocket.Conn, error) { + conn, _, err := websocket.DefaultDialer.Dial(addr, requestHeader) + return conn, err + } + conn, err := connFactory() if err != nil { return nil, err } + var config Config + for _, o := range opts { + o(&config) + } + c := client{ namespace: namespace, } @@ -95,11 +105,13 @@ func NewMergeClient(addr string, namespace string, outs []interface{}, requestHe handlers := map[string]rpcHandler{} go (&wsConn{ - conn: conn, - handler: handlers, - requests: c.requests, - stop: stop, - exiting: exiting, + conn: conn, + connFactory: connFactory, + reconnectInterval: config.ReconnectInterval, + handler: handlers, + requests: c.requests, + stop: stop, + exiting: exiting, }).handleWsConn(context.TODO()) for _, handler := range outs { @@ -269,6 +281,8 @@ type rpcFunc struct { hasCtx int retCh bool + + retry bool } func (fn *rpcFunc) processResponse(resp clientResponse, rval reflect.Value) []reflect.Value { @@ -344,27 +358,38 @@ func (fn *rpcFunc) handleRpcCall(args []reflect.Value) (results []reflect.Value) } } - resp, err := fn.client.sendRequest(ctx, req, chCtor) - if err != nil { - return fn.processError(fmt.Errorf("sendRequest failed: %w", err)) - } - - if resp.ID != *req.ID { - return fn.processError(xerrors.New("request and response id didn't match")) - } - - if fn.valOut != -1 && !fn.retCh { - val := reflect.New(fn.ftyp.Out(fn.valOut)) - - if resp.Result != nil { - log.Debugw("rpc result", "type", fn.ftyp.Out(fn.valOut)) - if err := json.Unmarshal(resp.Result, val.Interface()); err != nil { - log.Warnw("unmarshaling failed", "message", string(resp.Result)) - return fn.processError(xerrors.Errorf("unmarshaling result: %w", err)) - } + var resp clientResponse + var err error + // keep retrying if got a forced closed websocket conn and calling method + // has retry annotation + for { + resp, err = fn.client.sendRequest(ctx, req, chCtor) + if err != nil { + return fn.processError(fmt.Errorf("sendRequest failed: %w", err)) } - retVal = func() reflect.Value { return val.Elem() } + if resp.ID != *req.ID { + return fn.processError(xerrors.New("request and response id didn't match")) + } + + if fn.valOut != -1 && !fn.retCh { + val := reflect.New(fn.ftyp.Out(fn.valOut)) + + if resp.Result != nil { + log.Debugw("rpc result", "type", fn.ftyp.Out(fn.valOut)) + if err := json.Unmarshal(resp.Result, val.Interface()); err != nil { + log.Warnw("unmarshaling failed", "message", string(resp.Result)) + return fn.processError(xerrors.Errorf("unmarshaling result: %w", err)) + } + } + + retVal = func() reflect.Value { return val.Elem() } + } + retry := resp.Error != nil && resp.Error.Code == 0 && fn.retry + if !retry { + break + } + time.Sleep(time.Second * 3) } return fn.processResponse(resp, retVal()) @@ -380,6 +405,7 @@ func (c *client) makeRpcFunc(f reflect.StructField) (reflect.Value, error) { client: c, ftyp: ftyp, name: f.Name, + retry: f.Tag.Get("retry") == "true", } fun.valOut, fun.errOut, fun.nout = processFuncOut(ftyp) diff --git a/lib/jsonrpc/options.go b/lib/jsonrpc/options.go new file mode 100644 index 000000000..50df15d96 --- /dev/null +++ b/lib/jsonrpc/options.go @@ -0,0 +1,19 @@ +package jsonrpc + +import "time" + +type Config struct { + ReconnectInterval time.Duration +} + +var defaultConfig = Config{ + ReconnectInterval: time.Second * 5, +} + +type Option func(c *Config) + +func WithReconnectInterval(d time.Duration) func(c *Config) { + return func(c *Config) { + c.ReconnectInterval = d + } +} diff --git a/lib/jsonrpc/websocket.go b/lib/jsonrpc/websocket.go index da4f59d6e..cbf5cb0ac 100644 --- a/lib/jsonrpc/websocket.go +++ b/lib/jsonrpc/websocket.go @@ -9,6 +9,7 @@ import ( "reflect" "sync" "sync/atomic" + "time" "github.com/gorilla/websocket" "golang.org/x/xerrors" @@ -42,11 +43,13 @@ type outChanReg struct { type wsConn struct { // outside params - conn *websocket.Conn - handler handlers - requests <-chan clientRequest - stop <-chan struct{} - exiting chan struct{} + conn *websocket.Conn + connFactory func() (*websocket.Conn, error) + reconnectInterval time.Duration + handler handlers + requests <-chan clientRequest + stop <-chan struct{} + exiting chan struct{} // incoming messages incoming chan io.Reader @@ -419,6 +422,32 @@ func (c *wsConn) handleFrame(ctx context.Context, frame frame) { } } +func (c *wsConn) closeInFlight() { + for id, req := range c.inflight { + req.ready <- clientResponse{ + Jsonrpc: "2.0", + ID: id, + Error: &respError{ + Message: "handler: websocket connection closed", + }, + } + + c.handlingLk.Lock() + for _, cancel := range c.handling { + cancel() + } + c.handlingLk.Unlock() + } +} + +func (c *wsConn) closeChans() { + for chid := range c.chanHandlers { + hnd := c.chanHandlers[chid] + delete(c.chanHandlers, chid) + hnd(nil, false) + } +} + func (c *wsConn) handleWsConn(ctx context.Context) { c.incoming = make(chan io.Reader) c.inflight = map[int64]clientRequest{} @@ -432,27 +461,10 @@ func (c *wsConn) handleWsConn(ctx context.Context) { // on close, make sure to return from all pending calls, and cancel context // on all calls we handle - defer func() { - for id, req := range c.inflight { - req.ready <- clientResponse{ - Jsonrpc: "2.0", - ID: id, - Error: &respError{ - Message: "handler: websocket connection closed", - }, - } - - c.handlingLk.Lock() - for _, cancel := range c.handling { - cancel() - } - c.handlingLk.Unlock() - } - }() + defer c.closeInFlight() // wait for the first message go c.nextMessage() - for { select { case r, ok := <-c.incoming: @@ -460,6 +472,28 @@ func (c *wsConn) handleWsConn(ctx context.Context) { if c.incomingErr != nil { if !websocket.IsCloseError(c.incomingErr, websocket.CloseNormalClosure) { log.Debugw("websocket error", "error", c.incomingErr) + // connection dropped unexpectedly, do our best to recover it + c.closeInFlight() + c.closeChans() + c.incoming = make(chan io.Reader) // listen again for responses + go func() { + var conn *websocket.Conn + for conn == nil { + time.Sleep(c.reconnectInterval) + var err error + if conn, err = c.connFactory(); err != nil { + log.Debugw("websocket connection retried failed", "error", err) + } + } + + c.writeLk.Lock() + c.conn = conn + c.incomingErr = nil + c.writeLk.Unlock() + + go c.nextMessage() + }() + continue } } return // remote closed @@ -477,9 +511,22 @@ func (c *wsConn) handleWsConn(ctx context.Context) { c.handleFrame(ctx, frame) go c.nextMessage() case req := <-c.requests: + c.writeLk.Lock() if req.req.ID != nil { + if c.incomingErr != nil { // No conn?, immediate fail + req.ready <- clientResponse{ + Jsonrpc: "2.0", + ID: *req.req.ID, + Error: &respError{ + Message: "handler: websocket connection closed", + }, + } + c.writeLk.Unlock() + break + } c.inflight[*req.req.ID] = req } + c.writeLk.Unlock() c.sendRequest(req.req) case <-c.stop: c.writeLk.Lock()