From c145589f25445b9fb78e4556b17cde29df5b37bb Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 12 Jul 2016 17:34:59 +0200 Subject: [PATCH 1/3] rpc: remove grace period when shutting down the server The server delayed closing of connections for 3s when stopping. This was supposed to allow for slow handlers, but it didn't really work. When geth quits, it will just exit immediately after quitting the server. Removing the timer makes testing easier because all connections will be closed after Stop returns. --- rpc/server.go | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/rpc/server.go b/rpc/server.go index 7b7d22063..80976ed62 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -21,7 +21,6 @@ import ( "reflect" "runtime" "sync/atomic" - "time" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" @@ -30,8 +29,6 @@ import ( ) const ( - stopPendingRequestTimeout = 3 * time.Second // give pending requests stopPendingRequestTimeout the time to finish when the server is stopped - notificationBufferSize = 10000 // max buffered notifications before codec is closed MetadataApi = "rpc" @@ -240,13 +237,11 @@ func (s *Server) ServeSingleRequest(codec ServerCodec, options CodecOption) { func (s *Server) Stop() { if atomic.CompareAndSwapInt32(&s.run, 1, 0) { glog.V(logger.Debug).Infoln("RPC Server shutdown initiatied") - time.AfterFunc(stopPendingRequestTimeout, func() { - s.codecsMu.Lock() - defer s.codecsMu.Unlock() - s.codecs.Each(func(c interface{}) bool { - c.(ServerCodec).Close() - return true - }) + s.codecsMu.Lock() + defer s.codecsMu.Unlock() + s.codecs.Each(func(c interface{}) bool { + c.(ServerCodec).Close() + return true }) } } From bb01bea4e276dad359815c682a2dee730737f4dc Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 12 Jul 2016 17:42:44 +0200 Subject: [PATCH 2/3] rpc: fix bad method error for batch requests If a batch request contained an invalid method, the server would reply with a non-batch error response. Fix this by tracking an error for each batch element. --- rpc/json.go | 16 ++++++++-------- rpc/server.go | 7 ++++++- rpc/types.go | 1 + 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/rpc/json.go b/rpc/json.go index 151ed546e..ee931bc87 100644 --- a/rpc/json.go +++ b/rpc/json.go @@ -182,12 +182,12 @@ func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) { method: unsubscribeMethod, params: in.Payload}}, false, nil } - // regular RPC call elems := strings.Split(in.Method, serviceMethodSeparator) if len(elems) != 2 { return nil, false, &methodNotFoundError{in.Method, ""} } + // regular RPC call if len(in.Payload) == 0 { return []rpcRequest{rpcRequest{service: elems[0], method: elems[1], id: &in.Id}}, false, nil } @@ -236,15 +236,15 @@ func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCErro continue } - elems := strings.Split(r.Method, serviceMethodSeparator) - if len(elems) != 2 { - return nil, true, &methodNotFoundError{r.Method, ""} - } - if len(r.Payload) == 0 { - requests[i] = rpcRequest{service: elems[0], method: elems[1], id: id, params: nil} + requests[i] = rpcRequest{id: id, params: nil} } else { - requests[i] = rpcRequest{service: elems[0], method: elems[1], id: id, params: r.Payload} + requests[i] = rpcRequest{id: id, params: r.Payload} + } + if elem := strings.Split(r.Method, serviceMethodSeparator); len(elem) == 2 { + requests[i].service, requests[i].method = elem[0], elem[1] + } else { + requests[i].err = &methodNotFoundError{r.Method, ""} } } diff --git a/rpc/server.go b/rpc/server.go index 80976ed62..a9bdef285 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -180,7 +180,7 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO for atomic.LoadInt32(&s.run) == 1 { reqs, batch, err := s.readRequest(codec) if err != nil { - glog.V(logger.Debug).Infof("%v\n", err) + glog.V(logger.Debug).Infof("read error %v\n", err) codec.Write(codec.CreateErrorResponse(nil, err)) return nil } @@ -394,6 +394,11 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCErro var ok bool var svc *service + if r.err != nil { + requests[i] = &serverRequest{id: r.id, err: r.err} + continue + } + if r.isPubSub && r.method == unsubscribeMethod { requests[i] = &serverRequest{id: r.id, isUnsubscribe: true} argTypes := []reflect.Type{reflect.TypeOf("")} // expect subscription id as first arg diff --git a/rpc/types.go b/rpc/types.go index a1f36fbd2..460581715 100644 --- a/rpc/types.go +++ b/rpc/types.go @@ -88,6 +88,7 @@ type rpcRequest struct { id interface{} isPubSub bool params interface{} + err RPCError // invalid batch element } // RPCError implements RPC error, is add support for error codec over regular go errors From 91b769042857f542b2792b23ec407e1c9bd4fe8d Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 12 Jul 2016 17:47:15 +0200 Subject: [PATCH 3/3] rpc: add new client, use it everywhere The new client implementation supports concurrent requests, subscriptions and replaces the various ad hoc RPC clients throughout go-ethereum. --- accounts/abi/bind/backends/remote.go | 160 +----- cmd/geth/consolecmd.go | 19 +- cmd/geth/monitorcmd.go | 39 +- cmd/utils/client.go | 55 -- console/bridge.go | 176 +++---- console/console.go | 4 +- internal/jsre/pretty.go | 2 +- node/node.go | 6 +- node/node_test.go | 34 +- rpc/client.go | 740 +++++++++++++++++++++++++++ rpc/client_context_go1.4.go | 60 +++ rpc/client_context_go1.5.go | 61 +++ rpc/client_context_go1.6.go | 55 ++ rpc/client_context_go1.7.go | 51 ++ rpc/client_example_test.go | 83 +++ rpc/client_test.go | 489 ++++++++++++++++++ rpc/errors.go | 63 +-- rpc/http.go | 167 +++--- rpc/inproc.go | 49 +- rpc/ipc.go | 79 +-- rpc/ipc_unix.go | 6 +- rpc/ipc_windows.go | 15 +- rpc/json.go | 61 ++- rpc/notification.go | 2 +- rpc/notification_test.go | 45 +- rpc/server.go | 2 +- rpc/server_test.go | 14 +- rpc/types.go | 33 +- rpc/utils.go | 29 -- rpc/websocket.go | 164 +++--- 30 files changed, 2007 insertions(+), 756 deletions(-) delete mode 100644 cmd/utils/client.go create mode 100644 rpc/client.go create mode 100644 rpc/client_context_go1.4.go create mode 100644 rpc/client_context_go1.5.go create mode 100644 rpc/client_context_go1.6.go create mode 100644 rpc/client_context_go1.7.go create mode 100644 rpc/client_example_test.go create mode 100644 rpc/client_test.go diff --git a/accounts/abi/bind/backends/remote.go b/accounts/abi/bind/backends/remote.go index 4793143e4..58edd791a 100644 --- a/accounts/abi/bind/backends/remote.go +++ b/accounts/abi/bind/backends/remote.go @@ -17,11 +17,7 @@ package backends import ( - "encoding/json" - "fmt" "math/big" - "sync" - "sync/atomic" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" @@ -37,119 +33,34 @@ var _ bind.ContractBackend = (*rpcBackend)(nil) // rpcBackend implements bind.ContractBackend, and acts as the data provider to // Ethereum contracts bound to Go structs. It uses an RPC connection to delegate // all its functionality. -// -// Note: The current implementation is a blocking one. This should be replaced -// by a proper async version when a real RPC client is created. type rpcBackend struct { - client rpc.Client // RPC client connection to interact with an API server - autoid uint32 // ID number to use for the next API request - lock sync.Mutex // Singleton access until we get to request multiplexing + client *rpc.Client // RPC client connection to interact with an API server } // NewRPCBackend creates a new binding backend to an RPC provider that can be // used to interact with remote contracts. -func NewRPCBackend(client rpc.Client) bind.ContractBackend { - return &rpcBackend{ - client: client, - } -} - -// request is a JSON RPC request package assembled internally from the client -// method calls. -type request struct { - JSONRPC string `json:"jsonrpc"` // Version of the JSON RPC protocol, always set to 2.0 - ID int `json:"id"` // Auto incrementing ID number for this request - Method string `json:"method"` // Remote procedure name to invoke on the server - Params []interface{} `json:"params"` // List of parameters to pass through (keep types simple) -} - -// response is a JSON RPC response package sent back from the API server. -type response struct { - JSONRPC string `json:"jsonrpc"` // Version of the JSON RPC protocol, always set to 2.0 - ID int `json:"id"` // Auto incrementing ID number for this request - Error *failure `json:"error"` // Any error returned by the remote side - Result json.RawMessage `json:"result"` // Whatever the remote side sends us in reply -} - -// failure is a JSON RPC response error field sent back from the API server. -type failure struct { - Code int `json:"code"` // JSON RPC error code associated with the failure - Message string `json:"message"` // Specific error message of the failure -} - -// request forwards an API request to the RPC server, and parses the response. -// -// This is currently painfully non-concurrent, but it will have to do until we -// find the time for niceties like this :P -func (b *rpcBackend) request(ctx context.Context, method string, params []interface{}) (json.RawMessage, error) { - b.lock.Lock() - defer b.lock.Unlock() - - if ctx == nil { - ctx = context.Background() - } - - // Ugly hack to serialize an empty list properly - if params == nil { - params = []interface{}{} - } - // Assemble the request object - reqID := int(atomic.AddUint32(&b.autoid, 1)) - req := &request{ - JSONRPC: "2.0", - ID: reqID, - Method: method, - Params: params, - } - if err := b.client.Send(req); err != nil { - return nil, err - } - res := new(response) - errc := make(chan error, 1) - go func() { - errc <- b.client.Recv(res) - }() - select { - case err := <-errc: - if err != nil { - return nil, err - } - case <-ctx.Done(): - return nil, ctx.Err() - } - if res.Error != nil { - if res.Error.Message == bind.ErrNoCode.Error() { - return nil, bind.ErrNoCode - } - return nil, fmt.Errorf("remote error: %s", res.Error.Message) - } - return res.Result, nil +func NewRPCBackend(client *rpc.Client) bind.ContractBackend { + return &rpcBackend{client: client} } // HasCode implements ContractVerifier.HasCode by retrieving any code associated // with the contract from the remote node, and checking its size. func (b *rpcBackend) HasCode(ctx context.Context, contract common.Address, pending bool) (bool, error) { - // Execute the RPC code retrieval block := "latest" if pending { block = "pending" } - res, err := b.request(ctx, "eth_getCode", []interface{}{contract.Hex(), block}) + var hex string + err := b.client.CallContext(ctx, &hex, "eth_getCode", contract, block) if err != nil { return false, err } - var hex string - if err := json.Unmarshal(res, &hex); err != nil { - return false, err - } - // Convert the response back to a Go byte slice and return return len(common.FromHex(hex)) > 0, nil } // ContractCall implements ContractCaller.ContractCall, delegating the execution of // a contract call to the remote node, returning the reply to for local processing. func (b *rpcBackend) ContractCall(ctx context.Context, contract common.Address, data []byte, pending bool) ([]byte, error) { - // Pack up the request into an RPC argument args := struct { To common.Address `json:"to"` Data string `json:"data"` @@ -157,63 +68,43 @@ func (b *rpcBackend) ContractCall(ctx context.Context, contract common.Address, To: contract, Data: common.ToHex(data), } - // Execute the RPC call and retrieve the response block := "latest" if pending { block = "pending" } - res, err := b.request(ctx, "eth_call", []interface{}{args, block}) + var hex string + err := b.client.CallContext(ctx, &hex, "eth_call", args, block) if err != nil { return nil, err } - var hex string - if err := json.Unmarshal(res, &hex); err != nil { - return nil, err - } - // Convert the response back to a Go byte slice and return return common.FromHex(hex), nil + } // PendingAccountNonce implements ContractTransactor.PendingAccountNonce, delegating // the current account nonce retrieval to the remote node. func (b *rpcBackend) PendingAccountNonce(ctx context.Context, account common.Address) (uint64, error) { - res, err := b.request(ctx, "eth_getTransactionCount", []interface{}{account.Hex(), "pending"}) + var hex rpc.HexNumber + err := b.client.CallContext(ctx, &hex, "eth_getTransactionCount", account.Hex(), "pending") if err != nil { return 0, err } - var hex string - if err := json.Unmarshal(res, &hex); err != nil { - return 0, err - } - nonce, ok := new(big.Int).SetString(hex, 0) - if !ok { - return 0, fmt.Errorf("invalid nonce hex: %s", hex) - } - return nonce.Uint64(), nil + return hex.Uint64(), nil } // SuggestGasPrice implements ContractTransactor.SuggestGasPrice, delegating the // gas price oracle request to the remote node. func (b *rpcBackend) SuggestGasPrice(ctx context.Context) (*big.Int, error) { - res, err := b.request(ctx, "eth_gasPrice", nil) - if err != nil { + var hex rpc.HexNumber + if err := b.client.CallContext(ctx, &hex, "eth_gasPrice"); err != nil { return nil, err } - var hex string - if err := json.Unmarshal(res, &hex); err != nil { - return nil, err - } - price, ok := new(big.Int).SetString(hex, 0) - if !ok { - return nil, fmt.Errorf("invalid price hex: %s", hex) - } - return price, nil + return (*big.Int)(&hex), nil } // EstimateGasLimit implements ContractTransactor.EstimateGasLimit, delegating // the gas estimation to the remote node. func (b *rpcBackend) EstimateGasLimit(ctx context.Context, sender common.Address, contract *common.Address, value *big.Int, data []byte) (*big.Int, error) { - // Pack up the request into an RPC argument args := struct { From common.Address `json:"from"` To *common.Address `json:"to"` @@ -226,19 +117,12 @@ func (b *rpcBackend) EstimateGasLimit(ctx context.Context, sender common.Address Value: rpc.NewHexNumber(value), } // Execute the RPC call and retrieve the response - res, err := b.request(ctx, "eth_estimateGas", []interface{}{args}) + var hex rpc.HexNumber + err := b.client.CallContext(ctx, &hex, "eth_estimateGas", args) if err != nil { return nil, err } - var hex string - if err := json.Unmarshal(res, &hex); err != nil { - return nil, err - } - estimate, ok := new(big.Int).SetString(hex, 0) - if !ok { - return nil, fmt.Errorf("invalid estimate hex: %s", hex) - } - return estimate, nil + return (*big.Int)(&hex), nil } // SendTransaction implements ContractTransactor.SendTransaction, delegating the @@ -248,13 +132,5 @@ func (b *rpcBackend) SendTransaction(ctx context.Context, tx *types.Transaction) if err != nil { return err } - res, err := b.request(ctx, "eth_sendRawTransaction", []interface{}{common.ToHex(data)}) - if err != nil { - return err - } - var hex string - if err := json.Unmarshal(res, &hex); err != nil { - return err - } - return nil + return b.client.CallContext(ctx, nil, "eth_sendRawTransaction", common.ToHex(data)) } diff --git a/cmd/geth/consolecmd.go b/cmd/geth/consolecmd.go index 257050a62..8d53809ce 100644 --- a/cmd/geth/consolecmd.go +++ b/cmd/geth/consolecmd.go @@ -19,9 +19,12 @@ package main import ( "os" "os/signal" + "strings" "github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/console" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/rpc" "gopkg.in/urfave/cli.v1" ) @@ -99,7 +102,7 @@ func localConsole(ctx *cli.Context) error { // console to it. func remoteConsole(ctx *cli.Context) error { // Attach to a remotely running geth instance and start the JavaScript console - client, err := utils.NewRemoteRPCClient(ctx) + client, err := dialRPC(ctx.Args().First()) if err != nil { utils.Fatalf("Unable to attach to remote geth: %v", err) } @@ -127,6 +130,20 @@ func remoteConsole(ctx *cli.Context) error { return nil } +// dialRPC returns a RPC client which connects to the given endpoint. +// The check for empty endpoint implements the defaulting logic +// for "geth attach" and "geth monitor" with no argument. +func dialRPC(endpoint string) (*rpc.Client, error) { + if endpoint == "" { + endpoint = node.DefaultIPCEndpoint() + } else if strings.HasPrefix(endpoint, "rpc:") || strings.HasPrefix(endpoint, "ipc:") { + // Backwards compatibility with geth < 1.5 which required + // these prefixes. + endpoint = endpoint[4:] + } + return rpc.Dial(endpoint) +} + // ephemeralConsole starts a new geth node, attaches an ephemeral JavaScript // console to it, and each of the files specified as arguments and tears the // everything down. diff --git a/cmd/geth/monitorcmd.go b/cmd/geth/monitorcmd.go index 11fdca89c..d1490dce2 100644 --- a/cmd/geth/monitorcmd.go +++ b/cmd/geth/monitorcmd.go @@ -21,11 +21,10 @@ import ( "math" "reflect" "runtime" + "sort" "strings" "time" - "sort" - "github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/rpc" @@ -36,7 +35,7 @@ import ( var ( monitorCommandAttachFlag = cli.StringFlag{ Name: "attach", - Value: "ipc:" + node.DefaultIPCEndpoint(), + Value: node.DefaultIPCEndpoint(), Usage: "API endpoint to attach to", } monitorCommandRowsFlag = cli.IntFlag{ @@ -69,12 +68,12 @@ to display multiple metrics simultaneously. // monitor starts a terminal UI based monitoring tool for the requested metrics. func monitor(ctx *cli.Context) error { var ( - client rpc.Client + client *rpc.Client err error ) // Attach to an Ethereum node over IPC or RPC endpoint := ctx.String(monitorCommandAttachFlag.Name) - if client, err = utils.NewRemoteRPCClientFromString(endpoint); err != nil { + if client, err = dialRPC(endpoint); err != nil { utils.Fatalf("Unable to attach to geth node: %v", err) } defer client.Close() @@ -159,30 +158,10 @@ func monitor(ctx *cli.Context) error { // retrieveMetrics contacts the attached geth node and retrieves the entire set // of collected system metrics. -func retrieveMetrics(client rpc.Client) (map[string]interface{}, error) { - req := map[string]interface{}{ - "id": new(int64), - "method": "debug_metrics", - "jsonrpc": "2.0", - "params": []interface{}{true}, - } - - if err := client.Send(req); err != nil { - return nil, err - } - - var res rpc.JSONSuccessResponse - if err := client.Recv(&res); err != nil { - return nil, err - } - - if res.Result != nil { - if mets, ok := res.Result.(map[string]interface{}); ok { - return mets, nil - } - } - - return nil, fmt.Errorf("unable to retrieve metrics") +func retrieveMetrics(client *rpc.Client) (map[string]interface{}, error) { + var metrics map[string]interface{} + err := client.Call(&metrics, "debug_metrics", true) + return metrics, err } // resolveMetrics takes a list of input metric patterns, and resolves each to one @@ -270,7 +249,7 @@ func fetchMetric(metrics map[string]interface{}, metric string) float64 { // refreshCharts retrieves a next batch of metrics, and inserts all the new // values into the active datasets and charts -func refreshCharts(client rpc.Client, metrics []string, data [][]float64, units []int, charts []*termui.LineChart, ctx *cli.Context, footer *termui.Par) (realign bool) { +func refreshCharts(client *rpc.Client, metrics []string, data [][]float64, units []int, charts []*termui.LineChart, ctx *cli.Context, footer *termui.Par) (realign bool) { values, err := retrieveMetrics(client) for i, metric := range metrics { if len(data) < 512 { diff --git a/cmd/utils/client.go b/cmd/utils/client.go deleted file mode 100644 index cc9647580..000000000 --- a/cmd/utils/client.go +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright 2015 The go-ethereum Authors -// This file is part of go-ethereum. -// -// go-ethereum is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// go-ethereum is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with go-ethereum. If not, see . - -package utils - -import ( - "fmt" - "strings" - - "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/rpc" - "gopkg.in/urfave/cli.v1" -) - -// NewRemoteRPCClient returns a RPC client which connects to a running geth instance. -// Depending on the given context this can either be a IPC or a HTTP client. -func NewRemoteRPCClient(ctx *cli.Context) (rpc.Client, error) { - if ctx.Args().Present() { - endpoint := ctx.Args().First() - return NewRemoteRPCClientFromString(endpoint) - } - // use IPC by default - return rpc.NewIPCClient(node.DefaultIPCEndpoint()) -} - -// NewRemoteRPCClientFromString returns a RPC client which connects to the given -// endpoint. It must start with either `ipc:` or `rpc:` (HTTP). -func NewRemoteRPCClientFromString(endpoint string) (rpc.Client, error) { - if strings.HasPrefix(endpoint, "ipc:") { - return rpc.NewIPCClient(endpoint[4:]) - } - if strings.HasPrefix(endpoint, "rpc:") { - return rpc.NewHTTPClient(endpoint[4:]) - } - if strings.HasPrefix(endpoint, "http://") { - return rpc.NewHTTPClient(endpoint) - } - if strings.HasPrefix(endpoint, "ws:") { - return rpc.NewWSClient(endpoint) - } - return nil, fmt.Errorf("invalid endpoint") -} diff --git a/console/bridge.go b/console/bridge.go index b23e06837..06cb41d80 100644 --- a/console/bridge.go +++ b/console/bridge.go @@ -31,13 +31,13 @@ import ( // bridge is a collection of JavaScript utility methods to bride the .js runtime // environment and the Go RPC connection backing the remote method calls. type bridge struct { - client rpc.Client // RPC client to execute Ethereum requests through + client *rpc.Client // RPC client to execute Ethereum requests through prompter UserPrompter // Input prompter to allow interactive user feedback printer io.Writer // Output writer to serialize any display strings to } // newBridge creates a new JavaScript wrapper around an RPC client. -func newBridge(client rpc.Client, prompter UserPrompter, printer io.Writer) *bridge { +func newBridge(client *rpc.Client, prompter UserPrompter, printer io.Writer) *bridge { return &bridge{ client: client, prompter: prompter, @@ -188,88 +188,86 @@ func (b *bridge) SleepBlocks(call otto.FunctionCall) (response otto.Value) { return otto.FalseValue() } -// Send will serialize the first argument, send it to the node and returns the response. +type jsonrpcCall struct { + Id int64 + Method string + Params []interface{} +} + +// Send implements the web3 provider "send" method. func (b *bridge) Send(call otto.FunctionCall) (response otto.Value) { - // Ensure that we've got a batch request (array) or a single request (object) - arg := call.Argument(0).Object() - if arg == nil || (arg.Class() != "Array" && arg.Class() != "Object") { - throwJSException("request must be an object or array") - } - // Convert the otto VM arguments to Go values - data, err := call.Otto.Call("JSON.stringify", nil, arg) + // Remarshal the request into a Go value. + JSON, _ := call.Otto.Object("JSON") + reqVal, err := JSON.Call("stringify", call.Argument(0)) if err != nil { throwJSException(err.Error()) } - reqjson, err := data.ToString() - if err != nil { - throwJSException(err.Error()) - } - var ( - reqs []rpc.JSONRequest - batch = true + rawReq = []byte(reqVal.String()) + reqs []jsonrpcCall + batch bool ) - if err = json.Unmarshal([]byte(reqjson), &reqs); err != nil { - // single request? - reqs = make([]rpc.JSONRequest, 1) - if err = json.Unmarshal([]byte(reqjson), &reqs[0]); err != nil { - throwJSException("invalid request") - } + if rawReq[0] == '[' { + batch = true + json.Unmarshal(rawReq, &reqs) + } else { batch = false + reqs = make([]jsonrpcCall, 1) + json.Unmarshal(rawReq, &reqs[0]) } - // Iteratively execute the requests - call.Otto.Set("response_len", len(reqs)) - call.Otto.Run("var ret_response = new Array(response_len);") - for i, req := range reqs { - // Execute the RPC request and parse the reply - if err = b.client.Send(&req); err != nil { - return newErrorResponse(call, -32603, err.Error(), req.Id) + // Execute the requests. + resps, _ := call.Otto.Object("new Array()") + for _, req := range reqs { + resp, _ := call.Otto.Object(`({"jsonrpc":"2.0"})`) + resp.Set("id", req.Id) + var result json.RawMessage + err = b.client.Call(&result, req.Method, req.Params...) + switch err := err.(type) { + case nil: + if result == nil { + // Special case null because it is decoded as an empty + // raw message for some reason. + resp.Set("result", otto.NullValue()) + } else { + resultVal, err := JSON.Call("parse", string(result)) + if err != nil { + resp = newErrorResponse(call, -32603, err.Error(), &req.Id).Object() + } else { + resp.Set("result", resultVal) + } + } + case rpc.Error: + resp.Set("error", map[string]interface{}{ + "code": err.ErrorCode(), + "message": err.Error(), + }) + default: + resp = newErrorResponse(call, -32603, err.Error(), &req.Id).Object() } - result := make(map[string]interface{}) - if err = b.client.Recv(&result); err != nil { - return newErrorResponse(call, -32603, err.Error(), req.Id) - } - // Feed the reply back into the JavaScript runtime environment - id, _ := result["id"] - jsonver, _ := result["jsonrpc"] + resps.Call("push", resp) + } - call.Otto.Set("ret_id", id) - call.Otto.Set("ret_jsonrpc", jsonver) - call.Otto.Set("response_idx", i) + // Return the responses either to the callback (if supplied) + // or directly as the return value. + if batch { + response = resps.Value() + } else { + response, _ = resps.Get("0") + } + if fn := call.Argument(1).Object(); fn != nil && fn.Class() == "function" { + fn.Call("apply", response) + return otto.UndefinedValue() + } + return response +} - if res, ok := result["result"]; ok { - payload, _ := json.Marshal(res) - call.Otto.Set("ret_result", string(payload)) - response, err = call.Otto.Run(` - ret_response[response_idx] = { jsonrpc: ret_jsonrpc, id: ret_id, result: JSON.parse(ret_result) }; - `) - continue - } - if res, ok := result["error"]; ok { - payload, _ := json.Marshal(res) - call.Otto.Set("ret_result", string(payload)) - response, err = call.Otto.Run(` - ret_response[response_idx] = { jsonrpc: ret_jsonrpc, id: ret_id, error: JSON.parse(ret_result) }; - `) - continue - } - return newErrorResponse(call, -32603, fmt.Sprintf("Invalid response"), new(int64)) - } - // Convert single requests back from batch ones - if !batch { - call.Otto.Run("ret_response = ret_response[0];") - } - // Execute any registered callbacks - if call.Argument(1).IsObject() { - call.Otto.Set("callback", call.Argument(1)) - call.Otto.Run(` - if (Object.prototype.toString.call(callback) == '[object Function]') { - callback(null, ret_response); - } - `) - } - return +func newErrorResponse(call otto.FunctionCall, code int, msg string, id interface{}) otto.Value { + // Bundle the error into a JSON RPC call response + m := map[string]interface{}{"version": "2.0", "id": id, "error": map[string]interface{}{"code": code, msg: msg}} + res, _ := json.Marshal(m) + val, _ := call.Otto.Run("(" + string(res) + ")") + return val } // throwJSException panics on an otto.Value. The Otto VM will recover from the @@ -281,37 +279,3 @@ func throwJSException(msg interface{}) otto.Value { } panic(val) } - -// newErrorResponse creates a JSON RPC error response for a specific request id, -// containing the specified error code and error message. Beside returning the -// error to the caller, it also sets the ret_error and ret_response JavaScript -// variables. -func newErrorResponse(call otto.FunctionCall, code int, msg string, id interface{}) (response otto.Value) { - // Bundle the error into a JSON RPC call response - res := rpc.JSONErrResponse{ - Version: rpc.JSONRPCVersion, - Id: id, - Error: rpc.JSONError{ - Code: code, - Message: msg, - }, - } - // Serialize the error response into JavaScript variables - errObj, err := json.Marshal(res.Error) - if err != nil { - glog.V(logger.Error).Infof("Failed to serialize JSON RPC error: %v", err) - } - resObj, err := json.Marshal(res) - if err != nil { - glog.V(logger.Error).Infof("Failed to serialize JSON RPC error response: %v", err) - } - - if _, err = call.Otto.Run("ret_error = " + string(errObj)); err != nil { - glog.V(logger.Error).Infof("Failed to set `ret_error` to the occurred error: %v", err) - } - resVal, err := call.Otto.Run("ret_response = " + string(resObj)) - if err != nil { - glog.V(logger.Error).Infof("Failed to set `ret_response` to the JSON RPC response: %v", err) - } - return resVal -} diff --git a/console/console.go b/console/console.go index 00d1fea1d..f224f0c2e 100644 --- a/console/console.go +++ b/console/console.go @@ -52,7 +52,7 @@ const DefaultPrompt = "> " type Config struct { DataDir string // Data directory to store the console history at DocRoot string // Filesystem path from where to load JavaScript files from - Client rpc.Client // RPC client to execute Ethereum requests through + Client *rpc.Client // RPC client to execute Ethereum requests through Prompt string // Input prompt prefix string (defaults to DefaultPrompt) Prompter UserPrompter // Input prompter to allow interactive user feedback (defaults to TerminalPrompter) Printer io.Writer // Output writer to serialize any display strings to (defaults to os.Stdout) @@ -63,7 +63,7 @@ type Config struct { // JavaScript console attached to a running node via an external or in-process RPC // client. type Console struct { - client rpc.Client // RPC client to execute Ethereum requests through + client *rpc.Client // RPC client to execute Ethereum requests through jsre *jsre.JSRE // JavaScript runtime environment running the interpreter prompt string // Input prompt prefix string prompter UserPrompter // Input prompter to allow interactive user feedback diff --git a/internal/jsre/pretty.go b/internal/jsre/pretty.go index 30d8660ff..f32e16243 100644 --- a/internal/jsre/pretty.go +++ b/internal/jsre/pretty.go @@ -116,7 +116,7 @@ func (ctx ppctx) printValue(v otto.Value, level int, inArray bool) { func (ctx ppctx) printObject(obj *otto.Object, level int, inArray bool) { switch obj.Class() { - case "Array": + case "Array", "GoArray": lv, _ := obj.Get("length") len, _ := lv.ToInteger() if len == 0 { diff --git a/node/node.go b/node/node.go index 1f517a027..ac8a7e8f0 100644 --- a/node/node.go +++ b/node/node.go @@ -505,16 +505,14 @@ func (n *Node) Restart() error { } // Attach creates an RPC client attached to an in-process API handler. -func (n *Node) Attach() (rpc.Client, error) { +func (n *Node) Attach() (*rpc.Client, error) { n.lock.RLock() defer n.lock.RUnlock() - // Short circuit if the node's not running if n.server == nil { return nil, ErrNodeStopped } - // Otherwise attach to the API and return - return rpc.NewInProcRPCClient(n.inprocHandler), nil + return rpc.DialInProc(n.inprocHandler), nil } // Server retrieves the currently running P2P network layer. This method is meant diff --git a/node/node_test.go b/node/node_test.go index 372fc6b10..d9b26453b 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -507,21 +507,27 @@ func TestAPIGather(t *testing.T) { } // Register a batch of services with some configured APIs calls := make(chan string, 1) - + makeAPI := func(result string) *OneMethodApi { + return &OneMethodApi{fun: func() { calls <- result }} + } services := map[string]struct { APIs []rpc.API Maker InstrumentingWrapper }{ - "Zero APIs": {[]rpc.API{}, InstrumentedServiceMakerA}, - "Single API": {[]rpc.API{ - {"single", "1", &OneMethodApi{fun: func() { calls <- "single.v1" }}, true}, - }, InstrumentedServiceMakerB}, - "Many APIs": {[]rpc.API{ - {"multi", "1", &OneMethodApi{fun: func() { calls <- "multi.v1" }}, true}, - {"multi.v2", "2", &OneMethodApi{fun: func() { calls <- "multi.v2" }}, true}, - {"multi.v2.nested", "2", &OneMethodApi{fun: func() { calls <- "multi.v2.nested" }}, true}, - }, InstrumentedServiceMakerC}, + "Zero APIs": { + []rpc.API{}, InstrumentedServiceMakerA}, + "Single API": { + []rpc.API{ + {Namespace: "single", Version: "1", Service: makeAPI("single.v1"), Public: true}, + }, InstrumentedServiceMakerB}, + "Many APIs": { + []rpc.API{ + {Namespace: "multi", Version: "1", Service: makeAPI("multi.v1"), Public: true}, + {Namespace: "multi.v2", Version: "2", Service: makeAPI("multi.v2"), Public: true}, + {Namespace: "multi.v2.nested", Version: "2", Service: makeAPI("multi.v2.nested"), Public: true}, + }, InstrumentedServiceMakerC}, } + for id, config := range services { config := config constructor := func(*ServiceContext) (Service, error) { @@ -554,12 +560,8 @@ func TestAPIGather(t *testing.T) { {"multi.v2.nested_theOneMethod", "multi.v2.nested"}, } for i, test := range tests { - if err := client.Send(rpc.JSONRequest{Id: []byte("1"), Version: "2.0", Method: test.Method}); err != nil { - t.Fatalf("test %d: failed to send API request: %v", i, err) - } - reply := new(rpc.JSONSuccessResponse) - if err := client.Recv(reply); err != nil { - t.Fatalf("test %d: failed to read API reply: %v", i, err) + if err := client.Call(nil, test.Method); err != nil { + t.Errorf("test %d: API request failed: %v", i, err) } select { case result := <-calls: diff --git a/rpc/client.go b/rpc/client.go new file mode 100644 index 000000000..4ff9a8cb9 --- /dev/null +++ b/rpc/client.go @@ -0,0 +1,740 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "net" + "net/url" + "reflect" + "strconv" + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "golang.org/x/net/context" +) + +var ( + ErrClientQuit = errors.New("client is closed") + ErrNoResult = errors.New("no result in JSON-RPC response") +) + +const ( + clientSubscriptionBuffer = 100 // if exceeded, the client stops reading + tcpKeepAliveInterval = 30 * time.Second + defaultDialTimeout = 10 * time.Second // used when dialing if the context has no deadline + defaultWriteTimeout = 10 * time.Second // used for calls if the context has no deadline + subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls +) + +// BatchElem is an element in a batch request. +type BatchElem struct { + Method string + Args []interface{} + // The result is unmarshaled into this field. Result must be set to a + // non-nil pointer value of the desired type, otherwise the response will be + // discarded. + Result interface{} + // Error is set if the server returns an error for this request, or if + // unmarshaling into Result fails. It is not set for I/O errors. + Error error +} + +// A value of this type can a JSON-RPC request, notification, successful response or +// error response. Which one it is depends on the fields. +type jsonrpcMessage struct { + Version string `json:"jsonrpc"` + ID json.RawMessage `json:"id,omitempty"` + Method string `json:"method,omitempty"` + Params json.RawMessage `json:"params,omitempty"` + Error *jsonError `json:"error,omitempty"` + Result json.RawMessage `json:"result,omitempty"` +} + +func (msg *jsonrpcMessage) isNotification() bool { + return msg.ID == nil && msg.Method != "" +} + +func (msg *jsonrpcMessage) isResponse() bool { + return msg.hasValidID() && msg.Method == "" && len(msg.Params) == 0 +} + +func (msg *jsonrpcMessage) hasValidID() bool { + return len(msg.ID) > 0 && msg.ID[0] != '{' && msg.ID[0] != '[' +} + +func (msg *jsonrpcMessage) String() string { + b, _ := json.Marshal(msg) + return string(b) +} + +// Client represents a connection to an RPC server. +type Client struct { + idCounter uint32 + connectFunc func(ctx context.Context) (net.Conn, error) + isHTTP bool + + // writeConn is only safe to access outside dispatch, with the + // write lock held. The write lock is taken by sending on + // requestOp and released by sending on sendDone. + writeConn net.Conn + + // for dispatch + close chan struct{} + didQuit chan struct{} // closed when client quits + reconnected chan net.Conn // where write/reconnect sends the new connection + readErr chan error // errors from read + readResp chan []*jsonrpcMessage // valid messages from read + requestOp chan *requestOp // for registering response IDs + sendDone chan error // signals write completion, releases write lock + respWait map[string]*requestOp // active requests + subs map[string]*ClientSubscription // active subscriptions +} + +type requestOp struct { + ids []json.RawMessage + err error + resp chan *jsonrpcMessage // receives up to len(ids) responses + sub *ClientSubscription // only set for EthSubscribe requests +} + +func (op *requestOp) wait(ctx context.Context) (*jsonrpcMessage, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case resp := <-op.resp: + return resp, op.err + } +} + +// Dial creates a new client for the given URL. +// +// The currently supported URL schemes are "http", "https", "ws" and "wss". If rawurl is a +// file name with no URL scheme, a local socket connection is established using UNIX +// domain sockets on supported platforms and named pipes on Windows. If you want to +// configure transport options, use DialHTTP, DialWebsocket or DialIPC instead. +// +// For websocket connections, the origin is set to the local host name. +// +// The client reconnects automatically if the connection is lost. +func Dial(rawurl string) (*Client, error) { + return DialContext(context.Background(), rawurl) +} + +// DialContext creates a new RPC client, just like Dial. +// +// The context is used to cancel or time out the initial connection establishment. It does +// not affect subsequent interactions with the client. +func DialContext(ctx context.Context, rawurl string) (*Client, error) { + u, err := url.Parse(rawurl) + if err != nil { + return nil, err + } + switch u.Scheme { + case "http", "https": + return DialHTTP(rawurl) + case "ws", "wss": + return DialWebsocket(ctx, rawurl, "") + case "": + return DialIPC(ctx, rawurl) + default: + return nil, fmt.Errorf("no known transport for URL scheme %q", u.Scheme) + } +} + +func newClient(initctx context.Context, connectFunc func(context.Context) (net.Conn, error)) (*Client, error) { + conn, err := connectFunc(initctx) + if err != nil { + return nil, err + } + _, isHTTP := conn.(*httpConn) + + c := &Client{ + writeConn: conn, + isHTTP: isHTTP, + connectFunc: connectFunc, + close: make(chan struct{}), + didQuit: make(chan struct{}), + reconnected: make(chan net.Conn), + readErr: make(chan error), + readResp: make(chan []*jsonrpcMessage), + requestOp: make(chan *requestOp), + sendDone: make(chan error, 1), + respWait: make(map[string]*requestOp), + subs: make(map[string]*ClientSubscription), + } + if !isHTTP { + go c.dispatch(conn) + } + return c, nil +} + +func (c *Client) nextID() json.RawMessage { + id := atomic.AddUint32(&c.idCounter, 1) + return []byte(strconv.FormatUint(uint64(id), 10)) +} + +// SupportedModules calls the rpc_modules method, retrieving the list of +// APIs that are available on the server. +func (c *Client) SupportedModules() (map[string]string, error) { + var result map[string]string + ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) + defer cancel() + err := c.CallContext(ctx, &result, "rpc_modules") + return result, err +} + +// Close closes the client, aborting any in-flight requests. +func (c *Client) Close() { + if c.isHTTP { + return + } + select { + case c.close <- struct{}{}: + <-c.didQuit + case <-c.didQuit: + } +} + +// Call performs a JSON-RPC call with the given arguments and unmarshals into +// result if no error occurred. +// +// The result must be a pointer so that package json can unmarshal into it. You +// can also pass nil, in which case the result is ignored. +func (c *Client) Call(result interface{}, method string, args ...interface{}) error { + ctx := context.Background() + return c.CallContext(ctx, result, method, args...) +} + +// CallContext performs a JSON-RPC call with the given arguments. If the context is +// canceled before the call has successfully returned, CallContext returns immediately. +// +// The result must be a pointer so that package json can unmarshal into it. You +// can also pass nil, in which case the result is ignored. +func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { + msg, err := c.newMessage(method, args...) + if err != nil { + return err + } + op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)} + + if c.isHTTP { + err = c.sendHTTP(ctx, op, msg) + } else { + err = c.send(ctx, op, msg) + } + if err != nil { + return err + } + + // dispatch has accepted the request and will close the channel it when it quits. + switch resp, err := op.wait(ctx); { + case err != nil: + return err + case resp.Error != nil: + return resp.Error + case len(resp.Result) == 0: + return ErrNoResult + default: + return json.Unmarshal(resp.Result, &result) + } +} + +// BatchCall sends all given requests as a single batch and waits for the server +// to return a response for all of them. +// +// In contrast to Call, BatchCall only returns I/O errors. Any error specific to +// a request is reported through the Error field of the corresponding BatchElem. +// +// Note that batch calls may not be executed atomically on the server side. +func (c *Client) BatchCall(b []BatchElem) error { + ctx := context.Background() + return c.BatchCallContext(ctx, b) +} + +// BatchCall sends all given requests as a single batch and waits for the server +// to return a response for all of them. The wait duration is bounded by the +// context's deadline. +// +// In contrast to CallContext, BatchCallContext only returns I/O errors. Any +// error specific to a request is reported through the Error field of the +// corresponding BatchElem. +// +// Note that batch calls may not be executed atomically on the server side. +func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { + msgs := make([]*jsonrpcMessage, len(b)) + op := &requestOp{ + ids: make([]json.RawMessage, len(b)), + resp: make(chan *jsonrpcMessage, len(b)), + } + for i, elem := range b { + msg, err := c.newMessage(elem.Method, elem.Args...) + if err != nil { + return err + } + msgs[i] = msg + op.ids[i] = msg.ID + } + + var err error + if c.isHTTP { + err = c.sendBatchHTTP(ctx, op, msgs) + } else { + err = c.send(ctx, op, msgs) + } + + // Wait for all responses to come back. + for n := 0; n < len(b) && err == nil; n++ { + var resp *jsonrpcMessage + resp, err = op.wait(ctx) + if err != nil { + break + } + // Find the element corresponding to this response. + // The element is guaranteed to be present because dispatch + // only sends valid IDs to our channel. + var elem *BatchElem + for i := range msgs { + if bytes.Equal(msgs[i].ID, resp.ID) { + elem = &b[i] + break + } + } + if resp.Error != nil { + elem.Error = resp.Error + continue + } + if len(resp.Result) == 0 { + elem.Error = ErrNoResult + continue + } + elem.Error = json.Unmarshal(resp.Result, elem.Result) + } + return err +} + +// EthSubscribe calls the "eth_subscribe" method with the given arguments, +// registering a subscription. Server notifications for the subscription are +// sent to the given channel. The element type of the channel must match the +// expected type of content returned by the subscription. +// +// Callers should not use the same channel for multiple calls to EthSubscribe. +// The channel is closed when the notification is unsubscribed or an error +// occurs. The error can be retrieved via the Err method of the subscription. +// +// Slow subscribers will block the clients ingress path eventually. +func (c *Client) EthSubscribe(channel interface{}, args ...interface{}) (*ClientSubscription, error) { + // Check type of channel first. + chanVal := reflect.ValueOf(channel) + if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 { + panic("first argument to EthSubscribe must be a writable channel") + } + if chanVal.IsNil() { + panic("channel given to EthSubscribe must not be nil") + } + if c.isHTTP { + return nil, ErrNotificationsUnsupported + } + + msg, err := c.newMessage(subscribeMethod, args...) + if err != nil { + return nil, err + } + op := &requestOp{ + ids: []json.RawMessage{msg.ID}, + resp: make(chan *jsonrpcMessage), + sub: newClientSubscription(c, chanVal), + } + ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) + defer cancel() + + // Send the subscription request. + // The arrival and validity of the response is signaled on sub.quit. + if err := c.send(ctx, op, msg); err != nil { + return nil, err + } + if _, err := op.wait(ctx); err != nil { + return nil, err + } + return op.sub, nil +} + +func (c *Client) newMessage(method string, paramsIn ...interface{}) (*jsonrpcMessage, error) { + params, err := json.Marshal(paramsIn) + if err != nil { + return nil, err + } + return &jsonrpcMessage{Version: "2.0", ID: c.nextID(), Method: method, Params: params}, nil +} + +// send registers op with the dispatch loop, then sends msg on the connection. +// if sending fails, op is deregistered. +func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error { + select { + case c.requestOp <- op: + if glog.V(logger.Detail) { + glog.Info("sending ", msg) + } + err := c.write(ctx, msg) + c.sendDone <- err + return err + case <-c.didQuit: + return ErrClientQuit + } +} + +func (c *Client) write(ctx context.Context, msg interface{}) error { + deadline, ok := ctx.Deadline() + if !ok { + deadline = time.Now().Add(defaultWriteTimeout) + } + // The previous write failed. Try to establish a new connection. + if c.writeConn == nil { + if err := c.reconnect(ctx); err != nil { + return err + } + } + c.writeConn.SetWriteDeadline(deadline) + err := json.NewEncoder(c.writeConn).Encode(msg) + if err != nil { + c.writeConn = nil + } + return err +} + +func (c *Client) reconnect(ctx context.Context) error { + newconn, err := c.connectFunc(ctx) + if err != nil { + glog.V(logger.Detail).Infof("reconnect failed: %v", err) + return err + } + select { + case c.reconnected <- newconn: + c.writeConn = newconn + return nil + case <-c.didQuit: + newconn.Close() + return ErrClientQuit + } +} + +// dispatch is the main loop of the client. +// It sends read messages to waiting calls to Call and BatchCall +// and subscription notifications to registered subscriptions. +func (c *Client) dispatch(conn net.Conn) { + // Spawn the initial read loop. + go c.read(conn) + + var ( + lastOp *requestOp // tracks last send operation + requestOpLock = c.requestOp // nil while the send lock is held + reading = true // if true, a read loop is running + ) + defer close(c.didQuit) + defer func() { + c.closeRequestOps(ErrClientQuit) + conn.Close() + if reading { + // Empty read channels until read is dead. + for { + select { + case <-c.readResp: + case <-c.readErr: + return + } + } + } + }() + + for { + select { + case <-c.close: + return + + // Read path. + case batch := <-c.readResp: + for _, msg := range batch { + switch { + case msg.isNotification(): + if glog.V(logger.Detail) { + glog.Info("<-readResp: notification ", msg) + } + c.handleNotification(msg) + case msg.isResponse(): + if glog.V(logger.Detail) { + glog.Info("<-readResp: response ", msg) + } + c.handleResponse(msg) + default: + if glog.V(logger.Debug) { + glog.Error("<-readResp: dropping weird message", msg) + } + // TODO: maybe close + } + } + + case err := <-c.readErr: + glog.V(logger.Debug).Infof("<-readErr: %v", err) + c.closeRequestOps(err) + conn.Close() + reading = false + + case newconn := <-c.reconnected: + glog.V(logger.Debug).Infof("<-reconnected: (reading=%t) %v", reading, conn.RemoteAddr()) + if reading { + // Wait for the previous read loop to exit. This is a rare case. + conn.Close() + <-c.readErr + } + go c.read(newconn) + reading = true + conn = newconn + + // Send path. + case op := <-requestOpLock: + // Stop listening for further send ops until the current one is done. + requestOpLock = nil + lastOp = op + for _, id := range op.ids { + c.respWait[string(id)] = op + } + + case err := <-c.sendDone: + if err != nil { + // Remove response handlers for the last send. We remove those here + // because the error is already handled in Call or BatchCall. When the + // read loop goes down, it will signal all other current operations. + for _, id := range lastOp.ids { + delete(c.respWait, string(id)) + } + } + // Listen for send ops again. + requestOpLock = c.requestOp + lastOp = nil + } + } +} + +// closeRequestOps unblocks pending send ops and active subscriptions. +func (c *Client) closeRequestOps(err error) { + didClose := make(map[*requestOp]bool) + + for id, op := range c.respWait { + // Remove the op so that later calls will not close op.resp again. + delete(c.respWait, id) + + if !didClose[op] { + op.err = err + close(op.resp) + didClose[op] = true + } + } + for id, sub := range c.subs { + delete(c.subs, id) + sub.quitWithError(err, false) + } +} + +func (c *Client) handleNotification(msg *jsonrpcMessage) { + if msg.Method != notificationMethod { + glog.V(logger.Debug).Info("dropping non-subscription message: ", msg) + return + } + var subResult struct { + ID string `json:"subscription"` + Result json.RawMessage `json:"result"` + } + if err := json.Unmarshal(msg.Params, &subResult); err != nil { + glog.V(logger.Debug).Info("dropping invalid subscription message: ", msg) + return + } + if c.subs[subResult.ID] != nil { + c.subs[subResult.ID].deliver(subResult.Result) + } +} + +func (c *Client) handleResponse(msg *jsonrpcMessage) { + op := c.respWait[string(msg.ID)] + if op == nil { + glog.V(logger.Debug).Infof("unsolicited response %v", msg) + return + } + delete(c.respWait, string(msg.ID)) + // For normal responses, just forward the reply to Call/BatchCall. + if op.sub == nil { + op.resp <- msg + return + } + // For subscription responses, start the subscription if the server + // indicates success. EthSubscribe gets unblocked in either case through + // the op.resp channel. + defer close(op.resp) + if msg.Error != nil { + op.err = msg.Error + return + } + if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil { + go op.sub.start() + c.subs[op.sub.subid] = op.sub + } +} + +// Reading happens on a dedicated goroutine. + +func (c *Client) read(conn net.Conn) error { + var ( + buf json.RawMessage + dec = json.NewDecoder(conn) + ) + readMessage := func() (rs []*jsonrpcMessage, err error) { + buf = buf[:0] + if err = dec.Decode(&buf); err != nil { + return nil, err + } + if isBatch(buf) { + err = json.Unmarshal(buf, &rs) + } else { + rs = make([]*jsonrpcMessage, 1) + err = json.Unmarshal(buf, &rs[0]) + } + return rs, err + } + + for { + resp, err := readMessage() + if err != nil { + c.readErr <- err + return err + } + c.readResp <- resp + } +} + +// Subscriptions. + +// A ClientSubscription represents a subscription established through EthSubscribe. +type ClientSubscription struct { + client *Client + etype reflect.Type + channel reflect.Value + subid string + in chan json.RawMessage + + quitOnce sync.Once // ensures quit is closed once + quit chan struct{} // quit is closed when the subscription exits + errOnce sync.Once // ensures err is closed once + err chan error +} + +func newClientSubscription(c *Client, channel reflect.Value) *ClientSubscription { + sub := &ClientSubscription{ + client: c, + etype: channel.Type().Elem(), + channel: channel, + quit: make(chan struct{}), + err: make(chan error, 1), + // in is buffered so dispatch can continue even if the subscriber is slow. + in: make(chan json.RawMessage, clientSubscriptionBuffer), + } + return sub +} + +// Err returns the subscription error channel. The intended use of Err is to schedule +// resubscription when the client connection is closed unexpectedly. +// +// The error channel receives a value when the subscription has ended due +// to an error. The received error is ErrClientQuit if Close has been called +// on the underlying client and no other error has occurred. +// +// The error channel is closed when Unsubscribe is called on the subscription. +func (sub *ClientSubscription) Err() <-chan error { + return sub.err +} + +// Unsubscribe unsubscribes the notification and closes the error channel. +// It can safely be called more than once. +func (sub *ClientSubscription) Unsubscribe() { + sub.quitWithError(nil, true) + sub.errOnce.Do(func() { close(sub.err) }) +} + +func (sub *ClientSubscription) quitWithError(err error, unsubscribeServer bool) { + sub.quitOnce.Do(func() { + if unsubscribeServer { + sub.requestUnsubscribe() + } + if err != nil { + sub.err <- err + } + close(sub.quit) + }) +} + +func (sub *ClientSubscription) deliver(result json.RawMessage) (ok bool) { + select { + case sub.in <- result: + return true + case <-sub.quit: + return false + } +} + +func (sub *ClientSubscription) start() { + sub.quitWithError(sub.forward()) +} + +func (sub *ClientSubscription) forward() (err error, unsubscribeServer bool) { + cases := []reflect.SelectCase{ + {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)}, + {Dir: reflect.SelectSend, Chan: sub.channel}, + } + for { + select { + case result := <-sub.in: + val, err := sub.unmarshal(result) + if err != nil { + return err, true + } + cases[1].Send = val + switch chosen, _, _ := reflect.Select(cases); chosen { + case 0: // <-sub.quit + return nil, false + case 1: // sub.channel<- + continue + } + case <-sub.quit: + return nil, false + } + } +} + +func (sub *ClientSubscription) unmarshal(result json.RawMessage) (reflect.Value, error) { + val := reflect.New(sub.etype) + err := json.Unmarshal(result, val.Interface()) + return val.Elem(), err +} + +func (sub *ClientSubscription) requestUnsubscribe() error { + var result interface{} + return sub.client.Call(&result, unsubscribeMethod, sub.subid) +} diff --git a/rpc/client_context_go1.4.go b/rpc/client_context_go1.4.go new file mode 100644 index 000000000..ac956a17d --- /dev/null +++ b/rpc/client_context_go1.4.go @@ -0,0 +1,60 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// +build !go1.5 + +package rpc + +import ( + "net" + "net/http" + "time" + + "golang.org/x/net/context" +) + +// In older versions of Go (below 1.5), dials cannot be canceled +// via a channel or context. The context deadline can still applied. + +// contextDialer returns a dialer that applies the deadline value from the given context. +func contextDialer(ctx context.Context) *net.Dialer { + dialer := &net.Dialer{KeepAlive: tcpKeepAliveInterval} + if deadline, ok := ctx.Deadline(); ok { + dialer.Deadline = deadline + } else { + dialer.Deadline = time.Now().Add(defaultDialTimeout) + } + return dialer +} + +// dialContext connects to the given address, aborting the dial if ctx is canceled. +func dialContext(ctx context.Context, network, addr string) (net.Conn, error) { + return contextDialer(ctx).Dial(network, addr) +} + +// requestWithContext copies req, adding the cancelation channel and deadline from ctx. +func requestWithContext(c *http.Client, req *http.Request, ctx context.Context) (*http.Client, *http.Request) { + // Set Timeout on the client if the context has a deadline. + // Note that there is no default timeout (unlike in contextDialer) because + // the timeout applies to the entire request, including reads from body. + if deadline, ok := ctx.Deadline(); ok { + c2 := *c + c2.Timeout = deadline.Sub(time.Now()) + c = &c2 + } + req2 := *req + return c, &req2 +} diff --git a/rpc/client_context_go1.5.go b/rpc/client_context_go1.5.go new file mode 100644 index 000000000..4a007d9f8 --- /dev/null +++ b/rpc/client_context_go1.5.go @@ -0,0 +1,61 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// +build go1.5,!go1.6 + +package rpc + +import ( + "net" + "net/http" + "time" + + "golang.org/x/net/context" +) + +// In Go 1.5, dials cannot be canceled via a channel or context. The context deadline can +// still be applied. Go 1.5 adds the ability to cancel HTTP requests via a channel. + +// contextDialer returns a dialer that applies the deadline value from the given context. +func contextDialer(ctx context.Context) *net.Dialer { + dialer := &net.Dialer{KeepAlive: tcpKeepAliveInterval} + if deadline, ok := ctx.Deadline(); ok { + dialer.Deadline = deadline + } else { + dialer.Deadline = time.Now().Add(defaultDialTimeout) + } + return dialer +} + +// dialContext connects to the given address, aborting the dial if ctx is canceled. +func dialContext(ctx context.Context, network, addr string) (net.Conn, error) { + return contextDialer(ctx).Dial(network, addr) +} + +// requestWithContext copies req, adding the cancelation channel and deadline from ctx. +func requestWithContext(c *http.Client, req *http.Request, ctx context.Context) (*http.Client, *http.Request) { + // Set Timeout on the client if the context has a deadline. + // Note that there is no default timeout (unlike in contextDialer) because + // the timeout applies to the entire request, including reads from body. + if deadline, ok := ctx.Deadline(); ok { + c2 := *c + c2.Timeout = deadline.Sub(time.Now()) + c = &c2 + } + req2 := *req + req2.Cancel = ctx.Done() + return c, &req2 +} diff --git a/rpc/client_context_go1.6.go b/rpc/client_context_go1.6.go new file mode 100644 index 000000000..67777ddc6 --- /dev/null +++ b/rpc/client_context_go1.6.go @@ -0,0 +1,55 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// +build go1.6,!go1.7 + +package rpc + +import ( + "net" + "net/http" + "time" + + "golang.org/x/net/context" +) + +// In Go 1.6, net.Dialer gained the ability to cancel via a channel. + +// contextDialer returns a dialer that applies the deadline value from the given context. +func contextDialer(ctx context.Context) *net.Dialer { + dialer := &net.Dialer{Cancel: ctx.Done(), KeepAlive: tcpKeepAliveInterval} + if deadline, ok := ctx.Deadline(); ok { + dialer.Deadline = deadline + } else { + dialer.Deadline = time.Now().Add(defaultDialTimeout) + } + return dialer +} + +// dialContext connects to the given address, aborting the dial if ctx is canceled. +func dialContext(ctx context.Context, network, addr string) (net.Conn, error) { + return contextDialer(ctx).Dial(network, addr) +} + +// requestWithContext copies req, adding the cancelation channel and deadline from ctx. +func requestWithContext(c *http.Client, req *http.Request, ctx context.Context) (*http.Client, *http.Request) { + // We set Timeout on the client for Go <= 1.5. There + // is no need to do that here because the dial will be canceled + // by package http. + req2 := *req + req2.Cancel = ctx.Done() + return c, &req2 +} diff --git a/rpc/client_context_go1.7.go b/rpc/client_context_go1.7.go new file mode 100644 index 000000000..56ce12ab8 --- /dev/null +++ b/rpc/client_context_go1.7.go @@ -0,0 +1,51 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// +build go1.7 + +package rpc + +import ( + "context" + "net" + "net/http" + "time" +) + +// In Go 1.7, context moved into the standard library and support +// for cancelation via context was added to net.Dialer and http.Request. + +// contextDialer returns a dialer that applies the deadline value from the given context. +func contextDialer(ctx context.Context) *net.Dialer { + dialer := &net.Dialer{Cancel: ctx.Done(), KeepAlive: tcpKeepAliveInterval} + if deadline, ok := ctx.Deadline(); ok { + dialer.Deadline = deadline + } else { + dialer.Deadline = time.Now().Add(defaultDialTimeout) + } + return dialer +} + +// dialContext connects to the given address, aborting the dial if ctx is canceled. +func dialContext(ctx context.Context, network, addr string) (net.Conn, error) { + d := &net.Dialer{KeepAlive: tcpKeepAliveInterval} + return d.DialContext(ctx, network, addr) +} + +// requestWithContext copies req, adding the cancelation channel and deadline from ctx. +func requestWithContext(c *http.Client, req *http.Request, ctx context.Context) (*http.Client, *http.Request) { + return c, req.WithContext(ctx) +} diff --git a/rpc/client_example_test.go b/rpc/client_example_test.go new file mode 100644 index 000000000..84b4b67bb --- /dev/null +++ b/rpc/client_example_test.go @@ -0,0 +1,83 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc_test + +import ( + "fmt" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/rpc" +) + +// In this example, our client whishes to track the latest 'block number' +// known to the server. The server supports two methods: +// +// eth_getBlockByNumber("latest", {}) +// returns the latest block object. +// +// eth_subscribe("newBlocks") +// creates a subscription which fires block objects when new blocks arrive. + +type Block struct { + Number *big.Int +} + +func ExampleClientSubscription() { + // Connect the client. + client, _ := rpc.Dial("ws://127.0.0.1:8485") + subch := make(chan Block) + go subscribeBlocks(client, subch) + + // Print events from the subscription as they arrive. + for block := range subch { + fmt.Println("latest block:", block.Number) + } +} + +// subscribeBlocks runs in its own goroutine and maintains +// a subscription for new blocks. +func subscribeBlocks(client *rpc.Client, subch chan Block) { + for i := 0; ; i++ { + if i > 0 { + time.Sleep(2 * time.Second) + } + + // Subscribe to new blocks. + sub, err := client.EthSubscribe(subch, "newBlocks") + if err == rpc.ErrClientQuit { + return // Stop reconnecting if the client was closed. + } else if err != nil { + fmt.Println("subscribe error:", err) + continue + } + + // The connection is established now. + // Update the channel with the current block. + var lastBlock Block + if err := client.Call(&lastBlock, "eth_getBlockByNumber", "latest"); err != nil { + fmt.Println("can't get latest block:", err) + continue + } + subch <- lastBlock + + // The subscription will deliver events to the channel. Wait for the + // subscription to end for any reason, then loop around to re-establish + // the connection. + fmt.Println("connection lost: ", <-sub.Err()) + } +} diff --git a/rpc/client_test.go b/rpc/client_test.go new file mode 100644 index 000000000..58dceada0 --- /dev/null +++ b/rpc/client_test.go @@ -0,0 +1,489 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "fmt" + "math/rand" + "net" + "net/http" + "net/http/httptest" + "os" + "reflect" + "runtime" + "sync" + "testing" + "time" + + "github.com/davecgh/go-spew/spew" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "golang.org/x/net/context" +) + +func TestClientRequest(t *testing.T) { + server := newTestServer("service", new(Service)) + defer server.Stop() + client := DialInProc(server) + defer client.Close() + + var resp Result + if err := client.Call(&resp, "service_echo", "hello", 10, &Args{"world"}); err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(resp, Result{"hello", 10, &Args{"world"}}) { + t.Errorf("incorrect result %#v", resp) + } +} + +func TestClientBatchRequest(t *testing.T) { + server := newTestServer("service", new(Service)) + defer server.Stop() + client := DialInProc(server) + defer client.Close() + + batch := []BatchElem{ + { + Method: "service_echo", + Args: []interface{}{"hello", 10, &Args{"world"}}, + Result: new(Result), + }, + { + Method: "service_echo", + Args: []interface{}{"hello2", 11, &Args{"world"}}, + Result: new(Result), + }, + { + Method: "no_such_method", + Args: []interface{}{1, 2, 3}, + Result: new(int), + }, + } + if err := client.BatchCall(batch); err != nil { + t.Fatal(err) + } + wantResult := []BatchElem{ + { + Method: "service_echo", + Args: []interface{}{"hello", 10, &Args{"world"}}, + Result: &Result{"hello", 10, &Args{"world"}}, + }, + { + Method: "service_echo", + Args: []interface{}{"hello2", 11, &Args{"world"}}, + Result: &Result{"hello2", 11, &Args{"world"}}, + }, + { + Method: "no_such_method", + Args: []interface{}{1, 2, 3}, + Result: new(int), + Error: &jsonError{Code: -32601, Message: "The method no_such_method_ does not exist/is not available"}, + }, + } + if !reflect.DeepEqual(batch, wantResult) { + t.Errorf("batch results mismatch:\ngot %swant %s", spew.Sdump(batch), spew.Sdump(wantResult)) + } +} + +// func TestClientCancelInproc(t *testing.T) { testClientCancel("inproc", t) } +func TestClientCancelWebsocket(t *testing.T) { testClientCancel("ws", t) } +func TestClientCancelHTTP(t *testing.T) { testClientCancel("http", t) } +func TestClientCancelIPC(t *testing.T) { testClientCancel("ipc", t) } + +// This test checks that requests made through CallContext can be canceled by canceling +// the context. +func testClientCancel(transport string, t *testing.T) { + server := newTestServer("service", new(Service)) + defer server.Stop() + + // What we want to achieve is that the context gets canceled + // at various stages of request processing. The interesting cases + // are: + // - cancel during dial + // - cancel while performing a HTTP request + // - cancel while waiting for a response + // + // To trigger those, the times are chosen such that connections + // are killed within the deadline for every other call (maxKillTimeout + // is 2x maxCancelTimeout). + // + // Once a connection is dead, there is a fair chance it won't connect + // successfully because the accept is delayed by 1s. + maxContextCancelTimeout := 300 * time.Millisecond + fl := &flakeyListener{ + maxAcceptDelay: 1 * time.Second, + maxKillTimeout: 600 * time.Millisecond, + } + + var client *Client + switch transport { + case "ws", "http": + c, hs := httpTestClient(server, transport, fl) + defer hs.Close() + client = c + case "ipc": + c, l := ipcTestClient(server, fl) + defer l.Close() + client = c + default: + panic("unknown transport: " + transport) + } + + // These tests take a lot of time, run them all at once. + // You probably want to run with -parallel 1 or comment out + // the call to t.Parallel if you enable the logging. + t.Parallel() + // glog.SetV(6) + // glog.SetToStderr(true) + // defer glog.SetToStderr(false) + // glog.Infoln("testing ", transport) + + // The actual test starts here. + var ( + wg sync.WaitGroup + nreqs = 10 + ncallers = 6 + ) + caller := func(index int) { + defer wg.Done() + for i := 0; i < nreqs; i++ { + var ( + ctx context.Context + cancel func() + timeout = time.Duration(rand.Int63n(int64(maxContextCancelTimeout))) + ) + if index < ncallers/2 { + // For half of the callers, create a context without deadline + // and cancel it later. + ctx, cancel = context.WithCancel(context.Background()) + time.AfterFunc(timeout, cancel) + } else { + // For the other half, create a context with a deadline instead. This is + // different because the context deadline is used to set the socket write + // deadline. + ctx, cancel = context.WithTimeout(context.Background(), timeout) + } + // Now perform a call with the context. + // The key thing here is that no call will ever complete successfully. + err := client.CallContext(ctx, nil, "service_sleep", 2*maxContextCancelTimeout) + if err != nil { + glog.V(logger.Debug).Infoln("got expected error:", err) + } else { + t.Errorf("no error for call with %v wait time", timeout) + } + cancel() + } + } + wg.Add(ncallers) + for i := 0; i < ncallers; i++ { + go caller(i) + } + wg.Wait() +} + +func TestClientSubscribeInvalidArg(t *testing.T) { + server := newTestServer("service", new(Service)) + defer server.Stop() + client := DialInProc(server) + defer client.Close() + + check := func(shouldPanic bool, arg interface{}) { + defer func() { + err := recover() + if shouldPanic && err == nil { + t.Errorf("EthSubscribe should've panicked for %#v", arg) + } + if !shouldPanic && err != nil { + t.Errorf("EthSubscribe shouldn't have panicked for %#v", arg) + buf := make([]byte, 1024*1024) + buf = buf[:runtime.Stack(buf, false)] + t.Error(err) + t.Error(string(buf)) + } + }() + client.EthSubscribe(arg, "foo_bar") + } + check(true, nil) + check(true, 1) + check(true, (chan int)(nil)) + check(true, make(<-chan int)) + check(false, make(chan int)) + check(false, make(chan<- int)) +} + +func TestClientSubscribe(t *testing.T) { + server := newTestServer("eth", new(NotificationTestService)) + defer server.Stop() + client := DialInProc(server) + defer client.Close() + + nc := make(chan int) + count := 10 + sub, err := client.EthSubscribe(nc, "someSubscription", count, 0) + if err != nil { + t.Fatal("can't subscribe:", err) + } + for i := 0; i < count; i++ { + if val := <-nc; val != i { + t.Fatalf("value mismatch: got %d, want %d", val, i) + } + } + + sub.Unsubscribe() + select { + case v := <-nc: + t.Fatal("received value after unsubscribe:", v) + case err := <-sub.Err(): + if err != nil { + t.Fatalf("Err returned a non-nil error after explicit unsubscribe: %q", err) + } + case <-time.After(1 * time.Second): + t.Fatalf("subscription not closed within 1s after unsubscribe") + } +} + +// In this test, the connection drops while EthSubscribe is +// waiting for a response. +func TestClientSubscribeClose(t *testing.T) { + service := &NotificationTestService{ + gotHangSubscriptionReq: make(chan struct{}), + unblockHangSubscription: make(chan struct{}), + } + server := newTestServer("eth", service) + defer server.Stop() + client := DialInProc(server) + defer client.Close() + + var ( + nc = make(chan int) + errc = make(chan error) + sub *ClientSubscription + err error + ) + go func() { + sub, err = client.EthSubscribe(nc, "hangSubscription", 999) + errc <- err + }() + + <-service.gotHangSubscriptionReq + client.Close() + service.unblockHangSubscription <- struct{}{} + + select { + case err := <-errc: + if err == nil { + t.Errorf("EthSubscribe returned nil error after Close") + } + if sub != nil { + t.Error("EthSubscribe returned non-nil subscription after Close") + } + case <-time.After(1 * time.Second): + t.Fatalf("EthSubscribe did not return within 1s after Close") + } +} + +func TestClientHTTP(t *testing.T) { + server := newTestServer("service", new(Service)) + defer server.Stop() + + client, hs := httpTestClient(server, "http", nil) + defer hs.Close() + defer client.Close() + + // Launch concurrent requests. + var ( + results = make([]Result, 100) + errc = make(chan error) + wantResult = Result{"a", 1, new(Args)} + ) + defer client.Close() + for i := range results { + i := i + go func() { + errc <- client.Call(&results[i], "service_echo", + wantResult.String, wantResult.Int, wantResult.Args) + }() + } + + // Wait for all of them to complete. + timeout := time.NewTimer(5 * time.Second) + defer timeout.Stop() + for i := range results { + select { + case err := <-errc: + if err != nil { + t.Fatal(err) + } + case <-timeout.C: + t.Fatalf("timeout (got %d/%d) results)", i+1, len(results)) + } + } + + // Check results. + for i := range results { + if !reflect.DeepEqual(results[i], wantResult) { + t.Errorf("result %d mismatch: got %#v, want %#v", i, results[i], wantResult) + } + } +} + +func TestClientReconnect(t *testing.T) { + startServer := func(addr string) (*Server, net.Listener) { + srv := newTestServer("service", new(Service)) + l, err := net.Listen("tcp", addr) + if err != nil { + t.Fatal(err) + } + go http.Serve(l, srv.WebsocketHandler("*")) + return srv, l + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Start a server and corresponding client. + s1, l1 := startServer("127.0.0.1:0") + client, err := DialContext(ctx, "ws://"+l1.Addr().String()) + if err != nil { + t.Fatal("can't dial", err) + } + + // Perform a call. This should work because the server is up. + var resp Result + if err := client.CallContext(ctx, &resp, "service_echo", "", 1, nil); err != nil { + t.Fatal(err) + } + + // Shut down the server and try calling again. It shouldn't work. + l1.Close() + s1.Stop() + if err := client.CallContext(ctx, &resp, "service_echo", "", 2, nil); err == nil { + t.Error("successful call while the server is down") + t.Logf("resp: %#v", resp) + } + + // Allow for some cool down time so we can listen on the same address again. + time.Sleep(2 * time.Second) + + // Start it up again and call again. The connection should be reestablished. + // We spawn multiple calls here to check whether this hangs somehow. + s2, l2 := startServer(l1.Addr().String()) + defer l2.Close() + defer s2.Stop() + + start := make(chan struct{}) + errors := make(chan error, 20) + for i := 0; i < cap(errors); i++ { + go func() { + <-start + var resp Result + errors <- client.CallContext(ctx, &resp, "service_echo", "", 3, nil) + }() + } + close(start) + errcount := 0 + for i := 0; i < cap(errors); i++ { + if err = <-errors; err != nil { + errcount++ + } + } + t.Log("err:", err) + if errcount > 1 { + t.Errorf("expected one error after disconnect, got %d", errcount) + } +} + +func newTestServer(serviceName string, service interface{}) *Server { + server := NewServer() + if err := server.RegisterName(serviceName, service); err != nil { + panic(err) + } + return server +} + +func httpTestClient(srv *Server, transport string, fl *flakeyListener) (*Client, *httptest.Server) { + // Create the HTTP server. + var hs *httptest.Server + switch transport { + case "ws": + hs = httptest.NewUnstartedServer(srv.WebsocketHandler("*")) + case "http": + hs = httptest.NewUnstartedServer(srv) + default: + panic("unknown HTTP transport: " + transport) + } + // Wrap the listener if required. + if fl != nil { + fl.Listener = hs.Listener + hs.Listener = fl + } + // Connect the client. + hs.Start() + client, err := Dial(transport + "://" + hs.Listener.Addr().String()) + if err != nil { + panic(err) + } + return client, hs +} + +func ipcTestClient(srv *Server, fl *flakeyListener) (*Client, net.Listener) { + // Listen on a random endpoint. + endpoint := fmt.Sprintf("go-ethereum-test-ipc-%d-%d", os.Getpid(), rand.Int63()) + if runtime.GOOS == "windows" { + endpoint = `\\.\pipe\` + endpoint + } else { + endpoint = os.TempDir() + "/" + endpoint + } + l, err := ipcListen(endpoint) + if err != nil { + panic(err) + } + // Connect the listener to the server. + if fl != nil { + fl.Listener = l + l = fl + } + go srv.ServeListener(l) + // Connect the client. + client, err := Dial(endpoint) + if err != nil { + panic(err) + } + return client, l +} + +// flakeyListener kills accepted connections after a random timeout. +type flakeyListener struct { + net.Listener + maxKillTimeout time.Duration + maxAcceptDelay time.Duration +} + +func (l *flakeyListener) Accept() (net.Conn, error) { + delay := time.Duration(rand.Int63n(int64(l.maxAcceptDelay))) + time.Sleep(delay) + + c, err := l.Listener.Accept() + if err == nil { + timeout := time.Duration(rand.Int63n(int64(l.maxKillTimeout))) + time.AfterFunc(timeout, func() { + glog.V(logger.Debug).Infof("killing conn %v after %v", c.LocalAddr(), timeout) + c.Close() + }) + } + return c, err +} diff --git a/rpc/errors.go b/rpc/errors.go index bc352fc45..9cf9dc60c 100644 --- a/rpc/errors.go +++ b/rpc/errors.go @@ -24,74 +24,43 @@ type methodNotFoundError struct { method string } -func (e *methodNotFoundError) Code() int { - return -32601 -} +func (e *methodNotFoundError) ErrorCode() int { return -32601 } func (e *methodNotFoundError) Error() string { return fmt.Sprintf("The method %s%s%s does not exist/is not available", e.service, serviceMethodSeparator, e.method) } // received message isn't a valid request -type invalidRequestError struct { - message string -} +type invalidRequestError struct{ message string } -func (e *invalidRequestError) Code() int { - return -32600 -} +func (e *invalidRequestError) ErrorCode() int { return -32600 } -func (e *invalidRequestError) Error() string { - return e.message -} +func (e *invalidRequestError) Error() string { return e.message } // received message is invalid -type invalidMessageError struct { - message string -} +type invalidMessageError struct{ message string } -func (e *invalidMessageError) Code() int { - return -32700 -} +func (e *invalidMessageError) ErrorCode() int { return -32700 } -func (e *invalidMessageError) Error() string { - return e.message -} +func (e *invalidMessageError) Error() string { return e.message } // unable to decode supplied params, or an invalid number of parameters -type invalidParamsError struct { - message string -} +type invalidParamsError struct{ message string } -func (e *invalidParamsError) Code() int { - return -32602 -} +func (e *invalidParamsError) ErrorCode() int { return -32602 } -func (e *invalidParamsError) Error() string { - return e.message -} +func (e *invalidParamsError) Error() string { return e.message } // logic error, callback returned an error -type callbackError struct { - message string -} +type callbackError struct{ message string } -func (e *callbackError) Code() int { - return -32000 -} +func (e *callbackError) ErrorCode() int { return -32000 } -func (e *callbackError) Error() string { - return e.message -} +func (e *callbackError) Error() string { return e.message } // issued when a request is received after the server is issued to stop. -type shutdownError struct { -} +type shutdownError struct{} -func (e *shutdownError) Code() int { - return -32000 -} +func (e *shutdownError) ErrorCode() int { return -32000 } -func (e *shutdownError) Error() string { - return "server is shutting down" -} +func (e *shutdownError) Error() string { return "server is shutting down" } diff --git a/rpc/http.go b/rpc/http.go index 9283ce0ec..afcdd4bd6 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -22,71 +22,108 @@ import ( "fmt" "io" "io/ioutil" + "net" "net/http" - "net/url" "strings" + "sync" + "time" "github.com/rs/cors" + "golang.org/x/net/context" ) const ( maxHTTPRequestContentLength = 1024 * 128 ) -// httpClient connects to a geth RPC server over HTTP. -type httpClient struct { - endpoint *url.URL // HTTP-RPC server endpoint - httpClient http.Client // reuse connection - lastRes []byte // HTTP requests are synchronous, store last response +var nullAddr, _ = net.ResolveTCPAddr("tcp", "127.0.0.1:0") + +type httpConn struct { + client *http.Client + req *http.Request + closeOnce sync.Once + closed chan struct{} } -// NewHTTPClient create a new RPC clients that connection to a geth RPC server -// over HTTP. -func NewHTTPClient(endpoint string) (Client, error) { - url, err := url.Parse(endpoint) +// httpConn is treated specially by Client. +func (hc *httpConn) LocalAddr() net.Addr { return nullAddr } +func (hc *httpConn) RemoteAddr() net.Addr { return nullAddr } +func (hc *httpConn) SetReadDeadline(time.Time) error { return nil } +func (hc *httpConn) SetWriteDeadline(time.Time) error { return nil } +func (hc *httpConn) SetDeadline(time.Time) error { return nil } +func (hc *httpConn) Write([]byte) (int, error) { panic("Write called") } + +func (hc *httpConn) Read(b []byte) (int, error) { + <-hc.closed + return 0, io.EOF +} + +func (hc *httpConn) Close() error { + hc.closeOnce.Do(func() { close(hc.closed) }) + return nil +} + +// DialHTTP creates a new RPC clients that connection to an RPC server over HTTP. +func DialHTTP(endpoint string) (*Client, error) { + req, err := http.NewRequest("POST", endpoint, nil) if err != nil { return nil, err } - return &httpClient{endpoint: url}, nil + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + + initctx := context.Background() + return newClient(initctx, func(context.Context) (net.Conn, error) { + return &httpConn{client: new(http.Client), req: req, closed: make(chan struct{})}, nil + }) } -// Send will serialize the given msg to JSON and sends it to the RPC server. -// Since HTTP is synchronous the response is stored until Recv is called. -func (client *httpClient) Send(msg interface{}) error { - var body []byte - var err error - - client.lastRes = nil - if body, err = json.Marshal(msg); err != nil { - return err - } - - resp, err := client.httpClient.Post(client.endpoint.String(), "application/json", bytes.NewReader(body)) +func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) error { + hc := c.writeConn.(*httpConn) + respBody, err := hc.doRequest(ctx, msg) if err != nil { return err } - - defer resp.Body.Close() - if resp.StatusCode == http.StatusOK { - client.lastRes, err = ioutil.ReadAll(resp.Body) + defer respBody.Close() + var respmsg jsonrpcMessage + if err := json.NewDecoder(respBody).Decode(&respmsg); err != nil { return err } - - return fmt.Errorf("request failed: %s", resp.Status) + op.resp <- &respmsg + return nil } -// Recv will try to deserialize the last received response into the given msg. -func (client *httpClient) Recv(msg interface{}) error { - return json.Unmarshal(client.lastRes, &msg) +func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonrpcMessage) error { + hc := c.writeConn.(*httpConn) + respBody, err := hc.doRequest(ctx, msgs) + if err != nil { + return err + } + defer respBody.Close() + var respmsgs []jsonrpcMessage + if err := json.NewDecoder(respBody).Decode(&respmsgs); err != nil { + return err + } + for _, respmsg := range respmsgs { + op.resp <- &respmsg + } + return nil } -// Close is not necessary for httpClient -func (client *httpClient) Close() { -} +func (hc *httpConn) doRequest(ctx context.Context, msg interface{}) (io.ReadCloser, error) { + body, err := json.Marshal(msg) + if err != nil { + return nil, err + } + client, req := requestWithContext(hc.client, hc.req, ctx) + req.Body = ioutil.NopCloser(bytes.NewReader(body)) + req.ContentLength = int64(len(body)) -// SupportedModules will return the collection of offered RPC modules. -func (client *httpClient) SupportedModules() (map[string]string, error) { - return SupportedModules(client) + resp, err := client.Do(req) + if err != nil { + return nil, err + } + return resp.Body, nil } // httpReadWriteNopCloser wraps a io.Reader and io.Writer with a NOP Close method. @@ -100,43 +137,39 @@ func (t *httpReadWriteNopCloser) Close() error { return nil } -// newJSONHTTPHandler creates a HTTP handler that will parse incoming JSON requests, -// send the request to the given API provider and sends the response back to the caller. -func newJSONHTTPHandler(srv *Server) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if r.ContentLength > maxHTTPRequestContentLength { - http.Error(w, - fmt.Sprintf("content length too large (%d>%d)", r.ContentLength, maxHTTPRequestContentLength), - http.StatusRequestEntityTooLarge) - return - } - - w.Header().Set("content-type", "application/json") - - // create a codec that reads direct from the request body until - // EOF and writes the response to w and order the server to process - // a single request. - codec := NewJSONCodec(&httpReadWriteNopCloser{r.Body, w}) - defer codec.Close() - srv.ServeSingleRequest(codec, OptionMethodInvocation) - } +// NewHTTPServer creates a new HTTP RPC server around an API provider. +// +// Deprecated: Server implements http.Handler +func NewHTTPServer(corsString string, srv *Server) *http.Server { + return &http.Server{Handler: newCorsHandler(srv, corsString)} } -// NewHTTPServer creates a new HTTP RPC server around an API provider. -func NewHTTPServer(corsString string, srv *Server) *http.Server { +// ServeHTTP serves JSON-RPC requests over HTTP. +func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.ContentLength > maxHTTPRequestContentLength { + http.Error(w, + fmt.Sprintf("content length too large (%d>%d)", r.ContentLength, maxHTTPRequestContentLength), + http.StatusRequestEntityTooLarge) + return + } + w.Header().Set("content-type", "application/json") + + // create a codec that reads direct from the request body until + // EOF and writes the response to w and order the server to process + // a single request. + codec := NewJSONCodec(&httpReadWriteNopCloser{r.Body, w}) + defer codec.Close() + srv.ServeSingleRequest(codec, OptionMethodInvocation) +} + +func newCorsHandler(srv *Server, corsString string) http.Handler { var allowedOrigins []string for _, domain := range strings.Split(corsString, ",") { allowedOrigins = append(allowedOrigins, strings.TrimSpace(domain)) } - c := cors.New(cors.Options{ AllowedOrigins: allowedOrigins, AllowedMethods: []string{"POST", "GET"}, }) - - handler := c.Handler(newJSONHTTPHandler(srv)) - - return &http.Server{ - Handler: handler, - } + return c.Handler(srv) } diff --git a/rpc/inproc.go b/rpc/inproc.go index 250f5c787..f72b97497 100644 --- a/rpc/inproc.go +++ b/rpc/inproc.go @@ -17,45 +17,18 @@ package rpc import ( - "encoding/json" - "io" "net" + + "golang.org/x/net/context" ) -// inProcClient is an in-process buffer stream attached to an RPC server. -type inProcClient struct { - server *Server - cl io.Closer - enc *json.Encoder - dec *json.Decoder -} - -// Close tears down the request channel of the in-proc client. -func (c *inProcClient) Close() { - c.cl.Close() -} - -// NewInProcRPCClient creates an in-process buffer stream attachment to a given -// RPC server. -func NewInProcRPCClient(handler *Server) Client { - p1, p2 := net.Pipe() - go handler.ServeCodec(NewJSONCodec(p1), OptionMethodInvocation|OptionSubscriptions) - return &inProcClient{handler, p2, json.NewEncoder(p2), json.NewDecoder(p2)} -} - -// Send marshals a message into a json format and injects in into the client -// request channel. -func (c *inProcClient) Send(msg interface{}) error { - return c.enc.Encode(msg) -} - -// Recv reads a message from the response channel and tries to parse it into the -// given msg interface. -func (c *inProcClient) Recv(msg interface{}) error { - return c.dec.Decode(msg) -} - -// Returns the collection of modules the RPC server offers. -func (c *inProcClient) SupportedModules() (map[string]string, error) { - return SupportedModules(c) +// NewInProcClient attaches an in-process connection to the given RPC server. +func DialInProc(handler *Server) *Client { + initctx := context.Background() + c, _ := newClient(initctx, func(context.Context) (net.Conn, error) { + p1, p2 := net.Pipe() + go handler.ServeCodec(NewJSONCodec(p1), OptionMethodInvocation|OptionSubscriptions) + return p2, nil + }) + return c } diff --git a/rpc/ipc.go b/rpc/ipc.go index 05d8909ca..c2b9e3871 100644 --- a/rpc/ipc.go +++ b/rpc/ipc.go @@ -17,68 +17,39 @@ package rpc import ( - "encoding/json" "net" + + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "golang.org/x/net/context" ) -// CreateIPCListener creates an listener, on Unix platforms this is a unix socket, on Windows this is a named pipe +// CreateIPCListener creates an listener, on Unix platforms this is a unix socket, on +// Windows this is a named pipe func CreateIPCListener(endpoint string) (net.Listener, error) { return ipcListen(endpoint) } -// ipcClient represent an IPC RPC client. It will connect to a given endpoint and tries to communicate with a node using -// JSON serialization. -type ipcClient struct { - endpoint string - conn net.Conn - out *json.Encoder - in *json.Decoder -} - -// NewIPCClient create a new IPC client that will connect on the given endpoint. Messages are JSON encoded and encoded. -// On Unix it assumes the endpoint is the full path to a unix socket, and Windows the endpoint is an identifier for a -// named pipe. -func NewIPCClient(endpoint string) (Client, error) { - conn, err := newIPCConnection(endpoint) - if err != nil { - return nil, err +// ServeListener accepts connections on l, serving JSON-RPC on them. +func (srv *Server) ServeListener(l net.Listener) error { + for { + conn, err := l.Accept() + if err != nil { + return err + } + glog.V(logger.Detail).Infoln("accepted conn", conn.RemoteAddr()) + go srv.ServeCodec(NewJSONCodec(conn), OptionMethodInvocation|OptionSubscriptions) } - return &ipcClient{endpoint: endpoint, conn: conn, in: json.NewDecoder(conn), out: json.NewEncoder(conn)}, nil } -// Send will serialize the given message and send it to the server. -// When sending the message fails it will try to reconnect once and send the message again. -func (client *ipcClient) Send(msg interface{}) error { - if err := client.out.Encode(msg); err == nil { - return nil - } - - // retry once - client.conn.Close() - - conn, err := newIPCConnection(client.endpoint) - if err != nil { - return err - } - - client.conn = conn - client.in = json.NewDecoder(conn) - client.out = json.NewEncoder(conn) - - return client.out.Encode(msg) -} - -// Recv will read a message from the connection and tries to parse it. It assumes the received message is JSON encoded. -func (client *ipcClient) Recv(msg interface{}) error { - return client.in.Decode(&msg) -} - -// Close will close the underlying IPC connection -func (client *ipcClient) Close() { - client.conn.Close() -} - -// SupportedModules will return the collection of offered RPC modules. -func (client *ipcClient) SupportedModules() (map[string]string, error) { - return SupportedModules(client) +// DialIPC create a new IPC client that connects to the given endpoint. On Unix it assumes +// the endpoint is the full path to a unix socket, and Windows the endpoint is an +// identifier for a named pipe. +// +// The context is used for the initial connection establishment. It does not +// affect subsequent interactions with the client. +func DialIPC(ctx context.Context, endpoint string) (*Client, error) { + return newClient(ctx, func(ctx context.Context) (net.Conn, error) { + return newIPCConnection(ctx, endpoint) + }) } diff --git a/rpc/ipc_unix.go b/rpc/ipc_unix.go index 9ece01240..a25b21627 100644 --- a/rpc/ipc_unix.go +++ b/rpc/ipc_unix.go @@ -22,6 +22,8 @@ import ( "net" "os" "path/filepath" + + "golang.org/x/net/context" ) // ipcListen will create a Unix socket on the given endpoint. @@ -40,6 +42,6 @@ func ipcListen(endpoint string) (net.Listener, error) { } // newIPCConnection will connect to a Unix socket on the given endpoint. -func newIPCConnection(endpoint string) (net.Conn, error) { - return net.DialUnix("unix", nil, &net.UnixAddr{Name: endpoint, Net: "unix"}) +func newIPCConnection(ctx context.Context, endpoint string) (net.Conn, error) { + return dialContext(ctx, "unix", endpoint) } diff --git a/rpc/ipc_windows.go b/rpc/ipc_windows.go index 8342d04d5..68234d215 100644 --- a/rpc/ipc_windows.go +++ b/rpc/ipc_windows.go @@ -22,16 +22,27 @@ import ( "net" "time" + "golang.org/x/net/context" "gopkg.in/natefinch/npipe.v2" ) +// This is used if the dialing context has no deadline. It is much smaller than the +// defaultDialTimeout because named pipes are local and there is no need to wait so long. +const defaultPipeDialTimeout = 2 * time.Second + // ipcListen will create a named pipe on the given endpoint. func ipcListen(endpoint string) (net.Listener, error) { return npipe.Listen(endpoint) } // newIPCConnection will connect to a named pipe with the given endpoint as name. -func newIPCConnection(endpoint string) (net.Conn, error) { - timeout := 5 * time.Second +func newIPCConnection(ctx context.Context, endpoint string) (net.Conn, error) { + timeout := defaultPipeDialTimeout + if deadline, ok := ctx.Deadline(); ok { + timeout = deadline.Sub(time.Now()) + if timeout < 0 { + timeout = 0 + } + } return npipe.DialTimeout(endpoint, timeout) } diff --git a/rpc/json.go b/rpc/json.go index ee931bc87..a7053e3f5 100644 --- a/rpc/json.go +++ b/rpc/json.go @@ -30,49 +30,43 @@ import ( ) const ( - JSONRPCVersion = "2.0" + jsonrpcVersion = "2.0" serviceMethodSeparator = "_" subscribeMethod = "eth_subscribe" unsubscribeMethod = "eth_unsubscribe" notificationMethod = "eth_subscription" ) -// JSON-RPC request -type JSONRequest struct { +type jsonRequest struct { Method string `json:"method"` Version string `json:"jsonrpc"` Id json.RawMessage `json:"id,omitempty"` Payload json.RawMessage `json:"params,omitempty"` } -// JSON-RPC response -type JSONSuccessResponse struct { +type jsonSuccessResponse struct { Version string `json:"jsonrpc"` Id interface{} `json:"id,omitempty"` Result interface{} `json:"result"` } -// JSON-RPC error object -type JSONError struct { +type jsonError struct { Code int `json:"code"` Message string `json:"message"` Data interface{} `json:"data,omitempty"` } -// JSON-RPC error response -type JSONErrResponse struct { +type jsonErrResponse struct { Version string `json:"jsonrpc"` Id interface{} `json:"id,omitempty"` - Error JSONError `json:"error"` + Error jsonError `json:"error"` } -// JSON-RPC notification payload type jsonSubscription struct { Subscription string `json:"subscription"` Result interface{} `json:"result,omitempty"` } -// JSON-RPC notification type jsonNotification struct { Version string `json:"jsonrpc"` Method string `json:"method"` @@ -91,6 +85,17 @@ type jsonCodec struct { rw io.ReadWriteCloser // connection } +func (err *jsonError) Error() string { + if err.Message == "" { + return fmt.Sprintf("json-rpc error %d", err.Code) + } + return err.Message +} + +func (err *jsonError) ErrorCode() int { + return err.Code +} + // NewJSONCodec creates a new RPC server codec with support for JSON-RPC 2.0 func NewJSONCodec(rwc io.ReadWriteCloser) ServerCodec { d := json.NewDecoder(rwc) @@ -113,7 +118,7 @@ func isBatch(msg json.RawMessage) bool { // ReadRequestHeaders will read new requests without parsing the arguments. It will // return a collection of requests, an indication if these requests are in batch // form or an error when the incoming message could not be read/parsed. -func (c *jsonCodec) ReadRequestHeaders() ([]rpcRequest, bool, RPCError) { +func (c *jsonCodec) ReadRequestHeaders() ([]rpcRequest, bool, Error) { c.decMu.Lock() defer c.decMu.Unlock() @@ -148,8 +153,8 @@ func checkReqId(reqId json.RawMessage) error { // parseRequest will parse a single request from the given RawMessage. It will return // the parsed request, an indication if the request was a batch or an error when // the request could not be parsed. -func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) { - var in JSONRequest +func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error) { + var in jsonRequest if err := json.Unmarshal(incomingMsg, &in); err != nil { return nil, false, &invalidMessageError{err.Error()} } @@ -197,8 +202,8 @@ func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) { // parseBatchRequest will parse a batch request into a collection of requests from the given RawMessage, an indication // if the request was a batch or an error when the request could not be read. -func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) { - var in []JSONRequest +func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error) { + var in []jsonRequest if err := json.Unmarshal(incomingMsg, &in); err != nil { return nil, false, &invalidMessageError{err.Error()} } @@ -253,7 +258,7 @@ func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCErro // ParseRequestArguments tries to parse the given params (json.RawMessage) with the given types. It returns the parsed // values or an error when the parsing failed. -func (c *jsonCodec) ParseRequestArguments(argTypes []reflect.Type, params interface{}) ([]reflect.Value, RPCError) { +func (c *jsonCodec) ParseRequestArguments(argTypes []reflect.Type, params interface{}) ([]reflect.Value, Error) { if args, ok := params.(json.RawMessage); !ok { return nil, &invalidParamsError{"Invalid params supplied"} } else { @@ -264,7 +269,7 @@ func (c *jsonCodec) ParseRequestArguments(argTypes []reflect.Type, params interf // parsePositionalArguments tries to parse the given args to an array of values with the given types. // It returns the parsed values or an error when the args could not be parsed. Missing optional arguments // are returned as reflect.Zero values. -func parsePositionalArguments(args json.RawMessage, callbackArgs []reflect.Type) ([]reflect.Value, RPCError) { +func parsePositionalArguments(args json.RawMessage, callbackArgs []reflect.Type) ([]reflect.Value, Error) { params := make([]interface{}, 0, len(callbackArgs)) for _, t := range callbackArgs { params = append(params, reflect.New(t).Interface()) @@ -302,31 +307,31 @@ func parsePositionalArguments(args json.RawMessage, callbackArgs []reflect.Type) // CreateResponse will create a JSON-RPC success response with the given id and reply as result. func (c *jsonCodec) CreateResponse(id interface{}, reply interface{}) interface{} { if isHexNum(reflect.TypeOf(reply)) { - return &JSONSuccessResponse{Version: JSONRPCVersion, Id: id, Result: fmt.Sprintf(`%#x`, reply)} + return &jsonSuccessResponse{Version: jsonrpcVersion, Id: id, Result: fmt.Sprintf(`%#x`, reply)} } - return &JSONSuccessResponse{Version: JSONRPCVersion, Id: id, Result: reply} + return &jsonSuccessResponse{Version: jsonrpcVersion, Id: id, Result: reply} } // CreateErrorResponse will create a JSON-RPC error response with the given id and error. -func (c *jsonCodec) CreateErrorResponse(id interface{}, err RPCError) interface{} { - return &JSONErrResponse{Version: JSONRPCVersion, Id: id, Error: JSONError{Code: err.Code(), Message: err.Error()}} +func (c *jsonCodec) CreateErrorResponse(id interface{}, err Error) interface{} { + return &jsonErrResponse{Version: jsonrpcVersion, Id: id, Error: jsonError{Code: err.ErrorCode(), Message: err.Error()}} } // CreateErrorResponseWithInfo will create a JSON-RPC error response with the given id and error. // info is optional and contains additional information about the error. When an empty string is passed it is ignored. -func (c *jsonCodec) CreateErrorResponseWithInfo(id interface{}, err RPCError, info interface{}) interface{} { - return &JSONErrResponse{Version: JSONRPCVersion, Id: id, - Error: JSONError{Code: err.Code(), Message: err.Error(), Data: info}} +func (c *jsonCodec) CreateErrorResponseWithInfo(id interface{}, err Error, info interface{}) interface{} { + return &jsonErrResponse{Version: jsonrpcVersion, Id: id, + Error: jsonError{Code: err.ErrorCode(), Message: err.Error(), Data: info}} } // CreateNotification will create a JSON-RPC notification with the given subscription id and event as params. func (c *jsonCodec) CreateNotification(subid string, event interface{}) interface{} { if isHexNum(reflect.TypeOf(event)) { - return &jsonNotification{Version: JSONRPCVersion, Method: notificationMethod, + return &jsonNotification{Version: jsonrpcVersion, Method: notificationMethod, Params: jsonSubscription{Subscription: subid, Result: fmt.Sprintf(`%#x`, event)}} } - return &jsonNotification{Version: JSONRPCVersion, Method: notificationMethod, + return &jsonNotification{Version: jsonrpcVersion, Method: notificationMethod, Params: jsonSubscription{Subscription: subid, Result: event}} } diff --git a/rpc/notification.go b/rpc/notification.go index e84e26a58..875433071 100644 --- a/rpc/notification.go +++ b/rpc/notification.go @@ -28,7 +28,7 @@ import ( var ( // ErrNotificationsUnsupported is returned when the connection doesn't support notifications - ErrNotificationsUnsupported = errors.New("notifications not supported") + ErrNotificationsUnsupported = errors.New("subscription notifications not supported by the current transport") // ErrNotificationNotFound is returned when the notification for the given id is not found ErrNotificationNotFound = errors.New("notification not found") diff --git a/rpc/notification_test.go b/rpc/notification_test.go index 1bcede177..280503222 100644 --- a/rpc/notification_test.go +++ b/rpc/notification_test.go @@ -19,20 +19,31 @@ package rpc import ( "encoding/json" "net" + "sync" "testing" "time" "golang.org/x/net/context" ) -type NotificationTestService struct{} +type NotificationTestService struct { + mu sync.Mutex + unsubscribed bool -var ( - unsubCallbackCalled = false -) + gotHangSubscriptionReq chan struct{} + unblockHangSubscription chan struct{} +} + +func (s *NotificationTestService) wasUnsubCallbackCalled() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.unsubscribed +} func (s *NotificationTestService) Unsubscribe(subid string) { - unsubCallbackCalled = true + s.mu.Lock() + s.unsubscribed = true + s.mu.Unlock() } func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (Subscription, error) { @@ -60,6 +71,26 @@ func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val i return subscription, nil } +// HangSubscription blocks on s.unblockHangSubscription before +// sending anything. +func (s *NotificationTestService) HangSubscription(ctx context.Context, val int) (Subscription, error) { + notifier, supported := NotifierFromContext(ctx) + if !supported { + return nil, ErrNotificationsUnsupported + } + + s.gotHangSubscriptionReq <- struct{}{} + <-s.unblockHangSubscription + subscription, err := notifier.NewSubscription(s.Unsubscribe) + if err != nil { + return nil, err + } + go func() { + subscription.Notify(val) + }() + return subscription, nil +} + func TestNotifications(t *testing.T) { server := NewServer() service := &NotificationTestService{} @@ -90,7 +121,7 @@ func TestNotifications(t *testing.T) { } var subid string - response := JSONSuccessResponse{Result: subid} + response := jsonSuccessResponse{Result: subid} if err := in.Decode(&response); err != nil { t.Fatal(err) } @@ -114,7 +145,7 @@ func TestNotifications(t *testing.T) { clientConn.Close() // causes notification unsubscribe callback to be called time.Sleep(1 * time.Second) - if !unsubCallbackCalled { + if !service.wasUnsubCallbackCalled() { t.Error("unsubscribe callback not called after closing connection") } } diff --git a/rpc/server.go b/rpc/server.go index a9bdef285..040805a5c 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -381,7 +381,7 @@ func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*s // readRequest requests the next (batch) request from the codec. It will return the collection // of requests, an indication if the request was a batch, the invalid request identifier and an // error when the request could not be read/parsed. -func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCError) { +func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) { reqs, batch, err := codec.ReadRequestHeaders() if err != nil { return nil, batch, err diff --git a/rpc/server_test.go b/rpc/server_test.go index de47e1afd..e6840bde4 100644 --- a/rpc/server_test.go +++ b/rpc/server_test.go @@ -21,6 +21,7 @@ import ( "net" "reflect" "testing" + "time" "golang.org/x/net/context" ) @@ -48,6 +49,13 @@ func (s *Service) EchoWithCtx(ctx context.Context, str string, i int, args *Args return Result{str, i, args} } +func (s *Service) Sleep(ctx context.Context, duration time.Duration) { + select { + case <-time.After(duration): + case <-ctx.Done(): + } +} + func (s *Service) Rets() (string, error) { return "", nil } @@ -85,8 +93,8 @@ func TestServerRegisterName(t *testing.T) { t.Fatalf("Expected service calc to be registered") } - if len(svc.callbacks) != 4 { - t.Errorf("Expected 4 callbacks for service 'calc', got %d", len(svc.callbacks)) + if len(svc.callbacks) != 5 { + t.Errorf("Expected 5 callbacks for service 'calc', got %d", len(svc.callbacks)) } if len(svc.subscriptions) != 1 { @@ -126,7 +134,7 @@ func testServerMethodExecution(t *testing.T, method string) { t.Fatal(err) } - response := JSONSuccessResponse{Result: &Result{}} + response := jsonSuccessResponse{Result: &Result{}} if err := in.Decode(&response); err != nil { t.Fatal(err) } diff --git a/rpc/types.go b/rpc/types.go index 460581715..2a7268ad8 100644 --- a/rpc/types.go +++ b/rpc/types.go @@ -62,7 +62,7 @@ type serverRequest struct { callb *callback args []reflect.Value isUnsubscribe bool - err RPCError + err Error } type serviceRegistry map[string]*service // collection of services @@ -88,15 +88,13 @@ type rpcRequest struct { id interface{} isPubSub bool params interface{} - err RPCError // invalid batch element + err Error // invalid batch element } -// RPCError implements RPC error, is add support for error codec over regular go errors -type RPCError interface { - // RPC error code - Code() int - // Error message - Error() string +// Error wraps RPC errors, which contain an error code in addition to the message. +type Error interface { + Error() string // returns the message + ErrorCode() int // returns the code } // ServerCodec implements reading, parsing and writing RPC messages for the server side of @@ -104,15 +102,15 @@ type RPCError interface { // multiple go-routines concurrently. type ServerCodec interface { // Read next request - ReadRequestHeaders() ([]rpcRequest, bool, RPCError) + ReadRequestHeaders() ([]rpcRequest, bool, Error) // Parse request argument to the given types - ParseRequestArguments([]reflect.Type, interface{}) ([]reflect.Value, RPCError) + ParseRequestArguments([]reflect.Type, interface{}) ([]reflect.Value, Error) // Assemble success response, expects response id and payload CreateResponse(interface{}, interface{}) interface{} // Assemble error response, expects response id and error - CreateErrorResponse(interface{}, RPCError) interface{} + CreateErrorResponse(interface{}, Error) interface{} // Assemble error response with extra information about the error through info - CreateErrorResponseWithInfo(id interface{}, err RPCError, info interface{}) interface{} + CreateErrorResponseWithInfo(id interface{}, err Error, info interface{}) interface{} // Create notification response CreateNotification(string, interface{}) interface{} // Write msg to client. @@ -274,14 +272,3 @@ func (bn *BlockNumber) UnmarshalJSON(data []byte) error { func (bn *BlockNumber) Int64() int64 { return (int64)(*bn) } - -// Client defines the interface for go client that wants to connect to a geth RPC endpoint -type Client interface { - // SupportedModules returns the collection of API's the server offers - SupportedModules() (map[string]string, error) - - Send(req interface{}) error - Recv(msg interface{}) error - - Close() -} diff --git a/rpc/utils.go b/rpc/utils.go index fe482e19d..1ac6698f5 100644 --- a/rpc/utils.go +++ b/rpc/utils.go @@ -20,7 +20,6 @@ import ( "crypto/rand" "encoding/hex" "errors" - "fmt" "math/big" "reflect" "unicode" @@ -227,31 +226,3 @@ func newSubscriptionID() (string, error) { } return "0x" + hex.EncodeToString(subid[:]), nil } - -// SupportedModules returns the collection of API's that the RPC server offers -// on which the given client connects. -func SupportedModules(client Client) (map[string]string, error) { - req := JSONRequest{ - Id: []byte("1"), - Version: "2.0", - Method: MetadataApi + "_modules", - } - if err := client.Send(req); err != nil { - return nil, err - } - - var response JSONSuccessResponse - if err := client.Recv(&response); err != nil { - return nil, err - } - if response.Result != nil { - mods := make(map[string]string) - if modules, ok := response.Result.(map[string]interface{}); ok { - for m, v := range modules { - mods[m] = fmt.Sprintf("%s", v) - } - return mods, nil - } - } - return nil, fmt.Errorf("unable to retrieve modules") -} diff --git a/rpc/websocket.go b/rpc/websocket.go index fe9354d94..fc3cd0709 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -17,36 +17,39 @@ package rpc import ( + "crypto/tls" "fmt" + "net" "net/http" + "net/url" "os" "strings" - "sync" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" + "golang.org/x/net/context" "golang.org/x/net/websocket" "gopkg.in/fatih/set.v0" ) -// wsReaderWriterCloser reads and write payloads from and to a websocket connection. -type wsReaderWriterCloser struct { - c *websocket.Conn +// WebsocketHandler returns a handler that serves JSON-RPC to WebSocket connections. +// +// allowedOrigins should be a comma-separated list of allowed origin URLs. +// To allow connections with any origin, pass "*". +func (srv *Server) WebsocketHandler(allowedOrigins string) http.Handler { + return websocket.Server{ + Handshake: wsHandshakeValidator(strings.Split(allowedOrigins, ",")), + Handler: func(conn *websocket.Conn) { + srv.ServeCodec(NewJSONCodec(conn), OptionMethodInvocation|OptionSubscriptions) + }, + } } -// Read will read incoming payload data into p. -func (rw *wsReaderWriterCloser) Read(p []byte) (int, error) { - return rw.c.Read(p) -} - -// Write writes p to the websocket. -func (rw *wsReaderWriterCloser) Write(p []byte) (int, error) { - return rw.c.Write(p) -} - -// Close closes the websocket connection. -func (rw *wsReaderWriterCloser) Close() error { - return rw.c.Close() +// NewWSServer creates a new websocket RPC server around an API provider. +// +// Deprecated: use Server.WebsocketHandler +func NewWSServer(allowedOrigins string, srv *Server) *http.Server { + return &http.Server{Handler: srv.WebsocketHandler(allowedOrigins)} } // wsHandshakeValidator returns a handler that verifies the origin during the @@ -87,96 +90,63 @@ func wsHandshakeValidator(allowedOrigins []string) func(*websocket.Config, *http return f } -// NewWSServer creates a new websocket RPC server around an API provider. -func NewWSServer(allowedOrigins string, handler *Server) *http.Server { - return &http.Server{ - Handler: websocket.Server{ - Handshake: wsHandshakeValidator(strings.Split(allowedOrigins, ",")), - Handler: func(conn *websocket.Conn) { - handler.ServeCodec(NewJSONCodec(&wsReaderWriterCloser{conn}), - OptionMethodInvocation|OptionSubscriptions) - }, - }, +// DialWebsocket creates a new RPC client that communicates with a JSON-RPC server +// that is listening on the given endpoint. +// +// The context is used for the initial connection establishment. It does not +// affect subsequent interactions with the client. +func DialWebsocket(ctx context.Context, endpoint, origin string) (*Client, error) { + if origin == "" { + var err error + if origin, err = os.Hostname(); err != nil { + return nil, err + } + if strings.HasPrefix(endpoint, "wss") { + origin = "https://" + strings.ToLower(origin) + } else { + origin = "http://" + strings.ToLower(origin) + } } -} - -// wsClient represents a RPC client that communicates over websockets with a -// RPC server. -type wsClient struct { - endpoint string - connMu sync.Mutex - conn *websocket.Conn -} - -// NewWSClientj creates a new RPC client that communicates with a RPC server -// that is listening on the given endpoint using JSON encoding. -func NewWSClient(endpoint string) (Client, error) { - return &wsClient{endpoint: endpoint}, nil -} - -// connection will return a websocket connection to the RPC server. It will -// (re)connect when necessary. -func (client *wsClient) connection() (*websocket.Conn, error) { - if client.conn != nil { - return client.conn, nil - } - - origin, err := os.Hostname() + config, err := websocket.NewConfig(endpoint, origin) if err != nil { return nil, err } - origin = "http://" + origin - client.conn, err = websocket.Dial(client.endpoint, "", origin) - - return client.conn, err + return newClient(ctx, func(ctx context.Context) (net.Conn, error) { + return wsDialContext(ctx, config) + }) } -// SupportedModules is the collection of modules the RPC server offers. -func (client *wsClient) SupportedModules() (map[string]string, error) { - return SupportedModules(client) +func wsDialContext(ctx context.Context, config *websocket.Config) (*websocket.Conn, error) { + var conn net.Conn + var err error + switch config.Location.Scheme { + case "ws": + conn, err = dialContext(ctx, "tcp", wsDialAddress(config.Location)) + case "wss": + dialer := contextDialer(ctx) + conn, err = tls.DialWithDialer(dialer, "tcp", wsDialAddress(config.Location), config.TlsConfig) + default: + err = websocket.ErrBadScheme + } + if err != nil { + return nil, err + } + ws, err := websocket.NewClient(config, conn) + if err != nil { + conn.Close() + return nil, err + } + return ws, err } -// Send writes the JSON serialized msg to the websocket. It will create a new -// websocket connection to the server if the client is currently not connected. -func (client *wsClient) Send(msg interface{}) (err error) { - client.connMu.Lock() - defer client.connMu.Unlock() +var wsPortMap = map[string]string{"ws": "80", "wss": "443"} - var conn *websocket.Conn - if conn, err = client.connection(); err == nil { - if err = websocket.JSON.Send(conn, msg); err != nil { - client.conn.Close() - client.conn = nil +func wsDialAddress(location *url.URL) string { + if _, ok := wsPortMap[location.Scheme]; ok { + if _, _, err := net.SplitHostPort(location.Host); err != nil { + return net.JoinHostPort(location.Host, wsPortMap[location.Scheme]) } } - - return err -} - -// Recv reads a JSON message from the websocket and unmarshals it into msg. -func (client *wsClient) Recv(msg interface{}) (err error) { - client.connMu.Lock() - defer client.connMu.Unlock() - - var conn *websocket.Conn - if conn, err = client.connection(); err == nil { - if err = websocket.JSON.Receive(conn, msg); err != nil { - client.conn.Close() - client.conn = nil - } - } - return -} - -// Close closes the underlaying websocket connection. -func (client *wsClient) Close() { - client.connMu.Lock() - defer client.connMu.Unlock() - - if client.conn != nil { - client.conn.Close() - client.conn = nil - } - + return location.Host }