From 6ce8cda0a3d40af544174584680d8d69ea43dd48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 17 Mar 2020 21:19:52 +0100 Subject: [PATCH] Use new specs-storage interface --- garbage.go | 11 ++++++++--- sealing.go | 32 +++++++++++++++++++++++++++++--- states.go | 21 +++++++++++++-------- 3 files changed, 50 insertions(+), 14 deletions(-) diff --git a/garbage.go b/garbage.go index 445ba8969..4903abf25 100644 --- a/garbage.go +++ b/garbage.go @@ -16,7 +16,7 @@ func (m *Sealing) pledgeReader(size abi.UnpaddedPieceSize) io.Reader { return io.LimitReader(&nullreader.Reader{}, int64(size)) } -func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorNumber, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]Piece, error) { +func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorID, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]Piece, error) { if len(sizes) == 0 { return nil, nil } @@ -55,13 +55,18 @@ func (m *Sealing) PledgeSector() error { return } - sid, err := m.sealer.NewSector() + sid, err := m.sc.Next() + if err != nil { + log.Errorf("%+v", err) + return + } + err = m.sealer.NewSector(ctx, m.minerSector(sid)) if err != nil { log.Errorf("%+v", err) return } - pieces, err := m.pledgeSector(ctx, sid, []abi.UnpaddedPieceSize{}, size) + pieces, err := m.pledgeSector(ctx, m.minerSector(sid), []abi.UnpaddedPieceSize{}, size) if err != nil { log.Errorf("%+v", err) return diff --git a/sealing.go b/sealing.go index b74d34db1..ed793b0ae 100644 --- a/sealing.go +++ b/sealing.go @@ -30,6 +30,10 @@ var log = logging.Logger("sectors") type TicketFn func(context.Context) (*api.SealTicket, error) +type SectorIDCounter interface { + Next() (abi.SectorNumber, error) +} + type sealingApi interface { // TODO: trim down // Call a read only method on actors (no interaction with the chain required) StateCall(context.Context, *types.Message, types.TipSetKey) (*api.InvocResult, error) @@ -68,6 +72,7 @@ type Sealing struct { sealer sealmgr.Manager sectors *statemachine.StateGroup tktFn TicketFn + sc SectorIDCounter } func New(api sealingApi, events *events.Events, maddr address.Address, worker address.Address, ds datastore.Batching, sealer sealmgr.Manager, tktFn TicketFn) *Sealing { @@ -104,9 +109,14 @@ func (m *Sealing) AllocatePiece(size abi.UnpaddedPieceSize) (sectorID abi.Sector return 0, 0, xerrors.Errorf("cannot allocate unpadded piece") } - sid, err := m.sealer.NewSector() // TODO: Put more than one thing in a sector + sid, err := m.sc.Next() if err != nil { - return 0, 0, xerrors.Errorf("acquiring sector ID: %w", err) + return 0, 0, xerrors.Errorf("getting sector number: %w", err) + } + + err = m.sealer.NewSector(context.TODO(), m.minerSector(sid)) // TODO: Put more than one thing in a sector + if err != nil { + return 0, 0, xerrors.Errorf("initializing sector: %w", err) } // offset hard-coded to 0 since we only put one thing in a sector for now @@ -116,7 +126,7 @@ func (m *Sealing) AllocatePiece(size abi.UnpaddedPieceSize) (sectorID abi.Sector func (m *Sealing) SealPiece(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, sectorID abi.SectorNumber, dealID abi.DealID) error { log.Infof("Seal piece for deal %d", dealID) - ppi, err := m.sealer.AddPiece(ctx, sectorID, []abi.UnpaddedPieceSize{}, size, r) + ppi, err := m.sealer.AddPiece(ctx, m.minerSector(sectorID), []abi.UnpaddedPieceSize{}, size, r) if err != nil { return xerrors.Errorf("adding piece to sector: %w", err) } @@ -144,3 +154,19 @@ func (m *Sealing) newSector(sid abi.SectorNumber, rt abi.RegisteredProof, pieces sectorType: rt, }) } + +func (m *Sealing) minerSector(num abi.SectorNumber) abi.SectorID { + mid, err := address.IDFromAddress(m.maddr) + if err != nil { + panic(err) + } + + return abi.SectorID{ + Number: num, + Miner: abi.ActorID(mid), + } +} + +func (m *Sealing) Address() address.Address { + return m.maddr +} diff --git a/states.go b/states.go index e12860447..ca02e90f4 100644 --- a/states.go +++ b/states.go @@ -2,6 +2,7 @@ package sealing import ( "context" + "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-actors/actors/crypto" @@ -40,7 +41,7 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorID) } - pieces, err := m.pledgeSector(ctx.Context(), sector.SectorID, sector.existingPieces(), fillerSizes...) + pieces, err := m.pledgeSector(ctx.Context(), m.minerSector(sector.SectorID), sector.existingPieces(), fillerSizes...) if err != nil { return xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err) } @@ -69,19 +70,19 @@ func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) er return ctx.Send(SectorSealFailed{xerrors.Errorf("getting ticket failed: %w", err)}) } - pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), sector.SectorID, ticket.Value, sector.pieceInfos()) + pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), m.minerSector(sector.SectorID), ticket.Value, sector.pieceInfos()) if err != nil { return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit failed: %w", err)}) } - sealed, unsealed, err := m.sealer.SealPreCommit2(ctx.Context(), sector.SectorID, pc1o) + cids, err := m.sealer.SealPreCommit2(ctx.Context(), m.minerSector(sector.SectorID), pc1o) if err != nil { return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit failed: %w", err)}) } return ctx.Send(SectorSealed{ - commD: unsealed, - commR: sealed, + commD: cids.Unsealed, + commR: cids.Sealed, ticket: *ticket, }) } @@ -184,12 +185,16 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%x; d:%x", sector.SectorID, sector.Ticket.Value, sector.Ticket.Epoch, sector.Seed.Value, sector.Seed.Epoch, sector.pieceInfos(), sector.CommR, sector.CommD) - c2in, err := m.sealer.SealCommit1(ctx.Context(), sector.SectorID, sector.Ticket.Value, sector.Seed.Value, sector.pieceInfos(), *sector.CommR, *sector.CommD) + cids := storage.SectorCids{ + Unsealed: *sector.CommD, + Sealed: *sector.CommR, + } + c2in, err := m.sealer.SealCommit1(ctx.Context(), m.minerSector(sector.SectorID), sector.Ticket.Value, sector.Seed.Value, sector.pieceInfos(), cids) if err != nil { return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed: %w", err)}) } - proof, err := m.sealer.SealCommit2(ctx.Context(), sector.SectorID, c2in) + proof, err := m.sealer.SealCommit2(ctx.Context(), m.minerSector(sector.SectorID), c2in) if err != nil { return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed: %w", err)}) } @@ -250,7 +255,7 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorInfo) error { // TODO: Maybe wait for some finality - if err := m.sealer.FinalizeSector(ctx.Context(), sector.SectorID); err != nil { + if err := m.sealer.FinalizeSector(ctx.Context(), m.minerSector(sector.SectorID)); err != nil { return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)}) }