event: document select case slice use and add edge case test (#16680)
Feed keeps active subscription channels in a slice called 'f.sendCases'. The Send method tracks the active cases in a local variable 'cases' whose value is f.sendCases initially. 'cases' shrinks to a shorter prefix of f.sendCases every time a send succeeds, moving the successful case out of range of the active case list. This can be confusing because the two slices share a backing array. Add more comments to document what is going on. Also add a test for removing a case that is in 'f.sentCases' but not 'cases'.
This commit is contained in:
parent
7beccb29be
commit
53a18d2e27
@ -148,7 +148,9 @@ func (f *Feed) Send(value interface{}) (nsent int) {
|
|||||||
f.sendCases[i].Send = rvalue
|
f.sendCases[i].Send = rvalue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send until all channels except removeSub have been chosen.
|
// Send until all channels except removeSub have been chosen. 'cases' tracks a prefix
|
||||||
|
// of sendCases. When a send succeeds, the corresponding case moves to the end of
|
||||||
|
// 'cases' and it shrinks by one element.
|
||||||
cases := f.sendCases
|
cases := f.sendCases
|
||||||
for {
|
for {
|
||||||
// Fast path: try sending without blocking before adding to the select set.
|
// Fast path: try sending without blocking before adding to the select set.
|
||||||
@ -170,6 +172,7 @@ func (f *Feed) Send(value interface{}) (nsent int) {
|
|||||||
index := f.sendCases.find(recv.Interface())
|
index := f.sendCases.find(recv.Interface())
|
||||||
f.sendCases = f.sendCases.delete(index)
|
f.sendCases = f.sendCases.delete(index)
|
||||||
if index >= 0 && index < len(cases) {
|
if index >= 0 && index < len(cases) {
|
||||||
|
// Shrink 'cases' too because the removed case was still active.
|
||||||
cases = f.sendCases[:len(cases)-1]
|
cases = f.sendCases[:len(cases)-1]
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -235,6 +235,45 @@ func TestFeedUnsubscribeBlockedPost(t *testing.T) {
|
|||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Checks that unsubscribing a channel during Send works even if that
|
||||||
|
// channel has already been sent on.
|
||||||
|
func TestFeedUnsubscribeSentChan(t *testing.T) {
|
||||||
|
var (
|
||||||
|
feed Feed
|
||||||
|
ch1 = make(chan int)
|
||||||
|
ch2 = make(chan int)
|
||||||
|
sub1 = feed.Subscribe(ch1)
|
||||||
|
sub2 = feed.Subscribe(ch2)
|
||||||
|
wg sync.WaitGroup
|
||||||
|
)
|
||||||
|
defer sub2.Unsubscribe()
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
feed.Send(0)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait for the value on ch1.
|
||||||
|
<-ch1
|
||||||
|
// Unsubscribe ch1, removing it from the send cases.
|
||||||
|
sub1.Unsubscribe()
|
||||||
|
|
||||||
|
// Receive ch2, finishing Send.
|
||||||
|
<-ch2
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// Send again. This should send to ch2 only, so the wait group will unblock
|
||||||
|
// as soon as a value is received on ch2.
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
feed.Send(0)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
<-ch2
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
func TestFeedUnsubscribeFromInbox(t *testing.T) {
|
func TestFeedUnsubscribeFromInbox(t *testing.T) {
|
||||||
var (
|
var (
|
||||||
feed Feed
|
feed Feed
|
||||||
|
Loading…
Reference in New Issue
Block a user