diff --git a/README.md b/README.md index b13f9172..ed6f7f36 100644 --- a/README.md +++ b/README.md @@ -98,6 +98,16 @@ TODO: Add the rest of the standard endpoints add unique endpoints (e.g. getSlice `make test` will run the unit tests `make test` setups a clean `vulcanize_testing` db +## Monitoring + +* Enable http server and metrics using parameters `--http --metrics` +* ipld-eth-server exposes prometheus metrics at `/metric` endpoint +* start prometheus using `monitoring/prometheus.yml` config (`prometheus --config.file=monitoring/prometheus.yml`) +* start grafana, connect to prometheus datasource and import dashboard from `monitoring/grafana/dashboard_main.json` + +![](monitoring/grafana.png) + + ## Contributing Contributions are welcome! diff --git a/cmd/root.go b/cmd/root.go index fdcd87d2..00fba60e 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -24,6 +24,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" + "github.com/vulcanize/ipld-eth-server/pkg/prom" ) var ( @@ -62,6 +63,19 @@ func initFuncs(cmd *cobra.Command, args []string) { if err := logLevel(); err != nil { log.Fatal("Could not set log level: ", err) } + + if viper.GetBool("metrics") { + prom.Init() + } + + if viper.GetBool("http") { + addr := fmt.Sprintf( + "%s:%s", + viper.GetString("http.addr"), + viper.GetString("http.port"), + ) + prom.Serve(addr) + } } func logLevel() error { @@ -93,6 +107,12 @@ func init() { rootCmd.PersistentFlags().String("client-ipcPath", "", "location of geth.ipc file") rootCmd.PersistentFlags().String("log-level", log.InfoLevel.String(), "Log level (trace, debug, info, warn, error, fatal, panic") + rootCmd.PersistentFlags().Bool("metrics", false, "enable metrics") + + rootCmd.PersistentFlags().Bool("http", false, "enable http service for prometheus") + rootCmd.PersistentFlags().String("http-addr", "127.0.0.1", "http host for prometheus") + rootCmd.PersistentFlags().String("http-port", "8090", "http port for prometheus") + viper.BindPFlag("logfile", rootCmd.PersistentFlags().Lookup("logfile")) viper.BindPFlag("database.name", rootCmd.PersistentFlags().Lookup("database-name")) viper.BindPFlag("database.port", rootCmd.PersistentFlags().Lookup("database-port")) @@ -100,6 +120,12 @@ func init() { viper.BindPFlag("database.user", rootCmd.PersistentFlags().Lookup("database-user")) viper.BindPFlag("database.password", rootCmd.PersistentFlags().Lookup("database-password")) viper.BindPFlag("log.level", rootCmd.PersistentFlags().Lookup("log-level")) + + viper.BindPFlag("metrics", rootCmd.PersistentFlags().Lookup("metrics")) + + viper.BindPFlag("http", rootCmd.PersistentFlags().Lookup("http")) + viper.BindPFlag("http.addr", rootCmd.PersistentFlags().Lookup("http-addr")) + viper.BindPFlag("http.port", rootCmd.PersistentFlags().Lookup("http-port")) } func initConfig() { diff --git a/cmd/serve.go b/cmd/serve.go index 487835ef..502f2918 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -27,6 +27,8 @@ import ( "github.com/vulcanize/ipld-eth-indexer/pkg/eth" + srpc "github.com/vulcanize/ipld-eth-server/pkg/rpc" + s "github.com/vulcanize/ipld-eth-server/pkg/serve" v "github.com/vulcanize/ipld-eth-server/version" ) @@ -78,17 +80,18 @@ func serve() { func startServers(server s.Server, settings *s.Config) error { logWithCommand.Info("starting up IPC server") - _, _, err := rpc.StartIPCEndpoint(settings.IPCEndpoint, server.APIs()) + _, _, err := srpc.StartIPCEndpoint(settings.IPCEndpoint, server.APIs()) if err != nil { return err } logWithCommand.Info("starting up WS server") - _, _, err = rpc.StartWSEndpoint(settings.WSEndpoint, server.APIs(), []string{"vdb"}, nil, true) + _, _, err = srpc.StartWSEndpoint(settings.WSEndpoint, server.APIs(), []string{"vdb"}, nil, true) if err != nil { return err } logWithCommand.Info("starting up HTTP server") - _, _, err = rpc.StartHTTPEndpoint(settings.HTTPEndpoint, server.APIs(), []string{"eth"}, nil, []string{"*"}, rpc.HTTPTimeouts{}) + _, _, err = srpc.StartHTTPEndpoint(settings.HTTPEndpoint, server.APIs(), []string{"eth"}, nil, []string{"*"}, rpc.HTTPTimeouts{}) + return err } @@ -109,7 +112,7 @@ func init() { viper.BindPFlag("server.httpPath", serveCmd.PersistentFlags().Lookup("server-http-path")) viper.BindPFlag("server.ipcPath", serveCmd.PersistentFlags().Lookup("server-ipc-path")) - viper.BindPFlag("ethereum.chainID", rootCmd.PersistentFlags().Lookup("eth-chain-id")) - viper.BindPFlag("ethereum.defaultSender", rootCmd.PersistentFlags().Lookup("eth-default-sender")) - viper.BindPFlag("ethereum.rpcGasCap", rootCmd.PersistentFlags().Lookup("eth-rpc-gas-cap")) + viper.BindPFlag("ethereum.chainID", serveCmd.PersistentFlags().Lookup("eth-chain-id")) + viper.BindPFlag("ethereum.defaultSender", serveCmd.PersistentFlags().Lookup("eth-default-sender")) + viper.BindPFlag("ethereum.rpcGasCap", serveCmd.PersistentFlags().Lookup("eth-rpc-gas-cap")) } diff --git a/go.mod b/go.mod index 51765db7..1a68847d 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/multiformats/go-multihash v0.0.13 github.com/onsi/ginkgo v1.12.1 github.com/onsi/gomega v1.10.1 + github.com/prometheus/client_golang v1.5.1 github.com/sirupsen/logrus v1.6.0 github.com/spf13/cobra v1.0.0 github.com/spf13/viper v1.7.0 diff --git a/monitoring/grafana.png b/monitoring/grafana.png new file mode 100644 index 00000000..3d1220e1 Binary files /dev/null and b/monitoring/grafana.png differ diff --git a/monitoring/grafana/dashboard_main.json b/monitoring/grafana/dashboard_main.json new file mode 100644 index 00000000..83ed1a0b --- /dev/null +++ b/monitoring/grafana/dashboard_main.json @@ -0,0 +1,353 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": "-- Grafana --", + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "gnetId": null, + "graphTooltip": 0, + "id": 4, + "links": [], + "panels": [ + { + "datasource": null, + "description": "", + "fieldConfig": { + "defaults": { + "custom": {}, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 0 + }, + "id": 4, + "options": { + "reduceOptions": { + "calcs": [ + "mean" + ], + "fields": "", + "values": false + }, + "showThresholdLabels": false, + "showThresholdMarkers": true + }, + "pluginVersion": "7.2.1", + "targets": [ + { + "expr": "ipld_eth_server_ws_count", + "interval": "", + "legendFormat": "", + "refId": "A" + } + ], + "timeFrom": null, + "timeShift": null, + "title": "Websocket Connection Count", + "type": "gauge" + }, + { + "datasource": null, + "description": "", + "fieldConfig": { + "defaults": { + "custom": {}, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 0 + }, + "id": 5, + "options": { + "reduceOptions": { + "calcs": [ + "mean" + ], + "fields": "", + "values": false + }, + "showThresholdLabels": false, + "showThresholdMarkers": true + }, + "pluginVersion": "7.2.1", + "targets": [ + { + "expr": "ipld_eth_server_ipc_count", + "interval": "", + "legendFormat": "", + "refId": "A" + } + ], + "timeFrom": null, + "timeShift": null, + "title": "IPC Connection Count", + "type": "gauge" + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": null, + "description": "", + "fieldConfig": { + "defaults": { + "custom": {} + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 9, + "w": 12, + "x": 0, + "y": 8 + }, + "hiddenSeries": false, + "id": 2, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.2.1", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "rate(ipld_eth_server_http_count[1m])", + "interval": "", + "legendFormat": "", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "HTTP requests per second", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": null, + "fieldConfig": { + "defaults": { + "custom": {}, + "unit": "s" + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 9, + "w": 12, + "x": 12, + "y": 8 + }, + "hiddenSeries": false, + "id": 7, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.2.1", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "histogram_quantile(0.9, rate(ipld_eth_server_http_duration_bucket[1m]))", + "hide": false, + "interval": "", + "legendFormat": "0.9", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.75, rate(ipld_eth_server_http_duration_bucket[1m]))", + "interval": "", + "legendFormat": "0.75", + "refId": "A" + }, + { + "expr": "histogram_quantile(0.5, rate(ipld_eth_server_http_duration_bucket[1m]))", + "interval": "", + "legendFormat": "0.5", + "refId": "C" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "HTTP Requests duration (percentile)", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + } + ], + "refresh": "10s", + "schemaVersion": 26, + "style": "dark", + "tags": [], + "templating": { + "list": [] + }, + "time": { + "from": "now-15m", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "ipld-eth-server", + "uid": "lFVEvNtGk", + "version": 7 +} diff --git a/monitoring/prometheus.yml b/monitoring/prometheus.yml new file mode 100644 index 00000000..79b4d620 --- /dev/null +++ b/monitoring/prometheus.yml @@ -0,0 +1,7 @@ +global: + scrape_interval: 10s + +scrape_configs: + - job_name: 'ipld-eth-server' + static_configs: + - targets: ['localhost:8090'] diff --git a/pkg/prom/db_stats_collector.go b/pkg/prom/db_stats_collector.go new file mode 100644 index 00000000..0b225899 --- /dev/null +++ b/pkg/prom/db_stats_collector.go @@ -0,0 +1,143 @@ +package prom + +import ( + "database/sql" + + "github.com/prometheus/client_golang/prometheus" +) + +const subsystem = "connections" + +// DBStatsGetter is an interface that gets sql.DBStats. +type DBStatsGetter interface { + Stats() sql.DBStats +} + +// DBStatsCollector implements the prometheus.Collector interface. +type DBStatsCollector struct { + sg DBStatsGetter + + // descriptions of exported metrics + maxOpenDesc *prometheus.Desc + openDesc *prometheus.Desc + inUseDesc *prometheus.Desc + idleDesc *prometheus.Desc + waitedForDesc *prometheus.Desc + blockedSecondsDesc *prometheus.Desc + closedMaxIdleDesc *prometheus.Desc + closedMaxLifetimeDesc *prometheus.Desc +} + +// NewDBStatsCollector creates a new DBStatsCollector. +func NewDBStatsCollector(dbName string, sg DBStatsGetter) *DBStatsCollector { + labels := prometheus.Labels{"db_name": dbName} + return &DBStatsCollector{ + sg: sg, + maxOpenDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "max_open"), + "Maximum number of open connections to the database.", + nil, + labels, + ), + openDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "open"), + "The number of established connections both in use and idle.", + nil, + labels, + ), + inUseDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "in_use"), + "The number of connections currently in use.", + nil, + labels, + ), + idleDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "idle"), + "The number of idle connections.", + nil, + labels, + ), + waitedForDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "waited_for"), + "The total number of connections waited for.", + nil, + labels, + ), + blockedSecondsDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "blocked_seconds"), + "The total time blocked waiting for a new connection.", + nil, + labels, + ), + closedMaxIdleDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "closed_max_idle"), + "The total number of connections closed due to SetMaxIdleConns.", + nil, + labels, + ), + closedMaxLifetimeDesc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "closed_max_lifetime"), + "The total number of connections closed due to SetConnMaxLifetime.", + nil, + labels, + ), + } +} + +// Describe implements the prometheus.Collector interface. +func (c DBStatsCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- c.maxOpenDesc + ch <- c.openDesc + ch <- c.inUseDesc + ch <- c.idleDesc + ch <- c.waitedForDesc + ch <- c.blockedSecondsDesc + ch <- c.closedMaxIdleDesc + ch <- c.closedMaxLifetimeDesc +} + +// Collect implements the prometheus.Collector interface. +func (c DBStatsCollector) Collect(ch chan<- prometheus.Metric) { + stats := c.sg.Stats() + + ch <- prometheus.MustNewConstMetric( + c.maxOpenDesc, + prometheus.GaugeValue, + float64(stats.MaxOpenConnections), + ) + ch <- prometheus.MustNewConstMetric( + c.openDesc, + prometheus.GaugeValue, + float64(stats.OpenConnections), + ) + ch <- prometheus.MustNewConstMetric( + c.inUseDesc, + prometheus.GaugeValue, + float64(stats.InUse), + ) + ch <- prometheus.MustNewConstMetric( + c.idleDesc, + prometheus.GaugeValue, + float64(stats.Idle), + ) + ch <- prometheus.MustNewConstMetric( + c.waitedForDesc, + prometheus.CounterValue, + float64(stats.WaitCount), + ) + ch <- prometheus.MustNewConstMetric( + c.blockedSecondsDesc, + prometheus.CounterValue, + stats.WaitDuration.Seconds(), + ) + ch <- prometheus.MustNewConstMetric( + c.closedMaxIdleDesc, + prometheus.CounterValue, + float64(stats.MaxIdleClosed), + ) + ch <- prometheus.MustNewConstMetric( + c.closedMaxLifetimeDesc, + prometheus.CounterValue, + float64(stats.MaxLifetimeClosed), + ) +} diff --git a/pkg/prom/middleware.go b/pkg/prom/middleware.go new file mode 100644 index 00000000..b2daa79f --- /dev/null +++ b/pkg/prom/middleware.go @@ -0,0 +1,48 @@ +package prom + +import ( + "net/http" + "time" + + "github.com/ethereum/go-ethereum/rpc" +) + +// HTTPMiddleware http connection metric reader +func HTTPMiddleware(next http.Handler) http.Handler { + if !metrics { + return next + } + + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + httpCount.Inc() + + start := time.Now() + next.ServeHTTP(w, r) + duration := time.Now().Sub(start) + httpDuration.Observe(float64(duration.Seconds())) + }) +} + +// WSMiddleware websocket connection counter +func WSMiddleware(next http.Handler) http.Handler { + if !metrics { + return next + } + + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + wsCount.Inc() + next.ServeHTTP(w, r) + wsCount.Dec() + }) +} + +// IPCMiddleware unix-socket connection counter +func IPCMiddleware(server *rpc.Server, client rpc.Conn) { + if metrics { + ipcCount.Inc() + } + server.ServeCodec(rpc.NewCodec(client), 0) + if metrics { + ipcCount.Dec() + } +} diff --git a/pkg/prom/prom.go b/pkg/prom/prom.go new file mode 100644 index 00000000..72a282ce --- /dev/null +++ b/pkg/prom/prom.go @@ -0,0 +1,63 @@ +package prom + +import ( + "github.com/jmoiron/sqlx" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +const ( + namespace = "ipld_eth_server" + + subsystemHTTP = "http" + subsystemWS = "ws" + subsystemIPC = "ipc" +) + +var ( + metrics bool + + httpCount prometheus.Counter + httpDuration prometheus.Histogram + wsCount prometheus.Gauge + ipcCount prometheus.Gauge +) + +// Init module initialization +func Init() { + metrics = true + + httpCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystemHTTP, + Name: "count", + Help: "http request count", + }) + httpDuration = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystemHTTP, + Name: "duration", + Help: "http request duration", + }) + + wsCount = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystemWS, + Name: "count", + Help: "websocket connection count", + }) + + ipcCount = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystemIPC, + Name: "count", + Help: "unix socket connection count", + }) +} + +// RegisterDBCollector create metric colletor for given connection +func RegisterDBCollector(name string, db *sqlx.DB) { + if metrics { + prometheus.Register(NewDBStatsCollector(name, db)) + } +} diff --git a/pkg/prom/serve.go b/pkg/prom/serve.go new file mode 100644 index 00000000..058d1772 --- /dev/null +++ b/pkg/prom/serve.go @@ -0,0 +1,31 @@ +package prom + +import ( + "errors" + "net/http" + + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/sirupsen/logrus" +) + +var errPromHTTP = errors.New("can't start http server for prometheus") + +// Serve start listening http +func Serve(addr string) *http.Server { + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.Handler()) + srv := http.Server{ + Addr: addr, + Handler: mux, + } + go func() { + if err := srv.ListenAndServe(); err != nil { + logrus. + WithError(err). + WithField("module", "prom"). + WithField("addr", addr). + Fatal(errPromHTTP) + } + }() + return &srv +} diff --git a/pkg/rpc/check.go b/pkg/rpc/check.go new file mode 100644 index 00000000..754f752e --- /dev/null +++ b/pkg/rpc/check.go @@ -0,0 +1,21 @@ +package rpc + +import "github.com/ethereum/go-ethereum/rpc" + +// checkModuleAvailability check that all names given in modules are actually +// available API services. +func checkModuleAvailability(modules []string, apis []rpc.API) (bad, available []string) { + availableSet := make(map[string]struct{}) + for _, api := range apis { + if _, ok := availableSet[api.Namespace]; !ok { + availableSet[api.Namespace] = struct{}{} + available = append(available, api.Namespace) + } + } + for _, name := range modules { + if _, ok := availableSet[name]; !ok { + bad = append(bad, name) + } + } + return bad, available +} diff --git a/pkg/rpc/http.go b/pkg/rpc/http.go new file mode 100644 index 00000000..ae1ab03e --- /dev/null +++ b/pkg/rpc/http.go @@ -0,0 +1,41 @@ +package rpc + +import ( + "net" + + "github.com/ethereum/go-ethereum/rpc" + log "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-eth-server/pkg/prom" +) + +// StartHTTPEndpoint starts the HTTP RPC endpoint, configured with cors/vhosts/modules. +func StartHTTPEndpoint(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string, timeouts rpc.HTTPTimeouts) (net.Listener, *rpc.Server, error) { + if bad, available := checkModuleAvailability(modules, apis); len(bad) > 0 { + log.Error("Unavailable modules in HTTP API list", "unavailable", bad, "available", available) + } + // Generate the whitelist based on the allowed modules + whitelist := make(map[string]bool) + for _, module := range modules { + whitelist[module] = true + } + // Register all the APIs exposed by the services + handler := rpc.NewServer() + for _, api := range apis { + if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) { + if err := handler.RegisterName(api.Namespace, api.Service); err != nil { + return nil, nil, err + } + log.Debug("HTTP registered", "namespace", api.Namespace) + } + } + // All APIs registered, start the HTTP listener + var ( + listener net.Listener + err error + ) + if listener, err = net.Listen("tcp", endpoint); err != nil { + return nil, nil, err + } + go rpc.NewHTTPServer(cors, vhosts, timeouts, prom.HTTPMiddleware(handler)).Serve(listener) + return listener, handler, err +} diff --git a/pkg/rpc/ipc.go b/pkg/rpc/ipc.go new file mode 100644 index 00000000..4e170ebe --- /dev/null +++ b/pkg/rpc/ipc.go @@ -0,0 +1,75 @@ +package rpc + +import ( + "fmt" + "net" + "os" + "path/filepath" + + "github.com/ethereum/go-ethereum/p2p/netutil" + "github.com/ethereum/go-ethereum/rpc" + log "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-eth-server/pkg/prom" +) + +var ( + // On Linux, sun_path is 108 bytes in size + // see http://man7.org/linux/man-pages/man7/unix.7.html + maxPathSize = 108 +) + +// ipcListen will create a Unix socket on the given endpoint. +func ipcListen(endpoint string) (net.Listener, error) { + if len(endpoint) > int(maxPathSize) { + log.Warn(fmt.Sprintf("The ipc endpoint is longer than %d characters. ", maxPathSize), + "endpoint", endpoint) + } + + // Ensure the IPC path exists and remove any previous leftover + if err := os.MkdirAll(filepath.Dir(endpoint), 0751); err != nil { + return nil, err + } + os.Remove(endpoint) + l, err := net.Listen("unix", endpoint) + if err != nil { + return nil, err + } + os.Chmod(endpoint, 0600) + return l, nil +} + +func ipcServe(srv *rpc.Server, listener net.Listener) { + for { + conn, err := listener.Accept() + if netutil.IsTemporaryError(err) { + log.WithError(err).Warn("rpc accept error") + continue + } + if err != nil { + log.WithError(err).Warn("unknown error") + continue + } + log.WithField("addr", conn.RemoteAddr()).Trace("accepted ipc connection") + go prom.IPCMiddleware(srv, conn) + } +} + +// StartIPCEndpoint starts an IPC endpoint. +func StartIPCEndpoint(ipcEndpoint string, apis []rpc.API) (net.Listener, *rpc.Server, error) { + // Register all the APIs exposed by the services. + handler := rpc.NewServer() + for _, api := range apis { + if err := handler.RegisterName(api.Namespace, api.Service); err != nil { + return nil, nil, err + } + log.Debug("IPC registered", "namespace", api.Namespace) + } + // All APIs registered, start the IPC listener. + listener, err := ipcListen(ipcEndpoint) + if err != nil { + return nil, nil, err + } + + go ipcServe(handler, listener) + return listener, handler, nil +} diff --git a/pkg/rpc/ws.go b/pkg/rpc/ws.go new file mode 100644 index 00000000..cff19ab9 --- /dev/null +++ b/pkg/rpc/ws.go @@ -0,0 +1,46 @@ +package rpc + +import ( + "net" + + "github.com/ethereum/go-ethereum/rpc" + log "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-eth-server/pkg/prom" +) + +// StartWSEndpoint starts a websocket endpoint. +func StartWSEndpoint(endpoint string, apis []rpc.API, modules []string, wsOrigins []string, exposeAll bool) (net.Listener, *rpc.Server, error) { + if bad, available := checkModuleAvailability(modules, apis); len(bad) > 0 { + log.Error("Unavailable modules in WS API list", "unavailable", bad, "available", available) + } + // Generate the whitelist based on the allowed modules + whitelist := make(map[string]bool) + for _, module := range modules { + whitelist[module] = true + } + // Register all the APIs exposed by the services + handler := rpc.NewServer() + for _, api := range apis { + if exposeAll || whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) { + if err := handler.RegisterName(api.Namespace, api.Service); err != nil { + return nil, nil, err + } + log.Debug("WebSocket registered", "service", api.Service, "namespace", api.Namespace) + } + } + // All APIs registered, start the HTTP listener + var ( + listener net.Listener + err error + ) + if listener, err = net.Listen("tcp", endpoint); err != nil { + return nil, nil, err + } + + wsServer := rpc.NewWSServer(wsOrigins, handler) + wsServer.Handler = prom.WSMiddleware(wsServer.Handler) + go wsServer.Serve(listener) + + return listener, handler, err + +} diff --git a/pkg/serve/config.go b/pkg/serve/config.go index ecd72101..95f7d97e 100644 --- a/pkg/serve/config.go +++ b/pkg/serve/config.go @@ -28,6 +28,7 @@ import ( "github.com/vulcanize/ipld-eth-indexer/pkg/node" "github.com/vulcanize/ipld-eth-indexer/pkg/postgres" "github.com/vulcanize/ipld-eth-indexer/utils" + "github.com/vulcanize/ipld-eth-server/pkg/prom" "github.com/vulcanize/ipld-eth-server/pkg/eth" ) @@ -96,6 +97,7 @@ func NewConfig() (*Config, error) { c.HTTPEndpoint = httpPath overrideDBConnConfig(&c.DBConfig) serveDB := utils.LoadPostgres(c.DBConfig, node.Info{}) + prom.RegisterDBCollector(c.DBConfig.Name, serveDB.DB) c.DB = &serveDB defaultSenderStr := viper.GetString("ethereum.defaultSender")