Merge branch 'backports/v1.11.0-rc2' into jen/changelogv11
This commit is contained in:
commit 83114546c6
@@ -826,6 +826,11 @@ workflows:
       suite: itest-sdr_upgrade
       target: "./itests/sdr_upgrade_test.go"
 
+  - test:
+      name: test-itest-sector_finalize_early
+      suite: itest-sector_finalize_early
+      target: "./itests/sector_finalize_early_test.go"
+
   - test:
       name: test-itest-sector_pledge
       suite: itest-sector_pledge
@@ -7,6 +7,7 @@ and feature development. A more detailed changelog will follow upon final release.
 
 - github.com/filecoin-project/lotus:
+  - update changelog and bump version to v1.11.0-rc2
 
 - Lotus version 1.11.0
 - gateway: Add support for Version method ([filecoin-project/lotus#6618](https://github.com/filecoin-project/lotus/pull/6618))
 - Miner SimultaneousTransfers config ([filecoin-project/lotus#6612](https://github.com/filecoin-project/lotus/pull/6612))
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -23,6 +23,11 @@ func (f FIL) Unitless() string {
 	return strings.TrimRight(strings.TrimRight(r.FloatString(18), "0"), ".")
 }
 
+var AttoFil = NewInt(1)
+var FemtoFil = BigMul(AttoFil, NewInt(1000))
+var PicoFil = BigMul(FemtoFil, NewInt(1000))
+var NanoFil = BigMul(PicoFil, NewInt(1000))
+
 var unitPrefixes = []string{"a", "f", "p", "n", "μ", "m"}
 
 func (f FIL) Short() string {
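Note: the new denomination constants chain each SI step by a factor of 1000 up from the base unit (1 attoFIL = 10^-18 FIL). A minimal standalone sketch of the same construction using only math/big, outside the lotus types package (all names here are illustrative, not from the diff):

package main

import (
	"fmt"
	"math/big"
)

func main() {
	atto := big.NewInt(1)                             // 10^-18 FIL, the base unit
	femto := new(big.Int).Mul(atto, big.NewInt(1000)) // 10^-15 FIL
	pico := new(big.Int).Mul(femto, big.NewInt(1000)) // 10^-12 FIL
	nano := new(big.Int).Mul(pico, big.NewInt(1000))  // 10^-9 FIL

	// 150 picoFIL = 0.15 nanoFIL, the AggregateAboveBaseFee default set later in this commit.
	threshold := new(big.Int).Mul(pico, big.NewInt(150))
	fmt.Println(atto, femto, pico, nano, threshold)
}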
@@ -210,7 +210,7 @@ var filplusCheckClientCmd = &cli.Command{
 			return err
 		}
 		if dcap == nil {
-			return xerrors.Errorf("client %s is not a verified client", err)
+			return xerrors.Errorf("client %s is not a verified client", caddr)
 		}
 
 		fmt.Println(*dcap)
@@ -101,7 +101,7 @@ var storageAttachCmd = &cli.Command{
 		}
 
 		if !(cfg.CanStore || cfg.CanSeal) {
-			return xerrors.Errorf("must specify at least one of --store of --seal")
+			return xerrors.Errorf("must specify at least one of --store or --seal")
 		}
 
 		b, err := json.MarshalIndent(cfg, "", "  ")
@@ -145,7 +145,7 @@ over time
 		}
 
 		if !(cfg.CanStore || cfg.CanSeal) {
-			return xerrors.Errorf("must specify at least one of --store of --seal")
+			return xerrors.Errorf("must specify at least one of --store or --seal")
 		}
 
 		b, err := json.MarshalIndent(cfg, "", "  ")
@@ -7,7 +7,7 @@ USAGE:
    lotus-miner [global options] command [command options] [arguments...]
 
 VERSION:
-   1.11.0-dev
+   1.11.0-rc2
 
 COMMANDS:
    init     Initialize a lotus miner repo
@@ -7,7 +7,7 @@ USAGE:
    lotus-worker [global options] command [command options] [arguments...]
 
 VERSION:
-   1.11.0-dev
+   1.11.0-rc2
 
 COMMANDS:
    run     Start lotus worker
@@ -7,7 +7,7 @@ USAGE:
    lotus [global options] command [command options] [arguments...]
 
 VERSION:
-   1.11.0-dev
+   1.11.0-rc2
 
 COMMANDS:
    daemon     Start a lotus daemon process
17 extern/sector-storage/manager.go vendored
@@ -528,10 +528,25 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
 		}
 	}
 
+	pathType := storiface.PathStorage
+	{
+		sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTSealed, 0, false)
+		if err != nil {
+			return xerrors.Errorf("finding sealed sector: %w", err)
+		}
+
+		for _, store := range sealedStores {
+			if store.CanSeal {
+				pathType = storiface.PathSealing
+				break
+			}
+		}
+	}
+
 	selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, false)
 
 	err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
-		m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, storiface.PathSealing, storiface.AcquireMove),
+		m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, pathType, storiface.AcquireMove),
 		func(ctx context.Context, w Worker) error {
 			_, err := m.waitSimpleCall(ctx)(w.FinalizeSector(ctx, sector, keepUnsealed))
 			return err
60 extern/storage-sealing/commit_batch.go vendored
@@ -32,6 +32,9 @@ import (
 
 const arp = abi.RegisteredAggregationProof_SnarkPackV1
 
+var aggFeeNum = big.NewInt(110)
+var aggFeeDen = big.NewInt(100)
+
 //go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_commit_batcher.go -package=mocks . CommitBatcherApi
 
 type CommitBatcherApi interface {
@@ -103,6 +106,7 @@ func (b *CommitBatcher) run() {
 		panic(err)
 	}
 
+	timer := time.NewTimer(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack))
 	for {
 		if forceRes != nil {
 			forceRes <- lastMsg
@@ -118,7 +122,7 @@ func (b *CommitBatcher) run() {
 			return
 		case <-b.notify:
 			sendAboveMax = true
-		case <-b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack):
+		case <-timer.C:
 			// do nothing
 		case fr := <-b.force: // user triggered
 			forceRes = fr
@@ -129,17 +133,26 @@ func (b *CommitBatcher) run() {
 		if err != nil {
 			log.Warnw("CommitBatcher processBatch error", "error", err)
 		}
+
+		if !timer.Stop() {
+			select {
+			case <-timer.C:
+			default:
+			}
+		}
+
+		timer.Reset(b.batchWait(cfg.CommitBatchWait, cfg.CommitBatchSlack))
 	}
 }
 
-func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time {
+func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration {
 	now := time.Now()
 
 	b.lk.Lock()
 	defer b.lk.Unlock()
 
 	if len(b.todo) == 0 {
-		return nil
+		return maxWait
 	}
 
 	var cutoff time.Time
@@ -157,12 +170,12 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time
 	}
 
 	if cutoff.IsZero() {
-		return time.After(maxWait)
+		return maxWait
 	}
 
 	cutoff = cutoff.Add(-slack)
 	if cutoff.Before(now) {
-		return time.After(time.Nanosecond) // can't return 0
+		return time.Nanosecond // can't return 0
 	}
 
 	wait := cutoff.Sub(now)
@@ -170,7 +183,7 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time
 		wait = maxWait
 	}
 
-	return time.After(wait)
+	return wait
 }
 
 func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, error) {
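Note: replacing the per-iteration time.After channel with a single reusable time.Timer avoids allocating a new timer (and leaking the old one until expiry) on every loop turn, but a reused timer needs the Stop-drain-Reset sequence shown above before each Reset. A minimal standalone sketch of that idiom, separate from the batcher code:

package main

import (
	"fmt"
	"time"
)

func main() {
	timer := time.NewTimer(10 * time.Millisecond)
	for i := 0; i < 3; i++ {
		<-timer.C
		fmt.Println("tick", i)

		// Before Reset, stop the timer and drain any pending expiry so a stale
		// tick can't fire immediately after Reset. The select has a default
		// branch, so this is safe even when the expiry was already consumed
		// (as it was here) or another select branch was taken instead.
		if !timer.Stop() {
			select {
			case <-timer.C:
			default:
			}
		}
		timer.Reset(10 * time.Millisecond)
	}
}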
@@ -193,7 +206,25 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes,
 
 	var res []sealiface.CommitBatchRes
 
-	if total < cfg.MinCommitBatch || total < miner5.MinAggregatedSectors {
+	individual := (total < cfg.MinCommitBatch) || (total < miner5.MinAggregatedSectors)
+
+	if !individual && !cfg.AggregateAboveBaseFee.Equals(big.Zero()) {
+		tok, _, err := b.api.ChainHead(b.mctx)
+		if err != nil {
+			return nil, err
+		}
+
+		bf, err := b.api.ChainBaseFee(b.mctx, tok)
+		if err != nil {
+			return nil, xerrors.Errorf("couldn't get base fee: %w", err)
+		}
+
+		if bf.LessThan(cfg.AggregateAboveBaseFee) {
+			individual = true
+		}
+	}
+
+	if individual {
 		res, err = b.processIndividually()
 	} else {
 		res, err = b.processBatch(cfg)
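Note: aggregation trades per-message overhead for a flat aggregate network fee, so it only pays off when the base fee is high; below the AggregateAboveBaseFee threshold the batcher now falls back to individual ProveCommit messages. A condensed restatement of the gating logic as a pure function (shouldAggregate is an illustrative name, not from the diff; the big package is github.com/filecoin-project/go-state-types/big, as used above):

package main

import big "github.com/filecoin-project/go-state-types/big"

// shouldAggregate mirrors the decision above: aggregate only when the batch is
// large enough and the current base fee makes aggregation worthwhile.
func shouldAggregate(total, minBatch, minAggregated int, baseFee, threshold big.Int) bool {
	if total < minBatch || total < minAggregated {
		return false // too few sectors to aggregate
	}
	// A zero threshold disables the base-fee check entirely.
	if !threshold.Equals(big.Zero()) && baseFee.LessThan(threshold) {
		return false // base fee too low; individual commits are cheaper
	}
	return true
}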
@@ -229,7 +260,9 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBatchRes, error) {
 
 	total := len(b.todo)
 
-	var res sealiface.CommitBatchRes
+	res := sealiface.CommitBatchRes{
+		FailedSectors: map[abi.SectorNumber]string{},
+	}
 
 	params := miner5.ProveCommitAggregateParams{
 		SectorNumbers: bitfield.New(),
@@ -305,16 +338,18 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBatchRes, error) {
 		return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting network version: %s", err)
 	}
 
-	aggFee := policy.AggregateNetworkFee(nv, len(infos), bf)
+	aggFee := big.Div(big.Mul(policy.AggregateNetworkFee(nv, len(infos), bf), aggFeeNum), aggFeeDen)
 
-	goodFunds := big.Add(maxFee, big.Add(collateral, aggFee))
+	needFunds := big.Add(collateral, aggFee)
+
+	goodFunds := big.Add(maxFee, needFunds)
 
-	from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, goodFunds, collateral)
+	from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, goodFunds, needFunds)
 	if err != nil {
 		return []sealiface.CommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err)
 	}
 
-	mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.ProveCommitAggregate, collateral, maxFee, enc.Bytes())
+	mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.ProveCommitAggregate, needFunds, maxFee, enc.Bytes())
 	if err != nil {
 		return []sealiface.CommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err)
 	}
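Note: aggFeeNum/aggFeeDen pad the estimated aggregate network fee by 10% (×110/100), and the message value now carries collateral plus that padded fee (needFunds) instead of collateral alone. Multiplying before dividing keeps the buffer exact under integer math. A tiny worked sketch with illustrative numbers, not from the diff:

package main

import (
	"fmt"

	big "github.com/filecoin-project/go-state-types/big"
)

func main() {
	// Pad a hypothetical 1000 attoFIL aggregate fee by 10%.
	fee := big.NewInt(1000)
	aggFeeNum, aggFeeDen := big.NewInt(110), big.NewInt(100)

	// Multiply first, then divide, so integer division doesn't drop the buffer.
	padded := big.Div(big.Mul(fee, aggFeeNum), aggFeeDen)
	fmt.Println(padded) // 1100
}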
@@ -342,6 +377,7 @@ func (b *CommitBatcher) processIndividually() ([]sealiface.CommitBatchRes, error) {
 	for sn, info := range b.todo {
 		r := sealiface.CommitBatchRes{
 			Sectors:       []abi.SectorNumber{sn},
+			FailedSectors: map[abi.SectorNumber]string{},
 		}
 
 		mcid, err := b.processSingle(mi, sn, info, tok)
115 extern/storage-sealing/commit_batch_test.go vendored
@@ -20,6 +20,7 @@ import (
 
 	"github.com/filecoin-project/lotus/api"
 	"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
+	"github.com/filecoin-project/lotus/chain/types"
 	"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
 	sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
 	"github.com/filecoin-project/lotus/extern/storage-sealing/mocks"
@@ -58,6 +59,8 @@ func TestCommitBatcher(t *testing.T) {
 		CommitBatchWait:  24 * time.Hour,
 		CommitBatchSlack: 1 * time.Hour,
 
+		AggregateAboveBaseFee: types.BigMul(types.PicoFil, types.NewInt(150)), // 0.15 nFIL
+
 		TerminateBatchMin:  1,
 		TerminateBatchMax:  100,
 		TerminateBatchWait: 5 * time.Minute,
@@ -143,7 +146,7 @@ func TestCommitBatcher(t *testing.T) {
 		}
 	}
 
-	expectSend := func(expect []abi.SectorNumber) action {
+	expectSend := func(expect []abi.SectorNumber, aboveBalancer, failOnePCI bool) action {
 		return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
 			s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(miner.MinerInfo{Owner: t0123, Worker: t0123}, nil)
 
@@ -153,14 +156,40 @@ func TestCommitBatcher(t *testing.T) {
 				batch = true
 				ti = 1
 			}
 
+			basefee := types.PicoFil
+			if aboveBalancer {
+				basefee = types.NanoFil
+			}
+
+			if batch {
+				s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil)
+				s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(basefee, nil)
+			}
+
+			if !aboveBalancer {
+				batch = false
+				ti = len(expect)
+			}
+
 			s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil)
 
+			pciC := len(expect)
+			if failOnePCI {
+				s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), abi.SectorNumber(1), gomock.Any()).Return(nil, nil).Times(1) // not found
+				pciC = len(expect) - 1
+				if !batch {
+					ti--
+				}
+			}
 			s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&miner.SectorPreCommitOnChainInfo{
 				PreCommitDeposit: big.Zero(),
-			}, nil).Times(len(expect))
-			s.EXPECT().StateMinerInitialPledgeCollateral(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(big.Zero(), nil).Times(len(expect))
+			}, nil).Times(pciC)
+			s.EXPECT().StateMinerInitialPledgeCollateral(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(big.Zero(), nil).Times(pciC)
 
 			if batch {
 				s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version13, nil)
-				s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(big.NewInt(2000), nil)
+				s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(basefee, nil)
 			}
 
 			s.EXPECT().SendMsg(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), funMatcher(func(i interface{}) bool {
@@ -183,11 +212,11 @@ func TestCommitBatcher(t *testing.T) {
 		}
 	}
 
-	flush := func(expect []abi.SectorNumber) action {
+	flush := func(expect []abi.SectorNumber, aboveBalancer, failOnePCI bool) action {
 		return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
-			_ = expectSend(expect)(t, s, pcb)
+			_ = expectSend(expect, aboveBalancer, failOnePCI)(t, s, pcb)
 
-			batch := len(expect) >= minBatch
+			batch := len(expect) >= minBatch && aboveBalancer
 
 			r, err := pcb.Flush(ctx)
 			require.NoError(t, err)
@@ -198,6 +227,13 @@ func TestCommitBatcher(t *testing.T) {
 					return r[0].Sectors[i] < r[0].Sectors[j]
 				})
 				require.Equal(t, expect, r[0].Sectors)
+				if !failOnePCI {
+					require.Len(t, r[0].FailedSectors, 0)
+				} else {
+					require.Len(t, r[0].FailedSectors, 1)
+					_, found := r[0].FailedSectors[1]
+					require.True(t, found)
+				}
 			} else {
 				require.Len(t, r, len(expect))
 				for _, res := range r {
@@ -209,6 +245,13 @@ func TestCommitBatcher(t *testing.T) {
 				})
 				for i, res := range r {
 					require.Equal(t, abi.SectorNumber(i), res.Sectors[0])
+					if failOnePCI && res.Sectors[0] == 1 {
+						require.Len(t, res.FailedSectors, 1)
+						_, found := res.FailedSectors[1]
+						require.True(t, found)
+					} else {
+						require.Empty(t, res.FailedSectors)
+					}
 				}
 			}
 
@@ -227,33 +270,75 @@ func TestCommitBatcher(t *testing.T) {
 	tcs := map[string]struct {
 		actions []action
 	}{
-		"addSingle": {
+		"addSingle-aboveBalancer": {
 			actions: []action{
 				addSector(0),
 				waitPending(1),
-				flush([]abi.SectorNumber{0}),
+				flush([]abi.SectorNumber{0}, true, false),
 			},
 		},
-		"addTwo": {
+		"addTwo-aboveBalancer": {
 			actions: []action{
 				addSectors(getSectors(2)),
 				waitPending(2),
-				flush(getSectors(2)),
+				flush(getSectors(2), true, false),
 			},
 		},
-		"addAte": {
+		"addAte-aboveBalancer": {
 			actions: []action{
 				addSectors(getSectors(8)),
 				waitPending(8),
-				flush(getSectors(8)),
+				flush(getSectors(8), true, false),
 			},
 		},
-		"addMax": {
+		"addMax-aboveBalancer": {
 			actions: []action{
-				expectSend(getSectors(maxBatch)),
+				expectSend(getSectors(maxBatch), true, false),
 				addSectors(getSectors(maxBatch)),
 			},
 		},
+		"addSingle-belowBalancer": {
+			actions: []action{
+				addSector(0),
+				waitPending(1),
+				flush([]abi.SectorNumber{0}, false, false),
+			},
+		},
+		"addTwo-belowBalancer": {
+			actions: []action{
+				addSectors(getSectors(2)),
+				waitPending(2),
+				flush(getSectors(2), false, false),
+			},
+		},
+		"addAte-belowBalancer": {
+			actions: []action{
+				addSectors(getSectors(8)),
+				waitPending(8),
+				flush(getSectors(8), false, false),
+			},
+		},
+		"addMax-belowBalancer": {
+			actions: []action{
+				expectSend(getSectors(maxBatch), false, false),
+				addSectors(getSectors(maxBatch)),
+			},
+		},
+
+		"addAte-aboveBalancer-failOne": {
+			actions: []action{
+				addSectors(getSectors(8)),
+				waitPending(8),
+				flush(getSectors(8), true, true),
+			},
+		},
+		"addAte-belowBalancer-failOne": {
+			actions: []action{
+				addSectors(getSectors(8)),
+				waitPending(8),
+				flush(getSectors(8), false, true),
+			},
+		},
 	}
 
 	for name, tc := range tcs {
1 extern/storage-sealing/fsm.go vendored
@@ -115,6 +115,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *SectorInfo) (uint64, error){
 	SubmitCommitAggregate: planOne(
 		on(SectorCommitAggregateSent{}, CommitWait),
 		on(SectorCommitFailed{}, CommitFailed),
+		on(SectorRetrySubmitCommit{}, SubmitCommit),
 	),
 	CommitWait: planOne(
 		on(SectorProving{}, FinalizeSector),
22 extern/storage-sealing/precommit_batch.go vendored
@@ -88,6 +88,7 @@ func (b *PreCommitBatcher) run() {
 		panic(err)
 	}
 
+	timer := time.NewTimer(b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack))
 	for {
 		if forceRes != nil {
 			forceRes <- lastRes
@@ -102,7 +103,7 @@ func (b *PreCommitBatcher) run() {
 			return
 		case <-b.notify:
 			sendAboveMax = true
-		case <-b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack):
+		case <-timer.C:
 			// do nothing
 		case fr := <-b.force: // user triggered
 			forceRes = fr
@@ -113,17 +114,26 @@ func (b *PreCommitBatcher) run() {
 		if err != nil {
 			log.Warnw("PreCommitBatcher processBatch error", "error", err)
 		}
+
+		if !timer.Stop() {
+			select {
+			case <-timer.C:
+			default:
+			}
+		}
+
+		timer.Reset(b.batchWait(cfg.PreCommitBatchWait, cfg.PreCommitBatchSlack))
 	}
 }
 
-func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time {
+func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) time.Duration {
 	now := time.Now()
 
 	b.lk.Lock()
 	defer b.lk.Unlock()
 
 	if len(b.todo) == 0 {
-		return nil
+		return maxWait
 	}
 
 	var cutoff time.Time
@@ -141,12 +151,12 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time
 	}
 
 	if cutoff.IsZero() {
-		return time.After(maxWait)
+		return maxWait
 	}
 
 	cutoff = cutoff.Add(-slack)
 	if cutoff.Before(now) {
-		return time.After(time.Nanosecond) // can't return 0
+		return time.Nanosecond // can't return 0
 	}
 
 	wait := cutoff.Sub(now)
@@ -154,7 +164,7 @@ func (b *PreCommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time
 		wait = maxWait
 	}
 
-	return time.After(wait)
+	return wait
 }
 
 func (b *PreCommitBatcher) maybeStartBatch(notif bool) ([]sealiface.PreCommitBatchRes, error) {
8 extern/storage-sealing/sealiface/config.go vendored
@@ -1,6 +1,10 @@
 package sealiface
 
-import "time"
+import (
+	"time"
+
+	"github.com/filecoin-project/go-state-types/abi"
+)
 
 // this has to be in a separate package to not make lotus API depend on filecoin-ffi
 
@@ -31,6 +35,8 @@ type Config struct {
 	CommitBatchWait  time.Duration
 	CommitBatchSlack time.Duration
 
+	AggregateAboveBaseFee abi.TokenAmount
+
 	TerminateBatchMax  uint64
 	TerminateBatchMin  uint64
 	TerminateBatchWait time.Duration
29 extern/storage-sealing/states_failed.go vendored
@@ -182,7 +182,7 @@ func (m *Sealing) handleComputeProofFailed(ctx statemachine.Context, sector SectorInfo) error {
 }
 
 func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo) error {
-	tok, height, err := m.api.ChainHead(ctx.Context())
+	tok, _, err := m.api.ChainHead(ctx.Context())
 	if err != nil {
 		log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
 		return nil
@@ -216,33 +216,6 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo) error {
 		}
 	}
 
-	if err := checkPrecommit(ctx.Context(), m.maddr, sector, tok, height, m.api); err != nil {
-		switch err.(type) {
-		case *ErrApi:
-			log.Errorf("handleCommitFailed: api error, not proceeding: %+v", err)
-			return nil
-		case *ErrBadCommD:
-			return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad CommD error: %w", err)})
-		case *ErrExpiredTicket:
-			return ctx.Send(SectorTicketExpired{xerrors.Errorf("ticket expired error, removing sector: %w", err)})
-		case *ErrBadTicket:
-			return ctx.Send(SectorTicketExpired{xerrors.Errorf("expired ticket, removing sector: %w", err)})
-		case *ErrInvalidDeals:
-			log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err)
-			return ctx.Send(SectorInvalidDealIDs{Return: RetCommitFailed})
-		case *ErrExpiredDeals:
-			return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)})
-		case nil:
-			return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("no precommit: %w", err)})
-		case *ErrPrecommitOnChain:
-			// noop, this is expected
-		case *ErrSectorNumberAllocated:
-			// noop, already committed?
-		default:
-			return xerrors.Errorf("checkPrecommit sanity check error (%T): %w", err, err)
-		}
-	}
-
 	if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil {
 		switch err.(type) {
 		case *ErrApi:
85 extern/storage-sealing/states_sealing.go vendored
@@ -105,48 +105,66 @@ func checkTicketExpired(ticket, head abi.ChainEpoch) bool {
 	return head-ticket > MaxTicketAge // TODO: allow configuring expected seal durations
 }
 
-func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.SealRandomness, abi.ChainEpoch, error) {
+func checkProveCommitExpired(preCommitEpoch, msd abi.ChainEpoch, currEpoch abi.ChainEpoch) bool {
+	return currEpoch > preCommitEpoch+msd
+}
+
+func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.SealRandomness, abi.ChainEpoch, bool, error) {
 	tok, epoch, err := m.api.ChainHead(ctx.Context())
 	if err != nil {
-		log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err)
-		return nil, 0, nil
+		log.Errorf("getTicket: api error, not proceeding: %+v", err)
+		return nil, 0, false, nil
 	}
 
+	// the reason why the StateMinerSectorAllocated function is placed here, if it is outside,
+	// if the MarshalCBOR function and StateSectorPreCommitInfo function return err, it will be executed
+	allocated, aerr := m.api.StateMinerSectorAllocated(ctx.Context(), m.maddr, sector.SectorNumber, nil)
+	if aerr != nil {
+		log.Errorf("getTicket: api error, checking if sector is allocated: %+v", aerr)
+		return nil, 0, false, nil
+	}
+
 	ticketEpoch := epoch - policy.SealRandomnessLookback
 	buf := new(bytes.Buffer)
 	if err := m.maddr.MarshalCBOR(buf); err != nil {
-		return nil, 0, err
+		return nil, 0, allocated, err
 	}
 
 	pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok)
 	if err != nil {
-		return nil, 0, xerrors.Errorf("getting precommit info: %w", err)
+		return nil, 0, allocated, xerrors.Errorf("getting precommit info: %w", err)
 	}
 
 	if pci != nil {
 		ticketEpoch = pci.Info.SealRandEpoch
 
-		if checkTicketExpired(ticketEpoch, epoch) {
-			return nil, 0, xerrors.Errorf("ticket expired for precommitted sector")
+		nv, err := m.api.StateNetworkVersion(ctx.Context(), tok)
+		if err != nil {
+			return nil, 0, allocated, xerrors.Errorf("getTicket: StateNetworkVersion: api error, not proceeding: %+v", err)
+		}
+
+		msd := policy.GetMaxProveCommitDuration(actors.VersionForNetwork(nv), sector.SectorType)
+
+		if checkProveCommitExpired(pci.PreCommitEpoch, msd, epoch) {
+			return nil, 0, allocated, xerrors.Errorf("ticket expired for precommitted sector")
 		}
 	}
 
+	if pci == nil && allocated { // allocated is true, sector precommitted but expired, will SectorCommitFailed or SectorRemove
+		return nil, 0, allocated, xerrors.Errorf("sector %s precommitted but expired", sector.SectorNumber)
+	}
+
 	rand, err := m.api.ChainGetRandomnessFromTickets(ctx.Context(), tok, crypto.DomainSeparationTag_SealRandomness, ticketEpoch, buf.Bytes())
 	if err != nil {
-		return nil, 0, err
+		return nil, 0, allocated, err
 	}
 
-	return abi.SealRandomness(rand), ticketEpoch, nil
+	return abi.SealRandomness(rand), ticketEpoch, allocated, nil
 }
 
 func (m *Sealing) handleGetTicket(ctx statemachine.Context, sector SectorInfo) error {
-	ticketValue, ticketEpoch, err := m.getTicket(ctx, sector)
+	ticketValue, ticketEpoch, allocated, err := m.getTicket(ctx, sector)
 	if err != nil {
-		allocated, aerr := m.api.StateMinerSectorAllocated(ctx.Context(), m.maddr, sector.SectorNumber, nil)
-		if aerr != nil {
-			log.Errorf("error checking if sector is allocated: %+v", aerr)
-		}
-
 		if allocated {
 			if sector.CommitMessage != nil {
 				// Some recovery paths with unfortunate timing lead here
@@ -182,16 +200,37 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error {
 		}
 	}
 
-	_, height, err := m.api.ChainHead(ctx.Context())
+	tok, height, err := m.api.ChainHead(ctx.Context())
 	if err != nil {
 		log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err)
 		return nil
 	}
 
 	if checkTicketExpired(sector.TicketEpoch, height) {
-		return ctx.Send(SectorOldTicket{}) // go get new ticket
+		pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok)
+		if err != nil {
+			log.Errorf("handlePreCommit1: StateSectorPreCommitInfo: api error, not proceeding: %+v", err)
+			return nil
+		}
+
+		if pci == nil {
+			return ctx.Send(SectorOldTicket{}) // go get new ticket
+		}
+
+		nv, err := m.api.StateNetworkVersion(ctx.Context(), tok)
+		if err != nil {
+			log.Errorf("handlePreCommit1: StateNetworkVersion: api error, not proceeding: %+v", err)
+			return nil
+		}
+
+		msd := policy.GetMaxProveCommitDuration(actors.VersionForNetwork(nv), sector.SectorType)
+
+		// if height > PreCommitEpoch + msd, there is no need to recalculate
+		if checkProveCommitExpired(pci.PreCommitEpoch, msd, height) {
+			return ctx.Send(SectorOldTicket{}) // will be removed
+		}
 	}
 
 	pc1o, err := m.sealer.SealPreCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.TicketValue, sector.pieceInfos())
 	if err != nil {
 		return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("seal pre commit(1) failed: %w", err)})
@@ -624,11 +663,21 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector SectorInfo) error {
 		Spt: sector.SectorType,
 	})
 	if err != nil {
-		return ctx.Send(SectorCommitFailed{xerrors.Errorf("queuing commit for aggregation failed: %w", err)})
+		return ctx.Send(SectorRetrySubmitCommit{})
 	}
 
 	if res.Error != "" {
-		return ctx.Send(SectorCommitFailed{xerrors.Errorf("aggregate error: %s", res.Error)})
+		tok, _, err := m.api.ChainHead(ctx.Context())
+		if err != nil {
+			log.Errorf("handleSubmitCommit: api error, not proceeding: %+v", err)
+			return nil
+		}
+
+		if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil {
+			return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)})
+		}
+
+		return ctx.Send(SectorRetrySubmitCommit{})
 	}
 
 	if e, found := res.FailedSectors[sector.SectorNumber]; found {
@@ -14,7 +14,7 @@ func TestDealsWithSealingAndRPC(t *testing.T) {
 
 	kit.QuietMiningLogs()
 
-	var blockTime = 1 * time.Second
+	var blockTime = 50 * time.Millisecond
 
 	client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC()) // no mock proofs.
 	ens.InterconnectAll().BeginMining(blockTime)
@@ -2,22 +2,29 @@ package kit
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
+	"io/ioutil"
+	"os"
+	"path/filepath"
 	"strings"
 	"testing"
 	"time"
 
+	"github.com/google/uuid"
+	"github.com/stretchr/testify/require"
+
 	"github.com/filecoin-project/go-address"
 	"github.com/filecoin-project/go-state-types/abi"
 	"github.com/filecoin-project/lotus/api"
 	"github.com/filecoin-project/lotus/build"
 	"github.com/filecoin-project/lotus/chain/wallet"
+	"github.com/filecoin-project/lotus/extern/sector-storage/stores"
 	sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
 	"github.com/filecoin-project/lotus/miner"
 	libp2pcrypto "github.com/libp2p/go-libp2p-core/crypto"
 	"github.com/libp2p/go-libp2p-core/peer"
 	"github.com/multiformats/go-multiaddr"
-	"github.com/stretchr/testify/require"
 )
 
 // TestMiner represents a miner enrolled in an Ensemble.
@@ -119,3 +126,41 @@ func (tm *TestMiner) FlushSealingBatches(ctx context.Context) {
 		fmt.Printf("COMMIT BATCH: %+v\n", cb)
 	}
 }
+
+const metaFile = "sectorstore.json"
+
+func (tm *TestMiner) AddStorage(ctx context.Context, t *testing.T, weight uint64, seal, store bool) {
+	p, err := ioutil.TempDir("", "lotus-testsectors-")
+	require.NoError(t, err)
+
+	if err := os.MkdirAll(p, 0755); err != nil {
+		if !os.IsExist(err) {
+			require.NoError(t, err)
+		}
+	}
+
+	_, err = os.Stat(filepath.Join(p, metaFile))
+	if !os.IsNotExist(err) {
+		require.NoError(t, err)
+	}
+
+	cfg := &stores.LocalStorageMeta{
+		ID:       stores.ID(uuid.New().String()),
+		Weight:   weight,
+		CanSeal:  seal,
+		CanStore: store,
+	}
+
+	if !(cfg.CanStore || cfg.CanSeal) {
+		t.Fatal("must specify at least one of CanStore or cfg.CanSeal")
+	}
+
+	b, err := json.MarshalIndent(cfg, "", "  ")
+	require.NoError(t, err)
+
+	err = ioutil.WriteFile(filepath.Join(p, metaFile), b, 0644)
+	require.NoError(t, err)
+
+	err = tm.StorageAddLocal(ctx, p)
+	require.NoError(t, err)
+}
66 itests/sector_finalize_early_test.go Normal file
@@ -0,0 +1,66 @@
+package itests
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
+	"github.com/filecoin-project/lotus/itests/kit"
+	"github.com/filecoin-project/lotus/node"
+	"github.com/filecoin-project/lotus/node/config"
+	"github.com/filecoin-project/lotus/node/modules"
+	"github.com/filecoin-project/lotus/node/modules/dtypes"
+	"github.com/filecoin-project/lotus/node/repo"
+)
+
+func TestDealsWithFinalizeEarly(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping test in short mode")
+	}
+
+	kit.QuietMiningLogs()
+
+	var blockTime = 50 * time.Millisecond
+
+	client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.ConstructorOpts(
+		node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
+			return func() (sealiface.Config, error) {
+				cf := config.DefaultStorageMiner()
+				cf.Sealing.FinalizeEarly = true
+				return modules.ToSealingConfig(cf), nil
+			}, nil
+		})))) // no mock proofs.
+	ens.InterconnectAll().BeginMining(blockTime)
+	dh := kit.NewDealHarness(t, client, miner)
+
+	ctx := context.Background()
+
+	miner.AddStorage(ctx, t, 1000000000, true, false)
+	miner.AddStorage(ctx, t, 1000000000, false, true)
+
+	sl, err := miner.StorageList(ctx)
+	require.NoError(t, err)
+	for si, d := range sl {
+		i, err := miner.StorageInfo(ctx, si)
+		require.NoError(t, err)
+
+		fmt.Printf("stor d:%d %+v\n", len(d), i)
+	}
+
+	t.Run("single", func(t *testing.T) {
+		dh.RunConcurrentDeals(kit.RunConcurrentDealsOpts{N: 1})
+	})
+
+	sl, err = miner.StorageList(ctx)
+	require.NoError(t, err)
+	for si, d := range sl {
+		i, err := miner.StorageInfo(ctx, si)
+		require.NoError(t, err)
+
+		fmt.Printf("stor d:%d %+v\n", len(d), i)
+	}
+}
@@ -144,6 +144,10 @@ type SealingConfig struct {
 	// time buffer for forceful batch submission before sectors/deals in batch would start expiring
 	CommitBatchSlack Duration
 
+	// network BaseFee below which to stop doing commit aggregation, instead
+	// submitting proofs to the chain individually
+	AggregateAboveBaseFee types.FIL
+
 	TerminateBatchMax  uint64
 	TerminateBatchMin  uint64
 	TerminateBatchWait Duration
@@ -330,6 +334,8 @@ func DefaultStorageMiner() *StorageMiner {
 			CommitBatchWait:  Duration(24 * time.Hour), // this can be up to 30 days
 			CommitBatchSlack: Duration(1 * time.Hour),  // time buffer for forceful batch submission before sectors/deals in batch would start expiring, higher value will lower the chances for message fail due to expiration
 
+			AggregateAboveBaseFee: types.FIL(types.BigMul(types.PicoFil, types.NewInt(150))), // 0.15 nFIL
+
 			TerminateBatchMin:  1,
 			TerminateBatchMax:  100,
 			TerminateBatchWait: Duration(5 * time.Minute),
@@ -871,6 +871,7 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error) {
 				MaxCommitBatch:   cfg.MaxCommitBatch,
 				CommitBatchWait:  config.Duration(cfg.CommitBatchWait),
 				CommitBatchSlack: config.Duration(cfg.CommitBatchSlack),
+				AggregateAboveBaseFee: types.FIL(cfg.AggregateAboveBaseFee),
 
 				TerminateBatchMax: cfg.TerminateBatchMax,
 				TerminateBatchMin: cfg.TerminateBatchMin,
@@ -881,10 +882,8 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error) {
 	}, nil
 }
 
-func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error) {
-	return func() (out sealiface.Config, err error) {
-		err = readCfg(r, func(cfg *config.StorageMiner) {
-			out = sealiface.Config{
+func ToSealingConfig(cfg *config.StorageMiner) sealiface.Config {
+	return sealiface.Config{
 		MaxWaitDealsSectors:       cfg.Sealing.MaxWaitDealsSectors,
 		MaxSealingSectors:         cfg.Sealing.MaxSealingSectors,
 		MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals,
@@ -902,11 +901,18 @@ func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error) {
 		MaxCommitBatch:   cfg.Sealing.MaxCommitBatch,
 		CommitBatchWait:  time.Duration(cfg.Sealing.CommitBatchWait),
 		CommitBatchSlack: time.Duration(cfg.Sealing.CommitBatchSlack),
+		AggregateAboveBaseFee: types.BigInt(cfg.Sealing.AggregateAboveBaseFee),
 
 		TerminateBatchMax:  cfg.Sealing.TerminateBatchMax,
 		TerminateBatchMin:  cfg.Sealing.TerminateBatchMin,
 		TerminateBatchWait: time.Duration(cfg.Sealing.TerminateBatchWait),
 	}
 }
+
+func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error) {
+	return func() (out sealiface.Config, err error) {
+		err = readCfg(r, func(cfg *config.StorageMiner) {
+			out = ToSealingConfig(cfg)
+		})
+		return
+	}, nil
+}