jsonrpc: Channel buffeering

This commit is contained in:
Łukasz Magiera 2019-09-27 13:37:44 +02:00
parent fe8e1fe1e4
commit 2874022251
2 changed files with 39 additions and 1 deletions

View File

@ -107,6 +107,9 @@ func (e *Events) listenHeadChanges(ctx context.Context) {
} }
func (e *Events) listenHeadChangesOnce(ctx context.Context) error { func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
notifs, err := e.api.ChainNotify(ctx) notifs, err := e.api.ChainNotify(ctx)
if err != nil { if err != nil {
// TODO: retry // TODO: retry

View File

@ -1,12 +1,14 @@
package jsonrpc package jsonrpc
import ( import (
"container/list"
"context" "context"
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http" "net/http"
"reflect" "reflect"
"sync"
"sync/atomic" "sync/atomic"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
@ -132,6 +134,9 @@ func (c *client) makeOutChan(ctx context.Context, ftyp reflect.Type, valOut int)
ch := reflect.MakeChan(ctyp, 0) // todo: buffer? ch := reflect.MakeChan(ctyp, 0) // todo: buffer?
retVal = ch.Convert(ftyp.Out(valOut)) retVal = ch.Convert(ftyp.Out(valOut))
buf := (&list.List{}).Init()
var bufLk sync.Mutex
return ctx, func(result []byte, ok bool) { return ctx, func(result []byte, ok bool) {
if !ok { if !ok {
// remote channel closed, close ours too // remote channel closed, close ours too
@ -145,7 +150,36 @@ func (c *client) makeOutChan(ctx context.Context, ftyp reflect.Type, valOut int)
return return
} }
ch.Send(val.Elem()) // todo: select on ctx is probably a good idea bufLk.Lock()
if ctx.Err() != nil {
log.Errorf("got rpc message with cancelled context: %s", ctx.Err())
bufLk.Unlock()
return
}
buf.PushBack(val)
if buf.Len() > 1 {
log.Warnf("rpc output channel has %d buffered messages", buf.Len())
bufLk.Unlock()
return
}
go func() {
for buf.Len() > 0 {
front := buf.Front()
bufLk.Unlock()
ch.Send(front.Value.(reflect.Value).Elem()) // todo: select on ctx is probably a good idea
bufLk.Lock()
buf.Remove(front)
}
bufLk.Unlock()
}()
} }
} }
@ -177,6 +211,7 @@ loop:
ctxDone = nil ctxDone = nil
c.requests <- clientRequest{ c.requests <- clientRequest{
req: request{ req: request{
Jsonrpc: "2.0", Jsonrpc: "2.0",
Method: wsCancel, Method: wsCancel,