Use new specs-storage interface

This commit is contained in:
Łukasz Magiera 2020-03-17 21:19:52 +01:00
parent d255b7d6e9
commit 6ce8cda0a3
3 changed files with 50 additions and 14 deletions

View File

@ -16,7 +16,7 @@ func (m *Sealing) pledgeReader(size abi.UnpaddedPieceSize) io.Reader {
return io.LimitReader(&nullreader.Reader{}, int64(size))
}
func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorNumber, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]Piece, error) {
func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorID, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]Piece, error) {
if len(sizes) == 0 {
return nil, nil
}
@ -55,13 +55,18 @@ func (m *Sealing) PledgeSector() error {
return
}
sid, err := m.sealer.NewSector()
sid, err := m.sc.Next()
if err != nil {
log.Errorf("%+v", err)
return
}
err = m.sealer.NewSector(ctx, m.minerSector(sid))
if err != nil {
log.Errorf("%+v", err)
return
}
pieces, err := m.pledgeSector(ctx, sid, []abi.UnpaddedPieceSize{}, size)
pieces, err := m.pledgeSector(ctx, m.minerSector(sid), []abi.UnpaddedPieceSize{}, size)
if err != nil {
log.Errorf("%+v", err)
return

View File

@ -30,6 +30,10 @@ var log = logging.Logger("sectors")
type TicketFn func(context.Context) (*api.SealTicket, error)
type SectorIDCounter interface {
Next() (abi.SectorNumber, error)
}
type sealingApi interface { // TODO: trim down
// Call a read only method on actors (no interaction with the chain required)
StateCall(context.Context, *types.Message, types.TipSetKey) (*api.InvocResult, error)
@ -68,6 +72,7 @@ type Sealing struct {
sealer sealmgr.Manager
sectors *statemachine.StateGroup
tktFn TicketFn
sc SectorIDCounter
}
func New(api sealingApi, events *events.Events, maddr address.Address, worker address.Address, ds datastore.Batching, sealer sealmgr.Manager, tktFn TicketFn) *Sealing {
@ -104,9 +109,14 @@ func (m *Sealing) AllocatePiece(size abi.UnpaddedPieceSize) (sectorID abi.Sector
return 0, 0, xerrors.Errorf("cannot allocate unpadded piece")
}
sid, err := m.sealer.NewSector() // TODO: Put more than one thing in a sector
sid, err := m.sc.Next()
if err != nil {
return 0, 0, xerrors.Errorf("acquiring sector ID: %w", err)
return 0, 0, xerrors.Errorf("getting sector number: %w", err)
}
err = m.sealer.NewSector(context.TODO(), m.minerSector(sid)) // TODO: Put more than one thing in a sector
if err != nil {
return 0, 0, xerrors.Errorf("initializing sector: %w", err)
}
// offset hard-coded to 0 since we only put one thing in a sector for now
@ -116,7 +126,7 @@ func (m *Sealing) AllocatePiece(size abi.UnpaddedPieceSize) (sectorID abi.Sector
func (m *Sealing) SealPiece(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, sectorID abi.SectorNumber, dealID abi.DealID) error {
log.Infof("Seal piece for deal %d", dealID)
ppi, err := m.sealer.AddPiece(ctx, sectorID, []abi.UnpaddedPieceSize{}, size, r)
ppi, err := m.sealer.AddPiece(ctx, m.minerSector(sectorID), []abi.UnpaddedPieceSize{}, size, r)
if err != nil {
return xerrors.Errorf("adding piece to sector: %w", err)
}
@ -144,3 +154,19 @@ func (m *Sealing) newSector(sid abi.SectorNumber, rt abi.RegisteredProof, pieces
sectorType: rt,
})
}
func (m *Sealing) minerSector(num abi.SectorNumber) abi.SectorID {
mid, err := address.IDFromAddress(m.maddr)
if err != nil {
panic(err)
}
return abi.SectorID{
Number: num,
Miner: abi.ActorID(mid),
}
}
func (m *Sealing) Address() address.Address {
return m.maddr
}

View File

@ -2,6 +2,7 @@ package sealing
import (
"context"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/specs-actors/actors/crypto"
@ -40,7 +41,7 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err
log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorID)
}
pieces, err := m.pledgeSector(ctx.Context(), sector.SectorID, sector.existingPieces(), fillerSizes...)
pieces, err := m.pledgeSector(ctx.Context(), m.minerSector(sector.SectorID), sector.existingPieces(), fillerSizes...)
if err != nil {
return xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err)
}
@ -69,19 +70,19 @@ func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) er
return ctx.Send(SectorSealFailed{xerrors.Errorf("getting ticket failed: %w", err)})
}
pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), sector.SectorID, ticket.Value, sector.pieceInfos())
pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), m.minerSector(sector.SectorID), ticket.Value, sector.pieceInfos())
if err != nil {
return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit failed: %w", err)})
}
sealed, unsealed, err := m.sealer.SealPreCommit2(ctx.Context(), sector.SectorID, pc1o)
cids, err := m.sealer.SealPreCommit2(ctx.Context(), m.minerSector(sector.SectorID), pc1o)
if err != nil {
return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit failed: %w", err)})
}
return ctx.Send(SectorSealed{
commD: unsealed,
commR: sealed,
commD: cids.Unsealed,
commR: cids.Sealed,
ticket: *ticket,
})
}
@ -184,12 +185,16 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%x; d:%x", sector.SectorID, sector.Ticket.Value, sector.Ticket.Epoch, sector.Seed.Value, sector.Seed.Epoch, sector.pieceInfos(), sector.CommR, sector.CommD)
c2in, err := m.sealer.SealCommit1(ctx.Context(), sector.SectorID, sector.Ticket.Value, sector.Seed.Value, sector.pieceInfos(), *sector.CommR, *sector.CommD)
cids := storage.SectorCids{
Unsealed: *sector.CommD,
Sealed: *sector.CommR,
}
c2in, err := m.sealer.SealCommit1(ctx.Context(), m.minerSector(sector.SectorID), sector.Ticket.Value, sector.Seed.Value, sector.pieceInfos(), cids)
if err != nil {
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed: %w", err)})
}
proof, err := m.sealer.SealCommit2(ctx.Context(), sector.SectorID, c2in)
proof, err := m.sealer.SealCommit2(ctx.Context(), m.minerSector(sector.SectorID), c2in)
if err != nil {
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed: %w", err)})
}
@ -250,7 +255,7 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo)
func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorInfo) error {
// TODO: Maybe wait for some finality
if err := m.sealer.FinalizeSector(ctx.Context(), sector.SectorID); err != nil {
if err := m.sealer.FinalizeSector(ctx.Context(), m.minerSector(sector.SectorID)); err != nil {
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)})
}