Merge pull request #36 from filecoin-project/feat/deal-sechd-prio

Give sector with deals priority in sealing
This commit is contained in:
Łukasz Magiera 2020-06-25 18:08:32 +02:00 committed by GitHub
commit 379a4655b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 34 additions and 9 deletions

2
go.mod
View File

@ -9,7 +9,7 @@ require (
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663 // indirect github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663 // indirect
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9 github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9
github.com/filecoin-project/sector-storage v0.0.0-20200623210524-47d93356586d github.com/filecoin-project/sector-storage v0.0.0-20200625154333-98ef8e4ef246
github.com/filecoin-project/specs-actors v0.6.0 github.com/filecoin-project/specs-actors v0.6.0
github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea
github.com/ipfs/go-cid v0.0.5 github.com/ipfs/go-cid v0.0.5

4
go.sum
View File

@ -55,8 +55,8 @@ github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9 h
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig= github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig=
github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ= github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ=
github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI= github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI=
github.com/filecoin-project/sector-storage v0.0.0-20200623210524-47d93356586d h1:yJJqXCMEhvXJoOS6T1O46FXl+A3mlttXhgjcTCp+Tgo= github.com/filecoin-project/sector-storage v0.0.0-20200625154333-98ef8e4ef246 h1:NfYQRmVRe0LzlNbK5Ket3vbBOwFD5TvtcNtfo/Sd8mg=
github.com/filecoin-project/sector-storage v0.0.0-20200623210524-47d93356586d/go.mod h1:8f0hWDzzIi1hKs4IVKH9RnDsO4LEHVz8BNat0okDOuY= github.com/filecoin-project/sector-storage v0.0.0-20200625154333-98ef8e4ef246/go.mod h1:8f0hWDzzIi1hKs4IVKH9RnDsO4LEHVz8BNat0okDOuY=
github.com/filecoin-project/specs-actors v0.0.0-20200210130641-2d1fbd8672cf/go.mod h1:xtDZUB6pe4Pksa/bAJbJ693OilaC5Wbot9jMhLm3cZA= github.com/filecoin-project/specs-actors v0.0.0-20200210130641-2d1fbd8672cf/go.mod h1:xtDZUB6pe4Pksa/bAJbJ693OilaC5Wbot9jMhLm3cZA=
github.com/filecoin-project/specs-actors v0.3.0 h1:QxgAuTrZr5TPqjyprZk0nTYW5o0JWpzbb5v+4UHHvN0= github.com/filecoin-project/specs-actors v0.3.0 h1:QxgAuTrZr5TPqjyprZk0nTYW5o0JWpzbb5v+4UHHvN0=
github.com/filecoin-project/specs-actors v0.3.0/go.mod h1:nQYnFbQ7Y0bHZyq6HDEuVlCPR+U3z5Q3wMOQ+2aiV+Y= github.com/filecoin-project/specs-actors v0.3.0/go.mod h1:nQYnFbQ7Y0bHZyq6HDEuVlCPR+U3z5Q3wMOQ+2aiV+Y=

View File

@ -108,7 +108,7 @@ func (m *Sealing) AllocatePiece(size abi.UnpaddedPieceSize) (sectorID abi.Sector
func (m *Sealing) SealPiece(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, sectorID abi.SectorNumber, d DealInfo) error { func (m *Sealing) SealPiece(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, sectorID abi.SectorNumber, d DealInfo) error {
log.Infof("Seal piece for deal %d", d.DealID) log.Infof("Seal piece for deal %d", d.DealID)
ppi, err := m.sealer.AddPiece(ctx, m.minerSector(sectorID), []abi.UnpaddedPieceSize{}, size, r) ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx, DealSectorPriority), m.minerSector(sectorID), []abi.UnpaddedPieceSize{}, size, r)
if err != nil { if err != nil {
return xerrors.Errorf("adding piece to sector: %w", err) return xerrors.Errorf("adding piece to sector: %w", err)
} }

View File

@ -15,6 +15,8 @@ import (
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
) )
var DealSectorPriority = 1024
func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) error { func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) error {
log.Infow("performing filling up rest of the sector...", "sector", sector.SectorNumber) log.Infow("performing filling up rest of the sector...", "sector", sector.SectorNumber)
@ -97,7 +99,7 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo)
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("getting ticket failed: %w", err)}) return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("getting ticket failed: %w", err)})
} }
pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), m.minerSector(sector.SectorNumber), ticketValue, sector.pieceInfos()) pc1o, err := m.sealer.SealPreCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorNumber), ticketValue, sector.pieceInfos())
if err != nil { if err != nil {
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("seal pre commit(1) failed: %w", err)}) return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("seal pre commit(1) failed: %w", err)})
} }
@ -110,7 +112,7 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo)
} }
func (m *Sealing) handlePreCommit2(ctx statemachine.Context, sector SectorInfo) error { func (m *Sealing) handlePreCommit2(ctx statemachine.Context, sector SectorInfo) error {
cids, err := m.sealer.SealPreCommit2(ctx.Context(), m.minerSector(sector.SectorNumber), sector.PreCommit1Out) cids, err := m.sealer.SealPreCommit2(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorNumber), sector.PreCommit1Out)
if err != nil { if err != nil {
return ctx.Send(SectorSealPreCommit2Failed{xerrors.Errorf("seal pre commit(2) failed: %w", err)}) return ctx.Send(SectorSealPreCommit2Failed{xerrors.Errorf("seal pre commit(2) failed: %w", err)})
} }
@ -251,12 +253,12 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
Unsealed: *sector.CommD, Unsealed: *sector.CommD,
Sealed: *sector.CommR, Sealed: *sector.CommR,
} }
c2in, err := m.sealer.SealCommit1(ctx.Context(), m.minerSector(sector.SectorNumber), sector.TicketValue, sector.SeedValue, sector.pieceInfos(), cids) c2in, err := m.sealer.SealCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorNumber), sector.TicketValue, sector.SeedValue, sector.pieceInfos(), cids)
if err != nil { if err != nil {
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed(1): %w", err)}) return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed(1): %w", err)})
} }
proof, err := m.sealer.SealCommit2(ctx.Context(), m.minerSector(sector.SectorNumber), c2in) proof, err := m.sealer.SealCommit2(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorNumber), c2in)
if err != nil { if err != nil {
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed(2): %w", err)}) return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed(2): %w", err)})
} }
@ -332,7 +334,7 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo)
func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorInfo) error { func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorInfo) error {
// TODO: Maybe wait for some finality // TODO: Maybe wait for some finality
if err := m.sealer.FinalizeSector(ctx.Context(), m.minerSector(sector.SectorNumber), nil); err != nil { if err := m.sealer.FinalizeSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorNumber), nil); err != nil {
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)}) return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)})
} }

View File

@ -2,9 +2,11 @@ package sealing
import ( import (
"bytes" "bytes"
"context"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
sectorstorage "github.com/filecoin-project/sector-storage"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/runtime/exitcode" "github.com/filecoin-project/specs-actors/actors/runtime/exitcode"
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
@ -114,6 +116,27 @@ func (t *SectorInfo) existingPieceSizes() []abi.UnpaddedPieceSize {
return out return out
} }
func (t *SectorInfo) hasDeals() bool {
for _, piece := range t.Pieces {
if piece.DealInfo != nil {
return true
}
}
return false
}
func (t *SectorInfo) sealingCtx(ctx context.Context) context.Context {
// TODO: can also take start epoch into account to give priority to sectors
// we need sealed sooner
if t.hasDeals() {
return sectorstorage.WithPriority(ctx, DealSectorPriority)
}
return ctx
}
type SectorIDCounter interface { type SectorIDCounter interface {
Next() (abi.SectorNumber, error) Next() (abi.SectorNumber, error)
} }