Metrics and http server #15

Merged
n0cte merged 10 commits from metrics into master 2020-10-21 16:42:05 +00:00
16 changed files with 876 additions and 6 deletions

View File

@ -98,6 +98,16 @@ TODO: Add the rest of the standard endpoints add unique endpoints (e.g. getSlice
`make test` will run the unit tests `make test` will run the unit tests
`make test` setups a clean `vulcanize_testing` db `make test` setups a clean `vulcanize_testing` db
## Monitoring
* Enable http server and metrics using parameters `--http --metrics`
* ipld-eth-server exposes prometheus metrics at `/metric` endpoint
* start prometheus using `monitoring/prometheus.yml` config (`prometheus --config.file=monitoring/prometheus.yml`)
* start grafana, connect to prometheus datasource and import dashboard from `monitoring/grafana/dashboard_main.json`
![](monitoring/grafana.png)
## Contributing ## Contributing
Contributions are welcome! Contributions are welcome!

View File

@ -24,6 +24,7 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/vulcanize/ipld-eth-server/pkg/prom"
) )
var ( var (
@ -62,6 +63,19 @@ func initFuncs(cmd *cobra.Command, args []string) {
if err := logLevel(); err != nil { if err := logLevel(); err != nil {
log.Fatal("Could not set log level: ", err) log.Fatal("Could not set log level: ", err)
} }
if viper.GetBool("metrics") {
prom.Init()
}
if viper.GetBool("http") {
addr := fmt.Sprintf(
"%s:%s",
viper.GetString("http.addr"),
viper.GetString("http.port"),
)
prom.Serve(addr)
}
} }
func logLevel() error { func logLevel() error {
@ -93,6 +107,12 @@ func init() {
rootCmd.PersistentFlags().String("client-ipcPath", "", "location of geth.ipc file") rootCmd.PersistentFlags().String("client-ipcPath", "", "location of geth.ipc file")
rootCmd.PersistentFlags().String("log-level", log.InfoLevel.String(), "Log level (trace, debug, info, warn, error, fatal, panic") rootCmd.PersistentFlags().String("log-level", log.InfoLevel.String(), "Log level (trace, debug, info, warn, error, fatal, panic")
rootCmd.PersistentFlags().Bool("metrics", false, "enable metrics")
rootCmd.PersistentFlags().Bool("http", false, "enable http service for prometheus")
rootCmd.PersistentFlags().String("http-addr", "127.0.0.1", "http host for prometheus")
rootCmd.PersistentFlags().String("http-port", "8090", "http port for prometheus")
viper.BindPFlag("logfile", rootCmd.PersistentFlags().Lookup("logfile")) viper.BindPFlag("logfile", rootCmd.PersistentFlags().Lookup("logfile"))
viper.BindPFlag("database.name", rootCmd.PersistentFlags().Lookup("database-name")) viper.BindPFlag("database.name", rootCmd.PersistentFlags().Lookup("database-name"))
viper.BindPFlag("database.port", rootCmd.PersistentFlags().Lookup("database-port")) viper.BindPFlag("database.port", rootCmd.PersistentFlags().Lookup("database-port"))
@ -100,6 +120,12 @@ func init() {
viper.BindPFlag("database.user", rootCmd.PersistentFlags().Lookup("database-user")) viper.BindPFlag("database.user", rootCmd.PersistentFlags().Lookup("database-user"))
viper.BindPFlag("database.password", rootCmd.PersistentFlags().Lookup("database-password")) viper.BindPFlag("database.password", rootCmd.PersistentFlags().Lookup("database-password"))
viper.BindPFlag("log.level", rootCmd.PersistentFlags().Lookup("log-level")) viper.BindPFlag("log.level", rootCmd.PersistentFlags().Lookup("log-level"))
viper.BindPFlag("metrics", rootCmd.PersistentFlags().Lookup("metrics"))
viper.BindPFlag("http", rootCmd.PersistentFlags().Lookup("http"))
viper.BindPFlag("http.addr", rootCmd.PersistentFlags().Lookup("http-addr"))
viper.BindPFlag("http.port", rootCmd.PersistentFlags().Lookup("http-port"))
} }
func initConfig() { func initConfig() {

View File

@ -27,6 +27,8 @@ import (
"github.com/vulcanize/ipld-eth-indexer/pkg/eth" "github.com/vulcanize/ipld-eth-indexer/pkg/eth"
srpc "github.com/vulcanize/ipld-eth-server/pkg/rpc"
s "github.com/vulcanize/ipld-eth-server/pkg/serve" s "github.com/vulcanize/ipld-eth-server/pkg/serve"
v "github.com/vulcanize/ipld-eth-server/version" v "github.com/vulcanize/ipld-eth-server/version"
) )
@ -78,17 +80,18 @@ func serve() {
func startServers(server s.Server, settings *s.Config) error { func startServers(server s.Server, settings *s.Config) error {
logWithCommand.Info("starting up IPC server") logWithCommand.Info("starting up IPC server")
_, _, err := rpc.StartIPCEndpoint(settings.IPCEndpoint, server.APIs()) _, _, err := srpc.StartIPCEndpoint(settings.IPCEndpoint, server.APIs())
if err != nil { if err != nil {
return err return err
} }
logWithCommand.Info("starting up WS server") logWithCommand.Info("starting up WS server")
_, _, err = rpc.StartWSEndpoint(settings.WSEndpoint, server.APIs(), []string{"vdb"}, nil, true) _, _, err = srpc.StartWSEndpoint(settings.WSEndpoint, server.APIs(), []string{"vdb"}, nil, true)
if err != nil { if err != nil {
return err return err
} }
logWithCommand.Info("starting up HTTP server") logWithCommand.Info("starting up HTTP server")
_, _, err = rpc.StartHTTPEndpoint(settings.HTTPEndpoint, server.APIs(), []string{"eth"}, nil, []string{"*"}, rpc.HTTPTimeouts{}) _, _, err = srpc.StartHTTPEndpoint(settings.HTTPEndpoint, server.APIs(), []string{"eth"}, nil, []string{"*"}, rpc.HTTPTimeouts{})
return err return err
} }
@ -109,7 +112,7 @@ func init() {
viper.BindPFlag("server.httpPath", serveCmd.PersistentFlags().Lookup("server-http-path")) viper.BindPFlag("server.httpPath", serveCmd.PersistentFlags().Lookup("server-http-path"))
viper.BindPFlag("server.ipcPath", serveCmd.PersistentFlags().Lookup("server-ipc-path")) viper.BindPFlag("server.ipcPath", serveCmd.PersistentFlags().Lookup("server-ipc-path"))
viper.BindPFlag("ethereum.chainID", rootCmd.PersistentFlags().Lookup("eth-chain-id")) viper.BindPFlag("ethereum.chainID", serveCmd.PersistentFlags().Lookup("eth-chain-id"))
viper.BindPFlag("ethereum.defaultSender", rootCmd.PersistentFlags().Lookup("eth-default-sender")) viper.BindPFlag("ethereum.defaultSender", serveCmd.PersistentFlags().Lookup("eth-default-sender"))
viper.BindPFlag("ethereum.rpcGasCap", rootCmd.PersistentFlags().Lookup("eth-rpc-gas-cap")) viper.BindPFlag("ethereum.rpcGasCap", serveCmd.PersistentFlags().Lookup("eth-rpc-gas-cap"))
} }

1
go.mod
View File

@ -14,6 +14,7 @@ require (
github.com/multiformats/go-multihash v0.0.13 github.com/multiformats/go-multihash v0.0.13
github.com/onsi/ginkgo v1.12.1 github.com/onsi/ginkgo v1.12.1
github.com/onsi/gomega v1.10.1 github.com/onsi/gomega v1.10.1
github.com/prometheus/client_golang v1.5.1
github.com/sirupsen/logrus v1.6.0 github.com/sirupsen/logrus v1.6.0
github.com/spf13/cobra v1.0.0 github.com/spf13/cobra v1.0.0
github.com/spf13/viper v1.7.0 github.com/spf13/viper v1.7.0

BIN
monitoring/grafana.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 118 KiB

View File

@ -0,0 +1,353 @@
{
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": "-- Grafana --",
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"type": "dashboard"
}
]
},
"editable": true,
"gnetId": null,
"graphTooltip": 0,
"id": 4,
"links": [],
"panels": [
{
"datasource": null,
"description": "",
"fieldConfig": {
"defaults": {
"custom": {},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 0
},
"id": 4,
"options": {
"reduceOptions": {
"calcs": [
"mean"
],
"fields": "",
"values": false
},
"showThresholdLabels": false,
"showThresholdMarkers": true
},
"pluginVersion": "7.2.1",
"targets": [
{
"expr": "ipld_eth_server_ws_count",
"interval": "",
"legendFormat": "",
"refId": "A"
}
],
"timeFrom": null,
"timeShift": null,
"title": "Websocket Connection Count",
"type": "gauge"
},
{
"datasource": null,
"description": "",
"fieldConfig": {
"defaults": {
"custom": {},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 0
},
"id": 5,
"options": {
"reduceOptions": {
"calcs": [
"mean"
],
"fields": "",
"values": false
},
"showThresholdLabels": false,
"showThresholdMarkers": true
},
"pluginVersion": "7.2.1",
"targets": [
{
"expr": "ipld_eth_server_ipc_count",
"interval": "",
"legendFormat": "",
"refId": "A"
}
],
"timeFrom": null,
"timeShift": null,
"title": "IPC Connection Count",
"type": "gauge"
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": null,
"description": "",
"fieldConfig": {
"defaults": {
"custom": {}
},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 9,
"w": 12,
"x": 0,
"y": 8
},
"hiddenSeries": false,
"id": 2,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.2.1",
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "rate(ipld_eth_server_http_count[1m])",
"interval": "",
"legendFormat": "",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "HTTP requests per second",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": null,
"fieldConfig": {
"defaults": {
"custom": {},
"unit": "s"
},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 9,
"w": 12,
"x": 12,
"y": 8
},
"hiddenSeries": false,
"id": 7,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.2.1",
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "histogram_quantile(0.9, rate(ipld_eth_server_http_duration_bucket[1m]))",
"hide": false,
"interval": "",
"legendFormat": "0.9",
"refId": "B"
},
{
"expr": "histogram_quantile(0.75, rate(ipld_eth_server_http_duration_bucket[1m]))",
"interval": "",
"legendFormat": "0.75",
"refId": "A"
},
{
"expr": "histogram_quantile(0.5, rate(ipld_eth_server_http_duration_bucket[1m]))",
"interval": "",
"legendFormat": "0.5",
"refId": "C"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "HTTP Requests duration (percentile)",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "s",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"refresh": "10s",
"schemaVersion": 26,
"style": "dark",
"tags": [],
"templating": {
"list": []
},
"time": {
"from": "now-15m",
"to": "now"
},
"timepicker": {},
"timezone": "",
"title": "ipld-eth-server",
"uid": "lFVEvNtGk",
"version": 7
}

View File

@ -0,0 +1,7 @@
global:
scrape_interval: 10s
scrape_configs:
- job_name: 'ipld-eth-server'
static_configs:
- targets: ['localhost:8090']

View File

@ -0,0 +1,143 @@
package prom
import (
"database/sql"
"github.com/prometheus/client_golang/prometheus"
)
const subsystem = "connections"
// DBStatsGetter is an interface that gets sql.DBStats.
type DBStatsGetter interface {
Stats() sql.DBStats
}
// DBStatsCollector implements the prometheus.Collector interface.
type DBStatsCollector struct {
sg DBStatsGetter
// descriptions of exported metrics
maxOpenDesc *prometheus.Desc
openDesc *prometheus.Desc
inUseDesc *prometheus.Desc
idleDesc *prometheus.Desc
waitedForDesc *prometheus.Desc
blockedSecondsDesc *prometheus.Desc
closedMaxIdleDesc *prometheus.Desc
closedMaxLifetimeDesc *prometheus.Desc
}
// NewDBStatsCollector creates a new DBStatsCollector.
func NewDBStatsCollector(dbName string, sg DBStatsGetter) *DBStatsCollector {
labels := prometheus.Labels{"db_name": dbName}
return &DBStatsCollector{
sg: sg,
maxOpenDesc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "max_open"),
"Maximum number of open connections to the database.",
nil,
labels,
),
openDesc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "open"),
"The number of established connections both in use and idle.",
nil,
labels,
),
inUseDesc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "in_use"),
"The number of connections currently in use.",
nil,
labels,
),
idleDesc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "idle"),
"The number of idle connections.",
nil,
labels,
),
waitedForDesc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "waited_for"),
"The total number of connections waited for.",
nil,
labels,
),
blockedSecondsDesc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "blocked_seconds"),
"The total time blocked waiting for a new connection.",
nil,
labels,
),
closedMaxIdleDesc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "closed_max_idle"),
"The total number of connections closed due to SetMaxIdleConns.",
nil,
labels,
),
closedMaxLifetimeDesc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "closed_max_lifetime"),
"The total number of connections closed due to SetConnMaxLifetime.",
nil,
labels,
),
}
}
// Describe implements the prometheus.Collector interface.
func (c DBStatsCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- c.maxOpenDesc
ch <- c.openDesc
ch <- c.inUseDesc
ch <- c.idleDesc
ch <- c.waitedForDesc
ch <- c.blockedSecondsDesc
ch <- c.closedMaxIdleDesc
ch <- c.closedMaxLifetimeDesc
}
// Collect implements the prometheus.Collector interface.
func (c DBStatsCollector) Collect(ch chan<- prometheus.Metric) {
stats := c.sg.Stats()
ch <- prometheus.MustNewConstMetric(
c.maxOpenDesc,
prometheus.GaugeValue,
float64(stats.MaxOpenConnections),
)
ch <- prometheus.MustNewConstMetric(
c.openDesc,
prometheus.GaugeValue,
float64(stats.OpenConnections),
)
ch <- prometheus.MustNewConstMetric(
c.inUseDesc,
prometheus.GaugeValue,
float64(stats.InUse),
)
ch <- prometheus.MustNewConstMetric(
c.idleDesc,
prometheus.GaugeValue,
float64(stats.Idle),
)
ch <- prometheus.MustNewConstMetric(
c.waitedForDesc,
prometheus.CounterValue,
float64(stats.WaitCount),
)
ch <- prometheus.MustNewConstMetric(
c.blockedSecondsDesc,
prometheus.CounterValue,
stats.WaitDuration.Seconds(),
)
ch <- prometheus.MustNewConstMetric(
c.closedMaxIdleDesc,
prometheus.CounterValue,
float64(stats.MaxIdleClosed),
)
ch <- prometheus.MustNewConstMetric(
c.closedMaxLifetimeDesc,
prometheus.CounterValue,
float64(stats.MaxLifetimeClosed),
)
}

