Merge pull request #6661 from filecoin-project/release/v1.10.1

v1.10.1 to releases
This commit is contained in:
Jennifer 2021-07-05 21:21:33 -04:00 committed by GitHub
commit d920b47e7e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 240 additions and 100 deletions

View File

@ -1,5 +1,38 @@
# Lotus changelog # Lotus changelog
# 1.10.1 / 2021-07-05
This is an optional but **highly recommended** release of Lotus for lotus miners that has many bug fixes and improvements based on the feedback we got from the community since HyperDrive.
## New Features
- commit batch: AggregateAboveBaseFee config #6650
- `AggregateAboveBaseFee` is added to miner sealing configuration for setting the network base fee to start aggregating proofs. When the network base fee is lower than this value, the prove commits will be submitted individually via `ProveCommitSector`. According to the [Batch Incentive Alignment](https://github.com/filecoin-project/FIPs/blob/master/FIPS/fip-0013.md#batch-incentive-alignment) introduced in FIP-0013, we recommend miners to set this value to 0.15 nanoFIL(which is the default value) to avoid unexpected aggregation fee in burn and enjoy the most benefits of aggregation!
## Bug Fixes
- storage: Fix FinalizeSector with sectors in storage paths #6652
- Fix tiny error in check-client-datacap #6664
- Fix: precommit_batch method used the wrong cfg.PreCommitBatchWait #6658
- to optimize the batchwait #6636
- fix getTicket: sector precommitted but expired case #6635
- handleSubmitCommitAggregate() exception handling #6595
- remove precommit check in handleCommitFailed #6634
- ensure agg fee is adequate
- fix: miner balance is not enough, so that ProveCommitAggregate msg exec failed #6623
- commit batch: Initialize the FailedSectors map #6647
Contributors
| Contributor | Commits | Lines ± | Files Changed |
|-------------|---------|---------|---------------|
| @magik6k| 7 | +151/-56 | 21 |
| @llifezou | 4 | +59/-20 | 4 |
| @johnli-helloworld | 2 | +45/-14 | 4 |
| @wangchao | 1 | +1/-27 | 1 |
| Jerry | 2 | +9/-4 | 2 |
| @zhoutian527 | 1 | +2/-2 | 1 |
| @ribasushi| 1 | +1/-1 | 1 |
# 1.10.0 / 2021-06-23 # 1.10.0 / 2021-06-23
This is a mandatory release of Lotus that introduces Filecoin network v13, codenamed the HyperDrive upgrade. The This is a mandatory release of Lotus that introduces Filecoin network v13, codenamed the HyperDrive upgrade. The

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -29,7 +29,7 @@ func buildType() string {
} }
// BuildVersion is the local build version, set by build system // BuildVersion is the local build version, set by build system
const BuildVersion = "1.10.0" const BuildVersion = "1.10.1"
func UserVersion() string { func UserVersion() string {
return BuildVersion + buildType() + CurrentCommit return BuildVersion + buildType() + CurrentCommit

View File

@ -23,6 +23,11 @@ func (f FIL) Unitless() string {
return strings.TrimRight(strings.TrimRight(r.FloatString(18), "0"), ".") return strings.TrimRight(strings.TrimRight(r.FloatString(18), "0"), ".")
} }
var AttoFil = NewInt(1)
var FemtoFil = BigMul(AttoFil, NewInt(1000))
var PicoFil = BigMul(FemtoFil, NewInt(1000))
var NanoFil = BigMul(PicoFil, NewInt(1000))
var unitPrefixes = []string{"a", "f", "p", "n", "μ", "m"} var unitPrefixes = []string{"a", "f", "p", "n", "μ", "m"}
func (f FIL) Short() string { func (f FIL) Short() string {

View File

@ -211,7 +211,7 @@ var filplusCheckClientCmd = &cli.Command{
return err return err
} }
if dcap == nil { if dcap == nil {
return xerrors.Errorf("client %s is not a verified client", err) return xerrors.Errorf("client %s is not a verified client", caddr)
} }
fmt.Println(*dcap) fmt.Println(*dcap)

View File

@ -101,7 +101,7 @@ var storageAttachCmd = &cli.Command{
} }
if !(cfg.CanStore || cfg.CanSeal) { if !(cfg.CanStore || cfg.CanSeal) {
return xerrors.Errorf("must specify at least one of --store of --seal") return xerrors.Errorf("must specify at least one of --store or --seal")
} }
b, err := json.MarshalIndent(cfg, "", " ") b, err := json.MarshalIndent(cfg, "", " ")

View File

@ -145,7 +145,7 @@ over time
} }
if !(cfg.CanStore || cfg.CanSeal) { if !(cfg.CanStore || cfg.CanSeal) {
return xerrors.Errorf("must specify at least one of --store of --seal") return xerrors.Errorf("must specify at least one of --store or --seal")
} }
b, err := json.MarshalIndent(cfg, "", " ") b, err := json.MarshalIndent(cfg, "", " ")

