Merge pull request #5375 from filecoin-project/feat/refactor-fsm-input

storagefsm: Rewrite input handling
This commit is contained in:
Łukasz Magiera 2021-02-25 14:28:04 +01:00 committed by GitHub
commit e49a412f6d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 632 additions and 389 deletions

View File

@ -284,6 +284,7 @@ var stateList = []stateMeta{
{col: color.FgBlue, state: sealing.Empty}, {col: color.FgBlue, state: sealing.Empty},
{col: color.FgBlue, state: sealing.WaitDeals}, {col: color.FgBlue, state: sealing.WaitDeals},
{col: color.FgBlue, state: sealing.AddPiece},
{col: color.FgRed, state: sealing.UndefinedSectorState}, {col: color.FgRed, state: sealing.UndefinedSectorState},
{col: color.FgYellow, state: sealing.Packing}, {col: color.FgYellow, state: sealing.Packing},
@ -306,6 +307,7 @@ var stateList = []stateMeta{
{col: color.FgCyan, state: sealing.Removed}, {col: color.FgCyan, state: sealing.Removed},
{col: color.FgRed, state: sealing.FailedUnrecoverable}, {col: color.FgRed, state: sealing.FailedUnrecoverable},
{col: color.FgRed, state: sealing.AddPieceFailed},
{col: color.FgRed, state: sealing.SealPreCommit1Failed}, {col: color.FgRed, state: sealing.SealPreCommit1Failed},
{col: color.FgRed, state: sealing.SealPreCommit2Failed}, {col: color.FgRed, state: sealing.SealPreCommit2Failed},
{col: color.FgRed, state: sealing.PreCommitFailed}, {col: color.FgRed, state: sealing.PreCommitFailed},

View File

@ -519,7 +519,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
_, err := w.Write(cbg.CborNull) _, err := w.Write(cbg.CborNull)
return err return err
} }
if _, err := w.Write([]byte{184, 25}); err != nil { if _, err := w.Write([]byte{184, 26}); err != nil {
return err return err
} }
@ -586,6 +586,28 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
} }
} }
// t.CreationTime (int64) (int64)
if len("CreationTime") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"CreationTime\" was too long")
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("CreationTime"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("CreationTime")); err != nil {
return err
}
if t.CreationTime >= 0 {
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajUnsignedInt, uint64(t.CreationTime)); err != nil {
return err
}
} else {
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajNegativeInt, uint64(-t.CreationTime-1)); err != nil {
return err
}
}
// t.Pieces ([]sealing.Piece) (slice) // t.Pieces ([]sealing.Piece) (slice)
if len("Pieces") > cbg.MaxLength { if len("Pieces") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"Pieces\" was too long") return xerrors.Errorf("Value in field \"Pieces\" was too long")
@ -1151,6 +1173,32 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error {
t.SectorType = abi.RegisteredSealProof(extraI) t.SectorType = abi.RegisteredSealProof(extraI)
} }
// t.CreationTime (int64) (int64)
case "CreationTime":
{
maj, extra, err := cbg.CborReadHeaderBuf(br, scratch)
var extraI int64
if err != nil {
return err
}
switch maj {
case cbg.MajUnsignedInt:
extraI = int64(extra)
if extraI < 0 {
return fmt.Errorf("int64 positive overflow")
}
case cbg.MajNegativeInt:
extraI = int64(extra)
if extraI < 0 {
return fmt.Errorf("int64 negative oveflow")
}
extraI = -1 - extraI
default:
return fmt.Errorf("wrong type for int64 field: %d", maj)
}
t.CreationTime = int64(extraI)
}
// t.Pieces ([]sealing.Piece) (slice) // t.Pieces ([]sealing.Piece) (slice)
case "Pieces": case "Pieces":

View File

