Merge pull request #5612 from filecoin-project/feat/cc-fsm-cleanup
storagefsm: Cleanup CC sector creation
This commit is contained in:
commit
29c9fa3137
@ -36,7 +36,7 @@ type StorageMiner interface {
|
|||||||
MiningBase(context.Context) (*types.TipSet, error)
|
MiningBase(context.Context) (*types.TipSet, error)
|
||||||
|
|
||||||
// Temp api for testing
|
// Temp api for testing
|
||||||
PledgeSector(context.Context) error
|
PledgeSector(context.Context) (abi.SectorID, error)
|
||||||
|
|
||||||
// Get the status of a given sector by ID
|
// Get the status of a given sector by ID
|
||||||
SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (SectorInfo, error)
|
SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (SectorInfo, error)
|
||||||
|
@ -304,7 +304,7 @@ type StorageMinerStruct struct {
|
|||||||
MarketPendingDeals func(ctx context.Context) (api.PendingDealInfo, error) `perm:"write"`
|
MarketPendingDeals func(ctx context.Context) (api.PendingDealInfo, error) `perm:"write"`
|
||||||
MarketPublishPendingDeals func(ctx context.Context) error `perm:"admin"`
|
MarketPublishPendingDeals func(ctx context.Context) error `perm:"admin"`
|
||||||
|
|
||||||
PledgeSector func(context.Context) error `perm:"write"`
|
PledgeSector func(context.Context) (abi.SectorID, error) `perm:"write"`
|
||||||
|
|
||||||
SectorsStatus func(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) `perm:"read"`
|
SectorsStatus func(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) `perm:"read"`
|
||||||
SectorsList func(context.Context) ([]abi.SectorNumber, error) `perm:"read"`
|
SectorsList func(context.Context) ([]abi.SectorNumber, error) `perm:"read"`
|
||||||
@ -1274,7 +1274,7 @@ func (c *StorageMinerStruct) ActorAddressConfig(ctx context.Context) (api.Addres
|
|||||||
return c.Internal.ActorAddressConfig(ctx)
|
return c.Internal.ActorAddressConfig(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *StorageMinerStruct) PledgeSector(ctx context.Context) error {
|
func (c *StorageMinerStruct) PledgeSector(ctx context.Context) (abi.SectorID, error) {
|
||||||
return c.Internal.PledgeSector(ctx)
|
return c.Internal.PledgeSector(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6,7 +6,6 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
|
||||||
"github.com/filecoin-project/go-state-types/network"
|
"github.com/filecoin-project/go-state-types/network"
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
@ -75,23 +74,9 @@ func testTapeFix(t *testing.T, b APIBuilder, blocktime time.Duration, after bool
|
|||||||
<-done
|
<-done
|
||||||
}()
|
}()
|
||||||
|
|
||||||
err = miner.PledgeSector(ctx)
|
sid, err := miner.PledgeSector(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Wait till done.
|
|
||||||
var sectorNo abi.SectorNumber
|
|
||||||
for {
|
|
||||||
s, err := miner.SectorsList(ctx) // Note - the test builder doesn't import genesis sectors into FSM
|
|
||||||
require.NoError(t, err)
|
|
||||||
fmt.Printf("Sectors: %d\n", len(s))
|
|
||||||
if len(s) == 1 {
|
|
||||||
sectorNo = s[0]
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
build.Clock.Sleep(100 * time.Millisecond)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("All sectors is fsm\n")
|
fmt.Printf("All sectors is fsm\n")
|
||||||
|
|
||||||
// If before, we expect the precommit to fail
|
// If before, we expect the precommit to fail
|
||||||
@ -103,7 +88,7 @@ func testTapeFix(t *testing.T, b APIBuilder, blocktime time.Duration, after bool
|
|||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
st, err := miner.SectorsStatus(ctx, sectorNo, false)
|
st, err := miner.SectorsStatus(ctx, sid.Number, false)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
if st.State == successState {
|
if st.State == successState {
|
||||||
break
|
break
|
||||||
|
@ -162,7 +162,7 @@ func pledgeSectors(t *testing.T, ctx context.Context, miner TestStorageNode, n,
|
|||||||
log.Errorf("WAIT")
|
log.Errorf("WAIT")
|
||||||
}
|
}
|
||||||
log.Errorf("PLEDGING %d", i)
|
log.Errorf("PLEDGING %d", i)
|
||||||
err := miner.PledgeSector(ctx)
|
_, err := miner.PledgeSector(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -55,7 +55,14 @@ var sectorsPledgeCmd = &cli.Command{
|
|||||||
defer closer()
|
defer closer()
|
||||||
ctx := lcli.ReqContext(cctx)
|
ctx := lcli.ReqContext(cctx)
|
||||||
|
|
||||||
return nodeApi.PledgeSector(ctx)
|
id, err := nodeApi.PledgeSector(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println("Created CC sector: ", id.Number)
|
||||||
|
|
||||||
|
return nil
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1143,7 +1143,13 @@ Perms: write
|
|||||||
|
|
||||||
Inputs: `null`
|
Inputs: `null`
|
||||||
|
|
||||||
Response: `{}`
|
Response:
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"Miner": 1000,
|
||||||
|
"Number": 9
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
## Return
|
## Return
|
||||||
|
|
||||||
|
2
extern/storage-sealing/fsm_events.go
vendored
2
extern/storage-sealing/fsm_events.go
vendored
@ -70,12 +70,10 @@ func (evt SectorStart) apply(state *SectorInfo) {
|
|||||||
type SectorStartCC struct {
|
type SectorStartCC struct {
|
||||||
ID abi.SectorNumber
|
ID abi.SectorNumber
|
||||||
SectorType abi.RegisteredSealProof
|
SectorType abi.RegisteredSealProof
|
||||||
Pieces []Piece
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (evt SectorStartCC) apply(state *SectorInfo) {
|
func (evt SectorStartCC) apply(state *SectorInfo) {
|
||||||
state.SectorNumber = evt.ID
|
state.SectorNumber = evt.ID
|
||||||
state.Pieces = evt.Pieces
|
|
||||||
state.SectorType = evt.SectorType
|
state.SectorType = evt.SectorType
|
||||||
}
|
}
|
||||||
|
|
||||||
|
95
extern/storage-sealing/garbage.go
vendored
95
extern/storage-sealing/garbage.go
vendored
@ -5,91 +5,42 @@ import (
|
|||||||
|
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
|
||||||
"github.com/filecoin-project/specs-storage/storage"
|
"github.com/filecoin-project/specs-storage/storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (m *Sealing) pledgeSector(ctx context.Context, sectorID storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]abi.PieceInfo, error) {
|
func (m *Sealing) PledgeSector(ctx context.Context) (storage.SectorRef, error) {
|
||||||
if len(sizes) == 0 {
|
m.inputLk.Lock()
|
||||||
return nil, nil
|
defer m.inputLk.Unlock()
|
||||||
}
|
|
||||||
|
|
||||||
log.Infof("Pledge %d, contains %+v", sectorID, existingPieceSizes)
|
|
||||||
|
|
||||||
out := make([]abi.PieceInfo, len(sizes))
|
|
||||||
for i, size := range sizes {
|
|
||||||
ppi, err := m.sealer.AddPiece(ctx, sectorID, existingPieceSizes, size, NewNullReader(size))
|
|
||||||
if err != nil {
|
|
||||||
return nil, xerrors.Errorf("add piece: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
existingPieceSizes = append(existingPieceSizes, size)
|
|
||||||
|
|
||||||
out[i] = ppi
|
|
||||||
}
|
|
||||||
|
|
||||||
return out, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Sealing) PledgeSector() error {
|
|
||||||
cfg, err := m.getConfig()
|
cfg, err := m.getConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("getting config: %w", err)
|
return storage.SectorRef{}, xerrors.Errorf("getting config: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if cfg.MaxSealingSectors > 0 {
|
if cfg.MaxSealingSectors > 0 {
|
||||||
if m.stats.curSealing() >= cfg.MaxSealingSectors {
|
if m.stats.curSealing() >= cfg.MaxSealingSectors {
|
||||||
return xerrors.Errorf("too many sectors sealing (curSealing: %d, max: %d)", m.stats.curSealing(), cfg.MaxSealingSectors)
|
return storage.SectorRef{}, xerrors.Errorf("too many sectors sealing (curSealing: %d, max: %d)", m.stats.curSealing(), cfg.MaxSealingSectors)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
spt, err := m.currentSealProof(ctx)
|
||||||
ctx := context.TODO() // we can't use the context from command which invokes
|
if err != nil {
|
||||||
// this, as we run everything here async, and it's cancelled when the
|
return storage.SectorRef{}, xerrors.Errorf("getting seal proof type: %w", err)
|
||||||
// command exits
|
}
|
||||||
|
|
||||||
spt, err := m.currentSealProof(ctx)
|
sid, err := m.sc.Next()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("%+v", err)
|
return storage.SectorRef{}, xerrors.Errorf("generating sector number: %w", err)
|
||||||
return
|
}
|
||||||
}
|
sectorID := m.minerSector(spt, sid)
|
||||||
|
err = m.sealer.NewSector(ctx, sectorID)
|
||||||
|
if err != nil {
|
||||||
|
return storage.SectorRef{}, xerrors.Errorf("notifying sealer of the new sector: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
size, err := spt.SectorSize()
|
log.Infof("Creating CC sector %d", sid)
|
||||||
if err != nil {
|
return sectorID, m.sectors.Send(uint64(sid), SectorStartCC{
|
||||||
log.Errorf("%+v", err)
|
ID: sid,
|
||||||
return
|
SectorType: spt,
|
||||||
}
|
})
|
||||||
|
|
||||||
sid, err := m.sc.Next()
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("%+v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
sectorID := m.minerSector(spt, sid)
|
|
||||||
err = m.sealer.NewSector(ctx, sectorID)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("%+v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
pieces, err := m.pledgeSector(ctx, sectorID, []abi.UnpaddedPieceSize{}, abi.PaddedPieceSize(size).Unpadded())
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("%+v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
ps := make([]Piece, len(pieces))
|
|
||||||
for idx := range ps {
|
|
||||||
ps[idx] = Piece{
|
|
||||||
Piece: pieces[idx],
|
|
||||||
DealInfo: nil,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := m.newSectorCC(ctx, sid, ps); err != nil {
|
|
||||||
log.Errorf("%+v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
15
extern/storage-sealing/sealing.go
vendored
15
extern/storage-sealing/sealing.go
vendored
@ -202,21 +202,6 @@ func (m *Sealing) TerminatePending(ctx context.Context) ([]abi.SectorID, error)
|
|||||||
return m.terminator.Pending(ctx)
|
return m.terminator.Pending(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// newSectorCC accepts a slice of pieces with no deal (junk data)
|
|
||||||
func (m *Sealing) newSectorCC(ctx context.Context, sid abi.SectorNumber, pieces []Piece) error {
|
|
||||||
spt, err := m.currentSealProof(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("getting current seal proof type: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Infof("Creating CC sector %d", sid)
|
|
||||||
return m.sectors.Send(uint64(sid), SectorStartCC{
|
|
||||||
ID: sid,
|
|
||||||
Pieces: pieces,
|
|
||||||
SectorType: spt,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Sealing) currentSealProof(ctx context.Context) (abi.RegisteredSealProof, error) {
|
func (m *Sealing) currentSealProof(ctx context.Context) (abi.RegisteredSealProof, error) {
|
||||||
mi, err := m.api.StateMinerInfo(ctx, m.maddr, nil)
|
mi, err := m.api.StateMinerInfo(ctx, m.maddr, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
24
extern/storage-sealing/states_sealing.go
vendored
24
extern/storage-sealing/states_sealing.go
vendored
@ -70,7 +70,7 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err
|
|||||||
log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorNumber)
|
log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorNumber)
|
||||||
}
|
}
|
||||||
|
|
||||||
fillerPieces, err := m.pledgeSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.existingPieceSizes(), fillerSizes...)
|
fillerPieces, err := m.padSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.existingPieceSizes(), fillerSizes...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err)
|
return xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err)
|
||||||
}
|
}
|
||||||
@ -78,6 +78,28 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err
|
|||||||
return ctx.Send(SectorPacked{FillerPieces: fillerPieces})
|
return ctx.Send(SectorPacked{FillerPieces: fillerPieces})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) padSector(ctx context.Context, sectorID storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]abi.PieceInfo, error) {
|
||||||
|
if len(sizes) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Pledge %d, contains %+v", sectorID, existingPieceSizes)
|
||||||
|
|
||||||
|
out := make([]abi.PieceInfo, len(sizes))
|
||||||
|
for i, size := range sizes {
|
||||||
|
ppi, err := m.sealer.AddPiece(ctx, sectorID, existingPieceSizes, size, NewNullReader(size))
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("add piece: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
existingPieceSizes = append(existingPieceSizes, size)
|
||||||
|
|
||||||
|
out[i] = ppi
|
||||||
|
}
|
||||||
|
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
func checkTicketExpired(sector SectorInfo, epoch abi.ChainEpoch) bool {
|
func checkTicketExpired(sector SectorInfo, epoch abi.ChainEpoch) bool {
|
||||||
return epoch-sector.TicketEpoch > MaxTicketAge // TODO: allow configuring expected seal durations
|
return epoch-sector.TicketEpoch > MaxTicketAge // TODO: allow configuring expected seal durations
|
||||||
}
|
}
|
||||||
|
@ -121,8 +121,30 @@ func (sm *StorageMinerAPI) ActorSectorSize(ctx context.Context, addr address.Add
|
|||||||
return mi.SectorSize, nil
|
return mi.SectorSize, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *StorageMinerAPI) PledgeSector(ctx context.Context) error {
|
func (sm *StorageMinerAPI) PledgeSector(ctx context.Context) (abi.SectorID, error) {
|
||||||
return sm.Miner.PledgeSector()
|
sr, err := sm.Miner.PledgeSector(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return abi.SectorID{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for the sector to enter the Packing state
|
||||||
|
// TODO: instead of polling implement some pubsub-type thing in storagefsm
|
||||||
|
for {
|
||||||
|
info, err := sm.Miner.GetSectorInfo(sr.ID.Number)
|
||||||
|
if err != nil {
|
||||||
|
return abi.SectorID{}, xerrors.Errorf("getting pledged sector info: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if info.State != sealing.UndefinedSectorState {
|
||||||
|
return sr.ID, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-time.After(10 * time.Millisecond):
|
||||||
|
case <-ctx.Done():
|
||||||
|
return abi.SectorID{}, ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) {
|
func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) {
|
||||||
@ -219,6 +241,10 @@ func (sm *StorageMinerAPI) SectorsList(context.Context) ([]abi.SectorNumber, err
|
|||||||
|
|
||||||
out := make([]abi.SectorNumber, len(sectors))
|
out := make([]abi.SectorNumber, len(sectors))
|
||||||
for i, sector := range sectors {
|
for i, sector := range sectors {
|
||||||
|
if sector.State == sealing.UndefinedSectorState {
|
||||||
|
continue // sector ID not set yet
|
||||||
|
}
|
||||||
|
|
||||||
out[i] = sector.SectorNumber
|
out[i] = sector.SectorNumber
|
||||||
}
|
}
|
||||||
return out, nil
|
return out, nil
|
||||||
|
@ -8,6 +8,7 @@ import (
|
|||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
"github.com/filecoin-project/specs-storage/storage"
|
||||||
|
|
||||||
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
||||||
)
|
)
|
||||||
@ -34,8 +35,8 @@ func (m *Miner) GetSectorInfo(sid abi.SectorNumber) (sealing.SectorInfo, error)
|
|||||||
return m.sealing.GetSectorInfo(sid)
|
return m.sealing.GetSectorInfo(sid)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) PledgeSector() error {
|
func (m *Miner) PledgeSector(ctx context.Context) (storage.SectorRef, error) {
|
||||||
return m.sealing.PledgeSector()
|
return m.sealing.PledgeSector(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) ForceSectorState(ctx context.Context, id abi.SectorNumber, state sealing.SectorState) error {
|
func (m *Miner) ForceSectorState(ctx context.Context, id abi.SectorNumber, state sealing.SectorState) error {
|
||||||
|
Loading…
Reference in New Issue
Block a user