Merge remote-tracking branch 'origin/master' into feat/storage-redeclare

This commit is contained in:
Łukasz Magiera 2022-08-05 21:26:15 +02:00
commit cb2b4fa4aa
23 changed files with 317 additions and 96 deletions

View File

@ -31,7 +31,7 @@ Please send an email to security@filecoin.org. See our [security policy](SECURIT
These repos are independent and reusable modules, but are tightly integrated into Lotus to make up a fully featured Filecoin implementation:
- [go-fil-markets](https://github.com/filecoin-project/go-fil-markets) which has its own [kanban work tracker available here](https://app.zenhub.com/workspaces/markets-shared-components-5daa144a7046a60001c6e253/board)
- [specs-actors](https://github.com/filecoin-project/specs-actors) which has its own [kanban work tracker available here](https://app.zenhub.com/workspaces/actors-5ee6f3aa87591f0016c05685/board)
- [builtin-actors](https://github.com/filecoin-project/builtin-actors)
## Contribute

View File

@ -402,7 +402,7 @@ type FullNode interface {
StateCall(context.Context, *types.Message, types.TipSetKey) (*InvocResult, error) //perm:read
// StateReplay replays a given message, assuming it was included in a block in the specified tipset.
//
// If a tipset key is provided, and a replacing message is found on chain,
// If a tipset key is provided, and a replacing message is not found on chain,
// the method will return an error saying that the message wasn't found
//
// If no tipset key is provided, the appropriate tipset is looked up, and if

View File

@ -150,6 +150,8 @@ type StorageMiner interface {
// SealingSchedDiag dumps internal sealing scheduler state
SealingSchedDiag(ctx context.Context, doSched bool) (interface{}, error) //perm:admin
SealingAbort(ctx context.Context, call storiface.CallID) error //perm:admin
//SealingSchedRemove removes a request from sealing pipeline
SealingRemoveRequest(ctx context.Context, schedId uuid.UUID) error //perm:admin
// paths.SectorIndex
StorageAttach(context.Context, storiface.StorageInfo, fsutil.FsStat) error //perm:admin

View File

@ -802,6 +802,8 @@ type StorageMinerStruct struct {
SealingAbort func(p0 context.Context, p1 storiface.CallID) error `perm:"admin"`
SealingRemoveRequest func(p0 context.Context, p1 uuid.UUID) error `perm:"admin"`
SealingSchedDiag func(p0 context.Context, p1 bool) (interface{}, error) `perm:"admin"`
SectorAbortUpgrade func(p0 context.Context, p1 abi.SectorNumber) error `perm:"admin"`
@ -4773,6 +4775,17 @@ func (s *StorageMinerStub) SealingAbort(p0 context.Context, p1 storiface.CallID)
return ErrNotSupported
}
func (s *StorageMinerStruct) SealingRemoveRequest(p0 context.Context, p1 uuid.UUID) error {
if s.Internal.SealingRemoveRequest == nil {
return ErrNotSupported
}
return s.Internal.SealingRemoveRequest(p0, p1)
}
func (s *StorageMinerStub) SealingRemoveRequest(p0 context.Context, p1 uuid.UUID) error {
return ErrNotSupported
}
func (s *StorageMinerStruct) SealingSchedDiag(p0 context.Context, p1 bool) (interface{}, error) {
if s.Internal.SealingSchedDiag == nil {
return nil, ErrNotSupported

View File

@ -386,7 +386,7 @@ type FullNode interface {
StateCall(context.Context, *types.Message, types.TipSetKey) (*api.InvocResult, error) //perm:read
// StateReplay replays a given message, assuming it was included in a block in the specified tipset.
//
// If a tipset key is provided, and a replacing message is found on chain,
// If a tipset key is provided, and a replacing message is not found on chain,
// the method will return an error saying that the message wasn't found
//
// If no tipset key is provided, the appropriate tipset is looked up, and if

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -27,6 +27,7 @@ import (
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/actors/adt"
builtin2 "github.com/filecoin-project/lotus/chain/actors/builtin"
lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
@ -411,7 +412,7 @@ var actorControlList = &cli.Command{
}
defer closer()
api, acloser, err := lcli.GetFullNodeAPI(cctx)
api, acloser, err := lcli.GetFullNodeAPIV1(cctx)
if err != nil {
return err
}
@ -493,18 +494,21 @@ var actorControlList = &cli.Command{
}
printKey := func(name string, a address.Address) {
b, err := api.WalletBalance(ctx, a)
if err != nil {
fmt.Printf("%s\t%s: error getting balance: %s\n", name, a, err)
var actor *types.Actor
if actor, err = api.StateGetActor(ctx, a, types.EmptyTSK); err != nil {
fmt.Printf("%s\t%s: error getting actor: %s\n", name, a, err)
return
}
b := actor.Balance
k, err := api.StateAccountKey(ctx, a, types.EmptyTSK)
if err != nil {
fmt.Printf("%s\t%s: error getting account key: %s\n", name, a, err)
return
var k = a
// 'a' maybe a 'robust', in that case, 'StateAccountKey' returns an error.
if builtin2.IsAccountActor(actor.Code) {
if k, err = api.StateAccountKey(ctx, a, types.EmptyTSK); err != nil {
fmt.Printf("%s\t%s: error getting account key: %s\n", name, a, err)
return
}
}
kstr := k.String()
if !cctx.Bool("verbose") {
kstr = kstr[:9] + "..."

View File

@ -365,6 +365,12 @@ var sealingAbortCmd = &cli.Command{
Name: "abort",
Usage: "Abort a running job",
ArgsUsage: "[callid]",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "sched",
Usage: "Specifies that the argument is UUID of the request to be removed from scheduler",
},
},
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() != 1 {
return xerrors.Errorf("expected 1 argument")
@ -378,6 +384,14 @@ var sealingAbortCmd = &cli.Command{
ctx := lcli.ReqContext(cctx)
if cctx.Bool("sched") {
err = nodeApi.SealingRemoveRequest(ctx, uuid.Must(uuid.Parse(cctx.Args().First())))
if err != nil {
return xerrors.Errorf("Failed to removed the request with UUID %s: %w", cctx.Args().First(), err)
}
return nil
}
jobs, err := nodeApi.WorkerJobs(ctx)
if err != nil {
return xerrors.Errorf("getting worker jobs: %w", err)

View File

@ -91,15 +91,16 @@ func main() {
Usage: fmt.Sprintf("Specify miner repo path. flag storagerepo and env LOTUS_STORAGE_PATH are DEPRECATION, will REMOVE SOON"),
},
&cli.BoolFlag{
Name: "enable-gpu-proving",
Usage: "enable use of GPU for mining operations",
Value: true,
Name: "enable-gpu-proving",
Usage: "enable use of GPU for mining operations",
Value: true,
EnvVars: []string{"LOTUS_WORKER_ENABLE_GPU_PROVING"},
},
},
After: func(c *cli.Context) error {
if r := recover(); r != nil {
// Generate report in LOTUS_PATH and re-raise panic
// Generate report in LOTUS_PANIC_REPORT_PATH and re-raise panic
build.GeneratePanicReport(c.String("panic-reports"), c.String(FlagWorkerRepo), c.App.Name)
panic(r)
}
@ -142,22 +143,25 @@ var runCmd = &cli.Command{
Usage: "Start lotus worker",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "listen",
Usage: "host address and port the worker api will listen on",
Value: "0.0.0.0:3456",
Name: "listen",
Usage: "host address and port the worker api will listen on",
Value: "0.0.0.0:3456",
EnvVars: []string{"LOTUS_WORKER_LISTEN"},
},
&cli.StringFlag{
Name: "address",
Hidden: true,
},
&cli.BoolFlag{
Name: "no-local-storage",
Usage: "don't use storageminer repo for sector storage",
Name: "no-local-storage",
Usage: "don't use storageminer repo for sector storage",
EnvVars: []string{"LOTUS_WORKER_NO_LOCAL_STORAGE"},
},
&cli.BoolFlag{
Name: "no-swap",
Usage: "don't use swap",
Value: false,
Name: "no-swap",
Usage: "don't use swap",
Value: false,
EnvVars: []string{"LOTUS_WORKER_NO_SWAP"},
},
&cli.StringFlag{
Name: "name",
@ -166,79 +170,94 @@ var runCmd = &cli.Command{
DefaultText: "hostname",
},
&cli.BoolFlag{
Name: "addpiece",
Usage: "enable addpiece",
Value: true,
Name: "addpiece",
Usage: "enable addpiece",
Value: true,
EnvVars: []string{"LOTUS_WORKER_ADDPIECE"},
},
&cli.BoolFlag{
Name: "precommit1",
Usage: "enable precommit1 (32G sectors: 1 core, 128GiB Memory)",
Value: true,
Name: "precommit1",
Usage: "enable precommit1 (32G sectors: 1 core, 128GiB Memory)",
Value: true,
EnvVars: []string{"LOTUS_WORKER_PRECOMMIT1"},
},
&cli.BoolFlag{
Name: "unseal",
Usage: "enable unsealing (32G sectors: 1 core, 128GiB Memory)",
Value: true,
Name: "unseal",
Usage: "enable unsealing (32G sectors: 1 core, 128GiB Memory)",
Value: true,
EnvVars: []string{"LOTUS_WORKER_UNSEAL"},
},
&cli.BoolFlag{
Name: "precommit2",
Usage: "enable precommit2 (32G sectors: all cores, 96GiB Memory)",
Value: true,
Name: "precommit2",
Usage: "enable precommit2 (32G sectors: all cores, 96GiB Memory)",
Value: true,
EnvVars: []string{"LOTUS_WORKER_PRECOMMIT2"},
},
&cli.BoolFlag{
Name: "commit",
Usage: "enable commit (32G sectors: all cores or GPUs, 128GiB Memory + 64GiB swap)",
Value: true,
Name: "commit",
Usage: "enable commit (32G sectors: all cores or GPUs, 128GiB Memory + 64GiB swap)",
Value: true,
EnvVars: []string{"LOTUS_WORKER_COMMIT"},
},
&cli.BoolFlag{
Name: "replica-update",
Usage: "enable replica update",
Value: true,
Name: "replica-update",
Usage: "enable replica update",
Value: true,
EnvVars: []string{"LOTUS_WORKER_REPLICA_UPDATE"},
},
&cli.BoolFlag{
Name: "prove-replica-update2",
Usage: "enable prove replica update 2",
Value: true,
Name: "prove-replica-update2",
Usage: "enable prove replica update 2",
Value: true,
EnvVars: []string{"LOTUS_WORKER_PROVE_REPLICA_UPDATE2"},
},
&cli.BoolFlag{
Name: "regen-sector-key",
Usage: "enable regen sector key",
Value: true,
Name: "regen-sector-key",
Usage: "enable regen sector key",
Value: true,
EnvVars: []string{"LOTUS_WORKER_REGEN_SECTOR_KEY"},
},
&cli.BoolFlag{
Name: "windowpost",
Usage: "enable window post",
Value: false,
Name: "windowpost",
Usage: "enable window post",
Value: false,
EnvVars: []string{"LOTUS_WORKER_WINDOWPOST"},
},
&cli.BoolFlag{
Name: "winningpost",
Usage: "enable winning post",
Value: false,
Name: "winningpost",
Usage: "enable winning post",
Value: false,
EnvVars: []string{"LOTUS_WORKER_WINNINGPOST"},
},
&cli.BoolFlag{
Name: "no-default",
Usage: "disable all default compute tasks, use the worker for storage/fetching only",
Value: false,
Name: "no-default",
Usage: "disable all default compute tasks, use the worker for storage/fetching only",
Value: false,
EnvVars: []string{"LOTUS_WORKER_NO_DEFAULT"},
},
&cli.IntFlag{
Name: "parallel-fetch-limit",
Usage: "maximum fetch operations to run in parallel",
Value: 5,
Name: "parallel-fetch-limit",
Usage: "maximum fetch operations to run in parallel",
Value: 5,
EnvVars: []string{"LOTUS_WORKER_PARALLEL_FETCH_LIMIT"},
},
&cli.IntFlag{
Name: "post-parallel-reads",
Usage: "maximum number of parallel challenge reads (0 = no limit)",
Value: 128,
Name: "post-parallel-reads",
Usage: "maximum number of parallel challenge reads (0 = no limit)",
Value: 128,
EnvVars: []string{"LOTUS_WORKER_POST_PARALLEL_READS"},
},
&cli.DurationFlag{
Name: "post-read-timeout",
Usage: "time limit for reading PoSt challenges (0 = no limit)",
Value: 0,
Name: "post-read-timeout",
Usage: "time limit for reading PoSt challenges (0 = no limit)",
Value: 0,
EnvVars: []string{"LOTUS_WORKER_POST_READ_TIMEOUT"},
},
&cli.StringFlag{
Name: "timeout",
Usage: "used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function",
Value: "30m",
Name: "timeout",
Usage: "used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function",
Value: "30m",
EnvVars: []string{"LOTUS_WORKER_TIMEOUT"},
},
},
Before: func(cctx *cli.Context) error {

View File

@ -128,6 +128,7 @@
* [RuntimeSubsystems](#RuntimeSubsystems)
* [Sealing](#Sealing)
* [SealingAbort](#SealingAbort)
* [SealingRemoveRequest](#SealingRemoveRequest)
* [SealingSchedDiag](#SealingSchedDiag)
* [Sector](#Sector)
* [SectorAbortUpgrade](#SectorAbortUpgrade)
@ -2752,6 +2753,21 @@ Inputs:
Response: `{}`
### SealingRemoveRequest
SealingSchedRemove removes a request from sealing pipeline
Perms: admin
Inputs:
```json
[
"07070707-0707-0707-0707-070707070707"
]
```
Response: `{}`
### SealingSchedDiag
SealingSchedDiag dumps internal sealing scheduler state

View File

@ -6195,7 +6195,7 @@ Response:
### StateReplay
StateReplay replays a given message, assuming it was included in a block in the specified tipset.
If a tipset key is provided, and a replacing message is found on chain,
If a tipset key is provided, and a replacing message is not found on chain,
the method will return an error saying that the message wasn't found
If no tipset key is provided, the appropriate tipset is looked up, and if

View File

@ -6663,7 +6663,7 @@ Response:
### StateReplay
StateReplay replays a given message, assuming it was included in a block in the specified tipset.
If a tipset key is provided, and a replacing message is found on chain,
If a tipset key is provided, and a replacing message is not found on chain,
the method will return an error saying that the message wasn't found
If no tipset key is provided, the appropriate tipset is looked up, and if

View File

@ -2372,7 +2372,7 @@ USAGE:
lotus-miner sealing abort [command options] [callid]
OPTIONS:
--help, -h show help (default: false)
--sched Specifies that the argument is UUID of the request to be removed from scheduler (default: false)
```

View File

@ -21,7 +21,7 @@ COMMANDS:
help, h Shows a list of commands or help for one command
GLOBAL OPTIONS:
--enable-gpu-proving enable use of GPU for mining operations (default: true)
--enable-gpu-proving enable use of GPU for mining operations (default: true) [$LOTUS_WORKER_ENABLE_GPU_PROVING]
--help, -h show help (default: false)
--miner-repo value, --storagerepo value Specify miner repo path. flag storagerepo and env LOTUS_STORAGE_PATH are DEPRECATION, will REMOVE SOON (default: "~/.lotusminer") [$LOTUS_MINER_PATH, $LOTUS_STORAGE_PATH]
--version, -v print the version (default: false)
@ -38,25 +38,25 @@ USAGE:
lotus-worker run [command options] [arguments...]
OPTIONS:
--addpiece enable addpiece (default: true)
--commit enable commit (32G sectors: all cores or GPUs, 128GiB Memory + 64GiB swap) (default: true)
--listen value host address and port the worker api will listen on (default: "0.0.0.0:3456")
--addpiece enable addpiece (default: true) [$LOTUS_WORKER_ADDPIECE]
--commit enable commit (32G sectors: all cores or GPUs, 128GiB Memory + 64GiB swap) (default: true) [$LOTUS_WORKER_COMMIT]
--listen value host address and port the worker api will listen on (default: "0.0.0.0:3456") [$LOTUS_WORKER_LISTEN]
--name value custom worker name (default: hostname) [$LOTUS_WORKER_NAME]
--no-default disable all default compute tasks, use the worker for storage/fetching only (default: false)
--no-local-storage don't use storageminer repo for sector storage (default: false)
--no-swap don't use swap (default: false)
--parallel-fetch-limit value maximum fetch operations to run in parallel (default: 5)
--post-parallel-reads value maximum number of parallel challenge reads (0 = no limit) (default: 128)
--post-read-timeout value time limit for reading PoSt challenges (0 = no limit) (default: 0s)
--precommit1 enable precommit1 (32G sectors: 1 core, 128GiB Memory) (default: true)
--precommit2 enable precommit2 (32G sectors: all cores, 96GiB Memory) (default: true)
--prove-replica-update2 enable prove replica update 2 (default: true)
--regen-sector-key enable regen sector key (default: true)
--replica-update enable replica update (default: true)
--timeout value used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function (default: "30m")
--unseal enable unsealing (32G sectors: 1 core, 128GiB Memory) (default: true)
--windowpost enable window post (default: false)
--winningpost enable winning post (default: false)
--no-default disable all default compute tasks, use the worker for storage/fetching only (default: false) [$LOTUS_WORKER_NO_DEFAULT]
--no-local-storage don't use storageminer repo for sector storage (default: false) [$LOTUS_WORKER_NO_LOCAL_STORAGE]
--no-swap don't use swap (default: false) [$LOTUS_WORKER_NO_SWAP]
--parallel-fetch-limit value maximum fetch operations to run in parallel (default: 5) [$LOTUS_WORKER_PARALLEL_FETCH_LIMIT]
--post-parallel-reads value maximum number of parallel challenge reads (0 = no limit) (default: 128) [$LOTUS_WORKER_POST_PARALLEL_READS]
--post-read-timeout value time limit for reading PoSt challenges (0 = no limit) (default: 0s) [$LOTUS_WORKER_POST_READ_TIMEOUT]
--precommit1 enable precommit1 (32G sectors: 1 core, 128GiB Memory) (default: true) [$LOTUS_WORKER_PRECOMMIT1]
--precommit2 enable precommit2 (32G sectors: all cores, 96GiB Memory) (default: true) [$LOTUS_WORKER_PRECOMMIT2]
--prove-replica-update2 enable prove replica update 2 (default: true) [$LOTUS_WORKER_PROVE_REPLICA_UPDATE2]
--regen-sector-key enable regen sector key (default: true) [$LOTUS_WORKER_REGEN_SECTOR_KEY]
--replica-update enable replica update (default: true) [$LOTUS_WORKER_REPLICA_UPDATE]
--timeout value used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function (default: "30m") [$LOTUS_WORKER_TIMEOUT]
--unseal enable unsealing (32G sectors: 1 core, 128GiB Memory) (default: true) [$LOTUS_WORKER_UNSEAL]
--windowpost enable window post (default: false) [$LOTUS_WORKER_WINDOWPOST]
--winningpost enable winning post (default: false) [$LOTUS_WORKER_WINNINGPOST]
```

2
go.mod
View File

@ -125,7 +125,7 @@ require (
github.com/multiformats/go-base32 v0.0.4
github.com/multiformats/go-multiaddr v0.5.0
github.com/multiformats/go-multiaddr-dns v0.3.1
github.com/multiformats/go-multibase v0.0.3
github.com/multiformats/go-multibase v0.1.1
github.com/multiformats/go-multihash v0.1.0
github.com/multiformats/go-varint v0.0.6
github.com/open-rpc/meta-schema v0.0.0-20201029221707-1b72ef2ea333

3
go.sum
View File

@ -1655,8 +1655,9 @@ github.com/multiformats/go-multiaddr-net v0.1.4/go.mod h1:ilNnaM9HbmVFqsb/qcNysj
github.com/multiformats/go-multiaddr-net v0.1.5/go.mod h1:ilNnaM9HbmVFqsb/qcNysjCu4PVONlrBZpHIrw/qQuA=
github.com/multiformats/go-multiaddr-net v0.2.0/go.mod h1:gGdH3UXny6U3cKKYCvpXI5rnK7YaOIEOPVDI9tsJbEA=
github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs=
github.com/multiformats/go-multibase v0.0.3 h1:l/B6bJDQjvQ5G52jw4QGSYeOTZoAwIO77RblWplfIqk=
github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc=
github.com/multiformats/go-multibase v0.1.1 h1:3ASCDsuLX8+j4kx58qnJ4YFq/JWTJpCyDW27ztsVTOI=
github.com/multiformats/go-multibase v0.1.1/go.mod h1:ZEjHE+IsUrgp5mhlEAYjMtZwK1k4haNkcaPg9aoe1a8=
github.com/multiformats/go-multicodec v0.2.0/go.mod h1:/y4YVwkfMyry5kFbMTbLJKErhycTIftytRV+llXdyS4=
github.com/multiformats/go-multicodec v0.3.0/go.mod h1:qGGaQmioCDh+TeFOnxrbU0DaIPw8yFgAZgFG0V7p1qQ=
github.com/multiformats/go-multicodec v0.3.1-0.20210902112759-1539a079fd61/go.mod h1:1Hj/eHRaVWSXiSNNfcEPcwZleTmdNP81xlxDLnWU9GQ=

View File

@ -2,11 +2,13 @@ package itests
import (
"context"
"encoding/json"
"strings"
"sync/atomic"
"testing"
"time"
"github.com/google/uuid"
logging "github.com/ipfs/go-log/v2"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
@ -14,6 +16,7 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/itests/kit"
@ -21,6 +24,7 @@ import (
"github.com/filecoin-project/lotus/node/impl"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage/paths"
sealing "github.com/filecoin-project/lotus/storage/pipeline"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
"github.com/filecoin-project/lotus/storage/wdpost"
@ -402,6 +406,90 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) {
require.Len(t, lastPending, 0)
}
func TestSchedulerRemoveRequest(t *testing.T) {
ctx := context.Background()
_, miner, worker, ens := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true),
kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTDataCid, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit2, sealtasks.TTUnseal})) // no mock proofs
ens.InterconnectAll().BeginMining(50 * time.Millisecond)
e, err := worker.Enabled(ctx)
require.NoError(t, err)
require.True(t, e)
type info struct {
CallToWork struct {
} `json:"CallToWork"`
EarlyRet interface{} `json:"EarlyRet"`
ReturnedWork interface{} `json:"ReturnedWork"`
SchedInfo struct {
OpenWindows []string `json:"OpenWindows"`
Requests []struct {
Priority int `json:"Priority"`
SchedID string `json:"SchedId"`
Sector struct {
Miner int `json:"Miner"`
Number int `json:"Number"`
} `json:"Sector"`
TaskType string `json:"TaskType"`
} `json:"Requests"`
} `json:"SchedInfo"`
Waiting interface{} `json:"Waiting"`
}
tocheck := miner.StartPledge(ctx, 1, 0, nil)
var sn abi.SectorNumber
for n := range tocheck {
sn = n
}
// Keep checking till sector state is PC2, the request should get stuck as worker cannot process PC2
for {
st, err := miner.SectorsStatus(ctx, sn, false)
require.NoError(t, err)
if st.State == api.SectorState(sealing.PreCommit2) {
break
}
time.Sleep(time.Second)
}
// Dump current scheduler info
schedb, err := miner.SealingSchedDiag(ctx, false)
require.NoError(t, err)
j, err := json.MarshalIndent(&schedb, "", " ")
require.NoError(t, err)
var b info
err = json.Unmarshal(j, &b)
require.NoError(t, err)
var schedidb uuid.UUID
// cast scheduler info and get the request UUID. Call the SealingRemoveRequest()
require.Len(t, b.SchedInfo.Requests, 1)
require.Equal(t, "seal/v0/precommit/2", b.SchedInfo.Requests[0].TaskType)
schedidb, err = uuid.Parse(b.SchedInfo.Requests[0].SchedID)
require.NoError(t, err)
err = miner.SealingRemoveRequest(ctx, schedidb)
require.NoError(t, err)
// Dump the schduler again and compare the UUID if a request is present
// If no request present then pass the test
scheda, err := miner.SealingSchedDiag(ctx, false)
require.NoError(t, err)
k, err := json.MarshalIndent(&scheda, "", " ")
require.NoError(t, err)
var a info
err = json.Unmarshal(k, &a)
require.NoError(t, err)
require.Len(t, a.SchedInfo.Requests, 0)
}
func TestWorkerName(t *testing.T) {
name := "thisstringisprobablynotahostnameihope"

View File

@ -462,6 +462,10 @@ func (sm *StorageMinerAPI) SealingAbort(ctx context.Context, call storiface.Call
return sm.StorageMgr.Abort(ctx, call)
}
func (sm *StorageMinerAPI) SealingRemoveRequest(ctx context.Context, schedId uuid.UUID) error {
return sm.StorageMgr.RemoveSchedRequest(ctx, schedId)
}
func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error {
fi, err := os.Open(path)
if err != nil {

View File

@ -1223,6 +1223,10 @@ func (m *Manager) SchedDiag(ctx context.Context, doSched bool) (interface{}, err
return i, nil
}
func (m *Manager) RemoveSchedRequest(ctx context.Context, schedId uuid.UUID) error {
return m.sched.RemoveRequest(ctx, schedId)
}
func (m *Manager) Close(ctx context.Context) error {
m.windowPoStSched.schedClose()
m.winningPoStSched.schedClose()

View File

@ -68,7 +68,8 @@ type Scheduler struct {
workTracker *workTracker
info chan func(interface{})
info chan func(interface{})
rmRequest chan *rmRequest
closing chan struct{}
closed chan struct{}
@ -122,6 +123,7 @@ type WorkerRequest struct {
TaskType sealtasks.TaskType
Priority int // larger values more important
Sel WorkerSelector
SchedId uuid.UUID
prepare WorkerAction
work WorkerAction
@ -139,6 +141,11 @@ type workerResponse struct {
err error
}
type rmRequest struct {
id uuid.UUID
res chan error
}
func newScheduler(assigner string) (*Scheduler, error) {
var a Assigner
switch assigner {
@ -168,7 +175,8 @@ func newScheduler(assigner string) (*Scheduler, error) {
prepared: map[uuid.UUID]trackedWork{},
},
info: make(chan func(interface{})),
info: make(chan func(interface{})),
rmRequest: make(chan *rmRequest),
closing: make(chan struct{}),
closed: make(chan struct{}),
@ -184,6 +192,7 @@ func (sh *Scheduler) Schedule(ctx context.Context, sector storiface.SectorRef, t
TaskType: taskType,
Priority: getPriority(ctx),
Sel: sel,
SchedId: uuid.New(),
prepare: prepare,
work: work,
@ -228,6 +237,7 @@ type SchedDiagRequestInfo struct {
Sector abi.SectorID
TaskType sealtasks.TaskType
Priority int
SchedId uuid.UUID
}
type SchedDiagInfo struct {
@ -246,6 +256,9 @@ func (sh *Scheduler) runSched() {
var toDisable []workerDisableReq
select {
case rmreq := <-sh.rmRequest:
sh.removeRequest(rmreq)
doSched = true
case <-sh.workerChange:
doSched = true
case dreq := <-sh.workerDisable:
@ -263,7 +276,6 @@ func (sh *Scheduler) runSched() {
doSched = true
case ireq := <-sh.info:
ireq(sh.diag())
case <-iw:
initialised = true
iw = nil
@ -332,6 +344,7 @@ func (sh *Scheduler) diag() SchedDiagInfo {
Sector: task.Sector.ID,
TaskType: task.TaskType,
Priority: task.Priority,
SchedId: task.SchedId,
})
}
@ -381,6 +394,49 @@ func (sh *Scheduler) Info(ctx context.Context) (interface{}, error) {
}
}
func (sh *Scheduler) removeRequest(rmrequest *rmRequest) {
if sh.SchedQueue.Len() < 0 {
rmrequest.res <- xerrors.New("No requests in the scheduler")
return
}
queue := sh.SchedQueue
for i, r := range *queue {
if r.SchedId == rmrequest.id {
queue.Remove(i)
rmrequest.res <- nil
go r.respond(xerrors.Errorf("scheduling request removed"))
return
}
}
rmrequest.res <- xerrors.New("No request with provided details found")
}
func (sh *Scheduler) RemoveRequest(ctx context.Context, schedId uuid.UUID) error {
ret := make(chan error, 1)
select {
case sh.rmRequest <- &rmRequest{
id: schedId,
res: ret,
}:
case <-sh.closing:
return xerrors.New("closing")
case <-ctx.Done():
return ctx.Err()
}
select {
case resp := <-ret:
return resp
case <-sh.closing:
return xerrors.New("closing")
case <-ctx.Done():
return ctx.Err()
}
}
func (sh *Scheduler) Close(ctx context.Context) error {
close(sh.closing)
select {