From d98c42c0e37da20df8af74724fbd032b342035ab Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 2 Jun 2020 14:04:44 +0200 Subject: [PATCH] rpc: send websocket ping when connection is idle (#21142) * rpc: send websocket ping when connection is idle * rpc: use non-blocking send for websocket pingReset --- rpc/websocket.go | 66 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 63 insertions(+), 3 deletions(-) diff --git a/rpc/websocket.go b/rpc/websocket.go index 6e37b8522..a716383be 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -25,6 +25,7 @@ import ( "os" "strings" "sync" + "time" mapset "github.com/deckarep/golang-set" "github.com/ethereum/go-ethereum/log" @@ -32,8 +33,10 @@ import ( ) const ( - wsReadBuffer = 1024 - wsWriteBuffer = 1024 + wsReadBuffer = 1024 + wsWriteBuffer = 1024 + wsPingInterval = 60 * time.Second + wsPingWriteTimeout = 5 * time.Second ) var wsBufferPool = new(sync.Pool) @@ -168,7 +171,64 @@ func wsClientHeaders(endpoint, origin string) (string, http.Header, error) { return endpointURL.String(), header, nil } +type websocketCodec struct { + *jsonCodec + conn *websocket.Conn + + wg sync.WaitGroup + pingReset chan struct{} +} + func newWebsocketCodec(conn *websocket.Conn) ServerCodec { conn.SetReadLimit(maxRequestContentLength) - return NewFuncCodec(conn, conn.WriteJSON, conn.ReadJSON) + wc := &websocketCodec{ + jsonCodec: NewFuncCodec(conn, conn.WriteJSON, conn.ReadJSON).(*jsonCodec), + conn: conn, + pingReset: make(chan struct{}, 1), + } + wc.wg.Add(1) + go wc.pingLoop() + return wc +} + +func (wc *websocketCodec) close() { + wc.jsonCodec.close() + wc.wg.Wait() +} + +func (wc *websocketCodec) writeJSON(ctx context.Context, v interface{}) error { + err := wc.jsonCodec.writeJSON(ctx, v) + if err == nil { + // Notify pingLoop to delay the next idle ping. + select { + case wc.pingReset <- struct{}{}: + default: + } + } + return err +} + +// pingLoop sends periodic ping frames when the connection is idle. +func (wc *websocketCodec) pingLoop() { + var timer = time.NewTimer(wsPingInterval) + defer wc.wg.Done() + defer timer.Stop() + + for { + select { + case <-wc.closed(): + return + case <-wc.pingReset: + if !timer.Stop() { + <-timer.C + } + timer.Reset(wsPingInterval) + case <-timer.C: + wc.jsonCodec.encMu.Lock() + wc.conn.SetWriteDeadline(time.Now().Add(wsPingWriteTimeout)) + wc.conn.WriteMessage(websocket.PingMessage, nil) + wc.jsonCodec.encMu.Unlock() + timer.Reset(wsPingInterval) + } + } }