fix: websocket client duplicated messages (#955)

* Problem: websocket client get duplicated messages

Closes: #954
Solution:
- localize the subscription management within current connection

* changelog

* fix linter

* fix test building

Co-authored-by: Federico Kunze Küllmer <31522760+fedekunze@users.noreply.github.com>
This commit is contained in:
yihuang 2022-02-26 03:08:30 +08:00 committed by GitHub
parent 19a1be61c7
commit 0d69a69625
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 174 additions and 269 deletions

View File

@ -38,11 +38,16 @@ Ref: https://keepachangelog.com/en/1.0.0/
## Unreleased ## Unreleased
### Improvements ### Improvements
* (log) [#948](https://github.com/tharsis/ethermint/pull/948) redirect go-ethereum's logs to cosmos-sdk logger. * (log) [#948](https://github.com/tharsis/ethermint/pull/948) redirect go-ethereum's logs to Cosmos SDK logger.
* (rpc) [tharsis#953](https://github.com/tharsis/ethermint/pull/953) Add `eth_signTypedData` api support.
### Bug Fixes
* (rpc) [#955](https://github.com/tharsis/ethermint/pull/955) Fix websocket server push duplicated messages to subscriber.
* (rpc) [tharsis#953](https://github.com/tharsis/ethermint/pull/953) Add `eth_signTypedData` api support.
* (log) [#948](https://github.com/tharsis/ethermint/pull/948) redirect go-ethereum's logs to cosmos-sdk logger.
## [v0.10.0-beta1] - 2022-02-15 ## [v0.10.0-beta1] - 2022-02-15

View File

@ -90,62 +90,62 @@ func (es *EventSystem) WithContext(ctx context.Context) {
// subscribe performs a new event subscription to a given Tendermint event. // subscribe performs a new event subscription to a given Tendermint event.
// The subscription creates a unidirectional receive event channel to receive the ResultEvent. // The subscription creates a unidirectional receive event channel to receive the ResultEvent.
func (es *EventSystem) subscribe(sub *Subscription) (*Subscription, context.CancelFunc, error) { func (es *EventSystem) subscribe(sub *Subscription) (*Subscription, pubsub.UnsubscribeFunc, error) {
var ( var (
err error err error
cancelFn context.CancelFunc cancelFn context.CancelFunc
) )
es.ctx, cancelFn = context.WithCancel(context.Background()) ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
existingSubs := es.eventBus.Topics() existingSubs := es.eventBus.Topics()
for _, topic := range existingSubs { for _, topic := range existingSubs {
if topic == sub.event { if topic == sub.event {
eventCh, err := es.eventBus.Subscribe(sub.event) eventCh, unsubFn, err := es.eventBus.Subscribe(sub.event)
if err != nil { if err != nil {
err := errors.Wrapf(err, "failed to subscribe to topic: %s", sub.event) err := errors.Wrapf(err, "failed to subscribe to topic: %s", sub.event)
return nil, cancelFn, err return nil, nil, err
} }
sub.eventCh = eventCh sub.eventCh = eventCh
return sub, cancelFn, nil return sub, unsubFn, nil
} }
} }
switch sub.typ { switch sub.typ {
case filters.LogsSubscription: case filters.LogsSubscription:
err = es.tmWSClient.Subscribe(es.ctx, sub.event) err = es.tmWSClient.Subscribe(ctx, sub.event)
case filters.BlocksSubscription: case filters.BlocksSubscription:
err = es.tmWSClient.Subscribe(es.ctx, sub.event) err = es.tmWSClient.Subscribe(ctx, sub.event)
case filters.PendingTransactionsSubscription: case filters.PendingTransactionsSubscription:
err = es.tmWSClient.Subscribe(es.ctx, sub.event) err = es.tmWSClient.Subscribe(ctx, sub.event)
default: default:
err = fmt.Errorf("invalid filter subscription type %d", sub.typ) err = fmt.Errorf("invalid filter subscription type %d", sub.typ)
} }
if err != nil { if err != nil {
sub.err <- err sub.err <- err
return nil, cancelFn, err return nil, nil, err
} }
// wrap events in a go routine to prevent blocking // wrap events in a go routine to prevent blocking
es.install <- sub es.install <- sub
<-sub.installed <-sub.installed
eventCh, err := es.eventBus.Subscribe(sub.event) eventCh, unsubFn, err := es.eventBus.Subscribe(sub.event)
if err != nil { if err != nil {
err := errors.Wrapf(err, "failed to subscribe to topic after installed: %s", sub.event) return nil, nil, errors.Wrapf(err, "failed to subscribe to topic after installed: %s", sub.event)
return sub, cancelFn, err
} }
sub.eventCh = eventCh sub.eventCh = eventCh
return sub, cancelFn, nil return sub, unsubFn, nil
} }
// SubscribeLogs creates a subscription that will write all logs matching the // SubscribeLogs creates a subscription that will write all logs matching the
// given criteria to the given logs channel. Default value for the from and to // given criteria to the given logs channel. Default value for the from and to
// block is "latest". If the fromBlock > toBlock an error is returned. // block is "latest". If the fromBlock > toBlock an error is returned.
func (es *EventSystem) SubscribeLogs(crit filters.FilterCriteria) (*Subscription, context.CancelFunc, error) { func (es *EventSystem) SubscribeLogs(crit filters.FilterCriteria) (*Subscription, pubsub.UnsubscribeFunc, error) {
var from, to rpc.BlockNumber var from, to rpc.BlockNumber
if crit.FromBlock == nil { if crit.FromBlock == nil {
from = rpc.LatestBlockNumber from = rpc.LatestBlockNumber
@ -173,7 +173,7 @@ func (es *EventSystem) SubscribeLogs(crit filters.FilterCriteria) (*Subscription
// subscribeLogs creates a subscription that will write all logs matching the // subscribeLogs creates a subscription that will write all logs matching the
// given criteria to the given logs channel. // given criteria to the given logs channel.
func (es *EventSystem) subscribeLogs(crit filters.FilterCriteria) (*Subscription, context.CancelFunc, error) { func (es *EventSystem) subscribeLogs(crit filters.FilterCriteria) (*Subscription, pubsub.UnsubscribeFunc, error) {
sub := &Subscription{ sub := &Subscription{
id: rpc.NewID(), id: rpc.NewID(),
typ: filters.LogsSubscription, typ: filters.LogsSubscription,
@ -188,7 +188,7 @@ func (es *EventSystem) subscribeLogs(crit filters.FilterCriteria) (*Subscription
} }
// SubscribeNewHeads subscribes to new block headers events. // SubscribeNewHeads subscribes to new block headers events.
func (es EventSystem) SubscribeNewHeads() (*Subscription, context.CancelFunc, error) { func (es EventSystem) SubscribeNewHeads() (*Subscription, pubsub.UnsubscribeFunc, error) {
sub := &Subscription{ sub := &Subscription{
id: rpc.NewID(), id: rpc.NewID(),
typ: filters.BlocksSubscription, typ: filters.BlocksSubscription,
@ -202,7 +202,7 @@ func (es EventSystem) SubscribeNewHeads() (*Subscription, context.CancelFunc, er
} }
// SubscribePendingTxs subscribes to new pending transactions events from the mempool. // SubscribePendingTxs subscribes to new pending transactions events from the mempool.
func (es EventSystem) SubscribePendingTxs() (*Subscription, context.CancelFunc, error) { func (es EventSystem) SubscribePendingTxs() (*Subscription, pubsub.UnsubscribeFunc, error) {
sub := &Subscription{ sub := &Subscription{
id: rpc.NewID(), id: rpc.NewID(),
typ: filters.PendingTransactionsSubscription, typ: filters.PendingTransactionsSubscription,

View File

@ -2,35 +2,43 @@ package pubsub
import ( import (
"sync" "sync"
"sync/atomic"
"github.com/pkg/errors" "github.com/pkg/errors"
coretypes "github.com/tendermint/tendermint/rpc/core/types" coretypes "github.com/tendermint/tendermint/rpc/core/types"
) )
type UnsubscribeFunc func()
type EventBus interface { type EventBus interface {
AddTopic(name string, src <-chan coretypes.ResultEvent) error AddTopic(name string, src <-chan coretypes.ResultEvent) error
RemoveTopic(name string) RemoveTopic(name string)
Subscribe(name string) (<-chan coretypes.ResultEvent, error) Subscribe(name string) (<-chan coretypes.ResultEvent, UnsubscribeFunc, error)
Topics() []string Topics() []string
} }
type memEventBus struct { type memEventBus struct {
topics map[string]<-chan coretypes.ResultEvent topics map[string]<-chan coretypes.ResultEvent
topicsMux *sync.RWMutex topicsMux *sync.RWMutex
subscribers map[string][]chan<- coretypes.ResultEvent subscribers map[string]map[uint64]chan<- coretypes.ResultEvent
subscribersMux *sync.RWMutex subscribersMux *sync.RWMutex
currentUniqueID uint64
} }
func NewEventBus() EventBus { func NewEventBus() EventBus {
return &memEventBus{ return &memEventBus{
topics: make(map[string]<-chan coretypes.ResultEvent), topics: make(map[string]<-chan coretypes.ResultEvent),
topicsMux: new(sync.RWMutex), topicsMux: new(sync.RWMutex),
subscribers: make(map[string][]chan<- coretypes.ResultEvent), subscribers: make(map[string]map[uint64]chan<- coretypes.ResultEvent),
subscribersMux: new(sync.RWMutex), subscribersMux: new(sync.RWMutex),
} }
} }
func (m *memEventBus) GenUniqueID() uint64 {
return atomic.AddUint64(&m.currentUniqueID, 1)
}
func (m *memEventBus) Topics() (topics []string) { func (m *memEventBus) Topics() (topics []string) {
m.topicsMux.RLock() m.topicsMux.RLock()
defer m.topicsMux.RUnlock() defer m.topicsMux.RUnlock()
@ -67,21 +75,32 @@ func (m *memEventBus) RemoveTopic(name string) {
m.topicsMux.Unlock() m.topicsMux.Unlock()
} }
func (m *memEventBus) Subscribe(name string) (<-chan coretypes.ResultEvent, error) { func (m *memEventBus) Subscribe(name string) (<-chan coretypes.ResultEvent, UnsubscribeFunc, error) {
m.topicsMux.RLock() m.topicsMux.RLock()
_, ok := m.topics[name] _, ok := m.topics[name]
m.topicsMux.RUnlock() m.topicsMux.RUnlock()
if !ok { if !ok {
return nil, errors.Errorf("topic not found: %s", name) return nil, nil, errors.Errorf("topic not found: %s", name)
} }
ch := make(chan coretypes.ResultEvent) ch := make(chan coretypes.ResultEvent)
m.subscribersMux.Lock() m.subscribersMux.Lock()
defer m.subscribersMux.Unlock() defer m.subscribersMux.Unlock()
m.subscribers[name] = append(m.subscribers[name], ch)
return ch, nil id := m.GenUniqueID()
if _, ok := m.subscribers[name]; !ok {
m.subscribers[name] = make(map[uint64]chan<- coretypes.ResultEvent)
}
m.subscribers[name][id] = ch
unsubscribe := func() {
m.subscribersMux.Lock()
defer m.subscribersMux.Unlock()
delete(m.subscribers[name], id)
}
return ch, unsubscribe, nil
} }
func (m *memEventBus) publishTopic(name string, src <-chan coretypes.ResultEvent) { func (m *memEventBus) publishTopic(name string, src <-chan coretypes.ResultEvent) {

View File

@ -37,13 +37,13 @@ func TestSubscribe(t *testing.T) {
q.AddTopic("lol", lolSrc) q.AddTopic("lol", lolSrc)
kekSubC, err := q.Subscribe("kek") kekSubC, _, err := q.Subscribe("kek")
require.NoError(t, err) require.NoError(t, err)
lolSubC, err := q.Subscribe("lol") lolSubC, _, err := q.Subscribe("lol")
require.NoError(t, err) require.NoError(t, err)
lol2SubC, err := q.Subscribe("lol") lol2SubC, _, err := q.Subscribe("lol")
require.NoError(t, err) require.NoError(t, err)
wg := new(sync.WaitGroup) wg := new(sync.WaitGroup)

View File

@ -23,11 +23,11 @@ import (
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
coretypes "github.com/tendermint/tendermint/rpc/core/types"
rpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client" rpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client"
tmtypes "github.com/tendermint/tendermint/types" tmtypes "github.com/tendermint/tendermint/types"
rpcfilters "github.com/tharsis/ethermint/rpc/ethereum/namespaces/eth/filters" rpcfilters "github.com/tharsis/ethermint/rpc/ethereum/namespaces/eth/filters"
"github.com/tharsis/ethermint/rpc/ethereum/pubsub"
"github.com/tharsis/ethermint/rpc/ethereum/types" "github.com/tharsis/ethermint/rpc/ethereum/types"
"github.com/tharsis/ethermint/server/config" "github.com/tharsis/ethermint/server/config"
evmtypes "github.com/tharsis/ethermint/x/evm/types" evmtypes "github.com/tharsis/ethermint/x/evm/types"
@ -168,6 +168,15 @@ func (w *wsConn) ReadMessage() (messageType int, p []byte, err error) {
} }
func (s *websocketsServer) readLoop(wsConn *wsConn) { func (s *websocketsServer) readLoop(wsConn *wsConn) {
// subscriptions of current connection
subscriptions := make(map[rpc.ID]pubsub.UnsubscribeFunc)
defer func() {
// cancel all subscriptions when connection closed
for _, unsubFn := range subscriptions {
unsubFn()
}
}()
for { for {
_, mb, err := wsConn.ReadMessage() _, mb, err := wsConn.ReadMessage()
if err != nil { if err != nil {
@ -195,59 +204,66 @@ func (s *websocketsServer) readLoop(wsConn *wsConn) {
} }
connID := msg["id"].(float64) connID := msg["id"].(float64)
if method == "eth_subscribe" { switch method {
case "eth_subscribe":
params := msg["params"].([]interface{}) params := msg["params"].([]interface{})
if len(params) == 0 { if len(params) == 0 {
s.sendErrResponse(wsConn, "invalid parameters") s.sendErrResponse(wsConn, "invalid parameters")
continue continue
} }
id, err := s.api.subscribe(wsConn, params) subID := rpc.NewID()
unsubFn, err := s.api.subscribe(wsConn, subID, params)
if err != nil { if err != nil {
s.sendErrResponse(wsConn, err.Error()) s.sendErrResponse(wsConn, err.Error())
continue continue
} }
subscriptions[subID] = unsubFn
res := &SubscriptionResponseJSON{ res := &SubscriptionResponseJSON{
Jsonrpc: "2.0", Jsonrpc: "2.0",
ID: connID, ID: connID,
Result: id, Result: subID,
} }
err = wsConn.WriteJSON(res) if err := wsConn.WriteJSON(res); err != nil {
if err != nil { break
}
case "eth_unsubscribe":
params, ok := msg["params"].([]interface{})
if !ok {
s.sendErrResponse(wsConn, "invalid parameters")
continue continue
} }
id, ok := params[0].(string)
continue if !ok {
} else if method == "eth_unsubscribe" {
ids, ok := msg["params"].([]interface{})
if _, idok := ids[0].(string); !ok || !idok {
s.sendErrResponse(wsConn, "invalid parameters") s.sendErrResponse(wsConn, "invalid parameters")
continue continue
} }
ok = s.api.unsubscribe(rpc.ID(ids[0].(string))) subID := rpc.ID(id)
unsubFn, ok := subscriptions[subID]
if ok {
delete(subscriptions, subID)
unsubFn()
}
res := &SubscriptionResponseJSON{ res := &SubscriptionResponseJSON{
Jsonrpc: "2.0", Jsonrpc: "2.0",
ID: connID, ID: connID,
Result: ok, Result: ok,
} }
err = wsConn.WriteJSON(res) if err := wsConn.WriteJSON(res); err != nil {
if err != nil { break
continue
} }
default:
continue
}
// otherwise, call the usual rpc server to respond // otherwise, call the usual rpc server to respond
err = s.tcpGetAndSendResponse(wsConn, mb) err = s.tcpGetAndSendResponse(wsConn, mb)
if err != nil { if err != nil {
s.sendErrResponse(wsConn, err.Error()) s.sendErrResponse(wsConn, err.Error())
} }
} }
}
} }
// tcpGetAndSendResponse connects to the rest-server over tcp, posts a JSON-RPC request, and sends the response // tcpGetAndSendResponse connects to the rest-server over tcp, posts a JSON-RPC request, and sends the response
@ -281,18 +297,9 @@ func (s *websocketsServer) tcpGetAndSendResponse(wsConn *wsConn, mb []byte) erro
return wsConn.WriteJSON(wsSend) return wsConn.WriteJSON(wsSend)
} }
type wsSubscription struct {
sub *rpcfilters.Subscription
unsubscribed chan struct{} // closed when unsubscribing
wsConn *wsConn
query string
}
// pubSubAPI is the eth_ prefixed set of APIs in the Web3 JSON-RPC spec // pubSubAPI is the eth_ prefixed set of APIs in the Web3 JSON-RPC spec
type pubSubAPI struct { type pubSubAPI struct {
events *rpcfilters.EventSystem events *rpcfilters.EventSystem
filtersMu *sync.RWMutex
filters map[rpc.ID]*wsSubscription
logger log.Logger logger log.Logger
clientCtx client.Context clientCtx client.Context
} }
@ -302,80 +309,51 @@ func newPubSubAPI(clientCtx client.Context, logger log.Logger, tmWSClient *rpccl
logger = logger.With("module", "websocket-client") logger = logger.With("module", "websocket-client")
return &pubSubAPI{ return &pubSubAPI{
events: rpcfilters.NewEventSystem(logger, tmWSClient), events: rpcfilters.NewEventSystem(logger, tmWSClient),
filtersMu: new(sync.RWMutex),
filters: make(map[rpc.ID]*wsSubscription),
logger: logger, logger: logger,
clientCtx: clientCtx, clientCtx: clientCtx,
} }
} }
func (api *pubSubAPI) subscribe(wsConn *wsConn, params []interface{}) (rpc.ID, error) { func (api *pubSubAPI) subscribe(wsConn *wsConn, subID rpc.ID, params []interface{}) (pubsub.UnsubscribeFunc, error) {
method, ok := params[0].(string) method, ok := params[0].(string)
if !ok { if !ok {
return "0", errors.New("invalid parameters") return nil, errors.New("invalid parameters")
} }
switch method { switch method {
case "newHeads": case "newHeads":
// TODO: handle extra params // TODO: handle extra params
return api.subscribeNewHeads(wsConn) return api.subscribeNewHeads(wsConn, subID)
case "logs": case "logs":
if len(params) > 1 { if len(params) > 1 {
return api.subscribeLogs(wsConn, params[1]) return api.subscribeLogs(wsConn, subID, params[1])
} }
return api.subscribeLogs(wsConn, nil) return api.subscribeLogs(wsConn, subID, nil)
case "newPendingTransactions": case "newPendingTransactions":
return api.subscribePendingTransactions(wsConn) return api.subscribePendingTransactions(wsConn, subID)
case "syncing": case "syncing":
return api.subscribeSyncing(wsConn) return api.subscribeSyncing(wsConn, subID)
default: default:
return "0", errors.Errorf("unsupported method %s", method) return nil, errors.Errorf("unsupported method %s", method)
} }
} }
func (api *pubSubAPI) unsubscribe(id rpc.ID) bool { func (api *pubSubAPI) subscribeNewHeads(wsConn *wsConn, subID rpc.ID) (pubsub.UnsubscribeFunc, error) {
api.filtersMu.Lock() sub, unsubFn, err := api.events.SubscribeNewHeads()
defer api.filtersMu.Unlock()
wsSub, ok := api.filters[id]
if !ok {
return false
}
wsSub.sub.Unsubscribe(api.events)
close(api.filters[id].unsubscribed)
delete(api.filters, id)
return true
}
func (api *pubSubAPI) subscribeNewHeads(wsConn *wsConn) (rpc.ID, error) {
query := "subscribeNewHeads"
subID := rpc.NewID()
sub, _, err := api.events.SubscribeNewHeads()
if err != nil { if err != nil {
return "", errors.Wrap(err, "error creating block filter") return nil, errors.Wrap(err, "error creating block filter")
} }
// TODO: use events // TODO: use events
baseFee := big.NewInt(params.InitialBaseFee) baseFee := big.NewInt(params.InitialBaseFee)
unsubscribed := make(chan struct{}) go func() {
api.filtersMu.Lock() headersCh := sub.Event()
api.filters[subID] = &wsSubscription{ errCh := sub.Err()
sub: sub,
wsConn: wsConn,
unsubscribed: unsubscribed,
query: query,
}
api.filtersMu.Unlock()
go func(headersCh <-chan coretypes.ResultEvent, errCh <-chan error) {
for { for {
select { select {
case event, ok := <-headersCh: case event, ok := <-headersCh:
if !ok { if !ok {
api.unsubscribe(subID)
return return
} }
@ -387,13 +365,6 @@ func (api *pubSubAPI) subscribeNewHeads(wsConn *wsConn) (rpc.ID, error) {
header := types.EthHeaderFromTendermint(data.Header, ethtypes.Bloom{}, baseFee) header := types.EthHeaderFromTendermint(data.Header, ethtypes.Bloom{}, baseFee)
api.filtersMu.RLock()
for subID, wsSub := range api.filters {
subID := subID
wsSub := wsSub
if wsSub.query != query {
continue
}
// write to ws conn // write to ws conn
res := &SubscriptionNotification{ res := &SubscriptionNotification{
Jsonrpc: "2.0", Jsonrpc: "2.0",
@ -404,42 +375,26 @@ func (api *pubSubAPI) subscribeNewHeads(wsConn *wsConn) (rpc.ID, error) {
}, },
} }
err = wsSub.wsConn.WriteJSON(res) err = wsConn.WriteJSON(res)
if err != nil { if err != nil {
api.logger.Error("error writing header, will drop peer", "error", err.Error()) api.logger.Error("error writing header, will drop peer", "error", err.Error())
try(func() { try(func() {
api.filtersMu.RUnlock()
api.filtersMu.Lock()
defer func() {
api.filtersMu.Unlock()
api.filtersMu.RLock()
}()
if err != websocket.ErrCloseSent { if err != websocket.ErrCloseSent {
_ = wsSub.wsConn.Close() _ = wsConn.Close()
} }
delete(api.filters, subID)
close(wsSub.unsubscribed)
}, api.logger, "closing websocket peer sub") }, api.logger, "closing websocket peer sub")
} }
}
api.filtersMu.RUnlock()
case err, ok := <-errCh: case err, ok := <-errCh:
if !ok { if !ok {
api.unsubscribe(subID)
return return
} }
api.logger.Debug("dropping NewHeads WebSocket subscription", "subscription-id", subID, "error", err.Error()) api.logger.Debug("dropping NewHeads WebSocket subscription", "subscription-id", subID, "error", err.Error())
api.unsubscribe(subID)
case <-unsubscribed:
return
} }
} }
}(sub.Event(), sub.Err()) }()
return subID, nil return unsubFn, nil
} }
func try(fn func(), l log.Logger, desc string) { func try(fn func(), l log.Logger, desc string) {
@ -459,7 +414,7 @@ func try(fn func(), l log.Logger, desc string) {
fn() fn()
} }
func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, extra interface{}) (rpc.ID, error) { func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, subID rpc.ID, extra interface{}) (pubsub.UnsubscribeFunc, error) {
crit := filters.FilterCriteria{} crit := filters.FilterCriteria{}
if extra != nil { if extra != nil {
@ -467,7 +422,7 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, extra interface{}) (rpc.ID,
if !ok { if !ok {
err := errors.New("invalid criteria") err := errors.New("invalid criteria")
api.logger.Debug("invalid criteria", "type", fmt.Sprintf("%T", extra)) api.logger.Debug("invalid criteria", "type", fmt.Sprintf("%T", extra))
return "", err return nil, err
} }
if params["address"] != nil { if params["address"] != nil {
@ -476,7 +431,7 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, extra interface{}) (rpc.ID,
if !ok && !sok { if !ok && !sok {
err := errors.New("invalid addresses; must be address or array of addresses") err := errors.New("invalid addresses; must be address or array of addresses")
api.logger.Debug("invalid addresses", "type", fmt.Sprintf("%T", params["address"])) api.logger.Debug("invalid addresses", "type", fmt.Sprintf("%T", params["address"]))
return "", err return nil, err
} }
if ok { if ok {
@ -490,7 +445,7 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, extra interface{}) (rpc.ID,
if !ok { if !ok {
err := errors.New("invalid address") err := errors.New("invalid address")
api.logger.Debug("invalid address", "type", fmt.Sprintf("%T", addr)) api.logger.Debug("invalid address", "type", fmt.Sprintf("%T", addr))
return "", err return nil, err
} }
crit.Addresses = append(crit.Addresses, common.HexToAddress(address)) crit.Addresses = append(crit.Addresses, common.HexToAddress(address))
@ -503,7 +458,7 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, extra interface{}) (rpc.ID,
if !ok { if !ok {
err := errors.Errorf("invalid topics: %s", topics) err := errors.Errorf("invalid topics: %s", topics)
api.logger.Error("invalid topics", "type", fmt.Sprintf("%T", topics)) api.logger.Error("invalid topics", "type", fmt.Sprintf("%T", topics))
return "", err return nil, err
} }
crit.Topics = make([][]common.Hash, len(topics)) crit.Topics = make([][]common.Hash, len(topics))
@ -528,7 +483,7 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, extra interface{}) (rpc.ID,
// in case we don't have list, but a single topic value // in case we don't have list, but a single topic value
if topic, ok := subtopics.(string); ok { if topic, ok := subtopics.(string); ok {
if err := addCritTopic(topicIdx, topic); err != nil { if err := addCritTopic(topicIdx, topic); err != nil {
return "", err return nil, err
} }
continue continue
@ -539,7 +494,7 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, extra interface{}) (rpc.ID,
if !ok { if !ok {
err := errors.New("invalid subtopics") err := errors.New("invalid subtopics")
api.logger.Error("invalid subtopic", "type", fmt.Sprintf("%T", subtopics)) api.logger.Error("invalid subtopic", "type", fmt.Sprintf("%T", subtopics))
return "", err return nil, err
} }
subtopicsCollect := make([]common.Hash, len(subtopicsList)) subtopicsCollect := make([]common.Hash, len(subtopicsList))
@ -548,7 +503,7 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, extra interface{}) (rpc.ID,
if !ok { if !ok {
err := errors.Errorf("invalid subtopic: %s", subtopic) err := errors.Errorf("invalid subtopic: %s", subtopic)
api.logger.Error("invalid subtopic", "type", fmt.Sprintf("%T", subtopic)) api.logger.Error("invalid subtopic", "type", fmt.Sprintf("%T", subtopic))
return "", err return nil, err
} }
subtopicsCollect[idx] = common.HexToHash(tstr) subtopicsCollect[idx] = common.HexToHash(tstr)
@ -559,37 +514,19 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, extra interface{}) (rpc.ID,
} }
} }
critBz, err := json.Marshal(crit) sub, unsubFn, err := api.events.SubscribeLogs(crit)
if err != nil {
api.logger.Error("failed to JSON marshal criteria", "error", err.Error())
return rpc.ID(""), err
}
query := "subscribeLogs" + string(critBz)
subID := rpc.NewID()
sub, _, err := api.events.SubscribeLogs(crit)
if err != nil { if err != nil {
api.logger.Error("failed to subscribe logs", "error", err.Error()) api.logger.Error("failed to subscribe logs", "error", err.Error())
return rpc.ID(""), err return nil, err
} }
unsubscribed := make(chan struct{}) go func() {
api.filtersMu.Lock() ch := sub.Event()
api.filters[subID] = &wsSubscription{ errCh := sub.Err()
sub: sub,
wsConn: wsConn,
unsubscribed: unsubscribed,
query: query,
}
api.filtersMu.Unlock()
go func(ch <-chan coretypes.ResultEvent, errCh <-chan error, subID rpc.ID) {
for { for {
select { select {
case event, ok := <-ch: case event, ok := <-ch:
if !ok { if !ok {
api.unsubscribe(subID)
return return
} }
@ -610,14 +547,6 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, extra interface{}) (rpc.ID,
continue continue
} }
api.filtersMu.RLock()
wsSub, ok := api.filters[subID]
if !ok {
api.logger.Debug("subID not in filters", subID)
return
}
api.filtersMu.RUnlock()
for _, ethLog := range logs { for _, ethLog := range logs {
res := &SubscriptionNotification{ res := &SubscriptionNotification{
Jsonrpc: "2.0", Jsonrpc: "2.0",
@ -628,57 +557,36 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, extra interface{}) (rpc.ID,
}, },
} }
err = wsSub.wsConn.WriteJSON(res) err = wsConn.WriteJSON(res)
if err != nil { if err != nil {
try(func() { try(func() {
api.filtersMu.Lock()
defer api.filtersMu.Unlock()
if err != websocket.ErrCloseSent { if err != websocket.ErrCloseSent {
_ = wsSub.wsConn.Close() _ = wsConn.Close()
} }
delete(api.filters, subID)
close(wsSub.unsubscribed)
}, api.logger, "closing websocket peer sub") }, api.logger, "closing websocket peer sub")
} }
} }
case err, ok := <-errCh: case err, ok := <-errCh:
if !ok { if !ok {
api.unsubscribe(subID)
return return
} }
api.logger.Debug("dropping Logs WebSocket subscription", "subscription-id", subID, "error", err.Error()) api.logger.Debug("dropping Logs WebSocket subscription", "subscription-id", subID, "error", err.Error())
api.unsubscribe(subID)
case <-unsubscribed:
return
} }
} }
}(sub.Event(), sub.Err(), subID) }()
return subID, nil return unsubFn, nil
} }
func (api *pubSubAPI) subscribePendingTransactions(wsConn *wsConn) (rpc.ID, error) { func (api *pubSubAPI) subscribePendingTransactions(wsConn *wsConn, subID rpc.ID) (pubsub.UnsubscribeFunc, error) {
query := "subscribePendingTransactions" sub, unsubFn, err := api.events.SubscribePendingTxs()
subID := rpc.NewID()
sub, _, err := api.events.SubscribePendingTxs()
if err != nil { if err != nil {
return "", errors.Wrap(err, "error creating block filter: %s") return nil, errors.Wrap(err, "error creating block filter: %s")
} }
unsubscribed := make(chan struct{}) go func() {
api.filtersMu.Lock() txsCh := sub.Event()
api.filters[subID] = &wsSubscription{ errCh := sub.Err()
sub: sub,
wsConn: wsConn,
unsubscribed: unsubscribed,
query: query,
}
api.filtersMu.Unlock()
go func(txsCh <-chan coretypes.ResultEvent, errCh <-chan error) {
for { for {
select { select {
case ev := <-txsCh: case ev := <-txsCh:
@ -689,14 +597,7 @@ func (api *pubSubAPI) subscribePendingTransactions(wsConn *wsConn) (rpc.ID, erro
continue continue
} }
api.filtersMu.RLock()
for _, ethTx := range ethTxs { for _, ethTx := range ethTxs {
for subID, wsSub := range api.filters {
subID := subID
wsSub := wsSub
if wsSub.query != query {
continue
}
// write to ws conn // write to ws conn
res := &SubscriptionNotification{ res := &SubscriptionNotification{
Jsonrpc: "2.0", Jsonrpc: "2.0",
@ -707,49 +608,29 @@ func (api *pubSubAPI) subscribePendingTransactions(wsConn *wsConn) (rpc.ID, erro
}, },
} }
err = wsSub.wsConn.WriteJSON(res) err = wsConn.WriteJSON(res)
if err != nil { if err != nil {
api.logger.Debug("error writing header, will drop peer", "error", err.Error()) api.logger.Debug("error writing header, will drop peer", "error", err.Error())
try(func() { try(func() {
// Release the initial read lock in .RUnlock() before
// invoking .Lock() to avoid the deadlock in
// https://github.com/tharsis/ethermint/issues/821#issuecomment-1033959984
// and as documented at https://pkg.go.dev/sync#RWMutex
api.filtersMu.RUnlock()
api.filtersMu.Lock()
defer func() {
api.filtersMu.Unlock()
api.filtersMu.RLock()
}()
if err != websocket.ErrCloseSent { if err != websocket.ErrCloseSent {
_ = wsSub.wsConn.Close() _ = wsConn.Close()
} }
delete(api.filters, subID)
close(wsSub.unsubscribed)
}, api.logger, "closing websocket peer sub") }, api.logger, "closing websocket peer sub")
} }
} }
}
api.filtersMu.RUnlock()
case err, ok := <-errCh: case err, ok := <-errCh:
if !ok { if !ok {
api.unsubscribe(subID)
return return
} }
api.logger.Debug("dropping PendingTransactions WebSocket subscription", subID, "error", err.Error()) api.logger.Debug("dropping PendingTransactions WebSocket subscription", subID, "error", err.Error())
api.unsubscribe(subID)
case <-unsubscribed:
return
} }
} }
}(sub.Event(), sub.Err()) }()
return subID, nil return unsubFn, nil
} }
func (api *pubSubAPI) subscribeSyncing(wsConn *wsConn) (rpc.ID, error) { func (api *pubSubAPI) subscribeSyncing(wsConn *wsConn, subID rpc.ID) (pubsub.UnsubscribeFunc, error) {
return "", nil return nil, errors.New("syncing subscription is not implemented")
} }