@ -37,14 +37,22 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
// Sealing // Sealing
UndefinedSectorState: planOne( UndefinedSectorState: planOne(
on(SectorStart{}, Empty), on(SectorStart{}, WaitDeals),
on(SectorStartCC{}, Packing), on(SectorStartCC{}, Packing),
), ),
Empty: planOne(on(SectorAddPiece{}, WaitDeals)), Empty: planOne( // deprecated
WaitDeals: planOne( on(SectorAddPiece{}, AddPiece),
on(SectorAddPiece{}, WaitDeals),
on(SectorStartPacking{}, Packing), on(SectorStartPacking{}, Packing),
), ),
WaitDeals: planOne(
on(SectorAddPiece{}, AddPiece),
on(SectorStartPacking{}, Packing),
),
AddPiece: planOne(
on(SectorPieceAdded{}, WaitDeals),
apply(SectorStartPacking{}),
on(SectorAddPieceFailed{}, AddPieceFailed),
),
Packing: planOne(on(SectorPacked{}, GetTicket)), Packing: planOne(on(SectorPacked{}, GetTicket)),
GetTicket: planOne( GetTicket: planOne(
on(SectorTicket{}, PreCommit1), on(SectorTicket{}, PreCommit1),
@ -97,6 +105,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
// Sealing errors // Sealing errors
AddPieceFailed: planOne(),
SealPreCommit1Failed: planOne( SealPreCommit1Failed: planOne(
on(SectorRetrySealPreCommit1{}, PreCommit1), on(SectorRetrySealPreCommit1{}, PreCommit1),
), ),
@ -238,12 +247,11 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
/* /*
* Empty <- incoming deals UndefinedSectorState (start)
| | v |
| v *<- WaitDeals <-> AddPiece |
*<- WaitDeals <- incoming deals | | /--------------------/
| | | v v
| v
*<- Packing <- incoming committed capacity *<- Packing <- incoming committed capacity
| | | |
| v | v
@ -282,10 +290,6 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
v v
FailedUnrecoverable FailedUnrecoverable
UndefinedSectorState <- ¯\_()_/¯
| ^
*---------------------/
*/ */
m.stats.updateSector(m.minerSectorID(state.SectorNumber), state.State) m.stats.updateSector(m.minerSectorID(state.SectorNumber), state.State)
@ -295,7 +299,9 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
case Empty: case Empty:
fallthrough fallthrough
case WaitDeals: case WaitDeals:
log.Infof("Waiting for deals %d", state.SectorNumber) return m.handleWaitDeals, processed, nil
case AddPiece:
return m.handleAddPiece, processed, nil
case Packing: case Packing:
return m.handlePacking, processed, nil return m.handlePacking, processed, nil
case GetTicket: case GetTicket:
@ -418,60 +424,10 @@ func (m *Sealing) restartSectors(ctx context.Context) error {
log.Errorf("loading sector list: %+v", err) log.Errorf("loading sector list: %+v", err)
} }
cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting the sealing delay: %w", err)
}
spt, err := m.currentSealProof(ctx)
if err != nil {
return xerrors.Errorf("getting current seal proof: %w", err)
}
ssize, err := spt.SectorSize()
if err != nil {
return err
}
// m.unsealedInfoMap.lk.Lock() taken early in .New to prevent races
defer m.unsealedInfoMap.lk.Unlock()
for _, sector := range trackedSectors { for _, sector := range trackedSectors {
if err := m.sectors.Send(uint64(sector.SectorNumber), SectorRestart{}); err != nil { if err := m.sectors.Send(uint64(sector.SectorNumber), SectorRestart{}); err != nil {
log.Errorf("restarting sector %d: %+v", sector.SectorNumber, err) log.Errorf("restarting sector %d: %+v", sector.SectorNumber, err)
} }
if sector.State == WaitDeals {
// put the sector in the unsealedInfoMap
if _, ok := m.unsealedInfoMap.infos[sector.SectorNumber]; ok {
// something's funky here, but probably safe to move on
log.Warnf("sector %v was already in the unsealedInfoMap when restarting", sector.SectorNumber)
} else {
ui := UnsealedSectorInfo{
ssize: ssize,
}
for _, p := range sector.Pieces {
if p.DealInfo != nil {
ui.numDeals++
}
ui.stored += p.Piece.Size
ui.pieceSizes = append(ui.pieceSizes, p.Piece.Size.Unpadded())
}
m.unsealedInfoMap.infos[sector.SectorNumber] = ui
}
// start a fresh timer for the sector
if cfg.WaitDealsDelay > 0 {
timer := time.NewTimer(cfg.WaitDealsDelay)
go func() {
<-timer.C
if err := m.StartPacking(sector.SectorNumber); err != nil {
log.Errorf("starting sector %d: %+v", sector.SectorNumber, err)
}
}()
}
}
} }
// TODO: Grab on-chain sector set and diff with trackedSectors // TODO: Grab on-chain sector set and diff with trackedSectors
@ -494,56 +450,72 @@ func final(events []statemachine.Event, state *SectorInfo) (uint64, error) {
return 0, xerrors.Errorf("didn't expect any events in state %s, got %+v", state.State, events) return 0, xerrors.Errorf("didn't expect any events in state %s, got %+v", state.State, events)
} }
func on(mut mutator, next SectorState) func() (mutator, func(*SectorInfo) error) { func on(mut mutator, next SectorState) func() (mutator, func(*SectorInfo) (bool, error)) {
return func() (mutator, func(*SectorInfo) error) { return func() (mutator, func(*SectorInfo) (bool, error)) {
return mut, func(state *SectorInfo) error { return mut, func(state *SectorInfo) (bool, error) {
state.State = next state.State = next
return nil return false, nil
} }
} }
} }
func onReturning(mut mutator) func() (mutator, func(*SectorInfo) error) { // like `on`, but doesn't change state
return func() (mutator, func(*SectorInfo) error) { func apply(mut mutator) func() (mutator, func(*SectorInfo) (bool, error)) {
return mut, func(state *SectorInfo) error { return func() (mutator, func(*SectorInfo) (bool, error)) {
return mut, func(state *SectorInfo) (bool, error) {
return true, nil
}
}
}
func onReturning(mut mutator) func() (mutator, func(*SectorInfo) (bool, error)) {
return func() (mutator, func(*SectorInfo) (bool, error)) {
return mut, func(state *SectorInfo) (bool, error) {
if state.Return == "" { if state.Return == "" {
return xerrors.Errorf("return state not set") return false, xerrors.Errorf("return state not set")
} }
state.State = SectorState(state.Return) state.State = SectorState(state.Return)
state.Return = "" state.Return = ""
return nil return false, nil
} }
} }
} }
func planOne(ts ...func() (mut mutator, next func(*SectorInfo) error)) func(events []statemachine.Event, state *SectorInfo) (uint64, error) { func planOne(ts ...func() (mut mutator, next func(*SectorInfo) (more bool, err error))) func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
return func(events []statemachine.Event, state *SectorInfo) (uint64, error) { return func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
if gm, ok := events[0].User.(globalMutator); ok { for i, event := range events {
gm.applyGlobal(state) if gm, ok := event.User.(globalMutator); ok {
return 1, nil gm.applyGlobal(state)
} return uint64(i + 1), nil
}
for _, t := range ts { for _, t := range ts {
mut, next := t() mut, next := t()
if reflect.TypeOf(events[0].User) != reflect.TypeOf(mut) { if reflect.TypeOf(event.User) != reflect.TypeOf(mut) {
continue
}
if err, iserr := event.User.(error); iserr {
log.Warnf("sector %d got error event %T: %+v", state.SectorNumber, event.User, err)
}
event.User.(mutator).apply(state)
more, err := next(state)
if err != nil || !more {
return uint64(i + 1), err
}
}
_, ok := event.User.(Ignorable)
if ok {
continue continue
} }
if err, iserr := events[0].User.(error); iserr { return uint64(i + 1), xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", state.State, event.User, event)
log.Warnf("sector %d got error event %T: %+v", state.SectorNumber, events[0].User, err)
}
events[0].User.(mutator).apply(state)
return 1, next(state)
} }
_, ok := events[0].User.(Ignorable) return uint64(len(events)), nil
if ok {
return 1, nil
}
return 0, xerrors.Errorf("planner for state %s received unexpected event %T (%+v)", state.State, events[0].User, events[0])
} }
} }

