Merge pull request #11473 from filecoin-project/fixLpStop

fix: lp api handler for stop
This commit is contained in:
Łukasz Magiera 2023-12-03 10:09:32 +01:00 committed by GitHub
commit 839d00558a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 187 additions and 24 deletions

10
api/api_lp.go Normal file
View File

@ -0,0 +1,10 @@
package api
import "context"
type LotusProvider interface {
Version(context.Context) (Version, error) //perm:admin
// Trigger shutdown
Shutdown(context.Context) error //perm:admin
}

View File

@ -15,6 +15,16 @@ import (
"github.com/filecoin-project/lotus/lib/rpcenc"
)
// NewProviderRpc creates a new http jsonrpc client.
func NewProviderRpc(ctx context.Context, addr string, requestHeader http.Header) (api.LotusProvider, jsonrpc.ClientCloser, error) {
var res v1api.LotusProviderStruct
closer, err := jsonrpc.NewMergeClient(ctx, addr, "Filecoin",
api.GetInternalStructs(&res), requestHeader, jsonrpc.WithErrors(api.RPCErrors))
return &res, closer, err
}
// NewCommonRPCV0 creates a new http jsonrpc client.
func NewCommonRPCV0(ctx context.Context, addr string, requestHeader http.Header) (api.CommonNet, jsonrpc.ClientCloser, error) {
var res v0api.CommonNetStruct

View File

@ -41,6 +41,12 @@ func PermissionedWorkerAPI(a Worker) Worker {
return &out
}
func PermissionedAPI[T any](a T) T {
var out T
permissionedProxies(a, &out)
return out
}
func PermissionedWalletAPI(a Wallet) Wallet {
var out WalletStruct
permissionedProxies(a, &out)

View File

@ -831,6 +831,19 @@ type GatewayMethods struct {
type GatewayStub struct {
}
type LotusProviderStruct struct {
Internal LotusProviderMethods
}
type LotusProviderMethods struct {
Shutdown func(p0 context.Context) error `perm:"admin"`
Version func(p0 context.Context) (Version, error) `perm:"admin"`
}
type LotusProviderStub struct {
}
type NetStruct struct {
Internal NetMethods
}
@ -5214,6 +5227,28 @@ func (s *GatewayStub) Web3ClientVersion(p0 context.Context) (string, error) {
return "", ErrNotSupported
}
func (s *LotusProviderStruct) Shutdown(p0 context.Context) error {
if s.Internal.Shutdown == nil {
return ErrNotSupported
}
return s.Internal.Shutdown(p0)
}
func (s *LotusProviderStub) Shutdown(p0 context.Context) error {
return ErrNotSupported
}
func (s *LotusProviderStruct) Version(p0 context.Context) (Version, error) {
if s.Internal.Version == nil {
return *new(Version), ErrNotSupported
}
return s.Internal.Version(p0)
}
func (s *LotusProviderStub) Version(p0 context.Context) (Version, error) {
return *new(Version), ErrNotSupported
}
func (s *NetStruct) ID(p0 context.Context) (peer.ID, error) {
if s.Internal.ID == nil {
return *new(peer.ID), ErrNotSupported
@ -7442,6 +7477,7 @@ var _ CommonNet = new(CommonNetStruct)
var _ EthSubscriber = new(EthSubscriberStruct)
var _ FullNode = new(FullNodeStruct)
var _ Gateway = new(GatewayStruct)
var _ LotusProvider = new(LotusProviderStruct)
var _ Net = new(NetStruct)
var _ Signable = new(SignableStruct)
var _ StorageMiner = new(StorageMinerStruct)

View File

@ -12,3 +12,5 @@ type RawFullNodeAPI FullNode
func PermissionedFullAPI(a FullNode) FullNode {
return api.PermissionedFullAPI(a)
}
type LotusProviderStruct = api.LotusProviderStruct

View File

@ -59,6 +59,8 @@ var (
MinerAPIVersion0 = newVer(1, 5, 0)
WorkerAPIVersion0 = newVer(1, 7, 0)
ProviderAPIVersion0 = newVer(1, 0, 0)
)
//nolint:varcheck,deadcode

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,51 @@
package rpc
import (
"context"
"net/http"
"github.com/gorilla/mux"
// logging "github.com/ipfs/go-log/v2"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/lib/rpcenc"
"github.com/filecoin-project/lotus/metrics/proxy"
)
//var log = logging.Logger("lp/rpc")
func LotusProviderHandler(
authv func(ctx context.Context, token string) ([]auth.Permission, error),
remote http.HandlerFunc,
a api.LotusProvider,
permissioned bool) http.Handler {
mux := mux.NewRouter()
readerHandler, readerServerOpt := rpcenc.ReaderParamDecoder()
rpcServer := jsonrpc.NewServer(jsonrpc.WithServerErrors(api.RPCErrors), readerServerOpt)
wapi := proxy.MetricedAPI(a)
if permissioned {
wapi = api.PermissionedAPI(wapi)
}
rpcServer.Register("Filecoin", wapi)
rpcServer.AliasMethod("rpc.discover", "Filecoin.Discover")
mux.Handle("/rpc/v0", rpcServer)
mux.Handle("/rpc/streams/v0/push/{uuid}", readerHandler)
mux.PathPrefix("/remote").HandlerFunc(remote)
mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof
if !permissioned {
return mux
}
ah := &auth.Handler{
Verify: authv,
Next: mux.ServeHTTP,
}
return ah
}

View File

@ -30,6 +30,7 @@ import (
"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
cliutil "github.com/filecoin-project/lotus/cli/util"
"github.com/filecoin-project/lotus/cmd/lotus-provider/rpc"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/journal/alerting"
"github.com/filecoin-project/lotus/journal/fsjournal"
@ -118,11 +119,14 @@ var runCmd = &cli.Command{
tag.Insert(metrics.NodeType, "provider"),
)
shutdownChan := make(chan struct{})
ctx, ctxclose := context.WithCancel(ctx)
go func() {
<-shutdownChan
ctxclose()
}()
{
var ctxclose func()
ctx, ctxclose = context.WithCancel(ctx)
go func() {
<-shutdownChan
ctxclose()
}()
}
// Register all metric views
/*
if err := view.Register(
@ -206,26 +210,43 @@ var runCmd = &cli.Command{
}
// Serve the RPC.
/*
srv := &http.Server{
Handler: sealworker.WorkerHandler(nodeApi.AuthVerify, remoteHandler, workerApi, true),
ReadHeaderTimeout: timeout,
BaseContext: func(listener net.Listener) context.Context {
ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-worker"))
return ctx
},
var authVerify func(context.Context, string) ([]auth.Permission, error)
{
privateKey, err := base64.RawStdEncoding.DecodeString(deps.cfg.Apis.StorageRPCSecret)
if err != nil {
return err
}
go func() {
<-ctx.Done()
log.Warn("Shutting down...")
if err := srv.Shutdown(context.TODO()); err != nil {
log.Errorf("shutting down RPC server failed: %s", err)
authVerify = func(ctx context.Context, token string) ([]auth.Permission, error) {
var payload jwtPayload
if _, err := jwt.Verify([]byte(token), jwt.NewHS256(privateKey), &payload); err != nil {
return nil, xerrors.Errorf("JWT Verification failed: %w", err)
}
log.Warn("Graceful shutdown successful")
}()
*/
return payload.Allow, nil
}
}
// Serve the RPC.
srv := &http.Server{
Handler: rpc.LotusProviderHandler(
authVerify,
remoteHandler,
&ProviderAPI{deps, shutdownChan},
true),
ReadHeaderTimeout: time.Minute * 3,
BaseContext: func(listener net.Listener) context.Context {
ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-worker"))
return ctx
},
}
go func() {
<-ctx.Done()
log.Warn("Shutting down...")
if err := srv.Shutdown(context.TODO()); err != nil {
log.Errorf("shutting down RPC server failed: %s", err)
}
log.Warn("Graceful shutdown successful")
}()
// Monitor for shutdown.
// TODO provide a graceful shutdown API on shutdownChan
@ -421,3 +442,18 @@ Get it with: jq .PrivateKey ~/.lotus-miner/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU`,
}, nil
}
type ProviderAPI struct {
*Deps
ShutdownChan chan struct{}
}
func (p *ProviderAPI) Version(context.Context) (api.Version, error) {
return api.Version(api.ProviderAPIVersion0), nil
}
// Trigger shutdown
func (p *ProviderAPI) Shutdown(context.Context) error {
close(p.ShutdownChan)
return nil
}

View File

@ -26,7 +26,11 @@ import (
var log = logging.Logger("sealworker")
func WorkerHandler(authv func(ctx context.Context, token string) ([]auth.Permission, error), remote http.HandlerFunc, a api.Worker, permissioned bool) http.Handler {
func WorkerHandler(
authv func(ctx context.Context, token string) ([]auth.Permission, error),
remote http.HandlerFunc,
a api.Worker,
permissioned bool) http.Handler {
mux := mux.NewRouter()
readerHandler, readerServerOpt := rpcenc.ReaderParamDecoder()
rpcServer := jsonrpc.NewServer(jsonrpc.WithServerErrors(api.RPCErrors), readerServerOpt)

View File

@ -10,6 +10,12 @@ import (
"github.com/filecoin-project/lotus/metrics"
)
func MetricedAPI[T any](a T) T {
var out T
proxy(a, &out)
return out
}
func MetricedStorMinerAPI(a api.StorageMiner) api.StorageMiner {
var out api.StorageMinerStruct
proxy(a, &out)