From df7631df778217330b6c877f5d54528eb7ce96cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Thu, 13 May 2021 13:07:42 +0100 Subject: [PATCH 01/13] docs: rename some wdpost methods; add docs. --- storage/wdpost_run.go | 42 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/storage/wdpost_run.go b/storage/wdpost_run.go index cec86a09b..15525e001 100644 --- a/storage/wdpost_run.go +++ b/storage/wdpost_run.go @@ -233,8 +233,25 @@ func (s *WindowPoStScheduler) checkSectors(ctx context.Context, check bitfield.B return sbf, nil } -func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([]miner.RecoveryDeclaration, *types.SignedMessage, error) { - ctx, span := trace.StartSpan(ctx, "storage.checkNextRecoveries") +// declareRecoveries identifies sectors that were previously marked as faulty +// for our miner, but are now recovered (i.e. are now provable again) and +// still not reported as such. +// +// It then reports the recovery on chain via a `DeclareFaultsRecovered` +// message to our miner actor. +// +// This is always invoked ahead of time, before the deadline for the evaluated +// sectors arrives. That way, recoveries are declared in preparation for those +// sectors to be proven. +// +// If a declaration is made, it awaits for build.MessageConfidence confirmations +// on chain before returning. +// +// TODO: the waiting should happen in the background. Right now this +// is blocking/delaying the actual generation and submission of WindowPoSts in +// this deadline! +func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([]miner.RecoveryDeclaration, *types.SignedMessage, error) { + ctx, span := trace.StartSpan(ctx, "storage.declareRecoveries") defer span.End() faulty := uint64(0) @@ -325,8 +342,21 @@ func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uin return recoveries, sm, nil } -func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([]miner.FaultDeclaration, *types.SignedMessage, error) { - ctx, span := trace.StartSpan(ctx, "storage.checkNextFaults") +// declareFaults identifies the sectors on the specified proving deadline that +// are faulty, and reports the faults on chain via the `DeclareFaults` message +// to our miner actor. +// +// This is always invoked ahead of time, before the deadline for the evaluated +// sectors arrives. That way, faults are declared before a penalty is accrued. +// +// If a declaration is made, it awaits for build.MessageConfidence confirmations +// on chain before returning. +// +// TODO: the waiting should happen in the background. Right now this +// is blocking/delaying the actual generation and submission of WindowPoSts in +// this deadline! +func (s *WindowPoStScheduler) declareFaults(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([]miner.FaultDeclaration, *types.SignedMessage, error) { + ctx, span := trace.StartSpan(ctx, "storage.declareFaults") defer span.End() bad := uint64(0) @@ -443,7 +473,7 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty } ) - if recoveries, sigmsg, err = s.checkNextRecoveries(context.TODO(), declDeadline, partitions, ts.Key()); err != nil { + if recoveries, sigmsg, err = s.declareRecoveries(context.TODO(), declDeadline, partitions, ts.Key()); err != nil { // TODO: This is potentially quite bad, but not even trying to post when this fails is objectively worse log.Errorf("checking sector recoveries: %v", err) } @@ -462,7 +492,7 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty return // FORK: declaring faults after ignition upgrade makes no sense } - if faults, sigmsg, err = s.checkNextFaults(context.TODO(), declDeadline, partitions, ts.Key()); err != nil { + if faults, sigmsg, err = s.declareFaults(context.TODO(), declDeadline, partitions, ts.Key()); err != nil { // TODO: This is also potentially really bad, but we try to post anyways log.Errorf("checking sector faults: %v", err) } From 5daacc0f07b2b8548eb5ce602eb94725d492aa7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Thu, 13 May 2021 13:08:52 +0100 Subject: [PATCH 02/13] docs: add docs to chain store methods. --- chain/store/store.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/chain/store/store.go b/chain/store/store.go index 1e78ce73d..7caddbd5c 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -853,6 +853,14 @@ func (cs *ChainStore) NearestCommonAncestor(a, b *types.TipSet) (*types.TipSet, return cs.LoadTipSet(l[len(l)-1].Parents()) } +// ReorgOps takes two tipsets (which can be at different heights), and walks +// their corresponding chains backwards one step at a time until we find +// a common ancestor. It then returns the respective chain segments that fork +// from the identified ancestor, in reverse order, where the first element of +// each slice is the supplied tipset, and the last element is the common +// ancenstor. +// +// If an error happens along the way, we return the error with nil slices. func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.TipSet, error) { return ReorgOps(cs.LoadTipSet, a, b) } @@ -1235,6 +1243,9 @@ func (cs *ChainStore) ReadMsgMetaCids(mmc cid.Cid) ([]cid.Cid, []cid.Cid, error) return blscids, secpkcids, nil } +// GetPath returns returns the sequence of atomic head change operations that +// need to be applied in order to switch the head of the chain from the `from` +// tipset to the `to` tipset. func (cs *ChainStore) GetPath(ctx context.Context, from types.TipSetKey, to types.TipSetKey) ([]*api.HeadChange, error) { fts, err := cs.LoadTipSet(from) if err != nil { From 1d6941b0eb0c326fcf228d18f5797a8731749a91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Thu, 13 May 2021 14:33:35 +0100 Subject: [PATCH 03/13] rename storage/{sealing=>miner_sealing}.go --- storage/{sealing.go => miner_sealing.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename storage/{sealing.go => miner_sealing.go} (100%) diff --git a/storage/sealing.go b/storage/miner_sealing.go similarity index 100% rename from storage/sealing.go rename to storage/miner_sealing.go From c7d50fe195b69338de65e55d7ec5a18aed35f4b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Thu, 13 May 2021 14:36:16 +0100 Subject: [PATCH 04/13] rename {storageMinerApi=>fullNodeFilteredAPI}; add docs. --- storage/adapter_storage_miner.go | 4 ++-- storage/miner.go | 8 +++++--- storage/wdpost_changehandler.go | 1 + storage/wdpost_run_test.go | 4 ++-- storage/wdpost_sched.go | 8 ++++---- 5 files changed, 14 insertions(+), 11 deletions(-) diff --git a/storage/adapter_storage_miner.go b/storage/adapter_storage_miner.go index fea02651a..502b9adb0 100644 --- a/storage/adapter_storage_miner.go +++ b/storage/adapter_storage_miner.go @@ -31,10 +31,10 @@ import ( var _ sealing.SealingAPI = new(SealingAPIAdapter) type SealingAPIAdapter struct { - delegate storageMinerApi + delegate fullNodeFilteredAPI } -func NewSealingAPIAdapter(api storageMinerApi) SealingAPIAdapter { +func NewSealingAPIAdapter(api fullNodeFilteredAPI) SealingAPIAdapter { return SealingAPIAdapter{delegate: api} } diff --git a/storage/miner.go b/storage/miner.go index 9a24cbe9d..96184724b 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -42,7 +42,7 @@ import ( var log = logging.Logger("storageminer") type Miner struct { - api storageMinerApi + api fullNodeFilteredAPI feeCfg config.MinerFeeConfig h host.Host sealer sectorstorage.SectorManager @@ -70,7 +70,9 @@ type SealingStateEvt struct { Error string } -type storageMinerApi interface { +// fullNodeFilteredAPI is the subset of the full node API the Miner needs from +// a Lotus full node. +type fullNodeFilteredAPI interface { // Call a read only method on actors (no interaction with the chain required) StateCall(context.Context, *types.Message, types.TipSetKey) (*api.InvocResult, error) StateMinerSectors(context.Context, address.Address, *bitfield.BitField, types.TipSetKey) ([]*miner.SectorOnChainInfo, error) @@ -116,7 +118,7 @@ type storageMinerApi interface { WalletHas(context.Context, address.Address) (bool, error) } -func NewMiner(api storageMinerApi, maddr address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc, feeCfg config.MinerFeeConfig, journal journal.Journal, as *AddressSelector) (*Miner, error) { +func NewMiner(api fullNodeFilteredAPI, maddr address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc, feeCfg config.MinerFeeConfig, journal journal.Journal, as *AddressSelector) (*Miner, error) { m := &Miner{ api: api, feeCfg: feeCfg, diff --git a/storage/wdpost_changehandler.go b/storage/wdpost_changehandler.go index 188d7e93a..081cfecaf 100644 --- a/storage/wdpost_changehandler.go +++ b/storage/wdpost_changehandler.go @@ -25,6 +25,7 @@ type changeHandlerAPI interface { StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error) startGeneratePoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, onComplete CompleteGeneratePoSTCb) context.CancelFunc startSubmitPoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, posts []miner.SubmitWindowedPoStParams, onComplete CompleteSubmitPoSTCb) context.CancelFunc + onAbort(ts *types.TipSet, deadline *dline.Info) failPost(err error, ts *types.TipSet, deadline *dline.Info) } diff --git a/storage/wdpost_run_test.go b/storage/wdpost_run_test.go index 6a55bad1f..584369dff 100644 --- a/storage/wdpost_run_test.go +++ b/storage/wdpost_run_test.go @@ -35,7 +35,7 @@ import ( type mockStorageMinerAPI struct { partitions []api.Partition pushedMessages chan *types.Message - storageMinerApi + fullNodeFilteredAPI } func newMockStorageMinerAPI() *mockStorageMinerAPI { @@ -389,4 +389,4 @@ func (m *mockStorageMinerAPI) WalletHas(ctx context.Context, address address.Add return true, nil } -var _ storageMinerApi = &mockStorageMinerAPI{} +var _ fullNodeFilteredAPI = &mockStorageMinerAPI{} diff --git a/storage/wdpost_sched.go b/storage/wdpost_sched.go index 8c24a5516..0ebd491ec 100644 --- a/storage/wdpost_sched.go +++ b/storage/wdpost_sched.go @@ -24,7 +24,7 @@ import ( ) type WindowPoStScheduler struct { - api storageMinerApi + api fullNodeFilteredAPI feeCfg config.MinerFeeConfig addrSel *AddressSelector prover storage.Prover @@ -43,7 +43,7 @@ type WindowPoStScheduler struct { // failLk sync.Mutex } -func NewWindowedPoStScheduler(api storageMinerApi, fc config.MinerFeeConfig, as *AddressSelector, sb storage.Prover, verif ffiwrapper.Verifier, ft sectorstorage.FaultTracker, j journal.Journal, actor address.Address) (*WindowPoStScheduler, error) { +func NewWindowedPoStScheduler(api fullNodeFilteredAPI, fc config.MinerFeeConfig, as *AddressSelector, sb storage.Prover, verif ffiwrapper.Verifier, ft sectorstorage.FaultTracker, j journal.Journal, actor address.Address) (*WindowPoStScheduler, error) { mi, err := api.StateMinerInfo(context.TODO(), actor, types.EmptyTSK) if err != nil { return nil, xerrors.Errorf("getting sector size: %w", err) @@ -71,13 +71,13 @@ func NewWindowedPoStScheduler(api storageMinerApi, fc config.MinerFeeConfig, as } type changeHandlerAPIImpl struct { - storageMinerApi + fullNodeFilteredAPI *WindowPoStScheduler } func (s *WindowPoStScheduler) Run(ctx context.Context) { // Initialize change handler - chImpl := &changeHandlerAPIImpl{storageMinerApi: s.api, WindowPoStScheduler: s} + chImpl := &changeHandlerAPIImpl{fullNodeFilteredAPI: s.api, WindowPoStScheduler: s} s.ch = newChangeHandler(chImpl, s.actor) defer s.ch.shutdown() s.ch.start() From 2d7f4b1c6121872a78a063f664f338151059e082 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Fri, 14 May 2021 19:45:47 +0100 Subject: [PATCH 05/13] docs: add godocs to storage module. --- extern/storage-sealing/precommit_policy.go | 5 ++++- storage/miner.go | 23 +++++++++++++++++++++- storage/wdpost_changehandler.go | 2 +- storage/wdpost_run.go | 13 +++++++++++- storage/wdpost_sched.go | 20 ++++++++++++++++--- 5 files changed, 56 insertions(+), 7 deletions(-) diff --git a/extern/storage-sealing/precommit_policy.go b/extern/storage-sealing/precommit_policy.go index 0b774b56f..a6add5693 100644 --- a/extern/storage-sealing/precommit_policy.go +++ b/extern/storage-sealing/precommit_policy.go @@ -40,7 +40,10 @@ type BasicPreCommitPolicy struct { duration abi.ChainEpoch } -// NewBasicPreCommitPolicy produces a BasicPreCommitPolicy +// NewBasicPreCommitPolicy produces a BasicPreCommitPolicy. +// +// The provided duration is used as the default sector expiry when the sector +// contains no deals. The proving boundary is used to adjust/align the sector's expiration. func NewBasicPreCommitPolicy(api Chain, duration abi.ChainEpoch, provingBoundary abi.ChainEpoch) BasicPreCommitPolicy { return BasicPreCommitPolicy{ api: api, diff --git a/storage/miner.go b/storage/miner.go index 96184724b..dc8fb019a 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -41,6 +41,14 @@ import ( var log = logging.Logger("storageminer") +// Miner is the central miner entrypoint object inside Lotus. It is +// instantiated in the node builder, along with the WindowPoStScheduler. +// +// This object is the owner of the sealing pipeline. Most of the actual logic +// lives in the storage-sealing module (sealing.Sealing), and the Miner object +// exposes it to the rest of the system by proxying calls. +// +// Miner#Run starts the sealing FSM. type Miner struct { api fullNodeFilteredAPI feeCfg config.MinerFeeConfig @@ -118,7 +126,18 @@ type fullNodeFilteredAPI interface { WalletHas(context.Context, address.Address) (bool, error) } -func NewMiner(api fullNodeFilteredAPI, maddr address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc, feeCfg config.MinerFeeConfig, journal journal.Journal, as *AddressSelector) (*Miner, error) { +// NewMiner creates a new Miner object. +func NewMiner(api fullNodeFilteredAPI, + maddr address.Address, + h host.Host, + ds datastore.Batching, + sealer sectorstorage.SectorManager, + sc sealing.SectorIDCounter, + verif ffiwrapper.Verifier, + gsd dtypes.GetSealingConfigFunc, + feeCfg config.MinerFeeConfig, + journal journal.Journal, + as *AddressSelector) (*Miner, error) { m := &Miner{ api: api, feeCfg: feeCfg, @@ -138,6 +157,7 @@ func NewMiner(api fullNodeFilteredAPI, maddr address.Address, h host.Host, ds da return m, nil } +// Run starts the sealing FSM in the background, running preliminary checks first. func (m *Miner) Run(ctx context.Context) error { if err := m.runPreflightChecks(ctx); err != nil { return xerrors.Errorf("miner preflight checks failed: %w", err) @@ -186,6 +206,7 @@ func (m *Miner) Stop(ctx context.Context) error { return m.sealing.Stop(ctx) } +// runPreflightChecks verifies that preconditions to run the miner are satisfied. func (m *Miner) runPreflightChecks(ctx context.Context) error { mi, err := m.api.StateMinerInfo(ctx, m.maddr, types.EmptyTSK) if err != nil { diff --git a/storage/wdpost_changehandler.go b/storage/wdpost_changehandler.go index 081cfecaf..8bcd7164e 100644 --- a/storage/wdpost_changehandler.go +++ b/storage/wdpost_changehandler.go @@ -23,9 +23,9 @@ type CompleteSubmitPoSTCb func(err error) type changeHandlerAPI interface { StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error) + startGeneratePoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, onComplete CompleteGeneratePoSTCb) context.CancelFunc startSubmitPoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, posts []miner.SubmitWindowedPoStParams, onComplete CompleteSubmitPoSTCb) context.CancelFunc - onAbort(ts *types.TipSet, deadline *dline.Info) failPost(err error, ts *types.TipSet, deadline *dline.Info) } diff --git a/storage/wdpost_run.go b/storage/wdpost_run.go index 15525e001..25bacc6c1 100644 --- a/storage/wdpost_run.go +++ b/storage/wdpost_run.go @@ -31,6 +31,7 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) +// failPost records a failure in the journal. func (s *WindowPoStScheduler) failPost(err error, ts *types.TipSet, deadline *dline.Info) { s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} { c := evtCommon{Error: err} @@ -440,6 +441,12 @@ func (s *WindowPoStScheduler) declareFaults(ctx context.Context, dlIdx uint64, p return faults, sm, nil } +// runPost runs a full cycle of the PoSt process: +// +// 1. performs recovery declarations for the next deadline. +// 2. performs fault declarations for the next deadline. +// 3. computes and submits proofs, batching partitions and making sure they +// don't exceed message capacity. func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *types.TipSet) ([]miner.SubmitWindowedPoStParams, error) { ctx, span := trace.StartSpan(ctx, "storage.runPost") defer span.End() @@ -848,7 +855,9 @@ func (s *WindowPoStScheduler) setSender(ctx context.Context, msg *types.Message, } *msg = *gm - // estimate + // calculate a more frugal estimation; premium is estimated to guarantee + // inclusion within 5 tipsets, and fee cap is estimated for inclusion + // within 4 tipsets. minGasFeeMsg := *msg minGasFeeMsg.GasPremium, err = s.api.GasEstimateGasPremium(ctx, 5, msg.From, msg.GasLimit, types.EmptyTSK) @@ -863,6 +872,8 @@ func (s *WindowPoStScheduler) setSender(ctx context.Context, msg *types.Message, minGasFeeMsg.GasFeeCap = msg.GasFeeCap } + // goodFunds = funds needed for optimal inclusion probability. + // minFunds = funds needed for more speculative inclusion probability. goodFunds := big.Add(msg.RequiredFunds(), msg.Value) minFunds := big.Min(big.Add(minGasFeeMsg.RequiredFunds(), minGasFeeMsg.Value), goodFunds) diff --git a/storage/wdpost_sched.go b/storage/wdpost_sched.go index 0ebd491ec..d69f9c591 100644 --- a/storage/wdpost_sched.go +++ b/storage/wdpost_sched.go @@ -23,6 +23,12 @@ import ( "go.opencensus.io/trace" ) +// WindowPoStScheduler is the coordinator for WindowPoSt submissions, fault +// declaration, and recovery declarations. It watches the chain for reverts and +// applies, and schedules/run those processes as partition deadlines arrive. +// +// WindowPoStScheduler watches the chain though the changeHandler, which in turn +// turn calls the scheduler when the time arrives to do work. type WindowPoStScheduler struct { api fullNodeFilteredAPI feeCfg config.MinerFeeConfig @@ -43,7 +49,15 @@ type WindowPoStScheduler struct { // failLk sync.Mutex } -func NewWindowedPoStScheduler(api fullNodeFilteredAPI, fc config.MinerFeeConfig, as *AddressSelector, sb storage.Prover, verif ffiwrapper.Verifier, ft sectorstorage.FaultTracker, j journal.Journal, actor address.Address) (*WindowPoStScheduler, error) { +// NewWindowedPoStScheduler creates a new WindowPoStScheduler scheduler. +func NewWindowedPoStScheduler(api fullNodeFilteredAPI, + cfg config.MinerFeeConfig, + as *AddressSelector, + sp storage.Prover, + verif ffiwrapper.Verifier, + ft sectorstorage.FaultTracker, + j journal.Journal, + actor address.Address) (*WindowPoStScheduler, error) { mi, err := api.StateMinerInfo(context.TODO(), actor, types.EmptyTSK) if err != nil { return nil, xerrors.Errorf("getting sector size: %w", err) @@ -51,9 +65,9 @@ func NewWindowedPoStScheduler(api fullNodeFilteredAPI, fc config.MinerFeeConfig, return &WindowPoStScheduler{ api: api, - feeCfg: fc, + feeCfg: cfg, addrSel: as, - prover: sb, + prover: sp, verifier: verif, faultTracker: ft, proofType: mi.WindowPoStProofType, From 0187aa6d9c5d98414e16fc1b81b0e49aac34c6cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Fri, 14 May 2021 19:46:41 +0100 Subject: [PATCH 06/13] imports reorder. --- storage/miner.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/storage/miner.go b/storage/miner.go index dc8fb019a..4381263d1 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -5,10 +5,6 @@ import ( "errors" "time" - "github.com/filecoin-project/go-state-types/network" - - "github.com/filecoin-project/go-state-types/dline" - "github.com/filecoin-project/go-bitfield" "github.com/ipfs/go-cid" @@ -20,9 +16,13 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/crypto" + "github.com/filecoin-project/go-state-types/dline" + "github.com/filecoin-project/go-state-types/network" + + "github.com/filecoin-project/specs-storage/storage" + sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" - "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api/v1api" From 89dfb0ba19b2e75edfafd7bc54738cdcbd951451 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Fri, 14 May 2021 19:47:50 +0100 Subject: [PATCH 07/13] minor refactor, renames, docs in Miner#Run. --- storage/miner.go | 36 ++++++++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/storage/miner.go b/storage/miner.go index 4381263d1..6eb1789dc 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -174,17 +174,37 @@ func (m *Miner) Run(ctx context.Context) error { MaxTerminateGasFee: abi.TokenAmount(m.feeCfg.MaxTerminateGasFee), } - evts := events.NewEvents(ctx, m.api) - adaptedAPI := NewSealingAPIAdapter(m.api) - // TODO: Maybe we update this policy after actor upgrades? - pcp := sealing.NewBasicPreCommitPolicy(adaptedAPI, policy.GetMaxSectorExpirationExtension()-(md.WPoStProvingPeriod*2), md.PeriodStart%md.WPoStProvingPeriod) + var ( + // consumer of chain head changes. + evts = events.NewEvents(ctx, m.api) + evtsAdapter = NewEventsAdapter(evts) - as := func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) { - return m.addrSel.AddressFor(ctx, m.api, mi, use, goodFunds, minFunds) - } + // Create a shim to glue the API required by the sealing component + // with the API that Lotus is capable of providing. + // The shim translates between "tipset tokens" and tipset keys, and + // provides extra methods. + adaptedAPI = NewSealingAPIAdapter(m.api) - m.sealing = sealing.New(adaptedAPI, fc, NewEventsAdapter(evts), m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp, sealing.GetSealingConfigFunc(m.getSealConfig), m.handleSealingNotifications, as) + // Instantiate a precommit policy. + defaultDuration = policy.GetMaxSectorExpirationExtension() - (md.WPoStProvingPeriod * 2) + provingBoundary = md.PeriodStart % md.WPoStProvingPeriod + // TODO: Maybe we update this policy after actor upgrades? + pcp = sealing.NewBasicPreCommitPolicy(adaptedAPI, defaultDuration, provingBoundary) + + // address selector. + as = func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) { + return m.addrSel.AddressFor(ctx, m.api, mi, use, goodFunds, minFunds) + } + + // sealing configuration. + cfg = sealing.GetSealingConfigFunc(m.getSealConfig) + ) + + // Instantiate the sealing FSM. + m.sealing = sealing.New(adaptedAPI, fc, evtsAdapter, m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp, cfg, m.handleSealingNotifications, as) + + // Run the sealing FSM. go m.sealing.Run(ctx) //nolint:errcheck // logged intside the function return nil From 0f4270126f056f06c087297ff758caa2ccce4894 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Fri, 14 May 2021 19:48:38 +0100 Subject: [PATCH 08/13] rename {submitPost=>submitPoStMessage}. --- storage/wdpost_run.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/storage/wdpost_run.go b/storage/wdpost_run.go index 25bacc6c1..35fdd0566 100644 --- a/storage/wdpost_run.go +++ b/storage/wdpost_run.go @@ -168,7 +168,7 @@ func (s *WindowPoStScheduler) runSubmitPoST( commRand, err := s.api.ChainGetRandomnessFromTickets(ctx, ts.Key(), crypto.DomainSeparationTag_PoStChainCommit, commEpoch, nil) if err != nil { err = xerrors.Errorf("failed to get chain randomness from tickets for windowPost (ts=%d; deadline=%d): %w", ts.Height(), commEpoch, err) - log.Errorf("submitPost failed: %+v", err) + log.Errorf("submitPoStMessage failed: %+v", err) return err } @@ -181,7 +181,7 @@ func (s *WindowPoStScheduler) runSubmitPoST( post.ChainCommitRand = commRand // Submit PoST - sm, submitErr := s.submitPost(ctx, post) + sm, submitErr := s.submitPoStMessage(ctx, post) if submitErr != nil { log.Errorf("submit window post failed: %+v", submitErr) } else { @@ -792,7 +792,10 @@ func (s *WindowPoStScheduler) sectorsForProof(ctx context.Context, goodSectors, return proofSectors, nil } -func (s *WindowPoStScheduler) submitPost(ctx context.Context, proof *miner.SubmitWindowedPoStParams) (*types.SignedMessage, error) { +// submitPoStMessage builds a SubmitWindowedPoSt message and submits it to +// the mpool. It doesn't synchronously block on confirmations, but it does +// monitor in the background simply for the purposes of logging. +func (s *WindowPoStScheduler) submitPoStMessage(ctx context.Context, proof *miner.SubmitWindowedPoStParams) (*types.SignedMessage, error) { ctx, span := trace.StartSpan(ctx, "storage.commitPost") defer span.End() From 28efa9357c85713b1ba63f508d8cc76688260647 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Fri, 14 May 2021 19:49:05 +0100 Subject: [PATCH 09/13] rename {setSender=>prepareMessage}. --- storage/wdpost_run.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/storage/wdpost_run.go b/storage/wdpost_run.go index 35fdd0566..edb59a64f 100644 --- a/storage/wdpost_run.go +++ b/storage/wdpost_run.go @@ -320,7 +320,7 @@ func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint6 Value: types.NewInt(0), } spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)} - if err := s.setSender(ctx, msg, spec); err != nil { + if err := s.prepareMessage(ctx, msg, spec); err != nil { return recoveries, nil, err } @@ -418,7 +418,7 @@ func (s *WindowPoStScheduler) declareFaults(ctx context.Context, dlIdx uint64, p Value: types.NewInt(0), // TODO: Is there a fee? } spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)} - if err := s.setSender(ctx, msg, spec); err != nil { + if err := s.prepareMessage(ctx, msg, spec); err != nil { return faults, nil, err } @@ -813,13 +813,11 @@ func (s *WindowPoStScheduler) submitPoStMessage(ctx context.Context, proof *mine Value: types.NewInt(0), } spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)} - if err := s.setSender(ctx, msg, spec); err != nil { + if err := s.prepareMessage(ctx, msg, spec); err != nil { return nil, err } - // TODO: consider maybe caring about the output sm, err := s.api.MpoolPushMessage(ctx, msg, spec) - if err != nil { return nil, xerrors.Errorf("pushing message to mpool: %w", err) } @@ -843,14 +841,20 @@ func (s *WindowPoStScheduler) submitPoStMessage(ctx context.Context, proof *mine return sm, nil } -func (s *WindowPoStScheduler) setSender(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) error { +// prepareMessage prepares a message before sending it, setting: +// +// * the sender (from the AddressSelector, falling back to the worker address if none set) +// * the right gas parameters +func (s *WindowPoStScheduler) prepareMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) error { mi, err := s.api.StateMinerInfo(ctx, s.actor, types.EmptyTSK) if err != nil { return xerrors.Errorf("error getting miner info: %w", err) } - // use the worker as a fallback + // set the worker as a fallback msg.From = mi.Worker + // (optimal) initial estimation with some overestimation that guarantees + // block inclusion within the next 20 tipsets. gm, err := s.api.GasEstimateMessageGas(ctx, msg, spec, types.EmptyTSK) if err != nil { log.Errorw("estimating gas", "error", err) From 99122129499ce823737f8c92066fc13d165dfd9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Fri, 14 May 2021 20:08:15 +0100 Subject: [PATCH 10/13] minor refactor to anonymize interface. --- storage/wdpost_sched.go | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/storage/wdpost_sched.go b/storage/wdpost_sched.go index d69f9c591..88357c5b3 100644 --- a/storage/wdpost_sched.go +++ b/storage/wdpost_sched.go @@ -84,21 +84,24 @@ func NewWindowedPoStScheduler(api fullNodeFilteredAPI, }, nil } -type changeHandlerAPIImpl struct { - fullNodeFilteredAPI - *WindowPoStScheduler -} - func (s *WindowPoStScheduler) Run(ctx context.Context) { - // Initialize change handler - chImpl := &changeHandlerAPIImpl{fullNodeFilteredAPI: s.api, WindowPoStScheduler: s} - s.ch = newChangeHandler(chImpl, s.actor) + // Initialize change handler. + + // callbacks is a union of the fullNodeFilteredAPI and ourselves. + callbacks := struct { + fullNodeFilteredAPI + *WindowPoStScheduler + }{s.api, s} + + s.ch = newChangeHandler(callbacks, s.actor) defer s.ch.shutdown() s.ch.start() - var notifs <-chan []*api.HeadChange - var err error - var gotCur bool + var ( + notifs <-chan []*api.HeadChange + err error + gotCur bool + ) // not fine to panic after this point for { From c77f8fb382c424497acff91984e3b8632fb88a01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Fri, 14 May 2021 21:00:13 +0100 Subject: [PATCH 11/13] adopt clearer method names; fix typo. --- chain/store/store.go | 2 +- storage/wdpost_changehandler.go | 2 +- storage/wdpost_changehandler_test.go | 2 +- storage/wdpost_run.go | 16 ++++++++-------- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/chain/store/store.go b/chain/store/store.go index 7caddbd5c..7318a1007 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -1243,7 +1243,7 @@ func (cs *ChainStore) ReadMsgMetaCids(mmc cid.Cid) ([]cid.Cid, []cid.Cid, error) return blscids, secpkcids, nil } -// GetPath returns returns the sequence of atomic head change operations that +// GetPath returns the sequence of atomic head change operations that // need to be applied in order to switch the head of the chain from the `from` // tipset to the `to` tipset. func (cs *ChainStore) GetPath(ctx context.Context, from types.TipSetKey, to types.TipSetKey) ([]*api.HeadChange, error) { diff --git a/storage/wdpost_changehandler.go b/storage/wdpost_changehandler.go index 8bcd7164e..8b519aedd 100644 --- a/storage/wdpost_changehandler.go +++ b/storage/wdpost_changehandler.go @@ -27,7 +27,7 @@ type changeHandlerAPI interface { startGeneratePoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, onComplete CompleteGeneratePoSTCb) context.CancelFunc startSubmitPoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, posts []miner.SubmitWindowedPoStParams, onComplete CompleteSubmitPoSTCb) context.CancelFunc onAbort(ts *types.TipSet, deadline *dline.Info) - failPost(err error, ts *types.TipSet, deadline *dline.Info) + recordPoStFailure(err error, ts *types.TipSet, deadline *dline.Info) } type changeHandler struct { diff --git a/storage/wdpost_changehandler_test.go b/storage/wdpost_changehandler_test.go index bae4f40fd..a2283cb7c 100644 --- a/storage/wdpost_changehandler_test.go +++ b/storage/wdpost_changehandler_test.go @@ -191,7 +191,7 @@ func (m *mockAPI) wasAbortCalled() bool { return m.abortCalled } -func (m *mockAPI) failPost(err error, ts *types.TipSet, deadline *dline.Info) { +func (m *mockAPI) recordPoStFailure(err error, ts *types.TipSet, deadline *dline.Info) { } func (m *mockAPI) setChangeHandler(ch *changeHandler) { diff --git a/storage/wdpost_run.go b/storage/wdpost_run.go index edb59a64f..b4c702197 100644 --- a/storage/wdpost_run.go +++ b/storage/wdpost_run.go @@ -31,8 +31,8 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) -// failPost records a failure in the journal. -func (s *WindowPoStScheduler) failPost(err error, ts *types.TipSet, deadline *dline.Info) { +// recordPoStFailure records a failure in the journal. +func (s *WindowPoStScheduler) recordPoStFailure(err error, ts *types.TipSet, deadline *dline.Info) { s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} { c := evtCommon{Error: err} if ts != nil { @@ -100,9 +100,9 @@ func (s *WindowPoStScheduler) runGeneratePoST( ctx, span := trace.StartSpan(ctx, "WindowPoStScheduler.generatePoST") defer span.End() - posts, err := s.runPost(ctx, *deadline, ts) + posts, err := s.runPoStCycle(ctx, *deadline, ts) if err != nil { - log.Errorf("runPost failed: %+v", err) + log.Errorf("runPoStCycle failed: %+v", err) return nil, err } @@ -441,18 +441,18 @@ func (s *WindowPoStScheduler) declareFaults(ctx context.Context, dlIdx uint64, p return faults, sm, nil } -// runPost runs a full cycle of the PoSt process: +// runPoStCycle runs a full cycle of the PoSt process: // // 1. performs recovery declarations for the next deadline. // 2. performs fault declarations for the next deadline. // 3. computes and submits proofs, batching partitions and making sure they // don't exceed message capacity. -func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *types.TipSet) ([]miner.SubmitWindowedPoStParams, error) { - ctx, span := trace.StartSpan(ctx, "storage.runPost") +func (s *WindowPoStScheduler) runPoStCycle(ctx context.Context, di dline.Info, ts *types.TipSet) ([]miner.SubmitWindowedPoStParams, error) { + ctx, span := trace.StartSpan(ctx, "storage.runPoStCycle") defer span.End() go func() { - // TODO: extract from runPost, run on fault cutoff boundaries + // TODO: extract from runPoStCycle, run on fault cutoff boundaries // check faults / recoveries for the *next* deadline. It's already too // late to declare them for this deadline From 50360e68aeb486ca62374f8652c0de4b6e7613bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Fri, 14 May 2021 21:04:35 +0100 Subject: [PATCH 12/13] rename {changeHandlerAPI=>wdPoStCommands} + add docs. --- storage/wdpost_changehandler.go | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/storage/wdpost_changehandler.go b/storage/wdpost_changehandler.go index 8b519aedd..7b80f2744 100644 --- a/storage/wdpost_changehandler.go +++ b/storage/wdpost_changehandler.go @@ -21,7 +21,9 @@ const ( type CompleteGeneratePoSTCb func(posts []miner.SubmitWindowedPoStParams, err error) type CompleteSubmitPoSTCb func(err error) -type changeHandlerAPI interface { +// wdPoStCommands is the subset of the WindowPoStScheduler + full node APIs used +// by the changeHandler to execute actions and query state. +type wdPoStCommands interface { StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error) startGeneratePoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, onComplete CompleteGeneratePoSTCb) context.CancelFunc @@ -31,13 +33,13 @@ type changeHandlerAPI interface { } type changeHandler struct { - api changeHandlerAPI + api wdPoStCommands actor address.Address proveHdlr *proveHandler submitHdlr *submitHandler } -func newChangeHandler(api changeHandlerAPI, actor address.Address) *changeHandler { +func newChangeHandler(api wdPoStCommands, actor address.Address) *changeHandler { posts := newPostsCache() p := newProver(api, posts) s := newSubmitter(api, posts) @@ -147,7 +149,7 @@ type postResult struct { // proveHandler generates proofs type proveHandler struct { - api changeHandlerAPI + api wdPoStCommands posts *postsCache postResults chan *postResult @@ -164,7 +166,7 @@ type proveHandler struct { } func newProver( - api changeHandlerAPI, + api wdPoStCommands, posts *postsCache, ) *proveHandler { ctx, cancel := context.WithCancel(context.Background()) @@ -249,7 +251,7 @@ func (p *proveHandler) processPostResult(res *postResult) { di := res.currPost.di if res.err != nil { // Proving failed so inform the API - p.api.failPost(res.err, res.ts, di) + p.api.recordPoStFailure(res.err, res.ts, di) log.Warnf("Aborted window post Proving (Deadline: %+v)", di) p.api.onAbort(res.ts, di) @@ -296,7 +298,7 @@ type postInfo struct { // submitHandler submits proofs on-chain type submitHandler struct { - api changeHandlerAPI + api wdPoStCommands posts *postsCache submitResults chan *submitResult @@ -320,7 +322,7 @@ type submitHandler struct { } func newSubmitter( - api changeHandlerAPI, + api wdPoStCommands, posts *postsCache, ) *submitHandler { ctx, cancel := context.WithCancel(context.Background()) @@ -489,7 +491,7 @@ func (s *submitHandler) submitIfReady(ctx context.Context, advance *types.TipSet func (s *submitHandler) processSubmitResult(res *submitResult) { if res.err != nil { // Submit failed so inform the API and go back to the start state - s.api.failPost(res.err, res.pw.ts, res.pw.di) + s.api.recordPoStFailure(res.err, res.pw.ts, res.pw.di) log.Warnf("Aborted window post Submitting (Deadline: %+v)", res.pw.di) s.api.onAbort(res.pw.ts, res.pw.di) From 996feda1f75ea491f8da1b7593934f71f7701fe8 Mon Sep 17 00:00:00 2001 From: raulk Date: Wed, 19 May 2021 15:08:14 +0100 Subject: [PATCH 13/13] typo. Co-authored-by: dirkmc --- chain/store/store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/store/store.go b/chain/store/store.go index 7318a1007..f8f1b0c49 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -858,7 +858,7 @@ func (cs *ChainStore) NearestCommonAncestor(a, b *types.TipSet) (*types.TipSet, // a common ancestor. It then returns the respective chain segments that fork // from the identified ancestor, in reverse order, where the first element of // each slice is the supplied tipset, and the last element is the common -// ancenstor. +// ancestor. // // If an error happens along the way, we return the error with nil slices. func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.TipSet, error) {