diff --git a/lib/jsonrpc/client.go b/lib/jsonrpc/client.go index 360e36aef..cb144de77 100644 --- a/lib/jsonrpc/client.go +++ b/lib/jsonrpc/client.go @@ -137,6 +137,7 @@ func (c *client) makeOutChan(ctx context.Context, ftyp reflect.Type, valOut int) // unpack chan type to make sure it's reflect.BothDir ctyp := reflect.ChanOf(reflect.BothDir, ftyp.Out(valOut).Elem()) ch := reflect.MakeChan(ctyp, 0) // todo: buffer? + chCtx, chCancel := context.WithCancel(ctx) retVal = ch.Convert(ftyp.Out(valOut)) buf := (&list.List{}).Init() @@ -144,6 +145,7 @@ func (c *client) makeOutChan(ctx context.Context, ftyp reflect.Type, valOut int) return ctx, func(result []byte, ok bool) { if !ok { + chCancel() // remote channel closed, close ours too ch.Close() return @@ -172,14 +174,18 @@ func (c *client) makeOutChan(ctx context.Context, ftyp reflect.Type, valOut int) go func() { for buf.Len() > 0 { - front := buf.Front() + select { + case <- chCtx.Done(): + buf.Init() + default: + front := buf.Front() + bufLk.Unlock() - bufLk.Unlock() + ch.Send(front.Value.(reflect.Value).Elem()) - ch.Send(front.Value.(reflect.Value).Elem()) // todo: select on ctx is probably a good idea - - bufLk.Lock() - buf.Remove(front) + bufLk.Lock() + buf.Remove(front) + } } bufLk.Unlock()