storagefsm: Start packing correctly

Łukasz Magiera 2021-01-18 21:59:34 +01:00
parent 239d6f8f4d
commit 542357a1df
6 changed files with 131 additions and 71 deletions

View File

@@ -50,6 +50,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *SectorInfo) (uint64, error){
 	),
 	AddPiece: planOne(
 		on(SectorPieceAdded{}, WaitDeals),
+		apply(SectorStartPacking{}),
 	),
 	Packing:   planOne(on(SectorPacked{}, GetTicket)),
 	GetTicket: planOne(
@@ -447,56 +448,72 @@ func final(events []statemachine.Event, state *SectorInfo) (uint64, error) {
 	return 0, xerrors.Errorf("didn't expect any events in state %s, got %+v", state.State, events)
 }
 
-func on(mut mutator, next SectorState) func() (mutator, func(*SectorInfo) error) {
-	return func() (mutator, func(*SectorInfo) error) {
-		return mut, func(state *SectorInfo) error {
+func on(mut mutator, next SectorState) func() (mutator, func(*SectorInfo) (bool, error)) {
+	return func() (mutator, func(*SectorInfo) (bool, error)) {
+		return mut, func(state *SectorInfo) (bool, error) {
 			state.State = next
-			return nil
+			return false, nil
 		}
 	}
 }
 
-func onReturning(mut mutator) func() (mutator, func(*SectorInfo) error) {
-	return func() (mutator, func(*SectorInfo) error) {
-		return mut, func(state *SectorInfo) error {
+// like `on`, but doesn't change state
+func apply(mut mutator) func() (mutator, func(*SectorInfo) (bool, error)) {
+	return func() (mutator, func(*SectorInfo) (bool, error)) {
+		return mut, func(state *SectorInfo) (bool, error) {
+			return true, nil
+		}
+	}
+}
+
+func onReturning(mut mutator) func() (mutator, func(*SectorInfo) (bool, error)) {
+	return func() (mutator, func(*SectorInfo) (bool, error)) {
+		return mut, func(state *SectorInfo) (bool, error) {
 			if state.Return == "" {
-				return xerrors.Errorf("return state not set")
+				return false, xerrors.Errorf("return state not set")
 			}
 
 			state.State = SectorState(state.Return)
 			state.Return = ""
-			return nil
+			return false, nil
 		}
 	}
 }
 
-func planOne(ts ...func() (mut mutator, next func(*SectorInfo) error)) func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
+func planOne(ts ...func() (mut mutator, next func(*SectorInfo) (more bool, err error))) func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
 	return func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
-		if gm, ok := events[0].User.(globalMutator); ok {
-			gm.applyGlobal(state)
-			return 1, nil
-		}
+		for i, event := range events {
+			if gm, ok := event.User.(globalMutator); ok {
+				gm.applyGlobal(state)
+				return uint64(i + 1), nil
+			}
 
-		for _, t := range ts {
-			mut, next := t()
+			for _, t := range ts {
+				mut, next := t()
 
-			if reflect.TypeOf(events[0].User) != reflect.TypeOf(mut) {
-				continue
-			}
+				if reflect.TypeOf(event.User) != reflect.TypeOf(mut) {
+					continue
+				}
 
-			if err, iserr := events[0].User.(error); iserr {
-				log.Warnf("sector %d got error event %T: %+v", state.SectorNumber, events[0].User, err)
-			}
+				if err, iserr := event.User.(error); iserr {
+					log.Warnf("sector %d got error event %T: %+v", state.SectorNumber, event.User, err)
+				}
 
-			events[0].User.(mutator).apply(state)
-			return 1, next(state)
-		}
+				event.User.(mutator).apply(state)
+				more, err := next(state)
+				if err != nil || !more {
+					return uint64(i + 1), err
+				}
+			}
 
-		_, ok := events[0].User.(Ignorable)
-		if ok {
-			return 1, nil
-		}
+			_, ok := event.User.(Ignorable)
+			if ok {
+				continue
+			}
 
-		return 0, xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", state.State, events[0].User, events[0])
+			return uint64(i + 1), xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", state.State, event.User, event)
+		}
+
+		return uint64(len(events)), nil
 	}
 }
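
The planner change above is the heart of the commit: handler callbacks now return (more bool, err error), planOne consumes the whole event batch instead of only events[0], and the new apply helper mutates state without changing the current state, returning more=true so later events in the batch (such as a queued SectorStartPacking) still get processed. A minimal, self-contained sketch of that convention, with toy types rather than the lotus API:

package main

import (
	"fmt"
	"reflect"
)

type state struct{ name string }

type event interface{ apply(*state) }

type pieceAdded struct{}

func (pieceAdded) apply(*state) {}

type startPacking struct{}

func (startPacking) apply(*state) {}

type handler func() (event, func(*state) (bool, error))

// on: apply the event, move to next, stop processing this batch (more=false).
func on(ev event, next string) handler {
	return func() (event, func(*state) (bool, error)) {
		return ev, func(s *state) (bool, error) { s.name = next; return false, nil }
	}
}

// apply: absorb the event but keep scanning the remaining events (more=true).
func apply(ev event) handler {
	return func() (event, func(*state) (bool, error)) {
		return ev, func(*state) (bool, error) { return true, nil }
	}
}

func planOne(ts ...handler) func([]event, *state) (uint64, error) {
	return func(events []event, s *state) (uint64, error) {
		for i, e := range events {
			for _, t := range ts {
				match, next := t()
				if reflect.TypeOf(e) != reflect.TypeOf(match) {
					continue
				}
				e.apply(s)
				if more, err := next(s); err != nil || !more {
					return uint64(i + 1), err
				}
			}
		}
		return uint64(len(events)), nil
	}
}

func main() {
	plan := planOne(
		on(pieceAdded{}, "WaitDeals"),
		apply(startPacking{}),
	)
	s := &state{name: "AddPiece"}
	n, err := plan([]event{startPacking{}, pieceAdded{}}, s)
	fmt.Println(n, s.name, err) // 2 WaitDeals <nil>: startPacking is absorbed, pieceAdded still transitions
}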

View File

@@ -1,13 +1,16 @@
 package sealing
 
 import (
-	"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
+	"time"
+
 	"github.com/ipfs/go-cid"
 	"golang.org/x/xerrors"
 
 	"github.com/filecoin-project/go-state-types/abi"
 	"github.com/filecoin-project/go-state-types/big"
 	"github.com/filecoin-project/specs-storage/storage"
+
+	"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
 )
 
 type mutator interface {
@@ -81,6 +84,9 @@ type SectorAddPiece struct {
 }
 
 func (evt SectorAddPiece) apply(state *SectorInfo) {
+	if state.CreationTime.IsZero() {
+		state.CreationTime = time.Now()
+	}
	state.PendingPieces = append(state.PendingPieces, evt.NewPiece)
 }
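
CreationTime is stamped only once, on the sector's first SectorAddPiece, so the WaitDealsDelay deadline is measured from sector creation rather than from the latest piece. A toy demonstration of the IsZero guard (hypothetical sectorInfo type, not the lotus one):

package main

import (
	"fmt"
	"time"
)

type sectorInfo struct{ CreationTime time.Time }

// addPiece mirrors the guard above: only the first piece stamps the sector.
func addPiece(s *sectorInfo) {
	if s.CreationTime.IsZero() {
		s.CreationTime = time.Now()
	}
}

func main() {
	var s sectorInfo
	addPiece(&s)
	first := s.CreationTime

	time.Sleep(10 * time.Millisecond)
	addPiece(&s) // no-op: the sector is already stamped

	fmt.Println(s.CreationTime.Equal(first)) // true
}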

View File

@@ -3,6 +3,7 @@ package sealing
 import (
 	"context"
 	"sort"
+	"time"
 
 	"golang.org/x/xerrors"
@@ -21,9 +22,35 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) error {
 	// if full / oldish / has oldish deals goto seal
 	// ^ also per sector deal limit
-	// send SectorStartPacking
 
 	m.inputLk.Lock()
 
+	now := time.Now()
+	st := m.sectorTimers[m.minerSectorID(sector.SectorNumber)]
+	if st != nil {
+		if !st.Stop() { // timer expired, SectorStartPacking was/is being sent
+			// we send another SectorStartPacking in case one was sent in the handleAddPiece state
+			return ctx.Send(SectorStartPacking{})
+		}
+	}
+
+	if !sector.CreationTime.IsZero() {
+		cfg, err := m.getConfig()
+		if err != nil {
+			return xerrors.Errorf("getting storage config: %w", err)
+		}
+
+		sealTime := sector.CreationTime.Add(cfg.WaitDealsDelay)
+		if now.After(sealTime) {
+			return ctx.Send(SectorStartPacking{})
+		} else {
+			m.sectorTimers[m.minerSectorID(sector.SectorNumber)] = time.AfterFunc(sealTime.Sub(now), func() {
+				if err := ctx.Send(SectorStartPacking{}); err != nil {
+					log.Errorw("sending SectorStartPacking event failed", "sector", sector.SectorNumber, "error", err)
+				}
+			})
+		}
+	}
+
 	var used abi.UnpaddedPieceSize
 	for _, piece := range sector.Pieces {
 		used += piece.Piece.Size.Unpadded()
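
The timer logic above leans on the standard time.Timer contract: Stop returns false once the AfterFunc callback has fired (or is firing), which is how the handler detects that a SectorStartPacking may already be in flight and re-sends it. A runnable illustration of just that stdlib behavior, independent of lotus:

package main

import (
	"fmt"
	"time"
)

func main() {
	fired := make(chan struct{})
	t := time.AfterFunc(10*time.Millisecond, func() { close(fired) })

	time.Sleep(50 * time.Millisecond) // let the timer expire

	if !t.Stop() { // Stop returns false: the callback already ran (or is running)
		<-fired
		fmt.Println("timer fired first; re-send the packing event")
	} else {
		fmt.Println("timer cancelled before firing; no event was sent")
	}
}
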
@@ -32,7 +59,7 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) error {
 	m.openSectors[m.minerSectorID(sector.SectorNumber)] = &openSector{
 		used: used,
 		maybeAccept: func(cid cid.Cid) error {
-			// todo check space
+			// todo double check space
 			// todo check deal expiration
@@ -62,6 +89,13 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) error {
 	res := SectorPieceAdded{}
 
+	var offset abi.UnpaddedPieceSize
+	pieceSizes := make([]abi.UnpaddedPieceSize, len(sector.Pieces))
+	for i, p := range sector.Pieces {
+		pieceSizes[i] = p.Piece.Size.Unpadded()
+		offset += p.Piece.Size.Unpadded()
+	}
+
 	for _, piece := range sector.PendingPieces {
 		m.inputLk.Lock()
 		deal, ok := m.pendingPieces[piece]
@@ -72,23 +106,13 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) error {
 			return xerrors.Errorf("piece %s assigned to sector %d not found", piece, sector.SectorNumber)
 		}
 
-		var stored abi.PaddedPieceSize
-		for _, piece := range sector.Pieces {
-			stored += piece.Piece.Size
-		}
-
-		pads, padLength := ffiwrapper.GetRequiredPadding(stored, deal.size.Padded())
+		pads, padLength := ffiwrapper.GetRequiredPadding(offset.Padded(), deal.size.Padded())
 
-		if stored+padLength+deal.size.Padded() > abi.PaddedPieceSize(ssize) {
+		if offset.Padded()+padLength+deal.size.Padded() > abi.PaddedPieceSize(ssize) {
 			return xerrors.Errorf("piece assigned to a sector with not enough space")
 		}
 
-		offset := padLength
-		pieceSizes := make([]abi.UnpaddedPieceSize, len(sector.Pieces))
-		for i, p := range sector.Pieces {
-			pieceSizes[i] = p.Piece.Size.Unpadded()
-			offset += p.Piece.Size
-		}
+		offset += padLength.Unpadded()
 
 		for _, p := range pads {
 			ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx.Context(), DealSectorPriority),
@@ -115,9 +139,13 @@ func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) error {
 			return xerrors.Errorf("writing padding piece: %w", err) // todo failed state
 		}
 
+		deal.accepted(sector.SectorNumber, offset, nil)
+
+		offset += deal.size
 		pieceSizes = append(pieceSizes, deal.size)
+
 		res.NewPieces = append(res.NewPieces, Piece{
 			Piece:    ppi,
 			DealInfo: &deal.deal,
 		})
 	}
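
The rewritten handleAddPiece keeps one running offset in unpadded bytes and converts with Padded() only at the boundaries: when asking ffiwrapper.GetRequiredPadding where alignment pieces are needed, and when checking the deal piece against the sector size. A back-of-envelope check of that bookkeeping, assuming the go-state-types conversions (padded = unpadded + unpadded/127):

package main

import "fmt"

// Stand-ins for abi.UnpaddedPieceSize / abi.PaddedPieceSize; the conversion
// formulas mirror go-state-types.
type unpadded uint64
type padded uint64

func (s unpadded) Padded() padded   { return padded(s + s/127) }
func (s padded) Unpadded() unpadded { return unpadded(s - s/128) }

func main() {
	ssize := padded(2048) // a 2 KiB sector
	var offset unpadded   // running offset, tracked unpadded as in handleAddPiece

	offset += 1016 // an existing 1016-byte (unpadded) piece, 1024 padded
	fmt.Println(offset.Padded()) // 1024: the next piece starts at padded offset 1024

	deal := unpadded(508) // a 508-byte deal piece, 512 padded
	fmt.Println(offset.Padded()+deal.Padded() <= ssize) // true: 1024+512 <= 2048
}
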
@@ -155,7 +183,11 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPieceSize,
 		return 0, 0, xerrors.Errorf("piece for deal %s already pending", *deal.PublishCid)
 	}
 
-	resCh := make(chan struct{sn abi.SectorNumber; offset abi.UnpaddedPieceSize; err error}, 1)
+	resCh := make(chan struct {
+		sn     abi.SectorNumber
+		offset abi.UnpaddedPieceSize
+		err    error
+	}, 1)
 
 	m.pendingPieces[*deal.PublishCid] = &pendingPiece{
 		size: size,
@@ -164,9 +196,9 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPieceSize,
 		assigned: false,
 		accepted: func(sn abi.SectorNumber, offset abi.UnpaddedPieceSize, err error) {
 			resCh <- struct {
-				sn abi.SectorNumber
+				sn     abi.SectorNumber
 				offset abi.UnpaddedPieceSize
-				err error
+				err    error
 			}{sn: sn, offset: offset, err: err}
 		},
 	}
@@ -203,15 +235,15 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) error {
 	// todo: this is distinctly O(n^2), may need to be optimized for tiny deals and large scale miners
 	// (unlikely to be a problem now)
 
-	for id, sector := range m.openSectors {
-		avail := abi.PaddedPieceSize(ssize).Unpadded() - sector.used
+	for pieceCid, piece := range m.pendingPieces {
+		if piece.assigned {
+			continue // already assigned to a sector, skip
+		}
 
-		for pieceCid, piece := range m.pendingPieces {
-			if piece.assigned {
-				continue // already assigned to a sector, skip
-			}
+		toAssign[pieceCid] = struct{}{}
 
-			toAssign[pieceCid] = struct{}{}
+		for id, sector := range m.openSectors {
+			avail := abi.PaddedPieceSize(ssize).Unpadded() - sector.used
 
 			if piece.size <= avail { // (note: if we have enough space for the piece, we also have enough space for inter-piece padding)
 				matches = append(matches, match{
@@ -222,10 +254,8 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) error {
 					padding: avail % piece.size,
 				})
 			}
 		}
-
 	}
-
 	sort.Slice(matches, func(i, j int) bool {
 		if matches[i].padding != matches[j].padding { // less padding is better
 			return matches[i].padding < matches[j].padding
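
updateInput now walks pending pieces in the outer loop and open sectors in the inner one, recording every piece/sector pair that fits; the sort above then prefers matches with the least padding, where padding is avail % piece.size, i.e. the space left over after packing as many pieces of this size as fit. For example (plain integers standing in for abi sizes):

package main

import "fmt"

func main() {
	// 2032 unpadded bytes fit in a 2 KiB sector; a 508-byte piece is already
	// assigned, leaving 1524 bytes available.
	avail := uint64(1524)

	for _, piece := range []uint64{1016, 508, 254} {
		fmt.Printf("piece %4d -> padding %4d\n", piece, avail%piece)
	}
	// piece 1016 -> padding  508  (worst match for this sector)
	// piece  508 -> padding    0
	// piece  254 -> padding    0
}
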
@@ -266,8 +296,9 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) error {
 	}
 
 	if len(toAssign) > 0 {
-		m.tryCreateDealSector(ctx, sp)
+		if err := m.tryCreateDealSector(ctx, sp); err != nil {
+			log.Errorw("Failed to create a new sector for deals", "error", err)
+		}
 	}
 
 	return nil
@@ -279,13 +310,12 @@ func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSealProof) error {
 		return xerrors.Errorf("getting storage config: %w", err)
 	}
 
-	if cfg.MaxSealingSectorsForDeals > 0 {
-		if m.stats.curSealing() > cfg.MaxSealingSectorsForDeals {
-			return nil
-		}
+	if cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() > cfg.MaxSealingSectorsForDeals {
+		return nil
+	}
 
-		if m.stats.curStaging() > cfg.MaxWaitDealsSectors {
-			return nil
-		}
+	if cfg.MaxWaitDealsSectors > 0 && m.stats.curStaging() > cfg.MaxWaitDealsSectors {
+		return nil
 	}
 
 	// Now actually create a new sector
@@ -306,3 +336,7 @@ func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSealProof) error {
 		SectorType: sp,
 	})
 }
+
+func (m *Sealing) StartPacking(sid abi.SectorNumber) error {
+	return m.sectors.Send(uint64(sid), SectorStartPacking{})
+}

View File

@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"sync"
+	"time"
 
 	"github.com/ipfs/go-cid"
 	"github.com/ipfs/go-datastore"
@@ -85,6 +86,7 @@ type Sealing struct {
 	inputLk       sync.Mutex
 	openSectors   map[abi.SectorID]*openSector
+	sectorTimers  map[abi.SectorID]*time.Timer
 	pendingPieces map[cid.Cid]*pendingPiece
 
 	upgradeLk sync.Mutex

View File

@@ -3,8 +3,8 @@ package sealing
 import (
 	"bytes"
 	"context"
 
 	"github.com/ipfs/go-cid"
-
+	"time"
 	"github.com/filecoin-project/go-state-types/abi"
 	"github.com/filecoin-project/go-state-types/big"
@@ -70,6 +70,7 @@ type SectorInfo struct {
 	SectorType abi.RegisteredSealProof
 
 	// Packing
+	CreationTime  time.Time
 	Pieces        []Piece
 	PendingPieces []cid.Cid

View File

@@ -805,7 +805,7 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error) {
 			MaxSealingSectors:         cfg.MaxSealingSectors,
 			MaxSealingSectorsForDeals: cfg.MaxSealingSectorsForDeals,
 			WaitDealsDelay:            config.Duration(cfg.WaitDealsDelay),
-			TargetWaitDealsSectors:    cfg.TargetWaitDealsSectors,
+			TargetWaitDealsSectors: cfg.TargetWaitDealsSectors,
 		}
 	})
 	return
@@ -820,7 +820,7 @@ func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error) {
 			MaxSealingSectors:         cfg.Sealing.MaxSealingSectors,
 			MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals,
 			WaitDealsDelay:            time.Duration(cfg.Sealing.WaitDealsDelay),
-			TargetWaitDealsSectors:    cfg.Sealing.TargetWaitDealsSectors,
+			TargetWaitDealsSectors: cfg.Sealing.TargetWaitDealsSectors,
 		}
 	})
 	return