rpc: fix subscription corner case and speed up tests (#17874)

Notifier tracks whether subscription are 'active'. A subscription
becomes active when the subscription ID has been sent to the client. If
the client sends notifications in the request handler before the
subscription becomes active they are dropped. The tests tried to work
around this problem by always waiting 5s before sending the first
notification.

Fix it by buffering notifications until the subscription becomes active.
This speeds up all subscription tests.

Also fix TestSubscriptionMultipleNamespaces to wait for three messages
per subscription instead of six. The test now finishes just after all
notifications have been received and doesn't hit the 30s timeout anymore.
This commit is contained in:
Felix Lange 2018-10-09 16:34:24 +02:00 committed by GitHub
parent da290e9707
commit 4e474c74dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 67 additions and 70 deletions

View File

@ -52,9 +52,10 @@ type notifierKey struct{}
// Server callbacks use the notifier to send notifications. // Server callbacks use the notifier to send notifications.
type Notifier struct { type Notifier struct {
codec ServerCodec codec ServerCodec
subMu sync.RWMutex // guards active and inactive maps subMu sync.Mutex
active map[ID]*Subscription active map[ID]*Subscription
inactive map[ID]*Subscription inactive map[ID]*Subscription
buffer map[ID][]interface{} // unsent notifications of inactive subscriptions
} }
// newNotifier creates a new notifier that can be used to send subscription // newNotifier creates a new notifier that can be used to send subscription
@ -64,6 +65,7 @@ func newNotifier(codec ServerCodec) *Notifier {
codec: codec, codec: codec,
active: make(map[ID]*Subscription), active: make(map[ID]*Subscription),
inactive: make(map[ID]*Subscription), inactive: make(map[ID]*Subscription),
buffer: make(map[ID][]interface{}),
} }
} }
@ -88,20 +90,26 @@ func (n *Notifier) CreateSubscription() *Subscription {
// Notify sends a notification to the client with the given data as payload. // Notify sends a notification to the client with the given data as payload.
// If an error occurs the RPC connection is closed and the error is returned. // If an error occurs the RPC connection is closed and the error is returned.
func (n *Notifier) Notify(id ID, data interface{}) error { func (n *Notifier) Notify(id ID, data interface{}) error {
n.subMu.RLock() n.subMu.Lock()
defer n.subMu.RUnlock() defer n.subMu.Unlock()
sub, active := n.active[id] if sub, active := n.active[id]; active {
if active { n.send(sub, data)
notification := n.codec.CreateNotification(string(id), sub.namespace, data) } else {
if err := n.codec.Write(notification); err != nil { n.buffer[id] = append(n.buffer[id], data)
n.codec.Close()
return err
}
} }
return nil return nil
} }
func (n *Notifier) send(sub *Subscription, data interface{}) error {
notification := n.codec.CreateNotification(string(sub.ID), sub.namespace, data)
err := n.codec.Write(notification)
if err != nil {
n.codec.Close()
}
return err
}
// Closed returns a channel that is closed when the RPC connection is closed. // Closed returns a channel that is closed when the RPC connection is closed.
func (n *Notifier) Closed() <-chan interface{} { func (n *Notifier) Closed() <-chan interface{} {
return n.codec.Closed() return n.codec.Closed()
@ -127,9 +135,15 @@ func (n *Notifier) unsubscribe(id ID) error {
func (n *Notifier) activate(id ID, namespace string) { func (n *Notifier) activate(id ID, namespace string) {
n.subMu.Lock() n.subMu.Lock()
defer n.subMu.Unlock() defer n.subMu.Unlock()
if sub, found := n.inactive[id]; found { if sub, found := n.inactive[id]; found {
sub.namespace = namespace sub.namespace = namespace
n.active[id] = sub n.active[id] = sub
delete(n.inactive, id) delete(n.inactive, id)
// Send buffered notifications.
for _, data := range n.buffer[id] {
n.send(sub, data)
}
delete(n.buffer, id)
} }
} }

View File

@ -28,8 +28,7 @@ import (
type NotificationTestService struct { type NotificationTestService struct {
mu sync.Mutex mu sync.Mutex
unsubscribed bool unsubscribed chan string
gotHangSubscriptionReq chan struct{} gotHangSubscriptionReq chan struct{}
unblockHangSubscription chan struct{} unblockHangSubscription chan struct{}
} }
@ -38,16 +37,10 @@ func (s *NotificationTestService) Echo(i int) int {
return i return i
} }
func (s *NotificationTestService) wasUnsubCallbackCalled() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.unsubscribed
}
func (s *NotificationTestService) Unsubscribe(subid string) { func (s *NotificationTestService) Unsubscribe(subid string) {
s.mu.Lock() if s.unsubscribed != nil {
s.unsubscribed = true s.unsubscribed <- subid
s.mu.Unlock() }
} }
func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (*Subscription, error) { func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (*Subscription, error) {
@ -65,7 +58,6 @@ func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val i
// test expects n events, if we begin sending event immediately some events // test expects n events, if we begin sending event immediately some events
// will probably be dropped since the subscription ID might not be send to // will probably be dropped since the subscription ID might not be send to
// the client. // the client.
time.Sleep(5 * time.Second)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
if err := notifier.Notify(subscription.ID, val+i); err != nil { if err := notifier.Notify(subscription.ID, val+i); err != nil {
return return
@ -74,13 +66,10 @@ func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val i
select { select {
case <-notifier.Closed(): case <-notifier.Closed():
s.mu.Lock()
s.unsubscribed = true
s.mu.Unlock()
case <-subscription.Err(): case <-subscription.Err():
s.mu.Lock() }
s.unsubscribed = true if s.unsubscribed != nil {
s.mu.Unlock() s.unsubscribed <- string(subscription.ID)
} }
}() }()
@ -107,7 +96,7 @@ func (s *NotificationTestService) HangSubscription(ctx context.Context, val int)
func TestNotifications(t *testing.T) { func TestNotifications(t *testing.T) {
server := NewServer() server := NewServer()
service := &NotificationTestService{} service := &NotificationTestService{unsubscribed: make(chan string)}
if err := server.RegisterName("eth", service); err != nil { if err := server.RegisterName("eth", service); err != nil {
t.Fatalf("unable to register test service %v", err) t.Fatalf("unable to register test service %v", err)
@ -157,10 +146,10 @@ func TestNotifications(t *testing.T) {
} }
clientConn.Close() // causes notification unsubscribe callback to be called clientConn.Close() // causes notification unsubscribe callback to be called
time.Sleep(1 * time.Second) select {
case <-service.unsubscribed:
if !service.wasUnsubCallbackCalled() { case <-time.After(1 * time.Second):
t.Error("unsubscribe callback not called after closing connection") t.Fatal("Unsubscribe not called after one second")
} }
} }
@ -228,16 +217,17 @@ func waitForMessages(t *testing.T, in *json.Decoder, successes chan<- jsonSucces
func TestSubscriptionMultipleNamespaces(t *testing.T) { func TestSubscriptionMultipleNamespaces(t *testing.T) {
var ( var (
namespaces = []string{"eth", "shh", "bzz"} namespaces = []string{"eth", "shh", "bzz"}
server = NewServer()
service = NotificationTestService{} service = NotificationTestService{}
clientConn, serverConn = net.Pipe() subCount = len(namespaces) * 2
notificationCount = 3
server = NewServer()
clientConn, serverConn = net.Pipe()
out = json.NewEncoder(clientConn) out = json.NewEncoder(clientConn)
in = json.NewDecoder(clientConn) in = json.NewDecoder(clientConn)
successes = make(chan jsonSuccessResponse) successes = make(chan jsonSuccessResponse)
failures = make(chan jsonErrResponse) failures = make(chan jsonErrResponse)
notifications = make(chan jsonNotification) notifications = make(chan jsonNotification)
errors = make(chan error, 10) errors = make(chan error, 10)
) )
@ -255,13 +245,12 @@ func TestSubscriptionMultipleNamespaces(t *testing.T) {
go waitForMessages(t, in, successes, failures, notifications, errors) go waitForMessages(t, in, successes, failures, notifications, errors)
// create subscriptions one by one // create subscriptions one by one
n := 3
for i, namespace := range namespaces { for i, namespace := range namespaces {
request := map[string]interface{}{ request := map[string]interface{}{
"id": i, "id": i,
"method": fmt.Sprintf("%s_subscribe", namespace), "method": fmt.Sprintf("%s_subscribe", namespace),
"version": "2.0", "version": "2.0",
"params": []interface{}{"someSubscription", n, i}, "params": []interface{}{"someSubscription", notificationCount, i},
} }
if err := out.Encode(&request); err != nil { if err := out.Encode(&request); err != nil {
@ -276,7 +265,7 @@ func TestSubscriptionMultipleNamespaces(t *testing.T) {
"id": i, "id": i,
"method": fmt.Sprintf("%s_subscribe", namespace), "method": fmt.Sprintf("%s_subscribe", namespace),
"version": "2.0", "version": "2.0",
"params": []interface{}{"someSubscription", n, i}, "params": []interface{}{"someSubscription", notificationCount, i},
}) })
} }
@ -285,46 +274,40 @@ func TestSubscriptionMultipleNamespaces(t *testing.T) {
} }
timeout := time.After(30 * time.Second) timeout := time.After(30 * time.Second)
subids := make(map[string]string, 2*len(namespaces)) subids := make(map[string]string, subCount)
count := make(map[string]int, 2*len(namespaces)) count := make(map[string]int, subCount)
allReceived := func() bool {
for { done := len(count) == subCount
done := true for _, c := range count {
for id := range count { if c < notificationCount {
if count, found := count[id]; !found || count < (2*n) {
done = false done = false
} }
} }
return done
if done && len(count) == len(namespaces) {
break
} }
for !allReceived() {
select { select {
case err := <-errors:
t.Fatal(err)
case suc := <-successes: // subscription created case suc := <-successes: // subscription created
subids[namespaces[int(suc.Id.(float64))]] = suc.Result.(string) subids[namespaces[int(suc.Id.(float64))]] = suc.Result.(string)
case notification := <-notifications:
count[notification.Params.Subscription]++
case err := <-errors:
t.Fatal(err)
case failure := <-failures: case failure := <-failures:
t.Errorf("received error: %v", failure.Error) t.Errorf("received error: %v", failure.Error)
case notification := <-notifications:
if cnt, found := count[notification.Params.Subscription]; found {
count[notification.Params.Subscription] = cnt + 1
} else {
count[notification.Params.Subscription] = 1
}
case <-timeout: case <-timeout:
for _, namespace := range namespaces { for _, namespace := range namespaces {
subid, found := subids[namespace] subid, found := subids[namespace]
if !found { if !found {
t.Errorf("Subscription for '%s' not created", namespace) t.Errorf("subscription for %q not created", namespace)
continue continue
} }
if count, found := count[subid]; !found || count < n { if count, found := count[subid]; !found || count < notificationCount {
t.Errorf("Didn't receive all notifications (%d<%d) in time for namespace '%s'", count, n, namespace) t.Errorf("didn't receive all notifications (%d<%d) in time for namespace %q", count, notificationCount, namespace)
} }
} }
return t.Fatal("timed out")
} }
} }
} }