4501bbccdc
* stargate: refactor * remove evm CLI * rpc: refactor * more fixes * fixes fixes fixes * changelog * refactor according to namespaces * fix * lint * remove export logic * fix rpc test * godoc
310 lines
7.4 KiB
Go
310 lines
7.4 KiB
Go
package websockets
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"sync"
|
|
|
|
"github.com/gorilla/websocket"
|
|
|
|
"github.com/tendermint/tendermint/libs/log"
|
|
coretypes "github.com/tendermint/tendermint/rpc/core/types"
|
|
tmtypes "github.com/tendermint/tendermint/types"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/eth/filters"
|
|
"github.com/ethereum/go-ethereum/rpc"
|
|
|
|
context "github.com/cosmos/cosmos-sdk/client/context"
|
|
|
|
rpcfilters "github.com/cosmos/ethermint/rpc/namespaces/eth/filters"
|
|
rpctypes "github.com/cosmos/ethermint/rpc/types"
|
|
evmtypes "github.com/cosmos/ethermint/x/evm/types"
|
|
)
|
|
|
|
// PubSubAPI is the eth_ prefixed set of APIs in the Web3 JSON-RPC spec
|
|
type PubSubAPI struct {
|
|
clientCtx context.CLIContext
|
|
events *rpcfilters.EventSystem
|
|
filtersMu sync.Mutex
|
|
filters map[rpc.ID]*wsSubscription
|
|
logger log.Logger
|
|
}
|
|
|
|
// NewAPI creates an instance of the ethereum PubSub API.
|
|
func NewAPI(clientCtx context.CLIContext) *PubSubAPI {
|
|
return &PubSubAPI{
|
|
clientCtx: clientCtx,
|
|
events: rpcfilters.NewEventSystem(clientCtx.Client),
|
|
filters: make(map[rpc.ID]*wsSubscription),
|
|
logger: log.NewTMLogger(log.NewSyncWriter(os.Stdout)).With("module", "websocket-client"),
|
|
}
|
|
}
|
|
|
|
func (api *PubSubAPI) subscribe(conn *websocket.Conn, params []interface{}) (rpc.ID, error) {
|
|
method, ok := params[0].(string)
|
|
if !ok {
|
|
return "0", fmt.Errorf("invalid parameters")
|
|
}
|
|
|
|
switch method {
|
|
case "newHeads":
|
|
// TODO: handle extra params
|
|
return api.subscribeNewHeads(conn)
|
|
case "logs":
|
|
if len(params) > 1 {
|
|
return api.subscribeLogs(conn, params[1])
|
|
}
|
|
|
|
return api.subscribeLogs(conn, nil)
|
|
case "newPendingTransactions":
|
|
return api.subscribePendingTransactions(conn)
|
|
case "syncing":
|
|
return api.subscribeSyncing(conn)
|
|
default:
|
|
return "0", fmt.Errorf("unsupported method %s", method)
|
|
}
|
|
}
|
|
|
|
func (api *PubSubAPI) unsubscribe(id rpc.ID) bool {
|
|
api.filtersMu.Lock()
|
|
defer api.filtersMu.Unlock()
|
|
|
|
if api.filters[id] == nil {
|
|
return false
|
|
}
|
|
|
|
close(api.filters[id].unsubscribed)
|
|
delete(api.filters, id)
|
|
return true
|
|
}
|
|
|
|
func (api *PubSubAPI) subscribeNewHeads(conn *websocket.Conn) (rpc.ID, error) {
|
|
sub, _, err := api.events.SubscribeNewHeads()
|
|
if err != nil {
|
|
return "", fmt.Errorf("error creating block filter: %s", err.Error())
|
|
}
|
|
|
|
unsubscribed := make(chan struct{})
|
|
api.filtersMu.Lock()
|
|
api.filters[sub.ID()] = &wsSubscription{
|
|
sub: sub,
|
|
conn: conn,
|
|
unsubscribed: unsubscribed,
|
|
}
|
|
api.filtersMu.Unlock()
|
|
|
|
go func(headersCh <-chan coretypes.ResultEvent, errCh <-chan error) {
|
|
for {
|
|
select {
|
|
case event := <-headersCh:
|
|
data, _ := event.Data.(tmtypes.EventDataNewBlockHeader)
|
|
header := rpctypes.EthHeaderFromTendermint(data.Header)
|
|
|
|
api.filtersMu.Lock()
|
|
if f, found := api.filters[sub.ID()]; found {
|
|
// write to ws conn
|
|
res := &SubscriptionNotification{
|
|
Jsonrpc: "2.0",
|
|
Method: "eth_subscription",
|
|
Params: &SubscriptionResult{
|
|
Subscription: sub.ID(),
|
|
Result: header,
|
|
},
|
|
}
|
|
|
|
err = f.conn.WriteJSON(res)
|
|
if err != nil {
|
|
api.logger.Error("error writing header")
|
|
}
|
|
}
|
|
api.filtersMu.Unlock()
|
|
case <-errCh:
|
|
api.filtersMu.Lock()
|
|
delete(api.filters, sub.ID())
|
|
api.filtersMu.Unlock()
|
|
return
|
|
case <-unsubscribed:
|
|
return
|
|
}
|
|
}
|
|
}(sub.Event(), sub.Err())
|
|
|
|
return sub.ID(), nil
|
|
}
|
|
|
|
func (api *PubSubAPI) subscribeLogs(conn *websocket.Conn, extra interface{}) (rpc.ID, error) {
|
|
crit := filters.FilterCriteria{}
|
|
|
|
if extra != nil {
|
|
params, ok := extra.(map[string]interface{})
|
|
if !ok {
|
|
return "", fmt.Errorf("invalid criteria")
|
|
}
|
|
|
|
if params["address"] != nil {
|
|
address, ok := params["address"].(string)
|
|
addresses, sok := params["address"].([]interface{})
|
|
if !ok && !sok {
|
|
return "", fmt.Errorf("invalid address; must be address or array of addresses")
|
|
}
|
|
|
|
if ok {
|
|
crit.Addresses = []common.Address{common.HexToAddress(address)}
|
|
}
|
|
|
|
if sok {
|
|
crit.Addresses = []common.Address{}
|
|
for _, addr := range addresses {
|
|
address, ok := addr.(string)
|
|
if !ok {
|
|
return "", fmt.Errorf("invalid address")
|
|
}
|
|
|
|
crit.Addresses = append(crit.Addresses, common.HexToAddress(address))
|
|
}
|
|
}
|
|
}
|
|
|
|
if params["topics"] != nil {
|
|
topics, ok := params["topics"].([]interface{})
|
|
if !ok {
|
|
return "", fmt.Errorf("invalid topics")
|
|
}
|
|
|
|
crit.Topics = [][]common.Hash{}
|
|
for _, topic := range topics {
|
|
tstr, ok := topic.(string)
|
|
if !ok {
|
|
return "", fmt.Errorf("invalid topics")
|
|
}
|
|
|
|
h := common.HexToHash(tstr)
|
|
crit.Topics = append(crit.Topics, []common.Hash{h})
|
|
}
|
|
}
|
|
}
|
|
|
|
sub, _, err := api.events.SubscribeLogs(crit)
|
|
if err != nil {
|
|
return rpc.ID(""), err
|
|
}
|
|
|
|
unsubscribed := make(chan struct{})
|
|
api.filtersMu.Lock()
|
|
api.filters[sub.ID()] = &wsSubscription{
|
|
sub: sub,
|
|
conn: conn,
|
|
unsubscribed: unsubscribed,
|
|
}
|
|
api.filtersMu.Unlock()
|
|
|
|
go func(ch <-chan coretypes.ResultEvent, errCh <-chan error) {
|
|
for {
|
|
select {
|
|
case event := <-ch:
|
|
dataTx, ok := event.Data.(tmtypes.EventDataTx)
|
|
if !ok {
|
|
err = fmt.Errorf("invalid event data %T, expected EventDataTx", event.Data)
|
|
return
|
|
}
|
|
|
|
var resultData evmtypes.ResultData
|
|
resultData, err = evmtypes.DecodeResultData(dataTx.TxResult.Result.Data)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
logs := rpcfilters.FilterLogs(resultData.Logs, crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics)
|
|
|
|
api.filtersMu.Lock()
|
|
if f, found := api.filters[sub.ID()]; found {
|
|
// write to ws conn
|
|
res := &SubscriptionNotification{
|
|
Jsonrpc: "2.0",
|
|
Method: "eth_subscription",
|
|
Params: &SubscriptionResult{
|
|
Subscription: sub.ID(),
|
|
Result: logs,
|
|
},
|
|
}
|
|
|
|
err = f.conn.WriteJSON(res)
|
|
}
|
|
api.filtersMu.Unlock()
|
|
|
|
if err != nil {
|
|
err = fmt.Errorf("failed to write header: %w", err)
|
|
return
|
|
}
|
|
case <-errCh:
|
|
api.filtersMu.Lock()
|
|
delete(api.filters, sub.ID())
|
|
api.filtersMu.Unlock()
|
|
return
|
|
case <-unsubscribed:
|
|
return
|
|
}
|
|
}
|
|
}(sub.Event(), sub.Err())
|
|
|
|
return sub.ID(), nil
|
|
}
|
|
|
|
func (api *PubSubAPI) subscribePendingTransactions(conn *websocket.Conn) (rpc.ID, error) {
|
|
sub, _, err := api.events.SubscribePendingTxs()
|
|
if err != nil {
|
|
return "", fmt.Errorf("error creating block filter: %s", err.Error())
|
|
}
|
|
|
|
unsubscribed := make(chan struct{})
|
|
api.filtersMu.Lock()
|
|
api.filters[sub.ID()] = &wsSubscription{
|
|
sub: sub,
|
|
conn: conn,
|
|
unsubscribed: unsubscribed,
|
|
}
|
|
api.filtersMu.Unlock()
|
|
|
|
go func(txsCh <-chan coretypes.ResultEvent, errCh <-chan error) {
|
|
for {
|
|
select {
|
|
case ev := <-txsCh:
|
|
data, _ := ev.Data.(tmtypes.EventDataTx)
|
|
txHash := common.BytesToHash(data.Tx.Hash())
|
|
|
|
api.filtersMu.Lock()
|
|
if f, found := api.filters[sub.ID()]; found {
|
|
// write to ws conn
|
|
res := &SubscriptionNotification{
|
|
Jsonrpc: "2.0",
|
|
Method: "eth_subscription",
|
|
Params: &SubscriptionResult{
|
|
Subscription: sub.ID(),
|
|
Result: txHash,
|
|
},
|
|
}
|
|
|
|
err = f.conn.WriteJSON(res)
|
|
}
|
|
api.filtersMu.Unlock()
|
|
|
|
if err != nil {
|
|
err = fmt.Errorf("failed to write header: %w", err)
|
|
return
|
|
}
|
|
case <-errCh:
|
|
api.filtersMu.Lock()
|
|
delete(api.filters, sub.ID())
|
|
api.filtersMu.Unlock()
|
|
}
|
|
}
|
|
}(sub.Event(), sub.Err())
|
|
|
|
return sub.ID(), nil
|
|
}
|
|
|
|
func (api *PubSubAPI) subscribeSyncing(conn *websocket.Conn) (rpc.ID, error) {
|
|
return "", nil
|
|
}
|