diff --git a/chain/events/events.go b/chain/events/events.go index c58ab4e81..b7366774e 100644 --- a/chain/events/events.go +++ b/chain/events/events.go @@ -107,6 +107,9 @@ func (e *Events) listenHeadChanges(ctx context.Context) { } func (e *Events) listenHeadChangesOnce(ctx context.Context) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + notifs, err := e.api.ChainNotify(ctx) if err != nil { // TODO: retry diff --git a/lib/jsonrpc/client.go b/lib/jsonrpc/client.go index 628cbc8ad..5e88d843d 100644 --- a/lib/jsonrpc/client.go +++ b/lib/jsonrpc/client.go @@ -1,12 +1,14 @@ package jsonrpc import ( + "container/list" "context" "encoding/base64" "encoding/json" "fmt" "net/http" "reflect" + "sync" "sync/atomic" "github.com/gorilla/websocket" @@ -132,6 +134,9 @@ func (c *client) makeOutChan(ctx context.Context, ftyp reflect.Type, valOut int) ch := reflect.MakeChan(ctyp, 0) // todo: buffer? retVal = ch.Convert(ftyp.Out(valOut)) + buf := (&list.List{}).Init() + var bufLk sync.Mutex + return ctx, func(result []byte, ok bool) { if !ok { // remote channel closed, close ours too @@ -145,7 +150,36 @@ func (c *client) makeOutChan(ctx context.Context, ftyp reflect.Type, valOut int) return } - ch.Send(val.Elem()) // todo: select on ctx is probably a good idea + bufLk.Lock() + if ctx.Err() != nil { + log.Errorf("got rpc message with cancelled context: %s", ctx.Err()) + bufLk.Unlock() + return + } + + buf.PushBack(val) + + if buf.Len() > 1 { + log.Warnf("rpc output channel has %d buffered messages", buf.Len()) + bufLk.Unlock() + return + } + + go func() { + for buf.Len() > 0 { + front := buf.Front() + + bufLk.Unlock() + + ch.Send(front.Value.(reflect.Value).Elem()) // todo: select on ctx is probably a good idea + + bufLk.Lock() + buf.Remove(front) + } + + bufLk.Unlock() + }() + } } @@ -177,6 +211,7 @@ loop: ctxDone = nil c.requests <- clientRequest{ + req: request{ Jsonrpc: "2.0", Method: wsCancel,