event: fix datarace between Subscribe and Send

This commit is contained in:
Péter Szilágyi 2017-10-20 14:01:38 +03:00
parent 0e7d019e0e
commit 65738c1eb3
No known key found for this signature in database
GPG Key ID: E9AE538CEDF8293D

View File

@ -127,6 +127,8 @@ func (f *Feed) remove(sub *feedSub) {
// Send delivers to all subscribed channels simultaneously. // Send delivers to all subscribed channels simultaneously.
// It returns the number of subscribers that the value was sent to. // It returns the number of subscribers that the value was sent to.
func (f *Feed) Send(value interface{}) (nsent int) { func (f *Feed) Send(value interface{}) (nsent int) {
rvalue := reflect.ValueOf(value)
f.once.Do(f.init) f.once.Do(f.init)
<-f.sendLock <-f.sendLock
@ -134,14 +136,14 @@ func (f *Feed) Send(value interface{}) (nsent int) {
f.mu.Lock() f.mu.Lock()
f.sendCases = append(f.sendCases, f.inbox...) f.sendCases = append(f.sendCases, f.inbox...)
f.inbox = nil f.inbox = nil
f.mu.Unlock()
// Set the sent value on all channels.
rvalue := reflect.ValueOf(value)
if !f.typecheck(rvalue.Type()) { if !f.typecheck(rvalue.Type()) {
f.sendLock <- struct{}{} f.sendLock <- struct{}{}
panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype}) panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype})
} }
f.mu.Unlock()
// Set the sent value on all channels.
for i := firstSubSendCase; i < len(f.sendCases); i++ { for i := firstSubSendCase; i < len(f.sendCases); i++ {
f.sendCases[i].Send = rvalue f.sendCases[i].Send = rvalue
} }