Metrics and http server #15
10
README.md
10
README.md
@ -98,6 +98,16 @@ TODO: Add the rest of the standard endpoints add unique endpoints (e.g. getSlice
|
||||
`make test` will run the unit tests
|
||||
`make test` setups a clean `vulcanize_testing` db
|
||||
|
||||
## Monitoring
|
||||
|
||||
* Enable http server and metrics using parameters `--http --metrics`
|
||||
* ipld-eth-server exposes prometheus metrics at `/metric` endpoint
|
||||
* start prometheus using `monitoring/prometheus.yml` config (`prometheus --config.file=monitoring/prometheus.yml`)
|
||||
* start grafana, connect to prometheus datasource and import dashboard from `monitoring/grafana/dashboard_main.json`
|
||||
|
||||
![](monitoring/grafana.png)
|
||||
|
||||
|
||||
## Contributing
|
||||
Contributions are welcome!
|
||||
|
||||
|
26
cmd/root.go
26
cmd/root.go
@ -24,6 +24,7 @@ import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
"github.com/vulcanize/ipld-eth-server/pkg/prom"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -62,6 +63,19 @@ func initFuncs(cmd *cobra.Command, args []string) {
|
||||
if err := logLevel(); err != nil {
|
||||
log.Fatal("Could not set log level: ", err)
|
||||
}
|
||||
|
||||
if viper.GetBool("metrics") {
|
||||
prom.Init()
|
||||
}
|
||||
|
||||
if viper.GetBool("http") {
|
||||
addr := fmt.Sprintf(
|
||||
"%s:%s",
|
||||
viper.GetString("http.addr"),
|
||||
viper.GetString("http.port"),
|
||||
)
|
||||
prom.Serve(addr)
|
||||
}
|
||||
}
|
||||
|
||||
func logLevel() error {
|
||||
@ -93,6 +107,12 @@ func init() {
|
||||
rootCmd.PersistentFlags().String("client-ipcPath", "", "location of geth.ipc file")
|
||||
rootCmd.PersistentFlags().String("log-level", log.InfoLevel.String(), "Log level (trace, debug, info, warn, error, fatal, panic")
|
||||
|
||||
rootCmd.PersistentFlags().Bool("metrics", false, "enable metrics")
|
||||
|
||||
rootCmd.PersistentFlags().Bool("http", false, "enable http service for prometheus")
|
||||
rootCmd.PersistentFlags().String("http-addr", "127.0.0.1", "http host for prometheus")
|
||||
rootCmd.PersistentFlags().String("http-port", "8090", "http port for prometheus")
|
||||
|
||||
viper.BindPFlag("logfile", rootCmd.PersistentFlags().Lookup("logfile"))
|
||||
viper.BindPFlag("database.name", rootCmd.PersistentFlags().Lookup("database-name"))
|
||||
viper.BindPFlag("database.port", rootCmd.PersistentFlags().Lookup("database-port"))
|
||||
@ -100,6 +120,12 @@ func init() {
|
||||
viper.BindPFlag("database.user", rootCmd.PersistentFlags().Lookup("database-user"))
|
||||
viper.BindPFlag("database.password", rootCmd.PersistentFlags().Lookup("database-password"))
|
||||
viper.BindPFlag("log.level", rootCmd.PersistentFlags().Lookup("log-level"))
|
||||
|
||||
viper.BindPFlag("metrics", rootCmd.PersistentFlags().Lookup("metrics"))
|
||||
|
||||
viper.BindPFlag("http", rootCmd.PersistentFlags().Lookup("http"))
|
||||
viper.BindPFlag("http.addr", rootCmd.PersistentFlags().Lookup("http-addr"))
|
||||
viper.BindPFlag("http.port", rootCmd.PersistentFlags().Lookup("http-port"))
|
||||
}
|
||||
|
||||
func initConfig() {
|
||||
|
15
cmd/serve.go
15
cmd/serve.go
@ -27,6 +27,8 @@ import (
|
||||
|
||||
"github.com/vulcanize/ipld-eth-indexer/pkg/eth"
|
||||
|
||||
srpc "github.com/vulcanize/ipld-eth-server/pkg/rpc"
|
||||
|
||||
s "github.com/vulcanize/ipld-eth-server/pkg/serve"
|
||||
v "github.com/vulcanize/ipld-eth-server/version"
|
||||
)
|
||||
@ -78,17 +80,18 @@ func serve() {
|
||||
|
||||
func startServers(server s.Server, settings *s.Config) error {
|
||||
logWithCommand.Info("starting up IPC server")
|
||||
_, _, err := rpc.StartIPCEndpoint(settings.IPCEndpoint, server.APIs())
|
||||
_, _, err := srpc.StartIPCEndpoint(settings.IPCEndpoint, server.APIs())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
logWithCommand.Info("starting up WS server")
|
||||
_, _, err = rpc.StartWSEndpoint(settings.WSEndpoint, server.APIs(), []string{"vdb"}, nil, true)
|
||||
_, _, err = srpc.StartWSEndpoint(settings.WSEndpoint, server.APIs(), []string{"vdb"}, nil, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
logWithCommand.Info("starting up HTTP server")
|
||||
_, _, err = rpc.StartHTTPEndpoint(settings.HTTPEndpoint, server.APIs(), []string{"eth"}, nil, []string{"*"}, rpc.HTTPTimeouts{})
|
||||
_, _, err = srpc.StartHTTPEndpoint(settings.HTTPEndpoint, server.APIs(), []string{"eth"}, nil, []string{"*"}, rpc.HTTPTimeouts{})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@ -109,7 +112,7 @@ func init() {
|
||||
viper.BindPFlag("server.httpPath", serveCmd.PersistentFlags().Lookup("server-http-path"))
|
||||
viper.BindPFlag("server.ipcPath", serveCmd.PersistentFlags().Lookup("server-ipc-path"))
|
||||
|
||||
viper.BindPFlag("ethereum.chainID", rootCmd.PersistentFlags().Lookup("eth-chain-id"))
|
||||
viper.BindPFlag("ethereum.defaultSender", rootCmd.PersistentFlags().Lookup("eth-default-sender"))
|
||||
viper.BindPFlag("ethereum.rpcGasCap", rootCmd.PersistentFlags().Lookup("eth-rpc-gas-cap"))
|
||||
viper.BindPFlag("ethereum.chainID", serveCmd.PersistentFlags().Lookup("eth-chain-id"))
|
||||
viper.BindPFlag("ethereum.defaultSender", serveCmd.PersistentFlags().Lookup("eth-default-sender"))
|
||||
viper.BindPFlag("ethereum.rpcGasCap", serveCmd.PersistentFlags().Lookup("eth-rpc-gas-cap"))
|
||||
}
|
||||
|
1
go.mod
1
go.mod
@ -14,6 +14,7 @@ require (
|
||||
github.com/multiformats/go-multihash v0.0.13
|
||||
github.com/onsi/ginkgo v1.12.1
|
||||
github.com/onsi/gomega v1.10.1
|
||||
github.com/prometheus/client_golang v1.5.1
|
||||
github.com/sirupsen/logrus v1.6.0
|
||||
github.com/spf13/cobra v1.0.0
|
||||
github.com/spf13/viper v1.7.0
|
||||
|
BIN
monitoring/grafana.png
Normal file
BIN
monitoring/grafana.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 118 KiB |
353
monitoring/grafana/dashboard_main.json
Normal file
353
monitoring/grafana/dashboard_main.json
Normal file
@ -0,0 +1,353 @@
|
||||
{
|
||||
"annotations": {
|
||||
"list": [
|
||||
{
|
||||
"builtIn": 1,
|
||||
"datasource": "-- Grafana --",
|
||||
"enable": true,
|
||||
"hide": true,
|
||||
"iconColor": "rgba(0, 211, 255, 1)",
|
||||
"name": "Annotations & Alerts",
|
||||
"type": "dashboard"
|
||||
}
|
||||
]
|
||||
},
|
||||
"editable": true,
|
||||
"gnetId": null,
|
||||
"graphTooltip": 0,
|
||||
"id": 4,
|
||||
"links": [],
|
||||
"panels": [
|
||||
{
|
||||
"datasource": null,
|
||||
"description": "",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"custom": {},
|
||||
"mappings": [],
|
||||
"thresholds": {
|
||||
"mode": "absolute",
|
||||
"steps": [
|
||||
{
|
||||
"color": "green",
|
||||
"value": null
|
||||
},
|
||||
{
|
||||
"color": "red",
|
||||
"value": 80
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 8,
|
||||
"w": 12,
|
||||
"x": 0,
|
||||
"y": 0
|
||||
},
|
||||
"id": 4,
|
||||
"options": {
|
||||
"reduceOptions": {
|
||||
"calcs": [
|
||||
"mean"
|
||||
],
|
||||
"fields": "",
|
||||
"values": false
|
||||
},
|
||||
"showThresholdLabels": false,
|
||||
"showThresholdMarkers": true
|
||||
},
|
||||
"pluginVersion": "7.2.1",
|
||||
"targets": [
|
||||
{
|
||||
"expr": "ipld_eth_server_ws_count",
|
||||
"interval": "",
|
||||
"legendFormat": "",
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"timeFrom": null,
|
||||
"timeShift": null,
|
||||
"title": "Websocket Connection Count",
|
||||
"type": "gauge"
|
||||
},
|
||||
{
|
||||
"datasource": null,
|
||||
"description": "",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"custom": {},
|
||||
"mappings": [],
|
||||
"thresholds": {
|
||||
"mode": "absolute",
|
||||
"steps": [
|
||||
{
|
||||
"color": "green",
|
||||
"value": null
|
||||
},
|
||||
{
|
||||
"color": "red",
|
||||
"value": 80
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 8,
|
||||
"w": 12,
|
||||
"x": 12,
|
||||
"y": 0
|
||||
},
|
||||
"id": 5,
|
||||
"options": {
|
||||
"reduceOptions": {
|
||||
"calcs": [
|
||||
"mean"
|
||||
],
|
||||
"fields": "",
|
||||
"values": false
|
||||
},
|
||||
"showThresholdLabels": false,
|
||||
"showThresholdMarkers": true
|
||||
},
|
||||
"pluginVersion": "7.2.1",
|
||||
"targets": [
|
||||
{
|
||||
"expr": "ipld_eth_server_ipc_count",
|
||||
"interval": "",
|
||||
"legendFormat": "",
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"timeFrom": null,
|
||||
"timeShift": null,
|
||||
"title": "IPC Connection Count",
|
||||
"type": "gauge"
|
||||
},
|
||||
{
|
||||
"aliasColors": {},
|
||||
"bars": false,
|
||||
"dashLength": 10,
|
||||
"dashes": false,
|
||||
"datasource": null,
|
||||
"description": "",
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"custom": {}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"fill": 1,
|
||||
"fillGradient": 0,
|
||||
"gridPos": {
|
||||
"h": 9,
|
||||
"w": 12,
|
||||
"x": 0,
|
||||
"y": 8
|
||||
},
|
||||
"hiddenSeries": false,
|
||||
"id": 2,
|
||||
"legend": {
|
||||
"avg": false,
|
||||
"current": false,
|
||||
"max": false,
|
||||
"min": false,
|
||||
"show": true,
|
||||
"total": false,
|
||||
"values": false
|
||||
},
|
||||
"lines": true,
|
||||
"linewidth": 1,
|
||||
"nullPointMode": "null",
|
||||
"options": {
|
||||
"alertThreshold": true
|
||||
},
|
||||
"percentage": false,
|
||||
"pluginVersion": "7.2.1",
|
||||
"pointradius": 2,
|
||||
"points": false,
|
||||
"renderer": "flot",
|
||||
"seriesOverrides": [],
|
||||
"spaceLength": 10,
|
||||
"stack": false,
|
||||
"steppedLine": false,
|
||||
"targets": [
|
||||
{
|
||||
"expr": "rate(ipld_eth_server_http_count[1m])",
|
||||
"interval": "",
|
||||
"legendFormat": "",
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"thresholds": [],
|
||||
"timeFrom": null,
|
||||
"timeRegions": [],
|
||||
"timeShift": null,
|
||||
"title": "HTTP requests per second",
|
||||
"tooltip": {
|
||||
"shared": true,
|
||||
"sort": 0,
|
||||
"value_type": "individual"
|
||||
},
|
||||
"type": "graph",
|
||||
"xaxis": {
|
||||
"buckets": null,
|
||||
"mode": "time",
|
||||
"name": null,
|
||||
"show": true,
|
||||
"values": []
|
||||
},
|
||||
"yaxes": [
|
||||
{
|
||||
"format": "short",
|
||||
"label": null,
|
||||
"logBase": 1,
|
||||
"max": null,
|
||||
"min": null,
|
||||
"show": true
|
||||
},
|
||||
{
|
||||
"format": "short",
|
||||
"label": null,
|
||||
"logBase": 1,
|
||||
"max": null,
|
||||
"min": null,
|
||||
"show": true
|
||||
}
|
||||
],
|
||||
"yaxis": {
|
||||
"align": false,
|
||||
"alignLevel": null
|
||||
}
|
||||
},
|
||||
{
|
||||
"aliasColors": {},
|
||||
"bars": false,
|
||||
"dashLength": 10,
|
||||
"dashes": false,
|
||||
"datasource": null,
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"custom": {},
|
||||
"unit": "s"
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"fill": 1,
|
||||
"fillGradient": 0,
|
||||
"gridPos": {
|
||||
"h": 9,
|
||||
"w": 12,
|
||||
"x": 12,
|
||||
"y": 8
|
||||
},
|
||||
"hiddenSeries": false,
|
||||
"id": 7,
|
||||
"legend": {
|
||||
"avg": false,
|
||||
"current": false,
|
||||
"max": false,
|
||||
"min": false,
|
||||
"show": true,
|
||||
"total": false,
|
||||
"values": false
|
||||
},
|
||||
"lines": true,
|
||||
"linewidth": 1,
|
||||
"nullPointMode": "null",
|
||||
"options": {
|
||||
"alertThreshold": true
|
||||
},
|
||||
"percentage": false,
|
||||
"pluginVersion": "7.2.1",
|
||||
"pointradius": 2,
|
||||
"points": false,
|
||||
"renderer": "flot",
|
||||
"seriesOverrides": [],
|
||||
"spaceLength": 10,
|
||||
"stack": false,
|
||||
"steppedLine": false,
|
||||
"targets": [
|
||||
{
|
||||
"expr": "histogram_quantile(0.9, rate(ipld_eth_server_http_duration_bucket[1m]))",
|
||||
"hide": false,
|
||||
"interval": "",
|
||||
"legendFormat": "0.9",
|
||||
"refId": "B"
|
||||
},
|
||||
{
|
||||
"expr": "histogram_quantile(0.75, rate(ipld_eth_server_http_duration_bucket[1m]))",
|
||||
"interval": "",
|
||||
"legendFormat": "0.75",
|
||||
"refId": "A"
|
||||
},
|
||||
{
|
||||
"expr": "histogram_quantile(0.5, rate(ipld_eth_server_http_duration_bucket[1m]))",
|
||||
"interval": "",
|
||||
"legendFormat": "0.5",
|
||||
"refId": "C"
|
||||
}
|
||||
],
|
||||
"thresholds": [],
|
||||
"timeFrom": null,
|
||||
"timeRegions": [],
|
||||
"timeShift": null,
|
||||
"title": "HTTP Requests duration (percentile)",
|
||||
"tooltip": {
|
||||
"shared": true,
|
||||
"sort": 0,
|
||||
"value_type": "individual"
|
||||
},
|
||||
"type": "graph",
|
||||
"xaxis": {
|
||||
"buckets": null,
|
||||
"mode": "time",
|
||||
"name": null,
|
||||
"show": true,
|
||||
"values": []
|
||||
},
|
||||
"yaxes": [
|
||||
{
|
||||
"format": "s",
|
||||
"label": null,
|
||||
"logBase": 1,
|
||||
"max": null,
|
||||
"min": null,
|
||||
"show": true
|
||||
},
|
||||
{
|
||||
"format": "short",
|
||||
"label": null,
|
||||
"logBase": 1,
|
||||
"max": null,
|
||||
"min": null,
|
||||
"show": true
|
||||
}
|
||||
],
|
||||
"yaxis": {
|
||||
"align": false,
|
||||
"alignLevel": null
|
||||
}
|
||||
}
|
||||
],
|
||||
"refresh": "10s",
|
||||
"schemaVersion": 26,
|
||||
"style": "dark",
|
||||
"tags": [],
|
||||
"templating": {
|
||||
"list": []
|
||||
},
|
||||
"time": {
|
||||
"from": "now-15m",
|
||||
"to": "now"
|
||||
},
|
||||
"timepicker": {},
|
||||
"timezone": "",
|
||||
"title": "ipld-eth-server",
|
||||
"uid": "lFVEvNtGk",
|
||||
"version": 7
|
||||
}
|
7
monitoring/prometheus.yml
Normal file
7
monitoring/prometheus.yml
Normal file
@ -0,0 +1,7 @@
|
||||
global:
|
||||
scrape_interval: 10s
|
||||
|
||||
scrape_configs:
|
||||
- job_name: 'ipld-eth-server'
|
||||
static_configs:
|
||||
- targets: ['localhost:8090']
|
143
pkg/prom/db_stats_collector.go
Normal file
143
pkg/prom/db_stats_collector.go
Normal file
@ -0,0 +1,143 @@
|
||||
package prom
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const subsystem = "connections"
|
||||
|
||||
// DBStatsGetter is an interface that gets sql.DBStats.
|
||||
type DBStatsGetter interface {
|
||||
Stats() sql.DBStats
|
||||
}
|
||||
|
||||
// DBStatsCollector implements the prometheus.Collector interface.
|
||||
type DBStatsCollector struct {
|
||||
sg DBStatsGetter
|
||||
|
||||
// descriptions of exported metrics
|
||||
maxOpenDesc *prometheus.Desc
|
||||
openDesc *prometheus.Desc
|
||||
inUseDesc *prometheus.Desc
|
||||
idleDesc *prometheus.Desc
|
||||
waitedForDesc *prometheus.Desc
|
||||
blockedSecondsDesc *prometheus.Desc
|
||||
closedMaxIdleDesc *prometheus.Desc
|
||||
closedMaxLifetimeDesc *prometheus.Desc
|
||||
}
|
||||
|
||||
// NewDBStatsCollector creates a new DBStatsCollector.
|
||||
func NewDBStatsCollector(dbName string, sg DBStatsGetter) *DBStatsCollector {
|
||||
labels := prometheus.Labels{"db_name": dbName}
|
||||
return &DBStatsCollector{
|
||||
sg: sg,
|
||||
maxOpenDesc: prometheus.NewDesc(
|
||||
prometheus.BuildFQName(namespace, subsystem, "max_open"),
|
||||
"Maximum number of open connections to the database.",
|
||||
nil,
|
||||
labels,
|
||||
),
|
||||
openDesc: prometheus.NewDesc(
|
||||
prometheus.BuildFQName(namespace, subsystem, "open"),
|
||||
"The number of established connections both in use and idle.",
|
||||
nil,
|
||||
labels,
|
||||
),
|
||||
inUseDesc: prometheus.NewDesc(
|
||||
prometheus.BuildFQName(namespace, subsystem, "in_use"),
|
||||
"The number of connections currently in use.",
|
||||
nil,
|
||||
labels,
|
||||
),
|
||||
idleDesc: prometheus.NewDesc(
|
||||
prometheus.BuildFQName(namespace, subsystem, "idle"),
|
||||
"The number of idle connections.",
|
||||
nil,
|
||||
labels,
|
||||
),
|
||||
waitedForDesc: prometheus.NewDesc(
|
||||
prometheus.BuildFQName(namespace, subsystem, "waited_for"),
|
||||
"The total number of connections waited for.",
|
||||
nil,
|
||||
labels,
|
||||
),
|
||||
blockedSecondsDesc: prometheus.NewDesc(
|
||||
prometheus.BuildFQName(namespace, subsystem, "blocked_seconds"),
|
||||
"The total time blocked waiting for a new connection.",
|
||||
nil,
|
||||
labels,
|
||||
),
|
||||
closedMaxIdleDesc: prometheus.NewDesc(
|
||||
prometheus.BuildFQName(namespace, subsystem, "closed_max_idle"),
|
||||
"The total number of connections closed due to SetMaxIdleConns.",
|
||||
nil,
|
||||
labels,
|
||||
),
|
||||
closedMaxLifetimeDesc: prometheus.NewDesc(
|
||||
prometheus.BuildFQName(namespace, subsystem, "closed_max_lifetime"),
|
||||
"The total number of connections closed due to SetConnMaxLifetime.",
|
||||
nil,
|
||||
labels,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
// Describe implements the prometheus.Collector interface.
|
||||
func (c DBStatsCollector) Describe(ch chan<- *prometheus.Desc) {
|
||||
ch <- c.maxOpenDesc
|
||||
ch <- c.openDesc
|
||||
ch <- c.inUseDesc
|
||||
ch <- c.idleDesc
|
||||
ch <- c.waitedForDesc
|
||||
ch <- c.blockedSecondsDesc
|
||||
ch <- c.closedMaxIdleDesc
|
||||
ch <- c.closedMaxLifetimeDesc
|
||||
}
|
||||
|
||||
// Collect implements the prometheus.Collector interface.
|
||||
func (c DBStatsCollector) Collect(ch chan<- prometheus.Metric) {
|
||||
stats := c.sg.Stats()
|
||||
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
c.maxOpenDesc,
|
||||
prometheus.GaugeValue,
|
||||
float64(stats.MaxOpenConnections),
|
||||
)
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
c.openDesc,
|
||||
prometheus.GaugeValue,
|
||||
float64(stats.OpenConnections),
|
||||
)
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
c.inUseDesc,
|
||||
prometheus.GaugeValue,
|
||||
float64(stats.InUse),
|
||||
)
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
c.idleDesc,
|
||||
prometheus.GaugeValue,
|
||||
float64(stats.Idle),
|
||||
)
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
c.waitedForDesc,
|
||||
prometheus.CounterValue,
|
||||
float64(stats.WaitCount),
|
||||
)
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
c.blockedSecondsDesc,
|
||||
prometheus.CounterValue,
|
||||
stats.WaitDuration.Seconds(),
|
||||
)
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
c.closedMaxIdleDesc,
|
||||
prometheus.CounterValue,
|
||||
float64(stats.MaxIdleClosed),
|
||||
)
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
c.closedMaxLifetimeDesc,
|
||||
prometheus.CounterValue,
|
||||
float64(stats.MaxLifetimeClosed),
|
||||
)
|
||||
}
|
48
pkg/prom/middleware.go
Normal file
48
pkg/prom/middleware.go
Normal file
@ -0,0 +1,48 @@
|
||||
package prom
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
)
|
||||
|
||||
// HTTPMiddleware http connection metric reader
|
||||
func HTTPMiddleware(next http.Handler) http.Handler {
|
||||
if !metrics {
|
||||
return next
|
||||
}
|
||||
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
httpCount.Inc()
|
||||
|
||||
start := time.Now()
|
||||
next.ServeHTTP(w, r)
|
||||
duration := time.Now().Sub(start)
|
||||
httpDuration.Observe(float64(duration.Seconds()))
|
||||
})
|
||||
}
|
||||
|
||||
// WSMiddleware websocket connection counter
|
||||
func WSMiddleware(next http.Handler) http.Handler {
|
||||
if !metrics {
|
||||
return next
|
||||
}
|
||||
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
wsCount.Inc()
|
||||
next.ServeHTTP(w, r)
|
||||
wsCount.Dec()
|
||||
})
|
||||
}
|
||||
|
||||
// IPCMiddleware unix-socket connection counter
|
||||
func IPCMiddleware(server *rpc.Server, client rpc.Conn) {
|
||||
if metrics {
|
||||
ipcCount.Inc()
|
||||
}
|
||||
server.ServeCodec(rpc.NewCodec(client), 0)
|
||||
if metrics {
|
||||
ipcCount.Dec()
|
||||
}
|
||||
}
|
63
pkg/prom/prom.go
Normal file
63
pkg/prom/prom.go
Normal file
@ -0,0 +1,63 @@
|
||||
package prom
|
||||
|
||||
import (
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
)
|
||||
|
||||
const (
|
||||
namespace = "ipld_eth_server"
|
||||
|
||||
subsystemHTTP = "http"
|
||||
subsystemWS = "ws"
|
||||
subsystemIPC = "ipc"
|
||||
)
|
||||
|
||||
var (
|
||||
metrics bool
|
||||
|
||||
httpCount prometheus.Counter
|
||||
httpDuration prometheus.Histogram
|
||||
wsCount prometheus.Gauge
|
||||
ipcCount prometheus.Gauge
|
||||
)
|
||||
|
||||
// Init module initialization
|
||||
func Init() {
|
||||
metrics = true
|
||||
|
||||
httpCount = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystemHTTP,
|
||||
Name: "count",
|
||||
Help: "http request count",
|
||||
})
|
||||
httpDuration = promauto.NewHistogram(prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystemHTTP,
|
||||
Name: "duration",
|
||||
Help: "http request duration",
|
||||
})
|
||||
|
||||
wsCount = promauto.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystemWS,
|
||||
Name: "count",
|
||||
Help: "websocket connection count",
|
||||
})
|
||||
|
||||
ipcCount = promauto.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystemIPC,
|
||||
Name: "count",
|
||||
Help: "unix socket connection count",
|
||||
})
|
||||
}
|
||||
|
||||
// RegisterDBCollector create metric colletor for given connection
|
||||
func RegisterDBCollector(name string, db *sqlx.DB) {
|
||||
if metrics {
|
||||
prometheus.Register(NewDBStatsCollector(name, db))
|
||||
}
|
||||
}
|
31
pkg/prom/serve.go
Normal file
31
pkg/prom/serve.go
Normal file
@ -0,0 +1,31 @@
|
||||
package prom
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net/http"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var errPromHTTP = errors.New("can't start http server for prometheus")
|
||||
|
||||
// Serve start listening http
|
||||
func Serve(addr string) *http.Server {
|
||||
mux := http.NewServeMux()
|
||||
mux.Handle("/metrics", promhttp.Handler())
|
||||
srv := http.Server{
|
||||
Addr: addr,
|
||||
Handler: mux,
|
||||
}
|
||||
go func() {
|
||||
if err := srv.ListenAndServe(); err != nil {
|
||||
logrus.
|
||||
WithError(err).
|
||||
WithField("module", "prom").
|
||||
WithField("addr", addr).
|
||||
Fatal(errPromHTTP)
|
||||
}
|
||||
}()
|
||||
return &srv
|
||||
}
|
21
pkg/rpc/check.go
Normal file
21
pkg/rpc/check.go
Normal file
@ -0,0 +1,21 @@
|
||||
package rpc
|
||||
|
||||
import "github.com/ethereum/go-ethereum/rpc"
|
||||
|
||||
// checkModuleAvailability check that all names given in modules are actually
|
||||
// available API services.
|
||||
func checkModuleAvailability(modules []string, apis []rpc.API) (bad, available []string) {
|
||||
availableSet := make(map[string]struct{})
|
||||
for _, api := range apis {
|
||||
if _, ok := availableSet[api.Namespace]; !ok {
|
||||
availableSet[api.Namespace] = struct{}{}
|
||||
available = append(available, api.Namespace)
|
||||
}
|
||||
}
|
||||
for _, name := range modules {
|
||||
if _, ok := availableSet[name]; !ok {
|
||||
bad = append(bad, name)
|
||||
}
|
||||
}
|
||||
return bad, available
|
||||
}
|
41
pkg/rpc/http.go
Normal file
41
pkg/rpc/http.go
Normal file
@ -0,0 +1,41 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"net"
|
||||
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/vulcanize/ipld-eth-server/pkg/prom"
|
||||
)
|
||||
|
||||
// StartHTTPEndpoint starts the HTTP RPC endpoint, configured with cors/vhosts/modules.
|
||||
func StartHTTPEndpoint(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string, timeouts rpc.HTTPTimeouts) (net.Listener, *rpc.Server, error) {
|
||||
if bad, available := checkModuleAvailability(modules, apis); len(bad) > 0 {
|
||||
log.Error("Unavailable modules in HTTP API list", "unavailable", bad, "available", available)
|
||||
}
|
||||
// Generate the whitelist based on the allowed modules
|
||||
whitelist := make(map[string]bool)
|
||||
for _, module := range modules {
|
||||
whitelist[module] = true
|
||||
}
|
||||
// Register all the APIs exposed by the services
|
||||
handler := rpc.NewServer()
|
||||
for _, api := range apis {
|
||||
if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
|
||||
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
log.Debug("HTTP registered", "namespace", api.Namespace)
|
||||
}
|
||||
}
|
||||
// All APIs registered, start the HTTP listener
|
||||
var (
|
||||
listener net.Listener
|
||||
err error
|
||||
)
|
||||
if listener, err = net.Listen("tcp", endpoint); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
go rpc.NewHTTPServer(cors, vhosts, timeouts, prom.HTTPMiddleware(handler)).Serve(listener)
|
||||
return listener, handler, err
|
||||
}
|
75
pkg/rpc/ipc.go
Normal file
75
pkg/rpc/ipc.go
Normal file
@ -0,0 +1,75 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/ethereum/go-ethereum/p2p/netutil"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/vulcanize/ipld-eth-server/pkg/prom"
|
||||
)
|
||||
|
||||
var (
|
||||
// On Linux, sun_path is 108 bytes in size
|
||||
// see http://man7.org/linux/man-pages/man7/unix.7.html
|
||||
maxPathSize = 108
|
||||
)
|
||||
|
||||
// ipcListen will create a Unix socket on the given endpoint.
|
||||
func ipcListen(endpoint string) (net.Listener, error) {
|
||||
if len(endpoint) > int(maxPathSize) {
|
||||
log.Warn(fmt.Sprintf("The ipc endpoint is longer than %d characters. ", maxPathSize),
|
||||
"endpoint", endpoint)
|
||||
}
|
||||
|
||||
// Ensure the IPC path exists and remove any previous leftover
|
||||
if err := os.MkdirAll(filepath.Dir(endpoint), 0751); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
os.Remove(endpoint)
|
||||
l, err := net.Listen("unix", endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
os.Chmod(endpoint, 0600)
|
||||
return l, nil
|
||||
}
|
||||
|
||||
func ipcServe(srv *rpc.Server, listener net.Listener) {
|
||||
for {
|
||||
conn, err := listener.Accept()
|
||||
if netutil.IsTemporaryError(err) {
|
||||
log.WithError(err).Warn("rpc accept error")
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
log.WithError(err).Warn("unknown error")
|
||||
continue
|
||||
}
|
||||
log.WithField("addr", conn.RemoteAddr()).Trace("accepted ipc connection")
|
||||
go prom.IPCMiddleware(srv, conn)
|
||||
}
|
||||
}
|
||||
|
||||
// StartIPCEndpoint starts an IPC endpoint.
|
||||
func StartIPCEndpoint(ipcEndpoint string, apis []rpc.API) (net.Listener, *rpc.Server, error) {
|
||||
// Register all the APIs exposed by the services.
|
||||
handler := rpc.NewServer()
|
||||
for _, api := range apis {
|
||||
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
log.Debug("IPC registered", "namespace", api.Namespace)
|
||||
}
|
||||
// All APIs registered, start the IPC listener.
|
||||
listener, err := ipcListen(ipcEndpoint)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
go ipcServe(handler, listener)
|
||||
return listener, handler, nil
|
||||
}
|
46
pkg/rpc/ws.go
Normal file
46
pkg/rpc/ws.go
Normal file
@ -0,0 +1,46 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"net"
|
||||
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/vulcanize/ipld-eth-server/pkg/prom"
|
||||
)
|
||||
|
||||
// StartWSEndpoint starts a websocket endpoint.
|
||||
func StartWSEndpoint(endpoint string, apis []rpc.API, modules []string, wsOrigins []string, exposeAll bool) (net.Listener, *rpc.Server, error) {
|
||||
if bad, available := checkModuleAvailability(modules, apis); len(bad) > 0 {
|
||||
log.Error("Unavailable modules in WS API list", "unavailable", bad, "available", available)
|
||||
}
|
||||
// Generate the whitelist based on the allowed modules
|
||||
whitelist := make(map[string]bool)
|
||||
for _, module := range modules {
|
||||
whitelist[module] = true
|
||||
}
|
||||
// Register all the APIs exposed by the services
|
||||
handler := rpc.NewServer()
|
||||
for _, api := range apis {
|
||||
if exposeAll || whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
|
||||
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
log.Debug("WebSocket registered", "service", api.Service, "namespace", api.Namespace)
|
||||
}
|
||||
}
|
||||
// All APIs registered, start the HTTP listener
|
||||
var (
|
||||
listener net.Listener
|
||||
err error
|
||||
)
|
||||
if listener, err = net.Listen("tcp", endpoint); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
wsServer := rpc.NewWSServer(wsOrigins, handler)
|
||||
wsServer.Handler = prom.WSMiddleware(wsServer.Handler)
|
||||
go wsServer.Serve(listener)
|
||||
|
||||
return listener, handler, err
|
||||
|
||||
}
|
@ -28,6 +28,7 @@ import (
|
||||
"github.com/vulcanize/ipld-eth-indexer/pkg/node"
|
||||
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
|
||||
"github.com/vulcanize/ipld-eth-indexer/utils"
|
||||
"github.com/vulcanize/ipld-eth-server/pkg/prom"
|
||||
|
||||
"github.com/vulcanize/ipld-eth-server/pkg/eth"
|
||||
)
|
||||
@ -96,6 +97,7 @@ func NewConfig() (*Config, error) {
|
||||
c.HTTPEndpoint = httpPath
|
||||
overrideDBConnConfig(&c.DBConfig)
|
||||
serveDB := utils.LoadPostgres(c.DBConfig, node.Info{})
|
||||
prom.RegisterDBCollector(c.DBConfig.Name, serveDB.DB)
|
||||
c.DB = &serveDB
|
||||
|
||||
defaultSenderStr := viper.GetString("ethereum.defaultSender")
|
||||
|
Loading…
Reference in New Issue
Block a user