feat: sealer: Custom worker name config

This commit is contained in:
Łukasz Magiera 2022-08-03 12:54:32 +02:00
parent 4d10adae3c
commit da33d82e1b
8 changed files with 48 additions and 23 deletions

View File

@ -159,6 +159,12 @@ var runCmd = &cli.Command{
Usage: "don't use swap", Usage: "don't use swap",
Value: false, Value: false,
}, },
&cli.StringFlag{
Name: "name",
Usage: "custom worker name",
EnvVars: []string{"LOTUS_WORKER_NAME"},
DefaultText: "hostname",
},
&cli.BoolFlag{ &cli.BoolFlag{
Name: "addpiece", Name: "addpiece",
Usage: "enable addpiece", Usage: "enable addpiece",
@ -513,6 +519,7 @@ var runCmd = &cli.Command{
NoSwap: cctx.Bool("no-swap"), NoSwap: cctx.Bool("no-swap"),
MaxParallelChallengeReads: cctx.Int("post-parallel-reads"), MaxParallelChallengeReads: cctx.Int("post-parallel-reads"),
ChallengeReadTimeout: cctx.Duration("post-read-timeout"), ChallengeReadTimeout: cctx.Duration("post-read-timeout"),
Name: cctx.String("name"),
}, remote, localStore, nodeApi, nodeApi, wsts), }, remote, localStore, nodeApi, nodeApi, wsts),
LocalStore: localStore, LocalStore: localStore,
Storage: lr, Storage: lr,

View File

@ -41,6 +41,7 @@ OPTIONS:
--addpiece enable addpiece (default: true) --addpiece enable addpiece (default: true)
--commit enable commit (32G sectors: all cores or GPUs, 128GiB Memory + 64GiB swap) (default: true) --commit enable commit (32G sectors: all cores or GPUs, 128GiB Memory + 64GiB swap) (default: true)
--listen value host address and port the worker api will listen on (default: "0.0.0.0:3456") --listen value host address and port the worker api will listen on (default: "0.0.0.0:3456")
--name value custom worker name (default: hostname) [$LOTUS_WORKER_NAME]
--no-default disable all default compute tasks, use the worker for storage/fetching only (default: false) --no-default disable all default compute tasks, use the worker for storage/fetching only (default: false)
--no-local-storage don't use storageminer repo for sector storage (default: false) --no-local-storage don't use storageminer repo for sector storage (default: false)
--no-swap don't use swap (default: false) --no-swap don't use swap (default: false)

View File

@ -622,6 +622,13 @@
# env var: LOTUS_STORAGE_ALLOWREGENSECTORKEY # env var: LOTUS_STORAGE_ALLOWREGENSECTORKEY
#AllowRegenSectorKey = true #AllowRegenSectorKey = true
# LocalWorkerName specifies a custom name for the builtin worker.
# If set to an empty string (default), the OS hostname will be used # LocalWorkerName specifies a custom name for the builtin worker.
#
# type: string
# env var: LOTUS_STORAGE_LOCALWORKERNAME
#LocalWorkerName = ""
# Assigner specifies the worker assigner to use when scheduling tasks. # Assigner specifies the worker assigner to use when scheduling tasks.
# "utilization" (default) - assign tasks to workers with lowest utilization. # "utilization" (default) - assign tasks to workers with lowest utilization.
# "spread" - assign tasks to as many distinct workers as possible. # "spread" - assign tasks to as many distinct workers as possible.

View File

