sealing pipeline: Remove useless storage adapter code

This commit is contained in:
Łukasz Magiera 2022-08-09 13:30:34 +02:00
parent fe2a589890
commit 71dacb5af8
16 changed files with 239 additions and 467 deletions

View File

@ -106,6 +106,7 @@ const (
RelayIndexerMessagesKey RelayIndexerMessagesKey
// miner // miner
PreflightChecksKey
GetParamsKey GetParamsKey
HandleMigrateProviderFundsKey HandleMigrateProviderFundsKey
HandleDealsKey HandleDealsKey

View File

@ -116,9 +116,11 @@ func ConfigStorageMiner(c interface{}) Option {
Override(new(*slashfilter.SlashFilter), modules.NewSlashFilter), Override(new(*slashfilter.SlashFilter), modules.NewSlashFilter),
Override(new(*miner.Miner), modules.SetupBlockProducer), Override(new(*miner.Miner), modules.SetupBlockProducer),
Override(new(gen.WinningPoStProver), storage.NewWinningPoStProver), Override(new(gen.WinningPoStProver), storage.NewWinningPoStProver),
Override(new(*storage.Miner), modules.StorageMiner(cfg.Fees)), Override(PreflightChecksKey, modules.PreflightChecks),
Override(new(*sealing.Sealing), modules.SealingPipeline(cfg.Fees)),
Override(new(*wdpost.WindowPoStScheduler), modules.WindowPostScheduler(cfg.Fees, cfg.Proving)), Override(new(*wdpost.WindowPoStScheduler), modules.WindowPostScheduler(cfg.Fees, cfg.Proving)),
Override(new(sectorblocks.SectorBuilder), From(new(*storage.Miner))), Override(new(sectorblocks.SectorBuilder), From(new(*sealing.Sealing))),
), ),
If(cfg.Subsystems.EnableSectorStorage, If(cfg.Subsystems.EnableSectorStorage,

View File

@ -49,7 +49,6 @@ import (
"github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node/modules" "github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/ctladdr" "github.com/filecoin-project/lotus/storage/ctladdr"
"github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/paths"
sealing "github.com/filecoin-project/lotus/storage/pipeline" sealing "github.com/filecoin-project/lotus/storage/pipeline"
@ -88,7 +87,7 @@ type StorageMinerAPI struct {
DAGStoreWrapper *mktsdagstore.Wrapper `optional:"true"` DAGStoreWrapper *mktsdagstore.Wrapper `optional:"true"`
// Miner / storage // Miner / storage
Miner *storage.Miner `optional:"true"` Miner *sealing.Sealing `optional:"true"`
BlockMiner *miner.Miner `optional:"true"` BlockMiner *miner.Miner `optional:"true"`
StorageMgr *sealer.Manager `optional:"true"` StorageMgr *sealer.Manager `optional:"true"`
IStorageMgr sealer.SectorManager `optional:"true"` IStorageMgr sealer.SectorManager `optional:"true"`
@ -405,7 +404,11 @@ func (sm *StorageMinerAPI) SectorPreCommitPending(ctx context.Context) ([]abi.Se
} }
func (sm *StorageMinerAPI) SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber, snap bool) error { func (sm *StorageMinerAPI) SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber, snap bool) error {
return sm.Miner.MarkForUpgrade(ctx, id, snap) if !snap {
return fmt.Errorf("non-snap upgrades are not supported")
}
return sm.Miner.MarkForUpgrade(ctx, id)
} }
func (sm *StorageMinerAPI) SectorAbortUpgrade(ctx context.Context, number abi.SectorNumber) error { func (sm *StorageMinerAPI) SectorAbortUpgrade(ctx context.Context, number abi.SectorNumber) error {

View File

@ -50,6 +50,7 @@ import (
"github.com/filecoin-project/lotus/blockstore" "github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/gen" "github.com/filecoin-project/lotus/chain/gen"
"github.com/filecoin-project/lotus/chain/gen/slashfilter" "github.com/filecoin-project/lotus/chain/gen/slashfilter"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
@ -64,7 +65,6 @@ import (
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/ctladdr" "github.com/filecoin-project/lotus/storage/ctladdr"
"github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/paths"
sealing "github.com/filecoin-project/lotus/storage/pipeline" sealing "github.com/filecoin-project/lotus/storage/pipeline"
@ -202,7 +202,37 @@ func AddressSelector(addrConf *config.MinerAddressConfig) func() (*ctladdr.Addre
} }
} }
type StorageMinerParams struct { func PreflightChecks(mctx helpers.MetricsCtx, lc fx.Lifecycle, api v1api.FullNode, maddr dtypes.MinerAddress) error {
ctx := helpers.LifecycleCtx(mctx, lc)
lc.Append(fx.Hook{OnStart: func(context.Context) error {
mi, err := api.StateMinerInfo(ctx, address.Address(maddr), types.EmptyTSK)
if err != nil {
return xerrors.Errorf("failed to resolve miner info: %w", err)
}
workerKey, err := api.StateAccountKey(ctx, mi.Worker, types.EmptyTSK)
if err != nil {
return xerrors.Errorf("failed to resolve worker key: %w", err)
}
has, err := api.WalletHas(ctx, workerKey)
if err != nil {
return xerrors.Errorf("failed to check wallet for worker key: %w", err)
}
if !has {
return errors.New("key for worker not found in local wallet")
}
log.Infof("starting up miner %s, worker addr %s", maddr, workerKey)
return nil
}})
return nil
}
type SealingPipelineParams struct {
fx.In fx.In
Lifecycle fx.Lifecycle Lifecycle fx.Lifecycle
@ -219,8 +249,8 @@ type StorageMinerParams struct {
Maddr dtypes.MinerAddress Maddr dtypes.MinerAddress
} }
func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*storage.Miner, error) { func SealingPipeline(fc config.MinerFeeConfig) func(params SealingPipelineParams) (*sealing.Sealing, error) {
return func(params StorageMinerParams) (*storage.Miner, error) { return func(params SealingPipelineParams) (*sealing.Sealing, error) {
var ( var (
ds = params.MetadataDS ds = params.MetadataDS
mctx = params.MetricsCtx mctx = params.MetricsCtx
@ -238,24 +268,34 @@ func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*st
ctx := helpers.LifecycleCtx(mctx, lc) ctx := helpers.LifecycleCtx(mctx, lc)
sm, err := storage.NewMiner(api, maddr, ds, sealer, sc, verif, prover, gsd, fc, j, as) evts, err := events.NewEvents(ctx, api)
if err != nil { if err != nil {
return nil, err return nil, xerrors.Errorf("failed to subscribe to events: %w", err)
} }
md, err := api.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("getting miner info: %w", err)
}
provingBuffer := md.WPoStProvingPeriod * 2
pcp := sealing.NewBasicPreCommitPolicy(api, gsd, provingBuffer)
pipeline := sealing.New(ctx, api, fc, evts, maddr, ds, sealer, sc, verif, prover, &pcp, gsd, j, as)
lc.Append(fx.Hook{ lc.Append(fx.Hook{
OnStart: func(context.Context) error { OnStart: func(context.Context) error {
return sm.Run(ctx) go pipeline.Run(ctx)
return nil
}, },
OnStop: sm.Stop, OnStop: pipeline.Stop,
}) })
return sm, nil return pipeline, nil
} }
} }
func WindowPostScheduler(fc config.MinerFeeConfig, pc config.ProvingConfig) func(params StorageMinerParams) (*wdpost.WindowPoStScheduler, error) { func WindowPostScheduler(fc config.MinerFeeConfig, pc config.ProvingConfig) func(params SealingPipelineParams) (*wdpost.WindowPoStScheduler, error) {
return func(params StorageMinerParams) (*wdpost.WindowPoStScheduler, error) { return func(params SealingPipelineParams) (*wdpost.WindowPoStScheduler, error) {
var ( var (
mctx = params.MetricsCtx mctx = params.MetricsCtx
lc = params.Lifecycle lc = params.Lifecycle

View File

@ -1,266 +0,0 @@
package storage
import (
"context"
"errors"
"time"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/builtin/v8/miner"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/dline"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/v1api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin"
lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/gen"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/storage/ctladdr"
pipeline "github.com/filecoin-project/lotus/storage/pipeline"
"github.com/filecoin-project/lotus/storage/sealer"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
var log = logging.Logger("storageminer")
// Miner is the central miner entrypoint object inside Lotus. It is
// instantiated in the node builder, along with the WindowPoStScheduler.
//
// This object is the owner of the sealing pipeline. Most of the actual logic
// lives in the pipeline module (sealing.Sealing), and the Miner object
// exposes it to the rest of the system by proxying calls.
//
// Miner#Run starts the sealing FSM.
type Miner struct {
api fullNodeFilteredAPI
feeCfg config.MinerFeeConfig
sealer sealer.SectorManager
ds datastore.Batching
sc pipeline.SectorIDCounter
verif storiface.Verifier
prover storiface.Prover
addrSel *ctladdr.AddressSelector
maddr address.Address
getSealConfig dtypes.GetSealingConfigFunc
sealing *pipeline.Sealing
journal journal.Journal
}
// fullNodeFilteredAPI is the subset of the full node API the Miner needs from
// a Lotus full node.
type fullNodeFilteredAPI interface {
// Call a read only method on actors (no interaction with the chain required)
StateCall(context.Context, *types.Message, types.TipSetKey) (*api.InvocResult, error)
StateMinerSectors(context.Context, address.Address, *bitfield.BitField, types.TipSetKey) ([]*miner.SectorOnChainInfo, error)
StateSectorPreCommitInfo(context.Context, address.Address, abi.SectorNumber, types.TipSetKey) (*miner.SectorPreCommitOnChainInfo, error)
StateSectorGetInfo(context.Context, address.Address, abi.SectorNumber, types.TipSetKey) (*miner.SectorOnChainInfo, error)
StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok types.TipSetKey) (*lminer.SectorLocation, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
StateMinerAvailableBalance(ctx context.Context, maddr address.Address, tok types.TipSetKey) (types.BigInt, error)
StateMinerActiveSectors(context.Context, address.Address, types.TipSetKey) ([]*miner.SectorOnChainInfo, error)
StateMinerDeadlines(context.Context, address.Address, types.TipSetKey) ([]api.Deadline, error)
StateMinerPartitions(context.Context, address.Address, uint64, types.TipSetKey) ([]api.Partition, error)
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error)
StateMinerPreCommitDepositForPower(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (types.BigInt, error)
StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (types.BigInt, error)
StateMinerSectorAllocated(context.Context, address.Address, abi.SectorNumber, types.TipSetKey) (bool, error)
StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error)
StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error)
StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error)
StateMarketStorageDeal(context.Context, abi.DealID, types.TipSetKey) (*api.MarketDeal, error)
StateMinerFaults(context.Context, address.Address, types.TipSetKey) (bitfield.BitField, error)
StateMinerRecoveries(context.Context, address.Address, types.TipSetKey) (bitfield.BitField, error)
StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error)
StateNetworkVersion(context.Context, types.TipSetKey) (network.Version, error)
StateComputeDataCID(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tsk types.TipSetKey) (cid.Cid, error)
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
MpoolPushMessage(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error)
GasEstimateMessageGas(context.Context, *types.Message, *api.MessageSendSpec, types.TipSetKey) (*types.Message, error)
GasEstimateFeeCap(context.Context, *types.Message, int64, types.TipSetKey) (types.BigInt, error)
GasEstimateGasPremium(_ context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error)
ChainHead(context.Context) (*types.TipSet, error)
ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
StateGetRandomnessFromTickets(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error)
StateGetRandomnessFromBeacon(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error)
ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
ChainGetTipSetAfterHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error)
ChainGetPath(ctx context.Context, from, to types.TipSetKey) ([]*api.HeadChange, error)
ChainReadObj(context.Context, cid.Cid) ([]byte, error)
ChainHasObj(context.Context, cid.Cid) (bool, error)
ChainPutObj(context.Context, blocks.Block) error
ChainGetTipSet(ctx context.Context, key types.TipSetKey) (*types.TipSet, error)
WalletSign(context.Context, address.Address, []byte) (*crypto.Signature, error)
WalletBalance(context.Context, address.Address) (types.BigInt, error)
WalletHas(context.Context, address.Address) (bool, error)
}
// NewMiner creates a new Miner object.
func NewMiner(api fullNodeFilteredAPI,
maddr address.Address,
ds datastore.Batching,
sealer sealer.SectorManager,
sc pipeline.SectorIDCounter,
verif storiface.Verifier,
prover storiface.Prover,
gsd dtypes.GetSealingConfigFunc,
feeCfg config.MinerFeeConfig,
journal journal.Journal,
as *ctladdr.AddressSelector) (*Miner, error) {
m := &Miner{
api: api,
feeCfg: feeCfg,
sealer: sealer,
ds: ds,
sc: sc,
verif: verif,
prover: prover,
addrSel: as,
maddr: maddr,
getSealConfig: gsd,
journal: journal,
}
return m, nil
}
// Run starts the sealing FSM in the background, running preliminary checks first.
func (m *Miner) Run(ctx context.Context) error {
if err := m.runPreflightChecks(ctx); err != nil {
return xerrors.Errorf("miner preflight checks failed: %w", err)
}
md, err := m.api.StateMinerProvingDeadline(ctx, m.maddr, types.EmptyTSK)
if err != nil {
return xerrors.Errorf("getting miner info: %w", err)
}
// consumer of chain head changes.
evts, err := events.NewEvents(ctx, m.api)
if err != nil {
return xerrors.Errorf("failed to subscribe to events: %w", err)
}
// Instantiate a precommit policy.
cfg := pipeline.GetSealingConfigFunc(m.getSealConfig)
provingBuffer := md.WPoStProvingPeriod * 2
pcp := pipeline.NewBasicPreCommitPolicy(m.api, cfg, provingBuffer)
// Instantiate the sealing FSM.
m.sealing = pipeline.New(ctx, m.api, m.feeCfg, evts, m.maddr, m.ds, m.sealer, m.sc, m.verif, m.prover, &pcp, cfg, m.journal, m.addrSel)
// Run the sealing FSM.
go m.sealing.Run(ctx) //nolint:errcheck // logged intside the function
return nil
}
func (m *Miner) Stop(ctx context.Context) error {
return m.sealing.Stop(ctx)
}
// runPreflightChecks verifies that preconditions to run the miner are satisfied.
func (m *Miner) runPreflightChecks(ctx context.Context) error {
mi, err := m.api.StateMinerInfo(ctx, m.maddr, types.EmptyTSK)
if err != nil {
return xerrors.Errorf("failed to resolve miner info: %w", err)
}
workerKey, err := m.api.StateAccountKey(ctx, mi.Worker, types.EmptyTSK)
if err != nil {
return xerrors.Errorf("failed to resolve worker key: %w", err)
}
has, err := m.api.WalletHas(ctx, workerKey)
if err != nil {
return xerrors.Errorf("failed to check wallet for worker key: %w", err)
}
if !has {
return errors.New("key for worker not found in local wallet")
}
log.Infof("starting up miner %s, worker addr %s", m.maddr, workerKey)
return nil
}
type StorageWpp struct {
prover storiface.ProverPoSt
verifier storiface.Verifier
miner abi.ActorID
winnRpt abi.RegisteredPoStProof
}
func NewWinningPoStProver(api v1api.FullNode, prover storiface.ProverPoSt, verifier storiface.Verifier, miner dtypes.MinerID) (*StorageWpp, error) {
ma, err := address.NewIDAddress(uint64(miner))
if err != nil {
return nil, err
}
mi, err := api.StateMinerInfo(context.TODO(), ma, types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("getting sector size: %w", err)
}
if build.InsecurePoStValidation {
log.Warn("*****************************************************************************")
log.Warn(" Generating fake PoSt proof! You should only see this while running tests! ")
log.Warn("*****************************************************************************")
}
return &StorageWpp{prover, verifier, abi.ActorID(miner), mi.WindowPoStProofType}, nil
}
var _ gen.WinningPoStProver = (*StorageWpp)(nil)
func (wpp *StorageWpp) GenerateCandidates(ctx context.Context, randomness abi.PoStRandomness, eligibleSectorCount uint64) ([]uint64, error) {
start := build.Clock.Now()
cds, err := wpp.verifier.GenerateWinningPoStSectorChallenge(ctx, wpp.winnRpt, wpp.miner, randomness, eligibleSectorCount)
if err != nil {
return nil, xerrors.Errorf("failed to generate candidates: %w", err)
}
log.Infof("Generate candidates took %s (C: %+v)", time.Since(start), cds)
return cds, nil
}
func (wpp *StorageWpp) ComputeProof(ctx context.Context, ssi []builtin.ExtendedSectorInfo, rand abi.PoStRandomness, currEpoch abi.ChainEpoch, nv network.Version) ([]builtin.PoStProof, error) {
if build.InsecurePoStValidation {
return []builtin.PoStProof{{ProofBytes: []byte("valid proof")}}, nil
}
log.Infof("Computing WinningPoSt ;%+v; %v", ssi, rand)
start := build.Clock.Now()
proof, err := wpp.prover.GenerateWinningPoSt(ctx, wpp.miner, ssi, rand)
if err != nil {
return nil, err
}
log.Infof("GenerateWinningPoSt took %s", time.Since(start))
return proof, nil
}

View File

@ -1,165 +0,0 @@
package storage
import (
"context"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/api"
pipeline "github.com/filecoin-project/lotus/storage/pipeline"
"github.com/filecoin-project/lotus/storage/pipeline/sealiface"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
"github.com/filecoin-project/lotus/storage/sectorblocks"
)
// TODO: refactor this to be direct somehow
func (m *Miner) Address() address.Address {
return m.sealing.Address()
}
func (m *Miner) StartPackingSector(sectorNum abi.SectorNumber) error {
return m.sealing.StartPacking(sectorNum)
}
func (m *Miner) ListSectors() ([]pipeline.SectorInfo, error) {
return m.sealing.ListSectors()
}
func (m *Miner) PledgeSector(ctx context.Context) (storiface.SectorRef, error) {
return m.sealing.PledgeSector(ctx)
}
func (m *Miner) ForceSectorState(ctx context.Context, id abi.SectorNumber, state pipeline.SectorState) error {
return m.sealing.ForceSectorState(ctx, id, state)
}
func (m *Miner) RemoveSector(ctx context.Context, id abi.SectorNumber) error {
return m.sealing.Remove(ctx, id)
}
func (m *Miner) TerminateSector(ctx context.Context, id abi.SectorNumber) error {
return m.sealing.Terminate(ctx, id)
}
func (m *Miner) TerminateFlush(ctx context.Context) (*cid.Cid, error) {
return m.sealing.TerminateFlush(ctx)
}
func (m *Miner) TerminatePending(ctx context.Context) ([]abi.SectorID, error) {
return m.sealing.TerminatePending(ctx)
}
func (m *Miner) SectorPreCommitFlush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) {
return m.sealing.SectorPreCommitFlush(ctx)
}
func (m *Miner) SectorPreCommitPending(ctx context.Context) ([]abi.SectorID, error) {
return m.sealing.SectorPreCommitPending(ctx)
}
func (m *Miner) CommitFlush(ctx context.Context) ([]sealiface.CommitBatchRes, error) {
return m.sealing.CommitFlush(ctx)
}
func (m *Miner) CommitPending(ctx context.Context) ([]abi.SectorID, error) {
return m.sealing.CommitPending(ctx)
}
func (m *Miner) SectorMatchPendingPiecesToOpenSectors(ctx context.Context) error {
return m.sealing.MatchPendingPiecesToOpenSectors(ctx)
}
func (m *Miner) MarkForUpgrade(ctx context.Context, id abi.SectorNumber, snap bool) error {
if snap {
return m.sealing.MarkForSnapUpgrade(ctx, id)
}
return xerrors.Errorf("Old CC upgrade deprecated, use snap deals CC upgrade")
}
func (m *Miner) SectorAbortUpgrade(sectorNum abi.SectorNumber) error {
return m.sealing.AbortUpgrade(sectorNum)
}
func (m *Miner) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPieceSize, r storiface.Data, d api.PieceDealInfo) (api.SectorOffset, error) {
return m.sealing.SectorAddPieceToAny(ctx, size, r, d)
}
func (m *Miner) SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) {
if showOnChainInfo {
return api.SectorInfo{}, xerrors.Errorf("on-chain info not supported")
}
info, err := m.sealing.GetSectorInfo(sid)
if err != nil {
return api.SectorInfo{}, err
}
deals := make([]abi.DealID, len(info.Pieces))
pieces := make([]api.SectorPiece, len(info.Pieces))
for i, piece := range info.Pieces {
pieces[i].Piece = piece.Piece
if piece.DealInfo == nil {
continue
}
pdi := *piece.DealInfo // copy
pieces[i].DealInfo = &pdi
deals[i] = piece.DealInfo.DealID
}
log := make([]api.SectorLog, len(info.Log))
for i, l := range info.Log {
log[i] = api.SectorLog{
Kind: l.Kind,
Timestamp: l.Timestamp,
Trace: l.Trace,
Message: l.Message,
}
}
sInfo := api.SectorInfo{
SectorID: sid,
State: api.SectorState(info.State),
CommD: info.CommD,
CommR: info.CommR,
Proof: info.Proof,
Deals: deals,
Pieces: pieces,
Ticket: api.SealTicket{
Value: info.TicketValue,
Epoch: info.TicketEpoch,
},
Seed: api.SealSeed{
Value: info.SeedValue,
Epoch: info.SeedEpoch,
},
PreCommitMsg: info.PreCommitMessage,
CommitMsg: info.CommitMessage,
Retries: info.InvalidProofs,
ToUpgrade: false,
ReplicaUpdateMessage: info.ReplicaUpdateMessage,
LastErr: info.LastErr,
Log: log,
// on chain info
SealProof: info.SectorType,
Activation: 0,
Expiration: 0,
DealWeight: big.Zero(),
VerifiedDealWeight: big.Zero(),
InitialPledge: big.Zero(),
OnTime: 0,
Early: 0,
}
return sInfo, nil
}
var _ sectorblocks.SectorBuilder = &Miner{}

View File

@ -25,6 +25,7 @@ import (
"github.com/filecoin-project/lotus/chain/actors/policy" "github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/storage/pipeline/sealiface" "github.com/filecoin-project/lotus/storage/pipeline/sealiface"
"github.com/filecoin-project/lotus/storage/sealer/storiface" "github.com/filecoin-project/lotus/storage/sealer/storiface"
) )
@ -63,7 +64,7 @@ type CommitBatcher struct {
mctx context.Context mctx context.Context
addrSel AddressSelector addrSel AddressSelector
feeCfg config.MinerFeeConfig feeCfg config.MinerFeeConfig
getConfig GetSealingConfigFunc getConfig dtypes.GetSealingConfigFunc
prover storiface.Prover prover storiface.Prover
cutoffs map[abi.SectorNumber]time.Time cutoffs map[abi.SectorNumber]time.Time
@ -75,7 +76,7 @@ type CommitBatcher struct {
lk sync.Mutex lk sync.Mutex
} }
func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBatcherApi, addrSel AddressSelector, feeCfg config.MinerFeeConfig, getConfig GetSealingConfigFunc, prov storiface.Prover) *CommitBatcher { func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBatcherApi, addrSel AddressSelector, feeCfg config.MinerFeeConfig, getConfig dtypes.GetSealingConfigFunc, prov storiface.Prover) *CommitBatcher {
b := &CommitBatcher{ b := &CommitBatcher{
api: api, api: api,
maddr: maddr, maddr: maddr,

View File

@ -4,7 +4,6 @@ package sealing_test
import ( import (
"bytes" "bytes"
"context" "context"
"github.com/filecoin-project/lotus/storage/ctladdr"
"sort" "sort"
"sync" "sync"
"testing" "testing"
@ -25,6 +24,7 @@ import (
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/storage/ctladdr"
pipeline "github.com/filecoin-project/lotus/storage/pipeline" pipeline "github.com/filecoin-project/lotus/storage/pipeline"
"github.com/filecoin-project/lotus/storage/pipeline/mocks" "github.com/filecoin-project/lotus/storage/pipeline/mocks"
"github.com/filecoin-project/lotus/storage/pipeline/sealiface" "github.com/filecoin-project/lotus/storage/pipeline/sealiface"

View File

@ -23,6 +23,7 @@ import (
"github.com/filecoin-project/lotus/storage/sealer" "github.com/filecoin-project/lotus/storage/sealer"
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper" "github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
"github.com/filecoin-project/lotus/storage/sealer/storiface" "github.com/filecoin-project/lotus/storage/sealer/storiface"
"github.com/filecoin-project/lotus/storage/sectorblocks"
) )
func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) error { func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) error {
@ -376,7 +377,7 @@ func waitAddPieceResp(ctx context.Context, pp *pendingPiece) (*pieceAcceptResp,
} }
} }
func (m *Sealing) MatchPendingPiecesToOpenSectors(ctx context.Context) error { func (m *Sealing) SectorMatchPendingPiecesToOpenSectors(ctx context.Context) error {
sp, err := m.currentSealProof(ctx) sp, err := m.currentSealProof(ctx)
if err != nil { if err != nil {
return xerrors.Errorf("failed to get current seal proof: %w", err) return xerrors.Errorf("failed to get current seal proof: %w", err)
@ -708,14 +709,14 @@ func (m *Sealing) tryGetDealSector(ctx context.Context, sp abi.RegisteredSealPro
return nil return nil
} }
func (m *Sealing) StartPacking(sid abi.SectorNumber) error { func (m *Sealing) StartPackingSector(sid abi.SectorNumber) error {
m.startupWait.Wait() m.startupWait.Wait()
log.Infow("starting to seal deal sector", "sector", sid, "trigger", "user") log.Infow("starting to seal deal sector", "sector", sid, "trigger", "user")
return m.sectors.Send(uint64(sid), SectorStartPacking{}) return m.sectors.Send(uint64(sid), SectorStartPacking{})
} }
func (m *Sealing) AbortUpgrade(sid abi.SectorNumber) error { func (m *Sealing) SectorAbortUpgrade(sid abi.SectorNumber) error {
m.startupWait.Wait() m.startupWait.Wait()
m.inputLk.Lock() m.inputLk.Lock()
@ -727,6 +728,78 @@ func (m *Sealing) AbortUpgrade(sid abi.SectorNumber) error {
return m.sectors.Send(uint64(sid), SectorAbortUpgrade{xerrors.New("triggered by user")}) return m.sectors.Send(uint64(sid), SectorAbortUpgrade{xerrors.New("triggered by user")})
} }
func (m *Sealing) SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) {
if showOnChainInfo {
return api.SectorInfo{}, xerrors.Errorf("on-chain info not supported")
}
info, err := m.GetSectorInfo(sid)
if err != nil {
return api.SectorInfo{}, err
}
deals := make([]abi.DealID, len(info.Pieces))
pieces := make([]api.SectorPiece, len(info.Pieces))
for i, piece := range info.Pieces {
pieces[i].Piece = piece.Piece
if piece.DealInfo == nil {
continue
}
pdi := *piece.DealInfo // copy
pieces[i].DealInfo = &pdi
deals[i] = piece.DealInfo.DealID
}
log := make([]api.SectorLog, len(info.Log))
for i, l := range info.Log {
log[i] = api.SectorLog{
Kind: l.Kind,
Timestamp: l.Timestamp,
Trace: l.Trace,
Message: l.Message,
}
}
sInfo := api.SectorInfo{
SectorID: sid,
State: api.SectorState(info.State),
CommD: info.CommD,
CommR: info.CommR,
Proof: info.Proof,
Deals: deals,
Pieces: pieces,
Ticket: api.SealTicket{
Value: info.TicketValue,
Epoch: info.TicketEpoch,
},
Seed: api.SealSeed{
Value: info.SeedValue,
Epoch: info.SeedEpoch,
},
PreCommitMsg: info.PreCommitMessage,
CommitMsg: info.CommitMessage,
Retries: info.InvalidProofs,
ToUpgrade: false,
ReplicaUpdateMessage: info.ReplicaUpdateMessage,
LastErr: info.LastErr,
Log: log,
// on chain info
SealProof: info.SectorType,
Activation: 0,
Expiration: 0,
DealWeight: big.Zero(),
VerifiedDealWeight: big.Zero(),
InitialPledge: big.Zero(),
OnTime: 0,
Early: 0,
}
return sInfo, nil
}
func proposalCID(deal api.PieceDealInfo) cid.Cid { func proposalCID(deal api.PieceDealInfo) cid.Cid {
pc, err := deal.DealProposal.Cid() pc, err := deal.DealProposal.Cid()
if err != nil { if err != nil {
@ -736,3 +809,5 @@ func proposalCID(deal api.PieceDealInfo) cid.Cid {
return pc return pc
} }
var _ sectorblocks.SectorBuilder = &Sealing{}

View File

@ -22,6 +22,7 @@ import (
"github.com/filecoin-project/lotus/chain/actors/policy" "github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/storage/pipeline/sealiface" "github.com/filecoin-project/lotus/storage/pipeline/sealiface"
) )
@ -52,7 +53,7 @@ type PreCommitBatcher struct {
mctx context.Context mctx context.Context
addrSel AddressSelector addrSel AddressSelector
feeCfg config.MinerFeeConfig feeCfg config.MinerFeeConfig
getConfig GetSealingConfigFunc getConfig dtypes.GetSealingConfigFunc
cutoffs map[abi.SectorNumber]time.Time cutoffs map[abi.SectorNumber]time.Time
todo map[abi.SectorNumber]*preCommitEntry todo map[abi.SectorNumber]*preCommitEntry
@ -63,7 +64,7 @@ type PreCommitBatcher struct {
lk sync.Mutex lk sync.Mutex
} }
func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCommitBatcherApi, addrSel AddressSelector, feeCfg config.MinerFeeConfig, getConfig GetSealingConfigFunc) *PreCommitBatcher { func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCommitBatcherApi, addrSel AddressSelector, feeCfg config.MinerFeeConfig, getConfig dtypes.GetSealingConfigFunc) *PreCommitBatcher {
b := &PreCommitBatcher{ b := &PreCommitBatcher{
api: api, api: api,
maddr: maddr, maddr: maddr,

View File

@ -12,6 +12,7 @@ import (
"github.com/filecoin-project/lotus/chain/actors/builtin" "github.com/filecoin-project/lotus/chain/actors/builtin"
"github.com/filecoin-project/lotus/chain/actors/policy" "github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/modules/dtypes"
) )
type PreCommitPolicy interface { type PreCommitPolicy interface {
@ -39,7 +40,7 @@ type Chain interface {
// current epoch + the provided default duration. // current epoch + the provided default duration.
type BasicPreCommitPolicy struct { type BasicPreCommitPolicy struct {
api Chain api Chain
getSealingConfig GetSealingConfigFunc getSealingConfig dtypes.GetSealingConfigFunc
provingBuffer abi.ChainEpoch provingBuffer abi.ChainEpoch
} }
@ -48,7 +49,7 @@ type BasicPreCommitPolicy struct {
// //
// The provided duration is used as the default sector expiry when the sector // The provided duration is used as the default sector expiry when the sector
// contains no deals. The proving boundary is used to adjust/align the sector's expiration. // contains no deals. The proving boundary is used to adjust/align the sector's expiration.
func NewBasicPreCommitPolicy(api Chain, cfgGetter GetSealingConfigFunc, provingBuffer abi.ChainEpoch) BasicPreCommitPolicy { func NewBasicPreCommitPolicy(api Chain, cfgGetter dtypes.GetSealingConfigFunc, provingBuffer abi.ChainEpoch) BasicPreCommitPolicy {
return BasicPreCommitPolicy{ return BasicPreCommitPolicy{
api: api, api: api,
getSealingConfig: cfgGetter, getSealingConfig: cfgGetter,

View File

@ -26,6 +26,7 @@ import (
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/storage/ctladdr" "github.com/filecoin-project/lotus/storage/ctladdr"
"github.com/filecoin-project/lotus/storage/pipeline/sealiface" "github.com/filecoin-project/lotus/storage/pipeline/sealiface"
"github.com/filecoin-project/lotus/storage/sealer" "github.com/filecoin-project/lotus/storage/sealer"
@ -119,7 +120,7 @@ type Sealing struct {
precommiter *PreCommitBatcher precommiter *PreCommitBatcher
commiter *CommitBatcher commiter *CommitBatcher
getConfig GetSealingConfigFunc getConfig dtypes.GetSealingConfigFunc
} }
type openSector struct { type openSector struct {
@ -160,7 +161,7 @@ type pendingPiece struct {
accepted func(abi.SectorNumber, abi.UnpaddedPieceSize, error) accepted func(abi.SectorNumber, abi.UnpaddedPieceSize, error)
} }
func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sealer.SectorManager, sc SectorIDCounter, verif storiface.Verifier, prov storiface.Prover, pcp PreCommitPolicy, gc GetSealingConfigFunc, journal journal.Journal, addrSel AddressSelector) *Sealing { func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sealer.SectorManager, sc SectorIDCounter, verif storiface.Verifier, prov storiface.Prover, pcp PreCommitPolicy, gc dtypes.GetSealingConfigFunc, journal journal.Journal, addrSel AddressSelector) *Sealing {
s := &Sealing{ s := &Sealing{
Api: api, Api: api,
DealInfo: &CurrentDealInfoManager{api}, DealInfo: &CurrentDealInfoManager{api},
@ -218,6 +219,7 @@ func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events
} }
func (m *Sealing) Run(ctx context.Context) error { func (m *Sealing) Run(ctx context.Context) error {
if err := m.restartSectors(ctx); err != nil { if err := m.restartSectors(ctx); err != nil {
log.Errorf("%+v", err) log.Errorf("%+v", err)
return xerrors.Errorf("failed load sector states: %w", err) return xerrors.Errorf("failed load sector states: %w", err)
@ -237,13 +239,13 @@ func (m *Sealing) Stop(ctx context.Context) error {
return nil return nil
} }
func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error { func (m *Sealing) RemoveSector(ctx context.Context, sid abi.SectorNumber) error {
m.startupWait.Wait() m.startupWait.Wait()
return m.sectors.Send(uint64(sid), SectorRemove{}) return m.sectors.Send(uint64(sid), SectorRemove{})
} }
func (m *Sealing) Terminate(ctx context.Context, sid abi.SectorNumber) error { func (m *Sealing) TerminateSector(ctx context.Context, sid abi.SectorNumber) error {
m.startupWait.Wait() m.startupWait.Wait()
return m.sectors.Send(uint64(sid), SectorTerminate{}) return m.sectors.Send(uint64(sid), SectorTerminate{})

View File

@ -22,6 +22,7 @@ import (
lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner" lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes"
) )
type TerminateBatcherApi interface { type TerminateBatcherApi interface {
@ -44,7 +45,7 @@ type TerminateBatcher struct {
mctx context.Context mctx context.Context
addrSel AddressSelector addrSel AddressSelector
feeCfg config.MinerFeeConfig feeCfg config.MinerFeeConfig
getConfig GetSealingConfigFunc getConfig dtypes.GetSealingConfigFunc
todo map[lminer.SectorLocation]*bitfield.BitField // MinerSectorLocation -> BitField todo map[lminer.SectorLocation]*bitfield.BitField // MinerSectorLocation -> BitField
@ -55,7 +56,7 @@ type TerminateBatcher struct {
lk sync.Mutex lk sync.Mutex
} }
func NewTerminationBatcher(mctx context.Context, maddr address.Address, api TerminateBatcherApi, addrSel AddressSelector, feeCfg config.MinerFeeConfig, getConfig GetSealingConfigFunc) *TerminateBatcher { func NewTerminationBatcher(mctx context.Context, maddr address.Address, api TerminateBatcherApi, addrSel AddressSelector, feeCfg config.MinerFeeConfig, getConfig dtypes.GetSealingConfigFunc) *TerminateBatcher {
b := &TerminateBatcher{ b := &TerminateBatcher{
api: api, api: api,
maddr: maddr, maddr: maddr,

View File

@ -11,7 +11,6 @@ import (
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/storage/pipeline/sealiface"
"github.com/filecoin-project/lotus/storage/sealer" "github.com/filecoin-project/lotus/storage/sealer"
"github.com/filecoin-project/lotus/storage/sealer/storiface" "github.com/filecoin-project/lotus/storage/sealer/storiface"
) )
@ -195,8 +194,6 @@ type SectorIDCounter interface {
Next() (abi.SectorNumber, error) Next() (abi.SectorNumber, error)
} }
type GetSealingConfigFunc func() (sealiface.Config, error)
// SealingStateEvt is a journal event that records a sector state transition. // SealingStateEvt is a journal event that records a sector state transition.
type SealingStateEvt struct { type SealingStateEvt struct {
SectorNumber abi.SectorNumber SectorNumber abi.SectorNumber

View File

@ -11,7 +11,7 @@ import (
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
) )
func (m *Sealing) MarkForSnapUpgrade(ctx context.Context, id abi.SectorNumber) error { func (m *Sealing) MarkForUpgrade(ctx context.Context, id abi.SectorNumber) error {
si, err := m.GetSectorInfo(id) si, err := m.GetSectorInfo(id)
if err != nil { if err != nil {
return xerrors.Errorf("getting sector info: %w", err) return xerrors.Errorf("getting sector info: %w", err)

79
storage/winning_prover.go Normal file
View File

@ -0,0 +1,79 @@
package storage
import (
"context"
"time"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/lotus/api/v1api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin"
"github.com/filecoin-project/lotus/chain/gen"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
var log = logging.Logger("storageminer")
type StorageWpp struct {
prover storiface.ProverPoSt
verifier storiface.Verifier
miner abi.ActorID
winnRpt abi.RegisteredPoStProof
}
func NewWinningPoStProver(api v1api.FullNode, prover storiface.ProverPoSt, verifier storiface.Verifier, miner dtypes.MinerID) (*StorageWpp, error) {
ma, err := address.NewIDAddress(uint64(miner))
if err != nil {
return nil, err
}
mi, err := api.StateMinerInfo(context.TODO(), ma, types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("getting sector size: %w", err)
}
if build.InsecurePoStValidation {
log.Warn("*****************************************************************************")
log.Warn(" Generating fake PoSt proof! You should only see this while running tests! ")
log.Warn("*****************************************************************************")
}
return &StorageWpp{prover, verifier, abi.ActorID(miner), mi.WindowPoStProofType}, nil
}
var _ gen.WinningPoStProver = (*StorageWpp)(nil)
func (wpp *StorageWpp) GenerateCandidates(ctx context.Context, randomness abi.PoStRandomness, eligibleSectorCount uint64) ([]uint64, error) {
start := build.Clock.Now()
cds, err := wpp.verifier.GenerateWinningPoStSectorChallenge(ctx, wpp.winnRpt, wpp.miner, randomness, eligibleSectorCount)
if err != nil {
return nil, xerrors.Errorf("failed to generate candidates: %w", err)
}
log.Infof("Generate candidates took %s (C: %+v)", time.Since(start), cds)
return cds, nil
}
func (wpp *StorageWpp) ComputeProof(ctx context.Context, ssi []builtin.ExtendedSectorInfo, rand abi.PoStRandomness, currEpoch abi.ChainEpoch, nv network.Version) ([]builtin.PoStProof, error) {
if build.InsecurePoStValidation {
return []builtin.PoStProof{{ProofBytes: []byte("valid proof")}}, nil
}
log.Infof("Computing WinningPoSt ;%+v; %v", ssi, rand)
start := build.Clock.Now()
proof, err := wpp.prover.GenerateWinningPoSt(ctx, wpp.miner, ssi, rand)
if err != nil {
return nil, err
}
log.Infof("GenerateWinningPoSt took %s", time.Since(start))
return proof, nil
}