storagefsm: Rewrite input handling
This commit is contained in:
parent
e77df95157
commit
239d6f8f4d
@ -284,6 +284,7 @@ var stateList = []stateMeta{
|
|||||||
|
|
||||||
{col: color.FgBlue, state: sealing.Empty},
|
{col: color.FgBlue, state: sealing.Empty},
|
||||||
{col: color.FgBlue, state: sealing.WaitDeals},
|
{col: color.FgBlue, state: sealing.WaitDeals},
|
||||||
|
{col: color.FgBlue, state: sealing.AddPiece},
|
||||||
|
|
||||||
{col: color.FgRed, state: sealing.UndefinedSectorState},
|
{col: color.FgRed, state: sealing.UndefinedSectorState},
|
||||||
{col: color.FgYellow, state: sealing.Packing},
|
{col: color.FgYellow, state: sealing.Packing},
|
||||||
|
83
extern/storage-sealing/fsm.go
vendored
83
extern/storage-sealing/fsm.go
vendored
@ -37,14 +37,20 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
|
|||||||
// Sealing
|
// Sealing
|
||||||
|
|
||||||
UndefinedSectorState: planOne(
|
UndefinedSectorState: planOne(
|
||||||
on(SectorStart{}, Empty),
|
on(SectorStart{}, WaitDeals),
|
||||||
on(SectorStartCC{}, Packing),
|
on(SectorStartCC{}, Packing),
|
||||||
),
|
),
|
||||||
Empty: planOne(on(SectorAddPiece{}, WaitDeals)),
|
Empty: planOne( // deprecated
|
||||||
WaitDeals: planOne(
|
on(SectorAddPiece{}, AddPiece),
|
||||||
on(SectorAddPiece{}, WaitDeals),
|
|
||||||
on(SectorStartPacking{}, Packing),
|
on(SectorStartPacking{}, Packing),
|
||||||
),
|
),
|
||||||
|
WaitDeals: planOne(
|
||||||
|
on(SectorAddPiece{}, AddPiece),
|
||||||
|
on(SectorStartPacking{}, Packing),
|
||||||
|
),
|
||||||
|
AddPiece: planOne(
|
||||||
|
on(SectorPieceAdded{}, WaitDeals),
|
||||||
|
),
|
||||||
Packing: planOne(on(SectorPacked{}, GetTicket)),
|
Packing: planOne(on(SectorPacked{}, GetTicket)),
|
||||||
GetTicket: planOne(
|
GetTicket: planOne(
|
||||||
on(SectorTicket{}, PreCommit1),
|
on(SectorTicket{}, PreCommit1),
|
||||||
@ -238,12 +244,11 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
|
|||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
||||||
* Empty <- incoming deals
|
UndefinedSectorState (start)
|
||||||
| |
|
v |
|
||||||
| v
|
*<- WaitDeals <-> AddPiece |
|
||||||
*<- WaitDeals <- incoming deals
|
| | /--------------------/
|
||||||
| |
|
| v v
|
||||||
| v
|
|
||||||
*<- Packing <- incoming committed capacity
|
*<- Packing <- incoming committed capacity
|
||||||
| |
|
| |
|
||||||
| v
|
| v
|
||||||
@ -282,10 +287,6 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
|
|||||||
v
|
v
|
||||||
FailedUnrecoverable
|
FailedUnrecoverable
|
||||||
|
|
||||||
UndefinedSectorState <- ¯\_(ツ)_/¯
|
|
||||||
| ^
|
|
||||||
*---------------------/
|
|
||||||
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
m.stats.updateSector(m.minerSectorID(state.SectorNumber), state.State)
|
m.stats.updateSector(m.minerSectorID(state.SectorNumber), state.State)
|
||||||
@ -295,7 +296,9 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
|
|||||||
case Empty:
|
case Empty:
|
||||||
fallthrough
|
fallthrough
|
||||||
case WaitDeals:
|
case WaitDeals:
|
||||||
log.Infof("Waiting for deals %d", state.SectorNumber)
|
return m.handleWaitDeals, processed, nil
|
||||||
|
case AddPiece:
|
||||||
|
return m.handleAddPiece, processed, nil
|
||||||
case Packing:
|
case Packing:
|
||||||
return m.handlePacking, processed, nil
|
return m.handlePacking, processed, nil
|
||||||
case GetTicket:
|
case GetTicket:
|
||||||
@ -418,60 +421,10 @@ func (m *Sealing) restartSectors(ctx context.Context) error {
|
|||||||
log.Errorf("loading sector list: %+v", err)
|
log.Errorf("loading sector list: %+v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cfg, err := m.getConfig()
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("getting the sealing delay: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
spt, err := m.currentSealProof(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("getting current seal proof: %w", err)
|
|
||||||
}
|
|
||||||
ssize, err := spt.SectorSize()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// m.unsealedInfoMap.lk.Lock() taken early in .New to prevent races
|
|
||||||
defer m.unsealedInfoMap.lk.Unlock()
|
|
||||||
|
|
||||||
for _, sector := range trackedSectors {
|
for _, sector := range trackedSectors {
|
||||||
if err := m.sectors.Send(uint64(sector.SectorNumber), SectorRestart{}); err != nil {
|
if err := m.sectors.Send(uint64(sector.SectorNumber), SectorRestart{}); err != nil {
|
||||||
log.Errorf("restarting sector %d: %+v", sector.SectorNumber, err)
|
log.Errorf("restarting sector %d: %+v", sector.SectorNumber, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if sector.State == WaitDeals {
|
|
||||||
|
|
||||||
// put the sector in the unsealedInfoMap
|
|
||||||
if _, ok := m.unsealedInfoMap.infos[sector.SectorNumber]; ok {
|
|
||||||
// something's funky here, but probably safe to move on
|
|
||||||
log.Warnf("sector %v was already in the unsealedInfoMap when restarting", sector.SectorNumber)
|
|
||||||
} else {
|
|
||||||
ui := UnsealedSectorInfo{
|
|
||||||
ssize: ssize,
|
|
||||||
}
|
|
||||||
for _, p := range sector.Pieces {
|
|
||||||
if p.DealInfo != nil {
|
|
||||||
ui.numDeals++
|
|
||||||
}
|
|
||||||
ui.stored += p.Piece.Size
|
|
||||||
ui.pieceSizes = append(ui.pieceSizes, p.Piece.Size.Unpadded())
|
|
||||||
}
|
|
||||||
|
|
||||||
m.unsealedInfoMap.infos[sector.SectorNumber] = ui
|
|
||||||
}
|
|
||||||
|
|
||||||
// start a fresh timer for the sector
|
|
||||||
if cfg.WaitDealsDelay > 0 {
|
|
||||||
timer := time.NewTimer(cfg.WaitDealsDelay)
|
|
||||||
go func() {
|
|
||||||
<-timer.C
|
|
||||||
if err := m.StartPacking(sector.SectorNumber); err != nil {
|
|
||||||
log.Errorf("starting sector %d: %+v", sector.SectorNumber, err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Grab on-chain sector set and diff with trackedSectors
|
// TODO: Grab on-chain sector set and diff with trackedSectors
|
||||||
|
13
extern/storage-sealing/fsm_events.go
vendored
13
extern/storage-sealing/fsm_events.go
vendored
@ -77,11 +77,20 @@ func (evt SectorStartCC) apply(state *SectorInfo) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type SectorAddPiece struct {
|
type SectorAddPiece struct {
|
||||||
NewPiece Piece
|
NewPiece cid.Cid
|
||||||
}
|
}
|
||||||
|
|
||||||
func (evt SectorAddPiece) apply(state *SectorInfo) {
|
func (evt SectorAddPiece) apply(state *SectorInfo) {
|
||||||
state.Pieces = append(state.Pieces, evt.NewPiece)
|
state.PendingPieces = append(state.PendingPieces, evt.NewPiece)
|
||||||
|
}
|
||||||
|
|
||||||
|
type SectorPieceAdded struct {
|
||||||
|
NewPieces []Piece
|
||||||
|
}
|
||||||
|
|
||||||
|
func (evt SectorPieceAdded) apply(state *SectorInfo) {
|
||||||
|
state.Pieces = append(state.Pieces, evt.NewPieces...)
|
||||||
|
state.PendingPieces = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type SectorStartPacking struct{}
|
type SectorStartPacking struct{}
|
||||||
|
308
extern/storage-sealing/input.go
vendored
Normal file
308
extern/storage-sealing/input.go
vendored
Normal file
@ -0,0 +1,308 @@
|
|||||||
|
package sealing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sort"
|
||||||
|
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-padreader"
|
||||||
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
"github.com/filecoin-project/go-statemachine"
|
||||||
|
"github.com/filecoin-project/specs-storage/storage"
|
||||||
|
|
||||||
|
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
|
||||||
|
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
// if full / oldish / has oldish deals goto seal
|
||||||
|
// ^ also per sector deal limit
|
||||||
|
|
||||||
|
// send SectorStartPacking
|
||||||
|
|
||||||
|
m.inputLk.Lock()
|
||||||
|
var used abi.UnpaddedPieceSize
|
||||||
|
for _, piece := range sector.Pieces {
|
||||||
|
used += piece.Piece.Size.Unpadded()
|
||||||
|
}
|
||||||
|
|
||||||
|
m.openSectors[m.minerSectorID(sector.SectorNumber)] = &openSector{
|
||||||
|
used: used,
|
||||||
|
maybeAccept: func(cid cid.Cid) error {
|
||||||
|
// todo check space
|
||||||
|
|
||||||
|
// todo check deal expiration
|
||||||
|
|
||||||
|
return ctx.Send(SectorAddPiece{cid})
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer m.inputLk.Unlock()
|
||||||
|
if err := m.updateInput(ctx.Context(), sector.SectorType); err != nil {
|
||||||
|
log.Errorf("%+v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
ssize, err := sector.SectorType.SectorSize()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
m.inputLk.Lock()
|
||||||
|
delete(m.openSectors, m.minerSectorID(sector.SectorNumber)) // todo: do this when handling the event
|
||||||
|
m.inputLk.Unlock()
|
||||||
|
|
||||||
|
res := SectorPieceAdded{}
|
||||||
|
|
||||||
|
for _, piece := range sector.PendingPieces {
|
||||||
|
m.inputLk.Lock()
|
||||||
|
deal, ok := m.pendingPieces[piece]
|
||||||
|
m.inputLk.Unlock()
|
||||||
|
if !ok {
|
||||||
|
// todo: this probably means that the miner process was restarted in the middle of adding pieces.
|
||||||
|
// Truncate whatever was in process of being added to the sector (keep sector.Pieces as those are cleanly added, then go to WaitDeals)
|
||||||
|
return xerrors.Errorf("piece %s assigned to sector %d not found", piece, sector.SectorNumber)
|
||||||
|
}
|
||||||
|
|
||||||
|
var stored abi.PaddedPieceSize
|
||||||
|
for _, piece := range sector.Pieces {
|
||||||
|
stored += piece.Piece.Size
|
||||||
|
}
|
||||||
|
|
||||||
|
pads, padLength := ffiwrapper.GetRequiredPadding(stored, deal.size.Padded())
|
||||||
|
|
||||||
|
if stored+padLength+deal.size.Padded() > abi.PaddedPieceSize(ssize) {
|
||||||
|
return xerrors.Errorf("piece assigned to a sector with not enough space")
|
||||||
|
}
|
||||||
|
|
||||||
|
offset := padLength
|
||||||
|
pieceSizes := make([]abi.UnpaddedPieceSize, len(sector.Pieces))
|
||||||
|
for i, p := range sector.Pieces {
|
||||||
|
pieceSizes[i] = p.Piece.Size.Unpadded()
|
||||||
|
offset += p.Piece.Size
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, p := range pads {
|
||||||
|
ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx.Context(), DealSectorPriority),
|
||||||
|
m.minerSector(sector.SectorType, sector.SectorNumber),
|
||||||
|
pieceSizes,
|
||||||
|
p.Unpadded(),
|
||||||
|
NewNullReader(p.Unpadded()))
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("writing padding piece: %w", err) // todo failed state
|
||||||
|
}
|
||||||
|
|
||||||
|
pieceSizes = append(pieceSizes, p.Unpadded())
|
||||||
|
res.NewPieces = append(res.NewPieces, Piece{
|
||||||
|
Piece: ppi,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx.Context(), DealSectorPriority),
|
||||||
|
m.minerSector(sector.SectorType, sector.SectorNumber),
|
||||||
|
pieceSizes,
|
||||||
|
deal.size,
|
||||||
|
deal.data)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("writing padding piece: %w", err) // todo failed state
|
||||||
|
}
|
||||||
|
|
||||||
|
pieceSizes = append(pieceSizes, deal.size)
|
||||||
|
res.NewPieces = append(res.NewPieces, Piece{
|
||||||
|
Piece: ppi,
|
||||||
|
DealInfo: &deal.deal,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx.Send(res)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPieceSize, data storage.Data, deal DealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) {
|
||||||
|
log.Infof("Adding piece for deal %d (publish msg: %s)", deal.DealID, deal.PublishCid)
|
||||||
|
if (padreader.PaddedSize(uint64(size))) != size {
|
||||||
|
return 0, 0, xerrors.Errorf("cannot allocate unpadded piece")
|
||||||
|
}
|
||||||
|
|
||||||
|
sp, err := m.currentSealProof(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, xerrors.Errorf("getting current seal proof type: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ssize, err := sp.SectorSize()
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if size > abi.PaddedPieceSize(ssize).Unpadded() {
|
||||||
|
return 0, 0, xerrors.Errorf("piece cannot fit into a sector")
|
||||||
|
}
|
||||||
|
|
||||||
|
if deal.PublishCid == nil {
|
||||||
|
return 0, 0, xerrors.Errorf("piece must have a PublishCID")
|
||||||
|
}
|
||||||
|
|
||||||
|
m.inputLk.Lock()
|
||||||
|
if _, exist := m.pendingPieces[*deal.PublishCid]; exist {
|
||||||
|
m.inputLk.Unlock()
|
||||||
|
return 0, 0, xerrors.Errorf("piece for deal %s already pending", *deal.PublishCid)
|
||||||
|
}
|
||||||
|
|
||||||
|
resCh := make(chan struct{sn abi.SectorNumber; offset abi.UnpaddedPieceSize; err error}, 1)
|
||||||
|
|
||||||
|
m.pendingPieces[*deal.PublishCid] = &pendingPiece{
|
||||||
|
size: size,
|
||||||
|
deal: deal,
|
||||||
|
data: data,
|
||||||
|
assigned: false,
|
||||||
|
accepted: func(sn abi.SectorNumber, offset abi.UnpaddedPieceSize, err error) {
|
||||||
|
resCh <- struct {
|
||||||
|
sn abi.SectorNumber
|
||||||
|
offset abi.UnpaddedPieceSize
|
||||||
|
err error
|
||||||
|
}{sn: sn, offset: offset, err: err}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer m.inputLk.Unlock()
|
||||||
|
if err := m.updateInput(ctx, sp); err != nil {
|
||||||
|
log.Errorf("%+v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
res := <-resCh
|
||||||
|
|
||||||
|
return res.sn, res.offset.Padded(), res.err
|
||||||
|
}
|
||||||
|
|
||||||
|
// called with m.inputLk
|
||||||
|
func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) error {
|
||||||
|
ssize, err := sp.SectorSize()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
type match struct {
|
||||||
|
sector abi.SectorID
|
||||||
|
deal cid.Cid
|
||||||
|
|
||||||
|
size abi.UnpaddedPieceSize
|
||||||
|
padding abi.UnpaddedPieceSize
|
||||||
|
}
|
||||||
|
|
||||||
|
var matches []match
|
||||||
|
toAssign := map[cid.Cid]struct{}{} // used to maybe create new sectors
|
||||||
|
|
||||||
|
// todo: this is distinctly O(n^2), may need to be optimized for tiny deals and large scale miners
|
||||||
|
// (unlikely to be a problem now)
|
||||||
|
for id, sector := range m.openSectors {
|
||||||
|
avail := abi.PaddedPieceSize(ssize).Unpadded() - sector.used
|
||||||
|
|
||||||
|
for pieceCid, piece := range m.pendingPieces {
|
||||||
|
if piece.assigned {
|
||||||
|
continue // already assigned to a sector, skip
|
||||||
|
}
|
||||||
|
|
||||||
|
toAssign[pieceCid] = struct{}{}
|
||||||
|
|
||||||
|
if piece.size <= avail { // (note: if we have enough space for the piece, we also have enough space for inter-piece padding)
|
||||||
|
matches = append(matches, match{
|
||||||
|
sector: id,
|
||||||
|
deal: pieceCid,
|
||||||
|
|
||||||
|
size: piece.size,
|
||||||
|
padding: avail % piece.size,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Slice(matches, func(i, j int) bool {
|
||||||
|
if matches[i].padding != matches[j].padding { // less padding is better
|
||||||
|
return matches[i].padding < matches[j].padding
|
||||||
|
}
|
||||||
|
|
||||||
|
if matches[i].size != matches[j].size { // larger pieces are better
|
||||||
|
return matches[i].size < matches[j].size
|
||||||
|
}
|
||||||
|
|
||||||
|
return matches[i].sector.Number < matches[j].sector.Number // prefer older sectors
|
||||||
|
})
|
||||||
|
|
||||||
|
var assigned int
|
||||||
|
for _, mt := range matches {
|
||||||
|
if m.pendingPieces[mt.deal].assigned {
|
||||||
|
assigned++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, found := m.openSectors[mt.sector]; !found {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
err := m.openSectors[mt.sector].maybeAccept(mt.deal)
|
||||||
|
if err != nil {
|
||||||
|
m.pendingPieces[mt.deal].accepted(mt.sector.Number, 0, err) // non-error case in handleAddPiece
|
||||||
|
}
|
||||||
|
|
||||||
|
m.pendingPieces[mt.deal].assigned = true
|
||||||
|
delete(toAssign, mt.deal)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("sector %d rejected deal %s: %+v", mt.sector, mt.deal, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(m.openSectors, mt.sector)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(toAssign) > 0 {
|
||||||
|
m.tryCreateDealSector(ctx, sp)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSealProof) error {
|
||||||
|
cfg, err := m.getConfig()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("getting storage config: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if cfg.MaxSealingSectorsForDeals > 0 {
|
||||||
|
if m.stats.curSealing() > cfg.MaxSealingSectorsForDeals {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if m.stats.curStaging() > cfg.MaxWaitDealsSectors {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now actually create a new sector
|
||||||
|
|
||||||
|
sid, err := m.sc.Next()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("getting sector number: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = m.sealer.NewSector(ctx, m.minerSector(sp, sid))
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("initializing sector: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp)
|
||||||
|
return m.sectors.Send(uint64(sid), SectorStart{
|
||||||
|
ID: sid,
|
||||||
|
SectorType: sp,
|
||||||
|
})
|
||||||
|
}
|
2
extern/storage-sealing/sealiface/config.go
vendored
2
extern/storage-sealing/sealiface/config.go
vendored
@ -15,4 +15,6 @@ type Config struct {
|
|||||||
MaxSealingSectorsForDeals uint64
|
MaxSealingSectorsForDeals uint64
|
||||||
|
|
||||||
WaitDealsDelay time.Duration
|
WaitDealsDelay time.Duration
|
||||||
|
|
||||||
|
TargetWaitDealsSectors uint64
|
||||||
}
|
}
|
||||||
|
301
extern/storage-sealing/sealing.go
vendored
301
extern/storage-sealing/sealing.go
vendored
@ -3,10 +3,7 @@ package sealing
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"io"
|
|
||||||
"math"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
@ -15,7 +12,6 @@ import (
|
|||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
padreader "github.com/filecoin-project/go-padreader"
|
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
"github.com/filecoin-project/go-state-types/big"
|
"github.com/filecoin-project/go-state-types/big"
|
||||||
"github.com/filecoin-project/go-state-types/crypto"
|
"github.com/filecoin-project/go-state-types/crypto"
|
||||||
@ -85,9 +81,11 @@ type Sealing struct {
|
|||||||
sectors *statemachine.StateGroup
|
sectors *statemachine.StateGroup
|
||||||
sc SectorIDCounter
|
sc SectorIDCounter
|
||||||
verif ffiwrapper.Verifier
|
verif ffiwrapper.Verifier
|
||||||
|
pcp PreCommitPolicy
|
||||||
|
|
||||||
pcp PreCommitPolicy
|
inputLk sync.Mutex
|
||||||
unsealedInfoMap UnsealedSectorMap
|
openSectors map[abi.SectorID]*openSector
|
||||||
|
pendingPieces map[cid.Cid]*pendingPiece
|
||||||
|
|
||||||
upgradeLk sync.Mutex
|
upgradeLk sync.Mutex
|
||||||
toUpgrade map[abi.SectorNumber]struct{}
|
toUpgrade map[abi.SectorNumber]struct{}
|
||||||
@ -108,17 +106,20 @@ type FeeConfig struct {
|
|||||||
MaxTerminateGasFee abi.TokenAmount
|
MaxTerminateGasFee abi.TokenAmount
|
||||||
}
|
}
|
||||||
|
|
||||||
type UnsealedSectorMap struct {
|
type openSector struct {
|
||||||
infos map[abi.SectorNumber]UnsealedSectorInfo
|
used abi.UnpaddedPieceSize // change to bitfield/rle when AddPiece gains offset support to better fill sectors
|
||||||
lk sync.Mutex
|
|
||||||
|
maybeAccept func(cid.Cid) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type UnsealedSectorInfo struct {
|
type pendingPiece struct {
|
||||||
numDeals uint64
|
size abi.UnpaddedPieceSize
|
||||||
// stored should always equal sum of pieceSizes.Padded()
|
deal DealInfo
|
||||||
stored abi.PaddedPieceSize
|
|
||||||
pieceSizes []abi.UnpaddedPieceSize
|
data storage.Data
|
||||||
ssize abi.SectorSize
|
|
||||||
|
assigned bool // assigned to a sector?
|
||||||
|
accepted func(abi.SectorNumber, abi.UnpaddedPieceSize, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy, gc GetSealingConfigFunc, notifee SectorStateNotifee, as AddrSel) *Sealing {
|
func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy, gc GetSealingConfigFunc, notifee SectorStateNotifee, as AddrSel) *Sealing {
|
||||||
@ -132,12 +133,10 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds
|
|||||||
sc: sc,
|
sc: sc,
|
||||||
verif: verif,
|
verif: verif,
|
||||||
pcp: pcp,
|
pcp: pcp,
|
||||||
unsealedInfoMap: UnsealedSectorMap{
|
|
||||||
infos: make(map[abi.SectorNumber]UnsealedSectorInfo),
|
|
||||||
lk: sync.Mutex{},
|
|
||||||
},
|
|
||||||
|
|
||||||
toUpgrade: map[abi.SectorNumber]struct{}{},
|
openSectors: map[abi.SectorID]*openSector{},
|
||||||
|
pendingPieces: map[cid.Cid]*pendingPiece{},
|
||||||
|
toUpgrade: map[abi.SectorNumber]struct{}{},
|
||||||
|
|
||||||
notifee: notifee,
|
notifee: notifee,
|
||||||
addrSel: as,
|
addrSel: as,
|
||||||
@ -153,8 +152,6 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds
|
|||||||
|
|
||||||
s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{})
|
s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{})
|
||||||
|
|
||||||
s.unsealedInfoMap.lk.Lock() // released after initialized in .Run()
|
|
||||||
|
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -178,104 +175,6 @@ func (m *Sealing) Stop(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, d DealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) {
|
|
||||||
log.Infof("Adding piece for deal %d (publish msg: %s)", d.DealID, d.PublishCid)
|
|
||||||
if (padreader.PaddedSize(uint64(size))) != size {
|
|
||||||
return 0, 0, xerrors.Errorf("cannot allocate unpadded piece")
|
|
||||||
}
|
|
||||||
|
|
||||||
sp, err := m.currentSealProof(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return 0, 0, xerrors.Errorf("getting current seal proof type: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
ssize, err := sp.SectorSize()
|
|
||||||
if err != nil {
|
|
||||||
return 0, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if size > abi.PaddedPieceSize(ssize).Unpadded() {
|
|
||||||
return 0, 0, xerrors.Errorf("piece cannot fit into a sector")
|
|
||||||
}
|
|
||||||
|
|
||||||
m.unsealedInfoMap.lk.Lock()
|
|
||||||
|
|
||||||
sid, pads, err := m.getSectorAndPadding(ctx, size)
|
|
||||||
if err != nil {
|
|
||||||
m.unsealedInfoMap.lk.Unlock()
|
|
||||||
return 0, 0, xerrors.Errorf("getting available sector: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, p := range pads {
|
|
||||||
err = m.addPiece(ctx, sid, p.Unpadded(), NewNullReader(p.Unpadded()), nil)
|
|
||||||
if err != nil {
|
|
||||||
m.unsealedInfoMap.lk.Unlock()
|
|
||||||
return 0, 0, xerrors.Errorf("writing pads: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
offset := m.unsealedInfoMap.infos[sid].stored
|
|
||||||
err = m.addPiece(ctx, sid, size, r, &d)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
m.unsealedInfoMap.lk.Unlock()
|
|
||||||
return 0, 0, xerrors.Errorf("adding piece to sector: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
startPacking := m.unsealedInfoMap.infos[sid].numDeals >= getDealPerSectorLimit(ssize)
|
|
||||||
|
|
||||||
m.unsealedInfoMap.lk.Unlock()
|
|
||||||
|
|
||||||
if startPacking {
|
|
||||||
if err := m.StartPacking(sid); err != nil {
|
|
||||||
return 0, 0, xerrors.Errorf("start packing: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return sid, offset, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Caller should hold m.unsealedInfoMap.lk
|
|
||||||
func (m *Sealing) addPiece(ctx context.Context, sectorID abi.SectorNumber, size abi.UnpaddedPieceSize, r io.Reader, di *DealInfo) error {
|
|
||||||
log.Infof("Adding piece to sector %d", sectorID)
|
|
||||||
sp, err := m.currentSealProof(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("getting current seal proof type: %w", err)
|
|
||||||
}
|
|
||||||
ssize, err := sp.SectorSize()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx, DealSectorPriority), m.minerSector(sp, sectorID), m.unsealedInfoMap.infos[sectorID].pieceSizes, size, r)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("writing piece: %w", err)
|
|
||||||
}
|
|
||||||
piece := Piece{
|
|
||||||
Piece: ppi,
|
|
||||||
DealInfo: di,
|
|
||||||
}
|
|
||||||
|
|
||||||
err = m.sectors.Send(uint64(sectorID), SectorAddPiece{NewPiece: piece})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
ui := m.unsealedInfoMap.infos[sectorID]
|
|
||||||
num := m.unsealedInfoMap.infos[sectorID].numDeals
|
|
||||||
if di != nil {
|
|
||||||
num = num + 1
|
|
||||||
}
|
|
||||||
m.unsealedInfoMap.infos[sectorID] = UnsealedSectorInfo{
|
|
||||||
numDeals: num,
|
|
||||||
stored: ui.stored + piece.Piece.Size,
|
|
||||||
pieceSizes: append(ui.pieceSizes, piece.Piece.Size.Unpadded()),
|
|
||||||
ssize: ssize,
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error {
|
func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error {
|
||||||
return m.sectors.Send(uint64(sid), SectorRemove{})
|
return m.sectors.Send(uint64(sid), SectorRemove{})
|
||||||
}
|
}
|
||||||
@ -292,168 +191,6 @@ func (m *Sealing) TerminatePending(ctx context.Context) ([]abi.SectorID, error)
|
|||||||
return m.terminator.Pending(ctx)
|
return m.terminator.Pending(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Caller should NOT hold m.unsealedInfoMap.lk
|
|
||||||
func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error {
|
|
||||||
// locking here ensures that when the SectorStartPacking event is sent, the sector won't be picked up anywhere else
|
|
||||||
m.unsealedInfoMap.lk.Lock()
|
|
||||||
defer m.unsealedInfoMap.lk.Unlock()
|
|
||||||
|
|
||||||
// cannot send SectorStartPacking to sectors that have already been packed, otherwise it will cause the state machine to exit
|
|
||||||
if _, ok := m.unsealedInfoMap.infos[sectorID]; !ok {
|
|
||||||
log.Warnf("call start packing, but sector %v not in unsealedInfoMap.infos, maybe have called", sectorID)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
log.Infof("Starting packing sector %d", sectorID)
|
|
||||||
err := m.sectors.Send(uint64(sectorID), SectorStartPacking{})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
log.Infof("send Starting packing event success sector %d", sectorID)
|
|
||||||
|
|
||||||
delete(m.unsealedInfoMap.infos, sectorID)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Caller should hold m.unsealedInfoMap.lk
|
|
||||||
func (m *Sealing) getSectorAndPadding(ctx context.Context, size abi.UnpaddedPieceSize) (abi.SectorNumber, []abi.PaddedPieceSize, error) {
|
|
||||||
for tries := 0; tries < 100; tries++ {
|
|
||||||
for k, v := range m.unsealedInfoMap.infos {
|
|
||||||
pads, padLength := ffiwrapper.GetRequiredPadding(v.stored, size.Padded())
|
|
||||||
|
|
||||||
if v.stored+size.Padded()+padLength <= abi.PaddedPieceSize(v.ssize) {
|
|
||||||
return k, pads, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(m.unsealedInfoMap.infos) > 0 {
|
|
||||||
log.Infow("tried to put a piece into an open sector, found none with enough space", "open", len(m.unsealedInfoMap.infos), "size", size, "tries", tries)
|
|
||||||
}
|
|
||||||
|
|
||||||
ns, ssize, err := m.newDealSector(ctx)
|
|
||||||
switch err {
|
|
||||||
case nil:
|
|
||||||
m.unsealedInfoMap.infos[ns] = UnsealedSectorInfo{
|
|
||||||
numDeals: 0,
|
|
||||||
stored: 0,
|
|
||||||
pieceSizes: nil,
|
|
||||||
ssize: ssize,
|
|
||||||
}
|
|
||||||
case errTooManySealing:
|
|
||||||
m.unsealedInfoMap.lk.Unlock()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-time.After(2 * time.Second):
|
|
||||||
case <-ctx.Done():
|
|
||||||
m.unsealedInfoMap.lk.Lock()
|
|
||||||
return 0, nil, xerrors.Errorf("getting sector for piece: %w", ctx.Err())
|
|
||||||
}
|
|
||||||
|
|
||||||
m.unsealedInfoMap.lk.Lock()
|
|
||||||
continue
|
|
||||||
default:
|
|
||||||
return 0, nil, xerrors.Errorf("creating new sector: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return ns, nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0, nil, xerrors.Errorf("failed to allocate piece to a sector")
|
|
||||||
}
|
|
||||||
|
|
||||||
var errTooManySealing = errors.New("too many sectors sealing")
|
|
||||||
|
|
||||||
// newDealSector creates a new sector for deal storage
|
|
||||||
func (m *Sealing) newDealSector(ctx context.Context) (abi.SectorNumber, abi.SectorSize, error) {
|
|
||||||
// First make sure we don't have too many 'open' sectors
|
|
||||||
|
|
||||||
cfg, err := m.getConfig()
|
|
||||||
if err != nil {
|
|
||||||
return 0, 0, xerrors.Errorf("getting config: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if cfg.MaxSealingSectorsForDeals > 0 {
|
|
||||||
if m.stats.curSealing() > cfg.MaxSealingSectorsForDeals {
|
|
||||||
return 0, 0, ErrTooManySectorsSealing
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if cfg.MaxWaitDealsSectors > 0 && uint64(len(m.unsealedInfoMap.infos)) >= cfg.MaxWaitDealsSectors {
|
|
||||||
// Too many sectors are sealing in parallel. Start sealing one, and retry
|
|
||||||
// allocating the piece to a sector (we're dropping the lock here, so in
|
|
||||||
// case other goroutines are also trying to create a sector, we retry in
|
|
||||||
// getSectorAndPadding instead of here - otherwise if we have lots of
|
|
||||||
// parallel deals in progress, we can start creating a ton of sectors
|
|
||||||
// with just a single deal in them)
|
|
||||||
var mostStored abi.PaddedPieceSize = math.MaxUint64
|
|
||||||
var best abi.SectorNumber = math.MaxUint64
|
|
||||||
|
|
||||||
for sn, info := range m.unsealedInfoMap.infos {
|
|
||||||
if info.stored+1 > mostStored+1 { // 18446744073709551615 + 1 = 0
|
|
||||||
best = sn
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if best != math.MaxUint64 {
|
|
||||||
m.unsealedInfoMap.lk.Unlock()
|
|
||||||
err := m.StartPacking(best)
|
|
||||||
m.unsealedInfoMap.lk.Lock()
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("newDealSector StartPacking error: %+v", err)
|
|
||||||
// let's pretend this is fine
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0, 0, errTooManySealing // will wait a bit and retry
|
|
||||||
}
|
|
||||||
|
|
||||||
spt, err := m.currentSealProof(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return 0, 0, xerrors.Errorf("getting current seal proof type: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now actually create a new sector
|
|
||||||
|
|
||||||
sid, err := m.sc.Next()
|
|
||||||
if err != nil {
|
|
||||||
return 0, 0, xerrors.Errorf("getting sector number: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = m.sealer.NewSector(context.TODO(), m.minerSector(spt, sid))
|
|
||||||
if err != nil {
|
|
||||||
return 0, 0, xerrors.Errorf("initializing sector: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Infof("Creating sector %d", sid)
|
|
||||||
err = m.sectors.Send(uint64(sid), SectorStart{
|
|
||||||
ID: sid,
|
|
||||||
SectorType: spt,
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return 0, 0, xerrors.Errorf("starting the sector fsm: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
cf, err := m.getConfig()
|
|
||||||
if err != nil {
|
|
||||||
return 0, 0, xerrors.Errorf("getting the sealing delay: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if cf.WaitDealsDelay > 0 {
|
|
||||||
timer := time.NewTimer(cf.WaitDealsDelay)
|
|
||||||
go func() {
|
|
||||||
<-timer.C
|
|
||||||
if err := m.StartPacking(sid); err != nil {
|
|
||||||
log.Errorf("starting sector %d: %+v", sid, err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
ssize, err := spt.SectorSize()
|
|
||||||
return sid, ssize, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// newSectorCC accepts a slice of pieces with no deal (junk data)
|
// newSectorCC accepts a slice of pieces with no deal (junk data)
|
||||||
func (m *Sealing) newSectorCC(ctx context.Context, sid abi.SectorNumber, pieces []Piece) error {
|
func (m *Sealing) newSectorCC(ctx context.Context, sid abi.SectorNumber, pieces []Piece) error {
|
||||||
spt, err := m.currentSealProof(ctx)
|
spt, err := m.currentSealProof(ctx)
|
||||||
|
8
extern/storage-sealing/sector_state.go
vendored
8
extern/storage-sealing/sector_state.go
vendored
@ -6,6 +6,7 @@ var ExistSectorStateList = map[SectorState]struct{}{
|
|||||||
Empty: {},
|
Empty: {},
|
||||||
WaitDeals: {},
|
WaitDeals: {},
|
||||||
Packing: {},
|
Packing: {},
|
||||||
|
AddPiece: {},
|
||||||
GetTicket: {},
|
GetTicket: {},
|
||||||
PreCommit1: {},
|
PreCommit1: {},
|
||||||
PreCommit2: {},
|
PreCommit2: {},
|
||||||
@ -43,8 +44,9 @@ const (
|
|||||||
UndefinedSectorState SectorState = ""
|
UndefinedSectorState SectorState = ""
|
||||||
|
|
||||||
// happy path
|
// happy path
|
||||||
Empty SectorState = "Empty"
|
Empty SectorState = "Empty" // deprecated
|
||||||
WaitDeals SectorState = "WaitDeals" // waiting for more pieces (deals) to be added to the sector
|
WaitDeals SectorState = "WaitDeals" // waiting for more pieces (deals) to be added to the sector
|
||||||
|
AddPiece SectorState = "AddPiece" // put deal data (and padding if required) into the sector
|
||||||
Packing SectorState = "Packing" // sector not in sealStore, and not on chain
|
Packing SectorState = "Packing" // sector not in sealStore, and not on chain
|
||||||
GetTicket SectorState = "GetTicket" // generate ticket
|
GetTicket SectorState = "GetTicket" // generate ticket
|
||||||
PreCommit1 SectorState = "PreCommit1" // do PreCommit1
|
PreCommit1 SectorState = "PreCommit1" // do PreCommit1
|
||||||
@ -85,7 +87,9 @@ const (
|
|||||||
|
|
||||||
func toStatState(st SectorState) statSectorState {
|
func toStatState(st SectorState) statSectorState {
|
||||||
switch st {
|
switch st {
|
||||||
case Empty, WaitDeals, Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, SubmitCommit, CommitWait, FinalizeSector:
|
case Empty, WaitDeals, AddPiece:
|
||||||
|
return sstStaging
|
||||||
|
case Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, SubmitCommit, CommitWait, FinalizeSector:
|
||||||
return sstSealing
|
return sstSealing
|
||||||
case Proving, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed:
|
case Proving, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed:
|
||||||
return sstProving
|
return sstProving
|
||||||
|
13
extern/storage-sealing/stats.go
vendored
13
extern/storage-sealing/stats.go
vendored
@ -9,7 +9,8 @@ import (
|
|||||||
type statSectorState int
|
type statSectorState int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
sstSealing statSectorState = iota
|
sstStaging statSectorState = iota
|
||||||
|
sstSealing
|
||||||
sstFailed
|
sstFailed
|
||||||
sstProving
|
sstProving
|
||||||
nsst
|
nsst
|
||||||
@ -41,5 +42,13 @@ func (ss *SectorStats) curSealing() uint64 {
|
|||||||
ss.lk.Lock()
|
ss.lk.Lock()
|
||||||
defer ss.lk.Unlock()
|
defer ss.lk.Unlock()
|
||||||
|
|
||||||
return ss.totals[sstSealing] + ss.totals[sstFailed]
|
return ss.totals[sstStaging] + ss.totals[sstSealing] + ss.totals[sstFailed]
|
||||||
|
}
|
||||||
|
|
||||||
|
// return the number of sectors waiting to enter the sealing pipeline
|
||||||
|
func (ss *SectorStats) curStaging() uint64 {
|
||||||
|
ss.lk.Lock()
|
||||||
|
defer ss.lk.Unlock()
|
||||||
|
|
||||||
|
return ss.totals[sstStaging]
|
||||||
}
|
}
|
||||||
|
3
extern/storage-sealing/types.go
vendored
3
extern/storage-sealing/types.go
vendored
@ -70,7 +70,8 @@ type SectorInfo struct {
|
|||||||
SectorType abi.RegisteredSealProof
|
SectorType abi.RegisteredSealProof
|
||||||
|
|
||||||
// Packing
|
// Packing
|
||||||
Pieces []Piece
|
Pieces []Piece
|
||||||
|
PendingPieces []cid.Cid
|
||||||
|
|
||||||
// PreCommit1
|
// PreCommit1
|
||||||
TicketValue abi.SealRandomness
|
TicketValue abi.SealRandomness
|
||||||
|
@ -64,6 +64,14 @@ type SealingConfig struct {
|
|||||||
MaxSealingSectorsForDeals uint64
|
MaxSealingSectorsForDeals uint64
|
||||||
|
|
||||||
WaitDealsDelay Duration
|
WaitDealsDelay Duration
|
||||||
|
|
||||||
|
// Keep this many sectors in sealing pipeline, start CC if needed
|
||||||
|
// todo TargetSealingSectors uint64
|
||||||
|
|
||||||
|
// Try to keep this many sectors waiting for deals
|
||||||
|
TargetWaitDealsSectors uint64
|
||||||
|
|
||||||
|
// todo TargetSectors - stop auto-pleding new sectors after this many sectors are sealed, default CC upgrade for deals sectors if above
|
||||||
}
|
}
|
||||||
|
|
||||||
type MinerFeeConfig struct {
|
type MinerFeeConfig struct {
|
||||||
@ -183,6 +191,7 @@ func DefaultStorageMiner() *StorageMiner {
|
|||||||
MaxSealingSectors: 0,
|
MaxSealingSectors: 0,
|
||||||
MaxSealingSectorsForDeals: 0,
|
MaxSealingSectorsForDeals: 0,
|
||||||
WaitDealsDelay: Duration(time.Hour * 6),
|
WaitDealsDelay: Duration(time.Hour * 6),
|
||||||
|
TargetWaitDealsSectors: 2,
|
||||||
},
|
},
|
||||||
|
|
||||||
Storage: sectorstorage.SealerConfig{
|
Storage: sectorstorage.SealerConfig{
|
||||||
|
@ -805,6 +805,7 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
|
|||||||
MaxSealingSectors: cfg.MaxSealingSectors,
|
MaxSealingSectors: cfg.MaxSealingSectors,
|
||||||
MaxSealingSectorsForDeals: cfg.MaxSealingSectorsForDeals,
|
MaxSealingSectorsForDeals: cfg.MaxSealingSectorsForDeals,
|
||||||
WaitDealsDelay: config.Duration(cfg.WaitDealsDelay),
|
WaitDealsDelay: config.Duration(cfg.WaitDealsDelay),
|
||||||
|
TargetWaitDealsSectors: cfg.TargetWaitDealsSectors,
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
@ -819,6 +820,7 @@ func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error
|
|||||||
MaxSealingSectors: cfg.Sealing.MaxSealingSectors,
|
MaxSealingSectors: cfg.Sealing.MaxSealingSectors,
|
||||||
MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals,
|
MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals,
|
||||||
WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay),
|
WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay),
|
||||||
|
TargetWaitDealsSectors: cfg.Sealing.TargetWaitDealsSectors,
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
|
Loading…
Reference in New Issue
Block a user