rpc: tighter shutdown synchronization in client subscription (#22597)

This fixes a rare issue where the client subscription forwarding loop
would attempt send on the subscription's channel after Unsubscribe has
returned, leading to a panic if the subscription channel was already
closed by the user. Example:

    sub, _ := client.Subscribe(..., channel, ...)
    sub.Unsubscribe()
    close(channel)

The race occurred because Unsubscribe called quitWithServer to tell the
forwarding loop to stop sending on sub.channel, but did not wait for the
loop to actually come down. This is fixed by adding an additional channel
to track the shutdown, on which Unsubscribe now waits.

Fixes #22322
This commit is contained in:
Felix Lange 2021-03-30 20:09:30 +02:00 committed by GitHub
parent 61ff3e86b2
commit 4a37ae510e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 177 additions and 40 deletions

View File

@ -18,6 +18,7 @@ package rpc
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"math/rand" "math/rand"
"net" "net"
@ -376,6 +377,93 @@ func TestClientCloseUnsubscribeRace(t *testing.T) {
} }
} }
// unsubscribeRecorder collects the subscription IDs of *_unsubscribe calls.
type unsubscribeRecorder struct {
ServerCodec
unsubscribes map[string]bool
}
func (r *unsubscribeRecorder) readBatch() ([]*jsonrpcMessage, bool, error) {
if r.unsubscribes == nil {
r.unsubscribes = make(map[string]bool)
}
msgs, batch, err := r.ServerCodec.readBatch()
for _, msg := range msgs {
if msg.isUnsubscribe() {
var params []string
if err := json.Unmarshal(msg.Params, &params); err != nil {
panic("unsubscribe decode error: " + err.Error())
}
r.unsubscribes[params[0]] = true
}
}
return msgs, batch, err
}
// This checks that Client calls the _unsubscribe method on the server when Unsubscribe is
// called on a subscription.
func TestClientSubscriptionUnsubscribeServer(t *testing.T) {
t.Parallel()
// Create the server.
srv := NewServer()
srv.RegisterName("nftest", new(notificationTestService))
p1, p2 := net.Pipe()
recorder := &unsubscribeRecorder{ServerCodec: NewCodec(p1)}
go srv.ServeCodec(recorder, OptionMethodInvocation|OptionSubscriptions)
defer srv.Stop()
// Create the client on the other end of the pipe.
client, _ := newClient(context.Background(), func(context.Context) (ServerCodec, error) {
return NewCodec(p2), nil
})
defer client.Close()
// Create the subscription.
ch := make(chan int)
sub, err := client.Subscribe(context.Background(), "nftest", ch, "someSubscription", 1, 1)
if err != nil {
t.Fatal(err)
}
// Unsubscribe and check that unsubscribe was called.
sub.Unsubscribe()
if !recorder.unsubscribes[sub.subid] {
t.Fatal("client did not call unsubscribe method")
}
if _, open := <-sub.Err(); open {
t.Fatal("subscription error channel not closed after unsubscribe")
}
}
// This checks that the subscribed channel can be closed after Unsubscribe.
// It is the reproducer for https://github.com/ethereum/go-ethereum/issues/22322
func TestClientSubscriptionChannelClose(t *testing.T) {
t.Parallel()
var (
srv = NewServer()
httpsrv = httptest.NewServer(srv.WebsocketHandler(nil))
wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:")
)
defer srv.Stop()
defer httpsrv.Close()
srv.RegisterName("nftest", new(notificationTestService))
client, _ := Dial(wsURL)
for i := 0; i < 100; i++ {
ch := make(chan int, 100)
sub, err := client.Subscribe(context.Background(), "nftest", ch, "someSubscription", maxClientSubscriptionBuffer-1, 1)
if err != nil {
t.Fatal(err)
}
sub.Unsubscribe()
close(ch)
}
}
// This test checks that Client doesn't lock up when a single subscriber // This test checks that Client doesn't lock up when a single subscriber
// doesn't read subscription events. // doesn't read subscription events.
func TestClientNotificationStorm(t *testing.T) { func TestClientNotificationStorm(t *testing.T) {

View File

@ -189,7 +189,7 @@ func (h *handler) cancelAllRequests(err error, inflightReq *requestOp) {
} }
for id, sub := range h.clientSubs { for id, sub := range h.clientSubs {
delete(h.clientSubs, id) delete(h.clientSubs, id)
sub.quitWithError(false, err) sub.close(err)
} }
} }
@ -281,7 +281,7 @@ func (h *handler) handleResponse(msg *jsonrpcMessage) {
return return
} }
if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil { if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil {
go op.sub.start() go op.sub.run()
h.clientSubs[op.sub.subid] = op.sub h.clientSubs[op.sub.subid] = op.sub
} }
} }

