sealing: Use bitfields to manage sector numbers

This commit is contained in:
Łukasz Magiera 2022-08-17 11:53:44 -04:00
parent 5274ddce83
commit ca72590e49
9 changed files with 129 additions and 31 deletions

View File

@ -387,7 +387,7 @@ func migratePreSealMeta(ctx context.Context, api v1api.FullNode, metadata string
buf := make([]byte, binary.MaxVarintLen64)
size := binary.PutUvarint(buf, uint64(maxSectorID))
return mds.Put(ctx, datastore.NewKey(modules.StorageCounterDSPrefix), buf[:size])
return mds.Put(ctx, datastore.NewKey(pipeline.StorageCounterDSPrefix), buf[:size])
}
func findMarketDealID(ctx context.Context, api v1api.FullNode, deal market8.DealProposal) (abi.DealID, error) {

View File

@ -0,0 +1 @@
package main

View File

@ -992,5 +992,5 @@ func importPreSealMeta(ctx context.Context, meta genesis.Miner, mds dtypes.Metad
buf := make([]byte, binary.MaxVarintLen64)
size := binary.PutUvarint(buf, uint64(maxSectorID))
return mds.Put(ctx, datastore.NewKey(modules.StorageCounterDSPrefix), buf[:size])
return mds.Put(ctx, datastore.NewKey(pipeline.StorageCounterDSPrefix), buf[:size])
}

View File

@ -108,7 +108,6 @@ func ConfigStorageMiner(c interface{}) Option {
Override(new(storiface.ProverPoSt), From(new(sectorstorage.SectorManager))),
// Sealing (todo should be under EnableSealing, but storagefsm is currently bundled with storage.Miner)
Override(new(sealing.SectorIDCounter), modules.SectorIDCounter),
Override(GetParamsKey, modules.GetParams),
Override(new(dtypes.SetSealingConfigFunc), modules.NewSetSealConfigFunc),

View File

@ -42,7 +42,6 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/go-storedcounter"
provider "github.com/filecoin-project/index-provider"
"github.com/filecoin-project/lotus/api"
@ -76,8 +75,7 @@ import (
)
var (
StorageCounterDSPrefix = "/storage/nextid"
StagingAreaDirName = "deal-staging"
StagingAreaDirName = "deal-staging"
)
type UuidWrapper struct {
@ -155,20 +153,6 @@ func SealProofType(maddr dtypes.MinerAddress, fnapi v1api.FullNode) (abi.Registe
return miner.PreferredSealProofTypeFromWindowPoStType(networkVersion, mi.WindowPoStProofType)
}
type sidsc struct {
sc *storedcounter.StoredCounter
}
func (s *sidsc) Next() (abi.SectorNumber, error) {
i, err := s.sc.Next()
return abi.SectorNumber(i), err
}
func SectorIDCounter(ds dtypes.MetadataDS) sealing.SectorIDCounter {
sc := storedcounter.New(ds, datastore.NewKey(StorageCounterDSPrefix))
return &sidsc{sc}
}
func AddressSelector(addrConf *config.MinerAddressConfig) func() (*ctladdr.AddressSelector, error) {
return func() (*ctladdr.AddressSelector, error) {
as := &ctladdr.AddressSelector{}
@ -257,7 +241,6 @@ type SealingPipelineParams struct {
API v1api.FullNode
MetadataDS dtypes.MetadataDS
Sealer sealer.SectorManager
SectorIDCounter sealing.SectorIDCounter
Verifier storiface.Verifier
Prover storiface.Prover
GetSealingConfigFn dtypes.GetSealingConfigFunc
@ -274,7 +257,6 @@ func SealingPipeline(fc config.MinerFeeConfig) func(params SealingPipelineParams
lc = params.Lifecycle
api = params.API
sealer = params.Sealer
sc = params.SectorIDCounter
verif = params.Verifier
prover = params.Prover
gsd = params.GetSealingConfigFn
@ -297,7 +279,7 @@ func SealingPipeline(fc config.MinerFeeConfig) func(params SealingPipelineParams
provingBuffer := md.WPoStProvingPeriod * 2
pcp := sealing.NewBasicPreCommitPolicy(api, gsd, provingBuffer)
pipeline := sealing.New(ctx, api, fc, evts, maddr, ds, sealer, sc, verif, prover, &pcp, gsd, j, as)
pipeline := sealing.New(ctx, api, fc, evts, maddr, ds, sealer, verif, prover, &pcp, gsd, j, as)
lc.Append(fx.Hook{
OnStart: func(context.Context) error {

View File

@ -626,7 +626,7 @@ func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealP
// call with m.inputLk
func (m *Sealing) createSector(ctx context.Context, cfg sealiface.Config, sp abi.RegisteredSealProof) (abi.SectorNumber, error) {
sid, err := m.sc.Next()
sid, err := m.NextSectorNumber(ctx)
if err != nil {
return 0, xerrors.Errorf("getting sector number: %w", err)
}

View File

@ -0,0 +1 @@
package sealing

View File

@ -1,7 +1,9 @@
package sealing
import (
"bytes"
"context"
"encoding/binary"
"sync"
"time"
@ -12,6 +14,8 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield"
rlepluslazy "github.com/filecoin-project/go-bitfield/rle"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/builtin/v8/miner"
@ -19,6 +23,7 @@ import (
"github.com/filecoin-project/go-state-types/dline"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/go-storedcounter"
"github.com/filecoin-project/lotus/api"
lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
@ -35,6 +40,9 @@ import (
const SectorStorePrefix = "/sectors"
var StorageCounterDSPrefix = "/storage/nextid"
var SectorBitfieldsDSPrefix = "/storage/sectors/"
var ErrTooManySectorsSealing = xerrors.New("too many sectors sealing")
var log = logging.Logger("sectors")
@ -87,6 +95,8 @@ type Sealing struct {
Api SealingAPI
DealInfo *CurrentDealInfoManager
ds datastore.Batching
feeCfg config.MinerFeeConfig
events Events
@ -96,7 +106,6 @@ type Sealing struct {
sealer sealer.SectorManager
sectors *statemachine.StateGroup
sc SectorIDCounter
verif storiface.Verifier
pcp PreCommitPolicy
@ -120,6 +129,9 @@ type Sealing struct {
precommiter *PreCommitBatcher
commiter *CommitBatcher
sclk sync.Mutex
legacySc *storedcounter.StoredCounter
getConfig dtypes.GetSealingConfigFunc
}
@ -161,17 +173,18 @@ type pendingPiece struct {
accepted func(abi.SectorNumber, abi.UnpaddedPieceSize, error)
}
func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sealer.SectorManager, sc SectorIDCounter, verif storiface.Verifier, prov storiface.Prover, pcp PreCommitPolicy, gc dtypes.GetSealingConfigFunc, journal journal.Journal, addrSel AddressSelector) *Sealing {
func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sealer.SectorManager, verif storiface.Verifier, prov storiface.Prover, pcp PreCommitPolicy, gc dtypes.GetSealingConfigFunc, journal journal.Journal, addrSel AddressSelector) *Sealing {
s := &Sealing{
Api: api,
DealInfo: &CurrentDealInfoManager{api},
ds: ds,
feeCfg: fc,
events: events,
maddr: maddr,
sealer: sealer,
sc: sc,
verif: verif,
pcp: pcp,
@ -193,6 +206,8 @@ func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events
getConfig: gc,
legacySc: storedcounter.New(ds, datastore.NewKey(StorageCounterDSPrefix)),
stats: SectorStats{
bySector: map[abi.SectorID]SectorState{},
byState: map[SectorState]int64{},
@ -314,3 +329,107 @@ func getDealPerSectorLimit(size abi.SectorSize) (int, error) {
}
return 512, nil
}
func (m *Sealing) loadBitField(ctx context.Context, name string) (*bitfield.BitField, error) {
raw, err := m.ds.Get(ctx, datastore.NewKey(SectorBitfieldsDSPrefix+name))
if err == datastore.ErrNotFound {
return nil, nil
}
if err != nil {
return nil, err
}
var bf bitfield.BitField
if err := bf.UnmarshalCBOR(bytes.NewBuffer(raw)); err != nil {
return nil, err
}
return &bf, nil
}
func (m *Sealing) saveBitField(ctx context.Context, name string, bf *bitfield.BitField) error {
var bb bytes.Buffer
err := bf.MarshalCBOR(&bb)
if err != nil {
return err
}
return m.ds.Put(ctx, datastore.NewKey(SectorBitfieldsDSPrefix+name), bb.Bytes())
}
func (m *Sealing) NextSectorNumber(ctx context.Context) (abi.SectorNumber, error) {
m.sclk.Lock()
defer m.sclk.Unlock()
reserved, err := m.loadBitField(ctx, "reserved")
if err != nil {
return 0, err
}
allocated, err := m.loadBitField(ctx, "allocated")
if err != nil {
return 0, err
}
if allocated == nil {
i, err := m.legacySc.Next()
if err != nil {
return 0, err
}
rl := &rlepluslazy.RunSliceIterator{Runs: []rlepluslazy.Run{
{
Val: true,
Len: i,
},
}}
bf, err := bitfield.NewFromIter(rl)
if err != nil {
return 0, err
}
allocated = &bf
}
if reserved == nil {
reserved = allocated
}
// todo union with miner allocated nums
inuse, err := bitfield.MergeBitFields(*reserved, *allocated)
if err != nil {
return 0, err
}
iri, err := inuse.RunIterator()
if err != nil {
return 0, err
}
var firstFree abi.SectorNumber
for iri.HasNext() {
r, err := iri.NextRun()
if err != nil {
return 0, err
}
if !r.Val {
break
}
firstFree += abi.SectorNumber(r.Len)
}
allocated.Set(uint64(firstFree))
if err := m.saveBitField(ctx, "allocated", allocated); err != nil {
return 0, err
}
// save legacy counter so that in case of a miner downgrade things keep working
{
buf := make([]byte, binary.MaxVarintLen64)
size := binary.PutUvarint(buf, uint64(firstFree))
if err := m.ds.Put(ctx, datastore.NewKey(StorageCounterDSPrefix), buf[:size]); err != nil {
return 0, err
}
}
return firstFree, nil
}

View File

@ -190,10 +190,6 @@ func (t *SectorInfo) keepUnsealedRanges(pieces []Piece, invert, alwaysKeep bool)
return out
}
type SectorIDCounter interface {
Next() (abi.SectorNumber, error)
}
// SealingStateEvt is a journal event that records a sector state transition.
type SealingStateEvt struct {
SectorNumber abi.SectorNumber