diff --git a/go.mod b/go.mod index 106669a8f..07b86ce9f 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663 // indirect github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9 - github.com/filecoin-project/sector-storage v0.0.0-20200623210524-47d93356586d + github.com/filecoin-project/sector-storage v0.0.0-20200624210703-6c04c8d4c3ce github.com/filecoin-project/specs-actors v0.6.0 github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea github.com/ipfs/go-cid v0.0.5 diff --git a/go.sum b/go.sum index f9e8224cc..f98e1ab50 100644 --- a/go.sum +++ b/go.sum @@ -55,8 +55,8 @@ github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9 h github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig= github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ= github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI= -github.com/filecoin-project/sector-storage v0.0.0-20200623210524-47d93356586d h1:yJJqXCMEhvXJoOS6T1O46FXl+A3mlttXhgjcTCp+Tgo= -github.com/filecoin-project/sector-storage v0.0.0-20200623210524-47d93356586d/go.mod h1:8f0hWDzzIi1hKs4IVKH9RnDsO4LEHVz8BNat0okDOuY= +github.com/filecoin-project/sector-storage v0.0.0-20200624210703-6c04c8d4c3ce h1:+MzcyIekjXIE1xOfXpo0u/xErY/U00EtlHbu2UG2m8s= +github.com/filecoin-project/sector-storage v0.0.0-20200624210703-6c04c8d4c3ce/go.mod h1:8f0hWDzzIi1hKs4IVKH9RnDsO4LEHVz8BNat0okDOuY= github.com/filecoin-project/specs-actors v0.0.0-20200210130641-2d1fbd8672cf/go.mod h1:xtDZUB6pe4Pksa/bAJbJ693OilaC5Wbot9jMhLm3cZA= github.com/filecoin-project/specs-actors v0.3.0 h1:QxgAuTrZr5TPqjyprZk0nTYW5o0JWpzbb5v+4UHHvN0= github.com/filecoin-project/specs-actors v0.3.0/go.mod h1:nQYnFbQ7Y0bHZyq6HDEuVlCPR+U3z5Q3wMOQ+2aiV+Y= diff --git a/sealing.go b/sealing.go index 573e0886d..b4f3899bc 100644 --- a/sealing.go +++ b/sealing.go @@ -108,7 +108,7 @@ func (m *Sealing) AllocatePiece(size abi.UnpaddedPieceSize) (sectorID abi.Sector func (m *Sealing) SealPiece(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, sectorID abi.SectorNumber, d DealInfo) error { log.Infof("Seal piece for deal %d", d.DealID) - ppi, err := m.sealer.AddPiece(ctx, m.minerSector(sectorID), []abi.UnpaddedPieceSize{}, size, r) + ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx, DealSectorPriority), m.minerSector(sectorID), []abi.UnpaddedPieceSize{}, size, r) if err != nil { return xerrors.Errorf("adding piece to sector: %w", err) } diff --git a/states_sealing.go b/states_sealing.go index 4e8e7e5c1..7de46d4da 100644 --- a/states_sealing.go +++ b/states_sealing.go @@ -7,6 +7,7 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-statemachine" + sectorstorage "github.com/filecoin-project/sector-storage" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin" @@ -15,6 +16,8 @@ import ( "github.com/filecoin-project/specs-storage/storage" ) +var DealSectorPriority = sectorstorage.DefaultSchedPriority * 2 + func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) error { log.Infow("performing filling up rest of the sector...", "sector", sector.SectorNumber) @@ -97,7 +100,7 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("getting ticket failed: %w", err)}) } - pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), m.minerSector(sector.SectorNumber), ticketValue, sector.pieceInfos()) + pc1o, err := m.sealer.SealPreCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorNumber), ticketValue, sector.pieceInfos()) if err != nil { return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("seal pre commit(1) failed: %w", err)}) } @@ -110,7 +113,7 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) } func (m *Sealing) handlePreCommit2(ctx statemachine.Context, sector SectorInfo) error { - cids, err := m.sealer.SealPreCommit2(ctx.Context(), m.minerSector(sector.SectorNumber), sector.PreCommit1Out) + cids, err := m.sealer.SealPreCommit2(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorNumber), sector.PreCommit1Out) if err != nil { return ctx.Send(SectorSealPreCommit2Failed{xerrors.Errorf("seal pre commit(2) failed: %w", err)}) } @@ -251,12 +254,12 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) Unsealed: *sector.CommD, Sealed: *sector.CommR, } - c2in, err := m.sealer.SealCommit1(ctx.Context(), m.minerSector(sector.SectorNumber), sector.TicketValue, sector.SeedValue, sector.pieceInfos(), cids) + c2in, err := m.sealer.SealCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorNumber), sector.TicketValue, sector.SeedValue, sector.pieceInfos(), cids) if err != nil { return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed(1): %w", err)}) } - proof, err := m.sealer.SealCommit2(ctx.Context(), m.minerSector(sector.SectorNumber), c2in) + proof, err := m.sealer.SealCommit2(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorNumber), c2in) if err != nil { return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed(2): %w", err)}) } @@ -332,7 +335,7 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorInfo) error { // TODO: Maybe wait for some finality - if err := m.sealer.FinalizeSector(ctx.Context(), m.minerSector(sector.SectorNumber), nil); err != nil { + if err := m.sealer.FinalizeSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorNumber), nil); err != nil { return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)}) } diff --git a/types.go b/types.go index 3bbb15e76..c76a6c68d 100644 --- a/types.go +++ b/types.go @@ -2,9 +2,11 @@ package sealing import ( "bytes" + "context" "github.com/ipfs/go-cid" + sectorstorage "github.com/filecoin-project/sector-storage" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/runtime/exitcode" "github.com/filecoin-project/specs-storage/storage" @@ -114,6 +116,27 @@ func (t *SectorInfo) existingPieceSizes() []abi.UnpaddedPieceSize { return out } +func (t *SectorInfo) hasDeals() bool { + for _, piece := range t.Pieces { + if piece.DealInfo != nil { + return true + } + } + + return false +} + +func (t *SectorInfo) sealingCtx(ctx context.Context) context.Context { + // TODO: can also take start epoch into account to give priority to sectors + // we need sealed sooner + + if t.hasDeals() { + return sectorstorage.WithPriority(ctx, DealSectorPriority) + } + + return ctx +} + type SectorIDCounter interface { Next() (abi.SectorNumber, error) }