@ -844,6 +844,13 @@ This parameter is ONLY applicable if the retrieval pricing policy strategy has b
Comment: ``, Comment: ``,
}, },
{
Name: "LocalWorkerName",
Type: "string",
Comment: `LocalWorkerName specifies a custom name for the builtin worker.
If set to an empty string (default), the OS hostname will be used`,
},
{ {
Name: "Assigner", Name: "Assigner",
Type: "string", Type: "string",

View File

@ -65,6 +65,8 @@ func (c *StorageMiner) StorageManager() sealer.Config {
ResourceFiltering: c.Storage.ResourceFiltering, ResourceFiltering: c.Storage.ResourceFiltering,
DisallowRemoteFinalize: c.Storage.DisallowRemoteFinalize, DisallowRemoteFinalize: c.Storage.DisallowRemoteFinalize,
LocalWorkerName: c.Storage.LocalWorkerName,
Assigner: c.Storage.Assigner, Assigner: c.Storage.Assigner,
ParallelCheckLimit: c.Proving.ParallelCheckLimit, ParallelCheckLimit: c.Proving.ParallelCheckLimit,

View File

@ -401,6 +401,10 @@ type SealerConfig struct {
AllowProveReplicaUpdate2 bool AllowProveReplicaUpdate2 bool
AllowRegenSectorKey bool AllowRegenSectorKey bool
// LocalWorkerName specifies a custom name for the builtin worker.
// If set to an empty string (default), the OS hostname will be used
LocalWorkerName string
// Assigner specifies the worker assigner to use when scheduling tasks. // Assigner specifies the worker assigner to use when scheduling tasks.
// "utilization" (default) - assign tasks to workers with lowest utilization. // "utilization" (default) - assign tasks to workers with lowest utilization.
// "spread" - assign tasks to as many distinct workers as possible. // "spread" - assign tasks to as many distinct workers as possible.

View File

@ -116,6 +116,8 @@ type Config struct {
AllowProveReplicaUpdate2 bool AllowProveReplicaUpdate2 bool
AllowRegenSectorKey bool AllowRegenSectorKey bool
LocalWorkerName string
// ResourceFiltering instructs the system which resource filtering strategy // ResourceFiltering instructs the system which resource filtering strategy
// to use when evaluating tasks against this worker. An empty value defaults // to use when evaluating tasks against this worker. An empty value defaults
// to "hardware". // to "hardware".
@ -207,6 +209,7 @@ func New(ctx context.Context, lstor *paths.Local, stor paths.Store, ls paths.Loc
wcfg := WorkerConfig{ wcfg := WorkerConfig{
IgnoreResourceFiltering: sc.ResourceFiltering == ResourceFilteringDisabled, IgnoreResourceFiltering: sc.ResourceFiltering == ResourceFilteringDisabled,
TaskTypes: localTasks, TaskTypes: localTasks,
Name: sc.LocalWorkerName,
} }
worker := NewLocalWorker(wcfg, stor, lstor, si, m, wss) worker := NewLocalWorker(wcfg, stor, lstor, si, m, wss)
err = m.AddWorker(ctx, worker) err = m.AddWorker(ctx, worker)

View File

@ -34,6 +34,9 @@ type WorkerConfig struct {
TaskTypes []sealtasks.TaskType TaskTypes []sealtasks.TaskType
NoSwap bool NoSwap bool
// Name is the worker's display name; defaults to os.Hostname() if not set
Name string
// IgnoreResourceFiltering enables task distribution to happen on this // IgnoreResourceFiltering enables task distribution to happen on this
// worker regardless of its currently available resources. Used in testing // worker regardless of its currently available resources. Used in testing
// with the local worker. // with the local worker.
@ -56,6 +59,8 @@ type LocalWorker struct {
noSwap bool noSwap bool
envLookup EnvFunc envLookup EnvFunc
name string
// see equivalent field on WorkerConfig. // see equivalent field on WorkerConfig.
ignoreResources bool ignoreResources bool
@ -83,6 +88,7 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, envLookup EnvFunc,
localStore: local, localStore: local,
sindex: sindex, sindex: sindex,
ret: ret, ret: ret,
name: wcfg.Name,
ct: &workerCallTracker{ ct: &workerCallTracker{
st: cst, st: cst,
@ -97,6 +103,14 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, envLookup EnvFunc,
closing: make(chan struct{}), closing: make(chan struct{}),
} }
if w.name == "" {
var err error
w.name, err = os.Hostname()
if err != nil {
panic(err)
}
}
if wcfg.MaxParallelChallengeReads > 0 { if wcfg.MaxParallelChallengeReads > 0 {
w.challengeThrottle = make(chan struct{}, wcfg.MaxParallelChallengeReads) w.challengeThrottle = make(chan struct{}, wcfg.MaxParallelChallengeReads)
} }
@ -113,13 +127,7 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, envLookup EnvFunc,
go func() { go func() {
for _, call := range unfinished { for _, call := range unfinished {
hostname, osErr := os.Hostname() err := storiface.Err(storiface.ErrTempWorkerRestart, xerrors.Errorf("worker [name: %s] restarted", w.name))
if osErr != nil {
log.Errorf("get hostname err: %+v", err)
hostname = ""
}
err := storiface.Err(storiface.ErrTempWorkerRestart, xerrors.Errorf("worker [Hostname: %s] restarted", hostname))
// TODO: Handle restarting PC1 once support is merged // TODO: Handle restarting PC1 once support is merged
@ -283,12 +291,7 @@ func (l *LocalWorker) asyncCall(ctx context.Context, sector storiface.SectorRef,
} }
if err != nil { if err != nil {
hostname, osErr := os.Hostname() err = xerrors.Errorf("%w [name: %s]", err, l.name)
if osErr != nil {
log.Errorf("get hostname err: %+v", err)
}
err = xerrors.Errorf("%w [Hostname: %s]", err, hostname)
} }
if doReturn(ctx, rt, ci, l.ret, res, toCallError(err)) { if doReturn(ctx, rt, ci, l.ret, res, toCallError(err)) {
@ -774,15 +777,6 @@ func (l *LocalWorker) memInfo() (memPhysical, memUsed, memSwap, memSwapUsed uint
} }
func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) { func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) {
hostname, ok := os.LookupEnv("LOTUS_WORKER_HOSTNAME")
if !ok {
var err error
hostname, err = os.Hostname()
if err != nil {
panic(err)
}
}
gpus, err := ffi.GetGPUDevices() gpus, err := ffi.GetGPUDevices()
if err != nil { if err != nil {
log.Errorf("getting gpu devices failed: %+v", err) log.Errorf("getting gpu devices failed: %+v", err)
@ -801,7 +795,7 @@ func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) {
} }
return storiface.WorkerInfo{ return storiface.WorkerInfo{
Hostname: hostname, Hostname: l.name,
IgnoreResources: l.ignoreResources, IgnoreResources: l.ignoreResources,
Resources: storiface.WorkerResources{ Resources: storiface.WorkerResources{
MemPhysical: memPhysical, MemPhysical: memPhysical,