48
pkg/prom/middleware.go Normal file
View File

@ -0,0 +1,48 @@
package prom
import (
"net/http"
"time"
"github.com/ethereum/go-ethereum/rpc"
)
// HTTPMiddleware http connection metric reader
func HTTPMiddleware(next http.Handler) http.Handler {
if !metrics {
return next
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
httpCount.Inc()
start := time.Now()
next.ServeHTTP(w, r)
duration := time.Now().Sub(start)
httpDuration.Observe(float64(duration.Seconds()))
})
}
// WSMiddleware websocket connection counter
func WSMiddleware(next http.Handler) http.Handler {
if !metrics {
return next
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
wsCount.Inc()
next.ServeHTTP(w, r)
wsCount.Dec()
})
}
// IPCMiddleware unix-socket connection counter
func IPCMiddleware(server *rpc.Server, client rpc.Conn) {
if metrics {
ipcCount.Inc()
}
server.ServeCodec(rpc.NewCodec(client), 0)
if metrics {
ipcCount.Dec()
}
}

63
pkg/prom/prom.go Normal file
View File

@ -0,0 +1,63 @@
package prom
import (
"github.com/jmoiron/sqlx"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
const (
namespace = "ipld_eth_server"
subsystemHTTP = "http"
subsystemWS = "ws"
subsystemIPC = "ipc"
)
var (
metrics bool
httpCount prometheus.Counter
httpDuration prometheus.Histogram
wsCount prometheus.Gauge
ipcCount prometheus.Gauge
)
// Init module initialization
func Init() {
metrics = true
httpCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystemHTTP,
Name: "count",
Help: "http request count",
})
httpDuration = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystemHTTP,
Name: "duration",
Help: "http request duration",
})
wsCount = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystemWS,
Name: "count",
Help: "websocket connection count",
})
ipcCount = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystemIPC,
Name: "count",
Help: "unix socket connection count",
})
}
// RegisterDBCollector create metric colletor for given connection
func RegisterDBCollector(name string, db *sqlx.DB) {
if metrics {
prometheus.Register(NewDBStatsCollector(name, db))
}
}

