From 4c18554fbe945afd5844ab542c5034f09c251157 Mon Sep 17 00:00:00 2001 From: Ilnur Galiev Date: Mon, 19 Oct 2020 23:00:09 +0300 Subject: [PATCH] add websocket and unixsocket counters --- pkg/prom/middleware.go | 25 +++++++++++++++++++------ pkg/prom/prom.go | 32 ++++++++------------------------ pkg/rpc/ipc.go | 23 ++++++++++++++++++++--- 3 files changed, 47 insertions(+), 33 deletions(-) diff --git a/pkg/prom/middleware.go b/pkg/prom/middleware.go index 741fed9a..b2daa79f 100644 --- a/pkg/prom/middleware.go +++ b/pkg/prom/middleware.go @@ -2,10 +2,12 @@ package prom import ( "net/http" + "time" - "github.com/prometheus/client_golang/prometheus" + "github.com/ethereum/go-ethereum/rpc" ) +// HTTPMiddleware http connection metric reader func HTTPMiddleware(next http.Handler) http.Handler { if !metrics { return next @@ -14,12 +16,14 @@ func HTTPMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { httpCount.Inc() - timer := prometheus.NewTimer(httpDuration) + start := time.Now() next.ServeHTTP(w, r) - timer.ObserveDuration() + duration := time.Now().Sub(start) + httpDuration.Observe(float64(duration.Seconds())) }) } +// WSMiddleware websocket connection counter func WSMiddleware(next http.Handler) http.Handler { if !metrics { return next @@ -27,9 +31,18 @@ func WSMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { wsCount.Inc() - - timer := prometheus.NewTimer(wsDuration) next.ServeHTTP(w, r) - timer.ObserveDuration() + wsCount.Dec() }) } + +// IPCMiddleware unix-socket connection counter +func IPCMiddleware(server *rpc.Server, client rpc.Conn) { + if metrics { + ipcCount.Inc() + } + server.ServeCodec(rpc.NewCodec(client), 0) + if metrics { + ipcCount.Dec() + } +} diff --git a/pkg/prom/prom.go b/pkg/prom/prom.go index 828ed160..f17ef546 100644 --- a/pkg/prom/prom.go +++ b/pkg/prom/prom.go @@ -19,12 +19,8 @@ var ( httpCount prometheus.Counter httpDuration prometheus.Histogram - - wsCount prometheus.Counter - wsDuration prometheus.Histogram - - ipcCount prometheus.Counter - ipcDuration prometheus.Gauge + wsCount prometheus.Gauge + ipcCount prometheus.Gauge ) // Init module initialization @@ -35,39 +31,27 @@ func Init() { Namespace: namespace, Subsystem: subsystemHTTP, Name: "count", - Help: "", + Help: "http request count", }) httpDuration = promauto.NewHistogram(prometheus.HistogramOpts{ Namespace: namespace, Subsystem: subsystemHTTP, Name: "duration", - Help: "", + Help: "http request duration", }) - wsCount = promauto.NewCounter(prometheus.CounterOpts{ + wsCount = promauto.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystemWS, Name: "count", - Help: "", - }) - wsDuration = promauto.NewHistogram(prometheus.HistogramOpts{ - Namespace: namespace, - Subsystem: subsystemWS, - Name: "duration", - Help: "", + Help: "websocket conntection count", }) - ipcCount = promauto.NewCounter(prometheus.CounterOpts{ + ipcCount = promauto.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystemIPC, Name: "count", - Help: "", - }) - ipcDuration = promauto.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystemIPC, - Name: "duration", - Help: "", + Help: "unix socket connection count", }) } diff --git a/pkg/rpc/ipc.go b/pkg/rpc/ipc.go index 297be5ab..4e170ebe 100644 --- a/pkg/rpc/ipc.go +++ b/pkg/rpc/ipc.go @@ -1,15 +1,15 @@ package rpc import ( - "C" - "fmt" "net" "os" "path/filepath" + "github.com/ethereum/go-ethereum/p2p/netutil" "github.com/ethereum/go-ethereum/rpc" log "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-eth-server/pkg/prom" ) var ( @@ -38,6 +38,22 @@ func ipcListen(endpoint string) (net.Listener, error) { return l, nil } +func ipcServe(srv *rpc.Server, listener net.Listener) { + for { + conn, err := listener.Accept() + if netutil.IsTemporaryError(err) { + log.WithError(err).Warn("rpc accept error") + continue + } + if err != nil { + log.WithError(err).Warn("unknown error") + continue + } + log.WithField("addr", conn.RemoteAddr()).Trace("accepted ipc connection") + go prom.IPCMiddleware(srv, conn) + } +} + // StartIPCEndpoint starts an IPC endpoint. func StartIPCEndpoint(ipcEndpoint string, apis []rpc.API) (net.Listener, *rpc.Server, error) { // Register all the APIs exposed by the services. @@ -53,6 +69,7 @@ func StartIPCEndpoint(ipcEndpoint string, apis []rpc.API) (net.Listener, *rpc.Se if err != nil { return nil, nil, err } - go handler.ServeListener(listener) + + go ipcServe(handler, listener) return listener, handler, nil }