package rpc import ( "context" "fmt" "sync" "time" "github.com/pkg/errors" log "github.com/xlab/suplog" tmjson "github.com/tendermint/tendermint/libs/json" tmquery "github.com/tendermint/tendermint/libs/pubsub/query" coretypes "github.com/tendermint/tendermint/rpc/core/types" rpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client" tmtypes "github.com/tendermint/tendermint/types" "github.com/ethereum/go-ethereum/common" ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/rpc" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/ethermint/ethereum/rpc/pubsub" evmtypes "github.com/cosmos/ethermint/x/evm/types" ) var ( txEvents = tmtypes.QueryForEvent(tmtypes.EventTx).String() evmEvents = tmquery.MustParse(fmt.Sprintf("%s='%s' AND %s.%s='%s'", tmtypes.EventTypeKey, tmtypes.EventTx, sdk.EventTypeMessage, sdk.AttributeKeyModule, evmtypes.ModuleName)).String() headerEvents = tmtypes.QueryForEvent(tmtypes.EventNewBlockHeader).String() ) // EventSystem creates subscriptions, processes events and broadcasts them to the // subscription which match the subscription criteria using the Tendermint's RPC client. type EventSystem struct { ctx context.Context tmWSClient *rpcclient.WSClient // light client mode lightMode bool index filterIndex topicChans map[string]chan<- coretypes.ResultEvent indexMux *sync.RWMutex // Channels install chan *Subscription // install filter for event notification uninstall chan *Subscription // remove filter for event notification eventBus pubsub.EventBus } // NewEventSystem creates a new manager that listens for event on the given mux, // parses and filters them. It uses the all map to retrieve filter changes. The // work loop holds its own index that is used to forward events to filters. // // The returned manager has a loop that needs to be stopped with the Stop function // or by stopping the given mux. func NewEventSystem(tmWSClient *rpcclient.WSClient) *EventSystem { index := make(filterIndex) for i := filters.UnknownSubscription; i < filters.LastIndexSubscription; i++ { index[i] = make(map[rpc.ID]*Subscription) } es := &EventSystem{ ctx: context.Background(), tmWSClient: tmWSClient, lightMode: false, index: index, topicChans: make(map[string]chan<- coretypes.ResultEvent, len(index)), indexMux: new(sync.RWMutex), install: make(chan *Subscription), uninstall: make(chan *Subscription), eventBus: pubsub.NewEventBus(), } go es.eventLoop() go es.consumeEvents() return es } // WithContext sets a new context to the EventSystem. This is required to set a timeout context when // a new filter is intantiated. func (es *EventSystem) WithContext(ctx context.Context) { es.ctx = ctx } // subscribe performs a new event subscription to a given Tendermint event. // The subscription creates a unidirectional receive event channel to receive the ResultEvent. func (es *EventSystem) subscribe(sub *Subscription) (*Subscription, context.CancelFunc, error) { var ( err error cancelFn context.CancelFunc ) es.ctx, cancelFn = context.WithCancel(context.Background()) existingSubs := es.eventBus.Topics() for _, topic := range existingSubs { if topic == sub.event { eventCh, err := es.eventBus.Subscribe(sub.event) if err != nil { err := errors.Wrapf(err, "failed to subscribe to topic: %s", sub.event) return nil, cancelFn, err } sub.eventCh = eventCh return sub, cancelFn, nil } } switch sub.typ { case filters.LogsSubscription: err = es.tmWSClient.Subscribe(es.ctx, sub.event) case filters.BlocksSubscription: err = es.tmWSClient.Subscribe(es.ctx, sub.event) default: err = fmt.Errorf("invalid filter subscription type %d", sub.typ) } if err != nil { sub.err <- err return nil, cancelFn, err } // wrap events in a go routine to prevent blocking es.install <- sub <-sub.installed eventCh, err := es.eventBus.Subscribe(sub.event) if err != nil { err := errors.Wrapf(err, "failed to subscribe to topic after installed: %s", sub.event) return sub, cancelFn, err } sub.eventCh = eventCh return sub, cancelFn, nil } // SubscribeLogs creates a subscription that will write all logs matching the // given criteria to the given logs channel. Default value for the from and to // block is "latest". If the fromBlock > toBlock an error is returned. func (es *EventSystem) SubscribeLogs(crit filters.FilterCriteria) (*Subscription, context.CancelFunc, error) { var from, to rpc.BlockNumber if crit.FromBlock == nil { from = rpc.LatestBlockNumber } else { from = rpc.BlockNumber(crit.FromBlock.Int64()) } if crit.ToBlock == nil { to = rpc.LatestBlockNumber } else { to = rpc.BlockNumber(crit.ToBlock.Int64()) } switch { // only interested in new mined logs, mined logs within a specific block range, or // logs from a specific block number to new mined blocks case (from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber), (from >= 0 && to >= 0 && to >= from): return es.subscribeLogs(crit) default: return nil, nil, fmt.Errorf("invalid from and to block combination: from > to (%d > %d)", from, to) } } // subscribeLogs creates a subscription that will write all logs matching the // given criteria to the given logs channel. func (es *EventSystem) subscribeLogs(crit filters.FilterCriteria) (*Subscription, context.CancelFunc, error) { sub := &Subscription{ id: rpc.NewID(), typ: filters.LogsSubscription, event: evmEvents, logsCrit: crit, created: time.Now().UTC(), logs: make(chan []*ethtypes.Log), installed: make(chan struct{}, 1), err: make(chan error, 1), } return es.subscribe(sub) } // SubscribeNewHeads subscribes to new block headers events. func (es EventSystem) SubscribeNewHeads() (*Subscription, context.CancelFunc, error) { sub := &Subscription{ id: rpc.NewID(), typ: filters.BlocksSubscription, event: headerEvents, created: time.Now().UTC(), headers: make(chan *ethtypes.Header), installed: make(chan struct{}, 1), err: make(chan error, 1), } return es.subscribe(sub) } // SubscribePendingTxs subscribes to new pending transactions events from the mempool. func (es EventSystem) SubscribePendingTxs() (*Subscription, context.CancelFunc, error) { sub := &Subscription{ id: rpc.NewID(), typ: filters.PendingTransactionsSubscription, event: txEvents, created: time.Now().UTC(), hashes: make(chan []common.Hash), installed: make(chan struct{}, 1), err: make(chan error, 1), } return es.subscribe(sub) } type filterIndex map[filters.Type]map[rpc.ID]*Subscription // eventLoop (un)installs filters and processes mux events. func (es *EventSystem) eventLoop() { for { select { case f := <-es.install: es.indexMux.Lock() es.index[f.typ][f.id] = f ch := make(chan coretypes.ResultEvent) es.topicChans[f.event] = ch if err := es.eventBus.AddTopic(f.event, ch); err != nil { log.WithField("topic", f.event).WithError(err).Errorln("failed to add event topic to event bus") } es.indexMux.Unlock() close(f.installed) case f := <-es.uninstall: es.indexMux.Lock() delete(es.index[f.typ], f.id) var channelInUse bool for _, sub := range es.index[f.typ] { if sub.event == f.event { channelInUse = true break } } // remove topic only when channel is not used by other subscriptions if !channelInUse { if err := es.tmWSClient.Unsubscribe(es.ctx, f.event); err != nil { log.WithError(err).WithField("query", f.event).Errorln("failed to unsubscribe from query") } ch, ok := es.topicChans[f.event] if ok { es.eventBus.RemoveTopic(f.event) close(ch) delete(es.topicChans, f.event) } } es.indexMux.Unlock() close(f.err) } } } func (es *EventSystem) consumeEvents() { for { for rpcResp := range es.tmWSClient.ResponsesCh { var ev coretypes.ResultEvent if rpcResp.Error != nil { time.Sleep(5 * time.Second) continue } else if err := tmjson.Unmarshal(rpcResp.Result, &ev); err != nil { log.WithError(err).Warningln("failed to JSON unmarshal ResponsesCh result event") continue } if len(ev.Query) == 0 { // skip empty responses continue } es.indexMux.RLock() ch, ok := es.topicChans[ev.Query] es.indexMux.RUnlock() if !ok { log.WithField("topic", ev.Query).Warningln("channel for subscription not found, lol") log.Infoln("available channels:", es.eventBus.Topics()) continue } // gracefully handle lagging subscribers t := time.NewTimer(time.Second) select { case <-t.C: log.WithField("topic", ev.Query).Warningln("dropped event during lagging subscription") case ch <- ev: } } time.Sleep(time.Second) } } // Subscription defines a wrapper for the private subscription type Subscription struct { id rpc.ID typ filters.Type event string created time.Time logsCrit filters.FilterCriteria logs chan []*ethtypes.Log hashes chan []common.Hash headers chan *ethtypes.Header installed chan struct{} // closed when the filter is installed eventCh <-chan coretypes.ResultEvent err chan error } // ID returns the underlying subscription RPC identifier. func (s Subscription) ID() rpc.ID { return s.id } // Unsubscribe from the current subscription to Tendermint Websocket. It sends an error to the // subscription error channel if unsubscription fails. func (s *Subscription) Unsubscribe(es *EventSystem) { go func() { uninstallLoop: for { // write uninstall request and consume logs/hashes. This prevents // the eventLoop broadcast method to deadlock when writing to the // filter event channel while the subscription loop is waiting for // this method to return (and thus not reading these events). select { case es.uninstall <- s: break uninstallLoop case <-s.logs: case <-s.hashes: case <-s.headers: } } }() } // Err returns the error channel func (s *Subscription) Err() <-chan error { return s.err }