package sealworker import ( "context" "net/http" "sync/atomic" "github.com/google/uuid" "github.com/gorilla/mux" "github.com/mitchellh/go-homedir" "golang.org/x/xerrors" "github.com/filecoin-project/go-jsonrpc" "github.com/filecoin-project/go-jsonrpc/auth" "github.com/filecoin-project/lotus/api" apitypes "github.com/filecoin-project/lotus/api/types" "github.com/filecoin-project/lotus/build" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" "github.com/filecoin-project/lotus/extern/sector-storage/stores" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" "github.com/filecoin-project/lotus/lib/rpcenc" "github.com/filecoin-project/lotus/metrics/proxy" ) func WorkerHandler(authv func(ctx context.Context, token string) ([]auth.Permission, error), remote http.HandlerFunc, a api.Worker, permissioned bool) http.Handler { mux := mux.NewRouter() readerHandler, readerServerOpt := rpcenc.ReaderParamDecoder() rpcServer := jsonrpc.NewServer(readerServerOpt) wapi := proxy.MetricedWorkerAPI(a) if permissioned { wapi = api.PermissionedWorkerAPI(wapi) } rpcServer.Register("Filecoin", wapi) rpcServer.AliasMethod("rpc.discover", "Filecoin.Discover") mux.Handle("/rpc/v0", rpcServer) mux.Handle("/rpc/streams/v0/push/{uuid}", readerHandler) mux.PathPrefix("/remote").HandlerFunc(remote) mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof if !permissioned { return mux } ah := &auth.Handler{ Verify: authv, Next: mux.ServeHTTP, } return ah } type Worker struct { *sectorstorage.LocalWorker LocalStore *stores.Local Storage stores.LocalStorage disabled int64 } func (w *Worker) Version(context.Context) (api.Version, error) { return api.WorkerAPIVersion0, nil } func (w *Worker) StorageAddLocal(ctx context.Context, path string) error { path, err := homedir.Expand(path) if err != nil { return xerrors.Errorf("expanding local path: %w", err) } if err := w.LocalStore.OpenPath(ctx, path); err != nil { return xerrors.Errorf("opening local path: %w", err) } if err := w.Storage.SetStorage(func(sc *stores.StorageConfig) { sc.StoragePaths = append(sc.StoragePaths, stores.LocalPath{Path: path}) }); err != nil { return xerrors.Errorf("get storage config: %w", err) } return nil } func (w *Worker) SetEnabled(ctx context.Context, enabled bool) error { disabled := int64(1) if enabled { disabled = 0 } atomic.StoreInt64(&w.disabled, disabled) return nil } func (w *Worker) Enabled(ctx context.Context) (bool, error) { return atomic.LoadInt64(&w.disabled) == 0, nil } func (w *Worker) WaitQuiet(ctx context.Context) error { w.LocalWorker.WaitQuiet() // uses WaitGroup under the hood so no ctx :/ return nil } func (w *Worker) ProcessSession(ctx context.Context) (uuid.UUID, error) { return w.LocalWorker.Session(ctx) } func (w *Worker) Session(ctx context.Context) (uuid.UUID, error) { if atomic.LoadInt64(&w.disabled) == 1 { return uuid.UUID{}, xerrors.Errorf("worker disabled") } return w.LocalWorker.Session(ctx) } func (w *Worker) Discover(ctx context.Context) (apitypes.OpenRPCDocument, error) { return build.OpenRPCDiscoverJSON_Worker(), nil } var _ storiface.WorkerCalls = &Worker{}