Merge pull request #6650 from filecoin-project/feat/agg-balancer-config
commit batch: AggregateAboveBaseFee config
This commit is contained in:
commit
20c8250872
@ -23,6 +23,11 @@ func (f FIL) Unitless() string {
|
||||
return strings.TrimRight(strings.TrimRight(r.FloatString(18), "0"), ".")
|
||||
}
|
||||
|
||||
var AttoFil = NewInt(1)
|
||||
var FemtoFil = BigMul(AttoFil, NewInt(1000))
|
||||
var PicoFil = BigMul(FemtoFil, NewInt(1000))
|
||||
var NanoFil = BigMul(PicoFil, NewInt(1000))
|
||||
|
||||
var unitPrefixes = []string{"a", "f", "p", "n", "μ", "m"}
|
||||
|
||||
func (f FIL) Short() string {
|
||||
|
20
extern/storage-sealing/commit_batch.go
vendored
20
extern/storage-sealing/commit_batch.go
vendored
@ -206,7 +206,25 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes,
|
||||
|
||||
var res []sealiface.CommitBatchRes
|
||||
|
||||
if total < cfg.MinCommitBatch || total < miner5.MinAggregatedSectors {
|
||||
individual := (total < cfg.MinCommitBatch) || (total < miner5.MinAggregatedSectors)
|
||||
|
||||
if !individual && !cfg.AggregateAboveBaseFee.Equals(big.Zero()) {
|
||||
tok, _, err := b.api.ChainHead(b.mctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bf, err := b.api.ChainBaseFee(b.mctx, tok)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("couldn't get base fee: %w", err)
|
||||
}
|
||||
|
||||
if bf.LessThan(cfg.AggregateAboveBaseFee) {
|
||||
individual = true
|
||||
}
|
||||
}
|
||||
|
||||
if individual {
|
||||
res, err = b.processIndividually()
|
||||
} else {
|
||||
res, err = b.processBatch(cfg)
|
||||
|
115
extern/storage-sealing/commit_batch_test.go
vendored
115
extern/storage-sealing/commit_batch_test.go
vendored
@ -20,6 +20,7 @@ import (
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
||||
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
||||
"github.com/filecoin-project/lotus/extern/storage-sealing/mocks"
|
||||
@ -58,6 +59,8 @@ func TestCommitBatcher(t *testing.T) {
|
||||
CommitBatchWait: 24 * time.Hour,
|
||||
CommitBatchSlack: 1 * time.Hour,
|
||||
|
||||
AggregateAboveBaseFee: types.BigMul(types.PicoFil, types.NewInt(150)), // 0.15 nFIL
|
||||
|
||||
TerminateBatchMin: 1,
|
||||
TerminateBatchMax: 100,
|
||||
TerminateBatchWait: 5 * time.Minute,
|
||||
@ -143,7 +146,7 @@ func TestCommitBatcher(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
expectSend := func(expect []abi.SectorNumber) action {
|
||||
expectSend := func(expect []abi.SectorNumber, aboveBalancer, failOnePCI bool) action {
|
||||
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
|
||||
s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(miner.MinerInfo{Owner: t0123, Worker: t0123}, nil)
|
||||
|
||||
@ -153,14 +156,40 @@ func TestCommitBatcher(t *testing.T) {
|
||||
batch = true
|
||||
ti = 1
|
||||
}
|
||||
|
||||
basefee := types.PicoFil
|
||||
if aboveBalancer {
|
||||
basefee = types.NanoFil
|
||||
}
|
||||
|
||||
if batch {
|
||||
s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil)
|
||||
s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(basefee, nil)
|
||||
}
|
||||
|
||||
if !aboveBalancer {
|
||||
batch = false
|
||||
ti = len(expect)
|
||||
}
|
||||
|
||||
s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil)
|
||||
|
||||
pciC := len(expect)
|
||||
if failOnePCI {
|
||||
s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), abi.SectorNumber(1), gomock.Any()).Return(nil, nil).Times(1) // not found
|
||||
pciC = len(expect) - 1
|
||||
if !batch {
|
||||
ti--
|
||||
}
|
||||
}
|
||||
s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&miner.SectorPreCommitOnChainInfo{
|
||||
PreCommitDeposit: big.Zero(),
|
||||
}, nil).Times(len(expect))
|
||||
s.EXPECT().StateMinerInitialPledgeCollateral(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(big.Zero(), nil).Times(len(expect))
|
||||
}, nil).Times(pciC)
|
||||
s.EXPECT().StateMinerInitialPledgeCollateral(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(big.Zero(), nil).Times(pciC)
|
||||
|
||||
if batch {
|
||||
s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version13, nil)
|
||||
s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(big.NewInt(2000), nil)
|
||||
s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(basefee, nil)
|
||||
}
|
||||
|
||||
s.EXPECT().SendMsg(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), funMatcher(func(i interface{}) bool {
|
||||
@ -183,11 +212,11 @@ func TestCommitBatcher(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
flush := func(expect []abi.SectorNumber) action {
|
||||
flush := func(expect []abi.SectorNumber, aboveBalancer, failOnePCI bool) action {
|
||||
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
|
||||
_ = expectSend(expect)(t, s, pcb)
|
||||
_ = expectSend(expect, aboveBalancer, failOnePCI)(t, s, pcb)
|
||||
|
||||
batch := len(expect) >= minBatch
|
||||
batch := len(expect) >= minBatch && aboveBalancer
|
||||
|
||||
r, err := pcb.Flush(ctx)
|
||||
require.NoError(t, err)
|
||||
@ -198,6 +227,13 @@ func TestCommitBatcher(t *testing.T) {
|
||||
return r[0].Sectors[i] < r[0].Sectors[j]
|
||||
})
|
||||
require.Equal(t, expect, r[0].Sectors)
|
||||
if !failOnePCI {
|
||||
require.Len(t, r[0].FailedSectors, 0)
|
||||
} else {
|
||||
require.Len(t, r[0].FailedSectors, 1)
|
||||
_, found := r[0].FailedSectors[1]
|
||||
require.True(t, found)
|
||||
}
|
||||
} else {
|
||||
require.Len(t, r, len(expect))
|
||||
for _, res := range r {
|
||||
@ -209,6 +245,13 @@ func TestCommitBatcher(t *testing.T) {
|
||||
})
|
||||
for i, res := range r {
|
||||
require.Equal(t, abi.SectorNumber(i), res.Sectors[0])
|
||||
if failOnePCI && res.Sectors[0] == 1 {
|
||||
require.Len(t, res.FailedSectors, 1)
|
||||
_, found := res.FailedSectors[1]
|
||||
require.True(t, found)
|
||||
} else {
|
||||
require.Empty(t, res.FailedSectors)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -227,33 +270,75 @@ func TestCommitBatcher(t *testing.T) {
|
||||
tcs := map[string]struct {
|
||||
actions []action
|
||||
}{
|
||||
"addSingle": {
|
||||
"addSingle-aboveBalancer": {
|
||||
actions: []action{
|
||||
addSector(0),
|
||||
waitPending(1),
|
||||
flush([]abi.SectorNumber{0}),
|
||||
flush([]abi.SectorNumber{0}, true, false),
|
||||
},
|
||||
},
|
||||
"addTwo": {
|
||||
"addTwo-aboveBalancer": {
|
||||
actions: []action{
|
||||
addSectors(getSectors(2)),
|
||||
waitPending(2),
|
||||
flush(getSectors(2)),
|
||||
flush(getSectors(2), true, false),
|
||||
},
|
||||
},
|
||||
"addAte": {
|
||||
"addAte-aboveBalancer": {
|
||||
actions: []action{
|
||||
addSectors(getSectors(8)),
|
||||
waitPending(8),
|
||||
flush(getSectors(8)),
|
||||
flush(getSectors(8), true, false),
|
||||
},
|
||||
},
|
||||
"addMax": {
|
||||
"addMax-aboveBalancer": {
|
||||
actions: []action{
|
||||
expectSend(getSectors(maxBatch)),
|
||||
expectSend(getSectors(maxBatch), true, false),
|
||||
addSectors(getSectors(maxBatch)),
|
||||
},
|
||||
},
|
||||
"addSingle-belowBalancer": {
|
||||
actions: []action{
|
||||
addSector(0),
|
||||
waitPending(1),
|
||||
flush([]abi.SectorNumber{0}, false, false),
|
||||
},
|
||||
},
|
||||
"addTwo-belowBalancer": {
|
||||
actions: []action{
|
||||
addSectors(getSectors(2)),
|
||||
waitPending(2),
|
||||
flush(getSectors(2), false, false),
|
||||
},
|
||||
},
|
||||
"addAte-belowBalancer": {
|
||||
actions: []action{
|
||||
addSectors(getSectors(8)),
|
||||
waitPending(8),
|
||||
flush(getSectors(8), false, false),
|
||||
},
|
||||
},
|
||||
"addMax-belowBalancer": {
|
||||
actions: []action{
|
||||
expectSend(getSectors(maxBatch), false, false),
|
||||
addSectors(getSectors(maxBatch)),
|
||||
},
|
||||
},
|
||||
|
||||
"addAte-aboveBalancer-failOne": {
|
||||
actions: []action{
|
||||
addSectors(getSectors(8)),
|
||||
waitPending(8),
|
||||
flush(getSectors(8), true, true),
|
||||
},
|
||||
},
|
||||
"addAte-belowBalancer-failOne": {
|
||||
actions: []action{
|
||||
addSectors(getSectors(8)),
|
||||
waitPending(8),
|
||||
flush(getSectors(8), false, true),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range tcs {
|
||||
|
8
extern/storage-sealing/sealiface/config.go
vendored
8
extern/storage-sealing/sealiface/config.go
vendored
@ -1,6 +1,10 @@
|
||||
package sealiface
|
||||
|
||||
import "time"
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
)
|
||||
|
||||
// this has to be in a separate package to not make lotus API depend on filecoin-ffi
|
||||
|
||||
@ -31,6 +35,8 @@ type Config struct {
|
||||
CommitBatchWait time.Duration
|
||||
CommitBatchSlack time.Duration
|
||||
|
||||
AggregateAboveBaseFee abi.TokenAmount
|
||||
|
||||
TerminateBatchMax uint64
|
||||
TerminateBatchMin uint64
|
||||
TerminateBatchWait time.Duration
|
||||
|
@ -144,6 +144,10 @@ type SealingConfig struct {
|
||||
// time buffer for forceful batch submission before sectors/deals in batch would start expiring
|
||||
CommitBatchSlack Duration
|
||||
|
||||
// network BaseFee below which to stop doing commit aggregation, instead
|
||||
// submitting proofs to the chain individually
|
||||
AggregateAboveBaseFee types.FIL
|
||||
|
||||
TerminateBatchMax uint64
|
||||
TerminateBatchMin uint64
|
||||
TerminateBatchWait Duration
|
||||
@ -330,6 +334,8 @@ func DefaultStorageMiner() *StorageMiner {
|
||||
CommitBatchWait: Duration(24 * time.Hour), // this can be up to 30 days
|
||||
CommitBatchSlack: Duration(1 * time.Hour), // time buffer for forceful batch submission before sectors/deals in batch would start expiring, higher value will lower the chances for message fail due to expiration
|
||||
|
||||
AggregateAboveBaseFee: types.FIL(types.BigMul(types.PicoFil, types.NewInt(150))), // 0.15 nFIL
|
||||
|
||||
TerminateBatchMin: 1,
|
||||
TerminateBatchMax: 100,
|
||||
TerminateBatchWait: Duration(5 * time.Minute),
|
||||
|
@ -871,6 +871,7 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
|
||||
MaxCommitBatch: cfg.MaxCommitBatch,
|
||||
CommitBatchWait: config.Duration(cfg.CommitBatchWait),
|
||||
CommitBatchSlack: config.Duration(cfg.CommitBatchSlack),
|
||||
AggregateAboveBaseFee: types.FIL(cfg.AggregateAboveBaseFee),
|
||||
|
||||
TerminateBatchMax: cfg.TerminateBatchMax,
|
||||
TerminateBatchMin: cfg.TerminateBatchMin,
|
||||
@ -902,6 +903,7 @@ func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error
|
||||
MaxCommitBatch: cfg.Sealing.MaxCommitBatch,
|
||||
CommitBatchWait: time.Duration(cfg.Sealing.CommitBatchWait),
|
||||
CommitBatchSlack: time.Duration(cfg.Sealing.CommitBatchSlack),
|
||||
AggregateAboveBaseFee: types.BigInt(cfg.Sealing.AggregateAboveBaseFee),
|
||||
|
||||
TerminateBatchMax: cfg.Sealing.TerminateBatchMax,
|
||||
TerminateBatchMin: cfg.Sealing.TerminateBatchMin,
|
||||
|
Loading…
Reference in New Issue
Block a user