View File

@ -1,13 +1,16 @@
package sealing package sealing
import ( import (
"github.com/filecoin-project/lotus/chain/actors/builtin/miner" "time"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
) )
type mutator interface { type mutator interface {
@ -76,14 +79,27 @@ func (evt SectorStartCC) apply(state *SectorInfo) {
state.SectorType = evt.SectorType state.SectorType = evt.SectorType
} }
type SectorAddPiece struct { type SectorAddPiece struct{}
NewPiece Piece
}
func (evt SectorAddPiece) apply(state *SectorInfo) { func (evt SectorAddPiece) apply(state *SectorInfo) {
state.Pieces = append(state.Pieces, evt.NewPiece) if state.CreationTime == 0 {
state.CreationTime = time.Now().Unix()
}
} }
type SectorPieceAdded struct {
NewPieces []Piece
}
func (evt SectorPieceAdded) apply(state *SectorInfo) {
state.Pieces = append(state.Pieces, evt.NewPieces...)
}
type SectorAddPieceFailed struct{ error }
func (evt SectorAddPieceFailed) FormatError(xerrors.Printer) (next error) { return evt.error }
func (evt SectorAddPieceFailed) apply(si *SectorInfo) {}
type SectorStartPacking struct{} type SectorStartPacking struct{}
func (evt SectorStartPacking) apply(*SectorInfo) {} func (evt SectorStartPacking) apply(*SectorInfo) {}

424
extern/storage-sealing/input.go vendored Normal file
View File

@ -0,0 +1,424 @@
package sealing
import (
"context"
"sort"
"time"
"golang.org/x/xerrors"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/specs-storage/storage"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
)
func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) error {
var used abi.UnpaddedPieceSize
for _, piece := range sector.Pieces {
used += piece.Piece.Size.Unpadded()
}
m.inputLk.Lock()
started, err := m.maybeStartSealing(ctx, sector, used)
if err != nil || started {
delete(m.openSectors, m.minerSectorID(sector.SectorNumber))
m.inputLk.Unlock()
return err
}
m.openSectors[m.minerSectorID(sector.SectorNumber)] = &openSector{
used: used,
maybeAccept: func(cid cid.Cid) error {
// todo check deal start deadline (configurable)
sid := m.minerSectorID(sector.SectorNumber)
m.assignedPieces[sid] = append(m.assignedPieces[sid], cid)
return ctx.Send(SectorAddPiece{})
},
}
go func() {
defer m.inputLk.Unlock()
if err := m.updateInput(ctx.Context(), sector.SectorType); err != nil {
log.Errorf("%+v", err)
}
}()
return nil
}
func (m *Sealing) maybeStartSealing(ctx statemachine.Context, sector SectorInfo, used abi.UnpaddedPieceSize) (bool, error) {
now := time.Now()
st := m.sectorTimers[m.minerSectorID(sector.SectorNumber)]
if st != nil {
if !st.Stop() { // timer expired, SectorStartPacking was/is being sent
// we send another SectorStartPacking in case one was sent in the handleAddPiece state
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timeout")
return true, ctx.Send(SectorStartPacking{})
}
}
ssize, err := sector.SectorType.SectorSize()
if err != nil {
return false, xerrors.Errorf("getting sector size")
}
maxDeals, err := getDealPerSectorLimit(ssize)
if err != nil {
return false, xerrors.Errorf("getting per-sector deal limit: %w", err)
}
if len(sector.dealIDs()) >= maxDeals {
// can't accept more deals
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "maxdeals")
return true, ctx.Send(SectorStartPacking{})
}
if used.Padded() == abi.PaddedPieceSize(ssize) {
// sector full
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "filled")
return true, ctx.Send(SectorStartPacking{})
}
if sector.CreationTime != 0 {
cfg, err := m.getConfig()
if err != nil {
m.inputLk.Unlock()
return false, xerrors.Errorf("getting storage config: %w", err)
}
// todo check deal age, start sealing if any deal has less than X (configurable) to start deadline
sealTime := time.Unix(sector.CreationTime, 0).Add(cfg.WaitDealsDelay)
if now.After(sealTime) {
m.inputLk.Unlock()
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timeout")
return true, ctx.Send(SectorStartPacking{})
}
m.sectorTimers[m.minerSectorID(sector.SectorNumber)] = time.AfterFunc(sealTime.Sub(now), func() {
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timer")
if err := ctx.Send(SectorStartPacking{}); err != nil {
log.Errorw("sending SectorStartPacking event failed", "sector", sector.SectorNumber, "error", err)
}
})
}
return false, nil
}
func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) error {
ssize, err := sector.SectorType.SectorSize()
if err != nil {
return err
}
res := SectorPieceAdded{}
m.inputLk.Lock()
pending, ok := m.assignedPieces[m.minerSectorID(sector.SectorNumber)]
if ok {
delete(m.assignedPieces, m.minerSectorID(sector.SectorNumber))
}
m.inputLk.Unlock()
if !ok {
// nothing to do here (might happen after a restart in AddPiece)
return ctx.Send(res)
}
var offset abi.UnpaddedPieceSize
pieceSizes := make([]abi.UnpaddedPieceSize, len(sector.Pieces))
for i, p := range sector.Pieces {
pieceSizes[i] = p.Piece.Size.Unpadded()
offset += p.Piece.Size.Unpadded()
}
maxDeals, err := getDealPerSectorLimit(ssize)
if err != nil {
return xerrors.Errorf("getting per-sector deal limit: %w", err)
}
for i, piece := range pending {
m.inputLk.Lock()
deal, ok := m.pendingPieces[piece]
m.inputLk.Unlock()
if !ok {
return xerrors.Errorf("piece %s assigned to sector %d not found", piece, sector.SectorNumber)
}
if len(sector.dealIDs())+(i+1) > maxDeals {
// todo: this is rather unlikely to happen, but in case it does, return the deal to waiting queue instead of failing it
deal.accepted(sector.SectorNumber, offset, xerrors.Errorf("too many deals assigned to sector %d, dropping deal", sector.SectorNumber))
continue
}
pads, padLength := ffiwrapper.GetRequiredPadding(offset.Padded(), deal.size.Padded())
if offset.Padded()+padLength+deal.size.Padded() > abi.PaddedPieceSize(ssize) {
// todo: this is rather unlikely to happen, but in case it does, return the deal to waiting queue instead of failing it
deal.accepted(sector.SectorNumber, offset, xerrors.Errorf("piece %s assigned to sector %d with not enough space", piece, sector.SectorNumber))
continue
}
offset += padLength.Unpadded()
for _, p := range pads {
ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx.Context(), DealSectorPriority),
m.minerSector(sector.SectorType, sector.SectorNumber),
pieceSizes,
p.Unpadded(),
NewNullReader(p.Unpadded()))
if err != nil {
err = xerrors.Errorf("writing padding piece: %w", err)
deal.accepted(sector.SectorNumber, offset, err)
return ctx.Send(SectorAddPieceFailed{err})
}
pieceSizes = append(pieceSizes, p.Unpadded())
res.NewPieces = append(res.NewPieces, Piece{
Piece: ppi,
})
}
ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx.Context(), DealSectorPriority),
m.minerSector(sector.SectorType, sector.SectorNumber),
pieceSizes,
deal.size,
deal.data)
if err != nil {
err = xerrors.Errorf("writing piece: %w", err)
deal.accepted(sector.SectorNumber, offset, err)
return ctx.Send(SectorAddPieceFailed{err})
}
log.Infow("deal added to a sector", "deal", deal.deal.DealID, "sector", sector.SectorNumber, "piece", ppi.PieceCID)
deal.accepted(sector.SectorNumber, offset, nil)
offset += deal.size
pieceSizes = append(pieceSizes, deal.size)
res.NewPieces = append(res.NewPieces, Piece{
Piece: ppi,
DealInfo: &deal.deal,
})
}
return ctx.Send(res)
}
func (m *Sealing) handleAddPieceFailed(ctx statemachine.Context, sector SectorInfo) error {
log.Errorf("No recovery plan for AddPiece failing")
// todo: cleanup sector / just go retry (requires adding offset param to AddPiece in sector-storage for this to be safe)
return nil
}
func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPieceSize, data storage.Data, deal DealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) {
log.Infof("Adding piece for deal %d (publish msg: %s)", deal.DealID, deal.PublishCid)
if (padreader.PaddedSize(uint64(size))) != size {
return 0, 0, xerrors.Errorf("cannot allocate unpadded piece")
}
sp, err := m.currentSealProof(ctx)
if err != nil {
return 0, 0, xerrors.Errorf("getting current seal proof type: %w", err)
}
ssize, err := sp.SectorSize()
if err != nil {
return 0, 0, err
}
if size > abi.PaddedPieceSize(ssize).Unpadded() {
return 0, 0, xerrors.Errorf("piece cannot fit into a sector")
}
if _, err := deal.DealProposal.Cid(); err != nil {
return 0, 0, xerrors.Errorf("getting proposal CID: %w", err)
}
m.inputLk.Lock()
if _, exist := m.pendingPieces[proposalCID(deal)]; exist {
m.inputLk.Unlock()
return 0, 0, xerrors.Errorf("piece for deal %s already pending", proposalCID(deal))
}
resCh := make(chan struct {
sn abi.SectorNumber
offset abi.UnpaddedPieceSize
err error
}, 1)
m.pendingPieces[proposalCID(deal)] = &pendingPiece{
size: size,
deal: deal,
data: data,
assigned: false,
accepted: func(sn abi.SectorNumber, offset abi.UnpaddedPieceSize, err error) {
resCh <- struct {
sn abi.SectorNumber
offset abi.UnpaddedPieceSize
err error
}{sn: sn, offset: offset, err: err}
},
}
go func() {
defer m.inputLk.Unlock()
if err := m.updateInput(ctx, sp); err != nil {
log.Errorf("%+v", err)
}
}()
res := <-resCh
return res.sn, res.offset.Padded(), res.err
}
// called with m.inputLk
func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) error {
ssize, err := sp.SectorSize()
if err != nil {
return err
}
type match struct {
sector abi.SectorID
deal cid.Cid
size abi.UnpaddedPieceSize
padding abi.UnpaddedPieceSize
}
var matches []match
toAssign := map[cid.Cid]struct{}{} // used to maybe create new sectors
// todo: this is distinctly O(n^2), may need to be optimized for tiny deals and large scale miners
// (unlikely to be a problem now)
for proposalCid, piece := range m.pendingPieces {
if piece.assigned {
continue // already assigned to a sector, skip
}
toAssign[proposalCid] = struct{}{}
for id, sector := range m.openSectors {
avail := abi.PaddedPieceSize(ssize).Unpadded() - sector.used
if piece.size <= avail { // (note: if we have enough space for the piece, we also have enough space for inter-piece padding)
matches = append(matches, match{
sector: id,
deal: proposalCid,
size: piece.size,
padding: avail % piece.size,
})
}
}
}
sort.Slice(matches, func(i, j int) bool {
if matches[i].padding != matches[j].padding { // less padding is better
return matches[i].padding < matches[j].padding
}
if matches[i].size != matches[j].size { // larger pieces are better
return matches[i].size < matches[j].size
}
return matches[i].sector.Number < matches[j].sector.Number // prefer older sectors
})
var assigned int
for _, mt := range matches {
if m.pendingPieces[mt.deal].assigned {
assigned++
continue
}
if _, found := m.openSectors[mt.sector]; !found {
continue
}
err := m.openSectors[mt.sector].maybeAccept(mt.deal)
if err != nil {
m.pendingPieces[mt.deal].accepted(mt.sector.Number, 0, err) // non-error case in handleAddPiece
}
m.pendingPieces[mt.deal].assigned = true
delete(toAssign, mt.deal)
if err != nil {
log.Errorf("sector %d rejected deal %s: %+v", mt.sector, mt.deal, err)
continue
}
delete(m.openSectors, mt.sector)
}
if len(toAssign) > 0 {
if err := m.tryCreateDealSector(ctx, sp); err != nil {
log.Errorw("Failed to create a new sector for deals", "error", err)
}
}
return nil
}
func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSealProof) error {
cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting storage config: %w", err)
}
if cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() >= cfg.MaxSealingSectorsForDeals {
return nil
}
if cfg.MaxWaitDealsSectors > 0 && m.stats.curStaging() >= cfg.MaxWaitDealsSectors {
return nil
}
// Now actually create a new sector
sid, err := m.sc.Next()
if err != nil {
return xerrors.Errorf("getting sector number: %w", err)
}
err = m.sealer.NewSector(ctx, m.minerSector(sp, sid))
if err != nil {
return xerrors.Errorf("initializing sector: %w", err)
}
log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp)
return m.sectors.Send(uint64(sid), SectorStart{
ID: sid,
SectorType: sp,
})
}
func (m *Sealing) StartPacking(sid abi.SectorNumber) error {
return m.sectors.Send(uint64(sid), SectorStartPacking{})
}
func proposalCID(deal DealInfo) cid.Cid {
pc, err := deal.DealProposal.Cid()
if err != nil {
log.Errorf("DealProposal.Cid error: %+v", err)
return cid.Undef
}
return pc
}

