rpc: attempt to fix ping/pong logic race (#27733)
This should fix #27726. With enough load, it might happen that the SetPongHandler callback gets invoked before the call to SetReadDeadline is made in pingLoop. When this occurs, the socket will end up with a 30s read deadline even though it got the pong, which will lead to a timeout. The fix here is processing the pong on pingLoop, synchronizing with the code that sends the ping.
This commit is contained in:
parent
35f7f3d015
commit
8f8ef2bc0c
@ -280,15 +280,11 @@ type websocketCodec struct {
|
|||||||
|
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
pingReset chan struct{}
|
pingReset chan struct{}
|
||||||
|
pongReceived chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newWebsocketCodec(conn *websocket.Conn, host string, req http.Header) ServerCodec {
|
func newWebsocketCodec(conn *websocket.Conn, host string, req http.Header) ServerCodec {
|
||||||
conn.SetReadLimit(wsMessageSizeLimit)
|
conn.SetReadLimit(wsMessageSizeLimit)
|
||||||
conn.SetPongHandler(func(appData string) error {
|
|
||||||
conn.SetReadDeadline(time.Time{})
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
encode := func(v interface{}, isErrorResponse bool) error {
|
encode := func(v interface{}, isErrorResponse bool) error {
|
||||||
return conn.WriteJSON(v)
|
return conn.WriteJSON(v)
|
||||||
}
|
}
|
||||||
@ -296,6 +292,7 @@ func newWebsocketCodec(conn *websocket.Conn, host string, req http.Header) Serve
|
|||||||
jsonCodec: NewFuncCodec(conn, encode, conn.ReadJSON).(*jsonCodec),
|
jsonCodec: NewFuncCodec(conn, encode, conn.ReadJSON).(*jsonCodec),
|
||||||
conn: conn,
|
conn: conn,
|
||||||
pingReset: make(chan struct{}, 1),
|
pingReset: make(chan struct{}, 1),
|
||||||
|
pongReceived: make(chan struct{}),
|
||||||
info: PeerInfo{
|
info: PeerInfo{
|
||||||
Transport: "ws",
|
Transport: "ws",
|
||||||
RemoteAddr: conn.RemoteAddr().String(),
|
RemoteAddr: conn.RemoteAddr().String(),
|
||||||
@ -306,6 +303,13 @@ func newWebsocketCodec(conn *websocket.Conn, host string, req http.Header) Serve
|
|||||||
wc.info.HTTP.Origin = req.Get("Origin")
|
wc.info.HTTP.Origin = req.Get("Origin")
|
||||||
wc.info.HTTP.UserAgent = req.Get("User-Agent")
|
wc.info.HTTP.UserAgent = req.Get("User-Agent")
|
||||||
// Start pinger.
|
// Start pinger.
|
||||||
|
conn.SetPongHandler(func(appData string) error {
|
||||||
|
select {
|
||||||
|
case wc.pongReceived <- struct{}{}:
|
||||||
|
case <-wc.closed():
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
wc.wg.Add(1)
|
wc.wg.Add(1)
|
||||||
go wc.pingLoop()
|
go wc.pingLoop()
|
||||||
return wc
|
return wc
|
||||||
@ -334,26 +338,31 @@ func (wc *websocketCodec) writeJSON(ctx context.Context, v interface{}, isError
|
|||||||
|
|
||||||
// pingLoop sends periodic ping frames when the connection is idle.
|
// pingLoop sends periodic ping frames when the connection is idle.
|
||||||
func (wc *websocketCodec) pingLoop() {
|
func (wc *websocketCodec) pingLoop() {
|
||||||
var timer = time.NewTimer(wsPingInterval)
|
var pingTimer = time.NewTimer(wsPingInterval)
|
||||||
defer wc.wg.Done()
|
defer wc.wg.Done()
|
||||||
defer timer.Stop()
|
defer pingTimer.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-wc.closed():
|
case <-wc.closed():
|
||||||
return
|
return
|
||||||
|
|
||||||
case <-wc.pingReset:
|
case <-wc.pingReset:
|
||||||
if !timer.Stop() {
|
if !pingTimer.Stop() {
|
||||||
<-timer.C
|
<-pingTimer.C
|
||||||
}
|
}
|
||||||
timer.Reset(wsPingInterval)
|
pingTimer.Reset(wsPingInterval)
|
||||||
case <-timer.C:
|
|
||||||
|
case <-pingTimer.C:
|
||||||
wc.jsonCodec.encMu.Lock()
|
wc.jsonCodec.encMu.Lock()
|
||||||
wc.conn.SetWriteDeadline(time.Now().Add(wsPingWriteTimeout))
|
wc.conn.SetWriteDeadline(time.Now().Add(wsPingWriteTimeout))
|
||||||
wc.conn.WriteMessage(websocket.PingMessage, nil)
|
wc.conn.WriteMessage(websocket.PingMessage, nil)
|
||||||
wc.conn.SetReadDeadline(time.Now().Add(wsPongTimeout))
|
wc.conn.SetReadDeadline(time.Now().Add(wsPongTimeout))
|
||||||
wc.jsonCodec.encMu.Unlock()
|
wc.jsonCodec.encMu.Unlock()
|
||||||
timer.Reset(wsPingInterval)
|
pingTimer.Reset(wsPingInterval)
|
||||||
|
|
||||||
|
case <-wc.pongReceived:
|
||||||
|
wc.conn.SetReadDeadline(time.Time{})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user