unspecified address should be calculated before use. add a new timeout flag and add a timeout to TCP dial
422 lines
11 KiB
422 lines
11 KiB
package main
import (
logging "github.com/ipfs/go-log/v2"
paramfetch "github.com/filecoin-project/go-paramfetch"
lcli "github.com/filecoin-project/lotus/cli"
sectorstorage "github.com/filecoin-project/sector-storage"
var log = logging.Logger("main")
const FlagWorkerRepo = "worker-repo"
// TODO remove after deprecation period
const FlagWorkerRepoDeprecation = "workerrepo"
func main() {
log.Info("Starting lotus worker")
local := []*cli.Command{
app := &cli.App{
Name: "lotus-worker",
Usage: "Remote miner worker",
Version: build.UserVersion(),
Flags: []cli.Flag{
Name: FlagWorkerRepo,
Aliases: []string{FlagWorkerRepoDeprecation},
EnvVars: []string{"LOTUS_WORKER_PATH", "WORKER_PATH"},
Value: "~/.lotusworker", // TODO: Consider XDG_DATA_HOME
Usage: fmt.Sprintf("Specify worker repo path. flag %s and env WORKER_PATH are DEPRECATION, will REMOVE SOON", FlagWorkerRepoDeprecation),
Name: "miner-repo",
Aliases: []string{"storagerepo"},
Value: "~/.lotusminer", // TODO: Consider XDG_DATA_HOME
Usage: fmt.Sprintf("Specify miner repo path. flag storagerepo and env LOTUS_STORAGE_PATH are DEPRECATION, will REMOVE SOON"),
Name: "enable-gpu-proving",
Usage: "enable use of GPU for mining operations",
Value: true,
Commands: local,
app.Metadata["repoType"] = repo.Worker
if err := app.Run(os.Args); err != nil {
log.Warnf("%+v", err)
var runCmd = &cli.Command{
Name: "run",
Usage: "Start lotus worker",
Flags: []cli.Flag{
Name: "address",
Usage: "locally reachable address",
Value: "",
Name: "no-local-storage",
Usage: "don't use storageminer repo for sector storage",
Name: "precommit1",
Usage: "enable precommit1 (32G sectors: 1 core, 128GiB Memory)",
Value: true,
Name: "precommit2",
Usage: "enable precommit2 (32G sectors: all cores, 96GiB Memory)",
Value: true,
Name: "commit",
Usage: "enable commit (32G sectors: all cores or GPUs, 128GiB Memory + 64GiB swap)",
Value: true,
Name: "timeout",
Usage: "used when address is unspecified. must be a valid duration recognized by golang's time.ParseDuration function",
Value: "30m",
Action: func(cctx *cli.Context) error {
if !cctx.Bool("enable-gpu-proving") {
if err := os.Setenv("BELLMAN_NO_GPU", "true"); err != nil {
return xerrors.Errorf("could not set no-gpu env: %+v", err)
// Connect to storage-miner
var nodeApi api.StorageMiner
var closer func()
var err error
for {
nodeApi, closer, err = lcli.GetStorageMinerAPI(cctx)
if err == nil {
fmt.Printf("\r\x1b[0KConnecting to miner API... (%s)", err)
defer closer()
ctx := lcli.ReqContext(cctx)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
v, err := nodeApi.Version(ctx)
if err != nil {
return err
if v.APIVersion != build.APIVersion {
return xerrors.Errorf("lotus-miner API version doesn't match: local: ", api.Version{APIVersion: build.APIVersion})
log.Infof("Remote version %s", v)
watchMinerConn(ctx, cctx, nodeApi)
// Check params
act, err := nodeApi.ActorAddress(ctx)
if err != nil {
return err
ssize, err := nodeApi.ActorSectorSize(ctx, act)
if err != nil {
return err
if cctx.Bool("commit") {
if err := paramfetch.GetParams(ctx, build.ParametersJSON(), uint64(ssize)); err != nil {
return xerrors.Errorf("get params: %w", err)
var taskTypes []sealtasks.TaskType
taskTypes = append(taskTypes, sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize)
if cctx.Bool("precommit1") {
taskTypes = append(taskTypes, sealtasks.TTPreCommit1)
if cctx.Bool("precommit2") {
taskTypes = append(taskTypes, sealtasks.TTPreCommit2)
if cctx.Bool("commit") {
taskTypes = append(taskTypes, sealtasks.TTCommit2)
if len(taskTypes) == 0 {
return xerrors.Errorf("no task types specified")
// Open repo
repoPath := cctx.String(FlagWorkerRepo)
r, err := repo.NewFS(repoPath)
if err != nil {
return err
ok, err := r.Exists()
if err != nil {
return err
if !ok {
if err := r.Init(repo.Worker); err != nil {
return err
lr, err := r.Lock(repo.Worker)
if err != nil {
return err
var localPaths []stores.LocalPath
if !cctx.Bool("no-local-storage") {
b, err := json.MarshalIndent(&stores.LocalStorageMeta{
ID: stores.ID(uuid.New().String()),
Weight: 10,
CanSeal: true,
CanStore: false,
}, "", " ")
if err != nil {
return xerrors.Errorf("marshaling storage config: %w", err)
if err := ioutil.WriteFile(filepath.Join(lr.Path(), "sectorstore.json"), b, 0644); err != nil {
return xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(lr.Path(), "sectorstore.json"), err)
localPaths = append(localPaths, stores.LocalPath{
Path: lr.Path(),
if err := lr.SetStorage(func(sc *stores.StorageConfig) {
sc.StoragePaths = append(sc.StoragePaths, localPaths...)
}); err != nil {
return xerrors.Errorf("set storage config: %w", err)
// init datastore for r.Exists
_, err := lr.Datastore("/metadata")
if err != nil {
return err
if err := lr.Close(); err != nil {
return xerrors.Errorf("close repo: %w", err)
lr, err := r.Lock(repo.Worker)
if err != nil {
return err
log.Info("Opening local storage; connecting to master")
const unspecifiedAddress = ""
address := cctx.String("address")
addressSlice := strings.Split(address, ":")
if ip := net.ParseIP(addressSlice[0]); ip != nil {
if ip.String() == unspecifiedAddress {
timeout, err := time.ParseDuration(cctx.String("timeout"))
if err != nil {
return err
rip, err := extractRoutableIP(timeout)
if err != nil {
return err
address = rip + ":" + addressSlice[1]
localStore, err := stores.NewLocal(ctx, lr, nodeApi, []string{"http://" + address + "/remote"})
if err != nil {
return err
// Setup remote sector store
spt, err := ffiwrapper.SealProofTypeFromSectorSize(ssize)
if err != nil {
return xerrors.Errorf("getting proof type: %w", err)
sminfo, err := lcli.GetAPIInfo(cctx, repo.StorageMiner)
if err != nil {
return xerrors.Errorf("could not get api info: %w", err)
remote := stores.NewRemote(localStore, nodeApi, sminfo.AuthHeader())
// Create / expose the worker
workerApi := &worker{
LocalWorker: sectorstorage.NewLocalWorker(sectorstorage.WorkerConfig{
SealProof: spt,
TaskTypes: taskTypes,
}, remote, localStore, nodeApi),
mux := mux.NewRouter()
log.Info("Setting up control endpoint at " + address)
rpcServer := jsonrpc.NewServer()
rpcServer.Register("Filecoin", apistruct.PermissionedWorkerAPI(workerApi))
mux.Handle("/rpc/v0", rpcServer)
mux.PathPrefix("/remote").HandlerFunc((&stores.FetchHandler{Local: localStore}).ServeHTTP)
mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof
ah := &auth.Handler{
Verify: nodeApi.AuthVerify,
Next: mux.ServeHTTP,
srv := &http.Server{
Handler: ah,
BaseContext: func(listener net.Listener) context.Context {
return ctx
go func() {
log.Warn("Shutting down...")
if err := srv.Shutdown(context.TODO()); err != nil {
log.Errorf("shutting down RPC server failed: %s", err)
log.Warn("Graceful shutdown successful")
nl, err := net.Listen("tcp", address)
if err != nil {
return err
log.Info("Waiting for tasks")
go func() {
if err := nodeApi.WorkerConnect(ctx, "ws://"+address+"/rpc/v0"); err != nil {
log.Errorf("Registering worker failed: %+v", err)
return srv.Serve(nl)
func watchMinerConn(ctx context.Context, cctx *cli.Context, nodeApi api.StorageMiner) {
go func() {
closing, err := nodeApi.Closing(ctx)
if err != nil {
log.Errorf("failed to get remote closing channel: %+v", err)
select {
case <-closing:
case <-ctx.Done():
if ctx.Err() != nil {
return // graceful shutdown
log.Warnf("Connection with miner node lost, restarting")
exe, err := os.Executable()
if err != nil {
log.Errorf("getting executable for auto-restart: %+v", err)
// TODO: there are probably cleaner/more graceful ways to restart,
// but this is good enough for now (FSM can recover from the mess this creates)
if err := syscall.Exec(exe, []string{exe, "run",
fmt.Sprintf("--address=%s", cctx.String("address")),
fmt.Sprintf("--no-local-storage=%t", cctx.Bool("no-local-storage")),
fmt.Sprintf("--precommit1=%t", cctx.Bool("precommit1")),
fmt.Sprintf("--precommit2=%t", cctx.Bool("precommit2")),
fmt.Sprintf("--commit=%t", cctx.Bool("commit")),
}, os.Environ()); err != nil {
func extractRoutableIP(timeout time.Duration) (string, error) {
minerMultiAddrKey := "MINER_API_INFO"
deprecatedMinerMultiAddrKey := "STORAGE_API_INFO"
env, ok := os.LookupEnv(minerMultiAddrKey)
if !ok {
// TODO remove after deprecation period
env, ok = os.LookupEnv(deprecatedMinerMultiAddrKey)
if ok {
log.Warnf("Using a deprecated env(%s) value, please use env(%s) instead.", deprecatedMinerMultiAddrKey, minerMultiAddrKey)
return "", xerrors.New("MINER_API_INFO environment variable required to extract IP")
minerAddr := strings.Split(env, "/")
conn, err := net.DialTimeout("tcp", minerAddr[2]+":"+minerAddr[4], timeout)
if err != nil {
return "", err
defer conn.Close()
localAddr := conn.LocalAddr().(*net.TCPAddr)
return strings.Split(localAddr.IP.String(), ":")[0], nil