Merge pull request #6665 from filecoin-project/backports/v1.11.0-rc2

Backports for v1.11.0-rc2
This commit is contained in:
Jennifer 2021-07-02 17:09:33 -04:00 committed by GitHub
commit 0519cd371e
26 changed files with 466 additions and 123 deletions

View File

@ -826,6 +826,11 @@ workflows:
suite: itest-sdr_upgrade
target: "./itests/sdr_upgrade_test.go"
- test:
name: test-itest-sector_finalize_early
suite: itest-sector_finalize_early
target: "./itests/sector_finalize_early_test.go"
- test:
name: test-itest-sector_pledge
suite: itest-sector_pledge

View File

@ -1,10 +1,13 @@
# Lotus changelog
# 1.11.0-rc1 / 2021-06-28
# 1.11.0-rc2 / 2021-07-02
This is the first release candidate for the optional Lotus v1.11.0 release that introduces several months of bugfixes and feature development.
This is the second release candidate for the optional Lotus v1.11.0 release that introduces several months of bugfixes
and feature development. A more detailed changelog will follow upon final release.
- github.com/filecoin-project/lotus:
- update changelog and bump version to v1.11.0-rc2
- Lotus version 1.11.0
- gateway: Add support for Version method ([filecoin-project/lotus#6618](https://github.com/filecoin-project/lotus/pull/6618))
- Miner SimultaneousTransfers config ([filecoin-project/lotus#6612](https://github.com/filecoin-project/lotus/pull/6612))
@ -197,7 +200,7 @@ Contributors
| chadwick2143 | 3 | +739/-1 | 4 |
| Peter Rabbitson | 21 | +487/-164 | 36 |
| hannahhoward | 5 | +544/-5 | 19 |
| Jennifer Wang | 8 | +206/-172 | 17 |
| Jennifer Wang | 9 | +241/-174 | 19 |
| frrist | 1 | +137/-88 | 7 |
| Travis Person | 3 | +175/-6 | 7 |
| Alex Wade | 1 | +48/-129 | 1 |
@ -220,6 +223,38 @@ Contributors
| Jack Yao | 1 | +1/-1 | 1 |
| IPFSUnion | 1 | +1/-1 | 1 |
# 1.10.1-rc1 / 2021-07-02
This is an optional, but **highly recommended**, release of Lotus that has many bug fixes and improvements based on the feedback we got from the community since HyperDrive.
## New Features
- commit batch: AggregateAboveBaseFee config #6650
- `AggregateAboveBaseFee` is added to the miner sealing configuration to set the network base fee at which to start aggregating proofs. When the network base fee is lower than this value, prove commits will be submitted individually via `ProveCommitSector`. Per the [Batch Incentive Alignment](https://github.com/filecoin-project/FIPs/blob/master/FIPS/fip-0013.md#batch-incentive-alignment) introduced in FIP-0013, we recommend miners set this value to 0.15 nanoFIL (which is also the default) to avoid unexpected aggregation fees being burned and to get the most benefit from aggregation!
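As a rough illustration of the rule this config drives (the `shouldAggregate` helper below is hypothetical; the real check lives in `CommitBatcher.maybeStartBatch` in the diff further down):

```go
package main

import (
	"fmt"

	"github.com/filecoin-project/go-state-types/big"
)

// shouldAggregate sketches the AggregateAboveBaseFee rule: when the threshold
// is set (non-zero) and the current network base fee is below it, prove
// commits are submitted individually instead of being aggregated.
func shouldAggregate(baseFee, threshold big.Int) bool {
	if threshold.Equals(big.Zero()) {
		return true // check disabled; aggregate whenever other conditions allow
	}
	return !baseFee.LessThan(threshold)
}

func main() {
	nano := big.NewInt(1_000_000_000) // 1 nanoFIL in attoFIL
	// 0.15 nFIL, the recommended default threshold
	threshold := big.Div(big.Mul(nano, big.NewInt(15)), big.NewInt(100))

	fmt.Println(shouldAggregate(big.NewInt(500_000_000), threshold)) // true: 0.5 nFIL >= 0.15 nFIL
	fmt.Println(shouldAggregate(big.NewInt(100_000_000), threshold)) // false: 0.1 nFIL < 0.15 nFIL
}
```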
## Bug Fixes
- storage: Fix FinalizeSector with sectors in storage paths #6652
- Fix tiny error in check-client-datacap #6664
- Fix: precommit_batch method used the wrong cfg.PreCommitBatchWait #6658
- Optimize the batchwait #6636
- fix getTicket: sector precommitted but expired case #6635
- handleSubmitCommitAggregate() exception handling #6595
- remove precommit check in handleCommitFailed #6634
- ensure agg fee is adequate
- fix: miner balance is not enough, so that ProveCommitAggregate msg exec failed #6623
- commit batch: Initialize the FailedSectors map #6647
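The `FailedSectors` fix above is the usual Go nil-map pitfall: a zero-value map can be read safely, but writing to it panics, so the batch result must initialize the map explicitly. A minimal standalone reproduction (simplified types; the real struct keys on `abi.SectorNumber`):

```go
package main

import "fmt"

type CommitBatchRes struct {
	FailedSectors map[uint64]string
}

func main() {
	var bad CommitBatchRes
	fmt.Println(bad.FailedSectors[1]) // reading a nil map is fine: yields ""
	// bad.FailedSectors[1] = "boom"  // writing would panic: assignment to entry in nil map

	good := CommitBatchRes{FailedSectors: map[uint64]string{}}
	good.FailedSectors[1] = "ok" // an initialized map accepts writes
	fmt.Println(good.FailedSectors[1])
}
```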
Contributors
| Contributor | Commits | Lines ± | Files Changed |
|-------------|---------|---------|---------------|
| Łukasz Magiera | 7 | +151/-56 | 21 |
| llifezou | 4 | +59/-20 | 4 |
| johnli-helloworld | 2 | +45/-14 | 4 |
| wangchao | 1 | +1/-27 | 1 |
| Jerry | 2 | +9/-4 | 2 |
| zhoutian527 | 1 | +2/-2 | 1 |
| Peter Rabbitson | 1 | +1/-1 | 1 |
# 1.10.0 / 2021-06-23
This is a mandatory release of Lotus that introduces Filecoin network v13, codenamed the HyperDrive upgrade. The

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -34,7 +34,7 @@ func buildType() string {
}
// BuildVersion is the local build version, set by build system
const BuildVersion = "1.11.0-rc1"
const BuildVersion = "1.11.0-rc2"
func UserVersion() string {
if os.Getenv("LOTUS_VERSION_IGNORE_COMMIT") == "1" {

View File

@ -23,6 +23,11 @@ func (f FIL) Unitless() string {
return strings.TrimRight(strings.TrimRight(r.FloatString(18), "0"), ".")
}
var AttoFil = NewInt(1)
var FemtoFil = BigMul(AttoFil, NewInt(1000))
var PicoFil = BigMul(FemtoFil, NewInt(1000))
var NanoFil = BigMul(PicoFil, NewInt(1000))
var unitPrefixes = []string{"a", "f", "p", "n", "μ", "m"}
func (f FIL) Short() string {
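With the unit constants added above, amounts like the 0.15 nFIL default used elsewhere in this commit can be written without counting zeros. A minimal sketch using those constants:

```go
package main

import (
	"fmt"

	"github.com/filecoin-project/lotus/chain/types"
)

func main() {
	// 0.15 nanoFIL expressed as 150 picoFIL, exactly as the new
	// AggregateAboveBaseFee default is constructed.
	fee := types.BigMul(types.PicoFil, types.NewInt(150))
	fmt.Println(types.FIL(fee).Unitless()) // prints the amount in whole FIL: 0.00000000015
}
```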

View File

@ -210,7 +210,7 @@ var filplusCheckClientCmd = &cli.Command{
return err
}
if dcap == nil {
return xerrors.Errorf("client %s is not a verified client", err)
return xerrors.Errorf("client %s is not a verified client", caddr)
}
fmt.Println(*dcap)

View File

@ -101,7 +101,7 @@ var storageAttachCmd = &cli.Command{
}
if !(cfg.CanStore || cfg.CanSeal) {
return xerrors.Errorf("must specify at least one of --store of --seal")
return xerrors.Errorf("must specify at least one of --store or --seal")
}
b, err := json.MarshalIndent(cfg, "", " ")

View File

@ -145,7 +145,7 @@ over time
}
if !(cfg.CanStore || cfg.CanSeal) {
return xerrors.Errorf("must specify at least one of --store of --seal")
return xerrors.Errorf("must specify at least one of --store or --seal")
}
b, err := json.MarshalIndent(cfg, "", " ")

View File

@ -7,7 +7,7 @@ USAGE:
lotus-miner [global options] command [command options] [arguments...]
VERSION:
1.11.0-dev
1.11.0-rc2
COMMANDS:
init Initialize a lotus miner repo

View File

@ -7,7 +7,7 @@ USAGE:
lotus-worker [global options] command [command options] [arguments...]
VERSION:
1.11.0-dev
1.11.0-rc2
COMMANDS:
run Start lotus worker

View File

@ -7,7 +7,7 @@ USAGE:
lotus [global options] command [command options] [arguments...]
VERSION:
1.11.0-dev
1.11.0-rc2
COMMANDS:
daemon Start a lotus daemon process

View File

@ -528,10 +528,25 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
}
}
pathType := storiface.PathStorage
{
sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTSealed, 0, false)
if err != nil {
return xerrors.Errorf("finding sealed sector: %w", err)
}
for _, store := range sealedStores {
if store.CanSeal {
pathType = storiface.PathSealing
break
}
}
}
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, false)
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, storiface.PathSealing, storiface.AcquireMove),
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, pathType, storiface.AcquireMove),
func(ctx context.Context, w Worker) error {
_, err := m.waitSimpleCall(ctx)(w.FinalizeSector(ctx, sector, keepUnsealed))
return err

View File

@ -32,6 +32,9 @@ import (
const arp = abi.RegisteredAggregationProof_SnarkPackV1
var aggFeeNum = big.NewInt(110)
var aggFeeDen = big.NewInt(100)
//go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_commit_batcher.go -package=mocks . CommitBatcherApi
type CommitBatcherApi interface {
@ -103,6 +106,7 @@ func (b *CommitBatcher) run() {
panic(err)
}
timer := time.NewTimer(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack))
for {
if forceRes != nil {
forceRes <- lastMsg
@ -118,7 +122,7 @@ func (b *CommitBatcher) run() {
return
case <-b.notify:
sendAboveMax = true
case <-b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack):
case <-timer.C:
// do nothing
case fr := <-b.force: // user triggered
forceRes = fr
@ -129,17 +133,26 @@ func (b *CommitBatcher) run() {
if err != nil {
log.Warnw("CommitBatcher processBatch error", "error", err)
}
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack))
}
}
func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time {
func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration {
now := time.Now()
b.lk.Lock()
defer b.lk.Unlock()
if len(b.todo) == 0 {
return nil
return maxWait
}
var cutoff time.Time
@ -157,12 +170,12 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time
}
if cutoff.IsZero() {
return time.After(maxWait)
return maxWait
}
cutoff = cutoff.Add(-slack)
if cutoff.Before(now) {
return time.After(time.Nanosecond) // can't return 0
return time.Nanosecond // can't return 0
}
wait := cutoff.Sub(now)
@ -170,7 +183,7 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time
wait = maxWait
}
return time.After(wait)
return wait
}
func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, error) {
@ -193,7 +206,25 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes,
var res []sealiface.CommitBatchRes
if total < cfg.MinCommitBatch || total < miner5.MinAggregatedSectors {
individual := (total < cfg.MinCommitBatch) || (total < miner5.MinAggregatedSectors)
if !individual && !cfg.AggregateAboveBaseFee.Equals(big.Zero()) {
tok, _, err := b.api.ChainHead(b.mctx)
if err != nil {
return nil, err
}
bf, err := b.api.ChainBaseFee(b.mctx, tok)
if err != nil {
return nil, xerrors.Errorf("couldn't get base fee: %w", err)
}
if bf.LessThan(cfg.AggregateAboveBaseFee) {
individual = true
}
}
if individual {
res, err = b.processIndividually()
} else {
res, err = b.processBatch(cfg)
@ -229,7 +260,9 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
total := len(b.todo)
var res sealiface.CommitBatchRes
res := sealiface.CommitBatchRes{
FailedSectors: map[abi.SectorNumber]string{},
}
params := miner5.ProveCommitAggregateParams{
SectorNumbers: bitfield.New(),
@ -305,16 +338,18 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting network version: %s", err)
}
aggFee := policy.AggregateNetworkFee(nv, len(infos), bf)
aggFee := big.Div(big.Mul(policy.AggregateNetworkFee(nv, len(infos), bf), aggFeeNum), aggFeeDen)
goodFunds := big.Add(maxFee, big.Add(collateral, aggFee))
needFunds := big.Add(collateral, aggFee)
from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, goodFunds, collateral)
goodFunds := big.Add(maxFee, needFunds)
from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, goodFunds, needFunds)
if err != nil {
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err)
}
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.ProveCommitAggregate, collateral, maxFee, enc.Bytes())
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.ProveCommitAggregate, needFunds, maxFee, enc.Bytes())
if err != nil {
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err)
}
@ -341,7 +376,8 @@ func (b *CommitBatcher) processIndividually() ([]sealiface.CommitBatchRes, error
for sn, info := range b.todo {
r := sealiface.CommitBatchRes{
Sectors: []abi.SectorNumber{sn},
Sectors: []abi.SectorNumber{sn},
FailedSectors: map[abi.SectorNumber]string{},
}
mcid, err := b.processSingle(mi, sn, info, tok)
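The run-loop change above swaps a fresh timer channel per iteration for one reusable `time.Timer`. The subtle part is the stop-and-drain before `Reset`: if the timer already fired, a stale tick may be sitting in the channel and would make the reset timer appear to fire immediately. A standalone sketch of the idiom:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	timer := time.NewTimer(100 * time.Millisecond)
	for i := 0; i < 3; i++ {
		<-timer.C
		fmt.Println("batch pass", i)

		// Reset is only safe on a stopped, drained timer: if Stop reports
		// the timer already fired, clear any pending tick before reusing it.
		if !timer.Stop() {
			select {
			case <-timer.C:
			default:
			}
		}
		timer.Reset(100 * time.Millisecond)
	}
}
```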

View File

@ -20,6 +20,7 @@ import (
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/extern/storage-sealing/mocks"
@ -58,6 +59,8 @@ func TestCommitBatcher(t *testing.T) {
CommitBatchWait: 24 * time.Hour,
CommitBatchSlack: 1 * time.Hour,
AggregateAboveBaseFee: types.BigMul(types.PicoFil, types.NewInt(150)), // 0.15 nFIL
TerminateBatchMin: 1,
TerminateBatchMax: 100,
TerminateBatchWait: 5 * time.Minute,
@ -143,7 +146,7 @@ func TestCommitBatcher(t *testing.T) {
}
}
expectSend := func(expect []abi.SectorNumber) action {
expectSend := func(expect []abi.SectorNumber, aboveBalancer, failOnePCI bool) action {
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(miner.MinerInfo{Owner: t0123, Worker: t0123}, nil)
@ -153,14 +156,40 @@ func TestCommitBatcher(t *testing.T) {
batch = true
ti = 1
}
basefee := types.PicoFil
if aboveBalancer {
basefee = types.NanoFil
}
if batch {
s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil)
s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(basefee, nil)
}
if !aboveBalancer {
batch = false
ti = len(expect)
}
s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil)
pciC := len(expect)
if failOnePCI {
s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), abi.SectorNumber(1), gomock.Any()).Return(nil, nil).Times(1) // not found
pciC = len(expect) - 1
if !batch {
ti--
}
}
s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&miner.SectorPreCommitOnChainInfo{
PreCommitDeposit: big.Zero(),
}, nil).Times(len(expect))
s.EXPECT().StateMinerInitialPledgeCollateral(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(big.Zero(), nil).Times(len(expect))
}, nil).Times(pciC)
s.EXPECT().StateMinerInitialPledgeCollateral(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(big.Zero(), nil).Times(pciC)
if batch {
s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version13, nil)
s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(big.NewInt(2000), nil)
s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(basefee, nil)
}
s.EXPECT().SendMsg(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), funMatcher(func(i interface{}) bool {
@ -183,11 +212,11 @@ func TestCommitBatcher(t *testing.T) {
}
}
flush := func(expect []abi.SectorNumber) action {
flush := func(expect []abi.SectorNumber, aboveBalancer, failOnePCI bool) action {
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
_ = expectSend(expect)(t, s, pcb)
_ = expectSend(expect, aboveBalancer, failOnePCI)(t, s, pcb)
batch := len(expect) >= minBatch
batch := len(expect) >= minBatch && aboveBalancer
r, err := pcb.Flush(ctx)
require.NoError(t, err)
@ -198,6 +227,13 @@ func TestCommitBatcher(t *testing.T) {
return r[0].Sectors[i] < r[0].Sectors[j]
})
require.Equal(t, expect, r[0].Sectors)
if !failOnePCI {
require.Len(t, r[0].FailedSectors, 0)
} else {
require.Len(t, r[0].FailedSectors, 1)
_, found := r[0].FailedSectors[1]
require.True(t, found)
}
} else {
require.Len(t, r, len(expect))
for _, res := range r {
@ -209,6 +245,13 @@ func TestCommitBatcher(t *testing.T) {
})
for i, res := range r {
require.Equal(t, abi.SectorNumber(i), res.Sectors[0])
if failOnePCI && res.Sectors[0] == 1 {
require.Len(t, res.FailedSectors, 1)
_, found := res.FailedSectors[1]
require.True(t, found)
} else {
require.Empty(t, res.FailedSectors)
}
}
}
@ -227,33 +270,75 @@ func TestCommitBatcher(t *testing.T) {
tcs := map[string]struct {
actions []action
}{
"addSingle": {
"addSingle-aboveBalancer": {
actions: []action{
addSector(0),
waitPending(1),
flush([]abi.SectorNumber{0}),
flush([]abi.SectorNumber{0}, true, false),
},
},
"addTwo": {
"addTwo-aboveBalancer": {
actions: []action{
addSectors(getSectors(2)),
waitPending(2),
flush(getSectors(2)),
flush(getSectors(2), true, false),
},
},
"addAte": {
"addAte-aboveBalancer": {
actions: []action{
addSectors(getSectors(8)),
waitPending(8),
flush(getSectors(8)),
flush(getSectors(8), true, false),
},
},
"addMax": {
"addMax-aboveBalancer": {
actions: []action{
expectSend(getSectors(maxBatch)),
expectSend(getSectors(maxBatch), true, false),
addSectors(getSectors(maxBatch)),
},
},
"addSingle-belowBalancer": {
actions: []action{
addSector(0),
waitPending(1),
flush([]abi.SectorNumber{0}, false, false),
},
},
"addTwo-belowBalancer": {
actions: []action{
addSectors(getSectors(2)),
waitPending(2),
flush(getSectors(2), false, false),
},
},
"addAte-belowBalancer": {
actions: []action{
addSectors(getSectors(8)),
waitPending(8),
flush(getSectors(8), false, false),
},
},
"addMax-belowBalancer": {
actions: []action{
expectSend(getSectors(maxBatch), false, false),
addSectors(getSectors(maxBatch)),
},
},
"addAte-aboveBalancer-failOne": {
actions: []action{
addSectors(getSectors(8)),
waitPending(8),
flush(getSectors(8), true, true),
},
},
"addAte-belowBalancer-failOne": {
actions: []action{
addSectors(getSectors(8)),
waitPending(8),
flush(getSectors(8), false, true),
},
},
}
for name, tc := range tcs {

View File

@ -115,6 +115,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
SubmitCommitAggregate: planOne(
on(SectorCommitAggregateSent{}, CommitWait),
on(SectorCommitFailed{}, CommitFailed),
on(SectorRetrySubmitCommit{}, SubmitCommit),
),
CommitWait: planOne(
on(SectorProving{}, FinalizeSector),

View File

@ -88,6 +88,7 @@ func (b *PreCommitBatcher) run() {
panic(err)
}
timer := time.NewTimer(b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack))
for {
if forceRes != nil {
forceRes <- lastRes
@ -102,7 +103,7 @@ func (b *PreCommitBatcher) run() {
return
case <-b.notify:
sendAboveMax = true
case <-b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack):
case <-timer.C:
// do nothing
case fr := <-b.force: // user triggered
forceRes = fr
@ -113,17 +114,26 @@ func (b *PreCommitBatcher) run() {
if err != nil {
log.Warnw("PreCommitBatcher processBatch error", "error", err)
}
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack))
}
}
func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time {
func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration {
now := time.Now()
b.lk.Lock()
defer b.lk.Unlock()
if len(b.todo) == 0 {
return nil
return maxWait
}
var cutoff time.Time
@ -141,12 +151,12 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T
}
if cutoff.IsZero() {
return time.After(maxWait)
return maxWait
}
cutoff = cutoff.Add(-slack)
if cutoff.Before(now) {
return time.After(time.Nanosecond) // can't return 0
return time.Nanosecond // can't return 0
}
wait := cutoff.Sub(now)
@ -154,7 +164,7 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.T
wait = maxWait
}
return time.After(wait)
return wait
}
func (b *PreCommitBatcher) maybeStartBatch(notif bool) ([]sealiface.PreCommitBatchRes, error) {

View File

@ -1,6 +1,10 @@
package sealiface
import "time"
import (
"time"
"github.com/filecoin-project/go-state-types/abi"
)
// this has to be in a separate package to not make lotus API depend on filecoin-ffi
@ -31,6 +35,8 @@ type Config struct {
CommitBatchWait time.Duration
CommitBatchSlack time.Duration
AggregateAboveBaseFee abi.TokenAmount
TerminateBatchMax uint64
TerminateBatchMin uint64
TerminateBatchWait time.Duration

View File

@ -182,7 +182,7 @@ func (m *Sealing) handleComputeProofFailed(ctx statemachine.Context, sector Sect
}
func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo) error {
tok, height, err := m.api.ChainHead(ctx.Context())
tok, _, err := m.api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
return nil
@ -216,33 +216,6 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo
}
}
if err := checkPrecommit(ctx.Context(), m.maddr, sector, tok, height, m.api); err != nil {
switch err.(type) {
case *ErrApi:
log.Errorf("handleCommitFailed: api error, not proceeding: %+v", err)
return nil
case *ErrBadCommD:
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad CommD error: %w", err)})
case *ErrExpiredTicket:
return ctx.Send(SectorTicketExpired{xerrors.Errorf("ticket expired error, removing sector: %w", err)})
case *ErrBadTicket:
return ctx.Send(SectorTicketExpired{xerrors.Errorf("expired ticket, removing sector: %w", err)})
case *ErrInvalidDeals:
log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err)
return ctx.Send(SectorInvalidDealIDs{Return: RetCommitFailed})
case *ErrExpiredDeals:
return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)})
case nil:
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("no precommit: %w", err)})
case *ErrPrecommitOnChain:
// noop, this is expected
case *ErrSectorNumberAllocated:
// noop, already committed?
default:
return xerrors.Errorf("checkPrecommit sanity check error (%T): %w", err, err)
}
}
if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil {
switch err.(type) {
case *ErrApi:

View File

@ -105,48 +105,66 @@ func checkTicketExpired(ticket, head abi.ChainEpoch) bool {
return head-ticket > MaxTicketAge // TODO: allow configuring expected seal durations
}
func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.SealRandomness, abi.ChainEpoch, error) {
func checkProveCommitExpired(preCommitEpoch, msd abi.ChainEpoch, currEpoch abi.ChainEpoch) bool {
return currEpoch > preCommitEpoch+msd
}
func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.SealRandomness, abi.ChainEpoch, bool, error) {
tok, epoch, err := m.api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err)
return nil, 0, nil
log.Errorf("getTicket: api error, not proceeding: %+v", err)
return nil, 0, false, nil
}
// StateMinerSectorAllocated is called here, before the calls below, so that
// the allocated flag is still returned when MarshalCBOR or StateSectorPreCommitInfo fail
allocated, aerr := m.api.StateMinerSectorAllocated(ctx.Context(), m.maddr, sector.SectorNumber, nil)
if aerr != nil {
log.Errorf("getTicket: api error, checking if sector is allocated: %+v", aerr)
return nil, 0, false, nil
}
ticketEpoch := epoch - policy.SealRandomnessLookback
buf := new(bytes.Buffer)
if err := m.maddr.MarshalCBOR(buf); err != nil {
return nil, 0, err
return nil, 0, allocated, err
}
pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok)
if err != nil {
return nil, 0, xerrors.Errorf("getting precommit info: %w", err)
return nil, 0, allocated, xerrors.Errorf("getting precommit info: %w", err)
}
if pci != nil {
ticketEpoch = pci.Info.SealRandEpoch
if checkTicketExpired(ticketEpoch, epoch) {
return nil, 0, xerrors.Errorf("ticket expired for precommitted sector")
nv, err := m.api.StateNetworkVersion(ctx.Context(), tok)
if err != nil {
return nil, 0, allocated, xerrors.Errorf("getTicket: StateNetworkVersion: api error, not proceeding: %+v", err)
}
msd := policy.GetMaxProveCommitDuration(actors.VersionForNetwork(nv), sector.SectorType)
if checkProveCommitExpired(pci.PreCommitEpoch, msd, epoch) {
return nil, 0, allocated, xerrors.Errorf("ticket expired for precommitted sector")
}
}
if pci == nil && allocated { // sector was precommitted but the precommit expired; will lead to SectorCommitFailed or SectorRemove
return nil, 0, allocated, xerrors.Errorf("sector %s precommitted but expired", sector.SectorNumber)
}
rand, err := m.api.ChainGetRandomnessFromTickets(ctx.Context(), tok, crypto.DomainSeparationTag_SealRandomness, ticketEpoch, buf.Bytes())
if err != nil {
return nil, 0, err
return nil, 0, allocated, err
}
return abi.SealRandomness(rand), ticketEpoch, nil
return abi.SealRandomness(rand), ticketEpoch, allocated, nil
}
func (m *Sealing) handleGetTicket(ctx statemachine.Context, sector SectorInfo) error {
ticketValue, ticketEpoch, err := m.getTicket(ctx, sector)
ticketValue, ticketEpoch, allocated, err := m.getTicket(ctx, sector)
if err != nil {
allocated, aerr := m.api.StateMinerSectorAllocated(ctx.Context(), m.maddr, sector.SectorNumber, nil)
if aerr != nil {
log.Errorf("error checking if sector is allocated: %+v", aerr)
}
if allocated {
if sector.CommitMessage != nil {
// Some recovery paths with unfortunate timing lead here
@ -182,14 +200,35 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo)
}
}
_, height, err := m.api.ChainHead(ctx.Context())
tok, height, err := m.api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err)
return nil
}
if checkTicketExpired(sector.TicketEpoch, height) {
return ctx.Send(SectorOldTicket{}) // go get new ticket
pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok)
if err != nil {
log.Errorf("handlePreCommit1: StateSectorPreCommitInfo: api error, not proceeding: %+v", err)
return nil
}
if pci == nil {
return ctx.Send(SectorOldTicket{}) // go get new ticket
}
nv, err := m.api.StateNetworkVersion(ctx.Context(), tok)
if err != nil {
log.Errorf("handlePreCommit1: StateNetworkVersion: api error, not proceeding: %+v", err)
return nil
}
msd := policy.GetMaxProveCommitDuration(actors.VersionForNetwork(nv), sector.SectorType)
// if height > PreCommitEpoch + msd, there is no need to recalculate
if checkProveCommitExpired(pci.PreCommitEpoch, msd, height) {
return ctx.Send(SectorOldTicket{}) // will be removed
}
}
pc1o, err := m.sealer.SealPreCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.TicketValue, sector.pieceInfos())
@ -624,11 +663,21 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S
Spt: sector.SectorType,
})
if err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("queuing commit for aggregation failed: %w", err)})
return ctx.Send(SectorRetrySubmitCommit{})
}
if res.Error != "" {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("aggregate error: %s", res.Error)})
tok, _, err := m.api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleSubmitCommit: api error, not proceeding: %+v", err)
return nil
}
if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)})
}
return ctx.Send(SectorRetrySubmitCommit{})
}
if e, found := res.FailedSectors[sector.SectorNumber]; found {

View File

@ -14,7 +14,7 @@ func TestDealsWithSealingAndRPC(t *testing.T) {
kit.QuietMiningLogs()
var blockTime = 1 * time.Second
var blockTime = 50 * time.Millisecond
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC()) // no mock proofs.
ens.InterconnectAll().BeginMining(blockTime)

View File

@ -2,22 +2,29 @@ package kit
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/wallet"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/miner"
libp2pcrypto "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)
// TestMiner represents a miner enrolled in an Ensemble.
@ -119,3 +126,41 @@ func (tm *TestMiner) FlushSealingBatches(ctx context.Context) {
fmt.Printf("COMMIT BATCH: %+v\n", cb)
}
}
const metaFile = "sectorstore.json"
func (tm *TestMiner) AddStorage(ctx context.Context, t *testing.T, weight uint64, seal, store bool) {
p, err := ioutil.TempDir("", "lotus-testsectors-")
require.NoError(t, err)
if err := os.MkdirAll(p, 0755); err != nil {
if !os.IsExist(err) {
require.NoError(t, err)
}
}
_, err = os.Stat(filepath.Join(p, metaFile))
if !os.IsNotExist(err) {
require.NoError(t, err)
}
cfg := &stores.LocalStorageMeta{
ID: stores.ID(uuid.New().String()),
Weight: weight,
CanSeal: seal,
CanStore: store,
}
if !(cfg.CanStore || cfg.CanSeal) {
t.Fatal("must specify at least one of CanSeal or CanStore")
}
b, err := json.MarshalIndent(cfg, "", " ")
require.NoError(t, err)
err = ioutil.WriteFile(filepath.Join(p, metaFile), b, 0644)
require.NoError(t, err)
err = tm.StorageAddLocal(ctx, p)
require.NoError(t, err)
}

View File

@ -0,0 +1,66 @@
package itests
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
)
func TestDealsWithFinalizeEarly(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode")
}
kit.QuietMiningLogs()
var blockTime = 50 * time.Millisecond
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.ConstructorOpts(
node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
return func() (sealiface.Config, error) {
cf := config.DefaultStorageMiner()
cf.Sealing.FinalizeEarly = true
return modules.ToSealingConfig(cf), nil
}, nil
})))) // no mock proofs.
ens.InterconnectAll().BeginMining(blockTime)
dh := kit.NewDealHarness(t, client, miner)
ctx := context.Background()
miner.AddStorage(ctx, t, 1000000000, true, false)
miner.AddStorage(ctx, t, 1000000000, false, true)
sl, err := miner.StorageList(ctx)
require.NoError(t, err)
for si, d := range sl {
i, err := miner.StorageInfo(ctx, si)
require.NoError(t, err)
fmt.Printf("stor d:%d %+v\n", len(d), i)
}
t.Run("single", func(t *testing.T) {
dh.RunConcurrentDeals(kit.RunConcurrentDealsOpts{N: 1})
})
sl, err = miner.StorageList(ctx)
require.NoError(t, err)
for si, d := range sl {
i, err := miner.StorageInfo(ctx, si)
require.NoError(t, err)
fmt.Printf("stor d:%d %+v\n", len(d), i)
}
}

View File

@ -144,6 +144,10 @@ type SealingConfig struct {
// time buffer for forceful batch submission before sectors/deals in batch would start expiring
CommitBatchSlack Duration
// network BaseFee below which to stop doing commit aggregation, instead
// submitting proofs to the chain individually
AggregateAboveBaseFee types.FIL
TerminateBatchMax uint64
TerminateBatchMin uint64
TerminateBatchWait Duration
@ -330,6 +334,8 @@ func DefaultStorageMiner() *StorageMiner {
CommitBatchWait: Duration(24 * time.Hour), // this can be up to 30 days
CommitBatchSlack: Duration(1 * time.Hour), // time buffer for forceful batch submission before sectors/deals in the batch start expiring; a higher value lowers the chance of message failure due to expiration
AggregateAboveBaseFee: types.FIL(types.BigMul(types.PicoFil, types.NewInt(150))), // 0.15 nFIL
TerminateBatchMin: 1,
TerminateBatchMax: 100,
TerminateBatchWait: Duration(5 * time.Minute),

View File

@ -866,11 +866,12 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
PreCommitBatchWait: config.Duration(cfg.PreCommitBatchWait),
PreCommitBatchSlack: config.Duration(cfg.PreCommitBatchSlack),
AggregateCommits: cfg.AggregateCommits,
MinCommitBatch: cfg.MinCommitBatch,
MaxCommitBatch: cfg.MaxCommitBatch,
CommitBatchWait: config.Duration(cfg.CommitBatchWait),
CommitBatchSlack: config.Duration(cfg.CommitBatchSlack),
AggregateCommits: cfg.AggregateCommits,
MinCommitBatch: cfg.MinCommitBatch,
MaxCommitBatch: cfg.MaxCommitBatch,
CommitBatchWait: config.Duration(cfg.CommitBatchWait),
CommitBatchSlack: config.Duration(cfg.CommitBatchSlack),
AggregateAboveBaseFee: types.FIL(cfg.AggregateAboveBaseFee),
TerminateBatchMax: cfg.TerminateBatchMax,
TerminateBatchMin: cfg.TerminateBatchMin,
@ -881,32 +882,37 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
}, nil
}
func ToSealingConfig(cfg *config.StorageMiner) sealiface.Config {
return sealiface.Config{
MaxWaitDealsSectors: cfg.Sealing.MaxWaitDealsSectors,
MaxSealingSectors: cfg.Sealing.MaxSealingSectors,
MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals,
WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay),
AlwaysKeepUnsealedCopy: cfg.Sealing.AlwaysKeepUnsealedCopy,
FinalizeEarly: cfg.Sealing.FinalizeEarly,
BatchPreCommits: cfg.Sealing.BatchPreCommits,
MaxPreCommitBatch: cfg.Sealing.MaxPreCommitBatch,
PreCommitBatchWait: time.Duration(cfg.Sealing.PreCommitBatchWait),
PreCommitBatchSlack: time.Duration(cfg.Sealing.PreCommitBatchSlack),
AggregateCommits: cfg.Sealing.AggregateCommits,
MinCommitBatch: cfg.Sealing.MinCommitBatch,
MaxCommitBatch: cfg.Sealing.MaxCommitBatch,
CommitBatchWait: time.Duration(cfg.Sealing.CommitBatchWait),
CommitBatchSlack: time.Duration(cfg.Sealing.CommitBatchSlack),
AggregateAboveBaseFee: types.BigInt(cfg.Sealing.AggregateAboveBaseFee),
TerminateBatchMax: cfg.Sealing.TerminateBatchMax,
TerminateBatchMin: cfg.Sealing.TerminateBatchMin,
TerminateBatchWait: time.Duration(cfg.Sealing.TerminateBatchWait),
}
}
func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error) {
return func() (out sealiface.Config, err error) {
err = readCfg(r, func(cfg *config.StorageMiner) {
out = sealiface.Config{
MaxWaitDealsSectors: cfg.Sealing.MaxWaitDealsSectors,
MaxSealingSectors: cfg.Sealing.MaxSealingSectors,
MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals,
WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay),
AlwaysKeepUnsealedCopy: cfg.Sealing.AlwaysKeepUnsealedCopy,
FinalizeEarly: cfg.Sealing.FinalizeEarly,
BatchPreCommits: cfg.Sealing.BatchPreCommits,
MaxPreCommitBatch: cfg.Sealing.MaxPreCommitBatch,
PreCommitBatchWait: time.Duration(cfg.Sealing.PreCommitBatchWait),
PreCommitBatchSlack: time.Duration(cfg.Sealing.PreCommitBatchSlack),
AggregateCommits: cfg.Sealing.AggregateCommits,
MinCommitBatch: cfg.Sealing.MinCommitBatch,
MaxCommitBatch: cfg.Sealing.MaxCommitBatch,
CommitBatchWait: time.Duration(cfg.Sealing.CommitBatchWait),
CommitBatchSlack: time.Duration(cfg.Sealing.CommitBatchSlack),
TerminateBatchMax: cfg.Sealing.TerminateBatchMax,
TerminateBatchMin: cfg.Sealing.TerminateBatchMin,
TerminateBatchWait: time.Duration(cfg.Sealing.TerminateBatchWait),
}
out = ToSealingConfig(cfg)
})
return
}, nil