* eth/filter: check nil pointer when unsubscribe * eth/filters, accounts, rpc: abort system if subscribe failed * eth/filter: add crit log before exit * eth/filter, event: minor fixes
		
			
				
	
	
		
			218 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			218 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2014 The go-ethereum Authors
 | |
| // This file is part of the go-ethereum library.
 | |
| //
 | |
| // The go-ethereum library is free software: you can redistribute it and/or modify
 | |
| // it under the terms of the GNU Lesser General Public License as published by
 | |
| // the Free Software Foundation, either version 3 of the License, or
 | |
| // (at your option) any later version.
 | |
| //
 | |
| // The go-ethereum library is distributed in the hope that it will be useful,
 | |
| // but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 | |
| // GNU Lesser General Public License for more details.
 | |
| //
 | |
| // You should have received a copy of the GNU Lesser General Public License
 | |
| // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
 | |
| 
 | |
| // Package event deals with subscriptions to real-time events.
 | |
| package event
 | |
| 
 | |
| import (
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"reflect"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| )
 | |
| 
 | |
| // TypeMuxEvent is a time-tagged notification pushed to subscribers.
 | |
| type TypeMuxEvent struct {
 | |
| 	Time time.Time
 | |
| 	Data interface{}
 | |
| }
 | |
| 
 | |
| // A TypeMux dispatches events to registered receivers. Receivers can be
 | |
| // registered to handle events of certain type. Any operation
 | |
| // called after mux is stopped will return ErrMuxClosed.
 | |
| //
 | |
| // The zero value is ready to use.
 | |
| //
 | |
| // Deprecated: use Feed
 | |
| type TypeMux struct {
 | |
| 	mutex   sync.RWMutex
 | |
| 	subm    map[reflect.Type][]*TypeMuxSubscription
 | |
| 	stopped bool
 | |
| }
 | |
| 
 | |
| // ErrMuxClosed is returned when Posting on a closed TypeMux.
 | |
| var ErrMuxClosed = errors.New("event: mux closed")
 | |
| 
 | |
| // Subscribe creates a subscription for events of the given types. The
 | |
| // subscription's channel is closed when it is unsubscribed
 | |
| // or the mux is closed.
 | |
| func (mux *TypeMux) Subscribe(types ...interface{}) *TypeMuxSubscription {
 | |
| 	sub := newsub(mux)
 | |
| 	mux.mutex.Lock()
 | |
| 	defer mux.mutex.Unlock()
 | |
| 	if mux.stopped {
 | |
| 		// set the status to closed so that calling Unsubscribe after this
 | |
| 		// call will short circuit.
 | |
| 		sub.closed = true
 | |
| 		close(sub.postC)
 | |
| 	} else {
 | |
| 		if mux.subm == nil {
 | |
| 			mux.subm = make(map[reflect.Type][]*TypeMuxSubscription)
 | |
| 		}
 | |
| 		for _, t := range types {
 | |
| 			rtyp := reflect.TypeOf(t)
 | |
| 			oldsubs := mux.subm[rtyp]
 | |
| 			if find(oldsubs, sub) != -1 {
 | |
| 				panic(fmt.Sprintf("event: duplicate type %s in Subscribe", rtyp))
 | |
| 			}
 | |
| 			subs := make([]*TypeMuxSubscription, len(oldsubs)+1)
 | |
| 			copy(subs, oldsubs)
 | |
| 			subs[len(oldsubs)] = sub
 | |
| 			mux.subm[rtyp] = subs
 | |
| 		}
 | |
| 	}
 | |
| 	return sub
 | |
| }
 | |
| 
 | |
| // Post sends an event to all receivers registered for the given type.
 | |
| // It returns ErrMuxClosed if the mux has been stopped.
 | |
