event: address review issues (multiple commits)

event: address Feed review issues

event: clarify role of NewSubscription function

event: more Feed review fixes

* take sendLock after dropping f.mu
* add constant for number of special cases

event: fix subscribing/unsubscribing while Send is blocked
This commit is contained in:
Felix Lange 2017-01-26 11:57:31 +01:00 committed by Péter Szilágyi
parent a2b4abd89a
commit 1bed9b3fea
No known key found for this signature in database
GPG Key ID: E9AE538CEDF8293D
3 changed files with 93 additions and 16 deletions

View File

@ -33,7 +33,9 @@ var errBadChannel = errors.New("event: Subscribe argument does not have sendable
// //
// The zero value is ready to use. // The zero value is ready to use.
type Feed struct { type Feed struct {
sendLock chan struct{} // one-element buffer, empty when held // sendLock has a one-element buffer and is empty when held.
// It protects sendCases.
sendLock chan struct{}
removeSub chan interface{} // interrupts Send removeSub chan interface{} // interrupts Send
sendCases caseList // the active set of select cases used by Send sendCases caseList // the active set of select cases used by Send
@ -44,6 +46,10 @@ type Feed struct {
closed bool closed bool
} }
// This is the index of the first actual subscription channel in sendCases.
// sendCases[0] is a SelectRecv case for the removeSub channel.
const firstSubSendCase = 1
type feedTypeError struct { type feedTypeError struct {
got, want reflect.Type got, want reflect.Type
op string op string
@ -67,6 +73,7 @@ func (f *Feed) init() {
// until the subscription is canceled. All channels added must have the same element type. // until the subscription is canceled. All channels added must have the same element type.
// //
// The channel should have ample buffer space to avoid blocking other subscribers. // The channel should have ample buffer space to avoid blocking other subscribers.
// Slow subscribers are not dropped.
func (f *Feed) Subscribe(channel interface{}) Subscription { func (f *Feed) Subscribe(channel interface{}) Subscription {
chanval := reflect.ValueOf(channel) chanval := reflect.ValueOf(channel)
chantyp := chanval.Type() chantyp := chanval.Type()
@ -125,13 +132,14 @@ func (f *Feed) remove(sub *feedSub) {
func (f *Feed) Send(value interface{}) (nsent int) { func (f *Feed) Send(value interface{}) (nsent int) {
f.mu.Lock() f.mu.Lock()
f.init() f.init()
f.mu.Unlock()
<-f.sendLock <-f.sendLock
// Add new subscriptions from the inbox, then clear it.
// Add new cases from the inbox after taking the send lock.
f.mu.Lock()
f.sendCases = append(f.sendCases, f.inbox...) f.sendCases = append(f.sendCases, f.inbox...)
for i := range f.inbox { f.inbox = nil
f.inbox[i] = reflect.SelectCase{}
}
f.inbox = f.inbox[:0]
f.mu.Unlock() f.mu.Unlock()
// Set the sent value on all channels. // Set the sent value on all channels.
@ -140,7 +148,7 @@ func (f *Feed) Send(value interface{}) (nsent int) {
f.sendLock <- struct{}{} f.sendLock <- struct{}{}
panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype}) panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype})
} }
for i := 1; i < len(f.sendCases); i++ { for i := firstSubSendCase; i < len(f.sendCases); i++ {
f.sendCases[i].Send = rvalue f.sendCases[i].Send = rvalue
} }
@ -150,13 +158,14 @@ func (f *Feed) Send(value interface{}) (nsent int) {
// Fast path: try sending without blocking before adding to the select set. // Fast path: try sending without blocking before adding to the select set.
// This should usually succeed if subscribers are fast enough and have free // This should usually succeed if subscribers are fast enough and have free
// buffer space. // buffer space.
for i := 1; i < len(cases); i++ { for i := firstSubSendCase; i < len(cases); i++ {
if cases[i].Chan.TrySend(rvalue) { if cases[i].Chan.TrySend(rvalue) {
cases = cases.deactivate(i)
nsent++ nsent++
cases = cases.deactivate(i)
i--
} }
} }
if len(cases) == 1 { if len(cases) == firstSubSendCase {
break break
} }
// Select on all the receivers, waiting for them to unblock. // Select on all the receivers, waiting for them to unblock.
@ -174,7 +183,7 @@ func (f *Feed) Send(value interface{}) (nsent int) {
} }
// Forget about the sent value and hand off the send lock. // Forget about the sent value and hand off the send lock.
for i := 1; i < len(f.sendCases); i++ { for i := firstSubSendCase; i < len(f.sendCases); i++ {
f.sendCases[i].Send = reflect.Value{} f.sendCases[i].Send = reflect.Value{}
} }
f.sendLock <- struct{}{} f.sendLock <- struct{}{}

View File

@ -167,6 +167,74 @@ func TestFeedSubscribeSameChannel(t *testing.T) {
done.Wait() done.Wait()
} }
func TestFeedSubscribeBlockedPost(t *testing.T) {
var (
feed Feed
nsends = 2000
ch1 = make(chan int)
ch2 = make(chan int)
wg sync.WaitGroup
)
defer wg.Wait()
feed.Subscribe(ch1)
wg.Add(nsends)
for i := 0; i < nsends; i++ {
go func() {
feed.Send(99)
wg.Done()
}()
}
sub2 := feed.Subscribe(ch2)
defer sub2.Unsubscribe()
// We're done when ch1 has received N times.
// The number of receives on ch2 depends on scheduling.
for i := 0; i < nsends; {
select {
case <-ch1:
i++
case <-ch2:
}
}
}
func TestFeedUnsubscribeBlockedPost(t *testing.T) {
var (
feed Feed
nsends = 200
chans = make([]chan int, 2000)
subs = make([]Subscription, len(chans))
bchan = make(chan int)
bsub = feed.Subscribe(bchan)
wg sync.WaitGroup
)
for i := range chans {
chans[i] = make(chan int, nsends)
}
// Queue up some Sends. None of these can make progress while bchan isn't read.
wg.Add(nsends)
for i := 0; i < nsends; i++ {
go func() {
feed.Send(99)
wg.Done()
}()
}
// Subscribe the other channels.
for i, ch := range chans {
subs[i] = feed.Subscribe(ch)
}
// Unsubscribe them again.
for _, sub := range subs {
sub.Unsubscribe()
}
// Unblock the Sends.
bsub.Unsubscribe()
wg.Wait()
}
func TestFeedUnsubscribeFromInbox(t *testing.T) { func TestFeedUnsubscribeFromInbox(t *testing.T) {
var ( var (
feed Feed feed Feed

View File

@ -43,14 +43,14 @@ type Subscription interface {
Unsubscribe() // cancels sending of events, closing the error channel Unsubscribe() // cancels sending of events, closing the error channel
} }
// NewSubscription runs fn as a subscription in a new goroutine. The channel given to fn // NewSubscription runs a producer function as a subscription in a new goroutine. The
// is closed when Unsubscribe is called. If fn returns an error, it is sent on the // channel given to the producer is closed when Unsubscribe is called. If fn returns an
// subscription's error channel. // error, it is sent on the subscription's error channel.
func NewSubscription(fn func(<-chan struct{}) error) Subscription { func NewSubscription(producer func(<-chan struct{}) error) Subscription {
s := &funcSub{unsub: make(chan struct{}), err: make(chan error, 1)} s := &funcSub{unsub: make(chan struct{}), err: make(chan error, 1)}
go func() { go func() {
defer close(s.err) defer close(s.err)
err := fn(s.unsub) err := producer(s.unsub)
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
if !s.unsubscribed { if !s.unsubscribed {