Merge pull request #9116 from filecoin-project/feat/worker-name-set

feat: sealing: Allow overriding worker hostname
Łukasz Magiera, 2022-08-03 22:44:29 +02:00, committed by GitHub
commit ab0592231c
11 changed files with 82 additions and 19 deletions

View File

@@ -159,6 +159,12 @@ var runCmd = &cli.Command{
 			Usage: "don't use swap",
 			Value: false,
 		},
+		&cli.StringFlag{
+			Name:        "name",
+			Usage:       "custom worker name",
+			EnvVars:     []string{"LOTUS_WORKER_NAME"},
+			DefaultText: "hostname",
+		},
 		&cli.BoolFlag{
 			Name:  "addpiece",
 			Usage: "enable addpiece",
@@ -513,6 +519,7 @@ var runCmd = &cli.Command{
 				NoSwap:                    cctx.Bool("no-swap"),
 				MaxParallelChallengeReads: cctx.Int("post-parallel-reads"),
 				ChallengeReadTimeout:      cctx.Duration("post-read-timeout"),
+				Name:                      cctx.String("name"),
 			}, remote, localStore, nodeApi, nodeApi, wsts),
 			LocalStore: localStore,
 			Storage:    lr,
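
Read together, these two hunks give the new name the usual urfave/cli precedence: an explicit --name flag (or LOTUS_WORKER_NAME) is passed through as WorkerConfig.Name, and an empty value later falls back to os.Hostname() inside the worker (see the worker hunks at the end of this diff). A minimal runnable sketch of that resolution order, with resolveWorkerName as a purely hypothetical helper:

	package main

	import (
		"fmt"
		"os"
	)

	// resolveWorkerName mirrors the fallback: an explicit value wins,
	// an empty one resolves to the OS hostname.
	func resolveWorkerName(explicit string) (string, error) {
		if explicit != "" {
			return explicit, nil // --name flag or LOTUS_WORKER_NAME
		}
		return os.Hostname() // default, matching DefaultText: "hostname"
	}

	func main() {
		name, err := resolveWorkerName(os.Getenv("LOTUS_WORKER_NAME"))
		if err != nil {
			panic(err)
		}
		fmt.Println("worker name:", name)
	}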

View File

@@ -41,6 +41,7 @@ OPTIONS:
    --addpiece           enable addpiece (default: true)
    --commit             enable commit (32G sectors: all cores or GPUs, 128GiB Memory + 64GiB swap) (default: true)
    --listen value       host address and port the worker api will listen on (default: "0.0.0.0:3456")
+   --name value         custom worker name (default: hostname) [$LOTUS_WORKER_NAME]
    --no-default         disable all default compute tasks, use the worker for storage/fetching only (default: false)
    --no-local-storage   don't use storageminer repo for sector storage (default: false)
    --no-swap            don't use swap (default: false)

View File

@@ -622,6 +622,13 @@
 # env var: LOTUS_STORAGE_ALLOWREGENSECTORKEY
 #AllowRegenSectorKey = true

+# LocalWorkerName specifies a custom name for the builtin worker.
+# If set to an empty string (default), the OS hostname will be used.
+#
+# type: string
+# env var: LOTUS_STORAGE_LOCALWORKERNAME
+#LocalWorkerName = ""
+
 # Assigner specifies the worker assigner to use when scheduling tasks.
 # "utilization" (default) - assign tasks to workers with lowest utilization.
 # "spread" - assign tasks to as many distinct workers as possible.

View File

@@ -727,6 +727,7 @@ func (n *Ensemble) Start() *Ensemble {
 			LocalWorker: sectorstorage.NewLocalWorker(sectorstorage.WorkerConfig{
 				TaskTypes: m.options.workerTasks,
 				NoSwap:    false,
+				Name:      m.options.workerName,
 			}, store, localStore, m.MinerNode, m.MinerNode, wsts),
 			LocalStore: localStore,
 			Storage:    lr,

View File

@@ -47,6 +47,7 @@ type nodeOpts struct {
 	workerTasks      []sealtasks.TaskType
 	workerStorageOpt func(paths.Store) paths.Store
+	workerName       string
 }

 // DefaultNodeOpts are the default options that will be applied to test nodes.
@@ -219,6 +220,13 @@ func WithTaskTypes(tt []sealtasks.TaskType) NodeOpt {
 	}
 }

+func WithWorkerName(n string) NodeOpt {
+	return func(opts *nodeOpts) error {
+		opts.workerName = n
+		return nil
+	}
+}
+
 var WithSealWorkerTasks = WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2, sealtasks.TTUnseal})

 func WithWorkerStorage(transform func(paths.Store) paths.Store) NodeOpt {

View File

@@ -401,3 +401,28 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) {
 	require.NoError(t, err)
 	require.Len(t, lastPending, 0)
 }
+
+func TestWorkerName(t *testing.T) {
+	name := "thisstringisprobablynotahostnameihope"
+
+	ctx := context.Background()
+	_, miner, worker, ens := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithWorkerName(name))
+	ens.InterconnectAll().BeginMining(50 * time.Millisecond)
+
+	e, err := worker.Info(ctx)
+	require.NoError(t, err)
+	require.Equal(t, name, e.Hostname)
+
+	ws, err := miner.WorkerStats(ctx)
+	require.NoError(t, err)
+
+	var found bool
+	for _, stats := range ws {
+		if stats.Info.Hostname == name {
+			found = true
+		}
+	}
+	require.True(t, found)
+}

View File

@@ -844,6 +844,13 @@ This parameter is ONLY applicable if the retrieval pricing policy strategy has b
 		Comment: ``,
 	},
+	{
+		Name: "LocalWorkerName",
+		Type: "string",
+
+		Comment: `LocalWorkerName specifies a custom name for the builtin worker.
+If set to an empty string (default), the OS hostname will be used.`,
+	},
 	{
 		Name: "Assigner",
 		Type: "string",

View File

@@ -65,6 +65,8 @@ func (c *StorageMiner) StorageManager() sealer.Config {
 		ResourceFiltering:      c.Storage.ResourceFiltering,
 		DisallowRemoteFinalize: c.Storage.DisallowRemoteFinalize,
+
+		LocalWorkerName: c.Storage.LocalWorkerName,

 		Assigner: c.Storage.Assigner,

 		ParallelCheckLimit: c.Proving.ParallelCheckLimit,

View File

@ -401,6 +401,10 @@ type SealerConfig struct {
AllowProveReplicaUpdate2 bool AllowProveReplicaUpdate2 bool
AllowRegenSectorKey bool AllowRegenSectorKey bool
// LocalWorkerName specifies a custom name for the builtin worker.
// If set to an empty string (default) os hostname will be used
LocalWorkerName string
// Assigner specifies the worker assigner to use when scheduling tasks. // Assigner specifies the worker assigner to use when scheduling tasks.
// "utilization" (default) - assign tasks to workers with lowest utilization. // "utilization" (default) - assign tasks to workers with lowest utilization.
// "spread" - assign tasks to as many distinct workers as possible. // "spread" - assign tasks to as many distinct workers as possible.

View File

@@ -116,6 +116,8 @@ type Config struct {
 	AllowProveReplicaUpdate2 bool
 	AllowRegenSectorKey      bool

+	LocalWorkerName string
+
 	// ResourceFiltering instructs the system which resource filtering strategy
 	// to use when evaluating tasks against this worker. An empty value defaults
 	// to "hardware".
@@ -207,6 +209,7 @@ func New(ctx context.Context, lstor *paths.Local, stor paths.Store, ls paths.Loc
 	wcfg := WorkerConfig{
 		IgnoreResourceFiltering: sc.ResourceFiltering == ResourceFilteringDisabled,
 		TaskTypes:               localTasks,
+		Name:                    sc.LocalWorkerName,
 	}

 	worker := NewLocalWorker(wcfg, stor, lstor, si, m, wss)
 	err = m.AddWorker(ctx, worker)

View File

@@ -34,6 +34,9 @@ type WorkerConfig struct {
 	TaskTypes []sealtasks.TaskType
 	NoSwap    bool

+	// os.Hostname if not set
+	Name string
+
 	// IgnoreResourceFiltering enables task distribution to happen on this
 	// worker regardless of its currently available resources. Used in testing
 	// with the local worker.
@@ -56,6 +59,8 @@ type LocalWorker struct {
 	noSwap    bool
 	envLookup EnvFunc

+	name string
+
 	// see equivalent field on WorkerConfig.
 	ignoreResources bool
@@ -83,6 +88,7 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, envLookup EnvFunc,
 		localStore: local,
 		sindex:     sindex,
 		ret:        ret,
+		name:       wcfg.Name,

 		ct: &workerCallTracker{
 			st: cst,
@@ -97,6 +103,14 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, envLookup EnvFunc,
 		closing: make(chan struct{}),
 	}

+	if w.name == "" {
+		var err error
+		w.name, err = os.Hostname()
+		if err != nil {
+			panic(err)
+		}
+	}
+
 	if wcfg.MaxParallelChallengeReads > 0 {
 		w.challengeThrottle = make(chan struct{}, wcfg.MaxParallelChallengeReads)
 	}
@@ -113,13 +127,7 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, envLookup EnvFunc,
 	go func() {
 		for _, call := range unfinished {
-			hostname, osErr := os.Hostname()
-			if osErr != nil {
-				log.Errorf("get hostname err: %+v", err)
-				hostname = ""
-			}
-
-			err := storiface.Err(storiface.ErrTempWorkerRestart, xerrors.Errorf("worker [Hostname: %s] restarted", hostname))
+			err := storiface.Err(storiface.ErrTempWorkerRestart, xerrors.Errorf("worker [name: %s] restarted", w.name))

 			// TODO: Handle restarting PC1 once support is merged
@@ -283,12 +291,7 @@ func (l *LocalWorker) asyncCall(ctx context.Context, sector storiface.SectorRef,
 	}

 	if err != nil {
-		hostname, osErr := os.Hostname()
-		if osErr != nil {
-			log.Errorf("get hostname err: %+v", err)
-		}
-
-		err = xerrors.Errorf("%w [Hostname: %s]", err, hostname)
+		err = xerrors.Errorf("%w [name: %s]", err, l.name)
 	}

 	if doReturn(ctx, rt, ci, l.ret, res, toCallError(err)) {
@@ -774,11 +777,6 @@ func (l *LocalWorker) memInfo() (memPhysical, memUsed, memSwap, memSwapUsed uint
 }

 func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) {
-	hostname, err := os.Hostname() // TODO: allow overriding from config
-	if err != nil {
-		panic(err)
-	}
-
 	gpus, err := ffi.GetGPUDevices()
 	if err != nil {
 		log.Errorf("getting gpu devices failed: %+v", err)
@@ -797,7 +795,7 @@ func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) {
 	}

 	return storiface.WorkerInfo{
-		Hostname:        hostname,
+		Hostname:        l.name,
 		IgnoreResources: l.ignoreResources,
 		Resources: storiface.WorkerResources{
 			MemPhysical: memPhysical,
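
Finally, a short usage sketch: with Info now reporting l.name, callers can confirm an override through the worker API. verifyWorkerName is a hypothetical helper (not part of this PR), assuming the context, storiface, and xerrors imports used in the hunks above:

	// verifyWorkerName checks that a worker reports the expected name via
	// Info, mirroring the assertion in TestWorkerName above.
	func verifyWorkerName(ctx context.Context, w interface {
		Info(context.Context) (storiface.WorkerInfo, error)
	}, want string) error {
		info, err := w.Info(ctx)
		if err != nil {
			return xerrors.Errorf("getting worker info: %w", err)
		}
		if info.Hostname != want {
			return xerrors.Errorf("worker reports name %q, want %q", info.Hostname, want)
		}
		return nil
	}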