Merge pull request #3147 from filecoin-project/feat/fsm-seal-limits
fsm: Config for limiting max sealing sectors
This commit is contained in:
commit
3fd4921ea3
@ -56,4 +56,3 @@ var PprofGoroutines = &cli.Command{
|
|||||||
return r.Body.Close()
|
return r.Body.Close()
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
15
extern/storage-sealing/checks.go
vendored
15
extern/storage-sealing/checks.go
vendored
@ -26,10 +26,12 @@ type ErrBadCommD struct{ error }
|
|||||||
type ErrExpiredTicket struct{ error }
|
type ErrExpiredTicket struct{ error }
|
||||||
type ErrBadTicket struct{ error }
|
type ErrBadTicket struct{ error }
|
||||||
type ErrPrecommitOnChain struct{ error }
|
type ErrPrecommitOnChain struct{ error }
|
||||||
|
type ErrSectorNumberAllocated struct{ error }
|
||||||
|
|
||||||
type ErrBadSeed struct{ error }
|
type ErrBadSeed struct{ error }
|
||||||
type ErrInvalidProof struct{ error }
|
type ErrInvalidProof struct{ error }
|
||||||
type ErrNoPrecommit struct{ error }
|
type ErrNoPrecommit struct{ error }
|
||||||
|
type ErrCommitWaitFailed struct{ error }
|
||||||
|
|
||||||
func checkPieces(ctx context.Context, si SectorInfo, api SealingAPI) error {
|
func checkPieces(ctx context.Context, si SectorInfo, api SealingAPI) error {
|
||||||
tok, height, err := api.ChainHead(ctx)
|
tok, height, err := api.ChainHead(ctx)
|
||||||
@ -87,6 +89,9 @@ func checkPrecommit(ctx context.Context, maddr address.Address, si SectorInfo, t
|
|||||||
|
|
||||||
pci, err := api.StateSectorPreCommitInfo(ctx, maddr, si.SectorNumber, tok)
|
pci, err := api.StateSectorPreCommitInfo(ctx, maddr, si.SectorNumber, tok)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if err == ErrSectorAllocated {
|
||||||
|
return &ErrSectorNumberAllocated{err}
|
||||||
|
}
|
||||||
return &ErrApi{xerrors.Errorf("getting precommit info: %w", err)}
|
return &ErrApi{xerrors.Errorf("getting precommit info: %w", err)}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -106,6 +111,16 @@ func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte,
|
|||||||
}
|
}
|
||||||
|
|
||||||
pci, err := m.api.StateSectorPreCommitInfo(ctx, m.maddr, si.SectorNumber, tok)
|
pci, err := m.api.StateSectorPreCommitInfo(ctx, m.maddr, si.SectorNumber, tok)
|
||||||
|
if err == ErrSectorAllocated {
|
||||||
|
// not much more we can check here, basically try to wait for commit,
|
||||||
|
// and hope that this will work
|
||||||
|
|
||||||
|
if si.CommitMessage != nil {
|
||||||
|
return &ErrCommitWaitFailed{err}
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("getting precommit info: %w", err)
|
return xerrors.Errorf("getting precommit info: %w", err)
|
||||||
}
|
}
|
||||||
|
11
extern/storage-sealing/fsm.go
vendored
11
extern/storage-sealing/fsm.go
vendored
@ -108,6 +108,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
|
|||||||
on(SectorRetryPreCommitWait{}, PreCommitWait),
|
on(SectorRetryPreCommitWait{}, PreCommitWait),
|
||||||
on(SectorChainPreCommitFailed{}, PreCommitFailed),
|
on(SectorChainPreCommitFailed{}, PreCommitFailed),
|
||||||
on(SectorRetryPreCommit{}, PreCommitting),
|
on(SectorRetryPreCommit{}, PreCommitting),
|
||||||
|
on(SectorRetryCommitWait{}, CommitWait),
|
||||||
),
|
),
|
||||||
FinalizeFailed: planOne(
|
FinalizeFailed: planOne(
|
||||||
on(SectorRetryFinalize{}, FinalizeSector),
|
on(SectorRetryFinalize{}, FinalizeSector),
|
||||||
@ -225,6 +226,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
|
|||||||
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
m.stats.updateSector(m.minerSector(state.SectorNumber), state.State)
|
||||||
|
|
||||||
switch state.State {
|
switch state.State {
|
||||||
// Happy path
|
// Happy path
|
||||||
case Empty:
|
case Empty:
|
||||||
@ -315,6 +318,8 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) error {
|
|||||||
state.State = SealPreCommit1Failed
|
state.State = SealPreCommit1Failed
|
||||||
case SectorCommitFailed:
|
case SectorCommitFailed:
|
||||||
state.State = CommitFailed
|
state.State = CommitFailed
|
||||||
|
case SectorRetryCommitWait:
|
||||||
|
state.State = CommitWait
|
||||||
default:
|
default:
|
||||||
return xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events)
|
return xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events)
|
||||||
}
|
}
|
||||||
@ -328,7 +333,7 @@ func (m *Sealing) restartSectors(ctx context.Context) error {
|
|||||||
log.Errorf("loading sector list: %+v", err)
|
log.Errorf("loading sector list: %+v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
sd, err := m.getSealDelay()
|
cfg, err := m.getConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("getting the sealing delay: %w", err)
|
return xerrors.Errorf("getting the sealing delay: %w", err)
|
||||||
}
|
}
|
||||||
@ -339,8 +344,8 @@ func (m *Sealing) restartSectors(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if sector.State == WaitDeals {
|
if sector.State == WaitDeals {
|
||||||
if sd > 0 {
|
if cfg.WaitDealsDelay > 0 {
|
||||||
timer := time.NewTimer(sd)
|
timer := time.NewTimer(cfg.WaitDealsDelay)
|
||||||
go func() {
|
go func() {
|
||||||
<-timer.C
|
<-timer.C
|
||||||
m.StartPacking(sector.SectorNumber)
|
m.StartPacking(sector.SectorNumber)
|
||||||
|
4
extern/storage-sealing/fsm_events.go
vendored
4
extern/storage-sealing/fsm_events.go
vendored
@ -252,6 +252,10 @@ func (evt SectorRetryInvalidProof) apply(state *SectorInfo) {
|
|||||||
state.InvalidProofs++
|
state.InvalidProofs++
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type SectorRetryCommitWait struct{}
|
||||||
|
|
||||||
|
func (evt SectorRetryCommitWait) apply(state *SectorInfo) {}
|
||||||
|
|
||||||
// Faults
|
// Faults
|
||||||
|
|
||||||
type SectorFaulty struct{}
|
type SectorFaulty struct{}
|
||||||
|
26
extern/storage-sealing/fsm_test.go
vendored
26
extern/storage-sealing/fsm_test.go
vendored
@ -3,6 +3,8 @@ package sealing
|
|||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||||
logging "github.com/ipfs/go-log/v2"
|
logging "github.com/ipfs/go-log/v2"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
@ -25,8 +27,14 @@ type test struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestHappyPath(t *testing.T) {
|
func TestHappyPath(t *testing.T) {
|
||||||
|
ma, _ := address.NewIDAddress(55151)
|
||||||
m := test{
|
m := test{
|
||||||
s: &Sealing{},
|
s: &Sealing{
|
||||||
|
maddr: ma,
|
||||||
|
stats: SectorStats{
|
||||||
|
bySector: map[abi.SectorID]statSectorState{},
|
||||||
|
},
|
||||||
|
},
|
||||||
t: t,
|
t: t,
|
||||||
state: &SectorInfo{State: Packing},
|
state: &SectorInfo{State: Packing},
|
||||||
}
|
}
|
||||||
@ -60,8 +68,14 @@ func TestHappyPath(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestSeedRevert(t *testing.T) {
|
func TestSeedRevert(t *testing.T) {
|
||||||
|
ma, _ := address.NewIDAddress(55151)
|
||||||
m := test{
|
m := test{
|
||||||
s: &Sealing{},
|
s: &Sealing{
|
||||||
|
maddr: ma,
|
||||||
|
stats: SectorStats{
|
||||||
|
bySector: map[abi.SectorID]statSectorState{},
|
||||||
|
},
|
||||||
|
},
|
||||||
t: t,
|
t: t,
|
||||||
state: &SectorInfo{State: Packing},
|
state: &SectorInfo{State: Packing},
|
||||||
}
|
}
|
||||||
@ -101,8 +115,14 @@ func TestSeedRevert(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPlanCommittingHandlesSectorCommitFailed(t *testing.T) {
|
func TestPlanCommittingHandlesSectorCommitFailed(t *testing.T) {
|
||||||
|
ma, _ := address.NewIDAddress(55151)
|
||||||
m := test{
|
m := test{
|
||||||
s: &Sealing{},
|
s: &Sealing{
|
||||||
|
maddr: ma,
|
||||||
|
stats: SectorStats{
|
||||||
|
bySector: map[abi.SectorID]statSectorState{},
|
||||||
|
},
|
||||||
|
},
|
||||||
t: t,
|
t: t,
|
||||||
state: &SectorInfo{State: Committing},
|
state: &SectorInfo{State: Committing},
|
||||||
}
|
}
|
||||||
|
11
extern/storage-sealing/garbage.go
vendored
11
extern/storage-sealing/garbage.go
vendored
@ -31,6 +31,17 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorID, exist
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Sealing) PledgeSector() error {
|
func (m *Sealing) PledgeSector() error {
|
||||||
|
cfg, err := m.getConfig()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("getting config: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if cfg.MaxSealingSectors > 0 {
|
||||||
|
if m.stats.curSealing() > cfg.MaxSealingSectors {
|
||||||
|
return xerrors.Errorf("too many sectors sealing (curSealing: %d, max: %d)", m.stats.curSealing(), cfg.MaxSealingSectors)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
ctx := context.TODO() // we can't use the context from command which invokes
|
ctx := context.TODO() // we can't use the context from command which invokes
|
||||||
// this, as we run everything here async, and it's cancelled when the
|
// this, as we run everything here async, and it's cancelled when the
|
||||||
|
18
extern/storage-sealing/sealiface/config.go
vendored
Normal file
18
extern/storage-sealing/sealiface/config.go
vendored
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
package sealiface
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
|
// this has to be in a separate package to not make lotus API depend on filecoin-ffi
|
||||||
|
|
||||||
|
type Config struct {
|
||||||
|
// 0 = no limit
|
||||||
|
MaxWaitDealsSectors uint64
|
||||||
|
|
||||||
|
// includes failed, 0 = no limit
|
||||||
|
MaxSealingSectors uint64
|
||||||
|
|
||||||
|
// includes failed, 0 = no limit
|
||||||
|
MaxSealingSectorsForDeals uint64
|
||||||
|
|
||||||
|
WaitDealsDelay time.Duration
|
||||||
|
}
|
118
extern/storage-sealing/sealing.go
vendored
118
extern/storage-sealing/sealing.go
vendored
@ -2,7 +2,9 @@ package sealing
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
|
"math"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -33,9 +35,14 @@ type SectorLocation struct {
|
|||||||
Partition uint64
|
Partition uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var ErrSectorAllocated = errors.New("sectorNumber is allocated, but PreCommit info wasn't found on chain")
|
||||||
|
|
||||||
type SealingAPI interface {
|
type SealingAPI interface {
|
||||||
StateWaitMsg(context.Context, cid.Cid) (MsgLookup, error)
|
StateWaitMsg(context.Context, cid.Cid) (MsgLookup, error)
|
||||||
|
StateSearchMsg(context.Context, cid.Cid) (*MsgLookup, error)
|
||||||
StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tok TipSetToken) (cid.Cid, error)
|
StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tok TipSetToken) (cid.Cid, error)
|
||||||
|
|
||||||
|
// Can return ErrSectorAllocated in case precommit info wasn't found, but the sector number is marked as allocated
|
||||||
StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorPreCommitOnChainInfo, error)
|
StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorPreCommitOnChainInfo, error)
|
||||||
StateSectorGetInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorOnChainInfo, error)
|
StateSectorGetInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorOnChainInfo, error)
|
||||||
StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*SectorLocation, error)
|
StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*SectorLocation, error)
|
||||||
@ -70,7 +77,9 @@ type Sealing struct {
|
|||||||
upgradeLk sync.Mutex
|
upgradeLk sync.Mutex
|
||||||
toUpgrade map[abi.SectorNumber]struct{}
|
toUpgrade map[abi.SectorNumber]struct{}
|
||||||
|
|
||||||
getSealDelay GetSealingDelayFunc
|
stats SectorStats
|
||||||
|
|
||||||
|
getConfig GetSealingConfigFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
type FeeConfig struct {
|
type FeeConfig struct {
|
||||||
@ -80,7 +89,7 @@ type FeeConfig struct {
|
|||||||
|
|
||||||
type UnsealedSectorMap struct {
|
type UnsealedSectorMap struct {
|
||||||
infos map[abi.SectorNumber]UnsealedSectorInfo
|
infos map[abi.SectorNumber]UnsealedSectorInfo
|
||||||
mux sync.Mutex
|
lk sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
type UnsealedSectorInfo struct {
|
type UnsealedSectorInfo struct {
|
||||||
@ -90,7 +99,7 @@ type UnsealedSectorInfo struct {
|
|||||||
pieceSizes []abi.UnpaddedPieceSize
|
pieceSizes []abi.UnpaddedPieceSize
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy, gsd GetSealingDelayFunc) *Sealing {
|
func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy, gc GetSealingConfigFunc) *Sealing {
|
||||||
s := &Sealing{
|
s := &Sealing{
|
||||||
api: api,
|
api: api,
|
||||||
feeCfg: fc,
|
feeCfg: fc,
|
||||||
@ -103,11 +112,15 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds
|
|||||||
pcp: pcp,
|
pcp: pcp,
|
||||||
unsealedInfoMap: UnsealedSectorMap{
|
unsealedInfoMap: UnsealedSectorMap{
|
||||||
infos: make(map[abi.SectorNumber]UnsealedSectorInfo),
|
infos: make(map[abi.SectorNumber]UnsealedSectorInfo),
|
||||||
mux: sync.Mutex{},
|
lk: sync.Mutex{},
|
||||||
},
|
},
|
||||||
|
|
||||||
toUpgrade: map[abi.SectorNumber]struct{}{},
|
toUpgrade: map[abi.SectorNumber]struct{}{},
|
||||||
getSealDelay: gsd,
|
getConfig: gc,
|
||||||
|
|
||||||
|
stats: SectorStats{
|
||||||
|
bySector: map[abi.SectorID]statSectorState{},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{})
|
s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{})
|
||||||
@ -137,18 +150,18 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec
|
|||||||
return 0, 0, xerrors.Errorf("piece cannot fit into a sector")
|
return 0, 0, xerrors.Errorf("piece cannot fit into a sector")
|
||||||
}
|
}
|
||||||
|
|
||||||
m.unsealedInfoMap.mux.Lock()
|
m.unsealedInfoMap.lk.Lock()
|
||||||
|
|
||||||
sid, pads, err := m.getSectorAndPadding(size)
|
sid, pads, err := m.getSectorAndPadding(size)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
m.unsealedInfoMap.mux.Unlock()
|
m.unsealedInfoMap.lk.Unlock()
|
||||||
return 0, 0, xerrors.Errorf("getting available sector: %w", err)
|
return 0, 0, xerrors.Errorf("getting available sector: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, p := range pads {
|
for _, p := range pads {
|
||||||
err = m.addPiece(ctx, sid, p.Unpadded(), NewNullReader(p.Unpadded()), nil)
|
err = m.addPiece(ctx, sid, p.Unpadded(), NewNullReader(p.Unpadded()), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
m.unsealedInfoMap.mux.Unlock()
|
m.unsealedInfoMap.lk.Unlock()
|
||||||
return 0, 0, xerrors.Errorf("writing pads: %w", err)
|
return 0, 0, xerrors.Errorf("writing pads: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -157,12 +170,15 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec
|
|||||||
err = m.addPiece(ctx, sid, size, r, &d)
|
err = m.addPiece(ctx, sid, size, r, &d)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
m.unsealedInfoMap.mux.Unlock()
|
m.unsealedInfoMap.lk.Unlock()
|
||||||
return 0, 0, xerrors.Errorf("adding piece to sector: %w", err)
|
return 0, 0, xerrors.Errorf("adding piece to sector: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
m.unsealedInfoMap.mux.Unlock()
|
startPacking := m.unsealedInfoMap.infos[sid].numDeals >= getDealPerSectorLimit(m.sealer.SectorSize())
|
||||||
if m.unsealedInfoMap.infos[sid].numDeals == getDealPerSectorLimit(m.sealer.SectorSize()) {
|
|
||||||
|
m.unsealedInfoMap.lk.Unlock()
|
||||||
|
|
||||||
|
if startPacking {
|
||||||
if err := m.StartPacking(sid); err != nil {
|
if err := m.StartPacking(sid); err != nil {
|
||||||
return 0, 0, xerrors.Errorf("start packing: %w", err)
|
return 0, 0, xerrors.Errorf("start packing: %w", err)
|
||||||
}
|
}
|
||||||
@ -171,7 +187,7 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec
|
|||||||
return sid, offset, nil
|
return sid, offset, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Caller should hold m.unsealedInfoMap.mux
|
// Caller should hold m.unsealedInfoMap.lk
|
||||||
func (m *Sealing) addPiece(ctx context.Context, sectorID abi.SectorNumber, size abi.UnpaddedPieceSize, r io.Reader, di *DealInfo) error {
|
func (m *Sealing) addPiece(ctx context.Context, sectorID abi.SectorNumber, size abi.UnpaddedPieceSize, r io.Reader, di *DealInfo) error {
|
||||||
log.Infof("Adding piece to sector %d", sectorID)
|
log.Infof("Adding piece to sector %d", sectorID)
|
||||||
ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx, DealSectorPriority), m.minerSector(sectorID), m.unsealedInfoMap.infos[sectorID].pieceSizes, size, r)
|
ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx, DealSectorPriority), m.minerSector(sectorID), m.unsealedInfoMap.infos[sectorID].pieceSizes, size, r)
|
||||||
@ -206,7 +222,7 @@ func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error {
|
|||||||
return m.sectors.Send(uint64(sid), SectorRemove{})
|
return m.sectors.Send(uint64(sid), SectorRemove{})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Caller should NOT hold m.unsealedInfoMap.mux
|
// Caller should NOT hold m.unsealedInfoMap.lk
|
||||||
func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error {
|
func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error {
|
||||||
log.Infof("Starting packing sector %d", sectorID)
|
log.Infof("Starting packing sector %d", sectorID)
|
||||||
err := m.sectors.Send(uint64(sectorID), SectorStartPacking{})
|
err := m.sectors.Send(uint64(sectorID), SectorStartPacking{})
|
||||||
@ -214,14 +230,14 @@ func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
m.unsealedInfoMap.mux.Lock()
|
m.unsealedInfoMap.lk.Lock()
|
||||||
delete(m.unsealedInfoMap.infos, sectorID)
|
delete(m.unsealedInfoMap.infos, sectorID)
|
||||||
m.unsealedInfoMap.mux.Unlock()
|
m.unsealedInfoMap.lk.Unlock()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Caller should hold m.unsealedInfoMap.mux
|
// Caller should hold m.unsealedInfoMap.lk
|
||||||
func (m *Sealing) getSectorAndPadding(size abi.UnpaddedPieceSize) (abi.SectorNumber, []abi.PaddedPieceSize, error) {
|
func (m *Sealing) getSectorAndPadding(size abi.UnpaddedPieceSize) (abi.SectorNumber, []abi.PaddedPieceSize, error) {
|
||||||
ss := abi.PaddedPieceSize(m.sealer.SectorSize())
|
ss := abi.PaddedPieceSize(m.sealer.SectorSize())
|
||||||
for k, v := range m.unsealedInfoMap.infos {
|
for k, v := range m.unsealedInfoMap.infos {
|
||||||
@ -231,7 +247,7 @@ func (m *Sealing) getSectorAndPadding(size abi.UnpaddedPieceSize) (abi.SectorNum
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ns, err := m.newSector()
|
ns, err := m.newDealSector()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, err
|
return 0, nil, err
|
||||||
}
|
}
|
||||||
@ -245,8 +261,66 @@ func (m *Sealing) getSectorAndPadding(size abi.UnpaddedPieceSize) (abi.SectorNum
|
|||||||
return ns, nil, nil
|
return ns, nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// newSector creates a new sector for deal storage
|
// newDealSector creates a new sector for deal storage
|
||||||
func (m *Sealing) newSector() (abi.SectorNumber, error) {
|
func (m *Sealing) newDealSector() (abi.SectorNumber, error) {
|
||||||
|
// First make sure we don't have too many 'open' sectors
|
||||||
|
|
||||||
|
cfg, err := m.getConfig()
|
||||||
|
if err != nil {
|
||||||
|
return 0, xerrors.Errorf("getting config: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if cfg.MaxSealingSectorsForDeals > 0 {
|
||||||
|
if m.stats.curSealing() > cfg.MaxSealingSectorsForDeals {
|
||||||
|
return 0, xerrors.Errorf("too many sectors sealing")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if cfg.MaxWaitDealsSectors > 0 {
|
||||||
|
// run in a loop because we have to drop the map lock here for a bit
|
||||||
|
tries := 0
|
||||||
|
|
||||||
|
// we have to run in a loop as we're dropping unsealedInfoMap.lk
|
||||||
|
// to actually call StartPacking. When we do that, another entry can
|
||||||
|
// get added to unsealedInfoMap.
|
||||||
|
for uint64(len(m.unsealedInfoMap.infos)) >= cfg.MaxWaitDealsSectors {
|
||||||
|
if tries > 10 {
|
||||||
|
// whatever...
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if tries > 0 {
|
||||||
|
m.unsealedInfoMap.lk.Unlock()
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
m.unsealedInfoMap.lk.Lock()
|
||||||
|
}
|
||||||
|
|
||||||
|
tries++
|
||||||
|
var mostStored abi.PaddedPieceSize = math.MaxUint64
|
||||||
|
var best abi.SectorNumber = math.MaxUint64
|
||||||
|
|
||||||
|
for sn, info := range m.unsealedInfoMap.infos {
|
||||||
|
if info.stored+1 > mostStored+1 { // 18446744073709551615 + 1 = 0
|
||||||
|
best = sn
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if best == math.MaxUint64 {
|
||||||
|
// probably not possible, but who knows
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
m.unsealedInfoMap.lk.Unlock()
|
||||||
|
if err := m.StartPacking(best); err != nil {
|
||||||
|
log.Error("newDealSector StartPacking error: %+v", err)
|
||||||
|
continue // let's pretend this is fine
|
||||||
|
}
|
||||||
|
m.unsealedInfoMap.lk.Lock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now actually create a new sector
|
||||||
|
|
||||||
sid, err := m.sc.Next()
|
sid, err := m.sc.Next()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, xerrors.Errorf("getting sector number: %w", err)
|
return 0, xerrors.Errorf("getting sector number: %w", err)
|
||||||
@ -272,13 +346,13 @@ func (m *Sealing) newSector() (abi.SectorNumber, error) {
|
|||||||
return 0, xerrors.Errorf("starting the sector fsm: %w", err)
|
return 0, xerrors.Errorf("starting the sector fsm: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
sd, err := m.getSealDelay()
|
cf, err := m.getConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, xerrors.Errorf("getting the sealing delay: %w", err)
|
return 0, xerrors.Errorf("getting the sealing delay: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if sd > 0 {
|
if cf.WaitDealsDelay > 0 {
|
||||||
timer := time.NewTimer(sd)
|
timer := time.NewTimer(cf.WaitDealsDelay)
|
||||||
go func() {
|
go func() {
|
||||||
<-timer.C
|
<-timer.C
|
||||||
m.StartPacking(sid)
|
m.StartPacking(sid)
|
||||||
|
11
extern/storage-sealing/sector_state.go
vendored
11
extern/storage-sealing/sector_state.go
vendored
@ -36,3 +36,14 @@ const (
|
|||||||
RemoveFailed SectorState = "RemoveFailed"
|
RemoveFailed SectorState = "RemoveFailed"
|
||||||
Removed SectorState = "Removed"
|
Removed SectorState = "Removed"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func toStatState(st SectorState) statSectorState {
|
||||||
|
switch st {
|
||||||
|
case Empty, WaitDeals, Packing, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, CommitWait, FinalizeSector:
|
||||||
|
return sstSealing
|
||||||
|
case Proving, Removed, Removing:
|
||||||
|
return sstProving
|
||||||
|
}
|
||||||
|
|
||||||
|
return sstFailed
|
||||||
|
}
|
||||||
|
12
extern/storage-sealing/states_failed.go
vendored
12
extern/storage-sealing/states_failed.go
vendored
@ -85,6 +85,10 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI
|
|||||||
return ctx.Send(SectorRetryPreCommit{})
|
return ctx.Send(SectorRetryPreCommit{})
|
||||||
case *ErrPrecommitOnChain:
|
case *ErrPrecommitOnChain:
|
||||||
// noop
|
// noop
|
||||||
|
case *ErrSectorNumberAllocated:
|
||||||
|
log.Errorf("handlePreCommitFailed: sector number already allocated, not proceeding: %+v", err)
|
||||||
|
// TODO: check if the sector is committed (not sure how we'd end up here)
|
||||||
|
return nil
|
||||||
default:
|
default:
|
||||||
return xerrors.Errorf("checkPrecommit sanity check error: %w", err)
|
return xerrors.Errorf("checkPrecommit sanity check error: %w", err)
|
||||||
}
|
}
|
||||||
@ -158,6 +162,8 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo
|
|||||||
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("no precommit: %w", err)})
|
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("no precommit: %w", err)})
|
||||||
case *ErrPrecommitOnChain:
|
case *ErrPrecommitOnChain:
|
||||||
// noop, this is expected
|
// noop, this is expected
|
||||||
|
case *ErrSectorNumberAllocated:
|
||||||
|
// noop, already committed?
|
||||||
default:
|
default:
|
||||||
return xerrors.Errorf("checkPrecommit sanity check error (%T): %w", err, err)
|
return xerrors.Errorf("checkPrecommit sanity check error (%T): %w", err, err)
|
||||||
}
|
}
|
||||||
@ -186,6 +192,12 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo
|
|||||||
return ctx.Send(SectorRetryPreCommitWait{})
|
return ctx.Send(SectorRetryPreCommitWait{})
|
||||||
case *ErrNoPrecommit:
|
case *ErrNoPrecommit:
|
||||||
return ctx.Send(SectorRetryPreCommit{})
|
return ctx.Send(SectorRetryPreCommit{})
|
||||||
|
case *ErrCommitWaitFailed:
|
||||||
|
if err := failedCooldown(ctx, sector); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx.Send(SectorRetryCommitWait{})
|
||||||
default:
|
default:
|
||||||
return xerrors.Errorf("checkCommit sanity check error (%T): %w", err, err)
|
return xerrors.Errorf("checkCommit sanity check error (%T): %w", err, err)
|
||||||
}
|
}
|
||||||
|
18
extern/storage-sealing/states_sealing.go
vendored
18
extern/storage-sealing/states_sealing.go
vendored
@ -149,6 +149,10 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
|
|||||||
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad ticket: %w", err)})
|
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad ticket: %w", err)})
|
||||||
case *ErrPrecommitOnChain:
|
case *ErrPrecommitOnChain:
|
||||||
return ctx.Send(SectorPreCommitLanded{TipSet: tok}) // we re-did precommit
|
return ctx.Send(SectorPreCommitLanded{TipSet: tok}) // we re-did precommit
|
||||||
|
case *ErrSectorNumberAllocated:
|
||||||
|
log.Errorf("handlePreCommitFailed: sector number already allocated, not proceeding: %+v", err)
|
||||||
|
// TODO: check if the sector is committed (not sure how we'd end up here)
|
||||||
|
return nil
|
||||||
default:
|
default:
|
||||||
return xerrors.Errorf("checkPrecommit sanity check error: %w", err)
|
return xerrors.Errorf("checkPrecommit sanity check error: %w", err)
|
||||||
}
|
}
|
||||||
@ -275,6 +279,20 @@ func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) er
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) error {
|
func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
if sector.CommitMessage != nil {
|
||||||
|
log.Warnf("sector %d entered committing state with a commit message cid", sector.SectorNumber)
|
||||||
|
|
||||||
|
ml, err := m.api.StateSearchMsg(ctx.Context(), *sector.CommitMessage)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("sector %d searching existing commit message %s: %+v", sector.SectorNumber, *sector.CommitMessage, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if ml != nil {
|
||||||
|
// some weird retry paths can lead here
|
||||||
|
return ctx.Send(SectorRetryCommitWait{})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
log.Info("scheduling seal proof computation...")
|
log.Info("scheduling seal proof computation...")
|
||||||
|
|
||||||
log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%x; d:%x", sector.SectorNumber, sector.TicketValue, sector.TicketEpoch, sector.SeedValue, sector.SeedEpoch, sector.pieceInfos(), sector.CommR, sector.CommD)
|
log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%x; d:%x", sector.SectorNumber, sector.TicketValue, sector.TicketEpoch, sector.SeedValue, sector.SeedEpoch, sector.pieceInfos(), sector.CommR, sector.CommD)
|
||||||
|
45
extern/storage-sealing/stats.go
vendored
Normal file
45
extern/storage-sealing/stats.go
vendored
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
package sealing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||||
|
)
|
||||||
|
|
||||||
|
type statSectorState int
|
||||||
|
|
||||||
|
const (
|
||||||
|
sstSealing statSectorState = iota
|
||||||
|
sstFailed
|
||||||
|
sstProving
|
||||||
|
nsst
|
||||||
|
)
|
||||||
|
|
||||||
|
type SectorStats struct {
|
||||||
|
lk sync.Mutex
|
||||||
|
|
||||||
|
bySector map[abi.SectorID]statSectorState
|
||||||
|
totals [nsst]uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ss *SectorStats) updateSector(id abi.SectorID, st SectorState) {
|
||||||
|
ss.lk.Lock()
|
||||||
|
defer ss.lk.Unlock()
|
||||||
|
|
||||||
|
oldst, found := ss.bySector[id]
|
||||||
|
if found {
|
||||||
|
ss.totals[oldst]--
|
||||||
|
}
|
||||||
|
|
||||||
|
sst := toStatState(st)
|
||||||
|
ss.bySector[id] = sst
|
||||||
|
ss.totals[sst]++
|
||||||
|
}
|
||||||
|
|
||||||
|
// return the number of sectors currently in the sealing pipeline
|
||||||
|
func (ss *SectorStats) curSealing() uint64 {
|
||||||
|
ss.lk.Lock()
|
||||||
|
defer ss.lk.Unlock()
|
||||||
|
|
||||||
|
return ss.totals[sstSealing] + ss.totals[sstFailed]
|
||||||
|
}
|
5
extern/storage-sealing/types.go
vendored
5
extern/storage-sealing/types.go
vendored
@ -3,8 +3,6 @@ package sealing
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
|
|
||||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||||
@ -14,6 +12,7 @@ import (
|
|||||||
"github.com/filecoin-project/specs-storage/storage"
|
"github.com/filecoin-project/specs-storage/storage"
|
||||||
|
|
||||||
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
|
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
|
||||||
|
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Piece is a tuple of piece and deal info
|
// Piece is a tuple of piece and deal info
|
||||||
@ -188,7 +187,7 @@ type MessageReceipt struct {
|
|||||||
GasUsed int64
|
GasUsed int64
|
||||||
}
|
}
|
||||||
|
|
||||||
type GetSealingDelayFunc func() (time.Duration, error)
|
type GetSealingConfigFunc func() (sealiface.Config, error)
|
||||||
|
|
||||||
func (mr *MessageReceipt) Equals(o *MessageReceipt) bool {
|
func (mr *MessageReceipt) Equals(o *MessageReceipt) bool {
|
||||||
return mr.ExitCode == o.ExitCode && bytes.Equal(mr.Return, o.Return) && mr.GasUsed == o.GasUsed
|
return mr.ExitCode == o.ExitCode && bytes.Equal(mr.Return, o.Return) && mr.GasUsed == o.GasUsed
|
||||||
|
@ -328,8 +328,8 @@ func Online() Option {
|
|||||||
Override(new(dtypes.SetConsiderOfflineStorageDealsConfigFunc), modules.NewSetConsideringOfflineStorageDealsFunc),
|
Override(new(dtypes.SetConsiderOfflineStorageDealsConfigFunc), modules.NewSetConsideringOfflineStorageDealsFunc),
|
||||||
Override(new(dtypes.ConsiderOfflineRetrievalDealsConfigFunc), modules.NewConsiderOfflineRetrievalDealsConfigFunc),
|
Override(new(dtypes.ConsiderOfflineRetrievalDealsConfigFunc), modules.NewConsiderOfflineRetrievalDealsConfigFunc),
|
||||||
Override(new(dtypes.SetConsiderOfflineRetrievalDealsConfigFunc), modules.NewSetConsiderOfflineRetrievalDealsConfigFunc),
|
Override(new(dtypes.SetConsiderOfflineRetrievalDealsConfigFunc), modules.NewSetConsiderOfflineRetrievalDealsConfigFunc),
|
||||||
Override(new(dtypes.SetSealingDelayFunc), modules.NewSetSealDelayFunc),
|
Override(new(dtypes.SetSealingConfigFunc), modules.NewSetSealConfigFunc),
|
||||||
Override(new(dtypes.GetSealingDelayFunc), modules.NewGetSealDelayFunc),
|
Override(new(dtypes.GetSealingConfigFunc), modules.NewGetSealConfigFunc),
|
||||||
Override(new(dtypes.SetExpectedSealDurationFunc), modules.NewSetExpectedSealDurationFunc),
|
Override(new(dtypes.SetExpectedSealDurationFunc), modules.NewSetExpectedSealDurationFunc),
|
||||||
Override(new(dtypes.GetExpectedSealDurationFunc), modules.NewGetExpectedSealDurationFunc),
|
Override(new(dtypes.GetExpectedSealDurationFunc), modules.NewGetExpectedSealDurationFunc),
|
||||||
),
|
),
|
||||||
|
@ -31,10 +31,9 @@ type StorageMiner struct {
|
|||||||
Common
|
Common
|
||||||
|
|
||||||
Dealmaking DealmakingConfig
|
Dealmaking DealmakingConfig
|
||||||
|
Sealing SealingConfig
|
||||||
Storage sectorstorage.SealerConfig
|
Storage sectorstorage.SealerConfig
|
||||||
Fees MinerFeeConfig
|
Fees MinerFeeConfig
|
||||||
|
|
||||||
SealingDelay Duration
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type DealmakingConfig struct {
|
type DealmakingConfig struct {
|
||||||
@ -48,6 +47,19 @@ type DealmakingConfig struct {
|
|||||||
Filter string
|
Filter string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type SealingConfig struct {
|
||||||
|
// 0 = no limit
|
||||||
|
MaxWaitDealsSectors uint64
|
||||||
|
|
||||||
|
// includes failed, 0 = no limit
|
||||||
|
MaxSealingSectors uint64
|
||||||
|
|
||||||
|
// includes failed, 0 = no limit
|
||||||
|
MaxSealingSectorsForDeals uint64
|
||||||
|
|
||||||
|
WaitDealsDelay Duration
|
||||||
|
}
|
||||||
|
|
||||||
type MinerFeeConfig struct {
|
type MinerFeeConfig struct {
|
||||||
MaxPreCommitGasFee types.FIL
|
MaxPreCommitGasFee types.FIL
|
||||||
MaxCommitGasFee types.FIL
|
MaxCommitGasFee types.FIL
|
||||||
@ -131,6 +143,13 @@ func DefaultStorageMiner() *StorageMiner {
|
|||||||
cfg := &StorageMiner{
|
cfg := &StorageMiner{
|
||||||
Common: defCommon(),
|
Common: defCommon(),
|
||||||
|
|
||||||
|
Sealing: SealingConfig{
|
||||||
|
MaxWaitDealsSectors: 2, // 64G with 32G sectors
|
||||||
|
MaxSealingSectors: 0,
|
||||||
|
MaxSealingSectorsForDeals: 0,
|
||||||
|
WaitDealsDelay: Duration(time.Hour),
|
||||||
|
},
|
||||||
|
|
||||||
Storage: sectorstorage.SealerConfig{
|
Storage: sectorstorage.SealerConfig{
|
||||||
AllowAddPiece: true,
|
AllowAddPiece: true,
|
||||||
AllowPreCommit1: true,
|
AllowPreCommit1: true,
|
||||||
@ -158,8 +177,6 @@ func DefaultStorageMiner() *StorageMiner {
|
|||||||
MaxCommitGasFee: types.FIL(types.BigDiv(types.FromFil(1), types.NewInt(20))),
|
MaxCommitGasFee: types.FIL(types.BigDiv(types.FromFil(1), types.NewInt(20))),
|
||||||
MaxWindowPoStGasFee: types.FIL(types.FromFil(50)),
|
MaxWindowPoStGasFee: types.FIL(types.FromFil(50)),
|
||||||
},
|
},
|
||||||
|
|
||||||
SealingDelay: Duration(time.Hour),
|
|
||||||
}
|
}
|
||||||
cfg.Common.API.ListenAddress = "/ip4/127.0.0.1/tcp/2345/http"
|
cfg.Common.API.ListenAddress = "/ip4/127.0.0.1/tcp/2345/http"
|
||||||
cfg.Common.API.RemoteListenAddress = "127.0.0.1:2345"
|
cfg.Common.API.RemoteListenAddress = "127.0.0.1:2345"
|
||||||
|
@ -62,8 +62,8 @@ type StorageMinerAPI struct {
|
|||||||
SetConsiderOfflineStorageDealsConfigFunc dtypes.SetConsiderOfflineStorageDealsConfigFunc
|
SetConsiderOfflineStorageDealsConfigFunc dtypes.SetConsiderOfflineStorageDealsConfigFunc
|
||||||
ConsiderOfflineRetrievalDealsConfigFunc dtypes.ConsiderOfflineRetrievalDealsConfigFunc
|
ConsiderOfflineRetrievalDealsConfigFunc dtypes.ConsiderOfflineRetrievalDealsConfigFunc
|
||||||
SetConsiderOfflineRetrievalDealsConfigFunc dtypes.SetConsiderOfflineRetrievalDealsConfigFunc
|
SetConsiderOfflineRetrievalDealsConfigFunc dtypes.SetConsiderOfflineRetrievalDealsConfigFunc
|
||||||
SetSealingDelayFunc dtypes.SetSealingDelayFunc
|
SetSealingConfigFunc dtypes.SetSealingConfigFunc
|
||||||
GetSealingDelayFunc dtypes.GetSealingDelayFunc
|
GetSealingConfigFunc dtypes.GetSealingConfigFunc
|
||||||
GetExpectedSealDurationFunc dtypes.GetExpectedSealDurationFunc
|
GetExpectedSealDurationFunc dtypes.GetExpectedSealDurationFunc
|
||||||
SetExpectedSealDurationFunc dtypes.SetExpectedSealDurationFunc
|
SetExpectedSealDurationFunc dtypes.SetExpectedSealDurationFunc
|
||||||
}
|
}
|
||||||
@ -232,11 +232,22 @@ func (sm *StorageMinerAPI) SectorStartSealing(ctx context.Context, number abi.Se
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (sm *StorageMinerAPI) SectorSetSealDelay(ctx context.Context, delay time.Duration) error {
|
func (sm *StorageMinerAPI) SectorSetSealDelay(ctx context.Context, delay time.Duration) error {
|
||||||
return sm.SetSealingDelayFunc(delay)
|
cfg, err := sm.GetSealingConfigFunc()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("get config: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg.WaitDealsDelay = delay
|
||||||
|
|
||||||
|
return sm.SetSealingConfigFunc(cfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *StorageMinerAPI) SectorGetSealDelay(ctx context.Context) (time.Duration, error) {
|
func (sm *StorageMinerAPI) SectorGetSealDelay(ctx context.Context) (time.Duration, error) {
|
||||||
return sm.GetSealingDelayFunc()
|
cfg, err := sm.GetSealingConfigFunc()
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return cfg.WaitDealsDelay, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *StorageMinerAPI) SectorSetExpectedSealDuration(ctx context.Context, delay time.Duration) error {
|
func (sm *StorageMinerAPI) SectorSetExpectedSealDuration(ctx context.Context, delay time.Duration) error {
|
||||||
|
@ -9,6 +9,8 @@ import (
|
|||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
||||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
|
||||||
)
|
)
|
||||||
|
|
||||||
type MinerAddress address.Address
|
type MinerAddress address.Address
|
||||||
@ -56,10 +58,10 @@ type ConsiderOfflineRetrievalDealsConfigFunc func() (bool, error)
|
|||||||
type SetConsiderOfflineRetrievalDealsConfigFunc func(bool) error
|
type SetConsiderOfflineRetrievalDealsConfigFunc func(bool) error
|
||||||
|
|
||||||
// SetSealingDelay sets how long a sector waits for more deals before sealing begins.
|
// SetSealingDelay sets how long a sector waits for more deals before sealing begins.
|
||||||
type SetSealingDelayFunc func(time.Duration) error
|
type SetSealingConfigFunc func(sealiface.Config) error
|
||||||
|
|
||||||
// GetSealingDelay returns how long a sector waits for more deals before sealing begins.
|
// GetSealingDelay returns how long a sector waits for more deals before sealing begins.
|
||||||
type GetSealingDelayFunc func() (time.Duration, error)
|
type GetSealingConfigFunc func() (sealiface.Config, error)
|
||||||
|
|
||||||
// SetExpectedSealDurationFunc is a function which is used to set how long sealing is expected to take.
|
// SetExpectedSealDurationFunc is a function which is used to set how long sealing is expected to take.
|
||||||
// Deals that would need to start earlier than this duration will be rejected.
|
// Deals that would need to start earlier than this duration will be rejected.
|
||||||
|
@ -48,6 +48,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
||||||
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
||||||
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
||||||
|
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
|
||||||
|
|
||||||
lapi "github.com/filecoin-project/lotus/api"
|
lapi "github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
@ -141,8 +142,8 @@ func SectorIDCounter(ds dtypes.MetadataDS) sealing.SectorIDCounter {
|
|||||||
return &sidsc{sc}
|
return &sidsc{sc}
|
||||||
}
|
}
|
||||||
|
|
||||||
func StorageMiner(fc config.MinerFeeConfig) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, api lapi.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingDelayFunc) (*storage.Miner, error) {
|
func StorageMiner(fc config.MinerFeeConfig) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, api lapi.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc) (*storage.Miner, error) {
|
||||||
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, api lapi.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingDelayFunc) (*storage.Miner, error) {
|
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, api lapi.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc) (*storage.Miner, error) {
|
||||||
maddr, err := minerAddrFromDS(ds)
|
maddr, err := minerAddrFromDS(ds)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -593,19 +594,28 @@ func NewSetConsiderOfflineRetrievalDealsConfigFunc(r repo.LockedRepo) (dtypes.Se
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSetSealDelayFunc(r repo.LockedRepo) (dtypes.SetSealingDelayFunc, error) {
|
func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error) {
|
||||||
return func(delay time.Duration) (err error) {
|
return func(cfg sealiface.Config) (err error) {
|
||||||
err = mutateCfg(r, func(cfg *config.StorageMiner) {
|
err = mutateCfg(r, func(c *config.StorageMiner) {
|
||||||
cfg.SealingDelay = config.Duration(delay)
|
c.Sealing = config.SealingConfig{
|
||||||
|
MaxWaitDealsSectors: cfg.MaxWaitDealsSectors,
|
||||||
|
MaxSealingSectors: cfg.MaxSealingSectors,
|
||||||
|
WaitDealsDelay: config.Duration(cfg.WaitDealsDelay),
|
||||||
|
}
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewGetSealDelayFunc(r repo.LockedRepo) (dtypes.GetSealingDelayFunc, error) {
|
func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error) {
|
||||||
return func() (out time.Duration, err error) {
|
return func() (out sealiface.Config, err error) {
|
||||||
err = readCfg(r, func(cfg *config.StorageMiner) {
|
err = readCfg(r, func(cfg *config.StorageMiner) {
|
||||||
out = time.Duration(cfg.SealingDelay)
|
out = sealiface.Config{
|
||||||
|
MaxWaitDealsSectors: cfg.Sealing.MaxWaitDealsSectors,
|
||||||
|
MaxSealingSectors: cfg.Sealing.MaxSealingSectors,
|
||||||
|
MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals,
|
||||||
|
WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay),
|
||||||
|
}
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
}, nil
|
}, nil
|
||||||
|
@ -108,6 +108,27 @@ func (s SealingAPIAdapter) StateWaitMsg(ctx context.Context, mcid cid.Cid) (seal
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s SealingAPIAdapter) StateSearchMsg(ctx context.Context, c cid.Cid) (*sealing.MsgLookup, error) {
|
||||||
|
wmsg, err := s.delegate.StateSearchMsg(ctx, c)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if wmsg == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return &sealing.MsgLookup{
|
||||||
|
Receipt: sealing.MessageReceipt{
|
||||||
|
ExitCode: wmsg.Receipt.ExitCode,
|
||||||
|
Return: wmsg.Receipt.Return,
|
||||||
|
GasUsed: wmsg.Receipt.GasUsed,
|
||||||
|
},
|
||||||
|
TipSetTok: wmsg.TipSet.Bytes(),
|
||||||
|
Height: wmsg.Height,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s SealingAPIAdapter) StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tok sealing.TipSetToken) (cid.Cid, error) {
|
func (s SealingAPIAdapter) StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tok sealing.TipSetToken) (cid.Cid, error) {
|
||||||
tsk, err := types.TipSetKeyFromBytes(tok)
|
tsk, err := types.TipSetKeyFromBytes(tok)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -186,7 +207,7 @@ func (s SealingAPIAdapter) StateSectorPreCommitInfo(ctx context.Context, maddr a
|
|||||||
return nil, xerrors.Errorf("checking if sector is allocated: %w", err)
|
return nil, xerrors.Errorf("checking if sector is allocated: %w", err)
|
||||||
}
|
}
|
||||||
if set {
|
if set {
|
||||||
return nil, xerrors.Errorf("sectorNumber is allocated")
|
return nil, sealing.ErrSectorAllocated
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, nil
|
return nil, nil
|
||||||
|
@ -43,7 +43,7 @@ type Miner struct {
|
|||||||
maddr address.Address
|
maddr address.Address
|
||||||
worker address.Address
|
worker address.Address
|
||||||
|
|
||||||
getSealDelay dtypes.GetSealingDelayFunc
|
getSealConfig dtypes.GetSealingConfigFunc
|
||||||
sealing *sealing.Sealing
|
sealing *sealing.Sealing
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -60,6 +60,7 @@ type storageMinerApi interface {
|
|||||||
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*miner.DeadlineInfo, error)
|
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*miner.DeadlineInfo, error)
|
||||||
StateMinerPreCommitDepositForPower(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (types.BigInt, error)
|
StateMinerPreCommitDepositForPower(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (types.BigInt, error)
|
||||||
StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (types.BigInt, error)
|
StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (types.BigInt, error)
|
||||||
|
StateSearchMsg(context.Context, cid.Cid) (*api.MsgLookup, error)
|
||||||
StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64) (*api.MsgLookup, error) // TODO: removeme eventually
|
StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64) (*api.MsgLookup, error) // TODO: removeme eventually
|
||||||
StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error)
|
StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error)
|
||||||
StateGetReceipt(context.Context, cid.Cid, types.TipSetKey) (*types.MessageReceipt, error)
|
StateGetReceipt(context.Context, cid.Cid, types.TipSetKey) (*types.MessageReceipt, error)
|
||||||
@ -84,7 +85,7 @@ type storageMinerApi interface {
|
|||||||
WalletHas(context.Context, address.Address) (bool, error)
|
WalletHas(context.Context, address.Address) (bool, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingDelayFunc, feeCfg config.MinerFeeConfig) (*Miner, error) {
|
func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc, feeCfg config.MinerFeeConfig) (*Miner, error) {
|
||||||
m := &Miner{
|
m := &Miner{
|
||||||
api: api,
|
api: api,
|
||||||
feeCfg: feeCfg,
|
feeCfg: feeCfg,
|
||||||
@ -96,7 +97,7 @@ func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, d
|
|||||||
|
|
||||||
maddr: maddr,
|
maddr: maddr,
|
||||||
worker: worker,
|
worker: worker,
|
||||||
getSealDelay: gsd,
|
getSealConfig: gsd,
|
||||||
}
|
}
|
||||||
|
|
||||||
return m, nil
|
return m, nil
|
||||||
@ -120,7 +121,7 @@ func (m *Miner) Run(ctx context.Context) error {
|
|||||||
evts := events.NewEvents(ctx, m.api)
|
evts := events.NewEvents(ctx, m.api)
|
||||||
adaptedAPI := NewSealingAPIAdapter(m.api)
|
adaptedAPI := NewSealingAPIAdapter(m.api)
|
||||||
pcp := sealing.NewBasicPreCommitPolicy(adaptedAPI, miner.MaxSectorExpirationExtension-(miner.WPoStProvingPeriod*2), md.PeriodStart%miner.WPoStProvingPeriod)
|
pcp := sealing.NewBasicPreCommitPolicy(adaptedAPI, miner.MaxSectorExpirationExtension-(miner.WPoStProvingPeriod*2), md.PeriodStart%miner.WPoStProvingPeriod)
|
||||||
m.sealing = sealing.New(adaptedAPI, fc, NewEventsAdapter(evts), m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp, sealing.GetSealingDelayFunc(m.getSealDelay))
|
m.sealing = sealing.New(adaptedAPI, fc, NewEventsAdapter(evts), m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp, sealing.GetSealingConfigFunc(m.getSealConfig))
|
||||||
|
|
||||||
go m.sealing.Run(ctx) //nolint:errcheck // logged intside the function
|
go m.sealing.Run(ctx) //nolint:errcheck // logged intside the function
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user