31
pkg/prom/serve.go Normal file
View File

@ -0,0 +1,31 @@
package prom
import (
"errors"
"net/http"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
)
var errPromHTTP = errors.New("can't start http server for prometheus")
// Serve start listening http
func Serve(addr string) *http.Server {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
srv := http.Server{
Addr: addr,
Handler: mux,
}
go func() {
if err := srv.ListenAndServe(); err != nil {
logrus.
WithError(err).
WithField("module", "prom").
WithField("addr", addr).
Fatal(errPromHTTP)
}
}()
return &srv
}

21
pkg/rpc/check.go Normal file
View File

@ -0,0 +1,21 @@
package rpc
import "github.com/ethereum/go-ethereum/rpc"
// checkModuleAvailability check that all names given in modules are actually
// available API services.
func checkModuleAvailability(modules []string, apis []rpc.API) (bad, available []string) {
availableSet := make(map[string]struct{})
for _, api := range apis {
if _, ok := availableSet[api.Namespace]; !ok {
availableSet[api.Namespace] = struct{}{}
available = append(available, api.Namespace)
}
}
for _, name := range modules {
if _, ok := availableSet[name]; !ok {
bad = append(bad, name)
}
}
return bad, available
}

41
pkg/rpc/http.go Normal file
View File

@ -0,0 +1,41 @@
package rpc
import (
"net"
"github.com/ethereum/go-ethereum/rpc"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-server/pkg/prom"
)
// StartHTTPEndpoint starts the HTTP RPC endpoint, configured with cors/vhosts/modules.
func StartHTTPEndpoint(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string, timeouts rpc.HTTPTimeouts) (net.Listener, *rpc.Server, error) {
if bad, available := checkModuleAvailability(modules, apis); len(bad) > 0 {
log.Error("Unavailable modules in HTTP API list", "unavailable", bad, "available", available)
}
// Generate the whitelist based on the allowed modules
whitelist := make(map[string]bool)
for _, module := range modules {
whitelist[module] = true
}
// Register all the APIs exposed by the services
handler := rpc.NewServer()
for _, api := range apis {
if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return nil, nil, err
}
log.Debug("HTTP registered", "namespace", api.Namespace)
}
}
// All APIs registered, start the HTTP listener
var (
listener net.Listener
err error
)
if listener, err = net.Listen("tcp", endpoint); err != nil {
return nil, nil, err
}
go rpc.NewHTTPServer(cors, vhosts, timeouts, prom.HTTPMiddleware(handler)).Serve(listener)
return listener, handler, err
}

