// Copyright 2020 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // The go-ethereum library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . package node import ( "compress/gzip" "context" "fmt" "io" "net" "net/http" "sort" "strconv" "strings" "sync" "sync/atomic" "time" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rpc" "github.com/rs/cors" ) // httpConfig is the JSON-RPC/HTTP configuration. type httpConfig struct { Modules []string CorsAllowedOrigins []string Vhosts []string prefix string // path prefix on which to mount http handler rpcEndpointConfig } // wsConfig is the JSON-RPC/Websocket configuration type wsConfig struct { Origins []string Modules []string prefix string // path prefix on which to mount ws handler rpcEndpointConfig } type rpcEndpointConfig struct { jwtSecret []byte // optional JWT secret batchItemLimit int batchResponseSizeLimit int httpBodyLimit int } type rpcHandler struct { http.Handler server *rpc.Server } type httpServer struct { log log.Logger timeouts rpc.HTTPTimeouts mux http.ServeMux // registered handlers go here mu sync.Mutex server *http.Server listener net.Listener // non-nil when server is running // HTTP RPC handler things. httpConfig httpConfig httpHandler atomic.Value // *rpcHandler // WebSocket handler things. wsConfig wsConfig wsHandler atomic.Value // *rpcHandler // These are set by setListenAddr. endpoint string host string port int handlerNames map[string]string } const ( shutdownTimeout = 5 * time.Second ) func newHTTPServer(log log.Logger, timeouts rpc.HTTPTimeouts) *httpServer { h := &httpServer{log: log, timeouts: timeouts, handlerNames: make(map[string]string)} h.httpHandler.Store((*rpcHandler)(nil)) h.wsHandler.Store((*rpcHandler)(nil)) return h } // setListenAddr configures the listening address of the server. // The address can only be set while the server isn't running. func (h *httpServer) setListenAddr(host string, port int) error { h.mu.Lock() defer h.mu.Unlock() if h.listener != nil && (host != h.host || port != h.port) { return fmt.Errorf("HTTP server already running on %s", h.endpoint) } h.host, h.port = host, port h.endpoint = net.JoinHostPort(host, fmt.Sprintf("%d", port)) return nil } // listenAddr returns the listening address of the server. func (h *httpServer) listenAddr() string { h.mu.Lock() defer h.mu.Unlock() if h.listener != nil { return h.listener.Addr().String() } return h.endpoint } // start starts the HTTP server if it is enabled and not already running. func (h *httpServer) start() error { h.mu.Lock() defer h.mu.Unlock() if h.endpoint == "" || h.listener != nil { return nil // already running or not configured } // Initialize the server. h.server = &http.Server{Handler: h} if h.timeouts != (rpc.HTTPTimeouts{}) { CheckTimeouts(&h.timeouts) h.server.ReadTimeout = h.timeouts.ReadTimeout h.server.ReadHeaderTimeout = h.timeouts.ReadHeaderTimeout h.server.WriteTimeout = h.timeouts.WriteTimeout h.server.IdleTimeout = h.timeouts.IdleTimeout } // Start the server. listener, err := net.Listen("tcp", h.endpoint) if err != nil { // If the server fails to start, we need to clear out the RPC and WS // configuration so they can be configured another time. h.disableRPC() h.disableWS() return err } h.listener = listener go h.server.Serve(listener) if h.wsAllowed() { url := fmt.Sprintf("ws://%v", listener.Addr()) if h.wsConfig.prefix != "" { url += h.wsConfig.prefix } h.log.Info("WebSocket enabled", "url", url) } // if server is websocket only, return after logging if !h.rpcAllowed() { return nil } // Log http endpoint. h.log.Info("HTTP server started", "endpoint", listener.Addr(), "auth", (h.httpConfig.jwtSecret != nil), "prefix", h.httpConfig.prefix, "cors", strings.Join(h.httpConfig.CorsAllowedOrigins, ","), "vhosts", strings.Join(h.httpConfig.Vhosts, ","), ) // Log all handlers mounted on server. var paths []string for path := range h.handlerNames { paths = append(paths, path) } sort.Strings(paths) logged := make(map[string]bool, len(paths)) for _, path := range paths { name := h.handlerNames[path] if !logged[name] { log.Info(name+" enabled", "url", "http://"+listener.Addr().String()+path) logged[name] = true } } return nil } func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { // check if ws request and serve if ws enabled ws := h.wsHandler.Load().(*rpcHandler) if ws != nil && isWebsocket(r) { if checkPath(r, h.wsConfig.prefix) { ws.ServeHTTP(w, r) } return } // if http-rpc is enabled, try to serve request rpc := h.httpHandler.Load().(*rpcHandler) if rpc != nil { // First try to route in the mux. // Requests to a path below root are handled by the mux, // which has all the handlers registered via Node.RegisterHandler. // These are made available when RPC is enabled. muxHandler, pattern := h.mux.Handler(r) if pattern != "" { muxHandler.ServeHTTP(w, r) return } if checkPath(r, h.httpConfig.prefix) { rpc.ServeHTTP(w, r) return } } w.WriteHeader(http.StatusNotFound) } // checkPath checks whether a given request URL matches a given path prefix. func checkPath(r *http.Request, path string) bool { // if no prefix has been specified, request URL must be on root if path == "" { return r.URL.Path == "/" } // otherwise, check to make sure prefix matches return len(r.URL.Path) >= len(path) && r.URL.Path[:len(path)] == path } // validatePrefix checks if 'path' is a valid configuration value for the RPC prefix option. func validatePrefix(what, path string) error { if path == "" { return nil } if path[0] != '/' { return fmt.Errorf(`%s RPC path prefix %q does not contain leading "/"`, what, path) } if strings.ContainsAny(path, "?#") { // This is just to avoid confusion. While these would match correctly (i.e. they'd // match if URL-escaped into path), it's not easy to understand for users when // setting that on the command line. return fmt.Errorf("%s RPC path prefix %q contains URL meta-characters", what, path) } return nil } // stop shuts down the HTTP server. func (h *httpServer) stop() { h.mu.Lock() defer h.mu.Unlock() h.doStop() } func (h *httpServer) doStop() { if h.listener == nil { return // not running } // Shut down the server. httpHandler := h.httpHandler.Load().(*rpcHandler) wsHandler := h.wsHandler.Load().(*rpcHandler) if httpHandler != nil { h.httpHandler.Store((*rpcHandler)(nil)) httpHandler.server.Stop() } if wsHandler != nil { h.wsHandler.Store((*rpcHandler)(nil)) wsHandler.server.Stop() } ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout) defer cancel() err := h.server.Shutdown(ctx) if err != nil && err == ctx.Err() { h.log.Warn("HTTP server graceful shutdown timed out") h.server.Close() } h.listener.Close() h.log.Info("HTTP server stopped", "endpoint", h.listener.Addr()) // Clear out everything to allow re-configuring it later. h.host, h.port, h.endpoint = "", 0, "" h.server, h.listener = nil, nil } // enableRPC turns on JSON-RPC over HTTP on the server. func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig) error { h.mu.Lock() defer h.mu.Unlock() if h.rpcAllowed() { return fmt.Errorf("JSON-RPC over HTTP is already enabled") } // Create RPC server and handler. srv := rpc.NewServer() srv.SetBatchLimits(config.batchItemLimit, config.batchResponseSizeLimit) if config.httpBodyLimit > 0 { srv.SetHTTPBodyLimit(config.httpBodyLimit) } if err := RegisterApis(apis, config.Modules, srv); err != nil { return err } h.httpConfig = config h.httpHandler.Store(&rpcHandler{ Handler: NewHTTPHandlerStack(srv, config.CorsAllowedOrigins, config.Vhosts, config.jwtSecret), server: srv, }) return nil } // disableRPC stops the HTTP RPC handler. This is internal, the caller must hold h.mu. func (h *httpServer) disableRPC() bool { handler := h.httpHandler.Load().(*rpcHandler) if handler != nil { h.httpHandler.Store((*rpcHandler)(nil)) handler.server.Stop() } return handler != nil } // enableWS turns on JSON-RPC over WebSocket on the server. func (h *httpServer) enableWS(apis []rpc.API, config wsConfig) error { h.mu.Lock() defer h.mu.Unlock() if h.wsAllowed() { return fmt.Errorf("JSON-RPC over WebSocket is already enabled") } // Create RPC server and handler. srv := rpc.NewServer() srv.SetBatchLimits(config.batchItemLimit, config.batchResponseSizeLimit) if config.httpBodyLimit > 0 { srv.SetHTTPBodyLimit(config.httpBodyLimit) } if err := RegisterApis(apis, config.Modules, srv); err != nil { return err } h.wsConfig = config h.wsHandler.Store(&rpcHandler{ Handler: NewWSHandlerStack(srv.WebsocketHandler(config.Origins), config.jwtSecret), server: srv, }) return nil } // stopWS disables JSON-RPC over WebSocket and also stops the server if it only serves WebSocket. func (h *httpServer) stopWS() { h.mu.Lock() defer h.mu.Unlock() if h.disableWS() { if !h.rpcAllowed() { h.doStop() } } } // disableWS disables the WebSocket handler. This is internal, the caller must hold h.mu. func (h *httpServer) disableWS() bool { ws := h.wsHandler.Load().(*rpcHandler) if ws != nil { h.wsHandler.Store((*rpcHandler)(nil)) ws.server.Stop() } return ws != nil } // rpcAllowed returns true when JSON-RPC over HTTP is enabled. func (h *httpServer) rpcAllowed() bool { return h.httpHandler.Load().(*rpcHandler) != nil } // wsAllowed returns true when JSON-RPC over WebSocket is enabled. func (h *httpServer) wsAllowed() bool { return h.wsHandler.Load().(*rpcHandler) != nil } // isWebsocket checks the header of an http request for a websocket upgrade request. func isWebsocket(r *http.Request) bool { return strings.EqualFold(r.Header.Get("Upgrade"), "websocket") && strings.Contains(strings.ToLower(r.Header.Get("Connection")), "upgrade") } // NewHTTPHandlerStack returns wrapped http-related handlers func NewHTTPHandlerStack(srv http.Handler, cors []string, vhosts []string, jwtSecret []byte) http.Handler { // Wrap the CORS-handler within a host-handler handler := newCorsHandler(srv, cors) handler = newVHostHandler(vhosts, handler) if len(jwtSecret) != 0 { handler = newJWTHandler(jwtSecret, handler) } return newGzipHandler(handler) } // NewWSHandlerStack returns a wrapped ws-related handler. func NewWSHandlerStack(srv http.Handler, jwtSecret []byte) http.Handler { if len(jwtSecret) != 0 { return newJWTHandler(jwtSecret, srv) } return srv } func newCorsHandler(srv http.Handler, allowedOrigins []string) http.Handler { // disable CORS support if user has not specified a custom CORS configuration if len(allowedOrigins) == 0 { return srv } c := cors.New(cors.Options{ AllowedOrigins: allowedOrigins, AllowedMethods: []string{http.MethodPost, http.MethodGet}, AllowedHeaders: []string{"*"}, MaxAge: 600, }) return c.Handler(srv) } // virtualHostHandler is a handler which validates the Host-header of incoming requests. // Using virtual hosts can help prevent DNS rebinding attacks, where a 'random' domain name points to // the service ip address (but without CORS headers). By verifying the targeted virtual host, we can // ensure that it's a destination that the node operator has defined. type virtualHostHandler struct { vhosts map[string]struct{} next http.Handler } func newVHostHandler(vhosts []string, next http.Handler) http.Handler { vhostMap := make(map[string]struct{}) for _, allowedHost := range vhosts { vhostMap[strings.ToLower(allowedHost)] = struct{}{} } return &virtualHostHandler{vhostMap, next} } // ServeHTTP serves JSON-RPC requests over HTTP, implements http.Handler func (h *virtualHostHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // if r.Host is not set, we can continue serving since a browser would set the Host header if r.Host == "" { h.next.ServeHTTP(w, r) return } host, _, err := net.SplitHostPort(r.Host) if err != nil { // Either invalid (too many colons) or no port specified host = r.Host } if ipAddr := net.ParseIP(host); ipAddr != nil { // It's an IP address, we can serve that h.next.ServeHTTP(w, r) return } // Not an IP address, but a hostname. Need to validate if _, exist := h.vhosts["*"]; exist { h.next.ServeHTTP(w, r) return } if _, exist := h.vhosts[host]; exist { h.next.ServeHTTP(w, r) return } http.Error(w, "invalid host specified", http.StatusForbidden) } var gzPool = sync.Pool{ New: func() interface{} { w := gzip.NewWriter(io.Discard) return w }, } type gzipResponseWriter struct { resp http.ResponseWriter gz *gzip.Writer contentLength uint64 // total length of the uncompressed response written uint64 // amount of written bytes from the uncompressed response hasLength bool // true if uncompressed response had Content-Length inited bool // true after init was called for the first time } // init runs just before response headers are written. Among other things, this function // also decides whether compression will be applied at all. func (w *gzipResponseWriter) init() { if w.inited { return } w.inited = true hdr := w.resp.Header() length := hdr.Get("content-length") if len(length) > 0 { if n, err := strconv.ParseUint(length, 10, 64); err != nil { w.hasLength = true w.contentLength = n } } // Setting Transfer-Encoding to "identity" explicitly disables compression. net/http // also recognizes this header value and uses it to disable "chunked" transfer // encoding, trimming the header from the response. This means downstream handlers can // set this without harm, even if they aren't wrapped by newGzipHandler. // // In go-ethereum, we use this signal to disable compression for certain error // responses which are flushed out close to the write deadline of the response. For // these cases, we want to avoid chunked transfer encoding and compression because // they require additional output that may not get written in time. passthrough := hdr.Get("transfer-encoding") == "identity" if !passthrough { w.gz = gzPool.Get().(*gzip.Writer) w.gz.Reset(w.resp) hdr.Del("content-length") hdr.Set("content-encoding", "gzip") } } func (w *gzipResponseWriter) Header() http.Header { return w.resp.Header() } func (w *gzipResponseWriter) WriteHeader(status int) { w.init() w.resp.WriteHeader(status) } func (w *gzipResponseWriter) Write(b []byte) (int, error) { w.init() if w.gz == nil { // Compression is disabled. return w.resp.Write(b) } n, err := w.gz.Write(b) w.written += uint64(n) if w.hasLength && w.written >= w.contentLength { // The HTTP handler has finished writing the entire uncompressed response. Close // the gzip stream to ensure the footer will be seen by the client in case the // response is flushed after this call to write. err = w.gz.Close() } return n, err } func (w *gzipResponseWriter) Flush() { if w.gz != nil { w.gz.Flush() } if f, ok := w.resp.(http.Flusher); ok { f.Flush() } } func (w *gzipResponseWriter) close() { if w.gz == nil { return } w.gz.Close() gzPool.Put(w.gz) w.gz = nil } func newGzipHandler(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { next.ServeHTTP(w, r) return } wrapper := &gzipResponseWriter{resp: w} defer wrapper.close() next.ServeHTTP(wrapper, r) }) } type ipcServer struct { log log.Logger endpoint string mu sync.Mutex listener net.Listener srv *rpc.Server } func newIPCServer(log log.Logger, endpoint string) *ipcServer { return &ipcServer{log: log, endpoint: endpoint} } // Start starts the httpServer's http.Server func (is *ipcServer) start(apis []rpc.API) error { is.mu.Lock() defer is.mu.Unlock() if is.listener != nil { return nil // already running } listener, srv, err := rpc.StartIPCEndpoint(is.endpoint, apis) if err != nil { is.log.Warn("IPC opening failed", "url", is.endpoint, "error", err) return err } is.log.Info("IPC endpoint opened", "url", is.endpoint) is.listener, is.srv = listener, srv return nil } func (is *ipcServer) stop() error { is.mu.Lock() defer is.mu.Unlock() if is.listener == nil { return nil // not running } err := is.listener.Close() is.srv.Stop() is.listener, is.srv = nil, nil is.log.Info("IPC endpoint closed", "url", is.endpoint) return err } // RegisterApis checks the given modules' availability, generates an allowlist based on the allowed modules, // and then registers all of the APIs exposed by the services. func RegisterApis(apis []rpc.API, modules []string, srv *rpc.Server) error { if bad, available := checkModuleAvailability(modules, apis); len(bad) > 0 { log.Error("Unavailable modules in HTTP API list", "unavailable", bad, "available", available) } // Generate the allow list based on the allowed modules allowList := make(map[string]bool) for _, module := range modules { allowList[module] = true } // Register all the APIs exposed by the services for _, api := range apis { if allowList[api.Namespace] || len(allowList) == 0 { if err := srv.RegisterName(api.Namespace, api.Service); err != nil { return err } } } return nil }