cli for managing sector reservations
This commit is contained in:
parent
ca72590e49
commit
ef2080a800
@ -10,6 +10,7 @@ import (
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-bitfield"
|
||||
datatransfer "github.com/filecoin-project/go-data-transfer"
|
||||
"github.com/filecoin-project/go-fil-markets/piecestore"
|
||||
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
||||
@ -123,6 +124,12 @@ type StorageMiner interface {
|
||||
// SectorAbortUpgrade can be called on sectors that are in the process of being upgraded to abort it
|
||||
SectorAbortUpgrade(context.Context, abi.SectorNumber) error //perm:admin
|
||||
|
||||
// SectorNumAssignerMeta returns sector number assigner metadata - reserved/allocated
|
||||
SectorNumAssignerMeta(ctx context.Context) (NumAssignerMeta, error)
|
||||
SectorNumReservations(ctx context.Context) (map[string]bitfield.BitField, error)
|
||||
SectorNumReserve(ctx context.Context, name string, field bitfield.BitField, force bool) error
|
||||
SectorNumFree(ctx context.Context, name string) error
|
||||
|
||||
// WorkerConnect tells the node to connect to workers RPC
|
||||
WorkerConnect(context.Context, string) error //perm:admin retry:true
|
||||
WorkerStats(context.Context) (map[uuid.UUID]storiface.WorkerStats, error) //perm:admin
|
||||
@ -468,3 +475,10 @@ type DagstoreInitializeAllEvent struct {
|
||||
Total int
|
||||
Current int
|
||||
}
|
||||
|
||||
type NumAssignerMeta struct {
|
||||
Reserved bitfield.BitField
|
||||
Allocated bitfield.BitField
|
||||
|
||||
Next abi.SectorNumber
|
||||
}
|
||||
|
@ -336,6 +336,9 @@ func init() {
|
||||
Conns: 4,
|
||||
FD: 5,
|
||||
})
|
||||
addExample(map[string]bitfield.BitField{
|
||||
"": bitfield.NewFromSet([]uint64{5, 6, 7, 10}),
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
|
@ -824,6 +824,14 @@ type StorageMinerStruct struct {
|
||||
|
||||
SectorMatchPendingPiecesToOpenSectors func(p0 context.Context) error `perm:"admin"`
|
||||
|
||||
SectorNumAssignerMeta func(p0 context.Context) (NumAssignerMeta, error) ``
|
||||
|
||||
SectorNumFree func(p0 context.Context, p1 string) error ``
|
||||
|
||||
SectorNumReservations func(p0 context.Context) (map[string]bitfield.BitField, error) ``
|
||||
|
||||
SectorNumReserve func(p0 context.Context, p1 string, p2 bitfield.BitField, p3 bool) error ``
|
||||
|
||||
SectorPreCommitFlush func(p0 context.Context) ([]sealiface.PreCommitBatchRes, error) `perm:"admin"`
|
||||
|
||||
SectorPreCommitPending func(p0 context.Context) ([]abi.SectorID, error) `perm:"admin"`
|
||||
@ -4900,6 +4908,50 @@ func (s *StorageMinerStub) SectorMatchPendingPiecesToOpenSectors(p0 context.Cont
|
||||
return ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *StorageMinerStruct) SectorNumAssignerMeta(p0 context.Context) (NumAssignerMeta, error) {
|
||||
if s.Internal.SectorNumAssignerMeta == nil {
|
||||
return *new(NumAssignerMeta), ErrNotSupported
|
||||
}
|
||||
return s.Internal.SectorNumAssignerMeta(p0)
|
||||
}
|
||||
|
||||
func (s *StorageMinerStub) SectorNumAssignerMeta(p0 context.Context) (NumAssignerMeta, error) {
|
||||
return *new(NumAssignerMeta), ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *StorageMinerStruct) SectorNumFree(p0 context.Context, p1 string) error {
|
||||
if s.Internal.SectorNumFree == nil {
|
||||
return ErrNotSupported
|
||||
}
|
||||
return s.Internal.SectorNumFree(p0, p1)
|
||||
}
|
||||
|
||||
func (s *StorageMinerStub) SectorNumFree(p0 context.Context, p1 string) error {
|
||||
return ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *StorageMinerStruct) SectorNumReservations(p0 context.Context) (map[string]bitfield.BitField, error) {
|
||||
if s.Internal.SectorNumReservations == nil {
|
||||
return *new(map[string]bitfield.BitField), ErrNotSupported
|
||||
}
|
||||
return s.Internal.SectorNumReservations(p0)
|
||||
}
|
||||
|
||||
func (s *StorageMinerStub) SectorNumReservations(p0 context.Context) (map[string]bitfield.BitField, error) {
|
||||
return *new(map[string]bitfield.BitField), ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *StorageMinerStruct) SectorNumReserve(p0 context.Context, p1 string, p2 bitfield.BitField, p3 bool) error {
|
||||
if s.Internal.SectorNumReserve == nil {
|
||||
return ErrNotSupported
|
||||
}
|
||||
return s.Internal.SectorNumReserve(p0, p1, p2, p3)
|
||||
}
|
||||
|
||||
func (s *StorageMinerStub) SectorNumReserve(p0 context.Context, p1 string, p2 bitfield.BitField, p3 bool) error {
|
||||
return ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *StorageMinerStruct) SectorPreCommitFlush(p0 context.Context) ([]sealiface.PreCommitBatchRes, error) {
|
||||
if s.Internal.SectorPreCommitFlush == nil {
|
||||
return *new([]sealiface.PreCommitBatchRes), ErrNotSupported
|
||||
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -18,6 +18,7 @@ import (
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-bitfield"
|
||||
rlepluslazy "github.com/filecoin-project/go-bitfield/rle"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-state-types/big"
|
||||
"github.com/filecoin-project/go-state-types/builtin"
|
||||
@ -45,6 +46,7 @@ var sectorsCmd = &cli.Command{
|
||||
sectorsRefsCmd,
|
||||
sectorsUpdateCmd,
|
||||
sectorsPledgeCmd,
|
||||
sectorsNumbersCmd,
|
||||
sectorPreCommitsCmd,
|
||||
sectorsCheckExpireCmd,
|
||||
sectorsExpiredCmd,
|
||||
@ -2201,3 +2203,229 @@ var sectorsCompactPartitionsCmd = &cli.Command{
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
var sectorsNumbersCmd = &cli.Command{
|
||||
Name: "numbers",
|
||||
Usage: "manage sector number assignments",
|
||||
Subcommands: []*cli.Command{
|
||||
sectorsNumbersInfoCmd,
|
||||
sectorsNumbersReservationsCmd,
|
||||
sectorsNumbersReserveCmd,
|
||||
sectorsNumbersFreeCmd,
|
||||
},
|
||||
}
|
||||
|
||||
var sectorsNumbersInfoCmd = &cli.Command{
|
||||
Name: "info",
|
||||
Usage: "view sector assigner state",
|
||||
Action: func(cctx *cli.Context) error {
|
||||
api, closer, err := lcli.GetStorageMinerAPI(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer closer()
|
||||
ctx := lcli.ReqContext(cctx)
|
||||
|
||||
am, err := api.SectorNumAssignerMeta(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
alloc, err := bitfieldToHumanRanges(am.Allocated)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
reserved, err := bitfieldToHumanRanges(am.Reserved)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fmt.Printf("Next free: %s\n", am.Next)
|
||||
fmt.Printf("Allocated: %s\n", alloc)
|
||||
fmt.Printf("Reserved: %s\n", reserved)
|
||||
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
var sectorsNumbersReservationsCmd = &cli.Command{
|
||||
Name: "reservations",
|
||||
Usage: "list sector number reservations",
|
||||
Action: func(cctx *cli.Context) error {
|
||||
api, closer, err := lcli.GetStorageMinerAPI(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer closer()
|
||||
ctx := lcli.ReqContext(cctx)
|
||||
|
||||
rs, err := api.SectorNumReservations(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var out []string
|
||||
|
||||
for name, field := range rs {
|
||||
hr, err := bitfieldToHumanRanges(field)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
count, err := field.Count()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
out = append(out, fmt.Sprintf("%s: count=%d %s", name, count, hr))
|
||||
}
|
||||
|
||||
fmt.Printf("reservations: %d\n", len(out))
|
||||
|
||||
sort.Strings(out)
|
||||
|
||||
for _, s := range out {
|
||||
fmt.Println(s)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
var sectorsNumbersReserveCmd = &cli.Command{
|
||||
Name: "reserve",
|
||||
Usage: "create sector number reservations",
|
||||
Flags: []cli.Flag{
|
||||
&cli.BoolFlag{
|
||||
Name: "force",
|
||||
Usage: "skip duplicate reservation checks (note: can lead to damaging other reservations on free)",
|
||||
},
|
||||
},
|
||||
ArgsUsage: "[reservation name] [reserved ranges]",
|
||||
Action: func(cctx *cli.Context) error {
|
||||
api, closer, err := lcli.GetStorageMinerAPI(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer closer()
|
||||
ctx := lcli.ReqContext(cctx)
|
||||
|
||||
if cctx.Args().Len() != 2 {
|
||||
return xerrors.Errorf("expected 2 arguments: [reservation name] [reserved ranges]")
|
||||
}
|
||||
|
||||
bf, err := humanRangesToBitField(cctx.Args().Get(1))
|
||||
if err != nil {
|
||||
return xerrors.Errorf("parsing ranges: %w", err)
|
||||
}
|
||||
|
||||
return api.SectorNumReserve(ctx, cctx.Args().First(), bf, cctx.Bool("force"))
|
||||
},
|
||||
}
|
||||
|
||||
var sectorsNumbersFreeCmd = &cli.Command{
|
||||
Name: "free",
|
||||
Usage: "remove sector number reservations",
|
||||
ArgsUsage: "[reservation name]",
|
||||
Action: func(cctx *cli.Context) error {
|
||||
api, closer, err := lcli.GetStorageMinerAPI(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer closer()
|
||||
ctx := lcli.ReqContext(cctx)
|
||||
|
||||
if cctx.Args().Len() != 1 {
|
||||
return xerrors.Errorf("expected 1 argument: [reservation name]")
|
||||
}
|
||||
|
||||
return api.SectorNumFree(ctx, cctx.Args().First())
|
||||
},
|
||||
}
|
||||
|
||||
func humanRangesToBitField(h string) (bitfield.BitField, error) {
|
||||
var runs []rlepluslazy.Run
|
||||
var last uint64
|
||||
|
||||
strRanges := strings.Split(h, ",")
|
||||
for i, strRange := range strRanges {
|
||||
lr := strings.Split(strRange, "-")
|
||||
|
||||
var start, end uint64
|
||||
var err error
|
||||
|
||||
switch len(lr) {
|
||||
case 1: // one number
|
||||
start, err = strconv.ParseUint(lr[0], 10, 64)
|
||||
if err != nil {
|
||||
return bitfield.BitField{}, xerrors.Errorf("parsing left side of run %d: %w", i, err)
|
||||
}
|
||||
|
||||
end = start
|
||||
case 2: // x-y
|
||||
start, err = strconv.ParseUint(lr[0], 10, 64)
|
||||
if err != nil {
|
||||
return bitfield.BitField{}, xerrors.Errorf("parsing left side of run %d: %w", i, err)
|
||||
}
|
||||
end, err = strconv.ParseUint(lr[1], 10, 64)
|
||||
if err != nil {
|
||||
return bitfield.BitField{}, xerrors.Errorf("parsing right side of run %d: %w", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
if start < last {
|
||||
return bitfield.BitField{}, xerrors.Errorf("run %d start(%d) was less than last run end(%d)", i, start, last)
|
||||
}
|
||||
|
||||
if start == last && last > 0 {
|
||||
return bitfield.BitField{}, xerrors.Errorf("run %d start(%d) was equal to last run end(%d)", i, start, last)
|
||||
}
|
||||
|
||||
if start > end {
|
||||
return bitfield.BitField{}, xerrors.Errorf("run start(%d) can't be greater than run end(%d) (run %d)", start, end, i)
|
||||
}
|
||||
|
||||
if start > last {
|
||||
runs = append(runs, rlepluslazy.Run{Val: false, Len: start - last})
|
||||
}
|
||||
|
||||
runs = append(runs, rlepluslazy.Run{Val: true, Len: end - start + 1})
|
||||
last = end + 1
|
||||
}
|
||||
|
||||
return bitfield.NewFromIter(&rlepluslazy.RunSliceIterator{Runs: runs})
|
||||
}
|
||||
|
||||
func bitfieldToHumanRanges(bf bitfield.BitField) (string, error) {
|
||||
bj, err := bf.MarshalJSON()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
var bints []int64
|
||||
if err := json.Unmarshal(bj, &bints); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
var at int64
|
||||
var out string
|
||||
|
||||
for i, bi := range bints {
|
||||
at += bi
|
||||
|
||||
if i%2 == 0 {
|
||||
if i > 0 {
|
||||
out += ","
|
||||
}
|
||||
out += fmt.Sprint(at)
|
||||
continue
|
||||
}
|
||||
|
||||
if bi > 1 {
|
||||
out += "-"
|
||||
out += fmt.Sprint(at - 1)
|
||||
}
|
||||
}
|
||||
|
||||
return out, err
|
||||
}
|
||||
|
@ -1 +1,57 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/filecoin-project/go-bitfield"
|
||||
)
|
||||
|
||||
func TestHumanBitfield(t *testing.T) {
|
||||
check := func(ints []uint64, out string) {
|
||||
bf := bitfield.NewFromSet(ints)
|
||||
h, err := bitfieldToHumanRanges(bf)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, out, h)
|
||||
}
|
||||
|
||||
check([]uint64{2, 3, 4}, "2-4")
|
||||
check([]uint64{2, 3, 4, 8, 9, 10, 11}, "2-4,8-11")
|
||||
check([]uint64{2}, "2")
|
||||
check([]uint64{0}, "0")
|
||||
check([]uint64{0, 1, 2}, "0-2")
|
||||
check([]uint64{0, 1, 5, 9, 11, 13, 14, 19}, "0-1,5,9,11,13-14,19")
|
||||
}
|
||||
|
||||
func TestHumanBitfieldRoundtrip(t *testing.T) {
|
||||
check := func(ints []uint64, out string) {
|
||||
parsed, err := humanRangesToBitField(out)
|
||||
require.NoError(t, err)
|
||||
|
||||
h, err := bitfieldToHumanRanges(parsed)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, out, h)
|
||||
|
||||
bf := bitfield.NewFromSet(ints)
|
||||
ins, err := bitfield.IntersectBitField(bf, parsed)
|
||||
require.NoError(t, err)
|
||||
|
||||
// if intersected bitfield has the same length as both bitfields they are the same
|
||||
ic, err := ins.Count()
|
||||
require.NoError(t, err)
|
||||
|
||||
pc, err := parsed.Count()
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, uint64(len(ints)), ic)
|
||||
require.Equal(t, ic, pc)
|
||||
}
|
||||
|
||||
check([]uint64{2, 3, 4}, "2-4")
|
||||
check([]uint64{2, 3, 4, 8, 9, 10, 11}, "2-4,8-11")
|
||||
check([]uint64{2}, "2")
|
||||
check([]uint64{0}, "0")
|
||||
check([]uint64{0, 1, 2}, "0-2")
|
||||
check([]uint64{0, 1, 5, 9, 11, 13, 14, 19}, "0-1,5,9,11,13-14,19")
|
||||
}
|
||||
|
@ -139,6 +139,10 @@
|
||||
* [SectorGetSealDelay](#SectorGetSealDelay)
|
||||
* [SectorMarkForUpgrade](#SectorMarkForUpgrade)
|
||||
* [SectorMatchPendingPiecesToOpenSectors](#SectorMatchPendingPiecesToOpenSectors)
|
||||
* [SectorNumAssignerMeta](#SectorNumAssignerMeta)
|
||||
* [SectorNumFree](#SectorNumFree)
|
||||
* [SectorNumReservations](#SectorNumReservations)
|
||||
* [SectorNumReserve](#SectorNumReserve)
|
||||
* [SectorPreCommitFlush](#SectorPreCommitFlush)
|
||||
* [SectorPreCommitPending](#SectorPreCommitPending)
|
||||
* [SectorRemove](#SectorRemove)
|
||||
@ -2938,6 +2942,81 @@ Inputs: `null`
|
||||
|
||||
Response: `{}`
|
||||
|
||||
### SectorNumAssignerMeta
|
||||
SectorNumAssignerMeta returns sector number assigner metadata - reserved/allocated
|
||||
|
||||
|
||||
Perms:
|
||||
|
||||
Inputs: `null`
|
||||
|
||||
Response:
|
||||
```json
|
||||
{
|
||||
"Reserved": [
|
||||
5,
|
||||
1
|
||||
],
|
||||
"Allocated": [
|
||||
5,
|
||||
1
|
||||
],
|
||||
"Next": 9
|
||||
}
|
||||
```
|
||||
|
||||
### SectorNumFree
|
||||
There are not yet any comments for this method.
|
||||
|
||||
Perms:
|
||||
|
||||
Inputs:
|
||||
```json
|
||||
[
|
||||
"string value"
|
||||
]
|
||||
```
|
||||
|
||||
Response: `{}`
|
||||
|
||||
### SectorNumReservations
|
||||
There are not yet any comments for this method.
|
||||
|
||||
Perms:
|
||||
|
||||
Inputs: `null`
|
||||
|
||||
Response:
|
||||
```json
|
||||
{
|
||||
"": [
|
||||
5,
|
||||
3,
|
||||
2,
|
||||
1
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
### SectorNumReserve
|
||||
There are not yet any comments for this method.
|
||||
|
||||
Perms:
|
||||
|
||||
Inputs:
|
||||
```json
|
||||
[
|
||||
"string value",
|
||||
[
|
||||
5,
|
||||
1
|
||||
],
|
||||
true
|
||||
]
|
||||
```
|
||||
|
||||
Response: `{}`
|
||||
|
||||
### SectorPreCommitFlush
|
||||
SectorPreCommitFlush immediately sends a PreCommit message with sectors batched for PreCommit.
|
||||
Returns null if message wasn't sent
|
||||
|
@ -1637,6 +1637,7 @@ COMMANDS:
|
||||
refs List References to sectors
|
||||
update-state ADVANCED: manually update the state of a sector, this may aid in error recovery
|
||||
pledge store random data in a sector
|
||||
numbers manage sector number assignments
|
||||
precommits Print on-chain precommit info
|
||||
check-expire Inspect expiring sectors
|
||||
expired Get or cleanup expired sectors
|
||||
@ -1734,6 +1735,78 @@ OPTIONS:
|
||||
|
||||
```
|
||||
|
||||
### lotus-miner sectors numbers
|
||||
```
|
||||
NAME:
|
||||
lotus-miner sectors numbers - manage sector number assignments
|
||||
|
||||
USAGE:
|
||||
lotus-miner sectors numbers command [command options] [arguments...]
|
||||
|
||||
COMMANDS:
|
||||
info view sector assigner state
|
||||
reservations list sector number reservations
|
||||
reserve create sector number reservations
|
||||
free remove sector number reservations
|
||||
help, h Shows a list of commands or help for one command
|
||||
|
||||
OPTIONS:
|
||||
--help, -h show help (default: false)
|
||||
|
||||
```
|
||||
|
||||
#### lotus-miner sectors numbers info
|
||||
```
|
||||
NAME:
|
||||
lotus-miner sectors numbers info - view sector assigner state
|
||||
|
||||
USAGE:
|
||||
lotus-miner sectors numbers info [command options] [arguments...]
|
||||
|
||||
OPTIONS:
|
||||
--help, -h show help (default: false)
|
||||
|
||||
```
|
||||
|
||||
#### lotus-miner sectors numbers reservations
|
||||
```
|
||||
NAME:
|
||||
lotus-miner sectors numbers reservations - list sector number reservations
|
||||
|
||||
USAGE:
|
||||
lotus-miner sectors numbers reservations [command options] [arguments...]
|
||||
|
||||
OPTIONS:
|
||||
--help, -h show help (default: false)
|
||||
|
||||
```
|
||||
|
||||
#### lotus-miner sectors numbers reserve
|
||||
```
|
||||
NAME:
|
||||
lotus-miner sectors numbers reserve - create sector number reservations
|
||||
|
||||
USAGE:
|
||||
lotus-miner sectors numbers reserve [command options] [reservation name] [reserved ranges]
|
||||
|
||||
OPTIONS:
|
||||
--force skip duplicate reservation checks (note: can lead to damaging other reservations on free) (default: false)
|
||||
|
||||
```
|
||||
|
||||
#### lotus-miner sectors numbers free
|
||||
```
|
||||
NAME:
|
||||
lotus-miner sectors numbers free - remove sector number reservations
|
||||
|
||||
USAGE:
|
||||
lotus-miner sectors numbers free [command options] [reservation name]
|
||||
|
||||
OPTIONS:
|
||||
--help, -h show help (default: false)
|
||||
|
||||
```
|
||||
|
||||
### lotus-miner sectors precommits
|
||||
```
|
||||
NAME:
|
||||
|
@ -24,6 +24,7 @@ import (
|
||||
"github.com/filecoin-project/dagstore"
|
||||
"github.com/filecoin-project/dagstore/shard"
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-bitfield"
|
||||
datatransfer "github.com/filecoin-project/go-data-transfer"
|
||||
gst "github.com/filecoin-project/go-data-transfer/transport/graphsync"
|
||||
"github.com/filecoin-project/go-fil-markets/piecestore"
|
||||
@ -427,6 +428,22 @@ func (sm *StorageMinerAPI) SectorMatchPendingPiecesToOpenSectors(ctx context.Con
|
||||
return sm.Miner.SectorMatchPendingPiecesToOpenSectors(ctx)
|
||||
}
|
||||
|
||||
func (sm *StorageMinerAPI) SectorNumAssignerMeta(ctx context.Context) (api.NumAssignerMeta, error) {
|
||||
return sm.Miner.NumAssignerMeta(ctx)
|
||||
}
|
||||
|
||||
func (sm *StorageMinerAPI) SectorNumReservations(ctx context.Context) (map[string]bitfield.BitField, error) {
|
||||
return sm.Miner.NumReservations(ctx)
|
||||
}
|
||||
|
||||
func (sm *StorageMinerAPI) SectorNumReserve(ctx context.Context, name string, field bitfield.BitField, force bool) error {
|
||||
return sm.Miner.NumReserve(ctx, name, field, force)
|
||||
}
|
||||
|
||||
func (sm *StorageMinerAPI) SectorNumFree(ctx context.Context, name string) error {
|
||||
return sm.Miner.NumFree(ctx, name)
|
||||
}
|
||||
|
||||
func (sm *StorageMinerAPI) ComputeWindowPoSt(ctx context.Context, dlIdx uint64, tsk types.TipSetKey) ([]minertypes.SubmitWindowedPoStParams, error) {
|
||||
var ts *types.TipSet
|
||||
var err error
|
||||
|
@ -1 +1,331 @@
|
||||
package sealing
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"regexp"
|
||||
|
||||
"github.com/ipfs/go-datastore"
|
||||
dsq "github.com/ipfs/go-datastore/query"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-bitfield"
|
||||
rlepluslazy "github.com/filecoin-project/go-bitfield/rle"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
)
|
||||
|
||||
var StorageCounterDSPrefix = "/storage/nextid"
|
||||
var SectorBitfieldsDSPrefix = "/storage/sectorsids/"
|
||||
var SectorReservationsDSPrefix = "/storage/sectorsids/reserved/"
|
||||
|
||||
var allocatedSectorsKey = datastore.NewKey(SectorBitfieldsDSPrefix + "allocated")
|
||||
var reservedSectorsKey = datastore.NewKey(SectorBitfieldsDSPrefix + "reserved")
|
||||
|
||||
func (m *Sealing) loadBitField(ctx context.Context, name datastore.Key) (*bitfield.BitField, error) {
|
||||
raw, err := m.ds.Get(ctx, name)
|
||||
if err == datastore.ErrNotFound {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var bf bitfield.BitField
|
||||
|
||||
if err := bf.UnmarshalCBOR(bytes.NewBuffer(raw)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &bf, nil
|
||||
}
|
||||
|
||||
func (m *Sealing) saveBitField(ctx context.Context, name datastore.Key, bf bitfield.BitField) error {
|
||||
var bb bytes.Buffer
|
||||
err := bf.MarshalCBOR(&bb)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return m.ds.Put(ctx, name, bb.Bytes())
|
||||
}
|
||||
|
||||
func (m *Sealing) NextSectorNumber(ctx context.Context) (abi.SectorNumber, error) {
|
||||
m.sclk.Lock()
|
||||
defer m.sclk.Unlock()
|
||||
|
||||
am, err := m.numAssignerMetaLocked(ctx)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
allocated := am.Allocated
|
||||
|
||||
allocated.Set(uint64(am.Next))
|
||||
|
||||
if err := m.saveBitField(ctx, allocatedSectorsKey, allocated); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// save legacy counter so that in case of a miner downgrade things keep working
|
||||
{
|
||||
buf := make([]byte, binary.MaxVarintLen64)
|
||||
size := binary.PutUvarint(buf, uint64(am.Next))
|
||||
|
||||
if err := m.ds.Put(ctx, datastore.NewKey(StorageCounterDSPrefix), buf[:size]); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
return am.Next, nil
|
||||
}
|
||||
|
||||
func (m *Sealing) NumAssignerMeta(ctx context.Context) (api.NumAssignerMeta, error) {
|
||||
m.sclk.Lock()
|
||||
defer m.sclk.Unlock()
|
||||
|
||||
return m.numAssignerMetaLocked(ctx)
|
||||
}
|
||||
|
||||
func (m *Sealing) numAssignerMetaLocked(ctx context.Context) (api.NumAssignerMeta, error) {
|
||||
// load user-reserved and allocated bitfields
|
||||
reserved, err := m.loadBitField(ctx, reservedSectorsKey)
|
||||
if err != nil {
|
||||
return api.NumAssignerMeta{}, xerrors.Errorf("loading reserved sectors bitfield: %w", err)
|
||||
}
|
||||
allocated, err := m.loadBitField(ctx, allocatedSectorsKey)
|
||||
if err != nil {
|
||||
return api.NumAssignerMeta{}, xerrors.Errorf("loading allocated sectors bitfield: %w", err)
|
||||
}
|
||||
|
||||
// if the allocated bitfield doesn't exist, crate it from the legacy sector counter
|
||||
if allocated == nil {
|
||||
var i uint64
|
||||
{
|
||||
curBytes, err := m.ds.Get(ctx, datastore.NewKey(StorageCounterDSPrefix))
|
||||
if err != nil {
|
||||
return api.NumAssignerMeta{}, err
|
||||
}
|
||||
cur, _ := binary.Uvarint(curBytes)
|
||||
i = cur + 1
|
||||
}
|
||||
|
||||
rl := &rlepluslazy.RunSliceIterator{Runs: []rlepluslazy.Run{
|
||||
{
|
||||
Val: true,
|
||||
Len: i,
|
||||
},
|
||||
}}
|
||||
|
||||
bf, err := bitfield.NewFromIter(rl)
|
||||
if err != nil {
|
||||
return api.NumAssignerMeta{}, xerrors.Errorf("bitfield from iter: %w", err)
|
||||
}
|
||||
allocated = &bf
|
||||
}
|
||||
|
||||
// if there are no user reservations, use an empty bitfield
|
||||
if reserved == nil {
|
||||
emptyBf := bitfield.New()
|
||||
reserved = &emptyBf
|
||||
}
|
||||
|
||||
// todo union with miner allocated nums
|
||||
inuse, err := bitfield.MergeBitFields(*reserved, *allocated)
|
||||
if err != nil {
|
||||
return api.NumAssignerMeta{}, xerrors.Errorf("merge reserved/allocated: %w", err)
|
||||
}
|
||||
|
||||
// find first available sector number
|
||||
iri, err := inuse.RunIterator()
|
||||
if err != nil {
|
||||
return api.NumAssignerMeta{}, xerrors.Errorf("inuse run iter: %w", err)
|
||||
}
|
||||
|
||||
var firstFree abi.SectorNumber
|
||||
for iri.HasNext() {
|
||||
r, err := iri.NextRun()
|
||||
if err != nil {
|
||||
return api.NumAssignerMeta{}, xerrors.Errorf("next run: %w", err)
|
||||
}
|
||||
if !r.Val {
|
||||
break
|
||||
}
|
||||
firstFree += abi.SectorNumber(r.Len)
|
||||
}
|
||||
|
||||
return api.NumAssignerMeta{
|
||||
Reserved: *reserved,
|
||||
Allocated: *allocated,
|
||||
Next: firstFree,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *Sealing) NumReservations(ctx context.Context) (map[string]bitfield.BitField, error) {
|
||||
res, err := m.ds.Query(ctx, dsq.Query{Prefix: SectorReservationsDSPrefix})
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("query reservations: %w", err)
|
||||
}
|
||||
defer res.Close() //nolint:errcheck
|
||||
|
||||
out := map[string]bitfield.BitField{}
|
||||
|
||||
for {
|
||||
res, ok := res.NextSync()
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
|
||||
if res.Error != nil {
|
||||
return nil, xerrors.Errorf("res error: %w", err)
|
||||
}
|
||||
|
||||
b := bitfield.New()
|
||||
if err := b.UnmarshalCBOR(bytes.NewReader(res.Value)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
out[datastore.NewKey(res.Key).BaseNamespace()] = b
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (m *Sealing) NumReserve(ctx context.Context, name string, reserving bitfield.BitField, force bool) error {
|
||||
m.sclk.Lock()
|
||||
defer m.sclk.Unlock()
|
||||
|
||||
rk, err := reservationKey(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// check if there's an existing reservation with this name
|
||||
cur := bitfield.New()
|
||||
|
||||
curRes, err := m.ds.Get(ctx, rk)
|
||||
if err == nil {
|
||||
log.Warnw("reservation with this name already exists", "name", name)
|
||||
if !force {
|
||||
return xerrors.Errorf("reservation with this name already exists")
|
||||
}
|
||||
|
||||
if err := cur.UnmarshalCBOR(bytes.NewReader(curRes)); err != nil {
|
||||
return xerrors.Errorf("unmarshaling existing reservation: %w", err)
|
||||
}
|
||||
} else if err == datastore.ErrNotFound {
|
||||
return xerrors.Errorf("checking if reservation exists: %w", err)
|
||||
}
|
||||
|
||||
// load the global reserved bitfield and subtract current reservation if we're overwriting
|
||||
|
||||
nm, err := m.numAssignerMetaLocked(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
allReserved := nm.Reserved
|
||||
|
||||
allReserved, err = bitfield.SubtractBitField(allReserved, cur)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("allReserved - cur: %w", err)
|
||||
}
|
||||
|
||||
// check if the reservation is colliding with any other reservation
|
||||
coliding, err := bitfield.IntersectBitField(allReserved, reserving)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("intersect all / reserving: %w", err)
|
||||
}
|
||||
|
||||
empty, err := coliding.IsEmpty()
|
||||
if err != nil {
|
||||
return xerrors.Errorf("colliding.empty: %w", err)
|
||||
}
|
||||
if !empty {
|
||||
log.Warnw("new reservation is colliding with another existing reservation", "name", name)
|
||||
if !force {
|
||||
return xerrors.Errorf("new reservation is colliding with another existing reservation")
|
||||
}
|
||||
}
|
||||
|
||||
// check if the reservation is colliding with allocated sectors
|
||||
coliding, err = bitfield.IntersectBitField(nm.Allocated, reserving)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("intersect all / reserving: %w", err)
|
||||
}
|
||||
|
||||
empty, err = coliding.IsEmpty()
|
||||
if err != nil {
|
||||
return xerrors.Errorf("colliding.empty: %w", err)
|
||||
}
|
||||
if !empty {
|
||||
log.Warnw("new reservation is colliding with allocated sector numbers", "name", name)
|
||||
if !force {
|
||||
return xerrors.Errorf("new reservation is colliding with allocated sector numbers")
|
||||
}
|
||||
}
|
||||
|
||||
// write the reservation
|
||||
newReserved, err := bitfield.MergeBitFields(allReserved, reserving)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("merge allReserved+reserving: %w", err)
|
||||
}
|
||||
|
||||
if err := m.saveBitField(ctx, rk, reserving); err != nil {
|
||||
return xerrors.Errorf("saving reservation: %w", err)
|
||||
}
|
||||
|
||||
if err := m.saveBitField(ctx, reservedSectorsKey, newReserved); err != nil {
|
||||
return xerrors.Errorf("save reserved nums: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Sealing) NumFree(ctx context.Context, name string) error {
|
||||
rk, err := reservationKey(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
res, err := m.loadBitField(ctx, rk)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("loading reservation: %w", err)
|
||||
}
|
||||
if res == nil {
|
||||
return xerrors.Errorf("reservation with this name doesn't exist")
|
||||
}
|
||||
|
||||
allRes, err := m.loadBitField(ctx, reservedSectorsKey)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("leading all reservations: %w", err)
|
||||
}
|
||||
if allRes == nil {
|
||||
return xerrors.Errorf("all reservations bitfield not found")
|
||||
}
|
||||
|
||||
newAll, err := bitfield.SubtractBitField(*allRes, *res)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("allRes-res: %w", err)
|
||||
}
|
||||
|
||||
if err := m.saveBitField(ctx, reservedSectorsKey, newAll); err != nil {
|
||||
return xerrors.Errorf("saving reservations bitfield: %w", err)
|
||||
}
|
||||
|
||||
if err := m.ds.Delete(ctx, rk); err != nil {
|
||||
return xerrors.Errorf("deleting reservation: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func reservationKey(name string) (datastore.Key, error) {
|
||||
ok, err := regexp.Match("[a-zA-Z0-9_\\-]+]", []byte(name))
|
||||
if err != nil {
|
||||
return datastore.Key{}, err
|
||||
}
|
||||
if !ok {
|
||||
return datastore.Key{}, xerrors.Errorf("reservation name contained disallowed characters (allowed: a-z, A-Z, 0-9, '-', '_')")
|
||||
}
|
||||
|
||||
return datastore.KeyWithNamespaces([]string{SectorReservationsDSPrefix, name}), nil
|
||||
}
|
||||
|
@ -1,9 +1,7 @@
|
||||
package sealing
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -14,8 +12,6 @@ import (
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-bitfield"
|
||||
rlepluslazy "github.com/filecoin-project/go-bitfield/rle"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-state-types/big"
|
||||
"github.com/filecoin-project/go-state-types/builtin/v8/miner"
|
||||
@ -40,9 +36,6 @@ import (
|
||||
|
||||
const SectorStorePrefix = "/sectors"
|
||||
|
||||
var StorageCounterDSPrefix = "/storage/nextid"
|
||||
var SectorBitfieldsDSPrefix = "/storage/sectors/"
|
||||
|
||||
var ErrTooManySectorsSealing = xerrors.New("too many sectors sealing")
|
||||
|
||||
var log = logging.Logger("sectors")
|
||||
@ -329,107 +322,3 @@ func getDealPerSectorLimit(size abi.SectorSize) (int, error) {
|
||||
}
|
||||
return 512, nil
|
||||
}
|
||||
|
||||
func (m *Sealing) loadBitField(ctx context.Context, name string) (*bitfield.BitField, error) {
|
||||
raw, err := m.ds.Get(ctx, datastore.NewKey(SectorBitfieldsDSPrefix+name))
|
||||
if err == datastore.ErrNotFound {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var bf bitfield.BitField
|
||||
|
||||
if err := bf.UnmarshalCBOR(bytes.NewBuffer(raw)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &bf, nil
|
||||
}
|
||||
|
||||
func (m *Sealing) saveBitField(ctx context.Context, name string, bf *bitfield.BitField) error {
|
||||
var bb bytes.Buffer
|
||||
err := bf.MarshalCBOR(&bb)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return m.ds.Put(ctx, datastore.NewKey(SectorBitfieldsDSPrefix+name), bb.Bytes())
|
||||
}
|
||||
|
||||
func (m *Sealing) NextSectorNumber(ctx context.Context) (abi.SectorNumber, error) {
|
||||
m.sclk.Lock()
|
||||
defer m.sclk.Unlock()
|
||||
|
||||
reserved, err := m.loadBitField(ctx, "reserved")
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
allocated, err := m.loadBitField(ctx, "allocated")
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if allocated == nil {
|
||||
i, err := m.legacySc.Next()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
rl := &rlepluslazy.RunSliceIterator{Runs: []rlepluslazy.Run{
|
||||
{
|
||||
Val: true,
|
||||
Len: i,
|
||||
},
|
||||
}}
|
||||
|
||||
bf, err := bitfield.NewFromIter(rl)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
allocated = &bf
|
||||
}
|
||||
|
||||
if reserved == nil {
|
||||
reserved = allocated
|
||||
}
|
||||
|
||||
// todo union with miner allocated nums
|
||||
inuse, err := bitfield.MergeBitFields(*reserved, *allocated)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
iri, err := inuse.RunIterator()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
var firstFree abi.SectorNumber
|
||||
for iri.HasNext() {
|
||||
r, err := iri.NextRun()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if !r.Val {
|
||||
break
|
||||
}
|
||||
firstFree += abi.SectorNumber(r.Len)
|
||||
}
|
||||
|
||||
allocated.Set(uint64(firstFree))
|
||||
|
||||
if err := m.saveBitField(ctx, "allocated", allocated); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// save legacy counter so that in case of a miner downgrade things keep working
|
||||
{
|
||||
buf := make([]byte, binary.MaxVarintLen64)
|
||||
size := binary.PutUvarint(buf, uint64(firstFree))
|
||||
|
||||
if err := m.ds.Put(ctx, datastore.NewKey(StorageCounterDSPrefix), buf[:size]); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
return firstFree, nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user