75
pkg/rpc/ipc.go Normal file
View File

@ -0,0 +1,75 @@
package rpc
import (
"fmt"
"net"
"os"
"path/filepath"
"github.com/ethereum/go-ethereum/p2p/netutil"
"github.com/ethereum/go-ethereum/rpc"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-server/pkg/prom"
)
var (
// On Linux, sun_path is 108 bytes in size
// see http://man7.org/linux/man-pages/man7/unix.7.html
maxPathSize = 108
)
// ipcListen will create a Unix socket on the given endpoint.
func ipcListen(endpoint string) (net.Listener, error) {
if len(endpoint) > int(maxPathSize) {
log.Warn(fmt.Sprintf("The ipc endpoint is longer than %d characters. ", maxPathSize),
"endpoint", endpoint)
}
// Ensure the IPC path exists and remove any previous leftover
if err := os.MkdirAll(filepath.Dir(endpoint), 0751); err != nil {
return nil, err
}
os.Remove(endpoint)
l, err := net.Listen("unix", endpoint)
if err != nil {
return nil, err
}
os.Chmod(endpoint, 0600)
return l, nil
}
func ipcServe(srv *rpc.Server, listener net.Listener) {
for {
conn, err := listener.Accept()
if netutil.IsTemporaryError(err) {
log.WithError(err).Warn("rpc accept error")
continue
}
if err != nil {
log.WithError(err).Warn("unknown error")
continue
}
log.WithField("addr", conn.RemoteAddr()).Trace("accepted ipc connection")
go prom.IPCMiddleware(srv, conn)
}
}
// StartIPCEndpoint starts an IPC endpoint.
func StartIPCEndpoint(ipcEndpoint string, apis []rpc.API) (net.Listener, *rpc.Server, error) {
// Register all the APIs exposed by the services.
handler := rpc.NewServer()
for _, api := range apis {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return nil, nil, err
}
log.Debug("IPC registered", "namespace", api.Namespace)
}
// All APIs registered, start the IPC listener.
listener, err := ipcListen(ipcEndpoint)
if err != nil {
return nil, nil, err
}
go ipcServe(handler, listener)
return listener, handler, nil
}