| func (mux *TypeMux) Post(ev interface{}) error {
 | |
| 	event := &TypeMuxEvent{
 | |
| 		Time: time.Now(),
 | |
| 		Data: ev,
 | |
| 	}
 | |
| 	rtyp := reflect.TypeOf(ev)
 | |
| 	mux.mutex.RLock()
 | |
| 	if mux.stopped {
 | |
| 		mux.mutex.RUnlock()
 | |
| 		return ErrMuxClosed
 | |
| 	}
 | |
| 	subs := mux.subm[rtyp]
 | |
| 	mux.mutex.RUnlock()
 | |
| 	for _, sub := range subs {
 | |
| 		sub.deliver(event)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Stop closes a mux. The mux can no longer be used.
 | |
| // Future Post calls will fail with ErrMuxClosed.
 | |
| // Stop blocks until all current deliveries have finished.
 | |
| func (mux *TypeMux) Stop() {
 | |
| 	mux.mutex.Lock()
 | |
| 	for _, subs := range mux.subm {
 | |
| 		for _, sub := range subs {
 | |
| 			sub.closewait()
 | |
| 		}
 | |
| 	}
 | |
| 	mux.subm = nil
 | |
| 	mux.stopped = true
 | |
| 	mux.mutex.Unlock()
 | |
| }
 | |
| 
 | |
| func (mux *TypeMux) del(s *TypeMuxSubscription) {
 | |
| 	mux.mutex.Lock()
 | |
| 	for typ, subs := range mux.subm {
 | |
| 		if pos := find(subs, s); pos >= 0 {
 | |
| 			if len(subs) == 1 {
 | |
| 				delete(mux.subm, typ)
 | |
| 			} else {
 | |
| 				mux.subm[typ] = posdelete(subs, pos)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	s.mux.mutex.Unlock()
 | |
| }
 | |
| 
 | |
| func find(slice []*TypeMuxSubscription, item *TypeMuxSubscription) int {
 | |
| 	for i, v := range slice {
 | |
| 		if v == item {
 | |
| 			return i
 | |
| 		}
 | |
| 	}
 | |
| 	return -1
 | |
| }
 | |
| 
 | |
| func posdelete(slice []*TypeMuxSubscription, pos int) []*TypeMuxSubscription {
 | |
| 	news := make([]*TypeMuxSubscription, len(slice)-1)
 | |
| 	copy(news[:pos], slice[:pos])
 | |
| 	copy(news[pos:], slice[pos+1:])
 | |
| 	return news
 | |
| }
 | |
| 
 | |
| // TypeMuxSubscription is a subscription established through TypeMux.
 | |
| type TypeMuxSubscription struct {
 | |
| 	mux     *TypeMux
 | |
| 	created time.Time
 | |
| 	closeMu sync.Mutex
 | |
| 	closing chan struct{}
 | |
| 	closed  bool
 | |
| 
 | |
| 	// these two are the same channel. they are stored separately so
 | |
| 	// postC can be set to nil without affecting the return value of
 | |
| 	// Chan.
 | |
| 	postMu sync.RWMutex
 | |
| 	readC  <-chan *TypeMuxEvent
 | |
| 	postC  chan<- *TypeMuxEvent
 | |
| }
 | |
| 
 | |
| func newsub(mux *TypeMux) *TypeMuxSubscription {
 | |
| 	c := make(chan *TypeMuxEvent)
 | |
| 	return &TypeMuxSubscription{
 | |
| 		mux:     mux,
 | |
| 		created: time.Now(),
 | |
| 		readC:   c,
 | |
| 		postC:   c,
 | |
| 		closing: make(chan struct{}),
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (s *TypeMuxSubscription) Chan() <-chan *TypeMuxEvent {
 | |
| 	return s.readC
 | |
| }
 | |
| 
 | |
| func (s *TypeMuxSubscription) Unsubscribe() {
 | |
| 	s.mux.del(s)
 | |
| 	s.closewait()
 | |
| }
 | |
| 
 | |
| func (s *TypeMuxSubscription) Closed() bool {
 | |
| 	s.closeMu.Lock()
 | |
| 	defer s.closeMu.Unlock()
 | |
| 	return s.closed
 | |
| }
 | |
| 
 | |
| func (s *TypeMuxSubscription) closewait() {
 | |
| 	s.closeMu.Lock()
 | |
| 	defer s.closeMu.Unlock()
 | |
| 	if s.closed {
 | |
| 		return
 | |
| 	}
 | |
| 	close(s.closing)
 | |
| 	s.closed = true
 | |
| 
 | |
| 	s.postMu.Lock()
 | |
| 	close(s.postC)
 | |
| 	s.postC = nil
 | |
| 	s.postMu.Unlock()
 | |
| }
 | |
| 
 | |
| func (s *TypeMuxSubscription) deliver(event *TypeMuxEvent) {
 | |
| 	// Short circuit delivery if stale event
 | |
| 	if s.created.After(event.Time) {
 | |
| 		return
 | |
| 	}
 | |
| 	// Otherwise deliver the event
 | |
| 	s.postMu.RLock()
 | |
| 	defer s.postMu.RUnlock()
 | |
| 
 | |
| 	select {
 | |
| 	case s.postC <- event:
 | |
| 	case <-s.closing:
 | |
| 	}
 | |
| }
 |