diff --git a/rpc/websockets.go b/rpc/websockets.go index 555a68c3..fb729fed 100644 --- a/rpc/websockets.go +++ b/rpc/websockets.go @@ -186,6 +186,13 @@ func (s *websocketsServer) readLoop(wsConn *wsConn) { return } + if isBatch(mb) { + if err := s.tcpGetAndSendResponse(wsConn, mb); err != nil { + s.sendErrResponse(wsConn, err.Error()) + } + continue + } + var msg map[string]interface{} if err = json.Unmarshal(mb, &msg); err != nil { s.sendErrResponse(wsConn, err.Error()) @@ -214,14 +221,8 @@ func (s *websocketsServer) readLoop(wsConn *wsConn) { switch method { case "eth_subscribe": - params, ok := msg["params"].([]interface{}) + params, ok := s.getParamsAndCheckValid(msg, wsConn) if !ok { - s.sendErrResponse(wsConn, "invalid parameters") - continue - } - - if len(params) == 0 { - s.sendErrResponse(wsConn, "empty parameters") continue } @@ -243,14 +244,8 @@ func (s *websocketsServer) readLoop(wsConn *wsConn) { break } case "eth_unsubscribe": - params, ok := msg["params"].([]interface{}) + params, ok := s.getParamsAndCheckValid(msg, wsConn) if !ok { - s.sendErrResponse(wsConn, "invalid parameters") - continue - } - - if len(params) == 0 { - s.sendErrResponse(wsConn, "empty parameters") continue } @@ -285,6 +280,22 @@ func (s *websocketsServer) readLoop(wsConn *wsConn) { } } +// tcpGetAndSendResponse sends error response to client if params is invalid +func (s *websocketsServer) getParamsAndCheckValid(msg map[string]interface{}, wsConn *wsConn) ([]interface{}, bool) { + params, ok := msg["params"].([]interface{}) + if !ok { + s.sendErrResponse(wsConn, "invalid parameters") + return nil, false + } + + if len(params) == 0 { + s.sendErrResponse(wsConn, "empty parameters") + return nil, false + } + + return params, true +} + // tcpGetAndSendResponse connects to the rest-server over tcp, posts a JSON-RPC request, and sends the response // to the client over websockets func (s *websocketsServer) tcpGetAndSendResponse(wsConn *wsConn, mb []byte) error { @@ -658,3 +669,16 @@ func (api *pubSubAPI) subscribePendingTransactions(wsConn *wsConn, subID rpc.ID) func (api *pubSubAPI) subscribeSyncing(wsConn *wsConn, subID rpc.ID) (pubsub.UnsubscribeFunc, error) { return nil, errors.New("syncing subscription is not implemented") } + +// copy from github.com/ethereum/go-ethereum/rpc/json.go +// isBatch returns true when the first non-whitespace characters is '[' +func isBatch(raw []byte) bool { + for _, c := range raw { + // skip insignificant whitespace (http://www.ietf.org/rfc/rfc4627.txt) + if c == 0x20 || c == 0x09 || c == 0x0a || c == 0x0d { + continue + } + return c == '[' + } + return false +} diff --git a/scripts/integration-test-all.sh b/scripts/integration-test-all.sh index 1522f219..31450eb2 100755 --- a/scripts/integration-test-all.sh +++ b/scripts/integration-test-all.sh @@ -14,6 +14,8 @@ REMOVE_DATA_DIR=false #PORT AND RPC_PORT 3 initial digits, to be concat with a suffix later when node is initialized RPC_PORT="854" +# Ethereum JSONRPC Websocket +WS_PORT="855" IP_ADDR="0.0.0.0" KEY="mykey" @@ -106,7 +108,7 @@ start_func() { echo "starting ethermint node $i in background ..." "$PWD"/build/ethermintd start --pruning=nothing --rpc.unsafe \ --p2p.laddr tcp://$IP_ADDR:$NODE_P2P_PORT"$i" --address tcp://$IP_ADDR:$NODE_PORT"$i" --rpc.laddr tcp://$IP_ADDR:$NODE_RPC_PORT"$i" \ - --json-rpc.address=$IP_ADDR:$RPC_PORT"$i" \ + --json-rpc.address=$IP_ADDR:$RPC_PORT"$i" --json-rpc.ws-address=$IP_ADDR:$WS_PORT"$i" \ --json-rpc.api="eth,txpool,personal,net,debug,web3" \ --keyring-backend test --home "$DATA_DIR$i" \ >"$DATA_DIR"/node"$i".log 2>&1 & disown @@ -158,12 +160,12 @@ if [[ -z $TEST || $TEST == "rpc" || $TEST == "pending" ]]; then for i in $(seq 1 "$TEST_QTD"); do HOST_RPC=http://$IP_ADDR:$RPC_PORT"$i" - echo "going to test ethermint node $HOST_RPC ..." - MODE=$MODE HOST=$HOST_RPC go test ./tests/rpc/... -timeout=$time_out -v -short + HOST_WS=$IP_ADDR:$WS_PORT"$i" + echo "going to test ethermint node rpc=$HOST_RPC ws=$HOST_WS ..." + MODE=$MODE HOST=$HOST_RPC HOST_WS=$HOST_WS go test ./tests/rpc/... -timeout=$time_out -v -short TEST_FAIL=$? done - fi stop_func() { diff --git a/tests/rpc/ws_test.go b/tests/rpc/ws_test.go new file mode 100644 index 00000000..a13668e9 --- /dev/null +++ b/tests/rpc/ws_test.go @@ -0,0 +1,174 @@ +package rpc + +import ( + "encoding/json" + "fmt" + "github.com/gorilla/websocket" + "github.com/stretchr/testify/require" + "net/url" + "os" + "strings" + "testing" + "time" +) + +var ( + HOST_WS = os.Getenv("HOST_WS") + wsUrl string +) + +func init() { + if HOST_WS == "" { + HOST_WS = "localhost:8542" + } + + u := url.URL{Scheme: "ws", Host: HOST_WS, Path: ""} + wsUrl = u.String() +} + +func TestWsSingleRequest(t *testing.T) { + t.Log("test simple rpc request net_version with Websocket") + + wc, _, err := websocket.DefaultDialer.Dial(wsUrl, nil) + require.NoError(t, err) + defer wc.Close() + + wsWriteMessage(t, wc, `{"jsonrpc":"2.0","method":"net_version","params":[],"id":1}`) + + time.Sleep(1 * time.Second) + mb := readMessage(t, wc) + msg := jsonUnmarshal(t, mb) + result, ok := msg["result"].(string) + require.True(t, ok) + require.Equal(t, "9000", result) +} + +func TestWsBatchRequest(t *testing.T) { + t.Log("test batch request with Websocket") + + wc, _, err := websocket.DefaultDialer.Dial(wsUrl, nil) + require.NoError(t, err) + defer wc.Close() + + wsWriteMessage(t, wc, `[{"jsonrpc":"2.0","method":"net_version","params":[],"id":1},{"jsonrpc":"2.0","method":"eth_protocolVersion","params":[],"id":2}]`) + + time.Sleep(1 * time.Second) + mb := readMessage(t, wc) + + var msg []map[string]interface{} + err = json.Unmarshal(mb, &msg) + require.NoError(t, err) + require.Equal(t, 2, len(msg)) + + // net_version + resNetVersion := msg[0] + result, ok := resNetVersion["result"].(string) + require.True(t, ok) + require.Equal(t, "9000", result) + id, ok := resNetVersion["id"].(float64) + require.True(t, ok) + require.Equal(t, 1, int(id)) + + // eth_protocolVersion + resEthProtocolVersion := msg[1] + result, ok = resEthProtocolVersion["result"].(string) + require.True(t, ok) + require.Equal(t, "0x41", result) + id, ok = resEthProtocolVersion["id"].(float64) + require.True(t, ok) + require.Equal(t, 2, int(id)) +} + +func TestWsEth_subscribe_newHeads(t *testing.T) { + t.Log("test eth_subscribe newHeads with Websocket") + + wc, _, err := websocket.DefaultDialer.Dial(wsUrl, nil) + require.NoError(t, err) + defer wc.Close() + + wsWriteMessage(t, wc, `{"id":1,"method":"eth_subscribe","params":["newHeads",{}]}`) + + time.Sleep(1 * time.Second) + mb := readMessage(t, wc) + msg := jsonUnmarshal(t, mb) + subscribeId, ok := msg["result"].(string) + require.True(t, ok) + require.True(t, strings.HasPrefix(subscribeId, "0x")) + + time.Sleep(3 * time.Second) + mb = readMessage(t, wc) + msg = jsonUnmarshal(t, mb) + method, ok := msg["method"].(string) + require.Equal(t, "eth_subscription", method) + + // id should not exist with eth_subscription event + _, ok = msg["id"].(float64) + require.False(t, ok) + + wsUnsubscribe(t, wc, subscribeId) // eth_unsubscribe +} + +func TestWsEth_subscribe_log(t *testing.T) { + t.Log("test eth_subscribe log with websocket") + + wc, _, err := websocket.DefaultDialer.Dial(wsUrl, nil) + require.NoError(t, err) + defer wc.Close() + + wsWriteMessage(t, wc, fmt.Sprintf(`{"jsonrpc":"2.0","id":1,"method":"eth_subscribe","params":["logs",{"topics":["%s", "%s"]}]}`, helloTopic, worldTopic)) + + time.Sleep(1 * time.Second) + mb := readMessage(t, wc) + msg := jsonUnmarshal(t, mb) + subscribeId, ok := msg["result"].(string) + require.True(t, ok) + require.True(t, strings.HasPrefix(subscribeId, "0x")) + + // do something here to receive subscription messages + deployTestContractWithFunction(t) + + time.Sleep(3 * time.Second) + mb = readMessage(t, wc) + msg = jsonUnmarshal(t, mb) + method, ok := msg["method"].(string) + require.Equal(t, "eth_subscription", method) + + // id should not exist with eth_subscription event + _, ok = msg["id"].(float64) + require.False(t, ok) + + wsUnsubscribe(t, wc, subscribeId) // eth_unsubscribe +} + +func wsWriteMessage(t *testing.T, wc *websocket.Conn, jsonStr string) { + t.Logf("send: %s", jsonStr) + err := wc.WriteMessage(websocket.TextMessage, []byte(jsonStr)) + require.NoError(t, err) +} + +func wsUnsubscribe(t *testing.T, wc *websocket.Conn, subscribeId string) { + t.Logf("eth_unsubscribe %s", subscribeId) + wsWriteMessage(t, wc, fmt.Sprintf(`{"id":1,"method":"eth_unsubscribe","params":["%s"]}`, subscribeId)) + + time.Sleep(1 * time.Second) + mb := readMessage(t, wc) + msg := jsonUnmarshal(t, mb) + + result, ok := msg["result"].(bool) + require.True(t, ok) + require.True(t, result) +} + +func readMessage(t *testing.T, wc *websocket.Conn) []byte { + _, mb, err := wc.ReadMessage() + require.NoError(t, err) + t.Logf("recv: %s", mb) + return mb +} + +func jsonUnmarshal(t *testing.T, mb []byte) map[string]interface{} { + var msg map[string]interface{} + err := json.Unmarshal(mb, &msg) + require.NoError(t, err) + return msg +}