46
pkg/rpc/ws.go Normal file
View File

@ -0,0 +1,46 @@
package rpc
import (
"net"
"github.com/ethereum/go-ethereum/rpc"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-server/pkg/prom"
)
// StartWSEndpoint starts a websocket endpoint.
func StartWSEndpoint(endpoint string, apis []rpc.API, modules []string, wsOrigins []string, exposeAll bool) (net.Listener, *rpc.Server, error) {
if bad, available := checkModuleAvailability(modules, apis); len(bad) > 0 {
log.Error("Unavailable modules in WS API list", "unavailable", bad, "available", available)
}
// Generate the whitelist based on the allowed modules
whitelist := make(map[string]bool)
for _, module := range modules {
whitelist[module] = true
}
// Register all the APIs exposed by the services
handler := rpc.NewServer()
for _, api := range apis {
if exposeAll || whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return nil, nil, err
}
log.Debug("WebSocket registered", "service", api.Service, "namespace", api.Namespace)
}
}
// All APIs registered, start the HTTP listener
var (
listener net.Listener
err error
)
if listener, err = net.Listen("tcp", endpoint); err != nil {
return nil, nil, err
}
wsServer := rpc.NewWSServer(wsOrigins, handler)
wsServer.Handler = prom.WSMiddleware(wsServer.Handler)
go wsServer.Serve(listener)
return listener, handler, err
}

View File

@ -28,6 +28,7 @@ import (
"github.com/vulcanize/ipld-eth-indexer/pkg/node" "github.com/vulcanize/ipld-eth-indexer/pkg/node"
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres" "github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
"github.com/vulcanize/ipld-eth-indexer/utils" "github.com/vulcanize/ipld-eth-indexer/utils"
"github.com/vulcanize/ipld-eth-server/pkg/prom"
"github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth"
) )
@ -96,6 +97,7 @@ func NewConfig() (*Config, error) {
c.HTTPEndpoint = httpPath c.HTTPEndpoint = httpPath
overrideDBConnConfig(&c.DBConfig) overrideDBConnConfig(&c.DBConfig)
serveDB := utils.LoadPostgres(c.DBConfig, node.Info{}) serveDB := utils.LoadPostgres(c.DBConfig, node.Info{})
prom.RegisterDBCollector(c.DBConfig.Name, serveDB.DB)
c.DB = &serveDB c.DB = &serveDB
defaultSenderStr := viper.GetString("ethereum.defaultSender") defaultSenderStr := viper.GetString("ethereum.defaultSender")