View File

@ -3,8 +3,6 @@ package sealing
import ( import (
"context" "context"
"errors" "errors"
"io"
"math"
"sync" "sync"
"time" "time"
@ -15,7 +13,6 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
padreader "github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/crypto"
@ -89,9 +86,13 @@ type Sealing struct {
sectors *statemachine.StateGroup sectors *statemachine.StateGroup
sc SectorIDCounter sc SectorIDCounter
verif ffiwrapper.Verifier verif ffiwrapper.Verifier
pcp PreCommitPolicy
pcp PreCommitPolicy inputLk sync.Mutex
unsealedInfoMap UnsealedSectorMap openSectors map[abi.SectorID]*openSector
sectorTimers map[abi.SectorID]*time.Timer
pendingPieces map[cid.Cid]*pendingPiece
assignedPieces map[abi.SectorID][]cid.Cid
upgradeLk sync.Mutex upgradeLk sync.Mutex
toUpgrade map[abi.SectorNumber]struct{} toUpgrade map[abi.SectorNumber]struct{}
@ -113,17 +114,20 @@ type FeeConfig struct {
MaxTerminateGasFee abi.TokenAmount MaxTerminateGasFee abi.TokenAmount
} }
type UnsealedSectorMap struct { type openSector struct {
infos map[abi.SectorNumber]UnsealedSectorInfo used abi.UnpaddedPieceSize // change to bitfield/rle when AddPiece gains offset support to better fill sectors
lk sync.Mutex
maybeAccept func(cid.Cid) error // called with inputLk
} }
type UnsealedSectorInfo struct { type pendingPiece struct {
numDeals uint64 size abi.UnpaddedPieceSize
// stored should always equal sum of pieceSizes.Padded() deal DealInfo
stored abi.PaddedPieceSize
pieceSizes []abi.UnpaddedPieceSize data storage.Data
ssize abi.SectorSize
assigned bool // assigned to a sector?
accepted func(abi.SectorNumber, abi.UnpaddedPieceSize, error)
} }
func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy, gc GetSealingConfigFunc, notifee SectorStateNotifee, as AddrSel) *Sealing { func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy, gc GetSealingConfigFunc, notifee SectorStateNotifee, as AddrSel) *Sealing {
@ -137,12 +141,12 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds
sc: sc, sc: sc,
verif: verif, verif: verif,
pcp: pcp, pcp: pcp,
unsealedInfoMap: UnsealedSectorMap{
infos: make(map[abi.SectorNumber]UnsealedSectorInfo),
lk: sync.Mutex{},
},
toUpgrade: map[abi.SectorNumber]struct{}{}, openSectors: map[abi.SectorID]*openSector{},
sectorTimers: map[abi.SectorID]*time.Timer{},
pendingPieces: map[cid.Cid]*pendingPiece{},
assignedPieces: map[abi.SectorID][]cid.Cid{},
toUpgrade: map[abi.SectorNumber]struct{}{},
notifee: notifee, notifee: notifee,
addrSel: as, addrSel: as,
@ -159,8 +163,6 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds
s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{}) s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{})
s.unsealedInfoMap.lk.Lock() // released after initialized in .Run()
return s return s
} }
@ -184,104 +186,6 @@ func (m *Sealing) Stop(ctx context.Context) error {
return nil return nil
} }
func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, d DealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) {
log.Infof("Adding piece for deal %d (publish msg: %s)", d.DealID, d.PublishCid)
if (padreader.PaddedSize(uint64(size))) != size {
return 0, 0, xerrors.Errorf("cannot allocate unpadded piece")
}
sp, err := m.currentSealProof(ctx)
if err != nil {
return 0, 0, xerrors.Errorf("getting current seal proof type: %w", err)
}
ssize, err := sp.SectorSize()
if err != nil {
return 0, 0, err
}
if size > abi.PaddedPieceSize(ssize).Unpadded() {
return 0, 0, xerrors.Errorf("piece cannot fit into a sector")
}
m.unsealedInfoMap.lk.Lock()
sid, pads, err := m.getSectorAndPadding(ctx, size)
if err != nil {
m.unsealedInfoMap.lk.Unlock()
return 0, 0, xerrors.Errorf("getting available sector: %w", err)
}
for _, p := range pads {
err = m.addPiece(ctx, sid, p.Unpadded(), NewNullReader(p.Unpadded()), nil)
if err != nil {
m.unsealedInfoMap.lk.Unlock()
return 0, 0, xerrors.Errorf("writing pads: %w", err)
}
}
offset := m.unsealedInfoMap.infos[sid].stored
err = m.addPiece(ctx, sid, size, r, &d)
if err != nil {
m.unsealedInfoMap.lk.Unlock()
return 0, 0, xerrors.Errorf("adding piece to sector: %w", err)
}
startPacking := m.unsealedInfoMap.infos[sid].numDeals >= getDealPerSectorLimit(ssize)
m.unsealedInfoMap.lk.Unlock()
if startPacking {
if err := m.StartPacking(sid); err != nil {
return 0, 0, xerrors.Errorf("start packing: %w", err)
}
}
return sid, offset, nil
}
// Caller should hold m.unsealedInfoMap.lk
func (m *Sealing) addPiece(ctx context.Context, sectorID abi.SectorNumber, size abi.UnpaddedPieceSize, r io.Reader, di *DealInfo) error {
log.Infof("Adding piece to sector %d", sectorID)
sp, err := m.currentSealProof(ctx)
if err != nil {
return xerrors.Errorf("getting current seal proof type: %w", err)
}
ssize, err := sp.SectorSize()
if err != nil {
return err
}
ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx, DealSectorPriority), m.minerSector(sp, sectorID), m.unsealedInfoMap.infos[sectorID].pieceSizes, size, r)
if err != nil {
return xerrors.Errorf("writing piece: %w", err)
}
piece := Piece{
Piece: ppi,
DealInfo: di,
}
err = m.sectors.Send(uint64(sectorID), SectorAddPiece{NewPiece: piece})
if err != nil {
return err
}
ui := m.unsealedInfoMap.infos[sectorID]
num := m.unsealedInfoMap.infos[sectorID].numDeals
if di != nil {
num = num + 1
}
m.unsealedInfoMap.infos[sectorID] = UnsealedSectorInfo{
numDeals: num,
stored: ui.stored + piece.Piece.Size,
pieceSizes: append(ui.pieceSizes, piece.Piece.Size.Unpadded()),
ssize: ssize,
}
return nil
}
func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error { func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error {
return m.sectors.Send(uint64(sid), SectorRemove{}) return m.sectors.Send(uint64(sid), SectorRemove{})
} }
@ -298,168 +202,6 @@ func (m *Sealing) TerminatePending(ctx context.Context) ([]abi.SectorID, error)
return m.terminator.Pending(ctx) return m.terminator.Pending(ctx)
} }
// Caller should NOT hold m.unsealedInfoMap.lk
func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error {
// locking here ensures that when the SectorStartPacking event is sent, the sector won't be picked up anywhere else
m.unsealedInfoMap.lk.Lock()
defer m.unsealedInfoMap.lk.Unlock()
// cannot send SectorStartPacking to sectors that have already been packed, otherwise it will cause the state machine to exit
if _, ok := m.unsealedInfoMap.infos[sectorID]; !ok {
log.Warnf("call start packing, but sector %v not in unsealedInfoMap.infos, maybe have called", sectorID)
return nil
}
log.Infof("Starting packing sector %d", sectorID)
err := m.sectors.Send(uint64(sectorID), SectorStartPacking{})
if err != nil {
return err
}
log.Infof("send Starting packing event success sector %d", sectorID)
delete(m.unsealedInfoMap.infos, sectorID)
return nil
}
// Caller should hold m.unsealedInfoMap.lk
func (m *Sealing) getSectorAndPadding(ctx context.Context, size abi.UnpaddedPieceSize) (abi.SectorNumber, []abi.PaddedPieceSize, error) {
for tries := 0; tries < 100; tries++ {
for k, v := range m.unsealedInfoMap.infos {
pads, padLength := ffiwrapper.GetRequiredPadding(v.stored, size.Padded())
if v.stored+size.Padded()+padLength <= abi.PaddedPieceSize(v.ssize) {
return k, pads, nil
}
}
if len(m.unsealedInfoMap.infos) > 0 {
log.Infow("tried to put a piece into an open sector, found none with enough space", "open", len(m.unsealedInfoMap.infos), "size", size, "tries", tries)
}
ns, ssize, err := m.newDealSector(ctx)
switch err {
case nil:
m.unsealedInfoMap.infos[ns] = UnsealedSectorInfo{
numDeals: 0,
stored: 0,
pieceSizes: nil,
ssize: ssize,
}
case errTooManySealing:
m.unsealedInfoMap.lk.Unlock()
select {
case <-time.After(2 * time.Second):
case <-ctx.Done():
m.unsealedInfoMap.lk.Lock()
return 0, nil, xerrors.Errorf("getting sector for piece: %w", ctx.Err())
}
m.unsealedInfoMap.lk.Lock()
continue
default:
return 0, nil, xerrors.Errorf("creating new sector: %w", err)
}
return ns, nil, nil
}
return 0, nil, xerrors.Errorf("failed to allocate piece to a sector")
}
var errTooManySealing = errors.New("too many sectors sealing")
// newDealSector creates a new sector for deal storage
func (m *Sealing) newDealSector(ctx context.Context) (abi.SectorNumber, abi.SectorSize, error) {
// First make sure we don't have too many 'open' sectors
cfg, err := m.getConfig()
if err != nil {
return 0, 0, xerrors.Errorf("getting config: %w", err)
}
if cfg.MaxSealingSectorsForDeals > 0 {
if m.stats.curSealing() > cfg.MaxSealingSectorsForDeals {
return 0, 0, ErrTooManySectorsSealing
}
}
if cfg.MaxWaitDealsSectors > 0 && uint64(len(m.unsealedInfoMap.infos)) >= cfg.MaxWaitDealsSectors {
// Too many sectors are sealing in parallel. Start sealing one, and retry
// allocating the piece to a sector (we're dropping the lock here, so in
// case other goroutines are also trying to create a sector, we retry in
// getSectorAndPadding instead of here - otherwise if we have lots of
// parallel deals in progress, we can start creating a ton of sectors
// with just a single deal in them)
var mostStored abi.PaddedPieceSize = math.MaxUint64
var best abi.SectorNumber = math.MaxUint64
for sn, info := range m.unsealedInfoMap.infos {
if info.stored+1 > mostStored+1 { // 18446744073709551615 + 1 = 0
best = sn
}
}
if best != math.MaxUint64 {
m.unsealedInfoMap.lk.Unlock()
err := m.StartPacking(best)
m.unsealedInfoMap.lk.Lock()
if err != nil {
log.Errorf("newDealSector StartPacking error: %+v", err)
// let's pretend this is fine
}
}
return 0, 0, errTooManySealing // will wait a bit and retry
}
spt, err := m.currentSealProof(ctx)
if err != nil {
return 0, 0, xerrors.Errorf("getting current seal proof type: %w", err)
}
// Now actually create a new sector
sid, err := m.sc.Next()
if err != nil {
return 0, 0, xerrors.Errorf("getting sector number: %w", err)
}
err = m.sealer.NewSector(context.TODO(), m.minerSector(spt, sid))
if err != nil {
return 0, 0, xerrors.Errorf("initializing sector: %w", err)
}
log.Infof("Creating sector %d", sid)
err = m.sectors.Send(uint64(sid), SectorStart{
ID: sid,
SectorType: spt,
})
if err != nil {
return 0, 0, xerrors.Errorf("starting the sector fsm: %w", err)
}
cf, err := m.getConfig()
if err != nil {
return 0, 0, xerrors.Errorf("getting the sealing delay: %w", err)
}
if cf.WaitDealsDelay > 0 {
timer := time.NewTimer(cf.WaitDealsDelay)
go func() {
<-timer.C
if err := m.StartPacking(sid); err != nil {
log.Errorf("starting sector %d: %+v", sid, err)
}
}()
}
ssize, err := spt.SectorSize()
return sid, ssize, err
}
// newSectorCC accepts a slice of pieces with no deal (junk data) // newSectorCC accepts a slice of pieces with no deal (junk data)
func (m *Sealing) newSectorCC(ctx context.Context, sid abi.SectorNumber, pieces []Piece) error { func (m *Sealing) newSectorCC(ctx context.Context, sid abi.SectorNumber, pieces []Piece) error {
spt, err := m.currentSealProof(ctx) spt, err := m.currentSealProof(ctx)
@ -512,9 +254,9 @@ func (m *Sealing) Address() address.Address {
return m.maddr return m.maddr
} }
func getDealPerSectorLimit(size abi.SectorSize) uint64 { func getDealPerSectorLimit(size abi.SectorSize) (int, error) {
if size < 64<<30 { if size < 64<<30 {
return 256 return 256, nil
} }
return 512 return 512, nil
} }

View File

@ -6,6 +6,8 @@ var ExistSectorStateList = map[SectorState]struct{}{
Empty: {}, Empty: {},
WaitDeals: {}, WaitDeals: {},
Packing: {}, Packing: {},
AddPiece: {},
AddPieceFailed: {},
GetTicket: {}, GetTicket: {},
PreCommit1: {}, PreCommit1: {},
PreCommit2: {}, PreCommit2: {},
@ -43,8 +45,9 @@ const (
UndefinedSectorState SectorState = "" UndefinedSectorState SectorState = ""
// happy path // happy path
Empty SectorState = "Empty" Empty SectorState = "Empty" // deprecated
WaitDeals SectorState = "WaitDeals" // waiting for more pieces (deals) to be added to the sector WaitDeals SectorState = "WaitDeals" // waiting for more pieces (deals) to be added to the sector
AddPiece SectorState = "AddPiece" // put deal data (and padding if required) into the sector
Packing SectorState = "Packing" // sector not in sealStore, and not on chain Packing SectorState = "Packing" // sector not in sealStore, and not on chain
GetTicket SectorState = "GetTicket" // generate ticket GetTicket SectorState = "GetTicket" // generate ticket
PreCommit1 SectorState = "PreCommit1" // do PreCommit1 PreCommit1 SectorState = "PreCommit1" // do PreCommit1
@ -59,6 +62,7 @@ const (
Proving SectorState = "Proving" Proving SectorState = "Proving"
// error modes // error modes
FailedUnrecoverable SectorState = "FailedUnrecoverable" FailedUnrecoverable SectorState = "FailedUnrecoverable"
AddPieceFailed SectorState = "AddPieceFailed"
SealPreCommit1Failed SectorState = "SealPreCommit1Failed" SealPreCommit1Failed SectorState = "SealPreCommit1Failed"
SealPreCommit2Failed SectorState = "SealPreCommit2Failed" SealPreCommit2Failed SectorState = "SealPreCommit2Failed"
PreCommitFailed SectorState = "PreCommitFailed" PreCommitFailed SectorState = "PreCommitFailed"
@ -85,7 +89,9 @@ const (
func toStatState(st SectorState) statSectorState { func toStatState(st SectorState) statSectorState {
switch st { switch st {
case Empty, WaitDeals, Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, SubmitCommit, CommitWait, FinalizeSector: case UndefinedSectorState, Empty, WaitDeals, AddPiece:
return sstStaging
case Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, SubmitCommit, CommitWait, FinalizeSector:
return sstSealing return sstSealing
case Proving, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed: case Proving, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed:
return sstProving return sstProving

View File

@ -25,6 +25,24 @@ var DealSectorPriority = 1024
var MaxTicketAge = abi.ChainEpoch(builtin0.EpochsInDay * 2) var MaxTicketAge = abi.ChainEpoch(builtin0.EpochsInDay * 2)
func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) error { func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) error {
m.inputLk.Lock()
// make sure we not accepting deals into this sector
for _, c := range m.assignedPieces[m.minerSectorID(sector.SectorNumber)] {
pp := m.pendingPieces[c]
delete(m.pendingPieces, c)
if pp == nil {
log.Errorf("nil assigned pending piece %s", c)
continue
}
// todo: return to the sealing queue (this is extremely unlikely to happen)
pp.accepted(sector.SectorNumber, 0, xerrors.Errorf("sector entered packing state early"))
}
delete(m.openSectors, m.minerSectorID(sector.SectorNumber))
delete(m.assignedPieces, m.minerSectorID(sector.SectorNumber))
m.inputLk.Unlock()
log.Infow("performing filling up rest of the sector...", "sector", sector.SectorNumber) log.Infow("performing filling up rest of the sector...", "sector", sector.SectorNumber)
var allocated abi.UnpaddedPieceSize var allocated abi.UnpaddedPieceSize

View File

@ -9,7 +9,8 @@ import (
type statSectorState int type statSectorState int
const ( const (
sstSealing statSectorState = iota sstStaging statSectorState = iota
sstSealing
sstFailed sstFailed
sstProving sstProving
nsst nsst
@ -41,5 +42,13 @@ func (ss *SectorStats) curSealing() uint64 {
ss.lk.Lock() ss.lk.Lock()
defer ss.lk.Unlock() defer ss.lk.Unlock()
return ss.totals[sstSealing] + ss.totals[sstFailed] return ss.totals[sstStaging] + ss.totals[sstSealing] + ss.totals[sstFailed]
}
// return the number of sectors waiting to enter the sealing pipeline
func (ss *SectorStats) curStaging() uint64 {
ss.lk.Lock()
defer ss.lk.Unlock()
return ss.totals[sstStaging]
} }

View File

@ -72,7 +72,8 @@ type SectorInfo struct {
SectorType abi.RegisteredSealProof SectorType abi.RegisteredSealProof
// Packing // Packing
Pieces []Piece CreationTime int64 // unix seconds
Pieces []Piece
// PreCommit1 // PreCommit1
TicketValue abi.SealRandomness TicketValue abi.SealRandomness

View File

@ -72,6 +72,11 @@ type SealingConfig struct {
WaitDealsDelay Duration WaitDealsDelay Duration
AlwaysKeepUnsealedCopy bool AlwaysKeepUnsealedCopy bool
// Keep this many sectors in sealing pipeline, start CC if needed
// todo TargetSealingSectors uint64
// todo TargetSectors - stop auto-pleding new sectors after this many sectors are sealed, default CC upgrade for deals sectors if above
} }
type MinerFeeConfig struct { type MinerFeeConfig struct {