jsonrpc: Test hanging ctx on unclean disconnect

This commit is contained in:
Łukasz Magiera 2020-05-01 21:30:32 +02:00
parent f9be73c73e
commit 971282a9a5
3 changed files with 82 additions and 6 deletions

View File

@ -84,20 +84,25 @@ type client struct {
// NewMergeClient is like NewClient, but allows to specify multiple structs
// to be filled in the same namespace, using one connection
func NewMergeClient(addr string, namespace string, outs []interface{}, requestHeader http.Header, opts ...Option) (ClientCloser, error) {
var config Config
for _, o := range opts {
o(&config)
}
connFactory := func() (*websocket.Conn, error) {
conn, _, err := websocket.DefaultDialer.Dial(addr, requestHeader)
return conn, err
}
if config.proxyConnFactory != nil {
connFactory = config.proxyConnFactory(connFactory)
}
conn, err := connFactory()
if err != nil {
return nil, err
}
var config Config
for _, o := range opts {
o(&config)
}
c := client{
namespace: namespace,
}

View File

@ -1,9 +1,15 @@
package jsonrpc
import "time"
import (
"time"
"github.com/gorilla/websocket"
)
type Config struct {
ReconnectInterval time.Duration
proxyConnFactory func(func() (*websocket.Conn, error)) func() (*websocket.Conn, error) // for testing
}
var defaultConfig = Config{

View File

@ -12,6 +12,7 @@ import (
"testing"
"time"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/require"
)
@ -275,10 +276,12 @@ func TestUnmarshalableResult(t *testing.T) {
type ChanHandler struct {
wait chan struct{}
ctxdone <-chan struct{}
}
func (h *ChanHandler) Sub(ctx context.Context, i int, eq int) (<-chan int, error) {
out := make(chan int)
h.ctxdone = ctx.Done()
go func() {
defer close(out)
@ -433,6 +436,68 @@ func TestChanServerClose(t *testing.T) {
require.Equal(t, false, ok)
}
func TestServerChanLockClose(t *testing.T) {
var client struct {
Sub func(context.Context, int, int) (<-chan int, error)
}
serverHandler := &ChanHandler{
wait: make(chan struct{}),
}
rpcServer := NewServer()
rpcServer.Register("ChanHandler", serverHandler)
testServ := httptest.NewServer(rpcServer)
var closeConn func() error
_, err := NewMergeClient("ws://"+testServ.Listener.Addr().String(),
"ChanHandler",
[]interface{}{&client}, nil,
func(c *Config) {
c.proxyConnFactory = func(f func() (*websocket.Conn, error)) func() (*websocket.Conn, error) {
return func() (*websocket.Conn, error) {
c, err := f()
if err != nil {
return nil, err
}
closeConn = c.UnderlyingConn().Close
return c, nil
}
}
})
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// sub
sub, err := client.Sub(ctx, 2, -1)
require.NoError(t, err)
// recv one
go func() {
serverHandler.wait <- struct{}{}
}()
require.Equal(t, 2, <-sub)
for i := 0; i < 100; i++ {
serverHandler.wait <- struct{}{}
}
if err := closeConn(); err != nil {
t.Fatal(err)
}
<-serverHandler.ctxdone
}
func TestControlChanDeadlock(t *testing.T) {
for r := 0; r < 20; r++ {
testControlChanDeadlock(t)