From b31c8a56f7523156cafe61def021ce13a9f4bcd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 3 Mar 2020 23:19:22 +0100 Subject: [PATCH] Storage Manager refactor --- garbage.go | 8 ++++---- sealing.go | 14 +++++++------- states.go | 28 +++++++++++++++------------- types.go | 3 ++- 4 files changed, 28 insertions(+), 25 deletions(-) diff --git a/garbage.go b/garbage.go index 660a2604a..42d1bc80a 100644 --- a/garbage.go +++ b/garbage.go @@ -25,7 +25,7 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorNumber, e out := make([]Piece, len(sizes)) for i, size := range sizes { - ppi, err := m.sb.AddPiece(ctx, size, sectorID, m.pledgeReader(size), existingPieceSizes) + ppi, err := m.sealer.AddPiece(ctx, size, sectorID, m.pledgeReader(size), existingPieceSizes) if err != nil { return nil, xerrors.Errorf("add piece: %w", err) } @@ -47,15 +47,15 @@ func (m *Sealing) PledgeSector() error { // this, as we run everything here async, and it's cancelled when the // command exits - size := abi.PaddedPieceSize(m.sb.SectorSize()).Unpadded() + size := abi.PaddedPieceSize(m.sealer.SectorSize()).Unpadded() - rt, _, err := api.ProofTypeFromSectorSize(m.sb.SectorSize()) + rt, _, err := api.ProofTypeFromSectorSize(m.sealer.SectorSize()) if err != nil { log.Error(err) return } - sid, err := m.sb.AcquireSectorNumber() + sid, err := m.sealer.NewSector() if err != nil { log.Errorf("%+v", err) return diff --git a/sealing.go b/sealing.go index cd2f6ff5f..a7d11586a 100644 --- a/sealing.go +++ b/sealing.go @@ -12,7 +12,6 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-padreader" - "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/crypto" @@ -22,6 +21,7 @@ import ( "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/statemachine" + "github.com/filecoin-project/lotus/storage/sealmgr" ) const SectorStorePrefix = "/sectors" @@ -65,19 +65,19 @@ type Sealing struct { maddr address.Address worker address.Address - sb sectorbuilder.Interface + sealer sealmgr.Manager sectors *statemachine.StateGroup tktFn TicketFn } -func New(api sealingApi, events *events.Events, maddr address.Address, worker address.Address, ds datastore.Batching, sb sectorbuilder.Interface, tktFn TicketFn) *Sealing { +func New(api sealingApi, events *events.Events, maddr address.Address, worker address.Address, ds datastore.Batching, sealer sealmgr.Manager, tktFn TicketFn) *Sealing { s := &Sealing{ api: api, events: events, maddr: maddr, worker: worker, - sb: sb, + sealer: sealer, tktFn: tktFn, } @@ -104,7 +104,7 @@ func (m *Sealing) AllocatePiece(size abi.UnpaddedPieceSize) (sectorID abi.Sector return 0, 0, xerrors.Errorf("cannot allocate unpadded piece") } - sid, err := m.sb.AcquireSectorNumber() // TODO: Put more than one thing in a sector + sid, err := m.sealer.NewSector() // TODO: Put more than one thing in a sector if err != nil { return 0, 0, xerrors.Errorf("acquiring sector ID: %w", err) } @@ -116,12 +116,12 @@ func (m *Sealing) AllocatePiece(size abi.UnpaddedPieceSize) (sectorID abi.Sector func (m *Sealing) SealPiece(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, sectorID abi.SectorNumber, dealID abi.DealID) error { log.Infof("Seal piece for deal %d", dealID) - ppi, err := m.sb.AddPiece(ctx, size, sectorID, r, []abi.UnpaddedPieceSize{}) + ppi, err := m.sealer.AddPiece(ctx, size, sectorID, r, []abi.UnpaddedPieceSize{}) if err != nil { return xerrors.Errorf("adding piece to sector: %w", err) } - _, rt, err := api.ProofTypeFromSectorSize(m.sb.SectorSize()) + _, rt, err := api.ProofTypeFromSectorSize(m.sealer.SectorSize()) if err != nil { return xerrors.Errorf("bad sector size: %w", err) } diff --git a/states.go b/states.go index b3f033a9d..d7966a373 100644 --- a/states.go +++ b/states.go @@ -5,7 +5,6 @@ import ( "github.com/filecoin-project/specs-actors/actors/crypto" - "github.com/filecoin-project/go-sectorbuilder/fs" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin/miner" @@ -26,7 +25,7 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err allocated += piece.Size } - ubytes := abi.PaddedPieceSize(m.sb.SectorSize()).Unpadded() + ubytes := abi.PaddedPieceSize(m.sealer.SectorSize()).Unpadded() if allocated > ubytes { return xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes) @@ -70,7 +69,12 @@ func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) er return ctx.Send(SectorSealFailed{xerrors.Errorf("getting ticket failed: %w", err)}) } - sealed, unsealed, err := m.sb.SealPreCommit(ctx.Context(), sector.SectorID, ticket.Value, sector.pieceInfos()) + pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), sector.SectorID, ticket.Value, sector.pieceInfos()) + if err != nil { + return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit failed: %w", err)}) + } + + sealed, unsealed, err := m.sealer.SealPreCommit2(ctx.Context(), sector.SectorID, pc1o) if err != nil { return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit failed: %w", err)}) } @@ -180,7 +184,12 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%x; d:%x", sector.SectorID, sector.Ticket.Value, sector.Ticket.Epoch, sector.Seed.Value, sector.Seed.Epoch, sector.pieceInfos(), sector.CommR, sector.CommD) - proof, err := m.sb.SealCommit(ctx.Context(), sector.SectorID, sector.Ticket.Value, sector.Seed.Value, sector.pieceInfos(), *sector.CommR, *sector.CommD) + c2in, err := m.sealer.SealCommit1(ctx.Context(), sector.SectorID, sector.Ticket.Value, sector.Seed.Value, sector.pieceInfos(), *sector.CommR, *sector.CommD) + if err != nil { + return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed: %w", err)}) + } + + proof, err := m.sealer.SealCommit2(ctx.Context(), sector.SectorID, c2in) if err != nil { return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed: %w", err)}) } @@ -241,15 +250,8 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorInfo) error { // TODO: Maybe wait for some finality - if err := m.sb.FinalizeSector(ctx.Context(), sector.SectorID); err != nil { - if !xerrors.Is(err, fs.ErrNoSuitablePath) { - return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)}) - } - log.Warnf("finalize sector: %v", err) - } - - if err := m.sb.DropStaged(ctx.Context(), sector.SectorID); err != nil { - return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("drop staged: %w", err)}) + if err := m.sealer.FinalizeSector(ctx.Context(), sector.SectorID); err != nil { + return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)}) } return ctx.Send(SectorFinalized{}) diff --git a/types.go b/types.go index b25a43ef8..4e9e0dccc 100644 --- a/types.go +++ b/types.go @@ -1,9 +1,10 @@ package sealing import ( - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/ipfs/go-cid" + + "github.com/filecoin-project/lotus/api" ) type Piece struct {