Merge pull request #5023 from filecoin-project/feat/worker-set-task-types

worker: Support setting task types at runtime
This commit is contained in:
Łukasz Magiera 2020-12-01 14:51:07 +01:00 committed by GitHub
commit c4a6b94b9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 177 additions and 0 deletions

View File

@ -23,6 +23,9 @@ type WorkerAPI interface {
storiface.WorkerCalls
TaskDisable(ctx context.Context, tt sealtasks.TaskType) error
TaskEnable(ctx context.Context, tt sealtasks.TaskType) error
// Storage / Other
Remove(ctx context.Context, sector abi.SectorID) error

View File

@ -388,6 +388,9 @@ type WorkerStruct struct {
ReadPiece func(context.Context, io.Writer, storage.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) (storiface.CallID, error) `perm:"admin"`
Fetch func(context.Context, storage.SectorRef, storiface.SectorFileType, storiface.PathType, storiface.AcquireMode) (storiface.CallID, error) `perm:"admin"`
TaskDisable func(ctx context.Context, tt sealtasks.TaskType) error `perm:"admin"`
TaskEnable func(ctx context.Context, tt sealtasks.TaskType) error `perm:"admin"`
Remove func(ctx context.Context, sector abi.SectorID) error `perm:"admin"`
StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"`
@ -1573,6 +1576,14 @@ func (w *WorkerStruct) Fetch(ctx context.Context, id storage.SectorRef, fileType
return w.Internal.Fetch(ctx, id, fileType, ptype, am)
}
func (w *WorkerStruct) TaskDisable(ctx context.Context, tt sealtasks.TaskType) error {
return w.Internal.TaskDisable(ctx, tt)
}
func (w *WorkerStruct) TaskEnable(ctx context.Context, tt sealtasks.TaskType) error {
return w.Internal.TaskEnable(ctx, tt)
}
func (w *WorkerStruct) Remove(ctx context.Context, sector abi.SectorID) error {
return w.Internal.Remove(ctx, sector)
}

View File

@ -241,6 +241,7 @@ func init() {
addExample(map[sealtasks.TaskType]struct{}{
sealtasks.TTPreCommit2: {},
})
addExample(sealtasks.TTCommit2)
}
func exampleValue(method string, t, parent reflect.Type) interface{} {

View File

@ -2,12 +2,14 @@ package main
import (
"fmt"
"sort"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
)
var infoCmd = &cli.Command{
@ -49,10 +51,22 @@ var infoCmd = &cli.Command{
return xerrors.Errorf("getting info: %w", err)
}
tt, err := api.TaskTypes(ctx)
if err != nil {
return xerrors.Errorf("getting task types: %w", err)
}
fmt.Printf("Hostname: %s\n", info.Hostname)
fmt.Printf("CPUs: %d; GPUs: %v\n", info.Resources.CPUs, info.Resources.GPUs)
fmt.Printf("RAM: %s; Swap: %s\n", types.SizeStr(types.NewInt(info.Resources.MemPhysical)), types.SizeStr(types.NewInt(info.Resources.MemSwap)))
fmt.Printf("Reserved memory: %s\n", types.SizeStr(types.NewInt(info.Resources.MemReserved)))
fmt.Printf("Task types: ")
for _, t := range ttList(tt) {
fmt.Printf("%s ", t.Short())
}
fmt.Println()
fmt.Println()
paths, err := api.Paths(ctx)
@ -80,3 +94,14 @@ var infoCmd = &cli.Command{
return nil
},
}
func ttList(tt map[sealtasks.TaskType]struct{}) []sealtasks.TaskType {
tasks := make([]sealtasks.TaskType, 0, len(tt))
for taskType := range tt {
tasks = append(tasks, taskType)
}
sort.Slice(tasks, func(i, j int) bool {
return tasks[i].Less(tasks[j])
})
return tasks
}

View File

@ -59,6 +59,7 @@ func main() {
storageCmd,
setCmd,
waitQuietCmd,
tasksCmd,
}
app := &cli.App{

View File

@ -0,0 +1,82 @@
package main
import (
"context"
"strings"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
)
var tasksCmd = &cli.Command{
Name: "tasks",
Usage: "Manage task processing",
Subcommands: []*cli.Command{
tasksEnableCmd,
tasksDisableCmd,
},
}
var allowSetting = map[sealtasks.TaskType]struct{}{
sealtasks.TTAddPiece: {},
sealtasks.TTPreCommit1: {},
sealtasks.TTPreCommit2: {},
sealtasks.TTCommit2: {},
sealtasks.TTUnseal: {},
}
var settableStr = func() string {
var s []string
for _, tt := range ttList(allowSetting) {
s = append(s, tt.Short())
}
return strings.Join(s, "|")
}()
var tasksEnableCmd = &cli.Command{
Name: "enable",
Usage: "Enable a task type",
ArgsUsage: "[" + settableStr + "]",
Action: taskAction(api.WorkerAPI.TaskEnable),
}
var tasksDisableCmd = &cli.Command{
Name: "disable",
Usage: "Disable a task type",
ArgsUsage: "[" + settableStr + "]",
Action: taskAction(api.WorkerAPI.TaskDisable),
}
func taskAction(tf func(a api.WorkerAPI, ctx context.Context, tt sealtasks.TaskType) error) func(cctx *cli.Context) error {
return func(cctx *cli.Context) error {
if cctx.NArg() != 1 {
return xerrors.Errorf("expected 1 argument")
}
var tt sealtasks.TaskType
for taskType := range allowSetting {
if taskType.Short() == cctx.Args().First() {
tt = taskType
break
}
}
if tt == "" {
return xerrors.Errorf("unknown task type '%s'", cctx.Args().First())
}
api, closer, err := lcli.GetWorkerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
return tf(api, ctx, tt)
}
}

View File

@ -29,6 +29,8 @@
* [Storage](#Storage)
* [StorageAddLocal](#StorageAddLocal)
* [Task](#Task)
* [TaskDisable](#TaskDisable)
* [TaskEnable](#TaskEnable)
* [TaskTypes](#TaskTypes)
* [Unseal](#Unseal)
* [UnsealPiece](#UnsealPiece)
@ -502,6 +504,34 @@ Response: `{}`
## Task
### TaskDisable
There are not yet any comments for this method.
Perms: admin
Inputs:
```json
[
"seal/v0/commit/2"
]
```
Response: `{}`
### TaskEnable
There are not yet any comments for this method.
Perms: admin
Inputs:
```json
[
"seal/v0/commit/2"
]
```
Response: `{}`
### TaskTypes
TaskType -> Weight

View File

@ -49,6 +49,7 @@ type LocalWorker struct {
ct *workerCallTracker
acceptTasks map[sealtasks.TaskType]struct{}
running sync.WaitGroup
taskLk sync.Mutex
session uuid.UUID
testDisable int64
@ -457,9 +458,28 @@ func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector st
}
func (l *LocalWorker) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) {
l.taskLk.Lock()
defer l.taskLk.Unlock()
return l.acceptTasks, nil
}
func (l *LocalWorker) TaskDisable(ctx context.Context, tt sealtasks.TaskType) error {
l.taskLk.Lock()
defer l.taskLk.Unlock()
delete(l.acceptTasks, tt)
return nil
}
func (l *LocalWorker) TaskEnable(ctx context.Context, tt sealtasks.TaskType) error {
l.taskLk.Lock()
defer l.taskLk.Unlock()
l.acceptTasks[tt] = struct{}{}
return nil
}
func (l *LocalWorker) Paths(ctx context.Context) ([]stores.StoragePath, error) {
return l.localStore.Local(ctx)
}

4
go.sum
View File

@ -698,6 +698,7 @@ github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
@ -1210,6 +1211,7 @@ github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJE
github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM=
github.com/nikkolasg/hexjson v0.0.0-20181101101858-78e39397e00c h1:5bFTChQxSKNwy8ALwOebjekYExl9HTT9urdawqC95tA=
github.com/nikkolasg/hexjson v0.0.0-20181101101858-78e39397e00c/go.mod h1:7qN3Y0BvzRUf4LofcoJplQL10lsFDb4PYlePTVwrP28=
github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229 h1:E2B8qYyeSgv5MXpmzZXRNp8IAQ4vjxIjhpAf5hv/tAg=
github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229/go.mod h1:0aYXnNPJ8l7uZxf45rWW1a/uME32OF0rhiYGNQ2oF2E=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
@ -1423,7 +1425,9 @@ github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX
github.com/urfave/cli/v2 v2.0.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ=
github.com/urfave/cli/v2 v2.2.0 h1:JTTnM6wKzdA0Jqodd966MVj4vWbbquZykeX1sKbe2C4=
github.com/urfave/cli/v2 v2.2.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasttemplate v1.0.1 h1:tY9CJiPnMXf1ERmG2EyK7gNUd+c6RKGD0IfU8WdUSz8=
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=