diff --git a/fsm.go b/fsm.go index 478779e96..ff9ca4d9b 100644 --- a/fsm.go +++ b/fsm.go @@ -34,8 +34,16 @@ func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *SectorInfo) error{ // Sealing - UndefinedSectorState: planOne(on(SectorStart{}, Packing)), - Packing: planOne(on(SectorPacked{}, PreCommit1)), + UndefinedSectorState: planOne( + on(SectorStart{}, Empty), + on(SectorStartCC{}, Packing), + ), + Empty: planOne(on(SectorAddPiece{}, WaitDeals)), + WaitDeals: planOne( + on(SectorAddPiece{}, WaitDeals), + on(SectorStartPacking{}, Packing), + ), + Packing: planOne(on(SectorPacked{}, PreCommit1)), PreCommit1: planOne( on(SectorPreCommit1{}, PreCommit2), on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed), diff --git a/fsm_events.go b/fsm_events.go index 1c7991834..a631ec08d 100644 --- a/fsm_events.go +++ b/fsm_events.go @@ -50,15 +50,37 @@ func (evt SectorForceState) applyGlobal(state *SectorInfo) bool { type SectorStart struct { ID abi.SectorNumber SectorType abi.RegisteredSealProof - Pieces []Piece } func (evt SectorStart) apply(state *SectorInfo) { + state.SectorNumber = evt.ID + state.SectorType = evt.SectorType +} + +type SectorStartCC struct { + ID abi.SectorNumber + SectorType abi.RegisteredSealProof + Pieces []Piece +} + +func (evt SectorStartCC) apply(state *SectorInfo) { state.SectorNumber = evt.ID state.Pieces = evt.Pieces state.SectorType = evt.SectorType } +type SectorAddPiece struct { + NewPiece Piece +} + +func (evt SectorAddPiece) apply(state *SectorInfo) { + state.Pieces = append(state.Pieces, evt.NewPiece) +} + +type SectorStartPacking struct{} + +func (evt SectorStartPacking) apply(*SectorInfo) {} + type SectorPacked struct{ FillerPieces []abi.PieceInfo } func (evt SectorPacked) apply(state *SectorInfo) { diff --git a/garbage.go b/garbage.go index 9e6246b4c..0464259c3 100644 --- a/garbage.go +++ b/garbage.go @@ -6,7 +6,6 @@ import ( "golang.org/x/xerrors" - "github.com/filecoin-project/sector-storage/ffiwrapper" "github.com/filecoin-project/specs-actors/actors/abi" nr "github.com/filecoin-project/storage-fsm/lib/nullreader" @@ -46,12 +45,6 @@ func (m *Sealing) PledgeSector() error { size := abi.PaddedPieceSize(m.sealer.SectorSize()).Unpadded() - rt, err := ffiwrapper.SealProofTypeFromSectorSize(m.sealer.SectorSize()) - if err != nil { - log.Error(err) - return - } - sid, err := m.sc.Next() if err != nil { log.Errorf("%+v", err) @@ -77,7 +70,7 @@ func (m *Sealing) PledgeSector() error { } } - if err := m.newSector(sid, rt, ps); err != nil { + if err := m.newSectorCC(sid, ps); err != nil { log.Errorf("%+v", err) return } diff --git a/sealing.go b/sealing.go index b4f3899bc..9eea593a2 100644 --- a/sealing.go +++ b/sealing.go @@ -53,7 +53,14 @@ type Sealing struct { sc SectorIDCounter verif ffiwrapper.Verifier - pcp PreCommitPolicy + unsealedInfos map[abi.SectorNumber]UnsealedSectorInfo + pcp PreCommitPolicy +} + +type UnsealedSectorInfo struct { + // stored should always equal sum of pieceSizes + stored uint64 + pieceSizes []abi.UnpaddedPieceSize } func New(api SealingAPI, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy) *Sealing { @@ -61,11 +68,12 @@ func New(api SealingAPI, events Events, maddr address.Address, ds datastore.Batc api: api, events: events, - maddr: maddr, - sealer: sealer, - sc: sc, - verif: verif, - pcp: pcp, + maddr: maddr, + sealer: sealer, + sc: sc, + verif: verif, + unsealedInfos: make(map[abi.SectorNumber]UnsealedSectorInfo), + pcp: pcp, } s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{}) @@ -85,63 +93,135 @@ func (m *Sealing) Run(ctx context.Context) error { func (m *Sealing) Stop(ctx context.Context) error { return m.sectors.Stop(ctx) } - -func (m *Sealing) AllocatePiece(size abi.UnpaddedPieceSize) (sectorID abi.SectorNumber, offset uint64, err error) { +func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, d DealInfo) (abi.SectorNumber, uint64, error) { + log.Infof("Adding piece for deal %d", d.DealID) if (padreader.PaddedSize(uint64(size))) != size { return 0, 0, xerrors.Errorf("cannot allocate unpadded piece") } - sid, err := m.sc.Next() - if err != nil { - return 0, 0, xerrors.Errorf("getting sector number: %w", err) + if size > abi.UnpaddedPieceSize(m.sealer.SectorSize()) { + return 0, 0, xerrors.Errorf("piece cannot fit into a sector") } - err = m.sealer.NewSector(context.TODO(), m.minerSector(sid)) // TODO: Put more than one thing in a sector + sid, err := m.getAvailableSector(size) if err != nil { - return 0, 0, xerrors.Errorf("initializing sector: %w", err) + return 0, 0, xerrors.Errorf("creating new sector: %w", err) } - // offset hard-coded to 0 since we only put one thing in a sector for now - return sid, 0, nil + offset := m.unsealedInfos[sid].stored + ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx, DealSectorPriority), m.minerSector(sid), m.unsealedInfos[sid].pieceSizes, size, r) + if err != nil { + return 0, 0, xerrors.Errorf("writing piece: %w", err) + } + + err = m.addPiece(sid, Piece{ + Piece: ppi, + DealInfo: &d, + }) + + if err != nil { + return 0, 0, xerrors.Errorf("adding piece to sector: %w", err) + } + + return sid, offset, nil } -func (m *Sealing) SealPiece(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, sectorID abi.SectorNumber, d DealInfo) error { - log.Infof("Seal piece for deal %d", d.DealID) - - ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx, DealSectorPriority), m.minerSector(sectorID), []abi.UnpaddedPieceSize{}, size, r) +func (m *Sealing) addPiece(sectorID abi.SectorNumber, piece Piece) error { + log.Infof("Adding piece to sector %d", sectorID) + err := m.sectors.Send(uint64(sectorID), SectorAddPiece{NewPiece: piece}) if err != nil { - return xerrors.Errorf("adding piece to sector: %w", err) + return err } + ui := m.unsealedInfos[sectorID] + m.unsealedInfos[sectorID] = UnsealedSectorInfo{ + stored: ui.stored + uint64(piece.Piece.Size.Unpadded()), + pieceSizes: append(ui.pieceSizes, piece.Piece.Size.Unpadded()), + } + + return nil +} + +func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error { + return m.sectors.Send(uint64(sid), SectorRemove{}) +} + +func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error { + log.Infof("Starting packing sector %d", sectorID) + err := m.sectors.Send(uint64(sectorID), SectorStartPacking{}) + if err != nil { + return err + } + + delete(m.unsealedInfos, sectorID) + + return nil +} + +func (m *Sealing) getAvailableSector(size abi.UnpaddedPieceSize) (abi.SectorNumber, error) { + ss := m.sealer.SectorSize() + for k, v := range m.unsealedInfos { + if v.stored+uint64(size) <= uint64(ss) { + // TODO: Support multiple deal sizes in the same sector + if len(v.pieceSizes) == 0 || v.pieceSizes[0] == size { + return k, nil + } + } + } + + return m.newSector() +} + +// newSector creates a new sector for deal storage +func (m *Sealing) newSector() (abi.SectorNumber, error) { + sid, err := m.sc.Next() + if err != nil { + return 0, xerrors.Errorf("getting sector number: %w", err) + } + + err = m.sealer.NewSector(context.TODO(), m.minerSector(sid)) + if err != nil { + return 0, xerrors.Errorf("initializing sector: %w", err) + } + + rt, err := ffiwrapper.SealProofTypeFromSectorSize(m.sealer.SectorSize()) + if err != nil { + return 0, xerrors.Errorf("bad sector size: %w", err) + } + + log.Infof("Creating sector %d", sid) + err = m.sectors.Send(uint64(sid), SectorStart{ + ID: sid, + SectorType: rt, + }) + + if err != nil { + return 0, xerrors.Errorf("starting the sector fsm: %w", err) + } + + m.unsealedInfos[sid] = UnsealedSectorInfo{ + stored: 0, + pieceSizes: nil, + } + + return sid, nil +} + +// newSectorCC accepts a slice of pieces with no deal (junk data) +func (m *Sealing) newSectorCC(sid abi.SectorNumber, pieces []Piece) error { rt, err := ffiwrapper.SealProofTypeFromSectorSize(m.sealer.SectorSize()) if err != nil { return xerrors.Errorf("bad sector size: %w", err) } - return m.newSector(sectorID, rt, []Piece{ - { - Piece: ppi, - DealInfo: &d, - }, - }) -} - -// newSector accepts a slice of pieces which will have a deal associated with -// them (in the event of a storage deal) or no deal (in the event of sealing -// garbage data) -func (m *Sealing) newSector(sid abi.SectorNumber, rt abi.RegisteredSealProof, pieces []Piece) error { - log.Infof("Start sealing %d", sid) - return m.sectors.Send(uint64(sid), SectorStart{ + log.Infof("Creating CC sector %d", sid) + return m.sectors.Send(uint64(sid), SectorStartCC{ ID: sid, Pieces: pieces, SectorType: rt, }) } -func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error { - return m.sectors.Send(uint64(sid), SectorRemove{}) -} - func (m *Sealing) minerSector(num abi.SectorNumber) abi.SectorID { mid, err := address.IDFromAddress(m.maddr) if err != nil { diff --git a/sector_state.go b/sector_state.go index 63058cd6c..f2801c9fc 100644 --- a/sector_state.go +++ b/sector_state.go @@ -7,6 +7,7 @@ const ( // happy path Empty SectorState = "Empty" + WaitDeals SectorState = "WaitDeals" // waiting for more pieces (deals) to be added to the sector Packing SectorState = "Packing" // sector not in sealStore, and not on chain PreCommit1 SectorState = "PreCommit1" // do PreCommit1 PreCommit2 SectorState = "PreCommit2" // do PreCommit1