laconicd/rpc/websockets/pubsub_api.go

310 lines
7.4 KiB
Go
Raw Normal View History

package websockets
import (
"fmt"
"os"
"sync"
"github.com/gorilla/websocket"
"github.com/tendermint/tendermint/libs/log"
coretypes "github.com/tendermint/tendermint/rpc/core/types"
tmtypes "github.com/tendermint/tendermint/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/rpc"
context "github.com/cosmos/cosmos-sdk/client/context"
rpcfilters "github.com/cosmos/ethermint/rpc/namespaces/eth/filters"
rpctypes "github.com/cosmos/ethermint/rpc/types"
evmtypes "github.com/cosmos/ethermint/x/evm/types"
)
// PubSubAPI is the eth_ prefixed set of APIs in the Web3 JSON-RPC spec
type PubSubAPI struct {
clientCtx context.CLIContext
events *rpcfilters.EventSystem
filtersMu sync.Mutex
filters map[rpc.ID]*wsSubscription
logger log.Logger
}
// NewAPI creates an instance of the ethereum PubSub API.
func NewAPI(clientCtx context.CLIContext) *PubSubAPI {
return &PubSubAPI{
clientCtx: clientCtx,
events: rpcfilters.NewEventSystem(clientCtx.Client),
filters: make(map[rpc.ID]*wsSubscription),
logger: log.NewTMLogger(log.NewSyncWriter(os.Stdout)).With("module", "websocket-client"),
}
}
func (api *PubSubAPI) subscribe(conn *websocket.Conn, params []interface{}) (rpc.ID, error) {
method, ok := params[0].(string)
if !ok {
return "0", fmt.Errorf("invalid parameters")
}
switch method {
case "newHeads":
// TODO: handle extra params
return api.subscribeNewHeads(conn)
case "logs":
if len(params) > 1 {
return api.subscribeLogs(conn, params[1])
}
return api.subscribeLogs(conn, nil)
case "newPendingTransactions":
return api.subscribePendingTransactions(conn)
case "syncing":
return api.subscribeSyncing(conn)
default:
return "0", fmt.Errorf("unsupported method %s", method)
}
}
func (api *PubSubAPI) unsubscribe(id rpc.ID) bool {
api.filtersMu.Lock()
defer api.filtersMu.Unlock()
if api.filters[id] == nil {
return false
}
close(api.filters[id].unsubscribed)
delete(api.filters, id)
return true
}
func (api *PubSubAPI) subscribeNewHeads(conn *websocket.Conn) (rpc.ID, error) {
sub, _, err := api.events.SubscribeNewHeads()
if err != nil {
return "", fmt.Errorf("error creating block filter: %s", err.Error())
}
unsubscribed := make(chan struct{})
api.filtersMu.Lock()
api.filters[sub.ID()] = &wsSubscription{
sub: sub,
conn: conn,
unsubscribed: unsubscribed,
}
api.filtersMu.Unlock()
go func(headersCh <-chan coretypes.ResultEvent, errCh <-chan error) {
for {
select {
case event := <-headersCh:
data, _ := event.Data.(tmtypes.EventDataNewBlockHeader)
header := rpctypes.EthHeaderFromTendermint(data.Header)
api.filtersMu.Lock()
if f, found := api.filters[sub.ID()]; found {
// write to ws conn
res := &SubscriptionNotification{
Jsonrpc: "2.0",
Method: "eth_subscription",
Params: &SubscriptionResult{
Subscription: sub.ID(),
Result: header,
},
}
err = f.conn.WriteJSON(res)
if err != nil {
api.logger.Error("error writing header")
}
}
api.filtersMu.Unlock()
case <-errCh:
api.filtersMu.Lock()
delete(api.filters, sub.ID())
api.filtersMu.Unlock()
return
case <-unsubscribed:
return
}
}
}(sub.Event(), sub.Err())
return sub.ID(), nil
}
func (api *PubSubAPI) subscribeLogs(conn *websocket.Conn, extra interface{}) (rpc.ID, error) {
crit := filters.FilterCriteria{}
if extra != nil {
params, ok := extra.(map[string]interface{})
if !ok {
return "", fmt.Errorf("invalid criteria")
}
if params["address"] != nil {
address, ok := params["address"].(string)
addresses, sok := params["address"].([]interface{})
if !ok && !sok {
return "", fmt.Errorf("invalid address; must be address or array of addresses")
}
if ok {
crit.Addresses = []common.Address{common.HexToAddress(address)}
}
if sok {
crit.Addresses = []common.Address{}
for _, addr := range addresses {
address, ok := addr.(string)
if !ok {
return "", fmt.Errorf("invalid address")
}
crit.Addresses = append(crit.Addresses, common.HexToAddress(address))
}
}
}
if params["topics"] != nil {
topics, ok := params["topics"].([]interface{})
if !ok {
return "", fmt.Errorf("invalid topics")
}
crit.Topics = [][]common.Hash{}
for _, topic := range topics {
tstr, ok := topic.(string)
if !ok {
return "", fmt.Errorf("invalid topics")
}
h := common.HexToHash(tstr)
crit.Topics = append(crit.Topics, []common.Hash{h})
}
}
}
sub, _, err := api.events.SubscribeLogs(crit)
if err != nil {
return rpc.ID(""), err
}
unsubscribed := make(chan struct{})
api.filtersMu.Lock()
api.filters[sub.ID()] = &wsSubscription{
sub: sub,
conn: conn,
unsubscribed: unsubscribed,
}
api.filtersMu.Unlock()
go func(ch <-chan coretypes.ResultEvent, errCh <-chan error) {
for {
select {
case event := <-ch:
dataTx, ok := event.Data.(tmtypes.EventDataTx)
if !ok {
err = fmt.Errorf("invalid event data %T, expected EventDataTx", event.Data)
return
}
var resultData evmtypes.ResultData
resultData, err = evmtypes.DecodeResultData(dataTx.TxResult.Result.Data)
if err != nil {
return
}
logs := rpcfilters.FilterLogs(resultData.Logs, crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics)
api.filtersMu.Lock()
if f, found := api.filters[sub.ID()]; found {
// write to ws conn
res := &SubscriptionNotification{
Jsonrpc: "2.0",
Method: "eth_subscription",
Params: &SubscriptionResult{
Subscription: sub.ID(),
Result: logs,
},
}
err = f.conn.WriteJSON(res)
}
api.filtersMu.Unlock()
if err != nil {
err = fmt.Errorf("failed to write header: %w", err)
return
}
case <-errCh:
api.filtersMu.Lock()
delete(api.filters, sub.ID())
api.filtersMu.Unlock()
return
case <-unsubscribed:
return
}
}
}(sub.Event(), sub.Err())
return sub.ID(), nil
}
func (api *PubSubAPI) subscribePendingTransactions(conn *websocket.Conn) (rpc.ID, error) {
sub, _, err := api.events.SubscribePendingTxs()
if err != nil {
return "", fmt.Errorf("error creating block filter: %s", err.Error())
}
unsubscribed := make(chan struct{})
api.filtersMu.Lock()
api.filters[sub.ID()] = &wsSubscription{
sub: sub,
conn: conn,
unsubscribed: unsubscribed,
}
api.filtersMu.Unlock()
go func(txsCh <-chan coretypes.ResultEvent, errCh <-chan error) {
for {
select {
case ev := <-txsCh:
data, _ := ev.Data.(tmtypes.EventDataTx)
txHash := common.BytesToHash(data.Tx.Hash())
api.filtersMu.Lock()
if f, found := api.filters[sub.ID()]; found {
// write to ws conn
res := &SubscriptionNotification{
Jsonrpc: "2.0",
Method: "eth_subscription",
Params: &SubscriptionResult{
Subscription: sub.ID(),
Result: txHash,
},
}
err = f.conn.WriteJSON(res)
}
api.filtersMu.Unlock()
if err != nil {
err = fmt.Errorf("failed to write header: %w", err)
return
}
case <-errCh:
api.filtersMu.Lock()
delete(api.filters, sub.ID())
api.filtersMu.Unlock()
}
}
}(sub.Event(), sub.Err())
return sub.ID(), nil
}
func (api *PubSubAPI) subscribeSyncing(conn *websocket.Conn) (rpc.ID, error) {
return "", nil
}