test(rpc): Ethereum JSONRPC Websocket (#1342)

* WIP! add Ethereum JSONRPC Websocket test

* move to test-rpc

* clean up

* fix batch request with websocket

* refactor
This commit is contained in:
Tuan Pham Anh 2022-09-19 17:50:55 +07:00 committed by GitHub
parent 3ed55d9395
commit 26c341d04d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 218 additions and 18 deletions

View File

@ -186,6 +186,13 @@ func (s *websocketsServer) readLoop(wsConn *wsConn) {
return return
} }
if isBatch(mb) {
if err := s.tcpGetAndSendResponse(wsConn, mb); err != nil {
s.sendErrResponse(wsConn, err.Error())
}
continue
}
var msg map[string]interface{} var msg map[string]interface{}
if err = json.Unmarshal(mb, &msg); err != nil { if err = json.Unmarshal(mb, &msg); err != nil {
s.sendErrResponse(wsConn, err.Error()) s.sendErrResponse(wsConn, err.Error())
@ -214,14 +221,8 @@ func (s *websocketsServer) readLoop(wsConn *wsConn) {
switch method { switch method {
case "eth_subscribe": case "eth_subscribe":
params, ok := msg["params"].([]interface{}) params, ok := s.getParamsAndCheckValid(msg, wsConn)
if !ok { if !ok {
s.sendErrResponse(wsConn, "invalid parameters")
continue
}
if len(params) == 0 {
s.sendErrResponse(wsConn, "empty parameters")
continue continue
} }
@ -243,14 +244,8 @@ func (s *websocketsServer) readLoop(wsConn *wsConn) {
break break
} }
case "eth_unsubscribe": case "eth_unsubscribe":
params, ok := msg["params"].([]interface{}) params, ok := s.getParamsAndCheckValid(msg, wsConn)
if !ok { if !ok {
s.sendErrResponse(wsConn, "invalid parameters")
continue
}
if len(params) == 0 {
s.sendErrResponse(wsConn, "empty parameters")
continue continue
} }
@ -285,6 +280,22 @@ func (s *websocketsServer) readLoop(wsConn *wsConn) {
} }
} }
// tcpGetAndSendResponse sends error response to client if params is invalid
func (s *websocketsServer) getParamsAndCheckValid(msg map[string]interface{}, wsConn *wsConn) ([]interface{}, bool) {
params, ok := msg["params"].([]interface{})
if !ok {
s.sendErrResponse(wsConn, "invalid parameters")
return nil, false
}
if len(params) == 0 {
s.sendErrResponse(wsConn, "empty parameters")
return nil, false
}
return params, true
}
// tcpGetAndSendResponse connects to the rest-server over tcp, posts a JSON-RPC request, and sends the response // tcpGetAndSendResponse connects to the rest-server over tcp, posts a JSON-RPC request, and sends the response
// to the client over websockets // to the client over websockets
func (s *websocketsServer) tcpGetAndSendResponse(wsConn *wsConn, mb []byte) error { func (s *websocketsServer) tcpGetAndSendResponse(wsConn *wsConn, mb []byte) error {
@ -658,3 +669,16 @@ func (api *pubSubAPI) subscribePendingTransactions(wsConn *wsConn, subID rpc.ID)
func (api *pubSubAPI) subscribeSyncing(wsConn *wsConn, subID rpc.ID) (pubsub.UnsubscribeFunc, error) { func (api *pubSubAPI) subscribeSyncing(wsConn *wsConn, subID rpc.ID) (pubsub.UnsubscribeFunc, error) {
return nil, errors.New("syncing subscription is not implemented") return nil, errors.New("syncing subscription is not implemented")
} }
// copy from github.com/ethereum/go-ethereum/rpc/json.go
// isBatch returns true when the first non-whitespace characters is '['
func isBatch(raw []byte) bool {
for _, c := range raw {
// skip insignificant whitespace (http://www.ietf.org/rfc/rfc4627.txt)
if c == 0x20 || c == 0x09 || c == 0x0a || c == 0x0d {
continue
}
return c == '['
}
return false
}

View File

@ -14,6 +14,8 @@ REMOVE_DATA_DIR=false
#PORT AND RPC_PORT 3 initial digits, to be concat with a suffix later when node is initialized #PORT AND RPC_PORT 3 initial digits, to be concat with a suffix later when node is initialized
RPC_PORT="854" RPC_PORT="854"
# Ethereum JSONRPC Websocket
WS_PORT="855"
IP_ADDR="0.0.0.0" IP_ADDR="0.0.0.0"
KEY="mykey" KEY="mykey"
@ -106,7 +108,7 @@ start_func() {
echo "starting ethermint node $i in background ..." echo "starting ethermint node $i in background ..."
"$PWD"/build/ethermintd start --pruning=nothing --rpc.unsafe \ "$PWD"/build/ethermintd start --pruning=nothing --rpc.unsafe \
--p2p.laddr tcp://$IP_ADDR:$NODE_P2P_PORT"$i" --address tcp://$IP_ADDR:$NODE_PORT"$i" --rpc.laddr tcp://$IP_ADDR:$NODE_RPC_PORT"$i" \ --p2p.laddr tcp://$IP_ADDR:$NODE_P2P_PORT"$i" --address tcp://$IP_ADDR:$NODE_PORT"$i" --rpc.laddr tcp://$IP_ADDR:$NODE_RPC_PORT"$i" \
--json-rpc.address=$IP_ADDR:$RPC_PORT"$i" \ --json-rpc.address=$IP_ADDR:$RPC_PORT"$i" --json-rpc.ws-address=$IP_ADDR:$WS_PORT"$i" \
--json-rpc.api="eth,txpool,personal,net,debug,web3" \ --json-rpc.api="eth,txpool,personal,net,debug,web3" \
--keyring-backend test --home "$DATA_DIR$i" \ --keyring-backend test --home "$DATA_DIR$i" \
>"$DATA_DIR"/node"$i".log 2>&1 & disown >"$DATA_DIR"/node"$i".log 2>&1 & disown
@ -158,12 +160,12 @@ if [[ -z $TEST || $TEST == "rpc" || $TEST == "pending" ]]; then
for i in $(seq 1 "$TEST_QTD"); do for i in $(seq 1 "$TEST_QTD"); do
HOST_RPC=http://$IP_ADDR:$RPC_PORT"$i" HOST_RPC=http://$IP_ADDR:$RPC_PORT"$i"
echo "going to test ethermint node $HOST_RPC ..." HOST_WS=$IP_ADDR:$WS_PORT"$i"
MODE=$MODE HOST=$HOST_RPC go test ./tests/rpc/... -timeout=$time_out -v -short echo "going to test ethermint node rpc=$HOST_RPC ws=$HOST_WS ..."
MODE=$MODE HOST=$HOST_RPC HOST_WS=$HOST_WS go test ./tests/rpc/... -timeout=$time_out -v -short
TEST_FAIL=$? TEST_FAIL=$?
done done
fi fi
stop_func() { stop_func() {

174
tests/rpc/ws_test.go Normal file
View File

@ -0,0 +1,174 @@
package rpc
import (
"encoding/json"
"fmt"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/require"
"net/url"
"os"
"strings"
"testing"
"time"
)
var (
HOST_WS = os.Getenv("HOST_WS")
wsUrl string
)
func init() {
if HOST_WS == "" {
HOST_WS = "localhost:8542"
}
u := url.URL{Scheme: "ws", Host: HOST_WS, Path: ""}
wsUrl = u.String()
}
func TestWsSingleRequest(t *testing.T) {
t.Log("test simple rpc request net_version with Websocket")
wc, _, err := websocket.DefaultDialer.Dial(wsUrl, nil)
require.NoError(t, err)
defer wc.Close()
wsWriteMessage(t, wc, `{"jsonrpc":"2.0","method":"net_version","params":[],"id":1}`)
time.Sleep(1 * time.Second)
mb := readMessage(t, wc)
msg := jsonUnmarshal(t, mb)
result, ok := msg["result"].(string)
require.True(t, ok)
require.Equal(t, "9000", result)
}
func TestWsBatchRequest(t *testing.T) {
t.Log("test batch request with Websocket")
wc, _, err := websocket.DefaultDialer.Dial(wsUrl, nil)
require.NoError(t, err)
defer wc.Close()
wsWriteMessage(t, wc, `[{"jsonrpc":"2.0","method":"net_version","params":[],"id":1},{"jsonrpc":"2.0","method":"eth_protocolVersion","params":[],"id":2}]`)
time.Sleep(1 * time.Second)
mb := readMessage(t, wc)
var msg []map[string]interface{}
err = json.Unmarshal(mb, &msg)
require.NoError(t, err)
require.Equal(t, 2, len(msg))
// net_version
resNetVersion := msg[0]
result, ok := resNetVersion["result"].(string)
require.True(t, ok)
require.Equal(t, "9000", result)
id, ok := resNetVersion["id"].(float64)
require.True(t, ok)
require.Equal(t, 1, int(id))
// eth_protocolVersion
resEthProtocolVersion := msg[1]
result, ok = resEthProtocolVersion["result"].(string)
require.True(t, ok)
require.Equal(t, "0x41", result)
id, ok = resEthProtocolVersion["id"].(float64)
require.True(t, ok)
require.Equal(t, 2, int(id))
}
func TestWsEth_subscribe_newHeads(t *testing.T) {
t.Log("test eth_subscribe newHeads with Websocket")
wc, _, err := websocket.DefaultDialer.Dial(wsUrl, nil)
require.NoError(t, err)
defer wc.Close()
wsWriteMessage(t, wc, `{"id":1,"method":"eth_subscribe","params":["newHeads",{}]}`)
time.Sleep(1 * time.Second)
mb := readMessage(t, wc)
msg := jsonUnmarshal(t, mb)
subscribeId, ok := msg["result"].(string)
require.True(t, ok)
require.True(t, strings.HasPrefix(subscribeId, "0x"))
time.Sleep(3 * time.Second)
mb = readMessage(t, wc)
msg = jsonUnmarshal(t, mb)
method, ok := msg["method"].(string)
require.Equal(t, "eth_subscription", method)
// id should not exist with eth_subscription event
_, ok = msg["id"].(float64)
require.False(t, ok)
wsUnsubscribe(t, wc, subscribeId) // eth_unsubscribe
}
func TestWsEth_subscribe_log(t *testing.T) {
t.Log("test eth_subscribe log with websocket")
wc, _, err := websocket.DefaultDialer.Dial(wsUrl, nil)
require.NoError(t, err)
defer wc.Close()
wsWriteMessage(t, wc, fmt.Sprintf(`{"jsonrpc":"2.0","id":1,"method":"eth_subscribe","params":["logs",{"topics":["%s", "%s"]}]}`, helloTopic, worldTopic))
time.Sleep(1 * time.Second)
mb := readMessage(t, wc)
msg := jsonUnmarshal(t, mb)
subscribeId, ok := msg["result"].(string)
require.True(t, ok)
require.True(t, strings.HasPrefix(subscribeId, "0x"))
// do something here to receive subscription messages
deployTestContractWithFunction(t)
time.Sleep(3 * time.Second)
mb = readMessage(t, wc)
msg = jsonUnmarshal(t, mb)
method, ok := msg["method"].(string)
require.Equal(t, "eth_subscription", method)
// id should not exist with eth_subscription event
_, ok = msg["id"].(float64)
require.False(t, ok)
wsUnsubscribe(t, wc, subscribeId) // eth_unsubscribe
}
func wsWriteMessage(t *testing.T, wc *websocket.Conn, jsonStr string) {
t.Logf("send: %s", jsonStr)
err := wc.WriteMessage(websocket.TextMessage, []byte(jsonStr))
require.NoError(t, err)
}
func wsUnsubscribe(t *testing.T, wc *websocket.Conn, subscribeId string) {
t.Logf("eth_unsubscribe %s", subscribeId)
wsWriteMessage(t, wc, fmt.Sprintf(`{"id":1,"method":"eth_unsubscribe","params":["%s"]}`, subscribeId))
time.Sleep(1 * time.Second)
mb := readMessage(t, wc)
msg := jsonUnmarshal(t, mb)
result, ok := msg["result"].(bool)
require.True(t, ok)
require.True(t, result)
}
func readMessage(t *testing.T, wc *websocket.Conn) []byte {
_, mb, err := wc.ReadMessage()
require.NoError(t, err)
t.Logf("recv: %s", mb)
return mb
}
func jsonUnmarshal(t *testing.T, mb []byte) map[string]interface{} {
var msg map[string]interface{}
err := json.Unmarshal(mb, &msg)
require.NoError(t, err)
return msg
}