Metrics and http server #15

Merged
n0cte merged 10 commits from metrics into master 2020-10-21 16:42:05 +00:00
3 changed files with 47 additions and 33 deletions
Showing only changes of commit 4c18554fbe - Show all commits

View File

@ -2,10 +2,12 @@ package prom
import ( import (
"net/http" "net/http"
"time"
"github.com/prometheus/client_golang/prometheus" "github.com/ethereum/go-ethereum/rpc"
) )
// HTTPMiddleware http connection metric reader
func HTTPMiddleware(next http.Handler) http.Handler { func HTTPMiddleware(next http.Handler) http.Handler {
if !metrics { if !metrics {
return next return next
@ -14,12 +16,14 @@ func HTTPMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
httpCount.Inc() httpCount.Inc()
timer := prometheus.NewTimer(httpDuration) start := time.Now()
next.ServeHTTP(w, r) next.ServeHTTP(w, r)
timer.ObserveDuration() duration := time.Now().Sub(start)
httpDuration.Observe(float64(duration.Seconds()))
}) })
} }
// WSMiddleware websocket connection counter
func WSMiddleware(next http.Handler) http.Handler { func WSMiddleware(next http.Handler) http.Handler {
if !metrics { if !metrics {
return next return next
@ -27,9 +31,18 @@ func WSMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
wsCount.Inc() wsCount.Inc()
timer := prometheus.NewTimer(wsDuration)
next.ServeHTTP(w, r) next.ServeHTTP(w, r)
timer.ObserveDuration() wsCount.Dec()
}) })
} }
// IPCMiddleware unix-socket connection counter
func IPCMiddleware(server *rpc.Server, client rpc.Conn) {
if metrics {
ipcCount.Inc()
}
server.ServeCodec(rpc.NewCodec(client), 0)
if metrics {
ipcCount.Dec()
}
}

View File

@ -19,12 +19,8 @@ var (
httpCount prometheus.Counter httpCount prometheus.Counter
httpDuration prometheus.Histogram httpDuration prometheus.Histogram
wsCount prometheus.Gauge
wsCount prometheus.Counter ipcCount prometheus.Gauge
wsDuration prometheus.Histogram
ipcCount prometheus.Counter
ipcDuration prometheus.Gauge
) )
// Init module initialization // Init module initialization
@ -35,39 +31,27 @@ func Init() {
Namespace: namespace, Namespace: namespace,
Subsystem: subsystemHTTP, Subsystem: subsystemHTTP,
Name: "count", Name: "count",
Help: "", Help: "http request count",
}) })
httpDuration = promauto.NewHistogram(prometheus.HistogramOpts{ httpDuration = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystemHTTP, Subsystem: subsystemHTTP,
Name: "duration", Name: "duration",
Help: "", Help: "http request duration",
}) })
wsCount = promauto.NewCounter(prometheus.CounterOpts{ wsCount = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystemWS, Subsystem: subsystemWS,
Name: "count", Name: "count",
Help: "", Help: "websocket conntection count",
})
wsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystemWS,
Name: "duration",
Help: "",
}) })
ipcCount = promauto.NewCounter(prometheus.CounterOpts{ ipcCount = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystemIPC, Subsystem: subsystemIPC,
Name: "count", Name: "count",
Help: "", Help: "unix socket connection count",
})
ipcDuration = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystemIPC,
Name: "duration",
Help: "",
}) })
} }

View File

@ -1,15 +1,15 @@
package rpc package rpc
import ( import (
"C"
"fmt" "fmt"
"net" "net"
"os" "os"
"path/filepath" "path/filepath"
"github.com/ethereum/go-ethereum/p2p/netutil"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-server/pkg/prom"
) )
var ( var (
@ -38,6 +38,22 @@ func ipcListen(endpoint string) (net.Listener, error) {
return l, nil return l, nil
} }
func ipcServe(srv *rpc.Server, listener net.Listener) {
for {
conn, err := listener.Accept()
if netutil.IsTemporaryError(err) {
log.WithError(err).Warn("rpc accept error")
continue
}
if err != nil {
log.WithError(err).Warn("unknown error")
continue
}
log.WithField("addr", conn.RemoteAddr()).Trace("accepted ipc connection")
go prom.IPCMiddleware(srv, conn)
}
}
// StartIPCEndpoint starts an IPC endpoint. // StartIPCEndpoint starts an IPC endpoint.
func StartIPCEndpoint(ipcEndpoint string, apis []rpc.API) (net.Listener, *rpc.Server, error) { func StartIPCEndpoint(ipcEndpoint string, apis []rpc.API) (net.Listener, *rpc.Server, error) {
// Register all the APIs exposed by the services. // Register all the APIs exposed by the services.
@ -53,6 +69,7 @@ func StartIPCEndpoint(ipcEndpoint string, apis []rpc.API) (net.Listener, *rpc.Se
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
go handler.ServeListener(listener)
go ipcServe(handler, listener)
return listener, handler, nil return listener, handler, nil
} }