event: make TypeMux zero value ready to use
This commit is contained in:
parent
10bbf265b2
commit
6906904896
@ -23,6 +23,8 @@ type Subscription interface {
|
|||||||
// A TypeMux dispatches events to registered receivers. Receivers can be
|
// A TypeMux dispatches events to registered receivers. Receivers can be
|
||||||
// registered to handle events of certain type. Any operation
|
// registered to handle events of certain type. Any operation
|
||||||
// called after mux is stopped will return ErrMuxClosed.
|
// called after mux is stopped will return ErrMuxClosed.
|
||||||
|
//
|
||||||
|
// The zero value is ready to use.
|
||||||
type TypeMux struct {
|
type TypeMux struct {
|
||||||
mutex sync.RWMutex
|
mutex sync.RWMutex
|
||||||
subm map[reflect.Type][]*muxsub
|
subm map[reflect.Type][]*muxsub
|
||||||
@ -32,11 +34,6 @@ type TypeMux struct {
|
|||||||
// ErrMuxClosed is returned when Posting on a closed TypeMux.
|
// ErrMuxClosed is returned when Posting on a closed TypeMux.
|
||||||
var ErrMuxClosed = errors.New("event: mux closed")
|
var ErrMuxClosed = errors.New("event: mux closed")
|
||||||
|
|
||||||
// NewTypeMux creates a running mux.
|
|
||||||
func NewTypeMux() *TypeMux {
|
|
||||||
return &TypeMux{subm: make(map[reflect.Type][]*muxsub)}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Subscribe creates a subscription for events of the given types. The
|
// Subscribe creates a subscription for events of the given types. The
|
||||||
// subscription's channel is closed when it is unsubscribed
|
// subscription's channel is closed when it is unsubscribed
|
||||||
// or the mux is closed.
|
// or the mux is closed.
|
||||||
@ -44,9 +41,11 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription {
|
|||||||
sub := newsub(mux)
|
sub := newsub(mux)
|
||||||
mux.mutex.Lock()
|
mux.mutex.Lock()
|
||||||
if mux.stopped {
|
if mux.stopped {
|
||||||
mux.mutex.Unlock()
|
|
||||||
close(sub.postC)
|
close(sub.postC)
|
||||||
} else {
|
} else {
|
||||||
|
if mux.subm == nil {
|
||||||
|
mux.subm = make(map[reflect.Type][]*muxsub)
|
||||||
|
}
|
||||||
for _, t := range types {
|
for _, t := range types {
|
||||||
rtyp := reflect.TypeOf(t)
|
rtyp := reflect.TypeOf(t)
|
||||||
oldsubs := mux.subm[rtyp]
|
oldsubs := mux.subm[rtyp]
|
||||||
@ -55,8 +54,8 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription {
|
|||||||
subs[len(oldsubs)] = sub
|
subs[len(oldsubs)] = sub
|
||||||
mux.subm[rtyp] = subs
|
mux.subm[rtyp] = subs
|
||||||
}
|
}
|
||||||
mux.mutex.Unlock()
|
|
||||||
}
|
}
|
||||||
|
mux.mutex.Unlock()
|
||||||
return sub
|
return sub
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -10,7 +10,7 @@ import (
|
|||||||
type testEvent int
|
type testEvent int
|
||||||
|
|
||||||
func TestSub(t *testing.T) {
|
func TestSub(t *testing.T) {
|
||||||
mux := NewTypeMux()
|
mux := new(TypeMux)
|
||||||
defer mux.Stop()
|
defer mux.Stop()
|
||||||
|
|
||||||
sub := mux.Subscribe(testEvent(0))
|
sub := mux.Subscribe(testEvent(0))
|
||||||
@ -28,7 +28,7 @@ func TestSub(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestMuxErrorAfterStop(t *testing.T) {
|
func TestMuxErrorAfterStop(t *testing.T) {
|
||||||
mux := NewTypeMux()
|
mux := new(TypeMux)
|
||||||
mux.Stop()
|
mux.Stop()
|
||||||
|
|
||||||
sub := mux.Subscribe(testEvent(0))
|
sub := mux.Subscribe(testEvent(0))
|
||||||
@ -41,7 +41,7 @@ func TestMuxErrorAfterStop(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestUnsubscribeUnblockPost(t *testing.T) {
|
func TestUnsubscribeUnblockPost(t *testing.T) {
|
||||||
mux := NewTypeMux()
|
mux := new(TypeMux)
|
||||||
defer mux.Stop()
|
defer mux.Stop()
|
||||||
|
|
||||||
sub := mux.Subscribe(testEvent(0))
|
sub := mux.Subscribe(testEvent(0))
|
||||||
@ -62,7 +62,7 @@ func TestUnsubscribeUnblockPost(t *testing.T) {
|
|||||||
|
|
||||||
func TestMuxConcurrent(t *testing.T) {
|
func TestMuxConcurrent(t *testing.T) {
|
||||||
rand.Seed(time.Now().Unix())
|
rand.Seed(time.Now().Unix())
|
||||||
mux := NewTypeMux()
|
mux := new(TypeMux)
|
||||||
defer mux.Stop()
|
defer mux.Stop()
|
||||||
|
|
||||||
recv := make(chan int)
|
recv := make(chan int)
|
||||||
@ -111,7 +111,7 @@ func emptySubscriber(mux *TypeMux, types ...interface{}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkPost3(b *testing.B) {
|
func BenchmarkPost3(b *testing.B) {
|
||||||
var mux = NewTypeMux()
|
var mux = new(TypeMux)
|
||||||
defer mux.Stop()
|
defer mux.Stop()
|
||||||
emptySubscriber(mux, testEvent(0))
|
emptySubscriber(mux, testEvent(0))
|
||||||
emptySubscriber(mux, testEvent(0))
|
emptySubscriber(mux, testEvent(0))
|
||||||
@ -123,7 +123,7 @@ func BenchmarkPost3(b *testing.B) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkPostConcurrent(b *testing.B) {
|
func BenchmarkPostConcurrent(b *testing.B) {
|
||||||
var mux = NewTypeMux()
|
var mux = new(TypeMux)
|
||||||
defer mux.Stop()
|
defer mux.Stop()
|
||||||
emptySubscriber(mux, testEvent(0))
|
emptySubscriber(mux, testEvent(0))
|
||||||
emptySubscriber(mux, testEvent(0))
|
emptySubscriber(mux, testEvent(0))
|
||||||
|
Loading…
Reference in New Issue
Block a user