diff --git a/cmd/lotus-gateway/main.go b/cmd/lotus-gateway/main.go index 34e068ffd..3fed88468 100644 --- a/cmd/lotus-gateway/main.go +++ b/cmd/lotus-gateway/main.go @@ -7,10 +7,15 @@ import ( "os" "github.com/filecoin-project/go-jsonrpc" + "go.opencensus.io/tag" + "github.com/filecoin-project/lotus/build" lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/lib/lotuslog" + "github.com/filecoin-project/lotus/metrics" + logging "github.com/ipfs/go-log" + "go.opencensus.io/stats/view" "github.com/gorilla/mux" "github.com/urfave/cli/v2" @@ -64,6 +69,13 @@ var runCmd = &cli.Command{ ctx, cancel := context.WithCancel(ctx) defer cancel() + // Register all metric views + if err := view.Register( + metrics.DefaultViews..., + ); err != nil { + log.Fatalf("Cannot register the view: %v", err) + } + api, closer, err := lcli.GetFullNodeAPI(cctx) if err != nil { return err @@ -76,7 +88,7 @@ var runCmd = &cli.Command{ log.Info("Setting up API endpoint at " + address) rpcServer := jsonrpc.NewServer() - rpcServer.Register("Filecoin", NewGatewayAPI(api)) + rpcServer.Register("Filecoin", metrics.MetricedGatewayAPI(NewGatewayAPI(api))) mux.Handle("/rpc/v0", rpcServer) mux.PathPrefix("/").Handler(http.DefaultServeMux) @@ -89,6 +101,7 @@ var runCmd = &cli.Command{ srv := &http.Server{ Handler: mux, BaseContext: func(listener net.Listener) context.Context { + ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-gateway")) return ctx }, } diff --git a/cmd/lotus-seal-worker/main.go b/cmd/lotus-seal-worker/main.go index d2c57e680..9c2ad6319 100644 --- a/cmd/lotus-seal-worker/main.go +++ b/cmd/lotus-seal-worker/main.go @@ -18,6 +18,8 @@ import ( logging "github.com/ipfs/go-log/v2" manet "github.com/multiformats/go-multiaddr/net" "github.com/urfave/cli/v2" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" "golang.org/x/xerrors" "github.com/filecoin-project/go-jsonrpc" @@ -34,6 +36,7 @@ import ( "github.com/filecoin-project/lotus/extern/sector-storage/stores" "github.com/filecoin-project/lotus/lib/lotuslog" "github.com/filecoin-project/lotus/lib/rpcenc" + "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node/repo" ) @@ -190,6 +193,13 @@ var runCmd = &cli.Command{ ctx, cancel := context.WithCancel(ctx) defer cancel() + // Register all metric views + if err := view.Register( + metrics.DefaultViews..., + ); err != nil { + log.Fatalf("Cannot register the view: %v", err) + } + v, err := nodeApi.Version(ctx) if err != nil { return err @@ -363,7 +373,7 @@ var runCmd = &cli.Command{ readerHandler, readerServerOpt := rpcenc.ReaderParamDecoder() rpcServer := jsonrpc.NewServer(readerServerOpt) - rpcServer.Register("Filecoin", apistruct.PermissionedWorkerAPI(workerApi)) + rpcServer.Register("Filecoin", apistruct.PermissionedWorkerAPI(metrics.MetricedWorkerAPI(workerApi))) mux.Handle("/rpc/v0", rpcServer) mux.Handle("/rpc/streams/v0/push/{uuid}", readerHandler) @@ -378,6 +388,7 @@ var runCmd = &cli.Command{ srv := &http.Server{ Handler: ah, BaseContext: func(listener net.Listener) context.Context { + ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-worker")) return ctx }, } diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index c043bd903..0c2fba8b3 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -2,6 +2,7 @@ package main import ( "context" + "net" "net/http" _ "net/http/pprof" "os" @@ -12,6 +13,8 @@ import ( "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" "github.com/urfave/cli/v2" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" "golang.org/x/xerrors" "github.com/filecoin-project/go-jsonrpc" @@ -22,6 +25,7 @@ import ( "github.com/filecoin-project/lotus/build" lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/lib/ulimit" + "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node/impl" "github.com/filecoin-project/lotus/node/modules/dtypes" @@ -66,6 +70,13 @@ var runCmd = &cli.Command{ defer ncloser() ctx := lcli.DaemonContext(cctx) + // Register all metric views + if err := view.Register( + metrics.DefaultViews..., + ); err != nil { + log.Fatalf("Cannot register the view: %v", err) + } + v, err := nodeApi.Version(ctx) if err != nil { return err @@ -147,7 +158,7 @@ var runCmd = &cli.Command{ mux := mux.NewRouter() rpcServer := jsonrpc.NewServer() - rpcServer.Register("Filecoin", apistruct.PermissionedStorMinerAPI(minerapi)) + rpcServer.Register("Filecoin", apistruct.PermissionedStorMinerAPI(metrics.MetricedStorMinerAPI(minerapi))) mux.Handle("/rpc/v0", rpcServer) mux.PathPrefix("/remote").HandlerFunc(minerapi.(*impl.StorageMinerAPI).ServeRemote) @@ -158,7 +169,13 @@ var runCmd = &cli.Command{ Next: mux.ServeHTTP, } - srv := &http.Server{Handler: ah} + srv := &http.Server{ + Handler: ah, + BaseContext: func(listener net.Listener) context.Context { + ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-miner")) + return ctx + }, + } sigChan := make(chan os.Signal, 2) go func() { diff --git a/cmd/lotus-wallet/main.go b/cmd/lotus-wallet/main.go index 3285b13e7..25b89eb9d 100644 --- a/cmd/lotus-wallet/main.go +++ b/cmd/lotus-wallet/main.go @@ -9,6 +9,8 @@ import ( "github.com/gorilla/mux" logging "github.com/ipfs/go-log/v2" "github.com/urfave/cli/v2" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" "github.com/filecoin-project/go-jsonrpc" @@ -18,6 +20,7 @@ import ( ledgerwallet "github.com/filecoin-project/lotus/chain/wallet/ledger" lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/lib/lotuslog" + "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node/repo" ) @@ -75,6 +78,13 @@ var runCmd = &cli.Command{ ctx, cancel := context.WithCancel(ctx) defer cancel() + // Register all metric views + if err := view.Register( + metrics.DefaultViews..., + ); err != nil { + log.Fatalf("Cannot register the view: %v", err) + } + repoPath := cctx.String(FlagWalletRepo) r, err := repo.NewFS(repoPath) if err != nil { @@ -125,7 +135,7 @@ var runCmd = &cli.Command{ log.Info("Setting up API endpoint at " + address) rpcServer := jsonrpc.NewServer() - rpcServer.Register("Filecoin", &LoggedWallet{under: w}) + rpcServer.Register("Filecoin", &LoggedWallet{under: metrics.MetricedWalletAPI(w)}) mux.Handle("/rpc/v0", rpcServer) mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof @@ -138,6 +148,7 @@ var runCmd = &cli.Command{ srv := &http.Server{ Handler: mux, BaseContext: func(listener net.Listener) context.Context { + ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-wallet")) return ctx }, } diff --git a/cmd/lotus/rpc.go b/cmd/lotus/rpc.go index 9718deb3a..4f68ac85a 100644 --- a/cmd/lotus/rpc.go +++ b/cmd/lotus/rpc.go @@ -3,6 +3,7 @@ package main import ( "context" "encoding/json" + "net" "net/http" _ "net/http/pprof" "os" @@ -13,6 +14,7 @@ import ( logging "github.com/ipfs/go-log/v2" "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" + "go.opencensus.io/tag" "golang.org/x/xerrors" "contrib.go.opencensus.io/exporter/prometheus" @@ -22,6 +24,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api/apistruct" + "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node/impl" ) @@ -30,7 +33,7 @@ var log = logging.Logger("main") func serveRPC(a api.FullNode, stop node.StopFunc, addr multiaddr.Multiaddr, shutdownCh <-chan struct{}) error { rpcServer := jsonrpc.NewServer() - rpcServer.Register("Filecoin", apistruct.PermissionedFullAPI(a)) + rpcServer.Register("Filecoin", apistruct.PermissionedFullAPI(metrics.MetricedFullAPI(a))) ah := &auth.Handler{ Verify: a.AuthVerify, @@ -60,7 +63,13 @@ func serveRPC(a api.FullNode, stop node.StopFunc, addr multiaddr.Multiaddr, shut return xerrors.Errorf("could not listen: %w", err) } - srv := &http.Server{Handler: http.DefaultServeMux} + srv := &http.Server{ + Handler: http.DefaultServeMux, + BaseContext: func(listener net.Listener) context.Context { + ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-daemon")) + return ctx + }, + } sigCh := make(chan os.Signal, 2) shutdownDone := make(chan struct{}) diff --git a/metrics/metrics.go b/metrics/metrics.go index 5dd865263..33d9e9174 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -1,6 +1,7 @@ package metrics import ( + "context" "time" "go.opencensus.io/stats" @@ -24,6 +25,8 @@ var ( MessageTo, _ = tag.NewKey("message_to") MessageNonce, _ = tag.NewKey("message_nonce") ReceivedFrom, _ = tag.NewKey("received_from") + Endpoint, _ = tag.NewKey("endpoint") + APIInterface, _ = tag.NewKey("api") // to distinguish between gateway api and full node api endpoint calls ) // Measures @@ -49,6 +52,7 @@ var ( PubsubRecvRPC = stats.Int64("pubsub/recv_rpc", "Counter for total received RPCs", stats.UnitDimensionless) PubsubSendRPC = stats.Int64("pubsub/send_rpc", "Counter for total sent RPCs", stats.UnitDimensionless) PubsubDropRPC = stats.Int64("pubsub/drop_rpc", "Counter for total dropped RPCs", stats.UnitDimensionless) + APIRequestDuration = stats.Float64("api/request_duration_ms", "Duration of API requests", stats.UnitMilliseconds) ) var ( @@ -137,6 +141,11 @@ var ( Measure: PubsubDropRPC, Aggregation: view.Count(), } + APIRequestDurationView = &view.View{ + Measure: APIRequestDuration, + Aggregation: defaultMillisecondsDistribution, + TagKeys: []tag.Key{APIInterface, Endpoint}, + } ) // DefaultViews is an array of OpenCensus views for metric gathering purposes @@ -161,6 +170,7 @@ var DefaultViews = append([]*view.View{ PubsubRecvRPCView, PubsubSendRPCView, PubsubDropRPCView, + APIRequestDurationView, }, rpcmetrics.DefaultViews...) @@ -168,3 +178,12 @@ var DefaultViews = append([]*view.View{ func SinceInMilliseconds(startTime time.Time) float64 { return float64(time.Since(startTime).Nanoseconds()) / 1e6 } + +// Timer is a function stopwatch, calling it starts the timer, +// calling the returned function will record the duration. +func Timer(ctx context.Context, m *stats.Float64Measure) func() { + start := time.Now() + return func() { + stats.Record(ctx, m.M(SinceInMilliseconds(start))) + } +} diff --git a/metrics/proxy.go b/metrics/proxy.go new file mode 100644 index 000000000..f3714ec2e --- /dev/null +++ b/metrics/proxy.go @@ -0,0 +1,65 @@ +package metrics + +import ( + "context" + "reflect" + + "go.opencensus.io/tag" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/api/apistruct" +) + +func MetricedStorMinerAPI(a api.StorageMiner) api.StorageMiner { + var out apistruct.StorageMinerStruct + proxy(a, &out.Internal) + proxy(a, &out.CommonStruct.Internal) + return &out +} + +func MetricedFullAPI(a api.FullNode) api.FullNode { + var out apistruct.FullNodeStruct + proxy(a, &out.Internal) + proxy(a, &out.CommonStruct.Internal) + return &out +} + +func MetricedWorkerAPI(a api.WorkerAPI) api.WorkerAPI { + var out apistruct.WorkerStruct + proxy(a, &out.Internal) + return &out +} + +func MetricedWalletAPI(a api.WalletAPI) api.WalletAPI { + var out apistruct.WalletStruct + proxy(a, &out.Internal) + return &out +} + +func MetricedGatewayAPI(a api.GatewayAPI) api.GatewayAPI { + var out apistruct.GatewayStruct + proxy(a, &out.Internal) + return &out +} + +func proxy(in interface{}, out interface{}) { + rint := reflect.ValueOf(out).Elem() + ra := reflect.ValueOf(in) + + for f := 0; f < rint.NumField(); f++ { + field := rint.Type().Field(f) + fn := ra.MethodByName(field.Name) + + rint.Field(f).Set(reflect.MakeFunc(field.Type, func(args []reflect.Value) (results []reflect.Value) { + ctx := args[0].Interface().(context.Context) + // upsert function name into context + ctx, _ = tag.New(ctx, tag.Upsert(Endpoint, field.Name)) + stop := Timer(ctx, APIRequestDuration) + defer stop() + // pass tagged ctx back into function call + args[0] = reflect.ValueOf(ctx) + return fn.Call(args) + })) + + } +}