View File

@ -208,23 +208,37 @@ type ClientSubscription struct {
channel reflect.Value channel reflect.Value
namespace string namespace string
subid string subid string
in chan json.RawMessage
quitOnce sync.Once // ensures quit is closed once // The in channel receives notification values from client dispatcher.
quit chan struct{} // quit is closed when the subscription exits in chan json.RawMessage
errOnce sync.Once // ensures err is closed once
err chan error // The error channel receives the error from the forwarding loop.
// It is closed by Unsubscribe.
err chan error
errOnce sync.Once
// Closing of the subscription is requested by sending on 'quit'. This is handled by
// the forwarding loop, which closes 'forwardDone' when it has stopped sending to
// sub.channel. Finally, 'unsubDone' is closed after unsubscribing on the server side.
quit chan error
forwardDone chan struct{}
unsubDone chan struct{}
} }
// This is the sentinel value sent on sub.quit when Unsubscribe is called.
var errUnsubscribed = errors.New("unsubscribed")
func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription { func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription {
sub := &ClientSubscription{ sub := &ClientSubscription{
client: c, client: c,
namespace: namespace, namespace: namespace,
etype: channel.Type().Elem(), etype: channel.Type().Elem(),
channel: channel, channel: channel,
quit: make(chan struct{}), in: make(chan json.RawMessage),
err: make(chan error, 1), quit: make(chan error),
in: make(chan json.RawMessage), forwardDone: make(chan struct{}),
unsubDone: make(chan struct{}),
err: make(chan error, 1),
} }
return sub return sub
} }
@ -232,9 +246,9 @@ func newClientSubscription(c *Client, namespace string, channel reflect.Value) *
// Err returns the subscription error channel. The intended use of Err is to schedule // Err returns the subscription error channel. The intended use of Err is to schedule
// resubscription when the client connection is closed unexpectedly. // resubscription when the client connection is closed unexpectedly.
// //
// The error channel receives a value when the subscription has ended due // The error channel receives a value when the subscription has ended due to an error. The
// to an error. The received error is nil if Close has been called // received error is nil if Close has been called on the underlying client and no other
// on the underlying client and no other error has occurred. // error has occurred.
// //
// The error channel is closed when Unsubscribe is called on the subscription. // The error channel is closed when Unsubscribe is called on the subscription.
func (sub *ClientSubscription) Err() <-chan error { func (sub *ClientSubscription) Err() <-chan error {
@ -244,41 +258,63 @@ func (sub *ClientSubscription) Err() <-chan error {
// Unsubscribe unsubscribes the notification and closes the error channel. // Unsubscribe unsubscribes the notification and closes the error channel.
// It can safely be called more than once. // It can safely be called more than once.
func (sub *ClientSubscription) Unsubscribe() { func (sub *ClientSubscription) Unsubscribe() {
sub.quitWithError(true, nil) sub.errOnce.Do(func() {
sub.errOnce.Do(func() { close(sub.err) }) select {
} case sub.quit <- errUnsubscribed:
<-sub.unsubDone
func (sub *ClientSubscription) quitWithError(unsubscribeServer bool, err error) { case <-sub.unsubDone:
sub.quitOnce.Do(func() {
// The dispatch loop won't be able to execute the unsubscribe call
// if it is blocked on deliver. Close sub.quit first because it
// unblocks deliver.
close(sub.quit)
if unsubscribeServer {
sub.requestUnsubscribe()
}
if err != nil {
if err == ErrClientQuit {
err = nil // Adhere to subscription semantics.
}
sub.err <- err
} }
close(sub.err)
}) })
} }
// deliver is called by the client's message dispatcher to send a notification value.
func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) { func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) {
select { select {
case sub.in <- result: case sub.in <- result:
return true return true
case <-sub.quit: case <-sub.forwardDone:
return false return false
} }
} }
func (sub *ClientSubscription) start() { // close is called by the client's message dispatcher when the connection is closed.
sub.quitWithError(sub.forward()) func (sub *ClientSubscription) close(err error) {
select {
case sub.quit <- err:
case <-sub.forwardDone:
}
} }
// run is the forwarding loop of the subscription. It runs in its own goroutine and
// is launched by the client's handler after the subscription has been created.
func (sub *ClientSubscription) run() {
defer close(sub.unsubDone)
unsubscribe, err := sub.forward()
// The client's dispatch loop won't be able to execute the unsubscribe call if it is
// blocked in sub.deliver() or sub.close(). Closing forwardDone unblocks them.
close(sub.forwardDone)
// Call the unsubscribe method on the server.
if unsubscribe {
sub.requestUnsubscribe()
}
// Send the error.
if err != nil {
if err == ErrClientQuit {
// ErrClientQuit gets here when Client.Close is called. This is reported as a
// nil error because it's not an error, but we can't close sub.err here.
err = nil
}
sub.err <- err
}
}
// forward is the forwarding loop. It takes in RPC notifications and sends them
// on the subscription channel.
func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) { func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) {
cases := []reflect.SelectCase{ cases := []reflect.SelectCase{
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)}, {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)},
@ -286,7 +322,7 @@ func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) {
{Dir: reflect.SelectSend, Chan: sub.channel}, {Dir: reflect.SelectSend, Chan: sub.channel},
} }
buffer := list.New() buffer := list.New()
defer buffer.Init()
for { for {
var chosen int var chosen int
var recv reflect.Value var recv reflect.Value
@ -301,7 +337,15 @@ func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) {
switch chosen { switch chosen {
case 0: // <-sub.quit case 0: // <-sub.quit
return false, nil if !recv.IsNil() {
err = recv.Interface().(error)
}
if err == errUnsubscribed {
// Exiting because Unsubscribe was called, unsubscribe on server.
return true, nil
}
return false, err
case 1: // <-sub.in case 1: // <-sub.in
val, err := sub.unmarshal(recv.Interface().(json.RawMessage)) val, err := sub.unmarshal(recv.Interface().(json.RawMessage))
if err != nil { if err != nil {
@ -311,6 +355,7 @@ func (sub *ClientSubscription) forward() (unsubscribeServer bool, err error) {
return true, ErrSubscriptionQueueOverflow return true, ErrSubscriptionQueueOverflow
} }
buffer.PushBack(val) buffer.PushBack(val)
case 2: // sub.channel<- case 2: // sub.channel<-
cases[2].Send = reflect.Value{} // Don't hold onto the value. cases[2].Send = reflect.Value{} // Don't hold onto the value.
buffer.Remove(buffer.Front()) buffer.Remove(buffer.Front())

View File

@ -129,11 +129,15 @@ func TestClientWebsocketPing(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("client dial error: %v", err) t.Fatalf("client dial error: %v", err)
} }
defer client.Close()
resultChan := make(chan int) resultChan := make(chan int)
sub, err := client.EthSubscribe(ctx, resultChan, "foo") sub, err := client.EthSubscribe(ctx, resultChan, "foo")
if err != nil { if err != nil {
t.Fatalf("client subscribe error: %v", err) t.Fatalf("client subscribe error: %v", err)
} }
// Note: Unsubscribe is not called on this subscription because the mockup
// server can't handle the request.
// Wait for the context's deadline to be reached before proceeding. // Wait for the context's deadline to be reached before proceeding.
// This is important for reproducing https://github.com/ethereum/go-ethereum/issues/19798 // This is important for reproducing https://github.com/ethereum/go-ethereum/issues/19798