implement ethereum-compatible websockets server (#357)
This commit is contained in:
parent
b76c36fcdb
commit
c4cf33a9bb
2
go.mod
2
go.mod
@ -8,11 +8,11 @@ require (
|
||||
github.com/cespare/cp v1.1.1 // indirect
|
||||
github.com/cosmos/cosmos-sdk v0.34.4-0.20200403200637-7f78e61b93a5
|
||||
github.com/deckarep/golang-set v1.7.1 // indirect
|
||||
github.com/elastic/gosigar v0.10.3 // indirect
|
||||
github.com/ethereum/go-ethereum v1.9.15
|
||||
github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 // indirect
|
||||
github.com/gogo/protobuf v1.3.1
|
||||
github.com/gorilla/mux v1.7.4
|
||||
github.com/gorilla/websocket v1.4.2
|
||||
github.com/mattn/go-colorable v0.1.4 // indirect
|
||||
github.com/onsi/ginkgo v1.11.0 // indirect
|
||||
github.com/onsi/gomega v1.8.1 // indirect
|
||||
|
7
go.sum
7
go.sum
@ -154,17 +154,12 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP
|
||||
github.com/edsrzf/mmap-go v0.0.0-20160512033002-935e0e8a636c/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
|
||||
github.com/edsrzf/mmap-go v1.0.0 h1:CEBF7HpRnUCSJgGUb5h1Gm7e3VkmVDrR8lvWVLtrOFw=
|
||||
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
|
||||
github.com/elastic/gosigar v0.8.1-0.20180330100440-37f05ff46ffa/go.mod h1:cdorVVzy1fhmEqmtgqkoE3bYtCfSCkVyjTyCIo22xvs=
|
||||
github.com/elastic/gosigar v0.10.3 h1:xA7TJmJgaVgqEnQpYAijMI4J9V1ZM2a9z8+5gAc5FMs=
|
||||
github.com/elastic/gosigar v0.10.3/go.mod h1:cdorVVzy1fhmEqmtgqkoE3bYtCfSCkVyjTyCIo22xvs=
|
||||
github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
|
||||
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
||||
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
||||
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
|
||||
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
||||
github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw=
|
||||
github.com/ethereum/go-ethereum v1.9.14 h1:/rGoPYujLeajAHyDs8aZKYcLrurLdUJP9AzHk73QNr0=
|
||||
github.com/ethereum/go-ethereum v1.9.14/go.mod h1:oP8FC5+TbICUyftkTWs+8JryntjIJLJvWvApK3z2AYw=
|
||||
github.com/ethereum/go-ethereum v1.9.15 h1:wrWl+QrtutRUJ9LZXdUqBoGoo2b1tOCYRDrAOQhCY3A=
|
||||
github.com/ethereum/go-ethereum v1.9.15/go.mod h1:slT8bPPRhXsyNTwHQxrOnjuTZ1sDXRajW11EkJ84QJ0=
|
||||
github.com/facebookgo/ensure v0.0.0-20160127193407-b4ab57deab51 h1:0JZ+dUmQeA8IIVUMzysrX4/AKuQwWhV2dYQuPZdvdSQ=
|
||||
@ -460,6 +455,7 @@ github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/9
|
||||
github.com/pelletier/go-toml v1.6.0 h1:aetoXYr0Tv7xRU/V4B4IZJ2QcbtMUFoNb3ORp7TzIK4=
|
||||
github.com/pelletier/go-toml v1.6.0/go.mod h1:5N711Q9dKgbdkxHL+MEfF31hpT7l0S0s/t2kKREewys=
|
||||
github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac=
|
||||
github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7 h1:oYW+YCJ1pachXTQmzR3rNLYGGz4g/UgFcjb28p/viDM=
|
||||
github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7/go.mod h1:CRroGNssyjTd/qIG2FyxByd2S8JEAZXBl4qUrZf8GS0=
|
||||
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
|
||||
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
|
||||
@ -846,7 +842,6 @@ gopkg.in/jcmturner/gokrb5.v7 v7.5.0/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuv
|
||||
gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8=
|
||||
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce h1:+JknDZhAj8YMt7GC73Ei8pv4MzjDUNPHgQWJdtMAaDU=
|
||||
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce/go.mod h1:5AcXVHNjg+BDxry382+8OKon8SEWiKktQR07RKPsv1c=
|
||||
gopkg.in/olebedev/go-duktape.v3 v3.0.0-20200316214253-d7b0ff38cac9/go.mod h1:uAJfkITjFhyEEuUfm7bsmCZRbW5WRq8s9EY8HZ6hCns=
|
||||
gopkg.in/olebedev/go-duktape.v3 v3.0.0-20200603215123-a4a8cb9d2cbc/go.mod h1:uAJfkITjFhyEEuUfm7bsmCZRbW5WRq8s9EY8HZ6hCns=
|
||||
gopkg.in/redis.v4 v4.2.4/go.mod h1:8KREHdypkCEojGKQcjMqAODMICIVwZAONWq8RowTITA=
|
||||
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
|
||||
|
@ -22,6 +22,7 @@ const (
|
||||
func GetRPCAPIs(cliCtx context.CLIContext, keys []emintcrypto.PrivKeySecp256k1) []rpc.API {
|
||||
nonceLock := new(AddrLocker)
|
||||
backend := NewEthermintBackend(cliCtx)
|
||||
|
||||
return []rpc.API{
|
||||
{
|
||||
Namespace: Web3Namespace,
|
||||
|
@ -3,8 +3,10 @@ package rpc
|
||||
import (
|
||||
"fmt"
|
||||
"math/big"
|
||||
"os"
|
||||
"strconv"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmtypes "github.com/tendermint/tendermint/types"
|
||||
|
||||
evmtypes "github.com/cosmos/ethermint/x/evm/types"
|
||||
@ -43,6 +45,7 @@ var _ Backend = (*EthermintBackend)(nil)
|
||||
// EthermintBackend implements the Backend interface
|
||||
type EthermintBackend struct {
|
||||
cliCtx context.CLIContext
|
||||
logger log.Logger
|
||||
gasLimit int64
|
||||
}
|
||||
|
||||
@ -50,6 +53,7 @@ type EthermintBackend struct {
|
||||
func NewEthermintBackend(cliCtx context.CLIContext) *EthermintBackend {
|
||||
return &EthermintBackend{
|
||||
cliCtx: cliCtx,
|
||||
logger: log.NewTMLogger(log.NewSyncWriter(os.Stdout)).With("module", "json-rpc"),
|
||||
gasLimit: int64(^uint32(0)),
|
||||
}
|
||||
}
|
||||
|
@ -25,28 +25,15 @@ import (
|
||||
|
||||
const (
|
||||
flagUnlockKey = "unlock-key"
|
||||
flagWebsocket = "wsport"
|
||||
)
|
||||
|
||||
// Config contains configuration fields that determine the behavior of the RPC HTTP server.
|
||||
// TODO: These may become irrelevant if HTTP config is handled by the SDK
|
||||
type Config struct {
|
||||
// EnableRPC defines whether or not to enable the RPC server
|
||||
EnableRPC bool
|
||||
// RPCAddr defines the IP address to listen on
|
||||
RPCAddr string
|
||||
// RPCPort defines the port to listen on
|
||||
RPCPort int
|
||||
// RPCCORSDomains defines list of domains to enable CORS headers for (used by browsers)
|
||||
RPCCORSDomains []string
|
||||
// RPCVhosts defines list of domains to listen on (useful if Tendermint is addressable via DNS)
|
||||
RPCVHosts []string
|
||||
}
|
||||
|
||||
// EmintServeCmd creates a CLI command to start Cosmos REST server with web3 RPC API and
|
||||
// Cosmos rest-server endpoints
|
||||
func EmintServeCmd(cdc *codec.Codec) *cobra.Command {
|
||||
cmd := lcd.ServeCommand(cdc, registerRoutes)
|
||||
cmd.Flags().String(flagUnlockKey, "", "Select a key to unlock on the RPC server")
|
||||
cmd.Flags().String(flagWebsocket, "8546", "websocket port to listen to")
|
||||
cmd.Flags().StringP(flags.FlagBroadcastMode, "b", flags.BroadcastSync, "Transaction broadcasting mode (sync|async|block)")
|
||||
return cmd
|
||||
}
|
||||
@ -104,6 +91,11 @@ func registerRoutes(rs *lcd.RestServer) {
|
||||
client.RegisterRoutes(rs.CliCtx, rs.Mux)
|
||||
authrest.RegisterTxRoutes(rs.CliCtx, rs.Mux)
|
||||
app.ModuleBasics.RegisterRESTRoutes(rs.CliCtx, rs.Mux)
|
||||
|
||||
// start websockets server
|
||||
websocketAddr := viper.GetString(flagWebsocket)
|
||||
ws := newWebsocketsServer(rs.CliCtx, websocketAddr)
|
||||
ws.start()
|
||||
}
|
||||
|
||||
func unlockKeyFromNameAndPassphrase(accountNames []string, passphrase string) (emintKeys []emintcrypto.PrivKeySecp256k1, err error) {
|
||||
|
542
rpc/websockets.go
Normal file
542
rpc/websockets.go
Normal file
@ -0,0 +1,542 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math/big"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/spf13/viper"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
coretypes "github.com/tendermint/tendermint/rpc/core/types"
|
||||
tmtypes "github.com/tendermint/tendermint/types"
|
||||
|
||||
evmtypes "github.com/cosmos/ethermint/x/evm/types"
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/eth/filters"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
|
||||
context "github.com/cosmos/cosmos-sdk/client/context"
|
||||
)
|
||||
|
||||
type SubscriptionResponseJSON struct {
|
||||
Jsonrpc string `json:"jsonrpc"`
|
||||
Result interface{} `json:"result"`
|
||||
ID float64 `json:"id"`
|
||||
}
|
||||
|
||||
type SubscriptionNotification struct {
|
||||
Jsonrpc string `json:"jsonrpc"`
|
||||
Method string `json:"method"`
|
||||
Params *SubscriptionResult `json:"params"`
|
||||
}
|
||||
|
||||
type SubscriptionResult struct {
|
||||
Subscription rpc.ID `json:"subscription"`
|
||||
Result interface{} `json:"result"`
|
||||
}
|
||||
|
||||
type ErrorResponseJSON struct {
|
||||
Jsonrpc string `json:"jsonrpc"`
|
||||
Error *ErrorMessageJSON `json:"error"`
|
||||
ID *big.Int `json:"id"`
|
||||
}
|
||||
|
||||
type ErrorMessageJSON struct {
|
||||
Code *big.Int `json:"code"`
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
type websocketsServer struct {
|
||||
rpcAddr string // listen address of rest-server
|
||||
wsAddr string // listen address of ws server
|
||||
api *pubSubAPI
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
func newWebsocketsServer(cliCtx context.CLIContext, wsAddr string) *websocketsServer {
|
||||
return &websocketsServer{
|
||||
rpcAddr: viper.GetString("laddr"),
|
||||
wsAddr: wsAddr,
|
||||
api: newPubSubAPI(cliCtx),
|
||||
logger: log.NewTMLogger(log.NewSyncWriter(os.Stdout)).With("module", "websocket-server"),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *websocketsServer) start() {
|
||||
ws := mux.NewRouter()
|
||||
ws.Handle("/", s)
|
||||
|
||||
go func() {
|
||||
err := http.ListenAndServe(fmt.Sprintf(":%s", s.wsAddr), ws)
|
||||
if err != nil {
|
||||
s.logger.Error("http error:", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *websocketsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
var upgrader = websocket.Upgrader{
|
||||
CheckOrigin: func(r *http.Request) bool {
|
||||
return true
|
||||
},
|
||||
}
|
||||
|
||||
wsConn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
s.logger.Error("websocket upgrade failed; error:", err)
|
||||
return
|
||||
}
|
||||
|
||||
s.readLoop(wsConn)
|
||||
}
|
||||
|
||||
func (s *websocketsServer) sendErrResponse(conn *websocket.Conn, msg string) {
|
||||
res := &ErrorResponseJSON{
|
||||
Jsonrpc: "2.0",
|
||||
Error: &ErrorMessageJSON{
|
||||
Code: big.NewInt(-32600),
|
||||
Message: msg,
|
||||
},
|
||||
ID: nil,
|
||||
}
|
||||
err := conn.WriteJSON(res)
|
||||
if err != nil {
|
||||
s.logger.Error("websocket failed write message", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *websocketsServer) readLoop(wsConn *websocket.Conn) {
|
||||
for {
|
||||
_, mb, err := wsConn.ReadMessage()
|
||||
if err != nil {
|
||||
_ = wsConn.Close()
|
||||
s.logger.Error("failed to read message; error", err)
|
||||
return
|
||||
}
|
||||
|
||||
var msg map[string]interface{}
|
||||
err = json.Unmarshal(mb, &msg)
|
||||
if err != nil {
|
||||
s.sendErrResponse(wsConn, "invalid request")
|
||||
continue
|
||||
}
|
||||
|
||||
// check if method == eth_subscribe or eth_unsubscribe
|
||||
method := msg["method"]
|
||||
if method.(string) == "eth_subscribe" {
|
||||
params := msg["params"].([]interface{})
|
||||
if len(params) == 0 {
|
||||
s.sendErrResponse(wsConn, "invalid parameters")
|
||||
continue
|
||||
}
|
||||
|
||||
id, err := s.api.subscribe(wsConn, params)
|
||||
if err != nil {
|
||||
s.sendErrResponse(wsConn, err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
res := &SubscriptionResponseJSON{
|
||||
Jsonrpc: "2.0",
|
||||
ID: 1,
|
||||
Result: id,
|
||||
}
|
||||
|
||||
err = wsConn.WriteJSON(res)
|
||||
if err != nil {
|
||||
s.logger.Error("failed to write json response", err)
|
||||
continue
|
||||
}
|
||||
|
||||
continue
|
||||
} else if method.(string) == "eth_unsubscribe" {
|
||||
ids, ok := msg["params"].([]interface{})
|
||||
if _, idok := ids[0].(string); !ok || !idok {
|
||||
s.sendErrResponse(wsConn, "invalid parameters")
|
||||
continue
|
||||
}
|
||||
|
||||
ok = s.api.unsubscribe(rpc.ID(ids[0].(string)))
|
||||
res := &SubscriptionResponseJSON{
|
||||
Jsonrpc: "2.0",
|
||||
ID: 1,
|
||||
Result: ok,
|
||||
}
|
||||
|
||||
err = wsConn.WriteJSON(res)
|
||||
if err != nil {
|
||||
s.logger.Error("failed to write json response", err)
|
||||
continue
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// otherwise, call the usual rpc server to respond
|
||||
err = s.tcpGetAndSendResponse(wsConn, mb)
|
||||
if err != nil {
|
||||
s.sendErrResponse(wsConn, err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// tcpGetAndSendResponse connects to the rest-server over tcp, posts a JSON-RPC request, and sends the response
|
||||
// to the client over websockets
|
||||
func (s *websocketsServer) tcpGetAndSendResponse(conn *websocket.Conn, mb []byte) error {
|
||||
addr := strings.Split(s.rpcAddr, "tcp://")
|
||||
if len(addr) != 2 {
|
||||
return fmt.Errorf("invalid laddr %s", s.rpcAddr)
|
||||
}
|
||||
|
||||
tcpConn, err := net.Dial("tcp", addr[1])
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot connect to %s; %s", s.rpcAddr, err)
|
||||
}
|
||||
|
||||
buf := &bytes.Buffer{}
|
||||
_, err = buf.Write(mb)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to write message; %s", err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", s.rpcAddr, buf)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to request; %s", err)
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json;")
|
||||
err = req.Write(tcpConn)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to write to rest-server; %s", err)
|
||||
}
|
||||
|
||||
respBytes, err := ioutil.ReadAll(tcpConn)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error reading response from rest-server; %s", err)
|
||||
}
|
||||
|
||||
respbuf := &bytes.Buffer{}
|
||||
respbuf.Write(respBytes)
|
||||
resp, err := http.ReadResponse(bufio.NewReader(respbuf), req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not read response; %s", err)
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not read body from response; %s", err)
|
||||
}
|
||||
|
||||
var wsSend interface{}
|
||||
err = json.Unmarshal(body, &wsSend)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to unmarshal rest-server response; %s", err)
|
||||
}
|
||||
|
||||
return conn.WriteJSON(wsSend)
|
||||
}
|
||||
|
||||
type wsSubscription struct {
|
||||
sub *Subscription
|
||||
unsubscribed chan struct{} // closed when unsubscribing
|
||||
conn *websocket.Conn
|
||||
}
|
||||
|
||||
// pubSubAPI is the eth_ prefixed set of APIs in the Web3 JSON-RPC spec
|
||||
type pubSubAPI struct {
|
||||
cliCtx context.CLIContext
|
||||
events *EventSystem
|
||||
filtersMu sync.Mutex
|
||||
filters map[rpc.ID]*wsSubscription
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
// newPubSubAPI creates an instance of the ethereum PubSub API.
|
||||
func newPubSubAPI(cliCtx context.CLIContext) *pubSubAPI {
|
||||
return &pubSubAPI{
|
||||
cliCtx: cliCtx,
|
||||
events: NewEventSystem(cliCtx.Client),
|
||||
filters: make(map[rpc.ID]*wsSubscription),
|
||||
logger: log.NewTMLogger(log.NewSyncWriter(os.Stdout)).With("module", "websocket-client"),
|
||||
}
|
||||
}
|
||||
|
||||
func (api *pubSubAPI) subscribe(conn *websocket.Conn, params []interface{}) (rpc.ID, error) {
|
||||
method, ok := params[0].(string)
|
||||
if !ok {
|
||||
return "0", fmt.Errorf("invalid parameters")
|
||||
}
|
||||
|
||||
switch method {
|
||||
case "newHeads":
|
||||
// TODO: handle extra params
|
||||
return api.subscribeNewHeads(conn)
|
||||
case "logs":
|
||||
if len(params) > 1 {
|
||||
return api.subscribeLogs(conn, params[1])
|
||||
}
|
||||
|
||||
return api.subscribeLogs(conn, nil)
|
||||
case "newPendingTransactions":
|
||||
return api.subscribePendingTransactions(conn)
|
||||
case "syncing":
|
||||
return api.subscribeSyncing(conn)
|
||||
default:
|
||||
return "0", fmt.Errorf("unsupported method %s", method)
|
||||
}
|
||||
}
|
||||
|
||||
func (api *pubSubAPI) unsubscribe(id rpc.ID) bool {
|
||||
api.filtersMu.Lock()
|
||||
defer api.filtersMu.Unlock()
|
||||
|
||||
if api.filters[id] == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
close(api.filters[id].unsubscribed)
|
||||
delete(api.filters, id)
|
||||
return true
|
||||
}
|
||||
|
||||
func (api *pubSubAPI) subscribeNewHeads(conn *websocket.Conn) (rpc.ID, error) {
|
||||
sub, _, err := api.events.SubscribeNewHeads()
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("error creating block filter: %s", err.Error())
|
||||
}
|
||||
|
||||
unsubscribed := make(chan struct{})
|
||||
api.filtersMu.Lock()
|
||||
api.filters[sub.ID()] = &wsSubscription{
|
||||
sub: sub,
|
||||
conn: conn,
|
||||
unsubscribed: unsubscribed,
|
||||
}
|
||||
api.filtersMu.Unlock()
|
||||
|
||||
go func(headersCh <-chan coretypes.ResultEvent, errCh <-chan error) {
|
||||
for {
|
||||
select {
|
||||
case event := <-headersCh:
|
||||
data, _ := event.Data.(tmtypes.EventDataNewBlockHeader)
|
||||
header := EthHeaderFromTendermint(data.Header)
|
||||
|
||||
api.filtersMu.Lock()
|
||||
if f, found := api.filters[sub.ID()]; found {
|
||||
// write to ws conn
|
||||
res := &SubscriptionNotification{
|
||||
Jsonrpc: "2.0",
|
||||
Method: "eth_subscription",
|
||||
Params: &SubscriptionResult{
|
||||
Subscription: sub.ID(),
|
||||
Result: header,
|
||||
},
|
||||
}
|
||||
|
||||
err = f.conn.WriteJSON(res)
|
||||
if err != nil {
|
||||
api.logger.Error("error writing header")
|
||||
}
|
||||
}
|
||||
api.filtersMu.Unlock()
|
||||
case <-errCh:
|
||||
api.filtersMu.Lock()
|
||||
delete(api.filters, sub.ID())
|
||||
api.filtersMu.Unlock()
|
||||
return
|
||||
case <-unsubscribed:
|
||||
return
|
||||
}
|
||||
}
|
||||
}(sub.eventCh, sub.Err())
|
||||
|
||||
return sub.ID(), nil
|
||||
}
|
||||
|
||||
func (api *pubSubAPI) subscribeLogs(conn *websocket.Conn, extra interface{}) (rpc.ID, error) {
|
||||
crit := filters.FilterCriteria{}
|
||||
|
||||
if extra != nil {
|
||||
params, ok := extra.(map[string]interface{})
|
||||
if !ok {
|
||||
return "", fmt.Errorf("invalid criteria")
|
||||
}
|
||||
|
||||
if params["address"] != nil {
|
||||
address, ok := params["address"].(string)
|
||||
addresses, sok := params["address"].([]interface{})
|
||||
if !ok && !sok {
|
||||
return "", fmt.Errorf("invalid address; must be address or array of addresses")
|
||||
}
|
||||
|
||||
if ok {
|
||||
crit.Addresses = []common.Address{common.HexToAddress(address)}
|
||||
}
|
||||
|
||||
if sok {
|
||||
crit.Addresses = []common.Address{}
|
||||
for _, addr := range addresses {
|
||||
address, ok := addr.(string)
|
||||
if !ok {
|
||||
return "", fmt.Errorf("invalid address")
|
||||
}
|
||||
|
||||
crit.Addresses = append(crit.Addresses, common.HexToAddress(address))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if params["topics"] != nil {
|
||||
topics, ok := params["topics"].([]interface{})
|
||||
if !ok {
|
||||
return "", fmt.Errorf("invalid topics")
|
||||
}
|
||||
|
||||
crit.Topics = [][]common.Hash{}
|
||||
for _, topic := range topics {
|
||||
tstr, ok := topic.(string)
|
||||
if !ok {
|
||||
return "", fmt.Errorf("invalid topics")
|
||||
}
|
||||
|
||||
h := common.HexToHash(tstr)
|
||||
crit.Topics = append(crit.Topics, []common.Hash{h})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sub, _, err := api.events.SubscribeLogs(crit)
|
||||
if err != nil {
|
||||
return rpc.ID(""), err
|
||||
}
|
||||
|
||||
unsubscribed := make(chan struct{})
|
||||
api.filtersMu.Lock()
|
||||
api.filters[sub.ID()] = &wsSubscription{
|
||||
sub: sub,
|
||||
conn: conn,
|
||||
unsubscribed: unsubscribed,
|
||||
}
|
||||
api.filtersMu.Unlock()
|
||||
|
||||
go func(ch <-chan coretypes.ResultEvent, errCh <-chan error) {
|
||||
for {
|
||||
select {
|
||||
case event := <-ch:
|
||||
dataTx, ok := event.Data.(tmtypes.EventDataTx)
|
||||
if !ok {
|
||||
err = fmt.Errorf("invalid event data %T, expected EventDataTx", event.Data)
|
||||
return
|
||||
}
|
||||
|
||||
var resultData evmtypes.ResultData
|
||||
resultData, err = evmtypes.DecodeResultData(dataTx.TxResult.Result.Data)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
logs := filterLogs(resultData.Logs, crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics)
|
||||
|
||||
api.filtersMu.Lock()
|
||||
if f, found := api.filters[sub.ID()]; found {
|
||||
// write to ws conn
|
||||
res := &SubscriptionNotification{
|
||||
Jsonrpc: "2.0",
|
||||
Method: "eth_subscription",
|
||||
Params: &SubscriptionResult{
|
||||
Subscription: sub.ID(),
|
||||
Result: logs,
|
||||
},
|
||||
}
|
||||
|
||||
err = f.conn.WriteJSON(res)
|
||||
}
|
||||
api.filtersMu.Unlock()
|
||||
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to write header: %w", err)
|
||||
return
|
||||
}
|
||||
case <-errCh:
|
||||
api.filtersMu.Lock()
|
||||
delete(api.filters, sub.ID())
|
||||
api.filtersMu.Unlock()
|
||||
return
|
||||
case <-unsubscribed:
|
||||
return
|
||||
}
|
||||
}
|
||||
}(sub.eventCh, sub.Err())
|
||||
|
||||
return sub.ID(), nil
|
||||
}
|
||||
|
||||
func (api *pubSubAPI) subscribePendingTransactions(conn *websocket.Conn) (rpc.ID, error) {
|
||||
sub, _, err := api.events.SubscribePendingTxs()
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("error creating block filter: %s", err.Error())
|
||||
}
|
||||
|
||||
unsubscribed := make(chan struct{})
|
||||
api.filtersMu.Lock()
|
||||
api.filters[sub.ID()] = &wsSubscription{
|
||||
sub: sub,
|
||||
conn: conn,
|
||||
unsubscribed: unsubscribed,
|
||||
}
|
||||
api.filtersMu.Unlock()
|
||||
|
||||
go func(txsCh <-chan coretypes.ResultEvent, errCh <-chan error) {
|
||||
for {
|
||||
select {
|
||||
case ev := <-txsCh:
|
||||
data, _ := ev.Data.(tmtypes.EventDataTx)
|
||||
txHash := common.BytesToHash(data.Tx.Hash())
|
||||
|
||||
api.filtersMu.Lock()
|
||||
if f, found := api.filters[sub.ID()]; found {
|
||||
// write to ws conn
|
||||
res := &SubscriptionNotification{
|
||||
Jsonrpc: "2.0",
|
||||
Method: "eth_subscription",
|
||||
Params: &SubscriptionResult{
|
||||
Subscription: sub.ID(),
|
||||
Result: txHash,
|
||||
},
|
||||
}
|
||||
|
||||
err = f.conn.WriteJSON(res)
|
||||
}
|
||||
api.filtersMu.Unlock()
|
||||
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to write header: %w", err)
|
||||
return
|
||||
}
|
||||
case <-errCh:
|
||||
api.filtersMu.Lock()
|
||||
delete(api.filters, sub.ID())
|
||||
api.filtersMu.Unlock()
|
||||
}
|
||||
}
|
||||
}(sub.eventCh, sub.Err())
|
||||
|
||||
return sub.ID(), nil
|
||||
}
|
||||
|
||||
func (api *pubSubAPI) subscribeSyncing(conn *websocket.Conn) (rpc.ID, error) {
|
||||
return "", nil
|
||||
}
|
Loading…
Reference in New Issue
Block a user