Merge pull request #2168 from karalabe/move-rpc-into-node

cmd, common, node, rpc: move IPC into the node itself
This commit is contained in:
Péter Szilágyi 2016-02-05 11:33:24 +02:00
commit ba7c125153
14 changed files with 292 additions and 156 deletions

View File

@ -502,12 +502,7 @@ func startNode(ctx *cli.Context, stack *node.Node) {
unlockAccount(ctx, accman, trimmed, i, passwords) unlockAccount(ctx, accman, trimmed, i, passwords)
} }
} }
// Start auxiliary services if enabled. // Start auxiliary services if enabled
if !ctx.GlobalBool(utils.IPCDisabledFlag.Name) {
if err := utils.StartIPC(stack, ctx); err != nil {
utils.Fatalf("Failed to start IPC: %v", err)
}
}
if ctx.GlobalBool(utils.RPCEnabledFlag.Name) { if ctx.GlobalBool(utils.RPCEnabledFlag.Name) {
if err := utils.StartRPC(stack, ctx); err != nil { if err := utils.StartRPC(stack, ctx); err != nil {
utils.Fatalf("Failed to start RPC: %v", err) utils.Fatalf("Failed to start RPC: %v", err)

View File

@ -28,7 +28,7 @@ import (
"github.com/codegangsta/cli" "github.com/codegangsta/cli"
"github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/gizak/termui" "github.com/gizak/termui"
) )
@ -36,7 +36,7 @@ import (
var ( var (
monitorCommandAttachFlag = cli.StringFlag{ monitorCommandAttachFlag = cli.StringFlag{
Name: "attach", Name: "attach",
Value: "ipc:" + common.DefaultIpcPath(), Value: "ipc:" + node.DefaultIpcEndpoint(),
Usage: "API endpoint to attach to", Usage: "API endpoint to attach to",
} }
monitorCommandRowsFlag = cli.IntFlag{ monitorCommandRowsFlag = cli.IntFlag{

View File

@ -18,25 +18,20 @@
package main package main
import ( import (
"errors"
"flag" "flag"
"io/ioutil" "io/ioutil"
"log" "log"
"os" "os"
"os/signal" "os/signal"
"path/filepath"
"runtime"
"errors"
"github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/tests" "github.com/ethereum/go-ethereum/tests"
"github.com/ethereum/go-ethereum/whisper" "github.com/ethereum/go-ethereum/whisper"
) )
@ -89,11 +84,6 @@ func main() {
} }
log.Println("Initial test suite passed...") log.Println("Initial test suite passed...")
if err := StartIPC(stack); err != nil {
log.Fatalf("Failed to start IPC interface: %v\n", err)
}
log.Println("IPC Interface started, accepting requests...")
// Start the RPC interface and wait until terminated // Start the RPC interface and wait until terminated
if err := StartRPC(stack); err != nil { if err := StartRPC(stack); err != nil {
log.Fatalf("Failed to start RPC interface: %v", err) log.Fatalf("Failed to start RPC interface: %v", err)
@ -109,7 +99,7 @@ func main() {
// keystore path and initial pre-state. // keystore path and initial pre-state.
func MakeSystemNode(keydir string, privkey string, test *tests.BlockTest) (*node.Node, error) { func MakeSystemNode(keydir string, privkey string, test *tests.BlockTest) (*node.Node, error) {
// Create a networkless protocol stack // Create a networkless protocol stack
stack, err := node.New(&node.Config{NoDiscovery: true}) stack, err := node.New(&node.Config{IpcPath: node.DefaultIpcEndpoint(), NoDiscovery: true})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -194,50 +184,3 @@ func StartRPC(stack *node.Node) error {
glog.V(logger.Error).Infof("Unable to start RPC-HTTP interface, could not find admin API") glog.V(logger.Error).Infof("Unable to start RPC-HTTP interface, could not find admin API")
return errors.New("Unable to start RPC-HTTP interface") return errors.New("Unable to start RPC-HTTP interface")
} }
// StartIPC initializes an IPC interface to the given protocol stack.
func StartIPC(stack *node.Node) error {
var ethereum *eth.Ethereum
if err := stack.Service(&ethereum); err != nil {
return err
}
endpoint := `\\.\pipe\geth.ipc`
if runtime.GOOS != "windows" {
endpoint = filepath.Join(common.DefaultDataDir(), "geth.ipc")
}
listener, err := rpc.CreateIPCListener(endpoint)
if err != nil {
return err
}
server := rpc.NewServer()
// register package API's this node provides
offered := stack.APIs()
for _, api := range offered {
server.RegisterName(api.Namespace, api.Service)
glog.V(logger.Debug).Infof("Register %T under namespace '%s' for IPC service\n", api.Service, api.Namespace)
}
//var ethereum *eth.Ethereum
//if err := stack.Service(&ethereum); err != nil {
// return err
//}
go func() {
glog.V(logger.Info).Infof("Start IPC server on %s\n", endpoint)
for {
conn, err := listener.Accept()
if err != nil {
glog.V(logger.Error).Infof("Unable to accept connection - %v\n", err)
}
codec := rpc.NewJSONCodec(conn)
go server.ServeCodec(codec)
}
}()
return nil
}

View File

@ -150,10 +150,8 @@ func NewRemoteRPCClient(ctx *cli.Context) (rpc.Client, error) {
endpoint := ctx.Args().First() endpoint := ctx.Args().First()
return NewRemoteRPCClientFromString(endpoint) return NewRemoteRPCClientFromString(endpoint)
} }
// use IPC by default // use IPC by default
endpoint := IPCSocketPath(ctx) return rpc.NewIPCClient(node.DefaultIpcEndpoint())
return rpc.NewIPCClient(endpoint)
} }
// NewRemoteRPCClientFromString returns a RPC client which connects to the given // NewRemoteRPCClientFromString returns a RPC client which connects to the given

View File

@ -261,8 +261,8 @@ var (
} }
IPCPathFlag = DirectoryFlag{ IPCPathFlag = DirectoryFlag{
Name: "ipcpath", Name: "ipcpath",
Usage: "Filename for IPC socket/pipe", Usage: "Filename for IPC socket/pipe within the datadir (explicit paths escape it)",
Value: DirectoryString{common.DefaultIpcPath()}, Value: DirectoryString{common.DefaultIpcSocket()},
} }
WSEnabledFlag = cli.BoolFlag{ WSEnabledFlag = cli.BoolFlag{
Name: "ws", Name: "ws",
@ -394,6 +394,15 @@ func MustMakeDataDir(ctx *cli.Context) string {
return "" return ""
} }
// MakeIpcPath creates an IPC path configuration from the set command line flags,
// returning an empty string if IPC was explicitly disabled, or the set path.
func MakeIpcPath(ctx *cli.Context) string {
if ctx.GlobalBool(IPCDisabledFlag.Name) {
return ""
}
return ctx.GlobalString(IPCPathFlag.Name)
}
// MakeNodeKey creates a node key from set command line flags, either loading it // MakeNodeKey creates a node key from set command line flags, either loading it
// from a file or as a specified hex value. If neither flags were provided, this // from a file or as a specified hex value. If neither flags were provided, this
// method returns nil and an emphemeral key is to be generated. // method returns nil and an emphemeral key is to be generated.
@ -582,6 +591,7 @@ func MakeSystemNode(name, version string, extra []byte, ctx *cli.Context) *node.
// Configure the node's service container // Configure the node's service container
stackConf := &node.Config{ stackConf := &node.Config{
DataDir: MustMakeDataDir(ctx), DataDir: MustMakeDataDir(ctx),
IpcPath: MakeIpcPath(ctx),
PrivateKey: MakeNodeKey(ctx), PrivateKey: MakeNodeKey(ctx),
Name: MakeNodeName(name, version, ctx), Name: MakeNodeName(name, version, ctx),
NoDiscovery: ctx.GlobalBool(NoDiscoverFlag.Name), NoDiscovery: ctx.GlobalBool(NoDiscoverFlag.Name),
@ -734,63 +744,6 @@ func MakeChain(ctx *cli.Context) (chain *core.BlockChain, chainDb ethdb.Database
return chain, chainDb return chain, chainDb
} }
func IPCSocketPath(ctx *cli.Context) (ipcpath string) {
if runtime.GOOS == "windows" {
ipcpath = common.DefaultIpcPath()
if ctx.GlobalIsSet(IPCPathFlag.Name) {
ipcpath = ctx.GlobalString(IPCPathFlag.Name)
}
} else {
ipcpath = common.DefaultIpcPath()
if ctx.GlobalIsSet(DataDirFlag.Name) {
ipcpath = filepath.Join(ctx.GlobalString(DataDirFlag.Name), "geth.ipc")
}
if ctx.GlobalIsSet(IPCPathFlag.Name) {
ipcpath = ctx.GlobalString(IPCPathFlag.Name)
}
}
return
}
func StartIPC(stack *node.Node, ctx *cli.Context) error {
var ethereum *eth.Ethereum
if err := stack.Service(&ethereum); err != nil {
return err
}
endpoint := IPCSocketPath(ctx)
listener, err := rpc.CreateIPCListener(endpoint)
if err != nil {
return err
}
server := rpc.NewServer()
// register package API's this node provides
offered := stack.APIs()
for _, api := range offered {
server.RegisterName(api.Namespace, api.Service)
glog.V(logger.Debug).Infof("Register %T under namespace '%s' for IPC service\n", api.Service, api.Namespace)
}
go func() {
glog.V(logger.Info).Infof("Start IPC server on %s\n", endpoint)
for {
conn, err := listener.Accept()
if err != nil {
glog.V(logger.Error).Infof("Unable to accept connection - %v\n", err)
}
codec := rpc.NewJSONCodec(conn)
go server.ServeCodec(codec)
}
}()
return nil
}
// StartRPC starts a HTTP JSON-RPC API server. // StartRPC starts a HTTP JSON-RPC API server.
func StartRPC(stack *node.Node, ctx *cli.Context) error { func StartRPC(stack *node.Node, ctx *cli.Context) error {
for _, api := range stack.APIs() { for _, api := range stack.APIs() {

View File

@ -89,9 +89,8 @@ func DefaultDataDir() string {
return "" return ""
} }
func DefaultIpcPath() string { // DefaultIpcSocket returns the relative name of the default IPC socket. The path
if runtime.GOOS == "windows" { // resolution is done by a node with other contextual infos.
return `\\.\pipe\geth.ipc` func DefaultIpcSocket() string {
} return "geth.ipc"
return filepath.Join(DefaultDataDir(), "geth.ipc")
} }

View File

@ -23,7 +23,10 @@ import (
"net" "net"
"os" "os"
"path/filepath" "path/filepath"
"runtime"
"strings"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/logger/glog"
@ -49,6 +52,12 @@ type Config struct {
// in memory. // in memory.
DataDir string DataDir string
// IpcPath is the requested location to place the IPC endpoint. If the path is
// a simple file name, it is placed inside the data directory (or on the root
// pipe path on Windows), whereas if it's a resolvable path name (absolute or
// relative), then that specific path is enforced. An empty path disables IPC.
IpcPath string
// This field should be a valid secp256k1 private key that will be used for both // This field should be a valid secp256k1 private key that will be used for both
// remote peer identification as well as network traffic encryption. If no key // remote peer identification as well as network traffic encryption. If no key
// is configured, the preset one is loaded from the data dir, generating it if // is configured, the preset one is loaded from the data dir, generating it if
@ -90,6 +99,37 @@ type Config struct {
MaxPendingPeers int MaxPendingPeers int
} }
// IpcEndpoint resolves an IPC endpoint based on a configured value, taking into
// account the set data folders as well as the designated platform we're currently
// running on.
func (c *Config) IpcEndpoint() string {
// Short circuit if IPC has not been enabled
if c.IpcPath == "" {
return ""
}
// On windows we can only use plain top-level pipes
if runtime.GOOS == "windows" {
if strings.HasPrefix(c.IpcPath, `\\.\pipe\`) {
return c.IpcPath
}
return `\\.\pipe\` + c.IpcPath
}
// Resolve names into the data directory full paths otherwise
if filepath.Base(c.IpcPath) == c.IpcPath {
if c.DataDir == "" {
return filepath.Join(os.TempDir(), c.IpcPath)
}
return filepath.Join(c.DataDir, c.IpcPath)
}
return c.IpcPath
}
// DefaultIpcEndpoint returns the IPC path used by default.
func DefaultIpcEndpoint() string {
config := &Config{DataDir: common.DefaultDataDir(), IpcPath: common.DefaultIpcSocket()}
return config.IpcEndpoint()
}
// NodeKey retrieves the currently configured private key of the node, checking // NodeKey retrieves the currently configured private key of the node, checking
// first any manually set key, falling back to the one found in the configured // first any manually set key, falling back to the one found in the configured
// data folder. If no key can be found, a new one is generated. // data folder. If no key can be found, a new one is generated.

View File

@ -21,6 +21,7 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"runtime"
"testing" "testing"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
@ -60,6 +61,37 @@ func TestDatadirCreation(t *testing.T) {
} }
} }
// Tests that IPC paths are correctly resolved to valid endpoints of different
// platforms.
func TestIpcPathResolution(t *testing.T) {
var tests = []struct {
DataDir string
IpcPath string
Windows bool
Endpoint string
}{
{"", "", false, ""},
{"data", "", false, ""},
{"", "geth.ipc", false, filepath.Join(os.TempDir(), "geth.ipc")},
{"data", "geth.ipc", false, "data/geth.ipc"},
{"data", "./geth.ipc", false, "./geth.ipc"},
{"data", "/geth.ipc", false, "/geth.ipc"},
{"", "", true, ``},
{"data", "", true, ``},
{"", "geth.ipc", true, `\\.\pipe\geth.ipc`},
{"data", "geth.ipc", true, `\\.\pipe\geth.ipc`},
{"data", `\\.\pipe\geth.ipc`, true, `\\.\pipe\geth.ipc`},
}
for i, test := range tests {
// Only run when platform/test match
if (runtime.GOOS == "windows") == test.Windows {
if endpoint := (&Config{DataDir: test.DataDir, IpcPath: test.IpcPath}).IpcEndpoint(); endpoint != test.Endpoint {
t.Errorf("test %d: IPC endpoint mismatch: have %s, want %s", i, endpoint, test.Endpoint)
}
}
}
}
// Tests that node keys can be correctly created, persisted, loaded and/or made // Tests that node keys can be correctly created, persisted, loaded and/or made
// ephemeral. // ephemeral.
func TestNodeKeyPersistency(t *testing.T) { func TestNodeKeyPersistency(t *testing.T) {

View File

@ -19,6 +19,7 @@ package node
import ( import (
"errors" "errors"
"net"
"os" "os"
"path/filepath" "path/filepath"
"reflect" "reflect"
@ -27,6 +28,8 @@ import (
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/internal/debug" "github.com/ethereum/go-ethereum/internal/debug"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
) )
@ -52,6 +55,10 @@ type Node struct {
serviceFuncs []ServiceConstructor // Service constructors (in dependency order) serviceFuncs []ServiceConstructor // Service constructors (in dependency order)
services map[reflect.Type]Service // Currently running services services map[reflect.Type]Service // Currently running services
ipcEndpoint string // IPC endpoint to listen at (empty = IPC disabled)
ipcListener net.Listener // IPC RPC listener socket to serve API requests
ipcHandler *rpc.Server // IPC RPC request handler to process the API requests
stop chan struct{} // Channel to wait for termination notifications stop chan struct{} // Channel to wait for termination notifications
lock sync.RWMutex lock sync.RWMutex
} }
@ -87,6 +94,7 @@ func New(conf *Config) (*Node, error) {
MaxPendingPeers: conf.MaxPendingPeers, MaxPendingPeers: conf.MaxPendingPeers,
}, },
serviceFuncs: []ServiceConstructor{}, serviceFuncs: []ServiceConstructor{},
ipcEndpoint: conf.IpcEndpoint(),
eventmux: new(event.TypeMux), eventmux: new(event.TypeMux),
}, nil }, nil
} }
@ -164,6 +172,14 @@ func (n *Node) Start() error {
// Mark the service started for potential cleanup // Mark the service started for potential cleanup
started = append(started, kind) started = append(started, kind)
} }
// Lastly start the configured RPC interfaces
if err := n.startRPC(services); err != nil {
for _, service := range services {
service.Stop()
}
running.Stop()
return err
}
// Finish initializing the startup // Finish initializing the startup
n.services = services n.services = services
n.server = running n.server = running
@ -172,6 +188,58 @@ func (n *Node) Start() error {
return nil return nil
} }
// startRPC initializes and starts the IPC RPC endpoints.
func (n *Node) startRPC(services map[reflect.Type]Service) error {
// Gather and register all the APIs exposed by the services
apis := n.apis()
for _, service := range services {
apis = append(apis, service.APIs()...)
}
ipcHandler := rpc.NewServer()
for _, api := range apis {
if err := ipcHandler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
glog.V(logger.Debug).Infof("Register %T under namespace '%s'", api.Service, api.Namespace)
}
// All APIs registered, start the IPC and HTTP listeners
var (
ipcListener net.Listener
err error
)
if n.ipcEndpoint != "" {
if ipcListener, err = rpc.CreateIPCListener(n.ipcEndpoint); err != nil {
return err
}
go func() {
glog.V(logger.Info).Infof("IPC endpoint opened: %s", n.ipcEndpoint)
defer glog.V(logger.Info).Infof("IPC endpoint closed: %s", n.ipcEndpoint)
for {
conn, err := ipcListener.Accept()
if err != nil {
// Terminate if the listener was closed
n.lock.RLock()
closed := n.ipcListener == nil
n.lock.RUnlock()
if closed {
return
}
// Not closed, just some error; report and continue
glog.V(logger.Error).Infof("IPC accept failed: %v", err)
continue
}
go ipcHandler.ServeCodec(rpc.NewJSONCodec(conn))
}
}()
}
// All listeners booted successfully
n.ipcListener = ipcListener
n.ipcHandler = ipcHandler
return nil
}
// Stop terminates a running node along with all it's services. In the node was // Stop terminates a running node along with all it's services. In the node was
// not started, an error is returned. // not started, an error is returned.
func (n *Node) Stop() error { func (n *Node) Stop() error {
@ -182,7 +250,15 @@ func (n *Node) Stop() error {
if n.server == nil { if n.server == nil {
return ErrNodeStopped return ErrNodeStopped
} }
// Otherwise terminate all the services and the P2P server too // Otherwise terminate the API, all services and the P2P server too
if n.ipcListener != nil {
n.ipcListener.Close()
n.ipcListener = nil
}
if n.ipcHandler != nil {
n.ipcHandler.Stop()
n.ipcHandler = nil
}
failure := &StopError{ failure := &StopError{
Services: make(map[reflect.Type]error), Services: make(map[reflect.Type]error),
} }
@ -261,18 +337,20 @@ func (n *Node) DataDir() string {
return n.datadir return n.datadir
} }
// IpcEndpoint retrieves the current IPC endpoint used by the protocol stack.
func (n *Node) IpcEndpoint() string {
return n.ipcEndpoint
}
// EventMux retrieves the event multiplexer used by all the network services in // EventMux retrieves the event multiplexer used by all the network services in
// the current protocol stack. // the current protocol stack.
func (n *Node) EventMux() *event.TypeMux { func (n *Node) EventMux() *event.TypeMux {
return n.eventmux return n.eventmux
} }
// APIs returns the collection of RPC descriptor this node offers. This method // apis returns the collection of RPC descriptors this node offers.
// is just a quick placeholder passthrough for the RPC update, which in the next func (n *Node) apis() []rpc.API {
// step will be fully integrated into the node itself. return []rpc.API{
func (n *Node) APIs() []rpc.API {
// Define all the APIs owned by the node itself
apis := []rpc.API{
{ {
Namespace: "admin", Namespace: "admin",
Version: "1.0", Version: "1.0",
@ -298,7 +376,13 @@ func (n *Node) APIs() []rpc.API {
Public: true, Public: true,
}, },
} }
// Inject all the APIs owned by various services }
// APIs returns the collection of RPC descriptor this node offers. This method
// is just a quick placeholder passthrough for the RPC update, which in the next
// step will be fully integrated into the node itself.
func (n *Node) APIs() []rpc.API {
apis := n.apis()
for _, api := range n.services { for _, api := range n.services {
apis = append(apis, api.APIs()...) apis = append(apis, api.APIs()...)
} }

View File

@ -31,6 +31,7 @@ import (
// //
// The following methods are needed to implement a node.Service: // The following methods are needed to implement a node.Service:
// - Protocols() []p2p.Protocol - devp2p protocols the service can communicate on // - Protocols() []p2p.Protocol - devp2p protocols the service can communicate on
// - APIs() []rpc.API - api methods the service wants to expose on rpc channels
// - Start() error - method invoked when the node is ready to start the service // - Start() error - method invoked when the node is ready to start the service
// - Stop() error - method invoked when the node terminates the service // - Stop() error - method invoked when the node terminates the service
type SampleService struct{} type SampleService struct{}

View File

@ -18,27 +18,34 @@ package node
import ( import (
"errors" "errors"
"fmt"
"io/ioutil" "io/ioutil"
"math/rand"
"os" "os"
"reflect" "reflect"
"testing" "testing"
"time"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
) )
var ( var (
testNodeKey, _ = crypto.GenerateKey() testNodeKey, _ = crypto.GenerateKey()
)
testNodeConfig = &Config{ func testNodeConfig() *Config {
return &Config{
IpcPath: fmt.Sprintf("test-%d.ipc", rand.Int63()),
PrivateKey: testNodeKey, PrivateKey: testNodeKey,
Name: "test node", Name: "test node",
} }
) }
// Tests that an empty protocol stack can be started, restarted and stopped. // Tests that an empty protocol stack can be started, restarted and stopped.
func TestNodeLifeCycle(t *testing.T) { func TestNodeLifeCycle(t *testing.T) {
stack, err := New(testNodeConfig) stack, err := New(testNodeConfig())
if err != nil { if err != nil {
t.Fatalf("failed to create protocol stack: %v", err) t.Fatalf("failed to create protocol stack: %v", err)
} }
@ -101,7 +108,7 @@ func TestNodeUsedDataDir(t *testing.T) {
// Tests whether services can be registered and duplicates caught. // Tests whether services can be registered and duplicates caught.
func TestServiceRegistry(t *testing.T) { func TestServiceRegistry(t *testing.T) {
stack, err := New(testNodeConfig) stack, err := New(testNodeConfig())
if err != nil { if err != nil {
t.Fatalf("failed to create protocol stack: %v", err) t.Fatalf("failed to create protocol stack: %v", err)
} }
@ -133,7 +140,7 @@ func TestServiceRegistry(t *testing.T) {
// Tests that registered services get started and stopped correctly. // Tests that registered services get started and stopped correctly.
func TestServiceLifeCycle(t *testing.T) { func TestServiceLifeCycle(t *testing.T) {
stack, err := New(testNodeConfig) stack, err := New(testNodeConfig())
if err != nil { if err != nil {
t.Fatalf("failed to create protocol stack: %v", err) t.Fatalf("failed to create protocol stack: %v", err)
} }
@ -183,7 +190,7 @@ func TestServiceLifeCycle(t *testing.T) {
// Tests that services are restarted cleanly as new instances. // Tests that services are restarted cleanly as new instances.
func TestServiceRestarts(t *testing.T) { func TestServiceRestarts(t *testing.T) {
stack, err := New(testNodeConfig) stack, err := New(testNodeConfig())
if err != nil { if err != nil {
t.Fatalf("failed to create protocol stack: %v", err) t.Fatalf("failed to create protocol stack: %v", err)
} }
@ -231,7 +238,7 @@ func TestServiceRestarts(t *testing.T) {
// Tests that if a service fails to initialize itself, none of the other services // Tests that if a service fails to initialize itself, none of the other services
// will be allowed to even start. // will be allowed to even start.
func TestServiceConstructionAbortion(t *testing.T) { func TestServiceConstructionAbortion(t *testing.T) {
stack, err := New(testNodeConfig) stack, err := New(testNodeConfig())
if err != nil { if err != nil {
t.Fatalf("failed to create protocol stack: %v", err) t.Fatalf("failed to create protocol stack: %v", err)
} }
@ -278,7 +285,7 @@ func TestServiceConstructionAbortion(t *testing.T) {
// Tests that if a service fails to start, all others started before it will be // Tests that if a service fails to start, all others started before it will be
// shut down. // shut down.
func TestServiceStartupAbortion(t *testing.T) { func TestServiceStartupAbortion(t *testing.T) {
stack, err := New(testNodeConfig) stack, err := New(testNodeConfig())
if err != nil { if err != nil {
t.Fatalf("failed to create protocol stack: %v", err) t.Fatalf("failed to create protocol stack: %v", err)
} }
@ -331,7 +338,7 @@ func TestServiceStartupAbortion(t *testing.T) {
// Tests that even if a registered service fails to shut down cleanly, it does // Tests that even if a registered service fails to shut down cleanly, it does
// not influece the rest of the shutdown invocations. // not influece the rest of the shutdown invocations.
func TestServiceTerminationGuarantee(t *testing.T) { func TestServiceTerminationGuarantee(t *testing.T) {
stack, err := New(testNodeConfig) stack, err := New(testNodeConfig())
if err != nil { if err != nil {
t.Fatalf("failed to create protocol stack: %v", err) t.Fatalf("failed to create protocol stack: %v", err)
} }
@ -406,7 +413,7 @@ func TestServiceTerminationGuarantee(t *testing.T) {
// TestServiceRetrieval tests that individual services can be retrieved. // TestServiceRetrieval tests that individual services can be retrieved.
func TestServiceRetrieval(t *testing.T) { func TestServiceRetrieval(t *testing.T) {
// Create a simple stack and register two service types // Create a simple stack and register two service types
stack, err := New(testNodeConfig) stack, err := New(testNodeConfig())
if err != nil { if err != nil {
t.Fatalf("failed to create protocol stack: %v", err) t.Fatalf("failed to create protocol stack: %v", err)
} }
@ -441,7 +448,7 @@ func TestServiceRetrieval(t *testing.T) {
// Tests that all protocols defined by individual services get launched. // Tests that all protocols defined by individual services get launched.
func TestProtocolGather(t *testing.T) { func TestProtocolGather(t *testing.T) {
stack, err := New(testNodeConfig) stack, err := New(testNodeConfig())
if err != nil { if err != nil {
t.Fatalf("failed to create protocol stack: %v", err) t.Fatalf("failed to create protocol stack: %v", err)
} }
@ -494,3 +501,75 @@ func TestProtocolGather(t *testing.T) {
} }
} }
} }
// Tests that all APIs defined by individual services get exposed.
func TestAPIGather(t *testing.T) {
stack, err := New(testNodeConfig())
if err != nil {
t.Fatalf("failed to create protocol stack: %v", err)
}
// Register a batch of services with some configured APIs
calls := make(chan string, 1)
services := map[string]struct {
APIs []rpc.API
Maker InstrumentingWrapper
}{
"Zero APIs": {[]rpc.API{}, InstrumentedServiceMakerA},
"Single API": {[]rpc.API{
{"single", "1", &OneMethodApi{fun: func() { calls <- "single.v1" }}, true},
}, InstrumentedServiceMakerB},
"Many APIs": {[]rpc.API{
{"multi", "1", &OneMethodApi{fun: func() { calls <- "multi.v1" }}, true},
{"multi.v2", "2", &OneMethodApi{fun: func() { calls <- "multi.v2" }}, true},
{"multi.v2.nested", "2", &OneMethodApi{fun: func() { calls <- "multi.v2.nested" }}, true},
}, InstrumentedServiceMakerC},
}
for id, config := range services {
config := config
constructor := func(*ServiceContext) (Service, error) {
return &InstrumentedService{apis: config.APIs}, nil
}
if err := stack.Register(config.Maker(constructor)); err != nil {
t.Fatalf("service %s: registration failed: %v", id, err)
}
}
// Start the services and ensure all API start successfully
if err := stack.Start(); err != nil {
t.Fatalf("failed to start protocol stack: %v", err)
}
defer stack.Stop()
// Connect to the RPC server and verify the various registered endpoints
ipcClient, err := rpc.NewIPCClient(stack.IpcEndpoint())
if err != nil {
t.Fatalf("failed to connect to the IPC API server: %v", err)
}
tests := []struct {
Method string
Result string
}{
{"single_theOneMethod", "single.v1"},
{"multi_theOneMethod", "multi.v1"},
{"multi.v2_theOneMethod", "multi.v2"},
{"multi.v2.nested_theOneMethod", "multi.v2.nested"},
}
for i, test := range tests {
if err := ipcClient.Send(rpc.JSONRequest{Id: new(int64), Version: "2.0", Method: test.Method}); err != nil {
t.Fatalf("test %d: failed to send API request: %v", i, err)
}
reply := new(rpc.JSONSuccessResponse)
if err := ipcClient.Recv(reply); err != nil {
t.Fatalf("test %d: failed to read API reply: %v", i, err)
}
select {
case result := <-calls:
if result != test.Result {
t.Errorf("test %d: result mismatch: have %s, want %s", i, result, test.Result)
}
case <-time.After(time.Second):
t.Fatalf("test %d: rpc execution timeout", i)
}
}
}

View File

@ -63,7 +63,7 @@ func TestContextDatabases(t *testing.T) {
// Tests that already constructed services can be retrieves by later ones. // Tests that already constructed services can be retrieves by later ones.
func TestContextServices(t *testing.T) { func TestContextServices(t *testing.T) {
stack, err := New(testNodeConfig) stack, err := New(testNodeConfig())
if err != nil { if err != nil {
t.Fatalf("failed to create protocol stack: %v", err) t.Fatalf("failed to create protocol stack: %v", err)
} }

View File

@ -52,6 +52,7 @@ func NewNoopServiceD(*ServiceContext) (Service, error) { return new(NoopServiceD
// methods can be instrumented both return value as well as event hook wise. // methods can be instrumented both return value as well as event hook wise.
type InstrumentedService struct { type InstrumentedService struct {
protocols []p2p.Protocol protocols []p2p.Protocol
apis []rpc.API
start error start error
stop error stop error
@ -70,7 +71,7 @@ func (s *InstrumentedService) Protocols() []p2p.Protocol {
} }
func (s *InstrumentedService) APIs() []rpc.API { func (s *InstrumentedService) APIs() []rpc.API {
return nil return s.apis
} }
func (s *InstrumentedService) Start(server *p2p.Server) error { func (s *InstrumentedService) Start(server *p2p.Server) error {
@ -121,3 +122,14 @@ func InstrumentedServiceMakerB(base ServiceConstructor) ServiceConstructor {
func InstrumentedServiceMakerC(base ServiceConstructor) ServiceConstructor { func InstrumentedServiceMakerC(base ServiceConstructor) ServiceConstructor {
return InstrumentingWrapperMaker(base, reflect.TypeOf(InstrumentedServiceC{})) return InstrumentingWrapperMaker(base, reflect.TypeOf(InstrumentedServiceC{}))
} }
// OneMethodApi is a single-method API handler to be returned by test services.
type OneMethodApi struct {
fun func()
}
func (api *OneMethodApi) TheOneMethod() {
if api.fun != nil {
api.fun()
}
}

View File

@ -41,7 +41,7 @@ type JSONRequest struct {
Method string `json:"method"` Method string `json:"method"`
Version string `json:"jsonrpc"` Version string `json:"jsonrpc"`
Id *int64 `json:"id,omitempty"` Id *int64 `json:"id,omitempty"`
Payload json.RawMessage `json:"params"` Payload json.RawMessage `json:"params,omitempty"`
} }
// JSON-RPC response // JSON-RPC response