add prometheus-middlewares for http and ws endpoint

This commit is contained in:
Ilnur Galiev 2020-10-19 18:00:55 +03:00
parent 1043df9156
commit f627c2edfa
7 changed files with 261 additions and 5 deletions

View File

@ -27,6 +27,8 @@ import (
"github.com/vulcanize/ipld-eth-indexer/pkg/eth" "github.com/vulcanize/ipld-eth-indexer/pkg/eth"
srpc "github.com/vulcanize/ipld-eth-server/pkg/rpc"
s "github.com/vulcanize/ipld-eth-server/pkg/serve" s "github.com/vulcanize/ipld-eth-server/pkg/serve"
v "github.com/vulcanize/ipld-eth-server/version" v "github.com/vulcanize/ipld-eth-server/version"
) )
@ -78,17 +80,18 @@ func serve() {
func startServers(server s.Server, settings *s.Config) error { func startServers(server s.Server, settings *s.Config) error {
logWithCommand.Info("starting up IPC server") logWithCommand.Info("starting up IPC server")
_, _, err := rpc.StartIPCEndpoint(settings.IPCEndpoint, server.APIs()) _, _, err := srpc.StartIPCEndpoint(settings.IPCEndpoint, server.APIs())
if err != nil { if err != nil {
return err return err
} }
logWithCommand.Info("starting up WS server") logWithCommand.Info("starting up WS server")
_, _, err = rpc.StartWSEndpoint(settings.WSEndpoint, server.APIs(), []string{"vdb"}, nil, true) _, _, err = srpc.StartWSEndpoint(settings.WSEndpoint, server.APIs(), []string{"vdb"}, nil, true)
if err != nil { if err != nil {
return err return err
} }
logWithCommand.Info("starting up HTTP server") logWithCommand.Info("starting up HTTP server")
_, _, err = rpc.StartHTTPEndpoint(settings.HTTPEndpoint, server.APIs(), []string{"eth"}, nil, []string{"*"}, rpc.HTTPTimeouts{}) _, _, err = srpc.StartHTTPEndpoint(settings.HTTPEndpoint, server.APIs(), []string{"eth"}, nil, []string{"*"}, rpc.HTTPTimeouts{})
return err return err
} }

35
pkg/prom/middleware.go Normal file
View File

@ -0,0 +1,35 @@
package prom
import (
"net/http"
"github.com/prometheus/client_golang/prometheus"
)
func HTTPMiddleware(next http.Handler) http.Handler {
if !metrics {
return next
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
httpCount.Inc()
timer := prometheus.NewTimer(httpDuration)
next.ServeHTTP(w, r)
timer.ObserveDuration()
})
}
func WSMiddleware(next http.Handler) http.Handler {
if !metrics {
return next
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
wsCount.Inc()
timer := prometheus.NewTimer(wsDuration)
next.ServeHTTP(w, r)
timer.ObserveDuration()
})
}

View File

@ -3,20 +3,72 @@ package prom
import ( import (
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
) )
const ( const (
namespace = "ipld_eth_server" namespace = "ipld_eth_server"
statsSubsystem = "stats"
subsystemHTTP = "http"
subsystemWS = "ws"
subsystemIPC = "ipc"
) )
var ( var (
metrics bool metrics bool
httpCount prometheus.Counter
httpDuration prometheus.Histogram
wsCount prometheus.Counter
wsDuration prometheus.Histogram
ipcCount prometheus.Counter
ipcDuration prometheus.Gauge
) )
// Init module initialization // Init module initialization
func Init() { func Init() {
metrics = true metrics = true
httpCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystemHTTP,
Name: "count",
Help: "",
})
httpDuration = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystemHTTP,
Name: "duration",
Help: "",
})
wsCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystemWS,
Name: "count",
Help: "",
})
wsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystemWS,
Name: "duration",
Help: "",
})
ipcCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystemIPC,
Name: "count",
Help: "",
})
ipcDuration = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystemIPC,
Name: "duration",
Help: "",
})
} }
// RegisterDBCollector create metric colletor for given connection // RegisterDBCollector create metric colletor for given connection

21
pkg/rpc/check.go Normal file
View File

@ -0,0 +1,21 @@
package rpc
import "github.com/ethereum/go-ethereum/rpc"
// checkModuleAvailability check that all names given in modules are actually
// available API services.
func checkModuleAvailability(modules []string, apis []rpc.API) (bad, available []string) {
availableSet := make(map[string]struct{})
for _, api := range apis {
if _, ok := availableSet[api.Namespace]; !ok {
availableSet[api.Namespace] = struct{}{}
available = append(available, api.Namespace)
}
}
for _, name := range modules {
if _, ok := availableSet[name]; !ok {
bad = append(bad, name)
}
}
return bad, available
}

