diff --git a/api/api_storage.go b/api/api_storage.go index be912401b..e4d5d8a25 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -112,7 +112,7 @@ type StorageMiner interface { WorkerConnect(context.Context, string) error WorkerAttachStorage(context.Context, StorageInfo) error WorkerDeclareSector(ctx context.Context, storageId string, s abi.SectorID) error - WorkerFindSector(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]StorageInfo, error) + FindSector(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]StorageInfo, error) MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error MarketListDeals(ctx context.Context) ([]storagemarket.StorageDeal, error) diff --git a/api/apistruct/permissioned.go b/api/apistruct/permissioned.go index 4c29f6688..5e513859a 100644 --- a/api/apistruct/permissioned.go +++ b/api/apistruct/permissioned.go @@ -43,6 +43,12 @@ func PermissionedFullAPI(a api.FullNode) api.FullNode { return &out } +func PermissionedWorkerAPI(a api.WorkerApi) api.WorkerApi { + var out WorkerStruct + permissionedAny(a, &out.Internal) + return &out +} + func HasPerm(ctx context.Context, perm api.Permission) bool { callerPerms, ok := ctx.Value(permCtxKey).([]api.Permission) if !ok { diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index e978daa29..a6b2019b6 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -183,7 +183,7 @@ type StorageMinerStruct struct { WorkerConnect func(context.Context, string) error `perm:"admin"` // TODO: worker perm WorkerAttachStorage func(context.Context, api.StorageInfo) error `perm:"admin"` WorkerDeclareSector func(ctx context.Context, storageId string, s abi.SectorID) error `perm:"admin"` - WorkerFindSector func(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]api.StorageInfo, error) `perm:"admin"` + FindSector func(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]api.StorageInfo, error) `perm:"admin"` DealsImportData func(ctx context.Context, dealPropCid cid.Cid, file string) error `perm:"write"` DealsList func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"` @@ -659,8 +659,8 @@ func (c *StorageMinerStruct) WorkerDeclareSector(ctx context.Context, storageId return c.Internal.WorkerDeclareSector(ctx, storageId, s) } -func (c *StorageMinerStruct) WorkerFindSector(ctx context.Context, si abi.SectorID, types sectorbuilder.SectorFileType) ([]api.StorageInfo, error) { - return c.Internal.WorkerFindSector(ctx, si, types) +func (c *StorageMinerStruct) FindSector(ctx context.Context, si abi.SectorID, types sectorbuilder.SectorFileType) ([]api.StorageInfo, error) { + return c.Internal.FindSector(ctx, si, types) } func (c *StorageMinerStruct) MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error { diff --git a/cmd/lotus-seal-worker/main.go b/cmd/lotus-seal-worker/main.go index 0ce5f732e..c3e867da3 100644 --- a/cmd/lotus-seal-worker/main.go +++ b/cmd/lotus-seal-worker/main.go @@ -1,23 +1,36 @@ package main import ( + "context" + "net/http" "os" + "os/signal" + "syscall" + "github.com/gorilla/mux" logging "github.com/ipfs/go-log/v2" "golang.org/x/xerrors" "gopkg.in/urfave/cli.v2" paramfetch "github.com/filecoin-project/go-paramfetch" + manet "github.com/multiformats/go-multiaddr-net" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/api/apistruct" "github.com/filecoin-project/lotus/build" lcli "github.com/filecoin-project/lotus/cli" + "github.com/filecoin-project/lotus/lib/auth" + "github.com/filecoin-project/lotus/lib/jsonrpc" "github.com/filecoin-project/lotus/lib/lotuslog" "github.com/filecoin-project/lotus/node/repo" + "github.com/filecoin-project/lotus/storage/sealmgr/advmgr" + "github.com/filecoin-project/lotus/storage/sealmgr/stores" ) var log = logging.Logger("main") +const FlagStorageRepo = "workerrepo" + const ( workers = 1 // TODO: Configurability transfers = 1 @@ -38,7 +51,7 @@ func main() { Version: build.UserVersion, Flags: []cli.Flag{ &cli.StringFlag{ - Name: "repo", + Name: "workerrepo", EnvVars: []string{"WORKER_PATH"}, Value: "~/.lotusworker", // TODO: Consider XDG_DATA_HOME }, @@ -52,12 +65,6 @@ func main() { Usage: "enable use of GPU for mining operations", Value: true, }, - &cli.BoolFlag{ - Name: "no-precommit", - }, - &cli.BoolFlag{ - Name: "no-commit", - }, }, Commands: local, @@ -71,11 +78,6 @@ func main() { } } -type limits struct { - workLimit chan struct{} - transferLimit chan struct{} -} - var runCmd = &cli.Command{ Name: "run", Usage: "Start lotus worker", @@ -98,6 +100,7 @@ var runCmd = &cli.Command{ if v.APIVersion != build.APIVersion { return xerrors.Errorf("lotus-storage-miner API version doesn't match: local: ", api.Version{APIVersion: build.APIVersion}) } + log.Infof("Remote version %s", v) go func() { <-ctx.Done() @@ -117,7 +120,83 @@ var runCmd = &cli.Command{ return xerrors.Errorf("get params: %w", err) } + repoPath := cctx.String(FlagStorageRepo) + r, err := repo.NewFS(repoPath) + if err != nil { + return err + } - return nil + ok, err := r.Exists() + if err != nil { + return err + } + if !ok { + return xerrors.Errorf("repo at '%s' is not initialized, run 'lotus-seal-worker init' to set it up", repoPath) + } + + lr, err := r.Lock(repo.Worker) + if err != nil { + return err + } + + localStore, err := stores.NewLocal(lr) + if err != nil { + return err + } + + endpoint, err := r.APIEndpoint() + if err != nil { + return err + } + + lst, err := manet.Listen(endpoint) + if err != nil { + return xerrors.Errorf("could not listen: %w", err) + } + + _, spt, err := api.ProofTypeFromSectorSize(ssize) + if err != nil { + return xerrors.Errorf("getting proof type: %w", err) + } + + sminfo, err := lcli.GetAPIInfo(cctx, repo.StorageMiner) + if err != nil { + return xerrors.Errorf("could not get api info: %w", err) + } + + remote := stores.NewRemote(localStore, nodeApi, sminfo.AuthHeader()) + + workerApi := &worker{ + LocalWorker: advmgr.NewLocalWorker(act, spt, remote, localStore), + } + + mux := mux.NewRouter() + + rpcServer := jsonrpc.NewServer() + rpcServer.Register("Filecoin", apistruct.PermissionedWorkerAPI(workerApi)) + + mux.Handle("/rpc/v0", rpcServer) + mux.PathPrefix("/remote").HandlerFunc((&stores.FetchHandler{Store: localStore}).ServeHTTP) + mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof + + ah := &auth.Handler{ + Verify: nodeApi.AuthVerify, + Next: mux.ServeHTTP, + } + + srv := &http.Server{Handler: ah} + + sigChan := make(chan os.Signal, 2) + go func() { + <-sigChan + log.Warn("Shutting down..") + if err := srv.Shutdown(context.TODO()); err != nil { + log.Errorf("shutting down RPC server failed: %s", err) + } + log.Warn("Graceful shutdown successful") + }() + signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT) + + return srv.Serve(manet.NetListener(lst)) }, } diff --git a/cmd/lotus-seal-worker/rpc.go b/cmd/lotus-seal-worker/rpc.go index 80b98155b..9de4d1079 100644 --- a/cmd/lotus-seal-worker/rpc.go +++ b/cmd/lotus-seal-worker/rpc.go @@ -1,6 +1,9 @@ package main import ( + "context" + + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/storage/sealmgr/advmgr" "github.com/filecoin-project/specs-storage/storage" ) @@ -9,4 +12,8 @@ type worker struct { // TODO: use advmgr.LocalWorker here *advmgr.LocalWorker } +func (w *worker) Version(context.Context) (build.Version, error) { + return build.APIVersion, nil +} + var _ storage.Sealer = &worker{} diff --git a/node/impl/storminer.go b/node/impl/storminer.go index a2d3dcb9a..9bbc58f1e 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -159,7 +159,7 @@ func (sm *StorageMinerAPI) WorkerDeclareSector(ctx context.Context, storageId st panic("implement me") } -func (sm *StorageMinerAPI) WorkerFindSector(ctx context.Context, si abi.SectorID, types sectorbuilder.SectorFileType) ([]api.StorageInfo, error) { +func (sm *StorageMinerAPI) FindSector(ctx context.Context, si abi.SectorID, types sectorbuilder.SectorFileType) ([]api.StorageInfo, error) { panic("implement me") } diff --git a/node/repo/fsrepo.go b/node/repo/fsrepo.go index 12f9540d8..926541359 100644 --- a/node/repo/fsrepo.go +++ b/node/repo/fsrepo.go @@ -40,6 +40,7 @@ const ( _ = iota // Default is invalid FullNode RepoType = iota StorageMiner + Worker ) func defConfForType(t RepoType) interface{} { @@ -48,6 +49,8 @@ func defConfForType(t RepoType) interface{} { return config.DefaultFullNode() case StorageMiner: return config.DefaultStorageMiner() + case Worker: + return &struct {}{} default: panic(fmt.Sprintf("unknown RepoType(%d)", int(t))) } diff --git a/storage/sealmgr/advmgr/local.go b/storage/sealmgr/advmgr/local.go index f0b46bf25..1b5d9859a 100644 --- a/storage/sealmgr/advmgr/local.go +++ b/storage/sealmgr/advmgr/local.go @@ -20,7 +20,24 @@ import ( type LocalWorker struct { scfg *sectorbuilder.Config - storage *stores.Local + storage stores.Store + localStore *stores.Local +} + +func NewLocalWorker(ma address.Address, spt abi.RegisteredProof, store stores.Store, local *stores.Local) *LocalWorker { + ppt, err := spt.RegisteredPoStProof() + if err != nil { + panic(err) + } + return &LocalWorker{ + scfg: §orbuilder.Config{ + SealProofType: spt, + PoStProofType: ppt, + Miner: ma, + }, + storage: store, + localStore: local, + } } type localWorkerPathProvider struct { @@ -33,7 +50,7 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, id abi.Sect return sectorbuilder.SectorPaths{}, nil, xerrors.Errorf("get miner ID: %w", err) } - return l.w.storage.AcquireSector(abi.SectorID{ + return l.w.storage.AcquireSector(ctx, abi.SectorID{ Miner: abi.ActorID(mid), Number: id, }, existing, allocate, sealing) @@ -107,7 +124,7 @@ func (l *LocalWorker) TaskTypes(context.Context) (map[sealmgr.TaskType]struct{}, } func (l *LocalWorker) Paths(context.Context) ([]api.StoragePath, error) { - return l.storage.Local(), nil + return l.localStore.Local(), nil } var _ Worker = &LocalWorker{} diff --git a/storage/sealmgr/advmgr/roprov.go b/storage/sealmgr/advmgr/roprov.go index 64513cdcd..2269a3257 100644 --- a/storage/sealmgr/advmgr/roprov.go +++ b/storage/sealmgr/advmgr/roprov.go @@ -20,7 +20,7 @@ func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorNumbe return sectorbuilder.SectorPaths{}, nil, xerrors.New("read-only storage") } - return l.stor.AcquireSector(abi.SectorID{ + return l.stor.AcquireSector(ctx, abi.SectorID{ Miner: l.miner, Number: id, }, existing, allocate, sealing) diff --git a/storage/sealmgr/stores/http_handler.go b/storage/sealmgr/stores/http_handler.go index 40264b74c..75cb080b5 100644 --- a/storage/sealmgr/stores/http_handler.go +++ b/storage/sealmgr/stores/http_handler.go @@ -44,7 +44,7 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ if err != nil { return } - paths, done, err := handler.Store.AcquireSector(id, ft, 0, false) + paths, done, err := handler.Store.AcquireSector(r.Context(), id, ft, 0, false) if err != nil { return } diff --git a/storage/sealmgr/stores/local.go b/storage/sealmgr/stores/local.go index 942ea2086..3420a0aea 100644 --- a/storage/sealmgr/stores/local.go +++ b/storage/sealmgr/stores/local.go @@ -1,6 +1,7 @@ package stores import ( + "context" "encoding/json" "io/ioutil" "os" @@ -114,7 +115,7 @@ func (st *Local) open() error { return nil } -func (st *Local) AcquireSector(sid abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) { +func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) { if existing|allocate != existing^allocate { return sectorbuilder.SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector") } diff --git a/storage/sealmgr/stores/remote.go b/storage/sealmgr/stores/remote.go index 77f10c741..e278dd701 100644 --- a/storage/sealmgr/stores/remote.go +++ b/storage/sealmgr/stores/remote.go @@ -30,6 +30,14 @@ type Remote struct { // (make sure to not fetch the same sector data twice) } +func NewRemote(local Store, remote SectorIndex, auth http.Header) *Remote { + return &Remote{ + local: local, + remote: remote, + auth: auth, + } +} + type SectorIndex interface { FindSector(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]api.StorageInfo, error) }