Merge pull request #1741 from filecoin-project/fix/json-out-chan-closing
properly select channel ID to close for handleOutChans
This commit is contained in:
commit
d539be4875
@ -298,9 +298,10 @@ func (h *ChanHandler) Sub(ctx context.Context, i int, eq int) (<-chan int, error
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
fmt.Println("ctxdone1")
|
fmt.Println("ctxdone1", i, eq)
|
||||||
return
|
return
|
||||||
case <-wait:
|
case <-wait:
|
||||||
|
fmt.Println("CONSUMED WAIT: ", i)
|
||||||
}
|
}
|
||||||
|
|
||||||
n += i
|
n += i
|
||||||
@ -393,6 +394,57 @@ func TestChan(t *testing.T) {
|
|||||||
require.Equal(t, false, ok)
|
require.Equal(t, false, ok)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestChanClosing(t *testing.T) {
|
||||||
|
var client struct {
|
||||||
|
Sub func(context.Context, int, int) (<-chan int, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
serverHandler := &ChanHandler{
|
||||||
|
wait: make(chan struct{}, 5),
|
||||||
|
}
|
||||||
|
|
||||||
|
rpcServer := NewServer()
|
||||||
|
rpcServer.Register("ChanHandler", serverHandler)
|
||||||
|
|
||||||
|
testServ := httptest.NewServer(rpcServer)
|
||||||
|
defer testServ.Close()
|
||||||
|
|
||||||
|
closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "ChanHandler", &client, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
defer closer()
|
||||||
|
|
||||||
|
ctx1, cancel1 := context.WithCancel(context.Background())
|
||||||
|
ctx2, cancel2 := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
// sub
|
||||||
|
|
||||||
|
sub1, err := client.Sub(ctx1, 2, -1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
sub2, err := client.Sub(ctx2, 3, -1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// recv one
|
||||||
|
|
||||||
|
serverHandler.wait <- struct{}{}
|
||||||
|
serverHandler.wait <- struct{}{}
|
||||||
|
|
||||||
|
require.Equal(t, 2, <-sub1)
|
||||||
|
require.Equal(t, 3, <-sub2)
|
||||||
|
|
||||||
|
cancel1()
|
||||||
|
|
||||||
|
require.Equal(t, 0, <-sub1)
|
||||||
|
time.Sleep(time.Millisecond * 50) // make sure the loop has exited (having a shared wait channel makes this annoying)
|
||||||
|
|
||||||
|
serverHandler.wait <- struct{}{}
|
||||||
|
require.Equal(t, 6, <-sub2)
|
||||||
|
|
||||||
|
cancel2()
|
||||||
|
require.Equal(t, 0, <-sub2)
|
||||||
|
}
|
||||||
|
|
||||||
func TestChanServerClose(t *testing.T) {
|
func TestChanServerClose(t *testing.T) {
|
||||||
var client struct {
|
var client struct {
|
||||||
Sub func(context.Context, int, int) (<-chan int, error)
|
Sub func(context.Context, int, int) (<-chan int, error)
|
||||||
|
@ -204,13 +204,14 @@ func (c *wsConn) handleOutChans() {
|
|||||||
if !ok {
|
if !ok {
|
||||||
// Output channel closed, cleanup, and tell remote that this happened
|
// Output channel closed, cleanup, and tell remote that this happened
|
||||||
|
|
||||||
|
id := caseToID[chosen-internal]
|
||||||
|
|
||||||
n := len(cases) - 1
|
n := len(cases) - 1
|
||||||
if n > 0 {
|
if n > 0 {
|
||||||
cases[chosen] = cases[n]
|
cases[chosen] = cases[n]
|
||||||
caseToID[chosen-internal] = caseToID[n-internal]
|
caseToID[chosen-internal] = caseToID[n-internal]
|
||||||
}
|
}
|
||||||
|
|
||||||
id := caseToID[chosen-internal]
|
|
||||||
cases = cases[:n]
|
cases = cases[:n]
|
||||||
caseToID = caseToID[:n-internal]
|
caseToID = caseToID[:n-internal]
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user