refactor gateway rpc.

This commit is contained in:
Raúl Kripalani 2021-05-23 18:37:53 +01:00
parent 2337248aa5
commit 188688c9ce
2 changed files with 93 additions and 75 deletions

View File

@ -3,29 +3,25 @@ package main
import ( import (
"context" "context"
"net" "net"
"net/http"
"os" "os"
"contrib.go.opencensus.io/exporter/prometheus" "github.com/urfave/cli/v2"
"github.com/filecoin-project/go-jsonrpc" "go.opencensus.io/stats/view"
"github.com/filecoin-project/go-state-types/abi" "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/gateway"
promclient "github.com/prometheus/client_golang/prometheus"
"go.opencensus.io/tag"
lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/v0api"
"github.com/filecoin-project/lotus/api/v1api"
"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/lib/lotuslog"
"github.com/filecoin-project/lotus/metrics"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
"go.opencensus.io/stats/view"
"github.com/gorilla/mux" "github.com/filecoin-project/go-jsonrpc"
"github.com/urfave/cli/v2" "github.com/filecoin-project/go-state-types/abi"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/gateway"
"github.com/filecoin-project/lotus/lib/lotuslog"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node"
) )
var log = logging.Logger("gateway") var log = logging.Logger("gateway")
@ -103,70 +99,44 @@ var runCmd = &cli.Command{
} }
defer closer() defer closer()
address := cctx.String("listen") var (
mux := mux.NewRouter() lookbackCap = cctx.Duration("api-max-lookback")
address = cctx.String("listen")
waitLookback = abi.ChainEpoch(cctx.Int64("api-wait-lookback-limit"))
)
log.Info("Setting up API endpoint at " + address) serverOptions := make([]jsonrpc.ServerOption, 0)
if maxRequestSize := cctx.Int("api-max-req-size"); maxRequestSize != 0 {
serveRpc := func(path string, hnd interface{}) { serverOptions = append(serverOptions, jsonrpc.WithMaxRequestSize(int64(maxRequestSize)))
serverOptions := make([]jsonrpc.ServerOption, 0)
if maxRequestSize := cctx.Int("api-max-req-size"); maxRequestSize != 0 {
serverOptions = append(serverOptions, jsonrpc.WithMaxRequestSize(int64(maxRequestSize)))
}
rpcServer := jsonrpc.NewServer(serverOptions...)
rpcServer.Register("Filecoin", hnd)
mux.Handle(path, rpcServer)
} }
lookbackCap := cctx.Duration("api-max-lookback") log.Info("setting up API endpoint at " + address)
waitLookback := abi.ChainEpoch(cctx.Int64("api-wait-lookback-limit")) addr, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
return xerrors.Errorf("failed to resolve endpoint address: %w", err)
}
ma := metrics.MetricedGatewayAPI(gateway.NewNode(api, lookbackCap, waitLookback)) maddr, err := manet.FromNetAddr(addr)
if err != nil {
return xerrors.Errorf("failed to convert endpoint address to multiaddr: %w", err)
}
serveRpc("/rpc/v1", ma) gwapi := gateway.NewNode(api, lookbackCap, waitLookback)
serveRpc("/rpc/v0", lapi.Wrap(new(v1api.FullNodeStruct), new(v0api.WrapperV1Full), ma)) h, err := gateway.Handler(gwapi, serverOptions...)
if err != nil {
return xerrors.Errorf("failed to set up gateway HTTP handler")
}
registry := promclient.DefaultRegisterer.(*promclient.Registry) stopFunc, err := node.ServeRPC(h, "lotus-gateway", maddr)
exporter, err := prometheus.NewExporter(prometheus.Options{ if err != nil {
Registry: registry, return xerrors.Errorf("failed to serve rpc endpoint: %w", err)
Namespace: "lotus_gw", }
<-node.MonitorShutdown(nil, node.ShutdownHandler{
Component: "rpc",
StopFunc: stopFunc,
}) })
if err != nil { return nil
return err
}
mux.Handle("/debug/metrics", exporter)
mux.PathPrefix("/").Handler(http.DefaultServeMux)
/*ah := &auth.Handler{
Verify: nodeApi.AuthVerify,
Next: mux.ServeHTTP,
}*/
srv := &http.Server{
Handler: mux,
BaseContext: func(listener net.Listener) context.Context {
ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-gateway"))
return ctx
},
}
go func() {
<-ctx.Done()
log.Warn("Shutting down...")
if err := srv.Shutdown(context.TODO()); err != nil {
log.Errorf("shutting down RPC server failed: %s", err)
}
log.Warn("Graceful shutdown successful")
}()
nl, err := net.Listen("tcp", address)
if err != nil {
return err
}
return srv.Serve(nl)
}, },
} }

48
gateway/handler.go Normal file
View File

@ -0,0 +1,48 @@
package gateway
import (
"net/http"
"contrib.go.opencensus.io/exporter/prometheus"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/v0api"
"github.com/filecoin-project/lotus/api/v1api"
"github.com/filecoin-project/lotus/metrics"
"github.com/gorilla/mux"
promclient "github.com/prometheus/client_golang/prometheus"
)
// Handler returns a gateway http.Handler, to be mounted as-is on the server.
func Handler(a api.Gateway, opts ...jsonrpc.ServerOption) (http.Handler, error) {
m := mux.NewRouter()
serveRpc := func(path string, hnd interface{}) {
rpcServer := jsonrpc.NewServer(opts...)
rpcServer.Register("Filecoin", hnd)
m.Handle(path, rpcServer)
}
ma := metrics.MetricedGatewayAPI(a)
serveRpc("/rpc/v1", ma)
serveRpc("/rpc/v0", api.Wrap(new(v1api.FullNodeStruct), new(v0api.WrapperV1Full), ma))
registry := promclient.DefaultRegisterer.(*promclient.Registry)
exporter, err := prometheus.NewExporter(prometheus.Options{
Registry: registry,
Namespace: "lotus_gw",
})
if err != nil {
return nil, err
}
m.Handle("/debug/metrics", exporter)
m.PathPrefix("/").Handler(http.DefaultServeMux)
/*ah := &auth.Handler{
Verify: nodeApi.AuthVerify,
Next: mux.ServeHTTP,
}*/
return m, nil
}