diff --git a/lib/jsonrpc/client.go b/lib/jsonrpc/client.go index 360e36aef..81cf4ec0a 100644 --- a/lib/jsonrpc/client.go +++ b/lib/jsonrpc/client.go @@ -137,6 +137,7 @@ func (c *client) makeOutChan(ctx context.Context, ftyp reflect.Type, valOut int) // unpack chan type to make sure it's reflect.BothDir ctyp := reflect.ChanOf(reflect.BothDir, ftyp.Out(valOut).Elem()) ch := reflect.MakeChan(ctyp, 0) // todo: buffer? + chCtx, chCancel := context.WithCancel(ctx) retVal = ch.Convert(ftyp.Out(valOut)) buf := (&list.List{}).Init() @@ -144,6 +145,7 @@ func (c *client) makeOutChan(ctx context.Context, ftyp reflect.Type, valOut int) return ctx, func(result []byte, ok bool) { if !ok { + chCancel() // remote channel closed, close ours too ch.Close() return @@ -173,13 +175,29 @@ func (c *client) makeOutChan(ctx context.Context, ftyp reflect.Type, valOut int) go func() { for buf.Len() > 0 { front := buf.Front() - bufLk.Unlock() - ch.Send(front.Value.(reflect.Value).Elem()) // todo: select on ctx is probably a good idea + cases := []reflect.SelectCase{ + { + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(chCtx.Done()), + }, + { + Dir: reflect.SelectSend, + Chan: ch, + Send: front.Value.(reflect.Value).Elem(), + }, + } + chosen, _, _ := reflect.Select(cases) bufLk.Lock() - buf.Remove(front) + + switch chosen { + case 0: + buf.Init() + case 1: + buf.Remove(front) + } } bufLk.Unlock()