proper config for termination batching and commit wait
This commit is contained in:
parent
febf7cf28f
commit
357c0868b7
13
extern/storage-sealing/commit_batch.go
vendored
13
extern/storage-sealing/commit_batch.go
vendored
@ -22,12 +22,6 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
// TODO: config!
|
|
||||||
|
|
||||||
CommitBatchWait = 5 * time.Minute
|
|
||||||
)
|
|
||||||
|
|
||||||
const arp = abi.RegisteredAggregationProof_SnarkPackV1
|
const arp = abi.RegisteredAggregationProof_SnarkPackV1
|
||||||
|
|
||||||
type CommitBatcherApi interface {
|
type CommitBatcherApi interface {
|
||||||
@ -87,6 +81,11 @@ func (b *CommitBatcher) run() {
|
|||||||
var forceRes chan *cid.Cid
|
var forceRes chan *cid.Cid
|
||||||
var lastMsg *cid.Cid
|
var lastMsg *cid.Cid
|
||||||
|
|
||||||
|
cfg, err := b.getConfig()
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if forceRes != nil {
|
if forceRes != nil {
|
||||||
forceRes <- lastMsg
|
forceRes <- lastMsg
|
||||||
@ -101,7 +100,7 @@ func (b *CommitBatcher) run() {
|
|||||||
return
|
return
|
||||||
case <-b.notify:
|
case <-b.notify:
|
||||||
sendAboveMax = true
|
sendAboveMax = true
|
||||||
case <-time.After(TerminateBatchWait):
|
case <-time.After(cfg.CommitBatchWait):
|
||||||
sendAboveMin = true
|
sendAboveMin = true
|
||||||
case fr := <-b.force: // user triggered
|
case fr := <-b.force: // user triggered
|
||||||
forceRes = fr
|
forceRes = fr
|
||||||
|
5
extern/storage-sealing/sealiface/config.go
vendored
5
extern/storage-sealing/sealiface/config.go
vendored
@ -21,4 +21,9 @@ type Config struct {
|
|||||||
AggregateCommits bool
|
AggregateCommits bool
|
||||||
MinCommitBatch int
|
MinCommitBatch int
|
||||||
MaxCommitBatch int
|
MaxCommitBatch int
|
||||||
|
CommitBatchWait time.Duration
|
||||||
|
|
||||||
|
TerminateBatchMax uint64
|
||||||
|
TerminateBatchMin uint64
|
||||||
|
TerminateBatchWait time.Duration
|
||||||
}
|
}
|
||||||
|
2
extern/storage-sealing/sealing.go
vendored
2
extern/storage-sealing/sealing.go
vendored
@ -152,7 +152,7 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds
|
|||||||
notifee: notifee,
|
notifee: notifee,
|
||||||
addrSel: as,
|
addrSel: as,
|
||||||
|
|
||||||
terminator: NewTerminationBatcher(context.TODO(), maddr, api, as, fc),
|
terminator: NewTerminationBatcher(context.TODO(), maddr, api, as, fc, gc),
|
||||||
commiter: NewCommitBatcher(context.TODO(), maddr, api, as, fc, gc, verif),
|
commiter: NewCommitBatcher(context.TODO(), maddr, api, as, fc, gc, verif),
|
||||||
|
|
||||||
getConfig: gc,
|
getConfig: gc,
|
||||||
|
41
extern/storage-sealing/terminate_batch.go
vendored
41
extern/storage-sealing/terminate_batch.go
vendored
@ -21,14 +21,6 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
// TODO: config
|
|
||||||
|
|
||||||
TerminateBatchMax uint64 = 100 // adjust based on real-world gas numbers, actors limit at 10k
|
|
||||||
TerminateBatchMin uint64 = 1
|
|
||||||
TerminateBatchWait = 5 * time.Minute
|
|
||||||
)
|
|
||||||
|
|
||||||
type TerminateBatcherApi interface {
|
type TerminateBatcherApi interface {
|
||||||
StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*SectorLocation, error)
|
StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*SectorLocation, error)
|
||||||
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
|
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
|
||||||
@ -38,11 +30,12 @@ type TerminateBatcherApi interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type TerminateBatcher struct {
|
type TerminateBatcher struct {
|
||||||
api TerminateBatcherApi
|
api TerminateBatcherApi
|
||||||
maddr address.Address
|
maddr address.Address
|
||||||
mctx context.Context
|
mctx context.Context
|
||||||
addrSel AddrSel
|
addrSel AddrSel
|
||||||
feeCfg FeeConfig
|
feeCfg FeeConfig
|
||||||
|
getConfig GetSealingConfigFunc
|
||||||
|
|
||||||
todo map[SectorLocation]*bitfield.BitField // MinerSectorLocation -> BitField
|
todo map[SectorLocation]*bitfield.BitField // MinerSectorLocation -> BitField
|
||||||
|
|
||||||
@ -53,13 +46,14 @@ type TerminateBatcher struct {
|
|||||||
lk sync.Mutex
|
lk sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTerminationBatcher(mctx context.Context, maddr address.Address, api TerminateBatcherApi, addrSel AddrSel, feeCfg FeeConfig) *TerminateBatcher {
|
func NewTerminationBatcher(mctx context.Context, maddr address.Address, api TerminateBatcherApi, addrSel AddrSel, feeCfg FeeConfig, getConfig GetSealingConfigFunc) *TerminateBatcher {
|
||||||
b := &TerminateBatcher{
|
b := &TerminateBatcher{
|
||||||
api: api,
|
api: api,
|
||||||
maddr: maddr,
|
maddr: maddr,
|
||||||
mctx: mctx,
|
mctx: mctx,
|
||||||
addrSel: addrSel,
|
addrSel: addrSel,
|
||||||
feeCfg: feeCfg,
|
feeCfg: feeCfg,
|
||||||
|
getConfig: getConfig,
|
||||||
|
|
||||||
todo: map[SectorLocation]*bitfield.BitField{},
|
todo: map[SectorLocation]*bitfield.BitField{},
|
||||||
waiting: map[abi.SectorNumber][]chan cid.Cid{},
|
waiting: map[abi.SectorNumber][]chan cid.Cid{},
|
||||||
@ -113,6 +107,11 @@ func (b *TerminateBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
|
|||||||
return nil, xerrors.Errorf("getting proving deadline info failed: %w", err)
|
return nil, xerrors.Errorf("getting proving deadline info failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cfg, err := b.getConfig()
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("getting sealing config: %W", err)
|
||||||
|
}
|
||||||
|
|
||||||
b.lk.Lock()
|
b.lk.Lock()
|
||||||
defer b.lk.Unlock()
|
defer b.lk.Unlock()
|
||||||
params := miner2.TerminateSectorsParams{}
|
params := miner2.TerminateSectorsParams{}
|
||||||
@ -193,11 +192,11 @@ func (b *TerminateBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
|
|||||||
return nil, nil // nothing to do
|
return nil, nil // nothing to do
|
||||||
}
|
}
|
||||||
|
|
||||||
if notif && total < TerminateBatchMax {
|
if notif && total < cfg.TerminateBatchMax {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if after && total < TerminateBatchMin {
|
if after && total < cfg.TerminateBatchMin {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -242,9 +242,13 @@ func DefaultStorageMiner() *StorageMiner {
|
|||||||
WaitDealsDelay: Duration(time.Hour * 6),
|
WaitDealsDelay: Duration(time.Hour * 6),
|
||||||
AlwaysKeepUnsealedCopy: true,
|
AlwaysKeepUnsealedCopy: true,
|
||||||
|
|
||||||
AggregateCommits: true,
|
AggregateCommits: true,
|
||||||
MinCommitBatch: 1, // we must have at least one proof to aggregate
|
MinCommitBatch: 1, // we must have at least one proof to aggregate
|
||||||
MaxCommitBatch: 204, // this is the maximum aggregation per FIP13
|
MaxCommitBatch: 204, // this is the maximum aggregation per FIP13
|
||||||
|
CommitBatchWait: time.Day, // this can be up to 6 days
|
||||||
|
TerminateBatchMin: 1, // same as above
|
||||||
|
TerminateBatchMax: 204, // same as above
|
||||||
|
TerminateBatchWait: time.Day, // this can be up to 6 days
|
||||||
},
|
},
|
||||||
|
|
||||||
Storage: sectorstorage.SealerConfig{
|
Storage: sectorstorage.SealerConfig{
|
||||||
|
Loading…
Reference in New Issue
Block a user