diff --git a/lib/jsonrpc/rpc_test.go b/lib/jsonrpc/rpc_test.go index fe6d13397..e41e96c3d 100644 --- a/lib/jsonrpc/rpc_test.go +++ b/lib/jsonrpc/rpc_test.go @@ -298,9 +298,10 @@ func (h *ChanHandler) Sub(ctx context.Context, i int, eq int) (<-chan int, error for { select { case <-ctx.Done(): - fmt.Println("ctxdone1") + fmt.Println("ctxdone1", i, eq) return case <-wait: + fmt.Println("CONSUMED WAIT: ", i) } n += i @@ -393,6 +394,57 @@ func TestChan(t *testing.T) { require.Equal(t, false, ok) } +func TestChanClosing(t *testing.T) { + var client struct { + Sub func(context.Context, int, int) (<-chan int, error) + } + + serverHandler := &ChanHandler{ + wait: make(chan struct{}, 5), + } + + rpcServer := NewServer() + rpcServer.Register("ChanHandler", serverHandler) + + testServ := httptest.NewServer(rpcServer) + defer testServ.Close() + + closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "ChanHandler", &client, nil) + require.NoError(t, err) + + defer closer() + + ctx1, cancel1 := context.WithCancel(context.Background()) + ctx2, cancel2 := context.WithCancel(context.Background()) + + // sub + + sub1, err := client.Sub(ctx1, 2, -1) + require.NoError(t, err) + + sub2, err := client.Sub(ctx2, 3, -1) + require.NoError(t, err) + + // recv one + + serverHandler.wait <- struct{}{} + serverHandler.wait <- struct{}{} + + require.Equal(t, 2, <-sub1) + require.Equal(t, 3, <-sub2) + + cancel1() + + require.Equal(t, 0, <-sub1) + time.Sleep(time.Millisecond * 50) // make sure the loop has exited (having a shared wait channel makes this annoying) + + serverHandler.wait <- struct{}{} + require.Equal(t, 6, <-sub2) + + cancel2() + require.Equal(t, 0, <-sub2) +} + func TestChanServerClose(t *testing.T) { var client struct { Sub func(context.Context, int, int) (<-chan int, error) diff --git a/lib/jsonrpc/websocket.go b/lib/jsonrpc/websocket.go index 44e7ffbf7..568eaa37f 100644 --- a/lib/jsonrpc/websocket.go +++ b/lib/jsonrpc/websocket.go @@ -204,13 +204,14 @@ func (c *wsConn) handleOutChans() { if !ok { // Output channel closed, cleanup, and tell remote that this happened + id := caseToID[chosen-internal] + n := len(cases) - 1 if n > 0 { cases[chosen] = cases[n] caseToID[chosen-internal] = caseToID[n-internal] } - id := caseToID[chosen-internal] cases = cases[:n] caseToID = caseToID[:n-internal]