diff --git a/go.mod b/go.mod index a854e482..9e8dd94b 100644 --- a/go.mod +++ b/go.mod @@ -8,11 +8,11 @@ require ( github.com/cespare/cp v1.1.1 // indirect github.com/cosmos/cosmos-sdk v0.34.4-0.20200403200637-7f78e61b93a5 github.com/deckarep/golang-set v1.7.1 // indirect - github.com/elastic/gosigar v0.10.3 // indirect github.com/ethereum/go-ethereum v1.9.15 github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 // indirect github.com/gogo/protobuf v1.3.1 github.com/gorilla/mux v1.7.4 + github.com/gorilla/websocket v1.4.2 github.com/mattn/go-colorable v0.1.4 // indirect github.com/onsi/ginkgo v1.11.0 // indirect github.com/onsi/gomega v1.8.1 // indirect diff --git a/go.sum b/go.sum index a5658916..dd474a01 100644 --- a/go.sum +++ b/go.sum @@ -154,17 +154,12 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP github.com/edsrzf/mmap-go v0.0.0-20160512033002-935e0e8a636c/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= github.com/edsrzf/mmap-go v1.0.0 h1:CEBF7HpRnUCSJgGUb5h1Gm7e3VkmVDrR8lvWVLtrOFw= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= -github.com/elastic/gosigar v0.8.1-0.20180330100440-37f05ff46ffa/go.mod h1:cdorVVzy1fhmEqmtgqkoE3bYtCfSCkVyjTyCIo22xvs= -github.com/elastic/gosigar v0.10.3 h1:xA7TJmJgaVgqEnQpYAijMI4J9V1ZM2a9z8+5gAc5FMs= -github.com/elastic/gosigar v0.10.3/go.mod h1:cdorVVzy1fhmEqmtgqkoE3bYtCfSCkVyjTyCIo22xvs= github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw= -github.com/ethereum/go-ethereum v1.9.14 h1:/rGoPYujLeajAHyDs8aZKYcLrurLdUJP9AzHk73QNr0= -github.com/ethereum/go-ethereum v1.9.14/go.mod h1:oP8FC5+TbICUyftkTWs+8JryntjIJLJvWvApK3z2AYw= github.com/ethereum/go-ethereum v1.9.15 h1:wrWl+QrtutRUJ9LZXdUqBoGoo2b1tOCYRDrAOQhCY3A= github.com/ethereum/go-ethereum v1.9.15/go.mod h1:slT8bPPRhXsyNTwHQxrOnjuTZ1sDXRajW11EkJ84QJ0= github.com/facebookgo/ensure v0.0.0-20160127193407-b4ab57deab51 h1:0JZ+dUmQeA8IIVUMzysrX4/AKuQwWhV2dYQuPZdvdSQ= @@ -460,6 +455,7 @@ github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/9 github.com/pelletier/go-toml v1.6.0 h1:aetoXYr0Tv7xRU/V4B4IZJ2QcbtMUFoNb3ORp7TzIK4= github.com/pelletier/go-toml v1.6.0/go.mod h1:5N711Q9dKgbdkxHL+MEfF31hpT7l0S0s/t2kKREewys= github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac= +github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7 h1:oYW+YCJ1pachXTQmzR3rNLYGGz4g/UgFcjb28p/viDM= github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7/go.mod h1:CRroGNssyjTd/qIG2FyxByd2S8JEAZXBl4qUrZf8GS0= github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= @@ -846,7 +842,6 @@ gopkg.in/jcmturner/gokrb5.v7 v7.5.0/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuv gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8= gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce h1:+JknDZhAj8YMt7GC73Ei8pv4MzjDUNPHgQWJdtMAaDU= gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce/go.mod h1:5AcXVHNjg+BDxry382+8OKon8SEWiKktQR07RKPsv1c= -gopkg.in/olebedev/go-duktape.v3 v3.0.0-20200316214253-d7b0ff38cac9/go.mod h1:uAJfkITjFhyEEuUfm7bsmCZRbW5WRq8s9EY8HZ6hCns= gopkg.in/olebedev/go-duktape.v3 v3.0.0-20200603215123-a4a8cb9d2cbc/go.mod h1:uAJfkITjFhyEEuUfm7bsmCZRbW5WRq8s9EY8HZ6hCns= gopkg.in/redis.v4 v4.2.4/go.mod h1:8KREHdypkCEojGKQcjMqAODMICIVwZAONWq8RowTITA= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= diff --git a/rpc/apis.go b/rpc/apis.go index 6082ac94..ca336ff1 100644 --- a/rpc/apis.go +++ b/rpc/apis.go @@ -22,6 +22,7 @@ const ( func GetRPCAPIs(cliCtx context.CLIContext, keys []emintcrypto.PrivKeySecp256k1) []rpc.API { nonceLock := new(AddrLocker) backend := NewEthermintBackend(cliCtx) + return []rpc.API{ { Namespace: Web3Namespace, diff --git a/rpc/backend.go b/rpc/backend.go index b0accbc9..1c5800a1 100644 --- a/rpc/backend.go +++ b/rpc/backend.go @@ -3,8 +3,10 @@ package rpc import ( "fmt" "math/big" + "os" "strconv" + "github.com/tendermint/tendermint/libs/log" tmtypes "github.com/tendermint/tendermint/types" evmtypes "github.com/cosmos/ethermint/x/evm/types" @@ -43,6 +45,7 @@ var _ Backend = (*EthermintBackend)(nil) // EthermintBackend implements the Backend interface type EthermintBackend struct { cliCtx context.CLIContext + logger log.Logger gasLimit int64 } @@ -50,6 +53,7 @@ type EthermintBackend struct { func NewEthermintBackend(cliCtx context.CLIContext) *EthermintBackend { return &EthermintBackend{ cliCtx: cliCtx, + logger: log.NewTMLogger(log.NewSyncWriter(os.Stdout)).With("module", "json-rpc"), gasLimit: int64(^uint32(0)), } } diff --git a/rpc/config.go b/rpc/config.go index 5d3cd1ed..869103fa 100644 --- a/rpc/config.go +++ b/rpc/config.go @@ -25,28 +25,15 @@ import ( const ( flagUnlockKey = "unlock-key" + flagWebsocket = "wsport" ) -// Config contains configuration fields that determine the behavior of the RPC HTTP server. -// TODO: These may become irrelevant if HTTP config is handled by the SDK -type Config struct { - // EnableRPC defines whether or not to enable the RPC server - EnableRPC bool - // RPCAddr defines the IP address to listen on - RPCAddr string - // RPCPort defines the port to listen on - RPCPort int - // RPCCORSDomains defines list of domains to enable CORS headers for (used by browsers) - RPCCORSDomains []string - // RPCVhosts defines list of domains to listen on (useful if Tendermint is addressable via DNS) - RPCVHosts []string -} - // EmintServeCmd creates a CLI command to start Cosmos REST server with web3 RPC API and // Cosmos rest-server endpoints func EmintServeCmd(cdc *codec.Codec) *cobra.Command { cmd := lcd.ServeCommand(cdc, registerRoutes) cmd.Flags().String(flagUnlockKey, "", "Select a key to unlock on the RPC server") + cmd.Flags().String(flagWebsocket, "8546", "websocket port to listen to") cmd.Flags().StringP(flags.FlagBroadcastMode, "b", flags.BroadcastSync, "Transaction broadcasting mode (sync|async|block)") return cmd } @@ -104,6 +91,11 @@ func registerRoutes(rs *lcd.RestServer) { client.RegisterRoutes(rs.CliCtx, rs.Mux) authrest.RegisterTxRoutes(rs.CliCtx, rs.Mux) app.ModuleBasics.RegisterRESTRoutes(rs.CliCtx, rs.Mux) + + // start websockets server + websocketAddr := viper.GetString(flagWebsocket) + ws := newWebsocketsServer(rs.CliCtx, websocketAddr) + ws.start() } func unlockKeyFromNameAndPassphrase(accountNames []string, passphrase string) (emintKeys []emintcrypto.PrivKeySecp256k1, err error) { diff --git a/rpc/websockets.go b/rpc/websockets.go new file mode 100644 index 00000000..26117824 --- /dev/null +++ b/rpc/websockets.go @@ -0,0 +1,542 @@ +package rpc + +import ( + "bufio" + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "math/big" + "net" + "net/http" + "os" + "strings" + "sync" + + "github.com/gorilla/mux" + "github.com/gorilla/websocket" + "github.com/spf13/viper" + + "github.com/tendermint/tendermint/libs/log" + coretypes "github.com/tendermint/tendermint/rpc/core/types" + tmtypes "github.com/tendermint/tendermint/types" + + evmtypes "github.com/cosmos/ethermint/x/evm/types" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/eth/filters" + "github.com/ethereum/go-ethereum/rpc" + + context "github.com/cosmos/cosmos-sdk/client/context" +) + +type SubscriptionResponseJSON struct { + Jsonrpc string `json:"jsonrpc"` + Result interface{} `json:"result"` + ID float64 `json:"id"` +} + +type SubscriptionNotification struct { + Jsonrpc string `json:"jsonrpc"` + Method string `json:"method"` + Params *SubscriptionResult `json:"params"` +} + +type SubscriptionResult struct { + Subscription rpc.ID `json:"subscription"` + Result interface{} `json:"result"` +} + +type ErrorResponseJSON struct { + Jsonrpc string `json:"jsonrpc"` + Error *ErrorMessageJSON `json:"error"` + ID *big.Int `json:"id"` +} + +type ErrorMessageJSON struct { + Code *big.Int `json:"code"` + Message string `json:"message"` +} + +type websocketsServer struct { + rpcAddr string // listen address of rest-server + wsAddr string // listen address of ws server + api *pubSubAPI + logger log.Logger +} + +func newWebsocketsServer(cliCtx context.CLIContext, wsAddr string) *websocketsServer { + return &websocketsServer{ + rpcAddr: viper.GetString("laddr"), + wsAddr: wsAddr, + api: newPubSubAPI(cliCtx), + logger: log.NewTMLogger(log.NewSyncWriter(os.Stdout)).With("module", "websocket-server"), + } +} + +func (s *websocketsServer) start() { + ws := mux.NewRouter() + ws.Handle("/", s) + + go func() { + err := http.ListenAndServe(fmt.Sprintf(":%s", s.wsAddr), ws) + if err != nil { + s.logger.Error("http error:", err) + } + }() +} + +func (s *websocketsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + var upgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, + } + + wsConn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + s.logger.Error("websocket upgrade failed; error:", err) + return + } + + s.readLoop(wsConn) +} + +func (s *websocketsServer) sendErrResponse(conn *websocket.Conn, msg string) { + res := &ErrorResponseJSON{ + Jsonrpc: "2.0", + Error: &ErrorMessageJSON{ + Code: big.NewInt(-32600), + Message: msg, + }, + ID: nil, + } + err := conn.WriteJSON(res) + if err != nil { + s.logger.Error("websocket failed write message", "error", err) + } +} + +func (s *websocketsServer) readLoop(wsConn *websocket.Conn) { + for { + _, mb, err := wsConn.ReadMessage() + if err != nil { + _ = wsConn.Close() + s.logger.Error("failed to read message; error", err) + return + } + + var msg map[string]interface{} + err = json.Unmarshal(mb, &msg) + if err != nil { + s.sendErrResponse(wsConn, "invalid request") + continue + } + + // check if method == eth_subscribe or eth_unsubscribe + method := msg["method"] + if method.(string) == "eth_subscribe" { + params := msg["params"].([]interface{}) + if len(params) == 0 { + s.sendErrResponse(wsConn, "invalid parameters") + continue + } + + id, err := s.api.subscribe(wsConn, params) + if err != nil { + s.sendErrResponse(wsConn, err.Error()) + continue + } + + res := &SubscriptionResponseJSON{ + Jsonrpc: "2.0", + ID: 1, + Result: id, + } + + err = wsConn.WriteJSON(res) + if err != nil { + s.logger.Error("failed to write json response", err) + continue + } + + continue + } else if method.(string) == "eth_unsubscribe" { + ids, ok := msg["params"].([]interface{}) + if _, idok := ids[0].(string); !ok || !idok { + s.sendErrResponse(wsConn, "invalid parameters") + continue + } + + ok = s.api.unsubscribe(rpc.ID(ids[0].(string))) + res := &SubscriptionResponseJSON{ + Jsonrpc: "2.0", + ID: 1, + Result: ok, + } + + err = wsConn.WriteJSON(res) + if err != nil { + s.logger.Error("failed to write json response", err) + continue + } + + continue + } + + // otherwise, call the usual rpc server to respond + err = s.tcpGetAndSendResponse(wsConn, mb) + if err != nil { + s.sendErrResponse(wsConn, err.Error()) + } + } +} + +// tcpGetAndSendResponse connects to the rest-server over tcp, posts a JSON-RPC request, and sends the response +// to the client over websockets +func (s *websocketsServer) tcpGetAndSendResponse(conn *websocket.Conn, mb []byte) error { + addr := strings.Split(s.rpcAddr, "tcp://") + if len(addr) != 2 { + return fmt.Errorf("invalid laddr %s", s.rpcAddr) + } + + tcpConn, err := net.Dial("tcp", addr[1]) + if err != nil { + return fmt.Errorf("cannot connect to %s; %s", s.rpcAddr, err) + } + + buf := &bytes.Buffer{} + _, err = buf.Write(mb) + if err != nil { + return fmt.Errorf("failed to write message; %s", err) + } + + req, err := http.NewRequest("POST", s.rpcAddr, buf) + if err != nil { + return fmt.Errorf("failed to request; %s", err) + } + + req.Header.Set("Content-Type", "application/json;") + err = req.Write(tcpConn) + if err != nil { + return fmt.Errorf("failed to write to rest-server; %s", err) + } + + respBytes, err := ioutil.ReadAll(tcpConn) + if err != nil { + return fmt.Errorf("error reading response from rest-server; %s", err) + } + + respbuf := &bytes.Buffer{} + respbuf.Write(respBytes) + resp, err := http.ReadResponse(bufio.NewReader(respbuf), req) + if err != nil { + return fmt.Errorf("could not read response; %s", err) + } + + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("could not read body from response; %s", err) + } + + var wsSend interface{} + err = json.Unmarshal(body, &wsSend) + if err != nil { + return fmt.Errorf("failed to unmarshal rest-server response; %s", err) + } + + return conn.WriteJSON(wsSend) +} + +type wsSubscription struct { + sub *Subscription + unsubscribed chan struct{} // closed when unsubscribing + conn *websocket.Conn +} + +// pubSubAPI is the eth_ prefixed set of APIs in the Web3 JSON-RPC spec +type pubSubAPI struct { + cliCtx context.CLIContext + events *EventSystem + filtersMu sync.Mutex + filters map[rpc.ID]*wsSubscription + logger log.Logger +} + +// newPubSubAPI creates an instance of the ethereum PubSub API. +func newPubSubAPI(cliCtx context.CLIContext) *pubSubAPI { + return &pubSubAPI{ + cliCtx: cliCtx, + events: NewEventSystem(cliCtx.Client), + filters: make(map[rpc.ID]*wsSubscription), + logger: log.NewTMLogger(log.NewSyncWriter(os.Stdout)).With("module", "websocket-client"), + } +} + +func (api *pubSubAPI) subscribe(conn *websocket.Conn, params []interface{}) (rpc.ID, error) { + method, ok := params[0].(string) + if !ok { + return "0", fmt.Errorf("invalid parameters") + } + + switch method { + case "newHeads": + // TODO: handle extra params + return api.subscribeNewHeads(conn) + case "logs": + if len(params) > 1 { + return api.subscribeLogs(conn, params[1]) + } + + return api.subscribeLogs(conn, nil) + case "newPendingTransactions": + return api.subscribePendingTransactions(conn) + case "syncing": + return api.subscribeSyncing(conn) + default: + return "0", fmt.Errorf("unsupported method %s", method) + } +} + +func (api *pubSubAPI) unsubscribe(id rpc.ID) bool { + api.filtersMu.Lock() + defer api.filtersMu.Unlock() + + if api.filters[id] == nil { + return false + } + + close(api.filters[id].unsubscribed) + delete(api.filters, id) + return true +} + +func (api *pubSubAPI) subscribeNewHeads(conn *websocket.Conn) (rpc.ID, error) { + sub, _, err := api.events.SubscribeNewHeads() + if err != nil { + return "", fmt.Errorf("error creating block filter: %s", err.Error()) + } + + unsubscribed := make(chan struct{}) + api.filtersMu.Lock() + api.filters[sub.ID()] = &wsSubscription{ + sub: sub, + conn: conn, + unsubscribed: unsubscribed, + } + api.filtersMu.Unlock() + + go func(headersCh <-chan coretypes.ResultEvent, errCh <-chan error) { + for { + select { + case event := <-headersCh: + data, _ := event.Data.(tmtypes.EventDataNewBlockHeader) + header := EthHeaderFromTendermint(data.Header) + + api.filtersMu.Lock() + if f, found := api.filters[sub.ID()]; found { + // write to ws conn + res := &SubscriptionNotification{ + Jsonrpc: "2.0", + Method: "eth_subscription", + Params: &SubscriptionResult{ + Subscription: sub.ID(), + Result: header, + }, + } + + err = f.conn.WriteJSON(res) + if err != nil { + api.logger.Error("error writing header") + } + } + api.filtersMu.Unlock() + case <-errCh: + api.filtersMu.Lock() + delete(api.filters, sub.ID()) + api.filtersMu.Unlock() + return + case <-unsubscribed: + return + } + } + }(sub.eventCh, sub.Err()) + + return sub.ID(), nil +} + +func (api *pubSubAPI) subscribeLogs(conn *websocket.Conn, extra interface{}) (rpc.ID, error) { + crit := filters.FilterCriteria{} + + if extra != nil { + params, ok := extra.(map[string]interface{}) + if !ok { + return "", fmt.Errorf("invalid criteria") + } + + if params["address"] != nil { + address, ok := params["address"].(string) + addresses, sok := params["address"].([]interface{}) + if !ok && !sok { + return "", fmt.Errorf("invalid address; must be address or array of addresses") + } + + if ok { + crit.Addresses = []common.Address{common.HexToAddress(address)} + } + + if sok { + crit.Addresses = []common.Address{} + for _, addr := range addresses { + address, ok := addr.(string) + if !ok { + return "", fmt.Errorf("invalid address") + } + + crit.Addresses = append(crit.Addresses, common.HexToAddress(address)) + } + } + } + + if params["topics"] != nil { + topics, ok := params["topics"].([]interface{}) + if !ok { + return "", fmt.Errorf("invalid topics") + } + + crit.Topics = [][]common.Hash{} + for _, topic := range topics { + tstr, ok := topic.(string) + if !ok { + return "", fmt.Errorf("invalid topics") + } + + h := common.HexToHash(tstr) + crit.Topics = append(crit.Topics, []common.Hash{h}) + } + } + } + + sub, _, err := api.events.SubscribeLogs(crit) + if err != nil { + return rpc.ID(""), err + } + + unsubscribed := make(chan struct{}) + api.filtersMu.Lock() + api.filters[sub.ID()] = &wsSubscription{ + sub: sub, + conn: conn, + unsubscribed: unsubscribed, + } + api.filtersMu.Unlock() + + go func(ch <-chan coretypes.ResultEvent, errCh <-chan error) { + for { + select { + case event := <-ch: + dataTx, ok := event.Data.(tmtypes.EventDataTx) + if !ok { + err = fmt.Errorf("invalid event data %T, expected EventDataTx", event.Data) + return + } + + var resultData evmtypes.ResultData + resultData, err = evmtypes.DecodeResultData(dataTx.TxResult.Result.Data) + if err != nil { + return + } + + logs := filterLogs(resultData.Logs, crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics) + + api.filtersMu.Lock() + if f, found := api.filters[sub.ID()]; found { + // write to ws conn + res := &SubscriptionNotification{ + Jsonrpc: "2.0", + Method: "eth_subscription", + Params: &SubscriptionResult{ + Subscription: sub.ID(), + Result: logs, + }, + } + + err = f.conn.WriteJSON(res) + } + api.filtersMu.Unlock() + + if err != nil { + err = fmt.Errorf("failed to write header: %w", err) + return + } + case <-errCh: + api.filtersMu.Lock() + delete(api.filters, sub.ID()) + api.filtersMu.Unlock() + return + case <-unsubscribed: + return + } + } + }(sub.eventCh, sub.Err()) + + return sub.ID(), nil +} + +func (api *pubSubAPI) subscribePendingTransactions(conn *websocket.Conn) (rpc.ID, error) { + sub, _, err := api.events.SubscribePendingTxs() + if err != nil { + return "", fmt.Errorf("error creating block filter: %s", err.Error()) + } + + unsubscribed := make(chan struct{}) + api.filtersMu.Lock() + api.filters[sub.ID()] = &wsSubscription{ + sub: sub, + conn: conn, + unsubscribed: unsubscribed, + } + api.filtersMu.Unlock() + + go func(txsCh <-chan coretypes.ResultEvent, errCh <-chan error) { + for { + select { + case ev := <-txsCh: + data, _ := ev.Data.(tmtypes.EventDataTx) + txHash := common.BytesToHash(data.Tx.Hash()) + + api.filtersMu.Lock() + if f, found := api.filters[sub.ID()]; found { + // write to ws conn + res := &SubscriptionNotification{ + Jsonrpc: "2.0", + Method: "eth_subscription", + Params: &SubscriptionResult{ + Subscription: sub.ID(), + Result: txHash, + }, + } + + err = f.conn.WriteJSON(res) + } + api.filtersMu.Unlock() + + if err != nil { + err = fmt.Errorf("failed to write header: %w", err) + return + } + case <-errCh: + api.filtersMu.Lock() + delete(api.filters, sub.ID()) + api.filtersMu.Unlock() + } + } + }(sub.eventCh, sub.Err()) + + return sub.ID(), nil +} + +func (api *pubSubAPI) subscribeSyncing(conn *websocket.Conn) (rpc.ID, error) { + return "", nil +}