Co-authored-by: Tyler <48813565+technicallyty@users.noreply.github.com> Co-authored-by: Alex | Cosmos Labs <alex@cosmoslabs.io>
241 lines
7.7 KiB
Go
241 lines
7.7 KiB
Go
package api
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
cmtrpcserver "github.com/cometbft/cometbft/rpc/jsonrpc/server"
|
|
gateway "github.com/cosmos/gogogateway"
|
|
"github.com/golang/protobuf/proto" //nolint:staticcheck // needed for compatibility
|
|
"github.com/gorilla/handlers"
|
|
"github.com/gorilla/mux"
|
|
"github.com/grpc-ecosystem/grpc-gateway/runtime"
|
|
"github.com/improbable-eng/grpc-web/go/grpcweb"
|
|
"google.golang.org/grpc"
|
|
|
|
"cosmossdk.io/log"
|
|
|
|
"github.com/cosmos/cosmos-sdk/client"
|
|
"github.com/cosmos/cosmos-sdk/codec/legacy"
|
|
"github.com/cosmos/cosmos-sdk/server/config"
|
|
cmtlogwrapper "github.com/cosmos/cosmos-sdk/server/log"
|
|
"github.com/cosmos/cosmos-sdk/telemetry"
|
|
grpctypes "github.com/cosmos/cosmos-sdk/types/grpc"
|
|
)
|
|
|
|
// Server defines the server's API interface.
|
|
type Server struct {
|
|
Router *mux.Router
|
|
GRPCGatewayRouter *runtime.ServeMux
|
|
ClientCtx client.Context
|
|
GRPCSrv *grpc.Server
|
|
logger log.Logger
|
|
|
|
//nolint:staticcheck // TODO: switch to OpenTelemetry
|
|
metrics *telemetry.Metrics
|
|
|
|
// Start() is blocking and generally called from a separate goroutine.
|
|
// Close() can be called asynchronously and access shared memory
|
|
// via the listener. Therefore, we sync access to Start and Close with
|
|
// this mutex to avoid data races.
|
|
mtx sync.Mutex
|
|
listener net.Listener
|
|
}
|
|
|
|
// CustomGRPCHeaderMatcher for mapping request headers to
|
|
// GRPC metadata.
|
|
// HTTP headers that start with 'Grpc-Metadata-' are automatically mapped to
|
|
// gRPC metadata after removing the prefix 'Grpc-Metadata-'. We can use this
|
|
// CustomGRPCHeaderMatcher if headers don't start with `Grpc-Metadata-`
|
|
func CustomGRPCHeaderMatcher(key string) (string, bool) {
|
|
switch strings.ToLower(key) {
|
|
case grpctypes.GRPCBlockHeightHeader:
|
|
return grpctypes.GRPCBlockHeightHeader, true
|
|
|
|
default:
|
|
return runtime.DefaultHeaderMatcher(key)
|
|
}
|
|
}
|
|
|
|
func New(clientCtx client.Context, logger log.Logger, grpcSrv *grpc.Server) *Server {
|
|
// The default JSON marshaller used by the gRPC-Gateway is unable to marshal non-nullable non-scalar fields.
|
|
// Using the gogo/gateway package with the gRPC-Gateway WithMarshaler option fixes the scalar field marshaling issue.
|
|
marshalerOption := &gateway.JSONPb{
|
|
EmitDefaults: true,
|
|
Indent: "",
|
|
OrigName: true,
|
|
AnyResolver: clientCtx.InterfaceRegistry,
|
|
}
|
|
|
|
return &Server{
|
|
logger: logger,
|
|
Router: mux.NewRouter(),
|
|
ClientCtx: clientCtx,
|
|
GRPCGatewayRouter: runtime.NewServeMux(
|
|
// Custom marshaler option is required for gogo proto
|
|
runtime.WithMarshalerOption(runtime.MIMEWildcard, marshalerOption),
|
|
|
|
// This is necessary to get error details properly
|
|
// marshaled in unary requests.
|
|
runtime.WithProtoErrorHandler(runtime.DefaultHTTPProtoErrorHandler),
|
|
|
|
// Custom header matcher for mapping request headers to
|
|
// GRPC metadata
|
|
runtime.WithIncomingHeaderMatcher(CustomGRPCHeaderMatcher),
|
|
|
|
// extension to set custom response headers
|
|
runtime.WithForwardResponseOption(customGRPCResponseHeaders),
|
|
),
|
|
GRPCSrv: grpcSrv,
|
|
}
|
|
}
|
|
|
|
func customGRPCResponseHeaders(ctx context.Context, w http.ResponseWriter, _ proto.Message) error {
|
|
if meta, ok := runtime.ServerMetadataFromContext(ctx); ok {
|
|
if values := meta.HeaderMD.Get(grpctypes.GRPCBlockHeightHeader); len(values) == 1 {
|
|
w.Header().Set(grpctypes.GRPCBlockHeightHeader, values[0])
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Start starts the API server. Internally, the API server leverages CometBFT's
|
|
// JSON RPC server. Configuration options are provided via config.APIConfig
|
|
// and are delegated to the CometBFT JSON RPC server.
|
|
//
|
|
// Note, this creates a blocking process if the server is started successfully.
|
|
// Otherwise, an error is returned. The caller is expected to provide a Context
|
|
// that is properly canceled or closed to indicate the server should be stopped.
|
|
func (s *Server) Start(ctx context.Context, cfg config.Config) error {
|
|
s.mtx.Lock()
|
|
|
|
cmtCfg := cmtrpcserver.DefaultConfig()
|
|
cmtCfg.MaxOpenConnections = int(cfg.API.MaxOpenConnections)
|
|
cmtCfg.ReadTimeout = time.Duration(cfg.API.RPCReadTimeout) * time.Second
|
|
cmtCfg.WriteTimeout = time.Duration(cfg.API.RPCWriteTimeout) * time.Second
|
|
cmtCfg.MaxBodyBytes = int64(cfg.API.RPCMaxBodyBytes)
|
|
|
|
listener, err := cmtrpcserver.Listen(cfg.API.Address, cmtCfg.MaxOpenConnections)
|
|
if err != nil {
|
|
s.mtx.Unlock()
|
|
return err
|
|
}
|
|
|
|
s.listener = listener
|
|
s.mtx.Unlock()
|
|
|
|
// configure grpc-web server
|
|
if cfg.GRPC.Enable && cfg.GRPCWeb.Enable {
|
|
var options []grpcweb.Option
|
|
if cfg.API.EnableUnsafeCORS {
|
|
options = append(options,
|
|
grpcweb.WithOriginFunc(func(origin string) bool {
|
|
return true
|
|
}),
|
|
)
|
|
}
|
|
|
|
wrappedGrpc := grpcweb.WrapServer(s.GRPCSrv, options...)
|
|
s.Router.PathPrefix("/").Handler(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
|
if wrappedGrpc.IsGrpcWebRequest(req) {
|
|
wrappedGrpc.ServeHTTP(w, req)
|
|
return
|
|
}
|
|
|
|
// Fall back to the grpc gateway server.
|
|
s.GRPCGatewayRouter.ServeHTTP(w, req)
|
|
}))
|
|
}
|
|
|
|
// register grpc-gateway routes (after grpc-web server as the first match is used)
|
|
s.Router.PathPrefix("/").Handler(s.GRPCGatewayRouter)
|
|
|
|
errCh := make(chan error)
|
|
|
|
// Start the API in an external goroutine as Serve is blocking and will return
|
|
// an error upon failure, which we'll send on the error channel that will be
|
|
// consumed by the for block below.
|
|
go func(enableUnsafeCORS bool) {
|
|
s.logger.Info("starting API server...", "address", cfg.API.Address)
|
|
|
|
if enableUnsafeCORS {
|
|
allowAllCORS := handlers.CORS(handlers.AllowedHeaders([]string{"Content-Type"}))
|
|
errCh <- cmtrpcserver.Serve(s.listener, allowAllCORS(s.Router), cmtlogwrapper.CometLoggerWrapper{Logger: s.logger}, cmtCfg)
|
|
} else {
|
|
errCh <- cmtrpcserver.Serve(s.listener, s.Router, cmtlogwrapper.CometLoggerWrapper{Logger: s.logger}, cmtCfg)
|
|
}
|
|
}(cfg.API.EnableUnsafeCORS)
|
|
|
|
// Start a blocking select to wait for an indication to stop the server or that
|
|
// the server failed to start properly.
|
|
select {
|
|
case <-ctx.Done():
|
|
// The calling process canceled or closed the provided context, so we must
|
|
// gracefully stop the API server.
|
|
s.logger.Info("stopping API server...", "address", cfg.API.Address)
|
|
return s.Close()
|
|
|
|
case err := <-errCh:
|
|
s.logger.Error("failed to start API server", "err", err)
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Close closes the API server.
|
|
func (s *Server) Close() error {
|
|
s.mtx.Lock()
|
|
defer s.mtx.Unlock()
|
|
return s.listener.Close()
|
|
}
|
|
|
|
// Deprecated: Use OpenTelemetry instead, see the `telemetry` package for more details.
|
|
func (s *Server) SetTelemetry(m *telemetry.Metrics) {
|
|
s.mtx.Lock()
|
|
s.registerMetrics(m)
|
|
s.mtx.Unlock()
|
|
}
|
|
|
|
//nolint:staticcheck // TODO: switch to OpenTelemetry
|
|
func (s *Server) registerMetrics(m *telemetry.Metrics) {
|
|
s.metrics = m
|
|
|
|
metricsHandler := func(w http.ResponseWriter, r *http.Request) {
|
|
format := strings.TrimSpace(r.FormValue("format"))
|
|
|
|
gr, err := s.metrics.Gather(format)
|
|
if err != nil {
|
|
writeErrorResponse(w, http.StatusBadRequest, fmt.Sprintf("failed to gather metrics: %s", err))
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", gr.ContentType)
|
|
_, _ = w.Write(gr.Metrics)
|
|
}
|
|
|
|
s.Router.HandleFunc("/metrics", metricsHandler).Methods("GET")
|
|
}
|
|
|
|
// errorResponse is the JSON body sent back when a request fails. Code is
// tagged omitempty, while Error is tagged as the always-present "error" field
// carrying the message.
type errorResponse struct {
	Code  int    `json:"code,omitempty"`
	Error string `json:"error"`
}
|
|
|
|
// newErrorResponse creates a new errorResponse instance.
|
|
func newErrorResponse(code int, err string) errorResponse {
|
|
return errorResponse{Code: code, Error: err}
|
|
}
|
|
|
|
// writeErrorResponse prepares and writes an HTTP error
|
|
// given a status code and an error message.
|
|
func writeErrorResponse(w http.ResponseWriter, status int, err string) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(status)
|
|
_, _ = w.Write(legacy.Cdc.MustMarshalJSON(newErrorResponse(0, err)))
|
|
}
|