storagefsm: Cleanup CC sector creation

This commit is contained in:
Łukasz Magiera 2021-02-16 17:14:59 +01:00
parent dd82729f60
commit fc5e243c92
6 changed files with 47 additions and 93 deletions

View File

@ -70,12 +70,10 @@ func (evt SectorStart) apply(state *SectorInfo) {
type SectorStartCC struct {
ID abi.SectorNumber
SectorType abi.RegisteredSealProof
Pieces []Piece
}
func (evt SectorStartCC) apply(state *SectorInfo) {
state.SectorNumber = evt.ID
state.Pieces = evt.Pieces
state.SectorType = evt.SectorType
}

View File

@ -4,34 +4,12 @@ import (
"context"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
)
func (m *Sealing) pledgeSector(ctx context.Context, sectorID storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]abi.PieceInfo, error) {
if len(sizes) == 0 {
return nil, nil
}
func (m *Sealing) PledgeSector(ctx context.Context) error {
m.inputLk.Lock()
defer m.inputLk.Unlock()
log.Infof("Pledge %d, contains %+v", sectorID, existingPieceSizes)
out := make([]abi.PieceInfo, len(sizes))
for i, size := range sizes {
ppi, err := m.sealer.AddPiece(ctx, sectorID, existingPieceSizes, size, NewNullReader(size))
if err != nil {
return nil, xerrors.Errorf("add piece: %w", err)
}
existingPieceSizes = append(existingPieceSizes, size)
out[i] = ppi
}
return out, nil
}
func (m *Sealing) PledgeSector() error {
cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting config: %w", err)
@ -43,53 +21,24 @@ func (m *Sealing) PledgeSector() error {
}
}
go func() {
ctx := context.TODO() // we can't use the context from command which invokes
// this, as we run everything here async, and it's cancelled when the
// command exits
spt, err := m.currentSealProof(ctx)
if err != nil {
return xerrors.Errorf("getting seal proof type: %w", err)
}
spt, err := m.currentSealProof(ctx)
if err != nil {
log.Errorf("%+v", err)
return
}
sid, err := m.sc.Next()
if err != nil {
return xerrors.Errorf("generating sector number: %w", err)
}
sectorID := m.minerSector(spt, sid)
err = m.sealer.NewSector(ctx, sectorID)
if err != nil {
return xerrors.Errorf("notifying sealer of the new sector: %w", err)
}
size, err := spt.SectorSize()
if err != nil {
log.Errorf("%+v", err)
return
}
sid, err := m.sc.Next()
if err != nil {
log.Errorf("%+v", err)
return
}
sectorID := m.minerSector(spt, sid)
err = m.sealer.NewSector(ctx, sectorID)
if err != nil {
log.Errorf("%+v", err)
return
}
pieces, err := m.pledgeSector(ctx, sectorID, []abi.UnpaddedPieceSize{}, abi.PaddedPieceSize(size).Unpadded())
if err != nil {
log.Errorf("%+v", err)
return
}
ps := make([]Piece, len(pieces))
for idx := range ps {
ps[idx] = Piece{
Piece: pieces[idx],
DealInfo: nil,
}
}
if err := m.newSectorCC(ctx, sid, ps); err != nil {
log.Errorf("%+v", err)
return
}
}()
return nil
log.Infof("Creating CC sector %d", sid)
return m.sectors.Send(uint64(sid), SectorStartCC{
ID: sid,
SectorType: spt,
})
}

View File

@ -202,21 +202,6 @@ func (m *Sealing) TerminatePending(ctx context.Context) ([]abi.SectorID, error)
return m.terminator.Pending(ctx)
}
// newSectorCC accepts a slice of pieces with no deal (junk data)
func (m *Sealing) newSectorCC(ctx context.Context, sid abi.SectorNumber, pieces []Piece) error {
spt, err := m.currentSealProof(ctx)
if err != nil {
return xerrors.Errorf("getting current seal proof type: %w", err)
}
log.Infof("Creating CC sector %d", sid)
return m.sectors.Send(uint64(sid), SectorStartCC{
ID: sid,
Pieces: pieces,
SectorType: spt,
})
}
func (m *Sealing) currentSealProof(ctx context.Context) (abi.RegisteredSealProof, error) {
mi, err := m.api.StateMinerInfo(ctx, m.maddr, nil)
if err != nil {

View File

@ -70,7 +70,7 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err
log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorNumber)
}
fillerPieces, err := m.pledgeSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.existingPieceSizes(), fillerSizes...)
fillerPieces, err := m.padSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.existingPieceSizes(), fillerSizes...)
if err != nil {
return xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err)
}
@ -78,6 +78,28 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err
return ctx.Send(SectorPacked{FillerPieces: fillerPieces})
}
func (m *Sealing) padSector(ctx context.Context, sectorID storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]abi.PieceInfo, error) {
if len(sizes) == 0 {
return nil, nil
}
log.Infof("Pledge %d, contains %+v", sectorID, existingPieceSizes)
out := make([]abi.PieceInfo, len(sizes))
for i, size := range sizes {
ppi, err := m.sealer.AddPiece(ctx, sectorID, existingPieceSizes, size, NewNullReader(size))
if err != nil {
return nil, xerrors.Errorf("add piece: %w", err)
}
existingPieceSizes = append(existingPieceSizes, size)
out[i] = ppi
}
return out, nil
}
func checkTicketExpired(sector SectorInfo, epoch abi.ChainEpoch) bool {
return epoch-sector.TicketEpoch > MaxTicketAge // TODO: allow configuring expected seal durations
}

View File

@ -122,7 +122,7 @@ func (sm *StorageMinerAPI) ActorSectorSize(ctx context.Context, addr address.Add
}
func (sm *StorageMinerAPI) PledgeSector(ctx context.Context) error {
return sm.Miner.PledgeSector()
return sm.Miner.PledgeSector(ctx)
}
func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) {

View File

@ -34,8 +34,8 @@ func (m *Miner) GetSectorInfo(sid abi.SectorNumber) (sealing.SectorInfo, error)
return m.sealing.GetSectorInfo(sid)
}
func (m *Miner) PledgeSector() error {
return m.sealing.PledgeSector()
func (m *Miner) PledgeSector(ctx context.Context) error {
return m.sealing.PledgeSector(ctx)
}
func (m *Miner) ForceSectorState(ctx context.Context, id abi.SectorNumber, state sealing.SectorState) error {