Storage Manager refactor

This commit is contained in:
Łukasz Magiera 2020-03-03 23:19:22 +01:00
parent 94073cfb5a
commit b31c8a56f7
4 changed files with 28 additions and 25 deletions

View File

@ -25,7 +25,7 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorNumber, e
out := make([]Piece, len(sizes))
for i, size := range sizes {
ppi, err := m.sb.AddPiece(ctx, size, sectorID, m.pledgeReader(size), existingPieceSizes)
ppi, err := m.sealer.AddPiece(ctx, size, sectorID, m.pledgeReader(size), existingPieceSizes)
if err != nil {
return nil, xerrors.Errorf("add piece: %w", err)
}
@ -47,15 +47,15 @@ func (m *Sealing) PledgeSector() error {
// this, as we run everything here async, and it's cancelled when the
// command exits
size := abi.PaddedPieceSize(m.sb.SectorSize()).Unpadded()
size := abi.PaddedPieceSize(m.sealer.SectorSize()).Unpadded()
rt, _, err := api.ProofTypeFromSectorSize(m.sb.SectorSize())
rt, _, err := api.ProofTypeFromSectorSize(m.sealer.SectorSize())
if err != nil {
log.Error(err)
return
}
sid, err := m.sb.AcquireSectorNumber()
sid, err := m.sealer.NewSector()
if err != nil {
log.Errorf("%+v", err)
return

View File

@ -12,7 +12,6 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/crypto"
@ -22,6 +21,7 @@ import (
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/statemachine"
"github.com/filecoin-project/lotus/storage/sealmgr"
)
const SectorStorePrefix = "/sectors"
@ -65,19 +65,19 @@ type Sealing struct {
maddr address.Address
worker address.Address
sb sectorbuilder.Interface
sealer sealmgr.Manager
sectors *statemachine.StateGroup
tktFn TicketFn
}
func New(api sealingApi, events *events.Events, maddr address.Address, worker address.Address, ds datastore.Batching, sb sectorbuilder.Interface, tktFn TicketFn) *Sealing {
func New(api sealingApi, events *events.Events, maddr address.Address, worker address.Address, ds datastore.Batching, sealer sealmgr.Manager, tktFn TicketFn) *Sealing {
s := &Sealing{
api: api,
events: events,
maddr: maddr,
worker: worker,
sb: sb,
sealer: sealer,
tktFn: tktFn,
}
@ -104,7 +104,7 @@ func (m *Sealing) AllocatePiece(size abi.UnpaddedPieceSize) (sectorID abi.Sector
return 0, 0, xerrors.Errorf("cannot allocate unpadded piece")
}
sid, err := m.sb.AcquireSectorNumber() // TODO: Put more than one thing in a sector
sid, err := m.sealer.NewSector() // TODO: Put more than one thing in a sector
if err != nil {
return 0, 0, xerrors.Errorf("acquiring sector ID: %w", err)
}
@ -116,12 +116,12 @@ func (m *Sealing) AllocatePiece(size abi.UnpaddedPieceSize) (sectorID abi.Sector
func (m *Sealing) SealPiece(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, sectorID abi.SectorNumber, dealID abi.DealID) error {
log.Infof("Seal piece for deal %d", dealID)
ppi, err := m.sb.AddPiece(ctx, size, sectorID, r, []abi.UnpaddedPieceSize{})
ppi, err := m.sealer.AddPiece(ctx, size, sectorID, r, []abi.UnpaddedPieceSize{})
if err != nil {
return xerrors.Errorf("adding piece to sector: %w", err)
}
_, rt, err := api.ProofTypeFromSectorSize(m.sb.SectorSize())
_, rt, err := api.ProofTypeFromSectorSize(m.sealer.SectorSize())
if err != nil {
return xerrors.Errorf("bad sector size: %w", err)
}

View File

@ -5,7 +5,6 @@ import (
"github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/filecoin-project/go-sectorbuilder/fs"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
@ -26,7 +25,7 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err
allocated += piece.Size
}
ubytes := abi.PaddedPieceSize(m.sb.SectorSize()).Unpadded()
ubytes := abi.PaddedPieceSize(m.sealer.SectorSize()).Unpadded()
if allocated > ubytes {
return xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes)
@ -70,7 +69,12 @@ func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) er
return ctx.Send(SectorSealFailed{xerrors.Errorf("getting ticket failed: %w", err)})
}
sealed, unsealed, err := m.sb.SealPreCommit(ctx.Context(), sector.SectorID, ticket.Value, sector.pieceInfos())
pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), sector.SectorID, ticket.Value, sector.pieceInfos())
if err != nil {
return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit failed: %w", err)})
}
sealed, unsealed, err := m.sealer.SealPreCommit2(ctx.Context(), sector.SectorID, pc1o)
if err != nil {
return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit failed: %w", err)})
}
@ -180,7 +184,12 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%x; d:%x", sector.SectorID, sector.Ticket.Value, sector.Ticket.Epoch, sector.Seed.Value, sector.Seed.Epoch, sector.pieceInfos(), sector.CommR, sector.CommD)
proof, err := m.sb.SealCommit(ctx.Context(), sector.SectorID, sector.Ticket.Value, sector.Seed.Value, sector.pieceInfos(), *sector.CommR, *sector.CommD)
c2in, err := m.sealer.SealCommit1(ctx.Context(), sector.SectorID, sector.Ticket.Value, sector.Seed.Value, sector.pieceInfos(), *sector.CommR, *sector.CommD)
if err != nil {
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed: %w", err)})
}
proof, err := m.sealer.SealCommit2(ctx.Context(), sector.SectorID, c2in)
if err != nil {
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed: %w", err)})
}
@ -241,15 +250,8 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo)
func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorInfo) error {
// TODO: Maybe wait for some finality
if err := m.sb.FinalizeSector(ctx.Context(), sector.SectorID); err != nil {
if !xerrors.Is(err, fs.ErrNoSuitablePath) {
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)})
}
log.Warnf("finalize sector: %v", err)
}
if err := m.sb.DropStaged(ctx.Context(), sector.SectorID); err != nil {
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("drop staged: %w", err)})
if err := m.sealer.FinalizeSector(ctx.Context(), sector.SectorID); err != nil {
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)})
}
return ctx.Send(SectorFinalized{})

View File

@ -1,9 +1,10 @@
package sealing
import (
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/lotus/api"
)
type Piece struct {