wrap api in metrics proxy

This commit is contained in:
lanzafame 2020-10-21 18:10:27 +10:00
parent c767399fc6
commit 3e767ca5c8
3 changed files with 86 additions and 1 deletions

View File

@ -22,6 +22,7 @@ import (
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/apistruct" "github.com/filecoin-project/lotus/api/apistruct"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/impl" "github.com/filecoin-project/lotus/node/impl"
) )
@ -30,7 +31,7 @@ var log = logging.Logger("main")
func serveRPC(a api.FullNode, stop node.StopFunc, addr multiaddr.Multiaddr, shutdownCh <-chan struct{}) error { func serveRPC(a api.FullNode, stop node.StopFunc, addr multiaddr.Multiaddr, shutdownCh <-chan struct{}) error {
rpcServer := jsonrpc.NewServer() rpcServer := jsonrpc.NewServer()
rpcServer.Register("Filecoin", apistruct.PermissionedFullAPI(a)) rpcServer.Register("Filecoin", apistruct.PermissionedFullAPI(metrics.MetricedFullAPI(a)))
ah := &auth.Handler{ ah := &auth.Handler{
Verify: a.AuthVerify, Verify: a.AuthVerify,

View File

@ -1,6 +1,7 @@
package metrics package metrics
import ( import (
"context"
"time" "time"
"go.opencensus.io/stats" "go.opencensus.io/stats"
@ -24,6 +25,8 @@ var (
MessageTo, _ = tag.NewKey("message_to") MessageTo, _ = tag.NewKey("message_to")
MessageNonce, _ = tag.NewKey("message_nonce") MessageNonce, _ = tag.NewKey("message_nonce")
ReceivedFrom, _ = tag.NewKey("received_from") ReceivedFrom, _ = tag.NewKey("received_from")
Endpoint, _ = tag.NewKey("endpoint")
APIInterface, _ = tag.NewKey("api") // to distinguish between gateway api and full node api endpoint calls
) )
// Measures // Measures
@ -49,6 +52,7 @@ var (
PubsubRecvRPC = stats.Int64("pubsub/recv_rpc", "Counter for total received RPCs", stats.UnitDimensionless) PubsubRecvRPC = stats.Int64("pubsub/recv_rpc", "Counter for total received RPCs", stats.UnitDimensionless)
PubsubSendRPC = stats.Int64("pubsub/send_rpc", "Counter for total sent RPCs", stats.UnitDimensionless) PubsubSendRPC = stats.Int64("pubsub/send_rpc", "Counter for total sent RPCs", stats.UnitDimensionless)
PubsubDropRPC = stats.Int64("pubsub/drop_rpc", "Counter for total dropped RPCs", stats.UnitDimensionless) PubsubDropRPC = stats.Int64("pubsub/drop_rpc", "Counter for total dropped RPCs", stats.UnitDimensionless)
APIRequestDuration = stats.Float64("api/request_duration_ms", "Duration of API requests", stats.UnitMilliseconds)
) )
var ( var (
@ -137,6 +141,11 @@ var (
Measure: PubsubDropRPC, Measure: PubsubDropRPC,
Aggregation: view.Count(), Aggregation: view.Count(),
} }
APIRequestDurationView = &view.View{
Measure: APIRequestDuration,
Aggregation: defaultMillisecondsDistribution,
TagKeys: []tag.Key{APIInterface, Endpoint},
}
) )
// DefaultViews is an array of OpenCensus views for metric gathering purposes // DefaultViews is an array of OpenCensus views for metric gathering purposes
@ -161,6 +170,7 @@ var DefaultViews = append([]*view.View{
PubsubRecvRPCView, PubsubRecvRPCView,
PubsubSendRPCView, PubsubSendRPCView,
PubsubDropRPCView, PubsubDropRPCView,
APIRequestDurationView,
}, },
rpcmetrics.DefaultViews...) rpcmetrics.DefaultViews...)
@ -168,3 +178,12 @@ var DefaultViews = append([]*view.View{
func SinceInMilliseconds(startTime time.Time) float64 { func SinceInMilliseconds(startTime time.Time) float64 {
return float64(time.Since(startTime).Nanoseconds()) / 1e6 return float64(time.Since(startTime).Nanoseconds()) / 1e6
} }
// Timer is a function stopwatch, calling it starts the timer,
// calling the returned function will record the duration.
func Timer(ctx context.Context, m *stats.Float64Measure) func() {
start := time.Now()
return func() {
stats.Record(ctx, m.M(SinceInMilliseconds(start)))
}
}

65
metrics/proxy.go Normal file
View File

@ -0,0 +1,65 @@
package metrics
import (
"context"
"reflect"
"go.opencensus.io/tag"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/apistruct"
)
func MetricedStorMinerAPI(a api.StorageMiner) api.StorageMiner {
var out apistruct.StorageMinerStruct
proxy(a, &out.Internal)
proxy(a, &out.CommonStruct.Internal)
return &out
}
func MetricedFullAPI(a api.FullNode) api.FullNode {
var out apistruct.FullNodeStruct
proxy(a, &out.Internal)
proxy(a, &out.CommonStruct.Internal)
return &out
}
func MetricedWorkerAPI(a api.WorkerAPI) api.WorkerAPI {
var out apistruct.WorkerStruct
proxy(a, &out.Internal)
return &out
}
func MetricedWalletAPI(a api.WalletAPI) api.WalletAPI {
var out apistruct.WalletStruct
proxy(a, &out.Internal)
return &out
}
func MetricedGatewayAPI(a api.GatewayAPI) api.GatewayAPI {
var out apistruct.GatewayStruct
proxy(a, &out.Internal)
return &out
}
func proxy(in interface{}, out interface{}) {
rint := reflect.ValueOf(out).Elem()
ra := reflect.ValueOf(in)
for f := 0; f < rint.NumField(); f++ {
field := rint.Type().Field(f)
fn := ra.MethodByName(field.Name)
rint.Field(f).Set(reflect.MakeFunc(field.Type, func(args []reflect.Value) (results []reflect.Value) {
ctx := args[0].Interface().(context.Context)
// upsert function name into context
ctx, _ = tag.New(ctx, tag.Upsert(Endpoint, field.Name))
stop := Timer(ctx, APIRequestDuration)
defer stop()
// pass tagged ctx back into function call
args[0] = reflect.ValueOf(ctx)
return fn.Call(args)
}))
}
}