diff --git a/node/builder.go b/node/builder.go index a8b082008..a36c8e66d 100644 --- a/node/builder.go +++ b/node/builder.go @@ -106,6 +106,7 @@ const ( RelayIndexerMessagesKey // miner + PreflightChecksKey GetParamsKey HandleMigrateProviderFundsKey HandleDealsKey diff --git a/node/builder_miner.go b/node/builder_miner.go index b72b23761..54ab22683 100644 --- a/node/builder_miner.go +++ b/node/builder_miner.go @@ -116,9 +116,11 @@ func ConfigStorageMiner(c interface{}) Option { Override(new(*slashfilter.SlashFilter), modules.NewSlashFilter), Override(new(*miner.Miner), modules.SetupBlockProducer), Override(new(gen.WinningPoStProver), storage.NewWinningPoStProver), - Override(new(*storage.Miner), modules.StorageMiner(cfg.Fees)), + Override(PreflightChecksKey, modules.PreflightChecks), + Override(new(*sealing.Sealing), modules.SealingPipeline(cfg.Fees)), + Override(new(*wdpost.WindowPoStScheduler), modules.WindowPostScheduler(cfg.Fees, cfg.Proving)), - Override(new(sectorblocks.SectorBuilder), From(new(*storage.Miner))), + Override(new(sectorblocks.SectorBuilder), From(new(*sealing.Sealing))), ), If(cfg.Subsystems.EnableSectorStorage, diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 74f4429ef..4d1659dbd 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -49,7 +49,6 @@ import ( "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/node/modules" "github.com/filecoin-project/lotus/node/modules/dtypes" - "github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage/ctladdr" "github.com/filecoin-project/lotus/storage/paths" sealing "github.com/filecoin-project/lotus/storage/pipeline" @@ -88,7 +87,7 @@ type StorageMinerAPI struct { DAGStoreWrapper *mktsdagstore.Wrapper `optional:"true"` // Miner / storage - Miner *storage.Miner `optional:"true"` + Miner *sealing.Sealing `optional:"true"` BlockMiner *miner.Miner `optional:"true"` StorageMgr *sealer.Manager `optional:"true"` IStorageMgr sealer.SectorManager `optional:"true"` @@ -405,7 +404,11 @@ func (sm *StorageMinerAPI) SectorPreCommitPending(ctx context.Context) ([]abi.Se } func (sm *StorageMinerAPI) SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber, snap bool) error { - return sm.Miner.MarkForUpgrade(ctx, id, snap) + if !snap { + return fmt.Errorf("non-snap upgrades are not supported") + } + + return sm.Miner.MarkForUpgrade(ctx, id) } func (sm *StorageMinerAPI) SectorAbortUpgrade(ctx context.Context, number abi.SectorNumber) error { diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index fc6c6d888..c28342504 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -50,6 +50,7 @@ import ( "github.com/filecoin-project/lotus/blockstore" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/gen" "github.com/filecoin-project/lotus/chain/gen/slashfilter" "github.com/filecoin-project/lotus/chain/types" @@ -64,7 +65,6 @@ import ( "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/repo" - "github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage/ctladdr" "github.com/filecoin-project/lotus/storage/paths" sealing "github.com/filecoin-project/lotus/storage/pipeline" @@ -202,7 +202,37 @@ func AddressSelector(addrConf *config.MinerAddressConfig) func() (*ctladdr.Addre } } -type StorageMinerParams struct { +func PreflightChecks(mctx helpers.MetricsCtx, lc fx.Lifecycle, api v1api.FullNode, maddr dtypes.MinerAddress) error { + ctx := helpers.LifecycleCtx(mctx, lc) + + lc.Append(fx.Hook{OnStart: func(context.Context) error { + mi, err := api.StateMinerInfo(ctx, address.Address(maddr), types.EmptyTSK) + if err != nil { + return xerrors.Errorf("failed to resolve miner info: %w", err) + } + + workerKey, err := api.StateAccountKey(ctx, mi.Worker, types.EmptyTSK) + if err != nil { + return xerrors.Errorf("failed to resolve worker key: %w", err) + } + + has, err := api.WalletHas(ctx, workerKey) + if err != nil { + return xerrors.Errorf("failed to check wallet for worker key: %w", err) + } + + if !has { + return errors.New("key for worker not found in local wallet") + } + + log.Infof("starting up miner %s, worker addr %s", maddr, workerKey) + return nil + }}) + + return nil +} + +type SealingPipelineParams struct { fx.In Lifecycle fx.Lifecycle @@ -219,8 +249,8 @@ type StorageMinerParams struct { Maddr dtypes.MinerAddress } -func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*storage.Miner, error) { - return func(params StorageMinerParams) (*storage.Miner, error) { +func SealingPipeline(fc config.MinerFeeConfig) func(params SealingPipelineParams) (*sealing.Sealing, error) { + return func(params SealingPipelineParams) (*sealing.Sealing, error) { var ( ds = params.MetadataDS mctx = params.MetricsCtx @@ -238,24 +268,34 @@ func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*st ctx := helpers.LifecycleCtx(mctx, lc) - sm, err := storage.NewMiner(api, maddr, ds, sealer, sc, verif, prover, gsd, fc, j, as) + evts, err := events.NewEvents(ctx, api) if err != nil { - return nil, err + return nil, xerrors.Errorf("failed to subscribe to events: %w", err) } + md, err := api.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK) + if err != nil { + return nil, xerrors.Errorf("getting miner info: %w", err) + } + provingBuffer := md.WPoStProvingPeriod * 2 + pcp := sealing.NewBasicPreCommitPolicy(api, gsd, provingBuffer) + + pipeline := sealing.New(ctx, api, fc, evts, maddr, ds, sealer, sc, verif, prover, &pcp, gsd, j, as) + lc.Append(fx.Hook{ OnStart: func(context.Context) error { - return sm.Run(ctx) + go pipeline.Run(ctx) + return nil }, - OnStop: sm.Stop, + OnStop: pipeline.Stop, }) - return sm, nil + return pipeline, nil } } -func WindowPostScheduler(fc config.MinerFeeConfig, pc config.ProvingConfig) func(params StorageMinerParams) (*wdpost.WindowPoStScheduler, error) { - return func(params StorageMinerParams) (*wdpost.WindowPoStScheduler, error) { +func WindowPostScheduler(fc config.MinerFeeConfig, pc config.ProvingConfig) func(params SealingPipelineParams) (*wdpost.WindowPoStScheduler, error) { + return func(params SealingPipelineParams) (*wdpost.WindowPoStScheduler, error) { var ( mctx = params.MetricsCtx lc = params.Lifecycle diff --git a/storage/miner.go b/storage/miner.go deleted file mode 100644 index fadd94962..000000000 --- a/storage/miner.go +++ /dev/null @@ -1,266 +0,0 @@ -package storage - -import ( - "context" - "errors" - "time" - - blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" - logging "github.com/ipfs/go-log/v2" - "golang.org/x/xerrors" - - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-bitfield" - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/go-state-types/builtin/v8/miner" - "github.com/filecoin-project/go-state-types/crypto" - "github.com/filecoin-project/go-state-types/dline" - "github.com/filecoin-project/go-state-types/network" - - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/api/v1api" - "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/actors/builtin" - lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner" - "github.com/filecoin-project/lotus/chain/events" - "github.com/filecoin-project/lotus/chain/gen" - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/journal" - "github.com/filecoin-project/lotus/node/config" - "github.com/filecoin-project/lotus/node/modules/dtypes" - "github.com/filecoin-project/lotus/storage/ctladdr" - pipeline "github.com/filecoin-project/lotus/storage/pipeline" - "github.com/filecoin-project/lotus/storage/sealer" - "github.com/filecoin-project/lotus/storage/sealer/storiface" -) - -var log = logging.Logger("storageminer") - -// Miner is the central miner entrypoint object inside Lotus. It is -// instantiated in the node builder, along with the WindowPoStScheduler. -// -// This object is the owner of the sealing pipeline. Most of the actual logic -// lives in the pipeline module (sealing.Sealing), and the Miner object -// exposes it to the rest of the system by proxying calls. -// -// Miner#Run starts the sealing FSM. -type Miner struct { - api fullNodeFilteredAPI - feeCfg config.MinerFeeConfig - sealer sealer.SectorManager - ds datastore.Batching - sc pipeline.SectorIDCounter - verif storiface.Verifier - prover storiface.Prover - addrSel *ctladdr.AddressSelector - - maddr address.Address - - getSealConfig dtypes.GetSealingConfigFunc - sealing *pipeline.Sealing - - journal journal.Journal -} - -// fullNodeFilteredAPI is the subset of the full node API the Miner needs from -// a Lotus full node. -type fullNodeFilteredAPI interface { - // Call a read only method on actors (no interaction with the chain required) - StateCall(context.Context, *types.Message, types.TipSetKey) (*api.InvocResult, error) - StateMinerSectors(context.Context, address.Address, *bitfield.BitField, types.TipSetKey) ([]*miner.SectorOnChainInfo, error) - StateSectorPreCommitInfo(context.Context, address.Address, abi.SectorNumber, types.TipSetKey) (*miner.SectorPreCommitOnChainInfo, error) - StateSectorGetInfo(context.Context, address.Address, abi.SectorNumber, types.TipSetKey) (*miner.SectorOnChainInfo, error) - StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok types.TipSetKey) (*lminer.SectorLocation, error) - StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error) - StateMinerAvailableBalance(ctx context.Context, maddr address.Address, tok types.TipSetKey) (types.BigInt, error) - StateMinerActiveSectors(context.Context, address.Address, types.TipSetKey) ([]*miner.SectorOnChainInfo, error) - StateMinerDeadlines(context.Context, address.Address, types.TipSetKey) ([]api.Deadline, error) - StateMinerPartitions(context.Context, address.Address, uint64, types.TipSetKey) ([]api.Partition, error) - StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error) - StateMinerPreCommitDepositForPower(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (types.BigInt, error) - StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (types.BigInt, error) - StateMinerSectorAllocated(context.Context, address.Address, abi.SectorNumber, types.TipSetKey) (bool, error) - StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error) - StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error) - StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error) - StateMarketStorageDeal(context.Context, abi.DealID, types.TipSetKey) (*api.MarketDeal, error) - StateMinerFaults(context.Context, address.Address, types.TipSetKey) (bitfield.BitField, error) - StateMinerRecoveries(context.Context, address.Address, types.TipSetKey) (bitfield.BitField, error) - StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error) - StateNetworkVersion(context.Context, types.TipSetKey) (network.Version, error) - StateComputeDataCID(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tsk types.TipSetKey) (cid.Cid, error) - StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error) - - MpoolPushMessage(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error) - - GasEstimateMessageGas(context.Context, *types.Message, *api.MessageSendSpec, types.TipSetKey) (*types.Message, error) - GasEstimateFeeCap(context.Context, *types.Message, int64, types.TipSetKey) (types.BigInt, error) - GasEstimateGasPremium(_ context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error) - - ChainHead(context.Context) (*types.TipSet, error) - ChainNotify(context.Context) (<-chan []*api.HeadChange, error) - StateGetRandomnessFromTickets(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error) - StateGetRandomnessFromBeacon(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error) - ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) - ChainGetTipSetAfterHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) - ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error) - ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error) - ChainGetPath(ctx context.Context, from, to types.TipSetKey) ([]*api.HeadChange, error) - ChainReadObj(context.Context, cid.Cid) ([]byte, error) - ChainHasObj(context.Context, cid.Cid) (bool, error) - ChainPutObj(context.Context, blocks.Block) error - ChainGetTipSet(ctx context.Context, key types.TipSetKey) (*types.TipSet, error) - - WalletSign(context.Context, address.Address, []byte) (*crypto.Signature, error) - WalletBalance(context.Context, address.Address) (types.BigInt, error) - WalletHas(context.Context, address.Address) (bool, error) -} - -// NewMiner creates a new Miner object. -func NewMiner(api fullNodeFilteredAPI, - maddr address.Address, - ds datastore.Batching, - sealer sealer.SectorManager, - sc pipeline.SectorIDCounter, - verif storiface.Verifier, - prover storiface.Prover, - gsd dtypes.GetSealingConfigFunc, - feeCfg config.MinerFeeConfig, - journal journal.Journal, - as *ctladdr.AddressSelector) (*Miner, error) { - m := &Miner{ - api: api, - feeCfg: feeCfg, - sealer: sealer, - ds: ds, - sc: sc, - verif: verif, - prover: prover, - addrSel: as, - - maddr: maddr, - getSealConfig: gsd, - journal: journal, - } - - return m, nil -} - -// Run starts the sealing FSM in the background, running preliminary checks first. -func (m *Miner) Run(ctx context.Context) error { - if err := m.runPreflightChecks(ctx); err != nil { - return xerrors.Errorf("miner preflight checks failed: %w", err) - } - - md, err := m.api.StateMinerProvingDeadline(ctx, m.maddr, types.EmptyTSK) - if err != nil { - return xerrors.Errorf("getting miner info: %w", err) - } - - // consumer of chain head changes. - evts, err := events.NewEvents(ctx, m.api) - if err != nil { - return xerrors.Errorf("failed to subscribe to events: %w", err) - } - - // Instantiate a precommit policy. - cfg := pipeline.GetSealingConfigFunc(m.getSealConfig) - provingBuffer := md.WPoStProvingPeriod * 2 - - pcp := pipeline.NewBasicPreCommitPolicy(m.api, cfg, provingBuffer) - - // Instantiate the sealing FSM. - m.sealing = pipeline.New(ctx, m.api, m.feeCfg, evts, m.maddr, m.ds, m.sealer, m.sc, m.verif, m.prover, &pcp, cfg, m.journal, m.addrSel) - - // Run the sealing FSM. - go m.sealing.Run(ctx) //nolint:errcheck // logged intside the function - - return nil -} - -func (m *Miner) Stop(ctx context.Context) error { - return m.sealing.Stop(ctx) -} - -// runPreflightChecks verifies that preconditions to run the miner are satisfied. -func (m *Miner) runPreflightChecks(ctx context.Context) error { - mi, err := m.api.StateMinerInfo(ctx, m.maddr, types.EmptyTSK) - if err != nil { - return xerrors.Errorf("failed to resolve miner info: %w", err) - } - - workerKey, err := m.api.StateAccountKey(ctx, mi.Worker, types.EmptyTSK) - if err != nil { - return xerrors.Errorf("failed to resolve worker key: %w", err) - } - - has, err := m.api.WalletHas(ctx, workerKey) - if err != nil { - return xerrors.Errorf("failed to check wallet for worker key: %w", err) - } - - if !has { - return errors.New("key for worker not found in local wallet") - } - - log.Infof("starting up miner %s, worker addr %s", m.maddr, workerKey) - return nil -} - -type StorageWpp struct { - prover storiface.ProverPoSt - verifier storiface.Verifier - miner abi.ActorID - winnRpt abi.RegisteredPoStProof -} - -func NewWinningPoStProver(api v1api.FullNode, prover storiface.ProverPoSt, verifier storiface.Verifier, miner dtypes.MinerID) (*StorageWpp, error) { - ma, err := address.NewIDAddress(uint64(miner)) - if err != nil { - return nil, err - } - - mi, err := api.StateMinerInfo(context.TODO(), ma, types.EmptyTSK) - if err != nil { - return nil, xerrors.Errorf("getting sector size: %w", err) - } - - if build.InsecurePoStValidation { - log.Warn("*****************************************************************************") - log.Warn(" Generating fake PoSt proof! You should only see this while running tests! ") - log.Warn("*****************************************************************************") - } - - return &StorageWpp{prover, verifier, abi.ActorID(miner), mi.WindowPoStProofType}, nil -} - -var _ gen.WinningPoStProver = (*StorageWpp)(nil) - -func (wpp *StorageWpp) GenerateCandidates(ctx context.Context, randomness abi.PoStRandomness, eligibleSectorCount uint64) ([]uint64, error) { - start := build.Clock.Now() - - cds, err := wpp.verifier.GenerateWinningPoStSectorChallenge(ctx, wpp.winnRpt, wpp.miner, randomness, eligibleSectorCount) - if err != nil { - return nil, xerrors.Errorf("failed to generate candidates: %w", err) - } - log.Infof("Generate candidates took %s (C: %+v)", time.Since(start), cds) - return cds, nil -} - -func (wpp *StorageWpp) ComputeProof(ctx context.Context, ssi []builtin.ExtendedSectorInfo, rand abi.PoStRandomness, currEpoch abi.ChainEpoch, nv network.Version) ([]builtin.PoStProof, error) { - if build.InsecurePoStValidation { - return []builtin.PoStProof{{ProofBytes: []byte("valid proof")}}, nil - } - - log.Infof("Computing WinningPoSt ;%+v; %v", ssi, rand) - - start := build.Clock.Now() - proof, err := wpp.prover.GenerateWinningPoSt(ctx, wpp.miner, ssi, rand) - if err != nil { - return nil, err - } - log.Infof("GenerateWinningPoSt took %s", time.Since(start)) - return proof, nil -} diff --git a/storage/miner_sealing.go b/storage/miner_sealing.go deleted file mode 100644 index 11db5a845..000000000 --- a/storage/miner_sealing.go +++ /dev/null @@ -1,165 +0,0 @@ -package storage - -import ( - "context" - - "github.com/ipfs/go-cid" - "golang.org/x/xerrors" - - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/go-state-types/big" - - "github.com/filecoin-project/lotus/api" - pipeline "github.com/filecoin-project/lotus/storage/pipeline" - "github.com/filecoin-project/lotus/storage/pipeline/sealiface" - "github.com/filecoin-project/lotus/storage/sealer/storiface" - "github.com/filecoin-project/lotus/storage/sectorblocks" -) - -// TODO: refactor this to be direct somehow - -func (m *Miner) Address() address.Address { - return m.sealing.Address() -} - -func (m *Miner) StartPackingSector(sectorNum abi.SectorNumber) error { - return m.sealing.StartPacking(sectorNum) -} - -func (m *Miner) ListSectors() ([]pipeline.SectorInfo, error) { - return m.sealing.ListSectors() -} - -func (m *Miner) PledgeSector(ctx context.Context) (storiface.SectorRef, error) { - return m.sealing.PledgeSector(ctx) -} - -func (m *Miner) ForceSectorState(ctx context.Context, id abi.SectorNumber, state pipeline.SectorState) error { - return m.sealing.ForceSectorState(ctx, id, state) -} - -func (m *Miner) RemoveSector(ctx context.Context, id abi.SectorNumber) error { - return m.sealing.Remove(ctx, id) -} - -func (m *Miner) TerminateSector(ctx context.Context, id abi.SectorNumber) error { - return m.sealing.Terminate(ctx, id) -} - -func (m *Miner) TerminateFlush(ctx context.Context) (*cid.Cid, error) { - return m.sealing.TerminateFlush(ctx) -} - -func (m *Miner) TerminatePending(ctx context.Context) ([]abi.SectorID, error) { - return m.sealing.TerminatePending(ctx) -} - -func (m *Miner) SectorPreCommitFlush(ctx context.Context) ([]sealiface.PreCommitBatchRes, error) { - return m.sealing.SectorPreCommitFlush(ctx) -} - -func (m *Miner) SectorPreCommitPending(ctx context.Context) ([]abi.SectorID, error) { - return m.sealing.SectorPreCommitPending(ctx) -} - -func (m *Miner) CommitFlush(ctx context.Context) ([]sealiface.CommitBatchRes, error) { - return m.sealing.CommitFlush(ctx) -} - -func (m *Miner) CommitPending(ctx context.Context) ([]abi.SectorID, error) { - return m.sealing.CommitPending(ctx) -} - -func (m *Miner) SectorMatchPendingPiecesToOpenSectors(ctx context.Context) error { - return m.sealing.MatchPendingPiecesToOpenSectors(ctx) -} - -func (m *Miner) MarkForUpgrade(ctx context.Context, id abi.SectorNumber, snap bool) error { - if snap { - return m.sealing.MarkForSnapUpgrade(ctx, id) - } - return xerrors.Errorf("Old CC upgrade deprecated, use snap deals CC upgrade") -} - -func (m *Miner) SectorAbortUpgrade(sectorNum abi.SectorNumber) error { - return m.sealing.AbortUpgrade(sectorNum) -} - -func (m *Miner) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPieceSize, r storiface.Data, d api.PieceDealInfo) (api.SectorOffset, error) { - return m.sealing.SectorAddPieceToAny(ctx, size, r, d) -} - -func (m *Miner) SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) { - if showOnChainInfo { - return api.SectorInfo{}, xerrors.Errorf("on-chain info not supported") - } - - info, err := m.sealing.GetSectorInfo(sid) - if err != nil { - return api.SectorInfo{}, err - } - - deals := make([]abi.DealID, len(info.Pieces)) - pieces := make([]api.SectorPiece, len(info.Pieces)) - for i, piece := range info.Pieces { - pieces[i].Piece = piece.Piece - if piece.DealInfo == nil { - continue - } - - pdi := *piece.DealInfo // copy - pieces[i].DealInfo = &pdi - - deals[i] = piece.DealInfo.DealID - } - - log := make([]api.SectorLog, len(info.Log)) - for i, l := range info.Log { - log[i] = api.SectorLog{ - Kind: l.Kind, - Timestamp: l.Timestamp, - Trace: l.Trace, - Message: l.Message, - } - } - - sInfo := api.SectorInfo{ - SectorID: sid, - State: api.SectorState(info.State), - CommD: info.CommD, - CommR: info.CommR, - Proof: info.Proof, - Deals: deals, - Pieces: pieces, - Ticket: api.SealTicket{ - Value: info.TicketValue, - Epoch: info.TicketEpoch, - }, - Seed: api.SealSeed{ - Value: info.SeedValue, - Epoch: info.SeedEpoch, - }, - PreCommitMsg: info.PreCommitMessage, - CommitMsg: info.CommitMessage, - Retries: info.InvalidProofs, - ToUpgrade: false, - ReplicaUpdateMessage: info.ReplicaUpdateMessage, - - LastErr: info.LastErr, - Log: log, - // on chain info - SealProof: info.SectorType, - Activation: 0, - Expiration: 0, - DealWeight: big.Zero(), - VerifiedDealWeight: big.Zero(), - InitialPledge: big.Zero(), - OnTime: 0, - Early: 0, - } - - return sInfo, nil -} - -var _ sectorblocks.SectorBuilder = &Miner{} diff --git a/storage/pipeline/commit_batch.go b/storage/pipeline/commit_batch.go index 8ab7d77c5..d3d6fd12c 100644 --- a/storage/pipeline/commit_batch.go +++ b/storage/pipeline/commit_batch.go @@ -25,6 +25,7 @@ import ( "github.com/filecoin-project/lotus/chain/actors/policy" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/storage/pipeline/sealiface" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) @@ -63,7 +64,7 @@ type CommitBatcher struct { mctx context.Context addrSel AddressSelector feeCfg config.MinerFeeConfig - getConfig GetSealingConfigFunc + getConfig dtypes.GetSealingConfigFunc prover storiface.Prover cutoffs map[abi.SectorNumber]time.Time @@ -75,7 +76,7 @@ type CommitBatcher struct { lk sync.Mutex } -func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBatcherApi, addrSel AddressSelector, feeCfg config.MinerFeeConfig, getConfig GetSealingConfigFunc, prov storiface.Prover) *CommitBatcher { +func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBatcherApi, addrSel AddressSelector, feeCfg config.MinerFeeConfig, getConfig dtypes.GetSealingConfigFunc, prov storiface.Prover) *CommitBatcher { b := &CommitBatcher{ api: api, maddr: maddr, diff --git a/storage/pipeline/commit_batch_test.go b/storage/pipeline/commit_batch_test.go index 439ee0ed1..b25b78438 100644 --- a/storage/pipeline/commit_batch_test.go +++ b/storage/pipeline/commit_batch_test.go @@ -4,7 +4,6 @@ package sealing_test import ( "bytes" "context" - "github.com/filecoin-project/lotus/storage/ctladdr" "sort" "sync" "testing" @@ -25,6 +24,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/storage/ctladdr" pipeline "github.com/filecoin-project/lotus/storage/pipeline" "github.com/filecoin-project/lotus/storage/pipeline/mocks" "github.com/filecoin-project/lotus/storage/pipeline/sealiface" diff --git a/storage/pipeline/input.go b/storage/pipeline/input.go index d59630e84..a95e9cbf7 100644 --- a/storage/pipeline/input.go +++ b/storage/pipeline/input.go @@ -23,6 +23,7 @@ import ( "github.com/filecoin-project/lotus/storage/sealer" "github.com/filecoin-project/lotus/storage/sealer/ffiwrapper" "github.com/filecoin-project/lotus/storage/sealer/storiface" + "github.com/filecoin-project/lotus/storage/sectorblocks" ) func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) error { @@ -376,7 +377,7 @@ func waitAddPieceResp(ctx context.Context, pp *pendingPiece) (*pieceAcceptResp, } } -func (m *Sealing) MatchPendingPiecesToOpenSectors(ctx context.Context) error { +func (m *Sealing) SectorMatchPendingPiecesToOpenSectors(ctx context.Context) error { sp, err := m.currentSealProof(ctx) if err != nil { return xerrors.Errorf("failed to get current seal proof: %w", err) @@ -708,14 +709,14 @@ func (m *Sealing) tryGetDealSector(ctx context.Context, sp abi.RegisteredSealPro return nil } -func (m *Sealing) StartPacking(sid abi.SectorNumber) error { +func (m *Sealing) StartPackingSector(sid abi.SectorNumber) error { m.startupWait.Wait() log.Infow("starting to seal deal sector", "sector", sid, "trigger", "user") return m.sectors.Send(uint64(sid), SectorStartPacking{}) } -func (m *Sealing) AbortUpgrade(sid abi.SectorNumber) error { +func (m *Sealing) SectorAbortUpgrade(sid abi.SectorNumber) error { m.startupWait.Wait() m.inputLk.Lock() @@ -727,6 +728,78 @@ func (m *Sealing) AbortUpgrade(sid abi.SectorNumber) error { return m.sectors.Send(uint64(sid), SectorAbortUpgrade{xerrors.New("triggered by user")}) } +func (m *Sealing) SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) { + if showOnChainInfo { + return api.SectorInfo{}, xerrors.Errorf("on-chain info not supported") + } + + info, err := m.GetSectorInfo(sid) + if err != nil { + return api.SectorInfo{}, err + } + + deals := make([]abi.DealID, len(info.Pieces)) + pieces := make([]api.SectorPiece, len(info.Pieces)) + for i, piece := range info.Pieces { + pieces[i].Piece = piece.Piece + if piece.DealInfo == nil { + continue + } + + pdi := *piece.DealInfo // copy + pieces[i].DealInfo = &pdi + + deals[i] = piece.DealInfo.DealID + } + + log := make([]api.SectorLog, len(info.Log)) + for i, l := range info.Log { + log[i] = api.SectorLog{ + Kind: l.Kind, + Timestamp: l.Timestamp, + Trace: l.Trace, + Message: l.Message, + } + } + + sInfo := api.SectorInfo{ + SectorID: sid, + State: api.SectorState(info.State), + CommD: info.CommD, + CommR: info.CommR, + Proof: info.Proof, + Deals: deals, + Pieces: pieces, + Ticket: api.SealTicket{ + Value: info.TicketValue, + Epoch: info.TicketEpoch, + }, + Seed: api.SealSeed{ + Value: info.SeedValue, + Epoch: info.SeedEpoch, + }, + PreCommitMsg: info.PreCommitMessage, + CommitMsg: info.CommitMessage, + Retries: info.InvalidProofs, + ToUpgrade: false, + ReplicaUpdateMessage: info.ReplicaUpdateMessage, + + LastErr: info.LastErr, + Log: log, + // on chain info + SealProof: info.SectorType, + Activation: 0, + Expiration: 0, + DealWeight: big.Zero(), + VerifiedDealWeight: big.Zero(), + InitialPledge: big.Zero(), + OnTime: 0, + Early: 0, + } + + return sInfo, nil +} + func proposalCID(deal api.PieceDealInfo) cid.Cid { pc, err := deal.DealProposal.Cid() if err != nil { @@ -736,3 +809,5 @@ func proposalCID(deal api.PieceDealInfo) cid.Cid { return pc } + +var _ sectorblocks.SectorBuilder = &Sealing{} diff --git a/storage/pipeline/precommit_batch.go b/storage/pipeline/precommit_batch.go index cb34e7f6c..d0de68daa 100644 --- a/storage/pipeline/precommit_batch.go +++ b/storage/pipeline/precommit_batch.go @@ -22,6 +22,7 @@ import ( "github.com/filecoin-project/lotus/chain/actors/policy" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/storage/pipeline/sealiface" ) @@ -52,7 +53,7 @@ type PreCommitBatcher struct { mctx context.Context addrSel AddressSelector feeCfg config.MinerFeeConfig - getConfig GetSealingConfigFunc + getConfig dtypes.GetSealingConfigFunc cutoffs map[abi.SectorNumber]time.Time todo map[abi.SectorNumber]*preCommitEntry @@ -63,7 +64,7 @@ type PreCommitBatcher struct { lk sync.Mutex } -func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCommitBatcherApi, addrSel AddressSelector, feeCfg config.MinerFeeConfig, getConfig GetSealingConfigFunc) *PreCommitBatcher { +func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCommitBatcherApi, addrSel AddressSelector, feeCfg config.MinerFeeConfig, getConfig dtypes.GetSealingConfigFunc) *PreCommitBatcher { b := &PreCommitBatcher{ api: api, maddr: maddr, diff --git a/storage/pipeline/precommit_policy.go b/storage/pipeline/precommit_policy.go index 8a753ccf9..e0761d209 100644 --- a/storage/pipeline/precommit_policy.go +++ b/storage/pipeline/precommit_policy.go @@ -12,6 +12,7 @@ import ( "github.com/filecoin-project/lotus/chain/actors/builtin" "github.com/filecoin-project/lotus/chain/actors/policy" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/node/modules/dtypes" ) type PreCommitPolicy interface { @@ -39,7 +40,7 @@ type Chain interface { // current epoch + the provided default duration. type BasicPreCommitPolicy struct { api Chain - getSealingConfig GetSealingConfigFunc + getSealingConfig dtypes.GetSealingConfigFunc provingBuffer abi.ChainEpoch } @@ -48,7 +49,7 @@ type BasicPreCommitPolicy struct { // // The provided duration is used as the default sector expiry when the sector // contains no deals. The proving boundary is used to adjust/align the sector's expiration. -func NewBasicPreCommitPolicy(api Chain, cfgGetter GetSealingConfigFunc, provingBuffer abi.ChainEpoch) BasicPreCommitPolicy { +func NewBasicPreCommitPolicy(api Chain, cfgGetter dtypes.GetSealingConfigFunc, provingBuffer abi.ChainEpoch) BasicPreCommitPolicy { return BasicPreCommitPolicy{ api: api, getSealingConfig: cfgGetter, diff --git a/storage/pipeline/sealing.go b/storage/pipeline/sealing.go index 072307bf5..c261c0468 100644 --- a/storage/pipeline/sealing.go +++ b/storage/pipeline/sealing.go @@ -26,6 +26,7 @@ import ( "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/storage/ctladdr" "github.com/filecoin-project/lotus/storage/pipeline/sealiface" "github.com/filecoin-project/lotus/storage/sealer" @@ -119,7 +120,7 @@ type Sealing struct { precommiter *PreCommitBatcher commiter *CommitBatcher - getConfig GetSealingConfigFunc + getConfig dtypes.GetSealingConfigFunc } type openSector struct { @@ -160,7 +161,7 @@ type pendingPiece struct { accepted func(abi.SectorNumber, abi.UnpaddedPieceSize, error) } -func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sealer.SectorManager, sc SectorIDCounter, verif storiface.Verifier, prov storiface.Prover, pcp PreCommitPolicy, gc GetSealingConfigFunc, journal journal.Journal, addrSel AddressSelector) *Sealing { +func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sealer.SectorManager, sc SectorIDCounter, verif storiface.Verifier, prov storiface.Prover, pcp PreCommitPolicy, gc dtypes.GetSealingConfigFunc, journal journal.Journal, addrSel AddressSelector) *Sealing { s := &Sealing{ Api: api, DealInfo: &CurrentDealInfoManager{api}, @@ -218,6 +219,7 @@ func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events } func (m *Sealing) Run(ctx context.Context) error { + if err := m.restartSectors(ctx); err != nil { log.Errorf("%+v", err) return xerrors.Errorf("failed load sector states: %w", err) @@ -237,13 +239,13 @@ func (m *Sealing) Stop(ctx context.Context) error { return nil } -func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error { +func (m *Sealing) RemoveSector(ctx context.Context, sid abi.SectorNumber) error { m.startupWait.Wait() return m.sectors.Send(uint64(sid), SectorRemove{}) } -func (m *Sealing) Terminate(ctx context.Context, sid abi.SectorNumber) error { +func (m *Sealing) TerminateSector(ctx context.Context, sid abi.SectorNumber) error { m.startupWait.Wait() return m.sectors.Send(uint64(sid), SectorTerminate{}) diff --git a/storage/pipeline/terminate_batch.go b/storage/pipeline/terminate_batch.go index 9212cbc65..4842a4e1b 100644 --- a/storage/pipeline/terminate_batch.go +++ b/storage/pipeline/terminate_batch.go @@ -22,6 +22,7 @@ import ( lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/lotus/node/modules/dtypes" ) type TerminateBatcherApi interface { @@ -44,7 +45,7 @@ type TerminateBatcher struct { mctx context.Context addrSel AddressSelector feeCfg config.MinerFeeConfig - getConfig GetSealingConfigFunc + getConfig dtypes.GetSealingConfigFunc todo map[lminer.SectorLocation]*bitfield.BitField // MinerSectorLocation -> BitField @@ -55,7 +56,7 @@ type TerminateBatcher struct { lk sync.Mutex } -func NewTerminationBatcher(mctx context.Context, maddr address.Address, api TerminateBatcherApi, addrSel AddressSelector, feeCfg config.MinerFeeConfig, getConfig GetSealingConfigFunc) *TerminateBatcher { +func NewTerminationBatcher(mctx context.Context, maddr address.Address, api TerminateBatcherApi, addrSel AddressSelector, feeCfg config.MinerFeeConfig, getConfig dtypes.GetSealingConfigFunc) *TerminateBatcher { b := &TerminateBatcher{ api: api, maddr: maddr, diff --git a/storage/pipeline/types.go b/storage/pipeline/types.go index 17d1edfb7..69045bdfb 100644 --- a/storage/pipeline/types.go +++ b/storage/pipeline/types.go @@ -11,7 +11,6 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/storage/pipeline/sealiface" "github.com/filecoin-project/lotus/storage/sealer" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) @@ -195,8 +194,6 @@ type SectorIDCounter interface { Next() (abi.SectorNumber, error) } -type GetSealingConfigFunc func() (sealiface.Config, error) - // SealingStateEvt is a journal event that records a sector state transition. type SealingStateEvt struct { SectorNumber abi.SectorNumber diff --git a/storage/pipeline/upgrade_queue.go b/storage/pipeline/upgrade_queue.go index 97dbf4ced..309e59573 100644 --- a/storage/pipeline/upgrade_queue.go +++ b/storage/pipeline/upgrade_queue.go @@ -11,7 +11,7 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) -func (m *Sealing) MarkForSnapUpgrade(ctx context.Context, id abi.SectorNumber) error { +func (m *Sealing) MarkForUpgrade(ctx context.Context, id abi.SectorNumber) error { si, err := m.GetSectorInfo(id) if err != nil { return xerrors.Errorf("getting sector info: %w", err) diff --git a/storage/winning_prover.go b/storage/winning_prover.go new file mode 100644 index 000000000..d61cd213c --- /dev/null +++ b/storage/winning_prover.go @@ -0,0 +1,79 @@ +package storage + +import ( + "context" + "time" + + logging "github.com/ipfs/go-log/v2" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/network" + + "github.com/filecoin-project/lotus/api/v1api" + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/actors/builtin" + "github.com/filecoin-project/lotus/chain/gen" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/storage/sealer/storiface" +) + +var log = logging.Logger("storageminer") + +type StorageWpp struct { + prover storiface.ProverPoSt + verifier storiface.Verifier + miner abi.ActorID + winnRpt abi.RegisteredPoStProof +} + +func NewWinningPoStProver(api v1api.FullNode, prover storiface.ProverPoSt, verifier storiface.Verifier, miner dtypes.MinerID) (*StorageWpp, error) { + ma, err := address.NewIDAddress(uint64(miner)) + if err != nil { + return nil, err + } + + mi, err := api.StateMinerInfo(context.TODO(), ma, types.EmptyTSK) + if err != nil { + return nil, xerrors.Errorf("getting sector size: %w", err) + } + + if build.InsecurePoStValidation { + log.Warn("*****************************************************************************") + log.Warn(" Generating fake PoSt proof! You should only see this while running tests! ") + log.Warn("*****************************************************************************") + } + + return &StorageWpp{prover, verifier, abi.ActorID(miner), mi.WindowPoStProofType}, nil +} + +var _ gen.WinningPoStProver = (*StorageWpp)(nil) + +func (wpp *StorageWpp) GenerateCandidates(ctx context.Context, randomness abi.PoStRandomness, eligibleSectorCount uint64) ([]uint64, error) { + start := build.Clock.Now() + + cds, err := wpp.verifier.GenerateWinningPoStSectorChallenge(ctx, wpp.winnRpt, wpp.miner, randomness, eligibleSectorCount) + if err != nil { + return nil, xerrors.Errorf("failed to generate candidates: %w", err) + } + log.Infof("Generate candidates took %s (C: %+v)", time.Since(start), cds) + return cds, nil +} + +func (wpp *StorageWpp) ComputeProof(ctx context.Context, ssi []builtin.ExtendedSectorInfo, rand abi.PoStRandomness, currEpoch abi.ChainEpoch, nv network.Version) ([]builtin.PoStProof, error) { + if build.InsecurePoStValidation { + return []builtin.PoStProof{{ProofBytes: []byte("valid proof")}}, nil + } + + log.Infof("Computing WinningPoSt ;%+v; %v", ssi, rand) + + start := build.Clock.Now() + proof, err := wpp.prover.GenerateWinningPoSt(ctx, wpp.miner, ssi, rand) + if err != nil { + return nil, err + } + log.Infof("GenerateWinningPoSt took %s", time.Since(start)) + return proof, nil +}