Precommit batch balancer support/config

This commit is contained in:
Łukasz Magiera 2021-09-30 16:53:12 +02:00 committed by Aayush Rajasekaran
parent 89d61ca238
commit 6fd9d5f28b
8 changed files with 171 additions and 13 deletions

View File

@ -40,6 +40,21 @@ func (m *MockPreCommitBatcherApi) EXPECT() *MockPreCommitBatcherApiMockRecorder
return m.recorder
}
// ChainBaseFee mocks base method.
func (m *MockPreCommitBatcherApi) ChainBaseFee(arg0 context.Context, arg1 sealing.TipSetToken) (big.Int, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ChainBaseFee", arg0, arg1)
ret0, _ := ret[0].(big.Int)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ChainBaseFee indicates an expected call of ChainBaseFee.
func (mr *MockPreCommitBatcherApiMockRecorder) ChainBaseFee(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainBaseFee", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).ChainBaseFee), arg0, arg1)
}
// ChainHead mocks base method.
func (m *MockPreCommitBatcherApi) ChainHead(arg0 context.Context) (sealing.TipSetToken, abi.ChainEpoch, error) {
m.ctrl.T.Helper()

View File

@ -20,6 +20,7 @@ import (
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
"github.com/filecoin-project/lotus/node/config"
)
@ -31,6 +32,7 @@ type PreCommitBatcherApi interface {
StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error)
StateMinerAvailableBalance(context.Context, address.Address, TipSetToken) (big.Int, error)
ChainHead(ctx context.Context) (TipSetToken, abi.ChainEpoch, error)
ChainBaseFee(context.Context, TipSetToken) (abi.TokenAmount, error)
}
type preCommitEntry struct {
@ -185,8 +187,30 @@ func (b *PreCommitBatcher) maybeStartBatch(notif bool) ([]sealiface.PreCommitBat
return nil, nil
}
individual := false
if !cfg.BatchPreCommitAboveBaseFee.Equals(big.Zero()) {
tok, _, err := b.api.ChainHead(b.mctx)
if err != nil {
return nil, err
}
bf, err := b.api.ChainBaseFee(b.mctx, tok)
if err != nil {
return nil, xerrors.Errorf("couldn't get base fee: %w", err)
}
if bf.LessThan(cfg.BatchPreCommitAboveBaseFee) { // todo: only enable after nv14?
individual = true
}
}
// todo support multiple batches
res, err := b.processBatch(cfg)
var res []sealiface.PreCommitBatchRes
if !individual {
res, err = b.processBatch(cfg)
} else {
res, err = b.processIndividually(cfg)
}
if err != nil && len(res) == 0 {
return nil, err
}
@ -210,6 +234,82 @@ func (b *PreCommitBatcher) maybeStartBatch(notif bool) ([]sealiface.PreCommitBat
return res, nil
}
func (b *PreCommitBatcher) processIndividually(cfg sealiface.Config) ([]sealiface.PreCommitBatchRes, error) {
mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil)
if err != nil {
return nil, xerrors.Errorf("couldn't get miner info: %w", err)
}
avail := types.TotalFilecoinInt
if cfg.CollateralFromMinerBalance && !cfg.DisableCollateralFallback {
avail, err = b.api.StateMinerAvailableBalance(b.mctx, b.maddr, nil)
if err != nil {
return nil, xerrors.Errorf("getting available miner balance: %w", err)
}
avail = big.Sub(avail, cfg.AvailableBalanceBuffer)
if avail.LessThan(big.Zero()) {
avail = big.Zero()
}
}
var res []sealiface.PreCommitBatchRes
for sn, info := range b.todo {
r := sealiface.PreCommitBatchRes{
Sectors: []abi.SectorNumber{sn},
}
mcid, err := b.processSingle(cfg, mi, &avail, info)
if err != nil {
r.Error = err.Error()
} else {
r.Msg = &mcid
}
res = append(res, r)
}
return res, nil
}
func (b *PreCommitBatcher) processSingle(cfg sealiface.Config, mi miner.MinerInfo, avail *abi.TokenAmount, params *preCommitEntry) (cid.Cid, error) {
enc := new(bytes.Buffer)
if err := params.pci.MarshalCBOR(enc); err != nil {
return cid.Undef, xerrors.Errorf("marshaling commit params: %w", err)
}
deposit := params.deposit
if cfg.CollateralFromMinerBalance {
c := big.Sub(deposit, *avail)
*avail = big.Sub(*avail, deposit)
deposit = c
if deposit.LessThan(big.Zero()) {
deposit = big.Zero()
}
if (*avail).LessThan(big.Zero()) {
*avail = big.Zero()
}
}
goodFunds := big.Add(deposit, big.Int(b.feeCfg.MaxPreCommitGasFee))
from, _, err := b.addrSel(b.mctx, mi, api.PreCommitAddr, goodFunds, deposit)
if err != nil {
return cid.Undef, xerrors.Errorf("no good address to send commit message from: %w", err)
}
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.PreCommitSector, deposit, big.Int(b.feeCfg.MaxPreCommitGasFee), enc.Bytes())
if err != nil {
return cid.Undef, xerrors.Errorf("pushing message to mpool: %w", err)
}
return mcid, nil
}
func (b *PreCommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.PreCommitBatchRes, error) {
params := miner5.PreCommitSectorBatchParams{}
deposit := big.Zero()

View File

@ -54,10 +54,11 @@ func TestPrecommitBatcher(t *testing.T) {
WaitDealsDelay: time.Hour * 6,
AlwaysKeepUnsealedCopy: true,
BatchPreCommits: true,
MaxPreCommitBatch: maxBatch,
PreCommitBatchWait: 24 * time.Hour,
PreCommitBatchSlack: 3 * time.Hour,
BatchPreCommits: true,
MaxPreCommitBatch: maxBatch,
PreCommitBatchWait: 24 * time.Hour,
PreCommitBatchSlack: 3 * time.Hour,
BatchPreCommitAboveBaseFee: big.NewInt(10000),
AggregateCommits: true,
MinCommitBatch: miner5.MinAggregatedSectors,
@ -149,6 +150,9 @@ func TestPrecommitBatcher(t *testing.T) {
expectSend := func(expect []abi.SectorNumber) action {
return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing.PreCommitBatcher) promise {
s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil)
s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(big.NewInt(10001), nil)
s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(miner.MinerInfo{Owner: t0123, Worker: t0123}, nil)
s.EXPECT().SendMsg(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), funMatcher(func(i interface{}) bool {
b := i.([]byte)
@ -163,6 +167,25 @@ func TestPrecommitBatcher(t *testing.T) {
}
}
expectSendsSingle := func(expect []abi.SectorNumber) action {
return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing.PreCommitBatcher) promise {
s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil)
s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(big.NewInt(9999), nil)
s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(miner.MinerInfo{Owner: t0123, Worker: t0123}, nil)
for _, number := range expect {
s.EXPECT().SendMsg(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), funMatcher(func(i interface{}) bool {
b := i.([]byte)
var params miner5.PreCommitSectorParams
require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b)))
require.Equal(t, number, params.SectorNumber)
return true
}))
}
return nil
}
}
flush := func(expect []abi.SectorNumber) action {
return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing.PreCommitBatcher) promise {
_ = expectSend(expect)(t, s, pcb)
@ -211,6 +234,12 @@ func TestPrecommitBatcher(t *testing.T) {
addSectors(getSectors(maxBatch)),
},
},
"addMax-belowBaseFee": {
actions: []action{
expectSendsSingle(getSectors(maxBatch)),
addSectors(getSectors(maxBatch)),
},
},
}
for name, tc := range tcs {

View File

@ -41,7 +41,8 @@ type Config struct {
CommitBatchWait time.Duration
CommitBatchSlack time.Duration
AggregateAboveBaseFee abi.TokenAmount
AggregateAboveBaseFee abi.TokenAmount
BatchPreCommitAboveBaseFee abi.TokenAmount
TerminateBatchMax uint64
TerminateBatchMin uint64

View File

@ -109,7 +109,8 @@ func DefaultStorageMiner() *StorageMiner {
CommitBatchWait: Duration(24 * time.Hour), // this can be up to 30 days
CommitBatchSlack: Duration(1 * time.Hour), // time buffer for forceful batch submission before sectors/deals in batch would start expiring, higher value will lower the chances for message fail due to expiration
AggregateAboveBaseFee: types.FIL(types.BigMul(types.PicoFil, types.NewInt(150))), // 0.15 nFIL
BatchPreCommitAboveBaseFee: types.FIL(types.BigMul(types.PicoFil, types.NewInt(320))), // 0.32 nFIL
AggregateAboveBaseFee: types.FIL(types.BigMul(types.PicoFil, types.NewInt(320))), // 0.32 nFIL
TerminateBatchMin: 1,
TerminateBatchMax: 100,

View File

@ -711,6 +711,13 @@ avoid the relatively high cost of unsealing the data later, at the cost of more
Comment: `time buffer for forceful batch submission before sectors/deals in batch would start expiring`,
},
{
Name: "BatchPreCommitAboveBaseFee",
Type: "types.FIL",
Comment: `network BaseFee below which to stop doing precommit batching, instead
sending precommit messages to the chain individually`,
},
{
Name: "AggregateAboveBaseFee",
Type: "types.FIL",

View File

@ -219,6 +219,10 @@ type SealingConfig struct {
// time buffer for forceful batch submission before sectors/deals in batch would start expiring
CommitBatchSlack Duration
// network BaseFee below which to stop doing precommit batching, instead
// sending precommit messages to the chain individually
BatchPreCommitAboveBaseFee types.FIL
// network BaseFee below which to stop doing commit aggregation, instead
// submitting proofs to the chain individually
AggregateAboveBaseFee types.FIL

View File

@ -918,12 +918,13 @@ func ToSealingConfig(cfg *config.StorageMiner) sealiface.Config {
PreCommitBatchWait: time.Duration(cfg.Sealing.PreCommitBatchWait),
PreCommitBatchSlack: time.Duration(cfg.Sealing.PreCommitBatchSlack),
AggregateCommits: cfg.Sealing.AggregateCommits,
MinCommitBatch: cfg.Sealing.MinCommitBatch,
MaxCommitBatch: cfg.Sealing.MaxCommitBatch,
CommitBatchWait: time.Duration(cfg.Sealing.CommitBatchWait),
CommitBatchSlack: time.Duration(cfg.Sealing.CommitBatchSlack),
AggregateAboveBaseFee: types.BigInt(cfg.Sealing.AggregateAboveBaseFee),
AggregateCommits: cfg.Sealing.AggregateCommits,
MinCommitBatch: cfg.Sealing.MinCommitBatch,
MaxCommitBatch: cfg.Sealing.MaxCommitBatch,
CommitBatchWait: time.Duration(cfg.Sealing.CommitBatchWait),
CommitBatchSlack: time.Duration(cfg.Sealing.CommitBatchSlack),
AggregateAboveBaseFee: types.BigInt(cfg.Sealing.AggregateAboveBaseFee),
BatchPreCommitAboveBaseFee: types.BigInt(cfg.Sealing.BatchPreCommitAboveBaseFee),
TerminateBatchMax: cfg.Sealing.TerminateBatchMax,
TerminateBatchMin: cfg.Sealing.TerminateBatchMin,