// Copyright 2014 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // The go-ethereum library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. // Package event deals with subscriptions to real-time events. package event import ( "errors" "fmt" "reflect" "sync" "time" ) // TypeMuxEvent is a time-tagged notification pushed to subscribers. type TypeMuxEvent struct { Time time.Time Data interface{} } // A TypeMux dispatches events to registered receivers. Receivers can be // registered to handle events of certain type. Any operation // called after mux is stopped will return ErrMuxClosed. // // The zero value is ready to use. // // Deprecated: use Feed type TypeMux struct { mutex sync.RWMutex subm map[reflect.Type][]*TypeMuxSubscription stopped bool } // ErrMuxClosed is returned when Posting on a closed TypeMux. var ErrMuxClosed = errors.New("event: mux closed") // Subscribe creates a subscription for events of the given types. The // subscription's channel is closed when it is unsubscribed // or the mux is closed. func (mux *TypeMux) Subscribe(types ...interface{}) *TypeMuxSubscription { sub := newsub(mux) mux.mutex.Lock() defer mux.mutex.Unlock() if mux.stopped { // set the status to closed so that calling Unsubscribe after this // call will short circuit. sub.closed = true close(sub.postC) } else { if mux.subm == nil { mux.subm = make(map[reflect.Type][]*TypeMuxSubscription, len(types)) } for _, t := range types { rtyp := reflect.TypeOf(t) oldsubs := mux.subm[rtyp] if find(oldsubs, sub) != -1 { panic(fmt.Sprintf("event: duplicate type %s in Subscribe", rtyp)) } subs := make([]*TypeMuxSubscription, len(oldsubs)+1) copy(subs, oldsubs) subs[len(oldsubs)] = sub mux.subm[rtyp] = subs } } return sub } // Post sends an event to all receivers registered for the given type. // It returns ErrMuxClosed if the mux has been stopped. func (mux *TypeMux) Post(ev interface{}) error { event := &TypeMuxEvent{ Time: time.Now(), Data: ev, } rtyp := reflect.TypeOf(ev) mux.mutex.RLock() if mux.stopped { mux.mutex.RUnlock() return ErrMuxClosed } subs := mux.subm[rtyp] mux.mutex.RUnlock() for _, sub := range subs { sub.deliver(event) } return nil } // Stop closes a mux. The mux can no longer be used. // Future Post calls will fail with ErrMuxClosed. // Stop blocks until all current deliveries have finished. func (mux *TypeMux) Stop() { mux.mutex.Lock() defer mux.mutex.Unlock() for _, subs := range mux.subm { for _, sub := range subs { sub.closewait() } } mux.subm = nil mux.stopped = true } func (mux *TypeMux) del(s *TypeMuxSubscription) { mux.mutex.Lock() defer mux.mutex.Unlock() for typ, subs := range mux.subm { if pos := find(subs, s); pos >= 0 { if len(subs) == 1 { delete(mux.subm, typ) } else { mux.subm[typ] = posdelete(subs, pos) } } } } func find(slice []*TypeMuxSubscription, item *TypeMuxSubscription) int { for i, v := range slice { if v == item { return i } } return -1 } func posdelete(slice []*TypeMuxSubscription, pos int) []*TypeMuxSubscription { news := make([]*TypeMuxSubscription, len(slice)-1) copy(news[:pos], slice[:pos]) copy(news[pos:], slice[pos+1:]) return news } // TypeMuxSubscription is a subscription established through TypeMux. type TypeMuxSubscription struct { mux *TypeMux created time.Time closeMu sync.Mutex closing chan struct{} closed bool // these two are the same channel. they are stored separately so // postC can be set to nil without affecting the return value of // Chan. postMu sync.RWMutex readC <-chan *TypeMuxEvent postC chan<- *TypeMuxEvent } func newsub(mux *TypeMux) *TypeMuxSubscription { c := make(chan *TypeMuxEvent) return &TypeMuxSubscription{ mux: mux, created: time.Now(), readC: c, postC: c, closing: make(chan struct{}), } } func (s *TypeMuxSubscription) Chan() <-chan *TypeMuxEvent { return s.readC } func (s *TypeMuxSubscription) Unsubscribe() { s.mux.del(s) s.closewait() } func (s *TypeMuxSubscription) Closed() bool { s.closeMu.Lock() defer s.closeMu.Unlock() return s.closed } func (s *TypeMuxSubscription) closewait() { s.closeMu.Lock() defer s.closeMu.Unlock() if s.closed { return } close(s.closing) s.closed = true s.postMu.Lock() defer s.postMu.Unlock() close(s.postC) s.postC = nil } func (s *TypeMuxSubscription) deliver(event *TypeMuxEvent) { // Short circuit delivery if stale event if s.created.After(event.Time) { return } // Otherwise deliver the event s.postMu.RLock() defer s.postMu.RUnlock() select { case s.postC <- event: case <-s.closing: } }