lotus/cmd/lotus-provider/rpc/rpc.go

158 lines
4.6 KiB
Go
Raw Normal View History

// Package rpc exposes the lotus-provider API over JSON-RPC and wires up the HTTP handlers that serve it.
2023-12-03 06:40:01 +00:00
package rpc
import (
"context"
"encoding/base64"
"encoding/json"
"net"
2023-12-03 06:40:01 +00:00
"net/http"
2024-01-20 14:52:38 +00:00
"net/url"
"time"
2023-12-03 06:40:01 +00:00
"github.com/gbrlsnchs/jwt/v3"
2023-12-03 06:40:01 +00:00
"github.com/gorilla/mux"
logging "github.com/ipfs/go-log/v2"
"go.opencensus.io/tag"
2023-12-12 05:16:57 +00:00
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
2023-12-03 06:40:01 +00:00
2024-01-20 14:52:38 +00:00
"github.com/filecoin-project/go-address"
2023-12-03 06:40:01 +00:00
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/cmd/lotus-provider/deps"
2023-12-03 06:40:01 +00:00
"github.com/filecoin-project/lotus/lib/rpcenc"
"github.com/filecoin-project/lotus/metrics"
2023-12-03 06:40:01 +00:00
"github.com/filecoin-project/lotus/metrics/proxy"
2024-01-20 14:52:38 +00:00
"github.com/filecoin-project/lotus/provider/lpmarket"
2024-01-13 12:47:22 +00:00
"github.com/filecoin-project/lotus/provider/lpweb"
"github.com/filecoin-project/lotus/storage/paths"
2023-12-03 06:40:01 +00:00
)
// log is the package-level logger, created under the "lp/rpc" name.
var log = logging.Logger("lp/rpc")
2023-12-03 06:40:01 +00:00
func LotusProviderHandler(
authv func(ctx context.Context, token string) ([]auth.Permission, error),
remote http.HandlerFunc,
a api.LotusProvider,
permissioned bool) http.Handler {
mux := mux.NewRouter()
readerHandler, readerServerOpt := rpcenc.ReaderParamDecoder()
rpcServer := jsonrpc.NewServer(jsonrpc.WithServerErrors(api.RPCErrors), readerServerOpt)
wapi := proxy.MetricedAPI[api.LotusProvider, api.LotusProviderStruct](a)
2023-12-03 06:40:01 +00:00
if permissioned {
wapi = api.PermissionedAPI[api.LotusProvider, api.LotusProviderStruct](wapi)
2023-12-03 06:40:01 +00:00
}
rpcServer.Register("Filecoin", wapi)
rpcServer.AliasMethod("rpc.discover", "Filecoin.Discover")
mux.Handle("/rpc/v0", rpcServer)
mux.Handle("/rpc/streams/v0/push/{uuid}", readerHandler)
mux.PathPrefix("/remote").HandlerFunc(remote)
mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof
if !permissioned {
return mux
}
ah := &auth.Handler{
Verify: authv,
Next: mux.ServeHTTP,
}
return ah
}
// ProviderAPI is the receiver for the RPC methods defined in this file. It
// embeds the node's shared dependencies and carries the channel that
// Shutdown closes.
type ProviderAPI struct {
// Deps is embedded so methods can reach the shared dependencies (e.g. DB, Full) directly.
*deps.Deps
// ShutdownChan is closed by Shutdown to signal that a shutdown was requested.
ShutdownChan chan struct{}
}
// Version reports the provider API version. It always returns
// api.ProviderAPIVersion0 and a nil error; the context is unused.
func (p *ProviderAPI) Version(_ context.Context) (api.Version, error) {
	return api.ProviderAPIVersion0, nil
}
2024-01-20 14:52:38 +00:00
// AllocatePieceToSector forwards the request to a piece ingester built from
// this node's DB and Full dependencies, and returns whatever the ingester
// returns. A new ingester is constructed on every call.
func (p *ProviderAPI) AllocatePieceToSector(ctx context.Context, maddr address.Address, piece api.PieceDealInfo, rawSize int64, source url.URL, header http.Header) (api.SectorOffset, error) {
	ingester := lpmarket.NewPieceIngester(p.Deps.DB, p.Deps.Full)
	return ingester.AllocatePieceToSector(ctx, maddr, piece, rawSize, source, header)
}
// Shutdown triggers a shutdown by closing ShutdownChan. The context is unused
// and the returned error is always nil.
//
// NOTE(review): the channel is closed unconditionally, so a second call (or a
// nil ShutdownChan) makes close panic — confirm callers invoke this at most once.
func (p *ProviderAPI) Shutdown(_ context.Context) error {
	close(p.ShutdownChan)
	return nil
}
func ListenAndServe(ctx context.Context, dependencies *deps.Deps, shutdownChan chan struct{}) error {
fh := &paths.FetchHandler{Local: dependencies.LocalStore, PfHandler: &paths.DefaultPartialFileHandler{}}
remoteHandler := func(w http.ResponseWriter, r *http.Request) {
if !auth.HasPerm(r.Context(), nil, api.PermAdmin) {
w.WriteHeader(401)
_ = json.NewEncoder(w).Encode(struct{ Error string }{"unauthorized: missing admin permission"})
return
}
fh.ServeHTTP(w, r)
}
var authVerify func(context.Context, string) ([]auth.Permission, error)
{
privateKey, err := base64.StdEncoding.DecodeString(dependencies.Cfg.Apis.StorageRPCSecret)
if err != nil {
return xerrors.Errorf("decoding storage rpc secret: %w", err)
}
authVerify = func(ctx context.Context, token string) ([]auth.Permission, error) {
var payload deps.JwtPayload
if _, err := jwt.Verify([]byte(token), jwt.NewHS256(privateKey), &payload); err != nil {
return nil, xerrors.Errorf("JWT Verification failed: %w", err)
}
return payload.Allow, nil
}
}
// Serve the RPC.
srv := &http.Server{
Handler: LotusProviderHandler(
authVerify,
remoteHandler,
&ProviderAPI{dependencies, shutdownChan},
true),
ReadHeaderTimeout: time.Minute * 3,
BaseContext: func(listener net.Listener) context.Context {
ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-worker"))
return ctx
},
2023-12-12 00:30:39 +00:00
Addr: dependencies.ListenAddr,
}
2023-12-12 05:16:57 +00:00
log.Infof("Setting up RPC server at %s", dependencies.ListenAddr)
eg := errgroup.Group{}
eg.Go(srv.ListenAndServe)
2023-12-12 05:16:57 +00:00
if dependencies.Cfg.Subsystems.EnableWebGui {
2024-01-13 12:47:22 +00:00
web, err := lpweb.GetSrv(ctx, dependencies)
if err != nil {
return err
2023-12-12 05:16:57 +00:00
}
go func() {
<-ctx.Done()
log.Warn("Shutting down...")
if err := srv.Shutdown(context.TODO()); err != nil {
log.Errorf("shutting down RPC server failed: %s", err)
}
if err := web.Shutdown(context.Background()); err != nil {
log.Errorf("shutting down web server failed: %s", err)
}
log.Warn("Graceful shutdown successful")
}()
log.Infof("Setting up web server at %s", dependencies.Cfg.Subsystems.GuiAddress)
eg.Go(web.ListenAndServe)
}
2023-12-12 05:16:57 +00:00
return eg.Wait()
}