From ead3dd9759c9cc8076ad716fe10cf641751b65b0 Mon Sep 17 00:00:00 2001 From: Bas van Kervel Date: Thu, 16 Apr 2015 19:23:57 +0200 Subject: [PATCH] Stop accepted and alive connections (http keep-alive) when the rpc service is stopped --- rpc/http.go | 22 ++++++------- rpc/types.go | 91 ++++++++++++++++++++++++++++++++++------------------ 2 files changed, 71 insertions(+), 42 deletions(-) diff --git a/rpc/http.go b/rpc/http.go index 61f8da549..882aff7ea 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -14,7 +14,7 @@ import ( ) var rpclogger = logger.NewLogger("RPC") -var rpclistener *ControllableTCPListener +var rpclistener *StoppableTCPListener const ( jsonrpcver = "2.0" @@ -22,12 +22,14 @@ const ( ) func Start(pipe *xeth.XEth, config RpcConfig) error { - if rpclistener != nil { // listener already running - glog.Infoln("RPC listener already running") - return fmt.Errorf("RPC already running on %s", rpclistener.Addr().String()) + if rpclistener != nil { + if fmt.Sprintf("%s:%d", config.ListenAddress, config.ListenPort) != rpclistener.Addr().String() { + return fmt.Errorf("RPC service already running on %s ", rpclistener.Addr().String()) + } + return nil // RPC service already running on given host/port } - l, err := NewControllableTCPListener(fmt.Sprintf("%s:%d", config.ListenAddress, config.ListenPort)) + l, err := NewStoppableTCPListener(fmt.Sprintf("%s:%d", config.ListenAddress, config.ListenPort)) if err != nil { rpclogger.Errorf("Can't listen on %s:%d: %v", config.ListenAddress, config.ListenPort, err) return err @@ -41,7 +43,7 @@ func Start(pipe *xeth.XEth, config RpcConfig) error { opts.AllowedOrigins = []string{config.CorsDomain} c := cors.New(opts) - handler = c.Handler(JSONRPC(pipe)) + handler = NewStoppableHandler(c.Handler(JSONRPC(pipe)), l.stop) } else { handler = JSONRPC(pipe) } @@ -52,13 +54,11 @@ func Start(pipe *xeth.XEth, config RpcConfig) error { } func Stop() error { - if rpclistener == nil { // listener not running - glog.Infoln("RPC listener not running") - return nil + if rpclistener != nil { + rpclistener.Stop() + rpclistener = nil } - rpclistener.Stop() - rpclistener = nil return nil } diff --git a/rpc/types.go b/rpc/types.go index c7dc2cc9a..b33621fef 100644 --- a/rpc/types.go +++ b/rpc/types.go @@ -25,8 +25,11 @@ import ( "errors" "net" + "net/http" "time" + "io" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" ) @@ -266,39 +269,64 @@ type ListenerStoppedError struct { msg string } -func (self ListenerStoppedError) Timout() bool { - return false -} - -func (self ListenerStoppedError) Temporary() bool { - return false -} - func (self ListenerStoppedError) Error() string { return self.msg } -type ControllableTCPListener struct { +var listenerStoppedError = ListenerStoppedError{"Listener stopped"} + +type StoppableTCPListener struct { *net.TCPListener - stop chan struct{} + stop *chan struct{} // closed when the listener must stop } -var listenerStoppedError ListenerStoppedError - -func (self *ControllableTCPListener) Stop() { - close(self.stop) +// Wraps the default handler and checks if the RPC service was stopped. In that case it returns an +// error indicating that the service was stopped. This will only happen for connections which are +// kept open (HTTP keep-alive) when the RPC service was shutdown. +func NewStoppableHandler(h http.Handler, stop *chan struct{}) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + select { + case <-*stop: + w.Header().Set("Content-Type", "application/json") + jsonerr := &RpcErrorObject{-32603, "RPC service stopt"} + send(w, &RpcErrorResponse{Jsonrpc: jsonrpcver, Id: nil, Error: jsonerr}) + default: + h.ServeHTTP(w, r) + } + }) } -func (self *ControllableTCPListener) Accept() (net.Conn, error) { +// Stop the listener and all accepted and still active connections. +func (self *StoppableTCPListener) Stop() { + close(*self.stop) +} + +func NewStoppableTCPListener(addr string) (*StoppableTCPListener, error) { + wl, err := net.Listen("tcp", addr) + if err != nil { + return nil, err + } + + if tcpl, ok := wl.(*net.TCPListener); ok { + stop := make(chan struct{}) + l := &StoppableTCPListener{tcpl, &stop} + return l, nil + } + + return nil, errors.New("Unable to create TCP listener for RPC service") +} + +func (self *StoppableTCPListener) Accept() (net.Conn, error) { for { - self.SetDeadline(time.Now().Add(time.Duration(500 * time.Millisecond))) + self.SetDeadline(time.Now().Add(time.Duration(1 * time.Second))) c, err := self.TCPListener.AcceptTCP() select { - case <-self.stop: + case <-*self.stop: + c.Close() self.TCPListener.Close() return nil, listenerStoppedError - default: // keep on going + default: } if err != nil { @@ -307,20 +335,21 @@ func (self *ControllableTCPListener) Accept() (net.Conn, error) { } } - return c, err + return &ClosableConnection{c, self.stop}, err } } -func NewControllableTCPListener(addr string) (*ControllableTCPListener, error) { - wl, err := net.Listen("tcp", addr) - if err != nil { - return nil, err - } - - if tcpl, ok := wl.(*net.TCPListener); ok { - l := &ControllableTCPListener{tcpl, make(chan struct{})} - return l, nil - } - - return nil, errors.New("Unable to create TCP listener for RPC") +type ClosableConnection struct { + *net.TCPConn + closed *chan struct{} +} + +func (self *ClosableConnection) Read(b []byte) (n int, err error) { + select { + case <-*self.closed: + self.TCPConn.Close() + return 0, io.EOF + default: + return self.TCPConn.Read(b) + } }