laconicd/rpc/ethereum/pubsub/pubsub.go
2022-09-30 12:11:39 +05:30

144 lines
3.0 KiB
Go

package pubsub
import (
"sync"
"sync/atomic"
"github.com/pkg/errors"
coretypes "github.com/tendermint/tendermint/rpc/core/types"
)
type UnsubscribeFunc func()
type EventBus interface {
AddTopic(name string, src <-chan coretypes.ResultEvent) error
RemoveTopic(name string)
Subscribe(name string) (<-chan coretypes.ResultEvent, UnsubscribeFunc, error)
Topics() []string
}
type memEventBus struct {
topics map[string]<-chan coretypes.ResultEvent
topicsMux *sync.RWMutex
subscribers map[string]map[uint64]chan<- coretypes.ResultEvent
subscribersMux *sync.RWMutex
currentUniqueID uint64
}
func NewEventBus() EventBus {
return &memEventBus{
topics: make(map[string]<-chan coretypes.ResultEvent),
topicsMux: new(sync.RWMutex),
subscribers: make(map[string]map[uint64]chan<- coretypes.ResultEvent),
subscribersMux: new(sync.RWMutex),
}
}
func (m *memEventBus) GenUniqueID() uint64 {
return atomic.AddUint64(&m.currentUniqueID, 1)
}
func (m *memEventBus) Topics() (topics []string) {
m.topicsMux.RLock()
defer m.topicsMux.RUnlock()
topics = make([]string, 0, len(m.topics))
for topicName := range m.topics {
topics = append(topics, topicName)
}
return topics
}
func (m *memEventBus) AddTopic(name string, src <-chan coretypes.ResultEvent) error {
m.topicsMux.RLock()
_, ok := m.topics[name]
m.topicsMux.RUnlock()
if ok {
return errors.New("topic already registered")
}
m.topicsMux.Lock()
m.topics[name] = src
m.topicsMux.Unlock()
go m.publishTopic(name, src)
return nil
}
func (m *memEventBus) RemoveTopic(name string) {
m.topicsMux.Lock()
delete(m.topics, name)
m.topicsMux.Unlock()
}
func (m *memEventBus) Subscribe(name string) (<-chan coretypes.ResultEvent, UnsubscribeFunc, error) {
m.topicsMux.RLock()
_, ok := m.topics[name]
m.topicsMux.RUnlock()
if !ok {
return nil, nil, errors.Errorf("topic not found: %s", name)
}
ch := make(chan coretypes.ResultEvent)
m.subscribersMux.Lock()
defer m.subscribersMux.Unlock()
id := m.GenUniqueID()
if _, ok := m.subscribers[name]; !ok {
m.subscribers[name] = make(map[uint64]chan<- coretypes.ResultEvent)
}
m.subscribers[name][id] = ch
unsubscribe := func() {
m.subscribersMux.Lock()
defer m.subscribersMux.Unlock()
delete(m.subscribers[name], id)
}
return ch, unsubscribe, nil
}
func (m *memEventBus) publishTopic(name string, src <-chan coretypes.ResultEvent) {
for {
msg, ok := <-src
if !ok {
m.closeAllSubscribers(name)
m.topicsMux.Lock()
delete(m.topics, name)
m.topicsMux.Unlock()
return
}
m.publishAllSubscribers(name, msg)
}
}
func (m *memEventBus) closeAllSubscribers(name string) {
m.subscribersMux.Lock()
defer m.subscribersMux.Unlock()
subsribers := m.subscribers[name]
delete(m.subscribers, name)
for _, sub := range subsribers {
close(sub)
}
}
func (m *memEventBus) publishAllSubscribers(name string, msg coretypes.ResultEvent) {
m.subscribersMux.RLock()
subsribers := m.subscribers[name]
m.subscribersMux.RUnlock()
for _, sub := range subsribers {
select {
case sub <- msg:
default:
}
}
}