Merge pull request #1155 from jsign/issue127_2

fix issue #127
This commit is contained in:
Łukasz Magiera 2020-02-24 17:22:58 +01:00 committed by GitHub
commit 9f0e0ced5e
3 changed files with 150 additions and 50 deletions

View File

@ -10,6 +10,7 @@ import (
"reflect" "reflect"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
@ -18,11 +19,15 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
) )
var log = logging.Logger("rpc") const (
methodRetryFrequency = time.Second * 3
)
var ( var (
errorType = reflect.TypeOf(new(error)).Elem() errorType = reflect.TypeOf(new(error)).Elem()
contextType = reflect.TypeOf(new(context.Context)).Elem() contextType = reflect.TypeOf(new(context.Context)).Elem()
log = logging.Logger("rpc")
) )
// ErrClient is an error which occurred on the client side the library // ErrClient is an error which occurred on the client side the library
@ -78,12 +83,21 @@ type client struct {
// NewMergeClient is like NewClient, but allows to specify multiple structs // NewMergeClient is like NewClient, but allows to specify multiple structs
// to be filled in the same namespace, using one connection // to be filled in the same namespace, using one connection
func NewMergeClient(addr string, namespace string, outs []interface{}, requestHeader http.Header) (ClientCloser, error) { func NewMergeClient(addr string, namespace string, outs []interface{}, requestHeader http.Header, opts ...Option) (ClientCloser, error) {
conn, _, err := websocket.DefaultDialer.Dial(addr, requestHeader) connFactory := func() (*websocket.Conn, error) {
conn, _, err := websocket.DefaultDialer.Dial(addr, requestHeader)
return conn, err
}
conn, err := connFactory()
if err != nil { if err != nil {
return nil, err return nil, err
} }
var config Config
for _, o := range opts {
o(&config)
}
c := client{ c := client{
namespace: namespace, namespace: namespace,
} }
@ -95,11 +109,13 @@ func NewMergeClient(addr string, namespace string, outs []interface{}, requestHe
handlers := map[string]rpcHandler{} handlers := map[string]rpcHandler{}
go (&wsConn{ go (&wsConn{
conn: conn, conn: conn,
handler: handlers, connFactory: connFactory,
requests: c.requests, reconnectInterval: config.ReconnectInterval,
stop: stop, handler: handlers,
exiting: exiting, requests: c.requests,
stop: stop,
exiting: exiting,
}).handleWsConn(context.TODO()) }).handleWsConn(context.TODO())
for _, handler := range outs { for _, handler := range outs {
@ -269,6 +285,8 @@ type rpcFunc struct {
hasCtx int hasCtx int
retCh bool retCh bool
retry bool
} }
func (fn *rpcFunc) processResponse(resp clientResponse, rval reflect.Value) []reflect.Value { func (fn *rpcFunc) processResponse(resp clientResponse, rval reflect.Value) []reflect.Value {
@ -344,27 +362,38 @@ func (fn *rpcFunc) handleRpcCall(args []reflect.Value) (results []reflect.Value)
} }
} }
resp, err := fn.client.sendRequest(ctx, req, chCtor) var resp clientResponse
if err != nil { var err error
return fn.processError(fmt.Errorf("sendRequest failed: %w", err)) // keep retrying if got a forced closed websocket conn and calling method
} // has retry annotation
for {
if resp.ID != *req.ID { resp, err = fn.client.sendRequest(ctx, req, chCtor)
return fn.processError(xerrors.New("request and response id didn't match")) if err != nil {
} return fn.processError(fmt.Errorf("sendRequest failed: %w", err))
if fn.valOut != -1 && !fn.retCh {
val := reflect.New(fn.ftyp.Out(fn.valOut))
if resp.Result != nil {
log.Debugw("rpc result", "type", fn.ftyp.Out(fn.valOut))
if err := json.Unmarshal(resp.Result, val.Interface()); err != nil {
log.Warnw("unmarshaling failed", "message", string(resp.Result))
return fn.processError(xerrors.Errorf("unmarshaling result: %w", err))
}
} }
retVal = func() reflect.Value { return val.Elem() } if resp.ID != *req.ID {
return fn.processError(xerrors.New("request and response id didn't match"))
}
if fn.valOut != -1 && !fn.retCh {
val := reflect.New(fn.ftyp.Out(fn.valOut))
if resp.Result != nil {
log.Debugw("rpc result", "type", fn.ftyp.Out(fn.valOut))
if err := json.Unmarshal(resp.Result, val.Interface()); err != nil {
log.Warnw("unmarshaling failed", "message", string(resp.Result))
return fn.processError(xerrors.Errorf("unmarshaling result: %w", err))
}
}
retVal = func() reflect.Value { return val.Elem() }
}
retry := resp.Error != nil && resp.Error.Code == 2 && fn.retry
if !retry {
break
}
time.Sleep(methodRetryFrequency)
} }
return fn.processResponse(resp, retVal()) return fn.processResponse(resp, retVal())
@ -380,6 +409,7 @@ func (c *client) makeRpcFunc(f reflect.StructField) (reflect.Value, error) {
client: c, client: c,
ftyp: ftyp, ftyp: ftyp,
name: f.Name, name: f.Name,
retry: f.Tag.Get("retry") == "true",
} }
fun.valOut, fun.errOut, fun.nout = processFuncOut(ftyp) fun.valOut, fun.errOut, fun.nout = processFuncOut(ftyp)

19
lib/jsonrpc/options.go Normal file
View File

@ -0,0 +1,19 @@
package jsonrpc
import "time"
type Config struct {
ReconnectInterval time.Duration
}
var defaultConfig = Config{
ReconnectInterval: time.Second * 5,
}
type Option func(c *Config)
func WithReconnectInterval(d time.Duration) func(c *Config) {
return func(c *Config) {
c.ReconnectInterval = d
}
}

View File

@ -9,6 +9,7 @@ import (
"reflect" "reflect"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -42,11 +43,13 @@ type outChanReg struct {
type wsConn struct { type wsConn struct {
// outside params // outside params
conn *websocket.Conn conn *websocket.Conn
handler handlers connFactory func() (*websocket.Conn, error)
requests <-chan clientRequest reconnectInterval time.Duration
stop <-chan struct{} handler handlers
exiting chan struct{} requests <-chan clientRequest
stop <-chan struct{}
exiting chan struct{}
// incoming messages // incoming messages
incoming chan io.Reader incoming chan io.Reader
@ -419,6 +422,35 @@ func (c *wsConn) handleFrame(ctx context.Context, frame frame) {
} }
} }
func (c *wsConn) closeInFlight() {
for id, req := range c.inflight {
req.ready <- clientResponse{
Jsonrpc: "2.0",
ID: id,
Error: &respError{
Message: "handler: websocket connection closed",
Code: 2,
},
}
c.handlingLk.Lock()
for _, cancel := range c.handling {
cancel()
}
c.handlingLk.Unlock()
}
c.inflight = map[int64]clientRequest{}
c.handling = map[int64]context.CancelFunc{}
}
func (c *wsConn) closeChans() {
for chid := range c.chanHandlers {
hnd := c.chanHandlers[chid]
delete(c.chanHandlers, chid)
hnd(nil, false)
}
}
func (c *wsConn) handleWsConn(ctx context.Context) { func (c *wsConn) handleWsConn(ctx context.Context) {
c.incoming = make(chan io.Reader) c.incoming = make(chan io.Reader)
c.inflight = map[int64]clientRequest{} c.inflight = map[int64]clientRequest{}
@ -432,27 +464,10 @@ func (c *wsConn) handleWsConn(ctx context.Context) {
// on close, make sure to return from all pending calls, and cancel context // on close, make sure to return from all pending calls, and cancel context
// on all calls we handle // on all calls we handle
defer func() { defer c.closeInFlight()
for id, req := range c.inflight {
req.ready <- clientResponse{
Jsonrpc: "2.0",
ID: id,
Error: &respError{
Message: "handler: websocket connection closed",
},
}
c.handlingLk.Lock()
for _, cancel := range c.handling {
cancel()
}
c.handlingLk.Unlock()
}
}()
// wait for the first message // wait for the first message
go c.nextMessage() go c.nextMessage()
for { for {
select { select {
case r, ok := <-c.incoming: case r, ok := <-c.incoming:
@ -460,6 +475,28 @@ func (c *wsConn) handleWsConn(ctx context.Context) {
if c.incomingErr != nil { if c.incomingErr != nil {
if !websocket.IsCloseError(c.incomingErr, websocket.CloseNormalClosure) { if !websocket.IsCloseError(c.incomingErr, websocket.CloseNormalClosure) {
log.Debugw("websocket error", "error", c.incomingErr) log.Debugw("websocket error", "error", c.incomingErr)
// connection dropped unexpectedly, do our best to recover it
c.closeInFlight()
c.closeChans()
c.incoming = make(chan io.Reader) // listen again for responses
go func() {
var conn *websocket.Conn
for conn == nil {
time.Sleep(c.reconnectInterval)
var err error
if conn, err = c.connFactory(); err != nil {
log.Debugw("websocket connection retried failed", "error", err)
}
}
c.writeLk.Lock()
c.conn = conn
c.incomingErr = nil
c.writeLk.Unlock()
go c.nextMessage()
}()
continue
} }
} }
return // remote closed return // remote closed
@ -477,9 +514,23 @@ func (c *wsConn) handleWsConn(ctx context.Context) {
c.handleFrame(ctx, frame) c.handleFrame(ctx, frame)
go c.nextMessage() go c.nextMessage()
case req := <-c.requests: case req := <-c.requests:
c.writeLk.Lock()
if req.req.ID != nil { if req.req.ID != nil {
if c.incomingErr != nil { // No conn?, immediate fail
req.ready <- clientResponse{
Jsonrpc: "2.0",
ID: *req.req.ID,
Error: &respError{
Message: "handler: websocket connection closed",
Code: 2,
},
}
c.writeLk.Unlock()
break
}
c.inflight[*req.req.ID] = req c.inflight[*req.req.ID] = req
} }
c.writeLk.Unlock()
c.sendRequest(req.req) c.sendRequest(req.req)
case <-c.stop: case <-c.stop:
c.writeLk.Lock() c.writeLk.Lock()