diff --git a/cmd/serve.go b/cmd/serve.go index 03508b65..0e1aeb52 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -27,6 +27,8 @@ import ( "github.com/vulcanize/ipld-eth-indexer/pkg/eth" + srpc "github.com/vulcanize/ipld-eth-server/pkg/rpc" + s "github.com/vulcanize/ipld-eth-server/pkg/serve" v "github.com/vulcanize/ipld-eth-server/version" ) @@ -78,17 +80,18 @@ func serve() { func startServers(server s.Server, settings *s.Config) error { logWithCommand.Info("starting up IPC server") - _, _, err := rpc.StartIPCEndpoint(settings.IPCEndpoint, server.APIs()) + _, _, err := srpc.StartIPCEndpoint(settings.IPCEndpoint, server.APIs()) if err != nil { return err } logWithCommand.Info("starting up WS server") - _, _, err = rpc.StartWSEndpoint(settings.WSEndpoint, server.APIs(), []string{"vdb"}, nil, true) + _, _, err = srpc.StartWSEndpoint(settings.WSEndpoint, server.APIs(), []string{"vdb"}, nil, true) if err != nil { return err } logWithCommand.Info("starting up HTTP server") - _, _, err = rpc.StartHTTPEndpoint(settings.HTTPEndpoint, server.APIs(), []string{"eth"}, nil, []string{"*"}, rpc.HTTPTimeouts{}) + _, _, err = srpc.StartHTTPEndpoint(settings.HTTPEndpoint, server.APIs(), []string{"eth"}, nil, []string{"*"}, rpc.HTTPTimeouts{}) + return err } diff --git a/pkg/prom/middleware.go b/pkg/prom/middleware.go new file mode 100644 index 00000000..741fed9a --- /dev/null +++ b/pkg/prom/middleware.go @@ -0,0 +1,35 @@ +package prom + +import ( + "net/http" + + "github.com/prometheus/client_golang/prometheus" +) + +func HTTPMiddleware(next http.Handler) http.Handler { + if !metrics { + return next + } + + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + httpCount.Inc() + + timer := prometheus.NewTimer(httpDuration) + next.ServeHTTP(w, r) + timer.ObserveDuration() + }) +} + +func WSMiddleware(next http.Handler) http.Handler { + if !metrics { + return next + } + + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + wsCount.Inc() + + timer := prometheus.NewTimer(wsDuration) + next.ServeHTTP(w, r) + timer.ObserveDuration() + }) +} diff --git a/pkg/prom/prom.go b/pkg/prom/prom.go index ebf95c86..828ed160 100644 --- a/pkg/prom/prom.go +++ b/pkg/prom/prom.go @@ -3,20 +3,72 @@ package prom import ( "github.com/jmoiron/sqlx" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" ) const ( - namespace = "ipld_eth_server" - statsSubsystem = "stats" + namespace = "ipld_eth_server" + + subsystemHTTP = "http" + subsystemWS = "ws" + subsystemIPC = "ipc" ) var ( metrics bool + + httpCount prometheus.Counter + httpDuration prometheus.Histogram + + wsCount prometheus.Counter + wsDuration prometheus.Histogram + + ipcCount prometheus.Counter + ipcDuration prometheus.Gauge ) // Init module initialization func Init() { metrics = true + + httpCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystemHTTP, + Name: "count", + Help: "", + }) + httpDuration = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystemHTTP, + Name: "duration", + Help: "", + }) + + wsCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystemWS, + Name: "count", + Help: "", + }) + wsDuration = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystemWS, + Name: "duration", + Help: "", + }) + + ipcCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystemIPC, + Name: "count", + Help: "", + }) + ipcDuration = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystemIPC, + Name: "duration", + Help: "", + }) } // RegisterDBCollector create metric colletor for given connection diff --git a/pkg/rpc/check.go b/pkg/rpc/check.go new file mode 100644 index 00000000..754f752e --- /dev/null +++ b/pkg/rpc/check.go @@ -0,0 +1,21 @@ +package rpc + +import "github.com/ethereum/go-ethereum/rpc" + +// checkModuleAvailability check that all names given in modules are actually +// available API services. +func checkModuleAvailability(modules []string, apis []rpc.API) (bad, available []string) { + availableSet := make(map[string]struct{}) + for _, api := range apis { + if _, ok := availableSet[api.Namespace]; !ok { + availableSet[api.Namespace] = struct{}{} + available = append(available, api.Namespace) + } + } + for _, name := range modules { + if _, ok := availableSet[name]; !ok { + bad = append(bad, name) + } + } + return bad, available +} diff --git a/pkg/rpc/http.go b/pkg/rpc/http.go new file mode 100644 index 00000000..ae1ab03e --- /dev/null +++ b/pkg/rpc/http.go @@ -0,0 +1,41 @@ +package rpc + +import ( + "net" + + "github.com/ethereum/go-ethereum/rpc" + log "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-eth-server/pkg/prom" +) + +// StartHTTPEndpoint starts the HTTP RPC endpoint, configured with cors/vhosts/modules. +func StartHTTPEndpoint(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string, timeouts rpc.HTTPTimeouts) (net.Listener, *rpc.Server, error) { + if bad, available := checkModuleAvailability(modules, apis); len(bad) > 0 { + log.Error("Unavailable modules in HTTP API list", "unavailable", bad, "available", available) + } + // Generate the whitelist based on the allowed modules + whitelist := make(map[string]bool) + for _, module := range modules { + whitelist[module] = true + } + // Register all the APIs exposed by the services + handler := rpc.NewServer() + for _, api := range apis { + if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) { + if err := handler.RegisterName(api.Namespace, api.Service); err != nil { + return nil, nil, err + } + log.Debug("HTTP registered", "namespace", api.Namespace) + } + } + // All APIs registered, start the HTTP listener + var ( + listener net.Listener + err error + ) + if listener, err = net.Listen("tcp", endpoint); err != nil { + return nil, nil, err + } + go rpc.NewHTTPServer(cors, vhosts, timeouts, prom.HTTPMiddleware(handler)).Serve(listener) + return listener, handler, err +} diff --git a/pkg/rpc/ipc.go b/pkg/rpc/ipc.go new file mode 100644 index 00000000..297be5ab --- /dev/null +++ b/pkg/rpc/ipc.go @@ -0,0 +1,58 @@ +package rpc + +import ( + "C" + + "fmt" + "net" + "os" + "path/filepath" + + "github.com/ethereum/go-ethereum/rpc" + log "github.com/sirupsen/logrus" +) + +var ( + // On Linux, sun_path is 108 bytes in size + // see http://man7.org/linux/man-pages/man7/unix.7.html + maxPathSize = 108 +) + +// ipcListen will create a Unix socket on the given endpoint. +func ipcListen(endpoint string) (net.Listener, error) { + if len(endpoint) > int(maxPathSize) { + log.Warn(fmt.Sprintf("The ipc endpoint is longer than %d characters. ", maxPathSize), + "endpoint", endpoint) + } + + // Ensure the IPC path exists and remove any previous leftover + if err := os.MkdirAll(filepath.Dir(endpoint), 0751); err != nil { + return nil, err + } + os.Remove(endpoint) + l, err := net.Listen("unix", endpoint) + if err != nil { + return nil, err + } + os.Chmod(endpoint, 0600) + return l, nil +} + +// StartIPCEndpoint starts an IPC endpoint. +func StartIPCEndpoint(ipcEndpoint string, apis []rpc.API) (net.Listener, *rpc.Server, error) { + // Register all the APIs exposed by the services. + handler := rpc.NewServer() + for _, api := range apis { + if err := handler.RegisterName(api.Namespace, api.Service); err != nil { + return nil, nil, err + } + log.Debug("IPC registered", "namespace", api.Namespace) + } + // All APIs registered, start the IPC listener. + listener, err := ipcListen(ipcEndpoint) + if err != nil { + return nil, nil, err + } + go handler.ServeListener(listener) + return listener, handler, nil +} diff --git a/pkg/rpc/ws.go b/pkg/rpc/ws.go new file mode 100644 index 00000000..cff19ab9 --- /dev/null +++ b/pkg/rpc/ws.go @@ -0,0 +1,46 @@ +package rpc + +import ( + "net" + + "github.com/ethereum/go-ethereum/rpc" + log "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-eth-server/pkg/prom" +) + +// StartWSEndpoint starts a websocket endpoint. +func StartWSEndpoint(endpoint string, apis []rpc.API, modules []string, wsOrigins []string, exposeAll bool) (net.Listener, *rpc.Server, error) { + if bad, available := checkModuleAvailability(modules, apis); len(bad) > 0 { + log.Error("Unavailable modules in WS API list", "unavailable", bad, "available", available) + } + // Generate the whitelist based on the allowed modules + whitelist := make(map[string]bool) + for _, module := range modules { + whitelist[module] = true + } + // Register all the APIs exposed by the services + handler := rpc.NewServer() + for _, api := range apis { + if exposeAll || whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) { + if err := handler.RegisterName(api.Namespace, api.Service); err != nil { + return nil, nil, err + } + log.Debug("WebSocket registered", "service", api.Service, "namespace", api.Namespace) + } + } + // All APIs registered, start the HTTP listener + var ( + listener net.Listener + err error + ) + if listener, err = net.Listen("tcp", endpoint); err != nil { + return nil, nil, err + } + + wsServer := rpc.NewWSServer(wsOrigins, handler) + wsServer.Handler = prom.WSMiddleware(wsServer.Handler) + go wsServer.Serve(listener) + + return listener, handler, err + +}