View File

@ -587,10 +587,25 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
} }
} }
pathType := storiface.PathStorage
{
sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTSealed, 0, false)
if err != nil {
return xerrors.Errorf("finding sealed sector: %w", err)
}
for _, store := range sealedStores {
if store.CanSeal {
pathType = storiface.PathSealing
break
}
}
}
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, false) selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, false)
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector, err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, storiface.PathSealing, storiface.AcquireMove), m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, pathType, storiface.AcquireMove),
func(ctx context.Context, w Worker) error { func(ctx context.Context, w Worker) error {
_, err := m.waitSimpleCall(ctx)(w.FinalizeSector(ctx, sector, keepUnsealed)) _, err := m.waitSimpleCall(ctx)(w.FinalizeSector(ctx, sector, keepUnsealed))
return err return err

View File

@ -32,6 +32,9 @@ import (
const arp = abi.RegisteredAggregationProof_SnarkPackV1 const arp = abi.RegisteredAggregationProof_SnarkPackV1
var aggFeeNum = big.NewInt(110)
var aggFeeDen = big.NewInt(100)
type CommitBatcherApi interface { type CommitBatcherApi interface {
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error) SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error) StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error)
@ -101,6 +104,7 @@ func (b *CommitBatcher) run() {
panic(err) panic(err)
} }
timer := time.NewTimer(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack))
for { for {
if forceRes != nil { if forceRes != nil {
forceRes <- lastMsg forceRes <- lastMsg
@ -116,7 +120,7 @@ func (b *CommitBatcher) run() {
return return
case <-b.notify: case <-b.notify:
sendAboveMax = true sendAboveMax = true
case <-b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack): case <-timer.C:
// do nothing // do nothing
case fr := <-b.force: // user triggered case fr := <-b.force: // user triggered
forceRes = fr forceRes = fr
@ -127,17 +131,26 @@ func (b *CommitBatcher) run() {
if err != nil { if err != nil {
log.Warnw("CommitBatcher processBatch error", "error", err) log.Warnw("CommitBatcher processBatch error", "error", err)
} }
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack))
} }
} }
func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time { func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration {
now := time.Now() now := time.Now()
b.lk.Lock() b.lk.Lock()
defer b.lk.Unlock() defer b.lk.Unlock()
if len(b.todo) == 0 { if len(b.todo) == 0 {
return nil return maxWait
} }
var cutoff time.Time var cutoff time.Time
@ -155,12 +168,12 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time
} }
if cutoff.IsZero() { if cutoff.IsZero() {
return time.After(maxWait) return maxWait
} }
cutoff = cutoff.Add(-slack) cutoff = cutoff.Add(-slack)
if cutoff.Before(now) { if cutoff.Before(now) {
return time.After(time.Nanosecond) // can't return 0 return time.Nanosecond // can't return 0
} }
wait := cutoff.Sub(now) wait := cutoff.Sub(now)
@ -168,7 +181,7 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time
wait = maxWait wait = maxWait
} }
return time.After(wait) return wait
} }
func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, error) { func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, error) {
@ -191,7 +204,25 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes,
var res []sealiface.CommitBatchRes var res []sealiface.CommitBatchRes
if total < cfg.MinCommitBatch || total < miner5.MinAggregatedSectors { individual := (total < cfg.MinCommitBatch) || (total < miner5.MinAggregatedSectors)
if !individual && !cfg.AggregateAboveBaseFee.Equals(big.Zero()) {
tok, _, err := b.api.ChainHead(b.mctx)
if err != nil {
return nil, err
}
bf, err := b.api.ChainBaseFee(b.mctx, tok)
if err != nil {
return nil, xerrors.Errorf("couldn't get base fee: %w", err)
}
if bf.LessThan(cfg.AggregateAboveBaseFee) {
individual = true
}
}
if individual {
res, err = b.processIndividually() res, err = b.processIndividually()
} else { } else {
res, err = b.processBatch(cfg) res, err = b.processBatch(cfg)
@ -227,7 +258,9 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
total := len(b.todo) total := len(b.todo)
var res sealiface.CommitBatchRes res := sealiface.CommitBatchRes{
FailedSectors: map[abi.SectorNumber]string{},
}
params := miner5.ProveCommitAggregateParams{ params := miner5.ProveCommitAggregateParams{
SectorNumbers: bitfield.New(), SectorNumbers: bitfield.New(),
@ -303,16 +336,18 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting network version: %s", err) return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting network version: %s", err)
} }
aggFee := policy.AggregateNetworkFee(nv, len(infos), bf) aggFee := big.Div(big.Mul(policy.AggregateNetworkFee(nv, len(infos), bf), aggFeeNum), aggFeeDen)
goodFunds := big.Add(maxFee, big.Add(collateral, aggFee)) needFunds := big.Add(collateral, aggFee)
from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, goodFunds, collateral) goodFunds := big.Add(maxFee, needFunds)
from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, goodFunds, needFunds)
if err != nil { if err != nil {
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err) return []sealiface.CommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err)
} }
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.ProveCommitAggregate, collateral, maxFee, enc.Bytes()) mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.ProveCommitAggregate, needFunds, maxFee, enc.Bytes())
if err != nil { if err != nil {
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err) return []sealiface.CommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err)
} }
@ -339,7 +374,8 @@ func (b *CommitBatcher) processIndividually() ([]sealiface.CommitBatchRes, error
for sn, info := range b.todo { for sn, info := range b.todo {
r := sealiface.CommitBatchRes{ r := sealiface.CommitBatchRes{
Sectors: []abi.SectorNumber{sn}, Sectors: []abi.SectorNumber{sn},
FailedSectors: map[abi.SectorNumber]string{},
} }
mcid, err := b.processSingle(mi, sn, info, tok) mcid, err := b.processSingle(mi, sn, info, tok)

View File

@ -115,6 +115,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
SubmitCommitAggregate: planOne( SubmitCommitAggregate: planOne(
on(SectorCommitAggregateSent{}, CommitWait), on(SectorCommitAggregateSent{}, CommitWait),
on(SectorCommitFailed{}, CommitFailed), on(SectorCommitFailed{}, CommitFailed),
on(SectorRetrySubmitCommit{}, SubmitCommit),
), ),
CommitWait: planOne( CommitWait: planOne(
on(SectorProving{}, FinalizeSector), on(SectorProving{}, FinalizeSector),

View File

@ -86,6 +86,7 @@ func (b *PreCommitBatcher) run() {
panic(err) panic(err)
} }
timer := time.NewTimer(b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack))
for { for {
if forceRes != nil { if forceRes != nil {
forceRes <- lastRes forceRes <- lastRes
@ -100,7 +101,7 @@ func (b *PreCommitBatcher) run() {
return return
case <-b.notify: case <-b.notify:
sendAboveMax = true sendAboveMax = true
case <-b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack): case <-timer.C:
// do nothing // do nothing
case fr := <-b.force: // user triggered case fr := <-b.force: // user triggered
forceRes = fr forceRes = fr
@ -111,17 +112,26 @@ func (b *PreCommitBatcher) run() {
if err != nil { if err != nil {
log.Warnw("PreCommitBatcher processBatch error", "error", err) log.Warnw("PreCommitBatcher processBatch error", "error", err)
} }
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack))
} }
} }
func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time { func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration {
now := time.Now() now := time.Now()
b.lk.Lock() b.lk.Lock()
defer b.lk.Unlock() defer b.lk.Unlock()
if len(b.todo) == 0 { if len(b.todo) == 0 {
return nil return maxWait
} }
var cutoff time.Time var cutoff time.Time
@ -139,12 +149,12 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T
} }
if cutoff.IsZero() { if cutoff.IsZero() {
return time.After(maxWait) return maxWait
} }
cutoff = cutoff.Add(-slack) cutoff = cutoff.Add(-slack)
if cutoff.Before(now) { if cutoff.Before(now) {
return time.After(time.Nanosecond) // can't return 0 return time.Nanosecond // can't return 0
} }
wait := cutoff.Sub(now) wait := cutoff.Sub(now)
@ -152,7 +162,7 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T
wait = maxWait wait = maxWait
} }
return time.After(wait) return wait
} }
func (b *PreCommitBatcher) maybeStartBatch(notif bool) ([]sealiface.PreCommitBatchRes, error) { func (b *PreCommitBatcher) maybeStartBatch(notif bool) ([]sealiface.PreCommitBatchRes, error) {

View File

@ -1,6 +1,10 @@
package sealiface package sealiface
import "time" import (
"time"
"github.com/filecoin-project/go-state-types/abi"
)
// this has to be in a separate package to not make lotus API depend on filecoin-ffi // this has to be in a separate package to not make lotus API depend on filecoin-ffi
@ -31,6 +35,8 @@ type Config struct {
CommitBatchWait time.Duration CommitBatchWait time.Duration
CommitBatchSlack time.Duration CommitBatchSlack time.Duration
AggregateAboveBaseFee abi.TokenAmount
TerminateBatchMax uint64 TerminateBatchMax uint64
TerminateBatchMin uint64 TerminateBatchMin uint64
TerminateBatchWait time.Duration TerminateBatchWait time.Duration

View File

@ -182,7 +182,7 @@ func (m *Sealing) handleComputeProofFailed(ctx statemachine.Context, sector Sect
} }
func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo) error { func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo) error {
tok, height, err := m.api.ChainHead(ctx.Context()) tok, _, err := m.api.ChainHead(ctx.Context())
if err != nil { if err != nil {
log.Errorf("handleCommitting: api error, not proceeding: %+v", err) log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
return nil return nil
@ -216,33 +216,6 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo
} }
} }
if err := checkPrecommit(ctx.Context(), m.maddr, sector, tok, height, m.api); err != nil {
switch err.(type) {
case *ErrApi:
log.Errorf("handleCommitFailed: api error, not proceeding: %+v", err)
return nil
case *ErrBadCommD:
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad CommD error: %w", err)})
case *ErrExpiredTicket:
return ctx.Send(SectorTicketExpired{xerrors.Errorf("ticket expired error, removing sector: %w", err)})
case *ErrBadTicket:
return ctx.Send(SectorTicketExpired{xerrors.Errorf("expired ticket, removing sector: %w", err)})
case *ErrInvalidDeals:
log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err)
return ctx.Send(SectorInvalidDealIDs{Return: RetCommitFailed})
case *ErrExpiredDeals:
return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)})
case nil:
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("no precommit: %w", err)})
case *ErrPrecommitOnChain:
// noop, this is expected
case *ErrSectorNumberAllocated:
// noop, already committed?
default:
return xerrors.Errorf("checkPrecommit sanity check error (%T): %w", err, err)
}
}
if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil { if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil {
switch err.(type) { switch err.(type) {
case *ErrApi: case *ErrApi:

View File

@ -105,48 +105,66 @@ func checkTicketExpired(sector SectorInfo, epoch abi.ChainEpoch) bool {
return epoch-sector.TicketEpoch > MaxTicketAge // TODO: allow configuring expected seal durations return epoch-sector.TicketEpoch > MaxTicketAge // TODO: allow configuring expected seal durations
} }
func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.SealRandomness, abi.ChainEpoch, error) { func checkProveCommitExpired(preCommitEpoch, msd abi.ChainEpoch, currEpoch abi.ChainEpoch) bool {
return currEpoch > preCommitEpoch+msd
}
func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.SealRandomness, abi.ChainEpoch, bool, error) {
tok, epoch, err := m.api.ChainHead(ctx.Context()) tok, epoch, err := m.api.ChainHead(ctx.Context())
if err != nil { if err != nil {
log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err) log.Errorf("getTicket: api error, not proceeding: %+v", err)
return nil, 0, nil return nil, 0, false, nil
}
// the reason why the StateMinerSectorAllocated function is placed here, if it is outside,
// if the MarshalCBOR function and StateSectorPreCommitInfo function return err, it will be executed
allocated, aerr := m.api.StateMinerSectorAllocated(ctx.Context(), m.maddr, sector.SectorNumber, nil)
if aerr != nil {
log.Errorf("getTicket: api error, checking if sector is allocated: %+v", aerr)
return nil, 0, false, nil
} }
ticketEpoch := epoch - policy.SealRandomnessLookback ticketEpoch := epoch - policy.SealRandomnessLookback
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
if err := m.maddr.MarshalCBOR(buf); err != nil { if err := m.maddr.MarshalCBOR(buf); err != nil {
return nil, 0, err return nil, 0, allocated, err
} }
pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok) pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok)
if err != nil { if err != nil {
return nil, 0, xerrors.Errorf("getting precommit info: %w", err) return nil, 0, allocated, xerrors.Errorf("getting precommit info: %w", err)
} }
if pci != nil { if pci != nil {
ticketEpoch = pci.Info.SealRandEpoch ticketEpoch = pci.Info.SealRandEpoch
if checkTicketExpired(sector, ticketEpoch) { nv, err := m.api.StateNetworkVersion(ctx.Context(), tok)
return nil, 0, xerrors.Errorf("ticket expired for precommitted sector") if err != nil {
return nil, 0, allocated, xerrors.Errorf("getTicket: StateNetworkVersion: api error, not proceeding: %+v", err)
} }
msd := policy.GetMaxProveCommitDuration(actors.VersionForNetwork(nv), sector.SectorType)
if checkProveCommitExpired(pci.PreCommitEpoch, msd, epoch) {
return nil, 0, allocated, xerrors.Errorf("ticket expired for precommitted sector")
}
}
if pci == nil && allocated { // allocated is true, sector precommitted but expired, will SectorCommitFailed or SectorRemove
return nil, 0, allocated, xerrors.Errorf("sector %s precommitted but expired", sector.SectorNumber)
} }
rand, err := m.api.ChainGetRandomnessFromTickets(ctx.Context(), tok, crypto.DomainSeparationTag_SealRandomness, ticketEpoch, buf.Bytes()) rand, err := m.api.ChainGetRandomnessFromTickets(ctx.Context(), tok, crypto.DomainSeparationTag_SealRandomness, ticketEpoch, buf.Bytes())
if err != nil { if err != nil {
return nil, 0, err return nil, 0, allocated, err
} }
return abi.SealRandomness(rand), ticketEpoch, nil return abi.SealRandomness(rand), ticketEpoch, allocated, nil
} }
func (m *Sealing) handleGetTicket(ctx statemachine.Context, sector SectorInfo) error { func (m *Sealing) handleGetTicket(ctx statemachine.Context, sector SectorInfo) error {
ticketValue, ticketEpoch, err := m.getTicket(ctx, sector) ticketValue, ticketEpoch, allocated, err := m.getTicket(ctx, sector)
if err != nil { if err != nil {
allocated, aerr := m.api.StateMinerSectorAllocated(ctx.Context(), m.maddr, sector.SectorNumber, nil)
if aerr != nil {
log.Errorf("error checking if sector is allocated: %+v", aerr)
}
if allocated { if allocated {
if sector.CommitMessage != nil { if sector.CommitMessage != nil {
// Some recovery paths with unfortunate timing lead here // Some recovery paths with unfortunate timing lead here
@ -182,14 +200,35 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo)
} }
} }
_, height, err := m.api.ChainHead(ctx.Context()) tok, height, err := m.api.ChainHead(ctx.Context())
if err != nil { if err != nil {
log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err) log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err)
return nil return nil
} }
if checkTicketExpired(sector, height) { if checkTicketExpired(sector, height) {
return ctx.Send(SectorOldTicket{}) // go get new ticket pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok)
if err != nil {
log.Errorf("handlePreCommit1: StateSectorPreCommitInfo: api error, not proceeding: %+v", err)
return nil
}
if pci == nil {
return ctx.Send(SectorOldTicket{}) // go get new ticket
}
nv, err := m.api.StateNetworkVersion(ctx.Context(), tok)
if err != nil {
log.Errorf("handlePreCommit1: StateNetworkVersion: api error, not proceeding: %+v", err)
return nil
}
msd := policy.GetMaxProveCommitDuration(actors.VersionForNetwork(nv), sector.SectorType)
// if height > PreCommitEpoch + msd, there is no need to recalculate
if checkProveCommitExpired(pci.PreCommitEpoch, msd, height) {
return ctx.Send(SectorOldTicket{}) // will be removed
}
} }
pc1o, err := m.sealer.SealPreCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.TicketValue, sector.pieceInfos()) pc1o, err := m.sealer.SealPreCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.TicketValue, sector.pieceInfos())
@ -624,11 +663,21 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S
spt: sector.SectorType, spt: sector.SectorType,
}) })
if err != nil { if err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("queuing commit for aggregation failed: %w", err)}) return ctx.Send(SectorRetrySubmitCommit{})
} }
if res.Error != "" { if res.Error != "" {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("aggregate error: %s", res.Error)}) tok, _, err := m.api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleSubmitCommit: api error, not proceeding: %+v", err)
return nil
}
if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)})
}
return ctx.Send(SectorRetrySubmitCommit{})
} }
if e, found := res.FailedSectors[sector.SectorNumber]; found { if e, found := res.FailedSectors[sector.SectorNumber]; found {

View File

@ -110,6 +110,10 @@ type SealingConfig struct {
// time buffer for forceful batch submission before sectors/deals in batch would start expiring // time buffer for forceful batch submission before sectors/deals in batch would start expiring
CommitBatchSlack Duration CommitBatchSlack Duration
// network BaseFee below which to stop doing commit aggregation, instead
// submitting proofs to the chain individually
AggregateAboveBaseFee types.FIL
TerminateBatchMax uint64 TerminateBatchMax uint64
TerminateBatchMin uint64 TerminateBatchMin uint64
TerminateBatchWait Duration TerminateBatchWait Duration
@ -296,6 +300,8 @@ func DefaultStorageMiner() *StorageMiner {
CommitBatchWait: Duration(24 * time.Hour), // this can be up to 30 days CommitBatchWait: Duration(24 * time.Hour), // this can be up to 30 days
CommitBatchSlack: Duration(1 * time.Hour), // time buffer for forceful batch submission before sectors/deals in batch would start expiring, higher value will lower the chances for message fail due to expiration CommitBatchSlack: Duration(1 * time.Hour), // time buffer for forceful batch submission before sectors/deals in batch would start expiring, higher value will lower the chances for message fail due to expiration
AggregateAboveBaseFee: types.FIL(types.BigMul(types.PicoFil, types.NewInt(150))), // 0.15 nFIL
TerminateBatchMin: 1, TerminateBatchMin: 1,
TerminateBatchMax: 100, TerminateBatchMax: 100,
TerminateBatchWait: Duration(5 * time.Minute), TerminateBatchWait: Duration(5 * time.Minute),

View File

@ -840,11 +840,12 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
PreCommitBatchWait: config.Duration(cfg.PreCommitBatchWait), PreCommitBatchWait: config.Duration(cfg.PreCommitBatchWait),
PreCommitBatchSlack: config.Duration(cfg.PreCommitBatchSlack), PreCommitBatchSlack: config.Duration(cfg.PreCommitBatchSlack),
AggregateCommits: cfg.AggregateCommits, AggregateCommits: cfg.AggregateCommits,
MinCommitBatch: cfg.MinCommitBatch, MinCommitBatch: cfg.MinCommitBatch,
MaxCommitBatch: cfg.MaxCommitBatch, MaxCommitBatch: cfg.MaxCommitBatch,
CommitBatchWait: config.Duration(cfg.CommitBatchWait), CommitBatchWait: config.Duration(cfg.CommitBatchWait),
CommitBatchSlack: config.Duration(cfg.CommitBatchSlack), CommitBatchSlack: config.Duration(cfg.CommitBatchSlack),
AggregateAboveBaseFee: types.FIL(cfg.AggregateAboveBaseFee),
TerminateBatchMax: cfg.TerminateBatchMax, TerminateBatchMax: cfg.TerminateBatchMax,
TerminateBatchMin: cfg.TerminateBatchMin, TerminateBatchMin: cfg.TerminateBatchMin,
@ -855,32 +856,37 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
}, nil }, nil
} }
func ToSealingConfig(cfg *config.StorageMiner) sealiface.Config {
return sealiface.Config{
MaxWaitDealsSectors: cfg.Sealing.MaxWaitDealsSectors,
MaxSealingSectors: cfg.Sealing.MaxSealingSectors,
MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals,
WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay),
AlwaysKeepUnsealedCopy: cfg.Sealing.AlwaysKeepUnsealedCopy,
FinalizeEarly: cfg.Sealing.FinalizeEarly,
BatchPreCommits: cfg.Sealing.BatchPreCommits,
MaxPreCommitBatch: cfg.Sealing.MaxPreCommitBatch,
PreCommitBatchWait: time.Duration(cfg.Sealing.PreCommitBatchWait),
PreCommitBatchSlack: time.Duration(cfg.Sealing.PreCommitBatchSlack),
AggregateCommits: cfg.Sealing.AggregateCommits,
MinCommitBatch: cfg.Sealing.MinCommitBatch,
MaxCommitBatch: cfg.Sealing.MaxCommitBatch,
CommitBatchWait: time.Duration(cfg.Sealing.CommitBatchWait),
CommitBatchSlack: time.Duration(cfg.Sealing.CommitBatchSlack),
AggregateAboveBaseFee: types.BigInt(cfg.Sealing.AggregateAboveBaseFee),
TerminateBatchMax: cfg.Sealing.TerminateBatchMax,
TerminateBatchMin: cfg.Sealing.TerminateBatchMin,
TerminateBatchWait: time.Duration(cfg.Sealing.TerminateBatchWait),
}
}
func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error) { func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error) {
return func() (out sealiface.Config, err error) { return func() (out sealiface.Config, err error) {
err = readCfg(r, func(cfg *config.StorageMiner) { err = readCfg(r, func(cfg *config.StorageMiner) {
out = sealiface.Config{ out = ToSealingConfig(cfg)
MaxWaitDealsSectors: cfg.Sealing.MaxWaitDealsSectors,
MaxSealingSectors: cfg.Sealing.MaxSealingSectors,
MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals,
WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay),
AlwaysKeepUnsealedCopy: cfg.Sealing.AlwaysKeepUnsealedCopy,
FinalizeEarly: cfg.Sealing.FinalizeEarly,
BatchPreCommits: cfg.Sealing.BatchPreCommits,
MaxPreCommitBatch: cfg.Sealing.MaxPreCommitBatch,
PreCommitBatchWait: time.Duration(cfg.Sealing.PreCommitBatchWait),
PreCommitBatchSlack: time.Duration(cfg.Sealing.PreCommitBatchSlack),
AggregateCommits: cfg.Sealing.AggregateCommits,
MinCommitBatch: cfg.Sealing.MinCommitBatch,
MaxCommitBatch: cfg.Sealing.MaxCommitBatch,
CommitBatchWait: time.Duration(cfg.Sealing.CommitBatchWait),
CommitBatchSlack: time.Duration(cfg.Sealing.CommitBatchSlack),
TerminateBatchMax: cfg.Sealing.TerminateBatchMax,
TerminateBatchMin: cfg.Sealing.TerminateBatchMin,
TerminateBatchWait: time.Duration(cfg.Sealing.TerminateBatchWait),
}
}) })
return return
}, nil }, nil