41
pkg/rpc/http.go Normal file
View File

@ -0,0 +1,41 @@
package rpc
import (
"net"
"github.com/ethereum/go-ethereum/rpc"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-server/pkg/prom"
)
// StartHTTPEndpoint starts the HTTP RPC endpoint, configured with cors/vhosts/modules.
func StartHTTPEndpoint(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string, timeouts rpc.HTTPTimeouts) (net.Listener, *rpc.Server, error) {
if bad, available := checkModuleAvailability(modules, apis); len(bad) > 0 {
log.Error("Unavailable modules in HTTP API list", "unavailable", bad, "available", available)
}
// Generate the whitelist based on the allowed modules
whitelist := make(map[string]bool)
for _, module := range modules {
whitelist[module] = true
}
// Register all the APIs exposed by the services
handler := rpc.NewServer()
for _, api := range apis {
if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return nil, nil, err
}
log.Debug("HTTP registered", "namespace", api.Namespace)
}
}
// All APIs registered, start the HTTP listener
var (
listener net.Listener
err error
)
if listener, err = net.Listen("tcp", endpoint); err != nil {
return nil, nil, err
}
go rpc.NewHTTPServer(cors, vhosts, timeouts, prom.HTTPMiddleware(handler)).Serve(listener)
return listener, handler, err
}

58
pkg/rpc/ipc.go Normal file
View File

@ -0,0 +1,58 @@
package rpc
import (
"C"
"fmt"
"net"
"os"
"path/filepath"
"github.com/ethereum/go-ethereum/rpc"
log "github.com/sirupsen/logrus"
)
var (
// On Linux, sun_path is 108 bytes in size
// see http://man7.org/linux/man-pages/man7/unix.7.html
maxPathSize = 108
)
// ipcListen will create a Unix socket on the given endpoint.
func ipcListen(endpoint string) (net.Listener, error) {
if len(endpoint) > int(maxPathSize) {
log.Warn(fmt.Sprintf("The ipc endpoint is longer than %d characters. ", maxPathSize),
"endpoint", endpoint)
}
// Ensure the IPC path exists and remove any previous leftover
if err := os.MkdirAll(filepath.Dir(endpoint), 0751); err != nil {
return nil, err
}
os.Remove(endpoint)
l, err := net.Listen("unix", endpoint)
if err != nil {
return nil, err
}
os.Chmod(endpoint, 0600)
return l, nil
}
// StartIPCEndpoint starts an IPC endpoint.
func StartIPCEndpoint(ipcEndpoint string, apis []rpc.API) (net.Listener, *rpc.Server, error) {
// Register all the APIs exposed by the services.
handler := rpc.NewServer()
for _, api := range apis {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return nil, nil, err
}
log.Debug("IPC registered", "namespace", api.Namespace)
}
// All APIs registered, start the IPC listener.
listener, err := ipcListen(ipcEndpoint)
if err != nil {
return nil, nil, err
}
go handler.ServeListener(listener)
return listener, handler, nil
}

46
pkg/rpc/ws.go Normal file
View File

@ -0,0 +1,46 @@
package rpc
import (
"net"
"github.com/ethereum/go-ethereum/rpc"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-server/pkg/prom"
)
// StartWSEndpoint starts a websocket endpoint.
func StartWSEndpoint(endpoint string, apis []rpc.API, modules []string, wsOrigins []string, exposeAll bool) (net.Listener, *rpc.Server, error) {
if bad, available := checkModuleAvailability(modules, apis); len(bad) > 0 {
log.Error("Unavailable modules in WS API list", "unavailable", bad, "available", available)
}
// Generate the whitelist based on the allowed modules
whitelist := make(map[string]bool)
for _, module := range modules {
whitelist[module] = true
}
// Register all the APIs exposed by the services
handler := rpc.NewServer()
for _, api := range apis {
if exposeAll || whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return nil, nil, err
}
log.Debug("WebSocket registered", "service", api.Service, "namespace", api.Namespace)
}
}
// All APIs registered, start the HTTP listener
var (
listener net.Listener
err error
)
if listener, err = net.Listen("tcp", endpoint); err != nil {
return nil, nil, err
}
wsServer := rpc.NewWSServer(wsOrigins, handler)
wsServer.Handler = prom.WSMiddleware(wsServer.Handler)
go wsServer.Serve(listener)
return listener, handler, err
}