refactor: Split up server functions a bit (#16152)

Co-authored-by: marbar3778 <marbar3778@yahoo.com>
This commit is contained in:
Dev Ojha 2023-05-17 14:52:19 +02:00 committed by GitHub
parent d2671adfb5
commit 9329a00e93
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -3,6 +3,7 @@ package server
import (
"context"
"fmt"
"io"
"net"
"os"
"runtime/pprof"
@ -11,6 +12,7 @@ import (
"github.com/armon/go-metrics"
"github.com/cometbft/cometbft/abci/server"
cmtcmd "github.com/cometbft/cometbft/cmd/cometbft/commands"
cmtcfg "github.com/cometbft/cometbft/config"
"github.com/cometbft/cometbft/node"
"github.com/cometbft/cometbft/p2p"
pvm "github.com/cometbft/cometbft/privval"
@ -210,6 +212,8 @@ func startStandAlone(svrCtx *Context, appCreator types.AppCreator) error {
return err
}
// TODO: Should we be using startTraceServer, and defer closing the traceWriter?
// right now it's left unclosed
traceWriterFile := svrCtx.Viper.GetString(flagTraceStore)
traceWriter, err := openTraceWriter(traceWriterFile)
if err != nil {
@ -223,6 +227,10 @@ func startStandAlone(svrCtx *Context, appCreator types.AppCreator) error {
return err
}
if err := config.ValidateBasic(); err != nil {
return err
}
if _, err := startTelemetry(config); err != nil {
return err
}
@ -259,57 +267,32 @@ func startStandAlone(svrCtx *Context, appCreator types.AppCreator) error {
}
func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types.AppCreator) error {
cfg := svrCtx.Config
home := cfg.RootDir
cmtCfg := svrCtx.Config
home := cmtCfg.RootDir
db, err := openDB(home, GetAppDBBackend(svrCtx.Viper))
if err != nil {
return err
}
traceWriterFile := svrCtx.Viper.GetString(flagTraceStore)
traceWriter, err := openTraceWriter(traceWriterFile)
svrCfg, err := getAndValidateConfig(svrCtx)
if err != nil {
return err
}
// clean up the traceWriter when the server is shutting down
var traceWriterCleanup func()
// if flagTraceStore is not used then traceWriter is nil
if traceWriter != nil {
traceWriterCleanup = func() {
if err = traceWriter.Close(); err != nil {
svrCtx.Logger.Error("failed to close trace writer", "err", err)
}
}
}
config, err := serverconfig.GetConfig(svrCtx.Viper)
traceWriter, traceWriterCleanup, err := setupTraceWriter(svrCtx)
if err != nil {
return err
}
if err := config.ValidateBasic(); err != nil {
return err
}
app := appCreator(svrCtx.Logger, db, traceWriter, svrCtx.Viper)
nodeKey, err := p2p.LoadOrGenNodeKey(cfg.NodeKeyFile())
// TODO: Move this to only be done if we're launching the node. (So not in gRPC-only mode)
nodeKey, err := p2p.LoadOrGenNodeKey(cmtCfg.NodeKeyFile())
if err != nil {
return err
}
genDocProvider := func() (*cmttypes.GenesisDoc, error) {
appGenesis, err := genutiltypes.AppGenesisFromFile(cfg.GenesisFile())
if err != nil {
return nil, err
}
return appGenesis.ToGenesisDoc()
}
var (
tmNode *node.Node
gRPCOnly = svrCtx.Viper.GetBool(flagGRPCOnly)
@ -317,125 +300,49 @@ func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types.
if gRPCOnly {
svrCtx.Logger.Info("starting node in gRPC only mode; CometBFT is disabled")
config.GRPC.Enable = true
svrCfg.GRPC.Enable = true
} else {
svrCtx.Logger.Info("starting node with ABCI CometBFT in-process")
tmNode, err = node.NewNode(
cfg,
pvm.LoadOrGenFilePV(cfg.PrivValidatorKeyFile(), cfg.PrivValidatorStateFile()),
nodeKey,
proxy.NewLocalClientCreator(app),
genDocProvider,
node.DefaultDBProvider,
node.DefaultMetricsProvider(cfg.Instrumentation),
servercmtlog.CometLoggerWrapper{Logger: svrCtx.Logger},
)
tmNode, err = startCmtNode(cmtCfg, nodeKey, app, svrCtx)
if err != nil {
return err
}
if err := tmNode.Start(); err != nil {
return err
// Add the tx service to the gRPC router. We only need to register this
// service if API or gRPC is enabled, and avoid doing so in the general
// case, because it spawns a new local CometBFT RPC client.
if svrCfg.API.Enable || svrCfg.GRPC.Enable {
// Re-assign clientCtx so the client is available below. Do not use := here,
// to avoid shadowing the clientCtx variable.
clientCtx = clientCtx.WithClient(local.New(tmNode))
app.RegisterTxService(clientCtx)
app.RegisterTendermintService(clientCtx)
app.RegisterNodeService(clientCtx, svrCfg)
}
}
// Add the tx service to the gRPC router. We only need to register this
// service if API or gRPC is enabled, and avoid doing so in the general
// case, because it spawns a new local CometBFT RPC client.
if (config.API.Enable || config.GRPC.Enable) && tmNode != nil {
// Re-assign clientCtx so the client is available below. Do not use := here,
// to avoid shadowing the clientCtx variable.
clientCtx = clientCtx.WithClient(local.New(tmNode))
app.RegisterTxService(clientCtx)
app.RegisterTendermintService(clientCtx)
app.RegisterNodeService(clientCtx, config)
}
metrics, err := startTelemetry(config)
metrics, err := startTelemetry(svrCfg)
if err != nil {
return err
}
emitServerInfoMetrics()
var (
apiSrv *api.Server
grpcSrv *grpc.Server
)
ctx, cancelFn := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)
// listen for quit signals so the calling parent process can gracefully exit
ListenForQuitSignals(cancelFn, svrCtx.Logger)
if config.GRPC.Enable {
_, port, err := net.SplitHostPort(config.GRPC.Address)
if err != nil {
return err
}
maxSendMsgSize := config.GRPC.MaxSendMsgSize
if maxSendMsgSize == 0 {
maxSendMsgSize = serverconfig.DefaultGRPCMaxSendMsgSize
}
maxRecvMsgSize := config.GRPC.MaxRecvMsgSize
if maxRecvMsgSize == 0 {
maxRecvMsgSize = serverconfig.DefaultGRPCMaxRecvMsgSize
}
grpcAddress := fmt.Sprintf("127.0.0.1:%s", port)
// if gRPC is enabled, configure gRPC client for gRPC gateway
grpcClient, err := grpc.Dial(
grpcAddress,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()),
grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
grpc.MaxCallSendMsgSize(maxSendMsgSize),
),
)
if err != nil {
return err
}
clientCtx = clientCtx.WithGRPCClient(grpcClient)
svrCtx.Logger.Debug("gRPC client assigned to client context", "target", grpcAddress)
grpcSrv, err = servergrpc.NewGRPCServer(clientCtx, app, config.GRPC)
if err != nil {
return err
}
// Start the gRPC server in a goroutine. Note, the provided ctx will ensure
// that the server is gracefully shut down.
g.Go(func() error {
return servergrpc.StartGRPCServer(ctx, svrCtx.Logger.With("module", "grpc-server"), config.GRPC, grpcSrv)
})
grpcSrv, clientCtx, err := startGrpcServer(ctx, g, svrCfg.GRPC, clientCtx, svrCtx, app)
if err != nil {
return err
}
if config.API.Enable {
genDoc, err := genDocProvider()
if err != nil {
return err
}
clientCtx := clientCtx.WithHomeDir(home).WithChainID(genDoc.ChainID)
apiSrv = api.New(clientCtx, svrCtx.Logger.With("module", "api-server"), grpcSrv)
app.RegisterAPIRoutes(apiSrv, config.API)
if config.Telemetry.Enabled {
apiSrv.SetTelemetry(metrics)
}
g.Go(func() error {
return apiSrv.Start(ctx, config)
})
err = startAPIServer(ctx, g, cmtCfg, svrCfg, clientCtx, svrCtx, app, home, grpcSrv, metrics)
if err != nil {
return err
}
// At this point it is safe to block the process if we're in gRPC-only mode as
@ -454,6 +361,8 @@ func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types.
})
// deferred cleanup function
// TODO: Make a generic cleanup function that takes in several func()s and runs them all,
// then defer that instead.
defer func() {
if tmNode != nil && tmNode.IsRunning() {
_ = tmNode.Stop()
@ -468,6 +377,157 @@ func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types.
return g.Wait()
}
// startCmtNode constructs the in-process CometBFT node around the given ABCI
// application and starts it. On failure the (possibly partially started) node
// is still returned alongside the error, so the caller's deferred cleanup can
// stop it if it is running.
// TODO: Move nodeKey into being created within the function.
func startCmtNode(cfg *cmtcfg.Config, nodeKey *p2p.NodeKey, app types.Application, svrCtx *Context) (tmNode *node.Node, err error) {
	// file-backed private validator, loaded or generated from the configured paths
	pv := pvm.LoadOrGenFilePV(cfg.PrivValidatorKeyFile(), cfg.PrivValidatorStateFile())

	tmNode, err = node.NewNode(
		cfg,
		pv,
		nodeKey,
		proxy.NewLocalClientCreator(app),
		getGenDocProvider(cfg),
		node.DefaultDBProvider,
		node.DefaultMetricsProvider(cfg.Instrumentation),
		servercmtlog.CometLoggerWrapper{Logger: svrCtx.Logger},
	)
	if err != nil {
		return tmNode, err
	}

	err = tmNode.Start()
	return tmNode, err
}
// getAndValidateConfig reads the server configuration from the context's
// Viper instance and runs its basic validation. The config value is returned
// even when an error occurred, matching serverconfig.GetConfig's contract.
func getAndValidateConfig(svrCtx *Context) (serverconfig.Config, error) {
	conf, err := serverconfig.GetConfig(svrCtx.Viper)
	if err != nil {
		return conf, err
	}
	return conf, conf.ValidateBasic()
}
// getGenDocProvider returns a lazy provider that, when invoked, loads the
// application genesis from cfg's genesis file and converts it into a
// CometBFT GenesisDoc.
func getGenDocProvider(cfg *cmtcfg.Config) func() (*cmttypes.GenesisDoc, error) {
	return func() (*cmttypes.GenesisDoc, error) {
		appGenesis, loadErr := genutiltypes.AppGenesisFromFile(cfg.GenesisFile())
		if loadErr != nil {
			return nil, loadErr
		}
		return appGenesis.ToGenesisDoc()
	}
}
// setupTraceWriter opens the trace store writer configured via flagTraceStore
// and returns it together with a cleanup function that closes the writer when
// the server shuts down. When no trace store is configured the writer is nil
// and the cleanup function is a no-op, so callers may always call cleanup
// unconditionally.
func setupTraceWriter(svrCtx *Context) (traceWriter io.WriteCloser, cleanup func(), err error) {
	// default to a no-op so cleanup is always safe to call
	cleanup = func() {}

	traceWriterFile := svrCtx.Viper.GetString(flagTraceStore)
	traceWriter, err = openTraceWriter(traceWriterFile)
	if err != nil {
		return traceWriter, cleanup, err
	}

	// clean up the traceWriter when the server is shutting down.
	// if flagTraceStore is not used then traceWriter is nil
	if traceWriter != nil {
		cleanup = func() {
			// use a fresh local (:=) here: the original code assigned to the
			// enclosing function's named return `err` from a closure that runs
			// long after setupTraceWriter has returned, silently clobbering it
			if closeErr := traceWriter.Close(); closeErr != nil {
				svrCtx.Logger.Error("failed to close trace writer", "err", closeErr)
			}
		}
	}

	return traceWriter, cleanup, nil
}
// startGrpcServer configures a gRPC client for the gRPC gateway, builds the
// gRPC server, and launches it on the supplied errgroup. When gRPC is
// disabled in the config it returns a nil server and the client context
// untouched. The returned client context carries the dialed gRPC client.
func startGrpcServer(ctx context.Context, g *errgroup.Group, config serverconfig.GRPCConfig, clientCtx client.Context, svrCtx *Context, app types.Application) (
	*grpc.Server, client.Context, error,
) {
	if !config.Enable {
		// return grpcServer as nil if gRPC is disabled
		return nil, clientCtx, nil
	}

	_, port, err := net.SplitHostPort(config.Address)
	if err != nil {
		return nil, clientCtx, err
	}

	// fall back to the server defaults when the message-size limits are unset
	sendLimit := config.MaxSendMsgSize
	if sendLimit == 0 {
		sendLimit = serverconfig.DefaultGRPCMaxSendMsgSize
	}
	recvLimit := config.MaxRecvMsgSize
	if recvLimit == 0 {
		recvLimit = serverconfig.DefaultGRPCMaxRecvMsgSize
	}

	target := fmt.Sprintf("127.0.0.1:%s", port)

	// if gRPC is enabled, configure gRPC client for gRPC gateway
	conn, err := grpc.Dial(
		target,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultCallOptions(
			grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()),
			grpc.MaxCallRecvMsgSize(recvLimit),
			grpc.MaxCallSendMsgSize(sendLimit),
		),
	)
	if err != nil {
		return nil, clientCtx, err
	}

	clientCtx = clientCtx.WithGRPCClient(conn)
	svrCtx.Logger.Debug("gRPC client assigned to client context", "target", target)

	grpcSrv, err := servergrpc.NewGRPCServer(clientCtx, app, config)
	if err != nil {
		return nil, clientCtx, err
	}

	// Start the gRPC server in a goroutine. Note, the provided ctx will ensure
	// that the server is gracefully shut down.
	g.Go(func() error {
		return servergrpc.StartGRPCServer(ctx, svrCtx.Logger.With("module", "grpc-server"), config, grpcSrv)
	})

	return grpcSrv, clientCtx, nil
}
// startAPIServer wires up and launches the REST API server on the supplied
// errgroup. It is a no-op when the API is disabled in svrCfg. The client
// context handed to the API server is rebased onto the node home directory
// and the chain ID read from the genesis file.
func startAPIServer(ctx context.Context, g *errgroup.Group, cmtCfg *cmtcfg.Config, svrCfg serverconfig.Config,
	clientCtx client.Context, svrCtx *Context, app types.Application, home string, grpcSrv *grpc.Server, metrics *telemetry.Metrics,
) error {
	if !svrCfg.API.Enable {
		return nil
	}

	// TODO: Why do we reload and unmarshal the entire genesis doc just to get
	// the chain ID? Surely there's a better way; this is likely a serious
	// node start-time overhead.
	genDoc, err := getGenDocProvider(cmtCfg)()
	if err != nil {
		return err
	}

	clientCtx = clientCtx.WithHomeDir(home).WithChainID(genDoc.ChainID)

	apiSrv := api.New(clientCtx, svrCtx.Logger.With("module", "api-server"), grpcSrv)
	app.RegisterAPIRoutes(apiSrv, svrCfg.API)

	if svrCfg.Telemetry.Enabled {
		apiSrv.SetTelemetry(metrics)
	}

	g.Go(func() error {
		return apiSrv.Start(ctx, svrCfg)
	})
	return nil
}
func startTelemetry(cfg serverconfig.Config) (*telemetry.Metrics, error) {
if !cfg.Telemetry.Enabled {
return nil, nil