rpc: add new client, use it everywhere

The new client implementation supports concurrent requests,
subscriptions and replaces the various ad hoc RPC clients
throughout go-ethereum.
This commit is contained in:
Felix Lange 2016-07-12 17:47:15 +02:00
parent bb01bea4e2
commit 91b7690428
30 changed files with 2007 additions and 756 deletions

View File

@ -17,11 +17,7 @@
package backends package backends
import ( import (
"encoding/json"
"fmt"
"math/big" "math/big"
"sync"
"sync/atomic"
"github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -37,119 +33,34 @@ var _ bind.ContractBackend = (*rpcBackend)(nil)
// rpcBackend implements bind.ContractBackend, and acts as the data provider to // rpcBackend implements bind.ContractBackend, and acts as the data provider to
// Ethereum contracts bound to Go structs. It uses an RPC connection to delegate // Ethereum contracts bound to Go structs. It uses an RPC connection to delegate
// all its functionality. // all its functionality.
//
// Note: The current implementation is a blocking one. This should be replaced
// by a proper async version when a real RPC client is created.
type rpcBackend struct { type rpcBackend struct {
client rpc.Client // RPC client connection to interact with an API server client *rpc.Client // RPC client connection to interact with an API server
autoid uint32 // ID number to use for the next API request
lock sync.Mutex // Singleton access until we get to request multiplexing
} }
// NewRPCBackend creates a new binding backend to an RPC provider that can be // NewRPCBackend creates a new binding backend to an RPC provider that can be
// used to interact with remote contracts. // used to interact with remote contracts.
func NewRPCBackend(client rpc.Client) bind.ContractBackend { func NewRPCBackend(client *rpc.Client) bind.ContractBackend {
return &rpcBackend{ return &rpcBackend{client: client}
client: client,
}
}
// request is a JSON RPC request package assembled internally from the client
// method calls.
type request struct {
JSONRPC string `json:"jsonrpc"` // Version of the JSON RPC protocol, always set to 2.0
ID int `json:"id"` // Auto incrementing ID number for this request
Method string `json:"method"` // Remote procedure name to invoke on the server
Params []interface{} `json:"params"` // List of parameters to pass through (keep types simple)
}
// response is a JSON RPC response package sent back from the API server.
type response struct {
JSONRPC string `json:"jsonrpc"` // Version of the JSON RPC protocol, always set to 2.0
ID int `json:"id"` // Auto incrementing ID number for this request
Error *failure `json:"error"` // Any error returned by the remote side
Result json.RawMessage `json:"result"` // Whatever the remote side sends us in reply
}
// failure is a JSON RPC response error field sent back from the API server.
type failure struct {
Code int `json:"code"` // JSON RPC error code associated with the failure
Message string `json:"message"` // Specific error message of the failure
}
// request forwards an API request to the RPC server, and parses the response.
//
// This is currently painfully non-concurrent, but it will have to do until we
// find the time for niceties like this :P
func (b *rpcBackend) request(ctx context.Context, method string, params []interface{}) (json.RawMessage, error) {
b.lock.Lock()
defer b.lock.Unlock()
if ctx == nil {
ctx = context.Background()
}
// Ugly hack to serialize an empty list properly
if params == nil {
params = []interface{}{}
}
// Assemble the request object
reqID := int(atomic.AddUint32(&b.autoid, 1))
req := &request{
JSONRPC: "2.0",
ID: reqID,
Method: method,
Params: params,
}
if err := b.client.Send(req); err != nil {
return nil, err
}
res := new(response)
errc := make(chan error, 1)
go func() {
errc <- b.client.Recv(res)
}()
select {
case err := <-errc:
if err != nil {
return nil, err
}
case <-ctx.Done():
return nil, ctx.Err()
}
if res.Error != nil {
if res.Error.Message == bind.ErrNoCode.Error() {
return nil, bind.ErrNoCode
}
return nil, fmt.Errorf("remote error: %s", res.Error.Message)
}
return res.Result, nil
} }
// HasCode implements ContractVerifier.HasCode by retrieving any code associated // HasCode implements ContractVerifier.HasCode by retrieving any code associated
// with the contract from the remote node, and checking its size. // with the contract from the remote node, and checking its size.
func (b *rpcBackend) HasCode(ctx context.Context, contract common.Address, pending bool) (bool, error) { func (b *rpcBackend) HasCode(ctx context.Context, contract common.Address, pending bool) (bool, error) {
// Execute the RPC code retrieval
block := "latest" block := "latest"
if pending { if pending {
block = "pending" block = "pending"
} }
res, err := b.request(ctx, "eth_getCode", []interface{}{contract.Hex(), block}) var hex string
err := b.client.CallContext(ctx, &hex, "eth_getCode", contract, block)
if err != nil { if err != nil {
return false, err return false, err
} }
var hex string
if err := json.Unmarshal(res, &hex); err != nil {
return false, err
}
// Convert the response back to a Go byte slice and return
return len(common.FromHex(hex)) > 0, nil return len(common.FromHex(hex)) > 0, nil
} }
// ContractCall implements ContractCaller.ContractCall, delegating the execution of // ContractCall implements ContractCaller.ContractCall, delegating the execution of
// a contract call to the remote node, returning the reply to for local processing. // a contract call to the remote node, returning the reply to for local processing.
func (b *rpcBackend) ContractCall(ctx context.Context, contract common.Address, data []byte, pending bool) ([]byte, error) { func (b *rpcBackend) ContractCall(ctx context.Context, contract common.Address, data []byte, pending bool) ([]byte, error) {
// Pack up the request into an RPC argument
args := struct { args := struct {
To common.Address `json:"to"` To common.Address `json:"to"`
Data string `json:"data"` Data string `json:"data"`
@ -157,63 +68,43 @@ func (b *rpcBackend) ContractCall(ctx context.Context, contract common.Address,
To: contract, To: contract,
Data: common.ToHex(data), Data: common.ToHex(data),
} }
// Execute the RPC call and retrieve the response
block := "latest" block := "latest"
if pending { if pending {
block = "pending" block = "pending"
} }
res, err := b.request(ctx, "eth_call", []interface{}{args, block}) var hex string
err := b.client.CallContext(ctx, &hex, "eth_call", args, block)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var hex string
if err := json.Unmarshal(res, &hex); err != nil {
return nil, err
}
// Convert the response back to a Go byte slice and return
return common.FromHex(hex), nil return common.FromHex(hex), nil
} }
// PendingAccountNonce implements ContractTransactor.PendingAccountNonce, delegating // PendingAccountNonce implements ContractTransactor.PendingAccountNonce, delegating
// the current account nonce retrieval to the remote node. // the current account nonce retrieval to the remote node.
func (b *rpcBackend) PendingAccountNonce(ctx context.Context, account common.Address) (uint64, error) { func (b *rpcBackend) PendingAccountNonce(ctx context.Context, account common.Address) (uint64, error) {
res, err := b.request(ctx, "eth_getTransactionCount", []interface{}{account.Hex(), "pending"}) var hex rpc.HexNumber
err := b.client.CallContext(ctx, &hex, "eth_getTransactionCount", account.Hex(), "pending")
if err != nil { if err != nil {
return 0, err return 0, err
} }
var hex string return hex.Uint64(), nil
if err := json.Unmarshal(res, &hex); err != nil {
return 0, err
}
nonce, ok := new(big.Int).SetString(hex, 0)
if !ok {
return 0, fmt.Errorf("invalid nonce hex: %s", hex)
}
return nonce.Uint64(), nil
} }
// SuggestGasPrice implements ContractTransactor.SuggestGasPrice, delegating the // SuggestGasPrice implements ContractTransactor.SuggestGasPrice, delegating the
// gas price oracle request to the remote node. // gas price oracle request to the remote node.
func (b *rpcBackend) SuggestGasPrice(ctx context.Context) (*big.Int, error) { func (b *rpcBackend) SuggestGasPrice(ctx context.Context) (*big.Int, error) {
res, err := b.request(ctx, "eth_gasPrice", nil) var hex rpc.HexNumber
if err != nil { if err := b.client.CallContext(ctx, &hex, "eth_gasPrice"); err != nil {
return nil, err return nil, err
} }
var hex string return (*big.Int)(&hex), nil
if err := json.Unmarshal(res, &hex); err != nil {
return nil, err
}
price, ok := new(big.Int).SetString(hex, 0)
if !ok {
return nil, fmt.Errorf("invalid price hex: %s", hex)
}
return price, nil
} }
// EstimateGasLimit implements ContractTransactor.EstimateGasLimit, delegating // EstimateGasLimit implements ContractTransactor.EstimateGasLimit, delegating
// the gas estimation to the remote node. // the gas estimation to the remote node.
func (b *rpcBackend) EstimateGasLimit(ctx context.Context, sender common.Address, contract *common.Address, value *big.Int, data []byte) (*big.Int, error) { func (b *rpcBackend) EstimateGasLimit(ctx context.Context, sender common.Address, contract *common.Address, value *big.Int, data []byte) (*big.Int, error) {
// Pack up the request into an RPC argument
args := struct { args := struct {
From common.Address `json:"from"` From common.Address `json:"from"`
To *common.Address `json:"to"` To *common.Address `json:"to"`
@ -226,19 +117,12 @@ func (b *rpcBackend) EstimateGasLimit(ctx context.Context, sender common.Address
Value: rpc.NewHexNumber(value), Value: rpc.NewHexNumber(value),
} }
// Execute the RPC call and retrieve the response // Execute the RPC call and retrieve the response
res, err := b.request(ctx, "eth_estimateGas", []interface{}{args}) var hex rpc.HexNumber
err := b.client.CallContext(ctx, &hex, "eth_estimateGas", args)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var hex string return (*big.Int)(&hex), nil
if err := json.Unmarshal(res, &hex); err != nil {
return nil, err
}
estimate, ok := new(big.Int).SetString(hex, 0)
if !ok {
return nil, fmt.Errorf("invalid estimate hex: %s", hex)
}
return estimate, nil
} }
// SendTransaction implements ContractTransactor.SendTransaction, delegating the // SendTransaction implements ContractTransactor.SendTransaction, delegating the
@ -248,13 +132,5 @@ func (b *rpcBackend) SendTransaction(ctx context.Context, tx *types.Transaction)
if err != nil { if err != nil {
return err return err
} }
res, err := b.request(ctx, "eth_sendRawTransaction", []interface{}{common.ToHex(data)}) return b.client.CallContext(ctx, nil, "eth_sendRawTransaction", common.ToHex(data))
if err != nil {
return err
}
var hex string
if err := json.Unmarshal(res, &hex); err != nil {
return err
}
return nil
} }

View File

@ -19,9 +19,12 @@ package main
import ( import (
"os" "os"
"os/signal" "os/signal"
"strings"
"github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/console" "github.com/ethereum/go-ethereum/console"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
"gopkg.in/urfave/cli.v1" "gopkg.in/urfave/cli.v1"
) )
@ -99,7 +102,7 @@ func localConsole(ctx *cli.Context) error {
// console to it. // console to it.
func remoteConsole(ctx *cli.Context) error { func remoteConsole(ctx *cli.Context) error {
// Attach to a remotely running geth instance and start the JavaScript console // Attach to a remotely running geth instance and start the JavaScript console
client, err := utils.NewRemoteRPCClient(ctx) client, err := dialRPC(ctx.Args().First())
if err != nil { if err != nil {
utils.Fatalf("Unable to attach to remote geth: %v", err) utils.Fatalf("Unable to attach to remote geth: %v", err)
} }
@ -127,6 +130,20 @@ func remoteConsole(ctx *cli.Context) error {
return nil return nil
} }
// dialRPC returns a RPC client which connects to the given endpoint.
// The check for empty endpoint implements the defaulting logic
// for "geth attach" and "geth monitor" with no argument.
func dialRPC(endpoint string) (*rpc.Client, error) {
if endpoint == "" {
endpoint = node.DefaultIPCEndpoint()
} else if strings.HasPrefix(endpoint, "rpc:") || strings.HasPrefix(endpoint, "ipc:") {
// Backwards compatibility with geth < 1.5 which required
// these prefixes.
endpoint = endpoint[4:]
}
return rpc.Dial(endpoint)
}
// ephemeralConsole starts a new geth node, attaches an ephemeral JavaScript // ephemeralConsole starts a new geth node, attaches an ephemeral JavaScript
// console to it, and each of the files specified as arguments and tears the // console to it, and each of the files specified as arguments and tears the
// everything down. // everything down.

View File

@ -21,11 +21,10 @@ import (
"math" "math"
"reflect" "reflect"
"runtime" "runtime"
"sort"
"strings" "strings"
"time" "time"
"sort"
"github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
@ -36,7 +35,7 @@ import (
var ( var (
monitorCommandAttachFlag = cli.StringFlag{ monitorCommandAttachFlag = cli.StringFlag{
Name: "attach", Name: "attach",
Value: "ipc:" + node.DefaultIPCEndpoint(), Value: node.DefaultIPCEndpoint(),
Usage: "API endpoint to attach to", Usage: "API endpoint to attach to",
} }
monitorCommandRowsFlag = cli.IntFlag{ monitorCommandRowsFlag = cli.IntFlag{
@ -69,12 +68,12 @@ to display multiple metrics simultaneously.
// monitor starts a terminal UI based monitoring tool for the requested metrics. // monitor starts a terminal UI based monitoring tool for the requested metrics.
func monitor(ctx *cli.Context) error { func monitor(ctx *cli.Context) error {
var ( var (
client rpc.Client client *rpc.Client
err error err error
) )
// Attach to an Ethereum node over IPC or RPC // Attach to an Ethereum node over IPC or RPC
endpoint := ctx.String(monitorCommandAttachFlag.Name) endpoint := ctx.String(monitorCommandAttachFlag.Name)
if client, err = utils.NewRemoteRPCClientFromString(endpoint); err != nil { if client, err = dialRPC(endpoint); err != nil {
utils.Fatalf("Unable to attach to geth node: %v", err) utils.Fatalf("Unable to attach to geth node: %v", err)
} }
defer client.Close() defer client.Close()
@ -159,30 +158,10 @@ func monitor(ctx *cli.Context) error {
// retrieveMetrics contacts the attached geth node and retrieves the entire set // retrieveMetrics contacts the attached geth node and retrieves the entire set
// of collected system metrics. // of collected system metrics.
func retrieveMetrics(client rpc.Client) (map[string]interface{}, error) { func retrieveMetrics(client *rpc.Client) (map[string]interface{}, error) {
req := map[string]interface{}{ var metrics map[string]interface{}
"id": new(int64), err := client.Call(&metrics, "debug_metrics", true)
"method": "debug_metrics", return metrics, err
"jsonrpc": "2.0",
"params": []interface{}{true},
}
if err := client.Send(req); err != nil {
return nil, err
}
var res rpc.JSONSuccessResponse
if err := client.Recv(&res); err != nil {
return nil, err
}
if res.Result != nil {
if mets, ok := res.Result.(map[string]interface{}); ok {
return mets, nil
}
}
return nil, fmt.Errorf("unable to retrieve metrics")
} }
// resolveMetrics takes a list of input metric patterns, and resolves each to one // resolveMetrics takes a list of input metric patterns, and resolves each to one
@ -270,7 +249,7 @@ func fetchMetric(metrics map[string]interface{}, metric string) float64 {
// refreshCharts retrieves a next batch of metrics, and inserts all the new // refreshCharts retrieves a next batch of metrics, and inserts all the new
// values into the active datasets and charts // values into the active datasets and charts
func refreshCharts(client rpc.Client, metrics []string, data [][]float64, units []int, charts []*termui.LineChart, ctx *cli.Context, footer *termui.Par) (realign bool) { func refreshCharts(client *rpc.Client, metrics []string, data [][]float64, units []int, charts []*termui.LineChart, ctx *cli.Context, footer *termui.Par) (realign bool) {
values, err := retrieveMetrics(client) values, err := retrieveMetrics(client)
for i, metric := range metrics { for i, metric := range metrics {
if len(data) < 512 { if len(data) < 512 {

View File

@ -1,55 +0,0 @@
// Copyright 2015 The go-ethereum Authors
// This file is part of go-ethereum.
//
// go-ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// go-ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
package utils
import (
"fmt"
"strings"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
"gopkg.in/urfave/cli.v1"
)
// NewRemoteRPCClient returns a RPC client which connects to a running geth instance.
// Depending on the given context this can either be a IPC or a HTTP client.
func NewRemoteRPCClient(ctx *cli.Context) (rpc.Client, error) {
if ctx.Args().Present() {
endpoint := ctx.Args().First()
return NewRemoteRPCClientFromString(endpoint)
}
// use IPC by default
return rpc.NewIPCClient(node.DefaultIPCEndpoint())
}
// NewRemoteRPCClientFromString returns a RPC client which connects to the given
// endpoint. It must start with either `ipc:` or `rpc:` (HTTP).
func NewRemoteRPCClientFromString(endpoint string) (rpc.Client, error) {
if strings.HasPrefix(endpoint, "ipc:") {
return rpc.NewIPCClient(endpoint[4:])
}
if strings.HasPrefix(endpoint, "rpc:") {
return rpc.NewHTTPClient(endpoint[4:])
}
if strings.HasPrefix(endpoint, "http://") {
return rpc.NewHTTPClient(endpoint)
}
if strings.HasPrefix(endpoint, "ws:") {
return rpc.NewWSClient(endpoint)
}
return nil, fmt.Errorf("invalid endpoint")
}

View File

@ -31,13 +31,13 @@ import (
// bridge is a collection of JavaScript utility methods to bride the .js runtime // bridge is a collection of JavaScript utility methods to bride the .js runtime
// environment and the Go RPC connection backing the remote method calls. // environment and the Go RPC connection backing the remote method calls.
type bridge struct { type bridge struct {
client rpc.Client // RPC client to execute Ethereum requests through client *rpc.Client // RPC client to execute Ethereum requests through
prompter UserPrompter // Input prompter to allow interactive user feedback prompter UserPrompter // Input prompter to allow interactive user feedback
printer io.Writer // Output writer to serialize any display strings to printer io.Writer // Output writer to serialize any display strings to
} }
// newBridge creates a new JavaScript wrapper around an RPC client. // newBridge creates a new JavaScript wrapper around an RPC client.
func newBridge(client rpc.Client, prompter UserPrompter, printer io.Writer) *bridge { func newBridge(client *rpc.Client, prompter UserPrompter, printer io.Writer) *bridge {
return &bridge{ return &bridge{
client: client, client: client,
prompter: prompter, prompter: prompter,
@ -188,88 +188,86 @@ func (b *bridge) SleepBlocks(call otto.FunctionCall) (response otto.Value) {
return otto.FalseValue() return otto.FalseValue()
} }
// Send will serialize the first argument, send it to the node and returns the response. type jsonrpcCall struct {
Id int64
Method string
Params []interface{}
}
// Send implements the web3 provider "send" method.
func (b *bridge) Send(call otto.FunctionCall) (response otto.Value) { func (b *bridge) Send(call otto.FunctionCall) (response otto.Value) {
// Ensure that we've got a batch request (array) or a single request (object) // Remarshal the request into a Go value.
arg := call.Argument(0).Object() JSON, _ := call.Otto.Object("JSON")
if arg == nil || (arg.Class() != "Array" && arg.Class() != "Object") { reqVal, err := JSON.Call("stringify", call.Argument(0))
throwJSException("request must be an object or array")
}
// Convert the otto VM arguments to Go values
data, err := call.Otto.Call("JSON.stringify", nil, arg)
if err != nil { if err != nil {
throwJSException(err.Error()) throwJSException(err.Error())
} }
reqjson, err := data.ToString()
if err != nil {
throwJSException(err.Error())
}
var ( var (
reqs []rpc.JSONRequest rawReq = []byte(reqVal.String())
batch = true reqs []jsonrpcCall
batch bool
) )
if err = json.Unmarshal([]byte(reqjson), &reqs); err != nil { if rawReq[0] == '[' {
// single request? batch = true
reqs = make([]rpc.JSONRequest, 1) json.Unmarshal(rawReq, &reqs)
if err = json.Unmarshal([]byte(reqjson), &reqs[0]); err != nil { } else {
throwJSException("invalid request")
}
batch = false batch = false
reqs = make([]jsonrpcCall, 1)
json.Unmarshal(rawReq, &reqs[0])
} }
// Iteratively execute the requests
call.Otto.Set("response_len", len(reqs))
call.Otto.Run("var ret_response = new Array(response_len);")
for i, req := range reqs { // Execute the requests.
// Execute the RPC request and parse the reply resps, _ := call.Otto.Object("new Array()")
if err = b.client.Send(&req); err != nil { for _, req := range reqs {
return newErrorResponse(call, -32603, err.Error(), req.Id) resp, _ := call.Otto.Object(`({"jsonrpc":"2.0"})`)
resp.Set("id", req.Id)
var result json.RawMessage
err = b.client.Call(&result, req.Method, req.Params...)
switch err := err.(type) {
case nil:
if result == nil {
// Special case null because it is decoded as an empty
// raw message for some reason.
resp.Set("result", otto.NullValue())
} else {
resultVal, err := JSON.Call("parse", string(result))
if err != nil {
resp = newErrorResponse(call, -32603, err.Error(), &req.Id).Object()
} else {
resp.Set("result", resultVal)
} }
result := make(map[string]interface{})
if err = b.client.Recv(&result); err != nil {
return newErrorResponse(call, -32603, err.Error(), req.Id)
} }
// Feed the reply back into the JavaScript runtime environment case rpc.Error:
id, _ := result["id"] resp.Set("error", map[string]interface{}{
jsonver, _ := result["jsonrpc"] "code": err.ErrorCode(),
"message": err.Error(),
})
default:
resp = newErrorResponse(call, -32603, err.Error(), &req.Id).Object()
}
resps.Call("push", resp)
}
call.Otto.Set("ret_id", id) // Return the responses either to the callback (if supplied)
call.Otto.Set("ret_jsonrpc", jsonver) // or directly as the return value.
call.Otto.Set("response_idx", i) if batch {
response = resps.Value()
} else {
response, _ = resps.Get("0")
}
if fn := call.Argument(1).Object(); fn != nil && fn.Class() == "function" {
fn.Call("apply", response)
return otto.UndefinedValue()
}
return response
}
if res, ok := result["result"]; ok { func newErrorResponse(call otto.FunctionCall, code int, msg string, id interface{}) otto.Value {
payload, _ := json.Marshal(res) // Bundle the error into a JSON RPC call response
call.Otto.Set("ret_result", string(payload)) m := map[string]interface{}{"version": "2.0", "id": id, "error": map[string]interface{}{"code": code, msg: msg}}
response, err = call.Otto.Run(` res, _ := json.Marshal(m)
ret_response[response_idx] = { jsonrpc: ret_jsonrpc, id: ret_id, result: JSON.parse(ret_result) }; val, _ := call.Otto.Run("(" + string(res) + ")")
`) return val
continue
}
if res, ok := result["error"]; ok {
payload, _ := json.Marshal(res)
call.Otto.Set("ret_result", string(payload))
response, err = call.Otto.Run(`
ret_response[response_idx] = { jsonrpc: ret_jsonrpc, id: ret_id, error: JSON.parse(ret_result) };
`)
continue
}
return newErrorResponse(call, -32603, fmt.Sprintf("Invalid response"), new(int64))
}
// Convert single requests back from batch ones
if !batch {
call.Otto.Run("ret_response = ret_response[0];")
}
// Execute any registered callbacks
if call.Argument(1).IsObject() {
call.Otto.Set("callback", call.Argument(1))
call.Otto.Run(`
if (Object.prototype.toString.call(callback) == '[object Function]') {
callback(null, ret_response);
}
`)
}
return
} }
// throwJSException panics on an otto.Value. The Otto VM will recover from the // throwJSException panics on an otto.Value. The Otto VM will recover from the
@ -281,37 +279,3 @@ func throwJSException(msg interface{}) otto.Value {
} }
panic(val) panic(val)
} }
// newErrorResponse creates a JSON RPC error response for a specific request id,
// containing the specified error code and error message. Beside returning the
// error to the caller, it also sets the ret_error and ret_response JavaScript
// variables.
func newErrorResponse(call otto.FunctionCall, code int, msg string, id interface{}) (response otto.Value) {
// Bundle the error into a JSON RPC call response
res := rpc.JSONErrResponse{
Version: rpc.JSONRPCVersion,
Id: id,
Error: rpc.JSONError{
Code: code,
Message: msg,
},
}
// Serialize the error response into JavaScript variables
errObj, err := json.Marshal(res.Error)
if err != nil {
glog.V(logger.Error).Infof("Failed to serialize JSON RPC error: %v", err)
}
resObj, err := json.Marshal(res)
if err != nil {
glog.V(logger.Error).Infof("Failed to serialize JSON RPC error response: %v", err)
}
if _, err = call.Otto.Run("ret_error = " + string(errObj)); err != nil {
glog.V(logger.Error).Infof("Failed to set `ret_error` to the occurred error: %v", err)
}
resVal, err := call.Otto.Run("ret_response = " + string(resObj))
if err != nil {
glog.V(logger.Error).Infof("Failed to set `ret_response` to the JSON RPC response: %v", err)
}
return resVal
}

View File

@ -52,7 +52,7 @@ const DefaultPrompt = "> "
type Config struct { type Config struct {
DataDir string // Data directory to store the console history at DataDir string // Data directory to store the console history at
DocRoot string // Filesystem path from where to load JavaScript files from DocRoot string // Filesystem path from where to load JavaScript files from
Client rpc.Client // RPC client to execute Ethereum requests through Client *rpc.Client // RPC client to execute Ethereum requests through
Prompt string // Input prompt prefix string (defaults to DefaultPrompt) Prompt string // Input prompt prefix string (defaults to DefaultPrompt)
Prompter UserPrompter // Input prompter to allow interactive user feedback (defaults to TerminalPrompter) Prompter UserPrompter // Input prompter to allow interactive user feedback (defaults to TerminalPrompter)
Printer io.Writer // Output writer to serialize any display strings to (defaults to os.Stdout) Printer io.Writer // Output writer to serialize any display strings to (defaults to os.Stdout)
@ -63,7 +63,7 @@ type Config struct {
// JavaScript console attached to a running node via an external or in-process RPC // JavaScript console attached to a running node via an external or in-process RPC
// client. // client.
type Console struct { type Console struct {
client rpc.Client // RPC client to execute Ethereum requests through client *rpc.Client // RPC client to execute Ethereum requests through
jsre *jsre.JSRE // JavaScript runtime environment running the interpreter jsre *jsre.JSRE // JavaScript runtime environment running the interpreter
prompt string // Input prompt prefix string prompt string // Input prompt prefix string
prompter UserPrompter // Input prompter to allow interactive user feedback prompter UserPrompter // Input prompter to allow interactive user feedback

View File

@ -116,7 +116,7 @@ func (ctx ppctx) printValue(v otto.Value, level int, inArray bool) {
func (ctx ppctx) printObject(obj *otto.Object, level int, inArray bool) { func (ctx ppctx) printObject(obj *otto.Object, level int, inArray bool) {
switch obj.Class() { switch obj.Class() {
case "Array": case "Array", "GoArray":
lv, _ := obj.Get("length") lv, _ := obj.Get("length")
len, _ := lv.ToInteger() len, _ := lv.ToInteger()
if len == 0 { if len == 0 {

View File

@ -505,16 +505,14 @@ func (n *Node) Restart() error {
} }
// Attach creates an RPC client attached to an in-process API handler. // Attach creates an RPC client attached to an in-process API handler.
func (n *Node) Attach() (rpc.Client, error) { func (n *Node) Attach() (*rpc.Client, error) {
n.lock.RLock() n.lock.RLock()
defer n.lock.RUnlock() defer n.lock.RUnlock()
// Short circuit if the node's not running
if n.server == nil { if n.server == nil {
return nil, ErrNodeStopped return nil, ErrNodeStopped
} }
// Otherwise attach to the API and return return rpc.DialInProc(n.inprocHandler), nil
return rpc.NewInProcRPCClient(n.inprocHandler), nil
} }
// Server retrieves the currently running P2P network layer. This method is meant // Server retrieves the currently running P2P network layer. This method is meant

View File

@ -507,21 +507,27 @@ func TestAPIGather(t *testing.T) {
} }
// Register a batch of services with some configured APIs // Register a batch of services with some configured APIs
calls := make(chan string, 1) calls := make(chan string, 1)
makeAPI := func(result string) *OneMethodApi {
return &OneMethodApi{fun: func() { calls <- result }}
}
services := map[string]struct { services := map[string]struct {
APIs []rpc.API APIs []rpc.API
Maker InstrumentingWrapper Maker InstrumentingWrapper
}{ }{
"Zero APIs": {[]rpc.API{}, InstrumentedServiceMakerA}, "Zero APIs": {
"Single API": {[]rpc.API{ []rpc.API{}, InstrumentedServiceMakerA},
{"single", "1", &OneMethodApi{fun: func() { calls <- "single.v1" }}, true}, "Single API": {
[]rpc.API{
{Namespace: "single", Version: "1", Service: makeAPI("single.v1"), Public: true},
}, InstrumentedServiceMakerB}, }, InstrumentedServiceMakerB},
"Many APIs": {[]rpc.API{ "Many APIs": {
{"multi", "1", &OneMethodApi{fun: func() { calls <- "multi.v1" }}, true}, []rpc.API{
{"multi.v2", "2", &OneMethodApi{fun: func() { calls <- "multi.v2" }}, true}, {Namespace: "multi", Version: "1", Service: makeAPI("multi.v1"), Public: true},
{"multi.v2.nested", "2", &OneMethodApi{fun: func() { calls <- "multi.v2.nested" }}, true}, {Namespace: "multi.v2", Version: "2", Service: makeAPI("multi.v2"), Public: true},
{Namespace: "multi.v2.nested", Version: "2", Service: makeAPI("multi.v2.nested"), Public: true},
}, InstrumentedServiceMakerC}, }, InstrumentedServiceMakerC},
} }
for id, config := range services { for id, config := range services {
config := config config := config
constructor := func(*ServiceContext) (Service, error) { constructor := func(*ServiceContext) (Service, error) {
@ -554,12 +560,8 @@ func TestAPIGather(t *testing.T) {
{"multi.v2.nested_theOneMethod", "multi.v2.nested"}, {"multi.v2.nested_theOneMethod", "multi.v2.nested"},
} }
for i, test := range tests { for i, test := range tests {
if err := client.Send(rpc.JSONRequest{Id: []byte("1"), Version: "2.0", Method: test.Method}); err != nil { if err := client.Call(nil, test.Method); err != nil {
t.Fatalf("test %d: failed to send API request: %v", i, err) t.Errorf("test %d: API request failed: %v", i, err)
}
reply := new(rpc.JSONSuccessResponse)
if err := client.Recv(reply); err != nil {
t.Fatalf("test %d: failed to read API reply: %v", i, err)
} }
select { select {
case result := <-calls: case result := <-calls:

740
rpc/client.go Normal file
View File

@ -0,0 +1,740 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rpc
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"net"
"net/url"
"reflect"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"golang.org/x/net/context"
)
var (
ErrClientQuit = errors.New("client is closed")
ErrNoResult = errors.New("no result in JSON-RPC response")
)
const (
clientSubscriptionBuffer = 100 // if exceeded, the client stops reading
tcpKeepAliveInterval = 30 * time.Second
defaultDialTimeout = 10 * time.Second // used when dialing if the context has no deadline
defaultWriteTimeout = 10 * time.Second // used for calls if the context has no deadline
subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls
)
// BatchElem is an element in a batch request.
type BatchElem struct {
Method string
Args []interface{}
// The result is unmarshaled into this field. Result must be set to a
// non-nil pointer value of the desired type, otherwise the response will be
// discarded.
Result interface{}
// Error is set if the server returns an error for this request, or if
// unmarshaling into Result fails. It is not set for I/O errors.
Error error
}
// A value of this type can a JSON-RPC request, notification, successful response or
// error response. Which one it is depends on the fields.
type jsonrpcMessage struct {
Version string `json:"jsonrpc"`
ID json.RawMessage `json:"id,omitempty"`
Method string `json:"method,omitempty"`
Params json.RawMessage `json:"params,omitempty"`
Error *jsonError `json:"error,omitempty"`
Result json.RawMessage `json:"result,omitempty"`
}
func (msg *jsonrpcMessage) isNotification() bool {
return msg.ID == nil && msg.Method != ""
}
func (msg *jsonrpcMessage) isResponse() bool {
return msg.hasValidID() && msg.Method == "" && len(msg.Params) == 0
}
func (msg *jsonrpcMessage) hasValidID() bool {
return len(msg.ID) > 0 && msg.ID[0] != '{' && msg.ID[0] != '['
}
func (msg *jsonrpcMessage) String() string {
b, _ := json.Marshal(msg)
return string(b)
}
// Client represents a connection to an RPC server.
type Client struct {
idCounter uint32
connectFunc func(ctx context.Context) (net.Conn, error)
isHTTP bool
// writeConn is only safe to access outside dispatch, with the
// write lock held. The write lock is taken by sending on
// requestOp and released by sending on sendDone.
writeConn net.Conn
// for dispatch
close chan struct{}
didQuit chan struct{} // closed when client quits
reconnected chan net.Conn // where write/reconnect sends the new connection
readErr chan error // errors from read
readResp chan []*jsonrpcMessage // valid messages from read
requestOp chan *requestOp // for registering response IDs
sendDone chan error // signals write completion, releases write lock
respWait map[string]*requestOp // active requests
subs map[string]*ClientSubscription // active subscriptions
}
type requestOp struct {
ids []json.RawMessage
err error
resp chan *jsonrpcMessage // receives up to len(ids) responses
sub *ClientSubscription // only set for EthSubscribe requests
}
func (op *requestOp) wait(ctx context.Context) (*jsonrpcMessage, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case resp := <-op.resp:
return resp, op.err
}
}
// Dial creates a new client for the given URL.
//
// The currently supported URL schemes are "http", "https", "ws" and "wss". If rawurl is a
// file name with no URL scheme, a local socket connection is established using UNIX
// domain sockets on supported platforms and named pipes on Windows. If you want to
// configure transport options, use DialHTTP, DialWebsocket or DialIPC instead.
//
// For websocket connections, the origin is set to the local host name.
//
// The client reconnects automatically if the connection is lost.
func Dial(rawurl string) (*Client, error) {
return DialContext(context.Background(), rawurl)
}
// DialContext creates a new RPC client, just like Dial.
//
// The context is used to cancel or time out the initial connection establishment. It does
// not affect subsequent interactions with the client.
func DialContext(ctx context.Context, rawurl string) (*Client, error) {
u, err := url.Parse(rawurl)
if err != nil {
return nil, err
}
switch u.Scheme {
case "http", "https":
return DialHTTP(rawurl)
case "ws", "wss":
return DialWebsocket(ctx, rawurl, "")
case "":
return DialIPC(ctx, rawurl)
default:
return nil, fmt.Errorf("no known transport for URL scheme %q", u.Scheme)
}
}
func newClient(initctx context.Context, connectFunc func(context.Context) (net.Conn, error)) (*Client, error) {
conn, err := connectFunc(initctx)
if err != nil {
return nil, err
}
_, isHTTP := conn.(*httpConn)
c := &Client{
writeConn: conn,
isHTTP: isHTTP,
connectFunc: connectFunc,
close: make(chan struct{}),
didQuit: make(chan struct{}),
reconnected: make(chan net.Conn),
readErr: make(chan error),
readResp: make(chan []*jsonrpcMessage),
requestOp: make(chan *requestOp),
sendDone: make(chan error, 1),
respWait: make(map[string]*requestOp),
subs: make(map[string]*ClientSubscription),
}
if !isHTTP {
go c.dispatch(conn)
}
return c, nil
}
func (c *Client) nextID() json.RawMessage {
id := atomic.AddUint32(&c.idCounter, 1)
return []byte(strconv.FormatUint(uint64(id), 10))
}
// SupportedModules calls the rpc_modules method, retrieving the list of
// APIs that are available on the server.
func (c *Client) SupportedModules() (map[string]string, error) {
var result map[string]string
ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
defer cancel()
err := c.CallContext(ctx, &result, "rpc_modules")
return result, err
}
// Close closes the client, aborting any in-flight requests.
func (c *Client) Close() {
if c.isHTTP {
return
}
select {
case c.close <- struct{}{}:
<-c.didQuit
case <-c.didQuit:
}
}
// Call performs a JSON-RPC call with the given arguments and unmarshals into
// result if no error occurred.
//
// The result must be a pointer so that package json can unmarshal into it. You
// can also pass nil, in which case the result is ignored.
func (c *Client) Call(result interface{}, method string, args ...interface{}) error {
ctx := context.Background()
return c.CallContext(ctx, result, method, args...)
}
// CallContext performs a JSON-RPC call with the given arguments. If the context is
// canceled before the call has successfully returned, CallContext returns immediately.
//
// The result must be a pointer so that package json can unmarshal into it. You
// can also pass nil, in which case the result is ignored.
func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
msg, err := c.newMessage(method, args...)
if err != nil {
return err
}
op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)}
if c.isHTTP {
err = c.sendHTTP(ctx, op, msg)
} else {
err = c.send(ctx, op, msg)
}
if err != nil {
return err
}
// dispatch has accepted the request and will close the channel it when it quits.
switch resp, err := op.wait(ctx); {
case err != nil:
return err
case resp.Error != nil:
return resp.Error
case len(resp.Result) == 0:
return ErrNoResult
default:
return json.Unmarshal(resp.Result, &result)
}
}
// BatchCall sends all given requests as a single batch and waits for the server
// to return a response for all of them.
//
// In contrast to Call, BatchCall only returns I/O errors. Any error specific to
// a request is reported through the Error field of the corresponding BatchElem.
//
// Note that batch calls may not be executed atomically on the server side.
func (c *Client) BatchCall(b []BatchElem) error {
ctx := context.Background()
return c.BatchCallContext(ctx, b)
}
// BatchCall sends all given requests as a single batch and waits for the server
// to return a response for all of them. The wait duration is bounded by the
// context's deadline.
//
// In contrast to CallContext, BatchCallContext only returns I/O errors. Any
// error specific to a request is reported through the Error field of the
// corresponding BatchElem.
//
// Note that batch calls may not be executed atomically on the server side.
func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
msgs := make([]*jsonrpcMessage, len(b))
op := &requestOp{
ids: make([]json.RawMessage, len(b)),
resp: make(chan *jsonrpcMessage, len(b)),
}
for i, elem := range b {
msg, err := c.newMessage(elem.Method, elem.Args...)
if err != nil {
return err
}
msgs[i] = msg
op.ids[i] = msg.ID
}
var err error
if c.isHTTP {
err = c.sendBatchHTTP(ctx, op, msgs)
} else {
err = c.send(ctx, op, msgs)
}
// Wait for all responses to come back.
for n := 0; n < len(b) && err == nil; n++ {
var resp *jsonrpcMessage
resp, err = op.wait(ctx)
if err != nil {
break
}
// Find the element corresponding to this response.
// The element is guaranteed to be present because dispatch
// only sends valid IDs to our channel.
var elem *BatchElem
for i := range msgs {
if bytes.Equal(msgs[i].ID, resp.ID) {
elem = &b[i]
break
}
}
if resp.Error != nil {
elem.Error = resp.Error
continue
}
if len(resp.Result) == 0 {
elem.Error = ErrNoResult
continue
}
elem.Error = json.Unmarshal(resp.Result, elem.Result)
}
return err
}
// EthSubscribe calls the "eth_subscribe" method with the given arguments,
// registering a subscription. Server notifications for the subscription are
// sent to the given channel. The element type of the channel must match the
// expected type of content returned by the subscription.
//
// Callers should not use the same channel for multiple calls to EthSubscribe.
// The channel is closed when the notification is unsubscribed or an error
// occurs. The error can be retrieved via the Err method of the subscription.
//
// Slow subscribers will block the clients ingress path eventually.
func (c *Client) EthSubscribe(channel interface{}, args ...interface{}) (*ClientSubscription, error) {
// Check type of channel first.
chanVal := reflect.ValueOf(channel)
if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 {
panic("first argument to EthSubscribe must be a writable channel")
}
if chanVal.IsNil() {
panic("channel given to EthSubscribe must not be nil")
}
if c.isHTTP {
return nil, ErrNotificationsUnsupported
}
msg, err := c.newMessage(subscribeMethod, args...)
if err != nil {
return nil, err
}
op := &requestOp{
ids: []json.RawMessage{msg.ID},
resp: make(chan *jsonrpcMessage),
sub: newClientSubscription(c, chanVal),
}
ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
defer cancel()
// Send the subscription request.
// The arrival and validity of the response is signaled on sub.quit.
if err := c.send(ctx, op, msg); err != nil {
return nil, err
}
if _, err := op.wait(ctx); err != nil {
return nil, err
}
return op.sub, nil
}
func (c *Client) newMessage(method string, paramsIn ...interface{}) (*jsonrpcMessage, error) {
params, err := json.Marshal(paramsIn)
if err != nil {
return nil, err
}
return &jsonrpcMessage{Version: "2.0", ID: c.nextID(), Method: method, Params: params}, nil
}
// send registers op with the dispatch loop, then sends msg on the connection.
// if sending fails, op is deregistered.
func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error {
select {
case c.requestOp <- op:
if glog.V(logger.Detail) {
glog.Info("sending ", msg)
}
err := c.write(ctx, msg)
c.sendDone <- err
return err
case <-c.didQuit:
return ErrClientQuit
}
}
func (c *Client) write(ctx context.Context, msg interface{}) error {
deadline, ok := ctx.Deadline()
if !ok {
deadline = time.Now().Add(defaultWriteTimeout)
}
// The previous write failed. Try to establish a new connection.
if c.writeConn == nil {
if err := c.reconnect(ctx); err != nil {
return err
}
}
c.writeConn.SetWriteDeadline(deadline)
err := json.NewEncoder(c.writeConn).Encode(msg)
if err != nil {
c.writeConn = nil
}
return err
}
func (c *Client) reconnect(ctx context.Context) error {
newconn, err := c.connectFunc(ctx)
if err != nil {
glog.V(logger.Detail).Infof("reconnect failed: %v", err)
return err
}
select {
case c.reconnected <- newconn:
c.writeConn = newconn
return nil
case <-c.didQuit:
newconn.Close()
return ErrClientQuit
}
}
// dispatch is the main loop of the client.
// It sends read messages to waiting calls to Call and BatchCall
// and subscription notifications to registered subscriptions.
func (c *Client) dispatch(conn net.Conn) {
// Spawn the initial read loop.
go c.read(conn)
var (
lastOp *requestOp // tracks last send operation
requestOpLock = c.requestOp // nil while the send lock is held
reading = true // if true, a read loop is running
)
defer close(c.didQuit)
defer func() {
c.closeRequestOps(ErrClientQuit)
conn.Close()
if reading {
// Empty read channels until read is dead.
for {
select {
case <-c.readResp:
case <-c.readErr:
return
}
}
}
}()
for {
select {
case <-c.close:
return
// Read path.
case batch := <-c.readResp:
for _, msg := range batch {
switch {
case msg.isNotification():
if glog.V(logger.Detail) {
glog.Info("<-readResp: notification ", msg)
}
c.handleNotification(msg)
case msg.isResponse():
if glog.V(logger.Detail) {
glog.Info("<-readResp: response ", msg)
}
c.handleResponse(msg)
default:
if glog.V(logger.Debug) {
glog.Error("<-readResp: dropping weird message", msg)
}
// TODO: maybe close
}
}
case err := <-c.readErr:
glog.V(logger.Debug).Infof("<-readErr: %v", err)
c.closeRequestOps(err)
conn.Close()
reading = false
case newconn := <-c.reconnected:
glog.V(logger.Debug).Infof("<-reconnected: (reading=%t) %v", reading, conn.RemoteAddr())
if reading {
// Wait for the previous read loop to exit. This is a rare case.
conn.Close()
<-c.readErr
}
go c.read(newconn)
reading = true
conn = newconn
// Send path.
case op := <-requestOpLock:
// Stop listening for further send ops until the current one is done.
requestOpLock = nil
lastOp = op
for _, id := range op.ids {
c.respWait[string(id)] = op
}
case err := <-c.sendDone:
if err != nil {
// Remove response handlers for the last send. We remove those here
// because the error is already handled in Call or BatchCall. When the
// read loop goes down, it will signal all other current operations.
for _, id := range lastOp.ids {
delete(c.respWait, string(id))
}
}
// Listen for send ops again.
requestOpLock = c.requestOp
lastOp = nil
}
}
}
// closeRequestOps unblocks pending send ops and active subscriptions.
func (c *Client) closeRequestOps(err error) {
didClose := make(map[*requestOp]bool)
for id, op := range c.respWait {
// Remove the op so that later calls will not close op.resp again.
delete(c.respWait, id)
if !didClose[op] {
op.err = err
close(op.resp)
didClose[op] = true
}
}
for id, sub := range c.subs {
delete(c.subs, id)
sub.quitWithError(err, false)
}
}
func (c *Client) handleNotification(msg *jsonrpcMessage) {
if msg.Method != notificationMethod {
glog.V(logger.Debug).Info("dropping non-subscription message: ", msg)
return
}
var subResult struct {
ID string `json:"subscription"`
Result json.RawMessage `json:"result"`
}
if err := json.Unmarshal(msg.Params, &subResult); err != nil {
glog.V(logger.Debug).Info("dropping invalid subscription message: ", msg)
return
}
if c.subs[subResult.ID] != nil {
c.subs[subResult.ID].deliver(subResult.Result)
}
}
func (c *Client) handleResponse(msg *jsonrpcMessage) {
op := c.respWait[string(msg.ID)]
if op == nil {
glog.V(logger.Debug).Infof("unsolicited response %v", msg)
return
}
delete(c.respWait, string(msg.ID))
// For normal responses, just forward the reply to Call/BatchCall.
if op.sub == nil {
op.resp <- msg
return
}
// For subscription responses, start the subscription if the server
// indicates success. EthSubscribe gets unblocked in either case through
// the op.resp channel.
defer close(op.resp)
if msg.Error != nil {
op.err = msg.Error
return
}
if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil {
go op.sub.start()
c.subs[op.sub.subid] = op.sub
}
}
// Reading happens on a dedicated goroutine.
func (c *Client) read(conn net.Conn) error {
var (
buf json.RawMessage
dec = json.NewDecoder(conn)
)
readMessage := func() (rs []*jsonrpcMessage, err error) {
buf = buf[:0]
if err = dec.Decode(&buf); err != nil {
return nil, err
}
if isBatch(buf) {
err = json.Unmarshal(buf, &rs)
} else {
rs = make([]*jsonrpcMessage, 1)
err = json.Unmarshal(buf, &rs[0])
}
return rs, err
}
for {
resp, err := readMessage()
if err != nil {
c.readErr <- err
return err
}
c.readResp <- resp
}
}
// Subscriptions.
// A ClientSubscription represents a subscription established through EthSubscribe.
type ClientSubscription struct {
client *Client
etype reflect.Type
channel reflect.Value
subid string
in chan json.RawMessage
quitOnce sync.Once // ensures quit is closed once
quit chan struct{} // quit is closed when the subscription exits
errOnce sync.Once // ensures err is closed once
err chan error
}
func newClientSubscription(c *Client, channel reflect.Value) *ClientSubscription {
sub := &ClientSubscription{
client: c,
etype: channel.Type().Elem(),
channel: channel,
quit: make(chan struct{}),
err: make(chan error, 1),
// in is buffered so dispatch can continue even if the subscriber is slow.
in: make(chan json.RawMessage, clientSubscriptionBuffer),
}
return sub
}
// Err returns the subscription error channel. The intended use of Err is to schedule
// resubscription when the client connection is closed unexpectedly.
//
// The error channel receives a value when the subscription has ended due
// to an error. The received error is ErrClientQuit if Close has been called
// on the underlying client and no other error has occurred.
//
// The error channel is closed when Unsubscribe is called on the subscription.
func (sub *ClientSubscription) Err() <-chan error {
return sub.err
}
// Unsubscribe unsubscribes the notification and closes the error channel.
// It can safely be called more than once.
func (sub *ClientSubscription) Unsubscribe() {
sub.quitWithError(nil, true)
sub.errOnce.Do(func() { close(sub.err) })
}
func (sub *ClientSubscription) quitWithError(err error, unsubscribeServer bool) {
sub.quitOnce.Do(func() {
if unsubscribeServer {
sub.requestUnsubscribe()
}
if err != nil {
sub.err <- err
}
close(sub.quit)
})
}
func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) {
select {
case sub.in <- result:
return true
case <-sub.quit:
return false
}
}
func (sub *ClientSubscription) start() {
sub.quitWithError(sub.forward())
}
func (sub *ClientSubscription) forward() (err error, unsubscribeServer bool) {
cases := []reflect.SelectCase{
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)},
{Dir: reflect.SelectSend, Chan: sub.channel},
}
for {
select {
case result := <-sub.in:
val, err := sub.unmarshal(result)
if err != nil {
return err, true
}
cases[1].Send = val
switch chosen, _, _ := reflect.Select(cases); chosen {
case 0: // <-sub.quit
return nil, false
case 1: // sub.channel<-
continue
}
case <-sub.quit:
return nil, false
}
}
}
func (sub *ClientSubscription) unmarshal(result json.RawMessage) (reflect.Value, error) {
val := reflect.New(sub.etype)
err := json.Unmarshal(result, val.Interface())
return val.Elem(), err
}
func (sub *ClientSubscription) requestUnsubscribe() error {
var result interface{}
return sub.client.Call(&result, unsubscribeMethod, sub.subid)
}

View File

@ -0,0 +1,60 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// +build !go1.5
package rpc
import (
"net"
"net/http"
"time"
"golang.org/x/net/context"
)
// In older versions of Go (below 1.5), dials cannot be canceled
// via a channel or context. The context deadline can still applied.
// contextDialer returns a dialer that applies the deadline value from the given context.
func contextDialer(ctx context.Context) *net.Dialer {
dialer := &net.Dialer{KeepAlive: tcpKeepAliveInterval}
if deadline, ok := ctx.Deadline(); ok {
dialer.Deadline = deadline
} else {
dialer.Deadline = time.Now().Add(defaultDialTimeout)
}
return dialer
}
// dialContext connects to the given address, aborting the dial if ctx is canceled.
func dialContext(ctx context.Context, network, addr string) (net.Conn, error) {
return contextDialer(ctx).Dial(network, addr)
}
// requestWithContext copies req, adding the cancelation channel and deadline from ctx.
func requestWithContext(c *http.Client, req *http.Request, ctx context.Context) (*http.Client, *http.Request) {
// Set Timeout on the client if the context has a deadline.
// Note that there is no default timeout (unlike in contextDialer) because
// the timeout applies to the entire request, including reads from body.
if deadline, ok := ctx.Deadline(); ok {
c2 := *c
c2.Timeout = deadline.Sub(time.Now())
c = &c2
}
req2 := *req
return c, &req2
}

View File

@ -0,0 +1,61 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// +build go1.5,!go1.6
package rpc
import (
"net"
"net/http"
"time"
"golang.org/x/net/context"
)
// In Go 1.5, dials cannot be canceled via a channel or context. The context deadline can
// still be applied. Go 1.5 adds the ability to cancel HTTP requests via a channel.
// contextDialer returns a dialer that applies the deadline value from the given context.
func contextDialer(ctx context.Context) *net.Dialer {
dialer := &net.Dialer{KeepAlive: tcpKeepAliveInterval}
if deadline, ok := ctx.Deadline(); ok {
dialer.Deadline = deadline
} else {
dialer.Deadline = time.Now().Add(defaultDialTimeout)
}
return dialer
}
// dialContext connects to the given address, aborting the dial if ctx is canceled.
func dialContext(ctx context.Context, network, addr string) (net.Conn, error) {
return contextDialer(ctx).Dial(network, addr)
}
// requestWithContext copies req, adding the cancelation channel and deadline from ctx.
func requestWithContext(c *http.Client, req *http.Request, ctx context.Context) (*http.Client, *http.Request) {
// Set Timeout on the client if the context has a deadline.
// Note that there is no default timeout (unlike in contextDialer) because
// the timeout applies to the entire request, including reads from body.
if deadline, ok := ctx.Deadline(); ok {
c2 := *c
c2.Timeout = deadline.Sub(time.Now())
c = &c2
}
req2 := *req
req2.Cancel = ctx.Done()
return c, &req2
}

View File

@ -0,0 +1,55 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// +build go1.6,!go1.7
package rpc
import (
"net"
"net/http"
"time"
"golang.org/x/net/context"
)
// In Go 1.6, net.Dialer gained the ability to cancel via a channel.
// contextDialer returns a dialer that applies the deadline value from the given context.
func contextDialer(ctx context.Context) *net.Dialer {
dialer := &net.Dialer{Cancel: ctx.Done(), KeepAlive: tcpKeepAliveInterval}
if deadline, ok := ctx.Deadline(); ok {
dialer.Deadline = deadline
} else {
dialer.Deadline = time.Now().Add(defaultDialTimeout)
}
return dialer
}
// dialContext connects to the given address, aborting the dial if ctx is canceled.
func dialContext(ctx context.Context, network, addr string) (net.Conn, error) {
return contextDialer(ctx).Dial(network, addr)
}
// requestWithContext copies req, adding the cancelation channel and deadline from ctx.
func requestWithContext(c *http.Client, req *http.Request, ctx context.Context) (*http.Client, *http.Request) {
// We set Timeout on the client for Go <= 1.5. There
// is no need to do that here because the dial will be canceled
// by package http.
req2 := *req
req2.Cancel = ctx.Done()
return c, &req2
}

View File

@ -0,0 +1,51 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// +build go1.7
package rpc
import (
"context"
"net"
"net/http"
"time"
)
// In Go 1.7, context moved into the standard library and support
// for cancelation via context was added to net.Dialer and http.Request.
// contextDialer returns a dialer that applies the deadline value from the given context.
func contextDialer(ctx context.Context) *net.Dialer {
dialer := &net.Dialer{Cancel: ctx.Done(), KeepAlive: tcpKeepAliveInterval}
if deadline, ok := ctx.Deadline(); ok {
dialer.Deadline = deadline
} else {
dialer.Deadline = time.Now().Add(defaultDialTimeout)
}
return dialer
}
// dialContext connects to the given address, aborting the dial if ctx is canceled.
func dialContext(ctx context.Context, network, addr string) (net.Conn, error) {
d := &net.Dialer{KeepAlive: tcpKeepAliveInterval}
return d.DialContext(ctx, network, addr)
}
// requestWithContext copies req, adding the cancelation channel and deadline from ctx.
func requestWithContext(c *http.Client, req *http.Request, ctx context.Context) (*http.Client, *http.Request) {
return c, req.WithContext(ctx)
}

View File

@ -0,0 +1,83 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rpc_test
import (
"fmt"
"math/big"
"time"
"github.com/ethereum/go-ethereum/rpc"
)
// In this example, our client whishes to track the latest 'block number'
// known to the server. The server supports two methods:
//
// eth_getBlockByNumber("latest", {})
// returns the latest block object.
//
// eth_subscribe("newBlocks")
// creates a subscription which fires block objects when new blocks arrive.
type Block struct {
Number *big.Int
}
func ExampleClientSubscription() {
// Connect the client.
client, _ := rpc.Dial("ws://127.0.0.1:8485")
subch := make(chan Block)
go subscribeBlocks(client, subch)
// Print events from the subscription as they arrive.
for block := range subch {
fmt.Println("latest block:", block.Number)
}
}
// subscribeBlocks runs in its own goroutine and maintains
// a subscription for new blocks.
func subscribeBlocks(client *rpc.Client, subch chan Block) {
for i := 0; ; i++ {
if i > 0 {
time.Sleep(2 * time.Second)
}
// Subscribe to new blocks.
sub, err := client.EthSubscribe(subch, "newBlocks")
if err == rpc.ErrClientQuit {
return // Stop reconnecting if the client was closed.
} else if err != nil {
fmt.Println("subscribe error:", err)
continue
}
// The connection is established now.
// Update the channel with the current block.
var lastBlock Block
if err := client.Call(&lastBlock, "eth_getBlockByNumber", "latest"); err != nil {
fmt.Println("can't get latest block:", err)
continue
}
subch <- lastBlock
// The subscription will deliver events to the channel. Wait for the
// subscription to end for any reason, then loop around to re-establish
// the connection.
fmt.Println("connection lost: ", <-sub.Err())
}
}

489
rpc/client_test.go Normal file
View File

@ -0,0 +1,489 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rpc
import (
"fmt"
"math/rand"
"net"
"net/http"
"net/http/httptest"
"os"
"reflect"
"runtime"
"sync"
"testing"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"golang.org/x/net/context"
)
func TestClientRequest(t *testing.T) {
server := newTestServer("service", new(Service))
defer server.Stop()
client := DialInProc(server)
defer client.Close()
var resp Result
if err := client.Call(&resp, "service_echo", "hello", 10, &Args{"world"}); err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(resp, Result{"hello", 10, &Args{"world"}}) {
t.Errorf("incorrect result %#v", resp)
}
}
func TestClientBatchRequest(t *testing.T) {
server := newTestServer("service", new(Service))
defer server.Stop()
client := DialInProc(server)
defer client.Close()
batch := []BatchElem{
{
Method: "service_echo",
Args: []interface{}{"hello", 10, &Args{"world"}},
Result: new(Result),
},
{
Method: "service_echo",
Args: []interface{}{"hello2", 11, &Args{"world"}},
Result: new(Result),
},
{
Method: "no_such_method",
Args: []interface{}{1, 2, 3},
Result: new(int),
},
}
if err := client.BatchCall(batch); err != nil {
t.Fatal(err)
}
wantResult := []BatchElem{
{
Method: "service_echo",
Args: []interface{}{"hello", 10, &Args{"world"}},
Result: &Result{"hello", 10, &Args{"world"}},
},
{
Method: "service_echo",
Args: []interface{}{"hello2", 11, &Args{"world"}},
Result: &Result{"hello2", 11, &Args{"world"}},
},
{
Method: "no_such_method",
Args: []interface{}{1, 2, 3},
Result: new(int),
Error: &jsonError{Code: -32601, Message: "The method no_such_method_ does not exist/is not available"},
},
}
if !reflect.DeepEqual(batch, wantResult) {
t.Errorf("batch results mismatch:\ngot %swant %s", spew.Sdump(batch), spew.Sdump(wantResult))
}
}
// func TestClientCancelInproc(t *testing.T) { testClientCancel("inproc", t) }
func TestClientCancelWebsocket(t *testing.T) { testClientCancel("ws", t) }
func TestClientCancelHTTP(t *testing.T) { testClientCancel("http", t) }
func TestClientCancelIPC(t *testing.T) { testClientCancel("ipc", t) }
// This test checks that requests made through CallContext can be canceled by canceling
// the context.
func testClientCancel(transport string, t *testing.T) {
server := newTestServer("service", new(Service))
defer server.Stop()
// What we want to achieve is that the context gets canceled
// at various stages of request processing. The interesting cases
// are:
// - cancel during dial
// - cancel while performing a HTTP request
// - cancel while waiting for a response
//
// To trigger those, the times are chosen such that connections
// are killed within the deadline for every other call (maxKillTimeout
// is 2x maxCancelTimeout).
//
// Once a connection is dead, there is a fair chance it won't connect
// successfully because the accept is delayed by 1s.
maxContextCancelTimeout := 300 * time.Millisecond
fl := &flakeyListener{
maxAcceptDelay: 1 * time.Second,
maxKillTimeout: 600 * time.Millisecond,
}
var client *Client
switch transport {
case "ws", "http":
c, hs := httpTestClient(server, transport, fl)
defer hs.Close()
client = c
case "ipc":
c, l := ipcTestClient(server, fl)
defer l.Close()
client = c
default:
panic("unknown transport: " + transport)
}
// These tests take a lot of time, run them all at once.
// You probably want to run with -parallel 1 or comment out
// the call to t.Parallel if you enable the logging.
t.Parallel()
// glog.SetV(6)
// glog.SetToStderr(true)
// defer glog.SetToStderr(false)
// glog.Infoln("testing ", transport)
// The actual test starts here.
var (
wg sync.WaitGroup
nreqs = 10
ncallers = 6
)
caller := func(index int) {
defer wg.Done()
for i := 0; i < nreqs; i++ {
var (
ctx context.Context
cancel func()
timeout = time.Duration(rand.Int63n(int64(maxContextCancelTimeout)))
)
if index < ncallers/2 {
// For half of the callers, create a context without deadline
// and cancel it later.
ctx, cancel = context.WithCancel(context.Background())
time.AfterFunc(timeout, cancel)
} else {
// For the other half, create a context with a deadline instead. This is
// different because the context deadline is used to set the socket write
// deadline.
ctx, cancel = context.WithTimeout(context.Background(), timeout)
}
// Now perform a call with the context.
// The key thing here is that no call will ever complete successfully.
err := client.CallContext(ctx, nil, "service_sleep", 2*maxContextCancelTimeout)
if err != nil {
glog.V(logger.Debug).Infoln("got expected error:", err)
} else {
t.Errorf("no error for call with %v wait time", timeout)
}
cancel()
}
}
wg.Add(ncallers)
for i := 0; i < ncallers; i++ {
go caller(i)
}
wg.Wait()
}
func TestClientSubscribeInvalidArg(t *testing.T) {
server := newTestServer("service", new(Service))
defer server.Stop()
client := DialInProc(server)
defer client.Close()
check := func(shouldPanic bool, arg interface{}) {
defer func() {
err := recover()
if shouldPanic && err == nil {
t.Errorf("EthSubscribe should've panicked for %#v", arg)
}
if !shouldPanic && err != nil {
t.Errorf("EthSubscribe shouldn't have panicked for %#v", arg)
buf := make([]byte, 1024*1024)
buf = buf[:runtime.Stack(buf, false)]
t.Error(err)
t.Error(string(buf))
}
}()
client.EthSubscribe(arg, "foo_bar")
}
check(true, nil)
check(true, 1)
check(true, (chan int)(nil))
check(true, make(<-chan int))
check(false, make(chan int))
check(false, make(chan<- int))
}
func TestClientSubscribe(t *testing.T) {
server := newTestServer("eth", new(NotificationTestService))
defer server.Stop()
client := DialInProc(server)
defer client.Close()
nc := make(chan int)
count := 10
sub, err := client.EthSubscribe(nc, "someSubscription", count, 0)
if err != nil {
t.Fatal("can't subscribe:", err)
}
for i := 0; i < count; i++ {
if val := <-nc; val != i {
t.Fatalf("value mismatch: got %d, want %d", val, i)
}
}
sub.Unsubscribe()
select {
case v := <-nc:
t.Fatal("received value after unsubscribe:", v)
case err := <-sub.Err():
if err != nil {
t.Fatalf("Err returned a non-nil error after explicit unsubscribe: %q", err)
}
case <-time.After(1 * time.Second):
t.Fatalf("subscription not closed within 1s after unsubscribe")
}
}
// In this test, the connection drops while EthSubscribe is
// waiting for a response.
func TestClientSubscribeClose(t *testing.T) {
service := &NotificationTestService{
gotHangSubscriptionReq: make(chan struct{}),
unblockHangSubscription: make(chan struct{}),
}
server := newTestServer("eth", service)
defer server.Stop()
client := DialInProc(server)
defer client.Close()
var (
nc = make(chan int)
errc = make(chan error)
sub *ClientSubscription
err error
)
go func() {
sub, err = client.EthSubscribe(nc, "hangSubscription", 999)
errc <- err
}()
<-service.gotHangSubscriptionReq
client.Close()
service.unblockHangSubscription <- struct{}{}
select {
case err := <-errc:
if err == nil {
t.Errorf("EthSubscribe returned nil error after Close")
}
if sub != nil {
t.Error("EthSubscribe returned non-nil subscription after Close")
}
case <-time.After(1 * time.Second):
t.Fatalf("EthSubscribe did not return within 1s after Close")
}
}
func TestClientHTTP(t *testing.T) {
server := newTestServer("service", new(Service))
defer server.Stop()
client, hs := httpTestClient(server, "http", nil)
defer hs.Close()
defer client.Close()
// Launch concurrent requests.
var (
results = make([]Result, 100)
errc = make(chan error)
wantResult = Result{"a", 1, new(Args)}
)
defer client.Close()
for i := range results {
i := i
go func() {
errc <- client.Call(&results[i], "service_echo",
wantResult.String, wantResult.Int, wantResult.Args)
}()
}
// Wait for all of them to complete.
timeout := time.NewTimer(5 * time.Second)
defer timeout.Stop()
for i := range results {
select {
case err := <-errc:
if err != nil {
t.Fatal(err)
}
case <-timeout.C:
t.Fatalf("timeout (got %d/%d) results)", i+1, len(results))
}
}
// Check results.
for i := range results {
if !reflect.DeepEqual(results[i], wantResult) {
t.Errorf("result %d mismatch: got %#v, want %#v", i, results[i], wantResult)
}
}
}
func TestClientReconnect(t *testing.T) {
startServer := func(addr string) (*Server, net.Listener) {
srv := newTestServer("service", new(Service))
l, err := net.Listen("tcp", addr)
if err != nil {
t.Fatal(err)
}
go http.Serve(l, srv.WebsocketHandler("*"))
return srv, l
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Start a server and corresponding client.
s1, l1 := startServer("127.0.0.1:0")
client, err := DialContext(ctx, "ws://"+l1.Addr().String())
if err != nil {
t.Fatal("can't dial", err)
}
// Perform a call. This should work because the server is up.
var resp Result
if err := client.CallContext(ctx, &resp, "service_echo", "", 1, nil); err != nil {
t.Fatal(err)
}
// Shut down the server and try calling again. It shouldn't work.
l1.Close()
s1.Stop()
if err := client.CallContext(ctx, &resp, "service_echo", "", 2, nil); err == nil {
t.Error("successful call while the server is down")
t.Logf("resp: %#v", resp)
}
// Allow for some cool down time so we can listen on the same address again.
time.Sleep(2 * time.Second)
// Start it up again and call again. The connection should be reestablished.
// We spawn multiple calls here to check whether this hangs somehow.
s2, l2 := startServer(l1.Addr().String())
defer l2.Close()
defer s2.Stop()
start := make(chan struct{})
errors := make(chan error, 20)
for i := 0; i < cap(errors); i++ {
go func() {
<-start
var resp Result
errors <- client.CallContext(ctx, &resp, "service_echo", "", 3, nil)
}()
}
close(start)
errcount := 0
for i := 0; i < cap(errors); i++ {
if err = <-errors; err != nil {
errcount++
}
}
t.Log("err:", err)
if errcount > 1 {
t.Errorf("expected one error after disconnect, got %d", errcount)
}
}
func newTestServer(serviceName string, service interface{}) *Server {
server := NewServer()
if err := server.RegisterName(serviceName, service); err != nil {
panic(err)
}
return server
}
func httpTestClient(srv *Server, transport string, fl *flakeyListener) (*Client, *httptest.Server) {
// Create the HTTP server.
var hs *httptest.Server
switch transport {
case "ws":
hs = httptest.NewUnstartedServer(srv.WebsocketHandler("*"))
case "http":
hs = httptest.NewUnstartedServer(srv)
default:
panic("unknown HTTP transport: " + transport)
}
// Wrap the listener if required.
if fl != nil {
fl.Listener = hs.Listener
hs.Listener = fl
}
// Connect the client.
hs.Start()
client, err := Dial(transport + "://" + hs.Listener.Addr().String())
if err != nil {
panic(err)
}
return client, hs
}
func ipcTestClient(srv *Server, fl *flakeyListener) (*Client, net.Listener) {
// Listen on a random endpoint.
endpoint := fmt.Sprintf("go-ethereum-test-ipc-%d-%d", os.Getpid(), rand.Int63())
if runtime.GOOS == "windows" {
endpoint = `\\.\pipe\` + endpoint
} else {
endpoint = os.TempDir() + "/" + endpoint
}
l, err := ipcListen(endpoint)
if err != nil {
panic(err)
}
// Connect the listener to the server.
if fl != nil {
fl.Listener = l
l = fl
}
go srv.ServeListener(l)
// Connect the client.
client, err := Dial(endpoint)
if err != nil {
panic(err)
}
return client, l
}
// flakeyListener kills accepted connections after a random timeout.
type flakeyListener struct {
net.Listener
maxKillTimeout time.Duration
maxAcceptDelay time.Duration
}
func (l *flakeyListener) Accept() (net.Conn, error) {
delay := time.Duration(rand.Int63n(int64(l.maxAcceptDelay)))
time.Sleep(delay)
c, err := l.Listener.Accept()
if err == nil {
timeout := time.Duration(rand.Int63n(int64(l.maxKillTimeout)))
time.AfterFunc(timeout, func() {
glog.V(logger.Debug).Infof("killing conn %v after %v", c.LocalAddr(), timeout)
c.Close()
})
}
return c, err
}

View File

@ -24,74 +24,43 @@ type methodNotFoundError struct {
method string method string
} }
func (e *methodNotFoundError) Code() int { func (e *methodNotFoundError) ErrorCode() int { return -32601 }
return -32601
}
func (e *methodNotFoundError) Error() string { func (e *methodNotFoundError) Error() string {
return fmt.Sprintf("The method %s%s%s does not exist/is not available", e.service, serviceMethodSeparator, e.method) return fmt.Sprintf("The method %s%s%s does not exist/is not available", e.service, serviceMethodSeparator, e.method)
} }
// received message isn't a valid request // received message isn't a valid request
type invalidRequestError struct { type invalidRequestError struct{ message string }
message string
}
func (e *invalidRequestError) Code() int { func (e *invalidRequestError) ErrorCode() int { return -32600 }
return -32600
}
func (e *invalidRequestError) Error() string { func (e *invalidRequestError) Error() string { return e.message }
return e.message
}
// received message is invalid // received message is invalid
type invalidMessageError struct { type invalidMessageError struct{ message string }
message string
}
func (e *invalidMessageError) Code() int { func (e *invalidMessageError) ErrorCode() int { return -32700 }
return -32700
}
func (e *invalidMessageError) Error() string { func (e *invalidMessageError) Error() string { return e.message }
return e.message
}
// unable to decode supplied params, or an invalid number of parameters // unable to decode supplied params, or an invalid number of parameters
type invalidParamsError struct { type invalidParamsError struct{ message string }
message string
}
func (e *invalidParamsError) Code() int { func (e *invalidParamsError) ErrorCode() int { return -32602 }
return -32602
}
func (e *invalidParamsError) Error() string { func (e *invalidParamsError) Error() string { return e.message }
return e.message
}
// logic error, callback returned an error // logic error, callback returned an error
type callbackError struct { type callbackError struct{ message string }
message string
}
func (e *callbackError) Code() int { func (e *callbackError) ErrorCode() int { return -32000 }
return -32000
}
func (e *callbackError) Error() string { func (e *callbackError) Error() string { return e.message }
return e.message
}
// issued when a request is received after the server is issued to stop. // issued when a request is received after the server is issued to stop.
type shutdownError struct { type shutdownError struct{}
}
func (e *shutdownError) Code() int { func (e *shutdownError) ErrorCode() int { return -32000 }
return -32000
}
func (e *shutdownError) Error() string { func (e *shutdownError) Error() string { return "server is shutting down" }
return "server is shutting down"
}

View File

@ -22,71 +22,108 @@ import (
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"net"
"net/http" "net/http"
"net/url"
"strings" "strings"
"sync"
"time"
"github.com/rs/cors" "github.com/rs/cors"
"golang.org/x/net/context"
) )
const ( const (
maxHTTPRequestContentLength = 1024 * 128 maxHTTPRequestContentLength = 1024 * 128
) )
// httpClient connects to a geth RPC server over HTTP. var nullAddr, _ = net.ResolveTCPAddr("tcp", "127.0.0.1:0")
type httpClient struct {
endpoint *url.URL // HTTP-RPC server endpoint type httpConn struct {
httpClient http.Client // reuse connection client *http.Client
lastRes []byte // HTTP requests are synchronous, store last response req *http.Request
closeOnce sync.Once
closed chan struct{}
} }
// NewHTTPClient create a new RPC clients that connection to a geth RPC server // httpConn is treated specially by Client.
// over HTTP. func (hc *httpConn) LocalAddr() net.Addr { return nullAddr }
func NewHTTPClient(endpoint string) (Client, error) { func (hc *httpConn) RemoteAddr() net.Addr { return nullAddr }
url, err := url.Parse(endpoint) func (hc *httpConn) SetReadDeadline(time.Time) error { return nil }
func (hc *httpConn) SetWriteDeadline(time.Time) error { return nil }
func (hc *httpConn) SetDeadline(time.Time) error { return nil }
func (hc *httpConn) Write([]byte) (int, error) { panic("Write called") }
func (hc *httpConn) Read(b []byte) (int, error) {
<-hc.closed
return 0, io.EOF
}
func (hc *httpConn) Close() error {
hc.closeOnce.Do(func() { close(hc.closed) })
return nil
}
// DialHTTP creates a new RPC clients that connection to an RPC server over HTTP.
func DialHTTP(endpoint string) (*Client, error) {
req, err := http.NewRequest("POST", endpoint, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &httpClient{endpoint: url}, nil req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")
initctx := context.Background()
return newClient(initctx, func(context.Context) (net.Conn, error) {
return &httpConn{client: new(http.Client), req: req, closed: make(chan struct{})}, nil
})
} }
// Send will serialize the given msg to JSON and sends it to the RPC server. func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) error {
// Since HTTP is synchronous the response is stored until Recv is called. hc := c.writeConn.(*httpConn)
func (client *httpClient) Send(msg interface{}) error { respBody, err := hc.doRequest(ctx, msg)
var body []byte
var err error
client.lastRes = nil
if body, err = json.Marshal(msg); err != nil {
return err
}
resp, err := client.httpClient.Post(client.endpoint.String(), "application/json", bytes.NewReader(body))
if err != nil { if err != nil {
return err return err
} }
defer respBody.Close()
defer resp.Body.Close() var respmsg jsonrpcMessage
if resp.StatusCode == http.StatusOK { if err := json.NewDecoder(respBody).Decode(&respmsg); err != nil {
client.lastRes, err = ioutil.ReadAll(resp.Body)
return err return err
} }
op.resp <- &respmsg
return fmt.Errorf("request failed: %s", resp.Status) return nil
} }
// Recv will try to deserialize the last received response into the given msg. func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonrpcMessage) error {
func (client *httpClient) Recv(msg interface{}) error { hc := c.writeConn.(*httpConn)
return json.Unmarshal(client.lastRes, &msg) respBody, err := hc.doRequest(ctx, msgs)
if err != nil {
return err
}
defer respBody.Close()
var respmsgs []jsonrpcMessage
if err := json.NewDecoder(respBody).Decode(&respmsgs); err != nil {
return err
}
for _, respmsg := range respmsgs {
op.resp <- &respmsg
}
return nil
} }
// Close is not necessary for httpClient func (hc *httpConn) doRequest(ctx context.Context, msg interface{}) (io.ReadCloser, error) {
func (client *httpClient) Close() { body, err := json.Marshal(msg)
if err != nil {
return nil, err
} }
client, req := requestWithContext(hc.client, hc.req, ctx)
req.Body = ioutil.NopCloser(bytes.NewReader(body))
req.ContentLength = int64(len(body))
// SupportedModules will return the collection of offered RPC modules. resp, err := client.Do(req)
func (client *httpClient) SupportedModules() (map[string]string, error) { if err != nil {
return SupportedModules(client) return nil, err
}
return resp.Body, nil
} }
// httpReadWriteNopCloser wraps a io.Reader and io.Writer with a NOP Close method. // httpReadWriteNopCloser wraps a io.Reader and io.Writer with a NOP Close method.
@ -100,17 +137,21 @@ func (t *httpReadWriteNopCloser) Close() error {
return nil return nil
} }
// newJSONHTTPHandler creates a HTTP handler that will parse incoming JSON requests, // NewHTTPServer creates a new HTTP RPC server around an API provider.
// send the request to the given API provider and sends the response back to the caller. //
func newJSONHTTPHandler(srv *Server) http.HandlerFunc { // Deprecated: Server implements http.Handler
return func(w http.ResponseWriter, r *http.Request) { func NewHTTPServer(corsString string, srv *Server) *http.Server {
return &http.Server{Handler: newCorsHandler(srv, corsString)}
}
// ServeHTTP serves JSON-RPC requests over HTTP.
func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.ContentLength > maxHTTPRequestContentLength { if r.ContentLength > maxHTTPRequestContentLength {
http.Error(w, http.Error(w,
fmt.Sprintf("content length too large (%d>%d)", r.ContentLength, maxHTTPRequestContentLength), fmt.Sprintf("content length too large (%d>%d)", r.ContentLength, maxHTTPRequestContentLength),
http.StatusRequestEntityTooLarge) http.StatusRequestEntityTooLarge)
return return
} }
w.Header().Set("content-type", "application/json") w.Header().Set("content-type", "application/json")
// create a codec that reads direct from the request body until // create a codec that reads direct from the request body until
@ -120,23 +161,15 @@ func newJSONHTTPHandler(srv *Server) http.HandlerFunc {
defer codec.Close() defer codec.Close()
srv.ServeSingleRequest(codec, OptionMethodInvocation) srv.ServeSingleRequest(codec, OptionMethodInvocation)
} }
}
// NewHTTPServer creates a new HTTP RPC server around an API provider. func newCorsHandler(srv *Server, corsString string) http.Handler {
func NewHTTPServer(corsString string, srv *Server) *http.Server {
var allowedOrigins []string var allowedOrigins []string
for _, domain := range strings.Split(corsString, ",") { for _, domain := range strings.Split(corsString, ",") {
allowedOrigins = append(allowedOrigins, strings.TrimSpace(domain)) allowedOrigins = append(allowedOrigins, strings.TrimSpace(domain))
} }
c := cors.New(cors.Options{ c := cors.New(cors.Options{
AllowedOrigins: allowedOrigins, AllowedOrigins: allowedOrigins,
AllowedMethods: []string{"POST", "GET"}, AllowedMethods: []string{"POST", "GET"},
}) })
return c.Handler(srv)
handler := c.Handler(newJSONHTTPHandler(srv))
return &http.Server{
Handler: handler,
}
} }

View File

@ -17,45 +17,18 @@
package rpc package rpc
import ( import (
"encoding/json"
"io"
"net" "net"
"golang.org/x/net/context"
) )
// inProcClient is an in-process buffer stream attached to an RPC server. // NewInProcClient attaches an in-process connection to the given RPC server.
type inProcClient struct { func DialInProc(handler *Server) *Client {
server *Server initctx := context.Background()
cl io.Closer c, _ := newClient(initctx, func(context.Context) (net.Conn, error) {
enc *json.Encoder
dec *json.Decoder
}
// Close tears down the request channel of the in-proc client.
func (c *inProcClient) Close() {
c.cl.Close()
}
// NewInProcRPCClient creates an in-process buffer stream attachment to a given
// RPC server.
func NewInProcRPCClient(handler *Server) Client {
p1, p2 := net.Pipe() p1, p2 := net.Pipe()
go handler.ServeCodec(NewJSONCodec(p1), OptionMethodInvocation|OptionSubscriptions) go handler.ServeCodec(NewJSONCodec(p1), OptionMethodInvocation|OptionSubscriptions)
return &inProcClient{handler, p2, json.NewEncoder(p2), json.NewDecoder(p2)} return p2, nil
} })
return c
// Send marshals a message into a json format and injects in into the client
// request channel.
func (c *inProcClient) Send(msg interface{}) error {
return c.enc.Encode(msg)
}
// Recv reads a message from the response channel and tries to parse it into the
// given msg interface.
func (c *inProcClient) Recv(msg interface{}) error {
return c.dec.Decode(msg)
}
// Returns the collection of modules the RPC server offers.
func (c *inProcClient) SupportedModules() (map[string]string, error) {
return SupportedModules(c)
} }

View File

@ -17,68 +17,39 @@
package rpc package rpc
import ( import (
"encoding/json"
"net" "net"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"golang.org/x/net/context"
) )
// CreateIPCListener creates an listener, on Unix platforms this is a unix socket, on Windows this is a named pipe // CreateIPCListener creates an listener, on Unix platforms this is a unix socket, on
// Windows this is a named pipe
func CreateIPCListener(endpoint string) (net.Listener, error) { func CreateIPCListener(endpoint string) (net.Listener, error) {
return ipcListen(endpoint) return ipcListen(endpoint)
} }
// ipcClient represent an IPC RPC client. It will connect to a given endpoint and tries to communicate with a node using // ServeListener accepts connections on l, serving JSON-RPC on them.
// JSON serialization. func (srv *Server) ServeListener(l net.Listener) error {
type ipcClient struct { for {
endpoint string conn, err := l.Accept()
conn net.Conn
out *json.Encoder
in *json.Decoder
}
// NewIPCClient create a new IPC client that will connect on the given endpoint. Messages are JSON encoded and encoded.
// On Unix it assumes the endpoint is the full path to a unix socket, and Windows the endpoint is an identifier for a
// named pipe.
func NewIPCClient(endpoint string) (Client, error) {
conn, err := newIPCConnection(endpoint)
if err != nil {
return nil, err
}
return &ipcClient{endpoint: endpoint, conn: conn, in: json.NewDecoder(conn), out: json.NewEncoder(conn)}, nil
}
// Send will serialize the given message and send it to the server.
// When sending the message fails it will try to reconnect once and send the message again.
func (client *ipcClient) Send(msg interface{}) error {
if err := client.out.Encode(msg); err == nil {
return nil
}
// retry once
client.conn.Close()
conn, err := newIPCConnection(client.endpoint)
if err != nil { if err != nil {
return err return err
} }
glog.V(logger.Detail).Infoln("accepted conn", conn.RemoteAddr())
client.conn = conn go srv.ServeCodec(NewJSONCodec(conn), OptionMethodInvocation|OptionSubscriptions)
client.in = json.NewDecoder(conn) }
client.out = json.NewEncoder(conn)
return client.out.Encode(msg)
} }
// Recv will read a message from the connection and tries to parse it. It assumes the received message is JSON encoded. // DialIPC create a new IPC client that connects to the given endpoint. On Unix it assumes
func (client *ipcClient) Recv(msg interface{}) error { // the endpoint is the full path to a unix socket, and Windows the endpoint is an
return client.in.Decode(&msg) // identifier for a named pipe.
} //
// The context is used for the initial connection establishment. It does not
// Close will close the underlying IPC connection // affect subsequent interactions with the client.
func (client *ipcClient) Close() { func DialIPC(ctx context.Context, endpoint string) (*Client, error) {
client.conn.Close() return newClient(ctx, func(ctx context.Context) (net.Conn, error) {
} return newIPCConnection(ctx, endpoint)
})
// SupportedModules will return the collection of offered RPC modules.
func (client *ipcClient) SupportedModules() (map[string]string, error) {
return SupportedModules(client)
} }

View File

@ -22,6 +22,8 @@ import (
"net" "net"
"os" "os"
"path/filepath" "path/filepath"
"golang.org/x/net/context"
) )
// ipcListen will create a Unix socket on the given endpoint. // ipcListen will create a Unix socket on the given endpoint.
@ -40,6 +42,6 @@ func ipcListen(endpoint string) (net.Listener, error) {
} }
// newIPCConnection will connect to a Unix socket on the given endpoint. // newIPCConnection will connect to a Unix socket on the given endpoint.
func newIPCConnection(endpoint string) (net.Conn, error) { func newIPCConnection(ctx context.Context, endpoint string) (net.Conn, error) {
return net.DialUnix("unix", nil, &net.UnixAddr{Name: endpoint, Net: "unix"}) return dialContext(ctx, "unix", endpoint)
} }

View File

@ -22,16 +22,27 @@ import (
"net" "net"
"time" "time"
"golang.org/x/net/context"
"gopkg.in/natefinch/npipe.v2" "gopkg.in/natefinch/npipe.v2"
) )
// This is used if the dialing context has no deadline. It is much smaller than the
// defaultDialTimeout because named pipes are local and there is no need to wait so long.
const defaultPipeDialTimeout = 2 * time.Second
// ipcListen will create a named pipe on the given endpoint. // ipcListen will create a named pipe on the given endpoint.
func ipcListen(endpoint string) (net.Listener, error) { func ipcListen(endpoint string) (net.Listener, error) {
return npipe.Listen(endpoint) return npipe.Listen(endpoint)
} }
// newIPCConnection will connect to a named pipe with the given endpoint as name. // newIPCConnection will connect to a named pipe with the given endpoint as name.
func newIPCConnection(endpoint string) (net.Conn, error) { func newIPCConnection(ctx context.Context, endpoint string) (net.Conn, error) {
timeout := 5 * time.Second timeout := defaultPipeDialTimeout
if deadline, ok := ctx.Deadline(); ok {
timeout = deadline.Sub(time.Now())
if timeout < 0 {
timeout = 0
}
}
return npipe.DialTimeout(endpoint, timeout) return npipe.DialTimeout(endpoint, timeout)
} }

View File

@ -30,49 +30,43 @@ import (
) )
const ( const (
JSONRPCVersion = "2.0" jsonrpcVersion = "2.0"
serviceMethodSeparator = "_" serviceMethodSeparator = "_"
subscribeMethod = "eth_subscribe" subscribeMethod = "eth_subscribe"
unsubscribeMethod = "eth_unsubscribe" unsubscribeMethod = "eth_unsubscribe"
notificationMethod = "eth_subscription" notificationMethod = "eth_subscription"
) )
// JSON-RPC request type jsonRequest struct {
type JSONRequest struct {
Method string `json:"method"` Method string `json:"method"`
Version string `json:"jsonrpc"` Version string `json:"jsonrpc"`
Id json.RawMessage `json:"id,omitempty"` Id json.RawMessage `json:"id,omitempty"`
Payload json.RawMessage `json:"params,omitempty"` Payload json.RawMessage `json:"params,omitempty"`
} }
// JSON-RPC response type jsonSuccessResponse struct {
type JSONSuccessResponse struct {
Version string `json:"jsonrpc"` Version string `json:"jsonrpc"`
Id interface{} `json:"id,omitempty"` Id interface{} `json:"id,omitempty"`
Result interface{} `json:"result"` Result interface{} `json:"result"`
} }
// JSON-RPC error object type jsonError struct {
type JSONError struct {
Code int `json:"code"` Code int `json:"code"`
Message string `json:"message"` Message string `json:"message"`
Data interface{} `json:"data,omitempty"` Data interface{} `json:"data,omitempty"`
} }
// JSON-RPC error response type jsonErrResponse struct {
type JSONErrResponse struct {
Version string `json:"jsonrpc"` Version string `json:"jsonrpc"`
Id interface{} `json:"id,omitempty"` Id interface{} `json:"id,omitempty"`
Error JSONError `json:"error"` Error jsonError `json:"error"`
} }
// JSON-RPC notification payload
type jsonSubscription struct { type jsonSubscription struct {
Subscription string `json:"subscription"` Subscription string `json:"subscription"`
Result interface{} `json:"result,omitempty"` Result interface{} `json:"result,omitempty"`
} }
// JSON-RPC notification
type jsonNotification struct { type jsonNotification struct {
Version string `json:"jsonrpc"` Version string `json:"jsonrpc"`
Method string `json:"method"` Method string `json:"method"`
@ -91,6 +85,17 @@ type jsonCodec struct {
rw io.ReadWriteCloser // connection rw io.ReadWriteCloser // connection
} }
func (err *jsonError) Error() string {
if err.Message == "" {
return fmt.Sprintf("json-rpc error %d", err.Code)
}
return err.Message
}
func (err *jsonError) ErrorCode() int {
return err.Code
}
// NewJSONCodec creates a new RPC server codec with support for JSON-RPC 2.0 // NewJSONCodec creates a new RPC server codec with support for JSON-RPC 2.0
func NewJSONCodec(rwc io.ReadWriteCloser) ServerCodec { func NewJSONCodec(rwc io.ReadWriteCloser) ServerCodec {
d := json.NewDecoder(rwc) d := json.NewDecoder(rwc)
@ -113,7 +118,7 @@ func isBatch(msg json.RawMessage) bool {
// ReadRequestHeaders will read new requests without parsing the arguments. It will // ReadRequestHeaders will read new requests without parsing the arguments. It will
// return a collection of requests, an indication if these requests are in batch // return a collection of requests, an indication if these requests are in batch
// form or an error when the incoming message could not be read/parsed. // form or an error when the incoming message could not be read/parsed.
func (c *jsonCodec) ReadRequestHeaders() ([]rpcRequest, bool, RPCError) { func (c *jsonCodec) ReadRequestHeaders() ([]rpcRequest, bool, Error) {
c.decMu.Lock() c.decMu.Lock()
defer c.decMu.Unlock() defer c.decMu.Unlock()
@ -148,8 +153,8 @@ func checkReqId(reqId json.RawMessage) error {
// parseRequest will parse a single request from the given RawMessage. It will return // parseRequest will parse a single request from the given RawMessage. It will return
// the parsed request, an indication if the request was a batch or an error when // the parsed request, an indication if the request was a batch or an error when
// the request could not be parsed. // the request could not be parsed.
func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) { func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error) {
var in JSONRequest var in jsonRequest
if err := json.Unmarshal(incomingMsg, &in); err != nil { if err := json.Unmarshal(incomingMsg, &in); err != nil {
return nil, false, &invalidMessageError{err.Error()} return nil, false, &invalidMessageError{err.Error()}
} }
@ -197,8 +202,8 @@ func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) {
// parseBatchRequest will parse a batch request into a collection of requests from the given RawMessage, an indication // parseBatchRequest will parse a batch request into a collection of requests from the given RawMessage, an indication
// if the request was a batch or an error when the request could not be read. // if the request was a batch or an error when the request could not be read.
func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) { func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error) {
var in []JSONRequest var in []jsonRequest
if err := json.Unmarshal(incomingMsg, &in); err != nil { if err := json.Unmarshal(incomingMsg, &in); err != nil {
return nil, false, &invalidMessageError{err.Error()} return nil, false, &invalidMessageError{err.Error()}
} }
@ -253,7 +258,7 @@ func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCErro
// ParseRequestArguments tries to parse the given params (json.RawMessage) with the given types. It returns the parsed // ParseRequestArguments tries to parse the given params (json.RawMessage) with the given types. It returns the parsed
// values or an error when the parsing failed. // values or an error when the parsing failed.
func (c *jsonCodec) ParseRequestArguments(argTypes []reflect.Type, params interface{}) ([]reflect.Value, RPCError) { func (c *jsonCodec) ParseRequestArguments(argTypes []reflect.Type, params interface{}) ([]reflect.Value, Error) {
if args, ok := params.(json.RawMessage); !ok { if args, ok := params.(json.RawMessage); !ok {
return nil, &invalidParamsError{"Invalid params supplied"} return nil, &invalidParamsError{"Invalid params supplied"}
} else { } else {
@ -264,7 +269,7 @@ func (c *jsonCodec) ParseRequestArguments(argTypes []reflect.Type, params interf
// parsePositionalArguments tries to parse the given args to an array of values with the given types. // parsePositionalArguments tries to parse the given args to an array of values with the given types.
// It returns the parsed values or an error when the args could not be parsed. Missing optional arguments // It returns the parsed values or an error when the args could not be parsed. Missing optional arguments
// are returned as reflect.Zero values. // are returned as reflect.Zero values.
func parsePositionalArguments(args json.RawMessage, callbackArgs []reflect.Type) ([]reflect.Value, RPCError) { func parsePositionalArguments(args json.RawMessage, callbackArgs []reflect.Type) ([]reflect.Value, Error) {
params := make([]interface{}, 0, len(callbackArgs)) params := make([]interface{}, 0, len(callbackArgs))
for _, t := range callbackArgs { for _, t := range callbackArgs {
params = append(params, reflect.New(t).Interface()) params = append(params, reflect.New(t).Interface())
@ -302,31 +307,31 @@ func parsePositionalArguments(args json.RawMessage, callbackArgs []reflect.Type)
// CreateResponse will create a JSON-RPC success response with the given id and reply as result. // CreateResponse will create a JSON-RPC success response with the given id and reply as result.
func (c *jsonCodec) CreateResponse(id interface{}, reply interface{}) interface{} { func (c *jsonCodec) CreateResponse(id interface{}, reply interface{}) interface{} {
if isHexNum(reflect.TypeOf(reply)) { if isHexNum(reflect.TypeOf(reply)) {
return &JSONSuccessResponse{Version: JSONRPCVersion, Id: id, Result: fmt.Sprintf(`%#x`, reply)} return &jsonSuccessResponse{Version: jsonrpcVersion, Id: id, Result: fmt.Sprintf(`%#x`, reply)}
} }
return &JSONSuccessResponse{Version: JSONRPCVersion, Id: id, Result: reply} return &jsonSuccessResponse{Version: jsonrpcVersion, Id: id, Result: reply}
} }
// CreateErrorResponse will create a JSON-RPC error response with the given id and error. // CreateErrorResponse will create a JSON-RPC error response with the given id and error.
func (c *jsonCodec) CreateErrorResponse(id interface{}, err RPCError) interface{} { func (c *jsonCodec) CreateErrorResponse(id interface{}, err Error) interface{} {
return &JSONErrResponse{Version: JSONRPCVersion, Id: id, Error: JSONError{Code: err.Code(), Message: err.Error()}} return &jsonErrResponse{Version: jsonrpcVersion, Id: id, Error: jsonError{Code: err.ErrorCode(), Message: err.Error()}}
} }
// CreateErrorResponseWithInfo will create a JSON-RPC error response with the given id and error. // CreateErrorResponseWithInfo will create a JSON-RPC error response with the given id and error.
// info is optional and contains additional information about the error. When an empty string is passed it is ignored. // info is optional and contains additional information about the error. When an empty string is passed it is ignored.
func (c *jsonCodec) CreateErrorResponseWithInfo(id interface{}, err RPCError, info interface{}) interface{} { func (c *jsonCodec) CreateErrorResponseWithInfo(id interface{}, err Error, info interface{}) interface{} {
return &JSONErrResponse{Version: JSONRPCVersion, Id: id, return &jsonErrResponse{Version: jsonrpcVersion, Id: id,
Error: JSONError{Code: err.Code(), Message: err.Error(), Data: info}} Error: jsonError{Code: err.ErrorCode(), Message: err.Error(), Data: info}}
} }
// CreateNotification will create a JSON-RPC notification with the given subscription id and event as params. // CreateNotification will create a JSON-RPC notification with the given subscription id and event as params.
func (c *jsonCodec) CreateNotification(subid string, event interface{}) interface{} { func (c *jsonCodec) CreateNotification(subid string, event interface{}) interface{} {
if isHexNum(reflect.TypeOf(event)) { if isHexNum(reflect.TypeOf(event)) {
return &jsonNotification{Version: JSONRPCVersion, Method: notificationMethod, return &jsonNotification{Version: jsonrpcVersion, Method: notificationMethod,
Params: jsonSubscription{Subscription: subid, Result: fmt.Sprintf(`%#x`, event)}} Params: jsonSubscription{Subscription: subid, Result: fmt.Sprintf(`%#x`, event)}}
} }
return &jsonNotification{Version: JSONRPCVersion, Method: notificationMethod, return &jsonNotification{Version: jsonrpcVersion, Method: notificationMethod,
Params: jsonSubscription{Subscription: subid, Result: event}} Params: jsonSubscription{Subscription: subid, Result: event}}
} }

View File

@ -28,7 +28,7 @@ import (
var ( var (
// ErrNotificationsUnsupported is returned when the connection doesn't support notifications // ErrNotificationsUnsupported is returned when the connection doesn't support notifications
ErrNotificationsUnsupported = errors.New("notifications not supported") ErrNotificationsUnsupported = errors.New("subscription notifications not supported by the current transport")
// ErrNotificationNotFound is returned when the notification for the given id is not found // ErrNotificationNotFound is returned when the notification for the given id is not found
ErrNotificationNotFound = errors.New("notification not found") ErrNotificationNotFound = errors.New("notification not found")

View File

@ -19,20 +19,31 @@ package rpc
import ( import (
"encoding/json" "encoding/json"
"net" "net"
"sync"
"testing" "testing"
"time" "time"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
type NotificationTestService struct{} type NotificationTestService struct {
mu sync.Mutex
unsubscribed bool
var ( gotHangSubscriptionReq chan struct{}
unsubCallbackCalled = false unblockHangSubscription chan struct{}
) }
func (s *NotificationTestService) wasUnsubCallbackCalled() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.unsubscribed
}
func (s *NotificationTestService) Unsubscribe(subid string) { func (s *NotificationTestService) Unsubscribe(subid string) {
unsubCallbackCalled = true s.mu.Lock()
s.unsubscribed = true
s.mu.Unlock()
} }
func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (Subscription, error) { func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (Subscription, error) {
@ -60,6 +71,26 @@ func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val i
return subscription, nil return subscription, nil
} }
// HangSubscription blocks on s.unblockHangSubscription before
// sending anything.
func (s *NotificationTestService) HangSubscription(ctx context.Context, val int) (Subscription, error) {
notifier, supported := NotifierFromContext(ctx)
if !supported {
return nil, ErrNotificationsUnsupported
}
s.gotHangSubscriptionReq <- struct{}{}
<-s.unblockHangSubscription
subscription, err := notifier.NewSubscription(s.Unsubscribe)
if err != nil {
return nil, err
}
go func() {
subscription.Notify(val)
}()
return subscription, nil
}
func TestNotifications(t *testing.T) { func TestNotifications(t *testing.T) {
server := NewServer() server := NewServer()
service := &NotificationTestService{} service := &NotificationTestService{}
@ -90,7 +121,7 @@ func TestNotifications(t *testing.T) {
} }
var subid string var subid string
response := JSONSuccessResponse{Result: subid} response := jsonSuccessResponse{Result: subid}
if err := in.Decode(&response); err != nil { if err := in.Decode(&response); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -114,7 +145,7 @@ func TestNotifications(t *testing.T) {
clientConn.Close() // causes notification unsubscribe callback to be called clientConn.Close() // causes notification unsubscribe callback to be called
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
if !unsubCallbackCalled { if !service.wasUnsubCallbackCalled() {
t.Error("unsubscribe callback not called after closing connection") t.Error("unsubscribe callback not called after closing connection")
} }
} }

View File

@ -381,7 +381,7 @@ func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*s
// readRequest requests the next (batch) request from the codec. It will return the collection // readRequest requests the next (batch) request from the codec. It will return the collection
// of requests, an indication if the request was a batch, the invalid request identifier and an // of requests, an indication if the request was a batch, the invalid request identifier and an
// error when the request could not be read/parsed. // error when the request could not be read/parsed.
func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCError) { func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) {
reqs, batch, err := codec.ReadRequestHeaders() reqs, batch, err := codec.ReadRequestHeaders()
if err != nil { if err != nil {
return nil, batch, err return nil, batch, err

View File

@ -21,6 +21,7 @@ import (
"net" "net"
"reflect" "reflect"
"testing" "testing"
"time"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
@ -48,6 +49,13 @@ func (s *Service) EchoWithCtx(ctx context.Context, str string, i int, args *Args
return Result{str, i, args} return Result{str, i, args}
} }
func (s *Service) Sleep(ctx context.Context, duration time.Duration) {
select {
case <-time.After(duration):
case <-ctx.Done():
}
}
func (s *Service) Rets() (string, error) { func (s *Service) Rets() (string, error) {
return "", nil return "", nil
} }
@ -85,8 +93,8 @@ func TestServerRegisterName(t *testing.T) {
t.Fatalf("Expected service calc to be registered") t.Fatalf("Expected service calc to be registered")
} }
if len(svc.callbacks) != 4 { if len(svc.callbacks) != 5 {
t.Errorf("Expected 4 callbacks for service 'calc', got %d", len(svc.callbacks)) t.Errorf("Expected 5 callbacks for service 'calc', got %d", len(svc.callbacks))
} }
if len(svc.subscriptions) != 1 { if len(svc.subscriptions) != 1 {
@ -126,7 +134,7 @@ func testServerMethodExecution(t *testing.T, method string) {
t.Fatal(err) t.Fatal(err)
} }
response := JSONSuccessResponse{Result: &Result{}} response := jsonSuccessResponse{Result: &Result{}}
if err := in.Decode(&response); err != nil { if err := in.Decode(&response); err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -62,7 +62,7 @@ type serverRequest struct {
callb *callback callb *callback
args []reflect.Value args []reflect.Value
isUnsubscribe bool isUnsubscribe bool
err RPCError err Error
} }
type serviceRegistry map[string]*service // collection of services type serviceRegistry map[string]*service // collection of services
@ -88,15 +88,13 @@ type rpcRequest struct {
id interface{} id interface{}
isPubSub bool isPubSub bool
params interface{} params interface{}
err RPCError // invalid batch element err Error // invalid batch element
} }
// RPCError implements RPC error, is add support for error codec over regular go errors // Error wraps RPC errors, which contain an error code in addition to the message.
type RPCError interface { type Error interface {
// RPC error code Error() string // returns the message
Code() int ErrorCode() int // returns the code
// Error message
Error() string
} }
// ServerCodec implements reading, parsing and writing RPC messages for the server side of // ServerCodec implements reading, parsing and writing RPC messages for the server side of
@ -104,15 +102,15 @@ type RPCError interface {
// multiple go-routines concurrently. // multiple go-routines concurrently.
type ServerCodec interface { type ServerCodec interface {
// Read next request // Read next request
ReadRequestHeaders() ([]rpcRequest, bool, RPCError) ReadRequestHeaders() ([]rpcRequest, bool, Error)
// Parse request argument to the given types // Parse request argument to the given types
ParseRequestArguments([]reflect.Type, interface{}) ([]reflect.Value, RPCError) ParseRequestArguments([]reflect.Type, interface{}) ([]reflect.Value, Error)
// Assemble success response, expects response id and payload // Assemble success response, expects response id and payload
CreateResponse(interface{}, interface{}) interface{} CreateResponse(interface{}, interface{}) interface{}
// Assemble error response, expects response id and error // Assemble error response, expects response id and error
CreateErrorResponse(interface{}, RPCError) interface{} CreateErrorResponse(interface{}, Error) interface{}
// Assemble error response with extra information about the error through info // Assemble error response with extra information about the error through info
CreateErrorResponseWithInfo(id interface{}, err RPCError, info interface{}) interface{} CreateErrorResponseWithInfo(id interface{}, err Error, info interface{}) interface{}
// Create notification response // Create notification response
CreateNotification(string, interface{}) interface{} CreateNotification(string, interface{}) interface{}
// Write msg to client. // Write msg to client.
@ -274,14 +272,3 @@ func (bn *BlockNumber) UnmarshalJSON(data []byte) error {
func (bn *BlockNumber) Int64() int64 { func (bn *BlockNumber) Int64() int64 {
return (int64)(*bn) return (int64)(*bn)
} }
// Client defines the interface for go client that wants to connect to a geth RPC endpoint
type Client interface {
// SupportedModules returns the collection of API's the server offers
SupportedModules() (map[string]string, error)
Send(req interface{}) error
Recv(msg interface{}) error
Close()
}

View File

@ -20,7 +20,6 @@ import (
"crypto/rand" "crypto/rand"
"encoding/hex" "encoding/hex"
"errors" "errors"
"fmt"
"math/big" "math/big"
"reflect" "reflect"
"unicode" "unicode"
@ -227,31 +226,3 @@ func newSubscriptionID() (string, error) {
} }
return "0x" + hex.EncodeToString(subid[:]), nil return "0x" + hex.EncodeToString(subid[:]), nil
} }
// SupportedModules returns the collection of API's that the RPC server offers
// on which the given client connects.
func SupportedModules(client Client) (map[string]string, error) {
req := JSONRequest{
Id: []byte("1"),
Version: "2.0",
Method: MetadataApi + "_modules",
}
if err := client.Send(req); err != nil {
return nil, err
}
var response JSONSuccessResponse
if err := client.Recv(&response); err != nil {
return nil, err
}
if response.Result != nil {
mods := make(map[string]string)
if modules, ok := response.Result.(map[string]interface{}); ok {
for m, v := range modules {
mods[m] = fmt.Sprintf("%s", v)
}
return mods, nil
}
}
return nil, fmt.Errorf("unable to retrieve modules")
}

View File

@ -17,36 +17,39 @@
package rpc package rpc
import ( import (
"crypto/tls"
"fmt" "fmt"
"net"
"net/http" "net/http"
"net/url"
"os" "os"
"strings" "strings"
"sync"
"github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/logger/glog"
"golang.org/x/net/context"
"golang.org/x/net/websocket" "golang.org/x/net/websocket"
"gopkg.in/fatih/set.v0" "gopkg.in/fatih/set.v0"
) )
// wsReaderWriterCloser reads and write payloads from and to a websocket connection. // WebsocketHandler returns a handler that serves JSON-RPC to WebSocket connections.
type wsReaderWriterCloser struct { //
c *websocket.Conn // allowedOrigins should be a comma-separated list of allowed origin URLs.
// To allow connections with any origin, pass "*".
func (srv *Server) WebsocketHandler(allowedOrigins string) http.Handler {
return websocket.Server{
Handshake: wsHandshakeValidator(strings.Split(allowedOrigins, ",")),
Handler: func(conn *websocket.Conn) {
srv.ServeCodec(NewJSONCodec(conn), OptionMethodInvocation|OptionSubscriptions)
},
}
} }
// Read will read incoming payload data into p. // NewWSServer creates a new websocket RPC server around an API provider.
func (rw *wsReaderWriterCloser) Read(p []byte) (int, error) { //
return rw.c.Read(p) // Deprecated: use Server.WebsocketHandler
} func NewWSServer(allowedOrigins string, srv *Server) *http.Server {
return &http.Server{Handler: srv.WebsocketHandler(allowedOrigins)}
// Write writes p to the websocket.
func (rw *wsReaderWriterCloser) Write(p []byte) (int, error) {
return rw.c.Write(p)
}
// Close closes the websocket connection.
func (rw *wsReaderWriterCloser) Close() error {
return rw.c.Close()
} }
// wsHandshakeValidator returns a handler that verifies the origin during the // wsHandshakeValidator returns a handler that verifies the origin during the
@ -87,96 +90,63 @@ func wsHandshakeValidator(allowedOrigins []string) func(*websocket.Config, *http
return f return f
} }
// NewWSServer creates a new websocket RPC server around an API provider. // DialWebsocket creates a new RPC client that communicates with a JSON-RPC server
func NewWSServer(allowedOrigins string, handler *Server) *http.Server { // that is listening on the given endpoint.
return &http.Server{ //
Handler: websocket.Server{ // The context is used for the initial connection establishment. It does not
Handshake: wsHandshakeValidator(strings.Split(allowedOrigins, ",")), // affect subsequent interactions with the client.
Handler: func(conn *websocket.Conn) { func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error) {
handler.ServeCodec(NewJSONCodec(&wsReaderWriterCloser{conn}), if origin == "" {
OptionMethodInvocation|OptionSubscriptions) var err error
}, if origin, err = os.Hostname(); err != nil {
}, return nil, err
}
if strings.HasPrefix(endpoint, "wss") {
origin = "https://" + strings.ToLower(origin)
} else {
origin = "http://" + strings.ToLower(origin)
} }
} }
config, err := websocket.NewConfig(endpoint, origin)
// wsClient represents a RPC client that communicates over websockets with a
// RPC server.
type wsClient struct {
endpoint string
connMu sync.Mutex
conn *websocket.Conn
}
// NewWSClientj creates a new RPC client that communicates with a RPC server
// that is listening on the given endpoint using JSON encoding.
func NewWSClient(endpoint string) (Client, error) {
return &wsClient{endpoint: endpoint}, nil
}
// connection will return a websocket connection to the RPC server. It will
// (re)connect when necessary.
func (client *wsClient) connection() (*websocket.Conn, error) {
if client.conn != nil {
return client.conn, nil
}
origin, err := os.Hostname()
if err != nil { if err != nil {
return nil, err return nil, err
} }
origin = "http://" + origin return newClient(ctx, func(ctx context.Context) (net.Conn, error) {
client.conn, err = websocket.Dial(client.endpoint, "", origin) return wsDialContext(ctx, config)
})
return client.conn, err
} }
// SupportedModules is the collection of modules the RPC server offers. func wsDialContext(ctx context.Context, config *websocket.Config) (*websocket.Conn, error) {
func (client *wsClient) SupportedModules() (map[string]string, error) { var conn net.Conn
return SupportedModules(client) var err error
switch config.Location.Scheme {
case "ws":
conn, err = dialContext(ctx, "tcp", wsDialAddress(config.Location))
case "wss":
dialer := contextDialer(ctx)
conn, err = tls.DialWithDialer(dialer, "tcp", wsDialAddress(config.Location), config.TlsConfig)
default:
err = websocket.ErrBadScheme
}
if err != nil {
return nil, err
}
ws, err := websocket.NewClient(config, conn)
if err != nil {
conn.Close()
return nil, err
}
return ws, err
} }
// Send writes the JSON serialized msg to the websocket. It will create a new var wsPortMap = map[string]string{"ws": "80", "wss": "443"}
// websocket connection to the server if the client is currently not connected.
func (client *wsClient) Send(msg interface{}) (err error) {
client.connMu.Lock()
defer client.connMu.Unlock()
var conn *websocket.Conn func wsDialAddress(location *url.URL) string {
if conn, err = client.connection(); err == nil { if _, ok := wsPortMap[location.Scheme]; ok {
if err = websocket.JSON.Send(conn, msg); err != nil { if _, _, err := net.SplitHostPort(location.Host); err != nil {
client.conn.Close() return net.JoinHostPort(location.Host, wsPortMap[location.Scheme])
client.conn = nil
} }
} }
return location.Host
return err
}
// Recv reads a JSON message from the websocket and unmarshals it into msg.
func (client *wsClient) Recv(msg interface{}) (err error) {
client.connMu.Lock()
defer client.connMu.Unlock()
var conn *websocket.Conn
if conn, err = client.connection(); err == nil {
if err = websocket.JSON.Receive(conn, msg); err != nil {
client.conn.Close()
client.conn = nil
}
}
return
}
// Close closes the underlaying websocket connection.
func (client *wsClient) Close() {
client.connMu.Lock()
defer client.connMu.Unlock()
if client.conn != nil {
client.conn.Close()
client.conn = nil
}
} }