jsonrpc: basic channel test
This commit is contained in:
parent
dda1dfdc80
commit
527ab7100a
@ -10,7 +10,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type SimpleServerHandler struct {
|
||||
@ -73,37 +73,37 @@ func TestRPC(t *testing.T) {
|
||||
StringMatch func(t TestType, i2 int64) (out TestOut, err error)
|
||||
}
|
||||
closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &client)
|
||||
assert.NoError(t, err)
|
||||
require.NoError(t, err)
|
||||
defer closer()
|
||||
|
||||
// Add(int) error
|
||||
|
||||
assert.NoError(t, client.Add(2))
|
||||
assert.Equal(t, 2, serverHandler.n)
|
||||
require.NoError(t, client.Add(2))
|
||||
require.Equal(t, 2, serverHandler.n)
|
||||
|
||||
err = client.Add(-3546)
|
||||
assert.EqualError(t, err, "test")
|
||||
require.EqualError(t, err, "test")
|
||||
|
||||
// AddGet(int) int
|
||||
|
||||
n := client.AddGet(3)
|
||||
assert.Equal(t, 5, n)
|
||||
assert.Equal(t, 5, serverHandler.n)
|
||||
require.Equal(t, 5, n)
|
||||
require.Equal(t, 5, serverHandler.n)
|
||||
|
||||
// StringMatch
|
||||
|
||||
o, err := client.StringMatch(TestType{S: "0"}, 0)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, "0", o.S)
|
||||
assert.Equal(t, 0, o.I)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "0", o.S)
|
||||
require.Equal(t, 0, o.I)
|
||||
|
||||
_, err = client.StringMatch(TestType{S: "5"}, 5)
|
||||
assert.EqualError(t, err, ":(")
|
||||
require.EqualError(t, err, ":(")
|
||||
|
||||
o, err = client.StringMatch(TestType{S: "8", I: 8}, 8)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, "8", o.S)
|
||||
assert.Equal(t, 8, o.I)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "8", o.S)
|
||||
require.Equal(t, 8, o.I)
|
||||
|
||||
// Invalid client handlers
|
||||
|
||||
@ -111,18 +111,18 @@ func TestRPC(t *testing.T) {
|
||||
Add func(int)
|
||||
}
|
||||
closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &noret)
|
||||
assert.NoError(t, err)
|
||||
require.NoError(t, err)
|
||||
|
||||
// this one should actually work
|
||||
noret.Add(4)
|
||||
assert.Equal(t, 9, serverHandler.n)
|
||||
require.Equal(t, 9, serverHandler.n)
|
||||
closer()
|
||||
|
||||
var noparam struct {
|
||||
Add func()
|
||||
}
|
||||
closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &noparam)
|
||||
assert.NoError(t, err)
|
||||
require.NoError(t, err)
|
||||
|
||||
// shouldn't panic
|
||||
noparam.Add()
|
||||
@ -132,7 +132,7 @@ func TestRPC(t *testing.T) {
|
||||
AddGet func() (int, error)
|
||||
}
|
||||
closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &erronly)
|
||||
assert.NoError(t, err)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = erronly.AddGet()
|
||||
if err == nil || err.Error() != "RPC error (-32602): wrong param count" {
|
||||
@ -144,7 +144,7 @@ func TestRPC(t *testing.T) {
|
||||
Add func(string) error
|
||||
}
|
||||
closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", &wrongtype)
|
||||
assert.NoError(t, err)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = wrongtype.Add("not an int")
|
||||
if err == nil || !strings.Contains(err.Error(), "RPC error (-32700):") || !strings.Contains(err.Error(), "json: cannot unmarshal string into Go value of type int") {
|
||||
@ -156,7 +156,7 @@ func TestRPC(t *testing.T) {
|
||||
NotThere func(string) error
|
||||
}
|
||||
closer, err = NewClient("ws://"+testServ.Listener.Addr().String(), "SimpleServerHandler", ¬found)
|
||||
assert.NoError(t, err)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = notfound.NotThere("hello?")
|
||||
if err == nil || err.Error() != "RPC error (-32601): method 'SimpleServerHandler.NotThere' not found" {
|
||||
@ -203,7 +203,7 @@ func TestCtx(t *testing.T) {
|
||||
Test func(ctx context.Context)
|
||||
}
|
||||
closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "CtxHandler", &client)
|
||||
assert.NoError(t, err)
|
||||
require.NoError(t, err)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
||||
defer cancel()
|
||||
@ -239,3 +239,62 @@ func TestCtx(t *testing.T) {
|
||||
serverHandler.lk.Unlock()
|
||||
closer()
|
||||
}
|
||||
|
||||
type ChanHandler struct {
|
||||
wait chan struct{}
|
||||
}
|
||||
|
||||
func (h *ChanHandler) Sub(ctx context.Context, i int) (<-chan int, error) {
|
||||
out := make(chan int)
|
||||
|
||||
go func() {
|
||||
defer close(out)
|
||||
var n int
|
||||
|
||||
for {
|
||||
<-h.wait
|
||||
n += i
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case out <- n:
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func TestChan(t *testing.T) {
|
||||
var client struct {
|
||||
Sub func(context.Context, int) (<-chan int, error)
|
||||
}
|
||||
|
||||
serverHandler := &ChanHandler{
|
||||
wait: make(chan struct{}, 5),
|
||||
}
|
||||
|
||||
rpcServer := NewServer()
|
||||
rpcServer.Register("ChanHandler", serverHandler)
|
||||
|
||||
testServ := httptest.NewServer(rpcServer)
|
||||
defer testServ.Close()
|
||||
|
||||
closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "ChanHandler", &client)
|
||||
require.NoError(t, err)
|
||||
|
||||
defer closer()
|
||||
|
||||
serverHandler.wait <- struct{}{}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
|
||||
sub, err := client.Sub(ctx, 2)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
||||
require.Equal(t, 2, <-sub)
|
||||
}
|
||||
|
@ -230,6 +230,9 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r
|
||||
return // remote closed
|
||||
}
|
||||
|
||||
// debug util - dump all messages to stderr
|
||||
// r = io.TeeReader(r, os.Stderr)
|
||||
|
||||
var frame frame
|
||||
if err := json.NewDecoder(r).Decode(&frame); err != nil {
|
||||
log.Error("handle me:", err)
|
||||
@ -248,11 +251,11 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r
|
||||
continue
|
||||
}
|
||||
|
||||
if req.retCh != nil {
|
||||
if req.retCh != nil && frame.Result != nil {
|
||||
// output is channel
|
||||
var chid uint64
|
||||
if err := json.Unmarshal(frame.Result, &chid); err != nil {
|
||||
log.Error("failed to unmarshal channel id response: %s", err)
|
||||
log.Errorf("failed to unmarshal channel id response: %s, data '%s'", err, string(frame.Result))
|
||||
continue
|
||||
}
|
||||
|
||||
@ -277,7 +280,7 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r
|
||||
|
||||
hnd, ok := chanHandlers[chid]
|
||||
if !ok {
|
||||
log.Error("xrpc.ch.val: handler %d not found", chid)
|
||||
log.Errorf("xrpc.ch.val: handler %d not found", chid)
|
||||
continue
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user