434 lines
14 KiB
Go
434 lines
14 KiB
Go
|
package rpc
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"log"
|
||
|
"time"
|
||
|
|
||
|
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
|
||
|
rpcclient "github.com/tendermint/tendermint/rpc/client"
|
||
|
coretypes "github.com/tendermint/tendermint/rpc/core/types"
|
||
|
tmtypes "github.com/tendermint/tendermint/types"
|
||
|
|
||
|
"github.com/ethereum/go-ethereum/common"
|
||
|
ethtypes "github.com/ethereum/go-ethereum/core/types"
|
||
|
"github.com/ethereum/go-ethereum/eth/filters"
|
||
|
"github.com/ethereum/go-ethereum/rpc"
|
||
|
|
||
|
sdk "github.com/cosmos/cosmos-sdk/types"
|
||
|
|
||
|
evmtypes "github.com/cosmos/ethermint/x/evm/types"
|
||
|
)
|
||
|
|
||
|
var (
|
||
|
txEvents = tmtypes.QueryForEvent(tmtypes.EventTx).String()
|
||
|
evmEvents = tmquery.MustParse(fmt.Sprintf("%s='%s' AND %s.%s='%s'", tmtypes.EventTypeKey, tmtypes.EventTx, sdk.EventTypeMessage, sdk.AttributeKeyModule, evmtypes.ModuleName)).String()
|
||
|
headerEvents = tmtypes.QueryForEvent(tmtypes.EventNewBlockHeader).String()
|
||
|
)
|
||
|
|
||
|
// EventSystem creates subscriptions, processes events and broadcasts them to the
|
||
|
// subscription which match the subscription criteria using the Tendermint's RPC client.
|
||
|
type EventSystem struct {
|
||
|
ctx context.Context
|
||
|
client rpcclient.Client
|
||
|
|
||
|
// light client mode
|
||
|
lightMode bool
|
||
|
|
||
|
index filterIndex
|
||
|
|
||
|
// Subscriptions
|
||
|
txsSub *Subscription // Subscription for new transaction event
|
||
|
logsSub *Subscription // Subscription for new log event
|
||
|
// rmLogsSub *Subscription // Subscription for removed log event
|
||
|
|
||
|
pendingLogsSub *Subscription // Subscription for pending log event
|
||
|
chainSub *Subscription // Subscription for new chain event
|
||
|
|
||
|
// Channels
|
||
|
install chan *Subscription // install filter for event notification
|
||
|
uninstall chan *Subscription // remove filter for event notification
|
||
|
|
||
|
// Unidirectional channels to receive Tendermint ResultEvents
|
||
|
txsCh <-chan coretypes.ResultEvent // Channel to receive new pending transactions event
|
||
|
logsCh <-chan coretypes.ResultEvent // Channel to receive new log event
|
||
|
pendingLogsCh <-chan coretypes.ResultEvent // Channel to receive new pending log event
|
||
|
// rmLogsCh <-chan coretypes.ResultEvent // Channel to receive removed log event
|
||
|
|
||
|
chainCh <-chan coretypes.ResultEvent // Channel to receive new chain event
|
||
|
}
|
||
|
|
||
|
// NewEventSystem creates a new manager that listens for event on the given mux,
|
||
|
// parses and filters them. It uses the all map to retrieve filter changes. The
|
||
|
// work loop holds its own index that is used to forward events to filters.
|
||
|
//
|
||
|
// The returned manager has a loop that needs to be stopped with the Stop function
|
||
|
// or by stopping the given mux.
|
||
|
func NewEventSystem(client rpcclient.Client) *EventSystem {
|
||
|
index := make(filterIndex)
|
||
|
for i := filters.UnknownSubscription; i < filters.LastIndexSubscription; i++ {
|
||
|
index[i] = make(map[rpc.ID]*Subscription)
|
||
|
}
|
||
|
|
||
|
es := &EventSystem{
|
||
|
ctx: context.Background(),
|
||
|
client: client,
|
||
|
lightMode: false,
|
||
|
index: index,
|
||
|
install: make(chan *Subscription),
|
||
|
uninstall: make(chan *Subscription),
|
||
|
txsCh: make(<-chan coretypes.ResultEvent),
|
||
|
logsCh: make(<-chan coretypes.ResultEvent),
|
||
|
pendingLogsCh: make(<-chan coretypes.ResultEvent),
|
||
|
// rmLogsCh: make(<-chan coretypes.ResultEvent),
|
||
|
chainCh: make(<-chan coretypes.ResultEvent),
|
||
|
}
|
||
|
|
||
|
go es.eventLoop()
|
||
|
return es
|
||
|
}
|
||
|
|
||
|
// WithContext sets a new context to the EventSystem. This is required to set a timeout context when
|
||
|
// a new filter is intantiated.
|
||
|
func (es *EventSystem) WithContext(ctx context.Context) {
|
||
|
es.ctx = ctx
|
||
|
}
|
||
|
|
||
|
// subscribe performs a new event subscription to a given Tendermint event.
|
||
|
// The subscription creates a unidirectional receive event channel to receive the ResultEvent. By
|
||
|
// default, the subscription timeouts (i.e is canceled) after 5 minutes. This function returns an
|
||
|
// error if the subscription fails (eg: if the identifier is already subscribed) or if the filter
|
||
|
// type is invalid.
|
||
|
func (es *EventSystem) subscribe(sub *Subscription) (*Subscription, context.CancelFunc, error) {
|
||
|
var (
|
||
|
err error
|
||
|
cancelFn context.CancelFunc
|
||
|
eventCh <-chan coretypes.ResultEvent
|
||
|
)
|
||
|
|
||
|
es.ctx, cancelFn = context.WithTimeout(context.Background(), deadline)
|
||
|
|
||
|
switch sub.typ {
|
||
|
case filters.PendingTransactionsSubscription:
|
||
|
eventCh, err = es.client.Subscribe(es.ctx, string(sub.id), sub.event)
|
||
|
case filters.PendingLogsSubscription, filters.MinedAndPendingLogsSubscription:
|
||
|
eventCh, err = es.client.Subscribe(es.ctx, string(sub.id), sub.event)
|
||
|
case filters.LogsSubscription:
|
||
|
eventCh, err = es.client.Subscribe(es.ctx, string(sub.id), sub.event)
|
||
|
case filters.BlocksSubscription:
|
||
|
eventCh, err = es.client.Subscribe(es.ctx, string(sub.id), sub.event)
|
||
|
default:
|
||
|
err = fmt.Errorf("invalid filter subscription type %d", sub.typ)
|
||
|
}
|
||
|
|
||
|
if err != nil {
|
||
|
sub.err <- err
|
||
|
return nil, cancelFn, err
|
||
|
}
|
||
|
|
||
|
// wrap events in a go routine to prevent blocking
|
||
|
go func() {
|
||
|
es.install <- sub
|
||
|
<-sub.installed
|
||
|
}()
|
||
|
|
||
|
sub.eventCh = eventCh
|
||
|
return sub, cancelFn, nil
|
||
|
}
|
||
|
|
||
|
// SubscribeLogs creates a subscription that will write all logs matching the
|
||
|
// given criteria to the given logs channel. Default value for the from and to
|
||
|
// block is "latest". If the fromBlock > toBlock an error is returned.
|
||
|
func (es *EventSystem) SubscribeLogs(crit filters.FilterCriteria) (*Subscription, context.CancelFunc, error) {
|
||
|
var from, to rpc.BlockNumber
|
||
|
if crit.FromBlock == nil {
|
||
|
from = rpc.LatestBlockNumber
|
||
|
} else {
|
||
|
from = rpc.BlockNumber(crit.FromBlock.Int64())
|
||
|
}
|
||
|
if crit.ToBlock == nil {
|
||
|
to = rpc.LatestBlockNumber
|
||
|
} else {
|
||
|
to = rpc.BlockNumber(crit.ToBlock.Int64())
|
||
|
}
|
||
|
|
||
|
switch {
|
||
|
// only interested in pending logs
|
||
|
case from == rpc.PendingBlockNumber && to == rpc.PendingBlockNumber:
|
||
|
return es.subscribePendingLogs(crit)
|
||
|
|
||
|
// only interested in new mined logs, mined logs within a specific block range, or
|
||
|
// logs from a specific block number to new mined blocks
|
||
|
case (from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber),
|
||
|
(from >= 0 && to >= 0 && to >= from):
|
||
|
return es.subscribeLogs(crit)
|
||
|
|
||
|
// interested in mined logs from a specific block number, new logs and pending logs
|
||
|
case from >= rpc.LatestBlockNumber && (to == rpc.PendingBlockNumber || to == rpc.LatestBlockNumber):
|
||
|
return es.subscribeMinedPendingLogs(crit)
|
||
|
|
||
|
default:
|
||
|
return nil, nil, fmt.Errorf("invalid from and to block combination: from > to (%d > %d)", from, to)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// subscribeMinedPendingLogs creates a subscription that returned mined and
|
||
|
// pending logs that match the given criteria.
|
||
|
func (es *EventSystem) subscribeMinedPendingLogs(crit filters.FilterCriteria) (*Subscription, context.CancelFunc, error) {
|
||
|
sub := &Subscription{
|
||
|
id: rpc.NewID(),
|
||
|
typ: filters.MinedAndPendingLogsSubscription,
|
||
|
event: evmEvents,
|
||
|
logsCrit: crit,
|
||
|
created: time.Now().UTC(),
|
||
|
logs: make(chan []*ethtypes.Log),
|
||
|
installed: make(chan struct{}, 1),
|
||
|
err: make(chan error, 1),
|
||
|
}
|
||
|
return es.subscribe(sub)
|
||
|
}
|
||
|
|
||
|
// subscribeLogs creates a subscription that will write all logs matching the
|
||
|
// given criteria to the given logs channel.
|
||
|
func (es *EventSystem) subscribeLogs(crit filters.FilterCriteria) (*Subscription, context.CancelFunc, error) {
|
||
|
sub := &Subscription{
|
||
|
id: rpc.NewID(),
|
||
|
typ: filters.LogsSubscription,
|
||
|
event: evmEvents,
|
||
|
logsCrit: crit,
|
||
|
created: time.Now().UTC(),
|
||
|
logs: make(chan []*ethtypes.Log),
|
||
|
installed: make(chan struct{}, 1),
|
||
|
err: make(chan error, 1),
|
||
|
}
|
||
|
return es.subscribe(sub)
|
||
|
}
|
||
|
|
||
|
// subscribePendingLogs creates a subscription that writes transaction hashes for
|
||
|
// transactions that enter the transaction pool.
|
||
|
func (es *EventSystem) subscribePendingLogs(crit filters.FilterCriteria) (*Subscription, context.CancelFunc, error) {
|
||
|
sub := &Subscription{
|
||
|
id: rpc.NewID(),
|
||
|
typ: filters.PendingLogsSubscription,
|
||
|
event: evmEvents,
|
||
|
logsCrit: crit,
|
||
|
created: time.Now().UTC(),
|
||
|
logs: make(chan []*ethtypes.Log),
|
||
|
installed: make(chan struct{}, 1),
|
||
|
err: make(chan error, 1),
|
||
|
}
|
||
|
return es.subscribe(sub)
|
||
|
}
|
||
|
|
||
|
// SubscribeNewHeads subscribes to new block headers events.
|
||
|
func (es EventSystem) SubscribeNewHeads() (*Subscription, context.CancelFunc, error) {
|
||
|
sub := &Subscription{
|
||
|
id: rpc.NewID(),
|
||
|
typ: filters.BlocksSubscription,
|
||
|
event: headerEvents,
|
||
|
created: time.Now().UTC(),
|
||
|
headers: make(chan *ethtypes.Header),
|
||
|
installed: make(chan struct{}, 1),
|
||
|
err: make(chan error, 1),
|
||
|
}
|
||
|
return es.subscribe(sub)
|
||
|
}
|
||
|
|
||
|
// SubscribePendingTxs subscribes to new pending transactions events from the mempool.
|
||
|
func (es EventSystem) SubscribePendingTxs() (*Subscription, context.CancelFunc, error) {
|
||
|
sub := &Subscription{
|
||
|
id: rpc.NewID(),
|
||
|
typ: filters.PendingTransactionsSubscription,
|
||
|
event: txEvents,
|
||
|
created: time.Now().UTC(),
|
||
|
hashes: make(chan []common.Hash),
|
||
|
installed: make(chan struct{}, 1),
|
||
|
err: make(chan error, 1),
|
||
|
}
|
||
|
return es.subscribe(sub)
|
||
|
}
|
||
|
|
||
|
type filterIndex map[filters.Type]map[rpc.ID]*Subscription
|
||
|
|
||
|
func (es *EventSystem) handleLogs(ev coretypes.ResultEvent) {
|
||
|
data, _ := ev.Data.(tmtypes.EventDataTx)
|
||
|
resultData, err := evmtypes.DecodeResultData(data.TxResult.Result.Data)
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if len(resultData.Logs) == 0 {
|
||
|
return
|
||
|
}
|
||
|
for _, f := range es.index[filters.LogsSubscription] {
|
||
|
matchedLogs := filterLogs(resultData.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
|
||
|
if len(matchedLogs) > 0 {
|
||
|
f.logs <- matchedLogs
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (es *EventSystem) handleTxsEvent(ev coretypes.ResultEvent) {
|
||
|
data, _ := ev.Data.(tmtypes.EventDataTx)
|
||
|
for _, f := range es.index[filters.PendingTransactionsSubscription] {
|
||
|
f.hashes <- []common.Hash{common.BytesToHash(data.Tx.Hash())}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (es *EventSystem) handleChainEvent(ev coretypes.ResultEvent) {
|
||
|
data, _ := ev.Data.(tmtypes.EventDataNewBlockHeader)
|
||
|
for _, f := range es.index[filters.BlocksSubscription] {
|
||
|
f.headers <- EthHeaderFromTendermint(data.Header)
|
||
|
}
|
||
|
// TODO: light client
|
||
|
}
|
||
|
|
||
|
// eventLoop (un)installs filters and processes mux events.
|
||
|
func (es *EventSystem) eventLoop() {
|
||
|
var (
|
||
|
err error
|
||
|
cancelPendingTxsSubs, cancelLogsSubs, cancelPendingLogsSubs, cancelHeaderSubs context.CancelFunc
|
||
|
)
|
||
|
|
||
|
// Subscribe events
|
||
|
es.txsSub, cancelPendingTxsSubs, err = es.SubscribePendingTxs()
|
||
|
if err != nil {
|
||
|
panic(fmt.Errorf("failed to subscribe pending txs: %w", err))
|
||
|
}
|
||
|
|
||
|
defer cancelPendingTxsSubs()
|
||
|
|
||
|
es.logsSub, cancelLogsSubs, err = es.SubscribeLogs(filters.FilterCriteria{})
|
||
|
if err != nil {
|
||
|
panic(fmt.Errorf("failed to subscribe logs: %w", err))
|
||
|
}
|
||
|
|
||
|
defer cancelLogsSubs()
|
||
|
|
||
|
es.pendingLogsSub, cancelPendingLogsSubs, err = es.subscribePendingLogs(filters.FilterCriteria{})
|
||
|
if err != nil {
|
||
|
panic(fmt.Errorf("failed to subscribe pending logs: %w", err))
|
||
|
}
|
||
|
|
||
|
defer cancelPendingLogsSubs()
|
||
|
|
||
|
es.chainSub, cancelHeaderSubs, err = es.SubscribeNewHeads()
|
||
|
if err != nil {
|
||
|
panic(fmt.Errorf("failed to subscribe headers: %w", err))
|
||
|
}
|
||
|
|
||
|
defer cancelHeaderSubs()
|
||
|
|
||
|
// Ensure all subscriptions get cleaned up
|
||
|
defer func() {
|
||
|
es.txsSub.Unsubscribe(es)
|
||
|
es.logsSub.Unsubscribe(es)
|
||
|
// es.rmLogsSub.Unsubscribe(es)
|
||
|
es.pendingLogsSub.Unsubscribe(es)
|
||
|
es.chainSub.Unsubscribe(es)
|
||
|
}()
|
||
|
|
||
|
for {
|
||
|
select {
|
||
|
case txEvent := <-es.txsSub.eventCh:
|
||
|
es.handleTxsEvent(txEvent)
|
||
|
case headerEv := <-es.chainSub.eventCh:
|
||
|
es.handleChainEvent(headerEv)
|
||
|
case logsEv := <-es.logsSub.eventCh:
|
||
|
es.handleLogs(logsEv)
|
||
|
// TODO: figure out how to handle removed logs
|
||
|
// case logsEv := <-es.rmLogsSub.eventCh:
|
||
|
// es.handleLogs(logsEv)
|
||
|
case logsEv := <-es.pendingLogsSub.eventCh:
|
||
|
es.handleLogs(logsEv)
|
||
|
|
||
|
case f := <-es.install:
|
||
|
if f.typ == filters.MinedAndPendingLogsSubscription {
|
||
|
// the type are logs and pending logs subscriptions
|
||
|
es.index[filters.LogsSubscription][f.id] = f
|
||
|
es.index[filters.PendingLogsSubscription][f.id] = f
|
||
|
} else {
|
||
|
es.index[f.typ][f.id] = f
|
||
|
}
|
||
|
close(f.installed)
|
||
|
|
||
|
case f := <-es.uninstall:
|
||
|
if f.typ == filters.MinedAndPendingLogsSubscription {
|
||
|
// the type are logs and pending logs subscriptions
|
||
|
delete(es.index[filters.LogsSubscription], f.id)
|
||
|
delete(es.index[filters.PendingLogsSubscription], f.id)
|
||
|
} else {
|
||
|
delete(es.index[f.typ], f.id)
|
||
|
}
|
||
|
close(f.err)
|
||
|
// System stopped
|
||
|
case <-es.txsSub.Err():
|
||
|
return
|
||
|
case <-es.logsSub.Err():
|
||
|
return
|
||
|
// case <-es.rmLogsSub.Err():
|
||
|
// return
|
||
|
case <-es.pendingLogsSub.Err():
|
||
|
return
|
||
|
case <-es.chainSub.Err():
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
// }()
|
||
|
}
|
||
|
|
||
|
// Subscription defines a wrapper for the private subscription
|
||
|
type Subscription struct {
|
||
|
id rpc.ID
|
||
|
typ filters.Type
|
||
|
event string
|
||
|
created time.Time
|
||
|
logsCrit filters.FilterCriteria
|
||
|
logs chan []*ethtypes.Log
|
||
|
hashes chan []common.Hash
|
||
|
headers chan *ethtypes.Header
|
||
|
installed chan struct{} // closed when the filter is installed
|
||
|
eventCh <-chan coretypes.ResultEvent
|
||
|
err chan error
|
||
|
}
|
||
|
|
||
|
// ID returns the underlying subscription RPC identifier.
|
||
|
func (s Subscription) ID() rpc.ID {
|
||
|
return s.id
|
||
|
}
|
||
|
|
||
|
// Unsubscribe to the current subscription from Tendermint Websocket. It sends an error to the
|
||
|
// subscription error channel if unsubscription fails.
|
||
|
func (s *Subscription) Unsubscribe(es *EventSystem) {
|
||
|
if err := es.client.Unsubscribe(es.ctx, string(s.ID()), s.event); err != nil {
|
||
|
s.err <- err
|
||
|
}
|
||
|
|
||
|
go func() {
|
||
|
defer func() {
|
||
|
log.Println("successfully unsubscribed to event", s.event)
|
||
|
}()
|
||
|
|
||
|
uninstallLoop:
|
||
|
for {
|
||
|
// write uninstall request and consume logs/hashes. This prevents
|
||
|
// the eventLoop broadcast method to deadlock when writing to the
|
||
|
// filter event channel while the subscription loop is waiting for
|
||
|
// this method to return (and thus not reading these events).
|
||
|
select {
|
||
|
case es.uninstall <- s:
|
||
|
break uninstallLoop
|
||
|
case <-s.logs:
|
||
|
case <-s.hashes:
|
||
|
case <-s.headers:
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
|
||
|
// Err returns the error channel
|
||
|
func (s *Subscription) Err() <-chan error {
|
||
|
return s.err
|
||
|
}
|