Merge pull request #6259 from filecoin-project/raulk/docs-storage

`storage` module: add go docs and minor code quality refactors
This commit is contained in:
Łukasz Magiera 2021-05-20 21:42:30 +02:00 committed by GitHub
commit 5f2e163bbb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 195 additions and 70 deletions

View File

@ -853,6 +853,14 @@ func (cs *ChainStore) NearestCommonAncestor(a, b *types.TipSet) (*types.TipSet,
return cs.LoadTipSet(l[len(l)-1].Parents()) return cs.LoadTipSet(l[len(l)-1].Parents())
} }
// ReorgOps takes two tipsets (which can be at different heights), and walks
// their corresponding chains backwards one step at a time until we find
// a common ancestor. It then returns the respective chain segments that fork
// from the identified ancestor, in reverse order, where the first element of
// each slice is the supplied tipset, and the last element is the common
// ancestor.
//
// If an error happens along the way, we return the error with nil slices.
func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.TipSet, error) { func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.TipSet, error) {
return ReorgOps(cs.LoadTipSet, a, b) return ReorgOps(cs.LoadTipSet, a, b)
} }
@ -1235,6 +1243,9 @@ func (cs *ChainStore) ReadMsgMetaCids(mmc cid.Cid) ([]cid.Cid, []cid.Cid, error)
return blscids, secpkcids, nil return blscids, secpkcids, nil
} }
// GetPath returns the sequence of atomic head change operations that
// need to be applied in order to switch the head of the chain from the `from`
// tipset to the `to` tipset.
func (cs *ChainStore) GetPath(ctx context.Context, from types.TipSetKey, to types.TipSetKey) ([]*api.HeadChange, error) { func (cs *ChainStore) GetPath(ctx context.Context, from types.TipSetKey, to types.TipSetKey) ([]*api.HeadChange, error) {
fts, err := cs.LoadTipSet(from) fts, err := cs.LoadTipSet(from)
if err != nil { if err != nil {

View File

@ -40,7 +40,10 @@ type BasicPreCommitPolicy struct {
duration abi.ChainEpoch duration abi.ChainEpoch
} }
// NewBasicPreCommitPolicy produces a BasicPreCommitPolicy // NewBasicPreCommitPolicy produces a BasicPreCommitPolicy.
//
// The provided duration is used as the default sector expiry when the sector
// contains no deals. The proving boundary is used to adjust/align the sector's expiration.
func NewBasicPreCommitPolicy(api Chain, duration abi.ChainEpoch, provingBoundary abi.ChainEpoch) BasicPreCommitPolicy { func NewBasicPreCommitPolicy(api Chain, duration abi.ChainEpoch, provingBoundary abi.ChainEpoch) BasicPreCommitPolicy {
return BasicPreCommitPolicy{ return BasicPreCommitPolicy{
api: api, api: api,

View File

@ -31,10 +31,10 @@ import (
var _ sealing.SealingAPI = new(SealingAPIAdapter) var _ sealing.SealingAPI = new(SealingAPIAdapter)
type SealingAPIAdapter struct { type SealingAPIAdapter struct {
delegate storageMinerApi delegate fullNodeFilteredAPI
} }
func NewSealingAPIAdapter(api storageMinerApi) SealingAPIAdapter { func NewSealingAPIAdapter(api fullNodeFilteredAPI) SealingAPIAdapter {
return SealingAPIAdapter{delegate: api} return SealingAPIAdapter{delegate: api}
} }

View File

@ -5,10 +5,6 @@ import (
"errors" "errors"
"time" "time"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/go-state-types/dline"
"github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-bitfield"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
@ -20,9 +16,13 @@ import (
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/dline"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/specs-storage/storage"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/v1api" "github.com/filecoin-project/lotus/api/v1api"
@ -41,8 +41,16 @@ import (
var log = logging.Logger("storageminer") var log = logging.Logger("storageminer")
// Miner is the central miner entrypoint object inside Lotus. It is
// instantiated in the node builder, along with the WindowPoStScheduler.
//
// This object is the owner of the sealing pipeline. Most of the actual logic
// lives in the storage-sealing module (sealing.Sealing), and the Miner object
// exposes it to the rest of the system by proxying calls.
//
// Miner#Run starts the sealing FSM.
type Miner struct { type Miner struct {
api storageMinerApi api fullNodeFilteredAPI
feeCfg config.MinerFeeConfig feeCfg config.MinerFeeConfig
h host.Host h host.Host
sealer sectorstorage.SectorManager sealer sectorstorage.SectorManager
@ -70,7 +78,9 @@ type SealingStateEvt struct {
Error string Error string
} }
type storageMinerApi interface { // fullNodeFilteredAPI is the subset of the full node API the Miner needs from
// a Lotus full node.
type fullNodeFilteredAPI interface {
// Call a read only method on actors (no interaction with the chain required) // Call a read only method on actors (no interaction with the chain required)
StateCall(context.Context, *types.Message, types.TipSetKey) (*api.InvocResult, error) StateCall(context.Context, *types.Message, types.TipSetKey) (*api.InvocResult, error)
StateMinerSectors(context.Context, address.Address, *bitfield.BitField, types.TipSetKey) ([]*miner.SectorOnChainInfo, error) StateMinerSectors(context.Context, address.Address, *bitfield.BitField, types.TipSetKey) ([]*miner.SectorOnChainInfo, error)
@ -116,7 +126,18 @@ type storageMinerApi interface {
WalletHas(context.Context, address.Address) (bool, error) WalletHas(context.Context, address.Address) (bool, error)
} }
func NewMiner(api storageMinerApi, maddr address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc, feeCfg config.MinerFeeConfig, journal journal.Journal, as *AddressSelector) (*Miner, error) { // NewMiner creates a new Miner object.
func NewMiner(api fullNodeFilteredAPI,
maddr address.Address,
h host.Host,
ds datastore.Batching,
sealer sectorstorage.SectorManager,
sc sealing.SectorIDCounter,
verif ffiwrapper.Verifier,
gsd dtypes.GetSealingConfigFunc,
feeCfg config.MinerFeeConfig,
journal journal.Journal,
as *AddressSelector) (*Miner, error) {
m := &Miner{ m := &Miner{
api: api, api: api,
feeCfg: feeCfg, feeCfg: feeCfg,
@ -136,6 +157,7 @@ func NewMiner(api storageMinerApi, maddr address.Address, h host.Host, ds datast
return m, nil return m, nil
} }
// Run starts the sealing FSM in the background, running preliminary checks first.
func (m *Miner) Run(ctx context.Context) error { func (m *Miner) Run(ctx context.Context) error {
if err := m.runPreflightChecks(ctx); err != nil { if err := m.runPreflightChecks(ctx); err != nil {
return xerrors.Errorf("miner preflight checks failed: %w", err) return xerrors.Errorf("miner preflight checks failed: %w", err)
@ -152,17 +174,37 @@ func (m *Miner) Run(ctx context.Context) error {
MaxTerminateGasFee: abi.TokenAmount(m.feeCfg.MaxTerminateGasFee), MaxTerminateGasFee: abi.TokenAmount(m.feeCfg.MaxTerminateGasFee),
} }
evts := events.NewEvents(ctx, m.api) var (
adaptedAPI := NewSealingAPIAdapter(m.api) // consumer of chain head changes.
// TODO: Maybe we update this policy after actor upgrades? evts = events.NewEvents(ctx, m.api)
pcp := sealing.NewBasicPreCommitPolicy(adaptedAPI, policy.GetMaxSectorExpirationExtension()-(md.WPoStProvingPeriod*2), md.PeriodStart%md.WPoStProvingPeriod) evtsAdapter = NewEventsAdapter(evts)
as := func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) { // Create a shim to glue the API required by the sealing component
return m.addrSel.AddressFor(ctx, m.api, mi, use, goodFunds, minFunds) // with the API that Lotus is capable of providing.
} // The shim translates between "tipset tokens" and tipset keys, and
// provides extra methods.
adaptedAPI = NewSealingAPIAdapter(m.api)
m.sealing = sealing.New(adaptedAPI, fc, NewEventsAdapter(evts), m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp, sealing.GetSealingConfigFunc(m.getSealConfig), m.handleSealingNotifications, as) // Instantiate a precommit policy.
defaultDuration = policy.GetMaxSectorExpirationExtension() - (md.WPoStProvingPeriod * 2)
provingBoundary = md.PeriodStart % md.WPoStProvingPeriod
// TODO: Maybe we update this policy after actor upgrades?
pcp = sealing.NewBasicPreCommitPolicy(adaptedAPI, defaultDuration, provingBoundary)
// address selector.
as = func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
return m.addrSel.AddressFor(ctx, m.api, mi, use, goodFunds, minFunds)
}
// sealing configuration.
cfg = sealing.GetSealingConfigFunc(m.getSealConfig)
)
// Instantiate the sealing FSM.
m.sealing = sealing.New(adaptedAPI, fc, evtsAdapter, m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp, cfg, m.handleSealingNotifications, as)
// Run the sealing FSM.
go m.sealing.Run(ctx) //nolint:errcheck // logged intside the function go m.sealing.Run(ctx) //nolint:errcheck // logged intside the function
return nil return nil
@ -184,6 +226,7 @@ func (m *Miner) Stop(ctx context.Context) error {
return m.sealing.Stop(ctx) return m.sealing.Stop(ctx)
} }
// runPreflightChecks verifies that preconditions to run the miner are satisfied.
func (m *Miner) runPreflightChecks(ctx context.Context) error { func (m *Miner) runPreflightChecks(ctx context.Context) error {
mi, err := m.api.StateMinerInfo(ctx, m.maddr, types.EmptyTSK) mi, err := m.api.StateMinerInfo(ctx, m.maddr, types.EmptyTSK)
if err != nil { if err != nil {

View File

@ -21,22 +21,25 @@ const (
type CompleteGeneratePoSTCb func(posts []miner.SubmitWindowedPoStParams, err error) type CompleteGeneratePoSTCb func(posts []miner.SubmitWindowedPoStParams, err error)
type CompleteSubmitPoSTCb func(err error) type CompleteSubmitPoSTCb func(err error)
type changeHandlerAPI interface { // wdPoStCommands is the subset of the WindowPoStScheduler + full node APIs used
// by the changeHandler to execute actions and query state.
type wdPoStCommands interface {
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error) StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error)
startGeneratePoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, onComplete CompleteGeneratePoSTCb) context.CancelFunc startGeneratePoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, onComplete CompleteGeneratePoSTCb) context.CancelFunc
startSubmitPoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, posts []miner.SubmitWindowedPoStParams, onComplete CompleteSubmitPoSTCb) context.CancelFunc startSubmitPoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, posts []miner.SubmitWindowedPoStParams, onComplete CompleteSubmitPoSTCb) context.CancelFunc
onAbort(ts *types.TipSet, deadline *dline.Info) onAbort(ts *types.TipSet, deadline *dline.Info)
failPost(err error, ts *types.TipSet, deadline *dline.Info) recordPoStFailure(err error, ts *types.TipSet, deadline *dline.Info)
} }
type changeHandler struct { type changeHandler struct {
api changeHandlerAPI api wdPoStCommands
actor address.Address actor address.Address
proveHdlr *proveHandler proveHdlr *proveHandler
submitHdlr *submitHandler submitHdlr *submitHandler
} }
func newChangeHandler(api changeHandlerAPI, actor address.Address) *changeHandler { func newChangeHandler(api wdPoStCommands, actor address.Address) *changeHandler {
posts := newPostsCache() posts := newPostsCache()
p := newProver(api, posts) p := newProver(api, posts)
s := newSubmitter(api, posts) s := newSubmitter(api, posts)
@ -146,7 +149,7 @@ type postResult struct {
// proveHandler generates proofs // proveHandler generates proofs
type proveHandler struct { type proveHandler struct {
api changeHandlerAPI api wdPoStCommands
posts *postsCache posts *postsCache
postResults chan *postResult postResults chan *postResult
@ -163,7 +166,7 @@ type proveHandler struct {
} }
func newProver( func newProver(
api changeHandlerAPI, api wdPoStCommands,
posts *postsCache, posts *postsCache,
) *proveHandler { ) *proveHandler {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -248,7 +251,7 @@ func (p *proveHandler) processPostResult(res *postResult) {
di := res.currPost.di di := res.currPost.di
if res.err != nil { if res.err != nil {
// Proving failed so inform the API // Proving failed so inform the API
p.api.failPost(res.err, res.ts, di) p.api.recordPoStFailure(res.err, res.ts, di)
log.Warnf("Aborted window post Proving (Deadline: %+v)", di) log.Warnf("Aborted window post Proving (Deadline: %+v)", di)
p.api.onAbort(res.ts, di) p.api.onAbort(res.ts, di)
@ -295,7 +298,7 @@ type postInfo struct {
// submitHandler submits proofs on-chain // submitHandler submits proofs on-chain
type submitHandler struct { type submitHandler struct {
api changeHandlerAPI api wdPoStCommands
posts *postsCache posts *postsCache
submitResults chan *submitResult submitResults chan *submitResult
@ -319,7 +322,7 @@ type submitHandler struct {
} }
func newSubmitter( func newSubmitter(
api changeHandlerAPI, api wdPoStCommands,
posts *postsCache, posts *postsCache,
) *submitHandler { ) *submitHandler {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -488,7 +491,7 @@ func (s *submitHandler) submitIfReady(ctx context.Context, advance *types.TipSet
func (s *submitHandler) processSubmitResult(res *submitResult) { func (s *submitHandler) processSubmitResult(res *submitResult) {
if res.err != nil { if res.err != nil {
// Submit failed so inform the API and go back to the start state // Submit failed so inform the API and go back to the start state
s.api.failPost(res.err, res.pw.ts, res.pw.di) s.api.recordPoStFailure(res.err, res.pw.ts, res.pw.di)
log.Warnf("Aborted window post Submitting (Deadline: %+v)", res.pw.di) log.Warnf("Aborted window post Submitting (Deadline: %+v)", res.pw.di)
s.api.onAbort(res.pw.ts, res.pw.di) s.api.onAbort(res.pw.ts, res.pw.di)

View File

@ -191,7 +191,7 @@ func (m *mockAPI) wasAbortCalled() bool {
return m.abortCalled return m.abortCalled
} }
func (m *mockAPI) failPost(err error, ts *types.TipSet, deadline *dline.Info) { func (m *mockAPI) recordPoStFailure(err error, ts *types.TipSet, deadline *dline.Info) {
} }
func (m *mockAPI) setChangeHandler(ch *changeHandler) { func (m *mockAPI) setChangeHandler(ch *changeHandler) {

View File

@ -31,7 +31,8 @@ import (
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
) )
func (s *WindowPoStScheduler) failPost(err error, ts *types.TipSet, deadline *dline.Info) { // recordPoStFailure records a failure in the journal.
func (s *WindowPoStScheduler) recordPoStFailure(err error, ts *types.TipSet, deadline *dline.Info) {
s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} { s.journal.RecordEvent(s.evtTypes[evtTypeWdPoStScheduler], func() interface{} {
c := evtCommon{Error: err} c := evtCommon{Error: err}
if ts != nil { if ts != nil {
@ -99,9 +100,9 @@ func (s *WindowPoStScheduler) runGeneratePoST(
ctx, span := trace.StartSpan(ctx, "WindowPoStScheduler.generatePoST") ctx, span := trace.StartSpan(ctx, "WindowPoStScheduler.generatePoST")
defer span.End() defer span.End()
posts, err := s.runPost(ctx, *deadline, ts) posts, err := s.runPoStCycle(ctx, *deadline, ts)
if err != nil { if err != nil {
log.Errorf("runPost failed: %+v", err) log.Errorf("runPoStCycle failed: %+v", err)
return nil, err return nil, err
} }
@ -167,7 +168,7 @@ func (s *WindowPoStScheduler) runSubmitPoST(
commRand, err := s.api.ChainGetRandomnessFromTickets(ctx, ts.Key(), crypto.DomainSeparationTag_PoStChainCommit, commEpoch, nil) commRand, err := s.api.ChainGetRandomnessFromTickets(ctx, ts.Key(), crypto.DomainSeparationTag_PoStChainCommit, commEpoch, nil)
if err != nil { if err != nil {
err = xerrors.Errorf("failed to get chain randomness from tickets for windowPost (ts=%d; deadline=%d): %w", ts.Height(), commEpoch, err) err = xerrors.Errorf("failed to get chain randomness from tickets for windowPost (ts=%d; deadline=%d): %w", ts.Height(), commEpoch, err)
log.Errorf("submitPost failed: %+v", err) log.Errorf("submitPoStMessage failed: %+v", err)
return err return err
} }
@ -180,7 +181,7 @@ func (s *WindowPoStScheduler) runSubmitPoST(
post.ChainCommitRand = commRand post.ChainCommitRand = commRand
// Submit PoST // Submit PoST
sm, submitErr := s.submitPost(ctx, post) sm, submitErr := s.submitPoStMessage(ctx, post)
if submitErr != nil { if submitErr != nil {
log.Errorf("submit window post failed: %+v", submitErr) log.Errorf("submit window post failed: %+v", submitErr)
} else { } else {
@ -233,8 +234,25 @@ func (s *WindowPoStScheduler) checkSectors(ctx context.Context, check bitfield.B
return sbf, nil return sbf, nil
} }
func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([]miner.RecoveryDeclaration, *types.SignedMessage, error) { // declareRecoveries identifies sectors that were previously marked as faulty
ctx, span := trace.StartSpan(ctx, "storage.checkNextRecoveries") // for our miner, but are now recovered (i.e. are now provable again) and
// still not reported as such.
//
// It then reports the recovery on chain via a `DeclareFaultsRecovered`
// message to our miner actor.
//
// This is always invoked ahead of time, before the deadline for the evaluated
// sectors arrives. That way, recoveries are declared in preparation for those
// sectors to be proven.
//
// If a declaration is made, it awaits for build.MessageConfidence confirmations
// on chain before returning.
//
// TODO: the waiting should happen in the background. Right now this
// is blocking/delaying the actual generation and submission of WindowPoSts in
// this deadline!
func (s *WindowPoStScheduler) declareRecoveries(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([]miner.RecoveryDeclaration, *types.SignedMessage, error) {
ctx, span := trace.StartSpan(ctx, "storage.declareRecoveries")
defer span.End() defer span.End()
faulty := uint64(0) faulty := uint64(0)
@ -302,7 +320,7 @@ func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uin
Value: types.NewInt(0), Value: types.NewInt(0),
} }
spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)} spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}
if err := s.setSender(ctx, msg, spec); err != nil { if err := s.prepareMessage(ctx, msg, spec); err != nil {
return recoveries, nil, err return recoveries, nil, err
} }
@ -325,8 +343,21 @@ func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uin
return recoveries, sm, nil return recoveries, sm, nil
} }
func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([]miner.FaultDeclaration, *types.SignedMessage, error) { // declareFaults identifies the sectors on the specified proving deadline that
ctx, span := trace.StartSpan(ctx, "storage.checkNextFaults") // are faulty, and reports the faults on chain via the `DeclareFaults` message
// to our miner actor.
//
// This is always invoked ahead of time, before the deadline for the evaluated
// sectors arrives. That way, faults are declared before a penalty is accrued.
//
// If a declaration is made, it awaits for build.MessageConfidence confirmations
// on chain before returning.
//
// TODO: the waiting should happen in the background. Right now this
// is blocking/delaying the actual generation and submission of WindowPoSts in
// this deadline!
func (s *WindowPoStScheduler) declareFaults(ctx context.Context, dlIdx uint64, partitions []api.Partition, tsk types.TipSetKey) ([]miner.FaultDeclaration, *types.SignedMessage, error) {
ctx, span := trace.StartSpan(ctx, "storage.declareFaults")
defer span.End() defer span.End()
bad := uint64(0) bad := uint64(0)
@ -387,7 +418,7 @@ func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64,
Value: types.NewInt(0), // TODO: Is there a fee? Value: types.NewInt(0), // TODO: Is there a fee?
} }
spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)} spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}
if err := s.setSender(ctx, msg, spec); err != nil { if err := s.prepareMessage(ctx, msg, spec); err != nil {
return faults, nil, err return faults, nil, err
} }
@ -410,12 +441,18 @@ func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64,
return faults, sm, nil return faults, sm, nil
} }
func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *types.TipSet) ([]miner.SubmitWindowedPoStParams, error) { // runPoStCycle runs a full cycle of the PoSt process:
ctx, span := trace.StartSpan(ctx, "storage.runPost") //
// 1. performs recovery declarations for the next deadline.
// 2. performs fault declarations for the next deadline.
// 3. computes and submits proofs, batching partitions and making sure they
// don't exceed message capacity.
func (s *WindowPoStScheduler) runPoStCycle(ctx context.Context, di dline.Info, ts *types.TipSet) ([]miner.SubmitWindowedPoStParams, error) {
ctx, span := trace.StartSpan(ctx, "storage.runPoStCycle")
defer span.End() defer span.End()
go func() { go func() {
// TODO: extract from runPost, run on fault cutoff boundaries // TODO: extract from runPoStCycle, run on fault cutoff boundaries
// check faults / recoveries for the *next* deadline. It's already too // check faults / recoveries for the *next* deadline. It's already too
// late to declare them for this deadline // late to declare them for this deadline
@ -443,7 +480,7 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty
} }
) )
if recoveries, sigmsg, err = s.checkNextRecoveries(context.TODO(), declDeadline, partitions, ts.Key()); err != nil { if recoveries, sigmsg, err = s.declareRecoveries(context.TODO(), declDeadline, partitions, ts.Key()); err != nil {
// TODO: This is potentially quite bad, but not even trying to post when this fails is objectively worse // TODO: This is potentially quite bad, but not even trying to post when this fails is objectively worse
log.Errorf("checking sector recoveries: %v", err) log.Errorf("checking sector recoveries: %v", err)
} }
@ -462,7 +499,7 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty
return // FORK: declaring faults after ignition upgrade makes no sense return // FORK: declaring faults after ignition upgrade makes no sense
} }
if faults, sigmsg, err = s.checkNextFaults(context.TODO(), declDeadline, partitions, ts.Key()); err != nil { if faults, sigmsg, err = s.declareFaults(context.TODO(), declDeadline, partitions, ts.Key()); err != nil {
// TODO: This is also potentially really bad, but we try to post anyways // TODO: This is also potentially really bad, but we try to post anyways
log.Errorf("checking sector faults: %v", err) log.Errorf("checking sector faults: %v", err)
} }
@ -755,7 +792,10 @@ func (s *WindowPoStScheduler) sectorsForProof(ctx context.Context, goodSectors,
return proofSectors, nil return proofSectors, nil
} }
func (s *WindowPoStScheduler) submitPost(ctx context.Context, proof *miner.SubmitWindowedPoStParams) (*types.SignedMessage, error) { // submitPoStMessage builds a SubmitWindowedPoSt message and submits it to
// the mpool. It doesn't synchronously block on confirmations, but it does
// monitor in the background simply for the purposes of logging.
func (s *WindowPoStScheduler) submitPoStMessage(ctx context.Context, proof *miner.SubmitWindowedPoStParams) (*types.SignedMessage, error) {
ctx, span := trace.StartSpan(ctx, "storage.commitPost") ctx, span := trace.StartSpan(ctx, "storage.commitPost")
defer span.End() defer span.End()
@ -773,13 +813,11 @@ func (s *WindowPoStScheduler) submitPost(ctx context.Context, proof *miner.Submi
Value: types.NewInt(0), Value: types.NewInt(0),
} }
spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)} spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}
if err := s.setSender(ctx, msg, spec); err != nil { if err := s.prepareMessage(ctx, msg, spec); err != nil {
return nil, err return nil, err
} }
// TODO: consider maybe caring about the output
sm, err := s.api.MpoolPushMessage(ctx, msg, spec) sm, err := s.api.MpoolPushMessage(ctx, msg, spec)
if err != nil { if err != nil {
return nil, xerrors.Errorf("pushing message to mpool: %w", err) return nil, xerrors.Errorf("pushing message to mpool: %w", err)
} }
@ -803,14 +841,20 @@ func (s *WindowPoStScheduler) submitPost(ctx context.Context, proof *miner.Submi
return sm, nil return sm, nil
} }
func (s *WindowPoStScheduler) setSender(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) error { // prepareMessage prepares a message before sending it, setting:
//
// * the sender (from the AddressSelector, falling back to the worker address if none set)
// * the right gas parameters
func (s *WindowPoStScheduler) prepareMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) error {
mi, err := s.api.StateMinerInfo(ctx, s.actor, types.EmptyTSK) mi, err := s.api.StateMinerInfo(ctx, s.actor, types.EmptyTSK)
if err != nil { if err != nil {
return xerrors.Errorf("error getting miner info: %w", err) return xerrors.Errorf("error getting miner info: %w", err)
} }
// use the worker as a fallback // set the worker as a fallback
msg.From = mi.Worker msg.From = mi.Worker
// (optimal) initial estimation with some overestimation that guarantees
// block inclusion within the next 20 tipsets.
gm, err := s.api.GasEstimateMessageGas(ctx, msg, spec, types.EmptyTSK) gm, err := s.api.GasEstimateMessageGas(ctx, msg, spec, types.EmptyTSK)
if err != nil { if err != nil {
log.Errorw("estimating gas", "error", err) log.Errorw("estimating gas", "error", err)
@ -818,7 +862,9 @@ func (s *WindowPoStScheduler) setSender(ctx context.Context, msg *types.Message,
} }
*msg = *gm *msg = *gm
// estimate // calculate a more frugal estimation; premium is estimated to guarantee
// inclusion within 5 tipsets, and fee cap is estimated for inclusion
// within 4 tipsets.
minGasFeeMsg := *msg minGasFeeMsg := *msg
minGasFeeMsg.GasPremium, err = s.api.GasEstimateGasPremium(ctx, 5, msg.From, msg.GasLimit, types.EmptyTSK) minGasFeeMsg.GasPremium, err = s.api.GasEstimateGasPremium(ctx, 5, msg.From, msg.GasLimit, types.EmptyTSK)
@ -833,6 +879,8 @@ func (s *WindowPoStScheduler) setSender(ctx context.Context, msg *types.Message,
minGasFeeMsg.GasFeeCap = msg.GasFeeCap minGasFeeMsg.GasFeeCap = msg.GasFeeCap
} }
// goodFunds = funds needed for optimal inclusion probability.
// minFunds = funds needed for more speculative inclusion probability.
goodFunds := big.Add(msg.RequiredFunds(), msg.Value) goodFunds := big.Add(msg.RequiredFunds(), msg.Value)
minFunds := big.Min(big.Add(minGasFeeMsg.RequiredFunds(), minGasFeeMsg.Value), goodFunds) minFunds := big.Min(big.Add(minGasFeeMsg.RequiredFunds(), minGasFeeMsg.Value), goodFunds)

View File

@ -35,7 +35,7 @@ import (
type mockStorageMinerAPI struct { type mockStorageMinerAPI struct {
partitions []api.Partition partitions []api.Partition
pushedMessages chan *types.Message pushedMessages chan *types.Message
storageMinerApi fullNodeFilteredAPI
} }
func newMockStorageMinerAPI() *mockStorageMinerAPI { func newMockStorageMinerAPI() *mockStorageMinerAPI {
@ -389,4 +389,4 @@ func (m *mockStorageMinerAPI) WalletHas(ctx context.Context, address address.Add
return true, nil return true, nil
} }
var _ storageMinerApi = &mockStorageMinerAPI{} var _ fullNodeFilteredAPI = &mockStorageMinerAPI{}

View File

@ -23,8 +23,14 @@ import (
"go.opencensus.io/trace" "go.opencensus.io/trace"
) )
// WindowPoStScheduler is the coordinator for WindowPoSt submissions, fault
// declaration, and recovery declarations. It watches the chain for reverts and
// applies, and schedules/run those processes as partition deadlines arrive.
//
// WindowPoStScheduler watches the chain though the changeHandler, which in turn
// turn calls the scheduler when the time arrives to do work.
type WindowPoStScheduler struct { type WindowPoStScheduler struct {
api storageMinerApi api fullNodeFilteredAPI
feeCfg config.MinerFeeConfig feeCfg config.MinerFeeConfig
addrSel *AddressSelector addrSel *AddressSelector
prover storage.Prover prover storage.Prover
@ -43,7 +49,15 @@ type WindowPoStScheduler struct {
// failLk sync.Mutex // failLk sync.Mutex
} }
func NewWindowedPoStScheduler(api storageMinerApi, fc config.MinerFeeConfig, as *AddressSelector, sb storage.Prover, verif ffiwrapper.Verifier, ft sectorstorage.FaultTracker, j journal.Journal, actor address.Address) (*WindowPoStScheduler, error) { // NewWindowedPoStScheduler creates a new WindowPoStScheduler scheduler.
func NewWindowedPoStScheduler(api fullNodeFilteredAPI,
cfg config.MinerFeeConfig,
as *AddressSelector,
sp storage.Prover,
verif ffiwrapper.Verifier,
ft sectorstorage.FaultTracker,
j journal.Journal,
actor address.Address) (*WindowPoStScheduler, error) {
mi, err := api.StateMinerInfo(context.TODO(), actor, types.EmptyTSK) mi, err := api.StateMinerInfo(context.TODO(), actor, types.EmptyTSK)
if err != nil { if err != nil {
return nil, xerrors.Errorf("getting sector size: %w", err) return nil, xerrors.Errorf("getting sector size: %w", err)
@ -51,9 +65,9 @@ func NewWindowedPoStScheduler(api storageMinerApi, fc config.MinerFeeConfig, as
return &WindowPoStScheduler{ return &WindowPoStScheduler{
api: api, api: api,
feeCfg: fc, feeCfg: cfg,
addrSel: as, addrSel: as,
prover: sb, prover: sp,
verifier: verif, verifier: verif,
faultTracker: ft, faultTracker: ft,
proofType: mi.WindowPoStProofType, proofType: mi.WindowPoStProofType,
@ -70,21 +84,24 @@ func NewWindowedPoStScheduler(api storageMinerApi, fc config.MinerFeeConfig, as
}, nil }, nil
} }
type changeHandlerAPIImpl struct {
storageMinerApi
*WindowPoStScheduler
}
func (s *WindowPoStScheduler) Run(ctx context.Context) { func (s *WindowPoStScheduler) Run(ctx context.Context) {
// Initialize change handler // Initialize change handler.
chImpl := &changeHandlerAPIImpl{storageMinerApi: s.api, WindowPoStScheduler: s}
s.ch = newChangeHandler(chImpl, s.actor) // callbacks is a union of the fullNodeFilteredAPI and ourselves.
callbacks := struct {
fullNodeFilteredAPI
*WindowPoStScheduler
}{s.api, s}
s.ch = newChangeHandler(callbacks, s.actor)
defer s.ch.shutdown() defer s.ch.shutdown()
s.ch.start() s.ch.start()
var notifs <-chan []*api.HeadChange var (
var err error notifs <-chan []*api.HeadChange
var gotCur bool err error
gotCur bool
)
// not fine to panic after this point // not fine to panic after this point
for { for {